Files
Web_BLS_Heartbeat_Server/src/db/databaseManager.js
XuJiacheng ad270bd936 feat(heartbeat): 添加版本号字段并处理亮度值-1为NULL
- 在心跳事件表中新增 version 字段,用于存储版本号信息
- 将 bright_g 字段的 -1 值映射为数据库中的 NULL,避免语义混淆
- 更新相关文档、数据库迁移脚本和测试用例
2026-01-28 17:47:05 +08:00

640 lines
22 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// 数据库管理器模块
import { Pool } from 'pg';
/**
 * PostgreSQL access layer for heartbeat data.
 *
 * Owns a connection pool, bootstraps the legacy `public.heartbeat` table and
 * the day-partitioned `heartbeat.heartbeat_events` table, keeps daily
 * partitions pre-created on a timer, and exposes insert/query helpers.
 */
class DatabaseManager {
  /**
   * @param {object} config - Connection settings (`host`, `port`, `user`,
   *   `password`, `database`, `maxConnections`, `idleTimeoutMillis`) plus the
   *   optional `partitionMaintenance`, `retryAttempts` and `retryDelay` keys
   *   read by the methods below.
   */
  constructor(config) {
    this.config = config;
    this.pool = null;
    this.partitionMaintenanceTimer = null;
  }

  /**
   * Creates the pool, verifies connectivity, initializes the schema,
   * pre-creates partitions and starts periodic partition maintenance.
   *
   * If any step fails the pool is closed again, so a failed connect does not
   * leave open connections behind.
   *
   * @returns {Promise<void>}
   * @throws Rethrows the first error raised by any step.
   */
  async connect() {
    try {
      // Create the connection pool.
      this.pool = new Pool({
        host: this.config.host,
        port: this.config.port,
        user: this.config.user,
        password: this.config.password,
        database: this.config.database,
        max: this.config.maxConnections,
        idleTimeoutMillis: this.config.idleTimeoutMillis,
      });
      // The pool emits 'error' on behalf of idle clients. An EventEmitter
      // 'error' without a listener is thrown, which would take the process down.
      this.pool.on('error', (poolError) => {
        console.error('[db] idle pool client error:', poolError);
      });
      // Smoke-test the connection.
      const client = await this.pool.connect();
      client.release();
      console.log('数据库连接池创建成功');
      // Create tables, indexes and helper functions.
      await this.initTables();
      // Partition maintenance: pre-create on startup, then refresh on a timer.
      await this.ensurePartitionsForRange({
        startDayOffset: -1,
        endDayOffset: this.getPartitionFutureDays(),
      });
      this.startPartitionMaintenance();
    } catch (error) {
      console.error('数据库连接失败:', error);
      // Release whatever was acquired before the failure.
      this.stopPartitionMaintenance();
      const failedPool = this.pool;
      this.pool = null;
      if (failedPool) {
        try {
          await failedPool.end();
        } catch (endError) {
          console.error('[db] failed to close pool after connect error:', endError);
        }
      }
      throw error;
    }
  }

  /**
   * Stops partition maintenance and closes the pool. Safe to call repeatedly:
   * the pool reference is cleared before it is ended, so a second call is a
   * no-op instead of ending the same pool twice.
   *
   * @returns {Promise<void>}
   * @throws Rethrows any error raised while ending the pool.
   */
  async disconnect() {
    try {
      this.stopPartitionMaintenance();
      const pool = this.pool;
      if (pool) {
        this.pool = null;
        await pool.end();
        console.log('数据库连接池已关闭');
      }
    } catch (error) {
      console.error('关闭数据库连接池失败:', error);
      throw error;
    }
  }

