import assert from 'node:assert/strict';
import { HeartbeatProcessor } from '../src/processor/heartbeatProcessor.js';
import { DatabaseManager } from '../src/db/databaseManager.js';

// ---- Helpers ----

/**
 * Creates a fresh heartbeat payload used as the baseline input of every test.
 *
 * A new object is returned on each call, so a test may spread or mutate its
 * copy without affecting other tests.
 *
 * @returns {object} Heartbeat payload with fixed field values.
 */
const buildBasePayload = () => ({
  ts_ms: 1700000000123,
  hotel_id: 1,
  room_id: '101',
  device_id: 'dev1',
  ip: '10.0.0.1',
  power_state: 1,
  guest_type: 0,
  cardless_state: 0,
  service_mask: 7,
  pms_state: 1,
  carbon_state: 0,
  device_count: 1,
  comm_seq: 1,
});
/**
 * Builds a fake write result with one entry per target (`legacy`, `g4Hot`,
 * `g5`), in the shape the mocked `insertHeartbeatEventsDual` returns in
 * these tests.
 *
 * For every target: when its `*Success` flag is true the entry reports
 * `insertedCount`, an empty `failedRecords` list and null errors; otherwise it
 * reports `insertedCount: 0`, the shared `failedRecords` list and two separate
 * `Error` instances (`error` and `batchError`) carrying the target's message.
 *
 * @param {object} [options]
 * @param {boolean} [options.legacyEnabled=true]
 * @param {boolean} [options.legacySuccess=true]
 * @param {boolean} [options.legacyConn=false] Value of `legacy.isConnectionError`.
 * @param {boolean} [options.g4HotEnabled=false]
 * @param {boolean} [options.g4HotSuccess=true]
 * @param {boolean} [options.g4HotConn=false] Value of `g4Hot.isConnectionError`.
 * @param {boolean} [options.g5Enabled=false]
 * @param {boolean} [options.g5Success=true]
 * @param {boolean} [options.g5Conn=false] Value of `g5.isConnectionError`.
 * @param {number} [options.insertedCount=1] Count reported by successful targets.
 * @param {Array} [options.failedRecords=[]] Records reported by failed targets.
 * @returns {{legacy: object, g4Hot: object, g5: object}}
 */
function makeDualResult({
  legacyEnabled = true, legacySuccess = true, legacyConn = false,
  g4HotEnabled = false, g4HotSuccess = true, g4HotConn = false,
  g5Enabled = false, g5Success = true, g5Conn = false,
  insertedCount = 1, failedRecords = [],
} = {}) {
  // One builder for all three targets keeps their shapes in lock-step.
  const buildTarget = (enabled, success, isConnectionError, failMessage) => ({
    enabled,
    success,
    insertedCount: success ? insertedCount : 0,
    failedRecords: success ? [] : failedRecords,
    error: success ? null : new Error(failMessage),
    isConnectionError,
    batchError: success ? null : new Error(failMessage),
  });

  return {
    legacy: buildTarget(legacyEnabled, legacySuccess, legacyConn, 'legacy fail'),
    g4Hot: buildTarget(g4HotEnabled, g4HotSuccess, g4HotConn, 'g4hot fail'),
    g5: buildTarget(g5Enabled, g5Success, g5Conn, 'g5 fail'),
  };
}
/**
 * Creates a stand-in for the main database manager passed to the processor.
 *
 * Each method defaults to a succeeding async stub and can be replaced through
 * `overrides`; `overrides.config` is merged over the default config.
 *
 * @param {object} [overrides] Optional `config` plus replacement methods
 *   (`insertHeartbeatEventsDual`, `insertHeartbeatEventsErrors`,
 *   `upsertRoomStatus`, `insertHeartbeatEvents`, `checkConnection`).
 * @returns {object} Mock database manager.
 */
