Files
Web_BLS_Register_Server/bls-register-backend/dist/index.js

1025 lines
34 KiB
JavaScript
Raw · Normal View · History

import cron from "node-cron";
import dotenv from "dotenv";
import pg from "pg";
import kafka from "kafka-node";
import { z } from "zod";
dotenv.config();
/**
 * Parse an arbitrary value (typically an env-var string) as a finite number.
 * @param {*} value - Raw value to coerce.
 * @param {number} defaultValue - Returned when `value` is not a finite number.
 * @returns {number}
 */
const parseNumber = (value, defaultValue) => {
  const candidate = Number(value);
  if (Number.isFinite(candidate)) {
    return candidate;
  }
  return defaultValue;
};
/**
 * Split a comma-separated string into trimmed, non-empty entries.
 * @param {string|undefined} value - Raw comma-separated list (may be absent).
 * @returns {string[]}
 */
const parseList = (value) => {
  const parts = (value || "").split(",");
  const entries = [];
  for (const part of parts) {
    const entry = part.trim();
    if (entry) {
      entries.push(entry);
    }
  }
  return entries;
};
// Application configuration, resolved once at startup from environment
// variables (dotenv is loaded above). Invalid numeric env values fall back
// to the defaults passed to parseNumber.
const config = {
env: process.env.NODE_ENV || "development",
port: parseNumber(process.env.PORT, 3001),
kafka: {
brokers: parseList(process.env.KAFKA_BROKERS),
// KAFKA_TOPIC preferred; KAFKA_TOPICS accepted as a legacy alias.
topic: process.env.KAFKA_TOPIC || process.env.KAFKA_TOPICS || "blwlog4Nodejs-rcu-register-topic",
groupId: process.env.KAFKA_GROUP_ID || "bls-register-consumer",
clientId: process.env.KAFKA_CLIENT_ID || "bls-register-consumer-client",
consumerInstances: parseNumber(process.env.KAFKA_CONSUMER_INSTANCES, 1),
// Backpressure limit: the consumer pauses once this many messages are in flight.
maxInFlight: parseNumber(process.env.KAFKA_MAX_IN_FLIGHT, 2e4),
fetchMaxBytes: parseNumber(process.env.KAFKA_FETCH_MAX_BYTES, 50 * 1024 * 1024),
fetchMinBytes: parseNumber(process.env.KAFKA_FETCH_MIN_BYTES, 256 * 1024),
fetchMaxWaitMs: parseNumber(process.env.KAFKA_FETCH_MAX_WAIT_MS, 100),
fromOffset: process.env.KAFKA_FROM_OFFSET || "latest",
autoCommitIntervalMs: parseNumber(process.env.KAFKA_AUTO_COMMIT_INTERVAL_MS, 5e3),
// How often batched manual offset commits are flushed (see createOneConsumer).
commitIntervalMs: parseNumber(process.env.KAFKA_COMMIT_INTERVAL_MS, 200),
commitOnAttempt: process.env.KAFKA_COMMIT_ON_ATTEMPT === "true",
batchSize: parseNumber(process.env.KAFKA_BATCH_SIZE, 5e3),
batchTimeoutMs: parseNumber(process.env.KAFKA_BATCH_TIMEOUT_MS, 50),
// Interval for flushing the in-memory message queue to Postgres (bootstrap).
flushIntervalMs: parseNumber(process.env.KAFKA_FLUSH_INTERVAL_MS, 3e3),
logMessages: process.env.KAFKA_LOG_MESSAGES === "true",
// SASL auth is enabled only when both username and password are provided.
sasl: process.env.KAFKA_SASL_USERNAME && process.env.KAFKA_SASL_PASSWORD ? {
mechanism: process.env.KAFKA_SASL_MECHANISM || "plain",
username: process.env.KAFKA_SASL_USERNAME,
password: process.env.KAFKA_SASL_PASSWORD
} : void 0
},
db: {
host: process.env.POSTGRES_HOST_G5,
port: parseNumber(process.env.POSTGRES_PORT_G5, 5434),
user: process.env.POSTGRES_USER_G5,
password: process.env.POSTGRES_PASSWORD_G5,
database: process.env.POSTGRES_DATABASE_G5,
max: parseNumber(process.env.POSTGRES_MAX_CONNECTIONS_G5, 6),
ssl: process.env.POSTGRES_SSL_G5 === "true" ? { rejectUnauthorized: false } : void 0,
schema: process.env.DB_SCHEMA || "rcu_info",
table: process.env.DB_TABLE || "rcu_info_events_g5",
roomStatusSchema: process.env.DB_ROOM_STATUS_SCHEMA || "room_status",
roomStatusTable: process.env.DB_ROOM_STATUS_TABLE || "room_status_moment_g5"
},
// NOTE(review): redis settings are defined here but not referenced anywhere
// in this file — presumably consumed elsewhere or left over; verify.
redis: {
host: process.env.REDIS_HOST || "localhost",
port: parseNumber(process.env.REDIS_PORT, 6379),
password: process.env.REDIS_PASSWORD || void 0,
db: parseNumber(process.env.REDIS_DB, 0),
projectName: process.env.REDIS_PROJECT_NAME || "bls-onoffline",
apiBaseUrl: process.env.REDIS_API_BASE_URL || `http://localhost:${parseNumber(process.env.PORT, 3001)}`
}
};
/**
 * Serialize a log record as a single JSON string with a ms timestamp.
 * The optional context object is included only when truthy.
 * @param {string} level
 * @param {string} message
 * @param {object} [context]
 * @returns {string}
 */
