Files
XuJiacheng e45d14b720 feat: 实现心跳消息处理模块
- 新增 HeartbeatBuffer 类,用于收集和去重 Kafka 心跳消息,并定期将数据刷新到数据库。
- 新增 HeartbeatDbManager 类,负责与 PostgreSQL 数据库的交互,支持批量 upsert 操作。
- 新增配置文件 config.js,支持从环境变量加载配置。
- 新增 Kafka 消费者模块,支持从 Kafka 中消费心跳消息。
- 新增 Redis 集成模块,支持将日志和心跳信息推送到 Redis。
- 新增心跳消息解析器,负责解析 Kafka 消息并提取心跳字段。
- 新增日志记录工具,支持不同级别的日志输出。
- 新增指标收集器,跟踪 Kafka 消息处理和数据库操作的指标。
- 新增单元测试,覆盖 HeartbeatBuffer 和 HeartbeatDbManager 的主要功能。
- 新增数据库表结构 SQL 文件,定义 room_status_moment_g5 表的结构。
- 配置 Vite 构建工具,支持 Node.js 环境的构建。
2026-03-12 14:11:02 +08:00

16 KiB
Raw Permalink Blame History

架构规范 (Architecture Specification)

1. 系统架构图

┌─────────────────────────────────────────────────────────────────┐
│                        Kafka Cluster                             │
│  Topic: blwlog4Nodejs-oldrcu-heartbeat-topic (6 partitions)    │
└──────────────────────┬──────────────────────────────────────────┘
                       │
        ┌──────────────┼──────────────┐
        │              │              │
     Part0          Part1 ... Part5   
        │              │              │
    ┌───▼────┐    ┌────▼──┐      ┌──▼────┐
    │ Cons-  │    │ Cons-  │ ... │ Cons- │  (6 Consumer Instances)
    │ umer-0 │    │ umer-1 │     │ umer-5│
    └───┬────┘    └────┬──┘      └──┬────┘
        │              │             │
        └──────────────┼─────────────┘
                       │
        ┌──────────────▼───────────────┐
        │   HeartbeatParser            │
        │ (Validation & Type Check)    │
        │ - ts_ms: number              │
        │ - hotel_id: digit-string     │
        │ - room_id: non-blank string  │
        │ - device_id: non-blank str   │
        └──────────────┬───────────────┘
                       │
        ┌──────────────▼────────────────────────┐
        │   HeartbeatBuffer (Layer 1 Dedup)     │
        │ ◆ 5-second window                    │
        │ ◆ In-memory dedup by key             │
        │ ◆ Keep latest ts_ms per key          │
        │ ◆ Stats tracking (pulled/merged)     │
        └──────────────┬────────────────────────┘
                       │
        ┌──────────────▼────────────────────────┐
        │   Cooldown Filter (Layer 2 Dedup)     │
        │ ◆ 30-second cooldown-per-key         │
        │ ◆ lastWrittenAt Map tracking         │
        │ ◆ Hold updates during cooldown       │
        │ ◆ Flush only eligible keys           │
        └──────────────┬────────────────────────┘
                       │
        ┌──────────────▼────────────────────────┐
        │   HeartbeatDbManager (Batch Upsert)   │
        │ ◆ Parameterized SQL with type cast   │
        │ ◆ ::smallint for hotel_id            │
        │ ◆ ON CONFLICT with ts_ms ordering    │
        │ ◆ Batched writes (5s or maxSize)     │
        └──────────────┬────────────────────────┘
                       │
        ┌──────────────▼────────────────────────┐
        │     PostgreSQL G5                     │
        │  (room_status_moment_g5 table)        │
        │  Primary Key: (hotel_id, room_id)    │
        │  Columns: ts_ms, device_id, status   │
        └──────────────┬────────────────────────┘
                       │
        ┌──────────────▼────────────────────────┐
        │   Metrics Reporter (Redis + Cron)     │
        │ ◆ Consumption rate                   │
        │ ◆ Dedup hit rate                     │
        │ ◆ Write latency                      │
        └───────────────────────────────────────┘

2. 模块交互时序

2.1 单条消息处理时序

Timeline: Message arrives at Kafka

T=0ms     [Kafka] Partition-0 retrieves message
          ├─ raw: {"ts_ms":1234567890, "hotel_id":"2045", "room_id":"6010", 
          │         "device_id":"DEV001", "current_time": "2026-03-11T10:30:00Z"}
          │
