88 lines
3.3 KiB
C#
88 lines
3.3 KiB
C#
|
|
using System.Text;
|
|||
|
|
using Common;
|
|||
|
|
using CommonEntity;
|
|||
|
|
using CommonEntity.RCUEntity;
|
|||
|
|
using CommonTools;
|
|||
|
|
using Confluent.Kafka;
|
|||
|
|
using IOT_JieKou;
|
|||
|
|
using NLog;
|
|||
|
|
using NPOI.OpenXml4Net.OPC;
|
|||
|
|
|
|||
|
|
namespace BLWLogServer.Services
|
|||
|
|
{
|
|||
|
|
/// <summary>
/// Hosted background service that runs several Kafka consumers on
/// <c>KafkaKey.New_RCU_Data_Topic</c> and hands every message whose key equals
/// <c>KafkaKey.UDPckageAllUDPDataKey</c> to an <c>IDataTran</c> grain.
/// </summary>
public class KafkaConsume : BackgroundService
{
    /// <summary>Number of consumer loops started in parallel by <see cref="ExecuteAsync"/>.</summary>
    private const int ConsumerCount = 3;

    /// <summary>Pause before a consumer is rebuilt after an unexpected failure, so a persistent error cannot spin the loop.</summary>
    private static readonly TimeSpan ReconnectDelay = TimeSpan.FromSeconds(5);

    /// <summary>Application configuration; the keys <c>Kafka:EndPoint</c>, <c>Kafka:UserName</c> and <c>Kafka:PassWord</c> are read from it.</summary>
    public IConfiguration Configuration { get; set; }

    /// <summary>Cluster client used to resolve the grain that receives each decoded message.</summary>
    public IClusterClient clusterClient { get; set; }

    /// <summary>Creates the service from its injected dependencies.</summary>
    /// <param name="configuration">Source of the Kafka connection settings.</param>
    /// <param name="ccc">Cluster client used to obtain grain references.</param>
    public KafkaConsume(IConfiguration configuration, IClusterClient ccc)
    {
        Configuration = configuration;
        this.clusterClient = ccc;
    }

    /// <summary>Logger shared by all consumer loops of this class.</summary>
    public static Logger logger = NLog.LogManager.GetCurrentClassLogger();

    /// <summary>
    /// Starts <see cref="ConsumerCount"/> consumer loops and completes only when all of them have finished.
    /// </summary>
    /// <param name="stoppingToken">Signalled by the host when the service must stop.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // Task.Run unwraps the Task returned by the async method, so awaiting WhenAll really
        // waits for the loops. The previous Task.Factory.StartNew(async ...) produced a
        // Task<Task> whose outer task completed at the first await, making this method
        // return while the consumers were still running.
        var consumers = new Task[ConsumerCount];
        for (int i = 0; i < consumers.Length; i++)
        {
            consumers[i] = Task.Run(() => StartConsumer(stoppingToken));
        }

        await Task.WhenAll(consumers);
    }

    /// <summary>
    /// Runs one consumer until cancellation: builds a consumer, subscribes, processes messages,
    /// and rebuilds the consumer after <see cref="ReconnectDelay"/> whenever an unexpected error occurs.
    /// </summary>
    /// <param name="stoppingToken">Token that ends the loop and aborts a pending <c>Consume</c>.</param>
    private async Task StartConsumer(CancellationToken stoppingToken)
    {
        // The settings never change between reconnect attempts, so build the config once.
        var config = new ConsumerConfig
        {
            GroupId = "blwlogserver-consumer-group",
            AutoOffsetReset = AutoOffsetReset.Earliest,
            BootstrapServers = Configuration["Kafka:EndPoint"],
            SecurityProtocol = SecurityProtocol.SaslPlaintext,
            SaslMechanism = SaslMechanism.Plain,
            SaslUsername = Configuration["Kafka:UserName"],
            SaslPassword = Configuration["Kafka:PassWord"]
        };

        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                // Disposed on every exit path so a reconnect never leaks the previous consumer.
                using var consumer = new ConsumerBuilder<string, byte[]>(config).Build();
                try
                {
                    consumer.Subscribe(KafkaKey.New_RCU_Data_Topic);
                    while (!stoppingToken.IsCancellationRequested)
                    {
                        var cr = consumer.Consume(stoppingToken);
                        await HandleMessage(cr);
                    }
                }
                finally
                {
                    // Leave the consumer group cleanly instead of waiting for a session timeout.
                    consumer.Close();
                }
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                // Normal shutdown: Consume was interrupted by the stopping token.
                return;
            }
            catch (Exception ex)
            {
                logger.Error(ex, "未知错误" + ex.Message);

                try
                {
                    await Task.Delay(ReconnectDelay, stoppingToken);
                }
                catch (OperationCanceledException)
                {
                    // Shutdown requested while waiting to reconnect.
                    return;
                }
            }
        }
    }

    /// <summary>
    /// Processes one consumed record. Records whose key is missing or different from
    /// <c>KafkaKey.UDPckageAllUDPDataKey</c> are ignored; any failure is logged and swallowed
    /// so a single bad message does not stop the consumer loop.
    /// </summary>
    /// <param name="cr">Record returned by <c>Consume</c>.</param>
    private async Task HandleMessage(ConsumeResult<string, byte[]> cr)
    {
        try
        {
            var key = cr.Message.Key;

            // Kafka messages may carry no key; skip them instead of throwing on key.Equals.
            if (key is null || !key.Equals(KafkaKey.UDPckageAllUDPDataKey))
            {
                return;
            }

            RCU_UDPData_With_String packet = MyMessagePacker.FastDeserialize<RCU_UDPData_With_String>(cr.Message.Value);
            string? endpoint = packet.endpoint;
            var grain = clusterClient.GetGrain<IDataTran>(endpoint, GrainPrefix.RCU_Grain_Prefix);
            await grain.TongXin(packet);
        }
        catch (Exception ex)
        {
            logger.Error(ex, "Ex出错:" + ex.Message);
        }
    }
}
|
|||
|
|
}
|