2026-02-04 17:51:50 +08:00
|
|
|
import cron from 'node-cron';
|
|
|
|
|
import { config } from './config/config.js';
|
|
|
|
|
import dbManager from './db/databaseManager.js';
|
2026-03-10 19:52:58 +08:00
|
|
|
import g5DbManager from './db/g5DatabaseManager.js';
|
2026-02-04 17:51:50 +08:00
|
|
|
import { createKafkaConsumers } from './kafka/consumer.js';
|
2026-02-09 10:50:56 +08:00
|
|
|
import { parseMessageToRows } from './processor/index.js';
|
2026-02-04 17:51:50 +08:00
|
|
|
import { createRedisClient } from './redis/redisClient.js';
|
|
|
|
|
import { RedisIntegration } from './redis/redisIntegration.js';
|
|
|
|
|
import { buildErrorQueueKey, enqueueError, startErrorRetryWorker } from './redis/errorQueue.js';
|
|
|
|
|
import { MetricCollector } from './utils/metricCollector.js';
|
|
|
|
|
import { logger } from './utils/logger.js';
|
|
|
|
|
|
|
|
|
|
const bootstrap = async () => {
|
|
|
|
|
// Log the effective startup configuration. Only the fields listed below are
// emitted (connection coordinates and feature flags); no password field is read.
logger.info('Starting application with config', {
  env: process.env.NODE_ENV,
  db: {
    host: config.db.host,
    port: config.db.port,
    user: config.db.user,
    database: config.db.database,
    schema: config.db.schema
  },
  g5db: {
    enabled: config.g5db.enabled,
    roomStatusMomentSyncEnabled: config.g5db.roomStatusMomentSyncEnabled
  },
  kafka: {
    brokers: config.kafka.brokers,
    topic: config.kafka.topic,
    groupId: config.kafka.groupId
  },
  redis: {
    host: config.redis.host,
    port: config.redis.port
  }
});
|
|
|
|
|
|
|
|
|
|
// Metric collector: counters accumulated during processing. They are drained
// by the per-minute cron job, which is scheduled after the Redis integration
// below exists because that job reports through it.
const metricCollector = new MetricCollector();

// Redis client plus the integration wrapper used for the heartbeat and for
// info/error reporting.
const redisClient = await createRedisClient(config.redis);
const redisIntegration = new RedisIntegration(
  redisClient,
  config.redis.projectName,
  config.redis.apiBaseUrl
);
redisIntegration.startHeartbeat();
|
|
|
|
|
|
|
|
|
|
// 1.1 Metric reporting cron job (every minute): drain the counters, print a
// one-line summary to stdout and the logger, then forward the raw counters to
// Redis. A Redis failure is logged and does not abort the job.
cron.schedule('* * * * *', async () => {
  const metrics = metricCollector.getAndReset();

  // Average in milliseconds with one decimal; '0.0' when nothing was counted.
  const averageMs = (sum, count) => (count > 0 ? (sum / count).toFixed(1) : '0.0');
  const flushAvgMs = averageMs(metrics.batch_flush_ms_sum, metrics.batch_flush_count);
  const dbAvgMs = averageMs(metrics.db_insert_ms_sum, metrics.db_insert_count);

  const report = `[Metrics] Pulled:${metrics.kafka_pulled} ParseErr:${metrics.parse_error} Inserted:${metrics.db_inserted} Failed:${metrics.db_failed} FlushAvg:${flushAvgMs}ms DbAvg:${dbAvgMs}ms`;
  console.log(report);
  logger.info(report);

  try {
    await redisIntegration.info('Minute Metrics', metrics);
  } catch (err) {
    logger.error('Failed to report metrics to Redis', { error: err?.message });
  }
});
|
|
|
|
|
|
|
|
|
|
// Redis key of the error queue: failed messages are enqueued under it and the
// retry worker is started with the same key.
const errorQueueKey = buildErrorQueueKey(config.redis.projectName);
|
|
|
|
|
|
|
|
|
|
/**
 * Report a processing error and, when the offending message is supplied,
 * enqueue it on the Redis error queue.
 *
 * Both the Redis error report and the enqueue are best-effort: their own
 * failures are logged and never rethrown.
 *
 * @param {Error|undefined} error - The failure being reported.
 * @param {object} [message] - Message being processed, if any. Its `value`
 *   is decoded to a UTF-8 string when it is a Buffer.
 * @returns {Promise<void>}
 */
const handleError = async (error, message) => {
  logger.error('Kafka processing error', {
    error: error?.message,
    type: error?.type,
    stack: error?.stack
  });

  try {
    await redisIntegration.error('Kafka processing error', {
      module: 'kafka',
      stack: error?.stack || error?.message
    });
  } catch (redisError) {
    logger.error('Redis error log failed', { error: redisError?.message });
  }

  // Without a message there is nothing to re-queue.
  if (!message) return;

  const messageValue = Buffer.isBuffer(message.value)
    ? message.value.toString('utf8')
    : message.value;

  try {
    // NOTE(review): `message.key` is forwarded as-is (it is not decoded like
    // the value) — confirm the queue consumer expects that.
    await enqueueError(redisClient, errorQueueKey, {
      attempts: 0,
      value: messageValue,
      meta: {
        topic: message.topic,
        partition: message.partition,
        offset: message.offset,
        key: message.key
      },
      timestamp: Date.now()
    });
  } catch (enqueueErr) {
    // Named `enqueueErr` so it does not shadow the `enqueueError` function.
    logger.error('Enqueue error payload failed', { error: enqueueErr?.message });
  }
};
|
|
|
|
|
|
2026-02-09 10:50:56 +08:00
|
|
|
// Batching knobs: each falls back to its default when the configured value is
// not a finite number.
const configuredBatchSize = Number.isFinite(config.kafka.batchSize) ? config.kafka.batchSize : 1000;
const configuredBatchTimeoutMs = Number.isFinite(config.kafka.batchTimeoutMs) ? config.kafka.batchTimeoutMs : 20;
const configuredMaxInFlight = Number.isFinite(config.kafka.maxInFlight) ? config.kafka.maxInFlight : 5000;

// Batch size is capped by the in-flight limit, with a floor of 10 that wins
// over the cap when the limit is smaller than 10.
const BATCH_SIZE = Math.max(10, Math.min(configuredBatchSize, configuredMaxInFlight));
// Flush timeout is at least 1 ms.
const BATCH_TIMEOUT_MS = Math.max(1, configuredBatchTimeoutMs);
// Only an explicit boolean `true` enables single-attempt mode.
const commitOnAttempt = config.kafka.commitOnAttempt === true;

// Per-partition-key batch state: { items, timer, flushing }.
const batchStates = new Map();
|
|
|
|
|
|
|
|
|
|
/**
 * Derive the batching key for a message.
 *
 * @param {{topic?: *, partition?: *}} [message]
 * @returns {string} `${topic}-${partition}` when neither field is undefined;
 *   otherwise the literal 'retry'.
 */
const partitionKeyFromMessage = (message) => {
  const { topic, partition } = message ?? {};
  if (topic === undefined || partition === undefined) return 'retry';
  return `${topic}-${partition}`;
};
|
|
|
|
|
|
|
|
|
|
/**
 * Convert an epoch-millisecond timestamp into a `YYYYMMDD` key using the
 * process's local time zone.
 *
 * @param {number|string} tsMs - Epoch milliseconds; numeric strings are accepted.
 * @returns {string|null} The day key, or null when the input is not a finite
 *   number, is a blank string, or is outside the range a Date can represent.
 */
const dayKeyFromTsMs = (tsMs) => {
  let numeric = tsMs;
  if (typeof tsMs === 'string') {
    // Number('') and Number('  ') are 0, which would silently map a missing
    // timestamp onto the epoch day; treat blank strings as invalid instead.
    if (tsMs.trim() === '') return null;
    numeric = Number(tsMs);
  }
  if (!Number.isFinite(numeric)) return null;

  const date = new Date(numeric);
  if (Number.isNaN(date.getTime())) return null;

  const yyyy = date.getFullYear();
  const mm = String(date.getMonth() + 1).padStart(2, '0');
  const dd = String(date.getDate()).padStart(2, '0');
  return `${yyyy}${mm}${dd}`;
};
|
|
|
|
|
|
|
|
|
|
/**
 * Return the batch state for a partition key, creating it on first use.
 *
 * @param {string} key - Partition key.
 * @returns {{items: Array, timer: *, flushing: *}} The state object stored in
 *   `batchStates`; the same object is returned on every call for a given key.
 */
const getBatchState = (key) => {
  let state = batchStates.get(key);
  if (state === undefined) {
    state = { items: [], timer: null, flushing: null };
    batchStates.set(key, state);
  }
  return state;
};
|
|
|
|
|
|
|
|
|
|
// Error codes treated as "database unreachable": socket-level errno names plus
// SQLSTATE values (class 08 and 57P03 in the PostgreSQL error-code appendix).
// Built once rather than on every call.
const DB_CONNECTION_ERROR_CODES = new Set([
  'ECONNREFUSED',
  'ECONNRESET',
  'EPIPE',
  'ETIMEDOUT',
  'ENOTFOUND',
  'EHOSTUNREACH',
  'ENETUNREACH',
  '57P03',
  '08006',
  '08001',
  '08000',
  '08003'
]);

// Lower-case message fragments that also indicate a connectivity problem.
const DB_CONNECTION_ERROR_PHRASES = [
  'connection timeout',
  'connection terminated',
  'connection refused',
  'terminating connection',
  'econnrefused',
  'econnreset',
  'etimedout',
  'could not connect',
  'the database system is starting up',
  'no pg_hba.conf entry'
];

/**
 * Decide whether an error looks like a database connectivity failure.
 *
 * The string `code` is checked first; otherwise the lower-cased `message` is
 * searched for the known fragments.
 *
 * @param {*} err - Any thrown value; nullish and non-object values are tolerated.
 * @returns {boolean} true when the code or the message matches.
 */
const isDbConnectionError = (err) => {
  const code = err?.code;
  if (typeof code === 'string' && DB_CONNECTION_ERROR_CODES.has(code)) return true;

  const message = typeof err?.message === 'string' ? err.message : '';
  if (!message) return false;

  const lower = message.toLowerCase();
  return DB_CONNECTION_ERROR_PHRASES.some((phrase) => lower.includes(phrase));
};
|
2026-02-04 17:51:50 +08:00
|
|
|
|
2026-02-09 10:50:56 +08:00
|
|
|
/**
 * Insert rows into the primary database, retrying for as long as the failure
 * is classified as a connectivity error.
 *
 * When G5 is enabled the same rows are also written to the G5 database (and
 * optionally upserted into the room-status table). Those writes are
 * non-blocking: their failures are logged and never fail the batch. They are
 * re-issued on every pass of the retry loop.
 *
 * `startedAt` is captured once, so the recorded `db_insert_ms_sum` includes
 * any time spent waiting for the database to come back.
 *
 * @param {Array<object>} rows - Rows to insert.
 * @returns {Promise<void>} Resolves after a successful primary insert.
 * @throws Rethrows any primary-insert error that is not a connectivity error.
 */
const insertRowsWithRetry = async (rows) => {
  const startedAt = Date.now();
  const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

  while (true) {
    try {
      const promises = [
        dbManager.insertRows({ schema: config.db.schema, table: config.db.table, rows })
      ];

      if (config.g5db.enabled) {
        promises.push(
          g5DbManager.insertRows({ schema: config.g5db.schema, table: config.g5db.table, rows }).catch((e) => {
            // `e?.` so a nullish rejection cannot make the handler throw and
            // turn a non-blocking failure into a batch failure.
            logger.error('G5 Database insert failed but non-blocking', { error: e?.message });
          })
        );

        if (config.g5db.roomStatusMomentSyncEnabled) {
          promises.push(
            g5DbManager.upsertRoomStatusMomentIp(rows).catch((e) => {
              logger.error('G5 room_status_moment_g5 upsert failed but non-blocking', { error: e?.message });
            })
          );
        }
      }

      await Promise.all(promises);

      metricCollector.increment('db_insert_count', 1);
      metricCollector.increment('db_insert_ms_sum', Date.now() - startedAt);
      return;
    } catch (err) {
      if (!isDbConnectionError(err)) throw err;

      logger.error('Database offline during batch insert. Retrying in 5s...', { error: err.message });
      await sleep(5000);
      // Poll until the primary database answers again, then retry the insert.
      while (!(await dbManager.checkConnection())) {
        logger.warn('Database still offline. Waiting 5s...');
        await sleep(5000);
      }
    }
  }
};
|
|
|
|
|
|
|
|
|
|
/**
 * Insert rows into the primary database exactly once (no retry).
 *
 * When G5 is enabled the rows are also written to the G5 database (and
 * optionally upserted into the room-status table); those failures are logged
 * and do not fail the call.
 *
 * @param {Array<object>} rows - Rows to insert.
 * @returns {Promise<void>}
 * @throws Rejects with the primary-insert error, if any.
 */
const insertRowsOnce = async (rows) => {
  const startedAt = Date.now();

  const promises = [
    dbManager.insertRows({ schema: config.db.schema, table: config.db.table, rows })
  ];

  if (config.g5db.enabled) {
    promises.push(
      g5DbManager.insertRows({ schema: config.g5db.schema, table: config.g5db.table, rows }).catch((e) => {
        // `e?.` so a nullish rejection cannot make this handler throw.
        logger.error('G5 Database insert failed in insertOnce', { error: e?.message });
      })
    );

    if (config.g5db.roomStatusMomentSyncEnabled) {
      promises.push(
        g5DbManager.upsertRoomStatusMomentIp(rows).catch((e) => {
          logger.error('G5 room_status_moment_g5 upsert failed in insertOnce', { error: e?.message });
        })
      );
    }
  }

  await Promise.all(promises);

  metricCollector.increment('db_insert_count', 1);
  metricCollector.increment('db_insert_ms_sum', Date.now() - startedAt);
};
|
|
|
|
|
|
|
|
|
|
/**
 * Mark a group of items as successfully inserted: update the insert counters
 * and resolve each item's pending promise.
 *
 * The per-day counter is keyed by the day of each item's first row and is
 * skipped when that row has no usable `ts_ms`.
 *
 * @param {string} partitionKey - Key used for the per-partition counter.
 * @param {Array<{item: {resolve: Function}, rows: Array<object>}>} items
 * @returns {void}
 */
const resolveInsertedItems = (partitionKey, items) => {
  let insertedRows = 0;

  for (const { item, rows } of items) {
    insertedRows += rows.length;

    const dayKey = dayKeyFromTsMs(rows[0]?.ts_ms);
    if (dayKey) {
      metricCollector.incrementKeyed('db_inserted_by_day', dayKey, rows.length);
    }

    item.resolve();
  }

  metricCollector.increment('db_inserted', insertedRows);
  metricCollector.incrementKeyed('db_inserted_by_partition', partitionKey, insertedRows);
};
|
|
|
|
|
|
|
|
|
|
/**
 * Record a failed item: bump the failure counters, hand the error and the
 * item's message to `handleError`, then resolve the item's promise.
 *
 * The item is resolved (not rejected) after the error has been handled.
 *
 * @param {string} partitionKey - Key used for the per-partition counter.
 * @param {{item: {message: *, resolve: Function}, rows: Array<object>}} p
 * @param {*} err - The failure to report.
 * @returns {Promise<void>}
 */
const handleFailedItem = async (partitionKey, p, err) => {
  metricCollector.increment('db_failed');
  metricCollector.incrementKeyed('db_failed_by_partition', partitionKey, 1);

  const dayKey = dayKeyFromTsMs(p.rows?.[0]?.ts_ms);
  if (dayKey) {
    metricCollector.incrementKeyed('db_failed_by_day', dayKey, 1);
  }

  await handleError(err, p.item.message);
  p.item.resolve();
};
|
|
|
|
|
|
|
|
|
|
/**
 * Insert the rows of a group of items, degrading to smaller groups on failure.
 *
 * - `commitOnAttempt` mode: one attempt for the whole group; on failure every
 *   item goes through `handleFailedItem`.
 * - Otherwise: try the whole group with retry; on failure split the group in
 *   half and recurse, so a failing item is eventually isolated and only that
 *   item is reported as failed.
 *
 * Every item ends up resolved, either by `resolveInsertedItems` or by
 * `handleFailedItem`.
 *
 * @param {string} partitionKey - Key used for metrics.
 * @param {Array<{item: object, rows: Array<object>}>} items
 * @returns {Promise<void>}
 */
const insertItemsDegraded = async (partitionKey, items) => {
  if (items.length === 0) return;

  const rows = items.flatMap((p) => p.rows);

  if (commitOnAttempt) {
    try {
      await insertRowsOnce(rows);
      resolveInsertedItems(partitionKey, items);
    } catch (err) {
      for (const item of items) {
        await handleFailedItem(partitionKey, item, err);
      }
    }
    return;
  }

  try {
    await insertRowsWithRetry(rows);
    resolveInsertedItems(partitionKey, items);
    return;
  } catch (err) {
    if (items.length === 1) {
      // NOTE(review): for a single item this re-attempts the same rows that
      // just failed above — confirm the second attempt is intentional.
      try {
        await insertRowsWithRetry(items[0].rows);
        resolveInsertedItems(partitionKey, items);
      } catch (innerErr) {
        await handleFailedItem(partitionKey, items[0], innerErr);
      }
      return;
    }

    // Bisect: the halves are processed sequentially, first half first.
    const mid = Math.floor(items.length / 2);
    await insertItemsDegraded(partitionKey, items.slice(0, mid));
    await insertItemsDegraded(partitionKey, items.slice(mid));
  }
};
|
|
|
|
|
|
|
|
|
|
/**
 * Flush the pending batch of one partition key.
 *
 * Only one flush runs per key at a time: while a flush is in progress its
 * promise is returned to every caller. The flush
 *   1. cancels the pending flush timer,
 *   2. takes ownership of the queued items,
 *   3. parses each message — parse failures are reported and resolved at once,
 *   4. inserts the parsed items through `insertItemsDegraded`,
 *   5. on an unexpected error either re-queues the items (connectivity error
 *      outside `commitOnAttempt` mode, retried after 5 s) or reports every
 *      item and resolves it.
 * When the flush settles, items that arrived in the meantime trigger an
 * immediate flush (batch full) or arm the flush timer.
 *
 * @param {string} partitionKey - Key identifying the batch state.
 * @returns {Promise<void>} The promise of the in-progress flush.
 */
const flushBatchForKey = async (partitionKey) => {
  const state = getBatchState(partitionKey);
  if (state.flushing) return state.flushing;

  state.flushing = (async () => {
    if (state.timer) {
      clearTimeout(state.timer);
      state.timer = null;
    }

    if (state.items.length === 0) return;

    const startedAt = Date.now();
    // Take ownership of the queue; new arrivals accumulate in a fresh array.
    const currentBatch = state.items;
    state.items = [];

    const pendingDbItems = [];
    const unresolvedItems = [];

    try {
      for (const item of currentBatch) {
        try {
          const rows = parseMessageToRows(item.message);
          pendingDbItems.push({ item, rows });
          unresolvedItems.push(item);
        } catch (err) {
          metricCollector.increment('parse_error');
          metricCollector.incrementKeyed('parse_error_by_partition', partitionKey, 1);
          logger.error('Message processing failed (Parse/Validation)', { error: err?.message });
          await handleError(err, item.message);
          item.resolve();
        }
      }

      if (pendingDbItems.length > 0) {
        // The per-day timing is attributed to the day of the batch's first row.
        const dayKey = dayKeyFromTsMs(pendingDbItems[0]?.rows?.[0]?.ts_ms);
        const dayStartMs = Date.now();
        await insertItemsDegraded(partitionKey, pendingDbItems);
        if (dayKey) {
          metricCollector.incrementKeyed('db_insert_ms_sum_by_day', dayKey, Date.now() - dayStartMs);
        }
      }

      metricCollector.increment('batch_flush_count', 1);
      metricCollector.increment('batch_flush_ms_sum', Date.now() - startedAt);
    } catch (err) {
      if (!commitOnAttempt && isDbConnectionError(err)) {
        // Put the parsed-but-unfinished items back in front of newer arrivals
        // and try again in 5 seconds.
        state.items = unresolvedItems.concat(state.items);
        if (!state.timer) {
          state.timer = setTimeout(() => {
            state.timer = null;
            flushBatchForKey(partitionKey);
          }, 5000);
        }
        return;
      }

      logger.error('Batch flush failed (non-network). Marking as consumed', {
        error: err?.message,
        partitionKey,
        batchSize: currentBatch.length
      });

      for (const item of unresolvedItems) {
        try {
          await handleError(err, item.message);
        } catch {
          // Deliberately ignored: error reporting is best-effort here and the
          // item must still be resolved below.
        }
        item.resolve();
      }
    }
  })().finally(() => {
    state.flushing = null;
    if (state.items.length === 0) return;

    if (state.items.length >= BATCH_SIZE) {
      flushBatchForKey(partitionKey);
    } else if (!state.timer) {
      state.timer = setTimeout(() => {
        state.timer = null;
        flushBatchForKey(partitionKey);
      }, BATCH_TIMEOUT_MS);
    }
  });

  return state.flushing;
};
|
|
|
|
|
|
|
|
|
|
/**
 * Queue a message for batched insertion.
 *
 * The message is appended to the batch of its partition key. A full batch is
 * flushed immediately; otherwise a flush timer is armed if none is pending.
 * Messages without a `topic` are not counted as pulled from Kafka.
 *
 * @param {object} message - Message to queue.
 * @returns {Promise<void>} Settles when the queued item's `resolve`/`reject`
 *   callback is invoked by the batch processing.
 */
const handleMessage = (message) => {
  if (message.topic) {
    metricCollector.increment('kafka_pulled');
    metricCollector.incrementKeyed('kafka_pulled_by_partition', `${message.topic}-${message.partition}`, 1);
  }

  const partitionKey = partitionKeyFromMessage(message);
  const state = getBatchState(partitionKey);

  return new Promise((resolve, reject) => {
    state.items.push({ message, resolve, reject });

    if (state.items.length >= BATCH_SIZE) {
      flushBatchForKey(partitionKey);
      return;
    }

    if (!state.timer) {
      state.timer = setTimeout(() => {
        state.timer = null;
        flushBatchForKey(partitionKey);
      }, BATCH_TIMEOUT_MS);
    }
  });
};
|
|
|
|
|
|
|
|
|
|
// Create the Kafka consumers. Messages go through the batching handler and
// consumer-level errors through the shared error handler.
const consumers = createKafkaConsumers({
  kafkaConfig: config.kafka,
  onMessage: handleMessage,
  onError: handleError
});
|
|
|
|
|
|
|
|
|
|
// Start the retry worker (not awaited, so startup is not blocked). Queued
// payloads are fed back through handleMessage with only their value, so they
// carry no topic/partition.
startErrorRetryWorker({
  client: redisClient,
  queueKey: errorQueueKey,
  redisIntegration,
  handler: async (item) => {
    if (!item?.value) {
      throw new Error('Missing value in retry payload');
    }
    await handleMessage({ value: item.value });
  }
}).catch((err) => {
  logger.error('Retry worker failed', { error: err?.message });
});
|
|
|
|
|
|
|
|
|
|
/**
 * Graceful shutdown: close the Kafka consumers, the Redis client and both
 * database managers in that order, then exit the process.
 *
 * Exits with code 0 on success and 1 when any step throws.
 *
 * @param {string} signal - Name of the signal that triggered the shutdown
 *   (used for logging only).
 * @returns {Promise<void>}
 */
const shutdown = async (signal) => {
  logger.info(`Received ${signal}, shutting down...`);

  try {
    // 1. Close the Kafka consumers; each close() is adapted from its callback
    //    form and the callback's arguments are not inspected.
    if (consumers && consumers.length > 0) {
      await Promise.all(consumers.map((c) => new Promise((resolve) => c.close(true, resolve))));
      logger.info('Kafka consumer closed', { count: consumers.length });
    }

    // 2. The Redis heartbeat is not stopped explicitly; only the client is
    //    closed below.

    // 3. Close the Redis client.
    await redisClient.quit();
    logger.info('Redis client closed');

    // 4. Close the database pools.
    await dbManager.close();
    await g5DbManager.close();
    logger.info('Database connection closed');

    process.exit(0);
  } catch (err) {
    logger.error('Error during shutdown', { error: err?.message });
    process.exit(1);
  }
};
|
|
|
|
|
|
|
|
|
|
// Run the graceful shutdown on termination and interrupt signals.
process.on('SIGTERM', () => shutdown('SIGTERM'));
process.on('SIGINT', () => shutdown('SIGINT'));
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
// Entry point: a failure during bootstrap is logged and the process exits
// with a non-zero status.
bootstrap().catch((error) => {
  logger.error('Service bootstrap failed', { error: error?.message });
  process.exit(1);
});
|