using System.Text;
using Common;
using CommonEntity;
using CommonEntity.RCUEntity;
using CommonTools;
using Confluent.Kafka;
using IOT_JieKou;
using NLog;
using NPOI.OpenXml4Net.OPC;

namespace BLWLogServer.Services
{
    /// <summary>
    /// Hosted background service that runs several Kafka consumers on the
    /// <c>KafkaKey.New_RCU_Data_Topic</c> topic and forwards every message whose key equals
    /// <c>KafkaKey.UDPckageAllUDPDataKey</c> to the grain addressed by the message's endpoint.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the calls to <c>ConsumerBuilder</c>, <c>MyMessagePacker.FastDeserialize</c> and
    /// <c>clusterClient.GetGrain</c> are reproduced exactly as they appear in the reviewed text, i.e.
    /// without generic type arguments. Those arguments were presumably lost before review — confirm
    /// against the checked-in file and keep whatever type arguments it declares.
    /// </remarks>
    public class KafkaConsume : BackgroundService
    {
        /// <summary>Number of consumers started in parallel inside the same consumer group.</summary>
        private const int ConsumerCount = 3;

        /// <summary>Pause between tearing down a failed consumer and building the next one.</summary>
        private static readonly TimeSpan RetryDelay = TimeSpan.FromSeconds(5);

        public static Logger logger = NLog.LogManager.GetCurrentClassLogger();

        /// <summary>Application configuration; the <c>Kafka:*</c> keys are read from it.</summary>
        public IConfiguration Configuration { get; set; }

        /// <summary>Cluster client used to resolve the grain that receives each message.</summary>
        public IClusterClient clusterClient { get; set; }

        /// <summary>Creates the service with its configuration and cluster client.</summary>
        /// <param name="configuration">Source of the Kafka connection settings.</param>
        /// <param name="ccc">Cluster client used to obtain grains.</param>
        public KafkaConsume(IConfiguration configuration, IClusterClient ccc)
        {
            Configuration = configuration;
            this.clusterClient = ccc;
        }

        /// <summary>
        /// Starts <see cref="ConsumerCount"/> consumers and completes only when all of them have stopped.
        /// </summary>
        /// <param name="stoppingToken">Signalled by the host when the service should stop.</param>
        /// <returns>A task that completes once every consumer loop has exited.</returns>
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            // Consume() blocks its thread, so every loop is pushed to the thread pool rather than
            // run inline on the host's start-up path. Task.Run unwraps the inner task, so awaiting
            // the array really waits for the loops (StartNew with an async lambda did not).
            var consumers = new Task[ConsumerCount];
            for (int i = 0; i < consumers.Length; i++)
            {
                consumers[i] = Task.Run(() => StartConsumer(stoppingToken));
            }

            await Task.WhenAll(consumers);
        }

        /// <summary>
        /// Keeps one consumer alive until <paramref name="stoppingToken"/> is cancelled,
        /// rebuilding it after a failure.
        /// </summary>
        /// <param name="stoppingToken">Signalled by the host when the service should stop.</param>
        /// <returns>A task that completes when stop has been requested.</returns>
        private async Task StartConsumer(CancellationToken stoppingToken)
        {
            // The settings do not change between retries, so the config is built once.
            var config = new ConsumerConfig
            {
                GroupId = "blwlogserver-consumer-group",
                AutoOffsetReset = AutoOffsetReset.Earliest,
                BootstrapServers = Configuration["Kafka:EndPoint"],
                SecurityProtocol = SecurityProtocol.SaslPlaintext,
                SaslMechanism = SaslMechanism.Plain,
                SaslUsername = Configuration["Kafka:UserName"],
                SaslPassword = Configuration["Kafka:PassWord"]
            };

            while (!stoppingToken.IsCancellationRequested)
            {
                try
                {
                    await ConsumeUntilStopped(config, stoppingToken);
                    return; // Returns normally only after stop has been requested.
                }
                catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
                {
                    return; // Regular shutdown, not an error.
                }
                catch (Exception ex)
                {
                    logger.Error(ex, "未知错误" + ex.Message);
                }

                // Back off so a persistent failure (e.g. broker unreachable, bad credentials)
                // does not turn into a tight rebuild loop.
                try
                {
                    await Task.Delay(RetryDelay, stoppingToken);
                }
                catch (OperationCanceledException)
                {
                    return;
                }
            }
        }

        /// <summary>
        /// Builds one consumer, subscribes it and dispatches messages until cancellation or failure.
        /// The consumer is always closed and disposed on the way out.
        /// </summary>
        /// <param name="config">Consumer settings.</param>
        /// <param name="stoppingToken">Signalled by the host when the service should stop.</param>
        /// <returns>A task that completes when the loop ends because stop was requested.</returns>
        private async Task ConsumeUntilStopped(ConsumerConfig config, CancellationToken stoppingToken)
        {
            using var consumer = new ConsumerBuilder(config).Build();
            consumer.Subscribe(KafkaKey.New_RCU_Data_Topic);

            try
            {
                while (!stoppingToken.IsCancellationRequested)
                {
                    var cr = consumer.Consume(stoppingToken);
                    try
                    {
                        var key = cr.Message.Key;
                        var value = cr.Message.Value;

                        // object.Equals is null-safe: a message without a key is skipped
                        // instead of raising NullReferenceException.
                        if (object.Equals(key, KafkaKey.UDPckageAllUDPDataKey))
                        {
                            RCU_UDPData_With_String data = MyMessagePacker.FastDeserialize(value);
                            string? endpoint = data.endpoint;
                            var grain = clusterClient.GetGrain(endpoint, GrainPrefix.RCU_Grain_Prefix);
                            await grain.TongXin(data);
                        }
                    }
                    catch (Exception ex)
                    {
                        // A single bad message must not take the consumer down; log and move on.
                        logger.Error(ex, "Ex出错:" + ex.Message);
                    }
                }
            }
            finally
            {
                // Close() leaves the consumer group cleanly before Dispose releases the client.
                // A failure here must not mask the exception that ended the loop.
                try
                {
                    consumer.Close();
                }
                catch (Exception ex)
                {
                    logger.Error(ex, "Kafka consumer close failed: " + ex.Message);
                }
            }
        }
    }
}