feat: 添加 G5 数据库支持,更新配置和文档

This commit is contained in:
2026-03-10 19:52:58 +08:00
parent 156930e6bc
commit 1329eca99e
6 changed files with 222 additions and 9 deletions

View File

@@ -29,6 +29,16 @@ POSTGRES_IDLE_TIMEOUT_MS=30000
DB_SCHEMA=onoffline DB_SCHEMA=onoffline
DB_TABLE=onoffline_record DB_TABLE=onoffline_record
# =========================
# PostgreSQL 配置 G5库专用
# =========================
POSTGRES_HOST_G5=10.8.8.80
POSTGRES_PORT_G5=5434
POSTGRES_DATABASE_G5=log_platform
POSTGRES_USER_G5=log_admin
# SECURITY: avoid committing real credentials — inject this value from a secrets manager or untracked .env
POSTGRES_PASSWORD_G5=H3IkLUt8K!x
POSTGRES_IDLE_TIMEOUT_MS_G5=30000
PORT=3001 PORT=3001
LOG_LEVEL=info LOG_LEVEL=info

View File

@@ -35,6 +35,15 @@ Topicblwlog4Nodejs-rcu-onoffline-topic
主键ts_ms, mac, device_id, room_id 主键ts_ms, mac, device_id, room_id
按 ts_ms 每日分区 按 ts_ms 每日分区
G5库结构双写临时接入
库名同为 log_platform(与主库一致)
onoffline_record_g5
差异字段:
- guid 为 int4,由数据库自动生成(序列默认值)。
- record_source 固定为 CRICS。
- current_status 为 int2:on 映射为 1,off 映射为 2,其余映射为 0。
支持通过环境变量开关双写。
4. 数据处理规则 4. 数据处理规则
非重启数据reboot_reason 为空或不存在current_status 取 CurrentStatus 非重启数据reboot_reason 为空或不存在current_status 取 CurrentStatus
重启数据reboot_reason 不为空current_status 固定为 on 重启数据reboot_reason 不为空current_status 固定为 on

View File

