feat: 添加 G5 room_status 写入的冲突键排序与去重功能,并实现死锁自动重试机制
This commit is contained in:
@@ -72,3 +72,14 @@
|
|||||||
- **THEN** 即使业务字段与现存记录完全一致,也应执行一次 `ON CONFLICT DO UPDATE`
|
- **THEN** 即使业务字段与现存记录完全一致,也应执行一次 `ON CONFLICT DO UPDATE`
|
||||||
- **AND** 不应在 G5 room_status 的冲突更新路径上使用“字段无变化则跳过 UPDATE”的 WHERE 条件
|
- **AND** 不应在 G5 room_status 的冲突更新路径上使用“字段无变化则跳过 UPDATE”的 WHERE 条件
|
||||||
- **AND** 应依赖数据库触发器自动刷新 `ts_ms`
|
- **AND** 应依赖数据库触发器自动刷新 `ts_ms`
|
||||||
|
|
||||||
|
#### Scenario: G5 room_status 写入前按冲突键排序并去重
|
||||||
|
- **WHEN** 一批 G5 room_status 数据进入写入阶段
|
||||||
|
- **THEN** 系统应先按 `(hotel_id, room_id)` 排序
|
||||||
|
- **AND** 对同一 `(hotel_id, room_id)` 仅保留 `ts_ms` 最大的一条记录
|
||||||
|
- **AND** 去重后的结果再进入 `INSERT ... ON CONFLICT DO UPDATE`
|
||||||
|
|
||||||
|
#### Scenario: G5 room_status 死锁自动重试
|
||||||
|
- **WHEN** G5 room_status 的 upsert 遇到 PostgreSQL 死锁错误 `40P01`
|
||||||
|
- **THEN** 系统应进行短暂退避后自动重试
|
||||||
|
- **AND** 重试应仅作用于 G5 room_status 路径
|
||||||
@@ -130,6 +130,14 @@ class DatabaseManager {
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
_isDeadlockError(error) {
|
||||||
|
return String(error?.code ?? '') === '40P01';
|
||||||
|
}
|
||||||
|
|
||||||
|
async _sleep(ms) {
|
||||||
|
await new Promise((resolve) => setTimeout(resolve, ms));
|
||||||
|
}
|
||||||
|
|
||||||
// ---- 旧表列定义 ----
|
// ---- 旧表列定义 ----
|
||||||
|
|
||||||
_getLegacyColumns() {
|
_getLegacyColumns() {
|
||||||
@@ -785,7 +793,7 @@ class DatabaseManager {
|
|||||||
}
|
}
|
||||||
|
|
||||||
_buildRoomStatusUpsertQuery(events, target) {
|
_buildRoomStatusUpsertQuery(events, target) {
|
||||||
const { tableName, conflictColumns, includeGuid, tableRef, forceOnlineStatusOnWrite, forceUpdateOnConflict } = target;
|
const { tableName, conflictColumns, includeGuid, tableRef, forceOnlineStatusOnWrite, forceUpdateOnConflict, sortAndDedupByConflictKey } = target;
|
||||||
const columns = this._getRoomStatusBaseColumns();
|
const columns = this._getRoomStatusBaseColumns();
|
||||||
const uniqueEventsMap = new Map();
|
const uniqueEventsMap = new Map();
|
||||||
|
|
||||||
@@ -799,7 +807,22 @@ class DatabaseManager {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const uniqueEvents = Array.from(uniqueEventsMap.values());
|
let uniqueEvents = Array.from(uniqueEventsMap.values());
|
||||||
|
if (sortAndDedupByConflictKey) {
|
||||||
|
uniqueEvents = uniqueEvents.sort((a, b) => {
|
||||||
|
for (const column of conflictColumns) {
|
||||||
|
const av = String(a?.[column] ?? '');
|
||||||
|
const bv = String(b?.[column] ?? '');
|
||||||
|
const cmp = av.localeCompare(bv, 'zh-CN', { numeric: true });
|
||||||
|
if (cmp !== 0) return cmp;
|
||||||
|
}
|
||||||
|
const ats = BigInt(a?.ts_ms ?? 0);
|
||||||
|
const bts = BigInt(b?.ts_ms ?? 0);
|
||||||
|
if (ats === bts) return 0;
|
||||||
|
return ats < bts ? -1 : 1;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
if (uniqueEvents.length === 0) {
|
if (uniqueEvents.length === 0) {
|
||||||
return { sql: null, values: [], uniqueEvents: [] };
|
return { sql: null, values: [], uniqueEvents: [] };
|
||||||
}
|
}
|
||||||
@@ -855,6 +878,10 @@ class DatabaseManager {
|
|||||||
const { sql, values, uniqueEvents } = this._buildRoomStatusUpsertQuery(events, target);
|
const { sql, values, uniqueEvents } = this._buildRoomStatusUpsertQuery(events, target);
|
||||||
if (!sql || uniqueEvents.length === 0) return { rowCount: 0 };
|
if (!sql || uniqueEvents.length === 0) return { rowCount: 0 };
|
||||||
|
|
||||||
|
const deadlockRetryAttempts = Number(target.deadlockRetryAttempts ?? 0);
|
||||||
|
const deadlockRetryBaseDelayMs = Number(target.deadlockRetryBaseDelayMs ?? 100);
|
||||||
|
|
||||||
|
for (let attempt = 0; attempt <= deadlockRetryAttempts; attempt += 1) {
|
||||||
try {
|
try {
|
||||||
const res = await this.pool.query(sql, values);
|
const res = await this.pool.query(sql, values);
|
||||||
return { rowCount: res.rowCount };
|
return { rowCount: res.rowCount };
|
||||||
@@ -874,11 +901,23 @@ class DatabaseManager {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const shouldRetryDeadlock = this._isDeadlockError(error) && attempt < deadlockRetryAttempts;
|
||||||
|
if (shouldRetryDeadlock) {
|
||||||
|
const jitter = Math.floor(Math.random() * Math.max(1, deadlockRetryBaseDelayMs));
|
||||||
|
const delay = deadlockRetryBaseDelayMs * (attempt + 1) + jitter;
|
||||||
|
console.warn(`[db] ${target.logPrefix} deadlock detected, retrying in ${delay}ms (attempt ${attempt + 1}/${deadlockRetryAttempts})`);
|
||||||
|
await this._sleep(delay);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
console.warn(`[db] ${target.logPrefix} failed:`, error.message);
|
console.warn(`[db] ${target.logPrefix} failed:`, error.message);
|
||||||
return { error };
|
return { error };
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return { error: new Error(`${target.logPrefix} failed after deadlock retries`) };
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
// 同步更新 room_status.room_status_moment 表
|
// 同步更新 room_status.room_status_moment 表
|
||||||
@@ -892,6 +931,9 @@ class DatabaseManager {
|
|||||||
tableRef: this.config.roomStatusTable ?? 'room_status.room_status_moment',
|
tableRef: this.config.roomStatusTable ?? 'room_status.room_status_moment',
|
||||||
forceOnlineStatusOnWrite: false,
|
forceOnlineStatusOnWrite: false,
|
||||||
forceUpdateOnConflict: false,
|
forceUpdateOnConflict: false,
|
||||||
|
sortAndDedupByConflictKey: false,
|
||||||
|
deadlockRetryAttempts: 0,
|
||||||
|
deadlockRetryBaseDelayMs: 100,
|
||||||
logPrefix: 'upsertRoomStatus',
|
logPrefix: 'upsertRoomStatus',
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
@@ -905,6 +947,9 @@ class DatabaseManager {
|
|||||||
tableRef: this.config.roomStatusTable ?? 'room_status.room_status_moment_g5',
|
tableRef: this.config.roomStatusTable ?? 'room_status.room_status_moment_g5',
|
||||||
forceOnlineStatusOnWrite: true,
|
forceOnlineStatusOnWrite: true,
|
||||||
forceUpdateOnConflict: true,
|
forceUpdateOnConflict: true,
|
||||||
|
sortAndDedupByConflictKey: true,
|
||||||
|
deadlockRetryAttempts: 3,
|
||||||
|
deadlockRetryBaseDelayMs: 100,
|
||||||
logPrefix: 'upsertRoomStatusG5',
|
logPrefix: 'upsertRoomStatusG5',
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -625,11 +625,73 @@ describe('DatabaseManager: room_status upsert SQL', () => {
|
|||||||
tableRef: 'room_status.room_status_moment_g5',
|
tableRef: 'room_status.room_status_moment_g5',
|
||||||
forceOnlineStatusOnWrite: true,
|
forceOnlineStatusOnWrite: true,
|
||||||
forceUpdateOnConflict: true,
|
forceUpdateOnConflict: true,
|
||||||
|
sortAndDedupByConflictKey: true,
|
||||||
logPrefix: 'upsertRoomStatusG5',
|
logPrefix: 'upsertRoomStatusG5',
|
||||||
});
|
});
|
||||||
assert.doesNotMatch(built.sql, /room_status\.room_status_moment_g5\.ts_ms <= EXCLUDED\.ts_ms/);
|
assert.doesNotMatch(built.sql, /room_status\.room_status_moment_g5\.ts_ms <= EXCLUDED\.ts_ms/);
|
||||||
assert.doesNotMatch(built.sql, /IS DISTINCT FROM EXCLUDED/);
|
assert.doesNotMatch(built.sql, /IS DISTINCT FROM EXCLUDED/);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it('sorts and dedups g5 room_status by conflict key, keeping max ts_ms', () => {
|
||||||
|
const dm = new DatabaseManager({ host: 'x', port: 5432, user: 'x', password: 'x', database: 'x', roomStatusTable: 'room_status.room_status_moment_g5' });
|
||||||
|
const built = dm._buildRoomStatusUpsertQuery([
|
||||||
|
{ ...buildBasePayload(), hotel_id: 2, room_id: '102', ts_ms: 1000, device_id: 'dev-old-102' },
|
||||||
|
{ ...buildBasePayload(), hotel_id: 1, room_id: '101', ts_ms: 3000, device_id: 'dev-101' },
|
||||||
|
{ ...buildBasePayload(), hotel_id: 2, room_id: '102', ts_ms: 5000, device_id: 'dev-new-102' },
|
||||||
|
], {
|
||||||
|
tableName: 'room_status.room_status_moment_g5',
|
||||||
|
conflictColumns: ['hotel_id', 'room_id'],
|
||||||
|
includeGuid: false,
|
||||||
|
autoCreatePartitions: false,
|
||||||
|
tableRef: 'room_status.room_status_moment_g5',
|
||||||
|
forceOnlineStatusOnWrite: true,
|
||||||
|
forceUpdateOnConflict: true,
|
||||||
|
sortAndDedupByConflictKey: true,
|
||||||
|
logPrefix: 'upsertRoomStatusG5',
|
||||||
|
});
|
||||||
|
|
||||||
|
assert.equal(built.uniqueEvents.length, 2);
|
||||||
|
assert.equal(built.uniqueEvents[0].hotel_id, 1);
|
||||||
|
assert.equal(built.uniqueEvents[0].room_id, '101');
|
||||||
|
assert.equal(built.uniqueEvents[1].hotel_id, 2);
|
||||||
|
assert.equal(built.uniqueEvents[1].room_id, '102');
|
||||||
|
assert.equal(built.uniqueEvents[1].ts_ms, 5000);
|
||||||
|
assert.equal(built.uniqueEvents[1].device_id, 'dev-new-102');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('retries g5 room_status on deadlock', async () => {
|
||||||
|
const dm = new DatabaseManager({ host: 'x', port: 5432, user: 'x', password: 'x', database: 'x', roomStatusTable: 'room_status.room_status_moment_g5' });
|
||||||
|
let calls = 0;
|
||||||
|
dm.pool = {
|
||||||
|
query: async () => {
|
||||||
|
calls += 1;
|
||||||
|
if (calls < 3) {
|
||||||
|
const err = new Error('deadlock detected');
|
||||||
|
err.code = '40P01';
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
|
return { rowCount: 1 };
|
||||||
|
},
|
||||||
|
};
|
||||||
|
dm._sleep = async () => {};
|
||||||
|
|
||||||
|
const result = await dm._upsertRoomStatusToTarget([buildBasePayload()], {
|
||||||
|
tableName: 'room_status.room_status_moment_g5',
|
||||||
|
conflictColumns: ['hotel_id', 'room_id'],
|
||||||
|
includeGuid: false,
|
||||||
|
autoCreatePartitions: false,
|
||||||
|
tableRef: 'room_status.room_status_moment_g5',
|
||||||
|
forceOnlineStatusOnWrite: true,
|
||||||
|
forceUpdateOnConflict: true,
|
||||||
|
sortAndDedupByConflictKey: true,
|
||||||
|
deadlockRetryAttempts: 3,
|
||||||
|
deadlockRetryBaseDelayMs: 1,
|
||||||
|
logPrefix: 'upsertRoomStatusG5',
|
||||||
|
});
|
||||||
|
|
||||||
|
assert.equal(result.rowCount, 1);
|
||||||
|
assert.equal(calls, 3);
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
describe('DatabaseManager: insertHeartbeatEventsDual', () => {
|
describe('DatabaseManager: insertHeartbeatEventsDual', () => {
|
||||||
|
|||||||
Reference in New Issue
Block a user