Compare commits
12 Commits
c98b83635d
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| 38f1421fff | |||
| 7713cfeb9e | |||
| fa363835a3 | |||
| 381080fee0 | |||
| 1329eca99e | |||
| 156930e6bc | |||
| 33c9bf0e07 | |||
| 3d80ad8710 | |||
| 4492a9c47e | |||
| ba61a540da | |||
| c6d7cea8cf | |||
| 1eccc2e3aa |
1
.gitignore
vendored
1
.gitignore
vendored
@@ -1,2 +1,3 @@
|
||||
/bls-onoffline-backend/node_modules
|
||||
/template
|
||||
/bls-onoffline-backend/dist
|
||||
|
||||
6
SQL_Script/create_database.sql
Normal file
6
SQL_Script/create_database.sql
Normal file
@@ -0,0 +1,6 @@
|
||||
-- Replace {{DATABASE_NAME}} before execution
|
||||
-- Requires psql (uses \gexec)
|
||||
SELECT format('CREATE DATABASE %I', '{{DATABASE_NAME}}')
|
||||
WHERE NOT EXISTS (
|
||||
SELECT 1 FROM pg_database WHERE datname = '{{DATABASE_NAME}}'
|
||||
)\gexec;
|
||||
12
SQL_Script/create_partition_for_day.sql
Normal file
12
SQL_Script/create_partition_for_day.sql
Normal file
@@ -0,0 +1,12 @@
|
||||
-- Replace placeholders before execution:
|
||||
-- {{SCHEMA_NAME}} default: onoffline
|
||||
-- {{TABLE_NAME}} default: onoffline_record
|
||||
-- {{PARTITION_SUFFIX}} format: YYYYMMDD (example: 20260304)
|
||||
-- {{START_TS_MS}} unix ms at 00:00:00.000
|
||||
-- {{END_TS_MS}} unix ms at next day 00:00:00.000
|
||||
-- {{TABLESPACE_CLAUSE}} either empty string or: TABLESPACE ts_hot
|
||||
|
||||
CREATE TABLE IF NOT EXISTS {{SCHEMA_NAME}}.{{TABLE_NAME}}_{{PARTITION_SUFFIX}}
|
||||
PARTITION OF {{SCHEMA_NAME}}.{{TABLE_NAME}}
|
||||
FOR VALUES FROM ({{START_TS_MS}}) TO ({{END_TS_MS}})
|
||||
{{TABLESPACE_CLAUSE}};
|
||||
27
SQL_Script/create_schema_and_parent_table.sql
Normal file
27
SQL_Script/create_schema_and_parent_table.sql
Normal file
@@ -0,0 +1,27 @@
|
||||
-- Replace placeholders before execution:
|
||||
-- {{SCHEMA_NAME}} default: onoffline
|
||||
-- {{TABLE_NAME}} default: onoffline_record
|
||||
|
||||
CREATE SCHEMA IF NOT EXISTS {{SCHEMA_NAME}};
|
||||
|
||||
CREATE TABLE IF NOT EXISTS {{SCHEMA_NAME}}.{{TABLE_NAME}} (
|
||||
guid TEXT NOT NULL,
|
||||
ts_ms BIGINT NOT NULL,
|
||||
write_ts_ms BIGINT NOT NULL,
|
||||
hotel_id SMALLINT,
|
||||
mac TEXT,
|
||||
device_id TEXT,
|
||||
room_id TEXT,
|
||||
ip TEXT,
|
||||
current_status TEXT,
|
||||
launcher_version TEXT,
|
||||
reboot_reason TEXT,
|
||||
PRIMARY KEY (ts_ms, guid)
|
||||
) PARTITION BY RANGE (ts_ms);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_{{TABLE_NAME}}_guid ON {{SCHEMA_NAME}}.{{TABLE_NAME}} (guid);
|
||||
CREATE INDEX IF NOT EXISTS idx_{{TABLE_NAME}}_device_id ON {{SCHEMA_NAME}}.{{TABLE_NAME}} (device_id);
|
||||
CREATE INDEX IF NOT EXISTS idx_{{TABLE_NAME}}_hotel_id ON {{SCHEMA_NAME}}.{{TABLE_NAME}} (hotel_id);
|
||||
CREATE INDEX IF NOT EXISTS idx_{{TABLE_NAME}}_mac ON {{SCHEMA_NAME}}.{{TABLE_NAME}} (mac);
|
||||
CREATE INDEX IF NOT EXISTS idx_{{TABLE_NAME}}_room_id ON {{SCHEMA_NAME}}.{{TABLE_NAME}} (room_id);
|
||||
CREATE INDEX IF NOT EXISTS idx_{{TABLE_NAME}}_status ON {{SCHEMA_NAME}}.{{TABLE_NAME}} (current_status);
|
||||
26
SQL_Script/generate_init_sql.js
Normal file
26
SQL_Script/generate_init_sql.js
Normal file
@@ -0,0 +1,26 @@
|
||||
#!/usr/bin/env node
|
||||
|
||||
import fs from 'fs';
|
||||
import path from 'path';
|
||||
|
||||
const args = Object.fromEntries(
|
||||
process.argv.slice(2).map((arg) => {
|
||||
const [key, value] = arg.split('=');
|
||||
return [key.replace(/^--/, ''), value];
|
||||
})
|
||||
);
|
||||
|
||||
const database = args.database || 'log_platform';
|
||||
const schema = args.schema || 'onoffline';
|
||||
const table = args.table || 'onoffline_record';
|
||||
const output = args.output;
|
||||
|
||||
const sql = `SELECT format('CREATE DATABASE %I', '${database}')\nWHERE NOT EXISTS (SELECT 1 FROM pg_database WHERE datname = '${database}')\\gexec;\n\nCREATE SCHEMA IF NOT EXISTS ${schema};\n\nCREATE TABLE IF NOT EXISTS ${schema}.${table} (\n guid TEXT NOT NULL,\n ts_ms BIGINT NOT NULL,\n write_ts_ms BIGINT NOT NULL,\n hotel_id SMALLINT,\n mac TEXT,\n device_id TEXT,\n room_id TEXT,\n ip TEXT,\n current_status TEXT,\n launcher_version TEXT,\n reboot_reason TEXT,\n PRIMARY KEY (ts_ms, guid)\n) PARTITION BY RANGE (ts_ms);\n\nCREATE INDEX IF NOT EXISTS idx_${table}_guid ON ${schema}.${table} (guid);\nCREATE INDEX IF NOT EXISTS idx_${table}_device_id ON ${schema}.${table} (device_id);\nCREATE INDEX IF NOT EXISTS idx_${table}_hotel_id ON ${schema}.${table} (hotel_id);\nCREATE INDEX IF NOT EXISTS idx_${table}_mac ON ${schema}.${table} (mac);\nCREATE INDEX IF NOT EXISTS idx_${table}_room_id ON ${schema}.${table} (room_id);\nCREATE INDEX IF NOT EXISTS idx_${table}_status ON ${schema}.${table} (current_status);\n`;
|
||||
|
||||
if (output) {
|
||||
const outputPath = path.resolve(output);
|
||||
fs.writeFileSync(outputPath, sql, 'utf8');
|
||||
console.log(`Init SQL written to ${outputPath}`);
|
||||
} else {
|
||||
process.stdout.write(sql);
|
||||
}
|
||||
64
SQL_Script/generate_partition_range_sql.js
Normal file
64
SQL_Script/generate_partition_range_sql.js
Normal file
@@ -0,0 +1,64 @@
|
||||
#!/usr/bin/env node
|
||||
|
||||
import fs from 'fs';
|
||||
import path from 'path';
|
||||
|
||||
const args = Object.fromEntries(
|
||||
process.argv.slice(2).map((arg) => {
|
||||
const [key, value] = arg.split('=');
|
||||
return [key.replace(/^--/, ''), value];
|
||||
})
|
||||
);
|
||||
|
||||
const schema = args.schema || 'onoffline';
|
||||
const table = args.table || 'onoffline_record';
|
||||
const start = args.start;
|
||||
const days = Number(args.days || '30');
|
||||
const tablespace = args.tablespace || '';
|
||||
const output = args.output;
|
||||
|
||||
if (!start) {
|
||||
console.error('Missing required argument: --start=YYYY-MM-DD');
|
||||
process.exit(1);
|
||||
}
|
||||
if (!Number.isFinite(days) || days <= 0) {
|
||||
console.error('Invalid --days value. It must be a positive integer.');
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
const base = new Date(`${start}T00:00:00`);
|
||||
if (Number.isNaN(base.getTime())) {
|
||||
console.error('Invalid start date. Use format YYYY-MM-DD');
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
const statements = [];
|
||||
for (let i = 0; i < days; i += 1) {
|
||||
const date = new Date(base);
|
||||
date.setDate(base.getDate() + i);
|
||||
|
||||
const startMs = date.getTime();
|
||||
const endDate = new Date(date);
|
||||
endDate.setDate(endDate.getDate() + 1);
|
||||
const endMs = endDate.getTime();
|
||||
|
||||
const yyyy = date.getFullYear();
|
||||
const mm = String(date.getMonth() + 1).padStart(2, '0');
|
||||
const dd = String(date.getDate()).padStart(2, '0');
|
||||
const suffix = `${yyyy}${mm}${dd}`;
|
||||
|
||||
const tablespaceClause = tablespace ? ` TABLESPACE ${tablespace}` : '';
|
||||
statements.push(
|
||||
`CREATE TABLE IF NOT EXISTS ${schema}.${table}_${suffix}\nPARTITION OF ${schema}.${table}\nFOR VALUES FROM (${startMs}) TO (${endMs})${tablespaceClause};`
|
||||
);
|
||||
}
|
||||
|
||||
const sql = `${statements.join('\n\n')}\n`;
|
||||
|
||||
if (output) {
|
||||
const outputPath = path.resolve(output);
|
||||
fs.writeFileSync(outputPath, sql, 'utf8');
|
||||
console.log(`Partition range SQL written to ${outputPath}`);
|
||||
} else {
|
||||
process.stdout.write(sql);
|
||||
}
|
||||
49
SQL_Script/generate_partition_sql.js
Normal file
49
SQL_Script/generate_partition_sql.js
Normal file
@@ -0,0 +1,49 @@
|
||||
#!/usr/bin/env node
|
||||
|
||||
import fs from 'fs';
|
||||
import path from 'path';
|
||||
|
||||
const args = Object.fromEntries(
|
||||
process.argv.slice(2).map((arg) => {
|
||||
const [key, value] = arg.split('=');
|
||||
return [key.replace(/^--/, ''), value];
|
||||
})
|
||||
);
|
||||
|
||||
const schema = args.schema || 'onoffline';
|
||||
const table = args.table || 'onoffline_record';
|
||||
const dateInput = args.date;
|
||||
const tablespace = args.tablespace || '';
|
||||
const output = args.output;
|
||||
|
||||
if (!dateInput) {
|
||||
console.error('Missing required argument: --date=YYYY-MM-DD');
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
const date = new Date(`${dateInput}T00:00:00`);
|
||||
if (Number.isNaN(date.getTime())) {
|
||||
console.error('Invalid date. Use format YYYY-MM-DD');
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
const startMs = date.getTime();
|
||||
const endDate = new Date(date);
|
||||
endDate.setDate(endDate.getDate() + 1);
|
||||
const endMs = endDate.getTime();
|
||||
|
||||
const yyyy = date.getFullYear();
|
||||
const mm = String(date.getMonth() + 1).padStart(2, '0');
|
||||
const dd = String(date.getDate()).padStart(2, '0');
|
||||
const suffix = `${yyyy}${mm}${dd}`;
|
||||
const tablespaceClause = tablespace ? `TABLESPACE ${tablespace}` : '';
|
||||
|
||||
const sql = `CREATE TABLE IF NOT EXISTS ${schema}.${table}_${suffix}\nPARTITION OF ${schema}.${table}\nFOR VALUES FROM (${startMs}) TO (${endMs})\n${tablespaceClause};\n`;
|
||||
|
||||
if (output) {
|
||||
const outputPath = path.resolve(output);
|
||||
fs.writeFileSync(outputPath, sql, 'utf8');
|
||||
console.log(`Partition SQL written to ${outputPath}`);
|
||||
} else {
|
||||
process.stdout.write(sql);
|
||||
}
|
||||
@@ -29,6 +29,16 @@ POSTGRES_IDLE_TIMEOUT_MS=30000
|
||||
DB_SCHEMA=onoffline
|
||||
DB_TABLE=onoffline_record
|
||||
|
||||
# =========================
|
||||
# PostgreSQL 配置 G5库专用
|
||||
# =========================
|
||||
POSTGRES_HOST_G5=10.8.8.80
|
||||
POSTGRES_PORT_G5=5434
|
||||
POSTGRES_DATABASE_G5=log_platform
|
||||
POSTGRES_USER_G5=log_admin
|
||||
POSTGRES_PASSWORD_G5=H3IkLUt8K!x
|
||||
POSTGRES_IDLE_TIMEOUT_MS_G5=30000
|
||||
|
||||
PORT=3001
|
||||
LOG_LEVEL=info
|
||||
|
||||
@@ -39,3 +49,11 @@ REDIS_PASSWORD=
|
||||
REDIS_DB=15
|
||||
REDIS_CONNECT_TIMEOUT_MS=5000
|
||||
REDIS_PROJECT_NAME=bls-onoffline
|
||||
|
||||
# DB write switches
|
||||
DB_WRITE_ENABLED=true
|
||||
G5_DB_WRITE_ENABLED=true
|
||||
|
||||
# G5 room_status.moment sync
|
||||
# false: do not write room_status.room_status_moment_g5
|
||||
G5_ROOM_STATUS_MOMENT_SYNC_ENABLED=false
|
||||
|
||||
@@ -29,3 +29,7 @@ REDIS_PASSWORD=
|
||||
REDIS_DB=0
|
||||
REDIS_PROJECT_NAME=bls-onoffline
|
||||
REDIS_API_BASE_URL=http://localhost:3001
|
||||
|
||||
# G5 room_status.moment sync
|
||||
# false: do not write room_status.room_status_moment_g5
|
||||
G5_ROOM_STATUS_MOMENT_SYNC_ENABLED=false
|
||||
|
||||
@@ -18,7 +18,13 @@ bls-onoffline-backend
|
||||
- 复制 .env.example 为 .env 并按实际环境配置
|
||||
|
||||
数据库初始化
|
||||
- 启动时自动执行 scripts/init_db.sql 并预创建未来 30 天分区
|
||||
- 运行服务前请先通过根目录 SQL_Script 下脚本完成建库与分区维护
|
||||
- `../SQL_Script/create_database.sql`:建库(psql)
|
||||
- `../SQL_Script/create_schema_and_parent_table.sql`:建 schema 与主分区表
|
||||
- `../SQL_Script/create_partition_for_day.sql`:按日建分区模板
|
||||
- `../SQL_Script/generate_init_sql.js`:生成建库+建表 SQL
|
||||
- `../SQL_Script/generate_partition_sql.js`:生成单日分区 SQL
|
||||
- `../SQL_Script/generate_partition_range_sql.js`:生成批量分区 SQL
|
||||
|
||||
规范说明
|
||||
- 规格文件位于 spec/onoffline-spec.md
|
||||
|
||||
1218
bls-onoffline-backend/dist/index.js
vendored
1218
bls-onoffline-backend/dist/index.js
vendored
File diff suppressed because it is too large
Load Diff
@@ -0,0 +1,13 @@
|
||||
# Change: add g5 room status ip upsert
|
||||
|
||||
## Why
|
||||
当前双写仅写入 onoffline_record_g5,未同步更新 G5 状态表 `room_status.room_status_moment_g5` 的 `ip` 字段,导致状态表与最新上报数据存在延迟。
|
||||
|
||||
## What Changes
|
||||
- 新增 G5 状态表同步机制:按 `hotel_id + room_id` 查找目标行,并使用 `ON CONFLICT DO UPDATE` 更新 `ip`。
|
||||
- 同步时仅处理查找到的第一条匹配行(`LIMIT 1`)。
|
||||
- 无论 `ip` 是否变化都执行 upsert 更新,以触发数据库更新时间相关触发器。
|
||||
|
||||
## Impact
|
||||
- Affected specs: `openspec/specs/onoffline/spec.md`
|
||||
- Affected code: `src/db/g5DatabaseManager.js`, `src/index.js`, `tests/g5DatabaseManager.test.js`
|
||||
@@ -0,0 +1,19 @@
|
||||
## ADDED Requirements
|
||||
|
||||
### Requirement: G5 状态表 IP 同步
|
||||
系统 SHALL 在处理并写入数据时,同步将对应设备的 `ip` 更新到 G5 状态表 `room_status.room_status_moment_g5`。
|
||||
|
||||
#### Scenario: 按唯一键同步 IP
|
||||
- **GIVEN** 当前处理行包含 `hotel_id` 与 `room_id`
|
||||
- **WHEN** 执行 G5 状态表同步
|
||||
- **THEN** 系统按 `hotel_id + room_id` 查找状态表记录,并仅对查找到的第一条记录执行写入
|
||||
|
||||
#### Scenario: 使用 upsert 触发更新
|
||||
- **GIVEN** 状态表已存在同键记录
|
||||
- **WHEN** 执行写入
|
||||
- **THEN** 系统使用 `ON CONFLICT (hotel_id, room_id) DO UPDATE` 更新 `ip`
|
||||
|
||||
#### Scenario: IP 不变仍触发更新
|
||||
- **GIVEN** 新 `ip` 与库内 `ip` 相同
|
||||
- **WHEN** 执行状态表同步
|
||||
- **THEN** 系统仍执行更新语句,以触发表上的更新时间相关触发器
|
||||
@@ -0,0 +1,10 @@
|
||||
## 1. Implementation
|
||||
- [x] 1.1 Add room status IP upsert method in G5 DB manager.
|
||||
- [x] 1.2 Trigger room status IP upsert in the main write path.
|
||||
- [x] 1.3 Add tests for status mapping and room status dedupe behavior.
|
||||
- [x] 1.4 Add OpenSpec delta for the new synchronization requirement.
|
||||
|
||||
## 2. Validation
|
||||
- [x] 2.1 Run `npm run test`.
|
||||
- [x] 2.2 Run `npm run build`.
|
||||
- [x] 2.3 Run `openspec validate add-g5-room-status-ip-upsert --strict`.
|
||||
@@ -0,0 +1,11 @@
|
||||
# Proposal: Refactor Partition Indexes
|
||||
|
||||
## Goal
|
||||
利用 PostgreSQL 默认的支持,改变每日分区创立时的索引策略,不再在代码中对每个分区单独创建索引。
|
||||
|
||||
## Context
|
||||
当前 `PartitionManager` 在动态创建子分区后,会隐式调用查询在子分区上创建六个单列索引。由于我们使用的是 PostgreSQL 11+,且我们在初始化脚本中的主分区表 `onoffline.onoffline_record` 上已经创建了所有的索引,此主表上的索引会自动应用于所有的子分区,不需要我们在创建分区时另外手动添加。
|
||||
|
||||
## Proposed Changes
|
||||
1. 在 `src/db/partitionManager.js` 中移除子分区显式创建索引的方法 `ensurePartitionIndexes` 以及针对已有子分区的循环索引检查函数 `ensureIndexesForExistingPartitions`。
|
||||
2. 在更新分区流程 `ensurePartitions` 以及 `ensurePartitionsForTimestamps` 中,移除对 `ensurePartitionIndexes` 的调用。
|
||||
@@ -0,0 +1,11 @@
|
||||
# Spec Delta: onoffline-backend
|
||||
|
||||
## MODIFIED Requirements
|
||||
|
||||
### Requirement: 数据库分区策略
|
||||
系统 SHALL 使用 Range Partitioning 按天分区,并自动维护未来 30 天的分区表,子表依赖 PostgreSQL 原生机制继承主表索引。
|
||||
|
||||
#### Scenario: 分区预创建
|
||||
- **GIVEN** 系统启动或每日凌晨
|
||||
- **WHEN** 运行分区维护任务
|
||||
- **THEN** 确保数据库中存在未来 30 天的分区表,无需对子表显式创建单列表索引
|
||||
@@ -0,0 +1,6 @@
|
||||
# Tasks: Refactor Partition Indexes
|
||||
|
||||
- [x] refactor `src/db/partitionManager.js`: remove `ensurePartitionIndexes` and `ensureIndexesForExistingPartitions`.
|
||||
- [x] refactor `src/db/partitionManager.js`: update `ensurePartitions` and `ensurePartitionsForTimestamps` to remove calls to `ensurePartitionIndexes`.
|
||||
- [x] refactor `src/db/initializer.js` (and any other occurrences) to reflect the removal.
|
||||
- [x] update openspec requirements to clarify that index propagation relies on PostgreSQL parent-table indexes.
|
||||
@@ -0,0 +1,14 @@
|
||||
# Change: remove runtime db provisioning
|
||||
|
||||
## Why
|
||||
当前服务在运行时承担了建库、建表和分区维护职责,导致服务职责边界不清晰,也会引入启动阶段 DDL 风险。现已将该能力剥离到根目录 `SQL_Script/`,需要通过 OpenSpec 正式记录为规范变更。
|
||||
|
||||
## What Changes
|
||||
- 移除服务启动阶段的数据库初始化与定时分区维护要求。
|
||||
- 移除服务在写入失败时自动创建缺失分区的要求。
|
||||
- 明确数据库结构与分区维护由外部脚本(`SQL_Script/`)负责。
|
||||
- 保留服务的核心职责:Kafka 消费、解析、写库、重试与监控。
|
||||
|
||||
## Impact
|
||||
- Affected specs: `openspec/specs/onoffline/spec.md`
|
||||
- Affected code: `src/index.js`, `src/config/config.js`, `src/db/initializer.js`, `src/db/partitionManager.js`, `scripts/init_db.sql`, `scripts/verify_partitions.js`, `../SQL_Script/*`
|
||||
@@ -0,0 +1,32 @@
|
||||
## MODIFIED Requirements
|
||||
|
||||
### Requirement: 数据库分区策略
|
||||
系统 SHALL 使用 Range Partitioning 按天分区;运行服务本身 SHALL NOT 执行建库、建表、分区创建或定时分区维护。
|
||||
|
||||
#### Scenario: 服务启动不执行 DDL
|
||||
- **GIVEN** 服务进程启动
|
||||
- **WHEN** 进入 bootstrap 过程
|
||||
- **THEN** 仅初始化消费、处理、监控相关能力,不执行数据库创建、表结构初始化与分区创建
|
||||
|
||||
#### Scenario: 分区由外部脚本维护
|
||||
- **GIVEN** 需要创建数据库对象或新增未来分区
|
||||
- **WHEN** 执行外部 SQL/JS 工具
|
||||
- **THEN** 通过根目录 `SQL_Script/` 完成建库和分区维护,而不是由服务运行时自动执行
|
||||
|
||||
### Requirement: 批量消费与写入
|
||||
系统 SHALL 对 Kafka 消息进行缓冲,并按批次写入数据库,以提高吞吐量;当写入失败时,系统 SHALL 执行连接恢复重试与降级策略,但不在运行时创建数据库分区。
|
||||
|
||||
#### Scenario: 批量写入
|
||||
- **GIVEN** 短时间内收到多条消息 (e.g., 500条)
|
||||
- **WHEN** 缓冲区满或超时 (e.g., 200ms)
|
||||
- **THEN** 执行一次批量数据库插入操作
|
||||
|
||||
#### Scenario: 写入失败降级
|
||||
- **GIVEN** 批量写入因数据错误失败 (非连接错误)
|
||||
- **WHEN** 捕获异常
|
||||
- **THEN** 自动降级为逐条写入,以隔离错误数据并确保有效数据入库
|
||||
|
||||
#### Scenario: 分区缺失错误处理
|
||||
- **GIVEN** 写入时数据库返回分区缺失错误
|
||||
- **WHEN** 服务处理该错误
|
||||
- **THEN** 服务记录错误并按既有错误处理机制处理,不在运行时执行分区创建
|
||||
@@ -0,0 +1,12 @@
|
||||
## 1. Implementation
|
||||
- [x] 1.1 Remove runtime DB initialization from bootstrap flow (`src/index.js`).
|
||||
- [x] 1.2 Remove scheduled partition maintenance job from runtime service.
|
||||
- [x] 1.3 Remove runtime missing-partition auto-fix behavior.
|
||||
- [x] 1.4 Remove legacy DB provisioning modules and scripts from service project.
|
||||
- [x] 1.5 Add external SQL/JS provisioning scripts under root `SQL_Script/` for DB/schema/partition management.
|
||||
- [x] 1.6 Update project docs to point DB provisioning to `SQL_Script/`.
|
||||
|
||||
## 2. Validation
|
||||
- [x] 2.1 Run `npm run lint` in `bls-onoffline-backend`.
|
||||
- [x] 2.2 Run `npm run build` in `bls-onoffline-backend`.
|
||||
- [x] 2.3 Run `openspec validate remove-runtime-db-provisioning --strict`.
|
||||
@@ -0,0 +1,13 @@
|
||||
# Change: add g5 room status sync toggle
|
||||
|
||||
## Why
|
||||
当前 G5 `room_status.room_status_moment_g5` 同步是默认启用的,但新的运行要求需要默认关闭,避免未经确认就写入状态表。
|
||||
|
||||
## What Changes
|
||||
- 新增一个环境变量开关,控制是否同步写入 G5 `room_status.room_status_moment_g5`。
|
||||
- 默认值为 `false`,即不写入该表。
|
||||
- 当开关开启时,保留现有按 `hotel_id + room_id` upsert 更新 `ip` 的行为。
|
||||
|
||||
## Impact
|
||||
- Affected specs: `openspec/specs/onoffline/spec.md`
|
||||
- Affected code: `src/config/config.js`, `src/index.js`, `.env`, `.env.example`
|
||||
@@ -0,0 +1,14 @@
|
||||
## ADDED Requirements
|
||||
|
||||
### Requirement: G5 状态表同步开关
|
||||
系统 SHALL 提供一个环境变量开关,用于控制是否向 G5 状态表 `room_status.room_status_moment_g5` 写入数据;该开关默认值 SHALL 为 `false`。
|
||||
|
||||
#### Scenario: 默认关闭
|
||||
- **GIVEN** 未设置 G5 状态表同步开关
|
||||
- **WHEN** 服务启动并处理 Kafka 消息
|
||||
- **THEN** 系统不向 `room_status.room_status_moment_g5` 写入数据
|
||||
|
||||
#### Scenario: 显式开启
|
||||
- **GIVEN** G5 状态表同步开关被设置为 `true`
|
||||
- **WHEN** 服务处理 Kafka 消息
|
||||
- **THEN** 系统恢复向 `room_status.room_status_moment_g5` 写入数据
|
||||
@@ -0,0 +1,11 @@
|
||||
## 1. Implementation
|
||||
- [x] 1.1 Add a config flag for G5 room_status synchronization.
|
||||
- [x] 1.2 Disable `room_status.room_status_moment_g5` writes by default.
|
||||
- [x] 1.3 Keep the existing upsert behavior when the flag is enabled.
|
||||
- [x] 1.4 Update environment samples to set the flag to `false`.
|
||||
|
||||
## 2. Validation
|
||||
- [x] 2.1 Run `npm run lint`.
|
||||
- [x] 2.2 Run `npm run test`.
|
||||
- [x] 2.3 Run `npm run build`.
|
||||
- [x] 2.4 Run `openspec validate add-g5-room-status-sync-toggle --strict`.
|
||||
@@ -0,0 +1,13 @@
|
||||
# Change: add restart current_status mapping
|
||||
|
||||
## Why
|
||||
上游 Kafka 的 `CurrentStatus` 新增了 `restart` 值。现有处理逻辑仍按 `on/off` 处理,导致 `restart` 在入库时无法被正确标记为 3。
|
||||
|
||||
## What Changes
|
||||
- 让 Kafka 解析链路接受 `CurrentStatus=restart`。
|
||||
- 将 G5 入库链路中的 `current_status=restart` 映射为 `3`。
|
||||
- 更新相关测试与 OpenSpec 说明,确保 `restart` 是受支持状态。
|
||||
|
||||
## Impact
|
||||
- Affected specs: `openspec/specs/onoffline/spec.md`
|
||||
- Affected code: `src/processor/index.js`, `src/db/g5DatabaseManager.js`, `tests/processor.test.js`
|
||||
@@ -0,0 +1,14 @@
|
||||
## MODIFIED Requirements
|
||||
|
||||
### Requirement: 重启数据处理
|
||||
系统 SHALL 在 `CurrentStatus` 为 `restart` 时将 `current_status` 保留为 `restart`,并在 G5 入库链路中映射为 `3`。
|
||||
|
||||
#### Scenario: restart 状态写入
|
||||
- **GIVEN** Kafka 消息中的 `CurrentStatus` 为 `restart`
|
||||
- **WHEN** 消息被处理并写入数据库
|
||||
- **THEN** 普通入库链路保留 `restart`,G5 入库链路将其写入为 `3`
|
||||
|
||||
#### Scenario: 其他状态保持原样
|
||||
- **GIVEN** Kafka 消息中的 `CurrentStatus` 为 `on` 或 `off`
|
||||
- **WHEN** 消息被处理并写入数据库
|
||||
- **THEN** 系统按既有规则处理该状态值
|
||||
@@ -0,0 +1,10 @@
|
||||
## 1. Implementation
|
||||
- [x] 1.1 Update Kafka row building logic to preserve `restart` as a valid `current_status` value.
|
||||
- [x] 1.2 Update G5 database mapping so `restart` maps to `3`.
|
||||
- [x] 1.3 Update processor tests for the `restart` case.
|
||||
- [x] 1.4 Update OpenSpec requirements for supported current status values.
|
||||
|
||||
## 2. Validation
|
||||
- [x] 2.1 Run `npm run test`.
|
||||
- [x] 2.2 Run `npm run build`.
|
||||
- [x] 2.3 Run `openspec validate add-restart-current-status-mapping --strict`.
|
||||
@@ -12,12 +12,17 @@
|
||||
- **THEN** current_status 等于 CurrentStatus (截断至 255 字符)
|
||||
|
||||
### Requirement: 重启数据处理
|
||||
系统 SHALL 在 RebootReason 非空时强制 current_status 为 on。
|
||||
系统 SHALL 在 `CurrentStatus` 为 `restart` 时将 `current_status` 保留为 `restart`,并在 G5 入库链路中映射为 `3`。
|
||||
|
||||
#### Scenario: 重启数据写入
|
||||
- **GIVEN** RebootReason 为非空值
|
||||
- **WHEN** 消息被处理
|
||||
- **THEN** current_status 等于 on
|
||||
#### Scenario: restart 状态写入
|
||||
- **GIVEN** Kafka 消息中的 `CurrentStatus` 为 `restart`
|
||||
- **WHEN** 消息被处理并写入数据库
|
||||
- **THEN** 普通入库链路保留 `restart`,G5 入库链路将其写入为 `3`
|
||||
|
||||
#### Scenario: 其他状态保持原样
|
||||
- **GIVEN** Kafka 消息中的 `CurrentStatus` 为 `on` 或 `off`
|
||||
- **WHEN** 消息被处理并写入数据库
|
||||
- **THEN** 系统按既有规则处理该状态值
|
||||
|
||||
### Requirement: 空值保留
|
||||
系统 SHALL 保留上游空值,不对字段进行补 0。
|
||||
@@ -28,12 +33,12 @@
|
||||
- **THEN** 数据库存储值为对应的空字符串
|
||||
|
||||
### Requirement: 数据库分区策略
|
||||
系统 SHALL 使用 Range Partitioning 按天分区,并自动维护未来 30 天的分区表。
|
||||
系统 SHALL 使用 Range Partitioning 按天分区,并自动维护未来 30 天的分区表,子表依赖 PostgreSQL 原生机制继承主表索引。
|
||||
|
||||
#### Scenario: 分区预创建
|
||||
- **GIVEN** 系统启动或每日凌晨
|
||||
- **WHEN** 运行分区维护任务
|
||||
- **THEN** 确保数据库中存在未来 30 天的分区表
|
||||
- **THEN** 确保数据库中存在未来 30 天的分区表,无需对子表显式创建单列表索引
|
||||
|
||||
### Requirement: 消费可靠性 (At-Least-Once)
|
||||
系统 SHALL 仅在数据成功写入数据库后,才向 Kafka 提交消费位点。
|
||||
@@ -84,7 +89,7 @@
|
||||
- **THEN** 自动乘以 1000 转换为毫秒
|
||||
|
||||
### Requirement: 批量消费与写入
|
||||
系统 SHALL 对 Kafka 消息进行缓冲,并按批次写入数据库,以提高吞吐量。
|
||||
系统 SHALL 对 Kafka 消息进行缓冲,并按批次写入数据库,以提高吞吐量;当写入失败时,系统 SHALL 执行连接恢复重试与降级策略,但不在运行时创建数据库分区。
|
||||
|
||||
#### Scenario: 批量写入
|
||||
- **GIVEN** 短时间内收到多条消息 (e.g., 500条)
|
||||
@@ -96,3 +101,21 @@
|
||||
- **WHEN** 捕获异常
|
||||
- **THEN** 自动降级为逐条写入,以隔离错误数据并确保有效数据入库
|
||||
|
||||
#### Scenario: 分区缺失错误处理
|
||||
- **GIVEN** 写入时数据库返回分区缺失错误
|
||||
- **WHEN** 服务处理该错误
|
||||
- **THEN** 服务记录错误并按既有错误处理机制处理,不在运行时执行分区创建
|
||||
|
||||
### Requirement: G5 状态表同步开关
|
||||
系统 SHALL 提供一个环境变量开关,用于控制是否向 G5 状态表 `room_status.room_status_moment_g5` 写入数据;该开关默认值 SHALL 为 `false`。
|
||||
|
||||
#### Scenario: 默认关闭
|
||||
- **GIVEN** 未设置 G5 状态表同步开关
|
||||
- **WHEN** 服务启动并处理 Kafka 消息
|
||||
- **THEN** 系统不向 `room_status.room_status_moment_g5` 写入数据
|
||||
|
||||
#### Scenario: 显式开启
|
||||
- **GIVEN** G5 状态表同步开关被设置为 `true`
|
||||
- **WHEN** 服务处理 Kafka 消息
|
||||
- **THEN** 系统恢复向 `room_status.room_status_moment_g5` 写入数据
|
||||
|
||||
|
||||
@@ -1,23 +0,0 @@
|
||||
CREATE SCHEMA IF NOT EXISTS onoffline;
|
||||
|
||||
CREATE TABLE IF NOT EXISTS onoffline.onoffline_record (
|
||||
guid VARCHAR(32) NOT NULL,
|
||||
ts_ms BIGINT NOT NULL,
|
||||
write_ts_ms BIGINT NOT NULL,
|
||||
hotel_id SMALLINT NOT NULL,
|
||||
mac VARCHAR(21) NOT NULL,
|
||||
device_id VARCHAR(64) NOT NULL,
|
||||
room_id VARCHAR(64) NOT NULL,
|
||||
ip VARCHAR(21),
|
||||
current_status VARCHAR(255),
|
||||
launcher_version VARCHAR(255),
|
||||
reboot_reason VARCHAR(255),
|
||||
PRIMARY KEY (ts_ms, mac, device_id, room_id)
|
||||
) PARTITION BY RANGE (ts_ms);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_onoffline_ts_ms ON onoffline.onoffline_record (ts_ms);
|
||||
CREATE INDEX IF NOT EXISTS idx_onoffline_hotel_id ON onoffline.onoffline_record (hotel_id);
|
||||
CREATE INDEX IF NOT EXISTS idx_onoffline_mac ON onoffline.onoffline_record (mac);
|
||||
CREATE INDEX IF NOT EXISTS idx_onoffline_device_id ON onoffline.onoffline_record (device_id);
|
||||
CREATE INDEX IF NOT EXISTS idx_onoffline_room_id ON onoffline.onoffline_record (room_id);
|
||||
CREATE INDEX IF NOT EXISTS idx_onoffline_current_status ON onoffline.onoffline_record (current_status);
|
||||
@@ -1,61 +0,0 @@
|
||||
import pg from 'pg';
|
||||
import { config } from '../src/config/config.js';
|
||||
|
||||
const { Pool } = pg;
|
||||
|
||||
const verify = async () => {
|
||||
const pool = new Pool({
|
||||
host: config.db.host,
|
||||
port: config.db.port,
|
||||
user: config.db.user,
|
||||
password: config.db.password,
|
||||
database: config.db.database,
|
||||
ssl: config.db.ssl,
|
||||
});
|
||||
|
||||
try {
|
||||
console.log('Verifying partitions for table onoffline_record...');
|
||||
const client = await pool.connect();
|
||||
|
||||
// Check parent table
|
||||
const parentRes = await client.query(`
|
||||
SELECT to_regclass('onoffline.onoffline_record') as oid;
|
||||
`);
|
||||
if (!parentRes.rows[0].oid) {
|
||||
console.error('Parent table onoffline.onoffline_record DOES NOT EXIST.');
|
||||
return;
|
||||
}
|
||||
console.log('Parent table onoffline.onoffline_record exists.');
|
||||
|
||||
// Check partitions
|
||||
const res = await client.query(`
|
||||
SELECT
|
||||
child.relname AS partition_name
|
||||
FROM pg_inherits
|
||||
JOIN pg_class parent ON pg_inherits.inhparent = parent.oid
|
||||
JOIN pg_class child ON pg_inherits.inhrelid = child.oid
|
||||
JOIN pg_namespace ns ON parent.relnamespace = ns.oid
|
||||
WHERE parent.relname = 'onoffline_record' AND ns.nspname = 'onoffline'
|
||||
ORDER BY child.relname;
|
||||
`);
|
||||
|
||||
console.log(`Found ${res.rowCount} partitions.`);
|
||||
res.rows.forEach(row => {
|
||||
console.log(`- ${row.partition_name}`);
|
||||
});
|
||||
|
||||
if (res.rowCount >= 30) {
|
||||
console.log('SUCCESS: At least 30 partitions exist.');
|
||||
} else {
|
||||
console.warn(`WARNING: Expected 30+ partitions, found ${res.rowCount}.`);
|
||||
}
|
||||
|
||||
client.release();
|
||||
} catch (err) {
|
||||
console.error('Verification failed:', err);
|
||||
} finally {
|
||||
await pool.end();
|
||||
}
|
||||
};
|
||||
|
||||
verify();
|
||||
@@ -35,7 +35,16 @@ Topic:blwlog4Nodejs-rcu-onoffline-topic
|
||||
主键:(ts_ms, mac, device_id, room_id)
|
||||
按 ts_ms 每日分区
|
||||
|
||||
G5库结构(双写,临时接入):
|
||||
库同为:log_platform
|
||||
表:onoffline_record_g5
|
||||
差异字段:
|
||||
- guid 为 int4,由库自己生成。
|
||||
- record_source 固定为 CRICS。
|
||||
- current_status 为 int2,on映射为1,off映射为2,restart映射为3,其余为0。
|
||||
支持通过环境变量开关双写。
|
||||
|
||||
4. 数据处理规则
|
||||
非重启数据:reboot_reason 为空或不存在,current_status 取 CurrentStatus
|
||||
重启数据:reboot_reason 不为空,current_status 固定为 on
|
||||
重启数据:reboot_reason 不为空时保留 Kafka 上游 current_status 值;若上游值为 restart,则入库标记为 restart,G5 库映射为 3
|
||||
其余字段直接按 Kafka 原值落库,空值不补 0
|
||||
|
||||
@@ -13,6 +13,12 @@ const parseList = (value) =>
|
||||
.map((item) => item.trim())
|
||||
.filter(Boolean);
|
||||
|
||||
const parseBoolean = (value, defaultValue) => {
|
||||
if (value === undefined || value === null || value === '') return defaultValue;
|
||||
if (typeof value === 'boolean') return value;
|
||||
return value === 'true' || value === '1';
|
||||
};
|
||||
|
||||
export const config = {
|
||||
env: process.env.NODE_ENV || 'development',
|
||||
port: parseNumber(process.env.PORT, 3001),
|
||||
@@ -39,6 +45,7 @@ export const config = {
|
||||
} : undefined
|
||||
},
|
||||
db: {
|
||||
writeEnabled: parseBoolean(process.env.DB_WRITE_ENABLED, true),
|
||||
host: process.env.DB_HOST || process.env.POSTGRES_HOST || 'localhost',
|
||||
port: parseNumber(process.env.DB_PORT || process.env.POSTGRES_PORT, 5432),
|
||||
user: process.env.DB_USER || process.env.POSTGRES_USER || 'postgres',
|
||||
@@ -49,6 +56,20 @@ export const config = {
|
||||
schema: process.env.DB_SCHEMA || 'onoffline',
|
||||
table: process.env.DB_TABLE || 'onoffline_record'
|
||||
},
|
||||
g5db: {
|
||||
writeEnabled: parseBoolean(process.env.G5_DB_WRITE_ENABLED, true),
|
||||
enabled: !!process.env.POSTGRES_HOST_G5,
|
||||
roomStatusMomentSyncEnabled: parseBoolean(process.env.G5_ROOM_STATUS_MOMENT_SYNC_ENABLED, false),
|
||||
host: process.env.POSTGRES_HOST_G5,
|
||||
port: parseNumber(process.env.POSTGRES_PORT_G5, 5434),
|
||||
user: process.env.POSTGRES_USER_G5,
|
||||
password: process.env.POSTGRES_PASSWORD_G5,
|
||||
database: process.env.POSTGRES_DATABASE_G5,
|
||||
max: parseNumber(process.env.POSTGRES_MAX_CONNECTIONS_G5, 3),
|
||||
ssl: process.env.POSTGRES_SSL_G5 === 'true' ? { rejectUnauthorized: false } : undefined,
|
||||
schema: process.env.DB_SCHEMA_G5 || 'onoffline',
|
||||
table: process.env.DB_TABLE_G5 || 'onoffline_record_g5'
|
||||
},
|
||||
redis: {
|
||||
host: process.env.REDIS_HOST || 'localhost',
|
||||
port: parseNumber(process.env.REDIS_PORT, 6379),
|
||||
|
||||
202
bls-onoffline-backend/src/db/g5DatabaseManager.js
Normal file
202
bls-onoffline-backend/src/db/g5DatabaseManager.js
Normal file
@@ -0,0 +1,202 @@
|
||||
import pg from 'pg';
|
||||
import { config } from '../config/config.js';
|
||||
import { logger } from '../utils/logger.js';
|
||||
|
||||
const { Pool } = pg;
|
||||
|
||||
const g5Columns = [
|
||||
'ts_ms',
|
||||
'write_ts_ms',
|
||||
'hotel_id',
|
||||
'mac',
|
||||
'device_id',
|
||||
'room_id',
|
||||
'ip',
|
||||
'current_status',
|
||||
'launcher_version',
|
||||
'reboot_reason',
|
||||
'record_source'
|
||||
];
|
||||
|
||||
const roomStatusSyncSchema = 'room_status';
|
||||
const roomStatusSyncTable = 'room_status_moment_g5';
|
||||
|
||||
export const mapCurrentStatusToG5Code = (value) => {
|
||||
if (value === 1 || value === 2 || value === 3) {
|
||||
return value;
|
||||
}
|
||||
|
||||
const normalized = typeof value === 'string' ? value.trim().toLowerCase() : '';
|
||||
if (normalized === 'on') return 1;
|
||||
if (normalized === 'off') return 2;
|
||||
if (normalized === 'restart') return 3;
|
||||
return 0;
|
||||
};
|
||||
|
||||
export const dedupeRoomStatusSyncRows = (rows) => {
|
||||
const uniqueRows = new Map();
|
||||
for (const row of rows || []) {
|
||||
const hotelId = Number(row?.hotel_id);
|
||||
const roomId = row?.room_id ?? null;
|
||||
if (!Number.isFinite(hotelId) || roomId === null || roomId === '') {
|
||||
continue;
|
||||
}
|
||||
const key = `${hotelId}@@${String(roomId)}`;
|
||||
if (!uniqueRows.has(key)) {
|
||||
uniqueRows.set(key, {
|
||||
hotel_id: hotelId,
|
||||
room_id: String(roomId),
|
||||
ip: row?.ip ?? null
|
||||
});
|
||||
}
|
||||
}
|
||||
return Array.from(uniqueRows.values());
|
||||
};
|
||||
|
||||
export class G5DatabaseManager {
|
||||
constructor(dbConfig) {
|
||||
if (!dbConfig.enabled) return;
|
||||
this.pool = new Pool({
|
||||
host: dbConfig.host,
|
||||
port: dbConfig.port,
|
||||
user: dbConfig.user,
|
||||
password: dbConfig.password,
|
||||
database: dbConfig.database,
|
||||
max: dbConfig.max,
|
||||
ssl: dbConfig.ssl
|
||||
});
|
||||
}
|
||||
|
||||
async insertRows({ schema, table, rows }) {
|
||||
if (!this.pool || !rows || rows.length === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
const statement = `
|
||||
INSERT INTO ${schema}.${table} (${g5Columns.join(', ')})
|
||||
SELECT *
|
||||
FROM UNNEST(
|
||||
$1::int8[],
|
||||
$2::int8[],
|
||||
$3::int2[],
|
||||
$4::text[],
|
||||
$5::text[],
|
||||
$6::text[],
|
||||
$7::text[],
|
||||
$8::int2[],
|
||||
$9::text[],
|
||||
$10::text[],
|
||||
$11::text[]
|
||||
)
|
||||
ON CONFLICT DO NOTHING
|
||||
`;
|
||||
|
||||
try {
|
||||
const params = g5Columns.map((column) => {
|
||||
return rows.map((row) => {
|
||||
if (column === 'record_source') {
|
||||
return 'CRICS';
|
||||
}
|
||||
if (column === 'current_status') {
|
||||
// current_status in G5 is int2
|
||||
return mapCurrentStatusToG5Code(row.current_status);
|
||||
}
|
||||
return row[column] ?? null;
|
||||
});
|
||||
});
|
||||
|
||||
await this.pool.query(statement, params);
|
||||
} catch (error) {
|
||||
logger.error('G5 Database insert failed', {
|
||||
error: error?.message,
|
||||
schema,
|
||||
table,
|
||||
rowsLength: rows.length
|
||||
});
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
async upsertRoomStatusMomentIp(rows) {
|
||||
if (!this.pool || !rows || rows.length === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
const syncRows = dedupeRoomStatusSyncRows(rows);
|
||||
if (syncRows.length === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
const sql = `
|
||||
WITH input_rows AS (
|
||||
SELECT *
|
||||
FROM UNNEST($1::int2[], $2::text[], $3::text[])
|
||||
AS t(hotel_id, room_id, ip)
|
||||
), matched_rows AS (
|
||||
SELECT i.hotel_id, i.room_id, i.ip
|
||||
FROM input_rows i
|
||||
JOIN LATERAL (
|
||||
SELECT r.hotel_id, r.room_id
|
||||
FROM ${roomStatusSyncSchema}.${roomStatusSyncTable} r
|
||||
WHERE r.hotel_id = i.hotel_id
|
||||
AND r.room_id = i.room_id
|
||||
LIMIT 1
|
||||
) m ON TRUE
|
||||
)
|
||||
INSERT INTO ${roomStatusSyncSchema}.${roomStatusSyncTable} (hotel_id, room_id, ip)
|
||||
SELECT hotel_id, room_id, ip
|
||||
FROM matched_rows
|
||||
ON CONFLICT (hotel_id, room_id)
|
||||
DO UPDATE SET ip = EXCLUDED.ip
|
||||
`;
|
||||
|
||||
try {
|
||||
await this.pool.query(sql, [
|
||||
syncRows.map((row) => row.hotel_id),
|
||||
syncRows.map((row) => row.room_id),
|
||||
syncRows.map((row) => row.ip)
|
||||
]);
|
||||
} catch (error) {
|
||||
logger.error('G5 room_status_moment_g5 upsert failed', {
|
||||
error: error?.message,
|
||||
rowsLength: syncRows.length
|
||||
});
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
async checkConnection() {
|
||||
if (!this.pool) return true; // Pretend it's ok if disabled
|
||||
let client;
|
||||
try {
|
||||
const connectPromise = this.pool.connect();
|
||||
const timeoutPromise = new Promise((_, reject) => {
|
||||
setTimeout(() => reject(new Error('Connection timeout')), 5000);
|
||||
});
|
||||
try {
|
||||
client = await Promise.race([connectPromise, timeoutPromise]);
|
||||
} catch (raceError) {
|
||||
connectPromise.then(c => c.release()).catch(() => { });
|
||||
throw raceError;
|
||||
}
|
||||
await client.query('SELECT 1');
|
||||
return true;
|
||||
} catch (err) {
|
||||
logger.error('G5 Database check connection failed', { error: err.message });
|
||||
return false;
|
||||
} finally {
|
||||
if (client) {
|
||||
client.release();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async close() {
|
||||
if (this.pool) {
|
||||
await this.pool.end();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const g5DbManager = new G5DatabaseManager(config.g5db);
|
||||
export default g5DbManager;
|
||||
@@ -1,101 +0,0 @@
|
||||
import pg from 'pg';
|
||||
import fs from 'fs';
|
||||
import path from 'path';
|
||||
import { fileURLToPath } from 'url';
|
||||
import { logger } from '../utils/logger.js';
|
||||
import partitionManager from './partitionManager.js';
|
||||
import dbManager from './databaseManager.js';
|
||||
import { config } from '../config/config.js';
|
||||
|
||||
const __filename = fileURLToPath(import.meta.url);
|
||||
const __dirname = path.dirname(__filename);
|
||||
|
||||
class DatabaseInitializer {
|
||||
async initialize() {
|
||||
logger.info('Starting database initialization check...');
|
||||
|
||||
// 1. Check if database exists, create if not
|
||||
await this.ensureDatabaseExists();
|
||||
|
||||
// 2. Initialize Schema and Parent Table (if not exists)
|
||||
// Note: We need to use dbManager because it connects to the target database
|
||||
await this.ensureSchemaAndTable();
|
||||
|
||||
// 3. Ensure Partitions for the next month
|
||||
await partitionManager.ensurePartitions(30);
|
||||
await partitionManager.ensureIndexesForExistingPartitions();
|
||||
|
||||
console.log('Database initialization completed successfully.');
|
||||
logger.info('Database initialization completed successfully.');
|
||||
}
|
||||
|
||||
async ensureDatabaseExists() {
|
||||
const { host, port, user, password, database, ssl } = config.db;
|
||||
console.log(`Checking if database '${database}' exists at ${host}:${port}...`);
|
||||
|
||||
// Connect to 'postgres' database to check/create target database
|
||||
const client = new pg.Client({
|
||||
host,
|
||||
port,
|
||||
user,
|
||||
password,
|
||||
database: 'postgres',
|
||||
ssl: ssl ? { rejectUnauthorized: false } : false
|
||||
});
|
||||
|
||||
try {
|
||||
await client.connect();
|
||||
|
||||
const checkRes = await client.query(
|
||||
`SELECT 1 FROM pg_database WHERE datname = $1`,
|
||||
[database]
|
||||
);
|
||||
|
||||
if (checkRes.rowCount === 0) {
|
||||
logger.info(`Database '${database}' does not exist. Creating...`);
|
||||
// CREATE DATABASE cannot run inside a transaction block
|
||||
await client.query(`CREATE DATABASE "${database}"`);
|
||||
console.log(`Database '${database}' created.`);
|
||||
logger.info(`Database '${database}' created.`);
|
||||
} else {
|
||||
console.log(`Database '${database}' already exists.`);
|
||||
logger.info(`Database '${database}' already exists.`);
|
||||
}
|
||||
} catch (err) {
|
||||
logger.error('Error ensuring database exists:', err);
|
||||
throw err;
|
||||
} finally {
|
||||
await client.end();
|
||||
}
|
||||
}
|
||||
|
||||
async ensureSchemaAndTable() {
|
||||
// dbManager connects to the target database
|
||||
const client = await dbManager.pool.connect();
|
||||
try {
|
||||
const sqlPathCandidates = [
|
||||
path.resolve(process.cwd(), 'scripts/init_db.sql'),
|
||||
path.resolve(__dirname, '../scripts/init_db.sql'),
|
||||
path.resolve(__dirname, '../../scripts/init_db.sql')
|
||||
];
|
||||
const sqlPath = sqlPathCandidates.find((candidate) => fs.existsSync(candidate));
|
||||
if (!sqlPath) {
|
||||
throw new Error(`init_db.sql not found. Candidates: ${sqlPathCandidates.join(' | ')}`);
|
||||
}
|
||||
const sql = fs.readFileSync(sqlPath, 'utf8');
|
||||
|
||||
console.log(`Executing init_db.sql from ${sqlPath}...`);
|
||||
logger.info('Executing init_db.sql...');
|
||||
await client.query(sql);
|
||||
console.log('Schema and parent table initialized.');
|
||||
logger.info('Schema and parent table initialized.');
|
||||
} catch (err) {
|
||||
logger.error('Error initializing schema and table:', err);
|
||||
throw err;
|
||||
} finally {
|
||||
client.release();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export default new DatabaseInitializer();
|
||||
@@ -1,197 +0,0 @@
|
||||
import { logger } from '../utils/logger.js';
|
||||
import { config } from '../config/config.js';
|
||||
import dbManager from './databaseManager.js';
|
||||
|
||||
class PartitionManager {
|
||||
/**
|
||||
* Calculate the start and end timestamps (milliseconds) for a given date.
|
||||
* @param {Date} date - The date to calculate for.
|
||||
* @returns {Object} { startMs, endMs, partitionSuffix }
|
||||
*/
|
||||
getPartitionInfo(date) {
|
||||
const yyyy = date.getFullYear();
|
||||
const mm = String(date.getMonth() + 1).padStart(2, '0');
|
||||
const dd = String(date.getDate()).padStart(2, '0');
|
||||
const partitionSuffix = `${yyyy}${mm}${dd}`;
|
||||
|
||||
const start = new Date(date);
|
||||
start.setHours(0, 0, 0, 0);
|
||||
const startMs = start.getTime();
|
||||
|
||||
const end = new Date(date);
|
||||
end.setDate(end.getDate() + 1);
|
||||
end.setHours(0, 0, 0, 0);
|
||||
const endMs = end.getTime();
|
||||
|
||||
return { startMs, endMs, partitionSuffix };
|
||||
}
|
||||
|
||||
async ensurePartitionIndexes(client, schema, table, partitionSuffix) {
|
||||
const startedAt = Date.now();
|
||||
const partitionName = `${schema}.${table}_${partitionSuffix}`;
|
||||
const indexBase = `${table}_${partitionSuffix}`;
|
||||
|
||||
const indexSpecs = [
|
||||
{ name: `idx_${indexBase}_ts`, column: 'ts_ms' },
|
||||
{ name: `idx_${indexBase}_hid`, column: 'hotel_id' },
|
||||
{ name: `idx_${indexBase}_mac`, column: 'mac' },
|
||||
{ name: `idx_${indexBase}_did`, column: 'device_id' },
|
||||
{ name: `idx_${indexBase}_rid`, column: 'room_id' },
|
||||
{ name: `idx_${indexBase}_cs`, column: 'current_status' }
|
||||
];
|
||||
|
||||
for (const spec of indexSpecs) {
|
||||
await client.query(`CREATE INDEX IF NOT EXISTS ${spec.name} ON ${partitionName} (${spec.column});`);
|
||||
}
|
||||
|
||||
await client.query(`ANALYZE ${partitionName};`);
|
||||
|
||||
const elapsedMs = Date.now() - startedAt;
|
||||
if (elapsedMs > 1000) {
|
||||
logger.warn(`Partition index ensure slow`, { partitionName, elapsedMs });
|
||||
}
|
||||
}
|
||||
|
||||
async ensureIndexesForExistingPartitions() {
|
||||
const startedAt = Date.now();
|
||||
const client = await dbManager.pool.connect();
|
||||
try {
|
||||
const schema = config.db.schema;
|
||||
const table = config.db.table;
|
||||
|
||||
const res = await client.query(
|
||||
`
|
||||
SELECT c.relname AS relname
|
||||
FROM pg_inherits i
|
||||
JOIN pg_class p ON i.inhparent = p.oid
|
||||
JOIN pg_namespace pn ON pn.oid = p.relnamespace
|
||||
JOIN pg_class c ON i.inhrelid = c.oid
|
||||
WHERE pn.nspname = $1 AND p.relname = $2
|
||||
ORDER BY c.relname;
|
||||
`,
|
||||
[schema, table]
|
||||
);
|
||||
|
||||
const suffixes = new Set();
|
||||
const pattern = new RegExp(`^${table}_(\\d{8})$`);
|
||||
for (const row of res.rows) {
|
||||
const relname = row?.relname;
|
||||
if (typeof relname !== 'string') continue;
|
||||
const match = relname.match(pattern);
|
||||
if (!match) continue;
|
||||
suffixes.add(match[1]);
|
||||
}
|
||||
|
||||
for (const suffix of suffixes) {
|
||||
await this.ensurePartitionIndexes(client, schema, table, suffix);
|
||||
}
|
||||
|
||||
const elapsedMs = Date.now() - startedAt;
|
||||
if (elapsedMs > 5000) {
|
||||
logger.warn('Ensure existing partition indexes slow', { schema, table, partitions: suffixes.size, elapsedMs });
|
||||
}
|
||||
} finally {
|
||||
client.release();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Ensure partitions exist for the past M days and next N days.
|
||||
* @param {number} daysAhead - Number of days to pre-create.
|
||||
* @param {number} daysBack - Number of days to look back.
|
||||
*/
|
||||
async ensurePartitions(daysAhead = 30, daysBack = 15) {
|
||||
const client = await dbManager.pool.connect();
|
||||
try {
|
||||
logger.info(`Starting partition check for the past ${daysBack} days and next ${daysAhead} days...`);
|
||||
console.log(`Starting partition check for the past ${daysBack} days and next ${daysAhead} days...`);
|
||||
const now = new Date();
|
||||
|
||||
for (let i = -daysBack; i < daysAhead; i++) {
|
||||
const targetDate = new Date(now);
|
||||
targetDate.setDate(now.getDate() + i);
|
||||
|
||||
const { startMs, endMs, partitionSuffix } = this.getPartitionInfo(targetDate);
|
||||
const schema = config.db.schema;
|
||||
const table = config.db.table;
|
||||
const partitionName = `${schema}.${table}_${partitionSuffix}`;
|
||||
|
||||
// Check if partition exists
|
||||
const checkSql = `
|
||||
SELECT to_regclass($1) as exists;
|
||||
`;
|
||||
const checkRes = await client.query(checkSql, [partitionName]);
|
||||
|
||||
if (!checkRes.rows[0].exists) {
|
||||
logger.info(`Creating partition ${partitionName} for range [${startMs}, ${endMs})`);
|
||||
console.log(`Creating partition ${partitionName} for range [${startMs}, ${endMs})`);
|
||||
const createSql = `
|
||||
CREATE TABLE IF NOT EXISTS ${partitionName}
|
||||
PARTITION OF ${schema}.${table}
|
||||
FOR VALUES FROM (${startMs}) TO (${endMs});
|
||||
`;
|
||||
await client.query(createSql);
|
||||
}
|
||||
|
||||
await this.ensurePartitionIndexes(client, schema, table, partitionSuffix);
|
||||
}
|
||||
logger.info('Partition check completed.');
|
||||
} catch (err) {
|
||||
logger.error('Error ensuring partitions:', err);
|
||||
throw err;
|
||||
} finally {
|
||||
client.release();
|
||||
}
|
||||
}
|
||||
|
||||
async ensurePartitionsForTimestamps(tsMsList) {
|
||||
if (!Array.isArray(tsMsList) || tsMsList.length === 0) return;
|
||||
|
||||
const uniqueSuffixes = new Set();
|
||||
for (const ts of tsMsList) {
|
||||
const numericTs = typeof ts === 'string' ? Number(ts) : ts;
|
||||
if (!Number.isFinite(numericTs)) continue;
|
||||
const date = new Date(numericTs);
|
||||
if (Number.isNaN(date.getTime())) continue;
|
||||
const { partitionSuffix } = this.getPartitionInfo(date);
|
||||
uniqueSuffixes.add(partitionSuffix);
|
||||
if (uniqueSuffixes.size >= 400) break;
|
||||
}
|
||||
|
||||
if (uniqueSuffixes.size === 0) return;
|
||||
|
||||
const client = await dbManager.pool.connect();
|
||||
try {
|
||||
const schema = config.db.schema;
|
||||
const table = config.db.table;
|
||||
|
||||
for (const partitionSuffix of uniqueSuffixes) {
|
||||
const yyyy = Number(partitionSuffix.slice(0, 4));
|
||||
const mm = Number(partitionSuffix.slice(4, 6));
|
||||
const dd = Number(partitionSuffix.slice(6, 8));
|
||||
if (!Number.isFinite(yyyy) || !Number.isFinite(mm) || !Number.isFinite(dd)) continue;
|
||||
const targetDate = new Date(yyyy, mm - 1, dd);
|
||||
if (Number.isNaN(targetDate.getTime())) continue;
|
||||
|
||||
const { startMs, endMs } = this.getPartitionInfo(targetDate);
|
||||
const partitionName = `${schema}.${table}_${partitionSuffix}`;
|
||||
|
||||
const checkRes = await client.query(`SELECT to_regclass($1) as exists;`, [partitionName]);
|
||||
if (!checkRes.rows[0].exists) {
|
||||
logger.info(`Creating partition ${partitionName} for range [${startMs}, ${endMs})`);
|
||||
await client.query(`
|
||||
CREATE TABLE IF NOT EXISTS ${partitionName}
|
||||
PARTITION OF ${schema}.${table}
|
||||
FOR VALUES FROM (${startMs}) TO (${endMs});
|
||||
`);
|
||||
}
|
||||
|
||||
await this.ensurePartitionIndexes(client, schema, table, partitionSuffix);
|
||||
}
|
||||
} finally {
|
||||
client.release();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export default new PartitionManager();
|
||||
@@ -1,8 +1,7 @@
|
||||
import cron from 'node-cron';
|
||||
import { config } from './config/config.js';
|
||||
import dbManager from './db/databaseManager.js';
|
||||
import dbInitializer from './db/initializer.js';
|
||||
import partitionManager from './db/partitionManager.js';
|
||||
import g5DbManager from './db/g5DatabaseManager.js';
|
||||
import { createKafkaConsumers } from './kafka/consumer.js';
|
||||
import { parseMessageToRows } from './processor/index.js';
|
||||
import { createRedisClient } from './redis/redisClient.js';
|
||||
@@ -20,7 +19,13 @@ const bootstrap = async () => {
|
||||
port: config.db.port,
|
||||
user: config.db.user,
|
||||
database: config.db.database,
|
||||
schema: config.db.schema
|
||||
schema: config.db.schema,
|
||||
writeEnabled: config.db.writeEnabled
|
||||
},
|
||||
g5db: {
|
||||
enabled: config.g5db.enabled,
|
||||
writeEnabled: config.g5db.writeEnabled,
|
||||
roomStatusMomentSyncEnabled: config.g5db.roomStatusMomentSyncEnabled
|
||||
},
|
||||
kafka: {
|
||||
brokers: config.kafka.brokers,
|
||||
@@ -33,38 +38,12 @@ const bootstrap = async () => {
|
||||
}
|
||||
});
|
||||
|
||||
// 0. Initialize Database (Create DB, Schema, Table, Partitions)
|
||||
await dbInitializer.initialize();
|
||||
|
||||
// Metric Collector
|
||||
const metricCollector = new MetricCollector();
|
||||
|
||||
// 1. Setup Partition Maintenance Cron Job (Every day at 00:00)
|
||||
cron.schedule('0 0 * * *', async () => {
|
||||
logger.info('Running scheduled partition maintenance...');
|
||||
try {
|
||||
await partitionManager.ensurePartitions(30);
|
||||
} catch (err) {
|
||||
logger.error('Scheduled partition maintenance failed', err);
|
||||
}
|
||||
});
|
||||
|
||||
// 1.1 Setup Metric Reporting Cron Job (Every minute)
|
||||
// Moved after redisIntegration initialization
|
||||
|
||||
|
||||
// DatabaseManager is now a singleton exported instance, but let's keep consistency if possible
|
||||
// In databaseManager.js it exports `dbManager` instance by default.
|
||||
// The original code was `const dbManager = new DatabaseManager(config.db);` which implies it might have been a class export.
|
||||
// Let's check `databaseManager.js` content.
|
||||
// Wait, I imported `dbManager` from `./db/databaseManager.js`.
|
||||
// If `databaseManager.js` exports an instance as default, I should use that.
|
||||
// If it exports a class, I should instantiate it.
|
||||
|
||||
// Let's assume the previous code `new DatabaseManager` was correct if it was a class.
|
||||
// BUT I used `dbManager.pool` in `partitionManager.js` assuming it's an instance.
|
||||
// I need to verify `databaseManager.js`.
|
||||
|
||||
const redisClient = await createRedisClient(config.redis);
|
||||
const redisIntegration = new RedisIntegration(
|
||||
redisClient,
|
||||
@@ -81,7 +60,7 @@ const bootstrap = async () => {
|
||||
const report = `[Metrics] Pulled:${metrics.kafka_pulled} ParseErr:${metrics.parse_error} Inserted:${metrics.db_inserted} Failed:${metrics.db_failed} FlushAvg:${flushAvgMs}ms DbAvg:${dbAvgMs}ms`;
|
||||
console.log(report);
|
||||
logger.info(report);
|
||||
|
||||
|
||||
try {
|
||||
await redisIntegration.info('Minute Metrics', metrics);
|
||||
} catch (err) {
|
||||
@@ -134,7 +113,7 @@ const bootstrap = async () => {
|
||||
const BATCH_SIZE = Math.max(10, Math.min(configuredBatchSize, configuredMaxInFlight));
|
||||
const BATCH_TIMEOUT_MS = Math.max(1, configuredBatchTimeoutMs);
|
||||
const commitOnAttempt = config.kafka.commitOnAttempt === true;
|
||||
|
||||
|
||||
const batchStates = new Map();
|
||||
|
||||
const partitionKeyFromMessage = (message) => {
|
||||
@@ -199,16 +178,26 @@ const bootstrap = async () => {
|
||||
);
|
||||
};
|
||||
|
||||
const isMissingPartitionError = (err) =>
|
||||
err?.code === '23514' ||
|
||||
(typeof err?.message === 'string' && err.message.includes('no partition of relation'));
|
||||
|
||||
const insertRowsWithRetry = async (rows) => {
|
||||
const startedAt = Date.now();
|
||||
let attemptedPartitionFix = false;
|
||||
while (true) {
|
||||
try {
|
||||
await dbManager.insertRows({ schema: config.db.schema, table: config.db.table, rows });
|
||||
const promises = [];
|
||||
if (config.db.writeEnabled) {
|
||||
promises.push(dbManager.insertRows({ schema: config.db.schema, table: config.db.table, rows }));
|
||||
}
|
||||
if (config.g5db.writeEnabled && config.g5db.enabled) {
|
||||
promises.push(g5DbManager.insertRows({ schema: config.g5db.schema, table: config.g5db.table, rows }).catch(e => {
|
||||
logger.error('G5 Database insert failed but non-blocking', { error: e.message });
|
||||
}));
|
||||
if (config.g5db.roomStatusMomentSyncEnabled) {
|
||||
promises.push(g5DbManager.upsertRoomStatusMomentIp(rows).catch(e => {
|
||||
logger.error('G5 room_status_moment_g5 upsert failed but non-blocking', { error: e.message });
|
||||
}));
|
||||
}
|
||||
}
|
||||
await Promise.all(promises);
|
||||
|
||||
metricCollector.increment('db_insert_count', 1);
|
||||
metricCollector.increment('db_insert_ms_sum', Date.now() - startedAt);
|
||||
return;
|
||||
@@ -222,24 +211,6 @@ const bootstrap = async () => {
|
||||
}
|
||||
continue;
|
||||
}
|
||||
if (isMissingPartitionError(err) && !attemptedPartitionFix) {
|
||||
attemptedPartitionFix = true;
|
||||
try {
|
||||
await partitionManager.ensurePartitionsForTimestamps(rows.map(r => r.ts_ms));
|
||||
} catch (partitionErr) {
|
||||
if (isDbConnectionError(partitionErr)) {
|
||||
logger.error('Database offline during partition ensure. Retrying in 5s...', { error: partitionErr.message });
|
||||
await new Promise(r => setTimeout(r, 5000));
|
||||
while (!(await dbManager.checkConnection())) {
|
||||
logger.warn('Database still offline. Waiting 5s...');
|
||||
await new Promise(r => setTimeout(r, 5000));
|
||||
}
|
||||
continue;
|
||||
}
|
||||
throw partitionErr;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
@@ -247,7 +218,21 @@ const bootstrap = async () => {
|
||||
|
||||
const insertRowsOnce = async (rows) => {
|
||||
const startedAt = Date.now();
|
||||
await dbManager.insertRows({ schema: config.db.schema, table: config.db.table, rows });
|
||||
const promises = [];
|
||||
if (config.db.writeEnabled) {
|
||||
promises.push(dbManager.insertRows({ schema: config.db.schema, table: config.db.table, rows }));
|
||||
}
|
||||
if (config.g5db.writeEnabled && config.g5db.enabled) {
|
||||
promises.push(g5DbManager.insertRows({ schema: config.g5db.schema, table: config.g5db.table, rows }).catch(e => {
|
||||
logger.error('G5 Database insert failed in insertOnce', { error: e.message });
|
||||
}));
|
||||
if (config.g5db.roomStatusMomentSyncEnabled) {
|
||||
promises.push(g5DbManager.upsertRoomStatusMomentIp(rows).catch(e => {
|
||||
logger.error('G5 room_status_moment_g5 upsert failed in insertOnce', { error: e.message });
|
||||
}));
|
||||
}
|
||||
}
|
||||
await Promise.all(promises);
|
||||
metricCollector.increment('db_insert_count', 1);
|
||||
metricCollector.increment('db_insert_ms_sum', Date.now() - startedAt);
|
||||
};
|
||||
@@ -380,7 +365,7 @@ const bootstrap = async () => {
|
||||
for (const item of unresolvedItems) {
|
||||
try {
|
||||
await handleError(err, item.message);
|
||||
} catch {}
|
||||
} catch { }
|
||||
item.resolve();
|
||||
}
|
||||
}
|
||||
@@ -406,7 +391,7 @@ const bootstrap = async () => {
|
||||
metricCollector.increment('kafka_pulled');
|
||||
metricCollector.incrementKeyed('kafka_pulled_by_partition', `${message.topic}-${message.partition}`, 1);
|
||||
}
|
||||
|
||||
|
||||
// const messageValue = Buffer.isBuffer(message.value)
|
||||
// ? message.value.toString('utf8')
|
||||
// : message.value;
|
||||
@@ -422,7 +407,7 @@ const bootstrap = async () => {
|
||||
// value: config.kafka.logMessages ? messageValue : undefined,
|
||||
// valueLength: !config.kafka.logMessages && typeof messageValue === 'string' ? messageValue.length : null
|
||||
// };
|
||||
|
||||
|
||||
// logger.info('Kafka message received', logDetails);
|
||||
|
||||
const partitionKey = partitionKeyFromMessage(message);
|
||||
@@ -465,7 +450,7 @@ const bootstrap = async () => {
|
||||
// Graceful Shutdown Logic
|
||||
const shutdown = async (signal) => {
|
||||
logger.info(`Received ${signal}, shutting down...`);
|
||||
|
||||
|
||||
try {
|
||||
// 1. Close Kafka Consumer
|
||||
if (consumers && consumers.length > 0) {
|
||||
@@ -480,8 +465,9 @@ const bootstrap = async () => {
|
||||
await redisClient.quit();
|
||||
logger.info('Redis client closed');
|
||||
|
||||
// 4. Close Database Pool
|
||||
// 4. Close Database Pools
|
||||
await dbManager.close();
|
||||
await g5DbManager.close();
|
||||
logger.info('Database connection closed');
|
||||
|
||||
process.exit(0);
|
||||
|
||||
@@ -20,6 +20,16 @@ const normalizeText = (value, maxLength) => {
|
||||
return str;
|
||||
};
|
||||
|
||||
const normalizeCurrentStatus = (value) => {
|
||||
const currentStatus = normalizeText(value, 255);
|
||||
if (currentStatus === null) return null;
|
||||
const normalized = currentStatus.toLowerCase();
|
||||
if (normalized === 'on' || normalized === 'off' || normalized === 'restart') {
|
||||
return normalized;
|
||||
}
|
||||
return currentStatus;
|
||||
};
|
||||
|
||||
export const buildRowsFromMessageValue = (value) => {
|
||||
const payload = parseKafkaPayload(value);
|
||||
return buildRowsFromPayload(payload);
|
||||
@@ -30,9 +40,7 @@ export const buildRowsFromPayload = (rawPayload) => {
|
||||
|
||||
// Database limit is VARCHAR(255)
|
||||
const rebootReason = normalizeText(payload.RebootReason, 255);
|
||||
const currentStatusRaw = normalizeText(payload.CurrentStatus, 255);
|
||||
const hasRebootReason = rebootReason !== null && rebootReason !== '';
|
||||
const currentStatus = hasRebootReason ? 'on' : currentStatusRaw;
|
||||
const currentStatus = normalizeCurrentStatus(payload.CurrentStatus);
|
||||
|
||||
// Derive timestamp: UnixTime -> CurrentTime -> Date.now()
|
||||
let tsMs = payload.UnixTime;
|
||||
|
||||
27
bls-onoffline-backend/tests/g5DatabaseManager.test.js
Normal file
27
bls-onoffline-backend/tests/g5DatabaseManager.test.js
Normal file
@@ -0,0 +1,27 @@
|
||||
import { describe, it, expect } from 'vitest';
|
||||
import { dedupeRoomStatusSyncRows, mapCurrentStatusToG5Code } from '../src/db/g5DatabaseManager.js';
|
||||
|
||||
describe('G5 current_status mapping', () => {
|
||||
it('maps on/off/restart to numeric codes', () => {
|
||||
expect(mapCurrentStatusToG5Code('on')).toBe(1);
|
||||
expect(mapCurrentStatusToG5Code('off')).toBe(2);
|
||||
expect(mapCurrentStatusToG5Code('restart')).toBe(3);
|
||||
});
|
||||
|
||||
it('returns 0 for unknown values', () => {
|
||||
expect(mapCurrentStatusToG5Code('idle')).toBe(0);
|
||||
expect(mapCurrentStatusToG5Code(null)).toBe(0);
|
||||
});
|
||||
|
||||
it('dedupes room status sync rows by hotel_id and room_id using first row', () => {
|
||||
const rows = dedupeRoomStatusSyncRows([
|
||||
{ hotel_id: 101, room_id: '8001', ip: '10.0.0.1:1234' },
|
||||
{ hotel_id: 101, room_id: '8001', ip: '10.0.0.2:5678' },
|
||||
{ hotel_id: 101, room_id: '8002', ip: '10.0.0.3:9012' }
|
||||
]);
|
||||
|
||||
expect(rows).toHaveLength(2);
|
||||
expect(rows[0]).toEqual({ hotel_id: 101, room_id: '8001', ip: '10.0.0.1:1234' });
|
||||
expect(rows[1]).toEqual({ hotel_id: 101, room_id: '8002', ip: '10.0.0.3:9012' });
|
||||
});
|
||||
});
|
||||
@@ -26,13 +26,19 @@ describe('Processor Logic', () => {
|
||||
expect(rows[0].reboot_reason).toBeNull();
|
||||
});
|
||||
|
||||
it('should override current_status to on for reboot data', () => {
|
||||
const rows = buildRowsFromPayload({ ...basePayload, CurrentStatus: 'off', RebootReason: '0x01' });
|
||||
it('should preserve restart current_status for reboot data', () => {
|
||||
const rows = buildRowsFromPayload({ ...basePayload, CurrentStatus: 'restart', RebootReason: '0x01' });
|
||||
expect(rows).toHaveLength(1);
|
||||
expect(rows[0].current_status).toBe('on');
|
||||
expect(rows[0].current_status).toBe('restart');
|
||||
expect(rows[0].reboot_reason).toBe('0x01');
|
||||
});
|
||||
|
||||
it('should preserve restart current_status for non-reboot data', () => {
|
||||
const rows = buildRowsFromPayload({ ...basePayload, CurrentStatus: 'restart', RebootReason: null });
|
||||
expect(rows).toHaveLength(1);
|
||||
expect(rows[0].current_status).toBe('restart');
|
||||
});
|
||||
|
||||
it('should keep empty optional fields as empty strings', () => {
|
||||
const rows = buildRowsFromPayload({
|
||||
...basePayload,
|
||||
|
||||
42
docs/onoffline_record_g5.sql
Normal file
42
docs/onoffline_record_g5.sql
Normal file
@@ -0,0 +1,42 @@
|
||||
/*
|
||||
Navicat Premium Dump SQL
|
||||
|
||||
Source Server : FnOS 80
|
||||
Source Server Type : PostgreSQL
|
||||
Source Server Version : 150017 (150017)
|
||||
Source Host : 10.8.8.80:5434
|
||||
Source Catalog : log_platform
|
||||
Source Schema : onoffline
|
||||
|
||||
Target Server Type : PostgreSQL
|
||||
Target Server Version : 150017 (150017)
|
||||
File Encoding : 65001
|
||||
|
||||
Date: 10/03/2026 17:23:24
|
||||
*/
|
||||
|
||||
|
||||
-- ----------------------------
|
||||
-- Table structure for onoffline_record_g5
|
||||
-- ----------------------------
|
||||
DROP TABLE IF EXISTS "onoffline"."onoffline_record_g5";
|
||||
CREATE TABLE "onoffline"."onoffline_record_g5" (
|
||||
"guid" int4 NOT NULL DEFAULT nextval('"onoffline".onoffline_record_g5_guid_seq'::regclass),
|
||||
"ts_ms" int8 NOT NULL,
|
||||
"write_ts_ms" int8 NOT NULL,
|
||||
"hotel_id" int2 NOT NULL,
|
||||
"mac" varchar(21) COLLATE "pg_catalog"."default" NOT NULL,
|
||||
"device_id" varchar(64) COLLATE "pg_catalog"."default" NOT NULL,
|
||||
"room_id" varchar(64) COLLATE "pg_catalog"."default" NOT NULL,
|
||||
"ip" varchar(25) COLLATE "pg_catalog"."default",
|
||||
"current_status" int2 NOT NULL DEFAULT 0,
|
||||
"launcher_version" varchar(255) COLLATE "pg_catalog"."default",
|
||||
"reboot_reason" varchar(255) COLLATE "pg_catalog"."default",
|
||||
"record_source" varchar(50) COLLATE "pg_catalog"."default"
|
||||
)
|
||||
;
|
||||
|
||||
-- ----------------------------
|
||||
-- Primary Key structure for table onoffline_record_g5
|
||||
-- ----------------------------
|
||||
ALTER TABLE "onoffline"."onoffline_record_g5" ADD CONSTRAINT "onoffline_record_g5_pkey" PRIMARY KEY ("ts_ms", "guid");
|
||||
91
docs/room_status_moment_g5.sql
Normal file
91
docs/room_status_moment_g5.sql
Normal file
@@ -0,0 +1,91 @@
|
||||
/*
|
||||
Navicat Premium Dump SQL
|
||||
|
||||
Source Server : FnOS 80
|
||||
Source Server Type : PostgreSQL
|
||||
Source Server Version : 150017 (150017)
|
||||
Source Host : 10.8.8.80:5434
|
||||
Source Catalog : log_platform
|
||||
Source Schema : room_status
|
||||
|
||||
Target Server Type : PostgreSQL
|
||||
Target Server Version : 150017 (150017)
|
||||
File Encoding : 65001
|
||||
|
||||
Date: 18/03/2026 10:55:09
|
||||
*/
|
||||
|
||||
|
||||
-- ----------------------------
|
||||
-- Table structure for room_status_moment_g5
|
||||
-- ----------------------------
|
||||
DROP TABLE IF EXISTS "room_status"."room_status_moment_g5";
|
||||
CREATE TABLE "room_status"."room_status_moment_g5" (
|
||||
"hotel_id" int2 NOT NULL,
|
||||
"room_id" text COLLATE "pg_catalog"."default" NOT NULL,
|
||||
"device_id" text COLLATE "pg_catalog"."default" NOT NULL,
|
||||
"ts_ms" int8 NOT NULL DEFAULT ((EXTRACT(epoch FROM clock_timestamp()) * (1000)::numeric))::bigint,
|
||||
"sys_lock_status" int2,
|
||||
"online_status" int2,
|
||||
"launcher_version" text COLLATE "pg_catalog"."default",
|
||||
"app_version" text COLLATE "pg_catalog"."default",
|
||||
"config_version" text COLLATE "pg_catalog"."default",
|
||||
"register_ts_ms" int8,
|
||||
"upgrade_ts_ms" int8,
|
||||
"config_ts_ms" int8,
|
||||
"ip" text COLLATE "pg_catalog"."default",
|
||||
"pms_status" int2,
|
||||
"power_state" int2,
|
||||
"cardless_state" int2,
|
||||
"service_mask" int8,
|
||||
"insert_card" int2,
|
||||
"bright_g" int2,
|
||||
"agreement_ver" text COLLATE "pg_catalog"."default",
|
||||
"air_address" _text COLLATE "pg_catalog"."default",
|
||||
"air_state" _int2,
|
||||
"air_model" _int2,
|
||||
"air_speed" _int2,
|
||||
"air_set_temp" _int2,
|
||||
"air_now_temp" _int2,
|
||||
"air_solenoid_valve" _int2,
|
||||
"elec_address" _text COLLATE "pg_catalog"."default",
|
||||
"elec_voltage" _float8,
|
||||
"elec_ampere" _float8,
|
||||
"elec_power" _float8,
|
||||
"elec_phase" _float8,
|
||||
"elec_energy" _float8,
|
||||
"elec_sum_energy" _float8,
|
||||
"carbon_state" int2,
|
||||
"dev_loops" jsonb,
|
||||
"energy_carbon_sum" float8,
|
||||
"energy_nocard_sum" float8,
|
||||
"external_device" jsonb DEFAULT '{}'::jsonb,
|
||||
"faulty_device_count" jsonb DEFAULT '{}'::jsonb
|
||||
)
|
||||
WITH (fillfactor=90)
|
||||
TABLESPACE "ts_hot"
|
||||
;
|
||||
|
||||
-- ----------------------------
|
||||
-- Indexes structure for table room_status_moment_g5
|
||||
-- ----------------------------
|
||||
CREATE INDEX "idx_rsm_g5_dashboard_query" ON "room_status"."room_status_moment_g5" USING btree (
|
||||
"hotel_id" "pg_catalog"."int2_ops" ASC NULLS LAST,
|
||||
"online_status" "pg_catalog"."int2_ops" ASC NULLS LAST,
|
||||
"power_state" "pg_catalog"."int2_ops" ASC NULLS LAST
|
||||
);
|
||||
|
||||
-- ----------------------------
|
||||
-- Triggers structure for table room_status_moment_g5
|
||||
-- ----------------------------
|
||||
CREATE TRIGGER "trg_update_rsm_ts_ms" BEFORE UPDATE ON "room_status"."room_status_moment_g5"
|
||||
FOR EACH ROW
|
||||
EXECUTE PROCEDURE "room_status"."update_ts_ms_g5"();
|
||||
CREATE TRIGGER "trigger_room_status_change" AFTER UPDATE ON "room_status"."room_status_moment_g5"
|
||||
FOR EACH ROW
|
||||
EXECUTE PROCEDURE "room_status"."handle_room_status_change"();
|
||||
|
||||
-- ----------------------------
|
||||
-- Primary Key structure for table room_status_moment_g5
|
||||
-- ----------------------------
|
||||
ALTER TABLE "room_status"."room_status_moment_g5" ADD CONSTRAINT "room_status_moment_g5_pkey" PRIMARY KEY ("hotel_id", "room_id");
|
||||
Reference in New Issue
Block a user