feat: switch to GUID primary keys and rework the service_mask index

- Change the primary key from the auto-increment id to GUID format and add format validation
- Add an expression index on service_mask to speed up first-bit queries
- Update the docs describing the rework plan and verification steps
- Add a stats module that records database writes and Kafka consumption
- Rework the Redis heartbeat protocol to store project status in a LIST
- Fix inconsistent service names in the deployment scripts
@@ -17,6 +17,6 @@ module.exports = {
    sourceType: 'module',
  },
  extends: ['eslint:recommended'],
- ignorePatterns: ['dist/', 'build/', 'coverage/', 'node_modules/'],
+ ignorePatterns: ['dist/', 'build/', 'coverage/', 'node_modules/', 'release/'],
  rules: {},
};

docs/db-guid-pk-and-service-mask-index-plan.md (new file, 40 lines)
@@ -0,0 +1,40 @@
# GUID Primary Key and service_mask Index Rework: Implementation Plan

## Goals

- Change the primary key of `heartbeat.heartbeat_events` from the auto-increment `id` to a GUID (a 32-character HEX string without hyphens).
- Add the expression index `idx_service_mask_first_bit` for "first (lowest) bit" checks on `service_mask`, to speed up the common bit query.
- Note: the existing database will be dropped, so no online migration procedure is provided; only rebuildable initialization scripts and verification steps are given.

## Approach

### 1) Primary key (GUID)

- The `guid` column is `varchar(32)` with the default `replace(gen_random_uuid()::text, '-', '')`.
- `CHECK (guid ~ '^[0-9a-f]{32}$')` constrains the input format to 32 lowercase HEX characters.
- Partitioned tables use the composite primary key `PRIMARY KEY (ts_ms, guid)` to satisfy PostgreSQL's rule that a primary key/unique constraint on a partitioned table must include the partition key.
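
For illustration, this is what the default expression produces (a minimal sketch; `gen_random_uuid()` is provided by `pgcrypto`, and is built in since PostgreSQL 13):

```sql
-- Strip the hyphens from a standard UUID to get a 32-char lowercase hex string
SELECT replace(gen_random_uuid()::text, '-', '') AS guid;
-- => e.g. 'b0e3af6c2f104b5d9a7e31c08d44f2aa' (illustrative output)
```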

### 2) Expression index for service_mask first-bit queries

- Created on the parent table:
  - `CREATE INDEX idx_service_mask_first_bit ON heartbeat.heartbeat_events ((service_mask & 1));`
- Created on each daily partition:
  - `idx_<partition>_service_mask_first_bit`

## Implementation Steps (rebuild)

1. Drop the old database/schema (performed by you).
2. Run the table-creation scripts:
   - `scripts/db/010_heartbeat_schema.sql`
   - `scripts/db/020_partitioning_auto_daily.sql`
3. Start the service, or run `npm run db:apply`, to initialize.
4. Run the verification checks (see the sketch after this list):
   - confirm `heartbeat.heartbeat_events.guid` is `varchar(32)` and has a default value
   - confirm `idx_service_mask_first_bit` exists
   - confirm each newly created partition has `idx_<partition>_service_mask_first_bit`
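
A minimal verification sketch against the PostgreSQL system catalogs (the catalog views are standard; the expected column and index names come from this plan):

```sql
-- guid should be varchar(32) with a non-null default
SELECT column_name, data_type, character_maximum_length, column_default
FROM information_schema.columns
WHERE table_schema = 'heartbeat'
  AND table_name = 'heartbeat_events'
  AND column_name = 'guid';

-- The expression index should exist on the parent table and on every partition
SELECT schemaname, tablename, indexname
FROM pg_indexes
WHERE schemaname = 'heartbeat'
  AND indexname LIKE '%service_mask_first_bit';
```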

## Risk Assessment

- The GUID default depends on the `pgcrypto` extension: the script includes `CREATE EXTENSION IF NOT EXISTS pgcrypto;`, but the executing role needs the privilege to install extensions.
- Partitioned-table primary key restriction: a "guid-only primary key on the parent table" is not possible, hence the composite primary key `(ts_ms, guid)`.
- Expression-index matching: business queries must match the index expression (see the checklist), otherwise the index is not used.
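
To make the matching risk concrete, a sketch of predicates that do and do not use the index (standard planner behavior: the query predicate must contain the indexed expression itself):

```sql
-- Contains the indexed expression (service_mask & 1): can use idx_service_mask_first_bit
SELECT count(*) FROM heartbeat.heartbeat_events WHERE (service_mask & 1) = 1;

-- Logically equivalent, but written as a modulo: the planner does not match it
-- to the indexed expression, so the index is not used
SELECT count(*) FROM heartbeat.heartbeat_events WHERE service_mask % 2 = 1;
```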

## Rollback Steps (rebuild)

1. Stop service writes.
2. `DROP SCHEMA heartbeat CASCADE;` (or drop the database).
3. Recreate from the rollback version of the SQL scripts (e.g., the scripts that revert to `id bigserial`).
4. Restart the service and verify writes and queries.
@@ -19,7 +19,7 @@
### 2.1 Field List
| Field | Type | Required | Description |
|---|---|---:|---|
-| id | bigserial | No (auto-generated) | Auto-increment sequence number (may be omitted on write) |
+| guid | varchar(32) | Yes (auto-generated) | GUID (32-char HEX, no hyphens, lowercase; auto-generated) |
| ts_ms | bigint | Yes | Millisecond timestamp (epoch ms) |
| hotel_id | int2 | Yes | Hotel ID |
| room_id | varchar(50) | Yes | Room ID (or unique room identifier, stored as a string) |
@@ -59,7 +59,7 @@
The requirement states "primary key: id (bigserial)", but **a primary key/unique constraint on a PostgreSQL partitioned table generally must include the partition key**.

The scripts therefore use:
-- `PRIMARY KEY (ts_ms, id)`
+- `PRIMARY KEY (ts_ms, guid)`

Reason: this guarantees the partitioned table can be created and the constraint can be enforced.
@@ -11,13 +11,13 @@
## 2. Deviations and Risks (to be evaluated)
### 2.1 "Primary key is id only" conflicts with PostgreSQL partitioning constraints
-- Requirement: the primary key is `id (bigserial)`.
-- Current implementation: `PRIMARY KEY (ts_ms, id)`.
+- Requirement: the primary key is `id` (GUID).
+- Current implementation: `PRIMARY KEY (ts_ms, guid)` (where guid is varchar(32)).

Reason: a primary key/unique constraint on a PostgreSQL partitioned table generally must include the partition key; otherwise no global constraint can be created on the parent table.

Impact:
-- If the business strictly relies on the "id alone is the primary key" semantics, an extra convention is needed (e.g., treat id only as a globally unique sequence number, with the composite key serving as the physical constraint).
+- If the business strictly relies on the "guid alone is the primary key" semantics, an extra convention is needed (e.g., treat guid only as a globally unique identifier, with the composite key serving as the physical constraint).

### 2.2 "Automatic partitioning" implementation
- Requirement: new partitions can be created automatically.

docs/db-service-mask-index-performance-report.md (new file, 81 lines)
@@ -0,0 +1,81 @@
# service_mask Index Performance Comparison Report

## Summary (how to judge "improvement")

- For bitwise filters on `service_mask`, adding `idx_service_mask_first_bit` should switch the query plan from `Seq Scan` to `Index Scan`/`Bitmap Index Scan` (and, when partition pruning kicks in, only the matching partitions are scanned).
- If the filter is highly selective (e.g., only a small fraction of rows satisfies "&1=1"), IO and response time usually drop significantly (a quick selectivity probe follows this list).
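
A quick way to measure that selectivity up front (a sketch; assumes the benchmark data generated in the test method below):

```sql
-- Share of rows with the first bit set; the smaller the share,
-- the more an index scan helps compared to a sequential scan
SELECT (service_mask & 1) AS first_bit,
       count(*) AS row_count,
       round(100.0 * count(*) / sum(count(*)) OVER (), 2) AS pct
FROM heartbeat.heartbeat_events
GROUP BY 1;
```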

## Index Scripts

- Table structure and index creation live in:
  - `scripts/db/010_heartbeat_schema.sql`
  - `scripts/db/020_partitioning_auto_daily.sql`
- Key statement:
  - `CREATE INDEX idx_service_mask_first_bit ON heartbeat.heartbeat_events ((service_mask & 1));`

## Test Method (reproducible)

### 1) Data preparation

Insert at least 1,000,000 rows into a test database so the plan is stable:

```sql
SELECT heartbeat.ensure_partitions(current_date, current_date + 1);

INSERT INTO heartbeat.heartbeat_events (
  ts_ms, hotel_id, room_id, device_id, ip,
  power_state, guest_type, cardless_state, service_mask,
  pms_state, carbon_state, device_count, comm_seq, extra
)
SELECT
  (extract(epoch from now()) * 1000)::bigint + (g % 86400000),
  (g % 1000)::int2,
  ('R' || (g % 500))::varchar(50),
  ('D' || (g % 20000))::varchar(64),
  '127.0.0.1:1',
  1, 0, 0,
  (g % 1024)::bigint,
  0, 0, 1, (g % 100000)::int4,
  jsonb_build_object('src','bench')
FROM generate_series(1, 1000000) g;

ANALYZE heartbeat.heartbeat_events;
```

### 2) Comparison dimensions

- Response time: the `Execution Time` reported by `EXPLAIN (ANALYZE, BUFFERS)`
- QPS: run the same SQL N consecutive times (e.g., 1000) and average the latency (computed by the load-test client)
- Query shapes:
  - Shape A (bitwise expression; hits the expression index):
    - `WHERE (service_mask & 1) = 1`

## Before/After Comparison (to be produced in the target environment)

Note: hardware, data distribution, and cache hit rates all shift the numbers. Run the "Test Method" above in your own environment and fill in the actual output below.

### A. Before (without idx_service_mask_first_bit)

- Query plan (expected: Seq Scan, or Bitmap Heap Scan + BRIN):

```sql
EXPLAIN (ANALYZE, BUFFERS)
SELECT count(*)
FROM heartbeat.heartbeat_events
WHERE (service_mask & 1) = 1;
```

- Results to record:
  - QPS (1000 runs): ____
  - P50 response time (ms): ____
  - P95 response time (ms): ____

### B. After (with idx_service_mask_first_bit)

```sql
EXPLAIN (ANALYZE, BUFFERS)
SELECT count(*)
FROM heartbeat.heartbeat_events
WHERE (service_mask & 1) = 1;
```

- Results to record:
  - Bitwise expression hits the index (expected: Index/Bitmap Index Scan):
  - QPS (1000 runs): ____
  - P50 (ms): ____
  - P95 (ms): ____

## Conclusions and Recommendations

- Queries written as `WHERE (service_mask & 1) = 1` hit `idx_service_mask_first_bit` directly.
@@ -36,7 +36,7 @@
### 3.2 Optional Fields
| Field | Type | Example | Description |
|---|---|---|---|
-| extra | object | {"source":"gw","ver":"1.2.3"} | Extension fields: electrical parameters, A/C state, version, report source, etc. |
+| extra | object | {"source":"gw","ver":"1.2.3"} | Extension fields: raw payload, version, and other custom fields |
| electricity | array<object> | [{"address":"add11","voltage":3.2,...}] | Array of electrical devices (split into array columns in original order) |
| air_conditioner | array<object> | [{"address":"ac1","state":1,...}] | Array of A/C devices (split into array columns in original order) |
@@ -79,10 +79,8 @@
    }
  ],
  "extra": {
    "source": "gw",
    "ver": "1.2.3",
-   "ac": {"mode": 1, "set_temp": 26},
-   "meter": {"p": 123.4, "e_wh": 5678}
+   "original_byte": "0x12345678"
  }
}
@@ -1,11 +1,34 @@
-# Redis Integration Protocol (for AI code generation)
+# Redis Integration Protocol (for external projects' AI code generation)

This document defines the **key naming, data types, write methods, read methods, and data formats** that "external project ↔ BLS Project Console" interactions use over Redis.

-> Constraint: every external project that needs to attach to this console must, in the same Redis instance this project uses:
->
-> - write 2 keys (heartbeat + console info)
-> - dispatch commands via HTTP API calls
Note: this repository exposes the following Redis connection info (so the other side can connect directly and write heartbeats/logs):

- Host: `10.8.8.109`
- Port: default `6379`
- Password: none (empty)
- Database: fixed `15`

Example (environment variables):

```
REDIS_HOST=10.8.8.109
REDIS_PORT=6379
REDIS_PASSWORD=
REDIS_DB=15
```

Example (redis-cli):

```
redis-cli -h 10.8.8.109 -p 6379 -n 15
```

> Constraint: every external project that needs to attach to this console must, in the same Redis (DB15):
>
> - update `项目心跳` (project list + heartbeat info)
> - append to `${projectName}_项目控制台` (log queue)
> - dispatch commands via HTTP API calls (commands are not sent through Redis)

## 1. Naming Conventions
@@ -15,35 +38,45 @@

Fixed suffixes:

- Heartbeat: `${projectName}_项目心跳`
- Console: `${projectName}_项目控制台`

Example (projectName = `订单系统`):

- `订单系统_项目心跳`
- `订单系统_项目控制台`

## 2. The 2 Keys an External Project Must Write

-### 2.1 `${projectName}_项目心跳`
-
-- Redis data type: **STRING**
-- Write method: `SET ${projectName}_项目心跳 <json>`
-- value: a JSON string that must contain the project's callable `apiBaseUrl` and the activity timestamp `lastActiveAt`
-
-Recommended JSON Schema:
### 2.1 `项目心跳`

Note: the console's left-hand "project selection list" now reads only `项目心跳` (LIST). An external project must therefore maintain this key, or it will not appear in the list.

- Redis data type: **LIST**
- Write method (FIFO recommended): `RPUSH 项目心跳 <json>`
- value: each list element is a JSON string holding one "project heartbeat record"

Example (matches what the current code reads; the example below shows the "logical structure"):

```json
-{
-  "apiBaseUrl": "http://127.0.0.1:4001",
-  "lastActiveAt": 1760000000000
-}
[
  {
    "projectName": "BLS主机心跳日志",
    "apiBaseUrl": "http://127.0.0.1:3000",
    "lastActiveAt": 1768566165572
  }
]
```

-Field descriptions:
Example (Redis write command):

```
RPUSH 项目心跳 "{\"projectName\":\"BLS主机心跳日志\",\"apiBaseUrl\":\"http://127.0.0.1:3000\",\"lastActiveAt\":1768566165572}"
```

Field descriptions (per heartbeat record):

- `projectName`: project name (used to build the log key `${projectName}_项目控制台`)
- `apiBaseUrl`: the base URL of the project's externally reachable API (the backend appends `apiName` to it)
-- `lastActiveAt`: status time (activity timestamp, milliseconds). Refresh every **3 seconds** (recommended).
+- `lastActiveAt`: activity timestamp (milliseconds). Refresh every **3 seconds** (recommended).

Online/offline determination (used by BLS Project Console):
@@ -53,14 +86,19 @@

Recommendations:

- Generate `lastActiveAt` with `Date.now()` (milliseconds)
-- Optionally set a TTL: e.g. `SET key value EX 30`
+- Optionally cap the length of `项目心跳`: e.g. run `LTRIM 项目心跳 -2000 -1` after each write to keep the most recent 2000 entries

Deduplication note:

- With `项目心跳` as a LIST, an external project's periodic `RPUSH` produces multiple duplicate records
- The BLS Project Console backend deduplicates by `projectName`, keeping the record with the newest `lastActiveAt` as the project's status

### 2.2 `${projectName}_项目控制台`

- Redis data type: **LIST** (a "message queue / log queue" that the project appends to for the console)
- Write method (FIFO recommended): `RPUSH ${projectName}_项目控制台 <json>`

value (recommended format): one JSON string representing an "error/debug message" or a log record.

Recommended JSON Schema (keep the fields stable so the console can parse them):
@@ -83,11 +121,59 @@ value (recommended format): one JSON string representing an "error/debug

- `message`: log text
- `metadata`: optional object (additional info)

-## 3. Command Dispatch (HTTP API control)
## 3. Project List Management (important)

### 3.1 Migration mechanism (for legacy data import only)

BLS Project Console supports automatic migration from the old format to the new one:

- **Old format**: a separate heartbeat key per project, `${projectName}_项目心跳`
- **New format**: one unified project-list key, `项目心跳` (LIST; each element is a JSON string)

Migration process:

1. Scan all `${projectName}_项目心跳` keys
2. Extract the `apiBaseUrl` and `lastActiveAt` fields
3. Write them into `项目心跳` (LIST)
4. Optional: delete the old keys

Important notes (consistent with the current code):

- Migration does not run automatically in the background; it is triggered via `POST /api/projects/migrate`
- Migration exists only to import historical `${projectName}_项目心跳` keys once and produce the `项目心跳` list
- After migration, if an external project keeps updating only its old key, `项目心跳` will not follow along; for real-time updates the external project must write to `项目心跳` directly

### 3.2 New-format project list structure

`项目心跳` is a LIST whose elements are JSON strings; its "logical structure" is:

```json
[
  {
    "projectName": "订单系统",
    "apiBaseUrl": "http://127.0.0.1:4001",
    "lastActiveAt": 1760000000000
  },
  {
    "projectName": "用户服务",
    "apiBaseUrl": "http://127.0.0.1:4002",
    "lastActiveAt": 1760000000001
  }
]
```

### 3.3 Integration recommendations for external projects

An external project should:

1. Periodically write to `项目心跳` (RPUSH its own heartbeat record; duplicates are fine, the console deduplicates by projectName)
2. Append its logs to `${projectName}_项目控制台`

## 4. Command Dispatch (HTTP API control)

The console no longer writes control-command queues through Redis; instead, the BLS Project Console backend calls the target project's HTTP API directly, using the `apiBaseUrl` found in the target project's heartbeat.

-### 3.1 Console input format
+### 4.1 Console input format

One line of text, split on spaces:
@@ -100,7 +186,7 @@ value (recommended format): one JSON string representing an "error/debug

- `reload force`
- `user/refreshCache tenantA`

-### 3.2 APIs the target project must provide
+### 4.2 APIs the target project must provide

The backend calls with `POST` by default:
@@ -119,18 +205,69 @@ value (recommended format): one JSON string representing an "error/debug
}
```

Field descriptions:

- `commandId`: unique command identifier
- `timestamp`: time the command was sent (ISO-8601)
- `source`: command source identifier
- `apiName`: API name
- `args`: argument array
- `argsText`: argument text (space-joined)

Response recommendations:

- 2xx means success
- non-2xx means failure (the console shows upstreamStatus and part of the response body)

-## 4. Compatibility and Error-Handling Recommendations
### 4.3 Online/offline determination

Before sending a command, the system checks whether the project is online:

- Read `lastActiveAt` from the `项目心跳` list
- If `now - lastActiveAt > 10_000ms`, the application is considered **offline** and the command is rejected
- Otherwise it is considered **online** and the command is sent

## 5. Mapping to This Project's Code

- **Backend `/api/projects`**: reads the project list only from `项目心跳` (LIST) and returns every project with its online status
- **Backend `/api/commands`**: looks up the target project's `apiBaseUrl`/`lastActiveAt` in `项目心跳` and calls the target project's API when it is online
- **Backend `/api/logs`**: reads `${projectName}_项目控制台` (LIST), and uses that project's `lastActiveAt` in `项目心跳` to compute online/offline status and the API address

## 6. Compatibility and Error-Handling Recommendations

- JSON parse failure: the external project should log the error and drop the message (to avoid a blocking consume loop).
- Oversized messages: keep single messages small (e.g., < 64KB).
- Character encoding: UTF-8 everywhere.
- Heartbeat timeout: refresh the heartbeat every 3 seconds to avoid being misjudged as offline.

-## 5. Mapping to This Project's Code (in progress)
-
-- Backend `/api/commands`: reads `apiBaseUrl` and `lastActiveAt` from `${targetProjectName}_项目心跳` and calls the target project's API when online.
-- Backend `/api/logs`: reads `${projectName}_项目控制台`, and returns online/offline status plus the API address based on `${projectName}_项目心跳`.
## 7. Data Migration Tool (legacy data import)

To migrate from the old format to the new one, use this API:

```bash
POST /api/projects/migrate
Content-Type: application/json

{
  "deleteOldKeys": false,
  "dryRun": false
}
```

Parameter descriptions:

- `deleteOldKeys`: whether to delete the old-format keys (default false)
- `dryRun`: whether to only simulate the run (default false)

Response example:

```json
{
  "success": true,
  "message": "数据迁移完成",
  "migrated": 2,
  "projects": [...],
  "listKey": "项目心跳",
  "deleteOldKeys": false
}
```
@@ -1,31 +0,0 @@
module.exports = {
  apps: [{
    name: 'bls-heartbeat-server',
    script: 'dist/index.es.js',
    instances: 'max',
    exec_mode: 'cluster',
    autorestart: true,
    watch: false,
    max_memory_restart: '1G',
    env: {
      NODE_ENV: 'production',
      PORT: 3000
    },
    env_development: {
      NODE_ENV: 'development',
      PORT: 3000
    },
    error_file: './logs/error.log',
    out_file: './logs/out.log',
    log_date_format: 'YYYY-MM-DD HH:mm:ss Z',
    merge_logs: true,
    time: true,
    min_uptime: '10s',
    max_restarts: 10,
    restart_delay: 4000,
    kill_timeout: 5000,
    listen_timeout: 10000,
    wait_ready: true,
    shutdown_with_message: true
  }]
};
@@ -51,7 +51,8 @@

- **THEN** a heartbeat detail table partitioned daily by `ts_ms` should exist
- **AND** required fields should have NOT NULL constraints
- **AND** state fields should have CHECK constraints (restricting the value range)
-- **AND** required indexes should exist (hotel_id/power_state/guest_type/device_id B-tree; service_mask BRIN)
+- **AND** the primary key should be a GUID (32-char HEX string without hyphens) with a format CHECK
+- **AND** required indexes should exist (hotel_id/power_state/guest_type/device_id B-tree; service_mask BRIN; the service_mask first-bit expression index idx_service_mask_first_bit)

#### Scenario: automatic partitioning
- **WHEN** data is written for a day whose partition does not exist
@@ -3,13 +3,15 @@

BEGIN;

+CREATE EXTENSION IF NOT EXISTS pgcrypto;
+
CREATE SCHEMA IF NOT EXISTS heartbeat;

-- Main table (partitioned daily by ts_ms)
-- Note: a PRIMARY KEY on a PostgreSQL partitioned table generally must include the partition key.
--- (ts_ms, id) is used as the primary key so the table can be created and enforced.
+-- (ts_ms, guid) is used as the primary key so the table can be created and enforced.
CREATE TABLE IF NOT EXISTS heartbeat.heartbeat_events (
  id bigserial,
+ guid varchar(32) NOT NULL DEFAULT replace(gen_random_uuid()::text, '-', ''),

  ts_ms bigint NOT NULL,
  hotel_id int2 NOT NULL,
@@ -43,7 +45,8 @@ CREATE TABLE IF NOT EXISTS heartbeat.heartbeat_events (
  -- Flexible fields: electrical parameters / A/C, etc. (may be split into columns later; kept in extra for now)
  extra jsonb,

- CONSTRAINT heartbeat_events_pk PRIMARY KEY (ts_ms, id),
+ CONSTRAINT heartbeat_events_pk PRIMARY KEY (ts_ms, guid),
+ CONSTRAINT chk_guid_32_hex CHECK (guid ~ '^[0-9a-f]{32}$'),

  -- CHECK constraints: start with "non-negative + upper bound" (so future enum extensions do not break writes)
  CONSTRAINT chk_ts_ms_positive CHECK (ts_ms > 0),
@@ -84,6 +87,9 @@ CREATE INDEX IF NOT EXISTS idx_heartbeat_events_device_id ON heartbeat.heartbeat
-- Note: BRIN pays off most for columns that increase over time with physical correlation; if service_mask lacks that correlation, the benefit may be limited.
CREATE INDEX IF NOT EXISTS idx_heartbeat_events_service_mask_brin ON heartbeat.heartbeat_events USING BRIN (service_mask);

+CREATE INDEX IF NOT EXISTS idx_service_mask_first_bit
+  ON heartbeat.heartbeat_events ((service_mask & 1));
+
-- High-value extra index (not in the mandatory requirement list): the common query is hotel_id + time range
-- Comment this out if the extra index is unwanted
CREATE INDEX IF NOT EXISTS idx_heartbeat_events_hotel_ts ON heartbeat.heartbeat_events (hotel_id, ts_ms);
@@ -61,6 +61,7 @@ BEGIN
  EXECUTE format('CREATE INDEX IF NOT EXISTS %I ON heartbeat.%I (guest_type);', 'idx_'||part_name||'_guest_type', part_name);
  EXECUTE format('CREATE INDEX IF NOT EXISTS %I ON heartbeat.%I (device_id);', 'idx_'||part_name||'_device_id', part_name);
  EXECUTE format('CREATE INDEX IF NOT EXISTS %I ON heartbeat.%I USING BRIN (service_mask);', 'idx_'||part_name||'_service_mask_brin', part_name);
+ EXECUTE format('CREATE INDEX IF NOT EXISTS %I ON heartbeat.%I ((service_mask & 1));', 'idx_'||part_name||'_service_mask_first_bit', part_name);
  EXECUTE format('CREATE INDEX IF NOT EXISTS %I ON heartbeat.%I (hotel_id, ts_ms);', 'idx_'||part_name||'_hotel_ts', part_name);
END;
$$;
@@ -124,9 +124,9 @@ echo ========================================
echo.
echo 常用命令:
echo 查看状态: pm2 status
-echo 查看日志: pm2 logs web-bls-heartbeat-server
-echo 重启服务: pm2 restart web-bls-heartbeat-server
-echo 停止服务: pm2 stop web-bls-heartbeat-server
+echo 查看日志: pm2 logs bls-heartbeat
+echo 重启服务: pm2 restart bls-heartbeat
+echo 停止服务: pm2 stop bls-heartbeat
echo 实时监控: pm2 monit
echo.
pause
@@ -67,7 +67,7 @@ echo. >> "%RELEASE_DIR%\README.txt"
echo ## 文件说明 >> "%RELEASE_DIR%\README.txt"
echo - dist/: 构建后的应用程序文件 >> "%RELEASE_DIR%\README.txt"
echo - package.json: 项目依赖配置 >> "%RELEASE_DIR%\README.txt"
-echo - ecosystem.config.js: PM2进程管理配置 >> "%RELEASE_DIR%\README.txt"
+echo - ecosystem.config.cjs: PM2进程管理配置 >> "%RELEASE_DIR%\README.txt"
echo - .env.example: 环境变量配置示例 >> "%RELEASE_DIR%\README.txt"
echo - scripts/: 部署和更新脚本 >> "%RELEASE_DIR%\README.txt"
echo - src/config/config.example.js: 配置文件示例 >> "%RELEASE_DIR%\README.txt"
@@ -76,8 +76,8 @@ echo ## 常用命令 >> "%RELEASE_DIR%\README.txt"
echo - 首次部署: scripts\deploy.bat >> "%RELEASE_DIR%\README.txt"
echo - 更新部署: scripts\update.bat >> "%RELEASE_DIR%\README.txt"
echo - 查看状态: pm2 status >> "%RELEASE_DIR%\README.txt"
-echo - 查看日志: pm2 logs web-bls-heartbeat-server >> "%RELEASE_DIR%\README.txt"
-echo - 重启服务: pm2 restart web-bls-heartbeat-server >> "%RELEASE_DIR%\README.txt"
+echo - 查看日志: pm2 logs bls-heartbeat >> "%RELEASE_DIR%\README.txt"
+echo - 重启服务: pm2 restart bls-heartbeat >> "%RELEASE_DIR%\README.txt"
echo. >> "%RELEASE_DIR%\README.txt"
echo 详细文档请参考 docs/deployment.md >> "%RELEASE_DIR%\README.txt"
echo [成功] 说明文件已创建
@@ -92,4 +92,4 @@ dir /b "%RELEASE_DIR%"
echo.
echo 复制整个 %RELEASE_DIR% 文件夹到目标服务器即可
echo.
pause
@@ -8,12 +8,12 @@ setlocal enabledelayedexpansion

:: Check whether the service is running
echo [1/6] 检查服务状态...
-pm2 describe web-bls-heartbeat-server >nul 2>&1
+pm2 describe bls-heartbeat >nul 2>&1
if errorlevel 1 (
    echo [警告] 服务未运行,跳过停止步骤
) else (
    echo [信息] 停止服务...
-   pm2 stop web-bls-heartbeat-server
+   pm2 stop bls-heartbeat
    if errorlevel 1 (
        echo [错误] 服务停止失败
        pause
@@ -83,9 +83,9 @@ echo ========================================
echo.
echo 常用命令:
echo 查看状态: pm2 status
-echo 查看日志: pm2 logs web-bls-heartbeat-server
-echo 重启服务: pm2 restart web-bls-heartbeat-server
-echo 停止服务: pm2 stop web-bls-heartbeat-server
+echo 查看日志: pm2 logs bls-heartbeat
+echo 重启服务: pm2 restart bls-heartbeat
+echo 停止服务: pm2 stop bls-heartbeat
echo 实时监控: pm2 monit
echo.
pause
@@ -76,10 +76,12 @@ class DatabaseManager {
    const v2SchemaQuery = `
      BEGIN;

+     CREATE EXTENSION IF NOT EXISTS pgcrypto;
+
      CREATE SCHEMA IF NOT EXISTS heartbeat;

      CREATE TABLE IF NOT EXISTS heartbeat.heartbeat_events (
        id bigserial,
+       guid varchar(32) NOT NULL DEFAULT replace(gen_random_uuid()::text, '-', ''),

        ts_ms bigint NOT NULL,
        hotel_id int2 NOT NULL,
@@ -112,7 +114,8 @@ class DatabaseManager {

        extra jsonb,

-       CONSTRAINT heartbeat_events_pk PRIMARY KEY (ts_ms, id),
+       CONSTRAINT heartbeat_events_pk PRIMARY KEY (ts_ms, guid),
+       CONSTRAINT chk_guid_32_hex CHECK (guid ~ '^[0-9a-f]{32}$'),

        CONSTRAINT chk_ts_ms_positive CHECK (ts_ms > 0),
        CONSTRAINT chk_hotel_id_range CHECK (hotel_id >= 0 AND hotel_id <= 32767),
@@ -147,6 +150,10 @@ class DatabaseManager {
      CREATE INDEX IF NOT EXISTS idx_heartbeat_events_guest_type ON heartbeat.heartbeat_events (guest_type);
      CREATE INDEX IF NOT EXISTS idx_heartbeat_events_device_id ON heartbeat.heartbeat_events (device_id);
      CREATE INDEX IF NOT EXISTS idx_heartbeat_events_service_mask_brin ON heartbeat.heartbeat_events USING BRIN (service_mask);

+     CREATE INDEX IF NOT EXISTS idx_service_mask_first_bit
+       ON heartbeat.heartbeat_events ((service_mask & 1));
+
      CREATE INDEX IF NOT EXISTS idx_heartbeat_events_hotel_ts ON heartbeat.heartbeat_events (hotel_id, ts_ms);
      CREATE INDEX IF NOT EXISTS idx_heartbeat_events_elec_address_gin ON heartbeat.heartbeat_events USING GIN (elec_address);
      CREATE INDEX IF NOT EXISTS idx_heartbeat_events_air_address_gin ON heartbeat.heartbeat_events USING GIN (air_address);
@@ -200,6 +207,7 @@ class DatabaseManager {
        EXECUTE format('CREATE INDEX IF NOT EXISTS %I ON heartbeat.%I (guest_type);', 'idx_'||part_name||'_guest_type', part_name);
        EXECUTE format('CREATE INDEX IF NOT EXISTS %I ON heartbeat.%I (device_id);', 'idx_'||part_name||'_device_id', part_name);
        EXECUTE format('CREATE INDEX IF NOT EXISTS %I ON heartbeat.%I USING BRIN (service_mask);', 'idx_'||part_name||'_service_mask_brin', part_name);
+       EXECUTE format('CREATE INDEX IF NOT EXISTS %I ON heartbeat.%I ((service_mask & 1));', 'idx_'||part_name||'_service_mask_first_bit', part_name);
        EXECUTE format('CREATE INDEX IF NOT EXISTS %I ON heartbeat.%I (hotel_id, ts_ms);', 'idx_'||part_name||'_hotel_ts', part_name);
      END;
      $$;

src/index.js (17 lines changed)
@@ -4,6 +4,7 @@ import { KafkaConsumer } from './kafka/consumer.js';
import { HeartbeatProcessor } from './processor/heartbeatProcessor.js';
import { DatabaseManager } from './db/databaseManager.js';
import { RedisIntegration } from './redis/redisIntegration.js';
+import { StatsCounters, StatsReporter } from './stats/statsManager.js';

class WebBLSHeartbeatServer {
  constructor() {
@@ -13,6 +14,8 @@ class WebBLSHeartbeatServer {
    this.databaseManager = null;
    this.redis = null;
    this.consumers = null;
+   this.stats = new StatsCounters();
+   this.statsReporter = null;
  }

  async start() {
@@ -21,6 +24,8 @@ class WebBLSHeartbeatServer {
    this.redis = new RedisIntegration(this.config.redis);
    await this.redis.connect();
    this.redis.startHeartbeat();
+   this.statsReporter = new StatsReporter({ redis: this.redis, stats: this.stats });
+   this.statsReporter.start();

    // Initialize the database connection
    this.databaseManager = new DatabaseManager({ ...this.config.db, maxConnections: 1 });
@@ -36,11 +41,14 @@ class WebBLSHeartbeatServer {
      groupId: this.config.kafka?.groupId,
      fromOffset: this.config.kafka?.fromOffset ?? 'latest',
      ssl: !!this.config.kafka?.sslEnabled,
-     sasl: !!this.config.kafka?.saslEnabled ? `enabled (mechanism: ${this.config.kafka?.saslMechanism})` : 'disabled'
+     sasl: this.config.kafka?.saslEnabled ? `enabled (mechanism: ${this.config.kafka?.saslMechanism})` : 'disabled'
    });

    // Initialize the processor (shared batch queue)
-   this.heartbeatProcessor = new HeartbeatProcessor(this.config.processor, this.databaseManager);
+   this.heartbeatProcessor = new HeartbeatProcessor(this.config.processor, this.databaseManager, {
+     redis: this.redis,
+     stats: this.stats,
+   });

    // Start N consumer instances in one process (matching the partition count)
    const instances = Math.max(1, Number(this.config.kafka?.consumerInstances ?? 1));
@@ -69,6 +77,11 @@ class WebBLSHeartbeatServer {

  async stop() {
    try {
+     if (this.statsReporter) {
+       this.statsReporter.stop();
+       this.statsReporter = null;
+     }
+
      if (this.consumers && Array.isArray(this.consumers)) {
        for (const { consumer } of this.consumers) {
          await consumer.stopConsuming();
@@ -1,10 +1,13 @@
// Heartbeat processor module
import { brotliDecompressSync, gunzipSync, inflateRawSync, inflateSync } from 'node:zlib';
+import { formatTimestamp } from '../stats/statsManager.js';

class HeartbeatProcessor {
- constructor(config, databaseManager) {
+ constructor(config, databaseManager, deps = {}) {
    this.config = config;
    this.databaseManager = databaseManager;
+   this.redis = deps?.redis ?? null;
+   this.stats = deps?.stats ?? null;
    this.batchQueue = [];
    this.batchMessageQueue = [];
    this.batchTimer = null;
@@ -13,9 +16,22 @@ class HeartbeatProcessor {

  async processMessage(message) {
    const deferred = this.createDeferred();
+   this.stats?.incKafkaPulled?.(1);

    // Unpack the heartbeat message
-   const unpackedData = this.unpackMessage(message);
+   let unpackedData = null;
+   try {
+     unpackedData = this.unpackMessage(message);
+   } catch (err) {
+     this.stats?.incFiltered?.(1);
+     this._emitRejectedRecord({
+       errorId: 'decode_failed',
+       error: err,
+       rawData: this._extractRawKafkaValue(message),
+     });
+     deferred.resolve({ insertedCount: 0 });
+     return deferred.promise;
+   }

    // Batch reporting supported: message.value may be a JSON array
    const items = Array.isArray(unpackedData) ? unpackedData : [unpackedData];
@@ -27,12 +43,29 @@ class HeartbeatProcessor {
      // Validate the heartbeat data
      const isValid = this.validateData(effective);
      if (!isValid) {
+       this.stats?.incFiltered?.(1);
+       this._emitRejectedRecord({
+         errorId: 'validation_failed',
+         rawData: { item, effective },
+       });
        console.error('无效的心跳数据:', effective);
        continue;
      }

      // Transform the data format
-     const transformedData = this.transformData(effective);
+     let transformedData = null;
+     try {
+       transformedData = this.transformData(effective);
+     } catch (err) {
+       this.stats?.incFiltered?.(1);
+       this._emitRejectedRecord({
+         errorId: 'transform_failed',
+         error: err,
+         rawData: { item, effective },
+       });
+       console.error('转换心跳数据失败:', err);
+       continue;
+     }

      // Add to the batch queue
      this.batchQueue.push(transformedData);
@@ -163,10 +196,11 @@ class HeartbeatProcessor {

    this._batchInFlight = true;
    let hasMore = false;
+   let batchData = null;

    try {
      const { batchEventCount, batchMessageCount } = this.computeNextBatchWindow();
-     const batchData = this.batchQueue.slice(0, batchEventCount);
+     batchData = this.batchQueue.slice(0, batchEventCount);
      const batchMessages = this.batchMessageQueue.slice(0, batchMessageCount);

      let insertedCount = 0;
@@ -189,10 +223,12 @@ class HeartbeatProcessor {
        entry.deferred.resolve({ insertedCount: entry.eventCount });
      }

+     this.stats?.incDbWritten?.(batchData.length);
      console.log(`成功处理批次数据,共 ${batchData.length} 条`);
      hasMore = this.batchQueue.length > 0;
    } catch (error) {
      console.error('批量处理失败:', error);
+     this._emitDbWriteError(error, batchData);
      if (!this.batchTimer) {
        const retryDelay = Math.max(250, Number(this.config.batchTimeout ?? 1000));
        this.batchTimer = setTimeout(() => this.processBatch(), retryDelay);
@@ -207,6 +243,70 @@ class HeartbeatProcessor {
    }
  }

  _emitDbWriteError(error, rawData) {
    if (!this.redis?.isEnabled?.()) return;
    const list = Array.isArray(rawData) ? rawData : rawData ? [rawData] : [];
    for (const record of list) {
      this._emitRejectedRecord({
        errorId: 'db_write_failed',
        error,
        rawData: record,
      });
    }
  }

  _emitRejectedRecord({ errorId, error, rawData }) {
    if (!this.redis?.isEnabled?.()) return;
    const ts = formatTimestamp(new Date());
    const errMsg = error ? String(error?.stack ?? error?.message ?? error) : undefined;
    const payload = this._safeStringify({
      errorId,
      error: errMsg,
      rawData,
    });
    const base = `[ERROR] ${ts} ${errorId}: `;
    const maxChunkChars = 50_000;
    if (payload.length <= maxChunkChars) {
      this.redis.pushConsoleLog?.({ level: 'warn', message: `${base}${payload}`, metadata: { module: 'processor' } });
      return;
    }
    const parts = Math.ceil(payload.length / maxChunkChars);
    for (let i = 0; i < parts; i += 1) {
      const chunk = payload.slice(i * maxChunkChars, (i + 1) * maxChunkChars);
      this.redis.pushConsoleLog?.({
        level: 'warn',
        message: `${base}(part ${i + 1}/${parts}) ${chunk}`,
        metadata: { module: 'processor' },
      });
    }
  }

  _extractRawKafkaValue(message) {
    try {
      const raw = message?.value;
      if (Buffer.isBuffer(raw)) {
        return { type: 'buffer', bytes: raw.length, base64: raw.toString('base64') };
      }
      if (typeof raw === 'string') {
        return { type: 'string', chars: raw.length, value: raw };
      }
      return { type: typeof raw, value: raw };
    } catch (err) {
      return { type: 'unknown', error: String(err?.message ?? err) };
    }
  }

  _safeStringify(obj) {
    try {
      return JSON.stringify(obj);
    } catch (err) {
      return JSON.stringify({
        stringifyError: String(err?.message ?? err),
        type: typeof obj,
      });
    }
  }

  shouldFlushNow() {
    const max = Math.max(1, Number(this.config.batchSize ?? 1));
    return this.batchQueue.length >= max;
@@ -8,6 +8,9 @@ class RedisIntegration {

    this._connectPromise = null;
    this._lastErrorLogAt = 0;
+   this._pendingConsoleLogs = [];
+   this._flushingConsoleLogs = false;
+   this._pendingHeartbeat = null;
  }

  isEnabled() {
@@ -23,7 +26,7 @@
  }

  getHeartbeatKey() {
-   return `${this.getProjectName()}_项目心跳`;
+   return '项目心跳';
  }

  getConsoleKey() {
@@ -50,6 +53,14 @@
    return Number.isFinite(n) && n > 0 ? Math.floor(n) : null;
  }

+ getHeartbeatMaxLen() {
+   const v = this.config?.heartbeatMaxLen;
+   if (v === undefined) return 2000;
+   if (v === null) return null;
+   const n = Number(v);
+   return Number.isFinite(n) && n > 0 ? Math.floor(n) : null;
+ }

  getConsoleMaxLen() {
    const v = this.config?.consoleMaxLen;
    if (v === undefined || v === null) return null;
@@ -137,6 +148,7 @@
      .connect()
      .then(() => {
        console.log('[redis] connected');
+       return Promise.all([this.flushPendingHeartbeat(), this.flushPendingConsoleLogs()]);
      })
      .catch((err) => {
        // A failed connect is not rethrown; rely on redis's built-in reconnect strategy or the next call to retry
@@ -163,15 +175,32 @@
    } finally {
      this.client = null;
      this._connectPromise = null;
+     this._pendingConsoleLogs = [];
+     this._pendingHeartbeat = null;
      console.log('[redis] disconnected');
    }
  }

+ async flushPendingHeartbeat() {
+   if (!this.isEnabled()) return;
+   if (!this.client || !this.client.isReady) return;
+   if (!this._pendingHeartbeat) return;
+
+   const key = this.getHeartbeatKey();
+   const value = this._pendingHeartbeat;
+   this._pendingHeartbeat = null;
+
+   await this.client.rPush(key, value);
+   const maxLen = this.getHeartbeatMaxLen();
+   if (maxLen) {
+     await this.client.lTrim(key, -maxLen, -1);
+   }
+ }

  async writeHeartbeat() {
    if (!this.isEnabled()) return;
-   if (!this.client || !this.client.isReady) return;

    const payload = {
      projectName: this.getProjectName(),
      apiBaseUrl: this.getApiBaseUrl(),
      lastActiveAt: Date.now(),
    };
@@ -179,12 +208,34 @@
    const key = this.getHeartbeatKey();
    const value = JSON.stringify(payload);

-   const ttl = this.getHeartbeatTtlSeconds();
-   if (ttl) {
-     await this.client.set(key, value, { EX: ttl });
-   } else {
-     await this.client.set(key, value);
+   if (!this.client || !this.client.isReady) {
+     this._pendingHeartbeat = value;
+     this.ensureConnectedInBackground();
+     return;
    }

+   let lastError = null;
+   for (let attempt = 1; attempt <= 2; attempt += 1) {
+     try {
+       await this.client.rPush(key, value);
+       const maxLen = this.getHeartbeatMaxLen();
+       if (maxLen) {
+         await this.client.lTrim(key, -maxLen, -1);
+       }
+       this._pendingHeartbeat = null;
+       return;
+     } catch (err) {
+       lastError = err;
+       this.ensureConnectedInBackground();
+       if (attempt < 2) {
+         await new Promise((r) => setTimeout(r, 250));
+         if (!this.client?.isReady) return;
+         continue;
+       }
+     }
+   }
+
+   throw lastError;
  }

  startHeartbeat() {
@@ -214,13 +265,52 @@
    }
  }

- async pushConsoleLog({ level, message, metadata }) {
+ async flushPendingConsoleLogs() {
    if (!this.isEnabled()) return;
    if (!this.client || !this.client.isReady) return;
+   if (this._flushingConsoleLogs) return;
+   if (!this._pendingConsoleLogs.length) return;
+
+   this._flushingConsoleLogs = true;
+   try {
+     const key = this.getConsoleKey();
+     while (this._pendingConsoleLogs.length) {
+       const batch = this._pendingConsoleLogs.splice(0, 200);
+       await this.client.rPush(key, ...batch);
+
+       const maxLen = this.getConsoleMaxLen();
+       if (maxLen) {
+         await this.client.lTrim(key, -maxLen, -1);
+       }
+     }
+   } finally {
+     this._flushingConsoleLogs = false;
+   }
+ }
+
+ async pushConsoleLog({ level, message, metadata }) {
+   if (!this.isEnabled()) return;
+   const normalizedLevel = String(level ?? '').toLowerCase();
+   if (!this.client || !this.client.isReady) {
+     if (this._pendingConsoleLogs.length < 5000) {
+       const entry = {
+         timestamp: new Date().toISOString(),
+         level: normalizedLevel,
+         message,
+         metadata: metadata ?? undefined,
+       };
+       const value = JSON.stringify(entry);
+       if (Buffer.byteLength(value, 'utf8') <= 64 * 1024) {
+         this._pendingConsoleLogs.push(value);
+       }
+     }
+     this.ensureConnectedInBackground();
+     return;
+   }

    const entry = {
      timestamp: new Date().toISOString(),
-     level,
+     level: normalizedLevel,
      message,
      metadata: metadata ?? undefined,
    };

src/stats/statsManager.js (new file, 102 lines)
@@ -0,0 +1,102 @@
class StatsCounters {
  constructor() {
    this._minuteBuf = new SharedArrayBuffer(BigInt64Array.BYTES_PER_ELEMENT * 3);
    this._minute = new BigInt64Array(this._minuteBuf);
  }

  incDbWritten(n = 1) {
    const v = BigInt(Math.max(0, Number(n) || 0));
    if (v === 0n) return;
    Atomics.add(this._minute, 0, v);
  }

  incFiltered(n = 1) {
    const v = BigInt(Math.max(0, Number(n) || 0));
    if (v === 0n) return;
    Atomics.add(this._minute, 1, v);
  }

  incKafkaPulled(n = 1) {
    const v = BigInt(Math.max(0, Number(n) || 0));
    if (v === 0n) return;
    Atomics.add(this._minute, 2, v);
  }

  snapshotAndResetMinute() {
    const dbWritten = Atomics.exchange(this._minute, 0, 0n);
    const filtered = Atomics.exchange(this._minute, 1, 0n);
    const kafkaPulled = Atomics.exchange(this._minute, 2, 0n);
    return { dbWritten, filtered, kafkaPulled };
  }
}

const pad2 = (n) => String(n).padStart(2, '0');
const pad3 = (n) => String(n).padStart(3, '0');

const formatTimestamp = (d) => {
  const year = d.getFullYear();
  const month = pad2(d.getMonth() + 1);
  const day = pad2(d.getDate());
  const hour = pad2(d.getHours());
  const minute = pad2(d.getMinutes());
  const second = pad2(d.getSeconds());
  const ms = pad3(d.getMilliseconds());
  return `${year}-${month}-${day} ${hour}:${minute}:${second}.${ms}`;
};

class StatsReporter {
  constructor({ redis, stats }) {
    this.redis = redis;
    this.stats = stats;
    this._timer = null;
    this._running = false;
  }

  start() {
    if (this._running) return;
    this._running = true;
    this._scheduleNext();
  }

  stop() {
    this._running = false;
    if (this._timer) {
      clearTimeout(this._timer);
      this._timer = null;
    }
  }

  flushOnce() {
    if (!this.redis?.isEnabled?.()) return;
    const { dbWritten, filtered, kafkaPulled } = this.stats.snapshotAndResetMinute();
    const ts = formatTimestamp(new Date());
    this.redis.pushConsoleLog?.({ level: 'info', message: `[STATS] ${ts} 数据库写入量: ${dbWritten}条`, metadata: { module: 'stats' } });
    this.redis.pushConsoleLog?.({ level: 'info', message: `[STATS] ${ts} 数据过滤量: ${filtered}条`, metadata: { module: 'stats' } });
    this.redis.pushConsoleLog?.({ level: 'info', message: `[STATS] ${ts} Kafka拉取量: ${kafkaPulled}条`, metadata: { module: 'stats' } });
  }

  _scheduleNext() {
    if (!this._running) return;
    if (this._timer) return;

    const now = Date.now();
    const delay = 60_000 - (now % 60_000);
    this._timer = setTimeout(() => {
      this._timer = null;
      try {
        this.flushOnce();
      } catch (err) {
        this.redis?.pushConsoleLog?.({
          level: 'warn',
          message: `[ERROR] ${formatTimestamp(new Date())} 统计任务异常: ${String(err?.message ?? err)}`,
          metadata: { module: 'stats' },
        });
      } finally {
        this._scheduleNext();
      }
    }, delay);
  }
}

export { StatsCounters, StatsReporter, formatTimestamp };
@@ -1,5 +1,6 @@
import assert from 'node:assert/strict';
import { HeartbeatProcessor } from '../src/processor/heartbeatProcessor.js';
+import { RedisIntegration } from '../src/redis/redisIntegration.js';

describe('HeartbeatProcessor smoke', () => {
  it('decodes JSON buffer into object', () => {
@@ -25,3 +26,108 @@ describe('HeartbeatProcessor smoke', () => {
  });
});

describe('RedisIntegration protocol', () => {
  it('writes heartbeat to 项目心跳 LIST', async () => {
    const redis = new RedisIntegration({
      enabled: true,
      projectName: 'BLS主机心跳日志',
      apiBaseUrl: 'http://127.0.0.1:3000',
    });

    const calls = { rPush: [], lTrim: [] };
    redis.client = {
      isReady: true,
      rPush: async (key, value) => {
        calls.rPush.push({ key, value });
      },
      lTrim: async (key, start, stop) => {
        calls.lTrim.push({ key, start, stop });
      },
    };

    const before = Date.now();
    await redis.writeHeartbeat();
    const after = Date.now();

    assert.equal(calls.rPush.length, 1);
    assert.equal(calls.rPush[0].key, '项目心跳');
    const payload = JSON.parse(calls.rPush[0].value);
    assert.equal(payload.projectName, 'BLS主机心跳日志');
    assert.equal(payload.apiBaseUrl, 'http://127.0.0.1:3000');
    assert.equal(typeof payload.lastActiveAt, 'number');
    assert.ok(payload.lastActiveAt >= before && payload.lastActiveAt <= after);

    assert.equal(calls.lTrim.length, 1);
    assert.deepEqual(calls.lTrim[0], { key: '项目心跳', start: -2000, stop: -1 });
  });

  it('caches heartbeat when redis is not ready and flushes later', async () => {
    const redis = new RedisIntegration({
      enabled: true,
      projectName: 'BLS主机心跳日志',
      apiBaseUrl: 'http://127.0.0.1:3000',
    });

    const calls = { rPush: [], lTrim: [] };
    redis.client = {
      isReady: false,
      connect: async () => {},
      rPush: async (key, value) => {
        calls.rPush.push({ key, value });
      },
      lTrim: async (key, start, stop) => {
        calls.lTrim.push({ key, start, stop });
      },
    };

    await redis.writeHeartbeat();
    assert.ok(redis._pendingHeartbeat);

    redis.client.isReady = true;
    await redis.flushPendingHeartbeat();

    assert.equal(redis._pendingHeartbeat, null);
    assert.equal(calls.rPush.length, 1);
    assert.equal(calls.rPush[0].key, '项目心跳');
    const payload = JSON.parse(calls.rPush[0].value);
    assert.equal(payload.projectName, 'BLS主机心跳日志');
    assert.equal(payload.apiBaseUrl, 'http://127.0.0.1:3000');
    assert.equal(typeof payload.lastActiveAt, 'number');
    assert.equal(calls.lTrim.length, 1);
  });

  it('buffers console logs when redis is not ready', async () => {
    const redis = new RedisIntegration({
      enabled: true,
      projectName: 'BLS主机心跳日志',
      apiBaseUrl: 'http://127.0.0.1:3000',
    });

    const calls = { rPush: [], lTrim: [] };
    redis.client = {
      isReady: false,
      connect: async () => {},
      rPush: async (key, ...values) => {
        calls.rPush.push({ key, values });
      },
      lTrim: async (key, start, stop) => {
        calls.lTrim.push({ key, start, stop });
      },
    };

    await redis.info('hello', { module: 'test' });
    assert.equal(redis._pendingConsoleLogs.length, 1);

    redis.client.isReady = true;
    await redis.flushPendingConsoleLogs();

    assert.equal(redis._pendingConsoleLogs.length, 0);
    assert.equal(calls.rPush.length, 1);
    assert.equal(calls.rPush[0].key, 'BLS主机心跳日志_项目控制台');
    assert.equal(calls.rPush[0].values.length, 1);
    const entry = JSON.parse(calls.rPush[0].values[0]);
    assert.equal(entry.level, 'info');
    assert.equal(entry.message, 'hello');
    assert.equal(entry.metadata.module, 'test');
  });
});

test/stats.test.js (new file, 70 lines)
@@ -0,0 +1,70 @@
import assert from 'node:assert/strict';
import { StatsCounters, StatsReporter } from '../src/stats/statsManager.js';
import { HeartbeatProcessor } from '../src/processor/heartbeatProcessor.js';

describe('StatsCounters', () => {
  it('snapshots and resets minute counters atomically', () => {
    const stats = new StatsCounters();
    stats.incDbWritten(3);
    stats.incFiltered(2);
    stats.incKafkaPulled(5);

    const first = stats.snapshotAndResetMinute();
    assert.equal(first.dbWritten, 3n);
    assert.equal(first.filtered, 2n);
    assert.equal(first.kafkaPulled, 5n);

    const second = stats.snapshotAndResetMinute();
    assert.equal(second.dbWritten, 0n);
    assert.equal(second.filtered, 0n);
    assert.equal(second.kafkaPulled, 0n);
  });
});

describe('StatsReporter', () => {
  it('writes three [STATS] info logs to redis console', () => {
    const stats = new StatsCounters();
    stats.incDbWritten(7);
    stats.incFiltered(8);
    stats.incKafkaPulled(9);

    const calls = { push: [] };
    const redis = {
      isEnabled: () => true,
      pushConsoleLog: ({ level, message, metadata }) => {
        calls.push.push({ level, message, metadata });
      },
    };

    const reporter = new StatsReporter({ redis, stats });
    reporter.flushOnce();

    assert.equal(calls.push.length, 3);
    assert.equal(calls.push[0].level, 'info');
    assert.equal(calls.push[1].level, 'info');
    assert.equal(calls.push[2].level, 'info');
    assert.match(calls.push[0].message, /^\[STATS\] \d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3} 数据库写入量: 7条$/);
    assert.match(calls.push[1].message, /^\[STATS\] \d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3} 数据过滤量: 8条$/);
    assert.match(calls.push[2].message, /^\[STATS\] \d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3} Kafka拉取量: 9条$/);
  });
});

describe('HeartbeatProcessor db write error logging', () => {
  it('emits [ERROR] warn log with raw data', () => {
    const calls = { warn: [] };
    const redis = {
      isEnabled: () => true,
      pushConsoleLog: ({ level, message }) => {
        if (level === 'warn') calls.warn.push(message);
      },
    };

    const processor = new HeartbeatProcessor({ batchSize: 1, batchTimeout: 10 }, {}, { redis });
    processor._emitDbWriteError(new Error('boom'), [{ a: 1 }]);

    assert.equal(calls.warn.length >= 1, true);
    assert.match(calls.warn[0], /^\[ERROR\] \d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3} db_write_failed: /);
    assert.match(calls.warn[0], /"errorId":"db_write_failed"/);
    assert.match(calls.warn[0], /"rawData":\{"a":1\}/);
  });
});