Files
Web_BLS_Heartbeat_Server/docs/kafka-heartbeat-producer.md
XuJiacheng 910f1c353f feat: 实现Redis集成与Kafka消息处理优化
- 新增Redis集成模块,支持心跳写入与控制台日志队列
- 优化Kafka消费者实现,支持多实例与自动重连
- 改进消息处理器,支持批量处理与多层解码
- 更新数据库表结构,调整字段类型与约束
- 添加Redis与Kafka的配置项和环境变量支持
- 补充测试用例和文档说明
2026-01-14 17:58:45 +08:00

82 lines
3.2 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# Kafka 心跳数据推送说明(给数据产生者)
本文档说明数据产生者需要往 Kafka 队列推送的数据结构与推送方式。
## 1. Topic 与编码
- Topic默认 `blwlog4Nodejs-rcu-heartbeat-topic`(以服务端配置为准,见 `src/config/config.js`
- 编码UTF-8
- 建议消息格式JSON便于跨语言对接与灰度演进
> 服务端会以 buffer 方式接收 Kafka message.value并按 UTF-8 解码为 JSON。
## 2. 消息 Key强烈建议
为了保证同设备消息更有序、便于消费端批量聚合:
- Kafka message key`"{hotel_id}:{device_id}"`
## 3. 消息 ValueJSON
### 3.1 必填字段
下面字段必须提供(否则会被判定为无效数据并丢弃/记录错误):
| 字段 | 类型 | 示例 | 说明 |
|---|---|---|---|
| ts_ms | number/int64 | 1700000000123 | 毫秒级 epoch 时间戳 |
| hotel_id | number/int | 12 | 酒店编号int2 范围内) |
| room_id | number/int | 1203 | 房间编号/房间标识int4 |
| device_id | string | "A1B2C3D4" | 设备唯一 ID序列号/MAC/自定义编码) |
| ip | string | "192.168.1.10:8080" | `IP:PORT` 字符串(落库为 varchar(21) |
| power_state | number/int | 1 | 取电状态(枚举值需统一标准) |
| guest_type | number/int | 0 | 住客身份(住客/空房/保洁/维修等,枚举值需统一标准) |
| cardless_state | number/int | 0 | 无卡取电/无卡策略状态(枚举) |
| service_mask | number/int64 | 5 | 服务/场景位图bigint |
| pms_state | number/int | 1 | PMS 状态(枚举) |
| carbon_state | number/int | 0 | 碳控状态(枚举) |
| device_count | number/int | 1 | 设备数量/上报设备数(语义需统一) |
| comm_seq | number/int | 7 | 通讯序号(语义需统一) |
### 3.2 可选字段
| 字段 | 类型 | 示例 | 说明 |
|---|---|---|---|
| extra | object | {"source":"gw","ver":"1.2.3"} | 扩展字段:电参、空调状态、版本、上报来源等 |
## 4. JSON 示例
```json
{
"ts_ms": 1700000000123,
"hotel_id": 12,
"room_id": 1203,
"device_id": "A1B2C3D4",
"ip": "192.168.1.10",
"power_state": 1,
"guest_type": 0,
"cardless_state": 0,
"service_mask": 5,
"pms_state": 1,
"carbon_state": 0,
"device_count": 1,
"comm_seq": 7,
"extra": {
"source": "gw",
"ver": "1.2.3",
"ac": {"mode": 1, "set_temp": 26},
"meter": {"p": 123.4, "e_wh": 5678}
}
}
## 4.1 C#
```csharp
var nas = JsonConvert.SerializeObject(s);
var data = Encoding.UTF8.GetBytes(nas);
// 将 data 作为 Kafka message.value 发送即可
```
```
## 5. 推送方式(实现建议)
- Producer建议开启压缩lz4/zstd、合理的 `batch.size``linger.ms`,以降低单条发送开销
- 分区:按 key 分区(同设备落同分区)
- 语义至少一次at-least-once或恰好一次exactly-once取决于你们链路要求服务端需要配合幂等/去重策略(如后续引入唯一键)
## 6. 与数据库字段的映射
服务端落库目标表:`heartbeat.heartbeat_events`(位于既有数据库中,默认 log_platform
- 必填字段:与表字段同名
- 弹性字段:写入 `extra`jsonb