using System.Text;
using Confluent.Kafka;
using Google.Protobuf;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using Tutorial;

namespace duoge
{
    /// <summary>
    /// Scratch program with two independent demos: a JSON -> protobuf <see cref="Person"/>
    /// round trip (<see cref="Main"/>) and a one-consumer-per-topic Kafka setup (<see cref="MMM"/>).
    /// </summary>
    internal class Program
    {
        /// <summary>
        /// Builds a <see cref="Person"/> from a JSON literal, serializes it to protobuf bytes and
        /// parses those bytes back in three equivalent ways.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            string json = @"{ 'name': 'John Doe', 'id': 123, 'email': 'john@example.com' }";
            JObject jsonObject = JObject.Parse(json);

            var person = new Person
            {
                Name = (string)jsonObject["name"],
                Id = (int)jsonObject["id"],
                Email = (string)jsonObject["email"]
            };

            // Print a field of the converted protobuf object.
            Console.WriteLine($"Name: {person.Name}");

            byte[] payload = person.ToByteArray();

            // Approach 1: use the generated static parser (recommended).
            Person restoredPerson1 = Person.Parser.ParseFrom(payload);

            // Approach 2: create a dedicated parser instance.
            var parser = new MessageParser<Person>(() => new Person());
            Person restoredPerson2 = parser.ParseFrom(payload);

            // Approach 3: parse from a copy of the buffer obtained via AsMemory().ToArray().
            Person restoredPerson3 = Person.Parser.ParseFrom(payload.AsMemory().ToArray());

            Console.WriteLine("Hello, World!");
        }

        /// <summary>
        /// Creates one Kafka consumer per topic (each in its own consumer group, manual commit)
        /// and starts a background task per consumer that calls <see cref="ConsumeMessages"/>.
        /// The task closes and disposes its consumer when it finishes.
        /// </summary>
        static void MMM()
        {
            // NOTE(review): both topic names are empty placeholders in the original code;
            // presumably they must be replaced with real topic names before this is called.
            string[] topics = ["", ""];

            // Create one consumer per topic.
            foreach (var topic in topics)
            {
                var consumerConfig = new ConsumerConfig
                {
                    BootstrapServers = "localhost:9092",
                    GroupId = $"consumer-group-{topic}",
                    AutoOffsetReset = AutoOffsetReset.Earliest,
                    EnableAutoCommit = false
                };

                // NOTE(review): the key/value type arguments were missing in the original source;
                // <string, string> is assumed from how keys and values are printed - confirm.
                var consumer = new ConsumerBuilder<string, string>(consumerConfig).Build();
                try
                {
                    consumer.Subscribe(topic);
                }
                catch
                {
                    // The consumer has not been handed to a task yet, so release it here.
                    consumer.Dispose();
                    throw;
                }

                // Start an independent task for each consumer; the task owns the consumer's lifetime.
                _ = Task.Run(async () =>
                {
                    using (consumer)
                    {
                        try
                        {
                            await ConsumeMessages(consumer, topic);
                        }
                        finally
                        {
                            // Leave the consumer group explicitly before disposing.
                            consumer.Close();
                        }
                    }
                });
            }
        }

        /// <summary>
        /// Consumes a single message from <paramref name="consumer"/>, prints it and commits its offset.
        /// </summary>
        /// <param name="consumer">An already-subscribed consumer. It is not closed or disposed here.</param>
        /// <param name="topic">Topic name, used only in the printed output.</param>
        /// <param name="cancellationToken">Optional token that aborts the blocking consume call.</param>
        /// <returns>A completed task; all work is done synchronously on the calling thread.</returns>
        /// <remarks>
        /// NOTE(review): only one message is handled per call. If continuous consumption is intended,
        /// the caller needs to invoke this in a loop.
        /// </remarks>
        private static Task ConsumeMessages(
            IConsumer<string, string> consumer,
            string topic,
            CancellationToken cancellationToken = default)
        {
            try
            {
                var consumeResult = consumer.Consume(cancellationToken);
                if (consumeResult is not null)
                {
                    Console.WriteLine($"收到消息 - 主题: {topic}, 键: {consumeResult.Message.Key}, 值: {consumeResult.Message.Value}");

                    // Commit the offset manually (auto-commit is disabled in the consumer config).
                    consumer.Commit(consumeResult);
                }
            }
            catch (ConsumeException ex)
            {
                // Report the failure instead of swallowing it silently.
                Console.Error.WriteLine($"Consume failed for topic '{topic}': {ex.Message}");
            }
            catch (OperationCanceledException)
            {
                // Cancellation was requested through cancellationToken; nothing to report.
            }

            return Task.CompletedTask;
        }

        // Disabled draft of per-topic message dispatch and shutdown handling.
        // It references _logger, _cancellationTokenSource and a _consumers field,
        // none of which are defined in this class.
        //
        //private async Task ProcessMessage(string topic, string key, string value)
        //{
        //    // Run different handling logic depending on the topic
        //    switch (topic)
        //    {
        //        case "topic-1":
        //            await ProcessTopic1Message(key, value);
        //            break;
        //        case "topic-2":
        //            await ProcessTopic2Message(key, value);
        //            break;
        //        // ... handling logic for other topics
        //        default:
        //            _logger.LogInformation($"处理默认主题消息: {topic}, Key: {key}, Value: {value}");
        //            break;
        //    }
        //}
        //private async Task ProcessTopic1Message(string key, string value)
        //{
        //    // Handling logic specific to topic 1
        //    await Task.Delay(100); // simulate processing time
        //    _logger.LogInformation($"处理主题1消息: Key={key}, Value={value}");
        //}
        //private async Task ProcessTopic2Message(string key, string value)
        //{
        //    // Handling logic specific to topic 2
        //    await Task.Delay(100); // simulate processing time
        //    _logger.LogInformation($"处理主题2消息: Key={key}, Value={value}");
        //}
        //public Task StopAsync(CancellationToken cancellationToken)
        //{
        //    _logger.LogInformation("停止Kafka消费者服务...");
        //    _cancellationTokenSource.Cancel();
        //    foreach (var consumer in _consumers)
        //    {
        //        consumer.Close();
        //        consumer.Dispose();
        //    }
        //    return Task.CompletedTask;
        //}
    }
}