T=0.1ms   [Consumer] Receives from Kafka
          │
T=0.2ms   [Parser] Validates
          ├─ ts_ms check: Number.isFinite(1234567890) ✓
          ├─ hotel_id check: isDigitsOnly("2045") ✓
          ├─ room_id check: isNonBlankString("6010") ✓
          ├─ device_id check: isNonBlankString("DEV001") ✓
          ├─ Returns: {ts_ms, hotel_id, room_id, device_id} ✓
          │
T=1ms     [HeartbeatBuffer] Add to buffer
          ├─ key = "2045:6010"
          ├─ Check buffer.has(key)?
          │  ├─ NO  → Add new entry
          │  └─ YES → Merge if ts_ms newer
          ├─ Check if buffer.size >= maxSize (5000)?
          │  └─ YES → Trigger flush immediately
          │
T=2ms     [Cooldown Check] In flush()
          ├─ nowTs = Date.now()
          ├─ For each key in buffer:
          │  ├─ cooldownLeft = lastWrittenAt[key] + 30000 - nowTs
          │  ├─ IF cooldownLeft > 0
          │  │  └─ Skip (keep in buffer for later)
          │  ├─ ELSE (eligible)
          │  │  ├─ Move to writableEntries
          │  │  └─ Remove from buffer
          │
T=5000ms  [Scheduled Flush Every 5s]
          ├─ Writable entries collected
          ├─ DB upsert batch
          ├─ On success:
          │  └─ Mark lastWrittenAt[key] = current time
          ├─ On error:
          │  └─ Re-add to buffer for retry
          │
T=5001ms  [Schedule Next Flush]
          ├─ IF buffer is empty
          │  └─ Schedule next at T+5000ms
          ├─ IF cooldown exists
          │  └─ Schedule at earliest cooldown expiry

2.2 多消费者分区映射

Topic: blwlog4Nodejs-oldrcu-heartbeat-topic (6 partitions)

Partition Assignment (after auto-scaling):
┌────────────────────────────────────────┐
│ Partition 0 → Consumer Instance 0      │
│ Partition 1 → Consumer Instance 1      │
│ Partition 2 → Consumer Instance 2      │
│ Partition 3 → Consumer Instance 3      │
│ Partition 4 → Consumer Instance 4      │
│ Partition 5 → Consumer Instance 5      │
└────────────────────────────────────────┘

All instances share:
  - Same HeartbeatBuffer instance (in-memory, 5s window)
  - Same HeartbeatDbManager (batched writes to G5)
  - Same Redis connection (metrics reporting)

Benefit: Load distributed across partitions, bottleneck = DB write rate

3. 消费者自动伸缩机制

3.1 启动时分区检测流程

// src/kafka/consumer.js

async function resolveTopicPartitionCount(kafkaConfig) {
  // Step 1: 建立临时 Kafka 客户端
  const client = new kafka.KafkaClient({
    kafkaHost: kafkaConfig.brokers
  });
  
  // Step 2: 异步查询主题元数据
  await new Promise((resolve, reject) => {
    client.loadMetadataForTopics([kafkaConfig.topic], (err, metadata) => {
      if (err) reject(err);
      else {
        const partitions = metadata[0].partitions;
        const count = partitions.length;  // e.g., 6
        resolve(count);
      }
    });
  });
  
  // Step 3: 关闭客户端,返回分区数
  client.close();
  return count;
}

// 启动流程
const configuredInstances = 3;
const actualPartitionCount = await resolveTopicPartitionCount(kafkaConfig);
const instanceCount = Math.max(configuredInstances, actualPartitionCount);
// 结果: max(3, 6) = 6 instances created

3.2 消费者动态创建

async function createKafkaConsumers(kafkaConfig) {
  const partitionCount = await resolveTopicPartitionCount(kafkaConfig);
  const count = Math.max(3, partitionCount);
  
  const consumers = [];
  for (let i = 0; i < count; i++) {
    const consumer = createOneConsumer(i, kafkaConfig);
    consumers.push(consumer);
    logger.info(`Started Kafka consumer ${i}/${count-1}`);
  }
  return consumers;
}

4. 缓冲区与去重策略详解

4.1 双层去重架构

Layer 1: 5秒时间窗口去重

class HeartbeatBuffer {
  // 内存 Map按键存储最新记录
  buffer = new Map();
  
