// Database manager module
import { Pool } from 'pg';

/**
 * Largest delay accepted by Node.js timers (2^31 - 1 ms). Larger values are
 * coerced to 1 ms by the runtime, which would turn a "rare" maintenance job
 * into a busy loop.
 */
const MAX_TIMER_DELAY_MS = 2_147_483_647;

/** Lower bound for the partition maintenance interval. */
const MIN_MAINTENANCE_INTERVAL_MS = 60_000;

/**
 * Shared formatter producing `YYYY-MM-DD` calendar days in Asia/Shanghai.
 * Built once because constructing Intl formatters is comparatively expensive.
 */
const SHANGHAI_DAY_FORMATTER = new Intl.DateTimeFormat('en-CA', {
  timeZone: 'Asia/Shanghai',
  year: 'numeric',
  month: '2-digit',
  day: '2-digit',
});

/** Array-typed columns of heartbeat.heartbeat_events, in insert order. */
const HEARTBEAT_EVENT_ARRAY_COLUMNS = [
  'elec_address',
  'air_address',
  'voltage',
  'ampere',
  'power',
  'phase',
  'energy',
  'sum_energy',
  'state',
  'model',
  'speed',
  'set_temp',
  'now_temp',
  'solenoid_valve',
];

/** Every column written by insertHeartbeatEvents, in insert order. */
const HEARTBEAT_EVENT_COLUMNS = [
  'ts_ms',
  'write_ts_ms',
  'hotel_id',
  'room_id',
  'device_id',
  'ip',
  'power_state',
  'guest_type',
  'cardless_state',
  'service_mask',
  'pms_state',
  'carbon_state',
  'device_count',
  'comm_seq',
  ...HEARTBEAT_EVENT_ARRAY_COLUMNS,
  'extra',
];

/**
 * Owns the PostgreSQL connection pool, creates the heartbeat schema objects,
 * keeps the daily partitions of `heartbeat.heartbeat_events` pre-created and
 * exposes the insert/query helpers for heartbeat data.
 *
 * Configuration keys read by this class: `host`, `port`, `user`, `password`,
 * `database`, `maxConnections`, `idleTimeoutMillis`, `retryAttempts`,
 * `retryDelay` and `partitionMaintenance.{enabled,futureDays,intervalHours}`.
 */
class DatabaseManager {
  /**
   * @param {object} config - Connection and maintenance settings (see class docs).
   */
  constructor(config) {
    this.config = config;
    this.pool = null;
    this.partitionMaintenanceTimer = null;
  }

  /**
   * Creates the pool, verifies connectivity, initialises tables, pre-creates
   * partitions and starts the periodic partition maintenance.
   *
   * If any step fails, the pool created by this call is closed again so that
   * a later retry does not leak connections.
   *
   * @returns {Promise<void>}
   * @throws Rethrows the error of the failing step.
   */
  async connect() {
    let createdPool = null;
    try {
      // Create the database connection pool.
      createdPool = new Pool({
        host: this.config.host,
        port: this.config.port,
        user: this.config.user,
        password: this.config.password,
        database: this.config.database,
        max: this.config.maxConnections,
        idleTimeoutMillis: this.config.idleTimeoutMillis,
      });

      // The pool emits 'error' for failures on idle clients; without a
      // listener that becomes an unhandled 'error' event.
      createdPool.on('error', (error) => {
        console.error('[db] idle client error:', error);
      });
      this.pool = createdPool;

      // Test the connection.
      const client = await createdPool.connect();
      client.release();
      console.log('数据库连接池创建成功');

      // Initialise the table structure.
      await this.initTables();

      // Partition maintenance: pre-create on startup, then maintain on a timer.
      await this.ensurePartitionsForRange({
        startDayOffset: -1,
        endDayOffset: this.getPartitionFutureDays(),
      });
      this.startPartitionMaintenance();
    } catch (error) {
      console.error('数据库连接失败:', error);
      if (createdPool) {
        if (this.pool === createdPool) {
          this.pool = null;
        }
        try {
          await createdPool.end();
        } catch (closeError) {
          console.error('[db] failed to close pool after connect error:', closeError);
        }
      }
      throw error;
    }
  }

