feat: 外置数据库初始化与分区管理功能

- 删除主服务中的数据库初始化与分区管理逻辑,降低复杂度。
- 新增 SQL 脚本用于数据库初始化和分区管理,集中在 SQL_Script 目录。
- 移除环境变量 ENABLE_DATABASE_INITIALIZATION,简化配置。
- 更新 package.json,新增数据库初始化和分区管理的 npm 脚本。
- 删除不再使用的初始化和分区管理相关文件。
- 提供统一的命令行接口,支持外部调用数据库初始化和分区创建。
This commit is contained in:
2026-03-04 14:54:27 +08:00
parent 677db35cdf
commit f61a63d8c1
18 changed files with 483 additions and 397 deletions

File diff suppressed because one or more lines are too long

View File

@@ -41,8 +41,3 @@ REDIS_API_BASE_URL=http://localhost:3000
# ROOM_STATUS_DB_TABLE=room_status_moment
ENABLE_LOOP_NAME_AUTO_GENERATION=true
# Database Initialization Configuration
# Set to 'false' to skip automatic database creation, schema setup, and partition management
# When disabled, the service will start consuming Kafka messages and writing to existing database immediately
# Default: true (enable database initialization)
ENABLE_DATABASE_INITIALIZATION=true

View File

@@ -7,7 +7,12 @@
"dev": "node src/index.js",
"build": "vite build --ssr src/index.js --outDir dist",
"test": "vitest run",
"start": "node dist/index.js"
"start": "node dist/index.js",
"db:init:all": "node ../SQL_Script/db_manager.js init-all",
"db:init:rcu-action": "node ../SQL_Script/db_manager.js init-rcu",
"db:init:room-status": "node ../SQL_Script/db_manager.js init-room-status",
"db:partition:rcu-action": "node ../SQL_Script/db_manager.js partition-rcu",
"db:partition:room-status": "node ../SQL_Script/db_manager.js partition-room-status"
},
"dependencies": {
"dotenv": "^16.4.5",

View File

@@ -1,80 +0,0 @@
-- Database Initialization Script for BLS RCU Action Server
-- Description: creates the rcu_action schema and the rcu_action_events partitioned
-- table, used to store the RCU communication event log stream.
-- All statements are idempotent (IF NOT EXISTS) so the script is safe to re-run.
CREATE SCHEMA IF NOT EXISTS rcu_action;
-- Parent table, range-partitioned on the event timestamp (ts_ms).
-- Child partitions are created per day by the partition manager.
CREATE TABLE IF NOT EXISTS rcu_action.rcu_action_events (
guid VARCHAR(32) NOT NULL,
ts_ms BIGINT NOT NULL,
write_ts_ms BIGINT NOT NULL,
hotel_id INTEGER NOT NULL,
room_id VARCHAR(32) NOT NULL,
device_id VARCHAR(32) NOT NULL,
direction VARCHAR(10) NOT NULL,
cmd_word VARCHAR(10) NOT NULL,
frame_id INTEGER NOT NULL,
udp_raw TEXT NOT NULL,
action_type VARCHAR(20) NOT NULL,
sys_lock_status SMALLINT,
report_count SMALLINT,
dev_type SMALLINT,
dev_addr SMALLINT,
dev_loop INTEGER,
dev_data INTEGER,
fault_count SMALLINT,
error_type SMALLINT,
error_data SMALLINT,
type_l SMALLINT,
type_h SMALLINT,
details JSONB,
extra JSONB,
loop_name VARCHAR(255),
-- ts_ms leads the PK because it is the partition key (required by PostgreSQL
-- for unique constraints on partitioned tables).
PRIMARY KEY (ts_ms, guid)
) PARTITION BY RANGE (ts_ms);
-- Idempotent column backfills — presumably migrations for deployments created
-- before device_id / loop_name were added; no-ops on fresh installs.
-- TODO(review): confirm these can be dropped once all environments are migrated.
ALTER TABLE rcu_action.rcu_action_events
ADD COLUMN IF NOT EXISTS device_id VARCHAR(32) NOT NULL DEFAULT '';
ALTER TABLE rcu_action.rcu_action_events
ADD COLUMN IF NOT EXISTS loop_name VARCHAR(255);
-- Indexes for performance (ONLY on parent partitioned table)
-- PostgreSQL will create/attach corresponding child-partition indexes automatically.
-- Do not create duplicated indexes on partition child tables.
CREATE INDEX IF NOT EXISTS idx_rcu_action_hotel_id ON rcu_action.rcu_action_events (hotel_id);
CREATE INDEX IF NOT EXISTS idx_rcu_action_room_id ON rcu_action.rcu_action_events (room_id);
CREATE INDEX IF NOT EXISTS idx_rcu_action_device_id ON rcu_action.rcu_action_events (device_id);
CREATE INDEX IF NOT EXISTS idx_rcu_action_direction ON rcu_action.rcu_action_events (direction);
CREATE INDEX IF NOT EXISTS idx_rcu_action_cmd_word ON rcu_action.rcu_action_events (cmd_word);
CREATE INDEX IF NOT EXISTS idx_rcu_action_action_type ON rcu_action.rcu_action_events (action_type);
-- Composite Index for typical query pattern (Hotel + Room + Time)
CREATE INDEX IF NOT EXISTS idx_rcu_action_query_main ON rcu_action.rcu_action_events (hotel_id, room_id, ts_ms DESC);
-- Column Comments (stored in the database catalog; kept in the original
-- language on purpose — these are data, not source comments)
COMMENT ON TABLE rcu_action.rcu_action_events IS 'RCU 通讯日志流水表 - 存储从 Kafka 消费的 RCU 设备上报/下发/ACK 事件';
COMMENT ON COLUMN rcu_action.rcu_action_events.guid IS '主键32位无横线 UUID';
COMMENT ON COLUMN rcu_action.rcu_action_events.ts_ms IS '日志产生时间戳(毫秒),同时用作分区键';
COMMENT ON COLUMN rcu_action.rcu_action_events.write_ts_ms IS '入库时间戳(毫秒),由后端服务写入时生成';
COMMENT ON COLUMN rcu_action.rcu_action_events.hotel_id IS '酒店 ID';
COMMENT ON COLUMN rcu_action.rcu_action_events.room_id IS '房间 ID';
COMMENT ON COLUMN rcu_action.rcu_action_events.device_id IS 'RCU 设备 ID主板编号';
COMMENT ON COLUMN rcu_action.rcu_action_events.direction IS '数据方向:上报 / 下发';
COMMENT ON COLUMN rcu_action.rcu_action_events.cmd_word IS '命令字,如 0x36状态上报、0x0F控制下发/ACK';
COMMENT ON COLUMN rcu_action.rcu_action_events.frame_id IS '通讯帧号,用于串联同一次通讯的命令与状态';
COMMENT ON COLUMN rcu_action.rcu_action_events.udp_raw IS 'UDP 消息原文base64 编码)';
COMMENT ON COLUMN rcu_action.rcu_action_events.action_type IS '记录行为类型:用户操作 / 设备回路状态 / 下发控制 / 0FACK / 无效';
COMMENT ON COLUMN rcu_action.rcu_action_events.sys_lock_status IS '系统锁状态0=未锁定, 1=锁定(仅 0x36 上报)';
COMMENT ON COLUMN rcu_action.rcu_action_events.report_count IS '本次上报设备数量(对应 device_list 长度)';
COMMENT ON COLUMN rcu_action.rcu_action_events.dev_type IS '设备类型编号,拆分自 device_list/fault_list/control_list';
COMMENT ON COLUMN rcu_action.rcu_action_events.dev_addr IS '设备地址编号';
COMMENT ON COLUMN rcu_action.rcu_action_events.dev_loop IS '设备回路编号';
COMMENT ON COLUMN rcu_action.rcu_action_events.dev_data IS '设备状态数值(仅 0x36 状态上报)';
COMMENT ON COLUMN rcu_action.rcu_action_events.fault_count IS '本次故障设备数量(对应 fault_list 长度)';
COMMENT ON COLUMN rcu_action.rcu_action_events.error_type IS '故障类型0x01=在线/离线, 0x02=电量, 0x03=电流 等';
COMMENT ON COLUMN rcu_action.rcu_action_events.error_data IS '故障内容数据(含义取决于 error_type';
COMMENT ON COLUMN rcu_action.rcu_action_events.type_l IS '执行方式(仅 0x0F 下发控制)';
COMMENT ON COLUMN rcu_action.rcu_action_events.type_h IS '执行内容(仅 0x0F 下发控制)';
COMMENT ON COLUMN rcu_action.rcu_action_events.details IS '业务详情 JSONB存储完整的 device_list / fault_list / control_list';
COMMENT ON COLUMN rcu_action.rcu_action_events.extra IS '扩展信息 JSONB存储上游传入的附加字段';
COMMENT ON COLUMN rcu_action.rcu_action_events.loop_name IS '回路名称:通过 device_id → room_type_id → loop_address 查询获得';

View File

@@ -64,6 +64,5 @@ export const config = {
schema: process.env.ROOM_STATUS_DB_SCHEMA || 'room_status',
table: process.env.ROOM_STATUS_DB_TABLE || 'room_status_moment'
},
enableLoopNameAutoGeneration: process.env.ENABLE_LOOP_NAME_AUTO_GENERATION === 'true',
enableDatabaseInitialization: process.env.ENABLE_DATABASE_INITIALIZATION !== 'false'
enableLoopNameAutoGeneration: process.env.ENABLE_LOOP_NAME_AUTO_GENERATION === 'true'
};

