Files

109 lines
5.6 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# Web_BLS_OldRcu_Heartbeat_Server
BLS 老主机 RCU 心跳刷新状态表服务。
## 项目说明
当前已初始化的后端项目位于 [bls-oldrcu-heartbeat-backend/package.json](bls-oldrcu-heartbeat-backend/package.json),功能是从 Kafka topic `blwlog4Nodejs-oldrcu-heartbeat-topic` 消费心跳数据,提取 `ts_ms``hotel_id``room_id``device_id`,再批量写入 G5 库 `room_status.room_status_moment_g5`
写库策略不是纯 INSERT也不是先查再 UPDATE而是统一采用单条批量 SQL`INSERT ... ON CONFLICT (hotel_id, room_id) DO UPDATE`
## 核心规则
1. Kafka 来源 topic`blwlog4Nodejs-oldrcu-heartbeat-topic`
2. 目标表:`room_status.room_status_moment_g5`
3. 数据库连接来源:使用 `.env` 中的 `POSTGRES_HOST_G5``POSTGRES_PORT_G5``POSTGRES_DATABASE_G5``POSTGRES_USER_G5``POSTGRES_PASSWORD_G5`
3. 主键冲突键:`hotel_id + room_id`
4. 写库频率:每 5 秒 flush 一次当前缓冲批次
5. 批次内去重:同一个 `hotel_id + room_id` 只保留 `ts_ms` 最大的一条
6. 冲突更新:统一走 `ON CONFLICT DO UPDATE`
7. 行已存在时,仍然要执行更新,将 `online_status` 置为 `1`
8. `ts_ms` 使用新旧值中的较大者,防止乱序消息导致时间回滚
## 处理流程
### 方法级链路
1. [src/index.js](bls-oldrcu-heartbeat-backend/src/index.js) 中的 `bootstrap()` 初始化 Redis、PostgreSQL、批处理器和 Kafka consumer。
2. `bootstrap()` 调用 [src/kafka/consumer.js](bls-oldrcu-heartbeat-backend/src/kafka/consumer.js) 的 `createKafkaConsumers()` 创建多个 `ConsumerGroup` 实例。
3. 每条 Kafka 消息进入 [src/index.js](bls-oldrcu-heartbeat-backend/src/index.js) 中的 `handleMessage(message)`
4. `handleMessage(message)``message.value` 转成字符串后,调用 [src/processor/heartbeatParser.js](bls-oldrcu-heartbeat-backend/src/processor/heartbeatParser.js) 的 `parseHeartbeat(raw)`
5. `parseHeartbeat(raw)` 通过 `zod` 校验,只允许 `{ ts_ms, hotel_id, room_id, device_id }` 进入后续链路。
6. 解析成功后,`handleMessage(message)` 调用 [src/buffer/heartbeatBuffer.js](bls-oldrcu-heartbeat-backend/src/buffer/heartbeatBuffer.js) 的 `add(record)`
7. `add(record)` 使用 `hotel_id:room_id` 作为 Map key在内存缓冲中去重只保留 `ts_ms` 更大的那条记录。
8. 达到 5 秒窗口或缓冲上限后,[src/buffer/heartbeatBuffer.js](bls-oldrcu-heartbeat-backend/src/buffer/heartbeatBuffer.js) 的 `flush()` 被触发。
9. `flush()` 取出当前批次快照,调用 [src/db/heartbeatDbManager.js](bls-oldrcu-heartbeat-backend/src/db/heartbeatDbManager.js) 的 `upsertBatch(rows)`
10. `upsertBatch(rows)` 生成批量 `INSERT ... ON CONFLICT (hotel_id, room_id) DO UPDATE` SQL并写入 `room_status.room_status_moment_g5`
11. 如果发生主键冲突,则始终执行更新:`online_status = 1``ts_ms` 取新旧较大值,`device_id` 仅在新消息时间不早于当前记录时才覆盖。
### 流程图
```mermaid
flowchart TD
A[Kafka topic\nblwlog4Nodejs-oldrcu-heartbeat-topic] --> B[createKafkaConsumers\n创建 ConsumerGroup]
B --> C[handleMessage(message)]
C --> D[message.value 转 UTF-8 字符串]
D --> E[parseHeartbeat(raw)]
E --> F{zod 校验通过?}
F -- 否 --> G[metricCollector.increment('parse_error')\n丢弃消息]
F -- 是 --> H[得到 record\n{ts_ms, hotel_id, room_id, device_id}]
H --> I[HeartbeatBuffer.add(record)]
I --> J[生成 key = hotel_id:room_id]
J --> K{buffer 中已存在同 key?}
K -- 否 --> L[直接放入 Map]
K -- 是 --> M{record.ts_ms > existing.ts_ms?}
M -- 否 --> N[忽略旧记录]
M -- 是 --> O[覆盖 existing.ts_ms\n覆盖 existing.device_id]
L --> P{达到 5 秒或 buffer 上限?}
O --> P
N --> P
P -- 否 --> Q[继续等待下一批 Kafka 消息]
P -- 是 --> R[HeartbeatBuffer.flush()]
R --> S[rows = 当前 Map 快照]
S --> T[HeartbeatDbManager.upsertBatch(rows)]
T --> U[INSERT INTO room_status.room_status_moment_g5]
U --> V[ON CONFLICT (hotel_id, room_id) DO UPDATE]
V --> W[SET ts_ms = EXCLUDED.ts_ms]
W --> X[SET device_id = EXCLUDED.device_id]
X --> Y[SET online_status = 1]
Y --> Z[WHERE EXCLUDED.ts_ms >= 当前表 ts_ms]
Z --> AA[批量写库完成]
```
## 关键代码位置
1. Kafka 启动入口:[bls-oldrcu-heartbeat-backend/src/index.js](bls-oldrcu-heartbeat-backend/src/index.js)
2. Kafka consumer 封装:[bls-oldrcu-heartbeat-backend/src/kafka/consumer.js](bls-oldrcu-heartbeat-backend/src/kafka/consumer.js)
3. 心跳解析器:[bls-oldrcu-heartbeat-backend/src/processor/heartbeatParser.js](bls-oldrcu-heartbeat-backend/src/processor/heartbeatParser.js)
4. 批处理去重缓冲:[bls-oldrcu-heartbeat-backend/src/buffer/heartbeatBuffer.js](bls-oldrcu-heartbeat-backend/src/buffer/heartbeatBuffer.js)
5. 数据库 upsert 写入:[bls-oldrcu-heartbeat-backend/src/db/heartbeatDbManager.js](bls-oldrcu-heartbeat-backend/src/db/heartbeatDbManager.js)
## 运行方式
在 [bls-oldrcu-heartbeat-backend/package.json](bls-oldrcu-heartbeat-backend/package.json) 所在目录执行:
```bash
npm install
npm run dev
```
构建与测试:
```bash
npm run build
npm run test
```
## 当前实现结论
当前实现已经满足以下要求:
1. 从指定 Kafka topic 消费数据
2. 使用 G5 库连接参数,而不是基础库连接参数
3. 只提取并处理 `ts_ms``hotel_id``room_id``device_id`
4.`hotel_id + room_id` 作为唯一键
5. 每 5 秒批量写库一次
6. 批次内重复 key 只保留最新 `ts_ms`
7. 数据库侧统一使用 `ON CONFLICT DO UPDATE`
8. 每次落库时 `online_status` 固定写成 `1`
9. 通过 `GREATEST(EXCLUDED.ts_ms, current.ts_ms)` 避免乱序旧消息回滚时间