  /**
   * Creates (idempotently) the legacy table, the partitioned v2 events table,
   * its indexes and the partition helper functions, then normalizes the `ip`
   * and `room_id` column types.
   *
   * NOTE(review): insertHeartbeatEvents writes a `version` column, but the DDL
   * below never creates it. Presumably an external migration adds it; confirm,
   * otherwise inserts fail on a database bootstrapped only by this method.
   *
   * @returns {Promise<void>}
   * @throws Rethrows any error raised by the DDL statements.
   */
  async initTables() {
    try {
      // Legacy table public.heartbeat is kept so existing call paths keep working.
      const legacyTableQuery = `
CREATE TABLE IF NOT EXISTS public.heartbeat (
id SERIAL PRIMARY KEY,
component_id VARCHAR(50) NOT NULL,
status VARCHAR(20) NOT NULL,
timestamp TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
data JSONB,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_heartbeat_component_id ON public.heartbeat(component_id);
CREATE INDEX IF NOT EXISTS idx_heartbeat_timestamp ON public.heartbeat(timestamp);
`;
      // v2: high-throughput, day-partitioned table living in the heartbeat schema.
      const v2SchemaQuery = `
BEGIN;
CREATE EXTENSION IF NOT EXISTS pgcrypto;
CREATE SCHEMA IF NOT EXISTS heartbeat;
CREATE TABLE IF NOT EXISTS heartbeat.heartbeat_events (
guid varchar(32) NOT NULL DEFAULT replace(gen_random_uuid()::text, '-', ''),
ts_ms bigint NOT NULL,
write_ts_ms bigint DEFAULT (EXTRACT(EPOCH FROM clock_timestamp()) * 1000)::bigint,
hotel_id int2 NOT NULL,
room_id varchar(50) NOT NULL,
device_id varchar(64) NOT NULL,
ip varchar(21) NOT NULL,
power_state int2 NOT NULL,
guest_type int2 NOT NULL,
cardless_state int2 NOT NULL,
service_mask bigint NOT NULL,
pms_state int2 NOT NULL,
carbon_state int2 NOT NULL,
device_count int2 NOT NULL,
comm_seq int4 NOT NULL,
insert_card int2,
bright_g int2,
elec_address text[],
air_address text[],
voltage double precision[],
ampere double precision[],
power double precision[],
phase text[],
energy double precision[],
sum_energy double precision[],
state int2[],
model int2[],
speed int2[],
set_temp int2[],
now_temp int2[],
solenoid_valve int2[],
extra jsonb,
CONSTRAINT heartbeat_events_pk PRIMARY KEY (ts_ms, guid),
CONSTRAINT chk_guid_32_hex CHECK (guid ~ '^[0-9a-f]{32}$'),
CONSTRAINT chk_ts_ms_positive CHECK (ts_ms > 0),
CONSTRAINT chk_hotel_id_range CHECK (hotel_id >= 0 AND hotel_id <= 32767),
CONSTRAINT chk_room_id_len CHECK (char_length(room_id) > 0 AND char_length(room_id) <= 50),
CONSTRAINT chk_power_state_range CHECK (power_state >= 0 AND power_state <= 32767),
CONSTRAINT chk_guest_type_range CHECK (guest_type >= 0 AND guest_type <= 32767),
CONSTRAINT chk_cardless_state_range CHECK (cardless_state >= 0 AND cardless_state <= 32767),
CONSTRAINT chk_pms_state_range CHECK (pms_state >= 0 AND pms_state <= 32767),
CONSTRAINT chk_carbon_state_range CHECK (carbon_state >= 0 AND carbon_state <= 32767),
CONSTRAINT chk_device_count_range CHECK (device_count >= 0 AND device_count <= 32767),
CONSTRAINT chk_comm_seq_range CHECK (comm_seq >= 0)
)
PARTITION BY RANGE (ts_ms);
ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS write_ts_ms bigint;
ALTER TABLE heartbeat.heartbeat_events ALTER COLUMN write_ts_ms SET DEFAULT (EXTRACT(EPOCH FROM clock_timestamp()) * 1000)::bigint;
ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS elec_address text[];
ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS air_address text[];
ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS voltage double precision[];
ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS ampere double precision[];
ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS power double precision[];
ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS phase text[];
ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS energy double precision[];
ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS sum_energy double precision[];
ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS state int2[];
ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS model int2[];
ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS speed int2[];
ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS set_temp int2[];
ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS now_temp int2[];
ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS solenoid_valve int2[];
ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS insert_card int2;
ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS bright_g int2;
CREATE INDEX IF NOT EXISTS idx_heartbeat_events_hotel_id ON heartbeat.heartbeat_events (hotel_id);
CREATE INDEX IF NOT EXISTS idx_heartbeat_events_power_state ON heartbeat.heartbeat_events (power_state);
CREATE INDEX IF NOT EXISTS idx_heartbeat_events_guest_type ON heartbeat.heartbeat_events (guest_type);
CREATE INDEX IF NOT EXISTS idx_heartbeat_events_device_id ON heartbeat.heartbeat_events (device_id);
CREATE INDEX IF NOT EXISTS idx_heartbeat_events_service_mask_brin ON heartbeat.heartbeat_events USING BRIN (service_mask);
CREATE INDEX IF NOT EXISTS idx_service_mask_first_bit
ON heartbeat.heartbeat_events ((service_mask & 1));
CREATE INDEX IF NOT EXISTS idx_heartbeat_events_hotel_ts ON heartbeat.heartbeat_events (hotel_id, ts_ms);
CREATE INDEX IF NOT EXISTS idx_heartbeat_events_elec_address_gin ON heartbeat.heartbeat_events USING GIN (elec_address);
CREATE INDEX IF NOT EXISTS idx_heartbeat_events_air_address_gin ON heartbeat.heartbeat_events USING GIN (air_address);
CREATE INDEX IF NOT EXISTS idx_heartbeat_events_state_gin ON heartbeat.heartbeat_events USING GIN (state);
CREATE INDEX IF NOT EXISTS idx_heartbeat_events_model_gin ON heartbeat.heartbeat_events USING GIN (model);
-- 分区预创建函数(按 Asia/Shanghai 自然日)
CREATE OR REPLACE FUNCTION heartbeat.day_start_ms_shanghai(p_day date)
RETURNS bigint
LANGUAGE sql
IMMUTABLE
AS $$
SELECT (
EXTRACT(EPOCH FROM (p_day::timestamp AT TIME ZONE 'Asia/Shanghai'))
* 1000
)::bigint;
$$;
CREATE OR REPLACE FUNCTION heartbeat.partition_name_for_day(p_day date)
RETURNS text
LANGUAGE sql
IMMUTABLE
AS $$
SELECT format('heartbeat_events_%s', to_char(p_day, 'YYYYMMDD'));
$$;
CREATE OR REPLACE FUNCTION heartbeat.create_daily_partition(p_day date)
RETURNS void
LANGUAGE plpgsql
AS $$
DECLARE
start_ms bigint;
end_ms bigint;
part_name text;
BEGIN
start_ms := heartbeat.day_start_ms_shanghai(p_day);
end_ms := start_ms + 86400000;
part_name := heartbeat.partition_name_for_day(p_day);
IF to_regclass(format('heartbeat.%I', part_name)) IS NOT NULL THEN
RETURN;
END IF;
EXECUTE format(
'CREATE TABLE heartbeat.%I PARTITION OF heartbeat.heartbeat_events FOR VALUES FROM (%s) TO (%s);',
part_name, start_ms, end_ms
);
EXECUTE format('CREATE INDEX IF NOT EXISTS %I ON heartbeat.%I (hotel_id);', 'idx_'||part_name||'_hotel_id', part_name);
EXECUTE format('CREATE INDEX IF NOT EXISTS %I ON heartbeat.%I (power_state);', 'idx_'||part_name||'_power_state', part_name);
EXECUTE format('CREATE INDEX IF NOT EXISTS %I ON heartbeat.%I (guest_type);', 'idx_'||part_name||'_guest_type', part_name);
EXECUTE format('CREATE INDEX IF NOT EXISTS %I ON heartbeat.%I (device_id);', 'idx_'||part_name||'_device_id', part_name);
EXECUTE format('CREATE INDEX IF NOT EXISTS %I ON heartbeat.%I USING BRIN (service_mask);', 'idx_'||part_name||'_service_mask_brin', part_name);
EXECUTE format('CREATE INDEX IF NOT EXISTS %I ON heartbeat.%I ((service_mask & 1));', 'idx_'||part_name||'_service_mask_first_bit', part_name);
EXECUTE format('CREATE INDEX IF NOT EXISTS %I ON heartbeat.%I (hotel_id, ts_ms);', 'idx_'||part_name||'_hotel_ts', part_name);
END;
$$;
CREATE OR REPLACE FUNCTION heartbeat.ensure_partitions(p_start_day date, p_end_day date)
RETURNS void
LANGUAGE plpgsql
AS $$
DECLARE
d date;
BEGIN
IF p_end_day < p_start_day THEN
RAISE EXCEPTION 'p_end_day (%) must be >= p_start_day (%)', p_end_day, p_start_day;
END IF;
d := p_start_day;
WHILE d <= p_end_day LOOP
PERFORM heartbeat.create_daily_partition(d);
d := d + 1;
END LOOP;
END;
$$;
COMMIT;
`;
      await this.pool.query(legacyTableQuery);
      await this.pool.query(v2SchemaQuery);
      await this.ensureIpColumnVarchar();
      await this.ensureRoomIdColumnVarchar();
      console.log('数据库表初始化成功');
    } catch (error) {
      console.error('数据库表初始化失败:', error);
      throw error;
    }
  }

