feat: 初始化后端服务基础架构与核心组件

- 添加项目基础结构,包括 .gitignore、package.json、Docker 配置和环境变量示例
- 实现核心模块:Kafka 消费者、PostgreSQL 数据库管理器、Redis 客户端与错误队列
- 添加工具类:日志记录器、指标收集器、UUID 生成器
- 实现数据处理器,支持 0x36 上报和 0x0F 命令的解析与存储
- 添加数据库初始化脚本和分区管理,支持按时间范围分区
- 引入 Zod 数据验证和 Vitest 单元测试框架
- 提供完整的项目文档,包括数据库设计、Kafka 格式规范和 Redis 集成协议
This commit is contained in:
2026-01-30 11:05:00 +08:00
parent ec2b44b165
commit 86a1e79153
51 changed files with 5921 additions and 0 deletions

View File

@@ -0,0 +1,29 @@
KAFKA_BROKERS=kafka.blv-oa.com:9092
KAFKA_CLIENT_ID=bls-action-producer
KAFKA_GROUP_ID=bls-action-consumer
KAFKA_TOPICS=blwlog4Nodejs-rcu-action-topic
KAFKA_AUTO_COMMIT=true
KAFKA_AUTO_COMMIT_INTERVAL_MS=5000
KAFKA_SASL_ENABLED=true
KAFKA_SASL_MECHANISM=plain
KAFKA_SASL_USERNAME=blwmomo
KAFKA_SASL_PASSWORD=blwmomo
KAFKA_SSL_ENABLED=false
POSTGRES_HOST=10.8.8.109
POSTGRES_PORT=5433
POSTGRES_DATABASE=log_platform
POSTGRES_USER=log_admin
POSTGRES_PASSWORD=YourActualStrongPasswordForPostgres!
POSTGRES_MAX_CONNECTIONS=6
POSTGRES_IDLE_TIMEOUT_MS=30000
PORT=3001
LOG_LEVEL=info
# Redis connection
REDIS_HOST=10.8.8.109
REDIS_PORT=6379
REDIS_PASSWORD=
REDIS_DB=15
REDIS_CONNECT_TIMEOUT_MS=5000

View File

@@ -0,0 +1,29 @@
# Server Configuration
PORT=3000
NODE_ENV=development
# Kafka Configuration
KAFKA_BROKERS=localhost:9092
KAFKA_TOPIC=my-topic-name
KAFKA_GROUP_ID=my-group-id
KAFKA_CLIENT_ID=my-client-id
KAFKA_CONSUMER_INSTANCES=1
# KAFKA_SASL_USERNAME=
# KAFKA_SASL_PASSWORD=
# KAFKA_SASL_MECHANISM=plain
# Database Configuration (PostgreSQL)
DB_HOST=localhost
DB_PORT=5432
DB_USER=postgres
DB_PASSWORD=password
DB_DATABASE=my_database
DB_MAX_CONNECTIONS=10
# Redis Configuration
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_PASSWORD=
REDIS_DB=0
REDIS_PROJECT_NAME=my-project
REDIS_API_BASE_URL=http://localhost:3000

View File

@@ -0,0 +1,19 @@
FROM node:18-alpine
WORKDIR /app
# Install dependencies
COPY package.json package-lock.json ./
RUN npm ci
# Copy source code
COPY . .
# Build
RUN npm run build
# Expose port
EXPOSE 3000
# Start command
CMD ["npm", "run", "start"]

1
bls-rcu-action-backend/dist/.gitkeep vendored Normal file
View File

@@ -0,0 +1 @@

View File

@@ -0,0 +1,55 @@
version: '3.8'
services:
app:
build: .
restart: always
ports:
- "3000:3000"
env_file:
- .env
depends_on:
- postgres
- redis
- kafka
postgres:
image: postgres:15-alpine
restart: always
environment:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: password
POSTGRES_DB: my_database
ports:
- "5432:5432"
volumes:
- postgres_data:/var/lib/postgresql/data
redis:
image: redis:alpine
restart: always
ports:
- "6379:6379"
volumes:
- redis_data:/data
zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: localhost
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
volumes:
- /var/run/docker.sock:/var/run/docker.sock
depends_on:
- zookeeper
volumes:
postgres_data:
redis_data:

View File

@@ -0,0 +1,24 @@
module.exports = {
apps: [{
name: 'bls-rcu-action-backend',
script: 'dist/index.js',
instances: 1,
exec_mode: 'fork',
autorestart: true,
watch: false,
max_memory_restart: '1G',
env: {
NODE_ENV: 'production',
PORT: 3000
},
env_development: {
NODE_ENV: 'development',
PORT: 3000
},
error_file: './logs/error.log',
out_file: './logs/out.log',
log_date_format: 'YYYY-MM-DD HH:mm:ss Z',
merge_logs: true,
time: true
}]
};

View File

@@ -0,0 +1 @@
{"level":"error","message":"Kafka message handling failed","timestamp":1769689985427,"context":{"error":"[\n {\n \"expected\": \"number\",\n \"code\": \"invalid_type\",\n \"path\": [\n \"hotel_id\"\n ],\n \"message\": \"Invalid input: expected number, received string\"\n },\n {\n \"expected\": \"array\",\n \"code\": \"invalid_type\",\n \"path\": [\n \"control_list\"\n ],\n \"message\": \"Invalid input: expected array, received null\"\n }\n]"}}

View File