const format = (level, message, context) => {
  const record = { level, message, timestamp: Date.now() };
  if (context) {
    record.context = context;
  }
  return JSON.stringify(record);
};
// Minimal JSON-lines logger: info goes to stdout, warn/error to stderr.
const logger = {
  info(message, context) {
    process.stdout.write(format("info", message, context) + "\n");
  },
  error(message, context) {
    process.stderr.write(format("error", message, context) + "\n");
  },
  warn(message, context) {
    process.stderr.write(format("warn", message, context) + "\n");
  }
};
const { Pool } = pg;
// Column order matters: these arrays drive both the INSERT column lists and
// the positional UNNEST parameter arrays in DatabaseManager, so they must
// stay aligned with the $n::type[] placeholders there.
const registerColumns = [
"ts_ms",
"hotel_id",
"room_id",
"device_id",
"write_ts_ms",
"is_send",
"udp_raw",
"extra",
"ip_type",
"model_num",
"server_ip",
"ip",
"subnet_mask",
"gateway",
"dns",
"app_version",
"rcu_time",
"launcher_version",
"mac",
"room_type_id",
"config_version",
"room_status",
"season",
"sys_lock_status",
"authorization_time",
"authorization_days",
"room_num_remark",
"room_type_remark",
"room_remark",
"mcu_name",
"central_control_name",
"configure_hotel_name",
"configure_room_type_name"
];
// Columns of the room-status snapshot upsert (see updateRoomStatusRows).
const roomStatusColumns = [
"hotel_id",
"room_id",
"ip",
"app_version",
"launcher_version",
"config_version",
"upgrade_ts_ms",
"register_ts_ms"
];
/**
 * Owns the pg connection pool and the two bulk write paths:
 * - insertRegisterRows: append-only bulk insert into the register events table.
 * - updateRoomStatusRows: upsert of the latest per-room status snapshot.
 * Schema/table names are interpolated from trusted configuration (env vars),
 * never from message payloads; row values always go through bind parameters.
 */
class DatabaseManager {
constructor(dbConfig) {
this.pool = new Pool({
host: dbConfig.host,
port: dbConfig.port,
user: dbConfig.user,
password: dbConfig.password,
database: dbConfig.database,
max: dbConfig.max,
ssl: dbConfig.ssl
});
}
// Bulk-insert register rows with a single UNNEST statement. The positional
// parameter types below must stay aligned with registerColumns order.
// Duplicates are silently skipped (ON CONFLICT DO NOTHING).
async insertRegisterRows({ schema, table, rows }) {
if (!rows || rows.length === 0) {
return;
}
const statement = `
INSERT INTO ${schema}.${table} (${registerColumns.join(", ")})
SELECT *
FROM UNNEST(
$1::int8[],
$2::int2[],
$3::text[],
$4::text[],
$5::int8[],
$6::int2[],
$7::text[],
$8::jsonb[],
$9::int2[],
$10::text[],
$11::text[],
$12::text[],
$13::text[],
$14::text[],
$15::text[],
$16::text[],
$17::text[],
$18::text[],
$19::text[],
$20::int8[],
$21::text[],
$22::int4[],
$23::int4[],
$24::int4[],
$25::text[],
$26::text[],
$27::text[],
$28::text[],
$29::text[],
$30::text[],
$31::text[],
$32::text[],
$33::text[]
)
ON CONFLICT DO NOTHING
`;
try {
// Pivot row objects into one array per column (the shape UNNEST expects).
const params = registerColumns.map((column) => rows.map((row) => row[column] ?? null));
await this.pool.query(statement, params);
} catch (error) {
logger.error("Register table insert failed", {
error: error?.message,
schema,
table,
rowsLength: rows.length
});
// Re-throw so callers (batch flush) can retry or bisect the batch.
throw error;
}
}
// Upsert the latest status per (hotel_id, room_id). Incoming rows are
// deduplicated keeping the newest register_ts_ms; only rooms that already
// exist in the target table are written (the INNER JOIN carries over the
// existing row's device_id), so unknown rooms are intentionally ignored.
async updateRoomStatusRows({ schema, table, rows }) {
if (!rows || rows.length === 0) {
return;
}
const statement = `
WITH incoming AS (
SELECT *
FROM UNNEST(
$1::int2[],
$2::text[],
$3::text[],
$4::text[],
$5::text[],
$6::text[],
$7::int8[],
$8::int8[]
) AS u(${roomStatusColumns.join(", ")})
), dedup AS (
SELECT DISTINCT ON (hotel_id, room_id)
hotel_id,
room_id,
ip,
app_version,
launcher_version,
config_version,
upgrade_ts_ms,
register_ts_ms
FROM incoming
ORDER BY hotel_id, room_id, register_ts_ms DESC
), existing AS (
SELECT i.*, t.device_id
FROM dedup i
INNER JOIN ${schema}.${table} t
ON t.hotel_id = i.hotel_id
AND t.room_id = i.room_id
)
INSERT INTO ${schema}.${table} (
hotel_id,
room_id,
device_id,
ip,
app_version,
launcher_version,
config_version,
upgrade_ts_ms,
register_ts_ms
)
SELECT
hotel_id,
room_id,
device_id,
ip,
app_version,
launcher_version,
config_version,
upgrade_ts_ms,
register_ts_ms
FROM existing
ON CONFLICT (hotel_id, room_id) DO UPDATE
SET
ip = EXCLUDED.ip,
app_version = EXCLUDED.app_version,
launcher_version = EXCLUDED.launcher_version,
config_version = EXCLUDED.config_version,
upgrade_ts_ms = EXCLUDED.upgrade_ts_ms,
register_ts_ms = EXCLUDED.register_ts_ms
`;
try {
// Pivot row objects into one array per column for the UNNEST parameters.
const params = roomStatusColumns.map((column) => rows.map((row) => row[column] ?? null));
await this.pool.query(statement, params);
} catch (error) {
logger.error("Room status table update failed", {
error: error?.message,
schema,
table,
rowsLength: rows.length
});
throw error;
}
}
// Verify connectivity with a 5s timeout. Returns true/false, never throws.
async checkConnection() {
let client;
try {
const connectPromise = this.pool.connect();
const timeoutPromise = new Promise((_, reject) => {
setTimeout(() => reject(new Error("Connection timeout")), 5e3);
});
try {
client = await Promise.race([connectPromise, timeoutPromise]);
} catch (raceError) {
// If the timeout won, release the client once connect eventually
// resolves so the pool slot is not leaked.
connectPromise.then((c) => c.release()).catch(() => {
});
throw raceError;
}
await client.query("SELECT 1");
return true;
} catch (err) {
logger.error("Database check connection failed", { error: err.message });
return false;
} finally {
if (client) {
client.release();
}
}
}
// Drain and close the pool (graceful shutdown).
async close() {
await this.pool.end();
}
}
// Single shared database manager used by the batch flush pipeline below.
const dbManager = new DatabaseManager(config.db);
/**
 * Tracks per-partition processing completion so that offsets are committed
 * contiguously: an offset becomes eligible for commit only once every lower
 * offset seen for that partition has finished processing.
 */
