feat: 添加 G5 房间状态数据库配置,更新环境变量,优化数据库连接逻辑,增强日志记录

This commit is contained in:
2026-03-14 17:32:59 +08:00
parent 7fab70ec2b
commit c5acfbf47b
8 changed files with 77 additions and 50 deletions

View File

@@ -36,6 +36,9 @@ POSTGRES_DATABASE_G5=log_platform
 POSTGRES_USER_G5=log_admin
 POSTGRES_PASSWORD_G5=H3IkLUt8K!x
 POSTGRES_IDLE_TIMEOUT_MS_G5=30000
+ROOM_STATUS_DB_SCHEMA_G5=room_status
+ROOM_STATUS_DB_TABLE_G5=room_status_moment_g5
+ROOM_STATUS_DB_MAX_CONNECTIONS_G5=5
 PORT=3001
 LOG_LEVEL=info

View File

@@ -40,4 +40,15 @@ REDIS_API_BASE_URL=http://localhost:3000
 # ROOM_STATUS_DB_SCHEMA=room_status
 # ROOM_STATUS_DB_TABLE=room_status_moment
+
+# G5 Room Status DB Configuration (optional)
+# ENABLE_G5_SYNC=true
+# POSTGRES_HOST_G5=10.8.8.80
+# POSTGRES_PORT_G5=5434
+# POSTGRES_DATABASE_G5=log_platform
+# POSTGRES_USER_G5=log_admin
+# POSTGRES_PASSWORD_G5=your-password
+# ROOM_STATUS_DB_SCHEMA_G5=room_status
+# ROOM_STATUS_DB_TABLE_G5=room_status_moment_g5
+# ROOM_STATUS_DB_MAX_CONNECTIONS_G5=5
 ENABLE_LOOP_NAME_AUTO_GENERATION=true

View File

@@ -85,10 +85,10 @@ export const config = {
     user: process.env.POSTGRES_USER_G5,
     password: process.env.POSTGRES_PASSWORD_G5,
     database: process.env.POSTGRES_DATABASE_G5,
-    max: parseNumber(process.env.ROOM_STATUS_DB_MAX_CONNECTIONS, 5),
+    max: parseNumber(process.env.ROOM_STATUS_DB_MAX_CONNECTIONS_G5 || process.env.ROOM_STATUS_DB_MAX_CONNECTIONS, 5),
     ssl: process.env.DB_SSL_G5 === 'true' ? { rejectUnauthorized: false } : undefined,
-    schema: 'room_status',
-    table: 'room_status_moment'
+    schema: process.env.ROOM_STATUS_DB_SCHEMA_G5 || 'room_status',
+    table: process.env.ROOM_STATUS_DB_TABLE_G5 || 'room_status_moment_g5'
   },
   enableLoopNameAutoGeneration: process.env.ENABLE_LOOP_NAME_AUTO_GENERATION === 'true'
 };

View File

@@ -1,3 +1,5 @@
+import { logger } from '../utils/logger.js';
+
 export class BatchProcessor {
   constructor(dbManager, config, options = {}) {
     this.dbManager = dbManager;
@@ -6,6 +8,7 @@ export class BatchProcessor {
     this.dbConfig = options.dbConfig || config.db;
     this.batchSize = options.batchSize || 500;
     this.flushInterval = options.flushInterval || 1000;
+    this.targetName = options.targetName || 'action_db';
     this.buffer = [];
     this.timer = null;
   }
@@ -48,6 +51,11 @@ export class BatchProcessor {
         omitGuid: this.omitGuid
       });
+      logger.info('Action batch flushed successfully', {
+        target: this.targetName,
+        rows: allRows.length
+      });
       // Resolve each item with its own row count
       currentBatch.forEach(item => item.resolve(item.rows.length));
     } catch (error) {

View File

@@ -1,6 +1,5 @@
 import pg from 'pg';
 import { config } from '../config/config.js';
-import { logger } from '../utils/logger.js';

 const { Pool } = pg;
@@ -60,17 +59,7 @@ export class DatabaseManager {
       return `(${params.join(', ')})`;
     });
     const statement = `INSERT INTO ${schema}.${table} (${currentColumns.join(', ')}) VALUES ${placeholders.join(', ')}`;
