Files
XuJiacheng e45d14b720 feat: 实现心跳消息处理模块
- 新增 HeartbeatBuffer 类,用于收集和去重 Kafka 心跳消息,并定期将数据刷新到数据库。
- 新增 HeartbeatDbManager 类,负责与 PostgreSQL 数据库的交互,支持批量 upsert 操作。
- 新增配置文件 config.js,支持从环境变量加载配置。
- 新增 Kafka 消费者模块,支持从 Kafka 中消费心跳消息。
- 新增 Redis 集成模块,支持将日志和心跳信息推送到 Redis。
- 新增心跳消息解析器,负责解析 Kafka 消息并提取心跳字段。
- 新增日志记录工具,支持不同级别的日志输出。
- 新增指标收集器,跟踪 Kafka 消息处理和数据库操作的指标。
- 新增单元测试,覆盖 HeartbeatBuffer 和 HeartbeatDbManager 的主要功能。
- 新增数据库表结构 SQL 文件,定义 room_status_moment_g5 表的结构。
- 配置 Vite 构建工具,支持 Node.js 环境的构建。
2026-03-12 14:11:02 +08:00

15 KiB
Raw Permalink Blame History

去重策略规范 (Deduplication Specification)

1. 去重概述

本系统实现双层去重策略,分别在内存缓冲和数据库写入两个层面对心跳数据进行去重:

  1. Layer 1 - 5秒缓冲去重: 内存中维护 5 秒时间窗口,同一键只保留最新记录
  2. Layer 2 - 30秒写入冷却: 每个键写入 DB 后30 秒内不再写入,减轻数据库压力

2. 去重键设计

2.1 键的组成

const key = `${hotel_id}:${room_id}`;

// 示例:
// hotel_id = "2045", room_id = "6010"
// → key = "2045:6010"

// hotel_id = "1309", room_id = "大会议室"
// → key = "1309:大会议室"

2.2 为什么选择 (hotel_id, room_id) 作为去重键

业务含义: 一个酒店内的一个房间在同一时刻只能有一个设备状态

设计决策:

  • 不包含 device_id: 同一房间的多个设备(如多个传感器)应被视为同一状态
  • 不包含 ts_ms: 时间戳用于排序,不用于去重
  • 不包含 current_time: 冗余时间戳,已有 ts_ms

SQL 对应: 数据库表 room_status_moment_g5 的主键 = (hotel_id, room_id)

CREATE TABLE room_status_moment_g5 (
  hotel_id SMALLINT,
  room_id TEXT,
  device_id VARCHAR(255),
  ts_ms BIGINT,
  status SMALLINT,
  PRIMARY KEY (hotel_id, room_id)
);

3. 第一层5秒缓冲去重

3.1 工作原理

class HeartbeatBuffer {
  constructor(maxBufferSize = 5000, windowMs = 5000) {
    this.buffer = new Map();        // key → latest record
    this.maxBufferSize = maxBufferSize;
    this.windowMs = windowMs;       // 5000ms
    this.flushTimer = null;
  }

  add(record) {
    const key = this._getKey(record);
    
    if (this.buffer.has(key)) {
      // 更新逻辑:只保留 ts_ms 最新的
      const existing = this.buffer.get(key);
      if (record.ts_ms > existing.ts_ms) {
        // 新记录更新 → 覆盖旧的
        this.buffer.set(key, record);
      }
      // 否则丢弃更旧的记录,保持缓冲中的最新版本
    } else {
      // 新键 → 直接添加
      this.buffer.set(key, record);
    }
    
    // 如果缓冲满 → 立即刷新(不等待 5s
    if (this.buffer.size >= this.maxBufferSize) {
      this._flush();
    }
  }

  _getKey(record) {
    return `${record.hotel_id}:${record.room_id}`;
  }

  _flush() {
    // 触发数据库写入...
  }
}

3.2 时间窗口示意

时间轴 (单位: 毫秒)

T=0ms     ┌─ add({hotel_id:"2045", room_id:"6010", ts_ms:1000})
          │  buffer = {"2045:6010" → {ts_ms:1000}}
          │
T=100ms   ├─ add({hotel_id:"2045", room_id:"6010", ts_ms:1050})
          │  ts_ms 1050 > 1000 → 更新
          │  buffer = {"2045:6010" → {ts_ms:1050}}
          │
