添加 Kafka 消费者、数据库写入、Redis 集成等核心模块,实现设备上下线事件处理 - 创建项目基础目录结构与配置文件 - 实现 Kafka 消费逻辑与手动提交偏移量 - 添加 PostgreSQL 数据库连接与分区表管理 - 集成 Redis 用于错误队列和项目心跳 - 包含数据处理逻辑,区分重启与非重启数据 - 提供数据库初始化脚本与分区创建工具 - 添加单元测试与代码校验脚本
2.4 KiB
-
Kafka 数据结构 { "HotelCode": "1085", // 酒店编码 "MAC": "00:1A:2B:3C:4D:5E", // MAC地址 "HostNumber": "091123987456", // 设备编号 "RoomNumber": "8888房", // 房号 "EndPoint": "50.2.60.1:6543", // IP:端口 "CurrentStatus": "on", // 当前状态:枚举值:on/off 如果是重启则为on "CurrentTime": "2026-02-02T10:30:00Z" // 发生时间 "UnixTime": 1770000235000, // Unix时间戳(毫秒级) "LauncherVersion": "1.0.0", // 启动器版本,可为空 "RebootReason": "1" // 枚举值:0x01:软件复位,0x02:上电复位,0x03:外部手动复位,0x04:下电模式唤醒的复位,0x05:看门狗超时复位,0x06:其他复位,NULL:并非重启数据 }
-
Kafka主题 Topic:blwlog4Nodejs-rcu-onoffline-topic
-
数据库结构 数据库:log_platform 表:onoffline_record 字段: guid varchar(32) GUID(32 位无连字符 HEX,小写;自动生成) ts_ms int8 Unix时间戳 对应kafka:UnixTime (索引) write_ts_ms int8 写入时间戳 (写入的时候自动生成) hotel_id int2 酒店ID 对应kafka:HotelCode (索引) mac varchar(21) MAC地址 对应kafka:MAC device_id varchar(64) 设备ID 对应kafka:HostNumber (索引) room_id varchar(64) 房间ID 对应kafka:RoomNumber (索引) ip varchar(21) IP地址 对应kafka:EndPoint current_status varchar(10) 当前状态 对应kafka:CurrentStatus (索引) launcher_version varchar(10) 启动器版本 对应kafka:LauncherVersion 可为空 reboot_reason varchar(10) 重启原因 对应kafka:RebootReason 可为空
- 主键:(ts_ms, mac, device_id, room_id)
-
开发要求: 从kafka拉取数据,然后处理,再写入数据库。同时,要创建好数据库,表要按照ts_ms按天分区。
数据处理特殊说明: - 有两种结构的数据: - 非重启数据:reboot_reason 为 空 或不存在该字段 - 当判断为非重启数据时,current_status 为 对应kafka:CurrentStatus - 重启数据:reboot_reason 不为 空 - 当判断为重启数据时,current_status 默认设置为 on - 其他数据直接按照kafka数据写入数据库,不做特殊处理(如果为空,数据库字段也为空,就算是数字类型字段,也不要补0)。