From 4e0f5213dbe15c7fbb5b14e39d208c2988c1af54 Mon Sep 17 00:00:00 2001 From: XuJiacheng Date: Mon, 2 Feb 2026 19:43:49 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=B7=BB=E5=8A=A0=E5=9B=9E=E8=B7=AF?= =?UTF-8?q?=E5=90=8D=E7=A7=B0=E5=AD=97=E6=AE=B5=E5=B9=B6=E5=AE=9E=E7=8E=B0?= =?UTF-8?q?=E5=85=83=E6=95=B0=E6=8D=AE=E7=BC=93=E5=AD=98=E6=9F=A5=E8=AF=A2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 在 RCU 事件处理中新增回路名称(loop_name)字段,用于标识具体设备回路。 - 在 rcu_action_events 表中添加 loop_name 字段 - 新增项目元数据缓存模块,每日从 temporary_project 表刷新房间与回路信息 - 处理消息时,根据 device_id、dev_addr 等字段查询缓存获取回路名称 - 若缓存未命中,则根据设备类型规则生成兜底名称 - 更新环境变量、文档及测试用例以适配新功能 --- bls-rcu-action-backend/.env | 2 + .../scripts/generate_rules_from_readme.js | 113 ++++++++ bls-rcu-action-backend/scripts/init_db.sql | 4 + .../src/cache/projectMetadata.js | 95 +++++++ .../src/db/databaseManager.js | 3 +- bls-rcu-action-backend/src/index.js | 4 + bls-rcu-action-backend/src/processor/index.js | 241 ++++++++++-------- .../tests/processor.test.js | 31 ++- docs/project.md | 18 +- docs/temporary_project.sql | 231 +++++++++++++++++ .../changes/feature-loop-name-enrichment.md | 32 +++ openspec/project.md | 3 + 12 files changed, 660 insertions(+), 117 deletions(-) create mode 100644 bls-rcu-action-backend/scripts/generate_rules_from_readme.js create mode 100644 bls-rcu-action-backend/src/cache/projectMetadata.js create mode 100644 docs/temporary_project.sql create mode 100644 openspec/changes/feature-loop-name-enrichment.md diff --git a/bls-rcu-action-backend/.env b/bls-rcu-action-backend/.env index a031c85..c676618 100644 --- a/bls-rcu-action-backend/.env +++ b/bls-rcu-action-backend/.env @@ -32,3 +32,5 @@ REDIS_PORT=6379 REDIS_PASSWORD= REDIS_DB=15 REDIS_CONNECT_TIMEOUT_MS=5000 + +ACTION_TYPE_DEV_TYPE_RULES='[{"dev_type":0,"name":"无效设备(也可以被认为是场景)","action_type":"无效"},{"dev_type":1,"name":"强电继电器(输出状态)","action_type":"设备回路状态"},{"dev_type":2,"name":"弱电输入(输入状态)","action_type":"用户操作"},{"dev_type":3,"name":"弱电输出(输出状态)","action_type":"设备回路状态"},{"dev_type":4,"name":"服务信息","action_type":"设备回路状态"},{"dev_type":5,"name":"干节点窗帘","action_type":"设备回路状态"},{"dev_type":6,"name":"开关","action_type":"用户操作"},{"dev_type":7,"name":"空调","action_type":"用户操作"},{"dev_type":8,"name":"红外感应","action_type":"用户操作"},{"dev_type":9,"name":"空气质量检测设备","action_type":"设备回路状态"},{"dev_type":10,"name":"插卡取电","action_type":"用户操作"},{"dev_type":11,"name":"地暖","action_type":"用户操作"},{"dev_type":12,"name":"RCU 设备网络 - 没使用","action_type":""},{"dev_type":13,"name":"窗帘","action_type":"设备回路状态"},{"dev_type":14,"name":"继电器","action_type":"设备回路状态"},{"dev_type":15,"name":"红外发送","action_type":"设备回路状态"},{"dev_type":16,"name":"调光驱动","action_type":"设备回路状态"},{"dev_type":17,"name":"可控硅调光(可控硅状态)","action_type":"设备回路状态"},{"dev_type":18,"name":"灯带(灯带状态) --2025-11-24 取消","action_type":"无效"},{"dev_type":19,"name":"中控","action_type":"无效"},{"dev_type":20,"name":"微信锁 (福 瑞狗的蓝牙锁 默认 0 地址)","action_type":"无效"},{"dev_type":21,"name":"背景音乐(背景音乐状态)","action_type":"设备回路状态"},{"dev_type":22,"name":"房态下发","action_type":"无效"},{"dev_type":23,"name":"主机本地 调光","action_type":"无效"},{"dev_type":24,"name":"485PWM 调光( PWM 调光状态)","action_type":"无效"},{"dev_type":25,"name":"总线调光( PBLED 调光状态) - 没使用 -","action_type":"无效"},{"dev_type":26,"name":"RCU 电源","action_type":"无效"},{"dev_type":27,"name":"A9IO 开关","action_type":"用户操作"},{"dev_type":28,"name":"A9IO 扩展","action_type":"设备回路状态"},{"dev_type":29,"name":"A9IO 电源","action_type":"设备回路状态"},{"dev_type":30,"name":"无线网关轮询(用于轮询控制轮询设备;给无线网关下发配置和询问网关状态)","action_type":"无效"},{"dev_type":31,"name":"无线网关主动(用于主动控制主动设备)","action_type":"无效"},{"dev_type":32,"name":"无线门磁","action_type":"用户操作"},{"dev_type":33,"name":"空气参数显示设备","action_type":"设备回路状态"},{"dev_type":34,"name":"无线继电器红外","action_type":"设备回路状态"},{"dev_type":35,"name":"时间同步","action_type":"设备回路状态"},{"dev_type":36,"name":"监控控制","action_type":"无效"},{"dev_type":37,"name":"旋钮开关控制","action_type":"用户操作"},{"dev_type":38,"name":"CSIO - 类型","action_type":"设备回路状态"},{"dev_type":39,"name":"插卡状态虚拟设备","action_type":"设备回路状态"},{"dev_type":40,"name":"485 新风设备","action_type":"用户操作"},{"dev_type":41,"name":"485 人脸机","action_type":"用户操作"},{"dev_type":42,"name":"中控","action_type":"无效"},{"dev_type":43,"name":"域控","action_type":"无效"},{"dev_type":44,"name":"LCD","action_type":"设备回路状态"},{"dev_type":45,"name":"无卡断电 --2025-11-24 取消","action_type":"无效"},{"dev_type":46,"name":"无卡取电 2","action_type":"用户操作"},{"dev_type":47,"name":"虚拟时间设备","action_type":"设备回路状态"},{"dev_type":48,"name":"PLC 总控","action_type":"设备回路状态"},{"dev_type":49,"name":"PLC 设备 - 恒流调光设备","action_type":"设备回路状态"},{"dev_type":50,"name":"PLC 设备 - 恒压调光设备","action_type":"设备回路状态"},{"dev_type":51,"name":"PLC 设备 - 继电器设备","action_type":"设备回路状态"},{"dev_type":52,"name":"色温调节功能","action_type":"设备回路状态"},{"dev_type":53,"name":"蓝牙音频","action_type":"设备回路状态"},{"dev_type":54,"name":"碳达人","action_type":"用户操作"},{"dev_type":55,"name":"场景还原","action_type":"用户操作"},{"dev_type":56,"name":"全局设置","action_type":"设备回路状态"},{"dev_type":57,"name":"能耗检测","action_type":"设备回路状态"},{"dev_type":241,"name":"CSIO - 类型","action_type":"设备回路状态"}]' diff --git a/bls-rcu-action-backend/scripts/generate_rules_from_readme.js b/bls-rcu-action-backend/scripts/generate_rules_from_readme.js new file mode 100644 index 0000000..fc54639 --- /dev/null +++ b/bls-rcu-action-backend/scripts/generate_rules_from_readme.js @@ -0,0 +1,113 @@ +import fs from 'fs'; +import path from 'path'; +import { fileURLToPath } from 'url'; + +const __filename = fileURLToPath(import.meta.url); +const __dirname = path.dirname(__filename); + +const readmePath = path.resolve(__dirname, '../../docs/readme.md'); +const envPath = path.resolve(__dirname, '../.env'); +const processorPath = path.resolve(__dirname, '../src/processor/index.js'); + +try { + const readmeContent = fs.readFileSync(readmePath, 'utf8'); + const lines = readmeContent.split('\n'); + + const rules = []; + let inTable = false; + + for (const line of lines) { + const trimmed = line.trim(); + + // Detect start of table (approximately) + if (trimmed.includes('|dev_type|名称|描述|Action Type|')) { + inTable = true; + continue; + } + + // Skip separator line + if (inTable && trimmed.includes('|---|')) { + continue; + } + + // Process table rows + if (inTable && trimmed.startsWith('|') && trimmed.endsWith('|')) { + const parts = trimmed.split('|').map(p => p.trim()); + // parts[0] is empty, parts[1] is dev_type, parts[2] is name, parts[3] is description, parts[4] is action_type + + if (parts.length >= 5) { + const devTypeStr = parts[1]; + const description = parts[3]; + const actionType = parts[4]; + + if (!devTypeStr || isNaN(parseInt(devTypeStr))) { + continue; + } + + const devType = parseInt(devTypeStr, 10); + + rules.push({ + dev_type: devType, + name: description, // Use description as name per user request + action_type: actionType + }); + } + } else if (inTable && trimmed === '') { + // Empty line might mean end of table, but let's be loose + } else if (inTable && !trimmed.startsWith('|')) { + // End of table + inTable = false; + } + } + + // Sort by dev_type + rules.sort((a, b) => a.dev_type - b.dev_type); + + console.log(`Found ${rules.length} rules.`); + + // 1. Generate JSON for .env + const envJson = JSON.stringify(rules); + + // Read existing .env + let envContent = fs.readFileSync(envPath, 'utf8'); + const envKey = 'ACTION_TYPE_DEV_TYPE_RULES'; + + // Replace or Append + const envLine = `${envKey}='${envJson}'`; + const regex = new RegExp(`^${envKey}=.*`, 'm'); + + if (regex.test(envContent)) { + envContent = envContent.replace(regex, envLine); + } else { + envContent += `\n${envLine}`; + } + + fs.writeFileSync(envPath, envContent, 'utf8'); + console.log('Updated .env'); + + // 2. Generate Object for src/processor/index.js + // We need to construct the object string manually to match the code style + const mapLines = rules.map(r => { + // Escape single quotes in name if present + const safeName = r.name.replace(/'/g, "\\'"); + return ` ${r.dev_type}: { name: '${safeName}', action: '${r.action_type}' }`; + }); + + const mapString = `const defaultDevTypeActionMap = {\n${mapLines.join(',\n')}\n};`; + + let processorContent = fs.readFileSync(processorPath, 'utf8'); + + // Regex to replace the object. + const processorRegex = /const defaultDevTypeActionMap = \{[\s\S]*?\};/m; + + if (processorRegex.test(processorContent)) { + processorContent = processorContent.replace(processorRegex, mapString); + fs.writeFileSync(processorPath, processorContent, 'utf8'); + console.log('Updated src/processor/index.js'); + } else { + console.error('Could not find defaultDevTypeActionMap in src/processor/index.js'); + } + +} catch (err) { + console.error('Error:', err); +} diff --git a/bls-rcu-action-backend/scripts/init_db.sql b/bls-rcu-action-backend/scripts/init_db.sql index 83a5af6..a80cc10 100644 --- a/bls-rcu-action-backend/scripts/init_db.sql +++ b/bls-rcu-action-backend/scripts/init_db.sql @@ -27,12 +27,16 @@ CREATE TABLE IF NOT EXISTS rcu_action.rcu_action_events ( type_h SMALLINT, details JSONB, extra JSONB, + loop_name VARCHAR(255), PRIMARY KEY (ts_ms, guid) ) PARTITION BY RANGE (ts_ms); ALTER TABLE rcu_action.rcu_action_events ADD COLUMN IF NOT EXISTS device_id VARCHAR(32) NOT NULL DEFAULT ''; +ALTER TABLE rcu_action.rcu_action_events + ADD COLUMN IF NOT EXISTS loop_name VARCHAR(255); + -- Indexes for performance CREATE INDEX IF NOT EXISTS idx_rcu_action_hotel_id ON rcu_action.rcu_action_events (hotel_id); CREATE INDEX IF NOT EXISTS idx_rcu_action_room_id ON rcu_action.rcu_action_events (room_id); diff --git a/bls-rcu-action-backend/src/cache/projectMetadata.js b/bls-rcu-action-backend/src/cache/projectMetadata.js new file mode 100644 index 0000000..3b510b7 --- /dev/null +++ b/bls-rcu-action-backend/src/cache/projectMetadata.js @@ -0,0 +1,95 @@ +import cron from 'node-cron'; +import dbManager from '../db/databaseManager.js'; +import { logger } from '../utils/logger.js'; + +class ProjectMetadataCache { + constructor() { + this.roomMap = new Map(); // device_id -> room_type_id + this.loopMap = new Map(); // room_type_id:loop_address -> loop_name + } + + async init() { + try { + await this.refresh(); + } catch (error) { + logger.error('Initial metadata refresh failed', { error: error.message }); + } + + // Schedule 1:00 AM daily + cron.schedule('0 1 * * *', () => { + this.refresh().catch(err => logger.error('Scheduled metadata refresh failed', { error: err.message })); + }); + } + + async refresh() { + logger.info('Refreshing project metadata cache...'); + const client = await dbManager.pool.connect(); + try { + // Load Rooms + // temporary_project.rooms might be partitioned, but querying parent works. + const roomsRes = await client.query('SELECT device_id, room_type_id FROM temporary_project.rooms'); + const newRoomMap = new Map(); + for (const row of roomsRes.rows) { + if (row.device_id) { + newRoomMap.set(row.device_id, row.room_type_id); + } + } + + // Load Loops + const loopsRes = await client.query('SELECT room_type_id, loop_address, loop_name FROM temporary_project.loops'); + const newLoopMap = new Map(); + for (const row of loopsRes.rows) { + if (row.room_type_id && row.loop_address) { + // loop_address is varchar, we will key it as string + const key = `${row.room_type_id}:${row.loop_address}`; + newLoopMap.set(key, row.loop_name); + } + } + + this.roomMap = newRoomMap; + this.loopMap = newLoopMap; + logger.info('Project metadata cache refreshed', { + roomsCount: this.roomMap.size, + loopsCount: this.loopMap.size + }); + + } catch (error) { + // If schema/tables don't exist, this will throw. + // We log but don't crash the app, as this is an enhancement feature. + logger.error('Failed to refresh project metadata', { error: error.message }); + throw error; + } finally { + client.release(); + } + } + + /** + * Get loop name for a given device configuration + * @param {string} deviceId - The device ID (from room) + * @param {number|string} devType - The device type + * @param {number|string} devAddr - The device address + * @param {number|string} devLoop - The device loop + * @returns {string|null} - The loop name or null if not found + */ + getLoopName(deviceId, devType, devAddr, devLoop) { + if (!deviceId || + devType === undefined || devType === null || + devAddr === undefined || devAddr === null || + devLoop === undefined || devLoop === null) { + return null; + } + + const roomTypeId = this.roomMap.get(deviceId); + if (!roomTypeId) return null; + + // Construct loop_address: 3-digit zero-padded concatenation of type, addr, loop + // e.g. type=1, addr=23, loop=12 -> 001023012 + const fmt = (val) => String(val).padStart(3, '0'); + const loopAddress = `${fmt(devType)}${fmt(devAddr)}${fmt(devLoop)}`; + + const key = `${roomTypeId}:${loopAddress}`; + return this.loopMap.get(key) || null; + } +} + +export default new ProjectMetadataCache(); diff --git a/bls-rcu-action-backend/src/db/databaseManager.js b/bls-rcu-action-backend/src/db/databaseManager.js index 15ce685..75d0bbf 100644 --- a/bls-rcu-action-backend/src/db/databaseManager.js +++ b/bls-rcu-action-backend/src/db/databaseManager.js @@ -28,7 +28,8 @@ const columns = [ 'type_l', 'type_h', 'details', - 'extra' + 'extra', + 'loop_name' ]; export class DatabaseManager { diff --git a/bls-rcu-action-backend/src/index.js b/bls-rcu-action-backend/src/index.js index 5fff6b9..d739687 100644 --- a/bls-rcu-action-backend/src/index.js +++ b/bls-rcu-action-backend/src/index.js @@ -3,6 +3,7 @@ import { config } from './config/config.js'; import dbManager from './db/databaseManager.js'; import dbInitializer from './db/initializer.js'; import partitionManager from './db/partitionManager.js'; +import projectMetadata from './cache/projectMetadata.js'; import { createKafkaConsumers } from './kafka/consumer.js'; import { processKafkaMessage } from './processor/index.js'; import { createRedisClient } from './redis/redisClient.js'; @@ -14,6 +15,9 @@ import { logger } from './utils/logger.js'; const bootstrap = async () => { // 0. Initialize Database (Create DB, Schema, Table, Partitions) await dbInitializer.initialize(); + + // 0.1 Initialize Project Metadata Cache + await projectMetadata.init(); // Metric Collector const metricCollector = new MetricCollector(); diff --git a/bls-rcu-action-backend/src/processor/index.js b/bls-rcu-action-backend/src/processor/index.js index 9692df1..f7c6928 100644 --- a/bls-rcu-action-backend/src/processor/index.js +++ b/bls-rcu-action-backend/src/processor/index.js @@ -1,5 +1,6 @@ import { createGuid } from '../utils/uuid.js'; import { kafkaPayloadSchema } from '../schema/kafkaPayload.js'; +import projectMetadata from '../cache/projectMetadata.js'; const normalizeDirection = (value) => { if (!value) return null; @@ -43,116 +44,103 @@ const resolveMessageType = (direction, cmdWord) => { }; const defaultDevTypeActionMap = { - 0: '无效', - 1: '设备回路状态', - 2: '用户操作', - 3: '设备回路状态', - 4: '设备回路状态', - 5: '设备回路状态', - 6: '用户操作', - 7: '用户操作', - 8: '用户操作', - 9: '设备回路状态', - 10: '用户操作', - 11: '用户操作', - 12: '无效', - 13: '设备回路状态', - 14: '设备回路状态', - 15: '设备回路状态', - 16: '设备回路状态', - 17: '设备回路状态', - 18: '无效', - 19: '无效', - 20: '无效', - 21: '设备回路状态', - 22: '无效', - 23: '无效', - 24: '无效', - 25: '无效', - 26: '无效', - 27: '用户操作', - 28: '设备回路状态', - 29: '设备回路状态', - 30: '无效', - 31: '无效', - 32: '用户操作', - 33: '设备回路状态', - 34: '设备回路状态', - 35: '设备回路状态', - 36: '无效', - 37: '用户操作', - 38: '设备回路状态', - 39: '设备回路状态', - 40: '用户操作', - 41: '用户操作', - 42: '无效', - 43: '无效', - 44: '设备回路状态', - 45: '无效', - 46: '用户操作', - 47: '设备回路状态', - 48: '设备回路状态', - 49: '设备回路状态', - 50: '设备回路状态', - 51: '设备回路状态', - 52: '设备回路状态', - 53: '设备回路状态', - 54: '用户操作', - 55: '用户操作', - 56: '设备回路状态', - 57: '设备回路状态', - 241: '设备回路状态' + 0: { name: '无效设备(也可以被认为是场景)', action: '无效' }, + 1: { name: '强电继电器(输出状态)', action: '设备回路状态' }, + 2: { name: '弱电输入(输入状态)', action: '用户操作' }, + 3: { name: '弱电输出(输出状态)', action: '设备回路状态' }, + 4: { name: '服务信息', action: '设备回路状态' }, + 5: { name: '干节点窗帘', action: '设备回路状态' }, + 6: { name: '开关', action: '用户操作' }, + 7: { name: '空调', action: '用户操作' }, + 8: { name: '红外感应', action: '用户操作' }, + 9: { name: '空气质量检测设备', action: '设备回路状态' }, + 10: { name: '插卡取电', action: '用户操作' }, + 11: { name: '地暖', action: '用户操作' }, + 12: { name: 'RCU 设备网络 - 没使用', action: '' }, + 13: { name: '窗帘', action: '设备回路状态' }, + 14: { name: '继电器', action: '设备回路状态' }, + 15: { name: '红外发送', action: '设备回路状态' }, + 16: { name: '调光驱动', action: '设备回路状态' }, + 17: { name: '可控硅调光(可控硅状态)', action: '设备回路状态' }, + 18: { name: '灯带(灯带状态) --2025-11-24 取消', action: '无效' }, + 19: { name: '中控', action: '无效' }, + 20: { name: '微信锁 (福 瑞狗的蓝牙锁 默认 0 地址)', action: '无效' }, + 21: { name: '背景音乐(背景音乐状态)', action: '设备回路状态' }, + 22: { name: '房态下发', action: '无效' }, + 23: { name: '主机本地 调光', action: '无效' }, + 24: { name: '485PWM 调光( PWM 调光状态)', action: '无效' }, + 25: { name: '总线调光( PBLED 调光状态) - 没使用 -', action: '无效' }, + 26: { name: 'RCU 电源', action: '无效' }, + 27: { name: 'A9IO 开关', action: '用户操作' }, + 28: { name: 'A9IO 扩展', action: '设备回路状态' }, + 29: { name: 'A9IO 电源', action: '设备回路状态' }, + 30: { name: '无线网关轮询(用于轮询控制轮询设备;给无线网关下发配置和询问网关状态)', action: '无效' }, + 31: { name: '无线网关主动(用于主动控制主动设备)', action: '无效' }, + 32: { name: '无线门磁', action: '用户操作' }, + 33: { name: '空气参数显示设备', action: '设备回路状态' }, + 34: { name: '无线继电器红外', action: '设备回路状态' }, + 35: { name: '时间同步', action: '设备回路状态' }, + 36: { name: '监控控制', action: '无效' }, + 37: { name: '旋钮开关控制', action: '用户操作' }, + 38: { name: 'CSIO - 类型', action: '设备回路状态' }, + 39: { name: '插卡状态虚拟设备', action: '设备回路状态' }, + 40: { name: '485 新风设备', action: '用户操作' }, + 41: { name: '485 人脸机', action: '用户操作' }, + 42: { name: '中控', action: '无效' }, + 43: { name: '域控', action: '无效' }, + 44: { name: 'LCD', action: '设备回路状态' }, + 45: { name: '无卡断电 --2025-11-24 取消', action: '无效' }, + 46: { name: '无卡取电 2', action: '用户操作' }, + 47: { name: '虚拟时间设备', action: '设备回路状态' }, + 48: { name: 'PLC 总控', action: '设备回路状态' }, + 49: { name: 'PLC 设备 - 恒流调光设备', action: '设备回路状态' }, + 50: { name: 'PLC 设备 - 恒压调光设备', action: '设备回路状态' }, + 51: { name: 'PLC 设备 - 继电器设备', action: '设备回路状态' }, + 52: { name: '色温调节功能', action: '设备回路状态' }, + 53: { name: '蓝牙音频', action: '设备回路状态' }, + 54: { name: '碳达人', action: '用户操作' }, + 55: { name: '场景还原', action: '用户操作' }, + 56: { name: '全局设置', action: '设备回路状态' }, + 57: { name: '能耗检测', action: '设备回路状态' }, + 241: { name: 'CSIO - 类型', action: '设备回路状态' } }; -const buildDevTypeActionMap = () => { - const raw = process.env.ACTION_TYPE_DEV_TYPE_RULES; - if (!raw || typeof raw !== 'string' || !raw.trim()) { - return defaultDevTypeActionMap; - } - - try { - const parsed = JSON.parse(raw); - if (!Array.isArray(parsed)) { - return defaultDevTypeActionMap; +// Parse env rules if present +let devTypeActionRules = []; +try { + if (process.env.ACTION_TYPE_DEV_TYPE_RULES) { + const parsed = JSON.parse(process.env.ACTION_TYPE_DEV_TYPE_RULES); + if (Array.isArray(parsed)) { + devTypeActionRules = parsed; } - - const overrides = {}; - parsed.forEach((item) => { - if (Array.isArray(item) && item.length >= 2) { - const devType = Number(item[0]); - const actionType = item[1]; - if (Number.isFinite(devType) && typeof actionType === 'string' && actionType) { - overrides[devType] = actionType; - } - return; - } - - if (item && typeof item === 'object') { - const devType = Number(item.dev_type ?? item.devType); - const actionType = item.action_type ?? item.actionType ?? item.action; - if (Number.isFinite(devType) && typeof actionType === 'string' && actionType) { - overrides[devType] = actionType; - } - } - }); - - return { ...defaultDevTypeActionMap, ...overrides }; - } catch { - return defaultDevTypeActionMap; } +} catch (error) { + // Silent fallback +} + +const getActionTypeByDevType = (devType) => { + // 1. Env override + const rule = devTypeActionRules.find(r => r.dev_type === devType); + if (rule?.action_type) return rule.action_type; + + // 2. Default map + const entry = defaultDevTypeActionMap[devType]; + return entry?.action || '设备回路状态'; }; -const devTypeActionMap = buildDevTypeActionMap(); +const getDevTypeName = (devType) => { + // 1. Env override + const rule = devTypeActionRules.find(r => r.dev_type === devType); + if (rule?.name) return rule.name; + + // 2. Default map + const entry = defaultDevTypeActionMap[devType]; + return entry?.name || 'Unknown'; +}; const resolveDevTypeAction = (devType) => { - if (typeof devType !== 'number') { - return '设备回路状态'; - } - const mapped = devTypeActionMap[devType]; - if (mapped) { - return mapped; - } - return '设备回路状态'; + if (devType === null || devType === undefined) return '设备回路状态'; + return getActionTypeByDevType(devType); }; const parseKafkaPayload = (value) => { @@ -247,6 +235,19 @@ export const buildRowsFromPayload = (rawPayload) => { extra: extra || {} }; + // Helper to generate loop name if not found in cache + const getLoopNameWithFallback = (deviceId, devType, devAddr, devLoop) => { + // 1. Try cache + const cachedName = projectMetadata.getLoopName(deviceId, devType, devAddr, devLoop); + if (cachedName) return cachedName; + + // 2. Fallback: [TypeName-Addr-Loop] + const typeName = getDevTypeName(devType); + if (!typeName) return null; // Should have a name if devType is valid + + return `[${devType}${typeName}-${devAddr}-${devLoop}]`; + }; + const rows = []; // Logic 1: 0x36 Status/Fault Report @@ -268,6 +269,7 @@ export const buildRowsFromPayload = (rawPayload) => { dev_loop: device.dev_loop ?? null, dev_data: device.dev_data ?? null, action_type: actionType, + loop_name: getLoopNameWithFallback(deviceId, device.dev_type, device.dev_addr, device.dev_loop), details }); }); @@ -287,6 +289,7 @@ export const buildRowsFromPayload = (rawPayload) => { error_type: fault.error_type ?? null, error_data: fault.error_data ?? null, action_type: actionType, + loop_name: getLoopNameWithFallback(deviceId, fault.dev_type, fault.dev_addr, fault.dev_loop), details }); }); @@ -322,6 +325,7 @@ export const buildRowsFromPayload = (rawPayload) => { type_l: control.type_l ?? null, type_h: control.type_h ?? null, action_type: '下发控制', + loop_name: getLoopNameWithFallback(deviceId, control.dev_type, control.dev_addr, control.dev_loop), details }); }); @@ -340,18 +344,29 @@ export const buildRowsFromPayload = (rawPayload) => { return rows; } - // Logic 3: 0x0F ACK or others - const fallbackActionType = - normalizedCmdWord === '0x0f' && normalizedDirection === '上报' - ? 'ACK' - : '无效'; + // 3. 0x0F ACK + else if (messageType === '0FACK') { + const { control_list: controls = [] } = payload; + if (Array.isArray(controls)) { + const details = { control_list: controls }; + controls.forEach((control) => { + rows.push({ + ...commonFields, + guid: createGuid(), + dev_type: control.dev_type ?? null, + dev_addr: control.dev_addr ?? null, + dev_loop: control.dev_loop ?? null, + dev_data: control.dev_data ?? null, + type_h: control.type_h ?? null, + action_type: '设备回路状态', + loop_name: getLoopNameWithFallback(deviceId, control.dev_type, control.dev_addr, control.dev_loop), + details + }); + }); + } + } - return [{ - ...commonFields, - guid: createGuid(), - action_type: fallbackActionType, - details: {} - }]; + return rows; }; export const processKafkaMessage = async ({ message, dbManager, config }) => { diff --git a/bls-rcu-action-backend/tests/processor.test.js b/bls-rcu-action-backend/tests/processor.test.js index a7ef286..664c406 100644 --- a/bls-rcu-action-backend/tests/processor.test.js +++ b/bls-rcu-action-backend/tests/processor.test.js @@ -1,5 +1,6 @@ import { describe, it, expect } from 'vitest'; import { buildRowsFromPayload } from '../src/processor/index.js'; +import projectMetadata from '../src/cache/projectMetadata.js'; describe('Processor Logic', () => { const basePayload = { @@ -95,12 +96,15 @@ describe('Processor Logic', () => { const payload = { ...basePayload, direction: '上报', - cmd_word: '0x0F' + cmd_word: '0x0F', + control_list: [ + { dev_type: 1, dev_addr: 1, dev_loop: 1, dev_data: 1, type_h: 0 } + ] }; const rows = buildRowsFromPayload(payload); expect(rows).toHaveLength(1); - expect(rows[0].action_type).toBe('ACK'); + expect(rows[0].action_type).toBe('设备回路状态'); }); it('should fallback when lists are empty for 0x36', () => { @@ -199,4 +203,27 @@ describe('Processor Logic', () => { trace_id: 'trace-123' }); }); + + it('should enrich rows with loop_name from metadata', () => { + // Mock metadata + projectMetadata.roomMap.set('dev_001', 101); + // Key format: roomTypeId:00Type00Addr00Loop + // type=1, addr=10, loop=1 -> 001010001 + projectMetadata.loopMap.set('101:001010001', 'Main Chandelier'); + + const payload = { + ...basePayload, + direction: '上报', + cmd_word: '0x36', + device_list: [ + { dev_type: 1, dev_addr: 10, dev_loop: 1, dev_data: 100 }, // Should match 001010001 + { dev_type: 1, dev_addr: 10, dev_loop: 2, dev_data: 0 } // Should not match (001010002) -> Fallback + ] + }; + + const rows = buildRowsFromPayload(payload); + expect(rows[0].loop_name).toBe('Main Chandelier'); + // dev_type 1 is 'Dev_Host_HVout' + expect(rows[1].loop_name).toBe('[1Dev_Host_HVout-10-2]'); + }); }); diff --git a/docs/project.md b/docs/project.md index 7fe28c7..ececa3a 100644 --- a/docs/project.md +++ b/docs/project.md @@ -75,4 +75,20 @@ ACK (待补充) 5. 队列结构 队列分区数:6 - Topic:blwlog4Nodejs-rcu-action-topic \ No newline at end of file + Topic:blwlog4Nodejs-rcu-action-topic + + +6. 入库前特殊操作 + - 定期从temporary_project表中读取数据并保存到`内存`中(以全局变量的形式),每天凌晨1点从数据库更新一次。 + - 每条数据写库之前,需要根据项目ID从内存中读取项目信息。 + - 我需要在现有数据表`rcu_action_events`里,添加一个字段:`loop_name`,用于存储回路名称。 + - 查询`loop_name`的方法是: + - 根据要插入的数据中的`device_id`在`rooms`表中找出对应的房间 -> 得到 `room_type_id`。 + - 根据 `room_type_id` 和 `loop_address` 在 `loops` 表中查找对应的 `loop_name`。 + - `loop_address` 的生成规则:将数据的 `dev_type`、`dev_addr`、`dev_loop` 分别转换为 3 位字符串(不足前方补 0),然后拼接。 + - 例如:`dev_type=1, dev_addr=23, dev_loop=12` -> `001` + `023` + `012` -> `001023012`。 + - **兜底逻辑**:如果根据上述规则在 `loops` 缓存中未找到对应的 `loop_name`,则使用 `dev_type` 对应的设备名称(配置在 `ACTION_TYPE_DEV_TYPE_RULES` 中)默认名称。 + - 格式:`[dev_type名称+'-'+dev_addr+'-'+dev_loop]` + - 例如:`dev_type=35` (名称: TimeCtrl), `addr=14`, `loop=21` -> `[35TimeCtrl-14-21]` + - 最后将找到的或生成的 `loop_name` 写入 `rcu_action_events` 表。 + - 注意,所有查库操作都要通过内存缓存来实现。 diff --git a/docs/temporary_project.sql b/docs/temporary_project.sql new file mode 100644 index 0000000..4094a78 --- /dev/null +++ b/docs/temporary_project.sql @@ -0,0 +1,231 @@ +/* + Navicat Premium Dump SQL + + Source Server : FnOS 109 + Source Server Type : PostgreSQL + Source Server Version : 150004 (150004) + Source Host : 10.8.8.109:5433 + Source Catalog : log_platform + Source Schema : temporary_project + + Target Server Type : PostgreSQL + Target Server Version : 150004 (150004) + File Encoding : 65001 + + Date: 02/02/2026 14:28:41 +*/ + + +-- ---------------------------- +-- Table structure for hotels +-- ---------------------------- +DROP TABLE IF EXISTS "temporary_project"."hotels"; +CREATE TABLE "temporary_project"."hotels" ( + "guid" varchar(32) COLLATE "pg_catalog"."default" NOT NULL, + "hotel_id" int4 NOT NULL, + "hotel_name" varchar(255) COLLATE "pg_catalog"."default", + "id" int4 +) +; + +-- ---------------------------- +-- Table structure for loops_default +-- ---------------------------- +DROP TABLE IF EXISTS "temporary_project"."loops_default"; +CREATE TABLE "temporary_project"."loops_default" ( + "guid" varchar(32) COLLATE "pg_catalog"."default" NOT NULL, + "id" int4 NOT NULL, + "loop_name" varchar(255) COLLATE "pg_catalog"."default", + "room_type_id" int4 NOT NULL, + "loop_address" varchar(255) COLLATE "pg_catalog"."default", + "loop_type" varchar(50) COLLATE "pg_catalog"."default" +) +; + +-- ---------------------------- +-- Table structure for room_type +-- ---------------------------- +DROP TABLE IF EXISTS "temporary_project"."room_type"; +CREATE TABLE "temporary_project"."room_type" ( + "guid" varchar(32) COLLATE "pg_catalog"."default" NOT NULL, + "id" int4 NOT NULL, + "room_type_name" varchar(255) COLLATE "pg_catalog"."default", + "hotel_id" int4 +) +; + +-- ---------------------------- +-- Table structure for rooms_default +-- ---------------------------- +DROP TABLE IF EXISTS "temporary_project"."rooms_default"; +CREATE TABLE "temporary_project"."rooms_default" ( + "guid" varchar(32) COLLATE "pg_catalog"."default" NOT NULL, + "hotel_id" int4 NOT NULL, + "room_id" varchar(255) COLLATE "pg_catalog"."default" NOT NULL, + "room_type_id" int4, + "device_id" varchar(50) COLLATE "pg_catalog"."default", + "mac" varchar(50) COLLATE "pg_catalog"."default", + "id" int4 +) +; + +-- ---------------------------- +-- Table structure for loops +-- ---------------------------- +DROP TABLE IF EXISTS "temporary_project"."loops"; +CREATE TABLE "temporary_project"."loops" ( + "guid" varchar(32) COLLATE "pg_catalog"."default" NOT NULL, + "id" int4 NOT NULL, + "loop_name" varchar(255) COLLATE "pg_catalog"."default", + "room_type_id" int4 NOT NULL, + "loop_address" varchar(255) COLLATE "pg_catalog"."default", + "loop_type" varchar(50) COLLATE "pg_catalog"."default" +) +PARTITION BY ( +) +; + +-- ---------------------------- +-- Table structure for rooms +-- ---------------------------- +DROP TABLE IF EXISTS "temporary_project"."rooms"; +CREATE TABLE "temporary_project"."rooms" ( + "guid" varchar(32) COLLATE "pg_catalog"."default" NOT NULL, + "hotel_id" int4 NOT NULL, + "room_id" varchar(255) COLLATE "pg_catalog"."default" NOT NULL, + "room_type_id" int4, + "device_id" varchar(50) COLLATE "pg_catalog"."default", + "mac" varchar(50) COLLATE "pg_catalog"."default", + "id" int4 +) +PARTITION BY LIST ( + "hotel_id" "pg_catalog"."int4_ops" +) +; +ALTER TABLE "temporary_project"."rooms" ATTACH PARTITION "temporary_project"."rooms_default" DEFAULT; + +-- ---------------------------- +-- Indexes structure for table hotels +-- ---------------------------- +CREATE INDEX "idx_hotels_hotel_id" ON "temporary_project"."hotels" USING btree ( + "hotel_id" "pg_catalog"."int4_ops" ASC NULLS LAST +); +CREATE INDEX "idx_hotels_hotel_name" ON "temporary_project"."hotels" USING btree ( + "hotel_name" COLLATE "pg_catalog"."default" "pg_catalog"."text_ops" ASC NULLS LAST +); +CREATE INDEX "idx_hotels_id" ON "temporary_project"."hotels" USING btree ( + "id" "pg_catalog"."int4_ops" ASC NULLS LAST +); + +-- ---------------------------- +-- Primary Key structure for table hotels +-- ---------------------------- +ALTER TABLE "temporary_project"."hotels" ADD CONSTRAINT "hotels_pkey" PRIMARY KEY ("hotel_id", "guid"); + +-- ---------------------------- +-- Indexes structure for table loops_default +-- ---------------------------- +CREATE INDEX "loops_default_id_idx" ON "temporary_project"."loops_default" USING btree ( + "id" "pg_catalog"."int4_ops" ASC NULLS LAST +); +CREATE INDEX "loops_default_loop_address_idx" ON "temporary_project"."loops_default" USING btree ( + "loop_address" COLLATE "pg_catalog"."default" "pg_catalog"."text_ops" ASC NULLS LAST +); +CREATE INDEX "loops_default_loop_name_idx" ON "temporary_project"."loops_default" USING btree ( + "loop_name" COLLATE "pg_catalog"."default" "pg_catalog"."text_ops" ASC NULLS LAST +); +CREATE INDEX "loops_default_loop_type_idx" ON "temporary_project"."loops_default" USING btree ( + "loop_type" COLLATE "pg_catalog"."default" "pg_catalog"."text_ops" ASC NULLS LAST +); +CREATE INDEX "loops_default_room_type_id_idx" ON "temporary_project"."loops_default" USING btree ( + "room_type_id" "pg_catalog"."int4_ops" ASC NULLS LAST +); + +-- ---------------------------- +-- Primary Key structure for table loops_default +-- ---------------------------- +ALTER TABLE "temporary_project"."loops_default" ADD CONSTRAINT "loops_default_pkey" PRIMARY KEY ("guid", "id", "room_type_id"); + +-- ---------------------------- +-- Indexes structure for table room_type +-- ---------------------------- +CREATE INDEX "idx_room_type_hotel_id" ON "temporary_project"."room_type" USING btree ( + "hotel_id" "pg_catalog"."int4_ops" ASC NULLS LAST +); +CREATE INDEX "idx_room_type_id" ON "temporary_project"."room_type" USING btree ( + "id" "pg_catalog"."int4_ops" ASC NULLS LAST +); +CREATE INDEX "idx_room_type_name" ON "temporary_project"."room_type" USING btree ( + "room_type_name" COLLATE "pg_catalog"."default" "pg_catalog"."text_ops" ASC NULLS LAST +); + +-- ---------------------------- +-- Primary Key structure for table room_type +-- ---------------------------- +ALTER TABLE "temporary_project"."room_type" ADD CONSTRAINT "room_type_pkey" PRIMARY KEY ("guid", "id"); + +-- ---------------------------- +-- Indexes structure for table rooms_default +-- ---------------------------- +CREATE INDEX "rooms_default_device_id_idx" ON "temporary_project"."rooms_default" USING btree ( + "device_id" COLLATE "pg_catalog"."default" "pg_catalog"."text_ops" ASC NULLS LAST +); +CREATE INDEX "rooms_default_hotel_id_idx" ON "temporary_project"."rooms_default" USING btree ( + "hotel_id" "pg_catalog"."int4_ops" ASC NULLS LAST +); +CREATE INDEX "rooms_default_id_idx" ON "temporary_project"."rooms_default" USING btree ( + "id" "pg_catalog"."int4_ops" ASC NULLS LAST +); +CREATE INDEX "rooms_default_mac_idx" ON "temporary_project"."rooms_default" USING btree ( + "mac" COLLATE "pg_catalog"."default" "pg_catalog"."text_ops" ASC NULLS LAST +); + +-- ---------------------------- +-- Primary Key structure for table rooms_default +-- ---------------------------- +ALTER TABLE "temporary_project"."rooms_default" ADD CONSTRAINT "rooms_default_pkey" PRIMARY KEY ("guid", "hotel_id", "room_id"); + +-- ---------------------------- +-- Indexes structure for table loops +-- ---------------------------- +CREATE INDEX "idx_loops_address" ON "temporary_project"."loops" USING btree ( + "loop_address" COLLATE "pg_catalog"."default" "pg_catalog"."text_ops" ASC NULLS LAST +); +CREATE INDEX "idx_loops_id" ON "temporary_project"."loops" USING btree ( + "id" "pg_catalog"."int4_ops" ASC NULLS LAST +); +CREATE INDEX "idx_loops_name" ON "temporary_project"."loops" USING btree ( + "loop_name" COLLATE "pg_catalog"."default" "pg_catalog"."text_ops" ASC NULLS LAST +); +CREATE INDEX "idx_loops_room_type_id" ON "temporary_project"."loops" USING btree ( + "room_type_id" "pg_catalog"."int4_ops" ASC NULLS LAST +); +CREATE INDEX "idx_loops_type" ON "temporary_project"."loops" USING btree ( + "loop_type" COLLATE "pg_catalog"."default" "pg_catalog"."text_ops" ASC NULLS LAST +); + +-- ---------------------------- +-- Primary Key structure for table loops +-- ---------------------------- +ALTER TABLE "temporary_project"."loops" ADD CONSTRAINT "loops_pkey" PRIMARY KEY ("guid", "id", "room_type_id"); + +-- ---------------------------- +-- Indexes structure for table rooms +-- ---------------------------- +CREATE INDEX "idx_rooms_device_id" ON "temporary_project"."rooms" USING btree ( + "device_id" COLLATE "pg_catalog"."default" "pg_catalog"."text_ops" ASC NULLS LAST +); +CREATE INDEX "idx_rooms_hotel_id" ON "temporary_project"."rooms" USING btree ( + "hotel_id" "pg_catalog"."int4_ops" ASC NULLS LAST +); +CREATE INDEX "idx_rooms_id" ON "temporary_project"."rooms" USING btree ( + "id" "pg_catalog"."int4_ops" ASC NULLS LAST +); +CREATE INDEX "idx_rooms_mac" ON "temporary_project"."rooms" USING btree ( + "mac" COLLATE "pg_catalog"."default" "pg_catalog"."text_ops" ASC NULLS LAST +); + +-- ---------------------------- +-- Primary Key structure for table rooms +-- ---------------------------- +ALTER TABLE "temporary_project"."rooms" ADD CONSTRAINT "rooms_pkey" PRIMARY KEY ("guid", "hotel_id", "room_id"); diff --git a/openspec/changes/feature-loop-name-enrichment.md b/openspec/changes/feature-loop-name-enrichment.md new file mode 100644 index 0000000..2d7a390 --- /dev/null +++ b/openspec/changes/feature-loop-name-enrichment.md @@ -0,0 +1,32 @@ +# Feature: Loop Name Enrichment + +**Status**: Proposed +**Date**: 2026-02-02 + +## Summary +Enrich incoming RCU action events with `loop_name` by looking up metadata from `temporary_project` tables. This allows easier identification of specific device loops (e.g., "Main Chandelier") in the event log. + +## Requirements +1. **Cache Mechanism**: + - Load `rooms` and `loops` data from `temporary_project` schema into memory. + - Refresh cache daily at 1:00 AM. +2. **Enrichment**: + - For each incoming event, look up `loop_name` using `device_id` and `dev_addr`. + - `device_id` -> `room_type_id` (via `rooms` table). + - `room_type_id` + `dev_addr` -> `loop_name` (via `loops` table). +3. **Storage**: + - Store `loop_name` in `rcu_action_events` table. + +## Ambiguity Resolution +- The requirement mentioned matching `dev_type` to find the loop. However, standard RCU addressing uses `dev_addr` (and `dev_loop`). We assume `loops.loop_address` corresponds to the packet's `dev_addr` (converted to string). +- We will attempt to match `dev_addr` against `loop_address`. + +## Schema Changes +- **Table**: `rcu_action.rcu_action_events` +- **Column**: `loop_name` (VARCHAR(255), Nullable) + +## Implementation Plan +1. **Database**: Update `init_db.sql` and `databaseManager.js`. +2. **Cache**: Create `src/cache/projectMetadata.js`. +3. **Processor**: Integrate cache lookup in `src/processor/index.js`. +4. **Lifecycle**: Initialize cache in `src/index.js`. diff --git a/openspec/project.md b/openspec/project.md index 210a58c..7a5661f 100644 --- a/openspec/project.md +++ b/openspec/project.md @@ -12,6 +12,9 @@ Backend service for processing RCU action events from Kafka, parsing them, and s - **Error Handling**: Redis List (`error_queue`) for failed messages + Retry mechanism - **Output**: PostgreSQL Table (`rcu_action_events`) +## Features +- **Loop Name Enrichment**: Enriches event data with `loop_name` by matching `device_id` and `dev_addr` against metadata cached from `temporary_project` tables (refreshed daily). + ## Configuration (Environment Variables) The project is configured via `.env`. Key variables: - **Kafka**: `KAFKA_BROKERS`, `KAFKA_TOPIC`, `KAFKA_SASL_USERNAME`, `KAFKA_SASL_PASSWORD`