import cron from 'node-cron';
import { config } from './config/config.js';
import dbManager from './db/databaseManager.js';
import dbInitializer from './db/initializer.js';
import partitionManager from './db/partitionManager.js';
import { createKafkaConsumers } from './kafka/consumer.js';
import { parseMessageToRows } from './processor/index.js';
import { createRedisClient } from './redis/redisClient.js';
import { RedisIntegration } from './redis/redisIntegration.js';
import { buildErrorQueueKey, enqueueError, startErrorRetryWorker } from './redis/errorQueue.js';
import { MetricCollector } from './utils/metricCollector.js';
import { logger } from './utils/logger.js';

/**
 * Service entry point: Kafka -> Postgres ingest pipeline.
 *
 * Responsibilities:
 *  - initialize the database (DB/schema/table/partitions) before consuming,
 *  - schedule daily partition maintenance and per-minute metric reporting,
 *  - batch incoming Kafka messages per topic-partition and insert them,
 *  - push unprocessable messages onto a Redis error queue with a retry worker,
 *  - shut down Kafka, Redis, and the DB pool gracefully on SIGTERM/SIGINT.
 */
const bootstrap = async () => {
  // Log startup config (masked: no passwords/secrets are included).
  logger.info('Starting application with config', {
    env: process.env.NODE_ENV,
    db: {
      host: config.db.host,
      port: config.db.port,
      user: config.db.user,
      database: config.db.database,
      schema: config.db.schema
    },
    kafka: {
      brokers: config.kafka.brokers,
      topic: config.kafka.topic,
      groupId: config.kafka.groupId
    },
    redis: { host: config.redis.host, port: config.redis.port }
  });

  // 0. Initialize Database (Create DB, Schema, Table, Partitions).
  await dbInitializer.initialize();

  const metricCollector = new MetricCollector();

  // Resolve-after-delay helper used by every offline-DB backoff loop.
  const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

  // 1. Partition maintenance: every day at 00:00, pre-create partitions
  // 30 days ahead so inserts never race partition creation.
  cron.schedule('0 0 * * *', async () => {
    logger.info('Running scheduled partition maintenance...');
    try {
      await partitionManager.ensurePartitions(30);
    } catch (err) {
      logger.error('Scheduled partition maintenance failed', err);
    }
  });

  const redisClient = await createRedisClient(config.redis);
  const redisIntegration = new RedisIntegration(
    redisClient,
    config.redis.projectName,
    config.redis.apiBaseUrl
  );
  redisIntegration.startHeartbeat();

  // 1.1 Metric reporting: every minute, snapshot-and-reset the collector and
  // publish a one-line summary to console/logger plus Redis.
  cron.schedule('* * * * *', async () => {
    const metrics = metricCollector.getAndReset();
    const flushAvgMs = metrics.batch_flush_count > 0
      ? (metrics.batch_flush_ms_sum / metrics.batch_flush_count).toFixed(1)
      : '0.0';
    const dbAvgMs = metrics.db_insert_count > 0
      ? (metrics.db_insert_ms_sum / metrics.db_insert_count).toFixed(1)
      : '0.0';
    const report = `[Minute Metrics] Pulled: ${metrics.kafka_pulled}, Parse Error: ${metrics.parse_error}, Inserted: ${metrics.db_inserted}, Failed: ${metrics.db_failed}, FlushAvgMs: ${flushAvgMs}, DbAvgMs: ${dbAvgMs}, PulledByPartition: ${JSON.stringify(metrics.keyed?.kafka_pulled_by_partition || {})}, InsertedByPartition: ${JSON.stringify(metrics.keyed?.db_inserted_by_partition || {})}, FailedByPartition: ${JSON.stringify(metrics.keyed?.db_failed_by_partition || {})}, InsertedByDay: ${JSON.stringify(metrics.keyed?.db_inserted_by_day || {})}, DbMsByDay: ${JSON.stringify(metrics.keyed?.db_insert_ms_sum_by_day || {})}`;
    console.log(report);
    logger.info(report, metrics);
    try {
      await redisIntegration.info('Minute Metrics', metrics);
    } catch (err) {
      logger.error('Failed to report metrics to Redis', { error: err?.message });
    }
  });

  const errorQueueKey = buildErrorQueueKey(config.redis.projectName);

  /**
   * Centralized failure path: log locally, mirror to Redis, and (when the
   * failing Kafka message is available) enqueue its payload for later retry.
   * Never throws — every sub-step is individually guarded.
   */
  const handleError = async (error, message) => {
    logger.error('Kafka processing error', {
      error: error?.message,
      type: error?.type,
      stack: error?.stack
    });
    try {
      await redisIntegration.error('Kafka processing error', {
        module: 'kafka',
        stack: error?.stack || error?.message
      });
    } catch (redisError) {
      logger.error('Redis error log failed', { error: redisError?.message });
    }
    if (message) {
      const messageValue = Buffer.isBuffer(message.value)
        ? message.value.toString('utf8')
        : message.value;
      try {
        await enqueueError(redisClient, errorQueueKey, {
          attempts: 0,
          value: messageValue,
          meta: {
            topic: message.topic,
            partition: message.partition,
            offset: message.offset,
            key: message.key
          },
          timestamp: Date.now()
        });
      } catch (enqueueErr) {
        // NOTE: binding renamed from `enqueueError` so it no longer shadows
        // the imported enqueueError() function.
        logger.error('Enqueue error payload failed', { error: enqueueErr?.message });
      }
    }
  };

  // Batch tuning: size is clamped to [10, maxInFlight] so a misconfigured
  // batch size can never exceed the consumer's in-flight budget.
  const configuredBatchSize = Number.isFinite(config.kafka.batchSize)
    ? config.kafka.batchSize
    : 1000;
  const configuredBatchTimeoutMs = Number.isFinite(config.kafka.batchTimeoutMs)
    ? config.kafka.batchTimeoutMs
    : 20;
  const configuredMaxInFlight = Number.isFinite(config.kafka.maxInFlight)
    ? config.kafka.maxInFlight
    : 5000;
  const BATCH_SIZE = Math.max(10, Math.min(configuredBatchSize, configuredMaxInFlight));
  const BATCH_TIMEOUT_MS = Math.max(1, configuredBatchTimeoutMs);
  // commitOnAttempt=true: offsets are committed after one insert attempt
  // (at-most-once into DB); false: retry forever on network failures.
  const commitOnAttempt = config.kafka.commitOnAttempt === true;

  // One independent batch state per topic-partition (plus 'retry' for
  // messages replayed from the error queue, which carry no topic).
  const batchStates = new Map();

  const partitionKeyFromMessage = (message) => {
    if (message?.topic !== undefined && message?.partition !== undefined) {
      return `${message.topic}-${message.partition}`;
    }
    return 'retry';
  };

  /**
   * Convert an epoch-ms timestamp (number or numeric string) to a local
   * 'YYYYMMDD' key for per-day metrics; null when unparseable.
   */
  const dayKeyFromTsMs = (tsMs) => {
    const numeric = typeof tsMs === 'string' ? Number(tsMs) : tsMs;
    if (!Number.isFinite(numeric)) return null;
    const d = new Date(numeric);
    if (Number.isNaN(d.getTime())) return null;
    const yyyy = d.getFullYear();
    const mm = String(d.getMonth() + 1).padStart(2, '0');
    const dd = String(d.getDate()).padStart(2, '0');
    return `${yyyy}${mm}${dd}`;
  };

  const getBatchState = (key) => {
    if (!batchStates.has(key)) {
      batchStates.set(key, { items: [], timer: null, flushing: null });
    }
    return batchStates.get(key);
  };

  // Node network errno codes plus Postgres class-08 (connection exception)
  // and 57P03 (cannot_connect_now) SQLSTATEs. Hoisted so the Set is built once.
  const DB_NETWORK_ERROR_CODES = new Set([
    'ECONNREFUSED', 'ECONNRESET', 'EPIPE', 'ETIMEDOUT', 'ENOTFOUND',
    'EHOSTUNREACH', 'ENETUNREACH',
    '57P03', '08006', '08001', '08000', '08003'
  ]);

  /**
   * Heuristic: is this error a transient connectivity failure (retry) as
   * opposed to a data/SQL error (fail the batch)? Checks error codes first,
   * then falls back to message substrings from pg/driver internals.
   */
  const isDbConnectionError = (err) => {
    const code = err?.code;
    if (typeof code === 'string' && DB_NETWORK_ERROR_CODES.has(code)) {
      return true;
    }
    const message = typeof err?.message === 'string' ? err.message : '';
    if (!message) return false;
    const lower = message.toLowerCase();
    return (
      lower.includes('connection timeout') ||
      lower.includes('connection terminated') ||
      lower.includes('connection refused') ||
      lower.includes('terminating connection') ||
      lower.includes('econnrefused') ||
      lower.includes('econnreset') ||
      lower.includes('etimedout') ||
      lower.includes('could not connect') ||
      lower.includes('the database system is starting up') ||
      lower.includes('no pg_hba.conf entry')
    );
  };

  // 23514 = check_violation; pg also reports routing failures via the
  // 'no partition of relation' message.
  const isMissingPartitionError = (err) =>
    err?.code === '23514' ||
    (typeof err?.message === 'string' && err.message.includes('no partition of relation'));

  // Block until the DB answers a health check again, polling every 5s.
  const waitForDbOnline = async (offlineMessage, detail) => {
    logger.error(offlineMessage, detail);
    await sleep(5000);
    while (!(await dbManager.checkConnection())) {
      logger.warn('Database still offline. Waiting 5s...');
      await sleep(5000);
    }
  };

  /**
   * Insert rows, retrying forever on connection loss and attempting one
   * automatic partition creation on 'missing partition' errors. Any other
   * error (bad data, constraint violation) is rethrown to the caller.
   */
  const insertRowsWithRetry = async (rows) => {
    const startedAt = Date.now();
    let attemptedPartitionFix = false;
    while (true) {
      try {
        await dbManager.insertRows({ schema: config.db.schema, table: config.db.table, rows });
        metricCollector.increment('db_insert_count', 1);
        metricCollector.increment('db_insert_ms_sum', Date.now() - startedAt);
        return;
      } catch (err) {
        if (isDbConnectionError(err)) {
          await waitForDbOnline('Database offline during batch insert. Retrying in 5s...', { error: err.message });
          continue;
        }
        if (isMissingPartitionError(err) && !attemptedPartitionFix) {
          // Only try the fix once per call to avoid looping on a partition
          // that cannot be created.
          attemptedPartitionFix = true;
          try {
            await partitionManager.ensurePartitionsForTimestamps(rows.map(r => r.ts_ms));
          } catch (partitionErr) {
            if (isDbConnectionError(partitionErr)) {
              await waitForDbOnline('Database offline during partition ensure. Retrying in 5s...', { error: partitionErr.message });
              continue;
            }
            throw partitionErr;
          }
          continue;
        }
        throw err;
      }
    }
  };

  // Single-attempt insert used when commitOnAttempt is enabled.
  const insertRowsOnce = async (rows) => {
    const startedAt = Date.now();
    await dbManager.insertRows({ schema: config.db.schema, table: config.db.table, rows });
    metricCollector.increment('db_insert_count', 1);
    metricCollector.increment('db_insert_ms_sum', Date.now() - startedAt);
  };

  // Mark a set of {item, rows} pairs as successfully inserted: bump metrics
  // and resolve each pending message promise (allows the offset commit).
  const resolveInsertedItems = (partitionKey, items) => {
    let insertedRows = 0;
    for (const p of items) {
      insertedRows += p.rows.length;
      const dayKey = dayKeyFromTsMs(p.rows?.[0]?.ts_ms);
      if (dayKey) {
        metricCollector.incrementKeyed('db_inserted_by_day', dayKey, p.rows.length);
      }
      p.item.resolve();
    }
    metricCollector.increment('db_inserted', insertedRows);
    metricCollector.incrementKeyed('db_inserted_by_partition', partitionKey, insertedRows);
  };

  // Record a permanently-failed {item, rows} pair: metrics, error queue,
  // then resolve (the message is consumed, not replayed by Kafka).
  const handleFailedItem = async (partitionKey, p, err) => {
    metricCollector.increment('db_failed');
    metricCollector.incrementKeyed('db_failed_by_partition', partitionKey, 1);
    const dayKey = dayKeyFromTsMs(p.rows?.[0]?.ts_ms);
    if (dayKey) {
      metricCollector.incrementKeyed('db_failed_by_day', dayKey, 1);
    }
    await handleError(err, p.item.message);
    p.item.resolve();
  };

  /**
   * Insert a batch of {item, rows} pairs. On failure (non-connection errors),
   * bisect the batch recursively so one poison message fails alone instead of
   * poisoning the whole batch. In commitOnAttempt mode there is exactly one
   * attempt and all pairs fail together.
   */
  const insertItemsDegraded = async (partitionKey, items) => {
    if (items.length === 0) return;
    const rows = items.flatMap(p => p.rows);
    if (commitOnAttempt) {
      try {
        await insertRowsOnce(rows);
        resolveInsertedItems(partitionKey, items);
      } catch (err) {
        for (const pending of items) {
          await handleFailedItem(partitionKey, pending, err);
        }
      }
      return;
    }
    try {
      await insertRowsWithRetry(rows);
      resolveInsertedItems(partitionKey, items);
      return;
    } catch (err) {
      if (items.length === 1) {
        // Base case: retry the single pair once more, then fail it alone.
        try {
          await insertRowsWithRetry(items[0].rows);
          resolveInsertedItems(partitionKey, items);
        } catch (innerErr) {
          await handleFailedItem(partitionKey, items[0], innerErr);
        }
        return;
      }
      const mid = Math.floor(items.length / 2);
      await insertItemsDegraded(partitionKey, items.slice(0, mid));
      await insertItemsDegraded(partitionKey, items.slice(mid));
    }
  };

  /**
   * Flush the pending batch for one partition key: parse, insert, update
   * metrics. Only one flush runs per key at a time (state.flushing is the
   * in-progress promise); items arriving mid-flush are flushed afterwards.
   */
  const flushBatchForKey = async (partitionKey) => {
    const state = getBatchState(partitionKey);
    if (state.flushing) return state.flushing;
    state.flushing = (async () => {
      if (state.timer) {
        clearTimeout(state.timer);
        state.timer = null;
      }
      if (state.items.length === 0) return;
      const startedAt = Date.now();
      const currentBatch = state.items;
      state.items = [];
      const pendingDbItems = [];
      const unresolvedItems = [];
      try {
        // Parse each message; parse failures are resolved (consumed)
        // immediately and routed to the error queue.
        for (const item of currentBatch) {
          try {
            const rows = parseMessageToRows(item.message);
            pendingDbItems.push({ item, rows });
            unresolvedItems.push(item);
          } catch (err) {
            metricCollector.increment('parse_error');
            metricCollector.incrementKeyed('parse_error_by_partition', partitionKey, 1);
            logger.error('Message processing failed (Parse/Validation)', { error: err.message });
            await handleError(err, item.message);
            item.resolve();
          }
        }
        if (pendingDbItems.length > 0) {
          // Day-level insert timing is attributed to the first row's day —
          // an approximation when a batch straddles midnight.
          const firstTs = pendingDbItems[0]?.rows?.[0]?.ts_ms;
          const dayKey = dayKeyFromTsMs(firstTs);
          if (dayKey) {
            const dayStartMs = Date.now();
            await insertItemsDegraded(partitionKey, pendingDbItems);
            metricCollector.incrementKeyed('db_insert_ms_sum_by_day', dayKey, Date.now() - dayStartMs);
          } else {
            await insertItemsDegraded(partitionKey, pendingDbItems);
          }
        }
        metricCollector.increment('batch_flush_count', 1);
        metricCollector.increment('batch_flush_ms_sum', Date.now() - startedAt);
      } catch (err) {
        if (!commitOnAttempt && isDbConnectionError(err)) {
          // Network failure: requeue unresolved items ahead of any new
          // arrivals and retry the flush in 5s.
          state.items = unresolvedItems.concat(state.items);
          if (!state.timer) {
            state.timer = setTimeout(() => {
              state.timer = null;
              flushBatchForKey(partitionKey);
            }, 5000);
          }
          return;
        }
        logger.error('Batch flush failed (non-network). Marking as consumed', {
          error: err?.message,
          partitionKey,
          batchSize: currentBatch.length
        });
        for (const item of unresolvedItems) {
          try {
            await handleError(err, item.message);
          } catch {}
          item.resolve();
        }
      }
    })().finally(() => {
      state.flushing = null;
      // Items that arrived during the flush: flush now if a full batch is
      // waiting, otherwise re-arm the timeout.
      if (state.items.length > 0) {
        if (state.items.length >= BATCH_SIZE) {
          flushBatchForKey(partitionKey);
        } else if (!state.timer) {
          state.timer = setTimeout(() => {
            state.timer = null;
            flushBatchForKey(partitionKey);
          }, BATCH_TIMEOUT_MS);
        }
      }
    });
    return state.flushing;
  };

  /**
   * Per-message entry point handed to the Kafka consumer. Returns a promise
   * that resolves once the message is fully processed (inserted or routed to
   * the error queue), which gates offset commits upstream.
   */
  const handleMessage = (message) => {
    if (message.topic) {
      metricCollector.increment('kafka_pulled');
      metricCollector.incrementKeyed('kafka_pulled_by_partition', `${message.topic}-${message.partition}`, 1);
    }
    const partitionKey = partitionKeyFromMessage(message);
    const state = getBatchState(partitionKey);
    return new Promise((resolve, reject) => {
      state.items.push({ message, resolve, reject });
      if (state.items.length >= BATCH_SIZE) {
        flushBatchForKey(partitionKey);
      } else if (!state.timer) {
        state.timer = setTimeout(() => {
          state.timer = null;
          flushBatchForKey(partitionKey);
        }, BATCH_TIMEOUT_MS);
      }
    });
  };

  const consumers = createKafkaConsumers({
    kafkaConfig: config.kafka,
    onMessage: handleMessage,
    onError: handleError
  });

  // Start retry worker (non-blocking): replays error-queue payloads back
  // through handleMessage (they land in the 'retry' batch — no topic).
  startErrorRetryWorker({
    client: redisClient,
    queueKey: errorQueueKey,
    redisIntegration,
    handler: async (item) => {
      if (!item?.value) {
        throw new Error('Missing value in retry payload');
      }
      await handleMessage({ value: item.value });
    }
  }).catch(err => {
    logger.error('Retry worker failed', { error: err?.message });
  });

  /**
   * Graceful shutdown: stop consuming first so no new work arrives, then
   * close Redis (heartbeat stops with the client) and the DB pool.
   */
  const shutdown = async (signal) => {
    logger.info(`Received ${signal}, shutting down...`);
    try {
      // 1. Close Kafka consumers (commit=true flushes outstanding offsets).
      if (consumers && consumers.length > 0) {
        await Promise.all(consumers.map(c => new Promise((resolve) => c.close(true, resolve))));
        logger.info('Kafka consumer closed', { count: consumers.length });
      }
      // 2. Close Redis client (also ends the integration heartbeat).
      await redisClient.quit();
      logger.info('Redis client closed');
      // 3. Close Database pool.
      await dbManager.close();
      logger.info('Database connection closed');
      process.exit(0);
    } catch (err) {
      logger.error('Error during shutdown', { error: err?.message });
      process.exit(1);
    }
  };

  process.on('SIGTERM', () => shutdown('SIGTERM'));
  process.on('SIGINT', () => shutdown('SIGINT'));
};

bootstrap().catch((error) => {
  logger.error('Service bootstrap failed', { error: error?.message });
  process.exit(1);
});