using BLWData.Entity; using Confluent.Kafka; using static Confluent.Kafka.ConfigPropertyNames; namespace 消费 { internal class Program { static void Main(string[] args) { RunConsumers().Wait(); Console.WriteLine("Hello, World!"); Console.ReadKey(); } //private const string TopicName = "firsttopic"; private const string TopicName = "blwlog4BaoJing-rcu-udppackage-topic"; private const string BootstrapServers = "43.138.217.154:9092"; private const string GroupId = "parallel-consumer-group"; private const int ConsumerCount = 3; // 启动3个消费者 public static async Task RunConsumers() { var cts = new CancellationTokenSource(); var consumers = new List(); for (int i = 0; i < 1; i++) { consumers.Add(Task.Run(() => StartConsumer(cts.Token))); } await Task.WhenAll(consumers); //await StartConsumer(cts.Token); } private static async Task StartConsumer(CancellationToken cancellationToken) { //var config = new ConsumerConfig //{ // BootstrapServers = BootstrapServers, // GroupId = GroupId, // AutoOffsetReset = AutoOffsetReset.Earliest, // SecurityProtocol = SecurityProtocol.SaslPlaintext, // SaslMechanism = SaslMechanism.Plain, // EnableAutoCommit = true, // 手动提交偏移量 // EnablePartitionEof = true, // AllowAutoCreateTopics = true, // SaslUsername = "aaa", // SaslPassword = "aaa123" //}; var config = new ConsumerConfig { GroupId = "laravel-consumer-kafka-group", AutoOffsetReset = AutoOffsetReset.Earliest, //BootstrapServers = BootstrapServers, BootstrapServers = "43.138.217.154:9092", SecurityProtocol = SecurityProtocol.SaslPlaintext, SaslMechanism = SaslMechanism.Plain, EnableAutoCommit = true, // 手动提交偏移量 SaslUsername = "baojing", SaslPassword = "blwmaigong##2025" }; var consumer = new ConsumerBuilder(config).Build(); //var consumer = new ConsumerBuilder(config).SetErrorHandler((_, e) => Console.WriteLine($"消费者错误: {e.Reason}")).SetPartitionsAssignedHandler((c, partitions) => // { // Console.WriteLine($"消费者分配到分区: {string.Join(", ", partitions)}"); // }).Build(); consumer.Subscribe(TopicName); try { while (!cancellationToken.IsCancellationRequested) { try { var consumeResult = consumer.Consume(cancellationToken); var k = consumeResult.Message.Key; var v = consumeResult.Message.Value; EnergyConsumption ese = new EnergyConsumption(); if (k.Equals("")) { } var SSA = EnergyConsumption.Parser.ParseFrom(v); if (consumeResult.IsPartitionEOF) { Console.WriteLine($"消费者1到达分区末尾: {consumeResult.TopicPartitionOffset}"); continue; } Console.WriteLine($"消费者1收到消息: [分区{consumeResult.Partition}] " + $"偏移量{consumeResult.Offset}: {consumeResult.Message.Value}"); // 手动提交偏移量 //consumer.Commit(consumeResult); } catch (ConsumeException e) { Console.WriteLine($"消费者消费错误: {e.Error.Reason}"); } } } catch (Exception) { } finally { consumer.Close(); } await Task.CompletedTask; } } }