- 新增 HeartbeatBuffer 类,用于收集和去重 Kafka 心跳消息,并定期将数据刷新到数据库。 - 新增 HeartbeatDbManager 类,负责与 PostgreSQL 数据库的交互,支持批量 upsert 操作。 - 新增配置文件 config.js,支持从环境变量加载配置。 - 新增 Kafka 消费者模块,支持从 Kafka 中消费心跳消息。 - 新增 Redis 集成模块,支持将日志和心跳信息推送到 Redis。 - 新增心跳消息解析器,负责解析 Kafka 消息并提取心跳字段。 - 新增日志记录工具,支持不同级别的日志输出。 - 新增指标收集器,跟踪 Kafka 消息处理和数据库操作的指标。 - 新增单元测试,覆盖 HeartbeatBuffer 和 HeartbeatDbManager 的主要功能。 - 新增数据库表结构 SQL 文件,定义 room_status_moment_g5 表的结构。 - 配置 Vite 构建工具,支持 Node.js 环境的构建。
16 KiB
16 KiB
架构规范 (Architecture Specification)
1. 系统架构图
┌─────────────────────────────────────────────────────────────────┐
│ Kafka Cluster │
│ Topic: blwlog4Nodejs-oldrcu-heartbeat-topic (6 partitions) │
└──────────────────────┬──────────────────────────────────────────┘
│
┌──────────────┼──────────────┐
│ │ │
Part0 Part1 ... Part5
│ │ │
┌───▼────┐ ┌────▼──┐ ┌──▼────┐
│ Cons- │ │ Cons- │ ... │ Cons- │ (6 Consumer Instances)
│ umer-0 │ │ umer-1 │ │ umer-5│
└───┬────┘ └────┬──┘ └──┬────┘
│ │ │
└──────────────┼─────────────┘
│
┌──────────────▼───────────────┐
│ HeartbeatParser │
│ (Validation & Type Check) │
│ - ts_ms: number │
│ - hotel_id: digit-string │
│ - room_id: non-blank string │
│ - device_id: non-blank str │
└──────────────┬───────────────┘
│
┌──────────────▼────────────────────────┐
│ HeartbeatBuffer (Layer 1 Dedup) │
│ ◆ 5-second window │
│ ◆ In-memory dedup by key │
│ ◆ Keep latest ts_ms per key │
│ ◆ Stats tracking (pulled/merged) │
└──────────────┬────────────────────────┘
│
┌──────────────▼────────────────────────┐
│ Cooldown Filter (Layer 2 Dedup) │
│ ◆ 30-second cooldown-per-key │
│ ◆ lastWrittenAt Map tracking │
│ ◆ Hold updates during cooldown │
│ ◆ Flush only eligible keys │
└──────────────┬────────────────────────┘
│
┌──────────────▼────────────────────────┐
│ HeartbeatDbManager (Batch Upsert) │
│ ◆ Parameterized SQL with type cast │
│ ◆ ::smallint for hotel_id │
│ ◆ ON CONFLICT with ts_ms ordering │
│ ◆ Batched writes (5s or maxSize) │
└──────────────┬────────────────────────┘
│
┌──────────────▼────────────────────────┐
│ PostgreSQL G5 │
│ (room_status_moment_g5 table) │
│ Primary Key: (hotel_id, room_id) │
│ Columns: ts_ms, device_id, status │
└──────────────┬────────────────────────┘
│
┌──────────────▼────────────────────────┐
│ Metrics Reporter (Redis + Cron) │
│ ◆ Consumption rate │
│ ◆ Dedup hit rate │
│ ◆ Write latency │
└───────────────────────────────────────┘
2. 模块交互时序
2.1 单条消息处理时序
Timeline: Message arrives at Kafka
T=0ms [Kafka] Partition-0 retrieves message
├─ raw: {"ts_ms":1234567890, "hotel_id":"2045", "room_id":"6010",
│ "device_id":"DEV001", "current_time": "2026-03-11T10:30:00Z"}
│
T=0.1ms [Consumer] Receives from Kafka
│
T=0.2ms [Parser] Validates
├─ ts_ms check: Number.isFinite(1234567890) ✓
├─ hotel_id check: isDigitsOnly("2045") ✓
├─ room_id check: isNonBlankString("6010") ✓
├─ device_id check: isNonBlankString("DEV001") ✓
├─ Returns: {ts_ms, hotel_id, room_id, device_id} ✓
│
T=1ms [HeartbeatBuffer] Add to buffer
├─ key = "2045:6010"
├─ Check buffer.has(key)?
│ ├─ NO → Add new entry
│ └─ YES → Merge if ts_ms newer
├─ Check if buffer.size >= maxSize (5000)?
│ └─ YES → Trigger flush immediately
│
T=2ms [Cooldown Check] In flush()
├─ nowTs = Date.now()
├─ For each key in buffer:
│ ├─ cooldownLeft = lastWrittenAt[key] + 30000 - nowTs
│ ├─ IF cooldownLeft > 0
│ │ └─ Skip (keep in buffer for later)
│ ├─ ELSE (eligible)
│ │ ├─ Move to writableEntries
│ │ └─ Remove from buffer
│
T=5000ms [Scheduled Flush Every 5s]
├─ Writable entries collected
├─ DB upsert batch
├─ On success:
│ └─ Mark lastWrittenAt[key] = current time
├─ On error:
│ └─ Re-add to buffer for retry
│
T=5001ms [Schedule Next Flush]
├─ IF buffer is empty
│ └─ Schedule next at T+5000ms
├─ IF cooldown exists
│ └─ Schedule at earliest cooldown expiry
2.2 多消费者分区映射
Topic: blwlog4Nodejs-oldrcu-heartbeat-topic (6 partitions)
Partition Assignment (after auto-scaling):
┌────────────────────────────────────────┐
│ Partition 0 → Consumer Instance 0 │
│ Partition 1 → Consumer Instance 1 │
│ Partition 2 → Consumer Instance 2 │
│ Partition 3 → Consumer Instance 3 │
│ Partition 4 → Consumer Instance 4 │
│ Partition 5 → Consumer Instance 5 │
└────────────────────────────────────────┘
All instances share:
- Same HeartbeatBuffer instance (in-memory, 5s window)
- Same HeartbeatDbManager (batched writes to G5)
- Same Redis connection (metrics reporting)
Benefit: Load distributed across partitions, bottleneck = DB write rate
3. 消费者自动伸缩机制
3.1 启动时分区检测流程
// src/kafka/consumer.js
async function resolveTopicPartitionCount(kafkaConfig) {
// Step 1: 建立临时 Kafka 客户端
const client = new kafka.KafkaClient({
kafkaHost: kafkaConfig.brokers
});
// Step 2: 异步查询主题元数据
await new Promise((resolve, reject) => {
client.loadMetadataForTopics([kafkaConfig.topic], (err, metadata) => {
if (err) reject(err);
else {
const partitions = metadata[0].partitions;
const count = partitions.length; // e.g., 6
resolve(count);
}
});
});
// Step 3: 关闭客户端,返回分区数
client.close();
return count;
}
// 启动流程
const configuredInstances = 3;
const actualPartitionCount = await resolveTopicPartitionCount(kafkaConfig);
const instanceCount = Math.max(configuredInstances, actualPartitionCount);
// 结果: max(3, 6) = 6 instances created
3.2 消费者动态创建
async function createKafkaConsumers(kafkaConfig) {
const partitionCount = await resolveTopicPartitionCount(kafkaConfig);
const count = Math.max(3, partitionCount);
const consumers = [];
for (let i = 0; i < count; i++) {
const consumer = createOneConsumer(i, kafkaConfig);
consumers.push(consumer);
logger.info(`Started Kafka consumer ${i}/${count-1}`);
}
return consumers;
}
4. 缓冲区与去重策略详解
4.1 双层去重架构
Layer 1: 5秒时间窗口去重
class HeartbeatBuffer {
// 内存 Map,按键存储最新记录
buffer = new Map();
add(record) {
const key = `${record.hotel_id}:${record.room_id}`;
if (this.buffer.has(key)) {
// 合并逻辑:只保留 ts_ms 更新的版本
const existing = this.buffer.get(key);
if (record.ts_ms > existing.ts_ms) {
this.buffer.set(key, record);
}
// 否则丢弃更旧的
} else {
this.buffer.set(key, record);
}
// 缓冲满 → 立即刷新
if (this.buffer.size >= this.maxBufferSize) {
this.flush();
}
}
}
// 示例:
// T=0ms add({hotel_id:"2045", room_id:"6010", ts_ms: 1000})
// buffer = {"2045:6010" → {ts_ms: 1000}}
// T=100ms add({hotel_id:"2045", room_id:"6010", ts_ms: 1100})
// ts_ms newer → 覆盖
// buffer = {"2045:6010" → {ts_ms: 1100}}
// T=200ms add({hotel_id:"2045", room_id:"6010", ts_ms: 1050})
// ts_ms 旧 → 丢弃,不更新
// buffer = {"2045:6010" → {ts_ms: 1100}} (保持)
// T=5000ms [Scheduled flush]
// Write {hotel_id:"2045", room_id:"6010", ts_ms: 1100} to DB
Layer 2: 30秒写入冷却期
class HeartbeatBuffer {
// 追踪每个键的最后写入时间
lastWrittenAt = new Map();
// 冷却期配置(毫秒)
cooldownMs = 30000;
flush() {
const nowTs = this.now();
const writableEntries = [];
let minCooldownDelayMs = null;
for (const [key, row] of this.buffer.entries()) {
const cooldownDelayMs = this._getCooldownDelayMs(key, nowTs);
if (cooldownDelayMs > 0) {
// 仍在冷却期内 → 跳过写入,保留在缓冲中
minCooldownDelayMs = minCooldownDelayMs == null
? cooldownDelayMs
: Math.min(minCooldownDelayMs, cooldownDelayMs);
continue;
}
// 已过冷却期 → 标记为可写
writableEntries.push([key, row]);
this.buffer.delete(key);
}
// 执行数据库写入
const rows = writableEntries.map(([, row]) => row);
if (rows.length > 0) {
await this.dbManager.upsertBatch(rows);
// 记录写入时间,启动新的冷却期
const writtenAt = this.now();
for (const [key] of writableEntries) {
this.lastWrittenAt.set(key, writtenAt);
}
}
// 调度下次刷新
const nextFlushDelayMs = minCooldownDelayMs ?? 5000;
this.flushTimer = setTimeout(() => this.flush(), nextFlushDelayMs);
}
_getCooldownDelayMs(key, nowTs) {
const lastWritten = this.lastWrittenAt.get(key);
if (lastWritten == null) return 0; // 从未写入 → 立即可写
const cooldownExpiry = lastWritten + this.cooldownMs;
const delay = cooldownExpiry - nowTs;
return Math.max(0, delay);
}
}
// 时间线示例(30秒冷却期):
// T=0s flush() writes key "2045:6010" to DB
// lastWrittenAt["2045:6010"] = 0
// T=1s add({hotel_id:"2045", room_id:"6010", ts_ms: 2000})
// buffer["2045:6010"] = {ts_ms: 2000}
// T=2s flush() check:
// cooldownLeft = 30000 - 2000 = 28000ms > 0
// → Skip write, keep in buffer
// T=29s add({hotel_id:"2045", room_id:"6010", ts_ms: 2500})
// buffer update: {ts_ms: 2500}
// T=30s flush() check:
// cooldownLeft = 30000 - 30000 = 0 ≤ 0
// → Write {ts_ms: 2500} to DB
// lastWrittenAt["2045:6010"] = 30000 (new cooldown starts)
5. 批量 Upsert 与时间序列保护
5.1 SQL 设计
-- 参数化查询
INSERT INTO room_status_moment_g5
(hotel_id, room_id, device_id, ts_ms, status)
VALUES
-- ($1::smallint, $2, $3, $4, 1), -- Record 1
-- ($5::smallint, $6, $7, $8, 1), -- Record 2
-- ...
ON CONFLICT (hotel_id, room_id) DO UPDATE SET
ts_ms = EXCLUDED.ts_ms,
status = 1
WHERE EXCLUDED.ts_ms >= current.ts_ms;
5.2 类型转换策略
// Kafka 传来的字符串 hotel_id
const kafkaData = {
hotel_id: "2045", // STRING in Kafka
room_id: "6010",
device_id: "DEV001",
ts_ms: 1234567890
};
// 构建参数化查询
const params = [
parseInt(kafkaData.hotel_id), // 转为数字
kafkaData.room_id,
kafkaData.device_id,
kafkaData.ts_ms
];
// SQL 中的 ::smallint 强制类型转换
// $1::smallint 确保即使参数是数字也能与 G5 smallint 列兼容
6. 批量提交策略
6.1 Kafka 偏移量提交
// 旧策略(低效):逐条消息提交
message => {
const parsed = parseHeartbeat(message);
if (parsed) buffer.add(parsed);
consumer.commitOffset({...}); // 每条消息都提交!
}
// 新策略(高效):200ms 周期性批量提交
const commitInterval = 200; // 毫秒
setInterval(() => {
// 此时刻之前消费的所有消息一次性提交
consumer.commit(false, (err) => {
if (!err) logger.debug('Batch offset committed');
});
}, commitInterval);
6.2 提交间隔与吞吐量的权衡
| 策略 | 提交间隔 | 优点 | 缺点 |
|---|---|---|---|
| Per-message | 1ms | 最高可靠性 | 消费速度慢 50% |
| 200ms batch | 200ms | 平衡可靠性与吞吐 | 故障时丢失 <200ms 消息 |
| 5s batch | 5000ms | 最高吞吐 | 故障风险大 |
当前选择: 200ms (平衡)
7. 错误恢复机制
7.1 Parser 错误
const parsed = parseHeartbeat(rawMessage);
if (parsed === null) {
// 验证失败:
// - 计数记录 (invalidCount++)
// - 偏移量正常提交(不再消费此消息)
// - 无重试(垃圾消息被丢弃)
stats.invalidCount++;
consumer.commitOffset(...);
continue;
}
7.2 数据库写入失败
try {
await this.dbManager.upsertBatch(rows);
// 标记冷却期
const writtenAt = this.now();
for (const [key] of writableEntries) {
this.lastWrittenAt.set(key, writtenAt);
}
} catch (err) {
// 写入失败 → 记录保留在缓冲中
// 30秒后重试(或等待缓冲满时重试)
logger.error(`DB upsert failed: ${err.message}`);
// writableEntries 已从 buffer 中删除,需要重新添加
for (const [key, row] of writableEntries) {
this.buffer.set(key, row);
}
}
8. 性能特征与优化
8.1 吞吐量分析
配置: 6 消费者 × 6 分区 = 1:1 映射 (最优)
Kafka fetch batch size: 100,000 messages
Kafka commit interval: 200ms
理论吞吐:
- 每个消费者每秒消费: ~5000 msg/s
- 6 消费者总计: ~30,000 msg/s
缓冲与去重:
- 5s 窗口: 去除重复速度 ~99.5% (典型数据)
- 30s 冷却: 进一步降低 DB 写入压力 ~60-80%
DB 写入:
- Batch size: 最多 5000 条 或 5s 周期
- 实际写入速率: ~3,000-5,000 rows/s (受冷却期抑制)
8.2 内存占用
缓冲区 (5s 窗口):
- 每条记录 ~200 bytes
- 最多 5000 条
- 总计 ~1 MB
lastWrittenAt 追踪:
- 键数 = 酒店数 × 房间数
- ~100 酒店 × 1000 房间 = 100K 键
- 每个键 Map 条目 ~50 bytes
- 总计 ~5 MB
Stats 对象: ~1 KB
整体估计: ~10 MB 内存占用
9. 关键设计决策
| 决策 | 选择 | 理由 |
|---|---|---|
| 消费者数 | 动态 = 分区数 | 避免静态配置失效 |
| 验证框架 | 手写 vs Zod | 手写快 10x,热路径优化 |
| 去重策略 | 双层 (5s+30s) | 单层不足,内存与性能折衷 |
| hotel_id 类型 | 字符串(数字形式) | 与 Kafka 实际数据一致 |
| SQL 冲突解决 | WHERE ts_ms 保护 | 防止乱序消息回滚数据 |
| 批量提交周期 | 200ms | 平衡吞吐与可靠性 |
上次修订: 2026-03-11
维护者: BLS OldRCU Heartbeat Team