Compare commits
10 Commits
9165ce0bd0
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| 1ea0effb0b | |||
| 04099e49e2 | |||
| c5acfbf47b | |||
| 7fab70ec2b | |||
| f61a63d8c1 | |||
| 677db35cdf | |||
| a8e8913ea5 | |||
| 21bc56fe6c | |||
| 9a5e5d8c0d | |||
| 603055797b |
58
SQL_Script/README.md
Normal file
58
SQL_Script/README.md
Normal file
@@ -0,0 +1,58 @@
|
||||
# SQL_Script
|
||||
|
||||
用于**独立于业务服务**的数据库初始化与分区管理。
|
||||
|
||||
> 目标:主服务 `bls-rcu-action-backend` 只负责消费 Kafka 与写库,不再承担任何建库/建表/建分区职责。
|
||||
|
||||
## 文件说明
|
||||
|
||||
- `init_rcu_action.sql`
|
||||
- 初始化 `rcu_action.rcu_action_events` 主表与索引
|
||||
- `init_room_status.sql`
|
||||
- 初始化 `room_status.room_status_moment` 主表与索引
|
||||
- `partition_rcu_action.sql`
|
||||
- `rcu_action` 按天 RANGE 分区 SQL 模板
|
||||
- `partition_room_status.sql`
|
||||
- `room_status` 按 `hotel_id` LIST 分区 SQL 模板
|
||||
- `db_manager.js`
|
||||
- Node 调用入口(CLI + 可 import)
|
||||
|
||||
## 环境变量
|
||||
|
||||
与主服务统一:
|
||||
|
||||
- `DB_HOST` / `POSTGRES_HOST`
|
||||
- `DB_PORT` / `POSTGRES_PORT`
|
||||
- `DB_USER` / `POSTGRES_USER`
|
||||
- `DB_PASSWORD` / `POSTGRES_PASSWORD`
|
||||
- `DB_DATABASE` / `POSTGRES_DATABASE`
|
||||
- `DB_SSL=true|false`
|
||||
- `DB_ADMIN_DATABASE`(可选,默认 `postgres`)
|
||||
|
||||
## 命令行调用
|
||||
|
||||
在 `bls-rcu-action-backend` 目录执行:
|
||||
|
||||
- `npm run db:init:all`
|
||||
- 创建数据库(若不存在)+ 初始化两套主表
|
||||
- `npm run db:init:rcu-action`
|
||||
- `npm run db:init:room-status`
|
||||
- `npm run db:partition:rcu-action`
|
||||
- 默认预建未来 30 天分区
|
||||
- `npm run db:partition:room-status -- 1001`
|
||||
- 为 hotel_id=1001 建分区
|
||||
|
||||
## 其他程序直接 import
|
||||
|
||||
```js
|
||||
import {
|
||||
initAll,
|
||||
ensureDatabase,
|
||||
ensureRcuPartitions,
|
||||
ensureRoomStatusPartition
|
||||
} from '../SQL_Script/db_manager.js';
|
||||
|
||||
await initAll();
|
||||
await ensureRcuPartitions(45);
|
||||
await ensureRoomStatusPartition(1001);
|
||||
```
|
||||
159
SQL_Script/db_manager.js
Normal file
159
SQL_Script/db_manager.js
Normal file
@@ -0,0 +1,159 @@
|
||||
import fs from 'fs';
|
||||
import path from 'path';
|
||||
import pg from 'pg';
|
||||
import { fileURLToPath } from 'url';
|
||||
|
||||
const { Client } = pg;
|
||||
|
||||
// Resolve the directory that contains this script; the SQL templates are
// expected to live right next to it.
const __filename = fileURLToPath(import.meta.url);
const __dirname = path.dirname(__filename);
const scriptDir = __dirname;
|
||||
|
||||
// Interpret an env/CLI value as a number; fall back to `defaultValue`
// whenever the input is missing or not a finite number.
const parseNumber = (value, defaultValue) => {
  const candidate = Number(value);
  if (Number.isFinite(candidate)) {
    return candidate;
  }
  return defaultValue;
};
|
||||
|
||||
// Connection settings shared with the main service. `DB_*` variables take
// priority over the `POSTGRES_*` aliases; sensible localhost defaults apply.
const env = process.env;
const dbConfig = {
  host: env.DB_HOST || env.POSTGRES_HOST || 'localhost',
  port: parseNumber(env.DB_PORT || env.POSTGRES_PORT, 5432),
  user: env.DB_USER || env.POSTGRES_USER || 'postgres',
  password: env.DB_PASSWORD || env.POSTGRES_PASSWORD || '',
  database: env.DB_DATABASE || env.POSTGRES_DATABASE || 'bls_rcu_action',
  // Self-signed certs are tolerated when DB_SSL=true (matches main service).
  ssl: env.DB_SSL === 'true' ? { rejectUnauthorized: false } : undefined
};
|
||||
|
||||
// Open a short-lived client against the target database, hand it to the
// callback, and always close the connection afterwards — even on error.
const withClient = async (callback) => {
  const client = new Client(dbConfig);
  await client.connect();
  try {
    await callback(client);
  } finally {
    await client.end();
  }
};
|
||||
|
||||
// Read a SQL file located next to this script and execute it as a single
// multi-statement query on the given client.
const executeSqlFile = async (client, fileName) => {
  const filePath = path.join(scriptDir, fileName);
  // Use the async read: this runs inside an async CLI flow, so there is no
  // reason to block the event loop with readFileSync.
  const sql = await fs.promises.readFile(filePath, 'utf8');
  await client.query(sql);
};
|
||||
|
||||
/**
 * Create the target database if it does not exist yet.
 * Connects to the admin database (DB_ADMIN_DATABASE, default `postgres`)
 * because CREATE DATABASE cannot run against the database being created.
 */
export const ensureDatabase = async () => {
  const adminClient = new Client({
    ...dbConfig,
    database: process.env.DB_ADMIN_DATABASE || 'postgres'
  });
  await adminClient.connect();
  try {
    const targetDb = dbConfig.database;
    const check = await adminClient.query('SELECT 1 FROM pg_database WHERE datname = $1', [targetDb]);
    if (check.rowCount === 0) {
      // CREATE DATABASE does not accept bind parameters; quote the
      // identifier and escape embedded double quotes so an unusual
      // DB_DATABASE value cannot break out of the identifier.
      const quotedName = `"${targetDb.replaceAll('"', '""')}"`;
      await adminClient.query(`CREATE DATABASE ${quotedName}`);
      console.log(`[SQL_Script] created database: ${targetDb}`);
    } else {
      console.log(`[SQL_Script] database exists: ${targetDb}`);
    }
  } finally {
    await adminClient.end();
  }
};
|
||||
|
||||
// Format a Date as a YYYYMMDD partition-name suffix (local time).
const toPartitionSuffix = (date) => {
  const year = date.getFullYear();
  const month = date.getMonth() + 1;
  const day = date.getDate();
  const pad2 = (n) => String(n).padStart(2, '0');
  return `${year}${pad2(month)}${pad2(day)}`;
};
|
||||
|
||||
// Return the [start, end) millisecond-epoch bounds of the local calendar day
// that contains `date`.
const getDayRange = (date) => {
  const dayStart = new Date(date);
  dayStart.setHours(0, 0, 0, 0);
  const nextDayStart = new Date(dayStart);
  nextDayStart.setDate(nextDayStart.getDate() + 1);
  return {
    startMs: dayStart.getTime(),
    endMs: nextDayStart.getTime()
  };
};
|
||||
|
||||
/**
 * Pre-create daily RANGE partitions of rcu_action.rcu_action_events for the
 * next `daysAhead` days (today included), using the SQL template file.
 * The template is idempotent (CREATE TABLE IF NOT EXISTS).
 */
export const ensureRcuPartitions = async (daysAhead = 30) => {
  const template = fs.readFileSync(path.join(scriptDir, 'partition_rcu_action.sql'), 'utf8');
  await withClient(async (client) => {
    const today = new Date();
    for (let offset = 0; offset < daysAhead; offset += 1) {
      const day = new Date(today);
      day.setDate(today.getDate() + offset);
      const partitionName = `rcu_action.rcu_action_events_${toPartitionSuffix(day)}`;
      const { startMs, endMs } = getDayRange(day);
      const statement = template
        .replaceAll('{partition_name}', partitionName)
        .replaceAll('{start_ms}', String(startMs))
        .replaceAll('{end_ms}', String(endMs));
      await client.query(statement);
    }
  });
  console.log(`[SQL_Script] ensured rcu_action partitions for ${daysAhead} days`);
};
|
||||
|
||||
/**
 * Create the LIST partition of room_status.room_status_moment for one hotel.
 * @param {number|string} hotelId - hotel identifier; must coerce to an integer.
 * @throws {Error} when hotelId is missing or not an integer.
 */
export const ensureRoomStatusPartition = async (hotelId) => {
  // Normalize BEFORE substitution: the template interpolates {hotel_id}
  // directly into DDL, so raw strings like ' 1001', '1e3' or '0x3e9' (which
  // all pass a plain Number() check) must never reach the SQL text.
  const numericHotelId = Number(hotelId);
  if (!Number.isInteger(numericHotelId)) {
    throw new Error('hotelId is required and must be a number');
  }
  const tpl = fs.readFileSync(path.join(scriptDir, 'partition_room_status.sql'), 'utf8');
  const sql = tpl.replaceAll('{hotel_id}', String(numericHotelId));

  await withClient(async (client) => {
    await client.query(sql);
  });

  console.log(`[SQL_Script] ensured room_status partition for hotel_id=${numericHotelId}`);
};
|
||||
|
||||
/**
 * Full bootstrap: create the database if missing, then run both table
 * initialization scripts on a single connection.
 */
export const initAll = async () => {
  await ensureDatabase();
  await withClient(async (client) => {
    for (const sqlFile of ['init_rcu_action.sql', 'init_room_status.sql']) {
      await executeSqlFile(client, sqlFile);
    }
  });
  console.log('[SQL_Script] initialized schemas and tables');
};
|
||||
|
||||
// CLI dispatcher: maps the first positional argument to the matching action.
const run = async () => {
  const command = process.argv[2];

  if (!command) {
    throw new Error('missing command: init-all | init-rcu | init-room-status | partition-rcu [days] | partition-room-status <hotelId>');
  }

  const handlers = new Map([
    ['init-all', () => initAll()],
    ['init-rcu', async () => {
      await withClient((client) => executeSqlFile(client, 'init_rcu_action.sql'));
      console.log('[SQL_Script] initialized rcu_action schema/table');
    }],
    ['init-room-status', async () => {
      await withClient((client) => executeSqlFile(client, 'init_room_status.sql'));
      console.log('[SQL_Script] initialized room_status schema/table');
    }],
    // Optional third argument: number of days of partitions to pre-create.
    ['partition-rcu', () => ensureRcuPartitions(parseNumber(process.argv[3], 30))],
    // Required third argument: hotel id for the LIST partition.
    ['partition-room-status', () => ensureRoomStatusPartition(process.argv[3])]
  ]);

  const handler = handlers.get(command);
  if (!handler) {
    throw new Error(`unsupported command: ${command}`);
  }
  await handler();
};
|
||||
|
||||
// Entry point: report the failure and exit non-zero so npm scripts fail loudly.
run().catch((err) => {
  const reason = err?.message || err;
  console.error('[SQL_Script] failed:', reason);
  process.exit(1);
});
|
||||
47
SQL_Script/init_rcu_action.sql
Normal file
47
SQL_Script/init_rcu_action.sql
Normal file
@@ -0,0 +1,47 @@
|
||||
-- SQL_Script/init_rcu_action.sql
-- Initialize the RCU Action schema, the parent partitioned table and its
-- indexes. Deliberately contains no CREATE DATABASE: db_manager.js ensures
-- the database exists before this script runs.

CREATE SCHEMA IF NOT EXISTS rcu_action;

-- Parent table, RANGE-partitioned by event timestamp (ts_ms).
-- Daily child partitions are created by partition_rcu_action.sql.
CREATE TABLE IF NOT EXISTS rcu_action.rcu_action_events (
    guid            VARCHAR(32) NOT NULL,
    ts_ms           BIGINT      NOT NULL,
    write_ts_ms     BIGINT      NOT NULL,
    hotel_id        INTEGER     NOT NULL,
    room_id         VARCHAR(32) NOT NULL,
    device_id       VARCHAR(32) NOT NULL,
    direction       VARCHAR(10) NOT NULL,
    cmd_word        VARCHAR(10) NOT NULL,
    frame_id        INTEGER     NOT NULL,
    udp_raw         TEXT        NOT NULL,
    action_type     VARCHAR(20) NOT NULL,
    sys_lock_status SMALLINT,
    report_count    SMALLINT,
    dev_type        SMALLINT,
    dev_addr        SMALLINT,
    dev_loop        INTEGER,
    dev_data        INTEGER,
    fault_count     SMALLINT,
    error_type      SMALLINT,
    error_data      SMALLINT,
    type_l          SMALLINT,
    type_h          SMALLINT,
    details         JSONB,
    extra           JSONB,
    loop_name       VARCHAR(255),
    PRIMARY KEY (ts_ms, guid)
) PARTITION BY RANGE (ts_ms);

-- Idempotent migrations for pre-existing installations where the table was
-- created before these columns were added.
ALTER TABLE rcu_action.rcu_action_events
    ADD COLUMN IF NOT EXISTS device_id VARCHAR(32) NOT NULL DEFAULT '';

ALTER TABLE rcu_action.rcu_action_events
    ADD COLUMN IF NOT EXISTS loop_name VARCHAR(255);

-- Indexes on the parent table; PostgreSQL propagates them to partitions.
CREATE INDEX IF NOT EXISTS idx_rcu_action_hotel_id ON rcu_action.rcu_action_events (hotel_id);
CREATE INDEX IF NOT EXISTS idx_rcu_action_room_id ON rcu_action.rcu_action_events (room_id);
CREATE INDEX IF NOT EXISTS idx_rcu_action_device_id ON rcu_action.rcu_action_events (device_id);
CREATE INDEX IF NOT EXISTS idx_rcu_action_direction ON rcu_action.rcu_action_events (direction);
CREATE INDEX IF NOT EXISTS idx_rcu_action_cmd_word ON rcu_action.rcu_action_events (cmd_word);
CREATE INDEX IF NOT EXISTS idx_rcu_action_action_type ON rcu_action.rcu_action_events (action_type);
CREATE INDEX IF NOT EXISTS idx_rcu_action_query_main ON rcu_action.rcu_action_events (hotel_id, room_id, ts_ms DESC);
|
||||
66
SQL_Script/init_room_status.sql
Normal file
66
SQL_Script/init_room_status.sql
Normal file
@@ -0,0 +1,66 @@
|
||||
-- SQL_Script/init_room_status.sql
-- Initialize the Room Status snapshot schema, the parent partitioned table
-- and its indexes. Deliberately contains no CREATE DATABASE: db_manager.js
-- ensures the database exists before this script runs.

