using System.Collections.Generic;
using System.Configuration;
using System.Text;
using Common;
using CommonEntity;
using CommonTools;
using Confluent.Kafka;
using Confluent.Kafka.Admin;
using MongoDB.Driver;
using NLog;
using static Confluent.Kafka.ConfigPropertyNames;

namespace BLWLogServer.Services
{
    /// <summary>
    /// Hosted background service that runs Kafka consumers subscribed to
    /// <c>KafkaKey.BLWLog_RCU_Topic</c> and stores each recognised message in a
    /// MongoDB collection of the <c>udppackage</c> database, chosen by message key.
    /// </summary>
    public class KafkaConsume1 : BackgroundService
    {
        /// <summary>Number of consumers started in parallel (all join the same consumer group).</summary>
        private const int ConsumerCount = 2;

        /// <summary>MongoDB database that receives every persisted log document.</summary>
        private const string DatabaseName = "udppackage";

        /// <summary>Pause before a consumer is rebuilt after an unexpected failure.</summary>
        private static readonly TimeSpan RestartDelay = TimeSpan.FromSeconds(5);

        /// <summary>Application configuration; supplies the Kafka and MongoDB connection settings.</summary>
        public IConfiguration Configuration { get; set; }

        /// <summary>Creates the service with the application configuration.</summary>
        /// <param name="configuration">Configuration read for the <c>Kafka:*</c> and <c>Mongodb:Connectstr</c> keys.</param>
        public KafkaConsume1(IConfiguration configuration)
        {
            Configuration = configuration;
        }

        public static Logger logger = NLog.LogManager.GetCurrentClassLogger();

        /// <summary>
        /// Runs one consumer until <paramref name="stoppingToken"/> is cancelled.
        /// After an unexpected failure the error is logged, the consumer is closed
        /// and a new one is built after <see cref="RestartDelay"/>.
        /// </summary>
        /// <param name="stoppingToken">Signals host shutdown.</param>
        private async Task StartConsumer(CancellationToken stoppingToken)
        {
            // Settings do not change between restarts, so read and build them once.
            ConsumerConfig conf = BuildConsumerConfig();
            string? mongodbconnectstr = Configuration["Mongodb:Connectstr"];

            while (!stoppingToken.IsCancellationRequested)
            {
                try
                {
                    var client = new MongoClient(mongodbconnectstr);
                    using var consumer = BuildConsumer(conf);
                    try
                    {
                        consumer.Subscribe(KafkaKey.BLWLog_RCU_Topic);
                        await ConsumeLoopAsync(consumer, client, stoppingToken);
                    }
                    finally
                    {
                        // Close() leaves the consumer group and commits offsets; do it on
                        // every exit path, not only on cancellation.
                        consumer.Close();
                    }
                }
                catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
                {
                    // Normal shutdown requested by the host.
                    break;
                }
                catch (Exception ex)
                {
                    // Previously any such failure ended the consumer without a trace.
                    logger.Error(ex, "Kafka consumer failed; rebuilding it in {0} s", RestartDelay.TotalSeconds);
                    try
                    {
                        await Task.Delay(RestartDelay, stoppingToken);
                    }
                    catch (OperationCanceledException)
                    {
                        break;
                    }
                }
            }
        }

        /// <summary>Builds the consumer configuration from the <c>Kafka:*</c> configuration keys.</summary>
        private ConsumerConfig BuildConsumerConfig()
        {
            return new ConsumerConfig
            {
                GroupId = "blwlogserver-consumer-group",
                AutoOffsetReset = AutoOffsetReset.Earliest,
                BootstrapServers = Configuration["Kafka:EndPoint"],
                SecurityProtocol = SecurityProtocol.SaslPlaintext,
                SaslMechanism = SaslMechanism.Plain,
                EnableAutoCommit = true,
                SaslUsername = Configuration["Kafka:UserName"],
                SaslPassword = Configuration["Kafka:PassWord"]
            };
        }

        /// <summary>Creates a consumer with the error and partition-assignment log handlers attached.</summary>
        /// <param name="conf">Consumer configuration to build from.</param>
        private static IConsumer<string, byte[]> BuildConsumer(ConsumerConfig conf)
        {
            // NOTE(review): the key is compared with strings, so TKey = string; the value is
            // handed to MyMessagePacker.FastDeserialize and is assumed to be byte[] - confirm.
            return new ConsumerBuilder<string, byte[]>(conf)
                .SetErrorHandler((_, e) =>
                {
                    logger.Error($"消费者错误: {e.Reason}");
                })
                .SetPartitionsAssignedHandler((_, partitions) =>
                {
                    // Partition assignment is a normal event, so it is logged as Info, not Error.
                    logger.Info($"消费者分配到分区: {string.Join(", ", partitions)}");
                })
                .Build();
        }

