From 2b529baeb3b70aa805597193edba0f77a10c9046 Mon Sep 17 00:00:00 2001 From: XuJiacheng Date: Tue, 3 Mar 2026 20:22:01 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=9B=B4=E6=96=B0=E6=96=87=E6=A1=A3?= =?UTF-8?q?=E5=92=8C=E6=95=B0=E6=8D=AE=E5=BA=93=E7=AE=A1=E7=90=86=E9=80=BB?= =?UTF-8?q?=E8=BE=91=EF=BC=8C=E6=B7=BB=E5=8A=A0=E8=BF=90=E8=A1=8C=E6=A8=A1?= =?UTF-8?q?=E5=BC=8F=E8=AF=B4=E6=98=8E=EF=BC=8C=E4=BF=AE=E5=A4=8D=E5=88=86?= =?UTF-8?q?=E5=8C=BA=E8=A1=A8=E7=A9=BA=E9=97=B4=E8=AE=BE=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 6 +++ docs/db-heartbeat-schema.md | 1 + docs/deployment.md | 6 +-- docs/新分区方法案例.md | 55 ---------------------- openspec/specs/redis/spec.md | 13 +++-- scripts/db/020_partitioning_auto_daily.sql | 2 +- src/db/databaseManager.js | 38 +++++++++------ 7 files changed, 40 insertions(+), 81 deletions(-) delete mode 100644 docs/新分区方法案例.md diff --git a/README.md b/README.md index 2b8f3af..94fa9b0 100644 --- a/README.md +++ b/README.md @@ -19,6 +19,12 @@ BLS心跳接收端,用于接收并处理Kafka队列中的心跳数据,经过 ## 快速开始 + +## 运行模式说明(重要) + +- 本项目是 Kafka 消费服务,**主运行入口是** `src/index.js`,即 `npm run start`。 +- `npm run dev` / `npm run build` / `npm run preview` 仅用于 Vite 相关能力,不会启动心跳消费主链路。 +- 生产环境如使用 PM2,请使用仓库中的 `ecosystem.config.cjs`。 ### 安装依赖 ```bash diff --git a/docs/db-heartbeat-schema.md b/docs/db-heartbeat-schema.md index d5e6c9d..13db381 100644 --- a/docs/db-heartbeat-schema.md +++ b/docs/db-heartbeat-schema.md @@ -69,6 +69,7 @@ ## 3. 分区策略与自动分区 - 分区键:`ts_ms` - 粒度:按天(Asia/Shanghai,自然日) +- 新建分区表空间:每日新分区显式创建在 `ts_hot`(非 `pg_default`) - 自动分区:通过“预创建分区”的方式实现(安装时预建昨天~未来 7 天),并提供函数供服务启动/定时任务调用 调用方式: diff --git a/docs/deployment.md b/docs/deployment.md index 2de5d2c..62e0ff5 100644 --- a/docs/deployment.md +++ b/docs/deployment.md @@ -105,7 +105,7 @@ npm run db:apply ```bash # 使用PM2启动服务 -pm2 start ecosystem.config.js +pm2 start ecosystem.config.cjs # 查看服务状态 pm2 status @@ -129,7 +129,7 @@ pm2 startup ## PM2配置说明 -项目提供了 `ecosystem.config.js` 配置文件,包含以下配置: +项目提供了 `ecosystem.config.cjs` 配置文件,包含以下配置: - 应用名称:`web-bls-heartbeat-server` - 运行模式:`cluster`(集群模式) @@ -199,7 +199,7 @@ xcopy /E /I /Y C:\BLS_Heartbeat_Server C:\BLS_Heartbeat_Server_backup npm install --production # 重启服务 -pm2 start ecosystem.config.js +pm2 start ecosystem.config.cjs # 验证服务状态 pm2 status diff --git a/docs/新分区方法案例.md b/docs/新分区方法案例.md deleted file mode 100644 index 9b31d1a..0000000 --- a/docs/新分区方法案例.md +++ /dev/null @@ -1,55 +0,0 @@ --- 通过 docker compose 在容器内执行 psql,并使用 here-doc 传入 SQL -docker compose exec -T postgres psql -U log_admin -d log_platform -v ON_ERROR_STOP=1 <<'SQL' - --- 使用匿名代码块批量处理分区创建与索引迁移 -DO $$ -DECLARE - d date; -- 循环日期(从今天到未来 29 天) - pname text; -- 分区表名,例如 heartbeat_events_20260303 - start_ms bigint; -- 分区起始毫秒时间戳(UTC,含) - end_ms bigint; -- 分区结束毫秒时间戳(UTC,不含) - idx record; -- 遍历分区索引时的游标记录 -BEGIN - -- 生成从 current_date 到 current_date+29 的日期序列(共 30 天) - FOR d IN - SELECT generate_series(current_date, current_date + 29, interval '1 day')::date - LOOP - -- 按约定命名分区名:heartbeat_events_YYYYMMDD - pname := format('heartbeat_events_%s', to_char(d, 'YYYYMMDD')); - - -- 计算该日期 00:00:00 UTC 的毫秒时间戳作为分区下界 - start_ms := (extract(epoch from (d::timestamp at time zone 'UTC')) * 1000)::bigint; - - -- 计算下一天 00:00:00 UTC 的毫秒时间戳作为分区上界 - end_ms := (extract(epoch from ((d + 1)::timestamp at time zone 'UTC')) * 1000)::bigint; - - -- 若分区不存在则创建;存在则跳过(幂等) - EXECUTE format( - 'CREATE TABLE IF NOT EXISTS heartbeat.%I PARTITION OF heartbeat.heartbeat_events FOR VALUES FROM (%s) TO (%s) TABLESPACE ts_hot', - pname, start_ms, end_ms - ); - - -- 无论新建或已存在,强制把分区表迁移到 ts_hot(保证热分区落热盘) - EXECUTE format('ALTER TABLE heartbeat.%I SET TABLESPACE ts_hot', pname); - - -- 遍历该分区的全部索引,筛出不在 ts_hot 的索引 - FOR idx IN - SELECT idxn.nspname AS index_schema, i.relname AS index_name - FROM pg_index x - JOIN pg_class t ON t.oid = x.indrelid - JOIN pg_namespace nt ON nt.oid = t.relnamespace - JOIN pg_class i ON i.oid = x.indexrelid - JOIN pg_namespace idxn ON idxn.oid = i.relnamespace - LEFT JOIN pg_tablespace ts ON ts.oid = i.reltablespace - WHERE nt.nspname = 'heartbeat' - AND t.relname = pname - AND COALESCE(ts.spcname, 'pg_default') <> 'ts_hot' - LOOP - -- 将索引迁移到 ts_hot,确保“分区与索引同盘” - EXECUTE format('ALTER INDEX %I.%I SET TABLESPACE ts_hot', idx.index_schema, idx.index_name); - END LOOP; - END LOOP; -END $$; - --- here-doc 结束标记 -SQL \ No newline at end of file diff --git a/openspec/specs/redis/spec.md b/openspec/specs/redis/spec.md index 90b2ea7..9955cf4 100644 --- a/openspec/specs/redis/spec.md +++ b/openspec/specs/redis/spec.md @@ -1,19 +1,18 @@ # Redis 对接规范 ## Purpose -本规范定义本服务按协议向 Redis 写入“项目心跳”(STRING) 与“项目控制台”(LIST) 两个 key 的数据结构与频率,并在 Redis 不可用时保持无人值守可用性(不阻塞启动、后台重连)。 +本规范定义本服务按协议向 Redis 写入“项目心跳”(LIST) 与“项目控制台”(LIST) 两个 key 的数据结构与频率,并在 Redis 不可用时保持无人值守可用性(不阻塞启动、后台重连)。 ## Requirements -### Requirement: 心跳 Key 写入 -系统 MUST 按协议周期性写入 Redis STRING 心跳 Key。 +### Requirement: 项目心跳列表写入 +系统 MUST 按协议周期性向 `项目心跳` LIST 追加心跳记录。 -#### Scenario: 定期刷新心跳 +#### Scenario: 定期刷新项目心跳 - **WHEN** 服务运行中 -- **THEN** 系统应每 3 秒(可配置)执行一次 `SET ${projectName}_项目心跳 ` -- **AND** JSON 必须包含 `apiBaseUrl` 与 `lastActiveAt`(毫秒时间戳) +- **THEN** 系统应每 3 秒(可配置)执行一次 `RPUSH 项目心跳 ` +- **AND** JSON 必须包含 `projectName`、`apiBaseUrl` 与 `lastActiveAt`(毫秒时间戳) - **AND** value 使用 UTF-8 编码 -- **AND** 可选设置 TTL(例如 EX 30) ### Requirement: 控制台日志队列写入 系统 MUST 按协议向 Redis LIST 追加控制台日志。 diff --git a/scripts/db/020_partitioning_auto_daily.sql b/scripts/db/020_partitioning_auto_daily.sql index 057a39d..5f321a3 100644 --- a/scripts/db/020_partitioning_auto_daily.sql +++ b/scripts/db/020_partitioning_auto_daily.sql @@ -48,7 +48,7 @@ BEGIN part_name := heartbeat.partition_name_for_day(p_day); EXECUTE format( - 'CREATE TABLE IF NOT EXISTS heartbeat.%I PARTITION OF heartbeat.heartbeat_events FOR VALUES FROM (%s) TO (%s)', + 'CREATE TABLE IF NOT EXISTS heartbeat.%I PARTITION OF heartbeat.heartbeat_events FOR VALUES FROM (%s) TO (%s) TABLESPACE ts_hot', part_name, start_ms, end_ms ); END; diff --git a/src/db/databaseManager.js b/src/db/databaseManager.js index cbdf41b..c0bd8d5 100644 --- a/src/db/databaseManager.js +++ b/src/db/databaseManager.js @@ -27,7 +27,7 @@ class DatabaseManager { }); // 监听连接池错误,防止后端断开导致进程崩溃 - this.pool.on('error', (err, client) => { + this.pool.on('error', (err) => { console.error('[db] 发生未捕获的连接池错误:', err); // 不抛出,让应用层通过心跳检测发现问题 }); @@ -213,7 +213,7 @@ class DatabaseManager { part_name := heartbeat.partition_name_for_day(p_day); EXECUTE format( - 'CREATE TABLE IF NOT EXISTS heartbeat.%I PARTITION OF heartbeat.heartbeat_events FOR VALUES FROM (%s) TO (%s)', + 'CREATE TABLE IF NOT EXISTS heartbeat.%I PARTITION OF heartbeat.heartbeat_events FOR VALUES FROM (%s) TO (%s) TABLESPACE ts_hot', part_name, start_ms, end_ms ); END; @@ -316,10 +316,21 @@ class DatabaseManager { getPartitionConfig() { const cfg = this.config.partitionMaintenance ?? {}; + const futureDaysRaw = Number(cfg.futureDays); + const intervalHoursRaw = Number(cfg.intervalHours); + + const futureDays = Number.isFinite(futureDaysRaw) && futureDaysRaw > 0 + ? Math.floor(futureDaysRaw) + : 30; + + const intervalHours = Number.isFinite(intervalHoursRaw) && intervalHoursRaw > 0 + ? intervalHoursRaw + : 6; + return { enabled: cfg.enabled !== false, - futureDays: Number.isFinite(cfg.futureDays) ? cfg.futureDays : 30, - intervalHours: Number.isFinite(cfg.intervalHours) ? cfg.intervalHours : 6, + futureDays, + intervalHours, }; } @@ -490,24 +501,21 @@ class DatabaseManager { await this.ensurePartitionsForTsRange(Math.min(...tsValues), Math.max(...tsValues)); } + const generateRows = async function* () { + for (const e of events) { + const rowValues = toRowValues(e); + const line = rowValues.map(formatPgCol).join('\t') + '\n'; + yield line; + } + }; + const client = await this.pool.connect(); try { const copySql = `COPY heartbeat.heartbeat_events (${columns.join(', ')}) FROM STDIN WITH (FORMAT text, DELIMITER E'\\t', NULL '\\N')`; const stream = client.query(copyFrom(copySql)); - // Use a generator to stream rows directly - async function* generateRows() { - for (const e of events) { - const rowValues = toRowValues(e); - const line = rowValues.map(formatPgCol).join('\t') + '\n'; - yield line; - } - } - await pipeline(Readable.from(generateRows()), stream); return { insertedCount: events.length }; - } catch (error) { - throw error; } finally { client.release(); }