  /**
   * Stops partition maintenance and closes the pool. Calling it again after
   * the pool has been closed (or before it was created) is a no-op.
   *
   * @returns {Promise<void>}
   * @throws Rethrows the error raised while ending the pool.
   */
  async disconnect() {
    this.stopPartitionMaintenance();

    const pool = this.pool;
    if (!pool) {
      return;
    }
    // Drop the reference first so a repeated disconnect() cannot end the
    // same pool twice.
    this.pool = null;

    try {
      await pool.end();
      console.log('数据库连接池已关闭');
    } catch (error) {
      console.error('关闭数据库连接池失败:', error);
      throw error;
    }
  }

  /**
   * Creates (idempotently) the legacy `public.heartbeat` table, the
   * partitioned `heartbeat.heartbeat_events` table with its indexes, and the
   * SQL helper functions used for daily partition creation. Afterwards the
   * `ip` and `room_id` column types are migrated to varchar when needed.
   *
   * @returns {Promise<void>}
   * @throws Rethrows any database error.
   */
  async initTables() {
    try {
      // Compatibility: keep the legacy table (public.heartbeat) so existing
      // call paths do not fail outright.
      const legacyTableQuery = `
        CREATE TABLE IF NOT EXISTS public.heartbeat (
          id SERIAL PRIMARY KEY,
          component_id VARCHAR(50) NOT NULL,
          status VARCHAR(20) NOT NULL,
          timestamp TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
          data JSONB,
          created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
        );

        CREATE INDEX IF NOT EXISTS idx_heartbeat_component_id ON public.heartbeat(component_id);
        CREATE INDEX IF NOT EXISTS idx_heartbeat_timestamp ON public.heartbeat(timestamp);
      `;

      // v2: high-throughput table partitioned by day (in the heartbeat schema).
      // NOTE: the "--" SQL comment inside this literal must stay on its own
      // line, otherwise it would comment out the statement that follows it.
      const v2SchemaQuery = `
        BEGIN;

        CREATE EXTENSION IF NOT EXISTS pgcrypto;

        CREATE SCHEMA IF NOT EXISTS heartbeat;

        CREATE TABLE IF NOT EXISTS heartbeat.heartbeat_events (
          guid varchar(32) NOT NULL DEFAULT replace(gen_random_uuid()::text, '-', ''),

          ts_ms bigint NOT NULL,
          write_ts_ms bigint DEFAULT (EXTRACT(EPOCH FROM clock_timestamp()) * 1000)::bigint,
          hotel_id int2 NOT NULL,
          room_id varchar(50) NOT NULL,
          device_id varchar(64) NOT NULL,
          ip varchar(21) NOT NULL,
          power_state int2 NOT NULL,
          guest_type int2 NOT NULL,
          cardless_state int2 NOT NULL,
          service_mask bigint NOT NULL,
          pms_state int2 NOT NULL,
          carbon_state int2 NOT NULL,
          device_count int2 NOT NULL,
          comm_seq int4 NOT NULL,

          elec_address text[],
          air_address text[],
          voltage double precision[],
          ampere double precision[],
          power double precision[],
          phase text[],
          energy double precision[],
          sum_energy double precision[],
          state int2[],
          model int2[],
          speed int2[],
          set_temp int2[],
          now_temp int2[],
          solenoid_valve int2[],

          extra jsonb,

          CONSTRAINT heartbeat_events_pk PRIMARY KEY (ts_ms, guid),
          CONSTRAINT chk_guid_32_hex CHECK (guid ~ '^[0-9a-f]{32}$'),

          CONSTRAINT chk_ts_ms_positive CHECK (ts_ms > 0),
          CONSTRAINT chk_hotel_id_range CHECK (hotel_id >= 0 AND hotel_id <= 32767),
          CONSTRAINT chk_room_id_len CHECK (char_length(room_id) > 0 AND char_length(room_id) <= 50),
          CONSTRAINT chk_power_state_range CHECK (power_state >= 0 AND power_state <= 32767),
          CONSTRAINT chk_guest_type_range CHECK (guest_type >= 0 AND guest_type <= 32767),
          CONSTRAINT chk_cardless_state_range CHECK (cardless_state >= 0 AND cardless_state <= 32767),
          CONSTRAINT chk_pms_state_range CHECK (pms_state >= 0 AND pms_state <= 32767),
          CONSTRAINT chk_carbon_state_range CHECK (carbon_state >= 0 AND carbon_state <= 32767),
          CONSTRAINT chk_device_count_range CHECK (device_count >= 0 AND device_count <= 32767),
          CONSTRAINT chk_comm_seq_range CHECK (comm_seq >= 0)
        )
        PARTITION BY RANGE (ts_ms);

        ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS write_ts_ms bigint;
        ALTER TABLE heartbeat.heartbeat_events ALTER COLUMN write_ts_ms SET DEFAULT (EXTRACT(EPOCH FROM clock_timestamp()) * 1000)::bigint;

        ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS elec_address text[];
        ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS air_address text[];
        ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS voltage double precision[];
        ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS ampere double precision[];
        ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS power double precision[];
        ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS phase text[];
        ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS energy double precision[];
        ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS sum_energy double precision[];
        ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS state int2[];
        ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS model int2[];
        ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS speed int2[];
        ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS set_temp int2[];
        ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS now_temp int2[];
        ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS solenoid_valve int2[];

        CREATE INDEX IF NOT EXISTS idx_heartbeat_events_hotel_id ON heartbeat.heartbeat_events (hotel_id);
        CREATE INDEX IF NOT EXISTS idx_heartbeat_events_power_state ON heartbeat.heartbeat_events (power_state);
        CREATE INDEX IF NOT EXISTS idx_heartbeat_events_guest_type ON heartbeat.heartbeat_events (guest_type);
        CREATE INDEX IF NOT EXISTS idx_heartbeat_events_device_id ON heartbeat.heartbeat_events (device_id);
        CREATE INDEX IF NOT EXISTS idx_heartbeat_events_service_mask_brin ON heartbeat.heartbeat_events USING BRIN (service_mask);
        CREATE INDEX IF NOT EXISTS idx_service_mask_first_bit ON heartbeat.heartbeat_events ((service_mask & 1));
        CREATE INDEX IF NOT EXISTS idx_heartbeat_events_hotel_ts ON heartbeat.heartbeat_events (hotel_id, ts_ms);
        CREATE INDEX IF NOT EXISTS idx_heartbeat_events_elec_address_gin ON heartbeat.heartbeat_events USING GIN (elec_address);
        CREATE INDEX IF NOT EXISTS idx_heartbeat_events_air_address_gin ON heartbeat.heartbeat_events USING GIN (air_address);
        CREATE INDEX IF NOT EXISTS idx_heartbeat_events_state_gin ON heartbeat.heartbeat_events USING GIN (state);
        CREATE INDEX IF NOT EXISTS idx_heartbeat_events_model_gin ON heartbeat.heartbeat_events USING GIN (model);

        -- 分区预创建函数(按 Asia/Shanghai 自然日)
        CREATE OR REPLACE FUNCTION heartbeat.day_start_ms_shanghai(p_day date)
        RETURNS bigint
        LANGUAGE sql
        IMMUTABLE
        AS $$
          SELECT (
            EXTRACT(EPOCH FROM (p_day::timestamp AT TIME ZONE 'Asia/Shanghai')) * 1000
          )::bigint;
        $$;

        CREATE OR REPLACE FUNCTION heartbeat.partition_name_for_day(p_day date)
        RETURNS text
        LANGUAGE sql
        IMMUTABLE
        AS $$
          SELECT format('heartbeat_events_%s', to_char(p_day, 'YYYYMMDD'));
        $$;

        CREATE OR REPLACE FUNCTION heartbeat.create_daily_partition(p_day date)
        RETURNS void
        LANGUAGE plpgsql
        AS $$
        DECLARE
          start_ms bigint;
          end_ms bigint;
          part_name text;
        BEGIN
          start_ms := heartbeat.day_start_ms_shanghai(p_day);
          end_ms := start_ms + 86400000;
          part_name := heartbeat.partition_name_for_day(p_day);

          IF to_regclass(format('heartbeat.%I', part_name)) IS NOT NULL THEN
            RETURN;
          END IF;

          EXECUTE format(
            'CREATE TABLE heartbeat.%I PARTITION OF heartbeat.heartbeat_events FOR VALUES FROM (%s) TO (%s);',
            part_name, start_ms, end_ms
          );

          EXECUTE format('CREATE INDEX IF NOT EXISTS %I ON heartbeat.%I (hotel_id);', 'idx_'||part_name||'_hotel_id', part_name);
          EXECUTE format('CREATE INDEX IF NOT EXISTS %I ON heartbeat.%I (power_state);', 'idx_'||part_name||'_power_state', part_name);
          EXECUTE format('CREATE INDEX IF NOT EXISTS %I ON heartbeat.%I (guest_type);', 'idx_'||part_name||'_guest_type', part_name);
          EXECUTE format('CREATE INDEX IF NOT EXISTS %I ON heartbeat.%I (device_id);', 'idx_'||part_name||'_device_id', part_name);
          EXECUTE format('CREATE INDEX IF NOT EXISTS %I ON heartbeat.%I USING BRIN (service_mask);', 'idx_'||part_name||'_service_mask_brin', part_name);
          EXECUTE format('CREATE INDEX IF NOT EXISTS %I ON heartbeat.%I ((service_mask & 1));', 'idx_'||part_name||'_service_mask_first_bit', part_name);
          EXECUTE format('CREATE INDEX IF NOT EXISTS %I ON heartbeat.%I (hotel_id, ts_ms);', 'idx_'||part_name||'_hotel_ts', part_name);
        END;
        $$;

        CREATE OR REPLACE FUNCTION heartbeat.ensure_partitions(p_start_day date, p_end_day date)
        RETURNS void
        LANGUAGE plpgsql
        AS $$
        DECLARE
          d date;
        BEGIN
          IF p_end_day < p_start_day THEN
            RAISE EXCEPTION 'p_end_day (%) must be >= p_start_day (%)', p_end_day, p_start_day;
          END IF;

          d := p_start_day;
          WHILE d <= p_end_day LOOP
            PERFORM heartbeat.create_daily_partition(d);
            d := d + 1;
          END LOOP;
        END;
        $$;

        COMMIT;
      `;

      await this.pool.query(legacyTableQuery);
      await this.pool.query(v2SchemaQuery);
      await this.ensureIpColumnVarchar();
      await this.ensureRoomIdColumnVarchar();
      console.log('数据库表初始化成功');
    } catch (error) {
      console.error('数据库表初始化失败:', error);
      throw error;
    }
  }