T=200ms   ├─ add({hotel_id:"2045", room_id:"6010", ts_ms:1030})
          │  ts_ms 1030 < 1050 → 丢弃,保持 1050
          │  buffer = {"2045:6010" → {ts_ms:1050}}
          │
T=500ms   ├─ add({hotel_id:"2045", room_id:"6010", ts_ms:1200})
          │  ts_ms 1200 > 1050 → 更新
          │  buffer = {"2045:6010" → {ts_ms:1200}}
          │
T=5000ms  └─ [Scheduled Flush]
             Write {hotel_id:"2045", room_id:"6010", ts_ms:1200}
             → database
             
结果: 5 秒内 4 条重复消息,实际只写入 1 条(最新的)
去重率: 75% (4-1)/4

3.3 去重效果分析

输入场景: 同一房间心跳设备在 5 秒内发送多条消息

// 真实数据示例
const messagesIn5Seconds = [
  {hotel_id:"2045", room_id:"6010", device_id:"DEV1", ts_ms:1000},
  {hotel_id:"2045", room_id:"6010", device_id:"DEV1", ts_ms:1010},  // 重复
  {hotel_id:"2045", room_id:"6010", device_id:"DEV2", ts_ms:1005},  // 同房不同设备
  {hotel_id:"2045", room_id:"6010", device_id:"DEV1", ts_ms:1015},  // 重复(最新)
  {hotel_id:"2045", room_id:"6010", device_id:"DEV1", ts_ms:1008},  // 重复(旧)
];

// 缓冲处理
const buffer = new HeartbeatBuffer(5000, 5000);
for (const msg of messagesIn5Seconds) {
  const parsed = parseHeartbeat(JSON.stringify(msg));
  buffer.add(parsed);
}

// 缓冲内容5秒后刷新
// {"2045:6010" → {ts_ms: 1015}}

// 结果5 条输入 → 1 条输出
// device_id 合并(同房)+ ts_ms 排序(保留最新) = 高效去重

3.4 缓冲满时的行为

// 配置
const buffer = new HeartbeatBuffer(maxBufferSize = 5000, windowMs = 5000);

// 如果在短时间内收到超过 5000 条不同键的消息
for (let i = 0; i < 6000; i++) {
  buffer.add({
    hotel_id: String(Math.floor(i / 1000)),  // 0-5
    room_id: String(i % 1000),                // 0-999
    device_id: "DEV1",
    ts_ms: Date.now()
  });
}

// 当 buffer.size >= 5000 时,主动触发 flush不等待 5s
// 这是防止内存溢出的安全机制

4. 第二层30秒写入冷却期

4.1 冷却期的核心逻辑

class HeartbeatBuffer {
  constructor(cooldownMs = 30000) {
    this.lastWrittenAt = new Map();  // key → timestamp
    this.cooldownMs = cooldownMs;      // 30000ms
  }

  async _flush() {
    const nowTs = this.now();
    const writableEntries = [];
    let minCooldownDelayMs = null;

    // 遍历缓冲中的所有键
    for (const [key, row] of this.buffer.entries()) {
      
      // 检查冷却期
      const cooldownDelayMs = this._getCooldownDelayMs(key, nowTs);
      
      if (cooldownDelayMs > 0) {
        // 仍在冷却期 → 跳过,保留在缓冲中等待
        minCooldownDelayMs = minCooldownDelayMs == null
          ? cooldownDelayMs
          : Math.min(minCooldownDelayMs, cooldownDelayMs);
        continue;  // 不写入
      }
      
      // 冷却期已过 → 标记为可写
      writableEntries.push([key, row]);
      this.buffer.delete(key);  // 从缓冲移除
    }

    // 执行数据库写入
    if (writableEntries.length > 0) {
      try {
        const rows = writableEntries.map(([, row]) => row);
        await this.dbManager.upsertBatch(rows);

        // 标记写入时间(启动新冷却期)
        const writtenAt = this.now();
        for (const [key] of writableEntries) {
          this.lastWrittenAt.set(key, writtenAt);
        }
      } catch (err) {
        // 写入失败 → 重新添加到缓冲
        for (const [key, row] of writableEntries) {
          this.buffer.set(key, row);
        }
        throw err;
      }
    }

    // 安排下次刷新
    const nextFlushDelayMs = minCooldownDelayMs ?? this.windowMs;
    this.flushTimer = setTimeout(() => this._flush(), nextFlushDelayMs);
  }

