feat: implement room status synchronization

- Add RoomStatusManager class, which manages the connection pool for the room status snapshot table and performs batch upserts.
- Add StatusBatchProcessor class, which collects and merges room status updates and periodically writes them to the database.
- Add status extractor statusExtractor.js, which extracts and builds room status update objects from Kafka messages.
- Modify index.js to initialize RoomStatusManager and StatusBatchProcessor and push status updates in parallel within the Kafka message processing flow.
- Modify processor/index.js, updating processKafkaMessage to support status extraction and processing.
- Update kafkaPayload.js, fixing control_list extraction to accept the `loop` field as actually transmitted over Kafka.
- Add unit tests for the status batch processor and the status extractor.
- Update docs/plan-room-status-sync.md with the room status sync design and field mappings.
2026-03-02 11:47:52 +08:00
parent e76d04f526
commit cf61e8dac6
16 changed files with 1154776 additions and 28 deletions


@@ -30,4 +30,14 @@ REDIS_DB=0
REDIS_PROJECT_NAME=my-project
REDIS_API_BASE_URL=http://localhost:3000
# Room Status DB Configuration (optional, falls back to DB_* values)
# ROOM_STATUS_DB_HOST=localhost
# ROOM_STATUS_DB_PORT=5432
# ROOM_STATUS_DB_USER=postgres
# ROOM_STATUS_DB_PASSWORD=password
# ROOM_STATUS_DB_DATABASE=bls_rcu_action
# ROOM_STATUS_DB_MAX_CONNECTIONS=5
# ROOM_STATUS_DB_SCHEMA=room_status
# ROOM_STATUS_DB_TABLE=room_status_moment
ENABLE_LOOP_NAME_AUTO_GENERATION=true


@@ -0,0 +1,11 @@
import pg from 'pg';
const pool = new pg.Pool({
host: '10.8.8.109', port: 5433, user: 'log_admin',
password: 'YourActualStrongPasswordForPostgres!', database: 'log_platform', max: 1
});
const s = await pool.query("SELECT count(*) as total, count(*) FILTER (WHERE dev_loops IS NOT NULL AND dev_loops != '{}'::jsonb) as with_loops, count(*) FILTER (WHERE sys_lock_status IS NOT NULL) as with_lock FROM room_status.room_status_moment");
console.log(JSON.stringify(s.rows[0]));
const r = await pool.query("SELECT hotel_id, room_id, device_id, sys_lock_status, dev_loops FROM room_status.room_status_moment WHERE dev_loops IS NOT NULL AND dev_loops != '{}'::jsonb ORDER BY ts_ms DESC LIMIT 3");
console.log('Samples:', r.rows.length);
for (const row of r.rows) console.log(JSON.stringify(row));
await pool.end();

bls-rcu-action-backend/dev.log Normal file

File diff suppressed because it is too large


@@ -1,4 +1,5 @@
-- Database Initialization Script for BLS RCU Action Server
-- Description: creates the rcu_action schema and the rcu_action_events partitioned table for storing RCU communication logs
CREATE SCHEMA IF NOT EXISTS rcu_action;
@@ -47,3 +48,31 @@ CREATE INDEX IF NOT EXISTS idx_rcu_action_action_type ON rcu_action.rcu_action_e
-- Composite Index for typical query pattern (Hotel + Room + Time)
CREATE INDEX IF NOT EXISTS idx_rcu_action_query_main ON rcu_action.rcu_action_events (hotel_id, room_id, ts_ms DESC);
-- Column Comments
COMMENT ON TABLE rcu_action.rcu_action_events IS 'RCU communication log table - stores RCU device report / command / ACK events consumed from Kafka';
COMMENT ON COLUMN rcu_action.rcu_action_events.guid IS 'Primary key: 32-char UUID without hyphens';
COMMENT ON COLUMN rcu_action.rcu_action_events.ts_ms IS 'Log timestamp in milliseconds; also used as the partition key';
COMMENT ON COLUMN rcu_action.rcu_action_events.write_ts_ms IS 'Insert timestamp in milliseconds, generated by the backend service at write time';
COMMENT ON COLUMN rcu_action.rcu_action_events.hotel_id IS 'Hotel ID';
COMMENT ON COLUMN rcu_action.rcu_action_events.room_id IS 'Room ID';
COMMENT ON COLUMN rcu_action.rcu_action_events.device_id IS 'RCU device ID (mainboard serial number)';
COMMENT ON COLUMN rcu_action.rcu_action_events.direction IS 'Data direction: 上报 (device report) / 下发 (downstream command)';
COMMENT ON COLUMN rcu_action.rcu_action_events.cmd_word IS 'Command word, e.g. 0x36 (status report), 0x0F (control command / ACK)';
COMMENT ON COLUMN rcu_action.rcu_action_events.frame_id IS 'Communication frame number, used to correlate commands and status within one exchange';
COMMENT ON COLUMN rcu_action.rcu_action_events.udp_raw IS 'Raw UDP message (base64 encoded)';
COMMENT ON COLUMN rcu_action.rcu_action_events.action_type IS 'Record action type: 用户操作 (user action) / 设备回路状态 (device loop status) / 下发控制 (control command) / 0FACK / 无效 (invalid)';
COMMENT ON COLUMN rcu_action.rcu_action_events.sys_lock_status IS 'System lock status: 0 = unlocked, 1 = locked (0x36 reports only)';
COMMENT ON COLUMN rcu_action.rcu_action_events.report_count IS 'Number of devices in this report (length of device_list)';
COMMENT ON COLUMN rcu_action.rcu_action_events.dev_type IS 'Device type code, split out of device_list / fault_list / control_list';
COMMENT ON COLUMN rcu_action.rcu_action_events.dev_addr IS 'Device address';
COMMENT ON COLUMN rcu_action.rcu_action_events.dev_loop IS 'Device loop number';
COMMENT ON COLUMN rcu_action.rcu_action_events.dev_data IS 'Device status value (0x36 status reports only)';
COMMENT ON COLUMN rcu_action.rcu_action_events.fault_count IS 'Number of faulty devices in this report (length of fault_list)';
COMMENT ON COLUMN rcu_action.rcu_action_events.error_type IS 'Fault type: 0x01 = online/offline, 0x02 = battery, 0x03 = current, etc.';
COMMENT ON COLUMN rcu_action.rcu_action_events.error_data IS 'Fault payload (meaning depends on error_type)';
COMMENT ON COLUMN rcu_action.rcu_action_events.type_l IS 'Execution mode (0x0F control commands only)';
COMMENT ON COLUMN rcu_action.rcu_action_events.type_h IS 'Execution content (0x0F control commands only)';
COMMENT ON COLUMN rcu_action.rcu_action_events.details IS 'Business detail JSONB: stores the full device_list / fault_list / control_list';
COMMENT ON COLUMN rcu_action.rcu_action_events.extra IS 'Extension JSONB: stores additional fields passed in by upstream';
COMMENT ON COLUMN rcu_action.rcu_action_events.loop_name IS 'Loop name, resolved via device_id → room_type_id → loop_address lookup';


@@ -53,5 +53,16 @@ export const config = {
projectName: process.env.REDIS_PROJECT_NAME || 'bls-rcu-action',
apiBaseUrl: process.env.REDIS_API_BASE_URL || `http://localhost:${parseNumber(process.env.PORT, 3000)}`
},
roomStatusDb: {
host: process.env.ROOM_STATUS_DB_HOST || process.env.DB_HOST || process.env.POSTGRES_HOST || 'localhost',
port: parseNumber(process.env.ROOM_STATUS_DB_PORT || process.env.DB_PORT || process.env.POSTGRES_PORT, 5432),
user: process.env.ROOM_STATUS_DB_USER || process.env.DB_USER || process.env.POSTGRES_USER || 'postgres',
password: process.env.ROOM_STATUS_DB_PASSWORD || process.env.DB_PASSWORD || process.env.POSTGRES_PASSWORD || '',
database: process.env.ROOM_STATUS_DB_DATABASE || process.env.DB_DATABASE || process.env.POSTGRES_DATABASE || 'bls_rcu_action',
max: parseNumber(process.env.ROOM_STATUS_DB_MAX_CONNECTIONS, 5),
ssl: process.env.ROOM_STATUS_DB_SSL === 'true' ? { rejectUnauthorized: false } : undefined,
schema: process.env.ROOM_STATUS_DB_SCHEMA || 'room_status',
table: process.env.ROOM_STATUS_DB_TABLE || 'room_status_moment'
},
enableLoopNameAutoGeneration: process.env.ENABLE_LOOP_NAME_AUTO_GENERATION === 'true'
};


@@ -0,0 +1,158 @@
/**
* Room Status Manager
*
* Manages an independent PostgreSQL connection pool for
* the room_status.room_status_moment snapshot table.
* Provides batch upsert with JSONB merge and auto-partition creation.
*/
import pg from 'pg';
import { randomUUID } from 'crypto';
import { logger } from '../utils/logger.js';
const { Pool } = pg;
export class RoomStatusManager {
/**
* @param {Object} dbConfig - roomStatusDb config from config.js
*/
constructor(dbConfig) {
this.pool = new Pool({
host: dbConfig.host,
port: dbConfig.port,
user: dbConfig.user,
password: dbConfig.password,
database: dbConfig.database,
max: dbConfig.max,
ssl: dbConfig.ssl
});
this.schema = dbConfig.schema;
this.table = dbConfig.table;
this.fullTableName = `${this.schema}.${this.table}`;
// Track which partitions we have already ensured
this.knownPartitions = new Set();
}
/**
* Batch upsert status rows into room_status_moment.
* Uses ON CONFLICT for atomic merge.
*
* @param {Array<Object>} rows - Array of merged status objects
* Each: { hotel_id, room_id, device_id, ts_ms, sys_lock_status, dev_loops, faulty_device_count }
*/
async upsertBatch(rows) {
if (!rows || rows.length === 0) return;
// Pre-ensure all needed partitions exist before attempting upsert
const newHotelIds = [...new Set(rows.map(r => r.hotel_id))]
.filter(id => !this.knownPartitions.has(id));
if (newHotelIds.length > 0) {
await this._ensurePartitionsBatch(newHotelIds);
}
await this._doUpsert(rows);
}
/**
* Execute the actual upsert SQL for a batch of rows.
*/
async _doUpsert(rows) {
const values = [];
const placeholders = [];
for (let i = 0; i < rows.length; i++) {
const row = rows[i];
const offset = i * 8; // 8 parameters per row
values.push(
row.guid || randomUUID(), // $1
row.ts_ms, // $2
row.hotel_id, // $3
row.room_id, // $4
row.device_id, // $5
row.sys_lock_status, // $6
row.dev_loops ? JSON.stringify(row.dev_loops) : null, // $7
row.faulty_device_count ? JSON.stringify(row.faulty_device_count) : null // $8
);
const p = (n) => `$${offset + n}`;
placeholders.push(`(${p(1)}::uuid, ${p(2)}, ${p(3)}, ${p(4)}, ${p(5)}, ${p(6)}, ${p(7)}::jsonb, ${p(8)}::jsonb)`);
}
const sql = `
INSERT INTO ${this.fullTableName} (
guid, ts_ms, hotel_id, room_id, device_id,
sys_lock_status, dev_loops, faulty_device_count
) VALUES ${placeholders.join(', ')}
ON CONFLICT (hotel_id, room_id, device_id)
DO UPDATE SET
ts_ms = GREATEST(${this.fullTableName}.ts_ms, EXCLUDED.ts_ms),
sys_lock_status = COALESCE(EXCLUDED.sys_lock_status, ${this.fullTableName}.sys_lock_status),
dev_loops = CASE
WHEN EXCLUDED.dev_loops IS NULL THEN ${this.fullTableName}.dev_loops
ELSE COALESCE(${this.fullTableName}.dev_loops, '{}'::jsonb) || EXCLUDED.dev_loops
END,
faulty_device_count = COALESCE(EXCLUDED.faulty_device_count, ${this.fullTableName}.faulty_device_count)
`;
await this.pool.query(sql, values);
}
/**
* Check if an error is a missing partition error.
*/
_isPartitionMissingError(error) {
const msg = error?.message || '';
return msg.includes('no partition') || msg.includes('routing') ||
(error?.code === '23514' && msg.includes('partition'));
}
/**
* Batch-create LIST partitions for multiple hotel_ids in a single connection.
* Uses CREATE TABLE IF NOT EXISTS (idempotent) — no check query needed.
*/
async _ensurePartitionsBatch(hotelIds) {
const client = await this.pool.connect();
try {
for (const hotelId of hotelIds) {
const partitionName = `${this.schema}.${this.table}_h${hotelId}`;
try {
await client.query(
`CREATE TABLE IF NOT EXISTS ${partitionName} PARTITION OF ${this.fullTableName} FOR VALUES IN (${hotelId})`
);
this.knownPartitions.add(hotelId);
} catch (err) {
// Partition may already exist (race condition) — safe to ignore
if (!err.message?.includes('already exists')) {
logger.error('Error creating partition', { error: err?.message, hotelId });
}
this.knownPartitions.add(hotelId);
}
}
if (hotelIds.length > 0) {
logger.info(`Ensured ${hotelIds.length} room_status partitions`);
}
} finally {
client.release();
}
}
/**
* Ensure a LIST partition exists for the given hotel_id (single).
*/
async ensurePartition(hotelId) {
if (this.knownPartitions.has(hotelId)) return;
await this._ensurePartitionsBatch([hotelId]);
}
async testConnection() {
try {
await this.pool.query('SELECT 1');
return true;
} catch {
return false;
}
}
async close() {
await this.pool.end();
}
}


@@ -0,0 +1,133 @@
/**
* Status Batch Processor
*
* Collects status updates for room_status_moment table,
* merges updates for the same device within a batch window,
* and flushes them via RoomStatusManager.
*
* Errors during flush are logged but never thrown, to protect the main pipeline.
*/
import { logger } from '../utils/logger.js';
export class StatusBatchProcessor {
/**
* @param {import('./roomStatusManager.js').RoomStatusManager} roomStatusManager
* @param {Object} options
* @param {number} [options.flushInterval=500] - Flush interval in ms
* @param {number} [options.maxBufferSize=200] - Max items before forced flush
* @param {import('../redis/redisIntegration.js').RedisIntegration} [options.redisIntegration] - For error reporting
*/
constructor(roomStatusManager, options = {}) {
this.roomStatusManager = roomStatusManager;
this.flushInterval = options.flushInterval || 500;
this.maxBufferSize = options.maxBufferSize || 200;
this.redisIntegration = options.redisIntegration || null;
/** @type {Map<string, Object>} compositeKey -> mergedState */
this.buffer = new Map();
this.timer = null;
this.isFlushing = false;
}
/**
* Build composite key for deduplication.
*/
_key(update) {
return `${update.hotel_id}:${update.room_id}:${update.device_id}`;
}
/**
* Add a status update to the buffer, merging with any existing entry for the same device.
* @param {Object} update - Output from extractStatusUpdate()
*/
add(update) {
if (!update) return;
const key = this._key(update);
const existing = this.buffer.get(key);
if (existing) {
// Merge: take latest ts_ms
existing.ts_ms = Math.max(existing.ts_ms, update.ts_ms);
// sys_lock_status: prefer newer non-null value
if (update.sys_lock_status != null) {
existing.sys_lock_status = update.sys_lock_status;
}
// dev_loops: merge keys (new overwrites old for same key)
if (update.dev_loops) {
existing.dev_loops = existing.dev_loops
? { ...existing.dev_loops, ...update.dev_loops }
: update.dev_loops;
}
// faulty_device_count: full replacement (newer wins)
if (update.faulty_device_count) {
existing.faulty_device_count = update.faulty_device_count;
}
} else {
// Clone to avoid mutation of caller's object
this.buffer.set(key, { ...update });
}
// Check if we should flush
if (this.buffer.size >= this.maxBufferSize && !this.isFlushing) {
this.flush();
} else if (!this.timer && !this.isFlushing) {
this.timer = setTimeout(() => this.flush(), this.flushInterval);
}
}
/**
* Flush all buffered status updates to the database.
* Errors are caught and logged, never thrown.
*/
async flush() {
if (this.timer) {
clearTimeout(this.timer);
this.timer = null;
}
if (this.buffer.size === 0) return;
if (this.isFlushing) return;
this.isFlushing = true;
// Snapshot and clear the buffer atomically
const rows = [...this.buffer.values()];
this.buffer.clear();
try {
logger.info('StatusBatchProcessor flushing rows', { count: rows.length, sampleRowKeys: rows.map(r => r.device_id).slice(0, 5) });
await this.roomStatusManager.upsertBatch(rows);
logger.info('StatusBatchProcessor flushed successfully', { count: rows.length });
} catch (error) {
logger.error('StatusBatchProcessor flush failed', {
error: error?.message,
stack: error?.stack,
count: rows.length
});
// Report to Redis console if available
if (this.redisIntegration) {
try {
await this.redisIntegration.error('StatusBatchProcessor flush failed', {
module: 'room_status',
count: rows.length,
error: error?.message
});
} catch {
// Silently ignore Redis reporting errors
}
}
// IMPORTANT: Do NOT re-throw. This protects the main Kafka pipeline.
} finally {
this.isFlushing = false;
// Catch up if the buffer refilled during the flush
if (this.buffer.size >= this.maxBufferSize) {
this.flush();
} else if (this.buffer.size > 0 && !this.timer) {
this.timer = setTimeout(() => this.flush(), this.flushInterval);
}
}
}
}


@@ -6,17 +6,20 @@ import partitionManager from './db/partitionManager.js';
import projectMetadata from './cache/projectMetadata.js';
import { createKafkaConsumers } from './kafka/consumer.js';
import { processKafkaMessage } from './processor/index.js';
import { extractStatusUpdate } from './processor/statusExtractor.js';
import { createRedisClient } from './redis/redisClient.js';
import { RedisIntegration } from './redis/redisIntegration.js';
import { buildErrorQueueKey, enqueueError, startErrorRetryWorker } from './redis/errorQueue.js';
import { MetricCollector } from './utils/metricCollector.js';
import { RoomStatusManager } from './db/roomStatusManager.js';
import { StatusBatchProcessor } from './db/statusBatchProcessor.js';
import { logger } from './utils/logger.js';
import { BatchProcessor } from './db/batchProcessor.js';
const bootstrap = async () => {
// 0. Initialize Database (Create DB, Schema, Table, Partitions)
await dbInitializer.initialize();
// 0.1 Initialize Project Metadata Cache
await projectMetadata.init();
@@ -44,7 +47,7 @@ const bootstrap = async () => {
@@ -57,13 +60,22 @@ const bootstrap = async () => {
);
redisIntegration.startHeartbeat();
// 1.2 Initialize Room Status Manager (independent pool for snapshot table)
const roomStatusManager = new RoomStatusManager(config.roomStatusDb);
const statusBatchProcessor = new StatusBatchProcessor(roomStatusManager, {
flushInterval: 500,
maxBufferSize: 200,
redisIntegration
});
logger.info('Room Status sync pipeline initialized');
// 1.1 Setup Metric Reporting Cron Job (Every minute)
cron.schedule('* * * * *', async () => {
const metrics = metricCollector.getAndReset();
const report = `[Minute Metrics] Pulled: ${metrics.kafka_pulled}, Parse Error: ${metrics.parse_error}, Inserted: ${metrics.db_inserted}, Failed: ${metrics.db_failed}`;
console.log(report);
logger.info(report, metrics);
try {
await redisIntegration.info('Minute Metrics', metrics);
} catch (err) {
@@ -105,9 +117,20 @@ const bootstrap = async () => {
valueLength: typeof messageValue === 'string' ? messageValue.length : null
});
}
const { rows, payload } = await processKafkaMessage({ message });
const inserted = await batchProcessor.add({ rows });
metricCollector.increment('db_inserted');
// Fire-and-forget: extract status and push to StatusBatchProcessor
try {
const statusUpdate = extractStatusUpdate(payload);
if (statusUpdate) {
statusBatchProcessor.add(statusUpdate);
}
} catch (statusErr) {
logger.error('Status extraction failed (non-blocking)', { error: statusErr?.message });
}
logger.info('Kafka message processed', { inserted });
} catch (error) {
if (error.type === 'PARSE_ERROR') {
@@ -201,7 +224,7 @@ const bootstrap = async () => {
// Graceful Shutdown Logic
const shutdown = async (signal) => {
logger.info(`Received ${signal}, shutting down...`);
try {
// 1. Close Kafka Consumer
if (consumers && consumers.length > 0) {
@@ -216,7 +239,16 @@ const bootstrap = async () => {
await redisClient.quit();
logger.info('Redis client closed');
// 4. Flush and close Room Status pipeline
try {
await statusBatchProcessor.flush();
await roomStatusManager.close();
logger.info('Room Status pipeline closed');
} catch (rsErr) {
logger.error('Error closing Room Status pipeline', { error: rsErr?.message });
}
// 5. Close Database Pool
await dbManager.close();
logger.info('Database connection closed');


@@ -123,7 +123,7 @@ const getActionTypeByDevType = (devType) => {
// 1. Env override
const rule = devTypeActionRules.find(r => r.dev_type === devType);
if (rule?.action_type) return rule.action_type;
// 2. Default map
const entry = defaultDevTypeActionMap[devType];
return entry?.action || '设备回路状态';
@@ -354,24 +354,24 @@ export const buildRowsFromPayload = (rawPayload) => {
// 3. 0x0F ACK
else if (messageType === '0FACK') {
const { control_list: controls = [] } = payload;
if (Array.isArray(controls)) {
const details = { control_list: controls };
controls.forEach((control) => {
rows.push({
...commonFields,
guid: createGuid(),
dev_type: control.dev_type ?? null,
dev_addr: control.dev_addr ?? null,
dev_loop: control.dev_loop ?? null,
dev_data: control.dev_data ?? null,
type_h: control.type_h ?? null,
action_type: '设备回路状态',
loop_name: getLoopNameWithFallback(deviceId, control.dev_type, control.dev_addr, control.dev_loop),
details
});
});
}
}
return rows;
@@ -380,9 +380,11 @@ export const buildRowsFromPayload = (rawPayload) => {
export const processKafkaMessage = async ({ message }) => {
let rows;
try {
const rawPayload = parseKafkaPayload(message.value);
// Validate through Zod to get normalized fields (arrays defaulted, types coerced)
const validatedPayload = kafkaPayloadSchema.parse(rawPayload);
rows = buildRowsFromPayload(rawPayload);
return { rows, payload: validatedPayload };
} catch (error) {
error.type = 'PARSE_ERROR';
const rawValue = Buffer.isBuffer(message.value)


@@ -0,0 +1,105 @@
/**
* Status Extractor
*
* Extracts a snapshot-style status update from a validated Kafka payload.
* The output is suitable for upsert into room_status.room_status_moment.
*/
/**
* Zero-pad a number to 3 digits.
* @param {number|string} val
* @returns {string} e.g. 1 -> "001", 23 -> "023"
*/
const pad3 = (val) => String(val).padStart(3, '0');
/**
* Build a 9-character loop address key from device parameters.
* Format: {dev_type(3)}{dev_addr(3)}{dev_loop(3)}
* Example: type=1, addr=23, loop=12 -> "001023012"
*/
const buildLoopKey = (devType, devAddr, devLoop) =>
`${pad3(devType)}${pad3(devAddr)}${pad3(devLoop)}`;
/**
* Extract a status update object from a validated Kafka payload.
*
* @param {Object} payload - The parsed and validated Kafka payload
* @returns {Object|null} Status update object or null if nothing to update
*/
export const extractStatusUpdate = (payload) => {
if (!payload) return null;
const {
hotel_id,
room_id,
device_id,
ts_ms,
sys_lock_status,
device_list = [],
fault_list = [],
control_list = [],
direction,
cmd_word
} = payload;
// Must have identity fields
if (hotel_id == null || !room_id || !device_id || !ts_ms) {
return null;
}
// Build dev_loops from device_list (0x36) or control_list (0x0F)
let devLoops = null;
if (device_list.length > 0) {
devLoops = {};
for (const dev of device_list) {
if (dev.dev_type != null && dev.dev_addr != null && dev.dev_loop != null) {
const key = buildLoopKey(dev.dev_type, dev.dev_addr, dev.dev_loop);
devLoops[key] = dev.dev_data ?? null;
}
}
if (Object.keys(devLoops).length === 0) devLoops = null;
}
if (control_list.length > 0) {
if (!devLoops) devLoops = {};
for (const ctrl of control_list) {
const devLoop = ctrl.dev_loop ?? ctrl.loop;
if (ctrl.dev_type != null && ctrl.dev_addr != null && devLoop != null) {
const key = buildLoopKey(ctrl.dev_type, ctrl.dev_addr, devLoop);
devLoops[key] = { type_l: ctrl.type_l ?? null, type_h: ctrl.type_h ?? null };
}
}
if (Object.keys(devLoops).length === 0) devLoops = null;
}
// Build faulty_device_count from fault_list (full replacement)
let faultyDeviceCount = null;
if (fault_list.length > 0) {
faultyDeviceCount = fault_list.map(f => ({
dev_type: f.dev_type ?? null,
dev_addr: f.dev_addr ?? null,
dev_loop: f.dev_loop ?? null,
error_type: f.error_type ?? null,
error_data: f.error_data ?? null
}));
}
// If there's absolutely nothing to update, return null
if (devLoops === null && faultyDeviceCount === null && sys_lock_status == null) {
return null;
}
// Clamp hotel_id to INT2 range (-32768 to 32767) to match production schema
const validHotelId = (hotel_id >= -32768 && hotel_id <= 32767) ? hotel_id : 0;
return {
hotel_id: validHotelId,
room_id: String(room_id),
device_id: String(device_id),
ts_ms,
sys_lock_status: sys_lock_status ?? null,
dev_loops: devLoops,
faulty_device_count: faultyDeviceCount
};
};

View File

@@ -22,6 +22,7 @@ const controlItemSchema = z.object({
dev_type: z.number().int().optional(),
dev_addr: z.number().int().optional(),
dev_loop: z.number().int().optional(),
loop: z.number().int().optional(), // Kafka uses 'loop'
type_l: z.number().int().optional(),
type_h: z.number().int().optional()
});


@@ -0,0 +1,134 @@
import { describe, it, expect, vi, beforeEach } from 'vitest';
import { StatusBatchProcessor } from '../src/db/statusBatchProcessor.js';
// Create a mock RoomStatusManager
const createMockManager = () => ({
upsertBatch: vi.fn().mockResolvedValue(undefined),
testConnection: vi.fn().mockResolvedValue(true),
close: vi.fn().mockResolvedValue(undefined)
});
describe('StatusBatchProcessor', () => {
let mockManager;
let processor;
beforeEach(() => {
mockManager = createMockManager();
processor = new StatusBatchProcessor(mockManager, {
flushInterval: 50000, // Long interval so we control flush manually
maxBufferSize: 100
});
});
const makeUpdate = (overrides = {}) => ({
hotel_id: 1001,
room_id: '8001',
device_id: 'dev_001',
ts_ms: 1700000000000,
sys_lock_status: null,
dev_loops: null,
faulty_device_count: null,
...overrides
});
it('should accept and buffer a status update', () => {
processor.add(makeUpdate());
expect(processor.buffer.size).toBe(1);
});
it('should ignore null/undefined updates', () => {
processor.add(null);
processor.add(undefined);
expect(processor.buffer.size).toBe(0);
});
it('should merge dev_loops for same device (different keys accumulate)', async () => {
processor.add(makeUpdate({
dev_loops: { '001010001': 100 }
}));
processor.add(makeUpdate({
dev_loops: { '001011002': 50 }
}));
expect(processor.buffer.size).toBe(1);
await processor.flush();
expect(mockManager.upsertBatch).toHaveBeenCalledTimes(1);
const rows = mockManager.upsertBatch.mock.calls[0][0];
expect(rows).toHaveLength(1);
expect(rows[0].dev_loops).toEqual({
'001010001': 100,
'001011002': 50
});
});
it('should overwrite dev_loops for same key (newer wins)', async () => {
processor.add(makeUpdate({
dev_loops: { '001010001': 100 }
}));
processor.add(makeUpdate({
dev_loops: { '001010001': 200 }
}));
await processor.flush();
const rows = mockManager.upsertBatch.mock.calls[0][0];
expect(rows[0].dev_loops['001010001']).toBe(200);
});
it('should take latest ts_ms across merges', () => {
processor.add(makeUpdate({ ts_ms: 100 }));
processor.add(makeUpdate({ ts_ms: 300 }));
processor.add(makeUpdate({ ts_ms: 200 }));
const entry = [...processor.buffer.values()][0];
expect(entry.ts_ms).toBe(300);
});
it('should replace faulty_device_count with newer value', () => {
processor.add(makeUpdate({
faulty_device_count: [{ dev_type: 1, error_type: 1, error_data: 1 }]
}));
processor.add(makeUpdate({
faulty_device_count: [{ dev_type: 2, error_type: 2, error_data: 0 }]
}));
const entry = [...processor.buffer.values()][0];
expect(entry.faulty_device_count).toEqual([{ dev_type: 2, error_type: 2, error_data: 0 }]);
});
it('should keep different devices separate in the buffer', () => {
processor.add(makeUpdate({ device_id: 'dev_001' }));
processor.add(makeUpdate({ device_id: 'dev_002' }));
expect(processor.buffer.size).toBe(2);
});
it('should clear buffer after flush', async () => {
processor.add(makeUpdate());
expect(processor.buffer.size).toBe(1);
await processor.flush();
expect(processor.buffer.size).toBe(0);
expect(mockManager.upsertBatch).toHaveBeenCalledTimes(1);
});
it('should NOT throw when upsertBatch fails', async () => {
mockManager.upsertBatch.mockRejectedValue(new Error('DB down'));
processor.add(makeUpdate());
// flush should not throw
await expect(processor.flush()).resolves.not.toThrow();
// Buffer should still be cleared even on error
expect(processor.buffer.size).toBe(0);
});
it('should do nothing when flushing empty buffer', async () => {
await processor.flush();
expect(mockManager.upsertBatch).not.toHaveBeenCalled();
});
});


@@ -0,0 +1,128 @@
import { describe, it, expect } from 'vitest';
import { extractStatusUpdate } from '../src/processor/statusExtractor.js';
describe('StatusExtractor', () => {
const base = {
ts_ms: 1700000000000,
hotel_id: 1001,
room_id: '8001',
device_id: 'dev_001',
direction: '上报',
cmd_word: '0x36',
frame_id: 1,
udp_raw: 'test',
sys_lock_status: 0,
device_list: [],
fault_list: [],
control_list: []
};
it('should return null when payload is null/undefined', () => {
expect(extractStatusUpdate(null)).toBeNull();
expect(extractStatusUpdate(undefined)).toBeNull();
});
it('should return null when nothing to update (empty lists, no sys_lock)', () => {
const payload = {
...base,
sys_lock_status: undefined,
device_list: [],
fault_list: [],
control_list: []
};
expect(extractStatusUpdate(payload)).toBeNull();
});
it('should return update when sys_lock_status is present even with empty lists', () => {
const result = extractStatusUpdate({
...base,
sys_lock_status: 1
});
expect(result).not.toBeNull();
expect(result.sys_lock_status).toBe(1);
expect(result.hotel_id).toBe(1001);
expect(result.room_id).toBe('8001');
expect(result.device_id).toBe('dev_001');
});
it('should build dev_loops from device_list with 9-digit padded keys', () => {
const result = extractStatusUpdate({
...base,
device_list: [
{ dev_type: 1, dev_addr: 10, dev_loop: 1, dev_data: 100 },
{ dev_type: 1, dev_addr: 11, dev_loop: 2, dev_data: 0 }
]
});
expect(result).not.toBeNull();
expect(result.dev_loops).toEqual({
'001010001': 100,
'001011002': 0
});
});
it('should build dev_loops from control_list with type_l/type_h values', () => {
const result = extractStatusUpdate({
...base,
control_list: [
{ dev_type: 1, dev_addr: 10, dev_loop: 1, type_l: 0, type_h: 1 }
]
});
expect(result).not.toBeNull();
expect(result.dev_loops).toEqual({
'001010001': { type_l: 0, type_h: 1 }
});
});
it('should merge device_list and control_list into dev_loops', () => {
const result = extractStatusUpdate({
...base,
device_list: [
{ dev_type: 1, dev_addr: 10, dev_loop: 1, dev_data: 100 }
],
control_list: [
{ dev_type: 2, dev_addr: 5, dev_loop: 3, type_l: 1, type_h: 2 }
]
});
expect(result.dev_loops).toEqual({
'001010001': 100,
'002005003': { type_l: 1, type_h: 2 }
});
});
it('should build faulty_device_count from fault_list', () => {
const result = extractStatusUpdate({
...base,
fault_list: [
{ dev_type: 1, dev_addr: 10, dev_loop: 1, error_type: 2, error_data: 5 }
]
});
expect(result).not.toBeNull();
expect(result.faulty_device_count).toEqual([
{ dev_type: 1, dev_addr: 10, dev_loop: 1, error_type: 2, error_data: 5 }
]);
});
it('should return null when identity fields are missing', () => {
expect(extractStatusUpdate({ ...base, hotel_id: null })).toBeNull();
expect(extractStatusUpdate({ ...base, room_id: '' })).toBeNull();
expect(extractStatusUpdate({ ...base, device_id: '' })).toBeNull();
expect(extractStatusUpdate({ ...base, ts_ms: null })).toBeNull();
});
it('should handle large dev_type/addr/loop values with proper zero-padding', () => {
const result = extractStatusUpdate({
...base,
device_list: [
{ dev_type: 241, dev_addr: 255, dev_loop: 999, dev_data: 42 }
]
});
expect(result.dev_loops).toEqual({
'241255999': 42
});
});
});


@@ -0,0 +1,76 @@
# Room Status Snapshot Table Sync Design
## 1. Background
We need to distill the Kafka data received by the RCU Action Server (mainly `0x36` status reports and `0x0F` control commands) and sync it in real time into the snapshot table `room_status.room_status_moment`.
This is a live device-state table holding the latest state of each room/device, and it may be maintained by several microservices. This service is responsible for updating the `dev_loops` and `faulty_device_count` JSONB columns from the variable-length generic device callback frames.
Note: these updates are non-blocking and asynchronous, so that the write throughput of the main log table (`rcu_action_events`) is unaffected while external readers of the status table still see fresh data.
## 2. Table Structure and Uniqueness
**Target table**: `room_status.room_status_moment`
**Logical unique key**: `(hotel_id, room_id, device_id)`
**Database design constraints**:
- To support atomic concurrent merge (upsert) via PostgreSQL `ON CONFLICT`, the table must carry a unique constraint/index on `(hotel_id, room_id, device_id)`.
- The snapshot table may live in a separate database for load isolation, so this service needs its own dedicated connection pool for it.
## 3. Field Mapping (this project)
Rather than hard-coding one column per device loop, the mapping uses JSONB structures.
| Source field (Kafka: `0x36`/`0x0F`) | Target column | Type | Update logic |
| :--- | :--- | :--- | :--- |
| `hotel_id`, `room_id`, `device_id` | `hotel_id`, `room_id`, `device_id` | unchanged | primary/query key |
| `ts_ms` | `ts_ms` | BIGINT | snapshot update time; keep the latest |
| `sys_lock_status` | `sys_lock_status` | SMALLINT | overwrite only when the incoming Kafka value is non-null |
| `device_list` (0x36) or `control_list` (0x0F) | `dev_loops` | JSONB | **Merge**: <br/>zero-pad each element's `dev_type`, `dev_addr`, `dev_loop` to 3 digits and concatenate them into a 9-character key (e.g. `001023012`); the value is the corresponding status data.<br/>Merged into the existing JSONB with the `\|\|` operator (incremental overwrite). |
| `fault_list` (0x36) | `faulty_device_count` | JSONB | **Replace**: <br/>a `0x36` report carries the full fault list at that moment, so it is stored as a JSON array that overwrites the column. |
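The 9-character key construction in the `dev_loops` mapping can be sketched as follows (`pad3` and `buildLoopKey` match the helpers in `statusExtractor.js` in this commit):

```javascript
// Zero-pad a numeric field to 3 digits, e.g. 1 -> "001", 23 -> "023"
const pad3 = (val) => String(val).padStart(3, '0');

// Concatenate dev_type/dev_addr/dev_loop into the 9-character dev_loops key
const buildLoopKey = (devType, devAddr, devLoop) =>
  `${pad3(devType)}${pad3(devAddr)}${pad3(devLoop)}`;

// A 0x36 device_list entry { dev_type: 1, dev_addr: 23, dev_loop: 12, dev_data: 100 }
// thus becomes the dev_loops entry { "001023012": 100 }
console.log(buildLoopKey(1, 23, 12)); // "001023012"
```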
## 4. Write Strategy and Performance
### 4.1 In-memory Merge and Deduplication
Bursts of state changes for the same device arriving from Kafka would otherwise put heavy concurrent write pressure on the database. We therefore combine **batch aggregation** with **partial JSON merge**:
- **`StatusBatchProcessor`** collects all status updates produced within a window (e.g. `500ms`).
- A `Map<hotel_id+room_id+device_id, MergedStatusData>` holds the deduplicated state.
- **Deep merge**: for `dev_loops`, later updates for the same device are merged key-by-key over earlier ones in memory, so only one final-state row per device reaches the database.
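The per-device merge rule can be sketched as a pure function (a simplified sketch of what `StatusBatchProcessor.add()` does in place; `mergeStatus` is an illustrative name):

```javascript
// Merge two status updates for the same (hotel, room, device) key:
// latest ts_ms wins, non-null sys_lock_status overwrites, dev_loops merges
// key-by-key (newer value wins per key), faulty_device_count is replaced.
const mergeStatus = (existing, update) => ({
  ...existing,
  ts_ms: Math.max(existing.ts_ms, update.ts_ms),
  sys_lock_status: update.sys_lock_status ?? existing.sys_lock_status,
  dev_loops: update.dev_loops
    ? { ...(existing.dev_loops ?? {}), ...update.dev_loops }
    : existing.dev_loops,
  faulty_device_count: update.faulty_device_count ?? existing.faulty_device_count
});

const a = { ts_ms: 100, sys_lock_status: 0, dev_loops: { '001010001': 100 }, faulty_device_count: null };
const b = { ts_ms: 300, sys_lock_status: null, dev_loops: { '001011002': 50 }, faulty_device_count: null };
console.log(mergeStatus(a, b));
// { ts_ms: 300, sys_lock_status: 0, dev_loops: { '001010001': 100, '001011002': 50 }, faulty_device_count: null }
```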
### 4.2 SQL UPSERT Template
PostgreSQL `ON CONFLICT DO UPDATE SET` keeps the columns fresh and merged automatically:
```sql
INSERT INTO room_status.room_status_moment (
guid, ts_ms, hotel_id, room_id, device_id,
sys_lock_status, dev_loops, faulty_device_count
) VALUES (
$1, $2, $3, $4, $5,
$6, $7::jsonb, $8::jsonb
)
ON CONFLICT (hotel_id, room_id, device_id)
DO UPDATE SET
ts_ms = GREATEST(room_status.room_status_moment.ts_ms, EXCLUDED.ts_ms),
sys_lock_status = COALESCE(EXCLUDED.sys_lock_status, room_status.room_status_moment.sys_lock_status),
dev_loops = CASE
WHEN EXCLUDED.dev_loops IS NULL THEN room_status.room_status_moment.dev_loops
ELSE COALESCE(room_status.room_status_moment.dev_loops, '{}'::jsonb) || EXCLUDED.dev_loops
END,
faulty_device_count = COALESCE(EXCLUDED.faulty_device_count, room_status.room_status_moment.faulty_device_count);
```
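For batches, this statement is repeated as multiple `VALUES` groups with sequentially numbered placeholders, eight parameters per row. A simplified sketch of the placeholder generation (the real `_doUpsert` also appends `::uuid`/`::jsonb` casts, omitted here):

```javascript
// Build "($1, ..., $8), ($9, ..., $16), ..." placeholder groups for a
// multi-row INSERT, eight parameters per row.
const buildPlaceholders = (rowCount, cols = 8) =>
  Array.from({ length: rowCount }, (_, i) => {
    const group = Array.from({ length: cols }, (_, j) => `$${i * cols + j + 1}`);
    return `(${group.join(', ')})`;
  }).join(', ');

console.log(buildPlaceholders(2));
// ($1, $2, $3, $4, $5, $6, $7, $8), ($9, $10, $11, $12, $13, $14, $15, $16)
```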
### 4.3 Implementation Plan
To meet the decoupling requirements above, the following additions and refactors are planned:
1. **Config layer (`src/config/config.js`)**
   - Add dedicated environment settings for `roomStatusDb` (e.g. `ROOM_STATUS_DB_HOST`), so the snapshot database gets its own connection pool instead of competing with the main logging pool.
2. **Database manager (`src/db/roomStatusManager.js`)**
   - A new standalone manager and pool singleton dedicated to batch upserts into the status table and their error handling, isolated at the pool level from the existing append-only log path (`databaseManager.js`).
3. **Batch writer (`src/db/statusBatchProcessor.js`)**
   - A batch aggregation queue dedicated to the snapshot table.
   - Buffers incoming updates safely and, before each timed flush, runs the in-memory `dev_loops` merge per `(hotel, room, device)` key described above.
4. **Status extractor (`src/processor/statusExtractor.js`)**
   - A translation step on the business side: it consumes the validated `KafkaPayload` model and flattens the nested `device_list` and `fault_list` into a standalone snapshot object with `dev_loops`/`faulty_device_count` in the merge-ready shape.
5. **Main pipeline integration (`src/processor/index.js` and `src/index.js`)**
   - The existing `processKafkaMessage` → `BatchProcessor` (`rcu_action_events`) push path is unchanged.
   - After the main pipeline has processed a message, a valid status object is extracted and pushed to the new `StatusBatchProcessor.add(...)` in a parallel, non-blocking way, so Kafka consumption keeps flowing even when the status database is unstable.
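The non-blocking push in item 5 amounts to wrapping the status path in its own try/catch so a failure never propagates to the Kafka handler. A sketch (the `pushStatusUpdate` wrapper is illustrative; in the commit the equivalent code lives inline in `index.js`):

```javascript
// Non-blocking side-channel: any failure in the status path is logged and
// swallowed, so it can never break the main Kafka message handler.
const pushStatusUpdate = (extract, processor, payload, logger = console) => {
  try {
    const update = extract(payload); // may return null when there is nothing to sync
    if (update) processor.add(update);
    return true;
  } catch (err) {
    logger.error('Status extraction failed (non-blocking)', { error: err?.message });
    return false;
  }
};

// Even a throwing extractor leaves the caller unharmed:
const ok = pushStatusUpdate(() => { throw new Error('bad frame'); }, { add() {} }, {});
console.log(ok); // false
```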


@@ -0,0 +1,13 @@
# 移除 online_status 字段处理
## 背景
`room_status_moment` 表中的 `online_status` 字段不由本项目RCU Action Server管理。
本项目不应从 `fault_list` 中推导 `online_status`,该字段由其他服务负责写入。
## 变更范围
- `src/processor/statusExtractor.js` — 移除 `online_status` 推导逻辑
- `src/db/roomStatusManager.js` — Upsert SQL 移除 `online_status`
- `src/db/statusBatchProcessor.js` — 合并逻辑移除 `online_status`
- `tests/status_extractor.test.js` — 删除 `online_status` 相关测试用例
- `tests/status_batch_processor.test.js` — 删除 `online_status` 相关测试用例
- `docs/plan-room-status-sync.md` — 移除字段映射行和 SQL 中的 `online_status`


@@ -0,0 +1,34 @@
# Room Status Moment 实时快照同步
## 背景
将 Kafka 消费的 RCU 设备状态/控制事件(`0x36` 上报、`0x0F` 下发),在写入流水日志表 `rcu_action_events` 的同时,并行提取关键状态信息,以 Upsert 方式同步到独立的快照表 `room_status.room_status_moment`
## 变更范围
### 新增文件
- `src/db/roomStatusManager.js` — 独立连接池 + Upsert SQL 封装
- `src/db/statusBatchProcessor.js` — 快照专用的批量聚合处理器(内存合并去重)
- `src/processor/statusExtractor.js` — 从 Kafka payload 提取 `dev_loops`/`faulty_device_count`
- `tests/status_extractor.test.js` — statusExtractor 单元测试
- `tests/status_batch_processor.test.js` — statusBatchProcessor 单元测试
### 修改文件
- `src/config/config.js` — 新增 `roomStatusDb` 配置段
- `src/index.js` — 挂载 `StatusBatchProcessor` 并在消息处理流程中并行推送状态更新
- `.env.example` — 补充 `ROOM_STATUS_DB_*` 环境变量示例
### 缺陷修复 (Bug Fixes)
- `src/db/statusBatchProcessor.js` — 添加 `isFlushing` 状态锁防止高并发带来的重复刷新及数据库死锁 (`ON CONFLICT` 互锁)。
- `src/db/roomStatusManager.js` — 去除 `WHERE EXCLUDED.ts_ms >= room_status_moment.ts_ms` 条件限制,保证始终强制应用最新的状态覆盖。
- `src/schema/kafkaPayload.js` & `src/processor/statusExtractor.js` — 修正 `control_list` 的提取逻辑,兼容 Kafka 实际传输中的 `loop` 字段。
- `docs/plan-room-status-sync.md` — 明确 `sys_lock_status` 仅在 Kafka 传入的值非空时,才进行覆盖。
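The `isFlushing` lock in the first bug fix is a simple re-entrancy guard. A minimal sketch (the `FlushGuard` class is illustrative; in the commit the flag lives directly on `StatusBatchProcessor`):

```javascript
// Re-entrancy guard: while a flush is in flight, further flush() calls
// return immediately instead of racing a second upsert into ON CONFLICT.
class FlushGuard {
  constructor(doFlush) {
    this.doFlush = doFlush;   // async function performing the actual write
    this.isFlushing = false;
  }
  async flush() {
    if (this.isFlushing) return false; // skipped: another flush is running
    this.isFlushing = true;
    try {
      await this.doFlush();
      return true;
    } finally {
      this.isFlushing = false;
    }
  }
}
```

In the real processor the buffer is snapshotted and cleared before the `await`, so updates arriving mid-flush accumulate in a fresh buffer and are picked up by the next flush.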
## Design Constraints
1. Status-table write failures must NOT block main log-table writes or Kafka offset commits
2. Multiple updates for the same device within a batch must be merged in memory before hitting the database
3. `dev_loops` JSONB uses incremental merge (`||`); `faulty_device_count` uses full replacement
4. The snapshot table uses its own database connection pool, isolated from the log table's pool
## References
- `docs/plan-room-status-sync.md`
- `docs/room_status_moment.sql`