feat: implement G5 database sync and room-status aggregation; support multi-environment async dual-write; optimize data insertion and dedup logic; remove redundant fields

2026-03-10 19:51:52 +08:00
parent f61a63d8c1
commit 7fab70ec2b
13 changed files with 374 additions and 63 deletions

View File

@@ -15,6 +15,10 @@ KAFKA_FETCH_MAX_BYTES=10485760
KAFKA_FETCH_MAX_WAIT_MS=100
KAFKA_FETCH_MIN_BYTES=1
+# =========================
+# PostgreSQL configuration
+# =========================
POSTGRES_HOST=10.8.8.109
POSTGRES_PORT=5433
POSTGRES_DATABASE=log_platform
@@ -23,6 +27,16 @@ POSTGRES_PASSWORD=YourActualStrongPasswordForPostgres!
POSTGRES_MAX_CONNECTIONS=6
POSTGRES_IDLE_TIMEOUT_MS=30000
+# =========================
+# PostgreSQL configuration (dedicated to the G5 database)
+# =========================
+POSTGRES_HOST_G5=10.8.8.80
+POSTGRES_PORT_G5=5434
+POSTGRES_DATABASE_G5=log_platform
+POSTGRES_USER_G5=log_admin
+POSTGRES_PASSWORD_G5=H3IkLUt8K!x
+POSTGRES_IDLE_TIMEOUT_MS_G5=30000
PORT=3001
LOG_LEVEL=info
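Since the G5 settings above get no fallback defaults in config.js (and ENABLE_G5_SYNC is treated as enabled unless set to the string 'false'), a missing variable silently becomes undefined. A fail-fast guard at startup can catch that early — a minimal sketch, not part of this commit; assertG5Env is a hypothetical helper:

// Sketch: validate the G5 connection settings before opening any pools.
const requiredG5Vars = [
  'POSTGRES_HOST_G5',
  'POSTGRES_PORT_G5',
  'POSTGRES_DATABASE_G5',
  'POSTGRES_USER_G5',
  'POSTGRES_PASSWORD_G5'
];

export function assertG5Env(env = process.env) {
  if (env.ENABLE_G5_SYNC === 'false') return; // G5 sync disabled: nothing to check
  const missing = requiredG5Vars.filter((name) => !env[name]);
  if (missing.length > 0) {
    throw new Error(`Missing G5 env vars: ${missing.join(', ')}`);
  }
}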

View File

@@ -35,6 +35,7 @@ export const config = {
} : undefined
},
db: {
+enabled: process.env.ENABLE_G4_SYNC !== 'false',
host: process.env.DB_HOST || process.env.POSTGRES_HOST || 'localhost',
port: parseNumber(process.env.DB_PORT || process.env.POSTGRES_PORT, 5432),
user: process.env.DB_USER || process.env.POSTGRES_USER || 'postgres',
@@ -45,6 +46,18 @@ export const config = {
schema: process.env.DB_SCHEMA || 'rcu_action',
table: process.env.DB_TABLE || 'rcu_action_events'
},
+dbG5: {
+enabled: process.env.ENABLE_G5_SYNC !== 'false',
+host: process.env.POSTGRES_HOST_G5,
+port: parseNumber(process.env.POSTGRES_PORT_G5, 5434),
+user: process.env.POSTGRES_USER_G5,
+password: process.env.POSTGRES_PASSWORD_G5,
+database: process.env.POSTGRES_DATABASE_G5,
+max: parseNumber(process.env.POSTGRES_MAX_CONNECTIONS, 10),
+ssl: process.env.DB_SSL_G5 === 'true' ? { rejectUnauthorized: false } : undefined,
+schema: process.env.DB_SCHEMA || 'rcu_action',
+table: 'rcu_action_events_g5'
+},
redis: {
host: process.env.REDIS_HOST || 'localhost',
port: parseNumber(process.env.REDIS_PORT, 6379),
@@ -54,6 +67,7 @@ export const config = {
apiBaseUrl: process.env.REDIS_API_BASE_URL || `http://localhost:${parseNumber(process.env.PORT, 3000)}`
},
roomStatusDb: {
+enabled: process.env.ENABLE_G4_SYNC !== 'false',
host: process.env.ROOM_STATUS_DB_HOST || process.env.DB_HOST || process.env.POSTGRES_HOST || 'localhost',
port: parseNumber(process.env.ROOM_STATUS_DB_PORT || process.env.DB_PORT || process.env.POSTGRES_PORT, 5432),
user: process.env.ROOM_STATUS_DB_USER || process.env.DB_USER || process.env.POSTGRES_USER || 'postgres',
@@ -64,5 +78,17 @@ export const config = {
schema: process.env.ROOM_STATUS_DB_SCHEMA || 'room_status',
table: process.env.ROOM_STATUS_DB_TABLE || 'room_status_moment'
},
+roomStatusDbG5: {
+enabled: process.env.ENABLE_G5_SYNC !== 'false',
+host: process.env.POSTGRES_HOST_G5,
+port: parseNumber(process.env.POSTGRES_PORT_G5, 5434),
+user: process.env.POSTGRES_USER_G5,
+password: process.env.POSTGRES_PASSWORD_G5,
+database: process.env.POSTGRES_DATABASE_G5,
+max: parseNumber(process.env.ROOM_STATUS_DB_MAX_CONNECTIONS, 5),
+ssl: process.env.DB_SSL_G5 === 'true' ? { rejectUnauthorized: false } : undefined,
+schema: 'room_status',
+table: 'room_status_moment'
+},
enableLoopNameAutoGeneration: process.env.ENABLE_LOOP_NAME_AUTO_GENERATION === 'true'
};
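Both G5 blocks lean on the parseNumber helper for ports and pool sizes. Its body sits outside the visible hunks; an assumed shape, returning the fallback when the env value is absent or not numeric:

// Assumed shape of parseNumber (not shown in this diff):
const parseNumber = (value, fallback) => {
  const parsed = Number.parseInt(value, 10);
  return Number.isFinite(parsed) ? parsed : fallback;
};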

View File

@@ -2,6 +2,8 @@ export class BatchProcessor {
constructor(dbManager, config, options = {}) {
this.dbManager = dbManager;
this.config = config;
+this.omitGuid = options.omitGuid || false;
+this.dbConfig = options.dbConfig || config.db;
this.batchSize = options.batchSize || 500;
this.flushInterval = options.flushInterval || 1000;
this.buffer = [];
@@ -39,12 +41,13 @@ export class BatchProcessor {
}
try {
-await this.dbManager.insertRows({
-schema: this.config.db.schema,
-table: this.config.db.table,
-rows: allRows
+await this.dbManager.insertRows({
+schema: this.dbConfig.schema,
+table: this.dbConfig.table,
+rows: allRows,
+omitGuid: this.omitGuid
});
// Resolve each item with its own row count
currentBatch.forEach(item => item.resolve(item.rows.length));
} catch (error) {
@@ -61,7 +64,7 @@ export class BatchProcessor {
cmd_word: sample.cmd_word
} : null
};
// Reject all items in the batch
currentBatch.forEach(item => item.reject(error));
}
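The caller contract is unchanged by the new options: add() buffers rows, resolves each caller with its own row count once the shared INSERT succeeds, and rejects every pending caller if the batch fails. A usage sketch against the G5 target:

// Sketch: a processor pointed at the G5 table via the new options.
const processorG5 = new BatchProcessor(dbManagerG5, config, {
  batchSize: 500,
  dbConfig: config.dbG5,
  omitGuid: true // G5 table generates its own guid, so the column is dropped
});

try {
  const insertedCount = await processorG5.add({ rows });
  logger.info('G5 batch insert ok', { insertedCount });
} catch (err) {
  // Every item in the failed batch rejects with the same error.
  logger.error('G5 batch insert failed', { error: err?.message });
}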

View File

@@ -45,20 +45,21 @@ export class DatabaseManager {
});
}
-async insertRows({ schema, table, rows }) {
+async insertRows({ schema, table, rows, omitGuid = false }) {
if (!rows || rows.length === 0) {
return;
}
+const currentColumns = omitGuid ? columns.filter(c => c !== 'guid') : columns;
const values = [];
const placeholders = rows.map((row, rowIndex) => {
-const offset = rowIndex * columns.length;
-columns.forEach((column) => {
+const offset = rowIndex * currentColumns.length;
+currentColumns.forEach((column) => {
values.push(row[column] ?? null);
});
-const params = columns.map((_, columnIndex) => `$${offset + columnIndex + 1}`);
+const params = currentColumns.map((_, columnIndex) => `$${offset + columnIndex + 1}`);
return `(${params.join(', ')})`;
});
-const statement = `INSERT INTO ${schema}.${table} (${columns.join(', ')}) VALUES ${placeholders.join(', ')}`;
+const statement = `INSERT INTO ${schema}.${table} (${currentColumns.join(', ')}) VALUES ${placeholders.join(', ')}`;
try {
await this.pool.query(statement, values);
} catch (error) {
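Because placeholder offsets are computed from currentColumns.length, dropping guid shifts every parameter index per row. A worked call under the new signature — the three-column list is illustrative (the real list is the module-level columns array):

// With columns = ['guid', 'ts_ms', 'hotel_id'] and two rows:
//   omitGuid = false → INSERT ... (guid, ts_ms, hotel_id) VALUES ($1, $2, $3), ($4, $5, $6)
//   omitGuid = true  → INSERT ... (ts_ms, hotel_id) VALUES ($1, $2), ($3, $4)
await dbManager.insertRows({
  schema: 'rcu_action',
  table: 'rcu_action_events_g5',
  rows: [
    { ts_ms: 1700000000000, hotel_id: 'h1' },
    { ts_ms: 1700000001000, hotel_id: 'h1' }
  ],
  omitGuid: true // absent columns fall back to null via row[column] ?? null
});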

View File

@@ -13,8 +13,9 @@ const { Pool } = pg;
export class RoomStatusManager {
/**
* @param {Object} dbConfig - roomStatusDb config from config.js
+* @param {Object} [options] - additional configuration like omitGuid
*/
-constructor(dbConfig) {
+constructor(dbConfig, options = {}) {
this.pool = new Pool({
host: dbConfig.host,
port: dbConfig.port,
@@ -27,6 +28,7 @@ export class RoomStatusManager {
this.schema = dbConfig.schema;
this.table = dbConfig.table;
this.fullTableName = `${this.schema}.${this.table}`;
+this.omitGuid = options.omitGuid || false;
}
/**
@@ -50,29 +52,53 @@ export class RoomStatusManager {
for (let i = 0; i < rows.length; i++) {
const row = rows[i];
-const offset = i * 8; // Changed from 9 to 8
-values.push(
-row.guid || randomUUID(), // $1
-row.ts_ms, // $2
-row.hotel_id, // $3
-row.room_id, // $4
-row.device_id, // $5
-row.sys_lock_status, // $6
-row.dev_loops ? JSON.stringify(row.dev_loops) : null, // $7 (was $8)
-row.faulty_device_count ? JSON.stringify(row.faulty_device_count) : null // $8 (was $9)
-);
-const p = (n) => `$${offset + n}`;
-placeholders.push(`(${p(1)}::uuid, ${p(2)}, ${p(3)}, ${p(4)}, ${p(5)}, ${p(6)}, ${p(7)}::jsonb, ${p(8)}::jsonb)`);
+const offset = i * (this.omitGuid ? 8 : 9); // params per row: 8 without guid, 9 with it
+if (this.omitGuid) {
+values.push(
+row.ts_ms, // $1
+row.hotel_id, // $2
+row.room_id, // $3
+row.device_id, // $4
+row.sys_lock_status, // $5
+row.dev_loops ? JSON.stringify(row.dev_loops) : null, // $6
+row.faulty_device_count ? JSON.stringify(row.faulty_device_count) : null, // $7
+1 // $8 online_status
+);
+const p = (n) => `$${offset + n}`;
+placeholders.push(`(${p(1)}, ${p(2)}, ${p(3)}, ${p(4)}, ${p(5)}, ${p(6)}::jsonb, ${p(7)}::jsonb, ${p(8)})`);
+} else {
+values.push(
+row.guid || randomUUID(), // $1
+row.ts_ms, // $2
+row.hotel_id, // $3
+row.room_id, // $4
+row.device_id, // $5
+row.sys_lock_status, // $6
+row.dev_loops ? JSON.stringify(row.dev_loops) : null, // $7
+row.faulty_device_count ? JSON.stringify(row.faulty_device_count) : null, // $8
+1 // $9 online_status
+);
+const p = (n) => `$${offset + n}`;
+placeholders.push(`(${p(1)}::uuid, ${p(2)}, ${p(3)}, ${p(4)}, ${p(5)}, ${p(6)}, ${p(7)}::jsonb, ${p(8)}::jsonb, ${p(9)})`);
+}
}
+const insertColumns = this.omitGuid
+? 'ts_ms, hotel_id, room_id, device_id, sys_lock_status, dev_loops, faulty_device_count, online_status'
+: 'guid, ts_ms, hotel_id, room_id, device_id, sys_lock_status, dev_loops, faulty_device_count, online_status';
+const conflictTarget = this.omitGuid
+? '(hotel_id, room_id)'
+: '(hotel_id, room_id, device_id)';
const sql = `
-INSERT INTO ${this.fullTableName} (
-guid, ts_ms, hotel_id, room_id, device_id,
-sys_lock_status, dev_loops, faulty_device_count
-) VALUES ${placeholders.join(', ')}
-ON CONFLICT (hotel_id, room_id, device_id)
+INSERT INTO ${this.fullTableName} (${insertColumns}) VALUES ${placeholders.join(', ')}
+ON CONFLICT ${conflictTarget}
DO UPDATE SET
-ts_ms = GREATEST(${this.fullTableName}.ts_ms, EXCLUDED.ts_ms),
+ts_ms = EXCLUDED.ts_ms,
+online_status = 1,
+device_id = EXCLUDED.device_id,
sys_lock_status = COALESCE(EXCLUDED.sys_lock_status, ${this.fullTableName}.sys_lock_status),
dev_loops = CASE
WHEN EXCLUDED.dev_loops IS NULL THEN ${this.fullTableName}.dev_loops

View File

@@ -22,6 +22,7 @@ export class StatusBatchProcessor {
this.flushInterval = options.flushInterval || 500;
this.maxBufferSize = options.maxBufferSize || 200;
this.redisIntegration = options.redisIntegration || null;
+this.dedupeByRoom = options.dedupeByRoom || false;
/** @type {Map<string, Object>} compositeKey -> mergedState */
this.buffer = new Map();
@@ -32,6 +33,9 @@ export class StatusBatchProcessor {
* Build composite key for deduplication.
*/
_key(update) {
+if (this.dedupeByRoom) {
+return `${update.hotel_id}:${update.room_id}`;
+}
return `${update.hotel_id}:${update.room_id}:${update.device_id}`;
}
@@ -49,6 +53,11 @@ export class StatusBatchProcessor {
// Merge: take latest ts_ms
existing.ts_ms = Math.max(existing.ts_ms, update.ts_ms);
+// device_id: prefer newer (important for dedupeByRoom feature)
+if (update.device_id) {
+existing.device_id = update.device_id;
+}
// sys_lock_status: prefer newer non-null value
if (update.sys_lock_status != null) {
existing.sys_lock_status = update.sys_lock_status;
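With dedupeByRoom enabled, updates from different devices in the same room collapse into one buffered row keyed by hotel:room, and the merge rules above decide what survives. A worked example:

// Two device updates for the same room share the key 'h1:r9':
const proc = new StatusBatchProcessor(roomStatusManagerG5, { dedupeByRoom: true });
proc.add({ hotel_id: 'h1', room_id: 'r9', device_id: 'devA', ts_ms: 100, sys_lock_status: null });
proc.add({ hotel_id: 'h1', room_id: 'r9', device_id: 'devB', ts_ms: 120, sys_lock_status: 1 });
// Merged buffer entry: { device_id: 'devB', ts_ms: 120, sys_lock_status: 1 }
// — the newer device_id and the newer non-null sys_lock_status win.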

View File

@@ -1,6 +1,6 @@
import cron from 'node-cron';
import { config } from './config/config.js';
-import dbManager from './db/databaseManager.js';
+import dbManager, { DatabaseManager } from './db/databaseManager.js';
import projectMetadata from './cache/projectMetadata.js';
import { createKafkaConsumers } from './kafka/consumer.js';
import { processKafkaMessage } from './processor/index.js';
@@ -30,12 +30,24 @@ const bootstrap = async () => {
redisIntegration.startHeartbeat();
// 1.2 Initialize Room Status Manager (independent pool for snapshot table)
-const roomStatusManager = new RoomStatusManager(config.roomStatusDb);
-const statusBatchProcessor = new StatusBatchProcessor(roomStatusManager, {
+const roomStatusManagerG4 = new RoomStatusManager(config.roomStatusDb);
+const statusBatchProcessorG4 = new StatusBatchProcessor(roomStatusManagerG4, {
flushInterval: 500,
maxBufferSize: 200,
redisIntegration
});
+let roomStatusManagerG5 = null;
+let statusBatchProcessorG5 = null;
+if (config.roomStatusDbG5.enabled) {
+roomStatusManagerG5 = new RoomStatusManager(config.roomStatusDbG5, { omitGuid: true });
+statusBatchProcessorG5 = new StatusBatchProcessor(roomStatusManagerG5, {
+flushInterval: 500,
+maxBufferSize: 200,
+dedupeByRoom: true,
+redisIntegration
+});
+}
logger.info('Room Status sync pipeline initialized');
// 1. Setup Metric Reporting Cron Job (Every minute)
@@ -54,10 +66,22 @@ const bootstrap = async () => {
const errorQueueKey = buildErrorQueueKey(config.redis.projectName);
-const batchProcessor = new BatchProcessor(dbManager, config, {
-batchSize: config.kafka.maxInFlight
+const batchProcessorG4 = new BatchProcessor(dbManager, config, {
+batchSize: config.kafka.maxInFlight,
+dbConfig: config.db
});
+let dbManagerG5 = null;
+let batchProcessorG5 = null;
+if (config.dbG5.enabled) {
+dbManagerG5 = new DatabaseManager(config.dbG5);
+batchProcessorG5 = new BatchProcessor(dbManagerG5, config, {
+batchSize: config.kafka.maxInFlight,
+omitGuid: true,
+dbConfig: config.dbG5
+});
+}
const handleMessage = async (message) => {
if (message.topic) {
metricCollector.increment('kafka_pulled');
@@ -87,14 +111,29 @@ const bootstrap = async () => {
});
}
const { rows, payload } = await processKafkaMessage({ message });
-const inserted = await batchProcessor.add({ rows });
+let inserted = 0;
+const dbActions = [];
+if (config.db.enabled) {
+dbActions.push(batchProcessorG4.add({ rows }).then(c => { inserted = Math.max(inserted, c); }));
+}
+if (batchProcessorG5 && config.dbG5.enabled) {
+dbActions.push(batchProcessorG5.add({ rows }).then(c => { inserted = Math.max(inserted, c); }));
+}
+await Promise.all(dbActions);
metricCollector.increment('db_inserted');
// Fire-and-forget: extract status and push to StatusBatchProcessor
try {
const statusUpdate = extractStatusUpdate(payload);
if (statusUpdate) {
-statusBatchProcessor.add(statusUpdate);
+if (config.roomStatusDb.enabled) {
+statusBatchProcessorG4.add(statusUpdate);
+}
+if (statusBatchProcessorG5 && config.roomStatusDbG5.enabled) {
+statusBatchProcessorG5.add(statusUpdate);
+}
}
} catch (statusErr) {
logger.error('Status extraction failed (non-blocking)', { error: statusErr?.message });
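Note that Promise.all couples the two targets' failure domains: if the G5 insert rejects, the whole message fails even when the G4 write succeeded, so the message is retried against both targets. If independent failure domains were preferred, Promise.allSettled is the usual alternative — a sketch of that option, not what this commit does:

// Alternative (not this commit's behavior): only fail the message when
// every enabled target rejected; log partial failures instead.
const results = await Promise.allSettled(dbActions);
const failures = results.filter((r) => r.status === 'rejected');
if (failures.length > 0 && failures.length === results.length) {
  throw failures[0].reason; // all targets down: let the consumer retry/pause
}
if (failures.length > 0) {
  logger.warn('partial dual-write failure', { error: failures[0].reason?.message });
}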
@@ -158,13 +197,20 @@ const bootstrap = async () => {
const healthCheck = {
shouldPause: async (error) => {
if (error?.type === 'DB_ERROR') {
-const isConnected = await dbManager.testConnection();
-return !isConnected;
+const checks = [];
+if (config.db.enabled) checks.push(dbManager.testConnection());
+if (dbManagerG5 && config.dbG5.enabled) checks.push(dbManagerG5.testConnection());
+const results = await Promise.all(checks);
+return results.some(res => !res);
}
return false;
},
check: async () => {
-return await dbManager.testConnection();
+const checks = [];
+if (config.db.enabled) checks.push(dbManager.testConnection());
+if (dbManagerG5 && config.dbG5.enabled) checks.push(dbManagerG5.testConnection());
+const results = await Promise.all(checks);
+return results.every(res => res === true);
}
};
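The aggregated semantics: shouldPause halts consumption after a DB_ERROR when any enabled target is unreachable, while check reports healthy only when every enabled target responds. testConnection itself sits outside the visible hunks; an assumed shape:

// Assumed shape of testConnection (not shown in this diff): true only when a
// trivial query succeeds on the pool.
async function testConnection(pool) {
  try {
    await pool.query('SELECT 1');
    return true;
  } catch {
    return false;
  }
}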
@@ -210,16 +256,27 @@ const bootstrap = async () => {
// 4. Flush and close Room Status pipeline
try {
-await statusBatchProcessor.flush();
-await roomStatusManager.close();
-logger.info('Room Status pipeline closed');
+if (config.roomStatusDb.enabled) {
+await statusBatchProcessorG4.flush();
+await roomStatusManagerG4.close();
+}
+if (statusBatchProcessorG5 && config.roomStatusDbG5.enabled) {
+await statusBatchProcessorG5.flush();
+await roomStatusManagerG5.close();
+}
+logger.info('Room Status pipelines closed');
} catch (rsErr) {
logger.error('Error closing Room Status pipeline', { error: rsErr?.message });
}
-// 5. Close Database Pool
-await dbManager.close();
-logger.info('Database connection closed');
+// 5. Close Database Pools
+if (config.db.enabled) {
+await dbManager.close();
+}
+if (dbManagerG5 && config.dbG5.enabled) {
+await dbManagerG5.close();
+}
+logger.info('Database connections closed');
process.exit(0);
} catch (err) {
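The shutdown path flushes each enabled pipeline before closing its pool, so buffered rows are not dropped mid-exit. The repeated guard-flush-close sequence could be factored into a helper — a sketch, not part of the commit:

// Sketch: symmetric close for both pipelines; a disabled target is skipped
// and one failure does not prevent the other pipeline from closing.
const closeRoomStatusPipeline = async (processor, manager, label) => {
  if (!processor || !manager) return;
  try {
    await processor.flush();
    await manager.close();
  } catch (err) {
    logger.error(`Error closing ${label} Room Status pipeline`, { error: err?.message });
  }
};
await closeRoomStatusPipeline(statusBatchProcessorG4, roomStatusManagerG4, 'G4');
await closeRoomStatusPipeline(statusBatchProcessorG5, roomStatusManagerG5, 'G5');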

View File

@@ -221,7 +221,7 @@ export const buildRowsFromPayload = (rawPayload) => {
direction: normalizedDirection,
cmd_word: normalizedCmdWord,
frame_id: frameId,
-udp_raw: udpRaw,
+udp_raw: null,
sys_lock_status: sysLockStatus ?? null,
report_count: reportCount ?? null,
fault_count: faultCount ?? null,
@@ -260,10 +260,7 @@ export const buildRowsFromPayload = (rawPayload) => {
// Logic 1: 0x36 Status/Fault Report
if (messageType === '36上报') {
-const details = {
-device_list: deviceList,
-fault_list: faultList
-};
+const details = null;
// Process device status list
if (deviceList.length > 0) {
@@ -318,9 +315,7 @@ export const buildRowsFromPayload = (rawPayload) => {
// Logic 2: 0x0F Control Command
if (messageType === '0F下发') {
-const details = {
-control_list: controlList
-};
+const details = null;
if (controlList.length > 0) {
controlList.forEach(control => {
@@ -356,7 +351,7 @@ export const buildRowsFromPayload = (rawPayload) => {
else if (messageType === '0FACK') {
const { control_list: controls = [] } = payload;
if (Array.isArray(controls)) {
-const details = { control_list: controls };
+const details = null;
controls.forEach((control) => {
rows.push({
...commonFields,
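With details and udp_raw written as NULL across the 0x36, 0x0F, and 0FACK branches, each persisted row carries only the structured columns — the redundant fields named in the commit message. An illustrative row shape after the change:

// Illustrative values; the real field set comes from buildRowsFromPayload.
const exampleRow = {
  hotel_id: 'h1',
  room_id: 'r9',
  cmd_word: '0x36',
  dev_addr: 10,
  sys_lock_status: 1,
  udp_raw: null, // previously the encoded raw UDP frame
  details: null  // previously { device_list, fault_list } or { control_list }
};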

View File

@@ -28,14 +28,15 @@ describe('BatchProcessor', () => {
expect(dbManager.insertRows).not.toHaveBeenCalled();
const p3 = batchProcessor.add({ rows: ['r3'] });
// Wait for microtasks
await Promise.resolve();
expect(dbManager.insertRows).toHaveBeenCalledTimes(1);
expect(dbManager.insertRows).toHaveBeenCalledWith({
schema: 'test_schema',
table: 'test_table',
+omitGuid: false,
rows: ['r1', 'r2', 'r3']
});
@@ -50,7 +51,7 @@ describe('BatchProcessor', () => {
expect(dbManager.insertRows).not.toHaveBeenCalled();
vi.advanceTimersByTime(1000);
// Wait for microtasks
await Promise.resolve();
@@ -58,6 +59,7 @@ describe('BatchProcessor', () => {
expect(dbManager.insertRows).toHaveBeenCalledWith({
schema: 'test_schema',
table: 'test_table',
+omitGuid: false,
rows: ['r1']
});
@@ -87,6 +89,7 @@ describe('BatchProcessor', () => {
expect(dbManager.insertRows).toHaveBeenCalledWith({
schema: 'test_schema',
table: 'test_table',
+omitGuid: false,
rows: ['r1', 'r2', 'r3']
});

View File

@@ -51,7 +51,7 @@ describe('Processor Logic', () => {
expect(rows[0].action_type).toBe('设备回路状态');
expect(rows[0].dev_addr).toBe(10);
expect(rows[1].dev_addr).toBe(11);
-expect(rows[0].details.device_list).toHaveLength(2);
+expect(rows[0].details).toBeNull();
});
it('should handle 0x36 Fault Report', () => {
@@ -162,7 +162,7 @@ describe('Processor Logic', () => {
const rows = buildRowsFromPayload(payload);
expect(rows).toHaveLength(1);
-expect(rows[0].udp_raw).toBe(expectedBase64);
+expect(rows[0].udp_raw).toBeNull();
});
it('should keep udp_raw unchanged when input is not hex string', () => {
@@ -178,7 +178,7 @@ describe('Processor Logic', () => {
const rows = buildRowsFromPayload(payload);
-expect(rows[0].udp_raw).toBe('YWJjMTIz');
+expect(rows[0].udp_raw).toBeNull();
});
it('should default extra to empty object when not provided', () => {
@@ -273,7 +273,7 @@ describe('Processor Logic - 0x0E Support', () => {
expect(rows[0].dev_addr).toBe(10);
expect(rows[0].cmd_word).toBe('0x0e'); // Normalized
expect(rows[1].dev_addr).toBe(11);
-expect(rows[0].details.device_list).toHaveLength(2);
+expect(rows[0].details).toBeNull();
});
it('should handle 0x0E Fault Report', () => {