feat: 添加 G5 独立写入功能

- 新增 G5 数据库连接配置与可关闭的写入开关
- 在现有 legacy/G4 写入成功路径后,追加独立的 G5 写入流程
- G5 使用与 G4 相同的数据结构映射,但不写入 guid,由数据库自生成 int4 guid
- room_status 新增 G5 独立 upsert 写入路径,并保留旧表与 G5 表的独立开关
- 新增 G5 写入统计与启动摘要输出
- 更新 StatsCounters 和 StatsReporter 以支持 G5 统计
- 增加测试覆盖,确保 G5 写入逻辑与 room_status 的独立执行
- 新增 G5 相关数据库表结构 SQL 文件
This commit is contained in:
2026-03-10 16:29:24 +08:00
parent fe76884b27
commit 2f8857f98e
14 changed files with 924 additions and 130 deletions

View File

@@ -54,9 +54,28 @@ export default {
retryDelay: 1000, // 重试延迟
legacyHeartbeatEnabled: true, // 旧明细表写入开关
g4HotHeartbeatEnabled: false, // 新明细表(g4_hot)写入开关
g5HeartbeatEnabled: (env.DB_G5_HEARTBEAT_ENABLED ?? 'false') === 'true', // 临时 G5 库写入开关
roomStatusEnabled: true, // room_status 写入开关
legacyTable: 'heartbeat.heartbeat_events',
g4HotTable: 'heartbeat.heartbeat_events_g4_hot',
roomStatusTable: env.DB_ROOM_STATUS_TABLE ?? 'room_status.room_status_moment',
},
g5db: {
enabled: (env.DB_G5_HEARTBEAT_ENABLED ?? 'false') === 'true',
host: env.POSTGRES_HOST_G5 ?? '10.8.8.80',
port: Number(env.POSTGRES_PORT_G5 ?? 5434),
user: env.POSTGRES_USER_G5 ?? 'log_admin',
password: env.POSTGRES_PASSWORD_G5 ?? 'YourActualStrongPasswordForG5!',
database: env.POSTGRES_DATABASE_G5 ?? 'log_platform',
maxConnections: Number(env.POSTGRES_MAX_CONNECTIONS_G5 ?? 1),
idleTimeoutMillis: Number(env.POSTGRES_IDLE_TIMEOUT_MS_G5 ?? 30000),
retryAttempts: Number(env.DB_G5_RETRY_ATTEMPTS ?? 3),
retryDelay: Number(env.DB_G5_RETRY_DELAY ?? 1000),
g5HeartbeatEnabled: (env.DB_G5_HEARTBEAT_ENABLED ?? 'false') === 'true',
g5Table: env.DB_G5_TABLE ?? 'heartbeat.heartbeat_events_g5',
roomStatusEnabled: (env.DB_G5_ROOM_STATUS_ENABLED ?? env.DB_G5_HEARTBEAT_ENABLED ?? 'false') === 'true',
roomStatusTable: env.DB_G5_ROOM_STATUS_TABLE ?? 'room_status.room_status_moment_g5',
},
// 日志配置

View File

