diff --git a/.gitignore b/.gitignore index 2ea494c..87fd898 100644 --- a/.gitignore +++ b/.gitignore @@ -37,3 +37,4 @@ coverage/ *.temp .cache/ release +docs/room_status_moment.sql diff --git a/src/db/databaseManager.js b/src/db/databaseManager.js index 71eda82..9ff67ec 100644 --- a/src/db/databaseManager.js +++ b/src/db/databaseManager.js @@ -42,6 +42,20 @@ class DatabaseManager { } } + async checkConnection() { + if (!this.pool) return false; + let client; + try { + client = await this.pool.connect(); + await client.query('SELECT 1'); + return true; + } catch (e) { + return false; + } finally { + if (client) client.release(); + } + } + async disconnect() { try { this.stopPartitionMaintenance(); diff --git a/src/index.js b/src/index.js index a2add06..223bbff 100644 --- a/src/index.js +++ b/src/index.js @@ -48,6 +48,16 @@ class WebBLSHeartbeatServer { this.heartbeatProcessor = new HeartbeatProcessor(this.config.processor, this.databaseManager, { redis: this.redis, stats: this.stats, + onDbOffline: () => { + if (this.consumers) { + this.consumers.forEach(c => c.consumer.pause()); + } + }, + onDbOnline: () => { + if (this.consumers) { + this.consumers.forEach(c => c.consumer.resume()); + } + } }); // 在单进程内启动 N 个消费者实例(与分区数匹配) diff --git a/src/kafka/consumer.js b/src/kafka/consumer.js index f6bc571..d0d53d6 100644 --- a/src/kafka/consumer.js +++ b/src/kafka/consumer.js @@ -14,7 +14,9 @@ class KafkaConsumer { this._reconnectAttempts = 0; this._inFlight = new Set(); - this._paused = false; + this._paused = false; // 实际 Kafka 暂停状态 + this._isExternalPaused = false; // 外部手动暂停标志 + this._isBackpressurePaused = false; // 背压暂停标志 } async connect() { @@ -116,6 +118,16 @@ class KafkaConsumer { await Promise.allSettled(Array.from(this._inFlight)); } + pause() { + this._isExternalPaused = true; + this._updatePauseState(); + } + + resume() { + this._isExternalPaused = false; + this._updatePauseState(); + } + getTopics() { const topics = this.config?.topics; if (Array.isArray(topics) && topics.length) { @@ -168,18 +180,24 @@ class KafkaConsumer { const max = Number(this.config?.maxInFlightMessages ?? 0); if (!Number.isFinite(max) || max <= 0) return; - const shouldPause = this._inFlight.size >= max; + this._isBackpressurePaused = this._inFlight.size >= max; + this._updatePauseState(); + } + + _updatePauseState() { + if (!this.consumerGroupStream) return; + + const shouldPause = this._isExternalPaused || this._isBackpressurePaused; + if (shouldPause && !this._paused) { this.consumerGroupStream.pause(); this._paused = true; - console.warn(`[kafka] paused: inFlight=${this._inFlight.size} max=${max}`); - return; - } - - if (!shouldPause && this._paused) { + const reason = this._isExternalPaused ? 'external' : 'backpressure'; + console.warn(`[kafka] paused: reason=${reason} inFlight=${this._inFlight.size}`); + } else if (!shouldPause && this._paused) { this.consumerGroupStream.resume(); this._paused = false; - console.warn(`[kafka] resumed: inFlight=${this._inFlight.size} max=${max}`); + console.warn(`[kafka] resumed`); } } diff --git a/src/processor/heartbeatProcessor.js b/src/processor/heartbeatProcessor.js index e44792d..0f24dd6 100644 --- a/src/processor/heartbeatProcessor.js +++ b/src/processor/heartbeatProcessor.js @@ -12,6 +12,10 @@ class HeartbeatProcessor { this.batchMessageQueue = []; this.batchTimer = null; this._batchInFlight = false; + + this.onDbOffline = deps?.onDbOffline; + this.onDbOnline = deps?.onDbOnline; + this._dbOffline = false; } async processMessage(message) { @@ -186,6 +190,7 @@ class HeartbeatProcessor { async processBatch() { if (this._batchInFlight) return; + if (this._dbOffline) return; if (this.batchQueue.length === 0) return; if (this.batchMessageQueue.length === 0) return; @@ -234,21 +239,44 @@ class HeartbeatProcessor { this.stats?.incDbWritten?.(insertedCount); const failedCount = failedRecords.length; if (failedCount > 0) { - console.log(`批次处理完成:成功 ${insertedCount} 条,失败 ${failedCount} 条`); - } else { - console.log(`成功处理批次数据,共 ${batchData.length} 条`); + console.warn(`批次处理部分失败:成功 ${insertedCount} 条,失败 ${failedCount} 条`); } hasMore = this.batchQueue.length > 0; } catch (error) { console.error('批量处理失败:', error); - this._emitDbWriteError(error, batchData); - if (!this.batchTimer) { - const retryDelay = Math.max(250, Number(this.config.batchTimeout ?? 1000)); - this.batchTimer = setTimeout(() => this.processBatch(), retryDelay); + + if (this._isConnectionError(error)) { + if (!this._dbOffline) { + this._dbOffline = true; + console.error('数据库连接断开,暂停拉取并开始每分钟检查'); + this.onDbOffline?.(); + } + this._emitDbWriteError(error, batchData); + this._scheduleDbCheck(); + return; } + + // 非连接错误(可能是严重的数据错误或逻辑错误): + // 不应无限重试,否则会阻塞后续消费。 + // 记录错误后,将本批次标记为已处理(失败),并移除出队列,允许提交Offset。 + console.error('批量处理遇到严重错误(非连接错误),放弃本批次数据并提交Offset:', error); + this._emitDbWriteError(error, batchData); + + // 1. 从队列中移除本批次数据 + this.batchQueue.splice(0, batchEventCount); + this.batchMessageQueue.splice(0, batchMessageCount); + + // 2. 通知 Consumer 处理结束(虽然是0插入),使其能提交 Offset + for (const entry of batchMessages) { + entry.deferred.resolve({ insertedCount: 0 }); + } + + // 不需要手动设置 batchTimer,finally 块会根据剩余队列情况调度后续处理 } finally { this._batchInFlight = false; - if (hasMore && this.shouldFlushNow()) { + if (this._dbOffline) { + // do nothing, timer is managed by _scheduleDbCheck + } else if (hasMore && this.shouldFlushNow()) { setImmediate(() => this.processBatch()); } else if (!this.batchTimer && this.batchQueue.length > 0) { this.batchTimer = setTimeout(() => this.processBatch(), this.config.batchTimeout); @@ -256,6 +284,43 @@ class HeartbeatProcessor { } } + _isConnectionError(error) { + if (!error) return false; + const code = error.code; + const connCodes = ['57P03', '08006', '08001', '08003', '08004', '08007']; + if (connCodes.includes(code)) return true; + if (error.message && ( + error.message.includes('ECONNREFUSED') || + error.message.includes('ETIMEDOUT') || + error.message.includes('connection') + )) { + return true; + } + return false; + } + + _scheduleDbCheck() { + if (this.batchTimer) clearTimeout(this.batchTimer); + this.batchTimer = setTimeout(async () => { + console.log('正在检查数据库连接...'); + try { + const isUp = await this.databaseManager.checkConnection(); + if (isUp) { + console.log('数据库连接已恢复'); + this._dbOffline = false; + this.onDbOnline?.(); + this.processBatch(); + } else { + console.warn('数据库仍离线,1分钟后重试...'); + this._scheduleDbCheck(); + } + } catch (err) { + console.warn('数据库检查异常:', err); + this._scheduleDbCheck(); + } + }, 60000); + } + _emitDbWriteError(error, rawData) { const list = Array.isArray(rawData) ? rawData : rawData ? [rawData] : []; if (list.length > 0) {