From 3b98c6239bcd2d7b00e1fa9eb15fd17adf3c09e2 Mon Sep 17 00:00:00 2001 From: XuJiacheng Date: Wed, 4 Mar 2026 11:47:22 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E7=A7=BB=E9=99=A4=E8=BF=90=E8=A1=8C?= =?UTF-8?q?=E6=97=B6=E4=BB=A3=E7=A0=81=E4=B8=AD=E7=9A=84=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E5=BA=93=E5=88=9D=E5=A7=8B=E5=8C=96=E4=B8=8E=E5=88=86=E5=8C=BA?= =?UTF-8?q?=E7=BB=B4=E6=8A=A4=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增备份 SQL 脚本 `01_init_schema.sql` 和 `02_create_partitions.sql`,用于数据库结构初始化和分区预创建。 - 新增 Node.js 脚本 `run_init.js` 和 `run_ensure_partitions.js`,支持通过外部程序调用进行数据库初始化和分区维护。 - 确保数据库初始化脚本支持幂等重复执行。 - 更新文档,说明新的执行顺序和使用方法。 - 移除运行时相关的数据库初始化和分区维护配置,简化服务职责。 - 保留写入失败时的缺分区兜底逻辑,确保服务稳定性。 --- .env.example | 5 +- SQL_Script/01_init_schema.sql | 297 ++++++++++++++ SQL_Script/02_create_partitions.sql | 15 + SQL_Script/README.md | 97 +++++ SQL_Script/run_ensure_partitions.js | 138 +++++++ SQL_Script/run_init.js | 112 +++++ .../remove-runtime-db-bootstrap/proposal.md | 28 ++ .../specs/db/spec.md | 29 ++ .../remove-runtime-db-bootstrap/tasks.md | 6 + src/config/config.example.js | 11 - src/db/databaseManager.js | 384 +----------------- 11 files changed, 732 insertions(+), 390 deletions(-) create mode 100644 SQL_Script/01_init_schema.sql create mode 100644 SQL_Script/02_create_partitions.sql create mode 100644 SQL_Script/README.md create mode 100644 SQL_Script/run_ensure_partitions.js create mode 100644 SQL_Script/run_init.js create mode 100644 openspec/changes/remove-runtime-db-bootstrap/proposal.md create mode 100644 openspec/changes/remove-runtime-db-bootstrap/specs/db/spec.md create mode 100644 openspec/changes/remove-runtime-db-bootstrap/tasks.md diff --git a/.env.example b/.env.example index 89d1f3d..442bb71 100644 --- a/.env.example +++ b/.env.example @@ -69,7 +69,4 @@ PROCESSOR_BATCH_TIMEOUT=5000 # 日志配置 LOG_LEVEL=info -LOG_FORMAT=json - - -DB_INIT_AND_PARTITION_ENABLED=false \ No newline at end of file +LOG_FORMAT=json \ No newline at end of file diff --git a/SQL_Script/01_init_schema.sql b/SQL_Script/01_init_schema.sql new file mode 100644 index 0000000..09c4f3e --- /dev/null +++ b/SQL_Script/01_init_schema.sql @@ -0,0 +1,297 @@ +-- ============================================================= +-- 01_init_schema.sql +-- 目标数据库:log_platform(或你实际使用的库) +-- 执行方式: +-- psql -h -p -U -d -f 01_init_schema.sql +-- 或通过 run_init.js 自动执行 +-- 幂等安全:全部使用 IF NOT EXISTS / OR REPLACE,可重复执行 +-- ============================================================= + +BEGIN; + +-- ---------------------------------------------------------- +-- 1. 扩展 & Schema +-- ---------------------------------------------------------- +CREATE EXTENSION IF NOT EXISTS pgcrypto; + +CREATE SCHEMA IF NOT EXISTS heartbeat; + +-- ---------------------------------------------------------- +-- 2. 主表:heartbeat.heartbeat_events(RANGE 分区,按 ts_ms) +-- ---------------------------------------------------------- +CREATE TABLE IF NOT EXISTS heartbeat.heartbeat_events ( + guid varchar(32) NOT NULL DEFAULT replace(gen_random_uuid()::text, '-', ''), + ts_ms bigint NOT NULL, + write_ts_ms bigint DEFAULT (EXTRACT(EPOCH FROM clock_timestamp()) * 1000)::bigint, + hotel_id int2 NOT NULL, + room_id varchar(50) NOT NULL, + device_id varchar(64) NOT NULL, + ip varchar(21) NOT NULL, + power_state int2 NOT NULL, + guest_type int2 NOT NULL, + cardless_state int2 NOT NULL, + service_mask bigint NOT NULL, + pms_state int2 NOT NULL, + carbon_state int2 NOT NULL, + device_count int2 NOT NULL, + comm_seq int4 NOT NULL, + insert_card int2, + bright_g int2, + version int4, + elec_address text[], + air_address text[], + voltage double precision[], + ampere double precision[], + power double precision[], + phase text[], + energy double precision[], + sum_energy double precision[], + state int2[], + model int2[], + speed int2[], + set_temp int2[], + now_temp int2[], + solenoid_valve int2[], + extra jsonb, + + CONSTRAINT heartbeat_events_pk PRIMARY KEY (ts_ms, guid), + CONSTRAINT chk_guid_32_hex CHECK (guid ~ '^[0-9a-f]{32}$'), + CONSTRAINT chk_ts_ms_positive CHECK (ts_ms > 0), + CONSTRAINT chk_hotel_id_range CHECK (hotel_id >= 0 AND hotel_id <= 32767), + CONSTRAINT chk_room_id_len CHECK (char_length(room_id) > 0 AND char_length(room_id) <= 50), + CONSTRAINT chk_power_state_range CHECK (power_state >= 0 AND power_state <= 32767), + CONSTRAINT chk_guest_type_range CHECK (guest_type >= 0 AND guest_type <= 32767), + CONSTRAINT chk_cardless_state_range CHECK (cardless_state >= 0 AND cardless_state <= 32767), + CONSTRAINT chk_pms_state_range CHECK (pms_state >= 0 AND pms_state <= 32767), + CONSTRAINT chk_carbon_state_range CHECK (carbon_state >= 0 AND carbon_state <= 32767), + CONSTRAINT chk_device_count_range CHECK (device_count >= 0 AND device_count <= 32767), + CONSTRAINT chk_comm_seq_range CHECK (comm_seq >= 0) +) +PARTITION BY RANGE (ts_ms); + +-- 补列(迁移安全,已有列时静默跳过) +ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS write_ts_ms bigint; +ALTER TABLE heartbeat.heartbeat_events ALTER COLUMN write_ts_ms SET DEFAULT (EXTRACT(EPOCH FROM clock_timestamp()) * 1000)::bigint; +ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS elec_address text[]; +ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS air_address text[]; +ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS voltage double precision[]; +ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS ampere double precision[]; +ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS power double precision[]; +ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS phase text[]; +ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS energy double precision[]; +ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS sum_energy double precision[]; +ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS state int2[]; +ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS model int2[]; +ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS speed int2[]; +ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS set_temp int2[]; +ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS now_temp int2[]; +ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS solenoid_valve int2[]; +ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS insert_card int2; +ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS bright_g int2; +ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS version int4; + +-- ip / room_id 类型迁移(inet -> varchar,仅在旧类型存在时执行) +DO $$ +BEGIN + IF EXISTS ( + SELECT 1 FROM pg_attribute a + JOIN pg_class c ON c.oid = a.attrelid + JOIN pg_namespace n ON n.oid = c.relnamespace + WHERE n.nspname = 'heartbeat' AND c.relname = 'heartbeat_events' + AND a.attname = 'ip' AND format_type(a.atttypid, a.atttypmod) = 'inet' + ) THEN + ALTER TABLE heartbeat.heartbeat_events ALTER COLUMN ip TYPE varchar(21) USING ip::text; + END IF; + + IF EXISTS ( + SELECT 1 FROM pg_attribute a + JOIN pg_class c ON c.oid = a.attrelid + JOIN pg_namespace n ON n.oid = c.relnamespace + WHERE n.nspname = 'heartbeat' AND c.relname = 'heartbeat_events' + AND a.attname = 'room_id' + AND format_type(a.atttypid, a.atttypmod) NOT LIKE 'character varying%' + ) THEN + ALTER TABLE heartbeat.heartbeat_events DROP CONSTRAINT IF EXISTS chk_room_id_range; + ALTER TABLE heartbeat.heartbeat_events DROP CONSTRAINT IF EXISTS chk_room_id_len; + ALTER TABLE heartbeat.heartbeat_events ALTER COLUMN room_id TYPE varchar(50) USING room_id::text; + ALTER TABLE heartbeat.heartbeat_events ADD CONSTRAINT chk_room_id_len + CHECK (char_length(room_id) > 0 AND char_length(room_id) <= 50); + END IF; +END $$; + +-- ---------------------------------------------------------- +-- 3. 父表索引(自动继承到每个子分区) +-- ---------------------------------------------------------- +CREATE INDEX IF NOT EXISTS idx_heartbeat_events_hotel_id + ON heartbeat.heartbeat_events (hotel_id); + +CREATE INDEX IF NOT EXISTS idx_heartbeat_events_power_state + ON heartbeat.heartbeat_events (power_state); + +CREATE INDEX IF NOT EXISTS idx_heartbeat_events_guest_type + ON heartbeat.heartbeat_events (guest_type); + +CREATE INDEX IF NOT EXISTS idx_heartbeat_events_device_id + ON heartbeat.heartbeat_events (device_id); + +CREATE INDEX IF NOT EXISTS idx_heartbeat_events_service_mask_brin + ON heartbeat.heartbeat_events USING BRIN (service_mask); + +CREATE INDEX IF NOT EXISTS idx_service_mask_first_bit + ON heartbeat.heartbeat_events ((service_mask & 1)); + +CREATE INDEX IF NOT EXISTS idx_heartbeat_events_hotel_ts + ON heartbeat.heartbeat_events (hotel_id, ts_ms); + +CREATE INDEX IF NOT EXISTS idx_heartbeat_events_elec_address_gin + ON heartbeat.heartbeat_events USING GIN (elec_address); + +CREATE INDEX IF NOT EXISTS idx_heartbeat_events_air_address_gin + ON heartbeat.heartbeat_events USING GIN (air_address); + +CREATE INDEX IF NOT EXISTS idx_heartbeat_events_state_gin + ON heartbeat.heartbeat_events USING GIN (state); + +CREATE INDEX IF NOT EXISTS idx_heartbeat_events_model_gin + ON heartbeat.heartbeat_events USING GIN (model); + +-- ---------------------------------------------------------- +-- 4. 分区辅助函数 +-- ---------------------------------------------------------- + +-- 将 date 转换为上海时区当日 00:00 的 epoch ms +CREATE OR REPLACE FUNCTION heartbeat.day_start_ms_shanghai(p_day date) +RETURNS bigint +LANGUAGE sql IMMUTABLE +AS $$ + SELECT ( + EXTRACT(EPOCH FROM (p_day::timestamp AT TIME ZONE 'Asia/Shanghai')) * 1000 + )::bigint; +$$; + +-- 生成分区表名,如 heartbeat_events_20260304 +CREATE OR REPLACE FUNCTION heartbeat.partition_name_for_day(p_day date) +RETURNS text +LANGUAGE sql IMMUTABLE +AS $$ + SELECT format('heartbeat_events_%s', to_char(p_day, 'YYYYMMDD')); +$$; + +-- 强制将分区及其全部索引、TOAST 表和 TOAST 索引迁移到指定表空间(幂等) +CREATE OR REPLACE FUNCTION heartbeat.relocate_partition_to_tablespace( + p_schema text, + p_partition text, + p_tablespace text DEFAULT 'ts_hot' +) +RETURNS void +LANGUAGE plpgsql +AS $$ +DECLARE + v_part_oid oid; + v_toast_oid oid; + r record; +BEGIN + SELECT c.oid INTO v_part_oid + FROM pg_class c + JOIN pg_namespace n ON n.oid = c.relnamespace + WHERE n.nspname = p_schema + AND c.relname = p_partition + AND c.relkind = 'r'; + + IF v_part_oid IS NULL THEN + RAISE EXCEPTION 'partition %.% not found', p_schema, p_partition; + END IF; + + -- 1) 分区表 -> 指定 tablespace + EXECUTE format('ALTER TABLE %I.%I SET TABLESPACE %I', p_schema, p_partition, p_tablespace); + + -- 2) 分区全部索引 -> 指定 tablespace + FOR r IN + SELECT idxn.nspname AS index_schema, i.relname AS index_name + FROM pg_index x + JOIN pg_class i ON i.oid = x.indexrelid + JOIN pg_namespace idxn ON idxn.oid = i.relnamespace + LEFT JOIN pg_tablespace ts ON ts.oid = i.reltablespace + WHERE x.indrelid = v_part_oid + AND COALESCE(ts.spcname, 'pg_default') <> p_tablespace + LOOP + EXECUTE format('ALTER INDEX %I.%I SET TABLESPACE %I', r.index_schema, r.index_name, p_tablespace); + END LOOP; + + -- 3) TOAST 表 + TOAST 全部索引 -> 指定 tablespace(若存在) + SELECT reltoastrelid INTO v_toast_oid FROM pg_class WHERE oid = v_part_oid; + IF v_toast_oid IS NOT NULL AND v_toast_oid <> 0 THEN + EXECUTE format('ALTER TABLE %s SET TABLESPACE %I', v_toast_oid::regclass, p_tablespace); + + FOR r IN + SELECT idxn.nspname AS index_schema, i.relname AS index_name + FROM pg_index x + JOIN pg_class i ON i.oid = x.indexrelid + JOIN pg_namespace idxn ON idxn.oid = i.relnamespace + LEFT JOIN pg_tablespace ts ON ts.oid = i.reltablespace + WHERE x.indrelid = v_toast_oid + AND COALESCE(ts.spcname, 'pg_default') <> p_tablespace + LOOP + EXECUTE format('ALTER INDEX %I.%I SET TABLESPACE %I', r.index_schema, r.index_name, p_tablespace); + END LOOP; + END IF; + + -- 4) 更新统计信息 + EXECUTE format('ANALYZE %I.%I', p_schema, p_partition); +END; +$$; + +-- 创建单日分区(幂等)并将其置于 ts_hot 表空间 +CREATE OR REPLACE FUNCTION heartbeat.create_daily_partition(p_day date) +RETURNS void +LANGUAGE plpgsql +AS $$ +DECLARE + start_ms bigint; + end_ms bigint; + part_name text; +BEGIN + start_ms := heartbeat.day_start_ms_shanghai(p_day); + end_ms := start_ms + 86400000; + part_name := heartbeat.partition_name_for_day(p_day); + + EXECUTE format( + 'CREATE TABLE IF NOT EXISTS heartbeat.%I PARTITION OF heartbeat.heartbeat_events FOR VALUES FROM (%s) TO (%s)', + part_name, start_ms, end_ms + ); + + PERFORM heartbeat.relocate_partition_to_tablespace('heartbeat', part_name, 'ts_hot'); +END; +$$; + +-- 确保 [p_start_day, p_end_day] 范围内每天的分区都存在(含首尾,幂等) +CREATE OR REPLACE FUNCTION heartbeat.ensure_partitions(p_start_day date, p_end_day date) +RETURNS void +LANGUAGE plpgsql +AS $$ +DECLARE + d date; +BEGIN + IF p_end_day < p_start_day THEN + RAISE EXCEPTION 'p_end_day (%) must be >= p_start_day (%)', p_end_day, p_start_day; + END IF; + + FOR d IN + SELECT generate_series(p_start_day, p_end_day, interval '1 day')::date + LOOP + PERFORM heartbeat.create_daily_partition(d); + END LOOP; +END; +$$; + +-- ---------------------------------------------------------- +-- 5. 清理旧兼容(DEFAULT 分区) +-- ---------------------------------------------------------- +DO $$ +BEGIN + IF to_regclass('heartbeat.heartbeat_events_default') IS NOT NULL THEN + EXECUTE 'DROP TABLE heartbeat.heartbeat_events_default'; + END IF; +END $$; + +COMMIT; diff --git a/SQL_Script/02_create_partitions.sql b/SQL_Script/02_create_partitions.sql new file mode 100644 index 0000000..82a2360 --- /dev/null +++ b/SQL_Script/02_create_partitions.sql @@ -0,0 +1,15 @@ +-- ============================================================= +-- 02_create_partitions.sql +-- 目标:预创建分区(昨天 + 未来 N 天) +-- 前提:已执行 01_init_schema.sql(函数 ensure_partitions 已存在) +-- 执行方式: +-- psql -h -p -U -d -f 02_create_partitions.sql +-- 或通过 run_ensure_partitions.js 自动执行(支持自定义范围参数) +-- ============================================================= + +-- 默认预创建:昨天 到 未来 30 天(按上海时区自然日) +-- 调整 -1 / 30 可控制范围,负数=过去天数,正数=未来天数 +SELECT heartbeat.ensure_partitions( + ((now() AT TIME ZONE 'Asia/Shanghai')::date) - 1, + ((now() AT TIME ZONE 'Asia/Shanghai')::date) + 30 +); diff --git a/SQL_Script/README.md b/SQL_Script/README.md new file mode 100644 index 0000000..7c2dfd3 --- /dev/null +++ b/SQL_Script/README.md @@ -0,0 +1,97 @@ +# SQL_Script + +数据库初始化与分区维护脚本。与主服务完全独立,可被任何工具(psql / Node.js / cron / CI)调用。 + +## 文件说明 + +| 文件 | 说明 | +|------|------| +| `01_init_schema.sql` | 建库:Extension + Schema + 主表 + 索引 + 分区辅助函数(幂等,可重复执行) | +| `02_create_partitions.sql` | 预创建分区(昨天 ~ 未来 30 天),直接用 psql 执行 | +| `run_init.js` | Node.js 脚本:执行 `01_init_schema.sql` | +| `run_ensure_partitions.js` | Node.js 脚本:调用 `ensure_partitions()` 预创建指定日期范围分区 | + +--- + +## 执行顺序(首次部署) + +```bash +# 第一步:初始化 Schema(建表 + 函数) +node SQL_Script/run_init.js + +# 第二步:预创建分区(默认昨天 ~ 未来 30 天) +node SQL_Script/run_ensure_partitions.js +``` + +之后每天由外部 cron / 调度任务调用 `run_ensure_partitions.js` 即可。 + +--- + +## run_init.js 用法 + +```bash +# 最简方式(从 .env 读取连接参数) +node SQL_Script/run_init.js + +# 指定连接参数 +node SQL_Script/run_init.js \ + --host=10.8.8.109 \ + --port=5433 \ + --user=log_admin \ + --password=yourpassword \ + --database=log_platform +``` + +--- + +## run_ensure_partitions.js 用法 + +```bash +# 默认:昨天 到 未来 30 天 +node SQL_Script/run_ensure_partitions.js + +# 按 offset(相对今天的天数偏移) +node SQL_Script/run_ensure_partitions.js --start-offset=-1 --end-offset=60 + +# 按具体日期 +node SQL_Script/run_ensure_partitions.js --start-date=2026-03-01 --end-date=2026-06-30 +``` + +--- + +## psql 直接执行 + +```bash +# Schema 初始化 +psql -h 10.8.8.109 -p 5433 -U log_admin -d log_platform -f SQL_Script/01_init_schema.sql + +# 预创建分区(默认昨天 ~ 未来 30 天) +psql -h 10.8.8.109 -p 5433 -U log_admin -d log_platform -f SQL_Script/02_create_partitions.sql +``` + +--- + +## 连接参数优先级 + +``` +命令行参数 > 环境变量 > 根目录 .env 文件 > 默认值 +``` + +支持的环境变量: + +| 环境变量 | 说明 | +|----------|------| +| `POSTGRES_HOST` | 主机,默认 `127.0.0.1` | +| `POSTGRES_PORT` | 端口,默认 `5432` | +| `POSTGRES_USER` | 用户名,默认 `postgres` | +| `POSTGRES_PASSWORD` | 密码 | +| `POSTGRES_DATABASE` | 数据库名,默认 `log_platform` | + +--- + +## 定时分区维护(cron 示例) + +```cron +# 每天凌晨 01:00 自动创建未来 30 天分区 +0 1 * * * cd /path/to/Web_BLS_Heartbeat_Server && node SQL_Script/run_ensure_partitions.js >> /var/log/partition_cron.log 2>&1 +``` diff --git a/SQL_Script/run_ensure_partitions.js b/SQL_Script/run_ensure_partitions.js new file mode 100644 index 0000000..364c34e --- /dev/null +++ b/SQL_Script/run_ensure_partitions.js @@ -0,0 +1,138 @@ +#!/usr/bin/env node +/** + * run_ensure_partitions.js + * 调用 heartbeat.ensure_partitions() 预创建指定日期范围的分区 + * + * 用法: + * # 默认:昨天 到 未来 30 天 + * node SQL_Script/run_ensure_partitions.js + * + * # 按 offset(相对今天的天数偏移) + * node SQL_Script/run_ensure_partitions.js --start-offset=-1 --end-offset=60 + * + * # 按具体日期(YYYY-MM-DD) + * node SQL_Script/run_ensure_partitions.js --start-date=2026-03-01 --end-date=2026-06-30 + * + * 连接参数优先级:命令行参数 > 环境变量 > .env 文件 + * + * 命令行参数(均可选): + * --host= + * --port= + * --user= + * --password= + * --database= + * --start-offset= 相对今天的起始天数偏移,负数=过去,默认 -1 + * --end-offset= 相对今天的结束天数偏移,正数=未来,默认 30 + * --start-date= 直接指定起始日期(优先于 start-offset) + * --end-date= 直接指定结束日期(优先于 end-offset) + */ + +import fs from 'fs'; +import path from 'path'; +import { fileURLToPath } from 'url'; +import { Client } from 'pg'; + +const __filename = fileURLToPath(import.meta.url); +const __dirname = path.dirname(__filename); + +// ---------------------------------------------------------- +// 1. 加载 .env +// ---------------------------------------------------------- +function loadEnv() { + const candidates = [ + path.resolve(process.cwd(), '.env'), + path.resolve(__dirname, '../.env'), + ]; + for (const envPath of candidates) { + if (!fs.existsSync(envPath)) continue; + const lines = fs.readFileSync(envPath, 'utf8').split(/\r?\n/); + for (const line of lines) { + const t = line.trim(); + if (!t || t.startsWith('#')) continue; + const idx = t.indexOf('='); + if (idx <= 0) continue; + const key = t.slice(0, idx).trim(); + let val = t.slice(idx + 1).trim(); + if ((val.startsWith('"') && val.endsWith('"')) || (val.startsWith("'") && val.endsWith("'"))) { + val = val.slice(1, -1); + } + if (process.env[key] === undefined) process.env[key] = val; + } + console.log(`[env] 已加载 ${envPath}`); + return; + } +} + +// ---------------------------------------------------------- +// 2. 解析命令行参数 +// ---------------------------------------------------------- +function parseArgs() { + const args = {}; + for (const arg of process.argv.slice(2)) { + const m = arg.match(/^--([^=]+)=(.*)$/); + if (m) args[m[1]] = m[2]; + } + return args; +} + +// 将 offset 天数换算为上海时区 YYYY-MM-DD 日期字符串 +function shanghaiDateWithOffset(offsetDays) { + const now = new Date(); + const shanghaiStr = now.toLocaleDateString('en-CA', { timeZone: 'Asia/Shanghai' }); // YYYY-MM-DD + const base = new Date(`${shanghaiStr}T00:00:00+08:00`); + base.setDate(base.getDate() + offsetDays); + return base.toLocaleDateString('en-CA', { timeZone: 'Asia/Shanghai' }); +} + +// ---------------------------------------------------------- +// 3. 主流程 +// ---------------------------------------------------------- +async function main() { + loadEnv(); + const args = parseArgs(); + const env = process.env; + + const config = { + host: args.host ?? env.POSTGRES_HOST ?? env.PGHOST ?? '127.0.0.1', + port: Number(args.port ?? env.POSTGRES_PORT ?? env.PGPORT ?? 5432), + user: args.user ?? env.POSTGRES_USER ?? env.PGUSER ?? 'postgres', + password: args.password ?? env.POSTGRES_PASSWORD ?? env.PGPASSWORD ?? '', + database: args.database ?? env.POSTGRES_DATABASE ?? env.PGTARGETDB ?? 'log_platform', + }; + + // 计算分区范围 + const startDate = args['start-date'] + ? args['start-date'] + : shanghaiDateWithOffset(Number(args['start-offset'] ?? -1)); + + const endDate = args['end-date'] + ? args['end-date'] + : shanghaiDateWithOffset(Number(args['end-offset'] ?? 30)); + + console.log('[partition] 连接数据库:', { + host: config.host, + port: config.port, + user: config.user, + database: config.database, + }); + console.log(`[partition] 预创建分区范围: ${startDate} ~ ${endDate}`); + + const client = new Client(config); + + try { + await client.connect(); + console.log('[partition] 数据库连接成功,开始预创建分区...'); + await client.query( + 'SELECT heartbeat.ensure_partitions($1::date, $2::date)', + [startDate, endDate] + ); + console.log(`[partition] ✅ 分区预创建完成 (${startDate} ~ ${endDate})`); + } catch (err) { + console.error('[partition] ❌ 分区预创建失败:', err.message); + process.exit(1); + } finally { + await client.end(); + } +} + +main(); diff --git a/SQL_Script/run_init.js b/SQL_Script/run_init.js new file mode 100644 index 0000000..fa2f725 --- /dev/null +++ b/SQL_Script/run_init.js @@ -0,0 +1,112 @@ +#!/usr/bin/env node +/** + * run_init.js + * 执行数据库 Schema 初始化(01_init_schema.sql) + * + * 用法: + * node SQL_Script/run_init.js + * + * 连接参数优先级:命令行参数 > 环境变量 > .env 文件 + * + * 命令行参数(均可选): + * --host= PostgreSQL 主机,默认 POSTGRES_HOST 或 127.0.0.1 + * --port= 端口,默认 POSTGRES_PORT 或 5432 + * --user= 用户,默认 POSTGRES_USER 或 postgres + * --password= 密码,默认 POSTGRES_PASSWORD + * --database= 数据库名,默认 POSTGRES_DATABASE 或 log_platform + */ + +import fs from 'fs'; +import path from 'path'; +import { fileURLToPath } from 'url'; +import { Client } from 'pg'; + +const __filename = fileURLToPath(import.meta.url); +const __dirname = path.dirname(__filename); + +// ---------------------------------------------------------- +// 1. 加载 .env +// ---------------------------------------------------------- +function loadEnv() { + const candidates = [ + path.resolve(process.cwd(), '.env'), + path.resolve(__dirname, '../.env'), + ]; + for (const envPath of candidates) { + if (!fs.existsSync(envPath)) continue; + const lines = fs.readFileSync(envPath, 'utf8').split(/\r?\n/); + for (const line of lines) { + const t = line.trim(); + if (!t || t.startsWith('#')) continue; + const idx = t.indexOf('='); + if (idx <= 0) continue; + const key = t.slice(0, idx).trim(); + let val = t.slice(idx + 1).trim(); + if ((val.startsWith('"') && val.endsWith('"')) || (val.startsWith("'") && val.endsWith("'"))) { + val = val.slice(1, -1); + } + if (process.env[key] === undefined) process.env[key] = val; + } + console.log(`[env] 已加载 ${envPath}`); + return; + } +} + +// ---------------------------------------------------------- +// 2. 解析命令行参数 +// ---------------------------------------------------------- +function parseArgs() { + const args = {}; + for (const arg of process.argv.slice(2)) { + const m = arg.match(/^--([^=]+)=(.*)$/); + if (m) args[m[1]] = m[2]; + } + return args; +} + +// ---------------------------------------------------------- +// 3. 主流程 +// ---------------------------------------------------------- +async function main() { + loadEnv(); + const args = parseArgs(); + const env = process.env; + + const config = { + host: args.host ?? env.POSTGRES_HOST ?? env.PGHOST ?? '127.0.0.1', + port: Number(args.port ?? env.POSTGRES_PORT ?? env.PGPORT ?? 5432), + user: args.user ?? env.POSTGRES_USER ?? env.PGUSER ?? 'postgres', + password: args.password ?? env.POSTGRES_PASSWORD ?? env.PGPASSWORD ?? '', + database: args.database ?? env.POSTGRES_DATABASE ?? env.PGTARGETDB ?? 'log_platform', + }; + + console.log('[init] 连接数据库:', { + host: config.host, + port: config.port, + user: config.user, + database: config.database, + }); + + const sqlPath = path.resolve(__dirname, '01_init_schema.sql'); + if (!fs.existsSync(sqlPath)) { + console.error(`[init] 找不到 SQL 文件: ${sqlPath}`); + process.exit(1); + } + + const sql = fs.readFileSync(sqlPath, 'utf8'); + const client = new Client(config); + + try { + await client.connect(); + console.log('[init] 数据库连接成功,开始执行初始化 SQL...'); + await client.query(sql); + console.log('[init] ✅ Schema 初始化完成'); + } catch (err) { + console.error('[init] ❌ 初始化失败:', err.message); + process.exit(1); + } finally { + await client.end(); + } +} + +main(); diff --git a/openspec/changes/remove-runtime-db-bootstrap/proposal.md b/openspec/changes/remove-runtime-db-bootstrap/proposal.md new file mode 100644 index 0000000..9441487 --- /dev/null +++ b/openspec/changes/remove-runtime-db-bootstrap/proposal.md @@ -0,0 +1,28 @@ +# Change: Remove runtime DB bootstrap from service + +## Why +当前服务进程同时承担了“业务消费写入”和“数据库建库/分区维护”两类职责,导致部署耦合、运维边界不清晰,且难以被其他程序复用。 + +本次变更将建库与分区维护彻底外部化,服务进程只保留 Kafka → 处理 → 写库主链路。 + +## What Changes +- 从运行时代码中移除启动建表、建函数、建索引、定时分区维护逻辑。 +- 移除运行时相关开关配置(`DB_INIT_AND_PARTITION_ENABLED`、`DB_PARTITION_*`)。 +- 在仓库根目录新增 `SQL_Script/`,提供可由外部程序调用的 SQL/JS 脚本: + - `01_init_schema.sql` + - `02_create_partitions.sql` + - `run_init.js` + - `run_ensure_partitions.js` +- 保留写入失败时“缺分区”兜底:按批次时间范围调用 `heartbeat.ensure_partitions` 后重试(前提是外部初始化已部署函数)。 + +## Impact +- Affected specs: `db` +- Affected code: + - `src/db/databaseManager.js` + - `src/config/config.js` + - `src/config/config.example.js` + - `.env` + - `.env.example` + - `SQL_Script/*` +- Operational impact: + - **BREAKING (运维流程层面)**:首次部署或升级后,需先执行 `SQL_Script` 初始化脚本,再启动服务。 \ No newline at end of file diff --git a/openspec/changes/remove-runtime-db-bootstrap/specs/db/spec.md b/openspec/changes/remove-runtime-db-bootstrap/specs/db/spec.md new file mode 100644 index 0000000..9b8b651 --- /dev/null +++ b/openspec/changes/remove-runtime-db-bootstrap/specs/db/spec.md @@ -0,0 +1,29 @@ +## ADDED Requirements + +### Requirement: 服务运行时不得承担建库与定时分区维护 +系统 MUST 仅负责数据库连接与业务写入,不在服务启动时执行数据库结构初始化,也不在进程内执行定时分区维护。 + +#### Scenario: 服务启动仅建立连接 +- **WHEN** 服务进程启动并连接 PostgreSQL +- **THEN** 系统应只创建连接池并完成连通性检测 +- **AND** 不应在运行时执行表/索引/函数等 DDL 初始化 +- **AND** 不应在进程内启动分区维护定时任务 + +#### Scenario: 写入遇到缺分区时的运行时兜底 +- **WHEN** 批量写入 `heartbeat.heartbeat_events` 遇到缺分区错误 +- **THEN** 系统应基于该批次的时间范围调用 `heartbeat.ensure_partitions` +- **AND** 调用后应重试当前批次写入 +- **AND** 系统不应在运行时创建或替换数据库 schema 对象 + +### Requirement: 建库与分区维护能力必须以外部脚本提供 +系统 MUST 在仓库根目录 `SQL_Script/` 提供可被外部程序调用的建库/分区维护脚本。 + +#### Scenario: 提供初始化脚本 +- **WHEN** 运维或其他程序需要初始化数据库结构 +- **THEN** 应可使用 `SQL_Script/01_init_schema.sql` 或 `SQL_Script/run_init.js` +- **AND** 脚本应支持幂等重复执行 + +#### Scenario: 提供分区预创建脚本 +- **WHEN** 运维或外部调度需要预创建分区 +- **THEN** 应可使用 `SQL_Script/02_create_partitions.sql` 或 `SQL_Script/run_ensure_partitions.js` +- **AND** 无需启动主服务进程即可执行分区维护 \ No newline at end of file diff --git a/openspec/changes/remove-runtime-db-bootstrap/tasks.md b/openspec/changes/remove-runtime-db-bootstrap/tasks.md new file mode 100644 index 0000000..8556f17 --- /dev/null +++ b/openspec/changes/remove-runtime-db-bootstrap/tasks.md @@ -0,0 +1,6 @@ +## 1. Implementation +- [x] 1.1 新增根目录 `SQL_Script/`,提供建库与分区预创建 SQL/JS 脚本。 +- [x] 1.2 删除运行时服务中的建表、建函数、建索引与定时分区维护逻辑。 +- [x] 1.3 删除运行时配置中的初始化/分区维护开关字段。 +- [x] 1.4 保留缺分区写入兜底:基于批次时间范围调用 `heartbeat.ensure_partitions` 后重试。 +- [x] 1.5 执行构建验证,确保主服务可正常打包。 \ No newline at end of file diff --git a/src/config/config.example.js b/src/config/config.example.js index 2864268..40df9a4 100644 --- a/src/config/config.example.js +++ b/src/config/config.example.js @@ -53,17 +53,6 @@ export default { retryAttempts: 3, // 重试次数 retryDelay: 1000, // 重试延迟 - // 是否启用数据库初始化与分区维护(若为 false,跳过建表、分区预创建、定时分区检查) - initAndPartitionEnabled: (env.DB_INIT_AND_PARTITION_ENABLED ?? 'true') === 'true', - - // 分区维护(方案1):启动时预创建 + 周期维护 - partitionMaintenance: { - enabled: true, - futureDays: 30, - intervalHours: 6 - } - }, - // 日志配置 logger: { level: env.LOG_LEVEL ?? 'info', diff --git a/src/db/databaseManager.js b/src/db/databaseManager.js index d58980e..ab64ba8 100644 --- a/src/db/databaseManager.js +++ b/src/db/databaseManager.js @@ -8,8 +8,6 @@ class DatabaseManager { constructor(config) { this.config = config; this.pool = null; - - this.partitionMaintenanceTimer = null; } async connect() { @@ -36,21 +34,6 @@ class DatabaseManager { const client = await this.pool.connect(); client.release(); console.log('数据库连接池创建成功'); - - // 根据配置决定是否执行初始化与分区维护 - if (this.config.initAndPartitionEnabled !== false) { - // 初始化表结构 - await this.initTables(); - - // 分区维护(方案1):启动时预创建 + 定时维护 - await this.ensurePartitionsForRange({ - startDayOffset: -1, - endDayOffset: this.getPartitionFutureDays(), - }); - this.startPartitionMaintenance(); - } else { - console.log('[db] 已禁用数据库初始化与分区维护(DB_INIT_AND_PARTITION_ENABLED=false),跳过建表、分区预创建、定时维护'); - } } catch (error) { console.error('数据库连接失败:', error); throw error; @@ -73,7 +56,6 @@ class DatabaseManager { async disconnect() { try { - this.stopPartitionMaintenance(); if (this.pool) { await this.pool.end(); console.log('数据库连接池已关闭'); @@ -84,360 +66,10 @@ class DatabaseManager { } } - async initTables() { - try { - // v2:高吞吐按天分区表(位于 heartbeat schema) - const v2SchemaQuery = ` - BEGIN; - - CREATE EXTENSION IF NOT EXISTS pgcrypto; - - CREATE SCHEMA IF NOT EXISTS heartbeat; - - CREATE TABLE IF NOT EXISTS heartbeat.heartbeat_events ( - guid varchar(32) NOT NULL DEFAULT replace(gen_random_uuid()::text, '-', ''), - - ts_ms bigint NOT NULL, - write_ts_ms bigint DEFAULT (EXTRACT(EPOCH FROM clock_timestamp()) * 1000)::bigint, - hotel_id int2 NOT NULL, - room_id varchar(50) NOT NULL, - device_id varchar(64) NOT NULL, - ip varchar(21) NOT NULL, - power_state int2 NOT NULL, - guest_type int2 NOT NULL, - cardless_state int2 NOT NULL, - service_mask bigint NOT NULL, - pms_state int2 NOT NULL, - carbon_state int2 NOT NULL, - device_count int2 NOT NULL, - comm_seq int4 NOT NULL, - - insert_card int2, - bright_g int2, - - elec_address text[], - air_address text[], - voltage double precision[], - ampere double precision[], - power double precision[], - phase text[], - energy double precision[], - sum_energy double precision[], - state int2[], - model int2[], - speed int2[], - set_temp int2[], - now_temp int2[], - solenoid_valve int2[], - - extra jsonb, - - CONSTRAINT heartbeat_events_pk PRIMARY KEY (ts_ms, guid), - CONSTRAINT chk_guid_32_hex CHECK (guid ~ '^[0-9a-f]{32}$'), - - CONSTRAINT chk_ts_ms_positive CHECK (ts_ms > 0), - CONSTRAINT chk_hotel_id_range CHECK (hotel_id >= 0 AND hotel_id <= 32767), - CONSTRAINT chk_room_id_len CHECK (char_length(room_id) > 0 AND char_length(room_id) <= 50), - CONSTRAINT chk_power_state_range CHECK (power_state >= 0 AND power_state <= 32767), - CONSTRAINT chk_guest_type_range CHECK (guest_type >= 0 AND guest_type <= 32767), - CONSTRAINT chk_cardless_state_range CHECK (cardless_state >= 0 AND cardless_state <= 32767), - CONSTRAINT chk_pms_state_range CHECK (pms_state >= 0 AND pms_state <= 32767), - CONSTRAINT chk_carbon_state_range CHECK (carbon_state >= 0 AND carbon_state <= 32767), - CONSTRAINT chk_device_count_range CHECK (device_count >= 0 AND device_count <= 32767), - CONSTRAINT chk_comm_seq_range CHECK (comm_seq >= 0) - ) - PARTITION BY RANGE (ts_ms); - - ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS write_ts_ms bigint; - ALTER TABLE heartbeat.heartbeat_events ALTER COLUMN write_ts_ms SET DEFAULT (EXTRACT(EPOCH FROM clock_timestamp()) * 1000)::bigint; - - ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS elec_address text[]; - ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS air_address text[]; - ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS voltage double precision[]; - ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS ampere double precision[]; - ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS power double precision[]; - ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS phase text[]; - ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS energy double precision[]; - ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS sum_energy double precision[]; - ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS state int2[]; - ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS model int2[]; - ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS speed int2[]; - ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS set_temp int2[]; - ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS now_temp int2[]; - ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS solenoid_valve int2[]; - ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS insert_card int2; - ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS bright_g int2; - - CREATE INDEX IF NOT EXISTS idx_heartbeat_events_hotel_id ON heartbeat.heartbeat_events (hotel_id); - CREATE INDEX IF NOT EXISTS idx_heartbeat_events_power_state ON heartbeat.heartbeat_events (power_state); - CREATE INDEX IF NOT EXISTS idx_heartbeat_events_guest_type ON heartbeat.heartbeat_events (guest_type); - CREATE INDEX IF NOT EXISTS idx_heartbeat_events_device_id ON heartbeat.heartbeat_events (device_id); - CREATE INDEX IF NOT EXISTS idx_heartbeat_events_service_mask_brin ON heartbeat.heartbeat_events USING BRIN (service_mask); - - CREATE INDEX IF NOT EXISTS idx_service_mask_first_bit - ON heartbeat.heartbeat_events ((service_mask & 1)); - - CREATE INDEX IF NOT EXISTS idx_heartbeat_events_hotel_ts ON heartbeat.heartbeat_events (hotel_id, ts_ms); - CREATE INDEX IF NOT EXISTS idx_heartbeat_events_elec_address_gin ON heartbeat.heartbeat_events USING GIN (elec_address); - CREATE INDEX IF NOT EXISTS idx_heartbeat_events_air_address_gin ON heartbeat.heartbeat_events USING GIN (air_address); - CREATE INDEX IF NOT EXISTS idx_heartbeat_events_state_gin ON heartbeat.heartbeat_events USING GIN (state); - CREATE INDEX IF NOT EXISTS idx_heartbeat_events_model_gin ON heartbeat.heartbeat_events USING GIN (model); - - -- 分区预创建函数(按 Asia/Shanghai 自然日) - CREATE OR REPLACE FUNCTION heartbeat.day_start_ms_shanghai(p_day date) - RETURNS bigint - LANGUAGE sql - IMMUTABLE - AS $$ - SELECT ( - EXTRACT(EPOCH FROM (p_day::timestamp AT TIME ZONE 'Asia/Shanghai')) - * 1000 - )::bigint; - $$; - - CREATE OR REPLACE FUNCTION heartbeat.partition_name_for_day(p_day date) - RETURNS text - LANGUAGE sql - IMMUTABLE - AS $$ - SELECT format('heartbeat_events_%s', to_char(p_day, 'YYYYMMDD')); - $$; - - -- 强制将分区及其索引分配到指定表空间(幂等) - CREATE OR REPLACE FUNCTION heartbeat.relocate_partition_to_tablespace( - p_schema text, - p_partition text, - p_tablespace text DEFAULT 'ts_hot' - ) - RETURNS void - LANGUAGE plpgsql - AS $$ - DECLARE - v_part_oid oid; - v_toast_oid oid; - r record; - BEGIN - SELECT c.oid INTO v_part_oid - FROM pg_class c - JOIN pg_namespace n ON n.oid = c.relnamespace - WHERE n.nspname = p_schema - AND c.relname = p_partition - AND c.relkind = 'r'; - - IF v_part_oid IS NULL THEN - RAISE EXCEPTION 'partition %.% not found', p_schema, p_partition; - END IF; - - -- 1) 分区表对象 -> 指定 tablespace - EXECUTE format('ALTER TABLE %I.%I SET TABLESPACE %I', p_schema, p_partition, p_tablespace); - - -- 2) 分区全部索引 -> 指定 tablespace - FOR r IN - SELECT idxn.nspname AS index_schema, i.relname AS index_name - FROM pg_index x - JOIN pg_class i ON i.oid = x.indexrelid - JOIN pg_namespace idxn ON idxn.oid = i.relnamespace - LEFT JOIN pg_tablespace ts ON ts.oid = i.reltablespace - WHERE x.indrelid = v_part_oid - AND COALESCE(ts.spcname, 'pg_default') <> p_tablespace - LOOP - EXECUTE format('ALTER INDEX %I.%I SET TABLESPACE %I', r.index_schema, r.index_name, p_tablespace); - END LOOP; - - -- 3) TOAST 表 + TOAST 索引 -> 指定 tablespace(若存在) - SELECT reltoastrelid INTO v_toast_oid FROM pg_class WHERE oid = v_part_oid; - IF v_toast_oid IS NOT NULL AND v_toast_oid <> 0 THEN - EXECUTE format('ALTER TABLE %s SET TABLESPACE %I', v_toast_oid::regclass, p_tablespace); - - FOR r IN - SELECT idxn.nspname AS index_schema, i.relname AS index_name - FROM pg_index x - JOIN pg_class i ON i.oid = x.indexrelid - JOIN pg_namespace idxn ON idxn.oid = i.relnamespace - LEFT JOIN pg_tablespace ts ON ts.oid = i.reltablespace - WHERE x.indrelid = v_toast_oid - AND COALESCE(ts.spcname, 'pg_default') <> p_tablespace - LOOP - EXECUTE format('ALTER INDEX %I.%I SET TABLESPACE %I', r.index_schema, r.index_name, p_tablespace); - END LOOP; - END IF; - - -- 4) 统计信息 - EXECUTE format('ANALYZE %I.%I', p_schema, p_partition); - END; - $$; - - -- 创建单日分区(幂等);父表索引自动继承到子表,无需手动建索引 - CREATE OR REPLACE FUNCTION heartbeat.create_daily_partition(p_day date) - RETURNS void - LANGUAGE plpgsql - AS $$ - DECLARE - start_ms bigint; - end_ms bigint; - part_name text; - BEGIN - start_ms := heartbeat.day_start_ms_shanghai(p_day); - end_ms := start_ms + 86400000; - part_name := heartbeat.partition_name_for_day(p_day); - - EXECUTE format( - 'CREATE TABLE IF NOT EXISTS heartbeat.%I PARTITION OF heartbeat.heartbeat_events FOR VALUES FROM (%s) TO (%s)', - part_name, start_ms, end_ms - ); - - PERFORM heartbeat.relocate_partition_to_tablespace('heartbeat', part_name, 'ts_hot'); - END; - $$; - - CREATE OR REPLACE FUNCTION heartbeat.ensure_partitions(p_start_day date, p_end_day date) - RETURNS void - LANGUAGE plpgsql - AS $$ - DECLARE - d date; - BEGIN - IF p_end_day < p_start_day THEN - RAISE EXCEPTION 'p_end_day (%) must be >= p_start_day (%)', p_end_day, p_start_day; - END IF; - - FOR d IN SELECT generate_series(p_start_day, p_end_day, interval '1 day')::date - LOOP - PERFORM heartbeat.create_daily_partition(d); - END LOOP; - END; - $$; - - COMMIT; - `; - - await this.pool.query(v2SchemaQuery); - await this.ensureIpColumnVarchar(); - await this.ensureRoomIdColumnVarchar(); - console.log('数据库表初始化成功'); - } catch (error) { - console.error('数据库表初始化失败:', error); - throw error; - } - } - - async ensureRoomIdColumnVarchar() { - const res = await this.pool.query( - ` - SELECT format_type(a.atttypid, a.atttypmod) AS type - FROM pg_attribute a - JOIN pg_class c ON c.oid = a.attrelid - JOIN pg_namespace n ON n.oid = c.relnamespace - WHERE n.nspname = 'heartbeat' - AND c.relname = 'heartbeat_events' - AND a.attname = 'room_id' - AND a.attnum > 0 - AND NOT a.attisdropped - ` - ); - - const type = String(res?.rows?.[0]?.type ?? '').toLowerCase(); - if (!type) return; - if (type.startsWith('character varying')) return; - - await this.pool.query('ALTER TABLE heartbeat.heartbeat_events DROP CONSTRAINT IF EXISTS chk_room_id_range'); - await this.pool.query('ALTER TABLE heartbeat.heartbeat_events DROP CONSTRAINT IF EXISTS chk_room_id_len'); - - await this.pool.query( - `ALTER TABLE heartbeat.heartbeat_events - ALTER COLUMN room_id TYPE varchar(50) - USING room_id::text` - ); - - await this.pool.query( - 'ALTER TABLE heartbeat.heartbeat_events ADD CONSTRAINT chk_room_id_len CHECK (char_length(room_id) > 0 AND char_length(room_id) <= 50)' - ); - } - - async ensureIpColumnVarchar() { - const res = await this.pool.query( - ` - SELECT format_type(a.atttypid, a.atttypmod) AS type - FROM pg_attribute a - JOIN pg_class c ON c.oid = a.attrelid - JOIN pg_namespace n ON n.oid = c.relnamespace - WHERE n.nspname = 'heartbeat' - AND c.relname = 'heartbeat_events' - AND a.attname = 'ip' - AND a.attnum > 0 - AND NOT a.attisdropped - ` - ); - - const type = String(res?.rows?.[0]?.type ?? '').toLowerCase(); - if (!type) return; - if (type.startsWith('character varying')) return; - if (!type.startsWith('inet')) return; - - await this.pool.query( - `ALTER TABLE heartbeat.heartbeat_events - ALTER COLUMN ip TYPE varchar(21) - USING ip::text` - ); - } - escapeIdentifier(id) { return `"${String(id).replace(/"/g, '""')}"`; } - getPartitionConfig() { - const cfg = this.config.partitionMaintenance ?? {}; - return { - enabled: cfg.enabled !== false, - futureDays: Number.isFinite(cfg.futureDays) ? cfg.futureDays : 30, - intervalHours: Number.isFinite(cfg.intervalHours) ? cfg.intervalHours : 6, - }; - } - - getPartitionFutureDays() { - return this.getPartitionConfig().futureDays; - } - - async ensurePartitionsForRange({ startDayOffset, endDayOffset }) { - const startOffset = Number(startDayOffset ?? 0); - const endOffset = Number(endDayOffset ?? 0); - await this.pool.query( - "SELECT heartbeat.ensure_partitions(((now() AT TIME ZONE 'Asia/Shanghai')::date) + $1::int, ((now() AT TIME ZONE 'Asia/Shanghai')::date) + $2::int)", - [startOffset, endOffset] - ); - } - - startPartitionMaintenance() { - const cfg = this.getPartitionConfig(); - if (!cfg.enabled) { - return; - } - - if (this.partitionMaintenanceTimer) { - return; - } - - const intervalMs = Math.max(60_000, cfg.intervalHours * 60 * 60 * 1000); - this.partitionMaintenanceTimer = setInterval(async () => { - try { - await this.ensurePartitionsForRange({ - startDayOffset: -1, - endDayOffset: this.getPartitionFutureDays(), - }); - console.log('[db] 分区预创建维护完成'); - } catch (err) { - console.error('[db] 分区预创建维护失败:', err); - } - }, intervalMs); - } - - stopPartitionMaintenance() { - if (this.partitionMaintenanceTimer) { - clearInterval(this.partitionMaintenanceTimer); - this.partitionMaintenanceTimer = null; - } - } - formatShanghaiDate(tsMs) { const date = new Date(Number(tsMs)); const fmt = new Intl.DateTimeFormat('en-CA', { @@ -555,10 +187,13 @@ class DatabaseManager { return s.replace(/\\/g, '\\\\').replace(/\n/g, '\\n').replace(/\r/g, '\\r').replace(/\t/g, '\\t'); }; + const tsValues = events.map((e) => Number(e.ts_ms)).filter((n) => Number.isFinite(n)); + const tsMin = tsValues.length > 0 ? Math.min(...tsValues) : null; + const tsMax = tsValues.length > 0 ? Math.max(...tsValues) : null; + const runInsertOnce = async () => { - const tsValues = events.map((e) => Number(e.ts_ms)).filter((n) => Number.isFinite(n)); - if (tsValues.length > 0) { - await this.ensurePartitionsForTsRange(Math.min(...tsValues), Math.max(...tsValues)); + if (tsMin !== null) { + await this.ensurePartitionsForTsRange(tsMin, tsMax); } const client = await this.pool.connect(); @@ -596,10 +231,9 @@ class DatabaseManager { lastError = error; if (this.isMissingPartitionError(error)) { console.warn('[db] 检测到缺分区写入失败,执行兜底预创建并重试一次'); - await this.ensurePartitionsForRange({ - startDayOffset: -7, - endDayOffset: this.getPartitionFutureDays(), - }); + if (tsMin !== null) { + await this.ensurePartitionsForTsRange(tsMin, tsMax); + } } if (attempt < maxAttempts) { await new Promise((r) => setTimeout(r, retryDelay));