Files
Web_BLS_Heartbeat_Server/.trae/documents/扩展心跳数据以支持电力与空调数组.md
XuJiacheng 455185ac5d feat: 扩展心跳消息支持电力与空调设备数组字段
新增 Kafka 消息中 electricity[] 和 air_conditioner[] 数组字段支持,用于存储电力与空调设备明细数据。数据库表新增对应数组列并创建 GIN 索引优化查询性能,processor 实现数组字段校验与聚合转换逻辑。

主要变更:
- Kafka 消息规范新增 electricity 和 air_conditioner 数组字段定义
- 数据库 heartbeat_events 表新增 14 个数组列并创建 4 个 GIN 索引
- processor 实现数组字段解析、校验及聚合转换逻辑
- 更新相关文档与测试用例,确保端到端功能完整
2026-01-16 14:45:36 +08:00

6.6 KiB
Raw Permalink Blame History

目标与范围

  • 扩展 Kafka 心跳消息以支持 electricity[] 与 air_conditioner[] 两类数组字段,并保持顺序一致性

  • 数据库 heartbeat.heartbeat_events 新增对应的数组列,并为指定列建立“针对数组元素”的索引

  • Processor 增强:解析/校验/转换并输出数组列DB 写入批量适配

  • 补充 OpenSpec 规格、文档与测试,完成端到端验证

OpenSpec 变更

  • 创建变更changesupdate-heartbeat-arrays-v3

  • 修改以下规格文件(新增/修改 Requirements + 场景):

    • specs/kafka/spec.md

      • ADDEDKafka 消息 MUST 支持 electricity[] 与 air_conditioner[]

      • electricity.item 结构:{ address:string, voltage:double, ampere:double, power:double, phase:string, energy:double, sum_energy:double }

      • air_conditioner.item 结构:{ address:string, state:int2, model:int2, speed:int2, set_temp:int2, now_temp:int2, solenoid_valve:int2 }

    • specs/processor/spec.md

      • ADDED数组聚合转换规则逐项按原始顺序提取出列数组顺序一致性缺失/类型校验;转换到 DB 列
    • specs/db/spec.md

      • ADDED表结构新增数组列为 elec_address(text[]), air_address(text[]), state(int2[]), model(int2[]) 建立 GIN 索引(数组元素级查询建议 @> ARRAY[...]
    • specs/redis/spec.md无需改动协议不变

  • docs 更新:

    • docs/kafka-heartbeat-producer.md补充数组字段示例与类型说明示例以数值 ampere 为准;若上游偶有字符串,需按数值可解析策略)

    • docs/db-heartbeat-schema.md补充新增列与索引说明、查询示例col @> ARRAY['addr']state @> ARRAY[1]

数据库结构调整

  • 在父分区表 heartbeat.heartbeat_events 添加新列(所有分区自动继承):

    • 电力类:

      • elec_address text[]{索引}

      • voltage double precision[]

      • ampere double precision[]

      • power double precision[]

      • phase text[]

      • energy double precision[]

      • sum_energy double precision[]

    • 空调类:

      • air_address text[]{索引}

      • state int2[] {索引}

      • model int2[] {索引}

      • speed int2[]

      • set_temp int2[]

      • now_temp int2[]

      • solenoid_valve int2[]

  • 索引(在父表上创建分区索引):

    • CREATE INDEX IF NOT EXISTS idx_he_events_elec_address_gin ON heartbeat.heartbeat_events USING GIN (elec_address);

    • CREATE INDEX IF NOT EXISTS idx_he_events_air_address_gin ON heartbeat.heartbeat_events USING GIN (air_address);

    • CREATE INDEX IF NOT EXISTS idx_he_events_state_gin ON heartbeat.heartbeat_events USING GIN (state);

    • CREATE INDEX IF NOT EXISTS idx_he_events_model_gin ON heartbeat.heartbeat_events USING GIN (model);

  • 约束与空值策略:

    • 新增数组列默认可为空NULL长度不限制类型按上述定义

    • 不引入新 CHECK 约束(避免上游演进时写入失败);后续可在枚举稳定后收紧

  • 迁移实现:

    • 在 initTables 后追加“ALTER TABLE ... ADD COLUMN IF NOT EXISTS ...”

    • 创建分区索引父表并确认落于各分区PostgreSQL 11+ 支持分区索引)

