Compare commits

...

12 Commits

Author SHA1 Message Date
38f1421fff feat(数据库): 添加数据库写入开关配置
添加 DB_WRITE_ENABLED 和 G5_DB_WRITE_ENABLED 环境变量配置
在数据库操作逻辑中增加写入开关检查
2026-04-02 14:07:32 +08:00
7713cfeb9e feat: 添加 G5 状态表同步开关,默认关闭写入功能并更新相关配置和文档 2026-03-24 08:34:36 +08:00
fa363835a3 feat: 添加 G5 状态表 IP 同步功能,新增 upsert 方法并更新相关测试 2026-03-18 11:51:17 +08:00
381080fee0 feat: 添加对 Kafka CurrentStatus 的 restart 值支持,更新 G5 入库逻辑及相关测试 2026-03-18 09:47:33 +08:00
1329eca99e feat: 添加 G5 数据库支持,更新配置和文档 2026-03-10 19:52:58 +08:00
156930e6bc 修改 gitignore 2026-03-04 16:45:40 +08:00
33c9bf0e07 refactor: 移除运行时数据库初始化与分区维护
- 删除了服务启动阶段的数据库初始化逻辑,包括创建数据库、表和分区的相关代码。
- 移除了定时分区维护任务,确保服务职责更清晰。
- 更新了数据库分区策略,明确分区由外部脚本管理,服务不再自动创建缺失分区。
- 修改了相关文档,确保数据库结构与分区维护的责任转移到 `SQL_Script/` 目录下的外部脚本。
- 更新了需求和场景,确保符合新的设计规范。
2026-03-04 11:52:12 +08:00
3d80ad8710 回退版本 2026-03-03 22:10:18 +08:00
4492a9c47e feat: 添加强制热表空间的 SQL 构建功能,优化分区创建逻辑 2026-03-03 21:00:53 +08:00
ba61a540da feat: 添加当前或未来日期检查功能以优化分区创建逻辑 2026-03-03 20:21:57 +08:00
c6d7cea8cf Merge branch 'main' of http://blv-rd.tech:3001/Boonlive_RD_Web/Web_BLS_OnOffLine_Server 2026-03-03 18:37:23 +08:00
1eccc2e3aa refactor: 重构分区索引策略,移除显式创建索引的方法并更新相关流程 2026-03-03 18:22:21 +08:00
40 changed files with 908 additions and 1677 deletions

1
.gitignore vendored
View File

@@ -1,2 +1,3 @@
/bls-onoffline-backend/node_modules /bls-onoffline-backend/node_modules
/template /template
/bls-onoffline-backend/dist

View File

@@ -0,0 +1,6 @@
-- Replace {{DATABASE_NAME}} before execution
-- Requires psql (uses \gexec)
-- Creates the database only when it does not already exist.
-- Note: \gexec is a psql meta-command that executes each value returned by
-- the buffered query; it terminates the buffer itself, so no trailing
-- semicolon may follow it (a ';' after \gexec is treated as a stray
-- argument by psql, not as a statement terminator).
SELECT format('CREATE DATABASE %I', '{{DATABASE_NAME}}')
WHERE NOT EXISTS (
SELECT 1 FROM pg_database WHERE datname = '{{DATABASE_NAME}}'
)\gexec

View File

@@ -0,0 +1,12 @@
-- Replace placeholders before execution:
-- {{SCHEMA_NAME}} default: onoffline
-- {{TABLE_NAME}} default: onoffline_record
-- {{PARTITION_SUFFIX}} format: YYYYMMDD (example: 20260304)
-- {{START_TS_MS}} unix ms at 00:00:00.000
-- {{END_TS_MS}} unix ms at next day 00:00:00.000
-- {{TABLESPACE_CLAUSE}} either empty string or: TABLESPACE ts_hot
-- Creates one daily partition of the parent table. Range bounds are
-- half-open: rows with START_TS_MS <= ts_ms < END_TS_MS land here.
CREATE TABLE IF NOT EXISTS {{SCHEMA_NAME}}.{{TABLE_NAME}}_{{PARTITION_SUFFIX}}
PARTITION OF {{SCHEMA_NAME}}.{{TABLE_NAME}}
FOR VALUES FROM ({{START_TS_MS}}) TO ({{END_TS_MS}})
{{TABLESPACE_CLAUSE}};

View File

@@ -0,0 +1,27 @@
-- Replace placeholders before execution:
-- {{SCHEMA_NAME}} default: onoffline
-- {{TABLE_NAME}} default: onoffline_record
-- Creates the schema and the range-partitioned parent table.
-- The primary key includes ts_ms because the partition key must be part of
-- any unique constraint on a partitioned table.
CREATE SCHEMA IF NOT EXISTS {{SCHEMA_NAME}};
CREATE TABLE IF NOT EXISTS {{SCHEMA_NAME}}.{{TABLE_NAME}} (
guid TEXT NOT NULL,
ts_ms BIGINT NOT NULL,
write_ts_ms BIGINT NOT NULL,
hotel_id SMALLINT,
mac TEXT,
device_id TEXT,
room_id TEXT,
ip TEXT,
current_status TEXT,
launcher_version TEXT,
reboot_reason TEXT,
PRIMARY KEY (ts_ms, guid)
) PARTITION BY RANGE (ts_ms);
-- Indexes declared on the partitioned parent are inherited by every
-- attached child partition (PostgreSQL 11+), so daily partitions need no
-- per-partition index DDL.
CREATE INDEX IF NOT EXISTS idx_{{TABLE_NAME}}_guid ON {{SCHEMA_NAME}}.{{TABLE_NAME}} (guid);
CREATE INDEX IF NOT EXISTS idx_{{TABLE_NAME}}_device_id ON {{SCHEMA_NAME}}.{{TABLE_NAME}} (device_id);
CREATE INDEX IF NOT EXISTS idx_{{TABLE_NAME}}_hotel_id ON {{SCHEMA_NAME}}.{{TABLE_NAME}} (hotel_id);
CREATE INDEX IF NOT EXISTS idx_{{TABLE_NAME}}_mac ON {{SCHEMA_NAME}}.{{TABLE_NAME}} (mac);
CREATE INDEX IF NOT EXISTS idx_{{TABLE_NAME}}_room_id ON {{SCHEMA_NAME}}.{{TABLE_NAME}} (room_id);
CREATE INDEX IF NOT EXISTS idx_{{TABLE_NAME}}_status ON {{SCHEMA_NAME}}.{{TABLE_NAME}} (current_status);

View File

@@ -0,0 +1,26 @@
#!/usr/bin/env node
import fs from 'fs';
import path from 'path';

/**
 * Parse `--key=value` CLI arguments into a plain object.
 * Splits on the FIRST '=' only, so values may themselves contain '='
 * (the original `arg.split('=')` truncated such values).
 * A bare `--flag` yields an undefined value, as before.
 * @param {string[]} argv raw argument strings (node/script entries removed)
 * @returns {Record<string, string|undefined>}
 */
const parseArgs = (argv) =>
  Object.fromEntries(
    argv.map((arg) => {
      const eq = arg.indexOf('=');
      const key = (eq === -1 ? arg : arg.slice(0, eq)).replace(/^--/, '');
      const value = eq === -1 ? undefined : arg.slice(eq + 1);
      return [key, value];
    })
  );

const args = parseArgs(process.argv.slice(2));

// Defaults mirror the service configuration (see .env.example).
const database = args.database || 'log_platform';
const schema = args.schema || 'onoffline';
const table = args.table || 'onoffline_record';
const output = args.output;

// Full provisioning script: create-database-if-missing (via psql \gexec),
// schema, range-partitioned parent table, and parent-level indexes.
// \gexec terminates the psql buffer itself, so no ';' follows it.
const sql = `SELECT format('CREATE DATABASE %I', '${database}')
WHERE NOT EXISTS (SELECT 1 FROM pg_database WHERE datname = '${database}')\\gexec

CREATE SCHEMA IF NOT EXISTS ${schema};

CREATE TABLE IF NOT EXISTS ${schema}.${table} (
  guid TEXT NOT NULL,
  ts_ms BIGINT NOT NULL,
  write_ts_ms BIGINT NOT NULL,
  hotel_id SMALLINT,
  mac TEXT,
  device_id TEXT,
  room_id TEXT,
  ip TEXT,
  current_status TEXT,
  launcher_version TEXT,
  reboot_reason TEXT,
  PRIMARY KEY (ts_ms, guid)
) PARTITION BY RANGE (ts_ms);

CREATE INDEX IF NOT EXISTS idx_${table}_guid ON ${schema}.${table} (guid);
CREATE INDEX IF NOT EXISTS idx_${table}_device_id ON ${schema}.${table} (device_id);
CREATE INDEX IF NOT EXISTS idx_${table}_hotel_id ON ${schema}.${table} (hotel_id);
CREATE INDEX IF NOT EXISTS idx_${table}_mac ON ${schema}.${table} (mac);
CREATE INDEX IF NOT EXISTS idx_${table}_room_id ON ${schema}.${table} (room_id);
CREATE INDEX IF NOT EXISTS idx_${table}_status ON ${schema}.${table} (current_status);
`;

// Write to --output=path when given, otherwise emit on stdout for piping.
if (output) {
  const outputPath = path.resolve(output);
  fs.writeFileSync(outputPath, sql, 'utf8');
  console.log(`Init SQL written to ${outputPath}`);
} else {
  process.stdout.write(sql);
}

View File

@@ -0,0 +1,64 @@
#!/usr/bin/env node
import fs from 'fs';
import path from 'path';

// Generate daily-partition DDL for `--days` consecutive days starting at
// `--start=YYYY-MM-DD`. Output goes to `--output=path` or stdout.

// Parse `--key=value` args; split on the FIRST '=' only, so values may
// themselves contain '=' (the previous split('=') truncated such values).
const args = Object.fromEntries(
  process.argv.slice(2).map((arg) => {
    const eq = arg.indexOf('=');
    const key = (eq === -1 ? arg : arg.slice(0, eq)).replace(/^--/, '');
    return [key, eq === -1 ? undefined : arg.slice(eq + 1)];
  })
);

const schema = args.schema || 'onoffline';
const table = args.table || 'onoffline_record';
const start = args.start;
const days = Number(args.days || '30');
const tablespace = args.tablespace || '';
const output = args.output;

