feat: 新增 G4 热表独立双写能力

- 新增配置项以支持旧/新明细表的独立写入开关及目标表名。
- 重构 DatabaseManager,抽象通用批量 COPY 写入内核,支持不同目标表的复用。
- 新增双明细写入编排器,支持旧/新表独立执行、重试及 fallback。
- 调整 HeartbeatProcessor.processBatch(),确保 room_status 独立执行。
- 错误表仅记录新表写入失败,旧表失败不再写入错误表。
- 重新定义消费暂停策略,基于当前启用的关键 sink 判断。
- 补充按 sink 维度的统计项与启动日志。

新增 G4 热表相关的数据库规范与处理逻辑,确保系统在双写模式下的稳定性与可扩展性。
This commit is contained in:
2026-03-09 15:49:12 +08:00
parent f59000f5ef
commit 43fa7505e5
21 changed files with 2546 additions and 154 deletions

View File

@@ -1,6 +1,7 @@
import { Pool } from 'pg';
import { pipeline } from 'stream/promises';
import { Readable } from 'stream';
import { randomUUID } from 'node:crypto';
import pgCopyStreams from 'pg-copy-streams';
const { from: copyFrom } = pgCopyStreams;
@@ -70,6 +71,13 @@ class DatabaseManager {
return `"${String(id).replace(/"/g, '""')}"`;
}
_normalizeGuid(guid) {
if (guid === null || guid === undefined || guid === '') {
return randomUUID().replace(/-/g, '').toLowerCase();
}
return String(guid).replace(/-/g, '').toLowerCase();
}
formatShanghaiDate(tsMs) {
const date = new Date(Number(tsMs));
const fmt = new Intl.DateTimeFormat('en-CA', {
@@ -90,13 +98,382 @@ class DatabaseManager {
]);
}
isMissingPartitionError(error) {
isMissingPartitionError(error, tableName) {
const msg = String(error?.message ?? '');
if (tableName) return msg.includes('no partition of relation') && msg.includes(tableName);
return msg.includes('no partition of relation') && msg.includes('heartbeat_events');
}
// v2 明细表写入:用于未来对接 Kafka 心跳字段
// ---- 共享格式化工具 ----
_formatPgCol(v) {
if (v === null || v === undefined) return '\\N';
if (typeof v === 'boolean') return v ? 't' : 'f';
if (Array.isArray(v)) {
const inner = v.map((item) => {
if (item === null || item === undefined) return 'NULL';
const s = String(item);
return `"${s.replace(/\\/g, '\\\\').replace(/"/g, '\\"')}"`;
});
const arrStr = `{${inner.join(',')}}`;
return arrStr.replace(/\\/g, '\\\\').replace(/\n/g, '\\n').replace(/\r/g, '\\r').replace(/\t/g, '\\t');
}
const s = typeof v === 'object' ? JSON.stringify(v) : String(v);
return s.replace(/\\/g, '\\\\').replace(/\n/g, '\\n').replace(/\r/g, '\\r').replace(/\t/g, '\\t');
}
_isDbConnectionError(error) {
if (!error) return false;
const connCodes = ['57P03', '08006', '08001', '08003', '08004', '08007'];
if (connCodes.includes(error.code)) return true;
if (error.message && /ECONNREFUSED|ETIMEDOUT|connection/i.test(error.message)) return true;
return false;
}
// ---- 旧表列定义 ----
_getLegacyColumns() {
return [
'ts_ms', 'write_ts_ms', 'hotel_id', 'room_id', 'device_id', 'ip',
'power_state', 'guest_type', 'cardless_state', 'service_mask',
'pms_state', 'carbon_state', 'device_count', 'comm_seq',
'insert_card', 'bright_g', 'version',
'elec_address', 'air_address', 'voltage', 'ampere', 'power', 'phase',
'energy', 'sum_energy', 'state', 'model', 'speed', 'set_temp',
'now_temp', 'solenoid_valve', 'extra',
];
}
_legacyToRowValues(e) {
return [
e.ts_ms,
e.write_ts_ms ?? Date.now(),
e.hotel_id,
e.room_id,
e.device_id,
e.ip,
e.power_state,
e.guest_type,
e.cardless_state,
e.service_mask,
e.pms_state,
e.carbon_state,
e.device_count,
e.comm_seq,
e.insert_card ?? null,
(e.bright_g === -1 || e.bright_g === '-1') ? null : (e.bright_g ?? null),
e.version ?? null,
Array.isArray(e.elec_address) ? e.elec_address : null,
Array.isArray(e.air_address) ? e.air_address : null,
Array.isArray(e.voltage) ? e.voltage : null,
Array.isArray(e.ampere) ? e.ampere : null,
Array.isArray(e.power) ? e.power : null,
Array.isArray(e.phase) ? e.phase : null,
Array.isArray(e.energy) ? e.energy : null,
Array.isArray(e.sum_energy) ? e.sum_energy : null,
Array.isArray(e.state) ? e.state : null,
Array.isArray(e.model) ? e.model : null,
Array.isArray(e.speed) ? e.speed : null,
Array.isArray(e.set_temp) ? e.set_temp : null,
Array.isArray(e.now_temp) ? e.now_temp : null,
Array.isArray(e.solenoid_valve) ? e.solenoid_valve : null,
e.extra ?? null,
];
}
// ---- 新表 G4 Hot 列定义 ----
_getG4HotColumns() {
const base = [
'ts_ms', 'write_ts_ms', 'hotel_id', 'room_id', 'device_id', 'ip',
'power_state', 'guest_type', 'cardless_state', 'service_mask',
'pms_state', 'carbon_state', 'device_count', 'comm_seq',
'insert_card', 'bright_g', 'version',
'elec_address', 'air_address', 'voltage', 'ampere', 'power', 'phase',
'energy', 'sum_energy', 'state', 'model', 'speed', 'set_temp',
'now_temp', 'solenoid_valve', 'extra',
];
const svc = Array.from({ length: 64 }, (_, i) => `svc_${String(i + 1).padStart(2, '0')}`);
const airUnpacked = [
'air_address_1', 'air_address_2', 'air_address_residual',
'state_1', 'state_2', 'state_residual',
'model_1', 'model_2', 'model_residual',
'speed_1', 'speed_2', 'speed_residual',
'set_temp_1', 'set_temp_2', 'set_temp_residual',
'now_temp_1', 'now_temp_2', 'now_temp_residual',
'solenoid_valve_1', 'solenoid_valve_2', 'solenoid_valve_residual',
];
const elecUnpacked = [
'elec_address_1', 'elec_address_2', 'elec_address_residual',
'voltage_1', 'voltage_2', 'voltage_residual',
'ampere_1', 'ampere_2', 'ampere_residual',
'power_1', 'power_2', 'power_residual',
'phase_1', 'phase_2', 'phase_residual',
'energy_1', 'energy_2', 'energy_residual',
'sum_energy_1', 'sum_energy_2', 'sum_energy_residual',
];
const power = ['power_carbon_on', 'power_carbon_off', 'power_person_exist', 'power_person_left'];
const tail = ['guid'];
return [...base, ...svc, ...airUnpacked, ...elecUnpacked, ...power, ...tail];
}
_unpackArrElement(arr, idx) {
if (!Array.isArray(arr) || idx >= arr.length) return null;
return arr[idx] ?? null;
}
_unpackArrResidual(arr) {
if (!Array.isArray(arr) || arr.length <= 2) return null;
return arr.slice(2);
}
_g4HotToRowValues(e) {
const values = [
e.ts_ms,
e.write_ts_ms ?? Date.now(),
e.hotel_id,
e.room_id,
e.device_id,
e.ip,
e.power_state,
e.guest_type,
e.cardless_state,
e.service_mask,
e.pms_state,
e.carbon_state,
e.device_count,
e.comm_seq,
e.insert_card ?? null,
(e.bright_g === -1 || e.bright_g === '-1') ? null : (e.bright_g ?? null),
e.version ?? null,
Array.isArray(e.elec_address) ? e.elec_address : null,
Array.isArray(e.air_address) ? e.air_address : null,
Array.isArray(e.voltage) ? e.voltage : null,
Array.isArray(e.ampere) ? e.ampere : null,
Array.isArray(e.power) ? e.power : null,
Array.isArray(e.phase) ? e.phase : null,
Array.isArray(e.energy) ? e.energy : null,
Array.isArray(e.sum_energy) ? e.sum_energy : null,
Array.isArray(e.state) ? e.state : null,
Array.isArray(e.model) ? e.model : null,
Array.isArray(e.speed) ? e.speed : null,
Array.isArray(e.set_temp) ? e.set_temp : null,
Array.isArray(e.now_temp) ? e.now_temp : null,
Array.isArray(e.solenoid_valve) ? e.solenoid_valve : null,
e.extra ?? null,
];
// svc_01 .. svc_64 布尔展开
const mask = e.service_mask != null ? BigInt(e.service_mask) : null;
for (let i = 0; i < 64; i++) {
values.push(mask != null ? Boolean((mask >> BigInt(i)) & 1n) : null);
}
// 空调展开 _1, _2, _residual
const airArr = Array.isArray(e.air_address) ? e.air_address : null;
const stateArr = Array.isArray(e.state) ? e.state : null;
const modelArr = Array.isArray(e.model) ? e.model : null;
const speedArr = Array.isArray(e.speed) ? e.speed : null;
const setTempArr = Array.isArray(e.set_temp) ? e.set_temp : null;
const nowTempArr = Array.isArray(e.now_temp) ? e.now_temp : null;
const svArr = Array.isArray(e.solenoid_valve) ? e.solenoid_valve : null;
for (const arr of [airArr, stateArr, modelArr, speedArr, setTempArr, nowTempArr, svArr]) {
values.push(this._unpackArrElement(arr, 0));
values.push(this._unpackArrElement(arr, 1));
values.push(this._unpackArrResidual(arr));
}
// 电力展开 _1, _2, _residual
const elecAddr = Array.isArray(e.elec_address) ? e.elec_address : null;
const voltArr = Array.isArray(e.voltage) ? e.voltage : null;
const ampArr = Array.isArray(e.ampere) ? e.ampere : null;
const powArr = Array.isArray(e.power) ? e.power : null;
const phaseArr = Array.isArray(e.phase) ? e.phase : null;
const energyArr = Array.isArray(e.energy) ? e.energy : null;
const sumEnergyArr = Array.isArray(e.sum_energy) ? e.sum_energy : null;
for (const arr of [elecAddr, voltArr, ampArr, powArr, phaseArr, energyArr, sumEnergyArr]) {
values.push(this._unpackArrElement(arr, 0));
values.push(this._unpackArrElement(arr, 1));
values.push(this._unpackArrResidual(arr));
}
// power 辅助字段:当前计算逻辑尚未接入,临时统一写 null
values.push(null);
values.push(null);
values.push(null);
values.push(null);
values.push(this._normalizeGuid(e.guid));
return values;
}
// ---- 通用 COPY + fallback INSERT 写入内核 ----
// target: { tableName, columns, toRowValues, ensurePartitions, logPrefix, missingPartitionTable }
// 返回: { success, insertedCount, failedRecords, error, isConnectionError, batchError }
async _insertEventsToTarget(events, target) {
const { tableName, columns, toRowValues, ensurePartitions, logPrefix, missingPartitionTable } = target;
const tsValues = events.map((e) => Number(e.ts_ms)).filter((n) => Number.isFinite(n));
const tsMin = tsValues.length > 0 ? Math.min(...tsValues) : null;
const tsMax = tsValues.length > 0 ? Math.max(...tsValues) : null;
const self = this;
const runInsertOnce = async () => {
if (ensurePartitions && tsMin !== null) {
await self.ensurePartitionsForTsRange(tsMin, tsMax);
}
const client = await self.pool.connect();
try {
const copySql = `COPY ${tableName} (${columns.join(', ')}) FROM STDIN WITH (FORMAT text, DELIMITER E'\\t', NULL '\\N')`;
const stream = client.query(copyFrom(copySql));
async function* generateRows() {
for (const e of events) {
const line = toRowValues(e).map((v) => self._formatPgCol(v)).join('\t') + '\n';
yield line;
}
}
await pipeline(Readable.from(generateRows()), stream);
return { insertedCount: events.length };
} finally {
client.release();
}
};
const retryAttempts = Number(this.config?.retryAttempts ?? 0);
const retryDelay = Math.max(250, Number(this.config?.retryDelay ?? 1000));
const maxAttempts = retryAttempts > 0 ? retryAttempts : 1;
let lastError = null;
for (let attempt = 1; attempt <= maxAttempts; attempt += 1) {
try {
const r = await runInsertOnce();
return { success: true, insertedCount: r.insertedCount, failedRecords: [], error: null, isConnectionError: false, batchError: null };
} catch (error) {
lastError = error;
if (ensurePartitions && missingPartitionTable && this.isMissingPartitionError(error, missingPartitionTable)) {
console.warn(`${logPrefix} 检测到缺分区写入失败,执行兜底预创建并重试一次`);
if (tsMin !== null) {
await this.ensurePartitionsForTsRange(tsMin, tsMax);
}
}
if (attempt < maxAttempts) {
await new Promise((r) => setTimeout(r, retryDelay));
continue;
}
}
}
// COPY 全失败,降级逐条 INSERT
const failedRecords = [];
let insertedCount = 0;
console.error(`${logPrefix} 批量写入失败,已切换为逐条写入:`, lastError);
const singleSql = `INSERT INTO ${tableName} (${columns.join(', ')}) VALUES (${columns.map((_, i) => `$${i + 1}`).join(', ')})`;
for (const event of events) {
try {
await this.pool.query(singleSql, toRowValues(event));
insertedCount += 1;
} catch (error) {
if (this._isDbConnectionError(error)) {
return { success: false, insertedCount, failedRecords, error, isConnectionError: true, batchError: error };
}
failedRecords.push({ error, record: event });
}
}
if (insertedCount === 0 && failedRecords.length === events.length && this._isDbConnectionError(lastError)) {
return { success: false, insertedCount: 0, failedRecords, error: lastError, isConnectionError: true, batchError: lastError };
}
return {
success: insertedCount > 0 || failedRecords.length === 0,
insertedCount,
failedRecords,
error: lastError,
isConnectionError: false,
batchError: (insertedCount === 0 && failedRecords.length === events.length) ? lastError : null,
};
}
// ---- 旧/新双写编排 ----
async insertHeartbeatEventsDual(events) {
if (!Array.isArray(events)) events = [events];
if (events.length === 0) {
const empty = { enabled: false, success: true, insertedCount: 0, failedRecords: [], error: null, isConnectionError: false, batchError: null };
return { legacy: { ...empty }, g4Hot: { ...empty } };
}
const legacyEnabled = this.config.legacyHeartbeatEnabled;
const g4HotEnabled = this.config.g4HotHeartbeatEnabled;
const empty = { enabled: false, success: true, insertedCount: 0, failedRecords: [], error: null, isConnectionError: false, batchError: null };
const promises = [];
let legacyIdx = -1;
let g4HotIdx = -1;
if (legacyEnabled) {
legacyIdx = promises.length;
promises.push(this._insertEventsToTarget(events, {
tableName: this.config.legacyTable ?? 'heartbeat.heartbeat_events',
columns: this._getLegacyColumns(),
toRowValues: (e) => this._legacyToRowValues(e),
ensurePartitions: true,
logPrefix: '[legacy]',
missingPartitionTable: 'heartbeat_events',
}));
}
if (g4HotEnabled) {
g4HotIdx = promises.length;
promises.push(this._insertEventsToTarget(events, {
tableName: this.config.g4HotTable ?? 'heartbeat.heartbeat_events_g4_hot',
columns: this._getG4HotColumns(),
toRowValues: (e) => this._g4HotToRowValues(e),
ensurePartitions: false,
logPrefix: '[g4hot]',
missingPartitionTable: null,
}));
}
if (promises.length === 0) {
return { legacy: { ...empty }, g4Hot: { ...empty } };
}
const settled = await Promise.allSettled(promises);
const wrap = (s) => {
if (!s) return { ...empty };
if (s.status === 'fulfilled') return { ...s.value, enabled: true };
return { enabled: true, success: false, insertedCount: 0, failedRecords: [], error: s.reason, isConnectionError: this._isDbConnectionError(s.reason), batchError: s.reason };
};
return {
legacy: legacyIdx >= 0 ? wrap(settled[legacyIdx]) : { ...empty },
g4Hot: g4HotIdx >= 0 ? wrap(settled[g4HotIdx]) : { ...empty },
};
}
// v2 明细表写入(向后兼容封装,仅旧表,抛出连接错误)
async insertHeartbeatEvents(events) {
if (!Array.isArray(events)) events = [events];
if (events.length === 0) return;
const result = await this._insertEventsToTarget(events, {
tableName: this.config.legacyTable ?? 'heartbeat.heartbeat_events',
columns: this._getLegacyColumns(),
toRowValues: (e) => this._legacyToRowValues(e),
ensurePartitions: true,
logPrefix: '[legacy]',
missingPartitionTable: 'heartbeat_events',
});
if (result.isConnectionError && result.error) throw result.error;
return { insertedCount: result.insertedCount, failedRecords: result.failedRecords, batchError: result.batchError };
}
// [DEPRECATED] 旧版直写实现,已由 _insertEventsToTarget 替代,后续可移除
async _insertHeartbeatEventsLegacyDirect(events) {
if (!Array.isArray(events)) {
events = [events];
}