diff --git a/bls-oldrcu-heartbeat-backend/src/buffer/heartbeatBuffer.js b/bls-oldrcu-heartbeat-backend/src/buffer/heartbeatBuffer.js index 5eeb31b..7010e0f 100644 --- a/bls-oldrcu-heartbeat-backend/src/buffer/heartbeatBuffer.js +++ b/bls-oldrcu-heartbeat-backend/src/buffer/heartbeatBuffer.js @@ -12,6 +12,7 @@ export class HeartbeatBuffer { * @param {Object} options * @param {number} [options.flushInterval=5000] - flush 间隔 (ms) * @param {number} [options.maxBufferSize=10000] - 缓冲区上限触发强制 flush + * @param {number} [options.flushBatchSize=5000] - 单次 flush 最大落库条数 * @param {import('../redis/redisIntegration.js').RedisIntegration} [options.redisIntegration] * @param {import('../utils/metricCollector.js').MetricCollector} [options.metricCollector] * @param {() => number} [options.now] - 测试用时间函数 @@ -20,6 +21,7 @@ export class HeartbeatBuffer { this.dbManager = dbManager; this.flushInterval = options.flushInterval || 5000; this.maxBufferSize = options.maxBufferSize || 10000; + this.flushBatchSize = options.flushBatchSize > 0 ? options.flushBatchSize : 5000; this.redisIntegration = options.redisIntegration || null; this.metricCollector = options.metricCollector || null; this.now = options.now || (() => Date.now()); @@ -127,9 +129,13 @@ export class HeartbeatBuffer { if (this.isFlushing) return; this.isFlushing = true; - const writableEntries = Array.from(this.buffer.entries()); - for (const [key] of writableEntries) { + const writableEntries = []; + for (const [key, row] of this.buffer.entries()) { + writableEntries.push([key, row]); this.buffer.delete(key); + if (writableEntries.length >= this.flushBatchSize) { + break; + } } const rows = writableEntries.map(([, row]) => row); @@ -180,6 +186,8 @@ export class HeartbeatBuffer { this.isFlushing = false; if (this.buffer.size >= this.maxBufferSize) { this.flush(); + } else if (this.buffer.size >= this.flushBatchSize) { + this.flush(); } else if (this.buffer.size > 0 && !this.timer) { this._scheduleFlush(this.flushInterval); } diff --git a/bls-oldrcu-heartbeat-backend/src/config/config.js b/bls-oldrcu-heartbeat-backend/src/config/config.js index 3ae8c2c..03d9131 100644 --- a/bls-oldrcu-heartbeat-backend/src/config/config.js +++ b/bls-oldrcu-heartbeat-backend/src/config/config.js @@ -58,6 +58,7 @@ export const config = { database: process.env.POSTGRES_DATABASE_G5, max: parseNumber(process.env.POSTGRES_MAX_CONNECTIONS, 6), idleTimeoutMillis: parseNumber(process.env.POSTGRES_IDLE_TIMEOUT_MS_G5, 30000), + upsertChunkSize: parseNumber(process.env.DB_UPSERT_CHUNK_SIZE, 5000), schema: 'room_status', table: 'room_status_moment_g5' }, @@ -72,6 +73,7 @@ export const config = { }, heartbeatBuffer: { flushInterval: 5000, - maxBufferSize: 10000 + maxBufferSize: 10000, + flushBatchSize: parseNumber(process.env.HEARTBEAT_FLUSH_BATCH_SIZE, 5000) } }; diff --git a/bls-oldrcu-heartbeat-backend/src/db/heartbeatDbManager.js b/bls-oldrcu-heartbeat-backend/src/db/heartbeatDbManager.js index d6a490c..ebc56a2 100644 --- a/bls-oldrcu-heartbeat-backend/src/db/heartbeatDbManager.js +++ b/bls-oldrcu-heartbeat-backend/src/db/heartbeatDbManager.js @@ -5,6 +5,9 @@ const { Pool } = pg; const SMALLINT_MIN = -32768; const SMALLINT_MAX = 32767; +const PARAMS_PER_ROW = 4; +const PG_MAX_BIND_PARAMS = 65535; +const MAX_ROWS_PER_STATEMENT = Math.floor(PG_MAX_BIND_PARAMS / PARAMS_PER_ROW); const normalizeHotelId = (hotelId) => { const parsed = Number(hotelId); @@ -34,25 +37,35 @@ export class HeartbeatDbManager { this.schema = dbConfig.schema; this.table = dbConfig.table; this.fullTableName = `${this.schema}.${this.table}`; + const configuredChunkSize = Number(dbConfig.upsertChunkSize); + const fallbackChunkSize = 5000; + const normalizedChunkSize = Number.isInteger(configuredChunkSize) && configuredChunkSize > 0 + ? configuredChunkSize + : fallbackChunkSize; + this.upsertChunkSize = Math.min(normalizedChunkSize, MAX_ROWS_PER_STATEMENT); } /** * Batch upsert heartbeat rows. - * ON CONFLICT (hotel_id, room_id) → upsert latest heartbeat only. - * If the row already exists, only overwrite it when EXCLUDED.ts_ms is not older - * than the current row, preventing out-of-order Kafka messages from rolling data back. + * 为避免 PostgreSQL bind 参数上限,按 upsertChunkSize 分片执行。 * @param {Array<{ts_ms: number, hotel_id: string, room_id: string, device_id: string}>} rows */ async upsertBatch(rows) { if (!rows || rows.length === 0) return; + for (let start = 0; start < rows.length; start += this.upsertChunkSize) { + const chunk = rows.slice(start, start + this.upsertChunkSize); + await this._upsertChunk(chunk); + } + } + + async _upsertChunk(rows) { const values = []; const placeholders = []; - const colsPerRow = 4; for (let i = 0; i < rows.length; i++) { const row = rows[i]; - const offset = i * colsPerRow; + const offset = i * PARAMS_PER_ROW; values.push(normalizeHotelId(row.hotel_id), row.room_id, row.device_id, row.ts_ms); placeholders.push( `($${offset + 1}::smallint, $${offset + 2}, $${offset + 3}, $${offset + 4}, 1)` @@ -77,7 +90,8 @@ export class HeartbeatDbManager { } catch (error) { logger.error('Database upsert failed', { error: error?.message, - rowCount: rows.length + rowCount: rows.length, + bindParamCount: values.length }); throw error; } diff --git a/bls-oldrcu-heartbeat-backend/src/index.js b/bls-oldrcu-heartbeat-backend/src/index.js index 3877d12..ccbc8d3 100644 --- a/bls-oldrcu-heartbeat-backend/src/index.js +++ b/bls-oldrcu-heartbeat-backend/src/index.js @@ -42,6 +42,7 @@ const bootstrap = async () => { const heartbeatBuffer = new HeartbeatBuffer(dbManager, { flushInterval: config.heartbeatBuffer.flushInterval, maxBufferSize: config.heartbeatBuffer.maxBufferSize, + flushBatchSize: config.heartbeatBuffer.flushBatchSize, redisIntegration, metricCollector }); diff --git a/bls-oldrcu-heartbeat-backend/tests/heartbeat_buffer.test.js b/bls-oldrcu-heartbeat-backend/tests/heartbeat_buffer.test.js index cf55800..cc8fdd1 100644 --- a/bls-oldrcu-heartbeat-backend/tests/heartbeat_buffer.test.js +++ b/bls-oldrcu-heartbeat-backend/tests/heartbeat_buffer.test.js @@ -86,4 +86,28 @@ describe('HeartbeatBuffer', () => { expect(rows[0].ts_ms).toBe(3000); expect(rows[0].device_id).toBe('dev-c'); }); + + it('should flush in chunks when buffer size exceeds flushBatchSize', async () => { + buffer = new HeartbeatBuffer(dbManager, { + flushInterval: 100000, + maxBufferSize: 99999, + flushBatchSize: 2, + now: () => nowTs + }); + + buffer.add({ ts_ms: 1000, hotel_id: '1', room_id: '101', device_id: 'dev-a' }); + buffer.add({ ts_ms: 2000, hotel_id: '1', room_id: '102', device_id: 'dev-b' }); + buffer.add({ ts_ms: 3000, hotel_id: '1', room_id: '103', device_id: 'dev-c' }); + + await buffer.flush(); + + expect(dbManager.upsertBatch).toHaveBeenCalledTimes(1); + expect(dbManager.upsertBatch.mock.calls[0][0]).toHaveLength(2); + + await buffer.flush(); + + expect(dbManager.upsertBatch).toHaveBeenCalledTimes(2); + expect(dbManager.upsertBatch.mock.calls[1][0]).toHaveLength(1); + expect(dbManager.upsertBatch.mock.calls[1][0][0].room_id).toBe('103'); + }); }); diff --git a/bls-oldrcu-heartbeat-backend/tests/heartbeat_db_manager.test.js b/bls-oldrcu-heartbeat-backend/tests/heartbeat_db_manager.test.js index 191dcbf..d464e7b 100644 --- a/bls-oldrcu-heartbeat-backend/tests/heartbeat_db_manager.test.js +++ b/bls-oldrcu-heartbeat-backend/tests/heartbeat_db_manager.test.js @@ -70,4 +70,33 @@ describe('HeartbeatDbManager', () => { const [, values] = queryMock.mock.calls[0]; expect(values).toEqual([0, '101', 'dev-a', 1000]); }); + + it('should split large batches into multiple upsert queries', async () => { + const { HeartbeatDbManager } = await import('../src/db/heartbeatDbManager.js'); + const manager = new HeartbeatDbManager({ + host: '127.0.0.1', + port: 5432, + user: 'postgres', + password: 'secret', + database: 'demo', + max: 1, + idleTimeoutMillis: 1000, + upsertChunkSize: 2, + schema: 'room_status', + table: 'room_status_moment_g5' + }); + + await manager.upsertBatch([ + { hotel_id: '1', room_id: '101', device_id: 'dev-a', ts_ms: 1000 }, + { hotel_id: '1', room_id: '102', device_id: 'dev-b', ts_ms: 1001 }, + { hotel_id: '1', room_id: '103', device_id: 'dev-c', ts_ms: 1002 }, + { hotel_id: '1', room_id: '104', device_id: 'dev-d', ts_ms: 1003 }, + { hotel_id: '1', room_id: '105', device_id: 'dev-e', ts_ms: 1004 } + ]); + + expect(queryMock).toHaveBeenCalledTimes(3); + expect(queryMock.mock.calls[0][1]).toEqual([1, '101', 'dev-a', 1000, 1, '102', 'dev-b', 1001]); + expect(queryMock.mock.calls[1][1]).toEqual([1, '103', 'dev-c', 1002, 1, '104', 'dev-d', 1003]); + expect(queryMock.mock.calls[2][1]).toEqual([1, '105', 'dev-e', 1004]); + }); }); \ No newline at end of file