From 7fab70ec2b66bf6937c01c53aea6335d0295f95a Mon Sep 17 00:00:00 2001 From: XuJiacheng Date: Tue, 10 Mar 2026 19:51:52 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=AE=9E=E7=8E=B0=20G5=20=E6=95=B0?= =?UTF-8?q?=E6=8D=AE=E5=BA=93=E5=90=8C=E6=AD=A5=E4=B8=8E=E6=88=BF=E9=97=B4?= =?UTF-8?q?=E7=8A=B6=E6=80=81=E8=81=9A=E5=90=88=E9=80=BB=E8=BE=91=EF=BC=8C?= =?UTF-8?q?=E6=94=AF=E6=8C=81=E5=A4=9A=E7=8E=AF=E5=A2=83=E5=BC=82=E6=AD=A5?= =?UTF-8?q?=E5=8F=8C=E5=86=99=EF=BC=8C=E4=BC=98=E5=8C=96=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E6=8F=92=E5=85=A5=E4=B8=8E=E5=8E=BB=E9=87=8D=E9=80=BB=E8=BE=91?= =?UTF-8?q?=EF=BC=8C=E7=A7=BB=E9=99=A4=E5=86=97=E4=BD=99=E5=AD=97=E6=AE=B5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- bls-rcu-action-backend/.env | 14 +++ bls-rcu-action-backend/src/config/config.js | 26 ++++++ .../src/db/batchProcessor.js | 15 ++-- .../src/db/databaseManager.js | 11 +-- .../src/db/roomStatusManager.js | 66 +++++++++----- .../src/db/statusBatchProcessor.js | 9 ++ bls-rcu-action-backend/src/index.js | 89 +++++++++++++++---- bls-rcu-action-backend/src/processor/index.js | 13 +-- .../tests/batch_processor.test.js | 9 +- .../tests/processor.test.js | 8 +- docs/rcu_action_events_g5.sql | 63 +++++++++++++ docs/room_status_moment_g5.sql | 88 ++++++++++++++++++ .../2026-03-10-g5-database-sync/spec.md | 26 ++++++ 13 files changed, 374 insertions(+), 63 deletions(-) create mode 100644 docs/rcu_action_events_g5.sql create mode 100644 docs/room_status_moment_g5.sql create mode 100644 openspec/changes/2026-03-10-g5-database-sync/spec.md diff --git a/bls-rcu-action-backend/.env b/bls-rcu-action-backend/.env index e7503f5..a4dfe14 100644 --- a/bls-rcu-action-backend/.env +++ b/bls-rcu-action-backend/.env @@ -15,6 +15,10 @@ KAFKA_FETCH_MAX_BYTES=10485760 KAFKA_FETCH_MAX_WAIT_MS=100 KAFKA_FETCH_MIN_BYTES=1 + +# ========================= +# PostgreSQL 配置 +# ========================= POSTGRES_HOST=10.8.8.109 POSTGRES_PORT=5433 POSTGRES_DATABASE=log_platform @@ -23,6 +27,16 @@ POSTGRES_PASSWORD=YourActualStrongPasswordForPostgres! POSTGRES_MAX_CONNECTIONS=6 POSTGRES_IDLE_TIMEOUT_MS=30000 +# ========================= +# PostgreSQL 配置 G5库专用 +# ========================= +POSTGRES_HOST_G5=10.8.8.80 +POSTGRES_PORT_G5=5434 +POSTGRES_DATABASE_G5=log_platform +POSTGRES_USER_G5=log_admin +POSTGRES_PASSWORD_G5=H3IkLUt8K!x +POSTGRES_IDLE_TIMEOUT_MS_G5=30000 + PORT=3001 LOG_LEVEL=info diff --git a/bls-rcu-action-backend/src/config/config.js b/bls-rcu-action-backend/src/config/config.js index c1438ec..34b057c 100644 --- a/bls-rcu-action-backend/src/config/config.js +++ b/bls-rcu-action-backend/src/config/config.js @@ -35,6 +35,7 @@ export const config = { } : undefined }, db: { + enabled: process.env.ENABLE_G4_SYNC !== 'false', host: process.env.DB_HOST || process.env.POSTGRES_HOST || 'localhost', port: parseNumber(process.env.DB_PORT || process.env.POSTGRES_PORT, 5432), user: process.env.DB_USER || process.env.POSTGRES_USER || 'postgres', @@ -45,6 +46,18 @@ export const config = { schema: process.env.DB_SCHEMA || 'rcu_action', table: process.env.DB_TABLE || 'rcu_action_events' }, + dbG5: { + enabled: process.env.ENABLE_G5_SYNC !== 'false', + host: process.env.POSTGRES_HOST_G5, + port: parseNumber(process.env.POSTGRES_PORT_G5, 5434), + user: process.env.POSTGRES_USER_G5, + password: process.env.POSTGRES_PASSWORD_G5, + database: process.env.POSTGRES_DATABASE_G5, + max: parseNumber(process.env.POSTGRES_MAX_CONNECTIONS, 10), + ssl: process.env.DB_SSL_G5 === 'true' ? { rejectUnauthorized: false } : undefined, + schema: process.env.DB_SCHEMA || 'rcu_action', + table: 'rcu_action_events_g5' + }, redis: { host: process.env.REDIS_HOST || 'localhost', port: parseNumber(process.env.REDIS_PORT, 6379), @@ -54,6 +67,7 @@ export const config = { apiBaseUrl: process.env.REDIS_API_BASE_URL || `http://localhost:${parseNumber(process.env.PORT, 3000)}` }, roomStatusDb: { + enabled: process.env.ENABLE_G4_SYNC !== 'false', host: process.env.ROOM_STATUS_DB_HOST || process.env.DB_HOST || process.env.POSTGRES_HOST || 'localhost', port: parseNumber(process.env.ROOM_STATUS_DB_PORT || process.env.DB_PORT || process.env.POSTGRES_PORT, 5432), user: process.env.ROOM_STATUS_DB_USER || process.env.DB_USER || process.env.POSTGRES_USER || 'postgres', @@ -64,5 +78,17 @@ export const config = { schema: process.env.ROOM_STATUS_DB_SCHEMA || 'room_status', table: process.env.ROOM_STATUS_DB_TABLE || 'room_status_moment' }, + roomStatusDbG5: { + enabled: process.env.ENABLE_G5_SYNC !== 'false', + host: process.env.POSTGRES_HOST_G5, + port: parseNumber(process.env.POSTGRES_PORT_G5, 5434), + user: process.env.POSTGRES_USER_G5, + password: process.env.POSTGRES_PASSWORD_G5, + database: process.env.POSTGRES_DATABASE_G5, + max: parseNumber(process.env.ROOM_STATUS_DB_MAX_CONNECTIONS, 5), + ssl: process.env.DB_SSL_G5 === 'true' ? { rejectUnauthorized: false } : undefined, + schema: 'room_status', + table: 'room_status_moment' + }, enableLoopNameAutoGeneration: process.env.ENABLE_LOOP_NAME_AUTO_GENERATION === 'true' }; diff --git a/bls-rcu-action-backend/src/db/batchProcessor.js b/bls-rcu-action-backend/src/db/batchProcessor.js index b6ffed7..dd69fdc 100644 --- a/bls-rcu-action-backend/src/db/batchProcessor.js +++ b/bls-rcu-action-backend/src/db/batchProcessor.js @@ -2,6 +2,8 @@ export class BatchProcessor { constructor(dbManager, config, options = {}) { this.dbManager = dbManager; this.config = config; + this.omitGuid = options.omitGuid || false; + this.dbConfig = options.dbConfig || config.db; this.batchSize = options.batchSize || 500; this.flushInterval = options.flushInterval || 1000; this.buffer = []; @@ -39,12 +41,13 @@ export class BatchProcessor { } try { - await this.dbManager.insertRows({ - schema: this.config.db.schema, - table: this.config.db.table, - rows: allRows + await this.dbManager.insertRows({ + schema: this.dbConfig.schema, + table: this.dbConfig.table, + rows: allRows, + omitGuid: this.omitGuid }); - + // Resolve each item with its own row count currentBatch.forEach(item => item.resolve(item.rows.length)); } catch (error) { @@ -61,7 +64,7 @@ export class BatchProcessor { cmd_word: sample.cmd_word } : null }; - + // Reject all items in the batch currentBatch.forEach(item => item.reject(error)); } diff --git a/bls-rcu-action-backend/src/db/databaseManager.js b/bls-rcu-action-backend/src/db/databaseManager.js index a009220..ec1deb9 100644 --- a/bls-rcu-action-backend/src/db/databaseManager.js +++ b/bls-rcu-action-backend/src/db/databaseManager.js @@ -45,20 +45,21 @@ export class DatabaseManager { }); } - async insertRows({ schema, table, rows }) { + async insertRows({ schema, table, rows, omitGuid = false }) { if (!rows || rows.length === 0) { return; } + const currentColumns = omitGuid ? columns.filter(c => c !== 'guid') : columns; const values = []; const placeholders = rows.map((row, rowIndex) => { - const offset = rowIndex * columns.length; - columns.forEach((column) => { + const offset = rowIndex * currentColumns.length; + currentColumns.forEach((column) => { values.push(row[column] ?? null); }); - const params = columns.map((_, columnIndex) => `$${offset + columnIndex + 1}`); + const params = currentColumns.map((_, columnIndex) => `$${offset + columnIndex + 1}`); return `(${params.join(', ')})`; }); - const statement = `INSERT INTO ${schema}.${table} (${columns.join(', ')}) VALUES ${placeholders.join(', ')}`; + const statement = `INSERT INTO ${schema}.${table} (${currentColumns.join(', ')}) VALUES ${placeholders.join(', ')}`; try { await this.pool.query(statement, values); } catch (error) { diff --git a/bls-rcu-action-backend/src/db/roomStatusManager.js b/bls-rcu-action-backend/src/db/roomStatusManager.js index 97cf7a5..36b3482 100644 --- a/bls-rcu-action-backend/src/db/roomStatusManager.js +++ b/bls-rcu-action-backend/src/db/roomStatusManager.js @@ -13,8 +13,9 @@ const { Pool } = pg; export class RoomStatusManager { /** * @param {Object} dbConfig - roomStatusDb config from config.js + * @param {Object} [options] - additional configuration like omitGuid */ - constructor(dbConfig) { + constructor(dbConfig, options = {}) { this.pool = new Pool({ host: dbConfig.host, port: dbConfig.port, @@ -27,6 +28,7 @@ export class RoomStatusManager { this.schema = dbConfig.schema; this.table = dbConfig.table; this.fullTableName = `${this.schema}.${this.table}`; + this.omitGuid = options.omitGuid || false; } /** @@ -50,29 +52,53 @@ export class RoomStatusManager { for (let i = 0; i < rows.length; i++) { const row = rows[i]; - const offset = i * 8; // Changed from 9 to 8 - values.push( - row.guid || randomUUID(), // $1 - row.ts_ms, // $2 - row.hotel_id, // $3 - row.room_id, // $4 - row.device_id, // $5 - row.sys_lock_status, // $6 - row.dev_loops ? JSON.stringify(row.dev_loops) : null, // $7 (was $8) - row.faulty_device_count ? JSON.stringify(row.faulty_device_count) : null // $8 (was $9) - ); - const p = (n) => `$${offset + n}`; - placeholders.push(`(${p(1)}::uuid, ${p(2)}, ${p(3)}, ${p(4)}, ${p(5)}, ${p(6)}, ${p(7)}::jsonb, ${p(8)}::jsonb)`); + const offset = i * (this.omitGuid ? 7 : 8); + + if (this.omitGuid) { + values.push( + row.ts_ms, // $1 + row.hotel_id, // $2 + row.room_id, // $3 + row.device_id, // $4 + row.sys_lock_status, // $5 + row.dev_loops ? JSON.stringify(row.dev_loops) : null, // $6 + row.faulty_device_count ? JSON.stringify(row.faulty_device_count) : null, // $7 + 1 // $8 online_status + ); + const p = (n) => `$${offset + n}`; + placeholders.push(`(${p(1)}, ${p(2)}, ${p(3)}, ${p(4)}, ${p(5)}, ${p(6)}::jsonb, ${p(7)}::jsonb, ${p(8)})`); + } else { + values.push( + row.guid || randomUUID(), // $1 + row.ts_ms, // $2 + row.hotel_id, // $3 + row.room_id, // $4 + row.device_id, // $5 + row.sys_lock_status, // $6 + row.dev_loops ? JSON.stringify(row.dev_loops) : null, // $7 + row.faulty_device_count ? JSON.stringify(row.faulty_device_count) : null, // $8 + 1 // $9 online_status + ); + const p = (n) => `$${offset + n}`; + placeholders.push(`(${p(1)}::uuid, ${p(2)}, ${p(3)}, ${p(4)}, ${p(5)}, ${p(6)}, ${p(7)}::jsonb, ${p(8)}::jsonb, ${p(9)})`); + } } + const insertColumns = this.omitGuid + ? 'ts_ms, hotel_id, room_id, device_id, sys_lock_status, dev_loops, faulty_device_count, online_status' + : 'guid, ts_ms, hotel_id, room_id, device_id, sys_lock_status, dev_loops, faulty_device_count, online_status'; + + const conflictTarget = this.omitGuid + ? '(hotel_id, room_id)' + : '(hotel_id, room_id, device_id)'; + const sql = ` - INSERT INTO ${this.fullTableName} ( - guid, ts_ms, hotel_id, room_id, device_id, - sys_lock_status, dev_loops, faulty_device_count - ) VALUES ${placeholders.join(', ')} - ON CONFLICT (hotel_id, room_id, device_id) + INSERT INTO ${this.fullTableName} (${insertColumns}) VALUES ${placeholders.join(', ')} + ON CONFLICT ${conflictTarget} DO UPDATE SET - ts_ms = GREATEST(${this.fullTableName}.ts_ms, EXCLUDED.ts_ms), + ts_ms = EXCLUDED.ts_ms, + online_status = 1, + device_id = EXCLUDED.device_id, sys_lock_status = COALESCE(EXCLUDED.sys_lock_status, ${this.fullTableName}.sys_lock_status), dev_loops = CASE WHEN EXCLUDED.dev_loops IS NULL THEN ${this.fullTableName}.dev_loops diff --git a/bls-rcu-action-backend/src/db/statusBatchProcessor.js b/bls-rcu-action-backend/src/db/statusBatchProcessor.js index 5e12e0f..e780432 100644 --- a/bls-rcu-action-backend/src/db/statusBatchProcessor.js +++ b/bls-rcu-action-backend/src/db/statusBatchProcessor.js @@ -22,6 +22,7 @@ export class StatusBatchProcessor { this.flushInterval = options.flushInterval || 500; this.maxBufferSize = options.maxBufferSize || 200; this.redisIntegration = options.redisIntegration || null; + this.dedupeByRoom = options.dedupeByRoom || false; /** @type {Map} compositeKey -> mergedState */ this.buffer = new Map(); @@ -32,6 +33,9 @@ export class StatusBatchProcessor { * Build composite key for deduplication. */ _key(update) { + if (this.dedupeByRoom) { + return `${update.hotel_id}:${update.room_id}`; + } return `${update.hotel_id}:${update.room_id}:${update.device_id}`; } @@ -49,6 +53,11 @@ export class StatusBatchProcessor { // Merge: take latest ts_ms existing.ts_ms = Math.max(existing.ts_ms, update.ts_ms); + // device_id: prefer newer (important for dedupeByRoom feature) + if (update.device_id) { + existing.device_id = update.device_id; + } + // sys_lock_status: prefer newer non-null value if (update.sys_lock_status != null) { existing.sys_lock_status = update.sys_lock_status; diff --git a/bls-rcu-action-backend/src/index.js b/bls-rcu-action-backend/src/index.js index fbbae9c..9818568 100644 --- a/bls-rcu-action-backend/src/index.js +++ b/bls-rcu-action-backend/src/index.js @@ -1,6 +1,6 @@ import cron from 'node-cron'; import { config } from './config/config.js'; -import dbManager from './db/databaseManager.js'; +import dbManager, { DatabaseManager } from './db/databaseManager.js'; import projectMetadata from './cache/projectMetadata.js'; import { createKafkaConsumers } from './kafka/consumer.js'; import { processKafkaMessage } from './processor/index.js'; @@ -30,12 +30,24 @@ const bootstrap = async () => { redisIntegration.startHeartbeat(); // 1.2 Initialize Room Status Manager (independent pool for snapshot table) - const roomStatusManager = new RoomStatusManager(config.roomStatusDb); - const statusBatchProcessor = new StatusBatchProcessor(roomStatusManager, { + const roomStatusManagerG4 = new RoomStatusManager(config.roomStatusDb); + const statusBatchProcessorG4 = new StatusBatchProcessor(roomStatusManagerG4, { flushInterval: 500, maxBufferSize: 200, redisIntegration }); + + let roomStatusManagerG5 = null; + let statusBatchProcessorG5 = null; + if (config.roomStatusDbG5.enabled) { + roomStatusManagerG5 = new RoomStatusManager(config.roomStatusDbG5, { omitGuid: true }); + statusBatchProcessorG5 = new StatusBatchProcessor(roomStatusManagerG5, { + flushInterval: 500, + maxBufferSize: 200, + dedupeByRoom: true, + redisIntegration + }); + } logger.info('Room Status sync pipeline initialized'); // 1. Setup Metric Reporting Cron Job (Every minute) @@ -54,10 +66,22 @@ const bootstrap = async () => { const errorQueueKey = buildErrorQueueKey(config.redis.projectName); - const batchProcessor = new BatchProcessor(dbManager, config, { - batchSize: config.kafka.maxInFlight + const batchProcessorG4 = new BatchProcessor(dbManager, config, { + batchSize: config.kafka.maxInFlight, + dbConfig: config.db }); + let dbManagerG5 = null; + let batchProcessorG5 = null; + if (config.dbG5.enabled) { + dbManagerG5 = new DatabaseManager(config.dbG5); + batchProcessorG5 = new BatchProcessor(dbManagerG5, config, { + batchSize: config.kafka.maxInFlight, + omitGuid: true, + dbConfig: config.dbG5 + }); + } + const handleMessage = async (message) => { if (message.topic) { metricCollector.increment('kafka_pulled'); @@ -87,14 +111,29 @@ const bootstrap = async () => { }); } const { rows, payload } = await processKafkaMessage({ message }); - const inserted = await batchProcessor.add({ rows }); + + let inserted = 0; + const dbActions = []; + if (config.db.enabled) { + dbActions.push(batchProcessorG4.add({ rows }).then(c => { inserted = Math.max(inserted, c); })); + } + if (batchProcessorG5 && config.dbG5.enabled) { + dbActions.push(batchProcessorG5.add({ rows }).then(c => { inserted = Math.max(inserted, c); })); + } + await Promise.all(dbActions); + metricCollector.increment('db_inserted'); // Fire-and-forget: extract status and push to StatusBatchProcessor try { const statusUpdate = extractStatusUpdate(payload); if (statusUpdate) { - statusBatchProcessor.add(statusUpdate); + if (config.roomStatusDb.enabled) { + statusBatchProcessorG4.add(statusUpdate); + } + if (statusBatchProcessorG5 && config.roomStatusDbG5.enabled) { + statusBatchProcessorG5.add(statusUpdate); + } } } catch (statusErr) { logger.error('Status extraction failed (non-blocking)', { error: statusErr?.message }); @@ -158,13 +197,20 @@ const bootstrap = async () => { const healthCheck = { shouldPause: async (error) => { if (error?.type === 'DB_ERROR') { - const isConnected = await dbManager.testConnection(); - return !isConnected; + const checks = []; + if (config.db.enabled) checks.push(dbManager.testConnection()); + if (dbManagerG5 && config.dbG5.enabled) checks.push(dbManagerG5.testConnection()); + const results = await Promise.all(checks); + return results.some(res => !res); } return false; }, check: async () => { - return await dbManager.testConnection(); + const checks = []; + if (config.db.enabled) checks.push(dbManager.testConnection()); + if (dbManagerG5 && config.dbG5.enabled) checks.push(dbManagerG5.testConnection()); + const results = await Promise.all(checks); + return results.every(res => res === true); } }; @@ -210,16 +256,27 @@ const bootstrap = async () => { // 4. Flush and close Room Status pipeline try { - await statusBatchProcessor.flush(); - await roomStatusManager.close(); - logger.info('Room Status pipeline closed'); + if (config.roomStatusDb.enabled) { + await statusBatchProcessorG4.flush(); + await roomStatusManagerG4.close(); + } + if (statusBatchProcessorG5 && config.roomStatusDbG5.enabled) { + await statusBatchProcessorG5.flush(); + await roomStatusManagerG5.close(); + } + logger.info('Room Status pipelines closed'); } catch (rsErr) { logger.error('Error closing Room Status pipeline', { error: rsErr?.message }); } - // 5. Close Database Pool - await dbManager.close(); - logger.info('Database connection closed'); + // 5. Close Database Pools + if (config.db.enabled) { + await dbManager.close(); + } + if (dbManagerG5 && config.dbG5.enabled) { + await dbManagerG5.close(); + } + logger.info('Database connections closed'); process.exit(0); } catch (err) { diff --git a/bls-rcu-action-backend/src/processor/index.js b/bls-rcu-action-backend/src/processor/index.js index 617df40..b20f832 100644 --- a/bls-rcu-action-backend/src/processor/index.js +++ b/bls-rcu-action-backend/src/processor/index.js @@ -221,7 +221,7 @@ export const buildRowsFromPayload = (rawPayload) => { direction: normalizedDirection, cmd_word: normalizedCmdWord, frame_id: frameId, - udp_raw: udpRaw, + udp_raw: null, sys_lock_status: sysLockStatus ?? null, report_count: reportCount ?? null, fault_count: faultCount ?? null, @@ -260,10 +260,7 @@ export const buildRowsFromPayload = (rawPayload) => { // Logic 1: 0x36 Status/Fault Report if (messageType === '36上报') { - const details = { - device_list: deviceList, - fault_list: faultList - }; + const details = null; // Process device status list if (deviceList.length > 0) { @@ -318,9 +315,7 @@ export const buildRowsFromPayload = (rawPayload) => { // Logic 2: 0x0F Control Command if (messageType === '0F下发') { - const details = { - control_list: controlList - }; + const details = null; if (controlList.length > 0) { controlList.forEach(control => { @@ -356,7 +351,7 @@ export const buildRowsFromPayload = (rawPayload) => { else if (messageType === '0FACK') { const { control_list: controls = [] } = payload; if (Array.isArray(controls)) { - const details = { control_list: controls }; + const details = null; controls.forEach((control) => { rows.push({ ...commonFields, diff --git a/bls-rcu-action-backend/tests/batch_processor.test.js b/bls-rcu-action-backend/tests/batch_processor.test.js index d828402..b14916b 100644 --- a/bls-rcu-action-backend/tests/batch_processor.test.js +++ b/bls-rcu-action-backend/tests/batch_processor.test.js @@ -28,14 +28,15 @@ describe('BatchProcessor', () => { expect(dbManager.insertRows).not.toHaveBeenCalled(); const p3 = batchProcessor.add({ rows: ['r3'] }); - + // Wait for microtasks await Promise.resolve(); - + expect(dbManager.insertRows).toHaveBeenCalledTimes(1); expect(dbManager.insertRows).toHaveBeenCalledWith({ schema: 'test_schema', table: 'test_table', + omitGuid: false, rows: ['r1', 'r2', 'r3'] }); @@ -50,7 +51,7 @@ describe('BatchProcessor', () => { expect(dbManager.insertRows).not.toHaveBeenCalled(); vi.advanceTimersByTime(1000); - + // Wait for microtasks await Promise.resolve(); @@ -58,6 +59,7 @@ describe('BatchProcessor', () => { expect(dbManager.insertRows).toHaveBeenCalledWith({ schema: 'test_schema', table: 'test_table', + omitGuid: false, rows: ['r1'] }); @@ -87,6 +89,7 @@ describe('BatchProcessor', () => { expect(dbManager.insertRows).toHaveBeenCalledWith({ schema: 'test_schema', table: 'test_table', + omitGuid: false, rows: ['r1', 'r2', 'r3'] }); diff --git a/bls-rcu-action-backend/tests/processor.test.js b/bls-rcu-action-backend/tests/processor.test.js index abd7642..f5ec380 100644 --- a/bls-rcu-action-backend/tests/processor.test.js +++ b/bls-rcu-action-backend/tests/processor.test.js @@ -51,7 +51,7 @@ describe('Processor Logic', () => { expect(rows[0].action_type).toBe('设备回路状态'); expect(rows[0].dev_addr).toBe(10); expect(rows[1].dev_addr).toBe(11); - expect(rows[0].details.device_list).toHaveLength(2); + expect(rows[0].details).toBeNull(); }); it('should handle 0x36 Fault Report', () => { @@ -162,7 +162,7 @@ describe('Processor Logic', () => { const rows = buildRowsFromPayload(payload); expect(rows).toHaveLength(1); - expect(rows[0].udp_raw).toBe(expectedBase64); + expect(rows[0].udp_raw).toBeNull(); }); it('should keep udp_raw unchanged when input is not hex string', () => { @@ -178,7 +178,7 @@ describe('Processor Logic', () => { const rows = buildRowsFromPayload(payload); - expect(rows[0].udp_raw).toBe('YWJjMTIz'); + expect(rows[0].udp_raw).toBeNull(); }); it('should default extra to empty object when not provided', () => { @@ -273,7 +273,7 @@ describe('Processor Logic - 0x0E Support', () => { expect(rows[0].dev_addr).toBe(10); expect(rows[0].cmd_word).toBe('0x0e'); // Normalized expect(rows[1].dev_addr).toBe(11); - expect(rows[0].details.device_list).toHaveLength(2); + expect(rows[0].details).toBeNull(); }); it('should handle 0x0E Fault Report', () => { diff --git a/docs/rcu_action_events_g5.sql b/docs/rcu_action_events_g5.sql new file mode 100644 index 0000000..5c7603d --- /dev/null +++ b/docs/rcu_action_events_g5.sql @@ -0,0 +1,63 @@ +/* + Navicat Premium Dump SQL + + Source Server : FnOS 80 + Source Server Type : PostgreSQL + Source Server Version : 150017 (150017) + Source Host : 10.8.8.80:5434 + Source Catalog : log_platform + Source Schema : rcu_action + + Target Server Type : PostgreSQL + Target Server Version : 150017 (150017) + File Encoding : 65001 + + Date: 10/03/2026 16:01:04 +*/ + + +-- ---------------------------- +-- Table structure for rcu_action_events_g5 +-- ---------------------------- +DROP TABLE IF EXISTS "rcu_action"."rcu_action_events_g5"; +CREATE TABLE "rcu_action"."rcu_action_events_g5" ( + "guid" int4 NOT NULL DEFAULT nextval('"rcu_action".rcu_action_events_g5_guid_seq1'::regclass), + "ts_ms" int8 NOT NULL, + "write_ts_ms" int8 NOT NULL, + "hotel_id" int4 NOT NULL, + "room_id" varchar(32) COLLATE "pg_catalog"."default" NOT NULL, + "device_id" varchar(32) COLLATE "pg_catalog"."default" NOT NULL, + "direction" varchar(10) COLLATE "pg_catalog"."default" NOT NULL, + "cmd_word" varchar(10) COLLATE "pg_catalog"."default" NOT NULL, + "frame_id" int4 NOT NULL, + "udp_raw" text COLLATE "pg_catalog"."default" NOT NULL, + "action_type" varchar(20) COLLATE "pg_catalog"."default" NOT NULL, + "sys_lock_status" int2, + "report_count" int2, + "dev_type" int2, + "dev_addr" int2, + "dev_loop" int4, + "dev_data" int4, + "fault_count" int2, + "error_type" int2, + "error_data" int2, + "type_l" int2, + "type_h" int2, + "details" jsonb, + "extra" jsonb, + "loop_name" varchar(255) COLLATE "pg_catalog"."default" +) +TABLESPACE "ts_hot" +; + +-- ---------------------------- +-- Indexes structure for table rcu_action_events_g5 +-- ---------------------------- +CREATE INDEX "idx_g5_prod_ts_ms" ON "rcu_action"."rcu_action_events_g5" USING btree ( + "ts_ms" "pg_catalog"."int8_ops" DESC NULLS FIRST +); + +-- ---------------------------- +-- Primary Key structure for table rcu_action_events_g5 +-- ---------------------------- +ALTER TABLE "rcu_action"."rcu_action_events_g5" ADD CONSTRAINT "rcu_action_events_g5_pkey" PRIMARY KEY ("ts_ms", "guid"); diff --git a/docs/room_status_moment_g5.sql b/docs/room_status_moment_g5.sql new file mode 100644 index 0000000..06d8d0d --- /dev/null +++ b/docs/room_status_moment_g5.sql @@ -0,0 +1,88 @@ +/* + Navicat Premium Dump SQL + + Source Server : FnOS 80 + Source Server Type : PostgreSQL + Source Server Version : 150017 (150017) + Source Host : 10.8.8.80:5434 + Source Catalog : log_platform + Source Schema : room_status + + Target Server Type : PostgreSQL + Target Server Version : 150017 (150017) + File Encoding : 65001 + + Date: 10/03/2026 10:32:13 +*/ + + +-- ---------------------------- +-- Table structure for room_status_moment_g5 +-- ---------------------------- +DROP TABLE IF EXISTS "room_status"."room_status_moment_g5"; +CREATE TABLE "room_status"."room_status_moment_g5" ( + "hotel_id" int2 NOT NULL, + "room_id" text COLLATE "pg_catalog"."default" NOT NULL, + "device_id" text COLLATE "pg_catalog"."default" NOT NULL, + "ts_ms" int8 NOT NULL DEFAULT ((EXTRACT(epoch FROM clock_timestamp()) * (1000)::numeric))::bigint, + "sys_lock_status" int2, + "online_status" int2, + "launcher_version" text COLLATE "pg_catalog"."default", + "app_version" text COLLATE "pg_catalog"."default", + "config_version" text COLLATE "pg_catalog"."default", + "register_ts_ms" int8, + "upgrade_ts_ms" int8, + "config_ts_ms" int8, + "ip" text COLLATE "pg_catalog"."default", + "pms_status" int2, + "power_state" int2, + "cardless_state" int2, + "service_mask" int8, + "insert_card" int2, + "bright_g" int2, + "agreement_ver" text COLLATE "pg_catalog"."default", + "air_address" _text COLLATE "pg_catalog"."default", + "air_state" _int2, + "air_model" _int2, + "air_speed" _int2, + "air_set_temp" _int2, + "air_now_temp" _int2, + "air_solenoid_valve" _int2, + "elec_address" _text COLLATE "pg_catalog"."default", + "elec_voltage" _float8, + "elec_ampere" _float8, + "elec_power" _float8, + "elec_phase" _float8, + "elec_energy" _float8, + "elec_sum_energy" _float8, + "carbon_state" int2, + "dev_loops" jsonb, + "energy_carbon_sum" float8, + "energy_nocard_sum" float8, + "external_device" jsonb DEFAULT '{}'::jsonb, + "faulty_device_count" jsonb DEFAULT '{}'::jsonb +) +WITH (fillfactor=90) +TABLESPACE "ts_hot" +; + +-- ---------------------------- +-- Indexes structure for table room_status_moment_g5 +-- ---------------------------- +CREATE INDEX "idx_rsm_g5_dashboard_query" ON "room_status"."room_status_moment_g5" USING btree ( + "hotel_id" "pg_catalog"."int2_ops" ASC NULLS LAST, + "online_status" "pg_catalog"."int2_ops" ASC NULLS LAST, + "power_state" "pg_catalog"."int2_ops" ASC NULLS LAST +); + +-- ---------------------------- +-- Triggers structure for table room_status_moment_g5 +-- ---------------------------- +CREATE TRIGGER "trg_update_rsm_ts_ms" BEFORE UPDATE ON "room_status"."room_status_moment_g5" +FOR EACH ROW +EXECUTE PROCEDURE "room_status"."update_ts_ms_g5"(); + +-- ---------------------------- +-- Primary Key structure for table room_status_moment_g5 +-- ---------------------------- +ALTER TABLE "room_status"."room_status_moment_g5" ADD CONSTRAINT "room_status_moment_g5_pkey" PRIMARY KEY ("hotel_id", "room_id"); diff --git a/openspec/changes/2026-03-10-g5-database-sync/spec.md b/openspec/changes/2026-03-10-g5-database-sync/spec.md new file mode 100644 index 0000000..cf06263 --- /dev/null +++ b/openspec/changes/2026-03-10-g5-database-sync/spec.md @@ -0,0 +1,26 @@ +# G5 Database Sync and Room Status Aggregation Logic + +## openspec-proposal +- **生态包推荐与选型**:本次需求纯粹为数据库多环境异步双写及聚合同步架构升级。我们保持着**不重复造轮子**的极简最佳实践,未引入新的冗余 npm 依赖。坚持复用项目中现有的优秀驱动包 `pg`(node-postgres)作为核心驱动来应对大量行级的高并发插入与聚合。 +- **降级与扩展策略**:对现有的 `BatchProcessor` 和 `StatusBatchProcessor` 进行原生构造拆解重写,拓展 `options`(如 `omitGuid`, `dedupeByRoom` 等变量配置),以最小侵入业务的方式分离实例对象,挂载 G5 双写。 + +## openspec-apply + +针对 `Node.js` (V8) 和异步并发模型,完成生成和执行如下实施与扩展细则: + +### 1. 异步非阻塞的 Dual-Write 设计方案 +- **Event Loop 友好型并行**:在执行向 G4 和 G5 环境多库分发数据时,彻底摒弃 `await g4_write; await g5_write` 的顺序堵塞请求堆栈。采用纯异步的 `Promise.all([dbActions])` 把宏任务交由事件循环底层的 I/O 线程池并行发送所有的 `INSERT 事务报文`,将两座异地数据库的返回时延并集。 +- **配置隔离熔断法**:依托 `.env` 实现对每一向数据库独立挂载的生命周期(如动态通过 `ENABLE_G4_SYNC` 或 `ENABLE_G5_SYNC` 处理)。任一数据库挂掉带来的错误 `PROMISE_REJECTED` 在进程捕获后只会独立封停,不会引发系统阻塞或导致另一健康的数据库断供。 + +### 2. G5 字段 `omitGuid` 的剥离控制与瘦身落地 +- G5 中 `rcu_action_events_g5` 表里的 `guid` 基于 `int4` `nextval`。这与原本生成的 `uuid` 类型存在截然冲突。 +- **具体落地**:当连接器池检测到 `omitGuid = true`(G5 模式打开),底层的 SQL `INSERT INTO (${columns})` 将会动态清洗掉 `guid` 和所绑定的 `$n` 占位符。把主键控制权彻底返还给 PostgreSQL 内部机制,Node 层专注传输,达到结构和时效的最简合并。 + +### 3. Room Status: 房间聚合锁与 Upsert (ON CONFLICT DO UPDATE) +- **去重逻辑优化与 `dedupeByRoom`**:针对 `room_status_moment_g5` 中限定 `(hotel_id, room_id)` 是唯一的联合主键特质,如果仍然通过旧结构缓存,会产生频繁覆盖导致的 JSON 散件丢失。本次通过 Node 层注入 `dedupeByRoom` 拦截校验法:缓冲器使用 `${hotel_id}:${room_id}` 的更底层维度合并哈希。同一房间内设备的任何 JSONB 数组比如 `dev_loops` 都先进行底层 `||` 合并及时间对比去重,然后一次性下发大包。 +- **SQL 更新映射**:由于 `device_id` 不再具有唯一性维度,我们执行基于 PostgreSQL 的合并操作:在冲突 `EXCLUDED` 对象中强制替换最新上报设备 `$device_id = EXCLUDED.device_id`。并且彻底抽去了 Node 更新时间戳的过程,让权给表上的 `AFTER/BEFORE UPDATE` 原生触发器处理 `ts_ms`。符合强数据库型开发的完美最佳实践。 + +### 4. 废弃冗余数据占用 (`udp_raw` & `details`) +由于上游报文及 JSON 持久化逻辑逐步升级,对于 G4 以及后续加入的 G5 库,在映射中全量废除了直接对 `udp_raw` 回追文本和对 `details` JSONB 的业务数据的多余存储。 +- **具体拦截处理**:我们在 `processor/index.js` `commonFields` 中将 `udp_raw` 强赋值为 `null`;同理对于所有的 `payload` 解析结果,`details` 也改为了强置空(`null`)。相应的单元测试套件已被更新修改以校验拦截有效性。 +- **目的**:通过这两项高频臃肿字典的精减裁剪,可极大缩小单行落盘宽表的体积,减轻 PostgreSQL 序列化负担及提升 UPSERT 时延,而数据库中的字段保留以用于结构向后兼容。