- 新增 HeartbeatBuffer 类,用于收集和去重 Kafka 心跳消息,并定期将数据刷新到数据库。 - 新增 HeartbeatDbManager 类,负责与 PostgreSQL 数据库的交互,支持批量 upsert 操作。 - 新增配置文件 config.js,支持从环境变量加载配置。 - 新增 Kafka 消费者模块,支持从 Kafka 中消费心跳消息。 - 新增 Redis 集成模块,支持将日志和心跳信息推送到 Redis。 - 新增心跳消息解析器,负责解析 Kafka 消息并提取心跳字段。 - 新增日志记录工具,支持不同级别的日志输出。 - 新增指标收集器,跟踪 Kafka 消息处理和数据库操作的指标。 - 新增单元测试,覆盖 HeartbeatBuffer 和 HeartbeatDbManager 的主要功能。 - 新增数据库表结构 SQL 文件,定义 room_status_moment_g5 表的结构。 - 配置 Vite 构建工具,支持 Node.js 环境的构建。
6.6 KiB
6.6 KiB
Kafka 处理规范 (Kafka Specification)
1. Kafka 集群配置
1.1 基本信息
| 配置项 | 值 | 说明 |
|---|---|---|
| 集群地址 | kafka.blv-oa.com:9092 | 生产 Kafka broker |
| 主题 | blwlog4Nodejs-oldrcu-heartbeat-topic | 心跳事件主题 |
| 分区数 | 6 (auto-detected) | 运行时动态检测 |
| 消费者组 | (TBD) | 建议配置消费者组 ID |
| 协议版本 | v5 (kafka-node) | Kafka Node 库版本 |
1.2 消费者连接参数
// src/config/config.js
const kafkaConfig = {
brokers: process.env.KAFKA_BROKERS || 'kafka.blv-oa.com:9092',
topic: process.env.KAFKA_TOPIC_HEARTBEAT || 'blwlog4Nodejs-oldrcu-heartbeat-topic',
// 消费实例数
consumerInstances: parseInt(process.env.KAFKA_CONSUMER_INSTANCES || '3', 10),
// 性能优化
batchSize: parseInt(process.env.KAFKA_BATCH_SIZE || '100000', 10),
fetchMinBytes: parseInt(process.env.KAFKA_FETCH_MIN_BYTES || '65536', 10),
commitIntervalMs: parseInt(process.env.KAFKA_COMMIT_INTERVAL_MS || '200', 10)
};
2. 消费者创建与配置
2.1 分区感知的动态伸缩
启动时自动查询 Kafka 元数据,根据实际分区数扩展消费者:
// src/kafka/consumer.js
async function resolveTopicPartitionCount(kafkaConfig) {
const client = new kafka.KafkaClient({
kafkaHost: kafkaConfig.brokers
});
return new Promise((resolve, reject) => {
client.loadMetadataForTopics([kafkaConfig.topic], (err, metadata) => {
if (err) return reject(err);
const topicMetadata = metadata[0];
const partitionCount = topicMetadata.partitions.length;
logger.info(`Topic "${kafkaConfig.topic}" has ${partitionCount} partitions`);
client.close();
resolve(partitionCount);
});
});
}
async function createKafkaConsumers(kafkaConfig) {
const configuredInstances = kafkaConfig.consumerInstances;
const partitionCount = await resolveTopicPartitionCount(kafkaConfig);
// 关键:伸缩到 max(配置, 分区数)
const instanceCount = Math.max(configuredInstances, partitionCount);
logger.info(`Kafka consumer scaling: ${configuredInstances} configured, ${partitionCount} partitions, creating ${instanceCount} instances`);
const consumers = [];
for (let i = 0; i < instanceCount; i++) {
const consumer = createOneConsumer(i, kafkaConfig);
consumers.push(consumer);
}
return consumers;
}
3. 消息消费流程
3.1 消息处理时序
Kafka Broker (Topic: blwlog4Nodejs-oldrcu-heartbeat-topic)
↓
6 个消费者实例 (Consumer 0-5)
↓
┌─────────────────────────────────────┐
│ Consumer 0 (Partition 0) │
├─────────────────────────────────────┤
│ on('message'): handle message │
│ ├─ parseHeartbeat(message.value) │
│ ├─ if valid: buffer.add(parsed) │
│ └─ if invalid: stats.invalidCount++│
│ │
│ 200ms 周期性提交偏移量 │
│ └─ consumer.commit(false, cb) │
└─────────────────────────────────────┘
3.2 失败恢复策略
消息处理失败
try {
const parsed = parseHeartbeat(message.value);
if (parsed !== null) {
heartbeatBuffer.add(parsed);
stats.validMessages++;
} else {
stats.invalidMessages++;
// 注意:即使验证失败,偏移量仍会提交
// 垃圾消息被永久丢弃(不重试)
}
} catch (err) {
logger.error(`Unexpected error processing message: ${err.message}`);
stats.errorCount++;
// 错误消息偏移量也会被提交,避免无限重试
}
连接中断
// kafka-node 内置重连机制
consumer.on('error', (err) => {
logger.error(`Consumer error: ${err.message}`, err);
// kafka-node 自动尝试重新连接
// 如需手动控制,可添加重连逻辑
if (err.name === 'FailedToRegistMetadata') {
setTimeout(() => {
logger.info('Attempting to reconnect to Kafka');
// consumer.connect();
}, 5000);
}
});
4. 偏移量管理
4.1 手动批量提交策略
// 配置:关闭自动提交
const consumerConfig = {
autoCommit: false, // 手动管理偏移量
// ...
};
// 实现:200ms 周期性批量提交
consumer.on('message', (message) => {
handleMessage(message);
// 周期性检查
const now = Date.now();
if (now - lastCommitTime >= 200) {
// 非阻塞提交(false 参数)
consumer.commit(false, (err) => {
if (err) {
logger.warn(`Offset commit failed: ${err.message}`);
} else {
lastCommitTime = now;
logger.debug('Offsets committed');
}
});
}
});
4.2 提交间隔的权衡
| 间隔 | 吞吐 | 可靠性 | 使用场景 |
|---|---|---|---|
| 10ms | 极高 | 低 | 不建议 |
| 200ms | 高 | 中 | ✓ 推荐(当前) |
| 1000ms | 高 | 中 | 可接受 |
| 5000ms | 最高 | 低 | 高吞吐但容易丢消息 |
选择 200ms 的理由:
- 不阻塞消费速度(消费速率 ~30K msg/s,提交开销 <1%)
- 故障时最多丢失 200ms 左右的消息(可接受)
- 平衡吞吐与可靠性
5. 监控与调试
5.1 消费者状态监控
const stats = {
totalMessages: 0,
validMessages: 0,
invalidMessages: 0,
errorCount: 0,
lastUpdateTime: Date.now()
};
// 周期性报告(通过 Redis 或日志)
setInterval(() => {
const rate = stats.validMessages / ((Date.now() - stats.lastUpdateTime) / 1000);
logger.info(`Consumption stats:
Total: ${stats.totalMessages}
Valid: ${stats.validMessages} (${(stats.validMessages/stats.totalMessages*100).toFixed(2)}%)
Invalid: ${stats.invalidMessages}
Errors: ${stats.errorCount}
Rate: ${rate.toFixed(0)} msg/s
`);
stats.lastUpdateTime = Date.now();
}, 10000); // 每 10s 报告一次
5.2 Kafka 消息采样
用于诊断消息结构和验证规范吻合度:
npm run sample:kafka
6. 性能调优指南
6.1 动态参数调整
# 环境变量控制
export KAFKA_BROKER_HOSTS=kafka.blv-oa.com:9092
export KAFKA_TOPIC_HEARTBEAT=blwlog4Nodejs-oldrcu-heartbeat-topic
export KAFKA_CONSUMER_INSTANCES=3 # 基础实例数(自动伸缩到分区数)
export KAFKA_BATCH_SIZE=100000 # 批量拉取大小
export KAFKA_FETCH_MIN_BYTES=65536 # 最小字节数
export KAFKA_COMMIT_INTERVAL_MS=200 # 提交间隔
上次修订: 2026-03-11