- 新增 BatchProcessor 类实现消息批量插入,提高数据库写入性能
- 在 consumer 中禁用 autoCommit 并实现手动提交,确保数据一致性
- 添加数据库健康检查机制,在数据库离线时暂停消费并自动恢复
- 支持 0x0E 命令字处理,扩展消息类型识别范围
- 增加数据库连接重试逻辑,解决 Windows 环境端口冲突问题
- 更新环境变量配置,优化 Kafka 消费者参数
- 添加相关单元测试验证批量处理和可靠性功能
297 lines
8.3 KiB
JavaScript
297 lines
8.3 KiB
JavaScript
import { describe, it, expect, vi } from 'vitest';
|
||
import { buildRowsFromPayload } from '../src/processor/index.js';
|
||
import projectMetadata from '../src/cache/projectMetadata.js';
|
||
|
||
// Replace the config module for this test file: every original export and
// config value is kept, except that enableLoopNameAutoGeneration is forced on
// so the loop-name assertions below see generated names.
vi.mock('../src/config/config.js', async (importOriginal) => {
  const realModule = await importOriginal();
  const patchedConfig = {
    ...realModule.config,
    enableLoopNameAutoGeneration: true,
  };

  return { ...realModule, config: patchedConfig };
});
|
||
|
||
/**
 * Tests for buildRowsFromPayload: required-field validation, 0x36 status and
 * fault reports, 0x0F control/ACK frames, udp_raw encoding, the `extra` field
 * and loop_name enrichment from the projectMetadata cache.
 */
