feat: 添加回路名称字段并实现元数据缓存查询

在 RCU 事件处理中新增回路名称(loop_name)字段,用于标识具体设备回路。
- 在 rcu_action_events 表中添加 loop_name 字段
- 新增项目元数据缓存模块,每日从 temporary_project 表刷新房间与回路信息
- 处理消息时,根据 device_id、dev_addr 等字段查询缓存获取回路名称
- 若缓存未命中,则根据设备类型规则生成兜底名称
- 更新环境变量、文档及测试用例以适配新功能
This commit is contained in:
2026-02-02 19:43:49 +08:00
parent 0e6c5c3cc3
commit 4e0f5213db
12 changed files with 660 additions and 117 deletions

View File

@@ -32,3 +32,5 @@ REDIS_PORT=6379
REDIS_PASSWORD=
REDIS_DB=15
REDIS_CONNECT_TIMEOUT_MS=5000
ACTION_TYPE_DEV_TYPE_RULES='[{"dev_type":0,"name":"无效设备(也可以被认为是场景)","action_type":"无效"},{"dev_type":1,"name":"强电继电器(输出状态)","action_type":"设备回路状态"},{"dev_type":2,"name":"弱电输入(输入状态)","action_type":"用户操作"},{"dev_type":3,"name":"弱电输出(输出状态)","action_type":"设备回路状态"},{"dev_type":4,"name":"服务信息","action_type":"设备回路状态"},{"dev_type":5,"name":"干节点窗帘","action_type":"设备回路状态"},{"dev_type":6,"name":"开关","action_type":"用户操作"},{"dev_type":7,"name":"空调","action_type":"用户操作"},{"dev_type":8,"name":"红外感应","action_type":"用户操作"},{"dev_type":9,"name":"空气质量检测设备","action_type":"设备回路状态"},{"dev_type":10,"name":"插卡取电","action_type":"用户操作"},{"dev_type":11,"name":"地暖","action_type":"用户操作"},{"dev_type":12,"name":"RCU 设备网络 - 没使用","action_type":""},{"dev_type":13,"name":"窗帘","action_type":"设备回路状态"},{"dev_type":14,"name":"继电器","action_type":"设备回路状态"},{"dev_type":15,"name":"红外发送","action_type":"设备回路状态"},{"dev_type":16,"name":"调光驱动","action_type":"设备回路状态"},{"dev_type":17,"name":"可控硅调光(可控硅状态)","action_type":"设备回路状态"},{"dev_type":18,"name":"灯带(灯带状态) --2025-11-24 取消","action_type":"无效"},{"dev_type":19,"name":"中控","action_type":"无效"},{"dev_type":20,"name":"微信锁 (福 瑞狗的蓝牙锁 默认 0 地址)","action_type":"无效"},{"dev_type":21,"name":"背景音乐(背景音乐状态)","action_type":"设备回路状态"},{"dev_type":22,"name":"房态下发","action_type":"无效"},{"dev_type":23,"name":"主机本地 调光","action_type":"无效"},{"dev_type":24,"name":"485PWM 调光( PWM 调光状态)","action_type":"无效"},{"dev_type":25,"name":"总线调光( PBLED 调光状态) - 没使用 -","action_type":"无效"},{"dev_type":26,"name":"RCU 电源","action_type":"无效"},{"dev_type":27,"name":"A9IO 开关","action_type":"用户操作"},{"dev_type":28,"name":"A9IO 扩展","action_type":"设备回路状态"},{"dev_type":29,"name":"A9IO 电源","action_type":"设备回路状态"},{"dev_type":30,"name":"无线网关轮询(用于轮询控制轮询设备;给无线网关下发配置和询问网关状态)","action_type":"无效"},{"dev_type":31,"name":"无线网关主动(用于主动控制主动设备)","action_type":"无效"},{"dev_type":32,"name":"无线门磁","action_type":"用户操作"},{"dev_type":33,"name":"空气参数显示设备","action_type":"设备回路状态"},{"dev_type":34,"name":"无线继电器红外","action_type":"设备回路状态"},{"dev_type":35,"name":"时间同步","action_type":"设备回路状态"},{"dev_type":36,"name":"监控控制","action_type":"无效"},{"dev_type":37,"name":"旋钮开关控制","action_type":"用户操作"},{"dev_type":38,"name":"CSIO - 类型","action_type":"设备回路状态"},{"dev_type":39,"name":"插卡状态虚拟设备","action_type":"设备回路状态"},{"dev_type":40,"name":"485 新风设备","action_type":"用户操作"},{"dev_type":41,"name":"485 人脸机","action_type":"用户操作"},{"dev_type":42,"name":"中控","action_type":"无效"},{"dev_type":43,"name":"域控","action_type":"无效"},{"dev_type":44,"name":"LCD","action_type":"设备回路状态"},{"dev_type":45,"name":"无卡断电 --2025-11-24 取消","action_type":"无效"},{"dev_type":46,"name":"无卡取电 2","action_type":"用户操作"},{"dev_type":47,"name":"虚拟时间设备","action_type":"设备回路状态"},{"dev_type":48,"name":"PLC 总控","action_type":"设备回路状态"},{"dev_type":49,"name":"PLC 设备 - 恒流调光设备","action_type":"设备回路状态"},{"dev_type":50,"name":"PLC 设备 - 恒压调光设备","action_type":"设备回路状态"},{"dev_type":51,"name":"PLC 设备 - 继电器设备","action_type":"设备回路状态"},{"dev_type":52,"name":"色温调节功能","action_type":"设备回路状态"},{"dev_type":53,"name":"蓝牙音频","action_type":"设备回路状态"},{"dev_type":54,"name":"碳达人","action_type":"用户操作"},{"dev_type":55,"name":"场景还原","action_type":"用户操作"},{"dev_type":56,"name":"全局设置","action_type":"设备回路状态"},{"dev_type":57,"name":"能耗检测","action_type":"设备回路状态"},{"dev_type":241,"name":"CSIO - 类型","action_type":"设备回路状态"}]'