-    try {
-      await this.pool.query(statement, values);
-    } catch (error) {
-      logger.error('Database insert failed', {
-        error: error?.message,
-        schema,
-        table,
-        rowsLength: rows.length
-      });
-      throw error;
-    }
+    await this.pool.query(statement, values);
   }

   async testConnection() {

View File

@@ -52,7 +52,8 @@ export class RoomStatusManager {
     for (let i = 0; i < rows.length; i++) {
       const row = rows[i];
-      const offset = i * (this.omitGuid ? 7 : 8);
+      const paramsPerRow = this.omitGuid ? 8 : 9;
+      const offset = i * paramsPerRow;
       if (this.omitGuid) {
         values.push(

View File

@@ -23,6 +23,7 @@ export class StatusBatchProcessor {
     this.maxBufferSize = options.maxBufferSize || 200;
     this.redisIntegration = options.redisIntegration || null;
     this.dedupeByRoom = options.dedupeByRoom || false;
+    this.targetName = options.targetName || 'room_status';

     /** @type {Map<string, Object>} compositeKey -> mergedState */
     this.buffer = new Map();
@@ -106,11 +107,16 @@ export class StatusBatchProcessor {
     this.buffer.clear();
     try {
-      logger.info('StatusBatchProcessor flushing rows', { count: rows.length, sampleRowKeys: rows.map(r => r.device_id).slice(0, 5) });
+      logger.info('StatusBatchProcessor flushing rows', {
+        target: this.targetName,
+        count: rows.length,
+        sampleRowKeys: rows.map(r => r.device_id).slice(0, 5)
+      });
       await this.roomStatusManager.upsertBatch(rows);
-      logger.info('StatusBatchProcessor flushed successfully', { count: rows.length });
+      logger.info('StatusBatchProcessor flushed successfully', { target: this.targetName, count: rows.length });
     } catch (error) {
       logger.error('StatusBatchProcessor flush failed', {
+        target: this.targetName,
         error: error?.message,
         stack: error?.stack,
         count: rows.length
@@ -121,6 +127,7 @@ export class StatusBatchProcessor {
       try {
         await this.redisIntegration.error('StatusBatchProcessor flush failed', {
           module: 'room_status',
+          target: this.targetName,
           count: rows.length,
           stack: error?.message
         });

View File

@@ -34,6 +34,7 @@ const bootstrap = async () => {
   const statusBatchProcessorG4 = new StatusBatchProcessor(roomStatusManagerG4, {
     flushInterval: 500,
     maxBufferSize: 200,
+    targetName: 'g4:room_status.room_status_moment',
     redisIntegration
   });
@@ -44,7 +45,7 @@ const bootstrap = async () => {
     statusBatchProcessorG5 = new StatusBatchProcessor(roomStatusManagerG5, {
       flushInterval: 500,
       maxBufferSize: 200,
-      dedupeByRoom: true,
+      targetName: `g5:${config.roomStatusDbG5.schema}.${config.roomStatusDbG5.table}`,
       redisIntegration
     });
   }
@@ -68,6 +69,7 @@ const bootstrap = async () => {
   const batchProcessorG4 = new BatchProcessor(dbManager, config, {
     batchSize: config.kafka.maxInFlight,
+    targetName: 'g4:rcu_action.rcu_action_events',
     dbConfig: config.db
   });
@@ -78,6 +80,7 @@ const bootstrap = async () => {
     batchProcessorG5 = new BatchProcessor(dbManagerG5, config, {
       batchSize: config.kafka.maxInFlight,
       omitGuid: true,
+      targetName: 'g5:rcu_action.rcu_action_events_g5',
       dbConfig: config.dbG5
     });
   }
@@ -93,38 +96,9 @@ const bootstrap = async () => {
       const messageKey = Buffer.isBuffer(message.key)
         ? message.key.toString('utf8')
         : message.key;
-      if (config.kafka.logMessages) {
-        logger.info('Kafka message received', {
-          topic: message.topic,
-          partition: message.partition,
-          offset: message.offset,
-          key: messageKey,
-          value: messageValue
-        });
-      } else {
-        logger.info('Kafka message received', {
-          topic: message.topic,
-          partition: message.partition,
-          offset: message.offset,
-          key: messageKey,
-          valueLength: typeof messageValue === 'string' ? messageValue.length : null
-        });
-      }
       const { rows, payload } = await processKafkaMessage({ message });
-      let inserted = 0;
-      const dbActions = [];
-      if (config.db.enabled) {
-        dbActions.push(batchProcessorG4.add({ rows }).then(c => { inserted = Math.max(inserted, c); }));
-      }
-      if (batchProcessorG5 && config.dbG5.enabled) {
-        dbActions.push(batchProcessorG5.add({ rows }).then(c => { inserted = Math.max(inserted, c); }));
-      }
-      await Promise.all(dbActions);
-      metricCollector.increment('db_inserted');
+      // Status sync must be independent from action-event DB write failures.
+      // Fire-and-forget: extract status and push to StatusBatchProcessor
       try {
         const statusUpdate = extractStatusUpdate(payload);
         if (statusUpdate) {
@@ -139,7 +113,41 @@ const bootstrap = async () => {
         logger.error('Status extraction failed (non-blocking)', { error: statusErr?.message });
       }

-      logger.info('Kafka message processed', { inserted });
+      let inserted = 0;
+      const dbActions = [];
+      if (config.db.enabled) {
+        dbActions.push(batchProcessorG4.add({ rows }).then(c => { inserted = Math.max(inserted, c); }));
+      }
+      if (batchProcessorG5 && config.dbG5.enabled) {
+        dbActions.push(batchProcessorG5.add({ rows }).then(c => { inserted = Math.max(inserted, c); }));
+      }
+      const dbResults = await Promise.allSettled(dbActions);
+      const failedDbResults = dbResults.filter(r => r.status === 'rejected');
+      if (failedDbResults.length > 0) {
+        metricCollector.increment('db_failed');
+        failedDbResults.forEach((result) => {
+          const err = result.reason;
+          logger.warn('Action event insert failed and skipped (non-blocking)', {
+            error: err?.message,
+            type: err?.type,
+            dbContext: err?.dbContext
+          });
+        });
+      }
+      if (dbResults.some(r => r.status === 'fulfilled')) {
+        metricCollector.increment('db_inserted');
+      }
+      logger.info('Kafka message processed', {
+        inserted,
+        statusTargets: {
+          g4: config.roomStatusDb.enabled,
+          g5: Boolean(statusBatchProcessorG5 && config.roomStatusDbG5.enabled)
+        }
+      });
     } catch (error) {
       if (error.type === 'PARSE_ERROR') {
         metricCollector.increment('parse_error');