if (!start) {
  console.error('Missing required argument: --start=YYYY-MM-DD');
  process.exit(1);
}
// The message promises a positive integer; enforce integrality too
// (the previous `days <= 0` check accepted fractional values like 1.5).
if (!Number.isInteger(days) || days <= 0) {
  console.error('Invalid --days value. It must be a positive integer.');
  process.exit(1);
}

// Interpreted in the local timezone, midnight of the given day.
const base = new Date(`${start}T00:00:00`);
if (Number.isNaN(base.getTime())) {
  console.error('Invalid start date. Use format YYYY-MM-DD');
  process.exit(1);
}

const statements = [];
for (let i = 0; i < days; i += 1) {
  // Partition bounds: [local midnight of day i, local midnight of day i+1).
  const date = new Date(base);
  date.setDate(base.getDate() + i);
  const startMs = date.getTime();
  const endDate = new Date(date);
  endDate.setDate(endDate.getDate() + 1);
  const endMs = endDate.getTime();
  // Partition name suffix: YYYYMMDD of the day the partition covers.
  const yyyy = date.getFullYear();
  const mm = String(date.getMonth() + 1).padStart(2, '0');
  const dd = String(date.getDate()).padStart(2, '0');
  const suffix = `${yyyy}${mm}${dd}`;
  const tablespaceClause = tablespace ? ` TABLESPACE ${tablespace}` : '';
  statements.push(
    `CREATE TABLE IF NOT EXISTS ${schema}.${table}_${suffix}\nPARTITION OF ${schema}.${table}\nFOR VALUES FROM (${startMs}) TO (${endMs})${tablespaceClause};`
  );
}

const sql = `${statements.join('\n\n')}\n`;
if (output) {
  const outputPath = path.resolve(output);
  fs.writeFileSync(outputPath, sql, 'utf8');
  console.log(`Partition range SQL written to ${outputPath}`);
} else {
  process.stdout.write(sql);
}

View File

@@ -0,0 +1,49 @@
#!/usr/bin/env node
import fs from 'fs';
import path from 'path';

// Generate the DDL for one daily partition (`--date=YYYY-MM-DD`).
// Output goes to `--output=path` or stdout.

// Parse `--key=value` args; split on the FIRST '=' only, so values may
// themselves contain '=' (the previous split('=') truncated such values).
const args = Object.fromEntries(
  process.argv.slice(2).map((arg) => {
    const eq = arg.indexOf('=');
    const key = (eq === -1 ? arg : arg.slice(0, eq)).replace(/^--/, '');
    return [key, eq === -1 ? undefined : arg.slice(eq + 1)];
  })
);

const schema = args.schema || 'onoffline';
const table = args.table || 'onoffline_record';
const dateInput = args.date;
const tablespace = args.tablespace || '';
const output = args.output;

if (!dateInput) {
  console.error('Missing required argument: --date=YYYY-MM-DD');
  process.exit(1);
}
// Interpreted in the local timezone, midnight of the given day.
const date = new Date(`${dateInput}T00:00:00`);
if (Number.isNaN(date.getTime())) {
  console.error('Invalid date. Use format YYYY-MM-DD');
  process.exit(1);
}

// Partition bounds: [local midnight, next local midnight).
const startMs = date.getTime();
const endDate = new Date(date);
endDate.setDate(endDate.getDate() + 1);
const endMs = endDate.getTime();
const yyyy = date.getFullYear();
const mm = String(date.getMonth() + 1).padStart(2, '0');
const dd = String(date.getDate()).padStart(2, '0');
const suffix = `${yyyy}${mm}${dd}`;
// Keep the clause on the same line as the range (matching the range
// generator) so an empty clause no longer leaves a dangling blank line
// before the terminating ';'.
const tablespaceClause = tablespace ? ` TABLESPACE ${tablespace}` : '';
const sql = `CREATE TABLE IF NOT EXISTS ${schema}.${table}_${suffix}\nPARTITION OF ${schema}.${table}\nFOR VALUES FROM (${startMs}) TO (${endMs})${tablespaceClause};\n`;

if (output) {
  const outputPath = path.resolve(output);
  fs.writeFileSync(outputPath, sql, 'utf8');
  console.log(`Partition SQL written to ${outputPath}`);
} else {
  process.stdout.write(sql);
}

View File

@@ -29,6 +29,16 @@ POSTGRES_IDLE_TIMEOUT_MS=30000
DB_SCHEMA=onoffline DB_SCHEMA=onoffline
DB_TABLE=onoffline_record DB_TABLE=onoffline_record
# =========================
# PostgreSQL 配置 G5库专用
# =========================
POSTGRES_HOST_G5=10.8.8.80
POSTGRES_PORT_G5=5434
POSTGRES_DATABASE_G5=log_platform
POSTGRES_USER_G5=log_admin
POSTGRES_PASSWORD_G5=H3IkLUt8K!x
POSTGRES_IDLE_TIMEOUT_MS_G5=30000
PORT=3001 PORT=3001
LOG_LEVEL=info LOG_LEVEL=info
@@ -39,3 +49,11 @@ REDIS_PASSWORD=
REDIS_DB=15 REDIS_DB=15
REDIS_CONNECT_TIMEOUT_MS=5000 REDIS_CONNECT_TIMEOUT_MS=5000
REDIS_PROJECT_NAME=bls-onoffline REDIS_PROJECT_NAME=bls-onoffline
# DB write switches
DB_WRITE_ENABLED=true
G5_DB_WRITE_ENABLED=true
# G5 room_status.moment sync
# false: do not write room_status.room_status_moment_g5
G5_ROOM_STATUS_MOMENT_SYNC_ENABLED=false

View File

@@ -29,3 +29,7 @@ REDIS_PASSWORD=
REDIS_DB=0 REDIS_DB=0
REDIS_PROJECT_NAME=bls-onoffline REDIS_PROJECT_NAME=bls-onoffline
REDIS_API_BASE_URL=http://localhost:3001 REDIS_API_BASE_URL=http://localhost:3001
# G5 room_status.moment sync
# false: do not write room_status.room_status_moment_g5
G5_ROOM_STATUS_MOMENT_SYNC_ENABLED=false

View File

@@ -18,7 +18,13 @@ bls-onoffline-backend
- 复制 .env.example 为 .env 并按实际环境配置 - 复制 .env.example 为 .env 并按实际环境配置
数据库初始化 数据库初始化
- 启动时自动执行 scripts/init_db.sql 并预创建未来 30 天分区 - 运行服务前请先通过根目录 SQL_Script 下脚本完成建库与分区维护
- `../SQL_Script/create_database.sql`:建库(需要 psql,使用 \gexec)
- `../SQL_Script/create_schema_and_parent_table.sql`:建 schema 与主分区表
- `../SQL_Script/create_partition_for_day.sql`:按日建分区模板
- `../SQL_Script/generate_init_sql.js`:生成建库+建表 SQL
- `../SQL_Script/generate_partition_sql.js`:生成单日分区 SQL
- `../SQL_Script/generate_partition_range_sql.js`:生成批量分区 SQL
规范说明 规范说明
- 规格文件位于 spec/onoffline-spec.md - 规格文件位于 spec/onoffline-spec.md

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,13 @@
# Change: add g5 room status ip upsert
## Why
当前双写仅写入 onoffline_record_g5,未同步更新 G5 状态表 `room_status.room_status_moment_g5` 的 `ip` 字段,导致状态表与最新上报数据存在延迟。
## What Changes
- 新增 G5 状态表同步机制:按 `hotel_id + room_id` 查找目标行,并使用 `ON CONFLICT DO UPDATE` 更新 `ip`
- 同步时仅处理查找到的第一条匹配行(`LIMIT 1`)。
- 无论 `ip` 是否变化都执行 upsert 更新,以触发数据库更新时间相关触发器。
## Impact
- Affected specs: `openspec/specs/onoffline/spec.md`
- Affected code: `src/db/g5DatabaseManager.js`, `src/index.js`, `tests/g5DatabaseManager.test.js`

View File

@@ -0,0 +1,19 @@
## ADDED Requirements
### Requirement: G5 状态表 IP 同步
系统 SHALL 在处理并写入数据时,同步将对应设备的 `ip` 更新到 G5 状态表 `room_status.room_status_moment_g5`
#### Scenario: 按唯一键同步 IP
- **GIVEN** 当前处理行包含 `hotel_id``room_id`
- **WHEN** 执行 G5 状态表同步
- **THEN** 系统按 `hotel_id + room_id` 查找状态表记录,并仅对查找到的第一条记录执行写入
#### Scenario: 使用 upsert 触发更新
- **GIVEN** 状态表已存在同键记录
- **WHEN** 执行写入
- **THEN** 系统使用 `ON CONFLICT (hotel_id, room_id) DO UPDATE` 更新 `ip`
#### Scenario: IP 不变仍触发更新
- **GIVEN** 新 `ip` 与库内 `ip` 相同
- **WHEN** 执行状态表同步
- **THEN** 系统仍执行更新语句,以触发表上的更新时间相关触发器

View File

@@ -0,0 +1,10 @@
## 1. Implementation
- [x] 1.1 Add room status IP upsert method in G5 DB manager.
- [x] 1.2 Trigger room status IP upsert in the main write path.
- [x] 1.3 Add tests for status mapping and room status dedupe behavior.
- [x] 1.4 Add OpenSpec delta for the new synchronization requirement.
## 2. Validation
- [x] 2.1 Run `npm run test`.
- [x] 2.2 Run `npm run build`.
- [x] 2.3 Run `openspec validate add-g5-room-status-ip-upsert --strict`.

View File

@@ -0,0 +1,11 @@
# Proposal: Refactor Partition Indexes
## Goal
利用 PostgreSQL 默认的支持,改变每日分区创立时的索引策略,不再在代码中对每个分区单独创建索引。
## Context
当前 `PartitionManager` 在动态创建子分区后,会隐式调用查询在子分区上创建六个单列索引。由于我们使用的是 PostgreSQL 11+,且我们在初始化脚本中的主分区表 `onoffline.onoffline_record` 上已经创建了所有的索引,此主表上的索引会自动应用于所有的子分区,不需要我们在创建分区时另外手动添加。
## Proposed Changes
1.`src/db/partitionManager.js` 中移除子分区显式创建索引的方法 `ensurePartitionIndexes` 以及针对已有子分区的循环索引检查函数 `ensureIndexesForExistingPartitions`
2. 在更新分区流程 `ensurePartitions` 以及 `ensurePartitionsForTimestamps` 中,移除对 `ensurePartitionIndexes` 的调用。

