import assert from 'node:assert/strict';
import { HeartbeatProcessor } from '../src/processor/heartbeatProcessor.js';
import { DatabaseManager } from '../src/db/databaseManager.js';

// NOTE(review): `describe` / `it` are not imported in this file, so they are
// presumably provided as globals by the test runner — confirm against the
// project's test command.

// ---- Helpers ----

/** How long tests wait for writes that the processor does not await. */
const ASYNC_WRITE_SETTLE_MS = 50;

/** Minimal connection settings shared by every DatabaseManager under test. */
const BASE_DB_CONFIG = Object.freeze({
  host: 'x',
  port: 5432,
  user: 'x',
  password: 'x',
  database: 'x',
});

/**
 * Builds a fresh, minimal heartbeat payload.
 * @returns {object} a new payload object on every call (safe to mutate/spread).
 */
const buildBasePayload = () => ({
  ts_ms: 1700000000123,
  hotel_id: 1,
  room_id: '101',
  device_id: 'dev1',
  ip: '10.0.0.1',
  power_state: 1,
  guest_type: 0,
  cardless_state: 0,
  service_mask: 7,
  pms_state: 1,
  carbon_state: 0,
  device_count: 1,
  comm_seq: 1,
});

/**
 * Wraps a payload into the message shape passed to `processMessage` in these
 * tests: an object whose `value` is a UTF-8 JSON buffer.
 * @param {object} [payload] payload to serialize; defaults to the base payload.
 * @returns {{ value: Buffer }}
 */
const buildMessage = (payload = buildBasePayload()) => ({
  value: Buffer.from(JSON.stringify(payload), 'utf8'),
});

/**
 * Gives fire-and-forget writes (room_status, error table) time to run before
 * the test asserts on them.
 * @returns {Promise<void>}
 */
const waitForAsyncWrites = () =>
  new Promise((resolve) => setTimeout(resolve, ASYNC_WRITE_SETTLE_MS));

/**
 * Builds the result object returned by a stubbed `insertHeartbeatEventsDual`.
 *
 * Each target (`legacy`, `g4Hot`, `g5`) reports `insertedCount` only when it
 * succeeded, and `failedRecords` / `error` / `batchError` only when it failed.
 * The same `failedRecords` array is shared by every failing target.
 *
 * @param {object} [options]
 * @returns {{ legacy: object, g4Hot: object, g5: object }}
 */
function makeDualResult({
  legacyEnabled = true,
  legacySuccess = true,
  legacyConn = false,
  g4HotEnabled = false,
  g4HotSuccess = true,
  g4HotConn = false,
  g5Enabled = false,
  g5Success = true,
  g5Conn = false,
  insertedCount = 1,
  failedRecords = [],
} = {}) {
  const buildTarget = (enabled, success, isConnectionError, failMessage) => ({
    enabled,
    success,
    insertedCount: success ? insertedCount : 0,
    failedRecords: success ? [] : failedRecords,
    error: success ? null : new Error(failMessage),
    isConnectionError,
    batchError: success ? null : new Error(failMessage),
  });

  return {
    legacy: buildTarget(legacyEnabled, legacySuccess, legacyConn, 'legacy fail'),
    g4Hot: buildTarget(g4HotEnabled, g4HotSuccess, g4HotConn, 'g4hot fail'),
    g5: buildTarget(g5Enabled, g5Success, g5Conn, 'g5 fail'),
  };
}

/**
 * Builds a stub of the main database manager.
 * @param {object} [overrides] `config` entries are merged over the defaults;
 *   any method listed here replaces the default always-succeeding stub.
 * @returns {object}
 */
function buildMockDb(overrides = {}) {
  return {
    config: {
      legacyHeartbeatEnabled: true,
      g4HotHeartbeatEnabled: false,
      roomStatusEnabled: true,
      legacyTable: 'heartbeat.heartbeat_events',
      g4HotTable: 'heartbeat.heartbeat_events_g4_hot',
      roomStatusTable: 'room_status.room_status_moment',
      ...overrides.config,
    },
    insertHeartbeatEventsDual:
      overrides.insertHeartbeatEventsDual ?? (async () => makeDualResult()),
    insertHeartbeatEventsErrors:
      overrides.insertHeartbeatEventsErrors ?? (async () => {}),
    upsertRoomStatus:
      overrides.upsertRoomStatus ?? (async () => ({ rowCount: 1 })),
    insertHeartbeatEvents:
      overrides.insertHeartbeatEvents ?? (async () => ({ insertedCount: 1 })),
    checkConnection: overrides.checkConnection ?? (async () => true),
  };
}

