Files
Web_BLS_OldRcu_Heartbeat_Se…/bls-oldrcu-heartbeat-backend/spec/kafka.md
XuJiacheng e45d14b720 feat: 实现心跳消息处理模块
- 新增 HeartbeatBuffer 类,用于收集和去重 Kafka 心跳消息,并定期将数据刷新到数据库。
- 新增 HeartbeatDbManager 类,负责与 PostgreSQL 数据库的交互,支持批量 upsert 操作。
- 新增配置文件 config.js,支持从环境变量加载配置。
- 新增 Kafka 消费者模块,支持从 Kafka 中消费心跳消息。
- 新增 Redis 集成模块,支持将日志和心跳信息推送到 Redis。
- 新增心跳消息解析器,负责解析 Kafka 消息并提取心跳字段。
- 新增日志记录工具,支持不同级别的日志输出。
- 新增指标收集器,跟踪 Kafka 消息处理和数据库操作的指标。
- 新增单元测试,覆盖 HeartbeatBuffer 和 HeartbeatDbManager 的主要功能。
- 新增数据库表结构 SQL 文件,定义 room_status_moment_g5 表的结构。
- 配置 Vite 构建工具,支持 Node.js 环境的构建。
2026-03-12 14:11:02 +08:00

6.6 KiB
Raw Blame History

Kafka 处理规范 (Kafka Specification)

1. Kafka 集群配置

1.1 基本信息

配置项 说明
集群地址 kafka.blv-oa.com:9092 生产 Kafka broker
主题 blwlog4Nodejs-oldrcu-heartbeat-topic 心跳事件主题
分区数 6 (auto-detected) 运行时动态检测
消费者组 (TBD) 建议配置消费者组 ID
协议版本 v5 (kafka-node) Kafka Node 库版本

1.2 消费者连接参数

// src/config/config.js
const kafkaConfig = {
  brokers: process.env.KAFKA_BROKERS || 'kafka.blv-oa.com:9092',
  topic: process.env.KAFKA_TOPIC_HEARTBEAT || 'blwlog4Nodejs-oldrcu-heartbeat-topic',
  
  // 消费实例数
  consumerInstances: parseInt(process.env.KAFKA_CONSUMER_INSTANCES || '3', 10),
  
  // 性能优化
  batchSize: parseInt(process.env.KAFKA_BATCH_SIZE || '100000', 10),
  fetchMinBytes: parseInt(process.env.KAFKA_FETCH_MIN_BYTES || '65536', 10),
  commitIntervalMs: parseInt(process.env.KAFKA_COMMIT_INTERVAL_MS || '200', 10)
};

2. 消费者创建与配置

2.1 分区感知的动态伸缩

启动时自动查询 Kafka 元数据,根据实际分区数扩展消费者:

// src/kafka/consumer.js

async function resolveTopicPartitionCount(kafkaConfig) {
  const client = new kafka.KafkaClient({
    kafkaHost: kafkaConfig.brokers
  });

  return new Promise((resolve, reject) => {
    client.loadMetadataForTopics([kafkaConfig.topic], (err, metadata) => {
      if (err) return reject(err);

      const topicMetadata = metadata[0];
      const partitionCount = topicMetadata.partitions.length;
      
      logger.info(`Topic "${kafkaConfig.topic}" has ${partitionCount} partitions`);
      
      client.close();
      resolve(partitionCount);
    });
  });
}

async function createKafkaConsumers(kafkaConfig) {
  const configuredInstances = kafkaConfig.consumerInstances;
  const partitionCount = await resolveTopicPartitionCount(kafkaConfig);
  
  // 关键:伸缩到 max(配置, 分区数)
  const instanceCount = Math.max(configuredInstances, partitionCount);
  
  logger.info(`Kafka consumer scaling: ${configuredInstances} configured, ${partitionCount} partitions, creating ${instanceCount} instances`);

  const consumers = [];
  for (let i = 0; i < instanceCount; i++) {
    const consumer = createOneConsumer(i, kafkaConfig);
    consumers.push(consumer);
  }

  return consumers;
}

3. 消息消费流程

3.1 消息处理时序

Kafka Broker (Topic: blwlog4Nodejs-oldrcu-heartbeat-topic)
           ↓
