Files
Web_BLS_OldRcu_Heartbeat_Se…/README.md

5.6 KiB
Raw Blame History

Web_BLS_OldRcu_Heartbeat_Server

BLS 老主机 RCU 心跳刷新状态表服务。

项目说明

当前已初始化的后端项目位于 bls-oldrcu-heartbeat-backend/package.json,功能是从 Kafka topic blwlog4Nodejs-oldrcu-heartbeat-topic 消费心跳数据,提取 ts_mshotel_idroom_iddevice_id,再批量写入 G5 库 room_status.room_status_moment_g5

写库策略不是纯 INSERT也不是先查再 UPDATE而是统一采用单条批量 SQLINSERT ... ON CONFLICT (hotel_id, room_id) DO UPDATE

核心规则

  1. Kafka 来源 topicblwlog4Nodejs-oldrcu-heartbeat-topic
  2. 目标表:room_status.room_status_moment_g5
  3. 数据库连接来源:使用 .env 中的 POSTGRES_HOST_G5POSTGRES_PORT_G5POSTGRES_DATABASE_G5POSTGRES_USER_G5POSTGRES_PASSWORD_G5
  4. 主键冲突键:hotel_id + room_id
  5. 写库频率:每 5 秒 flush 一次当前缓冲批次
  6. 批次内去重:同一个 hotel_id + room_id 只保留 ts_ms 最大的一条
  7. 冲突更新:统一走 ON CONFLICT DO UPDATE
  8. 行已存在时,仍然要执行更新,将 online_status 置为 1
  9. ts_ms 使用新旧值中的较大者,防止乱序消息导致时间回滚

处理流程

方法级链路

  1. src/index.js 中的 bootstrap() 初始化 Redis、PostgreSQL、批处理器和 Kafka consumer。
  2. bootstrap() 调用 src/kafka/consumer.jscreateKafkaConsumers() 创建多个 ConsumerGroup 实例。
  3. 每条 Kafka 消息进入 src/index.js 中的 handleMessage(message)
  4. handleMessage(message)message.value 转成字符串后,调用 src/processor/heartbeatParser.jsparseHeartbeat(raw)
  5. parseHeartbeat(raw) 通过 zod 校验,只允许 { ts_ms, hotel_id, room_id, device_id } 进入后续链路。
  6. 解析成功后,handleMessage(message) 调用 src/buffer/heartbeatBuffer.jsadd(record)
  7. add(record) 使用 hotel_id:room_id 作为 Map key在内存缓冲中去重只保留 ts_ms 更大的那条记录。
  8. 达到 5 秒窗口或缓冲上限后,src/buffer/heartbeatBuffer.jsflush() 被触发。
  9. flush() 取出当前批次快照,调用 src/db/heartbeatDbManager.jsupsertBatch(rows)
  10. upsertBatch(rows) 生成批量 INSERT ... ON CONFLICT (hotel_id, room_id) DO UPDATE SQL并写入 room_status.room_status_moment_g5
  11. 如果发生主键冲突,则始终执行更新:online_status = 1ts_ms 取新旧较大值,device_id 仅在新消息时间不早于当前记录时才覆盖。

流程图

flowchart TD
	A[Kafka topic\nblwlog4Nodejs-oldrcu-heartbeat-topic] --> B[createKafkaConsumers\n创建 ConsumerGroup]
	B --> C[handleMessage(message)]
	C --> D[message.value 转 UTF-8 字符串]
	D --> E[parseHeartbeat(raw)]
	E --> F{zod 校验通过?}
	F -- 否 --> G[metricCollector.increment('parse_error')\n丢弃消息]
	F -- 是 --> H[得到 record\n{ts_ms, hotel_id, room_id, device_id}]
	H --> I[HeartbeatBuffer.add(record)]
	I --> J[生成 key = hotel_id:room_id]
	J --> K{buffer 中已存在同 key?}
	K -- 否 --> L[直接放入 Map]
	K -- 是 --> M{record.ts_ms > existing.ts_ms?}
	M -- 否 --> N[忽略旧记录]
	M -- 是 --> O[覆盖 existing.ts_ms\n覆盖 existing.device_id]
	L --> P{达到 5 秒或 buffer 上限?}
	O --> P
	N --> P
	P -- 否 --> Q[继续等待下一批 Kafka 消息]
	P -- 是 --> R[HeartbeatBuffer.flush()]
	R --> S[rows = 当前 Map 快照]
	S --> T[HeartbeatDbManager.upsertBatch(rows)]
	T --> U[INSERT INTO room_status.room_status_moment_g5]
	U --> V[ON CONFLICT (hotel_id, room_id) DO UPDATE]
	V --> W[SET ts_ms = EXCLUDED.ts_ms]
	W --> X[SET device_id = EXCLUDED.device_id]
	X --> Y[SET online_status = 1]
	Y --> Z[WHERE EXCLUDED.ts_ms >= 当前表 ts_ms]
	Z --> AA[批量写库完成]

关键代码位置

  1. Kafka 启动入口:bls-oldrcu-heartbeat-backend/src/index.js
  2. Kafka consumer 封装:bls-oldrcu-heartbeat-backend/src/kafka/consumer.js
  3. 心跳解析器:bls-oldrcu-heartbeat-backend/src/processor/heartbeatParser.js
  4. 批处理去重缓冲:bls-oldrcu-heartbeat-backend/src/buffer/heartbeatBuffer.js
  5. 数据库 upsert 写入:bls-oldrcu-heartbeat-backend/src/db/heartbeatDbManager.js

运行方式

bls-oldrcu-heartbeat-backend/package.json 所在目录执行:

npm install
npm run dev

构建与测试:

npm run build
npm run test

当前实现结论

当前实现已经满足以下要求:

  1. 从指定 Kafka topic 消费数据
  2. 使用 G5 库连接参数,而不是基础库连接参数
  3. 只提取并处理 ts_mshotel_idroom_iddevice_id
  4. hotel_id + room_id 作为唯一键
  5. 每 5 秒批量写库一次
  6. 批次内重复 key 只保留最新 ts_ms
  7. 数据库侧统一使用 ON CONFLICT DO UPDATE
  8. 每次落库时 online_status 固定写成 1
  9. 通过 GREATEST(EXCLUDED.ts_ms, current.ts_ms) 避免乱序旧消息回滚时间