CREATE SCHEMA IF NOT EXISTS room_status;

-- Parent table, LIST-partitioned by hotel_id.
-- Per-hotel child partitions are created by partition_room_status.sql.
CREATE TABLE IF NOT EXISTS room_status.room_status_moment (
    guid                 UUID NOT NULL,
    ts_ms                INT8 NOT NULL DEFAULT (EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000)::BIGINT,
    hotel_id             INT2 NOT NULL,
    room_id              TEXT NOT NULL,
    device_id            TEXT NOT NULL,

    -- RCU device / firmware state
    sys_lock_status      INT2,
    online_status        INT2,
    launcher_version     TEXT,
    app_version          TEXT,
    config_version       TEXT,
    register_ts_ms       INT8,
    upgrade_ts_ms        INT8,
    config_ts_ms         INT8,
    ip                   TEXT,

    -- Room-level state
    pms_status           INT2,
    power_state          INT2,
    cardless_state       INT2,
    service_mask         INT8,
    insert_card          INT2,
    bright_g             INT2,
    agreement_ver        TEXT,

    -- Air-conditioning readings (parallel arrays, one entry per unit)
    air_address          TEXT[],
    air_state            INT2[],
    air_model            INT2[],
    air_speed            INT2[],
    air_set_temp         INT2[],
    air_now_temp         INT2[],
    air_solenoid_valve   INT2[],

    -- Electricity-meter readings (parallel arrays, one entry per meter)
    elec_address         TEXT[],
    elec_voltage         DOUBLE PRECISION[],
    elec_ampere          DOUBLE PRECISION[],
    elec_power           DOUBLE PRECISION[],
    elec_phase           DOUBLE PRECISION[],
    elec_energy          DOUBLE PRECISION[],
    elec_sum_energy      DOUBLE PRECISION[],

    -- Aggregates and extensible payloads
    carbon_state         INT2,
    dev_loops            JSONB,
    energy_carbon_sum    DOUBLE PRECISION,
    energy_nocard_sum    DOUBLE PRECISION,
    external_device      JSONB DEFAULT '{}',
    faulty_device_count  JSONB DEFAULT '{}',

    PRIMARY KEY (hotel_id, room_id, device_id, guid)
) PARTITION BY LIST (hotel_id);

-- Indexes on the parent table; PostgreSQL propagates them to partitions.
CREATE INDEX IF NOT EXISTS idx_room_status_moment_hotel_room ON room_status.room_status_moment (hotel_id, room_id);
CREATE INDEX IF NOT EXISTS idx_room_status_moment_device_id ON room_status.room_status_moment (device_id);
CREATE INDEX IF NOT EXISTS idx_room_status_moment_sys_lock ON room_status.room_status_moment (sys_lock_status);
CREATE INDEX IF NOT EXISTS idx_room_status_moment_online ON room_status.room_status_moment (online_status);
CREATE INDEX IF NOT EXISTS idx_room_status_moment_pms ON room_status.room_status_moment (pms_status);
CREATE INDEX IF NOT EXISTS idx_room_status_moment_power ON room_status.room_status_moment (power_state);
CREATE INDEX IF NOT EXISTS idx_room_status_moment_cardless ON room_status.room_status_moment (cardless_state);
CREATE INDEX IF NOT EXISTS idx_room_status_moment_insert_card ON room_status.room_status_moment (insert_card);
CREATE INDEX IF NOT EXISTS idx_room_status_moment_carbon ON room_status.room_status_moment (carbon_state);
-- One snapshot row per physical device; the unique key contains the
-- partition key (hotel_id), as required for unique indexes on partitioned tables.
CREATE UNIQUE INDEX IF NOT EXISTS idx_room_status_unique_device ON room_status.room_status_moment (hotel_id, room_id, device_id);
|
||||
14
SQL_Script/partition_rcu_action.sql
Normal file
14
SQL_Script/partition_rcu_action.sql
Normal file
@@ -0,0 +1,14 @@
|
||||
-- SQL_Script/partition_rcu_action.sql
-- Template for creating one daily RANGE partition of rcu_action_events.
-- Callers substitute the placeholders before executing:
--   {partition_name}  fully-qualified partition table name
--   {start_ms}        inclusive lower bound (epoch milliseconds)
--   {end_ms}          exclusive upper bound (epoch milliseconds)
--
-- Example after substitution:
--   CREATE TABLE IF NOT EXISTS rcu_action.rcu_action_events_20260304
--     PARTITION OF rcu_action.rcu_action_events
--     FOR VALUES FROM (1741046400000) TO (1741132800000)
--     TABLESPACE ts_hot;
--
-- NOTE: the ts_hot tablespace must already exist on the server.

CREATE TABLE IF NOT EXISTS {partition_name}
  PARTITION OF rcu_action.rcu_action_events
  FOR VALUES FROM ({start_ms}) TO ({end_ms})
  TABLESPACE ts_hot;
|
||||
12
SQL_Script/partition_room_status.sql
Normal file
12
SQL_Script/partition_room_status.sql
Normal file
@@ -0,0 +1,12 @@
|
||||
-- SQL_Script/partition_room_status.sql
-- Template for creating one per-hotel LIST partition of room_status_moment.
-- Callers substitute the placeholder before executing:
--   {hotel_id}  integer hotel identifier
--
-- Example after substitution:
--   CREATE TABLE IF NOT EXISTS room_status.room_status_moment_h1001
--     PARTITION OF room_status.room_status_moment
--     FOR VALUES IN (1001);

CREATE TABLE IF NOT EXISTS room_status.room_status_moment_h{hotel_id}
  PARTITION OF room_status.room_status_moment
  FOR VALUES IN ({hotel_id});
