Files
Web_BLS_Heartbeat_Server/test/dualWrite.test.js
XuJiacheng a79c06d4f3 fix(db): 启用冲突键排序去重并增加死锁重试机制
修改数据库管理器配置,默认启用冲突键排序去重功能,保留最大时间戳记录
增加死锁重试机制,默认重试3次,基础延迟100毫秒
添加相关测试用例验证排序去重和死锁重试功能
2026-04-03 18:46:30 +08:00

772 lines
30 KiB
JavaScript
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import assert from 'node:assert/strict';
import { HeartbeatProcessor } from '../src/processor/heartbeatProcessor.js';
import { DatabaseManager } from '../src/db/databaseManager.js';
// ---- 辅助 ----
const buildBasePayload = () => ({
ts_ms: 1700000000123,
hotel_id: 1,
room_id: '101',
device_id: 'dev1',
ip: '10.0.0.1',
power_state: 1,
guest_type: 0,
cardless_state: 0,
service_mask: 7,
pms_state: 1,
carbon_state: 0,
device_count: 1,
comm_seq: 1,
});
function makeDualResult({ legacyEnabled = true, legacySuccess = true, legacyConn = false,
g4HotEnabled = false, g4HotSuccess = true, g4HotConn = false,
g5Enabled = false, g5Success = true, g5Conn = false,
insertedCount = 1, failedRecords = [] } = {}) {
return {
legacy: {
enabled: legacyEnabled, success: legacySuccess, insertedCount: legacySuccess ? insertedCount : 0,
failedRecords: legacySuccess ? [] : failedRecords, error: legacySuccess ? null : new Error('legacy fail'),
isConnectionError: legacyConn, batchError: legacySuccess ? null : new Error('legacy fail'),
},
g4Hot: {
enabled: g4HotEnabled, success: g4HotSuccess, insertedCount: g4HotSuccess ? insertedCount : 0,
failedRecords: g4HotSuccess ? [] : failedRecords, error: g4HotSuccess ? null : new Error('g4hot fail'),
isConnectionError: g4HotConn, batchError: g4HotSuccess ? null : new Error('g4hot fail'),
},
g5: {
enabled: g5Enabled, success: g5Success, insertedCount: g5Success ? insertedCount : 0,
failedRecords: g5Success ? [] : failedRecords, error: g5Success ? null : new Error('g5 fail'),
isConnectionError: g5Conn, batchError: g5Success ? null : new Error('g5 fail'),
},
};
}
function buildMockDb(overrides = {}) {
return {
config: {
legacyHeartbeatEnabled: true,
g4HotHeartbeatEnabled: false,
roomStatusEnabled: true,
legacyTable: 'heartbeat.heartbeat_events',
g4HotTable: 'heartbeat.heartbeat_events_g4_hot',
roomStatusTable: 'room_status.room_status_moment',
...overrides.config,
},
insertHeartbeatEventsDual: overrides.insertHeartbeatEventsDual ?? (async () => makeDualResult()),
insertHeartbeatEventsErrors: overrides.insertHeartbeatEventsErrors ?? (async () => {}),
upsertRoomStatus: overrides.upsertRoomStatus ?? (async () => ({ rowCount: 1 })),
insertHeartbeatEvents: overrides.insertHeartbeatEvents ?? (async () => ({ insertedCount: 1 })),
checkConnection: overrides.checkConnection ?? (async () => true),
};
}
function buildMockG5Db(overrides = {}) {
return {
config: {
enabled: true,
g5HeartbeatEnabled: true,
g5Table: 'heartbeat.heartbeat_events_g5',
roomStatusEnabled: true,
roomStatusTable: 'room_status.room_status_moment_g5',
...overrides.config,
},
insertHeartbeatEventsG5: overrides.insertHeartbeatEventsG5 ?? (async () => ({
enabled: true,
success: true,
insertedCount: 1,
failedRecords: [],
error: null,
isConnectionError: false,
batchError: null,
})),
upsertRoomStatusG5: overrides.upsertRoomStatusG5 ?? (async () => ({ rowCount: 1 })),
_isDbConnectionError: overrides._isDbConnectionError ?? (() => false),
};
}
function buildProcessor(dbOverrides = {}, processorConfig = {}, g5Overrides = null) {
const db = buildMockDb(dbOverrides);
const deps = {};
if (g5Overrides) {
deps.g5DatabaseManager = buildMockG5Db(g5Overrides);
}
return new HeartbeatProcessor(
{ batchSize: 1, batchTimeout: 1000, ...processorConfig },
db,
deps
);
}
// ---- 测试 ----
describe('Dual-write: 仅旧表开启', () => {
it('should call insertHeartbeatEventsDual and succeed', async () => {
let captured = null;
const processor = buildProcessor({
config: { legacyHeartbeatEnabled: true, g4HotHeartbeatEnabled: false },
insertHeartbeatEventsDual: async (events) => {
captured = events;
return makeDualResult({ legacyEnabled: true, g4HotEnabled: false });
},
});
const msg = { value: Buffer.from(JSON.stringify(buildBasePayload()), 'utf8') };
const res = await processor.processMessage(msg);
assert.deepEqual(res, { insertedCount: 1 });
assert.ok(Array.isArray(captured));
assert.equal(captured.length, 1);
});
});
describe('Dual-write: 仅新表开启', () => {
it('should call insertHeartbeatEventsDual with g4hot enabled', async () => {
let dualCalled = false;
const processor = buildProcessor({
config: { legacyHeartbeatEnabled: false, g4HotHeartbeatEnabled: true },
insertHeartbeatEventsDual: async (events) => {
dualCalled = true;
return makeDualResult({ legacyEnabled: false, g4HotEnabled: true });
},
});
const msg = { value: Buffer.from(JSON.stringify(buildBasePayload()), 'utf8') };
await processor.processMessage(msg);
assert.ok(dualCalled);
});
});
describe('Dual-write: 旧新双开', () => {
it('should call insertHeartbeatEventsDual with both enabled', async () => {
let dualCalled = false;
const processor = buildProcessor({
config: { legacyHeartbeatEnabled: true, g4HotHeartbeatEnabled: true },
insertHeartbeatEventsDual: async () => {
dualCalled = true;
return makeDualResult({ legacyEnabled: true, g4HotEnabled: true });
},
});
const msg = { value: Buffer.from(JSON.stringify(buildBasePayload()), 'utf8') };
await processor.processMessage(msg);
assert.ok(dualCalled);
});
});
describe('Dual-write: 旧新双关room_status 仍执行', () => {
it('should still call upsertRoomStatus when both detail writes are off', async () => {
let roomStatusCalled = false;
const processor = buildProcessor({
config: { legacyHeartbeatEnabled: false, g4HotHeartbeatEnabled: false, roomStatusEnabled: true },
insertHeartbeatEventsDual: async () => makeDualResult({ legacyEnabled: false, g4HotEnabled: false }),
upsertRoomStatus: async () => { roomStatusCalled = true; return { rowCount: 1 }; },
});
const msg = { value: Buffer.from(JSON.stringify(buildBasePayload()), 'utf8') };
await processor.processMessage(msg);
// 等待异步 room_status 调用完成
await new Promise(r => setTimeout(r, 50));
assert.ok(roomStatusCalled);
});
});
describe('Dual-write: 旧成功新失败', () => {
it('should continue and only write g4hot failures to error table', async () => {
let errorTablePayload = null;
const failedRecord = { error: new Error('g4hot column mismatch'), record: buildBasePayload() };
const processor = buildProcessor({
config: { legacyHeartbeatEnabled: true, g4HotHeartbeatEnabled: true },
insertHeartbeatEventsDual: async () => makeDualResult({
legacyEnabled: true, legacySuccess: true,
g4HotEnabled: true, g4HotSuccess: false, failedRecords: [failedRecord],
}),
insertHeartbeatEventsErrors: async (payload) => { errorTablePayload = payload; },
});
const msg = { value: Buffer.from(JSON.stringify(buildBasePayload()), 'utf8') };
await processor.processMessage(msg);
await new Promise(r => setTimeout(r, 50));
assert.ok(errorTablePayload);
assert.equal(errorTablePayload.length, 1);
assert.equal(errorTablePayload[0].error_code, 'g4hot_write_failed');
});
});
describe('Dual-write: 旧失败新成功', () => {
it('should NOT write legacy failures to error table', async () => {
let errorTableCalled = false;
const failedRecord = { error: new Error('legacy column mismatch'), record: buildBasePayload() };
const processor = buildProcessor({
config: { legacyHeartbeatEnabled: true, g4HotHeartbeatEnabled: true },
insertHeartbeatEventsDual: async () => makeDualResult({
legacyEnabled: true, legacySuccess: false, failedRecords: [failedRecord],
g4HotEnabled: true, g4HotSuccess: true,
}),
insertHeartbeatEventsErrors: async () => { errorTableCalled = true; },
});
const msg = { value: Buffer.from(JSON.stringify(buildBasePayload()), 'utf8') };
await processor.processMessage(msg);
await new Promise(r => setTimeout(r, 50));
assert.equal(errorTableCalled, false);
});
});
describe('Dual-write: 双失败', () => {
it('should only write g4hot failures to error table, not legacy failures', async () => {
let errorTablePayload = null;
const failedRecord = { error: new Error('fail'), record: buildBasePayload() };
const processor = buildProcessor({
config: { legacyHeartbeatEnabled: true, g4HotHeartbeatEnabled: true },
insertHeartbeatEventsDual: async () => makeDualResult({
legacyEnabled: true, legacySuccess: false, failedRecords: [failedRecord],
g4HotEnabled: true, g4HotSuccess: false, failedRecords: [failedRecord],
}),
insertHeartbeatEventsErrors: async (payload) => { errorTablePayload = payload; },
});
const msg = { value: Buffer.from(JSON.stringify(buildBasePayload()), 'utf8') };
await processor.processMessage(msg);
await new Promise(r => setTimeout(r, 50));
// 错误表只包含 g4hot 失败
assert.ok(errorTablePayload);
assert.equal(errorTablePayload[0].error_code, 'g4hot_write_failed');
});
});
describe('Dual-write: room_status 始终执行', () => {
it('should call upsertRoomStatus regardless of detail write success/failure', async () => {
let roomStatusCalled = false;
const processor = buildProcessor({
config: { legacyHeartbeatEnabled: true, g4HotHeartbeatEnabled: false, roomStatusEnabled: true },
insertHeartbeatEventsDual: async () => makeDualResult({
legacyEnabled: true, legacySuccess: false,
failedRecords: [{ error: new Error('fail'), record: buildBasePayload() }],
}),
upsertRoomStatus: async () => { roomStatusCalled = true; return { rowCount: 1 }; },
});
const msg = { value: Buffer.from(JSON.stringify(buildBasePayload()), 'utf8') };
await processor.processMessage(msg);
await new Promise(r => setTimeout(r, 50));
assert.ok(roomStatusCalled);
});
});
describe('RoomStatus dual-write', () => {
it('should write old and g5 room_status independently', async () => {
let oldCalled = false;
let g5Called = false;
const processor = buildProcessor(
{
config: { legacyHeartbeatEnabled: true, g4HotHeartbeatEnabled: false, roomStatusEnabled: true },
insertHeartbeatEventsDual: async () => makeDualResult({ legacyEnabled: true, g4HotEnabled: false }),
upsertRoomStatus: async () => { oldCalled = true; return { rowCount: 1 }; },
},
{},
{
config: { roomStatusEnabled: true },
upsertRoomStatusG5: async () => { g5Called = true; return { rowCount: 1 }; },
}
);
const msg = { value: Buffer.from(JSON.stringify(buildBasePayload()), 'utf8') };
await processor.processMessage(msg);
await new Promise(r => setTimeout(r, 50));
assert.equal(oldCalled, true);
assert.equal(g5Called, true);
});
it('should allow old room_status off while g5 room_status stays on', async () => {
let oldCalled = false;
let g5Called = false;
const processor = buildProcessor(
{
config: { legacyHeartbeatEnabled: true, g4HotHeartbeatEnabled: false, roomStatusEnabled: false },
insertHeartbeatEventsDual: async () => makeDualResult({ legacyEnabled: true, g4HotEnabled: false }),
upsertRoomStatus: async () => { oldCalled = true; return { rowCount: 1 }; },
},
{},
{
config: { roomStatusEnabled: true },
upsertRoomStatusG5: async () => { g5Called = true; return { rowCount: 1 }; },
}
);
const msg = { value: Buffer.from(JSON.stringify(buildBasePayload()), 'utf8') };
await processor.processMessage(msg);
await new Promise(r => setTimeout(r, 50));
assert.equal(oldCalled, false);
assert.equal(g5Called, true);
});
});
describe('G5-write: 独立写库', () => {
it('should write to g5 independently after legacy/g4 succeed', async () => {
let g5Captured = null;
const processor = buildProcessor(
{
config: { legacyHeartbeatEnabled: true, g4HotHeartbeatEnabled: true },
insertHeartbeatEventsDual: async () => makeDualResult({ legacyEnabled: true, g4HotEnabled: true }),
},
{},
{
insertHeartbeatEventsG5: async (events) => {
g5Captured = events;
return {
enabled: true,
success: true,
insertedCount: events.length,
failedRecords: [],
error: null,
isConnectionError: false,
batchError: null,
};
},
}
);
const msg = { value: Buffer.from(JSON.stringify(buildBasePayload()), 'utf8') };
const res = await processor.processMessage(msg);
assert.deepEqual(res, { insertedCount: 1 });
assert.ok(Array.isArray(g5Captured));
assert.equal(g5Captured.length, 1);
});
it('should not block main flow when g5 write fails', async () => {
const processor = buildProcessor(
{
config: { legacyHeartbeatEnabled: true, g4HotHeartbeatEnabled: false },
insertHeartbeatEventsDual: async () => makeDualResult({ legacyEnabled: true, g4HotEnabled: false }),
},
{},
{
insertHeartbeatEventsG5: async () => ({
enabled: true,
success: false,
insertedCount: 0,
failedRecords: [{ error: new Error('g5 fail'), record: buildBasePayload() }],
error: new Error('g5 fail'),
isConnectionError: false,
batchError: new Error('g5 fail'),
}),
}
);
const msg = { value: Buffer.from(JSON.stringify(buildBasePayload()), 'utf8') };
const res = await processor.processMessage(msg);
assert.deepEqual(res, { insertedCount: 1 });
});
});
describe('Dual-write: 暂停消费策略', () => {
it('should NOT pause when both detail writes disabled but room_status enabled', async () => {
let paused = false;
const processor = buildProcessor({
config: { legacyHeartbeatEnabled: false, g4HotHeartbeatEnabled: false, roomStatusEnabled: true },
insertHeartbeatEventsDual: async () => makeDualResult({ legacyEnabled: false, g4HotEnabled: false }),
});
processor.onDbOffline = () => { paused = true; };
const msg = { value: Buffer.from(JSON.stringify(buildBasePayload()), 'utf8') };
await processor.processMessage(msg);
assert.equal(paused, false);
});
it('should pause when only legacy enabled and legacy has connection error', () => {
const processor = buildProcessor({});
const result = processor._shouldPauseConsumption(
{ enabled: true, isConnectionError: true },
{ enabled: false, isConnectionError: false }
);
assert.ok(result);
});
it('should NOT pause in dual-write when only g4hot has connection error', async () => {
let paused = false;
const processor = buildProcessor({
config: { legacyHeartbeatEnabled: true, g4HotHeartbeatEnabled: true },
insertHeartbeatEventsDual: async () => makeDualResult({
legacyEnabled: true, legacySuccess: true,
g4HotEnabled: true, g4HotSuccess: false, g4HotConn: true,
}),
});
processor.onDbOffline = () => { paused = true; };
const msg = { value: Buffer.from(JSON.stringify(buildBasePayload()), 'utf8') };
await processor.processMessage(msg);
assert.equal(paused, false);
});
it('should pause in dual-write when both have connection errors', () => {
const processor = buildProcessor({});
const result = processor._shouldPauseConsumption(
{ enabled: true, isConnectionError: true },
{ enabled: true, isConnectionError: true }
);
assert.ok(result);
});
it('should NOT pause when only g4hot has connection error in dual mode', () => {
const processor = buildProcessor({});
const result = processor._shouldPauseConsumption(
{ enabled: true, isConnectionError: false },
{ enabled: true, isConnectionError: true }
);
assert.equal(result, false);
});
it('should pause when only g4hot enabled and g4hot has connection error', () => {
const processor = buildProcessor({});
const result = processor._shouldPauseConsumption(
{ enabled: false, isConnectionError: false },
{ enabled: true, isConnectionError: true }
);
assert.ok(result);
});
});
describe('DatabaseManager: _g4HotToRowValues', () => {
it('unpacks svc booleans from service_mask', () => {
const dm = new DatabaseManager({ host: 'x', port: 5432, user: 'x', password: 'x', database: 'x' });
const e = { ...buildBasePayload(), service_mask: 7 }; // bits 0,1,2 set
const values = dm._g4HotToRowValues(e);
const cols = dm._getG4HotColumns();
const svc01Idx = cols.indexOf('svc_01');
assert.equal(values[svc01Idx], true); // bit 0
assert.equal(values[svc01Idx + 1], true); // bit 1 (svc_02)
assert.equal(values[svc01Idx + 2], true); // bit 2 (svc_03)
assert.equal(values[svc01Idx + 3], false); // bit 3 (svc_04)
assert.equal(values[svc01Idx + 63], false); // svc_64
});
it('unpacks array fields into _1, _2, _residual', () => {
const dm = new DatabaseManager({ host: 'x', port: 5432, user: 'x', password: 'x', database: 'x' });
const e = {
...buildBasePayload(),
air_address: ['ac1', 'ac2', 'ac3'],
state: [1, 2, 3],
elec_address: ['e1'],
voltage: [220.5],
};
const values = dm._g4HotToRowValues(e);
const cols = dm._getG4HotColumns();
const airAddr1Idx = cols.indexOf('air_address_1');
assert.equal(values[airAddr1Idx], 'ac1');
assert.equal(values[airAddr1Idx + 1], 'ac2'); // air_address_2
assert.deepEqual(values[airAddr1Idx + 2], ['ac3']); // air_address_residual
const state1Idx = cols.indexOf('state_1');
assert.equal(values[state1Idx], 1);
assert.equal(values[state1Idx + 1], 2);
assert.deepEqual(values[state1Idx + 2], [3]);
const elecAddr1Idx = cols.indexOf('elec_address_1');
assert.equal(values[elecAddr1Idx], 'e1');
assert.equal(values[elecAddr1Idx + 1], null); // only 1 element
assert.equal(values[elecAddr1Idx + 2], null); // no residual
});
it('produces correct column count', () => {
const dm = new DatabaseManager({ host: 'x', port: 5432, user: 'x', password: 'x', database: 'x' });
const e = buildBasePayload();
const cols = dm._getG4HotColumns();
const values = dm._g4HotToRowValues(e);
assert.equal(values.length, cols.length);
});
it('writes power helper fields as null temporarily', () => {
const dm = new DatabaseManager({ host: 'x', port: 5432, user: 'x', password: 'x', database: 'x' });
const e = {
...buildBasePayload(),
extra: {
power_carbon_on: 1.5,
power_carbon_off: 2.5,
power_person_exist: 3.5,
power_person_left: 4.5,
},
};
const cols = dm._getG4HotColumns();
const values = dm._g4HotToRowValues(e);
assert.equal(values[cols.indexOf('power_carbon_on')], null);
assert.equal(values[cols.indexOf('power_carbon_off')], null);
assert.equal(values[cols.indexOf('power_person_exist')], null);
assert.equal(values[cols.indexOf('power_person_left')], null);
});
it('generates lowercase guid without hyphens when missing', () => {
const dm = new DatabaseManager({ host: 'x', port: 5432, user: 'x', password: 'x', database: 'x' });
const e = buildBasePayload();
const cols = dm._getG4HotColumns();
const values = dm._g4HotToRowValues(e);
const guid = values[cols.indexOf('guid')];
assert.match(guid, /^[0-9a-f]{32}$/);
});
it('normalizes provided guid to lowercase without hyphens', () => {
const dm = new DatabaseManager({ host: 'x', port: 5432, user: 'x', password: 'x', database: 'x' });
const e = { ...buildBasePayload(), guid: 'A0B1C2D3-E4F5-6789-ABCD-EF0123456789' };
const cols = dm._getG4HotColumns();
const values = dm._g4HotToRowValues(e);
const guid = values[cols.indexOf('guid')];
assert.equal(guid, 'a0b1c2d3e4f56789abcdef0123456789');
});
it('omits guid column and value for g5 rows', () => {
const dm = new DatabaseManager({ host: 'x', port: 5432, user: 'x', password: 'x', database: 'x' });
const cols = dm._getG5Columns();
const values = dm._g5ToRowValues(buildBasePayload());
assert.equal(cols.includes('guid'), false);
assert.equal(values.length, cols.length);
});
it('writes g5 base array columns as null', () => {
const dm = new DatabaseManager({ host: 'x', port: 5432, user: 'x', password: 'x', database: 'x' });
const cols = dm._getG5Columns();
const values = dm._g5ToRowValues({
...buildBasePayload(),
service_mask: 7,
elec_address: ['e1', 'e2'],
air_address: ['ac1', 'ac2'],
voltage: [220.5, 221.5],
ampere: [1.1, 1.2],
power: [100, 200],
phase: ['A', 'B'],
energy: [10, 20],
sum_energy: [100, 200],
state: [1, 0],
model: [2, 3],
speed: [1, 2],
set_temp: [24, 25],
now_temp: [26, 27],
solenoid_valve: [1, 0],
extra: { source: 'test' },
});
for (const column of ['service_mask', 'elec_address', 'air_address', 'voltage', 'ampere', 'power', 'phase', 'energy', 'sum_energy', 'state', 'model', 'speed', 'set_temp', 'now_temp', 'solenoid_valve', 'extra']) {
assert.equal(values[cols.indexOf(column)], null);
}
assert.equal(values[cols.indexOf('svc_01')], true);
assert.equal(values[cols.indexOf('svc_02')], true);
assert.equal(values[cols.indexOf('svc_03')], true);
assert.equal(values[cols.indexOf('air_address_1')], 'ac1');
assert.equal(values[cols.indexOf('elec_address_1')], 'e1');
});
});
describe('DatabaseManager: _formatPgCol', () => {
it('formats boolean values correctly', () => {
const dm = new DatabaseManager({ host: 'x', port: 5432, user: 'x', password: 'x', database: 'x' });
assert.equal(dm._formatPgCol(true), 't');
assert.equal(dm._formatPgCol(false), 'f');
assert.equal(dm._formatPgCol(null), '\\N');
});
});
describe('DatabaseManager: room_status upsert SQL', () => {
it('does not update ts_ms in old room_status DO UPDATE SET', () => {
const dm = new DatabaseManager({ host: 'x', port: 5432, user: 'x', password: 'x', database: 'x', roomStatusTable: 'room_status.room_status_moment' });
const built = dm._buildRoomStatusUpsertQuery([buildBasePayload()], {
tableName: 'room_status.room_status_moment',
conflictColumns: ['hotel_id', 'room_id', 'device_id'],
includeGuid: true,
autoCreatePartitions: true,
tableRef: 'room_status.room_status_moment',
logPrefix: 'upsertRoomStatus',
});
assert.match(built.sql, /ON CONFLICT \(hotel_id, room_id, device_id\)/);
assert.doesNotMatch(built.sql, /ts_ms = EXCLUDED\.ts_ms/);
});
it('uses hotel_id and room_id as conflict key for g5 room_status', () => {
const dm = new DatabaseManager({ host: 'x', port: 5432, user: 'x', password: 'x', database: 'x', roomStatusTable: 'room_status.room_status_moment_g5' });
const built = dm._buildRoomStatusUpsertQuery([buildBasePayload()], {
tableName: 'room_status.room_status_moment_g5',
conflictColumns: ['hotel_id', 'room_id'],
includeGuid: false,
autoCreatePartitions: false,
tableRef: 'room_status.room_status_moment_g5',
forceOnlineStatusOnWrite: true,
forceUpdateOnConflict: true,
logPrefix: 'upsertRoomStatusG5',
});
assert.match(built.sql, /ON CONFLICT \(hotel_id, room_id\)/);
assert.doesNotMatch(built.sql, /ts_ms = EXCLUDED\.ts_ms/);
assert.equal(/guid/.test(built.sql), false);
});
it('forces online_status to 1 for g5 room_status insert and update', () => {
const dm = new DatabaseManager({ host: 'x', port: 5432, user: 'x', password: 'x', database: 'x', roomStatusTable: 'room_status.room_status_moment_g5' });
const built = dm._buildRoomStatusUpsertQuery([buildBasePayload()], {
tableName: 'room_status.room_status_moment_g5',
conflictColumns: ['hotel_id', 'room_id'],
includeGuid: false,
autoCreatePartitions: false,
tableRef: 'room_status.room_status_moment_g5',
forceOnlineStatusOnWrite: true,
forceUpdateOnConflict: true,
logPrefix: 'upsertRoomStatusG5',
});
assert.match(built.sql, /INSERT INTO room_status\.room_status_moment_g5 \(.*online_status\)/s);
assert.match(built.sql, /online_status = 1/);
assert.equal(built.values[built.values.length - 1], 1);
});
it('always updates g5 room_status on conflict even when business fields are unchanged', () => {
const dm = new DatabaseManager({ host: 'x', port: 5432, user: 'x', password: 'x', database: 'x', roomStatusTable: 'room_status.room_status_moment_g5' });
const built = dm._buildRoomStatusUpsertQuery([buildBasePayload()], {
tableName: 'room_status.room_status_moment_g5',
conflictColumns: ['hotel_id', 'room_id'],
includeGuid: false,
autoCreatePartitions: false,
tableRef: 'room_status.room_status_moment_g5',
forceOnlineStatusOnWrite: true,
forceUpdateOnConflict: true,
sortAndDedupByConflictKey: true,
logPrefix: 'upsertRoomStatusG5',
});
assert.doesNotMatch(built.sql, /room_status\.room_status_moment_g5\.ts_ms <= EXCLUDED\.ts_ms/);
assert.doesNotMatch(built.sql, /IS DISTINCT FROM EXCLUDED/);
});
it('sorts and dedups g5 room_status by conflict key, keeping max ts_ms', () => {
const dm = new DatabaseManager({ host: 'x', port: 5432, user: 'x', password: 'x', database: 'x', roomStatusTable: 'room_status.room_status_moment_g5' });
const built = dm._buildRoomStatusUpsertQuery([
{ ...buildBasePayload(), hotel_id: 2, room_id: '102', ts_ms: 1000, device_id: 'dev-old-102' },
{ ...buildBasePayload(), hotel_id: 1, room_id: '101', ts_ms: 3000, device_id: 'dev-101' },
{ ...buildBasePayload(), hotel_id: 2, room_id: '102', ts_ms: 5000, device_id: 'dev-new-102' },
], {
tableName: 'room_status.room_status_moment_g5',
conflictColumns: ['hotel_id', 'room_id'],
includeGuid: false,
autoCreatePartitions: false,
tableRef: 'room_status.room_status_moment_g5',
forceOnlineStatusOnWrite: true,
forceUpdateOnConflict: true,
sortAndDedupByConflictKey: true,
logPrefix: 'upsertRoomStatusG5',
});
assert.equal(built.uniqueEvents.length, 2);
assert.equal(built.uniqueEvents[0].hotel_id, 1);
assert.equal(built.uniqueEvents[0].room_id, '101');
assert.equal(built.uniqueEvents[1].hotel_id, 2);
assert.equal(built.uniqueEvents[1].room_id, '102');
assert.equal(built.uniqueEvents[1].ts_ms, 5000);
assert.equal(built.uniqueEvents[1].device_id, 'dev-new-102');
});
it('retries g5 room_status on deadlock', async () => {
const dm = new DatabaseManager({ host: 'x', port: 5432, user: 'x', password: 'x', database: 'x', roomStatusTable: 'room_status.room_status_moment_g5' });
let calls = 0;
dm.pool = {
query: async () => {
calls += 1;
if (calls < 3) {
const err = new Error('deadlock detected');
err.code = '40P01';
throw err;
}
return { rowCount: 1 };
},
};
dm._sleep = async () => {};
const result = await dm._upsertRoomStatusToTarget([buildBasePayload()], {
tableName: 'room_status.room_status_moment_g5',
conflictColumns: ['hotel_id', 'room_id'],
includeGuid: false,
autoCreatePartitions: false,
tableRef: 'room_status.room_status_moment_g5',
forceOnlineStatusOnWrite: true,
forceUpdateOnConflict: true,
sortAndDedupByConflictKey: true,
deadlockRetryAttempts: 3,
deadlockRetryBaseDelayMs: 1,
logPrefix: 'upsertRoomStatusG5',
});
assert.equal(result.rowCount, 1);
assert.equal(calls, 3);
});
it('sorts and dedups old room_status by conflict key, keeping max ts_ms', () => {
const dm = new DatabaseManager({ host: 'x', port: 5432, user: 'x', password: 'x', database: 'x', roomStatusTable: 'room_status.room_status_moment' });
const built = dm._buildRoomStatusUpsertQuery([
{ ...buildBasePayload(), hotel_id: 2, room_id: '102', device_id: 'dev-b', ts_ms: 1000 },
{ ...buildBasePayload(), hotel_id: 1, room_id: '101', device_id: 'dev-a', ts_ms: 3000 },
{ ...buildBasePayload(), hotel_id: 2, room_id: '102', device_id: 'dev-b', ts_ms: 5000 },
], {
tableName: 'room_status.room_status_moment',
conflictColumns: ['hotel_id', 'room_id', 'device_id'],
includeGuid: true,
autoCreatePartitions: true,
tableRef: 'room_status.room_status_moment',
forceOnlineStatusOnWrite: false,
forceUpdateOnConflict: false,
sortAndDedupByConflictKey: true,
logPrefix: 'upsertRoomStatus',
});
assert.equal(built.uniqueEvents.length, 2);
assert.equal(built.uniqueEvents[0].hotel_id, 1);
assert.equal(built.uniqueEvents[0].room_id, '101');
assert.equal(built.uniqueEvents[0].device_id, 'dev-a');
assert.equal(built.uniqueEvents[1].hotel_id, 2);
assert.equal(built.uniqueEvents[1].room_id, '102');
assert.equal(built.uniqueEvents[1].device_id, 'dev-b');
assert.equal(built.uniqueEvents[1].ts_ms, 5000);
});
it('retries old room_status on deadlock', async () => {
const dm = new DatabaseManager({ host: 'x', port: 5432, user: 'x', password: 'x', database: 'x', roomStatusTable: 'room_status.room_status_moment' });
let calls = 0;
dm.pool = {
query: async () => {
calls += 1;
if (calls < 3) {
const err = new Error('deadlock detected');
err.code = '40P01';
throw err;
}
return { rowCount: 1 };
},
};
dm._sleep = async () => {};
const result = await dm._upsertRoomStatusToTarget([buildBasePayload()], {
tableName: 'room_status.room_status_moment',
conflictColumns: ['hotel_id', 'room_id', 'device_id'],
includeGuid: true,
autoCreatePartitions: true,
tableRef: 'room_status.room_status_moment',
forceOnlineStatusOnWrite: false,
forceUpdateOnConflict: false,
sortAndDedupByConflictKey: true,
deadlockRetryAttempts: 3,
deadlockRetryBaseDelayMs: 1,
logPrefix: 'upsertRoomStatus',
});
assert.equal(result.rowCount, 1);
assert.equal(calls, 3);
});
});
describe('DatabaseManager: insertHeartbeatEventsDual', () => {
it('returns empty results when both targets disabled', async () => {
const dm = new DatabaseManager({
host: 'x', port: 5432, user: 'x', password: 'x', database: 'x',
legacyHeartbeatEnabled: false, g4HotHeartbeatEnabled: false,
});
const result = await dm.insertHeartbeatEventsDual([buildBasePayload()]);
assert.equal(result.legacy.enabled, false);
assert.equal(result.g4Hot.enabled, false);
assert.equal(result.legacy.insertedCount, 0);
assert.equal(result.g4Hot.insertedCount, 0);
});
});