- 新增 HeartbeatBuffer 类,用于收集和去重 Kafka 心跳消息,并定期将数据刷新到数据库。 - 新增 HeartbeatDbManager 类,负责与 PostgreSQL 数据库的交互,支持批量 upsert 操作。 - 新增配置文件 config.js,支持从环境变量加载配置。 - 新增 Kafka 消费者模块,支持从 Kafka 中消费心跳消息。 - 新增 Redis 集成模块,支持将日志和心跳信息推送到 Redis。 - 新增心跳消息解析器,负责解析 Kafka 消息并提取心跳字段。 - 新增日志记录工具,支持不同级别的日志输出。 - 新增指标收集器,跟踪 Kafka 消息处理和数据库操作的指标。 - 新增单元测试,覆盖 HeartbeatBuffer 和 HeartbeatDbManager 的主要功能。 - 新增数据库表结构 SQL 文件,定义 room_status_moment_g5 表的结构。 - 配置 Vite 构建工具,支持 Node.js 环境的构建。
109 lines
5.6 KiB
Markdown
109 lines
5.6 KiB
Markdown
# Web_BLS_OldRcu_Heartbeat_Server
|
||
|
||
BLS 老主机 RCU 心跳刷新状态表服务。
|
||
|
||
## 项目说明
|
||
|
||
当前已初始化的后端项目位于 [bls-oldrcu-heartbeat-backend/package.json](bls-oldrcu-heartbeat-backend/package.json),功能是从 Kafka topic `blwlog4Nodejs-oldrcu-heartbeat-topic` 消费心跳数据,提取 `ts_ms`、`hotel_id`、`room_id`、`device_id`,再批量写入 G5 库 `room_status.room_status_moment_g5`。
|
||
|
||
写库策略不是纯 INSERT,也不是先查再 UPDATE,而是统一采用单条批量 SQL:`INSERT ... ON CONFLICT (hotel_id, room_id) DO UPDATE`。
|
||
|
||
## 核心规则
|
||
|
||
1. Kafka 来源 topic:`blwlog4Nodejs-oldrcu-heartbeat-topic`
|
||
2. 目标表:`room_status.room_status_moment_g5`
|
||
3. 数据库连接来源:使用 `.env` 中的 `POSTGRES_HOST_G5`、`POSTGRES_PORT_G5`、`POSTGRES_DATABASE_G5`、`POSTGRES_USER_G5`、`POSTGRES_PASSWORD_G5`
|
||
3. 主键冲突键:`hotel_id + room_id`
|
||
4. 写库频率:每 5 秒 flush 一次当前缓冲批次
|
||
5. 批次内去重:同一个 `hotel_id + room_id` 只保留 `ts_ms` 最大的一条
|
||
6. 冲突更新:统一走 `ON CONFLICT DO UPDATE`
|
||
7. 行已存在时,仍然要执行更新,将 `online_status` 置为 `1`
|
||
8. `ts_ms` 使用新旧值中的较大者,防止乱序消息导致时间回滚
|
||
|
||
## 处理流程
|
||
|
||
### 方法级链路
|
||
|
||
1. [src/index.js](bls-oldrcu-heartbeat-backend/src/index.js) 中的 `bootstrap()` 初始化 Redis、PostgreSQL、批处理器和 Kafka consumer。
|
||
2. `bootstrap()` 调用 [src/kafka/consumer.js](bls-oldrcu-heartbeat-backend/src/kafka/consumer.js) 的 `createKafkaConsumers()` 创建多个 `ConsumerGroup` 实例。
|
||
3. 每条 Kafka 消息进入 [src/index.js](bls-oldrcu-heartbeat-backend/src/index.js) 中的 `handleMessage(message)`。
|
||
4. `handleMessage(message)` 将 `message.value` 转成字符串后,调用 [src/processor/heartbeatParser.js](bls-oldrcu-heartbeat-backend/src/processor/heartbeatParser.js) 的 `parseHeartbeat(raw)`。
|
||
5. `parseHeartbeat(raw)` 通过 `zod` 校验,只允许 `{ ts_ms, hotel_id, room_id, device_id }` 进入后续链路。
|
||
6. 解析成功后,`handleMessage(message)` 调用 [src/buffer/heartbeatBuffer.js](bls-oldrcu-heartbeat-backend/src/buffer/heartbeatBuffer.js) 的 `add(record)`。
|
||
7. `add(record)` 使用 `hotel_id:room_id` 作为 Map key,在内存缓冲中去重,只保留 `ts_ms` 更大的那条记录。
|
||
8. 达到 5 秒窗口或缓冲上限后,[src/buffer/heartbeatBuffer.js](bls-oldrcu-heartbeat-backend/src/buffer/heartbeatBuffer.js) 的 `flush()` 被触发。
|
||
9. `flush()` 取出当前批次快照,调用 [src/db/heartbeatDbManager.js](bls-oldrcu-heartbeat-backend/src/db/heartbeatDbManager.js) 的 `upsertBatch(rows)`。
|
||
10. `upsertBatch(rows)` 生成批量 `INSERT ... ON CONFLICT (hotel_id, room_id) DO UPDATE` SQL,并写入 `room_status.room_status_moment_g5`。
|
||
11. 如果发生主键冲突,则始终执行更新:`online_status = 1`,`ts_ms` 取新旧较大值,`device_id` 仅在新消息时间不早于当前记录时才覆盖。
|
||
|
||
### 流程图
|
||
|
||
```mermaid
|
||
flowchart TD
|
||
A[Kafka topic\nblwlog4Nodejs-oldrcu-heartbeat-topic] --> B[createKafkaConsumers\n创建 ConsumerGroup]
|
||
B --> C[handleMessage(message)]
|
||
C --> D[message.value 转 UTF-8 字符串]
|
||
D --> E[parseHeartbeat(raw)]
|
||
E --> F{zod 校验通过?}
|
||
F -- 否 --> G[metricCollector.increment('parse_error')\n丢弃消息]
|
||
F -- 是 --> H[得到 record\n{ts_ms, hotel_id, room_id, device_id}]
|
||
H --> I[HeartbeatBuffer.add(record)]
|
||
I --> J[生成 key = hotel_id:room_id]
|
||
J --> K{buffer 中已存在同 key?}
|
||
K -- 否 --> L[直接放入 Map]
|
||
K -- 是 --> M{record.ts_ms > existing.ts_ms?}
|
||
M -- 否 --> N[忽略旧记录]
|
||
M -- 是 --> O[覆盖 existing.ts_ms\n覆盖 existing.device_id]
|
||
L --> P{达到 5 秒或 buffer 上限?}
|
||
O --> P
|
||
N --> P
|
||
P -- 否 --> Q[继续等待下一批 Kafka 消息]
|
||
P -- 是 --> R[HeartbeatBuffer.flush()]
|
||
R --> S[rows = 当前 Map 快照]
|
||
S --> T[HeartbeatDbManager.upsertBatch(rows)]
|
||
T --> U[INSERT INTO room_status.room_status_moment_g5]
|
||
U --> V[ON CONFLICT (hotel_id, room_id) DO UPDATE]
|
||
V --> W[SET ts_ms = EXCLUDED.ts_ms]
|
||
W --> X[SET device_id = EXCLUDED.device_id]
|
||
X --> Y[SET online_status = 1]
|
||
Y --> Z[WHERE EXCLUDED.ts_ms >= 当前表 ts_ms]
|
||
Z --> AA[批量写库完成]
|
||
```
|
||
|
||
## 关键代码位置
|
||
|
||
1. Kafka 启动入口:[bls-oldrcu-heartbeat-backend/src/index.js](bls-oldrcu-heartbeat-backend/src/index.js)
|
||
2. Kafka consumer 封装:[bls-oldrcu-heartbeat-backend/src/kafka/consumer.js](bls-oldrcu-heartbeat-backend/src/kafka/consumer.js)
|
||
3. 心跳解析器:[bls-oldrcu-heartbeat-backend/src/processor/heartbeatParser.js](bls-oldrcu-heartbeat-backend/src/processor/heartbeatParser.js)
|
||
4. 批处理去重缓冲:[bls-oldrcu-heartbeat-backend/src/buffer/heartbeatBuffer.js](bls-oldrcu-heartbeat-backend/src/buffer/heartbeatBuffer.js)
|
||
5. 数据库 upsert 写入:[bls-oldrcu-heartbeat-backend/src/db/heartbeatDbManager.js](bls-oldrcu-heartbeat-backend/src/db/heartbeatDbManager.js)
|
||
|
||
## 运行方式
|
||
|
||
在 [bls-oldrcu-heartbeat-backend/package.json](bls-oldrcu-heartbeat-backend/package.json) 所在目录执行:
|
||
|
||
```bash
|
||
npm install
|
||
npm run dev
|
||
```
|
||
|
||
构建与测试:
|
||
|
||
```bash
|
||
npm run build
|
||
npm run test
|
||
```
|
||
|
||
## 当前实现结论
|
||
|
||
当前实现已经满足以下要求:
|
||
|
||
1. 从指定 Kafka topic 消费数据
|
||
2. 使用 G5 库连接参数,而不是基础库连接参数
|
||
3. 只提取并处理 `ts_ms`、`hotel_id`、`room_id`、`device_id`
|
||
4. 以 `hotel_id + room_id` 作为唯一键
|
||
5. 每 5 秒批量写库一次
|
||
6. 批次内重复 key 只保留最新 `ts_ms`
|
||
7. 数据库侧统一使用 `ON CONFLICT DO UPDATE`
|
||
8. 每次落库时 `online_status` 固定写成 `1`
|
||
9. 通过 `GREATEST(EXCLUDED.ts_ms, current.ts_ms)` 避免乱序旧消息回滚时间 |