Files
Web_BLS_RCUAction_Server/docs/plan-room-status-sync.md
XuJiacheng cf61e8dac6 feat: 实现房间状态同步功能
- 新增 RoomStatusManager 类,负责管理房间状态快照表的数据库连接池及批量 Upsert 操作。
- 新增 StatusBatchProcessor 类,负责收集和合并房间状态更新,并定期将其写入数据库。
- 新增状态提取器 statusExtractor.js,从 Kafka 消息中提取并构建房间状态更新对象。
- 修改 index.js,初始化 RoomStatusManager 和 StatusBatchProcessor,并在 Kafka 消息处理流程中并行推送状态更新。
- 修改 processor/index.js,更新 processKafkaMessage 函数以支持状态提取和处理。
- 更新 kafkaPayload.js,修正 control_list 的提取逻辑,兼容 Kafka 实际传输中的 loop 字段。
- 添加状态批处理器和状态提取器的单元测试,确保功能的正确性。
- 更新文档 plan-room-status-sync.md,详细描述房间状态同步方案及字段映射。
2026-03-02 11:47:52 +08:00

5.4 KiB
Raw Permalink Blame History

Room Status 状态表同步方案

1. 背景

我们需要将 RCU Action Server 接收到的 Kafka 数据(主要是 0x36 上报和 0x0F 下发指令),实时同步提炼并写入快照表 room_status.room_status_moment。 该表为设备实时状态表,主要用于存储每个房间/设备的最新状态,它可能由多个微服务共同维护。当前服务主要负责基于不固定长度的通用设备回调帧来更新对应的 dev_loopsfaulty_device_count JSONB 字段。 注意:本更新为非阻塞异步更新,旨在不拖累主力流水表(rcu_action_events)写入性能的同时,保证外部系统读状态表时的即时性。

2. 表结构与唯一性分析

目标表room_status.room_status_moment 逻辑唯一键(hotel_id, room_id, device_id)

数据库设计策略

  • 为满足 PostgreSQL 的 ON CONFLICT 原子级并发合并Upsert能力表中必需包含针对 (hotel_id, room_id, device_id) 的唯一约束 / 索引。
  • 快照表可能在独立数据库进行负载隔离,业务侧需提供其独立的 DB 池连接能力。

3. 字段映射方案 (当前项目)

不采用硬编码多列映射,转而采用 JSONB 结构进行灵活映射。

源字段 (Kafka: 0x36/0x0F) 目标表字段 数据类型 更新逻辑
hotel_id, room_id, device_id hotel_id,room_id,device_id 匹配 主键/查询键
ts_ms ts_ms BIGINT 保存该快照的更新时间,取最新
sys_lock_status sys_lock_status SMALLINT 只在kafka传入的该字段非空时覆盖
device_list (0x36) 或 control_list (0x0F) dev_loops JSONB 合并策略 (Merge):
提取每个元素的 dev_type, dev_addr, dev_loop 分别补零为 3 位并拼接成 9 位长作为 Key (如 001023012),其 Value 取对应的状态数据。
与数据库中原有 JSONB 执行 || 运算进行增量覆盖合并。
fault_list (0x36) faulty_device_count JSONB 替换策略 (Replace):
由于 0x36 是上报此刻的全量故障清单,故直接将其整个存为 JSON Array 覆盖原字段即可。

4. 写入策略与性能优化

4.1 内存合并去重设计

Kafka 瞬发同设备大批量跳变事件会引发严重的数据库并发写压。故采取批处理聚合配合JSON 局部合并的策略:

  • StatusBatchProcessor 会收集给定窗口期 (例如 500ms) 内产生的所有更新状态。
  • 使用 Map<hotel_id+room_id+device_id, MergedStatusData> 来暂存去重状态。
  • 深合并:针对 dev_loops 更新,内存中会对同一个设备的后者对前者进行 Object 解析与替换合并,保证推到数据库的只有一条终态的最完整 SQL 语句。

4.2 SQL UPSERT 模板

利用 PostgreSQL ON CONFLICT DO UPDATE SET 自动完成字段保鲜与合并:

INSERT INTO room_status.room_status_moment (
  guid, ts_ms, hotel_id, room_id, device_id,
  sys_lock_status, dev_loops, faulty_device_count
) VALUES (
  $1, $2, $3, $4, $5,
  $6, $7::jsonb, $8::jsonb
)
ON CONFLICT (hotel_id, room_id, device_id)
DO UPDATE SET
  ts_ms = GREATEST(room_status.room_status_moment.ts_ms, EXCLUDED.ts_ms),
  sys_lock_status = COALESCE(EXCLUDED.sys_lock_status, room_status.room_status_moment.sys_lock_status),
  dev_loops = CASE 
    WHEN EXCLUDED.dev_loops IS NULL THEN room_status.room_status_moment.dev_loops
    ELSE COALESCE(room_status.room_status_moment.dev_loops, '{}'::jsonb) || EXCLUDED.dev_loops
  END,
  faulty_device_count = COALESCE(EXCLUDED.faulty_device_count, room_status.room_status_moment.faulty_device_count)
WHERE EXCLUDED.ts_ms >= room_status.room_status_moment.ts_ms;

4.3 代码修改计划

为完全支持本项目的特性及独立解耦要求,接下来我们将实施以下新增及重构计划:

  1. 配置层 (src/config/config.js)

    • 补充 roomStatusDb 的独立环境配置项定义(如 ROOM_STATUS_DB_HOST 等),支持独立连入快照库以避免抢占写日志的大库连接池。
  2. 数据库管理器 (src/db/roomStatusManager.js)

    • 新建单独的管理器及连接池单例,专职处理向目的状态表的批量 Upsert 操作及报错封装,与原有的流水日志追加入库流程(databaseManager.js)从库底层隔离开来。
  3. 批量写入器 (src/db/statusBatchProcessor.js)

    • 专属于快照表的批量聚合任务列队类。
    • 实现批量数据接收保护,以及在触发定时 Flush 执行前,依据前文设计的基于 (hotel, room, device) 唯一维度对于 dev_loops 的状态内存全合并算法。
  4. 状态转换提取器 (src/processor/statusExtractor.js)

    • 提取业务侧逻辑的编解码中间件功能。它负责消化并筛选被校验验证后的 KafkaPayload 模型数据,将结构里错综的 device_listfault_list 提炼为符合合并规则的 dev_loops/faulty_device_count 结构扁平化独立快照对象。
  5. 主流程集成 (src/processor/index.jssrc/index.js)

    • 不改变原先 processKafkaMessageBatchProcessor (rcu_action_events) 无脑推送的过程。
    • 在主流程处理完毕的数据流中,提取有效状态对象,并以并行非阻塞的形式推送至新的 StatusBatchProcessor.add(...)。以此做到即使状态库网络出现波动也能保护 Kafka 的持续稳定流转消费。