class OffsetTracker {
  constructor() {
    this.partitions = new Map();
  }
  /**
   * Record an offset as received (before processing). Keeps the lowest
   * outstanding offset as the next commit candidate. Non-numeric offsets
   * are ignored.
   */
  add(topic, partition, offset) {
    const key = `${topic}-${partition}`;
    let state = this.partitions.get(key);
    if (state === undefined) {
      state = { nextCommitOffset: null, done: new Set() };
      this.partitions.set(key, state);
    }
    const value = Number(offset);
    if (!Number.isFinite(value)) {
      return;
    }
    if (state.nextCommitOffset === null || value < state.nextCommitOffset) {
      state.nextCommitOffset = value;
    }
  }
  /**
   * Mark an offset as processed. Returns the new commit watermark when the
   * contiguous frontier advanced, otherwise null (e.g. a gap remains, the
   * partition is unknown, or the offset is non-numeric).
   */
  markDone(topic, partition, offset) {
    const state = this.partitions.get(`${topic}-${partition}`);
    if (!state) {
      return null;
    }
    const value = Number(offset);
    if (!Number.isFinite(value)) {
      return null;
    }
    state.done.add(value);
    if (state.nextCommitOffset === null) {
      state.nextCommitOffset = value;
    }
    const start = state.nextCommitOffset;
    let frontier = start;
    while (state.done.has(frontier)) {
      state.done.delete(frontier);
      frontier += 1;
    }
    state.nextCommitOffset = frontier;
    return frontier === start ? null : frontier;
  }
  /** Drop all tracked state (used on consumer rebalance). */
  clear() {
    this.partitions.clear();
  }
}
const { ConsumerGroup } = kafka;
/**
 * Create one kafka-node ConsumerGroup with:
 * - manual offset commits, advanced only through contiguously processed
 *   offsets (OffsetTracker), batched and flushed every commitIntervalMs;
 * - backpressure: the consumer pauses once maxInFlight messages are being
 *   processed and resumes as they complete.
 * Fixes vs. previous revision: the duplicate "error" and "offsetOutOfRange"
 * listeners (which double-logged every event) are consolidated, and a failed
 * commit retry no longer overwrites a newer pending offset for the same
 * partition.
 * @param {{kafkaConfig: object, onMessage: Function, onError?: Function, instanceIndex: number}} options
 * @returns {object} the connected ConsumerGroup instance
 */
