Compare commits
10 Commits
c6d7cea8cf
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| 38f1421fff | |||
| 7713cfeb9e | |||
| fa363835a3 | |||
| 381080fee0 | |||
| 1329eca99e | |||
| 156930e6bc | |||
| 33c9bf0e07 | |||
| 3d80ad8710 | |||
| 4492a9c47e | |||
| ba61a540da |
1
.gitignore
vendored
1
.gitignore
vendored
@@ -1,2 +1,3 @@
|
|||||||
/bls-onoffline-backend/node_modules
|
/bls-onoffline-backend/node_modules
|
||||||
/template
|
/template
|
||||||
|
/bls-onoffline-backend/dist
|
||||||
|
|||||||
6
SQL_Script/create_database.sql
Normal file
6
SQL_Script/create_database.sql
Normal file
@@ -0,0 +1,6 @@
|
|||||||
|
-- Replace {{DATABASE_NAME}} before execution
|
||||||
|
-- Requires psql (uses \gexec)
|
||||||
|
SELECT format('CREATE DATABASE %I', '{{DATABASE_NAME}}')
|
||||||
|
WHERE NOT EXISTS (
|
||||||
|
SELECT 1 FROM pg_database WHERE datname = '{{DATABASE_NAME}}'
|
||||||
|
)\gexec;
|
||||||
12
SQL_Script/create_partition_for_day.sql
Normal file
12
SQL_Script/create_partition_for_day.sql
Normal file
@@ -0,0 +1,12 @@
|
|||||||
|
-- Replace placeholders before execution:
|
||||||
|
-- {{SCHEMA_NAME}} default: onoffline
|
||||||
|
-- {{TABLE_NAME}} default: onoffline_record
|
||||||
|
-- {{PARTITION_SUFFIX}} format: YYYYMMDD (example: 20260304)
|
||||||
|
-- {{START_TS_MS}} unix ms at 00:00:00.000
|
||||||
|
-- {{END_TS_MS}} unix ms at next day 00:00:00.000
|
||||||
|
-- {{TABLESPACE_CLAUSE}} either empty string or: TABLESPACE ts_hot
|
||||||
|
|
||||||
|
CREATE TABLE IF NOT EXISTS {{SCHEMA_NAME}}.{{TABLE_NAME}}_{{PARTITION_SUFFIX}}
|
||||||
|
PARTITION OF {{SCHEMA_NAME}}.{{TABLE_NAME}}
|
||||||
|
FOR VALUES FROM ({{START_TS_MS}}) TO ({{END_TS_MS}})
|
||||||
|
{{TABLESPACE_CLAUSE}};
|
||||||
27
SQL_Script/create_schema_and_parent_table.sql
Normal file
27
SQL_Script/create_schema_and_parent_table.sql
Normal file
@@ -0,0 +1,27 @@
|
|||||||
|
-- Replace placeholders before execution:
|
||||||
|
-- {{SCHEMA_NAME}} default: onoffline
|
||||||
|
-- {{TABLE_NAME}} default: onoffline_record
|
||||||
|
|
||||||
|
CREATE SCHEMA IF NOT EXISTS {{SCHEMA_NAME}};
|
||||||
|
|
||||||
|
CREATE TABLE IF NOT EXISTS {{SCHEMA_NAME}}.{{TABLE_NAME}} (
|
||||||
|
guid TEXT NOT NULL,
|
||||||
|
ts_ms BIGINT NOT NULL,
|
||||||
|
write_ts_ms BIGINT NOT NULL,
|
||||||
|
hotel_id SMALLINT,
|
||||||
|
mac TEXT,
|
||||||
|
device_id TEXT,
|
||||||
|
room_id TEXT,
|
||||||
|
ip TEXT,
|
||||||
|
current_status TEXT,
|
||||||
|
launcher_version TEXT,
|
||||||
|
reboot_reason TEXT,
|
||||||
|
PRIMARY KEY (ts_ms, guid)
|
||||||
|
) PARTITION BY RANGE (ts_ms);
|
||||||
|
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_{{TABLE_NAME}}_guid ON {{SCHEMA_NAME}}.{{TABLE_NAME}} (guid);
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_{{TABLE_NAME}}_device_id ON {{SCHEMA_NAME}}.{{TABLE_NAME}} (device_id);
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_{{TABLE_NAME}}_hotel_id ON {{SCHEMA_NAME}}.{{TABLE_NAME}} (hotel_id);
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_{{TABLE_NAME}}_mac ON {{SCHEMA_NAME}}.{{TABLE_NAME}} (mac);
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_{{TABLE_NAME}}_room_id ON {{SCHEMA_NAME}}.{{TABLE_NAME}} (room_id);
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_{{TABLE_NAME}}_status ON {{SCHEMA_NAME}}.{{TABLE_NAME}} (current_status);
|
||||||
26
SQL_Script/generate_init_sql.js
Normal file
26
SQL_Script/generate_init_sql.js
Normal file
@@ -0,0 +1,26 @@
|
|||||||
|
#!/usr/bin/env node
|
||||||
|
|
||||||
|
import fs from 'fs';
|
||||||
|
import path from 'path';
|
||||||
|
|
||||||
|
const args = Object.fromEntries(
|
||||||
|
process.argv.slice(2).map((arg) => {
|
||||||
|
const [key, value] = arg.split('=');
|
||||||
|
return [key.replace(/^--/, ''), value];
|
||||||
|
})
|
||||||
|
);
|
||||||
|
|
||||||
|
const database = args.database || 'log_platform';
|
||||||
|
const schema = args.schema || 'onoffline';
|
||||||
|
const table = args.table || 'onoffline_record';
|
||||||
|
const output = args.output;
|
||||||
|
|
||||||
|
const sql = `SELECT format('CREATE DATABASE %I', '${database}')\nWHERE NOT EXISTS (SELECT 1 FROM pg_database WHERE datname = '${database}')\\gexec;\n\nCREATE SCHEMA IF NOT EXISTS ${schema};\n\nCREATE TABLE IF NOT EXISTS ${schema}.${table} (\n guid TEXT NOT NULL,\n ts_ms BIGINT NOT NULL,\n write_ts_ms BIGINT NOT NULL,\n hotel_id SMALLINT,\n mac TEXT,\n device_id TEXT,\n room_id TEXT,\n ip TEXT,\n current_status TEXT,\n launcher_version TEXT,\n reboot_reason TEXT,\n PRIMARY KEY (ts_ms, guid)\n) PARTITION BY RANGE (ts_ms);\n\nCREATE INDEX IF NOT EXISTS idx_${table}_guid ON ${schema}.${table} (guid);\nCREATE INDEX IF NOT EXISTS idx_${table}_device_id ON ${schema}.${table} (device_id);\nCREATE INDEX IF NOT EXISTS idx_${table}_hotel_id ON ${schema}.${table} (hotel_id);\nCREATE INDEX IF NOT EXISTS idx_${table}_mac ON ${schema}.${table} (mac);\nCREATE INDEX IF NOT EXISTS idx_${table}_room_id ON ${schema}.${table} (room_id);\nCREATE INDEX IF NOT EXISTS idx_${table}_status ON ${schema}.${table} (current_status);\n`;
|
||||||
|
|
||||||
|
if (output) {
|
||||||
|
const outputPath = path.resolve(output);
|
||||||
|
fs.writeFileSync(outputPath, sql, 'utf8');
|
||||||
|
console.log(`Init SQL written to ${outputPath}`);
|
||||||
|
} else {
|
||||||
|
process.stdout.write(sql);
|
||||||
|
}
|
||||||
64
SQL_Script/generate_partition_range_sql.js
Normal file
64
SQL_Script/generate_partition_range_sql.js
Normal file
@@ -0,0 +1,64 @@
|
|||||||
|
#!/usr/bin/env node
|
||||||
|
|
||||||
|
import fs from 'fs';
|
||||||
|
import path from 'path';
|
||||||
|
|
||||||
|
const args = Object.fromEntries(
|
||||||
|
process.argv.slice(2).map((arg) => {
|
||||||
|
const [key, value] = arg.split('=');
|
||||||
|
return [key.replace(/^--/, ''), value];
|
||||||
|
})
|
||||||
|
);
|
||||||
|
|
||||||
|
const schema = args.schema || 'onoffline';
|
||||||
|
const table = args.table || 'onoffline_record';
|
||||||
|
const start = args.start;
|
||||||
|
const days = Number(args.days || '30');
|
||||||
|
const tablespace = args.tablespace || '';
|
||||||
|
const output = args.output;
|
||||||
|
|
||||||
|
if (!start) {
|
||||||
|
console.error('Missing required argument: --start=YYYY-MM-DD');
|
||||||
|
process.exit(1);
|
||||||
|
}
|
||||||
|
if (!Number.isFinite(days) || days <= 0) {
|
||||||
|
console.error('Invalid --days value. It must be a positive integer.');
|
||||||
|
process.exit(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
const base = new Date(`${start}T00:00:00`);
|
||||||
|
if (Number.isNaN(base.getTime())) {
|
||||||
|
console.error('Invalid start date. Use format YYYY-MM-DD');
|
||||||
|
process.exit(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
const statements = [];
|
||||||
|
for (let i = 0; i < days; i += 1) {
|
||||||
|
const date = new Date(base);
|
||||||
|
date.setDate(base.getDate() + i);
|
||||||
|
|
||||||
|
const startMs = date.getTime();
|
||||||
|
const endDate = new Date(date);
|
||||||
|
endDate.setDate(endDate.getDate() + 1);
|
||||||
|
const endMs = endDate.getTime();
|
||||||
|
|
||||||
|
const yyyy = date.getFullYear();
|
||||||
|
const mm = String(date.getMonth() + 1).padStart(2, '0');
|
||||||
|
const dd = String(date.getDate()).padStart(2, '0');
|
||||||
|
const suffix = `${yyyy}${mm}${dd}`;
|
||||||
|
|
||||||
|
const tablespaceClause = tablespace ? ` TABLESPACE ${tablespace}` : '';
|
||||||
|
statements.push(
|
||||||
|
`CREATE TABLE IF NOT EXISTS ${schema}.${table}_${suffix}\nPARTITION OF ${schema}.${table}\nFOR VALUES FROM (${startMs}) TO (${endMs})${tablespaceClause};`
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
const sql = `${statements.join('\n\n')}\n`;
|
||||||
|
|
||||||
|
if (output) {
|
||||||
|
const outputPath = path.resolve(output);
|
||||||
|
fs.writeFileSync(outputPath, sql, 'utf8');
|
||||||
|
console.log(`Partition range SQL written to ${outputPath}`);
|
||||||
|
} else {
|
||||||
|
process.stdout.write(sql);
|
||||||
|
}
|
||||||
49
SQL_Script/generate_partition_sql.js
Normal file
49
SQL_Script/generate_partition_sql.js
Normal file
@@ -0,0 +1,49 @@
|
|||||||
|
#!/usr/bin/env node
|
||||||
|
|
||||||
|
import fs from 'fs';
|
||||||
|
import path from 'path';
|
||||||
|
|
||||||
|
const args = Object.fromEntries(
|
||||||
|
process.argv.slice(2).map((arg) => {
|
||||||
|
const [key, value] = arg.split('=');
|
||||||
|
return [key.replace(/^--/, ''), value];
|
||||||
|
})
|
||||||
|
);
|
||||||
|
|
||||||
|
const schema = args.schema || 'onoffline';
|
||||||
|
const table = args.table || 'onoffline_record';
|
||||||
|
const dateInput = args.date;
|
||||||
|
const tablespace = args.tablespace || '';
|
||||||
|
const output = args.output;
|
||||||
|
|
||||||
|
if (!dateInput) {
|
||||||
|
console.error('Missing required argument: --date=YYYY-MM-DD');
|
||||||
|
process.exit(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
const date = new Date(`${dateInput}T00:00:00`);
|
||||||
|
if (Number.isNaN(date.getTime())) {
|
||||||
|
console.error('Invalid date. Use format YYYY-MM-DD');
|
||||||
|
process.exit(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
const startMs = date.getTime();
|
||||||
|
const endDate = new Date(date);
|
||||||
|
endDate.setDate(endDate.getDate() + 1);
|
||||||
|
const endMs = endDate.getTime();
|
||||||
|
|
||||||
|
const yyyy = date.getFullYear();
|
||||||
|
const mm = String(date.getMonth() + 1).padStart(2, '0');
|
||||||
|
const dd = String(date.getDate()).padStart(2, '0');
|
||||||
|
const suffix = `${yyyy}${mm}${dd}`;
|
||||||
|
const tablespaceClause = tablespace ? `TABLESPACE ${tablespace}` : '';
|
||||||
|
|
||||||
|
const sql = `CREATE TABLE IF NOT EXISTS ${schema}.${table}_${suffix}\nPARTITION OF ${schema}.${table}\nFOR VALUES FROM (${startMs}) TO (${endMs})\n${tablespaceClause};\n`;
|
||||||
|
|
||||||
|
if (output) {
|
||||||
|
const outputPath = path.resolve(output);
|
||||||
|
fs.writeFileSync(outputPath, sql, 'utf8');
|
||||||
|
console.log(`Partition SQL written to ${outputPath}`);
|
||||||
|
} else {
|
||||||
|
process.stdout.write(sql);
|
||||||
|
}
|
||||||
@@ -29,6 +29,16 @@ POSTGRES_IDLE_TIMEOUT_MS=30000
|
|||||||
DB_SCHEMA=onoffline
|
DB_SCHEMA=onoffline
|
||||||
DB_TABLE=onoffline_record
|
DB_TABLE=onoffline_record
|
||||||
|
|
||||||
|
# =========================
|
||||||
|
# PostgreSQL 配置 G5库专用
|
||||||
|
# =========================
|
||||||
|
POSTGRES_HOST_G5=10.8.8.80
|
||||||
|
POSTGRES_PORT_G5=5434
|
||||||
|
POSTGRES_DATABASE_G5=log_platform
|
||||||
|
POSTGRES_USER_G5=log_admin
|
||||||
|
POSTGRES_PASSWORD_G5=H3IkLUt8K!x
|
||||||
|
POSTGRES_IDLE_TIMEOUT_MS_G5=30000
|
||||||
|
|
||||||
PORT=3001
|
PORT=3001
|
||||||
LOG_LEVEL=info
|
LOG_LEVEL=info
|
||||||
|
|
||||||
@@ -39,3 +49,11 @@ REDIS_PASSWORD=
|
|||||||
REDIS_DB=15
|
REDIS_DB=15
|
||||||
REDIS_CONNECT_TIMEOUT_MS=5000
|
REDIS_CONNECT_TIMEOUT_MS=5000
|
||||||
REDIS_PROJECT_NAME=bls-onoffline
|
REDIS_PROJECT_NAME=bls-onoffline
|
||||||
|
|
||||||
|
# DB write switches
|
||||||
|
DB_WRITE_ENABLED=true
|
||||||
|
G5_DB_WRITE_ENABLED=true
|
||||||
|
|
||||||
|
# G5 room_status.moment sync
|
||||||
|
# false: do not write room_status.room_status_moment_g5
|
||||||
|
G5_ROOM_STATUS_MOMENT_SYNC_ENABLED=false
|
||||||
|
|||||||
@@ -29,3 +29,7 @@ REDIS_PASSWORD=
|
|||||||
REDIS_DB=0
|
REDIS_DB=0
|
||||||
REDIS_PROJECT_NAME=bls-onoffline
|
REDIS_PROJECT_NAME=bls-onoffline
|
||||||
REDIS_API_BASE_URL=http://localhost:3001
|
REDIS_API_BASE_URL=http://localhost:3001
|
||||||
|
|
||||||
|
# G5 room_status.moment sync
|
||||||
|
# false: do not write room_status.room_status_moment_g5
|
||||||
|
G5_ROOM_STATUS_MOMENT_SYNC_ENABLED=false
|
||||||
|
|||||||
@@ -18,7 +18,13 @@ bls-onoffline-backend
|
|||||||
- 复制 .env.example 为 .env 并按实际环境配置
|
- 复制 .env.example 为 .env 并按实际环境配置
|
||||||
|
|
||||||
数据库初始化
|
数据库初始化
|
||||||
- 启动时自动执行 scripts/init_db.sql 并预创建未来 30 天分区
|
- 运行服务前请先通过根目录 SQL_Script 下脚本完成建库与分区维护
|
||||||
|
- `../SQL_Script/create_database.sql`:建库(psql)
|
||||||
|
- `../SQL_Script/create_schema_and_parent_table.sql`:建 schema 与主分区表
|
||||||
|
- `../SQL_Script/create_partition_for_day.sql`:按日建分区模板
|
||||||
|
- `../SQL_Script/generate_init_sql.js`:生成建库+建表 SQL
|
||||||
|
- `../SQL_Script/generate_partition_sql.js`:生成单日分区 SQL
|
||||||
|
- `../SQL_Script/generate_partition_range_sql.js`:生成批量分区 SQL
|
||||||
|
|
||||||
规范说明
|
规范说明
|
||||||
- 规格文件位于 spec/onoffline-spec.md
|
- 规格文件位于 spec/onoffline-spec.md
|
||||||
|
|||||||
1156
bls-onoffline-backend/dist/index.js
vendored
1156
bls-onoffline-backend/dist/index.js
vendored
File diff suppressed because it is too large
Load Diff
@@ -0,0 +1,13 @@
|
|||||||
|
# Change: add g5 room status ip upsert
|
||||||
|
|
||||||
|
## Why
|
||||||
|
当前双写仅写入 onoffline_record_g5,未同步更新 G5 状态表 `room_status.room_status_moment_g5` 的 `ip` 字段,导致状态表与最新上报数据存在延迟。
|
||||||
|
|
||||||
|
## What Changes
|
||||||
|
- 新增 G5 状态表同步机制:按 `hotel_id + room_id` 查找目标行,并使用 `ON CONFLICT DO UPDATE` 更新 `ip`。
|
||||||
|
- 同步时仅处理查找到的第一条匹配行(`LIMIT 1`)。
|
||||||
|
- 无论 `ip` 是否变化都执行 upsert 更新,以触发数据库更新时间相关触发器。
|
||||||
|
|
||||||
|
## Impact
|
||||||
|
- Affected specs: `openspec/specs/onoffline/spec.md`
|
||||||
|
- Affected code: `src/db/g5DatabaseManager.js`, `src/index.js`, `tests/g5DatabaseManager.test.js`
|
||||||
@@ -0,0 +1,19 @@
|
|||||||
|
## ADDED Requirements
|
||||||
|
|
||||||
|
### Requirement: G5 状态表 IP 同步
|
||||||
|
系统 SHALL 在处理并写入数据时,同步将对应设备的 `ip` 更新到 G5 状态表 `room_status.room_status_moment_g5`。
|
||||||
|
|
||||||
|
#### Scenario: 按唯一键同步 IP
|
||||||
|
- **GIVEN** 当前处理行包含 `hotel_id` 与 `room_id`
|
||||||
|
- **WHEN** 执行 G5 状态表同步
|
||||||
|
- **THEN** 系统按 `hotel_id + room_id` 查找状态表记录,并仅对查找到的第一条记录执行写入
|
||||||
|
|
||||||
|
#### Scenario: 使用 upsert 触发更新
|
||||||
|
- **GIVEN** 状态表已存在同键记录
|
||||||
|
- **WHEN** 执行写入
|
||||||
|
- **THEN** 系统使用 `ON CONFLICT (hotel_id, room_id) DO UPDATE` 更新 `ip`
|
||||||
|
|
||||||
|
#### Scenario: IP 不变仍触发更新
|
||||||
|
- **GIVEN** 新 `ip` 与库内 `ip` 相同
|
||||||
|
- **WHEN** 执行状态表同步
|
||||||
|
- **THEN** 系统仍执行更新语句,以触发表上的更新时间相关触发器
|
||||||
@@ -0,0 +1,10 @@
|
|||||||
|
## 1. Implementation
|
||||||
|
- [x] 1.1 Add room status IP upsert method in G5 DB manager.
|
||||||
|
- [x] 1.2 Trigger room status IP upsert in the main write path.
|
||||||
|
- [x] 1.3 Add tests for status mapping and room status dedupe behavior.
|
||||||
|
- [x] 1.4 Add OpenSpec delta for the new synchronization requirement.
|
||||||
|
|
||||||
|
## 2. Validation
|
||||||
|
- [x] 2.1 Run `npm run test`.
|
||||||
|
- [x] 2.2 Run `npm run build`.
|
||||||
|
- [x] 2.3 Run `openspec validate add-g5-room-status-ip-upsert --strict`.
|
||||||
@@ -0,0 +1,14 @@
|
|||||||
|
# Change: remove runtime db provisioning
|
||||||
|
|
||||||
|
## Why
|
||||||
|
当前服务在运行时承担了建库、建表和分区维护职责,导致服务职责边界不清晰,也会引入启动阶段 DDL 风险。现已将该能力剥离到根目录 `SQL_Script/`,需要通过 OpenSpec 正式记录为规范变更。
|
||||||
|
|
||||||
|
## What Changes
|
||||||
|
- 移除服务启动阶段的数据库初始化与定时分区维护要求。
|
||||||
|
- 移除服务在写入失败时自动创建缺失分区的要求。
|
||||||
|
- 明确数据库结构与分区维护由外部脚本(`SQL_Script/`)负责。
|
||||||
|
- 保留服务的核心职责:Kafka 消费、解析、写库、重试与监控。
|
||||||
|
|
||||||
|
## Impact
|
||||||
|
- Affected specs: `openspec/specs/onoffline/spec.md`
|
||||||
|
- Affected code: `src/index.js`, `src/config/config.js`, `src/db/initializer.js`, `src/db/partitionManager.js`, `scripts/init_db.sql`, `scripts/verify_partitions.js`, `../SQL_Script/*`
|
||||||
@@ -0,0 +1,32 @@
|
|||||||
|
## MODIFIED Requirements
|
||||||
|
|
||||||
|
### Requirement: 数据库分区策略
|
||||||
|
系统 SHALL 使用 Range Partitioning 按天分区;运行服务本身 SHALL NOT 执行建库、建表、分区创建或定时分区维护。
|
||||||
|
|
||||||
|
#### Scenario: 服务启动不执行 DDL
|
||||||
|
- **GIVEN** 服务进程启动
|
||||||
|
- **WHEN** 进入 bootstrap 过程
|
||||||
|
- **THEN** 仅初始化消费、处理、监控相关能力,不执行数据库创建、表结构初始化与分区创建
|
||||||
|
|
||||||
|
#### Scenario: 分区由外部脚本维护
|
||||||
|
- **GIVEN** 需要创建数据库对象或新增未来分区
|
||||||
|
- **WHEN** 执行外部 SQL/JS 工具
|
||||||
|
- **THEN** 通过根目录 `SQL_Script/` 完成建库和分区维护,而不是由服务运行时自动执行
|
||||||
|
|
||||||
|
### Requirement: 批量消费与写入
|
||||||
|
系统 SHALL 对 Kafka 消息进行缓冲,并按批次写入数据库,以提高吞吐量;当写入失败时,系统 SHALL 执行连接恢复重试与降级策略,但不在运行时创建数据库分区。
|
||||||
|
|
||||||
|
#### Scenario: 批量写入
|
||||||
|
- **GIVEN** 短时间内收到多条消息 (e.g., 500条)
|
||||||
|
- **WHEN** 缓冲区满或超时 (e.g., 200ms)
|
||||||
|
- **THEN** 执行一次批量数据库插入操作
|
||||||
|
|
||||||
|
#### Scenario: 写入失败降级
|
||||||
|
- **GIVEN** 批量写入因数据错误失败 (非连接错误)
|
||||||
|
- **WHEN** 捕获异常
|
||||||
|
- **THEN** 自动降级为逐条写入,以隔离错误数据并确保有效数据入库
|
||||||
|
|
||||||
|
#### Scenario: 分区缺失错误处理
|
||||||
|
- **GIVEN** 写入时数据库返回分区缺失错误
|
||||||
|
- **WHEN** 服务处理该错误
|
||||||
|
- **THEN** 服务记录错误并按既有错误处理机制处理,不在运行时执行分区创建
|
||||||
@@ -0,0 +1,12 @@
|
|||||||
|
## 1. Implementation
|
||||||
|
- [x] 1.1 Remove runtime DB initialization from bootstrap flow (`src/index.js`).
|
||||||
|
- [x] 1.2 Remove scheduled partition maintenance job from runtime service.
|
||||||
|
- [x] 1.3 Remove runtime missing-partition auto-fix behavior.
|
||||||
|
- [x] 1.4 Remove legacy DB provisioning modules and scripts from service project.
|
||||||
|
- [x] 1.5 Add external SQL/JS provisioning scripts under root `SQL_Script/` for DB/schema/partition management.
|
||||||
|
- [x] 1.6 Update project docs to point DB provisioning to `SQL_Script/`.
|
||||||
|
|
||||||
|
## 2. Validation
|
||||||
|
- [x] 2.1 Run `npm run lint` in `bls-onoffline-backend`.
|
||||||
|
- [x] 2.2 Run `npm run build` in `bls-onoffline-backend`.
|
||||||
|
- [x] 2.3 Run `openspec validate remove-runtime-db-provisioning --strict`.
|
||||||
@@ -0,0 +1,13 @@
|
|||||||
|
# Change: add g5 room status sync toggle
|
||||||
|
|
||||||
|
## Why
|
||||||
|
当前 G5 `room_status.room_status_moment_g5` 同步是默认启用的,但新的运行要求需要默认关闭,避免未经确认就写入状态表。
|
||||||
|
|
||||||
|
## What Changes
|
||||||
|
- 新增一个环境变量开关,控制是否同步写入 G5 `room_status.room_status_moment_g5`。
|
||||||
|
- 默认值为 `false`,即不写入该表。
|
||||||
|
- 当开关开启时,保留现有按 `hotel_id + room_id` upsert 更新 `ip` 的行为。
|
||||||
|
|
||||||
|
## Impact
|
||||||
|
- Affected specs: `openspec/specs/onoffline/spec.md`
|
||||||
|
- Affected code: `src/config/config.js`, `src/index.js`, `.env`, `.env.example`
|
||||||
@@ -0,0 +1,14 @@
|
|||||||
|
## ADDED Requirements
|
||||||
|
|
||||||
|
### Requirement: G5 状态表同步开关
|
||||||
|
系统 SHALL 提供一个环境变量开关,用于控制是否向 G5 状态表 `room_status.room_status_moment_g5` 写入数据;该开关默认值 SHALL 为 `false`。
|
||||||
|
|
||||||
|
#### Scenario: 默认关闭
|
||||||
|
- **GIVEN** 未设置 G5 状态表同步开关
|
||||||
|
- **WHEN** 服务启动并处理 Kafka 消息
|
||||||
|
- **THEN** 系统不向 `room_status.room_status_moment_g5` 写入数据
|
||||||
|
|
||||||
|
#### Scenario: 显式开启
|
||||||
|
- **GIVEN** G5 状态表同步开关被设置为 `true`
|
||||||
|
- **WHEN** 服务处理 Kafka 消息
|
||||||
|
- **THEN** 系统恢复向 `room_status.room_status_moment_g5` 写入数据
|
||||||
@@ -0,0 +1,11 @@
|
|||||||
|
## 1. Implementation
|
||||||
|
- [x] 1.1 Add a config flag for G5 room_status synchronization.
|
||||||
|
- [x] 1.2 Disable `room_status.room_status_moment_g5` writes by default.
|
||||||
|
- [x] 1.3 Keep the existing upsert behavior when the flag is enabled.
|
||||||
|
- [x] 1.4 Update environment samples to set the flag to `false`.
|
||||||
|
|
||||||
|
## 2. Validation
|
||||||
|
- [x] 2.1 Run `npm run lint`.
|
||||||
|
- [x] 2.2 Run `npm run test`.
|
||||||
|
- [x] 2.3 Run `npm run build`.
|
||||||
|
- [x] 2.4 Run `openspec validate add-g5-room-status-sync-toggle --strict`.
|
||||||
@@ -0,0 +1,13 @@
|
|||||||
|
# Change: add restart current_status mapping
|
||||||
|
|
||||||
|
## Why
|
||||||
|
上游 Kafka 的 `CurrentStatus` 新增了 `restart` 值。现有处理逻辑仍按 `on/off` 处理,导致 `restart` 在入库时无法被正确标记为 3。
|
||||||
|
|
||||||
|
## What Changes
|
||||||
|
- 让 Kafka 解析链路接受 `CurrentStatus=restart`。
|
||||||
|
- 将 G5 入库链路中的 `current_status=restart` 映射为 `3`。
|
||||||
|
- 更新相关测试与 OpenSpec 说明,确保 `restart` 是受支持状态。
|
||||||
|
|
||||||
|
## Impact
|
||||||
|
- Affected specs: `openspec/specs/onoffline/spec.md`
|
||||||
|
- Affected code: `src/processor/index.js`, `src/db/g5DatabaseManager.js`, `tests/processor.test.js`
|
||||||
@@ -0,0 +1,14 @@
|
|||||||
|
## MODIFIED Requirements
|
||||||
|
|
||||||
|
### Requirement: 重启数据处理
|
||||||
|
系统 SHALL 在 `CurrentStatus` 为 `restart` 时将 `current_status` 保留为 `restart`,并在 G5 入库链路中映射为 `3`。
|
||||||
|
|
||||||
|
#### Scenario: restart 状态写入
|
||||||
|
- **GIVEN** Kafka 消息中的 `CurrentStatus` 为 `restart`
|
||||||
|
- **WHEN** 消息被处理并写入数据库
|
||||||
|
- **THEN** 普通入库链路保留 `restart`,G5 入库链路将其写入为 `3`
|
||||||
|
|
||||||
|
#### Scenario: 其他状态保持原样
|
||||||
|
- **GIVEN** Kafka 消息中的 `CurrentStatus` 为 `on` 或 `off`
|
||||||
|
- **WHEN** 消息被处理并写入数据库
|
||||||
|
- **THEN** 系统按既有规则处理该状态值
|
||||||
@@ -0,0 +1,10 @@
|
|||||||
|
## 1. Implementation
|
||||||
|
- [x] 1.1 Update Kafka row building logic to preserve `restart` as a valid `current_status` value.
|
||||||
|
- [x] 1.2 Update G5 database mapping so `restart` maps to `3`.
|
||||||
|
- [x] 1.3 Update processor tests for the `restart` case.
|
||||||
|
- [x] 1.4 Update OpenSpec requirements for supported current status values.
|
||||||
|
|
||||||
|
## 2. Validation
|
||||||
|
- [x] 2.1 Run `npm run test`.
|
||||||
|
- [x] 2.2 Run `npm run build`.
|
||||||
|
- [x] 2.3 Run `openspec validate add-restart-current-status-mapping --strict`.
|
||||||
@@ -12,12 +12,17 @@
|
|||||||
- **THEN** current_status 等于 CurrentStatus (截断至 255 字符)
|
- **THEN** current_status 等于 CurrentStatus (截断至 255 字符)
|
||||||
|
|
||||||
### Requirement: 重启数据处理
|
### Requirement: 重启数据处理
|
||||||
系统 SHALL 在 RebootReason 非空时强制 current_status 为 on。
|
系统 SHALL 在 `CurrentStatus` 为 `restart` 时将 `current_status` 保留为 `restart`,并在 G5 入库链路中映射为 `3`。
|
||||||
|
|
||||||
#### Scenario: 重启数据写入
|
#### Scenario: restart 状态写入
|
||||||
- **GIVEN** RebootReason 为非空值
|
- **GIVEN** Kafka 消息中的 `CurrentStatus` 为 `restart`
|
||||||
- **WHEN** 消息被处理
|
- **WHEN** 消息被处理并写入数据库
|
||||||
- **THEN** current_status 等于 on
|
- **THEN** 普通入库链路保留 `restart`,G5 入库链路将其写入为 `3`
|
||||||
|
|
||||||
|
#### Scenario: 其他状态保持原样
|
||||||
|
- **GIVEN** Kafka 消息中的 `CurrentStatus` 为 `on` 或 `off`
|
||||||
|
- **WHEN** 消息被处理并写入数据库
|
||||||
|
- **THEN** 系统按既有规则处理该状态值
|
||||||
|
|
||||||
### Requirement: 空值保留
|
### Requirement: 空值保留
|
||||||
系统 SHALL 保留上游空值,不对字段进行补 0。
|
系统 SHALL 保留上游空值,不对字段进行补 0。
|
||||||
@@ -28,12 +33,12 @@
|
|||||||
- **THEN** 数据库存储值为对应的空字符串
|
- **THEN** 数据库存储值为对应的空字符串
|
||||||
|
|
||||||
### Requirement: 数据库分区策略
|
### Requirement: 数据库分区策略
|
||||||
系统 SHALL 使用 Range Partitioning 按天分区,并自动维护未来 30 天的分区表。
|
系统 SHALL 使用 Range Partitioning 按天分区,并自动维护未来 30 天的分区表,子表依赖 PostgreSQL 原生机制继承主表索引。
|
||||||
|
|
||||||
#### Scenario: 分区预创建
|
#### Scenario: 分区预创建
|
||||||
- **GIVEN** 系统启动或每日凌晨
|
- **GIVEN** 系统启动或每日凌晨
|
||||||
- **WHEN** 运行分区维护任务
|
- **WHEN** 运行分区维护任务
|
||||||
- **THEN** 确保数据库中存在未来 30 天的分区表
|
- **THEN** 确保数据库中存在未来 30 天的分区表,无需对子表显式创建单列表索引
|
||||||
|
|
||||||
### Requirement: 消费可靠性 (At-Least-Once)
|
### Requirement: 消费可靠性 (At-Least-Once)
|
||||||
系统 SHALL 仅在数据成功写入数据库后,才向 Kafka 提交消费位点。
|
系统 SHALL 仅在数据成功写入数据库后,才向 Kafka 提交消费位点。
|
||||||
@@ -84,7 +89,7 @@
|
|||||||
- **THEN** 自动乘以 1000 转换为毫秒
|
- **THEN** 自动乘以 1000 转换为毫秒
|
||||||
|
|
||||||
### Requirement: 批量消费与写入
|
### Requirement: 批量消费与写入
|
||||||
系统 SHALL 对 Kafka 消息进行缓冲,并按批次写入数据库,以提高吞吐量。
|
系统 SHALL 对 Kafka 消息进行缓冲,并按批次写入数据库,以提高吞吐量;当写入失败时,系统 SHALL 执行连接恢复重试与降级策略,但不在运行时创建数据库分区。
|
||||||
|
|
||||||
#### Scenario: 批量写入
|
#### Scenario: 批量写入
|
||||||
- **GIVEN** 短时间内收到多条消息 (e.g., 500条)
|
- **GIVEN** 短时间内收到多条消息 (e.g., 500条)
|
||||||
@@ -96,3 +101,21 @@
|
|||||||
- **WHEN** 捕获异常
|
- **WHEN** 捕获异常
|
||||||
- **THEN** 自动降级为逐条写入,以隔离错误数据并确保有效数据入库
|
- **THEN** 自动降级为逐条写入,以隔离错误数据并确保有效数据入库
|
||||||
|
|
||||||
|
#### Scenario: 分区缺失错误处理
|
||||||
|
- **GIVEN** 写入时数据库返回分区缺失错误
|
||||||
|
- **WHEN** 服务处理该错误
|
||||||
|
- **THEN** 服务记录错误并按既有错误处理机制处理,不在运行时执行分区创建
|
||||||
|
|
||||||
|
### Requirement: G5 状态表同步开关
|
||||||
|
系统 SHALL 提供一个环境变量开关,用于控制是否向 G5 状态表 `room_status.room_status_moment_g5` 写入数据;该开关默认值 SHALL 为 `false`。
|
||||||
|
|
||||||
|
#### Scenario: 默认关闭
|
||||||
|
- **GIVEN** 未设置 G5 状态表同步开关
|
||||||
|
- **WHEN** 服务启动并处理 Kafka 消息
|
||||||
|
- **THEN** 系统不向 `room_status.room_status_moment_g5` 写入数据
|
||||||
|
|
||||||
|
#### Scenario: 显式开启
|
||||||
|
- **GIVEN** G5 状态表同步开关被设置为 `true`
|
||||||
|
- **WHEN** 服务处理 Kafka 消息
|
||||||
|
- **THEN** 系统恢复向 `room_status.room_status_moment_g5` 写入数据
|
||||||
|
|
||||||
|
|||||||
@@ -1,23 +0,0 @@
|
|||||||
CREATE SCHEMA IF NOT EXISTS onoffline;
|
|
||||||
|
|
||||||
CREATE TABLE IF NOT EXISTS onoffline.onoffline_record (
|
|
||||||
guid VARCHAR(32) NOT NULL,
|
|
||||||
ts_ms BIGINT NOT NULL,
|
|
||||||
write_ts_ms BIGINT NOT NULL,
|
|
||||||
hotel_id SMALLINT NOT NULL,
|
|
||||||
mac VARCHAR(21) NOT NULL,
|
|
||||||
device_id VARCHAR(64) NOT NULL,
|
|
||||||
room_id VARCHAR(64) NOT NULL,
|
|
||||||
ip VARCHAR(21),
|
|
||||||
current_status VARCHAR(255),
|
|
||||||
launcher_version VARCHAR(255),
|
|
||||||
reboot_reason VARCHAR(255),
|
|
||||||
PRIMARY KEY (ts_ms, mac, device_id, room_id)
|
|
||||||
) PARTITION BY RANGE (ts_ms);
|
|
||||||
|
|
||||||
CREATE INDEX IF NOT EXISTS idx_onoffline_ts_ms ON onoffline.onoffline_record (ts_ms);
|
|
||||||
CREATE INDEX IF NOT EXISTS idx_onoffline_hotel_id ON onoffline.onoffline_record (hotel_id);
|
|
||||||
CREATE INDEX IF NOT EXISTS idx_onoffline_mac ON onoffline.onoffline_record (mac);
|
|
||||||
CREATE INDEX IF NOT EXISTS idx_onoffline_device_id ON onoffline.onoffline_record (device_id);
|
|
||||||
CREATE INDEX IF NOT EXISTS idx_onoffline_room_id ON onoffline.onoffline_record (room_id);
|
|
||||||
CREATE INDEX IF NOT EXISTS idx_onoffline_current_status ON onoffline.onoffline_record (current_status);
|
|
||||||
@@ -1,61 +0,0 @@
|
|||||||
import pg from 'pg';
|
|
||||||
import { config } from '../src/config/config.js';
|
|
||||||
|
|
||||||
const { Pool } = pg;
|
|
||||||
|
|
||||||
const verify = async () => {
|
|
||||||
const pool = new Pool({
|
|
||||||
host: config.db.host,
|
|
||||||
port: config.db.port,
|
|
||||||
user: config.db.user,
|
|
||||||
password: config.db.password,
|
|
||||||
database: config.db.database,
|
|
||||||
ssl: config.db.ssl,
|
|
||||||
});
|
|
||||||
|
|
||||||
try {
|
|
||||||
console.log('Verifying partitions for table onoffline_record...');
|
|
||||||
const client = await pool.connect();
|
|
||||||
|
|
||||||
// Check parent table
|
|
||||||
const parentRes = await client.query(`
|
|
||||||
SELECT to_regclass('onoffline.onoffline_record') as oid;
|
|
||||||
`);
|
|
||||||
if (!parentRes.rows[0].oid) {
|
|
||||||
console.error('Parent table onoffline.onoffline_record DOES NOT EXIST.');
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
console.log('Parent table onoffline.onoffline_record exists.');
|
|
||||||
|
|
||||||
// Check partitions
|
|
||||||
const res = await client.query(`
|
|
||||||
SELECT
|
|
||||||
child.relname AS partition_name
|
|
||||||
FROM pg_inherits
|
|
||||||
JOIN pg_class parent ON pg_inherits.inhparent = parent.oid
|
|
||||||
JOIN pg_class child ON pg_inherits.inhrelid = child.oid
|
|
||||||
JOIN pg_namespace ns ON parent.relnamespace = ns.oid
|
|
||||||
WHERE parent.relname = 'onoffline_record' AND ns.nspname = 'onoffline'
|
|
||||||
ORDER BY child.relname;
|
|
||||||
`);
|
|
||||||
|
|
||||||
console.log(`Found ${res.rowCount} partitions.`);
|
|
||||||
res.rows.forEach(row => {
|
|
||||||
console.log(`- ${row.partition_name}`);
|
|
||||||
});
|
|
||||||
|
|
||||||
if (res.rowCount >= 30) {
|
|
||||||
console.log('SUCCESS: At least 30 partitions exist.');
|
|
||||||
} else {
|
|
||||||
console.warn(`WARNING: Expected 30+ partitions, found ${res.rowCount}.`);
|
|
||||||
}
|
|
||||||
|
|
||||||
client.release();
|
|
||||||
} catch (err) {
|
|
||||||
console.error('Verification failed:', err);
|
|
||||||
} finally {
|
|
||||||
await pool.end();
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
verify();
|
|
||||||
@@ -35,7 +35,16 @@ Topic:blwlog4Nodejs-rcu-onoffline-topic
|
|||||||
主键:(ts_ms, mac, device_id, room_id)
|
主键:(ts_ms, mac, device_id, room_id)
|
||||||
按 ts_ms 每日分区
|
按 ts_ms 每日分区
|
||||||
|
|
||||||
|
G5库结构(双写,临时接入):
|
||||||
|
库同为:log_platform
|
||||||
|
表:onoffline_record_g5
|
||||||
|
差异字段:
|
||||||
|
- guid 为 int4,由库自己生成。
|
||||||
|
- record_source 固定为 CRICS。
|
||||||
|
- current_status 为 int2,on映射为1,off映射为2,restart映射为3,其余为0。
|
||||||
|
支持通过环境变量开关双写。
|
||||||
|
|
||||||
4. 数据处理规则
|
4. 数据处理规则
|
||||||
非重启数据:reboot_reason 为空或不存在,current_status 取 CurrentStatus
|
非重启数据:reboot_reason 为空或不存在,current_status 取 CurrentStatus
|
||||||
重启数据:reboot_reason 不为空,current_status 固定为 on
|
重启数据:reboot_reason 不为空时保留 Kafka 上游 current_status 值;若上游值为 restart,则入库标记为 restart,G5 库映射为 3
|
||||||
其余字段直接按 Kafka 原值落库,空值不补 0
|
其余字段直接按 Kafka 原值落库,空值不补 0
|
||||||
|
|||||||
@@ -13,6 +13,12 @@ const parseList = (value) =>
|
|||||||
.map((item) => item.trim())
|
.map((item) => item.trim())
|
||||||
.filter(Boolean);
|
.filter(Boolean);
|
||||||
|
|
||||||
|
const parseBoolean = (value, defaultValue) => {
|
||||||
|
if (value === undefined || value === null || value === '') return defaultValue;
|
||||||
|
if (typeof value === 'boolean') return value;
|
||||||
|
return value === 'true' || value === '1';
|
||||||
|
};
|
||||||
|
|
||||||
export const config = {
|
export const config = {
|
||||||
env: process.env.NODE_ENV || 'development',
|
env: process.env.NODE_ENV || 'development',
|
||||||
port: parseNumber(process.env.PORT, 3001),
|
port: parseNumber(process.env.PORT, 3001),
|
||||||
@@ -39,6 +45,7 @@ export const config = {
|
|||||||
} : undefined
|
} : undefined
|
||||||
},
|
},
|
||||||
db: {
|
db: {
|
||||||
|
writeEnabled: parseBoolean(process.env.DB_WRITE_ENABLED, true),
|
||||||
host: process.env.DB_HOST || process.env.POSTGRES_HOST || 'localhost',
|
host: process.env.DB_HOST || process.env.POSTGRES_HOST || 'localhost',
|
||||||
port: parseNumber(process.env.DB_PORT || process.env.POSTGRES_PORT, 5432),
|
port: parseNumber(process.env.DB_PORT || process.env.POSTGRES_PORT, 5432),
|
||||||
user: process.env.DB_USER || process.env.POSTGRES_USER || 'postgres',
|
user: process.env.DB_USER || process.env.POSTGRES_USER || 'postgres',
|
||||||
@@ -49,6 +56,20 @@ export const config = {
|
|||||||
schema: process.env.DB_SCHEMA || 'onoffline',
|
schema: process.env.DB_SCHEMA || 'onoffline',
|
||||||
table: process.env.DB_TABLE || 'onoffline_record'
|
table: process.env.DB_TABLE || 'onoffline_record'
|
||||||
},
|
},
|
||||||
|
g5db: {
|
||||||
|
writeEnabled: parseBoolean(process.env.G5_DB_WRITE_ENABLED, true),
|
||||||
|
enabled: !!process.env.POSTGRES_HOST_G5,
|
||||||
|
roomStatusMomentSyncEnabled: parseBoolean(process.env.G5_ROOM_STATUS_MOMENT_SYNC_ENABLED, false),
|
||||||
|
host: process.env.POSTGRES_HOST_G5,
|
||||||
|
port: parseNumber(process.env.POSTGRES_PORT_G5, 5434),
|
||||||
|
user: process.env.POSTGRES_USER_G5,
|
||||||
|
password: process.env.POSTGRES_PASSWORD_G5,
|
||||||
|
database: process.env.POSTGRES_DATABASE_G5,
|
||||||
|
max: parseNumber(process.env.POSTGRES_MAX_CONNECTIONS_G5, 3),
|
||||||
|
ssl: process.env.POSTGRES_SSL_G5 === 'true' ? { rejectUnauthorized: false } : undefined,
|
||||||
|
schema: process.env.DB_SCHEMA_G5 || 'onoffline',
|
||||||
|
table: process.env.DB_TABLE_G5 || 'onoffline_record_g5'
|
||||||
|
},
|
||||||
redis: {
|
redis: {
|
||||||
host: process.env.REDIS_HOST || 'localhost',
|
host: process.env.REDIS_HOST || 'localhost',
|
||||||
port: parseNumber(process.env.REDIS_PORT, 6379),
|
port: parseNumber(process.env.REDIS_PORT, 6379),
|
||||||
|
|||||||
202
bls-onoffline-backend/src/db/g5DatabaseManager.js
Normal file
202
bls-onoffline-backend/src/db/g5DatabaseManager.js
Normal file
@@ -0,0 +1,202 @@
|
|||||||
|
import pg from 'pg';
|
||||||
|
import { config } from '../config/config.js';
|
||||||
|
import { logger } from '../utils/logger.js';
|
||||||
|
|
||||||
|
const { Pool } = pg;
|
||||||
|
|
||||||
|
const g5Columns = [
|
||||||
|
'ts_ms',
|
||||||
|
'write_ts_ms',
|
||||||
|
'hotel_id',
|
||||||
|
'mac',
|
||||||
|
'device_id',
|
||||||
|
'room_id',
|
||||||
|
'ip',
|
||||||
|
'current_status',
|
||||||
|
'launcher_version',
|
||||||
|
'reboot_reason',
|
||||||
|
'record_source'
|
||||||
|
];
|
||||||
|
|
||||||
|
const roomStatusSyncSchema = 'room_status';
|
||||||
|
const roomStatusSyncTable = 'room_status_moment_g5';
|
||||||
|
|
||||||
|
export const mapCurrentStatusToG5Code = (value) => {
|
||||||
|
if (value === 1 || value === 2 || value === 3) {
|
||||||
|
return value;
|
||||||
|
}
|
||||||
|
|
||||||
|
const normalized = typeof value === 'string' ? value.trim().toLowerCase() : '';
|
||||||
|
if (normalized === 'on') return 1;
|
||||||
|
if (normalized === 'off') return 2;
|
||||||
|
if (normalized === 'restart') return 3;
|
||||||
|
return 0;
|
||||||
|
};
|
||||||
|
|
||||||
|
export const dedupeRoomStatusSyncRows = (rows) => {
|
||||||
|
const uniqueRows = new Map();
|
||||||
|
for (const row of rows || []) {
|
||||||
|
const hotelId = Number(row?.hotel_id);
|
||||||
|
const roomId = row?.room_id ?? null;
|
||||||
|
if (!Number.isFinite(hotelId) || roomId === null || roomId === '') {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
const key = `${hotelId}@@${String(roomId)}`;
|
||||||
|
if (!uniqueRows.has(key)) {
|
||||||
|
uniqueRows.set(key, {
|
||||||
|
hotel_id: hotelId,
|
||||||
|
room_id: String(roomId),
|
||||||
|
ip: row?.ip ?? null
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return Array.from(uniqueRows.values());
|
||||||
|
};
|
||||||
|
|
||||||
|
export class G5DatabaseManager {
|
||||||
|
constructor(dbConfig) {
|
||||||
|
if (!dbConfig.enabled) return;
|
||||||
|
this.pool = new Pool({
|
||||||
|
host: dbConfig.host,
|
||||||
|
port: dbConfig.port,
|
||||||
|
user: dbConfig.user,
|
||||||
|
password: dbConfig.password,
|
||||||
|
database: dbConfig.database,
|
||||||
|
max: dbConfig.max,
|
||||||
|
ssl: dbConfig.ssl
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
async insertRows({ schema, table, rows }) {
|
||||||
|
if (!this.pool || !rows || rows.length === 0) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const statement = `
|
||||||
|
INSERT INTO ${schema}.${table} (${g5Columns.join(', ')})
|
||||||
|
SELECT *
|
||||||
|
FROM UNNEST(
|
||||||
|
$1::int8[],
|
||||||
|
$2::int8[],
|
||||||
|
$3::int2[],
|
||||||
|
$4::text[],
|
||||||
|
$5::text[],
|
||||||
|
$6::text[],
|
||||||
|
$7::text[],
|
||||||
|
$8::int2[],
|
||||||
|
$9::text[],
|
||||||
|
$10::text[],
|
||||||
|
$11::text[]
|
||||||
|
)
|
||||||
|
ON CONFLICT DO NOTHING
|
||||||
|
`;
|
||||||
|
|
||||||
|
try {
|
||||||
|
const params = g5Columns.map((column) => {
|
||||||
|
return rows.map((row) => {
|
||||||
|
if (column === 'record_source') {
|
||||||
|
return 'CRICS';
|
||||||
|
}
|
||||||
|
if (column === 'current_status') {
|
||||||
|
// current_status in G5 is int2
|
||||||
|
return mapCurrentStatusToG5Code(row.current_status);
|
||||||
|
}
|
||||||
|
return row[column] ?? null;
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
await this.pool.query(statement, params);
|
||||||
|
} catch (error) {
|
||||||
|
logger.error('G5 Database insert failed', {
|
||||||
|
error: error?.message,
|
||||||
|
schema,
|
||||||
|
table,
|
||||||
|
rowsLength: rows.length
|
||||||
|
});
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async upsertRoomStatusMomentIp(rows) {
|
||||||
|
if (!this.pool || !rows || rows.length === 0) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const syncRows = dedupeRoomStatusSyncRows(rows);
|
||||||
|
if (syncRows.length === 0) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const sql = `
|
||||||
|
WITH input_rows AS (
|
||||||
|
SELECT *
|
||||||
|
FROM UNNEST($1::int2[], $2::text[], $3::text[])
|
||||||
|
AS t(hotel_id, room_id, ip)
|
||||||
|
), matched_rows AS (
|
||||||
|
SELECT i.hotel_id, i.room_id, i.ip
|
||||||
|
FROM input_rows i
|
||||||
|
JOIN LATERAL (
|
||||||
|
SELECT r.hotel_id, r.room_id
|
||||||
|
FROM ${roomStatusSyncSchema}.${roomStatusSyncTable} r
|
||||||
|
WHERE r.hotel_id = i.hotel_id
|
||||||
|
AND r.room_id = i.room_id
|
||||||
|
LIMIT 1
|
||||||
|
) m ON TRUE
|
||||||
|
)
|
||||||
|
INSERT INTO ${roomStatusSyncSchema}.${roomStatusSyncTable} (hotel_id, room_id, ip)
|
||||||
|
SELECT hotel_id, room_id, ip
|
||||||
|
FROM matched_rows
|
||||||
|
ON CONFLICT (hotel_id, room_id)
|
||||||
|
DO UPDATE SET ip = EXCLUDED.ip
|
||||||
|
`;
|
||||||
|
|
||||||
|
try {
|
||||||
|
await this.pool.query(sql, [
|
||||||
|
syncRows.map((row) => row.hotel_id),
|
||||||
|
syncRows.map((row) => row.room_id),
|
||||||
|
syncRows.map((row) => row.ip)
|
||||||
|
]);
|
||||||
|
} catch (error) {
|
||||||
|
logger.error('G5 room_status_moment_g5 upsert failed', {
|
||||||
|
error: error?.message,
|
||||||
|
rowsLength: syncRows.length
|
||||||
|
});
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async checkConnection() {
|
||||||
|
if (!this.pool) return true; // Pretend it's ok if disabled
|
||||||
|
let client;
|
||||||
|
try {
|
||||||
|
const connectPromise = this.pool.connect();
|
||||||
|
const timeoutPromise = new Promise((_, reject) => {
|
||||||
|
setTimeout(() => reject(new Error('Connection timeout')), 5000);
|
||||||
|
});
|
||||||
|
try {
|
||||||
|
client = await Promise.race([connectPromise, timeoutPromise]);
|
||||||
|
} catch (raceError) {
|
||||||
|
connectPromise.then(c => c.release()).catch(() => { });
|
||||||
|
throw raceError;
|
||||||
|
}
|
||||||
|
await client.query('SELECT 1');
|
||||||
|
return true;
|
||||||
|
} catch (err) {
|
||||||
|
logger.error('G5 Database check connection failed', { error: err.message });
|
||||||
|
return false;
|
||||||
|
} finally {
|
||||||
|
if (client) {
|
||||||
|
client.release();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async close() {
|
||||||
|
if (this.pool) {
|
||||||
|
await this.pool.end();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const g5DbManager = new G5DatabaseManager(config.g5db);
|
||||||
|
export default g5DbManager;
|
||||||
@@ -1,100 +0,0 @@
|
|||||||
import pg from 'pg';
|
|
||||||
import fs from 'fs';
|
|
||||||
import path from 'path';
|
|
||||||
import { fileURLToPath } from 'url';
|
|
||||||
import { logger } from '../utils/logger.js';
|
|
||||||
import partitionManager from './partitionManager.js';
|
|
||||||
import dbManager from './databaseManager.js';
|
|
||||||
import { config } from '../config/config.js';
|
|
||||||
|
|
||||||
const __filename = fileURLToPath(import.meta.url);
|
|
||||||
const __dirname = path.dirname(__filename);
|
|
||||||
|
|
||||||
class DatabaseInitializer {
|
|
||||||
async initialize() {
|
|
||||||
logger.info('Starting database initialization check...');
|
|
||||||
|
|
||||||
// 1. Check if database exists, create if not
|
|
||||||
await this.ensureDatabaseExists();
|
|
||||||
|
|
||||||
// 2. Initialize Schema and Parent Table (if not exists)
|
|
||||||
// Note: We need to use dbManager because it connects to the target database
|
|
||||||
await this.ensureSchemaAndTable();
|
|
||||||
|
|
||||||
// 3. Ensure Partitions for the next month
|
|
||||||
await partitionManager.ensurePartitions(30);
|
|
||||||
|
|
||||||
console.log('Database initialization completed successfully.');
|
|
||||||
logger.info('Database initialization completed successfully.');
|
|
||||||
}
|
|
||||||
|
|
||||||
async ensureDatabaseExists() {
|
|
||||||
const { host, port, user, password, database, ssl } = config.db;
|
|
||||||
console.log(`Checking if database '${database}' exists at ${host}:${port}...`);
|
|
||||||
|
|
||||||
// Connect to 'postgres' database to check/create target database
|
|
||||||
const client = new pg.Client({
|
|
||||||
host,
|
|
||||||
port,
|
|
||||||
user,
|
|
||||||
password,
|
|
||||||
database: 'postgres',
|
|
||||||
ssl: ssl ? { rejectUnauthorized: false } : false
|
|
||||||
});
|
|
||||||
|
|
||||||
try {
|
|
||||||
await client.connect();
|
|
||||||
|
|
||||||
const checkRes = await client.query(
|
|
||||||
`SELECT 1 FROM pg_database WHERE datname = $1`,
|
|
||||||
[database]
|
|
||||||
);
|
|
||||||
|
|
||||||
if (checkRes.rowCount === 0) {
|
|
||||||
logger.info(`Database '${database}' does not exist. Creating...`);
|
|
||||||
// CREATE DATABASE cannot run inside a transaction block
|
|
||||||
await client.query(`CREATE DATABASE "${database}"`);
|
|
||||||
console.log(`Database '${database}' created.`);
|
|
||||||
logger.info(`Database '${database}' created.`);
|
|
||||||
} else {
|
|
||||||
console.log(`Database '${database}' already exists.`);
|
|
||||||
logger.info(`Database '${database}' already exists.`);
|
|
||||||
}
|
|
||||||
} catch (err) {
|
|
||||||
logger.error('Error ensuring database exists:', err);
|
|
||||||
throw err;
|
|
||||||
} finally {
|
|
||||||
await client.end();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async ensureSchemaAndTable() {
|
|
||||||
// dbManager connects to the target database
|
|
||||||
const client = await dbManager.pool.connect();
|
|
||||||
try {
|
|
||||||
const sqlPathCandidates = [
|
|
||||||
path.resolve(process.cwd(), 'scripts/init_db.sql'),
|
|
||||||
path.resolve(__dirname, '../scripts/init_db.sql'),
|
|
||||||
path.resolve(__dirname, '../../scripts/init_db.sql')
|
|
||||||
];
|
|
||||||
const sqlPath = sqlPathCandidates.find((candidate) => fs.existsSync(candidate));
|
|
||||||
if (!sqlPath) {
|
|
||||||
throw new Error(`init_db.sql not found. Candidates: ${sqlPathCandidates.join(' | ')}`);
|
|
||||||
}
|
|
||||||
const sql = fs.readFileSync(sqlPath, 'utf8');
|
|
||||||
|
|
||||||
console.log(`Executing init_db.sql from ${sqlPath}...`);
|
|
||||||
logger.info('Executing init_db.sql...');
|
|
||||||
await client.query(sql);
|
|
||||||
console.log('Schema and parent table initialized.');
|
|
||||||
logger.info('Schema and parent table initialized.');
|
|
||||||
} catch (err) {
|
|
||||||
logger.error('Error initializing schema and table:', err);
|
|
||||||
throw err;
|
|
||||||
} finally {
|
|
||||||
client.release();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
export default new DatabaseInitializer();
|
|
||||||
@@ -1,124 +0,0 @@
|
|||||||
import { logger } from '../utils/logger.js';
|
|
||||||
import { config } from '../config/config.js';
|
|
||||||
import dbManager from './databaseManager.js';
|
|
||||||
|
|
||||||
class PartitionManager {
|
|
||||||
/**
|
|
||||||
* Calculate the start and end timestamps (milliseconds) for a given date.
|
|
||||||
* @param {Date} date - The date to calculate for.
|
|
||||||
* @returns {Object} { startMs, endMs, partitionSuffix }
|
|
||||||
*/
|
|
||||||
getPartitionInfo(date) {
|
|
||||||
const yyyy = date.getFullYear();
|
|
||||||
const mm = String(date.getMonth() + 1).padStart(2, '0');
|
|
||||||
const dd = String(date.getDate()).padStart(2, '0');
|
|
||||||
const partitionSuffix = `${yyyy}${mm}${dd}`;
|
|
||||||
|
|
||||||
const start = new Date(date);
|
|
||||||
start.setHours(0, 0, 0, 0);
|
|
||||||
const startMs = start.getTime();
|
|
||||||
|
|
||||||
const end = new Date(date);
|
|
||||||
end.setDate(end.getDate() + 1);
|
|
||||||
end.setHours(0, 0, 0, 0);
|
|
||||||
const endMs = end.getTime();
|
|
||||||
|
|
||||||
return { startMs, endMs, partitionSuffix };
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Ensure partitions exist for the past M days and next N days.
|
|
||||||
* @param {number} daysAhead - Number of days to pre-create.
|
|
||||||
* @param {number} daysBack - Number of days to look back.
|
|
||||||
*/
|
|
||||||
async ensurePartitions(daysAhead = 30, daysBack = 15) {
|
|
||||||
const client = await dbManager.pool.connect();
|
|
||||||
try {
|
|
||||||
logger.info(`Starting partition check for the past ${daysBack} days and next ${daysAhead} days...`);
|
|
||||||
console.log(`Starting partition check for the past ${daysBack} days and next ${daysAhead} days...`);
|
|
||||||
const now = new Date();
|
|
||||||
|
|
||||||
for (let i = -daysBack; i < daysAhead; i++) {
|
|
||||||
const targetDate = new Date(now);
|
|
||||||
targetDate.setDate(now.getDate() + i);
|
|
||||||
|
|
||||||
const { startMs, endMs, partitionSuffix } = this.getPartitionInfo(targetDate);
|
|
||||||
const schema = config.db.schema;
|
|
||||||
const table = config.db.table;
|
|
||||||
const partitionName = `${schema}.${table}_${partitionSuffix}`;
|
|
||||||
|
|
||||||
// Check if partition exists
|
|
||||||
const checkSql = `
|
|
||||||
SELECT to_regclass($1) as exists;
|
|
||||||
`;
|
|
||||||
const checkRes = await client.query(checkSql, [partitionName]);
|
|
||||||
|
|
||||||
if (!checkRes.rows[0].exists) {
|
|
||||||
logger.info(`Creating partition ${partitionName} for range [${startMs}, ${endMs})`);
|
|
||||||
console.log(`Creating partition ${partitionName} for range [${startMs}, ${endMs})`);
|
|
||||||
const createSql = `
|
|
||||||
CREATE TABLE IF NOT EXISTS ${partitionName}
|
|
||||||
PARTITION OF ${schema}.${table}
|
|
||||||
FOR VALUES FROM (${startMs}) TO (${endMs});
|
|
||||||
`;
|
|
||||||
await client.query(createSql);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
logger.info('Partition check completed.');
|
|
||||||
} catch (err) {
|
|
||||||
logger.error('Error ensuring partitions:', err);
|
|
||||||
throw err;
|
|
||||||
} finally {
|
|
||||||
client.release();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async ensurePartitionsForTimestamps(tsMsList) {
|
|
||||||
if (!Array.isArray(tsMsList) || tsMsList.length === 0) return;
|
|
||||||
|
|
||||||
const uniqueSuffixes = new Set();
|
|
||||||
for (const ts of tsMsList) {
|
|
||||||
const numericTs = typeof ts === 'string' ? Number(ts) : ts;
|
|
||||||
if (!Number.isFinite(numericTs)) continue;
|
|
||||||
const date = new Date(numericTs);
|
|
||||||
if (Number.isNaN(date.getTime())) continue;
|
|
||||||
const { partitionSuffix } = this.getPartitionInfo(date);
|
|
||||||
uniqueSuffixes.add(partitionSuffix);
|
|
||||||
if (uniqueSuffixes.size >= 400) break;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (uniqueSuffixes.size === 0) return;
|
|
||||||
|
|
||||||
const client = await dbManager.pool.connect();
|
|
||||||
try {
|
|
||||||
const schema = config.db.schema;
|
|
||||||
const table = config.db.table;
|
|
||||||
|
|
||||||
for (const partitionSuffix of uniqueSuffixes) {
|
|
||||||
const yyyy = Number(partitionSuffix.slice(0, 4));
|
|
||||||
const mm = Number(partitionSuffix.slice(4, 6));
|
|
||||||
const dd = Number(partitionSuffix.slice(6, 8));
|
|
||||||
if (!Number.isFinite(yyyy) || !Number.isFinite(mm) || !Number.isFinite(dd)) continue;
|
|
||||||
const targetDate = new Date(yyyy, mm - 1, dd);
|
|
||||||
if (Number.isNaN(targetDate.getTime())) continue;
|
|
||||||
|
|
||||||
const { startMs, endMs } = this.getPartitionInfo(targetDate);
|
|
||||||
const partitionName = `${schema}.${table}_${partitionSuffix}`;
|
|
||||||
|
|
||||||
const checkRes = await client.query(`SELECT to_regclass($1) as exists;`, [partitionName]);
|
|
||||||
if (!checkRes.rows[0].exists) {
|
|
||||||
logger.info(`Creating partition ${partitionName} for range [${startMs}, ${endMs})`);
|
|
||||||
await client.query(`
|
|
||||||
CREATE TABLE IF NOT EXISTS ${partitionName}
|
|
||||||
PARTITION OF ${schema}.${table}
|
|
||||||
FOR VALUES FROM (${startMs}) TO (${endMs});
|
|
||||||
`);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
client.release();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
export default new PartitionManager();
|
|
||||||
@@ -1,8 +1,7 @@
|
|||||||
import cron from 'node-cron';
|
import cron from 'node-cron';
|
||||||
import { config } from './config/config.js';
|
import { config } from './config/config.js';
|
||||||
import dbManager from './db/databaseManager.js';
|
import dbManager from './db/databaseManager.js';
|
||||||
import dbInitializer from './db/initializer.js';
|
import g5DbManager from './db/g5DatabaseManager.js';
|
||||||
import partitionManager from './db/partitionManager.js';
|
|
||||||
import { createKafkaConsumers } from './kafka/consumer.js';
|
import { createKafkaConsumers } from './kafka/consumer.js';
|
||||||
import { parseMessageToRows } from './processor/index.js';
|
import { parseMessageToRows } from './processor/index.js';
|
||||||
import { createRedisClient } from './redis/redisClient.js';
|
import { createRedisClient } from './redis/redisClient.js';
|
||||||
@@ -20,7 +19,13 @@ const bootstrap = async () => {
|
|||||||
port: config.db.port,
|
port: config.db.port,
|
||||||
user: config.db.user,
|
user: config.db.user,
|
||||||
database: config.db.database,
|
database: config.db.database,
|
||||||
schema: config.db.schema
|
schema: config.db.schema,
|
||||||
|
writeEnabled: config.db.writeEnabled
|
||||||
|
},
|
||||||
|
g5db: {
|
||||||
|
enabled: config.g5db.enabled,
|
||||||
|
writeEnabled: config.g5db.writeEnabled,
|
||||||
|
roomStatusMomentSyncEnabled: config.g5db.roomStatusMomentSyncEnabled
|
||||||
},
|
},
|
||||||
kafka: {
|
kafka: {
|
||||||
brokers: config.kafka.brokers,
|
brokers: config.kafka.brokers,
|
||||||
@@ -33,38 +38,12 @@ const bootstrap = async () => {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
// 0. Initialize Database (Create DB, Schema, Table, Partitions)
|
|
||||||
await dbInitializer.initialize();
|
|
||||||
|
|
||||||
// Metric Collector
|
// Metric Collector
|
||||||
const metricCollector = new MetricCollector();
|
const metricCollector = new MetricCollector();
|
||||||
|
|
||||||
// 1. Setup Partition Maintenance Cron Job (Every day at 00:00)
|
|
||||||
cron.schedule('0 0 * * *', async () => {
|
|
||||||
logger.info('Running scheduled partition maintenance...');
|
|
||||||
try {
|
|
||||||
await partitionManager.ensurePartitions(30);
|
|
||||||
} catch (err) {
|
|
||||||
logger.error('Scheduled partition maintenance failed', err);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
// 1.1 Setup Metric Reporting Cron Job (Every minute)
|
// 1.1 Setup Metric Reporting Cron Job (Every minute)
|
||||||
// Moved after redisIntegration initialization
|
// Moved after redisIntegration initialization
|
||||||
|
|
||||||
|
|
||||||
// DatabaseManager is now a singleton exported instance, but let's keep consistency if possible
|
|
||||||
// In databaseManager.js it exports `dbManager` instance by default.
|
|
||||||
// The original code was `const dbManager = new DatabaseManager(config.db);` which implies it might have been a class export.
|
|
||||||
// Let's check `databaseManager.js` content.
|
|
||||||
// Wait, I imported `dbManager` from `./db/databaseManager.js`.
|
|
||||||
// If `databaseManager.js` exports an instance as default, I should use that.
|
|
||||||
// If it exports a class, I should instantiate it.
|
|
||||||
|
|
||||||
// Let's assume the previous code `new DatabaseManager` was correct if it was a class.
|
|
||||||
// BUT I used `dbManager.pool` in `partitionManager.js` assuming it's an instance.
|
|
||||||
// I need to verify `databaseManager.js`.
|
|
||||||
|
|
||||||
const redisClient = await createRedisClient(config.redis);
|
const redisClient = await createRedisClient(config.redis);
|
||||||
const redisIntegration = new RedisIntegration(
|
const redisIntegration = new RedisIntegration(
|
||||||
redisClient,
|
redisClient,
|
||||||
@@ -81,7 +60,7 @@ const bootstrap = async () => {
|
|||||||
const report = `[Metrics] Pulled:${metrics.kafka_pulled} ParseErr:${metrics.parse_error} Inserted:${metrics.db_inserted} Failed:${metrics.db_failed} FlushAvg:${flushAvgMs}ms DbAvg:${dbAvgMs}ms`;
|
const report = `[Metrics] Pulled:${metrics.kafka_pulled} ParseErr:${metrics.parse_error} Inserted:${metrics.db_inserted} Failed:${metrics.db_failed} FlushAvg:${flushAvgMs}ms DbAvg:${dbAvgMs}ms`;
|
||||||
console.log(report);
|
console.log(report);
|
||||||
logger.info(report);
|
logger.info(report);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
await redisIntegration.info('Minute Metrics', metrics);
|
await redisIntegration.info('Minute Metrics', metrics);
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
@@ -134,7 +113,7 @@ const bootstrap = async () => {
|
|||||||
const BATCH_SIZE = Math.max(10, Math.min(configuredBatchSize, configuredMaxInFlight));
|
const BATCH_SIZE = Math.max(10, Math.min(configuredBatchSize, configuredMaxInFlight));
|
||||||
const BATCH_TIMEOUT_MS = Math.max(1, configuredBatchTimeoutMs);
|
const BATCH_TIMEOUT_MS = Math.max(1, configuredBatchTimeoutMs);
|
||||||
const commitOnAttempt = config.kafka.commitOnAttempt === true;
|
const commitOnAttempt = config.kafka.commitOnAttempt === true;
|
||||||
|
|
||||||
const batchStates = new Map();
|
const batchStates = new Map();
|
||||||
|
|
||||||
const partitionKeyFromMessage = (message) => {
|
const partitionKeyFromMessage = (message) => {
|
||||||
@@ -199,16 +178,26 @@ const bootstrap = async () => {
|
|||||||
);
|
);
|
||||||
};
|
};
|
||||||
|
|
||||||
const isMissingPartitionError = (err) =>
|
|
||||||
err?.code === '23514' ||
|
|
||||||
(typeof err?.message === 'string' && err.message.includes('no partition of relation'));
|
|
||||||
|
|
||||||
const insertRowsWithRetry = async (rows) => {
|
const insertRowsWithRetry = async (rows) => {
|
||||||
const startedAt = Date.now();
|
const startedAt = Date.now();
|
||||||
let attemptedPartitionFix = false;
|
|
||||||
while (true) {
|
while (true) {
|
||||||
try {
|
try {
|
||||||
await dbManager.insertRows({ schema: config.db.schema, table: config.db.table, rows });
|
const promises = [];
|
||||||
|
if (config.db.writeEnabled) {
|
||||||
|
promises.push(dbManager.insertRows({ schema: config.db.schema, table: config.db.table, rows }));
|
||||||
|
}
|
||||||
|
if (config.g5db.writeEnabled && config.g5db.enabled) {
|
||||||
|
promises.push(g5DbManager.insertRows({ schema: config.g5db.schema, table: config.g5db.table, rows }).catch(e => {
|
||||||
|
logger.error('G5 Database insert failed but non-blocking', { error: e.message });
|
||||||
|
}));
|
||||||
|
if (config.g5db.roomStatusMomentSyncEnabled) {
|
||||||
|
promises.push(g5DbManager.upsertRoomStatusMomentIp(rows).catch(e => {
|
||||||
|
logger.error('G5 room_status_moment_g5 upsert failed but non-blocking', { error: e.message });
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
await Promise.all(promises);
|
||||||
|
|
||||||
metricCollector.increment('db_insert_count', 1);
|
metricCollector.increment('db_insert_count', 1);
|
||||||
metricCollector.increment('db_insert_ms_sum', Date.now() - startedAt);
|
metricCollector.increment('db_insert_ms_sum', Date.now() - startedAt);
|
||||||
return;
|
return;
|
||||||
@@ -222,24 +211,6 @@ const bootstrap = async () => {
|
|||||||
}
|
}
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if (isMissingPartitionError(err) && !attemptedPartitionFix) {
|
|
||||||
attemptedPartitionFix = true;
|
|
||||||
try {
|
|
||||||
await partitionManager.ensurePartitionsForTimestamps(rows.map(r => r.ts_ms));
|
|
||||||
} catch (partitionErr) {
|
|
||||||
if (isDbConnectionError(partitionErr)) {
|
|
||||||
logger.error('Database offline during partition ensure. Retrying in 5s...', { error: partitionErr.message });
|
|
||||||
await new Promise(r => setTimeout(r, 5000));
|
|
||||||
while (!(await dbManager.checkConnection())) {
|
|
||||||
logger.warn('Database still offline. Waiting 5s...');
|
|
||||||
await new Promise(r => setTimeout(r, 5000));
|
|
||||||
}
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
throw partitionErr;
|
|
||||||
}
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
throw err;
|
throw err;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -247,7 +218,21 @@ const bootstrap = async () => {
|
|||||||
|
|
||||||
const insertRowsOnce = async (rows) => {
|
const insertRowsOnce = async (rows) => {
|
||||||
const startedAt = Date.now();
|
const startedAt = Date.now();
|
||||||
await dbManager.insertRows({ schema: config.db.schema, table: config.db.table, rows });
|
const promises = [];
|
||||||
|
if (config.db.writeEnabled) {
|
||||||
|
promises.push(dbManager.insertRows({ schema: config.db.schema, table: config.db.table, rows }));
|
||||||
|
}
|
||||||
|
if (config.g5db.writeEnabled && config.g5db.enabled) {
|
||||||
|
promises.push(g5DbManager.insertRows({ schema: config.g5db.schema, table: config.g5db.table, rows }).catch(e => {
|
||||||
|
logger.error('G5 Database insert failed in insertOnce', { error: e.message });
|
||||||
|
}));
|
||||||
|
if (config.g5db.roomStatusMomentSyncEnabled) {
|
||||||
|
promises.push(g5DbManager.upsertRoomStatusMomentIp(rows).catch(e => {
|
||||||
|
logger.error('G5 room_status_moment_g5 upsert failed in insertOnce', { error: e.message });
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
await Promise.all(promises);
|
||||||
metricCollector.increment('db_insert_count', 1);
|
metricCollector.increment('db_insert_count', 1);
|
||||||
metricCollector.increment('db_insert_ms_sum', Date.now() - startedAt);
|
metricCollector.increment('db_insert_ms_sum', Date.now() - startedAt);
|
||||||
};
|
};
|
||||||
@@ -380,7 +365,7 @@ const bootstrap = async () => {
|
|||||||
for (const item of unresolvedItems) {
|
for (const item of unresolvedItems) {
|
||||||
try {
|
try {
|
||||||
await handleError(err, item.message);
|
await handleError(err, item.message);
|
||||||
} catch {}
|
} catch { }
|
||||||
item.resolve();
|
item.resolve();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -406,7 +391,7 @@ const bootstrap = async () => {
|
|||||||
metricCollector.increment('kafka_pulled');
|
metricCollector.increment('kafka_pulled');
|
||||||
metricCollector.incrementKeyed('kafka_pulled_by_partition', `${message.topic}-${message.partition}`, 1);
|
metricCollector.incrementKeyed('kafka_pulled_by_partition', `${message.topic}-${message.partition}`, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
// const messageValue = Buffer.isBuffer(message.value)
|
// const messageValue = Buffer.isBuffer(message.value)
|
||||||
// ? message.value.toString('utf8')
|
// ? message.value.toString('utf8')
|
||||||
// : message.value;
|
// : message.value;
|
||||||
@@ -422,7 +407,7 @@ const bootstrap = async () => {
|
|||||||
// value: config.kafka.logMessages ? messageValue : undefined,
|
// value: config.kafka.logMessages ? messageValue : undefined,
|
||||||
// valueLength: !config.kafka.logMessages && typeof messageValue === 'string' ? messageValue.length : null
|
// valueLength: !config.kafka.logMessages && typeof messageValue === 'string' ? messageValue.length : null
|
||||||
// };
|
// };
|
||||||
|
|
||||||
// logger.info('Kafka message received', logDetails);
|
// logger.info('Kafka message received', logDetails);
|
||||||
|
|
||||||
const partitionKey = partitionKeyFromMessage(message);
|
const partitionKey = partitionKeyFromMessage(message);
|
||||||
@@ -465,7 +450,7 @@ const bootstrap = async () => {
|
|||||||
// Graceful Shutdown Logic
|
// Graceful Shutdown Logic
|
||||||
const shutdown = async (signal) => {
|
const shutdown = async (signal) => {
|
||||||
logger.info(`Received ${signal}, shutting down...`);
|
logger.info(`Received ${signal}, shutting down...`);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// 1. Close Kafka Consumer
|
// 1. Close Kafka Consumer
|
||||||
if (consumers && consumers.length > 0) {
|
if (consumers && consumers.length > 0) {
|
||||||
@@ -480,8 +465,9 @@ const bootstrap = async () => {
|
|||||||
await redisClient.quit();
|
await redisClient.quit();
|
||||||
logger.info('Redis client closed');
|
logger.info('Redis client closed');
|
||||||
|
|
||||||
// 4. Close Database Pool
|
// 4. Close Database Pools
|
||||||
await dbManager.close();
|
await dbManager.close();
|
||||||
|
await g5DbManager.close();
|
||||||
logger.info('Database connection closed');
|
logger.info('Database connection closed');
|
||||||
|
|
||||||
process.exit(0);
|
process.exit(0);
|
||||||
|
|||||||
@@ -20,6 +20,16 @@ const normalizeText = (value, maxLength) => {
|
|||||||
return str;
|
return str;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
const normalizeCurrentStatus = (value) => {
|
||||||
|
const currentStatus = normalizeText(value, 255);
|
||||||
|
if (currentStatus === null) return null;
|
||||||
|
const normalized = currentStatus.toLowerCase();
|
||||||
|
if (normalized === 'on' || normalized === 'off' || normalized === 'restart') {
|
||||||
|
return normalized;
|
||||||
|
}
|
||||||
|
return currentStatus;
|
||||||
|
};
|
||||||
|
|
||||||
export const buildRowsFromMessageValue = (value) => {
|
export const buildRowsFromMessageValue = (value) => {
|
||||||
const payload = parseKafkaPayload(value);
|
const payload = parseKafkaPayload(value);
|
||||||
return buildRowsFromPayload(payload);
|
return buildRowsFromPayload(payload);
|
||||||
@@ -30,9 +40,7 @@ export const buildRowsFromPayload = (rawPayload) => {
|
|||||||
|
|
||||||
// Database limit is VARCHAR(255)
|
// Database limit is VARCHAR(255)
|
||||||
const rebootReason = normalizeText(payload.RebootReason, 255);
|
const rebootReason = normalizeText(payload.RebootReason, 255);
|
||||||
const currentStatusRaw = normalizeText(payload.CurrentStatus, 255);
|
const currentStatus = normalizeCurrentStatus(payload.CurrentStatus);
|
||||||
const hasRebootReason = rebootReason !== null && rebootReason !== '';
|
|
||||||
const currentStatus = hasRebootReason ? 'on' : currentStatusRaw;
|
|
||||||
|
|
||||||
// Derive timestamp: UnixTime -> CurrentTime -> Date.now()
|
// Derive timestamp: UnixTime -> CurrentTime -> Date.now()
|
||||||
let tsMs = payload.UnixTime;
|
let tsMs = payload.UnixTime;
|
||||||
|
|||||||
27
bls-onoffline-backend/tests/g5DatabaseManager.test.js
Normal file
27
bls-onoffline-backend/tests/g5DatabaseManager.test.js
Normal file
@@ -0,0 +1,27 @@
|
|||||||
|
import { describe, it, expect } from 'vitest';
|
||||||
|
import { dedupeRoomStatusSyncRows, mapCurrentStatusToG5Code } from '../src/db/g5DatabaseManager.js';
|
||||||
|
|
||||||
|
describe('G5 current_status mapping', () => {
|
||||||
|
it('maps on/off/restart to numeric codes', () => {
|
||||||
|
expect(mapCurrentStatusToG5Code('on')).toBe(1);
|
||||||
|
expect(mapCurrentStatusToG5Code('off')).toBe(2);
|
||||||
|
expect(mapCurrentStatusToG5Code('restart')).toBe(3);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('returns 0 for unknown values', () => {
|
||||||
|
expect(mapCurrentStatusToG5Code('idle')).toBe(0);
|
||||||
|
expect(mapCurrentStatusToG5Code(null)).toBe(0);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('dedupes room status sync rows by hotel_id and room_id using first row', () => {
|
||||||
|
const rows = dedupeRoomStatusSyncRows([
|
||||||
|
{ hotel_id: 101, room_id: '8001', ip: '10.0.0.1:1234' },
|
||||||
|
{ hotel_id: 101, room_id: '8001', ip: '10.0.0.2:5678' },
|
||||||
|
{ hotel_id: 101, room_id: '8002', ip: '10.0.0.3:9012' }
|
||||||
|
]);
|
||||||
|
|
||||||
|
expect(rows).toHaveLength(2);
|
||||||
|
expect(rows[0]).toEqual({ hotel_id: 101, room_id: '8001', ip: '10.0.0.1:1234' });
|
||||||
|
expect(rows[1]).toEqual({ hotel_id: 101, room_id: '8002', ip: '10.0.0.3:9012' });
|
||||||
|
});
|
||||||
|
});
|
||||||
@@ -26,13 +26,19 @@ describe('Processor Logic', () => {
|
|||||||
expect(rows[0].reboot_reason).toBeNull();
|
expect(rows[0].reboot_reason).toBeNull();
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should override current_status to on for reboot data', () => {
|
it('should preserve restart current_status for reboot data', () => {
|
||||||
const rows = buildRowsFromPayload({ ...basePayload, CurrentStatus: 'off', RebootReason: '0x01' });
|
const rows = buildRowsFromPayload({ ...basePayload, CurrentStatus: 'restart', RebootReason: '0x01' });
|
||||||
expect(rows).toHaveLength(1);
|
expect(rows).toHaveLength(1);
|
||||||
expect(rows[0].current_status).toBe('on');
|
expect(rows[0].current_status).toBe('restart');
|
||||||
expect(rows[0].reboot_reason).toBe('0x01');
|
expect(rows[0].reboot_reason).toBe('0x01');
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it('should preserve restart current_status for non-reboot data', () => {
|
||||||
|
const rows = buildRowsFromPayload({ ...basePayload, CurrentStatus: 'restart', RebootReason: null });
|
||||||
|
expect(rows).toHaveLength(1);
|
||||||
|
expect(rows[0].current_status).toBe('restart');
|
||||||
|
});
|
||||||
|
|
||||||
it('should keep empty optional fields as empty strings', () => {
|
it('should keep empty optional fields as empty strings', () => {
|
||||||
const rows = buildRowsFromPayload({
|
const rows = buildRowsFromPayload({
|
||||||
...basePayload,
|
...basePayload,
|
||||||
|
|||||||
42
docs/onoffline_record_g5.sql
Normal file
42
docs/onoffline_record_g5.sql
Normal file
@@ -0,0 +1,42 @@
|
|||||||
|
/*
|
||||||
|
Navicat Premium Dump SQL
|
||||||
|
|
||||||
|
Source Server : FnOS 80
|
||||||
|
Source Server Type : PostgreSQL
|
||||||
|
Source Server Version : 150017 (150017)
|
||||||
|
Source Host : 10.8.8.80:5434
|
||||||
|
Source Catalog : log_platform
|
||||||
|
Source Schema : onoffline
|
||||||
|
|
||||||
|
Target Server Type : PostgreSQL
|
||||||
|
Target Server Version : 150017 (150017)
|
||||||
|
File Encoding : 65001
|
||||||
|
|
||||||
|
Date: 10/03/2026 17:23:24
|
||||||
|
*/
|
||||||
|
|
||||||
|
|
||||||
|
-- ----------------------------
|
||||||
|
-- Table structure for onoffline_record_g5
|
||||||
|
-- ----------------------------
|
||||||
|
DROP TABLE IF EXISTS "onoffline"."onoffline_record_g5";
|
||||||
|
CREATE TABLE "onoffline"."onoffline_record_g5" (
|
||||||
|
"guid" int4 NOT NULL DEFAULT nextval('"onoffline".onoffline_record_g5_guid_seq'::regclass),
|
||||||
|
"ts_ms" int8 NOT NULL,
|
||||||
|
"write_ts_ms" int8 NOT NULL,
|
||||||
|
"hotel_id" int2 NOT NULL,
|
||||||
|
"mac" varchar(21) COLLATE "pg_catalog"."default" NOT NULL,
|
||||||
|
"device_id" varchar(64) COLLATE "pg_catalog"."default" NOT NULL,
|
||||||
|
"room_id" varchar(64) COLLATE "pg_catalog"."default" NOT NULL,
|
||||||
|
"ip" varchar(25) COLLATE "pg_catalog"."default",
|
||||||
|
"current_status" int2 NOT NULL DEFAULT 0,
|
||||||
|
"launcher_version" varchar(255) COLLATE "pg_catalog"."default",
|
||||||
|
"reboot_reason" varchar(255) COLLATE "pg_catalog"."default",
|
||||||
|
"record_source" varchar(50) COLLATE "pg_catalog"."default"
|
||||||
|
)
|
||||||
|
;
|
||||||
|
|
||||||
|
-- ----------------------------
|
||||||
|
-- Primary Key structure for table onoffline_record_g5
|
||||||
|
-- ----------------------------
|
||||||
|
ALTER TABLE "onoffline"."onoffline_record_g5" ADD CONSTRAINT "onoffline_record_g5_pkey" PRIMARY KEY ("ts_ms", "guid");
|
||||||
91
docs/room_status_moment_g5.sql
Normal file
91
docs/room_status_moment_g5.sql
Normal file
@@ -0,0 +1,91 @@
|
|||||||
|
/*
|
||||||
|
Navicat Premium Dump SQL
|
||||||
|
|
||||||
|
Source Server : FnOS 80
|
||||||
|
Source Server Type : PostgreSQL
|
||||||
|
Source Server Version : 150017 (150017)
|
||||||
|
Source Host : 10.8.8.80:5434
|
||||||
|
Source Catalog : log_platform
|
||||||
|
Source Schema : room_status
|
||||||
|
|
||||||
|
Target Server Type : PostgreSQL
|
||||||
|
Target Server Version : 150017 (150017)
|
||||||
|
File Encoding : 65001
|
||||||
|
|
||||||
|
Date: 18/03/2026 10:55:09
|
||||||
|
*/
|
||||||
|
|
||||||
|
|
||||||
|
-- ----------------------------
|
||||||
|
-- Table structure for room_status_moment_g5
|
||||||
|
-- ----------------------------
|
||||||
|
DROP TABLE IF EXISTS "room_status"."room_status_moment_g5";
|
||||||
|
CREATE TABLE "room_status"."room_status_moment_g5" (
|
||||||
|
"hotel_id" int2 NOT NULL,
|
||||||
|
"room_id" text COLLATE "pg_catalog"."default" NOT NULL,
|
||||||
|
"device_id" text COLLATE "pg_catalog"."default" NOT NULL,
|
||||||
|
"ts_ms" int8 NOT NULL DEFAULT ((EXTRACT(epoch FROM clock_timestamp()) * (1000)::numeric))::bigint,
|
||||||
|
"sys_lock_status" int2,
|
||||||
|
"online_status" int2,
|
||||||
|
"launcher_version" text COLLATE "pg_catalog"."default",
|
||||||
|
"app_version" text COLLATE "pg_catalog"."default",
|
||||||
|
"config_version" text COLLATE "pg_catalog"."default",
|
||||||
|
"register_ts_ms" int8,
|
||||||
|
"upgrade_ts_ms" int8,
|
||||||
|
"config_ts_ms" int8,
|
||||||
|
"ip" text COLLATE "pg_catalog"."default",
|
||||||
|
"pms_status" int2,
|
||||||
|
"power_state" int2,
|
||||||
|
"cardless_state" int2,
|
||||||
|
"service_mask" int8,
|
||||||
|
"insert_card" int2,
|
||||||
|
"bright_g" int2,
|
||||||
|
"agreement_ver" text COLLATE "pg_catalog"."default",
|
||||||
|
"air_address" _text COLLATE "pg_catalog"."default",
|
||||||
|
"air_state" _int2,
|
||||||
|
"air_model" _int2,
|
||||||
|
"air_speed" _int2,
|
||||||
|
"air_set_temp" _int2,
|
||||||
|
"air_now_temp" _int2,
|
||||||
|
"air_solenoid_valve" _int2,
|
||||||
|
"elec_address" _text COLLATE "pg_catalog"."default",
|
||||||
|
"elec_voltage" _float8,
|
||||||
|
"elec_ampere" _float8,
|
||||||
|
"elec_power" _float8,
|
||||||
|
"elec_phase" _float8,
|
||||||
|
"elec_energy" _float8,
|
||||||
|
"elec_sum_energy" _float8,
|
||||||
|
"carbon_state" int2,
|
||||||
|
"dev_loops" jsonb,
|
||||||
|
"energy_carbon_sum" float8,
|
||||||
|
"energy_nocard_sum" float8,
|
||||||
|
"external_device" jsonb DEFAULT '{}'::jsonb,
|
||||||
|
"faulty_device_count" jsonb DEFAULT '{}'::jsonb
|
||||||
|
)
|
||||||
|
WITH (fillfactor=90)
|
||||||
|
TABLESPACE "ts_hot"
|
||||||
|
;
|
||||||
|
|
||||||
|
-- ----------------------------
|
||||||
|
-- Indexes structure for table room_status_moment_g5
|
||||||
|
-- ----------------------------
|
||||||
|
CREATE INDEX "idx_rsm_g5_dashboard_query" ON "room_status"."room_status_moment_g5" USING btree (
|
||||||
|
"hotel_id" "pg_catalog"."int2_ops" ASC NULLS LAST,
|
||||||
|
"online_status" "pg_catalog"."int2_ops" ASC NULLS LAST,
|
||||||
|
"power_state" "pg_catalog"."int2_ops" ASC NULLS LAST
|
||||||
|
);
|
||||||
|
|
||||||
|
-- ----------------------------
|
||||||
|
-- Triggers structure for table room_status_moment_g5
|
||||||
|
-- ----------------------------
|
||||||
|
CREATE TRIGGER "trg_update_rsm_ts_ms" BEFORE UPDATE ON "room_status"."room_status_moment_g5"
|
||||||
|
FOR EACH ROW
|
||||||
|
EXECUTE PROCEDURE "room_status"."update_ts_ms_g5"();
|
||||||
|
CREATE TRIGGER "trigger_room_status_change" AFTER UPDATE ON "room_status"."room_status_moment_g5"
|
||||||
|
FOR EACH ROW
|
||||||
|
EXECUTE PROCEDURE "room_status"."handle_room_status_change"();
|
||||||
|
|
||||||
|
-- ----------------------------
|
||||||
|
-- Primary Key structure for table room_status_moment_g5
|
||||||
|
-- ----------------------------
|
||||||
|
ALTER TABLE "room_status"."room_status_moment_g5" ADD CONSTRAINT "room_status_moment_g5_pkey" PRIMARY KEY ("hotel_id", "room_id");
|
||||||
Reference in New Issue
Block a user