Compare commits
4 Commits
0e6c5c3cc3...main

| Author | SHA1 | Date |
|---|---|---|
| | 21cf140c68 | |
| | 680bf6a957 | |
| | 339db6f95f | |
| | 4e0f5213db | |
File diff suppressed because one or more lines are too long
@@ -6,7 +6,9 @@ NODE_ENV=development
KAFKA_BROKERS=localhost:9092
KAFKA_TOPIC=my-topic-name
KAFKA_GROUP_ID=my-group-id
KAFKA_CLIENT_ID=my-client-id
KAFKA_CLIENT_ID=bls-rcu-action-client
KAFKA_AUTO_COMMIT=false
KAFKA_AUTO_COMMIT_INTERVAL_MS=5000
KAFKA_CONSUMER_INSTANCES=1
# KAFKA_SASL_USERNAME=
# KAFKA_SASL_PASSWORD=
@@ -27,3 +29,5 @@ REDIS_PASSWORD=
REDIS_DB=0
REDIS_PROJECT_NAME=my-project
REDIS_API_BASE_URL=http://localhost:3000

ENABLE_LOOP_NAME_AUTO_GENERATION=true

113 bls-rcu-action-backend/scripts/generate_rules_from_readme.js Normal file
@@ -0,0 +1,113 @@
import fs from 'fs';
import path from 'path';
import { fileURLToPath } from 'url';

const __filename = fileURLToPath(import.meta.url);
const __dirname = path.dirname(__filename);

const readmePath = path.resolve(__dirname, '../../docs/readme.md');
const envPath = path.resolve(__dirname, '../.env');
const processorPath = path.resolve(__dirname, '../src/processor/index.js');

try {
  const readmeContent = fs.readFileSync(readmePath, 'utf8');
  const lines = readmeContent.split('\n');

  const rules = [];
  let inTable = false;

  for (const line of lines) {
    const trimmed = line.trim();

    // Detect start of table (approximately)
    if (trimmed.includes('|dev_type|名称|描述|Action Type|')) {
      inTable = true;
      continue;
    }

    // Skip separator line
    if (inTable && trimmed.includes('|---|')) {
      continue;
    }

    // Process table rows
    if (inTable && trimmed.startsWith('|') && trimmed.endsWith('|')) {
      const parts = trimmed.split('|').map(p => p.trim());
      // parts[0] is empty, parts[1] is dev_type, parts[2] is name, parts[3] is description, parts[4] is action_type

      if (parts.length >= 5) {
        const devTypeStr = parts[1];
        const description = parts[3];
        const actionType = parts[4];

        if (!devTypeStr || isNaN(parseInt(devTypeStr))) {
          continue;
        }

        const devType = parseInt(devTypeStr, 10);

        rules.push({
          dev_type: devType,
          name: description, // Use description as name per user request
          action_type: actionType
        });
      }
    } else if (inTable && trimmed === '') {
      // Empty line might mean end of table, but let's be loose
    } else if (inTable && !trimmed.startsWith('|')) {
      // End of table
      inTable = false;
    }
  }

  // Sort by dev_type
  rules.sort((a, b) => a.dev_type - b.dev_type);

  console.log(`Found ${rules.length} rules.`);

  // 1. Generate JSON for .env
  const envJson = JSON.stringify(rules);

  // Read existing .env
  let envContent = fs.readFileSync(envPath, 'utf8');
  const envKey = 'ACTION_TYPE_DEV_TYPE_RULES';

  // Replace or Append
  const envLine = `${envKey}='${envJson}'`;
  const regex = new RegExp(`^${envKey}=.*`, 'm');

  if (regex.test(envContent)) {
    envContent = envContent.replace(regex, envLine);
  } else {
    envContent += `\n${envLine}`;
  }

  fs.writeFileSync(envPath, envContent, 'utf8');
  console.log('Updated .env');

  // 2. Generate Object for src/processor/index.js
  // We need to construct the object string manually to match the code style
  const mapLines = rules.map(r => {
    // Escape single quotes in name if present
    const safeName = r.name.replace(/'/g, "\\'");
    return ` ${r.dev_type}: { name: '${safeName}', action: '${r.action_type}' }`;
  });

  const mapString = `const defaultDevTypeActionMap = {\n${mapLines.join(',\n')}\n};`;

  let processorContent = fs.readFileSync(processorPath, 'utf8');

  // Regex to replace the object.
  const processorRegex = /const defaultDevTypeActionMap = \{[\s\S]*?\};/m;

  if (processorRegex.test(processorContent)) {
    processorContent = processorContent.replace(processorRegex, mapString);
    fs.writeFileSync(processorPath, processorContent, 'utf8');
    console.log('Updated src/processor/index.js');
  } else {
    console.error('Could not find defaultDevTypeActionMap in src/processor/index.js');
  }

} catch (err) {
  console.error('Error:', err);
}

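For reference, a hedged sketch of the kind of line this script appends to `.env` — a one-entry array built from the dev_type 1 row that already appears in the processor's default map; the real script serializes every row of the README table:

```js
// Illustrative only: single-entry version of the generated ACTION_TYPE_DEV_TYPE_RULES line.
const rules = [
  { dev_type: 1, name: '强电继电器(输出状态)', action_type: '设备回路状态' }
];
console.log(`ACTION_TYPE_DEV_TYPE_RULES='${JSON.stringify(rules)}'`);
// -> ACTION_TYPE_DEV_TYPE_RULES='[{"dev_type":1,"name":"强电继电器(输出状态)","action_type":"设备回路状态"}]'
```
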
@@ -27,12 +27,16 @@ CREATE TABLE IF NOT EXISTS rcu_action.rcu_action_events (
  type_h SMALLINT,
  details JSONB,
  extra JSONB,
  loop_name VARCHAR(255),
  PRIMARY KEY (ts_ms, guid)
) PARTITION BY RANGE (ts_ms);

ALTER TABLE rcu_action.rcu_action_events
  ADD COLUMN IF NOT EXISTS device_id VARCHAR(32) NOT NULL DEFAULT '';

ALTER TABLE rcu_action.rcu_action_events
  ADD COLUMN IF NOT EXISTS loop_name VARCHAR(255);

-- Indexes for performance
CREATE INDEX IF NOT EXISTS idx_rcu_action_hotel_id ON rcu_action.rcu_action_events (hotel_id);
CREATE INDEX IF NOT EXISTS idx_rcu_action_room_id ON rcu_action.rcu_action_events (room_id);

95 bls-rcu-action-backend/src/cache/projectMetadata.js vendored Normal file
@@ -0,0 +1,95 @@
import cron from 'node-cron';
import dbManager from '../db/databaseManager.js';
import { logger } from '../utils/logger.js';

class ProjectMetadataCache {
  constructor() {
    this.roomMap = new Map(); // device_id -> room_type_id
    this.loopMap = new Map(); // room_type_id:loop_address -> loop_name
  }

  async init() {
    try {
      await this.refresh();
    } catch (error) {
      logger.error('Initial metadata refresh failed', { error: error.message });
    }

    // Schedule 1:00 AM daily
    cron.schedule('0 1 * * *', () => {
      this.refresh().catch(err => logger.error('Scheduled metadata refresh failed', { error: err.message }));
    });
  }

  async refresh() {
    logger.info('Refreshing project metadata cache...');
    const client = await dbManager.pool.connect();
    try {
      // Load Rooms
      // temporary_project.rooms might be partitioned, but querying parent works.
      const roomsRes = await client.query('SELECT device_id, room_type_id FROM temporary_project.rooms');
      const newRoomMap = new Map();
      for (const row of roomsRes.rows) {
        if (row.device_id) {
          newRoomMap.set(row.device_id, row.room_type_id);
        }
      }

      // Load Loops
      const loopsRes = await client.query('SELECT room_type_id, loop_address, loop_name FROM temporary_project.loops');
      const newLoopMap = new Map();
      for (const row of loopsRes.rows) {
        if (row.room_type_id && row.loop_address) {
          // loop_address is varchar, we will key it as string
          const key = `${row.room_type_id}:${row.loop_address}`;
          newLoopMap.set(key, row.loop_name);
        }
      }

      this.roomMap = newRoomMap;
      this.loopMap = newLoopMap;
      logger.info('Project metadata cache refreshed', {
        roomsCount: this.roomMap.size,
        loopsCount: this.loopMap.size
      });

    } catch (error) {
      // If schema/tables don't exist, this will throw.
      // We log but don't crash the app, as this is an enhancement feature.
      logger.error('Failed to refresh project metadata', { error: error.message });
      throw error;
    } finally {
      client.release();
    }
  }

  /**
   * Get loop name for a given device configuration
   * @param {string} deviceId - The device ID (from room)
   * @param {number|string} devType - The device type
   * @param {number|string} devAddr - The device address
   * @param {number|string} devLoop - The device loop
   * @returns {string|null} - The loop name or null if not found
   */
  getLoopName(deviceId, devType, devAddr, devLoop) {
    if (!deviceId ||
      devType === undefined || devType === null ||
      devAddr === undefined || devAddr === null ||
      devLoop === undefined || devLoop === null) {
      return null;
    }

    const roomTypeId = this.roomMap.get(deviceId);
    if (!roomTypeId) return null;

    // Construct loop_address: 3-digit zero-padded concatenation of type, addr, loop
    // e.g. type=1, addr=23, loop=12 -> 001023012
    const fmt = (val) => String(val).padStart(3, '0');
    const loopAddress = `${fmt(devType)}${fmt(devAddr)}${fmt(devLoop)}`;

    const key = `${roomTypeId}:${loopAddress}`;
    return this.loopMap.get(key) || null;
  }
}

export default new ProjectMetadataCache();

@@ -22,7 +22,7 @@ export const config = {
    groupId: process.env.KAFKA_GROUP_ID || 'bls-rcu-action-group',
    clientId: process.env.KAFKA_CLIENT_ID || 'bls-rcu-action-client',
    consumerInstances: parseNumber(process.env.KAFKA_CONSUMER_INSTANCES, 1),
    maxInFlight: parseNumber(process.env.KAFKA_MAX_IN_FLIGHT, 50),
    maxInFlight: parseNumber(process.env.KAFKA_MAX_IN_FLIGHT, 500),
    fetchMaxBytes: parseNumber(process.env.KAFKA_FETCH_MAX_BYTES, 10 * 1024 * 1024),
    fetchMinBytes: parseNumber(process.env.KAFKA_FETCH_MIN_BYTES, 1),
    fetchMaxWaitMs: parseNumber(process.env.KAFKA_FETCH_MAX_WAIT_MS, 100),
@@ -52,5 +52,6 @@ export const config = {
    db: parseNumber(process.env.REDIS_DB, 0),
    projectName: process.env.REDIS_PROJECT_NAME || 'bls-rcu-action',
    apiBaseUrl: process.env.REDIS_API_BASE_URL || `http://localhost:${parseNumber(process.env.PORT, 3000)}`
    }
  },
  enableLoopNameAutoGeneration: process.env.ENABLE_LOOP_NAME_AUTO_GENERATION === 'true'
};

69 bls-rcu-action-backend/src/db/batchProcessor.js Normal file
@@ -0,0 +1,69 @@
export class BatchProcessor {
  constructor(dbManager, config, options = {}) {
    this.dbManager = dbManager;
    this.config = config;
    this.batchSize = options.batchSize || 500;
    this.flushInterval = options.flushInterval || 1000;
    this.buffer = [];
    this.timer = null;
  }

  add(item) {
    return new Promise((resolve, reject) => {
      this.buffer.push({ ...item, resolve, reject });
      if (this.buffer.length >= this.batchSize) {
        this.flush();
      } else if (!this.timer) {
        this.timer = setTimeout(() => this.flush(), this.flushInterval);
      }
    });
  }

  async flush() {
    if (this.buffer.length === 0) return;

    if (this.timer) {
      clearTimeout(this.timer);
      this.timer = null;
    }

    const currentBatch = [...this.buffer];
    this.buffer = [];

    const allRows = currentBatch.flatMap(item => item.rows);

    if (allRows.length === 0) {
      // No rows to insert (e.g. empty messages), just resolve
      currentBatch.forEach(item => item.resolve(0));
      return;
    }

    try {
      await this.dbManager.insertRows({
        schema: this.config.db.schema,
        table: this.config.db.table,
        rows: allRows
      });

      // Resolve each item with its own row count
      currentBatch.forEach(item => item.resolve(item.rows.length));
    } catch (error) {
      // Enrich error with DB context if possible (using first item as sample)
      error.type = 'DB_ERROR';
      const sample = allRows[0];
      error.dbContext = {
        batchSize: currentBatch.length,
        totalRows: allRows.length,
        sampleRow: sample ? {
          guid: sample.guid,
          ts_ms: sample.ts_ms,
          action_type: sample.action_type,
          cmd_word: sample.cmd_word
        } : null
      };

      // Reject all items in the batch
      currentBatch.forEach(item => item.reject(error));
    }
  }
}

@@ -28,7 +28,8 @@ const columns = [
  'type_l',
  'type_h',
  'details',
  'extra'
  'extra',
  'loop_name'
];

export class DatabaseManager {
@@ -71,6 +72,15 @@ export class DatabaseManager {
    }
  }

  async testConnection() {
    try {
      await this.pool.query('SELECT 1');
      return true;
    } catch (error) {
      return false;
    }
  }

  async close() {
    await this.pool.end();
  }

@@ -40,9 +40,25 @@ class DatabaseInitializer {
      ssl: ssl ? { rejectUnauthorized: false } : false
    });

    const maxRetries = 5;
    let retryCount = 0;

    while (retryCount < maxRetries) {
      try {
        await client.connect();
        break;
      } catch (err) {
        if (err.code === 'EADDRINUSE') {
          retryCount++;
          logger.warn(`Port conflict (EADDRINUSE) connecting to database, retrying (${retryCount}/${maxRetries})...`);
          await new Promise(resolve => setTimeout(resolve, 1000));
        } else {
          throw err;
        }
      }
    }

    try {
      await client.connect();

      const checkRes = await client.query(
        `SELECT 1 FROM pg_database WHERE datname = $1`,
        [database]

@@ -3,6 +3,7 @@ import { config } from './config/config.js';
import dbManager from './db/databaseManager.js';
import dbInitializer from './db/initializer.js';
import partitionManager from './db/partitionManager.js';
import projectMetadata from './cache/projectMetadata.js';
import { createKafkaConsumers } from './kafka/consumer.js';
import { processKafkaMessage } from './processor/index.js';
import { createRedisClient } from './redis/redisClient.js';
@@ -10,10 +11,14 @@ import { RedisIntegration } from './redis/redisIntegration.js';
import { buildErrorQueueKey, enqueueError, startErrorRetryWorker } from './redis/errorQueue.js';
import { MetricCollector } from './utils/metricCollector.js';
import { logger } from './utils/logger.js';
import { BatchProcessor } from './db/batchProcessor.js';

const bootstrap = async () => {
  // 0. Initialize Database (Create DB, Schema, Table, Partitions)
  await dbInitializer.initialize();

  // 0.1 Initialize Project Metadata Cache
  await projectMetadata.init();

  // Metric Collector
  const metricCollector = new MetricCollector();
@@ -68,6 +73,10 @@ const bootstrap = async () => {

  const errorQueueKey = buildErrorQueueKey(config.redis.projectName);

  const batchProcessor = new BatchProcessor(dbManager, config, {
    batchSize: config.kafka.maxInFlight
  });

  const handleMessage = async (message) => {
    if (message.topic) {
      metricCollector.increment('kafka_pulled');
@@ -96,7 +105,8 @@ const bootstrap = async () => {
          valueLength: typeof messageValue === 'string' ? messageValue.length : null
        });
      }
      const inserted = await processKafkaMessage({ message, dbManager, config });
      const rows = await processKafkaMessage({ message });
      const inserted = await batchProcessor.add({ rows });
      metricCollector.increment('db_inserted');
      logger.info('Kafka message processed', { inserted });
    } catch (error) {
@@ -153,10 +163,24 @@ const bootstrap = async () => {
    }
  };

  const healthCheck = {
    shouldPause: async (error) => {
      if (error?.type === 'DB_ERROR') {
        const isConnected = await dbManager.testConnection();
        return !isConnected;
      }
      return false;
    },
    check: async () => {
      return await dbManager.testConnection();
    }
  };

  const consumers = createKafkaConsumers({
    kafkaConfig: config.kafka,
    onMessage: handleMessage,
    onError: handleError
    onError: handleError,
    healthCheck
  });

  // Start retry worker (non-blocking)

@@ -3,12 +3,13 @@ import { logger } from '../utils/logger.js';

const { ConsumerGroup } = kafka;

const createOneConsumer = ({ kafkaConfig, onMessage, onError, instanceIndex }) => {
const createOneConsumer = ({ kafkaConfig, onMessage, onError, instanceIndex, healthCheck }) => {
  const kafkaHost = kafkaConfig.brokers.join(',');
  const clientId = instanceIndex === 0 ? kafkaConfig.clientId : `${kafkaConfig.clientId}-${instanceIndex}`;
  const id = `${clientId}-${process.pid}-${Date.now()}`;
  const maxInFlight = Number.isFinite(kafkaConfig.maxInFlight) ? kafkaConfig.maxInFlight : 50;
  let inFlight = 0;
  let isPausedForHealth = false;

  const consumer = new ConsumerGroup(
    {
@@ -19,7 +20,7 @@ const createOneConsumer = ({ kafkaConfig, onMessage, onError, instanceIndex }) =
      fromOffset: 'earliest',
      protocol: ['roundrobin'],
      outOfRangeOffset: 'latest',
      autoCommit: true,
      autoCommit: false,
      autoCommitIntervalMs: kafkaConfig.autoCommitIntervalMs,
      fetchMaxBytes: kafkaConfig.fetchMaxBytes,
      fetchMinBytes: kafkaConfig.fetchMinBytes,
@@ -30,7 +31,7 @@ const createOneConsumer = ({ kafkaConfig, onMessage, onError, instanceIndex }) =
  );

  const tryResume = () => {
    if (inFlight < maxInFlight) {
    if (!isPausedForHealth && inFlight < maxInFlight) {
      consumer.resume();
    }
  };
@@ -40,9 +41,48 @@ const createOneConsumer = ({ kafkaConfig, onMessage, onError, instanceIndex }) =
    if (inFlight >= maxInFlight) {
      consumer.pause();
    }
    Promise.resolve(onMessage(message))
      .catch((error) => {
    return Promise.resolve(onMessage(message))
      .then(() => {
        consumer.commit((err) => {
          if (err) {
            logger.error('Kafka commit failed', { error: err.message });
          }
        });
      })
      .catch(async (error) => {
        logger.error('Kafka message handling failed', { error: error?.message });

        let shouldCommit = true;

        if (!isPausedForHealth && healthCheck && await healthCheck.shouldPause(error)) {
          shouldCommit = false;
          isPausedForHealth = true;
          consumer.pause();
          logger.warn('Pausing consumer due to dependency failure. Entering recovery mode...');

          const checkInterval = setInterval(async () => {
            try {
              const isHealthy = await healthCheck.check();
              if (isHealthy) {
                clearInterval(checkInterval);
                isPausedForHealth = false;
                consumer.resume();
                logger.info('Dependency recovered. Resuming consumer.');
              }
            } catch (err) {
              logger.error('Health check failed', { error: err.message });
            }
          }, 60000);
        }

        if (shouldCommit) {
          consumer.commit((err) => {
            if (err) {
              logger.error('Kafka commit failed (error case)', { error: err.message });
            }
          });
        }

        if (onError) {
          onError(error, message);
        }
@@ -63,13 +103,13 @@ const createOneConsumer = ({ kafkaConfig, onMessage, onError, instanceIndex }) =
  return consumer;
};

export const createKafkaConsumers = ({ kafkaConfig, onMessage, onError }) => {
export const createKafkaConsumers = ({ kafkaConfig, onMessage, onError, healthCheck }) => {
  const instances = Number.isFinite(kafkaConfig.consumerInstances) ? kafkaConfig.consumerInstances : 1;
  const count = Math.max(1, instances);
  return Array.from({ length: count }, (_, idx) =>
    createOneConsumer({ kafkaConfig, onMessage, onError, instanceIndex: idx })
    createOneConsumer({ kafkaConfig, onMessage, onError, instanceIndex: idx, healthCheck })
  );
};

export const createKafkaConsumer = ({ kafkaConfig, onMessage, onError }) =>
  createKafkaConsumers({ kafkaConfig, onMessage, onError })[0];
export const createKafkaConsumer = ({ kafkaConfig, onMessage, onError, healthCheck }) =>
  createKafkaConsumers({ kafkaConfig, onMessage, onError, healthCheck })[0];

@@ -1,5 +1,7 @@
import { createGuid } from '../utils/uuid.js';
import { kafkaPayloadSchema } from '../schema/kafkaPayload.js';
import projectMetadata from '../cache/projectMetadata.js';
import { config } from '../config/config.js';

const normalizeDirection = (value) => {
  if (!value) return null;
@@ -30,7 +32,7 @@ const normalizeCmdWord = (value) => {
};

const resolveMessageType = (direction, cmdWord) => {
  if (cmdWord === '0x36') {
  if (cmdWord === '0x36' || cmdWord === '0x0e') {
    return '36上报';
  }
  if (cmdWord === '0x0f' && direction === '下发') {
@@ -43,116 +45,103 @@ const resolveMessageType = (direction, cmdWord) => {
};

const defaultDevTypeActionMap = {
  0: '无效',
  1: '设备回路状态',
  2: '用户操作',
  3: '设备回路状态',
  4: '设备回路状态',
  5: '设备回路状态',
  6: '用户操作',
  7: '用户操作',
  8: '用户操作',
  9: '设备回路状态',
  10: '用户操作',
  11: '用户操作',
  12: '无效',
  13: '设备回路状态',
  14: '设备回路状态',
  15: '设备回路状态',
  16: '设备回路状态',
  17: '设备回路状态',
  18: '无效',
  19: '无效',
  20: '无效',
  21: '设备回路状态',
  22: '无效',
  23: '无效',
  24: '无效',
  25: '无效',
  26: '无效',
  27: '用户操作',
  28: '设备回路状态',
  29: '设备回路状态',
  30: '无效',
  31: '无效',
  32: '用户操作',
  33: '设备回路状态',
  34: '设备回路状态',
  35: '设备回路状态',
  36: '无效',
  37: '用户操作',
  38: '设备回路状态',
  39: '设备回路状态',
  40: '用户操作',
  41: '用户操作',
  42: '无效',
  43: '无效',
  44: '设备回路状态',
  45: '无效',
  46: '用户操作',
  47: '设备回路状态',
  48: '设备回路状态',
  49: '设备回路状态',
  50: '设备回路状态',
  51: '设备回路状态',
  52: '设备回路状态',
  53: '设备回路状态',
  54: '用户操作',
  55: '用户操作',
  56: '设备回路状态',
  57: '设备回路状态',
  241: '设备回路状态'
  0: { name: '无效设备(也可以被认为是场景)', action: '无效' },
  1: { name: '强电继电器(输出状态)', action: '设备回路状态' },
  2: { name: '弱电输入(输入状态)', action: '用户操作' },
  3: { name: '弱电输出(输出状态)', action: '设备回路状态' },
  4: { name: '服务信息', action: '设备回路状态' },
  5: { name: '干节点窗帘', action: '设备回路状态' },
  6: { name: '开关', action: '用户操作' },
  7: { name: '空调', action: '用户操作' },
  8: { name: '红外感应', action: '用户操作' },
  9: { name: '空气质量检测设备', action: '设备回路状态' },
  10: { name: '插卡取电', action: '用户操作' },
  11: { name: '地暖', action: '用户操作' },
  12: { name: 'RCU 设备网络 - 没使用', action: '' },
  13: { name: '窗帘', action: '设备回路状态' },
  14: { name: '继电器', action: '设备回路状态' },
  15: { name: '红外发送', action: '设备回路状态' },
  16: { name: '调光驱动', action: '设备回路状态' },
  17: { name: '可控硅调光(可控硅状态)', action: '设备回路状态' },
  18: { name: '灯带(灯带状态) --2025-11-24 取消', action: '无效' },
  19: { name: '中控', action: '无效' },
  20: { name: '微信锁 (福 瑞狗的蓝牙锁 默认 0 地址)', action: '无效' },
  21: { name: '背景音乐(背景音乐状态)', action: '设备回路状态' },
  22: { name: '房态下发', action: '无效' },
  23: { name: '主机本地 调光', action: '无效' },
  24: { name: '485PWM 调光( PWM 调光状态)', action: '无效' },
  25: { name: '总线调光( PBLED 调光状态) - 没使用 -', action: '无效' },
  26: { name: 'RCU 电源', action: '无效' },
  27: { name: 'A9IO 开关', action: '用户操作' },
  28: { name: 'A9IO 扩展', action: '设备回路状态' },
  29: { name: 'A9IO 电源', action: '设备回路状态' },
  30: { name: '无线网关轮询(用于轮询控制轮询设备;给无线网关下发配置和询问网关状态)', action: '无效' },
  31: { name: '无线网关主动(用于主动控制主动设备)', action: '无效' },
  32: { name: '无线门磁', action: '用户操作' },
  33: { name: '空气参数显示设备', action: '设备回路状态' },
  34: { name: '无线继电器红外', action: '设备回路状态' },
  35: { name: '时间同步', action: '设备回路状态' },
  36: { name: '监控控制', action: '无效' },
  37: { name: '旋钮开关控制', action: '用户操作' },
  38: { name: 'CSIO - 类型', action: '设备回路状态' },
  39: { name: '插卡状态虚拟设备', action: '设备回路状态' },
  40: { name: '485 新风设备', action: '用户操作' },
  41: { name: '485 人脸机', action: '用户操作' },
  42: { name: '中控', action: '无效' },
  43: { name: '域控', action: '无效' },
  44: { name: 'LCD', action: '设备回路状态' },
  45: { name: '无卡断电 --2025-11-24 取消', action: '无效' },
  46: { name: '无卡取电 2', action: '用户操作' },
  47: { name: '虚拟时间设备', action: '设备回路状态' },
  48: { name: 'PLC 总控', action: '设备回路状态' },
  49: { name: 'PLC 设备 - 恒流调光设备', action: '设备回路状态' },
  50: { name: 'PLC 设备 - 恒压调光设备', action: '设备回路状态' },
  51: { name: 'PLC 设备 - 继电器设备', action: '设备回路状态' },
  52: { name: '色温调节功能', action: '设备回路状态' },
  53: { name: '蓝牙音频', action: '设备回路状态' },
  54: { name: '碳达人', action: '用户操作' },
  55: { name: '场景还原', action: '用户操作' },
  56: { name: '全局设置', action: '设备回路状态' },
  57: { name: '能耗检测', action: '设备回路状态' },
  241: { name: 'CSIO - 类型', action: '设备回路状态' }
};

const buildDevTypeActionMap = () => {
  const raw = process.env.ACTION_TYPE_DEV_TYPE_RULES;
  if (!raw || typeof raw !== 'string' || !raw.trim()) {
    return defaultDevTypeActionMap;
  }

  try {
    const parsed = JSON.parse(raw);
    if (!Array.isArray(parsed)) {
      return defaultDevTypeActionMap;
// Parse env rules if present
let devTypeActionRules = [];
try {
  if (process.env.ACTION_TYPE_DEV_TYPE_RULES) {
    const parsed = JSON.parse(process.env.ACTION_TYPE_DEV_TYPE_RULES);
    if (Array.isArray(parsed)) {
      devTypeActionRules = parsed;
    }

    const overrides = {};
    parsed.forEach((item) => {
      if (Array.isArray(item) && item.length >= 2) {
        const devType = Number(item[0]);
        const actionType = item[1];
        if (Number.isFinite(devType) && typeof actionType === 'string' && actionType) {
          overrides[devType] = actionType;
        }
        return;
      }

      if (item && typeof item === 'object') {
        const devType = Number(item.dev_type ?? item.devType);
        const actionType = item.action_type ?? item.actionType ?? item.action;
        if (Number.isFinite(devType) && typeof actionType === 'string' && actionType) {
          overrides[devType] = actionType;
        }
      }
    });

    return { ...defaultDevTypeActionMap, ...overrides };
  } catch {
    return defaultDevTypeActionMap;
  }
} catch (error) {
  // Silent fallback
}

const getActionTypeByDevType = (devType) => {
  // 1. Env override
  const rule = devTypeActionRules.find(r => r.dev_type === devType);
  if (rule?.action_type) return rule.action_type;

  // 2. Default map
  const entry = defaultDevTypeActionMap[devType];
  return entry?.action || '设备回路状态';
};

const devTypeActionMap = buildDevTypeActionMap();
const getDevTypeName = (devType) => {
  // 1. Env override
  const rule = devTypeActionRules.find(r => r.dev_type === devType);
  if (rule?.name) return rule.name;

  // 2. Default map
  const entry = defaultDevTypeActionMap[devType];
  return entry?.name || 'Unknown';
};

const resolveDevTypeAction = (devType) => {
  if (typeof devType !== 'number') {
    return '设备回路状态';
  }
  const mapped = devTypeActionMap[devType];
  if (mapped) {
    return mapped;
  }
  return '设备回路状态';
  if (devType === null || devType === undefined) return '设备回路状态';
  return getActionTypeByDevType(devType);
};

const parseKafkaPayload = (value) => {
@@ -247,6 +236,24 @@ export const buildRowsFromPayload = (rawPayload) => {
    extra: extra || {}
  };

  // Helper to generate loop name if not found in cache
  const getLoopNameWithFallback = (deviceId, devType, devAddr, devLoop) => {
    // 1. Try cache
    const cachedName = projectMetadata.getLoopName(deviceId, devType, devAddr, devLoop);
    if (cachedName) return cachedName;

    // 2. Check config for auto-generation
    if (!config.enableLoopNameAutoGeneration) {
      return null;
    }

    // 3. Fallback: [TypeName-Addr-Loop]
    const typeName = getDevTypeName(devType);
    if (!typeName) return null; // Should have a name if devType is valid

    return `[${devType}${typeName}-${devAddr}-${devLoop}]`;
  };

  const rows = [];

  // Logic 1: 0x36 Status/Fault Report
@@ -268,6 +275,7 @@ export const buildRowsFromPayload = (rawPayload) => {
        dev_loop: device.dev_loop ?? null,
        dev_data: device.dev_data ?? null,
        action_type: actionType,
        loop_name: getLoopNameWithFallback(deviceId, device.dev_type, device.dev_addr, device.dev_loop),
        details
      });
    });
@@ -287,6 +295,7 @@ export const buildRowsFromPayload = (rawPayload) => {
        error_type: fault.error_type ?? null,
        error_data: fault.error_data ?? null,
        action_type: actionType,
        loop_name: getLoopNameWithFallback(deviceId, fault.dev_type, fault.dev_addr, fault.dev_loop),
        details
      });
    });
@@ -322,6 +331,7 @@ export const buildRowsFromPayload = (rawPayload) => {
        type_l: control.type_l ?? null,
        type_h: control.type_h ?? null,
        action_type: '下发控制',
        loop_name: getLoopNameWithFallback(deviceId, control.dev_type, control.dev_addr, control.dev_loop),
        details
      });
    });
@@ -340,25 +350,37 @@ export const buildRowsFromPayload = (rawPayload) => {
    return rows;
  }

  // Logic 3: 0x0F ACK or others
  const fallbackActionType =
    normalizedCmdWord === '0x0f' && normalizedDirection === '上报'
      ? 'ACK'
      : '无效';
  // 3. 0x0F ACK
  else if (messageType === '0FACK') {
    const { control_list: controls = [] } = payload;
    if (Array.isArray(controls)) {
      const details = { control_list: controls };
      controls.forEach((control) => {
        rows.push({
          ...commonFields,
          guid: createGuid(),
          dev_type: control.dev_type ?? null,
          dev_addr: control.dev_addr ?? null,
          dev_loop: control.dev_loop ?? null,
          dev_data: control.dev_data ?? null,
          type_h: control.type_h ?? null,
          action_type: '设备回路状态',
          loop_name: getLoopNameWithFallback(deviceId, control.dev_type, control.dev_addr, control.dev_loop),
          details
        });
      });
    }
  }

  return [{
    ...commonFields,
    guid: createGuid(),
    action_type: fallbackActionType,
    details: {}
  }];
  return rows;
};

export const processKafkaMessage = async ({ message, dbManager, config }) => {
export const processKafkaMessage = async ({ message }) => {
  let rows;
  try {
    const payload = parseKafkaPayload(message.value);
    rows = buildRowsFromPayload(payload);
    return rows;
  } catch (error) {
    error.type = 'PARSE_ERROR';
    const rawValue = Buffer.isBuffer(message.value)
@@ -370,27 +392,4 @@ export const processKafkaMessage = async ({ message, dbManager, config }) => {
    }
    throw error;
  }

  try {
    await dbManager.insertRows({ schema: config.db.schema, table: config.db.table, rows });
  } catch (error) {
    error.type = 'DB_ERROR';
    const sample = rows?.[0];
    error.dbContext = {
      rowsLength: rows?.length || 0,
      sampleRow: sample
        ? {
            guid: sample.guid,
            ts_ms: sample.ts_ms,
            action_type: sample.action_type,
            cmd_word: sample.cmd_word,
            direction: sample.direction,
            device_id: sample.device_id
          }
        : null
    };
    throw error;
  }

  return rows.length;
};

@@ -12,6 +12,9 @@ export const logger = {
  info(message, context) {
    process.stdout.write(`${format('info', message, context)}\n`);
  },
  warn(message, context) {
    process.stdout.write(`${format('warn', message, context)}\n`);
  },
  error(message, context) {
    process.stderr.write(`${format('error', message, context)}\n`);
  }

97 bls-rcu-action-backend/tests/batch_processor.test.js Normal file
@@ -0,0 +1,97 @@
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
import { BatchProcessor } from '../src/db/batchProcessor.js';

describe('BatchProcessor', () => {
  let dbManager;
  let config;
  let batchProcessor;

  beforeEach(() => {
    vi.useFakeTimers();
    dbManager = {
      insertRows: vi.fn().mockResolvedValue(true)
    };
    config = {
      db: { schema: 'test_schema', table: 'test_table' }
    };
    batchProcessor = new BatchProcessor(dbManager, config, { batchSize: 3, flushInterval: 1000 });
  });

  afterEach(() => {
    vi.useRealTimers();
  });

  it('should buffer items and not flush until batch size is reached', async () => {
    const p1 = batchProcessor.add({ rows: ['r1'] });
    const p2 = batchProcessor.add({ rows: ['r2'] });

    expect(dbManager.insertRows).not.toHaveBeenCalled();

    const p3 = batchProcessor.add({ rows: ['r3'] });

    // Wait for microtasks
    await Promise.resolve();

    expect(dbManager.insertRows).toHaveBeenCalledTimes(1);
    expect(dbManager.insertRows).toHaveBeenCalledWith({
      schema: 'test_schema',
      table: 'test_table',
      rows: ['r1', 'r2', 'r3']
    });

    await expect(p1).resolves.toBe(1);
    await expect(p2).resolves.toBe(1);
    await expect(p3).resolves.toBe(1);
  });

  it('should flush when timer expires', async () => {
    const p1 = batchProcessor.add({ rows: ['r1'] });

    expect(dbManager.insertRows).not.toHaveBeenCalled();

    vi.advanceTimersByTime(1000);

    // Wait for microtasks
    await Promise.resolve();

    expect(dbManager.insertRows).toHaveBeenCalledTimes(1);
    expect(dbManager.insertRows).toHaveBeenCalledWith({
      schema: 'test_schema',
      table: 'test_table',
      rows: ['r1']
    });

    await expect(p1).resolves.toBe(1);
  });

  it('should handle db error and reject all pending promises', async () => {
    dbManager.insertRows.mockRejectedValue(new Error('DB Fail'));

    const p1 = batchProcessor.add({ rows: ['r1'] });
    const p2 = batchProcessor.add({ rows: ['r2'] });
    const p3 = batchProcessor.add({ rows: ['r3'] }); // Triggers flush

    await expect(p1).rejects.toThrow('DB Fail');
    await expect(p2).rejects.toThrow('DB Fail');
    await expect(p3).rejects.toThrow('DB Fail');
  });

  it('should handle mixed batch sizes', async () => {
    // 3 items with different row counts
    const p1 = batchProcessor.add({ rows: ['r1', 'r2'] });
    const p2 = batchProcessor.add({ rows: [] }); // Empty rows
    const p3 = batchProcessor.add({ rows: ['r3'] });

    await Promise.resolve();

    expect(dbManager.insertRows).toHaveBeenCalledWith({
      schema: 'test_schema',
      table: 'test_table',
      rows: ['r1', 'r2', 'r3']
    });

    await expect(p1).resolves.toBe(2);
    await expect(p2).resolves.toBe(0);
    await expect(p3).resolves.toBe(1);
  });
});

124 bls-rcu-action-backend/tests/consumer_reliability.test.js Normal file
@@ -0,0 +1,124 @@
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
import { createKafkaConsumers } from '../src/kafka/consumer.js';
import kafka from 'kafka-node';

// Mock kafka-node
vi.mock('kafka-node', () => {
  return {
    ConsumerGroup: vi.fn(),
    default: { ConsumerGroup: vi.fn() }
  };
});

describe('Consumer Reliability', () => {
  let mockConsumer;
  let onMessage;
  let onError;
  let healthCheck;

  const kafkaConfig = {
    brokers: ['localhost:9092'],
    groupId: 'test-group',
    clientId: 'test-client',
    topic: 'test-topic',
    autoCommitIntervalMs: 5000
  };

  beforeEach(() => {
    vi.clearAllMocks();

    mockConsumer = {
      on: vi.fn(),
      commit: vi.fn(),
      pause: vi.fn(),
      resume: vi.fn(),
      close: vi.fn()
    };

    kafka.ConsumerGroup.mockImplementation(function() {
      return mockConsumer;
    });

    onMessage = vi.fn().mockResolvedValue(true);
    onError = vi.fn();
    healthCheck = {
      shouldPause: vi.fn().mockResolvedValue(false),
      check: vi.fn().mockResolvedValue(true)
    };
  });

  it('should initialize with autoCommit: false', () => {
    createKafkaConsumers({ kafkaConfig, onMessage, onError });
    expect(kafka.ConsumerGroup).toHaveBeenCalledWith(
      expect.objectContaining({ autoCommit: false }),
      expect.anything()
    );
  });

  it('should commit offset after successful message processing', async () => {
    createKafkaConsumers({ kafkaConfig, onMessage, onError });

    // Simulate 'message' event
    const message = { value: 'test' };
    const messageHandler = mockConsumer.on.mock.calls.find(call => call[0] === 'message')[1];

    await messageHandler(message);

    expect(onMessage).toHaveBeenCalledWith(message);
    expect(mockConsumer.commit).toHaveBeenCalled();
  });

  it('should NOT commit if processing fails and health check says pause', async () => {
    onMessage.mockRejectedValue(new Error('Fail'));
    healthCheck.shouldPause.mockResolvedValue(true);
    createKafkaConsumers({ kafkaConfig, onMessage, onError, healthCheck });

    const messageHandler = mockConsumer.on.mock.calls.find(call => call[0] === 'message')[1];
    await messageHandler({ value: 'test' });

    expect(mockConsumer.commit).not.toHaveBeenCalled();
    expect(onError).toHaveBeenCalled();
  });

  it('should commit if processing fails but health check says continue (Data Error)', async () => {
    onMessage.mockRejectedValue(new Error('Data Error'));
    healthCheck.shouldPause.mockResolvedValue(false); // Do not pause, it's just bad data

    createKafkaConsumers({ kafkaConfig, onMessage, onError, healthCheck });

    const messageHandler = mockConsumer.on.mock.calls.find(call => call[0] === 'message')[1];
    await messageHandler({ value: 'bad_data' });

    expect(mockConsumer.commit).toHaveBeenCalled(); // Should commit to move past bad data
    expect(onError).toHaveBeenCalled(); // Should still report error
  });

  it('should pause and enter recovery mode if healthCheck.shouldPause returns true', async () => {
    vi.useFakeTimers();

    onMessage.mockRejectedValue(new Error('DB Error'));
    healthCheck.shouldPause.mockResolvedValue(true);
    healthCheck.check.mockResolvedValueOnce(false).mockResolvedValueOnce(true); // Fail once, then succeed

    createKafkaConsumers({ kafkaConfig, onMessage, onError, healthCheck });
    const messageHandler = mockConsumer.on.mock.calls.find(call => call[0] === 'message')[1];

    // Trigger error
    await messageHandler({ value: 'fail' });

    expect(mockConsumer.pause).toHaveBeenCalled();
    expect(healthCheck.shouldPause).toHaveBeenCalled();

    // Fast-forward time for interval check (1st check - fails)
    await vi.advanceTimersByTimeAsync(60000);
    expect(healthCheck.check).toHaveBeenCalledTimes(1);
    expect(mockConsumer.resume).not.toHaveBeenCalled();

    // Fast-forward time for interval check (2nd check - succeeds)
    await vi.advanceTimersByTimeAsync(60000);
    expect(healthCheck.check).toHaveBeenCalledTimes(2);
    expect(mockConsumer.resume).toHaveBeenCalled();

    vi.useRealTimers();
  });
});

47 bls-rcu-action-backend/tests/feature_flag_loop_name.test.js Normal file
@@ -0,0 +1,47 @@
import { describe, it, expect, beforeEach, afterEach } from 'vitest';
import { buildRowsFromPayload } from '../src/processor/index.js';
import { config } from '../src/config/config.js';

describe('Feature Toggle: Loop Name Auto Generation', () => {
  const basePayload = {
    ts_ms: 1700000000000,
    hotel_id: 1001,
    room_id: '8001',
    device_id: 'dev_001',
    direction: '上报',
    cmd_word: '0x36',
    frame_id: 1,
    udp_raw: '00',
    sys_lock_status: 0,
    report_count: 0,
    fault_count: 0,
    device_list: [
      { dev_type: 1, dev_addr: 10, dev_loop: 1, dev_data: 100 }
    ]
  };

  let originalConfigValue;

  beforeEach(() => {
    originalConfigValue = config.enableLoopNameAutoGeneration;
  });

  afterEach(() => {
    config.enableLoopNameAutoGeneration = originalConfigValue;
  });

  it('should generate loop_name when flag is true', () => {
    config.enableLoopNameAutoGeneration = true;
    const rows = buildRowsFromPayload(basePayload);
    // Expect format: [1强电继电器(输出状态)-10-1]
    // The exact name depends on the map, but it should contain brackets and numbers
    expect(rows[0].loop_name).toBeDefined();
    expect(rows[0].loop_name).toMatch(/^\[1.*-10-1\]$/);
  });

  it('should NOT generate loop_name when flag is false', () => {
    config.enableLoopNameAutoGeneration = false;
    const rows = buildRowsFromPayload(basePayload);
    expect(rows[0].loop_name).toBeNull();
  });
});

@@ -1,5 +1,18 @@
import { describe, it, expect } from 'vitest';
import { describe, it, expect, vi } from 'vitest';
import { buildRowsFromPayload } from '../src/processor/index.js';
import projectMetadata from '../src/cache/projectMetadata.js';

// Mock config to ensure loop name generation is enabled
vi.mock('../src/config/config.js', async (importOriginal) => {
  const actual = await importOriginal();
  return {
    ...actual,
    config: {
      ...actual.config,
      enableLoopNameAutoGeneration: true,
    },
  };
});

describe('Processor Logic', () => {
  const basePayload = {
@@ -95,12 +108,15 @@ describe('Processor Logic', () => {
    const payload = {
      ...basePayload,
      direction: '上报',
      cmd_word: '0x0F'
      cmd_word: '0x0F',
      control_list: [
        { dev_type: 1, dev_addr: 1, dev_loop: 1, dev_data: 1, type_h: 0 }
      ]
    };

    const rows = buildRowsFromPayload(payload);
    expect(rows).toHaveLength(1);
    expect(rows[0].action_type).toBe('ACK');
    expect(rows[0].action_type).toBe('设备回路状态');
  });

  it('should fallback when lists are empty for 0x36', () => {
@@ -199,4 +215,82 @@ describe('Processor Logic', () => {
      trace_id: 'trace-123'
    });
  });

  it('should enrich rows with loop_name from metadata', () => {
    // Mock metadata
    projectMetadata.roomMap.set('dev_001', 101);
    // Key format: roomTypeId:00Type00Addr00Loop
    // type=1, addr=10, loop=1 -> 001010001
    projectMetadata.loopMap.set('101:001010001', 'Main Chandelier');

    const payload = {
      ...basePayload,
      direction: '上报',
      cmd_word: '0x36',
      device_list: [
        { dev_type: 1, dev_addr: 10, dev_loop: 1, dev_data: 100 }, // Should match 001010001
        { dev_type: 1, dev_addr: 10, dev_loop: 2, dev_data: 0 } // Should not match (001010002) -> Fallback
      ]
    };

    const rows = buildRowsFromPayload(payload);
    expect(rows[0].loop_name).toBe('Main Chandelier');
    // dev_type 1 is '强电继电器(输出状态)'
    expect(rows[1].loop_name).toBe('[1强电继电器(输出状态)-10-2]');
  });
});

describe('Processor Logic - 0x0E Support', () => {
  const basePayload = {
    ts_ms: 1700000000000,
    hotel_id: 1001,
    room_id: '8001',
    device_id: 'dev_001',
    direction: '上报',
    cmd_word: '0x0E',
    frame_id: 1,
    udp_raw: 'AA552000543353413610CD63088151000000000000000001180003000114005ECB',
    sys_lock_status: 0,
    report_count: 0,
    fault_count: 0
  };

  it('should handle 0x0E Status Report with device list (same as 0x36)', () => {
    const payload = {
      ...basePayload,
      direction: '上报',
      cmd_word: '0x0E',
      report_count: 2,
      device_list: [
        { dev_type: 1, dev_addr: 10, dev_loop: 1, dev_data: 100 },
        { dev_type: 1, dev_addr: 11, dev_loop: 2, dev_data: 0 }
      ]
    };

    const rows = buildRowsFromPayload(payload);
    expect(rows).toHaveLength(2);
    expect(rows[0].action_type).toBe('设备回路状态');
    expect(rows[0].dev_addr).toBe(10);
    expect(rows[0].cmd_word).toBe('0x0e'); // Normalized
    expect(rows[1].dev_addr).toBe(11);
    expect(rows[0].details.device_list).toHaveLength(2);
  });

  it('should handle 0x0E Fault Report', () => {
    const payload = {
      ...basePayload,
      direction: '上报',
      cmd_word: '0x0E',
      fault_count: 1,
      fault_list: [
        { dev_type: 1, dev_addr: 10, dev_loop: 1, error_type: 2, error_data: 5 }
      ]
    };

    const rows = buildRowsFromPayload(payload);
    expect(rows).toHaveLength(1);
    expect(rows[0].action_type).toBe('设备回路状态');
    expect(rows[0].error_type).toBe(2);
    expect(rows[0].cmd_word).toBe('0x0e');
  });
});

@@ -1,44 +0,0 @@
2026-01-30T16:54:47: Error [ERR_MODULE_NOT_FOUND]: Cannot find package 'node-cron' imported from R:\nodejsROOT\bls\rcu-action\dist\index.js
2026-01-30T16:54:47: at Object.getPackageJSONURL (node:internal/modules/package_json_reader:316:9)
2026-01-30T16:54:47: at packageResolve (node:internal/modules/esm/resolve:768:81)
2026-01-30T16:54:47: at moduleResolve (node:internal/modules/esm/resolve:858:18)
2026-01-30T16:54:47: at defaultResolve (node:internal/modules/esm/resolve:990:11)
2026-01-30T16:54:47: at #cachedDefaultResolve (node:internal/modules/esm/loader:737:20)
2026-01-30T16:54:47: at ModuleLoader.resolve (node:internal/modules/esm/loader:714:38)
2026-01-30T16:54:47: at ModuleLoader.getModuleJobForImport (node:internal/modules/esm/loader:293:38)
2026-01-30T16:54:47: at #link (node:internal/modules/esm/module_job:208:49)
2026-01-30T16:54:47: at process.processTicksAndRejections (node:internal/process/task_queues:103:5) {
2026-01-30T16:54:47: code: 'ERR_MODULE_NOT_FOUND'
2026-01-30T16:54:47: }
2026-01-30T16:56:12: {"level":"error","message":"Error initializing schema and table:","timestamp":1769763372054,"context":{"errno":-4058,"code":"ENOENT","syscall":"open","path":"R:\\nodejsROOT\\bls\\scripts\\init_db.sql"}}
2026-01-30T16:56:12: {"level":"error","message":"Service bootstrap failed","timestamp":1769763372055,"context":{"error":"ENOENT: no such file or directory, open 'R:\\nodejsROOT\\bls\\scripts\\init_db.sql'"}}
2026-01-30T16:56:12: {"level":"error","message":"Error initializing schema and table:","timestamp":1769763372929,"context":{"errno":-4058,"code":"ENOENT","syscall":"open","path":"R:\\nodejsROOT\\bls\\scripts\\init_db.sql"}}
2026-01-30T16:56:12: {"level":"error","message":"Service bootstrap failed","timestamp":1769763372929,"context":{"error":"ENOENT: no such file or directory, open 'R:\\nodejsROOT\\bls\\scripts\\init_db.sql'"}}
2026-01-30T16:56:13: {"level":"error","message":"Error initializing schema and table:","timestamp":1769763373801,"context":{"errno":-4058,"code":"ENOENT","syscall":"open","path":"R:\\nodejsROOT\\bls\\scripts\\init_db.sql"}}
2026-01-30T16:56:13: {"level":"error","message":"Service bootstrap failed","timestamp":1769763373801,"context":{"error":"ENOENT: no such file or directory, open 'R:\\nodejsROOT\\bls\\scripts\\init_db.sql'"}}
2026-01-30T16:56:14: {"level":"error","message":"Error initializing schema and table:","timestamp":1769763374671,"context":{"errno":-4058,"code":"ENOENT","syscall":"open","path":"R:\\nodejsROOT\\bls\\scripts\\init_db.sql"}}
2026-01-30T16:56:14: {"level":"error","message":"Service bootstrap failed","timestamp":1769763374671,"context":{"error":"ENOENT: no such file or directory, open 'R:\\nodejsROOT\\bls\\scripts\\init_db.sql'"}}
2026-01-30T16:56:15: {"level":"error","message":"Error initializing schema and table:","timestamp":1769763375539,"context":{"errno":-4058,"code":"ENOENT","syscall":"open","path":"R:\\nodejsROOT\\bls\\scripts\\init_db.sql"}}
2026-01-30T16:56:15: {"level":"error","message":"Service bootstrap failed","timestamp":1769763375539,"context":{"error":"ENOENT: no such file or directory, open 'R:\\nodejsROOT\\bls\\scripts\\init_db.sql'"}}
2026-01-30T16:56:16: {"level":"error","message":"Error initializing schema and table:","timestamp":1769763376418,"context":{"errno":-4058,"code":"ENOENT","syscall":"open","path":"R:\\nodejsROOT\\bls\\scripts\\init_db.sql"}}
2026-01-30T16:56:16: {"level":"error","message":"Service bootstrap failed","timestamp":1769763376419,"context":{"error":"ENOENT: no such file or directory, open 'R:\\nodejsROOT\\bls\\scripts\\init_db.sql'"}}
2026-01-30T16:56:17: {"level":"error","message":"Error initializing schema and table:","timestamp":1769763377290,"context":{"errno":-4058,"code":"ENOENT","syscall":"open","path":"R:\\nodejsROOT\\bls\\scripts\\init_db.sql"}}
2026-01-30T16:56:17: {"level":"error","message":"Service bootstrap failed","timestamp":1769763377291,"context":{"error":"ENOENT: no such file or directory, open 'R:\\nodejsROOT\\bls\\scripts\\init_db.sql'"}}
2026-01-30T16:56:18: {"level":"error","message":"Error initializing schema and table:","timestamp":1769763378161,"context":{"errno":-4058,"code":"ENOENT","syscall":"open","path":"R:\\nodejsROOT\\bls\\scripts\\init_db.sql"}}
2026-01-30T16:56:18: {"level":"error","message":"Service bootstrap failed","timestamp":1769763378162,"context":{"error":"ENOENT: no such file or directory, open 'R:\\nodejsROOT\\bls\\scripts\\init_db.sql'"}}
2026-01-30T16:56:19: {"level":"error","message":"Error initializing schema and table:","timestamp":1769763379035,"context":{"errno":-4058,"code":"ENOENT","syscall":"open","path":"R:\\nodejsROOT\\bls\\scripts\\init_db.sql"}}
2026-01-30T16:56:19: {"level":"error","message":"Service bootstrap failed","timestamp":1769763379035,"context":{"error":"ENOENT: no such file or directory, open 'R:\\nodejsROOT\\bls\\scripts\\init_db.sql'"}}
2026-01-30T16:56:19: {"level":"error","message":"Error initializing schema and table:","timestamp":1769763379920,"context":{"errno":-4058,"code":"ENOENT","syscall":"open","path":"R:\\nodejsROOT\\bls\\scripts\\init_db.sql"}}
2026-01-30T16:56:19: {"level":"error","message":"Service bootstrap failed","timestamp":1769763379921,"context":{"error":"ENOENT: no such file or directory, open 'R:\\nodejsROOT\\bls\\scripts\\init_db.sql'"}}
2026-01-30T16:56:20: {"level":"error","message":"Error initializing schema and table:","timestamp":1769763380801,"context":{"errno":-4058,"code":"ENOENT","syscall":"open","path":"R:\\nodejsROOT\\bls\\scripts\\init_db.sql"}}
2026-01-30T16:56:20: {"level":"error","message":"Service bootstrap failed","timestamp":1769763380802,"context":{"error":"ENOENT: no such file or directory, open 'R:\\nodejsROOT\\bls\\scripts\\init_db.sql'"}}
2026-01-30T16:56:21: {"level":"error","message":"Error initializing schema and table:","timestamp":1769763381675,"context":{"errno":-4058,"code":"ENOENT","syscall":"open","path":"R:\\nodejsROOT\\bls\\scripts\\init_db.sql"}}
2026-01-30T16:56:21: {"level":"error","message":"Service bootstrap failed","timestamp":1769763381675,"context":{"error":"ENOENT: no such file or directory, open 'R:\\nodejsROOT\\bls\\scripts\\init_db.sql'"}}
2026-01-30T16:56:22: {"level":"error","message":"Error initializing schema and table:","timestamp":1769763382560,"context":{"errno":-4058,"code":"ENOENT","syscall":"open","path":"R:\\nodejsROOT\\bls\\scripts\\init_db.sql"}}
2026-01-30T16:56:22: {"level":"error","message":"Service bootstrap failed","timestamp":1769763382561,"context":{"error":"ENOENT: no such file or directory, open 'R:\\nodejsROOT\\bls\\scripts\\init_db.sql'"}}
2026-01-30T16:56:23: {"level":"error","message":"Error initializing schema and table:","timestamp":1769763383432,"context":{"errno":-4058,"code":"ENOENT","syscall":"open","path":"R:\\nodejsROOT\\bls\\scripts\\init_db.sql"}}
2026-01-30T16:56:23: {"level":"error","message":"Service bootstrap failed","timestamp":1769763383433,"context":{"error":"ENOENT: no such file or directory, open 'R:\\nodejsROOT\\bls\\scripts\\init_db.sql'"}}
2026-01-30T16:56:24: {"level":"error","message":"Error initializing schema and table:","timestamp":1769763384307,"context":{"errno":-4058,"code":"ENOENT","syscall":"open","path":"R:\\nodejsROOT\\bls\\scripts\\init_db.sql"}}
2026-01-30T16:56:24: {"level":"error","message":"Service bootstrap failed","timestamp":1769763384307,"context":{"error":"ENOENT: no such file or directory, open 'R:\\nodejsROOT\\bls\\scripts\\init_db.sql'"}}
2026-01-30T16:56:25: {"level":"error","message":"Error initializing schema and table:","timestamp":1769763385185,"context":{"errno":-4058,"code":"ENOENT","syscall":"open","path":"R:\\nodejsROOT\\bls\\scripts\\init_db.sql"}}
2026-01-30T16:56:25: {"level":"error","message":"Service bootstrap failed","timestamp":1769763385185,"context":{"error":"ENOENT: no such file or directory, open 'R:\\nodejsROOT\\bls\\scripts\\init_db.sql'"}}

@@ -75,4 +75,22 @@ ACK (待补充)

5. Queue structure
Number of queue partitions: 6
Topic:blwlog4Nodejs-rcu-action-topic
Topic:blwlog4Nodejs-rcu-action-topic


6. Special handling before writing to the database
- Periodically read the temporary_project tables and keep the data in `memory` (as global state); refresh from the database once a day at 1:00 AM.
- Before each record is written to the database, look up the project information in memory by project ID.
- Add a new column `loop_name` to the existing `rcu_action_events` table to store the loop name.
- How `loop_name` is looked up:
  - Use the `device_id` of the record being inserted to find the matching room in the `rooms` table -> obtain `room_type_id`.
  - Use `room_type_id` and `loop_address` to find the matching `loop_name` in the `loops` table.
  - `loop_address` construction rule: convert the record's `dev_type`, `dev_addr`, and `dev_loop` each to a 3-character string (left-padded with 0), then concatenate them.
  - Example: `dev_type=1, dev_addr=23, dev_loop=12` -> `001` + `023` + `012` -> `001023012`.
- **Fallback logic**: if no matching `loop_name` is found in the `loops` cache using the rule above, build a default name from the device name that corresponds to `dev_type` (configured in `ACTION_TYPE_DEV_TYPE_RULES`).
  - Format: `[dev_type name + '-' + dev_addr + '-' + dev_loop]`
  - Example: `dev_type=35` (name: TimeCtrl), `addr=14`, `loop=21` -> `[35TimeCtrl-14-21]`
- Finally, write the found or generated `loop_name` into the `rcu_action_events` table.
- Note: all of these lookups must go through the in-memory cache.

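A minimal, self-contained sketch of the lookup and fallback rules above (the cache contents below are made-up examples; in the service the two maps are filled from temporary_project.rooms and temporary_project.loops by ProjectMetadataCache, and the fallback is additionally gated by ENABLE_LOOP_NAME_AUTO_GENERATION):

```js
// Sketch of the loop_name rule; roomMap/loopMap values here are illustrative only.
const roomMap = new Map([['dev_001', 101]]);            // device_id -> room_type_id
const loopMap = new Map([['101:001023012', '床头灯']]); // `${room_type_id}:${loop_address}` -> loop_name

const pad3 = (v) => String(v).padStart(3, '0');

function resolveLoopName(deviceId, devType, devAddr, devLoop, typeName) {
  const roomTypeId = roomMap.get(deviceId);
  if (roomTypeId) {
    // loop_address = zero-padded dev_type + dev_addr + dev_loop, e.g. 1/23/12 -> '001023012'
    const loopAddress = `${pad3(devType)}${pad3(devAddr)}${pad3(devLoop)}`;
    const cached = loopMap.get(`${roomTypeId}:${loopAddress}`);
    if (cached) return cached;
  }
  // Fallback: [dev_type + type name + '-' + dev_addr + '-' + dev_loop]
  return `[${devType}${typeName}-${devAddr}-${devLoop}]`;
}

console.log(resolveLoopName('dev_001', 1, 23, 12, '强电继电器(输出状态)')); // cache hit -> '床头灯'
console.log(resolveLoopName('dev_001', 35, 14, 21, 'TimeCtrl'));           // miss -> '[35TimeCtrl-14-21]'
```
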
141 docs/room_status_moment.sql Normal file
@@ -0,0 +1,141 @@
-- ============================================================================
-- Database initialization script
-- Description: creates the log_platform database (for reference), the room_status schema
--              and the room_status_moment partitioned table
-- Corresponding requirement: project.md #L57-65
-- ============================================================================

-- Note: in PostgreSQL, CREATE DATABASE cannot run inside a transaction block.
-- It is usually better to create the database manually first, then run the rest of this script.
-- CREATE DATABASE log_platform;

-- Run the following after switching to the log_platform database:

-- 1. Create the schema
CREATE SCHEMA IF NOT EXISTS room_status;

-- 2. Create the main table (declarative partitioning)
-- Per requirement L57-65, LIST-partitioned by hotel_id to allow for 100k+ rows later on
CREATE TABLE IF NOT EXISTS room_status.room_status_moment (
    -- Identification fields
    guid UUID NOT NULL,
    ts_ms INT8 NOT NULL DEFAULT (EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000)::BIGINT,
    hotel_id INT2 NOT NULL,
    room_id TEXT NOT NULL,
    device_id TEXT NOT NULL,

    -- Device status fields
    sys_lock_status INT2,
    online_status INT2,
    launcher_version TEXT,
    app_version TEXT,
    config_version TEXT,
    register_ts_ms INT8,
    upgrade_ts_ms INT8,
    config_ts_ms INT8,
    ip TEXT,

    -- Room business status fields
    pms_status INT2,
    power_state INT2,
    cardless_state INT2,
    service_mask INT8,
    insert_card INT2,
    bright_g INT2,
    agreement_ver TEXT,

    -- Air conditioning
    air_address TEXT[],
    air_state INT2[],
    air_model INT2[],
    air_speed INT2[],
    air_set_temp INT2[],
    air_now_temp INT2[],
    air_solenoid_valve INT2[],

    -- Energy metering
    elec_address TEXT[],
    elec_voltage DOUBLE PRECISION[],
    elec_ampere DOUBLE PRECISION[],
    elec_power DOUBLE PRECISION[],
    elec_phase DOUBLE PRECISION[],
    elec_energy DOUBLE PRECISION[],
    elec_sum_energy DOUBLE PRECISION[],

    -- Energy saving and peripherals
    carbon_state INT2,
    dev_loops JSONB,
    energy_carbon_sum DOUBLE PRECISION,
    energy_nocard_sum DOUBLE PRECISION,
    external_device JSONB DEFAULT '{}',
    faulty_device_count JSONB DEFAULT '{}',

    -- Constraint: the primary key of a partitioned table must include the partition key (hotel_id)
    PRIMARY KEY (hotel_id, room_id, device_id, guid)
) PARTITION BY LIST (hotel_id);

-- 3. Create indexes (for frequently queried fields)
-- Note: indexes created on the partitioned table are automatically created on every partition
CREATE INDEX IF NOT EXISTS idx_room_status_moment_hotel_room ON room_status.room_status_moment (hotel_id, room_id);
CREATE INDEX IF NOT EXISTS idx_room_status_moment_device_id ON room_status.room_status_moment (device_id);
CREATE INDEX IF NOT EXISTS idx_room_status_moment_sys_lock ON room_status.room_status_moment (sys_lock_status);
CREATE INDEX IF NOT EXISTS idx_room_status_moment_online ON room_status.room_status_moment (online_status);
CREATE INDEX IF NOT EXISTS idx_room_status_moment_pms ON room_status.room_status_moment (pms_status);
CREATE INDEX IF NOT EXISTS idx_room_status_moment_power ON room_status.room_status_moment (power_state);
CREATE INDEX IF NOT EXISTS idx_room_status_moment_cardless ON room_status.room_status_moment (cardless_state);
CREATE INDEX IF NOT EXISTS idx_room_status_moment_insert_card ON room_status.room_status_moment (insert_card);
CREATE INDEX IF NOT EXISTS idx_room_status_moment_carbon ON room_status.room_status_moment (carbon_state);

-- 3.1 Unique index (required for UPSERT)
-- A unique constraint on (hotel_id, room_id, device_id) is required for ON CONFLICT
CREATE UNIQUE INDEX IF NOT EXISTS idx_room_status_unique_device
    ON room_status.room_status_moment (hotel_id, room_id, device_id);

-- 4. Example: create the first partition (hotel_id = 1)
-- In actual deployments, partitions can be created dynamically per hotel_id
CREATE TABLE IF NOT EXISTS room_status.room_status_moment_h1
    PARTITION OF room_status.room_status_moment
    FOR VALUES IN (1);

-- 5. Table and column comments
COMMENT ON TABLE room_status.room_status_moment IS 'Room status snapshot table - latest real-time state of each device and room';
COMMENT ON COLUMN room_status.room_status_moment.guid IS 'Primary key guid, UUID (32 hex characters)';
COMMENT ON COLUMN room_status.room_status_moment.ts_ms IS 'Last update time';
COMMENT ON COLUMN room_status.room_status_moment.hotel_id IS 'Hotel';
COMMENT ON COLUMN room_status.room_status_moment.room_id IS 'Room';
COMMENT ON COLUMN room_status.room_status_moment.device_id IS 'Device ID';
COMMENT ON COLUMN room_status.room_status_moment.sys_lock_status IS 'System lock status';
COMMENT ON COLUMN room_status.room_status_moment.online_status IS 'Device online status';
COMMENT ON COLUMN room_status.room_status_moment.launcher_version IS 'Device launcher version';
COMMENT ON COLUMN room_status.room_status_moment.app_version IS 'Device app version';
COMMENT ON COLUMN room_status.room_status_moment.config_version IS 'Device config version';
COMMENT ON COLUMN room_status.room_status_moment.register_ts_ms IS 'Last registration time';
COMMENT ON COLUMN room_status.room_status_moment.upgrade_ts_ms IS 'Last upgrade time';
COMMENT ON COLUMN room_status.room_status_moment.config_ts_ms IS 'Last config push time';
COMMENT ON COLUMN room_status.room_status_moment.ip IS 'Current public IP address';
COMMENT ON COLUMN room_status.room_status_moment.pms_status IS 'PMS status';
COMMENT ON COLUMN room_status.room_status_moment.power_state IS 'Power (card-switch) status';
COMMENT ON COLUMN room_status.room_status_moment.cardless_state IS 'Occupied / unoccupied status';
COMMENT ON COLUMN room_status.room_status_moment.service_mask IS 'Service status mask';
COMMENT ON COLUMN room_status.room_status_moment.insert_card IS 'Card-inserted status';
COMMENT ON COLUMN room_status.room_status_moment.air_address IS 'AC addresses';
COMMENT ON COLUMN room_status.room_status_moment.air_state IS 'AC states';
COMMENT ON COLUMN room_status.room_status_moment.air_model IS 'AC modes';
COMMENT ON COLUMN room_status.room_status_moment.air_speed IS 'AC fan speeds';
COMMENT ON COLUMN room_status.room_status_moment.air_set_temp IS 'AC set temperatures';
COMMENT ON COLUMN room_status.room_status_moment.air_now_temp IS 'Current room temperatures';
COMMENT ON COLUMN room_status.room_status_moment.air_solenoid_valve IS 'AC solenoid valve states';
COMMENT ON COLUMN room_status.room_status_moment.elec_address IS 'Energy meter addresses';
COMMENT ON COLUMN room_status.room_status_moment.elec_voltage IS 'Energy meter voltage';
COMMENT ON COLUMN room_status.room_status_moment.elec_ampere IS 'Energy meter current';
COMMENT ON COLUMN room_status.room_status_moment.elec_power IS 'Energy meter power';
COMMENT ON COLUMN room_status.room_status_moment.elec_phase IS 'Current phase';
COMMENT ON COLUMN room_status.room_status_moment.elec_energy IS 'Energy meter consumption';
COMMENT ON COLUMN room_status.room_status_moment.elec_sum_energy IS 'Energy meter cumulative consumption';
COMMENT ON COLUMN room_status.room_status_moment.carbon_state IS 'Eco (carbon-saving) mode status';
COMMENT ON COLUMN room_status.room_status_moment.bright_g IS 'Brightness value';
COMMENT ON COLUMN room_status.room_status_moment.agreement_ver IS 'Protocol version';
COMMENT ON COLUMN room_status.room_status_moment.dev_loops IS 'Loop states';
COMMENT ON COLUMN room_status.room_status_moment.energy_carbon_sum IS 'Cumulative eco-mode energy savings';
COMMENT ON COLUMN room_status.room_status_moment.energy_nocard_sum IS 'Cumulative cardless energy savings';
COMMENT ON COLUMN room_status.room_status_moment.external_device IS 'External device management (array)';
COMMENT ON COLUMN room_status.room_status_moment.faulty_device_count IS 'Faulty device count';
231
docs/temporary_project.sql
Normal file
@@ -0,0 +1,231 @@
|
||||
/*
|
||||
Navicat Premium Dump SQL
|
||||
|
||||
Source Server : FnOS 109
|
||||
Source Server Type : PostgreSQL
|
||||
Source Server Version : 150004 (150004)
|
||||
Source Host : 10.8.8.109:5433
|
||||
Source Catalog : log_platform
|
||||
Source Schema : temporary_project
|
||||
|
||||
Target Server Type : PostgreSQL
|
||||
Target Server Version : 150004 (150004)
|
||||
File Encoding : 65001
|
||||
|
||||
Date: 02/02/2026 14:28:41
|
||||
*/
|
||||
|
||||
|
||||
-- ----------------------------
|
||||
-- Table structure for hotels
|
||||
-- ----------------------------
|
||||
DROP TABLE IF EXISTS "temporary_project"."hotels";
|
||||
CREATE TABLE "temporary_project"."hotels" (
|
||||
"guid" varchar(32) COLLATE "pg_catalog"."default" NOT NULL,
|
||||
"hotel_id" int4 NOT NULL,
|
||||
"hotel_name" varchar(255) COLLATE "pg_catalog"."default",
|
||||
"id" int4
|
||||
)
|
||||
;
|
||||
|
||||
-- ----------------------------
|
||||
-- Table structure for loops_default
|
||||
-- ----------------------------
|
||||
DROP TABLE IF EXISTS "temporary_project"."loops_default";
|
||||
CREATE TABLE "temporary_project"."loops_default" (
|
||||
"guid" varchar(32) COLLATE "pg_catalog"."default" NOT NULL,
|
||||
"id" int4 NOT NULL,
|
||||
"loop_name" varchar(255) COLLATE "pg_catalog"."default",
|
||||
"room_type_id" int4 NOT NULL,
|
||||
"loop_address" varchar(255) COLLATE "pg_catalog"."default",
|
||||
"loop_type" varchar(50) COLLATE "pg_catalog"."default"
|
||||
)
|
||||
;
|
||||
|
||||
-- ----------------------------
|
||||
-- Table structure for room_type
|
||||
-- ----------------------------
|
||||
DROP TABLE IF EXISTS "temporary_project"."room_type";
|
||||
CREATE TABLE "temporary_project"."room_type" (
|
||||
"guid" varchar(32) COLLATE "pg_catalog"."default" NOT NULL,
|
||||
"id" int4 NOT NULL,
|
||||
"room_type_name" varchar(255) COLLATE "pg_catalog"."default",
|
||||
"hotel_id" int4
|
||||
)
|
||||
;
|
||||
|
||||
-- ----------------------------
|
||||
-- Table structure for rooms_default
|
||||
-- ----------------------------
|
||||
DROP TABLE IF EXISTS "temporary_project"."rooms_default";
|
||||
CREATE TABLE "temporary_project"."rooms_default" (
|
||||
"guid" varchar(32) COLLATE "pg_catalog"."default" NOT NULL,
|
||||
"hotel_id" int4 NOT NULL,
|
||||
"room_id" varchar(255) COLLATE "pg_catalog"."default" NOT NULL,
|
||||
"room_type_id" int4,
|
||||
"device_id" varchar(50) COLLATE "pg_catalog"."default",
|
||||
"mac" varchar(50) COLLATE "pg_catalog"."default",
|
||||
"id" int4
|
||||
)
|
||||
;
|
||||
|
||||
-- ----------------------------
|
||||
-- Table structure for loops
|
||||
-- ----------------------------
|
||||
DROP TABLE IF EXISTS "temporary_project"."loops";
|
||||
CREATE TABLE "temporary_project"."loops" (
|
||||
"guid" varchar(32) COLLATE "pg_catalog"."default" NOT NULL,
|
||||
"id" int4 NOT NULL,
|
||||
"loop_name" varchar(255) COLLATE "pg_catalog"."default",
|
||||
"room_type_id" int4 NOT NULL,
|
||||
"loop_address" varchar(255) COLLATE "pg_catalog"."default",
|
||||
"loop_type" varchar(50) COLLATE "pg_catalog"."default"
|
||||
)
-- NOTE: the partition clause was emitted empty in the original dump; LIST ("room_type_id") is assumed here to mirror the loops_default indexes
PARTITION BY LIST (
  "room_type_id" "pg_catalog"."int4_ops"
)
;
|
||||
|
||||
-- ----------------------------
|
||||
-- Table structure for rooms
|
||||
-- ----------------------------
|
||||
DROP TABLE IF EXISTS "temporary_project"."rooms";
|
||||
CREATE TABLE "temporary_project"."rooms" (
|
||||
"guid" varchar(32) COLLATE "pg_catalog"."default" NOT NULL,
|
||||
"hotel_id" int4 NOT NULL,
|
||||
"room_id" varchar(255) COLLATE "pg_catalog"."default" NOT NULL,
|
||||
"room_type_id" int4,
|
||||
"device_id" varchar(50) COLLATE "pg_catalog"."default",
|
||||
"mac" varchar(50) COLLATE "pg_catalog"."default",
|
||||
"id" int4
|
||||
)
|
||||
PARTITION BY LIST (
|
||||
"hotel_id" "pg_catalog"."int4_ops"
|
||||
)
|
||||
;
|
||||
ALTER TABLE "temporary_project"."rooms" ATTACH PARTITION "temporary_project"."rooms_default" DEFAULT;
|
||||
|
||||
-- ----------------------------
|
||||
-- Indexes structure for table hotels
|
||||
-- ----------------------------
|
||||
CREATE INDEX "idx_hotels_hotel_id" ON "temporary_project"."hotels" USING btree (
|
||||
"hotel_id" "pg_catalog"."int4_ops" ASC NULLS LAST
|
||||
);
|
||||
CREATE INDEX "idx_hotels_hotel_name" ON "temporary_project"."hotels" USING btree (
|
||||
"hotel_name" COLLATE "pg_catalog"."default" "pg_catalog"."text_ops" ASC NULLS LAST
|
||||
);
|
||||
CREATE INDEX "idx_hotels_id" ON "temporary_project"."hotels" USING btree (
|
||||
"id" "pg_catalog"."int4_ops" ASC NULLS LAST
|
||||
);
|
||||
|
||||
-- ----------------------------
|
||||
-- Primary Key structure for table hotels
|
||||
-- ----------------------------
|
||||
ALTER TABLE "temporary_project"."hotels" ADD CONSTRAINT "hotels_pkey" PRIMARY KEY ("hotel_id", "guid");
|
||||
|
||||
-- ----------------------------
|
||||
-- Indexes structure for table loops_default
|
||||
-- ----------------------------
|
||||
CREATE INDEX "loops_default_id_idx" ON "temporary_project"."loops_default" USING btree (
|
||||
"id" "pg_catalog"."int4_ops" ASC NULLS LAST
|
||||
);
|
||||
CREATE INDEX "loops_default_loop_address_idx" ON "temporary_project"."loops_default" USING btree (
|
||||
"loop_address" COLLATE "pg_catalog"."default" "pg_catalog"."text_ops" ASC NULLS LAST
|
||||
);
|
||||
CREATE INDEX "loops_default_loop_name_idx" ON "temporary_project"."loops_default" USING btree (
|
||||
"loop_name" COLLATE "pg_catalog"."default" "pg_catalog"."text_ops" ASC NULLS LAST
|
||||
);
|
||||
CREATE INDEX "loops_default_loop_type_idx" ON "temporary_project"."loops_default" USING btree (
|
||||
"loop_type" COLLATE "pg_catalog"."default" "pg_catalog"."text_ops" ASC NULLS LAST
|
||||
);
|
||||
CREATE INDEX "loops_default_room_type_id_idx" ON "temporary_project"."loops_default" USING btree (
|
||||
"room_type_id" "pg_catalog"."int4_ops" ASC NULLS LAST
|
||||
);
|
||||
|
||||
-- ----------------------------
|
||||
-- Primary Key structure for table loops_default
|
||||
-- ----------------------------
|
||||
ALTER TABLE "temporary_project"."loops_default" ADD CONSTRAINT "loops_default_pkey" PRIMARY KEY ("guid", "id", "room_type_id");
|
||||
|
||||
-- ----------------------------
|
||||
-- Indexes structure for table room_type
|
||||
-- ----------------------------
|
||||
CREATE INDEX "idx_room_type_hotel_id" ON "temporary_project"."room_type" USING btree (
|
||||
"hotel_id" "pg_catalog"."int4_ops" ASC NULLS LAST
|
||||
);
|
||||
CREATE INDEX "idx_room_type_id" ON "temporary_project"."room_type" USING btree (
|
||||
"id" "pg_catalog"."int4_ops" ASC NULLS LAST
|
||||
);
|
||||
CREATE INDEX "idx_room_type_name" ON "temporary_project"."room_type" USING btree (
|
||||
"room_type_name" COLLATE "pg_catalog"."default" "pg_catalog"."text_ops" ASC NULLS LAST
|
||||
);
|
||||
|
||||
-- ----------------------------
|
||||
-- Primary Key structure for table room_type
|
||||
-- ----------------------------
|
||||
ALTER TABLE "temporary_project"."room_type" ADD CONSTRAINT "room_type_pkey" PRIMARY KEY ("guid", "id");
|
||||
|
||||
-- ----------------------------
|
||||
-- Indexes structure for table rooms_default
|
||||
-- ----------------------------
|
||||
CREATE INDEX "rooms_default_device_id_idx" ON "temporary_project"."rooms_default" USING btree (
|
||||
"device_id" COLLATE "pg_catalog"."default" "pg_catalog"."text_ops" ASC NULLS LAST
|
||||
);
|
||||
CREATE INDEX "rooms_default_hotel_id_idx" ON "temporary_project"."rooms_default" USING btree (
|
||||
"hotel_id" "pg_catalog"."int4_ops" ASC NULLS LAST
|
||||
);
|
||||
CREATE INDEX "rooms_default_id_idx" ON "temporary_project"."rooms_default" USING btree (
|
||||
"id" "pg_catalog"."int4_ops" ASC NULLS LAST
|
||||
);
|
||||
CREATE INDEX "rooms_default_mac_idx" ON "temporary_project"."rooms_default" USING btree (
|
||||
"mac" COLLATE "pg_catalog"."default" "pg_catalog"."text_ops" ASC NULLS LAST
|
||||
);
|
||||
|
||||
-- ----------------------------
|
||||
-- Primary Key structure for table rooms_default
|
||||
-- ----------------------------
|
||||
ALTER TABLE "temporary_project"."rooms_default" ADD CONSTRAINT "rooms_default_pkey" PRIMARY KEY ("guid", "hotel_id", "room_id");
|
||||
|
||||
-- ----------------------------
|
||||
-- Indexes structure for table loops
|
||||
-- ----------------------------
|
||||
CREATE INDEX "idx_loops_address" ON "temporary_project"."loops" USING btree (
|
||||
"loop_address" COLLATE "pg_catalog"."default" "pg_catalog"."text_ops" ASC NULLS LAST
|
||||
);
|
||||
CREATE INDEX "idx_loops_id" ON "temporary_project"."loops" USING btree (
|
||||
"id" "pg_catalog"."int4_ops" ASC NULLS LAST
|
||||
);
|
||||
CREATE INDEX "idx_loops_name" ON "temporary_project"."loops" USING btree (
|
||||
"loop_name" COLLATE "pg_catalog"."default" "pg_catalog"."text_ops" ASC NULLS LAST
|
||||
);
|
||||
CREATE INDEX "idx_loops_room_type_id" ON "temporary_project"."loops" USING btree (
|
||||
"room_type_id" "pg_catalog"."int4_ops" ASC NULLS LAST
|
||||
);
|
||||
CREATE INDEX "idx_loops_type" ON "temporary_project"."loops" USING btree (
|
||||
"loop_type" COLLATE "pg_catalog"."default" "pg_catalog"."text_ops" ASC NULLS LAST
|
||||
);
|
||||
|
||||
-- ----------------------------
|
||||
-- Primary Key structure for table loops
|
||||
-- ----------------------------
|
||||
ALTER TABLE "temporary_project"."loops" ADD CONSTRAINT "loops_pkey" PRIMARY KEY ("guid", "id", "room_type_id");
|
||||
|
||||
-- ----------------------------
|
||||
-- Indexes structure for table rooms
|
||||
-- ----------------------------
|
||||
CREATE INDEX "idx_rooms_device_id" ON "temporary_project"."rooms" USING btree (
|
||||
"device_id" COLLATE "pg_catalog"."default" "pg_catalog"."text_ops" ASC NULLS LAST
|
||||
);
|
||||
CREATE INDEX "idx_rooms_hotel_id" ON "temporary_project"."rooms" USING btree (
|
||||
"hotel_id" "pg_catalog"."int4_ops" ASC NULLS LAST
|
||||
);
|
||||
CREATE INDEX "idx_rooms_id" ON "temporary_project"."rooms" USING btree (
|
||||
"id" "pg_catalog"."int4_ops" ASC NULLS LAST
|
||||
);
|
||||
CREATE INDEX "idx_rooms_mac" ON "temporary_project"."rooms" USING btree (
|
||||
"mac" COLLATE "pg_catalog"."default" "pg_catalog"."text_ops" ASC NULLS LAST
|
||||
);
|
||||
|
||||
-- ----------------------------
|
||||
-- Primary Key structure for table rooms
|
||||
-- ----------------------------
|
||||
ALTER TABLE "temporary_project"."rooms" ADD CONSTRAINT "rooms_pkey" PRIMARY KEY ("guid", "hotel_id", "room_id");
|
||||
45
docs/测试报告.md
@@ -1,45 +0,0 @@
# Test Report

## Basic information
- Date of run: 2026-01-29
- How it was run: started from the console with `npm run dev`; terminated with Ctrl + C after about 60 seconds
- Goal: verify the Kafka consumption and database insertion pipeline and find out why nothing was being inserted

## Key console log

```
{"level":"error","message":"Message processing failed","timestamp":1769734880590,"context":{"error":"[\n {\n \"expected\": \"number\",\n \"code\": \"invalid_type\",\n \"path\": [\n \"hotel_id\"\n ],\n \"message\": \"Invalid input: expected number, received string\"\n }\n]","type":"PARSE_ERROR","stack":"ZodError: ...","rawPayload":"{\"ts_ms\":1769692878011,\"hotel_id\":\"2147\",\"room_id\":\"8209\",\"device_id\":\"099008129081\",\"direction\":\"上报\",\"cmd_word\":\"36\",\"frame_id\":52496,...}","validationIssues":[{"expected":"number","code":"invalid_type","path":["hotel_id"],"message":"Invalid input: expected number, received string"}]}}
```

## Conclusion
- Direct cause of the missing inserts: the Kafka message fails Zod validation during parsing. `hotel_id` arrives as a string instead of the Number required by the documentation, producing a `PARSE_ERROR`, so the database insert path is never reached.

## Consistency check against the documented format
Compared against [kafka_format.md](file:///e:/Project_Class/BLS/Web_BLS_RCUAction_Server/docs/kafka_format.md):
- `hotel_id`: documented as Number, observed as a string (example: `"2147"`) - mismatch.
- `cmd_word`: documented as `"0x36"`/`"0x0F"`, observed as `"36"` - mismatch.
- `control_list`: documented as Array/optional, observed as `null` - mismatch.
- The other key fields (`ts_ms`, `room_id`, `device_id`, `direction`, `udp_raw`) are all present.

## Enhanced console error output
To make failures easier to locate, the following modules now print detailed errors to the PowerShell console:
- Kafka processing errors: `type`, `stack`, `rawPayload`, `validationIssues`, `dbContext`
- Database insert errors: `schema`, `table`, `rowsLength`
- Redis enqueue and retry errors: detailed error information

Files touched:
- [index.js](file:///e:/Project_Class/BLS/Web_BLS_RCUAction_Server/bls-rcu-action-backend/src/index.js)
- [databaseManager.js](file:///e:/Project_Class/BLS/Web_BLS_RCUAction_Server/bls-rcu-action-backend/src/db/databaseManager.js)
- [errorQueue.js](file:///e:/Project_Class/BLS/Web_BLS_RCUAction_Server/bls-rcu-action-backend/src/redis/errorQueue.js)

## Suggested fixes
The following options would resolve the missing inserts; it is up to you which one to apply:
1. Have the upstream emit data strictly per the documentation:
   - change `hotel_id` to a Number
   - change `cmd_word` to `"0x36"` / `"0x0F"`
   - use `[]` for `control_list` or omit the field instead of sending `null`
2. Relax validation downstream and coerce types (see the sketch after this report):
   - accept `hotel_id` as a string and convert it to a Number
   - keep accepting `cmd_word = "36"`
   - accept `null` for `control_list`/`device_list`/`fault_list` and convert it to an empty array

The current code already tolerates `cmd_word="36"` and `control_list=null`, but `hotel_id` is still strictly required to be a Number per the documentation.
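Option 2 above could be expressed as a more lenient Zod schema. This is a minimal sketch only, assuming zod v3-style `coerce`/`preprocess` helpers and trimmed to the fields discussed in the report; it is not the project's actual schema:

```js
import { z } from 'zod';

// Lenient variants of the fields called out above.
const lenientPayload = z.object({
  hotel_id: z.coerce.number().int(),                                           // accepts "2147" and 2147
  cmd_word: z.string().transform((s) => (s.startsWith('0x') ? s : `0x${s}`)),  // "36" -> "0x36"
  control_list: z.preprocess((v) => v ?? [], z.array(z.unknown())),            // null -> []
  device_list: z.preprocess((v) => v ?? [], z.array(z.unknown())),
  fault_list: z.preprocess((v) => v ?? [], z.array(z.unknown())),
}).passthrough(); // keep ts_ms, room_id, device_id, direction, udp_raw, ... untouched

// Usage example: const payload = lenientPayload.parse(JSON.parse(rawKafkaValue));
```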
1
node_modules/.vite/vitest/da39a3ee5e6b4b0d3255bfef95601890afd80709/results.json
generated
vendored
Normal file
@@ -0,0 +1 @@
{"version":"4.0.18","results":[[":bls-rcu-action-backend/tests/consumer_reliability.test.js",{"duration":9.49589999999995,"failed":false}]]}
115
openspec/changes/2026-02-06-room-status-moment/spec.md
Normal file
@@ -0,0 +1,115 @@
# Room Status Moment Integration Plan

## 1. Background
We need to integrate a new database table, `room_status.room_status_moment` (a snapshot table), into the existing Kafka processing pipeline.
The table stores the latest state of each room/device.
The existing logic (batch inserts into `rcu_action_events`) must remain unchanged.

## 2. Database configuration
The new table lives in a separate database (possibly `log_platform`, or a new `room_status` schema in the existing database).
We will add support for a dedicated `ROOM_STATUS` connection pool to keep this flexible.

**Environment variables:**
```env
# Existing database configuration
DB_HOST=...
...

# New Room Status database configuration (falls back to the existing database if not provided, but uses a separate pool)
ROOM_STATUS_DB_HOST=...
ROOM_STATUS_DB_PORT=...
ROOM_STATUS_DB_USER=...
ROOM_STATUS_DB_PASSWORD=...
ROOM_STATUS_DB_DATABASE=log_platform   <-- target database name used in the SQL script
ROOM_STATUS_DB_SCHEMA=room_status
```

## 3. Field mapping strategy

Target table: `room_status.room_status_moment`
Unique key: `(hotel_id, room_id, device_id)`

| Source field (Kafka) | Target field | Update logic |
| :--- | :--- | :--- |
| `hotel_id` | `hotel_id` | Primary/index key |
| `room_id` | `room_id` | Primary/index key |
| `device_id` | `device_id` | Primary/index key |
| `ts_ms` | `ts_ms` | Always updated to the latest value |
| `sys_lock_status` | `sys_lock_status` | Direct mapping (when present) |
| `device_list` (0x36) <br> `control_list` (0x0F) | `dev_loops` (JSONB) | **Merge strategy**: <br> Key: `001002003` (Type(3)+Addr(3)+Loop(3)) <br> Value: `dev_data` (int) <br> Operation: `old_json || new_json` (new values merged onto old) |
| `fault_list` (0x36) | `faulty_device_count` (JSONB) | **Replace strategy**: <br> 0x36 reports the complete fault list, so the field is simply overwritten. <br> Content: a list of `{dev_type, dev_addr, dev_loop, error_type, error_data}` |
| `fault_list` -> item with `error_type=1` | `online_status` | If `error_data=1` -> offline (0) <br> If `error_data=0` -> online (1) <br> *The exact mapping convention still needs verification* |

**Note on online status**:
Documentation: "0x01: 0 = online, 1 = offline".
The `online_status` column is INT2.
Convention: typically 1 = online, 0 = offline.
Logic:
- Fault type 0x01 with data 0 (online) -> set `online_status` = 1
- Fault type 0x01 with data 1 (offline) -> set `online_status` = 0
- Otherwise -> do not update `online_status`

## 4. Upsert logic (PostgreSQL)

We will use `INSERT ... ON CONFLICT DO UPDATE`.

```sql
INSERT INTO room_status.room_status_moment (
    guid, ts_ms, hotel_id, room_id, device_id,
    sys_lock_status, online_status,
    dev_loops, faulty_device_count
) VALUES (
    $guid, $ts_ms, $hotel_id, $room_id, $device_id,
    $sys_lock_status, $online_status,
    $dev_loops::jsonb, $faulty_device_count::jsonb
)
ON CONFLICT (hotel_id, room_id, device_id)
DO UPDATE SET
    ts_ms = EXCLUDED.ts_ms,
    -- only update sys_lock_status when the new value is not null
    sys_lock_status = COALESCE(EXCLUDED.sys_lock_status, room_status.room_status_moment.sys_lock_status),
    -- only update online_status when the new value is not null
    online_status = COALESCE(EXCLUDED.online_status, room_status.room_status_moment.online_status),
    -- merge dev_loops
    dev_loops = CASE
        WHEN EXCLUDED.dev_loops IS NULL THEN room_status.room_status_moment.dev_loops
        ELSE COALESCE(room_status.room_status_moment.dev_loops, '{}'::jsonb) || EXCLUDED.dev_loops
    END,
    -- replace faulty_device_count when present
    faulty_device_count = COALESCE(EXCLUDED.faulty_device_count, room_status.room_status_moment.faulty_device_count)
WHERE
    -- optional optimization: only write when the timestamp moves forward
    -- note: for JSON merges, equality is hard to detect without computing the merge,
    -- so we rely on ts_ms changing as the freshness signal
    EXCLUDED.ts_ms >= room_status.room_status_moment.ts_ms
;
```

## 5. Architecture changes

1. **`src/config/config.js`**: add a `roomStatusDb` configuration entry.
2. **`src/db/roomStatusManager.js`**: new singleton class managing the connection pool for `log_platform`.
3. **`src/db/statusBatchProcessor.js`**: dedicated batch processor for `room_status_moment`.
   * **Why**: the upsert logic is complex and differs from the append-only log pattern of `rcu_action_events`.
   * It must aggregate updates per device inside a batch to reduce database load (deduplication).
4. **`src/processor/statusExtractor.js`**: helper that converts a `KafkaPayload` into a `StatusRow` structure.
5. **`src/index.js`**: wire up the new processor.

## 6. Deduplication strategy (in-memory batch aggregation)
Because `room_status_moment` is a snapshot table, if we receive 10 updates for the same device within one second:
- Only the **last** state (or the merged state) needs to be written.
- `StatusBatchProcessor` should maintain a map: `Map<Key(hotel,room,device), LatestData>`.
- On flush, the map values are turned into batched upserts.
- **Constraint**: `dev_loops` updates that touch different loops may need to be merged cumulatively.
- **Optimization** (see the sketch below):
  - If `dev_loops` is a partial update, we cannot simply keep the last message.
  - However, within a short batching window (e.g. 500 ms) we can merge updates in memory before sending them to the database.
  - Structure: `Map<Key, MergedState>`
  - Logic: `MergedState.dev_loops = Object.assign({}, old.dev_loops, new.dev_loops)`
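A minimal sketch of the in-memory aggregation described above. The `StatusRow` field names follow the mapping table in section 3; the class and method names are illustrative, not the final `StatusBatchProcessor`:

```js
// Sketch of the batch-window aggregation for room_status_moment.
export class StatusAggregator {
  constructor() {
    this.pending = new Map(); // Map<"hotel:room:device", MergedState>
  }

  add(row) {
    const key = `${row.hotel_id}:${row.room_id}:${row.device_id}`;
    const prev = this.pending.get(key);
    if (!prev) {
      this.pending.set(key, { ...row });
      return;
    }
    // Keep the newest scalars, merge partial dev_loops, replace the fault snapshot.
    this.pending.set(key, {
      ...prev,
      ...row,
      ts_ms: Math.max(prev.ts_ms, row.ts_ms),
      sys_lock_status: row.sys_lock_status ?? prev.sys_lock_status,
      online_status: row.online_status ?? prev.online_status,
      dev_loops: Object.assign({}, prev.dev_loops, row.dev_loops),
      faulty_device_count: row.faulty_device_count ?? prev.faulty_device_count,
    });
  }

  // Called on flush (e.g. every 500 ms): returns rows for the batched upsert and clears the window.
  drain() {
    const rows = [...this.pending.values()];
    this.pending.clear();
    return rows;
  }
}
```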
## 7. Execution plan
1. Add the config entry and the DB manager.
2. Implement `StatusExtractor` (convert Kafka payloads into snapshot rows).
3. Implement `StatusBatchProcessor` (with the in-memory merge logic).
4. Update `processKafkaMessage` so it returns both `LogRows` (existing) and `StatusUpdate` (new).
5. Dispatch both in the main loop.
@@ -0,0 +1,32 @@
# Feature: Loop Name Enrichment

**Status**: Implemented
**Date**: 2026-02-02

## Summary
Enrich incoming RCU action events with `loop_name` by looking up metadata from `temporary_project` tables. This allows easier identification of specific device loops (e.g., "Main Chandelier") in the event log.

## Requirements
1. **Cache Mechanism**:
   - Load `rooms` and `loops` data from `temporary_project` schema into memory.
   - Refresh cache daily at 1:00 AM (see the scheduling sketch after this document).
2. **Enrichment**:
   - For each incoming event, look up `loop_name` using `device_id` and `dev_addr`.
   - `device_id` -> `room_type_id` (via `rooms` table).
   - `room_type_id` + `dev_addr` -> `loop_name` (via `loops` table).
3. **Storage**:
   - Store `loop_name` in `rcu_action_events` table.

## Ambiguity Resolution
- The requirement mentioned matching `dev_type` to find the loop. However, standard RCU addressing uses `dev_addr` (and `dev_loop`). We assume `loops.loop_address` corresponds to the packet's `dev_addr` (converted to string).
- We will attempt to match `dev_addr` against `loop_address`.

## Schema Changes
- **Table**: `rcu_action.rcu_action_events`
- **Column**: `loop_name` (VARCHAR(255), Nullable)

## Implementation Plan
1. **Database**: Update `init_db.sql` and `databaseManager.js`.
2. **Cache**: Create `src/cache/projectMetadata.js`.
3. **Processor**: Integrate cache lookup in `src/processor/index.js`.
4. **Lifecycle**: Initialize cache in `src/index.js`.
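The daily 1:00 AM refresh can be scheduled with a plain timer rather than a cron dependency. A small sketch under that assumption; `loadProjectMetadata` stands in for the cache's real loader function:

```js
// Schedule loadProjectMetadata() to run every day at 01:00 local time.
export function scheduleDailyRefresh(loadProjectMetadata) {
  const msUntilNextRun = () => {
    const now = new Date();
    const next = new Date(now.getFullYear(), now.getMonth(), now.getDate(), 1, 0, 0, 0);
    if (next <= now) next.setDate(next.getDate() + 1); // already past 01:00 today
    return next - now;
  };
  const arm = () => setTimeout(async () => {
    try {
      await loadProjectMetadata();
    } finally {
      arm(); // re-arm for the following day regardless of success
    }
  }, msUntilNextRun());
  arm();
}
```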
@@ -0,0 +1,22 @@
# Feature: Feature Toggle Loop Name Generation

**Status**: Implemented
**Date**: 2026-02-03

## Summary
Add a configuration switch to control the fallback behavior for `loop_name` generation. Currently, when cache lookup fails, the system auto-generates a name using the format `[Type-Addr-Loop]`. This change allows users to enable or disable this fallback behavior via an environment variable.

## Requirements
1. **Configuration**:
   - Add `ENABLE_LOOP_NAME_AUTO_GENERATION` to environment variables.
   - Default behavior should match existing logic (generation enabled) if not specified, but the user requested explicit control.
   - If `true`: perform the concatenation `[dev_type name + '-' + dev_addr + '-' + dev_loop]`.
   - If `false`: do not generate a name; leave `loop_name` as null.
2. **Processor Logic**:
   - In `getLoopNameWithFallback`, check the configuration flag before applying the fallback generation logic.

## Implementation Plan
1. **Config**: Update `src/config/config.js` to parse `ENABLE_LOOP_NAME_AUTO_GENERATION` (see the sketch below).
2. **Env**: Update `.env` and `.env.example`.
3. **Processor**: Update `src/processor/index.js` to respect the flag.
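A minimal sketch of the flag parsing described in step 1, assuming the existing pattern of reading `process.env` in `src/config/config.js`; the helper name is illustrative:

```js
// Parse a boolean feature flag from the environment, defaulting to enabled.
function parseBoolean(value, defaultValue = true) {
  if (value === undefined || value === '') return defaultValue;
  return !['false', '0', 'no', 'off'].includes(String(value).toLowerCase());
}

export const loopNameConfig = {
  enableLoopNameAutoGeneration: parseBoolean(process.env.ENABLE_LOOP_NAME_AUTO_GENERATION, true),
};

// In the processor (sketch): skip fallback generation when the flag is off.
// if (!loopNameConfig.enableLoopNameAutoGeneration) return null;
```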
@@ -0,0 +1,27 @@
# Summary of Changes: Loop Name Features

**Date**: 2026-02-03
**Status**: Archived

## Overview
This archive contains the specifications and proposals for the Loop Name Enrichment and Auto-Generation features. These features enhance the RCU Action Server by enriching event data with descriptive loop names derived from project metadata or fallback generation logic.

## Included Changes

### 1. [Feature: Loop Name Enrichment](./feature-loop-name-enrichment.md)
- **Goal**: Enrich `rcu_action_events` with `loop_name` by looking up cached metadata from `temporary_project` tables.
- **Key Components**:
  - `ProjectMetadataCache`: Loads rooms and loops data daily.
  - `loop_name` column added to `rcu_action_events` table.
  - Processor logic updated to perform lookup.

### 2. [Feature: Feature Toggle Loop Name Generation](./feature-toggle-loop-name-generation.md)
- **Goal**: Provide a configuration switch to control the fallback behavior when cache lookup fails.
- **Key Components**:
  - `ENABLE_LOOP_NAME_AUTO_GENERATION` env var.
  - Logic to generate `[Type-Addr-Loop]` format only if enabled.
  - Default behavior is `true` (enabled).

## Implementation Status
- All proposed features have been implemented and verified via unit tests.
- Configuration variables are documented in `.env.example`.
@@ -0,0 +1,49 @@
# Reliable Kafka Consumption & DB Offline Handling

- **Status**: Completed
- **Author**: AI Assistant
- **Created**: 2026-02-04

## Context

Currently, the Kafka consumer is configured with `autoCommit: true`. This means offsets are committed periodically regardless of whether the data was successfully processed and stored in the database. If the database insertion fails (e.g., due to a constraint violation or connection loss), the message is considered "consumed" by Kafka, leading to data loss.

Additionally, if the PostgreSQL database goes offline, the consumer keeps trying to process messages, filling the logs with errors and potentially losing data if retries aren't handled correctly. The user requires a mechanism to pause consumption during DB outages and resume only when the DB is back online.

## Proposal

We propose to enhance the reliability of the ingestion pipeline by:

1. **Disabling Auto-Commit**:
   - Set `autoCommit: false` in the Kafka `ConsumerGroup` configuration.
   - Commit offsets manually, and only after the database insertion is confirmed successful.

2. **Implementing DB Offline Handling (Circuit Breaker)**:
   - Detect database connection errors during insertion.
   - If a connection error occurs:
     1. Pause the Kafka consumer immediately.
     2. Log a warning and enter a "Recovery Mode".
     3. Wait for 1 minute.
     4. Periodically check database connectivity (every 1 minute).
     5. Once the database is reachable, resume the Kafka consumer.

## Technical Details

### Configuration
- No new environment variables are strictly required, but `KAFKA_AUTO_COMMIT` could be forced to `false` or removed if we enforce this behavior.
- The retry interval (60 seconds) can be a constant or a config value.

### Implementation Steps
1. Modify `src/kafka/consumer.js` (see the sketch at the end of this section):
   - Change `autoCommit` to `false`.
   - Update the message processing flow to await the `onMessage` handler.
   - Call `consumer.commit()` explicitly after successful processing.
   - Add logic to handle errors from `onMessage`; if it is a DB connection error, trigger the pause/retry loop.
2. Update `src/db/databaseManager.js` (optional but helpful):
   - Ensure it exposes a method to check connectivity (e.g., `testConnection()`) for the recovery loop.
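A condensed sketch of steps 1 and 2 above, assuming a kafka-node `ConsumerGroup` and the proposed `databaseManager.testConnection()` helper; `onMessage` and `isDbConnectionError` are placeholders for code that lives elsewhere in the service, so this is illustrative rather than the final `consumer.js`:

```js
import { ConsumerGroup } from 'kafka-node';

// onMessage, isDbConnectionError and databaseManager are assumed to exist elsewhere in the service.
const consumer = new ConsumerGroup(
  { kafkaHost: process.env.KAFKA_BROKERS, groupId: process.env.KAFKA_GROUP_ID, autoCommit: false },
  process.env.KAFKA_TOPIC
);

let paused = false;

consumer.on('message', async (message) => {
  try {
    await onMessage(message);      // parse + insert into PostgreSQL
    consumer.commit(() => {});     // commit only after a successful insert
  } catch (err) {
    if (isDbConnectionError(err) && !paused) {
      paused = true;
      consumer.pause();            // stop pulling while the DB is down
      const timer = setInterval(async () => {  // recovery loop, once per minute
        if (await databaseManager.testConnection()) {
          clearInterval(timer);
          paused = false;
          consumer.resume();
        }
      }, 60_000);
    }
    // other errors keep the offset uncommitted and go to the Redis error queue
  }
});
```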
## Impact

- **Reliability**: Drastically improved; messages are no longer lost during DB outages.
- **Performance**: Slight overhead due to manual commits (can be batched if needed, but per-message or small-batch commits are safer for now).
- **Operations**: The system self-recovers from DB maintenance or crashes.
@@ -0,0 +1,39 @@
# Phase 2: Optimization and Fixes

- **Status**: Completed
- **Author**: AI Assistant
- **Created**: 2026-02-04

## Context

Following the initial stabilization, several issues were identified:
1. **Missing Command Support**: The system did not recognize command word `0x0E`, which shares the same structure as `0x36`.
2. **Bootstrap Instability**: On Windows, restarting the service frequently caused `EADDRINUSE` errors when connecting to PostgreSQL due to ephemeral port exhaustion.
3. **Performance Bottleneck**: The Kafka consumer could not keep up with the backlog using single-row inserts and low parallelism, and scaling out with more instances was not an option.

## Implemented Changes

### 1. 0x0E Command Support
- **Goal**: Enable processing of the `0x0E` command word.
- **Implementation**:
  - Updated `resolveMessageType` in `src/processor/index.js` to map `0x0E` to the same handler as `0x36`.
  - Added unit tests in `tests/processor.test.js` to verify `0x0E` parsing for status and fault reports.

### 2. Bootstrap Retry Logic
- **Goal**: Prevent service startup failure due to transient port conflicts.
- **Implementation**:
  - Modified `src/db/initializer.js` to catch `EADDRINUSE` errors during the initial database connection.
  - Added a retry mechanism: max 5 retries with 1-second backoff.

### 3. High Throughput Optimization (Batch Processing)
- **Goal**: Resolve the Kafka backlog without adding more service instances.
- **Implementation**:
  - **Batch Processor**: Created `src/db/batchProcessor.js` to buffer messages in memory.
  - **Strategy**: Messages are flushed to the DB when the buffer reaches 500 rows or every 1 second (see the sketch below).
  - **Config Update**: Increased default `KAFKA_MAX_IN_FLIGHT` from 50 to 500 in `src/config/config.js` to align with the batch size.
  - **Integration**: Refactored `src/index.js` and `src/processor/index.js` to decouple parsing from insertion, allowing `BatchProcessor` to handle the write operations.
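A condensed sketch of the buffer-and-flush behaviour described above (flush at 500 rows or after 1 second). The `insertRows` callback is assumed to perform the multi-row INSERT; this is not the actual `batchProcessor.js`:

```js
export class BatchProcessor {
  constructor(insertRows, { maxSize = 500, flushIntervalMs = 1000 } = {}) {
    this.insertRows = insertRows;
    this.maxSize = maxSize;
    this.buffer = [];
    this.timer = setInterval(() => this.flush(), flushIntervalMs);
  }

  add(rows) {
    this.buffer.push(...rows);
    if (this.buffer.length >= this.maxSize) return this.flush();
  }

  async flush() {
    if (this.buffer.length === 0) return;
    const rows = this.buffer.splice(0, this.buffer.length);
    await this.insertRows(rows); // single multi-row INSERT instead of row-by-row writes
  }

  async stop() {
    clearInterval(this.timer);
    await this.flush(); // drain remaining rows on shutdown
  }
}
```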
## Impact
- **Throughput**: Significantly increased database write throughput via batching.
- **Reliability**: Service is resilient to port conflicts on restart.
- **Functionality**: `0x0E` messages are now correctly processed and stored.
@@ -12,6 +12,9 @@ Backend service for processing RCU action events from Kafka, parsing them, and s
- **Error Handling**: Redis List (`error_queue`) for failed messages + Retry mechanism
- **Output**: PostgreSQL Table (`rcu_action_events`)

## Features
- **Loop Name Enrichment**: Enriches event data with `loop_name` by matching `device_id` and `dev_addr` against metadata cached from `temporary_project` tables (refreshed daily).

## Configuration (Environment Variables)
The project is configured via `.env`. Key variables:
- **Kafka**: `KAFKA_BROKERS`, `KAFKA_TOPIC`, `KAFKA_SASL_USERNAME`, `KAFKA_SASL_PASSWORD`