Files
XuJiacheng cf61e8dac6 feat: 实现房间状态同步功能
- 新增 RoomStatusManager 类,负责管理房间状态快照表的数据库连接池及批量 Upsert 操作。
- 新增 StatusBatchProcessor 类,负责收集和合并房间状态更新,并定期将其写入数据库。
- 新增状态提取器 statusExtractor.js,从 Kafka 消息中提取并构建房间状态更新对象。
- 修改 index.js,初始化 RoomStatusManager 和 StatusBatchProcessor,并在 Kafka 消息处理流程中并行推送状态更新。
- 修改 processor/index.js,更新 processKafkaMessage 函数以支持状态提取和处理。
- 更新 kafkaPayload.js,修正 control_list 的提取逻辑,兼容 Kafka 实际传输中的 loop 字段。
- 添加状态批处理器和状态提取器的单元测试,确保功能的正确性。
- 更新文档 plan-room-status-sync.md,详细描述房间状态同步方案及字段映射。
2026-03-02 11:47:52 +08:00

2.0 KiB
Raw Permalink Blame History

Room Status Moment 实时快照同步

背景

将 Kafka 消费的 RCU 设备状态/控制事件(0x36 上报、0x0F 下发),在写入流水日志表 rcu_action_events 的同时,并行提取关键状态信息,以 Upsert 方式同步到独立的快照表 room_status.room_status_moment

变更范围

新增文件

  • src/db/roomStatusManager.js — 独立连接池 + Upsert SQL 封装
  • src/db/statusBatchProcessor.js — 快照专用的批量聚合处理器(内存合并去重)
  • src/processor/statusExtractor.js — 从 Kafka payload 提取 dev_loops/faulty_device_count
  • tests/status_extractor.test.js — statusExtractor 单元测试
  • tests/status_batch_processor.test.js — statusBatchProcessor 单元测试

修改文件

  • src/config/config.js — 新增 roomStatusDb 配置段
  • src/index.js — 挂载 StatusBatchProcessor 并在消息处理流程中并行推送状态更新
  • .env.example — 补充 ROOM_STATUS_DB_* 环境变量示例

缺陷修复 (Bug Fixes)

  • src/db/statusBatchProcessor.js — 添加 isFlushing 状态锁防止高并发带来的重复刷新及数据库死锁 (ON CONFLICT 互锁)。
  • src/db/roomStatusManager.js — 去除 WHERE EXCLUDED.ts_ms >= room_status_moment.ts_ms 条件限制,保证始终强制应用最新的状态覆盖。
  • src/schema/kafkaPayload.js & src/processor/statusExtractor.js — 修正 control_list 的提取逻辑,兼容 Kafka 实际传输中的 loop 字段。
  • docs/plan-room-status-sync.md — 明确 sys_lock_status 仅在 Kafka 传入的值非空时,才进行覆盖。

设计约束

  1. 状态表写入失败不得阻塞主流水表写入和 Kafka offset 提交
  2. 同一批次中同设备多次更新需在内存中合并后再提交数据库
  3. dev_loops JSONB 使用增量合并 (||)faulty_device_count 使用整体替换
  4. 快照表使用独立数据库连接池,与流水表连接池资源隔离

参考文档

  • docs/plan-room-status-sync.md
  • docs/room_status_moment.sql