Compare commits

...

6 Commits

Author SHA1 Message Date
21cf140c68 feat: 删除无用的测试报告,添加房间状态快照表的数据库初始化脚本及集成方案 2026-02-10 09:06:37 +08:00
680bf6a957 feat: 增加批量处理和数据库离线恢复机制以提升可靠性
- 新增 BatchProcessor 类实现消息批量插入,提高数据库写入性能
- 在 consumer 中禁用 autoCommit 并实现手动提交,确保数据一致性
- 添加数据库健康检查机制,在数据库离线时暂停消费并自动恢复
- 支持 0x0E 命令字处理,扩展消息类型识别范围
- 增加数据库连接重试逻辑,解决 Windows 环境端口冲突问题
- 更新环境变量配置,优化 Kafka 消费者参数
- 添加相关单元测试验证批量处理和可靠性功能
2026-02-04 20:36:33 +08:00
339db6f95f feat(processor): 添加循环名称自动生成的配置开关
新增环境变量 ENABLE_LOOP_NAME_AUTO_GENERATION 用于控制当缓存未命中时是否自动生成循环名称。当设置为 false 时,系统将不再生成 [类型-地址-回路] 格式的备用名称,而是直接返回 null。更新了配置文件、处理器逻辑并添加了相应的单元测试。
2026-02-03 09:26:01 +08:00
4e0f5213db feat: 添加回路名称字段并实现元数据缓存查询
在 RCU 事件处理中新增回路名称(loop_name)字段,用于标识具体设备回路。
- 在 rcu_action_events 表中添加 loop_name 字段
- 新增项目元数据缓存模块,每日从 temporary_project 表刷新房间与回路信息
- 处理消息时,根据 device_id、dev_addr 等字段查询缓存获取回路名称
- 若缓存未命中,则根据设备类型规则生成兜底名称
- 更新环境变量、文档及测试用例以适配新功能
2026-02-02 19:43:49 +08:00
0e6c5c3cc3 feat: 增强Kafka消费者配置并完善action_type分类逻辑
- 支持多实例Kafka消费者,增加并发处理能力
- 新增Kafka配置参数:最大飞行中消息数、获取字节数、等待时间等
- 完善action_type分类逻辑,根据dev_type映射为"用户操作"或"设备回路状态"
- 临时支持hex格式udp_raw自动转换为base64存储
- 增加extra字段支持上游扩展数据传递
- 优化数据库初始化脚本查找路径
- 更新PM2配置,修改应用名称和端口
- 清理开发环境日志文件,将dist目录加入.gitignore
- 更新相关文档说明
2026-01-30 20:09:46 +08:00
e0c3728b42 chore: 移除不再需要的 .gitkeep 文件
该文件用于确保空目录被 Git 追踪。现在 `dist` 目录已包含实际构建产物,不再需要占位文件。
2026-01-30 20:03:57 +08:00
40 changed files with 1880 additions and 146 deletions

1
.gitignore vendored
View File

@@ -1,3 +1,4 @@
/.github /.github
bls-rcu-action-backend/node_modules bls-rcu-action-backend/node_modules
bls-rcu-action-backend/skill bls-rcu-action-backend/skill
/bls-rcu-action-backend/dist

File diff suppressed because one or more lines are too long

View File

@@ -6,7 +6,9 @@ NODE_ENV=development
KAFKA_BROKERS=localhost:9092 KAFKA_BROKERS=localhost:9092
KAFKA_TOPIC=my-topic-name KAFKA_TOPIC=my-topic-name
KAFKA_GROUP_ID=my-group-id KAFKA_GROUP_ID=my-group-id
KAFKA_CLIENT_ID=my-client-id KAFKA_CLIENT_ID=bls-rcu-action-client
KAFKA_AUTO_COMMIT=false
KAFKA_AUTO_COMMIT_INTERVAL_MS=5000
KAFKA_CONSUMER_INSTANCES=1 KAFKA_CONSUMER_INSTANCES=1
# KAFKA_SASL_USERNAME= # KAFKA_SASL_USERNAME=
# KAFKA_SASL_PASSWORD= # KAFKA_SASL_PASSWORD=
@@ -27,3 +29,5 @@ REDIS_PASSWORD=
REDIS_DB=0 REDIS_DB=0
REDIS_PROJECT_NAME=my-project REDIS_PROJECT_NAME=my-project
REDIS_API_BASE_URL=http://localhost:3000 REDIS_API_BASE_URL=http://localhost:3000
ENABLE_LOOP_NAME_AUTO_GENERATION=true

View File

@@ -1 +0,0 @@

View File

@@ -1,24 +1,22 @@
module.exports = { module.exports = {
apps: [{ apps: [{
name: 'bls-rcu-action-backend', name: 'bls-rcu-action',
script: 'dist/index.js', script: 'dist/index.js',
instances: 1, instances: 1,
exec_mode: 'fork', exec_mode: 'fork',
autorestart: true, autorestart: true,
watch: false, watch: false,
max_memory_restart: '1G', max_memory_restart: '1G',
env_file: '.env',
env: { env: {
NODE_ENV: 'production', NODE_ENV: 'production',
PORT: 3000 PORT: 3001
},
env_development: {
NODE_ENV: 'development',
PORT: 3000
}, },
error_file: './logs/error.log', error_file: './logs/error.log',
out_file: './logs/out.log', out_file: './logs/out.log',
log_date_format: 'YYYY-MM-DD HH:mm:ss Z', log_date_format: 'YYYY-MM-DD HH:mm:ss Z',
merge_logs: true, merge_logs: true,
kill_timeout: 5000,
time: true time: true
}] }]
}; };

View File

@@ -1 +0,0 @@
{"level":"error","message":"Kafka message handling failed","timestamp":1769689985427,"context":{"error":"[\n {\n \"expected\": \"number\",\n \"code\": \"invalid_type\",\n \"path\": [\n \"hotel_id\"\n ],\n \"message\": \"Invalid input: expected number, received string\"\n },\n {\n \"expected\": \"array\",\n \"code\": \"invalid_type\",\n \"path\": [\n \"control_list\"\n ],\n \"message\": \"Invalid input: expected array, received null\"\n }\n]"}}

View File

@@ -1 +0,0 @@
{"level":"error","message":"Kafka message handling failed","timestamp":1769689777074,"context":{"error":"[\n {\n \"expected\": \"number\",\n \"code\": \"invalid_type\",\n \"path\": [\n \"hotel_id\"\n ],\n \"message\": \"Invalid input: expected number, received string\"\n },\n {\n \"expected\": \"array\",\n \"code\": \"invalid_type\",\n \"path\": [\n \"control_list\"\n ],\n \"message\": \"Invalid input: expected array, received null\"\n }\n]"}}

View File

@@ -1 +0,0 @@
{"level":"info","message":"[Minute Metrics] Pulled: 0, Parse Error: 0, Inserted: 0, Failed: 0","timestamp":1769688900027,"context":{"kafka_pulled":0,"parse_error":0,"db_inserted":0,"db_failed":0}}

View File

@@ -1 +0,0 @@
{"level":"info","message":"[Minute Metrics] Pulled: 0, Parse Error: 0, Inserted: 0, Failed: 0","timestamp":1769689140027,"context":{"kafka_pulled":0,"parse_error":0,"db_inserted":0,"db_failed":0}}

View File

@@ -1 +0,0 @@
{"level":"info","message":"[Minute Metrics] Pulled: 0, Parse Error: 0, Inserted: 0, Failed: 0","timestamp":1769689260031,"context":{"kafka_pulled":0,"parse_error":0,"db_inserted":0,"db_failed":0}}

View File

@@ -0,0 +1,113 @@
import fs from 'fs';
import path from 'path';
import { fileURLToPath } from 'url';
const __filename = fileURLToPath(import.meta.url);
const __dirname = path.dirname(__filename);
const readmePath = path.resolve(__dirname, '../../docs/readme.md');
const envPath = path.resolve(__dirname, '../.env');
const processorPath = path.resolve(__dirname, '../src/processor/index.js');
try {
const readmeContent = fs.readFileSync(readmePath, 'utf8');
const lines = readmeContent.split('\n');
const rules = [];
let inTable = false;
for (const line of lines) {
const trimmed = line.trim();
// Detect start of table (approximately)
if (trimmed.includes('|dev_type|名称|描述|Action Type|')) {
inTable = true;
continue;
}
// Skip separator line
if (inTable && trimmed.includes('|---|')) {
continue;
}
// Process table rows
if (inTable && trimmed.startsWith('|') && trimmed.endsWith('|')) {
const parts = trimmed.split('|').map(p => p.trim());
// parts[0] is empty, parts[1] is dev_type, parts[2] is name, parts[3] is description, parts[4] is action_type
if (parts.length >= 5) {
const devTypeStr = parts[1];
const description = parts[3];
const actionType = parts[4];
if (!devTypeStr || isNaN(parseInt(devTypeStr))) {
continue;
}
const devType = parseInt(devTypeStr, 10);
rules.push({
dev_type: devType,
name: description, // Use description as name per user request
action_type: actionType
});
}
} else if (inTable && trimmed === '') {
// Empty line might mean end of table, but let's be loose
} else if (inTable && !trimmed.startsWith('|')) {
// End of table
inTable = false;
}
}
// Sort by dev_type
rules.sort((a, b) => a.dev_type - b.dev_type);
console.log(`Found ${rules.length} rules.`);
// 1. Generate JSON for .env
const envJson = JSON.stringify(rules);
// Read existing .env
let envContent = fs.readFileSync(envPath, 'utf8');
const envKey = 'ACTION_TYPE_DEV_TYPE_RULES';
// Replace or Append
const envLine = `${envKey}='${envJson}'`;
const regex = new RegExp(`^${envKey}=.*`, 'm');
if (regex.test(envContent)) {
envContent = envContent.replace(regex, envLine);
} else {
envContent += `\n${envLine}`;
}
fs.writeFileSync(envPath, envContent, 'utf8');
console.log('Updated .env');
// 2. Generate Object for src/processor/index.js
// We need to construct the object string manually to match the code style
const mapLines = rules.map(r => {
// Escape single quotes in name if present
const safeName = r.name.replace(/'/g, "\\'");
return ` ${r.dev_type}: { name: '${safeName}', action: '${r.action_type}' }`;
});
const mapString = `const defaultDevTypeActionMap = {\n${mapLines.join(',\n')}\n};`;
let processorContent = fs.readFileSync(processorPath, 'utf8');
// Regex to replace the object.
const processorRegex = /const defaultDevTypeActionMap = \{[\s\S]*?\};/m;
if (processorRegex.test(processorContent)) {
processorContent = processorContent.replace(processorRegex, mapString);
fs.writeFileSync(processorPath, processorContent, 'utf8');
console.log('Updated src/processor/index.js');
} else {
console.error('Could not find defaultDevTypeActionMap in src/processor/index.js');
}
} catch (err) {
console.error('Error:', err);
}

View File

@@ -27,12 +27,16 @@ CREATE TABLE IF NOT EXISTS rcu_action.rcu_action_events (
type_h SMALLINT, type_h SMALLINT,
details JSONB, details JSONB,
extra JSONB, extra JSONB,
loop_name VARCHAR(255),
PRIMARY KEY (ts_ms, guid) PRIMARY KEY (ts_ms, guid)
) PARTITION BY RANGE (ts_ms); ) PARTITION BY RANGE (ts_ms);
ALTER TABLE rcu_action.rcu_action_events ALTER TABLE rcu_action.rcu_action_events
ADD COLUMN IF NOT EXISTS device_id VARCHAR(32) NOT NULL DEFAULT ''; ADD COLUMN IF NOT EXISTS device_id VARCHAR(32) NOT NULL DEFAULT '';
ALTER TABLE rcu_action.rcu_action_events
ADD COLUMN IF NOT EXISTS loop_name VARCHAR(255);
-- Indexes for performance -- Indexes for performance
CREATE INDEX IF NOT EXISTS idx_rcu_action_hotel_id ON rcu_action.rcu_action_events (hotel_id); CREATE INDEX IF NOT EXISTS idx_rcu_action_hotel_id ON rcu_action.rcu_action_events (hotel_id);
CREATE INDEX IF NOT EXISTS idx_rcu_action_room_id ON rcu_action.rcu_action_events (room_id); CREATE INDEX IF NOT EXISTS idx_rcu_action_room_id ON rcu_action.rcu_action_events (room_id);

View File

@@ -0,0 +1,95 @@
import cron from 'node-cron';
import dbManager from '../db/databaseManager.js';
import { logger } from '../utils/logger.js';
class ProjectMetadataCache {
constructor() {
this.roomMap = new Map(); // device_id -> room_type_id
this.loopMap = new Map(); // room_type_id:loop_address -> loop_name
}
async init() {
try {
await this.refresh();
} catch (error) {
logger.error('Initial metadata refresh failed', { error: error.message });
}
// Schedule 1:00 AM daily
cron.schedule('0 1 * * *', () => {
this.refresh().catch(err => logger.error('Scheduled metadata refresh failed', { error: err.message }));
});
}
async refresh() {
logger.info('Refreshing project metadata cache...');
const client = await dbManager.pool.connect();
try {
// Load Rooms
// temporary_project.rooms might be partitioned, but querying parent works.
const roomsRes = await client.query('SELECT device_id, room_type_id FROM temporary_project.rooms');
const newRoomMap = new Map();
for (const row of roomsRes.rows) {
if (row.device_id) {
newRoomMap.set(row.device_id, row.room_type_id);
}
}
// Load Loops
const loopsRes = await client.query('SELECT room_type_id, loop_address, loop_name FROM temporary_project.loops');
const newLoopMap = new Map();
for (const row of loopsRes.rows) {
if (row.room_type_id && row.loop_address) {
// loop_address is varchar, we will key it as string
const key = `${row.room_type_id}:${row.loop_address}`;
newLoopMap.set(key, row.loop_name);
}
}
this.roomMap = newRoomMap;
this.loopMap = newLoopMap;
logger.info('Project metadata cache refreshed', {
roomsCount: this.roomMap.size,
loopsCount: this.loopMap.size
});
} catch (error) {
// If schema/tables don't exist, this will throw.
// We log but don't crash the app, as this is an enhancement feature.
logger.error('Failed to refresh project metadata', { error: error.message });
throw error;
} finally {
client.release();
}
}
/**
* Get loop name for a given device configuration
* @param {string} deviceId - The device ID (from room)
* @param {number|string} devType - The device type
* @param {number|string} devAddr - The device address
* @param {number|string} devLoop - The device loop
* @returns {string|null} - The loop name or null if not found
*/
getLoopName(deviceId, devType, devAddr, devLoop) {
if (!deviceId ||
devType === undefined || devType === null ||
devAddr === undefined || devAddr === null ||
devLoop === undefined || devLoop === null) {
return null;
}
const roomTypeId = this.roomMap.get(deviceId);
if (!roomTypeId) return null;
// Construct loop_address: 3-digit zero-padded concatenation of type, addr, loop
// e.g. type=1, addr=23, loop=12 -> 001023012
const fmt = (val) => String(val).padStart(3, '0');
const loopAddress = `${fmt(devType)}${fmt(devAddr)}${fmt(devLoop)}`;
const key = `${roomTypeId}:${loopAddress}`;
return this.loopMap.get(key) || null;
}
}
export default new ProjectMetadataCache();