View File

@@ -0,0 +1,113 @@
import fs from 'fs';
import path from 'path';
import { fileURLToPath } from 'url';
const __filename = fileURLToPath(import.meta.url);
const __dirname = path.dirname(__filename);
const readmePath = path.resolve(__dirname, '../../docs/readme.md');
const envPath = path.resolve(__dirname, '../.env');
const processorPath = path.resolve(__dirname, '../src/processor/index.js');
try {
const readmeContent = fs.readFileSync(readmePath, 'utf8');
const lines = readmeContent.split('\n');
const rules = [];
let inTable = false;
for (const line of lines) {
const trimmed = line.trim();
// Detect start of table (approximately)
if (trimmed.includes('|dev_type|名称|描述|Action Type|')) {
inTable = true;
continue;
}
// Skip separator line
if (inTable && trimmed.includes('|---|')) {
continue;
}
// Process table rows
if (inTable && trimmed.startsWith('|') && trimmed.endsWith('|')) {
const parts = trimmed.split('|').map(p => p.trim());
// parts[0] is empty, parts[1] is dev_type, parts[2] is name, parts[3] is description, parts[4] is action_type
if (parts.length >= 5) {
const devTypeStr = parts[1];
const description = parts[3];
const actionType = parts[4];
if (!devTypeStr || isNaN(parseInt(devTypeStr))) {
continue;
}
const devType = parseInt(devTypeStr, 10);
rules.push({
dev_type: devType,
name: description, // Use description as name per user request
action_type: actionType
});
}
} else if (inTable && trimmed === '') {
// Empty line might mean end of table, but let's be loose
} else if (inTable && !trimmed.startsWith('|')) {
// End of table
inTable = false;
}
}
// Sort by dev_type
rules.sort((a, b) => a.dev_type - b.dev_type);
console.log(`Found ${rules.length} rules.`);
// 1. Generate JSON for .env
const envJson = JSON.stringify(rules);
// Read existing .env
let envContent = fs.readFileSync(envPath, 'utf8');
const envKey = 'ACTION_TYPE_DEV_TYPE_RULES';
// Replace or Append
const envLine = `${envKey}='${envJson}'`;
const regex = new RegExp(`^${envKey}=.*`, 'm');
if (regex.test(envContent)) {
envContent = envContent.replace(regex, envLine);
} else {
envContent += `\n${envLine}`;
}
fs.writeFileSync(envPath, envContent, 'utf8');
console.log('Updated .env');
// 2. Generate Object for src/processor/index.js
// We need to construct the object string manually to match the code style
const mapLines = rules.map(r => {
// Escape single quotes in name if present
const safeName = r.name.replace(/'/g, "\\'");
return ` ${r.dev_type}: { name: '${safeName}', action: '${r.action_type}' }`;
});
const mapString = `const defaultDevTypeActionMap = {\n${mapLines.join(',\n')}\n};`;
let processorContent = fs.readFileSync(processorPath, 'utf8');
// Regex to replace the object.
const processorRegex = /const defaultDevTypeActionMap = \{[\s\S]*?\};/m;
if (processorRegex.test(processorContent)) {
processorContent = processorContent.replace(processorRegex, mapString);
fs.writeFileSync(processorPath, processorContent, 'utf8');
console.log('Updated src/processor/index.js');
} else {
console.error('Could not find defaultDevTypeActionMap in src/processor/index.js');
}
} catch (err) {
console.error('Error:', err);
}

View File

@@ -27,12 +27,16 @@ CREATE TABLE IF NOT EXISTS rcu_action.rcu_action_events (
type_h SMALLINT,
details JSONB,
extra JSONB,
loop_name VARCHAR(255),
PRIMARY KEY (ts_ms, guid)
) PARTITION BY RANGE (ts_ms);
ALTER TABLE rcu_action.rcu_action_events
ADD COLUMN IF NOT EXISTS device_id VARCHAR(32) NOT NULL DEFAULT '';
ALTER TABLE rcu_action.rcu_action_events
ADD COLUMN IF NOT EXISTS loop_name VARCHAR(255);
-- Indexes for performance
CREATE INDEX IF NOT EXISTS idx_rcu_action_hotel_id ON rcu_action.rcu_action_events (hotel_id);
CREATE INDEX IF NOT EXISTS idx_rcu_action_room_id ON rcu_action.rcu_action_events (room_id);

View File

@@ -0,0 +1,95 @@
import cron from 'node-cron';
import dbManager from '../db/databaseManager.js';
import { logger } from '../utils/logger.js';
class ProjectMetadataCache {
constructor() {
this.roomMap = new Map(); // device_id -> room_type_id
this.loopMap = new Map(); // room_type_id:loop_address -> loop_name
}
async init() {
try {
await this.refresh();
} catch (error) {
logger.error('Initial metadata refresh failed', { error: error.message });
}
// Schedule 1:00 AM daily
cron.schedule('0 1 * * *', () => {
this.refresh().catch(err => logger.error('Scheduled metadata refresh failed', { error: err.message }));
});
}
async refresh() {
logger.info('Refreshing project metadata cache...');
const client = await dbManager.pool.connect();
try {
// Load Rooms
// temporary_project.rooms might be partitioned, but querying parent works.
const roomsRes = await client.query('SELECT device_id, room_type_id FROM temporary_project.rooms');
const newRoomMap = new Map();
for (const row of roomsRes.rows) {
if (row.device_id) {
newRoomMap.set(row.device_id, row.room_type_id);
}
}
// Load Loops
const loopsRes = await client.query('SELECT room_type_id, loop_address, loop_name FROM temporary_project.loops');
const newLoopMap = new Map();
for (const row of loopsRes.rows) {
if (row.room_type_id && row.loop_address) {
// loop_address is varchar, we will key it as string
const key = `${row.room_type_id}:${row.loop_address}`;
newLoopMap.set(key, row.loop_name);
}
}
this.roomMap = newRoomMap;
this.loopMap = newLoopMap;
logger.info('Project metadata cache refreshed', {
roomsCount: this.roomMap.size,
loopsCount: this.loopMap.size
});
} catch (error) {
// If schema/tables don't exist, this will throw.
// We log but don't crash the app, as this is an enhancement feature.
logger.error('Failed to refresh project metadata', { error: error.message });
throw error;
} finally {
client.release();
}
}
/**
* Get loop name for a given device configuration
* @param {string} deviceId - The device ID (from room)
* @param {number|string} devType - The device type
* @param {number|string} devAddr - The device address
* @param {number|string} devLoop - The device loop
* @returns {string|null} - The loop name or null if not found
*/
getLoopName(deviceId, devType, devAddr, devLoop) {
if (!deviceId ||
devType === undefined || devType === null ||
devAddr === undefined || devAddr === null ||
devLoop === undefined || devLoop === null) {
return null;
}
const roomTypeId = this.roomMap.get(deviceId);
if (!roomTypeId) return null;
// Construct loop_address: 3-digit zero-padded concatenation of type, addr, loop
// e.g. type=1, addr=23, loop=12 -> 001023012
const fmt = (val) => String(val).padStart(3, '0');
const loopAddress = `${fmt(devType)}${fmt(devAddr)}${fmt(devLoop)}`;
const key = `${roomTypeId}:${loopAddress}`;
return this.loopMap.get(key) || null;
}
}
export default new ProjectMetadataCache();

View File

@@ -28,7 +28,8 @@ const columns = [
'type_l',
'type_h',
'details',
'extra'
'extra',
'loop_name'
];
export class DatabaseManager {

View File

@@ -3,6 +3,7 @@ import { config } from './config/config.js';
import dbManager from './db/databaseManager.js';
import dbInitializer from './db/initializer.js';
import partitionManager from './db/partitionManager.js';
import projectMetadata from './cache/projectMetadata.js';
import { createKafkaConsumers } from './kafka/consumer.js';
import { processKafkaMessage } from './processor/index.js';
import { createRedisClient } from './redis/redisClient.js';
@@ -15,6 +16,9 @@ const bootstrap = async () => {
// 0. Initialize Database (Create DB, Schema, Table, Partitions)
await dbInitializer.initialize();
// 0.1 Initialize Project Metadata Cache
await projectMetadata.init();
// Metric Collector
const metricCollector = new MetricCollector();

View File

@@ -1,5 +1,6 @@
import { createGuid } from '../utils/uuid.js';
import { kafkaPayloadSchema } from '../schema/kafkaPayload.js';
import projectMetadata from '../cache/projectMetadata.js';
const normalizeDirection = (value) => {
if (!value) return null;
@@ -43,116 +44,103 @@ const resolveMessageType = (direction, cmdWord) => {
};
const defaultDevTypeActionMap = {
0: '无效',
1: '设备回路状态',
2: '用户操作',
3: '设备回路状态',
4: '设备回路状态',
5: '设备回路状态',
6: '用户操作',
7: '用户操作',
8: '用户操作',
9: '设备回路状态',
10: '用户操作',
11: '用户操作',
12: '无效',
13: '设备回路状态',
14: '设备回路状态',
15: '设备回路状态',
16: '设备回路状态',
17: '设备回路状态',
18: '无效',
19: '无效',
20: '无效',
21: '设备回路状态',
22: '无效',
23: '无效',
24: '无效',
25: '无效',
26: '无效',
27: '用户操作',
28: '设备回路状态',
29: '设备回路状态',
30: '无效',
31: '无效',
32: '用户操作',
33: '设备回路状态',
34: '设备回路状态',
35: '设备回路状态',
36: '无效',
37: '用户操作',
38: '设备回路状态',
39: '设备回路状态',
40: '用户操作',
41: '用户操作',
42: '无效',
43: '无效',
44: '设备回路状态',
45: '无效',
46: '用户操作',
47: '设备回路状态',
48: '设备回路状态',
49: '设备回路状态',
50: '设备回路状态',
51: '设备回路状态',
52: '设备回路状态',
53: '设备回路状态',
54: '用户操作',
55: '用户操作',
56: '设备回路状态',
57: '设备回路状态',
241: '设备回路状态'
0: { name: '无效设备(也可以被认为是场景)', action: '无效' },
1: { name: '强电继电器(输出状态)', action: '设备回路状态' },
2: { name: '弱电输入(输入状态)', action: '用户操作' },
3: { name: '弱电输出(输出状态)', action: '设备回路状态' },
4: { name: '服务信息', action: '设备回路状态' },
5: { name: '干节点窗帘', action: '设备回路状态' },
6: { name: '开关', action: '用户操作' },
7: { name: '空调', action: '用户操作' },
8: { name: '红外感应', action: '用户操作' },
9: { name: '空气质量检测设备', action: '设备回路状态' },
10: { name: '插卡取电', action: '用户操作' },
11: { name: '地暖', action: '用户操作' },
12: { name: 'RCU 设备网络 - 没使用', action: '' },
13: { name: '窗帘', action: '设备回路状态' },
14: { name: '继电器', action: '设备回路状态' },
15: { name: '红外发送', action: '设备回路状态' },
16: { name: '调光驱动', action: '设备回路状态' },
17: { name: '可控硅调光(可控硅状态)', action: '设备回路状态' },
18: { name: '灯带(灯带状态) --2025-11-24 取消', action: '无效' },
19: { name: '中控', action: '无效' },
20: { name: '微信锁 (福 瑞狗的蓝牙锁 默认 0 地址)', action: '无效' },
21: { name: '背景音乐(背景音乐状态)', action: '设备回路状态' },
22: { name: '房态下发', action: '无效' },
23: { name: '主机本地 调光', action: '无效' },
24: { name: '485PWM 调光( PWM 调光状态)', action: '无效' },
25: { name: '总线调光( PBLED 调光状态) - 没使用 -', action: '无效' },
26: { name: 'RCU 电源', action: '无效' },
27: { name: 'A9IO 开关', action: '用户操作' },
28: { name: 'A9IO 扩展', action: '设备回路状态' },
29: { name: 'A9IO 电源', action: '设备回路状态' },
30: { name: '无线网关轮询(用于轮询控制轮询设备;给无线网关下发配置和询问网关状态)', action: '无效' },
31: { name: '无线网关主动(用于主动控制主动设备)', action: '无效' },
32: { name: '无线门磁', action: '用户操作' },
33: { name: '空气参数显示设备', action: '设备回路状态' },
34: { name: '无线继电器红外', action: '设备回路状态' },
35: { name: '时间同步', action: '设备回路状态' },
36: { name: '监控控制', action: '无效' },
37: { name: '旋钮开关控制', action: '用户操作' },
38: { name: 'CSIO - 类型', action: '设备回路状态' },
39: { name: '插卡状态虚拟设备', action: '设备回路状态' },
40: { name: '485 新风设备', action: '用户操作' },
41: { name: '485 人脸机', action: '用户操作' },
42: { name: '中控', action: '无效' },
43: { name: '域控', action: '无效' },
44: { name: 'LCD', action: '设备回路状态' },
45: { name: '无卡断电 --2025-11-24 取消', action: '无效' },
46: { name: '无卡取电 2', action: '用户操作' },
47: { name: '虚拟时间设备', action: '设备回路状态' },
48: { name: 'PLC 总控', action: '设备回路状态' },
49: { name: 'PLC 设备 - 恒流调光设备', action: '设备回路状态' },
50: { name: 'PLC 设备 - 恒压调光设备', action: '设备回路状态' },
51: { name: 'PLC 设备 - 继电器设备', action: '设备回路状态' },
52: { name: '色温调节功能', action: '设备回路状态' },
53: { name: '蓝牙音频', action: '设备回路状态' },
54: { name: '碳达人', action: '用户操作' },
55: { name: '场景还原', action: '用户操作' },
56: { name: '全局设置', action: '设备回路状态' },
57: { name: '能耗检测', action: '设备回路状态' },
241: { name: 'CSIO - 类型', action: '设备回路状态' }
};
const buildDevTypeActionMap = () => {
const raw = process.env.ACTION_TYPE_DEV_TYPE_RULES;
if (!raw || typeof raw !== 'string' || !raw.trim()) {
return defaultDevTypeActionMap;
}
// Parse env rules if present
let devTypeActionRules = [];
try {
const parsed = JSON.parse(raw);
if (!Array.isArray(parsed)) {
return defaultDevTypeActionMap;
if (process.env.ACTION_TYPE_DEV_TYPE_RULES) {
const parsed = JSON.parse(process.env.ACTION_TYPE_DEV_TYPE_RULES);
if (Array.isArray(parsed)) {
devTypeActionRules = parsed;
}
}
} catch (error) {
// Silent fallback
}
const overrides = {};
parsed.forEach((item) => {
if (Array.isArray(item) && item.length >= 2) {
const devType = Number(item[0]);
const actionType = item[1];
if (Number.isFinite(devType) && typeof actionType === 'string' && actionType) {
overrides[devType] = actionType;
}
return;
}
const getActionTypeByDevType = (devType) => {
// 1. Env override
const rule = devTypeActionRules.find(r => r.dev_type === devType);
if (rule?.action_type) return rule.action_type;
if (item && typeof item === 'object') {
const devType = Number(item.dev_type ?? item.devType);
const actionType = item.action_type ?? item.actionType ?? item.action;
if (Number.isFinite(devType) && typeof actionType === 'string' && actionType) {
overrides[devType] = actionType;
}
}
});
return { ...defaultDevTypeActionMap, ...overrides };
} catch {
return defaultDevTypeActionMap;
}
// 2. Default map
const entry = defaultDevTypeActionMap[devType];
return entry?.action || '设备回路状态';
};
const devTypeActionMap = buildDevTypeActionMap();
const getDevTypeName = (devType) => {
// 1. Env override
const rule = devTypeActionRules.find(r => r.dev_type === devType);
if (rule?.name) return rule.name;
// 2. Default map
const entry = defaultDevTypeActionMap[devType];
return entry?.name || 'Unknown';
};
const resolveDevTypeAction = (devType) => {
if (typeof devType !== 'number') {
return '设备回路状态';
}
const mapped = devTypeActionMap[devType];
if (mapped) {
return mapped;
}
return '设备回路状态';
if (devType === null || devType === undefined) return '设备回路状态';
return getActionTypeByDevType(devType);
};
const parseKafkaPayload = (value) => {
@@ -247,6 +235,19 @@ export const buildRowsFromPayload = (rawPayload) => {
extra: extra || {}
};
// Helper to generate loop name if not found in cache
const getLoopNameWithFallback = (deviceId, devType, devAddr, devLoop) => {
// 1. Try cache
const cachedName = projectMetadata.getLoopName(deviceId, devType, devAddr, devLoop);
if (cachedName) return cachedName;
// 2. Fallback: [TypeName-Addr-Loop]
const typeName = getDevTypeName(devType);
if (!typeName) return null; // Should have a name if devType is valid
return `[${devType}${typeName}-${devAddr}-${devLoop}]`;
};
const rows = [];
// Logic 1: 0x36 Status/Fault Report
@@ -268,6 +269,7 @@ export const buildRowsFromPayload = (rawPayload) => {
dev_loop: device.dev_loop ?? null,
dev_data: device.dev_data ?? null,
action_type: actionType,
loop_name: getLoopNameWithFallback(deviceId, device.dev_type, device.dev_addr, device.dev_loop),
details
});
});
@@ -287,6 +289,7 @@ export const buildRowsFromPayload = (rawPayload) => {
error_type: fault.error_type ?? null,
error_data: fault.error_data ?? null,
action_type: actionType,
loop_name: getLoopNameWithFallback(deviceId, fault.dev_type, fault.dev_addr, fault.dev_loop),
details
});
});
@@ -322,6 +325,7 @@ export const buildRowsFromPayload = (rawPayload) => {
type_l: control.type_l ?? null,
type_h: control.type_h ?? null,
action_type: '下发控制',
loop_name: getLoopNameWithFallback(deviceId, control.dev_type, control.dev_addr, control.dev_loop),
details
});
});
@@ -340,18 +344,29 @@ export const buildRowsFromPayload = (rawPayload) => {
return rows;
}
// Logic 3: 0x0F ACK or others
const fallbackActionType =
normalizedCmdWord === '0x0f' && normalizedDirection === '上报'
? 'ACK'
: '无效';
return [{
// 3. 0x0F ACK
else if (messageType === '0FACK') {
const { control_list: controls = [] } = payload;
if (Array.isArray(controls)) {
const details = { control_list: controls };
controls.forEach((control) => {
rows.push({
...commonFields,
guid: createGuid(),
action_type: fallbackActionType,
details: {}
}];
dev_type: control.dev_type ?? null,
dev_addr: control.dev_addr ?? null,
dev_loop: control.dev_loop ?? null,
dev_data: control.dev_data ?? null,
type_h: control.type_h ?? null,
action_type: '设备回路状态',
loop_name: getLoopNameWithFallback(deviceId, control.dev_type, control.dev_addr, control.dev_loop),
details
});
});
}
}
return rows;
};
export const processKafkaMessage = async ({ message, dbManager, config }) => {

View File

@@ -1,5 +1,6 @@
import { describe, it, expect } from 'vitest';
import { buildRowsFromPayload } from '../src/processor/index.js';
import projectMetadata from '../src/cache/projectMetadata.js';
describe('Processor Logic', () => {
const basePayload = {
@@ -95,12 +96,15 @@ describe('Processor Logic', () => {
const payload = {
...basePayload,
direction: '上报',
cmd_word: '0x0F'
cmd_word: '0x0F',
control_list: [
{ dev_type: 1, dev_addr: 1, dev_loop: 1, dev_data: 1, type_h: 0 }
]
};
const rows = buildRowsFromPayload(payload);
expect(rows).toHaveLength(1);
expect(rows[0].action_type).toBe('ACK');
expect(rows[0].action_type).toBe('设备回路状态');
});
it('should fallback when lists are empty for 0x36', () => {
@@ -199,4 +203,27 @@ describe('Processor Logic', () => {
trace_id: 'trace-123'
});
});
it('should enrich rows with loop_name from metadata', () => {
// Mock metadata
projectMetadata.roomMap.set('dev_001', 101);
// Key format: roomTypeId:00Type00Addr00Loop
// type=1, addr=10, loop=1 -> 001010001
projectMetadata.loopMap.set('101:001010001', 'Main Chandelier');
const payload = {
...basePayload,
direction: '上报',
cmd_word: '0x36',
device_list: [
{ dev_type: 1, dev_addr: 10, dev_loop: 1, dev_data: 100 }, // Should match 001010001
{ dev_type: 1, dev_addr: 10, dev_loop: 2, dev_data: 0 } // Should not match (001010002) -> Fallback
]
};
const rows = buildRowsFromPayload(payload);
expect(rows[0].loop_name).toBe('Main Chandelier');
// dev_type 1 is 'Dev_Host_HVout'
expect(rows[1].loop_name).toBe('[1Dev_Host_HVout-10-2]');
});
});

View File

@@ -76,3 +76,19 @@ ACK (待补充)
5. 队列结构
队列分区数6
Topicblwlog4Nodejs-rcu-action-topic
6. 入库前特殊操作
- 定期从temporary_project表中读取数据并保存到`内存`以全局变量的形式每天凌晨1点从数据库更新一次。
- 每条数据写库之前需要根据项目ID从内存中读取项目信息。
- 我需要在现有数据表`rcu_action_events`里,添加一个字段:`loop_name`,用于存储回路名称。
- 查询`loop_name`的方法是:
- 根据要插入的数据中的`device_id``rooms`表中找出对应的房间 -> 得到 `room_type_id`
- 根据 `room_type_id``loop_address``loops` 表中查找对应的 `loop_name`
- `loop_address` 的生成规则:将数据的 `dev_type``dev_addr``dev_loop` 分别转换为 3 位字符串(不足前方补 0然后拼接。
- 例如:`dev_type=1, dev_addr=23, dev_loop=12` -> `001` + `023` + `012` -> `001023012`
- **兜底逻辑**:如果根据上述规则在 `loops` 缓存中未找到对应的 `loop_name`,则使用 `dev_type` 对应的设备名称(配置在 `ACTION_TYPE_DEV_TYPE_RULES` 中)默认名称。
- 格式:`[dev_type名称+'-'+dev_addr+'-'+dev_loop]`
- 例如:`dev_type=35` (名称: TimeCtrl), `addr=14`, `loop=21` -> `[35TimeCtrl-14-21]`
- 最后将找到的或生成的 `loop_name` 写入 `rcu_action_events` 表。
- 注意,所有查库操作都要通过内存缓存来实现。

231
docs/temporary_project.sql Normal file
View File

@@ -0,0 +1,231 @@
/*
Navicat Premium Dump SQL
Source Server : FnOS 109
Source Server Type : PostgreSQL
Source Server Version : 150004 (150004)
Source Host : 10.8.8.109:5433
Source Catalog : log_platform
Source Schema : temporary_project
Target Server Type : PostgreSQL
Target Server Version : 150004 (150004)
File Encoding : 65001
Date: 02/02/2026 14:28:41
*/
-- ----------------------------
-- Table structure for hotels
-- ----------------------------
DROP TABLE IF EXISTS "temporary_project"."hotels";
CREATE TABLE "temporary_project"."hotels" (
"guid" varchar(32) COLLATE "pg_catalog"."default" NOT NULL,
"hotel_id" int4 NOT NULL,
"hotel_name" varchar(255) COLLATE "pg_catalog"."default",
"id" int4
)
;
-- ----------------------------
-- Table structure for loops_default
-- ----------------------------
DROP TABLE IF EXISTS "temporary_project"."loops_default";
CREATE TABLE "temporary_project"."loops_default" (
"guid" varchar(32) COLLATE "pg_catalog"."default" NOT NULL,
"id" int4 NOT NULL,
"loop_name" varchar(255) COLLATE "pg_catalog"."default",
"room_type_id" int4 NOT NULL,
"loop_address" varchar(255) COLLATE "pg_catalog"."default",
"loop_type" varchar(50) COLLATE "pg_catalog"."default"
)
;
-- ----------------------------
-- Table structure for room_type
-- ----------------------------
DROP TABLE IF EXISTS "temporary_project"."room_type";
CREATE TABLE "temporary_project"."room_type" (
"guid" varchar(32) COLLATE "pg_catalog"."default" NOT NULL,
"id" int4 NOT NULL,
"room_type_name" varchar(255) COLLATE "pg_catalog"."default",
"hotel_id" int4
)
;
-- ----------------------------
-- Table structure for rooms_default
-- ----------------------------
DROP TABLE IF EXISTS "temporary_project"."rooms_default";
CREATE TABLE "temporary_project"."rooms_default" (
"guid" varchar(32) COLLATE "pg_catalog"."default" NOT NULL,
"hotel_id" int4 NOT NULL,
"room_id" varchar(255) COLLATE "pg_catalog"."default" NOT NULL,
"room_type_id" int4,
"device_id" varchar(50) COLLATE "pg_catalog"."default",
"mac" varchar(50) COLLATE "pg_catalog"."default",
"id" int4
)
;
-- ----------------------------
-- Table structure for loops
-- ----------------------------
DROP TABLE IF EXISTS "temporary_project"."loops";
CREATE TABLE "temporary_project"."loops" (
"guid" varchar(32) COLLATE "pg_catalog"."default" NOT NULL,
"id" int4 NOT NULL,
"loop_name" varchar(255) COLLATE "pg_catalog"."default",
"room_type_id" int4 NOT NULL,
"loop_address" varchar(255) COLLATE "pg_catalog"."default",
"loop_type" varchar(50) COLLATE "pg_catalog"."default"
)
PARTITION BY (
)
;
-- ----------------------------
-- Table structure for rooms
-- ----------------------------
DROP TABLE IF EXISTS "temporary_project"."rooms";
CREATE TABLE "temporary_project"."rooms" (
"guid" varchar(32) COLLATE "pg_catalog"."default" NOT NULL,
"hotel_id" int4 NOT NULL,
"room_id" varchar(255) COLLATE "pg_catalog"."default" NOT NULL,
"room_type_id" int4,
"device_id" varchar(50) COLLATE "pg_catalog"."default",
"mac" varchar(50) COLLATE "pg_catalog"."default",
"id" int4
)
PARTITION BY LIST (
"hotel_id" "pg_catalog"."int4_ops"
)
;
ALTER TABLE "temporary_project"."rooms" ATTACH PARTITION "temporary_project"."rooms_default" DEFAULT;
-- ----------------------------
-- Indexes structure for table hotels
-- ----------------------------
CREATE INDEX "idx_hotels_hotel_id" ON "temporary_project"."hotels" USING btree (
"hotel_id" "pg_catalog"."int4_ops" ASC NULLS LAST
);
CREATE INDEX "idx_hotels_hotel_name" ON "temporary_project"."hotels" USING btree (
"hotel_name" COLLATE "pg_catalog"."default" "pg_catalog"."text_ops" ASC NULLS LAST
);
CREATE INDEX "idx_hotels_id" ON "temporary_project"."hotels" USING btree (
"id" "pg_catalog"."int4_ops" ASC NULLS LAST
);
-- ----------------------------
-- Primary Key structure for table hotels
-- ----------------------------
ALTER TABLE "temporary_project"."hotels" ADD CONSTRAINT "hotels_pkey" PRIMARY KEY ("hotel_id", "guid");
-- ----------------------------
-- Indexes structure for table loops_default
-- ----------------------------
CREATE INDEX "loops_default_id_idx" ON "temporary_project"."loops_default" USING btree (
"id" "pg_catalog"."int4_ops" ASC NULLS LAST
);
CREATE INDEX "loops_default_loop_address_idx" ON "temporary_project"."loops_default" USING btree (
"loop_address" COLLATE "pg_catalog"."default" "pg_catalog"."text_ops" ASC NULLS LAST
);
CREATE INDEX "loops_default_loop_name_idx" ON "temporary_project"."loops_default" USING btree (
"loop_name" COLLATE "pg_catalog"."default" "pg_catalog"."text_ops" ASC NULLS LAST
);
CREATE INDEX "loops_default_loop_type_idx" ON "temporary_project"."loops_default" USING btree (
"loop_type" COLLATE "pg_catalog"."default" "pg_catalog"."text_ops" ASC NULLS LAST
);
CREATE INDEX "loops_default_room_type_id_idx" ON "temporary_project"."loops_default" USING btree (
"room_type_id" "pg_catalog"."int4_ops" ASC NULLS LAST
);
-- ----------------------------
-- Primary Key structure for table loops_default
-- ----------------------------
ALTER TABLE "temporary_project"."loops_default" ADD CONSTRAINT "loops_default_pkey" PRIMARY KEY ("guid", "id", "room_type_id");
-- ----------------------------
-- Indexes structure for table room_type
-- ----------------------------
CREATE INDEX "idx_room_type_hotel_id" ON "temporary_project"."room_type" USING btree (
"hotel_id" "pg_catalog"."int4_ops" ASC NULLS LAST
);
CREATE INDEX "idx_room_type_id" ON "temporary_project"."room_type" USING btree (
"id" "pg_catalog"."int4_ops" ASC NULLS LAST
);
CREATE INDEX "idx_room_type_name" ON "temporary_project"."room_type" USING btree (
"room_type_name" COLLATE "pg_catalog"."default" "pg_catalog"."text_ops" ASC NULLS LAST
);
-- ----------------------------
-- Primary Key structure for table room_type
-- ----------------------------
ALTER TABLE "temporary_project"."room_type" ADD CONSTRAINT "room_type_pkey" PRIMARY KEY ("guid", "id");
-- ----------------------------
-- Indexes structure for table rooms_default
-- ----------------------------
CREATE INDEX "rooms_default_device_id_idx" ON "temporary_project"."rooms_default" USING btree (
"device_id" COLLATE "pg_catalog"."default" "pg_catalog"."text_ops" ASC NULLS LAST
);
CREATE INDEX "rooms_default_hotel_id_idx" ON "temporary_project"."rooms_default" USING btree (
"hotel_id" "pg_catalog"."int4_ops" ASC NULLS LAST
);
CREATE INDEX "rooms_default_id_idx" ON "temporary_project"."rooms_default" USING btree (
"id" "pg_catalog"."int4_ops" ASC NULLS LAST
);
CREATE INDEX "rooms_default_mac_idx" ON "temporary_project"."rooms_default" USING btree (
"mac" COLLATE "pg_catalog"."default" "pg_catalog"."text_ops" ASC NULLS LAST
);
-- ----------------------------
-- Primary Key structure for table rooms_default
-- ----------------------------
ALTER TABLE "temporary_project"."rooms_default" ADD CONSTRAINT "rooms_default_pkey" PRIMARY KEY ("guid", "hotel_id", "room_id");
-- ----------------------------
-- Indexes structure for table loops
-- ----------------------------
CREATE INDEX "idx_loops_address" ON "temporary_project"."loops" USING btree (
"loop_address" COLLATE "pg_catalog"."default" "pg_catalog"."text_ops" ASC NULLS LAST
);
CREATE INDEX "idx_loops_id" ON "temporary_project"."loops" USING btree (
"id" "pg_catalog"."int4_ops" ASC NULLS LAST
);
CREATE INDEX "idx_loops_name" ON "temporary_project"."loops" USING btree (
"loop_name" COLLATE "pg_catalog"."default" "pg_catalog"."text_ops" ASC NULLS LAST
);
CREATE INDEX "idx_loops_room_type_id" ON "temporary_project"."loops" USING btree (
"room_type_id" "pg_catalog"."int4_ops" ASC NULLS LAST
);
CREATE INDEX "idx_loops_type" ON "temporary_project"."loops" USING btree (
"loop_type" COLLATE "pg_catalog"."default" "pg_catalog"."text_ops" ASC NULLS LAST
);
-- ----------------------------
-- Primary Key structure for table loops
-- ----------------------------
ALTER TABLE "temporary_project"."loops" ADD CONSTRAINT "loops_pkey" PRIMARY KEY ("guid", "id", "room_type_id");
-- ----------------------------
-- Indexes structure for table rooms
-- ----------------------------
CREATE INDEX "idx_rooms_device_id" ON "temporary_project"."rooms" USING btree (
"device_id" COLLATE "pg_catalog"."default" "pg_catalog"."text_ops" ASC NULLS LAST
);
CREATE INDEX "idx_rooms_hotel_id" ON "temporary_project"."rooms" USING btree (
"hotel_id" "pg_catalog"."int4_ops" ASC NULLS LAST
);
CREATE INDEX "idx_rooms_id" ON "temporary_project"."rooms" USING btree (
"id" "pg_catalog"."int4_ops" ASC NULLS LAST
);
CREATE INDEX "idx_rooms_mac" ON "temporary_project"."rooms" USING btree (
"mac" COLLATE "pg_catalog"."default" "pg_catalog"."text_ops" ASC NULLS LAST
);
-- ----------------------------
-- Primary Key structure for table rooms
-- ----------------------------
ALTER TABLE "temporary_project"."rooms" ADD CONSTRAINT "rooms_pkey" PRIMARY KEY ("guid", "hotel_id", "room_id");

View File

@@ -0,0 +1,32 @@
# Feature: Loop Name Enrichment
**Status**: Proposed
**Date**: 2026-02-02
## Summary
Enrich incoming RCU action events with `loop_name` by looking up metadata from `temporary_project` tables. This allows easier identification of specific device loops (e.g., "Main Chandelier") in the event log.
## Requirements
1. **Cache Mechanism**:
- Load `rooms` and `loops` data from `temporary_project` schema into memory.
- Refresh cache daily at 1:00 AM.
2. **Enrichment**:
- For each incoming event, look up `loop_name` using `device_id` and `dev_addr`.
- `device_id` -> `room_type_id` (via `rooms` table).
- `room_type_id` + `dev_addr` -> `loop_name` (via `loops` table).
3. **Storage**:
- Store `loop_name` in `rcu_action_events` table.
## Ambiguity Resolution
- The requirement mentioned matching `dev_type` to find the loop. However, standard RCU addressing uses `dev_addr` (and `dev_loop`). We assume `loops.loop_address` corresponds to the packet's `dev_addr` (converted to string).
- We will attempt to match `dev_addr` against `loop_address`.
## Schema Changes
- **Table**: `rcu_action.rcu_action_events`
- **Column**: `loop_name` (VARCHAR(255), Nullable)
## Implementation Plan
1. **Database**: Update `init_db.sql` and `databaseManager.js`.
2. **Cache**: Create `src/cache/projectMetadata.js`.
3. **Processor**: Integrate cache lookup in `src/processor/index.js`.
4. **Lifecycle**: Initialize cache in `src/index.js`.

View File

@@ -12,6 +12,9 @@ Backend service for processing RCU action events from Kafka, parsing them, and s
- **Error Handling**: Redis List (`error_queue`) for failed messages + Retry mechanism
- **Output**: PostgreSQL Table (`rcu_action_events`)
## Features
- **Loop Name Enrichment**: Enriches event data with `loop_name` by matching `device_id` and `dev_addr` against metadata cached from `temporary_project` tables (refreshed daily).
## Configuration (Environment Variables)
The project is configured via `.env`. Key variables:
- **Kafka**: `KAFKA_BROKERS`, `KAFKA_TOPIC`, `KAFKA_SASL_USERNAME`, `KAFKA_SASL_PASSWORD`