# Kafka 处理规范 (Kafka Specification)

## 1. Kafka 集群配置

### 1.1 基本信息

| 配置项 | 值 | 说明 |
|--------|-----|------|
| 集群地址 | kafka.blv-oa.com:9092 | 生产 Kafka broker |
| 主题 | blwlog4Nodejs-oldrcu-heartbeat-topic | 心跳事件主题 |
| 分区数 | 6 (auto-detected) | 运行时动态检测 |
| 消费者组 | (TBD) | 建议配置消费者组 ID |
| 协议版本 | v5 (kafka-node) | Kafka Node 库版本 |

### 1.2 消费者连接参数

```javascript
// src/config/config.js
const kafkaConfig = {
  brokers: process.env.KAFKA_BROKERS || 'kafka.blv-oa.com:9092',
  topic: process.env.KAFKA_TOPIC_HEARTBEAT || 'blwlog4Nodejs-oldrcu-heartbeat-topic',

  // 消费实例数
  consumerInstances: parseInt(process.env.KAFKA_CONSUMER_INSTANCES || '3', 10),

  // 性能优化
  batchSize: parseInt(process.env.KAFKA_BATCH_SIZE || '100000', 10),
  fetchMinBytes: parseInt(process.env.KAFKA_FETCH_MIN_BYTES || '65536', 10),
  commitIntervalMs: parseInt(process.env.KAFKA_COMMIT_INTERVAL_MS || '200', 10)
};
```

## 2. 消费者创建与配置

### 2.1 分区感知的动态伸缩

启动时自动查询 Kafka 元数据,根据实际分区数扩展消费者:

```javascript
// src/kafka/consumer.js

async function resolveTopicPartitionCount(kafkaConfig) {
  const client = new kafka.KafkaClient({
    kafkaHost: kafkaConfig.brokers
  });

  return new Promise((resolve, reject) => {
    client.loadMetadataForTopics([kafkaConfig.topic], (err, metadata) => {
      if (err) return reject(err);

      const topicMetadata = metadata[0];
      const partitionCount = topicMetadata.partitions.length;

      logger.info(`Topic "${kafkaConfig.topic}" has ${partitionCount} partitions`);

      client.close();
      resolve(partitionCount);
    });
  });
}

async function createKafkaConsumers(kafkaConfig) {
  const configuredInstances = kafkaConfig.consumerInstances;
  const partitionCount = await resolveTopicPartitionCount(kafkaConfig);

  // 关键:伸缩到 max(配置, 分区数)
  const instanceCount = Math.max(configuredInstances, partitionCount);

  logger.info(`Kafka consumer scaling: ${configuredInstances} configured, ${partitionCount} partitions, creating ${instanceCount} instances`);

  const consumers = [];
  for (let i = 0; i < instanceCount; i++) {
    const consumer = createOneConsumer(i, kafkaConfig);
    consumers.push(consumer);
  }

  return consumers;
}
```

## 3. 消息消费流程

### 3.1 消息处理时序

```
Kafka Broker (Topic: blwlog4Nodejs-oldrcu-heartbeat-topic)
        ↓
6 个消费者实例 (Consumer 0-5)
        ↓
┌─────────────────────────────────────┐
│ Consumer 0 (Partition 0)            │
├─────────────────────────────────────┤
│ on('message'): handle message       │
│  ├─ parseHeartbeat(message.value)   │
│  ├─ if valid: buffer.add(parsed)    │
│  └─ if invalid: stats.invalidCount++│
│                                     │
│ 200ms 周期性提交偏移量              │
│  └─ consumer.commit(false, cb)      │
└─────────────────────────────────────┘
```

### 3.2 失败恢复策略

#### 消息处理失败