View File

@@ -22,6 +22,12 @@ export const config = {
groupId: process.env.KAFKA_GROUP_ID || 'bls-rcu-action-group', groupId: process.env.KAFKA_GROUP_ID || 'bls-rcu-action-group',
clientId: process.env.KAFKA_CLIENT_ID || 'bls-rcu-action-client', clientId: process.env.KAFKA_CLIENT_ID || 'bls-rcu-action-client',
consumerInstances: parseNumber(process.env.KAFKA_CONSUMER_INSTANCES, 1), consumerInstances: parseNumber(process.env.KAFKA_CONSUMER_INSTANCES, 1),
maxInFlight: parseNumber(process.env.KAFKA_MAX_IN_FLIGHT, 500),
fetchMaxBytes: parseNumber(process.env.KAFKA_FETCH_MAX_BYTES, 10 * 1024 * 1024),
fetchMinBytes: parseNumber(process.env.KAFKA_FETCH_MIN_BYTES, 1),
fetchMaxWaitMs: parseNumber(process.env.KAFKA_FETCH_MAX_WAIT_MS, 100),
autoCommitIntervalMs: parseNumber(process.env.KAFKA_AUTO_COMMIT_INTERVAL_MS, 5000),
logMessages: process.env.KAFKA_LOG_MESSAGES === 'true',
sasl: process.env.KAFKA_SASL_USERNAME && process.env.KAFKA_SASL_PASSWORD ? { sasl: process.env.KAFKA_SASL_USERNAME && process.env.KAFKA_SASL_PASSWORD ? {
mechanism: process.env.KAFKA_SASL_MECHANISM || 'plain', mechanism: process.env.KAFKA_SASL_MECHANISM || 'plain',
username: process.env.KAFKA_SASL_USERNAME, username: process.env.KAFKA_SASL_USERNAME,
@@ -46,5 +52,6 @@ export const config = {
db: parseNumber(process.env.REDIS_DB, 0), db: parseNumber(process.env.REDIS_DB, 0),
projectName: process.env.REDIS_PROJECT_NAME || 'bls-rcu-action', projectName: process.env.REDIS_PROJECT_NAME || 'bls-rcu-action',
apiBaseUrl: process.env.REDIS_API_BASE_URL || `http://localhost:${parseNumber(process.env.PORT, 3000)}` apiBaseUrl: process.env.REDIS_API_BASE_URL || `http://localhost:${parseNumber(process.env.PORT, 3000)}`
} },
enableLoopNameAutoGeneration: process.env.ENABLE_LOOP_NAME_AUTO_GENERATION === 'true'
}; };

View File

@@ -0,0 +1,69 @@
export class BatchProcessor {
constructor(dbManager, config, options = {}) {
this.dbManager = dbManager;
this.config = config;
this.batchSize = options.batchSize || 500;
this.flushInterval = options.flushInterval || 1000;
this.buffer = [];
this.timer = null;
}
add(item) {
return new Promise((resolve, reject) => {
this.buffer.push({ ...item, resolve, reject });
if (this.buffer.length >= this.batchSize) {
this.flush();
} else if (!this.timer) {
this.timer = setTimeout(() => this.flush(), this.flushInterval);
}
});
}
async flush() {
if (this.buffer.length === 0) return;
if (this.timer) {
clearTimeout(this.timer);
this.timer = null;
}
const currentBatch = [...this.buffer];
this.buffer = [];
const allRows = currentBatch.flatMap(item => item.rows);
if (allRows.length === 0) {
// No rows to insert (e.g. empty messages), just resolve
currentBatch.forEach(item => item.resolve(0));
return;
}
try {
await this.dbManager.insertRows({
schema: this.config.db.schema,
table: this.config.db.table,
rows: allRows
});
// Resolve each item with its own row count
currentBatch.forEach(item => item.resolve(item.rows.length));
} catch (error) {
// Enrich error with DB context if possible (using first item as sample)
error.type = 'DB_ERROR';
const sample = allRows[0];
error.dbContext = {
batchSize: currentBatch.length,
totalRows: allRows.length,
sampleRow: sample ? {
guid: sample.guid,
ts_ms: sample.ts_ms,
action_type: sample.action_type,
cmd_word: sample.cmd_word
} : null
};
// Reject all items in the batch
currentBatch.forEach(item => item.reject(error));
}
}
}

View File

@@ -28,7 +28,8 @@ const columns = [
'type_l', 'type_l',
'type_h', 'type_h',
'details', 'details',
'extra' 'extra',
'loop_name'
]; ];
export class DatabaseManager { export class DatabaseManager {
@@ -71,6 +72,15 @@ export class DatabaseManager {
} }
} }
async testConnection() {
try {
await this.pool.query('SELECT 1');
return true;
} catch (error) {
return false;
}
}
async close() { async close() {
await this.pool.end(); await this.pool.end();
} }

View File

