diff --git a/bls-onoffline-backend/.env b/bls-onoffline-backend/.env index 8036ba1..0398351 100644 --- a/bls-onoffline-backend/.env +++ b/bls-onoffline-backend/.env @@ -9,8 +9,12 @@ KAFKA_SASL_MECHANISM=plain KAFKA_SASL_USERNAME=blwmomo KAFKA_SASL_PASSWORD=blwmomo KAFKA_SSL_ENABLED=false -KAFKA_CONSUMER_INSTANCES=6 -KAFKA_MAX_IN_FLIGHT=50 +KAFKA_CONSUMER_INSTANCES=3 +KAFKA_MAX_IN_FLIGHT=5000 +KAFKA_BATCH_SIZE=1000 +KAFKA_BATCH_TIMEOUT_MS=20 +KAFKA_COMMIT_INTERVAL_MS=200 +KAFKA_COMMIT_ON_ATTEMPT=true KAFKA_FETCH_MAX_BYTES=10485760 KAFKA_FETCH_MAX_WAIT_MS=100 KAFKA_FETCH_MIN_BYTES=1 diff --git a/bls-onoffline-backend/dist/index.js b/bls-onoffline-backend/dist/index.js index c2ef04d..87f589f 100644 --- a/bls-onoffline-backend/dist/index.js +++ b/bls-onoffline-backend/dist/index.js @@ -23,11 +23,15 @@ const config = { groupId: process.env.KAFKA_GROUP_ID || "bls-onoffline-group", clientId: process.env.KAFKA_CLIENT_ID || "bls-onoffline-client", consumerInstances: parseNumber(process.env.KAFKA_CONSUMER_INSTANCES, 1), - maxInFlight: parseNumber(process.env.KAFKA_MAX_IN_FLIGHT, 50), - fetchMaxBytes: parseNumber(process.env.KAFKA_FETCH_MAX_BYTES, 10 * 1024 * 1024), - fetchMinBytes: parseNumber(process.env.KAFKA_FETCH_MIN_BYTES, 1), + maxInFlight: parseNumber(process.env.KAFKA_MAX_IN_FLIGHT, 2e4), + fetchMaxBytes: parseNumber(process.env.KAFKA_FETCH_MAX_BYTES, 50 * 1024 * 1024), + fetchMinBytes: parseNumber(process.env.KAFKA_FETCH_MIN_BYTES, 256 * 1024), fetchMaxWaitMs: parseNumber(process.env.KAFKA_FETCH_MAX_WAIT_MS, 100), autoCommitIntervalMs: parseNumber(process.env.KAFKA_AUTO_COMMIT_INTERVAL_MS, 5e3), + commitIntervalMs: parseNumber(process.env.KAFKA_COMMIT_INTERVAL_MS, 200), + commitOnAttempt: process.env.KAFKA_COMMIT_ON_ATTEMPT === "true", + batchSize: parseNumber(process.env.KAFKA_BATCH_SIZE, 5e3), + batchTimeoutMs: parseNumber(process.env.KAFKA_BATCH_TIMEOUT_MS, 50), logMessages: process.env.KAFKA_LOG_MESSAGES === "true", sasl: process.env.KAFKA_SASL_USERNAME && process.env.KAFKA_SASL_PASSWORD ? { mechanism: process.env.KAFKA_SASL_MECHANISM || "plain", @@ -64,13 +68,17 @@ const format = (level, message, context) => { }; return JSON.stringify(payload); }; -const logger$1 = { +const logger = { info(message, context) { process.stdout.write(`${format("info", message, context)} `); }, error(message, context) { process.stderr.write(`${format("error", message, context)} +`); + }, + warn(message, context) { + process.stderr.write(`${format("warn", message, context)} `); } }; @@ -104,24 +112,29 @@ class DatabaseManager { if (!rows || rows.length === 0) { return; } - const values = []; - const placeholders = rows.map((row, rowIndex) => { - const offset = rowIndex * columns.length; - columns.forEach((column) => { - values.push(row[column] ?? null); - }); - const params = columns.map((_, columnIndex) => `$${offset + columnIndex + 1}`); - return `(${params.join(", ")})`; - }); const statement = ` INSERT INTO ${schema}.${table} (${columns.join(", ")}) - VALUES ${placeholders.join(", ")} + SELECT * + FROM UNNEST( + $1::text[], + $2::int8[], + $3::int8[], + $4::int2[], + $5::text[], + $6::text[], + $7::text[], + $8::text[], + $9::text[], + $10::text[], + $11::text[] + ) ON CONFLICT DO NOTHING `; try { - await this.pool.query(statement, values); + const params = columns.map((column) => rows.map((row) => row[column] ?? 
null)); + await this.pool.query(statement, params); } catch (error) { - logger$1.error("Database insert failed", { + logger.error("Database insert failed", { error: error?.message, schema, table, @@ -147,7 +160,7 @@ class DatabaseManager { await client.query("SELECT 1"); return true; } catch (err) { - logger$1.error("Database check connection failed", { error: err.message }); + logger.error("Database check connection failed", { error: err.message }); return false; } finally { if (client) { @@ -180,6 +193,65 @@ class PartitionManager { const endMs = end.getTime(); return { startMs, endMs, partitionSuffix }; } + async ensurePartitionIndexes(client, schema, table, partitionSuffix) { + const startedAt = Date.now(); + const partitionName = `${schema}.${table}_${partitionSuffix}`; + const indexBase = `${table}_${partitionSuffix}`; + const indexSpecs = [ + { name: `idx_${indexBase}_ts`, column: "ts_ms" }, + { name: `idx_${indexBase}_hid`, column: "hotel_id" }, + { name: `idx_${indexBase}_mac`, column: "mac" }, + { name: `idx_${indexBase}_did`, column: "device_id" }, + { name: `idx_${indexBase}_rid`, column: "room_id" }, + { name: `idx_${indexBase}_cs`, column: "current_status" } + ]; + for (const spec of indexSpecs) { + await client.query(`CREATE INDEX IF NOT EXISTS ${spec.name} ON ${partitionName} (${spec.column});`); + } + await client.query(`ANALYZE ${partitionName};`); + const elapsedMs = Date.now() - startedAt; + if (elapsedMs > 1e3) { + logger.warn(`Partition index ensure slow`, { partitionName, elapsedMs }); + } + } + async ensureIndexesForExistingPartitions() { + const startedAt = Date.now(); + const client = await dbManager.pool.connect(); + try { + const schema = config.db.schema; + const table = config.db.table; + const res = await client.query( + ` + SELECT c.relname AS relname + FROM pg_inherits i + JOIN pg_class p ON i.inhparent = p.oid + JOIN pg_namespace pn ON pn.oid = p.relnamespace + JOIN pg_class c ON i.inhrelid = c.oid + WHERE pn.nspname = $1 AND p.relname = $2 + ORDER BY c.relname; + `, + [schema, table] + ); + const suffixes = /* @__PURE__ */ new Set(); + const pattern = new RegExp(`^${table}_(\\d{8})$`); + for (const row of res.rows) { + const relname = row?.relname; + if (typeof relname !== "string") continue; + const match = relname.match(pattern); + if (!match) continue; + suffixes.add(match[1]); + } + for (const suffix of suffixes) { + await this.ensurePartitionIndexes(client, schema, table, suffix); + } + const elapsedMs = Date.now() - startedAt; + if (elapsedMs > 5e3) { + logger.warn("Ensure existing partition indexes slow", { schema, table, partitions: suffixes.size, elapsedMs }); + } + } finally { + client.release(); + } + } /** * Ensure partitions exist for the past M days and next N days. * @param {number} daysAhead - Number of days to pre-create. 
@@ -188,7 +260,7 @@ class PartitionManager { async ensurePartitions(daysAhead = 30, daysBack = 15) { const client = await dbManager.pool.connect(); try { - logger$1.info(`Starting partition check for the past ${daysBack} days and next ${daysAhead} days...`); + logger.info(`Starting partition check for the past ${daysBack} days and next ${daysAhead} days...`); console.log(`Starting partition check for the past ${daysBack} days and next ${daysAhead} days...`); const now = /* @__PURE__ */ new Date(); for (let i = -daysBack; i < daysAhead; i++) { @@ -203,7 +275,7 @@ class PartitionManager { `; const checkRes = await client.query(checkSql, [partitionName]); if (!checkRes.rows[0].exists) { - logger$1.info(`Creating partition ${partitionName} for range [${startMs}, ${endMs})`); + logger.info(`Creating partition ${partitionName} for range [${startMs}, ${endMs})`); console.log(`Creating partition ${partitionName} for range [${startMs}, ${endMs})`); const createSql = ` CREATE TABLE IF NOT EXISTS ${partitionName} @@ -212,27 +284,70 @@ class PartitionManager { `; await client.query(createSql); } + await this.ensurePartitionIndexes(client, schema, table, partitionSuffix); } - logger$1.info("Partition check completed."); + logger.info("Partition check completed."); } catch (err) { - logger$1.error("Error ensuring partitions:", err); + logger.error("Error ensuring partitions:", err); throw err; } finally { client.release(); } } + async ensurePartitionsForTimestamps(tsMsList) { + if (!Array.isArray(tsMsList) || tsMsList.length === 0) return; + const uniqueSuffixes = /* @__PURE__ */ new Set(); + for (const ts of tsMsList) { + const numericTs = typeof ts === "string" ? Number(ts) : ts; + if (!Number.isFinite(numericTs)) continue; + const date = new Date(numericTs); + if (Number.isNaN(date.getTime())) continue; + const { partitionSuffix } = this.getPartitionInfo(date); + uniqueSuffixes.add(partitionSuffix); + if (uniqueSuffixes.size >= 400) break; + } + if (uniqueSuffixes.size === 0) return; + const client = await dbManager.pool.connect(); + try { + const schema = config.db.schema; + const table = config.db.table; + for (const partitionSuffix of uniqueSuffixes) { + const yyyy = Number(partitionSuffix.slice(0, 4)); + const mm = Number(partitionSuffix.slice(4, 6)); + const dd = Number(partitionSuffix.slice(6, 8)); + if (!Number.isFinite(yyyy) || !Number.isFinite(mm) || !Number.isFinite(dd)) continue; + const targetDate = new Date(yyyy, mm - 1, dd); + if (Number.isNaN(targetDate.getTime())) continue; + const { startMs, endMs } = this.getPartitionInfo(targetDate); + const partitionName = `${schema}.${table}_${partitionSuffix}`; + const checkRes = await client.query(`SELECT to_regclass($1) as exists;`, [partitionName]); + if (!checkRes.rows[0].exists) { + logger.info(`Creating partition ${partitionName} for range [${startMs}, ${endMs})`); + await client.query(` + CREATE TABLE IF NOT EXISTS ${partitionName} + PARTITION OF ${schema}.${table} + FOR VALUES FROM (${startMs}) TO (${endMs}); + `); + } + await this.ensurePartitionIndexes(client, schema, table, partitionSuffix); + } + } finally { + client.release(); + } + } } const partitionManager = new PartitionManager(); const __filename$1 = fileURLToPath(import.meta.url); const __dirname$1 = path.dirname(__filename$1); class DatabaseInitializer { async initialize() { - logger$1.info("Starting database initialization check..."); + logger.info("Starting database initialization check..."); await this.ensureDatabaseExists(); await this.ensureSchemaAndTable(); await 
partitionManager.ensurePartitions(30); + await partitionManager.ensureIndexesForExistingPartitions(); console.log("Database initialization completed successfully."); - logger$1.info("Database initialization completed successfully."); + logger.info("Database initialization completed successfully."); } async ensureDatabaseExists() { const { host, port, user, password, database, ssl } = config.db; @@ -252,16 +367,16 @@ class DatabaseInitializer { [database] ); if (checkRes.rowCount === 0) { - logger$1.info(`Database '${database}' does not exist. Creating...`); + logger.info(`Database '${database}' does not exist. Creating...`); await client.query(`CREATE DATABASE "${database}"`); console.log(`Database '${database}' created.`); - logger$1.info(`Database '${database}' created.`); + logger.info(`Database '${database}' created.`); } else { console.log(`Database '${database}' already exists.`); - logger$1.info(`Database '${database}' already exists.`); + logger.info(`Database '${database}' already exists.`); } } catch (err) { - logger$1.error("Error ensuring database exists:", err); + logger.error("Error ensuring database exists:", err); throw err; } finally { await client.end(); @@ -281,12 +396,12 @@ class DatabaseInitializer { } const sql = fs.readFileSync(sqlPath, "utf8"); console.log(`Executing init_db.sql from ${sqlPath}...`); - logger$1.info("Executing init_db.sql..."); + logger.info("Executing init_db.sql..."); await client.query(sql); console.log("Schema and parent table initialized."); - logger$1.info("Schema and parent table initialized."); + logger.info("Schema and parent table initialized."); } catch (err) { - logger$1.error("Error initializing schema and table:", err); + logger.error("Error initializing schema and table:", err); throw err; } finally { client.release(); @@ -302,31 +417,40 @@ class OffsetTracker { add(topic, partition, offset) { const key = `${topic}-${partition}`; if (!this.partitions.has(key)) { - this.partitions.set(key, []); + this.partitions.set(key, { nextCommitOffset: null, done: /* @__PURE__ */ new Set() }); + } + const state = this.partitions.get(key); + const numericOffset = Number(offset); + if (!Number.isFinite(numericOffset)) return; + if (state.nextCommitOffset === null) { + state.nextCommitOffset = numericOffset; + } else if (numericOffset < state.nextCommitOffset) { + state.nextCommitOffset = numericOffset; } - this.partitions.get(key).push({ offset, done: false }); } // Called when a message is successfully processed // Returns the next offset to commit (if any advancement is possible), or null markDone(topic, partition, offset) { const key = `${topic}-${partition}`; - const list = this.partitions.get(key); - if (!list) return null; - const item = list.find((i) => i.offset === offset); - if (item) { - item.done = true; + const state = this.partitions.get(key); + if (!state) return null; + const numericOffset = Number(offset); + if (!Number.isFinite(numericOffset)) return null; + state.done.add(numericOffset); + if (state.nextCommitOffset === null) { + state.nextCommitOffset = numericOffset; } - let lastDoneOffset = null; - let itemsRemoved = false; - while (list.length > 0 && list[0].done) { - lastDoneOffset = list[0].offset; - list.shift(); - itemsRemoved = true; + let advanced = false; + while (state.nextCommitOffset !== null && state.done.has(state.nextCommitOffset)) { + state.done.delete(state.nextCommitOffset); + state.nextCommitOffset += 1; + advanced = true; } - if (itemsRemoved && lastDoneOffset !== null) { - return lastDoneOffset + 1; - } - 
return null; + if (!advanced) return null; + return state.nextCommitOffset; + } + clear() { + this.partitions.clear(); } } const { ConsumerGroup } = kafka; @@ -334,9 +458,35 @@ const createOneConsumer = ({ kafkaConfig, onMessage, onError, instanceIndex }) = const kafkaHost = kafkaConfig.brokers.join(","); const clientId = instanceIndex === 0 ? kafkaConfig.clientId : `${kafkaConfig.clientId}-${instanceIndex}`; const id = `${clientId}-${process.pid}-${Date.now()}`; - const maxInFlight = Number.isFinite(kafkaConfig.maxInFlight) ? kafkaConfig.maxInFlight : 50; + const maxInFlight = Number.isFinite(kafkaConfig.maxInFlight) ? kafkaConfig.maxInFlight : 5e3; + const commitIntervalMs = Number.isFinite(kafkaConfig.commitIntervalMs) ? kafkaConfig.commitIntervalMs : 200; let inFlight = 0; const tracker = new OffsetTracker(); + let pendingCommits = /* @__PURE__ */ new Map(); + let commitTimer = null; + const flushCommits = () => { + if (pendingCommits.size === 0) return; + const batch = pendingCommits; + pendingCommits = /* @__PURE__ */ new Map(); + consumer.sendOffsetCommitRequest( + Array.from(batch.values()), + (err) => { + if (err) { + for (const [k, v] of batch.entries()) { + pendingCommits.set(k, v); + } + logger.error("Kafka commit failed", { error: err?.message, count: batch.size }); + } + } + ); + }; + const scheduleCommitFlush = () => { + if (commitTimer) return; + commitTimer = setTimeout(() => { + commitTimer = null; + flushCommits(); + }, commitIntervalMs); + }; const consumer = new ConsumerGroup( { kafkaHost, @@ -367,65 +517,74 @@ const createOneConsumer = ({ kafkaConfig, onMessage, onError, instanceIndex }) = consumer.pause(); } Promise.resolve(onMessage(message)).then(() => { - const commitOffset = tracker.markDone(message.topic, message.partition, message.offset); - if (commitOffset !== null) { - consumer.sendOffsetCommitRequest([{ - topic: message.topic, - partition: message.partition, - offset: commitOffset, - metadata: "m" - }], (err) => { - if (err) { - logger$1.error("Kafka commit failed", { error: err?.message, topic: message.topic, partition: message.partition, offset: commitOffset }); - } - }); - } }).catch((error) => { - logger$1.error("Kafka message handling failed", { error: error?.message }); + logger.error("Kafka message handling failed", { error: error?.message }); if (onError) { onError(error, message); } }).finally(() => { + const commitOffset = tracker.markDone(message.topic, message.partition, message.offset); + if (commitOffset !== null) { + const key = `${message.topic}-${message.partition}`; + pendingCommits.set(key, { + topic: message.topic, + partition: message.partition, + offset: commitOffset, + metadata: "m" + }); + scheduleCommitFlush(); + } inFlight -= 1; tryResume(); }); }); consumer.on("error", (error) => { - logger$1.error("Kafka consumer error", { error: error?.message }); + logger.error("Kafka consumer error", { error: error?.message }); if (onError) { onError(error); } }); consumer.on("connect", () => { - logger$1.info(`Kafka Consumer connected`, { + logger.info(`Kafka Consumer connected`, { groupId: kafkaConfig.groupId, clientId }); }); consumer.on("rebalancing", () => { - logger$1.info(`Kafka Consumer rebalancing`, { + logger.info(`Kafka Consumer rebalancing`, { groupId: kafkaConfig.groupId, clientId }); + tracker.clear(); + pendingCommits.clear(); + if (commitTimer) { + clearTimeout(commitTimer); + commitTimer = null; + } }); consumer.on("rebalanced", () => { - logger$1.info("Kafka Consumer rebalanced", { clientId, groupId: kafkaConfig.groupId }); + 
logger.info("Kafka Consumer rebalanced", { clientId, groupId: kafkaConfig.groupId }); }); consumer.on("error", (err) => { - logger$1.error("Kafka Consumer Error", { error: err.message }); + logger.error("Kafka Consumer Error", { error: err.message }); }); consumer.on("offsetOutOfRange", (err) => { - logger$1.warn("Offset out of range", { error: err.message, topic: err.topic, partition: err.partition }); + logger.warn("Offset out of range", { error: err.message, topic: err.topic, partition: err.partition }); }); consumer.on("offsetOutOfRange", (error) => { - logger$1.warn(`Kafka Consumer offset out of range`, { + logger.warn(`Kafka Consumer offset out of range`, { error: error?.message, groupId: kafkaConfig.groupId, clientId }); }); consumer.on("close", () => { - logger$1.warn(`Kafka Consumer closed`, { + if (commitTimer) { + clearTimeout(commitTimer); + commitTimer = null; + } + flushCommits(); + logger.warn(`Kafka Consumer closed`, { groupId: kafkaConfig.groupId, clientId }); @@ -516,52 +675,23 @@ const buildRowsFromPayload = (rawPayload) => { }; return [row]; }; -const processKafkaMessage = async ({ message, dbManager: dbManager2, config: config2 }) => { - let rows; +const parseMessageToRows = (message) => { + const rawValue = message.value.toString(); + let payload; try { - const rawValue = message.value.toString(); - let payload; - try { - payload = JSON.parse(rawValue); - } catch (e) { - logger.error("JSON Parse Error", { error: e.message, rawValue }); - const error = new Error(`JSON Parse Error: ${e.message}`); - error.type = "PARSE_ERROR"; - throw error; - } - const validationResult = kafkaPayloadSchema.safeParse(payload); - if (!validationResult.success) { - logger.error("Schema Validation Failed", { - errors: validationResult.error.errors, - payload - }); - const error = new Error(`Schema Validation Failed: ${JSON.stringify(validationResult.error.errors)}`); - error.type = "VALIDATION_ERROR"; - throw error; - } - rows = buildRowsFromPayload(payload); - } catch (error) { + payload = JSON.parse(rawValue); + } catch (e) { + const error = new Error(`JSON Parse Error: ${e.message}`); + error.type = "PARSE_ERROR"; throw error; } - try { - await dbManager2.insertRows({ schema: config2.db.schema, table: config2.db.table, rows }); - } catch (error) { - error.type = "DB_ERROR"; - const sample = rows?.[0]; - error.dbContext = { - rowsLength: rows?.length || 0, - sampleRow: sample ? 
{ - guid: sample.guid, - ts_ms: sample.ts_ms, - mac: sample.mac, - device_id: sample.device_id, - room_id: sample.room_id, - current_status: sample.current_status - } : null - }; + const validationResult = kafkaPayloadSchema.safeParse(payload); + if (!validationResult.success) { + const error = new Error(`Schema Validation Failed: ${JSON.stringify(validationResult.error.errors)}`); + error.type = "VALIDATION_ERROR"; throw error; } - return rows.length; + return buildRowsFromPayload(payload); }; const createRedisClient = async (config2) => { const client = createClient({ @@ -617,7 +747,7 @@ const enqueueError = async (client, queueKey, payload) => { try { await client.rPush(queueKey, JSON.stringify(payload)); } catch (error) { - logger$1.error("Redis enqueue error failed", { error: error?.message }); + logger.error("Redis enqueue error failed", { error: error?.message }); throw error; } }; @@ -638,7 +768,7 @@ const startErrorRetryWorker = async ({ try { item = JSON.parse(raw); } catch (error) { - logger$1.error("Invalid error payload", { error: error?.message }); + logger.error("Invalid error payload", { error: error?.message }); await redisIntegration.error("Invalid error payload", { module: "redis", stack: error?.message }); continue; } @@ -646,7 +776,7 @@ const startErrorRetryWorker = async ({ try { await handler(item); } catch (error) { - logger$1.error("Retry handler failed", { error: error?.message, stack: error?.stack }); + logger.error("Retry handler failed", { error: error?.message, stack: error?.stack }); const nextPayload = { ...item, attempts: attempts + 1, @@ -670,22 +800,38 @@ class MetricCollector { kafka_pulled: 0, parse_error: 0, db_inserted: 0, - db_failed: 0 + db_failed: 0, + db_insert_count: 0, + db_insert_ms_sum: 0, + batch_flush_count: 0, + batch_flush_ms_sum: 0 }; + this.keyed = {}; } increment(metric, count = 1) { if (this.metrics.hasOwnProperty(metric)) { this.metrics[metric] += count; } } + incrementKeyed(metric, key, count = 1) { + if (!key) return; + if (!this.keyed[metric]) { + this.keyed[metric] = {}; + } + if (!Object.prototype.hasOwnProperty.call(this.keyed[metric], key)) { + this.keyed[metric][key] = 0; + } + this.keyed[metric][key] += count; + } getAndReset() { const current = { ...this.metrics }; + const keyed = JSON.parse(JSON.stringify(this.keyed)); this.reset(); - return current; + return { ...current, keyed }; } } const bootstrap = async () => { - logger$1.info("Starting application with config", { + logger.info("Starting application with config", { env: process.env.NODE_ENV, db: { host: config.db.host, @@ -707,11 +853,11 @@ const bootstrap = async () => { await dbInitializer.initialize(); const metricCollector = new MetricCollector(); cron.schedule("0 0 * * *", async () => { - logger$1.info("Running scheduled partition maintenance..."); + logger.info("Running scheduled partition maintenance..."); try { await partitionManager.ensurePartitions(30); } catch (err) { - logger$1.error("Scheduled partition maintenance failed", err); + logger.error("Scheduled partition maintenance failed", err); } }); const redisClient = await createRedisClient(config.redis); @@ -723,18 +869,20 @@ const bootstrap = async () => { redisIntegration.startHeartbeat(); cron.schedule("* * * * *", async () => { const metrics = metricCollector.getAndReset(); - const report = `[Minute Metrics] Pulled: ${metrics.kafka_pulled}, Parse Error: ${metrics.parse_error}, Inserted: ${metrics.db_inserted}, Failed: ${metrics.db_failed}`; + const flushAvgMs = metrics.batch_flush_count > 0 ? 
(metrics.batch_flush_ms_sum / metrics.batch_flush_count).toFixed(1) : "0.0"; + const dbAvgMs = metrics.db_insert_count > 0 ? (metrics.db_insert_ms_sum / metrics.db_insert_count).toFixed(1) : "0.0"; + const report = `[Minute Metrics] Pulled: ${metrics.kafka_pulled}, Parse Error: ${metrics.parse_error}, Inserted: ${metrics.db_inserted}, Failed: ${metrics.db_failed}, FlushAvgMs: ${flushAvgMs}, DbAvgMs: ${dbAvgMs}, PulledByPartition: ${JSON.stringify(metrics.keyed?.kafka_pulled_by_partition || {})}, InsertedByPartition: ${JSON.stringify(metrics.keyed?.db_inserted_by_partition || {})}, FailedByPartition: ${JSON.stringify(metrics.keyed?.db_failed_by_partition || {})}, InsertedByDay: ${JSON.stringify(metrics.keyed?.db_inserted_by_day || {})}, DbMsByDay: ${JSON.stringify(metrics.keyed?.db_insert_ms_sum_by_day || {})}`; console.log(report); - logger$1.info(report, metrics); + logger.info(report, metrics); try { await redisIntegration.info("Minute Metrics", metrics); } catch (err) { - logger$1.error("Failed to report metrics to Redis", { error: err?.message }); + logger.error("Failed to report metrics to Redis", { error: err?.message }); } }); const errorQueueKey = buildErrorQueueKey(config.redis.projectName); const handleError = async (error, message) => { - logger$1.error("Kafka processing error", { + logger.error("Kafka processing error", { error: error?.message, type: error?.type, stack: error?.stack @@ -745,7 +893,7 @@ const bootstrap = async () => { stack: error?.stack || error?.message }); } catch (redisError) { - logger$1.error("Redis error log failed", { error: redisError?.message }); + logger.error("Redis error log failed", { error: redisError?.message }); } if (message) { const messageValue = Buffer.isBuffer(message.value) ? message.value.toString("utf8") : message.value; @@ -762,57 +910,266 @@ const bootstrap = async () => { timestamp: Date.now() }); } catch (enqueueError2) { - logger$1.error("Enqueue error payload failed", { error: enqueueError2?.message }); + logger.error("Enqueue error payload failed", { error: enqueueError2?.message }); } } }; - const handleMessage = async (message) => { - if (message.topic) { - metricCollector.increment("kafka_pulled"); + const configuredBatchSize = Number.isFinite(config.kafka.batchSize) ? config.kafka.batchSize : 1e3; + const configuredBatchTimeoutMs = Number.isFinite(config.kafka.batchTimeoutMs) ? config.kafka.batchTimeoutMs : 20; + const configuredMaxInFlight = Number.isFinite(config.kafka.maxInFlight) ? config.kafka.maxInFlight : 5e3; + const BATCH_SIZE = Math.max(10, Math.min(configuredBatchSize, configuredMaxInFlight)); + const BATCH_TIMEOUT_MS = Math.max(1, configuredBatchTimeoutMs); + const commitOnAttempt = config.kafka.commitOnAttempt === true; + const batchStates = /* @__PURE__ */ new Map(); + const partitionKeyFromMessage = (message) => { + if (message?.topic !== void 0 && message?.partition !== void 0) { + return `${message.topic}-${message.partition}`; } - const messageValue = Buffer.isBuffer(message.value) ? message.value.toString("utf8") : message.value; - Buffer.isBuffer(message.key) ? message.key.toString("utf8") : message.key; - ({ - topic: message.topic, - partition: message.partition, - offset: message.offset, - valueLength: !config.kafka.logMessages && typeof messageValue === "string" ? messageValue.length : null - }); + return "retry"; + }; + const dayKeyFromTsMs = (tsMs) => { + const numeric = typeof tsMs === "string" ? 
Number(tsMs) : tsMs; + if (!Number.isFinite(numeric)) return null; + const d = new Date(numeric); + if (Number.isNaN(d.getTime())) return null; + const yyyy = d.getFullYear(); + const mm = String(d.getMonth() + 1).padStart(2, "0"); + const dd = String(d.getDate()).padStart(2, "0"); + return `${yyyy}${mm}${dd}`; + }; + const getBatchState = (key) => { + if (!batchStates.has(key)) { + batchStates.set(key, { items: [], timer: null, flushing: null }); + } + return batchStates.get(key); + }; + const isDbConnectionError = (err) => { + const code = err?.code; + if (typeof code === "string") { + const networkCodes = /* @__PURE__ */ new Set([ + "ECONNREFUSED", + "ECONNRESET", + "EPIPE", + "ETIMEDOUT", + "ENOTFOUND", + "EHOSTUNREACH", + "ENETUNREACH", + "57P03", + "08006", + "08001", + "08000", + "08003" + ]); + if (networkCodes.has(code)) return true; + } + const message = typeof err?.message === "string" ? err.message : ""; + if (!message) return false; + const lower = message.toLowerCase(); + return lower.includes("connection timeout") || lower.includes("connection terminated") || lower.includes("connection refused") || lower.includes("terminating connection") || lower.includes("econnrefused") || lower.includes("econnreset") || lower.includes("etimedout") || lower.includes("could not connect") || lower.includes("the database system is starting up") || lower.includes("no pg_hba.conf entry"); + }; + const isMissingPartitionError = (err) => err?.code === "23514" || typeof err?.message === "string" && err.message.includes("no partition of relation"); + const insertRowsWithRetry = async (rows) => { + const startedAt = Date.now(); + let attemptedPartitionFix = false; while (true) { try { - const inserted = await processKafkaMessage({ message, dbManager, config }); - metricCollector.increment("db_inserted"); + await dbManager.insertRows({ schema: config.db.schema, table: config.db.table, rows }); + metricCollector.increment("db_insert_count", 1); + metricCollector.increment("db_insert_ms_sum", Date.now() - startedAt); return; - } catch (error) { - const isDbConnectionError = error.code && ["ECONNREFUSED", "57P03", "08006", "08001", "EADDRINUSE", "ETIMEDOUT"].includes(error.code) || error.message && (error.message.includes("ECONNREFUSED") || error.message.includes("connection") || error.message.includes("terminated") || error.message.includes("EADDRINUSE") || error.message.includes("ETIMEDOUT") || error.message.includes("The server does not support SSL connections")); - if (isDbConnectionError) { - logger$1.error("Database offline. Pausing consumption for 1 minute...", { error: error.message }); - await new Promise((resolve) => setTimeout(resolve, 6e4)); - while (true) { - const isConnected = await dbManager.checkConnection(); - if (isConnected) { - logger$1.info("Database connection restored. Resuming processing..."); - break; - } - logger$1.warn("Database still offline. Waiting 1 minute..."); - await new Promise((resolve) => setTimeout(resolve, 6e4)); + } catch (err) { + if (isDbConnectionError(err)) { + logger.error("Database offline during batch insert. Retrying in 5s...", { error: err.message }); + await new Promise((r) => setTimeout(r, 5e3)); + while (!await dbManager.checkConnection()) { + logger.warn("Database still offline. 
Waiting 5s..."); + await new Promise((r) => setTimeout(r, 5e3)); } - } else { - if (error.type === "PARSE_ERROR") { - metricCollector.increment("parse_error"); - } else { - metricCollector.increment("db_failed"); - } - logger$1.error("Message processing failed (Data/Logic Error), skipping message", { - error: error?.message, - type: error?.type - }); - await handleError(error, message); - return; + continue; } + if (isMissingPartitionError(err) && !attemptedPartitionFix) { + attemptedPartitionFix = true; + try { + await partitionManager.ensurePartitionsForTimestamps(rows.map((r) => r.ts_ms)); + } catch (partitionErr) { + if (isDbConnectionError(partitionErr)) { + logger.error("Database offline during partition ensure. Retrying in 5s...", { error: partitionErr.message }); + await new Promise((r) => setTimeout(r, 5e3)); + while (!await dbManager.checkConnection()) { + logger.warn("Database still offline. Waiting 5s..."); + await new Promise((r) => setTimeout(r, 5e3)); + } + continue; + } + throw partitionErr; + } + continue; + } + throw err; } } }; + const insertRowsOnce = async (rows) => { + const startedAt = Date.now(); + await dbManager.insertRows({ schema: config.db.schema, table: config.db.table, rows }); + metricCollector.increment("db_insert_count", 1); + metricCollector.increment("db_insert_ms_sum", Date.now() - startedAt); + }; + const resolveInsertedItems = (partitionKey, items) => { + let insertedRows = 0; + for (const p of items) { + insertedRows += p.rows.length; + const dayKey = dayKeyFromTsMs(p.rows?.[0]?.ts_ms); + if (dayKey) { + metricCollector.incrementKeyed("db_inserted_by_day", dayKey, p.rows.length); + } + p.item.resolve(); + } + metricCollector.increment("db_inserted", insertedRows); + metricCollector.incrementKeyed("db_inserted_by_partition", partitionKey, insertedRows); + }; + const handleFailedItem = async (partitionKey, p, err) => { + metricCollector.increment("db_failed"); + metricCollector.incrementKeyed("db_failed_by_partition", partitionKey, 1); + const dayKey = dayKeyFromTsMs(p.rows?.[0]?.ts_ms); + if (dayKey) { + metricCollector.incrementKeyed("db_failed_by_day", dayKey, 1); + } + await handleError(err, p.item.message); + p.item.resolve(); + }; + const insertItemsDegraded = async (partitionKey, items) => { + if (items.length === 0) return; + const rows = items.flatMap((p) => p.rows); + if (commitOnAttempt) { + try { + await insertRowsOnce(rows); + resolveInsertedItems(partitionKey, items); + } catch (err) { + for (const item of items) { + await handleFailedItem(partitionKey, item, err); + } + } + return; + } + try { + await insertRowsWithRetry(rows); + resolveInsertedItems(partitionKey, items); + return; + } catch (err) { + if (items.length === 1) { + try { + await insertRowsWithRetry(items[0].rows); + resolveInsertedItems(partitionKey, items); + } catch (innerErr) { + await handleFailedItem(partitionKey, items[0], innerErr); + } + return; + } + const mid = Math.floor(items.length / 2); + await insertItemsDegraded(partitionKey, items.slice(0, mid)); + await insertItemsDegraded(partitionKey, items.slice(mid)); + } + }; + const flushBatchForKey = async (partitionKey) => { + const state = getBatchState(partitionKey); + if (state.flushing) return state.flushing; + state.flushing = (async () => { + if (state.timer) { + clearTimeout(state.timer); + state.timer = null; + } + if (state.items.length === 0) return; + const startedAt = Date.now(); + const currentBatch = state.items; + state.items = []; + const pendingDbItems = []; + const unresolvedItems = []; + try { + 
for (const item of currentBatch) { + try { + const rows = parseMessageToRows(item.message); + pendingDbItems.push({ item, rows }); + unresolvedItems.push(item); + } catch (err) { + metricCollector.increment("parse_error"); + metricCollector.incrementKeyed("parse_error_by_partition", partitionKey, 1); + logger.error("Message processing failed (Parse/Validation)", { error: err.message }); + await handleError(err, item.message); + item.resolve(); + } + } + if (pendingDbItems.length > 0) { + const firstTs = pendingDbItems[0]?.rows?.[0]?.ts_ms; + const dayKey = dayKeyFromTsMs(firstTs); + if (dayKey) { + const dayStartMs = Date.now(); + await insertItemsDegraded(partitionKey, pendingDbItems); + metricCollector.incrementKeyed("db_insert_ms_sum_by_day", dayKey, Date.now() - dayStartMs); + } else { + await insertItemsDegraded(partitionKey, pendingDbItems); + } + } + metricCollector.increment("batch_flush_count", 1); + metricCollector.increment("batch_flush_ms_sum", Date.now() - startedAt); + } catch (err) { + if (!commitOnAttempt && isDbConnectionError(err)) { + state.items = unresolvedItems.concat(state.items); + if (!state.timer) { + state.timer = setTimeout(() => { + state.timer = null; + flushBatchForKey(partitionKey); + }, 5e3); + } + return; + } + logger.error("Batch flush failed (non-network). Marking as consumed", { + error: err?.message, + partitionKey, + batchSize: currentBatch.length + }); + for (const item of unresolvedItems) { + try { + await handleError(err, item.message); + } catch { + } + item.resolve(); + } + } + })().finally(() => { + state.flushing = null; + if (state.items.length > 0) { + if (state.items.length >= BATCH_SIZE) { + flushBatchForKey(partitionKey); + } else if (!state.timer) { + state.timer = setTimeout(() => { + state.timer = null; + flushBatchForKey(partitionKey); + }, BATCH_TIMEOUT_MS); + } + } + }); + return state.flushing; + }; + const handleMessage = (message) => { + if (message.topic) { + metricCollector.increment("kafka_pulled"); + metricCollector.incrementKeyed("kafka_pulled_by_partition", `${message.topic}-${message.partition}`, 1); + } + const partitionKey = partitionKeyFromMessage(message); + const state = getBatchState(partitionKey); + return new Promise((resolve, reject) => { + state.items.push({ message, resolve, reject }); + if (state.items.length >= BATCH_SIZE) { + flushBatchForKey(partitionKey); + } else if (!state.timer) { + state.timer = setTimeout(() => { + state.timer = null; + flushBatchForKey(partitionKey); + }, BATCH_TIMEOUT_MS); + } + }); + }; const consumers = createKafkaConsumers({ kafkaConfig: config.kafka, onMessage: handleMessage, @@ -829,22 +1186,22 @@ const bootstrap = async () => { await handleMessage({ value: item.value }); } }).catch((err) => { - logger$1.error("Retry worker failed", { error: err?.message }); + logger.error("Retry worker failed", { error: err?.message }); }); const shutdown = async (signal) => { - logger$1.info(`Received ${signal}, shutting down...`); + logger.info(`Received ${signal}, shutting down...`); try { if (consumers && consumers.length > 0) { await Promise.all(consumers.map((c) => new Promise((resolve) => c.close(true, resolve)))); - logger$1.info("Kafka consumer closed", { count: consumers.length }); + logger.info("Kafka consumer closed", { count: consumers.length }); } await redisClient.quit(); - logger$1.info("Redis client closed"); + logger.info("Redis client closed"); await dbManager.close(); - logger$1.info("Database connection closed"); + logger.info("Database connection closed"); process.exit(0); 
} catch (err) { - logger$1.error("Error during shutdown", { error: err?.message }); + logger.error("Error during shutdown", { error: err?.message }); process.exit(1); } }; @@ -852,6 +1209,6 @@ const bootstrap = async () => { process.on("SIGINT", () => shutdown("SIGINT")); }; bootstrap().catch((error) => { - logger$1.error("Service bootstrap failed", { error: error?.message }); + logger.error("Service bootstrap failed", { error: error?.message }); process.exit(1); }); diff --git a/bls-onoffline-backend/openspec/changes/archive/2026-02-04-optimize-kafka-consumption/proposal.md b/bls-onoffline-backend/openspec/changes/archive/2026-02-04-optimize-kafka-consumption/proposal.md new file mode 100644 index 0000000..1ef4611 --- /dev/null +++ b/bls-onoffline-backend/openspec/changes/archive/2026-02-04-optimize-kafka-consumption/proposal.md @@ -0,0 +1,18 @@ +# Change: Optimize Kafka Consumption Performance + +## Why +Users report extremely slow Kafka consumption. The current implementation processes and inserts messages one-by-one, which creates a bottleneck at the database network round-trip time (RTT). + +## What Changes +- **New Requirement**: Implement Batch Processing for Kafka messages. +- **Refactor**: Decouple message parsing from insertion in `processor`. +- **Logic**: + - Accumulate messages in a buffer (e.g., 500ms or 500 items). + - Perform a Batch Insert into PostgreSQL. + - Implement a Row-by-Row fallback for batch failures (to isolate bad data). + - Handle DB connection errors with a retry loop at the batch level. + +## Impact +- Affected specs: `onoffline` +- Affected code: `src/index.js`, `src/processor/index.js` +- Performance: Expected 10x-100x throughput increase. diff --git a/bls-onoffline-backend/openspec/changes/archive/2026-02-04-optimize-kafka-consumption/specs/onoffline/spec.md b/bls-onoffline-backend/openspec/changes/archive/2026-02-04-optimize-kafka-consumption/specs/onoffline/spec.md new file mode 100644 index 0000000..6b0b52b --- /dev/null +++ b/bls-onoffline-backend/openspec/changes/archive/2026-02-04-optimize-kafka-consumption/specs/onoffline/spec.md @@ -0,0 +1,13 @@ +## ADDED Requirements +### Requirement: Batch consumption and writes +The system SHALL buffer Kafka messages and write them to the database in batches to improve throughput. + +#### Scenario: Batch write +- **GIVEN** multiple messages arrive within a short window (e.g., 500 messages) +- **WHEN** the buffer fills up or times out (e.g., 200ms) +- **THEN** perform a single batch database insert + +#### Scenario: Degraded fallback on write failure +- **GIVEN** a batch write fails due to a data error (not a connection error) +- **WHEN** the exception is caught +- **THEN** automatically fall back to row-by-row writes to isolate the bad data and ensure valid data is persisted diff --git a/bls-onoffline-backend/openspec/changes/archive/2026-02-04-optimize-kafka-consumption/tasks.md b/bls-onoffline-backend/openspec/changes/archive/2026-02-04-optimize-kafka-consumption/tasks.md new file mode 100644 index 0000000..54e3a35 --- /dev/null +++ b/bls-onoffline-backend/openspec/changes/archive/2026-02-04-optimize-kafka-consumption/tasks.md @@ -0,0 +1,5 @@ +## 1. Implementation
+- [ ] Refactor `src/processor/index.js` to export `parseMessageToRows` +- [ ] Implement `BatchProcessor` logic in `src/index.js` +- [ ] Update `handleMessage` to use `BatchProcessor` +- [ ] Verify performance improvement diff --git a/bls-onoffline-backend/openspec/specs/onoffline/spec.md index a3dbeb7..93ce015 100644 --- a/bls-onoffline-backend/openspec/specs/onoffline/spec.md +++ b/bls-onoffline-backend/openspec/specs/onoffline/spec.md @@ -83,3 +83,16 @@ - **WHEN** parsing the timestamp - **THEN** automatically multiply by 1000 to convert to milliseconds +### Requirement: Batch consumption and writes +The system SHALL buffer Kafka messages and write them to the database in batches to improve throughput. + +#### Scenario: Batch write +- **GIVEN** multiple messages arrive within a short window (e.g., 500 messages) +- **WHEN** the buffer fills up or times out (e.g., 200ms) +- **THEN** perform a single batch database insert + +#### Scenario: Degraded fallback on write failure +- **GIVEN** a batch write fails due to a data error (not a connection error) +- **WHEN** the exception is caught +- **THEN** automatically fall back to row-by-row writes to isolate the bad data and ensure valid data is persisted + diff --git a/bls-onoffline-backend/src/config/config.js b/bls-onoffline-backend/src/config/config.js index 8d27b2e..67fd8d3 100644 --- a/bls-onoffline-backend/src/config/config.js +++ b/bls-onoffline-backend/src/config/config.js @@ -22,11 +22,15 @@ export const config = { groupId: process.env.KAFKA_GROUP_ID || 'bls-onoffline-group', clientId: process.env.KAFKA_CLIENT_ID || 'bls-onoffline-client', consumerInstances: parseNumber(process.env.KAFKA_CONSUMER_INSTANCES, 1), - maxInFlight: parseNumber(process.env.KAFKA_MAX_IN_FLIGHT, 50), - fetchMaxBytes: parseNumber(process.env.KAFKA_FETCH_MAX_BYTES, 10 * 1024 * 1024), - fetchMinBytes: parseNumber(process.env.KAFKA_FETCH_MIN_BYTES, 1), + maxInFlight: parseNumber(process.env.KAFKA_MAX_IN_FLIGHT, 20000), + fetchMaxBytes: parseNumber(process.env.KAFKA_FETCH_MAX_BYTES, 50 * 1024 * 1024), + fetchMinBytes: parseNumber(process.env.KAFKA_FETCH_MIN_BYTES, 256 * 1024), fetchMaxWaitMs: parseNumber(process.env.KAFKA_FETCH_MAX_WAIT_MS, 100), autoCommitIntervalMs: parseNumber(process.env.KAFKA_AUTO_COMMIT_INTERVAL_MS, 5000), + commitIntervalMs: parseNumber(process.env.KAFKA_COMMIT_INTERVAL_MS, 200), + commitOnAttempt: process.env.KAFKA_COMMIT_ON_ATTEMPT === 'true', + batchSize: parseNumber(process.env.KAFKA_BATCH_SIZE, 5000), + batchTimeoutMs: parseNumber(process.env.KAFKA_BATCH_TIMEOUT_MS, 50), logMessages: process.env.KAFKA_LOG_MESSAGES === 'true', sasl: process.env.KAFKA_SASL_USERNAME && process.env.KAFKA_SASL_PASSWORD ? { mechanism: process.env.KAFKA_SASL_MECHANISM || 'plain', diff --git a/bls-onoffline-backend/src/db/databaseManager.js b/bls-onoffline-backend/src/db/databaseManager.js index 1b9a993..9c807de 100644 --- a/bls-onoffline-backend/src/db/databaseManager.js +++ b/bls-onoffline-backend/src/db/databaseManager.js @@ -35,22 +35,27 @@ export class DatabaseManager { if (!rows || rows.length === 0) { return; } - const values = []; - const placeholders = rows.map((row, rowIndex) => { - const offset = rowIndex * columns.length; - columns.forEach((column) => { - values.push(row[column] ?? null); - }); - const params = columns.map((_, columnIndex) => `$${offset + columnIndex + 1}`); - return `(${params.join(', ')})`; - }); const statement = ` INSERT INTO ${schema}.${table} (${columns.join(', ')}) - VALUES ${placeholders.join(', ')} + SELECT * + FROM UNNEST( + $1::text[], + $2::int8[], + $3::int8[], + $4::int2[], + $5::text[], + $6::text[], + $7::text[], + $8::text[], + $9::text[], + $10::text[], + $11::text[] + ) ON CONFLICT DO NOTHING `; try { - await this.pool.query(statement, values); + const params = columns.map((column) => rows.map((row) => row[column] ??
null)); + await this.pool.query(statement, params); } catch (error) { logger.error('Database insert failed', { error: error?.message, diff --git a/bls-onoffline-backend/src/db/initializer.js b/bls-onoffline-backend/src/db/initializer.js index 1f83cc2..9f2b44c 100644 --- a/bls-onoffline-backend/src/db/initializer.js +++ b/bls-onoffline-backend/src/db/initializer.js @@ -23,6 +23,7 @@ class DatabaseInitializer { // 3. Ensure Partitions for the next month await partitionManager.ensurePartitions(30); + await partitionManager.ensureIndexesForExistingPartitions(); console.log('Database initialization completed successfully.'); logger.info('Database initialization completed successfully.'); diff --git a/bls-onoffline-backend/src/db/partitionManager.js b/bls-onoffline-backend/src/db/partitionManager.js index 0b57fae..c55d819 100644 --- a/bls-onoffline-backend/src/db/partitionManager.js +++ b/bls-onoffline-backend/src/db/partitionManager.js @@ -26,6 +26,75 @@ class PartitionManager { return { startMs, endMs, partitionSuffix }; } + async ensurePartitionIndexes(client, schema, table, partitionSuffix) { + const startedAt = Date.now(); + const partitionName = `${schema}.${table}_${partitionSuffix}`; + const indexBase = `${table}_${partitionSuffix}`; + + const indexSpecs = [ + { name: `idx_${indexBase}_ts`, column: 'ts_ms' }, + { name: `idx_${indexBase}_hid`, column: 'hotel_id' }, + { name: `idx_${indexBase}_mac`, column: 'mac' }, + { name: `idx_${indexBase}_did`, column: 'device_id' }, + { name: `idx_${indexBase}_rid`, column: 'room_id' }, + { name: `idx_${indexBase}_cs`, column: 'current_status' } + ]; + + for (const spec of indexSpecs) { + await client.query(`CREATE INDEX IF NOT EXISTS ${spec.name} ON ${partitionName} (${spec.column});`); + } + + await client.query(`ANALYZE ${partitionName};`); + + const elapsedMs = Date.now() - startedAt; + if (elapsedMs > 1000) { + logger.warn(`Partition index ensure slow`, { partitionName, elapsedMs }); + } + } + + async ensureIndexesForExistingPartitions() { + const startedAt = Date.now(); + const client = await dbManager.pool.connect(); + try { + const schema = config.db.schema; + const table = config.db.table; + + const res = await client.query( + ` + SELECT c.relname AS relname + FROM pg_inherits i + JOIN pg_class p ON i.inhparent = p.oid + JOIN pg_namespace pn ON pn.oid = p.relnamespace + JOIN pg_class c ON i.inhrelid = c.oid + WHERE pn.nspname = $1 AND p.relname = $2 + ORDER BY c.relname; + `, + [schema, table] + ); + + const suffixes = new Set(); + const pattern = new RegExp(`^${table}_(\\d{8})$`); + for (const row of res.rows) { + const relname = row?.relname; + if (typeof relname !== 'string') continue; + const match = relname.match(pattern); + if (!match) continue; + suffixes.add(match[1]); + } + + for (const suffix of suffixes) { + await this.ensurePartitionIndexes(client, schema, table, suffix); + } + + const elapsedMs = Date.now() - startedAt; + if (elapsedMs > 5000) { + logger.warn('Ensure existing partition indexes slow', { schema, table, partitions: suffixes.size, elapsedMs }); + } + } finally { + client.release(); + } + } + /** * Ensure partitions exist for the past M days and next N days. * @param {number} daysAhead - Number of days to pre-create. 
@@ -63,6 +132,8 @@ class PartitionManager { `; await client.query(createSql); } + + await this.ensurePartitionIndexes(client, schema, table, partitionSuffix); } logger.info('Partition check completed.'); } catch (err) { @@ -72,6 +143,55 @@ class PartitionManager { client.release(); } } + + async ensurePartitionsForTimestamps(tsMsList) { + if (!Array.isArray(tsMsList) || tsMsList.length === 0) return; + + const uniqueSuffixes = new Set(); + for (const ts of tsMsList) { + const numericTs = typeof ts === 'string' ? Number(ts) : ts; + if (!Number.isFinite(numericTs)) continue; + const date = new Date(numericTs); + if (Number.isNaN(date.getTime())) continue; + const { partitionSuffix } = this.getPartitionInfo(date); + uniqueSuffixes.add(partitionSuffix); + if (uniqueSuffixes.size >= 400) break; + } + + if (uniqueSuffixes.size === 0) return; + + const client = await dbManager.pool.connect(); + try { + const schema = config.db.schema; + const table = config.db.table; + + for (const partitionSuffix of uniqueSuffixes) { + const yyyy = Number(partitionSuffix.slice(0, 4)); + const mm = Number(partitionSuffix.slice(4, 6)); + const dd = Number(partitionSuffix.slice(6, 8)); + if (!Number.isFinite(yyyy) || !Number.isFinite(mm) || !Number.isFinite(dd)) continue; + const targetDate = new Date(yyyy, mm - 1, dd); + if (Number.isNaN(targetDate.getTime())) continue; + + const { startMs, endMs } = this.getPartitionInfo(targetDate); + const partitionName = `${schema}.${table}_${partitionSuffix}`; + + const checkRes = await client.query(`SELECT to_regclass($1) as exists;`, [partitionName]); + if (!checkRes.rows[0].exists) { + logger.info(`Creating partition ${partitionName} for range [${startMs}, ${endMs})`); + await client.query(` + CREATE TABLE IF NOT EXISTS ${partitionName} + PARTITION OF ${schema}.${table} + FOR VALUES FROM (${startMs}) TO (${endMs}); + `); + } + + await this.ensurePartitionIndexes(client, schema, table, partitionSuffix); + } + } finally { + client.release(); + } + } } export default new PartitionManager(); diff --git a/bls-onoffline-backend/src/index.js b/bls-onoffline-backend/src/index.js index da1b020..0964284 100644 --- a/bls-onoffline-backend/src/index.js +++ b/bls-onoffline-backend/src/index.js @@ -4,7 +4,7 @@ import dbManager from './db/databaseManager.js'; import dbInitializer from './db/initializer.js'; import partitionManager from './db/partitionManager.js'; import { createKafkaConsumers } from './kafka/consumer.js'; -import { processKafkaMessage } from './processor/index.js'; +import { parseMessageToRows } from './processor/index.js'; import { createRedisClient } from './redis/redisClient.js'; import { RedisIntegration } from './redis/redisIntegration.js'; import { buildErrorQueueKey, enqueueError, startErrorRetryWorker } from './redis/errorQueue.js'; @@ -76,7 +76,9 @@ const bootstrap = async () => { // 1.1 Setup Metric Reporting Cron Job (Every minute) cron.schedule('* * * * *', async () => { const metrics = metricCollector.getAndReset(); - const report = `[Minute Metrics] Pulled: ${metrics.kafka_pulled}, Parse Error: ${metrics.parse_error}, Inserted: ${metrics.db_inserted}, Failed: ${metrics.db_failed}`; + const flushAvgMs = metrics.batch_flush_count > 0 ? (metrics.batch_flush_ms_sum / metrics.batch_flush_count).toFixed(1) : '0.0'; + const dbAvgMs = metrics.db_insert_count > 0 ? 
(metrics.db_insert_ms_sum / metrics.db_insert_count).toFixed(1) : '0.0'; + const report = `[Minute Metrics] Pulled: ${metrics.kafka_pulled}, Parse Error: ${metrics.parse_error}, Inserted: ${metrics.db_inserted}, Failed: ${metrics.db_failed}, FlushAvgMs: ${flushAvgMs}, DbAvgMs: ${dbAvgMs}, PulledByPartition: ${JSON.stringify(metrics.keyed?.kafka_pulled_by_partition || {})}, InsertedByPartition: ${JSON.stringify(metrics.keyed?.db_inserted_by_partition || {})}, FailedByPartition: ${JSON.stringify(metrics.keyed?.db_failed_by_partition || {})}, InsertedByDay: ${JSON.stringify(metrics.keyed?.db_inserted_by_day || {})}, DbMsByDay: ${JSON.stringify(metrics.keyed?.db_insert_ms_sum_by_day || {})}`; console.log(report); logger.info(report, metrics); @@ -125,87 +127,318 @@ const bootstrap = async () => { } }; - const handleMessage = async (message) => { + const configuredBatchSize = Number.isFinite(config.kafka.batchSize) ? config.kafka.batchSize : 1000; + const configuredBatchTimeoutMs = Number.isFinite(config.kafka.batchTimeoutMs) ? config.kafka.batchTimeoutMs : 20; + const configuredMaxInFlight = Number.isFinite(config.kafka.maxInFlight) ? config.kafka.maxInFlight : 5000; + + const BATCH_SIZE = Math.max(10, Math.min(configuredBatchSize, configuredMaxInFlight)); + const BATCH_TIMEOUT_MS = Math.max(1, configuredBatchTimeoutMs); + const commitOnAttempt = config.kafka.commitOnAttempt === true; + + const batchStates = new Map(); + + const partitionKeyFromMessage = (message) => { + if (message?.topic !== undefined && message?.partition !== undefined) { + return `${message.topic}-${message.partition}`; + } + return 'retry'; + }; + + const dayKeyFromTsMs = (tsMs) => { + const numeric = typeof tsMs === 'string' ? Number(tsMs) : tsMs; + if (!Number.isFinite(numeric)) return null; + const d = new Date(numeric); + if (Number.isNaN(d.getTime())) return null; + const yyyy = d.getFullYear(); + const mm = String(d.getMonth() + 1).padStart(2, '0'); + const dd = String(d.getDate()).padStart(2, '0'); + return `${yyyy}${mm}${dd}`; + }; + + const getBatchState = (key) => { + if (!batchStates.has(key)) { + batchStates.set(key, { items: [], timer: null, flushing: null }); + } + return batchStates.get(key); + }; + + const isDbConnectionError = (err) => { + const code = err?.code; + if (typeof code === 'string') { + const networkCodes = new Set([ + 'ECONNREFUSED', + 'ECONNRESET', + 'EPIPE', + 'ETIMEDOUT', + 'ENOTFOUND', + 'EHOSTUNREACH', + 'ENETUNREACH', + '57P03', + '08006', + '08001', + '08000', + '08003' + ]); + if (networkCodes.has(code)) return true; + } + + const message = typeof err?.message === 'string' ? 
err.message : ''; + if (!message) return false; + const lower = message.toLowerCase(); + return ( + lower.includes('connection timeout') || + lower.includes('connection terminated') || + lower.includes('connection refused') || + lower.includes('terminating connection') || + lower.includes('econnrefused') || + lower.includes('econnreset') || + lower.includes('etimedout') || + lower.includes('could not connect') || + lower.includes('the database system is starting up') || + lower.includes('no pg_hba.conf entry') + ); + }; + + const isMissingPartitionError = (err) => + err?.code === '23514' || + (typeof err?.message === 'string' && err.message.includes('no partition of relation')); + + const insertRowsWithRetry = async (rows) => { + const startedAt = Date.now(); + let attemptedPartitionFix = false; + while (true) { + try { + await dbManager.insertRows({ schema: config.db.schema, table: config.db.table, rows }); + metricCollector.increment('db_insert_count', 1); + metricCollector.increment('db_insert_ms_sum', Date.now() - startedAt); + return; + } catch (err) { + if (isDbConnectionError(err)) { + logger.error('Database offline during batch insert. Retrying in 5s...', { error: err.message }); + await new Promise(r => setTimeout(r, 5000)); + while (!(await dbManager.checkConnection())) { + logger.warn('Database still offline. Waiting 5s...'); + await new Promise(r => setTimeout(r, 5000)); + } + continue; + } + if (isMissingPartitionError(err) && !attemptedPartitionFix) { + attemptedPartitionFix = true; + try { + await partitionManager.ensurePartitionsForTimestamps(rows.map(r => r.ts_ms)); + } catch (partitionErr) { + if (isDbConnectionError(partitionErr)) { + logger.error('Database offline during partition ensure. Retrying in 5s...', { error: partitionErr.message }); + await new Promise(r => setTimeout(r, 5000)); + while (!(await dbManager.checkConnection())) { + logger.warn('Database still offline. 
Waiting 5s...'); + await new Promise(r => setTimeout(r, 5000)); + } + continue; + } + throw partitionErr; + } + continue; + } + throw err; + } + } + }; + + const insertRowsOnce = async (rows) => { + const startedAt = Date.now(); + await dbManager.insertRows({ schema: config.db.schema, table: config.db.table, rows }); + metricCollector.increment('db_insert_count', 1); + metricCollector.increment('db_insert_ms_sum', Date.now() - startedAt); + }; + + const resolveInsertedItems = (partitionKey, items) => { + let insertedRows = 0; + for (const p of items) { + insertedRows += p.rows.length; + const dayKey = dayKeyFromTsMs(p.rows?.[0]?.ts_ms); + if (dayKey) { + metricCollector.incrementKeyed('db_inserted_by_day', dayKey, p.rows.length); + } + p.item.resolve(); + } + metricCollector.increment('db_inserted', insertedRows); + metricCollector.incrementKeyed('db_inserted_by_partition', partitionKey, insertedRows); + }; + + const handleFailedItem = async (partitionKey, p, err) => { + metricCollector.increment('db_failed'); + metricCollector.incrementKeyed('db_failed_by_partition', partitionKey, 1); + const dayKey = dayKeyFromTsMs(p.rows?.[0]?.ts_ms); + if (dayKey) { + metricCollector.incrementKeyed('db_failed_by_day', dayKey, 1); + } + await handleError(err, p.item.message); + p.item.resolve(); + }; + + const insertItemsDegraded = async (partitionKey, items) => { + if (items.length === 0) return; + const rows = items.flatMap(p => p.rows); + if (commitOnAttempt) { + try { + await insertRowsOnce(rows); + resolveInsertedItems(partitionKey, items); + } catch (err) { + for (const item of items) { + await handleFailedItem(partitionKey, item, err); + } + } + return; + } + try { + await insertRowsWithRetry(rows); + resolveInsertedItems(partitionKey, items); + return; + } catch (err) { + if (items.length === 1) { + try { + await insertRowsWithRetry(items[0].rows); + resolveInsertedItems(partitionKey, items); + } catch (innerErr) { + await handleFailedItem(partitionKey, items[0], innerErr); + } + return; + } + const mid = Math.floor(items.length / 2); + await insertItemsDegraded(partitionKey, items.slice(0, mid)); + await insertItemsDegraded(partitionKey, items.slice(mid)); + } + }; + + const flushBatchForKey = async (partitionKey) => { + const state = getBatchState(partitionKey); + if (state.flushing) return state.flushing; + + state.flushing = (async () => { + if (state.timer) { + clearTimeout(state.timer); + state.timer = null; + } + + if (state.items.length === 0) return; + + const startedAt = Date.now(); + const currentBatch = state.items; + state.items = []; + + const pendingDbItems = []; + const unresolvedItems = []; + + try { + for (const item of currentBatch) { + try { + const rows = parseMessageToRows(item.message); + pendingDbItems.push({ item, rows }); + unresolvedItems.push(item); + } catch (err) { + metricCollector.increment('parse_error'); + metricCollector.incrementKeyed('parse_error_by_partition', partitionKey, 1); + logger.error('Message processing failed (Parse/Validation)', { error: err.message }); + await handleError(err, item.message); + item.resolve(); + } + } + + if (pendingDbItems.length > 0) { + const firstTs = pendingDbItems[0]?.rows?.[0]?.ts_ms; + const dayKey = dayKeyFromTsMs(firstTs); + if (dayKey) { + const dayStartMs = Date.now(); + await insertItemsDegraded(partitionKey, pendingDbItems); + metricCollector.incrementKeyed('db_insert_ms_sum_by_day', dayKey, Date.now() - dayStartMs); + } else { + await insertItemsDegraded(partitionKey, pendingDbItems); + } + } + + 
metricCollector.increment('batch_flush_count', 1); + metricCollector.increment('batch_flush_ms_sum', Date.now() - startedAt); + } catch (err) { + if (!commitOnAttempt && isDbConnectionError(err)) { + state.items = unresolvedItems.concat(state.items); + if (!state.timer) { + state.timer = setTimeout(() => { + state.timer = null; + flushBatchForKey(partitionKey); + }, 5000); + } + return; + } + + logger.error('Batch flush failed (non-network). Marking as consumed', { + error: err?.message, + partitionKey, + batchSize: currentBatch.length + }); + + for (const item of unresolvedItems) { + try { + await handleError(err, item.message); + } catch {} + item.resolve(); + } + } + })().finally(() => { + state.flushing = null; + if (state.items.length > 0) { + if (state.items.length >= BATCH_SIZE) { + flushBatchForKey(partitionKey); + } else if (!state.timer) { + state.timer = setTimeout(() => { + state.timer = null; + flushBatchForKey(partitionKey); + }, BATCH_TIMEOUT_MS); + } + } + }); + + return state.flushing; + }; + + const handleMessage = (message) => { if (message.topic) { metricCollector.increment('kafka_pulled'); + metricCollector.incrementKeyed('kafka_pulled_by_partition', `${message.topic}-${message.partition}`, 1); } - const messageValue = Buffer.isBuffer(message.value) - ? message.value.toString('utf8') - : message.value; - const messageKey = Buffer.isBuffer(message.key) - ? message.key.toString('utf8') - : message.key; + // const messageValue = Buffer.isBuffer(message.value) + // ? message.value.toString('utf8') + // : message.value; + // const messageKey = Buffer.isBuffer(message.key) + // ? message.key.toString('utf8') + // : message.key; - const logDetails = { - topic: message.topic, - partition: message.partition, - offset: message.offset, - key: messageKey, - value: config.kafka.logMessages ? messageValue : undefined, - valueLength: !config.kafka.logMessages && typeof messageValue === 'string' ? messageValue.length : null - }; + // const logDetails = { + // topic: message.topic, + // partition: message.partition, + // offset: message.offset, + // key: messageKey, + // value: config.kafka.logMessages ? messageValue : undefined, + // valueLength: !config.kafka.logMessages && typeof messageValue === 'string' ? messageValue.length : null + // }; // logger.info('Kafka message received', logDetails); - while (true) { - try { - const inserted = await processKafkaMessage({ message, dbManager, config }); - metricCollector.increment('db_inserted'); - // logger.info('Kafka message processed', { inserted }); - return; // Success, allowing commit - } catch (error) { - // Identify DB connection errors - const isDbConnectionError = - (error.code && ['ECONNREFUSED', '57P03', '08006', '08001', 'EADDRINUSE', 'ETIMEDOUT'].includes(error.code)) || - (error.message && ( - error.message.includes('ECONNREFUSED') || - error.message.includes('connection') || - error.message.includes('terminated') || - error.message.includes('EADDRINUSE') || - error.message.includes('ETIMEDOUT') || - error.message.includes('The server does not support SSL connections') // Possible if DB restarts without SSL - )); + const partitionKey = partitionKeyFromMessage(message); + const state = getBatchState(partitionKey); - if (isDbConnectionError) { - logger.error('Database offline. Pausing consumption for 1 minute...', { error: error.message }); - // metricCollector.increment('db_failed'); // Maybe not count as fail since we retry? User didn't specify. 
- - // Wait 1 minute before checking - await new Promise(resolve => setTimeout(resolve, 60000)); - - // Check connection loop - while (true) { - const isConnected = await dbManager.checkConnection(); - if (isConnected) { - logger.info('Database connection restored. Resuming processing...'); - break; // Break check loop to retry processing - } - logger.warn('Database still offline. Waiting 1 minute...'); - await new Promise(resolve => setTimeout(resolve, 60000)); - } - } else { - // Non-connection error (Data error, Parse error, etc.) - if (error.type === 'PARSE_ERROR') { - metricCollector.increment('parse_error'); - } else { - metricCollector.increment('db_failed'); - } - - logger.error('Message processing failed (Data/Logic Error), skipping message', { - error: error?.message, - type: error?.type - }); - - // Enqueue to error queue - await handleError(error, message); - - // For non-connection errors, we must skip this message and commit the offset - // so we don't get stuck in an infinite retry loop. - return; - } + return new Promise((resolve, reject) => { + state.items.push({ message, resolve, reject }); + if (state.items.length >= BATCH_SIZE) { + flushBatchForKey(partitionKey); + } else if (!state.timer) { + state.timer = setTimeout(() => { + state.timer = null; + flushBatchForKey(partitionKey); + }, BATCH_TIMEOUT_MS); } - } + }); }; const consumers = createKafkaConsumers({ diff --git a/bls-onoffline-backend/src/kafka/consumer.js b/bls-onoffline-backend/src/kafka/consumer.js index debc781..4c14cc3 100644 --- a/bls-onoffline-backend/src/kafka/consumer.js +++ b/bls-onoffline-backend/src/kafka/consumer.js @@ -9,10 +9,39 @@ const createOneConsumer = ({ kafkaConfig, onMessage, onError, instanceIndex }) = const kafkaHost = kafkaConfig.brokers.join(','); const clientId = instanceIndex === 0 ? kafkaConfig.clientId : `${kafkaConfig.clientId}-${instanceIndex}`; const id = `${clientId}-${process.pid}-${Date.now()}`; - const maxInFlight = Number.isFinite(kafkaConfig.maxInFlight) ? kafkaConfig.maxInFlight : 50; + const maxInFlight = Number.isFinite(kafkaConfig.maxInFlight) ? kafkaConfig.maxInFlight : 5000; + const commitIntervalMs = Number.isFinite(kafkaConfig.commitIntervalMs) ? 
kafkaConfig.commitIntervalMs : 200; let inFlight = 0; const tracker = new OffsetTracker(); + let pendingCommits = new Map(); // key: `${topic}-${partition}` -> { topic, partition, offset } + let commitTimer = null; + + const flushCommits = () => { + if (pendingCommits.size === 0) return; + const batch = pendingCommits; + pendingCommits = new Map(); + + consumer.sendOffsetCommitRequest( + Array.from(batch.values()), + (err) => { + if (err) { + for (const [k, v] of batch.entries()) { + pendingCommits.set(k, v); + } + logger.error('Kafka commit failed', { error: err?.message, count: batch.size }); + } + } + ); + }; + + const scheduleCommitFlush = () => { + if (commitTimer) return; + commitTimer = setTimeout(() => { + commitTimer = null; + flushCommits(); + }, commitIntervalMs); + }; const consumer = new ConsumerGroup( { @@ -47,23 +76,7 @@ const createOneConsumer = ({ kafkaConfig, onMessage, onError, instanceIndex }) = consumer.pause(); } Promise.resolve(onMessage(message)) - .then(() => { - // Mark message as done and check if we can commit - const commitOffset = tracker.markDone(message.topic, message.partition, message.offset); - - if (commitOffset !== null) { - consumer.sendOffsetCommitRequest([{ - topic: message.topic, - partition: message.partition, - offset: commitOffset, - metadata: 'm' - }], (err) => { - if (err) { - logger.error('Kafka commit failed', { error: err?.message, topic: message.topic, partition: message.partition, offset: commitOffset }); - } - }); - } - }) + .then(() => {}) .catch((error) => { logger.error('Kafka message handling failed', { error: error?.message }); if (onError) { @@ -71,6 +84,17 @@ const createOneConsumer = ({ kafkaConfig, onMessage, onError, instanceIndex }) = } }) .finally(() => { + const commitOffset = tracker.markDone(message.topic, message.partition, message.offset); + if (commitOffset !== null) { + const key = `${message.topic}-${message.partition}`; + pendingCommits.set(key, { + topic: message.topic, + partition: message.partition, + offset: commitOffset, + metadata: 'm' + }); + scheduleCommitFlush(); + } inFlight -= 1; tryResume(); }); @@ -95,6 +119,12 @@ const createOneConsumer = ({ kafkaConfig, onMessage, onError, instanceIndex }) = groupId: kafkaConfig.groupId, clientId: clientId }); + tracker.clear(); + pendingCommits.clear(); + if (commitTimer) { + clearTimeout(commitTimer); + commitTimer = null; + } }); consumer.on('rebalanced', () => { @@ -119,6 +149,11 @@ const createOneConsumer = ({ kafkaConfig, onMessage, onError, instanceIndex }) = }); consumer.on('close', () => { + if (commitTimer) { + clearTimeout(commitTimer); + commitTimer = null; + } + flushCommits(); logger.warn(`Kafka Consumer closed`, { groupId: kafkaConfig.groupId, clientId: clientId diff --git a/bls-onoffline-backend/src/kafka/offsetTracker.js b/bls-onoffline-backend/src/kafka/offsetTracker.js index c4ee32c..7ba557c 100644 --- a/bls-onoffline-backend/src/kafka/offsetTracker.js +++ b/bls-onoffline-backend/src/kafka/offsetTracker.js @@ -1,6 +1,6 @@ export class OffsetTracker { constructor() { - // Map> + // Map }> this.partitions = new Map(); } @@ -8,38 +8,46 @@ export class OffsetTracker { add(topic, partition, offset) { const key = `${topic}-${partition}`; if (!this.partitions.has(key)) { - this.partitions.set(key, []); + this.partitions.set(key, { nextCommitOffset: null, done: new Set() }); + } + const state = this.partitions.get(key); + const numericOffset = Number(offset); + if (!Number.isFinite(numericOffset)) return; + if (state.nextCommitOffset === null) { + 
state.nextCommitOffset = numericOffset; + } else if (numericOffset < state.nextCommitOffset) { + state.nextCommitOffset = numericOffset; } - this.partitions.get(key).push({ offset, done: false }); } // Called when a message is successfully processed // Returns the next offset to commit (if any advancement is possible), or null markDone(topic, partition, offset) { const key = `${topic}-${partition}`; - const list = this.partitions.get(key); - if (!list) return null; + const state = this.partitions.get(key); + if (!state) return null; - const item = list.find(i => i.offset === offset); - if (item) { - item.done = true; + const numericOffset = Number(offset); + if (!Number.isFinite(numericOffset)) return null; + + state.done.add(numericOffset); + + if (state.nextCommitOffset === null) { + state.nextCommitOffset = numericOffset; } - // Find the highest continuous committed offset - // We can remove items from the front as long as they are done - let lastDoneOffset = null; - let itemsRemoved = false; - - while (list.length > 0 && list[0].done) { - lastDoneOffset = list[0].offset; - list.shift(); - itemsRemoved = true; + let advanced = false; + while (state.nextCommitOffset !== null && state.done.has(state.nextCommitOffset)) { + state.done.delete(state.nextCommitOffset); + state.nextCommitOffset += 1; + advanced = true; } - if (itemsRemoved && lastDoneOffset !== null) { - return lastDoneOffset + 1; // Kafka expects the *next* offset to fetch - } - - return null; + if (!advanced) return null; + return state.nextCommitOffset; + } + + clear() { + this.partitions.clear(); } } diff --git a/bls-onoffline-backend/src/processor/index.js b/bls-onoffline-backend/src/processor/index.js index 5224d75..1fb6f69 100644 --- a/bls-onoffline-backend/src/processor/index.js +++ b/bls-onoffline-backend/src/processor/index.js @@ -74,37 +74,36 @@ export const buildRowsFromPayload = (rawPayload) => { return [row]; }; +export const parseMessageToRows = (message) => { + const rawValue = message.value.toString(); + // logger.info('Processing message', { offset: message.offset, rawValuePreview: rawValue.substring(0, 100) }); + + let payload; + try { + payload = JSON.parse(rawValue); + } catch (e) { + const error = new Error(`JSON Parse Error: ${e.message}`); + error.type = 'PARSE_ERROR'; + throw error; + } + + // logger.info('Payload parsed', { payload }); + + const validationResult = kafkaPayloadSchema.safeParse(payload); + + if (!validationResult.success) { + const error = new Error(`Schema Validation Failed: ${JSON.stringify(validationResult.error.errors)}`); + error.type = 'VALIDATION_ERROR'; + throw error; + } + + return buildRowsFromPayload(payload); +}; + export const processKafkaMessage = async ({ message, dbManager, config }) => { let rows; try { - const rawValue = message.value.toString(); - // logger.info('Processing message', { offset: message.offset, rawValuePreview: rawValue.substring(0, 100) }); - - let payload; - try { - payload = JSON.parse(rawValue); - } catch (e) { - logger.error('JSON Parse Error', { error: e.message, rawValue }); - const error = new Error(`JSON Parse Error: ${e.message}`); - error.type = 'PARSE_ERROR'; - throw error; - } - - // logger.info('Payload parsed', { payload }); - - const validationResult = kafkaPayloadSchema.safeParse(payload); - - if (!validationResult.success) { - logger.error('Schema Validation Failed', { - errors: validationResult.error.errors, - payload - }); - const error = new Error(`Schema Validation Failed: ${JSON.stringify(validationResult.error.errors)}`); - 
error.type = 'VALIDATION_ERROR'; - throw error; - } - - rows = buildRowsFromPayload(payload); + rows = parseMessageToRows(message); } catch (error) { throw error; } diff --git a/bls-onoffline-backend/src/utils/logger.js b/bls-onoffline-backend/src/utils/logger.js index 5a60c4b..a671e5a 100644 --- a/bls-onoffline-backend/src/utils/logger.js +++ b/bls-onoffline-backend/src/utils/logger.js @@ -14,5 +14,8 @@ export const logger = { }, error(message, context) { process.stderr.write(`${format('error', message, context)}\n`); + }, + warn(message, context) { + process.stderr.write(`${format('warn', message, context)}\n`); } }; diff --git a/bls-onoffline-backend/src/utils/metricCollector.js b/bls-onoffline-backend/src/utils/metricCollector.js index 29ac3cc..dc5b3af 100644 --- a/bls-onoffline-backend/src/utils/metricCollector.js +++ b/bls-onoffline-backend/src/utils/metricCollector.js @@ -8,8 +8,13 @@ export class MetricCollector { kafka_pulled: 0, parse_error: 0, db_inserted: 0, - db_failed: 0 + db_failed: 0, + db_insert_count: 0, + db_insert_ms_sum: 0, + batch_flush_count: 0, + batch_flush_ms_sum: 0 }; + this.keyed = {}; } increment(metric, count = 1) { @@ -18,9 +23,21 @@ export class MetricCollector { } } + incrementKeyed(metric, key, count = 1) { + if (!key) return; + if (!this.keyed[metric]) { + this.keyed[metric] = {}; + } + if (!Object.prototype.hasOwnProperty.call(this.keyed[metric], key)) { + this.keyed[metric][key] = 0; + } + this.keyed[metric][key] += count; + } + getAndReset() { const current = { ...this.metrics }; + const keyed = JSON.parse(JSON.stringify(this.keyed)); this.reset(); - return current; + return { ...current, keyed }; } } diff --git a/bls-onoffline-backend/tests/processor.test.js b/bls-onoffline-backend/tests/processor.test.js index c8b4d47..a19fbfa 100644 --- a/bls-onoffline-backend/tests/processor.test.js +++ b/bls-onoffline-backend/tests/processor.test.js @@ -16,7 +16,7 @@ describe('Processor Logic', () => { it('should validate required fields', () => { expect(() => buildRowsFromPayload({})).toThrow(); - expect(() => buildRowsFromPayload({ ...basePayload, UnixTime: undefined })).toThrow(); + expect(() => buildRowsFromPayload({ ...basePayload, HotelCode: undefined })).toThrow(); }); it('should use current_status from payload for non-reboot data', () => {
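
For reference, a minimal sketch of the commit contract introduced above: offsets may finish processing out of order, but OffsetTracker.markDone() only ever returns the next contiguous offset to commit, so the batched sendOffsetCommitRequest never advances past an unprocessed message. The snippet below is an illustration only and is not part of the patch; the import path is an assumption (relative to bls-onoffline-backend/).

// Illustration only (not part of the patch): expected OffsetTracker behaviour.
// Import path assumed relative to bls-onoffline-backend/.
import { OffsetTracker } from './src/kafka/offsetTracker.js';

const tracker = new OffsetTracker();

// Three in-flight messages from topic 't', partition 0.
tracker.add('t', 0, 10);
tracker.add('t', 0, 11);
tracker.add('t', 0, 12);

console.log(tracker.markDone('t', 0, 11)); // null -> offset 10 still in flight, nothing committable yet
console.log(tracker.markDone('t', 0, 10)); // 12   -> 10 and 11 are now contiguous; commit the next offset to fetch
console.log(tracker.markDone('t', 0, 12)); // 13   -> everything is done; the commit position advances again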