Files
Web_BLS_Heartbeat_Server/scripts/db/smokeG4Hot.js
XuJiacheng 43fa7505e5 feat: 新增 G4 热表独立双写能力
- 新增配置项以支持旧/新明细表的独立写入开关及目标表名。
- 重构 DatabaseManager,抽象通用批量 COPY 写入内核,支持不同目标表的复用。
- 新增双明细写入编排器,支持旧/新表独立执行、重试及 fallback。
- 调整 HeartbeatProcessor.processBatch(),确保 room_status 独立执行。
- 错误表仅记录新表写入失败,旧表失败不再写入错误表。
- 重新定义消费暂停策略,基于当前启用的关键 sink 判断。
- 补充按 sink 维度的统计项与启动日志。

新增 G4 热表相关的数据库规范与处理逻辑,确保系统在双写模式下的稳定性与可扩展性。
2026-03-09 15:49:12 +08:00

183 lines
6.2 KiB
JavaScript

import { Client } from 'pg';
import config from '../../src/config/config.js';
import { DatabaseManager } from '../../src/db/databaseManager.js';
/** Fallback target used when config.db.g4HotTable is not set. */
const DEFAULT_G4_HOT_TABLE = 'heartbeat.heartbeat_events_g4_hot';

/** device_id stamped on the synthetic row; readback and cleanup filter on it. */
const SMOKE_DEVICE_ID = 'smoke-g4hot-test';

/**
 * Builds the single synthetic heartbeat event written by the smoke test.
 *
 * The values are chosen so the readback step can print them next to the
 * expectations it logs: service_mask 7 (svc_01..svc_03 expected true, svc_04
 * false), three air addresses (two numbered columns plus a residual), two
 * electric addresses, and two numeric fields carried inside `extra`.
 *
 * @param {number} tsMs - Timestamp in milliseconds stored in `ts_ms`.
 * @returns {object} Event object passed to the DatabaseManager insert kernel.
 */
function buildSmokeEvent(tsMs) {
  return {
    ts_ms: tsMs,
    hotel_id: 1,
    room_id: '101',
    device_id: SMOKE_DEVICE_ID,
    ip: '10.0.0.1',
    power_state: 1,
    guest_type: 0,
    cardless_state: 0,
    service_mask: 7, // bits 0,1,2
    pms_state: 1,
    carbon_state: 0,
    device_count: 2,
    comm_seq: 1,
    insert_card: null,
    bright_g: 50,
    version: 1,
    elec_address: ['e1', 'e2'],
    air_address: ['ac1', 'ac2', 'ac3'],
    voltage: [220.1, 220.2],
    ampere: [1.1, 1.2],
    power: [242.0, 264.0],
    phase: ['A', 'B'],
    energy: [10.5, 11.5],
    sum_energy: [100.1, 200.2],
    state: [1, 2, 3],
    model: [1, 1, 2],
    speed: [3, 3, 1],
    set_temp: [26, 25, 24],
    now_temp: [27, 26, 25],
    solenoid_valve: [1, 0, 1],
    extra: { source: 'smoke-g4hot', power_carbon_on: 1.5, power_person_exist: 2.5 },
  };
}

/**
 * Smoke test for the G4 hot heartbeat table.
 *
 * Steps: verify the target table exists and report its relkind, list its
 * partitions and indexes, write one synthetic event through DatabaseManager,
 * read the row back, delete it, and print the relevant config switches.
 *
 * Every step operates on the same table: `config.db.g4HotTable`, falling back
 * to DEFAULT_G4_HOT_TABLE. A missing table or a failed write sets
 * `process.exitCode = 1` so callers can detect the failure from the exit
 * status. Both database connections are released on every path.
 *
 * @returns {Promise<void>} Resolves when the run finishes; rejects on
 *   unexpected errors (connection failure, catalog query failure, ...).
 */
