# Architecture Specification

## 1. System Architecture Diagram

```
┌─────────────────────────────────────────────────────────────────┐
│                          Kafka Cluster                          │
│    Topic: blwlog4Nodejs-oldrcu-heartbeat-topic (6 partitions)   │
└──────────────────────┬──────────────────────────────────────────┘
                       │
        ┌──────────────┼──────────────┐
        │              │              │
      Part0          Part1   ...    Part5
        │              │              │
    ┌───▼────┐    ┌────▼───┐     ┌───▼───┐
    │ Cons-  │    │ Cons-  │ ... │ Cons- │   (6 Consumer Instances)
    │ umer-0 │    │ umer-1 │     │ umer-5│
    └───┬────┘    └────┬───┘     └───┬───┘
        │              │             │
        └──────────────┼─────────────┘
                       │
        ┌──────────────▼───────────────┐
        │       HeartbeatParser        │
        │  (Validation & Type Check)   │
        │  - ts_ms: number             │
        │  - hotel_id: digit-string    │
        │  - room_id: non-blank string │
        │  - device_id: non-blank str  │
        └──────────────┬───────────────┘
                       │
        ┌──────────────▼────────────────────────┐
        │    HeartbeatBuffer (Layer 1 Dedup)    │
        │    ◆ 5-second window                  │
        │    ◆ In-memory dedup by key           │
        │    ◆ Keep latest ts_ms per key        │
        │    ◆ Stats tracking (pulled/merged)   │
        └──────────────┬────────────────────────┘
                       │
        ┌──────────────▼────────────────────────┐
        │    Cooldown Filter (Layer 2 Dedup)    │
        │    ◆ 30-second cooldown per key       │
        │    ◆ lastWrittenAt Map tracking       │
        │    ◆ Hold updates during cooldown     │
        │    ◆ Flush only eligible keys         │
        └──────────────┬────────────────────────┘
                       │
        ┌──────────────▼────────────────────────┐
        │   HeartbeatDbManager (Batch Upsert)   │
        │   ◆ Parameterized SQL with type cast  │
        │   ◆ ::smallint for hotel_id           │
        │   ◆ ON CONFLICT with ts_ms ordering   │
        │   ◆ Batched writes (5s or maxSize)    │
        └──────────────┬────────────────────────┘
                       │
        ┌──────────────▼────────────────────────┐
        │             PostgreSQL G5             │
        │     (room_status_moment_g5 table)     │
        │   Primary Key: (hotel_id, room_id)    │
        │   Columns: ts_ms, device_id, status   │
        └──────────────┬────────────────────────┘
                       │
        ┌──────────────▼────────────────────────┐
        │   Metrics Reporter (Redis + Cron)     │
        │   ◆ Consumption rate                  │
        │   ◆ Dedup hit rate                    │
        │   ◆ Write latency                     │
        └───────────────────────────────────────┘
```

## 2. Module Interaction Sequences

### 2.1 Single-Message Processing Timeline

```
Timeline: Message arrives at Kafka

T=0ms    [Kafka] Partition-0 retrieves message
         ├─ raw: {"ts_ms":1234567890, "hotel_id":"2045", "room_id":"6010",
         │        "device_id":"DEV001", "current_time":"2026-03-11T10:30:00Z"}
         │
T=0.1ms  [Consumer] Receives from Kafka
         │
T=0.2ms  [Parser] Validates
         ├─ ts_ms check:     Number.isFinite(1234567890) ✓
         ├─ hotel_id check:  isDigitsOnly("2045") ✓
         ├─ room_id check:   isNonBlankString("6010") ✓
         ├─ device_id check: isNonBlankString("DEV001") ✓
         ├─ Returns: {ts_ms, hotel_id, room_id, device_id} ✓
         │
T=1ms    [HeartbeatBuffer] Add to buffer
         ├─ key = "2045:6010"
         ├─ Check buffer.has(key)?
         │   ├─ NO  → Add new entry
         │   └─ YES → Merge if ts_ms newer
         ├─ Check if buffer.size >= maxSize (5000)?
         │   └─ YES → Trigger flush immediately
         │
T=2ms    [Cooldown Check] In flush()
         ├─ nowTs = Date.now()
         ├─ For each key in buffer:
         │   ├─ cooldownLeft = lastWrittenAt[key] + 30000 - nowTs
         │   ├─ IF cooldownLeft > 0
         │   │   └─ Skip (keep in buffer for later)
         │   ├─ ELSE (eligible)
         │   │   ├─ Move to writableEntries
         │   │   └─ Remove from buffer
         │
T=5000ms [Scheduled Flush Every 5s]
         ├─ Writable entries collected
         ├─ DB upsert batch
         ├─ On success:
         │   └─ Mark lastWrittenAt[key] = current time
         ├─ On error:
         │   └─ Re-add to buffer for retry
         │
T=5001ms [Schedule Next Flush]
         ├─ IF buffer is empty
         │   └─ Schedule next at T+5000ms
         ├─ IF cooldown exists
         │   └─ Schedule at earliest cooldown expiry
```
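The four checks in the timeline map directly onto small guard functions. Below is a minimal sketch of what the `parseHeartbeat` hot path could look like; `isDigitsOnly` and `isNonBlankString` are named in the timeline above, but their bodies and the exact function signature here are illustrative assumptions, not the actual implementation.

```javascript
// Sketch of the HeartbeatParser hot path (assumed shape).
function isDigitsOnly(value) {
  // Digit-string check used for hotel_id, e.g. "2045"
  return typeof value === 'string' && /^[0-9]+$/.test(value);
}

function isNonBlankString(value) {
  // Non-blank check used for room_id / device_id
  return typeof value === 'string' && value.trim().length > 0;
}

function parseHeartbeat(rawMessage) {
  let msg;
  try {
    msg = JSON.parse(rawMessage); // raw Kafka payload is a JSON string
  } catch (e) {
    return null; // unparseable → treated as an invalid message
  }
  if (!Number.isFinite(msg.ts_ms)) return null;
  if (!isDigitsOnly(msg.hotel_id)) return null;
  if (!isNonBlankString(msg.room_id)) return null;
  if (!isNonBlankString(msg.device_id)) return null;
  // Only the four validated fields are kept; extras such as
  // current_time are dropped here.
  const { ts_ms, hotel_id, room_id, device_id } = msg;
  return { ts_ms, hotel_id, room_id, device_id };
}
```

Returning `null` instead of throwing keeps the hot path branch-predictable and matches the parser error handling in §7.1.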
### 2.2 Multi-Consumer Partition Mapping

```
Topic: blwlog4Nodejs-oldrcu-heartbeat-topic (6 partitions)

Partition Assignment (after auto-scaling):
┌────────────────────────────────────────┐
│  Partition 0  →  Consumer Instance 0   │
│  Partition 1  →  Consumer Instance 1   │
│  Partition 2  →  Consumer Instance 2   │
│  Partition 3  →  Consumer Instance 3   │
│  Partition 4  →  Consumer Instance 4   │
│  Partition 5  →  Consumer Instance 5   │
└────────────────────────────────────────┘

All instances share:
- the same HeartbeatBuffer instance (in-memory, 5s window)
- the same HeartbeatDbManager (batched writes to G5)
- the same Redis connection (metrics reporting)

Benefit: load is distributed across partitions; the bottleneck becomes the DB write rate
```

## 3. Consumer Auto-Scaling

### 3.1 Partition Detection at Startup

```javascript
// src/kafka/consumer.js
async function resolveTopicPartitionCount(kafkaConfig) {
  // Step 1: create a temporary Kafka client
  const client = new kafka.KafkaClient({ kafkaHost: kafkaConfig.brokers });

  // Step 2: query the topic metadata asynchronously
  const count = await new Promise((resolve, reject) => {
    client.loadMetadataForTopics([kafkaConfig.topic], (err, results) => {
      if (err) return reject(err);
      // kafka-node metadata arrives as
      // [brokerInfo, { metadata: { [topic]: { [partitionId]: {...} } } }]
      const topicMeta = results[1].metadata[kafkaConfig.topic];
      resolve(Object.keys(topicMeta).length); // e.g., 6
    });
  });

  // Step 3: close the client and return the partition count
  client.close();
  return count;
}

// Startup flow
const configuredInstances = 3;
const actualPartitionCount = await resolveTopicPartitionCount(kafkaConfig);
const instanceCount = Math.max(configuredInstances, actualPartitionCount);
// Result: max(3, 6) = 6 instances created
```

### 3.2 Dynamic Consumer Creation

```javascript
async function createKafkaConsumers(kafkaConfig) {
  const partitionCount = await resolveTopicPartitionCount(kafkaConfig);
  const count = Math.max(3, partitionCount);

  const consumers = [];
  for (let i = 0; i < count; i++) {
    const consumer = createOneConsumer(i, kafkaConfig);
    consumers.push(consumer);
    logger.info(`Started Kafka consumer ${i}/${count - 1}`);
  }
  return consumers;
}
```
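Tying §2.2 and §3.2 together: every consumer instance created above feeds one shared in-process buffer. The sketch below shows one way `createOneConsumer` could be wired, assuming the kafka-node client already implied by the metadata lookup; the `HeartbeatBuffer`/`HeartbeatDbManager` constructor shapes and the `kafkaConfig.groupId` field are illustrative assumptions, not the actual implementation.

```javascript
const kafka = require('kafka-node');

// One shared buffer/db-manager pair per process (§2.2);
// constructor shapes are assumed for illustration.
const dbManager = new HeartbeatDbManager(dbConfig);
const buffer = new HeartbeatBuffer({ dbManager, maxBufferSize: 5000, cooldownMs: 30000 });

// Hypothetical body for the createOneConsumer(i, kafkaConfig) used in §3.2.
function createOneConsumer(index, kafkaConfig) {
  const consumer = new kafka.ConsumerGroup(
    {
      kafkaHost: kafkaConfig.brokers,
      groupId: kafkaConfig.groupId,     // assumed config field
      id: `heartbeat-consumer-${index}`,
      autoCommit: false,                // offsets go out on the 200ms timer (§6.1)
    },
    [kafkaConfig.topic]
  );

  consumer.on('message', (message) => {
    const parsed = parseHeartbeat(message.value);
    if (parsed) buffer.add(parsed);     // every instance feeds the same buffer
  });
  return consumer;
}
```

Because the buffer is process-wide, both dedup layers operate across all six partitions rather than per partition.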
## 4. Buffering and Dedup Strategy in Detail

### 4.1 Two-Layer Dedup Architecture

#### Layer 1: 5-Second Window Dedup

```javascript
class HeartbeatBuffer {
  // in-memory Map holding the latest record per key
  buffer = new Map();

  add(record) {
    const key = `${record.hotel_id}:${record.room_id}`;
    if (this.buffer.has(key)) {
      // merge rule: keep only the version with the newer ts_ms
      const existing = this.buffer.get(key);
      if (record.ts_ms > existing.ts_ms) {
        this.buffer.set(key, record);
      }
      // otherwise drop the older record
    } else {
      this.buffer.set(key, record);
    }
    // buffer full → flush immediately
    if (this.buffer.size >= this.maxBufferSize) {
      this.flush();
    }
  }
}

// Example:
// T=0ms    add({hotel_id:"2045", room_id:"6010", ts_ms: 1000})
//          buffer = {"2045:6010" → {ts_ms: 1000}}
// T=100ms  add({hotel_id:"2045", room_id:"6010", ts_ms: 1100})
//          ts_ms newer → overwrite
//          buffer = {"2045:6010" → {ts_ms: 1100}}
// T=200ms  add({hotel_id:"2045", room_id:"6010", ts_ms: 1050})
//          ts_ms older → dropped, no update
//          buffer = {"2045:6010" → {ts_ms: 1100}} (unchanged)
// T=5000ms [Scheduled flush]
//          Write {hotel_id:"2045", room_id:"6010", ts_ms: 1100} to DB
```

#### Layer 2: 30-Second Write Cooldown

```javascript
class HeartbeatBuffer {
  // last write time per key
  lastWrittenAt = new Map();
  // cooldown length (milliseconds)
  cooldownMs = 30000;

  async flush() {
    const nowTs = this.now();
    const writableEntries = [];
    let minCooldownDelayMs = null;

    for (const [key, row] of this.buffer.entries()) {
      const cooldownDelayMs = this._getCooldownDelayMs(key, nowTs);
      if (cooldownDelayMs > 0) {
        // still cooling down → skip the write, keep in the buffer
        minCooldownDelayMs = minCooldownDelayMs == null
          ? cooldownDelayMs
          : Math.min(minCooldownDelayMs, cooldownDelayMs);
        continue;
      }
      // cooldown expired → mark as writable
      writableEntries.push([key, row]);
      this.buffer.delete(key);
    }

    // perform the database write
    const rows = writableEntries.map(([, row]) => row);
    if (rows.length > 0) {
      await this.dbManager.upsertBatch(rows);
      // record the write time, starting a new cooldown
      const writtenAt = this.now();
      for (const [key] of writableEntries) {
        this.lastWrittenAt.set(key, writtenAt);
      }
    }

    // schedule the next flush
    const nextFlushDelayMs = minCooldownDelayMs ?? 5000;
    this.flushTimer = setTimeout(() => this.flush(), nextFlushDelayMs);
  }

  _getCooldownDelayMs(key, nowTs) {
    const lastWritten = this.lastWrittenAt.get(key);
    if (lastWritten == null) return 0; // never written → writable now
    const cooldownExpiry = lastWritten + this.cooldownMs;
    return Math.max(0, cooldownExpiry - nowTs);
  }
}

// Timeline example (30-second cooldown):
// T=0s   flush() writes key "2045:6010" to DB
//        lastWrittenAt["2045:6010"] = 0
// T=1s   add({hotel_id:"2045", room_id:"6010", ts_ms: 2000})
//        buffer["2045:6010"] = {ts_ms: 2000}
// T=2s   flush() check:
//        cooldownLeft = 30000 - 2000 = 28000ms > 0
//        → skip write, keep in buffer
// T=29s  add({hotel_id:"2045", room_id:"6010", ts_ms: 2500})
//        buffer update: {ts_ms: 2500}
// T=30s  flush() check:
//        cooldownLeft = 30000 - 30000 = 0 ≤ 0
//        → write {ts_ms: 2500} to DB
//        lastWrittenAt["2045:6010"] = 30000 (new cooldown starts)
```

## 5. Batch Upsert and Time-Series Protection

### 5.1 SQL Design

```sql
-- Parameterized query (one placeholder tuple is generated per record)
INSERT INTO room_status_moment_g5
  (hotel_id, room_id, device_id, ts_ms, status)
VALUES
  ($1::smallint, $2, $3, $4, 1),  -- Record 1
  ($5::smallint, $6, $7, $8, 1)   -- Record 2
  -- ...
ON CONFLICT (hotel_id, room_id) DO UPDATE
  SET ts_ms = EXCLUDED.ts_ms,
      status = 1
  WHERE room_status_moment_g5.ts_ms <= EXCLUDED.ts_ms;
```

Note the WHERE clause on the conflict action: PostgreSQL refers to the existing row by the table name (there is no `current` alias), so an out-of-order message carrying an older ts_ms can never roll the stored row backwards.

### 5.2 Type Conversion Strategy

```javascript
// hotel_id arrives from Kafka as a string
const kafkaData = {
  hotel_id: "2045",      // STRING in Kafka
  room_id: "6010",
  device_id: "DEV001",
  ts_ms: 1234567890
};

// build the parameterized query
const params = [
  parseInt(kafkaData.hotel_id, 10),  // convert to a number
  kafkaData.room_id,
  kafkaData.device_id,
  kafkaData.ts_ms
];

// the ::smallint cast in the SQL:
// $1::smallint guarantees the parameter is compatible with
// the G5 smallint column even when it is already numeric
```
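For reference, here is a minimal sketch of how `HeartbeatDbManager.upsertBatch` might assemble the statement from §5.1, assuming the node-postgres (`pg`) driver; the pool setup and class shape are illustrative assumptions, not the actual implementation.

```javascript
const { Pool } = require('pg'); // assumption: node-postgres driver

class HeartbeatDbManager {
  constructor(dbConfig) {
    this.pool = new Pool(dbConfig);
  }

  async upsertBatch(rows) {
    if (rows.length === 0) return;
    const params = [];
    // Build one "($n::smallint, $n+1, $n+2, $n+3, 1)" tuple per row (§5.1/§5.2).
    const valueTuples = rows.map((row, i) => {
      const base = i * 4;
      params.push(parseInt(row.hotel_id, 10), row.room_id, row.device_id, row.ts_ms);
      return `($${base + 1}::smallint, $${base + 2}, $${base + 3}, $${base + 4}, 1)`;
    });
    const sql = `
      INSERT INTO room_status_moment_g5 (hotel_id, room_id, device_id, ts_ms, status)
      VALUES ${valueTuples.join(', ')}
      ON CONFLICT (hotel_id, room_id) DO UPDATE
        SET ts_ms = EXCLUDED.ts_ms, status = 1
        WHERE room_status_moment_g5.ts_ms <= EXCLUDED.ts_ms`;
    await this.pool.query(sql, params);
  }
}
```

Layer 1 guarantees at most one row per (hotel_id, room_id) in any flush, which matters here: PostgreSQL rejects a multi-row upsert that would update the same conflicting row twice in one statement.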
## 6. Batch Commit Strategy

### 6.1 Kafka Offset Commits

```javascript
// Old strategy (inefficient): commit per message
consumer.on('message', (message) => {
  const parsed = parseHeartbeat(message);
  if (parsed) buffer.add(parsed);
  consumer.commitOffset({...}); // commits on every single message!
});

// New strategy (efficient): periodic batch commit every 200ms
const commitInterval = 200; // milliseconds
setInterval(() => {
  // commit everything consumed up to this moment in one call
  consumer.commit(false, (err) => {
    if (!err) logger.debug('Batch offset committed');
  });
}, commitInterval);
```

### 6.2 Commit Interval vs. Throughput Trade-offs

| Strategy | Commit interval | Pros | Cons |
|----------|-----------------|------|------|
| Per-message | ~1ms | Highest reliability | Consumption ~50% slower |
| 200ms batch | 200ms | Balances reliability and throughput | <200ms of messages lost on failure |
| 5s batch | 5000ms | Highest throughput | Large loss window on failure |

**Current choice**: 200ms (balanced)

## 7. Error Recovery

### 7.1 Parser Errors

```javascript
const parsed = parseHeartbeat(rawMessage);
if (parsed === null) {
  // validation failed:
  // - count it (invalidCount++)
  // - commit the offset normally (do not re-consume this message)
  // - no retry (garbage messages are dropped)
  stats.invalidCount++;
  consumer.commitOffset(...);
  continue;
}
```

### 7.2 Database Write Failures

```javascript
try {
  await this.dbManager.upsertBatch(rows);
  // start the cooldown
  const writtenAt = this.now();
  for (const [key] of writableEntries) {
    this.lastWrittenAt.set(key, writtenAt);
  }
} catch (err) {
  // write failed → keep the records in the buffer;
  // retried after 30s (or earlier, when the buffer fills up)
  logger.error(`DB upsert failed: ${err.message}`);
  // writableEntries were already deleted from the buffer, so re-add them
  for (const [key, row] of writableEntries) {
    this.buffer.set(key, row);
  }
}
```

## 8. Performance Characteristics and Optimization

### 8.1 Throughput Analysis

```
Configuration:
  6 consumers × 6 partitions = 1:1 mapping (optimal)
  Kafka fetch batch size:  100,000 messages
  Kafka commit interval:   200ms

Theoretical throughput:
  - per consumer:  ~5,000 msg/s
  - 6 consumers:   ~30,000 msg/s total

Buffering and dedup:
  - 5s window:    removes ~99.5% of duplicates (typical data)
  - 30s cooldown: cuts DB write pressure by a further ~60-80%

DB writes:
  - batch size: up to 5,000 rows, or on the 5s cycle
  - effective write rate: ~3,000-5,000 rows/s (throttled by the cooldown)
```

### 8.2 Memory Footprint

```
Buffer (5s window):
  - ~200 bytes per record
  - up to 5,000 records
  - total: ~1 MB

lastWrittenAt tracking:
  - key count = hotels × rooms
  - ~100 hotels × 1,000 rooms = 100K keys
  - ~50 bytes per Map entry
  - total: ~5 MB

Stats object: ~1 KB

Overall estimate: ~10 MB of memory
```

## 9. Key Design Decisions

| Decision | Choice | Rationale |
|----------|--------|-----------|
| Consumer count | Dynamic = partition count | Avoids stale static configuration |
| Validation framework | Hand-rolled over Zod | ~10x faster; hot-path optimization |
| Dedup strategy | Two layers (5s + 30s) | A single layer is insufficient; balances memory and performance |
| hotel_id type | String (digits only) | Matches the actual Kafka payload |
| SQL conflict resolution | WHERE guard on ts_ms | Prevents out-of-order messages from rolling data back |
| Batch commit interval | 200ms | Balances throughput and reliability |

---

**Last revised**: 2026-03-11
**Maintainer**: BLS OldRCU Heartbeat Team