Files
Web_BLS_Heartbeat_Server/docs/kafka-heartbeat-producer.md
XuJiacheng ad270bd936 feat(heartbeat): 添加版本号字段并处理亮度值-1为NULL
- 在心跳事件表中新增 version 字段,用于存储版本号信息
- 将 bright_g 字段的 -1 值映射为数据库中的 NULL,避免语义混淆
- 更新相关文档、数据库迁移脚本和测试用例
2026-01-28 17:47:05 +08:00

113 lines
4.4 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# Kafka 心跳数据推送说明(给数据产生者)
本文档说明数据产生者需要往 Kafka 队列推送的数据结构与推送方式。
## 1. Topic 与编码
- Topic默认 `blwlog4Nodejs-rcu-heartbeat-topic`(以服务端配置为准,见 `src/config/config.js`
- 编码UTF-8
- 建议消息格式JSON便于跨语言对接与灰度演进
> 服务端会以 buffer 方式接收 Kafka message.value并按 UTF-8 解码为 JSON。
## 2. 消息 Key强烈建议
为了保证同设备消息更有序、便于消费端批量聚合:
- Kafka message key`"{hotel_id}:{device_id}"`
## 3. 消息 ValueJSON
### 3.1 必填字段
下面字段必须提供(否则会被判定为无效数据并丢弃/记录错误):
| 字段 | 类型 | 示例 | 说明 |
|---|---|---|---|
| ts_ms | number/int64 | 1700000000123 | 毫秒级 epoch 时间戳 |
| hotel_id | number/int | 12 | 酒店编号int2 范围内) |
| room_id | string/number | "1203" | 房间编号/房间标识(服务端会归一化为字符串;落库为 varchar(50) |
| device_id | string | "A1B2C3D4" | 设备唯一 ID序列号/MAC/自定义编码) |
| ip | string | "192.168.1.10:8080" | `IP:PORT` 字符串(落库为 varchar(21) |
| power_state | number/int | 1 | 取电状态(枚举值需统一标准) |
| guest_type | number/int | 0 | 住客身份(住客/空房/保洁/维修等,枚举值需统一标准) |
| cardless_state | number/int | 0 | 无卡取电/无卡策略状态(枚举) |
| service_mask | number/int64 | 5 | 服务/场景位图bigint |
| pms_state | number/int | 1 | PMS 状态(枚举) |
| carbon_state | number/int | 0 | 碳控状态(枚举) |
| device_count | number/int | 1 | 设备数量/上报设备数(语义需统一) |
| comm_seq | number/int | 7 | 通讯序号(语义需统一) |
### 3.2 可选字段
| 字段 | 类型 | 示例 | 说明 |
|---|---|---|---|
| extra | object | {"source":"gw","ver":"1.2.3"} | 扩展字段:原文、版本等其他自定义字段 |
| electricity | array<object> | [{"address":"add11","voltage":3.2,...}] | 电力设备数组(按原始顺序拆列落库为数组列) |
| air_conditioner | array<object> | [{"address":"ac1","state":1,...}] | 空调设备数组(按原始顺序拆列落库为数组列) |
| insert_card | number/int | 1 | 是否插卡(整数,可为空) |
| bright_g | number/int | 80 | 全局亮度值(整数,可为空;若为 -1 则落库为 null |
| version | number/int | 1 | 版本号int2/short可为空 |
## 4. JSON 示例
```json
{
"ts_ms": 1700000000123,
"hotel_id": 12,
"room_id": 1203,
"device_id": "A1B2C3D4",
"ip": "192.168.1.10",
"power_state": 1,
"guest_type": 0,
"cardless_state": 0,
"service_mask": 5,
"pms_state": 1,
"carbon_state": 0,
"device_count": 1,
"comm_seq": 7,
"insert_card": 1,
"bright_g": 80,
"version": "1.3.0",
"electricity": [
{
"address": "add11",
"voltage": 3.2,
"ampere": 1.1,
"power": 704.3,
"phase": "A",
"energy": 10.5,
"sum_energy": 100.5
}
],
"air_conditioner": [
{
"address": "ac1",
"state": 1,
"model": 2,
"speed": 3,
"set_temp": 26,
"now_temp": 25,
"solenoid_valve": 1
}
],
"extra": {
"ver": "1.2.3",
"original_byte": "0x12345678"
}
}
## 4.1 C#
```csharp
var nas = JsonConvert.SerializeObject(s);
var data = Encoding.UTF8.GetBytes(nas);
// 将 data 作为 Kafka message.value 发送即可
```
```
## 5. 推送方式(实现建议)
- Producer建议开启压缩lz4/zstd、合理的 `batch.size``linger.ms`,以降低单条发送开销
- 分区:按 key 分区(同设备落同分区)
- 语义至少一次at-least-once或恰好一次exactly-once取决于你们链路要求服务端需要配合幂等/去重策略(如后续引入唯一键)
## 6. 与数据库字段的映射
服务端落库目标表:`heartbeat.heartbeat_events`(位于既有数据库中,默认 log_platform
- 必填字段:与表字段同名
- 扩展数组字段:
- electricity[] → elec_address[]、voltage[]、ampere[]、power[]、phase[]、energy[]、sum_energy[]
- air_conditioner[] → air_address[]、state[]、model[]、speed[]、set_temp[]、now_temp[]、solenoid_valve[]
- 弹性字段:其余未知字段写入 `extra`jsonb