// build snapshot: 2026-03-14 17:33:19 +08:00
import cron from "node-cron" ;
import dotenv from "dotenv" ;
import pg from "pg" ;
import kafka from "kafka-node" ;
import { z } from "zod" ;
// Load environment variables from .env before the config object below is built.
dotenv . config ( ) ;
// Coerce an environment value to a finite number, falling back to a default.
// Note: Number("") is 0, so an empty string yields 0, not the default.
const parseNumber = (value, defaultValue) => {
  const candidate = Number(value);
  if (Number.isFinite(candidate)) {
    return candidate;
  }
  return defaultValue;
};
// Split a comma-separated env string into trimmed, non-empty entries.
const parseList = (value) => {
  const parts = (value || "").split(",");
  const entries = [];
  for (const part of parts) {
    const entry = part.trim();
    if (entry) {
      entries.push(entry);
    }
  }
  return entries;
};
// Application configuration assembled from environment variables, with
// development-friendly defaults. Mangled identifiers (e.g. `NODE _ENV`)
// from the corrupted source are repaired here.
const config = {
  env: process.env.NODE_ENV || "development",
  port: parseNumber(process.env.PORT, 3001),
  kafka: {
    brokers: parseList(process.env.KAFKA_BROKERS),
    // KAFKA_TOPIC wins; KAFKA_TOPICS is accepted as a legacy alias.
    topic: process.env.KAFKA_TOPIC || process.env.KAFKA_TOPICS || "blwlog4Nodejs-rcu-register-topic",
    groupId: process.env.KAFKA_GROUP_ID || "bls-register-consumer",
    clientId: process.env.KAFKA_CLIENT_ID || "bls-register-consumer-client",
    consumerInstances: parseNumber(process.env.KAFKA_CONSUMER_INSTANCES, 1),
    // Backpressure threshold: pause the consumer above this many in-flight messages.
    maxInFlight: parseNumber(process.env.KAFKA_MAX_IN_FLIGHT, 2e4),
    fetchMaxBytes: parseNumber(process.env.KAFKA_FETCH_MAX_BYTES, 50 * 1024 * 1024),
    fetchMinBytes: parseNumber(process.env.KAFKA_FETCH_MIN_BYTES, 256 * 1024),
    fetchMaxWaitMs: parseNumber(process.env.KAFKA_FETCH_MAX_WAIT_MS, 100),
    fromOffset: process.env.KAFKA_FROM_OFFSET || "latest",
    autoCommitIntervalMs: parseNumber(process.env.KAFKA_AUTO_COMMIT_INTERVAL_MS, 5e3),
    commitIntervalMs: parseNumber(process.env.KAFKA_COMMIT_INTERVAL_MS, 200),
    commitOnAttempt: process.env.KAFKA_COMMIT_ON_ATTEMPT === "true",
    batchSize: parseNumber(process.env.KAFKA_BATCH_SIZE, 5e3),
    batchTimeoutMs: parseNumber(process.env.KAFKA_BATCH_TIMEOUT_MS, 50),
    flushIntervalMs: parseNumber(process.env.KAFKA_FLUSH_INTERVAL_MS, 3e3),
    logMessages: process.env.KAFKA_LOG_MESSAGES === "true",
    // SASL is only enabled when both credentials are present.
    sasl: process.env.KAFKA_SASL_USERNAME && process.env.KAFKA_SASL_PASSWORD
      ? {
          mechanism: process.env.KAFKA_SASL_MECHANISM || "plain",
          username: process.env.KAFKA_SASL_USERNAME,
          password: process.env.KAFKA_SASL_PASSWORD
        }
      : undefined
  },
  db: {
    host: process.env.POSTGRES_HOST_G5,
    port: parseNumber(process.env.POSTGRES_PORT_G5, 5434),
    user: process.env.POSTGRES_USER_G5,
    password: process.env.POSTGRES_PASSWORD_G5,
    database: process.env.POSTGRES_DATABASE_G5,
    max: parseNumber(process.env.POSTGRES_MAX_CONNECTIONS_G5, 6),
    ssl: process.env.POSTGRES_SSL_G5 === "true" ? { rejectUnauthorized: false } : undefined,
    schema: process.env.DB_SCHEMA || "rcu_info",
    table: process.env.DB_TABLE || "rcu_info_events_g5",
    roomStatusSchema: process.env.DB_ROOM_STATUS_SCHEMA || "room_status",
    roomStatusTable: process.env.DB_ROOM_STATUS_TABLE || "room_status_moment_g5"
  },
  redis: {
    host: process.env.REDIS_HOST || "localhost",
    port: parseNumber(process.env.REDIS_PORT, 6379),
    password: process.env.REDIS_PASSWORD || undefined,
    db: parseNumber(process.env.REDIS_DB, 0),
    projectName: process.env.REDIS_PROJECT_NAME || "bls-onoffline",
    apiBaseUrl: process.env.REDIS_API_BASE_URL || `http://localhost:${parseNumber(process.env.PORT, 3001)}`
  }
};
// Render one structured log record as a JSON string.
// `context` is included only when truthy.
const format = (level, message, context) => {
  const record = { level, message, timestamp: Date.now() };
  if (context) {
    record.context = context;
  }
  return JSON.stringify(record);
};
// Minimal JSON line logger: info goes to stdout, warn/error to stderr.
const logger = {
  info(message, context) {
    process.stdout.write(format("info", message, context) + "\n");
  },
  error(message, context) {
    process.stderr.write(format("error", message, context) + "\n");
  },
  warn(message, context) {
    process.stderr.write(format("warn", message, context) + "\n");
  }
};
// pg exposes its classes on the default export when loaded as an ES module.
const { Pool } = pg;