  /**
   * Converts `heartbeat_events.room_id` to `varchar(50)` when it currently has
   * any other type, replacing its CHECK constraint with the length check.
   *
   * The constraint drops, the type change and the constraint re-creation run
   * in one transaction on one client, so a failed type change cannot leave the
   * table without its CHECK constraint.
   *
   * @returns {Promise<void>}
   * @throws Rethrows any error raised by the migration after rolling back.
   */
  async ensureRoomIdColumnVarchar() {
    const res = await this.pool.query(
`
SELECT format_type(a.atttypid, a.atttypmod) AS type
FROM pg_attribute a
JOIN pg_class c ON c.oid = a.attrelid
JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE n.nspname = 'heartbeat'
AND c.relname = 'heartbeat_events'
AND a.attname = 'room_id'
AND a.attnum > 0
AND NOT a.attisdropped
`
    );
    const type = String(res?.rows?.[0]?.type ?? '').toLowerCase();
    // Column not found, or already varchar: nothing to migrate.
    if (!type || type.startsWith('character varying')) return;

    const client = await this.pool.connect();
    try {
      await client.query('BEGIN');
      await client.query('ALTER TABLE heartbeat.heartbeat_events DROP CONSTRAINT IF EXISTS chk_room_id_range');
      await client.query('ALTER TABLE heartbeat.heartbeat_events DROP CONSTRAINT IF EXISTS chk_room_id_len');
      await client.query(
`ALTER TABLE heartbeat.heartbeat_events
ALTER COLUMN room_id TYPE varchar(50)
USING room_id::text`
      );
      await client.query(
        'ALTER TABLE heartbeat.heartbeat_events ADD CONSTRAINT chk_room_id_len CHECK (char_length(room_id) > 0 AND char_length(room_id) <= 50)'
      );
      await client.query('COMMIT');
    } catch (error) {
      try {
        await client.query('ROLLBACK');
      } catch (rollbackError) {
        console.error('[db] rollback failed:', rollbackError);
      }
      throw error;
    } finally {
      client.release();
    }
  }