/**
 * Builds a stub of the G5 database manager.
 * @param {object} [overrides] same merge rules as {@link buildMockDb}.
 * @returns {object}
 */
function buildMockG5Db(overrides = {}) {
  return {
    config: {
      enabled: true,
      g5HeartbeatEnabled: true,
      g5Table: 'heartbeat.heartbeat_events_g5',
      roomStatusEnabled: true,
      roomStatusTable: 'room_status.room_status_moment_g5',
      ...overrides.config,
    },
    insertHeartbeatEventsG5:
      overrides.insertHeartbeatEventsG5 ??
      (async () => ({
        enabled: true,
        success: true,
        insertedCount: 1,
        failedRecords: [],
        error: null,
        isConnectionError: false,
        batchError: null,
      })),
    upsertRoomStatusG5:
      overrides.upsertRoomStatusG5 ?? (async () => ({ rowCount: 1 })),
    _isDbConnectionError: overrides._isDbConnectionError ?? (() => false),
  };
}

/**
 * Builds a HeartbeatProcessor wired to stub databases.
 * @param {object} [dbOverrides] overrides for the main database stub.
 * @param {object} [processorConfig] merged over `{ batchSize: 1, batchTimeout: 1000 }`.
 * @param {object|null} [g5Overrides] when truthy, a G5 database stub is injected
 *   as `g5DatabaseManager`; when null no G5 dependency is passed.
 * @returns {HeartbeatProcessor}
 */
function buildProcessor(dbOverrides = {}, processorConfig = {}, g5Overrides = null) {
  const db = buildMockDb(dbOverrides);
  const deps = {};
  if (g5Overrides) {
    deps.g5DatabaseManager = buildMockG5Db(g5Overrides);
  }
  return new HeartbeatProcessor(
    { batchSize: 1, batchTimeout: 1000, ...processorConfig },
    db,
    deps
  );
}

/**
 * Creates a DatabaseManager with placeholder connection settings.
 * @param {object} [extraConfig] extra config keys merged over the base settings.
 * @returns {DatabaseManager}
 */
const buildDatabaseManager = (extraConfig = {}) =>
  new DatabaseManager({ ...BASE_DB_CONFIG, ...extraConfig });

/**
 * Target options for the old room_status table.
 * Returns a fresh object (and fresh `conflictColumns` array) on every call.
 * @param {object} [extra] additional options merged over the base target.
 * @returns {object}
 */
const legacyRoomStatusTarget = (extra = {}) => ({
  tableName: 'room_status.room_status_moment',
  conflictColumns: ['hotel_id', 'room_id', 'device_id'],
  includeGuid: true,
  autoCreatePartitions: true,
  tableRef: 'room_status.room_status_moment',
  logPrefix: 'upsertRoomStatus',
  ...extra,
});

/**
 * Target options for the G5 room_status table.
 * Returns a fresh object (and fresh `conflictColumns` array) on every call.
 * @param {object} [extra] additional options merged over the base target.
 * @returns {object}
 */
const g5RoomStatusTarget = (extra = {}) => ({
  tableName: 'room_status.room_status_moment_g5',
  conflictColumns: ['hotel_id', 'room_id'],
  includeGuid: false,
  autoCreatePartitions: false,
  tableRef: 'room_status.room_status_moment_g5',
  forceOnlineStatusOnWrite: true,
  forceUpdateOnConflict: true,
  logPrefix: 'upsertRoomStatusG5',
  ...extra,
});

/**
 * Replaces `dm.pool` with a stub whose `query` throws a deadlock error
 * (code `40P01`) until call number `succeedOnCall`, then resolves with
 * `{ rowCount: 1 }`. Also replaces `dm._sleep` with a no-op so retries are instant.
 * @param {DatabaseManager} dm manager to patch in place.
 * @param {number} succeedOnCall 1-based call number of the first success.
 * @returns {{ calls: number }} live counter of `query` invocations.
 */
