import cron from "node-cron"; import dotenv from "dotenv"; import pg from "pg"; import fs from "fs"; import path from "path"; import { fileURLToPath } from "url"; import kafka from "kafka-node"; import { randomUUID } from "crypto"; import { z } from "zod"; import { createClient } from "redis"; dotenv.config(); const parseNumber = (value, defaultValue) => { const parsed = Number(value); return Number.isFinite(parsed) ? parsed : defaultValue; }; const parseList = (value) => (value || "").split(",").map((item) => item.trim()).filter(Boolean); const config = { env: process.env.NODE_ENV || "development", port: parseNumber(process.env.PORT, 3001), kafka: { brokers: parseList(process.env.KAFKA_BROKERS), topic: process.env.KAFKA_TOPIC || process.env.KAFKA_TOPICS || "blwlog4Nodejs-rcu-onoffline-topic", groupId: process.env.KAFKA_GROUP_ID || "bls-onoffline-group", clientId: process.env.KAFKA_CLIENT_ID || "bls-onoffline-client", consumerInstances: parseNumber(process.env.KAFKA_CONSUMER_INSTANCES, 1), maxInFlight: parseNumber(process.env.KAFKA_MAX_IN_FLIGHT, 2e4), fetchMaxBytes: parseNumber(process.env.KAFKA_FETCH_MAX_BYTES, 50 * 1024 * 1024), fetchMinBytes: parseNumber(process.env.KAFKA_FETCH_MIN_BYTES, 256 * 1024), fetchMaxWaitMs: parseNumber(process.env.KAFKA_FETCH_MAX_WAIT_MS, 100), autoCommitIntervalMs: parseNumber(process.env.KAFKA_AUTO_COMMIT_INTERVAL_MS, 5e3), commitIntervalMs: parseNumber(process.env.KAFKA_COMMIT_INTERVAL_MS, 200), commitOnAttempt: process.env.KAFKA_COMMIT_ON_ATTEMPT === "true", batchSize: parseNumber(process.env.KAFKA_BATCH_SIZE, 5e3), batchTimeoutMs: parseNumber(process.env.KAFKA_BATCH_TIMEOUT_MS, 50), logMessages: process.env.KAFKA_LOG_MESSAGES === "true", sasl: process.env.KAFKA_SASL_USERNAME && process.env.KAFKA_SASL_PASSWORD ? { mechanism: process.env.KAFKA_SASL_MECHANISM || "plain", username: process.env.KAFKA_SASL_USERNAME, password: process.env.KAFKA_SASL_PASSWORD } : void 0 }, db: { host: process.env.DB_HOST || process.env.POSTGRES_HOST || "localhost", port: parseNumber(process.env.DB_PORT || process.env.POSTGRES_PORT, 5432), user: process.env.DB_USER || process.env.POSTGRES_USER || "postgres", password: process.env.DB_PASSWORD || process.env.POSTGRES_PASSWORD || "", database: process.env.DB_DATABASE || process.env.POSTGRES_DATABASE || "log_platform", max: parseNumber(process.env.DB_MAX_CONNECTIONS || process.env.POSTGRES_MAX_CONNECTIONS, 10), ssl: process.env.DB_SSL === "true" ? { rejectUnauthorized: false } : void 0, schema: process.env.DB_SCHEMA || "onoffline", table: process.env.DB_TABLE || "onoffline_record" }, redis: { host: process.env.REDIS_HOST || "localhost", port: parseNumber(process.env.REDIS_PORT, 6379), password: process.env.REDIS_PASSWORD || void 0, db: parseNumber(process.env.REDIS_DB, 0), projectName: process.env.REDIS_PROJECT_NAME || "bls-onoffline", apiBaseUrl: process.env.REDIS_API_BASE_URL || `http://localhost:${parseNumber(process.env.PORT, 3001)}` } }; const format = (level, message, context) => { const payload = { level, message, timestamp: Date.now(), ...context ? { context } : {} }; return JSON.stringify(payload); }; const logger = { info(message, context) { process.stdout.write(`${format("info", message, context)} `); }, error(message, context) { process.stderr.write(`${format("error", message, context)} `); }, warn(message, context) { process.stderr.write(`${format("warn", message, context)} `); } }; const { Pool } = pg; const columns = [ "guid", "ts_ms", "write_ts_ms", "hotel_id", "mac", "device_id", "room_id", "ip", "current_status", "launcher_version", "reboot_reason" ]; class DatabaseManager { constructor(dbConfig) { this.pool = new Pool({ host: dbConfig.host, port: dbConfig.port, user: dbConfig.user, password: dbConfig.password, database: dbConfig.database, max: dbConfig.max, ssl: dbConfig.ssl }); } async insertRows({ schema, table, rows }) { if (!rows || rows.length === 0) { return; } const statement = ` INSERT INTO ${schema}.${table} (${columns.join(", ")}) SELECT * FROM UNNEST( $1::text[], $2::int8[], $3::int8[], $4::int2[], $5::text[], $6::text[], $7::text[], $8::text[], $9::text[], $10::text[], $11::text[] ) ON CONFLICT DO NOTHING `; try { const params = columns.map((column) => rows.map((row) => row[column] ?? null)); await this.pool.query(statement, params); } catch (error) { logger.error("Database insert failed", { error: error?.message, schema, table, rowsLength: rows.length }); throw error; } } async checkConnection() { let client; try { const connectPromise = this.pool.connect(); const timeoutPromise = new Promise((_, reject) => { setTimeout(() => reject(new Error("Connection timeout")), 5e3); }); try { client = await Promise.race([connectPromise, timeoutPromise]); } catch (raceError) { connectPromise.then((c) => c.release()).catch(() => { }); throw raceError; } await client.query("SELECT 1"); return true; } catch (err) { logger.error("Database check connection failed", { error: err.message }); return false; } finally { if (client) { client.release(); } } } async close() { await this.pool.end(); } } const dbManager = new DatabaseManager(config.db); class PartitionManager { /** * Calculate the start and end timestamps (milliseconds) for a given date. * @param {Date} date - The date to calculate for. * @returns {Object} { startMs, endMs, partitionSuffix } */ getPartitionInfo(date) { const yyyy = date.getFullYear(); const mm = String(date.getMonth() + 1).padStart(2, "0"); const dd = String(date.getDate()).padStart(2, "0"); const partitionSuffix = `${yyyy}${mm}${dd}`; const start = new Date(date); start.setHours(0, 0, 0, 0); const startMs = start.getTime(); const end = new Date(date); end.setDate(end.getDate() + 1); end.setHours(0, 0, 0, 0); const endMs = end.getTime(); return { startMs, endMs, partitionSuffix }; } async ensurePartitionIndexes(client, schema, table, partitionSuffix) { const startedAt = Date.now(); const partitionName = `${schema}.${table}_${partitionSuffix}`; const indexBase = `${table}_${partitionSuffix}`; const indexSpecs = [ { name: `idx_${indexBase}_ts`, column: "ts_ms" }, { name: `idx_${indexBase}_hid`, column: "hotel_id" }, { name: `idx_${indexBase}_mac`, column: "mac" }, { name: `idx_${indexBase}_did`, column: "device_id" }, { name: `idx_${indexBase}_rid`, column: "room_id" }, { name: `idx_${indexBase}_cs`, column: "current_status" } ]; for (const spec of indexSpecs) { await client.query(`CREATE INDEX IF NOT EXISTS ${spec.name} ON ${partitionName} (${spec.column});`); } await client.query(`ANALYZE ${partitionName};`); const elapsedMs = Date.now() - startedAt; if (elapsedMs > 1e3) { logger.warn(`Partition index ensure slow`, { partitionName, elapsedMs }); } } async ensureIndexesForExistingPartitions() { const startedAt = Date.now(); const client = await dbManager.pool.connect(); try { const schema = config.db.schema; const table = config.db.table; const res = await client.query( ` SELECT c.relname AS relname FROM pg_inherits i JOIN pg_class p ON i.inhparent = p.oid JOIN pg_namespace pn ON pn.oid = p.relnamespace JOIN pg_class c ON i.inhrelid = c.oid WHERE pn.nspname = $1 AND p.relname = $2 ORDER BY c.relname; `, [schema, table] ); const suffixes = /* @__PURE__ */ new Set(); const pattern = new RegExp(`^${table}_(\\d{8})$`); for (const row of res.rows) { const relname = row?.relname; if (typeof relname !== "string") continue; const match = relname.match(pattern); if (!match) continue; suffixes.add(match[1]); } for (const suffix of suffixes) { await this.ensurePartitionIndexes(client, schema, table, suffix); } const elapsedMs = Date.now() - startedAt; if (elapsedMs > 5e3) { logger.warn("Ensure existing partition indexes slow", { schema, table, partitions: suffixes.size, elapsedMs }); } } finally { client.release(); } } /** * Ensure partitions exist for the past M days and next N days. * @param {number} daysAhead - Number of days to pre-create. * @param {number} daysBack - Number of days to look back. */ async ensurePartitions(daysAhead = 30, daysBack = 15) { const client = await dbManager.pool.connect(); try { logger.info(`Starting partition check for the past ${daysBack} days and next ${daysAhead} days...`); console.log(`Starting partition check for the past ${daysBack} days and next ${daysAhead} days...`); const now = /* @__PURE__ */ new Date(); for (let i = -daysBack; i < daysAhead; i++) { const targetDate = new Date(now); targetDate.setDate(now.getDate() + i); const { startMs, endMs, partitionSuffix } = this.getPartitionInfo(targetDate); const schema = config.db.schema; const table = config.db.table; const partitionName = `${schema}.${table}_${partitionSuffix}`; const checkSql = ` SELECT to_regclass($1) as exists; `; const checkRes = await client.query(checkSql, [partitionName]); if (!checkRes.rows[0].exists) { logger.info(`Creating partition ${partitionName} for range [${startMs}, ${endMs})`); console.log(`Creating partition ${partitionName} for range [${startMs}, ${endMs})`); const createSql = ` CREATE TABLE IF NOT EXISTS ${partitionName} PARTITION OF ${schema}.${table} FOR VALUES FROM (${startMs}) TO (${endMs}); `; await client.query(createSql); } await this.ensurePartitionIndexes(client, schema, table, partitionSuffix); } logger.info("Partition check completed."); } catch (err) { logger.error("Error ensuring partitions:", err); throw err; } finally { client.release(); } } async ensurePartitionsForTimestamps(tsMsList) { if (!Array.isArray(tsMsList) || tsMsList.length === 0) return; const uniqueSuffixes = /* @__PURE__ */ new Set(); for (const ts of tsMsList) { const numericTs = typeof ts === "string" ? Number(ts) : ts; if (!Number.isFinite(numericTs)) continue; const date = new Date(numericTs); if (Number.isNaN(date.getTime())) continue; const { partitionSuffix } = this.getPartitionInfo(date); uniqueSuffixes.add(partitionSuffix); if (uniqueSuffixes.size >= 400) break; } if (uniqueSuffixes.size === 0) return; const client = await dbManager.pool.connect(); try { const schema = config.db.schema; const table = config.db.table; for (const partitionSuffix of uniqueSuffixes) { const yyyy = Number(partitionSuffix.slice(0, 4)); const mm = Number(partitionSuffix.slice(4, 6)); const dd = Number(partitionSuffix.slice(6, 8)); if (!Number.isFinite(yyyy) || !Number.isFinite(mm) || !Number.isFinite(dd)) continue; const targetDate = new Date(yyyy, mm - 1, dd); if (Number.isNaN(targetDate.getTime())) continue; const { startMs, endMs } = this.getPartitionInfo(targetDate); const partitionName = `${schema}.${table}_${partitionSuffix}`; const checkRes = await client.query(`SELECT to_regclass($1) as exists;`, [partitionName]); if (!checkRes.rows[0].exists) { logger.info(`Creating partition ${partitionName} for range [${startMs}, ${endMs})`); await client.query(` CREATE TABLE IF NOT EXISTS ${partitionName} PARTITION OF ${schema}.${table} FOR VALUES FROM (${startMs}) TO (${endMs}); `); } await this.ensurePartitionIndexes(client, schema, table, partitionSuffix); } } finally { client.release(); } } } const partitionManager = new PartitionManager(); const __filename$1 = fileURLToPath(import.meta.url); const __dirname$1 = path.dirname(__filename$1); class DatabaseInitializer { async initialize() { logger.info("Starting database initialization check..."); await this.ensureDatabaseExists(); await this.ensureSchemaAndTable(); await partitionManager.ensurePartitions(30); await partitionManager.ensureIndexesForExistingPartitions(); console.log("Database initialization completed successfully."); logger.info("Database initialization completed successfully."); } async ensureDatabaseExists() { const { host, port, user, password, database, ssl } = config.db; console.log(`Checking if database '${database}' exists at ${host}:${port}...`); const client = new pg.Client({ host, port, user, password, database: "postgres", ssl: ssl ? { rejectUnauthorized: false } : false }); try { await client.connect(); const checkRes = await client.query( `SELECT 1 FROM pg_database WHERE datname = $1`, [database] ); if (checkRes.rowCount === 0) { logger.info(`Database '${database}' does not exist. Creating...`); await client.query(`CREATE DATABASE "${database}"`); console.log(`Database '${database}' created.`); logger.info(`Database '${database}' created.`); } else { console.log(`Database '${database}' already exists.`); logger.info(`Database '${database}' already exists.`); } } catch (err) { logger.error("Error ensuring database exists:", err); throw err; } finally { await client.end(); } } async ensureSchemaAndTable() { const client = await dbManager.pool.connect(); try { const sqlPathCandidates = [ path.resolve(process.cwd(), "scripts/init_db.sql"), path.resolve(__dirname$1, "../scripts/init_db.sql"), path.resolve(__dirname$1, "../../scripts/init_db.sql") ]; const sqlPath = sqlPathCandidates.find((candidate) => fs.existsSync(candidate)); if (!sqlPath) { throw new Error(`init_db.sql not found. Candidates: ${sqlPathCandidates.join(" | ")}`); } const sql = fs.readFileSync(sqlPath, "utf8"); console.log(`Executing init_db.sql from ${sqlPath}...`); logger.info("Executing init_db.sql..."); await client.query(sql); console.log("Schema and parent table initialized."); logger.info("Schema and parent table initialized."); } catch (err) { logger.error("Error initializing schema and table:", err); throw err; } finally { client.release(); } } } const dbInitializer = new DatabaseInitializer(); class OffsetTracker { constructor() { this.partitions = /* @__PURE__ */ new Map(); } // Called when a message is received (before processing) add(topic, partition, offset) { const key = `${topic}-${partition}`; if (!this.partitions.has(key)) { this.partitions.set(key, { nextCommitOffset: null, done: /* @__PURE__ */ new Set() }); } const state = this.partitions.get(key); const numericOffset = Number(offset); if (!Number.isFinite(numericOffset)) return; if (state.nextCommitOffset === null) { state.nextCommitOffset = numericOffset; } else if (numericOffset < state.nextCommitOffset) { state.nextCommitOffset = numericOffset; } } // Called when a message is successfully processed // Returns the next offset to commit (if any advancement is possible), or null markDone(topic, partition, offset) { const key = `${topic}-${partition}`; const state = this.partitions.get(key); if (!state) return null; const numericOffset = Number(offset); if (!Number.isFinite(numericOffset)) return null; state.done.add(numericOffset); if (state.nextCommitOffset === null) { state.nextCommitOffset = numericOffset; } let advanced = false; while (state.nextCommitOffset !== null && state.done.has(state.nextCommitOffset)) { state.done.delete(state.nextCommitOffset); state.nextCommitOffset += 1; advanced = true; } if (!advanced) return null; return state.nextCommitOffset; } clear() { this.partitions.clear(); } } const { ConsumerGroup } = kafka; const createOneConsumer = ({ kafkaConfig, onMessage, onError, instanceIndex }) => { const kafkaHost = kafkaConfig.brokers.join(","); const clientId = instanceIndex === 0 ? kafkaConfig.clientId : `${kafkaConfig.clientId}-${instanceIndex}`; const id = `${clientId}-${process.pid}-${Date.now()}`; const maxInFlight = Number.isFinite(kafkaConfig.maxInFlight) ? kafkaConfig.maxInFlight : 5e3; const commitIntervalMs = Number.isFinite(kafkaConfig.commitIntervalMs) ? kafkaConfig.commitIntervalMs : 200; let inFlight = 0; const tracker = new OffsetTracker(); let pendingCommits = /* @__PURE__ */ new Map(); let commitTimer = null; const flushCommits = () => { if (pendingCommits.size === 0) return; const batch = pendingCommits; pendingCommits = /* @__PURE__ */ new Map(); consumer.sendOffsetCommitRequest( Array.from(batch.values()), (err) => { if (err) { for (const [k, v] of batch.entries()) { pendingCommits.set(k, v); } logger.error("Kafka commit failed", { error: err?.message, count: batch.size }); } } ); }; const scheduleCommitFlush = () => { if (commitTimer) return; commitTimer = setTimeout(() => { commitTimer = null; flushCommits(); }, commitIntervalMs); }; const consumer = new ConsumerGroup( { kafkaHost, groupId: kafkaConfig.groupId, clientId, id, fromOffset: "earliest", protocol: ["roundrobin"], outOfRangeOffset: "latest", autoCommit: false, autoCommitIntervalMs: kafkaConfig.autoCommitIntervalMs, fetchMaxBytes: kafkaConfig.fetchMaxBytes, fetchMinBytes: kafkaConfig.fetchMinBytes, fetchMaxWaitMs: kafkaConfig.fetchMaxWaitMs, sasl: kafkaConfig.sasl }, kafkaConfig.topic ); const tryResume = () => { if (inFlight < maxInFlight && consumer.paused) { consumer.resume(); } }; consumer.on("message", (message) => { inFlight += 1; tracker.add(message.topic, message.partition, message.offset); if (inFlight >= maxInFlight) { consumer.pause(); } Promise.resolve(onMessage(message)).then(() => { }).catch((error) => { logger.error("Kafka message handling failed", { error: error?.message }); if (onError) { onError(error, message); } }).finally(() => { const commitOffset = tracker.markDone(message.topic, message.partition, message.offset); if (commitOffset !== null) { const key = `${message.topic}-${message.partition}`; pendingCommits.set(key, { topic: message.topic, partition: message.partition, offset: commitOffset, metadata: "m" }); scheduleCommitFlush(); } inFlight -= 1; tryResume(); }); }); consumer.on("error", (error) => { logger.error("Kafka consumer error", { error: error?.message }); if (onError) { onError(error); } }); consumer.on("connect", () => { logger.info(`Kafka Consumer connected`, { groupId: kafkaConfig.groupId, clientId }); }); consumer.on("rebalancing", () => { logger.info(`Kafka Consumer rebalancing`, { groupId: kafkaConfig.groupId, clientId }); tracker.clear(); pendingCommits.clear(); if (commitTimer) { clearTimeout(commitTimer); commitTimer = null; } }); consumer.on("rebalanced", () => { logger.info("Kafka Consumer rebalanced", { clientId, groupId: kafkaConfig.groupId }); }); consumer.on("error", (err) => { logger.error("Kafka Consumer Error", { error: err.message }); }); consumer.on("offsetOutOfRange", (err) => { logger.warn("Offset out of range", { error: err.message, topic: err.topic, partition: err.partition }); }); consumer.on("offsetOutOfRange", (error) => { logger.warn(`Kafka Consumer offset out of range`, { error: error?.message, groupId: kafkaConfig.groupId, clientId }); }); consumer.on("close", () => { if (commitTimer) { clearTimeout(commitTimer); commitTimer = null; } flushCommits(); logger.warn(`Kafka Consumer closed`, { groupId: kafkaConfig.groupId, clientId }); }); return consumer; }; const createKafkaConsumers = ({ kafkaConfig, onMessage, onError }) => { const instances = Number.isFinite(kafkaConfig.consumerInstances) ? kafkaConfig.consumerInstances : 1; const count = Math.max(1, instances); return Array.from( { length: count }, (_, idx) => createOneConsumer({ kafkaConfig, onMessage, onError, instanceIndex: idx }) ); }; const createGuid = () => randomUUID().replace(/-/g, ""); const toNumber = (value) => { if (value === void 0 || value === null || value === "") { return value; } if (typeof value === "number") { return value; } const parsed = Number(value); return Number.isFinite(parsed) ? parsed : value; }; const toStringAllowEmpty = (value) => { if (value === void 0 || value === null) { return value; } return String(value); }; const kafkaPayloadSchema = z.object({ HotelCode: z.preprocess(toNumber, z.number()), MAC: z.preprocess(toStringAllowEmpty, z.string().nullable()).optional().nullable(), HostNumber: z.preprocess(toStringAllowEmpty, z.string().nullable()).optional().nullable(), RoomNumber: z.preprocess(toStringAllowEmpty, z.string().nullable()).optional().nullable(), EndPoint: z.preprocess(toStringAllowEmpty, z.string().nullable()).optional().nullable(), CurrentStatus: z.preprocess(toStringAllowEmpty, z.string().nullable()).optional().nullable(), CurrentTime: z.preprocess(toStringAllowEmpty, z.string().nullable()).optional().nullable(), UnixTime: z.preprocess(toNumber, z.number().nullable()).optional().nullable(), LauncherVersion: z.preprocess(toStringAllowEmpty, z.string().nullable()).optional().nullable(), RebootReason: z.preprocess(toStringAllowEmpty, z.string().nullable()).optional().nullable() }); const normalizeText = (value, maxLength) => { if (value === void 0 || value === null) { return null; } const str = String(value); if (maxLength && str.length > maxLength) { return str.substring(0, maxLength); } return str; }; const buildRowsFromPayload = (rawPayload) => { const payload = kafkaPayloadSchema.parse(rawPayload); const rebootReason = normalizeText(payload.RebootReason, 255); const currentStatusRaw = normalizeText(payload.CurrentStatus, 255); const hasRebootReason = rebootReason !== null && rebootReason !== ""; const currentStatus = hasRebootReason ? "on" : currentStatusRaw; let tsMs = payload.UnixTime; if (typeof tsMs === "number" && tsMs < 1e11) { tsMs = tsMs * 1e3; } if (!tsMs && payload.CurrentTime) { const parsed = Date.parse(payload.CurrentTime); if (!isNaN(parsed)) { tsMs = parsed; } } if (!tsMs) { tsMs = Date.now(); } const mac = normalizeText(payload.MAC) || ""; const deviceId = normalizeText(payload.HostNumber) || ""; const roomId = normalizeText(payload.RoomNumber) || ""; const row = { guid: createGuid(), ts_ms: tsMs, write_ts_ms: Date.now(), hotel_id: payload.HotelCode, mac, device_id: deviceId, room_id: roomId, ip: normalizeText(payload.EndPoint), current_status: currentStatus, launcher_version: normalizeText(payload.LauncherVersion, 255), reboot_reason: rebootReason }; return [row]; }; const parseMessageToRows = (message) => { const rawValue = message.value.toString(); let payload; try { payload = JSON.parse(rawValue); } catch (e) { const error = new Error(`JSON Parse Error: ${e.message}`); error.type = "PARSE_ERROR"; throw error; } const validationResult = kafkaPayloadSchema.safeParse(payload); if (!validationResult.success) { const error = new Error(`Schema Validation Failed: ${JSON.stringify(validationResult.error.errors)}`); error.type = "VALIDATION_ERROR"; throw error; } return buildRowsFromPayload(payload); }; const createRedisClient = async (config2) => { const client = createClient({ socket: { host: config2.host, port: config2.port }, password: config2.password, database: config2.db }); await client.connect(); return client; }; class RedisIntegration { constructor(client, projectName, apiBaseUrl) { this.client = client; this.projectName = projectName; this.apiBaseUrl = apiBaseUrl; this.heartbeatKey = "项目心跳"; this.logKey = `${projectName}_项目控制台`; } async info(message, context) { const payload = { timestamp: (/* @__PURE__ */ new Date()).toISOString(), level: "info", message, metadata: context || void 0 }; await this.client.rPush(this.logKey, JSON.stringify(payload)); } async error(message, context) { const payload = { timestamp: (/* @__PURE__ */ new Date()).toISOString(), level: "error", message, metadata: context || void 0 }; await this.client.rPush(this.logKey, JSON.stringify(payload)); } startHeartbeat() { setInterval(() => { const payload = { projectName: this.projectName, apiBaseUrl: this.apiBaseUrl, lastActiveAt: Date.now() }; this.client.rPush(this.heartbeatKey, JSON.stringify(payload)); }, 3e3); } } const buildErrorQueueKey = (projectName) => `${projectName}_error_queue`; const enqueueError = async (client, queueKey, payload) => { try { await client.rPush(queueKey, JSON.stringify(payload)); } catch (error) { logger.error("Redis enqueue error failed", { error: error?.message }); throw error; } }; const startErrorRetryWorker = async ({ client, queueKey, handler, redisIntegration, maxAttempts = 5 }) => { while (true) { const result = await client.blPop(queueKey, 0); const raw = result?.element; if (!raw) { continue; } let item; try { item = JSON.parse(raw); } catch (error) { logger.error("Invalid error payload", { error: error?.message }); await redisIntegration.error("Invalid error payload", { module: "redis", stack: error?.message }); continue; } const attempts = item.attempts || 0; try { await handler(item); } catch (error) { logger.error("Retry handler failed", { error: error?.message, stack: error?.stack }); const nextPayload = { ...item, attempts: attempts + 1, lastError: error?.message, lastAttemptAt: Date.now() }; if (nextPayload.attempts >= maxAttempts) { await redisIntegration.error("Retry attempts exceeded", { module: "retry", stack: JSON.stringify(nextPayload) }); } else { await enqueueError(client, queueKey, nextPayload); } } } }; class MetricCollector { constructor() { this.reset(); } reset() { this.metrics = { kafka_pulled: 0, parse_error: 0, db_inserted: 0, db_failed: 0, db_insert_count: 0, db_insert_ms_sum: 0, batch_flush_count: 0, batch_flush_ms_sum: 0 }; this.keyed = {}; } increment(metric, count = 1) { if (this.metrics.hasOwnProperty(metric)) { this.metrics[metric] += count; } } incrementKeyed(metric, key, count = 1) { if (!key) return; if (!this.keyed[metric]) { this.keyed[metric] = {}; } if (!Object.prototype.hasOwnProperty.call(this.keyed[metric], key)) { this.keyed[metric][key] = 0; } this.keyed[metric][key] += count; } getAndReset() { const current = { ...this.metrics }; const keyed = JSON.parse(JSON.stringify(this.keyed)); this.reset(); return { ...current, keyed }; } } const bootstrap = async () => { logger.info("Starting application with config", { env: process.env.NODE_ENV, db: { host: config.db.host, port: config.db.port, user: config.db.user, database: config.db.database, schema: config.db.schema }, kafka: { brokers: config.kafka.brokers, topic: config.kafka.topic, groupId: config.kafka.groupId }, redis: { host: config.redis.host, port: config.redis.port } }); await dbInitializer.initialize(); const metricCollector = new MetricCollector(); cron.schedule("0 0 * * *", async () => { logger.info("Running scheduled partition maintenance..."); try { await partitionManager.ensurePartitions(30); } catch (err) { logger.error("Scheduled partition maintenance failed", err); } }); const redisClient = await createRedisClient(config.redis); const redisIntegration = new RedisIntegration( redisClient, config.redis.projectName, config.redis.apiBaseUrl ); redisIntegration.startHeartbeat(); cron.schedule("* * * * *", async () => { const metrics = metricCollector.getAndReset(); const flushAvgMs = metrics.batch_flush_count > 0 ? (metrics.batch_flush_ms_sum / metrics.batch_flush_count).toFixed(1) : "0.0"; const dbAvgMs = metrics.db_insert_count > 0 ? (metrics.db_insert_ms_sum / metrics.db_insert_count).toFixed(1) : "0.0"; const report = `[Minute Metrics] Pulled: ${metrics.kafka_pulled}, Parse Error: ${metrics.parse_error}, Inserted: ${metrics.db_inserted}, Failed: ${metrics.db_failed}, FlushAvgMs: ${flushAvgMs}, DbAvgMs: ${dbAvgMs}, PulledByPartition: ${JSON.stringify(metrics.keyed?.kafka_pulled_by_partition || {})}, InsertedByPartition: ${JSON.stringify(metrics.keyed?.db_inserted_by_partition || {})}, FailedByPartition: ${JSON.stringify(metrics.keyed?.db_failed_by_partition || {})}, InsertedByDay: ${JSON.stringify(metrics.keyed?.db_inserted_by_day || {})}, DbMsByDay: ${JSON.stringify(metrics.keyed?.db_insert_ms_sum_by_day || {})}`; console.log(report); logger.info(report, metrics); try { await redisIntegration.info("Minute Metrics", metrics); } catch (err) { logger.error("Failed to report metrics to Redis", { error: err?.message }); } }); const errorQueueKey = buildErrorQueueKey(config.redis.projectName); const handleError = async (error, message) => { logger.error("Kafka processing error", { error: error?.message, type: error?.type, stack: error?.stack }); try { await redisIntegration.error("Kafka processing error", { module: "kafka", stack: error?.stack || error?.message }); } catch (redisError) { logger.error("Redis error log failed", { error: redisError?.message }); } if (message) { const messageValue = Buffer.isBuffer(message.value) ? message.value.toString("utf8") : message.value; try { await enqueueError(redisClient, errorQueueKey, { attempts: 0, value: messageValue, meta: { topic: message.topic, partition: message.partition, offset: message.offset, key: message.key }, timestamp: Date.now() }); } catch (enqueueError2) { logger.error("Enqueue error payload failed", { error: enqueueError2?.message }); } } }; const configuredBatchSize = Number.isFinite(config.kafka.batchSize) ? config.kafka.batchSize : 1e3; const configuredBatchTimeoutMs = Number.isFinite(config.kafka.batchTimeoutMs) ? config.kafka.batchTimeoutMs : 20; const configuredMaxInFlight = Number.isFinite(config.kafka.maxInFlight) ? config.kafka.maxInFlight : 5e3; const BATCH_SIZE = Math.max(10, Math.min(configuredBatchSize, configuredMaxInFlight)); const BATCH_TIMEOUT_MS = Math.max(1, configuredBatchTimeoutMs); const commitOnAttempt = config.kafka.commitOnAttempt === true; const batchStates = /* @__PURE__ */ new Map(); const partitionKeyFromMessage = (message) => { if (message?.topic !== void 0 && message?.partition !== void 0) { return `${message.topic}-${message.partition}`; } return "retry"; }; const dayKeyFromTsMs = (tsMs) => { const numeric = typeof tsMs === "string" ? Number(tsMs) : tsMs; if (!Number.isFinite(numeric)) return null; const d = new Date(numeric); if (Number.isNaN(d.getTime())) return null; const yyyy = d.getFullYear(); const mm = String(d.getMonth() + 1).padStart(2, "0"); const dd = String(d.getDate()).padStart(2, "0"); return `${yyyy}${mm}${dd}`; }; const getBatchState = (key) => { if (!batchStates.has(key)) { batchStates.set(key, { items: [], timer: null, flushing: null }); } return batchStates.get(key); }; const isDbConnectionError = (err) => { const code = err?.code; if (typeof code === "string") { const networkCodes = /* @__PURE__ */ new Set([ "ECONNREFUSED", "ECONNRESET", "EPIPE", "ETIMEDOUT", "ENOTFOUND", "EHOSTUNREACH", "ENETUNREACH", "57P03", "08006", "08001", "08000", "08003" ]); if (networkCodes.has(code)) return true; } const message = typeof err?.message === "string" ? err.message : ""; if (!message) return false; const lower = message.toLowerCase(); return lower.includes("connection timeout") || lower.includes("connection terminated") || lower.includes("connection refused") || lower.includes("terminating connection") || lower.includes("econnrefused") || lower.includes("econnreset") || lower.includes("etimedout") || lower.includes("could not connect") || lower.includes("the database system is starting up") || lower.includes("no pg_hba.conf entry"); }; const isMissingPartitionError = (err) => err?.code === "23514" || typeof err?.message === "string" && err.message.includes("no partition of relation"); const insertRowsWithRetry = async (rows) => { const startedAt = Date.now(); let attemptedPartitionFix = false; while (true) { try { await dbManager.insertRows({ schema: config.db.schema, table: config.db.table, rows }); metricCollector.increment("db_insert_count", 1); metricCollector.increment("db_insert_ms_sum", Date.now() - startedAt); return; } catch (err) { if (isDbConnectionError(err)) { logger.error("Database offline during batch insert. Retrying in 5s...", { error: err.message }); await new Promise((r) => setTimeout(r, 5e3)); while (!await dbManager.checkConnection()) { logger.warn("Database still offline. Waiting 5s..."); await new Promise((r) => setTimeout(r, 5e3)); } continue; } if (isMissingPartitionError(err) && !attemptedPartitionFix) { attemptedPartitionFix = true; try { await partitionManager.ensurePartitionsForTimestamps(rows.map((r) => r.ts_ms)); } catch (partitionErr) { if (isDbConnectionError(partitionErr)) { logger.error("Database offline during partition ensure. Retrying in 5s...", { error: partitionErr.message }); await new Promise((r) => setTimeout(r, 5e3)); while (!await dbManager.checkConnection()) { logger.warn("Database still offline. Waiting 5s..."); await new Promise((r) => setTimeout(r, 5e3)); } continue; } throw partitionErr; } continue; } throw err; } } }; const insertRowsOnce = async (rows) => { const startedAt = Date.now(); await dbManager.insertRows({ schema: config.db.schema, table: config.db.table, rows }); metricCollector.increment("db_insert_count", 1); metricCollector.increment("db_insert_ms_sum", Date.now() - startedAt); }; const resolveInsertedItems = (partitionKey, items) => { let insertedRows = 0; for (const p of items) { insertedRows += p.rows.length; const dayKey = dayKeyFromTsMs(p.rows?.[0]?.ts_ms); if (dayKey) { metricCollector.incrementKeyed("db_inserted_by_day", dayKey, p.rows.length); } p.item.resolve(); } metricCollector.increment("db_inserted", insertedRows); metricCollector.incrementKeyed("db_inserted_by_partition", partitionKey, insertedRows); }; const handleFailedItem = async (partitionKey, p, err) => { metricCollector.increment("db_failed"); metricCollector.incrementKeyed("db_failed_by_partition", partitionKey, 1); const dayKey = dayKeyFromTsMs(p.rows?.[0]?.ts_ms); if (dayKey) { metricCollector.incrementKeyed("db_failed_by_day", dayKey, 1); } await handleError(err, p.item.message); p.item.resolve(); }; const insertItemsDegraded = async (partitionKey, items) => { if (items.length === 0) return; const rows = items.flatMap((p) => p.rows); if (commitOnAttempt) { try { await insertRowsOnce(rows); resolveInsertedItems(partitionKey, items); } catch (err) { for (const item of items) { await handleFailedItem(partitionKey, item, err); } } return; } try { await insertRowsWithRetry(rows); resolveInsertedItems(partitionKey, items); return; } catch (err) { if (items.length === 1) { try { await insertRowsWithRetry(items[0].rows); resolveInsertedItems(partitionKey, items); } catch (innerErr) { await handleFailedItem(partitionKey, items[0], innerErr); } return; } const mid = Math.floor(items.length / 2); await insertItemsDegraded(partitionKey, items.slice(0, mid)); await insertItemsDegraded(partitionKey, items.slice(mid)); } }; const flushBatchForKey = async (partitionKey) => { const state = getBatchState(partitionKey); if (state.flushing) return state.flushing; state.flushing = (async () => { if (state.timer) { clearTimeout(state.timer); state.timer = null; } if (state.items.length === 0) return; const startedAt = Date.now(); const currentBatch = state.items; state.items = []; const pendingDbItems = []; const unresolvedItems = []; try { for (const item of currentBatch) { try { const rows = parseMessageToRows(item.message); pendingDbItems.push({ item, rows }); unresolvedItems.push(item); } catch (err) { metricCollector.increment("parse_error"); metricCollector.incrementKeyed("parse_error_by_partition", partitionKey, 1); logger.error("Message processing failed (Parse/Validation)", { error: err.message }); await handleError(err, item.message); item.resolve(); } } if (pendingDbItems.length > 0) { const firstTs = pendingDbItems[0]?.rows?.[0]?.ts_ms; const dayKey = dayKeyFromTsMs(firstTs); if (dayKey) { const dayStartMs = Date.now(); await insertItemsDegraded(partitionKey, pendingDbItems); metricCollector.incrementKeyed("db_insert_ms_sum_by_day", dayKey, Date.now() - dayStartMs); } else { await insertItemsDegraded(partitionKey, pendingDbItems); } } metricCollector.increment("batch_flush_count", 1); metricCollector.increment("batch_flush_ms_sum", Date.now() - startedAt); } catch (err) { if (!commitOnAttempt && isDbConnectionError(err)) { state.items = unresolvedItems.concat(state.items); if (!state.timer) { state.timer = setTimeout(() => { state.timer = null; flushBatchForKey(partitionKey); }, 5e3); } return; } logger.error("Batch flush failed (non-network). Marking as consumed", { error: err?.message, partitionKey, batchSize: currentBatch.length }); for (const item of unresolvedItems) { try { await handleError(err, item.message); } catch { } item.resolve(); } } })().finally(() => { state.flushing = null; if (state.items.length > 0) { if (state.items.length >= BATCH_SIZE) { flushBatchForKey(partitionKey); } else if (!state.timer) { state.timer = setTimeout(() => { state.timer = null; flushBatchForKey(partitionKey); }, BATCH_TIMEOUT_MS); } } }); return state.flushing; }; const handleMessage = (message) => { if (message.topic) { metricCollector.increment("kafka_pulled"); metricCollector.incrementKeyed("kafka_pulled_by_partition", `${message.topic}-${message.partition}`, 1); } const partitionKey = partitionKeyFromMessage(message); const state = getBatchState(partitionKey); return new Promise((resolve, reject) => { state.items.push({ message, resolve, reject }); if (state.items.length >= BATCH_SIZE) { flushBatchForKey(partitionKey); } else if (!state.timer) { state.timer = setTimeout(() => { state.timer = null; flushBatchForKey(partitionKey); }, BATCH_TIMEOUT_MS); } }); }; const consumers = createKafkaConsumers({ kafkaConfig: config.kafka, onMessage: handleMessage, onError: handleError }); startErrorRetryWorker({ client: redisClient, queueKey: errorQueueKey, redisIntegration, handler: async (item) => { if (!item?.value) { throw new Error("Missing value in retry payload"); } await handleMessage({ value: item.value }); } }).catch((err) => { logger.error("Retry worker failed", { error: err?.message }); }); const shutdown = async (signal) => { logger.info(`Received ${signal}, shutting down...`); try { if (consumers && consumers.length > 0) { await Promise.all(consumers.map((c) => new Promise((resolve) => c.close(true, resolve)))); logger.info("Kafka consumer closed", { count: consumers.length }); } await redisClient.quit(); logger.info("Redis client closed"); await dbManager.close(); logger.info("Database connection closed"); process.exit(0); } catch (err) { logger.error("Error during shutdown", { error: err?.message }); process.exit(1); } }; process.on("SIGTERM", () => shutdown("SIGTERM")); process.on("SIGINT", () => shutdown("SIGINT")); }; bootstrap().catch((error) => { logger.error("Service bootstrap failed", { error: error?.message }); process.exit(1); });