Files

241 lines
6.6 KiB
Markdown
Raw Permalink Normal View History

# Kafka 处理规范 (Kafka Specification)
## 1. Kafka 集群配置
### 1.1 基本信息
| 配置项 | 值 | 说明 |
|--------|-----|------|
| 集群地址 | kafka.blv-oa.com:9092 | 生产 Kafka broker |
| 主题 | blwlog4Nodejs-oldrcu-heartbeat-topic | 心跳事件主题 |
| 分区数 | 6 (auto-detected) | 运行时动态检测 |
| 消费者组 | (TBD) | 建议配置消费者组 ID |
| 协议版本 | v5 (kafka-node) | Kafka Node 库版本 |
### 1.2 消费者连接参数
```javascript
// src/config/config.js
const kafkaConfig = {
brokers: process.env.KAFKA_BROKERS || 'kafka.blv-oa.com:9092',
topic: process.env.KAFKA_TOPIC_HEARTBEAT || 'blwlog4Nodejs-oldrcu-heartbeat-topic',
// 消费实例数
consumerInstances: parseInt(process.env.KAFKA_CONSUMER_INSTANCES || '3', 10),
// 性能优化
batchSize: parseInt(process.env.KAFKA_BATCH_SIZE || '100000', 10),
fetchMinBytes: parseInt(process.env.KAFKA_FETCH_MIN_BYTES || '65536', 10),
commitIntervalMs: parseInt(process.env.KAFKA_COMMIT_INTERVAL_MS || '200', 10)
};
```
## 2. 消费者创建与配置
### 2.1 分区感知的动态伸缩
启动时自动查询 Kafka 元数据,根据实际分区数扩展消费者:
```javascript
// src/kafka/consumer.js
async function resolveTopicPartitionCount(kafkaConfig) {
const client = new kafka.KafkaClient({
kafkaHost: kafkaConfig.brokers
});
return new Promise((resolve, reject) => {
client.loadMetadataForTopics([kafkaConfig.topic], (err, metadata) => {
if (err) return reject(err);
const topicMetadata = metadata[0];
const partitionCount = topicMetadata.partitions.length;
logger.info(`Topic "${kafkaConfig.topic}" has ${partitionCount} partitions`);
client.close();
resolve(partitionCount);
});
});
}
async function createKafkaConsumers(kafkaConfig) {
const configuredInstances = kafkaConfig.consumerInstances;
const partitionCount = await resolveTopicPartitionCount(kafkaConfig);
// 关键:伸缩到 max(配置, 分区数)
const instanceCount = Math.max(configuredInstances, partitionCount);
logger.info(`Kafka consumer scaling: ${configuredInstances} configured, ${partitionCount} partitions, creating ${instanceCount} instances`);
const consumers = [];
for (let i = 0; i < instanceCount; i++) {
const consumer = createOneConsumer(i, kafkaConfig);
consumers.push(consumer);
}
return consumers;
}
```
## 3. 消息消费流程
### 3.1 消息处理时序
```
Kafka Broker (Topic: blwlog4Nodejs-oldrcu-heartbeat-topic)
6 个消费者实例 (Consumer 0-5)
┌─────────────────────────────────────┐
│ Consumer 0 (Partition 0) │
├─────────────────────────────────────┤
│ on('message'): handle message │
│ ├─ parseHeartbeat(message.value) │
│ ├─ if valid: buffer.add(parsed) │
│ └─ if invalid: stats.invalidCount++│
│ │
│ 200ms 周期性提交偏移量 │
│ └─ consumer.commit(false, cb) │
└─────────────────────────────────────┘
```
### 3.2 失败恢复策略
#### 消息处理失败
```javascript
try {
const parsed = parseHeartbeat(message.value);
if (parsed !== null) {
heartbeatBuffer.add(parsed);
stats.validMessages++;
} else {
stats.invalidMessages++;
// 注意:即使验证失败,偏移量仍会提交
// 垃圾消息被永久丢弃(不重试)
}
} catch (err) {
logger.error(`Unexpected error processing message: ${err.message}`);
stats.errorCount++;
// 错误消息偏移量也会被提交,避免无限重试
}
```
#### 连接中断
```javascript
// kafka-node 内置重连机制
consumer.on('error', (err) => {
logger.error(`Consumer error: ${err.message}`, err);
// kafka-node 自动尝试重新连接
// 如需手动控制,可添加重连逻辑
if (err.name === 'FailedToRegistMetadata') {
setTimeout(() => {
logger.info('Attempting to reconnect to Kafka');
// consumer.connect();
}, 5000);
}
});
```
## 4. 偏移量管理
### 4.1 手动批量提交策略
```javascript
// 配置:关闭自动提交
const consumerConfig = {
autoCommit: false, // 手动管理偏移量
// ...
};
// 实现200ms 周期性批量提交
consumer.on('message', (message) => {
handleMessage(message);
// 周期性检查
const now = Date.now();
if (now - lastCommitTime >= 200) {
// 非阻塞提交false 参数)
consumer.commit(false, (err) => {
if (err) {
logger.warn(`Offset commit failed: ${err.message}`);
} else {
lastCommitTime = now;
logger.debug('Offsets committed');
}
});
}
});
```
### 4.2 提交间隔的权衡
| 间隔 | 吞吐 | 可靠性 | 使用场景 |
|-----|------|--------|----------|
| 10ms | 极高 | 低 | 不建议 |
| 200ms | 高 | 中 | ✓ 推荐(当前) |
| 1000ms | 高 | 中 | 可接受 |
| 5000ms | 最高 | 低 | 高吞吐但容易丢消息 |
**选择 200ms 的理由**:
- 不阻塞消费速度(消费速率 ~30K msg/s提交开销 <1%
- 故障时最多丢失 200ms 左右的消息(可接受)
- 平衡吞吐与可靠性
## 5. 监控与调试
### 5.1 消费者状态监控
```javascript
const stats = {
totalMessages: 0,
validMessages: 0,
invalidMessages: 0,
errorCount: 0,
lastUpdateTime: Date.now()
};
// 周期性报告(通过 Redis 或日志)
setInterval(() => {
const rate = stats.validMessages / ((Date.now() - stats.lastUpdateTime) / 1000);
logger.info(`Consumption stats:
Total: ${stats.totalMessages}
Valid: ${stats.validMessages} (${(stats.validMessages/stats.totalMessages*100).toFixed(2)}%)
Invalid: ${stats.invalidMessages}
Errors: ${stats.errorCount}
Rate: ${rate.toFixed(0)} msg/s
`);
stats.lastUpdateTime = Date.now();
}, 10000); // 每 10s 报告一次
```
### 5.2 Kafka 消息采样
用于诊断消息结构和验证规范吻合度:
```bash
npm run sample:kafka
```
## 6. 性能调优指南
### 6.1 动态参数调整
```bash
# 环境变量控制
export KAFKA_BROKER_HOSTS=kafka.blv-oa.com:9092
export KAFKA_TOPIC_HEARTBEAT=blwlog4Nodejs-oldrcu-heartbeat-topic
export KAFKA_CONSUMER_INSTANCES=3 # 基础实例数(自动伸缩到分区数)
export KAFKA_BATCH_SIZE=100000 # 批量拉取大小
export KAFKA_FETCH_MIN_BYTES=65536 # 最小字节数
export KAFKA_COMMIT_INTERVAL_MS=200 # 提交间隔
```
---
**上次修订**: 2026-03-11