import assert from 'node:assert/strict';
import { HeartbeatProcessor } from '../src/processor/heartbeatProcessor.js';
import { DatabaseManager } from '../src/db/databaseManager.js';

// ---- Helpers ----

/** Fixed delay (ms) used to let work that `processMessage` does not await settle. */
const ASYNC_SETTLE_MS = 50;

/**
 * Builds a minimal heartbeat payload used as the baseline for every test.
 * A fresh object is returned on each call so tests can extend or mutate it freely.
 *
 * @returns {object} Heartbeat payload with `service_mask: 7` (bits 0, 1 and 2 set).
 */
const buildBasePayload = () => ({
  ts_ms: 1700000000123,
  hotel_id: 1,
  room_id: '101',
  device_id: 'dev1',
  ip: '10.0.0.1',
  power_state: 1,
  guest_type: 0,
  cardless_state: 0,
  service_mask: 7,
  pms_state: 1,
  carbon_state: 0,
  device_count: 1,
  comm_seq: 1,
});

/**
 * Wraps a payload in the message shape passed to `processMessage` in these tests:
 * an object whose `value` is the UTF-8 encoded JSON of the payload.
 *
 * @param {object} [payload] Payload to serialise; defaults to the base payload.
 * @returns {{ value: Buffer }} Message object.
 */
const buildMessage = (payload = buildBasePayload()) => ({
  value: Buffer.from(JSON.stringify(payload), 'utf8'),
});

/**
 * Resolves after `ASYNC_SETTLE_MS` milliseconds. Tests call this before asserting
 * on mock side effects that are observed only after `processMessage` has resolved.
 *
 * @returns {Promise<void>}
 */
const waitForAsyncWork = () =>
  new Promise((resolve) => setTimeout(resolve, ASYNC_SETTLE_MS));

/**
 * Creates a `DatabaseManager` with placeholder connection settings.
 *
 * @param {object} [extraConfig] Extra config keys merged over the placeholders.
 * @returns {DatabaseManager}
 */
const buildDatabaseManager = (extraConfig = {}) =>
  new DatabaseManager({
    host: 'x',
    port: 5432,
    user: 'x',
    password: 'x',
    database: 'x',
    ...extraConfig,
  });

/**
 * Builds the object a mocked `insertHeartbeatEventsDual` returns: one sub-result
 * per write target (`legacy` and `g4Hot`).
 *
 * For each target, `insertedCount` is reported only on success, while
 * `failedRecords`, `error` and `batchError` are populated only on failure.
 * The same `failedRecords` array is used for whichever targets fail.
 *
 * @param {object} [options]
 * @param {boolean} [options.legacyEnabled=true]
 * @param {boolean} [options.legacySuccess=true]
 * @param {boolean} [options.legacyConn=false] Value for `legacy.isConnectionError`.
 * @param {boolean} [options.g4HotEnabled=false]
 * @param {boolean} [options.g4HotSuccess=true]
 * @param {boolean} [options.g4HotConn=false] Value for `g4Hot.isConnectionError`.
 * @param {number} [options.insertedCount=1]
 * @param {Array} [options.failedRecords=[]]
 * @returns {{ legacy: object, g4Hot: object }}
 */
function makeDualResult({
  legacyEnabled = true,
  legacySuccess = true,
  legacyConn = false,
  g4HotEnabled = false,
  g4HotSuccess = true,
  g4HotConn = false,
  insertedCount = 1,
  failedRecords = [],
} = {}) {
  return {
    legacy: {
      enabled: legacyEnabled,
      success: legacySuccess,
      insertedCount: legacySuccess ? insertedCount : 0,
      failedRecords: legacySuccess ? [] : failedRecords,
      error: legacySuccess ? null : new Error('legacy fail'),
      isConnectionError: legacyConn,
      batchError: legacySuccess ? null : new Error('legacy fail'),
    },
    g4Hot: {
      enabled: g4HotEnabled,
      success: g4HotSuccess,
      insertedCount: g4HotSuccess ? insertedCount : 0,
      failedRecords: g4HotSuccess ? [] : failedRecords,
      error: g4HotSuccess ? null : new Error('g4hot fail'),
      isConnectionError: g4HotConn,
      batchError: g4HotSuccess ? null : new Error('g4hot fail'),
    },
  };
}