Processor 处理逻辑调整

  • 解析:

    • 在 normalizeHeartbeat 阶段读取 obj.electricity 与 obj.air_conditioner兼容 camelCase/PascalCase
  • 转换:

    • electricity[]按原始顺序聚合为列elec_address[], voltage[], ampere[], power[], phase[], energy[], sum_energy[]

    • air_conditioner[]聚合为列air_address[], state[], model[], speed[], set_temp[], now_temp[], solenoid_valve[]

    • 值类型转换:

      • address/phase → stringtrim 且空串视为缺失)

      • 数值列:若为 string 可解析为数字则接受;不可解析则标记为缺失(跳过该项)

    • 顺序保证:按输入数组原始顺序放入各列数组;对缺失字段以“跳过该项”或“插入 null”策略统一建议插入 null 保持长度对齐)

  • 校验:

    • 若两类数组存在,则至少 address 有内容;其余数值/枚举字段允许包含 null

    • 不影响 v2 原有必填字段校验

  • 输出:

    • 在 transformData 返回对象中携带新增数组列insertHeartbeatEvents 扩展列清单并写入

Kafka 消费者调整

  • Consumer 本身不改(仍以 buffer 透传)

写库逻辑调整

  • insertHeartbeatEvents

    • 扩展 columns 列表与 values/占位符生成数组列直接作为参数传入pg 支持将 JS 数组映射到 Postgres 数组)

    • 事务语义与分区预建逻辑保持不变

校验与质量保证

  • 顺序一致性:

    • 单元测试:给定 3 项 electricity/air_conditioner 输入,断言各列数组按顺序聚合
  • 类型与缺失校验:

    • 单元测试:混合数值字符串("3.2")与非法字符串("N/A"),确保解析并对非法项填 null数组长度与 address 对齐
  • 端到端验证:

    • 伪造 Kafka message.valueUTF-8 JSON走 Processor → DB 插入(使用测试隔离库或事务回滚)
  • 文档更新:

    • 更新 APIProducer与 DB schema 文档;给出查询示例:

      • SELECT * FROM heartbeat.heartbeat_events WHERE elec_address @> ARRAY['add11'];

      • SELECT * FROM heartbeat.heartbeat_events WHERE state @> ARRAY[1];

风险与兼容

  • 分区索引构建时间与磁盘占用:

    • 在父表创建分区索引;按分区数指数增长,需结合实际分区策略与 autovacuum 调优

交付清单

  • OpenSpec 变更集:openspec/changes/update-heartbeat-arrays-v3/proposal、tasks、spec deltas

  • 代码修改:

    • src/processor/heartbeatProcessor.jsnormalize+transform+校验与聚合)

    • src/db/databaseManager.js新增列 + 索引 + insert 扩展)

    • src/kafka/consumer.js无需改动

  • 测试:

    • test/processor.arrays.test.js顺序与类型/缺失校验)

    • 如需test/db.insert.arrays.test.js占位符与 rowCount 校验)

  • 文档:

    • docs/kafka-heartbeat-producer.md数组示例

    • docs/db-heartbeat-schema.md新增列与索引查询示例

实施步骤

  1. 编写 OpenSpec 变更ADDED Requirements + 场景)并验证
  2. 数据库父表追加列 + 创建分区索引(初始化流程集成)
  3. Processor实现解析/校验/聚合/转换;保持与现有 v2 字段兼容
  4. DB 写入:扩展 insertHeartbeatEvents 列与占位符映射
  5. 编写并运行单元测试;必要时写端到端冒烟
  6. 更新文档;准备部署说明(分区索引创建时间提示)

请确认以上方案,确认后我将按该计划开始实施(含规格增量、代码修改、测试与文档)。