async function main() {
  const targetTable = config.db.g4HotTable ?? DEFAULT_G4_HOT_TABLE;

  const client = new Client({
    host: config.db.host,
    port: config.db.port,
    user: config.db.user,
    password: config.db.password,
    database: config.db.database,
  });
  await client.connect();

  // Stays null until the manager has connected, so the finally block only
  // disconnects something that was actually opened.
  let dm = null;

  try {
    console.log('=== G4 Hot Smoke Test ===\n');

    // 1 + 2. Check that the table exists and fetch its relkind in one catalog
    // lookup. to_regclass() yields NULL for an unknown relation, so a missing
    // table simply produces zero rows.
    const parent = await client.query(
      `SELECT n.nspname AS nspname,
              c.relname AS relname,
              c.relkind AS kind,
              c.oid::regclass::text AS sql_name
         FROM pg_class c
         JOIN pg_namespace n ON n.oid = c.relnamespace
        WHERE c.oid = to_regclass($1)`,
      [targetTable]
    );
    if (parent.rowCount === 0) {
      console.error(`FAIL: ${targetTable} 表不存在`);
      process.exitCode = 1;
      return;
    }
    // sql_name is the server-rendered (quoted/qualified as needed) form of the
    // table name, which makes it safe to splice into the statements below.
    const { nspname, relname, kind, sql_name: sqlName } = parent.rows[0];
    console.log(`[OK] 新表 ${relname} 已存在`);
    console.log('[INFO] 父表 relkind:', kind, kind === 'p' ? '(分区表)' : '(非分区表)');

    // 3. List existing partitions (children of the target in pg_inherits).
    const partitions = await client.query(
      `SELECT c.relname AS partition
         FROM pg_inherits i
         JOIN pg_class c ON c.oid = i.inhrelid
        WHERE i.inhparent = to_regclass($1)
        ORDER BY c.relname`,
      [targetTable]
    );
    console.log('[INFO] 现有分区:', partitions.rows.map((r) => r.partition));

    // 4. List the indexes defined on the target table.
    const indexes = await client.query(
      `SELECT indexname, indexdef FROM pg_indexes
        WHERE schemaname = $1 AND tablename = $2
        ORDER BY indexname`,
      [nspname, relname]
    );
    console.log('[INFO] 索引:');
    for (const r of indexes.rows) {
      console.log(' -', r.indexname);
    }

    // 5. Verify the COPY write path through DatabaseManager.
    const manager = new DatabaseManager({
      ...config.db,
      maxConnections: 1,
      g4HotHeartbeatEnabled: true,
    });
    await manager.connect();
    dm = manager;

    const ts = Date.now();
    const testEvent = buildSmokeEvent(ts);
    try {
      const result = await dm._insertEventsToTarget([testEvent], {
        tableName: targetTable,
        columns: dm._getG4HotColumns(),
        toRowValues: (e) => dm._g4HotToRowValues(e),
        ensurePartitions: false,
        logPrefix: '[g4hot-smoke]',
        missingPartitionTable: null,
      });
      if (result.success) {
        console.log(`[OK] G4 Hot COPY 写入成功: ${result.insertedCount}`);
      } else {
        // Keep going so readback/cleanup still run, but report the failure
        // through the exit status.
        process.exitCode = 1;
        console.error('[FAIL] G4 Hot COPY 写入失败:', result.error?.message);
        if (result.error?.message?.includes('no partition')) {
          console.error(' 说明: 新表当日分区尚未创建,请先创建分区');
        }
      }
    } catch (err) {
      process.exitCode = 1;
      console.error('[FAIL] G4 Hot 写入异常:', err.message);
    }

    // 6. Read the row back and print each derived column next to its
    // expected value.
    try {
      const readback = await client.query(
        `SELECT svc_01, svc_02, svc_03, svc_04,
                air_address_1, air_address_2, air_address_residual,
                elec_address_1, elec_address_2,
                power_carbon_on, power_person_exist, guid
           FROM ${sqlName}
          WHERE device_id = $1 AND ts_ms = $2`,
        [SMOKE_DEVICE_ID, ts]
      );
      if (readback.rowCount > 0) {
        const row = readback.rows[0];
        console.log('[OK] 数据回读成功:');
        console.log(' svc_01:', row.svc_01, '(期望 true)');
        console.log(' svc_02:', row.svc_02, '(期望 true)');
        console.log(' svc_03:', row.svc_03, '(期望 true)');
        console.log(' svc_04:', row.svc_04, '(期望 false)');
        console.log(' air_address_1:', row.air_address_1, '(期望 ac1)');
        console.log(' air_address_2:', row.air_address_2, '(期望 ac2)');
        console.log(' air_address_residual:', row.air_address_residual, '(期望 [ac3])');
        console.log(' elec_address_1:', row.elec_address_1, '(期望 e1)');
        console.log(' elec_address_2:', row.elec_address_2, '(期望 e2)');
        console.log(' power_carbon_on:', row.power_carbon_on, '(期望 1.5)');
        console.log(' power_person_exist:', row.power_person_exist, '(期望 2.5)');
        console.log(' guid:', row.guid, '(期望 32 位无连接符小写)');
      } else {
        console.warn('[WARN] 未读回数据(可能因分区缺失而写入失败)');
      }
    } catch (err) {
      console.warn('[WARN] 数据回读失败:', err.message);
    }

    // 7. Remove the synthetic row from the same table it was written to.
    try {
      await client.query(
        `DELETE FROM ${sqlName} WHERE device_id = $1 AND ts_ms = $2`,
        [SMOKE_DEVICE_ID, ts]
      );
      console.log('[OK] 测试数据已清理');
    } catch (err) {
      console.warn('[WARN] 清理失败:', err.message);
    }

    // 8. Print the configuration switches relevant to dual-write.
    console.log('\n=== 当前配置 ===');
    console.log(' legacyHeartbeatEnabled:', config.db.legacyHeartbeatEnabled);
    console.log(' g4HotHeartbeatEnabled:', config.db.g4HotHeartbeatEnabled);
    console.log(' roomStatusEnabled:', config.db.roomStatusEnabled);
    console.log(' legacyTable:', config.db.legacyTable);
    console.log(' g4HotTable:', config.db.g4HotTable);
  } finally {
    // Release both connections whether the run completed, returned early, or
    // threw; the nested finally guarantees client.end() even if the manager's
    // disconnect rejects.
    try {
      if (dm) {
        await dm.disconnect();
      }
    } finally {
      await client.end();
    }
  }

  console.log('\n=== Smoke Test 完成 ===');
}
/**
 * Last-resort handler for a rejected smoke-test run: logs the failure and
 * terminates the process with a non-zero status.
 *
 * @param {unknown} reason - The rejection value from main().
 */
function abortOnFailure(reason) {
  console.error('smoke test failed:', reason);
  process.exit(1);
}

// Script entry point: start the smoke test and route any rejection to the
// handler above.
main().catch(abortOnFailure);