feat: 扩展心跳消息支持电力与空调设备数组字段

新增 Kafka 消息中 electricity[] 和 air_conditioner[] 数组字段支持,用于存储电力与空调设备明细数据。数据库表新增对应数组列并创建 GIN 索引优化查询性能,processor 实现数组字段校验与聚合转换逻辑。

主要变更:
- Kafka 消息规范新增 electricity 和 air_conditioner 数组字段定义
- 数据库 heartbeat_events 表新增 14 个数组列并创建 4 个 GIN 索引
- processor 实现数组字段解析、校验及聚合转换逻辑
- 更新相关文档与测试用例,确保端到端功能完整
This commit is contained in:
2026-01-16 14:45:36 +08:00
parent 7d5b9c50ea
commit 455185ac5d
16 changed files with 693 additions and 8 deletions

View File

@@ -0,0 +1,202 @@
## 目标与范围
* 扩展 Kafka 心跳消息以支持 electricity\[] 与 air\_conditioner\[] 两类数组字段,并保持顺序一致性
* 数据库 heartbeat.heartbeat\_events 新增对应的数组列,并为指定列建立“针对数组元素”的索引
* Processor 增强:解析/校验/转换并输出数组列DB 写入批量适配
* 补充 OpenSpec 规格、文档与测试,完成端到端验证
## OpenSpec 变更
* 创建变更changes`update-heartbeat-arrays-v3`
* 修改以下规格文件(新增/修改 Requirements + 场景):
* specs/kafka/spec.md
* ADDEDKafka 消息 MUST 支持 electricity\[] 与 air\_conditioner\[]
* electricity.item 结构:{ address:string, voltage:double, ampere:double, power:double, phase:string, energy:double, sum\_energy:double }
* air\_conditioner.item 结构:{ address:string, state:int2, model:int2, speed:int2, set\_temp:int2, now\_temp:int2, solenoid\_valve:int2 }
* specs/processor/spec.md
* ADDED数组聚合转换规则逐项按原始顺序提取出列数组顺序一致性缺失/类型校验;转换到 DB 列
* specs/db/spec.md
* ADDED表结构新增数组列为 elec\_address(text\[]), air\_address(text\[]), state(int2\[]), model(int2\[]) 建立 GIN 索引(数组元素级查询建议 `@> ARRAY[...]`
* specs/redis/spec.md无需改动协议不变
* docs 更新:
* docs/kafka-heartbeat-producer.md补充数组字段示例与类型说明示例以数值 ampere 为准;若上游偶有字符串,需按数值可解析策略)
* docs/db-heartbeat-schema.md补充新增列与索引说明、查询示例`col @> ARRAY['addr']``state @> ARRAY[1]`
## 数据库结构调整
* 在父分区表 heartbeat.heartbeat\_events 添加新列(所有分区自动继承):
* 电力类:
* elec\_address text\[]{索引}
* voltage double precision\[]
* ampere double precision\[]
* power double precision\[]
* phase text\[]
* energy double precision\[]
* sum\_energy double precision\[]
* 空调类:
* air\_address text\[]{索引}
* state int2\[] {索引}
* model int2\[] {索引}
* speed int2\[]
* set\_temp int2\[]
* now\_temp int2\[]
* solenoid\_valve int2\[]
* 索引(在父表上创建分区索引):
* CREATE INDEX IF NOT EXISTS idx\_he\_events\_elec\_address\_gin ON heartbeat.heartbeat\_events USING GIN (elec\_address);
* CREATE INDEX IF NOT EXISTS idx\_he\_events\_air\_address\_gin ON heartbeat.heartbeat\_events USING GIN (air\_address);
* CREATE INDEX IF NOT EXISTS idx\_he\_events\_state\_gin ON heartbeat.heartbeat\_events USING GIN (state);
* CREATE INDEX IF NOT EXISTS idx\_he\_events\_model\_gin ON heartbeat.heartbeat\_events USING GIN (model);
* 约束与空值策略:
* 新增数组列默认可为空NULL长度不限制类型按上述定义
* 不引入新 CHECK 约束(避免上游演进时写入失败);后续可在枚举稳定后收紧
* 迁移实现:
* 在 initTables 后追加“ALTER TABLE ... ADD COLUMN IF NOT EXISTS ...”
* 创建分区索引父表并确认落于各分区PostgreSQL 11+ 支持分区索引)
## Processor 处理逻辑调整
* 解析:
* 在 normalizeHeartbeat 阶段读取 obj.electricity 与 obj.air\_conditioner兼容 camelCase/PascalCase
* 转换:
* electricity\[]按原始顺序聚合为列elec\_address\[], voltage\[], ampere\[], power\[], phase\[], energy\[], sum\_energy\[]
* air\_conditioner\[]聚合为列air\_address\[], state\[], model\[], speed\[], set\_temp\[], now\_temp\[], solenoid\_valve\[]
* 值类型转换:
* address/phase → stringtrim 且空串视为缺失)
* 数值列:若为 string 可解析为数字则接受;不可解析则标记为缺失(跳过该项)
* 顺序保证:按输入数组原始顺序放入各列数组;对缺失字段以“跳过该项”或“插入 null”策略统一建议插入 null 保持长度对齐)
* 校验:
* 若两类数组存在,则至少 address 有内容;其余数值/枚举字段允许包含 null
* 不影响 v2 原有必填字段校验
* 输出:
* 在 transformData 返回对象中携带新增数组列insertHeartbeatEvents 扩展列清单并写入
## Kafka 消费者调整
* Consumer 本身不改(仍以 buffer 透传)
## 写库逻辑调整
* insertHeartbeatEvents
* 扩展 columns 列表与 values/占位符生成数组列直接作为参数传入pg 支持将 JS 数组映射到 Postgres 数组)
* 事务语义与分区预建逻辑保持不变
## 校验与质量保证
* 顺序一致性:
* 单元测试:给定 3 项 electricity/air\_conditioner 输入,断言各列数组按顺序聚合
* 类型与缺失校验:
* 单元测试:混合数值字符串("3.2")与非法字符串("N/A"),确保解析并对非法项填 null数组长度与 address 对齐
* 端到端验证:
* 伪造 Kafka message.valueUTF-8 JSON走 Processor → DB 插入(使用测试隔离库或事务回滚)
* 文档更新:
* 更新 APIProducer与 DB schema 文档;给出查询示例:
* `SELECT * FROM heartbeat.heartbeat_events WHERE elec_address @> ARRAY['add11'];`
* `SELECT * FROM heartbeat.heartbeat_events WHERE state @> ARRAY[1];`
## 风险与兼容
* 分区索引构建时间与磁盘占用:
* 在父表创建分区索引;按分区数指数增长,需结合实际分区策略与 autovacuum 调优
## 交付清单
* OpenSpec 变更集:`openspec/changes/update-heartbeat-arrays-v3/`proposal、tasks、spec deltas
* 代码修改:
* src/processor/heartbeatProcessor.jsnormalize+transform+校验与聚合)
* src/db/databaseManager.js新增列 + 索引 + insert 扩展)
* src/kafka/consumer.js无需改动
* 测试:
* test/processor.arrays.test.js顺序与类型/缺失校验)
* 如需test/db.insert.arrays.test.js占位符与 rowCount 校验)
* 文档:
* docs/kafka-heartbeat-producer.md数组示例
* docs/db-heartbeat-schema.md新增列与索引查询示例
## 实施步骤
1. 编写 OpenSpec 变更ADDED Requirements + 场景)并验证
2. 数据库父表追加列 + 创建分区索引(初始化流程集成)
3. Processor实现解析/校验/聚合/转换;保持与现有 v2 字段兼容
4. DB 写入:扩展 insertHeartbeatEvents 列与占位符映射
5. 编写并运行单元测试;必要时写端到端冒烟
6. 更新文档;准备部署说明(分区索引创建时间提示)
请确认以上方案,确认后我将按该计划开始实施(含规格增量、代码修改、测试与文档)。

