feat: 移除运行时代码中的数据库初始化与分区维护逻辑

- 新增备份 SQL 脚本 `01_init_schema.sql` 和 `02_create_partitions.sql`,用于数据库结构初始化和分区预创建。
- 新增 Node.js 脚本 `run_init.js` 和 `run_ensure_partitions.js`,支持通过外部程序调用进行数据库初始化和分区维护。
- 确保数据库初始化脚本支持幂等重复执行。
- 更新文档,说明新的执行顺序和使用方法。
- 移除运行时相关的数据库初始化和分区维护配置,简化服务职责。
- 保留写入失败时的缺分区兜底逻辑,确保服务稳定性。
This commit is contained in:
2026-03-04 11:47:22 +08:00
parent b4967f4c35
commit 3b98c6239b
11 changed files with 732 additions and 390 deletions

View File

@@ -53,17 +53,6 @@ export default {
retryAttempts: 3, // 重试次数
retryDelay: 1000, // 重试延迟
// 是否启用数据库初始化与分区维护(若为 false跳过建表、分区预创建、定时分区检查
initAndPartitionEnabled: (env.DB_INIT_AND_PARTITION_ENABLED ?? 'true') === 'true',
// 分区维护方案1启动时预创建 + 周期维护
partitionMaintenance: {
enabled: true,
futureDays: 30,
intervalHours: 6
}
},
// 日志配置
logger: {
level: env.LOG_LEVEL ?? 'info',

View File

@@ -8,8 +8,6 @@ class DatabaseManager {
constructor(config) {
this.config = config;
this.pool = null;
this.partitionMaintenanceTimer = null;
}
async connect() {
@@ -36,21 +34,6 @@ class DatabaseManager {
const client = await this.pool.connect();
client.release();
console.log('数据库连接池创建成功');
// 根据配置决定是否执行初始化与分区维护
if (this.config.initAndPartitionEnabled !== false) {
// 初始化表结构
await this.initTables();
// 分区维护方案1启动时预创建 + 定时维护
await this.ensurePartitionsForRange({
startDayOffset: -1,
endDayOffset: this.getPartitionFutureDays(),
});
this.startPartitionMaintenance();
} else {
console.log('[db] 已禁用数据库初始化与分区维护DB_INIT_AND_PARTITION_ENABLED=false跳过建表、分区预创建、定时维护');
}
} catch (error) {
console.error('数据库连接失败:', error);
throw error;
@@ -73,7 +56,6 @@ class DatabaseManager {
async disconnect() {
try {
this.stopPartitionMaintenance();
if (this.pool) {
await this.pool.end();
console.log('数据库连接池已关闭');
@@ -84,360 +66,10 @@ class DatabaseManager {
}
}
async initTables() {
try {
// v2高吞吐按天分区表位于 heartbeat schema
const v2SchemaQuery = `
BEGIN;
CREATE EXTENSION IF NOT EXISTS pgcrypto;
CREATE SCHEMA IF NOT EXISTS heartbeat;
CREATE TABLE IF NOT EXISTS heartbeat.heartbeat_events (
guid varchar(32) NOT NULL DEFAULT replace(gen_random_uuid()::text, '-', ''),
ts_ms bigint NOT NULL,
write_ts_ms bigint DEFAULT (EXTRACT(EPOCH FROM clock_timestamp()) * 1000)::bigint,
hotel_id int2 NOT NULL,
room_id varchar(50) NOT NULL,
device_id varchar(64) NOT NULL,
ip varchar(21) NOT NULL,
power_state int2 NOT NULL,
guest_type int2 NOT NULL,
cardless_state int2 NOT NULL,
service_mask bigint NOT NULL,
pms_state int2 NOT NULL,
carbon_state int2 NOT NULL,
device_count int2 NOT NULL,
comm_seq int4 NOT NULL,
insert_card int2,
bright_g int2,
elec_address text[],
air_address text[],
voltage double precision[],
ampere double precision[],
power double precision[],
phase text[],
energy double precision[],
sum_energy double precision[],
state int2[],
model int2[],
speed int2[],
set_temp int2[],
now_temp int2[],
solenoid_valve int2[],
extra jsonb,
CONSTRAINT heartbeat_events_pk PRIMARY KEY (ts_ms, guid),
CONSTRAINT chk_guid_32_hex CHECK (guid ~ '^[0-9a-f]{32}$'),
CONSTRAINT chk_ts_ms_positive CHECK (ts_ms > 0),
CONSTRAINT chk_hotel_id_range CHECK (hotel_id >= 0 AND hotel_id <= 32767),
CONSTRAINT chk_room_id_len CHECK (char_length(room_id) > 0 AND char_length(room_id) <= 50),
CONSTRAINT chk_power_state_range CHECK (power_state >= 0 AND power_state <= 32767),
CONSTRAINT chk_guest_type_range CHECK (guest_type >= 0 AND guest_type <= 32767),
CONSTRAINT chk_cardless_state_range CHECK (cardless_state >= 0 AND cardless_state <= 32767),
CONSTRAINT chk_pms_state_range CHECK (pms_state >= 0 AND pms_state <= 32767),
CONSTRAINT chk_carbon_state_range CHECK (carbon_state >= 0 AND carbon_state <= 32767),
CONSTRAINT chk_device_count_range CHECK (device_count >= 0 AND device_count <= 32767),
CONSTRAINT chk_comm_seq_range CHECK (comm_seq >= 0)
)
PARTITION BY RANGE (ts_ms);
ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS write_ts_ms bigint;
ALTER TABLE heartbeat.heartbeat_events ALTER COLUMN write_ts_ms SET DEFAULT (EXTRACT(EPOCH FROM clock_timestamp()) * 1000)::bigint;
ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS elec_address text[];
ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS air_address text[];
ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS voltage double precision[];
ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS ampere double precision[];
ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS power double precision[];
ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS phase text[];
ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS energy double precision[];
ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS sum_energy double precision[];
ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS state int2[];
ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS model int2[];
ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS speed int2[];
ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS set_temp int2[];
ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS now_temp int2[];
ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS solenoid_valve int2[];
ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS insert_card int2;
ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS bright_g int2;
CREATE INDEX IF NOT EXISTS idx_heartbeat_events_hotel_id ON heartbeat.heartbeat_events (hotel_id);
CREATE INDEX IF NOT EXISTS idx_heartbeat_events_power_state ON heartbeat.heartbeat_events (power_state);
CREATE INDEX IF NOT EXISTS idx_heartbeat_events_guest_type ON heartbeat.heartbeat_events (guest_type);
CREATE INDEX IF NOT EXISTS idx_heartbeat_events_device_id ON heartbeat.heartbeat_events (device_id);
CREATE INDEX IF NOT EXISTS idx_heartbeat_events_service_mask_brin ON heartbeat.heartbeat_events USING BRIN (service_mask);
CREATE INDEX IF NOT EXISTS idx_service_mask_first_bit
ON heartbeat.heartbeat_events ((service_mask & 1));
CREATE INDEX IF NOT EXISTS idx_heartbeat_events_hotel_ts ON heartbeat.heartbeat_events (hotel_id, ts_ms);
CREATE INDEX IF NOT EXISTS idx_heartbeat_events_elec_address_gin ON heartbeat.heartbeat_events USING GIN (elec_address);
CREATE INDEX IF NOT EXISTS idx_heartbeat_events_air_address_gin ON heartbeat.heartbeat_events USING GIN (air_address);
CREATE INDEX IF NOT EXISTS idx_heartbeat_events_state_gin ON heartbeat.heartbeat_events USING GIN (state);
CREATE INDEX IF NOT EXISTS idx_heartbeat_events_model_gin ON heartbeat.heartbeat_events USING GIN (model);
-- 分区预创建函数(按 Asia/Shanghai 自然日)
CREATE OR REPLACE FUNCTION heartbeat.day_start_ms_shanghai(p_day date)
RETURNS bigint
LANGUAGE sql
IMMUTABLE
AS $$
SELECT (
EXTRACT(EPOCH FROM (p_day::timestamp AT TIME ZONE 'Asia/Shanghai'))
* 1000
)::bigint;
$$;
CREATE OR REPLACE FUNCTION heartbeat.partition_name_for_day(p_day date)
RETURNS text
LANGUAGE sql
IMMUTABLE
AS $$
SELECT format('heartbeat_events_%s', to_char(p_day, 'YYYYMMDD'));
$$;
-- 强制将分区及其索引分配到指定表空间(幂等)
CREATE OR REPLACE FUNCTION heartbeat.relocate_partition_to_tablespace(
p_schema text,
p_partition text,
p_tablespace text DEFAULT 'ts_hot'
)
RETURNS void
LANGUAGE plpgsql
AS $$
DECLARE
v_part_oid oid;
v_toast_oid oid;
r record;
BEGIN
SELECT c.oid INTO v_part_oid
FROM pg_class c
JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE n.nspname = p_schema
AND c.relname = p_partition
AND c.relkind = 'r';
IF v_part_oid IS NULL THEN
RAISE EXCEPTION 'partition %.% not found', p_schema, p_partition;
END IF;
-- 1) 分区表对象 -> 指定 tablespace
EXECUTE format('ALTER TABLE %I.%I SET TABLESPACE %I', p_schema, p_partition, p_tablespace);
-- 2) 分区全部索引 -> 指定 tablespace
FOR r IN
SELECT idxn.nspname AS index_schema, i.relname AS index_name
FROM pg_index x
JOIN pg_class i ON i.oid = x.indexrelid
JOIN pg_namespace idxn ON idxn.oid = i.relnamespace
LEFT JOIN pg_tablespace ts ON ts.oid = i.reltablespace
WHERE x.indrelid = v_part_oid
AND COALESCE(ts.spcname, 'pg_default') <> p_tablespace
LOOP
EXECUTE format('ALTER INDEX %I.%I SET TABLESPACE %I', r.index_schema, r.index_name, p_tablespace);
END LOOP;
-- 3) TOAST 表 + TOAST 索引 -> 指定 tablespace若存在
SELECT reltoastrelid INTO v_toast_oid FROM pg_class WHERE oid = v_part_oid;
IF v_toast_oid IS NOT NULL AND v_toast_oid <> 0 THEN
EXECUTE format('ALTER TABLE %s SET TABLESPACE %I', v_toast_oid::regclass, p_tablespace);
FOR r IN
SELECT idxn.nspname AS index_schema, i.relname AS index_name
FROM pg_index x
JOIN pg_class i ON i.oid = x.indexrelid
JOIN pg_namespace idxn ON idxn.oid = i.relnamespace
LEFT JOIN pg_tablespace ts ON ts.oid = i.reltablespace
WHERE x.indrelid = v_toast_oid
AND COALESCE(ts.spcname, 'pg_default') <> p_tablespace
LOOP
EXECUTE format('ALTER INDEX %I.%I SET TABLESPACE %I', r.index_schema, r.index_name, p_tablespace);
END LOOP;
END IF;
-- 4) 统计信息
EXECUTE format('ANALYZE %I.%I', p_schema, p_partition);
END;
$$;
-- 创建单日分区(幂等);父表索引自动继承到子表,无需手动建索引
CREATE OR REPLACE FUNCTION heartbeat.create_daily_partition(p_day date)
RETURNS void
LANGUAGE plpgsql
AS $$
DECLARE
start_ms bigint;
end_ms bigint;
part_name text;
BEGIN
start_ms := heartbeat.day_start_ms_shanghai(p_day);
end_ms := start_ms + 86400000;
part_name := heartbeat.partition_name_for_day(p_day);
EXECUTE format(
'CREATE TABLE IF NOT EXISTS heartbeat.%I PARTITION OF heartbeat.heartbeat_events FOR VALUES FROM (%s) TO (%s)',
part_name, start_ms, end_ms
);
PERFORM heartbeat.relocate_partition_to_tablespace('heartbeat', part_name, 'ts_hot');
END;
$$;
CREATE OR REPLACE FUNCTION heartbeat.ensure_partitions(p_start_day date, p_end_day date)
RETURNS void
LANGUAGE plpgsql
AS $$
DECLARE
d date;
BEGIN
IF p_end_day < p_start_day THEN
RAISE EXCEPTION 'p_end_day (%) must be >= p_start_day (%)', p_end_day, p_start_day;
END IF;
FOR d IN SELECT generate_series(p_start_day, p_end_day, interval '1 day')::date
LOOP
PERFORM heartbeat.create_daily_partition(d);
END LOOP;
END;
$$;
COMMIT;
`;
await this.pool.query(v2SchemaQuery);
await this.ensureIpColumnVarchar();
await this.ensureRoomIdColumnVarchar();
console.log('数据库表初始化成功');
} catch (error) {
console.error('数据库表初始化失败:', error);
throw error;
}
}
async ensureRoomIdColumnVarchar() {
const res = await this.pool.query(
`
SELECT format_type(a.atttypid, a.atttypmod) AS type
FROM pg_attribute a
JOIN pg_class c ON c.oid = a.attrelid
JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE n.nspname = 'heartbeat'
AND c.relname = 'heartbeat_events'
AND a.attname = 'room_id'
AND a.attnum > 0
AND NOT a.attisdropped
`
);
const type = String(res?.rows?.[0]?.type ?? '').toLowerCase();
if (!type) return;
if (type.startsWith('character varying')) return;
await this.pool.query('ALTER TABLE heartbeat.heartbeat_events DROP CONSTRAINT IF EXISTS chk_room_id_range');
await this.pool.query('ALTER TABLE heartbeat.heartbeat_events DROP CONSTRAINT IF EXISTS chk_room_id_len');
await this.pool.query(
`ALTER TABLE heartbeat.heartbeat_events
ALTER COLUMN room_id TYPE varchar(50)
USING room_id::text`
);
await this.pool.query(
'ALTER TABLE heartbeat.heartbeat_events ADD CONSTRAINT chk_room_id_len CHECK (char_length(room_id) > 0 AND char_length(room_id) <= 50)'
);
}
async ensureIpColumnVarchar() {
const res = await this.pool.query(
`
SELECT format_type(a.atttypid, a.atttypmod) AS type
FROM pg_attribute a
JOIN pg_class c ON c.oid = a.attrelid
JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE n.nspname = 'heartbeat'
AND c.relname = 'heartbeat_events'
AND a.attname = 'ip'
AND a.attnum > 0
AND NOT a.attisdropped
`
);
const type = String(res?.rows?.[0]?.type ?? '').toLowerCase();
if (!type) return;
if (type.startsWith('character varying')) return;
if (!type.startsWith('inet')) return;
await this.pool.query(
`ALTER TABLE heartbeat.heartbeat_events
ALTER COLUMN ip TYPE varchar(21)
USING ip::text`
);
}
escapeIdentifier(id) {
return `"${String(id).replace(/"/g, '""')}"`;
}
getPartitionConfig() {
const cfg = this.config.partitionMaintenance ?? {};
return {
enabled: cfg.enabled !== false,
futureDays: Number.isFinite(cfg.futureDays) ? cfg.futureDays : 30,
intervalHours: Number.isFinite(cfg.intervalHours) ? cfg.intervalHours : 6,
};
}
getPartitionFutureDays() {
return this.getPartitionConfig().futureDays;
}
async ensurePartitionsForRange({ startDayOffset, endDayOffset }) {
const startOffset = Number(startDayOffset ?? 0);
const endOffset = Number(endDayOffset ?? 0);
await this.pool.query(
"SELECT heartbeat.ensure_partitions(((now() AT TIME ZONE 'Asia/Shanghai')::date) + $1::int, ((now() AT TIME ZONE 'Asia/Shanghai')::date) + $2::int)",
[startOffset, endOffset]
);
}
startPartitionMaintenance() {
const cfg = this.getPartitionConfig();
if (!cfg.enabled) {
return;
}
if (this.partitionMaintenanceTimer) {
return;
}
const intervalMs = Math.max(60_000, cfg.intervalHours * 60 * 60 * 1000);
this.partitionMaintenanceTimer = setInterval(async () => {
try {
await this.ensurePartitionsForRange({
startDayOffset: -1,
endDayOffset: this.getPartitionFutureDays(),
});
console.log('[db] 分区预创建维护完成');
} catch (err) {
console.error('[db] 分区预创建维护失败:', err);
}
}, intervalMs);
}
stopPartitionMaintenance() {
if (this.partitionMaintenanceTimer) {
clearInterval(this.partitionMaintenanceTimer);
this.partitionMaintenanceTimer = null;
}
}
formatShanghaiDate(tsMs) {
const date = new Date(Number(tsMs));
const fmt = new Intl.DateTimeFormat('en-CA', {
@@ -555,10 +187,13 @@ class DatabaseManager {
return s.replace(/\\/g, '\\\\').replace(/\n/g, '\\n').replace(/\r/g, '\\r').replace(/\t/g, '\\t');
};
const tsValues = events.map((e) => Number(e.ts_ms)).filter((n) => Number.isFinite(n));
const tsMin = tsValues.length > 0 ? Math.min(...tsValues) : null;
const tsMax = tsValues.length > 0 ? Math.max(...tsValues) : null;
const runInsertOnce = async () => {
const tsValues = events.map((e) => Number(e.ts_ms)).filter((n) => Number.isFinite(n));
if (tsValues.length > 0) {
await this.ensurePartitionsForTsRange(Math.min(...tsValues), Math.max(...tsValues));
if (tsMin !== null) {
await this.ensurePartitionsForTsRange(tsMin, tsMax);
}
const client = await this.pool.connect();
@@ -596,10 +231,9 @@ class DatabaseManager {
lastError = error;
if (this.isMissingPartitionError(error)) {
console.warn('[db] 检测到缺分区写入失败,执行兜底预创建并重试一次');
await this.ensurePartitionsForRange({
startDayOffset: -7,
endDayOffset: this.getPartitionFutureDays(),
});
if (tsMin !== null) {
await this.ensurePartitionsForTsRange(tsMin, tsMax);
}
}
if (attempt < maxAttempts) {
await new Promise((r) => setTimeout(r, retryDelay));