Files
Web_PMSApi_Server_V1_Prod/PMSLogProduce/Services/KafkaProduce.cs

114 lines
4.0 KiB
C#
Raw Normal View History

2025-11-20 15:56:30 +08:00

using System.Text;
using System.Text.Json;
using System.Threading.Channels;
using BLWWS_BLL.Common;
using Confluent.Kafka;
using MessagePack;
using Microsoft.Extensions.Caching.Memory;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using NLog;
using PMSLogProduce.Models;
using RestSharp;
using static CSRedis.CSRedisClient;
namespace BLWLogProduce.Services
{
/// <summary>
/// Hosted background service that listens on the Redis pub/sub channel
/// <c>PMSLogMonitor</c>, re-encodes each JSON message as MessagePack and
/// produces it to Kafka.
/// </summary>
public class KafkaProduce : BackgroundService
{
    /// <summary>Redis pub/sub channel the service subscribes to.</summary>
    private const string RedisChannel = "PMSLogMonitor";

    /// <summary>Kafka topic every forwarded message is produced to.</summary>
    private const string TopicKey = "blwlog-rcu-udppackage-topic";

    /// <summary>Kafka message key attached to every forwarded message.</summary>
    private const string DetailKey = "pms";

    /// <summary>Upper bound on how long shutdown waits for queued Kafka messages.</summary>
    private static readonly TimeSpan ShutdownFlushTimeout = TimeSpan.FromSeconds(5);

    /// <summary>Pause between iterations of the idle consume loop.</summary>
    private static readonly TimeSpan IdlePollInterval = TimeSpan.FromMilliseconds(100);

    /// <summary>Application configuration; the <c>Kafka:*</c> keys are read from it.</summary>
    public IConfiguration Configuration { get; set; }

    /// <summary>In-memory cache supplied by DI (not used by the code in this class).</summary>
    public IMemoryCache _Cache { get; set; }

    /// <summary>Creates the service with its injected dependencies.</summary>
    /// <param name="configuration">Source of the Kafka connection settings.</param>
    /// <param name="cache">Shared in-memory cache.</param>
    public KafkaProduce(IConfiguration configuration, IMemoryCache cache)
    {
        this.Configuration = configuration;
        _Cache = cache;
    }

    /// <summary>NLog logger for this class.</summary>
    public static Logger logger = LogManager.GetCurrentClassLogger();

    /// <summary>
    /// Builds the Kafka producer, subscribes to the Redis channel and then stays
    /// alive until the host requests shutdown, at which point the subscription is
    /// released and the producer is flushed and disposed.
    /// </summary>
    /// <param name="stoppingToken">Signalled by the host when the service must stop.</param>
    /// <returns>A task that completes once the service has shut down.</returns>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // Hand control back to the host right away so the synchronous setup
        // below (producer build, Redis subscribe) does not delay startup.
        await Task.Yield();

        try
        {
            using IProducer<string, byte[]> producer =
                new ProducerBuilder<string, byte[]>(CreateProducerConfig()).Build();
            try
            {
                // ForwardToKafkaAsync never faults, so discarding its task is safe
                // and avoids an 'async void' callback whose exceptions would be
                // unobservable and could terminate the process.
                var handler = (RedisChannel, new Action<SubscribeMessageEventArgs>(
                    args => _ = ForwardToKafkaAsync(producer, args)));

                // NOTE(review): assumes redis3.Subscribe returns CSRedis' disposable
                // SubscribeObject - confirm against CSRedisCacheHelper.
                using IDisposable subscription = CSRedisCacheHelper.redis3.Subscribe(handler);

                // Keep the producer and the subscription alive until shutdown.
                await Task.Delay(Timeout.Infinite, stoppingToken);
            }
            finally
            {
                // Give messages that are still queued a bounded chance to go out
                // before the producer is disposed.
                producer.Flush(ShutdownFlushTimeout);
            }
        }
        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
        {
            // Normal shutdown path - nothing to report.
        }
        catch (Exception ex)
        {
            // Pass the exception object so the stack trace is logged, not just the message.
            logger.Error(ex, "KafkaProduce background service failed");
        }
    }

