From 156930e6bc786d0ed8af4e2b6839a8f15ce9f8d0 Mon Sep 17 00:00:00 2001 From: XuJiacheng Date: Wed, 4 Mar 2026 16:45:40 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9gitigonre?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 1 + bls-onoffline-backend/dist/index.js | 950 ---------------------------- 2 files changed, 1 insertion(+), 950 deletions(-) delete mode 100644 bls-onoffline-backend/dist/index.js diff --git a/.gitignore b/.gitignore index 7780745..a874300 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ /bls-onoffline-backend/node_modules /template +/bls-onoffline-backend/dist diff --git a/bls-onoffline-backend/dist/index.js b/bls-onoffline-backend/dist/index.js deleted file mode 100644 index 5296636..0000000 --- a/bls-onoffline-backend/dist/index.js +++ /dev/null @@ -1,950 +0,0 @@ -import cron from "node-cron"; -import dotenv from "dotenv"; -import pg from "pg"; -import kafka from "kafka-node"; -import { randomUUID } from "crypto"; -import { z } from "zod"; -import { createClient } from "redis"; -dotenv.config(); -const parseNumber = (value, defaultValue) => { - const parsed = Number(value); - return Number.isFinite(parsed) ? parsed : defaultValue; -}; -const parseList = (value) => (value || "").split(",").map((item) => item.trim()).filter(Boolean); -const config = { - env: process.env.NODE_ENV || "development", - port: parseNumber(process.env.PORT, 3001), - kafka: { - brokers: parseList(process.env.KAFKA_BROKERS), - topic: process.env.KAFKA_TOPIC || process.env.KAFKA_TOPICS || "blwlog4Nodejs-rcu-onoffline-topic", - groupId: process.env.KAFKA_GROUP_ID || "bls-onoffline-group", - clientId: process.env.KAFKA_CLIENT_ID || "bls-onoffline-client", - consumerInstances: parseNumber(process.env.KAFKA_CONSUMER_INSTANCES, 1), - maxInFlight: parseNumber(process.env.KAFKA_MAX_IN_FLIGHT, 2e4), - fetchMaxBytes: parseNumber(process.env.KAFKA_FETCH_MAX_BYTES, 50 * 1024 * 1024), - fetchMinBytes: parseNumber(process.env.KAFKA_FETCH_MIN_BYTES, 256 * 1024), - fetchMaxWaitMs: parseNumber(process.env.KAFKA_FETCH_MAX_WAIT_MS, 100), - autoCommitIntervalMs: parseNumber(process.env.KAFKA_AUTO_COMMIT_INTERVAL_MS, 5e3), - commitIntervalMs: parseNumber(process.env.KAFKA_COMMIT_INTERVAL_MS, 200), - commitOnAttempt: process.env.KAFKA_COMMIT_ON_ATTEMPT === "true", - batchSize: parseNumber(process.env.KAFKA_BATCH_SIZE, 5e3), - batchTimeoutMs: parseNumber(process.env.KAFKA_BATCH_TIMEOUT_MS, 50), - logMessages: process.env.KAFKA_LOG_MESSAGES === "true", - sasl: process.env.KAFKA_SASL_USERNAME && process.env.KAFKA_SASL_PASSWORD ? { - mechanism: process.env.KAFKA_SASL_MECHANISM || "plain", - username: process.env.KAFKA_SASL_USERNAME, - password: process.env.KAFKA_SASL_PASSWORD - } : void 0 - }, - db: { - host: process.env.DB_HOST || process.env.POSTGRES_HOST || "localhost", - port: parseNumber(process.env.DB_PORT || process.env.POSTGRES_PORT, 5432), - user: process.env.DB_USER || process.env.POSTGRES_USER || "postgres", - password: process.env.DB_PASSWORD || process.env.POSTGRES_PASSWORD || "", - database: process.env.DB_DATABASE || process.env.POSTGRES_DATABASE || "log_platform", - max: parseNumber(process.env.DB_MAX_CONNECTIONS || process.env.POSTGRES_MAX_CONNECTIONS, 10), - ssl: process.env.DB_SSL === "true" ? { rejectUnauthorized: false } : void 0, - schema: process.env.DB_SCHEMA || "onoffline", - table: process.env.DB_TABLE || "onoffline_record" - }, - redis: { - host: process.env.REDIS_HOST || "localhost", - port: parseNumber(process.env.REDIS_PORT, 6379), - password: process.env.REDIS_PASSWORD || void 0, - db: parseNumber(process.env.REDIS_DB, 0), - projectName: process.env.REDIS_PROJECT_NAME || "bls-onoffline", - apiBaseUrl: process.env.REDIS_API_BASE_URL || `http://localhost:${parseNumber(process.env.PORT, 3001)}` - } -}; -const format = (level, message, context) => { - const payload = { - level, - message, - timestamp: Date.now(), - ...context ? { context } : {} - }; - return JSON.stringify(payload); -}; -const logger = { - info(message, context) { - process.stdout.write(`${format("info", message, context)} -`); - }, - error(message, context) { - process.stderr.write(`${format("error", message, context)} -`); - }, - warn(message, context) { - process.stderr.write(`${format("warn", message, context)} -`); - } -}; -const { Pool } = pg; -const columns = [ - "guid", - "ts_ms", - "write_ts_ms", - "hotel_id", - "mac", - "device_id", - "room_id", - "ip", - "current_status", - "launcher_version", - "reboot_reason" -]; -class DatabaseManager { - constructor(dbConfig) { - this.pool = new Pool({ - host: dbConfig.host, - port: dbConfig.port, - user: dbConfig.user, - password: dbConfig.password, - database: dbConfig.database, - max: dbConfig.max, - ssl: dbConfig.ssl - }); - } - async insertRows({ schema, table, rows }) { - if (!rows || rows.length === 0) { - return; - } - const statement = ` - INSERT INTO ${schema}.${table} (${columns.join(", ")}) - SELECT * - FROM UNNEST( - $1::text[], - $2::int8[], - $3::int8[], - $4::int2[], - $5::text[], - $6::text[], - $7::text[], - $8::text[], - $9::text[], - $10::text[], - $11::text[] - ) - ON CONFLICT DO NOTHING - `; - try { - const params = columns.map((column) => rows.map((row) => row[column] ?? null)); - await this.pool.query(statement, params); - } catch (error) { - logger.error("Database insert failed", { - error: error?.message, - schema, - table, - rowsLength: rows.length - }); - throw error; - } - } - async checkConnection() { - let client; - try { - const connectPromise = this.pool.connect(); - const timeoutPromise = new Promise((_, reject) => { - setTimeout(() => reject(new Error("Connection timeout")), 5e3); - }); - try { - client = await Promise.race([connectPromise, timeoutPromise]); - } catch (raceError) { - connectPromise.then((c) => c.release()).catch(() => { - }); - throw raceError; - } - await client.query("SELECT 1"); - return true; - } catch (err) { - logger.error("Database check connection failed", { error: err.message }); - return false; - } finally { - if (client) { - client.release(); - } - } - } - async close() { - await this.pool.end(); - } -} -const dbManager = new DatabaseManager(config.db); -class OffsetTracker { - constructor() { - this.partitions = /* @__PURE__ */ new Map(); - } - // Called when a message is received (before processing) - add(topic, partition, offset) { - const key = `${topic}-${partition}`; - if (!this.partitions.has(key)) { - this.partitions.set(key, { nextCommitOffset: null, done: /* @__PURE__ */ new Set() }); - } - const state = this.partitions.get(key); - const numericOffset = Number(offset); - if (!Number.isFinite(numericOffset)) return; - if (state.nextCommitOffset === null) { - state.nextCommitOffset = numericOffset; - } else if (numericOffset < state.nextCommitOffset) { - state.nextCommitOffset = numericOffset; - } - } - // Called when a message is successfully processed - // Returns the next offset to commit (if any advancement is possible), or null - markDone(topic, partition, offset) { - const key = `${topic}-${partition}`; - const state = this.partitions.get(key); - if (!state) return null; - const numericOffset = Number(offset); - if (!Number.isFinite(numericOffset)) return null; - state.done.add(numericOffset); - if (state.nextCommitOffset === null) { - state.nextCommitOffset = numericOffset; - } - let advanced = false; - while (state.nextCommitOffset !== null && state.done.has(state.nextCommitOffset)) { - state.done.delete(state.nextCommitOffset); - state.nextCommitOffset += 1; - advanced = true; - } - if (!advanced) return null; - return state.nextCommitOffset; - } - clear() { - this.partitions.clear(); - } -} -const { ConsumerGroup } = kafka; -const createOneConsumer = ({ kafkaConfig, onMessage, onError, instanceIndex }) => { - const kafkaHost = kafkaConfig.brokers.join(","); - const clientId = instanceIndex === 0 ? kafkaConfig.clientId : `${kafkaConfig.clientId}-${instanceIndex}`; - const id = `${clientId}-${process.pid}-${Date.now()}`; - const maxInFlight = Number.isFinite(kafkaConfig.maxInFlight) ? kafkaConfig.maxInFlight : 5e3; - const commitIntervalMs = Number.isFinite(kafkaConfig.commitIntervalMs) ? kafkaConfig.commitIntervalMs : 200; - let inFlight = 0; - const tracker = new OffsetTracker(); - let pendingCommits = /* @__PURE__ */ new Map(); - let commitTimer = null; - const flushCommits = () => { - if (pendingCommits.size === 0) return; - const batch = pendingCommits; - pendingCommits = /* @__PURE__ */ new Map(); - consumer.sendOffsetCommitRequest( - Array.from(batch.values()), - (err) => { - if (err) { - for (const [k, v] of batch.entries()) { - pendingCommits.set(k, v); - } - logger.error("Kafka commit failed", { error: err?.message, count: batch.size }); - } - } - ); - }; - const scheduleCommitFlush = () => { - if (commitTimer) return; - commitTimer = setTimeout(() => { - commitTimer = null; - flushCommits(); - }, commitIntervalMs); - }; - const consumer = new ConsumerGroup( - { - kafkaHost, - groupId: kafkaConfig.groupId, - clientId, - id, - fromOffset: "earliest", - protocol: ["roundrobin"], - outOfRangeOffset: "latest", - autoCommit: false, - autoCommitIntervalMs: kafkaConfig.autoCommitIntervalMs, - fetchMaxBytes: kafkaConfig.fetchMaxBytes, - fetchMinBytes: kafkaConfig.fetchMinBytes, - fetchMaxWaitMs: kafkaConfig.fetchMaxWaitMs, - sasl: kafkaConfig.sasl - }, - kafkaConfig.topic - ); - const tryResume = () => { - if (inFlight < maxInFlight && consumer.paused) { - consumer.resume(); - } - }; - consumer.on("message", (message) => { - inFlight += 1; - tracker.add(message.topic, message.partition, message.offset); - if (inFlight >= maxInFlight) { - consumer.pause(); - } - Promise.resolve(onMessage(message)).then(() => { - }).catch((error) => { - logger.error("Kafka message handling failed", { error: error?.message }); - if (onError) { - onError(error, message); - } - }).finally(() => { - const commitOffset = tracker.markDone(message.topic, message.partition, message.offset); - if (commitOffset !== null) { - const key = `${message.topic}-${message.partition}`; - pendingCommits.set(key, { - topic: message.topic, - partition: message.partition, - offset: commitOffset, - metadata: "m" - }); - scheduleCommitFlush(); - } - inFlight -= 1; - tryResume(); - }); - }); - consumer.on("error", (error) => { - logger.error("Kafka consumer error", { error: error?.message }); - if (onError) { - onError(error); - } - }); - consumer.on("connect", () => { - logger.info(`Kafka Consumer connected`, { - groupId: kafkaConfig.groupId, - clientId - }); - }); - consumer.on("rebalancing", () => { - logger.info(`Kafka Consumer rebalancing`, { - groupId: kafkaConfig.groupId, - clientId - }); - tracker.clear(); - pendingCommits.clear(); - if (commitTimer) { - clearTimeout(commitTimer); - commitTimer = null; - } - }); - consumer.on("rebalanced", () => { - logger.info("Kafka Consumer rebalanced", { clientId, groupId: kafkaConfig.groupId }); - }); - consumer.on("error", (err) => { - logger.error("Kafka Consumer Error", { error: err.message }); - }); - consumer.on("offsetOutOfRange", (err) => { - logger.warn("Offset out of range", { error: err.message, topic: err.topic, partition: err.partition }); - }); - consumer.on("offsetOutOfRange", (error) => { - logger.warn(`Kafka Consumer offset out of range`, { - error: error?.message, - groupId: kafkaConfig.groupId, - clientId - }); - }); - consumer.on("close", () => { - if (commitTimer) { - clearTimeout(commitTimer); - commitTimer = null; - } - flushCommits(); - logger.warn(`Kafka Consumer closed`, { - groupId: kafkaConfig.groupId, - clientId - }); - }); - return consumer; -}; -const createKafkaConsumers = ({ kafkaConfig, onMessage, onError }) => { - const instances = Number.isFinite(kafkaConfig.consumerInstances) ? kafkaConfig.consumerInstances : 1; - const count = Math.max(1, instances); - return Array.from( - { length: count }, - (_, idx) => createOneConsumer({ kafkaConfig, onMessage, onError, instanceIndex: idx }) - ); -}; -const createGuid = () => randomUUID().replace(/-/g, ""); -const toNumber = (value) => { - if (value === void 0 || value === null || value === "") { - return value; - } - if (typeof value === "number") { - return value; - } - const parsed = Number(value); - return Number.isFinite(parsed) ? parsed : value; -}; -const toStringAllowEmpty = (value) => { - if (value === void 0 || value === null) { - return value; - } - return String(value); -}; -const kafkaPayloadSchema = z.object({ - HotelCode: z.preprocess(toNumber, z.number()), - MAC: z.preprocess(toStringAllowEmpty, z.string().nullable()).optional().nullable(), - HostNumber: z.preprocess(toStringAllowEmpty, z.string().nullable()).optional().nullable(), - RoomNumber: z.preprocess(toStringAllowEmpty, z.string().nullable()).optional().nullable(), - EndPoint: z.preprocess(toStringAllowEmpty, z.string().nullable()).optional().nullable(), - CurrentStatus: z.preprocess(toStringAllowEmpty, z.string().nullable()).optional().nullable(), - CurrentTime: z.preprocess(toStringAllowEmpty, z.string().nullable()).optional().nullable(), - UnixTime: z.preprocess(toNumber, z.number().nullable()).optional().nullable(), - LauncherVersion: z.preprocess(toStringAllowEmpty, z.string().nullable()).optional().nullable(), - RebootReason: z.preprocess(toStringAllowEmpty, z.string().nullable()).optional().nullable() -}); -const normalizeText = (value, maxLength) => { - if (value === void 0 || value === null) { - return null; - } - const str = String(value); - if (maxLength && str.length > maxLength) { - return str.substring(0, maxLength); - } - return str; -}; -const buildRowsFromPayload = (rawPayload) => { - const payload = kafkaPayloadSchema.parse(rawPayload); - const rebootReason = normalizeText(payload.RebootReason, 255); - const currentStatusRaw = normalizeText(payload.CurrentStatus, 255); - const hasRebootReason = rebootReason !== null && rebootReason !== ""; - const currentStatus = hasRebootReason ? "on" : currentStatusRaw; - let tsMs = payload.UnixTime; - if (typeof tsMs === "number" && tsMs < 1e11) { - tsMs = tsMs * 1e3; - } - if (!tsMs && payload.CurrentTime) { - const parsed = Date.parse(payload.CurrentTime); - if (!isNaN(parsed)) { - tsMs = parsed; - } - } - if (!tsMs) { - tsMs = Date.now(); - } - const mac = normalizeText(payload.MAC) || ""; - const deviceId = normalizeText(payload.HostNumber) || ""; - const roomId = normalizeText(payload.RoomNumber) || ""; - let hotelId = payload.HotelCode; - if (typeof hotelId !== "number" || Number.isNaN(hotelId) || hotelId < -32768 || hotelId > 32767) { - hotelId = 0; - } - const row = { - guid: createGuid(), - ts_ms: tsMs, - write_ts_ms: Date.now(), - hotel_id: hotelId, - mac, - device_id: deviceId, - room_id: roomId, - ip: normalizeText(payload.EndPoint), - current_status: currentStatus, - launcher_version: normalizeText(payload.LauncherVersion, 255), - reboot_reason: rebootReason - }; - return [row]; -}; -const parseMessageToRows = (message) => { - const rawValue = message.value.toString(); - let payload; - try { - payload = JSON.parse(rawValue); - } catch (e) { - const error = new Error(`JSON Parse Error: ${e.message}`); - error.type = "PARSE_ERROR"; - throw error; - } - const validationResult = kafkaPayloadSchema.safeParse(payload); - if (!validationResult.success) { - const error = new Error(`Schema Validation Failed: ${JSON.stringify(validationResult.error.errors)}`); - error.type = "VALIDATION_ERROR"; - throw error; - } - return buildRowsFromPayload(payload); -}; -const createRedisClient = async (config2) => { - const client = createClient({ - socket: { - host: config2.host, - port: config2.port - }, - password: config2.password, - database: config2.db - }); - await client.connect(); - return client; -}; -class RedisIntegration { - constructor(client, projectName, apiBaseUrl) { - this.client = client; - this.projectName = projectName; - this.apiBaseUrl = apiBaseUrl; - this.heartbeatKey = "项目心跳"; - this.logKey = `${projectName}_项目控制台`; - } - async info(message, context) { - const payload = { - timestamp: (/* @__PURE__ */ new Date()).toISOString(), - level: "info", - message, - metadata: context || void 0 - }; - await this.client.rPush(this.logKey, JSON.stringify(payload)); - } - async error(message, context) { - const payload = { - timestamp: (/* @__PURE__ */ new Date()).toISOString(), - level: "error", - message, - metadata: context || void 0 - }; - await this.client.rPush(this.logKey, JSON.stringify(payload)); - } - startHeartbeat() { - setInterval(() => { - const payload = { - projectName: this.projectName, - apiBaseUrl: this.apiBaseUrl, - lastActiveAt: Date.now() - }; - this.client.rPush(this.heartbeatKey, JSON.stringify(payload)); - }, 3e3); - } -} -const buildErrorQueueKey = (projectName) => `${projectName}_error_queue`; -const enqueueError = async (client, queueKey, payload) => { - try { - await client.rPush(queueKey, JSON.stringify(payload)); - } catch (error) { - logger.error("Redis enqueue error failed", { error: error?.message }); - throw error; - } -}; -const startErrorRetryWorker = async ({ - client, - queueKey, - handler, - redisIntegration, - maxAttempts = 5 -}) => { - while (true) { - const result = await client.blPop(queueKey, 0); - const raw = result?.element; - if (!raw) { - continue; - } - let item; - try { - item = JSON.parse(raw); - } catch (error) { - logger.error("Invalid error payload", { error: error?.message }); - await redisIntegration.error("Invalid error payload", { module: "redis", stack: error?.message }); - continue; - } - const attempts = item.attempts || 0; - try { - await handler(item); - } catch (error) { - logger.error("Retry handler failed", { error: error?.message, stack: error?.stack }); - const nextPayload = { - ...item, - attempts: attempts + 1, - lastError: error?.message, - lastAttemptAt: Date.now() - }; - if (nextPayload.attempts >= maxAttempts) { - await redisIntegration.error("Retry attempts exceeded", { module: "retry", stack: JSON.stringify(nextPayload) }); - } else { - await enqueueError(client, queueKey, nextPayload); - } - } - } -}; -class MetricCollector { - constructor() { - this.reset(); - } - reset() { - this.metrics = { - kafka_pulled: 0, - parse_error: 0, - db_inserted: 0, - db_failed: 0, - db_insert_count: 0, - db_insert_ms_sum: 0, - batch_flush_count: 0, - batch_flush_ms_sum: 0 - }; - this.keyed = {}; - } - increment(metric, count = 1) { - if (this.metrics.hasOwnProperty(metric)) { - this.metrics[metric] += count; - } - } - incrementKeyed(metric, key, count = 1) { - if (!key) return; - if (!this.keyed[metric]) { - this.keyed[metric] = {}; - } - if (!Object.prototype.hasOwnProperty.call(this.keyed[metric], key)) { - this.keyed[metric][key] = 0; - } - this.keyed[metric][key] += count; - } - getAndReset() { - const current = { ...this.metrics }; - const keyed = JSON.parse(JSON.stringify(this.keyed)); - this.reset(); - return { ...current, keyed }; - } -} -const bootstrap = async () => { - logger.info("Starting application with config", { - env: process.env.NODE_ENV, - db: { - host: config.db.host, - port: config.db.port, - user: config.db.user, - database: config.db.database, - schema: config.db.schema - }, - kafka: { - brokers: config.kafka.brokers, - topic: config.kafka.topic, - groupId: config.kafka.groupId - }, - redis: { - host: config.redis.host, - port: config.redis.port - } - }); - const metricCollector = new MetricCollector(); - const redisClient = await createRedisClient(config.redis); - const redisIntegration = new RedisIntegration( - redisClient, - config.redis.projectName, - config.redis.apiBaseUrl - ); - redisIntegration.startHeartbeat(); - cron.schedule("* * * * *", async () => { - const metrics = metricCollector.getAndReset(); - const flushAvgMs = metrics.batch_flush_count > 0 ? (metrics.batch_flush_ms_sum / metrics.batch_flush_count).toFixed(1) : "0.0"; - const dbAvgMs = metrics.db_insert_count > 0 ? (metrics.db_insert_ms_sum / metrics.db_insert_count).toFixed(1) : "0.0"; - const report = `[Metrics] Pulled:${metrics.kafka_pulled} ParseErr:${metrics.parse_error} Inserted:${metrics.db_inserted} Failed:${metrics.db_failed} FlushAvg:${flushAvgMs}ms DbAvg:${dbAvgMs}ms`; - console.log(report); - logger.info(report); - try { - await redisIntegration.info("Minute Metrics", metrics); - } catch (err) { - logger.error("Failed to report metrics to Redis", { error: err?.message }); - } - }); - const errorQueueKey = buildErrorQueueKey(config.redis.projectName); - const handleError = async (error, message) => { - logger.error("Kafka processing error", { - error: error?.message, - type: error?.type, - stack: error?.stack - }); - try { - await redisIntegration.error("Kafka processing error", { - module: "kafka", - stack: error?.stack || error?.message - }); - } catch (redisError) { - logger.error("Redis error log failed", { error: redisError?.message }); - } - if (message) { - const messageValue = Buffer.isBuffer(message.value) ? message.value.toString("utf8") : message.value; - try { - await enqueueError(redisClient, errorQueueKey, { - attempts: 0, - value: messageValue, - meta: { - topic: message.topic, - partition: message.partition, - offset: message.offset, - key: message.key - }, - timestamp: Date.now() - }); - } catch (enqueueError2) { - logger.error("Enqueue error payload failed", { error: enqueueError2?.message }); - } - } - }; - const configuredBatchSize = Number.isFinite(config.kafka.batchSize) ? config.kafka.batchSize : 1e3; - const configuredBatchTimeoutMs = Number.isFinite(config.kafka.batchTimeoutMs) ? config.kafka.batchTimeoutMs : 20; - const configuredMaxInFlight = Number.isFinite(config.kafka.maxInFlight) ? config.kafka.maxInFlight : 5e3; - const BATCH_SIZE = Math.max(10, Math.min(configuredBatchSize, configuredMaxInFlight)); - const BATCH_TIMEOUT_MS = Math.max(1, configuredBatchTimeoutMs); - const commitOnAttempt = config.kafka.commitOnAttempt === true; - const batchStates = /* @__PURE__ */ new Map(); - const partitionKeyFromMessage = (message) => { - if (message?.topic !== void 0 && message?.partition !== void 0) { - return `${message.topic}-${message.partition}`; - } - return "retry"; - }; - const dayKeyFromTsMs = (tsMs) => { - const numeric = typeof tsMs === "string" ? Number(tsMs) : tsMs; - if (!Number.isFinite(numeric)) return null; - const d = new Date(numeric); - if (Number.isNaN(d.getTime())) return null; - const yyyy = d.getFullYear(); - const mm = String(d.getMonth() + 1).padStart(2, "0"); - const dd = String(d.getDate()).padStart(2, "0"); - return `${yyyy}${mm}${dd}`; - }; - const getBatchState = (key) => { - if (!batchStates.has(key)) { - batchStates.set(key, { items: [], timer: null, flushing: null }); - } - return batchStates.get(key); - }; - const isDbConnectionError = (err) => { - const code = err?.code; - if (typeof code === "string") { - const networkCodes = /* @__PURE__ */ new Set([ - "ECONNREFUSED", - "ECONNRESET", - "EPIPE", - "ETIMEDOUT", - "ENOTFOUND", - "EHOSTUNREACH", - "ENETUNREACH", - "57P03", - "08006", - "08001", - "08000", - "08003" - ]); - if (networkCodes.has(code)) return true; - } - const message = typeof err?.message === "string" ? err.message : ""; - if (!message) return false; - const lower = message.toLowerCase(); - return lower.includes("connection timeout") || lower.includes("connection terminated") || lower.includes("connection refused") || lower.includes("terminating connection") || lower.includes("econnrefused") || lower.includes("econnreset") || lower.includes("etimedout") || lower.includes("could not connect") || lower.includes("the database system is starting up") || lower.includes("no pg_hba.conf entry"); - }; - const insertRowsWithRetry = async (rows) => { - const startedAt = Date.now(); - while (true) { - try { - await dbManager.insertRows({ schema: config.db.schema, table: config.db.table, rows }); - metricCollector.increment("db_insert_count", 1); - metricCollector.increment("db_insert_ms_sum", Date.now() - startedAt); - return; - } catch (err) { - if (isDbConnectionError(err)) { - logger.error("Database offline during batch insert. Retrying in 5s...", { error: err.message }); - await new Promise((r) => setTimeout(r, 5e3)); - while (!await dbManager.checkConnection()) { - logger.warn("Database still offline. Waiting 5s..."); - await new Promise((r) => setTimeout(r, 5e3)); - } - continue; - } - throw err; - } - } - }; - const insertRowsOnce = async (rows) => { - const startedAt = Date.now(); - await dbManager.insertRows({ schema: config.db.schema, table: config.db.table, rows }); - metricCollector.increment("db_insert_count", 1); - metricCollector.increment("db_insert_ms_sum", Date.now() - startedAt); - }; - const resolveInsertedItems = (partitionKey, items) => { - let insertedRows = 0; - for (const p of items) { - insertedRows += p.rows.length; - const dayKey = dayKeyFromTsMs(p.rows?.[0]?.ts_ms); - if (dayKey) { - metricCollector.incrementKeyed("db_inserted_by_day", dayKey, p.rows.length); - } - p.item.resolve(); - } - metricCollector.increment("db_inserted", insertedRows); - metricCollector.incrementKeyed("db_inserted_by_partition", partitionKey, insertedRows); - }; - const handleFailedItem = async (partitionKey, p, err) => { - metricCollector.increment("db_failed"); - metricCollector.incrementKeyed("db_failed_by_partition", partitionKey, 1); - const dayKey = dayKeyFromTsMs(p.rows?.[0]?.ts_ms); - if (dayKey) { - metricCollector.incrementKeyed("db_failed_by_day", dayKey, 1); - } - await handleError(err, p.item.message); - p.item.resolve(); - }; - const insertItemsDegraded = async (partitionKey, items) => { - if (items.length === 0) return; - const rows = items.flatMap((p) => p.rows); - if (commitOnAttempt) { - try { - await insertRowsOnce(rows); - resolveInsertedItems(partitionKey, items); - } catch (err) { - for (const item of items) { - await handleFailedItem(partitionKey, item, err); - } - } - return; - } - try { - await insertRowsWithRetry(rows); - resolveInsertedItems(partitionKey, items); - return; - } catch (err) { - if (items.length === 1) { - try { - await insertRowsWithRetry(items[0].rows); - resolveInsertedItems(partitionKey, items); - } catch (innerErr) { - await handleFailedItem(partitionKey, items[0], innerErr); - } - return; - } - const mid = Math.floor(items.length / 2); - await insertItemsDegraded(partitionKey, items.slice(0, mid)); - await insertItemsDegraded(partitionKey, items.slice(mid)); - } - }; - const flushBatchForKey = async (partitionKey) => { - const state = getBatchState(partitionKey); - if (state.flushing) return state.flushing; - state.flushing = (async () => { - if (state.timer) { - clearTimeout(state.timer); - state.timer = null; - } - if (state.items.length === 0) return; - const startedAt = Date.now(); - const currentBatch = state.items; - state.items = []; - const pendingDbItems = []; - const unresolvedItems = []; - try { - for (const item of currentBatch) { - try { - const rows = parseMessageToRows(item.message); - pendingDbItems.push({ item, rows }); - unresolvedItems.push(item); - } catch (err) { - metricCollector.increment("parse_error"); - metricCollector.incrementKeyed("parse_error_by_partition", partitionKey, 1); - logger.error("Message processing failed (Parse/Validation)", { error: err.message }); - await handleError(err, item.message); - item.resolve(); - } - } - if (pendingDbItems.length > 0) { - const firstTs = pendingDbItems[0]?.rows?.[0]?.ts_ms; - const dayKey = dayKeyFromTsMs(firstTs); - if (dayKey) { - const dayStartMs = Date.now(); - await insertItemsDegraded(partitionKey, pendingDbItems); - metricCollector.incrementKeyed("db_insert_ms_sum_by_day", dayKey, Date.now() - dayStartMs); - } else { - await insertItemsDegraded(partitionKey, pendingDbItems); - } - } - metricCollector.increment("batch_flush_count", 1); - metricCollector.increment("batch_flush_ms_sum", Date.now() - startedAt); - } catch (err) { - if (!commitOnAttempt && isDbConnectionError(err)) { - state.items = unresolvedItems.concat(state.items); - if (!state.timer) { - state.timer = setTimeout(() => { - state.timer = null; - flushBatchForKey(partitionKey); - }, 5e3); - } - return; - } - logger.error("Batch flush failed (non-network). Marking as consumed", { - error: err?.message, - partitionKey, - batchSize: currentBatch.length - }); - for (const item of unresolvedItems) { - try { - await handleError(err, item.message); - } catch { - } - item.resolve(); - } - } - })().finally(() => { - state.flushing = null; - if (state.items.length > 0) { - if (state.items.length >= BATCH_SIZE) { - flushBatchForKey(partitionKey); - } else if (!state.timer) { - state.timer = setTimeout(() => { - state.timer = null; - flushBatchForKey(partitionKey); - }, BATCH_TIMEOUT_MS); - } - } - }); - return state.flushing; - }; - const handleMessage = (message) => { - if (message.topic) { - metricCollector.increment("kafka_pulled"); - metricCollector.incrementKeyed("kafka_pulled_by_partition", `${message.topic}-${message.partition}`, 1); - } - const partitionKey = partitionKeyFromMessage(message); - const state = getBatchState(partitionKey); - return new Promise((resolve, reject) => { - state.items.push({ message, resolve, reject }); - if (state.items.length >= BATCH_SIZE) { - flushBatchForKey(partitionKey); - } else if (!state.timer) { - state.timer = setTimeout(() => { - state.timer = null; - flushBatchForKey(partitionKey); - }, BATCH_TIMEOUT_MS); - } - }); - }; - const consumers = createKafkaConsumers({ - kafkaConfig: config.kafka, - onMessage: handleMessage, - onError: handleError - }); - startErrorRetryWorker({ - client: redisClient, - queueKey: errorQueueKey, - redisIntegration, - handler: async (item) => { - if (!item?.value) { - throw new Error("Missing value in retry payload"); - } - await handleMessage({ value: item.value }); - } - }).catch((err) => { - logger.error("Retry worker failed", { error: err?.message }); - }); - const shutdown = async (signal) => { - logger.info(`Received ${signal}, shutting down...`); - try { - if (consumers && consumers.length > 0) { - await Promise.all(consumers.map((c) => new Promise((resolve) => c.close(true, resolve)))); - logger.info("Kafka consumer closed", { count: consumers.length }); - } - await redisClient.quit(); - logger.info("Redis client closed"); - await dbManager.close(); - logger.info("Database connection closed"); - process.exit(0); - } catch (err) { - logger.error("Error during shutdown", { error: err?.message }); - process.exit(1); - } - }; - process.on("SIGTERM", () => shutdown("SIGTERM")); - process.on("SIGINT", () => shutdown("SIGINT")); -}; -bootstrap().catch((error) => { - logger.error("Service bootstrap failed", { error: error?.message }); - process.exit(1); -});