From 910f1c353f3aa56dd994f8016faf577fbfbe09c5 Mon Sep 17 00:00:00 2001 From: XuJiacheng Date: Wed, 14 Jan 2026 17:58:45 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=AE=9E=E7=8E=B0Redis=E9=9B=86?= =?UTF-8?q?=E6=88=90=E4=B8=8EKafka=E6=B6=88=E6=81=AF=E5=A4=84=E7=90=86?= =?UTF-8?q?=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增Redis集成模块,支持心跳写入与控制台日志队列 - 优化Kafka消费者实现,支持多实例与自动重连 - 改进消息处理器,支持批量处理与多层解码 - 更新数据库表结构,调整字段类型与约束 - 添加Redis与Kafka的配置项和环境变量支持 - 补充测试用例和文档说明 --- .eslintrc.cjs | 22 + README.md | 6 +- docs/db-heartbeat-schema.md | 4 +- docs/kafka-heartbeat-producer.md | 13 +- docs/redis-integration-protocol.md | 136 +++++ .../update-heartbeat-db-v2/specs/db/spec.md | 4 +- .../specs/kafka/spec.md | 7 +- .../specs/processor/spec.md | 14 +- .../changes/update-heartbeat-db-v2/tasks.md | 15 +- openspec/specs/db/spec.md | 15 +- openspec/specs/kafka/spec.md | 16 +- openspec/specs/processor/spec.md | 30 +- openspec/specs/redis/spec.md | 33 ++ package-lock.json | 104 +++- package.json | 13 +- scripts/db/010_heartbeat_schema.sql | 10 +- scripts/db/apply.js | 10 +- scripts/db/smokeTest.js | 47 +- scripts/kafka/decodeMessage.js | 66 +++ scripts/redis/smokeTest.js | 17 + src/config/config.example.js | 60 ++- src/db/databaseManager.js | 201 ++++++-- src/index.js | 60 ++- src/kafka/consumer.js | 199 +++++++- src/processor/heartbeatProcessor.js | 466 ++++++++++++++++-- src/redis/redisIntegration.js | 263 ++++++++++ test/smoke.test.js | 27 + vite.config.js | 10 +- 28 files changed, 1691 insertions(+), 177 deletions(-) create mode 100644 .eslintrc.cjs create mode 100644 docs/redis-integration-protocol.md create mode 100644 openspec/specs/redis/spec.md create mode 100644 scripts/kafka/decodeMessage.js create mode 100644 scripts/redis/smokeTest.js create mode 100644 src/redis/redisIntegration.js create mode 100644 test/smoke.test.js diff --git a/.eslintrc.cjs b/.eslintrc.cjs new file mode 100644 index 0000000..8f16655 --- /dev/null +++ b/.eslintrc.cjs @@ -0,0 +1,22 @@ +module.exports = { + root: true, + env: { + node: true, + es2022: true, + }, + overrides: [ + { + files: ['test/**/*.js'], + env: { + mocha: true, + }, + }, + ], + parserOptions: { + ecmaVersion: 'latest', + sourceType: 'module', + }, + extends: ['eslint:recommended'], + ignorePatterns: ['dist/', 'build/', 'coverage/', 'node_modules/'], + rules: {}, +}; diff --git a/README.md b/README.md index b8afdd7..2b8f3af 100644 --- a/README.md +++ b/README.md @@ -42,7 +42,11 @@ npm run build ### 运行 ```bash -npm run dev +# 启动后端(Kafka consumer + DB + Redis) +npm run start + +# 注意:npm run dev 是 Vite 开发服务器,不会启动 Kafka consumer +# npm run dev ``` ## 项目结构 diff --git a/docs/db-heartbeat-schema.md b/docs/db-heartbeat-schema.md index e64f274..51c91b4 100644 --- a/docs/db-heartbeat-schema.md +++ b/docs/db-heartbeat-schema.md @@ -32,12 +32,12 @@ | pms_state | int2 | 是 | PMS 状态(枚举待定) | | carbon_state | int2 | 是 | 碳控状态(枚举待定) | | device_count | int2 | 是 | 设备数量/上报设备数量(语义待确认) | -| comm_seq | int2 | 是 | 通讯序号(语义待确认) | +| comm_seq | int4 | 是 | 通讯序号(语义待确认) | | extra | jsonb | 否 | 可扩展字段:电参/空调状态/版本/来源等 | ### 2.2 约束 - 所有必填字段:`NOT NULL` -- `ip`:使用 `inet` 类型(天然校验 IPv4/IPv6 格式) +- `ip`:使用 `varchar(21)`,用于存储 `IP:PORT`(IPv4) - 各 `int2/int4`:当前脚本采用“非负 + 上界”CHECK(避免枚举未来扩展造成写入失败) - 如需更强的枚举约束,建议在确认枚举标准后改为 `IN (...)` 或 `BETWEEN` 更小范围。 diff --git a/docs/kafka-heartbeat-producer.md b/docs/kafka-heartbeat-producer.md index 2ddd548..2956be6 100644 --- a/docs/kafka-heartbeat-producer.md +++ b/docs/kafka-heartbeat-producer.md @@ -3,11 +3,11 @@ 本文档说明数据产生者需要往 Kafka 队列推送的数据结构与推送方式。 ## 1. Topic 与编码 -- Topic:默认 `bls-heartbeat`(以服务端配置为准,见 `src/config/config.js`) +- Topic:默认 `blwlog4Nodejs-rcu-heartbeat-topic`(以服务端配置为准,见 `src/config/config.js`) - 编码:UTF-8 - 建议消息格式:JSON(便于跨语言对接与灰度演进) -> 注意:当前服务端代码的“二进制解包”尚未实现,若你们已经有既定二进制协议,需要在 Processor 中落地对应解包逻辑,并在本文档补充协议细节。 +> 服务端会以 buffer 方式接收 Kafka message.value,并按 UTF-8 解码为 JSON。 ## 2. 消息 Key(强烈建议) 为了保证同设备消息更有序、便于消费端批量聚合: @@ -23,7 +23,7 @@ | hotel_id | number/int | 12 | 酒店编号(int2 范围内) | | room_id | number/int | 1203 | 房间编号/房间标识(int4) | | device_id | string | "A1B2C3D4" | 设备唯一 ID(序列号/MAC/自定义编码) | -| ip | string | "192.168.1.10" | IPv4/IPv6 字符串(落库为 inet) | +| ip | string | "192.168.1.10:8080" | `IP:PORT` 字符串(落库为 varchar(21)) | | power_state | number/int | 1 | 取电状态(枚举值需统一标准) | | guest_type | number/int | 0 | 住客身份(住客/空房/保洁/维修等,枚举值需统一标准) | | cardless_state | number/int | 0 | 无卡取电/无卡策略状态(枚举) | @@ -61,6 +61,13 @@ "meter": {"p": 123.4, "e_wh": 5678} } } + + ## 4.1 C# 生产者示例(与你们当前实现一致) + ```csharp + var nas = JsonConvert.SerializeObject(s); + var data = Encoding.UTF8.GetBytes(nas); + // 将 data 作为 Kafka message.value 发送即可 + ``` ``` ## 5. 推送方式(实现建议) diff --git a/docs/redis-integration-protocol.md b/docs/redis-integration-protocol.md new file mode 100644 index 0000000..01afe30 --- /dev/null +++ b/docs/redis-integration-protocol.md @@ -0,0 +1,136 @@ +# Redis 对接协议(供 AI 生成代码使用) + +本文档定义“外部项目 ↔ BLS Project Console”之间通过 Redis 交互的 **Key 命名、数据类型、写入方式、读取方式与数据格式**。 + +> 约束:每个需要关联本控制台的外部项目,必须在本项目使用的同一个 Redis 实例中: +> +> - 写入 2 个 Key(心跳 + 控制台信息) +> - 命令下发为 HTTP API 调用 + +## 1. 命名约定 + +令: + +- `projectName`:外部项目名称(建议只用字母数字下划线 `A-Za-z0-9_`;如使用中文也可,但需保证统一且 UTF-8)。 + +固定后缀: + +- 心跳:`${projectName}_项目心跳` +- 控制台:`${projectName}_项目控制台` + +示例(projectName = `订单系统`): + +- `订单系统_项目心跳` +- `订单系统_项目控制台` + +## 2. 外部项目需要写入的 2 个 Key + +### 2.1 `${projectName}_项目心跳` + +- Redis 数据类型:**STRING** +- 写入方式:`SET ${projectName}_项目心跳 ` +- value:JSON 字符串,必须包含目标项目可被调用的 `apiBaseUrl`,以及活跃时间戳 `lastActiveAt` + +推荐 JSON Schema: + +```json +{ + "apiBaseUrl": "http://127.0.0.1:4001", + "lastActiveAt": 1760000000000 +} +``` + +字段说明: + +- `apiBaseUrl`:目标项目对外提供的 API 地址(基地址,后端将基于它拼接 `apiName`) +- `lastActiveAt`:状态时间(活跃时间戳,毫秒)。建议每 **3 秒**刷新一次。 + +在线/离线判定(BLS Project Console 使用): + +- 若 `now - lastActiveAt > 10_000ms`,则认为该应用 **离线** +- 否则认为 **在线** + +建议: + +- `lastActiveAt` 使用 `Date.now()` 生成(毫秒) +- 可设置 TTL(可选):例如 `SET key value EX 30` + +### 2.2 `${projectName}_项目控制台` + +- Redis 数据类型:**LIST**(作为项目向控制台追加的“消息队列/日志队列”) +- 写入方式(推荐 FIFO):`RPUSH ${projectName}_项目控制台 ` + +value(推荐格式):一条 JSON 字符串,表示“错误/调试信息”或日志记录。 + +推荐 JSON Schema(字段尽量保持稳定,便于控制台解析): + +```json +{ + "timestamp": "2026-01-12T12:34:56.789Z", + "level": "info", + "message": "连接成功", + "metadata": { + "module": "redis", + "host": "127.0.0.1" + } +} +``` + +字段说明: + +- `timestamp`:ISO-8601 时间字符串 +- `level`:建议取值 `info|warn|error|debug`(小写) +- `message`:日志文本 +- `metadata`:可选对象(附加信息) + +## 3. 命令下发方式(HTTP API 控制) + +控制台不再通过 Redis 写入控制指令队列;改为由 BLS Project Console 后端根据目标项目心跳里的 `apiBaseUrl` 直接调用目标项目 HTTP API。 + +### 3.1 控制台输入格式 + +一行文本按空格拆分: + +- 第一个 token:`apiName`(接口名/路径片段) +- 剩余 token:参数列表(字符串数组) + +示例: + +- `reload` +- `reload force` +- `user/refreshCache tenantA` + +### 3.2 目标项目需要提供的 API + +后端默认使用 `POST` 调用: + +- `POST {apiBaseUrl}/{apiName}` + +请求体(JSON)示例: + +```json +{ + "commandId": "cmd-1700000000000-abc123", + "timestamp": "2026-01-13T00:00:00.000Z", + "source": "BLS Project Console", + "apiName": "reload", + "args": ["force"], + "argsText": "force" +} +``` + +返回建议: + +- 2xx 表示成功 +- 非 2xx 表示失败(控制台会展示 upstreamStatus 与部分返回内容) + +## 4. 兼容与错误处理建议 + +- JSON 解析失败:外部项目应记录错误,并丢弃该条消息(避免死循环阻塞消费)。 +- 消息过长:建议控制单条消息大小(例如 < 64KB)。 +- 字符编码:统一 UTF-8。 + +## 5. 与本项目代码的对应关系(实现中) + +- 后端通过 `/api/commands`:从 `${targetProjectName}_项目心跳` 读取 `apiBaseUrl` 与 `lastActiveAt`,在线时调用目标项目 API。 +- 后端通过 `/api/logs`:读取 `${projectName}_项目控制台`;并基于 `${projectName}_项目心跳` 返回在线/离线与 API 地址信息。 diff --git a/openspec/changes/update-heartbeat-db-v2/specs/db/spec.md b/openspec/changes/update-heartbeat-db-v2/specs/db/spec.md index 3ab929b..feebabe 100644 --- a/openspec/changes/update-heartbeat-db-v2/specs/db/spec.md +++ b/openspec/changes/update-heartbeat-db-v2/specs/db/spec.md @@ -1,7 +1,7 @@ ## MODIFIED Requirements ### Requirement: 数据库表结构管理 -系统必须包含数据库表结构的定义和管理机制。 +系统 MUST 提供数据库表结构的定义和管理机制。 #### Scenario: 表结构初始化(高吞吐分区表) - **WHEN** 系统首次启动或部署数据库时 @@ -18,7 +18,7 @@ ## ADDED Requirements ### Requirement: 高吞吐写入友好 -系统在高吞吐场景(约 5 万条/分钟量级)下应避免单点瓶颈。 +系统 MUST 在高吞吐场景(约 5 万条/分钟量级)下避免单点瓶颈。 #### Scenario: 批量写入与分区裁剪 - **WHEN** 进行批量写入 diff --git a/openspec/changes/update-heartbeat-db-v2/specs/kafka/spec.md b/openspec/changes/update-heartbeat-db-v2/specs/kafka/spec.md index 139dee1..7072c47 100644 --- a/openspec/changes/update-heartbeat-db-v2/specs/kafka/spec.md +++ b/openspec/changes/update-heartbeat-db-v2/specs/kafka/spec.md @@ -1,7 +1,7 @@ ## ADDED Requirements ### Requirement: 心跳消息载荷格式(生产者约束) -Kafka 心跳消息必须包含数据库落库所需的必填字段,并采用 UTF-8 编码。 +Kafka 心跳消息 MUST 包含数据库落库所需的必填字段,并采用 UTF-8 编码。 #### Scenario: JSON 心跳消息 - **WHEN** 生产者向主题推送心跳消息 @@ -10,6 +10,11 @@ Kafka 心跳消息必须包含数据库落库所需的必填字段,并采用 U - **AND** 可选包含 extra(json object) ### Requirement: 分区键友好的 Kafka Key +系统 MUST 支持使用 `hotel_id:device_id` 作为 Kafka message key 以获得更好的分区与有序性。 + +#### Scenario: 缺失 key 仍可处理 +- **WHEN** 消息未携带 key +- **THEN** 系统仍应能够消费与处理该消息 #### Scenario: 使用 device_id 作为 key - **WHEN** 生产者发送消息 diff --git a/openspec/changes/update-heartbeat-db-v2/specs/processor/spec.md b/openspec/changes/update-heartbeat-db-v2/specs/processor/spec.md index a30049b..c5ff9fa 100644 --- a/openspec/changes/update-heartbeat-db-v2/specs/processor/spec.md +++ b/openspec/changes/update-heartbeat-db-v2/specs/processor/spec.md @@ -1,7 +1,19 @@ ## MODIFIED Requirements +### Requirement: Kafka 心跳消息解码/解压 +系统 MUST 能够将 Kafka 消息 value 解码为 JSON 对象或数组,并支持两层以内的编码/压缩组合。 + +#### Scenario: 两层以内解码成功 +- **WHEN** Kafka 消息 value 为 UTF-8 JSON、base64(二进制)、或 gzip/deflate/raw-deflate/brotli 压缩二进制 +- **THEN** 系统应按“最多两层”策略尝试解码/解压 +- **AND** 成功时得到 JSON 对象或数组 + +#### Scenario: 解码失败 +- **WHEN** Kafka 消息 value 无法被解码/解压为 JSON +- **THEN** 系统应记录错误并丢弃该消息 + ### Requirement: 心跳数据转换 -系统必须能够将解包后的心跳数据转换为数据库存储格式。 +系统 MUST 能够将解包后的心跳数据转换为数据库存储格式。 #### Scenario: 转换为 v2 明细表字段 - **WHEN** 心跳数据验证通过时 diff --git a/openspec/changes/update-heartbeat-db-v2/tasks.md b/openspec/changes/update-heartbeat-db-v2/tasks.md index 3f23f6e..b0c6e87 100644 --- a/openspec/changes/update-heartbeat-db-v2/tasks.md +++ b/openspec/changes/update-heartbeat-db-v2/tasks.md @@ -1,10 +1,11 @@ ## 1. Implementation -- [ ] 提供 PostgreSQL 建库脚本(UTF-8 + 中文排序规则可选) -- [ ] 提供心跳明细表结构(必填字段、可选字段、约束、索引) -- [ ] 实现按 `ts_ms` 日分区与自动建分区机制 -- [ ] 补充性能建议(索引策略、分区影响、聚合/物化视图建议) -- [ ] 产出 docs:DB 表结构文档 + Kafka 生产者推送数据结构与方式 +- [x] 提供 PostgreSQL 建库脚本(UTF-8 + 中文排序规则可选) +- [x] 提供心跳明细表结构(必填字段、可选字段、约束、索引) +- [x] 实现按 `ts_ms` 日分区与自动建分区机制 +- [x] 补充性能建议(索引策略、分区影响、聚合/物化视图建议) +- [x] 产出 docs:DB 表结构文档 + Kafka 生产者推送数据结构与方式 +- [ ] Processor:实现 Kafka 心跳 value 的两层解码/解压与反序列化(需要对端样本/算法确认) ## 2. Validation -- [ ] 在可访问的 PostgreSQL 环境执行脚本并验证对象创建成功 -- [ ] 检查约束与索引是否符合要求 +- [x] 在可访问的 PostgreSQL 环境执行脚本并验证对象创建成功 +- [x] 检查约束与索引是否符合要求 diff --git a/openspec/specs/db/spec.md b/openspec/specs/db/spec.md index 877aff2..ff3d003 100644 --- a/openspec/specs/db/spec.md +++ b/openspec/specs/db/spec.md @@ -1,9 +1,12 @@ # 数据库操作规范 -## 需求 +## Purpose +本规范定义本服务对 PostgreSQL 的连接池配置、表结构初始化(含分区表)、分区预创建维护策略、批量写入与约束错误处理等行为。 + +## Requirements ### Requirement: 数据库连接管理 -系统必须能够建立和维护与PostgreSQL数据库的连接。 +系统 MUST 能够建立和维护与 PostgreSQL 数据库的连接。 #### Scenario: 成功连接数据库 - **WHEN** 系统启动时 @@ -16,7 +19,7 @@ - **AND** 重连失败时应该记录错误日志 ### Requirement: 心跳数据写入 -系统必须能够将处理后的心跳数据写入PostgreSQL数据库。 +系统 MUST 能够将处理后的心跳数据写入 PostgreSQL 数据库。 #### Scenario: 写入单条心跳数据 - **WHEN** 接收到单条处理后的心跳数据时 @@ -29,7 +32,7 @@ - **AND** 提高写入效率 ### Requirement: 数据完整性保障 -系统必须保障写入数据库的心跳数据的完整性。 +系统 MUST 保障写入数据库的心跳数据完整性。 #### Scenario: 事务管理 - **WHEN** 写入多条相关数据时 @@ -43,7 +46,7 @@ - **AND** 根据配置决定是否重试 ### Requirement: 数据库表结构管理 -系统必须包含数据库表结构的定义和管理机制。 +系统 MUST 提供数据库表结构的定义和管理机制。 #### Scenario: 表结构初始化 - **WHEN** 系统首次启动时 @@ -62,7 +65,7 @@ - **AND** 不影响现有数据 ### Requirement: 数据查询支持 -系统必须支持基本的数据查询操作,用于监控和调试。 +系统 MUST 支持基本的数据查询操作,用于监控和调试。 #### Scenario: 查询最新心跳数据 - **WHEN** 需要查询最新的心跳数据时 diff --git a/openspec/specs/kafka/spec.md b/openspec/specs/kafka/spec.md index 48b92a0..c4f46b5 100644 --- a/openspec/specs/kafka/spec.md +++ b/openspec/specs/kafka/spec.md @@ -1,9 +1,12 @@ # Kafka消息处理规范 -## 需求 +## Purpose +本规范定义本服务如何连接 Kafka 集群、订阅主题并消费消息(以 buffer 形式透传 payload),以及错误处理/重连与消费确认语义。 + +## Requirements ### Requirement: Kafka连接管理 -系统必须能够建立和维护与Kafka集群的连接。 +系统 MUST 能够建立和维护与 Kafka 集群的连接。 #### Scenario: 成功连接Kafka集群 - **WHEN** 系统启动时 @@ -16,19 +19,24 @@ - **AND** 重连失败时应该记录错误日志 ### Requirement: 心跳消息消费 -系统必须能够消费Kafka队列中的心跳消息。 +系统 MUST 能够消费 Kafka 队列中的心跳消息。 #### Scenario: 消费心跳消息 - **WHEN** Kafka队列中有心跳消息时 - **THEN** 系统应该消费该消息 - **AND** 将消息传递给处理器进行解包 +#### Scenario: 二进制 payload 透传 +- **WHEN** Kafka 消息 value 可能为二进制压缩数据(非纯文本) +- **THEN** Consumer 应使用 buffer 方式接收 value/key +- **AND** 将原始 buffer 交由 Processor 执行解码/解压与反序列化 + #### Scenario: 消息消费确认 - **WHEN** 消息处理完成后 - **THEN** 系统应该向Kafka确认消息已消费 ### Requirement: 消息过滤与路由 -系统必须能够根据消息类型过滤和路由心跳消息。 +系统 MUST 能够根据消息类型过滤和路由心跳消息。 #### Scenario: 过滤无效消息 - **WHEN** 接收到无效格式的消息时 diff --git a/openspec/specs/processor/spec.md b/openspec/specs/processor/spec.md index 9b1ab6c..b525e28 100644 --- a/openspec/specs/processor/spec.md +++ b/openspec/specs/processor/spec.md @@ -1,9 +1,25 @@ # 数据处理器规范 -## 需求 +## Purpose +本规范定义心跳处理器对 Kafka 消息 value 的解码/解压(含两层以内组合)、字段校验、转换为分区表写入结构,以及批量写库与失败丢弃/记录策略。 + +## Requirements ### Requirement: 心跳数据解包 -系统必须能够解包Kafka消息中的二进制心跳数据。 +系统 MUST 能够解包 Kafka 消息中的心跳数据。 + +#### Scenario: 支持常见编码/压缩(两层以内) +- **WHEN** Kafka 消息 value 为下列任意形式时: + - UTF-8 JSON(对象或数组) + - base64(二进制) + - gzip / deflate(zlib) / deflate(raw) / brotli 压缩后的二进制 +- **THEN** 系统应当按“最多两层”的策略尝试解码/解压 +- **AND** 成功时应还原为 JSON 对象或数组 +- **AND** 失败时应记录错误并丢弃该消息 + +#### Scenario: 支持包装结构 +- **WHEN** 解包得到的 JSON 为包装结构(例如包含 `data`/`payload`/`body` 字段) +- **THEN** 系统应优先提取其中的对象作为心跳数据源 #### Scenario: 解包有效心跳数据 - **WHEN** 接收到有效格式的Kafka心跳消息时 @@ -16,7 +32,7 @@ - **AND** 记录错误日志 ### Requirement: 心跳数据验证 -系统必须能够验证解包后的心跳数据的有效性。 +系统 MUST 能够验证解包后的心跳数据有效性。 #### Scenario: 验证有效心跳数据 - **WHEN** 解包后的心跳数据格式正确且字段完整时 @@ -30,7 +46,7 @@ - **AND** 丢弃该数据 ### Requirement: 心跳数据转换 -系统必须能够将解包后的心跳数据转换为数据库存储格式。 +系统 MUST 能够将解包后的心跳数据转换为数据库存储格式。 #### Scenario: 转换心跳数据格式 - **WHEN** 心跳数据验证通过时 @@ -38,9 +54,13 @@ - **AND** 添加必要的元数据 ### Requirement: 批量处理支持 -系统必须支持批量处理心跳数据,提高处理效率。 +系统 MUST 支持批量处理心跳数据,提高处理效率。 #### Scenario: 批量处理心跳数据 - **WHEN** 接收到大量心跳消息时 - **THEN** 系统应该将数据分批处理 - **AND** 每批处理的数量可配置 + +#### Scenario: Kafka 单条消息携带批量心跳 +- **WHEN** Kafka 消息 value 为 JSON 数组(批量心跳) +- **THEN** 系统应将数组内每条心跳作为独立项进入批处理队列 diff --git a/openspec/specs/redis/spec.md b/openspec/specs/redis/spec.md new file mode 100644 index 0000000..90b2ea7 --- /dev/null +++ b/openspec/specs/redis/spec.md @@ -0,0 +1,33 @@ +# Redis 对接规范 + +## Purpose +本规范定义本服务按协议向 Redis 写入“项目心跳”(STRING) 与“项目控制台”(LIST) 两个 key 的数据结构与频率,并在 Redis 不可用时保持无人值守可用性(不阻塞启动、后台重连)。 + +## Requirements + +### Requirement: 心跳 Key 写入 +系统 MUST 按协议周期性写入 Redis STRING 心跳 Key。 + +#### Scenario: 定期刷新心跳 +- **WHEN** 服务运行中 +- **THEN** 系统应每 3 秒(可配置)执行一次 `SET ${projectName}_项目心跳 ` +- **AND** JSON 必须包含 `apiBaseUrl` 与 `lastActiveAt`(毫秒时间戳) +- **AND** value 使用 UTF-8 编码 +- **AND** 可选设置 TTL(例如 EX 30) + +### Requirement: 控制台日志队列写入 +系统 MUST 按协议向 Redis LIST 追加控制台日志。 + +#### Scenario: 追加日志 +- **WHEN** 发生关键事件(启动成功/错误/连接状态变化) +- **THEN** 系统应执行 `RPUSH ${projectName}_项目控制台 ` +- **AND** JSON 必须包含 `timestamp`(ISO-8601)、`level`、`message` +- **AND** `level` 建议取值 `info|warn|error|debug` + +### Requirement: Redis 异常处理 +系统 MUST 在 Redis 不可用时进行后台重连,且不得阻塞主服务启动。 + +#### Scenario: Redis 连接中断 +- **WHEN** Redis 连接中断 +- **THEN** 系统应自动重连 +- **AND** 不应导致主进程崩溃 diff --git a/package-lock.json b/package-lock.json index cc99ef4..96a5b81 100644 --- a/package-lock.json +++ b/package-lock.json @@ -10,7 +10,8 @@ "license": "MIT", "dependencies": { "kafka-node": "^5.0.0", - "pg": "^8.11.3" + "pg": "^8.11.3", + "redis": "^4.7.1" }, "devDependencies": { "eslint": "^8.56.0", @@ -548,6 +549,66 @@ "node": ">= 8" } }, + "node_modules/@redis/bloom": { + "version": "1.2.0", + "resolved": "https://registry.npmmirror.com/@redis/bloom/-/bloom-1.2.0.tgz", + "integrity": "sha512-HG2DFjYKbpNmVXsa0keLHp/3leGJz1mjh09f2RLGGLQZzSHpkmZWuwJbAvo3QcRY8p80m5+ZdXZdYOSBLlp7Cg==", + "license": "MIT", + "peerDependencies": { + "@redis/client": "^1.0.0" + } + }, + "node_modules/@redis/client": { + "version": "1.6.1", + "resolved": "https://registry.npmmirror.com/@redis/client/-/client-1.6.1.tgz", + "integrity": "sha512-/KCsg3xSlR+nCK8/8ZYSknYxvXHwubJrU82F3Lm1Fp6789VQ0/3RJKfsmRXjqfaTA++23CvC3hqmqe/2GEt6Kw==", + "license": "MIT", + "peer": true, + "dependencies": { + "cluster-key-slot": "1.1.2", + "generic-pool": "3.9.0", + "yallist": "4.0.0" + }, + "engines": { + "node": ">=14" + } + }, + "node_modules/@redis/graph": { + "version": "1.1.1", + "resolved": "https://registry.npmmirror.com/@redis/graph/-/graph-1.1.1.tgz", + "integrity": "sha512-FEMTcTHZozZciLRl6GiiIB4zGm5z5F3F6a6FZCyrfxdKOhFlGkiAqlexWMBzCi4DcRoyiOsuLfW+cjlGWyExOw==", + "license": "MIT", + "peerDependencies": { + "@redis/client": "^1.0.0" + } + }, + "node_modules/@redis/json": { + "version": "1.0.7", + "resolved": "https://registry.npmmirror.com/@redis/json/-/json-1.0.7.tgz", + "integrity": "sha512-6UyXfjVaTBTJtKNG4/9Z8PSpKE6XgSyEb8iwaqDcy+uKrd/DGYHTWkUdnQDyzm727V7p21WUMhsqz5oy65kPcQ==", + "license": "MIT", + "peerDependencies": { + "@redis/client": "^1.0.0" + } + }, + "node_modules/@redis/search": { + "version": "1.2.0", + "resolved": "https://registry.npmmirror.com/@redis/search/-/search-1.2.0.tgz", + "integrity": "sha512-tYoDBbtqOVigEDMAcTGsRlMycIIjwMCgD8eR2t0NANeQmgK/lvxNAvYyb6bZDD4frHRhIHkJu2TBRvB0ERkOmw==", + "license": "MIT", + "peerDependencies": { + "@redis/client": "^1.0.0" + } + }, + "node_modules/@redis/time-series": { + "version": "1.1.0", + "resolved": "https://registry.npmmirror.com/@redis/time-series/-/time-series-1.1.0.tgz", + "integrity": "sha512-c1Q99M5ljsIuc4YdaCwfUEXsofakb9c8+Zse2qxTadu8TalLXuAESzLvFAvNVbkmSlvlzIQOLpBCmWI9wTOt+g==", + "license": "MIT", + "peerDependencies": { + "@redis/client": "^1.0.0" + } + }, "node_modules/@rollup/rollup-android-arm-eabi": { "version": "4.55.1", "resolved": "https://registry.npmmirror.com/@rollup/rollup-android-arm-eabi/-/rollup-android-arm-eabi-4.55.1.tgz", @@ -1371,6 +1432,15 @@ "node": ">=8" } }, + "node_modules/cluster-key-slot": { + "version": "1.1.2", + "resolved": "https://registry.npmmirror.com/cluster-key-slot/-/cluster-key-slot-1.1.2.tgz", + "integrity": "sha512-RMr0FhtfXemyinomL4hrWcYJxmX6deFdCxpJzhDttxgO1+bcCnkk+9drydLVDmAMG7NE6aN/fl4F7ucU/90gAA==", + "license": "Apache-2.0", + "engines": { + "node": ">=0.10.0" + } + }, "node_modules/code-point-at": { "version": "1.1.0", "resolved": "https://registry.npmmirror.com/code-point-at/-/code-point-at-1.1.0.tgz", @@ -2064,6 +2134,15 @@ "node": ">=0.10.0" } }, + "node_modules/generic-pool": { + "version": "3.9.0", + "resolved": "https://registry.npmmirror.com/generic-pool/-/generic-pool-3.9.0.tgz", + "integrity": "sha512-hymDOu5B53XvN4QT9dBmZxPX4CWhBPPLguTZ9MMFeFa/Kg0xWVfylOVNlJji/E7yTZWFd/q9GO5TxDLq156D7g==", + "license": "MIT", + "engines": { + "node": ">= 4" + } + }, "node_modules/get-caller-file": { "version": "2.0.5", "resolved": "https://registry.npmmirror.com/get-caller-file/-/get-caller-file-2.0.5.tgz", @@ -3356,6 +3435,23 @@ "node": ">=8.10.0" } }, + "node_modules/redis": { + "version": "4.7.1", + "resolved": "https://registry.npmmirror.com/redis/-/redis-4.7.1.tgz", + "integrity": "sha512-S1bJDnqLftzHXHP8JsT5II/CtHWQrASX5K96REjWjlmWKrviSOLWmM7QnRLstAWsu1VBBV1ffV6DzCvxNP0UJQ==", + "license": "MIT", + "workspaces": [ + "./packages/*" + ], + "dependencies": { + "@redis/bloom": "1.2.0", + "@redis/client": "1.6.1", + "@redis/graph": "1.1.1", + "@redis/json": "1.0.7", + "@redis/search": "1.2.0", + "@redis/time-series": "1.1.0" + } + }, "node_modules/require-directory": { "version": "2.1.1", "resolved": "https://registry.npmmirror.com/require-directory/-/require-directory-2.1.1.tgz", @@ -4145,6 +4241,12 @@ "node": ">=10" } }, + "node_modules/yallist": { + "version": "4.0.0", + "resolved": "https://registry.npmmirror.com/yallist/-/yallist-4.0.0.tgz", + "integrity": "sha512-3wdGidZyq5PB084XLES5TpOSRA3wjXAlIWMhum2kRcv/41Sn2emQ0dycQW4uZXLejwKvg6EsvbdlVL+FYEct7A==", + "license": "ISC" + }, "node_modules/yargs": { "version": "16.2.0", "resolved": "https://registry.npmmirror.com/yargs/-/yargs-16.2.0.tgz", diff --git a/package.json b/package.json index baf1980..b57e3e3 100644 --- a/package.json +++ b/package.json @@ -6,20 +6,23 @@ "main": "dist/index.js", "scripts": { "dev": "vite", + "start": "node src/index.js", "build": "vite build", "preview": "vite preview", "lint": "eslint . --ext .js", "test": "mocha", - "db:apply": "node scripts/db/apply.js" + "db:apply": "node scripts/db/apply.js", + "kafka:decode": "node scripts/kafka/decodeMessage.js" }, "dependencies": { "kafka-node": "^5.0.0", - "pg": "^8.11.3" + "pg": "^8.11.3", + "redis": "^4.7.1" }, "devDependencies": { - "vite": "^5.0.0", "eslint": "^8.56.0", - "mocha": "^10.2.0" + "mocha": "^10.2.0", + "vite": "^5.0.0" }, "keywords": [ "BLS", @@ -30,4 +33,4 @@ ], "author": "", "license": "MIT" -} \ No newline at end of file +} diff --git a/scripts/db/010_heartbeat_schema.sql b/scripts/db/010_heartbeat_schema.sql index 7469adb..0a5a5c1 100644 --- a/scripts/db/010_heartbeat_schema.sql +++ b/scripts/db/010_heartbeat_schema.sql @@ -13,9 +13,9 @@ CREATE TABLE IF NOT EXISTS heartbeat.heartbeat_events ( ts_ms bigint NOT NULL, hotel_id int2 NOT NULL, - room_id int4 NOT NULL, + room_id varchar(50) NOT NULL, device_id varchar(64) NOT NULL, - ip inet NOT NULL, + ip varchar(21) NOT NULL, power_state int2 NOT NULL, guest_type int2 NOT NULL, cardless_state int2 NOT NULL, @@ -23,7 +23,7 @@ CREATE TABLE IF NOT EXISTS heartbeat.heartbeat_events ( pms_state int2 NOT NULL, carbon_state int2 NOT NULL, device_count int2 NOT NULL, - comm_seq int2 NOT NULL, + comm_seq int4 NOT NULL, -- 弹性字段:电参/空调等(后续可结构化拆列;当前先放 extra) extra jsonb, @@ -33,14 +33,14 @@ CREATE TABLE IF NOT EXISTS heartbeat.heartbeat_events ( -- CHECK 约束:先做“非负+上界”约束(避免未来枚举扩展导致写入失败) CONSTRAINT chk_ts_ms_positive CHECK (ts_ms > 0), CONSTRAINT chk_hotel_id_range CHECK (hotel_id >= 0 AND hotel_id <= 32767), - CONSTRAINT chk_room_id_range CHECK (room_id >= 0), + CONSTRAINT chk_room_id_len CHECK (char_length(room_id) > 0 AND char_length(room_id) <= 50), CONSTRAINT chk_power_state_range CHECK (power_state >= 0 AND power_state <= 32767), CONSTRAINT chk_guest_type_range CHECK (guest_type >= 0 AND guest_type <= 32767), CONSTRAINT chk_cardless_state_range CHECK (cardless_state >= 0 AND cardless_state <= 32767), CONSTRAINT chk_pms_state_range CHECK (pms_state >= 0 AND pms_state <= 32767), CONSTRAINT chk_carbon_state_range CHECK (carbon_state >= 0 AND carbon_state <= 32767), CONSTRAINT chk_device_count_range CHECK (device_count >= 0 AND device_count <= 32767), - CONSTRAINT chk_comm_seq_range CHECK (comm_seq >= 0 AND comm_seq <= 32767) + CONSTRAINT chk_comm_seq_range CHECK (comm_seq >= 0) ) PARTITION BY RANGE (ts_ms); diff --git a/scripts/db/apply.js b/scripts/db/apply.js index 3271d62..c4aa8fe 100644 --- a/scripts/db/apply.js +++ b/scripts/db/apply.js @@ -15,10 +15,10 @@ function getEnv(name, fallback) { function buildClientConfig(database) { const db = config.db; return { - host: getEnv('PGHOST', db.host), - port: Number(getEnv('PGPORT', db.port)), - user: getEnv('PGUSER', db.user), - password: getEnv('PGPASSWORD', db.password), + host: getEnv('POSTGRES_HOST', getEnv('PGHOST', db.host)), + port: Number(getEnv('POSTGRES_PORT', getEnv('PGPORT', db.port))), + user: getEnv('POSTGRES_USER', getEnv('PGUSER', db.user)), + password: getEnv('POSTGRES_PASSWORD', getEnv('PGPASSWORD', db.password)), database, }; } @@ -36,7 +36,7 @@ async function main() { const schemaFile = path.join(scriptsDir, '010_heartbeat_schema.sql'); const partitionFile = path.join(scriptsDir, '020_partitioning_auto_daily.sql'); - const targetDb = getEnv('PGTARGETDB', config.db.database); + const targetDb = getEnv('POSTGRES_DATABASE', getEnv('PGTARGETDB', config.db.database)); console.log(`[db] Connecting to target db: ${targetDb}`); const targetClient = new Client(buildClientConfig(targetDb)); diff --git a/scripts/db/smokeTest.js b/scripts/db/smokeTest.js index 69d2089..2d785d3 100644 --- a/scripts/db/smokeTest.js +++ b/scripts/db/smokeTest.js @@ -15,6 +15,29 @@ async function main() { // 预创建今日分区,避免“无分区时 INSERT 直接失败” await client.query('SELECT heartbeat.ensure_partitions(current_date, current_date)'); + const ipType = await client.query( + ` + SELECT format_type(a.atttypid, a.atttypmod) AS type + FROM pg_attribute a + JOIN pg_class c ON c.oid = a.attrelid + JOIN pg_namespace n ON n.oid = c.relnamespace + WHERE n.nspname = 'heartbeat' + AND c.relname = 'heartbeat_events' + AND a.attname = 'ip' + AND a.attnum > 0 + AND NOT a.attisdropped + ` + ); + + const type = String(ipType?.rows?.[0]?.type ?? '').toLowerCase(); + if (type.startsWith('inet')) { + await client.query( + `ALTER TABLE heartbeat.heartbeat_events + ALTER COLUMN ip TYPE varchar(21) + USING ip::text` + ); + } + const ts = Date.now(); await client.query( `INSERT INTO heartbeat.heartbeat_events ( @@ -27,7 +50,7 @@ async function main() { 1, 101, 'dev-1', - '192.168.0.1', + '192.168.0.1:12345', 1, 0, 0, @@ -51,11 +74,33 @@ async function main() { ORDER BY c.relname` ); + const parent = await client.query( + ` + SELECT c.relkind AS kind + FROM pg_class c + JOIN pg_namespace n ON n.oid = c.relnamespace + WHERE n.nspname = 'heartbeat' + AND c.relname = 'heartbeat_events' + ` + ); + + const parentIndexes = await client.query( + ` + SELECT indexname + FROM pg_indexes + WHERE schemaname = 'heartbeat' + AND tablename = 'heartbeat_events' + ORDER BY indexname + ` + ); + const cnt = await client.query( 'SELECT count(*)::int AS n FROM heartbeat.heartbeat_events' ); + console.log('parentKind:', parent.rows?.[0]?.kind); console.log('partitions:', partitions.rows.map((r) => r.partition)); + console.log('parentIndexes:', parentIndexes.rows.map((r) => r.indexname)); console.log('rows:', cnt.rows[0].n); await client.end(); diff --git a/scripts/kafka/decodeMessage.js b/scripts/kafka/decodeMessage.js new file mode 100644 index 0000000..f4604cf --- /dev/null +++ b/scripts/kafka/decodeMessage.js @@ -0,0 +1,66 @@ +import fs from 'node:fs'; +import path from 'node:path'; +import process from 'node:process'; + +import { HeartbeatProcessor } from '../../src/processor/heartbeatProcessor.js'; + +function usageAndExit(code = 1) { + console.log(`\n用法:\n node scripts/kafka/decodeMessage.js --base64 \n node scripts/kafka/decodeMessage.js --hex \n node scripts/kafka/decodeMessage.js --file [--encoding base64|hex|raw]\n\n说明:\n- 用于验证 Kafka message.value 的反向解码结果(对端为 JSON + UTF-8 bytes)\n- 会尝试:UTF-8 JSON / base64 -> (gzip|deflate|raw deflate|brotli) 循环解压(兼容但对端当前未用)\n`); + process.exit(code); +} +function parseArgs(argv) { + const args = {}; + for (let i = 2; i < argv.length; i++) { + const a = argv[i]; + if (a === '--base64') args.base64 = argv[++i]; + else if (a === '--hex') args.hex = argv[++i]; + else if (a === '--file') args.file = argv[++i]; + else if (a === '--encoding') args.encoding = argv[++i]; + else if (a === '--help' || a === '-h') args.help = true; + else args._ = [...(args._ ?? []), a]; + } + return args; +} + +const args = parseArgs(process.argv); +if (args.help) usageAndExit(0); + +const processor = new HeartbeatProcessor( + { batchSize: 9999, batchTimeout: 1000 }, + { insertHeartbeatEvents: async () => {} } +); + +let buf; +if (args.base64) { + buf = Buffer.from(String(args.base64).trim(), 'base64'); +} else if (args.hex) { + buf = Buffer.from(String(args.hex).trim().replace(/\s+/g, ''), 'hex'); +} else if (args.file) { + const p = path.resolve(process.cwd(), args.file); + const raw = fs.readFileSync(p); + const enc = (args.encoding ?? 'raw').toLowerCase(); + if (enc === 'raw') buf = raw; + else if (enc === 'base64') buf = Buffer.from(raw.toString('utf8').trim(), 'base64'); + else if (enc === 'hex') buf = Buffer.from(raw.toString('utf8').trim().replace(/\s+/g, ''), 'hex'); + else { + console.error('未知 encoding:', enc); + usageAndExit(1); + } +} else { + usageAndExit(1); +} + +try { + const obj = processor.decodeToObject(buf); + const items = Array.isArray(obj) ? obj : [obj]; + + console.log('[decode] ok; items:', items.length); + console.log(JSON.stringify(obj, null, 2)); + + const normalized = items.map((x) => processor.normalizeHeartbeat(processor.unwrapPayload(x))); + const validCount = normalized.filter((x) => processor.validateData(x)).length; + console.log('[normalize] valid (required fields present):', validCount, '/', items.length); +} catch (err) { + console.error('[decode] failed:', err); + process.exitCode = 1; +} diff --git a/scripts/redis/smokeTest.js b/scripts/redis/smokeTest.js new file mode 100644 index 0000000..56244c5 --- /dev/null +++ b/scripts/redis/smokeTest.js @@ -0,0 +1,17 @@ +import config from '../../src/config/config.js'; +import { RedisIntegration } from '../../src/redis/redisIntegration.js'; + +async function main() { + const redis = new RedisIntegration(config.redis); + await redis.connect(); + + await redis.writeHeartbeat(); + await redis.info('redis smoke test: ok', { module: 'redis' }); + + await redis.disconnect(); +} + +main().catch((err) => { + console.error('redis smoke test failed:', err); + process.exit(1); +}); diff --git a/src/config/config.example.js b/src/config/config.example.js index 7985ad5..24b2004 100644 --- a/src/config/config.example.js +++ b/src/config/config.example.js @@ -1,16 +1,40 @@ // 配置文件示例 // 复制此文件为 config.js 并填写实际配置 +const env = process.env; +const envList = (v) => + String(v ?? '') + .split(',') + .map((s) => s.trim()) + .filter(Boolean); + export default { + // Redis 对接(严格按 docs/redis-integration-protocol.md) + redis: { + enabled: true, + projectName: 'Web_BLS_Heartbeat_Server', + url: 'redis://10.8.8.109:6379', + apiBaseUrl: `http://127.0.0.1:${env.PORT ?? 3001}`, + heartbeatIntervalMs: 3000, + heartbeatTtlSeconds: 30, + consoleMaxLen: null, + }, + // Kafka配置 kafka: { - brokers: ['localhost:9092'], // Kafka集群地址 - groupId: 'bls-heartbeat-consumer', // 消费者组ID - topic: 'bls-heartbeat', // 心跳消息主题 - autoCommit: true, // 自动提交偏移量 - autoCommitIntervalMs: 5000, // 自动提交间隔 - retryAttempts: 3, // 重试次数 - retryDelay: 1000 // 重试延迟 + brokers: envList(env.KAFKA_BROKERS).length ? envList(env.KAFKA_BROKERS) : ['kafka.blv-oa.com:9092'], + clientId: env.KAFKA_CLIENT_ID ?? 'bls-heartbeat', + groupId: env.KAFKA_GROUP_ID ?? 'bls-heartbeat-consumer', + topics: envList(env.KAFKA_TOPICS).length ? envList(env.KAFKA_TOPICS) : ['blwlog4Nodejs-rcu-heartbeat-topic'], + autoCommit: (env.KAFKA_AUTO_COMMIT ?? 'true') === 'true', + autoCommitIntervalMs: Number(env.KAFKA_AUTO_COMMIT_INTERVAL_MS ?? 5000), + retryAttempts: 3, + retryDelay: 1000, + saslEnabled: (env.KAFKA_SASL_ENABLED ?? 'false') === 'true', + saslMechanism: env.KAFKA_SASL_MECHANISM ?? 'plain', + saslUsername: env.KAFKA_SASL_USERNAME, + saslPassword: env.KAFKA_SASL_PASSWORD, + sslEnabled: (env.KAFKA_SSL_ENABLED ?? 'false') === 'true', }, // 处理器配置 @@ -21,13 +45,13 @@ export default { // 数据库配置 db: { - host: '10.8.8.109', // 数据库主机 - port: 5433, // 数据库端口 - user: 'log_admin', // 数据库用户名 - password: 'YourActualStrongPasswordForPostgres!', // 数据库密码 - database: 'log_platform', // 数据库名称 - maxConnections: 10, // 最大连接数 - idleTimeoutMillis: 30000, // 连接空闲超时时间 + host: env.POSTGRES_HOST ?? '10.8.8.109', + port: Number(env.POSTGRES_PORT ?? 5433), + user: env.POSTGRES_USER ?? 'log_admin', + password: env.POSTGRES_PASSWORD ?? 'YourActualStrongPasswordForPostgres!', + database: env.POSTGRES_DATABASE ?? 'log_platform', + maxConnections: Number(env.POSTGRES_MAX_CONNECTIONS ?? 6), + idleTimeoutMillis: Number(env.POSTGRES_IDLE_TIMEOUT_MS ?? 30000), retryAttempts: 3, // 重试次数 retryDelay: 1000, // 重试延迟 @@ -41,13 +65,13 @@ export default { // 日志配置 logger: { - level: 'info', // 日志级别 + level: env.LOG_LEVEL ?? 'info', format: 'json' // 日志格式 }, // 应用配置 app: { - port: 3000, // 应用端口 - env: 'development' // 运行环境 + port: Number(env.PORT ?? 3001), + env: env.NODE_ENV ?? 'development' } -}; \ No newline at end of file +}; diff --git a/src/db/databaseManager.js b/src/db/databaseManager.js index 85ffe69..c2dd203 100644 --- a/src/db/databaseManager.js +++ b/src/db/databaseManager.js @@ -83,9 +83,9 @@ class DatabaseManager { ts_ms bigint NOT NULL, hotel_id int2 NOT NULL, - room_id int4 NOT NULL, + room_id varchar(50) NOT NULL, device_id varchar(64) NOT NULL, - ip inet NOT NULL, + ip varchar(21) NOT NULL, power_state int2 NOT NULL, guest_type int2 NOT NULL, cardless_state int2 NOT NULL, @@ -93,7 +93,7 @@ class DatabaseManager { pms_state int2 NOT NULL, carbon_state int2 NOT NULL, device_count int2 NOT NULL, - comm_seq int2 NOT NULL, + comm_seq int4 NOT NULL, extra jsonb, @@ -101,14 +101,14 @@ class DatabaseManager { CONSTRAINT chk_ts_ms_positive CHECK (ts_ms > 0), CONSTRAINT chk_hotel_id_range CHECK (hotel_id >= 0 AND hotel_id <= 32767), - CONSTRAINT chk_room_id_range CHECK (room_id >= 0), + CONSTRAINT chk_room_id_len CHECK (char_length(room_id) > 0 AND char_length(room_id) <= 50), CONSTRAINT chk_power_state_range CHECK (power_state >= 0 AND power_state <= 32767), CONSTRAINT chk_guest_type_range CHECK (guest_type >= 0 AND guest_type <= 32767), CONSTRAINT chk_cardless_state_range CHECK (cardless_state >= 0 AND cardless_state <= 32767), CONSTRAINT chk_pms_state_range CHECK (pms_state >= 0 AND pms_state <= 32767), CONSTRAINT chk_carbon_state_range CHECK (carbon_state >= 0 AND carbon_state <= 32767), CONSTRAINT chk_device_count_range CHECK (device_count >= 0 AND device_count <= 32767), - CONSTRAINT chk_comm_seq_range CHECK (comm_seq >= 0 AND comm_seq <= 32767) + CONSTRAINT chk_comm_seq_range CHECK (comm_seq >= 0) ) PARTITION BY RANGE (ts_ms); @@ -194,6 +194,8 @@ class DatabaseManager { await this.pool.query(legacyTableQuery); await this.pool.query(v2SchemaQuery); + await this.ensureIpColumnVarchar(); + await this.ensureRoomIdColumnVarchar(); console.log('数据库表初始化成功'); } catch (error) { console.error('数据库表初始化失败:', error); @@ -201,6 +203,116 @@ class DatabaseManager { } } + async ensureRoomIdColumnVarchar() { + const res = await this.pool.query( + ` + SELECT format_type(a.atttypid, a.atttypmod) AS type + FROM pg_attribute a + JOIN pg_class c ON c.oid = a.attrelid + JOIN pg_namespace n ON n.oid = c.relnamespace + WHERE n.nspname = 'heartbeat' + AND c.relname = 'heartbeat_events' + AND a.attname = 'room_id' + AND a.attnum > 0 + AND NOT a.attisdropped + ` + ); + + const type = String(res?.rows?.[0]?.type ?? '').toLowerCase(); + if (!type) return; + if (type.startsWith('character varying')) return; + + await this.pool.query('ALTER TABLE heartbeat.heartbeat_events DROP CONSTRAINT IF EXISTS chk_room_id_range'); + await this.pool.query('ALTER TABLE heartbeat.heartbeat_events DROP CONSTRAINT IF EXISTS chk_room_id_len'); + + await this.pool.query( + `ALTER TABLE heartbeat.heartbeat_events + ALTER COLUMN room_id TYPE varchar(50) + USING room_id::text` + ); + + await this.pool.query( + 'ALTER TABLE heartbeat.heartbeat_events ADD CONSTRAINT chk_room_id_len CHECK (char_length(room_id) > 0 AND char_length(room_id) <= 50)' + ); + + const parts = await this.pool.query( + ` + SELECT c.relname AS partition + FROM pg_inherits i + JOIN pg_class c ON c.oid = i.inhrelid + JOIN pg_class p ON p.oid = i.inhparent + JOIN pg_namespace n ON n.oid = p.relnamespace + WHERE n.nspname = 'heartbeat' + AND p.relname = 'heartbeat_events' + ORDER BY c.relname + ` + ); + + for (const row of parts.rows ?? []) { + const name = row?.partition; + if (!name) continue; + await this.pool.query( + `ALTER TABLE heartbeat.${this.escapeIdentifier(name)} + ALTER COLUMN room_id TYPE varchar(50) + USING room_id::text` + ); + } + } + + async ensureIpColumnVarchar() { + const res = await this.pool.query( + ` + SELECT format_type(a.atttypid, a.atttypmod) AS type + FROM pg_attribute a + JOIN pg_class c ON c.oid = a.attrelid + JOIN pg_namespace n ON n.oid = c.relnamespace + WHERE n.nspname = 'heartbeat' + AND c.relname = 'heartbeat_events' + AND a.attname = 'ip' + AND a.attnum > 0 + AND NOT a.attisdropped + ` + ); + + const type = String(res?.rows?.[0]?.type ?? '').toLowerCase(); + if (!type) return; + if (type.startsWith('character varying')) return; + if (!type.startsWith('inet')) return; + + await this.pool.query( + `ALTER TABLE heartbeat.heartbeat_events + ALTER COLUMN ip TYPE varchar(21) + USING ip::text` + ); + + const parts = await this.pool.query( + ` + SELECT c.relname AS partition + FROM pg_inherits i + JOIN pg_class c ON c.oid = i.inhrelid + JOIN pg_class p ON p.oid = i.inhparent + JOIN pg_namespace n ON n.oid = p.relnamespace + WHERE n.nspname = 'heartbeat' + AND p.relname = 'heartbeat_events' + ORDER BY c.relname + ` + ); + + for (const row of parts.rows ?? []) { + const name = row?.partition; + if (!name) continue; + await this.pool.query( + `ALTER TABLE heartbeat.${this.escapeIdentifier(name)} + ALTER COLUMN ip TYPE varchar(21) + USING ip::text` + ); + } + } + + escapeIdentifier(id) { + return `"${String(id).replace(/"/g, '""')}"`; + } + getPartitionConfig() { const cfg = this.config.partitionMaintenance ?? {}; return { @@ -218,7 +330,7 @@ class DatabaseManager { const startOffset = Number(startDayOffset ?? 0); const endOffset = Number(endDayOffset ?? 0); await this.pool.query( - 'SELECT heartbeat.ensure_partitions(current_date + $1::int, current_date + $2::int)', + "SELECT heartbeat.ensure_partitions(((now() AT TIME ZONE 'Asia/Shanghai')::date) + $1::int, ((now() AT TIME ZONE 'Asia/Shanghai')::date) + $2::int)", [startOffset, endOffset] ); } @@ -245,9 +357,6 @@ class DatabaseManager { console.error('[db] 分区预创建维护失败:', err); } }, intervalMs); - - // 不阻止进程退出 - this.partitionMaintenanceTimer.unref?.(); } stopPartitionMaintenance() { @@ -289,11 +398,6 @@ class DatabaseManager { } if (events.length === 0) return; - const tsValues = events.map((e) => Number(e.ts_ms)).filter((n) => Number.isFinite(n)); - if (tsValues.length > 0) { - await this.ensurePartitionsForTsRange(Math.min(...tsValues), Math.max(...tsValues)); - } - const columns = [ 'ts_ms', 'hotel_id', @@ -338,21 +442,59 @@ class DatabaseManager { const sql = `INSERT INTO heartbeat.heartbeat_events (${columns.join(', ')}) VALUES ${placeholders}`; - try { - await this.pool.query(sql, values); - } catch (error) { - // 兜底:若仍因缺分区失败,尝试确保“当前到未来 N 天”后重试一次 - if (this.isMissingPartitionError(error)) { - console.warn('[db] 检测到缺分区写入失败,执行兜底预创建并重试一次'); - await this.ensurePartitionsForRange({ - startDayOffset: -7, - endDayOffset: this.getPartitionFutureDays(), - }); - await this.pool.query(sql, values); - return; + const runInsertOnce = async () => { + const tsValues = events.map((e) => Number(e.ts_ms)).filter((n) => Number.isFinite(n)); + if (tsValues.length > 0) { + await this.ensurePartitionsForTsRange(Math.min(...tsValues), Math.max(...tsValues)); + } + + const client = await this.pool.connect(); + try { + await client.query('BEGIN'); + const res = await client.query(sql, values); + const insertedCount = Number(res?.rowCount ?? 0); + if (insertedCount !== events.length) { + throw new Error(`insert rowCount mismatch: expect=${events.length} actual=${insertedCount}`); + } + await client.query('COMMIT'); + return { insertedCount }; + } catch (error) { + try { + await client.query('ROLLBACK'); + } catch (rollbackError) { + console.error('[db] rollback failed:', rollbackError); + } + throw error; + } finally { + client.release(); + } + }; + + const retryAttempts = Number(this.config?.retryAttempts ?? 0); + const retryDelay = Math.max(250, Number(this.config?.retryDelay ?? 1000)); + const maxAttempts = retryAttempts > 0 ? retryAttempts : 1; + + let lastError = null; + for (let attempt = 1; attempt <= maxAttempts; attempt += 1) { + try { + return await runInsertOnce(); + } catch (error) { + lastError = error; + if (this.isMissingPartitionError(error)) { + console.warn('[db] 检测到缺分区写入失败,执行兜底预创建并重试一次'); + await this.ensurePartitionsForRange({ + startDayOffset: -7, + endDayOffset: this.getPartitionFutureDays(), + }); + } + if (attempt < maxAttempts) { + await new Promise((r) => setTimeout(r, retryDelay)); + continue; + } } - throw error; } + + throw lastError; } async insertHeartbeatData(data) { @@ -381,8 +523,9 @@ class DatabaseManager { values: values.flat() }; - await this.pool.query(query); + const res = await this.pool.query(query); console.log(`成功插入 ${data.length} 条心跳数据`); + return { insertedCount: Number(res?.rowCount ?? data.length) }; } catch (error) { console.error('插入心跳数据失败:', error); throw error; @@ -430,4 +573,4 @@ class DatabaseManager { } } -export { DatabaseManager }; \ No newline at end of file +export { DatabaseManager }; diff --git a/src/index.js b/src/index.js index 7df1878..22fbea0 100644 --- a/src/index.js +++ b/src/index.js @@ -3,6 +3,7 @@ import config from './config/config.js'; import { KafkaConsumer } from './kafka/consumer.js'; import { HeartbeatProcessor } from './processor/heartbeatProcessor.js'; import { DatabaseManager } from './db/databaseManager.js'; +import { RedisIntegration } from './redis/redisIntegration.js'; class WebBLSHeartbeatServer { constructor() { @@ -10,48 +11,69 @@ class WebBLSHeartbeatServer { this.kafkaConsumer = null; this.heartbeatProcessor = null; this.databaseManager = null; + this.redis = null; + this.consumers = null; } async start() { try { + // 初始化 Redis(按协议写入心跳与控制台日志) + this.redis = new RedisIntegration(this.config.redis); + await this.redis.connect(); + this.redis.startHeartbeat(); + // 初始化数据库连接 - this.databaseManager = new DatabaseManager(this.config.db); + this.databaseManager = new DatabaseManager({ ...this.config.db, maxConnections: 1 }); await this.databaseManager.connect(); console.log('数据库连接成功'); + await this.redis?.info('数据库连接成功', { module: 'db' }); - // 初始化处理器 - this.heartbeatProcessor = new HeartbeatProcessor( - this.config.processor, - this.databaseManager - ); + // 初始化处理器(共享批处理队列) + this.heartbeatProcessor = new HeartbeatProcessor(this.config.processor, this.databaseManager); - // 初始化Kafka消费者 - this.kafkaConsumer = new KafkaConsumer( - this.config.kafka, - this.heartbeatProcessor.processMessage.bind(this.heartbeatProcessor) - ); - await this.kafkaConsumer.connect(); - await this.kafkaConsumer.subscribe(); - await this.kafkaConsumer.startConsuming(); - console.log('Kafka消费者启动成功'); + // 在单进程内启动 N 个消费者实例(与分区数匹配) + const instances = Math.max(1, Number(this.config.kafka?.consumerInstances ?? 1)); + this.consumers = []; + for (let i = 0; i < instances; i++) { + const consumer = new KafkaConsumer( + { ...this.config.kafka, consumerInstanceIndex: i }, + this.heartbeatProcessor.processMessage.bind(this.heartbeatProcessor) + ); + await consumer.connect(); + await consumer.subscribe(); + await consumer.startConsuming(); + this.consumers.push({ consumer }); + } + console.log(`Kafka消费者启动成功,共 ${instances} 个实例`); + await this.redis?.info('Kafka消费者启动成功', { module: 'kafka', topic: this.config.kafka?.topic, instances }); console.log('BLS心跳接收端启动成功'); + await this.redis?.info('BLS心跳接收端启动成功', { module: 'app' }); } catch (error) { console.error('启动失败:', error); + await this.redis?.error('启动失败', { module: 'app', error: String(error?.message ?? error) }); process.exit(1); } } async stop() { try { - if (this.kafkaConsumer) { - await this.kafkaConsumer.stopConsuming(); - await this.kafkaConsumer.disconnect(); + if (this.consumers && Array.isArray(this.consumers)) { + for (const { consumer } of this.consumers) { + await consumer.stopConsuming(); + await consumer.disconnect(); + } + this.consumers = null; } if (this.databaseManager) { await this.databaseManager.disconnect(); } + + if (this.redis) { + await this.redis.info('BLS心跳接收端已停止', { module: 'app' }); + await this.redis.disconnect(); + } console.log('BLS心跳接收端已停止'); } catch (error) { @@ -75,4 +97,4 @@ process.on('SIGTERM', () => { process.exit(0); }); -export { WebBLSHeartbeatServer }; \ No newline at end of file +export { WebBLSHeartbeatServer }; diff --git a/src/kafka/consumer.js b/src/kafka/consumer.js index 594a50b..f59c31a 100644 --- a/src/kafka/consumer.js +++ b/src/kafka/consumer.js @@ -1,44 +1,217 @@ // Kafka消费者模块 +import kafka from 'kafka-node'; + +const { ConsumerGroupStream } = kafka; class KafkaConsumer { constructor(config, messageHandler) { this.config = config; this.messageHandler = messageHandler; - this.consumer = null; + this.consumerGroupStream = null; this.isRunning = false; + + this._reconnectTimer = null; + this._reconnectAttempts = 0; + + this._inFlight = new Set(); + this._paused = false; } async connect() { - // 实现Kafka连接逻辑 + // ConsumerGroup 会在创建时建立连接 console.log('连接到Kafka集群:', this.config.brokers); - // TODO: 实现Kafka连接 } async disconnect() { - // 实现Kafka断开连接逻辑 console.log('断开与Kafka集群的连接'); - // TODO: 实现Kafka断开连接 + this._clearReconnectTimer(); + if (this.consumerGroupStream) { + await new Promise((resolve) => { + this.consumerGroupStream.close(() => resolve()); + }); + this.consumerGroupStream = null; + } } async subscribe() { - // 实现Kafka订阅逻辑 - console.log('订阅Kafka主题:', this.config.topic); - // TODO: 实现Kafka订阅 + const topics = this.getTopics(); + console.log('订阅Kafka主题:', topics.join(', ')); + + const kafkaHost = Array.isArray(this.config.brokers) + ? this.config.brokers.join(',') + : String(this.config.brokers ?? ''); + + const idx = Number(this.config?.consumerInstanceIndex ?? 0); + const memberId = `${this.config.clientId ?? 'bls-heartbeat'}-${process.pid}-${Number.isFinite(idx) ? idx : 0}`; + const options = { + kafkaHost, + id: memberId, + clientId: memberId, + groupId: this.config.groupId, + protocol: ['range', 'roundrobin'], + autoCommit: false, + autoCommitIntervalMs: this.config.autoCommitIntervalMs, + // 从最新开始(生产环境常见);需要历史消费可改 earliest + fromOffset: 'latest', + encoding: 'buffer', + keyEncoding: 'buffer', + }; + + if (this.config?.sslEnabled === true) { + options.ssl = true; + } + + if (this.config?.saslEnabled === true) { + options.sasl = { + mechanism: this.config?.saslMechanism ?? 'plain', + username: this.config?.saslUsername, + password: this.config?.saslPassword, + }; + } + + this.consumerGroupStream = new ConsumerGroupStream({ ...options, autoCommit: false }, topics); + + this.consumerGroupStream.on('error', (err) => { + console.error('[kafka] consumer error:', err); + if (this.isRunning) { + this._scheduleReconnect('consumer error'); + } + }); + + this.consumerGroupStream.on('connect', () => { + console.log('[kafka] connected'); + this._reconnectAttempts = 0; + }); } async startConsuming() { - // 实现Kafka消息消费逻辑 console.log('开始消费Kafka消息'); + if (!this.consumerGroupStream) { + throw new Error('KafkaConsumer 未 subscribe'); + } this.isRunning = true; - // TODO: 实现Kafka消息消费 + + // 若已挂了重连定时器,说明上一轮失败,先清掉 + this._clearReconnectTimer(); + + this.consumerGroupStream.on('data', (message) => { + if (!this.isRunning) return; + const p = this._handleMessage(message); + this._inFlight.add(p); + this._updateBackpressure(); + p.finally(() => { + this._inFlight.delete(p); + this._updateBackpressure(); + }); + }); } async stopConsuming() { - // 实现停止Kafka消息消费逻辑 console.log('停止消费Kafka消息'); this.isRunning = false; - // TODO: 实现停止Kafka消息消费 + this._clearReconnectTimer(); + await Promise.allSettled(Array.from(this._inFlight)); + } + + getTopics() { + const topics = this.config?.topics; + if (Array.isArray(topics) && topics.length) { + return topics.map((t) => String(t)).filter(Boolean); + } + const topic = this.config?.topic; + return [String(topic ?? '')].filter(Boolean); + } + + _getRetryConfig() { + const attempts = Number(this.config?.retryAttempts ?? 0); + const delayMs = Number(this.config?.retryDelay ?? 1000); + return { + // attempts <= 0 表示无限重试 + attempts: Number.isFinite(attempts) ? attempts : 0, + delayMs: Number.isFinite(delayMs) && delayMs > 0 ? delayMs : 1000, + }; + } + + _clearReconnectTimer() { + if (this._reconnectTimer) { + clearTimeout(this._reconnectTimer); + this._reconnectTimer = null; + } + } + + async _handleMessage(message) { + try { + await this.messageHandler(message); + await this._commitMessage(message); + } catch (err) { + console.error('[kafka] messageHandler failed:', err); + } + } + + async _commitMessage(message) { + if (!this.consumerGroupStream) return; + await new Promise((resolve, reject) => { + this.consumerGroupStream.commit(message, false, (err) => { + if (err) return reject(err); + resolve(); + }); + }).catch((err) => { + console.error('[kafka] commit failed:', err); + }); + } + + _updateBackpressure() { + if (!this.consumerGroupStream) return; + const max = Number(this.config?.maxInFlightMessages ?? 0); + if (!Number.isFinite(max) || max <= 0) return; + + const shouldPause = this._inFlight.size >= max; + if (shouldPause && !this._paused) { + this.consumerGroupStream.pause(); + this._paused = true; + console.warn(`[kafka] paused: inFlight=${this._inFlight.size} max=${max}`); + return; + } + + if (!shouldPause && this._paused) { + this.consumerGroupStream.resume(); + this._paused = false; + console.warn(`[kafka] resumed: inFlight=${this._inFlight.size} max=${max}`); + } + } + + _scheduleReconnect(reason) { + this._clearReconnectTimer(); + + const { attempts, delayMs } = this._getRetryConfig(); + this._reconnectAttempts += 1; + + if (attempts > 0 && this._reconnectAttempts > attempts) { + console.error(`[kafka] reached max reconnect attempts (${attempts}); stop reconnecting`); + return; + } + + const wait = Math.min(delayMs * this._reconnectAttempts, 30_000); + console.warn(`[kafka] scheduling reconnect in ${wait}ms (attempt ${this._reconnectAttempts}) reason=${reason}`); + + // 不 unref:Kafka 不可用时也要保持进程存活并持续重连 + this._reconnectTimer = setTimeout(async () => { + if (!this.isRunning) return; + try { + await this.disconnect(); + } catch (err) { + console.error('[kafka] disconnect during reconnect failed:', err); + } + + try { + await this.subscribe(); + await this.startConsuming(); + } catch (err) { + console.error('[kafka] reconnect failed:', err); + this._scheduleReconnect('reconnect failed'); + } + }, wait); } } -export { KafkaConsumer }; \ No newline at end of file +export { KafkaConsumer }; diff --git a/src/processor/heartbeatProcessor.js b/src/processor/heartbeatProcessor.js index c3c03fe..c68a35e 100644 --- a/src/processor/heartbeatProcessor.js +++ b/src/processor/heartbeatProcessor.js @@ -1,90 +1,462 @@ // 心跳处理器模块 +import { brotliDecompressSync, gunzipSync, inflateRawSync, inflateSync } from 'node:zlib'; class HeartbeatProcessor { constructor(config, databaseManager) { this.config = config; this.databaseManager = databaseManager; this.batchQueue = []; + this.batchMessageQueue = []; this.batchTimer = null; + this._batchInFlight = false; } async processMessage(message) { - try { - // 解包心跳消息 - const unpackedData = this.unpackMessage(message); - + const deferred = this.createDeferred(); + + // 解包心跳消息 + const unpackedData = this.unpackMessage(message); + + // 支持批量上报:message.value 可能是 JSON 数组 + const items = Array.isArray(unpackedData) ? unpackedData : [unpackedData]; + + let addedCount = 0; + for (const item of items) { + const effective = this.unwrapPayload(item); + // 验证心跳数据 - const isValid = this.validateData(unpackedData); + const isValid = this.validateData(effective); if (!isValid) { - console.error('无效的心跳数据:', unpackedData); - return; + console.error('无效的心跳数据:', effective); + continue; } - + // 转换数据格式 - const transformedData = this.transformData(unpackedData); - + const transformedData = this.transformData(effective); + // 添加到批量队列 this.batchQueue.push(transformedData); - - // 检查是否需要立即处理 - if (this.batchQueue.length >= this.config.batchSize) { - await this.processBatch(); - } else if (!this.batchTimer) { - // 设置批量处理定时器 - this.batchTimer = setTimeout( - () => this.processBatch(), - this.config.batchTimeout - ); - } - } catch (error) { - console.error('处理消息失败:', error); + addedCount += 1; } + + if (addedCount === 0) { + deferred.resolve({ insertedCount: 0 }); + return deferred.promise; + } + + this.batchMessageQueue.push({ deferred, eventCount: addedCount, message }); + + if (this.shouldFlushNow()) { + this.processBatch(); + return deferred.promise; + } + + if (!this.batchTimer) { + this.batchTimer = setTimeout(() => this.processBatch(), this.config.batchTimeout); + } + + return deferred.promise; } unpackMessage(message) { - // 实现心跳消息解包逻辑 - console.log('解包心跳消息:', message); - // TODO: 实现消息解包 - return {}; + // kafka-node message: { value: Buffer|string, key: Buffer|string, ... } + const raw = message?.value; + const obj = this.decodeToObject(raw); + return obj; + } + + unwrapPayload(obj) { + if (!obj || typeof obj !== 'object') return obj; + // 常见的包装结构:{ data: {...} } / { payload: {...} } / { body: {...} } + const candidates = ['data', 'payload', 'body', 'message']; + for (const k of candidates) { + const v = obj[k]; + if (v && typeof v === 'object') return v; + } + return obj; } validateData(data) { - // 实现心跳数据验证逻辑 - console.log('验证心跳数据:', data); - // TODO: 实现数据验证 + if (!data || typeof data !== 'object') return false; + + // v2 必填字段校验(宽松:允许上游使用 camelCase/PascalCase) + const normalized = this.normalizeHeartbeat(data); + const required = [ + 'ts_ms', + 'hotel_id', + 'room_id', + 'device_id', + 'ip', + 'power_state', + 'guest_type', + 'cardless_state', + 'service_mask', + 'pms_state', + 'carbon_state', + 'device_count', + 'comm_seq', + ]; + + for (const k of required) { + if (normalized[k] === undefined || normalized[k] === null) { + return false; + } + } + + const isDigits = (v) => typeof v === 'string' && /^-?\d+$/.test(v); + const isFiniteNumber = (v) => typeof v === 'number' && Number.isFinite(v); + + if (!isFiniteNumber(normalized.hotel_id)) return false; + if (!isFiniteNumber(normalized.power_state)) return false; + if (!isFiniteNumber(normalized.guest_type)) return false; + if (!isFiniteNumber(normalized.cardless_state)) return false; + if (!isFiniteNumber(normalized.pms_state)) return false; + if (!isFiniteNumber(normalized.carbon_state)) return false; + if (!isFiniteNumber(normalized.device_count)) return false; + if (!isFiniteNumber(normalized.comm_seq)) return false; + + if (!(isFiniteNumber(normalized.ts_ms) || isDigits(normalized.ts_ms))) return false; + if (!(isFiniteNumber(normalized.service_mask) || isDigits(normalized.service_mask))) return false; + + if (typeof normalized.device_id !== 'string' || normalized.device_id.length === 0) return false; + if (typeof normalized.room_id !== 'string' || normalized.room_id.length === 0 || normalized.room_id.length > 50) { + return false; + } + if (typeof normalized.ip !== 'string' || normalized.ip.length === 0) return false; + return true; } transformData(data) { - // 实现心跳数据转换逻辑 - console.log('转换心跳数据:', data); - // TODO: 实现数据转换 - return data; + return this.normalizeHeartbeat(data); } async processBatch() { - if (this.batchQueue.length === 0) { - return; - } - - // 清除定时器 + if (this._batchInFlight) return; + if (this.batchQueue.length === 0) return; + if (this.batchMessageQueue.length === 0) return; + if (this.batchTimer) { clearTimeout(this.batchTimer); this.batchTimer = null; } - + + this._batchInFlight = true; + let hasMore = false; + try { - // 获取当前批次数据 - const batchData = [...this.batchQueue]; - this.batchQueue = []; - - // 写入数据库 - await this.databaseManager.insertHeartbeatData(batchData); + const { batchEventCount, batchMessageCount } = this.computeNextBatchWindow(); + const batchData = this.batchQueue.slice(0, batchEventCount); + const batchMessages = this.batchMessageQueue.slice(0, batchMessageCount); + + let insertedCount = 0; + if (typeof this.databaseManager.insertHeartbeatEvents === 'function') { + const result = await this.databaseManager.insertHeartbeatEvents(batchData); + insertedCount = Number(result?.insertedCount ?? result ?? 0); + } else { + const result = await this.databaseManager.insertHeartbeatData(batchData); + insertedCount = Number(result?.insertedCount ?? result ?? 0); + } + + if (insertedCount !== batchData.length) { + throw new Error(`落库结果校验失败:expect=${batchData.length} actual=${insertedCount}`); + } + + this.batchQueue.splice(0, batchEventCount); + this.batchMessageQueue.splice(0, batchMessageCount); + + for (const entry of batchMessages) { + entry.deferred.resolve({ insertedCount: entry.eventCount }); + } + console.log(`成功处理批次数据,共 ${batchData.length} 条`); + hasMore = this.batchQueue.length > 0; } catch (error) { console.error('批量处理失败:', error); + if (!this.batchTimer) { + const retryDelay = Math.max(250, Number(this.config.batchTimeout ?? 1000)); + this.batchTimer = setTimeout(() => this.processBatch(), retryDelay); + } + } finally { + this._batchInFlight = false; + if (hasMore && this.shouldFlushNow()) { + setImmediate(() => this.processBatch()); + } else if (!this.batchTimer && this.batchQueue.length > 0) { + this.batchTimer = setTimeout(() => this.processBatch(), this.config.batchTimeout); + } } } + + shouldFlushNow() { + const max = Math.max(1, Number(this.config.batchSize ?? 1)); + return this.batchQueue.length >= max; + } + + computeNextBatchWindow() { + const maxEvents = Math.max(1, Number(this.config.batchSize ?? 1)); + let batchEventCount = 0; + let batchMessageCount = 0; + + for (const entry of this.batchMessageQueue) { + const cnt = Math.max(0, Number(entry?.eventCount ?? 0)); + if (batchMessageCount === 0 && cnt > maxEvents) { + batchEventCount = cnt; + batchMessageCount = 1; + break; + } + if (batchEventCount + cnt > maxEvents) break; + batchEventCount += cnt; + batchMessageCount += 1; + } + + if (batchMessageCount === 0) { + const first = this.batchMessageQueue[0]; + batchEventCount = Math.max(0, Number(first?.eventCount ?? 0)); + batchMessageCount = 1; + } + + return { batchEventCount, batchMessageCount }; + } + + createDeferred() { + let resolve = null; + let reject = null; + const promise = new Promise((res, rej) => { + resolve = res; + reject = rej; + }); + return { promise, resolve, reject }; + } + + decodeToObject(input) { + let buf = this.toBuffer(input); + + // 最多尝试 3 轮(兼容“2层压缩”+base64) + for (let i = 0; i < 3; i++) { + // 1) 直接当 UTF-8 文本 + const text = this.tryDecodeUtf8(buf); + if (text) { + const trimmed = text.trim(); + if (trimmed.startsWith('{') || trimmed.startsWith('[')) { + return JSON.parse(trimmed); + } + + // 2) base64(有些上游会把二进制压缩结果再 base64) + if (this.looksLikeBase64(trimmed)) { + try { + buf = Buffer.from(trimmed, 'base64'); + continue; + } catch { + // ignore + } + } + } + + // 3) gzip / deflate + const decompressed = this.tryDecompress(buf); + if (decompressed) { + buf = decompressed; + continue; + } + + break; + } + + // 最后再试一次 JSON.parse + const finalText = this.tryDecodeUtf8(buf); + if (finalText) { + return JSON.parse(finalText); + } + + throw new Error('无法解码 Kafka message.value 为 JSON 对象'); + } + + toBuffer(input) { + if (Buffer.isBuffer(input)) return input; + if (input === undefined || input === null) return Buffer.from(''); + if (typeof input === 'string') return Buffer.from(input, 'utf8'); + // kafka-node 在 encoding=buffer 时通常给 Buffer;兜底 + return Buffer.from(String(input), 'utf8'); + } + + tryDecodeUtf8(buf) { + if (!buf || buf.length === 0) return null; + try { + const text = buf.toString('utf8'); + let nonPrintableCount = 0; + for (let i = 0; i < text.length; i++) { + const code = text.charCodeAt(i); + const allowed = + code === 0x09 || + code === 0x0a || + code === 0x0d || + (code >= 0x20 && code <= 0x7e) || + (code >= 0x4e00 && code <= 0x9fff); + if (!allowed) { + nonPrintableCount++; + } + } + if (nonPrintableCount > Math.max(8, text.length * 0.2)) { + return null; + } + return text; + } catch { + return null; + } + } + + looksLikeBase64(str) { + if (!str || str.length < 16) return false; + const s = str.replace(/\s+/g, ''); + if (s.length % 4 !== 0) return false; + return /^[A-Za-z0-9+/=]+$/.test(s); + } + + tryDecompress(buf) { + if (!buf || buf.length < 2) return null; + + // gzip magic: 1f 8b + if (buf[0] === 0x1f && buf[1] === 0x8b) { + try { + return gunzipSync(buf); + } catch { + return null; + } + } + + // zlib/deflate 常见头:0x78 0x01/0x9c/0xda + if (buf[0] === 0x78) { + try { + return inflateSync(buf); + } catch { + return null; + } + } + + // .NET DeflateStream 常见为 raw deflate(无 zlib 头) + try { + const out = inflateRawSync(buf); + // 粗略判定:解出来应当是可读文本或至少有长度 + if (out && out.length > 0) return out; + } catch { + // ignore + } + + // brotli(若上游用 br 压缩) + try { + const out = brotliDecompressSync(buf); + if (out && out.length > 0) return out; + } catch { + // ignore + } + + return null; + } + + normalizeHeartbeat(obj) { + // 支持 snake_case / camelCase / PascalCase + const pick = (keys) => { + for (const k of keys) { + if (obj[k] !== undefined && obj[k] !== null) return obj[k]; + } + return undefined; + }; + + const normalized = { + ts_ms: pick(['ts_ms', 'tsMs', 'TsMs', 'timestampMs', 'TimestampMs', 'timestamp', 'Timestamp', 'ts', 'Ts']), + hotel_id: pick(['hotel_id', 'hotelId', 'HotelId']), + room_id: pick(['room_id', 'roomId', 'RoomId']), + device_id: pick(['device_id', 'deviceId', 'DeviceId', 'device', 'Device']), + ip: pick(['ip', 'Ip', 'IP']), + power_state: pick(['power_state', 'powerState', 'PowerState']), + guest_type: pick(['guest_type', 'guestType', 'GuestType']), + cardless_state: pick(['cardless_state', 'cardlessState', 'CardlessState']), + service_mask: pick(['service_mask', 'serviceMask', 'ServiceMask']), + pms_state: pick(['pms_state', 'pmsState', 'PmsState']), + carbon_state: pick(['carbon_state', 'carbonState', 'CarbonState']), + device_count: pick(['device_count', 'deviceCount', 'DeviceCount']), + comm_seq: pick(['comm_seq', 'commSeq', 'CommSeq']), + extra: pick(['extra', 'Extra']), + }; + + const toTrimmedStringOrUndefined = (v) => { + if (v === undefined || v === null) return v; + const s = String(v).trim(); + return s.length === 0 ? undefined : s; + }; + + const toIntOrUndefined = (v) => { + if (v === undefined || v === null) return v; + if (typeof v === 'number') { + if (!Number.isFinite(v)) return undefined; + return Math.trunc(v); + } + const s = String(v).trim(); + if (s.length === 0) return undefined; + if (!/^-?\d+$/.test(s)) return undefined; + const n = Number(s); + if (!Number.isFinite(n)) return undefined; + return Math.trunc(n); + }; + + const toBigintParamOrUndefined = (v) => { + if (v === undefined || v === null) return v; + if (typeof v === 'number') { + if (!Number.isFinite(v)) return undefined; + const n = Math.trunc(v); + return Number.isSafeInteger(n) ? n : String(n); + } + const s = String(v).trim(); + if (s.length === 0) return undefined; + if (!/^-?\d+$/.test(s)) return undefined; + return s; + }; + + normalized.ts_ms = toBigintParamOrUndefined(normalized.ts_ms); + normalized.hotel_id = toIntOrUndefined(normalized.hotel_id); + normalized.room_id = toTrimmedStringOrUndefined(normalized.room_id); + normalized.device_id = toTrimmedStringOrUndefined(normalized.device_id); + normalized.ip = toTrimmedStringOrUndefined(normalized.ip); + normalized.power_state = toIntOrUndefined(normalized.power_state); + normalized.guest_type = toIntOrUndefined(normalized.guest_type); + normalized.cardless_state = toIntOrUndefined(normalized.cardless_state); + normalized.service_mask = toBigintParamOrUndefined(normalized.service_mask); + normalized.pms_state = toIntOrUndefined(normalized.pms_state); + normalized.carbon_state = toIntOrUndefined(normalized.carbon_state); + normalized.device_count = toIntOrUndefined(normalized.device_count); + normalized.comm_seq = toIntOrUndefined(normalized.comm_seq); + + // 其余未知字段塞进 extra(避免丢信息),但不覆盖显式 extra + if (!normalized.extra || typeof normalized.extra !== 'object') { + normalized.extra = {}; + } + + for (const [k, v] of Object.entries(obj)) { + if ( + [ + 'ts_ms','tsMs','TsMs','timestampMs','TimestampMs','timestamp','Timestamp','ts','Ts', + 'hotel_id','hotelId','HotelId', + 'room_id','roomId','RoomId', + 'device_id','deviceId','DeviceId','device','Device', + 'ip','Ip','IP', + 'power_state','powerState','PowerState', + 'guest_type','guestType','GuestType', + 'cardless_state','cardlessState','CardlessState', + 'service_mask','serviceMask','ServiceMask', + 'pms_state','pmsState','PmsState', + 'carbon_state','carbonState','CarbonState', + 'device_count','deviceCount','DeviceCount', + 'comm_seq','commSeq','CommSeq', + 'extra','Extra' + ].includes(k) + ) { + continue; + } + normalized.extra[k] = v; + } + + return normalized; + } } -export { HeartbeatProcessor }; \ No newline at end of file +export { HeartbeatProcessor }; diff --git a/src/redis/redisIntegration.js b/src/redis/redisIntegration.js new file mode 100644 index 0000000..7dada3a --- /dev/null +++ b/src/redis/redisIntegration.js @@ -0,0 +1,263 @@ +import { createClient } from 'redis'; + +class RedisIntegration { + constructor(config) { + this.config = config; + this.client = null; + this.heartbeatTimer = null; + + this._connectPromise = null; + this._lastErrorLogAt = 0; + } + + isEnabled() { + return Boolean(this.config?.enabled); + } + + getProjectName() { + const projectName = this.config?.projectName; + if (!projectName || typeof projectName !== 'string') { + throw new Error('Redis projectName 未配置'); + } + return projectName; + } + + getHeartbeatKey() { + return `${this.getProjectName()}_项目心跳`; + } + + getConsoleKey() { + return `${this.getProjectName()}_项目控制台`; + } + + getApiBaseUrl() { + const apiBaseUrl = this.config?.apiBaseUrl; + if (!apiBaseUrl || typeof apiBaseUrl !== 'string') { + throw new Error('Redis apiBaseUrl 未配置'); + } + return apiBaseUrl; + } + + getHeartbeatIntervalMs() { + const ms = Number(this.config?.heartbeatIntervalMs ?? 3000); + return Number.isFinite(ms) && ms > 0 ? ms : 3000; + } + + getHeartbeatTtlSeconds() { + const ttl = this.config?.heartbeatTtlSeconds; + if (ttl === undefined || ttl === null) return null; + const n = Number(ttl); + return Number.isFinite(n) && n > 0 ? Math.floor(n) : null; + } + + getConsoleMaxLen() { + const v = this.config?.consoleMaxLen; + if (v === undefined || v === null) return null; + const n = Number(v); + return Number.isFinite(n) && n > 0 ? Math.floor(n) : null; + } + + async connect() { + if (!this.isEnabled()) { + console.log('[redis] disabled'); + return; + } + + if (this.client) { + // 已创建 client,则后台保证连接 + this.ensureConnectedInBackground(); + return; + } + + const url = this.config?.url; + const host = this.config?.host; + const port = this.config?.port; + const password = this.config?.password; + const database = this.config?.db; + const connectTimeout = this.config?.connectTimeoutMs; + const socket = this.config?.socket; + + const reconnectStrategy = + socket?.reconnectStrategy ?? + ((retries) => Math.min(1000 + retries * 500, 10_000)); + + const clientOptions = {}; + if (url) { + clientOptions.url = url; + } + + clientOptions.socket = + socket ?? + { + host, + port, + connectTimeout, + reconnectStrategy, + }; + + if (typeof password === 'string' && password.length > 0) { + clientOptions.password = password; + } + + if (database !== undefined && database !== null) { + const n = Number(database); + if (Number.isFinite(n) && n >= 0) { + clientOptions.database = Math.floor(n); + } + } + + this.client = createClient(clientOptions); + + this.client.on('error', (err) => { + const now = Date.now(); + // 节流:最多每 10 秒打一条,避免长期无人值守刷屏 + if (now - this._lastErrorLogAt > 10_000) { + this._lastErrorLogAt = now; + console.error('[redis] client error:', err); + } + }); + + // 不要 await:避免 Redis 短暂不可用导致主服务启动失败 + this.ensureConnectedInBackground(); + } + + ensureConnectedInBackground() { + if (!this.isEnabled()) return; + if (!this.client) return; + + if (this.client.isReady) { + return; + } + + if (this._connectPromise) { + return; + } + + this._connectPromise = this.client + .connect() + .then(() => { + console.log('[redis] connected'); + }) + .catch((err) => { + // connect 失败不抛出到上层;依赖 redis 内建重连策略或下次调用再触发 + const now = Date.now(); + if (now - this._lastErrorLogAt > 10_000) { + this._lastErrorLogAt = now; + console.error('[redis] connect failed:', err); + } + }) + .finally(() => { + this._connectPromise = null; + }); + } + + async disconnect() { + this.stopHeartbeat(); + + if (!this.client) { + return; + } + + try { + await this.client.quit(); + } finally { + this.client = null; + this._connectPromise = null; + console.log('[redis] disconnected'); + } + } + + async writeHeartbeat() { + if (!this.isEnabled()) return; + if (!this.client || !this.client.isReady) return; + + const payload = { + apiBaseUrl: this.getApiBaseUrl(), + lastActiveAt: Date.now(), + }; + + const key = this.getHeartbeatKey(); + const value = JSON.stringify(payload); + + const ttl = this.getHeartbeatTtlSeconds(); + if (ttl) { + await this.client.set(key, value, { EX: ttl }); + } else { + await this.client.set(key, value); + } + } + + startHeartbeat() { + if (!this.isEnabled()) return; + if (this.heartbeatTimer) return; + + const intervalMs = this.getHeartbeatIntervalMs(); + + // 立即写一次,随后按间隔写 + this.writeHeartbeat().catch((err) => { + console.error('[redis] writeHeartbeat failed:', err); + }); + + this.heartbeatTimer = setInterval(() => { + this.writeHeartbeat().catch((err) => { + console.error('[redis] writeHeartbeat failed:', err); + }); + }, intervalMs); + + console.log(`[redis] heartbeat started: every ${intervalMs}ms`); + } + + stopHeartbeat() { + if (this.heartbeatTimer) { + clearInterval(this.heartbeatTimer); + this.heartbeatTimer = null; + } + } + + async pushConsoleLog({ level, message, metadata }) { + if (!this.isEnabled()) return; + if (!this.client || !this.client.isReady) return; + + const entry = { + timestamp: new Date().toISOString(), + level, + message, + metadata: metadata ?? undefined, + }; + + const value = JSON.stringify(entry); + + // 建议 < 64KB;超出则丢弃避免 Redis 阻塞/异常 + if (Buffer.byteLength(value, 'utf8') > 64 * 1024) { + console.warn('[redis] console log too large; dropped'); + return; + } + + const key = this.getConsoleKey(); + await this.client.rPush(key, value); + + const maxLen = this.getConsoleMaxLen(); + if (maxLen) { + // 保留最新 maxLen 条 + await this.client.lTrim(key, -maxLen, -1); + } + } + + info(message, metadata) { + return this.pushConsoleLog({ level: 'info', message, metadata }); + } + + warn(message, metadata) { + return this.pushConsoleLog({ level: 'warn', message, metadata }); + } + + error(message, metadata) { + return this.pushConsoleLog({ level: 'error', message, metadata }); + } + + debug(message, metadata) { + return this.pushConsoleLog({ level: 'debug', message, metadata }); + } +} + +export { RedisIntegration }; diff --git a/test/smoke.test.js b/test/smoke.test.js new file mode 100644 index 0000000..5566b20 --- /dev/null +++ b/test/smoke.test.js @@ -0,0 +1,27 @@ +import assert from 'node:assert/strict'; +import { HeartbeatProcessor } from '../src/processor/heartbeatProcessor.js'; + +describe('HeartbeatProcessor smoke', () => { + it('decodes JSON buffer into object', () => { + const processor = new HeartbeatProcessor( + { batchSize: 100, batchTimeout: 1000 }, + { insertHeartbeatEvents: async () => {} } + ); + + const payload = { ts_ms: 1700000000123, hotel_id: 1, room_id: 2, device_id: 'd', ip: '127.0.0.1', power_state: 1, guest_type: 0, cardless_state: 0, service_mask: 1, pms_state: 1, carbon_state: 0, device_count: 1, comm_seq: 1 }; + const message = { value: Buffer.from(JSON.stringify(payload), 'utf8') }; + const decoded = processor.unpackMessage(message); + assert.equal(decoded.hotel_id, 1); + }); + + it('accepts camelCase fields via normalizeHeartbeat', () => { + const processor = new HeartbeatProcessor( + { batchSize: 100, batchTimeout: 1000 }, + { insertHeartbeatEvents: async () => {} } + ); + + const payload = { tsMs: 1700000000123, hotelId: 1, roomId: 2, deviceId: 'd', ip: '127.0.0.1', powerState: 1, guestType: 0, cardlessState: 0, serviceMask: 1, pmsState: 1, carbonState: 0, deviceCount: 1, commSeq: 1 }; + assert.equal(processor.validateData(payload), true); + }); +}); + diff --git a/vite.config.js b/vite.config.js index a2f2e2c..93e24fb 100644 --- a/vite.config.js +++ b/vite.config.js @@ -10,12 +10,18 @@ export default defineConfig({ fileName: (format) => `index.${format}.js` }, rollupOptions: { - external: ['kafka-node', 'pg', 'openspec'], + external: [ + 'kafka-node', 'pg', 'redis', + // Node.js core modules + 'events', 'url', 'crypto', 'util', 'net', 'tls', 'buffer', 'path', + 'node:zlib', + // openspec is not actually used in the code, remove it + ], output: { globals: { 'kafka-node': 'KafkaNode', 'pg': 'Pg', - 'openspec': 'OpenSpec' + 'redis': 'Redis' } } }