View File

@@ -0,0 +1,11 @@
# Spec Delta: onoffline-backend
## MODIFIED Requirements
### Requirement: 数据库分区策略
系统 SHALL 使用 Range Partitioning 按天分区,并自动维护未来 30 天的分区表,子表依赖 PostgreSQL 原生机制继承主表索引。
#### Scenario: 分区预创建
- **GIVEN** 系统启动或每日凌晨
- **WHEN** 运行分区维护任务
- **THEN** 确保数据库中存在未来 30 天的分区表,无需对子表显式创建单列表索引

View File

@@ -0,0 +1,6 @@
# Tasks: Refactor Partition Indexes
- [x] refactor `src/db/partitionManager.js`: remove `ensurePartitionIndexes` and `ensureIndexesForExistingPartitions`.
- [x] refactor `src/db/partitionManager.js`: update `ensurePartitions` and `ensurePartitionsForTimestamps` to remove calls to `ensurePartitionIndexes`.
- [x] refactor `src/db/initializer.js` (and any other occurrences) to reflect the removal.
- [x] update openspec requirements to clarify that index propagation relies on PostgreSQL parent-table indexes.

View File

@@ -0,0 +1,14 @@
# Change: remove runtime db provisioning
## Why
当前服务在运行时承担了建库、建表和分区维护职责,导致服务职责边界不清晰,也会引入启动阶段 DDL 风险。现已将该能力剥离到根目录 `SQL_Script/`,需要通过 OpenSpec 正式记录为规范变更。
## What Changes
- 移除服务启动阶段的数据库初始化与定时分区维护要求。
- 移除服务在写入失败时自动创建缺失分区的要求。
- 明确数据库结构与分区维护由外部脚本(`SQL_Script/`)负责。
- 保留服务的核心职责Kafka 消费、解析、写库、重试与监控。
## Impact
- Affected specs: `openspec/specs/onoffline/spec.md`
- Affected code: `src/index.js`, `src/config/config.js`, `src/db/initializer.js`, `src/db/partitionManager.js`, `scripts/init_db.sql`, `scripts/verify_partitions.js`, `../SQL_Script/*`

View File

@@ -0,0 +1,32 @@
## MODIFIED Requirements
### Requirement: 数据库分区策略
系统 SHALL 使用 Range Partitioning 按天分区;运行服务本身 SHALL NOT 执行建库、建表、分区创建或定时分区维护。
#### Scenario: 服务启动不执行 DDL
- **GIVEN** 服务进程启动
- **WHEN** 进入 bootstrap 过程
- **THEN** 仅初始化消费、处理、监控相关能力,不执行数据库创建、表结构初始化与分区创建
#### Scenario: 分区由外部脚本维护
- **GIVEN** 需要创建数据库对象或新增未来分区
- **WHEN** 执行外部 SQL/JS 工具
- **THEN** 通过根目录 `SQL_Script/` 完成建库和分区维护,而不是由服务运行时自动执行
### Requirement: 批量消费与写入
系统 SHALL 对 Kafka 消息进行缓冲,并按批次写入数据库,以提高吞吐量;当写入失败时,系统 SHALL 执行连接恢复重试与降级策略,但不在运行时创建数据库分区。
#### Scenario: 批量写入
- **GIVEN** 短时间内收到多条消息 (e.g., 500条)
- **WHEN** 缓冲区满或超时 (e.g., 200ms)
- **THEN** 执行一次批量数据库插入操作
#### Scenario: 写入失败降级
- **GIVEN** 批量写入因数据错误失败 (非连接错误)
- **WHEN** 捕获异常
- **THEN** 自动降级为逐条写入,以隔离错误数据并确保有效数据入库
#### Scenario: 分区缺失错误处理
- **GIVEN** 写入时数据库返回分区缺失错误
- **WHEN** 服务处理该错误
- **THEN** 服务记录错误并按既有错误处理机制处理,不在运行时执行分区创建

View File

@@ -0,0 +1,12 @@
## 1. Implementation
- [x] 1.1 Remove runtime DB initialization from bootstrap flow (`src/index.js`).
- [x] 1.2 Remove scheduled partition maintenance job from runtime service.
- [x] 1.3 Remove runtime missing-partition auto-fix behavior.
- [x] 1.4 Remove legacy DB provisioning modules and scripts from service project.
- [x] 1.5 Add external SQL/JS provisioning scripts under root `SQL_Script/` for DB/schema/partition management.
- [x] 1.6 Update project docs to point DB provisioning to `SQL_Script/`.
## 2. Validation
- [x] 2.1 Run `npm run lint` in `bls-onoffline-backend`.
- [x] 2.2 Run `npm run build` in `bls-onoffline-backend`.
- [x] 2.3 Run `openspec validate remove-runtime-db-provisioning --strict`.

View File

@@ -0,0 +1,13 @@
# Change: add g5 room status sync toggle
## Why
当前 G5 `room_status.room_status_moment_g5` 同步是默认启用的,但新的运行要求需要默认关闭,避免未经确认就写入状态表。
## What Changes
- 新增一个环境变量开关,控制是否同步写入 G5 `room_status.room_status_moment_g5`
- 默认值为 `false`,即不写入该表。
- 当开关开启时,保留现有按 `hotel_id + room_id` upsert 更新 `ip` 的行为。
## Impact
- Affected specs: `openspec/specs/onoffline/spec.md`
- Affected code: `src/config/config.js`, `src/index.js`, `.env`, `.env.example`

View File

@@ -0,0 +1,14 @@
## ADDED Requirements
### Requirement: G5 状态表同步开关
系统 SHALL 提供一个环境变量开关,用于控制是否向 G5 状态表 `room_status.room_status_moment_g5` 写入数据;该开关默认值 SHALL 为 `false`
#### Scenario: 默认关闭
- **GIVEN** 未设置 G5 状态表同步开关
- **WHEN** 服务启动并处理 Kafka 消息
- **THEN** 系统不向 `room_status.room_status_moment_g5` 写入数据
#### Scenario: 显式开启
- **GIVEN** G5 状态表同步开关被设置为 `true`
- **WHEN** 服务处理 Kafka 消息
- **THEN** 系统恢复向 `room_status.room_status_moment_g5` 写入数据

View File

@@ -0,0 +1,11 @@
## 1. Implementation
- [x] 1.1 Add a config flag for G5 room_status synchronization.
- [x] 1.2 Disable `room_status.room_status_moment_g5` writes by default.
- [x] 1.3 Keep the existing upsert behavior when the flag is enabled.
- [x] 1.4 Update environment samples to set the flag to `false`.
## 2. Validation
- [x] 2.1 Run `npm run lint`.
- [x] 2.2 Run `npm run test`.
- [x] 2.3 Run `npm run build`.
- [x] 2.4 Run `openspec validate add-g5-room-status-sync-toggle --strict`.

View File

@@ -0,0 +1,13 @@
# Change: add restart current_status mapping
## Why
上游 Kafka 的 `CurrentStatus` 新增了 `restart` 值。现有处理逻辑仍按 `on/off` 处理,导致 `restart` 在入库时无法被正确标记为 3。
## What Changes
- 让 Kafka 解析链路接受 `CurrentStatus=restart`
- 将 G5 入库链路中的 `current_status=restart` 映射为 `3`
- 更新相关测试与 OpenSpec 说明,确保 `restart` 是受支持状态。
## Impact
- Affected specs: `openspec/specs/onoffline/spec.md`
- Affected code: `src/processor/index.js`, `src/db/g5DatabaseManager.js`, `tests/processor.test.js`

View File

@@ -0,0 +1,14 @@
## MODIFIED Requirements
### Requirement: 重启数据处理
系统 SHALL 在 `CurrentStatus``restart` 时将 `current_status` 保留为 `restart`,并在 G5 入库链路中映射为 `3`
#### Scenario: restart 状态写入
- **GIVEN** Kafka 消息中的 `CurrentStatus``restart`
- **WHEN** 消息被处理并写入数据库
- **THEN** 普通入库链路保留 `restart`G5 入库链路将其写入为 `3`
#### Scenario: 其他状态保持原样
- **GIVEN** Kafka 消息中的 `CurrentStatus``on``off`
- **WHEN** 消息被处理并写入数据库
- **THEN** 系统按既有规则处理该状态值

View File

@@ -0,0 +1,10 @@
## 1. Implementation
- [x] 1.1 Update Kafka row building logic to preserve `restart` as a valid `current_status` value.
- [x] 1.2 Update G5 database mapping so `restart` maps to `3`.
- [x] 1.3 Update processor tests for the `restart` case.
- [x] 1.4 Update OpenSpec requirements for supported current status values.
## 2. Validation
- [x] 2.1 Run `npm run test`.
- [x] 2.2 Run `npm run build`.
- [x] 2.3 Run `openspec validate add-restart-current-status-mapping --strict`.

View File

