diff --git a/bls-onoffline-backend/openspec/changes/add-g5-room-status-ip-upsert/proposal.md b/bls-onoffline-backend/openspec/changes/add-g5-room-status-ip-upsert/proposal.md new file mode 100644 index 0000000..8984ed1 --- /dev/null +++ b/bls-onoffline-backend/openspec/changes/add-g5-room-status-ip-upsert/proposal.md @@ -0,0 +1,13 @@ +# Change: add g5 room status ip upsert + +## Why +当前双写仅写入 onoffline_record_g5,未同步更新 G5 状态表 `room_status.room_status_moment_g5` 的 `ip` 字段,导致状态表与最新上报数据存在延迟。 + +## What Changes +- 新增 G5 状态表同步机制:按 `hotel_id + room_id` 查找目标行,并使用 `ON CONFLICT DO UPDATE` 更新 `ip`。 +- 同步时仅处理查找到的第一条匹配行(`LIMIT 1`)。 +- 无论 `ip` 是否变化都执行 upsert 更新,以触发数据库更新时间相关触发器。 + +## Impact +- Affected specs: `openspec/specs/onoffline/spec.md` +- Affected code: `src/db/g5DatabaseManager.js`, `src/index.js`, `tests/g5DatabaseManager.test.js` diff --git a/bls-onoffline-backend/openspec/changes/add-g5-room-status-ip-upsert/specs/onoffline/spec.md b/bls-onoffline-backend/openspec/changes/add-g5-room-status-ip-upsert/specs/onoffline/spec.md new file mode 100644 index 0000000..35fcb7e --- /dev/null +++ b/bls-onoffline-backend/openspec/changes/add-g5-room-status-ip-upsert/specs/onoffline/spec.md @@ -0,0 +1,19 @@ +## ADDED Requirements + +### Requirement: G5 状态表 IP 同步 +系统 SHALL 在处理并写入数据时,同步将对应设备的 `ip` 更新到 G5 状态表 `room_status.room_status_moment_g5`。 + +#### Scenario: 按唯一键同步 IP +- **GIVEN** 当前处理行包含 `hotel_id` 与 `room_id` +- **WHEN** 执行 G5 状态表同步 +- **THEN** 系统按 `hotel_id + room_id` 查找状态表记录,并仅对查找到的第一条记录执行写入 + +#### Scenario: 使用 upsert 触发更新 +- **GIVEN** 状态表已存在同键记录 +- **WHEN** 执行写入 +- **THEN** 系统使用 `ON CONFLICT (hotel_id, room_id) DO UPDATE` 更新 `ip` + +#### Scenario: IP 不变仍触发更新 +- **GIVEN** 新 `ip` 与库内 `ip` 相同 +- **WHEN** 执行状态表同步 +- **THEN** 系统仍执行更新语句,以触发表上的更新时间相关触发器 diff --git a/bls-onoffline-backend/openspec/changes/add-g5-room-status-ip-upsert/tasks.md b/bls-onoffline-backend/openspec/changes/add-g5-room-status-ip-upsert/tasks.md new file mode 100644 index 0000000..bbe3adb --- /dev/null +++ b/bls-onoffline-backend/openspec/changes/add-g5-room-status-ip-upsert/tasks.md @@ -0,0 +1,10 @@ +## 1. Implementation +- [x] 1.1 Add room status IP upsert method in G5 DB manager. +- [x] 1.2 Trigger room status IP upsert in the main write path. +- [x] 1.3 Add tests for status mapping and room status dedupe behavior. +- [x] 1.4 Add OpenSpec delta for the new synchronization requirement. + +## 2. Validation +- [x] 2.1 Run `npm run test`. +- [x] 2.2 Run `npm run build`. +- [x] 2.3 Run `openspec validate add-g5-room-status-ip-upsert --strict`. diff --git a/bls-onoffline-backend/src/db/g5DatabaseManager.js b/bls-onoffline-backend/src/db/g5DatabaseManager.js index 2378128..3c8ce05 100644 --- a/bls-onoffline-backend/src/db/g5DatabaseManager.js +++ b/bls-onoffline-backend/src/db/g5DatabaseManager.js @@ -18,6 +18,9 @@ const g5Columns = [ 'record_source' ]; +const roomStatusSyncSchema = 'room_status'; +const roomStatusSyncTable = 'room_status_moment_g5'; + export const mapCurrentStatusToG5Code = (value) => { if (value === 1 || value === 2 || value === 3) { return value; @@ -30,6 +33,26 @@ export const mapCurrentStatusToG5Code = (value) => { return 0; }; +export const dedupeRoomStatusSyncRows = (rows) => { + const uniqueRows = new Map(); + for (const row of rows || []) { + const hotelId = Number(row?.hotel_id); + const roomId = row?.room_id ?? null; + if (!Number.isFinite(hotelId) || roomId === null || roomId === '') { + continue; + } + const key = `${hotelId}@@${String(roomId)}`; + if (!uniqueRows.has(key)) { + uniqueRows.set(key, { + hotel_id: hotelId, + room_id: String(roomId), + ip: row?.ip ?? null + }); + } + } + return Array.from(uniqueRows.values()); +}; + export class G5DatabaseManager { constructor(dbConfig) { if (!dbConfig.enabled) return; @@ -94,6 +117,54 @@ export class G5DatabaseManager { } } + async upsertRoomStatusMomentIp(rows) { + if (!this.pool || !rows || rows.length === 0) { + return; + } + + const syncRows = dedupeRoomStatusSyncRows(rows); + if (syncRows.length === 0) { + return; + } + + const sql = ` + WITH input_rows AS ( + SELECT * + FROM UNNEST($1::int2[], $2::text[], $3::text[]) + AS t(hotel_id, room_id, ip) + ), matched_rows AS ( + SELECT i.hotel_id, i.room_id, i.ip + FROM input_rows i + JOIN LATERAL ( + SELECT r.hotel_id, r.room_id + FROM ${roomStatusSyncSchema}.${roomStatusSyncTable} r + WHERE r.hotel_id = i.hotel_id + AND r.room_id = i.room_id + LIMIT 1 + ) m ON TRUE + ) + INSERT INTO ${roomStatusSyncSchema}.${roomStatusSyncTable} (hotel_id, room_id, ip) + SELECT hotel_id, room_id, ip + FROM matched_rows + ON CONFLICT (hotel_id, room_id) + DO UPDATE SET ip = EXCLUDED.ip + `; + + try { + await this.pool.query(sql, [ + syncRows.map((row) => row.hotel_id), + syncRows.map((row) => row.room_id), + syncRows.map((row) => row.ip) + ]); + } catch (error) { + logger.error('G5 room_status_moment_g5 upsert failed', { + error: error?.message, + rowsLength: syncRows.length + }); + throw error; + } + } + async checkConnection() { if (!this.pool) return true; // Pretend it's ok if disabled let client; diff --git a/bls-onoffline-backend/src/index.js b/bls-onoffline-backend/src/index.js index e060710..ee87906 100644 --- a/bls-onoffline-backend/src/index.js +++ b/bls-onoffline-backend/src/index.js @@ -183,6 +183,9 @@ const bootstrap = async () => { promises.push(g5DbManager.insertRows({ schema: config.g5db.schema, table: config.g5db.table, rows }).catch(e => { logger.error('G5 Database insert failed but non-blocking', { error: e.message }); })); + promises.push(g5DbManager.upsertRoomStatusMomentIp(rows).catch(e => { + logger.error('G5 room_status_moment_g5 upsert failed but non-blocking', { error: e.message }); + })); } await Promise.all(promises); @@ -213,6 +216,9 @@ const bootstrap = async () => { promises.push(g5DbManager.insertRows({ schema: config.g5db.schema, table: config.g5db.table, rows }).catch(e => { logger.error('G5 Database insert failed in insertOnce', { error: e.message }); })); + promises.push(g5DbManager.upsertRoomStatusMomentIp(rows).catch(e => { + logger.error('G5 room_status_moment_g5 upsert failed in insertOnce', { error: e.message }); + })); } await Promise.all(promises); metricCollector.increment('db_insert_count', 1); diff --git a/bls-onoffline-backend/tests/g5DatabaseManager.test.js b/bls-onoffline-backend/tests/g5DatabaseManager.test.js index 6eccd22..1b7387a 100644 --- a/bls-onoffline-backend/tests/g5DatabaseManager.test.js +++ b/bls-onoffline-backend/tests/g5DatabaseManager.test.js @@ -1,5 +1,5 @@ import { describe, it, expect } from 'vitest'; -import { mapCurrentStatusToG5Code } from '../src/db/g5DatabaseManager.js'; +import { dedupeRoomStatusSyncRows, mapCurrentStatusToG5Code } from '../src/db/g5DatabaseManager.js'; describe('G5 current_status mapping', () => { it('maps on/off/restart to numeric codes', () => { @@ -12,4 +12,16 @@ describe('G5 current_status mapping', () => { expect(mapCurrentStatusToG5Code('idle')).toBe(0); expect(mapCurrentStatusToG5Code(null)).toBe(0); }); + + it('dedupes room status sync rows by hotel_id and room_id using first row', () => { + const rows = dedupeRoomStatusSyncRows([ + { hotel_id: 101, room_id: '8001', ip: '10.0.0.1:1234' }, + { hotel_id: 101, room_id: '8001', ip: '10.0.0.2:5678' }, + { hotel_id: 101, room_id: '8002', ip: '10.0.0.3:9012' } + ]); + + expect(rows).toHaveLength(2); + expect(rows[0]).toEqual({ hotel_id: 101, room_id: '8001', ip: '10.0.0.1:1234' }); + expect(rows[1]).toEqual({ hotel_id: 101, room_id: '8002', ip: '10.0.0.3:9012' }); + }); }); diff --git a/docs/room_status_moment_g5.sql b/docs/room_status_moment_g5.sql new file mode 100644 index 0000000..fcf2f50 --- /dev/null +++ b/docs/room_status_moment_g5.sql @@ -0,0 +1,91 @@ +/* + Navicat Premium Dump SQL + + Source Server : FnOS 80 + Source Server Type : PostgreSQL + Source Server Version : 150017 (150017) + Source Host : 10.8.8.80:5434 + Source Catalog : log_platform + Source Schema : room_status + + Target Server Type : PostgreSQL + Target Server Version : 150017 (150017) + File Encoding : 65001 + + Date: 18/03/2026 10:55:09 +*/ + + +-- ---------------------------- +-- Table structure for room_status_moment_g5 +-- ---------------------------- +DROP TABLE IF EXISTS "room_status"."room_status_moment_g5"; +CREATE TABLE "room_status"."room_status_moment_g5" ( + "hotel_id" int2 NOT NULL, + "room_id" text COLLATE "pg_catalog"."default" NOT NULL, + "device_id" text COLLATE "pg_catalog"."default" NOT NULL, + "ts_ms" int8 NOT NULL DEFAULT ((EXTRACT(epoch FROM clock_timestamp()) * (1000)::numeric))::bigint, + "sys_lock_status" int2, + "online_status" int2, + "launcher_version" text COLLATE "pg_catalog"."default", + "app_version" text COLLATE "pg_catalog"."default", + "config_version" text COLLATE "pg_catalog"."default", + "register_ts_ms" int8, + "upgrade_ts_ms" int8, + "config_ts_ms" int8, + "ip" text COLLATE "pg_catalog"."default", + "pms_status" int2, + "power_state" int2, + "cardless_state" int2, + "service_mask" int8, + "insert_card" int2, + "bright_g" int2, + "agreement_ver" text COLLATE "pg_catalog"."default", + "air_address" _text COLLATE "pg_catalog"."default", + "air_state" _int2, + "air_model" _int2, + "air_speed" _int2, + "air_set_temp" _int2, + "air_now_temp" _int2, + "air_solenoid_valve" _int2, + "elec_address" _text COLLATE "pg_catalog"."default", + "elec_voltage" _float8, + "elec_ampere" _float8, + "elec_power" _float8, + "elec_phase" _float8, + "elec_energy" _float8, + "elec_sum_energy" _float8, + "carbon_state" int2, + "dev_loops" jsonb, + "energy_carbon_sum" float8, + "energy_nocard_sum" float8, + "external_device" jsonb DEFAULT '{}'::jsonb, + "faulty_device_count" jsonb DEFAULT '{}'::jsonb +) +WITH (fillfactor=90) +TABLESPACE "ts_hot" +; + +-- ---------------------------- +-- Indexes structure for table room_status_moment_g5 +-- ---------------------------- +CREATE INDEX "idx_rsm_g5_dashboard_query" ON "room_status"."room_status_moment_g5" USING btree ( + "hotel_id" "pg_catalog"."int2_ops" ASC NULLS LAST, + "online_status" "pg_catalog"."int2_ops" ASC NULLS LAST, + "power_state" "pg_catalog"."int2_ops" ASC NULLS LAST +); + +-- ---------------------------- +-- Triggers structure for table room_status_moment_g5 +-- ---------------------------- +CREATE TRIGGER "trg_update_rsm_ts_ms" BEFORE UPDATE ON "room_status"."room_status_moment_g5" +FOR EACH ROW +EXECUTE PROCEDURE "room_status"."update_ts_ms_g5"(); +CREATE TRIGGER "trigger_room_status_change" AFTER UPDATE ON "room_status"."room_status_moment_g5" +FOR EACH ROW +EXECUTE PROCEDURE "room_status"."handle_room_status_change"(); + +-- ---------------------------- +-- Primary Key structure for table room_status_moment_g5 +-- ---------------------------- +ALTER TABLE "room_status"."room_status_moment_g5" ADD CONSTRAINT "room_status_moment_g5_pkey" PRIMARY KEY ("hotel_id", "room_id");