  /**
   * Converts `heartbeat_events.ip` from `inet` to `varchar(21)`. Any other
   * current type (including varchar, or a missing column) is left alone.
   *
   * Uses `host(ip)` rather than `ip::text`: casting inet to text appends the
   * netmask length (for example `192.168.1.5/32`), which would store a
   * different value than the bare address.
   *
   * @returns {Promise<void>}
   */
  async ensureIpColumnVarchar() {
    const res = await this.pool.query(
`
SELECT format_type(a.atttypid, a.atttypmod) AS type
FROM pg_attribute a
JOIN pg_class c ON c.oid = a.attrelid
JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE n.nspname = 'heartbeat'
AND c.relname = 'heartbeat_events'
AND a.attname = 'ip'
AND a.attnum > 0
AND NOT a.attisdropped
`
    );
    const type = String(res?.rows?.[0]?.type ?? '').toLowerCase();
    // Only an inet column is migrated; empty and varchar types fall out here too.
    if (!type.startsWith('inet')) return;
    await this.pool.query(
`ALTER TABLE heartbeat.heartbeat_events
ALTER COLUMN ip TYPE varchar(21)
USING host(ip)`
    );
  }

  /**
   * Quotes a value as a SQL identifier, doubling embedded double quotes.
   *
   * @param {*} id - Identifier; coerced with `String()`.
   * @returns {string} The double-quoted identifier.
   */
  escapeIdentifier(id) {
    return `"${String(id).replace(/"/g, '""')}"`;
  }

