- 新增 HeartbeatBuffer 类,用于收集和去重 Kafka 心跳消息,并定期将数据刷新到数据库。 - 新增 HeartbeatDbManager 类,负责与 PostgreSQL 数据库的交互,支持批量 upsert 操作。 - 新增配置文件 config.js,支持从环境变量加载配置。 - 新增 Kafka 消费者模块,支持从 Kafka 中消费心跳消息。 - 新增 Redis 集成模块,支持将日志和心跳信息推送到 Redis。 - 新增心跳消息解析器,负责解析 Kafka 消息并提取心跳字段。 - 新增日志记录工具,支持不同级别的日志输出。 - 新增指标收集器,跟踪 Kafka 消息处理和数据库操作的指标。 - 新增单元测试,覆盖 HeartbeatBuffer 和 HeartbeatDbManager 的主要功能。 - 新增数据库表结构 SQL 文件,定义 room_status_moment_g5 表的结构。 - 配置 Vite 构建工具,支持 Node.js 环境的构建。
1.2 KiB
1.2 KiB
OpenSpec Proposal: bls-oldrcu-heartbeat-backend
功能概述
从 Kafka topic blwlog4Nodejs-oldrcu-heartbeat-topic 消费 OldRCU 心跳数据,
经过去重与批处理后,upsert 写入 PostgreSQL G5 库的 room_status.room_status_moment_g5 表。
数据流
Kafka (blwlog4Nodejs-oldrcu-heartbeat-topic)
↓ 消费消息
Message Parser (提取 ts_ms, hotel_id, room_id, device_id)
↓ 投入缓冲
HeartbeatBuffer (Map, key=hotel_id:room_id, 每5秒flush)
↓ 批量写库
PostgreSQL G5 (room_status.room_status_moment_g5)
→ INSERT ON CONFLICT (hotel_id, room_id) DO UPDATE
→ SET ts_ms, device_id, online_status=1
关键约束
- 写库频率:每5秒最多写一次
- 去重策略:同一 hotel_id+room_id 只保留 ts_ms 最大的记录
- online_status:每次写库强制置为 1
npm 依赖
| 包名 | 版本策略 | 用途 |
|---|---|---|
| kafka-node | ^5.0.0 | Kafka 消费 |
| pg | ^8.11.5 | PostgreSQL 连接池 |
| redis | ^4.6.13 | Redis 心跳/日志 |
| dotenv | ^16.4.5 | 环境变量 |
| node-cron | ^4.2.1 | 定时指标上报 |
| zod | ^4.3.6 | 消息Schema校验 |
目标数据库表
- Schema:
room_status - Table:
room_status_moment_g5 - PK:
(hotel_id, room_id)