using System.Diagnostics.Metrics; using System.Text; using System.Text.Json; using System.Threading.Channels; using BLWData.Entity; using BLWLogProduce.Models; using Common; using CommonEntity; using CommonTools; using Confluent.Kafka; using Google.Protobuf; using MessagePack; using Microsoft.Extensions.Caching.Memory; using Newtonsoft.Json; using Newtonsoft.Json.Linq; using NLog; using RestSharp; using static CSRedis.CSRedisClient; namespace BLWLogProduce.Services { public class KafkaProduce : BackgroundService { public IConfiguration Configuration { get; set; } public IMemoryCache _Cache { get; set; } public KafkaProduce(IConfiguration configuration, IMemoryCache cache) { this.Configuration = configuration; _Cache = cache; } public static Logger logger = LogManager.GetCurrentClassLogger(); // 消费者方法 protected async override Task ExecuteAsync(CancellationToken stoppingToken) { await Task.Factory.StartNew((state) => { try { string? ipport = Configuration["Kafka:EndPoint"]; string? user = Configuration["Kafka:UserName"]; string? pwd = Configuration["Kafka:PassWord"]; var config = new ProducerConfig { BootstrapServers = ipport, SecurityProtocol = SecurityProtocol.SaslPlaintext, SaslMechanism = SaslMechanism.Plain, SaslUsername = user, SaslPassword = pwd }; var p = new ProducerBuilder(config).Build(); //var p = new ProducerBuilder(config).Build(); ///udp 采集数据 var wireshark = ("wireshark-cap-udp-totalcount", new Action(async (args) => { try { WaiBuJianKong.data.WireShark_UDPCapCount = args.Body; } catch (Exception) { } })); ///udp 采集数据 var xiaofei_fenxi = ("udp_package_consumer", new Action(async (args) => { try { string data = args.Body; string[] aaa = data.Split("#"); if (aaa.Length != 2) { return; } string key = aaa[0]; string valuedata = aaa[1]; if (key.Equals("task1")) { var ts = WaiBuJianKong.data.Process_TH1; int? count = ts + 1; WaiBuJianKong.data.Process_TH1 = count; } else if (key.Equals("task2")) { var ts = WaiBuJianKong.data.Process_TH2; int? count = ts + 1; WaiBuJianKong.data.Process_TH2 = count; } else if (key.Equals("task3")) { var ts = WaiBuJianKong.data.Process_TH3; int? count = ts + 1; WaiBuJianKong.data.Process_TH3 = count; } else if (key.Equals("task4")) { var ts = WaiBuJianKong.data.Process_TH4; int? count = ts + 1; WaiBuJianKong.data.Process_TH4= count; } else if (key.Equals("task5")) { var ts = WaiBuJianKong.data.Process_TH5; int? count = ts + 1; WaiBuJianKong.data.Process_TH5 = count; } else { } } catch (Exception) { } })); ///udp 采集数据 var DingYue = ("redis-udppackage", new Action(async (args) => { try { string body = args.Body; //Console.WriteLine("UDPData:"+body); UDPPackage? usa = System.Text.Json.JsonSerializer.Deserialize(body); if (usa != null) { string countsa = usa.TotalCount.ToString(); if (usa.CommandType.Equals(WaiBuJianKong.UDPPackage_TotalRecvPackage)) { WaiBuJianKong.data.UDP_Upload = countsa; } else if (usa.CommandType.Equals(WaiBuJianKong.UDPPackage_TotalSendPackage)) { WaiBuJianKong.data.UDP_Sent = countsa; } else if (usa.CommandType.Equals(WaiBuJianKong.RCU_Online)) { WaiBuJianKong.data.RCU_Online = countsa; } else if (usa.CommandType.Equals(WaiBuJianKong.TakeCardIn)) { WaiBuJianKong.data.RCU_TakeCard = countsa; } else if (usa.CommandType.Equals(WaiBuJianKong.UDPPackage_Heart)) { WaiBuJianKong.data.RCU_Heart = countsa; } else if (usa.CommandType.Equals(WaiBuJianKong.UDPPackage_SearchHost)) { WaiBuJianKong.data.RCU_SearchHost = countsa; } else if (usa.CommandType.Equals(WaiBuJianKong.UDPPackage_Intercept)) { WaiBuJianKong.data.Intercept = countsa; } else if (usa.CommandType.Equals(WaiBuJianKong.在线用户)) { WaiBuJianKong.data.WebUser = countsa; } else if (usa.CommandType.Equals(WaiBuJianKong.UDPPackage_能耗)) { WaiBuJianKong.data.Energy = countsa; } else if (usa.CommandType.Equals(WaiBuJianKong.UDPPackage_StatusPass)) { WaiBuJianKong.data.RCU_OE = countsa; } else if (usa.CommandType.Equals(WaiBuJianKong.UDPPackage_TCL电视Discovery)) { WaiBuJianKong.data.TCL_Ctr_D = countsa; } else if (usa.CommandType.Equals(WaiBuJianKong.UDPPackage_TCL电视Control)) { WaiBuJianKong.data.TCL_Ctr_C = countsa; } else if (usa.CommandType.Equals(WaiBuJianKong.UDPPackage_小度音箱Discovery)) { WaiBuJianKong.data.XD_Ctr_D = countsa; } else if (usa.CommandType.Equals(WaiBuJianKong.UDPPackage_小度音箱Control)) { WaiBuJianKong.data.XD_Ctr_C = countsa; } else if (usa.CommandType.Equals(WaiBuJianKong.UDPPackage_TianMaoQueryAll)) { WaiBuJianKong.data.TM_Ctr_D = countsa; } else if (usa.CommandType.Equals(WaiBuJianKong.UDPPackage_TianMaoCONTROL)) { WaiBuJianKong.data.TM_Ctr_C = countsa; } else if (usa.CommandType.Equals(WaiBuJianKong.TotalErrorPackageReceiveCount)) { WaiBuJianKong.data.TotalErrorPackageReceiveCount = countsa; } } var QQQ = usa.ExtraData; if (QQQ != null && QQQ.Count > 0) { foreach (var item in QQQ) { string Location = ""; if (_Cache.TryGetValue(item.Key, out Location)) { } else { Location = GetBaiduIp(item.Key); _Cache.Set(item.Key, Location, DateTimeOffset.Now.AddHours(1)); } QQQ[item.Key] = Location; } } byte[] bytes = MyMessagePacker.FastSerialize(usa); //byte[] bytes = MessagePackSerializer.Serialize(usa); string TopicKey = KafkaKey.BLWLog_RCU_Topic; string DetailKey = KafkaKey.UDPPackageKey; //var partition = new Partition(0); // 指定分区号 //var topicPartition = new TopicPartition(TopicKey, partition); //Console.WriteLine("产生了数据:"+body); //var dr = await p.ProduceAsync(TopicKey, new Message { Key = DetailKey, Value = Encoding.UTF8.GetBytes(body) }); var dr = await p.ProduceAsync(TopicKey, new Message { Key = DetailKey, Value = bytes }); } catch (Exception ex) { Console.WriteLine("redis-udppackage"); Console.WriteLine(ex.Message); Console.WriteLine(ex.StackTrace); } })); var DingYue1 = ("redis-roomstatus-monitor", new Action(async (args) => { string body = args.Body; StepInfo? usa = System.Text.Json.JsonSerializer.Deserialize(body); byte[] bytes = MyMessagePacker.FastSerialize(usa); string TopicKey = KafkaKey.BLWLog_RCU_Topic; string DetailKey = KafkaKey.UDPPackageStepMonitor; //var partition = new Partition(0); // 指定分区号 //var topicPartition = new TopicPartition(TopicKey, partition); await p.ProduceAsync(TopicKey, new Message { Key = DetailKey, Value = bytes }); //await _messageChannel.Writer.WriteAsync(""); })); //旁路系统 var DingYue2 = ("redis-forksystemdata", new Action(async (args) => { string body = args.Body; ForkSystem? forkSystem = System.Text.Json.JsonSerializer.Deserialize(body); byte[] qf = MyMessagePacker.FastSerialize(forkSystem); string TopicKey = KafkaKey.BLWLog_RCU_Topic; string DetailKey = KafkaKey.UDPPackageForkSystemKey; //var partition = new Partition(0); // 指定分区号 //var topicPartition = new TopicPartition(TopicKey, partition); await p.ProduceAsync(TopicKey, new Message { Key = DetailKey, Value = qf }); //await Task.Factory.StartNew(async () => // { // await ConsumeMessagesAsync(); // }, TaskCreationOptions.LongRunning); })); ///能耗 var DingYue3 = ("redis-power", new Action(async (args) => { string body = args.Body; try { NengHao? poo = System.Text.Json.JsonSerializer.Deserialize(body); //string str= Newtonsoft.Json.JsonConvert.SerializeObject(poo); //Console.WriteLine("收到了"+str); byte[] qf = MyMessagePacker.FastSerialize(poo); string TopicKey = KafkaKey.BLWLog_RCU_Topic; string DetailKey = KafkaKey.UDPPackagePowerMonitor; await p.ProduceAsync(TopicKey, new Message { Key = DetailKey, Value = qf }); List la = new List(); foreach (var item in poo.AllDeviceData) { DeviceData dd = new DeviceData(); dd.HostID = item.HostID; dd.DeviceType = item.DeviceType; dd.Address = item.Address; dd.Brightness = item.Brightness; dd.Status = item.Status; dd.CurrentTemp = item.CurrentTemp; dd.SettingTemp = item.SettingTemp; dd.Mode = item.Mode; dd.FanSpeed = item.FanSpeed; dd.Valve = item.Valve; la.Add(dd); } //宝镜系统使用 EnergyConsumption ese = new EnergyConsumption(); ese.HotelCode = poo.HotelCode; ese.HostNumber = poo.HostNumber; ese.Mac = poo.Mac; ese.EndPoint = poo.EndPoint; ese.V = poo.V; ese.A = poo.A; ese.P = poo.P; ese.EnergyConsumption_ = poo.Energy_Consumption; ese.SumEnergyConsumption = poo.Sum_Energy_Consumption; ese.CreateTime = poo.CreateTime; ese.RoomNumber = poo.RoomNumber ?? ""; ese.IsTakeCard = poo.IsTakeCard; ese.CarbonVIP = poo.CarbonVIP; ese.DeviceStatusList.AddRange(la); ese.IdentityInfo = poo.IdentityInfo; byte[] data = ese.ToByteArray(); string TopicKey1 = KafkaKey.BLWLog4BaoJing_RCU_Topic; string DetailKey1 = KafkaKey.UDPPackagePowerMonitor; await p.ProduceAsync(TopicKey1, new Message { Key = DetailKey1, Value = data }); } catch (Exception ex) { Console.WriteLine(ex.Message); Console.WriteLine(ex.StackTrace); } })); var DingYue3_1 = ("redis-action-data", new Action(async (args) => { string body = args.Body; try { NewRoomtusPush? poo = System.Text.Json.JsonSerializer.Deserialize(body); //byte[] qf = MyMessagePacker.FastSerialize(poo); //string TopicKey = KafkaKey.BLWLog_RCU_Topic; //string DetailKey = KafkaKey.UDPPackagePowerMonitor; //await p.ProduceAsync(TopicKey, new Message { Key = DetailKey, Value = qf }); //宝镜系统使用 NewActionChangeDataPush ese = new NewActionChangeDataPush(); ese.Code = poo.code; ese.RoomNumber = poo.roomNumber; ese.HostNumber = poo.hostnumber; ese.Address = poo.address; ese.Brightness = poo.brightness; ese.Status = poo.status; ese.Name = poo.name; ese.CurrentTemp = poo.currentTemp; ese.SettingTemp = poo.settingTemp; ese.FanSpeed = poo.fanSpeed; ese.Mode = poo.mode; ese.Valve = poo.valve; ese.Createtime = poo.createtime; byte[] data = ese.ToByteArray(); string TopicKey1 = KafkaKey.BLWLog4BaoJing_RCU_Topic; string DetailKey1 = KafkaKey.UDPPackage_ActionData; await p.ProduceAsync(TopicKey1, new Message { Key = DetailKey1, Value = data }); } catch (Exception ex) { Console.WriteLine(ex.Message); Console.WriteLine(ex.StackTrace); } })); #region 碳达人 //var DingYue3_1 = ("redis-carbon_trigger", new Action(async (args) => //{ // string body = args.Body; // NengHao? poo = System.Text.Json.JsonSerializer.Deserialize(body); // //poo.ReportTime = DateTime.Now; // byte[] qf = MyMessagePacker.FastSerialize(poo); // string TopicKey = KafkaKey.BLWLog_RCU_Topic; // string DetailKey = KafkaKey.UDPPackagePowerMonitor; // await p.ProduceAsync(TopicKey, new Message { Key = DetailKey, Value = qf }); // //宝镜系统使用 // EnergyConsumption ese = new EnergyConsumption(); // ese.HotelCode = poo.HotelCode; // ese.HostNumber = poo.HostNumber; // ese.Mac = poo.Mac; // ese.EndPoint = poo.EndPoint; // ese.V = poo.V; // ese.A = poo.A; // ese.P = poo.P; // ese.KWH = poo.KW_H; // ese.SumKWH = poo.Sum_KW_H; // ese.CreateTime = poo.CreateTime; // ese.RoomNumber = poo.RoomNumber; // ese.IsTakeCard = poo.IsTakeCard; // ese.CarbonVIP = poo.CarbonVIP; // byte[] data = ese.ToByteArray(); // string TopicKey1 = KafkaKey.BLWLog4BaoJing_RCU_Topic; // string DetailKey1 = KafkaKey.UDPPackageWholeDataMonitor; // await p.ProduceAsync(TopicKey1, new Message { Key = DetailKey1, Value = data }); //})); #endregion //给宝镜推送数据 ,本意是推送原始的二进制,没有解析过的数据 var DingYue10 = ("redis-baojing-powerdata", new Action(async (args) => { string body = args.Body; try { if (!string.IsNullOrEmpty(body)) { byte[] hexdata = Tools.GetBytesFromString(body); string TopicKey = KafkaKey.BLWLog4BaoJing_RCU_Topic; string DetailKey = KafkaKey.InvokceThirdHttpInterface; var dr = await p.ProduceAsync(TopicKey, new Message { Key = DetailKey, Value = hexdata }); } } catch (Exception ex) { Console.WriteLine("宝镜错误数为:" + body); Console.WriteLine(ex.Message); Console.WriteLine(ex.StackTrace); } })); //小度 var DingYue5 = ("redis-iotpackage", new Action(async (args) => { try { string body = args.Body; IOTMonitorData? usa = System.Text.Json.JsonSerializer.Deserialize(body); //usa.CreateTime = DateTime.Now; byte[] bytes = MyMessagePacker.FastSerialize(usa); string TopicKey = KafkaKey.BLWLog_RCU_Topic; string DetailKey = KafkaKey.IotMonitor; //var partition = new Partition(0); // 指定分区号 //var topicPartition = new TopicPartition(TopicKey, partition); var dr = await p.ProduceAsync(TopicKey, new Message { Key = DetailKey, Value = bytes }); } catch (Exception ex) { Console.WriteLine(ex.Message); Console.WriteLine(ex.StackTrace); } })); //在线离线状态 var DingYue6 = ("redis-on_off_line", new Action(async (args) => { try { string body = args.Body; OnOffLineData? usa = System.Text.Json.JsonSerializer.Deserialize(body); byte[] bytes = MyMessagePacker.FastSerialize(usa); string TopicKey = KafkaKey.BLWLog_RCU_Topic; string DetailKey = KafkaKey.RCUOnLineStatus; //var partition = new Partition(0); // 指定分区号 //var topicPartition = new TopicPartition(TopicKey, partition); var dr = await p.ProduceAsync(TopicKey, new Message { Key = DetailKey, Value = bytes }); } catch (Exception ex) { Console.WriteLine("aaaaaaaaaaaaaaaaaaaaaa"); Console.WriteLine(ex.Message); Console.WriteLine(ex.StackTrace); } })); //取电状态 var DingYue7 = ("redis-takecard_change", new Action(async (args) => { try { string body = args.Body; MTakeCardData? usa = System.Text.Json.JsonSerializer.Deserialize(body); byte[] bytes = MyMessagePacker.FastSerialize(usa); string TopicKey = KafkaKey.BLWLog_RCU_Topic; string DetailKey = KafkaKey.TakeCardStatus; //var partition = new Partition(1); // 指定分区号 //var topicPartition = new TopicPartition(TopicKey, partition); var dr = await p.ProduceAsync(TopicKey, new Message { Key = DetailKey, Value = bytes }); } catch (Exception ex) { Console.WriteLine(ex.Message); Console.WriteLine(ex.StackTrace); } })); //服务状态变化 var DingYue8 = ("redis-serviceinfo_change", new Action(async (args) => { try { string body = args.Body; OtherServiceInfo? usa = System.Text.Json.JsonSerializer.Deserialize(body); byte[] bytes = MyMessagePacker.FastSerialize(usa); string TopicKey = KafkaKey.BLWLog_RCU_Topic; string DetailKey = KafkaKey.ServiceInfoStatus; //var partition = new Partition(1); // 指定分区号 //var topicPartition = new TopicPartition(TopicKey, partition); var dr = await p.ProduceAsync(TopicKey, new Message { Key = DetailKey, Value = bytes }); } catch (Exception ex) { Console.WriteLine(ex.Message); Console.WriteLine(ex.StackTrace); } })); //第三方接口调用日志 var DingYue9 = ("redis-invoke_httpinterface", new Action(async (args) => { try { string body = args.Body; Interface3Log? usa = System.Text.Json.JsonSerializer.Deserialize(body); byte[] bytes = MyMessagePacker.FastSerialize(usa); string TopicKey = KafkaKey.BLWLog_RCU_Topic; string DetailKey = KafkaKey.InvokceThirdHttpInterface; //var partition = new Partition(1); // 指定分区号 //var topicPartition = new TopicPartition(TopicKey, partition); var dr = await p.ProduceAsync(TopicKey, new Message { Key = DetailKey, Value = bytes }); } catch (Exception ex) { Console.WriteLine(ex.Message); Console.WriteLine(ex.StackTrace); } })); #region 新版本协议 //新版主机数据重启原因 var DingYue11 = ("redis-rcu-restart", new Action(async (args) => { try { string body = args.Body; NewVersionHexData? usa = System.Text.Json.JsonSerializer.Deserialize(body); byte[] bytes = MyMessagePacker.FastSerialize(usa); string TopicKey = KafkaKey.BLWLog_RCU_Topic; string DetailKey = KafkaKey.RCUNewVersion_RestartReason; var dr = await p.ProduceAsync(TopicKey, new Message { Key = DetailKey, Value = bytes }); } catch (Exception ex) { Console.WriteLine(ex.Message); Console.WriteLine(ex.StackTrace); } })); //取电状态 var DingYue12 = ("redis-rcu-card_action", new Action(async (args) => { try { string body = args.Body; NewVersionHexData? usa = System.Text.Json.JsonSerializer.Deserialize(body); byte[] bytes = MyMessagePacker.FastSerialize(usa); string TopicKey = KafkaKey.BLWLog_RCU_Topic; string DetailKey = KafkaKey.RCUNewVersion_TakeCard; var dr = await p.ProduceAsync(TopicKey, new Message { Key = DetailKey, Value = bytes }); } catch (Exception ex) { Console.WriteLine(ex.Message); Console.WriteLine(ex.StackTrace); } })); //设备动作状态 var DingYue13 = ("redis-rcu-hexdata", new Action(async (args) => { try { string body = args.Body; NewVersionHexData? usa = System.Text.Json.JsonSerializer.Deserialize(body); byte[] bytes = MyMessagePacker.FastSerialize(usa); string TopicKey = KafkaKey.BLWLog_RCU_Topic; string DetailKey = KafkaKey.RCUNewVersion_0E; var dr = await p.ProduceAsync(TopicKey, new Message { Key = DetailKey, Value = bytes }); } catch (Exception ex) { Console.WriteLine(ex.Message); Console.WriteLine(ex.StackTrace); } })); //注册 var DingYue14 = ("redis-rcu-registerdata", new Action(async (args) => { try { string body = args.Body; NewVersionHexData? usa = System.Text.Json.JsonSerializer.Deserialize(body); byte[] bytes = MyMessagePacker.FastSerialize(usa); string TopicKey = KafkaKey.BLWLog_RCU_Topic; string DetailKey = KafkaKey.RCUNewVersion_Register; var dr = await p.ProduceAsync(TopicKey, new Message { Key = DetailKey, Value = bytes }); } catch (Exception ex) { Console.WriteLine(ex.Message); Console.WriteLine(ex.StackTrace); } })); //定时上报 var DingYue15 = ("redis-rcu-timer_data", new Action(async (args) => { try { string body = args.Body; NewVersionHexData? usa = System.Text.Json.JsonSerializer.Deserialize(body); byte[] bytes = MyMessagePacker.FastSerialize(usa); string TopicKey = KafkaKey.BLWLog_RCU_Topic; string DetailKey = KafkaKey.RCUNewTimerData; var dr = await p.ProduceAsync(TopicKey, new Message { Key = DetailKey, Value = bytes }); } catch (Exception ex) { Console.WriteLine(ex.Message); Console.WriteLine(ex.StackTrace); } })); #endregion CSRedisCacheHelper.redis3.Subscribe(wireshark); CSRedisCacheHelper.redis3.Subscribe(xiaofei_fenxi); //这两个是统计 CSRedisCacheHelper.redis3.Subscribe(DingYue); CSRedisCacheHelper.redis3.Subscribe(DingYue1); //旁路 //CSRedisCacheHelper.redis3.Subscribe(DingYue2); CSRedisCacheHelper.redis3.Subscribe(DingYue3); CSRedisCacheHelper.redis3.Subscribe(DingYue5); CSRedisCacheHelper.redis3.Subscribe(DingYue6); CSRedisCacheHelper.redis3.Subscribe(DingYue7); CSRedisCacheHelper.redis3.Subscribe(DingYue8); //CSRedisCacheHelper.redis3.Subscribe(DingYue9); // 很抱歉打扰你了 //CSRedisCacheHelper.redis3.Subscribe(DingYue10); // 创建无界Channel //_messageChannel = Channel.CreateUnbounded(); //CSRedisCacheHelper.redis3.Subscribe(DingYue11); //CSRedisCacheHelper.redis3.Subscribe(DingYue12); //CSRedisCacheHelper.redis3.Subscribe(DingYue13); //CSRedisCacheHelper.redis3.Subscribe(DingYue14); //CSRedisCacheHelper.redis3.Subscribe(DingYue15); //CSRedisCacheHelper.redis3.Subscribe(DingYue3_1); } catch (Exception ex) { logger.Error(ex.Message); logger.Error(ex.StackTrace); } }, TaskCreationOptions.LongRunning, stoppingToken); } /// /// 百度api /// /// public static string GetBaiduIp(string ip) { string location = ""; try { string url = $"https://sp0.baidu.com"; //WebClient client = new WebClient(); RestSharp.RestClient client1 = new RestSharp.RestClient(url); RestSharp.RestRequest request = new RestSharp.RestRequest($"/8aQDcjqpAAV3otqbppnN2DJv/api.php?query={ip}&co=&resource_id=6006&oe=utf8", Method.Get); var buffer = client1.DownloadData(request); //var buffer = client.DownloadData(url); string jsonText = Encoding.UTF8.GetString(buffer); JObject jo = JObject.Parse(jsonText); Root root = JsonConvert.DeserializeObject(jo.ToString()); foreach (var item in root.data) { location = item.location; } return location; } catch (Exception ex) { //Console.WriteLine(ex); return location; } } private static string NewMethod(string Key, string ti, double data) { UDPPackage v = new UDPPackage(); v.CommandType = Key; long CPU_MAX = (long)Math.Round(data); v.TotalCount = CPU_MAX; v.RemoveTime = ti; string f1 = System.Text.Json.JsonSerializer.Serialize(v); return f1; } public static async Task ConsumeMessagesAsync() { while (true) { //if (_messageChannel.Reader.TryRead(out var nnn)) //{ } } } } }