const createOneConsumer = ({ kafkaConfig, onMessage, onError, instanceIndex }) => {
  const kafkaHost = kafkaConfig.brokers.join(",");
  // First instance keeps the bare clientId so existing identifiers stay stable.
  const clientId = instanceIndex === 0 ? kafkaConfig.clientId : `${kafkaConfig.clientId}-${instanceIndex}`;
  const id = `${clientId}-${process.pid}-${Date.now()}`;
  const maxInFlight = Number.isFinite(kafkaConfig.maxInFlight) ? kafkaConfig.maxInFlight : 5e3;
  const commitIntervalMs = Number.isFinite(kafkaConfig.commitIntervalMs) ? kafkaConfig.commitIntervalMs : 200;
  let inFlight = 0;
  const tracker = new OffsetTracker();
  let pendingCommits = new Map();
  let commitTimer = null;
  // Send all pending offset commits in one request.
  const flushCommits = () => {
    if (pendingCommits.size === 0) return;
    const batch = pendingCommits;
    pendingCommits = new Map();
    consumer.sendOffsetCommitRequest(
      Array.from(batch.values()),
      (err) => {
        if (err) {
          // Re-queue failed commits, but never clobber a newer offset queued
          // for the same partition while the request was in flight — retrying
          // a stale offset would move the committed position backwards.
          for (const [k, v] of batch.entries()) {
            if (!pendingCommits.has(k)) {
              pendingCommits.set(k, v);
            }
          }
          logger.error("Kafka commit failed", { error: err?.message, count: batch.size });
        }
      }
    );
  };
  // Debounce commit flushes to at most one per commitIntervalMs.
  const scheduleCommitFlush = () => {
    if (commitTimer) return;
    commitTimer = setTimeout(() => {
      commitTimer = null;
      flushCommits();
    }, commitIntervalMs);
  };
  const consumer = new ConsumerGroup(
    {
      kafkaHost,
      groupId: kafkaConfig.groupId,
      clientId,
      id,
      fromOffset: kafkaConfig.fromOffset || "latest",
      protocol: ["roundrobin"],
      outOfRangeOffset: "latest",
      autoCommit: false,
      autoCommitIntervalMs: kafkaConfig.autoCommitIntervalMs,
      fetchMaxBytes: kafkaConfig.fetchMaxBytes,
      fetchMinBytes: kafkaConfig.fetchMinBytes,
      fetchMaxWaitMs: kafkaConfig.fetchMaxWaitMs,
      sasl: kafkaConfig.sasl
    },
    kafkaConfig.topic
  );
  const tryResume = () => {
    if (inFlight < maxInFlight && consumer.paused) {
      consumer.resume();
    }
  };
  consumer.on("message", (message) => {
    inFlight += 1;
    tracker.add(message.topic, message.partition, message.offset);
    if (inFlight >= maxInFlight) {
      consumer.pause();
    }
    Promise.resolve(onMessage(message)).then(() => {
    }).catch((error) => {
      logger.error("Kafka message handling failed", { error: error?.message });
      if (onError) {
        onError(error, message);
      }
    }).finally(() => {
      // Offsets advance even for failed messages; failures are surfaced via
      // onError rather than blocking the partition.
      const commitOffset = tracker.markDone(message.topic, message.partition, message.offset);
      if (commitOffset !== null) {
        const key = `${message.topic}-${message.partition}`;
        pendingCommits.set(key, {
          topic: message.topic,
          partition: message.partition,
          offset: commitOffset,
          metadata: "m"
        });
        scheduleCommitFlush();
      }
      inFlight -= 1;
      tryResume();
    });
  });
  // Single consolidated error handler (previously registered twice,
  // double-logging every consumer error).
  consumer.on("error", (error) => {
    logger.error("Kafka consumer error", { error: error?.message });
    if (onError) {
      onError(error);
    }
  });
  consumer.on("connect", () => {
    logger.info(`Kafka Consumer connected`, {
      groupId: kafkaConfig.groupId,
      clientId
    });
  });
  consumer.on("rebalancing", () => {
    logger.info(`Kafka Consumer rebalancing`, {
      groupId: kafkaConfig.groupId,
      clientId
    });
    // Partitions may be reassigned: drop all tracking and commit state.
    tracker.clear();
    pendingCommits.clear();
    if (commitTimer) {
      clearTimeout(commitTimer);
      commitTimer = null;
    }
  });
  consumer.on("rebalanced", () => {
    logger.info("Kafka Consumer rebalanced", { clientId, groupId: kafkaConfig.groupId });
  });
  // Single consolidated offsetOutOfRange handler (previously registered twice).
  consumer.on("offsetOutOfRange", (error) => {
    logger.warn(`Kafka Consumer offset out of range`, {
      error: error?.message,
      topic: error?.topic,
      partition: error?.partition,
      groupId: kafkaConfig.groupId,
      clientId
    });
  });
  consumer.on("close", () => {
    if (commitTimer) {
      clearTimeout(commitTimer);
      commitTimer = null;
    }
    // Best-effort flush of any offsets not yet committed.
    flushCommits();
    logger.warn(`Kafka Consumer closed`, {
      groupId: kafkaConfig.groupId,
      clientId
    });
  });
  return consumer;
};
/**
 * Spin up one or more consumer instances (KAFKA_CONSUMER_INSTANCES, min 1),
 * all sharing the same message/error handlers.
 */
const createKafkaConsumers = ({ kafkaConfig, onMessage, onError }) => {
  const requested = Number.isFinite(kafkaConfig.consumerInstances) ? kafkaConfig.consumerInstances : 1;
  const total = Math.max(1, requested);
  const consumers = [];
  for (let idx = 0; idx < total; idx += 1) {
    consumers.push(createOneConsumer({ kafkaConfig, onMessage, onError, instanceIndex: idx }));
  }
  return consumers;
};
/**
 * Coerce a value to a number for schema preprocessing.
 * Missing/blank input yields null; numbers pass through unchanged (including
 * NaN/Infinity); strings are parsed, yielding null when not finite.
 */
const toNumber = (value) => {
  if (value === undefined || value === null || value === "") {
    return null;
  }
  if (typeof value === "number") {
    return value;
  }
  const numeric = Number(value);
  return Number.isFinite(numeric) ? numeric : null;
};
/**
 * Stringify a value while passing null/undefined through untouched, so that
 * downstream `.nullable()`/`.optional()` schema checks still see them.
 */
const toStringAllowEmpty = (value) => (value == null ? value : String(value));
// Field factories for the tolerant message schema: every field is optional
// and nullable, with values coerced (numbers via toNumber, strings via
// toStringAllowEmpty) before validation.
const nullableNumberField = () => z.preprocess(toNumber, z.number().nullable()).optional().nullable();
const nullableStringField = () => z.preprocess(toStringAllowEmpty, z.string().nullable()).optional().nullable();
// Shape of an incoming register message; unknown keys are ignored by zod.
const kafkaPayloadSchema = z.object({
  ts_ms: nullableNumberField(),
  upgrade_ts_ms: nullableNumberField(),
  hotel_id: nullableNumberField(),
  room_id: nullableStringField(),
  device_id: nullableStringField(),
  is_send: nullableNumberField(),
  udp_raw: z.any().optional().nullable(),
  extra: z.any().optional().nullable(),
  ip_type: nullableNumberField(),
  model_num: nullableStringField(),
  server_ip: nullableStringField(),
  ip: nullableStringField(),
  subnet_mask: nullableStringField(),
  gateway: nullableStringField(),
  dns: nullableStringField(),
  app_version: nullableStringField(),
  rcu_time: nullableStringField(),
  launcher_version: nullableStringField(),
  mac: nullableStringField(),
  room_type_id: nullableNumberField(),
  config_version: nullableStringField(),
  room_status: nullableNumberField(),
  season: nullableNumberField(),
  sys_lock_status: nullableNumberField(),
  authorization_time: nullableStringField(),
  authorization_days: nullableStringField(),
  room_num_remark: nullableStringField(),
  room_type_remark: nullableStringField(),
  room_remark: nullableStringField(),
  mcu_name: nullableStringField(),
  central_control_name: nullableStringField(),
  configure_hotel_name: nullableStringField(),
  configure_room_type_name: nullableStringField()
});
/**
 * Convert a value to a NUL-stripped string, optionally truncated to
 * maxLength characters. Returns null for null/undefined input.
 * A falsy maxLength (including 0) means "no truncation".
 */
