Files
2025-11-21 08:48:01 +08:00

469 lines
26 KiB
C#
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
using System.Text;
using Common;
using CommonEntity;
using CommonTools;
using Confluent.Kafka;
using MongoDB.Driver;
using NLog;
using System.Collections.Generic;
namespace BLWLogServer.Services
{
public class KafkaConsume : BackgroundService
{
/// <summary>
/// Application configuration. The consumers read their Kafka and MongoDB
/// connection settings from it.
/// </summary>
public IConfiguration Configuration { get; set; }

/// <summary>
/// Creates the service and stores the supplied configuration.
/// </summary>
/// <param name="configuration">Configuration root used to look up connection settings.</param>
public KafkaConsume(IConfiguration configuration) => Configuration = configuration;
/// <summary>Class-wide NLog logger.</summary>
public static Logger logger = NLog.LogManager.GetCurrentClassLogger();

// Rolling windows holding the 10 most recent TotalCount values, one per statistic.
// They are static and therefore shared by every consumer task of this service;
// the references are never reassigned, hence readonly.
private static readonly Queue<long> _cpuMaxQueue = new(10);
private static readonly Queue<long> _cpuAvgQueue = new(10);
private static readonly Queue<long> _cpuMinQueue = new(10);
private static readonly Queue<long> _TotalSendPackage = new(10);
private static readonly Queue<long> _TotalRecvPackage = new(10);
private static readonly Queue<long> _RCUOnLine = new(10);
/// <summary>
/// Starts three parallel Kafka consumers and completes only when all of them have finished.
/// </summary>
/// <remarks>
/// The previous implementation awaited <c>Task.Factory.StartNew(async () =&gt; ...)</c>, which
/// yields a <c>Task&lt;Task&gt;</c>: the await only covered the outer task, so this method
/// returned as soon as the lambda hit its first await and any consumer failure went unobserved.
/// Awaiting <see cref="Task.WhenAll(IEnumerable{Task})"/> directly keeps the service task alive
/// for the consumers' lifetime and lets their exceptions reach the host.
/// </remarks>
/// <param name="stoppingToken">Signalled by the host when the service must stop.</param>
/// <returns>A task that completes when every consumer has stopped.</returns>
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    const int consumerCount = 3;

    var consumers = new List<Task>(consumerCount);
    for (int i = 0; i < consumerCount; i++)
    {
        // Task.Run unwraps the async delegate and moves the blocking Consume() call
        // off the thread that is starting the host.
        consumers.Add(Task.Run(() => StartConsumer(stoppingToken)));
    }

