import kafka from 'kafka-node';
import config from '../config/config.js';
import logger from '../utils/logger.js';
import { OffsetTracker } from './offsetTracker.js';

const { ConsumerGroup } = kafka;

/**
 * Base class wrapping a kafka-node ConsumerGroup.
 *
 * Responsibilities visible in this file:
 *  - builds the consumer from `config.kafka` and wires its event handlers;
 *  - hands every received message to `onMessage` (overridden by subclasses);
 *  - applies back-pressure by pausing the consumer once `maxInFlight`
 *    messages are being processed and resuming when the count drops;
 *  - when `config.kafka.autoCommit` is falsy, batches the offsets returned by
 *    the OffsetTracker and commits them every `commitIntervalMs`.
 */
class KafkaConsumer {
  constructor() {
    this.consumer = null;
    this.tracker = new OffsetTracker();
    // Latest offset awaiting commit, keyed by `${topic}-${partition}`.
    this.pendingCommits = new Map();
    this.commitTimer = null;
    this.inFlight = 0;
    this.maxInFlight = Number.isFinite(config.kafka.maxInFlight)
      ? config.kafka.maxInFlight
      : 5000;
    this.commitIntervalMs = Number.isFinite(config.kafka.commitIntervalMs)
      ? config.kafka.commitIntervalMs
      : 200;
    // True only while this wrapper itself has paused the consumer.
    this.pausedForBackpressure = false;
    // Bumped on every rebalance so late commit callbacks can detect staleness.
    this.commitEpoch = 0;
    // Set by close(); stops failed commits from re-arming the flush timer.
    this.closing = false;
  }

  /**
   * Creates the ConsumerGroup from `config.kafka` and registers all event
   * handlers. Topics are read from the comma-separated `config.kafka.topics`.
   */
  init() {
    const kafkaConfig = {
      kafkaHost: config.kafka.brokers,
      clientId: config.kafka.clientId,
      groupId: config.kafka.groupId,
      fromOffset: config.kafka.fromOffset,
      protocol: ['roundrobin'],
      outOfRangeOffset: 'latest',
      autoCommit: config.kafka.autoCommit,
      autoCommitIntervalMs: config.kafka.autoCommitIntervalMs,
      fetchMaxBytes: config.kafka.fetchMaxBytes,
      fetchMaxWaitMs: config.kafka.fetchMaxWaitMs,
      fetchMinBytes: config.kafka.fetchMinBytes,
      sasl: config.kafka.saslEnabled
        ? {
            mechanism: config.kafka.saslMechanism,
            username: config.kafka.saslUsername,
            password: config.kafka.saslPassword,
          }
        : undefined,
      ssl: config.kafka.sslEnabled,
      connectTimeout: 10000,
      requestTimeout: 10000,
    };

    // Credentials are deliberately left out of the logged summary.
    logger.info('Initializing Kafka consumer with config:', {
      kafkaHost: config.kafka.brokers,
      clientId: config.kafka.clientId,
      groupId: config.kafka.groupId,
      topics: config.kafka.topics,
      fromOffset: config.kafka.fromOffset,
      saslEnabled: config.kafka.saslEnabled,
    });

    const topics = config.kafka.topics
      .split(',')
      .map((topic) => topic.trim())
      .filter(Boolean);

    this.consumer = new ConsumerGroup(kafkaConfig, topics);

    // Records a successfully handled message and queues its commit offset.
    const recordSuccess = (message) => {
      if (config.kafka.autoCommit) return;
      const commitOffset = this.tracker.markDone(
        message.topic,
        message.partition,
        message.offset,
      );
      // The tracker returns null when there is nothing to commit yet.
      if (commitOffset === null) return;
      const key = `${message.topic}-${message.partition}`;
      this.pendingCommits.set(key, {
        topic: message.topic,
        partition: message.partition,
        offset: commitOffset,
        metadata: 'm',
      });
      this.scheduleCommitFlush();
    };

    // Releases one in-flight slot and lifts back-pressure if we applied it.
    const releaseSlot = () => {
      this.inFlight -= 1;
      // Resume only when this wrapper paused the consumer; an unconditional
      // resume() after every message would trigger needless fetch requests.
      if (
        this.pausedForBackpressure &&
        this.inFlight < this.maxInFlight &&
        this.consumer.resume
      ) {
        this.consumer.resume();
        this.pausedForBackpressure = false;
      }
    };

    const handleMessage = (message) => {
      logger.debug('Received Kafka message:', { messageId: message.offset });
      this.inFlight += 1;
      this.tracker.add(message.topic, message.partition, message.offset);

      if (
        this.inFlight >= this.maxInFlight &&
        !this.pausedForBackpressure &&
        this.consumer.pause
      ) {
        this.consumer.pause();
        this.pausedForBackpressure = true;
      }

      // Calling onMessage inside the executor keeps it synchronous while
      // turning a synchronous throw into a rejection, so the catch/finally
      // below always run and `inFlight` cannot leak.
      new Promise((resolve) => {
        resolve(this.onMessage(message));
      })
        .then(() => recordSuccess(message))
        .catch((err) => {
          // The offset is intentionally not marked done, so it is not committed.
          logger.error('Kafka message handling failed, skip commit', {
            error: err && err.message ? err.message : String(err),
            topic: message.topic,
            partition: message.partition,
            offset: message.offset,
          });
        })
        .finally(releaseSlot);
    };

    this.consumer.on('connect', () => {
      logger.info('Kafka consumer connected', { groupId: config.kafka.groupId, topics });
    });

    this.consumer.on('rebalancing', () => {
      logger.info('Kafka consumer rebalancing');
      // Drop all offset bookkeeping and invalidate commits still in flight.
      this.commitEpoch += 1;
      this.tracker.clear();
      this.pendingCommits.clear();
      if (this.commitTimer) {
        clearTimeout(this.commitTimer);
        this.commitTimer = null;
      }
    });

    this.consumer.on('rebalanced', () => {
      logger.info('Kafka consumer rebalanced');
    });

    this.consumer.on('message', handleMessage);

    this.consumer.on('error', (err) => {
      logger.error('Kafka consumer error:', { error: err.message, stack: err.stack });
    });

    this.consumer.on('offsetOutOfRange', (topic) => {
      logger.warn('Kafka offset out of range:', {
        topic: topic.topic,
        partition: topic.partition,
      });
    });

    this.consumer.on('close', () => {
      logger.info('Kafka consumer closed');
    });

    logger.info('Kafka consumer initialized');
  }