const normalizeText = (value, maxLength) => {
  if (value == null) {
    return null;
  }
  let text = String(value).replace(/\u0000/g, "");
  if (maxLength && text.length > maxLength) {
    text = text.slice(0, maxLength);
  }
  return text;
};
/**
 * Recursively strip NUL characters from every string in a JSON-like value.
 * Arrays and plain objects are copied; all other values pass through as-is.
 */
const sanitizeJsonValue = (value) => {
  if (value === undefined || value === null) {
    return value;
  }
  if (typeof value === "string") {
    return value.replace(/\u0000/g, "");
  }
  if (Array.isArray(value)) {
    return value.map(sanitizeJsonValue);
  }
  if (typeof value !== "object") {
    return value;
  }
  const sanitized = {};
  for (const key of Object.keys(value)) {
    sanitized[key] = sanitizeJsonValue(value[key]);
  }
  return sanitized;
};
/**
 * Heuristic base64 check: non-empty, length divisible by 4, and only base64
 * alphabet characters with at most two trailing '=' pads.
 */
const isLikelyBase64 = (text) => {
  if (!text) {
    return false;
  }
  if (text.length % 4 !== 0) {
    return false;
  }
  return /^[A-Za-z0-9+/]+={0,2}$/.test(text);
};
/**
 * Coerce a value to an integer (truncating toward zero).
 * Returns null for missing, blank, or non-finite input.
 */
const normalizeInteger = (value) => {
  if (value === undefined || value === null || value === "") {
    return null;
  }
  const numeric = typeof value === "number" ? value : Number(value);
  return Number.isFinite(numeric) ? Math.trunc(numeric) : null;
};
/**
 * Return `value` when it is a real number within [min, max] (inclusive);
 * otherwise return `fallback`.
 */
const inRangeOr = (value, min, max, fallback) => {
  const isRealNumber = typeof value === "number" && !Number.isNaN(value);
  return isRealNumber && value >= min && value <= max ? value : fallback;
};
/**
 * Normalize a timestamp to epoch milliseconds. Missing/invalid input yields
 * the current time; positive values below 1e11 are treated as epoch seconds
 * and scaled to milliseconds.
 */
const normalizeTsMs = (value) => {
  const ts = normalizeInteger(value);
  if (ts === null) {
    return Date.now();
  }
  const looksLikeSeconds = ts > 0 && ts < 1e11;
  return looksLikeSeconds ? ts * 1e3 : ts;
};
/**
 * Normalize a raw UDP payload to a base64 string (null when absent).
 * Strings already in base64 form (after NUL stripping) pass through;
 * buffers and byte arrays are encoded directly; anything else is
 * stringified then encoded.
 */
const normalizeUdpRaw = (value) => {
  if (value == null) {
    return null;
  }
  if (Buffer.isBuffer(value)) {
    return value.toString("base64");
  }
  if (Array.isArray(value)) {
    return Buffer.from(value).toString("base64");
  }
  if (typeof value === "string") {
    const cleaned = value.replace(/\u0000/g, "");
    return isLikelyBase64(cleaned) ? cleaned : Buffer.from(cleaned, "utf8").toString("base64");
  }
  return Buffer.from(String(value), "utf8").toString("base64");
};
/**
 * Normalize the `extra` field to a sanitized JSON value (null when absent).
 * Strings are parsed as JSON where possible; scalar results are wrapped as
 * { value } and unparseable strings as { raw }.
 */
const normalizeExtra = (value) => {
  if (value === undefined || value === null || value === "") {
    return null;
  }
  if (typeof value === "object") {
    return sanitizeJsonValue(value);
  }
  if (typeof value !== "string") {
    return sanitizeJsonValue({ raw: String(value) });
  }
  try {
    const parsed = JSON.parse(value);
    const isObject = parsed !== null && typeof parsed === "object";
    return sanitizeJsonValue(isObject ? parsed : { value: parsed });
  } catch {
    return sanitizeJsonValue({ raw: value });
  }
};
/**
 * Read a payload field, preferring `snakeKey` and falling back to
 * `pascalKey`. Returns undefined when neither key is present.
 */
const pick = (payload, snakeKey, pascalKey) => {
  const primary = payload[snakeKey];
  if (primary !== undefined) {
    return primary;
  }
  const secondary = payload[pascalKey];
  return secondary !== undefined ? secondary : undefined;
};
/**
 * Map a Kafka payload onto one register-event row and one room-status
 * snapshot row. Values are coerced and truncated to fit the column types
 * implied by the SQL casts in DatabaseManager (int2/int4/int8/text/jsonb).
 * Throws a ZodError when the normalized payload fails schema validation.
 */