```javascript
try {
  const parsed = parseHeartbeat(message.value);
  if (parsed !== null) {
    heartbeatBuffer.add(parsed);
    stats.validMessages++;
  } else {
    stats.invalidMessages++;
    // 注意:即使验证失败,偏移量仍会提交
    // 垃圾消息被永久丢弃(不重试)
  }
} catch (err) {
  logger.error(`Unexpected error processing message: ${err.message}`);
  stats.errorCount++;
  // 错误消息偏移量也会被提交,避免无限重试
}
```

#### 连接中断

```javascript
// kafka-node 内置重连机制
consumer.on('error', (err) => {
  logger.error(`Consumer error: ${err.message}`, err);

  // kafka-node 自动尝试重新连接
  // 如需手动控制,可添加重连逻辑
  if (err.name === 'FailedToRegistMetadata') {
    setTimeout(() => {
      logger.info('Attempting to reconnect to Kafka');
      // consumer.connect();
    }, 5000);
  }
});
```

## 4. 偏移量管理

### 4.1 手动批量提交策略

```javascript
// 配置:关闭自动提交
const consumerConfig = {
  autoCommit: false, // 手动管理偏移量
  // ...
};

// 实现:200ms 周期性批量提交
consumer.on('message', (message) => {
  handleMessage(message);

  // 周期性检查
  const now = Date.now();
  if (now - lastCommitTime >= 200) {
    // 非阻塞提交(false 参数)
    consumer.commit(false, (err) => {
      if (err) {
        logger.warn(`Offset commit failed: ${err.message}`);
      } else {
        lastCommitTime = now;
        logger.debug('Offsets committed');
      }
    });
  }
});
```

### 4.2 提交间隔的权衡

| 间隔 | 吞吐 | 可靠性 | 使用场景 |
|-----|------|--------|----------|
| 10ms | 低 | 高 | 提交开销过大,不建议 |
| 200ms | 高 | 中 | ✓ 推荐(当前) |
| 1000ms | 高 | 中 | 可接受 |
| 5000ms | 最高 | 低 | 高吞吐但容易丢消息 |

**选择 200ms 的理由**:
- 不阻塞消费速度(消费速率 ~30K msg/s,提交开销 <1%)
- 故障时最多丢失 200ms 左右的消息(可接受)
- 平衡吞吐与可靠性

## 5. 监控与调试

### 5.1 消费者状态监控

```javascript
const stats = {
  totalMessages: 0,
  validMessages: 0,
  invalidMessages: 0,
  errorCount: 0,
  lastUpdateTime: Date.now()
};

// 周期性报告(通过 Redis 或日志)
setInterval(() => {
  const rate = stats.validMessages / ((Date.now() - stats.lastUpdateTime) / 1000);
  logger.info(`Consumption stats:
    Total: ${stats.totalMessages}
    Valid: ${stats.validMessages} (${(stats.validMessages/stats.totalMessages*100).toFixed(2)}%)
    Invalid: ${stats.invalidMessages}
    Errors: ${stats.errorCount}
    Rate: ${rate.toFixed(0)} msg/s
  `);

  stats.lastUpdateTime = Date.now();
}, 10000); // 每 10s 报告一次
```

### 5.2 Kafka 消息采样

用于诊断消息结构和验证规范吻合度:

```bash
npm run sample:kafka
```

## 6. 性能调优指南

### 6.1 动态参数调整

```bash
# 环境变量控制
export KAFKA_BROKERS=kafka.blv-oa.com:9092                    # 与 src/config/config.js 读取的变量名保持一致
export KAFKA_TOPIC_HEARTBEAT=blwlog4Nodejs-oldrcu-heartbeat-topic
export KAFKA_CONSUMER_INSTANCES=3       # 基础实例数(自动伸缩到分区数)
export KAFKA_BATCH_SIZE=100000          # 批量拉取大小
export KAFKA_FETCH_MIN_BYTES=65536      # 最小字节数
export KAFKA_COMMIT_INTERVAL_MS=200     # 提交间隔
```

---

**上次修订**: 2026-03-11