@@ -0,0 +1 @@
{"level":"error","message":"Kafka message handling failed","timestamp":1769689777074,"context":{"error":"[\n {\n \"expected\": \"number\",\n \"code\": \"invalid_type\",\n \"path\": [\n \"hotel_id\"\n ],\n \"message\": \"Invalid input: expected number, received string\"\n },\n {\n \"expected\": \"array\",\n \"code\": \"invalid_type\",\n \"path\": [\n \"control_list\"\n ],\n \"message\": \"Invalid input: expected array, received null\"\n }\n]"}}

View File

@@ -0,0 +1 @@
{"level":"info","message":"[Minute Metrics] Pulled: 0, Parse Error: 0, Inserted: 0, Failed: 0","timestamp":1769688900027,"context":{"kafka_pulled":0,"parse_error":0,"db_inserted":0,"db_failed":0}}

View File

@@ -0,0 +1 @@
{"level":"info","message":"[Minute Metrics] Pulled: 0, Parse Error: 0, Inserted: 0, Failed: 0","timestamp":1769689140027,"context":{"kafka_pulled":0,"parse_error":0,"db_inserted":0,"db_failed":0}}

View File

@@ -0,0 +1 @@
{"level":"info","message":"[Minute Metrics] Pulled: 0, Parse Error: 0, Inserted: 0, Failed: 0","timestamp":1769689260031,"context":{"kafka_pulled":0,"parse_error":0,"db_inserted":0,"db_failed":0}}

3526
bls-rcu-action-backend/package-lock.json generated Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,24 @@
{
"name": "bls-rcu-action-backend",
"version": "1.0.0",
"type": "module",
"private": true,
"scripts": {
"dev": "node src/index.js",
"build": "vite build --ssr src/index.js --outDir dist",
"test": "vitest run",
"start": "node dist/index.js"
},
"dependencies": {
"dotenv": "^16.4.5",
"kafka-node": "^5.0.0",
"node-cron": "^4.2.1",
"pg": "^8.11.5",
"redis": "^4.6.13",
"zod": "^4.3.6"
},
"devDependencies": {
"vite": "^5.4.0",
"vitest": "^4.0.18"
}
}

View File

@@ -0,0 +1,45 @@
-- Database Initialization Script for BLS RCU Action Server
CREATE SCHEMA IF NOT EXISTS rcu_action;
CREATE TABLE IF NOT EXISTS rcu_action.rcu_action_events (
guid VARCHAR(32) NOT NULL,
ts_ms BIGINT NOT NULL,
write_ts_ms BIGINT NOT NULL,
hotel_id INTEGER NOT NULL,
room_id VARCHAR(32) NOT NULL,
device_id VARCHAR(32) NOT NULL,
direction VARCHAR(10) NOT NULL,
cmd_word VARCHAR(10) NOT NULL,
frame_id INTEGER NOT NULL,
udp_raw TEXT NOT NULL,
action_type VARCHAR(20) NOT NULL,
sys_lock_status SMALLINT,
report_count SMALLINT,
dev_type SMALLINT,
dev_addr SMALLINT,
dev_loop INTEGER,
dev_data INTEGER,
fault_count SMALLINT,
error_type SMALLINT,
error_data SMALLINT,
type_l SMALLINT,
type_h SMALLINT,
details JSONB,
extra JSONB,
PRIMARY KEY (ts_ms, guid)
) PARTITION BY RANGE (ts_ms);
ALTER TABLE rcu_action.rcu_action_events
ADD COLUMN IF NOT EXISTS device_id VARCHAR(32) NOT NULL DEFAULT '';
-- Indexes for performance
CREATE INDEX IF NOT EXISTS idx_rcu_action_hotel_id ON rcu_action.rcu_action_events (hotel_id);
CREATE INDEX IF NOT EXISTS idx_rcu_action_room_id ON rcu_action.rcu_action_events (room_id);
CREATE INDEX IF NOT EXISTS idx_rcu_action_device_id ON rcu_action.rcu_action_events (device_id);
CREATE INDEX IF NOT EXISTS idx_rcu_action_direction ON rcu_action.rcu_action_events (direction);
CREATE INDEX IF NOT EXISTS idx_rcu_action_cmd_word ON rcu_action.rcu_action_events (cmd_word);
CREATE INDEX IF NOT EXISTS idx_rcu_action_action_type ON rcu_action.rcu_action_events (action_type);
-- Composite Index for typical query pattern (Hotel + Room + Time)
CREATE INDEX IF NOT EXISTS idx_rcu_action_query_main ON rcu_action.rcu_action_events (hotel_id, room_id, ts_ms DESC);

View File

