- 新增 HeartbeatBuffer 类,用于收集和去重 Kafka 心跳消息,并定期将数据刷新到数据库。 - 新增 HeartbeatDbManager 类,负责与 PostgreSQL 数据库的交互,支持批量 upsert 操作。 - 新增配置文件 config.js,支持从环境变量加载配置。 - 新增 Kafka 消费者模块,支持从 Kafka 中消费心跳消息。 - 新增 Redis 集成模块,支持将日志和心跳信息推送到 Redis。 - 新增心跳消息解析器,负责解析 Kafka 消息并提取心跳字段。 - 新增日志记录工具,支持不同级别的日志输出。 - 新增指标收集器,跟踪 Kafka 消息处理和数据库操作的指标。 - 新增单元测试,覆盖 HeartbeatBuffer 和 HeartbeatDbManager 的主要功能。 - 新增数据库表结构 SQL 文件,定义 room_status_moment_g5 表的结构。 - 配置 Vite 构建工具,支持 Node.js 环境的构建。
7.8 KiB
BLS OldRCU Heartbeat Backend - OpenSpec
1. 项目简介
项目名称: bls-oldrcu-heartbeat-backend
版本: 1.0.0
维护状态: Active
语言环境: Node.js (ECMAScript Modules)
构建工具: Vite
1.1 核心功能
从 Kafka 消费酒店设备心跳数据,通过多层去重与验证,批量写入 PostgreSQL G5 数据库,并通过 Redis 进行度量上报。
1.2 关键指标
- 消费吞吐: 6 个并行 Kafka 消费者实例
- 去重策略: 双层去重(5秒缓冲 + 30秒冷却)
- 写入批量: 批量 upsert,支持时间序列保护
- 可靠性: 批量提交偏移量(200ms 周期)
- 消息验证: 严格类型检查,4 个必需字段验证
2. 架构设计
2.1 消息处理流水线
Kafka Topic
↓
[Parser] - 类型验证(ts_ms, hotel_id, room_id, device_id)
↓
[Buffer] - 5秒缓冲窗口 + 内存去重
↓
[Cooldown Filter] - 30秒写入冷却期检查
↓
[DB Manager] - Batch Upsert with ts_ms ordering protection
↓
PostgreSQL G5 Database (room_status_moment_g5)
↓
[Redis Reporter] - 度量统计上报
2.2 消费者扩展策略
- 自动分区检测: 启动时通过 Kafka 元数据 API 查询实际分区数
- 动态伸缩: 消费者实例数 = max(配置值 3, Kafka 分区数)
- 当前配置: 主题有 6 个分区 → 创建 6 个消费者实例
2.3 关键技术选型
| 技术栈 | 库版本 | 用途 |
|---|---|---|
| 消息队列 | kafka-node@5.0.0 | Kafka 消费端 |
| 数据库 | pg@8.11.5 | PostgreSQL 6.0 |
| 缓存 | redis@4.6.13 | 度量上报 |
| 定时任务 | node-cron@4.2.1 | 周期性报告 |
| 配置管理 | dotenv@16.4.5 | 环境变量加载 |
3. 规范文档结构
完整的规范文档按照以下模块划分:
| 文档 | 覆盖范围 |
|---|---|
| architecture.md | 系统架构、消费者伸缩、批处理策略 |
| validation.md | 数据验证规则、字段类型、空值处理 |
| kafka.md | Kafka 配置、消费策略、分区感知扩展 |
| deduplication.md | 双层去重策略、冷却期管理、键值设计 |
| database.md | G5 数据库连接、Upsert 逻辑、时间序列保护 |
| testing.md | 单元测试、集成测试、验证策略 |
| deployment.md | 环境配置、启动流程、监控指标 |
4. 快速开始
4.1 开发环境
# 安装依赖
npm install
# 运行开发服务
npm run dev
# 执行单元测试
npm run test
# 构建生产版本
npm run build
# Kafka 数据采样(用于验证消息结构)
npm run sample:kafka
4.2 环境变量配置
# PostgreSQL G5 连接
POSTGRES_HOST_G5=10.8.8.80
POSTGRES_PORT_G5=5434
POSTGRES_DATABASE_G5=dbv6
# Kafka
KAFKA_BROKERS=kafka.blv-oa.com:9092
KAFKA_TOPIC_HEARTBEAT=blwlog4Nodejs-oldrcu-heartbeat-topic
# Redis
REDIS_HOST=10.8.8.109
REDIS_PORT=6379
# 缓冲与去重
HEARTBEAT_BUFFER_SIZE_MAX=5000
HEARTBEAT_BUFFER_WINDOW_MS=5000
HEARTBEAT_WRITE_COOLDOWN_MS=30000
# Kafka 消费优化
KAFKA_CONSUMER_INSTANCES=3
KAFKA_BATCH_SIZE=100000
KAFKA_COMMIT_INTERVAL_MS=200
5. 核心模块解析
5.1 Parser (src/processor/heartbeatParser.js)
职责: 验证并解析单条 Kafka 消息
验证规则:
ts_ms: 必需,数字,有限值hotel_id: 必需,字符串,仅数字字符room_id: 必需,非空字符串,允许中英文混合device_id: 必需,非空字符串
设计决策: 使用手写验证器替代 Zod,以优化热路径性能
5.2 HeartbeatBuffer (src/buffer/heartbeatBuffer.js)
职责: 5秒时间窗口内的缓冲与内存去重,30秒冷却期管理
关键数据结构:
buffer: Map<string, record> - 活跃记录等待刷新lastWrittenAt: Map<string, timestamp> - 每个键的最后写入时间windowStats: 统计信息(已拉取、符合条件的计数)
冷却期逻辑: 一旦某键写入 DB,30 秒内该键的任何新更新被抑制,但最新值保留在缓冲中,待冷却期过期后再写入
5.3 HeartbeatDbManager (src/db/heartbeatDbManager.js)
职责: 批量 upsert 操作 + 时间序列保护
核心 SQL 模式:
INSERT INTO room_status_moment_g5 (hotel_id, room_id, device_id, ts_ms, status)
VALUES ($1::smallint, $2, $3, $4, 1)
ON CONFLICT (hotel_id, room_id)
DO UPDATE SET ts_ms = EXCLUDED.ts_ms, status = 1
WHERE EXCLUDED.ts_ms >= current.ts_ms
设计决策: ::smallint 强制类型转换确保 Kafka 字符串 hotel_id 与 G5 smallint 列兼容
5.4 Kafka Consumer (src/kafka/consumer.js)
职责: 创建并管理 N 个消费者实例,实现分区感知自动扩展
关键函数:
resolveTopicPartitionCount(kafkaConfig): 异步查询 Kafka 元数据,获取真实分区数createKafkaConsumers(kafkaConfig): 异步创建 N = max(配置, 分区数) 个消费者
批量提交策略: 200ms 周期性批量提交偏移量(非逐条提交)
6. 问题根源与解决方案
问题 1: 100% 消息解析失败
根源: hotel_id 验证期望数字,但 Kafka 实际传输字符串 ("2045" vs 2045)
解决: 实现 isDigitsOnly() 验证器,接受数字字符的字符串值
验证: 采样 50 条真实 Kafka 消息,验证 100% 符合更新后的规范
问题 2: 消费者实例数不匹配分区数
根源: 配置了 3 个消费者,但主题有 6 个分区
解决: 添加 resolveTopicPartitionCount() 异步函数,启动时自动检测并扩展到 6 个实例
问题 3: 写入压力过高
根源: 5秒缓冲窗口过短,同一键频繁写入 DB
解决: 实现 30 秒写入冷却期,同一键(room_id + hotel_id)在冷却期内只写一次,新更新在缓冲中等待
7. 质量保证
7.1 测试覆盖
- Parser 测试: 8 个用例(有效、无效 JSON、缺失字段、类型错误、空值、非数字 hotel_id)
- Buffer 测试: 6 个用例(去重、分离条目、无效记录、写入失败、冷却期抑制、冷却期后更新)
- 集成测试: 启动 → Kafka 连接 → DB 连接 → 消费者伸缩 → 消息处理流水线
7.2 持续集成命令
npm run test # Vitest 单元测试
npm run build # Vite 构建验证
npm run dev # 完整启动流程验证
npm run sample:kafka # 消息结构采样与验证
7.3 监控与审计
- 依赖审计: 修改 package.json 后运行
npm audit - 类型安全: 手写验证器确保类型边界(数字字符检查、空值处理)
- 性能监控: Redis 上报消费速度、去重命中率、写入延迟统计
8. 部署与维护
8.1 标准启动流程
- 环境变量加载 (dotenv)
- Redis 连接验证
- PostgreSQL G5 连接验证
- Kafka 分区数自动检测(关键步骤)
- 创建 N 个消费者实例
- 启动定时报告 cron
- 开始消费与处理
8.2 故障恢复
- 消息验证失败: 消息被完全忽略(计数记录),偏移量正常提交
- DB 写入失败: 记录保留在缓冲中,30秒后重试
- 连接中断: 使用现有 pg/redis 的重连机制
9. 性能特征
| 指标 | 值 | 说明 |
|---|---|---|
| 消费吞吐 | 6 并行消费者 | 自动扩展到分区数 |
| 缓冲窗口 | 5 秒 | 内存去重窗口 |
| 冷却期 | 30 秒 | 每键写入间隔下限 |
| 批量提交周期 | 200ms | Kafka 偏移量提交间隔 |
| 构建大小 | ~22KB | dist/index.js 最终产物 |
| 测试覆盖 | 14 个用例 | 全部通过 |
10. 修订历史
| 版本 | 日期 | 变更 |
|---|---|---|
| 1.0.0 | 2026-03-11 | 初始 OpenSpec,双层去重、自动伸缩、类型修正 |
文档维护责任: 每次修改核心逻辑(Parser、Buffer、DbManager)后,同步更新相应 spec/*.md 文档。
最后更新: 2026-03-11