@@ -12,12 +12,17 @@
- **THEN** current_status 等于 CurrentStatus (截断至 255 字符) - **THEN** current_status 等于 CurrentStatus (截断至 255 字符)
### Requirement: 重启数据处理 ### Requirement: 重启数据处理
系统 SHALL 在 RebootReason 非空时强制 current_status 为 on 系统 SHALL 在 `CurrentStatus``restart` 时将 `current_status` 保留为 `restart`,并在 G5 入库链路中映射为 `3`
#### Scenario: 重启数据写入 #### Scenario: restart 状态写入
- **GIVEN** RebootReason 为非空值 - **GIVEN** Kafka 消息中的 `CurrentStatus``restart`
- **WHEN** 消息被处理 - **WHEN** 消息被处理并写入数据库
- **THEN** current_status 等于 on - **THEN** 普通入库链路保留 `restart`G5 入库链路将其写入为 `3`
#### Scenario: 其他状态保持原样
- **GIVEN** Kafka 消息中的 `CurrentStatus``on``off`
- **WHEN** 消息被处理并写入数据库
- **THEN** 系统按既有规则处理该状态值
### Requirement: 空值保留 ### Requirement: 空值保留
系统 SHALL 保留上游空值,不对字段进行补 0。 系统 SHALL 保留上游空值,不对字段进行补 0。
@@ -28,12 +33,12 @@
- **THEN** 数据库存储值为对应的空字符串 - **THEN** 数据库存储值为对应的空字符串
### Requirement: 数据库分区策略 ### Requirement: 数据库分区策略
系统 SHALL 使用 Range Partitioning 按天分区,并自动维护未来 30 天的分区表。 系统 SHALL 使用 Range Partitioning 按天分区,并自动维护未来 30 天的分区表,子表依赖 PostgreSQL 原生机制继承主表索引
#### Scenario: 分区预创建 #### Scenario: 分区预创建
- **GIVEN** 系统启动或每日凌晨 - **GIVEN** 系统启动或每日凌晨
- **WHEN** 运行分区维护任务 - **WHEN** 运行分区维护任务
- **THEN** 确保数据库中存在未来 30 天的分区表 - **THEN** 确保数据库中存在未来 30 天的分区表,无需对子表显式创建单列表索引
### Requirement: 消费可靠性 (At-Least-Once) ### Requirement: 消费可靠性 (At-Least-Once)
系统 SHALL 仅在数据成功写入数据库后,才向 Kafka 提交消费位点。 系统 SHALL 仅在数据成功写入数据库后,才向 Kafka 提交消费位点。
@@ -84,7 +89,7 @@
- **THEN** 自动乘以 1000 转换为毫秒 - **THEN** 自动乘以 1000 转换为毫秒
### Requirement: 批量消费与写入 ### Requirement: 批量消费与写入
系统 SHALL 对 Kafka 消息进行缓冲,并按批次写入数据库,以提高吞吐量。 系统 SHALL 对 Kafka 消息进行缓冲,并按批次写入数据库,以提高吞吐量;当写入失败时,系统 SHALL 执行连接恢复重试与降级策略,但不在运行时创建数据库分区
#### Scenario: 批量写入 #### Scenario: 批量写入
- **GIVEN** 短时间内收到多条消息 (e.g., 500条) - **GIVEN** 短时间内收到多条消息 (e.g., 500条)
@@ -96,3 +101,21 @@
- **WHEN** 捕获异常 - **WHEN** 捕获异常
- **THEN** 自动降级为逐条写入,以隔离错误数据并确保有效数据入库 - **THEN** 自动降级为逐条写入,以隔离错误数据并确保有效数据入库
#### Scenario: 分区缺失错误处理
- **GIVEN** 写入时数据库返回分区缺失错误
- **WHEN** 服务处理该错误
- **THEN** 服务记录错误并按既有错误处理机制处理,不在运行时执行分区创建
### Requirement: G5 状态表同步开关
系统 SHALL 提供一个环境变量开关,用于控制是否向 G5 状态表 `room_status.room_status_moment_g5` 写入数据;该开关默认值 SHALL 为 `false`
#### Scenario: 默认关闭
- **GIVEN** 未设置 G5 状态表同步开关
- **WHEN** 服务启动并处理 Kafka 消息
- **THEN** 系统不向 `room_status.room_status_moment_g5` 写入数据
#### Scenario: 显式开启
- **GIVEN** G5 状态表同步开关被设置为 `true`
- **WHEN** 服务处理 Kafka 消息
- **THEN** 系统恢复向 `room_status.room_status_moment_g5` 写入数据

View File

@@ -1,23 +0,0 @@
CREATE SCHEMA IF NOT EXISTS onoffline;
CREATE TABLE IF NOT EXISTS onoffline.onoffline_record (
guid VARCHAR(32) NOT NULL,
ts_ms BIGINT NOT NULL,
write_ts_ms BIGINT NOT NULL,
hotel_id SMALLINT NOT NULL,
mac VARCHAR(21) NOT NULL,
device_id VARCHAR(64) NOT NULL,
room_id VARCHAR(64) NOT NULL,
ip VARCHAR(21),
current_status VARCHAR(255),
launcher_version VARCHAR(255),
reboot_reason VARCHAR(255),
PRIMARY KEY (ts_ms, mac, device_id, room_id)
) PARTITION BY RANGE (ts_ms);
CREATE INDEX IF NOT EXISTS idx_onoffline_ts_ms ON onoffline.onoffline_record (ts_ms);
CREATE INDEX IF NOT EXISTS idx_onoffline_hotel_id ON onoffline.onoffline_record (hotel_id);
CREATE INDEX IF NOT EXISTS idx_onoffline_mac ON onoffline.onoffline_record (mac);
CREATE INDEX IF NOT EXISTS idx_onoffline_device_id ON onoffline.onoffline_record (device_id);
CREATE INDEX IF NOT EXISTS idx_onoffline_room_id ON onoffline.onoffline_record (room_id);
CREATE INDEX IF NOT EXISTS idx_onoffline_current_status ON onoffline.onoffline_record (current_status);

View File

@@ -1,61 +0,0 @@
import pg from 'pg';
import { config } from '../src/config/config.js';
const { Pool } = pg;
/**
 * One-shot CLI check that the partitioned table exists and has partitions.
 * Connects with the service DB config, verifies the parent table
 * onoffline.onoffline_record, lists its child partitions via pg_inherits,
 * and warns when fewer than 30 are present. Errors are logged, never
 * rethrown; the pool is always closed.
 */
const verify = async () => {
const pool = new Pool({
host: config.db.host,
port: config.db.port,
user: config.db.user,
password: config.db.password,
database: config.db.database,
ssl: config.db.ssl,
});
try {
console.log('Verifying partitions for table onoffline_record...');
const client = await pool.connect();
// Check parent table
// to_regclass returns NULL (rather than raising) when the relation is absent.
const parentRes = await client.query(`
SELECT to_regclass('onoffline.onoffline_record') as oid;
`);
if (!parentRes.rows[0].oid) {
// NOTE(review): this early return skips client.release(); pool.end() in
// the finally block then waits on a checked-out client — confirm intended.
console.error('Parent table onoffline.onoffline_record DOES NOT EXIST.');
return;
}
console.log('Parent table onoffline.onoffline_record exists.');
// Check partitions
// pg_inherits links each child partition to its partitioned parent.
const res = await client.query(`
SELECT
child.relname AS partition_name
FROM pg_inherits
JOIN pg_class parent ON pg_inherits.inhparent = parent.oid
JOIN pg_class child ON pg_inherits.inhrelid = child.oid
JOIN pg_namespace ns ON parent.relnamespace = ns.oid
WHERE parent.relname = 'onoffline_record' AND ns.nspname = 'onoffline'
ORDER BY child.relname;
`);
console.log(`Found ${res.rowCount} partitions.`);
res.rows.forEach(row => {
console.log(`- ${row.partition_name}`);
});
// 30 matches the maintenance policy of pre-creating 30 days of partitions.
if (res.rowCount >= 30) {
console.log('SUCCESS: At least 30 partitions exist.');
} else {
console.warn(`WARNING: Expected 30+ partitions, found ${res.rowCount}.`);
}
client.release();
} catch (err) {
console.error('Verification failed:', err);
} finally {
await pool.end();
}
};
// Script entry point: run immediately when the file is executed.
verify();

View File

@@ -35,7 +35,16 @@ Topicblwlog4Nodejs-rcu-onoffline-topic
主键ts_ms, mac, device_id, room_id 主键ts_ms, mac, device_id, room_id
按 ts_ms 每日分区 按 ts_ms 每日分区
G5库结构双写临时接入
库同为log_platform
onoffline_record_g5
差异字段:
- guid 为 int4由库自己生成。
- record_source 固定为 CRICS。
- current_status 为 int2on映射为1off映射为2restart映射为3其余为0。
支持通过环境变量开关双写。
4. 数据处理规则 4. 数据处理规则
非重启数据reboot_reason 为空或不存在current_status 取 CurrentStatus 非重启数据reboot_reason 为空或不存在current_status 取 CurrentStatus
重启数据reboot_reason 不为空current_status 固定为 on 重启数据reboot_reason 不为空时保留 Kafka 上游 current_status 值;若上游值为 restart则入库标记为 restartG5 库映射为 3
其余字段直接按 Kafka 原值落库,空值不补 0 其余字段直接按 Kafka 原值落库,空值不补 0

View File

@@ -13,6 +13,12 @@ const parseList = (value) =>
.map((item) => item.trim()) .map((item) => item.trim())
.filter(Boolean); .filter(Boolean);
/**
 * Parse an environment-variable boolean.
 * Missing/empty values fall back to defaultValue; actual booleans pass
 * through; strings 'true' and '1' (case-insensitive, surrounding whitespace
 * ignored — a common artifact of hand-edited .env files) are true; anything
 * else is false.
 * @param {string|boolean|undefined|null} value raw env value
 * @param {boolean} defaultValue returned when the value is unset or empty
 * @returns {boolean}
 */