@@ -0,0 +1,50 @@
import dotenv from 'dotenv';
dotenv.config();
const parseNumber = (value, defaultValue) => {
const parsed = Number(value);
return Number.isFinite(parsed) ? parsed : defaultValue;
};
const parseList = (value) =>
(value || '')
.split(',')
.map((item) => item.trim())
.filter(Boolean);
export const config = {
env: process.env.NODE_ENV || 'development',
port: parseNumber(process.env.PORT, 3000),
kafka: {
brokers: parseList(process.env.KAFKA_BROKERS),
topic: process.env.KAFKA_TOPIC || process.env.KAFKA_TOPICS || 'blwlog4Nodejs-rcu-action-topic',
groupId: process.env.KAFKA_GROUP_ID || 'bls-rcu-action-group',
clientId: process.env.KAFKA_CLIENT_ID || 'bls-rcu-action-client',
consumerInstances: parseNumber(process.env.KAFKA_CONSUMER_INSTANCES, 1),
sasl: process.env.KAFKA_SASL_USERNAME && process.env.KAFKA_SASL_PASSWORD ? {
mechanism: process.env.KAFKA_SASL_MECHANISM || 'plain',
username: process.env.KAFKA_SASL_USERNAME,
password: process.env.KAFKA_SASL_PASSWORD
} : undefined
},
db: {
host: process.env.DB_HOST || process.env.POSTGRES_HOST || 'localhost',
port: parseNumber(process.env.DB_PORT || process.env.POSTGRES_PORT, 5432),
user: process.env.DB_USER || process.env.POSTGRES_USER || 'postgres',
password: process.env.DB_PASSWORD || process.env.POSTGRES_PASSWORD || '',
database: process.env.DB_DATABASE || process.env.POSTGRES_DATABASE || 'bls_rcu_action',
max: parseNumber(process.env.DB_MAX_CONNECTIONS || process.env.POSTGRES_MAX_CONNECTIONS, 10),
ssl: process.env.DB_SSL === 'true' ? { rejectUnauthorized: false } : undefined,
schema: process.env.DB_SCHEMA || 'rcu_action',
table: process.env.DB_TABLE || 'rcu_action_events'
},
redis: {
host: process.env.REDIS_HOST || 'localhost',
port: parseNumber(process.env.REDIS_PORT, 6379),
password: process.env.REDIS_PASSWORD || undefined,
db: parseNumber(process.env.REDIS_DB, 0),
projectName: process.env.REDIS_PROJECT_NAME || 'bls-rcu-action',
apiBaseUrl: process.env.REDIS_API_BASE_URL || `http://localhost:${parseNumber(process.env.PORT, 3000)}`
}
};

View File

@@ -0,0 +1,80 @@
import pg from 'pg';
import { config } from '../config/config.js';
import { logger } from '../utils/logger.js';
const { Pool } = pg;
const columns = [
'guid',
'ts_ms',
'write_ts_ms',
'hotel_id',
'room_id',
'device_id',
'direction',
'cmd_word',
'frame_id',
'udp_raw',
'action_type',
'sys_lock_status',
'report_count',
'dev_type',
'dev_addr',
'dev_loop',
'dev_data',
'fault_count',
'error_type',
'error_data',
'type_l',
'type_h',
'details',
'extra'
];
export class DatabaseManager {
constructor(dbConfig) {
this.pool = new Pool({
host: dbConfig.host,
port: dbConfig.port,
user: dbConfig.user,
password: dbConfig.password,
database: dbConfig.database,
max: dbConfig.max,
ssl: dbConfig.ssl
});
}
async insertRows({ schema, table, rows }) {
if (!rows || rows.length === 0) {
return;
}
const values = [];
const placeholders = rows.map((row, rowIndex) => {
const offset = rowIndex * columns.length;
columns.forEach((column) => {
values.push(row[column] ?? null);
});
const params = columns.map((_, columnIndex) => `$${offset + columnIndex + 1}`);
return `(${params.join(', ')})`;
});
const statement = `INSERT INTO ${schema}.${table} (${columns.join(', ')}) VALUES ${placeholders.join(', ')}`;
try {
await this.pool.query(statement, values);
} catch (error) {
logger.error('Database insert failed', {
error: error?.message,
schema,
table,
rowsLength: rows.length
});
throw error;
}
}
async close() {
await this.pool.end();
}
}
const dbManager = new DatabaseManager(config.db);
export default dbManager;

View File

@@ -0,0 +1,86 @@
import pg from 'pg';
import fs from 'fs';
import path from 'path';
import { fileURLToPath } from 'url';
import { logger } from '../utils/logger.js';
import partitionManager from './partitionManager.js';
import dbManager from './databaseManager.js';
import { config } from '../config/config.js';
const __filename = fileURLToPath(import.meta.url);
const __dirname = path.dirname(__filename);
class DatabaseInitializer {
async initialize() {
logger.info('Starting database initialization check...');
// 1. Check if database exists, create if not
await this.ensureDatabaseExists();
// 2. Initialize Schema and Parent Table (if not exists)
// Note: We need to use dbManager because it connects to the target database
await this.ensureSchemaAndTable();
// 3. Ensure Partitions for the next month
await partitionManager.ensurePartitions(30);
logger.info('Database initialization completed successfully.');
}
async ensureDatabaseExists() {
const { host, port, user, password, database, ssl } = config.db;
// Connect to 'postgres' database to check/create target database
const client = new pg.Client({
host,
port,
user,
password,
database: 'postgres',
ssl: ssl ? { rejectUnauthorized: false } : false
});
try {
await client.connect();
const checkRes = await client.query(
`SELECT 1 FROM pg_database WHERE datname = $1`,
[database]
);
if (checkRes.rowCount === 0) {
logger.info(`Database '${database}' does not exist. Creating...`);
// CREATE DATABASE cannot run inside a transaction block
await client.query(`CREATE DATABASE "${database}"`);
logger.info(`Database '${database}' created.`);
} else {
logger.info(`Database '${database}' already exists.`);
}
} catch (err) {
logger.error('Error ensuring database exists:', err);
throw err;
} finally {
await client.end();
}
}
async ensureSchemaAndTable() {
// dbManager connects to the target database
const client = await dbManager.pool.connect();
try {
const sqlPath = path.resolve(__dirname, '../../scripts/init_db.sql');
const sql = fs.readFileSync(sqlPath, 'utf8');
logger.info('Executing init_db.sql...');
await client.query(sql);
logger.info('Schema and parent table initialized.');
} catch (err) {
logger.error('Error initializing schema and table:', err);
throw err;
} finally {
client.release();
}
}
}
export default new DatabaseInitializer();

View File

