88 lines
3.3 KiB
C#
88 lines
3.3 KiB
C#
using System.Text;
|
||
using Common;
|
||
using CommonEntity;
|
||
using CommonEntity.RCUEntity;
|
||
using CommonTools;
|
||
using Confluent.Kafka;
|
||
using IOT_JieKou;
|
||
using NLog;
|
||
using NPOI.OpenXml4Net.OPC;
|
||
|
||
namespace BLWLogServer.Services
|
||
{
|
||
public class KafkaConsume : BackgroundService
|
||
{
|
||
public IConfiguration Configuration { get; set; }
|
||
public IClusterClient clusterClient { get; set; }
|
||
public KafkaConsume(IConfiguration configuration,IClusterClient ccc)
|
||
{
|
||
Configuration = configuration;
|
||
this.clusterClient = ccc;
|
||
}
|
||
|
||
public static Logger logger = NLog.LogManager.GetCurrentClassLogger();
|
||
|
||
protected async override Task ExecuteAsync(CancellationToken stoppingToken)
|
||
{
|
||
await Task.Factory.StartNew(async () =>
|
||
{
|
||
var consumers = new List<Task>();
|
||
for (int i = 0; i < 3; i++)
|
||
{
|
||
consumers.Add(Task.Run(() => StartConsumer(stoppingToken)));
|
||
}
|
||
await Task.WhenAll(consumers);
|
||
}, TaskCreationOptions.LongRunning);
|
||
}
|
||
private async Task StartConsumer(CancellationToken stoppingToken)
|
||
{
|
||
string? ipport = Configuration["Kafka:EndPoint"];
|
||
string? user = Configuration["Kafka:UserName"];
|
||
string? pwd = Configuration["Kafka:PassWord"];
|
||
string? mongodbconnectstr = Configuration["Mongodb:Connectstr"];
|
||
while (!stoppingToken.IsCancellationRequested)
|
||
{
|
||
var config = new ConsumerConfig
|
||
{
|
||
GroupId = "blwlogserver-consumer-group",
|
||
AutoOffsetReset = AutoOffsetReset.Earliest,
|
||
BootstrapServers = ipport,
|
||
SecurityProtocol = SecurityProtocol.SaslPlaintext,
|
||
SaslMechanism = SaslMechanism.Plain,
|
||
SaslUsername = user,
|
||
SaslPassword = pwd
|
||
};
|
||
|
||
var c = new ConsumerBuilder<string, byte[]>(config).Build();
|
||
c.Subscribe(KafkaKey.New_RCU_Data_Topic);
|
||
try
|
||
{
|
||
while (true)
|
||
{
|
||
var cr = c.Consume(stoppingToken);
|
||
try
|
||
{
|
||
var k = cr.Message.Key;
|
||
var v = cr.Message.Value;
|
||
if (k.Equals(KafkaKey.UDPckageAllUDPDataKey))
|
||
{
|
||
RCU_UDPData_With_String UDPPPP = MyMessagePacker.FastDeserialize<RCU_UDPData_With_String>(v);
|
||
string? endpoint1= UDPPPP.endpoint;
|
||
var cl1= clusterClient.GetGrain<IDataTran>(endpoint1,GrainPrefix.RCU_Grain_Prefix);
|
||
await cl1.TongXin(UDPPPP);
|
||
}
|
||
}
|
||
catch (Exception ex)
|
||
{
|
||
logger.Error("Ex出错:" + ex.Message);
|
||
}
|
||
}
|
||
}
|
||
catch (Exception ex)
|
||
{
|
||
logger.Error("未知错误" + ex.Message);
|
||
}
|
||
}
|
||
}
|
||
}
|
||
} |