Files
Web_BLSKafka_Server_Prod/BLWLogServer/Services/KafkaConsume1.cs
2025-11-21 08:48:01 +08:00

154 lines
6.8 KiB
C#

using System.Text;
using Common;
using CommonEntity;
using CommonTools;
using Confluent.Kafka;
using MongoDB.Driver;
using NLog;
using System.Collections.Generic;
using Confluent.Kafka.Admin;
using static Confluent.Kafka.ConfigPropertyNames;
using System.Configuration;
namespace BLWLogServer.Services
{
public class KafkaConsume1 : BackgroundService
{
public IConfiguration Configuration { get; set; }
public KafkaConsume1(IConfiguration configuration)
{
Configuration = configuration;
}
public static Logger logger = NLog.LogManager.GetCurrentClassLogger();
private async Task StartConsumer(CancellationToken stoppingToken)
{
string? ipport = Configuration["Kafka:EndPoint"];
string? user = Configuration["Kafka:UserName"];
string? pwd = Configuration["Kafka:PassWord"];
string? mongodbconnectstr = Configuration["Mongodb:Connectstr"];
while (!stoppingToken.IsCancellationRequested)
{
var conf = new ConsumerConfig
{
GroupId = "blwlogserver-consumer-group",
AutoOffsetReset = AutoOffsetReset.Earliest,
BootstrapServers = ipport,
SecurityProtocol = SecurityProtocol.SaslPlaintext,
SaslMechanism = SaslMechanism.Plain,
EnableAutoCommit = true,
SaslUsername = user,
SaslPassword = pwd
};
var c = new ConsumerBuilder<string, byte[]>(conf)
.SetErrorHandler((_, e) =>
{
logger.Error($"消费者错误: {e.Reason}");
}).SetPartitionsAssignedHandler((c, partitions) =>
{
logger.Error($"消费者分配到分区: {string.Join(", ", partitions)}");
}).Build();
//c.Subscribe(KafkaKey.BLWLog_RCU_Topic_Partition);
c.Subscribe(KafkaKey.BLWLog_RCU_Topic);
var connectionString = mongodbconnectstr;
var client = new MongoClient(connectionString);
try
{
while (true)
{
var consumeResult = c.Consume(stoppingToken);
try
{
var k = consumeResult.Message.Key;
var v = consumeResult.Message.Value;
if (k.Equals("testtest"))
{
Console.WriteLine("11111111111111111111");
}
if (k.Equals(KafkaKey.InvokceThirdHttpInterface))
{
var collection = client.GetDatabase("udppackage").GetCollection<Interface3Log_db>("invokehttpinterfacelog");
Interface3Log UDPPPP = MyMessagePacker.FastDeserialize<Interface3Log>(v);
Interface3Log_db us = new Interface3Log_db();
us.HotelCode = UDPPPP.HotelCode;
us.HostNumber = UDPPPP.HostNumber;
us.RoomNumber = UDPPPP.RoomNumber;
us.TriggerTime = UDPPPP.TriggerTime;
us.ActionData = UDPPPP.ActionData;
await collection.InsertOneAsync(us);
}
else if (k.Equals(KafkaKey.TakeCardStatus))
{
var collection = client.GetDatabase("udppackage").GetCollection<MTakeCardData_db>("takecardlog");
MTakeCardData UDPPPP = MyMessagePacker.FastDeserialize<MTakeCardData>(v);
MTakeCardData_db us = new MTakeCardData_db();
us.Status = UDPPPP.Status;
us.HostNUMBER = UDPPPP.HostNUMBER;
us.LastUpdateTime = UDPPPP.LastUpdateTime;
us.HotelCode = UDPPPP.HotelCode;
us.Status = UDPPPP.Status;
await collection.InsertOneAsync(us);
}
else if (k.Equals(KafkaKey.ServiceInfoStatus))
{
var collection = client.GetDatabase("udppackage").GetCollection<OtherServiceInfo_db>("serviceinfolog");
OtherServiceInfo UDPPPP = MyMessagePacker.FastDeserialize<OtherServiceInfo>(v);
OtherServiceInfo_db us = new OtherServiceInfo_db();
us.HotelCode = UDPPPP.HotelCode;
us.HostNumber = UDPPPP.HostNumber;
us.Address = UDPPPP.Address;
us.StatusReceiver = UDPPPP.StatusReceiver;
us.LastUpdateTime = UDPPPP.LastUpdateTime;
await collection.InsertOneAsync(us);
}
else
{
Console.WriteLine( "11111111111111111");
}
if (consumeResult.IsPartitionEOF)
{
Console.WriteLine($"消费者1到达分区末尾: {consumeResult.TopicPartitionOffset}");
}
//Console.WriteLine($"消费者1收到消息: [分区{consumeResult.Partition}] " +
// $"偏移量{consumeResult.Offset}: {consumeResult.Message.Value}");
// 手动提交偏移量
//c.Commit(consumeResult);
}
catch (ConsumeException e)
{
logger.Error(e.Message);
}
}
}
catch (OperationCanceledException)
{
c.Close();
}
}
}
protected async override Task ExecuteAsync(CancellationToken stoppingToken)
{
await Task.Factory.StartNew(async () =>
{
var consumers = new List<Task>();
for (int i = 0; i < 2; i++)
{
consumers.Add(Task.Run(() => StartConsumer(stoppingToken)));
}
await Task.WhenAll(consumers);
}, TaskCreationOptions.LongRunning);
await Task.CompletedTask;
}
}
}