Files
XuJiacheng e45d14b720 feat: 实现心跳消息处理模块
- 新增 HeartbeatBuffer 类,用于收集和去重 Kafka 心跳消息,并定期将数据刷新到数据库。
- 新增 HeartbeatDbManager 类,负责与 PostgreSQL 数据库的交互,支持批量 upsert 操作。
- 新增配置文件 config.js,支持从环境变量加载配置。
- 新增 Kafka 消费者模块,支持从 Kafka 中消费心跳消息。
- 新增 Redis 集成模块,支持将日志和心跳信息推送到 Redis。
- 新增心跳消息解析器,负责解析 Kafka 消息并提取心跳字段。
- 新增日志记录工具,支持不同级别的日志输出。
- 新增指标收集器,跟踪 Kafka 消息处理和数据库操作的指标。
- 新增单元测试,覆盖 HeartbeatBuffer 和 HeartbeatDbManager 的主要功能。
- 新增数据库表结构 SQL 文件,定义 room_status_moment_g5 表的结构。
- 配置 Vite 构建工具,支持 Node.js 环境的构建。
2026-03-12 14:11:02 +08:00

7.8 KiB
Raw Permalink Blame History

BLS OldRCU Heartbeat Backend - OpenSpec

1. 项目简介

项目名称: bls-oldrcu-heartbeat-backend
版本: 1.0.0
维护状态: Active
语言环境: Node.js (ECMAScript Modules)
构建工具: Vite

1.1 核心功能

从 Kafka 消费酒店设备心跳数据,通过多层去重与验证,批量写入 PostgreSQL G5 数据库,并通过 Redis 进行度量上报。

1.2 关键指标

  • 消费吞吐: 6 个并行 Kafka 消费者实例
  • 去重策略: 双层去重5秒缓冲 + 30秒冷却
  • 写入批量: 批量 upsert支持时间序列保护
  • 可靠性: 批量提交偏移量200ms 周期)
  • 消息验证: 严格类型检查4 个必需字段验证

2. 架构设计

2.1 消息处理流水线

Kafka Topic
    ↓
[Parser] - 类型验证ts_ms, hotel_id, room_id, device_id
    ↓
[Buffer] - 5秒缓冲窗口 + 内存去重
    ↓
[Cooldown Filter] - 30秒写入冷却期检查
    ↓
[DB Manager] - Batch Upsert with ts_ms ordering protection
    ↓
PostgreSQL G5 Database (room_status_moment_g5)
    ↓
[Redis Reporter] - 度量统计上报

2.2 消费者扩展策略

  • 自动分区检测: 启动时通过 Kafka 元数据 API 查询实际分区数
  • 动态伸缩: 消费者实例数 = max(配置值 3, Kafka 分区数)
  • 当前配置: 主题有 6 个分区 → 创建 6 个消费者实例

2.3 关键技术选型

技术栈 库版本 用途
消息队列 kafka-node@5.0.0 Kafka 消费端
数据库 pg@8.11.5 PostgreSQL 6.0
缓存 redis@4.6.13 度量上报
定时任务 node-cron@4.2.1 周期性报告
配置管理 dotenv@16.4.5 环境变量加载

3. 规范文档结构

完整的规范文档按照以下模块划分:

文档 覆盖范围
architecture.md 系统架构、消费者伸缩、批处理策略
validation.md 数据验证规则、字段类型、空值处理
kafka.md Kafka 配置、消费策略、分区感知扩展
deduplication.md 双层去重策略、冷却期管理、键值设计
database.md G5 数据库连接、Upsert 逻辑、时间序列保护
testing.md 单元测试、集成测试、验证策略
deployment.md 环境配置、启动流程、监控指标

4. 快速开始

4.1 开发环境

# 安装依赖
npm install

# 运行开发服务
npm run dev

# 执行单元测试
npm run test

# 构建生产版本
npm run build

# Kafka 数据采样(用于验证消息结构)
npm run sample:kafka

4.2 环境变量配置

# PostgreSQL G5 连接
POSTGRES_HOST_G5=10.8.8.80
POSTGRES_PORT_G5=5434
POSTGRES_DATABASE_G5=dbv6

# Kafka
KAFKA_BROKERS=kafka.blv-oa.com:9092
KAFKA_TOPIC_HEARTBEAT=blwlog4Nodejs-oldrcu-heartbeat-topic

# Redis
REDIS_HOST=10.8.8.109
REDIS_PORT=6379

# 缓冲与去重
HEARTBEAT_BUFFER_SIZE_MAX=5000
HEARTBEAT_BUFFER_WINDOW_MS=5000
HEARTBEAT_WRITE_COOLDOWN_MS=30000

# Kafka 消费优化
KAFKA_CONSUMER_INSTANCES=3
KAFKA_BATCH_SIZE=100000
KAFKA_COMMIT_INTERVAL_MS=200

5. 核心模块解析

5.1 Parser (src/processor/heartbeatParser.js)

