From 455185ac5d8cfc14db7e5fe22ded042a7c809705 Mon Sep 17 00:00:00 2001 From: XuJiacheng Date: Fri, 16 Jan 2026 14:45:36 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=89=A9=E5=B1=95=E5=BF=83=E8=B7=B3?= =?UTF-8?q?=E6=B6=88=E6=81=AF=E6=94=AF=E6=8C=81=E7=94=B5=E5=8A=9B=E4=B8=8E?= =?UTF-8?q?=E7=A9=BA=E8=B0=83=E8=AE=BE=E5=A4=87=E6=95=B0=E7=BB=84=E5=AD=97?= =?UTF-8?q?=E6=AE=B5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 新增 Kafka 消息中 electricity[] 和 air_conditioner[] 数组字段支持,用于存储电力与空调设备明细数据。数据库表新增对应数组列并创建 GIN 索引优化查询性能,processor 实现数组字段校验与聚合转换逻辑。 主要变更: - Kafka 消息规范新增 electricity 和 air_conditioner 数组字段定义 - 数据库 heartbeat_events 表新增 14 个数组列并创建 4 个 GIN 索引 - processor 实现数组字段解析、校验及聚合转换逻辑 - 更新相关文档与测试用例,确保端到端功能完整 --- .../扩展心跳数据以支持电力与空调数组.md | 202 ++++++++++++++++++ docs/db-heartbeat-schema.md | 27 ++- docs/kafka-heartbeat-producer.md | 31 ++- .../update-heartbeat-arrays-v3/proposal.md | 14 ++ .../specs/db/spec.md | 11 + .../specs/kafka/spec.md | 13 ++ .../specs/processor/spec.md | 19 ++ .../update-heartbeat-arrays-v3/tasks.md | 11 + openspec/specs/db/spec.md | 15 ++ openspec/specs/kafka/spec.md | 26 +++ openspec/specs/processor/spec.md | 23 ++ scripts/db/010_heartbeat_schema.sql | 35 +++ scripts/db/smokeTest.js | 21 +- src/db/databaseManager.js | 62 ++++++ src/processor/heartbeatProcessor.js | 74 ++++++- test/arrays.test.js | 117 ++++++++++ 16 files changed, 693 insertions(+), 8 deletions(-) create mode 100644 .trae/documents/扩展心跳数据以支持电力与空调数组.md create mode 100644 openspec/changes/update-heartbeat-arrays-v3/proposal.md create mode 100644 openspec/changes/update-heartbeat-arrays-v3/specs/db/spec.md create mode 100644 openspec/changes/update-heartbeat-arrays-v3/specs/kafka/spec.md create mode 100644 openspec/changes/update-heartbeat-arrays-v3/specs/processor/spec.md create mode 100644 openspec/changes/update-heartbeat-arrays-v3/tasks.md create mode 100644 test/arrays.test.js diff --git a/.trae/documents/扩展心跳数据以支持电力与空调数组.md b/.trae/documents/扩展心跳数据以支持电力与空调数组.md new file mode 100644 index 0000000..cec941a --- /dev/null +++ b/.trae/documents/扩展心跳数据以支持电力与空调数组.md @@ -0,0 +1,202 @@ +## 目标与范围 + +* 扩展 Kafka 心跳消息以支持 electricity\[] 与 air\_conditioner\[] 两类数组字段,并保持顺序一致性 + +* 数据库 heartbeat.heartbeat\_events 新增对应的数组列,并为指定列建立“针对数组元素”的索引 + +* Processor 增强:解析/校验/转换并输出数组列;DB 写入批量适配 + +* 补充 OpenSpec 规格、文档与测试,完成端到端验证 + +## OpenSpec 变更 + +* 创建变更(changes):`update-heartbeat-arrays-v3` + +* 修改以下规格文件(新增/修改 Requirements + 场景): + + * specs/kafka/spec.md: + + * ADDED:Kafka 消息 MUST 支持 electricity\[] 与 air\_conditioner\[] + + * electricity.item 结构:{ address:string, voltage:double, ampere:double, power:double, phase:string, energy:double, sum\_energy:double } + + * air\_conditioner.item 结构:{ address:string, state:int2, model:int2, speed:int2, set\_temp:int2, now\_temp:int2, solenoid\_valve:int2 } + + * specs/processor/spec.md: + + * ADDED:数组聚合转换规则(逐项按原始顺序提取出列数组);顺序一致性;缺失/类型校验;转换到 DB 列 + + * specs/db/spec.md: + + * ADDED:表结构新增数组列;为 elec\_address(text\[]), air\_address(text\[]), state(int2\[]), model(int2\[]) 建立 GIN 索引(数组元素级查询建议 `@> ARRAY[...]`) + + * specs/redis/spec.md:无需改动(协议不变) + +* docs 更新: + + * docs/kafka-heartbeat-producer.md:补充数组字段示例与类型说明(示例以数值 ampere 为准;若上游偶有字符串,需按数值可解析策略) + + * docs/db-heartbeat-schema.md:补充新增列与索引说明、查询示例(`col @> ARRAY['addr']`、`state @> ARRAY[1]`) + +## 数据库结构调整 + +* 在父分区表 heartbeat.heartbeat\_events 添加新列(所有分区自动继承): + + * 电力类: + + * elec\_address text\[]{索引} + + * voltage double precision\[] + + * ampere double precision\[] + + * power double precision\[] + + * phase text\[] + + * energy double precision\[] + + * sum\_energy double precision\[] + + * 空调类: + + * air\_address text\[]{索引} + + * state int2\[] {索引} + + * model int2\[] {索引} + + * speed int2\[] + + * set\_temp int2\[] + + * now\_temp int2\[] + + * solenoid\_valve int2\[] + +* 索引(在父表上创建分区索引): + + * CREATE INDEX IF NOT EXISTS idx\_he\_events\_elec\_address\_gin ON heartbeat.heartbeat\_events USING GIN (elec\_address); + + * CREATE INDEX IF NOT EXISTS idx\_he\_events\_air\_address\_gin ON heartbeat.heartbeat\_events USING GIN (air\_address); + + * CREATE INDEX IF NOT EXISTS idx\_he\_events\_state\_gin ON heartbeat.heartbeat\_events USING GIN (state); + + * CREATE INDEX IF NOT EXISTS idx\_he\_events\_model\_gin ON heartbeat.heartbeat\_events USING GIN (model); + +* 约束与空值策略: + + * 新增数组列默认可为空(NULL);长度不限制;类型按上述定义 + + * 不引入新 CHECK 约束(避免上游演进时写入失败);后续可在枚举稳定后收紧 + +* 迁移实现: + + * 在 initTables 后追加“ALTER TABLE ... ADD COLUMN IF NOT EXISTS ...” + + * 创建分区索引(父表)并确认落于各分区(PostgreSQL 11+ 支持分区索引) + +## Processor 处理逻辑调整 + +* 解析: + + * 在 normalizeHeartbeat 阶段读取 obj.electricity 与 obj.air\_conditioner(兼容 camelCase/PascalCase) + +* 转换: + + * electricity\[]:按原始顺序聚合为列:elec\_address\[], voltage\[], ampere\[], power\[], phase\[], energy\[], sum\_energy\[] + + * air\_conditioner\[]:聚合为列:air\_address\[], state\[], model\[], speed\[], set\_temp\[], now\_temp\[], solenoid\_valve\[] + + * 值类型转换: + + * address/phase → string(trim 且空串视为缺失) + + * 数值列:若为 string 可解析为数字则接受;不可解析则标记为缺失(跳过该项) + + * 顺序保证:按输入数组原始顺序放入各列数组;对缺失字段以“跳过该项”或“插入 null”策略统一(建议插入 null 保持长度对齐) + +* 校验: + + * 若两类数组存在,则至少 address 有内容;其余数值/枚举字段允许包含 null + + * 不影响 v2 原有必填字段校验 + +* 输出: + + * 在 transformData 返回对象中携带新增数组列;insertHeartbeatEvents 扩展列清单并写入 + +## Kafka 消费者调整 + +* Consumer 本身不改(仍以 buffer 透传) + +## 写库逻辑调整 + +* insertHeartbeatEvents: + + * 扩展 columns 列表与 values/占位符生成;数组列直接作为参数传入(pg 支持将 JS 数组映射到 Postgres 数组) + + * 事务语义与分区预建逻辑保持不变 + +## 校验与质量保证 + +* 顺序一致性: + + * 单元测试:给定 3 项 electricity/air\_conditioner 输入,断言各列数组按顺序聚合 + +* 类型与缺失校验: + + * 单元测试:混合数值字符串("3.2")与非法字符串("N/A"),确保解析并对非法项填 null,数组长度与 address 对齐 + +* 端到端验证: + + * 伪造 Kafka message.value(UTF-8 JSON)走 Processor → DB 插入(使用测试隔离库或事务回滚) + +* 文档更新: + + * 更新 API(Producer)与 DB schema 文档;给出查询示例: + + * `SELECT * FROM heartbeat.heartbeat_events WHERE elec_address @> ARRAY['add11'];` + + * `SELECT * FROM heartbeat.heartbeat_events WHERE state @> ARRAY[1];` + +## 风险与兼容 + +* 分区索引构建时间与磁盘占用: + + * 在父表创建分区索引;按分区数指数增长,需结合实际分区策略与 autovacuum 调优 + +## 交付清单 + +* OpenSpec 变更集:`openspec/changes/update-heartbeat-arrays-v3/`(proposal、tasks、spec deltas) + +* 代码修改: + + * src/processor/heartbeatProcessor.js(normalize+transform+校验与聚合) + + * src/db/databaseManager.js(新增列 + 索引 + insert 扩展) + + * src/kafka/consumer.js(无需改动) + +* 测试: + + * test/processor.arrays.test.js(顺序与类型/缺失校验) + + * 如需:test/db.insert.arrays.test.js(占位符与 rowCount 校验) + +* 文档: + + * docs/kafka-heartbeat-producer.md(数组示例) + + * docs/db-heartbeat-schema.md(新增列与索引查询示例) + +## 实施步骤 + +1. 编写 OpenSpec 变更(ADDED Requirements + 场景)并验证 +2. 数据库父表追加列 + 创建分区索引(初始化流程集成) +3. Processor:实现解析/校验/聚合/转换;保持与现有 v2 字段兼容 +4. DB 写入:扩展 insertHeartbeatEvents 列与占位符映射 +5. 编写并运行单元测试;必要时写端到端冒烟 +6. 更新文档;准备部署说明(分区索引创建时间提示) + +请确认以上方案,确认后我将按该计划开始实施(含规格增量、代码修改、测试与文档)。 diff --git a/docs/db-heartbeat-schema.md b/docs/db-heartbeat-schema.md index 51c91b4..6f2fb0a 100644 --- a/docs/db-heartbeat-schema.md +++ b/docs/db-heartbeat-schema.md @@ -22,9 +22,9 @@ | id | bigserial | 否(自动生成) | 自增序列号(写入时可不提供) | | ts_ms | bigint | 是 | 毫秒级时间戳(epoch ms) | | hotel_id | int2 | 是 | 酒店编号 | -| room_id | int4 | 是 | 房间编号(或房间唯一标识) | +| room_id | varchar(50) | 是 | 房间编号(或房间唯一标识,按字符串存储) | | device_id | varchar(64) | 是 | 设备 ID(序列号/MAC/混合编码);如明确为纯数字可改 bigint | -| ip | inet | 是 | 设备/上报方 IP(PostgreSQL inet 类型自带格式校验) | +| ip | varchar(21) | 是 | `IP:PORT`(IPv4)或纯 IP 字符串 | | power_state | int2 | 是 | 取电状态(枚举值待标准化) | | guest_type | int2 | 是 | 住客身份(住客/空房/保洁/维修等,枚举值待标准化) | | cardless_state | int2 | 是 | 无卡取电/无卡策略状态(枚举待定) | @@ -33,6 +33,20 @@ | carbon_state | int2 | 是 | 碳控状态(枚举待定) | | device_count | int2 | 是 | 设备数量/上报设备数量(语义待确认) | | comm_seq | int4 | 是 | 通讯序号(语义待确认) | +| elec_address | text[] | 否 | 电力设备地址数组(与 voltage[] 等按下标对齐) | +| voltage | double precision[] | 否 | 电压数组 | +| ampere | double precision[] | 否 | 电流数组 | +| power | double precision[] | 否 | 功率数组 | +| phase | text[] | 否 | 相位数组 | +| energy | double precision[] | 否 | 能耗数组 | +| sum_energy | double precision[] | 否 | 总能耗数组 | +| air_address | text[] | 否 | 空调设备地址数组(与 state[] 等按下标对齐) | +| state | int2[] | 否 | 开关状态数组 | +| model | int2[] | 否 | 运行模式数组 | +| speed | int2[] | 否 | 风速设置数组 | +| set_temp | int2[] | 否 | 设定温度数组 | +| now_temp | int2[] | 否 | 当前温度数组 | +| solenoid_valve | int2[] | 否 | 电磁阀门状态数组 | | extra | jsonb | 否 | 可扩展字段:电参/空调状态/版本/来源等 | ### 2.2 约束 @@ -67,6 +81,15 @@ - B-tree:`hotel_id`, `power_state`, `guest_type`, `device_id` - BRIN:`service_mask` +新增(数组元素查询): +- GIN:`elec_address`, `air_address`, `state`, `model` + +常用查询示例: +- `SELECT * FROM heartbeat.heartbeat_events WHERE elec_address @> ARRAY['add11'];` +- `SELECT * FROM heartbeat.heartbeat_events WHERE air_address @> ARRAY['ac1'];` +- `SELECT * FROM heartbeat.heartbeat_events WHERE state @> ARRAY[1::int2];` +- `SELECT * FROM heartbeat.heartbeat_events WHERE model @> ARRAY[2::int2];` + 额外建议(脚本默认包含,可按需移除): - `btree (hotel_id, ts_ms)`:覆盖最常见过滤(酒店 + 时间范围),显著提升检索与分区内扫描效率。 diff --git a/docs/kafka-heartbeat-producer.md b/docs/kafka-heartbeat-producer.md index 2956be6..87e2ab3 100644 --- a/docs/kafka-heartbeat-producer.md +++ b/docs/kafka-heartbeat-producer.md @@ -21,7 +21,7 @@ |---|---|---|---| | ts_ms | number/int64 | 1700000000123 | 毫秒级 epoch 时间戳 | | hotel_id | number/int | 12 | 酒店编号(int2 范围内) | -| room_id | number/int | 1203 | 房间编号/房间标识(int4) | +| room_id | string/number | "1203" | 房间编号/房间标识(服务端会归一化为字符串;落库为 varchar(50)) | | device_id | string | "A1B2C3D4" | 设备唯一 ID(序列号/MAC/自定义编码) | | ip | string | "192.168.1.10:8080" | `IP:PORT` 字符串(落库为 varchar(21)) | | power_state | number/int | 1 | 取电状态(枚举值需统一标准) | @@ -37,6 +37,8 @@ | 字段 | 类型 | 示例 | 说明 | |---|---|---|---| | extra | object | {"source":"gw","ver":"1.2.3"} | 扩展字段:电参、空调状态、版本、上报来源等 | +| electricity | array | [{"address":"add11","voltage":3.2,...}] | 电力设备数组(按原始顺序拆列落库为数组列) | +| air_conditioner | array | [{"address":"ac1","state":1,...}] | 空调设备数组(按原始顺序拆列落库为数组列) | ## 4. JSON 示例 ```json @@ -54,6 +56,28 @@ "carbon_state": 0, "device_count": 1, "comm_seq": 7, + "electricity": [ + { + "address": "add11", + "voltage": 3.2, + "ampere": 1.1, + "power": 704.3, + "phase": "A", + "energy": 10.5, + "sum_energy": 100.5 + } + ], + "air_conditioner": [ + { + "address": "ac1", + "state": 1, + "model": 2, + "speed": 3, + "set_temp": 26, + "now_temp": 25, + "solenoid_valve": 1 + } + ], "extra": { "source": "gw", "ver": "1.2.3", @@ -78,4 +102,7 @@ ## 6. 与数据库字段的映射 服务端落库目标表:`heartbeat.heartbeat_events`(位于既有数据库中,默认 log_platform) - 必填字段:与表字段同名 -- 弹性字段:写入 `extra`(jsonb) +- 扩展数组字段: + - electricity[] → elec_address[]、voltage[]、ampere[]、power[]、phase[]、energy[]、sum_energy[] + - air_conditioner[] → air_address[]、state[]、model[]、speed[]、set_temp[]、now_temp[]、solenoid_valve[] +- 弹性字段:其余未知字段写入 `extra`(jsonb) diff --git a/openspec/changes/update-heartbeat-arrays-v3/proposal.md b/openspec/changes/update-heartbeat-arrays-v3/proposal.md new file mode 100644 index 0000000..0706ca5 --- /dev/null +++ b/openspec/changes/update-heartbeat-arrays-v3/proposal.md @@ -0,0 +1,14 @@ +# Change: 扩展心跳消息与分区表以支持电力/空调数组(v3) + +## Why +当前心跳数据仅落库基础字段与 extra,无法按需查询与聚合“电力设备/空调设备”的细粒度状态;需要将数组结构拆列为数据库数组列,并对指定列提供数组元素索引。 + +## What Changes +- **Kafka**:新增可选字段 `electricity[]`、`air_conditioner[]` 以及元素字段定义 +- **Processor**:新增数组字段类型校验与“按原始顺序聚合为列数组”的转换规则 +- **DB**:在 `heartbeat.heartbeat_events` 新增数组列,并为 `elec_address/air_address/state/model` 建立 GIN 索引(数组元素查询) +- **Docs/Tests**:更新 producer 与 schema 文档;补充单元测试与冒烟验证 + +## Impact +- Affected specs: `openspec/specs/kafka`, `openspec/specs/processor`, `openspec/specs/db` +- Affected code: `src/processor/heartbeatProcessor.js`, `src/db/databaseManager.js`, `scripts/db/*.sql`, `docs/*`, `test/*` diff --git a/openspec/changes/update-heartbeat-arrays-v3/specs/db/spec.md b/openspec/changes/update-heartbeat-arrays-v3/specs/db/spec.md new file mode 100644 index 0000000..12d48e3 --- /dev/null +++ b/openspec/changes/update-heartbeat-arrays-v3/specs/db/spec.md @@ -0,0 +1,11 @@ +## ADDED Requirements +### Requirement: 分区表新增数组列与数组元素索引 +系统 SHALL 在 `heartbeat.heartbeat_events` 中新增用于存储电力与空调子设备的数组列,并为指定数组列提供数组元素级查询索引。 + +#### Scenario: 新增数组列 +- **WHEN** 部署或升级数据库结构时 +- **THEN** 表应包含 elec_address、air_address、voltage、ampere、power、phase、energy、sum_energy、state、model、speed、set_temp、now_temp、solenoid_valve + +#### Scenario: 数组元素索引 +- **WHEN** 需要按 elec_address/air_address/state/model 的数组元素进行查询 +- **THEN** 数据库应具备 GIN 索引以优化包含类查询 diff --git a/openspec/changes/update-heartbeat-arrays-v3/specs/kafka/spec.md b/openspec/changes/update-heartbeat-arrays-v3/specs/kafka/spec.md new file mode 100644 index 0000000..0c9d21b --- /dev/null +++ b/openspec/changes/update-heartbeat-arrays-v3/specs/kafka/spec.md @@ -0,0 +1,13 @@ +## ADDED Requirements +### Requirement: 心跳消息扩展数组字段 +Kafka 心跳消息 SHALL 支持携带电力与空调子设备的数组字段,以便消费端拆列落库与查询优化。 + +#### Scenario: 携带 electricity 数组 +- **WHEN** 生产者需要上报电力设备明细时 +- **THEN** 消息可选包含 `electricity`(array of object) +- **AND** 每个元素包含 address、voltage、ampere、power、phase、energy、sum_energy + +#### Scenario: 携带 air_conditioner 数组 +- **WHEN** 生产者需要上报空调设备明细时 +- **THEN** 消息可选包含 `air_conditioner`(array of object) +- **AND** 每个元素包含 address、state、model、speed、set_temp、now_temp、solenoid_valve diff --git a/openspec/changes/update-heartbeat-arrays-v3/specs/processor/spec.md b/openspec/changes/update-heartbeat-arrays-v3/specs/processor/spec.md new file mode 100644 index 0000000..ea19d3f --- /dev/null +++ b/openspec/changes/update-heartbeat-arrays-v3/specs/processor/spec.md @@ -0,0 +1,19 @@ +## ADDED Requirements +### Requirement: 数组字段聚合为列数组 +系统 SHALL 将 `electricity[]` 与 `air_conditioner[]` 按原始顺序聚合为数据库写入结构的列数组。 + +#### Scenario: electricity 聚合 +- **WHEN** 输入包含 `electricity` 数组 +- **THEN** 输出应包含 elec_address[]、voltage[]、ampere[]、power[]、phase[]、energy[]、sum_energy[] +- **AND** 各数组下标与输入数组下标一一对应 + +#### Scenario: air_conditioner 聚合 +- **WHEN** 输入包含 `air_conditioner` 数组 +- **THEN** 输出应包含 air_address[]、state[]、model[]、speed[]、set_temp[]、now_temp[]、solenoid_valve[] +- **AND** 各数组下标与输入数组下标一一对应 + +#### Scenario: 类型与缺失处理 +- **WHEN** electricity 或 air_conditioner 存在但不是数组 +- **THEN** 系统应丢弃该消息并记录错误 +- **WHEN** 数组元素字段缺失或无法转换 +- **THEN** 系统应保持长度对齐并写入 null diff --git a/openspec/changes/update-heartbeat-arrays-v3/tasks.md b/openspec/changes/update-heartbeat-arrays-v3/tasks.md new file mode 100644 index 0000000..e78f685 --- /dev/null +++ b/openspec/changes/update-heartbeat-arrays-v3/tasks.md @@ -0,0 +1,11 @@ +## 1. Implementation +- [ ] 1.1 更新 OpenSpec 增量规范(Kafka/Processor/DB) +- [ ] 1.2 数据库脚本与初始化:新增数组列与索引 +- [ ] 1.3 Processor:数组字段校验与聚合转换 +- [ ] 1.4 DB 写入:扩展批量 insert 列与参数 +- [ ] 1.5 测试:新增单元测试覆盖顺序与类型/缺失处理 +- [ ] 1.6 文档:更新 producer 与 DB schema + +## 2. Validation +- [ ] 2.1 运行单元测试与 lint +- [ ] 2.2 在可用 PostgreSQL 环境执行冒烟(含数组列插入) diff --git a/openspec/specs/db/spec.md b/openspec/specs/db/spec.md index b8f4032..b5c7e9a 100644 --- a/openspec/specs/db/spec.md +++ b/openspec/specs/db/spec.md @@ -58,6 +58,21 @@ - **THEN** 系统应能够自动创建对应日分区或确保分区被预创建 - **AND** 不应影响持续写入(高吞吐场景) +### Requirement: 数组字段存储与索引 +系统 MUST 支持将电力与空调子设备数据以数组列形式存储,并为指定数组列建立针对元素查询的索引。 + +#### Scenario: 新增数组列用于存储电力与空调子设备数据 +- **WHEN** 系统初始化 v2 心跳明细表结构时 +- **THEN** 表结构应包含以下新增列: + - elec_address(text[]) + - air_address(text[]) + - voltage(double precision[])、ampere(double precision[])、power(double precision[])、phase(text[])、energy(double precision[])、sum_energy(double precision[]) + - state(int2[])、model(int2[])、speed(int2[])、set_temp(int2[])、now_temp(int2[])、solenoid_valve(int2[]) + +#### Scenario: 针对数组元素的索引 +- **WHEN** 对 elec_address、air_address、state、model 执行“元素包含”类查询时 +- **THEN** 系统应提供 GIN 索引以优化查询 + ### Requirement: 数据查询支持 系统 MUST 支持基本的数据查询操作,用于监控和调试。 diff --git a/openspec/specs/kafka/spec.md b/openspec/specs/kafka/spec.md index b77d596..1284c11 100644 --- a/openspec/specs/kafka/spec.md +++ b/openspec/specs/kafka/spec.md @@ -53,6 +53,32 @@ Kafka 心跳消息 MUST 包含数据库落库所需的必填字段,并采用 U - **THEN** 消息 value 应为 JSON(UTF-8) - **AND** 至少包含 ts_ms、hotel_id、room_id、device_id、ip、power_state、guest_type、cardless_state、service_mask、pms_state、carbon_state、device_count、comm_seq - **AND** 可选包含 extra(json object) +- **AND** 可选包含 electricity(array of object) +- **AND** 可选包含 air_conditioner(array of object) + +#### Scenario: electricity 数组字段结构 +- **WHEN** 生产者在心跳消息中携带 electricity 字段时 +- **THEN** electricity MUST 为数组 +- **AND** 数组每个元素 MUST 为对象,包含: + - address(string,设备地址) + - voltage(double,电压) + - ampere(double,电流) + - power(double,功率) + - phase(string,相位) + - energy(double,能耗) + - sum_energy(double,总能耗) + +#### Scenario: air_conditioner 数组字段结构 +- **WHEN** 生产者在心跳消息中携带 air_conditioner 字段时 +- **THEN** air_conditioner MUST 为数组 +- **AND** 数组每个元素 MUST 为对象,包含: + - address(string,设备地址) + - state(int2,开关状态) + - model(int2,运行模式) + - speed(int2,风速设置) + - set_temp(int2,设定温度) + - now_temp(int2,当前温度) + - solenoid_valve(int2,电磁阀门状态) ### Requirement: 分区键友好的 Kafka Key 系统 MUST 支持使用 `hotel_id:device_id` 作为 Kafka message key 以获得更好的分区与有序性。 diff --git a/openspec/specs/processor/spec.md b/openspec/specs/processor/spec.md index 5802f90..0583c57 100644 --- a/openspec/specs/processor/spec.md +++ b/openspec/specs/processor/spec.md @@ -55,6 +55,29 @@ - **WHEN** 心跳数据缺失必填字段时 - **THEN** 系统应判定为无效数据并丢弃 +### Requirement: 数组字段聚合转换 +系统 MUST 支持将 electricity[] 与 air_conditioner[] 的对象数组聚合为数据库的“列数组”,并保持原始顺序一致性。 + +#### Scenario: electricity 数组聚合为列数组 +- **WHEN** 心跳数据包含 electricity(对象数组)时 +- **THEN** 系统应按输入数组原始顺序提取并聚合为: + - elec_address[]、voltage[]、ampere[]、power[]、phase[]、energy[]、sum_energy[] +- **AND** 同一条心跳记录内,上述各数组下标必须与输入 electricity 的下标一一对应 + +#### Scenario: air_conditioner 数组聚合为列数组 +- **WHEN** 心跳数据包含 air_conditioner(对象数组)时 +- **THEN** 系统应按输入数组原始顺序提取并聚合为: + - air_address[]、state[]、model[]、speed[]、set_temp[]、now_temp[]、solenoid_valve[] +- **AND** 同一条心跳记录内,上述各数组下标必须与输入 air_conditioner 的下标一一对应 + +#### Scenario: 数组字段缺失与类型校验 +- **WHEN** electricity 或 air_conditioner 字段存在但类型不为数组时 +- **THEN** 系统应判定该条消息为无效并丢弃 +- **AND** 记录错误日志 +- **WHEN** 数组元素字段缺失或类型不匹配但消息其余必填字段有效时 +- **THEN** 系统应保持数组长度对齐,并对无法转换的单元写入 null +- **AND** 不应破坏其余字段的写入 + ### Requirement: 批量处理支持 系统 MUST 支持批量处理心跳数据,提高处理效率。 diff --git a/scripts/db/010_heartbeat_schema.sql b/scripts/db/010_heartbeat_schema.sql index 0a5a5c1..96aaa92 100644 --- a/scripts/db/010_heartbeat_schema.sql +++ b/scripts/db/010_heartbeat_schema.sql @@ -25,6 +25,21 @@ CREATE TABLE IF NOT EXISTS heartbeat.heartbeat_events ( device_count int2 NOT NULL, comm_seq int4 NOT NULL, + elec_address text[], + air_address text[], + voltage double precision[], + ampere double precision[], + power double precision[], + phase text[], + energy double precision[], + sum_energy double precision[], + state int2[], + model int2[], + speed int2[], + set_temp int2[], + now_temp int2[], + solenoid_valve int2[], + -- 弹性字段:电参/空调等(后续可结构化拆列;当前先放 extra) extra jsonb, @@ -44,6 +59,21 @@ CREATE TABLE IF NOT EXISTS heartbeat.heartbeat_events ( ) PARTITION BY RANGE (ts_ms); +ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS elec_address text[]; +ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS air_address text[]; +ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS voltage double precision[]; +ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS ampere double precision[]; +ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS power double precision[]; +ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS phase text[]; +ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS energy double precision[]; +ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS sum_energy double precision[]; +ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS state int2[]; +ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS model int2[]; +ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS speed int2[]; +ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS set_temp int2[]; +ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS now_temp int2[]; +ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS solenoid_valve int2[]; + -- 指定索引 CREATE INDEX IF NOT EXISTS idx_heartbeat_events_hotel_id ON heartbeat.heartbeat_events (hotel_id); CREATE INDEX IF NOT EXISTS idx_heartbeat_events_power_state ON heartbeat.heartbeat_events (power_state); @@ -58,4 +88,9 @@ CREATE INDEX IF NOT EXISTS idx_heartbeat_events_service_mask_brin ON heartbeat.h -- 若不希望额外索引,可注释掉 CREATE INDEX IF NOT EXISTS idx_heartbeat_events_hotel_ts ON heartbeat.heartbeat_events (hotel_id, ts_ms); +CREATE INDEX IF NOT EXISTS idx_heartbeat_events_elec_address_gin ON heartbeat.heartbeat_events USING GIN (elec_address); +CREATE INDEX IF NOT EXISTS idx_heartbeat_events_air_address_gin ON heartbeat.heartbeat_events USING GIN (air_address); +CREATE INDEX IF NOT EXISTS idx_heartbeat_events_state_gin ON heartbeat.heartbeat_events USING GIN (state); +CREATE INDEX IF NOT EXISTS idx_heartbeat_events_model_gin ON heartbeat.heartbeat_events USING GIN (model); + COMMIT; diff --git a/scripts/db/smokeTest.js b/scripts/db/smokeTest.js index 2d785d3..f6bf3ae 100644 --- a/scripts/db/smokeTest.js +++ b/scripts/db/smokeTest.js @@ -43,8 +43,11 @@ async function main() { `INSERT INTO heartbeat.heartbeat_events ( ts_ms, hotel_id, room_id, device_id, ip, power_state, guest_type, cardless_state, service_mask, - pms_state, carbon_state, device_count, comm_seq, extra - ) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14)`, + pms_state, carbon_state, device_count, comm_seq, + elec_address, voltage, ampere, power, phase, energy, sum_energy, + air_address, state, model, speed, set_temp, now_temp, solenoid_valve, + extra + ) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18,$19,$20,$21,$22,$23,$24,$25,$26,$27,$28)`, [ ts, 1, @@ -59,6 +62,20 @@ async function main() { 0, 1, 1, + ['add11', 'add12'], + [3.2, 3.3], + [1.1, 2.2], + [704.3, 705.3], + ['A', 'B'], + [10.5, 11.5], + [100.5, 101.5], + ['ac1'], + [1], + [2], + [3], + [26], + [25], + [1], { source: 'smoke-test' }, ] ); diff --git a/src/db/databaseManager.js b/src/db/databaseManager.js index 2168aa0..29219e8 100644 --- a/src/db/databaseManager.js +++ b/src/db/databaseManager.js @@ -95,6 +95,21 @@ class DatabaseManager { device_count int2 NOT NULL, comm_seq int4 NOT NULL, + elec_address text[], + air_address text[], + voltage double precision[], + ampere double precision[], + power double precision[], + phase text[], + energy double precision[], + sum_energy double precision[], + state int2[], + model int2[], + speed int2[], + set_temp int2[], + now_temp int2[], + solenoid_valve int2[], + extra jsonb, CONSTRAINT heartbeat_events_pk PRIMARY KEY (ts_ms, id), @@ -112,12 +127,31 @@ class DatabaseManager { ) PARTITION BY RANGE (ts_ms); + ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS elec_address text[]; + ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS air_address text[]; + ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS voltage double precision[]; + ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS ampere double precision[]; + ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS power double precision[]; + ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS phase text[]; + ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS energy double precision[]; + ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS sum_energy double precision[]; + ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS state int2[]; + ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS model int2[]; + ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS speed int2[]; + ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS set_temp int2[]; + ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS now_temp int2[]; + ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS solenoid_valve int2[]; + CREATE INDEX IF NOT EXISTS idx_heartbeat_events_hotel_id ON heartbeat.heartbeat_events (hotel_id); CREATE INDEX IF NOT EXISTS idx_heartbeat_events_power_state ON heartbeat.heartbeat_events (power_state); CREATE INDEX IF NOT EXISTS idx_heartbeat_events_guest_type ON heartbeat.heartbeat_events (guest_type); CREATE INDEX IF NOT EXISTS idx_heartbeat_events_device_id ON heartbeat.heartbeat_events (device_id); CREATE INDEX IF NOT EXISTS idx_heartbeat_events_service_mask_brin ON heartbeat.heartbeat_events USING BRIN (service_mask); CREATE INDEX IF NOT EXISTS idx_heartbeat_events_hotel_ts ON heartbeat.heartbeat_events (hotel_id, ts_ms); + CREATE INDEX IF NOT EXISTS idx_heartbeat_events_elec_address_gin ON heartbeat.heartbeat_events USING GIN (elec_address); + CREATE INDEX IF NOT EXISTS idx_heartbeat_events_air_address_gin ON heartbeat.heartbeat_events USING GIN (air_address); + CREATE INDEX IF NOT EXISTS idx_heartbeat_events_state_gin ON heartbeat.heartbeat_events USING GIN (state); + CREATE INDEX IF NOT EXISTS idx_heartbeat_events_model_gin ON heartbeat.heartbeat_events USING GIN (model); -- 分区预创建函数(按 Asia/Shanghai 自然日) CREATE OR REPLACE FUNCTION heartbeat.day_start_ms_shanghai(p_day date) @@ -366,6 +400,20 @@ class DatabaseManager { 'carbon_state', 'device_count', 'comm_seq', + 'elec_address', + 'air_address', + 'voltage', + 'ampere', + 'power', + 'phase', + 'energy', + 'sum_energy', + 'state', + 'model', + 'speed', + 'set_temp', + 'now_temp', + 'solenoid_valve', 'extra', ]; @@ -387,6 +435,20 @@ class DatabaseManager { e.carbon_state, e.device_count, e.comm_seq, + Array.isArray(e.elec_address) ? e.elec_address : null, + Array.isArray(e.air_address) ? e.air_address : null, + Array.isArray(e.voltage) ? e.voltage : null, + Array.isArray(e.ampere) ? e.ampere : null, + Array.isArray(e.power) ? e.power : null, + Array.isArray(e.phase) ? e.phase : null, + Array.isArray(e.energy) ? e.energy : null, + Array.isArray(e.sum_energy) ? e.sum_energy : null, + Array.isArray(e.state) ? e.state : null, + Array.isArray(e.model) ? e.model : null, + Array.isArray(e.speed) ? e.speed : null, + Array.isArray(e.set_temp) ? e.set_temp : null, + Array.isArray(e.now_temp) ? e.now_temp : null, + Array.isArray(e.solenoid_valve) ? e.solenoid_valve : null, e.extra ?? null ); const row = columns.map((_, colIndex) => `$${base + colIndex + 1}`).join(', '); diff --git a/src/processor/heartbeatProcessor.js b/src/processor/heartbeatProcessor.js index c68a35e..718d3ed 100644 --- a/src/processor/heartbeatProcessor.js +++ b/src/processor/heartbeatProcessor.js @@ -124,11 +124,31 @@ class HeartbeatProcessor { } if (typeof normalized.ip !== 'string' || normalized.ip.length === 0) return false; + if (normalized.electricity !== undefined && normalized.electricity !== null) { + if (!Array.isArray(normalized.electricity)) return false; + for (const item of normalized.electricity) { + if (!item || typeof item !== 'object' || Array.isArray(item)) { + return false; + } + } + } + + if (normalized.air_conditioner !== undefined && normalized.air_conditioner !== null) { + if (!Array.isArray(normalized.air_conditioner)) return false; + for (const item of normalized.air_conditioner) { + if (!item || typeof item !== 'object' || Array.isArray(item)) { + return false; + } + } + } + return true; } transformData(data) { - return this.normalizeHeartbeat(data); + const n = this.normalizeHeartbeat(data); + const ext = this.aggregateArrays(n); + return { ...n, ...ext }; } async processBatch() { @@ -377,6 +397,8 @@ class HeartbeatProcessor { device_count: pick(['device_count', 'deviceCount', 'DeviceCount']), comm_seq: pick(['comm_seq', 'commSeq', 'CommSeq']), extra: pick(['extra', 'Extra']), + electricity: pick(['electricity', 'Electricity']), + air_conditioner: pick(['air_conditioner', 'airConditioner', 'AirConditioner']), }; const toTrimmedStringOrUndefined = (v) => { @@ -447,7 +469,9 @@ class HeartbeatProcessor { 'carbon_state','carbonState','CarbonState', 'device_count','deviceCount','DeviceCount', 'comm_seq','commSeq','CommSeq', - 'extra','Extra' + 'extra','Extra', + 'electricity','Electricity', + 'air_conditioner','airConditioner','AirConditioner' ].includes(k) ) { continue; @@ -457,6 +481,52 @@ class HeartbeatProcessor { return normalized; } + + aggregateArrays(n) { + const toStrOrNull = (v) => { + if (v === undefined || v === null) return null; + const s = String(v).trim(); + return s.length === 0 ? null : s; + }; + const toNumOrNull = (v) => { + if (v === undefined || v === null) return null; + if (typeof v === 'number') return Number.isFinite(v) ? v : null; + const s = String(v).trim(); + if (s.length === 0) return null; + const num = Number(s); + return Number.isFinite(num) ? num : null; + }; + const toIntOrNull = (v) => { + if (v === undefined || v === null) return null; + if (typeof v === 'number') return Number.isFinite(v) ? Math.trunc(v) : null; + const s = String(v).trim(); + if (!/^-?\d+$/.test(s)) return null; + const num = Number(s); + return Number.isFinite(num) ? Math.trunc(num) : null; + }; + const out = {}; + const elec = Array.isArray(n.electricity) ? n.electricity : null; + if (elec && elec.length) { + out.elec_address = elec.map((x) => toStrOrNull(x?.address)); + out.voltage = elec.map((x) => toNumOrNull(x?.voltage)); + out.ampere = elec.map((x) => toNumOrNull(x?.ampere)); + out.power = elec.map((x) => toNumOrNull(x?.power)); + out.phase = elec.map((x) => toStrOrNull(x?.phase)); + out.energy = elec.map((x) => toNumOrNull(x?.energy)); + out.sum_energy = elec.map((x) => toNumOrNull(x?.sum_energy)); + } + const ac = Array.isArray(n.air_conditioner) ? n.air_conditioner : null; + if (ac && ac.length) { + out.air_address = ac.map((x) => toStrOrNull(x?.address)); + out.state = ac.map((x) => toIntOrNull(x?.state)); + out.model = ac.map((x) => toIntOrNull(x?.model)); + out.speed = ac.map((x) => toIntOrNull(x?.speed)); + out.set_temp = ac.map((x) => toIntOrNull(x?.set_temp)); + out.now_temp = ac.map((x) => toIntOrNull(x?.now_temp)); + out.solenoid_valve = ac.map((x) => toIntOrNull(x?.solenoid_valve)); + } + return out; + } } export { HeartbeatProcessor }; diff --git a/test/arrays.test.js b/test/arrays.test.js new file mode 100644 index 0000000..b4d062c --- /dev/null +++ b/test/arrays.test.js @@ -0,0 +1,117 @@ +import assert from 'node:assert/strict'; +import { HeartbeatProcessor } from '../src/processor/heartbeatProcessor.js'; + +function buildBasePayload() { + return { + ts_ms: 1700000000123, + hotel_id: 1, + room_id: '101', + device_id: 'dev-1', + ip: '127.0.0.1:1234', + power_state: 1, + guest_type: 0, + cardless_state: 0, + service_mask: 5, + pms_state: 1, + carbon_state: 0, + device_count: 1, + comm_seq: 7, + }; +} + +describe('HeartbeatProcessor arrays', () => { + it('aggregates electricity[] into aligned column arrays (keeps order)', () => { + const processor = new HeartbeatProcessor( + { batchSize: 100, batchTimeout: 1000 }, + { insertHeartbeatEvents: async () => ({ insertedCount: 0 }) } + ); + + const payload = { + ...buildBasePayload(), + electricity: [ + { address: 'add11', ampere: '1.1', power: 704.3, voltage: 3.2, phase: 'A', energy: '10', sum_energy: 100 }, + { address: 'add12', ampere: 2.2, power: '705.3', voltage: '3.3', phase: 'B', energy: 11, sum_energy: '101' }, + { address: 'add13', ampere: 'N/A', power: 706.3, voltage: 3.4, phase: '', energy: null, sum_energy: undefined }, + ], + }; + + assert.equal(processor.validateData(payload), true); + + const transformed = processor.transformData(payload); + assert.deepEqual(transformed.elec_address, ['add11', 'add12', 'add13']); + assert.deepEqual(transformed.ampere, [1.1, 2.2, null]); + assert.deepEqual(transformed.power, [704.3, 705.3, 706.3]); + assert.deepEqual(transformed.voltage, [3.2, 3.3, 3.4]); + assert.deepEqual(transformed.phase, ['A', 'B', null]); + assert.deepEqual(transformed.energy, [10, 11, null]); + assert.deepEqual(transformed.sum_energy, [100, 101, null]); + }); + + it('aggregates air_conditioner[] into aligned column arrays (keeps order)', () => { + const processor = new HeartbeatProcessor( + { batchSize: 100, batchTimeout: 1000 }, + { insertHeartbeatEvents: async () => ({ insertedCount: 0 }) } + ); + + const payload = { + ...buildBasePayload(), + air_conditioner: [ + { address: 'ac1', state: 1, model: 2, speed: 3, set_temp: 26, now_temp: 25, solenoid_valve: 1 }, + { address: 'ac2', state: '0', model: '1', speed: '2', set_temp: '24', now_temp: '23', solenoid_valve: '0' }, + { address: 'ac3', state: 'bad', model: null, speed: undefined, set_temp: '', now_temp: 22, solenoid_valve: 1.2 }, + ], + }; + + assert.equal(processor.validateData(payload), true); + + const transformed = processor.transformData(payload); + assert.deepEqual(transformed.air_address, ['ac1', 'ac2', 'ac3']); + assert.deepEqual(transformed.state, [1, 0, null]); + assert.deepEqual(transformed.model, [2, 1, null]); + assert.deepEqual(transformed.speed, [3, 2, null]); + assert.deepEqual(transformed.set_temp, [26, 24, null]); + assert.deepEqual(transformed.now_temp, [25, 23, 22]); + assert.deepEqual(transformed.solenoid_valve, [1, 0, 1]); + }); + + it('rejects non-array electricity/air_conditioner', () => { + const processor = new HeartbeatProcessor( + { batchSize: 100, batchTimeout: 1000 }, + { insertHeartbeatEvents: async () => ({ insertedCount: 0 }) } + ); + + const bad1 = { ...buildBasePayload(), electricity: { address: 'x' } }; + const bad2 = { ...buildBasePayload(), air_conditioner: { address: 'y' } }; + + assert.equal(processor.validateData(bad1), false); + assert.equal(processor.validateData(bad2), false); + }); + + it('end-to-end: message buffer -> processMessage -> insertHeartbeatEvents payload includes arrays', async () => { + let captured = null; + const db = { + insertHeartbeatEvents: async (events) => { + captured = events; + return { insertedCount: events.length }; + }, + }; + + const processor = new HeartbeatProcessor({ batchSize: 1, batchTimeout: 1000 }, db); + + const payload = { + ...buildBasePayload(), + electricity: [{ address: 'add11', voltage: 3.2, ampere: 1.1, power: 704.3, phase: 'A', energy: 10, sum_energy: 100 }], + air_conditioner: [{ address: 'ac1', state: 1, model: 2, speed: 3, set_temp: 26, now_temp: 25, solenoid_valve: 1 }], + }; + + const message = { value: Buffer.from(JSON.stringify(payload), 'utf8') }; + const res = await processor.processMessage(message); + assert.deepEqual(res, { insertedCount: 1 }); + assert.ok(Array.isArray(captured)); + assert.equal(captured.length, 1); + assert.deepEqual(captured[0].elec_address, ['add11']); + assert.deepEqual(captured[0].air_address, ['ac1']); + assert.deepEqual(captured[0].state, [1]); + assert.deepEqual(captured[0].model, [2]); + }); +});