// 数据库管理器模块 import { Pool } from 'pg'; class DatabaseManager { constructor(config) { this.config = config; this.pool = null; this.partitionMaintenanceTimer = null; } async connect() { try { // 创建数据库连接池 this.pool = new Pool({ host: this.config.host, port: this.config.port, user: this.config.user, password: this.config.password, database: this.config.database, max: this.config.maxConnections, idleTimeoutMillis: this.config.idleTimeoutMillis, }); // 测试连接 const client = await this.pool.connect(); client.release(); console.log('数据库连接池创建成功'); // 初始化表结构 await this.initTables(); // 分区维护(方案1):启动时预创建 + 定时维护 await this.ensurePartitionsForRange({ startDayOffset: -1, endDayOffset: this.getPartitionFutureDays(), }); this.startPartitionMaintenance(); } catch (error) { console.error('数据库连接失败:', error); throw error; } } async disconnect() { try { this.stopPartitionMaintenance(); if (this.pool) { await this.pool.end(); console.log('数据库连接池已关闭'); } } catch (error) { console.error('关闭数据库连接池失败:', error); throw error; } } async initTables() { try { // 兼容:保留旧表(public.heartbeat),避免现有调用路径直接报错。 const legacyTableQuery = ` CREATE TABLE IF NOT EXISTS public.heartbeat ( id SERIAL PRIMARY KEY, component_id VARCHAR(50) NOT NULL, status VARCHAR(20) NOT NULL, timestamp TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, data JSONB, created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ); CREATE INDEX IF NOT EXISTS idx_heartbeat_component_id ON public.heartbeat(component_id); CREATE INDEX IF NOT EXISTS idx_heartbeat_timestamp ON public.heartbeat(timestamp); `; // v2:高吞吐按天分区表(位于 heartbeat schema) const v2SchemaQuery = ` BEGIN; CREATE SCHEMA IF NOT EXISTS heartbeat; CREATE TABLE IF NOT EXISTS heartbeat.heartbeat_events ( id bigserial, ts_ms bigint NOT NULL, hotel_id int2 NOT NULL, room_id int4 NOT NULL, device_id varchar(64) NOT NULL, ip inet NOT NULL, power_state int2 NOT NULL, guest_type int2 NOT NULL, cardless_state int2 NOT NULL, service_mask bigint NOT NULL, pms_state int2 NOT NULL, carbon_state int2 NOT NULL, device_count int2 NOT NULL, comm_seq int2 NOT NULL, extra jsonb, CONSTRAINT heartbeat_events_pk PRIMARY KEY (ts_ms, id), CONSTRAINT chk_ts_ms_positive CHECK (ts_ms > 0), CONSTRAINT chk_hotel_id_range CHECK (hotel_id >= 0 AND hotel_id <= 32767), CONSTRAINT chk_room_id_range CHECK (room_id >= 0), CONSTRAINT chk_power_state_range CHECK (power_state >= 0 AND power_state <= 32767), CONSTRAINT chk_guest_type_range CHECK (guest_type >= 0 AND guest_type <= 32767), CONSTRAINT chk_cardless_state_range CHECK (cardless_state >= 0 AND cardless_state <= 32767), CONSTRAINT chk_pms_state_range CHECK (pms_state >= 0 AND pms_state <= 32767), CONSTRAINT chk_carbon_state_range CHECK (carbon_state >= 0 AND carbon_state <= 32767), CONSTRAINT chk_device_count_range CHECK (device_count >= 0 AND device_count <= 32767), CONSTRAINT chk_comm_seq_range CHECK (comm_seq >= 0 AND comm_seq <= 32767) ) PARTITION BY RANGE (ts_ms); CREATE INDEX IF NOT EXISTS idx_heartbeat_events_hotel_id ON heartbeat.heartbeat_events (hotel_id); CREATE INDEX IF NOT EXISTS idx_heartbeat_events_power_state ON heartbeat.heartbeat_events (power_state); CREATE INDEX IF NOT EXISTS idx_heartbeat_events_guest_type ON heartbeat.heartbeat_events (guest_type); CREATE INDEX IF NOT EXISTS idx_heartbeat_events_device_id ON heartbeat.heartbeat_events (device_id); CREATE INDEX IF NOT EXISTS idx_heartbeat_events_service_mask_brin ON heartbeat.heartbeat_events USING BRIN (service_mask); CREATE INDEX IF NOT EXISTS idx_heartbeat_events_hotel_ts ON heartbeat.heartbeat_events (hotel_id, ts_ms); -- 分区预创建函数(按 Asia/Shanghai 自然日) CREATE OR REPLACE FUNCTION heartbeat.day_start_ms_shanghai(p_day date) RETURNS bigint LANGUAGE sql IMMUTABLE AS $$ SELECT ( EXTRACT(EPOCH FROM (p_day::timestamp AT TIME ZONE 'Asia/Shanghai')) * 1000 )::bigint; $$; CREATE OR REPLACE FUNCTION heartbeat.partition_name_for_day(p_day date) RETURNS text LANGUAGE sql IMMUTABLE AS $$ SELECT format('heartbeat_events_%s', to_char(p_day, 'YYYYMMDD')); $$; CREATE OR REPLACE FUNCTION heartbeat.create_daily_partition(p_day date) RETURNS void LANGUAGE plpgsql AS $$ DECLARE start_ms bigint; end_ms bigint; part_name text; BEGIN start_ms := heartbeat.day_start_ms_shanghai(p_day); end_ms := start_ms + 86400000; part_name := heartbeat.partition_name_for_day(p_day); IF to_regclass(format('heartbeat.%I', part_name)) IS NOT NULL THEN RETURN; END IF; EXECUTE format( 'CREATE TABLE heartbeat.%I PARTITION OF heartbeat.heartbeat_events FOR VALUES FROM (%s) TO (%s);', part_name, start_ms, end_ms ); EXECUTE format('CREATE INDEX IF NOT EXISTS %I ON heartbeat.%I (hotel_id);', 'idx_'||part_name||'_hotel_id', part_name); EXECUTE format('CREATE INDEX IF NOT EXISTS %I ON heartbeat.%I (power_state);', 'idx_'||part_name||'_power_state', part_name); EXECUTE format('CREATE INDEX IF NOT EXISTS %I ON heartbeat.%I (guest_type);', 'idx_'||part_name||'_guest_type', part_name); EXECUTE format('CREATE INDEX IF NOT EXISTS %I ON heartbeat.%I (device_id);', 'idx_'||part_name||'_device_id', part_name); EXECUTE format('CREATE INDEX IF NOT EXISTS %I ON heartbeat.%I USING BRIN (service_mask);', 'idx_'||part_name||'_service_mask_brin', part_name); EXECUTE format('CREATE INDEX IF NOT EXISTS %I ON heartbeat.%I (hotel_id, ts_ms);', 'idx_'||part_name||'_hotel_ts', part_name); END; $$; CREATE OR REPLACE FUNCTION heartbeat.ensure_partitions(p_start_day date, p_end_day date) RETURNS void LANGUAGE plpgsql AS $$ DECLARE d date; BEGIN IF p_end_day < p_start_day THEN RAISE EXCEPTION 'p_end_day (%) must be >= p_start_day (%)', p_end_day, p_start_day; END IF; d := p_start_day; WHILE d <= p_end_day LOOP PERFORM heartbeat.create_daily_partition(d); d := d + 1; END LOOP; END; $$; COMMIT; `; await this.pool.query(legacyTableQuery); await this.pool.query(v2SchemaQuery); console.log('数据库表初始化成功'); } catch (error) { console.error('数据库表初始化失败:', error); throw error; } } getPartitionConfig() { const cfg = this.config.partitionMaintenance ?? {}; return { enabled: cfg.enabled !== false, futureDays: Number.isFinite(cfg.futureDays) ? cfg.futureDays : 30, intervalHours: Number.isFinite(cfg.intervalHours) ? cfg.intervalHours : 6, }; } getPartitionFutureDays() { return this.getPartitionConfig().futureDays; } async ensurePartitionsForRange({ startDayOffset, endDayOffset }) { const startOffset = Number(startDayOffset ?? 0); const endOffset = Number(endDayOffset ?? 0); await this.pool.query( 'SELECT heartbeat.ensure_partitions(current_date + $1::int, current_date + $2::int)', [startOffset, endOffset] ); } startPartitionMaintenance() { const cfg = this.getPartitionConfig(); if (!cfg.enabled) { return; } if (this.partitionMaintenanceTimer) { return; } const intervalMs = Math.max(60_000, cfg.intervalHours * 60 * 60 * 1000); this.partitionMaintenanceTimer = setInterval(async () => { try { await this.ensurePartitionsForRange({ startDayOffset: -1, endDayOffset: this.getPartitionFutureDays(), }); console.log('[db] 分区预创建维护完成'); } catch (err) { console.error('[db] 分区预创建维护失败:', err); } }, intervalMs); // 不阻止进程退出 this.partitionMaintenanceTimer.unref?.(); } stopPartitionMaintenance() { if (this.partitionMaintenanceTimer) { clearInterval(this.partitionMaintenanceTimer); this.partitionMaintenanceTimer = null; } } formatShanghaiDate(tsMs) { const date = new Date(Number(tsMs)); const fmt = new Intl.DateTimeFormat('en-CA', { timeZone: 'Asia/Shanghai', year: 'numeric', month: '2-digit', day: '2-digit', }); return fmt.format(date); } async ensurePartitionsForTsRange(tsMin, tsMax) { const startDay = this.formatShanghaiDate(tsMin); const endDay = this.formatShanghaiDate(tsMax); await this.pool.query('SELECT heartbeat.ensure_partitions($1::date, $2::date)', [ startDay, endDay, ]); } isMissingPartitionError(error) { const msg = String(error?.message ?? ''); return msg.includes('no partition of relation') && msg.includes('heartbeat_events'); } // v2 明细表写入:用于未来对接 Kafka 心跳字段 async insertHeartbeatEvents(events) { if (!Array.isArray(events)) { events = [events]; } if (events.length === 0) return; const tsValues = events.map((e) => Number(e.ts_ms)).filter((n) => Number.isFinite(n)); if (tsValues.length > 0) { await this.ensurePartitionsForTsRange(Math.min(...tsValues), Math.max(...tsValues)); } const columns = [ 'ts_ms', 'hotel_id', 'room_id', 'device_id', 'ip', 'power_state', 'guest_type', 'cardless_state', 'service_mask', 'pms_state', 'carbon_state', 'device_count', 'comm_seq', 'extra', ]; const values = []; const placeholders = events .map((e, rowIndex) => { const base = rowIndex * columns.length; values.push( e.ts_ms, e.hotel_id, e.room_id, e.device_id, e.ip, e.power_state, e.guest_type, e.cardless_state, e.service_mask, e.pms_state, e.carbon_state, e.device_count, e.comm_seq, e.extra ?? null ); const row = columns.map((_, colIndex) => `$${base + colIndex + 1}`).join(', '); return `(${row})`; }) .join(', '); const sql = `INSERT INTO heartbeat.heartbeat_events (${columns.join(', ')}) VALUES ${placeholders}`; try { await this.pool.query(sql, values); } catch (error) { // 兜底:若仍因缺分区失败,尝试确保“当前到未来 N 天”后重试一次 if (this.isMissingPartitionError(error)) { console.warn('[db] 检测到缺分区写入失败,执行兜底预创建并重试一次'); await this.ensurePartitionsForRange({ startDayOffset: -7, endDayOffset: this.getPartitionFutureDays(), }); await this.pool.query(sql, values); return; } throw error; } } async insertHeartbeatData(data) { try { if (!Array.isArray(data)) { data = [data]; } if (data.length === 0) { return; } // 构建批量插入语句 const values = data.map(item => [ item.component_id, item.status, item.timestamp, item.data ]); const query = { text: ` INSERT INTO heartbeat (component_id, status, timestamp, data) VALUES ${values.map((_, index) => `($${index * 4 + 1}, $${index * 4 + 2}, $${index * 4 + 3}, $${index * 4 + 4})`).join(', ')} `, values: values.flat() }; await this.pool.query(query); console.log(`成功插入 ${data.length} 条心跳数据`); } catch (error) { console.error('插入心跳数据失败:', error); throw error; } } async getLatestHeartbeat(componentId) { try { const query = { text: ` SELECT * FROM heartbeat WHERE component_id = $1 ORDER BY timestamp DESC LIMIT 1 `, values: [componentId] }; const result = await this.pool.query(query); return result.rows[0] || null; } catch (error) { console.error('查询最新心跳数据失败:', error); throw error; } } async getHeartbeatHistory(componentId, startTime, endTime) { try { const query = { text: ` SELECT * FROM heartbeat WHERE component_id = $1 AND timestamp BETWEEN $2 AND $3 ORDER BY timestamp DESC `, values: [componentId, startTime, endTime] }; const result = await this.pool.query(query); return result.rows; } catch (error) { console.error('查询心跳历史数据失败:', error); throw error; } } } export { DatabaseManager };