  /**
   * Looks up the SQL type of a `heartbeat.heartbeat_events` column in the
   * system catalogs.
   *
   * @param {string} columnName - Column to inspect.
   * @returns {Promise<string>} Lower-cased type name, or '' when the column
   *   (or table) was not found.
   */
  async getHeartbeatEventsColumnType(columnName) {
    const res = await this.pool.query(
      `
      SELECT format_type(a.atttypid, a.atttypmod) AS type
      FROM pg_attribute a
      JOIN pg_class c ON c.oid = a.attrelid
      JOIN pg_namespace n ON n.oid = c.relnamespace
      WHERE n.nspname = 'heartbeat'
        AND c.relname = 'heartbeat_events'
        AND a.attname = $1
        AND a.attnum > 0
        AND NOT a.attisdropped
      `,
      [columnName]
    );
    return String(res?.rows?.[0]?.type ?? '').toLowerCase();
  }

  /**
   * Migrates `room_id` to varchar(50) when it currently has any other type,
   * replacing the old range/length check constraints with the length check.
   * Does nothing when the column is missing or already varchar.
   *
   * @returns {Promise<void>}
   */
  async ensureRoomIdColumnVarchar() {
    const type = await this.getHeartbeatEventsColumnType('room_id');
    if (!type || type.startsWith('character varying')) {
      return;
    }

    // The constraints are dropped first because they reference the old type.
    await this.pool.query(
      'ALTER TABLE heartbeat.heartbeat_events DROP CONSTRAINT IF EXISTS chk_room_id_range'
    );
    await this.pool.query(
      'ALTER TABLE heartbeat.heartbeat_events DROP CONSTRAINT IF EXISTS chk_room_id_len'
    );
    await this.pool.query(
      `ALTER TABLE heartbeat.heartbeat_events
         ALTER COLUMN room_id TYPE varchar(50)
         USING room_id::text`
    );
    await this.pool.query(
      'ALTER TABLE heartbeat.heartbeat_events ADD CONSTRAINT chk_room_id_len CHECK (char_length(room_id) > 0 AND char_length(room_id) <= 50)'
    );
  }

