Files
Web_BLS_RCUAction_Server/docs/plan-room-status-sync.md
XuJiacheng cf61e8dac6 feat: 实现房间状态同步功能
- 新增 RoomStatusManager 类,负责管理房间状态快照表的数据库连接池及批量 Upsert 操作。
- 新增 StatusBatchProcessor 类,负责收集和合并房间状态更新,并定期将其写入数据库。
- 新增状态提取器 statusExtractor.js,从 Kafka 消息中提取并构建房间状态更新对象。
- 修改 index.js,初始化 RoomStatusManager 和 StatusBatchProcessor,并在 Kafka 消息处理流程中并行推送状态更新。
- 修改 processor/index.js,更新 processKafkaMessage 函数以支持状态提取和处理。
- 更新 kafkaPayload.js,修正 control_list 的提取逻辑,兼容 Kafka 实际传输中的 loop 字段。
- 添加状态批处理器和状态提取器的单元测试,确保功能的正确性。
- 更新文档 plan-room-status-sync.md,详细描述房间状态同步方案及字段映射。
2026-03-02 11:47:52 +08:00

77 lines
5.4 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# Room Status 状态表同步方案
## 1. 背景
我们需要将 RCU Action Server 接收到的 Kafka 数据(主要是 `0x36` 上报和 `0x0F` 下发指令),实时同步提炼并写入快照表 `room_status.room_status_moment`
该表为设备实时状态表,主要用于存储每个房间/设备的最新状态,它可能由多个微服务共同维护。当前服务主要负责基于不固定长度的通用设备回调帧来更新对应的 `dev_loops``faulty_device_count` JSONB 字段。
注意:本更新为非阻塞异步更新,旨在不拖累主力流水表(`rcu_action_events`)写入性能的同时,保证外部系统读状态表时的即时性。
## 2. 表结构与唯一性分析
**目标表**`room_status.room_status_moment`
**逻辑唯一键**`(hotel_id, room_id, device_id)`
**数据库设计策略**
- 为满足 PostgreSQL 的 `ON CONFLICT` 原子级并发合并Upsert能力表中必需包含针对 `(hotel_id, room_id, device_id)` 的唯一约束 / 索引。
- 快照表可能在独立数据库进行负载隔离,业务侧需提供其独立的 DB 池连接能力。
## 3. 字段映射方案 (当前项目)
不采用硬编码多列映射,转而采用 JSONB 结构进行灵活映射。
| 源字段 (Kafka: `0x36`/`0x0F`) | 目标表字段 | 数据类型 | 更新逻辑 |
| :--- | :--- | :--- | :--- |
| `hotel_id`, `room_id`, `device_id`| `hotel_id`,`room_id`,`device_id` | 匹配 | 主键/查询键 |
| `ts_ms` | `ts_ms` | BIGINT | 保存该快照的更新时间,取最新 |
| `sys_lock_status` | `sys_lock_status` | SMALLINT | 只在kafka传入的该字段非空时覆盖 |
| `device_list` (0x36) 或 `control_list` (0x0F) | `dev_loops` | JSONB | **合并策略 (Merge)**: <br/>提取每个元素的 `dev_type`, `dev_addr`, `dev_loop` 分别补零为 3 位并拼接成 9 位长作为 Key (如 `001023012`),其 Value 取对应的状态数据。<br/>与数据库中原有 JSONB 执行 `\|\|` 运算进行增量覆盖合并。 |
| `fault_list` (0x36) | `faulty_device_count`| JSONB | **替换策略 (Replace)**: <br/>由于 `0x36` 是上报此刻的全量故障清单,故直接将其整个存为 JSON Array 覆盖原字段即可。 |
## 4. 写入策略与性能优化
### 4.1 内存合并去重设计
Kafka 瞬发同设备大批量跳变事件会引发严重的数据库并发写压。故采取**批处理聚合**配合**JSON 局部合并**的策略:
- **`StatusBatchProcessor`** 会收集给定窗口期 (例如 `500ms`) 内产生的所有更新状态。
- 使用 `Map<hotel_id+room_id+device_id, MergedStatusData>` 来暂存去重状态。
- **深合并**:针对 `dev_loops` 更新,内存中会对同一个设备的后者对前者进行 Object 解析与替换合并,保证推到数据库的只有一条终态的最完整 SQL 语句。
### 4.2 SQL UPSERT 模板
利用 PostgreSQL `ON CONFLICT DO UPDATE SET` 自动完成字段保鲜与合并:
```sql
INSERT INTO room_status.room_status_moment (
guid, ts_ms, hotel_id, room_id, device_id,
sys_lock_status, dev_loops, faulty_device_count
) VALUES (
$1, $2, $3, $4, $5,
$6, $7::jsonb, $8::jsonb
)
ON CONFLICT (hotel_id, room_id, device_id)
DO UPDATE SET
ts_ms = GREATEST(room_status.room_status_moment.ts_ms, EXCLUDED.ts_ms),
sys_lock_status = COALESCE(EXCLUDED.sys_lock_status, room_status.room_status_moment.sys_lock_status),
dev_loops = CASE
WHEN EXCLUDED.dev_loops IS NULL THEN room_status.room_status_moment.dev_loops
ELSE COALESCE(room_status.room_status_moment.dev_loops, '{}'::jsonb) || EXCLUDED.dev_loops
END,
faulty_device_count = COALESCE(EXCLUDED.faulty_device_count, room_status.room_status_moment.faulty_device_count)
WHERE EXCLUDED.ts_ms >= room_status.room_status_moment.ts_ms;
```
### 4.3 代码修改计划
为完全支持本项目的特性及独立解耦要求,接下来我们将实施以下新增及重构计划:
1. **配置层 (`src/config/config.js`)**
- 补充 `roomStatusDb` 的独立环境配置项定义(如 `ROOM_STATUS_DB_HOST` 等),支持独立连入快照库以避免抢占写日志的大库连接池。
2. **数据库管理器 (`src/db/roomStatusManager.js`)**
- 新建单独的管理器及连接池单例,专职处理向目的状态表的批量 Upsert 操作及报错封装,与原有的流水日志追加入库流程(`databaseManager.js`)从库底层隔离开来。
3. **批量写入器 (`src/db/statusBatchProcessor.js`)**
- 专属于快照表的批量聚合任务列队类。
- 实现批量数据接收保护,以及在触发定时 Flush 执行前,依据前文设计的基于 `(hotel, room, device)` 唯一维度对于 `dev_loops` 的状态内存全合并算法。
4. **状态转换提取器 (`src/processor/statusExtractor.js`)**
- 提取业务侧逻辑的编解码中间件功能。它负责消化并筛选被校验验证后的 `KafkaPayload` 模型数据,将结构里错综的 `device_list``fault_list` 提炼为符合合并规则的 `dev_loops`/`faulty_device_count` 结构扁平化独立快照对象。
5. **主流程集成 (`src/processor/index.js` 及 `src/index.js`)**
- 不改变原先 `processKafkaMessage``BatchProcessor` (`rcu_action_events`) 无脑推送的过程。
- 在主流程处理完毕的数据流中,提取有效状态对象,并以并行非阻塞的形式推送至新的 `StatusBatchProcessor.add(...)`。以此做到即使状态库网络出现波动也能保护 Kafka 的持续稳定流转消费。