# Kafka 心跳数据推送说明(给数据产生者) 本文档说明数据产生者需要往 Kafka 队列推送的数据结构与推送方式。 ## 1. Topic 与编码 - Topic:默认 `blwlog4Nodejs-rcu-heartbeat-topic`(以服务端配置为准,见 `src/config/config.js`) - 编码:UTF-8 - 建议消息格式:JSON(便于跨语言对接与灰度演进) > 服务端会以 buffer 方式接收 Kafka message.value,并按 UTF-8 解码为 JSON。 ## 2. 消息 Key(强烈建议) 为了保证同设备消息更有序、便于消费端批量聚合: - Kafka message key:`"{hotel_id}:{device_id}"` ## 3. 消息 Value(JSON) ### 3.1 必填字段 下面字段必须提供(否则会被判定为无效数据并丢弃/记录错误): | 字段 | 类型 | 示例 | 说明 | |---|---|---|---| | ts_ms | number/int64 | 1700000000123 | 毫秒级 epoch 时间戳 | | hotel_id | number/int | 12 | 酒店编号(int2 范围内) | | room_id | string/number | "1203" | 房间编号/房间标识(服务端会归一化为字符串;落库为 varchar(50)) | | device_id | string | "A1B2C3D4" | 设备唯一 ID(序列号/MAC/自定义编码) | | ip | string | "192.168.1.10:8080" | `IP:PORT` 字符串(落库为 varchar(21)) | | power_state | number/int | 1 | 取电状态(枚举值需统一标准) | | guest_type | number/int | 0 | 住客身份(住客/空房/保洁/维修等,枚举值需统一标准) | | cardless_state | number/int | 0 | 无卡取电/无卡策略状态(枚举) | | service_mask | number/int64 | 5 | 服务/场景位图(bigint) | | pms_state | number/int | 1 | PMS 状态(枚举) | | carbon_state | number/int | 0 | 碳控状态(枚举) | | device_count | number/int | 1 | 设备数量/上报设备数(语义需统一) | | comm_seq | number/int | 7 | 通讯序号(语义需统一) | ### 3.2 可选字段 | 字段 | 类型 | 示例 | 说明 | |---|---|---|---| | extra | object | {"source":"gw","ver":"1.2.3"} | 扩展字段:电参、空调状态、版本、上报来源等 | | electricity | array | [{"address":"add11","voltage":3.2,...}] | 电力设备数组(按原始顺序拆列落库为数组列) | | air_conditioner | array | [{"address":"ac1","state":1,...}] | 空调设备数组(按原始顺序拆列落库为数组列) | ## 4. JSON 示例 ```json { "ts_ms": 1700000000123, "hotel_id": 12, "room_id": 1203, "device_id": "A1B2C3D4", "ip": "192.168.1.10", "power_state": 1, "guest_type": 0, "cardless_state": 0, "service_mask": 5, "pms_state": 1, "carbon_state": 0, "device_count": 1, "comm_seq": 7, "electricity": [ { "address": "add11", "voltage": 3.2, "ampere": 1.1, "power": 704.3, "phase": "A", "energy": 10.5, "sum_energy": 100.5 } ], "air_conditioner": [ { "address": "ac1", "state": 1, "model": 2, "speed": 3, "set_temp": 26, "now_temp": 25, "solenoid_valve": 1 } ], "extra": { "source": "gw", "ver": "1.2.3", "ac": {"mode": 1, "set_temp": 26}, "meter": {"p": 123.4, "e_wh": 5678} } } ## 4.1 C# 生产者示例(与你们当前实现一致) ```csharp var nas = JsonConvert.SerializeObject(s); var data = Encoding.UTF8.GetBytes(nas); // 将 data 作为 Kafka message.value 发送即可 ``` ``` ## 5. 推送方式(实现建议) - Producer:建议开启压缩(lz4/zstd)、合理的 `batch.size` 与 `linger.ms`,以降低单条发送开销 - 分区:按 key 分区(同设备落同分区) - 语义:至少一次(at-least-once)或恰好一次(exactly-once)取决于你们链路要求;服务端需要配合幂等/去重策略(如后续引入唯一键) ## 6. 与数据库字段的映射 服务端落库目标表:`heartbeat.heartbeat_events`(位于既有数据库中,默认 log_platform) - 必填字段:与表字段同名 - 扩展数组字段: - electricity[] → elec_address[]、voltage[]、ampere[]、power[]、phase[]、energy[]、sum_energy[] - air_conditioner[] → air_address[]、state[]、model[]、speed[]、set_temp[]、now_temp[]、solenoid_valve[] - 弹性字段:其余未知字段写入 `extra`(jsonb)