import kafka from 'kafka-node'; import { logger } from '../utils/logger.js'; const { ConsumerGroup } = kafka; const createOneConsumer = ({ kafkaConfig, onMessage, onError, instanceIndex, healthCheck }) => { const kafkaHost = kafkaConfig.brokers.join(','); const clientId = instanceIndex === 0 ? kafkaConfig.clientId : `${kafkaConfig.clientId}-${instanceIndex}`; const id = `${clientId}-${process.pid}-${Date.now()}`; const maxInFlight = Number.isFinite(kafkaConfig.maxInFlight) ? kafkaConfig.maxInFlight : 50; let inFlight = 0; let isPausedForHealth = false; const consumer = new ConsumerGroup( { kafkaHost, groupId: kafkaConfig.groupId, clientId, id, fromOffset: 'earliest', protocol: ['roundrobin'], outOfRangeOffset: 'latest', autoCommit: false, autoCommitIntervalMs: kafkaConfig.autoCommitIntervalMs, fetchMaxBytes: kafkaConfig.fetchMaxBytes, fetchMinBytes: kafkaConfig.fetchMinBytes, fetchMaxWaitMs: kafkaConfig.fetchMaxWaitMs, sasl: kafkaConfig.sasl }, kafkaConfig.topic ); const tryResume = () => { if (!isPausedForHealth && inFlight < maxInFlight) { consumer.resume(); } }; consumer.on('message', (message) => { inFlight += 1; if (inFlight >= maxInFlight) { consumer.pause(); } return Promise.resolve(onMessage(message)) .then(() => { consumer.commit((err) => { if (err) { logger.error('Kafka commit failed', { error: err.message }); } }); }) .catch(async (error) => { logger.error('Kafka message handling failed', { error: error?.message }); let shouldCommit = true; if (!isPausedForHealth && healthCheck && await healthCheck.shouldPause(error)) { shouldCommit = false; isPausedForHealth = true; consumer.pause(); logger.warn('Pausing consumer due to dependency failure. Entering recovery mode...'); const checkInterval = setInterval(async () => { try { const isHealthy = await healthCheck.check(); if (isHealthy) { clearInterval(checkInterval); isPausedForHealth = false; consumer.resume(); logger.info('Dependency recovered. Resuming consumer.'); } } catch (err) { logger.error('Health check failed', { error: err.message }); } }, 60000); } if (shouldCommit) { consumer.commit((err) => { if (err) { logger.error('Kafka commit failed (error case)', { error: err.message }); } }); } if (onError) { onError(error, message); } }) .finally(() => { inFlight -= 1; tryResume(); }); }); consumer.on('error', (error) => { logger.error('Kafka consumer error', { error: error?.message }); if (onError) { onError(error); } }); return consumer; }; export const createKafkaConsumers = ({ kafkaConfig, onMessage, onError, healthCheck }) => { const instances = Number.isFinite(kafkaConfig.consumerInstances) ? kafkaConfig.consumerInstances : 1; const count = Math.max(1, instances); return Array.from({ length: count }, (_, idx) => createOneConsumer({ kafkaConfig, onMessage, onError, instanceIndex: idx, healthCheck }) ); }; export const createKafkaConsumer = ({ kafkaConfig, onMessage, onError, healthCheck }) => createKafkaConsumers({ kafkaConfig, onMessage, onError, healthCheck })[0];