增加RCU上离线功能,能耗双通道

This commit is contained in:
2026-01-15 17:32:26 +08:00
parent b746a1da1a
commit 10bf712006
148 changed files with 8075 additions and 290 deletions

View File

@@ -1,8 +1,4 @@

using System.Diagnostics.Metrics;
using System.Text;
using System.Text.Json;
using System.Threading.Channels;
using BLWData.Entity;
using BLWLogProduce.Models;
using Common;
@@ -11,11 +7,17 @@ using CommonTools;
using Confluent.Kafka;
using Google.Protobuf;
using MessagePack;
using Microsoft.CodeAnalysis.Host.Mef;
using Microsoft.Extensions.Caching.Memory;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using NLog;
using RestSharp;
using System.Diagnostics.Metrics;
using System.Runtime.Intrinsics.X86;
using System.Text;
using System.Text.Json;
using System.Threading.Channels;
using static CSRedis.CSRedisClient;
namespace BLWLogProduce.Services
@@ -363,6 +365,16 @@ namespace BLWLogProduce.Services
try
{
NengHao? poo = System.Text.Json.JsonSerializer.Deserialize<NengHao>(body);
if (string.IsNullOrEmpty(poo.HostNumber))
{
return;
}
if (poo.NengHaoList == null || poo.NengHaoList.Count == 0)
{
return;
}
logger.Error("能耗:" + body);
//string str= Newtonsoft.Json.JsonConvert.SerializeObject(poo);
//Console.WriteLine("收到了"+str);
byte[] qf = MyMessagePacker.FastSerialize(poo);
@@ -372,6 +384,7 @@ namespace BLWLogProduce.Services
await p.ProduceAsync(TopicKey, new Message<string, byte[]> { Key = DetailKey, Value = qf });
#region 使
List<DeviceData> la = new List<DeviceData>();
foreach (var item in poo.AllDeviceData)
{
@@ -389,31 +402,56 @@ namespace BLWLogProduce.Services
la.Add(dd);
}
List<SinglePowerChannelData> nenghaolist = new List<SinglePowerChannelData>();
if (poo.NengHaoList != null && poo.NengHaoList.Count > 0)
{
foreach (var item in poo.NengHaoList)
{
SinglePowerChannelData gs1 = new SinglePowerChannelData();
gs1.Address = item.address;
gs1.Dianliu = item.dianliu;
gs1.Dianya = item.dianya;
gs1.Gonglv = item.gonglv;
gs1.Nenghao = item.nenghao;
gs1.Zongnenghao = item.zongnenghao;
nenghaolist.Add(gs1);
}
}
//宝镜系统使用
EnergyConsumption ese = new EnergyConsumption();
ese.HotelCode = poo.HotelCode;
ese.HostNumber = poo.HostNumber;
ese.Mac = poo.Mac;
ese.EndPoint = poo.EndPoint;
ese.V = poo.V;
ese.A = poo.A;
ese.P = poo.P;
ese.EnergyConsumption_ = poo.Energy_Consumption;
ese.SumEnergyConsumption = poo.Sum_Energy_Consumption;
ese.PowerChannelList.AddRange(nenghaolist);
//ese.V = poo.V;
//ese.A = poo.A;
//ese.P = poo.P;
//ese.EnergyConsumption_ = poo.Energy_Consumption;
//ese.SumEnergyConsumption = poo.Sum_Energy_Consumption;
ese.CreateTime = poo.CreateTime;
ese.RoomNumber = poo.RoomNumber ?? "";
ese.IsTakeCard = poo.IsTakeCard;
ese.IsInsertCard = poo.IsInsertCard;
ese.CarbonVIP = poo.CarbonVIP;
ese.DeviceStatusList.AddRange(la);
ese.IdentityInfo = poo.IdentityInfo;
ese.CardEvent = poo.CardEvent;
ese.PMSStatus = poo.PMS_Status;
byte[] data = ese.ToByteArray();
string TopicKey1 = KafkaKey.BLWLog4BaoJing_RCU_Topic;
string DetailKey1 = KafkaKey.UDPPackagePowerMonitor;
if (poo.HotelCode == 1085)
{
logger.Error("能耗数据:" + body);
}
await p.ProduceAsync(TopicKey1, new Message<string, byte[]> { Key = DetailKey1, Value = data });
#endregion
}
catch (Exception ex)
{
@@ -467,6 +505,30 @@ namespace BLWLogProduce.Services
}));
var TSLog_DingYue = ("redis-tslog", new Action<SubscribeMessageEventArgs>(async (args) =>
{
string body = args.Body;
try
{
NewVersionLog? poo = System.Text.Json.JsonSerializer.Deserialize<NewVersionLog>(body);
poo.ts_ms = Tools.GetUnixTime_MS();
string TopicKey1 = KafkaKey.BLWLog4NodeJs_RCU_Topic;
string DetailKey1 = poo.comm_seq.ToString();
var jsonstr = JsonConvert.SerializeObject(poo);
await p.ProduceAsync(TopicKey1, new Message<string, byte[]> { Key = DetailKey1, Value = Encoding.UTF8.GetBytes(jsonstr) });
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
Console.WriteLine(ex.StackTrace);
}
}));
#region
//var DingYue3_1 = ("redis-carbon_trigger", new Action<SubscribeMessageEventArgs>(async (args) =>
//{
@@ -558,21 +620,41 @@ namespace BLWLogProduce.Services
}));
//在线离线状态
//这里只会是上线,离线在过期事件中处理
//宝镜系统使用的
var DingYue6 = ("redis-on_off_line", new Action<SubscribeMessageEventArgs>(async (args) =>
{
try
{
string body = args.Body;
OnOffLineData? usa = System.Text.Json.JsonSerializer.Deserialize<OnOffLineData>(body);
byte[] bytes = MyMessagePacker.FastSerialize(usa);
CommonEntity.OnOffLineData? usa = System.Text.Json.JsonSerializer.Deserialize<CommonEntity.OnOffLineData>(body);
string TopicKey = KafkaKey.BLWLog_RCU_Topic;
string DetailKey = KafkaKey.RCUOnLineStatus;
if (string.IsNullOrEmpty(usa.EndPoint))
{
logger.Error("RCUOnOffLine:" + body);
return;
}
if (string.IsNullOrEmpty(usa.HostNumber))
{
logger.Error("RCUOnOffLine:" + body);
return;
}
BLWData.Entity.OnOffLineData ese = new BLWData.Entity.OnOffLineData();
ese.HotelCode = usa.HotelCode;
ese.HostNumber = usa.HostNumber;
ese.RoomNumber = usa.RoomNumber ?? "";
ese.CurrentStatus = usa.CurrentStatus;
ese.EndPoint = usa.EndPoint ?? "";
ese.Mac = usa.MAC ?? "";
ese.CurrentTime = usa.CurrentTime.ToString("yyyy-MM-dd HH:mm:ss.fff");
//var partition = new Partition(0); // 指定分区号
//var topicPartition = new TopicPartition(TopicKey, partition);
var dr = await p.ProduceAsync(TopicKey, new Message<string, byte[]> { Key = DetailKey, Value = bytes });
byte[] data = ese.ToByteArray();
string TopicKey1 = KafkaKey.BLWLog4BaoJing_RCU_Topic;
string DetailKey1 = KafkaKey.RCUOnLineStatus;
//var dr = await p.ProduceAsync(TopicKey1, new Message<string, byte[]> { Key = DetailKey1, Value = data });
}
catch (Exception ex)
{
@@ -767,9 +849,9 @@ namespace BLWLogProduce.Services
//string ti = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss");
DeviceActionChangeDataPush ese = new DeviceActionChangeDataPush();
ese.Code =long.Parse(poo.code);
ese.Code = poo.code;
ese.RoomNumber = poo.roomNumber;
ese.HostNumber ="";
ese.HostNumber = "";
ese.Devicetype = poo.devicetype;
ese.Address = poo.address;
ese.Brightness = poo.brightness;
@@ -828,6 +910,9 @@ namespace BLWLogProduce.Services
//CSRedisCacheHelper.redis3.Subscribe(DingYue3_1);
CSRedisCacheHelper.redis3.Subscribe(DingYue20);
//新版本日志
CSRedisCacheHelper.redis3.Subscribe(TSLog_DingYue);
}
catch (Exception ex)
{