diff --git a/openspec/specs/processor/spec.md b/openspec/specs/processor/spec.md index 0583c57..e0c1695 100644 --- a/openspec/specs/processor/spec.md +++ b/openspec/specs/processor/spec.md @@ -90,3 +90,11 @@ - **WHEN** Kafka 消息 value 为 JSON 数组(批量心跳) - **THEN** 系统应将数组内每条心跳作为独立项进入批处理队列 +### Requirement: 批量写库容错 +系统 MUST 在批量写库时确保单条失败不影响同批次其他记录的写入。 + +#### Scenario: 单条数据写库失败不影响同批次 +- **WHEN** 批量写库中存在某条记录违反约束或写入失败 +- **THEN** 系统应继续写入同批次其他合法记录 +- **AND** 失败记录应按错误日志规则写入 Redis 项目控制台 + diff --git a/src/db/databaseManager.js b/src/db/databaseManager.js index 9b20b1b..2d8d0bb 100644 --- a/src/db/databaseManager.js +++ b/src/db/databaseManager.js @@ -425,46 +425,51 @@ class DatabaseManager { 'extra', ]; + const toRowValues = (e) => [ + e.ts_ms, + e.hotel_id, + e.room_id, + e.device_id, + e.ip, + e.power_state, + e.guest_type, + e.cardless_state, + e.service_mask, + e.pms_state, + e.carbon_state, + e.device_count, + e.comm_seq, + Array.isArray(e.elec_address) ? e.elec_address : null, + Array.isArray(e.air_address) ? e.air_address : null, + Array.isArray(e.voltage) ? e.voltage : null, + Array.isArray(e.ampere) ? e.ampere : null, + Array.isArray(e.power) ? e.power : null, + Array.isArray(e.phase) ? e.phase : null, + Array.isArray(e.energy) ? e.energy : null, + Array.isArray(e.sum_energy) ? e.sum_energy : null, + Array.isArray(e.state) ? e.state : null, + Array.isArray(e.model) ? e.model : null, + Array.isArray(e.speed) ? e.speed : null, + Array.isArray(e.set_temp) ? e.set_temp : null, + Array.isArray(e.now_temp) ? e.now_temp : null, + Array.isArray(e.solenoid_valve) ? e.solenoid_valve : null, + e.extra ?? null, + ]; + const values = []; const placeholders = events .map((e, rowIndex) => { const base = rowIndex * columns.length; - values.push( - e.ts_ms, - e.hotel_id, - e.room_id, - e.device_id, - e.ip, - e.power_state, - e.guest_type, - e.cardless_state, - e.service_mask, - e.pms_state, - e.carbon_state, - e.device_count, - e.comm_seq, - Array.isArray(e.elec_address) ? e.elec_address : null, - Array.isArray(e.air_address) ? e.air_address : null, - Array.isArray(e.voltage) ? e.voltage : null, - Array.isArray(e.ampere) ? e.ampere : null, - Array.isArray(e.power) ? e.power : null, - Array.isArray(e.phase) ? e.phase : null, - Array.isArray(e.energy) ? e.energy : null, - Array.isArray(e.sum_energy) ? e.sum_energy : null, - Array.isArray(e.state) ? e.state : null, - Array.isArray(e.model) ? e.model : null, - Array.isArray(e.speed) ? e.speed : null, - Array.isArray(e.set_temp) ? e.set_temp : null, - Array.isArray(e.now_temp) ? e.now_temp : null, - Array.isArray(e.solenoid_valve) ? e.solenoid_valve : null, - e.extra ?? null - ); + values.push(...toRowValues(e)); const row = columns.map((_, colIndex) => `$${base + colIndex + 1}`).join(', '); return `(${row})`; }) .join(', '); const sql = `INSERT INTO heartbeat.heartbeat_events (${columns.join(', ')}) VALUES ${placeholders}`; + const singleSql = `INSERT INTO heartbeat.heartbeat_events (${columns.join(', ')}) VALUES (${columns + .map((_, i) => `$${i + 1}`) + .join(', ')})`; const runInsertOnce = async () => { const tsValues = events.map((e) => Number(e.ts_ms)).filter((n) => Number.isFinite(n)); @@ -518,7 +523,24 @@ class DatabaseManager { } } - throw lastError; + const failedRecords = []; + let insertedCount = 0; + console.error('[db] 批量写入失败,已切换为逐条写入:', lastError); + + for (const event of events) { + try { + await this.pool.query(singleSql, toRowValues(event)); + insertedCount += 1; + } catch (error) { + failedRecords.push({ error, record: event }); + } + } + + if (insertedCount === 0 && failedRecords.length === events.length) { + throw lastError; + } + + return { insertedCount, failedRecords, batchError: lastError }; } async insertHeartbeatData(data) { diff --git a/src/processor/heartbeatProcessor.js b/src/processor/heartbeatProcessor.js index 8b4bbb6..a32dae8 100644 --- a/src/processor/heartbeatProcessor.js +++ b/src/processor/heartbeatProcessor.js @@ -204,18 +204,16 @@ class HeartbeatProcessor { const batchMessages = this.batchMessageQueue.slice(0, batchMessageCount); let insertedCount = 0; + let failedRecords = []; if (typeof this.databaseManager.insertHeartbeatEvents === 'function') { const result = await this.databaseManager.insertHeartbeatEvents(batchData); insertedCount = Number(result?.insertedCount ?? result ?? 0); + failedRecords = Array.isArray(result?.failedRecords) ? result.failedRecords : []; } else { const result = await this.databaseManager.insertHeartbeatData(batchData); insertedCount = Number(result?.insertedCount ?? result ?? 0); } - if (insertedCount !== batchData.length) { - throw new Error(`落库结果校验失败:expect=${batchData.length} actual=${insertedCount}`); - } - this.batchQueue.splice(0, batchEventCount); this.batchMessageQueue.splice(0, batchMessageCount); @@ -223,8 +221,23 @@ class HeartbeatProcessor { entry.deferred.resolve({ insertedCount: entry.eventCount }); } - this.stats?.incDbWritten?.(batchData.length); - console.log(`成功处理批次数据,共 ${batchData.length} 条`); + if (failedRecords.length > 0) { + for (const item of failedRecords) { + this._emitRejectedRecord({ + errorId: 'db_write_failed', + error: item?.error, + rawData: item?.record, + }); + } + this.stats?.incDbWriteFailed?.(failedRecords.length); + } + this.stats?.incDbWritten?.(insertedCount); + const failedCount = failedRecords.length; + if (failedCount > 0) { + console.log(`批次处理完成:成功 ${insertedCount} 条,失败 ${failedCount} 条`); + } else { + console.log(`成功处理批次数据,共 ${batchData.length} 条`); + } hasMore = this.batchQueue.length > 0; } catch (error) { console.error('批量处理失败:', error); @@ -244,8 +257,11 @@ class HeartbeatProcessor { } _emitDbWriteError(error, rawData) { - if (!this.redis?.isEnabled?.()) return; const list = Array.isArray(rawData) ? rawData : rawData ? [rawData] : []; + if (list.length > 0) { + this.stats?.incDbWriteFailed?.(list.length); + } + if (!this.redis?.isEnabled?.()) return; for (const record of list) { this._emitRejectedRecord({ errorId: 'db_write_failed', diff --git a/src/stats/statsManager.js b/src/stats/statsManager.js index e28b7c6..075d06e 100644 --- a/src/stats/statsManager.js +++ b/src/stats/statsManager.js @@ -1,6 +1,6 @@ class StatsCounters { constructor() { - this._minuteBuf = new SharedArrayBuffer(BigInt64Array.BYTES_PER_ELEMENT * 3); + this._minuteBuf = new SharedArrayBuffer(BigInt64Array.BYTES_PER_ELEMENT * 4); this._minute = new BigInt64Array(this._minuteBuf); } @@ -22,11 +22,18 @@ class StatsCounters { Atomics.add(this._minute, 2, v); } + incDbWriteFailed(n = 1) { + const v = BigInt(Math.max(0, Number(n) || 0)); + if (v === 0n) return; + Atomics.add(this._minute, 3, v); + } + snapshotAndResetMinute() { const dbWritten = Atomics.exchange(this._minute, 0, 0n); const filtered = Atomics.exchange(this._minute, 1, 0n); const kafkaPulled = Atomics.exchange(this._minute, 2, 0n); - return { dbWritten, filtered, kafkaPulled }; + const dbWriteFailed = Atomics.exchange(this._minute, 3, 0n); + return { dbWritten, filtered, kafkaPulled, dbWriteFailed }; } } @@ -50,6 +57,7 @@ class StatsReporter { this.stats = stats; this._timer = null; this._running = false; + this._lastFlushMinute = null; } start() { @@ -68,9 +76,16 @@ class StatsReporter { flushOnce() { if (!this.redis?.isEnabled?.()) return; - const { dbWritten, filtered, kafkaPulled } = this.stats.snapshotAndResetMinute(); + const now = Date.now(); + const minuteKey = Math.floor(now / 60_000); + if (this._lastFlushMinute === minuteKey) { + return; + } + const { dbWritten, filtered, kafkaPulled, dbWriteFailed } = this.stats.snapshotAndResetMinute(); + this._lastFlushMinute = minuteKey; const ts = formatTimestamp(new Date()); this.redis.pushConsoleLog?.({ level: 'info', message: `[STATS] ${ts} 数据库写入量: ${dbWritten}条`, metadata: { module: 'stats' } }); + this.redis.pushConsoleLog?.({ level: 'info', message: `[STATS] ${ts} 数据库写入失败量: ${dbWriteFailed}条`, metadata: { module: 'stats' } }); this.redis.pushConsoleLog?.({ level: 'info', message: `[STATS] ${ts} 数据过滤量: ${filtered}条`, metadata: { module: 'stats' } }); this.redis.pushConsoleLog?.({ level: 'info', message: `[STATS] ${ts} Kafka拉取量: ${kafkaPulled}条`, metadata: { module: 'stats' } }); } diff --git a/test/stats.test.js b/test/stats.test.js index 2116a4d..cc3593f 100644 --- a/test/stats.test.js +++ b/test/stats.test.js @@ -8,16 +8,19 @@ describe('StatsCounters', () => { stats.incDbWritten(3); stats.incFiltered(2); stats.incKafkaPulled(5); + stats.incDbWriteFailed(4); const first = stats.snapshotAndResetMinute(); assert.equal(first.dbWritten, 3n); assert.equal(first.filtered, 2n); assert.equal(first.kafkaPulled, 5n); + assert.equal(first.dbWriteFailed, 4n); const second = stats.snapshotAndResetMinute(); assert.equal(second.dbWritten, 0n); assert.equal(second.filtered, 0n); assert.equal(second.kafkaPulled, 0n); + assert.equal(second.dbWriteFailed, 0n); }); }); @@ -27,6 +30,7 @@ describe('StatsReporter', () => { stats.incDbWritten(7); stats.incFiltered(8); stats.incKafkaPulled(9); + stats.incDbWriteFailed(2); const calls = { push: [] }; const redis = { @@ -39,13 +43,15 @@ describe('StatsReporter', () => { const reporter = new StatsReporter({ redis, stats }); reporter.flushOnce(); - assert.equal(calls.push.length, 3); + assert.equal(calls.push.length, 4); assert.equal(calls.push[0].level, 'info'); assert.equal(calls.push[1].level, 'info'); assert.equal(calls.push[2].level, 'info'); + assert.equal(calls.push[3].level, 'info'); assert.match(calls.push[0].message, /^\[STATS\] \d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3} 数据库写入量: 7条$/); - assert.match(calls.push[1].message, /^\[STATS\] \d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3} 数据过滤量: 8条$/); - assert.match(calls.push[2].message, /^\[STATS\] \d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3} Kafka拉取量: 9条$/); + assert.match(calls.push[1].message, /^\[STATS\] \d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3} 数据库写入失败量: 2条$/); + assert.match(calls.push[2].message, /^\[STATS\] \d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3} 数据过滤量: 8条$/); + assert.match(calls.push[3].message, /^\[STATS\] \d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3} Kafka拉取量: 9条$/); }); });