  /**
   * Migrates `ip` from inet to varchar(21). Any other current type
   * (including varchar, or a missing column) is left untouched.
   *
   * @returns {Promise<void>}
   */
  async ensureIpColumnVarchar() {
    const type = await this.getHeartbeatEventsColumnType('ip');
    if (!type.startsWith('inet')) {
      return;
    }

    await this.pool.query(
      `ALTER TABLE heartbeat.heartbeat_events
         ALTER COLUMN ip TYPE varchar(21)
         USING ip::text`
    );
  }

  /**
   * Quotes a value as a SQL identifier, doubling embedded double quotes.
   *
   * @param {*} id - Identifier; coerced with String().
   * @returns {string} The double-quoted identifier.
   */
  escapeIdentifier(id) {
    return `"${String(id).replace(/"/g, '""')}"`;
  }

  /**
   * Resolves the partition maintenance settings with defaults applied.
   *
   * `futureDays` is truncated to a whole number and clamped at 0 because it
   * is sent to the database as an integer day offset and a negative value
   * would produce an inverted range that `ensure_partitions` rejects.
   *
   * @returns {{enabled: boolean, futureDays: number, intervalHours: number}}
   */
  getPartitionConfig() {
    const cfg = this.config.partitionMaintenance ?? {};
    const futureDays = Number.isFinite(cfg.futureDays)
      ? Math.max(0, Math.trunc(cfg.futureDays))
      : 30;
    const intervalHours = Number.isFinite(cfg.intervalHours) ? cfg.intervalHours : 6;

    return {
      enabled: cfg.enabled !== false,
      futureDays,
      intervalHours,
    };
  }

