diff --git a/.gitignore b/.gitignore index e375517..155e108 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ /.github bls-rcu-action-backend/node_modules bls-rcu-action-backend/skill +/bls-rcu-action-backend/dist diff --git a/bls-rcu-action-backend/.env b/bls-rcu-action-backend/.env index fa874a8..a031c85 100644 --- a/bls-rcu-action-backend/.env +++ b/bls-rcu-action-backend/.env @@ -9,6 +9,11 @@ KAFKA_SASL_MECHANISM=plain KAFKA_SASL_USERNAME=blwmomo KAFKA_SASL_PASSWORD=blwmomo KAFKA_SSL_ENABLED=false +KAFKA_CONSUMER_INSTANCES=6 +KAFKA_MAX_IN_FLIGHT=50 +KAFKA_FETCH_MAX_BYTES=10485760 +KAFKA_FETCH_MAX_WAIT_MS=100 +KAFKA_FETCH_MIN_BYTES=1 POSTGRES_HOST=10.8.8.109 POSTGRES_PORT=5433 diff --git a/bls-rcu-action-backend/ecosystem.config.cjs b/bls-rcu-action-backend/ecosystem.config.cjs index 453f908..3ff74ba 100644 --- a/bls-rcu-action-backend/ecosystem.config.cjs +++ b/bls-rcu-action-backend/ecosystem.config.cjs @@ -1,24 +1,22 @@ module.exports = { apps: [{ - name: 'bls-rcu-action-backend', + name: 'bls-rcu-action', script: 'dist/index.js', instances: 1, exec_mode: 'fork', autorestart: true, watch: false, max_memory_restart: '1G', + env_file: '.env', env: { NODE_ENV: 'production', - PORT: 3000 - }, - env_development: { - NODE_ENV: 'development', - PORT: 3000 + PORT: 3001 }, error_file: './logs/error.log', out_file: './logs/out.log', log_date_format: 'YYYY-MM-DD HH:mm:ss Z', merge_logs: true, + kill_timeout: 5000, time: true }] }; diff --git a/bls-rcu-action-backend/output_dev_debug.log b/bls-rcu-action-backend/output_dev_debug.log deleted file mode 100644 index 80a1ade..0000000 --- a/bls-rcu-action-backend/output_dev_debug.log +++ /dev/null @@ -1 +0,0 @@ -{"level":"error","message":"Kafka message handling failed","timestamp":1769689985427,"context":{"error":"[\n {\n \"expected\": \"number\",\n \"code\": \"invalid_type\",\n \"path\": [\n \"hotel_id\"\n ],\n \"message\": \"Invalid input: expected number, received string\"\n },\n {\n \"expected\": \"array\",\n \"code\": \"invalid_type\",\n \"path\": [\n \"control_list\"\n ],\n \"message\": \"Invalid input: expected array, received null\"\n }\n]"}} diff --git a/bls-rcu-action-backend/output_dev_final.log b/bls-rcu-action-backend/output_dev_final.log deleted file mode 100644 index 40e3410..0000000 --- a/bls-rcu-action-backend/output_dev_final.log +++ /dev/null @@ -1 +0,0 @@ -{"level":"error","message":"Kafka message handling failed","timestamp":1769689777074,"context":{"error":"[\n {\n \"expected\": \"number\",\n \"code\": \"invalid_type\",\n \"path\": [\n \"hotel_id\"\n ],\n \"message\": \"Invalid input: expected number, received string\"\n },\n {\n \"expected\": \"array\",\n \"code\": \"invalid_type\",\n \"path\": [\n \"control_list\"\n ],\n \"message\": \"Invalid input: expected array, received null\"\n }\n]"}} diff --git a/bls-rcu-action-backend/output_dev_v2.log b/bls-rcu-action-backend/output_dev_v2.log deleted file mode 100644 index 2997c33..0000000 --- a/bls-rcu-action-backend/output_dev_v2.log +++ /dev/null @@ -1 +0,0 @@ -{"level":"info","message":"[Minute Metrics] Pulled: 0, Parse Error: 0, Inserted: 0, Failed: 0","timestamp":1769688900027,"context":{"kafka_pulled":0,"parse_error":0,"db_inserted":0,"db_failed":0}} diff --git a/bls-rcu-action-backend/output_dev_v3.log b/bls-rcu-action-backend/output_dev_v3.log deleted file mode 100644 index ff93d8d..0000000 --- a/bls-rcu-action-backend/output_dev_v3.log +++ /dev/null @@ -1 +0,0 @@ -{"level":"info","message":"[Minute Metrics] Pulled: 0, Parse Error: 0, Inserted: 0, Failed: 0","timestamp":1769689140027,"context":{"kafka_pulled":0,"parse_error":0,"db_inserted":0,"db_failed":0}} diff --git a/bls-rcu-action-backend/output_dev_v4.log b/bls-rcu-action-backend/output_dev_v4.log deleted file mode 100644 index 24913f9..0000000 --- a/bls-rcu-action-backend/output_dev_v4.log +++ /dev/null @@ -1 +0,0 @@ -{"level":"info","message":"[Minute Metrics] Pulled: 0, Parse Error: 0, Inserted: 0, Failed: 0","timestamp":1769689260031,"context":{"kafka_pulled":0,"parse_error":0,"db_inserted":0,"db_failed":0}} diff --git a/bls-rcu-action-backend/src/config/config.js b/bls-rcu-action-backend/src/config/config.js index 4c2e963..f5c7acf 100644 --- a/bls-rcu-action-backend/src/config/config.js +++ b/bls-rcu-action-backend/src/config/config.js @@ -22,6 +22,12 @@ export const config = { groupId: process.env.KAFKA_GROUP_ID || 'bls-rcu-action-group', clientId: process.env.KAFKA_CLIENT_ID || 'bls-rcu-action-client', consumerInstances: parseNumber(process.env.KAFKA_CONSUMER_INSTANCES, 1), + maxInFlight: parseNumber(process.env.KAFKA_MAX_IN_FLIGHT, 50), + fetchMaxBytes: parseNumber(process.env.KAFKA_FETCH_MAX_BYTES, 10 * 1024 * 1024), + fetchMinBytes: parseNumber(process.env.KAFKA_FETCH_MIN_BYTES, 1), + fetchMaxWaitMs: parseNumber(process.env.KAFKA_FETCH_MAX_WAIT_MS, 100), + autoCommitIntervalMs: parseNumber(process.env.KAFKA_AUTO_COMMIT_INTERVAL_MS, 5000), + logMessages: process.env.KAFKA_LOG_MESSAGES === 'true', sasl: process.env.KAFKA_SASL_USERNAME && process.env.KAFKA_SASL_PASSWORD ? { mechanism: process.env.KAFKA_SASL_MECHANISM || 'plain', username: process.env.KAFKA_SASL_USERNAME, diff --git a/bls-rcu-action-backend/src/db/initializer.js b/bls-rcu-action-backend/src/db/initializer.js index 7bc7292..2693a47 100644 --- a/bls-rcu-action-backend/src/db/initializer.js +++ b/bls-rcu-action-backend/src/db/initializer.js @@ -68,7 +68,15 @@ class DatabaseInitializer { // dbManager connects to the target database const client = await dbManager.pool.connect(); try { - const sqlPath = path.resolve(__dirname, '../../scripts/init_db.sql'); + const sqlPathCandidates = [ + path.resolve(process.cwd(), 'scripts/init_db.sql'), + path.resolve(__dirname, '../scripts/init_db.sql'), + path.resolve(__dirname, '../../scripts/init_db.sql') + ]; + const sqlPath = sqlPathCandidates.find((candidate) => fs.existsSync(candidate)); + if (!sqlPath) { + throw new Error(`init_db.sql not found. Candidates: ${sqlPathCandidates.join(' | ')}`); + } const sql = fs.readFileSync(sqlPath, 'utf8'); logger.info('Executing init_db.sql...'); diff --git a/bls-rcu-action-backend/src/index.js b/bls-rcu-action-backend/src/index.js index 78ac428..5fff6b9 100644 --- a/bls-rcu-action-backend/src/index.js +++ b/bls-rcu-action-backend/src/index.js @@ -3,7 +3,7 @@ import { config } from './config/config.js'; import dbManager from './db/databaseManager.js'; import dbInitializer from './db/initializer.js'; import partitionManager from './db/partitionManager.js'; -import { createKafkaConsumer } from './kafka/consumer.js'; +import { createKafkaConsumers } from './kafka/consumer.js'; import { processKafkaMessage } from './processor/index.js'; import { createRedisClient } from './redis/redisClient.js'; import { RedisIntegration } from './redis/redisIntegration.js'; @@ -79,13 +79,23 @@ const bootstrap = async () => { const messageKey = Buffer.isBuffer(message.key) ? message.key.toString('utf8') : message.key; - logger.info('Kafka message received', { - topic: message.topic, - partition: message.partition, - offset: message.offset, - key: messageKey, - value: messageValue - }); + if (config.kafka.logMessages) { + logger.info('Kafka message received', { + topic: message.topic, + partition: message.partition, + offset: message.offset, + key: messageKey, + value: messageValue + }); + } else { + logger.info('Kafka message received', { + topic: message.topic, + partition: message.partition, + offset: message.offset, + key: messageKey, + valueLength: typeof messageValue === 'string' ? messageValue.length : null + }); + } const inserted = await processKafkaMessage({ message, dbManager, config }); metricCollector.increment('db_inserted'); logger.info('Kafka message processed', { inserted }); @@ -143,7 +153,7 @@ const bootstrap = async () => { } }; - const consumer = createKafkaConsumer({ + const consumers = createKafkaConsumers({ kafkaConfig: config.kafka, onMessage: handleMessage, onError: handleError @@ -170,9 +180,9 @@ const bootstrap = async () => { try { // 1. Close Kafka Consumer - if (consumer) { - await new Promise((resolve) => consumer.close(true, resolve)); - logger.info('Kafka consumer closed'); + if (consumers && consumers.length > 0) { + await Promise.all(consumers.map(c => new Promise((resolve) => c.close(true, resolve)))); + logger.info('Kafka consumer closed', { count: consumers.length }); } // 2. Stop Redis Heartbeat (if method exists, otherwise just close client) diff --git a/bls-rcu-action-backend/src/kafka/consumer.js b/bls-rcu-action-backend/src/kafka/consumer.js index beeddd6..5bc9217 100644 --- a/bls-rcu-action-backend/src/kafka/consumer.js +++ b/bls-rcu-action-backend/src/kafka/consumer.js @@ -3,29 +3,54 @@ import { logger } from '../utils/logger.js'; const { ConsumerGroup } = kafka; -export const createKafkaConsumer = ({ kafkaConfig, onMessage, onError }) => { +const createOneConsumer = ({ kafkaConfig, onMessage, onError, instanceIndex }) => { const kafkaHost = kafkaConfig.brokers.join(','); + const clientId = instanceIndex === 0 ? kafkaConfig.clientId : `${kafkaConfig.clientId}-${instanceIndex}`; + const id = `${clientId}-${process.pid}-${Date.now()}`; + const maxInFlight = Number.isFinite(kafkaConfig.maxInFlight) ? kafkaConfig.maxInFlight : 50; + let inFlight = 0; + const consumer = new ConsumerGroup( { kafkaHost, groupId: kafkaConfig.groupId, - clientId: kafkaConfig.clientId, + clientId, + id, fromOffset: 'earliest', protocol: ['roundrobin'], outOfRangeOffset: 'latest', autoCommit: true, + autoCommitIntervalMs: kafkaConfig.autoCommitIntervalMs, + fetchMaxBytes: kafkaConfig.fetchMaxBytes, + fetchMinBytes: kafkaConfig.fetchMinBytes, + fetchMaxWaitMs: kafkaConfig.fetchMaxWaitMs, sasl: kafkaConfig.sasl }, kafkaConfig.topic ); + const tryResume = () => { + if (inFlight < maxInFlight) { + consumer.resume(); + } + }; + consumer.on('message', (message) => { - onMessage(message).catch((error) => { - logger.error('Kafka message handling failed', { error: error?.message }); - if (onError) { - onError(error, message); - } - }); + inFlight += 1; + if (inFlight >= maxInFlight) { + consumer.pause(); + } + Promise.resolve(onMessage(message)) + .catch((error) => { + logger.error('Kafka message handling failed', { error: error?.message }); + if (onError) { + onError(error, message); + } + }) + .finally(() => { + inFlight -= 1; + tryResume(); + }); }); consumer.on('error', (error) => { @@ -37,3 +62,14 @@ export const createKafkaConsumer = ({ kafkaConfig, onMessage, onError }) => { return consumer; }; + +export const createKafkaConsumers = ({ kafkaConfig, onMessage, onError }) => { + const instances = Number.isFinite(kafkaConfig.consumerInstances) ? kafkaConfig.consumerInstances : 1; + const count = Math.max(1, instances); + return Array.from({ length: count }, (_, idx) => + createOneConsumer({ kafkaConfig, onMessage, onError, instanceIndex: idx }) + ); +}; + +export const createKafkaConsumer = ({ kafkaConfig, onMessage, onError }) => + createKafkaConsumers({ kafkaConfig, onMessage, onError })[0]; diff --git a/bls-rcu-action-backend/src/processor/index.js b/bls-rcu-action-backend/src/processor/index.js index 6d063b6..9692df1 100644 --- a/bls-rcu-action-backend/src/processor/index.js +++ b/bls-rcu-action-backend/src/processor/index.js @@ -23,14 +23,13 @@ const normalizeCmdWord = (value) => { } return trimmed; } - // The Zod schema might have already converted numbers to strings, but let's be safe if (typeof value === 'number' && Number.isFinite(value)) { return `0x${value.toString(16).toLowerCase()}`; } return null; }; -const resolveActionType = (direction, cmdWord) => { +const resolveMessageType = (direction, cmdWord) => { if (cmdWord === '0x36') { return '36上报'; } @@ -43,6 +42,119 @@ const resolveActionType = (direction, cmdWord) => { return null; }; +const defaultDevTypeActionMap = { + 0: '无效', + 1: '设备回路状态', + 2: '用户操作', + 3: '设备回路状态', + 4: '设备回路状态', + 5: '设备回路状态', + 6: '用户操作', + 7: '用户操作', + 8: '用户操作', + 9: '设备回路状态', + 10: '用户操作', + 11: '用户操作', + 12: '无效', + 13: '设备回路状态', + 14: '设备回路状态', + 15: '设备回路状态', + 16: '设备回路状态', + 17: '设备回路状态', + 18: '无效', + 19: '无效', + 20: '无效', + 21: '设备回路状态', + 22: '无效', + 23: '无效', + 24: '无效', + 25: '无效', + 26: '无效', + 27: '用户操作', + 28: '设备回路状态', + 29: '设备回路状态', + 30: '无效', + 31: '无效', + 32: '用户操作', + 33: '设备回路状态', + 34: '设备回路状态', + 35: '设备回路状态', + 36: '无效', + 37: '用户操作', + 38: '设备回路状态', + 39: '设备回路状态', + 40: '用户操作', + 41: '用户操作', + 42: '无效', + 43: '无效', + 44: '设备回路状态', + 45: '无效', + 46: '用户操作', + 47: '设备回路状态', + 48: '设备回路状态', + 49: '设备回路状态', + 50: '设备回路状态', + 51: '设备回路状态', + 52: '设备回路状态', + 53: '设备回路状态', + 54: '用户操作', + 55: '用户操作', + 56: '设备回路状态', + 57: '设备回路状态', + 241: '设备回路状态' +}; + +const buildDevTypeActionMap = () => { + const raw = process.env.ACTION_TYPE_DEV_TYPE_RULES; + if (!raw || typeof raw !== 'string' || !raw.trim()) { + return defaultDevTypeActionMap; + } + + try { + const parsed = JSON.parse(raw); + if (!Array.isArray(parsed)) { + return defaultDevTypeActionMap; + } + + const overrides = {}; + parsed.forEach((item) => { + if (Array.isArray(item) && item.length >= 2) { + const devType = Number(item[0]); + const actionType = item[1]; + if (Number.isFinite(devType) && typeof actionType === 'string' && actionType) { + overrides[devType] = actionType; + } + return; + } + + if (item && typeof item === 'object') { + const devType = Number(item.dev_type ?? item.devType); + const actionType = item.action_type ?? item.actionType ?? item.action; + if (Number.isFinite(devType) && typeof actionType === 'string' && actionType) { + overrides[devType] = actionType; + } + } + }); + + return { ...defaultDevTypeActionMap, ...overrides }; + } catch { + return defaultDevTypeActionMap; + } +}; + +const devTypeActionMap = buildDevTypeActionMap(); + +const resolveDevTypeAction = (devType) => { + if (typeof devType !== 'number') { + return '设备回路状态'; + } + const mapped = devTypeActionMap[devType]; + if (mapped) { + return mapped; + } + return '设备回路状态'; +}; + const parseKafkaPayload = (value) => { const raw = Buffer.isBuffer(value) ? value.toString('utf8') : value; if (typeof raw !== 'string') { @@ -56,6 +168,27 @@ export const buildRowsFromMessageValue = (value) => { return buildRowsFromPayload(payload); }; +const isHexString = (value) => { + if (typeof value !== 'string') { + return false; + } + const trimmed = value.trim(); + if (!trimmed) { + return false; + } + const compact = trimmed.replace(/[\s:]/g, ''); + if (compact.length === 0 || compact.length % 2 !== 0) { + return false; + } + return /^[0-9a-fA-F]+$/.test(compact); +}; + +const hexToBase64 = (hex) => { + const compact = hex.replace(/[\s:]/g, ''); + const buffer = Buffer.from(compact, 'hex'); + return buffer.toString('base64'); +}; + export const buildRowsFromPayload = (rawPayload) => { // 1. Validate and transform payload using Zod schema const payload = kafkaPayloadSchema.parse(rawPayload); @@ -68,18 +201,23 @@ export const buildRowsFromPayload = (rawPayload) => { direction, cmd_word: cmdWord, frame_id: frameId, - udp_raw: udpRaw, + udp_raw: udpRawRaw, sys_lock_status: sysLockStatus, report_count: reportCount, fault_count: faultCount, device_list: deviceList, // Zod provides default [] fault_list: faultList, // Zod provides default [] - control_list: controlList // Zod provides default [] + control_list: controlList, // Zod provides default [] + extra } = payload; + // Temporary migration logic: if udp_raw is hex, convert to base64 before insert. + // This is a transitional mechanism and should be removed once all upstream systems send base64 directly. + const udpRaw = isHexString(udpRawRaw) ? hexToBase64(udpRawRaw) : udpRawRaw; + const normalizedDirection = normalizeDirection(direction); const normalizedCmdWord = normalizeCmdWord(cmdWord); - const actionType = resolveActionType(normalizedDirection, normalizedCmdWord); + const messageType = resolveMessageType(normalizedDirection, normalizedCmdWord); const writeTsMs = Date.now(); // Base fields common to all rows (excluding unique ID) @@ -88,12 +226,11 @@ export const buildRowsFromPayload = (rawPayload) => { write_ts_ms: writeTsMs, hotel_id: hotelId, room_id: roomId, - device_id: deviceId, // Pass through normalized/validated device_id + device_id: deviceId, direction: normalizedDirection, cmd_word: normalizedCmdWord, frame_id: frameId, udp_raw: udpRaw, - action_type: actionType, sys_lock_status: sysLockStatus ?? null, report_count: reportCount ?? null, fault_count: faultCount ?? null, @@ -107,13 +244,13 @@ export const buildRowsFromPayload = (rawPayload) => { type_l: null, type_h: null, details: null, - extra: { raw_hex: udpRaw } + extra: extra || {} }; const rows = []; // Logic 1: 0x36 Status/Fault Report - if (actionType === '36上报') { + if (messageType === '36上报') { const details = { device_list: deviceList, fault_list: faultList @@ -122,6 +259,7 @@ export const buildRowsFromPayload = (rawPayload) => { // Process device status list if (deviceList.length > 0) { deviceList.forEach(device => { + const actionType = resolveDevTypeAction(device.dev_type); rows.push({ ...commonFields, guid: createGuid(), @@ -129,6 +267,7 @@ export const buildRowsFromPayload = (rawPayload) => { dev_addr: device.dev_addr ?? null, dev_loop: device.dev_loop ?? null, dev_data: device.dev_data ?? null, + action_type: actionType, details }); }); @@ -137,6 +276,7 @@ export const buildRowsFromPayload = (rawPayload) => { // Process fault list if (faultList.length > 0) { faultList.forEach(fault => { + const actionType = resolveDevTypeAction(fault.dev_type); rows.push({ ...commonFields, guid: createGuid(), @@ -146,6 +286,7 @@ export const buildRowsFromPayload = (rawPayload) => { dev_loop: fault.dev_loop ?? null, error_type: fault.error_type ?? null, error_data: fault.error_data ?? null, + action_type: actionType, details }); }); @@ -156,6 +297,7 @@ export const buildRowsFromPayload = (rawPayload) => { rows.push({ ...commonFields, guid: createGuid(), + action_type: '设备回路状态', details }); } @@ -164,7 +306,7 @@ export const buildRowsFromPayload = (rawPayload) => { } // Logic 2: 0x0F Control Command - if (actionType === '0F下发') { + if (messageType === '0F下发') { const details = { control_list: controlList }; @@ -179,6 +321,7 @@ export const buildRowsFromPayload = (rawPayload) => { dev_loop: control.dev_loop ?? null, type_l: control.type_l ?? null, type_h: control.type_h ?? null, + action_type: '下发控制', details }); }); @@ -189,6 +332,7 @@ export const buildRowsFromPayload = (rawPayload) => { rows.push({ ...commonFields, guid: createGuid(), + action_type: '下发控制', details }); } @@ -197,10 +341,15 @@ export const buildRowsFromPayload = (rawPayload) => { } // Logic 3: 0x0F ACK or others - // Default behavior: single row + const fallbackActionType = + normalizedCmdWord === '0x0f' && normalizedDirection === '上报' + ? 'ACK' + : '无效'; + return [{ ...commonFields, guid: createGuid(), + action_type: fallbackActionType, details: {} }]; }; diff --git a/bls-rcu-action-backend/src/schema/kafkaPayload.js b/bls-rcu-action-backend/src/schema/kafkaPayload.js index a0e2e22..c0fd8db 100644 --- a/bls-rcu-action-backend/src/schema/kafkaPayload.js +++ b/bls-rcu-action-backend/src/schema/kafkaPayload.js @@ -55,5 +55,10 @@ export const kafkaPayloadSchema = z.object({ // Lists device_list: listSchema(deviceItemSchema), fault_list: listSchema(faultItemSchema), - control_list: listSchema(controlItemSchema) + control_list: listSchema(controlItemSchema), + + extra: z.preprocess( + (value) => (value === null ? {} : value), + z.any().optional().default({}) + ) }); diff --git a/bls-rcu-action-backend/tests/processor.test.js b/bls-rcu-action-backend/tests/processor.test.js index aa92731..a7ef286 100644 --- a/bls-rcu-action-backend/tests/processor.test.js +++ b/bls-rcu-action-backend/tests/processor.test.js @@ -10,7 +10,7 @@ describe('Processor Logic', () => { direction: '上报', cmd_word: '0x36', frame_id: 1, - udp_raw: '3601...', + udp_raw: 'AA552000543353413610CD63088151000000000000000001180003000114005ECB', sys_lock_status: 0, report_count: 0, fault_count: 0 @@ -35,7 +35,7 @@ describe('Processor Logic', () => { const rows = buildRowsFromPayload(payload); expect(rows).toHaveLength(2); - expect(rows[0].action_type).toBe('36上报'); + expect(rows[0].action_type).toBe('设备回路状态'); expect(rows[0].dev_addr).toBe(10); expect(rows[1].dev_addr).toBe(11); expect(rows[0].details.device_list).toHaveLength(2); @@ -54,7 +54,7 @@ describe('Processor Logic', () => { const rows = buildRowsFromPayload(payload); expect(rows).toHaveLength(1); - expect(rows[0].action_type).toBe('36上报'); + expect(rows[0].action_type).toBe('设备回路状态'); expect(rows[0].error_type).toBe(2); }); @@ -85,7 +85,7 @@ describe('Processor Logic', () => { const rows = buildRowsFromPayload(payload); expect(rows).toHaveLength(1); - expect(rows[0].action_type).toBe('0F下发'); + expect(rows[0].action_type).toBe('下发控制'); expect(rows[0].type_l).toBe(1); expect(rows[0].type_h).toBe(2); expect(rows[0].dev_loop).toBe(1); @@ -100,7 +100,7 @@ describe('Processor Logic', () => { const rows = buildRowsFromPayload(payload); expect(rows).toHaveLength(1); - expect(rows[0].action_type).toBe('0FACK'); + expect(rows[0].action_type).toBe('ACK'); }); it('should fallback when lists are empty for 0x36', () => { @@ -114,7 +114,89 @@ describe('Processor Logic', () => { const rows = buildRowsFromPayload(payload); expect(rows).toHaveLength(1); - expect(rows[0].action_type).toBe('36上报'); + expect(rows[0].action_type).toBe('设备回路状态'); expect(rows[0].dev_type).toBeNull(); }); + + it('should classify 0x36 as 用户操作 when dev_type is user-operated', () => { + const payload = { + ...basePayload, + direction: '上报', + cmd_word: '0x36', + device_list: [ + { dev_type: 2, dev_addr: 10, dev_loop: 1, dev_data: 100 } + ] + }; + + const rows = buildRowsFromPayload(payload); + expect(rows).toHaveLength(1); + expect(rows[0].action_type).toBe('用户操作'); + }); + + it('should store udp_raw as base64 when input is hex string', () => { + const payload = { + ...basePayload, + direction: '上报', + cmd_word: '0x36', + device_list: [], + fault_list: [] + }; + + const expectedBase64 = Buffer.from(payload.udp_raw.replace(/[\s:]/g, ''), 'hex').toString('base64'); + const rows = buildRowsFromPayload(payload); + + expect(rows).toHaveLength(1); + expect(rows[0].udp_raw).toBe(expectedBase64); + }); + + it('should keep udp_raw unchanged when input is not hex string', () => { + const payload = { + ...basePayload, + udp_raw: 'YWJjMTIz', + direction: '上报', + cmd_word: '0x36', + device_list: [ + { dev_type: 1, dev_addr: 10, dev_loop: 1, dev_data: 100 } + ] + }; + + const rows = buildRowsFromPayload(payload); + + expect(rows[0].udp_raw).toBe('YWJjMTIz'); + }); + + it('should default extra to empty object when not provided', () => { + const payload = { + ...basePayload, + direction: '上报', + cmd_word: '0x36', + device_list: [], + fault_list: [] + }; + + const rows = buildRowsFromPayload(payload); + + expect(rows[0].extra).toEqual({}); + }); + + it('should preserve extra when provided by upstream', () => { + const payload = { + ...basePayload, + direction: '上报', + cmd_word: '0x36', + device_list: [], + fault_list: [], + extra: { + source: 'upstream', + trace_id: 'trace-123' + } + }; + + const rows = buildRowsFromPayload(payload); + + expect(rows[0].extra).toEqual({ + source: 'upstream', + trace_id: 'trace-123' + }); + }); }); diff --git a/docs/error.log b/docs/error.log new file mode 100644 index 0000000..08c7771 --- /dev/null +++ b/docs/error.log @@ -0,0 +1,44 @@ +2026-01-30T16:54:47: Error [ERR_MODULE_NOT_FOUND]: Cannot find package 'node-cron' imported from R:\nodejsROOT\bls\rcu-action\dist\index.js +2026-01-30T16:54:47: at Object.getPackageJSONURL (node:internal/modules/package_json_reader:316:9) +2026-01-30T16:54:47: at packageResolve (node:internal/modules/esm/resolve:768:81) +2026-01-30T16:54:47: at moduleResolve (node:internal/modules/esm/resolve:858:18) +2026-01-30T16:54:47: at defaultResolve (node:internal/modules/esm/resolve:990:11) +2026-01-30T16:54:47: at #cachedDefaultResolve (node:internal/modules/esm/loader:737:20) +2026-01-30T16:54:47: at ModuleLoader.resolve (node:internal/modules/esm/loader:714:38) +2026-01-30T16:54:47: at ModuleLoader.getModuleJobForImport (node:internal/modules/esm/loader:293:38) +2026-01-30T16:54:47: at #link (node:internal/modules/esm/module_job:208:49) +2026-01-30T16:54:47: at process.processTicksAndRejections (node:internal/process/task_queues:103:5) { +2026-01-30T16:54:47: code: 'ERR_MODULE_NOT_FOUND' +2026-01-30T16:54:47: } +2026-01-30T16:56:12: {"level":"error","message":"Error initializing schema and table:","timestamp":1769763372054,"context":{"errno":-4058,"code":"ENOENT","syscall":"open","path":"R:\\nodejsROOT\\bls\\scripts\\init_db.sql"}} +2026-01-30T16:56:12: {"level":"error","message":"Service bootstrap failed","timestamp":1769763372055,"context":{"error":"ENOENT: no such file or directory, open 'R:\\nodejsROOT\\bls\\scripts\\init_db.sql'"}} +2026-01-30T16:56:12: {"level":"error","message":"Error initializing schema and table:","timestamp":1769763372929,"context":{"errno":-4058,"code":"ENOENT","syscall":"open","path":"R:\\nodejsROOT\\bls\\scripts\\init_db.sql"}} +2026-01-30T16:56:12: {"level":"error","message":"Service bootstrap failed","timestamp":1769763372929,"context":{"error":"ENOENT: no such file or directory, open 'R:\\nodejsROOT\\bls\\scripts\\init_db.sql'"}} +2026-01-30T16:56:13: {"level":"error","message":"Error initializing schema and table:","timestamp":1769763373801,"context":{"errno":-4058,"code":"ENOENT","syscall":"open","path":"R:\\nodejsROOT\\bls\\scripts\\init_db.sql"}} +2026-01-30T16:56:13: {"level":"error","message":"Service bootstrap failed","timestamp":1769763373801,"context":{"error":"ENOENT: no such file or directory, open 'R:\\nodejsROOT\\bls\\scripts\\init_db.sql'"}} +2026-01-30T16:56:14: {"level":"error","message":"Error initializing schema and table:","timestamp":1769763374671,"context":{"errno":-4058,"code":"ENOENT","syscall":"open","path":"R:\\nodejsROOT\\bls\\scripts\\init_db.sql"}} +2026-01-30T16:56:14: {"level":"error","message":"Service bootstrap failed","timestamp":1769763374671,"context":{"error":"ENOENT: no such file or directory, open 'R:\\nodejsROOT\\bls\\scripts\\init_db.sql'"}} +2026-01-30T16:56:15: {"level":"error","message":"Error initializing schema and table:","timestamp":1769763375539,"context":{"errno":-4058,"code":"ENOENT","syscall":"open","path":"R:\\nodejsROOT\\bls\\scripts\\init_db.sql"}} +2026-01-30T16:56:15: {"level":"error","message":"Service bootstrap failed","timestamp":1769763375539,"context":{"error":"ENOENT: no such file or directory, open 'R:\\nodejsROOT\\bls\\scripts\\init_db.sql'"}} +2026-01-30T16:56:16: {"level":"error","message":"Error initializing schema and table:","timestamp":1769763376418,"context":{"errno":-4058,"code":"ENOENT","syscall":"open","path":"R:\\nodejsROOT\\bls\\scripts\\init_db.sql"}} +2026-01-30T16:56:16: {"level":"error","message":"Service bootstrap failed","timestamp":1769763376419,"context":{"error":"ENOENT: no such file or directory, open 'R:\\nodejsROOT\\bls\\scripts\\init_db.sql'"}} +2026-01-30T16:56:17: {"level":"error","message":"Error initializing schema and table:","timestamp":1769763377290,"context":{"errno":-4058,"code":"ENOENT","syscall":"open","path":"R:\\nodejsROOT\\bls\\scripts\\init_db.sql"}} +2026-01-30T16:56:17: {"level":"error","message":"Service bootstrap failed","timestamp":1769763377291,"context":{"error":"ENOENT: no such file or directory, open 'R:\\nodejsROOT\\bls\\scripts\\init_db.sql'"}} +2026-01-30T16:56:18: {"level":"error","message":"Error initializing schema and table:","timestamp":1769763378161,"context":{"errno":-4058,"code":"ENOENT","syscall":"open","path":"R:\\nodejsROOT\\bls\\scripts\\init_db.sql"}} +2026-01-30T16:56:18: {"level":"error","message":"Service bootstrap failed","timestamp":1769763378162,"context":{"error":"ENOENT: no such file or directory, open 'R:\\nodejsROOT\\bls\\scripts\\init_db.sql'"}} +2026-01-30T16:56:19: {"level":"error","message":"Error initializing schema and table:","timestamp":1769763379035,"context":{"errno":-4058,"code":"ENOENT","syscall":"open","path":"R:\\nodejsROOT\\bls\\scripts\\init_db.sql"}} +2026-01-30T16:56:19: {"level":"error","message":"Service bootstrap failed","timestamp":1769763379035,"context":{"error":"ENOENT: no such file or directory, open 'R:\\nodejsROOT\\bls\\scripts\\init_db.sql'"}} +2026-01-30T16:56:19: {"level":"error","message":"Error initializing schema and table:","timestamp":1769763379920,"context":{"errno":-4058,"code":"ENOENT","syscall":"open","path":"R:\\nodejsROOT\\bls\\scripts\\init_db.sql"}} +2026-01-30T16:56:19: {"level":"error","message":"Service bootstrap failed","timestamp":1769763379921,"context":{"error":"ENOENT: no such file or directory, open 'R:\\nodejsROOT\\bls\\scripts\\init_db.sql'"}} +2026-01-30T16:56:20: {"level":"error","message":"Error initializing schema and table:","timestamp":1769763380801,"context":{"errno":-4058,"code":"ENOENT","syscall":"open","path":"R:\\nodejsROOT\\bls\\scripts\\init_db.sql"}} +2026-01-30T16:56:20: {"level":"error","message":"Service bootstrap failed","timestamp":1769763380802,"context":{"error":"ENOENT: no such file or directory, open 'R:\\nodejsROOT\\bls\\scripts\\init_db.sql'"}} +2026-01-30T16:56:21: {"level":"error","message":"Error initializing schema and table:","timestamp":1769763381675,"context":{"errno":-4058,"code":"ENOENT","syscall":"open","path":"R:\\nodejsROOT\\bls\\scripts\\init_db.sql"}} +2026-01-30T16:56:21: {"level":"error","message":"Service bootstrap failed","timestamp":1769763381675,"context":{"error":"ENOENT: no such file or directory, open 'R:\\nodejsROOT\\bls\\scripts\\init_db.sql'"}} +2026-01-30T16:56:22: {"level":"error","message":"Error initializing schema and table:","timestamp":1769763382560,"context":{"errno":-4058,"code":"ENOENT","syscall":"open","path":"R:\\nodejsROOT\\bls\\scripts\\init_db.sql"}} +2026-01-30T16:56:22: {"level":"error","message":"Service bootstrap failed","timestamp":1769763382561,"context":{"error":"ENOENT: no such file or directory, open 'R:\\nodejsROOT\\bls\\scripts\\init_db.sql'"}} +2026-01-30T16:56:23: {"level":"error","message":"Error initializing schema and table:","timestamp":1769763383432,"context":{"errno":-4058,"code":"ENOENT","syscall":"open","path":"R:\\nodejsROOT\\bls\\scripts\\init_db.sql"}} +2026-01-30T16:56:23: {"level":"error","message":"Service bootstrap failed","timestamp":1769763383433,"context":{"error":"ENOENT: no such file or directory, open 'R:\\nodejsROOT\\bls\\scripts\\init_db.sql'"}} +2026-01-30T16:56:24: {"level":"error","message":"Error initializing schema and table:","timestamp":1769763384307,"context":{"errno":-4058,"code":"ENOENT","syscall":"open","path":"R:\\nodejsROOT\\bls\\scripts\\init_db.sql"}} +2026-01-30T16:56:24: {"level":"error","message":"Service bootstrap failed","timestamp":1769763384307,"context":{"error":"ENOENT: no such file or directory, open 'R:\\nodejsROOT\\bls\\scripts\\init_db.sql'"}} +2026-01-30T16:56:25: {"level":"error","message":"Error initializing schema and table:","timestamp":1769763385185,"context":{"errno":-4058,"code":"ENOENT","syscall":"open","path":"R:\\nodejsROOT\\bls\\scripts\\init_db.sql"}} +2026-01-30T16:56:25: {"level":"error","message":"Service bootstrap failed","timestamp":1769763385185,"context":{"error":"ENOENT: no such file or directory, open 'R:\\nodejsROOT\\bls\\scripts\\init_db.sql'"}} diff --git a/docs/kafka_format.md b/docs/kafka_format.md index 31b31a4..c15910b 100644 --- a/docs/kafka_format.md +++ b/docs/kafka_format.md @@ -24,7 +24,7 @@ JSON 消息由 **Header 信息** 和 **业务列表数据** 组成。 | **direction** | String | **是** | "上报" 或 "下发" | | **cmd_word** | String | **是** | 命令字 (如 "0x36", "0x0F") | | **frame_id** | Number | **是** | 通讯帧号 | -| **udp_raw** | String | **是** | UDP 原始报文 (作为备份/校验) | +| **udp_raw** | String | **是** | UDP 原始报文备份;推荐上游直接发送base64字符串;当前若上游仍发送十六进制字符串,后端会在入库前临时将其转换为base64,后续会取消该自动转换机制 | | **sys_lock_status** | Number | 否 | 系统锁状态 (0:未锁, 1:锁定) | | **report_count** | Number | 否 | 上报设备数量 (对应 device_list 长度) | | **fault_count** | Number | 否 | 故障设备数量 (对应 fault_list 长度) | diff --git a/docs/readme.md b/docs/readme.md index 8929246..6a418bf 100644 --- a/docs/readme.md +++ b/docs/readme.md @@ -30,7 +30,7 @@ | **direction** | VARCHAR(10) | 数据方向 | **必填** (L53) "上报"/"下发" Index | | **cmd_word** | VARCHAR(10) | 命令字 | **必填** (L54) 如 "0x36", "0x0F" Index | | **frame_id** | INTEGER | 通讯帧号 | **必填** (L55) 用于串联命令与状态 | -| **udp_raw** | TEXT | UDP消息原文 | **必填** (L56) Hex字符串 | +| **udp_raw** | TEXT | UDP消息原文 | **必填** (L56) base64字符串;当前若上游仍发送十六进制字符串,则后端会在入库前临时转换为base64,后续会取消该自动转换机制 | | **action_type** | VARCHAR(20) | 记录行为类型 | **必填** (L57) Index | | **sys_lock_status** | SMALLINT | 系统锁状态 | (L59) 可空 | | **report_count** | SMALLINT | 本次上报数量 | (L60) 可空 | @@ -44,16 +44,80 @@ | **type_l** | SMALLINT | 执行方式 | 可空 (0x0F下发) | | **type_h** | SMALLINT | 执行内容 | 可空 (0x0F下发) | | **details** | JSONB | 业务详情数据 | 存储不定长设备列表、故障信息等 | -| **extra** | JSONB | 扩展信息 | 存储通讯原文等扩展数据 | +| **extra** | JSONB | 扩展信息 | 存储上游传入的extra扩展字段;如未提供则为空对象 | **主键定义**: `(ts_ms, guid)` **索引定义**: 备注带index的字段为需要索引的字段,用于提高查询效率。 ### 3.2 字典定义 **Action Type (记录行为类型)**: -- `"0FACK"`: ACK (应答) -- `"0F下发"`: 下发控制 (0x0F 下发) -- `"36上报"`: 设备回路状态 (0x36 上报) +- 枚举值:(ACK和下发控制是0x0F的特殊情况,用户操作和设备回路状态是0x36的枚举值) + - ACK:ACK是0x0F的上报独有的,所以如果0F且Direction为上报,就标记为ACK + - 下发控制:0x0F的Direction为下发指令,记录为下发控制 + - 用户操作:0x36上报 的开关、温控器等客户操作设备产生的,属于用户操作 + - 设备回路状态:0x36上报 的灯光、继电器回路等变化等受控设备,属于设备回路状态 + - 用户操作和设备回路状态的具体区分表(根据本行数据的dev_type来区分),注意,这张表是根据dev_type来区分的,所以dev_type不能改变,否则会导致数据错误,另外这个表要写入env配置文件,以数组形式保存,随时可以更改: + |dev_type|名称|描述|Action Type| + |---|---|---|---| + |0|Dev_Host_Invalid|无效设备(也可以被认为是场景)|无效| + |1|Dev_Host_HVout|强电继电器(输出状态)|设备回路状态| + |2|Dev_Host_LVinput|弱电输入(输入状态)|用户操作| + |3|Dev_Host_LVoutput|弱电输出(输出状态)|设备回路状态| + |4|Dev_Host_Service|服务信息|设备回路状态| + |5|Dev_NodeCurtain|干节点窗帘|设备回路状态| + |6|DEV_RS485_SWT|开关|用户操作| + |7|DEV_RS485_TEMP|空调|用户操作| + |8|DEV_RS485_INFRARED|红外感应|用户操作| + |9|DEV_RS485_AirDetect|空气质量检测设备|设备回路状态| + |10|DEV_RS485_CARD|插卡取电|用户操作| + |11|DEV_RS485_HEATER|地暖|用户操作| + |12|Dev_RCU_NET|RCU 设备网络 - 没使用|| + |13|DEV_RS485_CURTAIN|窗帘|设备回路状态| + |14|DEV_RS485_RELAY|继电器|设备回路状态| + |15|DEV_RS485_IR_SEND|红外发送|设备回路状态| + |16|DEV_RS485_DIMMING|调光驱动|设备回路状态| + |17|DEV_RS485_TRAIC|可控硅调光(可控硅状态)|设备回路状态| + |18|DEV_RS485_STRIP|灯带(灯带状态) --2025-11-24 取消|无效| + |19|DEV_RS485_CoreCtrl|中控|无效| + |20|DEV_RS485_WxLock|微信锁 (福 瑞狗的蓝牙锁 默认 0 地址)|无效| + |21|DEV_RS485_MUSIC|背景音乐(背景音乐状态)|设备回路状态| + |22|DEV_NET_ROOMSTATE|房态下发|无效| + |23|Dev_Host_PWMLight|主机本地 调光|无效| + |24|DEV_RS485_PWM|485PWM 调光( PWM 调光状态)|无效| + |25|DEV_PB_LED|总线调光( PBLED 调光状态) - 没使用 -|无效| + |26|DEV_RCU_POWER|RCU 电源|无效| + |27|DEV_RS485_A9_IO_SWT|A9IO 开关|用户操作| + |28|DEV_RS485_A9_IO_EXP|A9IO 扩展|设备回路状态| + |29|DEV_RS485_A9_IO_POWER|A9IO 电源|设备回路状态| + |30|DEV_RS485_RFGatewayCycle|无线网关轮询(用于轮询控制轮询设备;给无线网关下发配置和询问网关状态)|无效| + |31|DEV_RS485_RFGatewayHost|无线网关主动(用于主动控制主动设备)|无效| + |32|DEV_RS485_RFGatewayDoor|无线门磁|用户操作| + |33|DEV_RS485_AirReveal|空气参数显示设备|设备回路状态| + |34|DEV_RS485_RFGatewayRelayPir|无线继电器红外|设备回路状态| + |35|Dev_Host_TimeCtrl|时间同步|设备回路状态| + |36|Dev_Rs458_MonitorCtrl|监控控制|无效| + |37|Dev_Rs458_RotaryCtrl|旋钮开关控制|用户操作| + |38|Dev_BUS_C5IO|CSIO - 类型|设备回路状态| + |39|Dev_RS485_CardState|插卡状态虚拟设备|设备回路状态| + |40|DEV_RS485_FreshAir|485 新风设备|用户操作| + |41|DEV_RS485_FaceMach|485 人脸机|用户操作| + |42|DEV_Center_Control|中控|无效| + |43|DEV_Domain_Control|域控|无效| + |44|DEV_RS485_LCD|LCD|设备回路状态| + |45|DEV_Virtual_NoCard|无卡断电 --2025-11-24 取消|无效| + |46|DEV_Virtual_Card|无卡取电 2|用户操作| + |47|DEV_Virtual_Time|虚拟时间设备|设备回路状态| + |48|Dev_Rs485_PB20|PLC 总控|设备回路状态| + |49|Dev_Rs485_PB20_LD|PLC 设备 - 恒流调光设备|设备回路状态| + |50|Dev_Rs485_PB20_LS|PLC 设备 - 恒压调光设备|设备回路状态| + |51|Dev_Rs485_PB20_Relay|PLC 设备 - 继电器设备|设备回路状态| + |52|DEV_Virtual_ColorTemp|色温调节功能|设备回路状态| + |53|Dev_485_BLE_Music|蓝牙音频|设备回路状态| + |54|DEV_Carbon_Saved|碳达人|用户操作| + |55|Dev_Scene_Restore|场景还原|用户操作| + |56|Dev_Virtual_GlobalSet|全局设置|设备回路状态| + |57|Dev_Energy_Monitor|能耗检测|设备回路状态| + |241|Dev_BUS_C5IO|CSIO - 类型|设备回路状态| **Direction (方向)**: - `"上报"`: Upload @@ -89,4 +153,3 @@ **Mapping**: - `details`: `{ "ack_code": "0x00" }` - `extra`: `{ "raw_hex": "..." }` -