diff --git a/bls-rcu-action-backend/.env b/bls-rcu-action-backend/.env index a4dfe14..c05b01b 100644 --- a/bls-rcu-action-backend/.env +++ b/bls-rcu-action-backend/.env @@ -36,6 +36,9 @@ POSTGRES_DATABASE_G5=log_platform POSTGRES_USER_G5=log_admin POSTGRES_PASSWORD_G5=H3IkLUt8K!x POSTGRES_IDLE_TIMEOUT_MS_G5=30000 +ROOM_STATUS_DB_SCHEMA_G5=room_status +ROOM_STATUS_DB_TABLE_G5=room_status_moment_g5 +ROOM_STATUS_DB_MAX_CONNECTIONS_G5=5 PORT=3001 LOG_LEVEL=info diff --git a/bls-rcu-action-backend/.env.example b/bls-rcu-action-backend/.env.example index 63f0f17..bdc9736 100644 --- a/bls-rcu-action-backend/.env.example +++ b/bls-rcu-action-backend/.env.example @@ -40,4 +40,15 @@ REDIS_API_BASE_URL=http://localhost:3000 # ROOM_STATUS_DB_SCHEMA=room_status # ROOM_STATUS_DB_TABLE=room_status_moment +# G5 Room Status DB Configuration (optional) +# ENABLE_G5_SYNC=true +# POSTGRES_HOST_G5=10.8.8.80 +# POSTGRES_PORT_G5=5434 +# POSTGRES_DATABASE_G5=log_platform +# POSTGRES_USER_G5=log_admin +# POSTGRES_PASSWORD_G5=your-password +# ROOM_STATUS_DB_SCHEMA_G5=room_status +# ROOM_STATUS_DB_TABLE_G5=room_status_moment_g5 +# ROOM_STATUS_DB_MAX_CONNECTIONS_G5=5 + ENABLE_LOOP_NAME_AUTO_GENERATION=true diff --git a/bls-rcu-action-backend/src/config/config.js b/bls-rcu-action-backend/src/config/config.js index 34b057c..fab58a4 100644 --- a/bls-rcu-action-backend/src/config/config.js +++ b/bls-rcu-action-backend/src/config/config.js @@ -85,10 +85,10 @@ export const config = { user: process.env.POSTGRES_USER_G5, password: process.env.POSTGRES_PASSWORD_G5, database: process.env.POSTGRES_DATABASE_G5, - max: parseNumber(process.env.ROOM_STATUS_DB_MAX_CONNECTIONS, 5), + max: parseNumber(process.env.ROOM_STATUS_DB_MAX_CONNECTIONS_G5 || process.env.ROOM_STATUS_DB_MAX_CONNECTIONS, 5), ssl: process.env.DB_SSL_G5 === 'true' ? 
{ rejectUnauthorized: false } : undefined, - schema: 'room_status', - table: 'room_status_moment' + schema: process.env.ROOM_STATUS_DB_SCHEMA_G5 || 'room_status', + table: process.env.ROOM_STATUS_DB_TABLE_G5 || 'room_status_moment_g5' }, enableLoopNameAutoGeneration: process.env.ENABLE_LOOP_NAME_AUTO_GENERATION === 'true' }; diff --git a/bls-rcu-action-backend/src/db/batchProcessor.js b/bls-rcu-action-backend/src/db/batchProcessor.js index dd69fdc..fbacb70 100644 --- a/bls-rcu-action-backend/src/db/batchProcessor.js +++ b/bls-rcu-action-backend/src/db/batchProcessor.js @@ -1,3 +1,5 @@ +import { logger } from '../utils/logger.js'; + export class BatchProcessor { constructor(dbManager, config, options = {}) { this.dbManager = dbManager; @@ -6,6 +8,7 @@ export class BatchProcessor { this.dbConfig = options.dbConfig || config.db; this.batchSize = options.batchSize || 500; this.flushInterval = options.flushInterval || 1000; + this.targetName = options.targetName || 'action_db'; this.buffer = []; this.timer = null; } @@ -48,6 +51,11 @@ export class BatchProcessor { omitGuid: this.omitGuid }); + logger.info('Action batch flushed successfully', { + target: this.targetName, + rows: allRows.length + }); + // Resolve each item with its own row count currentBatch.forEach(item => item.resolve(item.rows.length)); } catch (error) { diff --git a/bls-rcu-action-backend/src/db/databaseManager.js b/bls-rcu-action-backend/src/db/databaseManager.js index ec1deb9..59e2896 100644 --- a/bls-rcu-action-backend/src/db/databaseManager.js +++ b/bls-rcu-action-backend/src/db/databaseManager.js @@ -1,6 +1,5 @@ import pg from 'pg'; import { config } from '../config/config.js'; -import { logger } from '../utils/logger.js'; const { Pool } = pg; @@ -60,17 +59,7 @@ export class DatabaseManager { return `(${params.join(', ')})`; }); const statement = `INSERT INTO ${schema}.${table} (${currentColumns.join(', ')}) VALUES ${placeholders.join(', ')}`; - try { - await this.pool.query(statement, values); - 
} catch (error) { - logger.error('Database insert failed', { - error: error?.message, - schema, - table, - rowsLength: rows.length - }); - throw error; - } + await this.pool.query(statement, values); } async testConnection() { diff --git a/bls-rcu-action-backend/src/db/roomStatusManager.js b/bls-rcu-action-backend/src/db/roomStatusManager.js index 36b3482..9784d83 100644 --- a/bls-rcu-action-backend/src/db/roomStatusManager.js +++ b/bls-rcu-action-backend/src/db/roomStatusManager.js @@ -52,7 +52,8 @@ export class RoomStatusManager { for (let i = 0; i < rows.length; i++) { const row = rows[i]; - const offset = i * (this.omitGuid ? 7 : 8); + const paramsPerRow = this.omitGuid ? 8 : 9; + const offset = i * paramsPerRow; if (this.omitGuid) { values.push( diff --git a/bls-rcu-action-backend/src/db/statusBatchProcessor.js b/bls-rcu-action-backend/src/db/statusBatchProcessor.js index e780432..73a1705 100644 --- a/bls-rcu-action-backend/src/db/statusBatchProcessor.js +++ b/bls-rcu-action-backend/src/db/statusBatchProcessor.js @@ -23,6 +23,7 @@ export class StatusBatchProcessor { this.maxBufferSize = options.maxBufferSize || 200; this.redisIntegration = options.redisIntegration || null; this.dedupeByRoom = options.dedupeByRoom || false; + this.targetName = options.targetName || 'room_status'; /** @type {Map} compositeKey -> mergedState */ this.buffer = new Map(); @@ -106,11 +107,16 @@ export class StatusBatchProcessor { this.buffer.clear(); try { - logger.info('StatusBatchProcessor flushing rows', { count: rows.length, sampleRowKeys: rows.map(r => r.device_id).slice(0, 5) }); + logger.info('StatusBatchProcessor flushing rows', { + target: this.targetName, + count: rows.length, + sampleRowKeys: rows.map(r => r.device_id).slice(0, 5) + }); await this.roomStatusManager.upsertBatch(rows); - logger.info('StatusBatchProcessor flushed successfully', { count: rows.length }); + logger.info('StatusBatchProcessor flushed successfully', { target: this.targetName, count: rows.length }); 
} catch (error) { logger.error('StatusBatchProcessor flush failed', { + target: this.targetName, error: error?.message, stack: error?.stack, count: rows.length @@ -121,6 +127,7 @@ export class StatusBatchProcessor { try { await this.redisIntegration.error('StatusBatchProcessor flush failed', { module: 'room_status', + target: this.targetName, count: rows.length, stack: error?.message }); diff --git a/bls-rcu-action-backend/src/index.js b/bls-rcu-action-backend/src/index.js index 9818568..b653505 100644 --- a/bls-rcu-action-backend/src/index.js +++ b/bls-rcu-action-backend/src/index.js @@ -34,6 +34,7 @@ const bootstrap = async () => { const statusBatchProcessorG4 = new StatusBatchProcessor(roomStatusManagerG4, { flushInterval: 500, maxBufferSize: 200, + targetName: 'g4:room_status.room_status_moment', redisIntegration }); @@ -44,7 +45,8 @@ const bootstrap = async () => { statusBatchProcessorG5 = new StatusBatchProcessor(roomStatusManagerG5, { flushInterval: 500, maxBufferSize: 200, dedupeByRoom: true, + targetName: `g5:${config.roomStatusDbG5.schema}.${config.roomStatusDbG5.table}`, redisIntegration }); } @@ -68,6 +69,7 @@ const bootstrap = async () => { const batchProcessorG4 = new BatchProcessor(dbManager, config, { batchSize: config.kafka.maxInFlight, + targetName: 'g4:rcu_action.rcu_action_events', dbConfig: config.db }); @@ -78,6 +80,7 @@ const bootstrap = async () => { batchProcessorG5 = new BatchProcessor(dbManagerG5, config, { batchSize: config.kafka.maxInFlight, omitGuid: true, + targetName: 'g5:rcu_action.rcu_action_events_g5', dbConfig: config.dbG5 }); } @@ -93,38 +96,9 @@ const bootstrap = async () => { const messageKey = Buffer.isBuffer(message.key) ?
message.key.toString('utf8') : message.key; - if (config.kafka.logMessages) { - logger.info('Kafka message received', { - topic: message.topic, - partition: message.partition, - offset: message.offset, - key: messageKey, - value: messageValue - }); - } else { - logger.info('Kafka message received', { - topic: message.topic, - partition: message.partition, - offset: message.offset, - key: messageKey, - valueLength: typeof messageValue === 'string' ? messageValue.length : null - }); - } const { rows, payload } = await processKafkaMessage({ message }); - let inserted = 0; - const dbActions = []; - if (config.db.enabled) { - dbActions.push(batchProcessorG4.add({ rows }).then(c => { inserted = Math.max(inserted, c); })); - } - if (batchProcessorG5 && config.dbG5.enabled) { - dbActions.push(batchProcessorG5.add({ rows }).then(c => { inserted = Math.max(inserted, c); })); - } - await Promise.all(dbActions); - - metricCollector.increment('db_inserted'); - - // Fire-and-forget: extract status and push to StatusBatchProcessor + // Status sync must be independent from action-event DB write failures. 
try { const statusUpdate = extractStatusUpdate(payload); if (statusUpdate) { @@ -139,7 +113,41 @@ const bootstrap = async () => { logger.error('Status extraction failed (non-blocking)', { error: statusErr?.message }); } - logger.info('Kafka message processed', { inserted }); + let inserted = 0; + const dbActions = []; + if (config.db.enabled) { + dbActions.push(batchProcessorG4.add({ rows }).then(c => { inserted = Math.max(inserted, c); })); + } + if (batchProcessorG5 && config.dbG5.enabled) { + dbActions.push(batchProcessorG5.add({ rows }).then(c => { inserted = Math.max(inserted, c); })); + } + + const dbResults = await Promise.allSettled(dbActions); + const failedDbResults = dbResults.filter(r => r.status === 'rejected'); + + if (failedDbResults.length > 0) { + metricCollector.increment('db_failed'); + failedDbResults.forEach((result) => { + const err = result.reason; + logger.warn('Action event insert failed and skipped (non-blocking)', { + error: err?.message, + type: err?.type, + dbContext: err?.dbContext + }); + }); + } + + if (dbResults.some(r => r.status === 'fulfilled')) { + metricCollector.increment('db_inserted'); + } + + logger.info('Kafka message processed', { + inserted, + statusTargets: { + g4: config.roomStatusDb.enabled, + g5: Boolean(statusBatchProcessorG5 && config.roomStatusDbG5.enabled) + } + }); } catch (error) { if (error.type === 'PARSE_ERROR') { metricCollector.increment('parse_error');