import cron from "node-cron"; import dotenv from "dotenv"; import pg from "pg"; import kafka from "kafka-node"; import { z } from "zod"; dotenv.config(); const parseNumber = (value, defaultValue) => { const parsed = Number(value); return Number.isFinite(parsed) ? parsed : defaultValue; }; const parseList = (value) => (value || "").split(",").map((item) => item.trim()).filter(Boolean); const config = { env: process.env.NODE_ENV || "development", port: parseNumber(process.env.PORT, 3001), kafka: { brokers: parseList(process.env.KAFKA_BROKERS), topic: process.env.KAFKA_TOPIC || process.env.KAFKA_TOPICS || "blwlog4Nodejs-rcu-register-topic", groupId: process.env.KAFKA_GROUP_ID || "bls-register-consumer", clientId: process.env.KAFKA_CLIENT_ID || "bls-register-consumer-client", consumerInstances: parseNumber(process.env.KAFKA_CONSUMER_INSTANCES, 1), maxInFlight: parseNumber(process.env.KAFKA_MAX_IN_FLIGHT, 2e4), fetchMaxBytes: parseNumber(process.env.KAFKA_FETCH_MAX_BYTES, 50 * 1024 * 1024), fetchMinBytes: parseNumber(process.env.KAFKA_FETCH_MIN_BYTES, 256 * 1024), fetchMaxWaitMs: parseNumber(process.env.KAFKA_FETCH_MAX_WAIT_MS, 100), fromOffset: process.env.KAFKA_FROM_OFFSET || "latest", autoCommitIntervalMs: parseNumber(process.env.KAFKA_AUTO_COMMIT_INTERVAL_MS, 5e3), commitIntervalMs: parseNumber(process.env.KAFKA_COMMIT_INTERVAL_MS, 200), commitOnAttempt: process.env.KAFKA_COMMIT_ON_ATTEMPT === "true", batchSize: parseNumber(process.env.KAFKA_BATCH_SIZE, 5e3), batchTimeoutMs: parseNumber(process.env.KAFKA_BATCH_TIMEOUT_MS, 50), flushIntervalMs: parseNumber(process.env.KAFKA_FLUSH_INTERVAL_MS, 3e3), logMessages: process.env.KAFKA_LOG_MESSAGES === "true", sasl: process.env.KAFKA_SASL_USERNAME && process.env.KAFKA_SASL_PASSWORD ? { mechanism: process.env.KAFKA_SASL_MECHANISM || "plain", username: process.env.KAFKA_SASL_USERNAME, password: process.env.KAFKA_SASL_PASSWORD } : void 0 }, db: { host: process.env.POSTGRES_HOST_G5, port: parseNumber(process.env.POSTGRES_PORT_G5, 5434), user: process.env.POSTGRES_USER_G5, password: process.env.POSTGRES_PASSWORD_G5, database: process.env.POSTGRES_DATABASE_G5, max: parseNumber(process.env.POSTGRES_MAX_CONNECTIONS_G5, 6), ssl: process.env.POSTGRES_SSL_G5 === "true" ? { rejectUnauthorized: false } : void 0, schema: process.env.DB_SCHEMA || "rcu_info", table: process.env.DB_TABLE || "rcu_info_events_g5", roomStatusSchema: process.env.DB_ROOM_STATUS_SCHEMA || "room_status", roomStatusTable: process.env.DB_ROOM_STATUS_TABLE || "room_status_moment_g5" }, redis: { host: process.env.REDIS_HOST || "localhost", port: parseNumber(process.env.REDIS_PORT, 6379), password: process.env.REDIS_PASSWORD || void 0, db: parseNumber(process.env.REDIS_DB, 0), projectName: process.env.REDIS_PROJECT_NAME || "bls-onoffline", apiBaseUrl: process.env.REDIS_API_BASE_URL || `http://localhost:${parseNumber(process.env.PORT, 3001)}` } }; const format = (level, message, context) => { const payload = { level, message, timestamp: Date.now(), ...context ? { context } : {} }; return JSON.stringify(payload); }; const logger = { info(message, context) { process.stdout.write(`${format("info", message, context)} `); }, error(message, context) { process.stderr.write(`${format("error", message, context)} `); }, warn(message, context) { process.stderr.write(`${format("warn", message, context)} `); } }; const { Pool } = pg; const registerColumns = [ "ts_ms", "hotel_id", "room_id", "device_id", "write_ts_ms", "is_send", "udp_raw", "extra", "ip_type", "model_num", "server_ip", "ip", "subnet_mask", "gateway", "dns", "app_version", "rcu_time", "launcher_version", "mac", "room_type_id", "config_version", "room_status", "season", "sys_lock_status", "authorization_time", "authorization_days", "room_num_remark", "room_type_remark", "room_remark", "mcu_name", "central_control_name", "configure_hotel_name", "configure_room_type_name" ]; const roomStatusColumns = [ "hotel_id", "room_id", "app_version", "launcher_version", "config_version", "upgrade_ts_ms", "register_ts_ms" ]; class DatabaseManager { constructor(dbConfig) { this.pool = new Pool({ host: dbConfig.host, port: dbConfig.port, user: dbConfig.user, password: dbConfig.password, database: dbConfig.database, max: dbConfig.max, ssl: dbConfig.ssl }); } async insertRegisterRows({ schema, table, rows }) { if (!rows || rows.length === 0) { return; } const statement = ` INSERT INTO ${schema}.${table} (${registerColumns.join(", ")}) SELECT * FROM UNNEST( $1::int8[], $2::int2[], $3::text[], $4::text[], $5::int8[], $6::int2[], $7::text[], $8::jsonb[], $9::int2[], $10::text[], $11::text[], $12::text[], $13::text[], $14::text[], $15::text[], $16::text[], $17::text[], $18::text[], $19::text[], $20::int8[], $21::text[], $22::int4[], $23::int4[], $24::int4[], $25::text[], $26::text[], $27::text[], $28::text[], $29::text[], $30::text[], $31::text[], $32::text[], $33::text[] ) ON CONFLICT DO NOTHING `; try { const params = registerColumns.map((column) => rows.map((row) => row[column] ?? null)); await this.pool.query(statement, params); } catch (error) { logger.error("Register table insert failed", { error: error?.message, schema, table, rowsLength: rows.length }); throw error; } } async updateRoomStatusRows({ schema, table, rows }) { if (!rows || rows.length === 0) { return; } const statement = ` WITH incoming AS ( SELECT * FROM UNNEST( $1::int2[], $2::text[], $3::text[], $4::text[], $5::text[], $6::int8[], $7::int8[] ) AS u(${roomStatusColumns.join(", ")}) ), dedup AS ( SELECT DISTINCT ON (hotel_id, room_id) hotel_id, room_id, app_version, launcher_version, config_version, upgrade_ts_ms, register_ts_ms FROM incoming ORDER BY hotel_id, room_id, register_ts_ms DESC ), existing AS ( SELECT i.*, t.device_id FROM dedup i INNER JOIN ${schema}.${table} t ON t.hotel_id = i.hotel_id AND t.room_id = i.room_id ) INSERT INTO ${schema}.${table} ( hotel_id, room_id, device_id, app_version, launcher_version, config_version, upgrade_ts_ms, register_ts_ms ) SELECT hotel_id, room_id, device_id, app_version, launcher_version, config_version, upgrade_ts_ms, register_ts_ms FROM existing ON CONFLICT (hotel_id, room_id) DO UPDATE SET app_version = EXCLUDED.app_version, launcher_version = EXCLUDED.launcher_version, config_version = EXCLUDED.config_version, upgrade_ts_ms = EXCLUDED.upgrade_ts_ms, register_ts_ms = EXCLUDED.register_ts_ms `; try { const params = roomStatusColumns.map((column) => rows.map((row) => row[column] ?? null)); await this.pool.query(statement, params); } catch (error) { logger.error("Room status table update failed", { error: error?.message, schema, table, rowsLength: rows.length }); throw error; } } async checkConnection() { let client; try { const connectPromise = this.pool.connect(); const timeoutPromise = new Promise((_, reject) => { setTimeout(() => reject(new Error("Connection timeout")), 5e3); }); try { client = await Promise.race([connectPromise, timeoutPromise]); } catch (raceError) { connectPromise.then((c) => c.release()).catch(() => { }); throw raceError; } await client.query("SELECT 1"); return true; } catch (err) { logger.error("Database check connection failed", { error: err.message }); return false; } finally { if (client) { client.release(); } } } async close() { await this.pool.end(); } } const dbManager = new DatabaseManager(config.db); class OffsetTracker { constructor() { this.partitions = /* @__PURE__ */ new Map(); } // Called when a message is received (before processing) add(topic, partition, offset) { const key = `${topic}-${partition}`; if (!this.partitions.has(key)) { this.partitions.set(key, { nextCommitOffset: null, done: /* @__PURE__ */ new Set() }); } const state = this.partitions.get(key); const numericOffset = Number(offset); if (!Number.isFinite(numericOffset)) return; if (state.nextCommitOffset === null) { state.nextCommitOffset = numericOffset; } else if (numericOffset < state.nextCommitOffset) { state.nextCommitOffset = numericOffset; } } // Called when a message is successfully processed // Returns the next offset to commit (if any advancement is possible), or null markDone(topic, partition, offset) { const key = `${topic}-${partition}`; const state = this.partitions.get(key); if (!state) return null; const numericOffset = Number(offset); if (!Number.isFinite(numericOffset)) return null; state.done.add(numericOffset); if (state.nextCommitOffset === null) { state.nextCommitOffset = numericOffset; } let advanced = false; while (state.nextCommitOffset !== null && state.done.has(state.nextCommitOffset)) { state.done.delete(state.nextCommitOffset); state.nextCommitOffset += 1; advanced = true; } if (!advanced) return null; return state.nextCommitOffset; } clear() { this.partitions.clear(); } } const { ConsumerGroup } = kafka; const createOneConsumer = ({ kafkaConfig, onMessage, onError, instanceIndex }) => { const kafkaHost = kafkaConfig.brokers.join(","); const clientId = instanceIndex === 0 ? kafkaConfig.clientId : `${kafkaConfig.clientId}-${instanceIndex}`; const id = `${clientId}-${process.pid}-${Date.now()}`; const maxInFlight = Number.isFinite(kafkaConfig.maxInFlight) ? kafkaConfig.maxInFlight : 5e3; const commitIntervalMs = Number.isFinite(kafkaConfig.commitIntervalMs) ? kafkaConfig.commitIntervalMs : 200; let inFlight = 0; const tracker = new OffsetTracker(); let pendingCommits = /* @__PURE__ */ new Map(); let commitTimer = null; const flushCommits = () => { if (pendingCommits.size === 0) return; const batch = pendingCommits; pendingCommits = /* @__PURE__ */ new Map(); consumer.sendOffsetCommitRequest( Array.from(batch.values()), (err) => { if (err) { for (const [k, v] of batch.entries()) { pendingCommits.set(k, v); } logger.error("Kafka commit failed", { error: err?.message, count: batch.size }); } } ); }; const scheduleCommitFlush = () => { if (commitTimer) return; commitTimer = setTimeout(() => { commitTimer = null; flushCommits(); }, commitIntervalMs); }; const consumer = new ConsumerGroup( { kafkaHost, groupId: kafkaConfig.groupId, clientId, id, fromOffset: kafkaConfig.fromOffset || "latest", protocol: ["roundrobin"], outOfRangeOffset: "latest", autoCommit: false, autoCommitIntervalMs: kafkaConfig.autoCommitIntervalMs, fetchMaxBytes: kafkaConfig.fetchMaxBytes, fetchMinBytes: kafkaConfig.fetchMinBytes, fetchMaxWaitMs: kafkaConfig.fetchMaxWaitMs, sasl: kafkaConfig.sasl }, kafkaConfig.topic ); const tryResume = () => { if (inFlight < maxInFlight && consumer.paused) { consumer.resume(); } }; consumer.on("message", (message) => { inFlight += 1; tracker.add(message.topic, message.partition, message.offset); if (inFlight >= maxInFlight) { consumer.pause(); } Promise.resolve(onMessage(message)).then(() => { }).catch((error) => { logger.error("Kafka message handling failed", { error: error?.message }); if (onError) { onError(error, message); } }).finally(() => { const commitOffset = tracker.markDone(message.topic, message.partition, message.offset); if (commitOffset !== null) { const key = `${message.topic}-${message.partition}`; pendingCommits.set(key, { topic: message.topic, partition: message.partition, offset: commitOffset, metadata: "m" }); scheduleCommitFlush(); } inFlight -= 1; tryResume(); }); }); consumer.on("error", (error) => { logger.error("Kafka consumer error", { error: error?.message }); if (onError) { onError(error); } }); consumer.on("connect", () => { logger.info(`Kafka Consumer connected`, { groupId: kafkaConfig.groupId, clientId }); }); consumer.on("rebalancing", () => { logger.info(`Kafka Consumer rebalancing`, { groupId: kafkaConfig.groupId, clientId }); tracker.clear(); pendingCommits.clear(); if (commitTimer) { clearTimeout(commitTimer); commitTimer = null; } }); consumer.on("rebalanced", () => { logger.info("Kafka Consumer rebalanced", { clientId, groupId: kafkaConfig.groupId }); }); consumer.on("error", (err) => { logger.error("Kafka Consumer Error", { error: err.message }); }); consumer.on("offsetOutOfRange", (err) => { logger.warn("Offset out of range", { error: err.message, topic: err.topic, partition: err.partition }); }); consumer.on("offsetOutOfRange", (error) => { logger.warn(`Kafka Consumer offset out of range`, { error: error?.message, groupId: kafkaConfig.groupId, clientId }); }); consumer.on("close", () => { if (commitTimer) { clearTimeout(commitTimer); commitTimer = null; } flushCommits(); logger.warn(`Kafka Consumer closed`, { groupId: kafkaConfig.groupId, clientId }); }); return consumer; }; const createKafkaConsumers = ({ kafkaConfig, onMessage, onError }) => { const instances = Number.isFinite(kafkaConfig.consumerInstances) ? kafkaConfig.consumerInstances : 1; const count = Math.max(1, instances); return Array.from( { length: count }, (_, idx) => createOneConsumer({ kafkaConfig, onMessage, onError, instanceIndex: idx }) ); }; const toNumber = (value) => { if (value === void 0 || value === null || value === "") { return null; } if (typeof value === "number") { return value; } const parsed = Number(value); return Number.isFinite(parsed) ? parsed : null; }; const toStringAllowEmpty = (value) => { if (value === void 0 || value === null) { return value; } return String(value); }; const kafkaPayloadSchema = z.object({ ts_ms: z.preprocess(toNumber, z.number().nullable()).optional().nullable(), upgrade_ts_ms: z.preprocess(toNumber, z.number().nullable()).optional().nullable(), hotel_id: z.preprocess(toNumber, z.number().nullable()).optional().nullable(), room_id: z.preprocess(toStringAllowEmpty, z.string().nullable()).optional().nullable(), device_id: z.preprocess(toStringAllowEmpty, z.string().nullable()).optional().nullable(), is_send: z.preprocess(toNumber, z.number().nullable()).optional().nullable(), udp_raw: z.any().optional().nullable(), extra: z.any().optional().nullable(), ip_type: z.preprocess(toNumber, z.number().nullable()).optional().nullable(), model_num: z.preprocess(toStringAllowEmpty, z.string().nullable()).optional().nullable(), server_ip: z.preprocess(toStringAllowEmpty, z.string().nullable()).optional().nullable(), ip: z.preprocess(toStringAllowEmpty, z.string().nullable()).optional().nullable(), subnet_mask: z.preprocess(toStringAllowEmpty, z.string().nullable()).optional().nullable(), gateway: z.preprocess(toStringAllowEmpty, z.string().nullable()).optional().nullable(), dns: z.preprocess(toStringAllowEmpty, z.string().nullable()).optional().nullable(), app_version: z.preprocess(toStringAllowEmpty, z.string().nullable()).optional().nullable(), rcu_time: z.preprocess(toStringAllowEmpty, z.string().nullable()).optional().nullable(), launcher_version: z.preprocess(toStringAllowEmpty, z.string().nullable()).optional().nullable(), mac: z.preprocess(toStringAllowEmpty, z.string().nullable()).optional().nullable(), room_type_id: z.preprocess(toNumber, z.number().nullable()).optional().nullable(), config_version: z.preprocess(toStringAllowEmpty, z.string().nullable()).optional().nullable(), room_status: z.preprocess(toNumber, z.number().nullable()).optional().nullable(), season: z.preprocess(toNumber, z.number().nullable()).optional().nullable(), sys_lock_status: z.preprocess(toNumber, z.number().nullable()).optional().nullable(), authorization_time: z.preprocess(toStringAllowEmpty, z.string().nullable()).optional().nullable(), authorization_days: z.preprocess(toStringAllowEmpty, z.string().nullable()).optional().nullable(), room_num_remark: z.preprocess(toStringAllowEmpty, z.string().nullable()).optional().nullable(), room_type_remark: z.preprocess(toStringAllowEmpty, z.string().nullable()).optional().nullable(), room_remark: z.preprocess(toStringAllowEmpty, z.string().nullable()).optional().nullable(), mcu_name: z.preprocess(toStringAllowEmpty, z.string().nullable()).optional().nullable(), central_control_name: z.preprocess(toStringAllowEmpty, z.string().nullable()).optional().nullable(), configure_hotel_name: z.preprocess(toStringAllowEmpty, z.string().nullable()).optional().nullable(), configure_room_type_name: z.preprocess(toStringAllowEmpty, z.string().nullable()).optional().nullable() }); const normalizeText = (value, maxLength) => { if (value === void 0 || value === null) { return null; } const str = String(value).replace(/\u0000/g, ""); if (maxLength && str.length > maxLength) { return str.substring(0, maxLength); } return str; }; const sanitizeJsonValue = (value) => { if (value === void 0 || value === null) { return value; } if (typeof value === "string") { return value.replace(/\u0000/g, ""); } if (Array.isArray(value)) { return value.map((item) => sanitizeJsonValue(item)); } if (typeof value === "object") { const out = {}; for (const [k, v] of Object.entries(value)) { out[k] = sanitizeJsonValue(v); } return out; } return value; }; const isLikelyBase64 = (text) => { if (!text || text.length % 4 !== 0) { return false; } return /^[A-Za-z0-9+/]+={0,2}$/.test(text); }; const normalizeInteger = (value) => { if (value === void 0 || value === null || value === "") { return null; } const numeric = typeof value === "number" ? value : Number(value); if (!Number.isFinite(numeric)) { return null; } return Math.trunc(numeric); }; const inRangeOr = (value, min, max, fallback) => { if (typeof value !== "number" || Number.isNaN(value) || value < min || value > max) { return fallback; } return value; }; const normalizeTsMs = (value) => { const numeric = normalizeInteger(value); if (numeric === null) { return Date.now(); } if (numeric > 0 && numeric < 1e11) { return numeric * 1e3; } return numeric; }; const normalizeUdpRaw = (value) => { if (value === void 0 || value === null) { return null; } if (typeof value === "string") { const text = value.replace(/\u0000/g, ""); if (isLikelyBase64(text)) { return text; } return Buffer.from(text, "utf8").toString("base64"); } if (Buffer.isBuffer(value)) { return value.toString("base64"); } if (Array.isArray(value)) { return Buffer.from(value).toString("base64"); } return Buffer.from(String(value), "utf8").toString("base64"); }; const normalizeExtra = (value) => { if (value === void 0 || value === null || value === "") { return null; } if (typeof value === "object") { return sanitizeJsonValue(value); } if (typeof value === "string") { try { const parsed = JSON.parse(value); if (parsed && typeof parsed === "object") { return sanitizeJsonValue(parsed); } return sanitizeJsonValue({ value: parsed }); } catch { return sanitizeJsonValue({ raw: value }); } } return sanitizeJsonValue({ raw: String(value) }); }; const pick = (payload, snakeKey, pascalKey) => { if (payload[snakeKey] !== void 0) { return payload[snakeKey]; } if (payload[pascalKey] !== void 0) { return payload[pascalKey]; } return void 0; }; const buildRowsFromPayload = (rawPayload) => { const normalizedInput = { ts_ms: pick(rawPayload, "ts_ms", "ts_ms"), upgrade_ts_ms: pick(rawPayload, "upgrade_ts_ms", "upgrade_ts_ms"), hotel_id: pick(rawPayload, "hotel_id", "hotel_id"), room_id: pick(rawPayload, "room_id", "room_id"), device_id: pick(rawPayload, "device_id", "device_id"), is_send: pick(rawPayload, "is_send", "is_send"), udp_raw: pick(rawPayload, "udp_raw", "udp_raw"), extra: pick(rawPayload, "extra", "extra"), ip_type: pick(rawPayload, "ip_type", "ip_type"), model_num: pick(rawPayload, "model_num", "model_num"), server_ip: pick(rawPayload, "server_ip", "server_ip"), ip: pick(rawPayload, "ip", "ip"), subnet_mask: pick(rawPayload, "subnet_mask", "subnet_mask"), gateway: pick(rawPayload, "gateway", "gateway"), dns: pick(rawPayload, "dns", "dns"), app_version: pick(rawPayload, "app_version", "app_version"), rcu_time: pick(rawPayload, "rcu_time", "rcu_time"), launcher_version: pick(rawPayload, "launcher_version", "launcher_version"), mac: pick(rawPayload, "mac", "mac"), room_type_id: pick(rawPayload, "room_type_id", "room_type_id"), config_version: pick(rawPayload, "config_version", "config_version"), room_status: pick(rawPayload, "room_status", "room_status"), season: pick(rawPayload, "season", "season"), sys_lock_status: pick(rawPayload, "sys_lock_status", "sys_lock_status"), authorization_time: pick(rawPayload, "authorization_time", "authorization_time"), authorization_days: pick(rawPayload, "authorization_days", "authorization_days"), room_num_remark: pick(rawPayload, "room_num_remark", "room_num_remark"), room_type_remark: pick(rawPayload, "room_type_remark", "room_type_remark"), room_remark: pick(rawPayload, "room_remark", "room_remark"), mcu_name: pick(rawPayload, "mcu_name", "mcu_name"), central_control_name: pick(rawPayload, "central_control_name", "central_control_name"), configure_hotel_name: pick(rawPayload, "configure_hotel_name", "configure_hotel_name"), configure_room_type_name: pick(rawPayload, "configure_room_type_name", "configure_room_type_name") }; const payload = kafkaPayloadSchema.parse(normalizedInput); const tsMs = normalizeTsMs(payload.ts_ms); const hotelId = inRangeOr(normalizeInteger(payload.hotel_id), -32768, 32767, 0); const roomId = normalizeText(payload.room_id, 50) || ""; const registerRow = { ts_ms: tsMs, hotel_id: hotelId, room_id: roomId, device_id: normalizeText(payload.device_id, 64), write_ts_ms: Date.now(), is_send: inRangeOr(normalizeInteger(payload.is_send), -32768, 32767, 0), udp_raw: normalizeUdpRaw(payload.udp_raw), extra: normalizeExtra(payload.extra), ip_type: inRangeOr(normalizeInteger(payload.ip_type), -32768, 32767, null), model_num: normalizeText(payload.model_num, 32), server_ip: normalizeText(payload.server_ip, 21), ip: normalizeText(payload.ip, 21), subnet_mask: normalizeText(payload.subnet_mask, 15), gateway: normalizeText(payload.gateway, 15), dns: normalizeText(payload.dns, 15), app_version: normalizeText(payload.app_version, 64), rcu_time: normalizeText(payload.rcu_time, 25), launcher_version: normalizeText(payload.launcher_version, 64), mac: normalizeText(payload.mac, 17), room_type_id: normalizeInteger(payload.room_type_id), config_version: normalizeText(payload.config_version, 32), room_status: inRangeOr(normalizeInteger(payload.room_status), -2147483648, 2147483647, null), season: inRangeOr(normalizeInteger(payload.season), -2147483648, 2147483647, null), sys_lock_status: inRangeOr(normalizeInteger(payload.sys_lock_status), -2147483648, 2147483647, null), authorization_time: normalizeText(payload.authorization_time, 10), authorization_days: normalizeText(payload.authorization_days, 10), room_num_remark: normalizeText(payload.room_num_remark, 255), room_type_remark: normalizeText(payload.room_type_remark, 64), room_remark: normalizeText(payload.room_remark, 64), mcu_name: normalizeText(payload.mcu_name, 255), central_control_name: normalizeText(payload.central_control_name, 255), configure_hotel_name: normalizeText(payload.configure_hotel_name, 255), configure_room_type_name: normalizeText(payload.configure_room_type_name, 255) }; const roomStatusUpdateRow = { hotel_id: hotelId, room_id: roomId, app_version: registerRow.app_version, launcher_version: registerRow.launcher_version, config_version: registerRow.config_version, upgrade_ts_ms: normalizeTsMs(payload.upgrade_ts_ms), register_ts_ms: tsMs }; return { registerRows: [registerRow], roomStatusRows: [roomStatusUpdateRow] }; }; const parseMessageToRows = (message) => { const rawValue = message.value.toString(); let payload; try { payload = JSON.parse(rawValue); } catch (e) { const error = new Error(`JSON Parse Error: ${e.message}`); error.type = "PARSE_ERROR"; throw error; } const validationResult = kafkaPayloadSchema.safeParse(payload); if (!validationResult.success) { const error = new Error(`Schema Validation Failed: ${JSON.stringify(validationResult.error.errors)}`); error.type = "VALIDATION_ERROR"; throw error; } return buildRowsFromPayload(payload); }; class MetricCollector { constructor() { this.reset(); } reset() { this.metrics = { kafka_pulled: 0, parse_error: 0, db_inserted: 0, db_failed: 0, db_insert_count: 0, db_insert_ms_sum: 0, batch_flush_count: 0, batch_flush_ms_sum: 0 }; this.keyed = {}; } increment(metric, count = 1) { if (this.metrics.hasOwnProperty(metric)) { this.metrics[metric] += count; } } incrementKeyed(metric, key, count = 1) { if (!key) return; if (!this.keyed[metric]) { this.keyed[metric] = {}; } if (!Object.prototype.hasOwnProperty.call(this.keyed[metric], key)) { this.keyed[metric][key] = 0; } this.keyed[metric][key] += count; } getAndReset() { const current = { ...this.metrics }; const keyed = JSON.parse(JSON.stringify(this.keyed)); this.reset(); return { ...current, keyed }; } } const NETWORK_CODES = /* @__PURE__ */ new Set([ "ECONNREFUSED", "ECONNRESET", "EPIPE", "ETIMEDOUT", "ENOTFOUND", "EHOSTUNREACH", "ENETUNREACH", "57P03", "08006", "08001", "08000", "08003" ]); const isDbConnectionError = (err) => { if (typeof err?.code === "string" && NETWORK_CODES.has(err.code)) { return true; } const message = typeof err?.message === "string" ? err.message.toLowerCase() : ""; return message.includes("connection timeout") || message.includes("connection terminated") || message.includes("connection refused") || message.includes("terminating connection") || message.includes("econnrefused") || message.includes("econnreset") || message.includes("etimedout") || message.includes("could not connect") || message.includes("the database system is starting up") || message.includes("no pg_hba.conf entry"); }; const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms)); const bootstrap = async () => { logger.info("Starting register consumer", { env: config.env, kafka: { brokers: config.kafka.brokers, topic: config.kafka.topic, groupId: config.kafka.groupId }, db: { host: config.db.host, port: config.db.port, database: config.db.database, schema: config.db.schema, table: config.db.table, roomStatusSchema: config.db.roomStatusSchema, roomStatusTable: config.db.roomStatusTable }, flushIntervalMs: config.kafka.flushIntervalMs }); const metricCollector = new MetricCollector(); const totals = { kafkaPulled: 0, dbInserted: 0, parseError: 0, dbFailed: 0 }; const flushIntervalMs = Math.max(3e3, Number.isFinite(config.kafka.flushIntervalMs) ? config.kafka.flushIntervalMs : 3e3); const queue = []; let flushTimer = null; let flushing = false; const runCounterTimer = setInterval(() => { logger.info("Run counters", { kafkaPulled: totals.kafkaPulled, dbInserted: totals.dbInserted, parseError: totals.parseError, dbFailed: totals.dbFailed }); }, 1e4); const handleError = (error, message) => { logger.error("Kafka processing error", { error: error?.message, type: error?.type, topic: message?.topic, partition: message?.partition, offset: message?.offset }); }; cron.schedule("* * * * *", () => { const metrics = metricCollector.getAndReset(); const flushAvgMs = metrics.batch_flush_count > 0 ? (metrics.batch_flush_ms_sum / metrics.batch_flush_count).toFixed(1) : "0.0"; const dbAvgMs = metrics.db_insert_count > 0 ? (metrics.db_insert_ms_sum / metrics.db_insert_count).toFixed(1) : "0.0"; logger.info("Minute metrics", { kafkaPulled: metrics.kafka_pulled, parseError: metrics.parse_error, dbInserted: metrics.db_inserted, dbFailed: metrics.db_failed, flushAvgMs, dbAvgMs }); }); const processValidRowsWithRetry = async (registerRows, roomStatusRows) => { const startedAt = Date.now(); while (true) { try { await dbManager.insertRegisterRows({ schema: config.db.schema, table: config.db.table, rows: registerRows }); await dbManager.updateRoomStatusRows({ schema: config.db.roomStatusSchema, table: config.db.roomStatusTable, rows: roomStatusRows }); metricCollector.increment("db_insert_count", 1); metricCollector.increment("db_insert_ms_sum", Date.now() - startedAt); metricCollector.increment("db_inserted", registerRows.length); totals.dbInserted += registerRows.length; return; } catch (err) { if (!isDbConnectionError(err)) { throw err; } logger.warn("Database unavailable, retrying in 5s", { error: err?.message }); await sleep(5e3); } } }; const scheduleFlush = () => { if (flushTimer) { return; } flushTimer = setTimeout(() => { flushTimer = null; void flushQueue(); }, flushIntervalMs); }; const flushQueue = async () => { if (flushing) { return; } if (queue.length === 0) { return; } flushing = true; const startedAt = Date.now(); const currentBatch = queue.splice(0, queue.length); const parsedItems = []; for (const item of currentBatch) { try { const parsed = parseMessageToRows(item.message); parsedItems.push({ item, parsed }); } catch (err) { metricCollector.increment("parse_error"); totals.parseError += 1; handleError(err, item.message); item.resolve(); } } const insertParsedItems = async (items) => { if (items.length === 0) { return; } const registerRows = items.flatMap((it) => it.parsed.registerRows); const roomStatusRows = items.flatMap((it) => it.parsed.roomStatusRows); try { await processValidRowsWithRetry(registerRows, roomStatusRows); } catch (err) { if (items.length > 1) { const mid = Math.floor(items.length / 2); await insertParsedItems(items.slice(0, mid)); await insertParsedItems(items.slice(mid)); return; } metricCollector.increment("db_failed", 1); totals.dbFailed += 1; handleError(err, items[0].item.message); } }; if (parsedItems.length > 0) { await insertParsedItems(parsedItems); for (const parsedItem of parsedItems) { parsedItem.item.resolve(); } } metricCollector.increment("batch_flush_count", 1); metricCollector.increment("batch_flush_ms_sum", Date.now() - startedAt); flushing = false; if (queue.length > 0) { scheduleFlush(); } }; const handleMessage = (message) => { metricCollector.increment("kafka_pulled"); totals.kafkaPulled += 1; return new Promise((resolve) => { queue.push({ message, resolve }); scheduleFlush(); }); }; const consumers = createKafkaConsumers({ kafkaConfig: config.kafka, onMessage: handleMessage, onError: handleError }); const shutdown = async (signal) => { logger.info(`Received ${signal}, shutting down...`); try { if (flushTimer) { clearTimeout(flushTimer); flushTimer = null; } clearInterval(runCounterTimer); await flushQueue(); if (consumers && consumers.length > 0) { await Promise.all(consumers.map((consumer) => new Promise((resolve) => consumer.close(true, resolve)))); } await dbManager.close(); logger.info("Run summary", { kafkaPulled: totals.kafkaPulled, dbInserted: totals.dbInserted, parseError: totals.parseError, dbFailed: totals.dbFailed }); process.exit(0); } catch (err) { logger.error("Error during shutdown", { error: err?.message }); process.exit(1); } }; process.on("SIGTERM", () => shutdown("SIGTERM")); process.on("SIGINT", () => shutdown("SIGINT")); }; bootstrap().catch((error) => { logger.error("Service bootstrap failed", { error: error?.message }); process.exit(1); });