  /**
   * @returns {number} How many days ahead partitions are pre-created.
   */
  getPartitionFutureDays() {
    return this.getPartitionConfig().futureDays;
  }

  /**
   * Ensures daily partitions exist for a range of days relative to today
   * (Asia/Shanghai calendar day, evaluated by the database).
   *
   * @param {object} range
   * @param {number} [range.startDayOffset=0] - First day, as offset from today.
   * @param {number} [range.endDayOffset=0] - Last day, as offset from today.
   * @returns {Promise<void>}
   */
  async ensurePartitionsForRange({ startDayOffset, endDayOffset }) {
    const startOffset = Number(startDayOffset ?? 0);
    const endOffset = Number(endDayOffset ?? 0);
    await this.pool.query(
      "SELECT heartbeat.ensure_partitions(((now() AT TIME ZONE 'Asia/Shanghai')::date) + $1::int, ((now() AT TIME ZONE 'Asia/Shanghai')::date) + $2::int)",
      [startOffset, endOffset]
    );
  }

  /**
   * Starts the periodic partition pre-creation job unless it is disabled or
   * already running. Failures of a run are logged and do not stop the timer.
   *
   * @returns {void}
   */
  startPartitionMaintenance() {
    const cfg = this.getPartitionConfig();
    if (!cfg.enabled || this.partitionMaintenanceTimer) {
      return;
    }

    // Clamp on both sides: at least one minute, and never above the largest
    // delay Node.js timers support (larger values fire after 1 ms instead).
    const requestedMs = cfg.intervalHours * 60 * 60 * 1000;
    const intervalMs = Math.min(
      MAX_TIMER_DELAY_MS,
      Math.max(MIN_MAINTENANCE_INTERVAL_MS, requestedMs)
    );

    this.partitionMaintenanceTimer = setInterval(async () => {
      try {
        await this.ensurePartitionsForRange({
          startDayOffset: -1,
          endDayOffset: this.getPartitionFutureDays(),
        });
        console.log('[db] 分区预创建维护完成');
      } catch (err) {
        console.error('[db] 分区预创建维护失败:', err);
      }
    }, intervalMs);
  }

