From 41301f9ce550ecf10570840425b8aa228549e97c Mon Sep 17 00:00:00 2001 From: XuJiacheng Date: Sat, 17 Jan 2026 18:37:44 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=AE=9E=E7=8E=B0GUID=E4=B8=BB?= =?UTF-8?q?=E9=94=AE=E4=B8=8Eservice=5Fmask=E7=B4=A2=E5=BC=95=E6=94=B9?= =?UTF-8?q?=E9=80=A0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 将主键从自增id改为GUID格式并添加格式校验 - 为service_mask添加表达式索引优化首位查询性能 - 更新相关文档说明改造方案与验证步骤 - 添加统计模块记录数据库写入与Kafka消费量 - 重构Redis心跳协议改用LIST类型存储项目状态 - 修复部署脚本中的服务名称不一致问题 --- .eslintrc.cjs | 2 +- .../db-guid-pk-and-service-mask-index-plan.md | 40 ++++ docs/db-heartbeat-schema.md | 4 +- docs/db-openspec-compliance.md | 6 +- ...b-service-mask-index-performance-report.md | 81 ++++++++ docs/kafka-heartbeat-producer.md | 6 +- docs/redis-integration-protocol.md | 195 +++++++++++++++--- ecosystem.config.js | 31 --- openspec/specs/db/spec.md | 3 +- scripts/db/010_heartbeat_schema.sql | 12 +- scripts/db/020_partitioning_auto_daily.sql | 1 + scripts/deploy.bat | 8 +- scripts/package.bat | 8 +- scripts/update.bat | 12 +- src/db/databaseManager.js | 12 +- src/index.js | 17 +- src/processor/heartbeatProcessor.js | 108 +++++++++- src/redis/redisIntegration.js | 110 +++++++++- src/stats/statsManager.js | 102 +++++++++ test/smoke.test.js | 106 ++++++++++ test/stats.test.js | 70 +++++++ 21 files changed, 828 insertions(+), 106 deletions(-) create mode 100644 docs/db-guid-pk-and-service-mask-index-plan.md create mode 100644 docs/db-service-mask-index-performance-report.md delete mode 100644 ecosystem.config.js create mode 100644 src/stats/statsManager.js create mode 100644 test/stats.test.js diff --git a/.eslintrc.cjs b/.eslintrc.cjs index 8f16655..68e9fcf 100644 --- a/.eslintrc.cjs +++ b/.eslintrc.cjs @@ -17,6 +17,6 @@ module.exports = { sourceType: 'module', }, extends: ['eslint:recommended'], - ignorePatterns: ['dist/', 'build/', 'coverage/', 'node_modules/'], + ignorePatterns: ['dist/', 'build/', 'coverage/', 'node_modules/', 'release/'], rules: {}, }; diff --git a/docs/db-guid-pk-and-service-mask-index-plan.md b/docs/db-guid-pk-and-service-mask-index-plan.md new file mode 100644 index 0000000..5b83635 --- /dev/null +++ b/docs/db-guid-pk-and-service-mask-index-plan.md @@ -0,0 +1,40 @@ +# GUID 主键与 service_mask 索引改造实施方案 + +## 目标 +- 将 `heartbeat.heartbeat_events` 的主键从自增 `id` 改为 GUID(32 位无连字符 HEX 字符串)。 +- 为 `service_mask` 的“首位(最低位)判断”新增表达式索引 `idx_service_mask_first_bit`,用于优化常见 bit 查询。 +- 说明:你会删除现有数据库,因此不提供在线迁移流程,仅提供可重建的初始化脚本与验证步骤。 + +## 方案概述 +### 1) 主键(GUID) +- 表字段 `guid` 使用 `varchar(32)` 并设置默认值 `replace(gen_random_uuid()::text, '-', '')`。 +- 通过 `CHECK (guid ~ '^[0-9a-f]{32}$')` 约束输入格式为 32 位小写 HEX。 +- 分区表使用组合主键 `PRIMARY KEY (ts_ms, guid)` 以满足 PostgreSQL 分区唯一约束要求(主键/唯一约束需包含分区键)。 + +### 2) service_mask 首位查询表达式索引 +- 在父表创建索引: + - `CREATE INDEX idx_service_mask_first_bit ON heartbeat.heartbeat_events ((service_mask & 1));` +- 在按天分区创建索引: + - `idx__service_mask_first_bit` + +## 实施步骤(重建式) +1. 删除旧数据库/旧 schema(你将执行)。 +2. 执行建表脚本: + - `scripts/db/010_heartbeat_schema.sql` + - `scripts/db/020_partitioning_auto_daily.sql` +3. 启动服务或执行 `npm run db:apply` 进行初始化。 +4. 执行验证项: + - 确认 `heartbeat.heartbeat_events.guid` 为 `varchar(32)` 且存在默认值 + - 确认存在 `idx_service_mask_first_bit` + - 确认分区新建后存在 `idx__service_mask_first_bit` + +## 风险评估 +- GUID 默认生成依赖 `pgcrypto` 扩展:脚本已包含 `CREATE EXTENSION IF NOT EXISTS pgcrypto;`,但执行账号需要具备安装扩展权限。 +- 分区表主键约束限制:无法实现“父表仅 guid 作为主键约束”,因此使用 `(ts_ms, guid)` 的组合主键。 +- 表达式索引的表达式匹配:业务查询需要匹配索引表达式(详见 checklist),否则无法命中索引。 + +## 回滚步骤(重建式) +1. 停止服务写入。 +2. 
`DROP SCHEMA heartbeat CASCADE;`(或删除数据库)。 +3. 使用回滚版本的 SQL 脚本重新创建(例如回退到 `id bigserial` 版本的脚本)。 +4. 重新启动服务并验证写入与查询。 diff --git a/docs/db-heartbeat-schema.md b/docs/db-heartbeat-schema.md index 6f2fb0a..96d3e6e 100644 --- a/docs/db-heartbeat-schema.md +++ b/docs/db-heartbeat-schema.md @@ -19,7 +19,7 @@ ### 2.1 字段列表 | 字段 | 类型 | 必填 | 说明 | |---|---|---:|---| -| id | bigserial | 否(自动生成) | 自增序列号(写入时可不提供) | +| guid | varchar(32) | 是(自动生成) | GUID(32 位无连字符 HEX,小写;自动生成) | | ts_ms | bigint | 是 | 毫秒级时间戳(epoch ms) | | hotel_id | int2 | 是 | 酒店编号 | | room_id | varchar(50) | 是 | 房间编号(或房间唯一标识,按字符串存储) | @@ -59,7 +59,7 @@ 需求写“主键:id(bigserial)”,但 **PostgreSQL 分区表的主键/唯一约束通常必须包含分区键**。 脚本采用: -- `PRIMARY KEY (ts_ms, id)` +- `PRIMARY KEY (ts_ms, guid)` 原因:保证分区表可创建、约束可落地。 diff --git a/docs/db-openspec-compliance.md b/docs/db-openspec-compliance.md index c2e8eb1..1e8b724 100644 --- a/docs/db-openspec-compliance.md +++ b/docs/db-openspec-compliance.md @@ -11,13 +11,13 @@ ## 2. 偏差与风险(需要评估) ### 2.1 “主键仅 id” 与 PostgreSQL 分区约束冲突 -- 需求写:主键为 `id (bigserial)`。 -- 现实现:`PRIMARY KEY (ts_ms, id)`。 +- 需求写:主键为 `id`(GUID)。 +- 现实现:`PRIMARY KEY (ts_ms, guid)`(其中 guid 为 varchar(32))。 原因:PostgreSQL 分区表的主键/唯一约束通常需要包含分区键,否则无法在父表创建全局约束。 影响: -- 业务若强依赖“仅 id 即主键”的语义,需要额外约定(例如只把 id 当作全局唯一序列号使用,主键组合用于物理约束)。 +- 业务若强依赖“仅 guid 即主键”的语义,需要额外约定(例如只把 guid 当作全局唯一序列号使用,主键组合用于物理约束)。 ### 2.2 “自动分区”实现方式 - 需求写:新分区可自动创建。 diff --git a/docs/db-service-mask-index-performance-report.md b/docs/db-service-mask-index-performance-report.md new file mode 100644 index 0000000..afb6baf --- /dev/null +++ b/docs/db-service-mask-index-performance-report.md @@ -0,0 +1,81 @@ +# service_mask 索引性能对比报告 + +## 结论摘要(如何判定“提升”) +- 在 `service_mask` 位运算过滤场景中,新增 `idx_service_mask_first_bit` 后,查询计划应从 `Seq Scan` 转为 `Index Scan/Bitmap Index Scan`(并触发分区裁剪时,仅扫描命中的分区)。 +- 若过滤选择性较高(例如仅小比例满足 “&1=1”),通常能显著降低 IO 与响应时间。 + +## 索引脚本 +- 结构与索引创建包含在: + - `scripts/db/010_heartbeat_schema.sql` + - `scripts/db/020_partitioning_auto_daily.sql` +- 关键语句: + - `CREATE INDEX idx_service_mask_first_bit ON heartbeat.heartbeat_events ((service_mask & 1));` + +## 测试方法(可复现) +### 1) 数据准备 +建议在测试库中写入至少 100 万行,保证计划稳定: + +```sql +SELECT heartbeat.ensure_partitions(current_date, current_date + 1); + +INSERT INTO heartbeat.heartbeat_events ( + ts_ms, hotel_id, room_id, device_id, ip, + power_state, guest_type, cardless_state, service_mask, + pms_state, carbon_state, device_count, comm_seq, extra +) +SELECT + (extract(epoch from now()) * 1000)::bigint + (g % 86400000), + (g % 1000)::int2, + ('R' || (g % 500))::varchar(50), + ('D' || (g % 20000))::varchar(64), + '127.0.0.1:1', + 1, 0, 0, + (g % 1024)::bigint, + 0, 0, 1, (g % 100000)::int4, + jsonb_build_object('src','bench') +FROM generate_series(1, 1000000) g; + +ANALYZE heartbeat.heartbeat_events; +``` + +### 2) 对比维度 +- 响应时间:`EXPLAIN (ANALYZE, BUFFERS)` 的 `Execution Time` +- QPS:用同一 SQL 连续执行 N 次(例如 1000 次)并统计平均耗时(由压测端计算) +- 查询形态: + - 形态 A(位运算表达式,命中表达式索引): + - `WHERE (service_mask & 1) = 1` + +## 修改前/后对比(需要在目标环境产出) +说明:不同硬件/数据分布/缓存命中会导致数值差异。以下对比表格请在你的环境执行本报告“测试方法”并填入实际输出。 + +### A. 修改前(无 idx_service_mask_first_bit) +- 查询计划(期望:Seq Scan 或 Bitmap Heap Scan + BRIN): + +```sql +EXPLAIN (ANALYZE, BUFFERS) +SELECT count(*) +FROM heartbeat.heartbeat_events +WHERE (service_mask & 1) = 1; +``` + +- 结果记录: + - QPS(1000 次):____ + - P50 响应时间(ms):____ + - P95 响应时间(ms):____ + +### B. 
修改后(有 idx_service_mask_first_bit) +```sql +EXPLAIN (ANALYZE, BUFFERS) +SELECT count(*) +FROM heartbeat.heartbeat_events +WHERE (service_mask & 1) = 1; +``` + +- 结果记录: + - 位运算表达式命中索引(期望:Index/Bitmap Index Scan): + - QPS(1000 次):____ + - P50(ms):____ + - P95(ms):____ + +## 结论与建议 +- 业务使用 `WHERE (service_mask & 1) = 1` 可直接命中 `idx_service_mask_first_bit`。 diff --git a/docs/kafka-heartbeat-producer.md b/docs/kafka-heartbeat-producer.md index 87e2ab3..21492f7 100644 --- a/docs/kafka-heartbeat-producer.md +++ b/docs/kafka-heartbeat-producer.md @@ -36,7 +36,7 @@ ### 3.2 可选字段 | 字段 | 类型 | 示例 | 说明 | |---|---|---|---| -| extra | object | {"source":"gw","ver":"1.2.3"} | 扩展字段:电参、空调状态、版本、上报来源等 | +| extra | object | {"source":"gw","ver":"1.2.3"} | 扩展字段:原文、版本等其他自定义字段 | | electricity | array | [{"address":"add11","voltage":3.2,...}] | 电力设备数组(按原始顺序拆列落库为数组列) | | air_conditioner | array | [{"address":"ac1","state":1,...}] | 空调设备数组(按原始顺序拆列落库为数组列) | @@ -79,10 +79,8 @@ } ], "extra": { - "source": "gw", "ver": "1.2.3", - "ac": {"mode": 1, "set_temp": 26}, - "meter": {"p": 123.4, "e_wh": 5678} + "original_byte": "0x12345678" } } diff --git a/docs/redis-integration-protocol.md b/docs/redis-integration-protocol.md index 01afe30..cd47f84 100644 --- a/docs/redis-integration-protocol.md +++ b/docs/redis-integration-protocol.md @@ -1,11 +1,34 @@ -# Redis 对接协议(供 AI 生成代码使用) +# Redis 对接协议(供外部项目 AI 生成代码使用) -本文档定义“外部项目 ↔ BLS Project Console”之间通过 Redis 交互的 **Key 命名、数据类型、写入方式、读取方式与数据格式**。 +本文档定义"外部项目 ↔ BLS Project Console"之间通过 Redis 交互的 **Key 命名、数据类型、写入方式、读取方式与数据格式**。 -> 约束:每个需要关联本控制台的外部项目,必须在本项目使用的同一个 Redis 实例中: -> -> - 写入 2 个 Key(心跳 + 控制台信息) -> - 命令下发为 HTTP API 调用 +注:本仓库对外暴露的 Redis 连接信息如下(供对方直接连接以写入心跳/日志): + +- 地址:`10.8.8.109` +- 端口:默认 `6379` +- 密码:无(空) +- 数据库:固定 `15` + +示例(环境变量): + +``` +REDIS_HOST=10.8.8.109 +REDIS_PORT=6379 +REDIS_PASSWORD= +REDIS_DB=15 +``` + +示例(redis-cli): + +``` +redis-cli -h 10.8.8.109 -p 6379 -n 15 +``` + +> 约束:每个需要关联本控制台的外部项目,必须在同一个 Redis(DB15)中: + +> - 更新 `项目心跳`(项目列表 + 心跳信息) +> - 追加 `${projectName}_项目控制台`(日志队列) +> - 命令下发为 HTTP API 调用(不通过 Redis 下发命令) ## 1. 命名约定 @@ -15,35 +38,45 @@ 固定后缀: -- 心跳:`${projectName}_项目心跳` - 控制台:`${projectName}_项目控制台` 示例(projectName = `订单系统`): -- `订单系统_项目心跳` - `订单系统_项目控制台` ## 2. 
外部项目需要写入的 2 个 Key
 
-### 2.1 `${projectName}_项目心跳`
+说明:当前控制台左侧“项目选择列表”只读取 `项目心跳`(LIST)。因此外部项目必须维护该 Key,否则项目不会出现在列表中。
 
-- Redis 数据类型:**STRING**
-- 写入方式:`SET ${projectName}_项目心跳 <json字符串>`
-- value:JSON 字符串,必须包含目标项目可被调用的 `apiBaseUrl`,以及活跃时间戳 `lastActiveAt`
+### 2.1 `项目心跳`
 
-推荐 JSON Schema:
+- Redis 数据类型:**LIST**
+- 写入方式(推荐 FIFO):`RPUSH 项目心跳 <json字符串>`
+- value:每个列表元素为“项目心跳记录”的 JSON 字符串
+
+示例(与当前代码读取一致;下面示例表示“逻辑结构”):
 
 ```json
-{
-  "apiBaseUrl": "http://127.0.0.1:4001",
-  "lastActiveAt": 1760000000000
-}
+[
+  {
+    "projectName": "BLS主机心跳日志",
+    "apiBaseUrl": "http://127.0.0.1:3000",
+    "lastActiveAt": 1768566165572
+  }
+]
 ```
 
-字段说明:
+示例(Redis 写入命令):
+
+```
+RPUSH 项目心跳 "{\"projectName\":\"BLS主机心跳日志\",\"apiBaseUrl\":\"http://127.0.0.1:3000\",\"lastActiveAt\":1768566165572}"
+```
+
+字段说明(每条心跳记录):
+
+- `projectName`:项目名称(用于拼接日志 Key:`${projectName}_项目控制台`)
 - `apiBaseUrl`:目标项目对外提供的 API 地址(基地址,后端将基于它拼接 `apiName`)
-- `lastActiveAt`:状态时间(活跃时间戳,毫秒)。建议每 **3 秒**刷新一次。
+- `lastActiveAt`:活跃时间戳(毫秒)。建议每 **3 秒**刷新一次。
 
 在线/离线判定(BLS Project Console 使用):
 
@@ -53,14 +86,19 @@
 建议:
 
 - `lastActiveAt` 使用 `Date.now()` 生成(毫秒)
-- 可设置 TTL(可选):例如 `SET key value EX 30`
+- 建议对 `项目心跳` 做长度控制(可选):例如每次写入后执行 `LTRIM 项目心跳 -2000 -1` 保留最近 2000 条
+
+去重提示:
+
+- `项目心跳` 为 LIST 时,外部项目周期性 `RPUSH` 会产生多条重复记录
+- BLS Project Console 后端会按 `projectName` 去重,保留 `lastActiveAt` 最新的一条作为项目状态
 
 ### 2.2 `${projectName}_项目控制台`
 
-- Redis 数据类型:**LIST**(作为项目向控制台追加的“消息队列/日志队列”)
+- Redis 数据类型:**LIST**(作为项目向控制台追加的"消息队列/日志队列")
 - 写入方式(推荐 FIFO):`RPUSH ${projectName}_项目控制台 <json字符串>`
 
-value(推荐格式):一条 JSON 字符串,表示“错误/调试信息”或日志记录。
+value(推荐格式):一条 JSON 字符串,表示"错误/调试信息"或日志记录。
 
 推荐 JSON Schema(字段尽量保持稳定,便于控制台解析):
 
@@ -83,11 +121,59 @@
 - `message`:日志文本
 - `metadata`:可选对象(附加信息)
 
-## 3. 命令下发方式(HTTP API 控制)
+## 3. 项目列表管理(重要)
+
+### 3.1 迁移机制(仅用于旧数据导入)
+
+BLS Project Console 支持从旧格式自动迁移到新格式:
+
+- **旧格式**:每个项目独立的心跳键 `${projectName}_项目心跳`
+- **新格式**:统一的项目列表键 `项目心跳`(LIST 类型,每个元素为 JSON 字符串)
+
+迁移过程:
+
+1. 扫描所有 `${projectName}_项目心跳` 键
+2. 提取 `apiBaseUrl` 和 `lastActiveAt` 字段
+3. 写入到 `项目心跳`(LIST)
+4. 可选:删除旧键
+
+重要说明(与当前代码实现一致):
+
+- 迁移不会自动后台执行,需要通过接口触发:`POST /api/projects/migrate`
+- 迁移的目的只是“从历史 `${projectName}_项目心跳` 导入一次,生成 `项目心跳` 列表”
+- 迁移完成后,如果外部项目仍然只更新旧 Key,则 `项目心跳` 不会自动跟随更新;要想实时更新,外部项目必须直接更新 `项目心跳`
+
+### 3.2 新格式项目列表结构
+
+`项目心跳` 为 LIST,列表元素为 JSON 字符串;其“逻辑结构”如下:
+
+```json
+[
+  {
+    "projectName": "订单系统",
+    "apiBaseUrl": "http://127.0.0.1:4001",
+    "lastActiveAt": 1760000000000
+  },
+  {
+    "projectName": "用户服务",
+    "apiBaseUrl": "http://127.0.0.1:4002",
+    "lastActiveAt": 1760000000001
+  }
+]
+```
+
+### 3.3 外部项目对接建议
+
+外部项目应当:
+
+1. 定期写入 `项目心跳`(RPUSH 自己的心跳记录;允许产生多条记录,由控制台按 projectName 去重)
+2. 追加 `${projectName}_项目控制台` 日志
+
+## 4. 命令下发方式(HTTP API 控制)
 
 控制台不再通过 Redis 写入控制指令队列;改为由 BLS Project Console 后端根据目标项目心跳里的 `apiBaseUrl` 直接调用目标项目 HTTP API。
 
-### 3.1 控制台输入格式
+### 4.1 控制台输入格式
 
 一行文本按空格拆分:
 
@@ -100,7 +186,7 @@
 - `reload force`
 - `user/refreshCache tenantA`
 
-### 3.2 目标项目需要提供的 API
+### 4.2 目标项目需要提供的 API
 
 后端默认使用 `POST` 调用:
 
@@ -119,18 +205,69 @@
 }
 ```
 
+字段说明:
+
+- `commandId`:唯一命令标识符
+- `timestamp`:命令发送时间(ISO-8601 格式)
+- `source`:命令来源标识
+- `apiName`:API 接口名
+- `args`:参数数组
+- `argsText`:参数文本(空格连接)
+
 返回建议:
 
 - 2xx 表示成功
 - 非 2xx 表示失败(控制台会展示 upstreamStatus 与部分返回内容)
 
-## 4. 兼容与错误处理建议
+### 4.3 在线/离线判定
+
+发送命令前,系统会检查项目在线状态:
+
+- 从 `项目心跳` 列表读取 `lastActiveAt`
+- 若 `now - lastActiveAt > 10_000ms`,则认为该应用 **离线**,拒绝发送命令
+- 否则认为 **在线**,允许发送命令
+
+## 5. 
与本项目代码的对应关系 + +- **后端 `/api/projects`**:只从 `项目心跳`(LIST)读取项目列表,返回所有项目及其在线状态 +- **后端 `/api/commands`**:从 `项目心跳` 中查找目标项目的 `apiBaseUrl/lastActiveAt`,在线时调用目标项目 API +- **后端 `/api/logs`**:读取 `${projectName}_项目控制台`(LIST);并基于 `项目心跳` 中该项目的 `lastActiveAt` 计算在线/离线与 API 地址信息 + +## 6. 兼容与错误处理建议 - JSON 解析失败:外部项目应记录错误,并丢弃该条消息(避免死循环阻塞消费)。 - 消息过长:建议控制单条消息大小(例如 < 64KB)。 - 字符编码:统一 UTF-8。 +- 心跳超时:建议外部项目每 3 秒更新一次心跳,避免被误判为离线。 -## 5. 与本项目代码的对应关系(实现中) +## 7. 数据迁移工具(旧数据导入) -- 后端通过 `/api/commands`:从 `${targetProjectName}_项目心跳` 读取 `apiBaseUrl` 与 `lastActiveAt`,在线时调用目标项目 API。 -- 后端通过 `/api/logs`:读取 `${projectName}_项目控制台`;并基于 `${projectName}_项目心跳` 返回在线/离线与 API 地址信息。 +如果需要从旧格式迁移到新格式,可使用以下 API: + +```bash +POST /api/projects/migrate +Content-Type: application/json + +{ + "deleteOldKeys": false, + "dryRun": false +} +``` + +参数说明: + +- `deleteOldKeys`:是否删除旧格式键(默认 false) +- `dryRun`:是否仅模拟运行(默认 false) + +返回示例: + +```json +{ + "success": true, + "message": "数据迁移完成", + "migrated": 2, + "projects": [...], + "listKey": "项目心跳", + "deleteOldKeys": false +} +``` diff --git a/ecosystem.config.js b/ecosystem.config.js deleted file mode 100644 index 98f925f..0000000 --- a/ecosystem.config.js +++ /dev/null @@ -1,31 +0,0 @@ -module.exports = { - apps: [{ - name: 'bls-heartbeat-server', - script: 'dist/index.es.js', - instances: 'max', - exec_mode: 'cluster', - autorestart: true, - watch: false, - max_memory_restart: '1G', - env: { - NODE_ENV: 'production', - PORT: 3000 - }, - env_development: { - NODE_ENV: 'development', - PORT: 3000 - }, - error_file: './logs/error.log', - out_file: './logs/out.log', - log_date_format: 'YYYY-MM-DD HH:mm:ss Z', - merge_logs: true, - time: true, - min_uptime: '10s', - max_restarts: 10, - restart_delay: 4000, - kill_timeout: 5000, - listen_timeout: 10000, - wait_ready: true, - shutdown_with_message: true - }] -}; \ No newline at end of file diff --git a/openspec/specs/db/spec.md b/openspec/specs/db/spec.md index b5c7e9a..d212bd0 100644 --- a/openspec/specs/db/spec.md +++ b/openspec/specs/db/spec.md @@ -51,7 +51,8 @@ - **THEN** 应该存在按 `ts_ms` 日分区的心跳明细表 - **AND** 必填字段应具备 NOT NULL 约束 - **AND** 状态类字段应具备 CHECK 约束(限制取值范围) -- **AND** 必需索引应存在(hotel_id/power_state/guest_type/device_id B-tree;service_mask BRIN) +- **AND** 主键应采用 GUID(32 位无连字符 HEX 字符串)并具备格式 CHECK +- **AND** 必需索引应存在(hotel_id/power_state/guest_type/device_id B-tree;service_mask BRIN;service_mask 首位查询表达式索引 idx_service_mask_first_bit) #### Scenario: 自动分区 - **WHEN** 写入某天数据而该日分区不存在 diff --git a/scripts/db/010_heartbeat_schema.sql b/scripts/db/010_heartbeat_schema.sql index 96aaa92..d57eb8f 100644 --- a/scripts/db/010_heartbeat_schema.sql +++ b/scripts/db/010_heartbeat_schema.sql @@ -3,13 +3,15 @@ BEGIN; +CREATE EXTENSION IF NOT EXISTS pgcrypto; + CREATE SCHEMA IF NOT EXISTS heartbeat; -- 主表(按 ts_ms 日分区) -- 说明:PostgreSQL 分区表的 PRIMARY KEY 通常需要包含分区键。 --- 这里使用 (ts_ms, id) 作为主键以保证可创建且可执行。 +-- 这里使用 (ts_ms, guid) 作为主键以保证可创建且可执行。 CREATE TABLE IF NOT EXISTS heartbeat.heartbeat_events ( - id bigserial, + guid varchar(32) NOT NULL DEFAULT replace(gen_random_uuid()::text, '-', ''), ts_ms bigint NOT NULL, hotel_id int2 NOT NULL, @@ -43,7 +45,8 @@ CREATE TABLE IF NOT EXISTS heartbeat.heartbeat_events ( -- 弹性字段:电参/空调等(后续可结构化拆列;当前先放 extra) extra jsonb, - CONSTRAINT heartbeat_events_pk PRIMARY KEY (ts_ms, id), + CONSTRAINT heartbeat_events_pk PRIMARY KEY (ts_ms, guid), + CONSTRAINT chk_guid_32_hex CHECK (guid ~ '^[0-9a-f]{32}$'), -- CHECK 约束:先做“非负+上界”约束(避免未来枚举扩展导致写入失败) CONSTRAINT chk_ts_ms_positive CHECK (ts_ms > 0), @@ -84,6 +87,9 @@ CREATE INDEX IF NOT EXISTS 
idx_heartbeat_events_device_id ON heartbeat.heartbeat -- 说明:BRIN 对“随时间递增且有相关性”的列收益更大;service_mask 若不具备相关性,收益可能有限。 CREATE INDEX IF NOT EXISTS idx_heartbeat_events_service_mask_brin ON heartbeat.heartbeat_events USING BRIN (service_mask); +CREATE INDEX IF NOT EXISTS idx_service_mask_first_bit +ON heartbeat.heartbeat_events ((service_mask & 1)); + -- 高价值附加索引(不在需求强制列表内):常见查询是 hotel_id + 时间范围 -- 若不希望额外索引,可注释掉 CREATE INDEX IF NOT EXISTS idx_heartbeat_events_hotel_ts ON heartbeat.heartbeat_events (hotel_id, ts_ms); diff --git a/scripts/db/020_partitioning_auto_daily.sql b/scripts/db/020_partitioning_auto_daily.sql index 1eb305e..3ec1976 100644 --- a/scripts/db/020_partitioning_auto_daily.sql +++ b/scripts/db/020_partitioning_auto_daily.sql @@ -61,6 +61,7 @@ BEGIN EXECUTE format('CREATE INDEX IF NOT EXISTS %I ON heartbeat.%I (guest_type);', 'idx_'||part_name||'_guest_type', part_name); EXECUTE format('CREATE INDEX IF NOT EXISTS %I ON heartbeat.%I (device_id);', 'idx_'||part_name||'_device_id', part_name); EXECUTE format('CREATE INDEX IF NOT EXISTS %I ON heartbeat.%I USING BRIN (service_mask);', 'idx_'||part_name||'_service_mask_brin', part_name); + EXECUTE format('CREATE INDEX IF NOT EXISTS %I ON heartbeat.%I ((service_mask & 1));', 'idx_'||part_name||'_service_mask_first_bit', part_name); EXECUTE format('CREATE INDEX IF NOT EXISTS %I ON heartbeat.%I (hotel_id, ts_ms);', 'idx_'||part_name||'_hotel_ts', part_name); END; $$; diff --git a/scripts/deploy.bat b/scripts/deploy.bat index 28485b5..93bc726 100644 --- a/scripts/deploy.bat +++ b/scripts/deploy.bat @@ -124,9 +124,9 @@ echo ======================================== echo. echo 常用命令: echo 查看状态: pm2 status -echo 查看日志: pm2 logs web-bls-heartbeat-server -echo 重启服务: pm2 restart web-bls-heartbeat-server -echo 停止服务: pm2 stop web-bls-heartbeat-server +echo 查看日志: pm2 logs bls-heartbeat +echo 重启服务: pm2 restart bls-heartbeat +echo 停止服务: pm2 stop bls-heartbeat echo 实时监控: pm2 monit echo. -pause \ No newline at end of file +pause diff --git a/scripts/package.bat b/scripts/package.bat index fa61309..ad0f0d7 100644 --- a/scripts/package.bat +++ b/scripts/package.bat @@ -67,7 +67,7 @@ echo. >> "%RELEASE_DIR%\README.txt" echo ## 文件说明 >> "%RELEASE_DIR%\README.txt" echo - dist/: 构建后的应用程序文件 >> "%RELEASE_DIR%\README.txt" echo - package.json: 项目依赖配置 >> "%RELEASE_DIR%\README.txt" -echo - ecosystem.config.js: PM2进程管理配置 >> "%RELEASE_DIR%\README.txt" +echo - ecosystem.config.cjs: PM2进程管理配置 >> "%RELEASE_DIR%\README.txt" echo - .env.example: 环境变量配置示例 >> "%RELEASE_DIR%\README.txt" echo - scripts/: 部署和更新脚本 >> "%RELEASE_DIR%\README.txt" echo - src/config/config.example.js: 配置文件示例 >> "%RELEASE_DIR%\README.txt" @@ -76,8 +76,8 @@ echo ## 常用命令 >> "%RELEASE_DIR%\README.txt" echo - 首次部署: scripts\deploy.bat >> "%RELEASE_DIR%\README.txt" echo - 更新部署: scripts\update.bat >> "%RELEASE_DIR%\README.txt" echo - 查看状态: pm2 status >> "%RELEASE_DIR%\README.txt" -echo - 查看日志: pm2 logs web-bls-heartbeat-server >> "%RELEASE_DIR%\README.txt" -echo - 重启服务: pm2 restart web-bls-heartbeat-server >> "%RELEASE_DIR%\README.txt" +echo - 查看日志: pm2 logs bls-heartbeat >> "%RELEASE_DIR%\README.txt" +echo - 重启服务: pm2 restart bls-heartbeat >> "%RELEASE_DIR%\README.txt" echo. >> "%RELEASE_DIR%\README.txt" echo 详细文档请参考 docs/deployment.md >> "%RELEASE_DIR%\README.txt" echo [成功] 说明文件已创建 @@ -92,4 +92,4 @@ dir /b "%RELEASE_DIR%" echo. echo 复制整个 %RELEASE_DIR% 文件夹到目标服务器即可 echo. 
-pause \ No newline at end of file +pause diff --git a/scripts/update.bat b/scripts/update.bat index 13f30a1..d8695c3 100644 --- a/scripts/update.bat +++ b/scripts/update.bat @@ -8,12 +8,12 @@ setlocal enabledelayedexpansion :: 检查服务是否运行 echo [1/6] 检查服务状态... -pm2 describe web-bls-heartbeat-server >nul 2>&1 +pm2 describe bls-heartbeat >nul 2>&1 if errorlevel 1 ( echo [警告] 服务未运行,跳过停止步骤 ) else ( echo [信息] 停止服务... - pm2 stop web-bls-heartbeat-server + pm2 stop bls-heartbeat if errorlevel 1 ( echo [错误] 服务停止失败 pause @@ -83,9 +83,9 @@ echo ======================================== echo. echo 常用命令: echo 查看状态: pm2 status -echo 查看日志: pm2 logs web-bls-heartbeat-server -echo 重启服务: pm2 restart web-bls-heartbeat-server -echo 停止服务: pm2 stop web-bls-heartbeat-server +echo 查看日志: pm2 logs bls-heartbeat +echo 重启服务: pm2 restart bls-heartbeat +echo 停止服务: pm2 stop bls-heartbeat echo 实时监控: pm2 monit echo. -pause \ No newline at end of file +pause diff --git a/src/db/databaseManager.js b/src/db/databaseManager.js index 29219e8..9b20b1b 100644 --- a/src/db/databaseManager.js +++ b/src/db/databaseManager.js @@ -76,10 +76,12 @@ class DatabaseManager { const v2SchemaQuery = ` BEGIN; + CREATE EXTENSION IF NOT EXISTS pgcrypto; + CREATE SCHEMA IF NOT EXISTS heartbeat; CREATE TABLE IF NOT EXISTS heartbeat.heartbeat_events ( - id bigserial, + guid varchar(32) NOT NULL DEFAULT replace(gen_random_uuid()::text, '-', ''), ts_ms bigint NOT NULL, hotel_id int2 NOT NULL, @@ -112,7 +114,8 @@ class DatabaseManager { extra jsonb, - CONSTRAINT heartbeat_events_pk PRIMARY KEY (ts_ms, id), + CONSTRAINT heartbeat_events_pk PRIMARY KEY (ts_ms, guid), + CONSTRAINT chk_guid_32_hex CHECK (guid ~ '^[0-9a-f]{32}$'), CONSTRAINT chk_ts_ms_positive CHECK (ts_ms > 0), CONSTRAINT chk_hotel_id_range CHECK (hotel_id >= 0 AND hotel_id <= 32767), @@ -147,6 +150,10 @@ class DatabaseManager { CREATE INDEX IF NOT EXISTS idx_heartbeat_events_guest_type ON heartbeat.heartbeat_events (guest_type); CREATE INDEX IF NOT EXISTS idx_heartbeat_events_device_id ON heartbeat.heartbeat_events (device_id); CREATE INDEX IF NOT EXISTS idx_heartbeat_events_service_mask_brin ON heartbeat.heartbeat_events USING BRIN (service_mask); + + CREATE INDEX IF NOT EXISTS idx_service_mask_first_bit + ON heartbeat.heartbeat_events ((service_mask & 1)); + CREATE INDEX IF NOT EXISTS idx_heartbeat_events_hotel_ts ON heartbeat.heartbeat_events (hotel_id, ts_ms); CREATE INDEX IF NOT EXISTS idx_heartbeat_events_elec_address_gin ON heartbeat.heartbeat_events USING GIN (elec_address); CREATE INDEX IF NOT EXISTS idx_heartbeat_events_air_address_gin ON heartbeat.heartbeat_events USING GIN (air_address); @@ -200,6 +207,7 @@ class DatabaseManager { EXECUTE format('CREATE INDEX IF NOT EXISTS %I ON heartbeat.%I (guest_type);', 'idx_'||part_name||'_guest_type', part_name); EXECUTE format('CREATE INDEX IF NOT EXISTS %I ON heartbeat.%I (device_id);', 'idx_'||part_name||'_device_id', part_name); EXECUTE format('CREATE INDEX IF NOT EXISTS %I ON heartbeat.%I USING BRIN (service_mask);', 'idx_'||part_name||'_service_mask_brin', part_name); + EXECUTE format('CREATE INDEX IF NOT EXISTS %I ON heartbeat.%I ((service_mask & 1));', 'idx_'||part_name||'_service_mask_first_bit', part_name); EXECUTE format('CREATE INDEX IF NOT EXISTS %I ON heartbeat.%I (hotel_id, ts_ms);', 'idx_'||part_name||'_hotel_ts', part_name); END; $$; diff --git a/src/index.js b/src/index.js index f94771d..a2add06 100644 --- a/src/index.js +++ b/src/index.js @@ -4,6 +4,7 @@ import { KafkaConsumer } from './kafka/consumer.js'; import { 
HeartbeatProcessor } from './processor/heartbeatProcessor.js'; import { DatabaseManager } from './db/databaseManager.js'; import { RedisIntegration } from './redis/redisIntegration.js'; +import { StatsCounters, StatsReporter } from './stats/statsManager.js'; class WebBLSHeartbeatServer { constructor() { @@ -13,6 +14,8 @@ class WebBLSHeartbeatServer { this.databaseManager = null; this.redis = null; this.consumers = null; + this.stats = new StatsCounters(); + this.statsReporter = null; } async start() { @@ -21,6 +24,8 @@ class WebBLSHeartbeatServer { this.redis = new RedisIntegration(this.config.redis); await this.redis.connect(); this.redis.startHeartbeat(); + this.statsReporter = new StatsReporter({ redis: this.redis, stats: this.stats }); + this.statsReporter.start(); // 初始化数据库连接 this.databaseManager = new DatabaseManager({ ...this.config.db, maxConnections: 1 }); @@ -36,11 +41,14 @@ class WebBLSHeartbeatServer { groupId: this.config.kafka?.groupId, fromOffset: this.config.kafka?.fromOffset ?? 'latest', ssl: !!this.config.kafka?.sslEnabled, - sasl: !!this.config.kafka?.saslEnabled ? `enabled (mechanism: ${this.config.kafka?.saslMechanism})` : 'disabled' + sasl: this.config.kafka?.saslEnabled ? `enabled (mechanism: ${this.config.kafka?.saslMechanism})` : 'disabled' }); // 初始化处理器(共享批处理队列) - this.heartbeatProcessor = new HeartbeatProcessor(this.config.processor, this.databaseManager); + this.heartbeatProcessor = new HeartbeatProcessor(this.config.processor, this.databaseManager, { + redis: this.redis, + stats: this.stats, + }); // 在单进程内启动 N 个消费者实例(与分区数匹配) const instances = Math.max(1, Number(this.config.kafka?.consumerInstances ?? 1)); @@ -69,6 +77,11 @@ class WebBLSHeartbeatServer { async stop() { try { + if (this.statsReporter) { + this.statsReporter.stop(); + this.statsReporter = null; + } + if (this.consumers && Array.isArray(this.consumers)) { for (const { consumer } of this.consumers) { await consumer.stopConsuming(); diff --git a/src/processor/heartbeatProcessor.js b/src/processor/heartbeatProcessor.js index 718d3ed..8b4bbb6 100644 --- a/src/processor/heartbeatProcessor.js +++ b/src/processor/heartbeatProcessor.js @@ -1,10 +1,13 @@ // 心跳处理器模块 import { brotliDecompressSync, gunzipSync, inflateRawSync, inflateSync } from 'node:zlib'; +import { formatTimestamp } from '../stats/statsManager.js'; class HeartbeatProcessor { - constructor(config, databaseManager) { + constructor(config, databaseManager, deps = {}) { this.config = config; this.databaseManager = databaseManager; + this.redis = deps?.redis ?? null; + this.stats = deps?.stats ?? null; this.batchQueue = []; this.batchMessageQueue = []; this.batchTimer = null; @@ -13,9 +16,22 @@ class HeartbeatProcessor { async processMessage(message) { const deferred = this.createDeferred(); + this.stats?.incKafkaPulled?.(1); // 解包心跳消息 - const unpackedData = this.unpackMessage(message); + let unpackedData = null; + try { + unpackedData = this.unpackMessage(message); + } catch (err) { + this.stats?.incFiltered?.(1); + this._emitRejectedRecord({ + errorId: 'decode_failed', + error: err, + rawData: this._extractRawKafkaValue(message), + }); + deferred.resolve({ insertedCount: 0 }); + return deferred.promise; + } // 支持批量上报:message.value 可能是 JSON 数组 const items = Array.isArray(unpackedData) ? 
unpackedData : [unpackedData]; @@ -27,12 +43,29 @@ class HeartbeatProcessor { // 验证心跳数据 const isValid = this.validateData(effective); if (!isValid) { + this.stats?.incFiltered?.(1); + this._emitRejectedRecord({ + errorId: 'validation_failed', + rawData: { item, effective }, + }); console.error('无效的心跳数据:', effective); continue; } // 转换数据格式 - const transformedData = this.transformData(effective); + let transformedData = null; + try { + transformedData = this.transformData(effective); + } catch (err) { + this.stats?.incFiltered?.(1); + this._emitRejectedRecord({ + errorId: 'transform_failed', + error: err, + rawData: { item, effective }, + }); + console.error('转换心跳数据失败:', err); + continue; + } // 添加到批量队列 this.batchQueue.push(transformedData); @@ -163,10 +196,11 @@ class HeartbeatProcessor { this._batchInFlight = true; let hasMore = false; + let batchData = null; try { const { batchEventCount, batchMessageCount } = this.computeNextBatchWindow(); - const batchData = this.batchQueue.slice(0, batchEventCount); + batchData = this.batchQueue.slice(0, batchEventCount); const batchMessages = this.batchMessageQueue.slice(0, batchMessageCount); let insertedCount = 0; @@ -189,10 +223,12 @@ class HeartbeatProcessor { entry.deferred.resolve({ insertedCount: entry.eventCount }); } + this.stats?.incDbWritten?.(batchData.length); console.log(`成功处理批次数据,共 ${batchData.length} 条`); hasMore = this.batchQueue.length > 0; } catch (error) { console.error('批量处理失败:', error); + this._emitDbWriteError(error, batchData); if (!this.batchTimer) { const retryDelay = Math.max(250, Number(this.config.batchTimeout ?? 1000)); this.batchTimer = setTimeout(() => this.processBatch(), retryDelay); @@ -207,6 +243,70 @@ class HeartbeatProcessor { } } + _emitDbWriteError(error, rawData) { + if (!this.redis?.isEnabled?.()) return; + const list = Array.isArray(rawData) ? rawData : rawData ? [rawData] : []; + for (const record of list) { + this._emitRejectedRecord({ + errorId: 'db_write_failed', + error, + rawData: record, + }); + } + } + + _emitRejectedRecord({ errorId, error, rawData }) { + if (!this.redis?.isEnabled?.()) return; + const ts = formatTimestamp(new Date()); + const errMsg = error ? String(error?.stack ?? error?.message ?? error) : undefined; + const payload = this._safeStringify({ + errorId, + error: errMsg, + rawData, + }); + const base = `[ERROR] ${ts} ${errorId}: `; + const maxChunkChars = 50_000; + if (payload.length <= maxChunkChars) { + this.redis.pushConsoleLog?.({ level: 'warn', message: `${base}${payload}`, metadata: { module: 'processor' } }); + return; + } + const parts = Math.ceil(payload.length / maxChunkChars); + for (let i = 0; i < parts; i += 1) { + const chunk = payload.slice(i * maxChunkChars, (i + 1) * maxChunkChars); + this.redis.pushConsoleLog?.({ + level: 'warn', + message: `${base}(part ${i + 1}/${parts}) ${chunk}`, + metadata: { module: 'processor' }, + }); + } + } + + _extractRawKafkaValue(message) { + try { + const raw = message?.value; + if (Buffer.isBuffer(raw)) { + return { type: 'buffer', bytes: raw.length, base64: raw.toString('base64') }; + } + if (typeof raw === 'string') { + return { type: 'string', chars: raw.length, value: raw }; + } + return { type: typeof raw, value: raw }; + } catch (err) { + return { type: 'unknown', error: String(err?.message ?? err) }; + } + } + + _safeStringify(obj) { + try { + return JSON.stringify(obj); + } catch (err) { + return JSON.stringify({ + stringifyError: String(err?.message ?? 
err), + type: typeof obj, + }); + } + } + shouldFlushNow() { const max = Math.max(1, Number(this.config.batchSize ?? 1)); return this.batchQueue.length >= max; diff --git a/src/redis/redisIntegration.js b/src/redis/redisIntegration.js index 7dada3a..7c120c7 100644 --- a/src/redis/redisIntegration.js +++ b/src/redis/redisIntegration.js @@ -8,6 +8,9 @@ class RedisIntegration { this._connectPromise = null; this._lastErrorLogAt = 0; + this._pendingConsoleLogs = []; + this._flushingConsoleLogs = false; + this._pendingHeartbeat = null; } isEnabled() { @@ -23,7 +26,7 @@ class RedisIntegration { } getHeartbeatKey() { - return `${this.getProjectName()}_项目心跳`; + return '项目心跳'; } getConsoleKey() { @@ -50,6 +53,14 @@ class RedisIntegration { return Number.isFinite(n) && n > 0 ? Math.floor(n) : null; } + getHeartbeatMaxLen() { + const v = this.config?.heartbeatMaxLen; + if (v === undefined) return 2000; + if (v === null) return null; + const n = Number(v); + return Number.isFinite(n) && n > 0 ? Math.floor(n) : null; + } + getConsoleMaxLen() { const v = this.config?.consoleMaxLen; if (v === undefined || v === null) return null; @@ -137,6 +148,7 @@ class RedisIntegration { .connect() .then(() => { console.log('[redis] connected'); + return Promise.all([this.flushPendingHeartbeat(), this.flushPendingConsoleLogs()]); }) .catch((err) => { // connect 失败不抛出到上层;依赖 redis 内建重连策略或下次调用再触发 @@ -163,15 +175,32 @@ class RedisIntegration { } finally { this.client = null; this._connectPromise = null; + this._pendingConsoleLogs = []; + this._pendingHeartbeat = null; console.log('[redis] disconnected'); } } + async flushPendingHeartbeat() { + if (!this.isEnabled()) return; + if (!this.client || !this.client.isReady) return; + if (!this._pendingHeartbeat) return; + + const key = this.getHeartbeatKey(); + const value = this._pendingHeartbeat; + this._pendingHeartbeat = null; + + await this.client.rPush(key, value); + const maxLen = this.getHeartbeatMaxLen(); + if (maxLen) { + await this.client.lTrim(key, -maxLen, -1); + } + } + async writeHeartbeat() { if (!this.isEnabled()) return; - if (!this.client || !this.client.isReady) return; - const payload = { + projectName: this.getProjectName(), apiBaseUrl: this.getApiBaseUrl(), lastActiveAt: Date.now(), }; @@ -179,12 +208,34 @@ class RedisIntegration { const key = this.getHeartbeatKey(); const value = JSON.stringify(payload); - const ttl = this.getHeartbeatTtlSeconds(); - if (ttl) { - await this.client.set(key, value, { EX: ttl }); - } else { - await this.client.set(key, value); + if (!this.client || !this.client.isReady) { + this._pendingHeartbeat = value; + this.ensureConnectedInBackground(); + return; } + + let lastError = null; + for (let attempt = 1; attempt <= 2; attempt += 1) { + try { + await this.client.rPush(key, value); + const maxLen = this.getHeartbeatMaxLen(); + if (maxLen) { + await this.client.lTrim(key, -maxLen, -1); + } + this._pendingHeartbeat = null; + return; + } catch (err) { + lastError = err; + this.ensureConnectedInBackground(); + if (attempt < 2) { + await new Promise((r) => setTimeout(r, 250)); + if (!this.client?.isReady) return; + continue; + } + } + } + + throw lastError; } startHeartbeat() { @@ -214,13 +265,52 @@ class RedisIntegration { } } - async pushConsoleLog({ level, message, metadata }) { + async flushPendingConsoleLogs() { if (!this.isEnabled()) return; if (!this.client || !this.client.isReady) return; + if (this._flushingConsoleLogs) return; + if (!this._pendingConsoleLogs.length) return; + + this._flushingConsoleLogs = true; + try { + 
const key = this.getConsoleKey(); + while (this._pendingConsoleLogs.length) { + const batch = this._pendingConsoleLogs.splice(0, 200); + await this.client.rPush(key, ...batch); + + const maxLen = this.getConsoleMaxLen(); + if (maxLen) { + await this.client.lTrim(key, -maxLen, -1); + } + } + } finally { + this._flushingConsoleLogs = false; + } + } + + async pushConsoleLog({ level, message, metadata }) { + if (!this.isEnabled()) return; + const normalizedLevel = String(level ?? '').toLowerCase(); + if (!this.client || !this.client.isReady) { + if (this._pendingConsoleLogs.length < 5000) { + const entry = { + timestamp: new Date().toISOString(), + level: normalizedLevel, + message, + metadata: metadata ?? undefined, + }; + const value = JSON.stringify(entry); + if (Buffer.byteLength(value, 'utf8') <= 64 * 1024) { + this._pendingConsoleLogs.push(value); + } + } + this.ensureConnectedInBackground(); + return; + } const entry = { timestamp: new Date().toISOString(), - level, + level: normalizedLevel, message, metadata: metadata ?? undefined, }; diff --git a/src/stats/statsManager.js b/src/stats/statsManager.js new file mode 100644 index 0000000..e28b7c6 --- /dev/null +++ b/src/stats/statsManager.js @@ -0,0 +1,102 @@ +class StatsCounters { + constructor() { + this._minuteBuf = new SharedArrayBuffer(BigInt64Array.BYTES_PER_ELEMENT * 3); + this._minute = new BigInt64Array(this._minuteBuf); + } + + incDbWritten(n = 1) { + const v = BigInt(Math.max(0, Number(n) || 0)); + if (v === 0n) return; + Atomics.add(this._minute, 0, v); + } + + incFiltered(n = 1) { + const v = BigInt(Math.max(0, Number(n) || 0)); + if (v === 0n) return; + Atomics.add(this._minute, 1, v); + } + + incKafkaPulled(n = 1) { + const v = BigInt(Math.max(0, Number(n) || 0)); + if (v === 0n) return; + Atomics.add(this._minute, 2, v); + } + + snapshotAndResetMinute() { + const dbWritten = Atomics.exchange(this._minute, 0, 0n); + const filtered = Atomics.exchange(this._minute, 1, 0n); + const kafkaPulled = Atomics.exchange(this._minute, 2, 0n); + return { dbWritten, filtered, kafkaPulled }; + } +} + +const pad2 = (n) => String(n).padStart(2, '0'); +const pad3 = (n) => String(n).padStart(3, '0'); + +const formatTimestamp = (d) => { + const year = d.getFullYear(); + const month = pad2(d.getMonth() + 1); + const day = pad2(d.getDate()); + const hour = pad2(d.getHours()); + const minute = pad2(d.getMinutes()); + const second = pad2(d.getSeconds()); + const ms = pad3(d.getMilliseconds()); + return `${year}-${month}-${day} ${hour}:${minute}:${second}.${ms}`; +}; + +class StatsReporter { + constructor({ redis, stats }) { + this.redis = redis; + this.stats = stats; + this._timer = null; + this._running = false; + } + + start() { + if (this._running) return; + this._running = true; + this._scheduleNext(); + } + + stop() { + this._running = false; + if (this._timer) { + clearTimeout(this._timer); + this._timer = null; + } + } + + flushOnce() { + if (!this.redis?.isEnabled?.()) return; + const { dbWritten, filtered, kafkaPulled } = this.stats.snapshotAndResetMinute(); + const ts = formatTimestamp(new Date()); + this.redis.pushConsoleLog?.({ level: 'info', message: `[STATS] ${ts} 数据库写入量: ${dbWritten}条`, metadata: { module: 'stats' } }); + this.redis.pushConsoleLog?.({ level: 'info', message: `[STATS] ${ts} 数据过滤量: ${filtered}条`, metadata: { module: 'stats' } }); + this.redis.pushConsoleLog?.({ level: 'info', message: `[STATS] ${ts} Kafka拉取量: ${kafkaPulled}条`, metadata: { module: 'stats' } }); + } + + _scheduleNext() { + if (!this._running) return; + 
if (this._timer) return; + + const now = Date.now(); + const delay = 60_000 - (now % 60_000); + this._timer = setTimeout(() => { + this._timer = null; + try { + this.flushOnce(); + } catch (err) { + this.redis?.pushConsoleLog?.({ + level: 'warn', + message: `[ERROR] ${formatTimestamp(new Date())} 统计任务异常: ${String(err?.message ?? err)}`, + metadata: { module: 'stats' }, + }); + } finally { + this._scheduleNext(); + } + }, delay); + } +} + +export { StatsCounters, StatsReporter, formatTimestamp }; + diff --git a/test/smoke.test.js b/test/smoke.test.js index 5566b20..73b53a7 100644 --- a/test/smoke.test.js +++ b/test/smoke.test.js @@ -1,5 +1,6 @@ import assert from 'node:assert/strict'; import { HeartbeatProcessor } from '../src/processor/heartbeatProcessor.js'; +import { RedisIntegration } from '../src/redis/redisIntegration.js'; describe('HeartbeatProcessor smoke', () => { it('decodes JSON buffer into object', () => { @@ -25,3 +26,108 @@ describe('HeartbeatProcessor smoke', () => { }); }); +describe('RedisIntegration protocol', () => { + it('writes heartbeat to 项目心跳 LIST', async () => { + const redis = new RedisIntegration({ + enabled: true, + projectName: 'BLS主机心跳日志', + apiBaseUrl: 'http://127.0.0.1:3000', + }); + + const calls = { rPush: [], lTrim: [] }; + redis.client = { + isReady: true, + rPush: async (key, value) => { + calls.rPush.push({ key, value }); + }, + lTrim: async (key, start, stop) => { + calls.lTrim.push({ key, start, stop }); + }, + }; + + const before = Date.now(); + await redis.writeHeartbeat(); + const after = Date.now(); + + assert.equal(calls.rPush.length, 1); + assert.equal(calls.rPush[0].key, '项目心跳'); + const payload = JSON.parse(calls.rPush[0].value); + assert.equal(payload.projectName, 'BLS主机心跳日志'); + assert.equal(payload.apiBaseUrl, 'http://127.0.0.1:3000'); + assert.equal(typeof payload.lastActiveAt, 'number'); + assert.ok(payload.lastActiveAt >= before && payload.lastActiveAt <= after); + + assert.equal(calls.lTrim.length, 1); + assert.deepEqual(calls.lTrim[0], { key: '项目心跳', start: -2000, stop: -1 }); + }); + + it('caches heartbeat when redis is not ready and flushes later', async () => { + const redis = new RedisIntegration({ + enabled: true, + projectName: 'BLS主机心跳日志', + apiBaseUrl: 'http://127.0.0.1:3000', + }); + + const calls = { rPush: [], lTrim: [] }; + redis.client = { + isReady: false, + connect: async () => {}, + rPush: async (key, value) => { + calls.rPush.push({ key, value }); + }, + lTrim: async (key, start, stop) => { + calls.lTrim.push({ key, start, stop }); + }, + }; + + await redis.writeHeartbeat(); + assert.ok(redis._pendingHeartbeat); + + redis.client.isReady = true; + await redis.flushPendingHeartbeat(); + + assert.equal(redis._pendingHeartbeat, null); + assert.equal(calls.rPush.length, 1); + assert.equal(calls.rPush[0].key, '项目心跳'); + const payload = JSON.parse(calls.rPush[0].value); + assert.equal(payload.projectName, 'BLS主机心跳日志'); + assert.equal(payload.apiBaseUrl, 'http://127.0.0.1:3000'); + assert.equal(typeof payload.lastActiveAt, 'number'); + assert.equal(calls.lTrim.length, 1); + }); + + it('buffers console logs when redis is not ready', async () => { + const redis = new RedisIntegration({ + enabled: true, + projectName: 'BLS主机心跳日志', + apiBaseUrl: 'http://127.0.0.1:3000', + }); + + const calls = { rPush: [], lTrim: [] }; + redis.client = { + isReady: false, + connect: async () => {}, + rPush: async (key, ...values) => { + calls.rPush.push({ key, values }); + }, + lTrim: async (key, start, stop) => { + calls.lTrim.push({ key, start, 
stop }); + }, + }; + + await redis.info('hello', { module: 'test' }); + assert.equal(redis._pendingConsoleLogs.length, 1); + + redis.client.isReady = true; + await redis.flushPendingConsoleLogs(); + + assert.equal(redis._pendingConsoleLogs.length, 0); + assert.equal(calls.rPush.length, 1); + assert.equal(calls.rPush[0].key, 'BLS主机心跳日志_项目控制台'); + assert.equal(calls.rPush[0].values.length, 1); + const entry = JSON.parse(calls.rPush[0].values[0]); + assert.equal(entry.level, 'info'); + assert.equal(entry.message, 'hello'); + assert.equal(entry.metadata.module, 'test'); + }); +}); diff --git a/test/stats.test.js b/test/stats.test.js new file mode 100644 index 0000000..2116a4d --- /dev/null +++ b/test/stats.test.js @@ -0,0 +1,70 @@ +import assert from 'node:assert/strict'; +import { StatsCounters, StatsReporter } from '../src/stats/statsManager.js'; +import { HeartbeatProcessor } from '../src/processor/heartbeatProcessor.js'; + +describe('StatsCounters', () => { + it('snapshots and resets minute counters atomically', () => { + const stats = new StatsCounters(); + stats.incDbWritten(3); + stats.incFiltered(2); + stats.incKafkaPulled(5); + + const first = stats.snapshotAndResetMinute(); + assert.equal(first.dbWritten, 3n); + assert.equal(first.filtered, 2n); + assert.equal(first.kafkaPulled, 5n); + + const second = stats.snapshotAndResetMinute(); + assert.equal(second.dbWritten, 0n); + assert.equal(second.filtered, 0n); + assert.equal(second.kafkaPulled, 0n); + }); +}); + +describe('StatsReporter', () => { + it('writes three [STATS] info logs to redis console', () => { + const stats = new StatsCounters(); + stats.incDbWritten(7); + stats.incFiltered(8); + stats.incKafkaPulled(9); + + const calls = { push: [] }; + const redis = { + isEnabled: () => true, + pushConsoleLog: ({ level, message, metadata }) => { + calls.push.push({ level, message, metadata }); + }, + }; + + const reporter = new StatsReporter({ redis, stats }); + reporter.flushOnce(); + + assert.equal(calls.push.length, 3); + assert.equal(calls.push[0].level, 'info'); + assert.equal(calls.push[1].level, 'info'); + assert.equal(calls.push[2].level, 'info'); + assert.match(calls.push[0].message, /^\[STATS\] \d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3} 数据库写入量: 7条$/); + assert.match(calls.push[1].message, /^\[STATS\] \d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3} 数据过滤量: 8条$/); + assert.match(calls.push[2].message, /^\[STATS\] \d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3} Kafka拉取量: 9条$/); + }); +}); + +describe('HeartbeatProcessor db write error logging', () => { + it('emits [ERROR] warn log with raw data', () => { + const calls = { warn: [] }; + const redis = { + isEnabled: () => true, + pushConsoleLog: ({ level, message }) => { + if (level === 'warn') calls.warn.push(message); + }, + }; + + const processor = new HeartbeatProcessor({ batchSize: 1, batchTimeout: 10 }, {}, { redis }); + processor._emitDbWriteError(new Error('boom'), [{ a: 1 }]); + + assert.equal(calls.warn.length >= 1, true); + assert.match(calls.warn[0], /^\[ERROR\] \d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3} db_write_failed: /); + assert.match(calls.warn[0], /"errorId":"db_write_failed"/); + assert.match(calls.warn[0], /"rawData":\{"a":1\}/); + }); +});
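
---

For reference, the external-project side of the Redis protocol in docs/redis-integration-protocol.md could look like the sketch below. This is illustrative only and not part of the patch: it assumes node-redis v4 (`createClient`) and the DB-15 connection details documented above; the key names `项目心跳` and `${projectName}_项目控制台` are the protocol constants, while the project name, port, and `apiBaseUrl` shown here are placeholders.

```js
// External-project heartbeat writer (sketch; assumes node-redis v4).
import { createClient } from 'redis';

const PROJECT_NAME = '订单系统'; // placeholder project name

const client = createClient({
  socket: { host: '10.8.8.109', port: 6379 }, // connection details from the protocol doc
  database: 15,
});

await client.connect();

// Every 3 seconds, RPUSH one heartbeat record onto the shared list and trim it,
// mirroring what RedisIntegration.writeHeartbeat() does in this patch.
setInterval(async () => {
  const record = JSON.stringify({
    projectName: PROJECT_NAME,
    apiBaseUrl: 'http://127.0.0.1:4001', // placeholder base URL for command callbacks
    lastActiveAt: Date.now(),
  });
  await client.rPush('项目心跳', record);
  await client.lTrim('项目心跳', -2000, -1); // keep only the newest 2000 entries
}, 3000);

// Console/log entries go to the per-project LIST.
await client.rPush(
  `${PROJECT_NAME}_项目控制台`,
  JSON.stringify({
    timestamp: new Date().toISOString(),
    level: 'info',
    message: 'service started',
    metadata: { module: 'bootstrap' },
  })
);
```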