@@ -19,12 +19,6 @@ BLS心跳接收端,用于接收并处理Kafka队列中的心跳数据,经过
|
|||||||
|
|
||||||
## 快速开始
|
## 快速开始
|
||||||
|
|
||||||
|
|
||||||
## 运行模式说明(重要)
|
|
||||||
|
|
||||||
- 本项目是 Kafka 消费服务,**主运行入口是** `src/index.js`,即 `npm run start`。
|
|
||||||
- `npm run dev` / `npm run build` / `npm run preview` 仅用于 Vite 相关能力,不会启动心跳消费主链路。
|
|
||||||
- 生产环境如使用 PM2,请使用仓库中的 `ecosystem.config.cjs`。
|
|
||||||
### 安装依赖
|
### 安装依赖
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
|
|||||||
@@ -69,7 +69,6 @@
|
|||||||
## 3. 分区策略与自动分区
|
## 3. 分区策略与自动分区
|
||||||
- 分区键:`ts_ms`
|
- 分区键:`ts_ms`
|
||||||
- 粒度:按天(Asia/Shanghai,自然日)
|
- 粒度:按天(Asia/Shanghai,自然日)
|
||||||
- 新建分区表空间:每日新分区显式创建在 `ts_hot`(非 `pg_default`)
|
|
||||||
- 自动分区:通过“预创建分区”的方式实现(安装时预建昨天~未来 7 天),并提供函数供服务启动/定时任务调用
|
- 自动分区:通过“预创建分区”的方式实现(安装时预建昨天~未来 7 天),并提供函数供服务启动/定时任务调用
|
||||||
|
|
||||||
调用方式:
|
调用方式:
|
||||||
|
|||||||
@@ -105,7 +105,7 @@ npm run db:apply
|
|||||||
|
|
||||||
```bash
|
```bash
|
||||||
# 使用PM2启动服务
|
# 使用PM2启动服务
|
||||||
pm2 start ecosystem.config.cjs
|
pm2 start ecosystem.config.js
|
||||||
|
|
||||||
# 查看服务状态
|
# 查看服务状态
|
||||||
pm2 status
|
pm2 status
|
||||||
@@ -129,7 +129,7 @@ pm2 startup
|
|||||||
|
|
||||||
## PM2配置说明
|
## PM2配置说明
|
||||||
|
|
||||||
项目提供了 `ecosystem.config.cjs` 配置文件,包含以下配置:
|
项目提供了 `ecosystem.config.js` 配置文件,包含以下配置:
|
||||||
|
|
||||||
- 应用名称:`web-bls-heartbeat-server`
|
- 应用名称:`web-bls-heartbeat-server`
|
||||||
- 运行模式:`cluster`(集群模式)
|
- 运行模式:`cluster`(集群模式)
|
||||||
@@ -199,7 +199,7 @@ xcopy /E /I /Y C:\BLS_Heartbeat_Server C:\BLS_Heartbeat_Server_backup
|
|||||||
npm install --production
|
npm install --production
|
||||||
|
|
||||||
# 重启服务
|
# 重启服务
|
||||||
pm2 start ecosystem.config.cjs
|
pm2 start ecosystem.config.js
|
||||||
|
|
||||||
# 验证服务状态
|
# 验证服务状态
|
||||||
pm2 status
|
pm2 status
|
||||||
|
|||||||
55
docs/新分区方法案例.md
Normal file
55
docs/新分区方法案例.md
Normal file
@@ -0,0 +1,55 @@
|
|||||||
|
-- 通过 docker compose 在容器内执行 psql,并使用 here-doc 传入 SQL
|
||||||
|
docker compose exec -T postgres psql -U log_admin -d log_platform -v ON_ERROR_STOP=1 <<'SQL'
|
||||||
|
|
||||||
|
-- 使用匿名代码块批量处理分区创建与索引迁移
|
||||||
|
DO $$
|
||||||
|
DECLARE
|
||||||
|
d date; -- 循环日期(从今天到未来 29 天)
|
||||||
|
pname text; -- 分区表名,例如 heartbeat_events_20260303
|
||||||
|
start_ms bigint; -- 分区起始毫秒时间戳(UTC,含)
|
||||||
|
end_ms bigint; -- 分区结束毫秒时间戳(UTC,不含)
|
||||||
|
idx record; -- 遍历分区索引时的游标记录
|
||||||
|
BEGIN
|
||||||
|
-- 生成从 current_date 到 current_date+29 的日期序列(共 30 天)
|
||||||
|
FOR d IN
|
||||||
|
SELECT generate_series(current_date, current_date + 29, interval '1 day')::date
|
||||||
|
LOOP
|
||||||
|
-- 按约定命名分区名:heartbeat_events_YYYYMMDD
|
||||||
|
pname := format('heartbeat_events_%s', to_char(d, 'YYYYMMDD'));
|
||||||
|
|
||||||
|
-- 计算该日期 00:00:00 UTC 的毫秒时间戳作为分区下界
|
||||||
|
start_ms := (extract(epoch from (d::timestamp at time zone 'UTC')) * 1000)::bigint;
|
||||||
|
|
||||||
|
-- 计算下一天 00:00:00 UTC 的毫秒时间戳作为分区上界
|
||||||
|
end_ms := (extract(epoch from ((d + 1)::timestamp at time zone 'UTC')) * 1000)::bigint;
|
||||||
|
|
||||||
|
-- 若分区不存在则创建;存在则跳过(幂等)
|
||||||
|
EXECUTE format(
|
||||||
|
'CREATE TABLE IF NOT EXISTS heartbeat.%I PARTITION OF heartbeat.heartbeat_events FOR VALUES FROM (%s) TO (%s) TABLESPACE ts_hot',
|
||||||
|
pname, start_ms, end_ms
|
||||||
|
);
|
||||||
|
|
||||||
|
-- 无论新建或已存在,强制把分区表迁移到 ts_hot(保证热分区落热盘)
|
||||||
|
EXECUTE format('ALTER TABLE heartbeat.%I SET TABLESPACE ts_hot', pname);
|
||||||
|
|
||||||
|
-- 遍历该分区的全部索引,筛出不在 ts_hot 的索引
|
||||||
|
FOR idx IN
|
||||||
|
SELECT idxn.nspname AS index_schema, i.relname AS index_name
|
||||||
|
FROM pg_index x
|
||||||
|
JOIN pg_class t ON t.oid = x.indrelid
|
||||||
|
JOIN pg_namespace nt ON nt.oid = t.relnamespace
|
||||||
|
JOIN pg_class i ON i.oid = x.indexrelid
|
||||||
|
JOIN pg_namespace idxn ON idxn.oid = i.relnamespace
|
||||||
|
LEFT JOIN pg_tablespace ts ON ts.oid = i.reltablespace
|
||||||
|
WHERE nt.nspname = 'heartbeat'
|
||||||
|
AND t.relname = pname
|
||||||
|
AND COALESCE(ts.spcname, 'pg_default') <> 'ts_hot'
|
||||||
|
LOOP
|
||||||
|
-- 将索引迁移到 ts_hot,确保“分区与索引同盘”
|
||||||
|
EXECUTE format('ALTER INDEX %I.%I SET TABLESPACE ts_hot', idx.index_schema, idx.index_name);
|
||||||
|
END LOOP;
|
||||||
|
END LOOP;
|
||||||
|
END $$;
|
||||||
|
|
||||||
|
-- here-doc 结束标记
|
||||||
|
SQL
|
||||||
@@ -1,18 +1,19 @@
|
|||||||
# Redis 对接规范
|
# Redis 对接规范
|
||||||
|
|
||||||
## Purpose
|
## Purpose
|
||||||
本规范定义本服务按协议向 Redis 写入“项目心跳”(LIST) 与“项目控制台”(LIST) 两个 key 的数据结构与频率,并在 Redis 不可用时保持无人值守可用性(不阻塞启动、后台重连)。
|
本规范定义本服务按协议向 Redis 写入“项目心跳”(STRING) 与“项目控制台”(LIST) 两个 key 的数据结构与频率,并在 Redis 不可用时保持无人值守可用性(不阻塞启动、后台重连)。
|
||||||
|
|
||||||
## Requirements
|
## Requirements
|
||||||
|
|
||||||
### Requirement: 项目心跳列表写入
|
### Requirement: 心跳 Key 写入
|
||||||
系统 MUST 按协议周期性向 `项目心跳` LIST 追加心跳记录。
|
系统 MUST 按协议周期性写入 Redis STRING 心跳 Key。
|
||||||
|
|
||||||
#### Scenario: 定期刷新项目心跳
|
#### Scenario: 定期刷新心跳
|
||||||
- **WHEN** 服务运行中
|
- **WHEN** 服务运行中
|
||||||
- **THEN** 系统应每 3 秒(可配置)执行一次 `RPUSH 项目心跳 <json>`
|
- **THEN** 系统应每 3 秒(可配置)执行一次 `SET ${projectName}_项目心跳 <json>`
|
||||||
- **AND** JSON 必须包含 `projectName`、`apiBaseUrl` 与 `lastActiveAt`(毫秒时间戳)
|
- **AND** JSON 必须包含 `apiBaseUrl` 与 `lastActiveAt`(毫秒时间戳)
|
||||||
- **AND** value 使用 UTF-8 编码
|
- **AND** value 使用 UTF-8 编码
|
||||||
|
- **AND** 可选设置 TTL(例如 EX 30)
|
||||||
|
|
||||||
### Requirement: 控制台日志队列写入
|
### Requirement: 控制台日志队列写入
|
||||||
系统 MUST 按协议向 Redis LIST 追加控制台日志。
|
系统 MUST 按协议向 Redis LIST 追加控制台日志。
|
||||||
|
|||||||
@@ -112,7 +112,7 @@ BEGIN
|
|||||||
part_name := heartbeat.partition_name_for_day(p_day);
|
part_name := heartbeat.partition_name_for_day(p_day);
|
||||||
|
|
||||||
EXECUTE format(
|
EXECUTE format(
|
||||||
'CREATE TABLE IF NOT EXISTS heartbeat.%I PARTITION OF heartbeat.heartbeat_events FOR VALUES FROM (%s) TO (%s) TABLESPACE ts_hot',
|
'CREATE TABLE IF NOT EXISTS heartbeat.%I PARTITION OF heartbeat.heartbeat_events FOR VALUES FROM (%s) TO (%s)',
|
||||||
part_name, start_ms, end_ms
|
part_name, start_ms, end_ms
|
||||||
);
|
);
|
||||||
|
|
||||||
|
|||||||
@@ -27,7 +27,7 @@ class DatabaseManager {
|
|||||||
});
|
});
|
||||||
|
|
||||||
// 监听连接池错误,防止后端断开导致进程崩溃
|
// 监听连接池错误,防止后端断开导致进程崩溃
|
||||||
this.pool.on('error', (err) => {
|
this.pool.on('error', (err, client) => {
|
||||||
console.error('[db] 发生未捕获的连接池错误:', err);
|
console.error('[db] 发生未捕获的连接池错误:', err);
|
||||||
// 不抛出,让应用层通过心跳检测发现问题
|
// 不抛出,让应用层通过心跳检测发现问题
|
||||||
});
|
});
|
||||||
@@ -277,7 +277,7 @@ class DatabaseManager {
|
|||||||
part_name := heartbeat.partition_name_for_day(p_day);
|
part_name := heartbeat.partition_name_for_day(p_day);
|
||||||
|
|
||||||
EXECUTE format(
|
EXECUTE format(
|
||||||
'CREATE TABLE IF NOT EXISTS heartbeat.%I PARTITION OF heartbeat.heartbeat_events FOR VALUES FROM (%s) TO (%s) TABLESPACE ts_hot',
|
'CREATE TABLE IF NOT EXISTS heartbeat.%I PARTITION OF heartbeat.heartbeat_events FOR VALUES FROM (%s) TO (%s)',
|
||||||
part_name, start_ms, end_ms
|
part_name, start_ms, end_ms
|
||||||
);
|
);
|
||||||
|
|
||||||
@@ -382,21 +382,10 @@ class DatabaseManager {
|
|||||||
|
|
||||||
getPartitionConfig() {
|
getPartitionConfig() {
|
||||||
const cfg = this.config.partitionMaintenance ?? {};
|
const cfg = this.config.partitionMaintenance ?? {};
|
||||||
const futureDaysRaw = Number(cfg.futureDays);
|
|
||||||
const intervalHoursRaw = Number(cfg.intervalHours);
|
|
||||||
|
|
||||||
const futureDays = Number.isFinite(futureDaysRaw) && futureDaysRaw > 0
|
|
||||||
? Math.floor(futureDaysRaw)
|
|
||||||
: 30;
|
|
||||||
|
|
||||||
const intervalHours = Number.isFinite(intervalHoursRaw) && intervalHoursRaw > 0
|
|
||||||
? intervalHoursRaw
|
|
||||||
: 6;
|
|
||||||
|
|
||||||
return {
|
return {
|
||||||
enabled: cfg.enabled !== false,
|
enabled: cfg.enabled !== false,
|
||||||
futureDays,
|
futureDays: Number.isFinite(cfg.futureDays) ? cfg.futureDays : 30,
|
||||||
intervalHours,
|
intervalHours: Number.isFinite(cfg.intervalHours) ? cfg.intervalHours : 6,
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -567,21 +556,24 @@ class DatabaseManager {
|
|||||||
await this.ensurePartitionsForTsRange(Math.min(...tsValues), Math.max(...tsValues));
|
await this.ensurePartitionsForTsRange(Math.min(...tsValues), Math.max(...tsValues));
|
||||||
}
|
}
|
||||||
|
|
||||||
const generateRows = async function* () {
|
|
||||||
for (const e of events) {
|
|
||||||
const rowValues = toRowValues(e);
|
|
||||||
const line = rowValues.map(formatPgCol).join('\t') + '\n';
|
|
||||||
yield line;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
const client = await this.pool.connect();
|
const client = await this.pool.connect();
|
||||||
try {
|
try {
|
||||||
const copySql = `COPY heartbeat.heartbeat_events (${columns.join(', ')}) FROM STDIN WITH (FORMAT text, DELIMITER E'\\t', NULL '\\N')`;
|
const copySql = `COPY heartbeat.heartbeat_events (${columns.join(', ')}) FROM STDIN WITH (FORMAT text, DELIMITER E'\\t', NULL '\\N')`;
|
||||||
const stream = client.query(copyFrom(copySql));
|
const stream = client.query(copyFrom(copySql));
|
||||||
|
|
||||||
|
// Use a generator to stream rows directly
|
||||||
|
async function* generateRows() {
|
||||||
|
for (const e of events) {
|
||||||
|
const rowValues = toRowValues(e);
|
||||||
|
const line = rowValues.map(formatPgCol).join('\t') + '\n';
|
||||||
|
yield line;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
await pipeline(Readable.from(generateRows()), stream);
|
await pipeline(Readable.from(generateRows()), stream);
|
||||||
return { insertedCount: events.length };
|
return { insertedCount: events.length };
|
||||||
|
} catch (error) {
|
||||||
|
throw error;
|
||||||
} finally {
|
} finally {
|
||||||
client.release();
|
client.release();
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user