function installDeadlockingPool(dm, succeedOnCall) {
  const stats = { calls: 0 };
  dm.pool = {
    query: async () => {
      stats.calls += 1;
      if (stats.calls < succeedOnCall) {
        const err = new Error('deadlock detected');
        err.code = '40P01';
        throw err;
      }
      return { rowCount: 1 };
    },
  };
  dm._sleep = async () => {};
  return stats;
}

// ---- Tests ----

describe('Dual-write: 仅旧表开启', () => {
  it('should call insertHeartbeatEventsDual and succeed', async () => {
    let captured = null;
    const processor = buildProcessor({
      config: { legacyHeartbeatEnabled: true, g4HotHeartbeatEnabled: false },
      insertHeartbeatEventsDual: async (events) => {
        captured = events;
        return makeDualResult({ legacyEnabled: true, g4HotEnabled: false });
      },
    });

    const res = await processor.processMessage(buildMessage());

    assert.deepEqual(res, { insertedCount: 1 });
    assert.ok(Array.isArray(captured));
    assert.equal(captured.length, 1);
  });
});

describe('Dual-write: 仅新表开启', () => {
  it('should call insertHeartbeatEventsDual with g4hot enabled', async () => {
    let dualCalled = false;
    const processor = buildProcessor({
      config: { legacyHeartbeatEnabled: false, g4HotHeartbeatEnabled: true },
      insertHeartbeatEventsDual: async () => {
        dualCalled = true;
        return makeDualResult({ legacyEnabled: false, g4HotEnabled: true });
      },
    });

    await processor.processMessage(buildMessage());

    assert.ok(dualCalled);
  });
});

describe('Dual-write: 旧新双开', () => {
  it('should call insertHeartbeatEventsDual with both enabled', async () => {
    let dualCalled = false;
    const processor = buildProcessor({
      config: { legacyHeartbeatEnabled: true, g4HotHeartbeatEnabled: true },
      insertHeartbeatEventsDual: async () => {
        dualCalled = true;
        return makeDualResult({ legacyEnabled: true, g4HotEnabled: true });
      },
    });

    await processor.processMessage(buildMessage());

    assert.ok(dualCalled);
  });
});

describe('Dual-write: 旧新双关,room_status 仍执行', () => {
  it('should still call upsertRoomStatus when both detail writes are off', async () => {
    let roomStatusCalled = false;
    const processor = buildProcessor({
      config: {
        legacyHeartbeatEnabled: false,
        g4HotHeartbeatEnabled: false,
        roomStatusEnabled: true,
      },
      insertHeartbeatEventsDual: async () =>
        makeDualResult({ legacyEnabled: false, g4HotEnabled: false }),
      upsertRoomStatus: async () => {
        roomStatusCalled = true;
        return { rowCount: 1 };
      },
    });

    await processor.processMessage(buildMessage());
    // Wait for the asynchronous room_status call to complete.
    await waitForAsyncWrites();

    assert.ok(roomStatusCalled);
  });
});

describe('Dual-write: 旧成功新失败', () => {
  it('should continue and only write g4hot failures to error table', async () => {
    let errorTablePayload = null;
    const failedRecord = {
      error: new Error('g4hot column mismatch'),
      record: buildBasePayload(),
    };
    const processor = buildProcessor({
      config: { legacyHeartbeatEnabled: true, g4HotHeartbeatEnabled: true },
      insertHeartbeatEventsDual: async () =>
        makeDualResult({
          legacyEnabled: true,
          legacySuccess: true,
          g4HotEnabled: true,
          g4HotSuccess: false,
          failedRecords: [failedRecord],
        }),
      insertHeartbeatEventsErrors: async (payload) => {
        errorTablePayload = payload;
      },
    });

    await processor.processMessage(buildMessage());
    await waitForAsyncWrites();

    assert.ok(errorTablePayload);
    assert.equal(errorTablePayload.length, 1);
    assert.equal(errorTablePayload[0].error_code, 'g4hot_write_failed');
  });
});