// Column order for bulk inserts into the register events table; must match
// the positional UNNEST parameters built in DatabaseManager.insertRegisterRows.
const registerColumns = [
  "ts_ms", "hotel_id", "room_id", "device_id", "write_ts_ms", "is_send",
  "udp_raw", "extra", "ip_type", "model_num", "server_ip", "ip",
  "subnet_mask", "gateway", "dns", "app_version", "rcu_time",
  "launcher_version", "mac", "room_type_id", "config_version", "room_status",
  "season", "sys_lock_status", "authorization_time", "authorization_days",
  "room_num_remark", "room_type_remark", "room_remark", "mcu_name",
  "central_control_name", "configure_hotel_name", "configure_room_type_name"
];

// Column order for the room-status upsert; must match the UNNEST parameters
// in DatabaseManager.updateRoomStatusRows.
const roomStatusColumns = [
  "hotel_id", "room_id", "ip", "app_version", "launcher_version",
  "config_version", "upgrade_ts_ms", "register_ts_ms"
];
// Thin wrapper around a pg connection pool for the two write paths
// (register events append + room-status upsert) plus a health check.
class DatabaseManager {
  /**
   * @param {object} dbConfig - host/port/user/password/database, pool `max`,
   *   and optional `ssl` settings (see config.db).
   */
  constructor(dbConfig) {
    this.pool = new Pool({
      host: dbConfig.host,
      port: dbConfig.port,
      user: dbConfig.user,
      password: dbConfig.password,
      database: dbConfig.database,
      max: dbConfig.max,
      ssl: dbConfig.ssl
    });
  }
  /**
   * Bulk-insert register rows in a single statement using UNNEST
   * (one positional array parameter per column, column-major).
   * NOTE: schema/table are interpolated into the SQL text — they must come
   * from trusted configuration, never from message payloads.
   */
  async insertRegisterRows({ schema, table, rows }) {
    if (!rows || rows.length === 0) {
      return;
    }
    const statement = `
      INSERT INTO ${schema}.${table} (${registerColumns.join(", ")})
      SELECT *
      FROM UNNEST (
        $1::int8[],
        $2::int2[],
        $3::text[],
        $4::text[],
        $5::int8[],
        $6::int2[],
        $7::text[],
        $8::jsonb[],
        $9::int2[],
        $10::text[],
        $11::text[],
        $12::text[],
        $13::text[],
        $14::text[],
        $15::text[],
        $16::text[],
        $17::text[],
        $18::text[],
        $19::text[],
        $20::int8[],
        $21::text[],
        $22::int4[],
        $23::int4[],
        $24::int4[],
        $25::text[],
        $26::text[],
        $27::text[],
        $28::text[],
        $29::text[],
        $30::text[],
        $31::text[],
        $32::text[],
        $33::text[]
      )
      ON CONFLICT DO NOTHING
    `;
    try {
      // Pivot row objects into column-major arrays matching the UNNEST params.
      const params = registerColumns.map((column) => rows.map((row) => row[column] ?? null));
      await this.pool.query(statement, params);
    } catch (error) {
      logger.error("Register table insert failed", {
        error: error?.message,
        schema,
        table,
        rowsLength: rows.length
      });
      throw error;
    }
  }
  /**
   * Upsert the latest room-status snapshot per (hotel_id, room_id).
   * Incoming rows are deduplicated (newest register_ts_ms wins) and only
   * rooms that already exist in the target table are written — device_id is
   * taken from the existing row, so brand-new rooms are intentionally skipped.
   */
  async updateRoomStatusRows({ schema, table, rows }) {
    if (!rows || rows.length === 0) {
      return;
    }
    const statement = `
      WITH incoming AS (
        SELECT *
        FROM UNNEST (
          $1::int2[],
          $2::text[],
          $3::text[],
          $4::text[],
          $5::text[],
          $6::text[],
          $7::int8[],
          $8::int8[]
        ) AS u (${roomStatusColumns.join(", ")})
      ), dedup AS (
        SELECT DISTINCT ON (hotel_id, room_id)
          hotel_id,
          room_id,
          ip,
          app_version,
          launcher_version,
          config_version,
          upgrade_ts_ms,
          register_ts_ms
        FROM incoming
        ORDER BY hotel_id, room_id, register_ts_ms DESC
      ), existing AS (
        SELECT i.*, t.device_id
        FROM dedup i
        INNER JOIN ${schema}.${table} t
          ON t.hotel_id = i.hotel_id
         AND t.room_id = i.room_id
      )
      INSERT INTO ${schema}.${table} (
        hotel_id,
        room_id,
        device_id,
        ip,
        app_version,
        launcher_version,
        config_version,
        upgrade_ts_ms,
        register_ts_ms
      )
      SELECT
        hotel_id,
        room_id,
        device_id,
        ip,
        app_version,
        launcher_version,
        config_version,
        upgrade_ts_ms,
        register_ts_ms
      FROM existing
      ON CONFLICT (hotel_id, room_id) DO UPDATE
      SET
        ip = EXCLUDED.ip,
        app_version = EXCLUDED.app_version,
        launcher_version = EXCLUDED.launcher_version,
        config_version = EXCLUDED.config_version,
        upgrade_ts_ms = EXCLUDED.upgrade_ts_ms,
        register_ts_ms = EXCLUDED.register_ts_ms
    `;
    try {
      const params = roomStatusColumns.map((column) => rows.map((row) => row[column] ?? null));
      await this.pool.query(statement, params);
    } catch (error) {
      logger.error("Room status table update failed", {
        error: error?.message,
        schema,
        table,
        rowsLength: rows.length
      });
      throw error;
    }
  }
  /**
   * Liveness probe: acquire a client (with a 5s timeout) and run SELECT 1.
   * @returns {Promise<boolean>} true on success, false on any failure.
   */
  async checkConnection() {
    let client;
    let timeoutId;
    try {
      const connectPromise = this.pool.connect();
      const timeoutPromise = new Promise((_, reject) => {
        timeoutId = setTimeout(() => reject(new Error("Connection timeout")), 5e3);
      });
      try {
        client = await Promise.race([connectPromise, timeoutPromise]);
      } catch (raceError) {
        // If connect eventually succeeds after losing the race, release the
        // client so the pool slot is not leaked.
        connectPromise.then((c) => c.release()).catch(() => {
        });
        throw raceError;
      }
      await client.query("SELECT 1");
      return true;
    } catch (err) {
      logger.error("Database check connection failed", { error: err.message });
      return false;
    } finally {
      // Fix: the original never cleared the timeout, keeping the process
      // alive ~5s after every successful check.
      clearTimeout(timeoutId);
      if (client) {
        client.release();
      }
    }
  }
  // Drain and close the pool (for graceful shutdown).
  async close() {
    await this.pool.end();
  }
}
// Shared pool manager used by the batch-flush pipeline below.
const dbManager = new DatabaseManager ( config . db ) ;
// Tracks per-partition message completion so commits only ever advance past
// a contiguous prefix of successfully processed offsets.
class OffsetTracker {
  constructor() {
    this.partitions = new Map();
  }
  // Record receipt of a message (before processing). Keeps the smallest
  // outstanding offset as the next commit candidate.
  add(topic, partition, offset) {
    const key = `${topic}-${partition}`;
    let state = this.partitions.get(key);
    if (state === undefined) {
      state = { nextCommitOffset: null, done: new Set() };
      this.partitions.set(key, state);
    }
    const numeric = Number(offset);
    if (!Number.isFinite(numeric)) return;
    if (state.nextCommitOffset === null || numeric < state.nextCommitOffset) {
      state.nextCommitOffset = numeric;
    }
  }
  // Mark a message processed. Returns the new commit offset when the
  // contiguous prefix advanced, otherwise null.
  markDone(topic, partition, offset) {
    const state = this.partitions.get(`${topic}-${partition}`);
    if (!state) return null;
    const numeric = Number(offset);
    if (!Number.isFinite(numeric)) return null;
    state.done.add(numeric);
    if (state.nextCommitOffset === null) {
      state.nextCommitOffset = numeric;
    }
    let moved = false;
    // Set.delete returns true only while the next offset is already done,
    // so this consumes the contiguous completed run in one pass.
    while (state.done.delete(state.nextCommitOffset)) {
      state.nextCommitOffset += 1;
      moved = true;
    }
    return moved ? state.nextCommitOffset : null;
  }
  // Drop all partition state (used on rebalance).
  clear() {
    this.partitions.clear();
  }
}
const { ConsumerGroup } = kafka;
/**
 * Create one ConsumerGroup with manual batched offset commits and
 * in-flight backpressure (pause above maxInFlight, resume below).
 * Fixes vs. original: duplicate "error"/"offsetOutOfRange" handlers removed
 * (they double-logged every event), and failed commit re-queues no longer
 * clobber a newer offset staged while the request was in flight.
 */