const buildRowsFromPayload = (rawPayload) => {
// NOTE(review): pick() supports a snake_case/fallback-key lookup, but every
// call below passes the same key twice, so this is currently a plain read.
const normalizedInput = {
ts_ms: pick(rawPayload, "ts_ms", "ts_ms"),
upgrade_ts_ms: pick(rawPayload, "upgrade_ts_ms", "upgrade_ts_ms"),
hotel_id: pick(rawPayload, "hotel_id", "hotel_id"),
room_id: pick(rawPayload, "room_id", "room_id"),
device_id: pick(rawPayload, "device_id", "device_id"),
is_send: pick(rawPayload, "is_send", "is_send"),
udp_raw: pick(rawPayload, "udp_raw", "udp_raw"),
extra: pick(rawPayload, "extra", "extra"),
ip_type: pick(rawPayload, "ip_type", "ip_type"),
model_num: pick(rawPayload, "model_num", "model_num"),
server_ip: pick(rawPayload, "server_ip", "server_ip"),
ip: pick(rawPayload, "ip", "ip"),
subnet_mask: pick(rawPayload, "subnet_mask", "subnet_mask"),
gateway: pick(rawPayload, "gateway", "gateway"),
dns: pick(rawPayload, "dns", "dns"),
app_version: pick(rawPayload, "app_version", "app_version"),
rcu_time: pick(rawPayload, "rcu_time", "rcu_time"),
launcher_version: pick(rawPayload, "launcher_version", "launcher_version"),
mac: pick(rawPayload, "mac", "mac"),
room_type_id: pick(rawPayload, "room_type_id", "room_type_id"),
config_version: pick(rawPayload, "config_version", "config_version"),
room_status: pick(rawPayload, "room_status", "room_status"),
season: pick(rawPayload, "season", "season"),
sys_lock_status: pick(rawPayload, "sys_lock_status", "sys_lock_status"),
authorization_time: pick(rawPayload, "authorization_time", "authorization_time"),
authorization_days: pick(rawPayload, "authorization_days", "authorization_days"),
room_num_remark: pick(rawPayload, "room_num_remark", "room_num_remark"),
room_type_remark: pick(rawPayload, "room_type_remark", "room_type_remark"),
room_remark: pick(rawPayload, "room_remark", "room_remark"),
mcu_name: pick(rawPayload, "mcu_name", "mcu_name"),
central_control_name: pick(rawPayload, "central_control_name", "central_control_name"),
configure_hotel_name: pick(rawPayload, "configure_hotel_name", "configure_hotel_name"),
configure_room_type_name: pick(rawPayload, "configure_room_type_name", "configure_room_type_name")
};
// Strict parse (throws on mismatch); coerces numbers/strings via preprocess.
const payload = kafkaPayloadSchema.parse(normalizedInput);
const tsMs = normalizeTsMs(payload.ts_ms);
// hotel_id is stored as int2 — out-of-range values are clamped to 0.
const hotelId = inRangeOr(normalizeInteger(payload.hotel_id), -32768, 32767, 0);
const roomId = normalizeText(payload.room_id, 50) || "";
// Text fields are truncated to their column widths (see SQL casts).
const registerRow = {
ts_ms: tsMs,
hotel_id: hotelId,
room_id: roomId,
device_id: normalizeText(payload.device_id, 64),
write_ts_ms: Date.now(),
is_send: inRangeOr(normalizeInteger(payload.is_send), -32768, 32767, 0),
udp_raw: normalizeUdpRaw(payload.udp_raw),
extra: normalizeExtra(payload.extra),
ip_type: inRangeOr(normalizeInteger(payload.ip_type), -32768, 32767, null),
model_num: normalizeText(payload.model_num, 32),
server_ip: normalizeText(payload.server_ip, 21),
ip: normalizeText(payload.ip, 21),
subnet_mask: normalizeText(payload.subnet_mask, 15),
gateway: normalizeText(payload.gateway, 15),
dns: normalizeText(payload.dns, 15),
app_version: normalizeText(payload.app_version, 64),
rcu_time: normalizeText(payload.rcu_time, 25),
launcher_version: normalizeText(payload.launcher_version, 64),
mac: normalizeText(payload.mac, 17),
room_type_id: normalizeInteger(payload.room_type_id),
config_version: normalizeText(payload.config_version, 32),
room_status: inRangeOr(normalizeInteger(payload.room_status), -2147483648, 2147483647, null),
season: inRangeOr(normalizeInteger(payload.season), -2147483648, 2147483647, null),
sys_lock_status: inRangeOr(normalizeInteger(payload.sys_lock_status), -2147483648, 2147483647, null),
authorization_time: normalizeText(payload.authorization_time, 10),
authorization_days: normalizeText(payload.authorization_days, 10),
room_num_remark: normalizeText(payload.room_num_remark, 255),
room_type_remark: normalizeText(payload.room_type_remark, 64),
room_remark: normalizeText(payload.room_remark, 64),
mcu_name: normalizeText(payload.mcu_name, 255),
central_control_name: normalizeText(payload.central_control_name, 255),
configure_hotel_name: normalizeText(payload.configure_hotel_name, 255),
configure_room_type_name: normalizeText(payload.configure_room_type_name, 255)
};
// Snapshot row for the room_status upsert, keyed by (hotel_id, room_id).
const roomStatusUpdateRow = {
hotel_id: hotelId,
room_id: roomId,
ip: registerRow.ip,
app_version: registerRow.app_version,
launcher_version: registerRow.launcher_version,
config_version: registerRow.config_version,
upgrade_ts_ms: normalizeTsMs(payload.upgrade_ts_ms),
register_ts_ms: tsMs
};
// Arrays so callers can flatMap rows from many messages into one batch.
return {
registerRows: [registerRow],
roomStatusRows: [roomStatusUpdateRow]
};
};
/**
 * Decode one Kafka message into register/room-status rows.
 * Throws an Error tagged with `type` ("PARSE_ERROR" for invalid JSON,
 * "VALIDATION_ERROR" for schema mismatches) so callers can classify it.
 */