/**
 * Builds a mock database object exposing the methods and `config` these tests
 * hand to `HeartbeatProcessor`. Any method can be replaced through `overrides`;
 * `overrides.config` is merged over the default config.
 *
 * @param {object} [overrides]
 * @returns {object} Mock database.
 */
function buildMockDb(overrides = {}) {
  return {
    config: {
      legacyHeartbeatEnabled: true,
      g4HotHeartbeatEnabled: false,
      roomStatusEnabled: true,
      legacyTable: 'heartbeat.heartbeat_events',
      g4HotTable: 'heartbeat.heartbeat_events_g4_hot',
      ...overrides.config,
    },
    insertHeartbeatEventsDual:
      overrides.insertHeartbeatEventsDual ?? (async () => makeDualResult()),
    insertHeartbeatEventsErrors:
      overrides.insertHeartbeatEventsErrors ?? (async () => {}),
    upsertRoomStatus:
      overrides.upsertRoomStatus ?? (async () => ({ rowCount: 1 })),
    insertHeartbeatEvents:
      overrides.insertHeartbeatEvents ?? (async () => ({ insertedCount: 1 })),
    checkConnection: overrides.checkConnection ?? (async () => true),
  };
}

/**
 * Creates a `HeartbeatProcessor` wired to a mock database.
 *
 * Defaults to `batchSize: 1` and `batchTimeout: 1000`
 * (presumably so a single message is flushed right away; confirm against HeartbeatProcessor).
 *
 * @param {object} [dbOverrides] Passed to `buildMockDb`.
 * @param {object} [processorConfig] Merged over the default processor config.
 * @returns {HeartbeatProcessor}
 */
function buildProcessor(dbOverrides = {}, processorConfig = {}) {
  const db = buildMockDb(dbOverrides);
  return new HeartbeatProcessor(
    { batchSize: 1, batchTimeout: 1000, ...processorConfig },
    db
  );
}

// ---- Tests ----

describe('Dual-write: 仅旧表开启', () => {
  it('should call insertHeartbeatEventsDual and succeed', async () => {
    let captured = null;
    const processor = buildProcessor({
      config: { legacyHeartbeatEnabled: true, g4HotHeartbeatEnabled: false },
      insertHeartbeatEventsDual: async (events) => {
        captured = events;
        return makeDualResult({ legacyEnabled: true, g4HotEnabled: false });
      },
    });

    const res = await processor.processMessage(buildMessage());

    assert.deepEqual(res, { insertedCount: 1 });
    assert.ok(Array.isArray(captured));
    assert.equal(captured.length, 1);
  });
});

describe('Dual-write: 仅新表开启', () => {
  it('should call insertHeartbeatEventsDual with g4hot enabled', async () => {
    let dualCalled = false;
    const processor = buildProcessor({
      config: { legacyHeartbeatEnabled: false, g4HotHeartbeatEnabled: true },
      insertHeartbeatEventsDual: async () => {
        dualCalled = true;
        return makeDualResult({ legacyEnabled: false, g4HotEnabled: true });
      },
    });

    await processor.processMessage(buildMessage());

    assert.ok(dualCalled);
  });
});

describe('Dual-write: 旧新双开', () => {
  it('should call insertHeartbeatEventsDual with both enabled', async () => {
    let dualCalled = false;
    const processor = buildProcessor({
      config: { legacyHeartbeatEnabled: true, g4HotHeartbeatEnabled: true },
      insertHeartbeatEventsDual: async () => {
        dualCalled = true;
        return makeDualResult({ legacyEnabled: true, g4HotEnabled: true });
      },
    });

    await processor.processMessage(buildMessage());

    assert.ok(dualCalled);
  });
});