const createOneConsumer = ({ kafkaConfig, onMessage, onError, instanceIndex }) => {
  const kafkaHost = kafkaConfig.brokers.join(",");
  const clientId = instanceIndex === 0 ? kafkaConfig.clientId : `${kafkaConfig.clientId}-${instanceIndex}`;
  const id = `${clientId}-${process.pid}-${Date.now()}`;
  const maxInFlight = Number.isFinite(kafkaConfig.maxInFlight) ? kafkaConfig.maxInFlight : 5e3;
  const commitIntervalMs = Number.isFinite(kafkaConfig.commitIntervalMs) ? kafkaConfig.commitIntervalMs : 200;
  let inFlight = 0;
  const tracker = new OffsetTracker();
  let pendingCommits = new Map();
  let commitTimer = null;
  const flushCommits = () => {
    if (pendingCommits.size === 0) return;
    const batch = pendingCommits;
    pendingCommits = new Map();
    consumer.sendOffsetCommitRequest(Array.from(batch.values()), (err) => {
      if (err) {
        // Re-queue failed commits, but never overwrite a newer offset that
        // was staged while this request was in flight.
        for (const [key, value] of batch.entries()) {
          const staged = pendingCommits.get(key);
          if (!staged || staged.offset < value.offset) {
            pendingCommits.set(key, value);
          }
        }
        logger.error("Kafka commit failed", { error: err?.message, count: batch.size });
        // Make sure the re-queued commits are retried even if no new message
        // arrives to schedule a flush.
        scheduleCommitFlush();
      }
    });
  };
  const scheduleCommitFlush = () => {
    if (commitTimer) return;
    commitTimer = setTimeout(() => {
      commitTimer = null;
      flushCommits();
    }, commitIntervalMs);
  };
  const consumer = new ConsumerGroup(
    {
      kafkaHost,
      groupId: kafkaConfig.groupId,
      clientId,
      id,
      fromOffset: kafkaConfig.fromOffset || "latest",
      protocol: ["roundrobin"],
      outOfRangeOffset: "latest",
      // Commits are managed manually via the OffsetTracker.
      autoCommit: false,
      autoCommitIntervalMs: kafkaConfig.autoCommitIntervalMs,
      fetchMaxBytes: kafkaConfig.fetchMaxBytes,
      fetchMinBytes: kafkaConfig.fetchMinBytes,
      fetchMaxWaitMs: kafkaConfig.fetchMaxWaitMs,
      sasl: kafkaConfig.sasl
    },
    kafkaConfig.topic
  );
  const tryResume = () => {
    if (inFlight < maxInFlight && consumer.paused) {
      consumer.resume();
    }
  };
  consumer.on("message", (message) => {
    inFlight += 1;
    tracker.add(message.topic, message.partition, message.offset);
    if (inFlight >= maxInFlight) {
      consumer.pause();
    }
    Promise.resolve(onMessage(message)).then(() => {
    }).catch((error) => {
      logger.error("Kafka message handling failed", { error: error?.message });
      if (onError) {
        onError(error, message);
      }
    }).finally(() => {
      // Offsets advance (and are staged for commit) even when processing
      // failed — failures are logged/forwarded, not retried here.
      const commitOffset = tracker.markDone(message.topic, message.partition, message.offset);
      if (commitOffset !== null) {
        const key = `${message.topic}-${message.partition}`;
        pendingCommits.set(key, {
          topic: message.topic,
          partition: message.partition,
          offset: commitOffset,
          metadata: "m"
        });
        scheduleCommitFlush();
      }
      inFlight -= 1;
      tryResume();
    });
  });
  consumer.on("error", (error) => {
    logger.error("Kafka consumer error", { error: error?.message });
    if (onError) {
      onError(error);
    }
  });
  consumer.on("connect", () => {
    logger.info("Kafka Consumer connected", {
      groupId: kafkaConfig.groupId,
      clientId
    });
  });
  consumer.on("rebalancing", () => {
    logger.info("Kafka Consumer rebalancing", {
      groupId: kafkaConfig.groupId,
      clientId
    });
    // Partition ownership is about to change: drop all local offset state.
    tracker.clear();
    pendingCommits.clear();
    if (commitTimer) {
      clearTimeout(commitTimer);
      commitTimer = null;
    }
  });
  consumer.on("rebalanced", () => {
    logger.info("Kafka Consumer rebalanced", { clientId, groupId: kafkaConfig.groupId });
  });
  consumer.on("offsetOutOfRange", (error) => {
    logger.warn("Kafka Consumer offset out of range", {
      error: error?.message,
      topic: error?.topic,
      partition: error?.partition,
      groupId: kafkaConfig.groupId,
      clientId
    });
  });
  consumer.on("close", () => {
    if (commitTimer) {
      clearTimeout(commitTimer);
      commitTimer = null;
    }
    // Best-effort final commit of whatever is staged.
    flushCommits();
    logger.warn("Kafka Consumer closed", {
      groupId: kafkaConfig.groupId,
      clientId
    });
  });
  return consumer;
};
// Spin up the configured number of consumer instances (minimum one).
const createKafkaConsumers = ({ kafkaConfig, onMessage, onError }) => {
  const configured = Number.isFinite(kafkaConfig.consumerInstances) ? kafkaConfig.consumerInstances : 1;
  const total = Math.max(1, configured);
  const consumers = [];
  for (let idx = 0; idx < total; idx += 1) {
    consumers.push(createOneConsumer({ kafkaConfig, onMessage, onError, instanceIndex: idx }));
  }
  return consumers;
};
// Lenient numeric coercion for payload fields: null/undefined/"" map to
// null, numbers pass through unchanged (including NaN), and anything else
// must coerce to a finite number or becomes null.
const toNumber = (value) => {
  if (value === undefined || value === null || value === "") {
    return null;
  }
  if (typeof value === "number") {
    return value;
  }
  const numeric = Number(value);
  return Number.isFinite(numeric) ? numeric : null;
};
// Stringify a value while passing null/undefined through untouched, so the
// zod schema can still treat them as missing/nullable.
const toStringAllowEmpty = (value) =>
  value === undefined || value === null ? value : String(value);