function buildMockDb(overrides = {}) {
  const config = {
    legacyHeartbeatEnabled: true,
    g4HotHeartbeatEnabled: false,
    roomStatusEnabled: true,
    legacyTable: 'heartbeat.heartbeat_events',
    g4HotTable: 'heartbeat.heartbeat_events_g4_hot',
    roomStatusTable: 'room_status.room_status_moment',
    ...overrides.config,
  };

  return {
    config,
    insertHeartbeatEventsDual: overrides.insertHeartbeatEventsDual ?? (async () => makeDualResult()),
    insertHeartbeatEventsErrors: overrides.insertHeartbeatEventsErrors ?? (async () => {}),
    upsertRoomStatus: overrides.upsertRoomStatus ?? (async () => ({ rowCount: 1 })),
    insertHeartbeatEvents: overrides.insertHeartbeatEvents ?? (async () => ({ insertedCount: 1 })),
    checkConnection: overrides.checkConnection ?? (async () => true),
  };
}
/**
 * Creates a stand-in for the G5 database manager.
 *
 * Each method defaults to a succeeding stub and can be replaced through
 * `overrides`; `overrides.config` is merged over the default config.
 *
 * @param {object} [overrides] Optional `config` plus replacement methods
 *   (`insertHeartbeatEventsG5`, `upsertRoomStatusG5`, `_isDbConnectionError`).
 * @returns {object} Mock G5 database manager.
 */
function buildMockG5Db(overrides = {}) {
  // Default G5 insert outcome: one row written, nothing failed.
  const defaultInsertG5 = async () => ({
    enabled: true,
    success: true,
    insertedCount: 1,
    failedRecords: [],
    error: null,
    isConnectionError: false,
    batchError: null,
  });

  return {
    config: {
      enabled: true,
      g5HeartbeatEnabled: true,
      g5Table: 'heartbeat.heartbeat_events_g5',
      roomStatusEnabled: true,
      roomStatusTable: 'room_status.room_status_moment_g5',
      ...overrides.config,
    },
    insertHeartbeatEventsG5: overrides.insertHeartbeatEventsG5 ?? defaultInsertG5,
    upsertRoomStatusG5: overrides.upsertRoomStatusG5 ?? (async () => ({ rowCount: 1 })),
    _isDbConnectionError: overrides._isDbConnectionError ?? (() => false),
  };
}
/**
 * Builds a `HeartbeatProcessor` wired to mock database managers.
 *
 * @param {object} [dbOverrides] Overrides forwarded to `buildMockDb`.
 * @param {object} [processorConfig] Merged over the default processor config
 *   (`batchSize: 1`, `batchTimeout: 1000`).
 * @param {object|null} [g5Overrides] When truthy, a mock G5 manager built by
 *   `buildMockG5Db` is injected as `deps.g5DatabaseManager`; otherwise the
 *   processor receives an empty `deps` object.
 * @returns {HeartbeatProcessor}
 */
function buildProcessor(dbOverrides = {}, processorConfig = {}, g5Overrides = null) {
  const db = buildMockDb(dbOverrides);
  const deps = g5Overrides ? { g5DatabaseManager: buildMockG5Db(g5Overrides) } : {};

  return new HeartbeatProcessor(
    { batchSize: 1, batchTimeout: 1000, ...processorConfig },
    db,
    deps
  );
}

// ---- Tests ----

