feat: 添加心跳缓冲区批量刷新功能,支持按配置的批量大小分片刷新;更新数据库管理器以处理大批量数据的分片插入;增加相关配置项和测试用例

This commit is contained in:
2026-03-14 17:41:03 +08:00
parent e45d14b720
commit dd75d3c2de
6 changed files with 87 additions and 9 deletions

View File

@@ -12,6 +12,7 @@ export class HeartbeatBuffer {
* @param {Object} options
* @param {number} [options.flushInterval=5000] - flush 间隔 (ms)
* @param {number} [options.maxBufferSize=10000] - 缓冲区上限触发强制 flush
* @param {number} [options.flushBatchSize=5000] - 单次 flush 最大落库条数
* @param {import('../redis/redisIntegration.js').RedisIntegration} [options.redisIntegration]
* @param {import('../utils/metricCollector.js').MetricCollector} [options.metricCollector]
* @param {() => number} [options.now] - 测试用时间函数
@@ -20,6 +21,7 @@ export class HeartbeatBuffer {
this.dbManager = dbManager;
this.flushInterval = options.flushInterval || 5000;
this.maxBufferSize = options.maxBufferSize || 10000;
this.flushBatchSize = options.flushBatchSize > 0 ? options.flushBatchSize : 5000;
this.redisIntegration = options.redisIntegration || null;
this.metricCollector = options.metricCollector || null;
this.now = options.now || (() => Date.now());
@@ -127,9 +129,13 @@ export class HeartbeatBuffer {
if (this.isFlushing) return;
this.isFlushing = true;
const writableEntries = Array.from(this.buffer.entries());
for (const [key] of writableEntries) {
const writableEntries = [];
for (const [key, row] of this.buffer.entries()) {
writableEntries.push([key, row]);
this.buffer.delete(key);
if (writableEntries.length >= this.flushBatchSize) {
break;
}
}
const rows = writableEntries.map(([, row]) => row);
@@ -180,6 +186,8 @@ export class HeartbeatBuffer {
this.isFlushing = false;
if (this.buffer.size >= this.maxBufferSize) {
this.flush();
} else if (this.buffer.size >= this.flushBatchSize) {
this.flush();
} else if (this.buffer.size > 0 && !this.timer) {
this._scheduleFlush(this.flushInterval);
}

View File

@@ -58,6 +58,7 @@ export const config = {
database: process.env.POSTGRES_DATABASE_G5,
max: parseNumber(process.env.POSTGRES_MAX_CONNECTIONS, 6),
idleTimeoutMillis: parseNumber(process.env.POSTGRES_IDLE_TIMEOUT_MS_G5, 30000),
upsertChunkSize: parseNumber(process.env.DB_UPSERT_CHUNK_SIZE, 5000),
schema: 'room_status',
table: 'room_status_moment_g5'
},
@@ -72,6 +73,7 @@ export const config = {
},
heartbeatBuffer: {
flushInterval: 5000,
maxBufferSize: 10000
maxBufferSize: 10000,
flushBatchSize: parseNumber(process.env.HEARTBEAT_FLUSH_BATCH_SIZE, 5000)
}
};

View File

@@ -5,6 +5,9 @@ const { Pool } = pg;
// Bounds of a PostgreSQL SMALLINT (16-bit signed) — the hotel_id column is cast
// to ::smallint in the upsert statement, so values are checked against these.
const SMALLINT_MIN = -32768;
const SMALLINT_MAX = 32767;
// Bind parameters contributed by each row: hotel_id, room_id, device_id, ts_ms.
const PARAMS_PER_ROW = 4;
// PostgreSQL's extended query protocol encodes the bind-parameter count as a
// 16-bit integer, capping a single statement at 65535 parameters.
const PG_MAX_BIND_PARAMS = 65535;
// Absolute ceiling on rows per INSERT so a chunk can never exceed the protocol cap.
const MAX_ROWS_PER_STATEMENT = Math.floor(PG_MAX_BIND_PARAMS / PARAMS_PER_ROW);
const normalizeHotelId = (hotelId) => {
const parsed = Number(hotelId);
@@ -34,25 +37,35 @@ export class HeartbeatDbManager {
this.schema = dbConfig.schema;
this.table = dbConfig.table;
this.fullTableName = `${this.schema}.${this.table}`;
const configuredChunkSize = Number(dbConfig.upsertChunkSize);
const fallbackChunkSize = 5000;
const normalizedChunkSize = Number.isInteger(configuredChunkSize) && configuredChunkSize > 0
? configuredChunkSize
: fallbackChunkSize;
this.upsertChunkSize = Math.min(normalizedChunkSize, MAX_ROWS_PER_STATEMENT);
}
/**
* Batch upsert heartbeat rows.
* ON CONFLICT (hotel_id, room_id) → upsert latest heartbeat only.
* If the row already exists, only overwrite it when EXCLUDED.ts_ms is not older
* than the current row, preventing out-of-order Kafka messages from rolling data back.
* 为避免 PostgreSQL bind 参数上限,按 upsertChunkSize 分片执行。
* @param {Array<{ts_ms: number, hotel_id: string, room_id: string, device_id: string}>} rows
*/
async upsertBatch(rows) {
if (!rows || rows.length === 0) return;
for (let start = 0; start < rows.length; start += this.upsertChunkSize) {
const chunk = rows.slice(start, start + this.upsertChunkSize);
await this._upsertChunk(chunk);
}
}
async _upsertChunk(rows) {
const values = [];
const placeholders = [];
const colsPerRow = 4;
for (let i = 0; i < rows.length; i++) {
const row = rows[i];
const offset = i * colsPerRow;
const offset = i * PARAMS_PER_ROW;
values.push(normalizeHotelId(row.hotel_id), row.room_id, row.device_id, row.ts_ms);
placeholders.push(
`($${offset + 1}::smallint, $${offset + 2}, $${offset + 3}, $${offset + 4}, 1)`
@@ -77,7 +90,8 @@ export class HeartbeatDbManager {
} catch (error) {
logger.error('Database upsert failed', {
error: error?.message,
rowCount: rows.length
rowCount: rows.length,
bindParamCount: values.length
});
throw error;
}

View File

@@ -42,6 +42,7 @@ const bootstrap = async () => {
const heartbeatBuffer = new HeartbeatBuffer(dbManager, {
flushInterval: config.heartbeatBuffer.flushInterval,
maxBufferSize: config.heartbeatBuffer.maxBufferSize,
flushBatchSize: config.heartbeatBuffer.flushBatchSize,
redisIntegration,
metricCollector
});

View File

@@ -86,4 +86,28 @@ describe('HeartbeatBuffer', () => {
expect(rows[0].ts_ms).toBe(3000);
expect(rows[0].device_id).toBe('dev-c');
});
it('should flush in chunks when buffer size exceeds flushBatchSize', async () => {
  buffer = new HeartbeatBuffer(dbManager, {
    flushInterval: 100000,
    maxBufferSize: 99999,
    flushBatchSize: 2,
    now: () => nowTs
  });
  const heartbeats = [
    { ts_ms: 1000, hotel_id: '1', room_id: '101', device_id: 'dev-a' },
    { ts_ms: 2000, hotel_id: '1', room_id: '102', device_id: 'dev-b' },
    { ts_ms: 3000, hotel_id: '1', room_id: '103', device_id: 'dev-c' }
  ];
  for (const heartbeat of heartbeats) {
    buffer.add(heartbeat);
  }

  // First pass drains at most flushBatchSize (= 2) rows.
  await buffer.flush();
  expect(dbManager.upsertBatch).toHaveBeenCalledTimes(1);
  const [firstRows] = dbManager.upsertBatch.mock.calls[0];
  expect(firstRows).toHaveLength(2);

  // Second pass picks up the single remaining row.
  await buffer.flush();
  expect(dbManager.upsertBatch).toHaveBeenCalledTimes(2);
  const [secondRows] = dbManager.upsertBatch.mock.calls[1];
  expect(secondRows).toHaveLength(1);
  expect(secondRows[0].room_id).toBe('103');
});
});

View File

@@ -70,4 +70,33 @@ describe('HeartbeatDbManager', () => {
const [, values] = queryMock.mock.calls[0];
expect(values).toEqual([0, '101', 'dev-a', 1000]);
});
it('should split large batches into multiple upsert queries', async () => {
  const { HeartbeatDbManager } = await import('../src/db/heartbeatDbManager.js');
  const manager = new HeartbeatDbManager({
    host: '127.0.0.1',
    port: 5432,
    user: 'postgres',
    password: 'secret',
    database: 'demo',
    max: 1,
    idleTimeoutMillis: 1000,
    upsertChunkSize: 2,
    schema: 'room_status',
    table: 'room_status_moment_g5'
  });

  // Five rows with chunk size 2 must produce chunks of 2 + 2 + 1.
  const rows = [
    { hotel_id: '1', room_id: '101', device_id: 'dev-a', ts_ms: 1000 },
    { hotel_id: '1', room_id: '102', device_id: 'dev-b', ts_ms: 1001 },
    { hotel_id: '1', room_id: '103', device_id: 'dev-c', ts_ms: 1002 },
    { hotel_id: '1', room_id: '104', device_id: 'dev-d', ts_ms: 1003 },
    { hotel_id: '1', room_id: '105', device_id: 'dev-e', ts_ms: 1004 }
  ];
  await manager.upsertBatch(rows);

  const expectedParamSets = [
    [1, '101', 'dev-a', 1000, 1, '102', 'dev-b', 1001],
    [1, '103', 'dev-c', 1002, 1, '104', 'dev-d', 1003],
    [1, '105', 'dev-e', 1004]
  ];
  expect(queryMock).toHaveBeenCalledTimes(expectedParamSets.length);
  expectedParamSets.forEach((params, callIndex) => {
    expect(queryMock.mock.calls[callIndex][1]).toEqual(params);
  });
});
});