using Microsoft.EntityFrameworkCore.Metadata; using Newtonsoft.Json; using System.Text; using RabbitMQ.Client; using RabbitMQ.Client.Events; using Microsoft.VisualBasic; using IotManager.Controllers; using NLog; using ViewModels; using System.Threading.Channels; using MySQLAccess.PGModels; using Microsoft.EntityFrameworkCore; using CommonEntity; using SixLabors.ImageSharp.PixelFormats; using System.Net.WebSockets; using SixLabors.ImageSharp.Processing.Processors.Quantization; using Microsoft.Scripting.Utils; using Newtonsoft.Json.Converters; using Microsoft.Extensions.Caching.Memory; using IotManager.Common; using Microsoft.AspNetCore.Routing.Tree; using Common; namespace IotManager.services { public class Mqtt2RabbitMQ : BackgroundService { private readonly Logger _logger = LogManager.GetCurrentClassLogger(); private readonly IConfiguration Configuration; private readonly IDbContextFactory _factory; private IMemoryCache _cache { get; set; } public Mqtt2RabbitMQ(IConfiguration config, IDbContextFactory db, IMemoryCache cache) { this.Configuration = config; this._factory = db; this._cache = cache; } //public static byte[] IV = new byte[] { 0xf0, 0xf1, 0xf2, 0xf3, 0xf4, 0xf5, 0xf6, 0xf7, 0xf8, 0xf9, 0xfa, 0xfb, 0xfc, 0xfd, 0xfe, 0xff }; //public static byte[] key = new byte[] { 0x2b, 0x7e, 0x15, 0x16, 0x28, 0xae, 0xd2, 0xa6, 0xab, 0xf7, 0x15, 0x88, 0x09, 0xcf, 0x4f, 0x3c }; protected override async Task ExecuteAsync(CancellationToken stoppingToken) { while (!stoppingToken.IsCancellationRequested) { try { string? UserHost = Configuration["RabbitMQConfig:RabbitMQHost"]; string? Port = Configuration["RabbitMQConfig:RabbitMQPort"]; string? UserName = Configuration["RabbitMQConfig:RabbitMQUserName"]; string? Password = Configuration["RabbitMQConfig:RabbitMQPWD"]; ConnectionFactory factory = new ConnectionFactory(); factory.HostName = UserHost; factory.UserName = UserName; factory.Password = Password; factory.VirtualHost = "/"; using var connection = await factory.CreateConnectionAsync(); connection.ConnectionShutdownAsync -= Connection_ConnectionShutdownAsync; connection.ConnectionShutdownAsync += Connection_ConnectionShutdownAsync; using var channel0 = await connection.CreateChannelAsync(); using var channel1 = await connection.CreateChannelAsync(); using var channel2 = await connection.CreateChannelAsync(); using var channel3 = await connection.CreateChannelAsync(); using var channel4 = await connection.CreateChannelAsync(); //await channel0.QueueDeclareAsync(queue: "mqtt_queue", durable: true, exclusive: false, autoDelete: false, arguments: null); //await channel1.QueueDeclareAsync(queue: "tcp_logdata_queue", durable: true, exclusive: false, autoDelete: false, arguments: null); //await channel2.QueueDeclareAsync(queue: "tcp_data_queue", durable: true, exclusive: false, autoDelete: false, arguments: null); //await channel3.QueueDeclareAsync(queue: "tcp_connect_logdata", durable: true, exclusive: false, autoDelete: false, arguments: null); //await channel4.QueueDeclareAsync(queue: "tcp_send_data", durable: true, exclusive: false, autoDelete: false, arguments: null); var consumer0 = new AsyncEventingBasicConsumer(channel0); var consumer1 = new AsyncEventingBasicConsumer(channel1); var consumer2 = new AsyncEventingBasicConsumer(channel2); var consumer3 = new AsyncEventingBasicConsumer(channel3); var consumer4 = new AsyncEventingBasicConsumer(channel4); consumer0.ReceivedAsync -= Consumer0_ReceivedAsync; consumer0.ReceivedAsync += Consumer0_ReceivedAsync; consumer1.ReceivedAsync -= Consumer1_ReceivedAsync; consumer1.ReceivedAsync += Consumer1_ReceivedAsync; consumer2.ReceivedAsync -= Consumer2_ReceivedAsync; consumer2.ReceivedAsync += Consumer2_ReceivedAsync; consumer3.ReceivedAsync -= Consumer3_ReceivedAsync; consumer3.ReceivedAsync += Consumer3_ReceivedAsync; consumer4.ReceivedAsync -= Consumer4_ReceivedAsync; consumer4.ReceivedAsync += Consumer4_ReceivedAsync; var a = channel0.BasicConsumeAsync(queue: "mqtt_queue", autoAck: false, consumer: consumer0); var b = channel1.BasicConsumeAsync(queue: "tcp_logdata_queue", autoAck: false, consumer: consumer1); var c = channel2.BasicConsumeAsync(queue: "tcp_data_queue", autoAck: false, consumer: consumer2); var d = channel3.BasicConsumeAsync(queue: "tcp_connect_logdata", autoAck: false, consumer: consumer3); var e = channel4.BasicConsumeAsync(queue: "tcp_send_data", autoAck: false, consumer: consumer4); await Task.WhenAll(a, b, c, d, e); while (!stoppingToken.IsCancellationRequested) { await Task.Delay(2000,stoppingToken); } } catch (Exception e1) { } _logger.Error("这里不会执行的"); await Task.Delay(5000); } } public class TCPData { public string? endpoint { get; set; } public string? data { get; set; } public long reporttimestamp { get; set; } } /// /// 燃气报警 /// public static byte[] cmd_buning_alert = new byte[] { 0x03, 0x00 }; public static byte[] cmd_e = new byte[] { 0x02, 0x00 }; /// /// 注册命令 /// public static byte[] cmd_register = new byte[] { 0x01, 0x00 }; /// /// 报警电话 /// /// /// /// private async Task 发送短信打电话(uint DeviceClientId, List? QQQ1) { double UIA = 3; foreach (var item in QQQ1) { MsgData ss = new MsgData(); ss.PhoneNumber = item.ToString(); ss.CallerName = "IOT平台"; ss.Content = string.Format("【能泰科技】您的燃气报警器【{0}】触发警报,请尽快 确认处理!", DeviceClientId); ss.StartingPoint = Tools.ToUnixTimestampBySeconds(DateTime.Now).ToString(); ss.DeadLine = Tools.ToUnixTimestampBySeconds(DateTime.Now.AddMinutes(UIA)).ToString(); ss.Type = "2"; await MsgSend.SendMsg(ss); _logger.Error("手机号" + item.ToString() + "发送了短信"); MsgData ss1 = new MsgData(); ss1.PhoneNumber = item.ToString(); ss1.CallerName = "IOT平台"; ss1.Content = "能泰科技 您的燃气报警器触发警报 请尽快处理"; ss1.StartingPoint = Tools.ToUnixTimestampBySeconds(DateTime.Now).ToString(); ss1.DeadLine = Tools.ToUnixTimestampBySeconds(DateTime.Now.AddMinutes(UIA)).ToString(); ss1.Type = "1"; await MsgSend.SendMsg(ss1); _logger.Error("手机号" + item.ToString() + "打了电话"); UIA = UIA + 2; } } private async Task Connection_ConnectionShutdownAsync(object sender, ShutdownEventArgs args) { //if (args.Initiator != ShutdownInitiator.Application) //{ // await Task.Delay(5000); //} await Task.CompletedTask; } /// /// TCP 数据解析 /// /// /// /// private async Task Consumer2_ReceivedAsync(object sender, BasicDeliverEventArgs event_args) { string BODYStr = ""; try { // 获取 channel 对象 var consumer = (AsyncEventingBasicConsumer)sender; var channel = consumer.Channel; await channel.BasicAckAsync(event_args.DeliveryTag, false); var body = event_args.Body; //AA 30 00 //18 00 //00 //FF FF FF FF //A3 31 DA 24 3B B3 7E 1D F4 F2 E1 CB 5A C8 8A 91 45 2D A8 72 14 AC 8A B2 C3 69 61 42 F9 F8 //CRC //AA B9 //尾标志 //2D 2D 0D 0A 2D 2D var message = Encoding.UTF8.GetString(body.ToArray()); BODYStr = message; TCPData TTT = System.Text.Json.JsonSerializer.Deserialize(message); long report_time = TTT.reporttimestamp; DateTime dangqian_time = Tools.ToLocalTimeDateBySeconds(report_time); string TcpEndPoint = TTT.endpoint; string[] data = TcpEndPoint.Split(':'); string ip = data[0]; short port = 0; short.TryParse(data[1], out port); var NNN1 = Tools.HEXString2ByteArray(TTT.data); string OStr = Tools.ByteToString(NNN1); //头 byte Header = NNN1[0]; //从1到3之间的元素,不包括3 byte[] Len = NNN1[1..3]; //SN号 byte[] Pack_SN = NNN1[3..5]; ushort Pack_XUHAO = BitConverter.ToUInt16(Pack_SN.ToArray(), 0); //重发次数 byte RetryCount = NNN1[5]; //设备编号 byte[] DeviceOnlyNo = NNN1[6..10]; //ClientId uint DeviceClientId = BitConverter.ToUInt32(DeviceOnlyNo); Console.WriteLine("Client是:" + DeviceClientId); //ushort llf = (ushort)NNN1.Length; //IV内容组成:Pack_Head + Pack_LEN + Pack_SN + Retry_Num + Client_ID + Pack_END //List IIV = new List(); //IIV.Add(0xAA); //IIV.AddRange(BitConverter.GetBytes(llf)); //IIV.AddRange(Pack_SN); //IIV.Add(0x00); //IIV.AddRange(DeviceOnlyNo); //IIV.AddRange(new byte[] { 0x2D, 0x2D, 0x0D, 0x0A, 0x2D, 0x2D }); //byte[] New_IV = IIV.ToArray(); //Console.WriteLine("IV是" + Tools.ByteToString(New_IV)); //AA 30 00 80 01 00 FF FF FF FF 5F C5 64 B3 93 50 FA 3F 48 D0 E9 C0 DD 7E 7C 62 10 E4 AD 4C EA 0D 04 FD 39 4C A7 77 40 18 7F 31 2D 2D 0D 0A 2D 2D byte[] GGJ = NNN1[10..^8]; //加密key byte[] key = new byte[] { }; //byte[] key = new byte[] { 0x2b, 0x7e, 0x15, 0x16, 0x28, 0xae, 0xd2, 0xa6, 0xab, 0xf7, 0x15, 0x88, 0x09, 0xcf, 0x4f, 0x3c }; await using var context = _factory.CreateDbContext(); string Kkey = ConstValue.OnOffLineKey + "_" + TcpEndPoint; var CID = CSRedisCacheHelper.Get(Kkey); if (CID != 0) { } else { CSRedisCacheHelper.Set(Kkey, DeviceClientId); } //await NewMethod(TcpEndPoint, TcpEndPoint, context); string FinallyKey = ""; string cachekey = "SecretKey_" + DeviceClientId.ToString(); string str = _cache.Get(cachekey); if (!string.IsNullOrEmpty(str)) { FinallyKey = str; } else { var SinData = await context.Deviceinfos.SingleOrDefaultAsync(A => A.ClientId == DeviceClientId); if (SinData != null) { FinallyKey = SinData.SecretKey; _cache.Set(cachekey, FinallyKey, DateTimeOffset.Now.AddMinutes(20)); } } if (!string.IsNullOrEmpty(FinallyKey)) { key = Encoding.ASCII.GetBytes(FinallyKey); } //string HHS= Tools.ByteToString(key); //Console.WriteLine("Key 使用的是:"+HHS); //AES_CTR forDecrypting = new AES_CTR(key, New_IV); //byte[] decryptedContent = new byte[GGJ.Length]; //forDecrypting.DecryptBytes(decryptedContent, GGJ); //byte[] decryptedContent = GGJ; byte[] decryptedContent = Tools.IHIOT_Message_Content_Encrypt(Pack_XUHAO, key, GGJ); string DStr = Tools.ByteToString(decryptedContent); Console.WriteLine("解密结果:" + DStr); //命令字 byte[] cmd = decryptedContent[0..2]; Console.WriteLine("命令字为A:" + Tools.ByteToString(cmd)); Console.WriteLine("命令字为B:" + Tools.ByteToString(cmd_e)); List xianshi = new List(); xianshi.Add(Header); xianshi.AddRange(Len); xianshi.AddRange(Pack_SN); xianshi.Add(RetryCount); xianshi.AddRange(DeviceOnlyNo); xianshi.AddRange(decryptedContent); byte[] CRCCode_F = NNN1[^8..^6]; byte[] Tail_F = NNN1[^6..]; xianshi.AddRange(CRCCode_F); xianshi.AddRange(Tail_F); string xianshi_str = Tools.ByteToString(xianshi.ToArray()); if (MqttClientHostedService._mqttClient != null && MqttClientHostedService._mqttClient.IsConnected) { try { string endppp = TTT.endpoint; string topic = string.Format("emqtt/tcp/" + endppp); string payload = ""; var payloadbyte = new List(); payloadbyte.Add(Header); payloadbyte.AddRange(Len); payloadbyte.AddRange(Pack_SN); payloadbyte.Add(RetryCount); payloadbyte.AddRange(DeviceOnlyNo); payloadbyte.AddRange(decryptedContent); string sss = Tools.ByteToString(payloadbyte.ToArray()); _logger.Error("MQTT Pub" + sss); await MqttClientHostedService.PublishAsync(topic, xianshi_str); } catch (Exception) { } } //如果是注册命令 if (cmd.ValueEquals(cmd_register)) { //P0:启动原因 //P1~P32:软件版本号(32Byte) //P33~P48:IMEI(16Byte) //P49~P58: ICCID(20Byte) //P59~P74: 经度16Byte //P75~P90: 纬度 16Byte //P91~P106: MCU软件版本(16Byte) //P107~P123: MCU UUID(16Byte) //byte 启动原因 = decryptedContent[0]; //byte[] 软件版本号 = decryptedContent[1..33]; //byte[] IMEI = decryptedContent[33..49]; //byte[] ICCID = decryptedContent[49..59]; //byte[] 经度 = decryptedContent[59..75]; //byte[] 纬度 = decryptedContent[75..91]; //byte[] MCU软件版本 = decryptedContent[91..107]; //byte[] MCUUUID = decryptedContent[107..124]; //P0:启动原因 //P1~P2:设备类型ID 2Byte //P3~P4:厂牌ID 2Byte //P5~P6:机型ID 2Byte //P7: MCU软件版本(16Byte) //P8: MCU硬件版本(16Byte) //P9: MCU UUID长度(1Byte) //P10~Pn: MCU UUID(16Byte) byte 启动原因 = decryptedContent[0]; byte[] 设备类型ID = decryptedContent[1..3]; byte[] 厂牌ID = decryptedContent[3..5]; byte[] 机型ID = decryptedContent[5..7]; byte MCU软件版本 = decryptedContent[7]; byte MCU硬件版本 = decryptedContent[8]; byte MCUUUID长度 = decryptedContent[9]; byte[] MCUUUID = decryptedContent[10..]; DeviceRegisterDatum d = new DeviceRegisterDatum(); d.ClientId = DeviceClientId; d.ReStartReason = Convert.ToInt16(启动原因).ToString(); d.DeviceClassId = Tools.ByteToString(设备类型ID); d.Pid = Tools.ByteToString(厂牌ID); d.DeviceClassId = Tools.ByteToString(机型ID); d.McusoftVersion = MCU软件版本.ToString("X2"); d.McuhardVersion = MCU硬件版本.ToString("X2"); d.Mcuuuid = Tools.ByteToString(MCUUUID); d.CreateTime = DateTime.Now; d.EndPoint = TcpEndPoint; if (TcpEndPoint.Contains(":")) { string[] II_PPP = TcpEndPoint.Split(':'); string II = II_PPP[0]; if (!string.IsNullOrEmpty(II)) { d.RealAddress = BaiduAPI.GetBaiduIp(II); } } context.DeviceRegisterData.Add(d); await context.SaveChangesAsync(); } //燃气报警 if (cmd.ValueEquals(cmd_buning_alert)) { string HexData = Tools.ByteToString(decryptedContent); //燃气报警数据为:03 00 04 00 Console.WriteLine("燃气报警数据为:" + HexData); byte 燃气报警状态 = decryptedContent[0]; byte 报警状态1 = decryptedContent[2]; //0x04是报警 //0x03是取消报警 if (decryptedContent.Length == 4) { if (报警状态1 == 0x04) { if (DeviceClientId == 71) { IronPython.Runtime.PythonList OOO = StaticData.scope2.GetVariable("Dev_MobileNo"); var OOO1 = JsonConvert.SerializeObject(OOO); var OOO2 = JsonConvert.DeserializeObject>(OOO1); await 发送短信打电话(DeviceClientId, OOO2); } else { IronPython.Runtime.PythonList QQQ = StaticData.scope2.GetVariable("MobileNo"); var PPP1 = JsonConvert.SerializeObject(QQQ); var QQQ1 = JsonConvert.DeserializeObject>(PPP1); IronPython.Runtime.PythonList III = StaticData.scope2.GetVariable("DeviceId"); var PPP2 = JsonConvert.SerializeObject(III); var QQQ2 = JsonConvert.DeserializeObject>(PPP2); if (QQQ2.Contains(DeviceClientId)) { await 发送短信打电话(DeviceClientId, QQQ1); } } } } DeviceAlertDatum alertDatum = new DeviceAlertDatum(); alertDatum.ClientId = DeviceClientId; alertDatum.EndPoint = TcpEndPoint; alertDatum.CreateTime = DateTime.Now; if (TcpEndPoint.Contains(":")) { string[] II_PPP = TcpEndPoint.Split(':'); string II = II_PPP[0]; if (!string.IsNullOrEmpty(II)) { alertDatum.RealAddress = BaiduAPI.GetBaiduIp(II); } } alertDatum.Value = cmd_buning_alert; alertDatum.Value = new byte[] { 燃气报警状态 }; context.DeviceAlertData.Add(alertDatum); await context.SaveChangesAsync(); } //心跳 if (cmd.ValueEquals(cmd_e)) { byte[] 工作状态 = decryptedContent[2..4]; byte[] 实时值 = decryptedContent[4..6]; byte[] 环境值 = decryptedContent[6..8]; byte[] 背景值 = decryptedContent[8..10]; byte[] 差值 = decryptedContent[10..12]; byte[] 浓度百分比 = decryptedContent[12..14]; byte[] 稳定状态 = decryptedContent[14..16]; byte[] 阈值1 = decryptedContent[16..18]; byte[] 阈值2 = decryptedContent[18..20]; byte[] MCU温度 = decryptedContent[20..22]; byte[] 检测故障 = decryptedContent[22..24]; byte[] 传感器温度 = decryptedContent[24..26]; byte[] 传感器湿度 = decryptedContent[26..28]; byte[] 人测试 = decryptedContent[28..30]; byte[] ADC原始值 = decryptedContent[30..32]; byte[] 故障状态码 = decryptedContent[32..]; //P28-P29 ADC原始值 //P30-P31 故障状态码 //02 00 //02 00 //10 01 //13 01 //10 01 //FD FF //00 00 //01 00 //7D 02 //7D 07 //6E 00 //38 00 //1E 00 //2F 00 //00 00 //P0~P1:燃气检测:工作状态 //P2~P3:燃气检测:实时值 //P4~P5:燃气检测:环境值 //P6~P7:燃气检测:背景值 //P8~P9:燃气检测:差值 //P10~P11:燃气检测:浓度百分比 //P12~P13:燃气检测:稳定状态 //P14~P15:燃气检测:阈值1 //P16~P17:燃气检测:阈值2 //P18~P19:燃气检测:MCU温度 //P20~P21:燃气检测:检测故障 //P22~P23:燃气检测:传感器温度 //P24~P25:燃气检测:传感器湿度 //P26~P27:燃气检测:外部传感器检测有无人状态 //P28-P29 ADC原始值 //P30-P31 故障状态码 short gongzuo = BitConverter.ToInt16(工作状态.ToArray(), 0); short shishizhi = BitConverter.ToInt16(实时值.ToArray(), 0); short huanjingzhi = BitConverter.ToInt16(环境值.ToArray(), 0); short beijingzhi = BitConverter.ToInt16(背景值.ToArray(), 0); short chazhi = BitConverter.ToInt16(差值.ToArray(), 0); short nongdu = BitConverter.ToInt16(浓度百分比.ToArray(), 0); short wending = BitConverter.ToInt16(稳定状态.ToArray(), 0); short yuzhi1 = BitConverter.ToInt16(阈值1.ToArray(), 0); short yuzhi2 = BitConverter.ToInt16(阈值2.ToArray(), 0); short muc_t = BitConverter.ToInt16(MCU温度.ToArray(), 0); short guzhang = BitConverter.ToInt16(检测故障.ToArray(), 0); short T1 = BitConverter.ToInt16(传感器温度.ToArray(), 0); short H1 = BitConverter.ToInt16(传感器湿度.ToArray(), 0); short humancheck = BitConverter.ToInt16(人测试.ToArray(), 0); ushort adc_original_valuue = BitConverter.ToUInt16(ADC原始值.ToArray(), 0); ushort error_code = BitConverter.ToUInt16(故障状态码.ToArray(), 0); short XUHAO = BitConverter.ToInt16(Pack_SN.ToArray(), 0); //string DeviceID= BitConverter.ToInt64(DeviceOnlyNo,0).ToString(); //string DeviceID = Tools.Byte4ToLong(DeviceOnlyNo).ToString(); string DeviceID = Tools.ByteToString(DeviceOnlyNo); //Common.Grafana. //var gdata = Grafana.data; //if (DeviceID.Equals(gdata.device_id)) //{ // gdata.realtime_value = shishizhi.ToString(); // gdata.environment_value = huanjingzhi.ToString(); // gdata.mcu_temperature = muc_t.ToString(); // gdata.sensor_temperature = T1.ToString(); // gdata.sensor_humidity = H1.ToString(); // gdata.adc_raw_value =adc_original_valuue.ToString(); //} TcpHeartbeatDatum datum = new TcpHeartbeatDatum() { WorkStatus = gongzuo.ToString(), RealTimeValue = shishizhi.ToString(), EnvValue = huanjingzhi.ToString(), BgValue = beijingzhi.ToString(), DiffValue = chazhi.ToString(), NongDuBaiFenBi = nongdu.ToString(), StableStatus = wending.ToString(), ThresholdValue1 = yuzhi1.ToString(), ThresholdValue2 = yuzhi2.ToString(), McuT = muc_t.ToString(), GuZhangCheck = guzhang.ToString(), Temperature = T1.ToString(), Humidity = H1.ToString(), AnyHuman = humancheck.ToString(), //CreateTime = DateTime.Now, CreateTime = dangqian_time, OriginalDataId = XUHAO, DeviceId = DeviceClientId.ToString(), AdcOriginalValuue = adc_original_valuue.ToString(), ErrorCode = error_code.ToString() }; context.TcpHeartbeatData.Add(datum); Console.WriteLine("写入心跳"); //校验CRC16,小端模式(低地址在前) //校验内容:Pack_Head + Pack_LEN + Pack_SN + Retry_Num + Client_ID + CMD + PARA byte[] CRCCode = NNN1[^8..^6]; //Tools.CRC16 //结尾标志符 //获取从倒数第三个元素到数组末尾的所有元素 byte[] Tail = NNN1[^6..]; } TcpReceiveDatum tcpReceiveDatum = new TcpReceiveDatum(); tcpReceiveDatum.ClientId = DeviceClientId; tcpReceiveDatum.CommandType = BitConverter.ToInt16(cmd); tcpReceiveDatum.OriginalData = OStr; tcpReceiveDatum.DencryptData = DStr; tcpReceiveDatum.InsertDate = DateTime.Now; tcpReceiveDatum.InsertUnix = Tools.ToUnixTimestampByMilliseconds(DateTime.Now); tcpReceiveDatum.EndPoint = TcpEndPoint; context.TcpReceiveData.Add(tcpReceiveDatum); await context.SaveChangesAsync(); Console.WriteLine("写入数据"); } catch (Exception ex) { _logger.Error("错误的数据为:" + BODYStr); _logger.Error(ex.Message); _logger.Error(ex.StackTrace); } await Task.CompletedTask; } private static async Task 设备在线状态(string TcpEndPoint, int CID, PostgresContext context) { var S = await context.DeviceStatuses.SingleOrDefaultAsync(A => A.EndPoint.Equals(TcpEndPoint)); if (S != null) { S.Status = ConstValue.OnLine; S.DeviceId = CID; S.UpdateTime = DateTime.Now; S.UpdateTimeUnix = Tools.ToUnixTimestampByMilliseconds(DateTime.Now); context.Update(S); } else { DeviceStatus d = new DeviceStatus(); d.EndPoint = TcpEndPoint; d.DeviceId = CID; d.UpdateTime = DateTime.Now; d.UpdateTimeUnix = Tools.ToUnixTimestampByMilliseconds(DateTime.Now); d.Status = ConstValue.OnLine; context.DeviceStatuses.Add(d); } } /// /// TCP 断网原因 /// /// /// /// private async Task Consumer1_ReceivedAsync(object sender, BasicDeliverEventArgs ea) { try { // 获取 channel 对象 var consumer = (AsyncEventingBasicConsumer)sender; var channel = consumer.Channel; await channel.BasicAckAsync(ea.DeliveryTag, false); var body = ea.Body; var message = Encoding.UTF8.GetString(body.ToArray()); _logger.Error("设备断网:" + message); //{"endpoint":"172.16.4.40:31992","close_reason":"tcp_closed","currenttimestamp":1750643082} var T = System.Text.Json.JsonSerializer.Deserialize(message); string[] data = T.endpoint.Split(':'); string ip = data[0]; int port = 0; int.TryParse(data[1], out port); await using var context = _factory.CreateDbContext(); string E = T.endpoint; var S = await context.DeviceStatuses.SingleOrDefaultAsync(A => A.EndPoint.Equals(E)); if (S != null) { S.Status = ConstValue.OffLine; S.UpdateTime = DateTime.Now; S.UpdateTimeUnix = Tools.ToUnixTimestampByMilliseconds(DateTime.Now); context.Update(S); } else { DeviceStatus d = new DeviceStatus(); d.EndPoint = E; d.DeviceId = 0; d.UpdateTime = DateTime.Now; d.UpdateTimeUnix = Tools.ToUnixTimestampByMilliseconds(DateTime.Now); d.Status = ConstValue.OffLine; context.DeviceStatuses.Add(d); } await context.SaveChangesAsync(); TcpCloseDatum t = new TcpCloseDatum(); t.WwwIp = ip; t.ClientId = T.clientid; t.RealAddress = BaiduAPI.GetBaiduIp(ip); t.WwwPort = port; t.CloseReason = T.close_reason; t.CreatetimeUnix = T.currenttimestamp; context.TcpCloseData.Add(t); await context.SaveChangesAsync(); } catch (Exception ex) { _logger.Error(ex.Message); } await Task.CompletedTask; } /// /// TCP 连接信息 /// /// /// /// private async Task Consumer3_ReceivedAsync(object sender, BasicDeliverEventArgs ea) { try { // 获取 channel 对象 var consumer = (AsyncEventingBasicConsumer)sender; var channel = consumer.Channel; await channel.BasicAckAsync(ea.DeliveryTag, false); var body = ea.Body; var message = Encoding.UTF8.GetString(body.ToArray()); //{"endpoint":"172.16.4.40:31992","close_reason":"tcp_closed","currenttimestamp":1750643082} var T = System.Text.Json.JsonSerializer.Deserialize(message); string[] data = T.endpoint.Split(':'); string ip = data[0]; int port = 0; bool bbba = int.TryParse(data[1], out port); if (bbba == false) { _logger.Error("端口出错了:" + message); } await using var context = _factory.CreateDbContext(); string E = T.endpoint; var S = await context.DeviceStatuses.SingleOrDefaultAsync(A => A.EndPoint.Equals(E)); if (S != null) { S.Status = ConstValue.OnLine; S.UpdateTime = DateTime.Now; S.UpdateTimeUnix = Tools.ToUnixTimestampByMilliseconds(DateTime.Now); context.Update(S); } else { DeviceStatus d = new DeviceStatus(); d.EndPoint = E; d.DeviceId = 0; d.UpdateTime = DateTime.Now; d.UpdateTimeUnix = Tools.ToUnixTimestampByMilliseconds(DateTime.Now); d.Status = ConstValue.OnLine; context.DeviceStatuses.Add(d); } await context.SaveChangesAsync(); TcpConnDatum t = new TcpConnDatum(); t.WwwIp = ip; t.ClientId = T.clientid; t.RealAddress = BaiduAPI.GetBaiduIp(ip); t.WwwPort = port; t.ConnectTimeUnix = T.currenttimestamp; context.TcpConnData.Add(t); await context.SaveChangesAsync(); } catch (Exception) { } } /// /// TCP Send 数据 /// /// /// /// private async Task Consumer4_ReceivedAsync(object sender, BasicDeliverEventArgs ea) { try { // 获取 channel 对象 var consumer = (AsyncEventingBasicConsumer)sender; var channel = consumer.Channel; await channel.BasicAckAsync(ea.DeliveryTag, false); var body = ea.Body; var message = Encoding.UTF8.GetString(body.ToArray()); //V= #{clientid=>VVVXXX,endpoint=> E1,send_data=> Data,currenttimestamp=>CurrentTimestamp} //{"clientid":49,"send_data":[170,187,204,221,238],"currenttimestamp":1761200698,"endpoint":"111.55.160.5:4812"} _logger.Error("收到的数据:" + message); var T = System.Text.Json.JsonSerializer.Deserialize(message); var AAA1232 = T.send_data?.Select(A => Convert.ToByte(A)); await using var context = _factory.CreateDbContext(); TcpSendDatum t = new TcpSendDatum(); t.ClientId = T.clientid; t.SendData = Tools.ByteToString(AAA1232.ToArray()); t.CreateTime = DateTime.Now; t.CreateTimeUnix = T.currenttimestamp; t.CommandType = ""; context.TcpSendData.Add(t); await context.SaveChangesAsync(); } catch (Exception ex) { _logger.Error("收到数据出异常了:" + ex.Message); } } /// /// MQTT的数据,还没有 开始启用 /// /// /// /// private async Task Consumer0_ReceivedAsync(object sender, BasicDeliverEventArgs ea) { // 获取 channel 对象 var consumer = (AsyncEventingBasicConsumer)sender; var channel = consumer.Channel; await channel.BasicAckAsync(ea.DeliveryTag, false); var body = ea.Body; var message = Encoding.UTF8.GetString(body.ToArray()); Console.WriteLine("收到消息 {0}", message); try { //if (!string.IsNullOrEmpty(message)) //{ // Task.Factory.StartNew((state) => // { // }, body); //} //string BodyString = state.ToString(); _logger.Error(body); var QQQ = System.Text.Json.JsonSerializer.Deserialize(message); string topic = QQQ.topic; string payload = QQQ.payload; //传递过来的数据就是base64的,没有必要再转 byte[] bbb = Convert.FromBase64String(payload); string bbb1 = Tools.ByteToString(bbb); this._logger.Info("Topic: " + topic); this._logger.Info("PayLoad: " + bbb1); byte[] NNN1 = Tools.HEXString2ByteArray(bbb1.Replace(" ", "")); //AA 13 00 44 0B 00 FF FF FF FF 61 67 EF 6B E8 2D 2D 0D 0A 2D 2D byte Header = NNN1[0]; //从1到3之间的元素,不包括3 byte[] Len = NNN1[1..3]; //byte[] Len = NNN1.AsSpan(1,2); byte[] Pack_SN = NNN1[3..5]; //重发次数 byte RetryCount = NNN1[5]; //设备编号 byte[] DeviceOnlyNo = NNN1[6..10]; //命令字 byte[] Command = NNN1[10..12]; //校验CRC16,小端模式(低地址在前) //校验内容:Pack_Head + Pack_LEN + Pack_SN + Retry_Num + Client_ID + CMD + PARA byte[] CRCCode = NNN1[^8..^6]; //结尾标志符 //// 获取从倒数第三个元素到数组末尾的所有元素 byte[] Tail = NNN1[^6..]; } catch (Exception) { } await Task.CompletedTask; } } }