Files
XuJiacheng cf61e8dac6 feat: 实现房间状态同步功能
- 新增 RoomStatusManager 类,负责管理房间状态快照表的数据库连接池及批量 Upsert 操作。
- 新增 StatusBatchProcessor 类,负责收集和合并房间状态更新,并定期将其写入数据库。
- 新增状态提取器 statusExtractor.js,从 Kafka 消息中提取并构建房间状态更新对象。
- 修改 index.js,初始化 RoomStatusManager 和 StatusBatchProcessor,并在 Kafka 消息处理流程中并行推送状态更新。
- 修改 processor/index.js,更新 processKafkaMessage 函数以支持状态提取和处理。
- 更新 kafkaPayload.js,修正 control_list 的提取逻辑,兼容 Kafka 实际传输中的 loop 字段。
- 添加状态批处理器和状态提取器的单元测试,确保功能的正确性。
- 更新文档 plan-room-status-sync.md,详细描述房间状态同步方案及字段映射。
2026-03-02 11:47:52 +08:00

12 lines
812 B
JavaScript

import pg from 'pg';
const pool = new pg.Pool({
host: '10.8.8.109', port: 5433, user: 'log_admin',
password: 'YourActualStrongPasswordForPostgres!', database: 'log_platform', max: 1
});
const s = await pool.query("SELECT count(*) as total, count(*) FILTER (WHERE dev_loops IS NOT NULL AND dev_loops != '{}'::jsonb) as with_loops, count(*) FILTER (WHERE sys_lock_status IS NOT NULL) as with_lock FROM room_status.room_status_moment");
console.log(JSON.stringify(s.rows[0]));
const r = await pool.query("SELECT hotel_id, room_id, device_id, sys_lock_status, dev_loops FROM room_status.room_status_moment WHERE dev_loops IS NOT NULL AND dev_loops != '{}'::jsonb ORDER BY ts_ms DESC LIMIT 3");
console.log('Samples:', r.rows.length);
for (const row of r.rows) console.log(JSON.stringify(row));
await pool.end();