  /**
   * Resolves partition maintenance settings with defaults.
   *
   * `futureDays` is truncated to an integer and floored at 0 because it is
   * bound to a `::int` SQL parameter and used as the end of a day range that
   * starts at -1; a fractional or negative value would make those queries fail.
   *
   * @returns {{enabled: boolean, futureDays: number, intervalHours: number}}
   */
  getPartitionConfig() {
    const cfg = this.config?.partitionMaintenance ?? {};
    const futureDays = Number.isFinite(cfg.futureDays)
      ? Math.max(0, Math.trunc(cfg.futureDays))
      : 30;
    const intervalHours = Number.isFinite(cfg.intervalHours) ? cfg.intervalHours : 6;
    return {
      enabled: cfg.enabled !== false,
      futureDays,
      intervalHours,
    };
  }

  /**
   * @returns {number} How many days ahead of today partitions are pre-created.
   */
  getPartitionFutureDays() {
    return this.getPartitionConfig().futureDays;
  }

  /**
   * Ensures daily partitions exist for a range of days relative to the current
   * Asia/Shanghai date.
   *
   * @param {{startDayOffset?: number, endDayOffset?: number}} [range] - Day
   *   offsets from today; each defaults to 0 and is truncated to an integer.
   * @returns {Promise<void>}
   */
  async ensurePartitionsForRange({ startDayOffset, endDayOffset } = {}) {
    // The SQL casts both parameters to int, so fractional offsets are truncated here.
    const startOffset = Math.trunc(Number(startDayOffset ?? 0));
    const endOffset = Math.trunc(Number(endDayOffset ?? 0));
    await this.pool.query(
      "SELECT heartbeat.ensure_partitions(((now() AT TIME ZONE 'Asia/Shanghai')::date) + $1::int, ((now() AT TIME ZONE 'Asia/Shanghai')::date) + $2::int)",
      [startOffset, endOffset]
    );
  }

  /**
   * Starts the periodic partition pre-creation timer unless maintenance is
   * disabled or a timer is already running.
   *
   * The interval is clamped to [60 s, 2^31-1 ms]: Node timers reset any larger
   * delay to 1 ms, which would turn a very long interval into a tight loop.
   *
   * @returns {void}
   */
  startPartitionMaintenance() {
    const cfg = this.getPartitionConfig();
    if (!cfg.enabled || this.partitionMaintenanceTimer) {
      return;
    }
    const maxTimerDelayMs = 2_147_483_647;
    const requestedMs = cfg.intervalHours * 60 * 60 * 1000;
    const intervalMs = Math.min(maxTimerDelayMs, Math.max(60_000, requestedMs));
    this.partitionMaintenanceTimer = setInterval(async () => {
      try {
        await this.ensurePartitionsForRange({
          startDayOffset: -1,
          endDayOffset: this.getPartitionFutureDays(),
        });
        console.log('[db] 分区预创建维护完成');
      } catch (err) {
        // A failed run is logged only; the next tick tries again.
        console.error('[db] 分区预创建维护失败:', err);
      }
    }, intervalMs);
  }

