feat(processor): 同步心跳数据到 room_status 表

- 在 HeartbeatProcessor 中新增异步同步逻辑,在历史表写入成功后尝试更新 room_status 表
- 实现 DatabaseManager.upsertRoomStatus 方法,支持批量更新和自动分区创建
- 添加批次内去重逻辑,避免 PostgreSQL ON CONFLICT 冲突
- 新增相关文档:同步方案、测试报告和提案说明
This commit is contained in:
2026-02-06 15:15:03 +08:00
parent b72cdde8bf
commit e44cf10a82
5 changed files with 372 additions and 0 deletions

View File

@@ -0,0 +1,35 @@
# Room Status Synchronization Implementation Test Results
Date: 2026-02-06
## 1. Test Overview
This document summarizes the validation tests performed for the Room Status Synchronization feature, specifically focusing on the `upsertRoomStatus` logic in `DatabaseManager`.
## 2. Test Cases & Results
### 2.1 Auto-Partitioning (自动分区)
- **Scenario**: Incoming heartbeat data contains a `hotel_id` (e.g., 3000) for which no partition exists in `room_status.room_status_moment`.
- **Expected Behavior**: The system should catch the "no partition of relation" error, dynamically create the partition `room_status_moment_h3000`, and retry the insertion successfully.
- **Result**: **PASSED**.
- Log observation: `[db] 检测到 room_status 分区缺失尝试自动创建分区hotelIds: 3000`
- Log observation: `[db] 成功创建 room_status 分区: room_status_moment_h3000`
- Data verification: Data was successfully inserted into the new partition.
### 2.2 In-Batch Deduplication (批次内去重)
- **Scenario**: A single batch of heartbeat events contains multiple records for the same device (`hotel_id`, `room_id`, `device_id`) but with different `ts_ms`.
- **Issue**: PostgreSQL `ON CONFLICT` clause raises an error/warning if the same row is proposed for update multiple times in the same statement: "ON CONFLICT DO UPDATE command cannot affect row a second time".
- **Solution**: Implemented application-level deduplication before SQL generation. Grouped by device key and kept the record with the largest `ts_ms`.
- **Result**: **PASSED**.
- The warning "ON CONFLICT DO UPDATE command cannot affect row a second time" is no longer observed in the logs.
- The database reflects the latest state (highest `ts_ms`).
### 2.3 System Startup & Stability
- **Scenario**: Full system startup with the new code changes.
- **Result**: **PASSED**.
- Service started successfully.
- Redis connection established.
- Database connection established.
- Partition maintenance task started.
- No regression observed in existing Heartbeat History (`heartbeat.heartbeat_events`) insertion.
## 3. Conclusion
The implementation of `upsertRoomStatus` is robust and handles dynamic partitioning and data duplication correctly. The system is ready for deployment.

View File