@@ -184,6 +184,14 @@ class DatabaseManager {
// ---- 新表 G4 Hot 列定义 ----
_getG4HotColumns() {
return this._getExpandedHotColumns({ includeGuid: true });
}
_getG5Columns() {
return this._getExpandedHotColumns({ includeGuid: false });
}
_getExpandedHotColumns({ includeGuid }) {
const base = [
'ts_ms', 'write_ts_ms', 'hotel_id', 'room_id', 'device_id', 'ip',
'power_state', 'guest_type', 'cardless_state', 'service_mask',
@@ -213,8 +221,9 @@ class DatabaseManager {
'sum_energy_1', 'sum_energy_2', 'sum_energy_residual',
];
const power = ['power_carbon_on', 'power_carbon_off', 'power_person_exist', 'power_person_left'];
const tail = ['guid'];
return [...base, ...svc, ...airUnpacked, ...elecUnpacked, ...power, ...tail];
return includeGuid
? [...base, ...svc, ...airUnpacked, ...elecUnpacked, ...power, 'guid']
: [...base, ...svc, ...airUnpacked, ...elecUnpacked, ...power];
}
_unpackArrElement(arr, idx) {
@@ -228,6 +237,14 @@ class DatabaseManager {
}
_g4HotToRowValues(e) {
return this._expandedHotToRowValues(e, { includeGuid: true, nullifyArrayColumns: false, nullifyG5BaseColumns: false });
}
_g5ToRowValues(e) {
return this._expandedHotToRowValues(e, { includeGuid: false, nullifyArrayColumns: true, nullifyG5BaseColumns: true });
}
_expandedHotToRowValues(e, { includeGuid, nullifyArrayColumns, nullifyG5BaseColumns }) {
const values = [
e.ts_ms,
e.write_ts_ms ?? Date.now(),
@@ -238,7 +255,7 @@ class DatabaseManager {
e.power_state,
e.guest_type,
e.cardless_state,
e.service_mask,
nullifyG5BaseColumns ? null : e.service_mask,
e.pms_state,
e.carbon_state,
e.device_count,
@@ -246,21 +263,21 @@ class DatabaseManager {
e.insert_card ?? null,
(e.bright_g === -1 || e.bright_g === '-1') ? null : (e.bright_g ?? null),
e.version ?? null,
Array.isArray(e.elec_address) ? e.elec_address : null,
Array.isArray(e.air_address) ? e.air_address : null,
Array.isArray(e.voltage) ? e.voltage : null,
Array.isArray(e.ampere) ? e.ampere : null,
Array.isArray(e.power) ? e.power : null,
Array.isArray(e.phase) ? e.phase : null,
Array.isArray(e.energy) ? e.energy : null,
Array.isArray(e.sum_energy) ? e.sum_energy : null,
Array.isArray(e.state) ? e.state : null,
Array.isArray(e.model) ? e.model : null,
Array.isArray(e.speed) ? e.speed : null,
Array.isArray(e.set_temp) ? e.set_temp : null,
Array.isArray(e.now_temp) ? e.now_temp : null,
Array.isArray(e.solenoid_valve) ? e.solenoid_valve : null,
e.extra ?? null,
nullifyArrayColumns ? null : (Array.isArray(e.elec_address) ? e.elec_address : null),
nullifyArrayColumns ? null : (Array.isArray(e.air_address) ? e.air_address : null),
nullifyArrayColumns ? null : (Array.isArray(e.voltage) ? e.voltage : null),
nullifyArrayColumns ? null : (Array.isArray(e.ampere) ? e.ampere : null),
nullifyArrayColumns ? null : (Array.isArray(e.power) ? e.power : null),
nullifyArrayColumns ? null : (Array.isArray(e.phase) ? e.phase : null),
nullifyArrayColumns ? null : (Array.isArray(e.energy) ? e.energy : null),
nullifyArrayColumns ? null : (Array.isArray(e.sum_energy) ? e.sum_energy : null),
nullifyArrayColumns ? null : (Array.isArray(e.state) ? e.state : null),
nullifyArrayColumns ? null : (Array.isArray(e.model) ? e.model : null),
nullifyArrayColumns ? null : (Array.isArray(e.speed) ? e.speed : null),
nullifyArrayColumns ? null : (Array.isArray(e.set_temp) ? e.set_temp : null),
nullifyArrayColumns ? null : (Array.isArray(e.now_temp) ? e.now_temp : null),
nullifyArrayColumns ? null : (Array.isArray(e.solenoid_valve) ? e.solenoid_valve : null),
nullifyG5BaseColumns ? null : (e.extra ?? null),
];
// svc_01 .. svc_64 布尔展开
@@ -305,7 +322,9 @@ class DatabaseManager {
values.push(null);
values.push(null);
values.push(this._normalizeGuid(e.guid));
if (includeGuid) {
values.push(this._normalizeGuid(e.guid));
}
return values;
}
@@ -456,6 +475,24 @@ class DatabaseManager {
};
}
async insertHeartbeatEventsG5(events) {
if (!Array.isArray(events)) events = [events];
if (events.length === 0) {
return { enabled: false, success: true, insertedCount: 0, failedRecords: [], error: null, isConnectionError: false, batchError: null };
}
const result = await this._insertEventsToTarget(events, {
tableName: this.config.g5Table ?? 'heartbeat.heartbeat_events_g5',
columns: this._getG5Columns(),
toRowValues: (e) => this._g5ToRowValues(e),
ensurePartitions: false,
logPrefix: '[g5]',
missingPartitionTable: null,
});
return { ...result, enabled: true };
}
// v2 明细表写入(向后兼容封装,仅旧表,抛出连接错误)
async insertHeartbeatEvents(events) {
if (!Array.isArray(events)) events = [events];
@@ -683,34 +720,8 @@ class DatabaseManager {
}
}
// 同步更新 room_status.room_status_moment 表
// 使用 INSERT ... ON CONFLICT ... DO UPDATE 实现 upsert
async upsertRoomStatus(events) {
if (!Array.isArray(events)) {
events = [events];
}
if (events.length === 0) return { insertedCount: 0, updatedCount: 0 };
// 批次内去重:按 (hotel_id, room_id, device_id) 分组,只保留 ts_ms 最大的一条
// 原因PostgreSQL ON CONFLICT 不允许同一语句中多次更新同一行
const uniqueEventsMap = new Map();
for (const e of events) {
if (!e.hotel_id || !e.room_id || !e.device_id) continue;
const key = `${e.hotel_id}_${e.room_id}_${e.device_id}`;
const existing = uniqueEventsMap.get(key);
// 如果没有记录,或者当前记录时间更新,则覆盖
if (!existing || (BigInt(e.ts_ms || 0) > BigInt(existing.ts_ms || 0))) {
uniqueEventsMap.set(key, e);
}
}
const uniqueEvents = Array.from(uniqueEventsMap.values());
if (uniqueEvents.length === 0) return { insertedCount: 0, updatedCount: 0 };
// 字段映射:心跳字段 -> room_status 字段
// 注意:只更新心跳包里有的字段
const columns = [
_getRoomStatusBaseColumns() {
return [
'ts_ms',
'hotel_id',
'room_id',
@@ -723,7 +734,7 @@ class DatabaseManager {
'insert_card',
'carbon_state',
'bright_g',
'agreement_ver', // map from version
'agreement_ver',
'air_address',
'air_state',
'air_model',
@@ -739,77 +750,103 @@ class DatabaseManager {
'elec_energy',
'elec_sum_energy',
];
}
const toRowValues = (e) => [
e.ts_ms,
e.hotel_id,
e.room_id,
e.device_id,
e.ip,
e.pms_state, // pms_status
e.power_state,
e.cardless_state,
e.service_mask,
e.insert_card ?? null,
e.carbon_state,
e.bright_g === -1 ? null : (e.bright_g ?? null),
e.version ?? null, // agreement_ver
Array.isArray(e.air_address) ? e.air_address : null,
Array.isArray(e.state) ? e.state : null, // air_state
Array.isArray(e.model) ? e.model : null, // air_model
Array.isArray(e.speed) ? e.speed : null, // air_speed
Array.isArray(e.set_temp) ? e.set_temp : null, // air_set_temp
Array.isArray(e.now_temp) ? e.now_temp : null, // air_now_temp
Array.isArray(e.solenoid_valve) ? e.solenoid_valve : null, // air_solenoid_valve
Array.isArray(e.elec_address) ? e.elec_address : null,
Array.isArray(e.voltage) ? e.voltage : null, // elec_voltage
Array.isArray(e.ampere) ? e.ampere : null, // elec_ampere
Array.isArray(e.power) ? e.power : null, // elec_power
Array.isArray(e.phase) ? e.phase : null, // elec_phase
Array.isArray(e.energy) ? e.energy : null, // elec_energy
Array.isArray(e.sum_energy) ? e.sum_energy : null, // elec_sum_energy
_roomStatusToRowValues(event) {
return [
event.ts_ms,
event.hotel_id,
event.room_id,
event.device_id,
event.ip,
event.pms_state,
event.power_state,
event.cardless_state,
event.service_mask,
event.insert_card ?? null,
event.carbon_state,
event.bright_g === -1 ? null : (event.bright_g ?? null),
event.version === null || event.version === undefined ? null : String(event.version),
Array.isArray(event.air_address) ? event.air_address : null,
Array.isArray(event.state) ? event.state : null,
Array.isArray(event.model) ? event.model : null,
Array.isArray(event.speed) ? event.speed : null,
Array.isArray(event.set_temp) ? event.set_temp : null,
Array.isArray(event.now_temp) ? event.now_temp : null,
Array.isArray(event.solenoid_valve) ? event.solenoid_valve : null,
Array.isArray(event.elec_address) ? event.elec_address : null,
Array.isArray(event.voltage) ? event.voltage : null,
Array.isArray(event.ampere) ? event.ampere : null,
Array.isArray(event.power) ? event.power : null,
Array.isArray(event.phase) ? event.phase : null,
Array.isArray(event.energy) ? event.energy : null,
Array.isArray(event.sum_energy) ? event.sum_energy : null,
];
}
// 构建 UPDATE SET 子句(排除主键和 guid
// 使用 EXCLUDED.col 引用新值
// 使用 IS DISTINCT FROM 避免无意义更新
const updateColumns = columns.filter(c => !['hotel_id', 'room_id', 'device_id'].includes(c));
const updateSet = updateColumns.map(col => `${col} = EXCLUDED.${col}`).join(', ');
_buildRoomStatusUpsertQuery(events, target) {
const { tableName, conflictColumns, includeGuid, tableRef } = target;
const columns = this._getRoomStatusBaseColumns();
const uniqueEventsMap = new Map();
// 构建 WHERE 子句:仅当至少一个字段发生变化,且时间戳未回退时才更新
// 注意room_status.room_status_moment.ts_ms 是 bigintEXCLUDED.ts_ms 也是 bigint
const whereConditions = updateColumns.map(col => `room_status.room_status_moment.${col} IS DISTINCT FROM EXCLUDED.${col}`).join(' OR ');
for (const event of events) {
const keyValues = conflictColumns.map((column) => event?.[column]);
if (keyValues.some((value) => value === undefined || value === null || value === '')) continue;
const key = keyValues.join('_');
const existing = uniqueEventsMap.get(key);
if (!existing || BigInt(event.ts_ms || 0) > BigInt(existing.ts_ms || 0)) {
uniqueEventsMap.set(key, event);
}
}
const uniqueEvents = Array.from(uniqueEventsMap.values());
if (uniqueEvents.length === 0) {
return { sql: null, values: [], uniqueEvents: [] };
}
const allColumns = includeGuid ? [...columns, 'guid'] : [...columns];
const updateColumns = columns.filter((column) => !['ts_ms', ...conflictColumns].includes(column));
const updateSet = updateColumns.map((column) => `${column} = EXCLUDED.${column}`).join(', ');
const whereConditions = updateColumns.map((column) => `${tableRef}.${column} IS DISTINCT FROM EXCLUDED.${column}`).join(' OR ');
// 生成批量插入 SQL
// 注意ON CONFLICT (hotel_id, room_id, device_id) 依赖于唯一索引 idx_room_status_unique_device
const values = [];
const placeholders = uniqueEvents.map((e, idx) => {
const rowVals = toRowValues(e);
values.push(...rowVals);
// 额外插入 gen_random_uuid() 作为 guid
const p = rowVals.map((_, i) => `$${idx * rowVals.length + i + 1}`).join(', ');
return `(${p}, gen_random_uuid())`;
const placeholders = uniqueEvents.map((event, eventIndex) => {
const rowValues = this._roomStatusToRowValues(event);
values.push(...rowValues);
const start = eventIndex * rowValues.length;
const params = rowValues.map((_, valueIndex) => `$${start + valueIndex + 1}`).join(', ');
return includeGuid ? `(${params}, gen_random_uuid())` : `(${params})`;
}).join(', ');
const allCols = [...columns, 'guid'].join(', ');
const sql = `
INSERT INTO room_status.room_status_moment (${allCols})
INSERT INTO ${tableName} (${allColumns.join(', ')})
VALUES ${placeholders}
ON CONFLICT (hotel_id, room_id, device_id)
ON CONFLICT (${conflictColumns.join(', ')})
DO UPDATE SET
${updateSet}
WHERE
room_status.room_status_moment.ts_ms <= EXCLUDED.ts_ms
WHERE
${tableRef}.ts_ms <= EXCLUDED.ts_ms
AND (${whereConditions})
`;
return { sql, values, uniqueEvents };
}
async _upsertRoomStatusToTarget(events, target) {
if (!Array.isArray(events)) {
events = [events];
}
if (events.length === 0) return { rowCount: 0 };
const { sql, values, uniqueEvents } = this._buildRoomStatusUpsertQuery(events, target);
if (!sql || uniqueEvents.length === 0) return { rowCount: 0 };
try {
const res = await this.pool.query(sql, values);
return { rowCount: res.rowCount }; // 包括插入和更新的行数
return { rowCount: res.rowCount };
} catch (error) {
if (this.isRoomStatusMissingPartitionError(error)) {
const hotelIds = [...new Set(uniqueEvents.map(e => e.hotel_id).filter(id => id != null))];
if (target.autoCreatePartitions && this.isRoomStatusMissingPartitionError(error)) {
const hotelIds = [...new Set(uniqueEvents.map((event) => event.hotel_id).filter((id) => id != null))];
if (hotelIds.length > 0) {
console.log(`[db] 检测到 room_status 分区缺失尝试自动创建分区hotelIds: ${hotelIds.join(', ')}`);
await this.ensureRoomStatusPartitions(hotelIds);
@@ -817,18 +854,43 @@ class DatabaseManager {
const res = await this.pool.query(sql, values);
return { rowCount: res.rowCount };
} catch (retryError) {
console.warn('[db] upsertRoomStatus retry failed:', retryError.message);
console.warn(`[db] ${target.logPrefix} retry failed:`, retryError.message);
return { error: retryError };
}
}
}
// 不抛出错误只记录日志避免影响主流程Heartbeat History 写入已成功)
console.warn('[db] upsertRoomStatus failed:', error.message);
console.warn(`[db] ${target.logPrefix} failed:`, error.message);
return { error };
}
}
// 同步更新 room_status.room_status_moment 表
// 使用 INSERT ... ON CONFLICT ... DO UPDATE 实现 upsert
async upsertRoomStatus(events) {
return this._upsertRoomStatusToTarget(events, {
tableName: this.config.roomStatusTable ?? 'room_status.room_status_moment',
conflictColumns: ['hotel_id', 'room_id', 'device_id'],
includeGuid: true,
autoCreatePartitions: true,
tableRef: this.config.roomStatusTable ?? 'room_status.room_status_moment',
logPrefix: 'upsertRoomStatus',
});
}
async upsertRoomStatusG5(events) {
return this._upsertRoomStatusToTarget(events, {
tableName: this.config.roomStatusTable ?? 'room_status.room_status_moment_g5',
conflictColumns: ['hotel_id', 'room_id'],
includeGuid: false,
autoCreatePartitions: false,
tableRef: this.config.roomStatusTable ?? 'room_status.room_status_moment_g5',
logPrefix: 'upsertRoomStatusG5',
});
}
isRoomStatusMissingPartitionError(error) {
const msg = String(error?.message ?? '');
// 错误码 23514 (check_violation) 通常在插入分区表且无对应分区时触发

View File

@@ -12,6 +12,7 @@ class WebBLSHeartbeatServer {
this.kafkaConsumer = null;
this.heartbeatProcessor = null;
this.databaseManager = null;
this.g5DatabaseManager = null;
this.redis = null;
this.consumers = null;
this.stats = new StatsCounters();
@@ -33,12 +34,31 @@ class WebBLSHeartbeatServer {
console.log('数据库连接成功');
await this.redis?.info('数据库连接成功', { module: 'db' });
if (this.config.g5db?.enabled) {
try {
this.g5DatabaseManager = new DatabaseManager({ ...this.config.g5db, maxConnections: 1 });
await this.g5DatabaseManager.connect();
console.log('G5数据库连接成功');
await this.redis?.info('G5数据库连接成功', { module: 'db', table: this.config.g5db?.g5Table });
} catch (error) {
this.g5DatabaseManager = null;
console.warn('G5数据库连接失败已跳过 G5 写入:', error);
await this.redis?.warn('G5数据库连接失败已跳过 G5 写入', {
module: 'db',
table: this.config.g5db?.g5Table,
error: String(error?.message ?? error),
});
}
}
// 打印双写配置摘要
const dbCfg = this.config.db;
const dualWriteSummary = {
legacyHeartbeat: dbCfg.legacyHeartbeatEnabled ? `ON → ${dbCfg.legacyTable}` : 'OFF',
g4HotHeartbeat: dbCfg.g4HotHeartbeatEnabled ? `ON → ${dbCfg.g4HotTable}` : 'OFF',
roomStatus: dbCfg.roomStatusEnabled !== false ? 'ON' : 'OFF',
g5Heartbeat: this.config.g5db?.enabled ? `ON → ${this.config.g5db?.g5Table}` : 'OFF',
roomStatus: dbCfg.roomStatusEnabled !== false ? `ON → ${dbCfg.roomStatusTable}` : 'OFF',
g5RoomStatus: this.config.g5db?.roomStatusEnabled ? `ON → ${this.config.g5db?.roomStatusTable}` : 'OFF',
};
console.log('双写配置摘要:', dualWriteSummary);
await this.redis?.info('双写配置摘要', { module: 'db', ...dualWriteSummary });
@@ -56,6 +76,7 @@ class WebBLSHeartbeatServer {
// 初始化处理器(共享批处理队列)
this.heartbeatProcessor = new HeartbeatProcessor(this.config.processor, this.databaseManager, {
g5DatabaseManager: this.g5DatabaseManager,
redis: this.redis,
stats: this.stats,
onDbOffline: () => {
@@ -114,6 +135,10 @@ class WebBLSHeartbeatServer {
await this.databaseManager.disconnect();
}
if (this.g5DatabaseManager) {
await this.g5DatabaseManager.disconnect();
}
if (this.redis) {
await this.redis.info('BLS心跳接收端已停止', { module: 'app' });
await this.redis.disconnect();

View File

@@ -6,6 +6,7 @@ class HeartbeatProcessor {
constructor(config, databaseManager, deps = {}) {
this.config = config;
this.databaseManager = databaseManager;
this.g5DatabaseManager = deps?.g5DatabaseManager ?? null;
this.redis = deps?.redis ?? null;
this.stats = deps?.stats ?? null;
this.batchQueue = [];
@@ -218,14 +219,7 @@ class HeartbeatProcessor {
const { legacy: legacyResult, g4Hot: g4HotResult } = dualResult;
// B. room_status 始终独立执行(不依赖明细写入结果)
const roomStatusEnabled = this.databaseManager.config?.roomStatusEnabled !== false;
if (roomStatusEnabled && batchData.length > 0) {
this.databaseManager.upsertRoomStatus(batchData).catch(err => {
console.warn('异步同步 room_status 失败 (忽略):', err);
this.stats?.incRoomStatusFailed?.(batchData.length);
});
this.stats?.incRoomStatusWritten?.(batchData.length);
}
this._writeRoomStatus(batchData);
// C. 暂停消费判定(基于当前启用的关键 sink
const shouldPause = this._shouldPauseConsumption(legacyResult, g4HotResult);
@@ -240,14 +234,17 @@ class HeartbeatProcessor {
return;
}
// D. 清理队列、resolve deferreds
// D. G5 临时库独立写入,不参与主链路暂停判定
const g5Result = await this._writeToG5(batchData);
// E. 清理队列、resolve deferreds
this.batchQueue.splice(0, batchEventCount);
this.batchMessageQueue.splice(0, batchMessageCount);
for (const entry of batchMessages) {
entry.deferred.resolve({ insertedCount: entry.eventCount });
}
// E. 统计 & 日志
// F. 统计 & 日志
if (legacyResult.enabled) {
this.stats?.incDbWritten?.(legacyResult.insertedCount);
if (legacyResult.failedRecords.length > 0) {
@@ -263,7 +260,25 @@ class HeartbeatProcessor {
}
}
// F. 错误表:仅 g4Hot 失败记录(旧表失败不写错误表)
if (g5Result.enabled) {
const g5FailedCount = g5Result.failedRecords.length > 0
? g5Result.failedRecords.length
: (g5Result.isConnectionError ? batchData.length : 0);
this.stats?.incG5Written?.(g5Result.insertedCount);
if (g5FailedCount > 0) {
this.stats?.incG5WriteFailed?.(g5FailedCount);
console.warn(`[g5] 批次写入失败:成功 ${g5Result.insertedCount},失败 ${g5FailedCount}`);
await this.redis?.warn('G5批次写入失败', {
module: 'db',
table: this.g5DatabaseManager?.config?.g5Table,
insertedCount: g5Result.insertedCount,
failedCount: g5FailedCount,
error: String(g5Result.error?.message ?? g5Result.error ?? 'g5 write failed'),
});
}
}
// G. 错误表:仅 g4Hot 失败记录(旧表失败不写错误表)
if (g4HotResult.enabled && g4HotResult.failedRecords.length > 0) {
const dbPayload = g4HotResult.failedRecords.map(item => ({
hotel_id: item.record?.hotel_id ?? null,
@@ -276,7 +291,7 @@ class HeartbeatProcessor {
this.stats?.incG4HotErrorTableInserted?.(dbPayload.length);
}
// G. Legacy 失败仅日志(不写错误表)
// H. Legacy 失败仅日志(不写错误表)
if (legacyResult.enabled && legacyResult.failedRecords.length > 0) {
for (const item of legacyResult.failedRecords.slice(0, 10)) {
console.warn('[legacy] 单条写入失败:', item.error?.message);
@@ -379,6 +394,54 @@ class HeartbeatProcessor {
}, 5000);
}
async _writeToG5(batchData) {
if (!this.g5DatabaseManager?.config?.enabled) {
return { enabled: false, success: true, insertedCount: 0, failedRecords: [], error: null, isConnectionError: false, batchError: null };
}
try {
return await this.g5DatabaseManager.insertHeartbeatEventsG5(batchData);
} catch (error) {
const isConnectionError = typeof this.g5DatabaseManager?._isDbConnectionError === 'function'
? this.g5DatabaseManager._isDbConnectionError(error)
: this._isConnectionError(error);
return {
enabled: true,
success: false,
insertedCount: 0,
failedRecords: [],
error,
isConnectionError,
batchError: error,
};
}
}
_writeRoomStatus(batchData) {
const oldRoomStatusEnabled = this.databaseManager.config?.roomStatusEnabled !== false;
if (oldRoomStatusEnabled && batchData.length > 0) {
this.databaseManager.upsertRoomStatus(batchData).catch(err => {
console.warn('异步同步 room_status 失败 (忽略):', err);
this.stats?.incRoomStatusFailed?.(batchData.length);
});
this.stats?.incRoomStatusWritten?.(batchData.length);
}
const g5RoomStatusEnabled = this.g5DatabaseManager?.config?.roomStatusEnabled === true;
if (g5RoomStatusEnabled && batchData.length > 0 && typeof this.g5DatabaseManager?.upsertRoomStatusG5 === 'function') {
this.g5DatabaseManager.upsertRoomStatusG5(batchData).catch(async (err) => {
console.warn('异步同步 G5 room_status 失败 (忽略):', err);
this.stats?.incRoomStatusFailed?.(batchData.length);
await this.redis?.warn('G5 room_status 同步失败', {
module: 'db',
table: this.g5DatabaseManager?.config?.roomStatusTable,
error: String(err?.message ?? err),
});
});
this.stats?.incRoomStatusWritten?.(batchData.length);
}
}
_emitDbWriteError(error, rawData) {
const list = Array.isArray(rawData) ? rawData : rawData ? [rawData] : [];
if (list.length > 0) {

View File

@@ -1,10 +1,13 @@
class StatsCounters {
constructor() {
// 原有 4 槽 + 新增 7 槽 = 11 槽
// [0] dbWritten, [1] filtered, [2] kafkaPulled, [3] dbWriteFailed,
// [4] g4HotWritten, [5] g4HotWriteFailed, [6] roomStatusWritten,
// [7] roomStatusFailed, [8] g4HotErrorTableInserted,
// [9] g5Written, [10] g5WriteFailed
// [0] dbWritten, [1] filtered, [2] kafkaPulled, [3] dbWriteFailed,
// [4] g4HotWritten, [5] g4HotWriteFailed, [6] roomStatusWritten,
// [7] roomStatusFailed, [8] g4HotErrorTableInserted
this._minuteBuf = new SharedArrayBuffer(BigInt64Array.BYTES_PER_ELEMENT * 9);
this._minuteBuf = new SharedArrayBuffer(BigInt64Array.BYTES_PER_ELEMENT * 11);
this._minute = new BigInt64Array(this._minuteBuf);
}
@@ -23,6 +26,8 @@ class StatsCounters {
incRoomStatusWritten(n = 1) { this._inc(6, n); }
incRoomStatusFailed(n = 1) { this._inc(7, n); }
incG4HotErrorTableInserted(n = 1) { this._inc(8, n); }
incG5Written(n = 1) { this._inc(9, n); }
incG5WriteFailed(n = 1) { this._inc(10, n); }
snapshotAndResetMinute() {
const dbWritten = Atomics.exchange(this._minute, 0, 0n);
@@ -34,7 +39,9 @@ class StatsCounters {
const roomStatusWritten = Atomics.exchange(this._minute, 6, 0n);
const roomStatusFailed = Atomics.exchange(this._minute, 7, 0n);
const g4HotErrorTableInserted = Atomics.exchange(this._minute, 8, 0n);
return { dbWritten, filtered, kafkaPulled, dbWriteFailed, g4HotWritten, g4HotWriteFailed, roomStatusWritten, roomStatusFailed, g4HotErrorTableInserted };
const g5Written = Atomics.exchange(this._minute, 9, 0n);
const g5WriteFailed = Atomics.exchange(this._minute, 10, 0n);
return { dbWritten, filtered, kafkaPulled, dbWriteFailed, g4HotWritten, g4HotWriteFailed, roomStatusWritten, roomStatusFailed, g4HotErrorTableInserted, g5Written, g5WriteFailed };
}
}
@@ -82,13 +89,15 @@ class StatsReporter {
if (this._lastFlushMinute === minuteKey) {
return;
}
const { dbWritten, filtered, kafkaPulled, dbWriteFailed, g4HotWritten, g4HotWriteFailed, roomStatusWritten, roomStatusFailed, g4HotErrorTableInserted } = this.stats.snapshotAndResetMinute();
const { dbWritten, filtered, kafkaPulled, dbWriteFailed, g4HotWritten, g4HotWriteFailed, roomStatusWritten, roomStatusFailed, g4HotErrorTableInserted, g5Written, g5WriteFailed } = this.stats.snapshotAndResetMinute();
this._lastFlushMinute = minuteKey;
const ts = formatTimestamp(new Date());
this.redis.pushConsoleLog?.({ level: 'info', message: `[STATS] ${ts} Legacy写入量: ${dbWritten}`, metadata: { module: 'stats' } });
this.redis.pushConsoleLog?.({ level: 'info', message: `[STATS] ${ts} Legacy写入失败量: ${dbWriteFailed}`, metadata: { module: 'stats' } });
this.redis.pushConsoleLog?.({ level: 'info', message: `[STATS] ${ts} G4Hot写入量: ${g4HotWritten}`, metadata: { module: 'stats' } });
this.redis.pushConsoleLog?.({ level: 'info', message: `[STATS] ${ts} G4Hot写入失败量: ${g4HotWriteFailed}`, metadata: { module: 'stats' } });
this.redis.pushConsoleLog?.({ level: 'info', message: `[STATS] ${ts} G5写入量: ${g5Written}`, metadata: { module: 'stats' } });
this.redis.pushConsoleLog?.({ level: 'info', message: `[STATS] ${ts} G5写入失败量: ${g5WriteFailed}`, metadata: { module: 'stats' } });
this.redis.pushConsoleLog?.({ level: 'info', message: `[STATS] ${ts} RoomStatus写入量: ${roomStatusWritten}`, metadata: { module: 'stats' } });
this.redis.pushConsoleLog?.({ level: 'info', message: `[STATS] ${ts} RoomStatus失败量: ${roomStatusFailed}`, metadata: { module: 'stats' } });
this.redis.pushConsoleLog?.({ level: 'info', message: `[STATS] ${ts} G4Hot错误表插入量: ${g4HotErrorTableInserted}`, metadata: { module: 'stats' } });