From 5562288315bde3cab7a1268d886a692db748b3e2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BE=90?= Date: Tue, 3 Mar 2026 21:28:27 +0800 Subject: [PATCH] revert 2b529baeb3b70aa805597193edba0f77a10c9046 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit revert feat: 更新文档和数据库管理逻辑,添加运行模式说明,修复分区表空间设置 --- README.md | 6 --- docs/db-heartbeat-schema.md | 1 - docs/deployment.md | 6 +-- docs/新分区方法案例.md | 55 ++++++++++++++++++++++ openspec/specs/redis/spec.md | 13 ++--- scripts/db/020_partitioning_auto_daily.sql | 2 +- src/db/databaseManager.js | 38 ++++++--------- 7 files changed, 81 insertions(+), 40 deletions(-) create mode 100644 docs/新分区方法案例.md diff --git a/README.md b/README.md index 94fa9b0..2b8f3af 100644 --- a/README.md +++ b/README.md @@ -19,12 +19,6 @@ BLS心跳接收端,用于接收并处理Kafka队列中的心跳数据,经过 ## 快速开始 - -## 运行模式说明(重要) - -- 本项目是 Kafka 消费服务,**主运行入口是** `src/index.js`,即 `npm run start`。 -- `npm run dev` / `npm run build` / `npm run preview` 仅用于 Vite 相关能力,不会启动心跳消费主链路。 -- 生产环境如使用 PM2,请使用仓库中的 `ecosystem.config.cjs`。 ### 安装依赖 ```bash diff --git a/docs/db-heartbeat-schema.md b/docs/db-heartbeat-schema.md index 13db381..d5e6c9d 100644 --- a/docs/db-heartbeat-schema.md +++ b/docs/db-heartbeat-schema.md @@ -69,7 +69,6 @@ ## 3. 分区策略与自动分区 - 分区键:`ts_ms` - 粒度:按天(Asia/Shanghai,自然日) -- 新建分区表空间:每日新分区显式创建在 `ts_hot`(非 `pg_default`) - 自动分区:通过“预创建分区”的方式实现(安装时预建昨天~未来 7 天),并提供函数供服务启动/定时任务调用 调用方式: diff --git a/docs/deployment.md b/docs/deployment.md index 62e0ff5..2de5d2c 100644 --- a/docs/deployment.md +++ b/docs/deployment.md @@ -105,7 +105,7 @@ npm run db:apply ```bash # 使用PM2启动服务 -pm2 start ecosystem.config.cjs +pm2 start ecosystem.config.js # 查看服务状态 pm2 status @@ -129,7 +129,7 @@ pm2 startup ## PM2配置说明 -项目提供了 `ecosystem.config.cjs` 配置文件,包含以下配置: +项目提供了 `ecosystem.config.js` 配置文件,包含以下配置: - 应用名称:`web-bls-heartbeat-server` - 运行模式:`cluster`(集群模式) @@ -199,7 +199,7 @@ xcopy /E /I /Y C:\BLS_Heartbeat_Server C:\BLS_Heartbeat_Server_backup npm install --production # 重启服务 -pm2 start ecosystem.config.cjs +pm2 start ecosystem.config.js # 验证服务状态 pm2 status diff --git a/docs/新分区方法案例.md b/docs/新分区方法案例.md new file mode 100644 index 0000000..9b31d1a --- /dev/null +++ b/docs/新分区方法案例.md @@ -0,0 +1,55 @@ +-- 通过 docker compose 在容器内执行 psql,并使用 here-doc 传入 SQL +docker compose exec -T postgres psql -U log_admin -d log_platform -v ON_ERROR_STOP=1 <<'SQL' + +-- 使用匿名代码块批量处理分区创建与索引迁移 +DO $$ +DECLARE + d date; -- 循环日期(从今天到未来 29 天) + pname text; -- 分区表名,例如 heartbeat_events_20260303 + start_ms bigint; -- 分区起始毫秒时间戳(UTC,含) + end_ms bigint; -- 分区结束毫秒时间戳(UTC,不含) + idx record; -- 遍历分区索引时的游标记录 +BEGIN + -- 生成从 current_date 到 current_date+29 的日期序列(共 30 天) + FOR d IN + SELECT generate_series(current_date, current_date + 29, interval '1 day')::date + LOOP + -- 按约定命名分区名:heartbeat_events_YYYYMMDD + pname := format('heartbeat_events_%s', to_char(d, 'YYYYMMDD')); + + -- 计算该日期 00:00:00 UTC 的毫秒时间戳作为分区下界 + start_ms := (extract(epoch from (d::timestamp at time zone 'UTC')) * 1000)::bigint; + + -- 计算下一天 00:00:00 UTC 的毫秒时间戳作为分区上界 + end_ms := (extract(epoch from ((d + 1)::timestamp at time zone 'UTC')) * 1000)::bigint; + + -- 若分区不存在则创建;存在则跳过(幂等) + EXECUTE format( + 'CREATE TABLE IF NOT EXISTS heartbeat.%I PARTITION OF heartbeat.heartbeat_events FOR VALUES FROM (%s) TO (%s) TABLESPACE ts_hot', + pname, start_ms, end_ms + ); + + -- 无论新建或已存在,强制把分区表迁移到 ts_hot(保证热分区落热盘) + EXECUTE format('ALTER TABLE heartbeat.%I SET TABLESPACE ts_hot', pname); + + -- 遍历该分区的全部索引,筛出不在 ts_hot 的索引 + FOR idx IN + SELECT idxn.nspname AS index_schema, i.relname AS index_name + FROM pg_index x + JOIN pg_class t ON t.oid = x.indrelid + JOIN pg_namespace nt ON nt.oid = t.relnamespace + JOIN pg_class i ON i.oid = x.indexrelid + JOIN pg_namespace idxn ON idxn.oid = i.relnamespace + LEFT JOIN pg_tablespace ts ON ts.oid = i.reltablespace + WHERE nt.nspname = 'heartbeat' + AND t.relname = pname + AND COALESCE(ts.spcname, 'pg_default') <> 'ts_hot' + LOOP + -- 将索引迁移到 ts_hot,确保“分区与索引同盘” + EXECUTE format('ALTER INDEX %I.%I SET TABLESPACE ts_hot', idx.index_schema, idx.index_name); + END LOOP; + END LOOP; +END $$; + +-- here-doc 结束标记 +SQL \ No newline at end of file diff --git a/openspec/specs/redis/spec.md b/openspec/specs/redis/spec.md index 9955cf4..90b2ea7 100644 --- a/openspec/specs/redis/spec.md +++ b/openspec/specs/redis/spec.md @@ -1,18 +1,19 @@ # Redis 对接规范 ## Purpose -本规范定义本服务按协议向 Redis 写入“项目心跳”(LIST) 与“项目控制台”(LIST) 两个 key 的数据结构与频率,并在 Redis 不可用时保持无人值守可用性(不阻塞启动、后台重连)。 +本规范定义本服务按协议向 Redis 写入“项目心跳”(STRING) 与“项目控制台”(LIST) 两个 key 的数据结构与频率,并在 Redis 不可用时保持无人值守可用性(不阻塞启动、后台重连)。 ## Requirements -### Requirement: 项目心跳列表写入 -系统 MUST 按协议周期性向 `项目心跳` LIST 追加心跳记录。 +### Requirement: 心跳 Key 写入 +系统 MUST 按协议周期性写入 Redis STRING 心跳 Key。 -#### Scenario: 定期刷新项目心跳 +#### Scenario: 定期刷新心跳 - **WHEN** 服务运行中 -- **THEN** 系统应每 3 秒(可配置)执行一次 `RPUSH 项目心跳 ` -- **AND** JSON 必须包含 `projectName`、`apiBaseUrl` 与 `lastActiveAt`(毫秒时间戳) +- **THEN** 系统应每 3 秒(可配置)执行一次 `SET ${projectName}_项目心跳 ` +- **AND** JSON 必须包含 `apiBaseUrl` 与 `lastActiveAt`(毫秒时间戳) - **AND** value 使用 UTF-8 编码 +- **AND** 可选设置 TTL(例如 EX 30) ### Requirement: 控制台日志队列写入 系统 MUST 按协议向 Redis LIST 追加控制台日志。 diff --git a/scripts/db/020_partitioning_auto_daily.sql b/scripts/db/020_partitioning_auto_daily.sql index f849aca..2765d9c 100644 --- a/scripts/db/020_partitioning_auto_daily.sql +++ b/scripts/db/020_partitioning_auto_daily.sql @@ -112,7 +112,7 @@ BEGIN part_name := heartbeat.partition_name_for_day(p_day); EXECUTE format( - 'CREATE TABLE IF NOT EXISTS heartbeat.%I PARTITION OF heartbeat.heartbeat_events FOR VALUES FROM (%s) TO (%s) TABLESPACE ts_hot', + 'CREATE TABLE IF NOT EXISTS heartbeat.%I PARTITION OF heartbeat.heartbeat_events FOR VALUES FROM (%s) TO (%s)', part_name, start_ms, end_ms ); diff --git a/src/db/databaseManager.js b/src/db/databaseManager.js index 98db4a5..d02c201 100644 --- a/src/db/databaseManager.js +++ b/src/db/databaseManager.js @@ -27,7 +27,7 @@ class DatabaseManager { }); // 监听连接池错误,防止后端断开导致进程崩溃 - this.pool.on('error', (err) => { + this.pool.on('error', (err, client) => { console.error('[db] 发生未捕获的连接池错误:', err); // 不抛出,让应用层通过心跳检测发现问题 }); @@ -277,7 +277,7 @@ class DatabaseManager { part_name := heartbeat.partition_name_for_day(p_day); EXECUTE format( - 'CREATE TABLE IF NOT EXISTS heartbeat.%I PARTITION OF heartbeat.heartbeat_events FOR VALUES FROM (%s) TO (%s) TABLESPACE ts_hot', + 'CREATE TABLE IF NOT EXISTS heartbeat.%I PARTITION OF heartbeat.heartbeat_events FOR VALUES FROM (%s) TO (%s)', part_name, start_ms, end_ms ); @@ -382,21 +382,10 @@ class DatabaseManager { getPartitionConfig() { const cfg = this.config.partitionMaintenance ?? {}; - const futureDaysRaw = Number(cfg.futureDays); - const intervalHoursRaw = Number(cfg.intervalHours); - - const futureDays = Number.isFinite(futureDaysRaw) && futureDaysRaw > 0 - ? Math.floor(futureDaysRaw) - : 30; - - const intervalHours = Number.isFinite(intervalHoursRaw) && intervalHoursRaw > 0 - ? intervalHoursRaw - : 6; - return { enabled: cfg.enabled !== false, - futureDays, - intervalHours, + futureDays: Number.isFinite(cfg.futureDays) ? cfg.futureDays : 30, + intervalHours: Number.isFinite(cfg.intervalHours) ? cfg.intervalHours : 6, }; } @@ -567,21 +556,24 @@ class DatabaseManager { await this.ensurePartitionsForTsRange(Math.min(...tsValues), Math.max(...tsValues)); } - const generateRows = async function* () { - for (const e of events) { - const rowValues = toRowValues(e); - const line = rowValues.map(formatPgCol).join('\t') + '\n'; - yield line; - } - }; - const client = await this.pool.connect(); try { const copySql = `COPY heartbeat.heartbeat_events (${columns.join(', ')}) FROM STDIN WITH (FORMAT text, DELIMITER E'\\t', NULL '\\N')`; const stream = client.query(copyFrom(copySql)); + // Use a generator to stream rows directly + async function* generateRows() { + for (const e of events) { + const rowValues = toRowValues(e); + const line = rowValues.map(formatPgCol).join('\t') + '\n'; + yield line; + } + } + await pipeline(Readable.from(generateRows()), stream); return { insertedCount: events.length }; + } catch (error) { + throw error; } finally { client.release(); }