View File

@@ -1,110 +0,0 @@
import pg from 'pg';
import fs from 'fs';
import path from 'path';
import { fileURLToPath } from 'url';
import { logger } from '../utils/logger.js';
import partitionManager from './partitionManager.js';
import dbManager from './databaseManager.js';
import { config } from '../config/config.js';
const __filename = fileURLToPath(import.meta.url);
const __dirname = path.dirname(__filename);
class DatabaseInitializer {
  /**
   * Run the full startup initialization sequence:
   *   1. create the target database when it does not exist,
   *   2. apply init_db.sql (schema + parent partitioned table),
   *   3. pre-create daily partitions for the next 30 days.
   * Throws on the first unrecoverable error so the caller can abort startup.
   */
  async initialize() {
    logger.info('Starting database initialization check...');
    // 1. Check if database exists, create if not
    await this.ensureDatabaseExists();
    // 2. Initialize Schema and Parent Table (if not exists)
    // Note: We need to use dbManager because it connects to the target database
    await this.ensureSchemaAndTable();
    // 3. Ensure Partitions for the next month
    await partitionManager.ensurePartitions(30);
    logger.info('Database initialization completed successfully.');
  }

  /**
   * Connect to the maintenance 'postgres' database and create the target
   * database when it is missing.
   *
   * Fixes vs. previous revision:
   * - a pg.Client cannot be re-used after a failed connect(); a fresh client
   *   is now created for every retry attempt,
   * - exhausting all retries now throws instead of silently falling through
   *   and calling query() on a client that never connected.
   *
   * @throws {Error} when the connection cannot be established after all
   *   retries, or when the existence check / CREATE DATABASE fails.
   */
  async ensureDatabaseExists() {
    const { host, port, user, password, database, ssl } = config.db;
    const maxRetries = 5;
    let client = null;
    let lastError = null;
    for (let attempt = 1; attempt <= maxRetries && client === null; attempt++) {
      // Connect to 'postgres' database to check/create target database.
      // A new Client per attempt: pg clients are single-use after a failed connect.
      const candidate = new pg.Client({
        host,
        port,
        user,
        password,
        database: 'postgres',
        ssl: ssl ? { rejectUnauthorized: false } : false
      });
      try {
        await candidate.connect();
        client = candidate;
      } catch (err) {
        lastError = err;
        // Best-effort cleanup of the failed client; ignore secondary errors.
        await candidate.end().catch(() => {});
        if (err.code === 'EADDRINUSE') {
          // Local ephemeral-port exhaustion — transient, worth retrying.
          logger.warn(`Port conflict (EADDRINUSE) connecting to database, retrying (${attempt}/${maxRetries})...`);
          await new Promise(resolve => setTimeout(resolve, 1000));
        } else {
          throw err;
        }
      }
    }
    if (client === null) {
      throw new Error(
        `Failed to connect to database after ${maxRetries} retries`,
        { cause: lastError }
      );
    }
    try {
      const checkRes = await client.query(
        `SELECT 1 FROM pg_database WHERE datname = $1`,
        [database]
      );
      if (checkRes.rowCount === 0) {
        logger.info(`Database '${database}' does not exist. Creating...`);
        // CREATE DATABASE cannot run inside a transaction block
        await client.query(`CREATE DATABASE "${database}"`);
        logger.info(`Database '${database}' created.`);
      } else {
        logger.info(`Database '${database}' already exists.`);
      }
    } catch (err) {
      logger.error('Error ensuring database exists:', err);
      throw err;
    } finally {
      await client.end();
    }
  }

