feat: 添加 G5 状态表 IP 同步功能,新增 upsert 方法并更新相关测试

This commit is contained in:
2026-03-18 11:51:17 +08:00
parent 381080fee0
commit fa363835a3
7 changed files with 223 additions and 1 deletions

View File

@@ -0,0 +1,13 @@
# Change: add g5 room status ip upsert
## Why
当前双写仅写入 onoffline_record_g5未同步更新 G5 状态表 `room_status.room_status_moment_g5``ip` 字段,导致状态表与最新上报数据存在延迟。
## What Changes
- 新增 G5 状态表同步机制:按 `hotel_id + room_id` 查找目标行,并使用 `ON CONFLICT DO UPDATE` 更新 `ip`
- 同步时仅处理查找到的第一条匹配行(`LIMIT 1`)。
- 无论 `ip` 是否变化都执行 upsert 更新,以触发数据库更新时间相关触发器。
## Impact
- Affected specs: `openspec/specs/onoffline/spec.md`
- Affected code: `src/db/g5DatabaseManager.js`, `src/index.js`, `tests/g5DatabaseManager.test.js`

View File

@@ -0,0 +1,19 @@
## ADDED Requirements
### Requirement: G5 状态表 IP 同步
系统 SHALL 在处理并写入数据时,同步将对应设备的 `ip` 更新到 G5 状态表 `room_status.room_status_moment_g5`
#### Scenario: 按唯一键同步 IP
- **GIVEN** 当前处理行包含 `hotel_id``room_id`
- **WHEN** 执行 G5 状态表同步
- **THEN** 系统按 `hotel_id + room_id` 查找状态表记录,并仅对查找到的第一条记录执行写入
#### Scenario: 使用 upsert 触发更新
- **GIVEN** 状态表已存在同键记录
- **WHEN** 执行写入
- **THEN** 系统使用 `ON CONFLICT (hotel_id, room_id) DO UPDATE` 更新 `ip`
#### Scenario: IP 不变仍触发更新
- **GIVEN** 新 `ip` 与库内 `ip` 相同
- **WHEN** 执行状态表同步
- **THEN** 系统仍执行更新语句,以触发表上的更新时间相关触发器

View File

@@ -0,0 +1,10 @@
## 1. Implementation
- [x] 1.1 Add room status IP upsert method in G5 DB manager.
- [x] 1.2 Trigger room status IP upsert in the main write path.
- [x] 1.3 Add tests for status mapping and room status dedupe behavior.
- [x] 1.4 Add OpenSpec delta for the new synchronization requirement.
## 2. Validation
- [x] 2.1 Run `npm run test`.
- [x] 2.2 Run `npm run build`.
- [x] 2.3 Run `openspec validate add-g5-room-status-ip-upsert --strict`.

View File