|
||||
File diff suppressed because one or more lines are too long
@@ -40,4 +40,15 @@ REDIS_API_BASE_URL=http://localhost:3000
|
||||
# ROOM_STATUS_DB_SCHEMA=room_status
|
||||
# ROOM_STATUS_DB_TABLE=room_status_moment
|
||||
|
||||
# G5 Room Status DB Configuration (optional)
|
||||
# ENABLE_G5_SYNC=true
|
||||
# POSTGRES_HOST_G5=10.8.8.80
|
||||
# POSTGRES_PORT_G5=5434
|
||||
# POSTGRES_DATABASE_G5=log_platform
|
||||
# POSTGRES_USER_G5=log_admin
|
||||
# POSTGRES_PASSWORD_G5=your-password
|
||||
# ROOM_STATUS_DB_SCHEMA_G5=room_status
|
||||
# ROOM_STATUS_DB_TABLE_G5=room_status_moment_g5
|
||||
# ROOM_STATUS_DB_MAX_CONNECTIONS_G5=5
|
||||
|
||||
ENABLE_LOOP_NAME_AUTO_GENERATION=true
|
||||
|
||||
@@ -7,7 +7,12 @@
|
||||
"dev": "node src/index.js",
|
||||
"build": "vite build --ssr src/index.js --outDir dist",
|
||||
"test": "vitest run",
|
||||
"start": "node dist/index.js"
|
||||
"start": "node dist/index.js",
|
||||
"db:init:all": "node ../SQL_Script/db_manager.js init-all",
|
||||
"db:init:rcu-action": "node ../SQL_Script/db_manager.js init-rcu",
|
||||
"db:init:room-status": "node ../SQL_Script/db_manager.js init-room-status",
|
||||
"db:partition:rcu-action": "node ../SQL_Script/db_manager.js partition-rcu",
|
||||
"db:partition:room-status": "node ../SQL_Script/db_manager.js partition-room-status"
|
||||
},
|
||||
"dependencies": {
|
||||
"dotenv": "^16.4.5",
|
||||
|
||||
@@ -1,80 +0,0 @@
|
||||
-- Database Initialization Script for BLS RCU Action Server
|
||||
-- 描述:创建 rcu_action 模式及 rcu_action_events 分区表,用于存储 RCU 通讯日志流水
|
||||
|
||||
CREATE SCHEMA IF NOT EXISTS rcu_action;
|
||||
|
||||
CREATE TABLE IF NOT EXISTS rcu_action.rcu_action_events (
|
||||
guid VARCHAR(32) NOT NULL,
|
||||
ts_ms BIGINT NOT NULL,
|
||||
write_ts_ms BIGINT NOT NULL,
|
||||
hotel_id INTEGER NOT NULL,
|
||||
room_id VARCHAR(32) NOT NULL,
|
||||
device_id VARCHAR(32) NOT NULL,
|
||||
direction VARCHAR(10) NOT NULL,
|
||||
cmd_word VARCHAR(10) NOT NULL,
|
||||
frame_id INTEGER NOT NULL,
|
||||
udp_raw TEXT NOT NULL,
|
||||
action_type VARCHAR(20) NOT NULL,
|
||||
sys_lock_status SMALLINT,
|
||||
report_count SMALLINT,
|
||||
dev_type SMALLINT,
|
||||
dev_addr SMALLINT,
|
||||
dev_loop INTEGER,
|
||||
dev_data INTEGER,
|
||||
fault_count SMALLINT,
|
||||
error_type SMALLINT,
|
||||
error_data SMALLINT,
|
||||
type_l SMALLINT,
|
||||
type_h SMALLINT,
|
||||
details JSONB,
|
||||
extra JSONB,
|
||||
loop_name VARCHAR(255),
|
||||
PRIMARY KEY (ts_ms, guid)
|
||||
) PARTITION BY RANGE (ts_ms);
|
||||
|
||||
ALTER TABLE rcu_action.rcu_action_events
|
||||
ADD COLUMN IF NOT EXISTS device_id VARCHAR(32) NOT NULL DEFAULT '';
|
||||
|
||||
ALTER TABLE rcu_action.rcu_action_events
|
||||
ADD COLUMN IF NOT EXISTS loop_name VARCHAR(255);
|
||||
|
||||
-- Indexes for performance (ONLY on parent partitioned table)
|
||||
-- PostgreSQL will create/attach corresponding child-partition indexes automatically.
|
||||
-- Do not create duplicated indexes on partition child tables.
|
||||
CREATE INDEX IF NOT EXISTS idx_rcu_action_hotel_id ON rcu_action.rcu_action_events (hotel_id);
|
||||
CREATE INDEX IF NOT EXISTS idx_rcu_action_room_id ON rcu_action.rcu_action_events (room_id);
|
||||
CREATE INDEX IF NOT EXISTS idx_rcu_action_device_id ON rcu_action.rcu_action_events (device_id);
|
||||
CREATE INDEX IF NOT EXISTS idx_rcu_action_direction ON rcu_action.rcu_action_events (direction);
|
||||
CREATE INDEX IF NOT EXISTS idx_rcu_action_cmd_word ON rcu_action.rcu_action_events (cmd_word);
|
||||
CREATE INDEX IF NOT EXISTS idx_rcu_action_action_type ON rcu_action.rcu_action_events (action_type);
|
||||
|
||||
-- Composite Index for typical query pattern (Hotel + Room + Time)
|
||||
CREATE INDEX IF NOT EXISTS idx_rcu_action_query_main ON rcu_action.rcu_action_events (hotel_id, room_id, ts_ms DESC);
|
||||
|
||||
-- Column Comments
|
||||
COMMENT ON TABLE rcu_action.rcu_action_events IS 'RCU 通讯日志流水表 - 存储从 Kafka 消费的 RCU 设备上报/下发/ACK 事件';
|
||||
COMMENT ON COLUMN rcu_action.rcu_action_events.guid IS '主键,32位无横线 UUID';
|
||||
COMMENT ON COLUMN rcu_action.rcu_action_events.ts_ms IS '日志产生时间戳(毫秒),同时用作分区键';
|
||||
COMMENT ON COLUMN rcu_action.rcu_action_events.write_ts_ms IS '入库时间戳(毫秒),由后端服务写入时生成';
|
||||
COMMENT ON COLUMN rcu_action.rcu_action_events.hotel_id IS '酒店 ID';
|
||||
COMMENT ON COLUMN rcu_action.rcu_action_events.room_id IS '房间 ID';
|
||||
COMMENT ON COLUMN rcu_action.rcu_action_events.device_id IS 'RCU 设备 ID(主板编号)';
|
||||
COMMENT ON COLUMN rcu_action.rcu_action_events.direction IS '数据方向:上报 / 下发';
|
||||
COMMENT ON COLUMN rcu_action.rcu_action_events.cmd_word IS '命令字,如 0x36(状态上报)、0x0F(控制下发/ACK)';
|
||||
COMMENT ON COLUMN rcu_action.rcu_action_events.frame_id IS '通讯帧号,用于串联同一次通讯的命令与状态';
|
||||
COMMENT ON COLUMN rcu_action.rcu_action_events.udp_raw IS 'UDP 消息原文(base64 编码)';
|
||||
COMMENT ON COLUMN rcu_action.rcu_action_events.action_type IS '记录行为类型:用户操作 / 设备回路状态 / 下发控制 / 0FACK / 无效';
|
||||
COMMENT ON COLUMN rcu_action.rcu_action_events.sys_lock_status IS '系统锁状态:0=未锁定, 1=锁定(仅 0x36 上报)';
|
||||
COMMENT ON COLUMN rcu_action.rcu_action_events.report_count IS '本次上报设备数量(对应 device_list 长度)';
|
||||
COMMENT ON COLUMN rcu_action.rcu_action_events.dev_type IS '设备类型编号,拆分自 device_list/fault_list/control_list';
|
||||
COMMENT ON COLUMN rcu_action.rcu_action_events.dev_addr IS '设备地址编号';
|
||||
COMMENT ON COLUMN rcu_action.rcu_action_events.dev_loop IS '设备回路编号';
|
||||
COMMENT ON COLUMN rcu_action.rcu_action_events.dev_data IS '设备状态数值(仅 0x36 状态上报)';
|
||||
COMMENT ON COLUMN rcu_action.rcu_action_events.fault_count IS '本次故障设备数量(对应 fault_list 长度)';
|
||||
COMMENT ON COLUMN rcu_action.rcu_action_events.error_type IS '故障类型:0x01=在线/离线, 0x02=电量, 0x03=电流 等';
|
||||
COMMENT ON COLUMN rcu_action.rcu_action_events.error_data IS '故障内容数据(含义取决于 error_type)';
|
||||
COMMENT ON COLUMN rcu_action.rcu_action_events.type_l IS '执行方式(仅 0x0F 下发控制)';
|
||||
COMMENT ON COLUMN rcu_action.rcu_action_events.type_h IS '执行内容(仅 0x0F 下发控制)';
|
||||
COMMENT ON COLUMN rcu_action.rcu_action_events.details IS '业务详情 JSONB:存储完整的 device_list / fault_list / control_list';
|
||||
COMMENT ON COLUMN rcu_action.rcu_action_events.extra IS '扩展信息 JSONB:存储上游传入的附加字段';
|
||||
COMMENT ON COLUMN rcu_action.rcu_action_events.loop_name IS '回路名称:通过 device_id → room_type_id → loop_address 查询获得';
|
||||
@@ -35,6 +35,7 @@ export const config = {
|
||||
} : undefined
|
||||
},
|
||||
db: {
|
||||
enabled: process.env.ENABLE_G4_SYNC !== 'false',
|
||||
host: process.env.DB_HOST || process.env.POSTGRES_HOST || 'localhost',
|
||||
port: parseNumber(process.env.DB_PORT || process.env.POSTGRES_PORT, 5432),
|
||||
user: process.env.DB_USER || process.env.POSTGRES_USER || 'postgres',
|
||||
@@ -45,6 +46,18 @@ export const config = {
|
||||
schema: process.env.DB_SCHEMA || 'rcu_action',
|
||||
table: process.env.DB_TABLE || 'rcu_action_events'
|
||||
},
|
||||
dbG5: {
|
||||
enabled: process.env.ENABLE_G5_SYNC !== 'false',
|
||||
host: process.env.POSTGRES_HOST_G5,
|
||||
port: parseNumber(process.env.POSTGRES_PORT_G5, 5434),
|
||||
user: process.env.POSTGRES_USER_G5,
|
||||
password: process.env.POSTGRES_PASSWORD_G5,
|
||||
database: process.env.POSTGRES_DATABASE_G5,
|
||||
max: parseNumber(process.env.POSTGRES_MAX_CONNECTIONS, 10),
|
||||
ssl: process.env.DB_SSL_G5 === 'true' ? { rejectUnauthorized: false } : undefined,
|
||||
schema: process.env.DB_SCHEMA || 'rcu_action',
|
||||
table: 'rcu_action_events_g5'
|
||||
},
|
||||
redis: {
|
||||
host: process.env.REDIS_HOST || 'localhost',
|
||||
port: parseNumber(process.env.REDIS_PORT, 6379),
|
||||
@@ -54,6 +67,7 @@ export const config = {
|
||||
apiBaseUrl: process.env.REDIS_API_BASE_URL || `http://localhost:${parseNumber(process.env.PORT, 3000)}`
|
||||
},
|
||||
roomStatusDb: {
|
||||
enabled: process.env.ENABLE_G4_SYNC !== 'false',
|
||||
host: process.env.ROOM_STATUS_DB_HOST || process.env.DB_HOST || process.env.POSTGRES_HOST || 'localhost',
|
||||
port: parseNumber(process.env.ROOM_STATUS_DB_PORT || process.env.DB_PORT || process.env.POSTGRES_PORT, 5432),
|
||||
user: process.env.ROOM_STATUS_DB_USER || process.env.DB_USER || process.env.POSTGRES_USER || 'postgres',
|
||||
@@ -64,5 +78,21 @@ export const config = {
|
||||
schema: process.env.ROOM_STATUS_DB_SCHEMA || 'room_status',
|
||||
table: process.env.ROOM_STATUS_DB_TABLE || 'room_status_moment'
|
||||
},
|
||||
enableLoopNameAutoGeneration: process.env.ENABLE_LOOP_NAME_AUTO_GENERATION === 'true'
|
||||
roomStatusDbG5: {
|
||||
enabled: process.env.ENABLE_G5_SYNC !== 'false',
|
||||
host: process.env.POSTGRES_HOST_G5,
|
||||
port: parseNumber(process.env.POSTGRES_PORT_G5, 5434),
|
||||
user: process.env.POSTGRES_USER_G5,
|
||||
password: process.env.POSTGRES_PASSWORD_G5,
|
||||
database: process.env.POSTGRES_DATABASE_G5,
|
||||
max: parseNumber(process.env.ROOM_STATUS_DB_MAX_CONNECTIONS_G5 || process.env.ROOM_STATUS_DB_MAX_CONNECTIONS, 5),
|
||||
ssl: process.env.DB_SSL_G5 === 'true' ? { rejectUnauthorized: false } : undefined,
|
||||
schema: process.env.ROOM_STATUS_DB_SCHEMA_G5 || 'room_status',
|
||||
table: process.env.ROOM_STATUS_DB_TABLE_G5 || 'room_status_moment_g5'
|
||||
},
|
||||
enableLoopNameAutoGeneration: process.env.ENABLE_LOOP_NAME_AUTO_GENERATION === 'true',
|
||||
writeToG4Action: process.env.WRITE_TO_G4_ACTION !== 'false',
|
||||
writeToG4Status: process.env.WRITE_TO_G4_STATUS !== 'false',
|
||||
writeToG5Action: process.env.WRITE_TO_G5_ACTION !== 'false',
|
||||
writeToG5Status: process.env.WRITE_TO_G5_STATUS !== 'false'
|
||||
};
|
||||
|
||||
@@ -1,9 +1,14 @@
|
||||
import { logger } from '../utils/logger.js';
|
||||
|
||||
export class BatchProcessor {
|
||||
constructor(dbManager, config, options = {}) {
|
||||
this.dbManager = dbManager;
|
||||
this.config = config;
|
||||
this.omitGuid = options.omitGuid || false;
|
||||
this.dbConfig = options.dbConfig || config.db;
|
||||
this.batchSize = options.batchSize || 500;
|
||||
this.flushInterval = options.flushInterval || 1000;
|
||||
this.targetName = options.targetName || 'action_db';
|
||||
this.buffer = [];
|
||||
this.timer = null;
|
||||
}
|
||||
@@ -39,12 +44,18 @@ export class BatchProcessor {
|
||||
}
|
||||
|
||||
try {
|
||||
await this.dbManager.insertRows({
|
||||
schema: this.config.db.schema,
|
||||
table: this.config.db.table,
|
||||
rows: allRows
|
||||
await this.dbManager.insertRows({
|
||||
schema: this.dbConfig.schema,
|
||||
table: this.dbConfig.table,
|
||||
rows: allRows,
|
||||
omitGuid: this.omitGuid
|
||||
});
|
||||
|
||||
|
||||
logger.info('Action batch flushed successfully', {
|
||||
target: this.targetName,
|
||||
rows: allRows.length
|
||||
});
|
||||
|
||||
// Resolve each item with its own row count
|
||||
currentBatch.forEach(item => item.resolve(item.rows.length));
|
||||
} catch (error) {
|
||||
@@ -61,7 +72,7 @@ export class BatchProcessor {
|
||||
cmd_word: sample.cmd_word
|
||||
} : null
|
||||
};
|
||||
|
||||
|
||||
// Reject all items in the batch
|
||||
currentBatch.forEach(item => item.reject(error));
|
||||
}
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
import pg from 'pg';
|
||||
import { config } from '../config/config.js';
|
||||
import { logger } from '../utils/logger.js';
|
||||
|
||||
const { Pool } = pg;
|
||||
|
||||
@@ -45,31 +44,22 @@ export class DatabaseManager {
|
||||
});
|
||||
}
|
||||
|
||||
async insertRows({ schema, table, rows }) {
|
||||
async insertRows({ schema, table, rows, omitGuid = false }) {
|
||||
if (!rows || rows.length === 0) {
|
||||
return;
|
||||
}
|
||||
const currentColumns = omitGuid ? columns.filter(c => c !== 'guid') : columns;
|
||||
const values = [];
|
||||
const placeholders = rows.map((row, rowIndex) => {
|
||||
const offset = rowIndex * columns.length;
|
||||
columns.forEach((column) => {
|
||||
const offset = rowIndex * currentColumns.length;
|
||||
currentColumns.forEach((column) => {
|
||||
values.push(row[column] ?? null);
|
||||
});
|
||||
const params = columns.map((_, columnIndex) => `$${offset + columnIndex + 1}`);
|
||||
const params = currentColumns.map((_, columnIndex) => `$${offset + columnIndex + 1}`);
|
||||
return `(${params.join(', ')})`;
|
||||
});
|
||||
const statement = `INSERT INTO ${schema}.${table} (${columns.join(', ')}) VALUES ${placeholders.join(', ')}`;
|
||||
try {
|
||||
await this.pool.query(statement, values);
|
||||
} catch (error) {
|
||||
logger.error('Database insert failed', {
|
||||
error: error?.message,
|
||||
schema,
|
||||
table,
|
||||
rowsLength: rows.length
|
||||
});
|
||||
throw error;
|
||||
}
|
||||
const statement = `INSERT INTO ${schema}.${table} (${currentColumns.join(', ')}) VALUES ${placeholders.join(', ')}`;
|
||||
await this.pool.query(statement, values);
|
||||
}
|
||||
|
||||
async testConnection() {
|
||||
|
||||
@@ -1,110 +0,0 @@
|
||||
import pg from 'pg';
|
||||
import fs from 'fs';
|
||||
import path from 'path';
|
||||
import { fileURLToPath } from 'url';
|
||||
import { logger } from '../utils/logger.js';
|
||||
import partitionManager from './partitionManager.js';
|
||||
import dbManager from './databaseManager.js';
|
||||
import { config } from '../config/config.js';
|
||||
|
||||
const __filename = fileURLToPath(import.meta.url);
|
||||
const __dirname = path.dirname(__filename);
|
||||
|
||||
class DatabaseInitializer {
|
||||
async initialize() {
|
||||
logger.info('Starting database initialization check...');
|
||||
|
||||
// 1. Check if database exists, create if not
|
||||
await this.ensureDatabaseExists();
|
||||
|
||||
// 2. Initialize Schema and Parent Table (if not exists)
|
||||
// Note: We need to use dbManager because it connects to the target database
|
||||
await this.ensureSchemaAndTable();
|
||||
|
||||
// 3. Ensure Partitions for the next month
|
||||
await partitionManager.ensurePartitions(30);
|
||||
|
||||
logger.info('Database initialization completed successfully.');
|
||||
}
|
||||
|
||||
async ensureDatabaseExists() {
|
||||
const { host, port, user, password, database, ssl } = config.db;
|
||||
|
||||
// Connect to 'postgres' database to check/create target database
|
||||
const client = new pg.Client({
|
||||
host,
|
||||
port,
|
||||
user,
|
||||
password,
|
||||
database: 'postgres',
|
||||
ssl: ssl ? { rejectUnauthorized: false } : false
|
||||
});
|
||||
|
||||
const maxRetries = 5;
|
||||
let retryCount = 0;
|
||||
|
||||
while (retryCount < maxRetries) {
|
||||
try {
|
||||
await client.connect();
|
||||
break;
|
||||
} catch (err) {
|
||||
if (err.code === 'EADDRINUSE') {
|
||||
retryCount++;
|
||||
logger.warn(`Port conflict (EADDRINUSE) connecting to database, retrying (${retryCount}/${maxRetries})...`);
|
||||
await new Promise(resolve => setTimeout(resolve, 1000));
|
||||
} else {
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
const checkRes = await client.query(
|
||||
`SELECT 1 FROM pg_database WHERE datname = $1`,
|
||||
[database]
|
||||
);
|
||||
|
||||
if (checkRes.rowCount === 0) {
|
||||
logger.info(`Database '${database}' does not exist. Creating...`);
|
||||
// CREATE DATABASE cannot run inside a transaction block
|
||||
await client.query(`CREATE DATABASE "${database}"`);
|
||||
logger.info(`Database '${database}' created.`);
|
||||
} else {
|
||||
logger.info(`Database '${database}' already exists.`);
|
||||
}
|
||||
} catch (err) {
|
||||
logger.error('Error ensuring database exists:', err);
|
||||
throw err;
|
||||
} finally {
|
||||
await client.end();
|
||||
}
|
||||
}
|
||||
|
||||
async ensureSchemaAndTable() {
|
||||
// dbManager connects to the target database
|
||||
const client = await dbManager.pool.connect();
|
||||
try {
|
||||
const sqlPathCandidates = [
|
||||
path.resolve(process.cwd(), 'scripts/init_db.sql'),
|
||||
path.resolve(__dirname, '../scripts/init_db.sql'),
|
||||
path.resolve(__dirname, '../../scripts/init_db.sql')
|
||||
];
|
||||
const sqlPath = sqlPathCandidates.find((candidate) => fs.existsSync(candidate));
|
||||
if (!sqlPath) {
|
||||
throw new Error(`init_db.sql not found. Candidates: ${sqlPathCandidates.join(' | ')}`);
|
||||
}
|
||||
const sql = fs.readFileSync(sqlPath, 'utf8');
|
||||
|
||||
logger.info('Executing init_db.sql...');
|
||||
await client.query(sql);
|
||||
logger.info('Schema and parent table initialized.');
|
||||
} catch (err) {
|
||||
logger.error('Error initializing schema and table:', err);
|
||||
throw err;
|
||||
} finally {
|
||||
client.release();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export default new DatabaseInitializer();
|
||||
@@ -1,167 +0,0 @@
|
||||
import { logger } from '../utils/logger.js';
|
||||
import dbManager from './databaseManager.js';
|
||||
|
||||
const PARENT_TABLE = 'rcu_action.rcu_action_events';
|
||||
const PARTITION_TABLESPACE = 'ts_hot';
|
||||
const PARTITION_SCHEMA = 'rcu_action';
|
||||
const PARTITION_PREFIX = 'rcu_action_events_';
|
||||
const PARENT_INDEX_STATEMENTS = [
|
||||
'CREATE INDEX IF NOT EXISTS idx_rcu_action_hotel_id ON rcu_action.rcu_action_events (hotel_id);',
|
||||
'CREATE INDEX IF NOT EXISTS idx_rcu_action_room_id ON rcu_action.rcu_action_events (room_id);',
|
||||
'CREATE INDEX IF NOT EXISTS idx_rcu_action_device_id ON rcu_action.rcu_action_events (device_id);',
|
||||
'CREATE INDEX IF NOT EXISTS idx_rcu_action_direction ON rcu_action.rcu_action_events (direction);',
|
||||
'CREATE INDEX IF NOT EXISTS idx_rcu_action_cmd_word ON rcu_action.rcu_action_events (cmd_word);',
|
||||
'CREATE INDEX IF NOT EXISTS idx_rcu_action_action_type ON rcu_action.rcu_action_events (action_type);',
|
||||
'CREATE INDEX IF NOT EXISTS idx_rcu_action_query_main ON rcu_action.rcu_action_events (hotel_id, room_id, ts_ms DESC);'
|
||||
];
|
||||
|
||||
class PartitionManager {
|
||||
toSqlTextLiteral(value) {
|
||||
return `'${String(value).replace(/'/g, "''")}'`;
|
||||
}
|
||||
|
||||
buildForceTablespaceSql({ schema, partition, tablespace }) {
|
||||
return `
|
||||
DO $$
|
||||
DECLARE
|
||||
v_schema text := ${this.toSqlTextLiteral(schema)};
|
||||
v_partition text := ${this.toSqlTextLiteral(partition)};
|
||||
v_hot text := ${this.toSqlTextLiteral(tablespace)};
|
||||
v_part_oid oid;
|
||||
v_toast_oid oid;
|
||||
r record;
|
||||
BEGIN
|
||||
SELECT c.oid INTO v_part_oid
|
||||
FROM pg_class c JOIN pg_namespace n ON n.oid = c.relnamespace
|
||||
WHERE n.nspname = v_schema AND c.relname = v_partition AND c.relkind = 'r';
|
||||
|
||||
IF v_part_oid IS NULL THEN
|
||||
RAISE EXCEPTION 'partition %.% not found', v_schema, v_partition;
|
||||
END IF;
|
||||
|
||||
-- 1) 分区表对象 -> hot
|
||||
EXECUTE format('ALTER TABLE %I.%I SET TABLESPACE %I', v_schema, v_partition, v_hot);
|
||||
|
||||
-- 2) 分区全部索引 -> hot
|
||||
FOR r IN
|
||||
SELECT idxn.nspname AS index_schema, i.relname AS index_name
|
||||
FROM pg_index x
|
||||
JOIN pg_class t ON t.oid = x.indrelid
|
||||
JOIN pg_namespace nt ON nt.oid = t.relnamespace
|
||||
JOIN pg_class i ON i.oid = x.indexrelid
|
||||
JOIN pg_namespace idxn ON idxn.oid = i.relnamespace
|
||||
LEFT JOIN pg_tablespace ts ON ts.oid = i.reltablespace
|
||||
WHERE nt.nspname = v_schema
|
||||
AND t.relname = v_partition
|
||||
AND COALESCE(ts.spcname, 'pg_default') <> v_hot
|
||||
LOOP
|
||||
EXECUTE format('ALTER INDEX %I.%I SET TABLESPACE %I', r.index_schema, r.index_name, v_hot);
|
||||
END LOOP;
|
||||
|
||||
-- 3) TOAST 表 + TOAST 全部索引 -> hot(若存在)
|
||||
SELECT reltoastrelid INTO v_toast_oid FROM pg_class WHERE oid = v_part_oid;
|
||||
IF v_toast_oid IS NOT NULL AND v_toast_oid <> 0 THEN
|
||||
EXECUTE format('ALTER TABLE %s SET TABLESPACE %I', v_toast_oid::regclass, v_hot);
|
||||
|
||||
FOR r IN
|
||||
SELECT idxn.nspname AS index_schema, i.relname AS index_name
|
||||
FROM pg_index x
|
||||
JOIN pg_class i ON i.oid = x.indexrelid
|
||||
JOIN pg_namespace idxn ON idxn.oid = i.relnamespace
|
||||
LEFT JOIN pg_tablespace ts ON ts.oid = i.reltablespace
|
||||
WHERE x.indrelid = v_toast_oid
|
||||
AND COALESCE(ts.spcname, 'pg_default') <> v_hot
|
||||
LOOP
|
||||
EXECUTE format('ALTER INDEX %I.%I SET TABLESPACE %I', r.index_schema, r.index_name, v_hot);
|
||||
END LOOP;
|
||||
END IF;
|
||||
|
||||
-- 4) 统计信息
|
||||
EXECUTE format('ANALYZE %I.%I', v_schema, v_partition);
|
||||
END $$;
|
||||
`;
|
||||
}
|
||||
|
||||
async ensureParentIndexes(client) {
|
||||
for (const sql of PARENT_INDEX_STATEMENTS) {
|
||||
await client.query(sql);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Calculate the start and end timestamps (milliseconds) for a given date.
|
||||
* @param {Date} date - The date to calculate for.
|
||||
* @returns {Object} { startMs, endMs, partitionSuffix }
|
||||
*/
|
||||
getPartitionInfo(date) {
|
||||
const yyyy = date.getFullYear();
|
||||
const mm = String(date.getMonth() + 1).padStart(2, '0');
|
||||
const dd = String(date.getDate()).padStart(2, '0');
|
||||
const partitionSuffix = `${yyyy}${mm}${dd}`;
|
||||
|
||||
const start = new Date(date);
|
||||
start.setHours(0, 0, 0, 0);
|
||||
const startMs = start.getTime();
|
||||
|
||||
const end = new Date(date);
|
||||
end.setDate(end.getDate() + 1);
|
||||
end.setHours(0, 0, 0, 0);
|
||||
const endMs = end.getTime();
|
||||
|
||||
return { startMs, endMs, partitionSuffix };
|
||||
}
|
||||
|
||||
/**
|
||||
* Ensure partitions exist for the next N days.
|
||||
* @param {number} daysAhead - Number of days to pre-create.
|
||||
*/
|
||||
async ensurePartitions(daysAhead = 30) {
|
||||
const client = await dbManager.pool.connect();
|
||||
try {
|
||||
logger.info(`Starting partition check for the next ${daysAhead} days...`);
|
||||
await this.ensureParentIndexes(client);
|
||||
const now = new Date();
|
||||
|
||||
for (let i = 0; i < daysAhead; i++) {
|
||||
const targetDate = new Date(now);
|
||||
targetDate.setDate(now.getDate() + i);
|
||||
|
||||
const { startMs, endMs, partitionSuffix } = this.getPartitionInfo(targetDate);
|
||||
const partitionTable = `${PARTITION_PREFIX}${partitionSuffix}`;
|
||||
const partitionName = `${PARTITION_SCHEMA}.${partitionTable}`;
|
||||
|
||||
// Check if partition exists
|
||||
const checkSql = `
|
||||
SELECT to_regclass($1) as exists;
|
||||
`;
|
||||
const checkRes = await client.query(checkSql, [partitionName]);
|
||||
|
||||
if (!checkRes.rows[0].exists) {
|
||||
logger.info(`Creating partition ${partitionName} for range [${startMs}, ${endMs})`);
|
||||
const createSql = `
|
||||
CREATE TABLE IF NOT EXISTS ${partitionName}
|
||||
PARTITION OF ${PARENT_TABLE}
|
||||
FOR VALUES FROM (${startMs}) TO (${endMs})
|
||||
TABLESPACE ${PARTITION_TABLESPACE};
|
||||
`;
|
||||
await client.query(createSql);
|
||||
|
||||
const forceTablespaceSql = this.buildForceTablespaceSql({
|
||||
schema: PARTITION_SCHEMA,
|
||||
partition: partitionTable,
|
||||
tablespace: PARTITION_TABLESPACE
|
||||
});
|
||||
await client.query(forceTablespaceSql);
|
||||
}
|
||||
}
|
||||
logger.info('Partition check completed.');
|
||||
} catch (err) {
|
||||
logger.error('Error ensuring partitions:', err);
|
||||
throw err;
|
||||
} finally {
|
||||
client.release();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export default new PartitionManager();
|
||||
@@ -3,19 +3,19 @@
|
||||
*
|
||||
* Manages an independent PostgreSQL connection pool for
|
||||
* the room_status.room_status_moment snapshot table.
|
||||
* Provides batch upsert with JSONB merge and auto-partition creation.
|
||||
* Provides batch upsert with JSONB merge.
|
||||
*/
|
||||
import pg from 'pg';
|
||||
import { randomUUID } from 'crypto';
|
||||
import { logger } from '../utils/logger.js';
|
||||
|
||||
const { Pool } = pg;
|
||||
|
||||
export class RoomStatusManager {
|
||||
/**
|
||||
* @param {Object} dbConfig - roomStatusDb config from config.js
|
||||
* @param {Object} [options] - additional configuration like omitGuid
|
||||
*/
|
||||
constructor(dbConfig) {
|
||||
constructor(dbConfig, options = {}) {
|
||||
this.pool = new Pool({
|
||||
host: dbConfig.host,
|
||||
port: dbConfig.port,
|
||||
@@ -28,8 +28,7 @@ export class RoomStatusManager {
|
||||
this.schema = dbConfig.schema;
|
||||
this.table = dbConfig.table;
|
||||
this.fullTableName = `${this.schema}.${this.table}`;
|
||||
// Track which partitions we have already ensured
|
||||
this.knownPartitions = new Set();
|
||||
this.omitGuid = options.omitGuid || false;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -37,19 +36,10 @@ export class RoomStatusManager {
|
||||
* Uses ON CONFLICT for atomic merge.
|
||||
*
|
||||
* @param {Array<Object>} rows - Array of merged status objects
|
||||
* Each: { hotel_id, room_id, device_id, ts_ms, sys_lock_status, dev_loops, faulty_device_count }
|
||||
* Each: { hotel_id, room_id, device_id, ts_ms, ip, sys_lock_status, dev_loops, faulty_device_count }
|
||||
*/
|
||||
async upsertBatch(rows) {
|
||||
if (!rows || rows.length === 0) return;
|
||||
|
||||
// Pre-ensure all needed partitions exist before attempting upsert
|
||||
const newHotelIds = [...new Set(rows.map(r => r.hotel_id))]
|
||||
.filter(id => !this.knownPartitions.has(id));
|
||||
|
||||
if (newHotelIds.length > 0) {
|
||||
await this._ensurePartitionsBatch(newHotelIds);
|
||||
}
|
||||
|
||||
await this._doUpsert(rows);
|
||||
}
|
||||
|
||||
@@ -62,29 +52,57 @@ export class RoomStatusManager {
|
||||
|
||||
for (let i = 0; i < rows.length; i++) {
|
||||
const row = rows[i];
|
||||
const offset = i * 8; // Changed from 9 to 8
|
||||
values.push(
|
||||
row.guid || randomUUID(), // $1
|
||||
row.ts_ms, // $2
|
||||
row.hotel_id, // $3
|
||||
row.room_id, // $4
|
||||
row.device_id, // $5
|
||||
row.sys_lock_status, // $6
|
||||
row.dev_loops ? JSON.stringify(row.dev_loops) : null, // $7 (was $8)
|
||||
row.faulty_device_count ? JSON.stringify(row.faulty_device_count) : null // $8 (was $9)
|
||||
);
|
||||
const p = (n) => `$${offset + n}`;
|
||||
placeholders.push(`(${p(1)}::uuid, ${p(2)}, ${p(3)}, ${p(4)}, ${p(5)}, ${p(6)}, ${p(7)}::jsonb, ${p(8)}::jsonb)`);
|
||||
const paramsPerRow = this.omitGuid ? 9 : 10;
|
||||
const offset = i * paramsPerRow;
|
||||
|
||||
if (this.omitGuid) {
|
||||
values.push(
|
||||
row.ts_ms, // $1
|
||||
row.hotel_id, // $2
|
||||
row.room_id, // $3
|
||||
row.device_id, // $4
|
||||
row.ip ?? null, // $5
|
||||
row.sys_lock_status, // $6
|
||||
row.dev_loops ? JSON.stringify(row.dev_loops) : null, // $7
|
||||
row.faulty_device_count ? JSON.stringify(row.faulty_device_count) : null, // $8
|
||||
1 // $9 online_status
|
||||
);
|
||||
const p = (n) => `$${offset + n}`;
|
||||
placeholders.push(`(${p(1)}, ${p(2)}, ${p(3)}, ${p(4)}, ${p(5)}, ${p(6)}, ${p(7)}::jsonb, ${p(8)}::jsonb, ${p(9)})`);
|
||||
} else {
|
||||
values.push(
|
||||
row.guid || randomUUID(), // $1
|
||||
row.ts_ms, // $2
|
||||
row.hotel_id, // $3
|
||||
row.room_id, // $4
|
||||
row.device_id, // $5
|
||||
row.ip ?? null, // $6
|
||||
row.sys_lock_status, // $7
|
||||
row.dev_loops ? JSON.stringify(row.dev_loops) : null, // $8
|
||||
row.faulty_device_count ? JSON.stringify(row.faulty_device_count) : null, // $9
|
||||
1 // $10 online_status
|
||||
);
|
||||
const p = (n) => `$${offset + n}`;
|
||||
placeholders.push(`(${p(1)}::uuid, ${p(2)}, ${p(3)}, ${p(4)}, ${p(5)}, ${p(6)}, ${p(7)}, ${p(8)}::jsonb, ${p(9)}::jsonb, ${p(10)})`);
|
||||
}
|
||||
}
|
||||
|
||||
const insertColumns = this.omitGuid
|
||||
? 'ts_ms, hotel_id, room_id, device_id, ip, sys_lock_status, dev_loops, faulty_device_count, online_status'
|
||||
: 'guid, ts_ms, hotel_id, room_id, device_id, ip, sys_lock_status, dev_loops, faulty_device_count, online_status';
|
||||
|
||||
const conflictTarget = this.omitGuid
|
||||
? '(hotel_id, room_id)'
|
||||
: '(hotel_id, room_id, device_id)';
|
||||
|
||||
const sql = `
|
||||
INSERT INTO ${this.fullTableName} (
|
||||
guid, ts_ms, hotel_id, room_id, device_id,
|
||||
sys_lock_status, dev_loops, faulty_device_count
|
||||
) VALUES ${placeholders.join(', ')}
|
||||
ON CONFLICT (hotel_id, room_id, device_id)
|
||||
INSERT INTO ${this.fullTableName} (${insertColumns}) VALUES ${placeholders.join(', ')}
|
||||
ON CONFLICT ${conflictTarget}
|
||||
DO UPDATE SET
|
||||
ts_ms = GREATEST(${this.fullTableName}.ts_ms, EXCLUDED.ts_ms),
|
||||
ts_ms = EXCLUDED.ts_ms,
|
||||
ip = COALESCE(EXCLUDED.ip, ${this.fullTableName}.ip),
|
||||
online_status = 1,
|
||||
device_id = EXCLUDED.device_id,
|
||||
sys_lock_status = COALESCE(EXCLUDED.sys_lock_status, ${this.fullTableName}.sys_lock_status),
|
||||
dev_loops = CASE
|
||||
WHEN EXCLUDED.dev_loops IS NULL THEN ${this.fullTableName}.dev_loops
|
||||
@@ -95,54 +113,6 @@ export class RoomStatusManager {
|
||||
|
||||
await this.pool.query(sql, values);
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if an error is a missing partition error.
|
||||
*/
|
||||
_isPartitionMissingError(error) {
|
||||
const msg = error?.message || '';
|
||||
return msg.includes('no partition') || msg.includes('routing') ||
|
||||
(error?.code === '23514' && msg.includes('partition'));
|
||||
}
|
||||
|
||||
/**
|
||||
* Batch-create LIST partitions for multiple hotel_ids in a single connection.
|
||||
* Uses CREATE TABLE IF NOT EXISTS (idempotent) — no check query needed.
|
||||
*/
|
||||
async _ensurePartitionsBatch(hotelIds) {
|
||||
const client = await this.pool.connect();
|
||||
try {
|
||||
for (const hotelId of hotelIds) {
|
||||
const partitionName = `${this.schema}.${this.table}_h${hotelId}`;
|
||||
try {
|
||||
await client.query(
|
||||
`CREATE TABLE IF NOT EXISTS ${partitionName} PARTITION OF ${this.fullTableName} FOR VALUES IN (${hotelId})`
|
||||
);
|
||||
this.knownPartitions.add(hotelId);
|
||||
} catch (err) {
|
||||
// Partition may already exist (race condition) — safe to ignore
|
||||
if (!err.message?.includes('already exists')) {
|
||||
logger.error('Error creating partition', { error: err?.message, hotelId });
|
||||
}
|
||||
this.knownPartitions.add(hotelId);
|
||||
}
|
||||
}
|
||||
if (hotelIds.length > 0) {
|
||||
logger.info(`Ensured ${hotelIds.length} room_status partitions`);
|
||||
}
|
||||
} finally {
|
||||
client.release();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Ensure a LIST partition exists for the given hotel_id (single).
|
||||
*/
|
||||
async ensurePartition(hotelId) {
|
||||
if (this.knownPartitions.has(hotelId)) return;
|
||||
await this._ensurePartitionsBatch([hotelId]);
|
||||
}
|
||||
|
||||
async testConnection() {
|
||||
try {
|
||||
await this.pool.query('SELECT 1');
|
||||
|
||||
@@ -22,6 +22,8 @@ export class StatusBatchProcessor {
|
||||
this.flushInterval = options.flushInterval || 500;
|
||||
this.maxBufferSize = options.maxBufferSize || 200;
|
||||
this.redisIntegration = options.redisIntegration || null;
|
||||
this.dedupeByRoom = options.dedupeByRoom || false;
|
||||
this.targetName = options.targetName || 'room_status';
|
||||
|
||||
/** @type {Map<string, Object>} compositeKey -> mergedState */
|
||||
this.buffer = new Map();
|
||||
@@ -32,6 +34,9 @@ export class StatusBatchProcessor {
|
||||
* Build composite key for deduplication.
|
||||
*/
|
||||
_key(update) {
|
||||
if (this.dedupeByRoom) {
|
||||
return `${update.hotel_id}:${update.room_id}`;
|
||||
}
|
||||
return `${update.hotel_id}:${update.room_id}:${update.device_id}`;
|
||||
}
|
||||
|
||||
@@ -42,6 +47,17 @@ export class StatusBatchProcessor {
|
||||
add(update) {
|
||||
if (!update) return;
|
||||
|
||||
if (this.targetName.startsWith('g5:') && !update.ip) {
|
||||
logger.info('Status update skipped for empty ip', {
|
||||
target: this.targetName,
|
||||
hotel_id: update.hotel_id,
|
||||
room_id: update.room_id,
|
||||
device_id: update.device_id,
|
||||
ts_ms: update.ts_ms
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
const key = this._key(update);
|
||||
const existing = this.buffer.get(key);
|
||||
|
||||
@@ -49,11 +65,21 @@ export class StatusBatchProcessor {
|
||||
// Merge: take latest ts_ms
|
||||
existing.ts_ms = Math.max(existing.ts_ms, update.ts_ms);
|
||||
|
||||
// device_id: prefer newer (important for dedupeByRoom feature)
|
||||
if (update.device_id) {
|
||||
existing.device_id = update.device_id;
|
||||
}
|
||||
|
||||
// sys_lock_status: prefer newer non-null value
|
||||
if (update.sys_lock_status != null) {
|
||||
existing.sys_lock_status = update.sys_lock_status;
|
||||
}
|
||||
|
||||
// ip: prefer newer non-null value
|
||||
if (update.ip != null) {
|
||||
existing.ip = update.ip;
|
||||
}
|
||||
|
||||
// dev_loops: merge keys (new overwrites old for same key)
|
||||
if (update.dev_loops) {
|
||||
existing.dev_loops = existing.dev_loops
|
||||
@@ -97,11 +123,16 @@ export class StatusBatchProcessor {
|
||||
this.buffer.clear();
|
||||
|
||||
try {
|
||||
logger.info('StatusBatchProcessor flushing rows', { count: rows.length, sampleRowKeys: rows.map(r => r.device_id).slice(0, 5) });
|
||||
logger.info('StatusBatchProcessor flushing rows', {
|
||||
target: this.targetName,
|
||||
count: rows.length,
|
||||
sampleRowKeys: rows.map(r => r.device_id).slice(0, 5)
|
||||
});
|
||||
await this.roomStatusManager.upsertBatch(rows);
|
||||
logger.info('StatusBatchProcessor flushed successfully', { count: rows.length });
|
||||
logger.info('StatusBatchProcessor flushed successfully', { target: this.targetName, count: rows.length });
|
||||
} catch (error) {
|
||||
logger.error('StatusBatchProcessor flush failed', {
|
||||
target: this.targetName,
|
||||
error: error?.message,
|
||||
stack: error?.stack,
|
||||
count: rows.length
|
||||
@@ -112,6 +143,7 @@ export class StatusBatchProcessor {
|
||||
try {
|
||||
await this.redisIntegration.error('StatusBatchProcessor flush failed', {
|
||||
module: 'room_status',
|
||||
target: this.targetName,
|
||||
count: rows.length,
|
||||
stack: error?.message
|
||||
});
|
||||
|
||||
@@ -1,8 +1,6 @@
|
||||
import cron from 'node-cron';
|
||||
import { config } from './config/config.js';
|
||||
import dbManager from './db/databaseManager.js';
|
||||
import dbInitializer from './db/initializer.js';
|
||||
import partitionManager from './db/partitionManager.js';
|
||||
import dbManager, { DatabaseManager } from './db/databaseManager.js';
|
||||
import projectMetadata from './cache/projectMetadata.js';
|
||||
import { createKafkaConsumers } from './kafka/consumer.js';
|
||||
import { processKafkaMessage } from './processor/index.js';
|
||||
@@ -17,41 +15,12 @@ import { logger } from './utils/logger.js';
|
||||
import { BatchProcessor } from './db/batchProcessor.js';
|
||||
|
||||
const bootstrap = async () => {
|
||||
// 0. Initialize Database (Create DB, Schema, Table, Partitions)
|
||||
await dbInitializer.initialize();
|
||||
|
||||
// 0.1 Initialize Project Metadata Cache
|
||||
// 0. Initialize Project Metadata Cache
|
||||
await projectMetadata.init();
|
||||
|
||||
// Metric Collector
|
||||
const metricCollector = new MetricCollector();
|
||||
|
||||
// 1. Setup Partition Maintenance Cron Job (Every day at 00:00)
|
||||
cron.schedule('0 0 * * *', async () => {
|
||||
logger.info('Running scheduled partition maintenance...');
|
||||
try {
|
||||
await partitionManager.ensurePartitions(30);
|
||||
} catch (err) {
|
||||
logger.error('Scheduled partition maintenance failed', err);
|
||||
}
|
||||
});
|
||||
|
||||
// 1.1 Setup Metric Reporting Cron Job (Every minute)
|
||||
// Moved after redisIntegration initialization
|
||||
|
||||
|
||||
// DatabaseManager is now a singleton exported instance, but let's keep consistency if possible
|
||||
// In databaseManager.js it exports `dbManager` instance by default.
|
||||
// The original code was `const dbManager = new DatabaseManager(config.db);` which implies it might have been a class export.
|
||||
// Let's check `databaseManager.js` content.
|
||||
// Wait, I imported `dbManager` from `./db/databaseManager.js`.
|
||||
// If `databaseManager.js` exports an instance as default, I should use that.
|
||||
// If it exports a class, I should instantiate it.
|
||||
|
||||
// Let's assume the previous code `new DatabaseManager` was correct if it was a class.
|
||||
// BUT I used `dbManager.pool` in `partitionManager.js` assuming it's an instance.
|
||||
// I need to verify `databaseManager.js`.
|
||||
|
||||
const redisClient = await createRedisClient(config.redis);
|
||||
const redisIntegration = new RedisIntegration(
|
||||
redisClient,
|
||||
@@ -61,15 +30,28 @@ const bootstrap = async () => {
|
||||
redisIntegration.startHeartbeat();
|
||||
|
||||
// 1.2 Initialize Room Status Manager (independent pool for snapshot table)
|
||||
const roomStatusManager = new RoomStatusManager(config.roomStatusDb);
|
||||
const statusBatchProcessor = new StatusBatchProcessor(roomStatusManager, {
|
||||
const roomStatusManagerG4 = new RoomStatusManager(config.roomStatusDb);
|
||||
const statusBatchProcessorG4 = new StatusBatchProcessor(roomStatusManagerG4, {
|
||||
flushInterval: 500,
|
||||
maxBufferSize: 200,
|
||||
targetName: 'g4:room_status.room_status_moment',
|
||||
redisIntegration
|
||||
});
|
||||
|
||||
let roomStatusManagerG5 = null;
|
||||
let statusBatchProcessorG5 = null;
|
||||
if (config.roomStatusDbG5.enabled) {
|
||||
roomStatusManagerG5 = new RoomStatusManager(config.roomStatusDbG5, { omitGuid: true });
|
||||
statusBatchProcessorG5 = new StatusBatchProcessor(roomStatusManagerG5, {
|
||||
flushInterval: 500,
|
||||
maxBufferSize: 200,
|
||||
targetName: `g5:${config.roomStatusDbG5.schema}.${config.roomStatusDbG5.table}`,
|
||||
redisIntegration
|
||||
});
|
||||
}
|
||||
logger.info('Room Status sync pipeline initialized');
|
||||
|
||||
// 1.1 Setup Metric Reporting Cron Job (Every minute)
|
||||
// 1. Setup Metric Reporting Cron Job (Every minute)
|
||||
cron.schedule('* * * * *', async () => {
|
||||
const metrics = metricCollector.getAndReset();
|
||||
const report = `[Minute Metrics] Pulled: ${metrics.kafka_pulled}, Parse Error: ${metrics.parse_error}, Inserted: ${metrics.db_inserted}, Failed: ${metrics.db_failed}`;
|
||||
@@ -85,10 +67,24 @@ const bootstrap = async () => {
|
||||
|
||||
const errorQueueKey = buildErrorQueueKey(config.redis.projectName);
|
||||
|
||||
const batchProcessor = new BatchProcessor(dbManager, config, {
|
||||
batchSize: config.kafka.maxInFlight
|
||||
const batchProcessorG4 = new BatchProcessor(dbManager, config, {
|
||||
batchSize: config.kafka.maxInFlight,
|
||||
targetName: 'g4:rcu_action.rcu_action_events',
|
||||
dbConfig: config.db
|
||||
});
|
||||
|
||||
let dbManagerG5 = null;
|
||||
let batchProcessorG5 = null;
|
||||
if (config.dbG5.enabled) {
|
||||
dbManagerG5 = new DatabaseManager(config.dbG5);
|
||||
batchProcessorG5 = new BatchProcessor(dbManagerG5, config, {
|
||||
batchSize: config.kafka.maxInFlight,
|
||||
omitGuid: true,
|
||||
targetName: 'g5:rcu_action.rcu_action_events_g5',
|
||||
dbConfig: config.dbG5
|
||||
});
|
||||
}
|
||||
|
||||
const handleMessage = async (message) => {
|
||||
if (message.topic) {
|
||||
metricCollector.increment('kafka_pulled');
|
||||
@@ -100,38 +96,58 @@ const bootstrap = async () => {
|
||||
const messageKey = Buffer.isBuffer(message.key)
|
||||
? message.key.toString('utf8')
|
||||
: message.key;
|
||||
if (config.kafka.logMessages) {
|
||||
logger.info('Kafka message received', {
|
||||
topic: message.topic,
|
||||
partition: message.partition,
|
||||
offset: message.offset,
|
||||
key: messageKey,
|
||||
value: messageValue
|
||||
});
|
||||
} else {
|
||||
logger.info('Kafka message received', {
|
||||
topic: message.topic,
|
||||
partition: message.partition,
|
||||
offset: message.offset,
|
||||
key: messageKey,
|
||||
valueLength: typeof messageValue === 'string' ? messageValue.length : null
|
||||
});
|
||||
}
|
||||
const { rows, payload } = await processKafkaMessage({ message });
|
||||
const inserted = await batchProcessor.add({ rows });
|
||||
metricCollector.increment('db_inserted');
|
||||
|
||||
// Fire-and-forget: extract status and push to StatusBatchProcessor
|
||||
// Status sync must be independent from action-event DB write failures.
|
||||
try {
|
||||
const statusUpdate = extractStatusUpdate(payload);
|
||||
if (statusUpdate) {
|
||||
statusBatchProcessor.add(statusUpdate);
|
||||
if (config.roomStatusDb.enabled && config.writeToG4Status) {
|
||||
statusBatchProcessorG4.add(statusUpdate);
|
||||
}
|
||||
if (statusBatchProcessorG5 && config.roomStatusDbG5.enabled && config.writeToG5Status) {
|
||||
statusBatchProcessorG5.add(statusUpdate);
|
||||
}
|
||||
}
|
||||
} catch (statusErr) {
|
||||
logger.error('Status extraction failed (non-blocking)', { error: statusErr?.message });
|
||||
}
|
||||
|
||||
logger.info('Kafka message processed', { inserted });
|
||||
let inserted = 0;
|
||||
const dbActions = [];
|
||||
if (config.db.enabled && config.writeToG4Action) {
|
||||
dbActions.push(batchProcessorG4.add({ rows }).then(c => { inserted = Math.max(inserted, c); }));
|
||||
}
|
||||
if (batchProcessorG5 && config.dbG5.enabled && config.writeToG5Action) {
|
||||
dbActions.push(batchProcessorG5.add({ rows }).then(c => { inserted = Math.max(inserted, c); }));
|
||||
}
|
||||
|
||||
const dbResults = await Promise.allSettled(dbActions);
|
||||
const failedDbResults = dbResults.filter(r => r.status === 'rejected');
|
||||
|
||||
if (failedDbResults.length > 0) {
|
||||
metricCollector.increment('db_failed');
|
||||
failedDbResults.forEach((result) => {
|
||||
const err = result.reason;
|
||||
logger.warn('Action event insert failed and skipped (non-blocking)', {
|
||||
error: err?.message,
|
||||
type: err?.type,
|
||||
dbContext: err?.dbContext
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
if (dbResults.some(r => r.status === 'fulfilled')) {
|
||||
metricCollector.increment('db_inserted');
|
||||
}
|
||||
|
||||
logger.info('Kafka message processed', {
|
||||
inserted,
|
||||
statusTargets: {
|
||||
g4: config.roomStatusDb.enabled,
|
||||
g5: Boolean(statusBatchProcessorG5 && config.roomStatusDbG5.enabled)
|
||||
}
|
||||
});
|
||||
} catch (error) {
|
||||
if (error.type === 'PARSE_ERROR') {
|
||||
metricCollector.increment('parse_error');
|
||||
@@ -189,13 +205,20 @@ const bootstrap = async () => {
|
||||
const healthCheck = {
|
||||
shouldPause: async (error) => {
|
||||
if (error?.type === 'DB_ERROR') {
|
||||
const isConnected = await dbManager.testConnection();
|
||||
return !isConnected;
|
||||
const checks = [];
|
||||
if (config.db.enabled) checks.push(dbManager.testConnection());
|
||||
if (dbManagerG5 && config.dbG5.enabled) checks.push(dbManagerG5.testConnection());
|
||||
const results = await Promise.all(checks);
|
||||
return results.some(res => !res);
|
||||
}
|
||||
return false;
|
||||
},
|
||||
check: async () => {
|
||||
return await dbManager.testConnection();
|
||||
const checks = [];
|
||||
if (config.db.enabled) checks.push(dbManager.testConnection());
|
||||
if (dbManagerG5 && config.dbG5.enabled) checks.push(dbManagerG5.testConnection());
|
||||
const results = await Promise.all(checks);
|
||||
return results.every(res => res === true);
|
||||
}
|
||||
};
|
||||
|
||||
@@ -241,16 +264,27 @@ const bootstrap = async () => {
|
||||
|
||||
// 4. Flush and close Room Status pipeline
|
||||
try {
|
||||
await statusBatchProcessor.flush();
|
||||
await roomStatusManager.close();
|
||||
logger.info('Room Status pipeline closed');
|
||||
if (config.roomStatusDb.enabled) {
|
||||
await statusBatchProcessorG4.flush();
|
||||
await roomStatusManagerG4.close();
|
||||
}
|
||||
if (statusBatchProcessorG5 && config.roomStatusDbG5.enabled) {
|
||||
await statusBatchProcessorG5.flush();
|
||||
await roomStatusManagerG5.close();
|
||||
}
|
||||
logger.info('Room Status pipelines closed');
|
||||
} catch (rsErr) {
|
||||
logger.error('Error closing Room Status pipeline', { error: rsErr?.message });
|
||||
}
|
||||
|
||||
// 5. Close Database Pool
|
||||
await dbManager.close();
|
||||
logger.info('Database connection closed');
|
||||
// 5. Close Database Pools
|
||||
if (config.db.enabled) {
|
||||
await dbManager.close();
|
||||
}
|
||||
if (dbManagerG5 && config.dbG5.enabled) {
|
||||
await dbManagerG5.close();
|
||||
}
|
||||
logger.info('Database connections closed');
|
||||
|
||||
process.exit(0);
|
||||
} catch (err) {
|
||||
|
||||
@@ -221,7 +221,7 @@ export const buildRowsFromPayload = (rawPayload) => {
|
||||
direction: normalizedDirection,
|
||||
cmd_word: normalizedCmdWord,
|
||||
frame_id: frameId,
|
||||
udp_raw: udpRaw,
|
||||
udp_raw: null,
|
||||
sys_lock_status: sysLockStatus ?? null,
|
||||
report_count: reportCount ?? null,
|
||||
fault_count: faultCount ?? null,
|
||||
@@ -260,10 +260,7 @@ export const buildRowsFromPayload = (rawPayload) => {
|
||||
|
||||
// Logic 1: 0x36 Status/Fault Report
|
||||
if (messageType === '36上报') {
|
||||
const details = {
|
||||
device_list: deviceList,
|
||||
fault_list: faultList
|
||||
};
|
||||
const details = null;
|
||||
|
||||
// Process device status list
|
||||
if (deviceList.length > 0) {
|
||||
@@ -318,9 +315,7 @@ export const buildRowsFromPayload = (rawPayload) => {
|
||||
|
||||
// Logic 2: 0x0F Control Command
|
||||
if (messageType === '0F下发') {
|
||||
const details = {
|
||||
control_list: controlList
|
||||
};
|
||||
const details = null;
|
||||
|
||||
if (controlList.length > 0) {
|
||||
controlList.forEach(control => {
|
||||
@@ -356,7 +351,7 @@ export const buildRowsFromPayload = (rawPayload) => {
|
||||
else if (messageType === '0FACK') {
|
||||
const { control_list: controls = [] } = payload;
|
||||
if (Array.isArray(controls)) {
|
||||
const details = { control_list: controls };
|
||||
const details = null;
|
||||
controls.forEach((control) => {
|
||||
rows.push({
|
||||
...commonFields,
|
||||
|
||||
@@ -20,6 +20,12 @@ const pad3 = (val) => String(val).padStart(3, '0');
|
||||
const buildLoopKey = (devType, devAddr, devLoop) =>
|
||||
`${pad3(devType)}${pad3(devAddr)}${pad3(devLoop)}`;
|
||||
|
||||
const normalizeIp = (value) => {
|
||||
if (value == null) return null;
|
||||
const text = String(value).trim();
|
||||
return text.length > 0 ? text : null;
|
||||
};
|
||||
|
||||
/**
|
||||
* Extract a status update object from a validated Kafka payload.
|
||||
*
|
||||
@@ -34,14 +40,18 @@ export const extractStatusUpdate = (payload) => {
|
||||
room_id,
|
||||
device_id,
|
||||
ts_ms,
|
||||
ip,
|
||||
sys_lock_status,
|
||||
device_list = [],
|
||||
fault_list = [],
|
||||
control_list = [],
|
||||
extra = {},
|
||||
direction,
|
||||
cmd_word
|
||||
} = payload;
|
||||
|
||||
const resolvedIp = normalizeIp(ip ?? extra?.ip ?? null);
|
||||
|
||||
// Must have identity fields
|
||||
if (hotel_id == null || !room_id || !device_id || !ts_ms) {
|
||||
return null;
|
||||
@@ -98,6 +108,7 @@ export const extractStatusUpdate = (payload) => {
|
||||
room_id: String(room_id),
|
||||
device_id: String(device_id),
|
||||
ts_ms,
|
||||
ip: resolvedIp,
|
||||
sys_lock_status: sys_lock_status ?? null,
|
||||
dev_loops: devLoops,
|
||||
faulty_device_count: faultyDeviceCount
|
||||
|
||||
@@ -47,6 +47,7 @@ export const kafkaPayloadSchema = z.object({
|
||||
cmd_word: z.union([z.string(), z.number()]).transform(val => String(val)),
|
||||
frame_id: z.number(),
|
||||
udp_raw: z.string(),
|
||||
ip: z.string().optional().nullable(),
|
||||
|
||||
// Optional Statistical/Status Fields
|
||||
sys_lock_status: z.number().optional().nullable(),
|
||||
|
||||
@@ -28,14 +28,15 @@ describe('BatchProcessor', () => {
|
||||
expect(dbManager.insertRows).not.toHaveBeenCalled();
|
||||
|
||||
const p3 = batchProcessor.add({ rows: ['r3'] });
|
||||
|
||||
|
||||
// Wait for microtasks
|
||||
await Promise.resolve();
|
||||
|
||||
|
||||
expect(dbManager.insertRows).toHaveBeenCalledTimes(1);
|
||||
expect(dbManager.insertRows).toHaveBeenCalledWith({
|
||||
schema: 'test_schema',
|
||||
table: 'test_table',
|
||||
omitGuid: false,
|
||||
rows: ['r1', 'r2', 'r3']
|
||||
});
|
||||
|
||||
@@ -50,7 +51,7 @@ describe('BatchProcessor', () => {
|
||||
expect(dbManager.insertRows).not.toHaveBeenCalled();
|
||||
|
||||
vi.advanceTimersByTime(1000);
|
||||
|
||||
|
||||
// Wait for microtasks
|
||||
await Promise.resolve();
|
||||
|
||||
@@ -58,6 +59,7 @@ describe('BatchProcessor', () => {
|
||||
expect(dbManager.insertRows).toHaveBeenCalledWith({
|
||||
schema: 'test_schema',
|
||||
table: 'test_table',
|
||||
omitGuid: false,
|
||||
rows: ['r1']
|
||||
});
|
||||
|
||||
@@ -87,6 +89,7 @@ describe('BatchProcessor', () => {
|
||||
expect(dbManager.insertRows).toHaveBeenCalledWith({
|
||||
schema: 'test_schema',
|
||||
table: 'test_table',
|
||||
omitGuid: false,
|
||||
rows: ['r1', 'r2', 'r3']
|
||||
});
|
||||
|
||||
|
||||
@@ -51,7 +51,7 @@ describe('Processor Logic', () => {
|
||||
expect(rows[0].action_type).toBe('设备回路状态');
|
||||
expect(rows[0].dev_addr).toBe(10);
|
||||
expect(rows[1].dev_addr).toBe(11);
|
||||
expect(rows[0].details.device_list).toHaveLength(2);
|
||||
expect(rows[0].details).toBeNull();
|
||||
});
|
||||
|
||||
it('should handle 0x36 Fault Report', () => {
|
||||
@@ -162,7 +162,7 @@ describe('Processor Logic', () => {
|
||||
const rows = buildRowsFromPayload(payload);
|
||||
|
||||
expect(rows).toHaveLength(1);
|
||||
expect(rows[0].udp_raw).toBe(expectedBase64);
|
||||
expect(rows[0].udp_raw).toBeNull();
|
||||
});
|
||||
|
||||
it('should keep udp_raw unchanged when input is not hex string', () => {
|
||||
@@ -178,7 +178,7 @@ describe('Processor Logic', () => {
|
||||
|
||||
const rows = buildRowsFromPayload(payload);
|
||||
|
||||
expect(rows[0].udp_raw).toBe('YWJjMTIz');
|
||||
expect(rows[0].udp_raw).toBeNull();
|
||||
});
|
||||
|
||||
it('should default extra to empty object when not provided', () => {
|
||||
@@ -273,7 +273,7 @@ describe('Processor Logic - 0x0E Support', () => {
|
||||
expect(rows[0].dev_addr).toBe(10);
|
||||
expect(rows[0].cmd_word).toBe('0x0e'); // Normalized
|
||||
expect(rows[1].dev_addr).toBe(11);
|
||||
expect(rows[0].details.device_list).toHaveLength(2);
|
||||
expect(rows[0].details).toBeNull();
|
||||
});
|
||||
|
||||
it('should handle 0x0E Fault Report', () => {
|
||||
|
||||
@@ -25,6 +25,7 @@ describe('StatusBatchProcessor', () => {
|
||||
room_id: '8001',
|
||||
device_id: 'dev_001',
|
||||
ts_ms: 1700000000000,
|
||||
ip: '10.1.2.3',
|
||||
sys_lock_status: null,
|
||||
dev_loops: null,
|
||||
faulty_device_count: null,
|
||||
@@ -105,6 +106,29 @@ describe('StatusBatchProcessor', () => {
|
||||
expect(processor.buffer.size).toBe(2);
|
||||
});
|
||||
|
||||
it('should skip empty ip for g5 target', () => {
|
||||
processor = new StatusBatchProcessor(mockManager, {
|
||||
flushInterval: 50000,
|
||||
maxBufferSize: 100,
|
||||
targetName: 'g5:room_status.room_status_moment_g5'
|
||||
});
|
||||
|
||||
processor.add(makeUpdate({ ip: null }));
|
||||
|
||||
expect(processor.buffer.size).toBe(0);
|
||||
expect(mockManager.upsertBatch).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('should preserve ip when flushing rows', async () => {
|
||||
processor.add(makeUpdate({ ip: '10.9.8.7' }));
|
||||
|
||||
await processor.flush();
|
||||
|
||||
expect(mockManager.upsertBatch).toHaveBeenCalledTimes(1);
|
||||
const rows = mockManager.upsertBatch.mock.calls[0][0];
|
||||
expect(rows[0].ip).toBe('10.9.8.7');
|
||||
});
|
||||
|
||||
it('should clear buffer after flush', async () => {
|
||||
processor.add(makeUpdate());
|
||||
expect(processor.buffer.size).toBe(1);
|
||||
|
||||
@@ -45,6 +45,27 @@ describe('StatusExtractor', () => {
|
||||
expect(result.device_id).toBe('dev_001');
|
||||
});
|
||||
|
||||
it('should normalize empty ip to null', () => {
|
||||
const result = extractStatusUpdate({
|
||||
...base,
|
||||
ip: ' ',
|
||||
sys_lock_status: 1
|
||||
});
|
||||
|
||||
expect(result).not.toBeNull();
|
||||
expect(result.ip).toBeNull();
|
||||
});
|
||||
|
||||
it('should preserve non-empty ip', () => {
|
||||
const result = extractStatusUpdate({
|
||||
...base,
|
||||
ip: '10.1.2.3',
|
||||
sys_lock_status: 1
|
||||
});
|
||||
|
||||
expect(result.ip).toBe('10.1.2.3');
|
||||
});
|
||||
|
||||
it('should build dev_loops from device_list with 9-digit padded keys', () => {
|
||||
const result = extractStatusUpdate({
|
||||
...base,
|
||||
|
||||
63
docs/rcu_action_events_g5.sql
Normal file
63
docs/rcu_action_events_g5.sql
Normal file
@@ -0,0 +1,63 @@
|
||||
/*
|
||||
Navicat Premium Dump SQL
|
||||
|
||||
Source Server : FnOS 80
|
||||
Source Server Type : PostgreSQL
|
||||
Source Server Version : 150017 (150017)
|
||||
Source Host : 10.8.8.80:5434
|
||||
Source Catalog : log_platform
|
||||
Source Schema : rcu_action
|
||||
|
||||
Target Server Type : PostgreSQL
|
||||
Target Server Version : 150017 (150017)
|
||||
File Encoding : 65001
|
||||
|
||||
Date: 10/03/2026 16:01:04
|
||||
*/
|
||||
|
||||
|
||||
-- ----------------------------
|
||||
-- Table structure for rcu_action_events_g5
|
||||
-- ----------------------------
|
||||
DROP TABLE IF EXISTS "rcu_action"."rcu_action_events_g5";
|
||||
CREATE TABLE "rcu_action"."rcu_action_events_g5" (
|
||||
"guid" int4 NOT NULL DEFAULT nextval('"rcu_action".rcu_action_events_g5_guid_seq1'::regclass),
|
||||
"ts_ms" int8 NOT NULL,
|
||||
"write_ts_ms" int8 NOT NULL,
|
||||
"hotel_id" int4 NOT NULL,
|
||||
"room_id" varchar(32) COLLATE "pg_catalog"."default" NOT NULL,
|
||||
"device_id" varchar(32) COLLATE "pg_catalog"."default" NOT NULL,
|
||||
"direction" varchar(10) COLLATE "pg_catalog"."default" NOT NULL,
|
||||
"cmd_word" varchar(10) COLLATE "pg_catalog"."default" NOT NULL,
|
||||
"frame_id" int4 NOT NULL,
|
||||
"udp_raw" text COLLATE "pg_catalog"."default" NOT NULL,
|
||||
"action_type" varchar(20) COLLATE "pg_catalog"."default" NOT NULL,
|
||||
"sys_lock_status" int2,
|
||||
"report_count" int2,
|
||||
"dev_type" int2,
|
||||
"dev_addr" int2,
|
||||
"dev_loop" int4,
|
||||
"dev_data" int4,
|
||||
"fault_count" int2,
|
||||
"error_type" int2,
|
||||
"error_data" int2,
|
||||
"type_l" int2,
|
||||
"type_h" int2,
|
||||
"details" jsonb,
|
||||
"extra" jsonb,
|
||||
"loop_name" varchar(255) COLLATE "pg_catalog"."default"
|
||||
)
|
||||
TABLESPACE "ts_hot"
|
||||
;
|
||||
|
||||
-- ----------------------------
|
||||
-- Indexes structure for table rcu_action_events_g5
|
||||
-- ----------------------------
|
||||
CREATE INDEX "idx_g5_prod_ts_ms" ON "rcu_action"."rcu_action_events_g5" USING btree (
|
||||
"ts_ms" "pg_catalog"."int8_ops" DESC NULLS FIRST
|
||||
);
|
||||
|
||||
-- ----------------------------
|
||||
-- Primary Key structure for table rcu_action_events_g5
|
||||
-- ----------------------------
|
||||
ALTER TABLE "rcu_action"."rcu_action_events_g5" ADD CONSTRAINT "rcu_action_events_g5_pkey" PRIMARY KEY ("ts_ms", "guid");
|
||||
88
docs/room_status_moment_g5.sql
Normal file
88
docs/room_status_moment_g5.sql
Normal file
@@ -0,0 +1,88 @@
|
||||
/*
|
||||
Navicat Premium Dump SQL
|
||||
|
||||
Source Server : FnOS 80
|
||||
Source Server Type : PostgreSQL
|
||||
Source Server Version : 150017 (150017)
|
||||
Source Host : 10.8.8.80:5434
|
||||
Source Catalog : log_platform
|
||||
Source Schema : room_status
|
||||
|
||||
Target Server Type : PostgreSQL
|
||||
Target Server Version : 150017 (150017)
|
||||
File Encoding : 65001
|
||||
|
||||
Date: 10/03/2026 10:32:13
|
||||
*/
|
||||
|
||||
|
||||
-- ----------------------------
|
||||
-- Table structure for room_status_moment_g5
|
||||
-- ----------------------------
|
||||
DROP TABLE IF EXISTS "room_status"."room_status_moment_g5";
|
||||
CREATE TABLE "room_status"."room_status_moment_g5" (
|
||||
"hotel_id" int2 NOT NULL,
|
||||
"room_id" text COLLATE "pg_catalog"."default" NOT NULL,
|
||||
"device_id" text COLLATE "pg_catalog"."default" NOT NULL,
|
||||
"ts_ms" int8 NOT NULL DEFAULT ((EXTRACT(epoch FROM clock_timestamp()) * (1000)::numeric))::bigint,
|
||||
"sys_lock_status" int2,
|
||||
"online_status" int2,
|
||||
"launcher_version" text COLLATE "pg_catalog"."default",
|
||||
"app_version" text COLLATE "pg_catalog"."default",
|
||||
"config_version" text COLLATE "pg_catalog"."default",
|
||||
"register_ts_ms" int8,
|
||||
"upgrade_ts_ms" int8,
|
||||
"config_ts_ms" int8,
|
||||
"ip" text COLLATE "pg_catalog"."default",
|
||||
"pms_status" int2,
|
||||
"power_state" int2,
|
||||
"cardless_state" int2,
|
||||
"service_mask" int8,
|
||||
"insert_card" int2,
|
||||
"bright_g" int2,
|
||||
"agreement_ver" text COLLATE "pg_catalog"."default",
|
||||
"air_address" _text COLLATE "pg_catalog"."default",
|
||||
"air_state" _int2,
|
||||
"air_model" _int2,
|
||||
"air_speed" _int2,
|
||||
"air_set_temp" _int2,
|
||||
"air_now_temp" _int2,
|
||||
"air_solenoid_valve" _int2,
|
||||
"elec_address" _text COLLATE "pg_catalog"."default",
|
||||
"elec_voltage" _float8,
|
||||
"elec_ampere" _float8,
|
||||
"elec_power" _float8,
|
||||
"elec_phase" _float8,
|
||||
"elec_energy" _float8,
|
||||
"elec_sum_energy" _float8,
|
||||
"carbon_state" int2,
|
||||
"dev_loops" jsonb,
|
||||
"energy_carbon_sum" float8,
|
||||
"energy_nocard_sum" float8,
|
||||
"external_device" jsonb DEFAULT '{}'::jsonb,
|
||||
"faulty_device_count" jsonb DEFAULT '{}'::jsonb
|
||||
)
|
||||
WITH (fillfactor=90)
|
||||
TABLESPACE "ts_hot"
|
||||
;
|
||||
|
||||
-- ----------------------------
|
||||
-- Indexes structure for table room_status_moment_g5
|
||||
-- ----------------------------
|
||||
CREATE INDEX "idx_rsm_g5_dashboard_query" ON "room_status"."room_status_moment_g5" USING btree (
|
||||
"hotel_id" "pg_catalog"."int2_ops" ASC NULLS LAST,
|
||||
"online_status" "pg_catalog"."int2_ops" ASC NULLS LAST,
|
||||
"power_state" "pg_catalog"."int2_ops" ASC NULLS LAST
|
||||
);
|
||||
|
||||
-- ----------------------------
|
||||
-- Triggers structure for table room_status_moment_g5
|
||||
-- ----------------------------
|
||||
CREATE TRIGGER "trg_update_rsm_ts_ms" BEFORE UPDATE ON "room_status"."room_status_moment_g5"
|
||||
FOR EACH ROW
|
||||
EXECUTE PROCEDURE "room_status"."update_ts_ms_g5"();
|
||||
|
||||
-- ----------------------------
|
||||
-- Primary Key structure for table room_status_moment_g5
|
||||
-- ----------------------------
|
||||
ALTER TABLE "room_status"."room_status_moment_g5" ADD CONSTRAINT "room_status_moment_g5_pkey" PRIMARY KEY ("hotel_id", "room_id");
|
||||
@@ -0,0 +1,140 @@
|
||||
# 配置化数据库初始化 - 实现总结
|
||||
|
||||
**日期**:2026年3月3日
|
||||
**功能**:使数据库初始化和分区维护可配置化
|
||||
|
||||
## 变更内容
|
||||
|
||||
### 1. 新增环境变量
|
||||
- **变量名**:`ENABLE_DATABASE_INITIALIZATION`
|
||||
- **类型**:Boolean
|
||||
- **默认值**:`true`(启用)
|
||||
- **说明**:控制是否在服务启动时执行数据库初始化和分区维护
|
||||
|
||||
### 2. 修改的文件
|
||||
|
||||
#### [src/config/config.js](../bls-rcu-action-backend/src/config/config.js)
|
||||
- 行 69:新增配置项 `enableDatabaseInitialization`
|
||||
- 逻辑:`enableDatabaseInitialization: process.env.ENABLE_DATABASE_INITIALIZATION !== 'false'` (默认 true)
|
||||
|
||||
#### [src/index.js](../bls-rcu-action-backend/src/index.js)
|
||||
- **第 21-28 行**:修改初始化逻辑
|
||||
- 添加条件判断:`if (config.enableDatabaseInitialization)`
|
||||
- 启用时:执行 `dbInitializer.initialize()`
|
||||
- 禁用时:记录日志并跳过
|
||||
|
||||
- **第 36-47 行**:修改分区维护定时任务
|
||||
- 添加条件判断:仅当 `config.enableDatabaseInitialization` 为 true 时,才注册定时任务
|
||||
- 禁用时:记录日志说明定时任务已禁用
|
||||
|
||||
#### [.env](../bls-rcu-action-backend/.env)
|
||||
- 末尾新增配置注释和示例:
|
||||
```
|
||||
# Database Initialization Configuration
|
||||
# Set to 'false' to skip automatic database creation, schema setup, and partition management
|
||||
# When disabled, the service will start consuming Kafka messages and writing to existing database immediately
|
||||
# Default: true (enable database initialization)
|
||||
ENABLE_DATABASE_INITIALIZATION=true
|
||||
```
|
||||
|
||||
#### [.env.example](../bls-rcu-action-backend/.env.example)
|
||||
- 末尾新增配置说明(同上)
|
||||
|
||||
#### [docs/DATABASE_INITIALIZATION_CONFIG.md](../DATABASE_INITIALIZATION_CONFIG.md)(新增)
|
||||
- 详细的使用指南和配置说明
|
||||
- 包含多种使用场景
|
||||
- 故障排查建议
|
||||
|
||||
## 行为变化
|
||||
|
||||
### 启用初始化(`ENABLE_DATABASE_INITIALIZATION=true`)
|
||||
```
|
||||
启动服务
|
||||
↓
|
||||
执行 dbInitializer.initialize()
|
||||
├─ 检查/创建数据库
|
||||
├─ 执行 init_db.sql(创建 schema 和主表)
|
||||
└─ 创建未来 30 天的分区
|
||||
↓
|
||||
注册定时任务:每天 00:00 检查并创建新分区
|
||||
↓
|
||||
初始化项目元数据缓存
|
||||
↓
|
||||
启动 Kafka 消费者
|
||||
↓
|
||||
开始处理消息
|
||||
```
|
||||
|
||||
### 禁用初始化(`ENABLE_DATABASE_INITIALIZATION=false`)
|
||||
```
|
||||
启动服务
|
||||
↓
|
||||
跳过 dbInitializer.initialize()
|
||||
↓
|
||||
不注册分区维护定时任务
|
||||
↓
|
||||
初始化项目元数据缓存
|
||||
↓
|
||||
启动 Kafka 消费者
|
||||
↓
|
||||
开始处理消息(直接)
|
||||
```
|
||||
|
||||
## 向后兼容性
|
||||
|
||||
✅ **完全向后兼容**
|
||||
- 不执行此配置时,默认行为为 `true`(启用初始化)
|
||||
- 现有部署无需修改即可继续运行
|
||||
- 新部署可根据需要设置为 `false` 以跳过初始化
|
||||
|
||||
## 适用场景
|
||||
|
||||
| 场景 | 配置值 | 说明 |
|
||||
|------|--------|------|
|
||||
| 首次部署 | `true` | 自动完成初始化 |
|
||||
| 生产环境(已初始化) | `false` | 跳过初始化,直接消费 |
|
||||
| 多实例部署 | 实例1:`true`<br/>其他:`false` | 一个实例初始化,其他跳过 |
|
||||
| 容器编排(K8s) | `false` | 初始化 Job 单独执行,应用跳过 |
|
||||
|
||||
## 关键改进
|
||||
|
||||
1. **启动速度**:禁用初始化可减少启动时间 5-10 秒(避免分区检查)
|
||||
2. **灵活部署**:支持多实例场景,避免并发创建表
|
||||
3. **生产友好**:可明确控制何时执行 DDL 操作
|
||||
4. **日志清晰**:明确说明初始化状态和定时任务是否启用
|
||||
|
||||
## 测试验证
|
||||
|
||||
### 测试 1:启用初始化(默认)
|
||||
```bash
|
||||
ENABLE_DATABASE_INITIALIZATION=true npm run dev
|
||||
# 预期:看到初始化日志
|
||||
```
|
||||
|
||||
### 测试 2:禁用初始化
|
||||
```bash
|
||||
ENABLE_DATABASE_INITIALIZATION=false npm run dev
|
||||
# 预期:看到"Database initialization is disabled"日志,跳过初始化,直接消费消息
|
||||
```
|
||||
|
||||
### 测试 3:多实例启动
|
||||
```bash
|
||||
# 终端 1
|
||||
ENABLE_DATABASE_INITIALIZATION=true npm run dev
|
||||
|
||||
# 等待初始化完成,然后在另一个终端
|
||||
|
||||
# 终端 2
|
||||
ENABLE_DATABASE_INITIALIZATION=false npm run dev
|
||||
# 预期:第二个实例直接启动消费,无冲突
|
||||
```
|
||||
|
||||
## 依赖无变化
|
||||
- 不需要新增 npm 包
|
||||
- 不需要修改数据库结构
|
||||
- 不需要修改 Kafka 配置
|
||||
|
||||
## 后续可扩展
|
||||
- 可进一步细化:分离"创建表"开关和"分区维护"开关
|
||||
- 可添加"从外部 API 初始化元数据"的选项
|
||||
- 可支持自定义初始化 SQL 脚本路径
|
||||
26
openspec/changes/2026-03-10-g5-database-sync/spec.md
Normal file
26
openspec/changes/2026-03-10-g5-database-sync/spec.md
Normal file
@@ -0,0 +1,26 @@
|
||||
# G5 Database Sync and Room Status Aggregation Logic
|
||||
|
||||
## openspec-proposal
|
||||
- **生态包推荐与选型**:本次需求纯粹为数据库多环境异步双写及聚合同步架构升级。我们保持着**不重复造轮子**的极简最佳实践,未引入新的冗余 npm 依赖。坚持复用项目中现有的优秀驱动包 `pg`(node-postgres)作为核心驱动来应对大量行级的高并发插入与聚合。
|
||||
- **降级与扩展策略**:对现有的 `BatchProcessor` 和 `StatusBatchProcessor` 进行原生构造拆解重写,拓展 `options`(如 `omitGuid`, `dedupeByRoom` 等变量配置),以最小侵入业务的方式分离实例对象,挂载 G5 双写。
|
||||
|
||||
## openspec-apply
|
||||
|
||||
针对 `Node.js` (V8) 和异步并发模型,完成生成和执行如下实施与扩展细则:
|
||||
|
||||
### 1. 异步非阻塞的 Dual-Write 设计方案
|
||||
- **Event Loop 友好型并行**:在执行向 G4 和 G5 环境多库分发数据时,彻底摒弃 `await g4_write; await g5_write` 的顺序堵塞请求堆栈。采用纯异步的 `Promise.all([dbActions])` 把宏任务交由事件循环底层的 I/O 线程池并行发送所有的 `INSERT 事务报文`,将两座异地数据库的返回时延并集。
|
||||
- **配置隔离熔断法**:依托 `.env` 实现对每一向数据库独立挂载的生命周期(如动态通过 `ENABLE_G4_SYNC` 或 `ENABLE_G5_SYNC` 处理)。任一数据库挂掉带来的错误 `PROMISE_REJECTED` 在进程捕获后只会独立封停,不会引发系统阻塞或导致另一健康的数据库断供。
|
||||
|
||||
### 2. G5 字段 `omitGuid` 的剥离控制与瘦身落地
|
||||
- G5 中 `rcu_action_events_g5` 表里的 `guid` 基于 `int4` `nextval`。这与原本生成的 `uuid` 类型存在截然冲突。
|
||||
- **具体落地**:当连接器池检测到 `omitGuid = true`(G5 模式打开),底层的 SQL `INSERT INTO (${columns})` 将会动态清洗掉 `guid` 和所绑定的 `$n` 占位符。把主键控制权彻底返还给 PostgreSQL 内部机制,Node 层专注传输,达到结构和时效的最简合并。
|
||||
|
||||
### 3. Room Status: 房间聚合锁与 Upsert (ON CONFLICT DO UPDATE)
|
||||
- **去重逻辑优化与 `dedupeByRoom`**:针对 `room_status_moment_g5` 中限定 `(hotel_id, room_id)` 是唯一的联合主键特质,如果仍然通过旧结构缓存,会产生频繁覆盖导致的 JSON 散件丢失。本次通过 Node 层注入 `dedupeByRoom` 拦截校验法:缓冲器使用 `${hotel_id}:${room_id}` 的更底层维度合并哈希。同一房间内设备的任何 JSONB 数组比如 `dev_loops` 都先进行底层 `||` 合并及时间对比去重,然后一次性下发大包。
|
||||
- **SQL 更新映射**:由于 `device_id` 不再具有唯一性维度,我们执行基于 PostgreSQL 的合并操作:在冲突 `EXCLUDED` 对象中强制替换最新上报设备 `$device_id = EXCLUDED.device_id`。并且彻底抽去了 Node 更新时间戳的过程,让权给表上的 `AFTER/BEFORE UPDATE` 原生触发器处理 `ts_ms`。符合强数据库型开发的完美最佳实践。
|
||||
|
||||
### 4. 废弃冗余数据占用 (`udp_raw` & `details`)
|
||||
由于上游报文及 JSON 持久化逻辑逐步升级,对于 G4 以及后续加入的 G5 库,在映射中全量废除了直接对 `udp_raw` 回追文本和对 `details` JSONB 的业务数据的多余存储。
|
||||
- **具体拦截处理**:我们在 `processor/index.js` `commonFields` 中将 `udp_raw` 强赋值为 `null`;同理对于所有的 `payload` 解析结果,`details` 也改为了强置空(`null`)。相应的单元测试套件已被更新修改以校验拦截有效性。
|
||||
- **目的**:通过这两项高频臃肿字典的精减裁剪,可极大缩小单行落盘宽表的体积,减轻 PostgreSQL 序列化负担及提升 UPSERT 时延,而数据库中的字段保留以用于结构向后兼容。
|
||||
27
openspec/changes/2026-03-18-g5-ip-skip-empty/spec.md
Normal file
27
openspec/changes/2026-03-18-g5-ip-skip-empty/spec.md
Normal file
@@ -0,0 +1,27 @@
|
||||
# G5 room_status 空 IP 跳过写入
|
||||
|
||||
## 背景
|
||||
Kafka payload 中包含 `ip` 字段,但该字段可能为空。对于 `room_status_moment_g5`,当 `ip` 为空时,应直接跳过该次状态写入,避免更新 `ts_ms`,也避免写入不完整数据。
|
||||
|
||||
## 目标
|
||||
1. G5 `room_status_moment_g5` 写入时补充 `ip` 字段。
|
||||
2. 当 `ip` 为空或空白时,G5 直接跳过这条状态,不进入缓冲、不触发 upsert、不更新 `ts_ms`。
|
||||
3. G4 逻辑保持现状,不受 G5 空 IP 策略影响。
|
||||
4. 保持 `ip` 作为有效值时的正常 upsert 与字段合并。
|
||||
|
||||
## 变更范围
|
||||
- `src/processor/statusExtractor.js`
|
||||
- 规范化 `ip`,将空字符串/空白字符串视为 `null`
|
||||
- `src/db/statusBatchProcessor.js`
|
||||
- G5 目标在 `ip` 为空时直接跳过
|
||||
- `src/db/roomStatusManager.js`
|
||||
- 在 upsert 中增加 `ip` 写入与更新逻辑
|
||||
- `tests/status_extractor.test.js`
|
||||
- 增加 `ip` 归一化测试
|
||||
- `tests/status_batch_processor.test.js`
|
||||
- 增加 G5 空 `ip` 直接跳过测试
|
||||
|
||||
## 验收标准
|
||||
1. G5 表 `room_status_moment_g5` 的记录可写入 `ip` 字段。
|
||||
2. `ip` 为空的 G5 状态不产生任何数据库写入。
|
||||
3. 现有测试通过。
|
||||
16
openspec/changes/2026-03-18-g5-ip-skip-empty/summary.md
Normal file
16
openspec/changes/2026-03-18-g5-ip-skip-empty/summary.md
Normal file
@@ -0,0 +1,16 @@
|
||||
# 2026-03-18 G5 空 IP 跳过写入修正
|
||||
|
||||
## 结果
|
||||
已修正 room_status_moment_g5 的写入规则:
|
||||
- 现在会写入 `ip` 字段
|
||||
- `ip` 为空或空白时,G5 状态直接跳过,不更新 `ts_ms`
|
||||
- G4 逻辑不受影响
|
||||
|
||||
## 关键实现
|
||||
- `statusExtractor` 规范化 `ip`
|
||||
- `statusBatchProcessor` 对 G5 空 `ip` 直接丢弃
|
||||
- `roomStatusManager` 的 upsert 增加 `ip` 列
|
||||
|
||||
## 验证
|
||||
- 新增测试覆盖 `ip` 归一化与 G5 空 `ip` 跳过
|
||||
- 现有测试应保持通过
|
||||
@@ -0,0 +1,15 @@
|
||||
# Proposal: Externalize DB Provisioning
|
||||
|
||||
## Why
|
||||
- 降低主服务复杂度与运行时 DDL 风险。
|
||||
- 避免在高并发消费服务中执行建库/建分区。
|
||||
- 便于平台化调度(独立任务/外部程序调用)。
|
||||
|
||||
## What
|
||||
- 删除主服务中的初始化与分区创建能力。
|
||||
- 将建库相关 SQL 与 JS 工具集中到根目录 `SQL_Script`。
|
||||
- 提供统一 npm scripts 入口供外部调用。
|
||||
|
||||
## Non-Goals
|
||||
- 不改动 Kafka 解析与业务写库模型。
|
||||
- 不引入旧开关兼容。
|
||||
@@ -0,0 +1,64 @@
|
||||
# Externalize Database Provisioning to SQL_Script
|
||||
|
||||
## 1. 背景
|
||||
当前服务进程中仍包含数据库初始化与分区创建相关职责。为降低主服务复杂度、避免运行时 DDL 风险,并支持由外部程序统一调度,现将“建库/建表/建分区”能力完全外置到根目录 `SQL_Script`。
|
||||
|
||||
## 2. 目标
|
||||
1. 主服务 `bls-rcu-action-backend` 启动后仅执行:拉取 Kafka -> 解析 -> 写库。
|
||||
2. 删除所有运行时建库、建表、建分区与对应定时任务逻辑。
|
||||
3. 不保留旧兼容开关(如 `ENABLE_DATABASE_INITIALIZATION`)。
|
||||
4. 在根目录 `SQL_Script` 提供可复用的 SQL 与 JS 调用入口,供其他程序调用。
|
||||
|
||||
## 3. 变更范围
|
||||
### 3.1 业务服务(剥离)
|
||||
- 删除文件:
|
||||
- `bls-rcu-action-backend/src/db/initializer.js`
|
||||
- `bls-rcu-action-backend/src/db/partitionManager.js`
|
||||
- 修改文件:
|
||||
- `bls-rcu-action-backend/src/index.js`
|
||||
- 移除初始化与定时建分区调用
|
||||
- `bls-rcu-action-backend/src/config/config.js`
|
||||
- 移除 `enableDatabaseInitialization`
|
||||
- `bls-rcu-action-backend/src/db/roomStatusManager.js`
|
||||
- 移除自动建分区逻辑(仅保留 upsert)
|
||||
- `bls-rcu-action-backend/.env`
|
||||
- `bls-rcu-action-backend/.env.example`
|
||||
- 移除 `ENABLE_DATABASE_INITIALIZATION`
|
||||
- 删除旧 SQL:
|
||||
- `bls-rcu-action-backend/scripts/init_db.sql`
|
||||
|
||||
### 3.2 外部脚本(新增)
|
||||
新增根目录 `SQL_Script/`:
|
||||
- `init_rcu_action.sql`
|
||||
- `init_room_status.sql`
|
||||
- `partition_rcu_action.sql`
|
||||
- `partition_room_status.sql`
|
||||
- `db_manager.js`(CLI + import)
|
||||
- `README.md`
|
||||
|
||||
### 3.3 npm scripts(入口)
|
||||
在 `bls-rcu-action-backend/package.json` 新增:
|
||||
- `db:init:all`
|
||||
- `db:init:rcu-action`
|
||||
- `db:init:room-status`
|
||||
- `db:partition:rcu-action`
|
||||
- `db:partition:room-status`
|
||||
|
||||
## 4. 设计约束
|
||||
1. 主服务代码中不得出现 `CREATE TABLE` / `CREATE SCHEMA` / `CREATE INDEX` / 分区创建语句。
|
||||
2. 主服务中不得出现初始化器和分区管理器调用路径。
|
||||
3. 建库能力仅存在于根目录 `SQL_Script`。
|
||||
|
||||
## 5. 验收标准
|
||||
1. `npm run test` 全通过。
|
||||
2. `npm run build` 通过,且 `dist/index.js` 不包含以下关键词:
|
||||
- `dbInitializer`
|
||||
- `partitionManager`
|
||||
- `ENABLE_DATABASE_INITIALIZATION`
|
||||
- `ensurePartitions`
|
||||
3. 通过 `npm run db:init:all` 可以执行外部初始化流程。
|
||||
4. 通过 `npm run db:partition:rcu-action`、`npm run db:partition:room-status -- <hotelId>` 可执行外部分区流程。
|
||||
|
||||
## 6. 迁移说明
|
||||
- 旧部署流程若依赖服务启动自动建库,需改为先执行 `SQL_Script` 对应命令,再启动主服务。
|
||||
- 该变更为有意“去兼容”升级:删除旧开关与旧路径,避免双路径维护。
|
||||
@@ -0,0 +1,37 @@
|
||||
# 2026-03-04 外置建库能力改造总结
|
||||
|
||||
## 概述
|
||||
已将建库(初始化 + 分区创建)能力从主服务中完全剥离,迁移至根目录 `SQL_Script`,并提供可供其他程序调用的 JS/SQL 入口。
|
||||
|
||||
## 实施结果
|
||||
|
||||
### 已删除(主服务内)
|
||||
- `src/db/initializer.js`
|
||||
- `src/db/partitionManager.js`
|
||||
- 运行时初始化、定时分区维护逻辑
|
||||
- 配置项:`ENABLE_DATABASE_INITIALIZATION`
|
||||
- 旧 SQL:`bls-rcu-action-backend/scripts/init_db.sql`
|
||||
|
||||
### 已新增(根目录)
|
||||
- `SQL_Script/init_rcu_action.sql`
|
||||
- `SQL_Script/init_room_status.sql`
|
||||
- `SQL_Script/partition_rcu_action.sql`
|
||||
- `SQL_Script/partition_room_status.sql`
|
||||
- `SQL_Script/db_manager.js`
|
||||
- `SQL_Script/README.md`
|
||||
|
||||
### 已新增 npm 入口
|
||||
- `db:init:all`
|
||||
- `db:init:rcu-action`
|
||||
- `db:init:room-status`
|
||||
- `db:partition:rcu-action`
|
||||
- `db:partition:room-status`
|
||||
|
||||
## 验证
|
||||
- `npm run test`:通过(45/45)
|
||||
- `npm run build`:通过
|
||||
- 构建产物不再包含初始化/分区管理逻辑
|
||||
|
||||
## 影响
|
||||
- 主服务职责更单一:只处理 Kafka 消费与写库。
|
||||
- DDL 改为外部可控,适合由调度系统或独立服务统一执行。
|
||||
Reference in New Issue
Block a user