using System.Text;
using Common;
using CommonEntity;
using CommonTools;
using Confluent.Kafka;
using MongoDB.Driver;
using NLog;
using System.Collections.Generic;

namespace BLWLogServer.Services
{
    /// <summary>
    /// Hosted service that consumes the <c>KafkaKey.BLWLog_RCU_Topic</c> topic and stores
    /// every recognised message in a collection of the <c>udppackage</c> MongoDB database.
    /// A handful of statistic messages are additionally mirrored to Redis as a
    /// comma-separated list of their most recent values.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the generic type arguments used below
    /// (<c>ConsumerBuilder&lt;string, byte[]&gt;</c>, <c>GetCollection&lt;T&gt;</c>,
    /// <c>FastDeserialize&lt;T&gt;</c>) were inferred from how the values are used
    /// (declared variable types, documents passed to <c>InsertOneAsync</c>);
    /// confirm them against the project's declarations.
    /// </remarks>
    public class KafkaConsume : BackgroundService
    {
        /// <summary>Number of consumers started in parallel; all join the same consumer group.</summary>
        private const int ConsumerCount = 3;

        /// <summary>How many recent values are kept per tracked statistic.</summary>
        private const int RecentValueCapacity = 10;

        /// <summary>Pause before a consumer is rebuilt after an unexpected failure.</summary>
        private static readonly TimeSpan ReconnectDelay = TimeSpan.FromSeconds(5);

        public IConfiguration Configuration { get; set; }

        public KafkaConsume(IConfiguration configuration)
        {
            Configuration = configuration;
        }

        public static Logger logger = NLog.LogManager.GetCurrentClassLogger();

        /// <summary>Guards <see cref="RecentValues"/>; the consumers run concurrently and share it.</summary>
        private static readonly object RecentValuesGate = new();

        /// <summary>Sliding windows of the latest values, keyed by Redis cache key.</summary>
        private static readonly Dictionary<string, Queue<string>> RecentValues = new();

        /// <summary>Command types whose recent values are published to Redis under the same name.</summary>
        private static readonly HashSet<string> TrackedCommandTypes = new()
        {
            "UDPPackage_CPUMax",
            "UDPPackage_CPUMin",
            "UDPPackage_CPUAvg",
            "UDPPackage_TotalSendPackage",
            "UDPPackage_TotalRecvPackage",
            "RCUOnLine",
        };

        /// <summary>
        /// Starts <see cref="ConsumerCount"/> consumers on the thread pool and completes
        /// only when all of them have stopped, so host shutdown waits for them.
        /// </summary>
        /// <param name="stoppingToken">Signalled by the host when the service must stop.</param>
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            var consumers = new List<Task>(ConsumerCount);
            for (int i = 0; i < ConsumerCount; i++)
            {
                // Consume() blocks, so each consumer gets its own pool work item.
                consumers.Add(Task.Run(() => RunConsumerAsync(stoppingToken)));
            }

            await Task.WhenAll(consumers);
        }

        /// <summary>
        /// Runs one consumer and logs a fatal failure instead of letting it escape,
        /// so a broken consumer is visible in the log but does not take the host down.
        /// </summary>
        private async Task RunConsumerAsync(CancellationToken stoppingToken)
        {
            try
            {
                await StartConsumer(stoppingToken);
            }
            catch (Exception ex)
            {
                logger.Error(ex, "Kafka consumer terminated: " + ex.Message);
            }
        }