@@ -0,0 +1,71 @@
import { logger } from '../utils/logger.js';
import dbManager from './databaseManager.js';
class PartitionManager {
/**
* Calculate the start and end timestamps (milliseconds) for a given date.
* @param {Date} date - The date to calculate for.
* @returns {Object} { startMs, endMs, partitionSuffix }
*/
getPartitionInfo(date) {
const yyyy = date.getFullYear();
const mm = String(date.getMonth() + 1).padStart(2, '0');
const dd = String(date.getDate()).padStart(2, '0');
const partitionSuffix = `${yyyy}${mm}${dd}`;
const start = new Date(date);
start.setHours(0, 0, 0, 0);
const startMs = start.getTime();
const end = new Date(date);
end.setDate(end.getDate() + 1);
end.setHours(0, 0, 0, 0);
const endMs = end.getTime();
return { startMs, endMs, partitionSuffix };
}
/**
* Ensure partitions exist for the next N days.
* @param {number} daysAhead - Number of days to pre-create.
*/
async ensurePartitions(daysAhead = 30) {
const client = await dbManager.pool.connect();
try {
logger.info(`Starting partition check for the next ${daysAhead} days...`);
const now = new Date();
for (let i = 0; i < daysAhead; i++) {
const targetDate = new Date(now);
targetDate.setDate(now.getDate() + i);
const { startMs, endMs, partitionSuffix } = this.getPartitionInfo(targetDate);
const partitionName = `rcu_action.rcu_action_events_${partitionSuffix}`;
// Check if partition exists
const checkSql = `
SELECT to_regclass($1) as exists;
`;
const checkRes = await client.query(checkSql, [partitionName]);
if (!checkRes.rows[0].exists) {
logger.info(`Creating partition ${partitionName} for range [${startMs}, ${endMs})`);
const createSql = `
CREATE TABLE IF NOT EXISTS ${partitionName}
PARTITION OF rcu_action.rcu_action_events
FOR VALUES FROM (${startMs}) TO (${endMs});
`;
await client.query(createSql);
}
}
logger.info('Partition check completed.');
} catch (err) {
logger.error('Error ensuring partitions:', err);
throw err;
} finally {
client.release();
}
}
}
export default new PartitionManager();

View File

@@ -0,0 +1,203 @@
import cron from 'node-cron';
import { config } from './config/config.js';
import dbManager from './db/databaseManager.js';
import dbInitializer from './db/initializer.js';
import partitionManager from './db/partitionManager.js';
import { createKafkaConsumer } from './kafka/consumer.js';
import { processKafkaMessage } from './processor/index.js';
import { createRedisClient } from './redis/redisClient.js';
import { RedisIntegration } from './redis/redisIntegration.js';
import { buildErrorQueueKey, enqueueError, startErrorRetryWorker } from './redis/errorQueue.js';
import { MetricCollector } from './utils/metricCollector.js';
import { logger } from './utils/logger.js';
const bootstrap = async () => {
// 0. Initialize Database (Create DB, Schema, Table, Partitions)
await dbInitializer.initialize();
// Metric Collector
const metricCollector = new MetricCollector();
// 1. Setup Partition Maintenance Cron Job (Every day at 00:00)
cron.schedule('0 0 * * *', async () => {
logger.info('Running scheduled partition maintenance...');
try {
await partitionManager.ensurePartitions(30);
} catch (err) {
logger.error('Scheduled partition maintenance failed', err);
}
});
// 1.1 Setup Metric Reporting Cron Job (Every minute)
// Moved after redisIntegration initialization
// DatabaseManager is now a singleton exported instance, but let's keep consistency if possible
// In databaseManager.js it exports `dbManager` instance by default.
// The original code was `const dbManager = new DatabaseManager(config.db);` which implies it might have been a class export.
// Let's check `databaseManager.js` content.
// Wait, I imported `dbManager` from `./db/databaseManager.js`.
// If `databaseManager.js` exports an instance as default, I should use that.
// If it exports a class, I should instantiate it.
// Let's assume the previous code `new DatabaseManager` was correct if it was a class.
// BUT I used `dbManager.pool` in `partitionManager.js` assuming it's an instance.
// I need to verify `databaseManager.js`.
const redisClient = await createRedisClient(config.redis);
const redisIntegration = new RedisIntegration(
redisClient,
config.redis.projectName,
config.redis.apiBaseUrl
);
redisIntegration.startHeartbeat();
// 1.1 Setup Metric Reporting Cron Job (Every minute)
cron.schedule('* * * * *', async () => {
const metrics = metricCollector.getAndReset();
const report = `[Minute Metrics] Pulled: ${metrics.kafka_pulled}, Parse Error: ${metrics.parse_error}, Inserted: ${metrics.db_inserted}, Failed: ${metrics.db_failed}`;
console.log(report);
logger.info(report, metrics);
try {
await redisIntegration.info('Minute Metrics', metrics);
} catch (err) {
logger.error('Failed to report metrics to Redis', { error: err?.message });
}
});
const errorQueueKey = buildErrorQueueKey(config.redis.projectName);
const handleMessage = async (message) => {
if (message.topic) {
metricCollector.increment('kafka_pulled');
}
try {
const messageValue = Buffer.isBuffer(message.value)
? message.value.toString('utf8')
: message.value;
const messageKey = Buffer.isBuffer(message.key)
? message.key.toString('utf8')
: message.key;
logger.info('Kafka message received', {
topic: message.topic,
partition: message.partition,
offset: message.offset,
key: messageKey,
value: messageValue
});
const inserted = await processKafkaMessage({ message, dbManager, config });
metricCollector.increment('db_inserted');
logger.info('Kafka message processed', { inserted });
} catch (error) {
if (error.type === 'PARSE_ERROR') {
metricCollector.increment('parse_error');
} else {
metricCollector.increment('db_failed');
}
logger.error('Message processing failed', {
error: error?.message,
type: error?.type,
stack: error?.stack,
rawPayload: error?.rawPayload,
validationIssues: error?.validationIssues,
dbContext: error?.dbContext
});
throw error; // Re-throw to trigger onError
}
};
const handleError = async (error, message) => {
logger.error('Kafka processing error', {
error: error?.message,
type: error?.type,
stack: error?.stack
});
try {
await redisIntegration.error('Kafka processing error', {
module: 'kafka',
stack: error?.stack || error?.message
});
} catch (redisError) {
logger.error('Redis error log failed', { error: redisError?.message });
}
if (message) {
const messageValue = Buffer.isBuffer(message.value)
? message.value.toString('utf8')
: message.value;
try {
await enqueueError(redisClient, errorQueueKey, {
attempts: 0,
value: messageValue,
meta: {
topic: message.topic,
partition: message.partition,
offset: message.offset,
key: message.key
},
timestamp: Date.now()
});
} catch (enqueueError) {
logger.error('Enqueue error payload failed', { error: enqueueError?.message });
}
}
};
const consumer = createKafkaConsumer({
kafkaConfig: config.kafka,
onMessage: handleMessage,
onError: handleError
});
// Start retry worker (non-blocking)
startErrorRetryWorker({
client: redisClient,
queueKey: errorQueueKey,
redisIntegration,
handler: async (item) => {
if (!item?.value) {
throw new Error('Missing value in retry payload');
}
await handleMessage({ value: item.value });
}
}).catch(err => {
logger.error('Retry worker failed', { error: err?.message });
});
// Graceful Shutdown Logic
const shutdown = async (signal) => {
logger.info(`Received ${signal}, shutting down...`);
try {
// 1. Close Kafka Consumer
if (consumer) {
await new Promise((resolve) => consumer.close(true, resolve));
logger.info('Kafka consumer closed');
}
// 2. Stop Redis Heartbeat (if method exists, otherwise just close client)
// redisIntegration.stopHeartbeat(); // Assuming implementation or just rely on client close
// 3. Close Redis Client
await redisClient.quit();
logger.info('Redis client closed');
// 4. Close Database Pool
await dbManager.close();
logger.info('Database connection closed');
process.exit(0);
} catch (err) {
logger.error('Error during shutdown', { error: err?.message });
process.exit(1);
}
};
process.on('SIGTERM', () => shutdown('SIGTERM'));
process.on('SIGINT', () => shutdown('SIGINT'));
};
bootstrap().catch((error) => {
logger.error('Service bootstrap failed', { error: error?.message });
process.exit(1);
});