@@ -0,0 +1,124 @@
# Room Status 状态表同步方案
## 1. 背景
需要将 Kafka 接收到的心跳数据同步写入 `room_status.room_status_moment` 表。该表为设备实时状态表,由多个服务共同维护,当前服务仅负责更新心跳相关的业务字段,要注意:不能阻碍到其他服务对该表的读写操作,也要注意不能因为其他服务对该表的读写操作而影响到当前服务的正常运行(重要)。
## 2. 表结构与唯一性分析
**目标表**`room_status.room_status_moment`
**逻辑唯一键**`hotel_id` + `room_id` + `device_id`
**当前挑战**
- 现有 DDL 中,主键定义为 `PRIMARY KEY (hotel_id, room_id, device_id, guid)`
- 目前尚未看到针对 `(hotel_id, room_id, device_id)` 的唯一索引Unique Index
- **建议**:为了支持高效的 `INSERT ... ON CONFLICT` 操作并确保数据唯一性,强烈建议在数据库中添加唯一索引:
```sql
CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS idx_room_status_unique_device
ON room_status.room_status_moment (hotel_id, room_id, device_id);
```
- **应对策略**:在代码实现中,我们将假设上述唯一约束存在(或通过先查后写的方式兜底,但先查后写性能较差且非原子操作)。考虑到性能要求,**推荐使用 Upsert (ON CONFLICT) 语法**。
## 3. 字段映射方案
仅更新以下心跳包中包含的字段,其他字段(如 `sys_lock_status`, `online_status` 等)保持原值。
| 心跳数据源字段 (Source) | 状态表字段 (Target) | 数据类型 | 说明 |
| :--- | :--- | :--- | :--- |
| `ts_ms` | `ts_ms` | INT8 | 更新时间 |
| `ip` | `ip` | TEXT | 设备IP |
| `pms_state` | `pms_status` | INT2 | PMS状态 |
| `power_state` | `power_state` | INT2 | 取电状态 |
| `cardless_state` | `cardless_state` | INT2 | 无人状态 |
| `service_mask` | `service_mask` | INT8 | 服务掩码 |
| `insert_card` | `insert_card` | INT2 | 插卡状态 |
| `bright_g` | `bright_g` | INT2 | 全局亮度 |
| `version` | `agreement_ver` | TEXT | 协议版本 |
| `carbon_state` | `carbon_state` | INT2 | 碳达人状态 |
| **空调数组** | | | |
| `air_address` | `air_address` | TEXT[] | |
| `state` | `air_state` | INT2[] | |
| `model` | `air_model` | INT2[] | |
| `speed` | `air_speed` | INT2[] | |
| `set_temp` | `air_set_temp` | INT2[] | |
| `now_temp` | `air_now_temp` | INT2[] | |
| `solenoid_valve` | `air_solenoid_valve` | INT2[] | |
| **能耗数组** | | | |
| `elec_address` | `elec_address` | TEXT[] | |
| `voltage` | `elec_voltage` | DOUBLE[] | |
| `ampere` | `elec_ampere` | DOUBLE[] | |
| `power` | `elec_power` | DOUBLE[] | |
| `phase` | `elec_phase` | DOUBLE[] | |
| `energy` | `elec_energy` | DOUBLE[] | |
| `sum_energy` | `elec_sum_energy` | DOUBLE[] | |
## 4. 写入策略与性能优化
### 4.1 核心逻辑
采用 **Batch Upsert** 模式,结合 PostgreSQL 的 `ON CONFLICT` 语法。
### 4.2 "仅变化时更新" 的实现
利用 PostgreSQL 的 `IS DISTINCT FROM` 语法在数据库层过滤无效更新,减少 WAL 日志和 I/O 开销。
**SQL 模板示例**
```sql
INSERT INTO room_status.room_status_moment (
hotel_id, room_id, device_id, guid, ts_ms, ip, pms_status, ...
) VALUES (
$1, $2, $3, gen_random_uuid(), $4, $5, $6, ...
)
ON CONFLICT (hotel_id, room_id, device_id)
DO UPDATE SET
ts_ms = EXCLUDED.ts_ms,
ip = EXCLUDED.ip,
pms_status = EXCLUDED.pms_status,
bright_g = EXCLUDED.bright_g,
agreement_ver = EXCLUDED.agreement_ver,
...
WHERE
room_status.room_status_moment.ts_ms < EXCLUDED.ts_ms -- 仅允许更新更新的时间戳(可选,防止乱序)
AND (
room_status.room_status_moment.pms_status IS DISTINCT FROM EXCLUDED.pms_status
OR room_status.room_status_moment.power_state IS DISTINCT FROM EXCLUDED.power_state
OR room_status.room_status_moment.bright_g IS DISTINCT FROM EXCLUDED.bright_g
OR room_status.room_status_moment.agreement_ver IS DISTINCT FROM EXCLUDED.agreement_ver
OR ...
);
```
*注:已确认将在数据库中建立唯一索引 `idx_room_status_unique_device`,因此可以直接使用 `ON CONFLICT`。*
### 4.3 代码修改计划
1. **DatabaseManager (`src/db/databaseManager.js`)**:
* 新增 `upsertRoomStatus(events)` 方法。
* 构建针对 `room_status.room_status_moment` 的批量 Upsert 语句。
* 处理数组字段的映射和类型转换。
2. **HeartbeatProcessor (`src/processor/heartbeatProcessor.js`)**:
* 在 `processBatch` 中,当 `insertHeartbeatEvents` (历史表) 成功后,调用 `upsertRoomStatus`。
* **异步执行**:状态表的更新不应阻塞主流程(或者根据一致性要求决定是否 `await`)。建议 `await` 但捕获错误,避免影响 Offset 提交(除非要求强一致性)。
* 鉴于用户要求“入库成功以后才提交kafka消费回执”建议将两个写操作串行执行
1. 写历史表 (Must Success)
2. 写状态表 (Should Success, Log Error if fail)
3. 提交 Offset
## 5. 待确认事项
1. **唯一索引**:已确认创建 `idx_room_status_unique_device`。
2. **guid 处理**:已确认使用 `gen_random_uuid()`。
## 7. 实施记录 (Implemented)
### 7.1 功能实现
- [x] **唯一索引创建**:已在 `docs/room_status_moment.sql` 中添加 `idx_room_status_unique_device`。
- [x] **Batch Upsert**:在 `DatabaseManager.upsertRoomStatus` 中实现了基于 `ON CONFLICT` 的批量更新。
- [x] **批次内去重**:为了解决 PostgreSQL "ON CONFLICT command cannot affect row a second time" 限制,在应用层实现了批次内去重逻辑(保留 `ts_ms` 最新的记录)。
- [x] **自动分区**:实现了 `isRoomStatusMissingPartitionError` 和 `ensureRoomStatusPartitions`,当检测到分区缺失错误时,自动创建对应 `hotel_id` 的分区并重试写入。
### 7.2 验证结果
- **启动测试**:系统正常启动,无报错。
- **自动分区**:模拟 `hotel_id=3000` 的数据写入,验证了系统能自动创建 `room_status_moment_h3000` 分区并写入成功。
- **去重逻辑**:模拟同一批次包含同一设备的多次更新,验证了去重逻辑生效,无 PostgreSQL 警告。
- **数据一致性**:验证了数据库中数据的 `ts_ms` 为最新值。
## 8. 归档说明
- 相关的测试报告已归档至 `docs/archive/` 目录。
- 核心代码位于 `src/db/databaseManager.js`。
- 数据库脚本位于 `docs/room_status_moment.sql`。