const parseMessageToRows = (message) => {
  const rawValue = message.value.toString();
  let payload;
  try {
    payload = JSON.parse(rawValue);
  } catch (e) {
    const parseError = new Error(`JSON Parse Error: ${e.message}`);
    parseError.type = "PARSE_ERROR";
    throw parseError;
  }
  const validationResult = kafkaPayloadSchema.safeParse(payload);
  if (validationResult.success) {
    return buildRowsFromPayload(payload);
  }
  const validationError = new Error(`Schema Validation Failed: ${JSON.stringify(validationResult.error.errors)}`);
  validationError.type = "VALIDATION_ERROR";
  throw validationError;
};
/**
 * In-memory counters for throughput/latency reporting, snapshotted and
 * zeroed once per read (getAndReset). Flat metrics use a fixed key set;
 * keyed metrics are nested per-key counters.
 */
class MetricCollector {
  constructor() {
    this.reset();
  }
  /** Zero out all flat and keyed counters. */
  reset() {
    this.metrics = {
      kafka_pulled: 0,
      parse_error: 0,
      db_inserted: 0,
      db_failed: 0,
      db_insert_count: 0,
      db_insert_ms_sum: 0,
      batch_flush_count: 0,
      batch_flush_ms_sum: 0
    };
    this.keyed = {};
  }
  /** Add `count` to a known flat metric; unknown names are ignored. */
  increment(metric, count = 1) {
    if (Object.prototype.hasOwnProperty.call(this.metrics, metric)) {
      this.metrics[metric] += count;
    }
  }
  /** Add `count` to a per-key counter under `metric`; no-op without a key. */
  incrementKeyed(metric, key, count = 1) {
    if (!key) return;
    if (!this.keyed[metric]) {
      this.keyed[metric] = {};
    }
    const bucket = this.keyed[metric];
    if (!Object.prototype.hasOwnProperty.call(bucket, key)) {
      bucket[key] = 0;
    }
    bucket[key] += count;
  }
  /** Snapshot all counters (deep-copying keyed ones) and reset to zero. */
  getAndReset() {
    const flat = { ...this.metrics };
    const keyed = JSON.parse(JSON.stringify(this.keyed));
    this.reset();
    return { ...flat, keyed };
  }
}
// Error codes (Node network errnos plus Postgres SQLSTATE connection-class
// codes) treated as transient connectivity failures worth retrying.
const NETWORK_CODES = new Set([
  "ECONNREFUSED",
  "ECONNRESET",
  "EPIPE",
  "ETIMEDOUT",
  "ENOTFOUND",
  "EHOSTUNREACH",
  "ENETUNREACH",
  "57P03",
  "08006",
  "08001",
  "08000",
  "08003"
]);
// Fallback message fragments for errors without a structured code.
const CONNECTION_MESSAGE_FRAGMENTS = [
  "connection timeout",
  "connection terminated",
  "connection refused",
  "terminating connection",
  "econnrefused",
  "econnreset",
  "etimedout",
  "could not connect",
  "the database system is starting up",
  "no pg_hba.conf entry"
];
/**
 * Decide whether an error represents a retryable database connectivity
 * failure: structured error code first, message content as a fallback.
 */
const isDbConnectionError = (err) => {
  if (typeof err?.code === "string" && NETWORK_CODES.has(err.code)) {
    return true;
  }
  if (typeof err?.message !== "string") {
    return false;
  }
  const message = err.message.toLowerCase();
  return CONNECTION_MESSAGE_FRAGMENTS.some((fragment) => message.includes(fragment));
};
// Promise-based delay helper, used for retry backoff between DB attempts.
const sleep = (ms) => new Promise((resolve) => {
  setTimeout(resolve, ms);
});
/**
 * Service entry point: wires Kafka consumers to a batching queue that flushes
 * parsed rows into Postgres, with per-minute metrics, 10s run counters, and
 * graceful shutdown on SIGTERM/SIGINT. Resolves each message's promise only
 * after its batch has been flushed, which drives the consumer's in-flight
 * backpressure.
 */
