114 lines
4.0 KiB
C#
114 lines
4.0 KiB
C#
|
|
using System.Text;
|
|
using System.Text.Json;
|
|
using System.Threading.Channels;
|
|
using BLWWS_BLL.Common;
|
|
using Confluent.Kafka;
|
|
using MessagePack;
|
|
using Microsoft.Extensions.Caching.Memory;
|
|
using Newtonsoft.Json;
|
|
using Newtonsoft.Json.Linq;
|
|
using NLog;
|
|
using PMSLogProduce.Models;
|
|
using RestSharp;
|
|
using static CSRedis.CSRedisClient;
|
|
|
|
namespace BLWLogProduce.Services
|
|
{
|
|
/// <summary>
/// Hosted background service that subscribes to the "PMSLogMonitor" Redis channel and
/// republishes every received message to Kafka as a MessagePack-encoded payload.
/// </summary>
public class KafkaProduce : BackgroundService
{
    /// <summary>Redis pub/sub channel the service listens on.</summary>
    private const string RedisChannel = "PMSLogMonitor";

    /// <summary>Kafka topic the converted messages are produced to.</summary>
    private const string KafkaTopic = "blwlog-rcu-udppackage-topic";

    /// <summary>Kafka message key attached to every produced message.</summary>
    private const string KafkaMessageKey = "pms";

    /// <summary>Upper bound for draining queued Kafka messages when the host stops.</summary>
    private static readonly TimeSpan ShutdownFlushTimeout = TimeSpan.FromSeconds(5);

    /// <summary>Class-wide NLog logger.</summary>
    public static Logger logger = LogManager.GetCurrentClassLogger();

    /// <summary>Application configuration; the "Kafka:*" keys are read from it.</summary>
    public IConfiguration Configuration { get; set; }

    /// <summary>Memory cache supplied by the container (not used by the code in this class).</summary>
    public IMemoryCache _Cache { get; set; }

    /// <summary>Creates the service with its injected dependencies.</summary>
    /// <param name="configuration">Configuration source for the Kafka settings.</param>
    /// <param name="cache">Memory cache instance stored on <see cref="_Cache"/>.</param>
    public KafkaProduce(IConfiguration configuration, IMemoryCache cache)
    {
        this.Configuration = configuration;
        _Cache = cache;
    }

    /// <summary>
    /// Builds the Kafka producer and registers the Redis subscription on a dedicated thread.
    /// </summary>
    /// <param name="stoppingToken">Signalled when the host shuts down; triggers producer cleanup.</param>
    /// <returns>A task that completes once the subscription has been set up.</returns>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // A parameterless lambda plus the four-argument overload is required here: with a
        // one-parameter lambda, StartNew(action, TaskCreationOptions.LongRunning) binds to
        // StartNew(Action<object>, object state) and the option is silently treated as state.
        await Task.Factory.StartNew(
            () => StartForwarding(stoppingToken),
            stoppingToken,
            TaskCreationOptions.LongRunning,
            TaskScheduler.Default);
    }

    /// <summary>Creates the producer, subscribes to Redis and hooks cleanup to host shutdown.</summary>
    private void StartForwarding(CancellationToken stoppingToken)
    {
        IProducer<string, byte[]>? producer = null;
        try
        {
            string? endPoint = Configuration["Kafka:EndPoint"];
            string? userName = Configuration["Kafka:UserName"];
            string? password = Configuration["Kafka:PassWord"];

            if (string.IsNullOrWhiteSpace(endPoint))
            {
                // Without bootstrap servers the producer can never deliver anything.
                logger.Error("Kafka:EndPoint is not configured; the Redis to Kafka forwarder was not started.");
                return;
            }

            var config = new ProducerConfig
            {
                BootstrapServers = endPoint,
                SecurityProtocol = SecurityProtocol.SaslPlaintext,
                SaslMechanism = SaslMechanism.Plain,
                SaslUsername = userName,
                SaslPassword = password
            };

            var activeProducer = new ProducerBuilder<string, byte[]>(config).Build();
            producer = activeProducer;

            // The handler is a synchronous Action, so the async work lives in ForwardAsync,
            // which catches its own exceptions; an async lambda here would be "async void"
            // and any failure would escape as an unhandled exception.
            var subscription = (
                RedisChannel,
                new Action<SubscribeMessageEventArgs>(args => _ = ForwardAsync(activeProducer, args.Body)));

            CSRedisCacheHelper.redis3.Subscribe(subscription);

            // Drain and release the producer when the host stops.
            stoppingToken.Register(() => ReleaseProducer(activeProducer));
        }
        catch (Exception ex)
        {
            // Log the exception object (not just its message) so the stack trace is kept.
            logger.Error(ex, "Failed to start the Redis to Kafka forwarder.");
            producer?.Dispose();
        }
    }

    /// <summary>
    /// Converts one Redis message body (JSON) to MessagePack and produces it to Kafka.
    /// Never throws: failures are logged so a bad message cannot take the process down.
    /// </summary>
    private static async Task ForwardAsync(IProducer<string, byte[]> producer, string body)
    {
        try
        {
            CheckInYuanShidata? payload = System.Text.Json.JsonSerializer.Deserialize<CheckInYuanShidata>(body);
            byte[] bytes = MessagePackSerializer.Serialize(payload);

            Console.WriteLine("推送了数据");
            await producer.ProduceAsync(
                KafkaTopic,
                new Message<string, byte[]> { Key = KafkaMessageKey, Value = bytes });
        }
        catch (Exception ex)
        {
            logger.Error(ex, "Failed to forward a PMS log message to Kafka.");
        }
    }

    /// <summary>Flushes pending messages (bounded wait) and disposes the producer.</summary>
    private static void ReleaseProducer(IProducer<string, byte[]> producer)
    {
        try
        {
            producer.Flush(ShutdownFlushTimeout);
            producer.Dispose();
        }
        catch (Exception ex)
        {
            // Runs inside a cancellation callback; never let cleanup errors propagate.
            logger.Error(ex, "Failed to shut down the Kafka producer cleanly.");
        }
    }

    /// <summary>
    /// Looks up the location of an IP address through the Baidu API.
    /// </summary>
    /// <param name="ip">IP address to look up; it is URL-escaped before being sent.</param>
    /// <returns>
    /// The location of the last entry in the response that carries one, or an empty string
    /// when the request fails or the response holds no usable entry.
    /// </returns>
    public static string GetBaiduIp(string ip)
    {
        string location = "";
        try
        {
            using var client = new RestSharp.RestClient("https://sp0.baidu.com");

            // Escape the caller-supplied value so characters such as '&' or '#'
            // cannot alter the rest of the query string.
            string escapedIp = Uri.EscapeDataString(ip ?? string.Empty);
            var request = new RestSharp.RestRequest(
                $"/8aQDcjqpAAV3otqbppnN2DJv/api.php?query={escapedIp}&co=&resource_id=6006&oe=utf8",
                Method.Get);

            var buffer = client.DownloadData(request);
            if (buffer is null || buffer.Length == 0)
            {
                return location;
            }

            string jsonText = Encoding.UTF8.GetString(buffer);
            Root? root = JsonConvert.DeserializeObject<Root>(jsonText);
            if (root?.data is null)
            {
                return location;
            }

            foreach (var item in root.data)
            {
                // Later entries win; entries without a location are skipped.
                if (item?.location is { } found)
                {
                    location = found;
                }
            }

            return location;
        }
        catch (Exception ex)
        {
            // Best-effort lookup: report the failure and fall back to what was found so far.
            logger.Warn(ex, "Baidu IP lookup failed for {0}", ip);
            return location;
        }
    }

    /// <summary>
    /// Placeholder consumer loop. The returned task never completes; unlike a spinning
    /// <c>while (true)</c> it does not block the caller or burn CPU while waiting.
    /// </summary>
    /// <returns>A task that stays pending indefinitely.</returns>
    public static Task ConsumeMessagesAsync() => ConsumeMessagesAsync(CancellationToken.None);

    /// <summary>
    /// Cancellable variant of <see cref="ConsumeMessagesAsync()"/>.
    /// </summary>
    /// <param name="cancellationToken">Token that ends the wait; the task then completes normally.</param>
    /// <returns>A task that completes when <paramref name="cancellationToken"/> is cancelled.</returns>
    public static async Task ConsumeMessagesAsync(CancellationToken cancellationToken)
    {
        // NOTE(review): the previous body only held a commented-out
        // `_messageChannel.Reader.TryRead(...)` call; no such channel is declared in this
        // class, so there is nothing to consume yet. Wire the channel reader in here.
        try
        {
            await Task.Delay(Timeout.Infinite, cancellationToken);
        }
        catch (OperationCanceledException)
        {
            // Cancellation is the expected way to stop waiting.
        }
    }
}
|
|
}
|