    /// <summary>
    /// Reads the <c>Kafka:EndPoint</c>, <c>Kafka:UserName</c> and <c>Kafka:PassWord</c>
    /// settings and builds a SASL/PLAIN producer configuration from them.
    /// </summary>
    /// <returns>The producer configuration.</returns>
    /// <exception cref="InvalidOperationException">The endpoint setting is missing or blank.</exception>
    private ProducerConfig CreateProducerConfig()
    {
        string? endPoint = Configuration["Kafka:EndPoint"];
        if (string.IsNullOrWhiteSpace(endPoint))
        {
            throw new InvalidOperationException("Configuration value 'Kafka:EndPoint' is missing.");
        }

        return new ProducerConfig
        {
            BootstrapServers = endPoint,
            SecurityProtocol = SecurityProtocol.SaslPlaintext,
            SaslMechanism = SaslMechanism.Plain,
            SaslUsername = Configuration["Kafka:UserName"],
            SaslPassword = Configuration["Kafka:PassWord"]
        };
    }

    /// <summary>
    /// Converts one Redis message (a JSON document) to MessagePack and produces it to Kafka.
    /// Every failure is logged and swallowed so that a single bad message cannot
    /// bring down the subscription or the process.
    /// </summary>
    /// <param name="producer">Producer used to publish the message.</param>
    /// <param name="args">Redis message event; its <c>Body</c> carries the JSON text.</param>
    /// <returns>A task that always completes successfully.</returns>
    private static async Task ForwardToKafkaAsync(IProducer<string, byte[]> producer, SubscribeMessageEventArgs args)
    {
        try
        {
            string body = args.Body;
            CheckInYuanShidata? record = System.Text.Json.JsonSerializer.Deserialize<CheckInYuanShidata>(body);
            if (record is null)
            {
                // A JSON 'null' payload carries no data worth forwarding.
                logger.Warn("Skipped a Redis message that deserialized to null");
                return;
            }

            byte[] payload = MessagePackSerializer.Serialize(record);
            Console.WriteLine("推送了数据");
            await producer.ProduceAsync(TopicKey, new Message<string, byte[]> { Key = DetailKey, Value = payload });
        }
        catch (Exception ex)
        {
            logger.Error(ex, "Failed to forward a Redis message to Kafka");
        }
    }

    /// <summary>
    /// Looks up the location of an IP address through the Baidu API.
    /// </summary>
    /// <param name="ip">IP address to look up.</param>
    /// <returns>
    /// The <c>location</c> of the last entry in the response's <c>data</c> list, or an
    /// empty string when the input is blank, the response has no data, or the lookup fails.
    /// </returns>
    public static string GetBaiduIp(string ip)
    {
        string location = "";
        if (string.IsNullOrWhiteSpace(ip))
        {
            return location;
        }

        try
        {
            string url = "https://sp0.baidu.com";
            using RestSharp.RestClient client1 = new RestSharp.RestClient(url);

            // Escape the value so characters such as '&' or '#' cannot alter the query string.
            string query = Uri.EscapeDataString(ip);
            RestSharp.RestRequest request = new RestSharp.RestRequest($"/8aQDcjqpAAV3otqbppnN2DJv/api.php?query={query}&co=&resource_id=6006&oe=utf8", Method.Get);

            var buffer = client1.DownloadData(request);
            if (buffer is null || buffer.Length == 0)
            {
                return location;
            }

            string jsonText = Encoding.UTF8.GetString(buffer);
            Root? root = JsonConvert.DeserializeObject<Root>(jsonText);
            if (root?.data is null)
            {
                return location;
            }

            // The last entry wins, matching the behaviour callers already rely on.
            foreach (var item in root.data)
            {
                location = item.location;
            }
            return location;
        }
        catch (Exception ex)
        {
            // Best-effort lookup: report the failure and fall back to what we have.
            logger.Warn(ex, "Baidu IP location lookup failed");
            return location;
        }
    }

    /// <summary>
    /// Runs the (currently empty) consume loop without any way to cancel it.
    /// </summary>
    /// <returns>A task that does not complete.</returns>
    public static Task ConsumeMessagesAsync()
    {
        return ConsumeMessagesAsync(CancellationToken.None);
    }

    /// <summary>
    /// Runs the (currently empty) consume loop until cancellation is requested.
    /// The loop yields between iterations instead of spinning, so it neither
    /// blocks the calling thread nor pins a CPU core.
    /// </summary>
    /// <param name="cancellationToken">Token that ends the loop.</param>
    /// <returns>A task that completes once cancellation has been requested.</returns>
    public static async Task ConsumeMessagesAsync(CancellationToken cancellationToken)
    {
        try
        {
            while (!cancellationToken.IsCancellationRequested)
            {
                // Placeholder: no message source is wired up yet (a channel-reader
                // poll was sketched here but left disabled).
                await Task.Delay(IdlePollInterval, cancellationToken);
            }
        }
        catch (OperationCanceledException)
        {
            // Cancellation is the expected way out of the loop.
        }
    }
}
}