feat: 更新文档和数据库管理逻辑,添加运行模式说明,修复分区表空间设置

This commit is contained in:
2026-03-03 20:22:01 +08:00
parent c0cdc9ea66
commit 2b529baeb3
7 changed files with 40 additions and 81 deletions

View File

@@ -19,6 +19,12 @@ BLS心跳接收端用于接收并处理Kafka队列中的心跳数据经过
## 快速开始
## 运行模式说明(重要)
- 本项目是 Kafka 消费服务,**主运行入口是** `src/index.js`,即 `npm run start`
- `npm run dev` / `npm run build` / `npm run preview` 仅用于 Vite 相关能力,不会启动心跳消费主链路。
- 生产环境如使用 PM2请使用仓库中的 `ecosystem.config.cjs`
### 安装依赖
```bash

View File

@@ -69,6 +69,7 @@
## 3. 分区策略与自动分区
- 分区键:`ts_ms`
- 粒度按天Asia/Shanghai自然日
- 新建分区表空间:每日新分区显式创建在 `ts_hot`(非 `pg_default`
- 自动分区:通过“预创建分区”的方式实现(安装时预建昨天~未来 7 天),并提供函数供服务启动/定时任务调用
调用方式:

View File

@@ -105,7 +105,7 @@ npm run db:apply
```bash
# 使用PM2启动服务
pm2 start ecosystem.config.js
pm2 start ecosystem.config.cjs
# 查看服务状态
pm2 status
@@ -129,7 +129,7 @@ pm2 startup
## PM2配置说明
项目提供了 `ecosystem.config.js` 配置文件,包含以下配置:
项目提供了 `ecosystem.config.cjs` 配置文件,包含以下配置:
- 应用名称:`web-bls-heartbeat-server`
- 运行模式:`cluster`(集群模式)
@@ -199,7 +199,7 @@ xcopy /E /I /Y C:\BLS_Heartbeat_Server C:\BLS_Heartbeat_Server_backup
npm install --production
# 重启服务
pm2 start ecosystem.config.js
pm2 start ecosystem.config.cjs
# 验证服务状态
pm2 status

View File

@@ -1,55 +0,0 @@
-- 通过 docker compose 在容器内执行 psql并使用 here-doc 传入 SQL
docker compose exec -T postgres psql -U log_admin -d log_platform -v ON_ERROR_STOP=1 <<'SQL'
-- 使用匿名代码块批量处理分区创建与索引迁移
DO $$
DECLARE
d date; -- 循环日期(从今天到未来 29 天)
pname text; -- 分区表名,例如 heartbeat_events_20260303
start_ms bigint; -- 分区起始毫秒时间戳UTC
end_ms bigint; -- 分区结束毫秒时间戳UTC不含
idx record; -- 遍历分区索引时的游标记录
BEGIN
-- 生成从 current_date 到 current_date+29 的日期序列(共 30 天)
FOR d IN
SELECT generate_series(current_date, current_date + 29, interval '1 day')::date
LOOP
-- 按约定命名分区名heartbeat_events_YYYYMMDD
pname := format('heartbeat_events_%s', to_char(d, 'YYYYMMDD'));
-- 计算该日期 00:00:00 UTC 的毫秒时间戳作为分区下界
start_ms := (extract(epoch from (d::timestamp at time zone 'UTC')) * 1000)::bigint;
-- 计算下一天 00:00:00 UTC 的毫秒时间戳作为分区上界
end_ms := (extract(epoch from ((d + 1)::timestamp at time zone 'UTC')) * 1000)::bigint;
-- 若分区不存在则创建;存在则跳过(幂等)
EXECUTE format(
'CREATE TABLE IF NOT EXISTS heartbeat.%I PARTITION OF heartbeat.heartbeat_events FOR VALUES FROM (%s) TO (%s) TABLESPACE ts_hot',
pname, start_ms, end_ms
);
-- 无论新建或已存在,强制把分区表迁移到 ts_hot保证热分区落热盘
EXECUTE format('ALTER TABLE heartbeat.%I SET TABLESPACE ts_hot', pname);
-- 遍历该分区的全部索引,筛出不在 ts_hot 的索引
FOR idx IN
SELECT idxn.nspname AS index_schema, i.relname AS index_name
FROM pg_index x
JOIN pg_class t ON t.oid = x.indrelid
JOIN pg_namespace nt ON nt.oid = t.relnamespace
JOIN pg_class i ON i.oid = x.indexrelid
JOIN pg_namespace idxn ON idxn.oid = i.relnamespace
LEFT JOIN pg_tablespace ts ON ts.oid = i.reltablespace
WHERE nt.nspname = 'heartbeat'
AND t.relname = pname
AND COALESCE(ts.spcname, 'pg_default') <> 'ts_hot'
LOOP
-- 将索引迁移到 ts_hot确保“分区与索引同盘”
EXECUTE format('ALTER INDEX %I.%I SET TABLESPACE ts_hot', idx.index_schema, idx.index_name);
END LOOP;
END LOOP;
END $$;
-- here-doc 结束标记
SQL

