485 lines
16 KiB
Markdown
485 lines
16 KiB
Markdown
|
|
# 架构规范 (Architecture Specification)
|
|||
|
|
|
|||
|
|
## 1. 系统架构图
|
|||
|
|
|
|||
|
|
```
|
|||
|
|
┌─────────────────────────────────────────────────────────────────┐
|
|||
|
|
│ Kafka Cluster │
|
|||
|
|
│ Topic: blwlog4Nodejs-oldrcu-heartbeat-topic (6 partitions) │
|
|||
|
|
└──────────────────────┬──────────────────────────────────────────┘
|
|||
|
|
│
|
|||
|
|
┌──────────────┼──────────────┐
|
|||
|
|
│ │ │
|
|||
|
|
Part0 Part1 ... Part5
|
|||
|
|
│ │ │
|
|||
|
|
┌───▼────┐ ┌────▼──┐ ┌──▼────┐
|
|||
|
|
│ Cons- │ │ Cons- │ ... │ Cons- │ (6 Consumer Instances)
|
|||
|
|
│ umer-0 │ │ umer-1 │ │ umer-5│
|
|||
|
|
└───┬────┘ └────┬──┘ └──┬────┘
|
|||
|
|
│ │ │
|
|||
|
|
└──────────────┼─────────────┘
|
|||
|
|
│
|
|||
|
|
┌──────────────▼───────────────┐
|
|||
|
|
│ HeartbeatParser │
|
|||
|
|
│ (Validation & Type Check) │
|
|||
|
|
│ - ts_ms: number │
|
|||
|
|
│ - hotel_id: digit-string │
|
|||
|
|
│ - room_id: non-blank string │
|
|||
|
|
│ - device_id: non-blank str │
|
|||
|
|
└──────────────┬───────────────┘
|
|||
|
|
│
|
|||
|
|
┌──────────────▼────────────────────────┐
|
|||
|
|
│ HeartbeatBuffer (Layer 1 Dedup) │
|
|||
|
|
│ ◆ 5-second window │
|
|||
|
|
│ ◆ In-memory dedup by key │
|
|||
|
|
│ ◆ Keep latest ts_ms per key │
|
|||
|
|
│ ◆ Stats tracking (pulled/merged) │
|
|||
|
|
└──────────────┬────────────────────────┘
|
|||
|
|
│
|
|||
|
|
┌──────────────▼────────────────────────┐
|
|||
|
|
│ Cooldown Filter (Layer 2 Dedup) │
|
|||
|
|
│ ◆ 30-second cooldown-per-key │
|
|||
|
|
│ ◆ lastWrittenAt Map tracking │
|
|||
|
|
│ ◆ Hold updates during cooldown │
|
|||
|
|
│ ◆ Flush only eligible keys │
|
|||
|
|
└──────────────┬────────────────────────┘
|
|||
|
|
│
|
|||
|
|
┌──────────────▼────────────────────────┐
|
|||
|
|
│ HeartbeatDbManager (Batch Upsert) │
|
|||
|
|
│ ◆ Parameterized SQL with type cast │
|
|||
|
|
│ ◆ ::smallint for hotel_id │
|
|||
|
|
│ ◆ ON CONFLICT with ts_ms ordering │
|
|||
|
|
│ ◆ Batched writes (5s or maxSize) │
|
|||
|
|
└──────────────┬────────────────────────┘
|
|||
|
|
│
|
|||
|
|
┌──────────────▼────────────────────────┐
|
|||
|
|
│ PostgreSQL G5 │
|
|||
|
|
│ (room_status_moment_g5 table) │
|
|||
|
|
│ Primary Key: (hotel_id, room_id) │
|
|||
|
|
│ Columns: ts_ms, device_id, status │
|
|||
|
|
└──────────────┬────────────────────────┘
|
|||
|
|
│
|
|||
|
|
┌──────────────▼────────────────────────┐
|
|||
|
|
│ Metrics Reporter (Redis + Cron) │
|
|||
|
|
│ ◆ Consumption rate │
|
|||
|
|
│ ◆ Dedup hit rate │
|
|||
|
|
│ ◆ Write latency │
|
|||
|
|
└───────────────────────────────────────┘
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
## 2. 模块交互时序
|
|||
|
|
|
|||
|
|
### 2.1 单条消息处理时序
|
|||
|
|
|
|||
|
|
```
|
|||
|
|
Timeline: Message arrives at Kafka
|
|||
|
|
|
|||
|
|
T=0ms [Kafka] Partition-0 retrieves message
|
|||
|
|
├─ raw: {"ts_ms":1234567890, "hotel_id":"2045", "room_id":"6010",
|
|||
|
|
│ "device_id":"DEV001", "current_time": "2026-03-11T10:30:00Z"}
|
|||
|
|
│
|
|||
|
|
T=0.1ms [Consumer] Receives from Kafka
|
|||
|
|
│
|
|||
|
|
T=0.2ms [Parser] Validates
|
|||
|
|
├─ ts_ms check: Number.isFinite(1234567890) ✓
|
|||
|
|
├─ hotel_id check: isDigitsOnly("2045") ✓
|
|||
|
|
├─ room_id check: isNonBlankString("6010") ✓
|
|||
|
|
├─ device_id check: isNonBlankString("DEV001") ✓
|
|||
|
|
├─ Returns: {ts_ms, hotel_id, room_id, device_id} ✓
|
|||
|
|
│
|
|||
|
|
T=1ms [HeartbeatBuffer] Add to buffer
|
|||
|
|
├─ key = "2045:6010"
|
|||
|
|
├─ Check buffer.has(key)?
|
|||
|
|
│ ├─ NO → Add new entry
|
|||
|
|
│ └─ YES → Merge if ts_ms newer
|
|||
|
|
├─ Check if buffer.size >= maxSize (5000)?
|
|||
|
|
│ └─ YES → Trigger flush immediately
|
|||
|
|
│
|
|||
|
|
T=2ms [Cooldown Check] In flush()
|
|||
|
|
├─ nowTs = Date.now()
|
|||
|
|
├─ For each key in buffer:
|
|||
|
|
│ ├─ cooldownLeft = lastWrittenAt[key] + 30000 - nowTs
|
|||
|
|
│ ├─ IF cooldownLeft > 0
|
|||
|
|
│ │ └─ Skip (keep in buffer for later)
|
|||
|
|
│ ├─ ELSE (eligible)
|
|||
|
|
│ │ ├─ Move to writableEntries
|
|||
|
|
│ │ └─ Remove from buffer
|
|||
|
|
│
|
|||
|
|
T=5000ms [Scheduled Flush Every 5s]
|
|||
|
|
├─ Writable entries collected
|
|||
|
|
├─ DB upsert batch
|
|||
|
|
├─ On success:
|
|||
|
|
│ └─ Mark lastWrittenAt[key] = current time
|
|||
|
|
├─ On error:
|
|||
|
|
│ └─ Re-add to buffer for retry
|
|||
|
|
│
|
|||
|
|
T=5001ms [Schedule Next Flush]
|
|||
|
|
├─ IF buffer is empty
|
|||
|
|
│ └─ Schedule next at T+5000ms
|
|||
|
|
├─ IF cooldown exists
|
|||
|
|
│ └─ Schedule at earliest cooldown expiry
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
### 2.2 多消费者分区映射
|
|||
|
|
|
|||
|
|
```
|
|||
|
|
Topic: blwlog4Nodejs-oldrcu-heartbeat-topic (6 partitions)
|
|||
|
|
|
|||
|
|
Partition Assignment (after auto-scaling):
|
|||
|
|
┌────────────────────────────────────────┐
|
|||
|
|
│ Partition 0 → Consumer Instance 0 │
|
|||
|
|
│ Partition 1 → Consumer Instance 1 │
|
|||
|
|
│ Partition 2 → Consumer Instance 2 │
|
|||
|
|
│ Partition 3 → Consumer Instance 3 │
|
|||
|
|
│ Partition 4 → Consumer Instance 4 │
|
|||
|
|
│ Partition 5 → Consumer Instance 5 │
|
|||
|
|
└────────────────────────────────────────┘
|
|||
|
|
|
|||
|
|
All instances share:
|
|||
|
|
- Same HeartbeatBuffer instance (in-memory, 5s window)
|
|||
|
|
- Same HeartbeatDbManager (batched writes to G5)
|
|||
|
|
- Same Redis connection (metrics reporting)
|
|||
|
|
|
|||
|
|
Benefit: Load distributed across partitions, bottleneck = DB write rate
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
## 3. 消费者自动伸缩机制
|
|||
|
|
|
|||
|
|
### 3.1 启动时分区检测流程
|
|||
|
|
|
|||
|
|
```javascript
|
|||
|
|
// src/kafka/consumer.js
|
|||
|
|
|
|||
|
|
async function resolveTopicPartitionCount(kafkaConfig) {
|
|||
|
|
// Step 1: 建立临时 Kafka 客户端
|
|||
|
|
const client = new kafka.KafkaClient({
|
|||
|
|
kafkaHost: kafkaConfig.brokers
|
|||
|
|
});
|
|||
|
|
|
|||
|
|
// Step 2: 异步查询主题元数据
|
|||
|
|
await new Promise((resolve, reject) => {
|
|||
|
|
client.loadMetadataForTopics([kafkaConfig.topic], (err, metadata) => {
|
|||
|
|
if (err) reject(err);
|
|||
|
|
else {
|
|||
|
|
const partitions = metadata[0].partitions;
|
|||
|
|
const count = partitions.length; // e.g., 6
|
|||
|
|
resolve(count);
|
|||
|
|
}
|
|||
|
|
});
|
|||
|
|
});
|
|||
|
|
|
|||
|
|
// Step 3: 关闭客户端,返回分区数
|
|||
|
|
client.close();
|
|||
|
|
return count;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 启动流程
|
|||
|
|
const configuredInstances = 3;
|
|||
|
|
const actualPartitionCount = await resolveTopicPartitionCount(kafkaConfig);
|
|||
|
|
const instanceCount = Math.max(configuredInstances, actualPartitionCount);
|
|||
|
|
// 结果: max(3, 6) = 6 instances created
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
### 3.2 消费者动态创建
|
|||
|
|
|
|||
|
|
```javascript
|
|||
|
|
async function createKafkaConsumers(kafkaConfig) {
|
|||
|
|
const partitionCount = await resolveTopicPartitionCount(kafkaConfig);
|
|||
|
|
const count = Math.max(3, partitionCount);
|
|||
|
|
|
|||
|
|
const consumers = [];
|
|||
|
|
for (let i = 0; i < count; i++) {
|
|||
|
|
const consumer = createOneConsumer(i, kafkaConfig);
|
|||
|
|
consumers.push(consumer);
|
|||
|
|
logger.info(`Started Kafka consumer ${i}/${count-1}`);
|
|||
|
|
}
|
|||
|
|
return consumers;
|
|||
|
|
}
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
## 4. 缓冲区与去重策略详解
|
|||
|
|
|
|||
|
|
### 4.1 双层去重架构
|
|||
|
|
|
|||
|
|
#### Layer 1: 5秒时间窗口去重
|
|||
|
|
|
|||
|
|
```javascript
|
|||
|
|
class HeartbeatBuffer {
|
|||
|
|
// 内存 Map,按键存储最新记录
|
|||
|
|
buffer = new Map();
|
|||
|
|
|
|||
|
|
add(record) {
|
|||
|
|
const key = `${record.hotel_id}:${record.room_id}`;
|
|||
|
|
|
|||
|
|
if (this.buffer.has(key)) {
|
|||
|
|
// 合并逻辑:只保留 ts_ms 更新的版本
|
|||
|
|
const existing = this.buffer.get(key);
|
|||
|
|
if (record.ts_ms > existing.ts_ms) {
|
|||
|
|
this.buffer.set(key, record);
|
|||
|
|
}
|
|||
|
|
// 否则丢弃更旧的
|
|||
|
|
} else {
|
|||
|
|
this.buffer.set(key, record);
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 缓冲满 → 立即刷新
|
|||
|
|
if (this.buffer.size >= this.maxBufferSize) {
|
|||
|
|
this.flush();
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 示例:
|
|||
|
|
// T=0ms add({hotel_id:"2045", room_id:"6010", ts_ms: 1000})
|
|||
|
|
// buffer = {"2045:6010" → {ts_ms: 1000}}
|
|||
|
|
// T=100ms add({hotel_id:"2045", room_id:"6010", ts_ms: 1100})
|
|||
|
|
// ts_ms newer → 覆盖
|
|||
|
|
// buffer = {"2045:6010" → {ts_ms: 1100}}
|
|||
|
|
// T=200ms add({hotel_id:"2045", room_id:"6010", ts_ms: 1050})
|
|||
|
|
// ts_ms 旧 → 丢弃,不更新
|
|||
|
|
// buffer = {"2045:6010" → {ts_ms: 1100}} (保持)
|
|||
|
|
// T=5000ms [Scheduled flush]
|
|||
|
|
// Write {hotel_id:"2045", room_id:"6010", ts_ms: 1100} to DB
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
#### Layer 2: 30秒写入冷却期
|
|||
|
|
|
|||
|
|
```javascript
|
|||
|
|
class HeartbeatBuffer {
|
|||
|
|
// 追踪每个键的最后写入时间
|
|||
|
|
lastWrittenAt = new Map();
|
|||
|
|
|
|||
|
|
// 冷却期配置(毫秒)
|
|||
|
|
cooldownMs = 30000;
|
|||
|
|
|
|||
|
|
flush() {
|
|||
|
|
const nowTs = this.now();
|
|||
|
|
const writableEntries = [];
|
|||
|
|
let minCooldownDelayMs = null;
|
|||
|
|
|
|||
|
|
for (const [key, row] of this.buffer.entries()) {
|
|||
|
|
const cooldownDelayMs = this._getCooldownDelayMs(key, nowTs);
|
|||
|
|
|
|||
|
|
if (cooldownDelayMs > 0) {
|
|||
|
|
// 仍在冷却期内 → 跳过写入,保留在缓冲中
|
|||
|
|
minCooldownDelayMs = minCooldownDelayMs == null
|
|||
|
|
? cooldownDelayMs
|
|||
|
|
: Math.min(minCooldownDelayMs, cooldownDelayMs);
|
|||
|
|
continue;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 已过冷却期 → 标记为可写
|
|||
|
|
writableEntries.push([key, row]);
|
|||
|
|
this.buffer.delete(key);
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 执行数据库写入
|
|||
|
|
const rows = writableEntries.map(([, row]) => row);
|
|||
|
|
if (rows.length > 0) {
|
|||
|
|
await this.dbManager.upsertBatch(rows);
|
|||
|
|
|
|||
|
|
// 记录写入时间,启动新的冷却期
|
|||
|
|
const writtenAt = this.now();
|
|||
|
|
for (const [key] of writableEntries) {
|
|||
|
|
this.lastWrittenAt.set(key, writtenAt);
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 调度下次刷新
|
|||
|
|
const nextFlushDelayMs = minCooldownDelayMs ?? 5000;
|
|||
|
|
this.flushTimer = setTimeout(() => this.flush(), nextFlushDelayMs);
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
_getCooldownDelayMs(key, nowTs) {
|
|||
|
|
const lastWritten = this.lastWrittenAt.get(key);
|
|||
|
|
if (lastWritten == null) return 0; // 从未写入 → 立即可写
|
|||
|
|
|
|||
|
|
const cooldownExpiry = lastWritten + this.cooldownMs;
|
|||
|
|
const delay = cooldownExpiry - nowTs;
|
|||
|
|
return Math.max(0, delay);
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 时间线示例(30秒冷却期):
|
|||
|
|
// T=0s flush() writes key "2045:6010" to DB
|
|||
|
|
// lastWrittenAt["2045:6010"] = 0
|
|||
|
|
// T=1s add({hotel_id:"2045", room_id:"6010", ts_ms: 2000})
|
|||
|
|
// buffer["2045:6010"] = {ts_ms: 2000}
|
|||
|
|
// T=2s flush() check:
|
|||
|
|
// cooldownLeft = 30000 - 2000 = 28000ms > 0
|
|||
|
|
// → Skip write, keep in buffer
|
|||
|
|
// T=29s add({hotel_id:"2045", room_id:"6010", ts_ms: 2500})
|
|||
|
|
// buffer update: {ts_ms: 2500}
|
|||
|
|
// T=30s flush() check:
|
|||
|
|
// cooldownLeft = 30000 - 30000 = 0 ≤ 0
|
|||
|
|
// → Write {ts_ms: 2500} to DB
|
|||
|
|
// lastWrittenAt["2045:6010"] = 30000 (new cooldown starts)
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
## 5. 批量 Upsert 与时间序列保护
|
|||
|
|
|
|||
|
|
### 5.1 SQL 设计
|
|||
|
|
|
|||
|
|
```sql
|
|||
|
|
-- 参数化查询
|
|||
|
|
INSERT INTO room_status_moment_g5
|
|||
|
|
(hotel_id, room_id, device_id, ts_ms, status)
|
|||
|
|
VALUES
|
|||
|
|
-- ($1::smallint, $2, $3, $4, 1), -- Record 1
|
|||
|
|
-- ($5::smallint, $6, $7, $8, 1), -- Record 2
|
|||
|
|
-- ...
|
|||
|
|
ON CONFLICT (hotel_id, room_id) DO UPDATE SET
|
|||
|
|
ts_ms = EXCLUDED.ts_ms,
|
|||
|
|
status = 1
|
|||
|
|
WHERE EXCLUDED.ts_ms >= current.ts_ms;
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
### 5.2 类型转换策略
|
|||
|
|
|
|||
|
|
```javascript
|
|||
|
|
// Kafka 传来的字符串 hotel_id
|
|||
|
|
const kafkaData = {
|
|||
|
|
hotel_id: "2045", // STRING in Kafka
|
|||
|
|
room_id: "6010",
|
|||
|
|
device_id: "DEV001",
|
|||
|
|
ts_ms: 1234567890
|
|||
|
|
};
|
|||
|
|
|
|||
|
|
// 构建参数化查询
|
|||
|
|
const params = [
|
|||
|
|
parseInt(kafkaData.hotel_id), // 转为数字
|
|||
|
|
kafkaData.room_id,
|
|||
|
|
kafkaData.device_id,
|
|||
|
|
kafkaData.ts_ms
|
|||
|
|
];
|
|||
|
|
|
|||
|
|
// SQL 中的 ::smallint 强制类型转换
|
|||
|
|
// $1::smallint 确保即使参数是数字也能与 G5 smallint 列兼容
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
## 6. 批量提交策略
|
|||
|
|
|
|||
|
|
### 6.1 Kafka 偏移量提交
|
|||
|
|
|
|||
|
|
```javascript
|
|||
|
|
// 旧策略(低效):逐条消息提交
|
|||
|
|
message => {
|
|||
|
|
const parsed = parseHeartbeat(message);
|
|||
|
|
if (parsed) buffer.add(parsed);
|
|||
|
|
consumer.commitOffset({...}); // 每条消息都提交!
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 新策略(高效):200ms 周期性批量提交
|
|||
|
|
const commitInterval = 200; // 毫秒
|
|||
|
|
setInterval(() => {
|
|||
|
|
// 此时刻之前消费的所有消息一次性提交
|
|||
|
|
consumer.commit(false, (err) => {
|
|||
|
|
if (!err) logger.debug('Batch offset committed');
|
|||
|
|
});
|
|||
|
|
}, commitInterval);
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
### 6.2 提交间隔与吞吐量的权衡
|
|||
|
|
|
|||
|
|
| 策略 | 提交间隔 | 优点 | 缺点 |
|
|||
|
|
|------|---------|------|------|
|
|||
|
|
| Per-message | 1ms | 最高可靠性 | 消费速度慢 50% |
|
|||
|
|
| 200ms batch | 200ms | 平衡可靠性与吞吐 | 故障时丢失 <200ms 消息 |
|
|||
|
|
| 5s batch | 5000ms | 最高吞吐 | 故障风险大 |
|
|||
|
|
|
|||
|
|
**当前选择**: 200ms (平衡)
|
|||
|
|
|
|||
|
|
## 7. 错误恢复机制
|
|||
|
|
|
|||
|
|
### 7.1 Parser 错误
|
|||
|
|
|
|||
|
|
```javascript
|
|||
|
|
const parsed = parseHeartbeat(rawMessage);
|
|||
|
|
if (parsed === null) {
|
|||
|
|
// 验证失败:
|
|||
|
|
// - 计数记录 (invalidCount++)
|
|||
|
|
// - 偏移量正常提交(不再消费此消息)
|
|||
|
|
// - 无重试(垃圾消息被丢弃)
|
|||
|
|
stats.invalidCount++;
|
|||
|
|
consumer.commitOffset(...);
|
|||
|
|
continue;
|
|||
|
|
}
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
### 7.2 数据库写入失败
|
|||
|
|
|
|||
|
|
```javascript
|
|||
|
|
try {
|
|||
|
|
await this.dbManager.upsertBatch(rows);
|
|||
|
|
// 标记冷却期
|
|||
|
|
const writtenAt = this.now();
|
|||
|
|
for (const [key] of writableEntries) {
|
|||
|
|
this.lastWrittenAt.set(key, writtenAt);
|
|||
|
|
}
|
|||
|
|
} catch (err) {
|
|||
|
|
// 写入失败 → 记录保留在缓冲中
|
|||
|
|
// 30秒后重试(或等待缓冲满时重试)
|
|||
|
|
logger.error(`DB upsert failed: ${err.message}`);
|
|||
|
|
// writableEntries 已从 buffer 中删除,需要重新添加
|
|||
|
|
for (const [key, row] of writableEntries) {
|
|||
|
|
this.buffer.set(key, row);
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
## 8. 性能特征与优化
|
|||
|
|
|
|||
|
|
### 8.1 吞吐量分析
|
|||
|
|
|
|||
|
|
```
|
|||
|
|
配置: 6 消费者 × 6 分区 = 1:1 映射 (最优)
|
|||
|
|
Kafka fetch batch size: 100,000 messages
|
|||
|
|
Kafka commit interval: 200ms
|
|||
|
|
|
|||
|
|
理论吞吐:
|
|||
|
|
- 每个消费者每秒消费: ~5000 msg/s
|
|||
|
|
- 6 消费者总计: ~30,000 msg/s
|
|||
|
|
|
|||
|
|
缓冲与去重:
|
|||
|
|
- 5s 窗口: 去除重复速度 ~99.5% (典型数据)
|
|||
|
|
- 30s 冷却: 进一步降低 DB 写入压力 ~60-80%
|
|||
|
|
|
|||
|
|
DB 写入:
|
|||
|
|
- Batch size: 最多 5000 条 或 5s 周期
|
|||
|
|
- 实际写入速率: ~3,000-5,000 rows/s (受冷却期抑制)
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
### 8.2 内存占用
|
|||
|
|
|
|||
|
|
```
|
|||
|
|
缓冲区 (5s 窗口):
|
|||
|
|
- 每条记录 ~200 bytes
|
|||
|
|
- 最多 5000 条
|
|||
|
|
- 总计 ~1 MB
|
|||
|
|
|
|||
|
|
lastWrittenAt 追踪:
|
|||
|
|
- 键数 = 酒店数 × 房间数
|
|||
|
|
- ~100 酒店 × 1000 房间 = 100K 键
|
|||
|
|
- 每个键 Map 条目 ~50 bytes
|
|||
|
|
- 总计 ~5 MB
|
|||
|
|
|
|||
|
|
Stats 对象: ~1 KB
|
|||
|
|
整体估计: ~10 MB 内存占用
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
## 9. 关键设计决策
|
|||
|
|
|
|||
|
|
| 决策 | 选择 | 理由 |
|
|||
|
|
|------|------|------|
|
|||
|
|
| 消费者数 | 动态 = 分区数 | 避免静态配置失效 |
|
|||
|
|
| 验证框架 | 手写 vs Zod | 手写快 10x,热路径优化 |
|
|||
|
|
| 去重策略 | 双层 (5s+30s) | 单层不足,内存与性能折衷 |
|
|||
|
|
| hotel_id 类型 | 字符串(数字形式) | 与 Kafka 实际数据一致 |
|
|||
|
|
| SQL 冲突解决 | WHERE ts_ms 保护 | 防止乱序消息回滚数据 |
|
|||
|
|
| 批量提交周期 | 200ms | 平衡吞吐与可靠性 |
|
|||
|
|
|
|||
|
|
---
|
|||
|
|
|
|||
|
|
**上次修订**: 2026-03-11
|
|||
|
|
**维护者**: BLS OldRCU Heartbeat Team
|