using BLWData.Entity;
using BLWLogProduce.Models;
using Common;
using CommonEntity;
using CommonTools;
using Confluent.Kafka;
using Google.Protobuf;
using MessagePack;
using Microsoft.CodeAnalysis.Host.Mef;
using Microsoft.Extensions.Caching.Memory;
using Microsoft.Extensions.ObjectPool;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using NLog;
using RestSharp;
using System.Diagnostics.Metrics;
using System.Runtime.Intrinsics.X86;
using System.Text;
using System.Text.Json;
using System.Threading.Channels;
using static CSRedis.CSRedisClient;

namespace BLWLogProduce.Services
{
    /// <summary>
    /// Hosted service that subscribes to a set of Redis pub/sub channels and, for each
    /// message, either updates the in-process monitoring counters
    /// (<c>WaiBuJianKong.data</c>) or re-publishes the payload to a Kafka topic.
    /// </summary>
    /// <remarks>
    /// NOTE(review): generic type arguments in this file were reconstructed from usage
    /// (producer key/value types, handler argument type, deserialization targets).
    /// Confirm them against the project before merging.
    /// </remarks>
    public class KafkaProduce : BackgroundService
    {
        /// <summary>Application configuration; the <c>Kafka:*</c> keys are read from it.</summary>
        public IConfiguration Configuration { get; set; }

        /// <summary>Memory cache used to remember IP-to-location lookups for one hour.</summary>
        public IMemoryCache _Cache { get; set; }

        public KafkaProduce(IConfiguration configuration, IMemoryCache cache)
        {
            this.Configuration = configuration;
            _Cache = cache;
        }

        public static Logger logger = LogManager.GetCurrentClassLogger();

        // Shared HTTP client for the Baidu IP lookup; reused instead of creating one per call.
        private static readonly RestClient s_baiduClient = new RestClient("https://sp0.baidu.com");

        /// <summary>
        /// Starts the Redis subscriptions on a dedicated (long-running) task.
        /// </summary>
        /// <param name="stoppingToken">Token that prevents the start-up task from being scheduled once cancelled.</param>
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            // The original call passed TaskCreationOptions.LongRunning in the "state" position of
            // StartNew(Action<object>, object, CancellationToken), so the option was never applied.
            await Task.Factory.StartNew(
                () => RegisterSubscriptions(),
                stoppingToken,
                TaskCreationOptions.LongRunning,
                TaskScheduler.Default);
        }

