2026-03-11 09:55:05 +08:00
|
|
|
|
# Web_BLS_OldRcu_Heartbeat_Server
|
|
|
|
|
|
|
2026-03-12 14:11:02 +08:00
|
|
|
|
BLS 老主机 RCU 心跳刷新状态表服务。
|
|
|
|
|
|
|
|
|
|
|
|
## 项目说明
|
|
|
|
|
|
|
|
|
|
|
|
当前已初始化的后端项目位于 [bls-oldrcu-heartbeat-backend/package.json](bls-oldrcu-heartbeat-backend/package.json),功能是从 Kafka topic `blwlog4Nodejs-oldrcu-heartbeat-topic` 消费心跳数据,提取 `ts_ms`、`hotel_id`、`room_id`、`device_id`,再批量写入 G5 库 `room_status.room_status_moment_g5`。
|
|
|
|
|
|
|
|
|
|
|
|
写库策略不是纯 INSERT,也不是先查再 UPDATE,而是统一采用单条批量 SQL:`INSERT ... ON CONFLICT (hotel_id, room_id) DO UPDATE`。
|
|
|
|
|
|
|
2026-03-14 17:47:26 +08:00
|
|
|
|
## 核心规则
|
2026-03-12 14:11:02 +08:00
|
|
|
|
|
|
|
|
|
|
1. Kafka 来源 topic:`blwlog4Nodejs-oldrcu-heartbeat-topic`
|
|
|
|
|
|
2. 目标表:`room_status.room_status_moment_g5`
|
|
|
|
|
|
3. 数据库连接来源:使用 `.env` 中的 `POSTGRES_HOST_G5`、`POSTGRES_PORT_G5`、`POSTGRES_DATABASE_G5`、`POSTGRES_USER_G5`、`POSTGRES_PASSWORD_G5`
|
|
|
|
|
|
3. 主键冲突键:`hotel_id + room_id`
|
|
|
|
|
|
4. 写库频率:每 5 秒 flush 一次当前缓冲批次
|
|
|
|
|
|
5. 批次内去重:同一个 `hotel_id + room_id` 只保留 `ts_ms` 最大的一条
|
|
|
|
|
|
6. 冲突更新:统一走 `ON CONFLICT DO UPDATE`
|
|
|
|
|
|
7. 行已存在时,仍然要执行更新,将 `online_status` 置为 `1`
|
|
|
|
|
|
8. `ts_ms` 使用新旧值中的较大者,防止乱序消息导致时间回滚
|
|
|
|
|
|
|
|
|
|
|
|
## 处理流程
|
|
|
|
|
|
|
|
|
|
|
|
### 方法级链路
|
|
|
|
|
|
|
|
|
|
|
|
1. [src/index.js](bls-oldrcu-heartbeat-backend/src/index.js) 中的 `bootstrap()` 初始化 Redis、PostgreSQL、批处理器和 Kafka consumer。
|
|
|
|
|
|
2. `bootstrap()` 调用 [src/kafka/consumer.js](bls-oldrcu-heartbeat-backend/src/kafka/consumer.js) 的 `createKafkaConsumers()` 创建多个 `ConsumerGroup` 实例。
|
|
|
|
|
|
3. 每条 Kafka 消息进入 [src/index.js](bls-oldrcu-heartbeat-backend/src/index.js) 中的 `handleMessage(message)`。
|
|
|
|
|
|
4. `handleMessage(message)` 将 `message.value` 转成字符串后,调用 [src/processor/heartbeatParser.js](bls-oldrcu-heartbeat-backend/src/processor/heartbeatParser.js) 的 `parseHeartbeat(raw)`。
|
|
|
|
|
|
5. `parseHeartbeat(raw)` 通过 `zod` 校验,只允许 `{ ts_ms, hotel_id, room_id, device_id }` 进入后续链路。
|
|
|
|
|
|
6. 解析成功后,`handleMessage(message)` 调用 [src/buffer/heartbeatBuffer.js](bls-oldrcu-heartbeat-backend/src/buffer/heartbeatBuffer.js) 的 `add(record)`。
|
|
|
|
|
|
7. `add(record)` 使用 `hotel_id:room_id` 作为 Map key,在内存缓冲中去重,只保留 `ts_ms` 更大的那条记录。
|
|
|
|
|
|
8. 达到 5 秒窗口或缓冲上限后,[src/buffer/heartbeatBuffer.js](bls-oldrcu-heartbeat-backend/src/buffer/heartbeatBuffer.js) 的 `flush()` 被触发。
|
|
|
|
|
|
9. `flush()` 取出当前批次快照,调用 [src/db/heartbeatDbManager.js](bls-oldrcu-heartbeat-backend/src/db/heartbeatDbManager.js) 的 `upsertBatch(rows)`。
|
|
|
|
|
|
10. `upsertBatch(rows)` 生成批量 `INSERT ... ON CONFLICT (hotel_id, room_id) DO UPDATE` SQL,并写入 `room_status.room_status_moment_g5`。
|
|
|
|
|
|
11. 如果发生主键冲突,则始终执行更新:`online_status = 1`,`ts_ms` 取新旧较大值,`device_id` 仅在新消息时间不早于当前记录时才覆盖。
|
|
|
|
|
|
|
|
|
|
|
|
### 流程图
|
|
|
|
|
|
|
|
|
|
|
|
```mermaid
|
|
|
|
|
|
flowchart TD
|
|
|
|
|
|
A[Kafka topic\nblwlog4Nodejs-oldrcu-heartbeat-topic] --> B[createKafkaConsumers\n创建 ConsumerGroup]
|
|
|
|
|
|
B --> C[handleMessage(message)]
|
|
|
|
|
|
C --> D[message.value 转 UTF-8 字符串]
|
|
|
|
|
|
D --> E[parseHeartbeat(raw)]
|
|
|
|
|
|
E --> F{zod 校验通过?}
|
|
|
|
|
|
F -- 否 --> G[metricCollector.increment('parse_error')\n丢弃消息]
|
|
|
|
|
|
F -- 是 --> H[得到 record\n{ts_ms, hotel_id, room_id, device_id}]
|
|
|
|
|
|
H --> I[HeartbeatBuffer.add(record)]
|
|
|
|
|
|
I --> J[生成 key = hotel_id:room_id]
|
|
|
|
|
|
J --> K{buffer 中已存在同 key?}
|
|
|
|
|
|
K -- 否 --> L[直接放入 Map]
|
|
|
|
|
|
K -- 是 --> M{record.ts_ms > existing.ts_ms?}
|
|
|
|
|
|
M -- 否 --> N[忽略旧记录]
|
|
|
|
|
|
M -- 是 --> O[覆盖 existing.ts_ms\n覆盖 existing.device_id]
|
|
|
|
|
|
L --> P{达到 5 秒或 buffer 上限?}
|
|
|
|
|
|
O --> P
|
|
|
|
|
|
N --> P
|
|
|
|
|
|
P -- 否 --> Q[继续等待下一批 Kafka 消息]
|
|
|
|
|
|
P -- 是 --> R[HeartbeatBuffer.flush()]
|
|
|
|
|
|
R --> S[rows = 当前 Map 快照]
|
|
|
|
|
|
S --> T[HeartbeatDbManager.upsertBatch(rows)]
|
|
|
|
|
|
T --> U[INSERT INTO room_status.room_status_moment_g5]
|
|
|
|
|
|
U --> V[ON CONFLICT (hotel_id, room_id) DO UPDATE]
|
|
|
|
|
|
V --> W[SET ts_ms = EXCLUDED.ts_ms]
|
|
|
|
|
|
W --> X[SET device_id = EXCLUDED.device_id]
|
|
|
|
|
|
X --> Y[SET online_status = 1]
|
|
|
|
|
|
Y --> Z[WHERE EXCLUDED.ts_ms >= 当前表 ts_ms]
|
|
|
|
|
|
Z --> AA[批量写库完成]
|
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
|
|
## 关键代码位置
|
|
|
|
|
|
|
|
|
|
|
|
1. Kafka 启动入口:[bls-oldrcu-heartbeat-backend/src/index.js](bls-oldrcu-heartbeat-backend/src/index.js)
|
|
|
|
|
|
2. Kafka consumer 封装:[bls-oldrcu-heartbeat-backend/src/kafka/consumer.js](bls-oldrcu-heartbeat-backend/src/kafka/consumer.js)
|
|
|
|
|
|
3. 心跳解析器:[bls-oldrcu-heartbeat-backend/src/processor/heartbeatParser.js](bls-oldrcu-heartbeat-backend/src/processor/heartbeatParser.js)
|
|
|
|
|
|
4. 批处理去重缓冲:[bls-oldrcu-heartbeat-backend/src/buffer/heartbeatBuffer.js](bls-oldrcu-heartbeat-backend/src/buffer/heartbeatBuffer.js)
|
|
|
|
|
|
5. 数据库 upsert 写入:[bls-oldrcu-heartbeat-backend/src/db/heartbeatDbManager.js](bls-oldrcu-heartbeat-backend/src/db/heartbeatDbManager.js)
|
|
|
|
|
|
|
|
|
|
|
|
## 运行方式
|
|
|
|
|
|
|
|
|
|
|
|
在 [bls-oldrcu-heartbeat-backend/package.json](bls-oldrcu-heartbeat-backend/package.json) 所在目录执行:
|
|
|
|
|
|
|
|
|
|
|
|
```bash
|
|
|
|
|
|
npm install
|
|
|
|
|
|
npm run dev
|
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
|
|
构建与测试:
|
|
|
|
|
|
|
|
|
|
|
|
```bash
|
|
|
|
|
|
npm run build
|
|
|
|
|
|
npm run test
|
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
|
|
## 当前实现结论
|
|
|
|
|
|
|
|
|
|
|
|
当前实现已经满足以下要求:
|
|
|
|
|
|
|
|
|
|
|
|
1. 从指定 Kafka topic 消费数据
|
|
|
|
|
|
2. 使用 G5 库连接参数,而不是基础库连接参数
|
|
|
|
|
|
3. 只提取并处理 `ts_ms`、`hotel_id`、`room_id`、`device_id`
|
|
|
|
|
|
4. 以 `hotel_id + room_id` 作为唯一键
|
|
|
|
|
|
5. 每 5 秒批量写库一次
|
|
|
|
|
|
6. 批次内重复 key 只保留最新 `ts_ms`
|
|
|
|
|
|
7. 数据库侧统一使用 `ON CONFLICT DO UPDATE`
|
|
|
|
|
|
8. 每次落库时 `online_status` 固定写成 `1`
|
|
|
|
|
|
9. 通过 `GREATEST(EXCLUDED.ts_ms, current.ts_ms)` 避免乱序旧消息回滚时间
|