describe('Dual-write: 旧失败新成功', () => {
  it('should NOT write legacy failures to error table', async () => {
    let errorTableCalled = false;
    const failedRecord = {
      error: new Error('legacy column mismatch'),
      record: buildBasePayload(),
    };
    const processor = buildProcessor({
      config: { legacyHeartbeatEnabled: true, g4HotHeartbeatEnabled: true },
      insertHeartbeatEventsDual: async () =>
        makeDualResult({
          legacyEnabled: true,
          legacySuccess: false,
          failedRecords: [failedRecord],
          g4HotEnabled: true,
          g4HotSuccess: true,
        }),
      insertHeartbeatEventsErrors: async () => {
        errorTableCalled = true;
      },
    });

    await processor.processMessage(buildMessage());
    await waitForAsyncWrites();

    assert.equal(errorTableCalled, false);
  });
});

describe('Dual-write: 双失败', () => {
  it('should only write g4hot failures to error table, not legacy failures', async () => {
    let errorTablePayload = null;
    const failedRecord = { error: new Error('fail'), record: buildBasePayload() };
    const processor = buildProcessor({
      config: { legacyHeartbeatEnabled: true, g4HotHeartbeatEnabled: true },
      insertHeartbeatEventsDual: async () =>
        // `failedRecords` is shared by both failing targets, so it is given once.
        makeDualResult({
          legacyEnabled: true,
          legacySuccess: false,
          g4HotEnabled: true,
          g4HotSuccess: false,
          failedRecords: [failedRecord],
        }),
      insertHeartbeatEventsErrors: async (payload) => {
        errorTablePayload = payload;
      },
    });

    await processor.processMessage(buildMessage());
    await waitForAsyncWrites();

    // The error table must contain g4hot failures only.
    assert.ok(errorTablePayload);
    assert.equal(errorTablePayload[0].error_code, 'g4hot_write_failed');
    assert.ok(
      errorTablePayload.every((row) => row.error_code === 'g4hot_write_failed')
    );
  });
});

describe('Dual-write: room_status 始终执行', () => {
  it('should call upsertRoomStatus regardless of detail write success/failure', async () => {
    let roomStatusCalled = false;
    const processor = buildProcessor({
      config: {
        legacyHeartbeatEnabled: true,
        g4HotHeartbeatEnabled: false,
        roomStatusEnabled: true,
      },
      insertHeartbeatEventsDual: async () =>
        makeDualResult({
          legacyEnabled: true,
          legacySuccess: false,
          failedRecords: [{ error: new Error('fail'), record: buildBasePayload() }],
        }),
      upsertRoomStatus: async () => {
        roomStatusCalled = true;
        return { rowCount: 1 };
      },
    });

    await processor.processMessage(buildMessage());
    await waitForAsyncWrites();

    assert.ok(roomStatusCalled);
  });
});

describe('RoomStatus dual-write', () => {
  it('should write old and g5 room_status independently', async () => {
    let oldCalled = false;
    let g5Called = false;
    const processor = buildProcessor(
      {
        config: {
          legacyHeartbeatEnabled: true,
          g4HotHeartbeatEnabled: false,
          roomStatusEnabled: true,
        },
        insertHeartbeatEventsDual: async () =>
          makeDualResult({ legacyEnabled: true, g4HotEnabled: false }),
        upsertRoomStatus: async () => {
          oldCalled = true;
          return { rowCount: 1 };
        },
      },
      {},
      {
        config: { roomStatusEnabled: true },
        upsertRoomStatusG5: async () => {
          g5Called = true;
          return { rowCount: 1 };
        },
      }
    );

    await processor.processMessage(buildMessage());
    await waitForAsyncWrites();

    assert.equal(oldCalled, true);
    assert.equal(g5Called, true);
  });

  it('should allow old room_status off while g5 room_status stays on', async () => {
    let oldCalled = false;
    let g5Called = false;
    const processor = buildProcessor(
      {
        config: {
          legacyHeartbeatEnabled: true,
          g4HotHeartbeatEnabled: false,
          roomStatusEnabled: false,
        },
        insertHeartbeatEventsDual: async () =>
          makeDualResult({ legacyEnabled: true, g4HotEnabled: false }),
        upsertRoomStatus: async () => {
          oldCalled = true;
          return { rowCount: 1 };
        },
      },
      {},
      {
        config: { roomStatusEnabled: true },
        upsertRoomStatusG5: async () => {
          g5Called = true;
          return { rowCount: 1 };
        },
      }
    );

    await processor.processMessage(buildMessage());
    await waitForAsyncWrites();

    assert.equal(oldCalled, false);
    assert.equal(g5Called, true);
  });
});