// Field factories for the lenient payload contract: values are coerced
// first, then validated; both null and missing values are accepted.
const looseNumber = () => z.preprocess(toNumber, z.number().nullable()).optional().nullable();
const looseString = () => z.preprocess(toStringAllowEmpty, z.string().nullable()).optional().nullable();
// Inbound Kafka payload contract; every field is optional and nullable.
const kafkaPayloadSchema = z.object({
  ts_ms: looseNumber(),
  upgrade_ts_ms: looseNumber(),
  hotel_id: looseNumber(),
  room_id: looseString(),
  device_id: looseString(),
  is_send: looseNumber(),
  udp_raw: z.any().optional().nullable(),
  extra: z.any().optional().nullable(),
  ip_type: looseNumber(),
  model_num: looseString(),
  server_ip: looseString(),
  ip: looseString(),
  subnet_mask: looseString(),
  gateway: looseString(),
  dns: looseString(),
  app_version: looseString(),
  rcu_time: looseString(),
  launcher_version: looseString(),
  mac: looseString(),
  room_type_id: looseNumber(),
  config_version: looseString(),
  room_status: looseNumber(),
  season: looseNumber(),
  sys_lock_status: looseNumber(),
  authorization_time: looseString(),
  authorization_days: looseString(),
  room_num_remark: looseString(),
  room_type_remark: looseString(),
  room_remark: looseString(),
  mcu_name: looseString(),
  central_control_name: looseString(),
  configure_hotel_name: looseString(),
  configure_room_type_name: looseString()
});
// Stringify, strip NUL bytes (Postgres text columns reject them), and
// optionally truncate to maxLength characters. null/undefined become null.
const normalizeText = (value, maxLength) => {
  if (value === undefined || value === null) {
    return null;
  }
  const cleaned = String(value).replace(/\u0000/g, "");
  if (maxLength && cleaned.length > maxLength) {
    return cleaned.slice(0, maxLength);
  }
  return cleaned;
};
// Recursively strip NUL characters from every string in a JSON-like value
// (jsonb columns reject \u0000). Non-container scalars pass through.
const sanitizeJsonValue = (value) => {
  if (value === undefined || value === null) {
    return value;
  }
  if (typeof value === "string") {
    return value.replace(/\u0000/g, "");
  }
  if (Array.isArray(value)) {
    return value.map((item) => sanitizeJsonValue(item));
  }
  if (typeof value === "object") {
    const sanitized = {};
    for (const key of Object.keys(value)) {
      sanitized[key] = sanitizeJsonValue(value[key]);
    }
    return sanitized;
  }
  return value;
};
// Heuristic base64 check: non-empty, length a multiple of 4, only base64
// alphabet characters with at most two trailing '=' padding chars.
const isLikelyBase64 = (text) =>
  Boolean(text) && text.length % 4 === 0 && /^[A-Za-z0-9+/]+={0,2}$/.test(text);