        /// <summary>
        /// Consume loop of a single consumer. The consumer is rebuilt after an unexpected
        /// error and is always closed and disposed before the next attempt or on shutdown.
        /// </summary>
        /// <param name="stoppingToken">Cancels the blocking <c>Consume</c> call and ends the loop.</param>
        private async Task StartConsumer(CancellationToken stoppingToken)
        {
            string? ipport = Configuration["Kafka:EndPoint"];
            string? user = Configuration["Kafka:UserName"];
            string? pwd = Configuration["Kafka:PassWord"];
            string? mongodbconnectstr = Configuration["Mongodb:Connectstr"];

            // Both objects are loop-invariant: build them once instead of on every reconnect.
            var config = new ConsumerConfig
            {
                GroupId = "blwlogserver-consumer-group",
                AutoOffsetReset = AutoOffsetReset.Earliest,
                BootstrapServers = ipport,
                SecurityProtocol = SecurityProtocol.SaslPlaintext,
                SaslMechanism = SaslMechanism.Plain,
                SaslUsername = user,
                SaslPassword = pwd
            };
            IMongoDatabase database = new MongoClient(mongodbconnectstr).GetDatabase("udppackage");

            while (!stoppingToken.IsCancellationRequested)
            {
                bool failed = false;

                using (IConsumer<string, byte[]> consumer = new ConsumerBuilder<string, byte[]>(config).Build())
                {
                    consumer.Subscribe(KafkaKey.BLWLog_RCU_Topic);
                    try
                    {
                        while (true)
                        {
                            var cr = consumer.Consume(stoppingToken);
                            try
                            {
                                // Offsets are not committed explicitly; the original code notes
                                // that auto-commit is left at its (enabled) default.
                                await HandleMessageAsync(database, cr.Message.Key, cr.Message.Value);
                            }
                            catch (ConsumeException e)
                            {
                                logger.Error(e, "出错:" + e.Message);
                                Console.WriteLine(111111111111111);
                            }
                            catch (Exception ex)
                            {
                                // A single bad message must not stop the consumer.
                                logger.Error(ex, "Ex出错:" + ex.Message);
                                Console.WriteLine(22222222222222);
                            }
                        }
                    }
                    catch (OperationCanceledException)
                    {
                        logger.Error("操作出错");
                        Console.WriteLine("操作错误");
                    }
                    catch (Exception ex)
                    {
                        Console.WriteLine("未知错误" + ex.Message);
                        logger.Error(ex, "未知错误" + ex.Message);
                        failed = true;
                    }
                    finally
                    {
                        CloseQuietly(consumer);
                    }
                }

                if (failed)
                {
                    // Avoid a tight rebuild loop while the failure persists.
                    try
                    {
                        await Task.Delay(ReconnectDelay, stoppingToken);
                    }
                    catch (OperationCanceledException)
                    {
                        break;
                    }
                }
            }
        }

        /// <summary>Leaves the consumer group; a failure to do so is logged, not thrown.</summary>
        private static void CloseQuietly(IConsumer<string, byte[]> consumer)
        {
            try
            {
                consumer.Close();
            }
            catch (Exception ex)
            {
                logger.Error(ex, "Failed to close Kafka consumer: " + ex.Message);
            }
        }