  _getCooldownDelayMs(key, nowTs) {
    const lastWritten = this.lastWrittenAt.get(key);
    
    if (lastWritten == null) {
      // 从未写入 → 立即可写
      return 0;
    }
    
    // 计算冷却期剩余时间
    const cooldownExpiry = lastWritten + this.cooldownMs;
    const delayMs = cooldownExpiry - nowTs;
    
    return Math.max(0, delayMs);
  }

  now() {
    return Date.now();
  }
}

4.2 冷却期时间线示例

时间点               事件
─────────────────────────────────────────

T=0s    write("2045:6010") to DB
        lastWrittenAt["2045:6010"] = 0
        ↓ 冷却期开始

T=1s    buffer 中有 "2045:6010" 的新数据
        但 cooldownLeft = 30000 - 1000 = 29000ms > 0
        ✗ 跳过写入,保留在缓冲中

T=15s   buffer 仍有 "2045:6010"
        cooldownLeft = 30000 - 15000 = 15000ms > 0
        ✗ 跳过写入

T=29s   buffer 收到 "2045:6010" 的最新更新
        cooldownLeft = 30000 - 29000 = 1000ms > 0
        ✗ 跳过写入,但缓冲中的值已是最新的

T=30s   flush() 检查 "2045:6010"
        cooldownLeft = 30000 - 30000 = 0 ≤ 0
        ✓ 可以写入!
        write("2045:6010") to DB with latest value
        lastWrittenAt["2045:6010"] = 30000
        ↓ 新冷却期开始

T=31s   buffer 有新的 "2045:6010"
        cooldownLeft = 60000 - 31000 = 29000ms > 0
        ✗ 跳过写入

...循环...

4.3 冷却期的优势

优势 说明
减轻 DB 压力 同一键 30s 只写一次,而不是每 5s 写一次
保持数据新鲜 虽然 30s 内不写 DB但缓冲中保留最新值
防止频繁更新 避免 UPDATE 语句的过度执行
简化版本控制 每 30s 保证一次更新,易于追踪数据变化

4.4 与缓冲窗口的关系

┌─────────────────────────────────────────────────────────┐
│ 5秒缓冲窗口 (Layer 1)                                    │
├────────────────────────────────────────┬──────────────┤
│ buffer = {                              │ @T=5s flush: │
│   "2045:6010" → {ts_ms: 1200},          │ write if no  │
│   "1309:8809" → {ts_ms: 2300},          │ cooldown     │
│   ...                                   │              │
│ }                                        │              │
└────────────────────────────────────────┴──────────────┘

        ↓ 满足 2 个条件之一:
        - 缓冲满≥5000 条)
        - 5秒时间过期

┌──────────────────────────────────────────────────────────┐
│ 30秒冷却期检查 (Layer 2)                                  │
├─────────────────────────────────────────────────────────┤
│ for each key in buffer:                                  │
│   if (now - lastWrittenAt[key]) < 30000:               │
│     → skip (keep in buffer)                            │
│   else:                                                  │
│     → write to DB, update lastWrittenAt[key]           │
└─────────────────────────────────────────────────────────┘

        ↓ DB 的最终状态

┌──────────────────────────────────────────────────────────┐
│ PostgreSQL (room_status_moment_g5)                       │
├─────────────────────────────────────────────────────────┤
│ (hotel_id:2045, room_id:6010) → {ts_ms: 1200, ...}    │
│ (hotel_id:1309, room_id:8809) → {ts_ms: 2300, ...}    │
│ ...                                                      │
└─────────────────────────────────────────────────────────┘

5. 去重命中率估算

5.1 典型场景分析

假设:

  • 消费速率30,000 msg/s
  • 酒店数100
  • 房间数/酒店1000
  • 总不同键100 × 1000 = 100,000 个
  • 每键消息频率30,000 / 100,000 = 0.3 msg/s = 1 msg/3.3s

5秒缓冲去重率:

