
# Architecture Specification
## 1. System Architecture Diagram
```
┌─────────────────────────────────────────────────────────────────┐
│ Kafka Cluster │
│ Topic: blwlog4Nodejs-oldrcu-heartbeat-topic (6 partitions) │
└──────────────────────┬──────────────────────────────────────────┘
┌──────────────┼──────────────┐
│ │ │
Part0 Part1 ... Part5
│ │ │
┌───▼────┐ ┌────▼──┐ ┌──▼────┐
│ Cons- │ │ Cons- │ ... │ Cons- │ (6 Consumer Instances)
│ umer-0 │ │ umer-1 │ │ umer-5│
└───┬────┘ └────┬──┘ └──┬────┘
│ │ │
└──────────────┼─────────────┘
┌──────────────▼───────────────┐
│ HeartbeatParser │
│ (Validation & Type Check) │
│ - ts_ms: number │
│ - hotel_id: digit-string │
│ - room_id: non-blank string │
│ - device_id: non-blank str │
└──────────────┬───────────────┘
┌──────────────▼────────────────────────┐
│ HeartbeatBuffer (Layer 1 Dedup) │
│ ◆ 5-second window │
│ ◆ In-memory dedup by key │
│ ◆ Keep latest ts_ms per key │
│ ◆ Stats tracking (pulled/merged) │
└──────────────┬────────────────────────┘
┌──────────────▼────────────────────────┐
│ Cooldown Filter (Layer 2 Dedup) │
│ ◆ 30-second cooldown-per-key │
│ ◆ lastWrittenAt Map tracking │
│ ◆ Hold updates during cooldown │
│ ◆ Flush only eligible keys │
└──────────────┬────────────────────────┘
┌──────────────▼────────────────────────┐
│ HeartbeatDbManager (Batch Upsert) │
│ ◆ Parameterized SQL with type cast │
│ ◆ ::smallint for hotel_id │
│ ◆ ON CONFLICT with ts_ms ordering │
│ ◆ Batched writes (5s or maxSize) │
└──────────────┬────────────────────────┘
┌──────────────▼────────────────────────┐
│ PostgreSQL G5 │
│ (room_status_moment_g5 table) │
│ Primary Key: (hotel_id, room_id) │
│ Columns: ts_ms, device_id, status │
└──────────────┬────────────────────────┘
┌──────────────▼────────────────────────┐
│ Metrics Reporter (Redis + Cron) │
│ ◆ Consumption rate │
│ ◆ Dedup hit rate │
│ ◆ Write latency │
└───────────────────────────────────────┘
```
## 2. Module Interaction Timelines
### 2.1 Single-Message Processing Timeline
```
Timeline: Message arrives at Kafka
T=0ms [Kafka] Partition-0 retrieves message
├─ raw: {"ts_ms":1234567890, "hotel_id":"2045", "room_id":"6010",
│ "device_id":"DEV001", "current_time": "2026-03-11T10:30:00Z"}
T=0.1ms [Consumer] Receives from Kafka
T=0.2ms [Parser] Validates
├─ ts_ms check: Number.isFinite(1234567890) ✓
├─ hotel_id check: isDigitsOnly("2045") ✓
├─ room_id check: isNonBlankString("6010") ✓
├─ device_id check: isNonBlankString("DEV001") ✓
├─ Returns: {ts_ms, hotel_id, room_id, device_id} ✓
T=1ms [HeartbeatBuffer] Add to buffer
├─ key = "2045:6010"
├─ Check buffer.has(key)?
│ ├─ NO → Add new entry
│ └─ YES → Merge if ts_ms newer
├─ Check if buffer.size >= maxSize (5000)?
│ └─ YES → Trigger flush immediately
T=2ms [Cooldown Check] In flush()
├─ nowTs = Date.now()
├─ For each key in buffer:
│ ├─ cooldownLeft = lastWrittenAt[key] + 30000 - nowTs
│ ├─ IF cooldownLeft > 0
│ │ └─ Skip (keep in buffer for later)
│ ├─ ELSE (eligible)
│ │ ├─ Move to writableEntries
│ │ └─ Remove from buffer
T=5000ms [Scheduled Flush Every 5s]
├─ Writable entries collected
├─ DB upsert batch
├─ On success:
│ └─ Mark lastWrittenAt[key] = current time
├─ On error:
│ └─ Re-add to buffer for retry
T=5001ms [Schedule Next Flush]
├─ IF buffer is empty
│ └─ Schedule next at T+5000ms
├─ IF cooldown exists
│ └─ Schedule at earliest cooldown expiry
```
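The checks at T=0.2ms can be sketched as a standalone validator. This is a hypothetical reconstruction: the real `parseHeartbeat` (referenced again in §7.1) is not shown in this spec, and the helper names `isDigitsOnly` / `isNonBlankString` are taken from the timeline above.

```javascript
// Hypothetical sketch of the validation step; field names from this spec.
function isDigitsOnly(s) {
  return typeof s === 'string' && /^[0-9]+$/.test(s);
}
function isNonBlankString(s) {
  return typeof s === 'string' && s.trim().length > 0;
}
function parseHeartbeat(raw) {
  let msg;
  try {
    msg = JSON.parse(raw);
  } catch {
    return null; // malformed JSON → invalid
  }
  if (!Number.isFinite(msg.ts_ms)) return null;
  if (!isDigitsOnly(msg.hotel_id)) return null;
  if (!isNonBlankString(msg.room_id)) return null;
  if (!isNonBlankString(msg.device_id)) return null;
  // Extra fields (e.g. current_time) are dropped; only the four used fields survive
  const { ts_ms, hotel_id, room_id, device_id } = msg;
  return { ts_ms, hotel_id, room_id, device_id };
}
```

Note that validation failure returns `null` rather than throwing, which matches the error-recovery path in §7.1.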
### 2.2 Multi-Consumer Partition Mapping
```
Topic: blwlog4Nodejs-oldrcu-heartbeat-topic (6 partitions)
Partition Assignment (after auto-scaling):
┌────────────────────────────────────────┐
│ Partition 0 → Consumer Instance 0 │
│ Partition 1 → Consumer Instance 1 │
│ Partition 2 → Consumer Instance 2 │
│ Partition 3 → Consumer Instance 3 │
│ Partition 4 → Consumer Instance 4 │
│ Partition 5 → Consumer Instance 5 │
└────────────────────────────────────────┘
All instances share:
- Same HeartbeatBuffer instance (in-memory, 5s window)
- Same HeartbeatDbManager (batched writes to G5)
- Same Redis connection (metrics reporting)
Benefit: Load distributed across partitions, bottleneck = DB write rate
```
## 3. Consumer Auto-Scaling
### 3.1 Partition Detection at Startup
```javascript
// src/kafka/consumer.js
async function resolveTopicPartitionCount(kafkaConfig) {
  // Step 1: create a temporary Kafka client
  const client = new kafka.KafkaClient({
    kafkaHost: kafkaConfig.brokers
  });
  // Step 2: query the topic metadata asynchronously
  // (the count must be captured from the Promise result,
  //  not declared inside the callback scope)
  const count = await new Promise((resolve, reject) => {
    client.loadMetadataForTopics([kafkaConfig.topic], (err, metadata) => {
      if (err) reject(err);
      else resolve(metadata[0].partitions.length); // e.g., 6
    });
  });
  // Step 3: close the client and return the partition count
  client.close();
  return count;
}
// Startup flow
const configuredInstances = 3;
const actualPartitionCount = await resolveTopicPartitionCount(kafkaConfig);
const instanceCount = Math.max(configuredInstances, actualPartitionCount);
// Result: max(3, 6) = 6 instances created
```
### 3.2 Dynamic Consumer Creation
```javascript
async function createKafkaConsumers(kafkaConfig) {
const partitionCount = await resolveTopicPartitionCount(kafkaConfig);
const count = Math.max(3, partitionCount);
const consumers = [];
for (let i = 0; i < count; i++) {
const consumer = createOneConsumer(i, kafkaConfig);
consumers.push(consumer);
logger.info(`Started Kafka consumer ${i}/${count-1}`);
}
return consumers;
}
```
## 4. Buffer and Deduplication Strategy
### 4.1 Two-Layer Deduplication
#### Layer 1: 5-Second Window Deduplication
```javascript
class HeartbeatBuffer {
  // In-memory Map holding the latest record per key
  buffer = new Map();
  // Flush immediately once this many entries accumulate
  maxBufferSize = 5000;
  add(record) {
    const key = `${record.hotel_id}:${record.room_id}`;
    if (this.buffer.has(key)) {
      // Merge rule: keep only the version with the newer ts_ms
      const existing = this.buffer.get(key);
      if (record.ts_ms > existing.ts_ms) {
        this.buffer.set(key, record);
      }
      // otherwise the older record is discarded
    } else {
      this.buffer.set(key, record);
    }
    // Buffer full → flush immediately (flush() is defined in Layer 2 below)
    if (this.buffer.size >= this.maxBufferSize) {
      this.flush();
    }
  }
}
// Example:
// T=0ms    add({hotel_id:"2045", room_id:"6010", ts_ms: 1000})
//          buffer = {"2045:6010" → {ts_ms: 1000}}
// T=100ms  add({hotel_id:"2045", room_id:"6010", ts_ms: 1100})
//          ts_ms newer → overwrite
//          buffer = {"2045:6010" → {ts_ms: 1100}}
// T=200ms  add({hotel_id:"2045", room_id:"6010", ts_ms: 1050})
//          ts_ms older → discard, no update
//          buffer = {"2045:6010" → {ts_ms: 1100}} (unchanged)
// T=5000ms [Scheduled flush]
//          Write {hotel_id:"2045", room_id:"6010", ts_ms: 1100} to DB
```
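The merge rule can be exercised on its own. The minimal sketch below strips the class down to a bare Map plus the keep-latest-ts_ms comparison, with no flush timer or size cap:

```javascript
// Standalone demo of the Layer-1 merge rule (keep the newest ts_ms per key).
const buffer = new Map();
function add(record) {
  const key = `${record.hotel_id}:${record.room_id}`;
  const existing = buffer.get(key);
  if (!existing || record.ts_ms > existing.ts_ms) buffer.set(key, record);
}
add({ hotel_id: "2045", room_id: "6010", ts_ms: 1000 });
add({ hotel_id: "2045", room_id: "6010", ts_ms: 1100 }); // newer → replaces
add({ hotel_id: "2045", room_id: "6010", ts_ms: 1050 }); // older → dropped
console.log(buffer.get("2045:6010").ts_ms); // 1100
```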
#### Layer 2: 30-Second Write Cooldown
```javascript
class HeartbeatBuffer {
  // Last write time per key
  lastWrittenAt = new Map();
  // Cooldown period (milliseconds)
  cooldownMs = 30000;
  async flush() { // must be async: upsertBatch is awaited below
    const nowTs = this.now();
    const writableEntries = [];
    let minCooldownDelayMs = null;
    for (const [key, row] of this.buffer.entries()) {
      const cooldownDelayMs = this._getCooldownDelayMs(key, nowTs);
      if (cooldownDelayMs > 0) {
        // Still cooling down → skip the write, keep the entry buffered
        minCooldownDelayMs = minCooldownDelayMs == null
          ? cooldownDelayMs
          : Math.min(minCooldownDelayMs, cooldownDelayMs);
        continue;
      }
      // Cooldown expired → mark as writable
      writableEntries.push([key, row]);
      this.buffer.delete(key);
    }
    // Perform the database write
    const rows = writableEntries.map(([, row]) => row);
    if (rows.length > 0) {
      await this.dbManager.upsertBatch(rows);
      // Record the write time, starting a fresh cooldown
      const writtenAt = this.now();
      for (const [key] of writableEntries) {
        this.lastWrittenAt.set(key, writtenAt);
      }
    }
    // Schedule the next flush
    const nextFlushDelayMs = minCooldownDelayMs ?? 5000;
    this.flushTimer = setTimeout(() => this.flush(), nextFlushDelayMs);
  }
  _getCooldownDelayMs(key, nowTs) {
    const lastWritten = this.lastWrittenAt.get(key);
    if (lastWritten == null) return 0; // never written → writable immediately
    const cooldownExpiry = lastWritten + this.cooldownMs;
    return Math.max(0, cooldownExpiry - nowTs);
  }
}
// Timeline example (30-second cooldown):
// T=0s   flush() writes key "2045:6010" to DB
//        lastWrittenAt["2045:6010"] = 0
// T=1s   add({hotel_id:"2045", room_id:"6010", ts_ms: 2000})
//        buffer["2045:6010"] = {ts_ms: 2000}
// T=2s   flush() check:
//        cooldownLeft = 0 + 30000 - 2000 = 28000ms > 0
//        → Skip write, keep in buffer
// T=29s  add({hotel_id:"2045", room_id:"6010", ts_ms: 2500})
//        buffer update: {ts_ms: 2500}
// T=30s  flush() check:
//        cooldownLeft = 0 + 30000 - 30000 = 0 ≤ 0
//        → Write {ts_ms: 2500} to DB
//        lastWrittenAt["2045:6010"] = 30000 (new cooldown starts)
```
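The `_getCooldownDelayMs` arithmetic in the timeline can be checked in isolation. This standalone sketch uses plain variables instead of the class:

```javascript
// Cooldown delay computation, mirroring _getCooldownDelayMs (times in ms).
const cooldownMs = 30000;
const lastWrittenAt = new Map();
function getCooldownDelayMs(key, nowTs) {
  const lastWritten = lastWrittenAt.get(key);
  if (lastWritten == null) return 0; // never written → writable immediately
  return Math.max(0, lastWritten + cooldownMs - nowTs);
}
lastWrittenAt.set("2045:6010", 0); // written at T=0s
console.log(getCooldownDelayMs("2045:6010", 2000));  // 28000 → still cooling down
console.log(getCooldownDelayMs("2045:6010", 30000)); // 0 → eligible to write
console.log(getCooldownDelayMs("unseen:key", 2000)); // 0 → never written
```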
## 5. Batch Upsert and Time-Series Protection
### 5.1 SQL Design
```sql
-- Parameterized query (one 4-parameter tuple per record)
INSERT INTO room_status_moment_g5
  (hotel_id, room_id, device_id, ts_ms, status)
VALUES
  ($1::smallint, $2, $3, $4, 1),  -- Record 1
  ($5::smallint, $6, $7, $8, 1)   -- Record 2
  -- ...
ON CONFLICT (hotel_id, room_id) DO UPDATE SET
  ts_ms = EXCLUDED.ts_ms,
  status = 1
WHERE EXCLUDED.ts_ms >= room_status_moment_g5.ts_ms;
-- Note: in PostgreSQL the existing row is referenced by the target table
-- name (or an alias), not by a "current" keyword.
```
### 5.2 Type Conversion Strategy
```javascript
// hotel_id arrives from Kafka as a string
const kafkaData = {
  hotel_id: "2045", // STRING in Kafka
  room_id: "6010",
  device_id: "DEV001",
  ts_ms: 1234567890
};
// Build the parameterized query
const params = [
  parseInt(kafkaData.hotel_id, 10), // convert to a number
  kafkaData.room_id,
  kafkaData.device_id,
  kafkaData.ts_ms
];
// The ::smallint cast in the SQL:
// $1::smallint ensures the parameter is compatible with the G5 smallint column
```
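Putting §5.1 and §5.2 together, a batch of N records expands into N four-parameter tuples. The helper below is a hypothetical sketch (the real `HeartbeatDbManager` code is not shown in this spec); the table and column names are taken from §5.1:

```javascript
// Hypothetical builder for the batch-upsert SQL and its flat parameter array.
function buildUpsertQuery(rows) {
  const valuesSql = rows
    .map((_, i) => {
      const b = i * 4; // 4 parameters per row
      return `($${b + 1}::smallint, $${b + 2}, $${b + 3}, $${b + 4}, 1)`;
    })
    .join(',\n  ');
  const params = rows.flatMap(r => [
    parseInt(r.hotel_id, 10), r.room_id, r.device_id, r.ts_ms,
  ]);
  const sql =
    `INSERT INTO room_status_moment_g5\n` +
    `  (hotel_id, room_id, device_id, ts_ms, status)\n` +
    `VALUES\n  ${valuesSql}\n` +
    `ON CONFLICT (hotel_id, room_id) DO UPDATE SET\n` +
    `  ts_ms = EXCLUDED.ts_ms, status = 1\n` +
    `WHERE EXCLUDED.ts_ms >= room_status_moment_g5.ts_ms;`;
  return { sql, params };
}
```

The `{ sql, params }` pair is the shape a driver such as node-postgres expects for a parameterized query.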
## 6. Batch Commit Strategy
### 6.1 Kafka Offset Commits
```javascript
// Old strategy (inefficient): commit per message
message => {
  const parsed = parseHeartbeat(message);
  if (parsed) buffer.add(parsed);
  consumer.commitOffset({...}); // a commit for every single message!
}
// New strategy (efficient): periodic batch commit every 200ms
const commitInterval = 200; // milliseconds
setInterval(() => {
  // Commit everything consumed up to this point in one call
  consumer.commit(false, (err) => {
    if (!err) logger.debug('Batch offset committed');
  });
}, commitInterval);
```
### 6.2 Commit Interval vs. Throughput Trade-off
| Strategy | Commit interval | Pros | Cons |
|------|---------|------|------|
| Per-message | 1ms | Highest reliability | ~50% slower consumption |
| 200ms batch | 200ms | Balances reliability and throughput | May lose <200ms of messages on failure |
| 5s batch | 5000ms | Highest throughput | High loss risk on failure |
**Current choice**: 200ms (balanced)
## 7. Error Recovery
### 7.1 Parser Errors
```javascript
const parsed = parseHeartbeat(rawMessage);
if (parsed === null) {
  // Validation failed:
  // - count it (invalidCount++)
  // - commit the offset normally (the message is not re-consumed)
  // - no retry (garbage messages are dropped)
  stats.invalidCount++;
  consumer.commitOffset(...);
  continue;
}
```
### 7.2 Database Write Failures
```javascript
try {
  await this.dbManager.upsertBatch(rows);
  // Start the cooldown for each written key
  const writtenAt = this.now();
  for (const [key] of writableEntries) {
    this.lastWrittenAt.set(key, writtenAt);
  }
} catch (err) {
  // Write failed → keep the records buffered;
  // they are retried after 30s or when the buffer fills up
  logger.error(`DB upsert failed: ${err.message}`);
  // writableEntries were already deleted from the buffer, so re-add them,
  // without clobbering any newer record that arrived during the failed write
  for (const [key, row] of writableEntries) {
    const existing = this.buffer.get(key);
    if (!existing || row.ts_ms > existing.ts_ms) {
      this.buffer.set(key, row);
    }
  }
}
```
## 8. Performance Characteristics and Optimization
### 8.1 Throughput Analysis
```
Configuration: 6 consumers × 6 partitions = 1:1 mapping (optimal)
Kafka fetch batch size: 100,000 messages
Kafka commit interval: 200ms
Theoretical throughput:
- Per consumer: ~5,000 msg/s
- 6 consumers total: ~30,000 msg/s
Buffering and dedup:
- 5s window: removes ~99.5% of duplicates (typical data)
- 30s cooldown: further reduces DB write pressure by ~60-80%
DB writes:
- Batch size: up to 5,000 rows, or every 5s
- Effective write rate: ~3,000-5,000 rows/s (throttled by the cooldown)
```
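As a sanity check on the effective write rate, the 30s cooldown bounds steady-state writes by the number of distinct keys. Assuming the fleet size from §8.2 (100 hotels × 1,000 rooms):

```javascript
// Back-of-envelope bound on steady-state DB writes under the 30s cooldown.
// The fleet size (100 hotels × 1000 rooms) is the assumption stated in §8.2.
const keys = 100 * 1000;  // distinct (hotel_id, room_id) keys
const cooldownS = 30;     // each key is written at most once per 30s
const maxRowsPerSec = keys / cooldownS;
console.log(Math.round(maxRowsPerSec)); // ~3333, consistent with ~3,000-5,000 rows/s
```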
### 8.2 Memory Footprint
```
Buffer (5s window):
- ~200 bytes per record
- up to 5,000 records
- total: ~1 MB
lastWrittenAt tracking:
- key count = hotels × rooms
- ~100 hotels × 1,000 rooms = 100K keys
- ~50 bytes per Map entry
- total: ~5 MB
Stats object: ~1 KB
Overall estimate: ~10 MB of memory
```
## 9. Key Design Decisions
| Decision | Choice | Rationale |
|------|------|------|
| Consumer count | Dynamic = partition count | Avoids stale static configuration |
| Validation framework | Hand-written (vs. Zod) | Hand-written is ~10x faster; hot-path optimization |
| Dedup strategy | Two-layer (5s + 30s) | A single layer is insufficient; balances memory and performance |
| hotel_id type | String (digits only) | Matches the actual Kafka payload |
| SQL conflict resolution | WHERE ts_ms guard | Prevents out-of-order messages from rolling back data |
| Batch commit interval | 200ms | Balances throughput and reliability |
---
**Last revised**: 2026-03-11
**Maintainer**: BLS OldRCU Heartbeat Team