describe('Dual-write: 旧新双关,room_status 仍执行', () => {
  it('should still call upsertRoomStatus when both detail writes are off', async () => {
    let roomStatusCalled = false;
    const processor = buildProcessor({
      config: {
        legacyHeartbeatEnabled: false,
        g4HotHeartbeatEnabled: false,
        roomStatusEnabled: true,
      },
      insertHeartbeatEventsDual: async () =>
        makeDualResult({ legacyEnabled: false, g4HotEnabled: false }),
      upsertRoomStatus: async () => {
        roomStatusCalled = true;
        return { rowCount: 1 };
      },
    });

    await processor.processMessage(buildMessage());
    // Wait for the asynchronous room_status call to complete.
    await waitForAsyncWork();

    assert.ok(roomStatusCalled);
  });
});

describe('Dual-write: 旧成功新失败', () => {
  it('should continue and only write g4hot failures to error table', async () => {
    let errorTablePayload = null;
    const failedRecord = {
      error: new Error('g4hot column mismatch'),
      record: buildBasePayload(),
    };
    const processor = buildProcessor({
      config: { legacyHeartbeatEnabled: true, g4HotHeartbeatEnabled: true },
      insertHeartbeatEventsDual: async () =>
        makeDualResult({
          legacyEnabled: true,
          legacySuccess: true,
          g4HotEnabled: true,
          g4HotSuccess: false,
          failedRecords: [failedRecord],
        }),
      insertHeartbeatEventsErrors: async (payload) => {
        errorTablePayload = payload;
      },
    });

    await processor.processMessage(buildMessage());
    await waitForAsyncWork();

    assert.ok(errorTablePayload);
    assert.equal(errorTablePayload.length, 1);
    assert.equal(errorTablePayload[0].error_code, 'g4hot_write_failed');
  });
});

describe('Dual-write: 旧失败新成功', () => {
  it('should NOT write legacy failures to error table', async () => {
    let errorTableCalled = false;
    const failedRecord = {
      error: new Error('legacy column mismatch'),
      record: buildBasePayload(),
    };
    const processor = buildProcessor({
      config: { legacyHeartbeatEnabled: true, g4HotHeartbeatEnabled: true },
      insertHeartbeatEventsDual: async () =>
        makeDualResult({
          legacyEnabled: true,
          legacySuccess: false,
          failedRecords: [failedRecord],
          g4HotEnabled: true,
          g4HotSuccess: true,
        }),
      insertHeartbeatEventsErrors: async () => {
        errorTableCalled = true;
      },
    });

    await processor.processMessage(buildMessage());
    await waitForAsyncWork();

    assert.equal(errorTableCalled, false);
  });
});

describe('Dual-write: 双失败', () => {
  it('should only write g4hot failures to error table, not legacy failures', async () => {
    let errorTablePayload = null;
    const failedRecord = { error: new Error('fail'), record: buildBasePayload() };
    const processor = buildProcessor({
      config: { legacyHeartbeatEnabled: true, g4HotHeartbeatEnabled: true },
      insertHeartbeatEventsDual: async () =>
        // `failedRecords` is a single option shared by both failing targets,
        // so it is passed once (the original literal repeated the key).
        makeDualResult({
          legacyEnabled: true,
          legacySuccess: false,
          g4HotEnabled: true,
          g4HotSuccess: false,
          failedRecords: [failedRecord],
        }),
      insertHeartbeatEventsErrors: async (payload) => {
        errorTablePayload = payload;
      },
    });

    await processor.processMessage(buildMessage());
    await waitForAsyncWork();

    // The error table must contain g4hot failures only.
    assert.ok(errorTablePayload);
    assert.equal(errorTablePayload[0].error_code, 'g4hot_write_failed');
    assert.ok(
      errorTablePayload.every((entry) => entry.error_code === 'g4hot_write_failed')
    );
  });
});