  /**
   * Stops the periodic partition maintenance job if it is running.
   *
   * @returns {void}
   */
  stopPartitionMaintenance() {
    if (!this.partitionMaintenanceTimer) {
      return;
    }
    clearInterval(this.partitionMaintenanceTimer);
    this.partitionMaintenanceTimer = null;
  }

  /**
   * Formats an epoch-millisecond timestamp as its Asia/Shanghai calendar day.
   *
   * @param {number|string} tsMs - Epoch milliseconds; coerced with Number().
   * @returns {string} Day in `YYYY-MM-DD` form.
   * @throws {RangeError} When the value does not form a valid Date.
   */
  formatShanghaiDate(tsMs) {
    return SHANGHAI_DAY_FORMATTER.format(new Date(Number(tsMs)));
  }

  /**
   * Ensures daily partitions exist for every day between two timestamps.
   *
   * @param {number} tsMin - Earliest epoch-millisecond timestamp.
   * @param {number} tsMax - Latest epoch-millisecond timestamp.
   * @returns {Promise<void>}
   */
  async ensurePartitionsForTsRange(tsMin, tsMax) {
    const startDay = this.formatShanghaiDate(tsMin);
    const endDay = this.formatShanghaiDate(tsMax);
    await this.pool.query('SELECT heartbeat.ensure_partitions($1::date, $2::date)', [
      startDay,
      endDay,
    ]);
  }

  /**
   * Tells whether an error looks like PostgreSQL's "no partition of relation
   * ... found for row" failure on the heartbeat_events table. Detection is
   * based on the error message text.
   *
   * @param {*} error - Error thrown by a query.
   * @returns {boolean}
   */
  isMissingPartitionError(error) {
    const msg = String(error?.message ?? '');
    return msg.includes('no partition of relation') && msg.includes('heartbeat_events');
  }