View File

@@ -0,0 +1,20 @@
# Proposal: Synchronize Heartbeat Data to Room Status Table
## Background
The `room_status.room_status_moment` table is a shared table for real-time device status. The heartbeat service needs to synchronize relevant fields from Kafka messages to this table.
## Changes
1. **Database Schema**:
* Add `bright_g` (INT2) and `agreement_ver` (TEXT) columns to `room_status.room_status_moment`.
* Add a UNIQUE INDEX on `(hotel_id, room_id, device_id)` to support efficient UPSERT operations.
2. **Application Logic**:
* Implement `upsertRoomStatus` in `DatabaseManager`.
* Call this method in `HeartbeatProcessor` after successful insertion into the history table.
* Map `version` to `agreement_ver` and `bright_g` to `bright_g`.
## Tasks
- [ ] Update `docs/room_status_moment.sql` with new columns and index.
- [ ] Update `docs/plan-room-status-sync.md` with new fields and finalized plan.
- [ ] Implement `upsertRoomStatus` in `DatabaseManager`.
- [ ] Integrate into `HeartbeatProcessor`.

View File

@@ -609,6 +609,183 @@ class DatabaseManager {
} }
} }
// 同步更新 room_status.room_status_moment 表
// 使用 INSERT ... ON CONFLICT ... DO UPDATE 实现 upsert
async upsertRoomStatus(events) {
if (!Array.isArray(events)) {
events = [events];
}
if (events.length === 0) return { insertedCount: 0, updatedCount: 0 };
// 批次内去重:按 (hotel_id, room_id, device_id) 分组,只保留 ts_ms 最大的一条
// 原因PostgreSQL ON CONFLICT 不允许同一语句中多次更新同一行
const uniqueEventsMap = new Map();
for (const e of events) {
if (!e.hotel_id || !e.room_id || !e.device_id) continue;
const key = `${e.hotel_id}_${e.room_id}_${e.device_id}`;
const existing = uniqueEventsMap.get(key);
// 如果没有记录,或者当前记录时间更新,则覆盖
if (!existing || (BigInt(e.ts_ms || 0) > BigInt(existing.ts_ms || 0))) {
uniqueEventsMap.set(key, e);
}
}
const uniqueEvents = Array.from(uniqueEventsMap.values());
if (uniqueEvents.length === 0) return { insertedCount: 0, updatedCount: 0 };
// 字段映射:心跳字段 -> room_status 字段
// 注意:只更新心跳包里有的字段
const columns = [
'ts_ms',
'hotel_id',
'room_id',
'device_id',
'ip',
'pms_status',
'power_state',
'cardless_state',
'service_mask',
'insert_card',
'carbon_state',
'bright_g',
'agreement_ver', // map from version
'air_address',
'air_state',
'air_model',
'air_speed',
'air_set_temp',
'air_now_temp',
'air_solenoid_valve',
'elec_address',
'elec_voltage',
'elec_ampere',
'elec_power',
'elec_phase',
'elec_energy',
'elec_sum_energy',
];
const toRowValues = (e) => [
e.ts_ms,
e.hotel_id,
e.room_id,
e.device_id,
e.ip,
e.pms_state, // pms_status
e.power_state,
e.cardless_state,
e.service_mask,
e.insert_card ?? null,
e.carbon_state,
e.bright_g === -1 ? null : (e.bright_g ?? null),
e.version ?? null, // agreement_ver
Array.isArray(e.air_address) ? e.air_address : null,
Array.isArray(e.state) ? e.state : null, // air_state
Array.isArray(e.model) ? e.model : null, // air_model
Array.isArray(e.speed) ? e.speed : null, // air_speed
Array.isArray(e.set_temp) ? e.set_temp : null, // air_set_temp
Array.isArray(e.now_temp) ? e.now_temp : null, // air_now_temp
Array.isArray(e.solenoid_valve) ? e.solenoid_valve : null, // air_solenoid_valve
Array.isArray(e.elec_address) ? e.elec_address : null,
Array.isArray(e.voltage) ? e.voltage : null, // elec_voltage
Array.isArray(e.ampere) ? e.ampere : null, // elec_ampere
Array.isArray(e.power) ? e.power : null, // elec_power
Array.isArray(e.phase) ? e.phase : null, // elec_phase
Array.isArray(e.energy) ? e.energy : null, // elec_energy
Array.isArray(e.sum_energy) ? e.sum_energy : null, // elec_sum_energy
];
// 构建 UPDATE SET 子句(排除主键和 guid
// 使用 EXCLUDED.col 引用新值
// 使用 IS DISTINCT FROM 避免无意义更新
const updateColumns = columns.filter(c => !['hotel_id', 'room_id', 'device_id'].includes(c));
const updateSet = updateColumns.map(col => `${col} = EXCLUDED.${col}`).join(', ');
// 构建 WHERE 子句:仅当至少一个字段发生变化,且时间戳未回退时才更新
// 注意room_status.room_status_moment.ts_ms 是 bigintEXCLUDED.ts_ms 也是 bigint
const whereConditions = updateColumns.map(col => `room_status.room_status_moment.${col} IS DISTINCT FROM EXCLUDED.${col}`).join(' OR ');
// 生成批量插入 SQL
// 注意ON CONFLICT (hotel_id, room_id, device_id) 依赖于唯一索引 idx_room_status_unique_device
const values = [];
const placeholders = uniqueEvents.map((e, idx) => {
const rowVals = toRowValues(e);
values.push(...rowVals);
// 额外插入 gen_random_uuid() 作为 guid
const p = rowVals.map((_, i) => `$${idx * rowVals.length + i + 1}`).join(', ');
return `(${p}, gen_random_uuid())`;
}).join(', ');
const allCols = [...columns, 'guid'].join(', ');
const sql = `
INSERT INTO room_status.room_status_moment (${allCols})
VALUES ${placeholders}
ON CONFLICT (hotel_id, room_id, device_id)
DO UPDATE SET
${updateSet}
WHERE
room_status.room_status_moment.ts_ms <= EXCLUDED.ts_ms
AND (${whereConditions})
`;
try {
const res = await this.pool.query(sql, values);
return { rowCount: res.rowCount }; // 包括插入和更新的行数
} catch (error) {
if (this.isRoomStatusMissingPartitionError(error)) {
const hotelIds = [...new Set(uniqueEvents.map(e => e.hotel_id).filter(id => id != null))];
if (hotelIds.length > 0) {
console.log(`[db] 检测到 room_status 分区缺失尝试自动创建分区hotelIds: ${hotelIds.join(', ')}`);
await this.ensureRoomStatusPartitions(hotelIds);
try {
const res = await this.pool.query(sql, values);
return { rowCount: res.rowCount };
} catch (retryError) {
console.warn('[db] upsertRoomStatus retry failed:', retryError.message);
return { error: retryError };
}
}
}
// 不抛出错误只记录日志避免影响主流程Heartbeat History 写入已成功)
console.warn('[db] upsertRoomStatus failed:', error.message);
return { error };
}
}
isRoomStatusMissingPartitionError(error) {
const msg = String(error?.message ?? '');
// 错误码 23514 (check_violation) 通常在插入分区表且无对应分区时触发
// 或者直接匹配错误信息 "no partition of relation"
return msg.includes('no partition of relation') && msg.includes('room_status_moment');
}
async ensureRoomStatusPartitions(hotelIds) {
for (const hotelId of hotelIds) {
await this.createRoomStatusPartition(hotelId);
}
}
async createRoomStatusPartition(hotelId) {
// 简单的整数合法性检查
if (!Number.isInteger(Number(hotelId))) return;
const tableName = `room_status_moment_h${hotelId}`;
const sql = `
CREATE TABLE IF NOT EXISTS room_status.${tableName}
PARTITION OF room_status.room_status_moment
FOR VALUES IN (${hotelId});
`;
try {
await this.pool.query(sql);
console.log(`[db] 成功创建 room_status 分区: ${tableName}`);
} catch (err) {
// 并发创建时可能会报错,如果已存在则忽略
if (String(err?.message).includes('already exists')) return;
console.error(`[db] 创建 room_status 分区失败 ${tableName}:`, err);
}
}
async getLatestHeartbeat(componentId) { async getLatestHeartbeat(componentId) {
try { try {
const query = { const query = {

View File

@@ -214,6 +214,22 @@ class HeartbeatProcessor {
const result = await this.databaseManager.insertHeartbeatEvents(batchData); const result = await this.databaseManager.insertHeartbeatEvents(batchData);
insertedCount = Number(result?.insertedCount ?? result ?? 0); insertedCount = Number(result?.insertedCount ?? result ?? 0);
failedRecords = Array.isArray(result?.failedRecords) ? result.failedRecords : []; failedRecords = Array.isArray(result?.failedRecords) ? result.failedRecords : [];
// 同步到 room_status 表 (Best Effort)
// 只有当历史表写入成功insertedCount > 0才尝试同步
// 过滤掉写入失败的记录(如果有)
if (insertedCount > 0) {
const successData = failedRecords.length > 0
? batchData.filter(d => !failedRecords.some(f => f.record === d))
: batchData;
if (successData.length > 0) {
this.databaseManager.upsertRoomStatus(successData).catch(err => {
console.warn('异步同步 room_status 失败 (忽略):', err);
});
}
}
} else { } else {
const result = await this.databaseManager.insertHeartbeatData(batchData); const result = await this.databaseManager.insertHeartbeatData(batchData);
insertedCount = Number(result?.insertedCount ?? result ?? 0); insertedCount = Number(result?.insertedCount ?? result ?? 0);