describe('Dual-write: room_status 始终执行', () => {
  it('should call upsertRoomStatus regardless of detail write success/failure', async () => {
    let roomStatusCalled = false;
    const processor = buildProcessor({
      config: {
        legacyHeartbeatEnabled: true,
        g4HotHeartbeatEnabled: false,
        roomStatusEnabled: true,
      },
      insertHeartbeatEventsDual: async () =>
        makeDualResult({
          legacyEnabled: true,
          legacySuccess: false,
          failedRecords: [{ error: new Error('fail'), record: buildBasePayload() }],
        }),
      upsertRoomStatus: async () => {
        roomStatusCalled = true;
        return { rowCount: 1 };
      },
    });

    await processor.processMessage(buildMessage());
    await waitForAsyncWork();

    assert.ok(roomStatusCalled);
  });
});

describe('Dual-write: 暂停消费策略', () => {
  it('should NOT pause when both detail writes disabled but room_status enabled', async () => {
    let paused = false;
    const processor = buildProcessor({
      config: {
        legacyHeartbeatEnabled: false,
        g4HotHeartbeatEnabled: false,
        roomStatusEnabled: true,
      },
      insertHeartbeatEventsDual: async () =>
        makeDualResult({ legacyEnabled: false, g4HotEnabled: false }),
    });
    processor.onDbOffline = () => {
      paused = true;
    };

    await processor.processMessage(buildMessage());

    assert.equal(paused, false);
  });

  it('should pause when only legacy enabled and legacy has connection error', () => {
    const processor = buildProcessor({});
    const result = processor._shouldPauseConsumption(
      { enabled: true, isConnectionError: true },
      { enabled: false, isConnectionError: false }
    );
    assert.ok(result);
  });

  it('should NOT pause in dual-write when only g4hot has connection error', async () => {
    let paused = false;
    const processor = buildProcessor({
      config: { legacyHeartbeatEnabled: true, g4HotHeartbeatEnabled: true },
      insertHeartbeatEventsDual: async () =>
        makeDualResult({
          legacyEnabled: true,
          legacySuccess: true,
          g4HotEnabled: true,
          g4HotSuccess: false,
          g4HotConn: true,
        }),
    });
    processor.onDbOffline = () => {
      paused = true;
    };

    await processor.processMessage(buildMessage());

    assert.equal(paused, false);
  });

  it('should pause in dual-write when both have connection errors', () => {
    const processor = buildProcessor({});
    const result = processor._shouldPauseConsumption(
      { enabled: true, isConnectionError: true },
      { enabled: true, isConnectionError: true }
    );
    assert.ok(result);
  });

  it('should NOT pause when only g4hot has connection error in dual mode', () => {
    const processor = buildProcessor({});
    const result = processor._shouldPauseConsumption(
      { enabled: true, isConnectionError: false },
      { enabled: true, isConnectionError: true }
    );
    assert.equal(result, false);
  });

  it('should pause when only g4hot enabled and g4hot has connection error', () => {
    const processor = buildProcessor({});
    const result = processor._shouldPauseConsumption(
      { enabled: false, isConnectionError: false },
      { enabled: true, isConnectionError: true }
    );
    assert.ok(result);
  });
});

