2026-01-12 19:53:27 +08:00
|
|
|
|
# Kafka 心跳数据推送说明(给数据产生者)
|
|
|
|
|
|
|
|
|
|
|
|
本文档说明数据产生者需要往 Kafka 队列推送的数据结构与推送方式。
|
|
|
|
|
|
|
|
|
|
|
|
## 1. Topic 与编码
|
2026-01-14 17:58:45 +08:00
|
|
|
|
- Topic:默认 `blwlog4Nodejs-rcu-heartbeat-topic`(以服务端配置为准,见 `src/config/config.js`)
|
2026-01-12 19:53:27 +08:00
|
|
|
|
- 编码:UTF-8
|
|
|
|
|
|
- 建议消息格式:JSON(便于跨语言对接与灰度演进)
|
|
|
|
|
|
|
2026-01-14 17:58:45 +08:00
|
|
|
|
> 服务端会以 buffer 方式接收 Kafka message.value,并按 UTF-8 解码为 JSON。
|
2026-01-12 19:53:27 +08:00
|
|
|
|
|
|
|
|
|
|
## 2. 消息 Key(强烈建议)
|
|
|
|
|
|
为了保证同设备消息更有序、便于消费端批量聚合:
|
|
|
|
|
|
- Kafka message key:`"{hotel_id}:{device_id}"`
|
|
|
|
|
|
|
|
|
|
|
|
## 3. 消息 Value(JSON)
|
|
|
|
|
|
### 3.1 必填字段
|
|
|
|
|
|
下面字段必须提供(否则会被判定为无效数据并丢弃/记录错误):
|
|
|
|
|
|
|
|
|
|
|
|
| 字段 | 类型 | 示例 | 说明 |
|
|
|
|
|
|
|---|---|---|---|
|
|
|
|
|
|
| ts_ms | number/int64 | 1700000000123 | 毫秒级 epoch 时间戳 |
|
|
|
|
|
|
| hotel_id | number/int | 12 | 酒店编号(int2 范围内) |
|
2026-01-16 14:45:36 +08:00
|
|
|
|
| room_id | string/number | "1203" | 房间编号/房间标识(服务端会归一化为字符串;落库为 varchar(50)) |
|
2026-01-12 19:53:27 +08:00
|
|
|
|
| device_id | string | "A1B2C3D4" | 设备唯一 ID(序列号/MAC/自定义编码) |
|
2026-01-14 17:58:45 +08:00
|
|
|
|
| ip | string | "192.168.1.10:8080" | `IP:PORT` 字符串(落库为 varchar(21)) |
|
2026-01-12 19:53:27 +08:00
|
|
|
|
| power_state | number/int | 1 | 取电状态(枚举值需统一标准) |
|
|
|
|
|
|
| guest_type | number/int | 0 | 住客身份(住客/空房/保洁/维修等,枚举值需统一标准) |
|
|
|
|
|
|
| cardless_state | number/int | 0 | 无卡取电/无卡策略状态(枚举) |
|
|
|
|
|
|
| service_mask | number/int64 | 5 | 服务/场景位图(bigint) |
|
|
|
|
|
|
| pms_state | number/int | 1 | PMS 状态(枚举) |
|
|
|
|
|
|
| carbon_state | number/int | 0 | 碳控状态(枚举) |
|
|
|
|
|
|
| device_count | number/int | 1 | 设备数量/上报设备数(语义需统一) |
|
|
|
|
|
|
| comm_seq | number/int | 7 | 通讯序号(语义需统一) |
|
|
|
|
|
|
|
|
|
|
|
|
### 3.2 可选字段
|
|
|
|
|
|
| 字段 | 类型 | 示例 | 说明 |
|
|
|
|
|
|
|---|---|---|---|
|
2026-01-17 18:37:44 +08:00
|
|
|
|
| extra | object | {"source":"gw","ver":"1.2.3"} | 扩展字段:原文、版本等其他自定义字段 |
|
2026-01-16 14:45:36 +08:00
|
|
|
|
| electricity | array<object> | [{"address":"add11","voltage":3.2,...}] | 电力设备数组(按原始顺序拆列落库为数组列) |
|
|
|
|
|
|
| air_conditioner | array<object> | [{"address":"ac1","state":1,...}] | 空调设备数组(按原始顺序拆列落库为数组列) |
|
2026-01-27 19:49:05 +08:00
|
|
|
|
| insert_card | number/int | 1 | 是否插卡(整数,可为空) |
|
2026-01-28 17:47:05 +08:00
|
|
|
|
| bright_g | number/int | 80 | 全局亮度值(整数,可为空;若为 -1 则落库为 null) |
|
|
|
|
|
|
| version | number/int | 1 | 版本号(int2/short,可为空) |
|
2026-01-12 19:53:27 +08:00
|
|
|
|
|
|
|
|
|
|
## 4. JSON 示例
|
|
|
|
|
|
```json
|
|
|
|
|
|
{
|
|
|
|
|
|
"ts_ms": 1700000000123,
|
|
|
|
|
|
"hotel_id": 12,
|
|
|
|
|
|
"room_id": 1203,
|
|
|
|
|
|
"device_id": "A1B2C3D4",
|
|
|
|
|
|
"ip": "192.168.1.10",
|
|
|
|
|
|
"power_state": 1,
|
|
|
|
|
|
"guest_type": 0,
|
|
|
|
|
|
"cardless_state": 0,
|
|
|
|
|
|
"service_mask": 5,
|
|
|
|
|
|
"pms_state": 1,
|
|
|
|
|
|
"carbon_state": 0,
|
|
|
|
|
|
"device_count": 1,
|
|
|
|
|
|
"comm_seq": 7,
|
2026-01-27 19:49:05 +08:00
|
|
|
|
"insert_card": 1,
|
|
|
|
|
|
"bright_g": 80,
|
2026-01-28 17:47:05 +08:00
|
|
|
|
"version": "1.3.0",
|
2026-01-16 14:45:36 +08:00
|
|
|
|
"electricity": [
|
|
|
|
|
|
{
|
|
|
|
|
|
"address": "add11",
|
|
|
|
|
|
"voltage": 3.2,
|
|
|
|
|
|
"ampere": 1.1,
|
|
|
|
|
|
"power": 704.3,
|
|
|
|
|
|
"phase": "A",
|
|
|
|
|
|
"energy": 10.5,
|
|
|
|
|
|
"sum_energy": 100.5
|
|
|
|
|
|
}
|
|
|
|
|
|
],
|
|
|
|
|
|
"air_conditioner": [
|
|
|
|
|
|
{
|
|
|
|
|
|
"address": "ac1",
|
|
|
|
|
|
"state": 1,
|
|
|
|
|
|
"model": 2,
|
|
|
|
|
|
"speed": 3,
|
|
|
|
|
|
"set_temp": 26,
|
|
|
|
|
|
"now_temp": 25,
|
|
|
|
|
|
"solenoid_valve": 1
|
|
|
|
|
|
}
|
|
|
|
|
|
],
|
2026-01-12 19:53:27 +08:00
|
|
|
|
"extra": {
|
|
|
|
|
|
"ver": "1.2.3",
|
2026-01-17 18:37:44 +08:00
|
|
|
|
"original_byte": "0x12345678"
|
2026-01-12 19:53:27 +08:00
|
|
|
|
}
|
|
|
|
|
|
}
|
2026-01-14 17:58:45 +08:00
|
|
|
|
|
|
|
|
|
|
## 4.1 C# 生产者示例(与你们当前实现一致)
|
|
|
|
|
|
```csharp
|
|
|
|
|
|
var nas = JsonConvert.SerializeObject(s);
|
|
|
|
|
|
var data = Encoding.UTF8.GetBytes(nas);
|
|
|
|
|
|
// 将 data 作为 Kafka message.value 发送即可
|
|
|
|
|
|
```
|
2026-01-12 19:53:27 +08:00
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
|
|
## 5. 推送方式(实现建议)
|
|
|
|
|
|
- Producer:建议开启压缩(lz4/zstd)、合理的 `batch.size` 与 `linger.ms`,以降低单条发送开销
|
|
|
|
|
|
- 分区:按 key 分区(同设备落同分区)
|
|
|
|
|
|
- 语义:至少一次(at-least-once)或恰好一次(exactly-once)取决于你们链路要求;服务端需要配合幂等/去重策略(如后续引入唯一键)
|
|
|
|
|
|
|
|
|
|
|
|
## 6. 与数据库字段的映射
|
|
|
|
|
|
服务端落库目标表:`heartbeat.heartbeat_events`(位于既有数据库中,默认 log_platform)
|
|
|
|
|
|
- 必填字段:与表字段同名
|
2026-01-16 14:45:36 +08:00
|
|
|
|
- 扩展数组字段:
|
|
|
|
|
|
- electricity[] → elec_address[]、voltage[]、ampere[]、power[]、phase[]、energy[]、sum_energy[]
|
|
|
|
|
|
- air_conditioner[] → air_address[]、state[]、model[]、speed[]、set_temp[]、now_temp[]、solenoid_valve[]
|
|
|
|
|
|
- 弹性字段:其余未知字段写入 `extra`(jsonb)
|