职责: 验证并解析单条 Kafka 消息

验证规则:

  • ts_ms: 必需,数字,有限值
  • hotel_id: 必需,字符串,仅数字字符
  • room_id: 必需,非空字符串,允许中英文混合
  • device_id: 必需,非空字符串

设计决策: 使用手写验证器替代 Zod以优化热路径性能

5.2 HeartbeatBuffer (src/buffer/heartbeatBuffer.js)

职责: 5秒时间窗口内的缓冲与内存去重30秒冷却期管理

关键数据结构:

  • buffer: Map<string, record> - 活跃记录等待刷新
  • lastWrittenAt: Map<string, timestamp> - 每个键的最后写入时间
  • windowStats: 统计信息(已拉取、符合条件的计数)

冷却期逻辑: 一旦某键写入 DB30 秒内该键的任何新更新被抑制,但最新值保留在缓冲中,待冷却期过期后再写入

5.3 HeartbeatDbManager (src/db/heartbeatDbManager.js)

职责: 批量 upsert 操作 + 时间序列保护

核心 SQL 模式:

INSERT INTO room_status_moment_g5 (hotel_id, room_id, device_id, ts_ms, status)
VALUES ($1::smallint, $2, $3, $4, 1)
ON CONFLICT (hotel_id, room_id) 
  DO UPDATE SET ts_ms = EXCLUDED.ts_ms, status = 1
  WHERE EXCLUDED.ts_ms >= current.ts_ms

设计决策: ::smallint 强制类型转换确保 Kafka 字符串 hotel_id 与 G5 smallint 列兼容

5.4 Kafka Consumer (src/kafka/consumer.js)

职责: 创建并管理 N 个消费者实例,实现分区感知自动扩展

关键函数:

  • resolveTopicPartitionCount(kafkaConfig): 异步查询 Kafka 元数据,获取真实分区数
  • createKafkaConsumers(kafkaConfig): 异步创建 N = max(配置, 分区数) 个消费者

批量提交策略: 200ms 周期性批量提交偏移量(非逐条提交)

6. 问题根源与解决方案

问题 1: 100% 消息解析失败

根源: hotel_id 验证期望数字,但 Kafka 实际传输字符串 ("2045" vs 2045)

解决: 实现 isDigitsOnly() 验证器,接受数字字符的字符串值

验证: 采样 50 条真实 Kafka 消息,验证 100% 符合更新后的规范

问题 2: 消费者实例数不匹配分区数

根源: 配置了 3 个消费者,但主题有 6 个分区

解决: 添加 resolveTopicPartitionCount() 异步函数,启动时自动检测并扩展到 6 个实例

问题 3: 写入压力过高

根源: 5秒缓冲窗口过短同一键频繁写入 DB

解决: 实现 30 秒写入冷却期同一键room_id + hotel_id在冷却期内只写一次新更新在缓冲中等待

7. 质量保证

7.1 测试覆盖

  • Parser 测试: 8 个用例(有效、无效 JSON、缺失字段、类型错误、空值、非数字 hotel_id
  • Buffer 测试: 6 个用例(去重、分离条目、无效记录、写入失败、冷却期抑制、冷却期后更新)
  • 集成测试: 启动 → Kafka 连接 → DB 连接 → 消费者伸缩 → 消息处理流水线

7.2 持续集成命令

npm run test          # Vitest 单元测试
npm run build         # Vite 构建验证
npm run dev           # 完整启动流程验证
npm run sample:kafka  # 消息结构采样与验证

7.3 监控与审计

  • 依赖审计: 修改 package.json 后运行 npm audit
  • 类型安全: 手写验证器确保类型边界(数字字符检查、空值处理)
  • 性能监控: Redis 上报消费速度、去重命中率、写入延迟统计

8. 部署与维护

8.1 标准启动流程

  1. 环境变量加载 (dotenv)
  2. Redis 连接验证
  3. PostgreSQL G5 连接验证
  4. Kafka 分区数自动检测(关键步骤)
  5. 创建 N 个消费者实例
  6. 启动定时报告 cron
  7. 开始消费与处理

8.2 故障恢复

  • 消息验证失败: 消息被完全忽略(计数记录),偏移量正常提交
  • DB 写入失败: 记录保留在缓冲中30秒后重试
  • 连接中断: 使用现有 pg/redis 的重连机制

9. 性能特征

指标 说明
消费吞吐 6 并行消费者 自动扩展到分区数
缓冲窗口 5 秒 内存去重窗口
冷却期 30 秒 每键写入间隔下限
批量提交周期 200ms Kafka 偏移量提交间隔
构建大小 ~22KB dist/index.js 最终产物
测试覆盖 14 个用例 全部通过

10. 修订历史

版本 日期 变更
1.0.0 2026-03-11 初始 OpenSpec双层去重、自动伸缩、类型修正

文档维护责任: 每次修改核心逻辑Parser、Buffer、DbManager同步更新相应 spec/*.md 文档。
最后更新: 2026-03-11