  add(record) {
    const key = `${record.hotel_id}:${record.room_id}`;
    
    if (this.buffer.has(key)) {
      // 合并逻辑:只保留 ts_ms 更新的版本
      const existing = this.buffer.get(key);
      if (record.ts_ms > existing.ts_ms) {
        this.buffer.set(key, record);
      }
      // 否则丢弃更旧的
    } else {
      this.buffer.set(key, record);
    }
    
    // 缓冲满 → 立即刷新
    if (this.buffer.size >= this.maxBufferSize) {
      this.flush();
    }
  }
}

// 示例:
// T=0ms    add({hotel_id:"2045", room_id:"6010", ts_ms: 1000})
//          buffer = {"2045:6010" → {ts_ms: 1000}}
// T=100ms  add({hotel_id:"2045", room_id:"6010", ts_ms: 1100})
//          ts_ms newer → 覆盖
//          buffer = {"2045:6010" → {ts_ms: 1100}}
// T=200ms  add({hotel_id:"2045", room_id:"6010", ts_ms: 1050})
//          ts_ms 旧 → 丢弃,不更新
//          buffer = {"2045:6010" → {ts_ms: 1100}} (保持)
// T=5000ms [Scheduled flush]
//          Write {hotel_id:"2045", room_id:"6010", ts_ms: 1100} to DB

Layer 2: 30秒写入冷却期

class HeartbeatBuffer {
  // 追踪每个键的最后写入时间
  lastWrittenAt = new Map();
  
  // 冷却期配置(毫秒)
  cooldownMs = 30000;
  
  flush() {
    const nowTs = this.now();
    const writableEntries = [];
    let minCooldownDelayMs = null;
    
    for (const [key, row] of this.buffer.entries()) {
      const cooldownDelayMs = this._getCooldownDelayMs(key, nowTs);
      
      if (cooldownDelayMs > 0) {
        // 仍在冷却期内 → 跳过写入,保留在缓冲中
        minCooldownDelayMs = minCooldownDelayMs == null
          ? cooldownDelayMs
          : Math.min(minCooldownDelayMs, cooldownDelayMs);
        continue;
      }
      
      // 已过冷却期 → 标记为可写
      writableEntries.push([key, row]);
      this.buffer.delete(key);
    }
    
    // 执行数据库写入
    const rows = writableEntries.map(([, row]) => row);
    if (rows.length > 0) {
      await this.dbManager.upsertBatch(rows);
      
      // 记录写入时间,启动新的冷却期
      const writtenAt = this.now();
      for (const [key] of writableEntries) {
        this.lastWrittenAt.set(key, writtenAt);
      }
    }
    
    // 调度下次刷新
    const nextFlushDelayMs = minCooldownDelayMs ?? 5000;
    this.flushTimer = setTimeout(() => this.flush(), nextFlushDelayMs);
  }
  
  _getCooldownDelayMs(key, nowTs) {
    const lastWritten = this.lastWrittenAt.get(key);
    if (lastWritten == null) return 0; // 从未写入 → 立即可写
    
    const cooldownExpiry = lastWritten + this.cooldownMs;
    const delay = cooldownExpiry - nowTs;
    return Math.max(0, delay);
  }
}

// 时间线示例30秒冷却期
// T=0s    flush() writes key "2045:6010" to DB
//         lastWrittenAt["2045:6010"] = 0
// T=1s    add({hotel_id:"2045", room_id:"6010", ts_ms: 2000})
//         buffer["2045:6010"] = {ts_ms: 2000}
// T=2s    flush() check:
//         cooldownLeft = 30000 - 2000 = 28000ms > 0
//         → Skip write, keep in buffer
// T=29s   add({hotel_id:"2045", room_id:"6010", ts_ms: 2500})
//         buffer update: {ts_ms: 2500}
// T=30s   flush() check:
//         cooldownLeft = 30000 - 30000 = 0 ≤ 0
//         → Write {ts_ms: 2500} to DB
//         lastWrittenAt["2045:6010"] = 30000 (new cooldown starts)

5. 批量 Upsert 与时间序列保护

5.1 SQL 设计

-- 参数化查询
INSERT INTO room_status_moment_g5 
  (hotel_id, room_id, device_id, ts_ms, status)
VALUES 
  -- ($1::smallint, $2, $3, $4, 1),  -- Record 1
  -- ($5::smallint, $6, $7, $8, 1),  -- Record 2
  -- ...
