203 lines
6.6 KiB
Markdown
203 lines
6.6 KiB
Markdown
|
|
## 目标与范围
|
|||
|
|
|
|||
|
|
* 扩展 Kafka 心跳消息以支持 electricity\[] 与 air\_conditioner\[] 两类数组字段,并保持顺序一致性
|
|||
|
|
|
|||
|
|
* 数据库 heartbeat.heartbeat\_events 新增对应的数组列,并为指定列建立“针对数组元素”的索引
|
|||
|
|
|
|||
|
|
* Processor 增强:解析/校验/转换并输出数组列;DB 写入批量适配
|
|||
|
|
|
|||
|
|
* 补充 OpenSpec 规格、文档与测试,完成端到端验证
|
|||
|
|
|
|||
|
|
## OpenSpec 变更
|
|||
|
|
|
|||
|
|
* 创建变更(changes):`update-heartbeat-arrays-v3`
|
|||
|
|
|
|||
|
|
* 修改以下规格文件(新增/修改 Requirements + 场景):
|
|||
|
|
|
|||
|
|
* specs/kafka/spec.md:
|
|||
|
|
|
|||
|
|
* ADDED:Kafka 消息 MUST 支持 electricity\[] 与 air\_conditioner\[]
|
|||
|
|
|
|||
|
|
* electricity.item 结构:{ address:string, voltage:double, ampere:double, power:double, phase:string, energy:double, sum\_energy:double }
|
|||
|
|
|
|||
|
|
* air\_conditioner.item 结构:{ address:string, state:int2, model:int2, speed:int2, set\_temp:int2, now\_temp:int2, solenoid\_valve:int2 }
|
|||
|
|
|
|||
|
|
* specs/processor/spec.md:
|
|||
|
|
|
|||
|
|
* ADDED:数组聚合转换规则(逐项按原始顺序提取出列数组);顺序一致性;缺失/类型校验;转换到 DB 列
|
|||
|
|
|
|||
|
|
* specs/db/spec.md:
|
|||
|
|
|
|||
|
|
* ADDED:表结构新增数组列;为 elec\_address(text\[]), air\_address(text\[]), state(int2\[]), model(int2\[]) 建立 GIN 索引(数组元素级查询建议 `@> ARRAY[...]`)
|
|||
|
|
|
|||
|
|
* specs/redis/spec.md:无需改动(协议不变)
|
|||
|
|
|
|||
|
|
* docs 更新:
|
|||
|
|
|
|||
|
|
* docs/kafka-heartbeat-producer.md:补充数组字段示例与类型说明(示例以数值 ampere 为准;若上游偶有字符串,需按数值可解析策略)
|
|||
|
|
|
|||
|
|
* docs/db-heartbeat-schema.md:补充新增列与索引说明、查询示例(`col @> ARRAY['addr']`、`state @> ARRAY[1]`)
|
|||
|
|
|
|||
|
|
## 数据库结构调整
|
|||
|
|
|
|||
|
|
* 在父分区表 heartbeat.heartbeat\_events 添加新列(所有分区自动继承):
|
|||
|
|
|
|||
|
|
* 电力类:
|
|||
|
|
|
|||
|
|
* elec\_address text\[]{索引}
|
|||
|
|
|
|||
|
|
* voltage double precision\[]
|
|||
|
|
|
|||
|
|
* ampere double precision\[]
|
|||
|
|
|
|||
|
|
* power double precision\[]
|
|||
|
|
|
|||
|
|
* phase text\[]
|
|||
|
|
|
|||
|
|
* energy double precision\[]
|
|||
|
|
|
|||
|
|
* sum\_energy double precision\[]
|
|||
|
|
|
|||
|
|
* 空调类:
|
|||
|
|
|
|||
|
|
* air\_address text\[]{索引}
|
|||
|
|
|
|||
|
|
* state int2\[] {索引}
|
|||
|
|
|
|||
|
|
* model int2\[] {索引}
|
|||
|
|
|
|||
|
|
* speed int2\[]
|
|||
|
|
|
|||
|
|
* set\_temp int2\[]
|
|||
|
|
|
|||
|
|
* now\_temp int2\[]
|
|||
|
|
|
|||
|
|
* solenoid\_valve int2\[]
|
|||
|
|
|
|||
|
|
* 索引(在父表上创建分区索引):
|
|||
|
|
|
|||
|
|
* CREATE INDEX IF NOT EXISTS idx\_he\_events\_elec\_address\_gin ON heartbeat.heartbeat\_events USING GIN (elec\_address);
|
|||
|
|
|
|||
|
|
* CREATE INDEX IF NOT EXISTS idx\_he\_events\_air\_address\_gin ON heartbeat.heartbeat\_events USING GIN (air\_address);
|
|||
|
|
|
|||
|
|
* CREATE INDEX IF NOT EXISTS idx\_he\_events\_state\_gin ON heartbeat.heartbeat\_events USING GIN (state);
|
|||
|
|
|
|||
|
|
* CREATE INDEX IF NOT EXISTS idx\_he\_events\_model\_gin ON heartbeat.heartbeat\_events USING GIN (model);
|
|||
|
|
|
|||
|
|
* 约束与空值策略:
|
|||
|
|
|
|||
|
|
* 新增数组列默认可为空(NULL);长度不限制;类型按上述定义
|
|||
|
|
|
|||
|
|
* 不引入新 CHECK 约束(避免上游演进时写入失败);后续可在枚举稳定后收紧
|
|||
|
|
|
|||
|
|
* 迁移实现:
|
|||
|
|
|
|||
|
|
* 在 initTables 后追加“ALTER TABLE ... ADD COLUMN IF NOT EXISTS ...”
|
|||
|
|
|
|||
|
|
* 创建分区索引(父表)并确认落于各分区(PostgreSQL 11+ 支持分区索引)
|
|||
|
|
|
|||
|
|
## Processor 处理逻辑调整
|
|||
|
|
|
|||
|
|
* 解析:
|
|||
|
|
|
|||
|
|
* 在 normalizeHeartbeat 阶段读取 obj.electricity 与 obj.air\_conditioner(兼容 camelCase/PascalCase)
|
|||
|
|
|
|||
|
|
* 转换:
|
|||
|
|
|
|||
|
|
* electricity\[]:按原始顺序聚合为列:elec\_address\[], voltage\[], ampere\[], power\[], phase\[], energy\[], sum\_energy\[]
|
|||
|
|
|
|||
|
|
* air\_conditioner\[]:聚合为列:air\_address\[], state\[], model\[], speed\[], set\_temp\[], now\_temp\[], solenoid\_valve\[]
|
|||
|
|
|
|||
|
|
* 值类型转换:
|
|||
|
|
|
|||
|
|
* address/phase → string(trim 且空串视为缺失)
|
|||
|
|
|
|||
|
|
* 数值列:若为 string 可解析为数字则接受;不可解析则标记为缺失(跳过该项)
|
|||
|
|
|
|||
|
|
* 顺序保证:按输入数组原始顺序放入各列数组;对缺失字段以“跳过该项”或“插入 null”策略统一(建议插入 null 保持长度对齐)
|
|||
|
|
|
|||
|
|
* 校验:
|
|||
|
|
|
|||
|
|
* 若两类数组存在,则至少 address 有内容;其余数值/枚举字段允许包含 null
|
|||
|
|
|
|||
|
|
* 不影响 v2 原有必填字段校验
|
|||
|
|
|
|||
|
|
* 输出:
|
|||
|
|
|
|||
|
|
* 在 transformData 返回对象中携带新增数组列;insertHeartbeatEvents 扩展列清单并写入
|
|||
|
|
|
|||
|
|
## Kafka 消费者调整
|
|||
|
|
|
|||
|
|
* Consumer 本身不改(仍以 buffer 透传)
|
|||
|
|
|
|||
|
|
## 写库逻辑调整
|
|||
|
|
|
|||
|
|
* insertHeartbeatEvents:
|
|||
|
|
|
|||
|
|
* 扩展 columns 列表与 values/占位符生成;数组列直接作为参数传入(pg 支持将 JS 数组映射到 Postgres 数组)
|
|||
|
|
|
|||
|
|
* 事务语义与分区预建逻辑保持不变
|
|||
|
|
|
|||
|
|
## 校验与质量保证
|
|||
|
|
|
|||
|
|
* 顺序一致性:
|
|||
|
|
|
|||
|
|
* 单元测试:给定 3 项 electricity/air\_conditioner 输入,断言各列数组按顺序聚合
|
|||
|
|
|
|||
|
|
* 类型与缺失校验:
|
|||
|
|
|
|||
|
|
* 单元测试:混合数值字符串("3.2")与非法字符串("N/A"),确保解析并对非法项填 null,数组长度与 address 对齐
|
|||
|
|
|
|||
|
|
* 端到端验证:
|
|||
|
|
|
|||
|
|
* 伪造 Kafka message.value(UTF-8 JSON)走 Processor → DB 插入(使用测试隔离库或事务回滚)
|
|||
|
|
|
|||
|
|
* 文档更新:
|
|||
|
|
|
|||
|
|
* 更新 API(Producer)与 DB schema 文档;给出查询示例:
|
|||
|
|
|
|||
|
|
* `SELECT * FROM heartbeat.heartbeat_events WHERE elec_address @> ARRAY['add11'];`
|
|||
|
|
|
|||
|
|
* `SELECT * FROM heartbeat.heartbeat_events WHERE state @> ARRAY[1];`
|
|||
|
|
|
|||
|
|
## 风险与兼容
|
|||
|
|
|
|||
|
|
* 分区索引构建时间与磁盘占用:
|
|||
|
|
|
|||
|
|
* 在父表创建分区索引;按分区数指数增长,需结合实际分区策略与 autovacuum 调优
|
|||
|
|
|
|||
|
|
## 交付清单
|
|||
|
|
|
|||
|
|
* OpenSpec 变更集:`openspec/changes/update-heartbeat-arrays-v3/`(proposal、tasks、spec deltas)
|
|||
|
|
|
|||
|
|
* 代码修改:
|
|||
|
|
|
|||
|
|
* src/processor/heartbeatProcessor.js(normalize+transform+校验与聚合)
|
|||
|
|
|
|||
|
|
* src/db/databaseManager.js(新增列 + 索引 + insert 扩展)
|
|||
|
|
|
|||
|
|
* src/kafka/consumer.js(无需改动)
|
|||
|
|
|
|||
|
|
* 测试:
|
|||
|
|
|
|||
|
|
* test/processor.arrays.test.js(顺序与类型/缺失校验)
|
|||
|
|
|
|||
|
|
* 如需:test/db.insert.arrays.test.js(占位符与 rowCount 校验)
|
|||
|
|
|
|||
|
|
* 文档:
|
|||
|
|
|
|||
|
|
* docs/kafka-heartbeat-producer.md(数组示例)
|
|||
|
|
|
|||
|
|
* docs/db-heartbeat-schema.md(新增列与索引查询示例)
|
|||
|
|
|
|||
|
|
## 实施步骤
|
|||
|
|
|
|||
|
|
1. 编写 OpenSpec 变更(ADDED Requirements + 场景)并验证
|
|||
|
|
2. 数据库父表追加列 + 创建分区索引(初始化流程集成)
|
|||
|
|
3. Processor:实现解析/校验/聚合/转换;保持与现有 v2 字段兼容
|
|||
|
|
4. DB 写入:扩展 insertHeartbeatEvents 列与占位符映射
|
|||
|
|
5. 编写并运行单元测试;必要时写端到端冒烟
|
|||
|
|
6. 更新文档;准备部署说明(分区索引创建时间提示)
|
|||
|
|
|
|||
|
|
请确认以上方案,确认后我将按该计划开始实施(含规格增量、代码修改、测试与文档)。
|