        /// <summary>
        /// Consumes and persists messages until an exception ends the loop. The method never
        /// returns normally: cancellation surfaces as <see cref="OperationCanceledException"/>
        /// thrown by <c>Consume</c>.
        /// </summary>
        /// <param name="consumer">Subscribed consumer to read from.</param>
        /// <param name="client">MongoDB client used to store the messages.</param>
        /// <param name="stoppingToken">Signals host shutdown.</param>
        private static async Task ConsumeLoopAsync(
            IConsumer<string, byte[]> consumer,
            MongoClient client,
            CancellationToken stoppingToken)
        {
            while (true)
            {
                try
                {
                    // Consume is inside the try so that the ConsumeException handler
                    // below can actually observe the exceptions it throws.
                    var consumeResult = consumer.Consume(stoppingToken);

                    // An end-of-partition event carries no message; handle it before touching Message.
                    if (consumeResult.IsPartitionEOF)
                    {
                        Console.WriteLine($"消费者1到达分区末尾: {consumeResult.TopicPartitionOffset}");
                        continue;
                    }

                    await ProcessMessageAsync(client, consumeResult.Message.Key, consumeResult.Message.Value);
                }
                catch (ConsumeException e)
                {
                    logger.Error(e, e.Message);
                    if (e.Error.IsFatal)
                    {
                        // A fatal client error cannot be recovered by retrying Consume;
                        // let the caller rebuild the consumer.
                        throw;
                    }
                }
            }
        }

        /// <summary>
        /// Deserializes one message according to its key and inserts the corresponding
        /// document into MongoDB. Messages with an unrecognised or missing key are not stored.
        /// </summary>
        /// <param name="client">MongoDB client used for the insert.</param>
        /// <param name="key">Kafka message key; may be null.</param>
        /// <param name="value">Serialized message payload.</param>
        private static async Task ProcessMessageAsync(MongoClient client, string? key, byte[] value)
        {
            // Static string.Equals is null-safe; key.Equals(...) threw on key-less messages.
            if (string.Equals(key, "testtest"))
            {
                Console.WriteLine("11111111111111111111");
            }

            // The inserts deliberately take no cancellation token: a write already in
            // flight is allowed to finish during shutdown.
            if (string.Equals(key, KafkaKey.InvokceThirdHttpInterface))
            {
                var collection = client.GetDatabase(DatabaseName).GetCollection<Interface3Log_db>("invokehttpinterfacelog");
                Interface3Log source = MyMessagePacker.FastDeserialize<Interface3Log>(value);
                var document = new Interface3Log_db
                {
                    HotelCode = source.HotelCode,
                    HostNumber = source.HostNumber,
                    RoomNumber = source.RoomNumber,
                    TriggerTime = source.TriggerTime,
                    ActionData = source.ActionData
                };
                await collection.InsertOneAsync(document);
            }
            else if (string.Equals(key, KafkaKey.TakeCardStatus))
            {
                var collection = client.GetDatabase(DatabaseName).GetCollection<MTakeCardData_db>("takecardlog");
                MTakeCardData source = MyMessagePacker.FastDeserialize<MTakeCardData>(value);
                var document = new MTakeCardData_db
                {
                    Status = source.Status,
                    HostNUMBER = source.HostNUMBER,
                    LastUpdateTime = source.LastUpdateTime,
                    HotelCode = source.HotelCode
                };
                await collection.InsertOneAsync(document);
            }
            else if (string.Equals(key, KafkaKey.ServiceInfoStatus))
            {
                var collection = client.GetDatabase(DatabaseName).GetCollection<OtherServiceInfo_db>("serviceinfolog");
                OtherServiceInfo source = MyMessagePacker.FastDeserialize<OtherServiceInfo>(value);
                var document = new OtherServiceInfo_db
                {
                    HotelCode = source.HotelCode,
                    HostNumber = source.HostNumber,
                    Address = source.Address,
                    StatusReceiver = source.StatusReceiver,
                    LastUpdateTime = source.LastUpdateTime
                };
                await collection.InsertOneAsync(document);
            }
            else
            {
                Console.WriteLine( "11111111111111111");
            }
        }

        /// <summary>
        /// Starts <see cref="ConsumerCount"/> consumers and completes only when all of them
        /// have stopped, so the host waits for them to close on shutdown.
        /// </summary>
        /// <param name="stoppingToken">Signals host shutdown.</param>
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            var consumers = new List<Task>(ConsumerCount);
            for (int i = 0; i < ConsumerCount; i++)
            {
                // Task.Run unwraps the async delegate, so the stored task really represents
                // the consumer's lifetime (StartNew with an async lambda yields Task<Task>).
                consumers.Add(Task.Run(() => StartConsumer(stoppingToken)));
            }

            await Task.WhenAll(consumers);
        }
    }
}