Files
Web_BLS_Heartbeat_Server/docs/kafka-heartbeat-producer.md
XuJiacheng ad270bd936 feat(heartbeat): 添加版本号字段并处理亮度值-1为NULL
- 在心跳事件表中新增 version 字段,用于存储版本号信息
- 将 bright_g 字段的 -1 值映射为数据库中的 NULL,避免语义混淆
- 更新相关文档、数据库迁移脚本和测试用例
2026-01-28 17:47:05 +08:00

4.4 KiB
Raw Permalink Blame History

Kafka 心跳数据推送说明(给数据产生者)

本文档说明数据产生者需要往 Kafka 队列推送的数据结构与推送方式。

1. Topic 与编码

  • Topic默认 blwlog4Nodejs-rcu-heartbeat-topic(以服务端配置为准,见 src/config/config.js
  • 编码UTF-8
  • 建议消息格式JSON便于跨语言对接与灰度演进

服务端会以 buffer 方式接收 Kafka message.value并按 UTF-8 解码为 JSON。

2. 消息 Key强烈建议

为了保证同设备消息更有序、便于消费端批量聚合:

  • Kafka message key"{hotel_id}:{device_id}"

3. 消息 ValueJSON

3.1 必填字段

下面字段必须提供(否则会被判定为无效数据并丢弃/记录错误):

字段 类型 示例 说明
ts_ms number/int64 1700000000123 毫秒级 epoch 时间戳
hotel_id number/int 12 酒店编号int2 范围内)
room_id string/number "1203" 房间编号/房间标识(服务端会归一化为字符串;落库为 varchar(50)
device_id string "A1B2C3D4" 设备唯一 ID序列号/MAC/自定义编码)
ip string "192.168.1.10:8080" IP:PORT 字符串(落库为 varchar(21)
power_state number/int 1 取电状态(枚举值需统一标准)
guest_type number/int 0 住客身份(住客/空房/保洁/维修等,枚举值需统一标准)
cardless_state number/int 0 无卡取电/无卡策略状态(枚举)
service_mask number/int64 5 服务/场景位图bigint
pms_state number/int 1 PMS 状态(枚举)
carbon_state number/int 0 碳控状态(枚举)
device_count number/int 1 设备数量/上报设备数(语义需统一)
comm_seq number/int 7 通讯序号(语义需统一)

3.2 可选字段

字段 类型 示例 说明
extra object {"source":"gw","ver":"1.2.3"} 扩展字段:原文、版本等其他自定义字段
electricity array [{"address":"add11","voltage":3.2,...}] 电力设备数组(按原始顺序拆列落库为数组列)
air_conditioner array [{"address":"ac1","state":1,...}] 空调设备数组(按原始顺序拆列落库为数组列)
insert_card number/int 1 是否插卡(整数,可为空)
bright_g number/int 80 全局亮度值(整数,可为空;若为 -1 则落库为 null
version number/int 1 版本号int2/short可为空

4. JSON 示例

{
  "ts_ms": 1700000000123,
  "hotel_id": 12,
  "room_id": 1203,
  "device_id": "A1B2C3D4",
  "ip": "192.168.1.10",
  "power_state": 1,
  "guest_type": 0,
  "cardless_state": 0,
  "service_mask": 5,
  "pms_state": 1,
  "carbon_state": 0,
  "device_count": 1,
  "comm_seq": 7,
  "insert_card": 1,
  "bright_g": 80,
  "version": "1.3.0",
  "electricity": [
    {
      "address": "add11",
      "voltage": 3.2,
      "ampere": 1.1,
      "power": 704.3,
      "phase": "A",
      "energy": 10.5,
      "sum_energy": 100.5
    }
  ],
  "air_conditioner": [
    {
      "address": "ac1",
      "state": 1,
      "model": 2,
      "speed": 3,
      "set_temp": 26,
      "now_temp": 25,
      "solenoid_valve": 1
    }
  ],
  "extra": {
    "ver": "1.2.3",
    "original_byte": "0x12345678"
  }
}

  ## 4.1 C# 生产者示例(与你们当前实现一致)
  ```csharp
  var nas = JsonConvert.SerializeObject(s);
  var data = Encoding.UTF8.GetBytes(nas);
  // 将 data 作为 Kafka message.value 发送即可

## 5. 推送方式(实现建议)
- Producer建议开启压缩lz4/zstd、合理的 `batch.size` 与 `linger.ms`,以降低单条发送开销
- 分区:按 key 分区(同设备落同分区)
- 语义至少一次at-least-once或恰好一次exactly-once取决于你们链路要求服务端需要配合幂等/去重策略(如后续引入唯一键)

## 6. 与数据库字段的映射
服务端落库目标表:`heartbeat.heartbeat_events`(位于既有数据库中,默认 log_platform
- 必填字段:与表字段同名
- 扩展数组字段:
  - electricity[] → elec_address[]、voltage[]、ampere[]、power[]、phase[]、energy[]、sum_energy[]
  - air_conditioner[] → air_address[]、state[]、model[]、speed[]、set_temp[]、now_temp[]、solenoid_valve[]
- 弹性字段:其余未知字段写入 `extra`jsonb