View File

@@ -0,0 +1,39 @@
import kafka from 'kafka-node';
import { logger } from '../utils/logger.js';
const { ConsumerGroup } = kafka;
export const createKafkaConsumer = ({ kafkaConfig, onMessage, onError }) => {
const kafkaHost = kafkaConfig.brokers.join(',');
const consumer = new ConsumerGroup(
{
kafkaHost,
groupId: kafkaConfig.groupId,
clientId: kafkaConfig.clientId,
fromOffset: 'earliest',
protocol: ['roundrobin'],
outOfRangeOffset: 'latest',
autoCommit: true,
sasl: kafkaConfig.sasl
},
kafkaConfig.topic
);
consumer.on('message', (message) => {
onMessage(message).catch((error) => {
logger.error('Kafka message handling failed', { error: error?.message });
if (onError) {
onError(error, message);
}
});
});
consumer.on('error', (error) => {
logger.error('Kafka consumer error', { error: error?.message });
if (onError) {
onError(error);
}
});
return consumer;
};

View File

@@ -0,0 +1,247 @@
import { createGuid } from '../utils/uuid.js';
import { kafkaPayloadSchema } from '../schema/kafkaPayload.js';
const normalizeDirection = (value) => {
if (!value) return null;
if (value === '上报' || value === '上传') return '上报';
if (value === '下发') return '下发';
return value;
};
const normalizeCmdWord = (value) => {
if (typeof value === 'string') {
const trimmed = value.trim();
if (trimmed.startsWith('0x') || trimmed.startsWith('0X')) {
return `0x${trimmed.slice(2).toLowerCase()}`;
}
if (/^[0-9a-fA-F]{2}$/.test(trimmed)) {
return `0x${trimmed.toLowerCase()}`;
}
const parsed = Number(trimmed);
if (Number.isFinite(parsed)) {
return `0x${parsed.toString(16).toLowerCase()}`;
}
return trimmed;
}
// The Zod schema might have already converted numbers to strings, but let's be safe
if (typeof value === 'number' && Number.isFinite(value)) {
return `0x${value.toString(16).toLowerCase()}`;
}
return null;
};
const resolveActionType = (direction, cmdWord) => {
if (cmdWord === '0x36') {
return '36上报';
}
if (cmdWord === '0x0f' && direction === '下发') {
return '0F下发';
}
if (cmdWord === '0x0f' && direction === '上报') {
return '0FACK';
}
return null;
};
const parseKafkaPayload = (value) => {
const raw = Buffer.isBuffer(value) ? value.toString('utf8') : value;
if (typeof raw !== 'string') {
throw new Error('Invalid kafka message value');
}
return JSON.parse(raw);
};
export const buildRowsFromMessageValue = (value) => {
const payload = parseKafkaPayload(value);
return buildRowsFromPayload(payload);
};
export const buildRowsFromPayload = (rawPayload) => {
// 1. Validate and transform payload using Zod schema
const payload = kafkaPayloadSchema.parse(rawPayload);
const {
ts_ms: tsMs,
hotel_id: hotelId,
room_id: roomId,
device_id: deviceId,
direction,
cmd_word: cmdWord,
frame_id: frameId,
udp_raw: udpRaw,
sys_lock_status: sysLockStatus,
report_count: reportCount,
fault_count: faultCount,
device_list: deviceList, // Zod provides default []
fault_list: faultList, // Zod provides default []
control_list: controlList // Zod provides default []
} = payload;
const normalizedDirection = normalizeDirection(direction);
const normalizedCmdWord = normalizeCmdWord(cmdWord);
const actionType = resolveActionType(normalizedDirection, normalizedCmdWord);
const writeTsMs = Date.now();
// Base fields common to all rows (excluding unique ID)
const commonFields = {
ts_ms: tsMs,
write_ts_ms: writeTsMs,
hotel_id: hotelId,
room_id: roomId,
device_id: deviceId, // Pass through normalized/validated device_id
direction: normalizedDirection,
cmd_word: normalizedCmdWord,
frame_id: frameId,
udp_raw: udpRaw,
action_type: actionType,
sys_lock_status: sysLockStatus ?? null,
report_count: reportCount ?? null,
fault_count: faultCount ?? null,
// Initialize nullable fields
dev_type: null,
dev_addr: null,
dev_loop: null,
dev_data: null,
error_type: null,
error_data: null,
type_l: null,
type_h: null,
details: null,
extra: { raw_hex: udpRaw }
};
const rows = [];
// Logic 1: 0x36 Status/Fault Report
if (actionType === '36上报') {
const details = {
device_list: deviceList,
fault_list: faultList
};
// Process device status list
if (deviceList.length > 0) {
deviceList.forEach(device => {
rows.push({
...commonFields,
guid: createGuid(),
dev_type: device.dev_type ?? null,
dev_addr: device.dev_addr ?? null,
dev_loop: device.dev_loop ?? null,
dev_data: device.dev_data ?? null,
details
});
});
}
// Process fault list
if (faultList.length > 0) {
faultList.forEach(fault => {
rows.push({
...commonFields,
guid: createGuid(),
// Use common dev_ fields for fault device identification
dev_type: fault.dev_type ?? null,
dev_addr: fault.dev_addr ?? null,
dev_loop: fault.dev_loop ?? null,
error_type: fault.error_type ?? null,
error_data: fault.error_data ?? null,
details
});
});
}
// Fallback: if no lists, insert one record to preserve the event
if (rows.length === 0) {
rows.push({
...commonFields,
guid: createGuid(),
details
});
}
return rows;
}
// Logic 2: 0x0F Control Command
if (actionType === '0F下发') {
const details = {
control_list: controlList
};
if (controlList.length > 0) {
controlList.forEach(control => {
rows.push({
...commonFields,
guid: createGuid(),
dev_type: control.dev_type ?? null,
dev_addr: control.dev_addr ?? null,
dev_loop: control.dev_loop ?? null,
type_l: control.type_l ?? null,
type_h: control.type_h ?? null,
details
});
});
}
// Fallback
if (rows.length === 0) {
rows.push({
...commonFields,
guid: createGuid(),
details
});
}
return rows;
}
// Logic 3: 0x0F ACK or others
// Default behavior: single row
return [{
...commonFields,
guid: createGuid(),
details: {}
}];
};
export const processKafkaMessage = async ({ message, dbManager, config }) => {
let rows;
try {
const payload = parseKafkaPayload(message.value);
rows = buildRowsFromPayload(payload);
} catch (error) {
error.type = 'PARSE_ERROR';
const rawValue = Buffer.isBuffer(message.value)
? message.value.toString('utf8')
: String(message.value ?? '');
error.rawPayload = rawValue.length > 1000 ? `${rawValue.slice(0, 1000)}...` : rawValue;
if (error?.issues) {
error.validationIssues = error.issues;
}
throw error;
}
try {
await dbManager.insertRows({ schema: config.db.schema, table: config.db.table, rows });
} catch (error) {
error.type = 'DB_ERROR';
const sample = rows?.[0];
error.dbContext = {
rowsLength: rows?.length || 0,
sampleRow: sample
? {
guid: sample.guid,
ts_ms: sample.ts_ms,
action_type: sample.action_type,
cmd_word: sample.cmd_word,
direction: sample.direction,
device_id: sample.device_id
}
: null
};
throw error;
}
return rows.length;
};

