From 7d5b9c50ea7b4b3f5037cd6ba74b0bb8b8055251 Mon Sep 17 00:00:00 2001 From: MomoWen Date: Wed, 14 Jan 2026 19:38:02 +0800 Subject: [PATCH] =?UTF-8?q?refactor(openspec):=20=E5=BD=92=E6=A1=A3?= =?UTF-8?q?=E5=BF=83=E8=B7=B3=E6=95=B0=E6=8D=AE=E5=BA=93v2=E6=9B=B4?= =?UTF-8?q?=E6=96=B0=E7=9B=B8=E5=85=B3=E6=96=87=E6=A1=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 将update-heartbeat-db-v2目录下的文档迁移至archive目录 更新specs目录下的相关规范文件以反映最新变更 --- .../proposal.md | 0 .../specs/db/spec.md | 0 .../specs/kafka/spec.md | 0 .../specs/processor/spec.md | 39 +++++++++++++++++++ .../tasks.md | 2 +- .../specs/processor/spec.md | 21 ---------- openspec/specs/db/spec.md | 35 +++++++++-------- openspec/specs/kafka/spec.md | 24 +++++++++++- openspec/specs/processor/spec.md | 13 ++++--- 9 files changed, 89 insertions(+), 45 deletions(-) rename openspec/changes/{update-heartbeat-db-v2 => archive/2026-01-14-update-heartbeat-db-v2}/proposal.md (100%) rename openspec/changes/{update-heartbeat-db-v2 => archive/2026-01-14-update-heartbeat-db-v2}/specs/db/spec.md (100%) rename openspec/changes/{update-heartbeat-db-v2 => archive/2026-01-14-update-heartbeat-db-v2}/specs/kafka/spec.md (100%) create mode 100644 openspec/changes/archive/2026-01-14-update-heartbeat-db-v2/specs/processor/spec.md rename openspec/changes/{update-heartbeat-db-v2 => archive/2026-01-14-update-heartbeat-db-v2}/tasks.md (90%) delete mode 100644 openspec/changes/update-heartbeat-db-v2/specs/processor/spec.md diff --git a/openspec/changes/update-heartbeat-db-v2/proposal.md b/openspec/changes/archive/2026-01-14-update-heartbeat-db-v2/proposal.md similarity index 100% rename from openspec/changes/update-heartbeat-db-v2/proposal.md rename to openspec/changes/archive/2026-01-14-update-heartbeat-db-v2/proposal.md diff --git a/openspec/changes/update-heartbeat-db-v2/specs/db/spec.md b/openspec/changes/archive/2026-01-14-update-heartbeat-db-v2/specs/db/spec.md similarity index 100% rename from openspec/changes/update-heartbeat-db-v2/specs/db/spec.md rename to openspec/changes/archive/2026-01-14-update-heartbeat-db-v2/specs/db/spec.md diff --git a/openspec/changes/update-heartbeat-db-v2/specs/kafka/spec.md b/openspec/changes/archive/2026-01-14-update-heartbeat-db-v2/specs/kafka/spec.md similarity index 100% rename from openspec/changes/update-heartbeat-db-v2/specs/kafka/spec.md rename to openspec/changes/archive/2026-01-14-update-heartbeat-db-v2/specs/kafka/spec.md diff --git a/openspec/changes/archive/2026-01-14-update-heartbeat-db-v2/specs/processor/spec.md b/openspec/changes/archive/2026-01-14-update-heartbeat-db-v2/specs/processor/spec.md new file mode 100644 index 0000000..f76cb1f --- /dev/null +++ b/openspec/changes/archive/2026-01-14-update-heartbeat-db-v2/specs/processor/spec.md @@ -0,0 +1,39 @@ +## MODIFIED Requirements + +### Requirement: 心跳数据解包 +系统 MUST 能够将 Kafka 消息 value 解码/解压并还原为 JSON 对象或数组,支持两层以内的编码/压缩组合。 + +#### Scenario: 支持常见编码/压缩(两层以内) +- **WHEN** Kafka 消息 value 为下列任意形式时: + - UTF-8 JSON(对象或数组) + - base64(二进制) + - gzip / deflate(zlib) / deflate(raw) / brotli 压缩后的二进制 +- **THEN** 系统应当按“最多两层”的策略尝试解码/解压 +- **AND** 成功时应还原为 JSON 对象或数组 +- **AND** 失败时应记录错误并丢弃该消息 + +#### Scenario: 支持包装结构 +- **WHEN** 解包得到的 JSON 为包装结构(例如包含 `data`/`payload`/`body` 字段) +- **THEN** 系统应优先提取其中的对象作为心跳数据源 + +#### Scenario: 解包有效心跳数据 +- **WHEN** 接收到有效格式的Kafka心跳消息时 +- **THEN** 系统应该成功解包消息 +- **AND** 提取出心跳数据的各个字段 + +#### Scenario: 解包无效心跳数据 +- **WHEN** 接收到无效格式的Kafka心跳消息时 +- **THEN** 系统应该返回解包错误 +- **AND** 记录错误日志 + +### Requirement: 心跳数据转换 +系统 MUST 能够将解包后的心跳数据转换为数据库存储格式。 + +#### Scenario: 转换为 v2 明细表字段 +- **WHEN** 心跳数据验证通过时 +- **THEN** 系统应输出与 v2 明细表字段一致的数据结构 +- **AND** 添加必要的元数据 + +#### Scenario: 缺失必填字段 +- **WHEN** 心跳数据缺失必填字段时 +- **THEN** 系统应判定为无效数据并丢弃 diff --git a/openspec/changes/update-heartbeat-db-v2/tasks.md b/openspec/changes/archive/2026-01-14-update-heartbeat-db-v2/tasks.md similarity index 90% rename from openspec/changes/update-heartbeat-db-v2/tasks.md rename to openspec/changes/archive/2026-01-14-update-heartbeat-db-v2/tasks.md index b0c6e87..f103fac 100644 --- a/openspec/changes/update-heartbeat-db-v2/tasks.md +++ b/openspec/changes/archive/2026-01-14-update-heartbeat-db-v2/tasks.md @@ -4,7 +4,7 @@ - [x] 实现按 `ts_ms` 日分区与自动建分区机制 - [x] 补充性能建议(索引策略、分区影响、聚合/物化视图建议) - [x] 产出 docs:DB 表结构文档 + Kafka 生产者推送数据结构与方式 -- [ ] Processor:实现 Kafka 心跳 value 的两层解码/解压与反序列化(需要对端样本/算法确认) +- [x] Processor:实现 Kafka 心跳 value 的两层解码/解压与反序列化(需要对端样本/算法确认) ## 2. Validation - [x] 在可访问的 PostgreSQL 环境执行脚本并验证对象创建成功 diff --git a/openspec/changes/update-heartbeat-db-v2/specs/processor/spec.md b/openspec/changes/update-heartbeat-db-v2/specs/processor/spec.md deleted file mode 100644 index c5ff9fa..0000000 --- a/openspec/changes/update-heartbeat-db-v2/specs/processor/spec.md +++ /dev/null @@ -1,21 +0,0 @@ -## MODIFIED Requirements - -### Requirement: Kafka 心跳消息解码/解压 -系统 MUST 能够将 Kafka 消息 value 解码为 JSON 对象或数组,并支持两层以内的编码/压缩组合。 - -#### Scenario: 两层以内解码成功 -- **WHEN** Kafka 消息 value 为 UTF-8 JSON、base64(二进制)、或 gzip/deflate/raw-deflate/brotli 压缩二进制 -- **THEN** 系统应按“最多两层”策略尝试解码/解压 -- **AND** 成功时得到 JSON 对象或数组 - -#### Scenario: 解码失败 -- **WHEN** Kafka 消息 value 无法被解码/解压为 JSON -- **THEN** 系统应记录错误并丢弃该消息 - -### Requirement: 心跳数据转换 -系统 MUST 能够将解包后的心跳数据转换为数据库存储格式。 - -#### Scenario: 转换为 v2 明细表字段 -- **WHEN** 心跳数据验证通过时 -- **THEN** 系统应输出与 v2 明细表字段一致的数据结构 -- **AND** 缺失必填字段时应判定为无效数据并丢弃 diff --git a/openspec/specs/db/spec.md b/openspec/specs/db/spec.md index ff3d003..b8f4032 100644 --- a/openspec/specs/db/spec.md +++ b/openspec/specs/db/spec.md @@ -2,9 +2,7 @@ ## Purpose 本规范定义本服务对 PostgreSQL 的连接池配置、表结构初始化(含分区表)、分区预创建维护策略、批量写入与约束错误处理等行为。 - ## Requirements - ### Requirement: 数据库连接管理 系统 MUST 能够建立和维护与 PostgreSQL 数据库的连接。 @@ -48,21 +46,17 @@ ### Requirement: 数据库表结构管理 系统 MUST 提供数据库表结构的定义和管理机制。 -#### Scenario: 表结构初始化 -- **WHEN** 系统首次启动时 -- **THEN** 系统应该检查数据库表是否存在 -- **AND** 不存在时应该创建表结构 +#### Scenario: 表结构初始化(高吞吐分区表) +- **WHEN** 系统首次启动或部署数据库时 +- **THEN** 应该存在按 `ts_ms` 日分区的心跳明细表 +- **AND** 必填字段应具备 NOT NULL 约束 +- **AND** 状态类字段应具备 CHECK 约束(限制取值范围) +- **AND** 必需索引应存在(hotel_id/power_state/guest_type/device_id B-tree;service_mask BRIN) -#### Scenario: 分区预创建(无人值守) -- **WHEN** 系统启动完成数据库初始化后 -- **THEN** 系统应该预创建未来一段时间(例如未来 30 天)的日分区 -- **AND** 系统应该周期性执行该预创建以保证长期运行不中断 -- **AND** 当分区预创建失败时应记录错误日志 - -#### Scenario: 表结构迁移 -- **WHEN** 表结构需要变更时 -- **THEN** 系统应该支持平滑的表结构迁移 -- **AND** 不影响现有数据 +#### Scenario: 自动分区 +- **WHEN** 写入某天数据而该日分区不存在 +- **THEN** 系统应能够自动创建对应日分区或确保分区被预创建 +- **AND** 不应影响持续写入(高吞吐场景) ### Requirement: 数据查询支持 系统 MUST 支持基本的数据查询操作,用于监控和调试。 @@ -76,3 +70,12 @@ - **WHEN** 需要按特定条件查询心跳数据时 - **THEN** 系统应该支持条件过滤 - **AND** 返回符合条件的数据 + +### Requirement: 高吞吐写入友好 +系统 MUST 在高吞吐场景(约 5 万条/分钟量级)下避免单点瓶颈。 + +#### Scenario: 批量写入与分区裁剪 +- **WHEN** 进行批量写入 +- **THEN** 写入应路由到正确日分区 +- **AND** 常见查询(hotel_id + 时间范围)应触发分区裁剪 + diff --git a/openspec/specs/kafka/spec.md b/openspec/specs/kafka/spec.md index c4f46b5..b77d596 100644 --- a/openspec/specs/kafka/spec.md +++ b/openspec/specs/kafka/spec.md @@ -2,9 +2,7 @@ ## Purpose 本规范定义本服务如何连接 Kafka 集群、订阅主题并消费消息(以 buffer 形式透传 payload),以及错误处理/重连与消费确认语义。 - ## Requirements - ### Requirement: Kafka连接管理 系统 MUST 能够建立和维护与 Kafka 集群的连接。 @@ -46,3 +44,25 @@ #### Scenario: 路由有效消息 - **WHEN** 接收到有效格式的心跳消息时 - **THEN** 系统应该将消息路由到正确的处理器 + +### Requirement: 心跳消息载荷格式(生产者约束) +Kafka 心跳消息 MUST 包含数据库落库所需的必填字段,并采用 UTF-8 编码。 + +#### Scenario: JSON 心跳消息 +- **WHEN** 生产者向主题推送心跳消息 +- **THEN** 消息 value 应为 JSON(UTF-8) +- **AND** 至少包含 ts_ms、hotel_id、room_id、device_id、ip、power_state、guest_type、cardless_state、service_mask、pms_state、carbon_state、device_count、comm_seq +- **AND** 可选包含 extra(json object) + +### Requirement: 分区键友好的 Kafka Key +系统 MUST 支持使用 `hotel_id:device_id` 作为 Kafka message key 以获得更好的分区与有序性。 + +#### Scenario: 缺失 key 仍可处理 +- **WHEN** 消息未携带 key +- **THEN** 系统仍应能够消费与处理该消息 + +#### Scenario: 使用 device_id 作为 key +- **WHEN** 生产者发送消息 +- **THEN** 建议使用 `hotel_id:device_id` 作为 Kafka message key +- **AND** 以提升同设备有序性与消费侧批量聚合效率 + diff --git a/openspec/specs/processor/spec.md b/openspec/specs/processor/spec.md index b525e28..5802f90 100644 --- a/openspec/specs/processor/spec.md +++ b/openspec/specs/processor/spec.md @@ -2,11 +2,9 @@ ## Purpose 本规范定义心跳处理器对 Kafka 消息 value 的解码/解压(含两层以内组合)、字段校验、转换为分区表写入结构,以及批量写库与失败丢弃/记录策略。 - ## Requirements - ### Requirement: 心跳数据解包 -系统 MUST 能够解包 Kafka 消息中的心跳数据。 +系统 MUST 能够将 Kafka 消息 value 解码/解压并还原为 JSON 对象或数组,支持两层以内的编码/压缩组合。 #### Scenario: 支持常见编码/压缩(两层以内) - **WHEN** Kafka 消息 value 为下列任意形式时: @@ -48,11 +46,15 @@ ### Requirement: 心跳数据转换 系统 MUST 能够将解包后的心跳数据转换为数据库存储格式。 -#### Scenario: 转换心跳数据格式 +#### Scenario: 转换为 v2 明细表字段 - **WHEN** 心跳数据验证通过时 -- **THEN** 系统应该将数据转换为数据库表结构所需的格式 +- **THEN** 系统应输出与 v2 明细表字段一致的数据结构 - **AND** 添加必要的元数据 +#### Scenario: 缺失必填字段 +- **WHEN** 心跳数据缺失必填字段时 +- **THEN** 系统应判定为无效数据并丢弃 + ### Requirement: 批量处理支持 系统 MUST 支持批量处理心跳数据,提高处理效率。 @@ -64,3 +66,4 @@ #### Scenario: Kafka 单条消息携带批量心跳 - **WHEN** Kafka 消息 value 为 JSON 数组(批量心跳) - **THEN** 系统应将数组内每条心跳作为独立项进入批处理队列 +