feat: 处理整数溢出并持久化无法处理的数据

- 更新 heartbeatProcessor.js 以处理所有数字溢出类型(int16、int32、int64)并使用二进制补码。
- 防止仅与数据相关的 PostgreSQL 失败抛出个别回退错误。
- 在 databaseManager.js 中添加 insertHeartbeatEventsErrors 方法以存储被拒绝的记录。
- 更新 heartbeatProcessor.js 中的 _emitRejectedRecord 方法,直接将所有无法处理的心跳数据写入 heartbeat_events_errors 数据库。
- 更新 openspec 规范以支持新的溢出和验证回退状态。
- 添加测试文件以验证大整数处理。
This commit is contained in:
2026-03-02 10:49:02 +08:00
parent 58c3880732
commit d0c4940e01
7 changed files with 281 additions and 103 deletions

View File

@@ -22,18 +22,18 @@ class DatabaseManager {
idleTimeoutMillis: this.config.idleTimeoutMillis,
connectionTimeoutMillis: 5000, // 5秒连接超时防止断网时无限等待
});
// 监听连接池错误,防止后端断开导致进程崩溃
this.pool.on('error', (err, client) => {
console.error('[db] 发生未捕获的连接池错误:', err);
// 不抛出,让应用层通过心跳检测发现问题
});
// 测试连接
const client = await this.pool.connect();
client.release();
console.log('数据库连接池创建成功');
// 初始化表结构
await this.initTables();
@@ -570,15 +570,55 @@ class DatabaseManager {
await this.pool.query(singleSql, toRowValues(event));
insertedCount += 1;
} catch (error) {
const connCodes = ['57P03', '08006', '08001', '08003', '08004', '08007'];
if (connCodes.includes(error?.code) || (error?.message && /ECONNREFUSED|ETIMEDOUT|connection/i.test(error.message))) {
throw error;
}
failedRecords.push({ error, record: event });
}
}
if (insertedCount === 0 && failedRecords.length === events.length) {
throw lastError;
// 只有在确定是纯网络断开时(所有单独重试都触发同一级别的连接错误),才向外抛出网络错误以重试批次。
const connCodes = ['57P03', '08006', '08001', '08003', '08004', '08007'];
const isConnError = connCodes.includes(lastError?.code) ||
(lastError?.message && /ECONNREFUSED|ETIMEDOUT|connection/i.test(lastError.message));
if (isConnError) {
throw lastError;
}
}
return { insertedCount, failedRecords, batchError: lastError };
return { insertedCount, failedRecords, batchError: (insertedCount === 0 && failedRecords.length === events.length) ? lastError : null };
}
async insertHeartbeatEventsErrors(errorsList) {
if (!errorsList || errorsList.length === 0) return;
try {
const placeholders = [];
const values = [];
let i = 1;
for (const err of errorsList) {
let hotel_id = Number(err.hotel_id);
if (!Number.isFinite(hotel_id)) hotel_id = null;
let room_id = Number(err.room_id);
if (!Number.isFinite(room_id)) room_id = null;
let original_data = null;
try { original_data = JSON.stringify(err.original_data); } catch { /* ignore */ }
const error_code = String(err.error_code || '');
const error_message = String(err.error_message || '');
placeholders.push(`($${i}, $${i + 1}, $${i + 2}, $${i + 3}, $${i + 4})`);
values.push(hotel_id, room_id, original_data, error_code, error_message);
i += 5;
}
const sql = `INSERT INTO heartbeat.heartbeat_events_errors (hotel_id, room_id, original_data, error_code, error_message) VALUES ${placeholders.join(', ')}`;
await this.pool.query(sql, values);
} catch (e) {
console.warn('Failed to insert into heartbeat_events_errors:', e);
}
}
async insertHeartbeatData(data) {
@@ -586,11 +626,11 @@ class DatabaseManager {
if (!Array.isArray(data)) {
data = [data];
}
if (data.length === 0) {
return;
}
// 构建批量插入语句
const values = data.map(item => [
item.component_id,
@@ -598,7 +638,7 @@ class DatabaseManager {
item.timestamp,
item.data
]);
const query = {
text: `
INSERT INTO heartbeat (component_id, status, timestamp, data)
@@ -606,7 +646,7 @@ class DatabaseManager {
`,
values: values.flat()
};
const res = await this.pool.query(query);
console.log(`成功插入 ${data.length} 条心跳数据`);
return { insertedCount: Number(res?.rowCount ?? data.length) };
@@ -706,11 +746,11 @@ class DatabaseManager {
// 使用 IS DISTINCT FROM 避免无意义更新
const updateColumns = columns.filter(c => !['hotel_id', 'room_id', 'device_id'].includes(c));
const updateSet = updateColumns.map(col => `${col} = EXCLUDED.${col}`).join(', ');
// 构建 WHERE 子句:仅当至少一个字段发生变化,且时间戳未回退时才更新
// 注意room_status.room_status_moment.ts_ms 是 bigintEXCLUDED.ts_ms 也是 bigint
const whereConditions = updateColumns.map(col => `room_status.room_status_moment.${col} IS DISTINCT FROM EXCLUDED.${col}`).join(' OR ');
// 生成批量插入 SQL
// 注意ON CONFLICT (hotel_id, room_id, device_id) 依赖于唯一索引 idx_room_status_unique_device
const values = [];
@@ -753,7 +793,7 @@ class DatabaseManager {
}
}
}
// 不抛出错误只记录日志避免影响主流程Heartbeat History 写入已成功)
console.warn('[db] upsertRoomStatus failed:', error.message);
return { error };
@@ -804,7 +844,7 @@ class DatabaseManager {
`,
values: [componentId]
};
const result = await this.pool.query(query);
return result.rows[0] || null;
} catch (error) {
@@ -824,7 +864,7 @@ class DatabaseManager {
`,
values: [componentId, startTime, endTime]
};
const result = await this.pool.query(query);
return result.rows;
} catch (error) {

View File

@@ -202,11 +202,16 @@ class HeartbeatProcessor {
this._batchInFlight = true;
let hasMore = false;
let batchData = null;
let batchEventCount = 0;
let batchMessageCount = 0;
let batchMessages = [];
try {
const { batchEventCount, batchMessageCount } = this.computeNextBatchWindow();
const window = this.computeNextBatchWindow();
batchEventCount = window.batchEventCount;
batchMessageCount = window.batchMessageCount;
batchData = this.batchQueue.slice(0, batchEventCount);
const batchMessages = this.batchMessageQueue.slice(0, batchMessageCount);
batchMessages = this.batchMessageQueue.slice(0, batchMessageCount);
let insertedCount = 0;
let failedRecords = [];
@@ -219,10 +224,10 @@ class HeartbeatProcessor {
// 只有当历史表写入成功insertedCount > 0才尝试同步
// 过滤掉写入失败的记录(如果有)
if (insertedCount > 0) {
const successData = failedRecords.length > 0
const successData = failedRecords.length > 0
? batchData.filter(d => !failedRecords.some(f => f.record === d))
: batchData;
if (successData.length > 0) {
this.databaseManager.upsertRoomStatus(successData).catch(err => {
console.warn('异步同步 room_status 失败 (忽略):', err);
@@ -243,13 +248,12 @@ class HeartbeatProcessor {
}
if (failedRecords.length > 0) {
for (const item of failedRecords) {
this._emitRejectedRecord({
errorId: 'db_write_failed',
error: item?.error,
rawData: item?.record,
});
}
const rejects = failedRecords.map(item => ({
errorId: 'db_write_failed',
error: item?.error,
rawData: item?.record,
}));
this._emitRejectedRecords(rejects);
this.stats?.incDbWriteFailed?.(failedRecords.length);
}
this.stats?.incDbWritten?.(insertedCount);
@@ -353,28 +357,60 @@ class HeartbeatProcessor {
}
_emitRejectedRecord({ errorId, error, rawData }) {
if (!this.redis?.isEnabled?.()) return;
const ts = formatTimestamp(new Date());
const errMsg = error ? String(error?.stack ?? error?.message ?? error) : undefined;
const payload = this._safeStringify({
errorId,
error: errMsg,
rawData,
});
const base = `[ERROR] ${ts} ${errorId}: `;
const maxChunkChars = 50_000;
if (payload.length <= maxChunkChars) {
this.redis.pushConsoleLog?.({ level: 'warn', message: `${base}${payload}`, metadata: { module: 'processor' } });
return;
}
const parts = Math.ceil(payload.length / maxChunkChars);
for (let i = 0; i < parts; i += 1) {
const chunk = payload.slice(i * maxChunkChars, (i + 1) * maxChunkChars);
this.redis.pushConsoleLog?.({
level: 'warn',
message: `${base}(part ${i + 1}/${parts}) ${chunk}`,
metadata: { module: 'processor' },
this._emitRejectedRecords([{ errorId, error, rawData }]);
}
_emitRejectedRecords(records) {
if (!records || records.length === 0) return;
if (typeof this.databaseManager?.insertHeartbeatEventsErrors === 'function') {
const dbPayload = records.map(r => {
let hotel_id = null;
let room_id = null;
const effective = r.rawData?.effective || r.rawData?.record || r.rawData;
if (effective && typeof effective === 'object') {
hotel_id = effective.hotel_id ?? effective.hotelId;
room_id = effective.room_id ?? effective.roomId;
if (typeof room_id === 'string' && /^-?\d+$/.test(room_id)) room_id = Number(room_id);
}
return {
hotel_id,
room_id,
original_data: r.rawData,
error_code: String(r.errorId),
error_message: r.error ? String(r.error?.message ?? r.error) : 'Rejected record'
};
});
this.databaseManager.insertHeartbeatEventsErrors(dbPayload).catch(() => { });
}
if (!this.redis?.isEnabled?.()) return;
// Batch log to redis (at most first 10 to avoid noise)
const logs = records.slice(0, 10);
for (const { errorId, error, rawData } of logs) {
const ts = formatTimestamp(new Date());
const errMsg = error ? String(error?.stack ?? error?.message ?? error) : undefined;
const payload = this._safeStringify({
errorId,
error: errMsg,
rawData,
});
const base = `[ERROR] ${ts} ${errorId}: `;
const maxChunkChars = 50_000;
if (payload.length <= maxChunkChars) {
this.redis.pushConsoleLog?.({ level: 'warn', message: `${base}${payload}`, metadata: { module: 'processor' } });
continue;
}
const parts = Math.ceil(payload.length / maxChunkChars);
for (let i = 0; i < parts; i += 1) {
const chunk = payload.slice(i * maxChunkChars, (i + 1) * maxChunkChars);
this.redis.pushConsoleLog?.({
level: 'warn',
message: `${base}(part ${i + 1}/${parts}) ${chunk}`,
metadata: { module: 'processor' },
});
}
}
}
@@ -607,50 +643,68 @@ class HeartbeatProcessor {
return s.length === 0 ? undefined : s;
};
const toIntOrUndefined = (v) => {
const toInt32OrUndefined = (v) => {
if (v === undefined || v === null) return v;
if (typeof v === 'number') {
if (!Number.isFinite(v)) return undefined;
return Math.trunc(v);
let val = null;
if (typeof v === 'number' && Number.isFinite(v)) val = Math.trunc(v);
else {
const s = String(v).trim();
if (s.length === 0 || !/^-?\d+$/.test(s)) return undefined;
val = Number(s);
}
const s = String(v).trim();
if (s.length === 0) return undefined;
if (!/^-?\d+$/.test(s)) return undefined;
const n = Number(s);
if (!Number.isFinite(n)) return undefined;
return Math.trunc(n);
if (val === null || !Number.isFinite(val)) return undefined;
return val | 0; // 强制补码映射到 32位有符号并去小数
};
const toInt16OrUndefined = (v) => {
const val = toInt32OrUndefined(v);
if (val === undefined) return undefined;
return (val << 16) >> 16; // 强制补码映射到 16位有符号
};
const toBigintParamOrUndefined = (v) => {
if (v === undefined || v === null) return v;
let s;
if (typeof v === 'number') {
if (!Number.isFinite(v)) return undefined;
const n = Math.trunc(v);
return Number.isSafeInteger(n) ? n : String(n);
// 如果是安全整数范围内,原样返回(作为 number 进库没问题)
if (Number.isSafeInteger(n)) return n;
// 如果超出了上限(比如 uint64 最大值附近的数因精度变为 float转成字符串走下面转换
s = BigInt(n).toString();
} else {
s = String(v).trim();
if (s.length === 0) return undefined;
if (!/^-?\d+$/.test(s)) return undefined;
}
try {
// 把可能超过有符号 64 位整数上限的无符号数(高位置 1 时),
// 转换为对应的有符号负数,以完美适应 PostgreSQL bigint 并保留完整的 bit 掩码。
return BigInt.asIntN(64, BigInt(s)).toString();
} catch (err) {
return s;
}
const s = String(v).trim();
if (s.length === 0) return undefined;
if (!/^-?\d+$/.test(s)) return undefined;
return s;
};
normalized.ts_ms = toBigintParamOrUndefined(normalized.ts_ms);
normalized.hotel_id = toIntOrUndefined(normalized.hotel_id);
normalized.hotel_id = toInt16OrUndefined(normalized.hotel_id);
normalized.room_id = toTrimmedStringOrUndefined(normalized.room_id);
normalized.device_id = toTrimmedStringOrUndefined(normalized.device_id);
normalized.ip = toTrimmedStringOrUndefined(normalized.ip);
normalized.power_state = toIntOrUndefined(normalized.power_state);
normalized.guest_type = toIntOrUndefined(normalized.guest_type);
normalized.cardless_state = toIntOrUndefined(normalized.cardless_state);
normalized.power_state = toInt16OrUndefined(normalized.power_state);
normalized.guest_type = toInt16OrUndefined(normalized.guest_type);
normalized.cardless_state = toInt16OrUndefined(normalized.cardless_state);
normalized.service_mask = toBigintParamOrUndefined(normalized.service_mask);
normalized.pms_state = toIntOrUndefined(normalized.pms_state);
normalized.carbon_state = toIntOrUndefined(normalized.carbon_state);
normalized.device_count = toIntOrUndefined(normalized.device_count);
normalized.comm_seq = toIntOrUndefined(normalized.comm_seq);
normalized.insert_card = toIntOrUndefined(normalized.insert_card);
const bg = toIntOrUndefined(normalized.bright_g);
normalized.pms_state = toInt16OrUndefined(normalized.pms_state);
normalized.carbon_state = toInt16OrUndefined(normalized.carbon_state);
normalized.device_count = toInt16OrUndefined(normalized.device_count);
normalized.comm_seq = toInt32OrUndefined(normalized.comm_seq); // int4
normalized.insert_card = toInt16OrUndefined(normalized.insert_card);
const bg = toInt16OrUndefined(normalized.bright_g);
normalized.bright_g = bg === -1 ? undefined : bg;
normalized.version = toIntOrUndefined(normalized.version);
normalized.version = toInt32OrUndefined(normalized.version);
// 其余未知字段塞进 extra避免丢信息但不覆盖显式 extra
if (!normalized.extra || typeof normalized.extra !== 'object') {
@@ -660,25 +714,25 @@ class HeartbeatProcessor {
for (const [k, v] of Object.entries(obj)) {
if (
[
'ts_ms','tsMs','TsMs','timestampMs','TimestampMs','timestamp','Timestamp','ts','Ts',
'hotel_id','hotelId','HotelId',
'room_id','roomId','RoomId',
'device_id','deviceId','DeviceId','device','Device',
'ip','Ip','IP',
'power_state','powerState','PowerState',
'guest_type','guestType','GuestType',
'cardless_state','cardlessState','CardlessState',
'service_mask','serviceMask','ServiceMask',
'pms_state','pmsState','PmsState',
'carbon_state','carbonState','CarbonState',
'device_count','deviceCount','DeviceCount',
'comm_seq','commSeq','CommSeq',
'insert_card','insertCard','InsertCard',
'bright_g','brightG','BrightG',
'version','Version','ver','Ver',
'extra','Extra',
'electricity','Electricity',
'air_conditioner','airConditioner','AirConditioner'
'ts_ms', 'tsMs', 'TsMs', 'timestampMs', 'TimestampMs', 'timestamp', 'Timestamp', 'ts', 'Ts',
'hotel_id', 'hotelId', 'HotelId',
'room_id', 'roomId', 'RoomId',
'device_id', 'deviceId', 'DeviceId', 'device', 'Device',
'ip', 'Ip', 'IP',
'power_state', 'powerState', 'PowerState',
'guest_type', 'guestType', 'GuestType',
'cardless_state', 'cardlessState', 'CardlessState',
'service_mask', 'serviceMask', 'ServiceMask',
'pms_state', 'pmsState', 'PmsState',
'carbon_state', 'carbonState', 'CarbonState',
'device_count', 'deviceCount', 'DeviceCount',
'comm_seq', 'commSeq', 'CommSeq',
'insert_card', 'insertCard', 'InsertCard',
'bright_g', 'brightG', 'BrightG',
'version', 'Version', 'ver', 'Ver',
'extra', 'Extra',
'electricity', 'Electricity',
'air_conditioner', 'airConditioner', 'AirConditioner'
].includes(k)
) {
continue;
@@ -703,13 +757,20 @@ class HeartbeatProcessor {
const num = Number(s);
return Number.isFinite(num) ? num : null;
};
const toIntOrNull = (v) => {
const toInt16OrNull = (v) => {
if (v === undefined || v === null) return null;
if (typeof v === 'number') return Number.isFinite(v) ? Math.trunc(v) : null;
const s = String(v).trim();
if (!/^-?\d+$/.test(s)) return null;
const num = Number(s);
return Number.isFinite(num) ? Math.trunc(num) : null;
let val = null;
if (typeof v === 'number') {
if (Number.isFinite(v)) val = Math.trunc(v);
} else {
const s = String(v).trim();
if (/^-?\d+$/.test(s)) {
const num = Number(s);
if (Number.isFinite(num)) val = Math.trunc(num);
}
}
if (val === null) return null;
return (val << 16) >> 16;
};
const out = {};
const elec = Array.isArray(n.electricity) ? n.electricity : null;
@@ -725,12 +786,12 @@ class HeartbeatProcessor {
const ac = Array.isArray(n.air_conditioner) ? n.air_conditioner : null;
if (ac && ac.length) {
out.air_address = ac.map((x) => toStrOrNull(x?.address));
out.state = ac.map((x) => toIntOrNull(x?.state));
out.model = ac.map((x) => toIntOrNull(x?.model));
out.speed = ac.map((x) => toIntOrNull(x?.speed));
out.set_temp = ac.map((x) => toIntOrNull(x?.set_temp));
out.now_temp = ac.map((x) => toIntOrNull(x?.now_temp));
out.solenoid_valve = ac.map((x) => toIntOrNull(x?.solenoid_valve));
out.state = ac.map((x) => toInt16OrNull(x?.state));
out.model = ac.map((x) => toInt16OrNull(x?.model));
out.speed = ac.map((x) => toInt16OrNull(x?.speed));
out.set_temp = ac.map((x) => toInt16OrNull(x?.set_temp));
out.now_temp = ac.map((x) => toInt16OrNull(x?.now_temp));
out.solenoid_valve = ac.map((x) => toInt16OrNull(x?.solenoid_valve));
}
return out;
}