// Coerce to an integer (truncating toward zero). "", null, undefined, and
// non-finite results all yield null.
const normalizeInteger = (value) => {
  if (value === undefined || value === null || value === "") {
    return null;
  }
  const numeric = typeof value === "number" ? value : Number(value);
  return Number.isFinite(numeric) ? Math.trunc(numeric) : null;
};
// Return value when it is a real (non-NaN) number within [min, max]
// inclusive; otherwise return the fallback.
const inRangeOr = (value, min, max, fallback) => {
  const valid =
    typeof value === "number" && !Number.isNaN(value) && value >= min && value <= max;
  return valid ? value : fallback;
};
// Normalize a timestamp to epoch milliseconds. Missing/invalid values fall
// back to "now"; positive values below 1e11 look like epoch seconds and are
// scaled up by 1000.
const normalizeTsMs = (value) => {
  const ts = normalizeInteger(value);
  if (ts === null) {
    return Date.now();
  }
  return ts > 0 && ts < 1e11 ? ts * 1e3 : ts;
};
// Encode the raw UDP payload as base64 text for storage. Strings that
// already look base64-encoded are kept as-is (after NUL stripping);
// buffers, byte arrays, and other values are base64-encoded.
const normalizeUdpRaw = (value) => {
  if (value === undefined || value === null) {
    return null;
  }
  if (typeof value === "string") {
    const cleaned = value.replace(/\u0000/g, "");
    return isLikelyBase64(cleaned)
      ? cleaned
      : Buffer.from(cleaned, "utf8").toString("base64");
  }
  if (Buffer.isBuffer(value)) {
    return value.toString("base64");
  }
  if (Array.isArray(value)) {
    return Buffer.from(value).toString("base64");
  }
  return Buffer.from(String(value), "utf8").toString("base64");
};
// Normalize the free-form "extra" field into a sanitized JSON value.
// Objects are sanitized directly; strings are parsed as JSON when possible
// (scalar results wrapped under "value", unparseable text under "raw");
// other scalars are stringified under "raw".
const normalizeExtra = (value) => {
  if (value === undefined || value === null || value === "") {
    return null;
  }
  if (typeof value === "object") {
    return sanitizeJsonValue(value);
  }
  if (typeof value !== "string") {
    return sanitizeJsonValue({ raw: String(value) });
  }
  try {
    const parsed = JSON.parse(value);
    const wrapped = parsed && typeof parsed === "object" ? parsed : { value: parsed };
    return sanitizeJsonValue(wrapped);
  } catch {
    return sanitizeJsonValue({ raw: value });
  }
};
// Return the first defined value among the snake_case key and the
// alternate (PascalCase) key; undefined when neither is present.
const pick = (payload, snakeKey, pascalKey) => {
  for (const key of [snakeKey, pascalKey]) {
    if (payload[key] !== undefined) {
      return payload[key];
    }
  }
  return undefined;
};
// Every payload field we accept. pick() is queried with the same key twice
// because upstream producers currently only emit snake_case names.
const PAYLOAD_KEYS = [
  "ts_ms", "upgrade_ts_ms", "hotel_id", "room_id", "device_id", "is_send",
  "udp_raw", "extra", "ip_type", "model_num", "server_ip", "ip",
  "subnet_mask", "gateway", "dns", "app_version", "rcu_time",
  "launcher_version", "mac", "room_type_id", "config_version", "room_status",
  "season", "sys_lock_status", "authorization_time", "authorization_days",
  "room_num_remark", "room_type_remark", "room_remark", "mcu_name",
  "central_control_name", "configure_hotel_name", "configure_room_type_name"
];
// Validate a raw payload and map it to one register-event row plus (for
// register messages, is_send === 0) one room-status snapshot row.
const buildRowsFromPayload = (rawPayload) => {
  const normalizedInput = {};
  for (const key of PAYLOAD_KEYS) {
    normalizedInput[key] = pick(rawPayload, key, key);
  }
  const payload = kafkaPayloadSchema.parse(normalizedInput);
  const tsMs = normalizeTsMs(payload.ts_ms);
  // hotel_id lands in an int2 column; out-of-range values collapse to 0.
  const hotelId = inRangeOr(normalizeInteger(payload.hotel_id), -32768, 32767, 0);
  const roomId = normalizeText(payload.room_id, 50) || "";
  const registerRow = {
    ts_ms: tsMs,
    hotel_id: hotelId,
    room_id: roomId,
    device_id: normalizeText(payload.device_id, 64),
    write_ts_ms: Date.now(),
    is_send: inRangeOr(normalizeInteger(payload.is_send), -32768, 32767, 0),
    udp_raw: normalizeUdpRaw(payload.udp_raw),
    extra: normalizeExtra(payload.extra),
    ip_type: inRangeOr(normalizeInteger(payload.ip_type), -32768, 32767, null),
    model_num: normalizeText(payload.model_num, 32),
    server_ip: normalizeText(payload.server_ip, 21),
    ip: normalizeText(payload.ip, 21),
    subnet_mask: normalizeText(payload.subnet_mask, 15),
    gateway: normalizeText(payload.gateway, 15),
    dns: normalizeText(payload.dns, 15),
    app_version: normalizeText(payload.app_version, 64),
    rcu_time: normalizeText(payload.rcu_time, 25),
    launcher_version: normalizeText(payload.launcher_version, 64),
    mac: normalizeText(payload.mac, 17),
    room_type_id: normalizeInteger(payload.room_type_id),
    config_version: normalizeText(payload.config_version, 32),
    room_status: inRangeOr(normalizeInteger(payload.room_status), -2147483648, 2147483647, null),
    season: inRangeOr(normalizeInteger(payload.season), -2147483648, 2147483647, null),
    sys_lock_status: inRangeOr(normalizeInteger(payload.sys_lock_status), -2147483648, 2147483647, null),
    authorization_time: normalizeText(payload.authorization_time, 10),
    authorization_days: normalizeText(payload.authorization_days, 10),
    room_num_remark: normalizeText(payload.room_num_remark, 255),
    room_type_remark: normalizeText(payload.room_type_remark, 64),
    room_remark: normalizeText(payload.room_remark, 64),
    mcu_name: normalizeText(payload.mcu_name, 255),
    central_control_name: normalizeText(payload.central_control_name, 255),
    configure_hotel_name: normalizeText(payload.configure_hotel_name, 255),
    configure_room_type_name: normalizeText(payload.configure_room_type_name, 255)
  };
  const roomStatusUpdateRow = {
    hotel_id: hotelId,
    room_id: roomId,
    ip: registerRow.ip,
    app_version: registerRow.app_version,
    launcher_version: registerRow.launcher_version,
    config_version: registerRow.config_version,
    upgrade_ts_ms: normalizeTsMs(payload.upgrade_ts_ms),
    register_ts_ms: tsMs
  };
  // Only register messages (is_send === 0) refresh the room-status snapshot.
  return {
    registerRows: [registerRow],
    roomStatusRows: registerRow.is_send === 0 ? [roomStatusUpdateRow] : []
  };
};
/**
 * Parse a Kafka message into register/room-status rows.
 * Fix: a null/undefined message value (e.g. a tombstone) previously crashed
 * with a raw TypeError on .toString(); it now raises a typed PARSE_ERROR so
 * the batch pipeline counts it as a parse failure instead of failing the flush.
 * @throws Error with .type "PARSE_ERROR" for missing/invalid JSON, or
 *   "VALIDATION_ERROR" when the payload fails schema validation.
 */
