using Confluent.Kafka; namespace WebApplication2.services { public class MQServices : BackgroundService { protected override async Task ExecuteAsync(CancellationToken stoppingToken) { var conf = new ConsumerConfig { GroupId = "test-consumer-group", BootstrapServers = "localhost:9092", // Note: The AutoOffsetReset property determines the start offset in the event // there are not yet any committed offsets for the consumer group for the // topic/partitions of interest. By default, offsets are committed // automatically, so in this example, consumption will only start from the // earliest message in the topic 'my-topic' the first time you run the program. AutoOffsetReset = AutoOffsetReset.Earliest }; using (var c = new ConsumerBuilder(conf).Build()) { c.Subscribe("test-topic"); try { while (true) { try { var cr = c.Consume(stoppingToken); var v= cr.Message.Value; var k=cr.Message.Key; Console.WriteLine($"Consumed message '{k} {v}' at: '{cr.TopicPartitionOffset}'."); } catch (ConsumeException e) { Console.WriteLine($"Error occured: {e.Error.Reason}"); } } } catch (OperationCanceledException) { // Ensure the consumer leaves the group cleanly and final offsets are committed. c.Close(); } } await Task.CompletedTask; } } }