View File

@@ -0,0 +1,83 @@
const normalizeHex = (hex) => {
if (typeof hex !== 'string') {
return '';
}
let cleaned = hex.trim().replace(/^0x/i, '').replace(/\s+/g, '');
if (cleaned.length % 2 === 1) {
cleaned = `0${cleaned}`;
}
return cleaned;
};
const toHex = (value) => `0x${value.toString(16).padStart(2, '0')}`;
const readUInt16 = (buffer, offset) => buffer.readUInt16BE(offset);
export const parse0x36 = (udpRaw) => {
const cleaned = normalizeHex(udpRaw);
const buffer = cleaned ? Buffer.from(cleaned, 'hex') : Buffer.alloc(0);
const sysLockStatus = buffer.length > 0 ? buffer[0] : null;
const reportCount = buffer.length > 7 ? buffer[7] : null;
let offset = 8;
const devices = [];
for (let i = 0; i < (reportCount || 0) && offset + 5 < buffer.length; i += 1) {
devices.push({
dev_type: buffer[offset],
dev_addr: buffer[offset + 1],
dev_loop: readUInt16(buffer, offset + 2),
dev_data: readUInt16(buffer, offset + 4)
});
offset += 6;
}
const faultCount = offset < buffer.length ? buffer[offset] : null;
offset += 1;
const faults = [];
for (let i = 0; i < (faultCount || 0) && offset + 5 < buffer.length; i += 1) {
faults.push({
fault_dev_type: buffer[offset],
fault_dev_addr: buffer[offset + 1],
fault_dev_loop: readUInt16(buffer, offset + 2),
error_type: buffer[offset + 4],
error_data: buffer[offset + 5]
});
offset += 6;
}
return {
sysLockStatus,
reportCount,
faultCount,
devices,
faults
};
};
export const parse0x0fDownlink = (udpRaw) => {
const cleaned = normalizeHex(udpRaw);
const buffer = cleaned ? Buffer.from(cleaned, 'hex') : Buffer.alloc(0);
const controlCount = buffer.length > 0 ? buffer[0] : null;
let offset = 1;
const controlParams = [];
for (let i = 0; i < (controlCount || 0) && offset + 5 < buffer.length; i += 1) {
const typeValue = readUInt16(buffer, offset + 4);
controlParams.push({
dev_type: buffer[offset],
dev_addr: buffer[offset + 1],
loop: readUInt16(buffer, offset + 2),
type: typeValue,
type_l: buffer[offset + 4],
type_h: buffer[offset + 5]
});
offset += 6;
}
return {
controlCount,
controlParams
};
};
export const parse0x0fAck = (udpRaw) => {
const cleaned = normalizeHex(udpRaw);
const buffer = cleaned ? Buffer.from(cleaned, 'hex') : Buffer.alloc(0);
const ackCode = buffer.length > 1 ? toHex(buffer[1]) : null;
return { ackCode };
};