  /**
   * Locate init_db.sql (checked relative to cwd and to this module) and
   * execute it against the target database via dbManager's pool.
   *
   * @throws {Error} when the script cannot be found or its execution fails.
   */
  async ensureSchemaAndTable() {
    // dbManager connects to the target database
    const client = await dbManager.pool.connect();
    try {
      // Multiple candidates because the working directory differs between
      // `node src/index.js` (dev) and `node dist/index.js` (built).
      const sqlPathCandidates = [
        path.resolve(process.cwd(), 'scripts/init_db.sql'),
        path.resolve(__dirname, '../scripts/init_db.sql'),
        path.resolve(__dirname, '../../scripts/init_db.sql')
      ];
      const sqlPath = sqlPathCandidates.find((candidate) => fs.existsSync(candidate));
      if (!sqlPath) {
        throw new Error(`init_db.sql not found. Candidates: ${sqlPathCandidates.join(' | ')}`);
      }
      const sql = fs.readFileSync(sqlPath, 'utf8');
      logger.info('Executing init_db.sql...');
      await client.query(sql);
      logger.info('Schema and parent table initialized.');
    } catch (err) {
      logger.error('Error initializing schema and table:', err);
      throw err;
    } finally {
      client.release();
    }
  }
}
export default new DatabaseInitializer();

