diff --git a/bls-onoffline-backend/.env b/bls-onoffline-backend/.env index 0398351..889fe71 100644 --- a/bls-onoffline-backend/.env +++ b/bls-onoffline-backend/.env @@ -29,6 +29,16 @@ POSTGRES_IDLE_TIMEOUT_MS=30000 DB_SCHEMA=onoffline DB_TABLE=onoffline_record +# ========================= +# PostgreSQL 配置 G5库专用 +# ========================= +POSTGRES_HOST_G5=10.8.8.80 +POSTGRES_PORT_G5=5434 +POSTGRES_DATABASE_G5=log_platform +POSTGRES_USER_G5=log_admin +POSTGRES_PASSWORD_G5=H3IkLUt8K!x +POSTGRES_IDLE_TIMEOUT_MS_G5=30000 + PORT=3001 LOG_LEVEL=info diff --git a/bls-onoffline-backend/spec/onoffline-spec.md b/bls-onoffline-backend/spec/onoffline-spec.md index 618377d..8115bef 100644 --- a/bls-onoffline-backend/spec/onoffline-spec.md +++ b/bls-onoffline-backend/spec/onoffline-spec.md @@ -35,6 +35,15 @@ Topic:blwlog4Nodejs-rcu-onoffline-topic 主键:(ts_ms, mac, device_id, room_id) 按 ts_ms 每日分区 +G5库结构(双写,临时接入): +库同为:log_platform +表:onoffline_record_g5 +差异字段: + - guid 为 int4,由库自己生成。 + - record_source 固定为 CRICS。 + - current_status 为 int2,on映射为1,off映射为2,其余为0。 +支持通过环境变量开关双写。 + 4. 数据处理规则 非重启数据:reboot_reason 为空或不存在,current_status 取 CurrentStatus 重启数据:reboot_reason 不为空,current_status 固定为 on diff --git a/bls-onoffline-backend/src/config/config.js b/bls-onoffline-backend/src/config/config.js index 67fd8d3..1669d9f 100644 --- a/bls-onoffline-backend/src/config/config.js +++ b/bls-onoffline-backend/src/config/config.js @@ -49,6 +49,18 @@ export const config = { schema: process.env.DB_SCHEMA || 'onoffline', table: process.env.DB_TABLE || 'onoffline_record' }, + g5db: { + enabled: !!process.env.POSTGRES_HOST_G5, + host: process.env.POSTGRES_HOST_G5, + port: parseNumber(process.env.POSTGRES_PORT_G5, 5434), + user: process.env.POSTGRES_USER_G5, + password: process.env.POSTGRES_PASSWORD_G5, + database: process.env.POSTGRES_DATABASE_G5, + max: parseNumber(process.env.POSTGRES_MAX_CONNECTIONS_G5, 3), + ssl: process.env.POSTGRES_SSL_G5 === 'true' ? { rejectUnauthorized: false } : undefined, + schema: process.env.DB_SCHEMA_G5 || 'onoffline', + table: process.env.DB_TABLE_G5 || 'onoffline_record_g5' + }, redis: { host: process.env.REDIS_HOST || 'localhost', port: parseNumber(process.env.REDIS_PORT, 6379), diff --git a/bls-onoffline-backend/src/db/g5DatabaseManager.js b/bls-onoffline-backend/src/db/g5DatabaseManager.js new file mode 100644 index 0000000..99c8a01 --- /dev/null +++ b/bls-onoffline-backend/src/db/g5DatabaseManager.js @@ -0,0 +1,121 @@ +import pg from 'pg'; +import { config } from '../config/config.js'; +import { logger } from '../utils/logger.js'; + +const { Pool } = pg; + +const g5Columns = [ + 'ts_ms', + 'write_ts_ms', + 'hotel_id', + 'mac', + 'device_id', + 'room_id', + 'ip', + 'current_status', + 'launcher_version', + 'reboot_reason', + 'record_source' +]; + +export class G5DatabaseManager { + constructor(dbConfig) { + if (!dbConfig.enabled) return; + this.pool = new Pool({ + host: dbConfig.host, + port: dbConfig.port, + user: dbConfig.user, + password: dbConfig.password, + database: dbConfig.database, + max: dbConfig.max, + ssl: dbConfig.ssl + }); + } + + async insertRows({ schema, table, rows }) { + if (!this.pool || !rows || rows.length === 0) { + return; + } + + const statement = ` + INSERT INTO ${schema}.${table} (${g5Columns.join(', ')}) + SELECT * + FROM UNNEST( + $1::int8[], + $2::int8[], + $3::int2[], + $4::text[], + $5::text[], + $6::text[], + $7::text[], + $8::int2[], + $9::text[], + $10::text[], + $11::text[] + ) + ON CONFLICT DO NOTHING + `; + + try { + const params = g5Columns.map((column) => { + return rows.map((row) => { + if (column === 'record_source') { + return 'CRICS'; + } + if (column === 'current_status') { + // current_status in G5 is int2 + if (row.current_status === 'on') return 1; + if (row.current_status === 'off') return 2; + return 0; + } + return row[column] ?? null; + }); + }); + + await this.pool.query(statement, params); + } catch (error) { + logger.error('G5 Database insert failed', { + error: error?.message, + schema, + table, + rowsLength: rows.length + }); + throw error; + } + } + + async checkConnection() { + if (!this.pool) return true; // Pretend it's ok if disabled + let client; + try { + const connectPromise = this.pool.connect(); + const timeoutPromise = new Promise((_, reject) => { + setTimeout(() => reject(new Error('Connection timeout')), 5000); + }); + try { + client = await Promise.race([connectPromise, timeoutPromise]); + } catch (raceError) { + connectPromise.then(c => c.release()).catch(() => { }); + throw raceError; + } + await client.query('SELECT 1'); + return true; + } catch (err) { + logger.error('G5 Database check connection failed', { error: err.message }); + return false; + } finally { + if (client) { + client.release(); + } + } + } + + async close() { + if (this.pool) { + await this.pool.end(); + } + } +} + +const g5DbManager = new G5DatabaseManager(config.g5db); +export default g5DbManager; diff --git a/bls-onoffline-backend/src/index.js b/bls-onoffline-backend/src/index.js index 5259900..e060710 100644 --- a/bls-onoffline-backend/src/index.js +++ b/bls-onoffline-backend/src/index.js @@ -1,6 +1,7 @@ import cron from 'node-cron'; import { config } from './config/config.js'; import dbManager from './db/databaseManager.js'; +import g5DbManager from './db/g5DatabaseManager.js'; import { createKafkaConsumers } from './kafka/consumer.js'; import { parseMessageToRows } from './processor/index.js'; import { createRedisClient } from './redis/redisClient.js'; @@ -53,7 +54,7 @@ const bootstrap = async () => { const report = `[Metrics] Pulled:${metrics.kafka_pulled} ParseErr:${metrics.parse_error} Inserted:${metrics.db_inserted} Failed:${metrics.db_failed} FlushAvg:${flushAvgMs}ms DbAvg:${dbAvgMs}ms`; console.log(report); logger.info(report); - + try { await redisIntegration.info('Minute Metrics', metrics); } catch (err) { @@ -106,7 +107,7 @@ const bootstrap = async () => { const BATCH_SIZE = Math.max(10, Math.min(configuredBatchSize, configuredMaxInFlight)); const BATCH_TIMEOUT_MS = Math.max(1, configuredBatchTimeoutMs); const commitOnAttempt = config.kafka.commitOnAttempt === true; - + const batchStates = new Map(); const partitionKeyFromMessage = (message) => { @@ -175,7 +176,16 @@ const bootstrap = async () => { const startedAt = Date.now(); while (true) { try { - await dbManager.insertRows({ schema: config.db.schema, table: config.db.table, rows }); + const promises = [ + dbManager.insertRows({ schema: config.db.schema, table: config.db.table, rows }) + ]; + if (config.g5db.enabled) { + promises.push(g5DbManager.insertRows({ schema: config.g5db.schema, table: config.g5db.table, rows }).catch(e => { + logger.error('G5 Database insert failed but non-blocking', { error: e.message }); + })); + } + await Promise.all(promises); + metricCollector.increment('db_insert_count', 1); metricCollector.increment('db_insert_ms_sum', Date.now() - startedAt); return; @@ -196,7 +206,15 @@ const bootstrap = async () => { const insertRowsOnce = async (rows) => { const startedAt = Date.now(); - await dbManager.insertRows({ schema: config.db.schema, table: config.db.table, rows }); + const promises = [ + dbManager.insertRows({ schema: config.db.schema, table: config.db.table, rows }) + ]; + if (config.g5db.enabled) { + promises.push(g5DbManager.insertRows({ schema: config.g5db.schema, table: config.g5db.table, rows }).catch(e => { + logger.error('G5 Database insert failed in insertOnce', { error: e.message }); + })); + } + await Promise.all(promises); metricCollector.increment('db_insert_count', 1); metricCollector.increment('db_insert_ms_sum', Date.now() - startedAt); }; @@ -329,7 +347,7 @@ const bootstrap = async () => { for (const item of unresolvedItems) { try { await handleError(err, item.message); - } catch {} + } catch { } item.resolve(); } } @@ -355,7 +373,7 @@ const bootstrap = async () => { metricCollector.increment('kafka_pulled'); metricCollector.incrementKeyed('kafka_pulled_by_partition', `${message.topic}-${message.partition}`, 1); } - + // const messageValue = Buffer.isBuffer(message.value) // ? message.value.toString('utf8') // : message.value; @@ -371,7 +389,7 @@ const bootstrap = async () => { // value: config.kafka.logMessages ? messageValue : undefined, // valueLength: !config.kafka.logMessages && typeof messageValue === 'string' ? messageValue.length : null // }; - + // logger.info('Kafka message received', logDetails); const partitionKey = partitionKeyFromMessage(message); @@ -414,7 +432,7 @@ const bootstrap = async () => { // Graceful Shutdown Logic const shutdown = async (signal) => { logger.info(`Received ${signal}, shutting down...`); - + try { // 1. Close Kafka Consumer if (consumers && consumers.length > 0) { @@ -429,8 +447,9 @@ const bootstrap = async () => { await redisClient.quit(); logger.info('Redis client closed'); - // 4. Close Database Pool + // 4. Close Database Pools await dbManager.close(); + await g5DbManager.close(); logger.info('Database connection closed'); process.exit(0); diff --git a/docs/onoffline_record_g5.sql b/docs/onoffline_record_g5.sql new file mode 100644 index 0000000..fa51fb8 --- /dev/null +++ b/docs/onoffline_record_g5.sql @@ -0,0 +1,42 @@ +/* + Navicat Premium Dump SQL + + Source Server : FnOS 80 + Source Server Type : PostgreSQL + Source Server Version : 150017 (150017) + Source Host : 10.8.8.80:5434 + Source Catalog : log_platform + Source Schema : onoffline + + Target Server Type : PostgreSQL + Target Server Version : 150017 (150017) + File Encoding : 65001 + + Date: 10/03/2026 17:23:24 +*/ + + +-- ---------------------------- +-- Table structure for onoffline_record_g5 +-- ---------------------------- +DROP TABLE IF EXISTS "onoffline"."onoffline_record_g5"; +CREATE TABLE "onoffline"."onoffline_record_g5" ( + "guid" int4 NOT NULL DEFAULT nextval('"onoffline".onoffline_record_g5_guid_seq'::regclass), + "ts_ms" int8 NOT NULL, + "write_ts_ms" int8 NOT NULL, + "hotel_id" int2 NOT NULL, + "mac" varchar(21) COLLATE "pg_catalog"."default" NOT NULL, + "device_id" varchar(64) COLLATE "pg_catalog"."default" NOT NULL, + "room_id" varchar(64) COLLATE "pg_catalog"."default" NOT NULL, + "ip" varchar(25) COLLATE "pg_catalog"."default", + "current_status" int2 NOT NULL DEFAULT 0, + "launcher_version" varchar(255) COLLATE "pg_catalog"."default", + "reboot_reason" varchar(255) COLLATE "pg_catalog"."default", + "record_source" varchar(50) COLLATE "pg_catalog"."default" +) +; + +-- ---------------------------- +-- Primary Key structure for table onoffline_record_g5 +-- ---------------------------- +ALTER TABLE "onoffline"."onoffline_record_g5" ADD CONSTRAINT "onoffline_record_g5_pkey" PRIMARY KEY ("ts_ms", "guid");