import cron from "node-cron";
import dotenv from "dotenv";
import pg from "pg";
import fs from "fs";
import path from "path";
import { fileURLToPath } from "url";
import kafka from "kafka-node";
import { randomUUID } from "crypto";
import { z } from "zod";
import { createClient } from "redis";

dotenv.config();
const parseNumber = (value, defaultValue) => {
  const parsed = Number(value);
  return Number.isFinite(parsed) ? parsed : defaultValue;
};

const parseList = (value) =>
  (value || "").split(",").map((item) => item.trim()).filter(Boolean);
const config = {
  env: process.env.NODE_ENV || "development",
  port: parseNumber(process.env.PORT, 3001),
  kafka: {
    brokers: parseList(process.env.KAFKA_BROKERS),
    topic: process.env.KAFKA_TOPIC || process.env.KAFKA_TOPICS || "blwlog4Nodejs-rcu-onoffline-topic",
    groupId: process.env.KAFKA_GROUP_ID || "bls-onoffline-group",
    clientId: process.env.KAFKA_CLIENT_ID || "bls-onoffline-client",
    consumerInstances: parseNumber(process.env.KAFKA_CONSUMER_INSTANCES, 1),
    maxInFlight: parseNumber(process.env.KAFKA_MAX_IN_FLIGHT, 20000),
    fetchMaxBytes: parseNumber(process.env.KAFKA_FETCH_MAX_BYTES, 50 * 1024 * 1024),
    fetchMinBytes: parseNumber(process.env.KAFKA_FETCH_MIN_BYTES, 256 * 1024),
    fetchMaxWaitMs: parseNumber(process.env.KAFKA_FETCH_MAX_WAIT_MS, 100),
    autoCommitIntervalMs: parseNumber(process.env.KAFKA_AUTO_COMMIT_INTERVAL_MS, 5000),
    commitIntervalMs: parseNumber(process.env.KAFKA_COMMIT_INTERVAL_MS, 200),
    commitOnAttempt: process.env.KAFKA_COMMIT_ON_ATTEMPT === "true",
    batchSize: parseNumber(process.env.KAFKA_BATCH_SIZE, 5000),
    batchTimeoutMs: parseNumber(process.env.KAFKA_BATCH_TIMEOUT_MS, 50),
    logMessages: process.env.KAFKA_LOG_MESSAGES === "true",
    sasl: process.env.KAFKA_SASL_USERNAME && process.env.KAFKA_SASL_PASSWORD
      ? {
          mechanism: process.env.KAFKA_SASL_MECHANISM || "plain",
          username: process.env.KAFKA_SASL_USERNAME,
          password: process.env.KAFKA_SASL_PASSWORD
        }
      : undefined
  },
  db: {
    host: process.env.DB_HOST || process.env.POSTGRES_HOST || "localhost",
    port: parseNumber(process.env.DB_PORT || process.env.POSTGRES_PORT, 5432),
    user: process.env.DB_USER || process.env.POSTGRES_USER || "postgres",
    password: process.env.DB_PASSWORD || process.env.POSTGRES_PASSWORD || "",
    database: process.env.DB_DATABASE || process.env.POSTGRES_DATABASE || "log_platform",
    max: parseNumber(process.env.DB_MAX_CONNECTIONS || process.env.POSTGRES_MAX_CONNECTIONS, 10),
    ssl: process.env.DB_SSL === "true" ? { rejectUnauthorized: false } : undefined,
    schema: process.env.DB_SCHEMA || "onoffline",
    table: process.env.DB_TABLE || "onoffline_record"
  },
  redis: {
    host: process.env.REDIS_HOST || "localhost",
    port: parseNumber(process.env.REDIS_PORT, 6379),
    password: process.env.REDIS_PASSWORD || undefined,
    db: parseNumber(process.env.REDIS_DB, 0),
    projectName: process.env.REDIS_PROJECT_NAME || "bls-onoffline",
    apiBaseUrl: process.env.REDIS_API_BASE_URL || `http://localhost:${parseNumber(process.env.PORT, 3001)}`
  }
};
const format = (level, message, context) => {
  const payload = {
    level,
    message,
    timestamp: Date.now(),
    ...(context ? { context } : {})
  };
  return JSON.stringify(payload);
};

const logger = {
  info(message, context) {
    process.stdout.write(`${format("info", message, context)}\n`);
  },
  error(message, context) {
    process.stderr.write(`${format("error", message, context)}\n`);
  },
  warn(message, context) {
    process.stderr.write(`${format("warn", message, context)}\n`);
  }
};
const { Pool } = pg;

const columns = [
  "guid",
  "ts_ms",
  "write_ts_ms",
  "hotel_id",
  "mac",
  "device_id",
  "room_id",
  "ip",
  "current_status",
  "launcher_version",
  "reboot_reason"
];
class DatabaseManager {
  constructor(dbConfig) {
    this.pool = new Pool({
      host: dbConfig.host,
      port: dbConfig.port,
      user: dbConfig.user,
      password: dbConfig.password,
      database: dbConfig.database,
      max: dbConfig.max,
      ssl: dbConfig.ssl
    });
  }
  async insertRows({ schema, table, rows }) {
    if (!rows || rows.length === 0) {
      return;
    }
    const statement = `
      INSERT INTO ${schema}.${table} (${columns.join(", ")})
      SELECT *
      FROM UNNEST(
        $1::text[],
        $2::int8[],
        $3::int8[],
        $4::int2[],
        $5::text[],
        $6::text[],
        $7::text[],
        $8::text[],
        $9::text[],
        $10::text[],
        $11::text[]
      )
      ON CONFLICT DO NOTHING
    `;
    try {
      // Pivot row objects into one array per column, matching the UNNEST parameter order.
      const params = columns.map((column) => rows.map((row) => row[column] ?? null));
      await this.pool.query(statement, params);
    } catch (error) {
      logger.error("Database insert failed", {
        error: error?.message,
        schema,
        table,
        rowsLength: rows.length
      });
      throw error;
    }
  }
  async checkConnection() {
    let client;
    try {
      const connectPromise = this.pool.connect();
      const timeoutPromise = new Promise((_, reject) => {
        setTimeout(() => reject(new Error("Connection timeout")), 5000);
      });
      try {
        client = await Promise.race([connectPromise, timeoutPromise]);
      } catch (raceError) {
        // If the timeout won the race, release the connection once it eventually arrives.
        connectPromise.then((c) => c.release()).catch(() => {});
        throw raceError;
      }
      await client.query("SELECT 1");
      return true;
    } catch (err) {
      logger.error("Database check connection failed", { error: err.message });
      return false;
    } finally {
      if (client) {
        client.release();
      }
    }
  }
  async close() {
    await this.pool.end();
  }
}

const dbManager = new DatabaseManager(config.db);
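
// Illustrative sketch (not executed anywhere in this module): insertRows expects
// row objects keyed by the `columns` list above and pivots them into one array
// per column for the UNNEST statement. A minimal call would look like:
//
//   await dbManager.insertRows({
//     schema: config.db.schema,
//     table: config.db.table,
//     rows: [{
//       guid: createGuid(),          // helper defined later in this file
//       ts_ms: Date.now(),
//       write_ts_ms: Date.now(),
//       hotel_id: 1,
//       mac: "00:11:22:33:44:55",
//       device_id: "host-1",
//       room_id: "101",
//       ip: "10.0.0.1",
//       current_status: "on",
//       launcher_version: "1.0.0",
//       reboot_reason: null
//     }]
//   });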
class PartitionManager {
  /**
   * Calculate the start and end timestamps (milliseconds) for a given date.
   * @param {Date} date - The date to calculate for.
   * @returns {Object} { startMs, endMs, partitionSuffix }
   */
  getPartitionInfo(date) {
    const yyyy = date.getFullYear();
    const mm = String(date.getMonth() + 1).padStart(2, "0");
    const dd = String(date.getDate()).padStart(2, "0");
    const partitionSuffix = `${yyyy}${mm}${dd}`;
    const start = new Date(date);
    start.setHours(0, 0, 0, 0);
    const startMs = start.getTime();
    const end = new Date(date);
    end.setDate(end.getDate() + 1);
    end.setHours(0, 0, 0, 0);
    const endMs = end.getTime();
    return { startMs, endMs, partitionSuffix };
  }
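  // Example (illustrative): on a host in UTC+8, getPartitionInfo(new Date("2026-02-04T12:00:00+08:00"))
  // yields partitionSuffix "20260204" with [startMs, endMs) covering that local day.
  // Note the day boundaries are computed in the process's local time zone.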
  async ensurePartitionIndexes(client, schema, table, partitionSuffix) {
    const startedAt = Date.now();
    const partitionName = `${schema}.${table}_${partitionSuffix}`;
    const indexBase = `${table}_${partitionSuffix}`;
    const indexSpecs = [
      { name: `idx_${indexBase}_ts`, column: "ts_ms" },
      { name: `idx_${indexBase}_hid`, column: "hotel_id" },
      { name: `idx_${indexBase}_mac`, column: "mac" },
      { name: `idx_${indexBase}_did`, column: "device_id" },
      { name: `idx_${indexBase}_rid`, column: "room_id" },
      { name: `idx_${indexBase}_cs`, column: "current_status" }
    ];
    for (const spec of indexSpecs) {
      await client.query(`CREATE INDEX IF NOT EXISTS ${spec.name} ON ${partitionName} (${spec.column});`);
    }
    await client.query(`ANALYZE ${partitionName};`);
    const elapsedMs = Date.now() - startedAt;
    if (elapsedMs > 1000) {
      logger.warn("Partition index ensure slow", { partitionName, elapsedMs });
    }
  }
  async ensureIndexesForExistingPartitions() {
    const startedAt = Date.now();
    const client = await dbManager.pool.connect();
    try {
      const schema = config.db.schema;
      const table = config.db.table;
      const res = await client.query(
        `
        SELECT c.relname AS relname
        FROM pg_inherits i
        JOIN pg_class p ON i.inhparent = p.oid
        JOIN pg_namespace pn ON pn.oid = p.relnamespace
        JOIN pg_class c ON i.inhrelid = c.oid
        WHERE pn.nspname = $1 AND p.relname = $2
        ORDER BY c.relname;
        `,
        [schema, table]
      );
      const suffixes = new Set();
      const pattern = new RegExp(`^${table}_(\\d{8})$`);
      for (const row of res.rows) {
        const relname = row?.relname;
        if (typeof relname !== "string") continue;
        const match = relname.match(pattern);
        if (!match) continue;
        suffixes.add(match[1]);
      }
      for (const suffix of suffixes) {
        await this.ensurePartitionIndexes(client, schema, table, suffix);
      }
      const elapsedMs = Date.now() - startedAt;
      if (elapsedMs > 5000) {
        logger.warn("Ensure existing partition indexes slow", { schema, table, partitions: suffixes.size, elapsedMs });
      }
    } finally {
      client.release();
    }
  }
  /**
   * Ensure partitions exist for the past M days and next N days.
   * @param {number} daysAhead - Number of days to pre-create.
   * @param {number} daysBack - Number of days to look back.
   */
  async ensurePartitions(daysAhead = 30, daysBack = 15) {
    const client = await dbManager.pool.connect();
    try {
      logger.info(`Starting partition check for the past ${daysBack} days and next ${daysAhead} days...`);
      console.log(`Starting partition check for the past ${daysBack} days and next ${daysAhead} days...`);
      const now = new Date();
      for (let i = -daysBack; i < daysAhead; i++) {
        const targetDate = new Date(now);
        targetDate.setDate(now.getDate() + i);
        const { startMs, endMs, partitionSuffix } = this.getPartitionInfo(targetDate);
        const schema = config.db.schema;
        const table = config.db.table;
        const partitionName = `${schema}.${table}_${partitionSuffix}`;
        const checkSql = `SELECT to_regclass($1) as exists;`;
        const checkRes = await client.query(checkSql, [partitionName]);
        if (!checkRes.rows[0].exists) {
          logger.info(`Creating partition ${partitionName} for range [${startMs}, ${endMs})`);
          console.log(`Creating partition ${partitionName} for range [${startMs}, ${endMs})`);
          const createSql = `
            CREATE TABLE IF NOT EXISTS ${partitionName}
            PARTITION OF ${schema}.${table}
            FOR VALUES FROM (${startMs}) TO (${endMs});
          `;
          await client.query(createSql);
        }
        await this.ensurePartitionIndexes(client, schema, table, partitionSuffix);
      }
      logger.info("Partition check completed.");
    } catch (err) {
      logger.error("Error ensuring partitions", { error: err?.message });
      throw err;
    } finally {
      client.release();
    }
  }
  async ensurePartitionsForTimestamps(tsMsList) {
    if (!Array.isArray(tsMsList) || tsMsList.length === 0) return;
    const uniqueSuffixes = new Set();
    for (const ts of tsMsList) {
      const numericTs = typeof ts === "string" ? Number(ts) : ts;
      if (!Number.isFinite(numericTs)) continue;
      const date = new Date(numericTs);
      if (Number.isNaN(date.getTime())) continue;
      const { partitionSuffix } = this.getPartitionInfo(date);
      uniqueSuffixes.add(partitionSuffix);
      if (uniqueSuffixes.size >= 400) break;
    }
    if (uniqueSuffixes.size === 0) return;
    const client = await dbManager.pool.connect();
    try {
      const schema = config.db.schema;
      const table = config.db.table;
      for (const partitionSuffix of uniqueSuffixes) {
        const yyyy = Number(partitionSuffix.slice(0, 4));
        const mm = Number(partitionSuffix.slice(4, 6));
        const dd = Number(partitionSuffix.slice(6, 8));
        if (!Number.isFinite(yyyy) || !Number.isFinite(mm) || !Number.isFinite(dd)) continue;
        const targetDate = new Date(yyyy, mm - 1, dd);
        if (Number.isNaN(targetDate.getTime())) continue;
        const { startMs, endMs } = this.getPartitionInfo(targetDate);
        const partitionName = `${schema}.${table}_${partitionSuffix}`;
        const checkRes = await client.query(`SELECT to_regclass($1) as exists;`, [partitionName]);
        if (!checkRes.rows[0].exists) {
          logger.info(`Creating partition ${partitionName} for range [${startMs}, ${endMs})`);
          await client.query(`
            CREATE TABLE IF NOT EXISTS ${partitionName}
            PARTITION OF ${schema}.${table}
            FOR VALUES FROM (${startMs}) TO (${endMs});
          `);
        }
        await this.ensurePartitionIndexes(client, schema, table, partitionSuffix);
      }
    } finally {
      client.release();
    }
  }
}

const partitionManager = new PartitionManager();
const __filename = fileURLToPath(import.meta.url);
const __dirname = path.dirname(__filename);
class DatabaseInitializer {
  async initialize() {
    logger.info("Starting database initialization check...");
    await this.ensureDatabaseExists();
    await this.ensureSchemaAndTable();
    await partitionManager.ensurePartitions(30);
    await partitionManager.ensureIndexesForExistingPartitions();
    console.log("Database initialization completed successfully.");
    logger.info("Database initialization completed successfully.");
  }
  async ensureDatabaseExists() {
    const { host, port, user, password, database, ssl } = config.db;
    console.log(`Checking if database '${database}' exists at ${host}:${port}...`);
    // Connect to the default "postgres" database so the target database can be created if missing.
    const client = new pg.Client({
      host,
      port,
      user,
      password,
      database: "postgres",
      ssl: ssl ? { rejectUnauthorized: false } : false
    });
    try {
      await client.connect();
      const checkRes = await client.query(
        `SELECT 1 FROM pg_database WHERE datname = $1`,
        [database]
      );
      if (checkRes.rowCount === 0) {
        logger.info(`Database '${database}' does not exist. Creating...`);
        await client.query(`CREATE DATABASE "${database}"`);
        console.log(`Database '${database}' created.`);
        logger.info(`Database '${database}' created.`);
      } else {
        console.log(`Database '${database}' already exists.`);
        logger.info(`Database '${database}' already exists.`);
      }
    } catch (err) {
      logger.error("Error ensuring database exists", { error: err?.message });
      throw err;
    } finally {
      await client.end();
    }
  }
  async ensureSchemaAndTable() {
    const client = await dbManager.pool.connect();
    try {
      const sqlPathCandidates = [
        path.resolve(process.cwd(), "scripts/init_db.sql"),
        path.resolve(__dirname, "../scripts/init_db.sql"),
        path.resolve(__dirname, "../../scripts/init_db.sql")
      ];
      const sqlPath = sqlPathCandidates.find((candidate) => fs.existsSync(candidate));
      if (!sqlPath) {
        throw new Error(`init_db.sql not found. Candidates: ${sqlPathCandidates.join(" | ")}`);
      }
      const sql = fs.readFileSync(sqlPath, "utf8");
      console.log(`Executing init_db.sql from ${sqlPath}...`);
      logger.info("Executing init_db.sql...");
      await client.query(sql);
      console.log("Schema and parent table initialized.");
      logger.info("Schema and parent table initialized.");
    } catch (err) {
      logger.error("Error initializing schema and table", { error: err?.message });
      throw err;
    } finally {
      client.release();
    }
  }
}

const dbInitializer = new DatabaseInitializer();
class OffsetTracker {
  constructor() {
    this.partitions = new Map();
  }
  // Called when a message is received (before processing)
  add(topic, partition, offset) {
    const key = `${topic}-${partition}`;
    if (!this.partitions.has(key)) {
      this.partitions.set(key, { nextCommitOffset: null, done: new Set() });
    }
    const state = this.partitions.get(key);
    const numericOffset = Number(offset);
    if (!Number.isFinite(numericOffset)) return;
    if (state.nextCommitOffset === null) {
      state.nextCommitOffset = numericOffset;
    } else if (numericOffset < state.nextCommitOffset) {
      state.nextCommitOffset = numericOffset;
    }
  }
  // Called when a message is successfully processed.
  // Returns the next offset to commit (if any advancement is possible), or null.
  markDone(topic, partition, offset) {
    const key = `${topic}-${partition}`;
    const state = this.partitions.get(key);
    if (!state) return null;
    const numericOffset = Number(offset);
    if (!Number.isFinite(numericOffset)) return null;
    state.done.add(numericOffset);
    if (state.nextCommitOffset === null) {
      state.nextCommitOffset = numericOffset;
    }
    let advanced = false;
    while (state.nextCommitOffset !== null && state.done.has(state.nextCommitOffset)) {
      state.done.delete(state.nextCommitOffset);
      state.nextCommitOffset += 1;
      advanced = true;
    }
    if (!advanced) return null;
    return state.nextCommitOffset;
  }
  clear() {
    this.partitions.clear();
  }
}
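
// Sketch of the tracker's low-watermark semantics (illustrative values):
//   tracker.add("t", 0, 10); tracker.add("t", 0, 11); tracker.add("t", 0, 12);
//   tracker.markDone("t", 0, 11); // => null  (10 is still in flight)
//   tracker.markDone("t", 0, 10); // => 12    (10 and 11 done; safe to commit up to 12)
// The returned value is the next offset to be consumed, which is what the
// offset-commit request below carries.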
const { ConsumerGroup } = kafka;

const createOneConsumer = ({ kafkaConfig, onMessage, onError, instanceIndex }) => {
  const kafkaHost = kafkaConfig.brokers.join(",");
  const clientId = instanceIndex === 0 ? kafkaConfig.clientId : `${kafkaConfig.clientId}-${instanceIndex}`;
  const id = `${clientId}-${process.pid}-${Date.now()}`;
  const maxInFlight = Number.isFinite(kafkaConfig.maxInFlight) ? kafkaConfig.maxInFlight : 5000;
  const commitIntervalMs = Number.isFinite(kafkaConfig.commitIntervalMs) ? kafkaConfig.commitIntervalMs : 200;
  let inFlight = 0;
  const tracker = new OffsetTracker();
  let pendingCommits = new Map();
  let commitTimer = null;
  const flushCommits = () => {
    if (pendingCommits.size === 0) return;
    const batch = pendingCommits;
    pendingCommits = new Map();
    consumer.sendOffsetCommitRequest(
      Array.from(batch.values()),
      (err) => {
        if (err) {
          // Put the failed commits back so the next flush retries them.
          for (const [k, v] of batch.entries()) {
            pendingCommits.set(k, v);
          }
          logger.error("Kafka commit failed", { error: err?.message, count: batch.size });
        }
      }
    );
  };
  const scheduleCommitFlush = () => {
    if (commitTimer) return;
    commitTimer = setTimeout(() => {
      commitTimer = null;
      flushCommits();
    }, commitIntervalMs);
  };
  const consumer = new ConsumerGroup(
    {
      kafkaHost,
      groupId: kafkaConfig.groupId,
      clientId,
      id,
      fromOffset: "earliest",
      protocol: ["roundrobin"],
      outOfRangeOffset: "latest",
      autoCommit: false,
      autoCommitIntervalMs: kafkaConfig.autoCommitIntervalMs,
      fetchMaxBytes: kafkaConfig.fetchMaxBytes,
      fetchMinBytes: kafkaConfig.fetchMinBytes,
      fetchMaxWaitMs: kafkaConfig.fetchMaxWaitMs,
      sasl: kafkaConfig.sasl
    },
    kafkaConfig.topic
  );
  const tryResume = () => {
    if (inFlight < maxInFlight && consumer.paused) {
      consumer.resume();
    }
  };
  consumer.on("message", (message) => {
    inFlight += 1;
    tracker.add(message.topic, message.partition, message.offset);
    // Backpressure: stop fetching while too many messages are being processed.
    if (inFlight >= maxInFlight) {
      consumer.pause();
    }
    Promise.resolve(onMessage(message)).catch((error) => {
      logger.error("Kafka message handling failed", { error: error?.message });
      if (onError) {
        onError(error, message);
      }
    }).finally(() => {
      const commitOffset = tracker.markDone(message.topic, message.partition, message.offset);
      if (commitOffset !== null) {
        const key = `${message.topic}-${message.partition}`;
        pendingCommits.set(key, {
          topic: message.topic,
          partition: message.partition,
          offset: commitOffset,
          metadata: "m"
        });
        scheduleCommitFlush();
      }
      inFlight -= 1;
      tryResume();
    });
  });
  consumer.on("error", (error) => {
    logger.error("Kafka consumer error", { error: error?.message });
    if (onError) {
      onError(error);
    }
  });
  consumer.on("connect", () => {
    logger.info("Kafka Consumer connected", {
      groupId: kafkaConfig.groupId,
      clientId
    });
  });
  consumer.on("rebalancing", () => {
    logger.info("Kafka Consumer rebalancing", {
      groupId: kafkaConfig.groupId,
      clientId
    });
    // Partition ownership may change; drop tracked offsets and pending commits.
    tracker.clear();
    pendingCommits.clear();
    if (commitTimer) {
      clearTimeout(commitTimer);
      commitTimer = null;
    }
  });
  consumer.on("rebalanced", () => {
    logger.info("Kafka Consumer rebalanced", { clientId, groupId: kafkaConfig.groupId });
  });
  consumer.on("offsetOutOfRange", (error) => {
    logger.warn("Kafka Consumer offset out of range", {
      error: error?.message,
      topic: error?.topic,
      partition: error?.partition,
      groupId: kafkaConfig.groupId,
      clientId
    });
  });
  consumer.on("close", () => {
    if (commitTimer) {
      clearTimeout(commitTimer);
      commitTimer = null;
    }
    flushCommits();
    logger.warn("Kafka Consumer closed", {
      groupId: kafkaConfig.groupId,
      clientId
    });
  });
  return consumer;
};
const createKafkaConsumers = ({ kafkaConfig, onMessage, onError }) => {
  const instances = Number.isFinite(kafkaConfig.consumerInstances) ? kafkaConfig.consumerInstances : 1;
  const count = Math.max(1, instances);
  return Array.from(
    { length: count },
    (_, idx) => createOneConsumer({ kafkaConfig, onMessage, onError, instanceIndex: idx })
  );
};
const createGuid = () => randomUUID().replace(/-/g, "");

const toNumber = (value) => {
  if (value === undefined || value === null || value === "") {
    return value;
  }
  if (typeof value === "number") {
    return value;
  }
  const parsed = Number(value);
  return Number.isFinite(parsed) ? parsed : value;
};

const toStringAllowEmpty = (value) => {
  if (value === undefined || value === null) {
    return value;
  }
  return String(value);
};
const kafkaPayloadSchema = z.object({
  HotelCode: z.preprocess(toNumber, z.number()),
  MAC: z.preprocess(toStringAllowEmpty, z.string().nullable()).optional().nullable(),
  HostNumber: z.preprocess(toStringAllowEmpty, z.string().nullable()).optional().nullable(),
  RoomNumber: z.preprocess(toStringAllowEmpty, z.string().nullable()).optional().nullable(),
  EndPoint: z.preprocess(toStringAllowEmpty, z.string().nullable()).optional().nullable(),
  CurrentStatus: z.preprocess(toStringAllowEmpty, z.string().nullable()).optional().nullable(),
  CurrentTime: z.preprocess(toStringAllowEmpty, z.string().nullable()).optional().nullable(),
  UnixTime: z.preprocess(toNumber, z.number().nullable()).optional().nullable(),
  LauncherVersion: z.preprocess(toStringAllowEmpty, z.string().nullable()).optional().nullable(),
  RebootReason: z.preprocess(toStringAllowEmpty, z.string().nullable()).optional().nullable()
});
const normalizeText = (value, maxLength) => {
  if (value === undefined || value === null) {
    return null;
  }
  const str = String(value);
  if (maxLength && str.length > maxLength) {
    return str.substring(0, maxLength);
  }
  return str;
};
const buildRowsFromPayload = (rawPayload) => {
  const payload = kafkaPayloadSchema.parse(rawPayload);
  const rebootReason = normalizeText(payload.RebootReason, 255);
  const currentStatusRaw = normalizeText(payload.CurrentStatus, 255);
  // A reboot report implies the device came back online, so force status to "on".
  const hasRebootReason = rebootReason !== null && rebootReason !== "";
  const currentStatus = hasRebootReason ? "on" : currentStatusRaw;
  let tsMs = payload.UnixTime;
  // Values below 1e11 are assumed to be seconds; promote them to milliseconds.
  if (typeof tsMs === "number" && tsMs < 1e11) {
    tsMs = tsMs * 1000;
  }
  if (!tsMs && payload.CurrentTime) {
    const parsed = Date.parse(payload.CurrentTime);
    if (!isNaN(parsed)) {
      tsMs = parsed;
    }
  }
  if (!tsMs) {
    tsMs = Date.now();
  }
  const mac = normalizeText(payload.MAC) || "";
  const deviceId = normalizeText(payload.HostNumber) || "";
  const roomId = normalizeText(payload.RoomNumber) || "";
  const row = {
    guid: createGuid(),
    ts_ms: tsMs,
    write_ts_ms: Date.now(),
    hotel_id: payload.HotelCode,
    mac,
    device_id: deviceId,
    room_id: roomId,
    ip: normalizeText(payload.EndPoint),
    current_status: currentStatus,
    launcher_version: normalizeText(payload.LauncherVersion, 255),
    reboot_reason: rebootReason
  };
  return [row];
};
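
// Illustrative mapping (assumed sample payload, not taken from live traffic):
//   buildRowsFromPayload({ HotelCode: 88, MAC: "AA:BB", HostNumber: "h1",
//                          RoomNumber: "101", CurrentStatus: "off", UnixTime: 1700000000 })
// returns a single row with hotel_id 88, current_status "off" and ts_ms
// 1700000000000 (the seconds-resolution UnixTime is promoted to milliseconds).
// A non-empty RebootReason would force current_status to "on".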
const parseMessageToRows = (message) => {
  const rawValue = message.value.toString();
  let payload;
  try {
    payload = JSON.parse(rawValue);
  } catch (e) {
    const error = new Error(`JSON Parse Error: ${e.message}`);
    error.type = "PARSE_ERROR";
    throw error;
  }
  const validationResult = kafkaPayloadSchema.safeParse(payload);
  if (!validationResult.success) {
    const error = new Error(`Schema Validation Failed: ${JSON.stringify(validationResult.error.errors)}`);
    error.type = "VALIDATION_ERROR";
    throw error;
  }
  return buildRowsFromPayload(payload);
};
const createRedisClient = async (redisConfig) => {
  const client = createClient({
    socket: {
      host: redisConfig.host,
      port: redisConfig.port
    },
    password: redisConfig.password,
    database: redisConfig.db
  });
  await client.connect();
  return client;
};
class RedisIntegration {
  constructor(client, projectName, apiBaseUrl) {
    this.client = client;
    this.projectName = projectName;
    this.apiBaseUrl = apiBaseUrl;
    // The Chinese key names are kept verbatim, since whatever consumes these lists
    // expects them: "项目心跳" means "project heartbeat", "项目控制台" means "project console".
    this.heartbeatKey = "项目心跳";
    this.logKey = `${projectName}_项目控制台`;
  }
  async info(message, context) {
    const payload = {
      timestamp: new Date().toISOString(),
      level: "info",
      message,
      metadata: context || undefined
    };
    await this.client.rPush(this.logKey, JSON.stringify(payload));
  }
  async error(message, context) {
    const payload = {
      timestamp: new Date().toISOString(),
      level: "error",
      message,
      metadata: context || undefined
    };
    await this.client.rPush(this.logKey, JSON.stringify(payload));
  }
  startHeartbeat() {
    setInterval(() => {
      const payload = {
        projectName: this.projectName,
        apiBaseUrl: this.apiBaseUrl,
        lastActiveAt: Date.now()
      };
      this.client.rPush(this.heartbeatKey, JSON.stringify(payload));
    }, 3000);
  }
}
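
// Minimal usage sketch (illustrative, assuming a reachable Redis instance):
//   const redis = await createRedisClient(config.redis);
//   const integration = new RedisIntegration(redis, "bls-onoffline", "http://localhost:3001");
//   integration.startHeartbeat();           // pushes a heartbeat entry every 3s
//   await integration.info("service started");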
const buildErrorQueueKey = (projectName) => `${projectName}_error_queue`;

const enqueueError = async (client, queueKey, payload) => {
  try {
    await client.rPush(queueKey, JSON.stringify(payload));
  } catch (error) {
    logger.error("Redis enqueue error failed", { error: error?.message });
    throw error;
  }
};
const startErrorRetryWorker = async ({
  client,
  queueKey,
  handler,
  redisIntegration,
  maxAttempts = 5
}) => {
  // Blocking loop: blPop with timeout 0 waits indefinitely for the next queued error.
  while (true) {
    const result = await client.blPop(queueKey, 0);
    const raw = result?.element;
    if (!raw) {
      continue;
    }
    let item;
    try {
      item = JSON.parse(raw);
    } catch (error) {
      logger.error("Invalid error payload", { error: error?.message });
      await redisIntegration.error("Invalid error payload", { module: "redis", stack: error?.message });
      continue;
    }
    const attempts = item.attempts || 0;
    try {
      await handler(item);
    } catch (error) {
      logger.error("Retry handler failed", { error: error?.message, stack: error?.stack });
      const nextPayload = {
        ...item,
        attempts: attempts + 1,
        lastError: error?.message,
        lastAttemptAt: Date.now()
      };
      if (nextPayload.attempts >= maxAttempts) {
        await redisIntegration.error("Retry attempts exceeded", { module: "retry", stack: JSON.stringify(nextPayload) });
      } else {
        await enqueueError(client, queueKey, nextPayload);
      }
    }
  }
};
class MetricCollector {
  constructor() {
    this.reset();
  }
  reset() {
    this.metrics = {
      kafka_pulled: 0,
      parse_error: 0,
      db_inserted: 0,
      db_failed: 0,
      db_insert_count: 0,
      db_insert_ms_sum: 0,
      batch_flush_count: 0,
      batch_flush_ms_sum: 0
    };
    this.keyed = {};
  }
  increment(metric, count = 1) {
    if (this.metrics.hasOwnProperty(metric)) {
      this.metrics[metric] += count;
    }
  }
  incrementKeyed(metric, key, count = 1) {
    if (!key) return;
    if (!this.keyed[metric]) {
      this.keyed[metric] = {};
    }
    if (!Object.prototype.hasOwnProperty.call(this.keyed[metric], key)) {
      this.keyed[metric][key] = 0;
    }
    this.keyed[metric][key] += count;
  }
  getAndReset() {
    const current = { ...this.metrics };
    const keyed = JSON.parse(JSON.stringify(this.keyed));
    this.reset();
    return { ...current, keyed };
  }
}
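
// Sketch of a snapshot (values illustrative): getAndReset() returns the flat
// counters plus the keyed maps, then zeroes the collector, e.g.
//   { kafka_pulled: 120, parse_error: 0, db_inserted: 120, db_failed: 0,
//     db_insert_count: 3, db_insert_ms_sum: 45, batch_flush_count: 3,
//     batch_flush_ms_sum: 52, keyed: { kafka_pulled_by_partition: { "topic-0": 120 } } }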
const bootstrap = async () => {
  logger.info("Starting application with config", {
    env: process.env.NODE_ENV,
    db: {
      host: config.db.host,
      port: config.db.port,
      user: config.db.user,
      database: config.db.database,
      schema: config.db.schema
    },
    kafka: {
      brokers: config.kafka.brokers,
      topic: config.kafka.topic,
      groupId: config.kafka.groupId
    },
    redis: {
      host: config.redis.host,
      port: config.redis.port
    }
  });
  await dbInitializer.initialize();
  const metricCollector = new MetricCollector();
  // Daily partition maintenance at midnight.
  cron.schedule("0 0 * * *", async () => {
    logger.info("Running scheduled partition maintenance...");
    try {
      await partitionManager.ensurePartitions(30);
    } catch (err) {
      logger.error("Scheduled partition maintenance failed", { error: err?.message });
    }
  });
  const redisClient = await createRedisClient(config.redis);
  const redisIntegration = new RedisIntegration(
    redisClient,
    config.redis.projectName,
    config.redis.apiBaseUrl
  );
  redisIntegration.startHeartbeat();
  // Per-minute metrics report.
  cron.schedule("* * * * *", async () => {
    const metrics = metricCollector.getAndReset();
    const flushAvgMs = metrics.batch_flush_count > 0 ? (metrics.batch_flush_ms_sum / metrics.batch_flush_count).toFixed(1) : "0.0";
    const dbAvgMs = metrics.db_insert_count > 0 ? (metrics.db_insert_ms_sum / metrics.db_insert_count).toFixed(1) : "0.0";
    const report =
      `[Minute Metrics] Pulled: ${metrics.kafka_pulled}, Parse Error: ${metrics.parse_error}, ` +
      `Inserted: ${metrics.db_inserted}, Failed: ${metrics.db_failed}, ` +
      `FlushAvgMs: ${flushAvgMs}, DbAvgMs: ${dbAvgMs}, ` +
      `PulledByPartition: ${JSON.stringify(metrics.keyed?.kafka_pulled_by_partition || {})}, ` +
      `InsertedByPartition: ${JSON.stringify(metrics.keyed?.db_inserted_by_partition || {})}, ` +
      `FailedByPartition: ${JSON.stringify(metrics.keyed?.db_failed_by_partition || {})}, ` +
      `InsertedByDay: ${JSON.stringify(metrics.keyed?.db_inserted_by_day || {})}, ` +
      `DbMsByDay: ${JSON.stringify(metrics.keyed?.db_insert_ms_sum_by_day || {})}`;
    console.log(report);
    logger.info(report, metrics);
    try {
      await redisIntegration.info("Minute Metrics", metrics);
    } catch (err) {
      logger.error("Failed to report metrics to Redis", { error: err?.message });
    }
  });
  const errorQueueKey = buildErrorQueueKey(config.redis.projectName);
  const handleError = async (error, message) => {
    logger.error("Kafka processing error", {
      error: error?.message,
      type: error?.type,
      stack: error?.stack
    });
    try {
      await redisIntegration.error("Kafka processing error", {
        module: "kafka",
        stack: error?.stack || error?.message
      });
    } catch (redisError) {
      logger.error("Redis error log failed", { error: redisError?.message });
    }
    if (message) {
      const messageValue = Buffer.isBuffer(message.value) ? message.value.toString("utf8") : message.value;
      try {
        await enqueueError(redisClient, errorQueueKey, {
          attempts: 0,
          value: messageValue,
          meta: {
            topic: message.topic,
            partition: message.partition,
            offset: message.offset,
            key: message.key
          },
          timestamp: Date.now()
        });
      } catch (enqueueErr) {
        logger.error("Enqueue error payload failed", { error: enqueueErr?.message });
      }
    }
  };
  const configuredBatchSize = Number.isFinite(config.kafka.batchSize) ? config.kafka.batchSize : 1000;
  const configuredBatchTimeoutMs = Number.isFinite(config.kafka.batchTimeoutMs) ? config.kafka.batchTimeoutMs : 20;
  const configuredMaxInFlight = Number.isFinite(config.kafka.maxInFlight) ? config.kafka.maxInFlight : 5000;
  const BATCH_SIZE = Math.max(10, Math.min(configuredBatchSize, configuredMaxInFlight));
  const BATCH_TIMEOUT_MS = Math.max(1, configuredBatchTimeoutMs);
  const commitOnAttempt = config.kafka.commitOnAttempt === true;
  const batchStates = new Map();
  const partitionKeyFromMessage = (message) => {
    if (message?.topic !== undefined && message?.partition !== undefined) {
      return `${message.topic}-${message.partition}`;
    }
    // Messages replayed from the Redis error queue carry no topic/partition.
    return "retry";
  };
  const dayKeyFromTsMs = (tsMs) => {
    const numeric = typeof tsMs === "string" ? Number(tsMs) : tsMs;
    if (!Number.isFinite(numeric)) return null;
    const d = new Date(numeric);
    if (Number.isNaN(d.getTime())) return null;
    const yyyy = d.getFullYear();
    const mm = String(d.getMonth() + 1).padStart(2, "0");
    const dd = String(d.getDate()).padStart(2, "0");
    return `${yyyy}${mm}${dd}`;
  };
  const getBatchState = (key) => {
    if (!batchStates.has(key)) {
      batchStates.set(key, { items: [], timer: null, flushing: null });
    }
    return batchStates.get(key);
  };
  const isDbConnectionError = (err) => {
    const code = err?.code;
    if (typeof code === "string") {
      // Node network error codes plus PostgreSQL connection-class SQLSTATEs.
      const networkCodes = new Set([
        "ECONNREFUSED",
        "ECONNRESET",
        "EPIPE",
        "ETIMEDOUT",
        "ENOTFOUND",
        "EHOSTUNREACH",
        "ENETUNREACH",
        "57P03",
        "08006",
        "08001",
        "08000",
        "08003"
      ]);
      if (networkCodes.has(code)) return true;
    }
    const message = typeof err?.message === "string" ? err.message : "";
    if (!message) return false;
    const lower = message.toLowerCase();
    return [
      "connection timeout",
      "connection terminated",
      "connection refused",
      "terminating connection",
      "econnrefused",
      "econnreset",
      "etimedout",
      "could not connect",
      "the database system is starting up",
      "no pg_hba.conf entry"
    ].some((fragment) => lower.includes(fragment));
  };
  const isMissingPartitionError = (err) =>
    err?.code === "23514" || (typeof err?.message === "string" && err.message.includes("no partition of relation"));
  const insertRowsWithRetry = async (rows) => {
    const startedAt = Date.now();
    let attemptedPartitionFix = false;
    while (true) {
      try {
        await dbManager.insertRows({ schema: config.db.schema, table: config.db.table, rows });
        metricCollector.increment("db_insert_count", 1);
        metricCollector.increment("db_insert_ms_sum", Date.now() - startedAt);
        return;
      } catch (err) {
        if (isDbConnectionError(err)) {
          logger.error("Database offline during batch insert. Retrying in 5s...", { error: err.message });
          await new Promise((r) => setTimeout(r, 5000));
          while (!await dbManager.checkConnection()) {
            logger.warn("Database still offline. Waiting 5s...");
            await new Promise((r) => setTimeout(r, 5000));
          }
          continue;
        }
        // A missing day partition is created on the fly, then the insert is retried once.
        if (isMissingPartitionError(err) && !attemptedPartitionFix) {
          attemptedPartitionFix = true;
          try {
            await partitionManager.ensurePartitionsForTimestamps(rows.map((r) => r.ts_ms));
          } catch (partitionErr) {
            if (isDbConnectionError(partitionErr)) {
              logger.error("Database offline during partition ensure. Retrying in 5s...", { error: partitionErr.message });
              await new Promise((r) => setTimeout(r, 5000));
              while (!await dbManager.checkConnection()) {
                logger.warn("Database still offline. Waiting 5s...");
                await new Promise((r) => setTimeout(r, 5000));
              }
              continue;
            }
            throw partitionErr;
          }
          continue;
        }
        throw err;
      }
    }
  };
  const insertRowsOnce = async (rows) => {
    const startedAt = Date.now();
    await dbManager.insertRows({ schema: config.db.schema, table: config.db.table, rows });
    metricCollector.increment("db_insert_count", 1);
    metricCollector.increment("db_insert_ms_sum", Date.now() - startedAt);
  };
  const resolveInsertedItems = (partitionKey, items) => {
    let insertedRows = 0;
    for (const p of items) {
      insertedRows += p.rows.length;
      const dayKey = dayKeyFromTsMs(p.rows?.[0]?.ts_ms);
      if (dayKey) {
        metricCollector.incrementKeyed("db_inserted_by_day", dayKey, p.rows.length);
      }
      p.item.resolve();
    }
    metricCollector.increment("db_inserted", insertedRows);
    metricCollector.incrementKeyed("db_inserted_by_partition", partitionKey, insertedRows);
  };
  const handleFailedItem = async (partitionKey, p, err) => {
    metricCollector.increment("db_failed");
    metricCollector.incrementKeyed("db_failed_by_partition", partitionKey, 1);
    const dayKey = dayKeyFromTsMs(p.rows?.[0]?.ts_ms);
    if (dayKey) {
      metricCollector.incrementKeyed("db_failed_by_day", dayKey, 1);
    }
    await handleError(err, p.item.message);
    p.item.resolve();
  };
  // Insert a batch; on failure, bisect it to isolate the offending item(s).
  const insertItemsDegraded = async (partitionKey, items) => {
    if (items.length === 0) return;
    const rows = items.flatMap((p) => p.rows);
    if (commitOnAttempt) {
      try {
        await insertRowsOnce(rows);
        resolveInsertedItems(partitionKey, items);
      } catch (err) {
        for (const item of items) {
          await handleFailedItem(partitionKey, item, err);
        }
      }
      return;
    }
    try {
      await insertRowsWithRetry(rows);
      resolveInsertedItems(partitionKey, items);
      return;
    } catch (err) {
      if (items.length === 1) {
        try {
          await insertRowsWithRetry(items[0].rows);
          resolveInsertedItems(partitionKey, items);
        } catch (innerErr) {
          await handleFailedItem(partitionKey, items[0], innerErr);
        }
        return;
      }
      const mid = Math.floor(items.length / 2);
      await insertItemsDegraded(partitionKey, items.slice(0, mid));
      await insertItemsDegraded(partitionKey, items.slice(mid));
    }
  };
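  // Degraded-insert behavior, sketched: a 4-item batch that fails as a whole is
  // retried as two 2-item halves, then as single items, so one poison row costs
  // a logarithmic number of extra round-trips instead of failing the whole batch:
  //   [a, b, c, d] -> [a, b] + [c, d] -> [a] [b] [c] [d]
  // Only the failing single item is routed to the Redis error queue.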
  const flushBatchForKey = async (partitionKey) => {
    const state = getBatchState(partitionKey);
    if (state.flushing) return state.flushing;
    state.flushing = (async () => {
      if (state.timer) {
        clearTimeout(state.timer);
        state.timer = null;
      }
      if (state.items.length === 0) return;
      const startedAt = Date.now();
      const currentBatch = state.items;
      state.items = [];
      const pendingDbItems = [];
      const unresolvedItems = [];
      try {
        for (const item of currentBatch) {
          try {
            const rows = parseMessageToRows(item.message);
            pendingDbItems.push({ item, rows });
            unresolvedItems.push(item);
          } catch (err) {
            metricCollector.increment("parse_error");
            metricCollector.incrementKeyed("parse_error_by_partition", partitionKey, 1);
            logger.error("Message processing failed (Parse/Validation)", { error: err.message });
            await handleError(err, item.message);
            item.resolve();
          }
        }
        if (pendingDbItems.length > 0) {
          const firstTs = pendingDbItems[0]?.rows?.[0]?.ts_ms;
          const dayKey = dayKeyFromTsMs(firstTs);
          if (dayKey) {
            const dayStartMs = Date.now();
            await insertItemsDegraded(partitionKey, pendingDbItems);
            metricCollector.incrementKeyed("db_insert_ms_sum_by_day", dayKey, Date.now() - dayStartMs);
          } else {
            await insertItemsDegraded(partitionKey, pendingDbItems);
          }
        }
        metricCollector.increment("batch_flush_count", 1);
        metricCollector.increment("batch_flush_ms_sum", Date.now() - startedAt);
      } catch (err) {
        if (!commitOnAttempt && isDbConnectionError(err)) {
          // Network failure: requeue the unresolved items and retry the flush later.
          state.items = unresolvedItems.concat(state.items);
          if (!state.timer) {
            state.timer = setTimeout(() => {
              state.timer = null;
              flushBatchForKey(partitionKey);
            }, 5000);
          }
          return;
        }
        logger.error("Batch flush failed (non-network). Marking as consumed", {
          error: err?.message,
          partitionKey,
          batchSize: currentBatch.length
        });
        for (const item of unresolvedItems) {
          try {
            await handleError(err, item.message);
          } catch {
          }
          item.resolve();
        }
      }
    })().finally(() => {
      state.flushing = null;
      // More items may have queued up during the flush; keep draining.
      if (state.items.length > 0) {
        if (state.items.length >= BATCH_SIZE) {
          flushBatchForKey(partitionKey);
        } else if (!state.timer) {
          state.timer = setTimeout(() => {
            state.timer = null;
            flushBatchForKey(partitionKey);
          }, BATCH_TIMEOUT_MS);
        }
      }
    });
    return state.flushing;
  };
  const handleMessage = (message) => {
    if (message.topic) {
      metricCollector.increment("kafka_pulled");
      metricCollector.incrementKeyed("kafka_pulled_by_partition", `${message.topic}-${message.partition}`, 1);
    }
    const partitionKey = partitionKeyFromMessage(message);
    const state = getBatchState(partitionKey);
    return new Promise((resolve, reject) => {
      state.items.push({ message, resolve, reject });
      if (state.items.length >= BATCH_SIZE) {
        flushBatchForKey(partitionKey);
      } else if (!state.timer) {
        state.timer = setTimeout(() => {
          state.timer = null;
          flushBatchForKey(partitionKey);
        }, BATCH_TIMEOUT_MS);
      }
    });
  };
  const consumers = createKafkaConsumers({
    kafkaConfig: config.kafka,
    onMessage: handleMessage,
    onError: handleError
  });
  startErrorRetryWorker({
    client: redisClient,
    queueKey: errorQueueKey,
    redisIntegration,
    handler: async (item) => {
      if (!item?.value) {
        throw new Error("Missing value in retry payload");
      }
      await handleMessage({ value: item.value });
    }
  }).catch((err) => {
    logger.error("Retry worker failed", { error: err?.message });
  });
  const shutdown = async (signal) => {
    logger.info(`Received ${signal}, shutting down...`);
    try {
      if (consumers && consumers.length > 0) {
        await Promise.all(consumers.map((c) => new Promise((resolve) => c.close(true, resolve))));
        logger.info("Kafka consumer closed", { count: consumers.length });
      }
      await redisClient.quit();
      logger.info("Redis client closed");
      await dbManager.close();
      logger.info("Database connection closed");
      process.exit(0);
    } catch (err) {
      logger.error("Error during shutdown", { error: err?.message });
      process.exit(1);
    }
  };
  process.on("SIGTERM", () => shutdown("SIGTERM"));
  process.on("SIGINT", () => shutdown("SIGINT"));
};
bootstrap().catch((error) => {
  logger.error("Service bootstrap failed", { error: error?.message });
  process.exit(1);
});