  /**
   * Stops the partition maintenance timer if one is running.
   *
   * @returns {void}
   */
  stopPartitionMaintenance() {
    if (this.partitionMaintenanceTimer) {
      clearInterval(this.partitionMaintenanceTimer);
      this.partitionMaintenanceTimer = null;
    }
  }

  /**
   * Formats an epoch-millisecond timestamp as the `YYYY-MM-DD` calendar date
   * in the Asia/Shanghai time zone.
   *
   * The string is assembled from `formatToParts`, so the result does not
   * depend on how a locale happens to lay out its dates.
   *
   * @param {number|string} tsMs - Epoch milliseconds; coerced with `Number()`.
   * @returns {string} Date in `YYYY-MM-DD` form.
   * @throws {RangeError} If `tsMs` does not denote a valid date.
   */
  formatShanghaiDate(tsMs) {
    const date = new Date(Number(tsMs));
    if (Number.isNaN(date.getTime())) {
      throw new RangeError(`Invalid time value: ${String(tsMs)}`);
    }
    const parts = new Intl.DateTimeFormat('en-CA', {
      timeZone: 'Asia/Shanghai',
      year: 'numeric',
      month: '2-digit',
      day: '2-digit',
    }).formatToParts(date);
    const pick = (type) => parts.find((part) => part.type === type)?.value ?? '';
    return `${pick('year').padStart(4, '0')}-${pick('month')}-${pick('day')}`;
  }

  /**
   * Ensures daily partitions exist for every Shanghai day between two
   * timestamps (inclusive).
   *
   * @param {number} tsMin - Earliest timestamp, epoch milliseconds.
   * @param {number} tsMax - Latest timestamp, epoch milliseconds.
   * @returns {Promise<void>}
   */
  async ensurePartitionsForTsRange(tsMin, tsMax) {
    const startDay = this.formatShanghaiDate(tsMin);
    const endDay = this.formatShanghaiDate(tsMax);
    await this.pool.query('SELECT heartbeat.ensure_partitions($1::date, $2::date)', [
      startDay,
      endDay,
    ]);
  }

  /**
   * Tells whether an error's message says that no partition of
   * `heartbeat_events` accepts the row.
   *
   * @param {*} error - Error-like value; only `message` is inspected.
   * @returns {boolean}
   */
  isMissingPartitionError(error) {
    const msg = String(error?.message ?? '');
    return msg.includes('no partition of relation') && msg.includes('heartbeat_events');
  }