        /// <summary>
        /// Builds the Kafka producer, defines one handler per Redis channel and subscribes the
        /// enabled ones. Any failure during set-up is logged and swallowed.
        /// </summary>
        private void RegisterSubscriptions()
        {
            try
            {
                string? ipport = Configuration["Kafka:EndPoint"];
                string? user = Configuration["Kafka:UserName"];
                string? pwd = Configuration["Kafka:PassWord"];

                var config = new ProducerConfig
                {
                    BootstrapServers = ipport,
                    SecurityProtocol = SecurityProtocol.SaslPlaintext,
                    SaslMechanism = SaslMechanism.Plain,
                    SaslUsername = user,
                    SaslPassword = pwd
                };

                // NOTE(review): the producer lives as long as the subscriptions and is never
                // flushed or disposed on shutdown - confirm whether that is acceptable.
                var p = new ProducerBuilder<string, byte[]>(config).Build();

                // Publishes one keyed binary message to the given topic.
                Task Send(string topic, string key, byte[] value) =>
                    p.ProduceAsync(topic, new Message<string, byte[]> { Key = key, Value = value });

                // ---- Monitoring counters (no Kafka output) ----

                // Captured UDP packet count.
                var wireshark = ("wireshark-cap-udp-totalcount", new Action<SubscribeMessageEventArgs>((args) =>
                {
                    try
                    {
                        WaiBuJianKong.data.WireShark_UDPCapCount = args.Body;
                    }
                    catch (Exception ex)
                    {
                        logger.Debug(ex, "wireshark-cap-udp-totalcount handler failed");
                    }
                }));

                // Web API call counters: the body is a JSON object of API name -> count.
                var webapidata_fenxi = ("webapidata_consumer", new Action<SubscribeMessageEventArgs>((args) =>
                {
                    try
                    {
                        var qs = JsonConvert.DeserializeObject<Dictionary<string, long>>(args.Body);
                        if (qs == null)
                        {
                            return;
                        }

                        // Missing keys count as 0.
                        long Hits(string api) => qs.TryGetValue(api, out long n) ? n : 0;

                        WaiBuJianKong.data.GetRoomAirList = Hits("GetRoomAirList");
                        WaiBuJianKong.data.SetRCUAir = Hits("SetRCUAir");
                        WaiBuJianKong.data.GetRoomSceneList = Hits("GetRoomSceneList");
                        WaiBuJianKong.data.SetRCUScene = Hits("SetRCUScene");
                        WaiBuJianKong.data.GetRoomLightList = Hits("GetRoomLightList");
                        WaiBuJianKong.data.SetRCULight = Hits("SetRCULight");
                        WaiBuJianKong.data.GetRoomCurtainList = Hits("GetRoomCurtainList");
                        WaiBuJianKong.data.SetRCUCurtain = Hits("SetRCUCurtain");
                        WaiBuJianKong.data.GetRoomServiceList = Hits("GetRoomServiceList");
                        WaiBuJianKong.data.SetRCUService = Hits("SetRCUService");
                        WaiBuJianKong.data.GetOperationLog = Hits("GetOperationLog");
                        WaiBuJianKong.data.GetRoomMusicList = Hits("GetRoomMusicList");
                        WaiBuJianKong.data.SetRCUMusic = Hits("SetRCUMusic");
                        WaiBuJianKong.data.GetRCUStatus = Hits("GetRCUStatus");
                        WaiBuJianKong.data.GetAirDetectList = Hits("GetAirDetectList");
                        WaiBuJianKong.data.GetHostFaultList = Hits("GetHostFaultList");
                        WaiBuJianKong.data.GetRoomTypeAndModalsList = Hits("GetRoomTypeAndModalsList");
                        WaiBuJianKong.data.GetRCUInfoForPDU = Hits("GetRCUInfoForPDU");
                        WaiBuJianKong.data.GetSessionKey = Hits("GetSessionKey");
                        WaiBuJianKong.data.GetPhoneNumber = Hits("GetPhoneNumber");
                        WaiBuJianKong.data.GetHotelInfoForWX = Hits("GetHotelInfoForWX");
                    }
                    catch (Exception ex)
                    {
                        logger.Debug(ex, "webapidata_consumer handler failed");
                    }
                }));

                // Per-task processing counters: the body has the form "<taskN>#<value>".
                // NOTE(review): the read-increment-write below is not atomic; concurrent
                // deliveries could lose counts - confirm whether handlers run concurrently.
                var xiaofei_fenxi = ("udp_package_consumer", new Action<SubscribeMessageEventArgs>((args) =>
                {
                    try
                    {
                        string[] parts = args.Body.Split("#");
                        if (parts.Length != 2)
                        {
                            return;
                        }

                        switch (parts[0])
                        {
                            case "task1":
                                WaiBuJianKong.data.Process_TH1 = WaiBuJianKong.data.Process_TH1 + 1;
                                break;
                            case "task2":
                                WaiBuJianKong.data.Process_TH2 = WaiBuJianKong.data.Process_TH2 + 1;
                                break;
                            case "task3":
                                WaiBuJianKong.data.Process_TH3 = WaiBuJianKong.data.Process_TH3 + 1;
                                break;
                            case "task4":
                                WaiBuJianKong.data.Process_TH4 = WaiBuJianKong.data.Process_TH4 + 1;
                                break;
                            case "task5":
                                WaiBuJianKong.data.Process_TH5 = WaiBuJianKong.data.Process_TH5 + 1;
                                break;
                        }
                    }
                    catch (Exception ex)
                    {
                        logger.Debug(ex, "udp_package_consumer handler failed");
                    }
                }));

                // ---- Redis -> Kafka relays ----

                // UDP package statistics: update the counters, resolve each ExtraData key to a
                // location, then forward the package as MessagePack.
                var DingYue = ("redis-udppackage", new Action<SubscribeMessageEventArgs>(async (args) =>
                {
                    try
                    {
                        UDPPackage? usa = System.Text.Json.JsonSerializer.Deserialize<UDPPackage>(args.Body);
                        if (usa == null)
                        {
                            // Nothing to record or forward.
                            return;
                        }

                        ApplyUdpStatistic(usa);

                        var extra = usa.ExtraData;
                        if (extra != null && extra.Count > 0)
                        {
                            // Iterate over a snapshot of the keys: values are rewritten in the loop.
                            foreach (string ip in new List<string>(extra.Keys))
                            {
                                string Location = "";
                                if (!_Cache.TryGetValue(ip, out Location))
                                {
                                    Location = GetBaiduIp(ip);
                                    _Cache.Set(ip, Location, DateTimeOffset.Now.AddHours(1));
                                }
                                extra[ip] = Location;
                            }
                        }

                        byte[] bytes = MyMessagePacker.FastSerialize(usa);
                        await Send(KafkaKey.BLWLog_RCU_Topic, KafkaKey.UDPPackageKey, bytes);
                    }
                    catch (Exception ex)
                    {
                        Console.WriteLine("redis-udppackage");
                        WriteConsoleError(ex);
                    }
                }));

                // Room status step monitor.
                var DingYue1 = ("redis-roomstatus-monitor", Guarded(async (args) =>
                {
                    StepInfo? usa = System.Text.Json.JsonSerializer.Deserialize<StepInfo>(args.Body);
                    byte[] bytes = MyMessagePacker.FastSerialize(usa);
                    await Send(KafkaKey.BLWLog_RCU_Topic, KafkaKey.UDPPackageStepMonitor, bytes);
                }));

                // Fork (bypass) system data. Currently not subscribed.
                var DingYue2 = ("redis-forksystemdata", Guarded(async (args) =>
                {
                    ForkSystem? forkSystem = System.Text.Json.JsonSerializer.Deserialize<ForkSystem>(args.Body);
                    byte[] qf = MyMessagePacker.FastSerialize(forkSystem);
                    await Send(KafkaKey.BLWLog_RCU_Topic, KafkaKey.UDPPackageForkSystemKey, qf);
                }));

                // Energy consumption: converted to the protobuf structure used by the BaoJing system.
                var DingYue3 = ("redis-power", Guarded(async (args) =>
                {
                    string body = args.Body;
                    NengHao? poo = System.Text.Json.JsonSerializer.Deserialize<NengHao>(body);
                    if (poo == null || string.IsNullOrEmpty(poo.HostNumber))
                    {
                        return;
                    }
                    if (poo.NengHaoList == null || poo.NengHaoList.Count == 0)
                    {
                        return;
                    }

                    List<DeviceData> la = new List<DeviceData>();
                    if (poo.AllDeviceData != null)
                    {
                        foreach (var item in poo.AllDeviceData)
                        {
                            la.Add(new DeviceData
                            {
                                HostID = item.HostID,
                                DeviceType = item.DeviceType,
                                Address = item.Address,
                                Brightness = item.Brightness,
                                Status = item.Status,
                                CurrentTemp = item.CurrentTemp,
                                SettingTemp = item.SettingTemp,
                                Mode = item.Mode,
                                FanSpeed = item.FanSpeed,
                                Valve = item.Valve
                            });
                        }
                    }

                    List<SinglePowerChannelData> nenghaolist = new List<SinglePowerChannelData>();
                    foreach (var item in poo.NengHaoList)
                    {
                        nenghaolist.Add(new SinglePowerChannelData
                        {
                            Address = item.address,
                            Dianliu = item.dianliu,
                            Dianya = item.dianya,
                            Gonglv = item.gonglv,
                            Nenghao = item.nenghao,
                            Zongnenghao = item.zongnenghao
                        });
                    }

                    EnergyConsumption ese = new EnergyConsumption
                    {
                        HotelCode = poo.HotelCode,
                        HostNumber = poo.HostNumber,
                        Mac = poo.Mac,
                        EndPoint = poo.EndPoint,
                        CreateTime = poo.CreateTime,
                        RoomNumber = poo.RoomNumber ?? "",
                        IsTakeCard = poo.IsTakeCard,
                        IsInsertCard = poo.IsInsertCard,
                        CarbonVIP = poo.CarbonVIP,
                        IdentityInfo = poo.IdentityInfo,
                        CardEvent = poo.CardEvent,
                        PMSStatus = poo.PMS_Status,
                        BrightG = poo.Bright_G
                    };
                    ese.PowerChannelList.AddRange(nenghaolist);
                    ese.DeviceStatusList.AddRange(la);

                    byte[] data = ese.ToByteArray();

                    // Extra raw-payload logging for hotel code 1085 only.
                    if (poo.HotelCode == 1085)
                    {
                        logger.Error("能耗数据:" + body);
                    }

                    await Send(KafkaKey.BLWLog4BaoJing_RCU_Topic, KafkaKey.UDPPackagePowerMonitor, data);
                }));

                // Device action data for the BaoJing system. Currently not subscribed.
                var DingYue3_1 = ("redis-action-data", Guarded(async (args) =>
                {
                    NewRoomtusPush? poo = System.Text.Json.JsonSerializer.Deserialize<NewRoomtusPush>(args.Body);
                    if (poo == null)
                    {
                        return;
                    }

                    NewActionChangeDataPush ese = new NewActionChangeDataPush
                    {
                        Code = poo.code,
                        RoomNumber = poo.roomNumber,
                        HostNumber = poo.hostnumber,
                        Address = poo.address,
                        Brightness = poo.brightness,
                        Status = poo.status,
                        Name = poo.name,
                        CurrentTemp = poo.currentTemp,
                        SettingTemp = poo.settingTemp,
                        FanSpeed = poo.fanSpeed,
                        Mode = poo.mode,
                        Valve = poo.valve,
                        Createtime = poo.createtime
                    };

                    await Send(KafkaKey.BLWLog4BaoJing_RCU_Topic, KafkaKey.UDPPackage_ActionData, ese.ToByteArray());
                }));

                // New-version log: stamped with the current time and forwarded as UTF-8 JSON.
                var TSLog_DingYue = ("redis-tslog", Guarded(async (args) =>
                {
                    NewVersionLog? poo = System.Text.Json.JsonSerializer.Deserialize<NewVersionLog>(args.Body);
                    if (poo == null)
                    {
                        return;
                    }

                    poo.ts_ms = Tools.GetUnixTime_MS();
                    if (string.IsNullOrEmpty(poo.hotel_id) || string.IsNullOrEmpty(poo.device_id))
                    {
                        return;
                    }

                    string jsonstr = JsonConvert.SerializeObject(poo);
                    await Send(KafkaKey.BLWLog4NodeJs_RCU_Topic, poo.comm_seq.ToString(), Encoding.UTF8.GetBytes(jsonstr));
                }));

                // Device action data (channel 0X36/0X0F): "0E" frames with no faults and no
                // devices are dropped.
                var TSLog_DingYue_0X36 = ("redis-0X36-0X0F", Guarded(async (args) =>
                {
                    DeviceActionData? poo = System.Text.Json.JsonSerializer.Deserialize<DeviceActionData>(args.Body);
                    if (poo == null)
                    {
                        return;
                    }

                    poo.ts_ms = Tools.GetUnixTime_MS();
                    if (string.IsNullOrEmpty(poo.hotel_id) || string.IsNullOrEmpty(poo.device_id))
                    {
                        return;
                    }
                    if (poo.cmd_word.Equals("0E") && poo.fault_list.Count == 0 && poo.device_list.Count == 0)
                    {
                        return;
                    }

                    string jsonstr = JsonConvert.SerializeObject(poo);
                    await Send(KafkaKey.BLWLog4NodeJs_RCU_Action_Topic, poo.frame_id.ToString(), Encoding.UTF8.GetBytes(jsonstr));
                }));

                // Raw (unparsed) binary data pushed to BaoJing. Currently not subscribed.
                var DingYue10 = ("redis-baojing-powerdata", new Action<SubscribeMessageEventArgs>(async (args) =>
                {
                    string body = args.Body;
                    try
                    {
                        if (!string.IsNullOrEmpty(body))
                        {
                            byte[] hexdata = Tools.GetBytesFromString(body);
                            await Send(KafkaKey.BLWLog4BaoJing_RCU_Topic, KafkaKey.InvokceThirdHttpInterface, hexdata);
                        }
                    }
                    catch (Exception ex)
                    {
                        Console.WriteLine("宝镜错误数为:" + body);
                        WriteConsoleError(ex);
                    }
                }));

                // IoT monitor data (XiaoDu).
                var DingYue5 = ("redis-iotpackage", Guarded(async (args) =>
                {
                    IOTMonitorData? usa = System.Text.Json.JsonSerializer.Deserialize<IOTMonitorData>(args.Body);
                    byte[] bytes = MyMessagePacker.FastSerialize(usa);
                    await Send(KafkaKey.BLWLog_RCU_Topic, KafkaKey.IotMonitor, bytes);
                }));

                // Online status. Only "online" arrives here; offline is driven by the Redis
                // key-expiry event of the marker key written below.
                var DingYue6 = ("redis-on_off_line", new Action<SubscribeMessageEventArgs>(async (args) =>
                {
                    try
                    {
                        // Expires after one hour; the offline event relies on that expiry.
                        MyPublishRedis.redis6.Set("ChangLiang", "1", 3600);

                        string body = args.Body;
                        CommonEntity.OnOffLineData? usa = System.Text.Json.JsonSerializer.Deserialize<CommonEntity.OnOffLineData>(body);
                        if (usa == null)
                        {
                            return;
                        }

                        // NOTE(review): the raw body is what gets published below, so this
                        // timestamp never reaches Kafka - confirm whether the re-serialized
                        // object was meant to be sent instead.
                        usa.UnixTime = Tools.GetUnixTime_MS();

                        if (string.IsNullOrEmpty(usa.EndPoint) || string.IsNullOrEmpty(usa.HostNumber))
                        {
                            logger.Error("RCUOnOffLine:" + body);
                            return;
                        }

                        await Send(KafkaKey.BLWLog4NodeJs_RCU_OnOffLine_Topic, usa.HostNumber, Encoding.UTF8.GetBytes(body));

                        // The protobuf push of this event to the BaoJing topic is disabled, so
                        // the message is no longer built here.
                    }
                    catch (Exception ex)
                    {
                        Console.WriteLine("aaaaaaaaaaaaaaaaaaaaaa");
                        WriteConsoleError(ex);
                    }
                }));

                // Take-card (power) status change.
                var DingYue7 = ("redis-takecard_change", Guarded(async (args) =>
                {
                    MTakeCardData? usa = System.Text.Json.JsonSerializer.Deserialize<MTakeCardData>(args.Body);
                    byte[] bytes = MyMessagePacker.FastSerialize(usa);
                    await Send(KafkaKey.BLWLog_RCU_Topic, KafkaKey.TakeCardStatus, bytes);
                }));

                // Service status change.
                var DingYue8 = ("redis-serviceinfo_change", Guarded(async (args) =>
                {
                    OtherServiceInfo? usa = System.Text.Json.JsonSerializer.Deserialize<OtherServiceInfo>(args.Body);
                    byte[] bytes = MyMessagePacker.FastSerialize(usa);
                    await Send(KafkaKey.BLWLog_RCU_Topic, KafkaKey.ServiceInfoStatus, bytes);
                }));

                // Third-party HTTP interface call log. Currently not subscribed.
                var DingYue9 = ("redis-invoke_httpinterface", Guarded(async (args) =>
                {
                    Interface3Log? usa = System.Text.Json.JsonSerializer.Deserialize<Interface3Log>(args.Body);
                    byte[] bytes = MyMessagePacker.FastSerialize(usa);
                    await Send(KafkaKey.BLWLog_RCU_Topic, KafkaKey.InvokceThirdHttpInterface, bytes);
                }));

                // ---- New protocol version ----

                // Host restart reason, forwarded as UTF-8 JSON.
                var DingYue11 = ("redis-rcu-restart", Guarded(async (args) =>
                {
                    NewVersionHexData? usa = System.Text.Json.JsonSerializer.Deserialize<NewVersionHexData>(args.Body);
                    if (usa == null)
                    {
                        return;
                    }

                    RestartIOTData rrr = new RestartIOTData()
                    {
                        HotelCode = usa.HotelCode,
                        HostNumber = usa.HostNumber,
                        MAC = usa.MAC,
                        RoomNumber = usa.RoomNumber,
                        EndPoint = usa.RemoteEndPoint,
                        CurrentStatus = usa.CurrentStatus,
                        CurrentTime = usa.CurrentTime,
                        UnixTime = Tools.GetUnixTime_MS(),
                        LauncherVersion = usa.LauncherVersion,
                        RebootReason = usa.RebootReason
                    };

                    byte[] bytes = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(rrr));
                    await Send(KafkaKey.BLWLog4NodeJs_RCU_OnOffLine_Topic, usa.HotelCode, bytes);
                }));

                // Card action. Currently not subscribed.
                var DingYue12 = ("redis-rcu-card_action", Guarded(async (args) =>
                {
                    NewVersionHexData? usa = System.Text.Json.JsonSerializer.Deserialize<NewVersionHexData>(args.Body);
                    byte[] bytes = MyMessagePacker.FastSerialize(usa);
                    await Send(KafkaKey.BLWLog_RCU_Topic, KafkaKey.RCUNewVersion_TakeCard, bytes);
                }));

                // Device action status. Currently not subscribed.
                var DingYue13 = ("redis-rcu-hexdata", Guarded(async (args) =>
                {
                    NewVersionHexData? usa = System.Text.Json.JsonSerializer.Deserialize<NewVersionHexData>(args.Body);
                    byte[] bytes = MyMessagePacker.FastSerialize(usa);
                    await Send(KafkaKey.BLWLog_RCU_Topic, KafkaKey.RCUNewVersion_0E, bytes);
                }));

                // Registration. Currently not subscribed.
                var DingYue14 = ("redis-rcu-registerdata", Guarded(async (args) =>
                {
                    NewVersionHexData? usa = System.Text.Json.JsonSerializer.Deserialize<NewVersionHexData>(args.Body);
                    byte[] bytes = MyMessagePacker.FastSerialize(usa);
                    await Send(KafkaKey.BLWLog_RCU_Topic, KafkaKey.RCUNewVersion_Register, bytes);
                }));

                // Periodic report. Currently not subscribed.
                var DingYue15 = ("redis-rcu-timer_data", Guarded(async (args) =>
                {
                    NewVersionHexData? usa = System.Text.Json.JsonSerializer.Deserialize<NewVersionHexData>(args.Body);
                    byte[] bytes = MyMessagePacker.FastSerialize(usa);
                    await Send(KafkaKey.BLWLog_RCU_Topic, KafkaKey.RCUNewTimerData, bytes);
                }));

                // Dedicated feed for XuanZhu: the same protobuf payload goes to the XuanZhu topic
                // and to the general log topic.
                var DingYue20 = ("Redis-XuanZhuKafka", Guarded(async (args) =>
                {
                    XuanZhuRequest? poo = System.Text.Json.JsonSerializer.Deserialize<XuanZhuRequest>(args.Body);
                    if (poo == null)
                    {
                        return;
                    }

                    DeviceActionChangeDataPush ese = new DeviceActionChangeDataPush
                    {
                        Code = poo.code,
                        RoomNumber = poo.roomNumber,
                        HostNumber = "",
                        Devicetype = poo.devicetype,
                        Address = poo.address,
                        Brightness = poo.brightness,
                        Status = poo.status,
                        Name = poo.name,
                        CurrentTemp = poo.currentTemp,
                        SettingTemp = poo.settingTemp,
                        FanSpeed = poo.fanSpeed,
                        Mode = poo.mode,
                        Valve = poo.valve,
                        Createtime = Tools.GetUnixTime()
                    };

                    byte[] data = ese.ToByteArray();
                    await Send(KafkaKey.BLWLog4XuanZhu_RCU_Topic, KafkaKey.UDPPackageDeviceStatus, data);
                    await Send(KafkaKey.BLWLog_RCU_Topic, KafkaKey.UDPPackageDeviceStatus, data);
                }));

                // ---- Active subscriptions ----
                // Not subscribed: DingYue2, DingYue3_1, DingYue9, DingYue10, DingYue12-DingYue15.
                CSRedisCacheHelper.redis3.Subscribe(webapidata_fenxi);
                CSRedisCacheHelper.redis3.Subscribe(wireshark);
                CSRedisCacheHelper.redis3.Subscribe(xiaofei_fenxi);

                CSRedisCacheHelper.redis3.Subscribe(DingYue);
                CSRedisCacheHelper.redis3.Subscribe(DingYue1);
                CSRedisCacheHelper.redis3.Subscribe(DingYue3);
                CSRedisCacheHelper.redis3.Subscribe(DingYue5);
                CSRedisCacheHelper.redis3.Subscribe(DingYue6);
                CSRedisCacheHelper.redis3.Subscribe(DingYue7);
                CSRedisCacheHelper.redis3.Subscribe(DingYue8);
                CSRedisCacheHelper.redis3.Subscribe(DingYue20);

                // New-version logs.
                CSRedisCacheHelper.redis3.Subscribe(TSLog_DingYue);
                CSRedisCacheHelper.redis3.Subscribe(TSLog_DingYue_0X36);
                CSRedisCacheHelper.redis3.Subscribe(DingYue11);
            }
            catch (Exception ex)
            {
                logger.Error(ex.Message);
                logger.Error(ex.StackTrace);
            }
        }

        /// <summary>
        /// Wraps an asynchronous handler so it can be used as a Redis subscription callback.
        /// The callback is effectively <c>async void</c>, so every exception is caught here and
        /// written to the console instead of escaping as an unhandled exception.
        /// </summary>
        /// <param name="work">The handler body.</param>
        /// <returns>A callback suitable for <c>Subscribe</c>.</returns>
        private static Action<SubscribeMessageEventArgs> Guarded(Func<SubscribeMessageEventArgs, Task> work)
        {
            return async (args) =>
            {
                try
                {
                    await work(args);
                }
                catch (Exception ex)
                {
                    WriteConsoleError(ex);
                }
            };
        }

        /// <summary>Writes the exception message and stack trace to the console.</summary>
        private static void WriteConsoleError(Exception ex)
        {
            Console.WriteLine(ex.Message);
            Console.WriteLine(ex.StackTrace);
        }

        /// <summary>
        /// Stores the package's total count (as a string) in the monitoring counter that matches
        /// its command type. Unknown command types are ignored.
        /// </summary>
        /// <param name="package">The deserialized UDP statistics package.</param>
        private static void ApplyUdpStatistic(UDPPackage package)
        {
            string countsa = package.TotalCount.ToString();
            var command = package.CommandType;

            if (command.Equals(WaiBuJianKong.UDPPackage_TotalRecvPackage))
            {
                WaiBuJianKong.data.UDP_Upload = countsa;
            }
            else if (command.Equals(WaiBuJianKong.UDPPackage_TotalSendPackage))
            {
                WaiBuJianKong.data.UDP_Sent = countsa;
            }
            else if (command.Equals(WaiBuJianKong.RCU_Online))
            {
                WaiBuJianKong.data.RCU_Online = countsa;
            }
            else if (command.Equals(WaiBuJianKong.TakeCardIn))
            {
                WaiBuJianKong.data.RCU_TakeCard = countsa;
            }
            else if (command.Equals(WaiBuJianKong.UDPPackage_Heart))
            {
                WaiBuJianKong.data.RCU_Heart = countsa;
            }
            else if (command.Equals(WaiBuJianKong.UDPPackage_SearchHost))
            {
                WaiBuJianKong.data.RCU_SearchHost = countsa;
            }
            else if (command.Equals(WaiBuJianKong.UDPPackage_Intercept))
            {
                WaiBuJianKong.data.Intercept = countsa;
            }
            else if (command.Equals(WaiBuJianKong.在线用户))
            {
                WaiBuJianKong.data.WebUser = countsa;
            }
            else if (command.Equals(WaiBuJianKong.UDPPackage_能耗))
            {
                WaiBuJianKong.data.Energy = countsa;
            }
            else if (command.Equals(WaiBuJianKong.UDPPackage_StatusPass))
            {
                WaiBuJianKong.data.RCU_OE = countsa;
            }
            else if (command.Equals(WaiBuJianKong.UDPPackage_TCL电视Discovery))
            {
                WaiBuJianKong.data.TCL_Ctr_D = countsa;
            }
            else if (command.Equals(WaiBuJianKong.UDPPackage_TCL电视Control))
            {
                WaiBuJianKong.data.TCL_Ctr_C = countsa;
            }
            else if (command.Equals(WaiBuJianKong.UDPPackage_小度音箱Discovery))
            {
                WaiBuJianKong.data.XD_Ctr_D = countsa;
            }
            else if (command.Equals(WaiBuJianKong.UDPPackage_小度音箱Control))
            {
                WaiBuJianKong.data.XD_Ctr_C = countsa;
            }
            else if (command.Equals(WaiBuJianKong.UDPPackage_TianMaoQueryAll))
            {
                WaiBuJianKong.data.TM_Ctr_D = countsa;
            }
            else if (command.Equals(WaiBuJianKong.UDPPackage_TianMaoCONTROL))
            {
                WaiBuJianKong.data.TM_Ctr_C = countsa;
            }
            else if (command.Equals(WaiBuJianKong.TotalErrorPackageReceiveCount))
            {
                WaiBuJianKong.data.TotalErrorPackageReceiveCount = countsa;
            }
        }

        /// <summary>
        /// Looks up the location of an IP address through the Baidu API.
        /// </summary>
        /// <param name="ip">The address to look up.</param>
        /// <returns>
        /// The <c>location</c> of the last entry in the response's <c>data</c> list, or an empty
        /// string when the lookup fails or returns no entries.
        /// </returns>
        public static string GetBaiduIp(string ip)
        {
            string location = "";
            try
            {
                // The address is escaped so that it cannot alter the rest of the query string.
                var request = new RestSharp.RestRequest(
                    $"/8aQDcjqpAAV3otqbppnN2DJv/api.php?query={Uri.EscapeDataString(ip)}&co=&resource_id=6006&oe=utf8",
                    Method.Get);

                var buffer = s_baiduClient.DownloadData(request);
                if (buffer == null)
                {
                    return location;
                }

                string jsonText = Encoding.UTF8.GetString(buffer);
                var root = JsonConvert.DeserializeObject<Root>(jsonText);
                if (root?.data == null)
                {
                    return location;
                }

                // When several entries are returned the last one wins.
                foreach (var item in root.data)
                {
                    location = item.location;
                }
                return location;
            }
            catch (Exception ex)
            {
                // Best-effort lookup: a failure only means the location stays unknown.
                logger.Debug(ex, "Baidu IP lookup failed");
                return location;
            }
        }

        /// <summary>
        /// Builds a <c>UDPPackage</c> from the given values and returns it as JSON.
        /// </summary>
        /// <param name="Key">Value stored in <c>CommandType</c>.</param>
        /// <param name="ti">Value stored in <c>RemoveTime</c>.</param>
        /// <param name="data">Value rounded with <see cref="Math.Round(double)"/> and stored in <c>TotalCount</c>.</param>
        /// <returns>The package serialized with System.Text.Json.</returns>
        private static string NewMethod(string Key, string ti, double data)
        {
            UDPPackage package = new UDPPackage
            {
                CommandType = Key,
                TotalCount = (long)Math.Round(data),
                RemoveTime = ti
            };
            return System.Text.Json.JsonSerializer.Serialize(package);
        }

        /// <summary>
        /// Placeholder for a channel consumer loop; it currently consumes nothing.
        /// The returned task stays pending until <paramref name="cancellationToken"/> is
        /// cancelled. The previous empty <c>while (true)</c> loop never yielded and kept a CPU
        /// core busy forever.
        /// </summary>
        /// <param name="cancellationToken">Optional token that ends the wait.</param>
        /// <exception cref="TaskCanceledException">The token was cancelled.</exception>
        public static async Task ConsumeMessagesAsync(CancellationToken cancellationToken = default)
        {
            await Task.Delay(System.Threading.Timeout.InfiniteTimeSpan, cancellationToken);
        }
    }
}