        /// <summary>
        /// Routes one message to its handler based on the message key.
        /// Messages with no key or an unrecognised key are ignored.
        /// </summary>
        /// <param name="database">The <c>udppackage</c> database.</param>
        /// <param name="key">Kafka message key; compared against the <c>KafkaKey</c> members.</param>
        /// <param name="payload">Serialized message body, decoded with <c>MyMessagePacker.FastDeserialize</c>.</param>
        private static async Task HandleMessageAsync(IMongoDatabase database, string key, byte[] payload)
        {
            if (key is null)
            {
                return;
            }

            if (key.Equals(KafkaKey.UDPPackageKey))
            {
                await StoreUdpPackageAsync(database, payload);
            }
            else if (key.Equals(KafkaKey.UDPPackageStepMonitor))
            {
                await StoreStepInfoAsync(database, payload);
            }
            else if (key.Equals(KafkaKey.UDPPackagePowerMonitor))
            {
                await StorePowerMonitorAsync(database, payload);
            }
            else if (key.Equals(KafkaKey.IotMonitor))
            {
                await StoreIotMonitorAsync(database, payload);
            }
            else if (key.Equals(KafkaKey.RCUOnLineStatus))
            {
                await StoreOnOffLineAsync(database, payload);
            }
            else if (key.Equals(KafkaKey.InvokceThirdHttpInterface))
            {
                await StoreInterfaceLogAsync(database, payload);
            }
            else if (key.Equals(KafkaKey.TakeCardStatus))
            {
                await StoreTakeCardAsync(database, payload);
            }
            else if (key.Equals(KafkaKey.ServiceInfoStatus))
            {
                await StoreServiceInfoAsync(database, payload);
            }
            else if (key.Equals(KafkaKey.PMSLogMonitor))
            {
                await StorePmsLogAsync(database, payload);
            }
            else if (key.Equals(KafkaKey.RCUNewVersion_Register)
                || key.Equals(KafkaKey.RCUNewVersion_RestartReason)
                || key.Equals(KafkaKey.RCUNewVersion_0E)
                || key.Equals(KafkaKey.RCUNewVersion_TakeCard))
            {
                // New-protocol records: all four keys are stored the same way.
                await StoreHexDataAsync(database, payload);
            }
            else if (key.Equals(KafkaKey.RCUNewTimerData))
            {
                // Recognised but intentionally not stored.
            }
        }

        /// <summary>
        /// Stores a statistic package in <c>totalcount</c> and, for tracked command types,
        /// refreshes the matching Redis entry with the latest values.
        /// </summary>
        private static async Task StoreUdpPackageAsync(IMongoDatabase database, byte[] payload)
        {
            UDPPackage package = MyMessagePacker.FastDeserialize<UDPPackage>(payload);
            var document = new UDPPackage_db
            {
                TotalCount = package.TotalCount,
                CommandType = package.CommandType,
                FenLei = package.FenLei,
                ExtraData = package.ExtraData,
                RemoveTimeString = package.RemoveTime,
                // NOTE(review): parsed with the current culture; the producer's format is not visible here.
                RemoveTime = DateTime.Parse(package.RemoveTime),
            };
            await database.GetCollection<UDPPackage_db>("totalcount").InsertOneAsync(document);

            if (package.CommandType is string commandType && TrackedCommandTypes.Contains(commandType))
            {
                PublishRecentValues(commandType, package.TotalCount);

                if (commandType == "UDPPackage_CPUAvg")
                {
                    // The average sample also records when the statistics were last seen.
                    CSRedisCacheHelper.Set("UDPPackage_DetectTime", DateTime.UtcNow.ToString("o"), 10);
                }
            }
        }

        /// <summary>
        /// Appends <paramref name="value"/> to the window kept for <paramref name="cacheKey"/>
        /// (at most <see cref="RecentValueCapacity"/> entries, oldest dropped first) and writes
        /// the window to Redis as a comma-separated string.
        /// </summary>
        /// <remarks>
        /// The whole update runs under a lock because several consumers call it concurrently
        /// and <see cref="Queue{T}"/> is not safe for concurrent modification. Writing to Redis
        /// inside the lock keeps the cached value in step with the window.
        /// </remarks>
        private static void PublishRecentValues<T>(string cacheKey, T value)
        {
            lock (RecentValuesGate)
            {
                if (!RecentValues.TryGetValue(cacheKey, out var window))
                {
                    window = new Queue<string>(RecentValueCapacity);
                    RecentValues[cacheKey] = window;
                }

                if (window.Count >= RecentValueCapacity)
                {
                    window.Dequeue();
                }

                window.Enqueue(value?.ToString() ?? string.Empty);

                // Third argument is passed through unchanged from the original call
                // (presumably an expiry - confirm against CSRedisCacheHelper.Set).
                CSRedisCacheHelper.Set(cacheKey, string.Join(",", window), 10);
            }
        }

