import cron from 'node-cron';
import { config } from './config/config.js';
import dbManager from './db/databaseManager.js';
import dbInitializer from './db/initializer.js';
import partitionManager from './db/partitionManager.js';
import { createKafkaConsumers } from './kafka/consumer.js';
import { processKafkaMessage } from './processor/index.js';
import { createRedisClient } from './redis/redisClient.js';
import { RedisIntegration } from './redis/redisIntegration.js';
import { buildErrorQueueKey, enqueueError, startErrorRetryWorker } from './redis/errorQueue.js';
import { MetricCollector } from './utils/metricCollector.js';
import { logger } from './utils/logger.js';

const bootstrap = async () => {
  // Log startup config (masked)
  logger.info('Starting application with config', {
    env: process.env.NODE_ENV,
    db: {
      host: config.db.host,
      port: config.db.port,
      user: config.db.user,
      database: config.db.database,
      schema: config.db.schema
    },
    kafka: {
      brokers: config.kafka.brokers,
      topic: config.kafka.topic,
      groupId: config.kafka.groupId
    },
    redis: {
      host: config.redis.host,
      port: config.redis.port
    }
  });

  // 0. Initialize database (create DB, schema, table, partitions)
  await dbInitializer.initialize();

  // Metric collector
  const metricCollector = new MetricCollector();

  // 1. Partition maintenance cron job (every day at 00:00)
  cron.schedule('0 0 * * *', async () => {
    logger.info('Running scheduled partition maintenance...');
    try {
      await partitionManager.ensurePartitions(30);
    } catch (err) {
      logger.error('Scheduled partition maintenance failed', err);
    }
  });

  // 1.1 The metric reporting cron job (every minute) is set up below, after the
  // Redis integration is initialized, because it reports metrics through it.
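  // Note: `dbManager` (imported above) is assumed to be a ready-to-use singleton,
  // not a class to instantiate. A minimal, hypothetical sketch of what
  // `./db/databaseManager.js` might export, based only on the members this
  // service relies on (`pool`, `checkConnection()`, `close()`) — the real module
  // may differ:
  //
  //   import pg from 'pg';
  //   import { config } from '../config/config.js';
  //
  //   class DatabaseManager {
  //     constructor(dbConfig) {
  //       // Connection pool shared with the processor and partition manager
  //       this.pool = new pg.Pool(dbConfig);
  //     }
  //     async checkConnection() {
  //       try { await this.pool.query('SELECT 1'); return true; } catch { return false; }
  //     }
  //     async close() {
  //       await this.pool.end();
  //     }
  //   }
  //
  //   export default new DatabaseManager(config.db);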
  const redisClient = await createRedisClient(config.redis);
  const redisIntegration = new RedisIntegration(
    redisClient,
    config.redis.projectName,
    config.redis.apiBaseUrl
  );
  redisIntegration.startHeartbeat();

  // 1.1 Metric reporting cron job (every minute)
  cron.schedule('* * * * *', async () => {
    const metrics = metricCollector.getAndReset();
    const report = `[Minute Metrics] Pulled: ${metrics.kafka_pulled}, Parse Error: ${metrics.parse_error}, Inserted: ${metrics.db_inserted}, Failed: ${metrics.db_failed}`;
    console.log(report);
    logger.info(report, metrics);
    try {
      await redisIntegration.info('Minute Metrics', metrics);
    } catch (err) {
      logger.error('Failed to report metrics to Redis', { error: err?.message });
    }
  });

  const errorQueueKey = buildErrorQueueKey(config.redis.projectName);

  const handleError = async (error, message) => {
    logger.error('Kafka processing error', {
      error: error?.message,
      type: error?.type,
      stack: error?.stack
    });
    try {
      await redisIntegration.error('Kafka processing error', {
        module: 'kafka',
        stack: error?.stack || error?.message
      });
    } catch (redisError) {
      logger.error('Redis error log failed', { error: redisError?.message });
    }
    if (message) {
      const messageValue = Buffer.isBuffer(message.value)
        ? message.value.toString('utf8')
        : message.value;
      try {
        await enqueueError(redisClient, errorQueueKey, {
          attempts: 0,
          value: messageValue,
          meta: {
            topic: message.topic,
            partition: message.partition,
            offset: message.offset,
            key: message.key
          },
          timestamp: Date.now()
        });
      } catch (enqueueErr) {
        logger.error('Enqueue error payload failed', { error: enqueueErr?.message });
      }
    }
  };

  const handleMessage = async (message) => {
    // Only count messages pulled from Kafka itself; payloads replayed by the
    // retry worker are passed in without a topic.
    if (message.topic) {
      metricCollector.increment('kafka_pulled');
    }
    const messageValue = Buffer.isBuffer(message.value)
      ? message.value.toString('utf8')
      : message.value;
    const messageKey = Buffer.isBuffer(message.key)
      ? message.key.toString('utf8')
      : message.key;
    const logDetails = {
      topic: message.topic,
      partition: message.partition,
      offset: message.offset,
      key: messageKey,
      value: config.kafka.logMessages ? messageValue : undefined,
      valueLength: !config.kafka.logMessages && typeof messageValue === 'string' ? messageValue.length : null
    };
    // logger.info('Kafka message received', logDetails);

    while (true) {
      try {
        const inserted = await processKafkaMessage({ message, dbManager, config });
        metricCollector.increment('db_inserted');
        // logger.info('Kafka message processed', { inserted });
        return; // Success, allowing commit
      } catch (error) {
        // Identify DB connection errors
        const isDbConnectionError =
          (error.code && ['ECONNREFUSED', '57P03', '08006', '08001', 'EADDRINUSE', 'ETIMEDOUT'].includes(error.code)) ||
          (error.message && (
            error.message.includes('ECONNREFUSED') ||
            error.message.includes('connection') ||
            error.message.includes('terminated') ||
            error.message.includes('EADDRINUSE') ||
            error.message.includes('ETIMEDOUT') ||
            error.message.includes('The server does not support SSL connections') // Possible if DB restarts without SSL
          ));

        if (isDbConnectionError) {
          logger.error('Database offline. Pausing consumption for 1 minute...', { error: error.message });
          // Intentionally not counted as db_failed: the message is retried once the connection is restored.
          // Wait 1 minute before checking
          await new Promise(resolve => setTimeout(resolve, 60000));
          // Check connection loop
          while (true) {
            const isConnected = await dbManager.checkConnection();
            if (isConnected) {
              logger.info('Database connection restored. Resuming processing...');
              break; // Break check loop to retry processing
            }
            logger.warn('Database still offline. Waiting 1 minute...');
            await new Promise(resolve => setTimeout(resolve, 60000));
          }
        } else {
          // Non-connection error (data error, parse error, etc.)
          if (error.type === 'PARSE_ERROR') {
            metricCollector.increment('parse_error');
          } else {
            metricCollector.increment('db_failed');
          }
          logger.error('Message processing failed (data/logic error), skipping message', {
            error: error?.message,
            type: error?.type
          });
          // Enqueue to error queue
          await handleError(error, message);
          // For non-connection errors, we must skip this message and commit the offset
          // so we don't get stuck in an infinite retry loop.
          return;
        }
      }
    }
  };

  const consumers = createKafkaConsumers({
    kafkaConfig: config.kafka,
    onMessage: handleMessage,
    onError: handleError
  });

  // Start retry worker (non-blocking)
  startErrorRetryWorker({
    client: redisClient,
    queueKey: errorQueueKey,
    redisIntegration,
    handler: async (item) => {
      if (!item?.value) {
        throw new Error('Missing value in retry payload');
      }
      await handleMessage({ value: item.value });
    }
  }).catch(err => {
    logger.error('Retry worker failed', { error: err?.message });
  });

  // Graceful shutdown logic
  const shutdown = async (signal) => {
    logger.info(`Received ${signal}, shutting down...`);
    try {
      // 1. Close Kafka consumers
      if (consumers && consumers.length > 0) {
        await Promise.all(consumers.map(c => new Promise((resolve) => c.close(true, resolve))));
        logger.info('Kafka consumers closed', { count: consumers.length });
      }
      // 2. Stop the Redis heartbeat if such a method exists; otherwise closing the client below is enough
      // redisIntegration.stopHeartbeat();
      // 3. Close Redis client
      await redisClient.quit();
      logger.info('Redis client closed');
      // 4. Close database pool
      await dbManager.close();
      logger.info('Database connection closed');
      process.exit(0);
    } catch (err) {
      logger.error('Error during shutdown', { error: err?.message });
      process.exit(1);
    }
  };

  process.on('SIGTERM', () => shutdown('SIGTERM'));
  process.on('SIGINT', () => shutdown('SIGINT'));
};

bootstrap().catch((error) => {
  logger.error('Service bootstrap failed', { error: error?.message });
  process.exit(1);
});
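
// Note: `MetricCollector` is assumed to be a simple in-memory counter behind the
// per-minute report above. A hypothetical sketch of `./utils/metricCollector.js`,
// based only on the calls made in this file (`increment()` and `getAndReset()`
// over the counters kafka_pulled / parse_error / db_inserted / db_failed) — the
// real implementation may differ:
//
//   export class MetricCollector {
//     constructor() {
//       this.reset();
//     }
//     reset() {
//       this.metrics = { kafka_pulled: 0, parse_error: 0, db_inserted: 0, db_failed: 0 };
//     }
//     increment(name, by = 1) {
//       this.metrics[name] = (this.metrics[name] || 0) + by;
//     }
//     getAndReset() {
//       const snapshot = { ...this.metrics };
//       this.reset();
//       return snapshot;
//     }
//   }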