View File

@@ -0,0 +1,53 @@
import { logger } from '../utils/logger.js';
export const buildErrorQueueKey = (projectName) => `${projectName}_error_queue`;
export const enqueueError = async (client, queueKey, payload) => {
try {
await client.rPush(queueKey, JSON.stringify(payload));
} catch (error) {
logger.error('Redis enqueue error failed', { error: error?.message });
throw error;
}
};
export const startErrorRetryWorker = async ({
client,
queueKey,
handler,
redisIntegration,
maxAttempts = 5
}) => {
while (true) {
const result = await client.blPop(queueKey, 0);
const raw = result?.element;
if (!raw) {
continue;
}
let item;
try {
item = JSON.parse(raw);
} catch (error) {
logger.error('Invalid error payload', { error: error?.message });
await redisIntegration.error('Invalid error payload', { module: 'redis', stack: error?.message });
continue;
}
const attempts = item.attempts || 0;
try {
await handler(item);
} catch (error) {
logger.error('Retry handler failed', { error: error?.message, stack: error?.stack });
const nextPayload = {
...item,
attempts: attempts + 1,
lastError: error?.message,
lastAttemptAt: Date.now()
};
if (nextPayload.attempts >= maxAttempts) {
await redisIntegration.error('Retry attempts exceeded', { module: 'retry', stack: JSON.stringify(nextPayload) });
} else {
await enqueueError(client, queueKey, nextPayload);
}
}
}
};

View File

@@ -0,0 +1,14 @@
import { createClient } from 'redis';
export const createRedisClient = async (config) => {
const client = createClient({
socket: {
host: config.host,
port: config.port
},
password: config.password,
database: config.db
});
await client.connect();
return client;
};

View File

@@ -0,0 +1,40 @@
export class RedisIntegration {
constructor(client, projectName, apiBaseUrl) {
this.client = client;
this.projectName = projectName;
this.apiBaseUrl = apiBaseUrl;
this.heartbeatKey = '项目心跳';
this.logKey = `${projectName}_项目控制台`;
}
async info(message, context) {
const payload = {
timestamp: new Date().toISOString(),
level: 'info',
message,
metadata: context || undefined
};
await this.client.rPush(this.logKey, JSON.stringify(payload));
}
async error(message, context) {
const payload = {
timestamp: new Date().toISOString(),
level: 'error',
message,
metadata: context || undefined
};
await this.client.rPush(this.logKey, JSON.stringify(payload));
}
startHeartbeat() {
setInterval(() => {
const payload = {
projectName: this.projectName,
apiBaseUrl: this.apiBaseUrl,
lastActiveAt: Date.now()
};
this.client.rPush(this.heartbeatKey, JSON.stringify(payload));
}, 3000);
}
}

View File

@@ -0,0 +1,59 @@
import { z } from 'zod';
// Device Status Schema (for device_list)
const deviceItemSchema = z.object({
dev_type: z.number().int().optional(),
dev_addr: z.number().int().optional(),
dev_loop: z.number().int().optional(),
dev_data: z.number().int().optional()
});
// Fault Item Schema (for fault_list)
const faultItemSchema = z.object({
dev_type: z.number().int().optional(),
dev_addr: z.number().int().optional(),
dev_loop: z.number().int().optional(),
error_type: z.number().int().optional(),
error_data: z.number().int().optional()
});
// Control Item Schema (for control_list)
const controlItemSchema = z.object({
dev_type: z.number().int().optional(),
dev_addr: z.number().int().optional(),
dev_loop: z.number().int().optional(),
type_l: z.number().int().optional(),
type_h: z.number().int().optional()
});
const listSchema = (schema) =>
z.preprocess(
(value) => (value === null ? [] : value),
z.array(schema).optional().default([])
);
// Main Kafka Payload Schema
export const kafkaPayloadSchema = z.object({
// Required Header Fields
ts_ms: z.number(),
hotel_id: z.preprocess(
(value) => (typeof value === 'string' ? Number(value) : value),
z.number()
),
room_id: z.union([z.string(), z.number()]).transform(val => String(val)),
device_id: z.union([z.string(), z.number()]).transform(val => String(val)),
direction: z.string(),
cmd_word: z.union([z.string(), z.number()]).transform(val => String(val)),
frame_id: z.number(),
udp_raw: z.string(),
// Optional Statistical/Status Fields
sys_lock_status: z.number().optional().nullable(),
report_count: z.number().optional().nullable(),
fault_count: z.number().optional().nullable(),
// Lists
device_list: listSchema(deviceItemSchema),
fault_list: listSchema(faultItemSchema),
control_list: listSchema(controlItemSchema)
});