  /**
   * v2 detail-table write path, intended for heartbeat fields coming from Kafka.
   *
   * Strategy:
   *  1. Ensure partitions for the batch's timestamp range, then insert the
   *     whole batch in one transaction (retried up to `config.retryAttempts`
   *     times, `config.retryDelay` ms apart, minimum 250 ms).
   *  2. On a missing-partition error, pre-create partitions for the last 7
   *     days through the configured future window and retry the batch once
   *     more without consuming a regular attempt.
   *  3. If the batch still fails, fall back to inserting row by row.
   *
   * @param {object|object[]} events - One event or an array of events whose
   *   properties are named after the table columns.
   * @returns {Promise<undefined|{insertedCount: number, failedRecords?: Array<{error: *, record: object}>, batchError?: *}>}
   *   `undefined` for an empty batch; `{ insertedCount }` when the batch
   *   insert succeeded; otherwise the per-row fallback result.
   * @throws The last batch error when the per-row fallback inserted nothing.
   */
  async insertHeartbeatEvents(events) {
    if (!Array.isArray(events)) {
      events = [events];
    }
    if (events.length === 0) return;

    const columnList = HEARTBEAT_EVENT_COLUMNS.join(', ');

    // Builds one row of bind values in HEARTBEAT_EVENT_COLUMNS order.
    const toRowValues = (e) => [
      e.ts_ms,
      e.write_ts_ms ?? Date.now(),
      e.hotel_id,
      e.room_id,
      e.device_id,
      e.ip,
      e.power_state,
      e.guest_type,
      e.cardless_state,
      e.service_mask,
      e.pms_state,
      e.carbon_state,
      e.device_count,
      e.comm_seq,
      // Array columns accept only real arrays; anything else is stored as NULL.
      ...HEARTBEAT_EVENT_ARRAY_COLUMNS.map((name) => (Array.isArray(e[name]) ? e[name] : null)),
      e.extra ?? null,
    ];

    // "($k+1, $k+2, ...)" for the row at rowIndex.
    const rowPlaceholders = (rowIndex) => {
      const base = rowIndex * HEARTBEAT_EVENT_COLUMNS.length;
      const slots = HEARTBEAT_EVENT_COLUMNS.map((_, colIndex) => `$${base + colIndex + 1}`);
      return `(${slots.join(', ')})`;
    };

    const values = [];
    const placeholders = events
      .map((e, rowIndex) => {
        values.push(...toRowValues(e));
        return rowPlaceholders(rowIndex);
      })
      .join(', ');

    const sql = `INSERT INTO heartbeat.heartbeat_events (${columnList}) VALUES ${placeholders}`;
    const singleSql = `INSERT INTO heartbeat.heartbeat_events (${columnList}) VALUES ${rowPlaceholders(0)}`;

    // Timestamp bounds of the batch. Non-positive values are skipped: the
    // table rejects them anyway (ts_ms > 0), and Number(null) === 0 would
    // otherwise request partitions all the way back to 1970. A plain loop is
    // used instead of Math.min(...array) so very large batches cannot exceed
    // the engine's argument limit.
    let tsMin = Infinity;
    let tsMax = -Infinity;
    for (const e of events) {
      const ts = Number(e.ts_ms);
      if (!Number.isFinite(ts) || ts <= 0) continue;
      if (ts < tsMin) tsMin = ts;
      if (ts > tsMax) tsMax = ts;
    }
    const hasTsRange = tsMin <= tsMax;

    const runInsertOnce = async () => {
      if (hasTsRange) {
        await this.ensurePartitionsForTsRange(tsMin, tsMax);
      }

      const client = await this.pool.connect();
      try {
        await client.query('BEGIN');
        const res = await client.query(sql, values);
        const insertedCount = Number(res?.rowCount ?? 0);
        if (insertedCount !== events.length) {
          throw new Error(`insert rowCount mismatch: expect=${events.length} actual=${insertedCount}`);
        }
        await client.query('COMMIT');
        return { insertedCount };
      } catch (error) {
        try {
          await client.query('ROLLBACK');
        } catch (rollbackError) {
          console.error('[db] rollback failed:', rollbackError);
        }
        throw error;
      } finally {
        client.release();
      }
    };

    const retryAttempts = Number(this.config?.retryAttempts ?? 0);
    const configuredDelay = Number(this.config?.retryDelay ?? 1000);
    // A non-numeric delay would become NaN and make setTimeout fire at once.
    const retryDelay = Math.max(250, Number.isFinite(configuredDelay) ? configuredDelay : 1000);
    const maxAttempts = retryAttempts > 0 ? retryAttempts : 1;

    let lastError = null;
    let partitionRetryUsed = false;
    let attempt = 1;
    while (attempt <= maxAttempts) {
      try {
        return await runInsertOnce();
      } catch (error) {
        lastError = error;
      }

      if (this.isMissingPartitionError(lastError)) {
        console.warn('[db] 检测到缺分区写入失败,执行兜底预创建并重试一次');
        let repaired = false;
        try {
          await this.ensurePartitionsForRange({
            startDayOffset: -7,
            endDayOffset: this.getPartitionFutureDays(),
          });
          repaired = true;
        } catch (repairError) {
          // Keep going: the regular retries and the per-row fallback below
          // must still get their chance.
          console.error('[db] fallback partition creation failed:', repairError);
        }

        // Honour the "retry once" promise even when no regular attempts
        // remain: the first successful repair grants one immediate retry
        // that does not count as an attempt.
        if (repaired && !partitionRetryUsed) {
          partitionRetryUsed = true;
          continue;
        }
      }

      if (attempt < maxAttempts) {
        await new Promise((resolve) => setTimeout(resolve, retryDelay));
      }
      attempt += 1;
    }

    // Batch insert failed for good: salvage what we can row by row.
    const failedRecords = [];
    let insertedCount = 0;

    console.error('[db] 批量写入失败,已切换为逐条写入:', lastError);

    for (const event of events) {
      try {
        await this.pool.query(singleSql, toRowValues(event));
        insertedCount += 1;
      } catch (error) {
        failedRecords.push({ error, record: event });
      }
    }

    if (insertedCount === 0 && failedRecords.length === events.length) {
      throw lastError;
    }

    return { insertedCount, failedRecords, batchError: lastError };
  }