describe('G5-write: 独立写库', () => {
  it('should write to g5 independently after legacy/g4 succeed', async () => {
    let g5Captured = null;
    const processor = buildProcessor(
      {
        config: { legacyHeartbeatEnabled: true, g4HotHeartbeatEnabled: true },
        insertHeartbeatEventsDual: async () =>
          makeDualResult({ legacyEnabled: true, g4HotEnabled: true }),
      },
      {},
      {
        insertHeartbeatEventsG5: async (events) => {
          g5Captured = events;
          return {
            enabled: true,
            success: true,
            insertedCount: events.length,
            failedRecords: [],
            error: null,
            isConnectionError: false,
            batchError: null,
          };
        },
      }
    );

    const res = await processor.processMessage(buildMessage());

    assert.deepEqual(res, { insertedCount: 1 });
    assert.ok(Array.isArray(g5Captured));
    assert.equal(g5Captured.length, 1);
  });

  it('should not block main flow when g5 write fails', async () => {
    const processor = buildProcessor(
      {
        config: { legacyHeartbeatEnabled: true, g4HotHeartbeatEnabled: false },
        insertHeartbeatEventsDual: async () =>
          makeDualResult({ legacyEnabled: true, g4HotEnabled: false }),
      },
      {},
      {
        insertHeartbeatEventsG5: async () => ({
          enabled: true,
          success: false,
          insertedCount: 0,
          failedRecords: [{ error: new Error('g5 fail'), record: buildBasePayload() }],
          error: new Error('g5 fail'),
          isConnectionError: false,
          batchError: new Error('g5 fail'),
        }),
      }
    );

    const res = await processor.processMessage(buildMessage());

    assert.deepEqual(res, { insertedCount: 1 });
  });
});

describe('Dual-write: 暂停消费策略', () => {
  it('should NOT pause when both detail writes disabled but room_status enabled', async () => {
    let paused = false;
    const processor = buildProcessor({
      config: {
        legacyHeartbeatEnabled: false,
        g4HotHeartbeatEnabled: false,
        roomStatusEnabled: true,
      },
      insertHeartbeatEventsDual: async () =>
        makeDualResult({ legacyEnabled: false, g4HotEnabled: false }),
    });
    processor.onDbOffline = () => {
      paused = true;
    };

    await processor.processMessage(buildMessage());

    assert.equal(paused, false);
  });

  it('should pause when only legacy enabled and legacy has connection error', () => {
    const processor = buildProcessor({});
    const result = processor._shouldPauseConsumption(
      { enabled: true, isConnectionError: true },
      { enabled: false, isConnectionError: false }
    );
    assert.ok(result);
  });

  it('should NOT pause in dual-write when only g4hot has connection error', async () => {
    let paused = false;
    const processor = buildProcessor({
      config: { legacyHeartbeatEnabled: true, g4HotHeartbeatEnabled: true },
      insertHeartbeatEventsDual: async () =>
        makeDualResult({
          legacyEnabled: true,
          legacySuccess: true,
          g4HotEnabled: true,
          g4HotSuccess: false,
          g4HotConn: true,
        }),
    });
    processor.onDbOffline = () => {
      paused = true;
    };

    await processor.processMessage(buildMessage());

    assert.equal(paused, false);
  });

  it('should pause in dual-write when both have connection errors', () => {
    const processor = buildProcessor({});
    const result = processor._shouldPauseConsumption(
      { enabled: true, isConnectionError: true },
      { enabled: true, isConnectionError: true }
    );
    assert.ok(result);
  });

  it('should NOT pause when only g4hot has connection error in dual mode', () => {
    const processor = buildProcessor({});
    const result = processor._shouldPauseConsumption(
      { enabled: true, isConnectionError: false },
      { enabled: true, isConnectionError: true }
    );
    assert.equal(result, false);
  });

  it('should pause when only g4hot enabled and g4hot has connection error', () => {
    const processor = buildProcessor({});
    const result = processor._shouldPauseConsumption(
      { enabled: false, isConnectionError: false },
      { enabled: true, isConnectionError: true }
    );
    assert.ok(result);
  });
});