        /// <summary>Stores a step-monitor record in <c>zeroe_stepmonitor</c>.</summary>
        private static async Task StoreStepInfoAsync(IMongoDatabase database, byte[] payload)
        {
            StepInfo step = MyMessagePacker.FastDeserialize<StepInfo>(payload);
            var document = new StepInfo_db
            {
                MessageId = step.MessageId,
                Step = step.Step,
                StepDescription = step.StepDescription,
                Content = step.Content,
                TriggerTime = DateTime.Parse(step.TriggerTime),
                TriggerTimeString = step.TriggerTime,
                EveryMessageId = step.EveryMessageId,
                Monitor_0E_01 = step.Monitor_0E_01,
                HostNumber = step.HostNumber,
                HotelCode = step.HotelCode,
            };
            await database.GetCollection<StepInfo_db>("zeroe_stepmonitor").InsertOneAsync(document);
        }

        /// <summary>Stores an energy-consumption record in <c>powermonitor</c>.</summary>
        private static async Task StorePowerMonitorAsync(IMongoDatabase database, byte[] payload)
        {
            NengHao power = MyMessagePacker.FastDeserialize<NengHao>(payload);
            var document = new NengHao_db
            {
                Version = power.Version,
                HotelCode = power.HotelCode,
                HostNumber = power.HostNumber,
                Mac = power.Mac,
                EndPoint = power.EndPoint,
                V = power.V,
                A = power.A,
                P = power.P,
                Energy_Consumption = power.Energy_Consumption,
                Sum_Energy_Consumption = power.Sum_Energy_Consumption,
                IsTakeCard = power.IsTakeCard,
                CreateTime = power.CreateTime,
                ReportTime = DateTime.Parse(power.ReportTime),
                RoomNumber = power.RoomNumber,
                CarbonVIP = power.CarbonVIP,
                AllDeviceData = power.AllDeviceData,
                IdentityInfo = power.IdentityInfo,
            };
            await database.GetCollection<NengHao_db>("powermonitor").InsertOneAsync(document);
        }

        /// <summary>Stores a voice/IoT monitor record in <c>voiceiotlog</c>.</summary>
        private static async Task StoreIotMonitorAsync(IMongoDatabase database, byte[] payload)
        {
            IOTMonitorData iot = MyMessagePacker.FastDeserialize<IOTMonitorData>(payload);
            var document = new IOTMonitorData_db
            {
                Step = iot.Step,
                TriggerTime = iot.TriggerTime,
                HotelCode = iot.HotelCode,
                HotelId = iot.HotelId,
                HotelName = iot.HotelName,
                RoomNumber = iot.RoomNumber,
                RequestId = iot.RequestId,
                CommandDescription = iot.CommandDescription,
                Platform = iot.Platform,
                CreateTime = iot.CreateTime,
                RemoteIP = iot.RemoteIP,
                ControlClass = iot.ControlClass,
                SceneName = iot.SceneName,
                WhichOneDevice = iot.WhichOneDevice,
            };
            await database.GetCollection<IOTMonitorData_db>("voiceiotlog").InsertOneAsync(document);
        }

        /// <summary>Stores an RCU online/offline status change in <c>rcustatuslog</c>.</summary>
        private static async Task StoreOnOffLineAsync(IMongoDatabase database, byte[] payload)
        {
            OnOffLineData status = MyMessagePacker.FastDeserialize<OnOffLineData>(payload);
            var document = new OnOffLineData_db
            {
                MAC = status.MAC,
                HostNumber = status.HostNumber,
                CurrentStatus = status.CurrentStatus,
                HotelCode = status.HotelCode,
                CurrentTime = status.CurrentTime,
                EndPoint = status.EndPoint,
            };
            await database.GetCollection<OnOffLineData_db>("rcustatuslog").InsertOneAsync(document);
        }