  /**
   * Inserts one or more rows into the legacy `heartbeat` table.
   *
   * @param {object|object[]} data - Row(s) with `component_id`, `status`,
   *   `timestamp` and `data` properties.
   * @returns {Promise<undefined|{insertedCount: number}>} `undefined` for an
   *   empty array, otherwise the number of inserted rows.
   * @throws Rethrows any database error.
   */
  async insertHeartbeatData(data) {
    try {
      const rows = Array.isArray(data) ? data : [data];
      if (rows.length === 0) {
        return;
      }

      // Build the multi-row insert statement: four placeholders per row.
      const tuples = rows.map((_, index) => {
        const base = index * 4;
        return `($${base + 1}, $${base + 2}, $${base + 3}, $${base + 4})`;
      });
      const query = {
        text: `
          INSERT INTO heartbeat (component_id, status, timestamp, data)
          VALUES ${tuples.join(', ')}
        `,
        values: rows.flatMap((item) => [
          item.component_id,
          item.status,
          item.timestamp,
          item.data,
        ]),
      };

      const res = await this.pool.query(query);
      console.log(`成功插入 ${rows.length} 条心跳数据`);
      return { insertedCount: Number(res?.rowCount ?? rows.length) };
    } catch (error) {
      console.error('插入心跳数据失败:', error);
      throw error;
    }
  }

  /**
   * Fetches the most recent legacy heartbeat row of a component.
   *
   * @param {string} componentId - Component identifier.
   * @returns {Promise<object|null>} The newest row, or null when none exists.
   * @throws Rethrows any database error.
   */
  async getLatestHeartbeat(componentId) {
    try {
      const query = {
        text: `
          SELECT * FROM heartbeat
          WHERE component_id = $1
          ORDER BY timestamp DESC
          LIMIT 1
        `,
        values: [componentId],
      };

      const result = await this.pool.query(query);
      return result.rows[0] || null;
    } catch (error) {
      console.error('查询最新心跳数据失败:', error);
      throw error;
    }
  }

  /**
   * Fetches the legacy heartbeat rows of a component within a time window
   * (inclusive on both ends), newest first.
   *
   * @param {string} componentId - Component identifier.
   * @param {*} startTime - Lower bound passed to `BETWEEN`.
   * @param {*} endTime - Upper bound passed to `BETWEEN`.
   * @returns {Promise<object[]>} Matching rows.
   * @throws Rethrows any database error.
   */
  async getHeartbeatHistory(componentId, startTime, endTime) {
    try {
      const query = {
        text: `
          SELECT * FROM heartbeat
          WHERE component_id = $1
          AND timestamp BETWEEN $2 AND $3
          ORDER BY timestamp DESC
        `,
        values: [componentId, startTime, endTime],
      };

      const result = await this.pool.query(query);
      return result.rows;
    } catch (error) {
      console.error('查询心跳历史数据失败:', error);
      throw error;
    }
  }
}

export { DatabaseManager };