Files
Web_BLS_RCUAction_Server/bls-rcu-action-backend/src/kafka/consumer.js

76 lines
2.2 KiB
JavaScript
Raw Normal View History

import kafka from 'kafka-node';
import { logger } from '../utils/logger.js';
const { ConsumerGroup } = kafka;
const createOneConsumer = ({ kafkaConfig, onMessage, onError, instanceIndex }) => {
const kafkaHost = kafkaConfig.brokers.join(',');
const clientId = instanceIndex === 0 ? kafkaConfig.clientId : `${kafkaConfig.clientId}-${instanceIndex}`;
const id = `${clientId}-${process.pid}-${Date.now()}`;
const maxInFlight = Number.isFinite(kafkaConfig.maxInFlight) ? kafkaConfig.maxInFlight : 50;
let inFlight = 0;
const consumer = new ConsumerGroup(
{
kafkaHost,
groupId: kafkaConfig.groupId,
clientId,
id,
fromOffset: 'earliest',
protocol: ['roundrobin'],
outOfRangeOffset: 'latest',
autoCommit: true,
autoCommitIntervalMs: kafkaConfig.autoCommitIntervalMs,
fetchMaxBytes: kafkaConfig.fetchMaxBytes,
fetchMinBytes: kafkaConfig.fetchMinBytes,
fetchMaxWaitMs: kafkaConfig.fetchMaxWaitMs,
sasl: kafkaConfig.sasl
},
kafkaConfig.topic
);
const tryResume = () => {
if (inFlight < maxInFlight) {
consumer.resume();
}
};
consumer.on('message', (message) => {
inFlight += 1;
if (inFlight >= maxInFlight) {
consumer.pause();
}
Promise.resolve(onMessage(message))
.catch((error) => {
logger.error('Kafka message handling failed', { error: error?.message });
if (onError) {
onError(error, message);
}
})
.finally(() => {
inFlight -= 1;
tryResume();
});
});
consumer.on('error', (error) => {
logger.error('Kafka consumer error', { error: error?.message });
if (onError) {
onError(error);
}
});
return consumer;
};
export const createKafkaConsumers = ({ kafkaConfig, onMessage, onError }) => {
const instances = Number.isFinite(kafkaConfig.consumerInstances) ? kafkaConfig.consumerInstances : 1;
const count = Math.max(1, instances);
return Array.from({ length: count }, (_, idx) =>
createOneConsumer({ kafkaConfig, onMessage, onError, instanceIndex: idx })
);
};
export const createKafkaConsumer = ({ kafkaConfig, onMessage, onError }) =>
createKafkaConsumers({ kafkaConfig, onMessage, onError })[0];