    await Task.WhenAll(consumers);
}
/// <summary>
/// Runs one Kafka consumer until <paramref name="stoppingToken"/> is cancelled: every record
/// read from <c>KafkaKey.BLWLog_RCU_Topic</c> is dispatched by its key and stored in the
/// MongoDB database "udppackage".
/// </summary>
/// <remarks>
/// A failure while handling a single record is logged and the loop moves on to the next record.
/// A failure of the consumer itself is logged, the consumer is closed and disposed, and a new
/// one is built after a short pause.
/// NOTE(review): offsets are not committed explicitly; the original code notes that auto-commit
/// is on by default, so a record whose insert failed is not re-read — confirm this is acceptable.
/// </remarks>
/// <param name="stoppingToken">Signalled by the host when the service must stop.</param>
private async Task StartConsumer(CancellationToken stoppingToken)
{
    string? ipport = Configuration["Kafka:EndPoint"];
    string? user = Configuration["Kafka:UserName"];
    string? pwd = Configuration["Kafka:PassWord"];
    string? mongodbconnectstr = Configuration["Mongodb:Connectstr"];

    var config = new ConsumerConfig
    {
        GroupId = "blwlogserver-consumer-group",
        AutoOffsetReset = AutoOffsetReset.Earliest,
        BootstrapServers = ipport,
        SecurityProtocol = SecurityProtocol.SaslPlaintext,
        SaslMechanism = SaslMechanism.Plain,
        SaslUsername = user,
        SaslPassword = pwd
    };

    // Built once per consumer task instead of once per retry; every handler writes to the same database.
    var database = new MongoClient(mongodbconnectstr).GetDatabase("udppackage");

    // Pause between consumer rebuilds so a persistent failure does not spin in a hot loop.
    TimeSpan retryDelay = TimeSpan.FromSeconds(1);

    while (!stoppingToken.IsCancellationRequested)
    {
        bool consumerFailed = false;
        var consumer = new ConsumerBuilder<string, byte[]>(config).Build();
        try
        {
            consumer.Subscribe(KafkaKey.BLWLog_RCU_Topic);
            while (true)
            {
                // Blocks until a record arrives; throws OperationCanceledException on shutdown.
                var cr = consumer.Consume(stoppingToken);
                try
                {
                    await PersistMessageAsync(database, cr.Message.Key, cr.Message.Value);
                }
                catch (ConsumeException e)
                {
                    logger.Error(e, "出错:" + e.Message);
                    Console.WriteLine(111111111111111);
                }
                catch (Exception ex)
                {
                    // One bad record must not stop the consumer: log it and carry on.
                    logger.Error(ex, "Ex出错" + ex.Message);
                    Console.WriteLine(22222222222222);
                }
            }
        }
        catch (OperationCanceledException)
        {
            // Raised by Consume when the host is stopping; the outer loop condition ends the method.
            logger.Error("操作出错");
            Console.WriteLine("操作错误");
        }
        catch (Exception ex)
        {
            Console.WriteLine("未知错误" + ex.Message);
            logger.Error(ex, "未知错误" + ex.Message);
            consumerFailed = true;
        }
        finally
        {
            // Close() leaves the consumer group cleanly; Dispose() releases the native handle.
            try
            {
                consumer.Close();
            }
            catch (Exception closeEx)
            {
                logger.Error(closeEx, "Kafka consumer close failed: " + closeEx.Message);
            }
            finally
            {
                consumer.Dispose();
            }
        }

        if (consumerFailed && !stoppingToken.IsCancellationRequested)
        {
            try
            {
                await Task.Delay(retryDelay, stoppingToken);
            }
            catch (OperationCanceledException)
            {
                // Shutdown requested during the pause; the loop condition exits.
            }
        }
    }
}

/// <summary>
/// Deserializes one Kafka record according to its key and inserts the resulting document
/// into the matching MongoDB collection.
/// </summary>
/// <param name="database">Target MongoDB database.</param>
/// <param name="key">Kafka record key that selects the payload type; a null key is skipped.</param>
/// <param name="payload">Serialized record value, decoded with <c>MyMessagePacker.FastDeserialize</c>.</param>
private static async Task PersistMessageAsync(IMongoDatabase database, string? key, byte[] payload)
{
    if (key is null)
    {
        // Without a key the payload type cannot be determined.
        logger.Warn("Kafka message without a key was skipped");
        return;
    }

    if (key.Equals(KafkaKey.UDPPackageKey))
    {
        UDPPackage packet = MyMessagePacker.FastDeserialize<UDPPackage>(payload);
        var doc = new UDPPackage_db
        {
            TotalCount = packet.TotalCount,
            CommandType = packet.CommandType,
            FenLei = packet.FenLei,
            ExtraData = packet.ExtraData,
            RemoveTimeString = packet.RemoveTime,
            // NOTE(review): parsed with the current culture — confirm the producer's timestamp format.
            RemoveTime = DateTime.Parse(packet.RemoveTime)
        };
        await database.GetCollection<UDPPackage_db>("totalcount").InsertOneAsync(doc);

        // xu, 2025-09-08: also publish the recent values of selected statistics to Redis.
        PublishRecentTotals(packet);
    }
    else if (key.Equals(KafkaKey.UDPPackageStepMonitor))
    {
        StepInfo step = MyMessagePacker.FastDeserialize<StepInfo>(payload);
        var doc = new StepInfo_db
        {
            MessageId = step.MessageId,
            Step = step.Step,
            StepDescription = step.StepDescription,
            Content = step.Content,
            TriggerTime = DateTime.Parse(step.TriggerTime),
            TriggerTimeString = step.TriggerTime,
            EveryMessageId = step.EveryMessageId,
            Monitor_0E_01 = step.Monitor_0E_01,
            HostNumber = step.HostNumber,
            HotelCode = step.HotelCode
        };
        await database.GetCollection<StepInfo_db>("zeroe_stepmonitor").InsertOneAsync(doc);
    }
    else if (key.Equals(KafkaKey.UDPPackagePowerMonitor))
    {
        NengHao power = MyMessagePacker.FastDeserialize<NengHao>(payload);
        var doc = new NengHao_db
        {
            Version = power.Version,
            HotelCode = power.HotelCode,
            HostNumber = power.HostNumber,
            Mac = power.Mac,
            EndPoint = power.EndPoint,
            V = power.V,
            A = power.A,
            P = power.P,
            Energy_Consumption = power.Energy_Consumption,
            Sum_Energy_Consumption = power.Sum_Energy_Consumption,
            IsTakeCard = power.IsTakeCard,
            CreateTime = power.CreateTime,
            ReportTime = DateTime.Parse(power.ReportTime),
            RoomNumber = power.RoomNumber,
            CarbonVIP = power.CarbonVIP,
            AllDeviceData = power.AllDeviceData,
            IdentityInfo = power.IdentityInfo
        };
        await database.GetCollection<NengHao_db>("powermonitor").InsertOneAsync(doc);
    }
    else if (key.Equals(KafkaKey.IotMonitor))
    {
        IOTMonitorData iot = MyMessagePacker.FastDeserialize<IOTMonitorData>(payload);
        var doc = new IOTMonitorData_db
        {
            Step = iot.Step,
            TriggerTime = iot.TriggerTime,
            HotelCode = iot.HotelCode,
            HotelId = iot.HotelId,
            HotelName = iot.HotelName,
            RoomNumber = iot.RoomNumber,
            RequestId = iot.RequestId,
            CommandDescription = iot.CommandDescription,
            Platform = iot.Platform,
            CreateTime = iot.CreateTime,
            RemoteIP = iot.RemoteIP,
            ControlClass = iot.ControlClass,
            SceneName = iot.SceneName,
            WhichOneDevice = iot.WhichOneDevice
        };
        await database.GetCollection<IOTMonitorData_db>("voiceiotlog").InsertOneAsync(doc);
    }
    else if (key.Equals(KafkaKey.RCUOnLineStatus))
    {
        OnOffLineData status = MyMessagePacker.FastDeserialize<OnOffLineData>(payload);
        var doc = new OnOffLineData_db
        {
            MAC = status.MAC,
            HostNumber = status.HostNumber,
            CurrentStatus = status.CurrentStatus,
            HotelCode = status.HotelCode,
            CurrentTime = status.CurrentTime,
            EndPoint = status.EndPoint
        };
        await database.GetCollection<OnOffLineData_db>("rcustatuslog").InsertOneAsync(doc);
    }
    else if (key.Equals(KafkaKey.InvokceThirdHttpInterface))
    {
        Interface3Log call = MyMessagePacker.FastDeserialize<Interface3Log>(payload);
        var doc = new Interface3Log_db
        {
            HotelCode = call.HotelCode,
            HostNumber = call.HostNumber,
            RoomNumber = call.RoomNumber,
            TriggerTime = call.TriggerTime,
            CommandType = call.CommandType,
            ActionData = call.ActionData
        };
        await database.GetCollection<Interface3Log_db>("invokehttpinterfacelog").InsertOneAsync(doc);
    }
    else if (key.Equals(KafkaKey.TakeCardStatus))
    {
        MTakeCardData card = MyMessagePacker.FastDeserialize<MTakeCardData>(payload);
        var doc = new MTakeCardData_db
        {
            Status = card.Status,
            HostNUMBER = card.HostNUMBER,
            LastUpdateTime = card.LastUpdateTime,
            HotelCode = card.HotelCode
        };
        await database.GetCollection<MTakeCardData_db>("takecardlog").InsertOneAsync(doc);
    }
    else if (key.Equals(KafkaKey.ServiceInfoStatus))
    {
        OtherServiceInfo service = MyMessagePacker.FastDeserialize<OtherServiceInfo>(payload);
        var doc = new OtherServiceInfo_db
        {
            HotelCode = service.HotelCode,
            HostNumber = service.HostNumber,
            Address = service.Address,
            StatusReceiver = service.StatusReceiver,
            LastUpdateTime = service.LastUpdateTime
        };
        await database.GetCollection<OtherServiceInfo_db>("serviceinfolog").InsertOneAsync(doc);
    }
    else if (key.Equals(KafkaKey.PMSLogMonitor))
    {
        CheckInYuanShidata pms = MyMessagePacker.FastDeserialize<CheckInYuanShidata>(payload);
        var doc = new CheckInYuanShidata_db
        {
            Step = pms.Step,
            IP = pms.IP,
            RequestId = pms.RequestId,
            CurrentTime = pms.CurrentTime,
            CommandType = pms.CommandType,
            JianJieData = pms.JianJieData,
            ZhiJieData = pms.ZhiJieData
        };
        await database.GetCollection<CheckInYuanShidata_db>("pmslog").InsertOneAsync(doc);
    }
    else if (key.Equals(KafkaKey.RCUNewVersion_Register)
        || key.Equals(KafkaKey.RCUNewVersion_RestartReason)
        || key.Equals(KafkaKey.RCUNewVersion_0E)
        || key.Equals(KafkaKey.RCUNewVersion_TakeCard))
    {
        // All four new-version RCU record kinds are stored unparsed in the same collection.
        await InsertHexDataAsync(database, payload);
    }

    // KafkaKey.RCUNewTimerData and unrecognised keys have no handler and are ignored.
}

/// <summary>
/// Stores a new-version RCU record in the "rcu_hexdata" collection; the hex payload is copied as-is.
/// </summary>
/// <remarks>
/// Register payload layout as documented in the original comments (not parsed here):
/// P0-P3 subnet mask (4 bytes), P4-P7 gateway (4), P8-P9 RCU port (2), P10-P15 MAC address (6),
/// P16-P35 software version (20), P36-P38 configuration version (3), P39-P42 DNS server 1 IP (4),
/// P43-P58 room type remark (16), P59-P74 room number remark (16), P75-P78 room type (4),
/// P79-P82 room number (4).
/// </remarks>
/// <param name="database">Target MongoDB database.</param>
/// <param name="payload">Serialized <c>NewVersionHexData</c> record.</param>
private static async Task InsertHexDataAsync(IMongoDatabase database, byte[] payload)
{
    NewVersionHexData frame = MyMessagePacker.FastDeserialize<NewVersionHexData>(payload);
    var doc = new NewVersionHexData_db
    {
        HotelCode = frame.HotelCode,
        HostNumber = frame.HostNumber,
        RoomNumber = frame.RoomNumber,
        CurrentTime = frame.CurrentTime,
        RemoteEndPoint = frame.RemoteEndPoint,
        CmdType = frame.CmdType,
        HexData = frame.HexData
    };
    await database.GetCollection<NewVersionHexData_db>("rcu_hexdata").InsertOneAsync(doc);
}

/// <summary>
/// Adds the packet's TotalCount to the rolling window that matches its CommandType and
/// publishes that window to Redis. Packets with any other CommandType are ignored.
/// </summary>
/// <param name="packet">The deserialized UDP statistics packet.</param>
private static void PublishRecentTotals(UDPPackage packet)
{
    switch (packet.CommandType)
    {
        case "UDPPackage_CPUMax":
            PushAndPublish(_cpuMaxQueue, "UDPPackage_CPUMax", packet.TotalCount);
            break;
        case "UDPPackage_CPUMin":
            PushAndPublish(_cpuMinQueue, "UDPPackage_CPUMin", packet.TotalCount);
            break;
        case "UDPPackage_CPUAvg":
            PushAndPublish(_cpuAvgQueue, "UDPPackage_CPUAvg", packet.TotalCount);
            // The average sample also refreshes the detection timestamp.
            CSRedisCacheHelper.Set("UDPPackage_DetectTime", DateTime.UtcNow.ToString("o"), 10);
            break;
        case "UDPPackage_TotalSendPackage":
            PushAndPublish(_TotalSendPackage, "UDPPackage_TotalSendPackage", packet.TotalCount);
            break;
        case "UDPPackage_TotalRecvPackage":
            PushAndPublish(_TotalRecvPackage, "UDPPackage_TotalRecvPackage", packet.TotalCount);
            break;
        case "RCUOnLine":
            PushAndPublish(_RCUOnLine, "RCUOnLine", packet.TotalCount);
            break;
    }
}

/// <summary>
/// Appends <paramref name="value"/> to <paramref name="window"/>, keeping at most 10 entries,
/// and writes the window to Redis under <paramref name="redisKey"/> as a comma-separated string.
/// </summary>
/// <param name="window">Rolling window shared by all consumer tasks.</param>
/// <param name="redisKey">Redis key that receives the joined values.</param>
/// <param name="value">Newest sample.</param>
private static void PushAndPublish(Queue<long> window, string redisKey, long value)
{
    const int windowSize = 10;

    // Queue<T> is not thread-safe and several consumers run concurrently. The Redis write
    // stays inside the lock so an older snapshot can never overwrite a newer one.
    lock (window)
    {
        if (window.Count >= windowSize)
        {
            window.Dequeue();
        }
        window.Enqueue(value);

        // The trailing 10 is passed through unchanged — presumably an expiry; verify against CSRedisCacheHelper.
        CSRedisCacheHelper.Set(redisKey, string.Join(",", window), 10);
    }
}
}
}