ON CONFLICT (hotel_id, room_id) DO UPDATE SET
  ts_ms = EXCLUDED.ts_ms,
  status = 1
WHERE EXCLUDED.ts_ms >= current.ts_ms;

5.2 类型转换策略

// Kafka 传来的字符串 hotel_id
const kafkaData = {
  hotel_id: "2045",  // STRING in Kafka
  room_id: "6010",
  device_id: "DEV001",
  ts_ms: 1234567890
};

// 构建参数化查询
const params = [
  parseInt(kafkaData.hotel_id),  // 转为数字
  kafkaData.room_id,
  kafkaData.device_id,
  kafkaData.ts_ms
];

// SQL 中的 ::smallint 强制类型转换
// $1::smallint 确保即使参数是数字也能与 G5 smallint 列兼容

6. 批量提交策略

6.1 Kafka 偏移量提交

// 旧策略(低效):逐条消息提交
message => {
  const parsed = parseHeartbeat(message);
  if (parsed) buffer.add(parsed);
  consumer.commitOffset({...});  // 每条消息都提交!
}

// 新策略高效200ms 周期性批量提交
const commitInterval = 200; // 毫秒
setInterval(() => {
  // 此时刻之前消费的所有消息一次性提交
  consumer.commit(false, (err) => {
    if (!err) logger.debug('Batch offset committed');
  });
}, commitInterval);

6.2 提交间隔与吞吐量的权衡

策略 提交间隔 优点 缺点
Per-message 1ms 最高可靠性 消费速度慢 50%
200ms batch 200ms 平衡可靠性与吞吐 故障时丢失 <200ms 消息
5s batch 5000ms 最高吞吐 故障风险大

当前选择: 200ms (平衡)

7. 错误恢复机制

7.1 Parser 错误

const parsed = parseHeartbeat(rawMessage);
if (parsed === null) {
  // 验证失败:
  // - 计数记录 (invalidCount++)
  // - 偏移量正常提交(不再消费此消息)
  // - 无重试(垃圾消息被丢弃)
  stats.invalidCount++;
  consumer.commitOffset(...);
  continue;
}

7.2 数据库写入失败

try {
  await this.dbManager.upsertBatch(rows);
  // 标记冷却期
  const writtenAt = this.now();
  for (const [key] of writableEntries) {
    this.lastWrittenAt.set(key, writtenAt);
  }
} catch (err) {
  // 写入失败 → 记录保留在缓冲中
  // 30秒后重试或等待缓冲满时重试
  logger.error(`DB upsert failed: ${err.message}`);
  // writableEntries 已从 buffer 中删除,需要重新添加
  for (const [key, row] of writableEntries) {
    this.buffer.set(key, row);
  }
}

8. 性能特征与优化

8.1 吞吐量分析

配置: 6 消费者 × 6 分区 = 1:1 映射 (最优)
Kafka fetch batch size: 100,000 messages
Kafka commit interval: 200ms

理论吞吐:
- 每个消费者每秒消费: ~5000 msg/s
- 6 消费者总计: ~30,000 msg/s

缓冲与去重:
- 5s 窗口: 去除重复速度 ~99.5% (典型数据)
- 30s 冷却: 进一步降低 DB 写入压力 ~60-80%

DB 写入:
- Batch size: 最多 5000 条 或 5s 周期
- 实际写入速率: ~3,000-5,000 rows/s (受冷却期抑制)

8.2 内存占用

缓冲区 (5s 窗口):
- 每条记录 ~200 bytes
- 最多 5000 条
- 总计 ~1 MB

lastWrittenAt 追踪:
- 键数 = 酒店数 × 房间数
- ~100 酒店 × 1000 房间 = 100K 键
- 每个键 Map 条目 ~50 bytes
- 总计 ~5 MB

Stats 对象: ~1 KB
整体估计: ~10 MB 内存占用

9. 关键设计决策

决策 选择 理由
消费者数 动态 = 分区数 避免静态配置失效
验证框架 手写 vs Zod 手写快 10x热路径优化
去重策略 双层 (5s+30s) 单层不足,内存与性能折衷
hotel_id 类型 字符串(数字形式) 与 Kafka 实际数据一致
SQL 冲突解决 WHERE ts_ms 保护 防止乱序消息回滚数据
批量提交周期 200ms 平衡吞吐与可靠性

上次修订: 2026-03-11
维护者: BLS OldRCU Heartbeat Team