- 新增 RoomStatusManager 类,负责管理房间状态快照表的数据库连接池及批量 Upsert 操作。 - 新增 StatusBatchProcessor 类,负责收集和合并房间状态更新,并定期将其写入数据库。 - 新增状态提取器 statusExtractor.js,从 Kafka 消息中提取并构建房间状态更新对象。 - 修改 index.js,初始化 RoomStatusManager 和 StatusBatchProcessor,并在 Kafka 消息处理流程中并行推送状态更新。 - 修改 processor/index.js,更新 processKafkaMessage 函数以支持状态提取和处理。 - 更新 kafkaPayload.js,修正 control_list 的提取逻辑,兼容 Kafka 实际传输中的 loop 字段。 - 添加状态批处理器和状态提取器的单元测试,确保功能的正确性。 - 更新文档 plan-room-status-sync.md,详细描述房间状态同步方案及字段映射。
5.4 KiB
5.4 KiB
Room Status 状态表同步方案
1. 背景
我们需要将 RCU Action Server 接收到的 Kafka 数据(主要是 0x36 上报和 0x0F 下发指令),实时同步提炼并写入快照表 room_status.room_status_moment。
该表为设备实时状态表,主要用于存储每个房间/设备的最新状态,它可能由多个微服务共同维护。当前服务主要负责基于不固定长度的通用设备回调帧来更新对应的 dev_loops 及 faulty_device_count JSONB 字段。
注意:本更新为非阻塞异步更新,旨在不拖累主力流水表(rcu_action_events)写入性能的同时,保证外部系统读状态表时的即时性。
2. 表结构与唯一性分析
目标表:room_status.room_status_moment
逻辑唯一键:(hotel_id, room_id, device_id)
数据库设计策略:
- 为满足 PostgreSQL 的
ON CONFLICT原子级并发合并(Upsert)能力,表中必需包含针对(hotel_id, room_id, device_id)的唯一约束 / 索引。 - 快照表可能在独立数据库进行负载隔离,业务侧需提供其独立的 DB 池连接能力。
3. 字段映射方案 (当前项目)
不采用硬编码多列映射,转而采用 JSONB 结构进行灵活映射。
源字段 (Kafka: 0x36/0x0F) |
目标表字段 | 数据类型 | 更新逻辑 |
|---|---|---|---|
hotel_id, room_id, device_id |
hotel_id,room_id,device_id |
匹配 | 主键/查询键 |
ts_ms |
ts_ms |
BIGINT | 保存该快照的更新时间,取最新 |
sys_lock_status |
sys_lock_status |
SMALLINT | 只在kafka传入的该字段非空时覆盖 |
device_list (0x36) 或 control_list (0x0F) |
dev_loops |
JSONB | 合并策略 (Merge): 提取每个元素的 dev_type, dev_addr, dev_loop 分别补零为 3 位并拼接成 9 位长作为 Key (如 001023012),其 Value 取对应的状态数据。与数据库中原有 JSONB 执行 || 运算进行增量覆盖合并。 |
fault_list (0x36) |
faulty_device_count |
JSONB | 替换策略 (Replace): 由于 0x36 是上报此刻的全量故障清单,故直接将其整个存为 JSON Array 覆盖原字段即可。 |
4. 写入策略与性能优化
4.1 内存合并去重设计
Kafka 瞬发同设备大批量跳变事件会引发严重的数据库并发写压。故采取批处理聚合配合JSON 局部合并的策略:
StatusBatchProcessor会收集给定窗口期 (例如500ms) 内产生的所有更新状态。- 使用
Map<hotel_id+room_id+device_id, MergedStatusData>来暂存去重状态。 - 深合并:针对
dev_loops更新,内存中会对同一个设备的后者对前者进行 Object 解析与替换合并,保证推到数据库的只有一条终态的最完整 SQL 语句。
4.2 SQL UPSERT 模板
利用 PostgreSQL ON CONFLICT DO UPDATE SET 自动完成字段保鲜与合并:
INSERT INTO room_status.room_status_moment (
guid, ts_ms, hotel_id, room_id, device_id,
sys_lock_status, dev_loops, faulty_device_count
) VALUES (
$1, $2, $3, $4, $5,
$6, $7::jsonb, $8::jsonb
)
ON CONFLICT (hotel_id, room_id, device_id)
DO UPDATE SET
ts_ms = GREATEST(room_status.room_status_moment.ts_ms, EXCLUDED.ts_ms),
sys_lock_status = COALESCE(EXCLUDED.sys_lock_status, room_status.room_status_moment.sys_lock_status),
dev_loops = CASE
WHEN EXCLUDED.dev_loops IS NULL THEN room_status.room_status_moment.dev_loops
ELSE COALESCE(room_status.room_status_moment.dev_loops, '{}'::jsonb) || EXCLUDED.dev_loops
END,
faulty_device_count = COALESCE(EXCLUDED.faulty_device_count, room_status.room_status_moment.faulty_device_count)
WHERE EXCLUDED.ts_ms >= room_status.room_status_moment.ts_ms;
4.3 代码修改计划
为完全支持本项目的特性及独立解耦要求,接下来我们将实施以下新增及重构计划:
-
配置层 (
src/config/config.js)- 补充
roomStatusDb的独立环境配置项定义(如ROOM_STATUS_DB_HOST等),支持独立连入快照库以避免抢占写日志的大库连接池。
- 补充
-
数据库管理器 (
src/db/roomStatusManager.js)- 新建单独的管理器及连接池单例,专职处理向目的状态表的批量 Upsert 操作及报错封装,与原有的流水日志追加入库流程(
databaseManager.js)从库底层隔离开来。
- 新建单独的管理器及连接池单例,专职处理向目的状态表的批量 Upsert 操作及报错封装,与原有的流水日志追加入库流程(
-
批量写入器 (
src/db/statusBatchProcessor.js)- 专属于快照表的批量聚合任务列队类。
- 实现批量数据接收保护,以及在触发定时 Flush 执行前,依据前文设计的基于
(hotel, room, device)唯一维度对于dev_loops的状态内存全合并算法。
-
状态转换提取器 (
src/processor/statusExtractor.js)- 提取业务侧逻辑的编解码中间件功能。它负责消化并筛选被校验验证后的
KafkaPayload模型数据,将结构里错综的device_list及fault_list提炼为符合合并规则的dev_loops/faulty_device_count结构扁平化独立快照对象。
- 提取业务侧逻辑的编解码中间件功能。它负责消化并筛选被校验验证后的
-
主流程集成 (
src/processor/index.js及src/index.js)- 不改变原先
processKafkaMessage往BatchProcessor(rcu_action_events) 无脑推送的过程。 - 在主流程处理完毕的数据流中,提取有效状态对象,并以并行非阻塞的形式推送至新的
StatusBatchProcessor.add(...)。以此做到即使状态库网络出现波动也能保护 Kafka 的持续稳定流转消费。
- 不改变原先