diff --git a/openspec/changes/add-g5-independent-write/specs/db/spec.md b/openspec/changes/add-g5-independent-write/specs/db/spec.md index c010006..739c306 100644 --- a/openspec/changes/add-g5-independent-write/specs/db/spec.md +++ b/openspec/changes/add-g5-independent-write/specs/db/spec.md @@ -71,4 +71,15 @@ - **WHEN** 系统收到任意一包属于同一 `(hotel_id, room_id)` 的 G5 room_status 数据 - **THEN** 即使业务字段与现存记录完全一致,也应执行一次 `ON CONFLICT DO UPDATE` - **AND** 不应在 G5 room_status 的冲突更新路径上使用“字段无变化则跳过 UPDATE”的 WHERE 条件 -- **AND** 应依赖数据库触发器自动刷新 `ts_ms` \ No newline at end of file +- **AND** 应依赖数据库触发器自动刷新 `ts_ms` + +#### Scenario: G5 room_status 写入前按冲突键排序并去重 +- **WHEN** 一批 G5 room_status 数据进入写入阶段 +- **THEN** 系统应先按 `(hotel_id, room_id)` 排序 +- **AND** 对同一 `(hotel_id, room_id)` 仅保留 `ts_ms` 最大的一条记录 +- **AND** 去重后的结果再进入 `INSERT ... ON CONFLICT DO UPDATE` + +#### Scenario: G5 room_status 死锁自动重试 +- **WHEN** G5 room_status 的 upsert 遇到 PostgreSQL 死锁错误 `40P01` +- **THEN** 系统应进行短暂退避后自动重试 +- **AND** 重试应仅作用于 G5 room_status 路径 \ No newline at end of file diff --git a/src/db/databaseManager.js b/src/db/databaseManager.js index 448ec73..19d813f 100644 --- a/src/db/databaseManager.js +++ b/src/db/databaseManager.js @@ -130,6 +130,14 @@ class DatabaseManager { return false; } + _isDeadlockError(error) { + return String(error?.code ?? '') === '40P01'; + } + + async _sleep(ms) { + await new Promise((resolve) => setTimeout(resolve, ms)); + } + // ---- 旧表列定义 ---- _getLegacyColumns() { @@ -785,7 +793,7 @@ class DatabaseManager { } _buildRoomStatusUpsertQuery(events, target) { - const { tableName, conflictColumns, includeGuid, tableRef, forceOnlineStatusOnWrite, forceUpdateOnConflict } = target; + const { tableName, conflictColumns, includeGuid, tableRef, forceOnlineStatusOnWrite, forceUpdateOnConflict, sortAndDedupByConflictKey } = target; const columns = this._getRoomStatusBaseColumns(); const uniqueEventsMap = new Map(); @@ -799,7 +807,22 @@ class DatabaseManager { } } - const uniqueEvents = Array.from(uniqueEventsMap.values()); + let uniqueEvents = Array.from(uniqueEventsMap.values()); + if (sortAndDedupByConflictKey) { + uniqueEvents = uniqueEvents.sort((a, b) => { + for (const column of conflictColumns) { + const av = String(a?.[column] ?? ''); + const bv = String(b?.[column] ?? ''); + const cmp = av.localeCompare(bv, 'zh-CN', { numeric: true }); + if (cmp !== 0) return cmp; + } + const ats = BigInt(a?.ts_ms ?? 0); + const bts = BigInt(b?.ts_ms ?? 0); + if (ats === bts) return 0; + return ats < bts ? -1 : 1; + }); + } + if (uniqueEvents.length === 0) { return { sql: null, values: [], uniqueEvents: [] }; } @@ -855,28 +878,44 @@ class DatabaseManager { const { sql, values, uniqueEvents } = this._buildRoomStatusUpsertQuery(events, target); if (!sql || uniqueEvents.length === 0) return { rowCount: 0 }; - try { - const res = await this.pool.query(sql, values); - return { rowCount: res.rowCount }; - } catch (error) { - if (target.autoCreatePartitions && this.isRoomStatusMissingPartitionError(error)) { - const hotelIds = [...new Set(uniqueEvents.map((event) => event.hotel_id).filter((id) => id != null))]; - if (hotelIds.length > 0) { - console.log(`[db] 检测到 room_status 分区缺失,尝试自动创建分区,hotelIds: ${hotelIds.join(', ')}`); - await this.ensureRoomStatusPartitions(hotelIds); - try { - const res = await this.pool.query(sql, values); - return { rowCount: res.rowCount }; - } catch (retryError) { - console.warn(`[db] ${target.logPrefix} retry failed:`, retryError.message); - return { error: retryError }; + const deadlockRetryAttempts = Number(target.deadlockRetryAttempts ?? 0); + const deadlockRetryBaseDelayMs = Number(target.deadlockRetryBaseDelayMs ?? 100); + + for (let attempt = 0; attempt <= deadlockRetryAttempts; attempt += 1) { + try { + const res = await this.pool.query(sql, values); + return { rowCount: res.rowCount }; + } catch (error) { + if (target.autoCreatePartitions && this.isRoomStatusMissingPartitionError(error)) { + const hotelIds = [...new Set(uniqueEvents.map((event) => event.hotel_id).filter((id) => id != null))]; + if (hotelIds.length > 0) { + console.log(`[db] 检测到 room_status 分区缺失,尝试自动创建分区,hotelIds: ${hotelIds.join(', ')}`); + await this.ensureRoomStatusPartitions(hotelIds); + try { + const res = await this.pool.query(sql, values); + return { rowCount: res.rowCount }; + } catch (retryError) { + console.warn(`[db] ${target.logPrefix} retry failed:`, retryError.message); + return { error: retryError }; + } } } - } - console.warn(`[db] ${target.logPrefix} failed:`, error.message); - return { error }; + const shouldRetryDeadlock = this._isDeadlockError(error) && attempt < deadlockRetryAttempts; + if (shouldRetryDeadlock) { + const jitter = Math.floor(Math.random() * Math.max(1, deadlockRetryBaseDelayMs)); + const delay = deadlockRetryBaseDelayMs * (attempt + 1) + jitter; + console.warn(`[db] ${target.logPrefix} deadlock detected, retrying in ${delay}ms (attempt ${attempt + 1}/${deadlockRetryAttempts})`); + await this._sleep(delay); + continue; + } + + console.warn(`[db] ${target.logPrefix} failed:`, error.message); + return { error }; + } } + + return { error: new Error(`${target.logPrefix} failed after deadlock retries`) }; } @@ -892,6 +931,9 @@ class DatabaseManager { tableRef: this.config.roomStatusTable ?? 'room_status.room_status_moment', forceOnlineStatusOnWrite: false, forceUpdateOnConflict: false, + sortAndDedupByConflictKey: false, + deadlockRetryAttempts: 0, + deadlockRetryBaseDelayMs: 100, logPrefix: 'upsertRoomStatus', }); } @@ -905,6 +947,9 @@ class DatabaseManager { tableRef: this.config.roomStatusTable ?? 'room_status.room_status_moment_g5', forceOnlineStatusOnWrite: true, forceUpdateOnConflict: true, + sortAndDedupByConflictKey: true, + deadlockRetryAttempts: 3, + deadlockRetryBaseDelayMs: 100, logPrefix: 'upsertRoomStatusG5', }); } diff --git a/test/dualWrite.test.js b/test/dualWrite.test.js index 898a477..bf90a43 100644 --- a/test/dualWrite.test.js +++ b/test/dualWrite.test.js @@ -625,11 +625,73 @@ describe('DatabaseManager: room_status upsert SQL', () => { tableRef: 'room_status.room_status_moment_g5', forceOnlineStatusOnWrite: true, forceUpdateOnConflict: true, + sortAndDedupByConflictKey: true, logPrefix: 'upsertRoomStatusG5', }); assert.doesNotMatch(built.sql, /room_status\.room_status_moment_g5\.ts_ms <= EXCLUDED\.ts_ms/); assert.doesNotMatch(built.sql, /IS DISTINCT FROM EXCLUDED/); }); + + it('sorts and dedups g5 room_status by conflict key, keeping max ts_ms', () => { + const dm = new DatabaseManager({ host: 'x', port: 5432, user: 'x', password: 'x', database: 'x', roomStatusTable: 'room_status.room_status_moment_g5' }); + const built = dm._buildRoomStatusUpsertQuery([ + { ...buildBasePayload(), hotel_id: 2, room_id: '102', ts_ms: 1000, device_id: 'dev-old-102' }, + { ...buildBasePayload(), hotel_id: 1, room_id: '101', ts_ms: 3000, device_id: 'dev-101' }, + { ...buildBasePayload(), hotel_id: 2, room_id: '102', ts_ms: 5000, device_id: 'dev-new-102' }, + ], { + tableName: 'room_status.room_status_moment_g5', + conflictColumns: ['hotel_id', 'room_id'], + includeGuid: false, + autoCreatePartitions: false, + tableRef: 'room_status.room_status_moment_g5', + forceOnlineStatusOnWrite: true, + forceUpdateOnConflict: true, + sortAndDedupByConflictKey: true, + logPrefix: 'upsertRoomStatusG5', + }); + + assert.equal(built.uniqueEvents.length, 2); + assert.equal(built.uniqueEvents[0].hotel_id, 1); + assert.equal(built.uniqueEvents[0].room_id, '101'); + assert.equal(built.uniqueEvents[1].hotel_id, 2); + assert.equal(built.uniqueEvents[1].room_id, '102'); + assert.equal(built.uniqueEvents[1].ts_ms, 5000); + assert.equal(built.uniqueEvents[1].device_id, 'dev-new-102'); + }); + + it('retries g5 room_status on deadlock', async () => { + const dm = new DatabaseManager({ host: 'x', port: 5432, user: 'x', password: 'x', database: 'x', roomStatusTable: 'room_status.room_status_moment_g5' }); + let calls = 0; + dm.pool = { + query: async () => { + calls += 1; + if (calls < 3) { + const err = new Error('deadlock detected'); + err.code = '40P01'; + throw err; + } + return { rowCount: 1 }; + }, + }; + dm._sleep = async () => {}; + + const result = await dm._upsertRoomStatusToTarget([buildBasePayload()], { + tableName: 'room_status.room_status_moment_g5', + conflictColumns: ['hotel_id', 'room_id'], + includeGuid: false, + autoCreatePartitions: false, + tableRef: 'room_status.room_status_moment_g5', + forceOnlineStatusOnWrite: true, + forceUpdateOnConflict: true, + sortAndDedupByConflictKey: true, + deadlockRetryAttempts: 3, + deadlockRetryBaseDelayMs: 1, + logPrefix: 'upsertRoomStatusG5', + }); + + assert.equal(result.rowCount, 1); + assert.equal(calls, 3); + }); }); describe('DatabaseManager: insertHeartbeatEventsDual', () => {