        /// <summary>Stores a third-party HTTP interface invocation in <c>invokehttpinterfacelog</c>.</summary>
        private static async Task StoreInterfaceLogAsync(IMongoDatabase database, byte[] payload)
        {
            Interface3Log call = MyMessagePacker.FastDeserialize<Interface3Log>(payload);
            var document = new Interface3Log_db
            {
                HotelCode = call.HotelCode,
                HostNumber = call.HostNumber,
                RoomNumber = call.RoomNumber,
                TriggerTime = call.TriggerTime,
                CommandType = call.CommandType,
                ActionData = call.ActionData,
            };
            await database.GetCollection<Interface3Log_db>("invokehttpinterfacelog").InsertOneAsync(document);
        }

        /// <summary>Stores a take-card status record in <c>takecardlog</c>.</summary>
        private static async Task StoreTakeCardAsync(IMongoDatabase database, byte[] payload)
        {
            MTakeCardData card = MyMessagePacker.FastDeserialize<MTakeCardData>(payload);
            var document = new MTakeCardData_db
            {
                Status = card.Status,
                HostNUMBER = card.HostNUMBER,
                LastUpdateTime = card.LastUpdateTime,
                HotelCode = card.HotelCode,
            };
            await database.GetCollection<MTakeCardData_db>("takecardlog").InsertOneAsync(document);
        }

        /// <summary>Stores a service-info status record in <c>serviceinfolog</c>.</summary>
        private static async Task StoreServiceInfoAsync(IMongoDatabase database, byte[] payload)
        {
            OtherServiceInfo info = MyMessagePacker.FastDeserialize<OtherServiceInfo>(payload);
            var document = new OtherServiceInfo_db
            {
                HotelCode = info.HotelCode,
                HostNumber = info.HostNumber,
                Address = info.Address,
                StatusReceiver = info.StatusReceiver,
                LastUpdateTime = info.LastUpdateTime,
            };
            await database.GetCollection<OtherServiceInfo_db>("serviceinfolog").InsertOneAsync(document);
        }

        /// <summary>Stores a PMS log record in <c>pmslog</c>.</summary>
        private static async Task StorePmsLogAsync(IMongoDatabase database, byte[] payload)
        {
            CheckInYuanShidata pms = MyMessagePacker.FastDeserialize<CheckInYuanShidata>(payload);
            var document = new CheckInYuanShidata_db
            {
                Step = pms.Step,
                IP = pms.IP,
                RequestId = pms.RequestId,
                CurrentTime = pms.CurrentTime,
                CommandType = pms.CommandType,
                JianJieData = pms.JianJieData,
                ZhiJieData = pms.ZhiJieData,
            };
            await database.GetCollection<CheckInYuanShidata_db>("pmslog").InsertOneAsync(document);
        }

        /// <summary>
        /// Stores a new-protocol RCU record in <c>rcu_hexdata</c>. The hex payload is saved
        /// as received; it is not decoded here.
        /// </summary>
        /// <remarks>
        /// Payload layout noted in the original code for the register message:
        /// P0-P3 subnet mask (4 bytes), P4-P7 gateway (4), P8-P9 RCU port (2),
        /// P10-P15 MAC address (6), P16-P35 software version (20), P36-P38 configuration
        /// version (3), P39-P42 DNS server 1 IP (4), P43-P58 room-type remark (16),
        /// P59-P74 room-number remark (16), P75-P78 room type (4), P79-P82 room number (4).
        /// </remarks>
        private static async Task StoreHexDataAsync(IMongoDatabase database, byte[] payload)
        {
            NewVersionHexData hex = MyMessagePacker.FastDeserialize<NewVersionHexData>(payload);
            var document = new NewVersionHexData_db
            {
                HotelCode = hex.HotelCode,
                HostNumber = hex.HostNumber,
                RoomNumber = hex.RoomNumber,
                CurrentTime = hex.CurrentTime,
                RemoteEndPoint = hex.RemoteEndPoint,
                CmdType = hex.CmdType,
                HexData = hex.HexData,
            };
            await database.GetCollection<NewVersionHexData_db>("rcu_hexdata").InsertOneAsync(document);
        }
    }
}