@@ -40,9 +40,25 @@ class DatabaseInitializer {
ssl: ssl ? { rejectUnauthorized: false } : false ssl: ssl ? { rejectUnauthorized: false } : false
}); });
const maxRetries = 5;
let retryCount = 0;
while (retryCount < maxRetries) {
try {
await client.connect();
break;
} catch (err) {
if (err.code === 'EADDRINUSE') {
retryCount++;
logger.warn(`Port conflict (EADDRINUSE) connecting to database, retrying (${retryCount}/${maxRetries})...`);
await new Promise(resolve => setTimeout(resolve, 1000));
} else {
throw err;
}
}
}
try { try {
await client.connect();
const checkRes = await client.query( const checkRes = await client.query(
`SELECT 1 FROM pg_database WHERE datname = $1`, `SELECT 1 FROM pg_database WHERE datname = $1`,
[database] [database]
@@ -68,7 +84,15 @@ class DatabaseInitializer {
// dbManager connects to the target database // dbManager connects to the target database
const client = await dbManager.pool.connect(); const client = await dbManager.pool.connect();
try { try {
const sqlPath = path.resolve(__dirname, '../../scripts/init_db.sql'); const sqlPathCandidates = [
path.resolve(process.cwd(), 'scripts/init_db.sql'),
path.resolve(__dirname, '../scripts/init_db.sql'),
path.resolve(__dirname, '../../scripts/init_db.sql')
];
const sqlPath = sqlPathCandidates.find((candidate) => fs.existsSync(candidate));
if (!sqlPath) {
throw new Error(`init_db.sql not found. Candidates: ${sqlPathCandidates.join(' | ')}`);
}
const sql = fs.readFileSync(sqlPath, 'utf8'); const sql = fs.readFileSync(sqlPath, 'utf8');
logger.info('Executing init_db.sql...'); logger.info('Executing init_db.sql...');

View File

@@ -3,17 +3,22 @@ import { config } from './config/config.js';
import dbManager from './db/databaseManager.js'; import dbManager from './db/databaseManager.js';
import dbInitializer from './db/initializer.js'; import dbInitializer from './db/initializer.js';
import partitionManager from './db/partitionManager.js'; import partitionManager from './db/partitionManager.js';
import { createKafkaConsumer } from './kafka/consumer.js'; import projectMetadata from './cache/projectMetadata.js';
import { createKafkaConsumers } from './kafka/consumer.js';
import { processKafkaMessage } from './processor/index.js'; import { processKafkaMessage } from './processor/index.js';
import { createRedisClient } from './redis/redisClient.js'; import { createRedisClient } from './redis/redisClient.js';
import { RedisIntegration } from './redis/redisIntegration.js'; import { RedisIntegration } from './redis/redisIntegration.js';
import { buildErrorQueueKey, enqueueError, startErrorRetryWorker } from './redis/errorQueue.js'; import { buildErrorQueueKey, enqueueError, startErrorRetryWorker } from './redis/errorQueue.js';
import { MetricCollector } from './utils/metricCollector.js'; import { MetricCollector } from './utils/metricCollector.js';
import { logger } from './utils/logger.js'; import { logger } from './utils/logger.js';
import { BatchProcessor } from './db/batchProcessor.js';
const bootstrap = async () => { const bootstrap = async () => {
// 0. Initialize Database (Create DB, Schema, Table, Partitions) // 0. Initialize Database (Create DB, Schema, Table, Partitions)
await dbInitializer.initialize(); await dbInitializer.initialize();
// 0.1 Initialize Project Metadata Cache
await projectMetadata.init();
// Metric Collector // Metric Collector
const metricCollector = new MetricCollector(); const metricCollector = new MetricCollector();
@@ -68,6 +73,10 @@ const bootstrap = async () => {
const errorQueueKey = buildErrorQueueKey(config.redis.projectName); const errorQueueKey = buildErrorQueueKey(config.redis.projectName);
const batchProcessor = new BatchProcessor(dbManager, config, {
batchSize: config.kafka.maxInFlight
});
const handleMessage = async (message) => { const handleMessage = async (message) => {
if (message.topic) { if (message.topic) {
metricCollector.increment('kafka_pulled'); metricCollector.increment('kafka_pulled');
@@ -79,14 +88,25 @@ const bootstrap = async () => {
const messageKey = Buffer.isBuffer(message.key) const messageKey = Buffer.isBuffer(message.key)
? message.key.toString('utf8') ? message.key.toString('utf8')
: message.key; : message.key;
logger.info('Kafka message received', { if (config.kafka.logMessages) {
topic: message.topic, logger.info('Kafka message received', {
partition: message.partition, topic: message.topic,
offset: message.offset, partition: message.partition,
key: messageKey, offset: message.offset,
value: messageValue key: messageKey,
}); value: messageValue
const inserted = await processKafkaMessage({ message, dbManager, config }); });
} else {
logger.info('Kafka message received', {
topic: message.topic,
partition: message.partition,
offset: message.offset,
key: messageKey,
valueLength: typeof messageValue === 'string' ? messageValue.length : null
});
}
const rows = await processKafkaMessage({ message });
const inserted = await batchProcessor.add({ rows });
metricCollector.increment('db_inserted'); metricCollector.increment('db_inserted');
logger.info('Kafka message processed', { inserted }); logger.info('Kafka message processed', { inserted });
} catch (error) { } catch (error) {
@@ -143,10 +163,24 @@ const bootstrap = async () => {
} }
}; };
const consumer = createKafkaConsumer({ const healthCheck = {
shouldPause: async (error) => {
if (error?.type === 'DB_ERROR') {
const isConnected = await dbManager.testConnection();
return !isConnected;
}
return false;
},
check: async () => {
return await dbManager.testConnection();
}
};
const consumers = createKafkaConsumers({
kafkaConfig: config.kafka, kafkaConfig: config.kafka,
onMessage: handleMessage, onMessage: handleMessage,
onError: handleError onError: handleError,
healthCheck
}); });
// Start retry worker (non-blocking) // Start retry worker (non-blocking)
@@ -170,9 +204,9 @@ const bootstrap = async () => {
try { try {
// 1. Close Kafka Consumer // 1. Close Kafka Consumer
if (consumer) { if (consumers && consumers.length > 0) {
await new Promise((resolve) => consumer.close(true, resolve)); await Promise.all(consumers.map(c => new Promise((resolve) => c.close(true, resolve))));
logger.info('Kafka consumer closed'); logger.info('Kafka consumer closed', { count: consumers.length });
} }
// 2. Stop Redis Heartbeat (if method exists, otherwise just close client) // 2. Stop Redis Heartbeat (if method exists, otherwise just close client)

View File

@@ -3,29 +3,94 @@ import { logger } from '../utils/logger.js';
const { ConsumerGroup } = kafka; const { ConsumerGroup } = kafka;
export const createKafkaConsumer = ({ kafkaConfig, onMessage, onError }) => { const createOneConsumer = ({ kafkaConfig, onMessage, onError, instanceIndex, healthCheck }) => {
const kafkaHost = kafkaConfig.brokers.join(','); const kafkaHost = kafkaConfig.brokers.join(',');
const clientId = instanceIndex === 0 ? kafkaConfig.clientId : `${kafkaConfig.clientId}-${instanceIndex}`;
const id = `${clientId}-${process.pid}-${Date.now()}`;
const maxInFlight = Number.isFinite(kafkaConfig.maxInFlight) ? kafkaConfig.maxInFlight : 50;
let inFlight = 0;
let isPausedForHealth = false;
const consumer = new ConsumerGroup( const consumer = new ConsumerGroup(
{ {
kafkaHost, kafkaHost,
groupId: kafkaConfig.groupId, groupId: kafkaConfig.groupId,
clientId: kafkaConfig.clientId, clientId,
id,
fromOffset: 'earliest', fromOffset: 'earliest',
protocol: ['roundrobin'], protocol: ['roundrobin'],
outOfRangeOffset: 'latest', outOfRangeOffset: 'latest',
autoCommit: true, autoCommit: false,
autoCommitIntervalMs: kafkaConfig.autoCommitIntervalMs,
fetchMaxBytes: kafkaConfig.fetchMaxBytes,
fetchMinBytes: kafkaConfig.fetchMinBytes,
fetchMaxWaitMs: kafkaConfig.fetchMaxWaitMs,
sasl: kafkaConfig.sasl sasl: kafkaConfig.sasl
}, },
kafkaConfig.topic kafkaConfig.topic
); );
const tryResume = () => {
if (!isPausedForHealth && inFlight < maxInFlight) {
consumer.resume();
}
};
consumer.on('message', (message) => { consumer.on('message', (message) => {
onMessage(message).catch((error) => { inFlight += 1;
logger.error('Kafka message handling failed', { error: error?.message }); if (inFlight >= maxInFlight) {
if (onError) { consumer.pause();
onError(error, message); }
} return Promise.resolve(onMessage(message))
}); .then(() => {
consumer.commit((err) => {
if (err) {
logger.error('Kafka commit failed', { error: err.message });
}
});
})
.catch(async (error) => {
logger.error('Kafka message handling failed', { error: error?.message });
let shouldCommit = true;
if (!isPausedForHealth && healthCheck && await healthCheck.shouldPause(error)) {
shouldCommit = false;
isPausedForHealth = true;
consumer.pause();
logger.warn('Pausing consumer due to dependency failure. Entering recovery mode...');
const checkInterval = setInterval(async () => {
try {
const isHealthy = await healthCheck.check();
if (isHealthy) {
clearInterval(checkInterval);
isPausedForHealth = false;
consumer.resume();
logger.info('Dependency recovered. Resuming consumer.');
}
} catch (err) {
logger.error('Health check failed', { error: err.message });
}
}, 60000);
}
if (shouldCommit) {
consumer.commit((err) => {
if (err) {
logger.error('Kafka commit failed (error case)', { error: err.message });
}
});
}
if (onError) {
onError(error, message);
}
})
.finally(() => {
inFlight -= 1;
tryResume();
});
}); });
consumer.on('error', (error) => { consumer.on('error', (error) => {
@@ -37,3 +102,14 @@ export const createKafkaConsumer = ({ kafkaConfig, onMessage, onError }) => {
return consumer; return consumer;
}; };
export const createKafkaConsumers = ({ kafkaConfig, onMessage, onError, healthCheck }) => {
const instances = Number.isFinite(kafkaConfig.consumerInstances) ? kafkaConfig.consumerInstances : 1;
const count = Math.max(1, instances);
return Array.from({ length: count }, (_, idx) =>
createOneConsumer({ kafkaConfig, onMessage, onError, instanceIndex: idx, healthCheck })
);
};
export const createKafkaConsumer = ({ kafkaConfig, onMessage, onError, healthCheck }) =>
createKafkaConsumers({ kafkaConfig, onMessage, onError, healthCheck })[0];

View File

@@ -1,5 +1,7 @@
import { createGuid } from '../utils/uuid.js'; import { createGuid } from '../utils/uuid.js';
import { kafkaPayloadSchema } from '../schema/kafkaPayload.js'; import { kafkaPayloadSchema } from '../schema/kafkaPayload.js';
import projectMetadata from '../cache/projectMetadata.js';
import { config } from '../config/config.js';
const normalizeDirection = (value) => { const normalizeDirection = (value) => {
if (!value) return null; if (!value) return null;
@@ -23,15 +25,14 @@ const normalizeCmdWord = (value) => {
} }
return trimmed; return trimmed;
} }
// The Zod schema might have already converted numbers to strings, but let's be safe
if (typeof value === 'number' && Number.isFinite(value)) { if (typeof value === 'number' && Number.isFinite(value)) {
return `0x${value.toString(16).toLowerCase()}`; return `0x${value.toString(16).toLowerCase()}`;
} }
return null; return null;
}; };
const resolveActionType = (direction, cmdWord) => { const resolveMessageType = (direction, cmdWord) => {
if (cmdWord === '0x36') { if (cmdWord === '0x36' || cmdWord === '0x0e') {
return '36上报'; return '36上报';
} }
if (cmdWord === '0x0f' && direction === '下发') { if (cmdWord === '0x0f' && direction === '下发') {
@@ -43,6 +44,106 @@ const resolveActionType = (direction, cmdWord) => {
return null; return null;
}; };
const defaultDevTypeActionMap = {
0: { name: '无效设备(也可以被认为是场景)', action: '无效' },
1: { name: '强电继电器(输出状态)', action: '设备回路状态' },
2: { name: '弱电输入(输入状态)', action: '用户操作' },
3: { name: '弱电输出(输出状态)', action: '设备回路状态' },
4: { name: '服务信息', action: '设备回路状态' },
5: { name: '干节点窗帘', action: '设备回路状态' },
6: { name: '开关', action: '用户操作' },
7: { name: '空调', action: '用户操作' },
8: { name: '红外感应', action: '用户操作' },
9: { name: '空气质量检测设备', action: '设备回路状态' },
10: { name: '插卡取电', action: '用户操作' },
11: { name: '地暖', action: '用户操作' },
12: { name: 'RCU 设备网络 - 没使用', action: '' },
13: { name: '窗帘', action: '设备回路状态' },
14: { name: '继电器', action: '设备回路状态' },
15: { name: '红外发送', action: '设备回路状态' },
16: { name: '调光驱动', action: '设备回路状态' },
17: { name: '可控硅调光(可控硅状态)', action: '设备回路状态' },
18: { name: '灯带(灯带状态) --2025-11-24 取消', action: '无效' },
19: { name: '中控', action: '无效' },
20: { name: '微信锁 (福 瑞狗的蓝牙锁 默认 0 地址)', action: '无效' },
21: { name: '背景音乐(背景音乐状态)', action: '设备回路状态' },
22: { name: '房态下发', action: '无效' },
23: { name: '主机本地 调光', action: '无效' },
24: { name: '485PWM 调光( PWM 调光状态)', action: '无效' },
25: { name: '总线调光( PBLED 调光状态) - 没使用 -', action: '无效' },
26: { name: 'RCU 电源', action: '无效' },
27: { name: 'A9IO 开关', action: '用户操作' },
28: { name: 'A9IO 扩展', action: '设备回路状态' },
29: { name: 'A9IO 电源', action: '设备回路状态' },
30: { name: '无线网关轮询(用于轮询控制轮询设备;给无线网关下发配置和询问网关状态)', action: '无效' },
31: { name: '无线网关主动(用于主动控制主动设备)', action: '无效' },
32: { name: '无线门磁', action: '用户操作' },
33: { name: '空气参数显示设备', action: '设备回路状态' },
34: { name: '无线继电器红外', action: '设备回路状态' },
35: { name: '时间同步', action: '设备回路状态' },
36: { name: '监控控制', action: '无效' },
37: { name: '旋钮开关控制', action: '用户操作' },
38: { name: 'CSIO - 类型', action: '设备回路状态' },
39: { name: '插卡状态虚拟设备', action: '设备回路状态' },
40: { name: '485 新风设备', action: '用户操作' },
41: { name: '485 人脸机', action: '用户操作' },
42: { name: '中控', action: '无效' },
43: { name: '域控', action: '无效' },
44: { name: 'LCD', action: '设备回路状态' },
45: { name: '无卡断电 --2025-11-24 取消', action: '无效' },
46: { name: '无卡取电 2', action: '用户操作' },
47: { name: '虚拟时间设备', action: '设备回路状态' },
48: { name: 'PLC 总控', action: '设备回路状态' },
49: { name: 'PLC 设备 - 恒流调光设备', action: '设备回路状态' },
50: { name: 'PLC 设备 - 恒压调光设备', action: '设备回路状态' },
51: { name: 'PLC 设备 - 继电器设备', action: '设备回路状态' },
52: { name: '色温调节功能', action: '设备回路状态' },
53: { name: '蓝牙音频', action: '设备回路状态' },
54: { name: '碳达人', action: '用户操作' },
55: { name: '场景还原', action: '用户操作' },
56: { name: '全局设置', action: '设备回路状态' },
57: { name: '能耗检测', action: '设备回路状态' },
241: { name: 'CSIO - 类型', action: '设备回路状态' }
};
// Parse env rules if present
let devTypeActionRules = [];
try {
if (process.env.ACTION_TYPE_DEV_TYPE_RULES) {
const parsed = JSON.parse(process.env.ACTION_TYPE_DEV_TYPE_RULES);
if (Array.isArray(parsed)) {
devTypeActionRules = parsed;
}
}
} catch (error) {
// Silent fallback
}
const getActionTypeByDevType = (devType) => {
// 1. Env override
const rule = devTypeActionRules.find(r => r.dev_type === devType);
if (rule?.action_type) return rule.action_type;
// 2. Default map
const entry = defaultDevTypeActionMap[devType];
return entry?.action || '设备回路状态';
};
const getDevTypeName = (devType) => {
// 1. Env override
const rule = devTypeActionRules.find(r => r.dev_type === devType);
if (rule?.name) return rule.name;
// 2. Default map
const entry = defaultDevTypeActionMap[devType];
return entry?.name || 'Unknown';
};
const resolveDevTypeAction = (devType) => {
if (devType === null || devType === undefined) return '设备回路状态';
return getActionTypeByDevType(devType);
};
const parseKafkaPayload = (value) => { const parseKafkaPayload = (value) => {
const raw = Buffer.isBuffer(value) ? value.toString('utf8') : value; const raw = Buffer.isBuffer(value) ? value.toString('utf8') : value;
if (typeof raw !== 'string') { if (typeof raw !== 'string') {
@@ -56,6 +157,27 @@ export const buildRowsFromMessageValue = (value) => {
return buildRowsFromPayload(payload); return buildRowsFromPayload(payload);
}; };
const isHexString = (value) => {
if (typeof value !== 'string') {
return false;
}
const trimmed = value.trim();
if (!trimmed) {
return false;
}
const compact = trimmed.replace(/[\s:]/g, '');
if (compact.length === 0 || compact.length % 2 !== 0) {
return false;
}
return /^[0-9a-fA-F]+$/.test(compact);
};
const hexToBase64 = (hex) => {
const compact = hex.replace(/[\s:]/g, '');
const buffer = Buffer.from(compact, 'hex');
return buffer.toString('base64');
};
export const buildRowsFromPayload = (rawPayload) => { export const buildRowsFromPayload = (rawPayload) => {
// 1. Validate and transform payload using Zod schema // 1. Validate and transform payload using Zod schema
const payload = kafkaPayloadSchema.parse(rawPayload); const payload = kafkaPayloadSchema.parse(rawPayload);
@@ -68,18 +190,23 @@ export const buildRowsFromPayload = (rawPayload) => {
direction, direction,
cmd_word: cmdWord, cmd_word: cmdWord,
frame_id: frameId, frame_id: frameId,
udp_raw: udpRaw, udp_raw: udpRawRaw,
sys_lock_status: sysLockStatus, sys_lock_status: sysLockStatus,
report_count: reportCount, report_count: reportCount,
fault_count: faultCount, fault_count: faultCount,
device_list: deviceList, // Zod provides default [] device_list: deviceList, // Zod provides default []
fault_list: faultList, // Zod provides default [] fault_list: faultList, // Zod provides default []
control_list: controlList // Zod provides default [] control_list: controlList, // Zod provides default []
extra
} = payload; } = payload;
// Temporary migration logic: if udp_raw is hex, convert to base64 before insert.
// This is a transitional mechanism and should be removed once all upstream systems send base64 directly.
const udpRaw = isHexString(udpRawRaw) ? hexToBase64(udpRawRaw) : udpRawRaw;
const normalizedDirection = normalizeDirection(direction); const normalizedDirection = normalizeDirection(direction);
const normalizedCmdWord = normalizeCmdWord(cmdWord); const normalizedCmdWord = normalizeCmdWord(cmdWord);
const actionType = resolveActionType(normalizedDirection, normalizedCmdWord); const messageType = resolveMessageType(normalizedDirection, normalizedCmdWord);
const writeTsMs = Date.now(); const writeTsMs = Date.now();
// Base fields common to all rows (excluding unique ID) // Base fields common to all rows (excluding unique ID)
@@ -88,12 +215,11 @@ export const buildRowsFromPayload = (rawPayload) => {
write_ts_ms: writeTsMs, write_ts_ms: writeTsMs,
hotel_id: hotelId, hotel_id: hotelId,
room_id: roomId, room_id: roomId,
device_id: deviceId, // Pass through normalized/validated device_id device_id: deviceId,
direction: normalizedDirection, direction: normalizedDirection,
cmd_word: normalizedCmdWord, cmd_word: normalizedCmdWord,
frame_id: frameId, frame_id: frameId,
udp_raw: udpRaw, udp_raw: udpRaw,
action_type: actionType,
sys_lock_status: sysLockStatus ?? null, sys_lock_status: sysLockStatus ?? null,
report_count: reportCount ?? null, report_count: reportCount ?? null,
fault_count: faultCount ?? null, fault_count: faultCount ?? null,
@@ -107,13 +233,31 @@ export const buildRowsFromPayload = (rawPayload) => {
type_l: null, type_l: null,
type_h: null, type_h: null,
details: null, details: null,
extra: { raw_hex: udpRaw } extra: extra || {}
};
// Helper to generate loop name if not found in cache
const getLoopNameWithFallback = (deviceId, devType, devAddr, devLoop) => {
// 1. Try cache
const cachedName = projectMetadata.getLoopName(deviceId, devType, devAddr, devLoop);
if (cachedName) return cachedName;
// 2. Check config for auto-generation
if (!config.enableLoopNameAutoGeneration) {
return null;
}
// 3. Fallback: [TypeName-Addr-Loop]
const typeName = getDevTypeName(devType);
if (!typeName) return null; // Should have a name if devType is valid
return `[${devType}${typeName}-${devAddr}-${devLoop}]`;
}; };
const rows = []; const rows = [];
// Logic 1: 0x36 Status/Fault Report // Logic 1: 0x36 Status/Fault Report
if (actionType === '36上报') { if (messageType === '36上报') {
const details = { const details = {
device_list: deviceList, device_list: deviceList,
fault_list: faultList fault_list: faultList
@@ -122,6 +266,7 @@ export const buildRowsFromPayload = (rawPayload) => {
// Process device status list // Process device status list
if (deviceList.length > 0) { if (deviceList.length > 0) {
deviceList.forEach(device => { deviceList.forEach(device => {
const actionType = resolveDevTypeAction(device.dev_type);
rows.push({ rows.push({
...commonFields, ...commonFields,
guid: createGuid(), guid: createGuid(),
@@ -129,6 +274,8 @@ export const buildRowsFromPayload = (rawPayload) => {
dev_addr: device.dev_addr ?? null, dev_addr: device.dev_addr ?? null,
dev_loop: device.dev_loop ?? null, dev_loop: device.dev_loop ?? null,
dev_data: device.dev_data ?? null, dev_data: device.dev_data ?? null,
action_type: actionType,
loop_name: getLoopNameWithFallback(deviceId, device.dev_type, device.dev_addr, device.dev_loop),
details details
}); });
}); });
@@ -137,6 +284,7 @@ export const buildRowsFromPayload = (rawPayload) => {
// Process fault list // Process fault list
if (faultList.length > 0) { if (faultList.length > 0) {
faultList.forEach(fault => { faultList.forEach(fault => {
const actionType = resolveDevTypeAction(fault.dev_type);
rows.push({ rows.push({
...commonFields, ...commonFields,
guid: createGuid(), guid: createGuid(),
@@ -146,6 +294,8 @@ export const buildRowsFromPayload = (rawPayload) => {
dev_loop: fault.dev_loop ?? null, dev_loop: fault.dev_loop ?? null,
error_type: fault.error_type ?? null, error_type: fault.error_type ?? null,
error_data: fault.error_data ?? null, error_data: fault.error_data ?? null,
action_type: actionType,
loop_name: getLoopNameWithFallback(deviceId, fault.dev_type, fault.dev_addr, fault.dev_loop),
details details
}); });
}); });
@@ -156,6 +306,7 @@ export const buildRowsFromPayload = (rawPayload) => {
rows.push({ rows.push({
...commonFields, ...commonFields,
guid: createGuid(), guid: createGuid(),
action_type: '设备回路状态',
details details
}); });
} }
@@ -164,7 +315,7 @@ export const buildRowsFromPayload = (rawPayload) => {
} }
// Logic 2: 0x0F Control Command // Logic 2: 0x0F Control Command
if (actionType === '0F下发') { if (messageType === '0F下发') {
const details = { const details = {
control_list: controlList control_list: controlList
}; };
@@ -179,6 +330,8 @@ export const buildRowsFromPayload = (rawPayload) => {
dev_loop: control.dev_loop ?? null, dev_loop: control.dev_loop ?? null,
type_l: control.type_l ?? null, type_l: control.type_l ?? null,
type_h: control.type_h ?? null, type_h: control.type_h ?? null,
action_type: '下发控制',
loop_name: getLoopNameWithFallback(deviceId, control.dev_type, control.dev_addr, control.dev_loop),
details details
}); });
}); });
@@ -189,6 +342,7 @@ export const buildRowsFromPayload = (rawPayload) => {
rows.push({ rows.push({
...commonFields, ...commonFields,
guid: createGuid(), guid: createGuid(),
action_type: '下发控制',
details details
}); });
} }
@@ -196,20 +350,37 @@ export const buildRowsFromPayload = (rawPayload) => {
return rows; return rows;
} }
// Logic 3: 0x0F ACK or others // 3. 0x0F ACK
// Default behavior: single row else if (messageType === '0FACK') {
return [{ const { control_list: controls = [] } = payload;
...commonFields, if (Array.isArray(controls)) {
guid: createGuid(), const details = { control_list: controls };
details: {} controls.forEach((control) => {
}]; rows.push({
...commonFields,
guid: createGuid(),
dev_type: control.dev_type ?? null,
dev_addr: control.dev_addr ?? null,
dev_loop: control.dev_loop ?? null,
dev_data: control.dev_data ?? null,
type_h: control.type_h ?? null,
action_type: '设备回路状态',
loop_name: getLoopNameWithFallback(deviceId, control.dev_type, control.dev_addr, control.dev_loop),
details
});
});
}
}
return rows;
}; };
export const processKafkaMessage = async ({ message, dbManager, config }) => { export const processKafkaMessage = async ({ message }) => {
let rows; let rows;
try { try {
const payload = parseKafkaPayload(message.value); const payload = parseKafkaPayload(message.value);
rows = buildRowsFromPayload(payload); rows = buildRowsFromPayload(payload);
return rows;
} catch (error) { } catch (error) {
error.type = 'PARSE_ERROR'; error.type = 'PARSE_ERROR';
const rawValue = Buffer.isBuffer(message.value) const rawValue = Buffer.isBuffer(message.value)
@@ -221,27 +392,4 @@ export const processKafkaMessage = async ({ message, dbManager, config }) => {
} }
throw error; throw error;
} }
try {
await dbManager.insertRows({ schema: config.db.schema, table: config.db.table, rows });
} catch (error) {
error.type = 'DB_ERROR';
const sample = rows?.[0];
error.dbContext = {
rowsLength: rows?.length || 0,
sampleRow: sample
? {
guid: sample.guid,
ts_ms: sample.ts_ms,
action_type: sample.action_type,
cmd_word: sample.cmd_word,
direction: sample.direction,
device_id: sample.device_id
}
: null
};
throw error;
}
return rows.length;
}; };

View File

@@ -55,5 +55,10 @@ export const kafkaPayloadSchema = z.object({
// Lists // Lists
device_list: listSchema(deviceItemSchema), device_list: listSchema(deviceItemSchema),
fault_list: listSchema(faultItemSchema), fault_list: listSchema(faultItemSchema),
control_list: listSchema(controlItemSchema) control_list: listSchema(controlItemSchema),
extra: z.preprocess(
(value) => (value === null ? {} : value),
z.any().optional().default({})
)
}); });

View File

@@ -12,6 +12,9 @@ export const logger = {
info(message, context) { info(message, context) {
process.stdout.write(`${format('info', message, context)}\n`); process.stdout.write(`${format('info', message, context)}\n`);
}, },
warn(message, context) {
process.stdout.write(`${format('warn', message, context)}\n`);
},
error(message, context) { error(message, context) {
process.stderr.write(`${format('error', message, context)}\n`); process.stderr.write(`${format('error', message, context)}\n`);
} }

View File

@@ -0,0 +1,97 @@
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
import { BatchProcessor } from '../src/db/batchProcessor.js';
describe('BatchProcessor', () => {
let dbManager;
let config;
let batchProcessor;
beforeEach(() => {
vi.useFakeTimers();
dbManager = {
insertRows: vi.fn().mockResolvedValue(true)
};
config = {
db: { schema: 'test_schema', table: 'test_table' }
};
batchProcessor = new BatchProcessor(dbManager, config, { batchSize: 3, flushInterval: 1000 });
});
afterEach(() => {
vi.useRealTimers();
});
it('should buffer items and not flush until batch size is reached', async () => {
const p1 = batchProcessor.add({ rows: ['r1'] });
const p2 = batchProcessor.add({ rows: ['r2'] });
expect(dbManager.insertRows).not.toHaveBeenCalled();
const p3 = batchProcessor.add({ rows: ['r3'] });
// Wait for microtasks
await Promise.resolve();
expect(dbManager.insertRows).toHaveBeenCalledTimes(1);
expect(dbManager.insertRows).toHaveBeenCalledWith({
schema: 'test_schema',
table: 'test_table',
rows: ['r1', 'r2', 'r3']
});
await expect(p1).resolves.toBe(1);
await expect(p2).resolves.toBe(1);
await expect(p3).resolves.toBe(1);
});
it('should flush when timer expires', async () => {
const p1 = batchProcessor.add({ rows: ['r1'] });
expect(dbManager.insertRows).not.toHaveBeenCalled();
vi.advanceTimersByTime(1000);
// Wait for microtasks
await Promise.resolve();
expect(dbManager.insertRows).toHaveBeenCalledTimes(1);
expect(dbManager.insertRows).toHaveBeenCalledWith({
schema: 'test_schema',
table: 'test_table',
rows: ['r1']
});
await expect(p1).resolves.toBe(1);
});
it('should handle db error and reject all pending promises', async () => {
dbManager.insertRows.mockRejectedValue(new Error('DB Fail'));
const p1 = batchProcessor.add({ rows: ['r1'] });
const p2 = batchProcessor.add({ rows: ['r2'] });
const p3 = batchProcessor.add({ rows: ['r3'] }); // Triggers flush
await expect(p1).rejects.toThrow('DB Fail');
await expect(p2).rejects.toThrow('DB Fail');
await expect(p3).rejects.toThrow('DB Fail');
});
it('should handle mixed batch sizes', async () => {
// 3 items with different row counts
const p1 = batchProcessor.add({ rows: ['r1', 'r2'] });
const p2 = batchProcessor.add({ rows: [] }); // Empty rows
const p3 = batchProcessor.add({ rows: ['r3'] });
await Promise.resolve();
expect(dbManager.insertRows).toHaveBeenCalledWith({
schema: 'test_schema',
table: 'test_table',
rows: ['r1', 'r2', 'r3']
});
await expect(p1).resolves.toBe(2);
await expect(p2).resolves.toBe(0);
await expect(p3).resolves.toBe(1);
});
});

View File

@@ -0,0 +1,124 @@
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
import { createKafkaConsumers } from '../src/kafka/consumer.js';
import kafka from 'kafka-node';
// Mock kafka-node
vi.mock('kafka-node', () => {
return {
ConsumerGroup: vi.fn(),
default: { ConsumerGroup: vi.fn() }
};
});
describe('Consumer Reliability', () => {
let mockConsumer;
let onMessage;
let onError;
let healthCheck;
const kafkaConfig = {
brokers: ['localhost:9092'],
groupId: 'test-group',
clientId: 'test-client',
topic: 'test-topic',
autoCommitIntervalMs: 5000
};
beforeEach(() => {
vi.clearAllMocks();
mockConsumer = {
on: vi.fn(),
commit: vi.fn(),
pause: vi.fn(),
resume: vi.fn(),
close: vi.fn()
};
kafka.ConsumerGroup.mockImplementation(function() {
return mockConsumer;
});
onMessage = vi.fn().mockResolvedValue(true);
onError = vi.fn();
healthCheck = {
shouldPause: vi.fn().mockResolvedValue(false),
check: vi.fn().mockResolvedValue(true)
};
});
it('should initialize with autoCommit: false', () => {
createKafkaConsumers({ kafkaConfig, onMessage, onError });
expect(kafka.ConsumerGroup).toHaveBeenCalledWith(
expect.objectContaining({ autoCommit: false }),
expect.anything()
);
});
it('should commit offset after successful message processing', async () => {
createKafkaConsumers({ kafkaConfig, onMessage, onError });
// Simulate 'message' event
const message = { value: 'test' };
const messageHandler = mockConsumer.on.mock.calls.find(call => call[0] === 'message')[1];
await messageHandler(message);
expect(onMessage).toHaveBeenCalledWith(message);
expect(mockConsumer.commit).toHaveBeenCalled();
});
it('should NOT commit if processing fails and health check says pause', async () => {
onMessage.mockRejectedValue(new Error('Fail'));
healthCheck.shouldPause.mockResolvedValue(true);
createKafkaConsumers({ kafkaConfig, onMessage, onError, healthCheck });
const messageHandler = mockConsumer.on.mock.calls.find(call => call[0] === 'message')[1];
await messageHandler({ value: 'test' });
expect(mockConsumer.commit).not.toHaveBeenCalled();
expect(onError).toHaveBeenCalled();
});
it('should commit if processing fails but health check says continue (Data Error)', async () => {
onMessage.mockRejectedValue(new Error('Data Error'));
healthCheck.shouldPause.mockResolvedValue(false); // Do not pause, it's just bad data
createKafkaConsumers({ kafkaConfig, onMessage, onError, healthCheck });
const messageHandler = mockConsumer.on.mock.calls.find(call => call[0] === 'message')[1];
await messageHandler({ value: 'bad_data' });
expect(mockConsumer.commit).toHaveBeenCalled(); // Should commit to move past bad data
expect(onError).toHaveBeenCalled(); // Should still report error
});
it('should pause and enter recovery mode if healthCheck.shouldPause returns true', async () => {
vi.useFakeTimers();
onMessage.mockRejectedValue(new Error('DB Error'));
healthCheck.shouldPause.mockResolvedValue(true);
healthCheck.check.mockResolvedValueOnce(false).mockResolvedValueOnce(true); // Fail once, then succeed
createKafkaConsumers({ kafkaConfig, onMessage, onError, healthCheck });
const messageHandler = mockConsumer.on.mock.calls.find(call => call[0] === 'message')[1];
// Trigger error
await messageHandler({ value: 'fail' });
expect(mockConsumer.pause).toHaveBeenCalled();
expect(healthCheck.shouldPause).toHaveBeenCalled();
// Fast-forward time for interval check (1st check - fails)
await vi.advanceTimersByTimeAsync(60000);
expect(healthCheck.check).toHaveBeenCalledTimes(1);
expect(mockConsumer.resume).not.toHaveBeenCalled();
// Fast-forward time for interval check (2nd check - succeeds)
await vi.advanceTimersByTimeAsync(60000);
expect(healthCheck.check).toHaveBeenCalledTimes(2);
expect(mockConsumer.resume).toHaveBeenCalled();
vi.useRealTimers();
});
});

View File

@@ -0,0 +1,47 @@
import { describe, it, expect, beforeEach, afterEach } from 'vitest';
import { buildRowsFromPayload } from '../src/processor/index.js';
import { config } from '../src/config/config.js';
describe('Feature Toggle: Loop Name Auto Generation', () => {
const basePayload = {
ts_ms: 1700000000000,
hotel_id: 1001,
room_id: '8001',
device_id: 'dev_001',
direction: '上报',
cmd_word: '0x36',
frame_id: 1,
udp_raw: '00',
sys_lock_status: 0,
report_count: 0,
fault_count: 0,
device_list: [
{ dev_type: 1, dev_addr: 10, dev_loop: 1, dev_data: 100 }
]
};
let originalConfigValue;
beforeEach(() => {
originalConfigValue = config.enableLoopNameAutoGeneration;
});
afterEach(() => {
config.enableLoopNameAutoGeneration = originalConfigValue;
});
it('should generate loop_name when flag is true', () => {
config.enableLoopNameAutoGeneration = true;
const rows = buildRowsFromPayload(basePayload);
// Expect format: [1强电继电器输出状态-10-1]
// The exact name depends on the map, but it should contain brackets and numbers
expect(rows[0].loop_name).toBeDefined();
expect(rows[0].loop_name).toMatch(/^\[1.*-10-1\]$/);
});
it('should NOT generate loop_name when flag is false', () => {
config.enableLoopNameAutoGeneration = false;
const rows = buildRowsFromPayload(basePayload);
expect(rows[0].loop_name).toBeNull();
});
});

View File

@@ -1,5 +1,18 @@
import { describe, it, expect } from 'vitest'; import { describe, it, expect, vi } from 'vitest';
import { buildRowsFromPayload } from '../src/processor/index.js'; import { buildRowsFromPayload } from '../src/processor/index.js';
import projectMetadata from '../src/cache/projectMetadata.js';
// Mock config to ensure loop name generation is enabled
vi.mock('../src/config/config.js', async (importOriginal) => {
const actual = await importOriginal();
return {
...actual,
config: {
...actual.config,
enableLoopNameAutoGeneration: true,
},
};
});
describe('Processor Logic', () => { describe('Processor Logic', () => {
const basePayload = { const basePayload = {
@@ -10,7 +23,7 @@ describe('Processor Logic', () => {
direction: '上报', direction: '上报',
cmd_word: '0x36', cmd_word: '0x36',
frame_id: 1, frame_id: 1,
udp_raw: '3601...', udp_raw: 'AA552000543353413610CD63088151000000000000000001180003000114005ECB',
sys_lock_status: 0, sys_lock_status: 0,
report_count: 0, report_count: 0,
fault_count: 0 fault_count: 0
@@ -35,7 +48,7 @@ describe('Processor Logic', () => {
const rows = buildRowsFromPayload(payload); const rows = buildRowsFromPayload(payload);
expect(rows).toHaveLength(2); expect(rows).toHaveLength(2);
expect(rows[0].action_type).toBe('36上报'); expect(rows[0].action_type).toBe('设备回路状态');
expect(rows[0].dev_addr).toBe(10); expect(rows[0].dev_addr).toBe(10);
expect(rows[1].dev_addr).toBe(11); expect(rows[1].dev_addr).toBe(11);
expect(rows[0].details.device_list).toHaveLength(2); expect(rows[0].details.device_list).toHaveLength(2);
@@ -54,7 +67,7 @@ describe('Processor Logic', () => {
const rows = buildRowsFromPayload(payload); const rows = buildRowsFromPayload(payload);
expect(rows).toHaveLength(1); expect(rows).toHaveLength(1);
expect(rows[0].action_type).toBe('36上报'); expect(rows[0].action_type).toBe('设备回路状态');
expect(rows[0].error_type).toBe(2); expect(rows[0].error_type).toBe(2);
}); });
@@ -85,7 +98,7 @@ describe('Processor Logic', () => {
const rows = buildRowsFromPayload(payload); const rows = buildRowsFromPayload(payload);
expect(rows).toHaveLength(1); expect(rows).toHaveLength(1);
expect(rows[0].action_type).toBe('0F下发'); expect(rows[0].action_type).toBe('下发控制');
expect(rows[0].type_l).toBe(1); expect(rows[0].type_l).toBe(1);
expect(rows[0].type_h).toBe(2); expect(rows[0].type_h).toBe(2);
expect(rows[0].dev_loop).toBe(1); expect(rows[0].dev_loop).toBe(1);
@@ -95,12 +108,15 @@ describe('Processor Logic', () => {
const payload = { const payload = {
...basePayload, ...basePayload,
direction: '上报', direction: '上报',
cmd_word: '0x0F' cmd_word: '0x0F',
control_list: [
{ dev_type: 1, dev_addr: 1, dev_loop: 1, dev_data: 1, type_h: 0 }
]
}; };
const rows = buildRowsFromPayload(payload); const rows = buildRowsFromPayload(payload);
expect(rows).toHaveLength(1); expect(rows).toHaveLength(1);
expect(rows[0].action_type).toBe('0FACK'); expect(rows[0].action_type).toBe('设备回路状态');
}); });
it('should fallback when lists are empty for 0x36', () => { it('should fallback when lists are empty for 0x36', () => {
@@ -114,7 +130,167 @@ describe('Processor Logic', () => {
const rows = buildRowsFromPayload(payload); const rows = buildRowsFromPayload(payload);
expect(rows).toHaveLength(1); expect(rows).toHaveLength(1);
expect(rows[0].action_type).toBe('36上报'); expect(rows[0].action_type).toBe('设备回路状态');
expect(rows[0].dev_type).toBeNull(); expect(rows[0].dev_type).toBeNull();
}); });
it('should classify 0x36 as 用户操作 when dev_type is user-operated', () => {
const payload = {
...basePayload,
direction: '上报',
cmd_word: '0x36',
device_list: [
{ dev_type: 2, dev_addr: 10, dev_loop: 1, dev_data: 100 }
]
};
const rows = buildRowsFromPayload(payload);
expect(rows).toHaveLength(1);
expect(rows[0].action_type).toBe('用户操作');
});
it('should store udp_raw as base64 when input is hex string', () => {
const payload = {
...basePayload,
direction: '上报',
cmd_word: '0x36',
device_list: [],
fault_list: []
};
const expectedBase64 = Buffer.from(payload.udp_raw.replace(/[\s:]/g, ''), 'hex').toString('base64');
const rows = buildRowsFromPayload(payload);
expect(rows).toHaveLength(1);
expect(rows[0].udp_raw).toBe(expectedBase64);
});
it('should keep udp_raw unchanged when input is not hex string', () => {
const payload = {
...basePayload,
udp_raw: 'YWJjMTIz',
direction: '上报',
cmd_word: '0x36',
device_list: [
{ dev_type: 1, dev_addr: 10, dev_loop: 1, dev_data: 100 }
]
};
const rows = buildRowsFromPayload(payload);
expect(rows[0].udp_raw).toBe('YWJjMTIz');
});
it('should default extra to empty object when not provided', () => {
const payload = {
...basePayload,
direction: '上报',
cmd_word: '0x36',
device_list: [],
fault_list: []
};
const rows = buildRowsFromPayload(payload);
expect(rows[0].extra).toEqual({});
});
it('should preserve extra when provided by upstream', () => {
const payload = {
...basePayload,
direction: '上报',
cmd_word: '0x36',
device_list: [],
fault_list: [],
extra: {
source: 'upstream',
trace_id: 'trace-123'
}
};
const rows = buildRowsFromPayload(payload);
expect(rows[0].extra).toEqual({
source: 'upstream',
trace_id: 'trace-123'
});
});
it('should enrich rows with loop_name from metadata', () => {
// Mock metadata
projectMetadata.roomMap.set('dev_001', 101);
// Key format: roomTypeId:00Type00Addr00Loop
// type=1, addr=10, loop=1 -> 001010001
projectMetadata.loopMap.set('101:001010001', 'Main Chandelier');
const payload = {
...basePayload,
direction: '上报',
cmd_word: '0x36',
device_list: [
{ dev_type: 1, dev_addr: 10, dev_loop: 1, dev_data: 100 }, // Should match 001010001
{ dev_type: 1, dev_addr: 10, dev_loop: 2, dev_data: 0 } // Should not match (001010002) -> Fallback
]
};
const rows = buildRowsFromPayload(payload);
expect(rows[0].loop_name).toBe('Main Chandelier');
// dev_type 1 is '强电继电器(输出状态)'
expect(rows[1].loop_name).toBe('[1强电继电器输出状态-10-2]');
});
});
describe('Processor Logic - 0x0E Support', () => {
const basePayload = {
ts_ms: 1700000000000,
hotel_id: 1001,
room_id: '8001',
device_id: 'dev_001',
direction: '上报',
cmd_word: '0x0E',
frame_id: 1,
udp_raw: 'AA552000543353413610CD63088151000000000000000001180003000114005ECB',
sys_lock_status: 0,
report_count: 0,
fault_count: 0
};
it('should handle 0x0E Status Report with device list (same as 0x36)', () => {
const payload = {
...basePayload,
direction: '上报',
cmd_word: '0x0E',
report_count: 2,
device_list: [
{ dev_type: 1, dev_addr: 10, dev_loop: 1, dev_data: 100 },
{ dev_type: 1, dev_addr: 11, dev_loop: 2, dev_data: 0 }
]
};
const rows = buildRowsFromPayload(payload);
expect(rows).toHaveLength(2);
expect(rows[0].action_type).toBe('设备回路状态');
expect(rows[0].dev_addr).toBe(10);
expect(rows[0].cmd_word).toBe('0x0e'); // Normalized
expect(rows[1].dev_addr).toBe(11);
expect(rows[0].details.device_list).toHaveLength(2);
});
it('should handle 0x0E Fault Report', () => {
const payload = {
...basePayload,
direction: '上报',
cmd_word: '0x0E',
fault_count: 1,
fault_list: [
{ dev_type: 1, dev_addr: 10, dev_loop: 1, error_type: 2, error_data: 5 }
]
};
const rows = buildRowsFromPayload(payload);
expect(rows).toHaveLength(1);
expect(rows[0].action_type).toBe('设备回路状态');
expect(rows[0].error_type).toBe(2);
expect(rows[0].cmd_word).toBe('0x0e');
});
}); });

View File

@@ -24,7 +24,7 @@ JSON 消息由 **Header 信息** 和 **业务列表数据** 组成。
| **direction** | String | **是** | "上报" 或 "下发" | | **direction** | String | **是** | "上报" 或 "下发" |
| **cmd_word** | String | **是** | 命令字 (如 "0x36", "0x0F") | | **cmd_word** | String | **是** | 命令字 (如 "0x36", "0x0F") |
| **frame_id** | Number | **是** | 通讯帧号 | | **frame_id** | Number | **是** | 通讯帧号 |
| **udp_raw** | String | **是** | UDP 原始报文 (作为备份/校验) | | **udp_raw** | String | **是** | UDP 原始报文备份推荐上游直接发送base64字符串当前若上游仍发送十六进制字符串后端会在入库前临时将其转换为base64后续会取消该自动转换机制 |
| **sys_lock_status** | Number | 否 | 系统锁状态 (0:未锁, 1:锁定) | | **sys_lock_status** | Number | 否 | 系统锁状态 (0:未锁, 1:锁定) |
| **report_count** | Number | 否 | 上报设备数量 (对应 device_list 长度) | | **report_count** | Number | 否 | 上报设备数量 (对应 device_list 长度) |
| **fault_count** | Number | 否 | 故障设备数量 (对应 fault_list 长度) | | **fault_count** | Number | 否 | 故障设备数量 (对应 fault_list 长度) |

View File

@@ -75,4 +75,22 @@ ACK (待补充)
5. 队列结构 5. 队列结构
队列分区数6 队列分区数6
Topicblwlog4Nodejs-rcu-action-topic Topicblwlog4Nodejs-rcu-action-topic
6. 入库前特殊操作
- 定期从temporary_project表中读取数据并保存到`内存`以全局变量的形式每天凌晨1点从数据库更新一次。
- 每条数据写库之前需要根据项目ID从内存中读取项目信息。
- 我需要在现有数据表`rcu_action_events`里,添加一个字段:`loop_name`,用于存储回路名称。
- 查询`loop_name`的方法是:
- 根据要插入的数据中的`device_id``rooms`表中找出对应的房间 -> 得到 `room_type_id`
- 根据 `room_type_id``loop_address``loops` 表中查找对应的 `loop_name`
- `loop_address` 的生成规则:将数据的 `dev_type``dev_addr``dev_loop` 分别转换为 3 位字符串(不足前方补 0然后拼接。
- 例如:`dev_type=1, dev_addr=23, dev_loop=12` -> `001` + `023` + `012` -> `001023012`
- **兜底逻辑**:如果根据上述规则在 `loops` 缓存中未找到对应的 `loop_name`,则使用 `dev_type` 对应的设备名称(配置在 `ACTION_TYPE_DEV_TYPE_RULES` 中)默认名称。
- 格式:`[dev_type名称+'-'+dev_addr+'-'+dev_loop]`
- 例如:`dev_type=35` (名称: TimeCtrl), `addr=14`, `loop=21` -> `[35TimeCtrl-14-21]`
- 最后将找到的或生成的 `loop_name` 写入 `rcu_action_events` 表。
- 注意,所有查库操作都要通过内存缓存来实现。

View File

@@ -30,7 +30,7 @@
| **direction** | VARCHAR(10) | 数据方向 | **必填** (L53) "上报"/"下发" Index | | **direction** | VARCHAR(10) | 数据方向 | **必填** (L53) "上报"/"下发" Index |
| **cmd_word** | VARCHAR(10) | 命令字 | **必填** (L54) 如 "0x36", "0x0F" Index | | **cmd_word** | VARCHAR(10) | 命令字 | **必填** (L54) 如 "0x36", "0x0F" Index |
| **frame_id** | INTEGER | 通讯帧号 | **必填** (L55) 用于串联命令与状态 | | **frame_id** | INTEGER | 通讯帧号 | **必填** (L55) 用于串联命令与状态 |
| **udp_raw** | TEXT | UDP消息原文 | **必填** (L56) Hex字符串 | | **udp_raw** | TEXT | UDP消息原文 | **必填** (L56) base64字符串当前若上游仍发送十六进制字符串则后端会在入库前临时转换为base64后续会取消该自动转换机制 |
| **action_type** | VARCHAR(20) | 记录行为类型 | **必填** (L57) Index | | **action_type** | VARCHAR(20) | 记录行为类型 | **必填** (L57) Index |
| **sys_lock_status** | SMALLINT | 系统锁状态 | (L59) 可空 | | **sys_lock_status** | SMALLINT | 系统锁状态 | (L59) 可空 |
| **report_count** | SMALLINT | 本次上报数量 | (L60) 可空 | | **report_count** | SMALLINT | 本次上报数量 | (L60) 可空 |
@@ -44,16 +44,80 @@
| **type_l** | SMALLINT | 执行方式 | 可空 (0x0F下发) | | **type_l** | SMALLINT | 执行方式 | 可空 (0x0F下发) |
| **type_h** | SMALLINT | 执行内容 | 可空 (0x0F下发) | | **type_h** | SMALLINT | 执行内容 | 可空 (0x0F下发) |
| **details** | JSONB | 业务详情数据 | 存储不定长设备列表、故障信息等 | | **details** | JSONB | 业务详情数据 | 存储不定长设备列表、故障信息等 |
| **extra** | JSONB | 扩展信息 | 存储通讯原文等扩展数据 | | **extra** | JSONB | 扩展信息 | 存储上游传入的extra扩展字段如未提供则为空对象 |
**主键定义**: `(ts_ms, guid)` **主键定义**: `(ts_ms, guid)`
**索引定义**: 备注带index的字段为需要索引的字段用于提高查询效率。 **索引定义**: 备注带index的字段为需要索引的字段用于提高查询效率。
### 3.2 字典定义 ### 3.2 字典定义
**Action Type (记录行为类型)**: **Action Type (记录行为类型)**:
- `"0FACK"`: ACK (应答) - 枚举值ACK和下发控制是0x0F的特殊情况用户操作和设备回路状态是0x36的枚举值
- `"0F下发"`: 下发控制 (0x0F 下发) - ACKACK是0x0F的上报独有的所以如果0F且Direction为上报就标记为ACK
- `"36上报"`: 设备回路状态 (0x36 上报) - 下发控制0x0F的Direction为下发指令记录为下发控制
- 用户操作0x36上报 的开关、温控器等客户操作设备产生的,属于用户操作
- 设备回路状态0x36上报 的灯光、继电器回路等变化等受控设备,属于设备回路状态
- 用户操作和设备回路状态的具体区分表根据本行数据的dev_type来区分注意这张表是根据dev_type来区分的所以dev_type不能改变否则会导致数据错误另外这个表要写入env配置文件以数组形式保存随时可以更改
|dev_type|名称|描述|Action Type|
|---|---|---|---|
|0|Dev_Host_Invalid|无效设备(也可以被认为是场景)|无效|
|1|Dev_Host_HVout|强电继电器(输出状态)|设备回路状态|
|2|Dev_Host_LVinput|弱电输入(输入状态)|用户操作|
|3|Dev_Host_LVoutput|弱电输出(输出状态)|设备回路状态|
|4|Dev_Host_Service|服务信息|设备回路状态|
|5|Dev_NodeCurtain|干节点窗帘|设备回路状态|
|6|DEV_RS485_SWT|开关|用户操作|
|7|DEV_RS485_TEMP|空调|用户操作|
|8|DEV_RS485_INFRARED|红外感应|用户操作|
|9|DEV_RS485_AirDetect|空气质量检测设备|设备回路状态|
|10|DEV_RS485_CARD|插卡取电|用户操作|
|11|DEV_RS485_HEATER|地暖|用户操作|
|12|Dev_RCU_NET|RCU 设备网络 - 没使用||
|13|DEV_RS485_CURTAIN|窗帘|设备回路状态|
|14|DEV_RS485_RELAY|继电器|设备回路状态|
|15|DEV_RS485_IR_SEND|红外发送|设备回路状态|
|16|DEV_RS485_DIMMING|调光驱动|设备回路状态|
|17|DEV_RS485_TRAIC|可控硅调光(可控硅状态)|设备回路状态|
|18|DEV_RS485_STRIP|灯带(灯带状态) --2025-11-24 取消|无效|
|19|DEV_RS485_CoreCtrl|中控|无效|
|20|DEV_RS485_WxLock|微信锁 (福 瑞狗的蓝牙锁 默认 0 地址)|无效|
|21|DEV_RS485_MUSIC|背景音乐(背景音乐状态)|设备回路状态|
|22|DEV_NET_ROOMSTATE|房态下发|无效|
|23|Dev_Host_PWMLight|主机本地 调光|无效|
|24|DEV_RS485_PWM|485PWM 调光( PWM 调光状态)|无效|
|25|DEV_PB_LED|总线调光( PBLED 调光状态) - 没使用 -|无效|
|26|DEV_RCU_POWER|RCU 电源|无效|
|27|DEV_RS485_A9_IO_SWT|A9IO 开关|用户操作|
|28|DEV_RS485_A9_IO_EXP|A9IO 扩展|设备回路状态|
|29|DEV_RS485_A9_IO_POWER|A9IO 电源|设备回路状态|
|30|DEV_RS485_RFGatewayCycle|无线网关轮询(用于轮询控制轮询设备;给无线网关下发配置和询问网关状态)|无效|
|31|DEV_RS485_RFGatewayHost|无线网关主动(用于主动控制主动设备)|无效|
|32|DEV_RS485_RFGatewayDoor|无线门磁|用户操作|
|33|DEV_RS485_AirReveal|空气参数显示设备|设备回路状态|
|34|DEV_RS485_RFGatewayRelayPir|无线继电器红外|设备回路状态|
|35|Dev_Host_TimeCtrl|时间同步|设备回路状态|
|36|Dev_Rs458_MonitorCtrl|监控控制|无效|
|37|Dev_Rs458_RotaryCtrl|旋钮开关控制|用户操作|
|38|Dev_BUS_C5IO|CSIO - 类型|设备回路状态|
|39|Dev_RS485_CardState|插卡状态虚拟设备|设备回路状态|
|40|DEV_RS485_FreshAir|485 新风设备|用户操作|
|41|DEV_RS485_FaceMach|485 人脸机|用户操作|
|42|DEV_Center_Control|中控|无效|
|43|DEV_Domain_Control|域控|无效|
|44|DEV_RS485_LCD|LCD|设备回路状态|
|45|DEV_Virtual_NoCard|无卡断电 --2025-11-24 取消|无效|
|46|DEV_Virtual_Card|无卡取电 2|用户操作|
|47|DEV_Virtual_Time|虚拟时间设备|设备回路状态|
|48|Dev_Rs485_PB20|PLC 总控|设备回路状态|
|49|Dev_Rs485_PB20_LD|PLC 设备 - 恒流调光设备|设备回路状态|
|50|Dev_Rs485_PB20_LS|PLC 设备 - 恒压调光设备|设备回路状态|
|51|Dev_Rs485_PB20_Relay|PLC 设备 - 继电器设备|设备回路状态|
|52|DEV_Virtual_ColorTemp|色温调节功能|设备回路状态|
|53|Dev_485_BLE_Music|蓝牙音频|设备回路状态|
|54|DEV_Carbon_Saved|碳达人|用户操作|
|55|Dev_Scene_Restore|场景还原|用户操作|
|56|Dev_Virtual_GlobalSet|全局设置|设备回路状态|
|57|Dev_Energy_Monitor|能耗检测|设备回路状态|
|241|Dev_BUS_C5IO|CSIO - 类型|设备回路状态|
**Direction (方向)**: **Direction (方向)**:
- `"上报"`: Upload - `"上报"`: Upload
@@ -89,4 +153,3 @@
**Mapping**: **Mapping**:
- `details`: `{ "ack_code": "0x00" }` - `details`: `{ "ack_code": "0x00" }`
- `extra`: `{ "raw_hex": "..." }` - `extra`: `{ "raw_hex": "..." }`

141
docs/room_status_moment.sql Normal file
View File

@@ -0,0 +1,141 @@
-- ============================================================================
-- 数据库初始化脚本
-- 描述:创建 log_platform 库逻辑参考、room_status 模式及 room_status_moment 分区表
-- 对应项目需求project.md #L57-65
-- ============================================================================
-- 注意:在 PostgreSQL 中CREATE DATABASE 不能在事务块中执行。
-- 通常建议先手动创建数据库,然后再执行后续脚本。
-- CREATE DATABASE log_platform;
-- 切换到 log_platform 数据库后执行以下内容:
-- 1. 创建模式
CREATE SCHEMA IF NOT EXISTS room_status;
-- 2. 创建主表 (使用声明式分区)
-- 根据需求 L57-65考虑后期十万级以上数据的扩展按 hotel_id 进行 LIST 分区
CREATE TABLE IF NOT EXISTS room_status.room_status_moment (
-- 基础标识字段
guid UUID NOT NULL,
ts_ms INT8 NOT NULL DEFAULT (EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000)::BIGINT,
hotel_id INT2 NOT NULL,
room_id TEXT NOT NULL,
device_id TEXT NOT NULL,
-- 设备状态字段
sys_lock_status INT2,
online_status INT2,
launcher_version TEXT,
app_version TEXT,
config_version TEXT,
register_ts_ms INT8,
upgrade_ts_ms INT8,
config_ts_ms INT8,
ip TEXT,
-- 房间业务状态字段
pms_status INT2,
power_state INT2,
cardless_state INT2,
service_mask INT8,
insert_card INT2,
bright_g INT2,
agreement_ver TEXT,
-- 空调相关
air_address TEXT[],
air_state INT2[],
air_model INT2[],
air_speed INT2[],
air_set_temp INT2[],
air_now_temp INT2[],
air_solenoid_valve INT2[],
-- 能耗相关
elec_address TEXT[],
elec_voltage DOUBLE PRECISION[],
elec_ampere DOUBLE PRECISION[],
elec_power DOUBLE PRECISION[],
elec_phase DOUBLE PRECISION[],
elec_energy DOUBLE PRECISION[],
elec_sum_energy DOUBLE PRECISION[],
-- 节能与外设
carbon_state INT2,
dev_loops JSONB,
energy_carbon_sum DOUBLE PRECISION,
energy_nocard_sum DOUBLE PRECISION,
external_device JSONB DEFAULT '{}',
faulty_device_count JSONB DEFAULT '{}',
-- 约束:分区表的主键必须包含分区键 (hotel_id)
PRIMARY KEY (hotel_id, room_id, device_id, guid)
) PARTITION BY LIST (hotel_id);
-- 3. 创建索引 (针对高频查询字段)
-- 注意:在分区表上创建索引会自动在所有子表上创建对应的索引
CREATE INDEX IF NOT EXISTS idx_room_status_moment_hotel_room ON room_status.room_status_moment (hotel_id, room_id);
CREATE INDEX IF NOT EXISTS idx_room_status_moment_device_id ON room_status.room_status_moment (device_id);
CREATE INDEX IF NOT EXISTS idx_room_status_moment_sys_lock ON room_status.room_status_moment (sys_lock_status);
CREATE INDEX IF NOT EXISTS idx_room_status_moment_online ON room_status.room_status_moment (online_status);
CREATE INDEX IF NOT EXISTS idx_room_status_moment_pms ON room_status.room_status_moment (pms_status);
CREATE INDEX IF NOT EXISTS idx_room_status_moment_power ON room_status.room_status_moment (power_state);
CREATE INDEX IF NOT EXISTS idx_room_status_moment_cardless ON room_status.room_status_moment (cardless_state);
CREATE INDEX IF NOT EXISTS idx_room_status_moment_insert_card ON room_status.room_status_moment (insert_card);
CREATE INDEX IF NOT EXISTS idx_room_status_moment_carbon ON room_status.room_status_moment (carbon_state);
-- 3.1 唯一索引 (支持 UPSERT)
-- 必须在 (hotel_id, room_id, device_id) 上建立唯一约束,才能使用 ON CONFLICT
CREATE UNIQUE INDEX IF NOT EXISTS idx_room_status_unique_device
ON room_status.room_status_moment (hotel_id, room_id, device_id);
-- 4. 示例:创建第一个分区 (hotel_id = 1)
-- 实际部署时,可根据 hotel_id 动态创建分区
CREATE TABLE IF NOT EXISTS room_status.room_status_moment_h1
PARTITION OF room_status.room_status_moment
FOR VALUES IN (1);
-- 5. 添加表和字段注释
COMMENT ON TABLE room_status.room_status_moment IS '房间即时状态表 - 记录设备及房间业务的最新实时状态';
COMMENT ON COLUMN room_status.room_status_moment.guid IS '主键 guid uuid 32位无符号UUID';
COMMENT ON COLUMN room_status.room_status_moment.ts_ms IS '最后更新时间';
COMMENT ON COLUMN room_status.room_status_moment.hotel_id IS '酒店';
COMMENT ON COLUMN room_status.room_status_moment.room_id IS '房间';
COMMENT ON COLUMN room_status.room_status_moment.device_id IS '设备编号';
COMMENT ON COLUMN room_status.room_status_moment.sys_lock_status IS '系统锁状态';
COMMENT ON COLUMN room_status.room_status_moment.online_status IS '设备在线状态';
COMMENT ON COLUMN room_status.room_status_moment.launcher_version IS '设备launcher版本';
COMMENT ON COLUMN room_status.room_status_moment.app_version IS '设备App版本';
COMMENT ON COLUMN room_status.room_status_moment.config_version IS '设备配置版本';
COMMENT ON COLUMN room_status.room_status_moment.register_ts_ms IS '最后一次注册时间';
COMMENT ON COLUMN room_status.room_status_moment.upgrade_ts_ms IS '最后一次升级时间';
COMMENT ON COLUMN room_status.room_status_moment.config_ts_ms IS '最后一次下发配置时间';
COMMENT ON COLUMN room_status.room_status_moment.ip IS '当前公网IP地址';
COMMENT ON COLUMN room_status.room_status_moment.pms_status IS 'PMS状态';
COMMENT ON COLUMN room_status.room_status_moment.power_state IS '取电状态';
COMMENT ON COLUMN room_status.room_status_moment.cardless_state IS '有、无人状态';
COMMENT ON COLUMN room_status.room_status_moment.service_mask IS '服务状态';
COMMENT ON COLUMN room_status.room_status_moment.insert_card IS '插卡状态';
COMMENT ON COLUMN room_status.room_status_moment.air_address IS '空调地址';
COMMENT ON COLUMN room_status.room_status_moment.air_state IS '空调状态';
COMMENT ON COLUMN room_status.room_status_moment.air_model IS '空调模型';
COMMENT ON COLUMN room_status.room_status_moment.air_speed IS '空调风速';
COMMENT ON COLUMN room_status.room_status_moment.air_set_temp IS '空调设置温度';
COMMENT ON COLUMN room_status.room_status_moment.air_now_temp IS '房间当前温度';
COMMENT ON COLUMN room_status.room_status_moment.air_solenoid_valve IS '空调电磁阀状态';
COMMENT ON COLUMN room_status.room_status_moment.elec_address IS '能耗表地址';
COMMENT ON COLUMN room_status.room_status_moment.elec_voltage IS '能耗表电压';
COMMENT ON COLUMN room_status.room_status_moment.elec_ampere IS '能耗表电流';
COMMENT ON COLUMN room_status.room_status_moment.elec_power IS '能耗表功率';
COMMENT ON COLUMN room_status.room_status_moment.elec_phase IS '当前相位';
COMMENT ON COLUMN room_status.room_status_moment.elec_energy IS '能耗表能耗';
COMMENT ON COLUMN room_status.room_status_moment.elec_sum_energy IS '能耗表累计能耗';
COMMENT ON COLUMN room_status.room_status_moment.carbon_state IS '碳达人状态';
COMMENT ON COLUMN room_status.room_status_moment.bright_g IS '光亮值';
COMMENT ON COLUMN room_status.room_status_moment.agreement_ver IS '协议版本';
COMMENT ON COLUMN room_status.room_status_moment.dev_loops IS '回路状态';
COMMENT ON COLUMN room_status.room_status_moment.energy_carbon_sum IS '碳达人节能累计';
COMMENT ON COLUMN room_status.room_status_moment.energy_nocard_sum IS '无卡节能累计';
COMMENT ON COLUMN room_status.room_status_moment.external_device IS '外设设备管理(数组)';
COMMENT ON COLUMN room_status.room_status_moment.faulty_device_count IS '故障设备数量';

231
docs/temporary_project.sql Normal file
View File

@@ -0,0 +1,231 @@
/*
Navicat Premium Dump SQL
Source Server : FnOS 109
Source Server Type : PostgreSQL
Source Server Version : 150004 (150004)
Source Host : 10.8.8.109:5433
Source Catalog : log_platform
Source Schema : temporary_project
Target Server Type : PostgreSQL
Target Server Version : 150004 (150004)
File Encoding : 65001
Date: 02/02/2026 14:28:41
*/
-- ----------------------------
-- Table structure for hotels
-- ----------------------------
DROP TABLE IF EXISTS "temporary_project"."hotels";
CREATE TABLE "temporary_project"."hotels" (
"guid" varchar(32) COLLATE "pg_catalog"."default" NOT NULL,
"hotel_id" int4 NOT NULL,
"hotel_name" varchar(255) COLLATE "pg_catalog"."default",
"id" int4
)
;
-- ----------------------------
-- Table structure for loops_default
-- ----------------------------
DROP TABLE IF EXISTS "temporary_project"."loops_default";
CREATE TABLE "temporary_project"."loops_default" (
"guid" varchar(32) COLLATE "pg_catalog"."default" NOT NULL,
"id" int4 NOT NULL,
"loop_name" varchar(255) COLLATE "pg_catalog"."default",
"room_type_id" int4 NOT NULL,
"loop_address" varchar(255) COLLATE "pg_catalog"."default",
"loop_type" varchar(50) COLLATE "pg_catalog"."default"
)
;
-- ----------------------------
-- Table structure for room_type
-- ----------------------------
DROP TABLE IF EXISTS "temporary_project"."room_type";
CREATE TABLE "temporary_project"."room_type" (
"guid" varchar(32) COLLATE "pg_catalog"."default" NOT NULL,
"id" int4 NOT NULL,
"room_type_name" varchar(255) COLLATE "pg_catalog"."default",
"hotel_id" int4
)
;
-- ----------------------------
-- Table structure for rooms_default
-- ----------------------------
DROP TABLE IF EXISTS "temporary_project"."rooms_default";
CREATE TABLE "temporary_project"."rooms_default" (
"guid" varchar(32) COLLATE "pg_catalog"."default" NOT NULL,
"hotel_id" int4 NOT NULL,
"room_id" varchar(255) COLLATE "pg_catalog"."default" NOT NULL,
"room_type_id" int4,
"device_id" varchar(50) COLLATE "pg_catalog"."default",
"mac" varchar(50) COLLATE "pg_catalog"."default",
"id" int4
)
;
-- ----------------------------
-- Table structure for loops
-- ----------------------------
DROP TABLE IF EXISTS "temporary_project"."loops";
CREATE TABLE "temporary_project"."loops" (
"guid" varchar(32) COLLATE "pg_catalog"."default" NOT NULL,
"id" int4 NOT NULL,
"loop_name" varchar(255) COLLATE "pg_catalog"."default",
"room_type_id" int4 NOT NULL,
"loop_address" varchar(255) COLLATE "pg_catalog"."default",
"loop_type" varchar(50) COLLATE "pg_catalog"."default"
)
PARTITION BY (
)
;
-- ----------------------------
-- Table structure for rooms
-- ----------------------------
DROP TABLE IF EXISTS "temporary_project"."rooms";
CREATE TABLE "temporary_project"."rooms" (
"guid" varchar(32) COLLATE "pg_catalog"."default" NOT NULL,
"hotel_id" int4 NOT NULL,
"room_id" varchar(255) COLLATE "pg_catalog"."default" NOT NULL,
"room_type_id" int4,
"device_id" varchar(50) COLLATE "pg_catalog"."default",
"mac" varchar(50) COLLATE "pg_catalog"."default",
"id" int4
)
PARTITION BY LIST (
"hotel_id" "pg_catalog"."int4_ops"
)
;
ALTER TABLE "temporary_project"."rooms" ATTACH PARTITION "temporary_project"."rooms_default" DEFAULT;
-- ----------------------------
-- Indexes structure for table hotels
-- ----------------------------
CREATE INDEX "idx_hotels_hotel_id" ON "temporary_project"."hotels" USING btree (
"hotel_id" "pg_catalog"."int4_ops" ASC NULLS LAST
);
CREATE INDEX "idx_hotels_hotel_name" ON "temporary_project"."hotels" USING btree (
"hotel_name" COLLATE "pg_catalog"."default" "pg_catalog"."text_ops" ASC NULLS LAST
);
CREATE INDEX "idx_hotels_id" ON "temporary_project"."hotels" USING btree (
"id" "pg_catalog"."int4_ops" ASC NULLS LAST
);
-- ----------------------------
-- Primary Key structure for table hotels
-- ----------------------------
ALTER TABLE "temporary_project"."hotels" ADD CONSTRAINT "hotels_pkey" PRIMARY KEY ("hotel_id", "guid");
-- ----------------------------
-- Indexes structure for table loops_default
-- ----------------------------
CREATE INDEX "loops_default_id_idx" ON "temporary_project"."loops_default" USING btree (
"id" "pg_catalog"."int4_ops" ASC NULLS LAST
);
CREATE INDEX "loops_default_loop_address_idx" ON "temporary_project"."loops_default" USING btree (
"loop_address" COLLATE "pg_catalog"."default" "pg_catalog"."text_ops" ASC NULLS LAST
);
CREATE INDEX "loops_default_loop_name_idx" ON "temporary_project"."loops_default" USING btree (
"loop_name" COLLATE "pg_catalog"."default" "pg_catalog"."text_ops" ASC NULLS LAST
);
CREATE INDEX "loops_default_loop_type_idx" ON "temporary_project"."loops_default" USING btree (
"loop_type" COLLATE "pg_catalog"."default" "pg_catalog"."text_ops" ASC NULLS LAST
);
CREATE INDEX "loops_default_room_type_id_idx" ON "temporary_project"."loops_default" USING btree (
"room_type_id" "pg_catalog"."int4_ops" ASC NULLS LAST
);
-- ----------------------------
-- Primary Key structure for table loops_default
-- ----------------------------
ALTER TABLE "temporary_project"."loops_default" ADD CONSTRAINT "loops_default_pkey" PRIMARY KEY ("guid", "id", "room_type_id");
-- ----------------------------
-- Indexes structure for table room_type
-- ----------------------------
CREATE INDEX "idx_room_type_hotel_id" ON "temporary_project"."room_type" USING btree (
"hotel_id" "pg_catalog"."int4_ops" ASC NULLS LAST
);
CREATE INDEX "idx_room_type_id" ON "temporary_project"."room_type" USING btree (
"id" "pg_catalog"."int4_ops" ASC NULLS LAST
);
CREATE INDEX "idx_room_type_name" ON "temporary_project"."room_type" USING btree (
"room_type_name" COLLATE "pg_catalog"."default" "pg_catalog"."text_ops" ASC NULLS LAST
);
-- ----------------------------
-- Primary Key structure for table room_type
-- ----------------------------
ALTER TABLE "temporary_project"."room_type" ADD CONSTRAINT "room_type_pkey" PRIMARY KEY ("guid", "id");
-- ----------------------------
-- Indexes structure for table rooms_default
-- ----------------------------
CREATE INDEX "rooms_default_device_id_idx" ON "temporary_project"."rooms_default" USING btree (
"device_id" COLLATE "pg_catalog"."default" "pg_catalog"."text_ops" ASC NULLS LAST
);
CREATE INDEX "rooms_default_hotel_id_idx" ON "temporary_project"."rooms_default" USING btree (
"hotel_id" "pg_catalog"."int4_ops" ASC NULLS LAST
);
CREATE INDEX "rooms_default_id_idx" ON "temporary_project"."rooms_default" USING btree (
"id" "pg_catalog"."int4_ops" ASC NULLS LAST
);
CREATE INDEX "rooms_default_mac_idx" ON "temporary_project"."rooms_default" USING btree (
"mac" COLLATE "pg_catalog"."default" "pg_catalog"."text_ops" ASC NULLS LAST
);
-- ----------------------------
-- Primary Key structure for table rooms_default
-- ----------------------------
ALTER TABLE "temporary_project"."rooms_default" ADD CONSTRAINT "rooms_default_pkey" PRIMARY KEY ("guid", "hotel_id", "room_id");
-- ----------------------------
-- Indexes structure for table loops
-- ----------------------------
CREATE INDEX "idx_loops_address" ON "temporary_project"."loops" USING btree (
"loop_address" COLLATE "pg_catalog"."default" "pg_catalog"."text_ops" ASC NULLS LAST
);
CREATE INDEX "idx_loops_id" ON "temporary_project"."loops" USING btree (
"id" "pg_catalog"."int4_ops" ASC NULLS LAST
);
CREATE INDEX "idx_loops_name" ON "temporary_project"."loops" USING btree (
"loop_name" COLLATE "pg_catalog"."default" "pg_catalog"."text_ops" ASC NULLS LAST
);
CREATE INDEX "idx_loops_room_type_id" ON "temporary_project"."loops" USING btree (
"room_type_id" "pg_catalog"."int4_ops" ASC NULLS LAST
);
CREATE INDEX "idx_loops_type" ON "temporary_project"."loops" USING btree (
"loop_type" COLLATE "pg_catalog"."default" "pg_catalog"."text_ops" ASC NULLS LAST
);
-- ----------------------------
-- Primary Key structure for table loops
-- ----------------------------
ALTER TABLE "temporary_project"."loops" ADD CONSTRAINT "loops_pkey" PRIMARY KEY ("guid", "id", "room_type_id");
-- ----------------------------
-- Indexes structure for table rooms
-- ----------------------------
CREATE INDEX "idx_rooms_device_id" ON "temporary_project"."rooms" USING btree (
"device_id" COLLATE "pg_catalog"."default" "pg_catalog"."text_ops" ASC NULLS LAST
);
CREATE INDEX "idx_rooms_hotel_id" ON "temporary_project"."rooms" USING btree (
"hotel_id" "pg_catalog"."int4_ops" ASC NULLS LAST
);
CREATE INDEX "idx_rooms_id" ON "temporary_project"."rooms" USING btree (
"id" "pg_catalog"."int4_ops" ASC NULLS LAST
);
CREATE INDEX "idx_rooms_mac" ON "temporary_project"."rooms" USING btree (
"mac" COLLATE "pg_catalog"."default" "pg_catalog"."text_ops" ASC NULLS LAST
);
-- ----------------------------
-- Primary Key structure for table rooms
-- ----------------------------
ALTER TABLE "temporary_project"."rooms" ADD CONSTRAINT "rooms_pkey" PRIMARY KEY ("guid", "hotel_id", "room_id");

View File

@@ -1,45 +0,0 @@
# 测试报告
## 基本信息
- 运行时间: 2026-01-29
- 运行方式: 控制台启动 `npm run dev`,运行约 60 秒后 Ctrl + C 终止
- 测试目标: 验证 Kafka 消费与入库链路,定位无入库原因
## 控制台关键日志
```
{"level":"error","message":"Message processing failed","timestamp":1769734880590,"context":{"error":"[\n {\n \"expected\": \"number\",\n \"code\": \"invalid_type\",\n \"path\": [\n \"hotel_id\"\n ],\n \"message\": \"Invalid input: expected number, received string\"\n }\n]","type":"PARSE_ERROR","stack":"ZodError: ...","rawPayload":"{\"ts_ms\":1769692878011,\"hotel_id\":\"2147\",\"room_id\":\"8209\",\"device_id\":\"099008129081\",\"direction\":\"上报\",\"cmd_word\":\"36\",\"frame_id\":52496,...}","validationIssues":[{"expected":"number","code":"invalid_type","path":["hotel_id"],"message":"Invalid input: expected number, received string"}]}}
```
## 结论
- 数据未入库的直接原因: Kafka 消息在解析阶段触发 Zod 校验失败,`hotel_id` 为字符串类型而非文档要求的 Number导致 `PARSE_ERROR`,数据库插入流程未执行。
## 与文档格式的一致性检查
对照 [kafka_format.md](file:///e:/Project_Class/BLS/Web_BLS_RCUAction_Server/docs/kafka_format.md):
- `hotel_id`: 文档要求 Number但实测为字符串 (示例: `"2147"`),不一致。
- `cmd_word`: 文档要求 `"0x36"`/`"0x0F"`,实测为 `"36"`,不一致。
- `control_list`: 文档要求 Array/可选,但实测为 `null`,不一致。
- 其余关键字段如 `ts_ms`, `room_id`, `device_id`, `direction`, `udp_raw` 均存在。
## 已增强的控制台错误输出
为了便于定位异常,以下模块已经增加详细错误输出到 PowerShell 控制台:
- Kafka 处理异常: 输出 `type`, `stack`, `rawPayload`, `validationIssues`, `dbContext`
- 数据库插入异常: 输出 `schema`, `table`, `rowsLength`
- Redis 入队与重试异常: 输出详细错误信息
相关改动文件:
- [index.js](file:///e:/Project_Class/BLS/Web_BLS_RCUAction_Server/bls-rcu-action-backend/src/index.js)
- [databaseManager.js](file:///e:/Project_Class/BLS/Web_BLS_RCUAction_Server/bls-rcu-action-backend/src/db/databaseManager.js)
- [errorQueue.js](file:///e:/Project_Class/BLS/Web_BLS_RCUAction_Server/bls-rcu-action-backend/src/redis/errorQueue.js)
## 建议修改方向
以下为解决无入库问题的可选方案,由你决定是否执行:
1. 上游严格按文档输出:
- `hotel_id` 改为 Number
- `cmd_word` 改为 `"0x36"` / `"0x0F"`
- `control_list``[]` 或省略字段,避免 `null`
2. 下游放宽校验并做类型转换:
-`hotel_id` 支持字符串并转换为 Number
- 继续兼容 `cmd_word = "36"` 的写法
- `control_list/device_list/fault_list` 接受 `null` 并转为空数组
当前代码已兼容 `cmd_word="36"``control_list=null`,但 `hotel_id` 仍按文档严格要求 Number。

View File

@@ -0,0 +1 @@
{"version":"4.0.18","results":[[":bls-rcu-action-backend/tests/consumer_reliability.test.js",{"duration":9.49589999999995,"failed":false}]]}

View File

@@ -0,0 +1,115 @@
# Room Status Moment 集成方案
## 1. 背景
我们需要将一个新的数据库表 `room_status.room_status_moment`(快照表)集成到现有的 Kafka 处理流程中。
该表用于存储每个房间/设备的最新状态。
现有的逻辑(批量插入到 `rcu_action_events`)必须保持不变。
## 2. 数据库配置
新表位于一个独立的数据库中(可能是 `log_platform`,或者现有数据库中的新模式 `room_status`)。
我们将添加对 `ROOM_STATUS` 独立连接池的支持,以确保灵活性。
**环境变量配置:**
```env
# 现有数据库配置
DB_HOST=...
...
# 新 Room Status 数据库配置 (如果未提供,默认使用现有数据库,但使用独立的连接池)
ROOM_STATUS_DB_HOST=...
ROOM_STATUS_DB_PORT=...
ROOM_STATUS_DB_USER=...
ROOM_STATUS_DB_PASSWORD=...
ROOM_STATUS_DB_DATABASE=log_platform <-- SQL 脚本中的目标数据库名
ROOM_STATUS_DB_SCHEMA=room_status
```
## 3. 字段映射策略
目标表:`room_status.room_status_moment`
唯一键:`(hotel_id, room_id, device_id)`
| 源字段 (Kafka) | 目标字段 | 更新逻辑 |
| :--- | :--- | :--- |
| `hotel_id` | `hotel_id` | 主键/索引键 |
| `room_id` | `room_id` | 主键/索引键 |
| `device_id` | `device_id` | 主键/索引键 |
| `ts_ms` | `ts_ms` | 始终更新为最新值 |
| `sys_lock_status` | `sys_lock_status` | 直接映射 (如果存在) |
| `device_list` (0x36) <br> `control_list` (0x0F) | `dev_loops` (JSONB) | **合并策略 (Merge)**: <br> Key: `001002003` (Type(3)+Addr(3)+Loop(3)) <br> Value: `dev_data` (int) <br> 操作: `old_json || new_json` (旧值合并新值) |
| `fault_list` (0x36) | `faulty_device_count` (JSONB) | **替换策略 (Replace)**: <br> 由于 0x36 上报的是完整故障列表,我们直接覆盖该字段。<br> 内容: `{dev_type, dev_addr, dev_loop, error_type, error_data}` 的列表 |
| `fault_list` -> item `error_type=1` | `online_status` | 如果 `error_data=1` -> 离线 (0) <br> 如果 `error_data=0` -> 在线 (1) <br> *需要验证具体的映射约定* |
**关于在线状态 (Online Status) 的说明**:
文档描述: "0x01: 0:在线 1:离线"。
表字段 `online_status` 类型为 INT2。
约定:通常 1=在线, 0=离线。
逻辑:
- 如果故障类型 0x01, 数据 0 (在线) -> 设置 `online_status` = 1
- 如果故障类型 0x01, 数据 1 (离线) -> 设置 `online_status` = 0
- 否则 -> 不更新 `online_status`
## 4. Upsert 逻辑 (PostgreSQL)
我们将使用 `INSERT ... ON CONFLICT DO UPDATE` 语法。
```sql
INSERT INTO room_status.room_status_moment (
guid, ts_ms, hotel_id, room_id, device_id,
sys_lock_status, online_status,
dev_loops, faulty_device_count
) VALUES (
$guid, $ts_ms, $hotel_id, $room_id, $device_id,
$sys_lock_status, $online_status,
$dev_loops::jsonb, $faulty_device_count::jsonb
)
ON CONFLICT (hotel_id, room_id, device_id)
DO UPDATE SET
ts_ms = EXCLUDED.ts_ms,
-- 仅在新数据不为空时更新 sys_lock_status
sys_lock_status = COALESCE(EXCLUDED.sys_lock_status, room_status.room_status_moment.sys_lock_status),
-- 仅在新数据不为空时更新 online_status
online_status = COALESCE(EXCLUDED.online_status, room_status.room_status_moment.online_status),
-- 合并 dev_loops
dev_loops = CASE
WHEN EXCLUDED.dev_loops IS NULL THEN room_status.room_status_moment.dev_loops
ELSE COALESCE(room_status.room_status_moment.dev_loops, '{}'::jsonb) || EXCLUDED.dev_loops
END,
-- 如果存在则替换 faulty_device_count
faulty_device_count = COALESCE(EXCLUDED.faulty_device_count, room_status.room_status_moment.faulty_device_count)
WHERE
-- 可选优化:仅在时间戳更新时写入
-- 注意:对于 JSON 合并,很难在不计算的情况下检测是否相等。
-- 我们依赖 ts_ms 的变化来表示数据的“新鲜度”。
EXCLUDED.ts_ms >= room_status.room_status_moment.ts_ms
;
```
## 5. 架构变更
1. **`src/config/config.js`**: 添加 `roomStatusDb` 配置项。
2. **`src/db/roomStatusManager.js`**: 新增单例类,用于管理 `log_platform` 的数据库连接池。
3. **`src/db/statusBatchProcessor.js`**: 针对 `room_status_moment` 的专用批量处理器。
* **原因**: Upsert 逻辑复杂,且与 `rcu_action_events` 的追加写日志模式不同。
* 它需要在批处理内聚合每个设备的更新,以减少数据库负载(去重)。
4. **`src/processor/statusExtractor.js`**: 辅助工具,用于将 `KafkaPayload` 转换为 `StatusRow` 数据结构。
5. **`src/index.js`**: 挂载新的处理器逻辑。
## 6. 去重策略 (内存批量聚合)
由于 `room_status_moment` 是快照表,如果我们在 1 秒内收到同一设备的 10 次更新:
- 我们只需要写入 **最后一次** 的状态(或合并后的状态)。
- `StatusBatchProcessor` 应该维护一个映射: `Map<Key(hotel,room,device), LatestData>`
- 在 Flush 时,将 Map 的值转换为批量 Upsert 操作。
- **约束**: `dev_loops` 的更新如果是针对不同回路的,可能需要累积合并。
- **优化策略**:
- 如果 `dev_loops` 是部分更新,我们不能简单地取最后一条消息。
- 但是,在短时间的批处理窗口(例如 500ms我们可以在内存中将它们合并后再发送给数据库。
- 结构: `Map<Key, MergedState>`
- 逻辑: `MergedState.dev_loops = Object.assign({}, old.dev_loops, new.dev_loops)`
## 7. 执行计划
1. 添加配置 (Config) 和数据库管理器 (DB Manager)。
2. 实现 `StatusExtractor` (将 Kafka 载荷转换为快照数据)。
3. 实现 `StatusBatchProcessor` (包含内存合并逻辑)。
4. 更新 `processKafkaMessage`,使其同时返回 `LogRows` (现有) 和 `StatusUpdate` (新增)。
5. 在主循环中处理分发。

View File

@@ -0,0 +1,32 @@
# Feature: Loop Name Enrichment
**Status**: Implemented
**Date**: 2026-02-02
## Summary
Enrich incoming RCU action events with `loop_name` by looking up metadata from `temporary_project` tables. This allows easier identification of specific device loops (e.g., "Main Chandelier") in the event log.
## Requirements
1. **Cache Mechanism**:
- Load `rooms` and `loops` data from `temporary_project` schema into memory.
- Refresh cache daily at 1:00 AM.
2. **Enrichment**:
- For each incoming event, look up `loop_name` using `device_id` and `dev_addr`.
- `device_id` -> `room_type_id` (via `rooms` table).
- `room_type_id` + `dev_addr` -> `loop_name` (via `loops` table).
3. **Storage**:
- Store `loop_name` in `rcu_action_events` table.
## Ambiguity Resolution
- The requirement mentioned matching `dev_type` to find the loop. However, standard RCU addressing uses `dev_addr` (and `dev_loop`). We assume `loops.loop_address` corresponds to the packet's `dev_addr` (converted to string).
- We will attempt to match `dev_addr` against `loop_address`.
## Schema Changes
- **Table**: `rcu_action.rcu_action_events`
- **Column**: `loop_name` (VARCHAR(255), Nullable)
## Implementation Plan
1. **Database**: Update `init_db.sql` and `databaseManager.js`.
2. **Cache**: Create `src/cache/projectMetadata.js`.
3. **Processor**: Integrate cache lookup in `src/processor/index.js`.
4. **Lifecycle**: Initialize cache in `src/index.js`.

View File

@@ -0,0 +1,22 @@
# Feature: Feature Toggle Loop Name Generation
**Status**: Implemented
**Date**: 2026-02-03
## Summary
Add a configuration switch to control the fallback behavior for `loop_name` generation. Currently, when cache lookup fails, the system auto-generates a name using the format `[Type-Addr-Loop]`. This change allows users to enable or disable this fallback behavior via an environment variable.
## Requirements
1. **Configuration**:
- Add `ENABLE_LOOP_NAME_AUTO_GENERATION` to environment variables.
- Default behavior should match existing logic (enable generation) if not specified, but user requested explicit control.
- If `true`: Perform concatenation `[dev_type名称+'-'+dev_addr+'-'+dev_loop]`.
- If `false`: Do not generate name, leave `loop_name` as null (or whatever default is appropriate, likely null).
2. **Processor Logic**:
- In `getLoopNameWithFallback`, check the configuration flag before applying the fallback generation logic.
## Implementation Plan
1. **Config**: Update `src/config/config.js` to parse `ENABLE_LOOP_NAME_AUTO_GENERATION`.
2. **Env**: Update `.env` and `.env.example`.
3. **Processor**: Update `src/processor/index.js` to respect the flag.

View File

@@ -0,0 +1,27 @@
# Summary of Changes: Loop Name Features
**Date**: 2026-02-03
**Status**: Archived
## Overview
This archive contains the specifications and proposals for the Loop Name Enrichment and Auto-Generation features. These features enhance the RCU Action Server by enriching event data with descriptive loop names derived from project metadata or fallback generation logic.
## Included Changes
### 1. [Feature: Loop Name Enrichment](./feature-loop-name-enrichment.md)
- **Goal**: Enrich `rcu_action_events` with `loop_name` by looking up cached metadata from `temporary_project` tables.
- **Key Components**:
- `ProjectMetadataCache`: Loads rooms and loops data daily.
- `loop_name` column added to `rcu_action_events` table.
- Processor logic updated to perform lookup.
### 2. [Feature: Feature Toggle Loop Name Generation](./feature-toggle-loop-name-generation.md)
- **Goal**: Provide a configuration switch to control the fallback behavior when cache lookup fails.
- **Key Components**:
- `ENABLE_LOOP_NAME_AUTO_GENERATION` env var.
- Logic to generate `[Type-Addr-Loop]` format only if enabled.
- Default behavior is `true` (enabled).
## Implementation Status
- All proposed features have been implemented and verified via unit tests.
- Configuration variables are documented in `.env.example`.

View File

@@ -0,0 +1,49 @@
# Reliable Kafka Consumption & DB Offline Handling
- **Status**: Completed
- **Author**: AI Assistant
- **Created**: 2026-02-04
## Context
Currently, the Kafka consumer is configured with `autoCommit: true`. This means offsets are committed periodically regardless of whether the data was successfully processed and stored in the database. If the database insertion fails (e.g., due to a constraint violation or connection loss), the message is considered "consumed" by Kafka, leading to data loss.
Additionally, if the PostgreSQL database goes offline, the consumer continues to try processing messages, likely filling logs with errors and potentially losing data if retries aren't handled correctly. The user requires a mechanism to pause consumption during DB outages and resume only when the DB is back online.
## Proposal
We propose to enhance the reliability of the ingestion pipeline by:
1. **Disabling Auto-Commit**:
- Set `autoCommit: false` in the Kafka `ConsumerGroup` configuration.
- Implement manual offset committing only after the database insertion is confirmed successful.
2. **Implementing DB Offline Handling (Circuit Breaker)**:
- Detect database connection errors during insertion.
- If a connection error occurs:
1. Pause the Kafka consumer immediately.
2. Log a warning and enter a "Recovery Mode".
3. Wait for 1 minute.
4. Periodically check database connectivity (every 1 minute).
5. Once the database is reachable, resume the Kafka consumer.
## Technical Details
### Configuration
- No new environment variables are strictly required, but `KAFKA_AUTO_COMMIT` could be forced to `false` or removed if we enforce this behavior.
- Retry interval (60 seconds) can be a constant or a config.
### Implementation Steps
1. Modify `src/kafka/consumer.js`:
- Change `autoCommit` to `false`.
- Update the message processing flow to await the `onMessage` handler.
- Call `consumer.commit()` explicitly after successful processing.
- Add logic to handle errors from `onMessage`. If it's a DB connection error, trigger the pause/retry loop.
2. Update `src/db/databaseManager.js` (Optional but helpful):
- Ensure it exposes a method to check connectivity (e.g., `testConnection()`) for the recovery loop.
## Impact
- **Reliability**: drastically improved. Zero data loss guarantee for DB outages.
- **Performance**: Slight overhead due to manual commits (can be batched if needed, but per-message or small batch is safer for now).
- **Operations**: System will self-recover from DB maintenance or crashes.

View File

@@ -0,0 +1,39 @@
# Phase 2: Optimization and Fixes
- **Status**: Completed
- **Author**: AI Assistant
- **Created**: 2026-02-04
## Context
Following the initial stabilization, several issues were identified:
1. **Missing Command Support**: The system did not recognize command word `0x0E`, which shares the same structure as `0x36`.
2. **Bootstrap Instability**: On Windows, restarting the service frequently caused `EADDRINUSE` errors when connecting to PostgreSQL due to ephemeral port exhaustion.
3. **Performance Bottleneck**: The Kafka consumer could not keep up with the backlog using single-row inserts and low parallelism, and scaling horizontal instances was restricted.
## Implemented Changes
### 1. 0x0E Command Support
- **Goal**: Enable processing of `0x0E` command word.
- **Implementation**:
- Updated `resolveMessageType` in `src/processor/index.js` to map `0x0E` to the same handler as `0x36`.
- Added unit tests in `tests/processor.test.js` to verify `0x0E` parsing for status and fault reports.
### 2. Bootstrap Retry Logic
- **Goal**: Prevent service startup failure due to transient port conflicts.
- **Implementation**:
- Modified `src/db/initializer.js` to catch `EADDRINUSE` errors during the initial database connection.
- Added a retry mechanism: max 5 retries with 1-second backoff.
### 3. High Throughput Optimization (Batch Processing)
- **Goal**: Resolve Kafka backlog without adding more service instances.
- **Implementation**:
- **Batch Processor**: Created `src/db/batchProcessor.js` to buffer messages in memory.
- **Strategy**: Messages are flushed to DB when buffer size reaches 500 or every 1 second.
- **Config Update**: Increased default `KAFKA_MAX_IN_FLIGHT` from 50 to 500 in `src/config/config.js` to align with batch size.
- **Integration**: Refactored `src/index.js` and `src/processor/index.js` to decouple parsing from insertion, allowing `BatchProcessor` to handle the write operations.
## Impact
- **Throughput**: Significantly increased database write throughput via batching.
- **Reliability**: Service is resilient to port conflicts on restart.
- **Functionality**: `0x0E` messages are now correctly processed and stored.

View File

@@ -12,6 +12,9 @@ Backend service for processing RCU action events from Kafka, parsing them, and s
- **Error Handling**: Redis List (`error_queue`) for failed messages + Retry mechanism - **Error Handling**: Redis List (`error_queue`) for failed messages + Retry mechanism
- **Output**: PostgreSQL Table (`rcu_action_events`) - **Output**: PostgreSQL Table (`rcu_action_events`)
## Features
- **Loop Name Enrichment**: Enriches event data with `loop_name` by matching `device_id` and `dev_addr` against metadata cached from `temporary_project` tables (refreshed daily).
## Configuration (Environment Variables) ## Configuration (Environment Variables)
The project is configured via `.env`. Key variables: The project is configured via `.env`. Key variables:
- **Kafka**: `KAFKA_BROKERS`, `KAFKA_TOPIC`, `KAFKA_SASL_USERNAME`, `KAFKA_SASL_PASSWORD` - **Kafka**: `KAFKA_BROKERS`, `KAFKA_TOPIC`, `KAFKA_SASL_USERNAME`, `KAFKA_SASL_PASSWORD`