View File

@@ -1,91 +0,0 @@
import { logger } from '../utils/logger.js';
import dbManager from './databaseManager.js';
const PARENT_TABLE = 'rcu_action.rcu_action_events';
const PARTITION_TABLESPACE = 'ts_hot';
// Index definitions live on the parent partitioned table only;
// PostgreSQL attaches matching indexes to child partitions automatically.
const PARENT_INDEX_STATEMENTS = [
  'CREATE INDEX IF NOT EXISTS idx_rcu_action_hotel_id ON rcu_action.rcu_action_events (hotel_id);',
  'CREATE INDEX IF NOT EXISTS idx_rcu_action_room_id ON rcu_action.rcu_action_events (room_id);',
  'CREATE INDEX IF NOT EXISTS idx_rcu_action_device_id ON rcu_action.rcu_action_events (device_id);',
  'CREATE INDEX IF NOT EXISTS idx_rcu_action_direction ON rcu_action.rcu_action_events (direction);',
  'CREATE INDEX IF NOT EXISTS idx_rcu_action_cmd_word ON rcu_action.rcu_action_events (cmd_word);',
  'CREATE INDEX IF NOT EXISTS idx_rcu_action_action_type ON rcu_action.rcu_action_events (action_type);',
  'CREATE INDEX IF NOT EXISTS idx_rcu_action_query_main ON rcu_action.rcu_action_events (hotel_id, room_id, ts_ms DESC);'
];

class PartitionManager {
  /**
   * Idempotently create every parent-table index on the given connection.
   * @param {Object} client - A checked-out pg pool client.
   */
  async ensureParentIndexes(client) {
    for (const statement of PARENT_INDEX_STATEMENTS) {
      await client.query(statement);
    }
  }