同一键在 5s 内的消息数0.3 × 5 = 1.5(平均)
→ 缓冲虽有去重,但每键大多只有 1-2 条,去重率较低 ~20-30%

结论:缓冲主要用于吸收毛刺(短时间内的重复),不是主要去重机制

30秒冷却期去重率:

不考虑冷却期:每键 5s 写一次 → 30s 内写 6 次
使用冷却期:每键 30s 内只写 1 次 → 去重率 = (6-1)/6 = 83.3%

结论30秒冷却期是关键减轻 DB 压力 83%

5.2 极端场景

场景 A单键频繁更新

同一房间的设备每 100ms 发送一次心跳

缓冲处理:
  T=0ms: add({...ts_ms:1000})
  T=100ms: add({...ts_ms:1100}) → 缓冲中更新到 1100
  T=200ms: add({...ts_ms:1200}) → 缓冲中更新到 1200
  ...
  T=5000ms: flush() → 写入 {ts_ms:5000}
  T=10000ms: flush() → 冷却期仍有 20s 剩余 → 跳过
  ...
  T=35000ms: flush() → 冷却期过 → 写入最新值

结果50 条消息T=0-5000ms 内)→ 1 条写入T=5000ms
      → 再加 1 条写入T=35000ms 冷却期过)
      总计 50 msg → 2 DB writes去重率 96%

场景 B多键均匀分布

100 个不同的键,每键每 30s 写一次

缓冲 + 冷却期协同:
  Layer 1 (5s): 100 键中有去重 → 实际缓冲可能只有 80 条(去重 20%
  Layer 2 (30s): 无冷却期情况下 30s 写 6 次,现在只写 1 次 → 减少 83%
  
整体效果DB 写入量减少到原来的 1/6约 16.7%

6. 错误场景与恢复

6.1 缓冲满的处理

add(record) {
  const key = this._getKey(record);
  if (this.buffer.has(key)) {
    const existing = this.buffer.get(key);
    if (record.ts_ms > existing.ts_ms) {
      this.buffer.set(key, record);
    }
  } else {
    this.buffer.set(key, record);
  }
  
  // 防止内存溢出:缓冲满 → 立即刷新
  if (this.buffer.size >= this.maxBufferSize) {
    this._flush();  // 进入 Layer 2 检查和写入
  }
}

// maxBufferSize 默认 5000可配置
// HEARTBEAT_BUFFER_SIZE_MAX=5000

6.2 写入失败的恢复

async _flush() {
  // ... 选出 writableEntries ...
  
  try {
    const rows = writableEntries.map(([, row]) => row);
    await this.dbManager.upsertBatch(rows);
    
    // 成功 → 更新 lastWrittenAt
    const writtenAt = this.now();
    for (const [key] of writableEntries) {
      this.lastWrittenAt.set(key, writtenAt);
    }
  } catch (err) {
    // 失败 → 重新添加到缓冲,稍后重试
    for (const [key, row] of writableEntries) {
      this.buffer.set(key, row);
    }
    logger.error(`Batch upsert failed: ${err.message}`);
    throw err;  // 可选:传播错误或继续
  }
}

7. 性能特征

7.1 内存占用

缓冲区最大容量5000 条记录
每条记录大小≈200 bytes (包括 ts_ms, hotel_id, room_id, device_id)
最大缓冲内存5000 × 200 = 1 MB

lastWrittenAt 追踪:
  最多 100K 个键100 酒店 × 1000 房间)
  每个 Map 条目≈50 bytes (key + timestamp)
  总计100K × 50 = 5 MB

整体估计≈6-10 MB可接受

7.2 CPU 开销

add() 操作O(1) Map 查找 + 比较
_getCooldownDelayMs()O(1) 查找 + 算术
flush() 循环O(缓冲大小) ≈ O(5000)

典型负载30K msg/s
  = 30K add() 调用/s
  = 30K × O(1) = 常数时间CPU 占用低
  
flush() 每 5s 或缓冲满时执行一次 ≈ 6-10 次/s
  = 6 × O(5000) = 30K 操作/s ≈ 与消费速率相当
  
总 CPU中等不是瓶颈

上次修订: 2026-03-11
维护者: BLS OldRCU Heartbeat Team