View File

@@ -1,19 +1,18 @@
# Redis 对接规范
## Purpose
本规范定义本服务按协议向 Redis 写入“项目心跳”(STRING) 与“项目控制台”(LIST) 两个 key 的数据结构与频率,并在 Redis 不可用时保持无人值守可用性(不阻塞启动、后台重连)。
本规范定义本服务按协议向 Redis 写入“项目心跳”(LIST) 与“项目控制台”(LIST) 两个 key 的数据结构与频率,并在 Redis 不可用时保持无人值守可用性(不阻塞启动、后台重连)。
## Requirements
### Requirement: 心跳 Key 写入
系统 MUST 按协议周期性写入 Redis STRING 心跳 Key
### Requirement: 项目心跳列表写入
系统 MUST 按协议周期性`项目心跳` LIST 追加心跳记录
#### Scenario: 定期刷新心跳
#### Scenario: 定期刷新项目心跳
- **WHEN** 服务运行中
- **THEN** 系统应每 3 秒(可配置)执行一次 `SET ${projectName}_项目心跳 <json>`
- **AND** JSON 必须包含 `apiBaseUrl``lastActiveAt`(毫秒时间戳)
- **THEN** 系统应每 3 秒(可配置)执行一次 `RPUSH 项目心跳 <json>`
- **AND** JSON 必须包含 `projectName``apiBaseUrl``lastActiveAt`(毫秒时间戳)
- **AND** value 使用 UTF-8 编码
- **AND** 可选设置 TTL例如 EX 30
### Requirement: 控制台日志队列写入
系统 MUST 按协议向 Redis LIST 追加控制台日志。

View File

@@ -48,7 +48,7 @@ BEGIN
part_name := heartbeat.partition_name_for_day(p_day);
EXECUTE format(
'CREATE TABLE IF NOT EXISTS heartbeat.%I PARTITION OF heartbeat.heartbeat_events FOR VALUES FROM (%s) TO (%s)',
'CREATE TABLE IF NOT EXISTS heartbeat.%I PARTITION OF heartbeat.heartbeat_events FOR VALUES FROM (%s) TO (%s) TABLESPACE ts_hot',
part_name, start_ms, end_ms
);
END;

View File

@@ -27,7 +27,7 @@ class DatabaseManager {
});
// 监听连接池错误,防止后端断开导致进程崩溃
this.pool.on('error', (err, client) => {
this.pool.on('error', (err) => {
console.error('[db] 发生未捕获的连接池错误:', err);
// 不抛出,让应用层通过心跳检测发现问题
});
@@ -213,7 +213,7 @@ class DatabaseManager {
part_name := heartbeat.partition_name_for_day(p_day);
EXECUTE format(
'CREATE TABLE IF NOT EXISTS heartbeat.%I PARTITION OF heartbeat.heartbeat_events FOR VALUES FROM (%s) TO (%s)',
'CREATE TABLE IF NOT EXISTS heartbeat.%I PARTITION OF heartbeat.heartbeat_events FOR VALUES FROM (%s) TO (%s) TABLESPACE ts_hot',
part_name, start_ms, end_ms
);
END;
@@ -316,10 +316,21 @@ class DatabaseManager {
getPartitionConfig() {
const cfg = this.config.partitionMaintenance ?? {};
const futureDaysRaw = Number(cfg.futureDays);
const intervalHoursRaw = Number(cfg.intervalHours);
const futureDays = Number.isFinite(futureDaysRaw) && futureDaysRaw > 0
? Math.floor(futureDaysRaw)
: 30;
const intervalHours = Number.isFinite(intervalHoursRaw) && intervalHoursRaw > 0
? intervalHoursRaw
: 6;
return {
enabled: cfg.enabled !== false,
futureDays: Number.isFinite(cfg.futureDays) ? cfg.futureDays : 30,
intervalHours: Number.isFinite(cfg.intervalHours) ? cfg.intervalHours : 6,
futureDays,
intervalHours,
};
}
@@ -490,24 +501,21 @@ class DatabaseManager {
await this.ensurePartitionsForTsRange(Math.min(...tsValues), Math.max(...tsValues));
}
const generateRows = async function* () {
for (const e of events) {
const rowValues = toRowValues(e);
const line = rowValues.map(formatPgCol).join('\t') + '\n';
yield line;
}
};
const client = await this.pool.connect();
try {
const copySql = `COPY heartbeat.heartbeat_events (${columns.join(', ')}) FROM STDIN WITH (FORMAT text, DELIMITER E'\\t', NULL '\\N')`;
const stream = client.query(copyFrom(copySql));
// Use a generator to stream rows directly
async function* generateRows() {
for (const e of events) {
const rowValues = toRowValues(e);
const line = rowValues.map(formatPgCol).join('\t') + '\n';
yield line;
}
}
await pipeline(Readable.from(generateRows()), stream);
return { insertedCount: events.length };
} catch (error) {
throw error;
} finally {
client.release();
}