  /**
   * Compute the daily partition bounds for a date (local time):
   * start-of-day and start-of-next-day as epoch milliseconds, plus the
   * YYYYMMDD suffix used in the partition's table name.
   * @param {Date} date - Any moment within the target day.
   * @returns {{startMs: number, endMs: number, partitionSuffix: string}}
   */
  getPartitionInfo(date) {
    const pad2 = (value) => String(value).padStart(2, '0');
    const partitionSuffix =
      `${date.getFullYear()}${pad2(date.getMonth() + 1)}${pad2(date.getDate())}`;
    const dayStart = new Date(date);
    dayStart.setHours(0, 0, 0, 0);
    const dayEnd = new Date(dayStart);
    dayEnd.setDate(dayEnd.getDate() + 1);
    return {
      startMs: dayStart.getTime(),
      endMs: dayEnd.getTime(),
      partitionSuffix
    };
  }

  /**
   * Make sure a daily RANGE partition exists for each of the next N days,
   * creating any that are missing (idempotent via to_regclass check and
   * CREATE TABLE IF NOT EXISTS).
   * @param {number} daysAhead - How many days of partitions to pre-create.
   */
  async ensurePartitions(daysAhead = 30) {
    const client = await dbManager.pool.connect();
    try {
      logger.info(`Starting partition check for the next ${daysAhead} days...`);
      await this.ensureParentIndexes(client);
      const today = new Date();
      for (let offset = 0; offset < daysAhead; offset++) {
        const targetDay = new Date(today);
        targetDay.setDate(today.getDate() + offset);
        const { startMs, endMs, partitionSuffix } = this.getPartitionInfo(targetDay);
        const partitionName = `rcu_action.rcu_action_events_${partitionSuffix}`;
        // to_regclass returns NULL when the relation does not exist yet.
        const { rows } = await client.query(`
        SELECT to_regclass($1) as exists;
      `, [partitionName]);
        if (!rows[0].exists) {
          logger.info(`Creating partition ${partitionName} for range [${startMs}, ${endMs})`);
          await client.query(`
        CREATE TABLE IF NOT EXISTS ${partitionName}
        PARTITION OF ${PARENT_TABLE}
        FOR VALUES FROM (${startMs}) TO (${endMs})
        TABLESPACE ${PARTITION_TABLESPACE};
      `);
        }
      }
      logger.info('Partition check completed.');
    } catch (err) {
      logger.error('Error ensuring partitions:', err);
      throw err;
    } finally {
      client.release();
    }
  }
}
export default new PartitionManager();

View File