// Scenario: only the legacy detail table is enabled. The processor is
// expected to hand the single parsed event to insertHeartbeatEventsDual and
// report one inserted row.
describe('Dual-write: 仅旧表开启', () => {
  it('should call insertHeartbeatEventsDual and succeed', async () => {
    let captured = null;
    const processor = buildProcessor({
      config: { legacyHeartbeatEnabled: true, g4HotHeartbeatEnabled: false },
      insertHeartbeatEventsDual: async (events) => {
        captured = events;
        return makeDualResult({ legacyEnabled: true, g4HotEnabled: false });
      },
    });

    const msg = { value: Buffer.from(JSON.stringify(buildBasePayload()), 'utf8') };
    const res = await processor.processMessage(msg);

    assert.deepEqual(res, { insertedCount: 1 });
    assert.ok(Array.isArray(captured));
    assert.equal(captured.length, 1);
  });
});
// Scenario: only the new (g4hot) detail table is enabled. The dual-insert
// entry point must still be the one that gets called.
describe('Dual-write: 仅新表开启', () => {
  it('should call insertHeartbeatEventsDual with g4hot enabled', async () => {
    let dualCalled = false;
    const processor = buildProcessor({
      config: { legacyHeartbeatEnabled: false, g4HotHeartbeatEnabled: true },
      insertHeartbeatEventsDual: async () => {
        dualCalled = true;
        return makeDualResult({ legacyEnabled: false, g4HotEnabled: true });
      },
    });

    const msg = { value: Buffer.from(JSON.stringify(buildBasePayload()), 'utf8') };
    await processor.processMessage(msg);

    assert.ok(dualCalled);
  });
});
// Scenario: legacy and g4hot detail tables are both enabled.
describe('Dual-write: 旧新双开', () => {
  it('should call insertHeartbeatEventsDual with both enabled', async () => {
    let dualCalled = false;
    const processor = buildProcessor({
      config: { legacyHeartbeatEnabled: true, g4HotHeartbeatEnabled: true },
      insertHeartbeatEventsDual: async () => {
        dualCalled = true;
        return makeDualResult({ legacyEnabled: true, g4HotEnabled: true });
      },
    });

    const msg = { value: Buffer.from(JSON.stringify(buildBasePayload()), 'utf8') };
    await processor.processMessage(msg);

    assert.ok(dualCalled);
  });
});
// Scenario: both detail tables are disabled while room_status stays enabled.
// The room_status upsert must still run.
describe('Dual-write: 旧新双关,room_status 仍执行', () => {
  it('should still call upsertRoomStatus when both detail writes are off', async () => {
    let roomStatusCalled = false;
    const processor = buildProcessor({
      config: { legacyHeartbeatEnabled: false, g4HotHeartbeatEnabled: false, roomStatusEnabled: true },
      insertHeartbeatEventsDual: async () => makeDualResult({ legacyEnabled: false, g4HotEnabled: false }),
      upsertRoomStatus: async () => {
        roomStatusCalled = true;
        return { rowCount: 1 };
      },
    });

    const msg = { value: Buffer.from(JSON.stringify(buildBasePayload()), 'utf8') };
    await processor.processMessage(msg);

    // Give the asynchronous room_status call time to finish.
    await new Promise((resolve) => setTimeout(resolve, 50));
    assert.ok(roomStatusCalled);
  });
});
// Scenario: the legacy write succeeds and the g4hot write fails. Exactly one
// row, tagged 'g4hot_write_failed', is expected in the error table.
describe('Dual-write: 旧成功新失败', () => {
  it('should continue and only write g4hot failures to error table', async () => {
    let errorTablePayload = null;
    const failedRecord = { error: new Error('g4hot column mismatch'), record: buildBasePayload() };
    const processor = buildProcessor({
      config: { legacyHeartbeatEnabled: true, g4HotHeartbeatEnabled: true },
      insertHeartbeatEventsDual: async () => makeDualResult({
        legacyEnabled: true,
        legacySuccess: true,
        g4HotEnabled: true,
        g4HotSuccess: false,
        failedRecords: [failedRecord],
      }),
      insertHeartbeatEventsErrors: async (payload) => {
        errorTablePayload = payload;
      },
    });

    const msg = { value: Buffer.from(JSON.stringify(buildBasePayload()), 'utf8') };
    await processor.processMessage(msg);
    // Give the asynchronous error-table write time to finish.
    await new Promise((resolve) => setTimeout(resolve, 50));

    assert.ok(errorTablePayload);
    assert.equal(errorTablePayload.length, 1);
    assert.equal(errorTablePayload[0].error_code, 'g4hot_write_failed');
  });
});
// Scenario: the legacy write fails and the g4hot write succeeds. Legacy
// failures must not be recorded in the error table.
describe('Dual-write: 旧失败新成功', () => {
  it('should NOT write legacy failures to error table', async () => {
    let errorTableCalled = false;
    const failedRecord = { error: new Error('legacy column mismatch'), record: buildBasePayload() };
    const processor = buildProcessor({
      config: { legacyHeartbeatEnabled: true, g4HotHeartbeatEnabled: true },
      insertHeartbeatEventsDual: async () => makeDualResult({
        legacyEnabled: true,
        legacySuccess: false,
        failedRecords: [failedRecord],
        g4HotEnabled: true,
        g4HotSuccess: true,
      }),
      insertHeartbeatEventsErrors: async () => {
        errorTableCalled = true;
      },
    });

    const msg = { value: Buffer.from(JSON.stringify(buildBasePayload()), 'utf8') };
    await processor.processMessage(msg);
    // Wait long enough for a (wrong) asynchronous error-table write to show up.
    await new Promise((resolve) => setTimeout(resolve, 50));

    assert.equal(errorTableCalled, false);
  });
});
// Scenario: legacy and g4hot writes both fail. Only the g4hot failures may
// end up in the error table.
describe('Dual-write: 双失败', () => {
  it('should only write g4hot failures to error table, not legacy failures', async () => {
    let errorTablePayload = null;
    const failedRecord = { error: new Error('fail'), record: buildBasePayload() };
    const processor = buildProcessor({
      config: { legacyHeartbeatEnabled: true, g4HotHeartbeatEnabled: true },
      // makeDualResult shares one failedRecords list between all failing
      // targets, so the key is passed once (it used to be listed twice).
      insertHeartbeatEventsDual: async () => makeDualResult({
        legacyEnabled: true,
        legacySuccess: false,
        g4HotEnabled: true,
        g4HotSuccess: false,
        failedRecords: [failedRecord],
      }),
      insertHeartbeatEventsErrors: async (payload) => {
        errorTablePayload = payload;
      },
    });

    const msg = { value: Buffer.from(JSON.stringify(buildBasePayload()), 'utf8') };
    await processor.processMessage(msg);
    // Give the asynchronous error-table write time to finish.
    await new Promise((resolve) => setTimeout(resolve, 50));

    // The error table must contain g4hot failures only.
    assert.ok(errorTablePayload);
    assert.equal(errorTablePayload[0].error_code, 'g4hot_write_failed');
    // Check every row, not just the first, so a stray legacy row is caught.
    assert.ok(errorTablePayload.every((row) => row.error_code === 'g4hot_write_failed'));
  });
});
// Scenario: the detail write fails, yet the room_status upsert must still
// be invoked.
describe('Dual-write: room_status 始终执行', () => {
  it('should call upsertRoomStatus regardless of detail write success/failure', async () => {
    let roomStatusCalled = false;
    const processor = buildProcessor({
      config: { legacyHeartbeatEnabled: true, g4HotHeartbeatEnabled: false, roomStatusEnabled: true },
      insertHeartbeatEventsDual: async () => makeDualResult({
        legacyEnabled: true,
        legacySuccess: false,
        failedRecords: [{ error: new Error('fail'), record: buildBasePayload() }],
      }),
      upsertRoomStatus: async () => {
        roomStatusCalled = true;
        return { rowCount: 1 };
      },
    });

    const msg = { value: Buffer.from(JSON.stringify(buildBasePayload()), 'utf8') };
    await processor.processMessage(msg);
    // Give the asynchronous room_status call time to finish.
    await new Promise((resolve) => setTimeout(resolve, 50));

    assert.ok(roomStatusCalled);
  });
});
// room_status is written to two places: the old table through the main
// database mock and the g5 table through the g5 mock. Each side follows its
// own `roomStatusEnabled` flag.
describe('RoomStatus dual-write', () => {
  it('should write old and g5 room_status independently', async () => {
    let oldCalled = false;
    let g5Called = false;
    const processor = buildProcessor(
      {
        config: { legacyHeartbeatEnabled: true, g4HotHeartbeatEnabled: false, roomStatusEnabled: true },
        insertHeartbeatEventsDual: async () => makeDualResult({ legacyEnabled: true, g4HotEnabled: false }),
        upsertRoomStatus: async () => {
          oldCalled = true;
          return { rowCount: 1 };
        },
      },
      {},
      {
        config: { roomStatusEnabled: true },
        upsertRoomStatusG5: async () => {
          g5Called = true;
          return { rowCount: 1 };
        },
      }
    );

    const msg = { value: Buffer.from(JSON.stringify(buildBasePayload()), 'utf8') };
    await processor.processMessage(msg);
    // Give both asynchronous room_status calls time to finish.
    await new Promise((resolve) => setTimeout(resolve, 50));

    assert.equal(oldCalled, true);
    assert.equal(g5Called, true);
  });

  it('should allow old room_status off while g5 room_status stays on', async () => {
    let oldCalled = false;
    let g5Called = false;
    const processor = buildProcessor(
      {
        config: { legacyHeartbeatEnabled: true, g4HotHeartbeatEnabled: false, roomStatusEnabled: false },
        insertHeartbeatEventsDual: async () => makeDualResult({ legacyEnabled: true, g4HotEnabled: false }),
        upsertRoomStatus: async () => {
          oldCalled = true;
          return { rowCount: 1 };
        },
      },
      {},
      {
        config: { roomStatusEnabled: true },
        upsertRoomStatusG5: async () => {
          g5Called = true;
          return { rowCount: 1 };
        },
      }
    );

    const msg = { value: Buffer.from(JSON.stringify(buildBasePayload()), 'utf8') };
    await processor.processMessage(msg);
    // Give the asynchronous room_status calls time to finish.
    await new Promise((resolve) => setTimeout(resolve, 50));

    assert.equal(oldCalled, false);
    assert.equal(g5Called, true);
  });
});
// The g5 write goes through its own database manager. It must receive the
// events, and a g5 failure must not change the result of the main flow.
describe('G5-write: 独立写库', () => {
  it('should write to g5 independently after legacy/g4 succeed', async () => {
    let g5Captured = null;
    const processor = buildProcessor(
      {
        config: { legacyHeartbeatEnabled: true, g4HotHeartbeatEnabled: true },
        insertHeartbeatEventsDual: async () => makeDualResult({ legacyEnabled: true, g4HotEnabled: true }),
      },
      {},
      {
        insertHeartbeatEventsG5: async (events) => {
          g5Captured = events;
          return {
            enabled: true,
            success: true,
            insertedCount: events.length,
            failedRecords: [],
            error: null,
            isConnectionError: false,
            batchError: null,
          };
        },
      }
    );

    const msg = { value: Buffer.from(JSON.stringify(buildBasePayload()), 'utf8') };
    const res = await processor.processMessage(msg);

    assert.deepEqual(res, { insertedCount: 1 });
    assert.ok(Array.isArray(g5Captured));
    assert.equal(g5Captured.length, 1);
  });

  it('should not block main flow when g5 write fails', async () => {
    const processor = buildProcessor(
      {
        config: { legacyHeartbeatEnabled: true, g4HotHeartbeatEnabled: false },
        insertHeartbeatEventsDual: async () => makeDualResult({ legacyEnabled: true, g4HotEnabled: false }),
      },
      {},
      {
        insertHeartbeatEventsG5: async () => ({
          enabled: true,
          success: false,
          insertedCount: 0,
          failedRecords: [{ error: new Error('g5 fail'), record: buildBasePayload() }],
          error: new Error('g5 fail'),
          isConnectionError: false,
          batchError: new Error('g5 fail'),
        }),
      }
    );

    const msg = { value: Buffer.from(JSON.stringify(buildBasePayload()), 'utf8') };
    const res = await processor.processMessage(msg);

    // The main result still reports the legacy insert.
    assert.deepEqual(res, { insertedCount: 1 });
  });
});
// Pause-consumption policy. These cases check when a database connection
// error should pause consumption, either end-to-end through the
// `onDbOffline` hook or directly through `_shouldPauseConsumption(legacy, g4Hot)`.
describe('Dual-write: 暂停消费策略', () => {
  it('should NOT pause when both detail writes disabled but room_status enabled', async () => {
    let paused = false;
    const processor = buildProcessor({
      config: { legacyHeartbeatEnabled: false, g4HotHeartbeatEnabled: false, roomStatusEnabled: true },
      insertHeartbeatEventsDual: async () => makeDualResult({ legacyEnabled: false, g4HotEnabled: false }),
    });
    processor.onDbOffline = () => {
      paused = true;
    };

    const msg = { value: Buffer.from(JSON.stringify(buildBasePayload()), 'utf8') };
    await processor.processMessage(msg);

    assert.equal(paused, false);
  });

  it('should pause when only legacy enabled and legacy has connection error', () => {
    const processor = buildProcessor({});
    const result = processor._shouldPauseConsumption(
      { enabled: true, isConnectionError: true },
      { enabled: false, isConnectionError: false }
    );
    assert.ok(result);
  });

  it('should NOT pause in dual-write when only g4hot has connection error', async () => {
    let paused = false;
    const processor = buildProcessor({
      config: { legacyHeartbeatEnabled: true, g4HotHeartbeatEnabled: true },
      insertHeartbeatEventsDual: async () => makeDualResult({
        legacyEnabled: true,
        legacySuccess: true,
        g4HotEnabled: true,
        g4HotSuccess: false,
        g4HotConn: true,
      }),
    });
    processor.onDbOffline = () => {
      paused = true;
    };

    const msg = { value: Buffer.from(JSON.stringify(buildBasePayload()), 'utf8') };
    await processor.processMessage(msg);

    assert.equal(paused, false);
  });

  it('should pause in dual-write when both have connection errors', () => {
    const processor = buildProcessor({});
    const result = processor._shouldPauseConsumption(
      { enabled: true, isConnectionError: true },
      { enabled: true, isConnectionError: true }
    );
    assert.ok(result);
  });

  it('should NOT pause when only g4hot has connection error in dual mode', () => {
    const processor = buildProcessor({});
    const result = processor._shouldPauseConsumption(
      { enabled: true, isConnectionError: false },
      { enabled: true, isConnectionError: true }
    );
    assert.equal(result, false);
  });

  it('should pause when only g4hot enabled and g4hot has connection error', () => {
    const processor = buildProcessor({});
    const result = processor._shouldPauseConsumption(
      { enabled: false, isConnectionError: false },
      { enabled: true, isConnectionError: true }
    );
    assert.ok(result);
  });
});
// Row-value mapping of DatabaseManager for the g4hot and g5 tables. Values
// are located through the index of the column name in the matching column
// list, so each assertion targets a named column (or a fixed offset from one).
describe('DatabaseManager: _g4HotToRowValues', () => {
  // Every case needs a manager with the same placeholder connection settings;
  // only the pure mapping helpers are called on it.
  const createManager = () => new DatabaseManager({ host: 'x', port: 5432, user: 'x', password: 'x', database: 'x' });

  it('unpacks svc booleans from service_mask', () => {
    const dm = createManager();
    const e = { ...buildBasePayload(), service_mask: 7 }; // bits 0,1,2 set
    const values = dm._g4HotToRowValues(e);
    const cols = dm._getG4HotColumns();
    const svc01Idx = cols.indexOf('svc_01');

    assert.equal(values[svc01Idx], true); // bit 0
    assert.equal(values[svc01Idx + 1], true); // bit 1 (svc_02)
    assert.equal(values[svc01Idx + 2], true); // bit 2 (svc_03)
    assert.equal(values[svc01Idx + 3], false); // bit 3 (svc_04)
    assert.equal(values[svc01Idx + 63], false); // svc_64
  });

  it('unpacks array fields into _1, _2, _residual', () => {
    const dm = createManager();
    const e = {
      ...buildBasePayload(),
      air_address: ['ac1', 'ac2', 'ac3'],
      state: [1, 2, 3],
      elec_address: ['e1'],
      voltage: [220.5],
    };
    const values = dm._g4HotToRowValues(e);
    const cols = dm._getG4HotColumns();

    const airAddr1Idx = cols.indexOf('air_address_1');
    assert.equal(values[airAddr1Idx], 'ac1');
    assert.equal(values[airAddr1Idx + 1], 'ac2'); // air_address_2
    assert.deepEqual(values[airAddr1Idx + 2], ['ac3']); // air_address_residual

    const state1Idx = cols.indexOf('state_1');
    assert.equal(values[state1Idx], 1);
    assert.equal(values[state1Idx + 1], 2);
    assert.deepEqual(values[state1Idx + 2], [3]);

    const elecAddr1Idx = cols.indexOf('elec_address_1');
    assert.equal(values[elecAddr1Idx], 'e1');
    assert.equal(values[elecAddr1Idx + 1], null); // only 1 element
    assert.equal(values[elecAddr1Idx + 2], null); // no residual
  });

  it('produces correct column count', () => {
    const dm = createManager();
    const e = buildBasePayload();
    const cols = dm._getG4HotColumns();
    const values = dm._g4HotToRowValues(e);

    assert.equal(values.length, cols.length);
  });

  it('writes power helper fields as null temporarily', () => {
    const dm = createManager();
    const e = {
      ...buildBasePayload(),
      extra: {
        power_carbon_on: 1.5,
        power_carbon_off: 2.5,
        power_person_exist: 3.5,
        power_person_left: 4.5,
      },
    };
    const cols = dm._getG4HotColumns();
    const values = dm._g4HotToRowValues(e);

    assert.equal(values[cols.indexOf('power_carbon_on')], null);
    assert.equal(values[cols.indexOf('power_carbon_off')], null);
    assert.equal(values[cols.indexOf('power_person_exist')], null);
    assert.equal(values[cols.indexOf('power_person_left')], null);
  });

  it('generates lowercase guid without hyphens when missing', () => {
    const dm = createManager();
    const e = buildBasePayload();
    const cols = dm._getG4HotColumns();
    const values = dm._g4HotToRowValues(e);
    const guid = values[cols.indexOf('guid')];

    assert.match(guid, /^[0-9a-f]{32}$/);
  });

  it('normalizes provided guid to lowercase without hyphens', () => {
    const dm = createManager();
    const e = { ...buildBasePayload(), guid: 'A0B1C2D3-E4F5-6789-ABCD-EF0123456789' };
    const cols = dm._getG4HotColumns();
    const values = dm._g4HotToRowValues(e);
    const guid = values[cols.indexOf('guid')];

    assert.equal(guid, 'a0b1c2d3e4f56789abcdef0123456789');
  });

  it('omits guid column and value for g5 rows', () => {
    const dm = createManager();
    const cols = dm._getG5Columns();
    const values = dm._g5ToRowValues(buildBasePayload());

    assert.equal(cols.includes('guid'), false);
    assert.equal(values.length, cols.length);
  });

  it('writes g5 base array columns as null', () => {
    const dm = createManager();
    const cols = dm._getG5Columns();
    const values = dm._g5ToRowValues({
      ...buildBasePayload(),
      service_mask: 7,
      elec_address: ['e1', 'e2'],
      air_address: ['ac1', 'ac2'],
      voltage: [220.5, 221.5],
      ampere: [1.1, 1.2],
      power: [100, 200],
      phase: ['A', 'B'],
      energy: [10, 20],
      sum_energy: [100, 200],
      state: [1, 0],
      model: [2, 3],
      speed: [1, 2],
      set_temp: [24, 25],
      now_temp: [26, 27],
      solenoid_valve: [1, 0],
      extra: { source: 'test' },
    });

    // The raw (packed) columns are expected to be written as null ...
    for (const column of ['service_mask', 'elec_address', 'air_address', 'voltage', 'ampere', 'power', 'phase', 'energy', 'sum_energy', 'state', 'model', 'speed', 'set_temp', 'now_temp', 'solenoid_valve', 'extra']) {
      assert.equal(values[cols.indexOf(column)], null);
    }

    // ... while the unpacked columns still carry the data.
    assert.equal(values[cols.indexOf('svc_01')], true);
    assert.equal(values[cols.indexOf('svc_02')], true);
    assert.equal(values[cols.indexOf('svc_03')], true);
    assert.equal(values[cols.indexOf('air_address_1')], 'ac1');
    assert.equal(values[cols.indexOf('elec_address_1')], 'e1');
  });
});
// _formatPgCol turns a JS value into its text form for PostgreSQL: booleans
// become 't' / 'f' and null becomes the two-character sequence \N.
describe('DatabaseManager: _formatPgCol', () => {
  it('formats boolean values correctly', () => {
    const dm = new DatabaseManager({ host: 'x', port: 5432, user: 'x', password: 'x', database: 'x' });

    assert.equal(dm._formatPgCol(true), 't');
    assert.equal(dm._formatPgCol(false), 'f');
    assert.equal(dm._formatPgCol(null), '\\N');
  });
});
// Inspects the SQL text and bound values produced by
// `_buildRoomStatusUpsertQuery` for the old and the g5 room_status tables.
describe('DatabaseManager: room_status upsert SQL', () => {
  // Manager whose roomStatusTable points at the g5 table.
  const createG5Manager = () => new DatabaseManager({
    host: 'x', port: 5432, user: 'x', password: 'x', database: 'x',
    roomStatusTable: 'room_status.room_status_moment_g5',
  });

  // Upsert options shared by all g5 cases; a fresh object per call so no test
  // can be affected by another one mutating it.
  const buildG5UpsertOptions = () => ({
    tableName: 'room_status.room_status_moment_g5',
    conflictColumns: ['hotel_id', 'room_id'],
    includeGuid: false,
    autoCreatePartitions: false,
    tableRef: 'room_status.room_status_moment_g5',
    forceOnlineStatusOnWrite: true,
    forceUpdateOnConflict: true,
    logPrefix: 'upsertRoomStatusG5',
  });

  it('does not update ts_ms in old room_status DO UPDATE SET', () => {
    const dm = new DatabaseManager({ host: 'x', port: 5432, user: 'x', password: 'x', database: 'x', roomStatusTable: 'room_status.room_status_moment' });
    const built = dm._buildRoomStatusUpsertQuery([buildBasePayload()], {
      tableName: 'room_status.room_status_moment',
      conflictColumns: ['hotel_id', 'room_id', 'device_id'],
      includeGuid: true,
      autoCreatePartitions: true,
      tableRef: 'room_status.room_status_moment',
      logPrefix: 'upsertRoomStatus',
    });

    assert.match(built.sql, /ON CONFLICT \(hotel_id, room_id, device_id\)/);
    assert.doesNotMatch(built.sql, /ts_ms = EXCLUDED\.ts_ms/);
  });

  it('uses hotel_id and room_id as conflict key for g5 room_status', () => {
    const dm = createG5Manager();
    const built = dm._buildRoomStatusUpsertQuery([buildBasePayload()], buildG5UpsertOptions());

    assert.match(built.sql, /ON CONFLICT \(hotel_id, room_id\)/);
    assert.doesNotMatch(built.sql, /ts_ms = EXCLUDED\.ts_ms/);
    // guid must not appear anywhere in the g5 statement.
    assert.doesNotMatch(built.sql, /guid/);
  });

  it('forces online_status to 1 for g5 room_status insert and update', () => {
    const dm = createG5Manager();
    const built = dm._buildRoomStatusUpsertQuery([buildBasePayload()], buildG5UpsertOptions());

    assert.match(built.sql, /INSERT INTO room_status\.room_status_moment_g5 \(.*online_status\)/s);
    assert.match(built.sql, /online_status = 1/);
    // online_status is the last inserted column, so its value is bound last.
    assert.equal(built.values[built.values.length - 1], 1);
  });

  it('always updates g5 room_status on conflict even when business fields are unchanged', () => {
    const dm = createG5Manager();
    const built = dm._buildRoomStatusUpsertQuery([buildBasePayload()], buildG5UpsertOptions());

    // Neither a ts_ms guard nor an IS DISTINCT FROM guard may restrict the update.
    assert.doesNotMatch(built.sql, /room_status\.room_status_moment_g5\.ts_ms <= EXCLUDED\.ts_ms/);
    assert.doesNotMatch(built.sql, /IS DISTINCT FROM EXCLUDED/);
  });
});
// With both detail targets switched off in the manager config,
// insertHeartbeatEventsDual must report both targets as disabled with zero
// inserted rows.
describe('DatabaseManager: insertHeartbeatEventsDual', () => {
  it('returns empty results when both targets disabled', async () => {
    const dm = new DatabaseManager({
      host: 'x', port: 5432, user: 'x', password: 'x', database: 'x',
      legacyHeartbeatEnabled: false, g4HotHeartbeatEnabled: false,
    });

    const result = await dm.insertHeartbeatEventsDual([buildBasePayload()]);

    assert.equal(result.legacy.enabled, false);
    assert.equal(result.g4Hot.enabled, false);
    assert.equal(result.legacy.insertedCount, 0);
    assert.equal(result.g4Hot.insertedCount, 0);
  });
});