feat(processor): 增加数据库连接断开检测与自动恢复机制

- 在 DatabaseManager 中添加 checkConnection 方法用于检测数据库连接状态
- 当数据库连接断开时,自动暂停 Kafka 消费者,防止消息堆积
- 实现每分钟数据库连接检查,连接恢复后自动恢复消费者处理
- 区分数据库连接错误和其他严重错误,连接错误时保留数据等待重试
- 在 .gitignore 中添加 SQL 文件排除
This commit is contained in:
2026-02-06 13:37:46 +08:00
parent d10bedb7e7
commit b72cdde8bf
5 changed files with 124 additions and 16 deletions

1
.gitignore vendored
View File

@@ -37,3 +37,4 @@ coverage/
*.temp *.temp
.cache/ .cache/
release release
docs/room_status_moment.sql

View File

@@ -42,6 +42,20 @@ class DatabaseManager {
} }
} }
async checkConnection() {
if (!this.pool) return false;
let client;
try {
client = await this.pool.connect();
await client.query('SELECT 1');
return true;
} catch (e) {
return false;
} finally {
if (client) client.release();
}
}
async disconnect() { async disconnect() {
try { try {
this.stopPartitionMaintenance(); this.stopPartitionMaintenance();

View File

@@ -48,6 +48,16 @@ class WebBLSHeartbeatServer {
this.heartbeatProcessor = new HeartbeatProcessor(this.config.processor, this.databaseManager, { this.heartbeatProcessor = new HeartbeatProcessor(this.config.processor, this.databaseManager, {
redis: this.redis, redis: this.redis,
stats: this.stats, stats: this.stats,
onDbOffline: () => {
if (this.consumers) {
this.consumers.forEach(c => c.consumer.pause());
}
},
onDbOnline: () => {
if (this.consumers) {
this.consumers.forEach(c => c.consumer.resume());
}
}
}); });
// 在单进程内启动 N 个消费者实例(与分区数匹配) // 在单进程内启动 N 个消费者实例(与分区数匹配)

View File

@@ -14,7 +14,9 @@ class KafkaConsumer {
this._reconnectAttempts = 0; this._reconnectAttempts = 0;
this._inFlight = new Set(); this._inFlight = new Set();
this._paused = false; this._paused = false; // 实际 Kafka 暂停状态
this._isExternalPaused = false; // 外部手动暂停标志
this._isBackpressurePaused = false; // 背压暂停标志
} }
async connect() { async connect() {
@@ -116,6 +118,16 @@ class KafkaConsumer {
await Promise.allSettled(Array.from(this._inFlight)); await Promise.allSettled(Array.from(this._inFlight));
} }
pause() {
this._isExternalPaused = true;
this._updatePauseState();
}
resume() {
this._isExternalPaused = false;
this._updatePauseState();
}
getTopics() { getTopics() {
const topics = this.config?.topics; const topics = this.config?.topics;
if (Array.isArray(topics) && topics.length) { if (Array.isArray(topics) && topics.length) {
@@ -168,18 +180,24 @@ class KafkaConsumer {
const max = Number(this.config?.maxInFlightMessages ?? 0); const max = Number(this.config?.maxInFlightMessages ?? 0);
if (!Number.isFinite(max) || max <= 0) return; if (!Number.isFinite(max) || max <= 0) return;
const shouldPause = this._inFlight.size >= max; this._isBackpressurePaused = this._inFlight.size >= max;
this._updatePauseState();
}
_updatePauseState() {
if (!this.consumerGroupStream) return;
const shouldPause = this._isExternalPaused || this._isBackpressurePaused;
if (shouldPause && !this._paused) { if (shouldPause && !this._paused) {
this.consumerGroupStream.pause(); this.consumerGroupStream.pause();
this._paused = true; this._paused = true;
console.warn(`[kafka] paused: inFlight=${this._inFlight.size} max=${max}`); const reason = this._isExternalPaused ? 'external' : 'backpressure';
return; console.warn(`[kafka] paused: reason=${reason} inFlight=${this._inFlight.size}`);
} } else if (!shouldPause && this._paused) {
if (!shouldPause && this._paused) {
this.consumerGroupStream.resume(); this.consumerGroupStream.resume();
this._paused = false; this._paused = false;
console.warn(`[kafka] resumed: inFlight=${this._inFlight.size} max=${max}`); console.warn(`[kafka] resumed`);
} }
} }

View File

@@ -12,6 +12,10 @@ class HeartbeatProcessor {
this.batchMessageQueue = []; this.batchMessageQueue = [];
this.batchTimer = null; this.batchTimer = null;
this._batchInFlight = false; this._batchInFlight = false;
this.onDbOffline = deps?.onDbOffline;
this.onDbOnline = deps?.onDbOnline;
this._dbOffline = false;
} }
async processMessage(message) { async processMessage(message) {
@@ -186,6 +190,7 @@ class HeartbeatProcessor {
async processBatch() { async processBatch() {
if (this._batchInFlight) return; if (this._batchInFlight) return;
if (this._dbOffline) return;
if (this.batchQueue.length === 0) return; if (this.batchQueue.length === 0) return;
if (this.batchMessageQueue.length === 0) return; if (this.batchMessageQueue.length === 0) return;
@@ -234,21 +239,44 @@ class HeartbeatProcessor {
this.stats?.incDbWritten?.(insertedCount); this.stats?.incDbWritten?.(insertedCount);
const failedCount = failedRecords.length; const failedCount = failedRecords.length;
if (failedCount > 0) { if (failedCount > 0) {
console.log(`批次处理完成:成功 ${insertedCount} 条,失败 ${failedCount}`); console.warn(`批次处理部分失败:成功 ${insertedCount} 条,失败 ${failedCount}`);
} else {
console.log(`成功处理批次数据,共 ${batchData.length}`);
} }
hasMore = this.batchQueue.length > 0; hasMore = this.batchQueue.length > 0;
} catch (error) { } catch (error) {
console.error('批量处理失败:', error); console.error('批量处理失败:', error);
this._emitDbWriteError(error, batchData);
if (!this.batchTimer) { if (this._isConnectionError(error)) {
const retryDelay = Math.max(250, Number(this.config.batchTimeout ?? 1000)); if (!this._dbOffline) {
this.batchTimer = setTimeout(() => this.processBatch(), retryDelay); this._dbOffline = true;
console.error('数据库连接断开,暂停拉取并开始每分钟检查');
this.onDbOffline?.();
}
this._emitDbWriteError(error, batchData);
this._scheduleDbCheck();
return;
} }
// 非连接错误(可能是严重的数据错误或逻辑错误):
// 不应无限重试,否则会阻塞后续消费。
// 记录错误后将本批次标记为已处理失败并移除出队列允许提交Offset。
console.error('批量处理遇到严重错误非连接错误放弃本批次数据并提交Offset:', error);
this._emitDbWriteError(error, batchData);
// 1. 从队列中移除本批次数据
this.batchQueue.splice(0, batchEventCount);
this.batchMessageQueue.splice(0, batchMessageCount);
// 2. 通知 Consumer 处理结束虽然是0插入使其能提交 Offset
for (const entry of batchMessages) {
entry.deferred.resolve({ insertedCount: 0 });
}
// 不需要手动设置 batchTimerfinally 块会根据剩余队列情况调度后续处理
} finally { } finally {
this._batchInFlight = false; this._batchInFlight = false;
if (hasMore && this.shouldFlushNow()) { if (this._dbOffline) {
// do nothing, timer is managed by _scheduleDbCheck
} else if (hasMore && this.shouldFlushNow()) {
setImmediate(() => this.processBatch()); setImmediate(() => this.processBatch());
} else if (!this.batchTimer && this.batchQueue.length > 0) { } else if (!this.batchTimer && this.batchQueue.length > 0) {
this.batchTimer = setTimeout(() => this.processBatch(), this.config.batchTimeout); this.batchTimer = setTimeout(() => this.processBatch(), this.config.batchTimeout);
@@ -256,6 +284,43 @@ class HeartbeatProcessor {
} }
} }
_isConnectionError(error) {
if (!error) return false;
const code = error.code;
const connCodes = ['57P03', '08006', '08001', '08003', '08004', '08007'];
if (connCodes.includes(code)) return true;
if (error.message && (
error.message.includes('ECONNREFUSED') ||
error.message.includes('ETIMEDOUT') ||
error.message.includes('connection')
)) {
return true;
}
return false;
}
_scheduleDbCheck() {
if (this.batchTimer) clearTimeout(this.batchTimer);
this.batchTimer = setTimeout(async () => {
console.log('正在检查数据库连接...');
try {
const isUp = await this.databaseManager.checkConnection();
if (isUp) {
console.log('数据库连接已恢复');
this._dbOffline = false;
this.onDbOnline?.();
this.processBatch();
} else {
console.warn('数据库仍离线1分钟后重试...');
this._scheduleDbCheck();
}
} catch (err) {
console.warn('数据库检查异常:', err);
this._scheduleDbCheck();
}
}, 60000);
}
_emitDbWriteError(error, rawData) { _emitDbWriteError(error, rawData) {
const list = Array.isArray(rawData) ? rawData : rawData ? [rawData] : []; const list = Array.isArray(rawData) ? rawData : rawData ? [rawData] : [];
if (list.length > 0) { if (list.length > 0) {