const parseMessageToRows = (message) => {
  if (message?.value === undefined || message?.value === null) {
    const error = new Error("JSON Parse Error: empty message value");
    error.type = "PARSE_ERROR";
    throw error;
  }
  const rawValue = message.value.toString();
  let payload;
  try {
    payload = JSON.parse(rawValue);
  } catch (e) {
    const error = new Error(`JSON Parse Error: ${e.message}`);
    error.type = "PARSE_ERROR";
    throw error;
  }
  const validationResult = kafkaPayloadSchema.safeParse(payload);
  if (!validationResult.success) {
    const error = new Error(`Schema Validation Failed: ${JSON.stringify(validationResult.error.errors)}`);
    error.type = "VALIDATION_ERROR";
    throw error;
  }
  return buildRowsFromPayload(payload);
};
// Accumulates flat and keyed counters for the per-minute metrics report;
// getAndReset() snapshots and zeroes everything in one step.
class MetricCollector {
  constructor() {
    this.reset();
  }
  // Zero all flat counters and drop all keyed buckets.
  reset() {
    this.metrics = {
      kafka_pulled: 0,
      parse_error: 0,
      db_inserted: 0,
      db_failed: 0,
      db_insert_count: 0,
      db_insert_ms_sum: 0,
      batch_flush_count: 0,
      batch_flush_ms_sum: 0
    };
    this.keyed = {};
  }
  // Bump a known flat counter; unknown metric names are silently ignored.
  increment(metric, count = 1) {
    if (Object.prototype.hasOwnProperty.call(this.metrics, metric)) {
      this.metrics[metric] += count;
    }
  }
  // Bump a counter under metric -> key; no-op when key is falsy.
  incrementKeyed(metric, key, count = 1) {
    if (!key) return;
    const bucket = this.keyed[metric] ?? (this.keyed[metric] = {});
    bucket[key] = (Object.prototype.hasOwnProperty.call(bucket, key) ? bucket[key] : 0) + count;
  }
  // Snapshot all counters (deep-copying the keyed buckets) and reset.
  getAndReset() {
    const flat = { ...this.metrics };
    const keyed = JSON.parse(JSON.stringify(this.keyed));
    this.reset();
    return { ...flat, keyed };
  }
}
// Error codes treated as transient connectivity failures worth retrying:
// Node network errnos plus Postgres connection-class SQLSTATEs
// (57P03 cannot_connect_now, 08xxx connection exceptions).
const NETWORK_CODES = new Set([
  "ECONNREFUSED", "ECONNRESET", "EPIPE", "ETIMEDOUT", "ENOTFOUND",
  "EHOSTUNREACH", "ENETUNREACH",
  "57P03", "08006", "08001", "08000", "08003"
]);
// Heuristic: does this error look like a retryable database connectivity
// failure? Checks the error code first, then well-known message fragments.
const isDbConnectionError = (err) => {
  if (typeof err?.code === "string" && NETWORK_CODES.has(err.code)) {
    return true;
  }
  const message = typeof err?.message === "string" ? err.message.toLowerCase() : "";
  const fragments = [
    "connection timeout",
    "connection terminated",
    "connection refused",
    "terminating connection",
    "econnrefused",
    "econnreset",
    "etimedout",
    "could not connect",
    "the database system is starting up",
    "no pg_hba.conf entry"
  ];
  return fragments.some((fragment) => message.includes(fragment));
};
// Promise-based delay helper.
const sleep = (ms) => new Promise((resolve) => {
  setTimeout(resolve, ms);
});
const bootstrap = async ( ) => {
logger . info ( "Starting register consumer" , {
env : config . env ,
kafka : {
brokers : config . kafka . brokers ,
topic : config . kafka . topic ,
groupId : config . kafka . groupId
} ,
db : {
host : config . db . host ,
port : config . db . port ,
database : config . db . database ,
schema : config . db . schema ,
table : config . db . table ,
roomStatusSchema : config . db . roomStatusSchema ,
roomStatusTable : config . db . roomStatusTable
} ,
flushIntervalMs : config . kafka . flushIntervalMs
} ) ;
const metricCollector = new MetricCollector ( ) ;
const totals = {
kafkaPulled : 0 ,
dbInserted : 0 ,
parseError : 0 ,
dbFailed : 0
} ;
const flushIntervalMs = Math . max ( 3e3 , Number . isFinite ( config . kafka . flushIntervalMs ) ? config . kafka . flushIntervalMs : 3e3 ) ;
const queue = [ ] ;
let flushTimer = null ;
let flushing = false ;
const runCounterTimer = setInterval ( ( ) => {
logger . info ( "Run counters" , {
kafkaPulled : totals . kafkaPulled ,
dbInserted : totals . dbInserted ,
parseError : totals . parseError ,
dbFailed : totals . dbFailed
} ) ;
} , 1e4 ) ;
const handleError = ( error , message ) => {
logger . error ( "Kafka processing error" , {
error : error ? . message ,
type : error ? . type ,
topic : message ? . topic ,
partition : message ? . partition ,
offset : message ? . offset
} ) ;
} ;
cron . schedule ( "* * * * *" , ( ) => {
const metrics = metricCollector . getAndReset ( ) ;
const flushAvgMs = metrics . batch _flush _count > 0 ? ( metrics . batch _flush _ms _sum / metrics . batch _flush _count ) . toFixed ( 1 ) : "0.0" ;
const dbAvgMs = metrics . db _insert _count > 0 ? ( metrics . db _insert _ms _sum / metrics . db _insert _count ) . toFixed ( 1 ) : "0.0" ;
logger . info ( "Minute metrics" , {
kafkaPulled : metrics . kafka _pulled ,
parseError : metrics . parse _error ,
dbInserted : metrics . db _inserted ,
dbFailed : metrics . db _failed ,
flushAvgMs ,
dbAvgMs
} ) ;
} ) ;
// Persist one batch: insert register rows, then update room-status rows.
// Retries forever with a 5s pause while the DB connection is down; any
// non-connection error is rethrown for the caller to handle (the caller
// bisects the batch to isolate poison rows).
const processValidRowsWithRetry = async (registerRows, roomStatusRows) => {
  const startedAt = Date.now();
  for (;;) {
    try {
      await dbManager.insertRegisterRows({
        schema: config.db.schema,
        table: config.db.table,
        rows: registerRows,
      });
      await dbManager.updateRoomStatusRows({
        schema: config.db.roomStatusSchema,
        table: config.db.roomStatusTable,
        rows: roomStatusRows,
      });
      // Record one successful insert round-trip plus its total latency
      // (retry waits included, matching the single startedAt timestamp).
      metricCollector.increment("db_insert_count", 1);
      metricCollector.increment("db_insert_ms_sum", Date.now() - startedAt);
      metricCollector.increment("db_inserted", registerRows.length);
      totals.dbInserted += registerRows.length;
      return;
    } catch (err) {
      if (!isDbConnectionError(err)) {
        throw err; // not a connectivity problem — do not retry here
      }
      logger.warn("Database unavailable, retrying in 5s", { error: err?.message });
      await sleep(5000);
    }
  }
};
// Arm the flush timer unless one is already pending. The timer clears its own
// handle before starting the (fire-and-forget) async flush.
const scheduleFlush = () => {
  if (flushTimer !== null) {
    return;
  }
  flushTimer = setTimeout(() => {
    flushTimer = null;
    void flushQueue(); // flushQueue guards its own re-entrancy
  }, flushIntervalMs);
};
// Drain the queue: parse every queued message, batch-insert the parsed rows,
// and resolve each item's promise so the consumer can commit offsets.
// Unparseable messages are counted, logged, and resolved immediately.
// On a non-retryable DB error the batch is bisected recursively so a single
// poison row discards only itself, not the whole batch.
const flushQueue = async () => {
  if (flushing) {
    return; // a flush is already in progress (timer + shutdown can race)
  }
  if (queue.length === 0) {
    return;
  }
  flushing = true;
  const startedAt = Date.now();
  // Take the whole queue; messages arriving during this flush land in the
  // now-empty queue and are handled by the reschedule in `finally`.
  const currentBatch = queue.splice(0, queue.length);
  try {
    const parsedItems = [];
    for (const item of currentBatch) {
      try {
        const parsed = parseMessageToRows(item.message);
        parsedItems.push({ item, parsed });
      } catch (err) {
        metricCollector.increment("parse_error");
        totals.parseError += 1;
        handleError(err, item.message);
        // Resolve anyway so the bad message's offset can still advance.
        item.resolve();
      }
    }
    const insertParsedItems = async (items) => {
      if (items.length === 0) {
        return;
      }
      const registerRows = items.flatMap((it) => it.parsed.registerRows);
      const roomStatusRows = items.flatMap((it) => it.parsed.roomStatusRows);
      try {
        await processValidRowsWithRetry(registerRows, roomStatusRows);
      } catch (err) {
        if (items.length > 1) {
          // Bisect to isolate the failing item(s).
          const mid = Math.floor(items.length / 2);
          await insertParsedItems(items.slice(0, mid));
          await insertParsedItems(items.slice(mid));
          return;
        }
        metricCollector.increment("db_failed", 1);
        totals.dbFailed += 1;
        handleError(err, items[0].item.message);
      }
    };
    if (parsedItems.length > 0) {
      await insertParsedItems(parsedItems);
      // Resolve every parsed item — succeeded or finally failed — so the
      // consumer is never blocked waiting on this batch.
      for (const parsedItem of parsedItems) {
        parsedItem.item.resolve();
      }
    }
    metricCollector.increment("batch_flush_count", 1);
    metricCollector.increment("batch_flush_ms_sum", Date.now() - startedAt);
  } finally {
    // BUG FIX: previously `flushing = false` ran outside any finally, so an
    // unexpected throw anywhere above left the guard stuck at true and every
    // future flush returned early — a permanent stall. The finally guarantees
    // the guard is released and a follow-up flush is scheduled.
    flushing = false;
    if (queue.length > 0) {
      scheduleFlush();
    }
  }
};
// Enqueue one Kafka message for batched insertion. The returned promise
// settles when the message has been flushed (or rejected as unparseable),
// which lets the consumer gate its offset commits on persistence.
const handleMessage = (message) => {
  metricCollector.increment("kafka_pulled");
  totals.kafkaPulled += 1;
  return new Promise((resolve) => {
    queue.push({ message, resolve });
    scheduleFlush();
  });
};
// Start the Kafka consumer instance(s), routing every message through the
// batching queue (handleMessage) and all errors through handleError.
const consumers = createKafkaConsumers ( {
kafkaConfig : config . kafka ,
onMessage : handleMessage ,
onError : handleError
} ) ;
// Graceful shutdown: stop the timers, drain the pending queue, close the
// Kafka consumers (force-commit), close the DB pool, log a final summary,
// then exit. Exits 1 if any step throws.
const shutdown = async (signal) => {
  logger.info(` Received ${ signal } , shutting down... `);
  try {
    if (flushTimer) {
      clearTimeout(flushTimer);
      flushTimer = null;
    }
    clearInterval(runCounterTimer);
    // BUG FIX: a single `await flushQueue()` returns immediately when a flush
    // is already in progress (re-entrancy guard) and flushes at most once, so
    // queued or mid-flight messages could be dropped at shutdown. Drain until
    // the queue is empty and no flush is running, bounded by a 30s deadline
    // so shutdown can never hang indefinitely.
    const drainDeadline = Date.now() + 30000;
    while ((flushing || queue.length > 0) && Date.now() < drainDeadline) {
      await flushQueue();
      if (flushing) {
        await sleep(100);
      }
    }
    if (consumers && consumers.length > 0) {
      await Promise.all(consumers.map((consumer) => new Promise((resolve) => consumer.close(true, resolve))));
    }
    await dbManager.close();
    logger.info("Run summary", {
      kafkaPulled: totals.kafkaPulled,
      dbInserted: totals.dbInserted,
      parseError: totals.parseError,
      dbFailed: totals.dbFailed,
    });
    process.exit(0);
  } catch (err) {
    logger.error("Error during shutdown", { error: err?.message });
    process.exit(1);
  }
};
// Trigger graceful shutdown on the standard termination signals.
process . on ( "SIGTERM" , ( ) => shutdown ( "SIGTERM" ) ) ;
process . on ( "SIGINT" , ( ) => shutdown ( "SIGINT" ) ) ;
} ;
// Entry point: any bootstrap failure is fatal — log it and exit non-zero.
bootstrap().catch((error) => {
  const reason = error?.message;
  logger.error("Service bootstrap failed", { error: reason });
  process.exit(1);
});