feat(processor): 实现批量写库容错机制并添加失败统计

添加数据库批量写入失败处理逻辑,当批量写入失败时自动切换为逐条写入
记录失败数据并统计失败数量,同时更新相关测试和统计模块
符合新增的批量写库容错需求规范
This commit is contained in:
2026-01-20 08:22:55 +08:00
parent 41301f9ce5
commit b90faf4aa4
5 changed files with 111 additions and 44 deletions

View File

@@ -90,3 +90,11 @@
- **WHEN** Kafka 消息 value 为 JSON 数组(批量心跳) - **WHEN** Kafka 消息 value 为 JSON 数组(批量心跳)
- **THEN** 系统应将数组内每条心跳作为独立项进入批处理队列 - **THEN** 系统应将数组内每条心跳作为独立项进入批处理队列
### Requirement: 批量写库容错
系统 MUST 在批量写库时确保单条失败不影响同批次其他记录的写入。
#### Scenario: 单条数据写库失败不影响同批次
- **WHEN** 批量写库中存在某条记录违反约束或写入失败
- **THEN** 系统应继续写入同批次其他合法记录
- **AND** 失败记录应按错误日志规则写入 Redis 项目控制台

View File

@@ -425,11 +425,7 @@ class DatabaseManager {
'extra', 'extra',
]; ];
const values = []; const toRowValues = (e) => [
const placeholders = events
.map((e, rowIndex) => {
const base = rowIndex * columns.length;
values.push(
e.ts_ms, e.ts_ms,
e.hotel_id, e.hotel_id,
e.room_id, e.room_id,
@@ -457,14 +453,23 @@ class DatabaseManager {
Array.isArray(e.set_temp) ? e.set_temp : null, Array.isArray(e.set_temp) ? e.set_temp : null,
Array.isArray(e.now_temp) ? e.now_temp : null, Array.isArray(e.now_temp) ? e.now_temp : null,
Array.isArray(e.solenoid_valve) ? e.solenoid_valve : null, Array.isArray(e.solenoid_valve) ? e.solenoid_valve : null,
e.extra ?? null e.extra ?? null,
); ];
const values = [];
const placeholders = events
.map((e, rowIndex) => {
const base = rowIndex * columns.length;
values.push(...toRowValues(e));
const row = columns.map((_, colIndex) => `$${base + colIndex + 1}`).join(', '); const row = columns.map((_, colIndex) => `$${base + colIndex + 1}`).join(', ');
return `(${row})`; return `(${row})`;
}) })
.join(', '); .join(', ');
const sql = `INSERT INTO heartbeat.heartbeat_events (${columns.join(', ')}) VALUES ${placeholders}`; const sql = `INSERT INTO heartbeat.heartbeat_events (${columns.join(', ')}) VALUES ${placeholders}`;
const singleSql = `INSERT INTO heartbeat.heartbeat_events (${columns.join(', ')}) VALUES (${columns
.map((_, i) => `$${i + 1}`)
.join(', ')})`;
const runInsertOnce = async () => { const runInsertOnce = async () => {
const tsValues = events.map((e) => Number(e.ts_ms)).filter((n) => Number.isFinite(n)); const tsValues = events.map((e) => Number(e.ts_ms)).filter((n) => Number.isFinite(n));
@@ -518,9 +523,26 @@ class DatabaseManager {
} }
} }
const failedRecords = [];
let insertedCount = 0;
console.error('[db] 批量写入失败,已切换为逐条写入:', lastError);
for (const event of events) {
try {
await this.pool.query(singleSql, toRowValues(event));
insertedCount += 1;
} catch (error) {
failedRecords.push({ error, record: event });
}
}
if (insertedCount === 0 && failedRecords.length === events.length) {
throw lastError; throw lastError;
} }
return { insertedCount, failedRecords, batchError: lastError };
}
async insertHeartbeatData(data) { async insertHeartbeatData(data) {
try { try {
if (!Array.isArray(data)) { if (!Array.isArray(data)) {

View File

@@ -204,18 +204,16 @@ class HeartbeatProcessor {
const batchMessages = this.batchMessageQueue.slice(0, batchMessageCount); const batchMessages = this.batchMessageQueue.slice(0, batchMessageCount);
let insertedCount = 0; let insertedCount = 0;
let failedRecords = [];
if (typeof this.databaseManager.insertHeartbeatEvents === 'function') { if (typeof this.databaseManager.insertHeartbeatEvents === 'function') {
const result = await this.databaseManager.insertHeartbeatEvents(batchData); const result = await this.databaseManager.insertHeartbeatEvents(batchData);
insertedCount = Number(result?.insertedCount ?? result ?? 0); insertedCount = Number(result?.insertedCount ?? result ?? 0);
failedRecords = Array.isArray(result?.failedRecords) ? result.failedRecords : [];
} else { } else {
const result = await this.databaseManager.insertHeartbeatData(batchData); const result = await this.databaseManager.insertHeartbeatData(batchData);
insertedCount = Number(result?.insertedCount ?? result ?? 0); insertedCount = Number(result?.insertedCount ?? result ?? 0);
} }
if (insertedCount !== batchData.length) {
throw new Error(`落库结果校验失败expect=${batchData.length} actual=${insertedCount}`);
}
this.batchQueue.splice(0, batchEventCount); this.batchQueue.splice(0, batchEventCount);
this.batchMessageQueue.splice(0, batchMessageCount); this.batchMessageQueue.splice(0, batchMessageCount);
@@ -223,8 +221,23 @@ class HeartbeatProcessor {
entry.deferred.resolve({ insertedCount: entry.eventCount }); entry.deferred.resolve({ insertedCount: entry.eventCount });
} }
this.stats?.incDbWritten?.(batchData.length); if (failedRecords.length > 0) {
for (const item of failedRecords) {
this._emitRejectedRecord({
errorId: 'db_write_failed',
error: item?.error,
rawData: item?.record,
});
}
this.stats?.incDbWriteFailed?.(failedRecords.length);
}
this.stats?.incDbWritten?.(insertedCount);
const failedCount = failedRecords.length;
if (failedCount > 0) {
console.log(`批次处理完成:成功 ${insertedCount} 条,失败 ${failedCount}`);
} else {
console.log(`成功处理批次数据,共 ${batchData.length}`); console.log(`成功处理批次数据,共 ${batchData.length}`);
}
hasMore = this.batchQueue.length > 0; hasMore = this.batchQueue.length > 0;
} catch (error) { } catch (error) {
console.error('批量处理失败:', error); console.error('批量处理失败:', error);
@@ -244,8 +257,11 @@ class HeartbeatProcessor {
} }
_emitDbWriteError(error, rawData) { _emitDbWriteError(error, rawData) {
if (!this.redis?.isEnabled?.()) return;
const list = Array.isArray(rawData) ? rawData : rawData ? [rawData] : []; const list = Array.isArray(rawData) ? rawData : rawData ? [rawData] : [];
if (list.length > 0) {
this.stats?.incDbWriteFailed?.(list.length);
}
if (!this.redis?.isEnabled?.()) return;
for (const record of list) { for (const record of list) {
this._emitRejectedRecord({ this._emitRejectedRecord({
errorId: 'db_write_failed', errorId: 'db_write_failed',

View File

@@ -1,6 +1,6 @@
class StatsCounters { class StatsCounters {
constructor() { constructor() {
this._minuteBuf = new SharedArrayBuffer(BigInt64Array.BYTES_PER_ELEMENT * 3); this._minuteBuf = new SharedArrayBuffer(BigInt64Array.BYTES_PER_ELEMENT * 4);
this._minute = new BigInt64Array(this._minuteBuf); this._minute = new BigInt64Array(this._minuteBuf);
} }
@@ -22,11 +22,18 @@ class StatsCounters {
Atomics.add(this._minute, 2, v); Atomics.add(this._minute, 2, v);
} }
incDbWriteFailed(n = 1) {
const v = BigInt(Math.max(0, Number(n) || 0));
if (v === 0n) return;
Atomics.add(this._minute, 3, v);
}
snapshotAndResetMinute() { snapshotAndResetMinute() {
const dbWritten = Atomics.exchange(this._minute, 0, 0n); const dbWritten = Atomics.exchange(this._minute, 0, 0n);
const filtered = Atomics.exchange(this._minute, 1, 0n); const filtered = Atomics.exchange(this._minute, 1, 0n);
const kafkaPulled = Atomics.exchange(this._minute, 2, 0n); const kafkaPulled = Atomics.exchange(this._minute, 2, 0n);
return { dbWritten, filtered, kafkaPulled }; const dbWriteFailed = Atomics.exchange(this._minute, 3, 0n);
return { dbWritten, filtered, kafkaPulled, dbWriteFailed };
} }
} }
@@ -50,6 +57,7 @@ class StatsReporter {
this.stats = stats; this.stats = stats;
this._timer = null; this._timer = null;
this._running = false; this._running = false;
this._lastFlushMinute = null;
} }
start() { start() {
@@ -68,9 +76,16 @@ class StatsReporter {
flushOnce() { flushOnce() {
if (!this.redis?.isEnabled?.()) return; if (!this.redis?.isEnabled?.()) return;
const { dbWritten, filtered, kafkaPulled } = this.stats.snapshotAndResetMinute(); const now = Date.now();
const minuteKey = Math.floor(now / 60_000);
if (this._lastFlushMinute === minuteKey) {
return;
}
const { dbWritten, filtered, kafkaPulled, dbWriteFailed } = this.stats.snapshotAndResetMinute();
this._lastFlushMinute = minuteKey;
const ts = formatTimestamp(new Date()); const ts = formatTimestamp(new Date());
this.redis.pushConsoleLog?.({ level: 'info', message: `[STATS] ${ts} 数据库写入量: ${dbWritten}`, metadata: { module: 'stats' } }); this.redis.pushConsoleLog?.({ level: 'info', message: `[STATS] ${ts} 数据库写入量: ${dbWritten}`, metadata: { module: 'stats' } });
this.redis.pushConsoleLog?.({ level: 'info', message: `[STATS] ${ts} 数据库写入失败量: ${dbWriteFailed}`, metadata: { module: 'stats' } });
this.redis.pushConsoleLog?.({ level: 'info', message: `[STATS] ${ts} 数据过滤量: ${filtered}`, metadata: { module: 'stats' } }); this.redis.pushConsoleLog?.({ level: 'info', message: `[STATS] ${ts} 数据过滤量: ${filtered}`, metadata: { module: 'stats' } });
this.redis.pushConsoleLog?.({ level: 'info', message: `[STATS] ${ts} Kafka拉取量: ${kafkaPulled}`, metadata: { module: 'stats' } }); this.redis.pushConsoleLog?.({ level: 'info', message: `[STATS] ${ts} Kafka拉取量: ${kafkaPulled}`, metadata: { module: 'stats' } });
} }

View File

@@ -8,16 +8,19 @@ describe('StatsCounters', () => {
stats.incDbWritten(3); stats.incDbWritten(3);
stats.incFiltered(2); stats.incFiltered(2);
stats.incKafkaPulled(5); stats.incKafkaPulled(5);
stats.incDbWriteFailed(4);
const first = stats.snapshotAndResetMinute(); const first = stats.snapshotAndResetMinute();
assert.equal(first.dbWritten, 3n); assert.equal(first.dbWritten, 3n);
assert.equal(first.filtered, 2n); assert.equal(first.filtered, 2n);
assert.equal(first.kafkaPulled, 5n); assert.equal(first.kafkaPulled, 5n);
assert.equal(first.dbWriteFailed, 4n);
const second = stats.snapshotAndResetMinute(); const second = stats.snapshotAndResetMinute();
assert.equal(second.dbWritten, 0n); assert.equal(second.dbWritten, 0n);
assert.equal(second.filtered, 0n); assert.equal(second.filtered, 0n);
assert.equal(second.kafkaPulled, 0n); assert.equal(second.kafkaPulled, 0n);
assert.equal(second.dbWriteFailed, 0n);
}); });
}); });
@@ -27,6 +30,7 @@ describe('StatsReporter', () => {
stats.incDbWritten(7); stats.incDbWritten(7);
stats.incFiltered(8); stats.incFiltered(8);
stats.incKafkaPulled(9); stats.incKafkaPulled(9);
stats.incDbWriteFailed(2);
const calls = { push: [] }; const calls = { push: [] };
const redis = { const redis = {
@@ -39,13 +43,15 @@ describe('StatsReporter', () => {
const reporter = new StatsReporter({ redis, stats }); const reporter = new StatsReporter({ redis, stats });
reporter.flushOnce(); reporter.flushOnce();
assert.equal(calls.push.length, 3); assert.equal(calls.push.length, 4);
assert.equal(calls.push[0].level, 'info'); assert.equal(calls.push[0].level, 'info');
assert.equal(calls.push[1].level, 'info'); assert.equal(calls.push[1].level, 'info');
assert.equal(calls.push[2].level, 'info'); assert.equal(calls.push[2].level, 'info');
assert.equal(calls.push[3].level, 'info');
assert.match(calls.push[0].message, /^\[STATS\] \d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3} 数据库写入量: 7条$/); assert.match(calls.push[0].message, /^\[STATS\] \d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3} 数据库写入量: 7条$/);
assert.match(calls.push[1].message, /^\[STATS\] \d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3} 数据过滤量: 8条$/); assert.match(calls.push[1].message, /^\[STATS\] \d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3} 数据库写入失败量: 2条$/);
assert.match(calls.push[2].message, /^\[STATS\] \d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3} Kafka拉取量: 9条$/); assert.match(calls.push[2].message, /^\[STATS\] \d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3} 数据过滤量: 8条$/);
assert.match(calls.push[3].message, /^\[STATS\] \d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3} Kafka拉取量: 9条$/);
}); });
}); });