View File

@@ -22,9 +22,9 @@
| id | bigserial | 否(自动生成) | 自增序列号(写入时可不提供) |
| ts_ms | bigint | 是 | 毫秒级时间戳epoch ms |
| hotel_id | int2 | 是 | 酒店编号 |
| room_id | int4 | 是 | 房间编号(或房间唯一标识) |
| room_id | varchar(50) | 是 | 房间编号(或房间唯一标识,按字符串存储 |
| device_id | varchar(64) | 是 | 设备 ID序列号/MAC/混合编码);如明确为纯数字可改 bigint |
| ip | inet | 是 | 设备/上报方 IPPostgreSQL inet 类型自带格式校验) |
| ip | varchar(21) | 是 | `IP:PORT`IPv4或纯 IP 字符串 |
| power_state | int2 | 是 | 取电状态(枚举值待标准化) |
| guest_type | int2 | 是 | 住客身份(住客/空房/保洁/维修等,枚举值待标准化) |
| cardless_state | int2 | 是 | 无卡取电/无卡策略状态(枚举待定) |
@@ -33,6 +33,20 @@
| carbon_state | int2 | 是 | 碳控状态(枚举待定) |
| device_count | int2 | 是 | 设备数量/上报设备数量(语义待确认) |
| comm_seq | int4 | 是 | 通讯序号(语义待确认) |
| elec_address | text[] | 否 | 电力设备地址数组(与 voltage[] 等按下标对齐) |
| voltage | double precision[] | 否 | 电压数组 |
| ampere | double precision[] | 否 | 电流数组 |
| power | double precision[] | 否 | 功率数组 |
| phase | text[] | 否 | 相位数组 |
| energy | double precision[] | 否 | 能耗数组 |
| sum_energy | double precision[] | 否 | 总能耗数组 |
| air_address | text[] | 否 | 空调设备地址数组(与 state[] 等按下标对齐) |
| state | int2[] | 否 | 开关状态数组 |
| model | int2[] | 否 | 运行模式数组 |
| speed | int2[] | 否 | 风速设置数组 |
| set_temp | int2[] | 否 | 设定温度数组 |
| now_temp | int2[] | 否 | 当前温度数组 |
| solenoid_valve | int2[] | 否 | 电磁阀门状态数组 |
| extra | jsonb | 否 | 可扩展字段:电参/空调状态/版本/来源等 |
### 2.2 约束
@@ -67,6 +81,15 @@
- B-tree`hotel_id`, `power_state`, `guest_type`, `device_id`
- BRIN`service_mask`
新增(数组元素查询):
- GIN`elec_address`, `air_address`, `state`, `model`
常用查询示例:
- `SELECT * FROM heartbeat.heartbeat_events WHERE elec_address @> ARRAY['add11'];`
- `SELECT * FROM heartbeat.heartbeat_events WHERE air_address @> ARRAY['ac1'];`
- `SELECT * FROM heartbeat.heartbeat_events WHERE state @> ARRAY[1::int2];`
- `SELECT * FROM heartbeat.heartbeat_events WHERE model @> ARRAY[2::int2];`
额外建议(脚本默认包含,可按需移除):
- `btree (hotel_id, ts_ms)`:覆盖最常见过滤(酒店 + 时间范围),显著提升检索与分区内扫描效率。