describe('Processor Logic', () => {
  // Baseline 0x36 payload; each test spreads it and overrides only the fields
  // relevant to the scenario under test.
  const basePayload = {
    ts_ms: 1700000000000,
    hotel_id: 1001,
    room_id: '8001',
    device_id: 'dev_001',
    direction: '上报',
    cmd_word: '0x36',
    frame_id: 1,
    udp_raw: 'AA552000543353413610CD63088151000000000000000001180003000114005ECB',
    sys_lock_status: 0,
    report_count: 0,
    fault_count: 0
  };

  it('should validate required fields', () => {
    expect(() => buildRowsFromPayload({})).toThrow();
    expect(() => buildRowsFromPayload({ ...basePayload, ts_ms: undefined })).toThrow();
  });

  it('should handle 0x36 Status Report with device list', () => {
    const payload = {
      ...basePayload,
      direction: '上报',
      cmd_word: '0x36',
      report_count: 2,
      device_list: [
        { dev_type: 1, dev_addr: 10, dev_loop: 1, dev_data: 100 },
        { dev_type: 1, dev_addr: 11, dev_loop: 2, dev_data: 0 }
      ]
    };

    const rows = buildRowsFromPayload(payload);
    // One row per device entry; the full list is also kept under details.
    expect(rows).toHaveLength(2);
    expect(rows[0].action_type).toBe('设备回路状态');
    expect(rows[0].dev_addr).toBe(10);
    expect(rows[1].dev_addr).toBe(11);
    expect(rows[0].details.device_list).toHaveLength(2);
  });

  it('should handle 0x36 Fault Report', () => {
    const payload = {
      ...basePayload,
      direction: '上报',
      cmd_word: '0x36',
      fault_count: 1,
      fault_list: [
        { dev_type: 1, dev_addr: 10, dev_loop: 1, error_type: 2, error_data: 5 }
      ]
    };

    const rows = buildRowsFromPayload(payload);
    expect(rows).toHaveLength(1);
    expect(rows[0].action_type).toBe('设备回路状态');
    expect(rows[0].error_type).toBe(2);
  });

  it('should handle 0x36 Mixed Report (Status + Fault)', () => {
    const payload = {
      ...basePayload,
      direction: '上报',
      cmd_word: '0x36',
      report_count: 1,
      fault_count: 1,
      device_list: [{ dev_type: 1, dev_addr: 10, dev_loop: 1, dev_data: 100 }],
      fault_list: [{ dev_type: 1, dev_addr: 10, dev_loop: 1, error_type: 2, error_data: 5 }]
    };

    const rows = buildRowsFromPayload(payload);
    expect(rows).toHaveLength(2); // 1 status + 1 fault
  });

  it('should handle 0x0F Control Command', () => {
    const payload = {
      ...basePayload,
      direction: '下发',
      cmd_word: '0x0F',
      control_list: [
        { dev_type: 1, dev_addr: 10, dev_loop: 1, type_l: 1, type_h: 2 }
      ]
    };

    const rows = buildRowsFromPayload(payload);
    expect(rows).toHaveLength(1);
    expect(rows[0].action_type).toBe('下发控制');
    expect(rows[0].type_l).toBe(1);
    expect(rows[0].type_h).toBe(2);
    expect(rows[0].dev_loop).toBe(1);
  });

  it('should handle 0x0F ACK', () => {
    // Same command word as the control command, but sent in the '上报' direction.
    const payload = {
      ...basePayload,
      direction: '上报',
      cmd_word: '0x0F',
      control_list: [
        { dev_type: 1, dev_addr: 1, dev_loop: 1, dev_data: 1, type_h: 0 }
      ]
    };

    const rows = buildRowsFromPayload(payload);
    expect(rows).toHaveLength(1);
    expect(rows[0].action_type).toBe('设备回路状态');
  });

  it('should fallback when lists are empty for 0x36', () => {
    const payload = {
      ...basePayload,
      direction: '上报',
      cmd_word: '0x36',
      device_list: [],
      fault_list: []
    };

    const rows = buildRowsFromPayload(payload);
    // With nothing to expand, a single row without device fields is expected.
    expect(rows).toHaveLength(1);
    expect(rows[0].action_type).toBe('设备回路状态');
    expect(rows[0].dev_type).toBeNull();
  });

  it('should classify 0x36 as 用户操作 when dev_type is user-operated', () => {
    const payload = {
      ...basePayload,
      direction: '上报',
      cmd_word: '0x36',
      device_list: [
        { dev_type: 2, dev_addr: 10, dev_loop: 1, dev_data: 100 }
      ]
    };

    const rows = buildRowsFromPayload(payload);
    expect(rows).toHaveLength(1);
    expect(rows[0].action_type).toBe('用户操作');
  });

  it('should store udp_raw as base64 when input is hex string', () => {
    const payload = {
      ...basePayload,
      direction: '上报',
      cmd_word: '0x36',
      device_list: [],
      fault_list: []
    };

    // Reference encoding: strip whitespace/colon separators, decode the hex,
    // then re-encode the bytes as base64.
    const expectedBase64 = Buffer.from(payload.udp_raw.replace(/[\s:]/g, ''), 'hex').toString('base64');
    const rows = buildRowsFromPayload(payload);

    expect(rows).toHaveLength(1);
    expect(rows[0].udp_raw).toBe(expectedBase64);
  });

  it('should keep udp_raw unchanged when input is not hex string', () => {
    const payload = {
      ...basePayload,
      udp_raw: 'YWJjMTIz',
      direction: '上报',
      cmd_word: '0x36',
      device_list: [
        { dev_type: 1, dev_addr: 10, dev_loop: 1, dev_data: 100 }
      ]
    };

    const rows = buildRowsFromPayload(payload);

    expect(rows[0].udp_raw).toBe('YWJjMTIz');
  });

  it('should default extra to empty object when not provided', () => {
    const payload = {
      ...basePayload,
      direction: '上报',
      cmd_word: '0x36',
      device_list: [],
      fault_list: []
    };

    const rows = buildRowsFromPayload(payload);

    expect(rows[0].extra).toEqual({});
  });

  it('should preserve extra when provided by upstream', () => {
    const payload = {
      ...basePayload,
      direction: '上报',
      cmd_word: '0x36',
      device_list: [],
      fault_list: [],
      extra: {
        source: 'upstream',
        trace_id: 'trace-123'
      }
    };

    const rows = buildRowsFromPayload(payload);

    expect(rows[0].extra).toEqual({
      source: 'upstream',
      trace_id: 'trace-123'
    });
  });

  it('should enrich rows with loop_name from metadata', () => {
    // Seed the shared metadata cache for this test only.
    // Loop key format: roomTypeId:00Type00Addr00Loop
    // type=1, addr=10, loop=1 -> 001010001
    const roomKey = 'dev_001';
    const loopKey = '101:001010001';
    projectMetadata.roomMap.set(roomKey, 101);
    projectMetadata.loopMap.set(loopKey, 'Main Chandelier');

    try {
      const payload = {
        ...basePayload,
        direction: '上报',
        cmd_word: '0x36',
        device_list: [
          { dev_type: 1, dev_addr: 10, dev_loop: 1, dev_data: 100 }, // Should match 001010001
          { dev_type: 1, dev_addr: 10, dev_loop: 2, dev_data: 0 } // Should not match (001010002) -> Fallback
        ]
      };

      const rows = buildRowsFromPayload(payload);
      expect(rows[0].loop_name).toBe('Main Chandelier');
      // dev_type 1 is '强电继电器(输出状态)'
      expect(rows[1].loop_name).toBe('[1强电继电器(输出状态)-10-2]');
    } finally {
      // projectMetadata is shared module state: remove the seeded entries so
      // they cannot leak into other tests that reuse device_id 'dev_001'.
      // NOTE(review): assumes roomMap/loopMap are Map instances (they are
      // already used through .set above) - confirm against the cache module.
      projectMetadata.roomMap.delete(roomKey);
      projectMetadata.loopMap.delete(loopKey);
    }
  });
});
|
||
|
||
/**
 * Tests that 0x0E frames are turned into rows the same way as 0x36 reports,
 * and that the emitted cmd_word is the lower-cased '0x0e'.
 */