describe('DatabaseManager: _g4HotToRowValues', () => {
  it('unpacks svc booleans from service_mask', () => {
    const dm = buildDatabaseManager();
    const e = { ...buildBasePayload(), service_mask: 7 }; // bits 0,1,2 set
    const values = dm._g4HotToRowValues(e);
    const cols = dm._getG4HotColumns();
    const svc01Idx = cols.indexOf('svc_01');

    assert.equal(values[svc01Idx], true); // bit 0
    assert.equal(values[svc01Idx + 1], true); // bit 1 (svc_02)
    assert.equal(values[svc01Idx + 2], true); // bit 2 (svc_03)
    assert.equal(values[svc01Idx + 3], false); // bit 3 (svc_04)
    assert.equal(values[svc01Idx + 63], false); // svc_64
  });

  it('unpacks array fields into _1, _2, _residual', () => {
    const dm = buildDatabaseManager();
    const e = {
      ...buildBasePayload(),
      air_address: ['ac1', 'ac2', 'ac3'],
      state: [1, 2, 3],
      elec_address: ['e1'],
      voltage: [220.5],
    };
    const values = dm._g4HotToRowValues(e);
    const cols = dm._getG4HotColumns();

    const airAddr1Idx = cols.indexOf('air_address_1');
    assert.equal(values[airAddr1Idx], 'ac1');
    assert.equal(values[airAddr1Idx + 1], 'ac2'); // air_address_2
    assert.deepEqual(values[airAddr1Idx + 2], ['ac3']); // air_address_residual

    const state1Idx = cols.indexOf('state_1');
    assert.equal(values[state1Idx], 1);
    assert.equal(values[state1Idx + 1], 2);
    assert.deepEqual(values[state1Idx + 2], [3]);

    const elecAddr1Idx = cols.indexOf('elec_address_1');
    assert.equal(values[elecAddr1Idx], 'e1');
    assert.equal(values[elecAddr1Idx + 1], null); // only 1 element
    assert.equal(values[elecAddr1Idx + 2], null); // no residual
  });

  it('produces correct column count', () => {
    const dm = buildDatabaseManager();
    const e = buildBasePayload();
    const cols = dm._getG4HotColumns();
    const values = dm._g4HotToRowValues(e);

    assert.equal(values.length, cols.length);
  });

  it('writes power helper fields as null temporarily', () => {
    const dm = buildDatabaseManager();
    const e = {
      ...buildBasePayload(),
      extra: {
        power_carbon_on: 1.5,
        power_carbon_off: 2.5,
        power_person_exist: 3.5,
        power_person_left: 4.5,
      },
    };
    const cols = dm._getG4HotColumns();
    const values = dm._g4HotToRowValues(e);

    assert.equal(values[cols.indexOf('power_carbon_on')], null);
    assert.equal(values[cols.indexOf('power_carbon_off')], null);
    assert.equal(values[cols.indexOf('power_person_exist')], null);
    assert.equal(values[cols.indexOf('power_person_left')], null);
  });

  it('generates lowercase guid without hyphens when missing', () => {
    const dm = buildDatabaseManager();
    const e = buildBasePayload();
    const cols = dm._getG4HotColumns();
    const values = dm._g4HotToRowValues(e);
    const guid = values[cols.indexOf('guid')];

    assert.match(guid, /^[0-9a-f]{32}$/);
  });

  it('normalizes provided guid to lowercase without hyphens', () => {
    const dm = buildDatabaseManager();
    const e = { ...buildBasePayload(), guid: 'A0B1C2D3-E4F5-6789-ABCD-EF0123456789' };
    const cols = dm._getG4HotColumns();
    const values = dm._g4HotToRowValues(e);
    const guid = values[cols.indexOf('guid')];

    assert.equal(guid, 'a0b1c2d3e4f56789abcdef0123456789');
  });

  it('omits guid column and value for g5 rows', () => {
    const dm = buildDatabaseManager();
    const cols = dm._getG5Columns();
    const values = dm._g5ToRowValues(buildBasePayload());

    assert.equal(cols.includes('guid'), false);
    assert.equal(values.length, cols.length);
  });

  it('writes g5 base array columns as null', () => {
    const dm = buildDatabaseManager();
    const cols = dm._getG5Columns();
    const values = dm._g5ToRowValues({
      ...buildBasePayload(),
      service_mask: 7,
      elec_address: ['e1', 'e2'],
      air_address: ['ac1', 'ac2'],
      voltage: [220.5, 221.5],
      ampere: [1.1, 1.2],
      power: [100, 200],
      phase: ['A', 'B'],
      energy: [10, 20],
      sum_energy: [100, 200],
      state: [1, 0],
      model: [2, 3],
      speed: [1, 2],
      set_temp: [24, 25],
      now_temp: [26, 27],
      solenoid_valve: [1, 0],
      extra: { source: 'test' },
    });

    const nulledColumns = [
      'service_mask',
      'elec_address',
      'air_address',
      'voltage',
      'ampere',
      'power',
      'phase',
      'energy',
      'sum_energy',
      'state',
      'model',
      'speed',
      'set_temp',
      'now_temp',
      'solenoid_valve',
      'extra',
    ];
    for (const column of nulledColumns) {
      assert.equal(values[cols.indexOf(column)], null);
    }

    assert.equal(values[cols.indexOf('svc_01')], true);
    assert.equal(values[cols.indexOf('svc_02')], true);
    assert.equal(values[cols.indexOf('svc_03')], true);
    assert.equal(values[cols.indexOf('air_address_1')], 'ac1');
    assert.equal(values[cols.indexOf('elec_address_1')], 'e1');
  });
});

