Files
2025-12-11 14:04:39 +08:00

88 lines
3.3 KiB
C#
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
using System.Text;
using Common;
using CommonEntity;
using CommonEntity.RCUEntity;
using CommonTools;
using Confluent.Kafka;
using IOT_JieKou;
using NLog;
using NPOI.OpenXml4Net.OPC;
namespace BLWLogServer.Services
{
public class KafkaConsume : BackgroundService
{
public IConfiguration Configuration { get; set; }
public IClusterClient clusterClient { get; set; }
public KafkaConsume(IConfiguration configuration,IClusterClient ccc)
{
Configuration = configuration;
this.clusterClient = ccc;
}
public static Logger logger = NLog.LogManager.GetCurrentClassLogger();
protected async override Task ExecuteAsync(CancellationToken stoppingToken)
{
await Task.Factory.StartNew(async () =>
{
var consumers = new List<Task>();
for (int i = 0; i < 3; i++)
{
consumers.Add(Task.Run(() => StartConsumer(stoppingToken)));
}
await Task.WhenAll(consumers);
}, TaskCreationOptions.LongRunning);
}
private async Task StartConsumer(CancellationToken stoppingToken)
{
string? ipport = Configuration["Kafka:EndPoint"];
string? user = Configuration["Kafka:UserName"];
string? pwd = Configuration["Kafka:PassWord"];
string? mongodbconnectstr = Configuration["Mongodb:Connectstr"];
while (!stoppingToken.IsCancellationRequested)
{
var config = new ConsumerConfig
{
GroupId = "blwlogserver-consumer-group",
AutoOffsetReset = AutoOffsetReset.Earliest,
BootstrapServers = ipport,
SecurityProtocol = SecurityProtocol.SaslPlaintext,
SaslMechanism = SaslMechanism.Plain,
SaslUsername = user,
SaslPassword = pwd
};
var c = new ConsumerBuilder<string, byte[]>(config).Build();
c.Subscribe(KafkaKey.New_RCU_Data_Topic);
try
{
while (true)
{
var cr = c.Consume(stoppingToken);
try
{
var k = cr.Message.Key;
var v = cr.Message.Value;
if (k.Equals(KafkaKey.UDPckageAllUDPDataKey))
{
RCU_UDPData_With_String UDPPPP = MyMessagePacker.FastDeserialize<RCU_UDPData_With_String>(v);
string? endpoint1= UDPPPP.endpoint;
var cl1= clusterClient.GetGrain<IDataTran>(endpoint1,GrainPrefix.RCU_Grain_Prefix);
await cl1.TongXin(UDPPPP);
}
}
catch (Exception ex)
{
logger.Error("Ex出错" + ex.Message);
}
}
}
catch (Exception ex)
{
logger.Error("未知错误" + ex.Message);
}
}
}
}
}