using System.Text;
using System.Text.Json;
using System.Threading.Channels;
using BLWWS_BLL.Common;
using Confluent.Kafka;
using MessagePack;
using Microsoft.Extensions.Caching.Memory;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using NLog;
using PMSLogProduce.Models;
using RestSharp;
using static CSRedis.CSRedisClient;

namespace BLWLogProduce.Services
{
    /// <summary>
    /// Hosted service that subscribes to the Redis channel <c>PMSLogMonitor</c> and
    /// forwards every message it receives to Kafka as a MessagePack payload.
    /// </summary>
    public class KafkaProduce : BackgroundService
    {
        /// <summary>Redis pub/sub channel the service listens on.</summary>
        private const string RedisChannel = "PMSLogMonitor";

        /// <summary>Kafka topic the re-encoded messages are produced to.</summary>
        private const string TopicKey = "blwlog-rcu-udppackage-topic";

        /// <summary>Kafka message key attached to every forwarded message.</summary>
        private const string DetailKey = "pms";

        /// <summary>Pause between iterations of the idle loop in <see cref="ConsumeMessagesAsync"/>.</summary>
        private static readonly TimeSpan IdlePollInterval = TimeSpan.FromMilliseconds(100);

        /// <summary>Application configuration; the <c>Kafka:*</c> keys are read from it.</summary>
        public IConfiguration Configuration { get; set; }

        /// <summary>In-memory cache supplied through the constructor (not used in this class).</summary>
        public IMemoryCache _Cache { get; set; }

        /// <summary>
        /// Creates the service with the configuration and cache it was given.
        /// </summary>
        /// <param name="configuration">Source of the Kafka connection settings.</param>
        /// <param name="cache">Memory cache instance stored on <see cref="_Cache"/>.</param>
        public KafkaProduce(IConfiguration configuration, IMemoryCache cache)
        {
            this.Configuration = configuration;
            _Cache = cache;
        }

        /// <summary>Class-level NLog logger.</summary>
        public static Logger logger = LogManager.GetCurrentClassLogger();

        /// <summary>
        /// Builds a SASL/PLAIN Kafka producer from the <c>Kafka:EndPoint</c>,
        /// <c>Kafka:UserName</c> and <c>Kafka:PassWord</c> settings and registers the
        /// Redis subscription whose handler forwards messages to Kafka.
        /// </summary>
        /// <param name="stoppingToken">Host shutdown token (not observed by the setup code).</param>
        /// <returns>A task that completes once the subscription has been registered.</returns>
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            // The lambda is parameterless on purpose: with a one-parameter lambda the
            // compiler picks StartNew(Action<object>, object state) and the
            // TaskCreationOptions value is passed as *state* instead of being applied.
            await Task.Factory.StartNew(() =>
            {
                try
                {
                    string? ipport = Configuration["Kafka:EndPoint"];
                    string? user = Configuration["Kafka:UserName"];
                    string? pwd = Configuration["Kafka:PassWord"];

                    var config = new ProducerConfig
                    {
                        BootstrapServers = ipport,
                        SecurityProtocol = SecurityProtocol.SaslPlaintext,
                        SaslMechanism = SaslMechanism.Plain,
                        SaslUsername = user,
                        SaslPassword = pwd
                    };

                    // NOTE(review): the producer is captured by the subscription handler and
                    // is never disposed; it lives for as long as the subscription does.
                    IProducer<string, byte[]> producer = new ProducerBuilder<string, byte[]>(config).Build();

                    // The handler is an async lambda converted to Action<>, i.e. "async void".
                    // An exception escaping it cannot be observed by anyone and may bring the
                    // process down, so everything is caught and logged here.
                    Action<SubscribeMessageEventArgs> handler = async args =>
                    {
                        try
                        {
                            await ForwardToKafkaAsync(producer, args.Body);
                        }
                        catch (Exception ex)
                        {
                            logger.Error(ex, "Failed to forward a PMSLogMonitor message to Kafka");
                        }
                    };

                    CSRedisCacheHelper.redis3.Subscribe((RedisChannel, handler));
                }
                catch (Exception ex)
                {
                    // Pass the exception itself so the stack trace is logged, not only the message.
                    logger.Error(ex, "Failed to start the PMSLogMonitor to Kafka forwarder");
                }
            }, TaskCreationOptions.LongRunning);
        }

        /// <summary>
        /// Deserializes one JSON message body, re-encodes it with MessagePack and
        /// produces it to <see cref="TopicKey"/> with key <see cref="DetailKey"/>.
        /// </summary>
        /// <param name="producer">Kafka producer used to send the message.</param>
        /// <param name="body">JSON text received from the Redis channel.</param>
        private static async Task ForwardToKafkaAsync(IProducer<string, byte[]> producer, string body)
        {
            CheckInYuanShidata? usa = System.Text.Json.JsonSerializer.Deserialize<CheckInYuanShidata>(body);
            if (usa is null)
            {
                // A JSON literal "null" deserializes to null; there is nothing to forward.
                logger.Warn("PMSLogMonitor message deserialized to null and was skipped");
                return;
            }

            byte[] bytes = MessagePackSerializer.Serialize(usa);
            Console.WriteLine("推送了数据");
            await producer.ProduceAsync(TopicKey, new Message<string, byte[]> { Key = DetailKey, Value = bytes });
        }

        /// <summary>
        /// Looks up an IP address through the Baidu API.
        /// </summary>
        /// <param name="ip">IP address placed in the <c>query</c> parameter of the request.</param>
        /// <returns>
        /// The <c>location</c> of the last entry in the response's <c>data</c> collection,
        /// or an empty string when the response has no data or the lookup fails.
        /// </returns>
        public static string GetBaiduIp(string ip)
        {
            string location = "";
            try
            {
                string url = "https://sp0.baidu.com";
                RestSharp.RestClient client1 = new RestSharp.RestClient(url);
                RestSharp.RestRequest request = new RestSharp.RestRequest($"/8aQDcjqpAAV3otqbppnN2DJv/api.php?query={ip}&co=&resource_id=6006&oe=utf8", Method.Get);

                var buffer = client1.DownloadData(request);
                string jsonText = Encoding.UTF8.GetString(buffer);

                Root? root = JsonConvert.DeserializeObject<Root>(jsonText);
                if (root?.data is null)
                {
                    return location;
                }

                // When several entries are returned the last one wins.
                foreach (var item in root.data)
                {
                    location = item.location;
                }

                return location;
            }
            catch (Exception ex)
            {
                // Best-effort lookup: report the failure and fall back to the value found so far.
                logger.Warn(ex, "Baidu IP lookup failed");
                return location;
            }
        }

        /// <summary>
        /// Placeholder consumer loop. It currently processes nothing and never completes.
        /// </summary>
        /// <returns>A task that does not complete.</returns>
        public static async Task ConsumeMessagesAsync()
        {
            while (true)
            {
                //if (_messageChannel.Reader.TryRead(out var nnn))
                //{ }

                // Without an await the loop would run synchronously on the caller's thread
                // and spin a core at 100%; yielding keeps it asynchronous and idle.
                await Task.Delay(IdlePollInterval);
            }
        }
    }
}