From e44cf10a8295030571c94528ccb00c380ef31287 Mon Sep 17 00:00:00 2001 From: XuJiacheng Date: Fri, 6 Feb 2026 15:15:03 +0800 Subject: [PATCH] =?UTF-8?q?feat(processor):=20=E5=90=8C=E6=AD=A5=E5=BF=83?= =?UTF-8?q?=E8=B7=B3=E6=95=B0=E6=8D=AE=E5=88=B0=20room=5Fstatus=20?= =?UTF-8?q?=E8=A1=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 在 HeartbeatProcessor 中新增异步同步逻辑,在历史表写入成功后尝试更新 room_status 表 - 实现 DatabaseManager.upsertRoomStatus 方法,支持批量更新和自动分区创建 - 添加批次内去重逻辑,避免 PostgreSQL ON CONFLICT 冲突 - 新增相关文档:同步方案、测试报告和提案说明 --- docs/archive/test_results.md | 35 ++++ docs/plan-room-status-sync.md | 124 ++++++++++++ .../2026-02-06-room-status-sync/proposal.md | 20 ++ src/db/databaseManager.js | 177 ++++++++++++++++++ src/processor/heartbeatProcessor.js | 16 ++ 5 files changed, 372 insertions(+) create mode 100644 docs/archive/test_results.md create mode 100644 docs/plan-room-status-sync.md create mode 100644 openspec/changes/2026-02-06-room-status-sync/proposal.md diff --git a/docs/archive/test_results.md b/docs/archive/test_results.md new file mode 100644 index 0000000..77570ef --- /dev/null +++ b/docs/archive/test_results.md @@ -0,0 +1,35 @@ +# Room Status Synchronization Implementation Test Results +Date: 2026-02-06 + +## 1. Test Overview +This document summarizes the validation tests performed for the Room Status Synchronization feature, specifically focusing on the `upsertRoomStatus` logic in `DatabaseManager`. + +## 2. Test Cases & Results + +### 2.1 Auto-Partitioning (自动分区) +- **Scenario**: Incoming heartbeat data contains a `hotel_id` (e.g., 3000) for which no partition exists in `room_status.room_status_moment`. +- **Expected Behavior**: The system should catch the "no partition of relation" error, dynamically create the partition `room_status_moment_h3000`, and retry the insertion successfully. +- **Result**: **PASSED**. + - Log observation: `[db] 检测到 room_status 分区缺失,尝试自动创建分区,hotelIds: 3000` + - Log observation: `[db] 成功创建 room_status 分区: room_status_moment_h3000` + - Data verification: Data was successfully inserted into the new partition. + +### 2.2 In-Batch Deduplication (批次内去重) +- **Scenario**: A single batch of heartbeat events contains multiple records for the same device (`hotel_id`, `room_id`, `device_id`) but with different `ts_ms`. +- **Issue**: PostgreSQL `ON CONFLICT` clause raises an error/warning if the same row is proposed for update multiple times in the same statement: "ON CONFLICT DO UPDATE command cannot affect row a second time". +- **Solution**: Implemented application-level deduplication before SQL generation. Grouped by device key and kept the record with the largest `ts_ms`. +- **Result**: **PASSED**. + - The warning "ON CONFLICT DO UPDATE command cannot affect row a second time" is no longer observed in the logs. + - The database reflects the latest state (highest `ts_ms`). + +### 2.3 System Startup & Stability +- **Scenario**: Full system startup with the new code changes. +- **Result**: **PASSED**. + - Service started successfully. + - Redis connection established. + - Database connection established. + - Partition maintenance task started. + - No regression observed in existing Heartbeat History (`heartbeat.heartbeat_events`) insertion. + +## 3. Conclusion +The implementation of `upsertRoomStatus` is robust and handles dynamic partitioning and data duplication correctly. The system is ready for deployment. diff --git a/docs/plan-room-status-sync.md b/docs/plan-room-status-sync.md new file mode 100644 index 0000000..189d4ba --- /dev/null +++ b/docs/plan-room-status-sync.md @@ -0,0 +1,124 @@ +# Room Status 状态表同步方案 + +## 1. 背景 +需要将 Kafka 接收到的心跳数据同步写入 `room_status.room_status_moment` 表。该表为设备实时状态表,由多个服务共同维护,当前服务仅负责更新心跳相关的业务字段,要注意:不能阻碍到其他服务对该表的读写操作,也要注意不能因为其他服务对该表的读写操作而影响到当前服务的正常运行(重要)。 + +## 2. 表结构与唯一性分析 +**目标表**:`room_status.room_status_moment` +**逻辑唯一键**:`hotel_id` + `room_id` + `device_id` + +**当前挑战**: +- 现有 DDL 中,主键定义为 `PRIMARY KEY (hotel_id, room_id, device_id, guid)`。 +- 目前尚未看到针对 `(hotel_id, room_id, device_id)` 的唯一索引(Unique Index)。 +- **建议**:为了支持高效的 `INSERT ... ON CONFLICT` 操作并确保数据唯一性,强烈建议在数据库中添加唯一索引: + ```sql + CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS idx_room_status_unique_device + ON room_status.room_status_moment (hotel_id, room_id, device_id); + ``` +- **应对策略**:在代码实现中,我们将假设上述唯一约束存在(或通过先查后写的方式兜底,但先查后写性能较差且非原子操作)。考虑到性能要求,**推荐使用 Upsert (ON CONFLICT) 语法**。 + +## 3. 字段映射方案 + +仅更新以下心跳包中包含的字段,其他字段(如 `sys_lock_status`, `online_status` 等)保持原值。 + +| 心跳数据源字段 (Source) | 状态表字段 (Target) | 数据类型 | 说明 | +| :--- | :--- | :--- | :--- | +| `ts_ms` | `ts_ms` | INT8 | 更新时间 | +| `ip` | `ip` | TEXT | 设备IP | +| `pms_state` | `pms_status` | INT2 | PMS状态 | +| `power_state` | `power_state` | INT2 | 取电状态 | +| `cardless_state` | `cardless_state` | INT2 | 无人状态 | +| `service_mask` | `service_mask` | INT8 | 服务掩码 | +| `insert_card` | `insert_card` | INT2 | 插卡状态 | +| `bright_g` | `bright_g` | INT2 | 全局亮度 | +| `version` | `agreement_ver` | TEXT | 协议版本 | +| `carbon_state` | `carbon_state` | INT2 | 碳达人状态 | +| **空调数组** | | | | +| `air_address` | `air_address` | TEXT[] | | +| `state` | `air_state` | INT2[] | | +| `model` | `air_model` | INT2[] | | +| `speed` | `air_speed` | INT2[] | | +| `set_temp` | `air_set_temp` | INT2[] | | +| `now_temp` | `air_now_temp` | INT2[] | | +| `solenoid_valve` | `air_solenoid_valve` | INT2[] | | +| **能耗数组** | | | | +| `elec_address` | `elec_address` | TEXT[] | | +| `voltage` | `elec_voltage` | DOUBLE[] | | +| `ampere` | `elec_ampere` | DOUBLE[] | | +| `power` | `elec_power` | DOUBLE[] | | +| `phase` | `elec_phase` | DOUBLE[] | | +| `energy` | `elec_energy` | DOUBLE[] | | +| `sum_energy` | `elec_sum_energy` | DOUBLE[] | | + +## 4. 写入策略与性能优化 + +### 4.1 核心逻辑 +采用 **Batch Upsert** 模式,结合 PostgreSQL 的 `ON CONFLICT` 语法。 + +### 4.2 "仅变化时更新" 的实现 +利用 PostgreSQL 的 `IS DISTINCT FROM` 语法在数据库层过滤无效更新,减少 WAL 日志和 I/O 开销。 + +**SQL 模板示例**: +```sql +INSERT INTO room_status.room_status_moment ( + hotel_id, room_id, device_id, guid, ts_ms, ip, pms_status, ... +) VALUES ( + $1, $2, $3, gen_random_uuid(), $4, $5, $6, ... +) +ON CONFLICT (hotel_id, room_id, device_id) +DO UPDATE SET + ts_ms = EXCLUDED.ts_ms, + ip = EXCLUDED.ip, + pms_status = EXCLUDED.pms_status, + bright_g = EXCLUDED.bright_g, + agreement_ver = EXCLUDED.agreement_ver, + ... +WHERE + room_status.room_status_moment.ts_ms < EXCLUDED.ts_ms -- 仅允许更新更新的时间戳(可选,防止乱序) + AND ( + room_status.room_status_moment.pms_status IS DISTINCT FROM EXCLUDED.pms_status + OR room_status.room_status_moment.power_state IS DISTINCT FROM EXCLUDED.power_state + OR room_status.room_status_moment.bright_g IS DISTINCT FROM EXCLUDED.bright_g + OR room_status.room_status_moment.agreement_ver IS DISTINCT FROM EXCLUDED.agreement_ver + OR ... + ); +``` +*注:已确认将在数据库中建立唯一索引 `idx_room_status_unique_device`,因此可以直接使用 `ON CONFLICT`。* + +### 4.3 代码修改计划 + +1. **DatabaseManager (`src/db/databaseManager.js`)**: + * 新增 `upsertRoomStatus(events)` 方法。 + * 构建针对 `room_status.room_status_moment` 的批量 Upsert 语句。 + * 处理数组字段的映射和类型转换。 + +2. **HeartbeatProcessor (`src/processor/heartbeatProcessor.js`)**: + * 在 `processBatch` 中,当 `insertHeartbeatEvents` (历史表) 成功后,调用 `upsertRoomStatus`。 + * **异步执行**:状态表的更新不应阻塞主流程(或者根据一致性要求决定是否 `await`)。建议 `await` 但捕获错误,避免影响 Offset 提交(除非要求强一致性)。 + * 鉴于用户要求“入库成功以后才提交kafka消费回执”,建议将两个写操作串行执行: + 1. 写历史表 (Must Success) + 2. 写状态表 (Should Success, Log Error if fail) + 3. 提交 Offset + +## 5. 待确认事项 +1. **唯一索引**:已确认创建 `idx_room_status_unique_device`。 +2. **guid 处理**:已确认使用 `gen_random_uuid()`。 + +## 7. 实施记录 (Implemented) + +### 7.1 功能实现 +- [x] **唯一索引创建**:已在 `docs/room_status_moment.sql` 中添加 `idx_room_status_unique_device`。 +- [x] **Batch Upsert**:在 `DatabaseManager.upsertRoomStatus` 中实现了基于 `ON CONFLICT` 的批量更新。 +- [x] **批次内去重**:为了解决 PostgreSQL "ON CONFLICT command cannot affect row a second time" 限制,在应用层实现了批次内去重逻辑(保留 `ts_ms` 最新的记录)。 +- [x] **自动分区**:实现了 `isRoomStatusMissingPartitionError` 和 `ensureRoomStatusPartitions`,当检测到分区缺失错误时,自动创建对应 `hotel_id` 的分区并重试写入。 + +### 7.2 验证结果 +- **启动测试**:系统正常启动,无报错。 +- **自动分区**:模拟 `hotel_id=3000` 的数据写入,验证了系统能自动创建 `room_status_moment_h3000` 分区并写入成功。 +- **去重逻辑**:模拟同一批次包含同一设备的多次更新,验证了去重逻辑生效,无 PostgreSQL 警告。 +- **数据一致性**:验证了数据库中数据的 `ts_ms` 为最新值。 + +## 8. 归档说明 +- 相关的测试报告已归档至 `docs/archive/` 目录。 +- 核心代码位于 `src/db/databaseManager.js`。 +- 数据库脚本位于 `docs/room_status_moment.sql`。 diff --git a/openspec/changes/2026-02-06-room-status-sync/proposal.md b/openspec/changes/2026-02-06-room-status-sync/proposal.md new file mode 100644 index 0000000..0891140 --- /dev/null +++ b/openspec/changes/2026-02-06-room-status-sync/proposal.md @@ -0,0 +1,20 @@ +# Proposal: Synchronize Heartbeat Data to Room Status Table + +## Background +The `room_status.room_status_moment` table is a shared table for real-time device status. The heartbeat service needs to synchronize relevant fields from Kafka messages to this table. + +## Changes +1. **Database Schema**: + * Add `bright_g` (INT2) and `agreement_ver` (TEXT) columns to `room_status.room_status_moment`. + * Add a UNIQUE INDEX on `(hotel_id, room_id, device_id)` to support efficient UPSERT operations. + +2. **Application Logic**: + * Implement `upsertRoomStatus` in `DatabaseManager`. + * Call this method in `HeartbeatProcessor` after successful insertion into the history table. + * Map `version` to `agreement_ver` and `bright_g` to `bright_g`. + +## Tasks +- [ ] Update `docs/room_status_moment.sql` with new columns and index. +- [ ] Update `docs/plan-room-status-sync.md` with new fields and finalized plan. +- [ ] Implement `upsertRoomStatus` in `DatabaseManager`. +- [ ] Integrate into `HeartbeatProcessor`. diff --git a/src/db/databaseManager.js b/src/db/databaseManager.js index 9ff67ec..ee4664d 100644 --- a/src/db/databaseManager.js +++ b/src/db/databaseManager.js @@ -609,6 +609,183 @@ class DatabaseManager { } } + // 同步更新 room_status.room_status_moment 表 + // 使用 INSERT ... ON CONFLICT ... DO UPDATE 实现 upsert + async upsertRoomStatus(events) { + if (!Array.isArray(events)) { + events = [events]; + } + if (events.length === 0) return { insertedCount: 0, updatedCount: 0 }; + + // 批次内去重:按 (hotel_id, room_id, device_id) 分组,只保留 ts_ms 最大的一条 + // 原因:PostgreSQL ON CONFLICT 不允许同一语句中多次更新同一行 + const uniqueEventsMap = new Map(); + for (const e of events) { + if (!e.hotel_id || !e.room_id || !e.device_id) continue; + const key = `${e.hotel_id}_${e.room_id}_${e.device_id}`; + const existing = uniqueEventsMap.get(key); + // 如果没有记录,或者当前记录时间更新,则覆盖 + if (!existing || (BigInt(e.ts_ms || 0) > BigInt(existing.ts_ms || 0))) { + uniqueEventsMap.set(key, e); + } + } + const uniqueEvents = Array.from(uniqueEventsMap.values()); + if (uniqueEvents.length === 0) return { insertedCount: 0, updatedCount: 0 }; + + // 字段映射:心跳字段 -> room_status 字段 + // 注意:只更新心跳包里有的字段 + const columns = [ + 'ts_ms', + 'hotel_id', + 'room_id', + 'device_id', + 'ip', + 'pms_status', + 'power_state', + 'cardless_state', + 'service_mask', + 'insert_card', + 'carbon_state', + 'bright_g', + 'agreement_ver', // map from version + 'air_address', + 'air_state', + 'air_model', + 'air_speed', + 'air_set_temp', + 'air_now_temp', + 'air_solenoid_valve', + 'elec_address', + 'elec_voltage', + 'elec_ampere', + 'elec_power', + 'elec_phase', + 'elec_energy', + 'elec_sum_energy', + ]; + + const toRowValues = (e) => [ + e.ts_ms, + e.hotel_id, + e.room_id, + e.device_id, + e.ip, + e.pms_state, // pms_status + e.power_state, + e.cardless_state, + e.service_mask, + e.insert_card ?? null, + e.carbon_state, + e.bright_g === -1 ? null : (e.bright_g ?? null), + e.version ?? null, // agreement_ver + Array.isArray(e.air_address) ? e.air_address : null, + Array.isArray(e.state) ? e.state : null, // air_state + Array.isArray(e.model) ? e.model : null, // air_model + Array.isArray(e.speed) ? e.speed : null, // air_speed + Array.isArray(e.set_temp) ? e.set_temp : null, // air_set_temp + Array.isArray(e.now_temp) ? e.now_temp : null, // air_now_temp + Array.isArray(e.solenoid_valve) ? e.solenoid_valve : null, // air_solenoid_valve + Array.isArray(e.elec_address) ? e.elec_address : null, + Array.isArray(e.voltage) ? e.voltage : null, // elec_voltage + Array.isArray(e.ampere) ? e.ampere : null, // elec_ampere + Array.isArray(e.power) ? e.power : null, // elec_power + Array.isArray(e.phase) ? e.phase : null, // elec_phase + Array.isArray(e.energy) ? e.energy : null, // elec_energy + Array.isArray(e.sum_energy) ? e.sum_energy : null, // elec_sum_energy + ]; + + // 构建 UPDATE SET 子句(排除主键和 guid) + // 使用 EXCLUDED.col 引用新值 + // 使用 IS DISTINCT FROM 避免无意义更新 + const updateColumns = columns.filter(c => !['hotel_id', 'room_id', 'device_id'].includes(c)); + const updateSet = updateColumns.map(col => `${col} = EXCLUDED.${col}`).join(', '); + + // 构建 WHERE 子句:仅当至少一个字段发生变化,且时间戳未回退时才更新 + // 注意:room_status.room_status_moment.ts_ms 是 bigint,EXCLUDED.ts_ms 也是 bigint + const whereConditions = updateColumns.map(col => `room_status.room_status_moment.${col} IS DISTINCT FROM EXCLUDED.${col}`).join(' OR '); + + // 生成批量插入 SQL + // 注意:ON CONFLICT (hotel_id, room_id, device_id) 依赖于唯一索引 idx_room_status_unique_device + const values = []; + const placeholders = uniqueEvents.map((e, idx) => { + const rowVals = toRowValues(e); + values.push(...rowVals); + // 额外插入 gen_random_uuid() 作为 guid + const p = rowVals.map((_, i) => `$${idx * rowVals.length + i + 1}`).join(', '); + return `(${p}, gen_random_uuid())`; + }).join(', '); + + const allCols = [...columns, 'guid'].join(', '); + + const sql = ` + INSERT INTO room_status.room_status_moment (${allCols}) + VALUES ${placeholders} + ON CONFLICT (hotel_id, room_id, device_id) + DO UPDATE SET + ${updateSet} + WHERE + room_status.room_status_moment.ts_ms <= EXCLUDED.ts_ms + AND (${whereConditions}) + `; + + try { + const res = await this.pool.query(sql, values); + return { rowCount: res.rowCount }; // 包括插入和更新的行数 + } catch (error) { + if (this.isRoomStatusMissingPartitionError(error)) { + const hotelIds = [...new Set(uniqueEvents.map(e => e.hotel_id).filter(id => id != null))]; + if (hotelIds.length > 0) { + console.log(`[db] 检测到 room_status 分区缺失,尝试自动创建分区,hotelIds: ${hotelIds.join(', ')}`); + await this.ensureRoomStatusPartitions(hotelIds); + try { + const res = await this.pool.query(sql, values); + return { rowCount: res.rowCount }; + } catch (retryError) { + console.warn('[db] upsertRoomStatus retry failed:', retryError.message); + return { error: retryError }; + } + } + } + + // 不抛出错误,只记录日志,避免影响主流程(Heartbeat History 写入已成功) + console.warn('[db] upsertRoomStatus failed:', error.message); + return { error }; + } + } + + isRoomStatusMissingPartitionError(error) { + const msg = String(error?.message ?? ''); + // 错误码 23514 (check_violation) 通常在插入分区表且无对应分区时触发 + // 或者直接匹配错误信息 "no partition of relation" + return msg.includes('no partition of relation') && msg.includes('room_status_moment'); + } + + async ensureRoomStatusPartitions(hotelIds) { + for (const hotelId of hotelIds) { + await this.createRoomStatusPartition(hotelId); + } + } + + async createRoomStatusPartition(hotelId) { + // 简单的整数合法性检查 + if (!Number.isInteger(Number(hotelId))) return; + + const tableName = `room_status_moment_h${hotelId}`; + const sql = ` + CREATE TABLE IF NOT EXISTS room_status.${tableName} + PARTITION OF room_status.room_status_moment + FOR VALUES IN (${hotelId}); + `; + try { + await this.pool.query(sql); + console.log(`[db] 成功创建 room_status 分区: ${tableName}`); + } catch (err) { + // 并发创建时可能会报错,如果已存在则忽略 + if (String(err?.message).includes('already exists')) return; + console.error(`[db] 创建 room_status 分区失败 ${tableName}:`, err); + } + } + async getLatestHeartbeat(componentId) { try { const query = { diff --git a/src/processor/heartbeatProcessor.js b/src/processor/heartbeatProcessor.js index 0f24dd6..d4ff627 100644 --- a/src/processor/heartbeatProcessor.js +++ b/src/processor/heartbeatProcessor.js @@ -214,6 +214,22 @@ class HeartbeatProcessor { const result = await this.databaseManager.insertHeartbeatEvents(batchData); insertedCount = Number(result?.insertedCount ?? result ?? 0); failedRecords = Array.isArray(result?.failedRecords) ? result.failedRecords : []; + + // 同步到 room_status 表 (Best Effort) + // 只有当历史表写入成功(insertedCount > 0)才尝试同步 + // 过滤掉写入失败的记录(如果有) + if (insertedCount > 0) { + const successData = failedRecords.length > 0 + ? batchData.filter(d => !failedRecords.some(f => f.record === d)) + : batchData; + + if (successData.length > 0) { + this.databaseManager.upsertRoomStatus(successData).catch(err => { + console.warn('异步同步 room_status 失败 (忽略):', err); + }); + } + } + } else { const result = await this.databaseManager.insertHeartbeatData(batchData); insertedCount = Number(result?.insertedCount ?? result ?? 0);