describe('DatabaseManager: _g4HotToRowValues', () => {
  it('unpacks svc booleans from service_mask', () => {
    const dm = buildDatabaseManager();
    const e = { ...buildBasePayload(), service_mask: 7 }; // bits 0, 1, 2 set
    const values = dm._g4HotToRowValues(e);
    const cols = dm._getG4HotColumns();
    const svc01Idx = cols.indexOf('svc_01');

    assert.equal(values[svc01Idx], true); // bit 0
    assert.equal(values[svc01Idx + 1], true); // bit 1 (svc_02)
    assert.equal(values[svc01Idx + 2], true); // bit 2 (svc_03)
    assert.equal(values[svc01Idx + 3], false); // bit 3 (svc_04)
    assert.equal(values[svc01Idx + 63], false); // svc_64
  });

  it('unpacks array fields into _1, _2, _residual', () => {
    const dm = buildDatabaseManager();
    const e = {
      ...buildBasePayload(),
      air_address: ['ac1', 'ac2', 'ac3'],
      state: [1, 2, 3],
      elec_address: ['e1'],
      voltage: [220.5],
    };
    const values = dm._g4HotToRowValues(e);
    const cols = dm._getG4HotColumns();

    const airAddr1Idx = cols.indexOf('air_address_1');
    assert.equal(values[airAddr1Idx], 'ac1');
    assert.equal(values[airAddr1Idx + 1], 'ac2'); // air_address_2
    assert.deepEqual(values[airAddr1Idx + 2], ['ac3']); // air_address_residual

    const state1Idx = cols.indexOf('state_1');
    assert.equal(values[state1Idx], 1);
    assert.equal(values[state1Idx + 1], 2);
    assert.deepEqual(values[state1Idx + 2], [3]);

    const elecAddr1Idx = cols.indexOf('elec_address_1');
    assert.equal(values[elecAddr1Idx], 'e1');
    assert.equal(values[elecAddr1Idx + 1], null); // only 1 element
    assert.equal(values[elecAddr1Idx + 2], null); // no residual
  });

  it('produces correct column count', () => {
    const dm = buildDatabaseManager();
    const e = buildBasePayload();
    const cols = dm._getG4HotColumns();
    const values = dm._g4HotToRowValues(e);

    assert.equal(values.length, cols.length);
  });

  it('writes power helper fields as null temporarily', () => {
    const dm = buildDatabaseManager();
    const e = {
      ...buildBasePayload(),
      extra: {
        power_carbon_on: 1.5,
        power_carbon_off: 2.5,
        power_person_exist: 3.5,
        power_person_left: 4.5,
      },
    };
    const cols = dm._getG4HotColumns();
    const values = dm._g4HotToRowValues(e);

    assert.equal(values[cols.indexOf('power_carbon_on')], null);
    assert.equal(values[cols.indexOf('power_carbon_off')], null);
    assert.equal(values[cols.indexOf('power_person_exist')], null);
    assert.equal(values[cols.indexOf('power_person_left')], null);
  });

  it('generates lowercase guid without hyphens when missing', () => {
    const dm = buildDatabaseManager();
    const e = buildBasePayload();
    const cols = dm._getG4HotColumns();
    const values = dm._g4HotToRowValues(e);
    const guid = values[cols.indexOf('guid')];

    assert.match(guid, /^[0-9a-f]{32}$/);
  });

  it('normalizes provided guid to lowercase without hyphens', () => {
    const dm = buildDatabaseManager();
    const e = { ...buildBasePayload(), guid: 'A0B1C2D3-E4F5-6789-ABCD-EF0123456789' };
    const cols = dm._getG4HotColumns();
    const values = dm._g4HotToRowValues(e);
    const guid = values[cols.indexOf('guid')];

    assert.equal(guid, 'a0b1c2d3e4f56789abcdef0123456789');
  });
});

describe('DatabaseManager: _formatPgCol', () => {
  it('formats boolean values correctly', () => {
    const dm = buildDatabaseManager();

    assert.equal(dm._formatPgCol(true), 't');
    assert.equal(dm._formatPgCol(false), 'f');
    assert.equal(dm._formatPgCol(null), '\\N');
  });
});

describe('DatabaseManager: insertHeartbeatEventsDual', () => {
  it('returns empty results when both targets disabled', async () => {
    const dm = buildDatabaseManager({
      legacyHeartbeatEnabled: false,
      g4HotHeartbeatEnabled: false,
    });

    const result = await dm.insertHeartbeatEventsDual([buildBasePayload()]);

    assert.equal(result.legacy.enabled, false);
    assert.equal(result.g4Hot.enabled, false);
    assert.equal(result.legacy.insertedCount, 0);
    assert.equal(result.g4Hot.insertedCount, 0);
  });
});