  /**
   * Writes rows to the v2 detail table `heartbeat.heartbeat_events`.
   *
   * The batch is first attempted as one multi-row INSERT inside a transaction,
   * retried up to `config.retryAttempts` times. If every attempt fails, rows
   * are inserted one by one so a single bad record does not discard the batch.
   *
   * @param {object|object[]} events - One event or an array of events.
   * @returns {Promise<undefined|{insertedCount: number, failedRecords?: Array<{error: Error, record: object}>, batchError?: Error}>}
   *   `undefined` for an empty array; `{ insertedCount }` when the batch insert
   *   succeeds; otherwise the per-row outcome plus the last batch error.
   * @throws The last batch error when the per-row fallback inserts nothing.
   */
  async insertHeartbeatEvents(events) {
    const batch = Array.isArray(events) ? events : [events];
    if (batch.length === 0) return;
    const columns = [
      'ts_ms',
      'write_ts_ms',
      'hotel_id',
      'room_id',
      'device_id',
      'ip',
      'power_state',
      'guest_type',
      'cardless_state',
      'service_mask',
      'pms_state',
      'carbon_state',
      'device_count',
      'comm_seq',
      'insert_card',
      'bright_g',
      'version',
      'elec_address',
      'air_address',
      'voltage',
      'ampere',
      'power',
      'phase',
      'energy',
      'sum_energy',
      'state',
      'model',
      'speed',
      'set_temp',
      'now_temp',
      'solenoid_valve',
      'extra',
    ];
    const arrayOrNull = (value) => (Array.isArray(value) ? value : null);
    // Must stay in the same order as `columns`.
    const toRowValues = (e) => [
      e.ts_ms,
      e.write_ts_ms ?? Date.now(),
      e.hotel_id,
      e.room_id,
      e.device_id,
      e.ip,
      e.power_state,
      e.guest_type,
      e.cardless_state,
      e.service_mask,
      e.pms_state,
      e.carbon_state,
      e.device_count,
      e.comm_seq,
      e.insert_card ?? null,
      // A brightness of -1 is stored as NULL.
      (e.bright_g === -1 || e.bright_g === '-1') ? null : (e.bright_g ?? null),
      e.version ?? null,
      arrayOrNull(e.elec_address),
      arrayOrNull(e.air_address),
      arrayOrNull(e.voltage),
      arrayOrNull(e.ampere),
      arrayOrNull(e.power),
      arrayOrNull(e.phase),
      arrayOrNull(e.energy),
      arrayOrNull(e.sum_energy),
      arrayOrNull(e.state),
      arrayOrNull(e.model),
      arrayOrNull(e.speed),
      arrayOrNull(e.set_temp),
      arrayOrNull(e.now_temp),
      arrayOrNull(e.solenoid_valve),
      e.extra ?? null,
    ];

    const columnList = columns.join(', ');
    const values = [];
    const rowPlaceholders = [];
    batch.forEach((event, rowIndex) => {
      const base = rowIndex * columns.length;
      values.push(...toRowValues(event));
      rowPlaceholders.push(`(${columns.map((_, colIndex) => `$${base + colIndex + 1}`).join(', ')})`);
    });
    const sql = `INSERT INTO heartbeat.heartbeat_events (${columnList}) VALUES ${rowPlaceholders.join(', ')}`;
    const singleSql = `INSERT INTO heartbeat.heartbeat_events (${columnList}) VALUES (${columns
      .map((_, i) => `$${i + 1}`)
      .join(', ')})`;

    // Timestamp bounds used for partition pre-creation. A plain loop avoids
    // spreading a large batch into Math.min/Math.max arguments. Non-positive
    // values are skipped: Number(null) is 0, and the table's CHECK rejects
    // ts_ms <= 0 anyway, so they must not stretch the range back to 1970.
    // NOTE(review): one small positive ts_ms still makes ensure_partitions
    // create every day between it and the newest row - consider bounding it.
    let tsMin = Infinity;
    let tsMax = -Infinity;
    for (const event of batch) {
      const ts = Number(event.ts_ms);
      if (!Number.isFinite(ts) || ts <= 0) continue;
      if (ts < tsMin) tsMin = ts;
      if (ts > tsMax) tsMax = ts;
    }
    const hasTsRange = tsMin <= tsMax;

    const runInsertOnce = async () => {
      if (hasTsRange) {
        await this.ensurePartitionsForTsRange(tsMin, tsMax);
      }
      const client = await this.pool.connect();
      try {
        await client.query('BEGIN');
        const res = await client.query(sql, values);
        const insertedCount = Number(res?.rowCount ?? 0);
        if (insertedCount !== batch.length) {
          throw new Error(`insert rowCount mismatch: expect=${batch.length} actual=${insertedCount}`);
        }
        await client.query('COMMIT');
        return { insertedCount };
      } catch (error) {
        try {
          await client.query('ROLLBACK');
        } catch (rollbackError) {
          console.error('[db] rollback failed:', rollbackError);
        }
        throw error;
      } finally {
        client.release();
      }
    };

    const retryAttempts = Number(this.config?.retryAttempts ?? 0);
    const retryDelay = Math.max(250, Number(this.config?.retryDelay ?? 1000));
    const maxAttempts = retryAttempts > 0 ? retryAttempts : 1;
    let lastError = null;
    for (let attempt = 1; attempt <= maxAttempts; attempt += 1) {
      try {
        return await runInsertOnce();
      } catch (error) {
        lastError = error;
        if (this.isMissingPartitionError(error)) {
          console.warn('[db] 检测到缺分区写入失败,执行兜底预创建并重试一次');
          try {
            await this.ensurePartitionsForRange({
              startDayOffset: -7,
              endDayOffset: this.getPartitionFutureDays(),
            });
          } catch (partitionError) {
            // Best effort only: a failure here must not skip the remaining
            // retries or the per-row fallback below.
            console.error('[db] fallback partition pre-creation failed:', partitionError);
          }
        }
        if (attempt < maxAttempts) {
          await new Promise((resolve) => setTimeout(resolve, retryDelay));
        }
      }
    }

    // Every batch attempt failed: fall back to inserting row by row.
    const failedRecords = [];
    let insertedCount = 0;
    console.error('[db] 批量写入失败,已切换为逐条写入:', lastError);
    for (const event of batch) {
      try {
        await this.pool.query(singleSql, toRowValues(event));
        insertedCount += 1;
      } catch (error) {
        failedRecords.push({ error, record: event });
      }
    }
    if (insertedCount === 0 && failedRecords.length === batch.length) {
      throw lastError;
    }
    return { insertedCount, failedRecords, batchError: lastError };
  }