6 个消费者实例 (Consumer 0-5)
           ↓
┌─────────────────────────────────────┐
│ Consumer 0 (Partition 0)              │
├─────────────────────────────────────┤
│ on('message'): handle message        │
│   ├─ parseHeartbeat(message.value)  │
│   ├─ if valid: buffer.add(parsed)   │
│   └─ if invalid: stats.invalidCount++│
│                                       │
│ 200ms 周期性提交偏移量               │
│   └─ consumer.commit(false, cb)     │
└─────────────────────────────────────┘

3.2 失败恢复策略

消息处理失败

try {
  const parsed = parseHeartbeat(message.value);
  if (parsed !== null) {
    heartbeatBuffer.add(parsed);
    stats.validMessages++;
  } else {
    stats.invalidMessages++;
    // 注意:即使验证失败,偏移量仍会提交
    // 垃圾消息被永久丢弃(不重试)
  }
} catch (err) {
  logger.error(`Unexpected error processing message: ${err.message}`);
  stats.errorCount++;
  // 错误消息偏移量也会被提交,避免无限重试
}

连接中断

// kafka-node 内置重连机制
consumer.on('error', (err) => {
  logger.error(`Consumer error: ${err.message}`, err);
  
  // kafka-node 自动尝试重新连接
  // 如需手动控制,可添加重连逻辑
  if (err.name === 'FailedToRegistMetadata') {
    setTimeout(() => {
      logger.info('Attempting to reconnect to Kafka');
      // consumer.connect();
    }, 5000);
  }
});

4. 偏移量管理

4.1 手动批量提交策略

// 配置:关闭自动提交
const consumerConfig = {
  autoCommit: false,  // 手动管理偏移量
  // ...
};

// 实现200ms 周期性批量提交
consumer.on('message', (message) => {
  handleMessage(message);

  // 周期性检查
  const now = Date.now();
  if (now - lastCommitTime >= 200) {
    // 非阻塞提交false 参数)
    consumer.commit(false, (err) => {
      if (err) {
        logger.warn(`Offset commit failed: ${err.message}`);
      } else {
        lastCommitTime = now;
        logger.debug('Offsets committed');
      }
    });
  }
});

4.2 提交间隔的权衡

间隔 吞吐 可靠性 使用场景
10ms 极高 不建议
200ms ✓ 推荐(当前)
1000ms 可接受
5000ms 最高 高吞吐但容易丢消息

选择 200ms 的理由:

  • 不阻塞消费速度(消费速率 ~30K msg/s提交开销 <1%
  • 故障时最多丢失 200ms 左右的消息(可接受)
  • 平衡吞吐与可靠性

5. 监控与调试

5.1 消费者状态监控

const stats = {
  totalMessages: 0,
  validMessages: 0,
  invalidMessages: 0,
  errorCount: 0,
  lastUpdateTime: Date.now()
};

// 周期性报告(通过 Redis 或日志)
setInterval(() => {
  const rate = stats.validMessages / ((Date.now() - stats.lastUpdateTime) / 1000);
  logger.info(`Consumption stats:
    Total: ${stats.totalMessages}
    Valid: ${stats.validMessages} (${(stats.validMessages/stats.totalMessages*100).toFixed(2)}%)
    Invalid: ${stats.invalidMessages}
    Errors: ${stats.errorCount}
    Rate: ${rate.toFixed(0)} msg/s
  `);
  
  stats.lastUpdateTime = Date.now();
}, 10000);  // 每 10s 报告一次

5.2 Kafka 消息采样

用于诊断消息结构和验证规范吻合度:

npm run sample:kafka

6. 性能调优指南

6.1 动态参数调整

# 环境变量控制
export KAFKA_BROKER_HOSTS=kafka.blv-oa.com:9092
export KAFKA_TOPIC_HEARTBEAT=blwlog4Nodejs-oldrcu-heartbeat-topic
export KAFKA_CONSUMER_INSTANCES=3        # 基础实例数(自动伸缩到分区数)
export KAFKA_BATCH_SIZE=100000           # 批量拉取大小
export KAFKA_FETCH_MIN_BYTES=65536       # 最小字节数
export KAFKA_COMMIT_INTERVAL_MS=200      # 提交间隔

上次修订: 2026-03-11