const parseBoolean = (value, defaultValue) => {
  if (value === undefined || value === null || value === '') return defaultValue;
  if (typeof value === 'boolean') return value;
  const normalized = String(value).trim().toLowerCase();
  return normalized === 'true' || normalized === '1';
};
export const config = { export const config = {
env: process.env.NODE_ENV || 'development', env: process.env.NODE_ENV || 'development',
port: parseNumber(process.env.PORT, 3001), port: parseNumber(process.env.PORT, 3001),
@@ -39,6 +45,7 @@ export const config = {
} : undefined } : undefined
}, },
db: { db: {
writeEnabled: parseBoolean(process.env.DB_WRITE_ENABLED, true),
host: process.env.DB_HOST || process.env.POSTGRES_HOST || 'localhost', host: process.env.DB_HOST || process.env.POSTGRES_HOST || 'localhost',
port: parseNumber(process.env.DB_PORT || process.env.POSTGRES_PORT, 5432), port: parseNumber(process.env.DB_PORT || process.env.POSTGRES_PORT, 5432),
user: process.env.DB_USER || process.env.POSTGRES_USER || 'postgres', user: process.env.DB_USER || process.env.POSTGRES_USER || 'postgres',
@@ -49,6 +56,20 @@ export const config = {
schema: process.env.DB_SCHEMA || 'onoffline', schema: process.env.DB_SCHEMA || 'onoffline',
table: process.env.DB_TABLE || 'onoffline_record' table: process.env.DB_TABLE || 'onoffline_record'
}, },
g5db: {
writeEnabled: parseBoolean(process.env.G5_DB_WRITE_ENABLED, true),
enabled: !!process.env.POSTGRES_HOST_G5,
roomStatusMomentSyncEnabled: parseBoolean(process.env.G5_ROOM_STATUS_MOMENT_SYNC_ENABLED, false),
host: process.env.POSTGRES_HOST_G5,
port: parseNumber(process.env.POSTGRES_PORT_G5, 5434),
user: process.env.POSTGRES_USER_G5,
password: process.env.POSTGRES_PASSWORD_G5,
database: process.env.POSTGRES_DATABASE_G5,
max: parseNumber(process.env.POSTGRES_MAX_CONNECTIONS_G5, 3),
ssl: process.env.POSTGRES_SSL_G5 === 'true' ? { rejectUnauthorized: false } : undefined,
schema: process.env.DB_SCHEMA_G5 || 'onoffline',
table: process.env.DB_TABLE_G5 || 'onoffline_record_g5'
},
redis: { redis: {
host: process.env.REDIS_HOST || 'localhost', host: process.env.REDIS_HOST || 'localhost',
port: parseNumber(process.env.REDIS_PORT, 6379), port: parseNumber(process.env.REDIS_PORT, 6379),

View File

@@ -0,0 +1,202 @@
import pg from 'pg';
import { config } from '../config/config.js';
import { logger } from '../utils/logger.js';
const { Pool } = pg;
// Column order for onoffline_record_g5 inserts; must stay in lockstep with
// the positional UNNEST parameters built in G5DatabaseManager.insertRows().
const g5Columns = [
'ts_ms',
'write_ts_ms',
'hotel_id',
'mac',
'device_id',
'room_id',
'ip',
'current_status',
'launcher_version',
'reboot_reason',
'record_source'
];
// Target of the optional room-status IP sync (see upsertRoomStatusMomentIp).
const roomStatusSyncSchema = 'room_status';
const roomStatusSyncTable = 'room_status_moment_g5';
/**
 * Map an upstream current_status value to the G5 int2 status code.
 * Values already in G5 numeric form (1, 2, 3) pass through unchanged; the
 * strings 'on' / 'off' / 'restart' (case-insensitive, surrounding whitespace
 * ignored) map to 1 / 2 / 3; everything else maps to the unknown code 0.
 */
export const mapCurrentStatusToG5Code = (value) => {
  if ([1, 2, 3].includes(value)) {
    return value;
  }
  if (typeof value !== 'string') {
    return 0;
  }
  const codeByStatus = new Map([
    ['on', 1],
    ['off', 2],
    ['restart', 3]
  ]);
  return codeByStatus.get(value.trim().toLowerCase()) ?? 0;
};
/**
 * Normalize and de-duplicate rows for the room_status IP sync.
 * Keeps the FIRST occurrence per (hotel_id, room_id) key, coerces hotel_id
 * to a number and room_id to a string, and defaults a missing ip to null.
 * Rows without a usable key are skipped.
 *
 * Fix: the previous version ran Number(row.hotel_id) unguarded, and
 * Number(null) === 0 and Number('') === 0, so rows with a missing/empty
 * hotel_id were silently synced under hotel 0. Such rows are now skipped.
 *
 * @param {Array<object>|null|undefined} rows raw processed rows
 * @returns {Array<{hotel_id:number, room_id:string, ip:(string|null)}>}
 */
export const dedupeRoomStatusSyncRows = (rows) => {
  const uniqueRows = new Map();
  for (const row of rows || []) {
    const rawHotelId = row?.hotel_id;
    const roomId = row?.room_id ?? null;
    // A null/undefined/empty hotel_id must not be coerced to hotel 0.
    if (rawHotelId === null || rawHotelId === undefined || rawHotelId === '') {
      continue;
    }
    const hotelId = Number(rawHotelId);
    if (!Number.isFinite(hotelId) || roomId === null || roomId === '') {
      continue;
    }
    const key = `${hotelId}@@${String(roomId)}`;
    // First occurrence wins; later duplicates for the same key are dropped.
    if (!uniqueRows.has(key)) {
      uniqueRows.set(key, {
        hotel_id: hotelId,
        room_id: String(roomId),
        ip: row?.ip ?? null
      });
    }
  }
  return Array.from(uniqueRows.values());
};
/**
 * Pool-backed writer for the G5 `log_platform` database (dual-write path).
 *
 * When G5 is not configured (`dbConfig.enabled` falsy) the constructor
 * creates no pool and every method short-circuits; checkConnection() then
 * reports healthy so a disabled integration never trips monitoring.
 */
export class G5DatabaseManager {
constructor(dbConfig) {
// Disabled: leave `this.pool` undefined; all methods guard on it.
if (!dbConfig.enabled) return;
this.pool = new Pool({
host: dbConfig.host,
port: dbConfig.port,
user: dbConfig.user,
password: dbConfig.password,
database: dbConfig.database,
max: dbConfig.max,
ssl: dbConfig.ssl
});
}
/**
 * Bulk-insert rows into `schema.table` with a single UNNEST statement.
 * The positional array parameters follow `g5Columns` order exactly;
 * `record_source` is hard-coded to 'CRICS' and `current_status` is mapped
 * to the G5 int2 code. Duplicate/conflicting rows are skipped via
 * ON CONFLICT DO NOTHING. Errors are logged, then rethrown to the caller.
 */
async insertRows({ schema, table, rows }) {
// No-op when the integration is disabled or there is nothing to write.
if (!this.pool || !rows || rows.length === 0) {
return;
}
const statement = `
INSERT INTO ${schema}.${table} (${g5Columns.join(', ')})
SELECT *
FROM UNNEST(
$1::int8[],
$2::int8[],
$3::int2[],
$4::text[],
$5::text[],
$6::text[],
$7::text[],
$8::int2[],
$9::text[],
$10::text[],
$11::text[]
)
ON CONFLICT DO NOTHING
`;
try {
// Build one array per column (column-major), matching the UNNEST params.
const params = g5Columns.map((column) => {
return rows.map((row) => {
if (column === 'record_source') {
return 'CRICS';
}
if (column === 'current_status') {
// current_status in G5 is int2
return mapCurrentStatusToG5Code(row.current_status);
}
return row[column] ?? null;
});
});
await this.pool.query(statement, params);
} catch (error) {
logger.error('G5 Database insert failed', {
error: error?.message,
schema,
table,
rowsLength: rows.length
});
throw error;
}
}
/**
 * Sync device IPs into room_status.room_status_moment_g5.
 * Rows are normalized/deduped by (hotel_id, room_id); only keys that
 * already exist in the target table are written (the LATERAL lookup with
 * LIMIT 1 yields at most one match per key). The upsert always sets `ip`,
 * even when unchanged, so update-time triggers on the table still fire
 * (per the OpenSpec requirement). Errors are logged and rethrown.
 */
async upsertRoomStatusMomentIp(rows) {
if (!this.pool || !rows || rows.length === 0) {
return;
}
const syncRows = dedupeRoomStatusSyncRows(rows);
if (syncRows.length === 0) {
return;
}
const sql = `
WITH input_rows AS (
SELECT *
FROM UNNEST($1::int2[], $2::text[], $3::text[])
AS t(hotel_id, room_id, ip)
), matched_rows AS (
SELECT i.hotel_id, i.room_id, i.ip
FROM input_rows i
JOIN LATERAL (
SELECT r.hotel_id, r.room_id
FROM ${roomStatusSyncSchema}.${roomStatusSyncTable} r
WHERE r.hotel_id = i.hotel_id
AND r.room_id = i.room_id
LIMIT 1
) m ON TRUE
)
INSERT INTO ${roomStatusSyncSchema}.${roomStatusSyncTable} (hotel_id, room_id, ip)
SELECT hotel_id, room_id, ip
FROM matched_rows
ON CONFLICT (hotel_id, room_id)
DO UPDATE SET ip = EXCLUDED.ip
`;
try {
await this.pool.query(sql, [
syncRows.map((row) => row.hotel_id),
syncRows.map((row) => row.room_id),
syncRows.map((row) => row.ip)
]);
} catch (error) {
logger.error('G5 room_status_moment_g5 upsert failed', {
error: error?.message,
rowsLength: syncRows.length
});
throw error;
}
}
/**
 * Health probe: acquire a client (bounded by a 5s timeout race) and run
 * SELECT 1. Returns true/false and never throws; failures are logged.
 */
async checkConnection() {
if (!this.pool) return true; // Pretend it's ok if disabled
let client;
try {
const connectPromise = this.pool.connect();
const timeoutPromise = new Promise((_, reject) => {
setTimeout(() => reject(new Error('Connection timeout')), 5000);
});
// NOTE(review): the 5s timer is never cleared on success, so a fast check
// may keep the event loop alive briefly — confirm this is acceptable.
try {
client = await Promise.race([connectPromise, timeoutPromise]);
} catch (raceError) {
// If connect resolves after the timeout won the race, release that late
// client so it is not leaked from the pool.
connectPromise.then(c => c.release()).catch(() => { });
throw raceError;
}
await client.query('SELECT 1');
return true;
} catch (err) {
logger.error('G5 Database check connection failed', { error: err.message });
return false;
} finally {
if (client) {
client.release();
}
}
}
/** Drain and close the pool (no-op when the integration is disabled). */
async close() {
if (this.pool) {
await this.pool.end();
}
}
}
// Shared singleton built from the environment-driven config; when G5 is not
// configured (POSTGRES_HOST_G5 unset) this instance is a no-op.
const g5DbManager = new G5DatabaseManager(config.g5db);
export default g5DbManager;

View File

@@ -1,101 +0,0 @@
import pg from 'pg';
import fs from 'fs';
import path from 'path';
import { fileURLToPath } from 'url';
import { logger } from '../utils/logger.js';
import partitionManager from './partitionManager.js';
import dbManager from './databaseManager.js';
import { config } from '../config/config.js';
// ESM has no __dirname; reconstruct it for the init_db.sql path candidates.
const __filename = fileURLToPath(import.meta.url);
const __dirname = path.dirname(__filename);
/**
 * Startup-time database provisioning: ensures the database, schema, parent
 * table, and the next 30 days of partitions exist before the service runs.
 * Every step rethrows on failure so bootstrap aborts rather than running
 * against a half-provisioned database.
 */
class DatabaseInitializer {
// Orchestrates the three provisioning steps in dependency order.
async initialize() {
logger.info('Starting database initialization check...');
// 1. Check if database exists, create if not
await this.ensureDatabaseExists();
// 2. Initialize Schema and Parent Table (if not exists)
// Note: We need to use dbManager because it connects to the target database
await this.ensureSchemaAndTable();
// 3. Ensure Partitions for the next month
await partitionManager.ensurePartitions(30);
await partitionManager.ensureIndexesForExistingPartitions();
console.log('Database initialization completed successfully.');
logger.info('Database initialization completed successfully.');
}
// Creates the target database if missing, using a short-lived connection
// to the maintenance database ('postgres'), which always exists.
async ensureDatabaseExists() {
const { host, port, user, password, database, ssl } = config.db;
console.log(`Checking if database '${database}' exists at ${host}:${port}...`);
// Connect to 'postgres' database to check/create target database
const client = new pg.Client({
host,
port,
user,
password,
database: 'postgres',
ssl: ssl ? { rejectUnauthorized: false } : false
});
try {
await client.connect();
const checkRes = await client.query(
`SELECT 1 FROM pg_database WHERE datname = $1`,
[database]
);
if (checkRes.rowCount === 0) {
logger.info(`Database '${database}' does not exist. Creating...`);
// CREATE DATABASE cannot run inside a transaction block
await client.query(`CREATE DATABASE "${database}"`);
console.log(`Database '${database}' created.`);
logger.info(`Database '${database}' created.`);
} else {
console.log(`Database '${database}' already exists.`);
logger.info(`Database '${database}' already exists.`);
}
} catch (err) {
logger.error('Error ensuring database exists:', err);
throw err;
} finally {
// Always close the maintenance connection, even on failure.
await client.end();
}
}
// Runs scripts/init_db.sql (schema + parent table + indexes) against the
// target database; the file is located via several path candidates so the
// script works from both source and built layouts.
async ensureSchemaAndTable() {
// dbManager connects to the target database
const client = await dbManager.pool.connect();
try {
const sqlPathCandidates = [
path.resolve(process.cwd(), 'scripts/init_db.sql'),
path.resolve(__dirname, '../scripts/init_db.sql'),
path.resolve(__dirname, '../../scripts/init_db.sql')
];
const sqlPath = sqlPathCandidates.find((candidate) => fs.existsSync(candidate));
if (!sqlPath) {
throw new Error(`init_db.sql not found. Candidates: ${sqlPathCandidates.join(' | ')}`);
}
const sql = fs.readFileSync(sqlPath, 'utf8');
console.log(`Executing init_db.sql from ${sqlPath}...`);
logger.info('Executing init_db.sql...');
// The script is idempotent (IF NOT EXISTS), so re-running is safe.
await client.query(sql);
console.log('Schema and parent table initialized.');
logger.info('Schema and parent table initialized.');
} catch (err) {
logger.error('Error initializing schema and table:', err);
throw err;
} finally {
client.release();
}
}
}
// Module-level singleton; importing this file performs no I/O by itself.
export default new DatabaseInitializer();

View File

@@ -1,197 +0,0 @@
import { logger } from '../utils/logger.js';
import { config } from '../config/config.js';
import dbManager from './databaseManager.js';
/**
 * Maintains day-level range partitions (and their secondary indexes) for the
 * on/offline record table. The partition key is ts_ms (unix epoch ms); each
 * child partition is named <table>_YYYYMMDD and covers one calendar day.
 * NOTE(review): day boundaries are computed in the server's local timezone
 * (setHours/getDate), not UTC — confirm this matches how ts_ms is produced.
 */
class PartitionManager {
  /**
   * Calculate the start and end timestamps (milliseconds) for a given date.
   * @param {Date} date - The date to calculate for.
   * @returns {Object} { startMs, endMs, partitionSuffix }
   */
  getPartitionInfo(date) {
    const yyyy = date.getFullYear();
    const mm = String(date.getMonth() + 1).padStart(2, '0');
    const dd = String(date.getDate()).padStart(2, '0');
    const partitionSuffix = `${yyyy}${mm}${dd}`;
    // Local midnight at the start of the day (inclusive lower bound).
    const start = new Date(date);
    start.setHours(0, 0, 0, 0);
    const startMs = start.getTime();
    // Local midnight of the following day (exclusive upper bound).
    const end = new Date(date);
    end.setDate(end.getDate() + 1);
    end.setHours(0, 0, 0, 0);
    const endMs = end.getTime();
    return { startMs, endMs, partitionSuffix };
  }

  /**
   * Idempotently create the standard secondary indexes on one daily partition
   * (CREATE INDEX IF NOT EXISTS) and refresh its planner stats via ANALYZE.
   * @param {Object} client - A checked-out pg client; the caller releases it.
   * @param {string} schema - Schema containing the partitioned table.
   * @param {string} table - Parent table name.
   * @param {string} partitionSuffix - YYYYMMDD suffix of the partition.
   */
  async ensurePartitionIndexes(client, schema, table, partitionSuffix) {
    const startedAt = Date.now();
    const partitionName = `${schema}.${table}_${partitionSuffix}`;
    const indexBase = `${table}_${partitionSuffix}`;
    // One btree index per commonly filtered column.
    const indexSpecs = [
      { name: `idx_${indexBase}_ts`, column: 'ts_ms' },
      { name: `idx_${indexBase}_hid`, column: 'hotel_id' },
      { name: `idx_${indexBase}_mac`, column: 'mac' },
      { name: `idx_${indexBase}_did`, column: 'device_id' },
      { name: `idx_${indexBase}_rid`, column: 'room_id' },
      { name: `idx_${indexBase}_cs`, column: 'current_status' }
    ];
    for (const spec of indexSpecs) {
      await client.query(`CREATE INDEX IF NOT EXISTS ${spec.name} ON ${partitionName} (${spec.column});`);
    }
    await client.query(`ANALYZE ${partitionName};`);
    const elapsedMs = Date.now() - startedAt;
    // IF NOT EXISTS makes repeat runs cheap; warn when this is unexpectedly slow.
    if (elapsedMs > 1000) {
      logger.warn(`Partition index ensure slow`, { partitionName, elapsedMs });
    }
  }

  /**
   * Discover every existing child partition of the configured parent table
   * (via pg_inherits) and ensure each has the standard index set.
   */
  async ensureIndexesForExistingPartitions() {
    const startedAt = Date.now();
    const client = await dbManager.pool.connect();
    try {
      const schema = config.db.schema;
      const table = config.db.table;
      // List direct children of the parent table from the system catalogs.
      const res = await client.query(
        `
        SELECT c.relname AS relname
        FROM pg_inherits i
        JOIN pg_class p ON i.inhparent = p.oid
        JOIN pg_namespace pn ON pn.oid = p.relnamespace
        JOIN pg_class c ON i.inhrelid = c.oid
        WHERE pn.nspname = $1 AND p.relname = $2
        ORDER BY c.relname;
        `,
        [schema, table]
      );
      // Keep only children matching the <table>_YYYYMMDD naming scheme.
      const suffixes = new Set();
      const pattern = new RegExp(`^${table}_(\\d{8})$`);
      for (const row of res.rows) {
        const relname = row?.relname;
        if (typeof relname !== 'string') continue;
        const match = relname.match(pattern);
        if (!match) continue;
        suffixes.add(match[1]);
      }
      for (const suffix of suffixes) {
        await this.ensurePartitionIndexes(client, schema, table, suffix);
      }
      const elapsedMs = Date.now() - startedAt;
      if (elapsedMs > 5000) {
        logger.warn('Ensure existing partition indexes slow', { schema, table, partitions: suffixes.size, elapsedMs });
      }
    } finally {
      client.release();
    }
  }

  /**
   * Ensure partitions exist for the past M days and next N days.
   * @param {number} daysAhead - Number of days to pre-create.
   * @param {number} daysBack - Number of days to look back.
   */
  async ensurePartitions(daysAhead = 30, daysBack = 15) {
    const client = await dbManager.pool.connect();
    try {
      logger.info(`Starting partition check for the past ${daysBack} days and next ${daysAhead} days...`);
      console.log(`Starting partition check for the past ${daysBack} days and next ${daysAhead} days...`);
      const now = new Date();
      // Walk each day in [-daysBack, daysAhead) relative to today.
      for (let i = -daysBack; i < daysAhead; i++) {
        const targetDate = new Date(now);
        targetDate.setDate(now.getDate() + i);
        const { startMs, endMs, partitionSuffix } = this.getPartitionInfo(targetDate);
        const schema = config.db.schema;
        const table = config.db.table;
        const partitionName = `${schema}.${table}_${partitionSuffix}`;
        // Check if partition exists (to_regclass returns NULL when missing).
        const checkSql = `
        SELECT to_regclass($1) as exists;
        `;
        const checkRes = await client.query(checkSql, [partitionName]);
        if (!checkRes.rows[0].exists) {
          logger.info(`Creating partition ${partitionName} for range [${startMs}, ${endMs})`);
          console.log(`Creating partition ${partitionName} for range [${startMs}, ${endMs})`);
          const createSql = `
          CREATE TABLE IF NOT EXISTS ${partitionName}
          PARTITION OF ${schema}.${table}
          FOR VALUES FROM (${startMs}) TO (${endMs});
          `;
          await client.query(createSql);
        }
        // Indexes are ensured even for pre-existing partitions.
        await this.ensurePartitionIndexes(client, schema, table, partitionSuffix);
      }
      logger.info('Partition check completed.');
    } catch (err) {
      logger.error('Error ensuring partitions:', err);
      throw err;
    } finally {
      client.release();
    }
  }

  /**
   * Ensure a partition exists for every distinct day referenced by the given
   * timestamps (used to self-heal after a "no partition of relation" insert
   * failure). Invalid/non-finite timestamps are skipped; the set of distinct
   * days is capped at 400 as a safety bound against pathological input.
   * @param {Array<number|string>} tsMsList - Unix-ms timestamps (numbers or numeric strings).
   */
  async ensurePartitionsForTimestamps(tsMsList) {
    if (!Array.isArray(tsMsList) || tsMsList.length === 0) return;
    const uniqueSuffixes = new Set();
    for (const ts of tsMsList) {
      const numericTs = typeof ts === 'string' ? Number(ts) : ts;
      if (!Number.isFinite(numericTs)) continue;
      const date = new Date(numericTs);
      if (Number.isNaN(date.getTime())) continue;
      const { partitionSuffix } = this.getPartitionInfo(date);
      uniqueSuffixes.add(partitionSuffix);
      if (uniqueSuffixes.size >= 400) break;
    }
    if (uniqueSuffixes.size === 0) return;
    const client = await dbManager.pool.connect();
    try {
      const schema = config.db.schema;
      const table = config.db.table;
      for (const partitionSuffix of uniqueSuffixes) {
        // Rebuild a Date from the YYYYMMDD suffix to recompute the ms range.
        const yyyy = Number(partitionSuffix.slice(0, 4));
        const mm = Number(partitionSuffix.slice(4, 6));
        const dd = Number(partitionSuffix.slice(6, 8));
        if (!Number.isFinite(yyyy) || !Number.isFinite(mm) || !Number.isFinite(dd)) continue;
        const targetDate = new Date(yyyy, mm - 1, dd);
        if (Number.isNaN(targetDate.getTime())) continue;
        const { startMs, endMs } = this.getPartitionInfo(targetDate);
        const partitionName = `${schema}.${table}_${partitionSuffix}`;
        const checkRes = await client.query(`SELECT to_regclass($1) as exists;`, [partitionName]);
        if (!checkRes.rows[0].exists) {
          logger.info(`Creating partition ${partitionName} for range [${startMs}, ${endMs})`);
          await client.query(`
          CREATE TABLE IF NOT EXISTS ${partitionName}
          PARTITION OF ${schema}.${table}
          FOR VALUES FROM (${startMs}) TO (${endMs});
          `);
        }
        await this.ensurePartitionIndexes(client, schema, table, partitionSuffix);
      }
    } finally {
      client.release();
    }
  }
}
export default new PartitionManager();

View File

@@ -1,8 +1,7 @@
import cron from 'node-cron'; import cron from 'node-cron';
import { config } from './config/config.js'; import { config } from './config/config.js';
import dbManager from './db/databaseManager.js'; import dbManager from './db/databaseManager.js';
import dbInitializer from './db/initializer.js'; import g5DbManager from './db/g5DatabaseManager.js';
import partitionManager from './db/partitionManager.js';
import { createKafkaConsumers } from './kafka/consumer.js'; import { createKafkaConsumers } from './kafka/consumer.js';
import { parseMessageToRows } from './processor/index.js'; import { parseMessageToRows } from './processor/index.js';
import { createRedisClient } from './redis/redisClient.js'; import { createRedisClient } from './redis/redisClient.js';
@@ -20,7 +19,13 @@ const bootstrap = async () => {
port: config.db.port, port: config.db.port,
user: config.db.user, user: config.db.user,
database: config.db.database, database: config.db.database,
schema: config.db.schema schema: config.db.schema,
writeEnabled: config.db.writeEnabled
},
g5db: {
enabled: config.g5db.enabled,
writeEnabled: config.g5db.writeEnabled,
roomStatusMomentSyncEnabled: config.g5db.roomStatusMomentSyncEnabled
}, },
kafka: { kafka: {
brokers: config.kafka.brokers, brokers: config.kafka.brokers,
@@ -33,38 +38,12 @@ const bootstrap = async () => {
} }
}); });
// 0. Initialize Database (Create DB, Schema, Table, Partitions)
await dbInitializer.initialize();
// Metric Collector // Metric Collector
const metricCollector = new MetricCollector(); const metricCollector = new MetricCollector();
// 1. Setup Partition Maintenance Cron Job (Every day at 00:00)
cron.schedule('0 0 * * *', async () => {
logger.info('Running scheduled partition maintenance...');
try {
await partitionManager.ensurePartitions(30);
} catch (err) {
logger.error('Scheduled partition maintenance failed', err);
}
});
// 1.1 Setup Metric Reporting Cron Job (Every minute) // 1.1 Setup Metric Reporting Cron Job (Every minute)
// Moved after redisIntegration initialization // Moved after redisIntegration initialization
// DatabaseManager is now a singleton exported instance, but let's keep consistency if possible
// In databaseManager.js it exports `dbManager` instance by default.
// The original code was `const dbManager = new DatabaseManager(config.db);` which implies it might have been a class export.
// Let's check `databaseManager.js` content.
// Wait, I imported `dbManager` from `./db/databaseManager.js`.
// If `databaseManager.js` exports an instance as default, I should use that.
// If it exports a class, I should instantiate it.
// Let's assume the previous code `new DatabaseManager` was correct if it was a class.
// BUT I used `dbManager.pool` in `partitionManager.js` assuming it's an instance.
// I need to verify `databaseManager.js`.
const redisClient = await createRedisClient(config.redis); const redisClient = await createRedisClient(config.redis);
const redisIntegration = new RedisIntegration( const redisIntegration = new RedisIntegration(
redisClient, redisClient,
@@ -199,16 +178,26 @@ const bootstrap = async () => {
); );
}; };
const isMissingPartitionError = (err) =>
err?.code === '23514' ||
(typeof err?.message === 'string' && err.message.includes('no partition of relation'));
const insertRowsWithRetry = async (rows) => { const insertRowsWithRetry = async (rows) => {
const startedAt = Date.now(); const startedAt = Date.now();
let attemptedPartitionFix = false;
while (true) { while (true) {
try { try {
await dbManager.insertRows({ schema: config.db.schema, table: config.db.table, rows }); const promises = [];
if (config.db.writeEnabled) {
promises.push(dbManager.insertRows({ schema: config.db.schema, table: config.db.table, rows }));
}
if (config.g5db.writeEnabled && config.g5db.enabled) {
promises.push(g5DbManager.insertRows({ schema: config.g5db.schema, table: config.g5db.table, rows }).catch(e => {
logger.error('G5 Database insert failed but non-blocking', { error: e.message });
}));
if (config.g5db.roomStatusMomentSyncEnabled) {
promises.push(g5DbManager.upsertRoomStatusMomentIp(rows).catch(e => {
logger.error('G5 room_status_moment_g5 upsert failed but non-blocking', { error: e.message });
}));
}
}
await Promise.all(promises);
metricCollector.increment('db_insert_count', 1); metricCollector.increment('db_insert_count', 1);
metricCollector.increment('db_insert_ms_sum', Date.now() - startedAt); metricCollector.increment('db_insert_ms_sum', Date.now() - startedAt);
return; return;
@@ -222,24 +211,6 @@ const bootstrap = async () => {
} }
continue; continue;
} }
if (isMissingPartitionError(err) && !attemptedPartitionFix) {
attemptedPartitionFix = true;
try {
await partitionManager.ensurePartitionsForTimestamps(rows.map(r => r.ts_ms));
} catch (partitionErr) {
if (isDbConnectionError(partitionErr)) {
logger.error('Database offline during partition ensure. Retrying in 5s...', { error: partitionErr.message });
await new Promise(r => setTimeout(r, 5000));
while (!(await dbManager.checkConnection())) {
logger.warn('Database still offline. Waiting 5s...');
await new Promise(r => setTimeout(r, 5000));
}
continue;
}
throw partitionErr;
}
continue;
}
throw err; throw err;
} }
} }
@@ -247,7 +218,21 @@ const bootstrap = async () => {
const insertRowsOnce = async (rows) => { const insertRowsOnce = async (rows) => {
const startedAt = Date.now(); const startedAt = Date.now();
await dbManager.insertRows({ schema: config.db.schema, table: config.db.table, rows }); const promises = [];
if (config.db.writeEnabled) {
promises.push(dbManager.insertRows({ schema: config.db.schema, table: config.db.table, rows }));
}
if (config.g5db.writeEnabled && config.g5db.enabled) {
promises.push(g5DbManager.insertRows({ schema: config.g5db.schema, table: config.g5db.table, rows }).catch(e => {
logger.error('G5 Database insert failed in insertOnce', { error: e.message });
}));
if (config.g5db.roomStatusMomentSyncEnabled) {
promises.push(g5DbManager.upsertRoomStatusMomentIp(rows).catch(e => {
logger.error('G5 room_status_moment_g5 upsert failed in insertOnce', { error: e.message });
}));
}
}
await Promise.all(promises);
metricCollector.increment('db_insert_count', 1); metricCollector.increment('db_insert_count', 1);
metricCollector.increment('db_insert_ms_sum', Date.now() - startedAt); metricCollector.increment('db_insert_ms_sum', Date.now() - startedAt);
}; };
@@ -380,7 +365,7 @@ const bootstrap = async () => {
for (const item of unresolvedItems) { for (const item of unresolvedItems) {
try { try {
await handleError(err, item.message); await handleError(err, item.message);
} catch {} } catch { }
item.resolve(); item.resolve();
} }
} }
@@ -480,8 +465,9 @@ const bootstrap = async () => {
await redisClient.quit(); await redisClient.quit();
logger.info('Redis client closed'); logger.info('Redis client closed');
// 4. Close Database Pool // 4. Close Database Pools
await dbManager.close(); await dbManager.close();
await g5DbManager.close();
logger.info('Database connection closed'); logger.info('Database connection closed');
process.exit(0); process.exit(0);

View File

@@ -20,6 +20,16 @@ const normalizeText = (value, maxLength) => {
return str; return str;
}; };
// Normalize a CurrentStatus value: run it through normalizeText (trim / 255-char
// cap), then fold the known states to lowercase. Values outside the known set
// are returned as normalized text with their original casing preserved.
const normalizeCurrentStatus = (value) => {
  const text = normalizeText(value, 255);
  if (text === null) {
    return null;
  }
  const lowered = text.toLowerCase();
  const knownStates = ['on', 'off', 'restart'];
  return knownStates.includes(lowered) ? lowered : text;
};
export const buildRowsFromMessageValue = (value) => { export const buildRowsFromMessageValue = (value) => {
const payload = parseKafkaPayload(value); const payload = parseKafkaPayload(value);
return buildRowsFromPayload(payload); return buildRowsFromPayload(payload);
@@ -30,9 +40,7 @@ export const buildRowsFromPayload = (rawPayload) => {
// Database limit is VARCHAR(255) // Database limit is VARCHAR(255)
const rebootReason = normalizeText(payload.RebootReason, 255); const rebootReason = normalizeText(payload.RebootReason, 255);
const currentStatusRaw = normalizeText(payload.CurrentStatus, 255); const currentStatus = normalizeCurrentStatus(payload.CurrentStatus);
const hasRebootReason = rebootReason !== null && rebootReason !== '';
const currentStatus = hasRebootReason ? 'on' : currentStatusRaw;
// Derive timestamp: UnixTime -> CurrentTime -> Date.now() // Derive timestamp: UnixTime -> CurrentTime -> Date.now()
let tsMs = payload.UnixTime; let tsMs = payload.UnixTime;

View File

@@ -0,0 +1,27 @@
import { describe, it, expect } from 'vitest';
import { dedupeRoomStatusSyncRows, mapCurrentStatusToG5Code } from '../src/db/g5DatabaseManager.js';
// Unit tests for the G5 status-code mapping and the per-(hotel, room) dedupe.
describe('G5 current_status mapping', () => {
  it('maps on/off/restart to numeric codes', () => {
    // Table-driven: [input status, expected numeric code].
    const cases = [
      ['on', 1],
      ['off', 2],
      ['restart', 3]
    ];
    for (const [status, code] of cases) {
      expect(mapCurrentStatusToG5Code(status)).toBe(code);
    }
  });
  it('returns 0 for unknown values', () => {
    expect(mapCurrentStatusToG5Code('idle')).toBe(0);
    expect(mapCurrentStatusToG5Code(null)).toBe(0);
  });
  it('dedupes room status sync rows by hotel_id and room_id using first row', () => {
    const input = [
      { hotel_id: 101, room_id: '8001', ip: '10.0.0.1:1234' },
      { hotel_id: 101, room_id: '8001', ip: '10.0.0.2:5678' },
      { hotel_id: 101, room_id: '8002', ip: '10.0.0.3:9012' }
    ];
    const deduped = dedupeRoomStatusSyncRows(input);
    expect(deduped).toHaveLength(2);
    // First occurrence of a (hotel_id, room_id) pair wins.
    expect(deduped[0]).toEqual({ hotel_id: 101, room_id: '8001', ip: '10.0.0.1:1234' });
    expect(deduped[1]).toEqual({ hotel_id: 101, room_id: '8002', ip: '10.0.0.3:9012' });
  });
});

View File

@@ -26,13 +26,19 @@ describe('Processor Logic', () => {
expect(rows[0].reboot_reason).toBeNull(); expect(rows[0].reboot_reason).toBeNull();
}); });
it('should override current_status to on for reboot data', () => { it('should preserve restart current_status for reboot data', () => {
const rows = buildRowsFromPayload({ ...basePayload, CurrentStatus: 'off', RebootReason: '0x01' }); const rows = buildRowsFromPayload({ ...basePayload, CurrentStatus: 'restart', RebootReason: '0x01' });
expect(rows).toHaveLength(1); expect(rows).toHaveLength(1);
expect(rows[0].current_status).toBe('on'); expect(rows[0].current_status).toBe('restart');
expect(rows[0].reboot_reason).toBe('0x01'); expect(rows[0].reboot_reason).toBe('0x01');
}); });
it('should preserve restart current_status for non-reboot data', () => {
const rows = buildRowsFromPayload({ ...basePayload, CurrentStatus: 'restart', RebootReason: null });
expect(rows).toHaveLength(1);
expect(rows[0].current_status).toBe('restart');
});
it('should keep empty optional fields as empty strings', () => { it('should keep empty optional fields as empty strings', () => {
const rows = buildRowsFromPayload({ const rows = buildRowsFromPayload({
...basePayload, ...basePayload,

View File

@@ -0,0 +1,42 @@
/*
Navicat Premium Dump SQL
Source Server : FnOS 80
Source Server Type : PostgreSQL
Source Server Version : 150017 (150017)
Source Host : 10.8.8.80:5434
Source Catalog : log_platform
Source Schema : onoffline
Target Server Type : PostgreSQL
Target Server Version : 150017 (150017)
File Encoding : 65001
Date: 10/03/2026 17:23:24
*/
-- ----------------------------
-- Table structure for onoffline_record_g5
-- G5-facing copy of the on/offline record stream. current_status is stored
-- as a numeric code (app maps on=1, off=2, restart=3, unknown=0).
-- ----------------------------
DROP TABLE IF EXISTS "onoffline"."onoffline_record_g5";

-- The guid default below pulls from this sequence, but the Navicat dump did
-- not include its definition; create it up front so this script also works
-- on a fresh database.
CREATE SEQUENCE IF NOT EXISTS "onoffline"."onoffline_record_g5_guid_seq";

CREATE TABLE "onoffline"."onoffline_record_g5" (
  "guid" int4 NOT NULL DEFAULT nextval('"onoffline".onoffline_record_g5_guid_seq'::regclass),
  "ts_ms" int8 NOT NULL,
  "write_ts_ms" int8 NOT NULL,
  "hotel_id" int2 NOT NULL,
  "mac" varchar(21) COLLATE "pg_catalog"."default" NOT NULL,
  "device_id" varchar(64) COLLATE "pg_catalog"."default" NOT NULL,
  "room_id" varchar(64) COLLATE "pg_catalog"."default" NOT NULL,
  "ip" varchar(25) COLLATE "pg_catalog"."default",
  "current_status" int2 NOT NULL DEFAULT 0,
  "launcher_version" varchar(255) COLLATE "pg_catalog"."default",
  "reboot_reason" varchar(255) COLLATE "pg_catalog"."default",
  "record_source" varchar(50) COLLATE "pg_catalog"."default"
)
;

-- ----------------------------
-- Primary Key structure for table onoffline_record_g5
-- ----------------------------
ALTER TABLE "onoffline"."onoffline_record_g5" ADD CONSTRAINT "onoffline_record_g5_pkey" PRIMARY KEY ("ts_ms", "guid");

View File

@@ -0,0 +1,91 @@
/*
Navicat Premium Dump SQL
Source Server : FnOS 80
Source Server Type : PostgreSQL
Source Server Version : 150017 (150017)
Source Host : 10.8.8.80:5434
Source Catalog : log_platform
Source Schema : room_status
Target Server Type : PostgreSQL
Target Server Version : 150017 (150017)
File Encoding : 65001
Date: 18/03/2026 10:55:09
*/
-- ----------------------------
-- Table structure for room_status_moment_g5
-- One row per (hotel_id, room_id): the latest known device/room state.
-- PREREQUISITES (not created by this dump): schema "room_status", tablespace
-- "ts_hot", and the functions "room_status"."update_ts_ms_g5"() and
-- "room_status"."handle_room_status_change"() referenced by the triggers.
-- ----------------------------
DROP TABLE IF EXISTS "room_status"."room_status_moment_g5";
CREATE TABLE "room_status"."room_status_moment_g5" (
  "hotel_id" int2 NOT NULL,
  "room_id" text COLLATE "pg_catalog"."default" NOT NULL,
  "device_id" text COLLATE "pg_catalog"."default" NOT NULL,
  -- Defaults to the current unix-epoch time in milliseconds at insert.
  "ts_ms" int8 NOT NULL DEFAULT ((EXTRACT(epoch FROM clock_timestamp()) * (1000)::numeric))::bigint,
  "sys_lock_status" int2,
  "online_status" int2,
  "launcher_version" text COLLATE "pg_catalog"."default",
  "app_version" text COLLATE "pg_catalog"."default",
  "config_version" text COLLATE "pg_catalog"."default",
  "register_ts_ms" int8,
  "upgrade_ts_ms" int8,
  "config_ts_ms" int8,
  "ip" text COLLATE "pg_catalog"."default",
  "pms_status" int2,
  "power_state" int2,
  "cardless_state" int2,
  "service_mask" int8,
  "insert_card" int2,
  "bright_g" int2,
  "agreement_ver" text COLLATE "pg_catalog"."default",
  -- _text / _int2 / _float8 are PostgreSQL array types (per-loop readings).
  "air_address" _text COLLATE "pg_catalog"."default",
  "air_state" _int2,
  "air_model" _int2,
  "air_speed" _int2,
  "air_set_temp" _int2,
  "air_now_temp" _int2,
  "air_solenoid_valve" _int2,
  "elec_address" _text COLLATE "pg_catalog"."default",
  "elec_voltage" _float8,
  "elec_ampere" _float8,
  "elec_power" _float8,
  "elec_phase" _float8,
  "elec_energy" _float8,
  "elec_sum_energy" _float8,
  "carbon_state" int2,
  "dev_loops" jsonb,
  "energy_carbon_sum" float8,
  "energy_nocard_sum" float8,
  "external_device" jsonb DEFAULT '{}'::jsonb,
  "faulty_device_count" jsonb DEFAULT '{}'::jsonb
)
-- fillfactor=90 leaves free space per page for in-place (HOT) updates on this
-- frequently-updated table.
WITH (fillfactor=90)
TABLESPACE "ts_hot"
;
-- ----------------------------
-- Indexes structure for table room_status_moment_g5
-- ----------------------------
CREATE INDEX "idx_rsm_g5_dashboard_query" ON "room_status"."room_status_moment_g5" USING btree (
  "hotel_id" "pg_catalog"."int2_ops" ASC NULLS LAST,
  "online_status" "pg_catalog"."int2_ops" ASC NULLS LAST,
  "power_state" "pg_catalog"."int2_ops" ASC NULLS LAST
);
-- ----------------------------
-- Triggers structure for table room_status_moment_g5
-- ----------------------------
-- Refreshes ts_ms on every update (function defined externally).
CREATE TRIGGER "trg_update_rsm_ts_ms" BEFORE UPDATE ON "room_status"."room_status_moment_g5"
FOR EACH ROW
EXECUTE PROCEDURE "room_status"."update_ts_ms_g5"();
-- Reacts to row updates (function defined externally).
CREATE TRIGGER "trigger_room_status_change" AFTER UPDATE ON "room_status"."room_status_moment_g5"
FOR EACH ROW
EXECUTE PROCEDURE "room_status"."handle_room_status_change"();
-- ----------------------------
-- Primary Key structure for table room_status_moment_g5
-- ----------------------------
ALTER TABLE "room_status"."room_status_moment_g5" ADD CONSTRAINT "room_status_moment_g5_pkey" PRIMARY KEY ("hotel_id", "room_id");