View File

@@ -21,7 +21,7 @@
|---|---|---|---|
| ts_ms | number/int64 | 1700000000123 | 毫秒级 epoch 时间戳 |
| hotel_id | number/int | 12 | 酒店编号int2 范围内) |
| room_id | number/int | 1203 | 房间编号/房间标识(int4 |
| room_id | string/number | "1203" | 房间编号/房间标识(服务端会归一化为字符串;落库为 varchar(50) |
| device_id | string | "A1B2C3D4" | 设备唯一 ID序列号/MAC/自定义编码) |
| ip | string | "192.168.1.10:8080" | `IP:PORT` 字符串(落库为 varchar(21) |
| power_state | number/int | 1 | 取电状态(枚举值需统一标准) |
@@ -37,6 +37,8 @@
| 字段 | 类型 | 示例 | 说明 |
|---|---|---|---|
| extra | object | {"source":"gw","ver":"1.2.3"} | 扩展字段:电参、空调状态、版本、上报来源等 |
| electricity | array<object> | [{"address":"add11","voltage":3.2,...}] | 电力设备数组(按原始顺序拆列落库为数组列) |
| air_conditioner | array<object> | [{"address":"ac1","state":1,...}] | 空调设备数组(按原始顺序拆列落库为数组列) |
## 4. JSON 示例
```json
@@ -54,6 +56,28 @@
"carbon_state": 0,
"device_count": 1,
"comm_seq": 7,
"electricity": [
{
"address": "add11",
"voltage": 3.2,
"ampere": 1.1,
"power": 704.3,
"phase": "A",
"energy": 10.5,
"sum_energy": 100.5
}
],
"air_conditioner": [
{
"address": "ac1",
"state": 1,
"model": 2,
"speed": 3,
"set_temp": 26,
"now_temp": 25,
"solenoid_valve": 1
}
],
"extra": {
"source": "gw",
"ver": "1.2.3",
@@ -78,4 +102,7 @@
## 6. 与数据库字段的映射
服务端落库目标表:`heartbeat.heartbeat_events`(位于既有数据库中,默认 log_platform
- 必填字段:与表字段同名
- 弹性字段:写入 `extra`jsonb
- 扩展数组字段:
- electricity[] → elec_address[]、voltage[]、ampere[]、power[]、phase[]、energy[]、sum_energy[]
- air_conditioner[] → air_address[]、state[]、model[]、speed[]、set_temp[]、now_temp[]、solenoid_valve[]
- 弹性字段:其余未知字段写入 `extra`jsonb

View File

@@ -0,0 +1,14 @@
# Change: 扩展心跳消息与分区表以支持电力/空调数组v3
## Why
当前心跳数据仅落库基础字段与 extra无法按需查询与聚合“电力设备/空调设备”的细粒度状态;需要将数组结构拆列为数据库数组列,并对指定列提供数组元素索引。
## What Changes
- **Kafka**:新增可选字段 `electricity[]``air_conditioner[]` 以及元素字段定义
- **Processor**:新增数组字段类型校验与“按原始顺序聚合为列数组”的转换规则
- **DB**:在 `heartbeat.heartbeat_events` 新增数组列,并为 `elec_address/air_address/state/model` 建立 GIN 索引(数组元素查询)
- **Docs/Tests**:更新 producer 与 schema 文档;补充单元测试与冒烟验证
## Impact
- Affected specs: `openspec/specs/kafka`, `openspec/specs/processor`, `openspec/specs/db`
- Affected code: `src/processor/heartbeatProcessor.js`, `src/db/databaseManager.js`, `scripts/db/*.sql`, `docs/*`, `test/*`

View File

@@ -0,0 +1,11 @@
## ADDED Requirements
### Requirement: 分区表新增数组列与数组元素索引
系统 SHALL 在 `heartbeat.heartbeat_events` 中新增用于存储电力与空调子设备的数组列,并为指定数组列提供数组元素级查询索引。
#### Scenario: 新增数组列
- **WHEN** 部署或升级数据库结构时
- **THEN** 表应包含 elec_address、air_address、voltage、ampere、power、phase、energy、sum_energy、state、model、speed、set_temp、now_temp、solenoid_valve
#### Scenario: 数组元素索引
- **WHEN** 需要按 elec_address/air_address/state/model 的数组元素进行查询
- **THEN** 数据库应具备 GIN 索引以优化包含类查询

View File

@@ -0,0 +1,13 @@
## ADDED Requirements
### Requirement: 心跳消息扩展数组字段
Kafka 心跳消息 SHALL 支持携带电力与空调子设备的数组字段,以便消费端拆列落库与查询优化。
#### Scenario: 携带 electricity 数组
- **WHEN** 生产者需要上报电力设备明细时
- **THEN** 消息可选包含 `electricity`array of object
- **AND** 每个元素包含 address、voltage、ampere、power、phase、energy、sum_energy
#### Scenario: 携带 air_conditioner 数组
- **WHEN** 生产者需要上报空调设备明细时
- **THEN** 消息可选包含 `air_conditioner`array of object
- **AND** 每个元素包含 address、state、model、speed、set_temp、now_temp、solenoid_valve

View File

@@ -0,0 +1,19 @@
## ADDED Requirements
### Requirement: 数组字段聚合为列数组
系统 SHALL 将 `electricity[]``air_conditioner[]` 按原始顺序聚合为数据库写入结构的列数组。
#### Scenario: electricity 聚合
- **WHEN** 输入包含 `electricity` 数组
- **THEN** 输出应包含 elec_address[]、voltage[]、ampere[]、power[]、phase[]、energy[]、sum_energy[]
- **AND** 各数组下标与输入数组下标一一对应
#### Scenario: air_conditioner 聚合
- **WHEN** 输入包含 `air_conditioner` 数组
- **THEN** 输出应包含 air_address[]、state[]、model[]、speed[]、set_temp[]、now_temp[]、solenoid_valve[]
- **AND** 各数组下标与输入数组下标一一对应
#### Scenario: 类型与缺失处理
- **WHEN** electricity 或 air_conditioner 存在但不是数组
- **THEN** 系统应丢弃该消息并记录错误
- **WHEN** 数组元素字段缺失或无法转换
- **THEN** 系统应保持长度对齐并写入 null

View File

@@ -0,0 +1,11 @@
## 1. Implementation
- [ ] 1.1 更新 OpenSpec 增量规范Kafka/Processor/DB
- [ ] 1.2 数据库脚本与初始化:新增数组列与索引
- [ ] 1.3 Processor数组字段校验与聚合转换
- [ ] 1.4 DB 写入:扩展批量 insert 列与参数
- [ ] 1.5 测试:新增单元测试覆盖顺序与类型/缺失处理
- [ ] 1.6 文档:更新 producer 与 DB schema
## 2. Validation
- [ ] 2.1 运行单元测试与 lint
- [ ] 2.2 在可用 PostgreSQL 环境执行冒烟(含数组列插入)

View File

@@ -58,6 +58,21 @@
- **THEN** 系统应能够自动创建对应日分区或确保分区被预创建
- **AND** 不应影响持续写入(高吞吐场景)
### Requirement: 数组字段存储与索引
系统 MUST 支持将电力与空调子设备数据以数组列形式存储,并为指定数组列建立针对元素查询的索引。
#### Scenario: 新增数组列用于存储电力与空调子设备数据
- **WHEN** 系统初始化 v2 心跳明细表结构时
- **THEN** 表结构应包含以下新增列:
- elec_addresstext[]
- air_addresstext[]
- voltagedouble precision[]、amperedouble precision[]、powerdouble precision[]、phasetext[]、energydouble precision[]、sum_energydouble precision[]
- stateint2[]、modelint2[]、speedint2[]、set_tempint2[]、now_tempint2[]、solenoid_valveint2[]
#### Scenario: 针对数组元素的索引
- **WHEN** 对 elec_address、air_address、state、model 执行“元素包含”类查询时
- **THEN** 系统应提供 GIN 索引以优化查询
### Requirement: 数据查询支持
系统 MUST 支持基本的数据查询操作,用于监控和调试。

View File

@@ -53,6 +53,32 @@ Kafka 心跳消息 MUST 包含数据库落库所需的必填字段,并采用 U
- **THEN** 消息 value 应为 JSONUTF-8
- **AND** 至少包含 ts_ms、hotel_id、room_id、device_id、ip、power_state、guest_type、cardless_state、service_mask、pms_state、carbon_state、device_count、comm_seq
- **AND** 可选包含 extrajson object
- **AND** 可选包含 electricityarray of object
- **AND** 可选包含 air_conditionerarray of object
#### Scenario: electricity 数组字段结构
- **WHEN** 生产者在心跳消息中携带 electricity 字段时
- **THEN** electricity MUST 为数组
- **AND** 数组每个元素 MUST 为对象,包含:
- addressstring设备地址
- voltagedouble电压
- amperedouble电流
- powerdouble功率
- phasestring相位
- energydouble能耗
- sum_energydouble总能耗
#### Scenario: air_conditioner 数组字段结构
- **WHEN** 生产者在心跳消息中携带 air_conditioner 字段时
- **THEN** air_conditioner MUST 为数组
- **AND** 数组每个元素 MUST 为对象,包含:
- addressstring设备地址
- stateint2开关状态
- modelint2运行模式
- speedint2风速设置
- set_tempint2设定温度
- now_tempint2当前温度
- solenoid_valveint2电磁阀门状态
### Requirement: 分区键友好的 Kafka Key
系统 MUST 支持使用 `hotel_id:device_id` 作为 Kafka message key 以获得更好的分区与有序性。

View File

@@ -55,6 +55,29 @@
- **WHEN** 心跳数据缺失必填字段时
- **THEN** 系统应判定为无效数据并丢弃
### Requirement: 数组字段聚合转换
系统 MUST 支持将 electricity[] 与 air_conditioner[] 的对象数组聚合为数据库的“列数组”,并保持原始顺序一致性。
#### Scenario: electricity 数组聚合为列数组
- **WHEN** 心跳数据包含 electricity对象数组
- **THEN** 系统应按输入数组原始顺序提取并聚合为:
- elec_address[]、voltage[]、ampere[]、power[]、phase[]、energy[]、sum_energy[]
- **AND** 同一条心跳记录内,上述各数组下标必须与输入 electricity 的下标一一对应
#### Scenario: air_conditioner 数组聚合为列数组
- **WHEN** 心跳数据包含 air_conditioner对象数组
- **THEN** 系统应按输入数组原始顺序提取并聚合为:
- air_address[]、state[]、model[]、speed[]、set_temp[]、now_temp[]、solenoid_valve[]
- **AND** 同一条心跳记录内,上述各数组下标必须与输入 air_conditioner 的下标一一对应
#### Scenario: 数组字段缺失与类型校验
- **WHEN** electricity 或 air_conditioner 字段存在但类型不为数组时
- **THEN** 系统应判定该条消息为无效并丢弃
- **AND** 记录错误日志
- **WHEN** 数组元素字段缺失或类型不匹配但消息其余必填字段有效时
- **THEN** 系统应保持数组长度对齐,并对无法转换的单元写入 null
- **AND** 不应破坏其余字段的写入
### Requirement: 批量处理支持
系统 MUST 支持批量处理心跳数据,提高处理效率。

View File

@@ -25,6 +25,21 @@ CREATE TABLE IF NOT EXISTS heartbeat.heartbeat_events (
device_count int2 NOT NULL,
comm_seq int4 NOT NULL,
elec_address text[],
air_address text[],
voltage double precision[],
ampere double precision[],
power double precision[],
phase text[],
energy double precision[],
sum_energy double precision[],
state int2[],
model int2[],
speed int2[],
set_temp int2[],
now_temp int2[],
solenoid_valve int2[],
-- 弹性字段:电参/空调等(后续可结构化拆列;当前先放 extra
extra jsonb,
@@ -44,6 +59,21 @@ CREATE TABLE IF NOT EXISTS heartbeat.heartbeat_events (
)
PARTITION BY RANGE (ts_ms);
ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS elec_address text[];
ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS air_address text[];
ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS voltage double precision[];
ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS ampere double precision[];
ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS power double precision[];
ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS phase text[];
ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS energy double precision[];
ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS sum_energy double precision[];
ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS state int2[];
ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS model int2[];
ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS speed int2[];
ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS set_temp int2[];
ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS now_temp int2[];
ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS solenoid_valve int2[];
-- 指定索引
CREATE INDEX IF NOT EXISTS idx_heartbeat_events_hotel_id ON heartbeat.heartbeat_events (hotel_id);
CREATE INDEX IF NOT EXISTS idx_heartbeat_events_power_state ON heartbeat.heartbeat_events (power_state);
@@ -58,4 +88,9 @@ CREATE INDEX IF NOT EXISTS idx_heartbeat_events_service_mask_brin ON heartbeat.h
-- 若不希望额外索引,可注释掉
CREATE INDEX IF NOT EXISTS idx_heartbeat_events_hotel_ts ON heartbeat.heartbeat_events (hotel_id, ts_ms);
CREATE INDEX IF NOT EXISTS idx_heartbeat_events_elec_address_gin ON heartbeat.heartbeat_events USING GIN (elec_address);
CREATE INDEX IF NOT EXISTS idx_heartbeat_events_air_address_gin ON heartbeat.heartbeat_events USING GIN (air_address);
CREATE INDEX IF NOT EXISTS idx_heartbeat_events_state_gin ON heartbeat.heartbeat_events USING GIN (state);
CREATE INDEX IF NOT EXISTS idx_heartbeat_events_model_gin ON heartbeat.heartbeat_events USING GIN (model);
COMMIT;

View File

@@ -43,8 +43,11 @@ async function main() {
`INSERT INTO heartbeat.heartbeat_events (
ts_ms, hotel_id, room_id, device_id, ip,
power_state, guest_type, cardless_state, service_mask,
pms_state, carbon_state, device_count, comm_seq, extra
) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14)`,
pms_state, carbon_state, device_count, comm_seq,
elec_address, voltage, ampere, power, phase, energy, sum_energy,
air_address, state, model, speed, set_temp, now_temp, solenoid_valve,
extra
) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18,$19,$20,$21,$22,$23,$24,$25,$26,$27,$28)`,
[
ts,
1,
@@ -59,6 +62,20 @@ async function main() {
0,
1,
1,
['add11', 'add12'],
[3.2, 3.3],
[1.1, 2.2],
[704.3, 705.3],
['A', 'B'],
[10.5, 11.5],
[100.5, 101.5],
['ac1'],
[1],
[2],
[3],
[26],
[25],
[1],
{ source: 'smoke-test' },
]
);

View File

@@ -95,6 +95,21 @@ class DatabaseManager {
device_count int2 NOT NULL,
comm_seq int4 NOT NULL,
elec_address text[],
air_address text[],
voltage double precision[],
ampere double precision[],
power double precision[],
phase text[],
energy double precision[],
sum_energy double precision[],
state int2[],
model int2[],
speed int2[],
set_temp int2[],
now_temp int2[],
solenoid_valve int2[],
extra jsonb,
CONSTRAINT heartbeat_events_pk PRIMARY KEY (ts_ms, id),
@@ -112,12 +127,31 @@ class DatabaseManager {
)
PARTITION BY RANGE (ts_ms);
ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS elec_address text[];
ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS air_address text[];
ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS voltage double precision[];
ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS ampere double precision[];
ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS power double precision[];
ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS phase text[];
ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS energy double precision[];
ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS sum_energy double precision[];
ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS state int2[];
ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS model int2[];
ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS speed int2[];
ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS set_temp int2[];
ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS now_temp int2[];
ALTER TABLE heartbeat.heartbeat_events ADD COLUMN IF NOT EXISTS solenoid_valve int2[];
CREATE INDEX IF NOT EXISTS idx_heartbeat_events_hotel_id ON heartbeat.heartbeat_events (hotel_id);
CREATE INDEX IF NOT EXISTS idx_heartbeat_events_power_state ON heartbeat.heartbeat_events (power_state);
CREATE INDEX IF NOT EXISTS idx_heartbeat_events_guest_type ON heartbeat.heartbeat_events (guest_type);
CREATE INDEX IF NOT EXISTS idx_heartbeat_events_device_id ON heartbeat.heartbeat_events (device_id);
CREATE INDEX IF NOT EXISTS idx_heartbeat_events_service_mask_brin ON heartbeat.heartbeat_events USING BRIN (service_mask);
CREATE INDEX IF NOT EXISTS idx_heartbeat_events_hotel_ts ON heartbeat.heartbeat_events (hotel_id, ts_ms);
CREATE INDEX IF NOT EXISTS idx_heartbeat_events_elec_address_gin ON heartbeat.heartbeat_events USING GIN (elec_address);
CREATE INDEX IF NOT EXISTS idx_heartbeat_events_air_address_gin ON heartbeat.heartbeat_events USING GIN (air_address);
CREATE INDEX IF NOT EXISTS idx_heartbeat_events_state_gin ON heartbeat.heartbeat_events USING GIN (state);
CREATE INDEX IF NOT EXISTS idx_heartbeat_events_model_gin ON heartbeat.heartbeat_events USING GIN (model);
-- 分区预创建函数(按 Asia/Shanghai 自然日)
CREATE OR REPLACE FUNCTION heartbeat.day_start_ms_shanghai(p_day date)
@@ -366,6 +400,20 @@ class DatabaseManager {
'carbon_state',
'device_count',
'comm_seq',
'elec_address',
'air_address',
'voltage',
'ampere',
'power',
'phase',
'energy',
'sum_energy',
'state',
'model',
'speed',
'set_temp',
'now_temp',
'solenoid_valve',
'extra',
];
@@ -387,6 +435,20 @@ class DatabaseManager {
e.carbon_state,
e.device_count,
e.comm_seq,
Array.isArray(e.elec_address) ? e.elec_address : null,
Array.isArray(e.air_address) ? e.air_address : null,
Array.isArray(e.voltage) ? e.voltage : null,
Array.isArray(e.ampere) ? e.ampere : null,
Array.isArray(e.power) ? e.power : null,
Array.isArray(e.phase) ? e.phase : null,
Array.isArray(e.energy) ? e.energy : null,
Array.isArray(e.sum_energy) ? e.sum_energy : null,
Array.isArray(e.state) ? e.state : null,
Array.isArray(e.model) ? e.model : null,
Array.isArray(e.speed) ? e.speed : null,
Array.isArray(e.set_temp) ? e.set_temp : null,
Array.isArray(e.now_temp) ? e.now_temp : null,
Array.isArray(e.solenoid_valve) ? e.solenoid_valve : null,
e.extra ?? null
);
const row = columns.map((_, colIndex) => `$${base + colIndex + 1}`).join(', ');

View File

@@ -124,11 +124,31 @@ class HeartbeatProcessor {
}
if (typeof normalized.ip !== 'string' || normalized.ip.length === 0) return false;
if (normalized.electricity !== undefined && normalized.electricity !== null) {
if (!Array.isArray(normalized.electricity)) return false;
for (const item of normalized.electricity) {
if (!item || typeof item !== 'object' || Array.isArray(item)) {
return false;
}
}
}
if (normalized.air_conditioner !== undefined && normalized.air_conditioner !== null) {
if (!Array.isArray(normalized.air_conditioner)) return false;
for (const item of normalized.air_conditioner) {
if (!item || typeof item !== 'object' || Array.isArray(item)) {
return false;
}
}
}
return true;
}
transformData(data) {
return this.normalizeHeartbeat(data);
const n = this.normalizeHeartbeat(data);
const ext = this.aggregateArrays(n);
return { ...n, ...ext };
}
async processBatch() {
@@ -377,6 +397,8 @@ class HeartbeatProcessor {
device_count: pick(['device_count', 'deviceCount', 'DeviceCount']),
comm_seq: pick(['comm_seq', 'commSeq', 'CommSeq']),
extra: pick(['extra', 'Extra']),
electricity: pick(['electricity', 'Electricity']),
air_conditioner: pick(['air_conditioner', 'airConditioner', 'AirConditioner']),
};
const toTrimmedStringOrUndefined = (v) => {
@@ -447,7 +469,9 @@ class HeartbeatProcessor {
'carbon_state','carbonState','CarbonState',
'device_count','deviceCount','DeviceCount',
'comm_seq','commSeq','CommSeq',
'extra','Extra'
'extra','Extra',
'electricity','Electricity',
'air_conditioner','airConditioner','AirConditioner'
].includes(k)
) {
continue;
@@ -457,6 +481,52 @@ class HeartbeatProcessor {
return normalized;
}
aggregateArrays(n) {
const toStrOrNull = (v) => {
if (v === undefined || v === null) return null;
const s = String(v).trim();
return s.length === 0 ? null : s;
};
const toNumOrNull = (v) => {
if (v === undefined || v === null) return null;
if (typeof v === 'number') return Number.isFinite(v) ? v : null;
const s = String(v).trim();
if (s.length === 0) return null;
const num = Number(s);
return Number.isFinite(num) ? num : null;
};
const toIntOrNull = (v) => {
if (v === undefined || v === null) return null;
if (typeof v === 'number') return Number.isFinite(v) ? Math.trunc(v) : null;
const s = String(v).trim();
if (!/^-?\d+$/.test(s)) return null;
const num = Number(s);
return Number.isFinite(num) ? Math.trunc(num) : null;
};
const out = {};
const elec = Array.isArray(n.electricity) ? n.electricity : null;
if (elec && elec.length) {
out.elec_address = elec.map((x) => toStrOrNull(x?.address));
out.voltage = elec.map((x) => toNumOrNull(x?.voltage));
out.ampere = elec.map((x) => toNumOrNull(x?.ampere));
out.power = elec.map((x) => toNumOrNull(x?.power));
out.phase = elec.map((x) => toStrOrNull(x?.phase));
out.energy = elec.map((x) => toNumOrNull(x?.energy));
out.sum_energy = elec.map((x) => toNumOrNull(x?.sum_energy));
}
const ac = Array.isArray(n.air_conditioner) ? n.air_conditioner : null;
if (ac && ac.length) {
out.air_address = ac.map((x) => toStrOrNull(x?.address));
out.state = ac.map((x) => toIntOrNull(x?.state));
out.model = ac.map((x) => toIntOrNull(x?.model));
out.speed = ac.map((x) => toIntOrNull(x?.speed));
out.set_temp = ac.map((x) => toIntOrNull(x?.set_temp));
out.now_temp = ac.map((x) => toIntOrNull(x?.now_temp));
out.solenoid_valve = ac.map((x) => toIntOrNull(x?.solenoid_valve));
}
return out;
}
}
export { HeartbeatProcessor };

117
test/arrays.test.js Normal file
View File

@@ -0,0 +1,117 @@
import assert from 'node:assert/strict';
import { HeartbeatProcessor } from '../src/processor/heartbeatProcessor.js';
function buildBasePayload() {
return {
ts_ms: 1700000000123,
hotel_id: 1,
room_id: '101',
device_id: 'dev-1',
ip: '127.0.0.1:1234',
power_state: 1,
guest_type: 0,
cardless_state: 0,
service_mask: 5,
pms_state: 1,
carbon_state: 0,
device_count: 1,
comm_seq: 7,
};
}
describe('HeartbeatProcessor arrays', () => {
it('aggregates electricity[] into aligned column arrays (keeps order)', () => {
const processor = new HeartbeatProcessor(
{ batchSize: 100, batchTimeout: 1000 },
{ insertHeartbeatEvents: async () => ({ insertedCount: 0 }) }
);
const payload = {
...buildBasePayload(),
electricity: [
{ address: 'add11', ampere: '1.1', power: 704.3, voltage: 3.2, phase: 'A', energy: '10', sum_energy: 100 },
{ address: 'add12', ampere: 2.2, power: '705.3', voltage: '3.3', phase: 'B', energy: 11, sum_energy: '101' },
{ address: 'add13', ampere: 'N/A', power: 706.3, voltage: 3.4, phase: '', energy: null, sum_energy: undefined },
],
};
assert.equal(processor.validateData(payload), true);
const transformed = processor.transformData(payload);
assert.deepEqual(transformed.elec_address, ['add11', 'add12', 'add13']);
assert.deepEqual(transformed.ampere, [1.1, 2.2, null]);
assert.deepEqual(transformed.power, [704.3, 705.3, 706.3]);
assert.deepEqual(transformed.voltage, [3.2, 3.3, 3.4]);
assert.deepEqual(transformed.phase, ['A', 'B', null]);
assert.deepEqual(transformed.energy, [10, 11, null]);
assert.deepEqual(transformed.sum_energy, [100, 101, null]);
});
it('aggregates air_conditioner[] into aligned column arrays (keeps order)', () => {
const processor = new HeartbeatProcessor(
{ batchSize: 100, batchTimeout: 1000 },
{ insertHeartbeatEvents: async () => ({ insertedCount: 0 }) }
);
const payload = {
...buildBasePayload(),
air_conditioner: [
{ address: 'ac1', state: 1, model: 2, speed: 3, set_temp: 26, now_temp: 25, solenoid_valve: 1 },
{ address: 'ac2', state: '0', model: '1', speed: '2', set_temp: '24', now_temp: '23', solenoid_valve: '0' },
{ address: 'ac3', state: 'bad', model: null, speed: undefined, set_temp: '', now_temp: 22, solenoid_valve: 1.2 },
],
};
assert.equal(processor.validateData(payload), true);
const transformed = processor.transformData(payload);
assert.deepEqual(transformed.air_address, ['ac1', 'ac2', 'ac3']);
assert.deepEqual(transformed.state, [1, 0, null]);
assert.deepEqual(transformed.model, [2, 1, null]);
assert.deepEqual(transformed.speed, [3, 2, null]);
assert.deepEqual(transformed.set_temp, [26, 24, null]);
assert.deepEqual(transformed.now_temp, [25, 23, 22]);
assert.deepEqual(transformed.solenoid_valve, [1, 0, 1]);
});
it('rejects non-array electricity/air_conditioner', () => {
const processor = new HeartbeatProcessor(
{ batchSize: 100, batchTimeout: 1000 },
{ insertHeartbeatEvents: async () => ({ insertedCount: 0 }) }
);
const bad1 = { ...buildBasePayload(), electricity: { address: 'x' } };
const bad2 = { ...buildBasePayload(), air_conditioner: { address: 'y' } };
assert.equal(processor.validateData(bad1), false);
assert.equal(processor.validateData(bad2), false);
});
it('end-to-end: message buffer -> processMessage -> insertHeartbeatEvents payload includes arrays', async () => {
let captured = null;
const db = {
insertHeartbeatEvents: async (events) => {
captured = events;
return { insertedCount: events.length };
},
};
const processor = new HeartbeatProcessor({ batchSize: 1, batchTimeout: 1000 }, db);
const payload = {
...buildBasePayload(),
electricity: [{ address: 'add11', voltage: 3.2, ampere: 1.1, power: 704.3, phase: 'A', energy: 10, sum_energy: 100 }],
air_conditioner: [{ address: 'ac1', state: 1, model: 2, speed: 3, set_temp: 26, now_temp: 25, solenoid_valve: 1 }],
};
const message = { value: Buffer.from(JSON.stringify(payload), 'utf8') };
const res = await processor.processMessage(message);
assert.deepEqual(res, { insertedCount: 1 });
assert.ok(Array.isArray(captured));
assert.equal(captured.length, 1);
assert.deepEqual(captured[0].elec_address, ['add11']);
assert.deepEqual(captured[0].air_address, ['ac1']);
assert.deepEqual(captured[0].state, [1]);
assert.deepEqual(captured[0].model, [2]);
});
});