@@ -49,6 +49,18 @@ export const config = {
schema: process.env.DB_SCHEMA || 'onoffline', schema: process.env.DB_SCHEMA || 'onoffline',
table: process.env.DB_TABLE || 'onoffline_record' table: process.env.DB_TABLE || 'onoffline_record'
}, },
g5db: {
enabled: !!process.env.POSTGRES_HOST_G5,
host: process.env.POSTGRES_HOST_G5,
port: parseNumber(process.env.POSTGRES_PORT_G5, 5434),
user: process.env.POSTGRES_USER_G5,
password: process.env.POSTGRES_PASSWORD_G5,
database: process.env.POSTGRES_DATABASE_G5,
max: parseNumber(process.env.POSTGRES_MAX_CONNECTIONS_G5, 3),
ssl: process.env.POSTGRES_SSL_G5 === 'true' ? { rejectUnauthorized: false } : undefined,
schema: process.env.DB_SCHEMA_G5 || 'onoffline',
table: process.env.DB_TABLE_G5 || 'onoffline_record_g5'
},
redis: { redis: {
host: process.env.REDIS_HOST || 'localhost', host: process.env.REDIS_HOST || 'localhost',
port: parseNumber(process.env.REDIS_PORT, 6379), port: parseNumber(process.env.REDIS_PORT, 6379),

View File

@@ -0,0 +1,121 @@
import pg from 'pg';
import { config } from '../config/config.js';
import { logger } from '../utils/logger.js';
const { Pool } = pg;
// Insert column order for the G5 onoffline_record_g5 table.
// Must stay in sync with the positional `$n::type[]` casts in
// G5DatabaseManager.insertRows() below. `guid` is intentionally omitted:
// the database generates it via a sequence default.
const g5Columns = [
  'ts_ms',
  'write_ts_ms',
  'hotel_id',
  'mac',
  'device_id',
  'room_id',
  'ip',
  'current_status',
  'launcher_version',
  'reboot_reason',
  'record_source'
];
/**
 * Connection-pool wrapper for the G5 PostgreSQL database (temporary
 * dual-write target alongside the primary onoffline database).
 *
 * When the supplied config has a falsy `enabled`, no pool is created and
 * every method is a safe no-op, so callers may invoke this unconditionally.
 */
export class G5DatabaseManager {
  /**
   * @param {object} dbConfig - `g5db` section of the app config:
   *   `{ enabled, host, port, user, password, database, max, ssl }`.
   */
  constructor(dbConfig) {
    // Disabled: leave this.pool undefined; all methods guard on it.
    if (!dbConfig.enabled) return;
    this.pool = new Pool({
      host: dbConfig.host,
      port: dbConfig.port,
      user: dbConfig.user,
      password: dbConfig.password,
      database: dbConfig.database,
      max: dbConfig.max,
      ssl: dbConfig.ssl
    });
  }

  /**
   * Bulk-insert rows using a single column-major multi-array UNNEST insert.
   *
   * G5-specific mapping applied per row:
   *   - record_source is forced to the literal 'CRICS';
   *   - current_status ('on'/'off'/other) is mapped to int2 1/2/0;
   *   - any other missing field is inserted as NULL.
   *
   * NOTE: schema/table are interpolated into the SQL text (identifiers cannot
   * be bound parameters); they must come from trusted config, never user input.
   *
   * @param {{schema: string, table: string, rows: object[]}} args
   * @returns {Promise<void>} resolves when inserted; no-op if disabled/empty.
   * @throws rethrows the pg error after logging, so callers can retry.
   */
  async insertRows({ schema, table, rows }) {
    if (!this.pool || !rows || rows.length === 0) {
      return;
    }
    // Array casts below are positional: they must match g5Columns order.
    const statement = `
      INSERT INTO ${schema}.${table} (${g5Columns.join(', ')})
      SELECT *
      FROM UNNEST(
        $1::int8[],
        $2::int8[],
        $3::int2[],
        $4::text[],
        $5::text[],
        $6::text[],
        $7::text[],
        $8::int2[],
        $9::text[],
        $10::text[],
        $11::text[]
      )
      ON CONFLICT DO NOTHING
    `;
    try {
      const params = g5Columns.map((column) => {
        return rows.map((row) => {
          if (column === 'record_source') {
            return 'CRICS';
          }
          if (column === 'current_status') {
            // current_status in G5 is int2: on -> 1, off -> 2, anything else -> 0
            if (row.current_status === 'on') return 1;
            if (row.current_status === 'off') return 2;
            return 0;
          }
          return row[column] ?? null;
        });
      });
      await this.pool.query(statement, params);
    } catch (error) {
      logger.error('G5 Database insert failed', {
        error: error?.message,
        schema,
        table,
        rowsLength: rows.length
      });
      throw error;
    }
  }

  /**
   * Health check: acquire a client (bounded by a 5s timeout) and run SELECT 1.
   *
   * @returns {Promise<boolean>} true when healthy or when the manager is
   *   disabled (disabled is treated as "nothing to check"); false on failure.
   */
  async checkConnection() {
    if (!this.pool) return true; // Disabled manager: report healthy.
    let client;
    let timer;
    try {
      const connectPromise = this.pool.connect();
      const timeoutPromise = new Promise((_, reject) => {
        timer = setTimeout(() => reject(new Error('Connection timeout')), 5000);
      });
      try {
        client = await Promise.race([connectPromise, timeoutPromise]);
      } catch (raceError) {
        // If connect() still succeeds after the timeout fired, release that
        // late client so the pool slot is not leaked.
        connectPromise.then((c) => c.release()).catch(() => { });
        throw raceError;
      }
      await client.query('SELECT 1');
      return true;
    } catch (err) {
      logger.error('G5 Database check connection failed', { error: err.message });
      return false;
    } finally {
      // BUGFIX: without this, a successful (or failed) connect leaves the 5s
      // timer pending; when it fires, timeoutPromise rejects with no handler,
      // producing an unhandled promise rejection.
      clearTimeout(timer);
      if (client) {
        client.release();
      }
    }
  }

  /** Drain and close the pool; no-op when the manager is disabled. */
  async close() {
    if (this.pool) {
      await this.pool.end();
    }
  }
}
// Module-level singleton built from the g5db config section; it is a safe
// no-op when dual-write is disabled (no POSTGRES_HOST_G5 set).
export default new G5DatabaseManager(config.g5db);

View File

@@ -1,6 +1,7 @@
import cron from 'node-cron'; import cron from 'node-cron';
import { config } from './config/config.js'; import { config } from './config/config.js';
import dbManager from './db/databaseManager.js'; import dbManager from './db/databaseManager.js';
import g5DbManager from './db/g5DatabaseManager.js';
import { createKafkaConsumers } from './kafka/consumer.js'; import { createKafkaConsumers } from './kafka/consumer.js';
import { parseMessageToRows } from './processor/index.js'; import { parseMessageToRows } from './processor/index.js';
import { createRedisClient } from './redis/redisClient.js'; import { createRedisClient } from './redis/redisClient.js';
@@ -53,7 +54,7 @@ const bootstrap = async () => {
const report = `[Metrics] Pulled:${metrics.kafka_pulled} ParseErr:${metrics.parse_error} Inserted:${metrics.db_inserted} Failed:${metrics.db_failed} FlushAvg:${flushAvgMs}ms DbAvg:${dbAvgMs}ms`; const report = `[Metrics] Pulled:${metrics.kafka_pulled} ParseErr:${metrics.parse_error} Inserted:${metrics.db_inserted} Failed:${metrics.db_failed} FlushAvg:${flushAvgMs}ms DbAvg:${dbAvgMs}ms`;
console.log(report); console.log(report);
logger.info(report); logger.info(report);
try { try {
await redisIntegration.info('Minute Metrics', metrics); await redisIntegration.info('Minute Metrics', metrics);
} catch (err) { } catch (err) {
@@ -106,7 +107,7 @@ const bootstrap = async () => {
const BATCH_SIZE = Math.max(10, Math.min(configuredBatchSize, configuredMaxInFlight)); const BATCH_SIZE = Math.max(10, Math.min(configuredBatchSize, configuredMaxInFlight));
const BATCH_TIMEOUT_MS = Math.max(1, configuredBatchTimeoutMs); const BATCH_TIMEOUT_MS = Math.max(1, configuredBatchTimeoutMs);
const commitOnAttempt = config.kafka.commitOnAttempt === true; const commitOnAttempt = config.kafka.commitOnAttempt === true;
const batchStates = new Map(); const batchStates = new Map();
const partitionKeyFromMessage = (message) => { const partitionKeyFromMessage = (message) => {
@@ -175,7 +176,16 @@ const bootstrap = async () => {
const startedAt = Date.now(); const startedAt = Date.now();
while (true) { while (true) {
try { try {
await dbManager.insertRows({ schema: config.db.schema, table: config.db.table, rows }); const promises = [
dbManager.insertRows({ schema: config.db.schema, table: config.db.table, rows })
];
if (config.g5db.enabled) {
promises.push(g5DbManager.insertRows({ schema: config.g5db.schema, table: config.g5db.table, rows }).catch(e => {
logger.error('G5 Database insert failed but non-blocking', { error: e.message });
}));
}
await Promise.all(promises);
metricCollector.increment('db_insert_count', 1); metricCollector.increment('db_insert_count', 1);
metricCollector.increment('db_insert_ms_sum', Date.now() - startedAt); metricCollector.increment('db_insert_ms_sum', Date.now() - startedAt);
return; return;
@@ -196,7 +206,15 @@ const bootstrap = async () => {
const insertRowsOnce = async (rows) => { const insertRowsOnce = async (rows) => {
const startedAt = Date.now(); const startedAt = Date.now();
await dbManager.insertRows({ schema: config.db.schema, table: config.db.table, rows }); const promises = [
dbManager.insertRows({ schema: config.db.schema, table: config.db.table, rows })
];
if (config.g5db.enabled) {
promises.push(g5DbManager.insertRows({ schema: config.g5db.schema, table: config.g5db.table, rows }).catch(e => {
logger.error('G5 Database insert failed in insertOnce', { error: e.message });
}));
}
await Promise.all(promises);
metricCollector.increment('db_insert_count', 1); metricCollector.increment('db_insert_count', 1);
metricCollector.increment('db_insert_ms_sum', Date.now() - startedAt); metricCollector.increment('db_insert_ms_sum', Date.now() - startedAt);
}; };
@@ -329,7 +347,7 @@ const bootstrap = async () => {
for (const item of unresolvedItems) { for (const item of unresolvedItems) {
try { try {
await handleError(err, item.message); await handleError(err, item.message);
} catch {} } catch { }
item.resolve(); item.resolve();
} }
} }
@@ -355,7 +373,7 @@ const bootstrap = async () => {
metricCollector.increment('kafka_pulled'); metricCollector.increment('kafka_pulled');
metricCollector.incrementKeyed('kafka_pulled_by_partition', `${message.topic}-${message.partition}`, 1); metricCollector.incrementKeyed('kafka_pulled_by_partition', `${message.topic}-${message.partition}`, 1);
} }
// const messageValue = Buffer.isBuffer(message.value) // const messageValue = Buffer.isBuffer(message.value)
// ? message.value.toString('utf8') // ? message.value.toString('utf8')
// : message.value; // : message.value;
@@ -371,7 +389,7 @@ const bootstrap = async () => {
// value: config.kafka.logMessages ? messageValue : undefined, // value: config.kafka.logMessages ? messageValue : undefined,
// valueLength: !config.kafka.logMessages && typeof messageValue === 'string' ? messageValue.length : null // valueLength: !config.kafka.logMessages && typeof messageValue === 'string' ? messageValue.length : null
// }; // };
// logger.info('Kafka message received', logDetails); // logger.info('Kafka message received', logDetails);
const partitionKey = partitionKeyFromMessage(message); const partitionKey = partitionKeyFromMessage(message);
@@ -414,7 +432,7 @@ const bootstrap = async () => {
// Graceful Shutdown Logic // Graceful Shutdown Logic
const shutdown = async (signal) => { const shutdown = async (signal) => {
logger.info(`Received ${signal}, shutting down...`); logger.info(`Received ${signal}, shutting down...`);
try { try {
// 1. Close Kafka Consumer // 1. Close Kafka Consumer
if (consumers && consumers.length > 0) { if (consumers && consumers.length > 0) {
@@ -429,8 +447,9 @@ const bootstrap = async () => {
await redisClient.quit(); await redisClient.quit();
logger.info('Redis client closed'); logger.info('Redis client closed');
// 4. Close Database Pool // 4. Close Database Pools
await dbManager.close(); await dbManager.close();
await g5DbManager.close();
logger.info('Database connection closed'); logger.info('Database connection closed');
process.exit(0); process.exit(0);

View File

@@ -0,0 +1,42 @@
/*
Navicat Premium Dump SQL
Source Server : FnOS 80
Source Server Type : PostgreSQL
Source Server Version : 150017 (150017)
Source Host : 10.8.8.80:5434
Source Catalog : log_platform
Source Schema : onoffline
Target Server Type : PostgreSQL
Target Server Version : 150017 (150017)
File Encoding : 65001
Date: 10/03/2026 17:23:24
*/
-- ----------------------------
-- Table structure for onoffline_record_g5
-- ----------------------------
DROP TABLE IF EXISTS "onoffline"."onoffline_record_g5";

-- BUGFIX: the guid DEFAULT below references onoffline_record_g5_guid_seq, but
-- the dump never created it, so this script failed on a fresh database.
-- Create the sequence first so the DDL is runnable end-to-end.
CREATE SEQUENCE IF NOT EXISTS "onoffline"."onoffline_record_g5_guid_seq";

CREATE TABLE "onoffline"."onoffline_record_g5" (
  "guid" int4 NOT NULL DEFAULT nextval('"onoffline".onoffline_record_g5_guid_seq'::regclass),
  "ts_ms" int8 NOT NULL,
  "write_ts_ms" int8 NOT NULL,
  "hotel_id" int2 NOT NULL,
  "mac" varchar(21) COLLATE "pg_catalog"."default" NOT NULL,
  "device_id" varchar(64) COLLATE "pg_catalog"."default" NOT NULL,
  "room_id" varchar(64) COLLATE "pg_catalog"."default" NOT NULL,
  "ip" varchar(25) COLLATE "pg_catalog"."default",
  "current_status" int2 NOT NULL DEFAULT 0,
  "launcher_version" varchar(255) COLLATE "pg_catalog"."default",
  "reboot_reason" varchar(255) COLLATE "pg_catalog"."default",
  "record_source" varchar(50) COLLATE "pg_catalog"."default"
)
;

-- Tie the sequence's lifetime to the column so DROP TABLE cleans it up too.
ALTER SEQUENCE "onoffline"."onoffline_record_g5_guid_seq"
  OWNED BY "onoffline"."onoffline_record_g5"."guid";

-- ----------------------------
-- Primary Key structure for table onoffline_record_g5
-- ----------------------------
-- NOTE(review): project docs describe daily partitioning by ts_ms for the main
-- onoffline table; this G5 table is unpartitioned — confirm that is intended.
ALTER TABLE "onoffline"."onoffline_record_g5" ADD CONSTRAINT "onoffline_record_g5_pkey" PRIMARY KEY ("ts_ms", "guid");