添加 Kafka 消费者、数据库写入、Redis 集成等核心模块,实现设备上下线事件处理 - 创建项目基础目录结构与配置文件 - 实现 Kafka 消费逻辑与手动提交偏移量 - 添加 PostgreSQL 数据库连接与分区表管理 - 集成 Redis 用于错误队列和项目心跳 - 包含数据处理逻辑,区分重启与非重启数据 - 提供数据库初始化脚本与分区创建工具 - 添加单元测试与代码校验脚本
45 lines
2.4 KiB
Markdown
45 lines
2.4 KiB
Markdown
1. Kafka 数据结构
|
||
{
|
||
"HotelCode": "1085", // 酒店编码
|
||
"MAC": "00:1A:2B:3C:4D:5E", // MAC地址
|
||
"HostNumber": "091123987456", // 设备编号
|
||
"RoomNumber": "8888房", // 房号
|
||
"EndPoint": "50.2.60.1:6543", // IP:端口
|
||
"CurrentStatus": "on", // 当前状态:枚举值:on/off 如果是重启则为on
|
||
"CurrentTime": "2026-02-02T10:30:00Z" // 发生时间
|
||
"UnixTime": 1770000235000, // Unix时间戳(毫秒级)
|
||
"LauncherVersion": "1.0.0", // 启动器版本,可为空
|
||
"RebootReason": "1" // 枚举值:0x01:软件复位,0x02:上电复位,0x03:外部手动复位,0x04:下电模式唤醒的复位,0x05:看门狗超时复位,0x06:其他复位,NULL:并非重启数据
|
||
}
|
||
2. Kafka主题
|
||
Topic:blwlog4Nodejs-rcu-onoffline-topic
|
||
|
||
3. 数据库结构
|
||
数据库:log_platform
|
||
表:onoffline_record
|
||
字段:
|
||
guid varchar(32) GUID(32 位无连字符 HEX,小写;自动生成)
|
||
ts_ms int8 Unix时间戳 对应kafka:UnixTime (索引)
|
||
write_ts_ms int8 写入时间戳 (写入的时候自动生成)
|
||
hotel_id int2 酒店ID 对应kafka:HotelCode (索引)
|
||
mac varchar(21) MAC地址 对应kafka:MAC
|
||
device_id varchar(64) 设备ID 对应kafka:HostNumber (索引)
|
||
room_id varchar(64) 房间ID 对应kafka:RoomNumber (索引)
|
||
ip varchar(21) IP地址 对应kafka:EndPoint
|
||
current_status varchar(10) 当前状态 对应kafka:CurrentStatus (索引)
|
||
launcher_version varchar(10) 启动器版本 对应kafka:LauncherVersion 可为空
|
||
reboot_reason varchar(10) 重启原因 对应kafka:RebootReason 可为空
|
||
- 主键:(ts_ms, mac, device_id, room_id)
|
||
|
||
|
||
|
||
4. 开发要求:
|
||
从kafka拉取数据,然后处理,再写入数据库。同时,要创建好数据库,表要按照ts_ms按天分区。
|
||
|
||
数据处理特殊说明:
|
||
- 有两种结构的数据:
|
||
- 非重启数据:reboot_reason 为 空 或不存在该字段
|
||
- 当判断为非重启数据时,current_status 为 对应kafka:CurrentStatus
|
||
- 重启数据:reboot_reason 不为 空
|
||
- 当判断为重启数据时,current_status 默认设置为 on
|
||
- 其他数据直接按照kafka数据写入数据库,不做特殊处理(如果为空,数据库字段也为空,就算是数字类型字段,也不要补0)。 |