@@ -3,11 +3,10 @@
*
* Manages an independent PostgreSQL connection pool for
* the room_status.room_status_moment snapshot table.
* Provides batch upsert with JSONB merge and auto-partition creation.
* Provides batch upsert with JSONB merge.
*/
import pg from 'pg';
import { randomUUID } from 'crypto';
import { logger } from '../utils/logger.js';
const { Pool } = pg;
@@ -28,8 +27,6 @@ export class RoomStatusManager {
this.schema = dbConfig.schema;
this.table = dbConfig.table;
this.fullTableName = `${this.schema}.${this.table}`;
// Track which partitions we have already ensured
this.knownPartitions = new Set();
}
/**
@@ -41,15 +38,6 @@ export class RoomStatusManager {
*/
async upsertBatch(rows) {
if (!rows || rows.length === 0) return;
// Pre-ensure all needed partitions exist before attempting upsert
const newHotelIds = [...new Set(rows.map(r => r.hotel_id))]
.filter(id => !this.knownPartitions.has(id));
if (newHotelIds.length > 0) {
await this._ensurePartitionsBatch(newHotelIds);
}
await this._doUpsert(rows);
}
@@ -95,54 +83,6 @@ export class RoomStatusManager {
await this.pool.query(sql, values);
}
/**
* Check if an error is a missing partition error.
*/
_isPartitionMissingError(error) {
const msg = error?.message || '';
return msg.includes('no partition') || msg.includes('routing') ||
(error?.code === '23514' && msg.includes('partition'));
}
/**
* Batch-create LIST partitions for multiple hotel_ids in a single connection.
* Uses CREATE TABLE IF NOT EXISTS (idempotent) — no check query needed.
*/
async _ensurePartitionsBatch(hotelIds) {
const client = await this.pool.connect();
try {
for (const hotelId of hotelIds) {
const partitionName = `${this.schema}.${this.table}_h${hotelId}`;
try {
await client.query(
`CREATE TABLE IF NOT EXISTS ${partitionName} PARTITION OF ${this.fullTableName} FOR VALUES IN (${hotelId})`
);
this.knownPartitions.add(hotelId);
} catch (err) {
// Partition may already exist (race condition) — safe to ignore
if (!err.message?.includes('already exists')) {
logger.error('Error creating partition', { error: err?.message, hotelId });
}
this.knownPartitions.add(hotelId);
}
}
if (hotelIds.length > 0) {
logger.info(`Ensured ${hotelIds.length} room_status partitions`);
}
} finally {
client.release();
}
}
/**
* Ensure a LIST partition exists for the given hotel_id (single).
*/
async ensurePartition(hotelId) {
if (this.knownPartitions.has(hotelId)) return;
await this._ensurePartitionsBatch([hotelId]);
}
async testConnection() {
try {
await this.pool.query('SELECT 1');

View File

@@ -1,8 +1,6 @@
import cron from 'node-cron';
import { config } from './config/config.js';
import dbManager from './db/databaseManager.js';
import dbInitializer from './db/initializer.js';
import partitionManager from './db/partitionManager.js';
import projectMetadata from './cache/projectMetadata.js';
import { createKafkaConsumers } from './kafka/consumer.js';
import { processKafkaMessage } from './processor/index.js';
@@ -17,52 +15,12 @@ import { logger } from './utils/logger.js';
import { BatchProcessor } from './db/batchProcessor.js';
const bootstrap = async () => {
// 0. Initialize Database (Create DB, Schema, Table, Partitions)
// Only execute initialization if enabled (default: true)
if (config.enableDatabaseInitialization) {
logger.info('Database initialization is enabled. Starting initialization...');
await dbInitializer.initialize();
} else {
logger.info('Database initialization is disabled. Skipping database initialization, schema creation, and partition setup.');
}
// 0.1 Initialize Project Metadata Cache
// 0. Initialize Project Metadata Cache
await projectMetadata.init();
// Metric Collector
const metricCollector = new MetricCollector();
// 1. Setup Partition Maintenance Cron Job (Every day at 00:00)
// Only setup partition maintenance cron if database initialization is enabled
if (config.enableDatabaseInitialization) {
cron.schedule('0 0 * * *', async () => {
logger.info('Running scheduled partition maintenance...');
try {
await partitionManager.ensurePartitions(30);
} catch (err) {
logger.error('Scheduled partition maintenance failed', err);
}
});
} else {
logger.info('Partition maintenance cron job is disabled (database initialization is disabled).');
}
// 1.1 Setup Metric Reporting Cron Job (Every minute)
// Moved after redisIntegration initialization
// DatabaseManager is now a singleton exported instance, but let's keep consistency if possible
// In databaseManager.js it exports `dbManager` instance by default.
// The original code was `const dbManager = new DatabaseManager(config.db);` which implies it might have been a class export.
// Let's check `databaseManager.js` content.
// Wait, I imported `dbManager` from `./db/databaseManager.js`.
// If `databaseManager.js` exports an instance as default, I should use that.
// If it exports a class, I should instantiate it.
// Let's assume the previous code `new DatabaseManager` was correct if it was a class.
// BUT I used `dbManager.pool` in `partitionManager.js` assuming it's an instance.
// I need to verify `databaseManager.js`.
const redisClient = await createRedisClient(config.redis);
const redisIntegration = new RedisIntegration(
redisClient,
@@ -80,7 +38,7 @@ const bootstrap = async () => {
});
logger.info('Room Status sync pipeline initialized');
// 1.1 Setup Metric Reporting Cron Job (Every minute)
// 1. Setup Metric Reporting Cron Job (Every minute)
cron.schedule('* * * * *', async () => {
const metrics = metricCollector.getAndReset();
const report = `[Minute Metrics] Pulled: ${metrics.kafka_pulled}, Parse Error: ${metrics.parse_error}, Inserted: ${metrics.db_inserted}, Failed: ${metrics.db_failed}`;