@@ -18,6 +18,9 @@ const g5Columns = [
'record_source' 'record_source'
]; ];
const roomStatusSyncSchema = 'room_status';
const roomStatusSyncTable = 'room_status_moment_g5';
export const mapCurrentStatusToG5Code = (value) => { export const mapCurrentStatusToG5Code = (value) => {
if (value === 1 || value === 2 || value === 3) { if (value === 1 || value === 2 || value === 3) {
return value; return value;
@@ -30,6 +33,26 @@ export const mapCurrentStatusToG5Code = (value) => {
return 0; return 0;
}; };
export const dedupeRoomStatusSyncRows = (rows) => {
const uniqueRows = new Map();
for (const row of rows || []) {
const hotelId = Number(row?.hotel_id);
const roomId = row?.room_id ?? null;
if (!Number.isFinite(hotelId) || roomId === null || roomId === '') {
continue;
}
const key = `${hotelId}@@${String(roomId)}`;
if (!uniqueRows.has(key)) {
uniqueRows.set(key, {
hotel_id: hotelId,
room_id: String(roomId),
ip: row?.ip ?? null
});
}
}
return Array.from(uniqueRows.values());
};
export class G5DatabaseManager { export class G5DatabaseManager {
constructor(dbConfig) { constructor(dbConfig) {
if (!dbConfig.enabled) return; if (!dbConfig.enabled) return;
@@ -94,6 +117,54 @@ export class G5DatabaseManager {
} }
} }
async upsertRoomStatusMomentIp(rows) {
if (!this.pool || !rows || rows.length === 0) {
return;
}
const syncRows = dedupeRoomStatusSyncRows(rows);
if (syncRows.length === 0) {
return;
}
const sql = `
WITH input_rows AS (
SELECT *
FROM UNNEST($1::int2[], $2::text[], $3::text[])
AS t(hotel_id, room_id, ip)
), matched_rows AS (
SELECT i.hotel_id, i.room_id, i.ip
FROM input_rows i
JOIN LATERAL (
SELECT r.hotel_id, r.room_id
FROM ${roomStatusSyncSchema}.${roomStatusSyncTable} r
WHERE r.hotel_id = i.hotel_id
AND r.room_id = i.room_id
LIMIT 1
) m ON TRUE
)
INSERT INTO ${roomStatusSyncSchema}.${roomStatusSyncTable} (hotel_id, room_id, ip)
SELECT hotel_id, room_id, ip
FROM matched_rows
ON CONFLICT (hotel_id, room_id)
DO UPDATE SET ip = EXCLUDED.ip
`;
try {
await this.pool.query(sql, [
syncRows.map((row) => row.hotel_id),
syncRows.map((row) => row.room_id),
syncRows.map((row) => row.ip)
]);
} catch (error) {
logger.error('G5 room_status_moment_g5 upsert failed', {
error: error?.message,
rowsLength: syncRows.length
});
throw error;
}
}
async checkConnection() { async checkConnection() {
if (!this.pool) return true; // Pretend it's ok if disabled if (!this.pool) return true; // Pretend it's ok if disabled
let client; let client;

View File

@@ -183,6 +183,9 @@ const bootstrap = async () => {
promises.push(g5DbManager.insertRows({ schema: config.g5db.schema, table: config.g5db.table, rows }).catch(e => { promises.push(g5DbManager.insertRows({ schema: config.g5db.schema, table: config.g5db.table, rows }).catch(e => {
logger.error('G5 Database insert failed but non-blocking', { error: e.message }); logger.error('G5 Database insert failed but non-blocking', { error: e.message });
})); }));
promises.push(g5DbManager.upsertRoomStatusMomentIp(rows).catch(e => {
logger.error('G5 room_status_moment_g5 upsert failed but non-blocking', { error: e.message });
}));
} }
await Promise.all(promises); await Promise.all(promises);
@@ -213,6 +216,9 @@ const bootstrap = async () => {
promises.push(g5DbManager.insertRows({ schema: config.g5db.schema, table: config.g5db.table, rows }).catch(e => { promises.push(g5DbManager.insertRows({ schema: config.g5db.schema, table: config.g5db.table, rows }).catch(e => {
logger.error('G5 Database insert failed in insertOnce', { error: e.message }); logger.error('G5 Database insert failed in insertOnce', { error: e.message });
})); }));
promises.push(g5DbManager.upsertRoomStatusMomentIp(rows).catch(e => {
logger.error('G5 room_status_moment_g5 upsert failed in insertOnce', { error: e.message });
}));
} }
await Promise.all(promises); await Promise.all(promises);
metricCollector.increment('db_insert_count', 1); metricCollector.increment('db_insert_count', 1);

View File

@@ -1,5 +1,5 @@
import { describe, it, expect } from 'vitest'; import { describe, it, expect } from 'vitest';
import { mapCurrentStatusToG5Code } from '../src/db/g5DatabaseManager.js'; import { dedupeRoomStatusSyncRows, mapCurrentStatusToG5Code } from '../src/db/g5DatabaseManager.js';
describe('G5 current_status mapping', () => { describe('G5 current_status mapping', () => {
it('maps on/off/restart to numeric codes', () => { it('maps on/off/restart to numeric codes', () => {
@@ -12,4 +12,16 @@ describe('G5 current_status mapping', () => {
expect(mapCurrentStatusToG5Code('idle')).toBe(0); expect(mapCurrentStatusToG5Code('idle')).toBe(0);
expect(mapCurrentStatusToG5Code(null)).toBe(0); expect(mapCurrentStatusToG5Code(null)).toBe(0);
}); });
it('dedupes room status sync rows by hotel_id and room_id using first row', () => {
const rows = dedupeRoomStatusSyncRows([
{ hotel_id: 101, room_id: '8001', ip: '10.0.0.1:1234' },
{ hotel_id: 101, room_id: '8001', ip: '10.0.0.2:5678' },
{ hotel_id: 101, room_id: '8002', ip: '10.0.0.3:9012' }
]);
expect(rows).toHaveLength(2);
expect(rows[0]).toEqual({ hotel_id: 101, room_id: '8001', ip: '10.0.0.1:1234' });
expect(rows[1]).toEqual({ hotel_id: 101, room_id: '8002', ip: '10.0.0.3:9012' });
});
}); });

View File

@@ -0,0 +1,91 @@
/*
Navicat Premium Dump SQL
Source Server : FnOS 80
Source Server Type : PostgreSQL
Source Server Version : 150017 (150017)
Source Host : 10.8.8.80:5434
Source Catalog : log_platform
Source Schema : room_status
Target Server Type : PostgreSQL
Target Server Version : 150017 (150017)
File Encoding : 65001
Date: 18/03/2026 10:55:09
*/
-- ----------------------------
-- Table structure for room_status_moment_g5
-- ----------------------------
DROP TABLE IF EXISTS "room_status"."room_status_moment_g5";
CREATE TABLE "room_status"."room_status_moment_g5" (
"hotel_id" int2 NOT NULL,
"room_id" text COLLATE "pg_catalog"."default" NOT NULL,
"device_id" text COLLATE "pg_catalog"."default" NOT NULL,
"ts_ms" int8 NOT NULL DEFAULT ((EXTRACT(epoch FROM clock_timestamp()) * (1000)::numeric))::bigint,
"sys_lock_status" int2,
"online_status" int2,
"launcher_version" text COLLATE "pg_catalog"."default",
"app_version" text COLLATE "pg_catalog"."default",
"config_version" text COLLATE "pg_catalog"."default",
"register_ts_ms" int8,
"upgrade_ts_ms" int8,
"config_ts_ms" int8,
"ip" text COLLATE "pg_catalog"."default",
"pms_status" int2,
"power_state" int2,
"cardless_state" int2,
"service_mask" int8,
"insert_card" int2,
"bright_g" int2,
"agreement_ver" text COLLATE "pg_catalog"."default",
"air_address" _text COLLATE "pg_catalog"."default",
"air_state" _int2,
"air_model" _int2,
"air_speed" _int2,
"air_set_temp" _int2,
"air_now_temp" _int2,
"air_solenoid_valve" _int2,
"elec_address" _text COLLATE "pg_catalog"."default",
"elec_voltage" _float8,
"elec_ampere" _float8,
"elec_power" _float8,
"elec_phase" _float8,
"elec_energy" _float8,
"elec_sum_energy" _float8,
"carbon_state" int2,
"dev_loops" jsonb,
"energy_carbon_sum" float8,
"energy_nocard_sum" float8,
"external_device" jsonb DEFAULT '{}'::jsonb,
"faulty_device_count" jsonb DEFAULT '{}'::jsonb
)
WITH (fillfactor=90)
TABLESPACE "ts_hot"
;
-- ----------------------------
-- Indexes structure for table room_status_moment_g5
-- ----------------------------
CREATE INDEX "idx_rsm_g5_dashboard_query" ON "room_status"."room_status_moment_g5" USING btree (
"hotel_id" "pg_catalog"."int2_ops" ASC NULLS LAST,
"online_status" "pg_catalog"."int2_ops" ASC NULLS LAST,
"power_state" "pg_catalog"."int2_ops" ASC NULLS LAST
);
-- ----------------------------
-- Triggers structure for table room_status_moment_g5
-- ----------------------------
CREATE TRIGGER "trg_update_rsm_ts_ms" BEFORE UPDATE ON "room_status"."room_status_moment_g5"
FOR EACH ROW
EXECUTE PROCEDURE "room_status"."update_ts_ms_g5"();
CREATE TRIGGER "trigger_room_status_change" AFTER UPDATE ON "room_status"."room_status_moment_g5"
FOR EACH ROW
EXECUTE PROCEDURE "room_status"."handle_room_status_change"();
-- ----------------------------
-- Primary Key structure for table room_status_moment_g5
-- ----------------------------
ALTER TABLE "room_status"."room_status_moment_g5" ADD CONSTRAINT "room_status_moment_g5_pkey" PRIMARY KEY ("hotel_id", "room_id");