feat: enhance Kafka consumer configuration and refine action_type classification logic

- Support multiple Kafka consumer instances for higher processing concurrency
- Add Kafka configuration parameters: max in-flight messages, fetch byte limits, fetch wait time, etc.
- Refine action_type classification: map dev_type to '用户操作' (user operation) or '设备回路状态' (device loop status)
- Temporarily auto-convert hex-format udp_raw to base64 before storage
- Add an extra field to pass through upstream extension data
- Improve the lookup paths for the database initialization script
- Update the PM2 config: rename the app and change the development port
- Clean up development-environment log files and add the dist directory to .gitignore
- Update the related documentation
2026-01-30 20:09:46 +08:00
parent e0c3728b42
commit 0e6c5c3cc3
18 changed files with 459 additions and 57 deletions

View File

@@ -9,6 +9,11 @@ KAFKA_SASL_MECHANISM=plain
KAFKA_SASL_USERNAME=blwmomo
KAFKA_SASL_PASSWORD=blwmomo
KAFKA_SSL_ENABLED=false
KAFKA_CONSUMER_INSTANCES=6
KAFKA_MAX_IN_FLIGHT=50
KAFKA_FETCH_MAX_BYTES=10485760
KAFKA_FETCH_MAX_WAIT_MS=100
KAFKA_FETCH_MIN_BYTES=1
POSTGRES_HOST=10.8.8.109
POSTGRES_PORT=5433
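
A note on how these new knobs interact, based on the config and consumer diffs below: KAFKA_CONSUMER_INSTANCES is the number of ConsumerGroup members this process joins the group with, so the topic's partitions are spread across them (instances beyond the partition count sit idle); KAFKA_MAX_IN_FLIGHT caps how many messages one consumer processes concurrently before it pauses fetching; the three KAFKA_FETCH_* values are standard Kafka fetch tuning: the per-fetch byte ceiling and floor, and how long the broker may wait before returning a batch.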

View File

@@ -1,24 +1,22 @@
module.exports = {
apps: [{
name: 'bls-rcu-action-backend',
name: 'bls-rcu-action',
script: 'dist/index.js',
instances: 1,
exec_mode: 'fork',
autorestart: true,
watch: false,
max_memory_restart: '1G',
env_file: '.env',
env: {
NODE_ENV: 'production',
PORT: 3000
},
env_development: {
NODE_ENV: 'development',
PORT: 3000
PORT: 3001
},
error_file: './logs/error.log',
out_file: './logs/out.log',
log_date_format: 'YYYY-MM-DD HH:mm:ss Z',
merge_logs: true,
kill_timeout: 5000,
time: true
}]
};
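
With the rename, the process appears in pm2 ls as bls-rcu-action. Assuming the file keeps a standard name such as ecosystem.config.js, starting it with pm2 start ecosystem.config.js --env development applies the env_development block, so the development instance now listens on port 3001 while production stays on 3000.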

View File

@@ -1 +0,0 @@
{"level":"error","message":"Kafka message handling failed","timestamp":1769689985427,"context":{"error":"[\n {\n \"expected\": \"number\",\n \"code\": \"invalid_type\",\n \"path\": [\n \"hotel_id\"\n ],\n \"message\": \"Invalid input: expected number, received string\"\n },\n {\n \"expected\": \"array\",\n \"code\": \"invalid_type\",\n \"path\": [\n \"control_list\"\n ],\n \"message\": \"Invalid input: expected array, received null\"\n }\n]"}}

View File

@@ -1 +0,0 @@
{"level":"error","message":"Kafka message handling failed","timestamp":1769689777074,"context":{"error":"[\n {\n \"expected\": \"number\",\n \"code\": \"invalid_type\",\n \"path\": [\n \"hotel_id\"\n ],\n \"message\": \"Invalid input: expected number, received string\"\n },\n {\n \"expected\": \"array\",\n \"code\": \"invalid_type\",\n \"path\": [\n \"control_list\"\n ],\n \"message\": \"Invalid input: expected array, received null\"\n }\n]"}}

View File

@@ -1 +0,0 @@
{"level":"info","message":"[Minute Metrics] Pulled: 0, Parse Error: 0, Inserted: 0, Failed: 0","timestamp":1769688900027,"context":{"kafka_pulled":0,"parse_error":0,"db_inserted":0,"db_failed":0}}

View File

@@ -1 +0,0 @@
{"level":"info","message":"[Minute Metrics] Pulled: 0, Parse Error: 0, Inserted: 0, Failed: 0","timestamp":1769689140027,"context":{"kafka_pulled":0,"parse_error":0,"db_inserted":0,"db_failed":0}}

View File

@@ -1 +0,0 @@
{"level":"info","message":"[Minute Metrics] Pulled: 0, Parse Error: 0, Inserted: 0, Failed: 0","timestamp":1769689260031,"context":{"kafka_pulled":0,"parse_error":0,"db_inserted":0,"db_failed":0}}

View File

@@ -22,6 +22,12 @@ export const config = {
groupId: process.env.KAFKA_GROUP_ID || 'bls-rcu-action-group',
clientId: process.env.KAFKA_CLIENT_ID || 'bls-rcu-action-client',
consumerInstances: parseNumber(process.env.KAFKA_CONSUMER_INSTANCES, 1),
maxInFlight: parseNumber(process.env.KAFKA_MAX_IN_FLIGHT, 50),
fetchMaxBytes: parseNumber(process.env.KAFKA_FETCH_MAX_BYTES, 10 * 1024 * 1024),
fetchMinBytes: parseNumber(process.env.KAFKA_FETCH_MIN_BYTES, 1),
fetchMaxWaitMs: parseNumber(process.env.KAFKA_FETCH_MAX_WAIT_MS, 100),
autoCommitIntervalMs: parseNumber(process.env.KAFKA_AUTO_COMMIT_INTERVAL_MS, 5000),
logMessages: process.env.KAFKA_LOG_MESSAGES === 'true',
sasl: process.env.KAFKA_SASL_USERNAME && process.env.KAFKA_SASL_PASSWORD ? {
mechanism: process.env.KAFKA_SASL_MECHANISM || 'plain',
username: process.env.KAFKA_SASL_USERNAME,
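
This hunk leans on a parseNumber helper defined earlier in config.js and not shown here. A minimal sketch of what such a helper presumably looks like (hypothetical; the real implementation may differ):

// Hypothetical sketch of the parseNumber helper referenced above:
// coerce an env string to a finite number, else fall back to a default.
const parseNumber = (value, fallback) => {
  const parsed = Number(value);
  return Number.isFinite(parsed) ? parsed : fallback;
};

parseNumber(undefined, 50);       // 50   (unset env var)
parseNumber('10485760', 1024);    // 10485760
parseNumber('not-a-number', 100); // 100  (garbage falls back)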

View File

@@ -68,7 +68,15 @@ class DatabaseInitializer {
// dbManager connects to the target database
const client = await dbManager.pool.connect();
try {
const sqlPath = path.resolve(__dirname, '../../scripts/init_db.sql');
const sqlPathCandidates = [
path.resolve(process.cwd(), 'scripts/init_db.sql'),
path.resolve(__dirname, '../scripts/init_db.sql'),
path.resolve(__dirname, '../../scripts/init_db.sql')
];
const sqlPath = sqlPathCandidates.find((candidate) => fs.existsSync(candidate));
if (!sqlPath) {
throw new Error(`init_db.sql not found. Candidates: ${sqlPathCandidates.join(' | ')}`);
}
const sql = fs.readFileSync(sqlPath, 'utf8');
logger.info('Executing init_db.sql...');
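
Trying process.cwd() first means the script is found regardless of where the compiled output lives, since the old fixed __dirname-relative path breaks once the directory layout changes (e.g. running from dist/); the two remaining candidates keep source-tree and compiled runs working. Failing fast with the full candidate list in the error message makes a missing init_db.sql obvious at startup.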

View File

@@ -3,7 +3,7 @@ import { config } from './config/config.js';
import dbManager from './db/databaseManager.js';
import dbInitializer from './db/initializer.js';
import partitionManager from './db/partitionManager.js';
import { createKafkaConsumer } from './kafka/consumer.js';
import { createKafkaConsumers } from './kafka/consumer.js';
import { processKafkaMessage } from './processor/index.js';
import { createRedisClient } from './redis/redisClient.js';
import { RedisIntegration } from './redis/redisIntegration.js';
@@ -79,13 +79,23 @@ const bootstrap = async () => {
const messageKey = Buffer.isBuffer(message.key)
? message.key.toString('utf8')
: message.key;
logger.info('Kafka message received', {
topic: message.topic,
partition: message.partition,
offset: message.offset,
key: messageKey,
value: messageValue
});
if (config.kafka.logMessages) {
logger.info('Kafka message received', {
topic: message.topic,
partition: message.partition,
offset: message.offset,
key: messageKey,
value: messageValue
});
} else {
logger.info('Kafka message received', {
topic: message.topic,
partition: message.partition,
offset: message.offset,
key: messageKey,
valueLength: typeof messageValue === 'string' ? messageValue.length : null
});
}
const inserted = await processKafkaMessage({ message, dbManager, config });
metricCollector.increment('db_inserted');
logger.info('Kafka message processed', { inserted });
@@ -143,7 +153,7 @@ const bootstrap = async () => {
}
};
const consumer = createKafkaConsumer({
const consumers = createKafkaConsumers({
kafkaConfig: config.kafka,
onMessage: handleMessage,
onError: handleError
@@ -170,9 +180,9 @@ const bootstrap = async () => {
try {
// 1. Close Kafka Consumer
if (consumer) {
await new Promise((resolve) => consumer.close(true, resolve));
logger.info('Kafka consumer closed');
if (consumers && consumers.length > 0) {
await Promise.all(consumers.map(c => new Promise((resolve) => c.close(true, resolve))));
logger.info('Kafka consumer closed', { count: consumers.length });
}
// 2. Stop Redis Heartbeat (if method exists, otherwise just close client)
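
Shutdown note: per kafka-node's close(force, cb) semantics, passing true asks each consumer to commit its current offsets before closing, and the Promise.all drains all instances in parallel, so Redis and the database are only torn down after every group member has committed and left.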

View File

@@ -3,29 +3,54 @@ import { logger } from '../utils/logger.js';
const { ConsumerGroup } = kafka;
export const createKafkaConsumer = ({ kafkaConfig, onMessage, onError }) => {
const createOneConsumer = ({ kafkaConfig, onMessage, onError, instanceIndex }) => {
const kafkaHost = kafkaConfig.brokers.join(',');
const clientId = instanceIndex === 0 ? kafkaConfig.clientId : `${kafkaConfig.clientId}-${instanceIndex}`;
const id = `${clientId}-${process.pid}-${Date.now()}`;
const maxInFlight = Number.isFinite(kafkaConfig.maxInFlight) ? kafkaConfig.maxInFlight : 50;
let inFlight = 0;
const consumer = new ConsumerGroup(
{
kafkaHost,
groupId: kafkaConfig.groupId,
clientId: kafkaConfig.clientId,
clientId,
id,
fromOffset: 'earliest',
protocol: ['roundrobin'],
outOfRangeOffset: 'latest',
autoCommit: true,
autoCommitIntervalMs: kafkaConfig.autoCommitIntervalMs,
fetchMaxBytes: kafkaConfig.fetchMaxBytes,
fetchMinBytes: kafkaConfig.fetchMinBytes,
fetchMaxWaitMs: kafkaConfig.fetchMaxWaitMs,
sasl: kafkaConfig.sasl
},
kafkaConfig.topic
);
const tryResume = () => {
if (inFlight < maxInFlight) {
consumer.resume();
}
};
consumer.on('message', (message) => {
onMessage(message).catch((error) => {
logger.error('Kafka message handling failed', { error: error?.message });
if (onError) {
onError(error, message);
}
});
inFlight += 1;
if (inFlight >= maxInFlight) {
consumer.pause();
}
Promise.resolve(onMessage(message))
.catch((error) => {
logger.error('Kafka message handling failed', { error: error?.message });
if (onError) {
onError(error, message);
}
})
.finally(() => {
inFlight -= 1;
tryResume();
});
});
consumer.on('error', (error) => {
@@ -37,3 +62,14 @@ export const createKafkaConsumer = ({ kafkaConfig, onMessage, onError }) => {
return consumer;
};
export const createKafkaConsumers = ({ kafkaConfig, onMessage, onError }) => {
const instances = Number.isFinite(kafkaConfig.consumerInstances) ? kafkaConfig.consumerInstances : 1;
const count = Math.max(1, instances);
return Array.from({ length: count }, (_, idx) =>
createOneConsumer({ kafkaConfig, onMessage, onError, instanceIndex: idx })
);
};
export const createKafkaConsumer = ({ kafkaConfig, onMessage, onError }) =>
createKafkaConsumers({ kafkaConfig, onMessage, onError })[0];
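
The pause/resume logic above is an in-flight counter gate. A self-contained sketch of the same pattern, with a fake pausable source standing in for the kafka-node ConsumerGroup (names here are illustrative, not part of the diff):

// Sketch: gate concurrent message handling behind an in-flight counter.
const makeGate = ({ source, maxInFlight, handle }) => {
  let inFlight = 0;
  return (message) => {
    inFlight += 1;
    if (inFlight >= maxInFlight) {
      source.pause(); // stop fetching more; buffered messages may still arrive
    }
    Promise.resolve(handle(message)).finally(() => {
      inFlight -= 1;
      if (inFlight < maxInFlight) {
        source.resume();
      }
    });
  };
};

Because a paused kafka-node consumer may still emit messages already sitting in its fetch buffer, the counter, not the pause itself, is the actual concurrency limit; inFlight can briefly overshoot maxInFlight by the buffered amount.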

View File

@@ -23,14 +23,13 @@ const normalizeCmdWord = (value) => {
}
return trimmed;
}
// The Zod schema might have already converted numbers to strings, but let's be safe
if (typeof value === 'number' && Number.isFinite(value)) {
return `0x${value.toString(16).toLowerCase()}`;
}
return null;
};
const resolveActionType = (direction, cmdWord) => {
const resolveMessageType = (direction, cmdWord) => {
if (cmdWord === '0x36') {
return '36上报';
}
@@ -43,6 +42,119 @@ const resolveActionType = (direction, cmdWord) => {
return null;
};
const defaultDevTypeActionMap = {
0: '无效',
1: '设备回路状态',
2: '用户操作',
3: '设备回路状态',
4: '设备回路状态',
5: '设备回路状态',
6: '用户操作',
7: '用户操作',
8: '用户操作',
9: '设备回路状态',
10: '用户操作',
11: '用户操作',
12: '无效',
13: '设备回路状态',
14: '设备回路状态',
15: '设备回路状态',
16: '设备回路状态',
17: '设备回路状态',
18: '无效',
19: '无效',
20: '无效',
21: '设备回路状态',
22: '无效',
23: '无效',
24: '无效',
25: '无效',
26: '无效',
27: '用户操作',
28: '设备回路状态',
29: '设备回路状态',
30: '无效',
31: '无效',
32: '用户操作',
33: '设备回路状态',
34: '设备回路状态',
35: '设备回路状态',
36: '无效',
37: '用户操作',
38: '设备回路状态',
39: '设备回路状态',
40: '用户操作',
41: '用户操作',
42: '无效',
43: '无效',
44: '设备回路状态',
45: '无效',
46: '用户操作',
47: '设备回路状态',
48: '设备回路状态',
49: '设备回路状态',
50: '设备回路状态',
51: '设备回路状态',
52: '设备回路状态',
53: '设备回路状态',
54: '用户操作',
55: '用户操作',
56: '设备回路状态',
57: '设备回路状态',
241: '设备回路状态'
};
const buildDevTypeActionMap = () => {
const raw = process.env.ACTION_TYPE_DEV_TYPE_RULES;
if (!raw || typeof raw !== 'string' || !raw.trim()) {
return defaultDevTypeActionMap;
}
try {
const parsed = JSON.parse(raw);
if (!Array.isArray(parsed)) {
return defaultDevTypeActionMap;
}
const overrides = {};
parsed.forEach((item) => {
if (Array.isArray(item) && item.length >= 2) {
const devType = Number(item[0]);
const actionType = item[1];
if (Number.isFinite(devType) && typeof actionType === 'string' && actionType) {
overrides[devType] = actionType;
}
return;
}
if (item && typeof item === 'object') {
const devType = Number(item.dev_type ?? item.devType);
const actionType = item.action_type ?? item.actionType ?? item.action;
if (Number.isFinite(devType) && typeof actionType === 'string' && actionType) {
overrides[devType] = actionType;
}
}
});
return { ...defaultDevTypeActionMap, ...overrides };
} catch {
return defaultDevTypeActionMap;
}
};
const devTypeActionMap = buildDevTypeActionMap();
const resolveDevTypeAction = (devType) => {
if (typeof devType !== 'number') {
return '设备回路状态';
}
const mapped = devTypeActionMap[devType];
if (mapped) {
return mapped;
}
return '设备回路状态';
};
const parseKafkaPayload = (value) => {
const raw = Buffer.isBuffer(value) ? value.toString('utf8') : value;
if (typeof raw !== 'string') {
@@ -56,6 +168,27 @@ export const buildRowsFromMessageValue = (value) => {
return buildRowsFromPayload(payload);
};
const isHexString = (value) => {
if (typeof value !== 'string') {
return false;
}
const trimmed = value.trim();
if (!trimmed) {
return false;
}
const compact = trimmed.replace(/[\s:]/g, '');
if (compact.length === 0 || compact.length % 2 !== 0) {
return false;
}
return /^[0-9a-fA-F]+$/.test(compact);
};
const hexToBase64 = (hex) => {
const compact = hex.replace(/[\s:]/g, '');
const buffer = Buffer.from(compact, 'hex');
return buffer.toString('base64');
};
export const buildRowsFromPayload = (rawPayload) => {
// 1. Validate and transform payload using Zod schema
const payload = kafkaPayloadSchema.parse(rawPayload);
@@ -68,18 +201,23 @@ export const buildRowsFromPayload = (rawPayload) => {
direction,
cmd_word: cmdWord,
frame_id: frameId,
udp_raw: udpRaw,
udp_raw: udpRawRaw,
sys_lock_status: sysLockStatus,
report_count: reportCount,
fault_count: faultCount,
device_list: deviceList, // Zod provides default []
fault_list: faultList, // Zod provides default []
control_list: controlList // Zod provides default []
control_list: controlList, // Zod provides default []
extra
} = payload;
// Temporary migration logic: if udp_raw is hex, convert to base64 before insert.
// This is a transitional mechanism and should be removed once all upstream systems send base64 directly.
const udpRaw = isHexString(udpRawRaw) ? hexToBase64(udpRawRaw) : udpRawRaw;
const normalizedDirection = normalizeDirection(direction);
const normalizedCmdWord = normalizeCmdWord(cmdWord);
const actionType = resolveActionType(normalizedDirection, normalizedCmdWord);
const messageType = resolveMessageType(normalizedDirection, normalizedCmdWord);
const writeTsMs = Date.now();
// Base fields common to all rows (excluding unique ID)
@@ -88,12 +226,11 @@ export const buildRowsFromPayload = (rawPayload) => {
write_ts_ms: writeTsMs,
hotel_id: hotelId,
room_id: roomId,
device_id: deviceId, // Pass through normalized/validated device_id
device_id: deviceId,
direction: normalizedDirection,
cmd_word: normalizedCmdWord,
frame_id: frameId,
udp_raw: udpRaw,
action_type: actionType,
sys_lock_status: sysLockStatus ?? null,
report_count: reportCount ?? null,
fault_count: faultCount ?? null,
@@ -107,13 +244,13 @@ export const buildRowsFromPayload = (rawPayload) => {
type_l: null,
type_h: null,
details: null,
extra: { raw_hex: udpRaw }
extra: extra || {}
};
const rows = [];
// Logic 1: 0x36 Status/Fault Report
if (actionType === '36上报') {
if (messageType === '36上报') {
const details = {
device_list: deviceList,
fault_list: faultList
@@ -122,6 +259,7 @@ export const buildRowsFromPayload = (rawPayload) => {
// Process device status list
if (deviceList.length > 0) {
deviceList.forEach(device => {
const actionType = resolveDevTypeAction(device.dev_type);
rows.push({
...commonFields,
guid: createGuid(),
@@ -129,6 +267,7 @@ export const buildRowsFromPayload = (rawPayload) => {
dev_addr: device.dev_addr ?? null,
dev_loop: device.dev_loop ?? null,
dev_data: device.dev_data ?? null,
action_type: actionType,
details
});
});
@@ -137,6 +276,7 @@ export const buildRowsFromPayload = (rawPayload) => {
// Process fault list
if (faultList.length > 0) {
faultList.forEach(fault => {
const actionType = resolveDevTypeAction(fault.dev_type);
rows.push({
...commonFields,
guid: createGuid(),
@@ -146,6 +286,7 @@ export const buildRowsFromPayload = (rawPayload) => {
dev_loop: fault.dev_loop ?? null,
error_type: fault.error_type ?? null,
error_data: fault.error_data ?? null,
action_type: actionType,
details
});
});
@@ -156,6 +297,7 @@ export const buildRowsFromPayload = (rawPayload) => {
rows.push({
...commonFields,
guid: createGuid(),
action_type: '设备回路状态',
details
});
}
@@ -164,7 +306,7 @@ export const buildRowsFromPayload = (rawPayload) => {
}
// Logic 2: 0x0F Control Command
if (actionType === '0F下发') {
if (messageType === '0F下发') {
const details = {
control_list: controlList
};
@@ -179,6 +321,7 @@ export const buildRowsFromPayload = (rawPayload) => {
dev_loop: control.dev_loop ?? null,
type_l: control.type_l ?? null,
type_h: control.type_h ?? null,
action_type: '下发控制',
details
});
});
@@ -189,6 +332,7 @@ export const buildRowsFromPayload = (rawPayload) => {
rows.push({
...commonFields,
guid: createGuid(),
action_type: '下发控制',
details
});
}
@@ -197,10 +341,15 @@ export const buildRowsFromPayload = (rawPayload) => {
}
// Logic 3: 0x0F ACK or others
// Default behavior: single row
const fallbackActionType =
normalizedCmdWord === '0x0f' && normalizedDirection === '上报'
? 'ACK'
: '无效';
return [{
...commonFields,
guid: createGuid(),
action_type: fallbackActionType,
details: {}
}];
};
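
The ACTION_TYPE_DEV_TYPE_RULES parser above accepts two JSON shapes, tuple pairs and objects, and merges valid entries over the built-in map. A usage sketch (values illustrative):

// Both shapes may be mixed; each valid entry overrides one dev_type.
process.env.ACTION_TYPE_DEV_TYPE_RULES = JSON.stringify([
  [9, '用户操作'],                            // tuple form: [dev_type, action_type]
  { dev_type: 13, action_type: '用户操作' },  // object form, snake_case keys
  { devType: 44, actionType: '用户操作' }     // object form, camelCase keys
]);
// buildDevTypeActionMap() now returns the default map with dev_types
// 9, 13 and 44 overridden; invalid entries and unparsable JSON are
// ignored and the defaults win.

Since devTypeActionMap is built once at module load, the variable must be set before the processor module is imported.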

View File

@@ -55,5 +55,10 @@ export const kafkaPayloadSchema = z.object({
// Lists
device_list: listSchema(deviceItemSchema),
fault_list: listSchema(faultItemSchema),
control_list: listSchema(controlItemSchema)
control_list: listSchema(controlItemSchema),
extra: z.preprocess(
(value) => (value === null ? {} : value),
z.any().optional().default({})
)
});
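
What the preprocess wrapper buys for extra, sketched against the same zod schema:

import { z } from 'zod';

const extraSchema = z.preprocess(
  (value) => (value === null ? {} : value),
  z.any().optional().default({})
);

extraSchema.parse(null);               // {}  (null normalized before validation)
extraSchema.parse(undefined);          // {}  (default applies)
extraSchema.parse({ trace_id: 't1' }); // { trace_id: 't1' }  (passed through)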

View File

@@ -10,7 +10,7 @@ describe('Processor Logic', () => {
direction: '上报',
cmd_word: '0x36',
frame_id: 1,
udp_raw: '3601...',
udp_raw: 'AA552000543353413610CD63088151000000000000000001180003000114005ECB',
sys_lock_status: 0,
report_count: 0,
fault_count: 0
@@ -35,7 +35,7 @@ describe('Processor Logic', () => {
const rows = buildRowsFromPayload(payload);
expect(rows).toHaveLength(2);
expect(rows[0].action_type).toBe('36上报');
expect(rows[0].action_type).toBe('设备回路状态');
expect(rows[0].dev_addr).toBe(10);
expect(rows[1].dev_addr).toBe(11);
expect(rows[0].details.device_list).toHaveLength(2);
@@ -54,7 +54,7 @@ describe('Processor Logic', () => {
const rows = buildRowsFromPayload(payload);
expect(rows).toHaveLength(1);
expect(rows[0].action_type).toBe('36上报');
expect(rows[0].action_type).toBe('设备回路状态');
expect(rows[0].error_type).toBe(2);
});
@@ -85,7 +85,7 @@ describe('Processor Logic', () => {
const rows = buildRowsFromPayload(payload);
expect(rows).toHaveLength(1);
expect(rows[0].action_type).toBe('0F下发');
expect(rows[0].action_type).toBe('下发控制');
expect(rows[0].type_l).toBe(1);
expect(rows[0].type_h).toBe(2);
expect(rows[0].dev_loop).toBe(1);
@@ -100,7 +100,7 @@ describe('Processor Logic', () => {
const rows = buildRowsFromPayload(payload);
expect(rows).toHaveLength(1);
expect(rows[0].action_type).toBe('0FACK');
expect(rows[0].action_type).toBe('ACK');
});
it('should fallback when lists are empty for 0x36', () => {
@@ -114,7 +114,89 @@ describe('Processor Logic', () => {
const rows = buildRowsFromPayload(payload);
expect(rows).toHaveLength(1);
expect(rows[0].action_type).toBe('36上报');
expect(rows[0].action_type).toBe('设备回路状态');
expect(rows[0].dev_type).toBeNull();
});
it('should classify 0x36 as 用户操作 when dev_type is user-operated', () => {
const payload = {
...basePayload,
direction: '上报',
cmd_word: '0x36',
device_list: [
{ dev_type: 2, dev_addr: 10, dev_loop: 1, dev_data: 100 }
]
};
const rows = buildRowsFromPayload(payload);
expect(rows).toHaveLength(1);
expect(rows[0].action_type).toBe('用户操作');
});
it('should store udp_raw as base64 when input is hex string', () => {
const payload = {
...basePayload,
direction: '上报',
cmd_word: '0x36',
device_list: [],
fault_list: []
};
const expectedBase64 = Buffer.from(payload.udp_raw.replace(/[\s:]/g, ''), 'hex').toString('base64');
const rows = buildRowsFromPayload(payload);
expect(rows).toHaveLength(1);
expect(rows[0].udp_raw).toBe(expectedBase64);
});
it('should keep udp_raw unchanged when input is not hex string', () => {
const payload = {
...basePayload,
udp_raw: 'YWJjMTIz',
direction: '上报',
cmd_word: '0x36',
device_list: [
{ dev_type: 1, dev_addr: 10, dev_loop: 1, dev_data: 100 }
]
};
const rows = buildRowsFromPayload(payload);
expect(rows[0].udp_raw).toBe('YWJjMTIz');
});
it('should default extra to empty object when not provided', () => {
const payload = {
...basePayload,
direction: '上报',
cmd_word: '0x36',
device_list: [],
fault_list: []
};
const rows = buildRowsFromPayload(payload);
expect(rows[0].extra).toEqual({});
});
it('should preserve extra when provided by upstream', () => {
const payload = {
...basePayload,
direction: '上报',
cmd_word: '0x36',
device_list: [],
fault_list: [],
extra: {
source: 'upstream',
trace_id: 'trace-123'
}
};
const rows = buildRowsFromPayload(payload);
expect(rows[0].extra).toEqual({
source: 'upstream',
trace_id: 'trace-123'
});
});
});