Files
Web_BLVLOG_Server_Mvc_Prod/MQTTSub/Program.cs

735 lines
31 KiB
C#
Raw Normal View History

2025-11-20 16:20:04 +08:00
using System.Net;
using System.Text;
using System.Text.Json.Serialization;
using System.Threading.Channels;
using Common;
using Commonlib;
using DAL.PGModels;
using ICSharpCode.SharpZipLib.Zip;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Caching.Memory;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.VisualBasic;
using MQTTnet;
using MQTTnet.Packets;
using MQTTnet.Protocol;
using NetMQ;
using NetMQ.Sockets;
using NLog;
using NPOI.OpenXmlFormats;
using RestSharp;
namespace MQTTSub
{
internal class Program
{
public static IMqttClient MqttClient { get; set; }
public static MemoryCache _memoryCache = new MemoryCache(new MemoryCacheOptions { });
public static int Connecting = 0;
private static IServiceProvider _serviceProvider; // 需在程序初始化时赋值
private static Channel<string> _messageChannel;
private static Channel<string> _messageChannel_1;
public static IConfiguration Configuration;
static void Main(string[] args)
{
//PostWebRequestToTianMao("0EC0", "4979cf8db1ec46f48404e0d907b84719");
//PostWebRequestToTianMao("0006", "4979cf8db1ec46f48404e0d907b84719");
//PostWebRequestToTianMao("8088", "c3272aafc4224e27b023f1ea60478d61");
//PostWebRequestToTianMaoOld("PushWelcome",
//Newtonsoft.Json.JsonConvert.SerializeObject(new { HotelId ="4979cf8db1ec46f48404e0d907b84719", RoomNo = "0EC0", WelcomeText = "你好,欢迎你入住" }));//通知天猫精灵播放欢迎词
//return;
//string AAA = "AA 55 20 00 54 33 53 41 0E 5F DF 1F 08 77 A1 00 00 00 00 00 00 00 01 04 00 11 00 01 00 00 2E BE";
//AAA = AAA.Replace(" ", "");
//byte[] buffer = Tools.HEXString2ByteArray(AAA);
//int offset = 15;
//int length = buffer.Length - offset - 2;
//if (length > 0)
//{
// using (MemoryStream stream = new MemoryStream(buffer, offset, length))
// {
// string NNN = DecodeRoomStatus(stream, "1001", "2079", buffer).Result;
// }
//}
//return;
// 构建配置
Configuration = new ConfigurationBuilder()
.SetBasePath(Directory.GetCurrentDirectory())
.AddJsonFile("appsettings.json")
.Build();
var Services = new ServiceCollection();
var ddda = Configuration["TianMaoCUID:1018"];
//var options = new DbContextOptionsBuilder<PostgresContext>()
//.UseNpgsql(Configuration.GetConnectionString("DefaultConnection")).Options;
////AddDbContext默认已经将DbContext注册为Scoped生命周期再次显式注册会导致冲突
Services.AddDbContext<PostgresContext>(options =>
{
options.UseNpgsql(Configuration.GetConnectionString("DefaultConnection"));
});
_serviceProvider = Services.BuildServiceProvider();
// 创建无界Channel
_messageChannel = Channel.CreateUnbounded<string>();
//另外的Channel
_messageChannel_1 = Channel.CreateUnbounded<string>();
_ = Start_MqttClient();
Task.Factory.StartNew(async () =>
{
await ConsumeMessagesAsync();
}, TaskCreationOptions.LongRunning);
Task.Factory.StartNew(async () =>
{
await ConsumeMessagesAsync_1();
}, TaskCreationOptions.LongRunning);
Console.WriteLine("启动了");
Console.ReadLine();
}
public static async Task ConsumeMessagesAsync_1()
{
while (true)
{
if (_messageChannel_1.Reader.TryRead(out var nnn))
{
try
{
using var scope = _serviceProvider.CreateScope();
var dbContext = scope.ServiceProvider.GetRequiredService<PostgresContext>();
if (string.IsNullOrEmpty(nnn))
{
return;
}
MonitorLog? igs = System.Text.Json.JsonSerializer.Deserialize<MonitorLog>(nnn);
DAL.PGModels.DevMonitorlog ddd = new DAL.PGModels.DevMonitorlog();
ddd.HostId = igs.HostID;
ddd.HotelId = igs.HotelID;
ddd.HotelCode = igs.HotelCode.ToString();
ddd.RoomNo = igs.RoomNo;
ddd.HostNumber = igs.HostNumber;
ddd.Mac = igs.MAC;
ddd.LanIp = igs.LanIP;
ddd.LanPort = igs.LanPort;
ddd.WwwIp = igs.WWW_IP;
ddd.WwwPort = igs.WWW_Port;
DateTime dt = DateTime.Parse(igs.CreateTime);
ddd.CreateTime = dt;
long lllgs = Tools.ToUnixTimestampByMilliseconds(DateTime.Parse(igs.CreateTime));
ddd.CreateTimeToUnixTime = lllgs;
ddd.Data = igs.Data;
ddd.CommandType = igs.CommandType;
ddd.SendOrReceive = igs.SendOrReceive;
dbContext.DevMonitorlogs.Add(ddd);
await dbContext.SaveChangesAsync();
}
catch (Exception ex)
{
}
}
}
}
// 消费者方法
public static async Task ConsumeMessagesAsync()
{
while (true)
{
if (_messageChannel.Reader.TryRead(out var nnn))
{
try
{
using var scope = _serviceProvider.CreateScope();
var dbContext = scope.ServiceProvider.GetRequiredService<PostgresContext>();
if (string.IsNullOrEmpty(nnn))
{
return;
}
MonitorLog? igs = System.Text.Json.JsonSerializer.Deserialize<MonitorLog>(nnn);
DAL.PGModels.DevMonitorlog ddd = new DAL.PGModels.DevMonitorlog();
string MYTianMaoCUID = igs.TiaoMaoCUID;
string XiaoDuCUID = igs.XiaoDuCUID;
ddd.HostId = igs.HostID;
ddd.HotelId = igs.HotelID;
ddd.HotelCode = igs.HotelCode.ToString();
ddd.RoomNo = igs.RoomNo;
ddd.HostNumber = igs.HostNumber;
ddd.Mac = igs.MAC;
ddd.LanIp = igs.LanIP;
ddd.LanPort = igs.LanPort;
ddd.WwwIp = igs.WWW_IP;
ddd.WwwPort = igs.WWW_Port;
DateTime dt = DateTime.Parse(igs.CreateTime);
//DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss");
ddd.CreateTime = dt;
long lllgs = Tools.ToUnixTimestampByMilliseconds(DateTime.Parse(igs.CreateTime));
ddd.CreateTimeToUnixTime = lllgs;
string hexstring = igs.Data;
hexstring = hexstring.Replace(" ", "");
byte[] buffer = Tools.HEXString2ByteArray(hexstring);
#region
//下面是心跳包
//AA 55
//17 00
//54 33 53 41
//02
//D6 92
//E4 07 67 18
//34 D0 B8 11 67 18
//1C E7
//下面是这个状态同步
//AA 55 38 00
//54 33 53 41
//0E
//EC 8C
//E4 07
//67 B4
//00 00 00 00 00 00 00
//05
//04 00 01 00 02 00
//04 00 03 00 02 00
//04 00 02 00 02 00
//04 00 08 00 02 00
//04 00 0D 00 02 00 00
//E6 90
#endregion
var CommandType = buffer[8];
if (igs.CommandType.Equals("客房状态同步"))
{
int offset = 15;
int length = buffer.Length - offset - 2;
if (length > 0)
{
using (MemoryStream stream = new MemoryStream(buffer, offset, length))
{
string NNN = await DecodeRoomStatus(stream, ddd.RoomNo, ddd.HotelCode, buffer, ddd.HotelId, MYTianMaoCUID, XiaoDuCUID, dbContext);
if (ddd.HotelCode.Equals("1197"))
{
_logger.Error("1197解析出的数据为" + NNN);
}
ddd.AnalysisData = NNN;
}
}
}
//不是青奥酒店 的数据才处理
//if (!ddd.HotelCode.Equals("1197"))
if (true)
{
ddd.Data = igs.Data;
ddd.CommandType = igs.CommandType;
ddd.SendOrReceive = igs.SendOrReceive;
dbContext.DevMonitorlogs.Add(ddd);
//这里的MAC可能有重复所以 使用First不使用Single
var ccc = dbContext.StatisticsTotals.FirstOrDefault(A => A.Mac.Equals(igs.MAC));
if (ccc != null)
{
if (igs.SendOrReceive.Equals(ConstKey.Receive_RX))
{
ccc.RxCount += 1;
}
else if (igs.SendOrReceive.Equals(ConstKey.Send_TX))
{
ccc.TxCount += 1;
}
ccc.UpdateTime = igs.CreateTime;
dbContext.StatisticsTotals.Update(ccc);
await dbContext.SaveChangesAsync();
}
else
{
DAL.PGModels.StatisticsTotal ccc1 = new DAL.PGModels.StatisticsTotal();
ccc1.Mac = igs.MAC;
ccc1.RxCount = 1;
ccc1.TxCount = 1;
ccc1.UpdateTime = igs.CreateTime;
ccc1.HotelCode = igs.HotelCode.ToString();
ccc1.HostNumber = igs.HostNumber;
dbContext.StatisticsTotals.Add(ccc1);
await dbContext.SaveChangesAsync();
}
}
}
catch (Exception ex)
{
Console.WriteLine($"Publish failed: {ex.Message}");
Console.WriteLine(ex.StackTrace);
}
}
}
}
async private static Task Start_MqttClient()
{
try
{
var mqttFactory = new MqttClientFactory();
string? ip = "120.24.73.62";
int? port = 1883;
string? username = "blw";
string? pwd = "blw@1234";
long lid = Tools.ToUnixTimestampBySeconds(DateTime.Now);
string ID = $"logmonitor_sub_{lid}";
MqttClient = mqttFactory.CreateMqttClient();
var mqttClientOptions = new MqttClientOptionsBuilder()
.WithClientId(ID)
.WithTcpServer(ip, port)
.WithCredentials(username, pwd)
.WithWillQualityOfServiceLevel(MqttQualityOfServiceLevel.AtMostOnce)
.WithProtocolVersion(MQTTnet.Formatter.MqttProtocolVersion.V500)
.Build();
MqttClient.ApplicationMessageReceivedAsync -= MqttClient_ApplicationMessageReceivedAsync;
MqttClient.DisconnectedAsync -= MqttClient_DisconnectedAsync;
MqttClient.ApplicationMessageReceivedAsync += MqttClient_ApplicationMessageReceivedAsync;
MqttClient.DisconnectedAsync += MqttClient_DisconnectedAsync;
await MqttClient.ConnectAsync(mqttClientOptions, CancellationToken.None);
var subscribeOptions = new MqttClientSubscribeOptions();
var topicFilter = new MqttTopicFilter
{
Topic = "blw/log/report"
};
var topicFilter1 = new MqttTopicFilter
{
Topic = "blw/rcu/monitor/endpoint"
};
subscribeOptions.TopicFilters.Add(topicFilter);
subscribeOptions.TopicFilters.Add(topicFilter1);
await MqttClient.SubscribeAsync(subscribeOptions, CancellationToken.None);
}
catch (Exception ex)
{
Console.WriteLine("Start_MqttClient: " + ex.Message);
}
}
async private static Task MqttClient_DisconnectedAsync(MqttClientDisconnectedEventArgs arg)
{
try
{
Console.WriteLine("断线重连了");
await Task.Delay(10000);
await Start_MqttClient();
}
catch (Exception)
{
}
}
async private static Task MqttClient_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg)
{
string topic = arg.ApplicationMessage.Topic;
var str = arg.ApplicationMessage.Payload;
string nnn = Encoding.UTF8.GetString(str);
try
{
if (topic.Equals("blw/rcu/monitor/endpoint"))
{
}
else
{
await _messageChannel.Writer.WriteAsync(nnn);
}
}
catch (Exception ex)
{
//Console.WriteLine("错误数据为:"+nnn);
Console.WriteLine("MqttClient_ApplicationMessageReceivedAsync: " + ex.Message);
}
await Task.CompletedTask;
}
async private static Task<string> (string RoomNumber, string HotelCode)
{
string Key = "QueryRoomType_" + RoomNumber + "_" + HotelCode;
string MyData = "";
var Data = _memoryCache.Get<string>(Key);
if (Data != null)
{
MyData = Data;
}
else
{
//第一步先查出是什么房型
Dictionary<string, string> dic = new Dictionary<string, string>();
dic.Add("roomnumber", RoomNumber);
dic.Add("hotelcode", HotelCode);
//返回的是var G = new { RoomTypeId = AA.ID, RoomTypeName = AA.Name };
var GGG = await Http.HttpPostFormSendData(dic, "/api/GetRoomTypeInfo");
var GGG1 = System.Text.Json.JsonSerializer.Deserialize<Dictionary<string, object>>(GGG.response.ToString());
string jieguo = GGG1["resDesc"].ToString();
if (jieguo.Equals("查询成功"))
{
var DDD1 = System.Text.Json.JsonSerializer.Deserialize<TempResult>(GGG.response.ToString());
string RoomTypeID = DDD1.Result.RoomTypeId.ToString();
_memoryCache.Set<string>(Key, RoomTypeID, new TimeSpan(0, 30, 0));
MyData = RoomTypeID;
}
}
return MyData;
}
async private static Task<List<DeviceInfo>> (string HotelCode, string RoomTypeID)
{
string Key = "HuoQuHuoLuXinXi_" + HotelCode;
int RoomTypeIDInt = 0;
int.TryParse(RoomTypeID, out RoomTypeIDInt);
HotelInfo data = new HotelInfo() { code = HotelCode };
WebAPIData MyWeb = null;
var Data = _memoryCache.Get<WebAPIData>(Key);
if (Data != null)
{
MyWeb = Data;
}
else
{
//第二步查询出所属房型的回路数据
var QQQ = await Http.HttpGetSendData(data, "/api/GetRoomTypeAndModalsListLog");
var QQQ1 = System.Text.Json.JsonSerializer.Deserialize<WebAPIData>(QQQ.response.ToString());
_memoryCache.Set<WebAPIData>(Key, QQQ1, new TimeSpan(0, 20, 0));
MyWeb = QQQ1;
}
var QQQ2 = MyWeb.Result.FirstOrDefault(A => A.ID == RoomTypeIDInt);
var QQQ3 = QQQ2.Modals;
return QQQ3;
}
/// <summary>
/// 分析数据
/// </summary>
/// <param name="stream"></param>
/// <param name="RoomNumber"></param>
/// <param name="HotelCode"></param>
/// <returns></returns>
async private static Task<string> DecodeRoomStatus(Stream stream, string RoomNumber, string HotelCode, byte[] bbbfff, int? hotelid, string WoDeTianMao, string XiaoDu, PostgresContext dbcontext)
{
Dictionary<string, DeviceStatusData> l = new Dictionary<string, DeviceStatusData>();
try
{
using (BinaryReader reader = new BinaryReader(stream))
{
var SysLock = reader.ReadBoolean(); //RCU是否锁定0/否1/是
var CardType = reader.ReadByte(); //房卡类型0/无人1/有人2/客人3/经理4/服务员
var Door = reader.ReadBoolean(); //门磁开关1/开2/关
var ElecQty = reader.ReadUInt16(); //门锁电量单位MV
var HostTemp = reader.ReadByte(); //主机温度
//空调数量现在不关心
//空调数量默认0新的回路方式此处无用兼容老版本
int airConditionNumber = reader.ReadByte();
for (int i = 0; i < airConditionNumber; i++)
{
byte airNo = reader.ReadByte();
}
//现在设备数量只关心
//门磁
//红外 雷达 取电 断电 离线 在线
//设备状态数量,包含所有回路状态
int deviceNumber = reader.ReadByte();
List<DeviceTemp> DevicetempList = new List<DeviceTemp>();
List<string> = new List<string>();
.Add("服务信息");
long originalPosition = reader.BaseStream.Position;
//那究竟哪些是门磁,哪些是红外
for (int i = 0; i < deviceNumber; i++)
{
try
{
var deviceAddress = reader.ReadBytes(4);
var Status = reader.ReadUInt16();
originalPosition = reader.BaseStream.Position;
string RoomTypeID = await (RoomNumber, HotelCode);
List<DeviceInfo> QQQ3 = await (HotelCode, RoomTypeID);
string AddressStr = "";
if (deviceAddress[0] == 0x00)//指令场景地址
{
AddressStr = string.Format("{0}.{1}.{2}", deviceAddress[1], deviceAddress[2], deviceAddress[3]);
}
else
{
ushort type = BitConverter.ToUInt16(deviceAddress, 2);
AddressStr = string.Format("{0:000}{1:000}{2:000}", deviceAddress[0], deviceAddress[1], type);
}
DeviceTemp d = new DeviceTemp();
d.Address = AddressStr;
d.Status = Status;
DevicetempList.Add(d);
//获取监控的列表
foreach (string item in )
{
//从房型回路信息中查找
var LLL1 = QQQ3.Where(A => A.TypeName.Equals(item)).Select(A => new { A.ModalAddress, A.Name });
var bf = LLL1.Where(A => A.ModalAddress.Equals(AddressStr)).ToList();
if (bf.Count > 0)
{
var dname = bf.FirstOrDefault()?.Name;
DeviceStatusData n = new DeviceStatusData() { Address = AddressStr, Status = Status, DeviceType = item, DeviceName = dname };
if (l.TryGetValue(AddressStr, out DeviceStatusData VVV))
{
l[AddressStr] = n;
}
else
{
l.Add(AddressStr, n);
}
}
}
}
catch (Exception)
{
// 发生异常时回退到原始位置
reader.BaseStream.Position = originalPosition;
}
}
////故障数量默认0
//int faultNumber = reader.ReadByte();
//for (int i = 0; i < faultNumber; i++)
//{
//}
if (HotelCode.Equals("1197"))
{
string UKey = string.Format("TianMaoCUID:{0}", RoomNumber);
//WoDeTianMao = "2a54da28396f4b6fb280f6a7f2d0dc25&1018";
WoDeTianMao = Configuration[UKey];
Qingao q1 = new Qingao();
q1.CurrentTime = DateTime.Now;
q1.HotelCode = HotelCode;
q1.RoomNumber = RoomNumber;
//Console.WriteLine("TianMaoCUID:" + WoDeTianMao);
if (DevicetempList.All(A => A.Address.StartsWith("004000")))
{
//这个是定期上报,触发数据读写,但是不触发欢迎词
if (deviceNumber == 5 || deviceNumber == 6)
{
var QQQ = DevicetempList.FirstOrDefault(A => A.Address.Equals("004000001"));
q1.IsTriggerWelcomeMsg = false;
if (QQQ != null)
{
q1.TakeCardStatus = QQQ.Status.ToString();
}
_logger.Error("1197收到定期上报的数据:" + RoomNumber);
}
}
else
{
_logger.Error("1197收到上报的数据:" + RoomNumber + " 天猫CUID: " + WoDeTianMao);
if (!string.IsNullOrEmpty(WoDeTianMao))
{
string ReallyCUID = WoDeTianMao.Split('&')[0];
string RoomNo_T = WoDeTianMao.Split('&')[1];
var QQQ = DevicetempList.FirstOrDefault(A => A.Address.Equals("004000001"));
//这个是取电开关数据
if (QQQ != null)
{
_logger.Error("收到取电上报数据:" + RoomNumber + " 地址:" + QQQ.Address + " 状态:" + QQQ.Status);
q1.TakeCardStatus = QQQ.Status.ToString();
if (QQQ.Status == 1)
{
q1.IsTriggerWelcomeMsg = true;
PostWebRequestToTianMao(RoomNo_T, ReallyCUID);
}
//string KKK = HotelCode + "_" + RoomNumber + "_" + QQQ.Address;
//var PPP = CSRedisCacheHelper.Get<DeviceTemp>(KKK);
//if (PPP != null)
//{
// ushort TakeOutStatus = PPP.Status;
// //如果原来是2现在是1 就触发欢迎词
// if (TakeOutStatus == 2)
// {
// if (QQQ.Status == 1)
// {
// PostWebRequestToTianMao(RoomNumber, t.TianMao);
// }
// }
// CSRedisCacheHelper.Set<DeviceTemp>(KKK, QQQ);
//}
//else
//{
// CSRedisCacheHelper.Set<DeviceTemp>(KKK, QQQ);
//}
}
}
}
if (!string.IsNullOrEmpty(q1.TakeCardStatus))
{
dbcontext.Qingaos.Add(q1);
await dbcontext.SaveChangesAsync();
}
}
}
}
catch (Exception ex)
{
var G = Tools.ByteToString(bbbfff);
Console.WriteLine("解析数据出错了: " + System.Text.Json.JsonSerializer.Serialize(G));
Console.WriteLine(ex.StackTrace);
}
var ll = l.Values.ToList();
return System.Text.Json.JsonSerializer.Serialize(ll);
}
private const string apiURL = "http://pms.boonlive-rcu.com:90";
private const string accessKeyId = "5d36d736c7866d47600d87d6a881adaa";
private const string accessKeySecret = "b4f94f725b2417dfbc4703dd457087f9";
private static readonly Logger _logger = LogManager.GetCurrentClassLogger();
public static void PostWebRequestToTianMao(string roomNumber, string tiaomaocuid)
{
//2a54da28396f4b6fb280f6a7f2d0dc25&7001
try
{
var options = new RestClientOptions(apiURL);
var client = new RestClient(options);
Dictionary<string, string> d1 = new Dictionary<string, string>();
d1.Add("HotelId", tiaomaocuid);
d1.Add("RoomNo", roomNumber);
d1.Add("WelcomeText", "贵宾,您好! 欢迎光临青澳国际酒店,我是您的智能语音助手:天猫精灵。调节房间开关是我的特长,您只需要呼叫:天猫精灵,并发出指令,我就可以为您调节室内温度、开关空调、电视、灯光、窗帘等,我很乐意为您提供帮助,期待与您一起度过一段美妙的时光,再见!");
string jsonData = System.Text.Json.JsonSerializer.Serialize(d1);
Dictionary<string, string> dic = new Dictionary<string, string>();
dic.Add("accessKeyId", accessKeyId);
dic.Add("accessKeySecret", accessKeySecret);
dic.Add("jsonData", jsonData);
var request = new RestRequest("/home/PushWelcome");
//request.AddHeader("Content-Type", "application/json");
var data = System.Text.Json.JsonSerializer.Serialize(dic);
request.AddJsonBody(dic);
//request.AddStringBody(data, DataFormat.Json);
RestResponse response = client.Post(request);
string? hotellist = response.Content;
Console.WriteLine(hotellist);
}
catch (Exception ex)
{
_logger.Error("");
}
}
public static void PostWebRequestToTianMaoOld(string method, string jsonData)
{
try
{
string result = PostWebRequest(apiURL + method,
Newtonsoft.Json.JsonConvert.SerializeObject(new
{
accessKeyId = accessKeyId,
accessKeySecret = accessKeySecret,
jsonData = jsonData
}));
}
catch (Exception ex)
{
}
}
public static string PostWebRequest(string url, string postData)
{
string result = string.Empty;
HttpWebRequest req = (HttpWebRequest)WebRequest.Create(url);
req.Method = "POST";
req.ContentType = "application/json";
byte[] data = Encoding.UTF8.GetBytes(postData);
req.ContentLength = data.Length;
using (Stream reqStream = req.GetRequestStream())
{
reqStream.Write(data, 0, data.Length);
}
HttpWebResponse resp = (HttpWebResponse)req.GetResponse();
using (Stream stream = resp.GetResponseStream())
{
using (StreamReader reader = new StreamReader(stream, Encoding.UTF8))
{
result = reader.ReadToEnd();//获取响应内容
}
}
return result;
}
}
public record DeviceTemp
{
public string Address { get; set; }
public ushort Status { get; set; }
}
}