const bootstrap = async () => {
logger.info("Starting register consumer", {
env: config.env,
kafka: {
brokers: config.kafka.brokers,
topic: config.kafka.topic,
groupId: config.kafka.groupId
},
db: {
host: config.db.host,
port: config.db.port,
database: config.db.database,
schema: config.db.schema,
table: config.db.table,
roomStatusSchema: config.db.roomStatusSchema,
roomStatusTable: config.db.roomStatusTable
},
flushIntervalMs: config.kafka.flushIntervalMs
});
const metricCollector = new MetricCollector();
// Lifetime counters (never reset; reported every 10s and at shutdown).
const totals = {
kafkaPulled: 0,
dbInserted: 0,
parseError: 0,
dbFailed: 0
};
// Flush interval is floored at 3s regardless of configuration.
const flushIntervalMs = Math.max(3e3, Number.isFinite(config.kafka.flushIntervalMs) ? config.kafka.flushIntervalMs : 3e3);
// Pending messages awaiting a batch flush; each entry holds the message and
// the resolve callback that releases the consumer's in-flight slot.
const queue = [];
let flushTimer = null;
let flushing = false;
const runCounterTimer = setInterval(() => {
logger.info("Run counters", {
kafkaPulled: totals.kafkaPulled,
dbInserted: totals.dbInserted,
parseError: totals.parseError,
dbFailed: totals.dbFailed
});
}, 1e4);
// Shared error logger for parse/validation/DB failures and consumer errors.
const handleError = (error, message) => {
logger.error("Kafka processing error", {
error: error?.message,
type: error?.type,
topic: message?.topic,
partition: message?.partition,
offset: message?.offset
});
};
// Per-minute metrics snapshot (counters reset on read).
cron.schedule("* * * * *", () => {
const metrics = metricCollector.getAndReset();
const flushAvgMs = metrics.batch_flush_count > 0 ? (metrics.batch_flush_ms_sum / metrics.batch_flush_count).toFixed(1) : "0.0";
const dbAvgMs = metrics.db_insert_count > 0 ? (metrics.db_insert_ms_sum / metrics.db_insert_count).toFixed(1) : "0.0";
logger.info("Minute metrics", {
kafkaPulled: metrics.kafka_pulled,
parseError: metrics.parse_error,
dbInserted: metrics.db_inserted,
dbFailed: metrics.db_failed,
flushAvgMs,
dbAvgMs
});
});
// Write both tables, retrying forever (5s backoff) on connectivity errors;
// non-connection errors propagate to the caller for batch bisection.
const processValidRowsWithRetry = async (registerRows, roomStatusRows) => {
const startedAt = Date.now();
while (true) {
try {
await dbManager.insertRegisterRows({
schema: config.db.schema,
table: config.db.table,
rows: registerRows
});
await dbManager.updateRoomStatusRows({
schema: config.db.roomStatusSchema,
table: config.db.roomStatusTable,
rows: roomStatusRows
});
metricCollector.increment("db_insert_count", 1);
metricCollector.increment("db_insert_ms_sum", Date.now() - startedAt);
metricCollector.increment("db_inserted", registerRows.length);
totals.dbInserted += registerRows.length;
return;
} catch (err) {
if (!isDbConnectionError(err)) {
throw err;
}
logger.warn("Database unavailable, retrying in 5s", { error: err?.message });
await sleep(5e3);
}
}
};
// Debounce queue flushes to at most one per flushIntervalMs.
const scheduleFlush = () => {
if (flushTimer) {
return;
}
flushTimer = setTimeout(() => {
flushTimer = null;
void flushQueue();
}, flushIntervalMs);
};
// Drain the queue: parse all messages, bulk-insert the good ones, and
// resolve every message's promise (parse failures resolve immediately so
// offsets still advance).
const flushQueue = async () => {
if (flushing) {
return;
}
if (queue.length === 0) {
return;
}
flushing = true;
const startedAt = Date.now();
const currentBatch = queue.splice(0, queue.length);
const parsedItems = [];
for (const item of currentBatch) {
try {
const parsed = parseMessageToRows(item.message);
parsedItems.push({ item, parsed });
} catch (err) {
metricCollector.increment("parse_error");
totals.parseError += 1;
handleError(err, item.message);
item.resolve();
}
}
// On a non-retryable DB error, bisect the batch recursively so a single
// bad row fails alone instead of poisoning the whole batch.
const insertParsedItems = async (items) => {
if (items.length === 0) {
return;
}
const registerRows = items.flatMap((it) => it.parsed.registerRows);
const roomStatusRows = items.flatMap((it) => it.parsed.roomStatusRows);
try {
await processValidRowsWithRetry(registerRows, roomStatusRows);
} catch (err) {
if (items.length > 1) {
const mid = Math.floor(items.length / 2);
await insertParsedItems(items.slice(0, mid));
await insertParsedItems(items.slice(mid));
return;
}
metricCollector.increment("db_failed", 1);
totals.dbFailed += 1;
handleError(err, items[0].item.message);
}
};
if (parsedItems.length > 0) {
await insertParsedItems(parsedItems);
// Resolve all items (failed singles included) so offsets advance.
for (const parsedItem of parsedItems) {
parsedItem.item.resolve();
}
}
metricCollector.increment("batch_flush_count", 1);
metricCollector.increment("batch_flush_ms_sum", Date.now() - startedAt);
flushing = false;
// Messages that arrived during the flush get their own scheduled flush.
if (queue.length > 0) {
scheduleFlush();
}
};
// Consumer callback: enqueue and return a promise that resolves after the
// message's batch is flushed (feeds the consumer's in-flight accounting).
const handleMessage = (message) => {
metricCollector.increment("kafka_pulled");
totals.kafkaPulled += 1;
return new Promise((resolve) => {
queue.push({ message, resolve });
scheduleFlush();
});
};
const consumers = createKafkaConsumers({
kafkaConfig: config.kafka,
onMessage: handleMessage,
onError: handleError
});
// Graceful shutdown: flush remaining queue, close consumers (force-commit),
// close the DB pool, then exit.
const shutdown = async (signal) => {
logger.info(`Received ${signal}, shutting down...`);
try {
if (flushTimer) {
clearTimeout(flushTimer);
flushTimer = null;
}
clearInterval(runCounterTimer);
await flushQueue();
if (consumers && consumers.length > 0) {
await Promise.all(consumers.map((consumer) => new Promise((resolve) => consumer.close(true, resolve))));
}
await dbManager.close();
logger.info("Run summary", {
kafkaPulled: totals.kafkaPulled,
dbInserted: totals.dbInserted,
parseError: totals.parseError,
dbFailed: totals.dbFailed
});
process.exit(0);
} catch (err) {
logger.error("Error during shutdown", { error: err?.message });
process.exit(1);
}
};
process.on("SIGTERM", () => shutdown("SIGTERM"));
process.on("SIGINT", () => shutdown("SIGINT"));
};
// Entry point: any bootstrap failure is fatal and exits non-zero.
bootstrap().catch((error) => {
logger.error("Service bootstrap failed", { error: error?.message });
process.exit(1);
});