View File

@@ -0,0 +1,18 @@
const format = (level, message, context) => {
const payload = {
level,
message,
timestamp: Date.now(),
...(context ? { context } : {})
};
return JSON.stringify(payload);
};
export const logger = {
info(message, context) {
process.stdout.write(`${format('info', message, context)}\n`);
},
error(message, context) {
process.stderr.write(`${format('error', message, context)}\n`);
}
};

View File

@@ -0,0 +1,26 @@
export class MetricCollector {
constructor() {
this.reset();
}
reset() {
this.metrics = {
kafka_pulled: 0,
parse_error: 0,
db_inserted: 0,
db_failed: 0
};
}
increment(metric, count = 1) {
if (this.metrics.hasOwnProperty(metric)) {
this.metrics[metric] += count;
}
}
getAndReset() {
const current = { ...this.metrics };
this.reset();
return current;
}
}

View File

@@ -0,0 +1,3 @@
import { randomUUID } from 'crypto';
export const createGuid = () => randomUUID().replace(/-/g, '');

View File

@@ -0,0 +1,120 @@
import { describe, it, expect } from 'vitest';
import { buildRowsFromPayload } from '../src/processor/index.js';
describe('Processor Logic', () => {
const basePayload = {
ts_ms: 1700000000000,
hotel_id: 1001,
room_id: '8001',
device_id: 'dev_001',
direction: '上报',
cmd_word: '0x36',
frame_id: 1,
udp_raw: '3601...',
sys_lock_status: 0,
report_count: 0,
fault_count: 0
};
it('should validate required fields', () => {
expect(() => buildRowsFromPayload({})).toThrow();
expect(() => buildRowsFromPayload({ ...basePayload, ts_ms: undefined })).toThrow();
});
it('should handle 0x36 Status Report with device list', () => {
const payload = {
...basePayload,
direction: '上报',
cmd_word: '0x36',
report_count: 2,
device_list: [
{ dev_type: 1, dev_addr: 10, dev_loop: 1, dev_data: 100 },
{ dev_type: 1, dev_addr: 11, dev_loop: 2, dev_data: 0 }
]
};
const rows = buildRowsFromPayload(payload);
expect(rows).toHaveLength(2);
expect(rows[0].action_type).toBe('36上报');
expect(rows[0].dev_addr).toBe(10);
expect(rows[1].dev_addr).toBe(11);
expect(rows[0].details.device_list).toHaveLength(2);
});
it('should handle 0x36 Fault Report', () => {
const payload = {
...basePayload,
direction: '上报',
cmd_word: '0x36',
fault_count: 1,
fault_list: [
{ dev_type: 1, dev_addr: 10, dev_loop: 1, error_type: 2, error_data: 5 }
]
};
const rows = buildRowsFromPayload(payload);
expect(rows).toHaveLength(1);
expect(rows[0].action_type).toBe('36上报');
expect(rows[0].error_type).toBe(2);
});
it('should handle 0x36 Mixed Report (Status + Fault)', () => {
const payload = {
...basePayload,
direction: '上报',
cmd_word: '0x36',
report_count: 1,
fault_count: 1,
device_list: [{ dev_type: 1, dev_addr: 10, dev_loop: 1, dev_data: 100 }],
fault_list: [{ dev_type: 1, dev_addr: 10, dev_loop: 1, error_type: 2, error_data: 5 }]
};
const rows = buildRowsFromPayload(payload);
expect(rows).toHaveLength(2); // 1 status + 1 fault
});
it('should handle 0x0F Control Command', () => {
const payload = {
...basePayload,
direction: '下发',
cmd_word: '0x0F',
control_list: [
{ dev_type: 1, dev_addr: 10, dev_loop: 1, type_l: 1, type_h: 2 }
]
};
const rows = buildRowsFromPayload(payload);
expect(rows).toHaveLength(1);
expect(rows[0].action_type).toBe('0F下发');
expect(rows[0].type_l).toBe(1);
expect(rows[0].type_h).toBe(2);
expect(rows[0].dev_loop).toBe(1);
});
it('should handle 0x0F ACK', () => {
const payload = {
...basePayload,
direction: '上报',
cmd_word: '0x0F'
};
const rows = buildRowsFromPayload(payload);
expect(rows).toHaveLength(1);
expect(rows[0].action_type).toBe('0FACK');
});
it('should fallback when lists are empty for 0x36', () => {
const payload = {
...basePayload,
direction: '上报',
cmd_word: '0x36',
device_list: [],
fault_list: []
};
const rows = buildRowsFromPayload(payload);
expect(rows).toHaveLength(1);
expect(rows[0].action_type).toBe('36上报');
expect(rows[0].dev_type).toBeNull();
});
});