  /**
   * Inserts rows into the legacy `heartbeat` table with one multi-row INSERT.
   *
   * @param {object|object[]} data - One record or an array of records carrying
   *   `component_id`, `status`, `timestamp` and `data`.
   * @returns {Promise<undefined|{insertedCount: number}>} `undefined` for an
   *   empty array, otherwise the number of rows reported by the driver.
   * @throws Rethrows any error raised by the query.
   */
  async insertHeartbeatData(data) {
    try {
      const records = Array.isArray(data) ? data : [data];
      if (records.length === 0) {
        return;
      }
      // Build the multi-row statement: four placeholders per record.
      const values = records.map((item) => [
        item.component_id,
        item.status,
        item.timestamp,
        item.data,
      ]);
      const placeholders = values
        .map((_, index) => `($${index * 4 + 1}, $${index * 4 + 2}, $${index * 4 + 3}, $${index * 4 + 4})`)
        .join(', ');
      const query = {
        text: `
INSERT INTO heartbeat (component_id, status, timestamp, data)
VALUES ${placeholders}
`,
        values: values.flat(),
      };
      const res = await this.pool.query(query);
      console.log(`成功插入 ${records.length} 条心跳数据`);
      return { insertedCount: Number(res?.rowCount ?? records.length) };
    } catch (error) {
      console.error('插入心跳数据失败:', error);
      throw error;
    }
  }

  /**
   * Fetches the most recent legacy heartbeat row for a component.
   *
   * @param {string} componentId - Value matched against `component_id`.
   * @returns {Promise<object|null>} The newest row, or `null` when none exists.
   * @throws Rethrows any error raised by the query.
   */
  async getLatestHeartbeat(componentId) {
    try {
      const query = {
        text: `
SELECT * FROM heartbeat
WHERE component_id = $1
ORDER BY timestamp DESC
LIMIT 1
`,
        values: [componentId],
      };
      const result = await this.pool.query(query);
      return result.rows[0] || null;
    } catch (error) {
      console.error('查询最新心跳数据失败:', error);
      throw error;
    }
  }

  /**
   * Fetches legacy heartbeat rows for a component within a time window,
   * newest first.
   *
   * @param {string} componentId - Value matched against `component_id`.
   * @param {*} startTime - Lower bound passed to `BETWEEN` (inclusive).
   * @param {*} endTime - Upper bound passed to `BETWEEN` (inclusive).
   * @returns {Promise<object[]>} Matching rows.
   * @throws Rethrows any error raised by the query.
   */
  async getHeartbeatHistory(componentId, startTime, endTime) {
    try {
      const query = {
        text: `
SELECT * FROM heartbeat
WHERE component_id = $1
AND timestamp BETWEEN $2 AND $3
ORDER BY timestamp DESC
`,
        values: [componentId, startTime, endTime],
      };
      const result = await this.pool.query(query);
      return result.rows;
    } catch (error) {
      console.error('查询心跳历史数据失败:', error);
      throw error;
    }
  }
}
export { DatabaseManager };