describe('DatabaseManager: _formatPgCol', () => {
  it('formats boolean values correctly', () => {
    const dm = buildDatabaseManager();

    assert.equal(dm._formatPgCol(true), 't');
    assert.equal(dm._formatPgCol(false), 'f');
    assert.equal(dm._formatPgCol(null), '\\N');
  });
});

describe('DatabaseManager: room_status upsert SQL', () => {
  it('does not update ts_ms in old room_status DO UPDATE SET', () => {
    const dm = buildDatabaseManager({ roomStatusTable: 'room_status.room_status_moment' });
    const built = dm._buildRoomStatusUpsertQuery(
      [buildBasePayload()],
      legacyRoomStatusTarget()
    );

    assert.match(built.sql, /ON CONFLICT \(hotel_id, room_id, device_id\)/);
    assert.doesNotMatch(built.sql, /ts_ms = EXCLUDED\.ts_ms/);
  });

  it('uses hotel_id and room_id as conflict key for g5 room_status', () => {
    const dm = buildDatabaseManager({ roomStatusTable: 'room_status.room_status_moment_g5' });
    const built = dm._buildRoomStatusUpsertQuery(
      [buildBasePayload()],
      g5RoomStatusTarget()
    );

    assert.match(built.sql, /ON CONFLICT \(hotel_id, room_id\)/);
    assert.doesNotMatch(built.sql, /ts_ms = EXCLUDED\.ts_ms/);
    assert.doesNotMatch(built.sql, /guid/);
  });

  it('forces online_status to 1 for g5 room_status insert and update', () => {
    const dm = buildDatabaseManager({ roomStatusTable: 'room_status.room_status_moment_g5' });
    const built = dm._buildRoomStatusUpsertQuery(
      [buildBasePayload()],
      g5RoomStatusTarget()
    );

    assert.match(
      built.sql,
      /INSERT INTO room_status\.room_status_moment_g5 \(.*online_status\)/s
    );
    assert.match(built.sql, /online_status = 1/);
    assert.equal(built.values[built.values.length - 1], 1);
  });

  it('always updates g5 room_status on conflict even when business fields are unchanged', () => {
    const dm = buildDatabaseManager({ roomStatusTable: 'room_status.room_status_moment_g5' });
    const built = dm._buildRoomStatusUpsertQuery(
      [buildBasePayload()],
      g5RoomStatusTarget({ sortAndDedupByConflictKey: true })
    );

    assert.doesNotMatch(
      built.sql,
      /room_status\.room_status_moment_g5\.ts_ms <= EXCLUDED\.ts_ms/
    );
    assert.doesNotMatch(built.sql, /IS DISTINCT FROM EXCLUDED/);
  });

  it('sorts and dedups g5 room_status by conflict key, keeping max ts_ms', () => {
    const dm = buildDatabaseManager({ roomStatusTable: 'room_status.room_status_moment_g5' });
    const built = dm._buildRoomStatusUpsertQuery(
      [
        { ...buildBasePayload(), hotel_id: 2, room_id: '102', ts_ms: 1000, device_id: 'dev-old-102' },
        { ...buildBasePayload(), hotel_id: 1, room_id: '101', ts_ms: 3000, device_id: 'dev-101' },
        { ...buildBasePayload(), hotel_id: 2, room_id: '102', ts_ms: 5000, device_id: 'dev-new-102' },
      ],
      g5RoomStatusTarget({ sortAndDedupByConflictKey: true })
    );

    assert.equal(built.uniqueEvents.length, 2);
    assert.equal(built.uniqueEvents[0].hotel_id, 1);
    assert.equal(built.uniqueEvents[0].room_id, '101');
    assert.equal(built.uniqueEvents[1].hotel_id, 2);
    assert.equal(built.uniqueEvents[1].room_id, '102');
    assert.equal(built.uniqueEvents[1].ts_ms, 5000);
    assert.equal(built.uniqueEvents[1].device_id, 'dev-new-102');
  });

  it('retries g5 room_status on deadlock', async () => {
    const dm = buildDatabaseManager({ roomStatusTable: 'room_status.room_status_moment_g5' });
    const stats = installDeadlockingPool(dm, 3);

    const result = await dm._upsertRoomStatusToTarget(
      [buildBasePayload()],
      g5RoomStatusTarget({
        sortAndDedupByConflictKey: true,
        deadlockRetryAttempts: 3,
        deadlockRetryBaseDelayMs: 1,
      })
    );

    assert.equal(result.rowCount, 1);
    assert.equal(stats.calls, 3);
  });

  it('sorts and dedups old room_status by conflict key, keeping max ts_ms', () => {
    const dm = buildDatabaseManager({ roomStatusTable: 'room_status.room_status_moment' });
    const built = dm._buildRoomStatusUpsertQuery(
      [
        { ...buildBasePayload(), hotel_id: 2, room_id: '102', device_id: 'dev-b', ts_ms: 1000 },
        { ...buildBasePayload(), hotel_id: 1, room_id: '101', device_id: 'dev-a', ts_ms: 3000 },
        { ...buildBasePayload(), hotel_id: 2, room_id: '102', device_id: 'dev-b', ts_ms: 5000 },
      ],
      legacyRoomStatusTarget({
        forceOnlineStatusOnWrite: false,
        forceUpdateOnConflict: false,
        sortAndDedupByConflictKey: true,
      })
    );

    assert.equal(built.uniqueEvents.length, 2);
    assert.equal(built.uniqueEvents[0].hotel_id, 1);
    assert.equal(built.uniqueEvents[0].room_id, '101');
    assert.equal(built.uniqueEvents[0].device_id, 'dev-a');
    assert.equal(built.uniqueEvents[1].hotel_id, 2);
    assert.equal(built.uniqueEvents[1].room_id, '102');
    assert.equal(built.uniqueEvents[1].device_id, 'dev-b');
    assert.equal(built.uniqueEvents[1].ts_ms, 5000);
  });

  it('retries old room_status on deadlock', async () => {
    const dm = buildDatabaseManager({ roomStatusTable: 'room_status.room_status_moment' });
    const stats = installDeadlockingPool(dm, 3);

    const result = await dm._upsertRoomStatusToTarget(
      [buildBasePayload()],
      legacyRoomStatusTarget({
        forceOnlineStatusOnWrite: false,
        forceUpdateOnConflict: false,
        sortAndDedupByConflictKey: true,
        deadlockRetryAttempts: 3,
        deadlockRetryBaseDelayMs: 1,
      })
    );

    assert.equal(result.rowCount, 1);
    assert.equal(stats.calls, 3);
  });
});

describe('DatabaseManager: insertHeartbeatEventsDual', () => {
  it('returns empty results when both targets disabled', async () => {
    const dm = buildDatabaseManager({
      legacyHeartbeatEnabled: false,
      g4HotHeartbeatEnabled: false,
    });

    const result = await dm.insertHeartbeatEventsDual([buildBasePayload()]);

    assert.equal(result.legacy.enabled, false);
    assert.equal(result.g4Hot.enabled, false);
    assert.equal(result.legacy.insertedCount, 0);
    assert.equal(result.g4Hot.insertedCount, 0);
  });
});