  /**
   * Message hook, to be implemented by subclasses. May return a promise; the
   * message counts as handled once the returned value settles successfully.
   *
   * @param {object} message - Message emitted by the consumer (the handler
   *   reads its `topic`, `partition` and `offset`).
   */
  onMessage(message) {
    // To be implemented by subclasses.
  }

  /**
   * Arms a one-shot timer that flushes pending commits after
   * `commitIntervalMs`. Does nothing if a flush is already scheduled.
   */
  scheduleCommitFlush() {
    if (this.commitTimer) return;
    this.commitTimer = setTimeout(() => {
      this.commitTimer = null;
      this.flushCommits();
    }, this.commitIntervalMs);
  }

  /**
   * Sends all pending offsets in one commit request. On failure the batch is
   * put back for a later retry, unless a rebalance happened in the meantime.
   */
  flushCommits() {
    if (!this.consumer || this.pendingCommits.size === 0) return;

    // Swap in a fresh map so completions arriving during the request queue up
    // separately from the batch being sent.
    const batch = this.pendingCommits;
    const epoch = this.commitEpoch;
    this.pendingCommits = new Map();

    this.consumer.sendOffsetCommitRequest(Array.from(batch.values()), (err) => {
      if (err) {
        // A rebalance cleared the bookkeeping on purpose; do not resurrect it.
        if (epoch === this.commitEpoch) {
          for (const [key, entry] of batch.entries()) {
            const queued = this.pendingCommits.get(key);
            // Never replace a newer queued offset with the older failed one.
            if (!queued || queued.offset < entry.offset) {
              this.pendingCommits.set(key, entry);
            }
          }
          // Retry on the normal interval instead of waiting for new traffic.
          if (!this.closing) {
            this.scheduleCommitFlush();
          }
        }
        logger.error('Failed to commit Kafka offsets', {
          error: err.message,
          groupId: config.kafka.groupId,
          count: batch.size,
        });
        return;
      }
      logger.info('Kafka offsets committed', {
        groupId: config.kafka.groupId,
        count: batch.size,
        commits: Array.from(batch.values()),
      });
    });
  }

  /**
   * Cancels the flush timer, sends any pending commits and closes the
   * consumer.
   *
   * @returns {Promise<void>} Resolves once the consumer's close callback has
   *   fired (or immediately when no consumer was created). Never rejects.
   */
  close() {
    return new Promise((resolve) => {
      this.closing = true;
      if (this.commitTimer) {
        clearTimeout(this.commitTimer);
        this.commitTimer = null;
      }
      // Fire-and-forget: the commit request is issued but not awaited.
      this.flushCommits();

      if (!this.consumer) {
        resolve();
        return;
      }

      // NOTE(review): kafka-node documents close(force = true) as committing
      // the consumer's current offset before closing - confirm that this
      // cannot commit past unprocessed messages when autoCommit is disabled.
      this.consumer.close(true, (err) => {
        if (err) {
          logger.error('Kafka consumer close reported an error', { error: err.message });
        }
        logger.info('Kafka consumer closed');
        resolve();
      });
    });
  }
}

export default KafkaConsumer;