feat: 实现Kafka批量消费与写入以提升吞吐量

引入批量处理机制，将消息缓冲并按批次写入数据库，显著提高消费性能。调整Kafka配置参数，优化消费者并发与提交策略。新增分区索引自动创建功能，并重构处理器以支持批量操作。添加降级写入逻辑以处理数据错误，同时增强指标收集以监控批量处理效果。
This commit is contained in:
2026-02-09 10:50:56 +08:00
parent a8c7cf74e6
commit 8337c60f98
17 changed files with 1165 additions and 330 deletions

View File

@@ -22,11 +22,15 @@ export const config = {
groupId: process.env.KAFKA_GROUP_ID || 'bls-onoffline-group',
clientId: process.env.KAFKA_CLIENT_ID || 'bls-onoffline-client',
consumerInstances: parseNumber(process.env.KAFKA_CONSUMER_INSTANCES, 1),
maxInFlight: parseNumber(process.env.KAFKA_MAX_IN_FLIGHT, 50),
fetchMaxBytes: parseNumber(process.env.KAFKA_FETCH_MAX_BYTES, 10 * 1024 * 1024),
fetchMinBytes: parseNumber(process.env.KAFKA_FETCH_MIN_BYTES, 1),
maxInFlight: parseNumber(process.env.KAFKA_MAX_IN_FLIGHT, 20000),
fetchMaxBytes: parseNumber(process.env.KAFKA_FETCH_MAX_BYTES, 50 * 1024 * 1024),
fetchMinBytes: parseNumber(process.env.KAFKA_FETCH_MIN_BYTES, 256 * 1024),
fetchMaxWaitMs: parseNumber(process.env.KAFKA_FETCH_MAX_WAIT_MS, 100),
autoCommitIntervalMs: parseNumber(process.env.KAFKA_AUTO_COMMIT_INTERVAL_MS, 5000),
commitIntervalMs: parseNumber(process.env.KAFKA_COMMIT_INTERVAL_MS, 200),
commitOnAttempt: process.env.KAFKA_COMMIT_ON_ATTEMPT === 'true',
batchSize: parseNumber(process.env.KAFKA_BATCH_SIZE, 5000),
batchTimeoutMs: parseNumber(process.env.KAFKA_BATCH_TIMEOUT_MS, 50),
logMessages: process.env.KAFKA_LOG_MESSAGES === 'true',
sasl: process.env.KAFKA_SASL_USERNAME && process.env.KAFKA_SASL_PASSWORD ? {
mechanism: process.env.KAFKA_SASL_MECHANISM || 'plain',

View File

@@ -35,22 +35,27 @@ export class DatabaseManager {
if (!rows || rows.length === 0) {
return;
}
const values = [];
const placeholders = rows.map((row, rowIndex) => {
const offset = rowIndex * columns.length;
columns.forEach((column) => {
values.push(row[column] ?? null);
});
const params = columns.map((_, columnIndex) => `$${offset + columnIndex + 1}`);
return `(${params.join(', ')})`;
});
const statement = `
INSERT INTO ${schema}.${table} (${columns.join(', ')})
VALUES ${placeholders.join(', ')}
SELECT *
FROM UNNEST(
$1::text[],
$2::int8[],
$3::int8[],
$4::int2[],
$5::text[],
$6::text[],
$7::text[],
$8::text[],
$9::text[],
$10::text[],
$11::text[]
)
ON CONFLICT DO NOTHING
`;
try {
await this.pool.query(statement, values);
const params = columns.map((column) => rows.map((row) => row[column] ?? null));
await this.pool.query(statement, params);
} catch (error) {
logger.error('Database insert failed', {
error: error?.message,

View File

@@ -23,6 +23,7 @@ class DatabaseInitializer {
// 3. Ensure Partitions for the next month
await partitionManager.ensurePartitions(30);
await partitionManager.ensureIndexesForExistingPartitions();
console.log('Database initialization completed successfully.');
logger.info('Database initialization completed successfully.');

View File

@@ -26,6 +26,75 @@ class PartitionManager {
return { startMs, endMs, partitionSuffix };
}
async ensurePartitionIndexes(client, schema, table, partitionSuffix) {
const startedAt = Date.now();
const partitionName = `${schema}.${table}_${partitionSuffix}`;
const indexBase = `${table}_${partitionSuffix}`;
const indexSpecs = [
{ name: `idx_${indexBase}_ts`, column: 'ts_ms' },
{ name: `idx_${indexBase}_hid`, column: 'hotel_id' },
{ name: `idx_${indexBase}_mac`, column: 'mac' },
{ name: `idx_${indexBase}_did`, column: 'device_id' },
{ name: `idx_${indexBase}_rid`, column: 'room_id' },
{ name: `idx_${indexBase}_cs`, column: 'current_status' }
];
for (const spec of indexSpecs) {
await client.query(`CREATE INDEX IF NOT EXISTS ${spec.name} ON ${partitionName} (${spec.column});`);
}
await client.query(`ANALYZE ${partitionName};`);
const elapsedMs = Date.now() - startedAt;
if (elapsedMs > 1000) {
logger.warn(`Partition index ensure slow`, { partitionName, elapsedMs });
}
}
async ensureIndexesForExistingPartitions() {
const startedAt = Date.now();
const client = await dbManager.pool.connect();
try {
const schema = config.db.schema;
const table = config.db.table;
const res = await client.query(
`
SELECT c.relname AS relname
FROM pg_inherits i
JOIN pg_class p ON i.inhparent = p.oid
JOIN pg_namespace pn ON pn.oid = p.relnamespace
JOIN pg_class c ON i.inhrelid = c.oid
WHERE pn.nspname = $1 AND p.relname = $2
ORDER BY c.relname;
`,
[schema, table]
);
const suffixes = new Set();
const pattern = new RegExp(`^${table}_(\\d{8})$`);
for (const row of res.rows) {
const relname = row?.relname;
if (typeof relname !== 'string') continue;
const match = relname.match(pattern);
if (!match) continue;
suffixes.add(match[1]);
}
for (const suffix of suffixes) {
await this.ensurePartitionIndexes(client, schema, table, suffix);
}
const elapsedMs = Date.now() - startedAt;
if (elapsedMs > 5000) {
logger.warn('Ensure existing partition indexes slow', { schema, table, partitions: suffixes.size, elapsedMs });
}
} finally {
client.release();
}
}
/**
* Ensure partitions exist for the past M days and next N days.
* @param {number} daysAhead - Number of days to pre-create.
@@ -63,6 +132,8 @@ class PartitionManager {
`;
await client.query(createSql);
}
await this.ensurePartitionIndexes(client, schema, table, partitionSuffix);
}
logger.info('Partition check completed.');
} catch (err) {
@@ -72,6 +143,55 @@ class PartitionManager {
client.release();
}
}
async ensurePartitionsForTimestamps(tsMsList) {
if (!Array.isArray(tsMsList) || tsMsList.length === 0) return;
const uniqueSuffixes = new Set();
for (const ts of tsMsList) {
const numericTs = typeof ts === 'string' ? Number(ts) : ts;
if (!Number.isFinite(numericTs)) continue;
const date = new Date(numericTs);
if (Number.isNaN(date.getTime())) continue;
const { partitionSuffix } = this.getPartitionInfo(date);
uniqueSuffixes.add(partitionSuffix);
if (uniqueSuffixes.size >= 400) break;
}
if (uniqueSuffixes.size === 0) return;
const client = await dbManager.pool.connect();
try {
const schema = config.db.schema;
const table = config.db.table;
for (const partitionSuffix of uniqueSuffixes) {
const yyyy = Number(partitionSuffix.slice(0, 4));
const mm = Number(partitionSuffix.slice(4, 6));
const dd = Number(partitionSuffix.slice(6, 8));
if (!Number.isFinite(yyyy) || !Number.isFinite(mm) || !Number.isFinite(dd)) continue;
const targetDate = new Date(yyyy, mm - 1, dd);
if (Number.isNaN(targetDate.getTime())) continue;
const { startMs, endMs } = this.getPartitionInfo(targetDate);
const partitionName = `${schema}.${table}_${partitionSuffix}`;
const checkRes = await client.query(`SELECT to_regclass($1) as exists;`, [partitionName]);
if (!checkRes.rows[0].exists) {
logger.info(`Creating partition ${partitionName} for range [${startMs}, ${endMs})`);
await client.query(`
CREATE TABLE IF NOT EXISTS ${partitionName}
PARTITION OF ${schema}.${table}
FOR VALUES FROM (${startMs}) TO (${endMs});
`);
}
await this.ensurePartitionIndexes(client, schema, table, partitionSuffix);
}
} finally {
client.release();
}
}
}
export default new PartitionManager();

View File

@@ -4,7 +4,7 @@ import dbManager from './db/databaseManager.js';
import dbInitializer from './db/initializer.js';
import partitionManager from './db/partitionManager.js';
import { createKafkaConsumers } from './kafka/consumer.js';
import { processKafkaMessage } from './processor/index.js';
import { parseMessageToRows } from './processor/index.js';
import { createRedisClient } from './redis/redisClient.js';
import { RedisIntegration } from './redis/redisIntegration.js';
import { buildErrorQueueKey, enqueueError, startErrorRetryWorker } from './redis/errorQueue.js';
@@ -76,7 +76,9 @@ const bootstrap = async () => {
// 1.1 Setup Metric Reporting Cron Job (Every minute)
cron.schedule('* * * * *', async () => {
const metrics = metricCollector.getAndReset();
const report = `[Minute Metrics] Pulled: ${metrics.kafka_pulled}, Parse Error: ${metrics.parse_error}, Inserted: ${metrics.db_inserted}, Failed: ${metrics.db_failed}`;
const flushAvgMs = metrics.batch_flush_count > 0 ? (metrics.batch_flush_ms_sum / metrics.batch_flush_count).toFixed(1) : '0.0';
const dbAvgMs = metrics.db_insert_count > 0 ? (metrics.db_insert_ms_sum / metrics.db_insert_count).toFixed(1) : '0.0';
const report = `[Minute Metrics] Pulled: ${metrics.kafka_pulled}, Parse Error: ${metrics.parse_error}, Inserted: ${metrics.db_inserted}, Failed: ${metrics.db_failed}, FlushAvgMs: ${flushAvgMs}, DbAvgMs: ${dbAvgMs}, PulledByPartition: ${JSON.stringify(metrics.keyed?.kafka_pulled_by_partition || {})}, InsertedByPartition: ${JSON.stringify(metrics.keyed?.db_inserted_by_partition || {})}, FailedByPartition: ${JSON.stringify(metrics.keyed?.db_failed_by_partition || {})}, InsertedByDay: ${JSON.stringify(metrics.keyed?.db_inserted_by_day || {})}, DbMsByDay: ${JSON.stringify(metrics.keyed?.db_insert_ms_sum_by_day || {})}`;
console.log(report);
logger.info(report, metrics);
@@ -125,87 +127,318 @@ const bootstrap = async () => {
}
};
const handleMessage = async (message) => {
// Batching parameters. Each configured value falls back to a built-in default
// when it is not a finite number.
const finiteOr = (value, fallback) => (Number.isFinite(value) ? value : fallback);
const configuredBatchSize = finiteOr(config.kafka.batchSize, 1000);
const configuredBatchTimeoutMs = finiteOr(config.kafka.batchTimeoutMs, 20);
const configuredMaxInFlight = finiteOr(config.kafka.maxInFlight, 5000);
// Batch size is capped by the in-flight limit and never drops below 10.
const BATCH_SIZE = Math.max(10, Math.min(configuredBatchSize, configuredMaxInFlight));
// Flush delay is at least 1 ms.
const BATCH_TIMEOUT_MS = Math.max(1, configuredBatchTimeoutMs);
const commitOnAttempt = config.kafka.commitOnAttempt === true;
// Per-key batch buffers, keyed by the value partitionKeyFromMessage() returns.
const batchStates = new Map();
/**
 * Derive the batching key for a message.
 *
 * @param {object} [message] - Message that may carry `topic` and `partition`.
 * @returns {string} `<topic>-<partition>` when both fields are defined, otherwise `'retry'`.
 */
const partitionKeyFromMessage = (message) => {
  const topic = message?.topic;
  const partition = message?.partition;
  const hasCoordinates = topic !== undefined && partition !== undefined;
  return hasCoordinates ? `${topic}-${partition}` : 'retry';
};
/**
 * Convert a millisecond timestamp into a `YYYYMMDD` key using the process's
 * local time zone.
 *
 * @param {number|string} tsMs - Epoch milliseconds; strings are converted with Number().
 * @returns {string|null} The day key, or null when the input is not a usable timestamp.
 */
const dayKeyFromTsMs = (tsMs) => {
  const epochMs = typeof tsMs === 'string' ? Number(tsMs) : tsMs;
  if (!Number.isFinite(epochMs)) return null;

  const when = new Date(epochMs);
  // Finite values outside the representable Date range still yield an invalid Date.
  if (Number.isNaN(when.getTime())) return null;

  const twoDigits = (n) => String(n).padStart(2, '0');
  return [when.getFullYear(), twoDigits(when.getMonth() + 1), twoDigits(when.getDate())].join('');
};
/**
 * Fetch the batch buffer for a key, creating an empty one on first use.
 *
 * @param {string} key - Batching key.
 * @returns {{items: Array, timer: *, flushing: *}} The mutable state object stored in batchStates.
 */
const getBatchState = (key) => {
  if (batchStates.has(key)) return batchStates.get(key);
  const freshState = { items: [], timer: null, flushing: null };
  batchStates.set(key, freshState);
  return freshState;
};
/**
 * Decide whether an error looks like "the database is unreachable" rather
 * than a problem with the data being written.
 *
 * A string `code` is compared against a fixed list of codes. Failing that,
 * the lower-cased message is searched for a fixed list of phrases.
 *
 * @param {*} err - Error-like value; may be null/undefined.
 * @returns {boolean} True when the error matches a known connection failure.
 */
const isDbConnectionError = (err) => {
  const connectionCodes = [
    'ECONNREFUSED',
    'ECONNRESET',
    'EPIPE',
    'ETIMEDOUT',
    'ENOTFOUND',
    'EHOSTUNREACH',
    'ENETUNREACH',
    '57P03',
    '08006',
    '08001',
    '08000',
    '08003'
  ];
  const connectionPhrases = [
    'connection timeout',
    'connection terminated',
    'connection refused',
    'terminating connection',
    'econnrefused',
    'econnreset',
    'etimedout',
    'could not connect',
    'the database system is starting up',
    'no pg_hba.conf entry'
  ];

  if (typeof err?.code === 'string' && connectionCodes.includes(err.code)) return true;

  const text = err?.message;
  if (typeof text !== 'string' || text === '') return false;

  const haystack = text.toLowerCase();
  return connectionPhrases.some((phrase) => haystack.includes(phrase));
};
/**
 * Detect an insert failure that is treated as "no partition exists for this row".
 *
 * @param {*} err - Error-like value; may be null/undefined.
 * @returns {boolean} True for code '23514' or a message containing 'no partition of relation'.
 */
const isMissingPartitionError = (err) => {
  if (err?.code === '23514') return true;
  const text = err?.message;
  return typeof text === 'string' && text.includes('no partition of relation');
};
/**
 * Insert a batch of rows, waiting out database outages and creating missing
 * partitions once.
 *
 * - Connection-type failures are retried indefinitely: wait 5s, poll
 *   `dbManager.checkConnection()` every 5s until it succeeds, then try again.
 * - A missing-partition failure triggers one partition-ensure pass for the
 *   rows' timestamps, after which the insert is retried. The pass only counts
 *   as "attempted" once it has completed, so an outage in the middle of it
 *   does not forfeit the fix.
 * - Anything else is rethrown to the caller.
 *
 * On success the insert count and elapsed time (measured from the first
 * attempt, so it includes any waiting) are recorded in the metrics.
 *
 * @param {Array<object>} rows - Rows to insert; each is expected to carry `ts_ms`.
 * @returns {Promise<void>}
 * @throws The last non-connection error raised by the insert or the partition ensure.
 */
const insertRowsWithRetry = async (rows) => {
  const startedAt = Date.now();
  let attemptedPartitionFix = false;

  // Log the outage, then block until the database answers again.
  const waitForDatabase = async (logMessage, cause) => {
    logger.error(logMessage, { error: cause.message });
    await new Promise((resolve) => setTimeout(resolve, 5000));
    while (!(await dbManager.checkConnection())) {
      logger.warn('Database still offline. Waiting 5s...');
      await new Promise((resolve) => setTimeout(resolve, 5000));
    }
  };

  while (true) {
    try {
      await dbManager.insertRows({ schema: config.db.schema, table: config.db.table, rows });
      metricCollector.increment('db_insert_count', 1);
      metricCollector.increment('db_insert_ms_sum', Date.now() - startedAt);
      return;
    } catch (err) {
      if (isDbConnectionError(err)) {
        await waitForDatabase('Database offline during batch insert. Retrying in 5s...', err);
        continue;
      }
      if (!isMissingPartitionError(err) || attemptedPartitionFix) {
        throw err;
      }
      try {
        await partitionManager.ensurePartitionsForTimestamps(rows.map((r) => r.ts_ms));
        // Mark the fix as used only after it actually ran to completion.
        attemptedPartitionFix = true;
      } catch (partitionErr) {
        if (!isDbConnectionError(partitionErr)) {
          throw partitionErr;
        }
        await waitForDatabase('Database offline during partition ensure. Retrying in 5s...', partitionErr);
      }
      // Fall through to the next loop iteration and retry the insert.
    }
  }
};
/**
 * Insert a batch of rows with a single attempt and record timing metrics.
 * Any failure from the insert propagates to the caller.
 *
 * @param {Array<object>} rows - Rows to insert.
 * @returns {Promise<void>}
 */
const insertRowsOnce = async (rows) => {
  const beganAt = Date.now();
  const { schema, table } = config.db;
  await dbManager.insertRows({ schema, table, rows });
  const elapsedMs = Date.now() - beganAt;
  metricCollector.increment('db_insert_count', 1);
  metricCollector.increment('db_insert_ms_sum', elapsedMs);
};
/**
 * Acknowledge a set of successfully written items and record insert metrics.
 *
 * Each item's row count is attributed to the day of its first row, and its
 * pending promise is resolved. Totals are then recorded overall and per
 * partition key.
 *
 * @param {string} partitionKey - Batching key the items belong to.
 * @param {Array<{item: object, rows: Array<object>}>} items - Parsed items that were written.
 * @returns {void}
 */
const resolveInsertedItems = (partitionKey, items) => {
  let totalRows = 0;
  for (const entry of items) {
    const rowCount = entry.rows.length;
    totalRows += rowCount;
    const dayKey = dayKeyFromTsMs(entry.rows?.[0]?.ts_ms);
    if (dayKey) {
      metricCollector.incrementKeyed('db_inserted_by_day', dayKey, rowCount);
    }
    entry.item.resolve();
  }
  metricCollector.increment('db_inserted', totalRows);
  metricCollector.incrementKeyed('db_inserted_by_partition', partitionKey, totalRows);
};
/**
 * Record one item as failed, hand it to the error handler, and resolve its
 * pending promise.
 *
 * If the error handler rejects, the rejection propagates and the item is
 * left unresolved by this function.
 *
 * @param {string} partitionKey - Batching key the item belongs to.
 * @param {{item: object, rows: Array<object>}} p - The parsed item that could not be written.
 * @param {*} err - The failure that caused the item to be rejected.
 * @returns {Promise<void>}
 */
const handleFailedItem = async (partitionKey, p, err) => {
  const failedDay = dayKeyFromTsMs(p.rows?.[0]?.ts_ms);

  metricCollector.increment('db_failed');
  metricCollector.incrementKeyed('db_failed_by_partition', partitionKey, 1);
  if (failedDay) {
    metricCollector.incrementKeyed('db_failed_by_day', failedDay, 1);
  }

  await handleError(err, p.item.message);
  p.item.resolve();
};
/**
 * Write a group of parsed items, degrading to smaller groups when the
 * combined insert fails.
 *
 * With `commitOnAttempt` enabled: one attempt for the whole group, and on
 * failure every item is reported as failed with that same error.
 *
 * Otherwise: the whole group is tried first. If that fails, the group is
 * split in half and each half is processed recursively. A group of one item
 * gets one more attempt and is reported as failed if that attempt fails too.
 *
 * @param {string} partitionKey - Batching key the items belong to.
 * @param {Array<{item: object, rows: Array<object>}>} items - Parsed items to write.
 * @returns {Promise<void>}
 */
const insertItemsDegraded = async (partitionKey, items) => {
  if (items.length === 0) return;
  const allRows = items.flatMap((entry) => entry.rows);

  if (commitOnAttempt) {
    try {
      await insertRowsOnce(allRows);
      resolveInsertedItems(partitionKey, items);
    } catch (err) {
      for (const entry of items) {
        await handleFailedItem(partitionKey, entry, err);
      }
    }
    return;
  }

  let wholeGroupStored = false;
  try {
    await insertRowsWithRetry(allRows);
    resolveInsertedItems(partitionKey, items);
    wholeGroupStored = true;
  } catch {
    // The combined write failed; fall through to the degraded path below.
  }
  if (wholeGroupStored) return;

  if (items.length === 1) {
    const [onlyEntry] = items;
    try {
      await insertRowsWithRetry(onlyEntry.rows);
      resolveInsertedItems(partitionKey, items);
    } catch (retryErr) {
      await handleFailedItem(partitionKey, onlyEntry, retryErr);
    }
    return;
  }

  // Bisect so that a bad item ends up isolated while the rest still get written.
  const splitAt = Math.floor(items.length / 2);
  await insertItemsDegraded(partitionKey, items.slice(0, splitAt));
  await insertItemsDegraded(partitionKey, items.slice(splitAt));
};
/**
 * Flush the buffered messages for one batching key.
 *
 * Only one flush runs per key at a time; a call made while a flush is in
 * progress returns that flush's promise. The flush takes ownership of the
 * current buffer, parses each message, writes the parsed rows through
 * insertItemsDegraded, and records flush metrics. When it finishes, any
 * messages that arrived in the meantime are flushed immediately (buffer
 * full) or scheduled after BATCH_TIMEOUT_MS.
 *
 * Every message taken from the buffer is either resolved or put back into
 * the buffer before the flush ends.
 *
 * @param {string} partitionKey - Batching key whose buffer should be flushed.
 * @returns {Promise<void>} The promise of the in-flight flush for this key.
 */
const flushBatchForKey = async (partitionKey) => {
  const state = getBatchState(partitionKey);
  if (state.flushing) return state.flushing;
  state.flushing = (async () => {
    if (state.timer) {
      clearTimeout(state.timer);
      state.timer = null;
    }
    if (state.items.length === 0) return;
    const startedAt = Date.now();
    // Take ownership of the buffer; new messages accumulate in a fresh array.
    const currentBatch = state.items;
    state.items = [];
    const pendingDbItems = [];
    // Items that parsed successfully and are waiting on the database write.
    const unresolvedItems = [];
    try {
      for (const item of currentBatch) {
        try {
          const rows = parseMessageToRows(item.message);
          pendingDbItems.push({ item, rows });
          unresolvedItems.push(item);
        } catch (err) {
          metricCollector.increment('parse_error');
          metricCollector.incrementKeyed('parse_error_by_partition', partitionKey, 1);
          logger.error('Message processing failed (Parse/Validation)', { error: err.message });
          // A failure while recording the parse error must not escape the
          // loop: this item and the ones not yet parsed are tracked nowhere
          // else, so their promises would never settle.
          try {
            await handleError(err, item.message);
          } catch (queueErr) {
            logger.error('Failed to record parse error. Marking as consumed', {
              error: queueErr?.message,
              partitionKey
            });
          }
          item.resolve();
        }
      }
      if (pendingDbItems.length > 0) {
        // Write time is attributed to the day of the first row in the batch.
        const firstTs = pendingDbItems[0]?.rows?.[0]?.ts_ms;
        const dayKey = dayKeyFromTsMs(firstTs);
        if (dayKey) {
          const dayStartMs = Date.now();
          await insertItemsDegraded(partitionKey, pendingDbItems);
          metricCollector.incrementKeyed('db_insert_ms_sum_by_day', dayKey, Date.now() - dayStartMs);
        } else {
          await insertItemsDegraded(partitionKey, pendingDbItems);
        }
      }
      metricCollector.increment('batch_flush_count', 1);
      metricCollector.increment('batch_flush_ms_sum', Date.now() - startedAt);
    } catch (err) {
      if (!commitOnAttempt && isDbConnectionError(err)) {
        // Put the parsed items back in front of newer arrivals and retry in 5s.
        // NOTE(review): this list may include items insertItemsDegraded already
        // resolved before the error surfaced; they would be written again.
        state.items = unresolvedItems.concat(state.items);
        if (!state.timer) {
          state.timer = setTimeout(() => {
            state.timer = null;
            flushBatchForKey(partitionKey);
          }, 5000);
        }
        return;
      }
      logger.error('Batch flush failed (non-network). Marking as consumed', {
        error: err?.message,
        partitionKey,
        batchSize: currentBatch.length
      });
      for (const item of unresolvedItems) {
        try {
          await handleError(err, item.message);
        } catch {}
        item.resolve();
      }
    }
  })().finally(() => {
    state.flushing = null;
    if (state.items.length > 0) {
      if (state.items.length >= BATCH_SIZE) {
        flushBatchForKey(partitionKey);
      } else if (!state.timer) {
        state.timer = setTimeout(() => {
          state.timer = null;
          flushBatchForKey(partitionKey);
        }, BATCH_TIMEOUT_MS);
      }
    }
  });
  return state.flushing;
};
const handleMessage = (message) => {
if (message.topic) {
metricCollector.increment('kafka_pulled');
metricCollector.incrementKeyed('kafka_pulled_by_partition', `${message.topic}-${message.partition}`, 1);
}
const messageValue = Buffer.isBuffer(message.value)
? message.value.toString('utf8')
: message.value;
const messageKey = Buffer.isBuffer(message.key)
? message.key.toString('utf8')
: message.key;
// const messageValue = Buffer.isBuffer(message.value)
// ? message.value.toString('utf8')
// : message.value;
// const messageKey = Buffer.isBuffer(message.key)
// ? message.key.toString('utf8')
// : message.key;
const logDetails = {
topic: message.topic,
partition: message.partition,
offset: message.offset,
key: messageKey,
value: config.kafka.logMessages ? messageValue : undefined,
valueLength: !config.kafka.logMessages && typeof messageValue === 'string' ? messageValue.length : null
};
// const logDetails = {
// topic: message.topic,
// partition: message.partition,
// offset: message.offset,
// key: messageKey,
// value: config.kafka.logMessages ? messageValue : undefined,
// valueLength: !config.kafka.logMessages && typeof messageValue === 'string' ? messageValue.length : null
// };
// logger.info('Kafka message received', logDetails);
while (true) {
try {
const inserted = await processKafkaMessage({ message, dbManager, config });
metricCollector.increment('db_inserted');
// logger.info('Kafka message processed', { inserted });
return; // Success, allowing commit
} catch (error) {
// Identify DB connection errors
const isDbConnectionError =
(error.code && ['ECONNREFUSED', '57P03', '08006', '08001', 'EADDRINUSE', 'ETIMEDOUT'].includes(error.code)) ||
(error.message && (
error.message.includes('ECONNREFUSED') ||
error.message.includes('connection') ||
error.message.includes('terminated') ||
error.message.includes('EADDRINUSE') ||
error.message.includes('ETIMEDOUT') ||
error.message.includes('The server does not support SSL connections') // Possible if DB restarts without SSL
));
const partitionKey = partitionKeyFromMessage(message);
const state = getBatchState(partitionKey);
if (isDbConnectionError) {
logger.error('Database offline. Pausing consumption for 1 minute...', { error: error.message });
// metricCollector.increment('db_failed'); // Maybe not count as fail since we retry? User didn't specify.
// Wait 1 minute before checking
await new Promise(resolve => setTimeout(resolve, 60000));
// Check connection loop
while (true) {
const isConnected = await dbManager.checkConnection();
if (isConnected) {
logger.info('Database connection restored. Resuming processing...');
break; // Break check loop to retry processing
}
logger.warn('Database still offline. Waiting 1 minute...');
await new Promise(resolve => setTimeout(resolve, 60000));
}
} else {
// Non-connection error (Data error, Parse error, etc.)
if (error.type === 'PARSE_ERROR') {
metricCollector.increment('parse_error');
} else {
metricCollector.increment('db_failed');
}
logger.error('Message processing failed (Data/Logic Error), skipping message', {
error: error?.message,
type: error?.type
});
// Enqueue to error queue
await handleError(error, message);
// For non-connection errors, we must skip this message and commit the offset
// so we don't get stuck in an infinite retry loop.
return;
}
return new Promise((resolve, reject) => {
state.items.push({ message, resolve, reject });
if (state.items.length >= BATCH_SIZE) {
flushBatchForKey(partitionKey);
} else if (!state.timer) {
state.timer = setTimeout(() => {
state.timer = null;
flushBatchForKey(partitionKey);
}, BATCH_TIMEOUT_MS);
}
}
});
};
const consumers = createKafkaConsumers({

View File

@@ -9,10 +9,39 @@ const createOneConsumer = ({ kafkaConfig, onMessage, onError, instanceIndex }) =
const kafkaHost = kafkaConfig.brokers.join(',');
const clientId = instanceIndex === 0 ? kafkaConfig.clientId : `${kafkaConfig.clientId}-${instanceIndex}`;
const id = `${clientId}-${process.pid}-${Date.now()}`;
const maxInFlight = Number.isFinite(kafkaConfig.maxInFlight) ? kafkaConfig.maxInFlight : 50;
const maxInFlight = Number.isFinite(kafkaConfig.maxInFlight) ? kafkaConfig.maxInFlight : 5000;
const commitIntervalMs = Number.isFinite(kafkaConfig.commitIntervalMs) ? kafkaConfig.commitIntervalMs : 200;
let inFlight = 0;
const tracker = new OffsetTracker();
let pendingCommits = new Map(); // key: `${topic}-${partition}` -> { topic, partition, offset }
let commitTimer = null;
/**
 * Send all queued offset commits in one request.
 *
 * The queue is swapped for an empty map before sending, so commits queued
 * while the request is in flight go into the next flush. If the request
 * fails, the unsent entries are put back, but never over an entry queued in
 * the meantime with an equal or higher offset, so a failed commit cannot
 * move a partition's pending offset backwards.
 *
 * @returns {void}
 */
const flushCommits = () => {
  if (pendingCommits.size === 0) return;
  const batch = pendingCommits;
  pendingCommits = new Map();
  consumer.sendOffsetCommitRequest(Array.from(batch.values()), (err) => {
    if (!err) return;
    for (const [key, failedCommit] of batch) {
      const queuedSince = pendingCommits.get(key);
      if (queuedSince === undefined || queuedSince.offset < failedCommit.offset) {
        pendingCommits.set(key, failedCommit);
      }
    }
    logger.error('Kafka commit failed', { error: err?.message, count: batch.size });
  });
};
/**
 * Arm the commit timer unless one is already pending. When it fires, the
 * timer handle is cleared and the queued commits are flushed.
 *
 * @returns {void}
 */
const scheduleCommitFlush = () => {
  if (!commitTimer) {
    const onTimerFired = () => {
      commitTimer = null;
      flushCommits();
    };
    commitTimer = setTimeout(onTimerFired, commitIntervalMs);
  }
};
const consumer = new ConsumerGroup(
{
@@ -47,23 +76,7 @@ const createOneConsumer = ({ kafkaConfig, onMessage, onError, instanceIndex }) =
consumer.pause();
}
Promise.resolve(onMessage(message))
.then(() => {
// Mark message as done and check if we can commit
const commitOffset = tracker.markDone(message.topic, message.partition, message.offset);
if (commitOffset !== null) {
consumer.sendOffsetCommitRequest([{
topic: message.topic,
partition: message.partition,
offset: commitOffset,
metadata: 'm'
}], (err) => {
if (err) {
logger.error('Kafka commit failed', { error: err?.message, topic: message.topic, partition: message.partition, offset: commitOffset });
}
});
}
})
.then(() => {})
.catch((error) => {
logger.error('Kafka message handling failed', { error: error?.message });
if (onError) {
@@ -71,6 +84,17 @@ const createOneConsumer = ({ kafkaConfig, onMessage, onError, instanceIndex }) =
}
})
.finally(() => {
const commitOffset = tracker.markDone(message.topic, message.partition, message.offset);
if (commitOffset !== null) {
const key = `${message.topic}-${message.partition}`;
pendingCommits.set(key, {
topic: message.topic,
partition: message.partition,
offset: commitOffset,
metadata: 'm'
});
scheduleCommitFlush();
}
inFlight -= 1;
tryResume();
});
@@ -95,6 +119,12 @@ const createOneConsumer = ({ kafkaConfig, onMessage, onError, instanceIndex }) =
groupId: kafkaConfig.groupId,
clientId: clientId
});
tracker.clear();
pendingCommits.clear();
if (commitTimer) {
clearTimeout(commitTimer);
commitTimer = null;
}
});
consumer.on('rebalanced', () => {
@@ -119,6 +149,11 @@ const createOneConsumer = ({ kafkaConfig, onMessage, onError, instanceIndex }) =
});
consumer.on('close', () => {
if (commitTimer) {
clearTimeout(commitTimer);
commitTimer = null;
}
flushCommits();
logger.warn(`Kafka Consumer closed`, {
groupId: kafkaConfig.groupId,
clientId: clientId

View File

@@ -1,6 +1,6 @@
export class OffsetTracker {
constructor() {
// Map<topic-partition, Array<{ offset: number, done: boolean }>>
// Map<topic-partition, { nextCommitOffset: number|null, done: Set<number> }>
this.partitions = new Map();
}
@@ -8,38 +8,46 @@ export class OffsetTracker {
add(topic, partition, offset) {
const key = `${topic}-${partition}`;
if (!this.partitions.has(key)) {
this.partitions.set(key, []);
this.partitions.set(key, { nextCommitOffset: null, done: new Set() });
}
const state = this.partitions.get(key);
const numericOffset = Number(offset);
if (!Number.isFinite(numericOffset)) return;
if (state.nextCommitOffset === null) {
state.nextCommitOffset = numericOffset;
} else if (numericOffset < state.nextCommitOffset) {
state.nextCommitOffset = numericOffset;
}
this.partitions.get(key).push({ offset, done: false });
}
// Called when a message is successfully processed
// Returns the next offset to commit (if any advancement is possible), or null
markDone(topic, partition, offset) {
const key = `${topic}-${partition}`;
const list = this.partitions.get(key);
if (!list) return null;
const state = this.partitions.get(key);
if (!state) return null;
const item = list.find(i => i.offset === offset);
if (item) {
item.done = true;
const numericOffset = Number(offset);
if (!Number.isFinite(numericOffset)) return null;
state.done.add(numericOffset);
if (state.nextCommitOffset === null) {
state.nextCommitOffset = numericOffset;
}
// Find the highest continuous committed offset
// We can remove items from the front as long as they are done
let lastDoneOffset = null;
let itemsRemoved = false;
while (list.length > 0 && list[0].done) {
lastDoneOffset = list[0].offset;
list.shift();
itemsRemoved = true;
let advanced = false;
while (state.nextCommitOffset !== null && state.done.has(state.nextCommitOffset)) {
state.done.delete(state.nextCommitOffset);
state.nextCommitOffset += 1;
advanced = true;
}
if (itemsRemoved && lastDoneOffset !== null) {
return lastDoneOffset + 1; // Kafka expects the *next* offset to fetch
}
return null;
if (!advanced) return null;
return state.nextCommitOffset;
}
clear() {
this.partitions.clear();
}
}

View File

@@ -74,37 +74,36 @@ export const buildRowsFromPayload = (rawPayload) => {
return [row];
};
/**
 * Turn one Kafka message into database rows.
 *
 * The message value is decoded as JSON, validated against
 * kafkaPayloadSchema, and converted with buildRowsFromPayload.
 *
 * @param {{value: (Buffer|string)}} message - Kafka message whose `value` holds the JSON payload.
 * @returns {Array<object>} Rows built from the payload.
 * @throws {Error} With `type === 'PARSE_ERROR'` when the value is missing or
 *   is not valid JSON, or `type === 'VALIDATION_ERROR'` when the payload
 *   does not satisfy the schema.
 */
export const parseMessageToRows = (message) => {
  // A message without a value cannot be parsed; report it as a typed parse
  // error instead of letting `.toString()` raise an untyped TypeError.
  if (message?.value == null) {
    const error = new Error('JSON Parse Error: message has no value');
    error.type = 'PARSE_ERROR';
    throw error;
  }
  const rawValue = message.value.toString();

  let payload;
  try {
    payload = JSON.parse(rawValue);
  } catch (e) {
    const error = new Error(`JSON Parse Error: ${e.message}`);
    error.type = 'PARSE_ERROR';
    throw error;
  }

  const validationResult = kafkaPayloadSchema.safeParse(payload);
  if (!validationResult.success) {
    const error = new Error(`Schema Validation Failed: ${JSON.stringify(validationResult.error.errors)}`);
    error.type = 'VALIDATION_ERROR';
    throw error;
  }
  return buildRowsFromPayload(payload);
};
export const processKafkaMessage = async ({ message, dbManager, config }) => {
let rows;
try {
const rawValue = message.value.toString();
// logger.info('Processing message', { offset: message.offset, rawValuePreview: rawValue.substring(0, 100) });
let payload;
try {
payload = JSON.parse(rawValue);
} catch (e) {
logger.error('JSON Parse Error', { error: e.message, rawValue });
const error = new Error(`JSON Parse Error: ${e.message}`);
error.type = 'PARSE_ERROR';
throw error;
}
// logger.info('Payload parsed', { payload });
const validationResult = kafkaPayloadSchema.safeParse(payload);
if (!validationResult.success) {
logger.error('Schema Validation Failed', {
errors: validationResult.error.errors,
payload
});
const error = new Error(`Schema Validation Failed: ${JSON.stringify(validationResult.error.errors)}`);
error.type = 'VALIDATION_ERROR';
throw error;
}
rows = buildRowsFromPayload(payload);
rows = parseMessageToRows(message);
} catch (error) {
throw error;
}

View File

@@ -14,5 +14,8 @@ export const logger = {
},
error(message, context) {
process.stderr.write(`${format('error', message, context)}\n`);
},
warn(message, context) {
process.stderr.write(`${format('warn', message, context)}\n`);
}
};

View File

@@ -8,8 +8,13 @@ export class MetricCollector {
kafka_pulled: 0,
parse_error: 0,
db_inserted: 0,
db_failed: 0
db_failed: 0,
db_insert_count: 0,
db_insert_ms_sum: 0,
batch_flush_count: 0,
batch_flush_ms_sum: 0
};
this.keyed = {};
}
increment(metric, count = 1) {
@@ -18,9 +23,21 @@ export class MetricCollector {
}
}
incrementKeyed(metric, key, count = 1) {
if (!key) return;
if (!this.keyed[metric]) {
this.keyed[metric] = {};
}
if (!Object.prototype.hasOwnProperty.call(this.keyed[metric], key)) {
this.keyed[metric][key] = 0;
}
this.keyed[metric][key] += count;
}
getAndReset() {
const current = { ...this.metrics };
const keyed = JSON.parse(JSON.stringify(this.keyed));
this.reset();
return current;
return { ...current, keyed };
}
}