describe('Processor Logic - 0x0E Support', () => {
  // Builds a 0x0E '上报' frame; `overrides` replaces or appends fields on top
  // of the shared defaults.
  const makeFrame = (overrides) => ({
    ts_ms: 1700000000000,
    hotel_id: 1001,
    room_id: '8001',
    device_id: 'dev_001',
    direction: '上报',
    cmd_word: '0x0E',
    frame_id: 1,
    udp_raw: 'AA552000543353413610CD63088151000000000000000001180003000114005ECB',
    sys_lock_status: 0,
    report_count: 0,
    fault_count: 0,
    ...overrides
  });

  it('should handle 0x0E Status Report with device list (same as 0x36)', () => {
    const devices = [
      { dev_type: 1, dev_addr: 10, dev_loop: 1, dev_data: 100 },
      { dev_type: 1, dev_addr: 11, dev_loop: 2, dev_data: 0 }
    ];

    const built = buildRowsFromPayload(makeFrame({ report_count: 2, device_list: devices }));

    expect(built).toHaveLength(2);
    const firstRow = built[0];
    expect(firstRow.action_type).toBe('设备回路状态');
    expect(firstRow.dev_addr).toBe(10);
    expect(firstRow.cmd_word).toBe('0x0e'); // Normalized
    expect(built[1].dev_addr).toBe(11);
    expect(firstRow.details.device_list).toHaveLength(2);
  });

  it('should handle 0x0E Fault Report', () => {
    const faults = [
      { dev_type: 1, dev_addr: 10, dev_loop: 1, error_type: 2, error_data: 5 }
    ];

    const built = buildRowsFromPayload(makeFrame({ fault_count: 1, fault_list: faults }));

    expect(built).toHaveLength(1);
    const onlyRow = built[0];
    expect(onlyRow.action_type).toBe('设备回路状态');
    expect(onlyRow.error_type).toBe(2);
    expect(onlyRow.cmd_word).toBe('0x0e');
  });
});
|