# Kafka消息处理规范 ## Purpose 本规范定义本服务如何连接 Kafka 集群、订阅主题并消费消息(以 buffer 形式透传 payload),以及错误处理/重连与消费确认语义。 ## Requirements ### Requirement: Kafka连接管理 系统 MUST 能够建立和维护与 Kafka 集群的连接。 #### Scenario: 成功连接Kafka集群 - **WHEN** 系统启动时 - **THEN** 应该成功连接到配置的Kafka集群 - **AND** 应该监控连接状态 #### Scenario: Kafka连接断开重连 - **WHEN** Kafka连接断开时 - **THEN** 系统应该自动尝试重连 - **AND** 重连失败时应该记录错误日志 ### Requirement: 心跳消息消费 系统 MUST 能够消费 Kafka 队列中的心跳消息。 #### Scenario: 消费心跳消息 - **WHEN** Kafka队列中有心跳消息时 - **THEN** 系统应该消费该消息 - **AND** 将消息传递给处理器进行解包 #### Scenario: 二进制 payload 透传 - **WHEN** Kafka 消息 value 可能为二进制压缩数据(非纯文本) - **THEN** Consumer 应使用 buffer 方式接收 value/key - **AND** 将原始 buffer 交由 Processor 执行解码/解压与反序列化 #### Scenario: 消息消费确认 - **WHEN** 消息处理完成后 - **THEN** 系统应该向Kafka确认消息已消费 ### Requirement: 消息过滤与路由 系统 MUST 能够根据消息类型过滤和路由心跳消息。 #### Scenario: 过滤无效消息 - **WHEN** 接收到无效格式的消息时 - **THEN** 系统应该丢弃该消息 - **AND** 记录错误日志 #### Scenario: 路由有效消息 - **WHEN** 接收到有效格式的心跳消息时 - **THEN** 系统应该将消息路由到正确的处理器 ### Requirement: 心跳消息载荷格式(生产者约束) Kafka 心跳消息 MUST 包含数据库落库所需的必填字段,并采用 UTF-8 编码。 #### Scenario: JSON 心跳消息 - **WHEN** 生产者向主题推送心跳消息 - **THEN** 消息 value 应为 JSON(UTF-8) - **AND** 至少包含 ts_ms、hotel_id、room_id、device_id、ip、power_state、guest_type、cardless_state、service_mask、pms_state、carbon_state、device_count、comm_seq - **AND** 可选包含 extra(json object) ### Requirement: 分区键友好的 Kafka Key 系统 MUST 支持使用 `hotel_id:device_id` 作为 Kafka message key 以获得更好的分区与有序性。 #### Scenario: 缺失 key 仍可处理 - **WHEN** 消息未携带 key - **THEN** 系统仍应能够消费与处理该消息 #### Scenario: 使用 device_id 作为 key - **WHEN** 生产者发送消息 - **THEN** 建议使用 `hotel_id:device_id` 作为 Kafka message key - **AND** 以提升同设备有序性与消费侧批量聚合效率