Files

919 lines
40 KiB
C#
Raw Permalink Normal View History

2025-12-11 14:04:39 +08:00

using Microsoft.EntityFrameworkCore.Metadata;
using Newtonsoft.Json;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using Microsoft.VisualBasic;
using IotManager.Controllers;
using NLog;
using ViewModels;
using System.Threading.Channels;
using MySQLAccess.PGModels;
using Microsoft.EntityFrameworkCore;
using CommonEntity;
using SixLabors.ImageSharp.PixelFormats;
using System.Net.WebSockets;
using SixLabors.ImageSharp.Processing.Processors.Quantization;
using Microsoft.Scripting.Utils;
using Newtonsoft.Json.Converters;
using Microsoft.Extensions.Caching.Memory;
using IotManager.Common;
using Microsoft.AspNetCore.Routing.Tree;
using Common;
namespace IotManager.services
{
public class Mqtt2RabbitMQ : BackgroundService
{
/// <summary>NLog logger for this class.</summary>
private readonly Logger _logger = LogManager.GetCurrentClassLogger();

/// <summary>Application configuration; the RabbitMQ connection settings are read from it.</summary>
private readonly IConfiguration Configuration;

/// <summary>Factory used by the message handlers to create a <see cref="PostgresContext"/>.</summary>
private readonly IDbContextFactory<PostgresContext> _factory;

/// <summary>In-process memory cache shared by the message handlers.</summary>
private IMemoryCache _cache { get; set; }

/// <summary>
/// Creates the service and stores the injected dependencies.
/// </summary>
/// <param name="config">Application configuration.</param>
/// <param name="db">Factory for <see cref="PostgresContext"/> instances.</param>
/// <param name="cache">Memory cache instance.</param>
public Mqtt2RabbitMQ(IConfiguration config, IDbContextFactory<PostgresContext> db, IMemoryCache cache)
{
    (Configuration, _factory, _cache) = (config, db, cache);
}
//public static byte[] IV = new byte[] { 0xf0, 0xf1, 0xf2, 0xf3, 0xf4, 0xf5, 0xf6, 0xf7, 0xf8, 0xf9, 0xfa, 0xfb, 0xfc, 0xfd, 0xfe, 0xff };
//public static byte[] key = new byte[] { 0x2b, 0x7e, 0x15, 0x16, 0x28, 0xae, 0xd2, 0xa6, 0xab, 0xf7, 0x15, 0x88, 0x09, 0xcf, 0x4f, 0x3c };
/// <summary>
/// Background loop of the service. Connects to RabbitMQ, opens one channel per queue,
/// attaches the five message handlers and then idles until the host requests shutdown.
/// If connecting or subscribing fails, the failure is logged and the whole setup is
/// retried after 5 seconds.
/// </summary>
/// <param name="stoppingToken">Signalled by the host when the service has to stop.</param>
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    while (!stoppingToken.IsCancellationRequested)
    {
        try
        {
            ConnectionFactory factory = new ConnectionFactory();
            factory.HostName = Configuration["RabbitMQConfig:RabbitMQHost"];
            factory.UserName = Configuration["RabbitMQConfig:RabbitMQUserName"];
            factory.Password = Configuration["RabbitMQConfig:RabbitMQPWD"];
            factory.VirtualHost = "/";

            // The configured port used to be read and then ignored. Apply it when it is a
            // valid number; otherwise the client keeps its default port.
            if (int.TryParse(Configuration["RabbitMQConfig:RabbitMQPort"], out int port) && port > 0)
            {
                factory.Port = port;
            }

            using var connection = await factory.CreateConnectionAsync();
            connection.ConnectionShutdownAsync += Connection_ConnectionShutdownAsync;

            // One channel per queue.
            using var channel0 = await connection.CreateChannelAsync();
            using var channel1 = await connection.CreateChannelAsync();
            using var channel2 = await connection.CreateChannelAsync();
            using var channel3 = await connection.CreateChannelAsync();
            using var channel4 = await connection.CreateChannelAsync();

            var consumer0 = new AsyncEventingBasicConsumer(channel0);
            var consumer1 = new AsyncEventingBasicConsumer(channel1);
            var consumer2 = new AsyncEventingBasicConsumer(channel2);
            var consumer3 = new AsyncEventingBasicConsumer(channel3);
            var consumer4 = new AsyncEventingBasicConsumer(channel4);

            // The consumers are brand new, so there is nothing to unsubscribe first.
            consumer0.ReceivedAsync += Consumer0_ReceivedAsync;
            consumer1.ReceivedAsync += Consumer1_ReceivedAsync;
            consumer2.ReceivedAsync += Consumer2_ReceivedAsync;
            consumer3.ReceivedAsync += Consumer3_ReceivedAsync;
            consumer4.ReceivedAsync += Consumer4_ReceivedAsync;

            // The queues are not declared here; they are expected to exist already.
            var a = channel0.BasicConsumeAsync(queue: "mqtt_queue", autoAck: false, consumer: consumer0);
            var b = channel1.BasicConsumeAsync(queue: "tcp_logdata_queue", autoAck: false, consumer: consumer1);
            var c = channel2.BasicConsumeAsync(queue: "tcp_data_queue", autoAck: false, consumer: consumer2);
            var d = channel3.BasicConsumeAsync(queue: "tcp_connect_logdata", autoAck: false, consumer: consumer3);
            var e = channel4.BasicConsumeAsync(queue: "tcp_send_data", autoAck: false, consumer: consumer4);
            await Task.WhenAll(a, b, c, d, e);

            // Keep the connection and channels alive until shutdown is requested.
            await Task.Delay(Timeout.Infinite, stoppingToken);
        }
        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
        {
            // Normal shutdown: leave the loop without logging an error or waiting.
            break;
        }
        catch (Exception e1)
        {
            // Previously swallowed silently; a failed connect/subscribe left no trace.
            _logger.Error(e1, "RabbitMQ connection or subscription failed; retrying in 5 seconds");
        }

        try
        {
            // Back off before reconnecting, but do not delay host shutdown.
            await Task.Delay(5000, stoppingToken);
        }
        catch (OperationCanceledException)
        {
            break;
        }
    }
}
/// <summary>
/// JSON envelope of one message taken from the TCP data queue.
/// It is deserialized with System.Text.Json in Consumer2_ReceivedAsync.
/// </summary>
public class TCPData
{
// Remote endpoint of the device connection; the consumer splits it on ':' into address and port.
public string? endpoint { get; set; }
// Raw frame as a hex string; the consumer converts it with Tools.HEXString2ByteArray.
public string? data { get; set; }
// Report time; the consumer passes it to Tools.ToLocalTimeDateBySeconds (presumably Unix seconds).
public long reporttimestamp { get; set; }
}
/// <summary>
/// Command word of a gas-alarm frame.
/// </summary>
public static byte[] cmd_buning_alert = new byte[] { 0x03, 0x00 };
// Command word that Consumer2_ReceivedAsync handles as a heartbeat frame.
public static byte[] cmd_e = new byte[] { 0x02, 0x00 };
/// <summary>
/// Command word of a registration frame.
/// </summary>
public static byte[] cmd_register = new byte[] { 0x01, 0x00 };
/// <summary>
/// Sends the gas-alarm notifications for one device. For every phone number two requests
/// are submitted through <c>MsgSend.SendMsg</c>: first <c>Type = "2"</c> (logged as an SMS),
/// then <c>Type = "1"</c> (logged as a phone call). The deadline starts 3 minutes from now
/// and grows by 2 minutes for each further number.
/// </summary>
/// <remarks>
/// NOTE(review): the original method name was lost in this copy of the file (the
/// declaration and both call sites had no identifier). SendAlarmNotificationsAsync is a
/// replacement name; the method is private, so no external caller is affected.
/// </remarks>
/// <param name="DeviceClientId">Client id of the alarming device; inserted into the SMS text.</param>
/// <param name="QQQ1">Phone numbers to notify. <c>null</c> sends nothing.</param>
private async Task SendAlarmNotificationsAsync(uint DeviceClientId, List<string>? QQQ1)
{
    if (QQQ1 == null)
    {
        // The list comes from JSON deserialization and may be null.
        return;
    }

    double deadlineMinutes = 3;
    foreach (var phone in QQQ1)
    {
        if (string.IsNullOrEmpty(phone))
        {
            // A null entry used to throw and abort the whole message handler.
            continue;
        }

        MsgData sms = new MsgData();
        sms.PhoneNumber = phone;
        sms.CallerName = "IOT平台";
        sms.Content = string.Format("【能泰科技】您的燃气报警器【{0}】触发警报,请尽快 确认处理!", DeviceClientId);
        sms.StartingPoint = Tools.ToUnixTimestampBySeconds(DateTime.Now).ToString();
        sms.DeadLine = Tools.ToUnixTimestampBySeconds(DateTime.Now.AddMinutes(deadlineMinutes)).ToString();
        sms.Type = "2";
        await MsgSend.SendMsg(sms);
        _logger.Error("手机号" + phone + "发送了短信");

        MsgData call = new MsgData();
        call.PhoneNumber = phone;
        call.CallerName = "IOT平台";
        call.Content = "能泰科技 您的燃气报警器触发警报 请尽快处理";
        call.StartingPoint = Tools.ToUnixTimestampBySeconds(DateTime.Now).ToString();
        call.DeadLine = Tools.ToUnixTimestampBySeconds(DateTime.Now.AddMinutes(deadlineMinutes)).ToString();
        call.Type = "1";
        await MsgSend.SendMsg(call);
        _logger.Error("手机号" + phone + "打了电话");

        deadlineMinutes = deadlineMinutes + 2;
    }
}

/// <summary>
/// Connection-shutdown callback. Intentionally does nothing.
/// </summary>
private Task Connection_ConnectionShutdownAsync(object sender, ShutdownEventArgs args)
{
    return Task.CompletedTask;
}

/// <summary>
/// Handles one message from the TCP data queue: parses the frame, decrypts its content,
/// republishes the decoded frame over MQTT when the MQTT client is connected, stores
/// registration / gas-alarm / heartbeat records depending on the command word, and finally
/// stores the raw and decrypted frame.
/// </summary>
/// <remarks>
/// The message is acknowledged before it is processed, so a message whose processing fails
/// is logged but not redelivered.
/// Frame layout as read by this method (offsets in bytes):
/// 0 header, 1-2 length, 3-4 packet sequence number, 5 retry count, 6-9 client id,
/// 10..len-9 encrypted content, then 2 bytes CRC and a 6-byte tail marker.
/// Example frame from the original comments:
/// AA 30 00 80 01 00 FF FF FF FF 5F C5 ... 7F 31 2D 2D 0D 0A 2D 2D
/// </remarks>
/// <param name="sender">The <see cref="AsyncEventingBasicConsumer"/> that received the message.</param>
/// <param name="event_args">Delivery data; the body is a UTF-8 JSON <see cref="TCPData"/> document.</param>
private async Task Consumer2_ReceivedAsync(object sender, BasicDeliverEventArgs event_args)
{
    // 10 header bytes + 2 CRC bytes + 6 tail bytes.
    const int minFrameLength = 18;
    string BODYStr = "";
    try
    {
        var consumer = (AsyncEventingBasicConsumer)sender;
        await consumer.Channel.BasicAckAsync(event_args.DeliveryTag, false);

        BODYStr = Encoding.UTF8.GetString(event_args.Body.ToArray());
        TCPData? envelope = System.Text.Json.JsonSerializer.Deserialize<TCPData>(BODYStr);
        if (envelope == null || envelope.endpoint == null || envelope.data == null)
        {
            throw new FormatException("TCP data message is missing endpoint or data");
        }

        string TcpEndPoint = envelope.endpoint;
        DateTime reportedAt = Tools.ToLocalTimeDateBySeconds(envelope.reporttimestamp);

        byte[] frame = Tools.HEXString2ByteArray(envelope.data);
        if (frame.Length < minFrameLength)
        {
            throw new FormatException("TCP frame too short: " + frame.Length + " bytes");
        }
        string originalHex = Tools.ByteToString(frame);

        byte[] packSn = frame[3..5];
        ushort packSequence = BitConverter.ToUInt16(packSn, 0);
        uint DeviceClientId = BitConverter.ToUInt32(frame, 6);
        Console.WriteLine("Client是" + DeviceClientId);
        byte[] encryptedContent = frame[10..^8];

        await using var context = _factory.CreateDbContext();

        // Remember which client id is behind this endpoint (only when nothing is stored yet).
        string endpointKey = ConstValue.OnOffLineKey + "_" + TcpEndPoint;
        if (CSRedisCacheHelper.Get<uint>(endpointKey) == 0)
        {
            CSRedisCacheHelper.Set<uint>(endpointKey, DeviceClientId);
        }

        byte[] key = await ResolveSecretKeyAsync(context, DeviceClientId);

        // The routine is named "...Encrypt"; its output is treated as the plaintext here.
        byte[] decryptedContent = Tools.IHIOT_Message_Content_Encrypt(packSequence, key, encryptedContent);
        string decryptedHex = Tools.ByteToString(decryptedContent);
        Console.WriteLine("解密结果:" + decryptedHex);

        // Command word: first two bytes of the decrypted content.
        byte[] cmd = decryptedContent[0..2];
        Console.WriteLine("命令字为A" + Tools.ByteToString(cmd));
        Console.WriteLine("命令字为B" + Tools.ByteToString(cmd_e));

        // Decoded frame = original 10 header bytes + decrypted content + original CRC and tail.
        List<byte> decodedFrame = new List<byte>();
        decodedFrame.AddRange(frame[..10]);
        decodedFrame.AddRange(decryptedContent);
        decodedFrame.AddRange(frame[^8..]);
        string decodedFrameHex = Tools.ByteToString(decodedFrame.ToArray());

        if (MqttClientHostedService._mqttClient != null && MqttClientHostedService._mqttClient.IsConnected)
        {
            try
            {
                // Logged without CRC and tail, published with them (as before).
                string payloadHex = Tools.ByteToString(decodedFrame.GetRange(0, 10 + decryptedContent.Length).ToArray());
                _logger.Error("MQTT Pub" + payloadHex);
                await MqttClientHostedService.PublishAsync("emqtt/tcp/" + TcpEndPoint, decodedFrameHex);
            }
            catch (Exception publishError)
            {
                // Publishing is best effort and must not stop the database writes below,
                // but the failure is no longer swallowed silently.
                _logger.Error("MQTT Pub failed: " + publishError.Message);
            }
        }

        if (cmd.ValueEquals(cmd_register))
        {
            await SaveRegistrationAsync(context, DeviceClientId, TcpEndPoint, decryptedContent);
        }
        if (cmd.ValueEquals(cmd_buning_alert))
        {
            await HandleGasAlarmAsync(context, DeviceClientId, TcpEndPoint, decryptedContent);
        }
        if (cmd.ValueEquals(cmd_e))
        {
            // Staged only; saved together with the receive record below.
            AddHeartbeat(context, DeviceClientId, packSn, reportedAt, decryptedContent);
        }

        TcpReceiveDatum tcpReceiveDatum = new TcpReceiveDatum();
        tcpReceiveDatum.ClientId = DeviceClientId;
        tcpReceiveDatum.CommandType = BitConverter.ToInt16(cmd, 0);
        tcpReceiveDatum.OriginalData = originalHex;
        tcpReceiveDatum.DencryptData = decryptedHex;
        tcpReceiveDatum.InsertDate = DateTime.Now;
        tcpReceiveDatum.InsertUnix = Tools.ToUnixTimestampByMilliseconds(DateTime.Now);
        tcpReceiveDatum.EndPoint = TcpEndPoint;
        context.TcpReceiveData.Add(tcpReceiveDatum);
        await context.SaveChangesAsync();
        Console.WriteLine("写入数据");
    }
    catch (Exception ex)
    {
        _logger.Error("错误的数据为:" + BODYStr);
        _logger.Error(ex.Message);
        _logger.Error(ex.StackTrace);
    }
}

/// <summary>
/// Returns the key bytes for a device: the cached secret if present, otherwise the
/// device's <c>SecretKey</c> from the database (then cached for 20 minutes).
/// Returns an empty array when no non-empty secret is found.
/// </summary>
private async Task<byte[]> ResolveSecretKeyAsync(PostgresContext context, uint clientId)
{
    string cacheKey = "SecretKey_" + clientId.ToString();
    string? secret = _cache.Get<string>(cacheKey);
    if (string.IsNullOrEmpty(secret))
    {
        var device = await context.Deviceinfos.SingleOrDefaultAsync(A => A.ClientId == clientId);
        if (device != null)
        {
            secret = device.SecretKey;
            _cache.Set<string>(cacheKey, secret, DateTimeOffset.Now.AddMinutes(20));
        }
    }
    return string.IsNullOrEmpty(secret) ? new byte[] { } : Encoding.ASCII.GetBytes(secret);
}

/// <summary>
/// Returns the part of <paramref name="endpoint"/> before the first ':' or <c>null</c>
/// when there is no ':' or that part is empty.
/// </summary>
private static string? ExtractHost(string endpoint)
{
    int colon = endpoint.IndexOf(':');
    return colon > 0 ? endpoint.Substring(0, colon) : null;
}

/// <summary>
/// Reads a list variable from the shared script scope and converts it to a typed list
/// by a JSON round trip. May return <c>null</c>.
/// </summary>
private static List<T>? ReadScriptList<T>(string variableName)
{
    IronPython.Runtime.PythonList values = StaticData.scope2.GetVariable(variableName);
    return JsonConvert.DeserializeObject<List<T>>(JsonConvert.SerializeObject(values));
}

/// <summary>
/// Stores a registration record built from the decrypted content of a registration frame.
/// </summary>
/// <remarks>
/// Field list from the original comments: P0 restart reason, P1-P2 device type id,
/// P3-P4 brand id, P5-P6 model id, P7 MCU software version, P8 MCU hardware version,
/// P9 MCU UUID length, P10.. MCU UUID.
/// NOTE(review): the offsets below index the content from byte 0, which is where the
/// 2-byte command word sits; the heartbeat parser starts its P0 at offset 2. Confirm
/// against the protocol whether these offsets should be shifted by 2.
/// NOTE(review): the names of the three id locals were lost in this copy of the file.
/// They are mapped to the three assignments in declaration order, which leaves
/// DeviceClassId assigned twice (the model id wins). Confirm the intended mapping.
/// </remarks>
private static async Task SaveRegistrationAsync(PostgresContext context, uint clientId, string endpoint, byte[] content)
{
    byte restartReason = content[0];
    byte[] deviceTypeId = content[1..3];
    byte[] brandId = content[3..5];
    byte[] modelId = content[5..7];
    byte mcuSoftwareVersion = content[7];
    byte mcuHardwareVersion = content[8];
    byte[] mcuUuid = content[10..];

    DeviceRegisterDatum d = new DeviceRegisterDatum();
    d.ClientId = clientId;
    d.ReStartReason = Convert.ToInt16(restartReason).ToString();
    d.DeviceClassId = Tools.ByteToString(deviceTypeId);
    d.Pid = Tools.ByteToString(brandId);
    d.DeviceClassId = Tools.ByteToString(modelId);
    d.McusoftVersion = mcuSoftwareVersion.ToString("X2");
    d.McuhardVersion = mcuHardwareVersion.ToString("X2");
    d.Mcuuuid = Tools.ByteToString(mcuUuid);
    d.CreateTime = DateTime.Now;
    d.EndPoint = endpoint;
    string? host = ExtractHost(endpoint);
    if (host != null)
    {
        d.RealAddress = BaiduAPI.GetBaiduIp(host);
    }
    context.DeviceRegisterData.Add(d);
    await context.SaveChangesAsync();
}

/// <summary>
/// Handles a gas-alarm frame: sends notifications when the frame raises an alarm and
/// always stores an alert record.
/// </summary>
/// <remarks>
/// Example content from the original comments: 03 00 04 00. In byte 2, 0x04 raises the
/// alarm and 0x03 cancels it. Notifications are only sent for 4-byte content with 0x04.
/// </remarks>
private async Task HandleGasAlarmAsync(PostgresContext context, uint clientId, string endpoint, byte[] content)
{
    Console.WriteLine("燃气报警数据为:" + Tools.ByteToString(content));

    // Length is checked before byte 2 is read, so short content no longer throws.
    if (content.Length == 4 && content[2] == 0x04)
    {
        if (clientId == 71)
        {
            // Device 71 uses its own recipient list ("Dev_MobileNo").
            await SendAlarmNotificationsAsync(clientId, ReadScriptList<string>("Dev_MobileNo"));
        }
        else
        {
            List<string>? recipients = ReadScriptList<string>("MobileNo");
            List<uint>? notifiedDevices = ReadScriptList<uint>("DeviceId");
            if (notifiedDevices != null && notifiedDevices.Contains(clientId))
            {
                await SendAlarmNotificationsAsync(clientId, recipients);
            }
        }
    }

    DeviceAlertDatum alertDatum = new DeviceAlertDatum();
    alertDatum.ClientId = clientId;
    alertDatum.EndPoint = endpoint;
    alertDatum.CreateTime = DateTime.Now;
    string? host = ExtractHost(endpoint);
    if (host != null)
    {
        alertDatum.RealAddress = BaiduAPI.GetBaiduIp(host);
    }
    // NOTE(review): the original assigned cmd_buning_alert and immediately overwrote it
    // with an empty array; the effective (empty) value is kept. Confirm which was intended.
    alertDatum.Value = new byte[] { };
    context.DeviceAlertData.Add(alertDatum);
    await context.SaveChangesAsync();
}

/// <summary>
/// Builds a heartbeat record from the decrypted content of a heartbeat frame and adds
/// it to <paramref name="context"/>. Nothing is saved here.
/// </summary>
/// <remarks>
/// All values are 16-bit and follow the 2-byte command word (Pn is at offset n + 2):
/// P0-P1 work state, P2-P3 real-time value, P4-P5 environment value, P6-P7 background
/// value, P8-P9 difference, P10-P11 concentration percentage, P12-P13 stable state,
/// P14-P15 threshold 1, P16-P17 threshold 2, P18-P19 MCU temperature, P20-P21 fault
/// check, P22-P23 sensor temperature, P24-P25 sensor humidity, P26-P27 human presence,
/// P28-P29 ADC raw value, P30-P31 fault status code.
/// </remarks>
private static void AddHeartbeat(PostgresContext context, uint clientId, byte[] packSn, DateTime reportedAt, byte[] content)
{
    short ReadInt16(int offset) => BitConverter.ToInt16(content, offset);

    TcpHeartbeatDatum datum = new TcpHeartbeatDatum()
    {
        WorkStatus = ReadInt16(2).ToString(),
        RealTimeValue = ReadInt16(4).ToString(),
        EnvValue = ReadInt16(6).ToString(),
        BgValue = ReadInt16(8).ToString(),
        DiffValue = ReadInt16(10).ToString(),
        NongDuBaiFenBi = ReadInt16(12).ToString(),
        StableStatus = ReadInt16(14).ToString(),
        ThresholdValue1 = ReadInt16(16).ToString(),
        ThresholdValue2 = ReadInt16(18).ToString(),
        McuT = ReadInt16(20).ToString(),
        GuZhangCheck = ReadInt16(22).ToString(),
        Temperature = ReadInt16(24).ToString(),
        Humidity = ReadInt16(26).ToString(),
        AnyHuman = ReadInt16(28).ToString(),
        // The record carries the device-reported time, not the receive time.
        CreateTime = reportedAt,
        // Packet sequence number, read as a signed 16-bit value.
        OriginalDataId = BitConverter.ToInt16(packSn, 0),
        DeviceId = clientId.ToString(),
        AdcOriginalValuue = BitConverter.ToUInt16(content, 30).ToString(),
        ErrorCode = BitConverter.ToUInt16(content, 32).ToString()
    };
    context.TcpHeartbeatData.Add(datum);
    Console.WriteLine("写入心跳");
}
/// <summary>
/// Marks the device status row for an endpoint as online: updates the existing row, or
/// adds a new one when none exists. The change is only staged on
/// <paramref name="context"/>; this method does not save it.
/// </summary>
/// <param name="TcpEndPoint">Endpoint used to look up the status row.</param>
/// <param name="CID">Device id written to the row.</param>
/// <param name="context">Database context the change is staged on.</param>
private static async Task 线(string TcpEndPoint, int CID, PostgresContext context)
{
    var existing = await context.DeviceStatuses.SingleOrDefaultAsync(row => row.EndPoint.Equals(TcpEndPoint));
    if (existing == null)
    {
        // No row for this endpoint yet: stage a new one.
        context.DeviceStatuses.Add(new DeviceStatus
        {
            EndPoint = TcpEndPoint,
            DeviceId = CID,
            UpdateTime = DateTime.Now,
            UpdateTimeUnix = Tools.ToUnixTimestampByMilliseconds(DateTime.Now),
            Status = ConstValue.OnLine
        });
        return;
    }

    existing.Status = ConstValue.OnLine;
    existing.DeviceId = CID;
    existing.UpdateTime = DateTime.Now;
    existing.UpdateTimeUnix = Tools.ToUnixTimestampByMilliseconds(DateTime.Now);
    context.Update(existing);
}
/// <summary>
/// Handles one "TCP connection closed" message: marks the endpoint's device status as
/// offline (creating the row if needed) and stores a close record with the reason.
/// </summary>
/// <remarks>
/// The message is acknowledged before it is processed, so a failed message is logged but
/// not redelivered. Example body from the original comments:
/// {"endpoint":"172.16.4.40:31992","close_reason":"tcp_closed","currenttimestamp":1750643082}
/// </remarks>
/// <param name="sender">The <see cref="AsyncEventingBasicConsumer"/> that received the message.</param>
/// <param name="ea">Delivery data; the body is UTF-8 JSON.</param>
private async Task Consumer1_ReceivedAsync(object sender, BasicDeliverEventArgs ea)
{
    string message = "";
    try
    {
        var consumer = (AsyncEventingBasicConsumer)sender;
        await consumer.Channel.BasicAckAsync(ea.DeliveryTag, false);

        message = Encoding.UTF8.GetString(ea.Body.ToArray());
        _logger.Error("设备断网:" + message);

        var T = System.Text.Json.JsonSerializer.Deserialize<TcpCloseReason>(message);
        if (T == null || T.endpoint == null)
        {
            throw new FormatException("TCP close message has no endpoint");
        }

        string E = T.endpoint;
        string[] data = E.Split(':');
        string ip = data[0];
        int port = 0;
        // An endpoint without a port used to throw; now the record is stored with port 0.
        if (data.Length < 2 || !int.TryParse(data[1], out port))
        {
            _logger.Error("端口出错了:" + message);
        }

        await using var context = _factory.CreateDbContext();
        var S = await context.DeviceStatuses.SingleOrDefaultAsync(A => A.EndPoint.Equals(E));
        if (S != null)
        {
            S.Status = ConstValue.OffLine;
            S.UpdateTime = DateTime.Now;
            S.UpdateTimeUnix = Tools.ToUnixTimestampByMilliseconds(DateTime.Now);
            context.Update(S);
        }
        else
        {
            DeviceStatus d = new DeviceStatus();
            d.EndPoint = E;
            d.DeviceId = 0;
            d.UpdateTime = DateTime.Now;
            d.UpdateTimeUnix = Tools.ToUnixTimestampByMilliseconds(DateTime.Now);
            d.Status = ConstValue.OffLine;
            context.DeviceStatuses.Add(d);
        }
        // Saved separately so the status change persists even if the close record fails.
        await context.SaveChangesAsync();

        TcpCloseDatum t = new TcpCloseDatum();
        t.WwwIp = ip;
        t.ClientId = T.clientid;
        t.RealAddress = BaiduAPI.GetBaiduIp(ip);
        t.WwwPort = port;
        t.CloseReason = T.close_reason;
        t.CreatetimeUnix = T.currenttimestamp;
        context.TcpCloseData.Add(t);
        await context.SaveChangesAsync();
    }
    catch (Exception ex)
    {
        // Log the payload and stack as well, like the TCP data handler does.
        _logger.Error("错误的数据为:" + message);
        _logger.Error(ex.Message);
        _logger.Error(ex.StackTrace);
    }
}
/// <summary>
/// Handles one "TCP connection opened" message: marks the endpoint's device status as
/// online (creating the row if needed) and stores a connection record.
/// </summary>
/// <remarks>
/// The message is acknowledged before it is processed, so a failed message is logged but
/// not redelivered. The body is a JSON document deserialized as <c>TcpConnectLog</c>; its
/// <c>endpoint</c> is expected in "address:port" form.
/// </remarks>
/// <param name="sender">The <see cref="AsyncEventingBasicConsumer"/> that received the message.</param>
/// <param name="ea">Delivery data; the body is UTF-8 JSON.</param>
private async Task Consumer3_ReceivedAsync(object sender, BasicDeliverEventArgs ea)
{
    string message = "";
    try
    {
        var consumer = (AsyncEventingBasicConsumer)sender;
        await consumer.Channel.BasicAckAsync(ea.DeliveryTag, false);

        message = Encoding.UTF8.GetString(ea.Body.ToArray());
        var T = System.Text.Json.JsonSerializer.Deserialize<TcpConnectLog>(message);
        if (T == null || T.endpoint == null)
        {
            throw new FormatException("TCP connect message has no endpoint");
        }

        string E = T.endpoint;
        string[] data = E.Split(':');
        string ip = data[0];
        int port = 0;
        // A missing port is reported the same way as an unparsable one.
        if (data.Length < 2 || !int.TryParse(data[1], out port))
        {
            _logger.Error("端口出错了:" + message);
        }

        await using var context = _factory.CreateDbContext();
        var S = await context.DeviceStatuses.SingleOrDefaultAsync(A => A.EndPoint.Equals(E));
        if (S != null)
        {
            S.Status = ConstValue.OnLine;
            S.UpdateTime = DateTime.Now;
            S.UpdateTimeUnix = Tools.ToUnixTimestampByMilliseconds(DateTime.Now);
            context.Update(S);
        }
        else
        {
            DeviceStatus d = new DeviceStatus();
            d.EndPoint = E;
            d.DeviceId = 0;
            d.UpdateTime = DateTime.Now;
            d.UpdateTimeUnix = Tools.ToUnixTimestampByMilliseconds(DateTime.Now);
            d.Status = ConstValue.OnLine;
            context.DeviceStatuses.Add(d);
        }
        // Saved separately so the status change persists even if the connection record fails.
        await context.SaveChangesAsync();

        TcpConnDatum t = new TcpConnDatum();
        t.WwwIp = ip;
        t.ClientId = T.clientid;
        t.RealAddress = BaiduAPI.GetBaiduIp(ip);
        t.WwwPort = port;
        t.ConnectTimeUnix = T.currenttimestamp;
        context.TcpConnData.Add(t);
        await context.SaveChangesAsync();
    }
    catch (Exception ex)
    {
        // This catch used to be empty, so failures (bad JSON, database errors) vanished.
        _logger.Error("错误的数据为:" + message);
        _logger.Error(ex.Message);
        _logger.Error(ex.StackTrace);
    }
}
/// <summary>
/// Handles one "data sent to a device over TCP" message and stores it as a send record.
/// </summary>
/// <remarks>
/// The message is acknowledged before it is processed, so a failed message is logged but
/// not redelivered. Example body from the original comments:
/// {"clientid":49,"send_data":[170,187,204,221,238],"currenttimestamp":1761200698,"endpoint":"111.55.160.5:4812"}
/// </remarks>
/// <param name="sender">The <see cref="AsyncEventingBasicConsumer"/> that received the message.</param>
/// <param name="ea">Delivery data; the body is UTF-8 JSON.</param>
private async Task Consumer4_ReceivedAsync(object sender, BasicDeliverEventArgs ea)
{
    try
    {
        var consumer = (AsyncEventingBasicConsumer)sender;
        await consumer.Channel.BasicAckAsync(ea.DeliveryTag, false);

        var message = Encoding.UTF8.GetString(ea.Body.ToArray());
        _logger.Error("收到的数据:" + message);

        var T = System.Text.Json.JsonSerializer.Deserialize<TcpSendDataLog>(message);
        if (T == null)
        {
            throw new FormatException("TCP send message is empty");
        }

        // send_data may be absent; the old code then failed with a NullReferenceException
        // on ToArray(). Treat a missing array as "no bytes sent".
        byte[] sentBytes = T.send_data?.Select(A => Convert.ToByte(A)).ToArray() ?? new byte[] { };

        await using var context = _factory.CreateDbContext();
        TcpSendDatum t = new TcpSendDatum();
        t.ClientId = T.clientid;
        t.SendData = Tools.ByteToString(sentBytes);
        t.CreateTime = DateTime.Now;
        t.CreateTimeUnix = T.currenttimestamp;
        t.CommandType = "";
        context.TcpSendData.Add(t);
        await context.SaveChangesAsync();
    }
    catch (Exception ex)
    {
        _logger.Error("收到数据出异常了:" + ex.Message);
        _logger.Error(ex.StackTrace);
    }
}
/// <summary>
/// Handles one message from the MQTT queue. Per the original note, the MQTT data path is
/// not in use yet: the message is acknowledged, decoded and logged, nothing is stored.
/// </summary>
/// <remarks>
/// The body is JSON deserialized as <c>RawPayload</c>; its <c>payload</c> is Base64.
/// The decoded bytes are a frame with this layout (offsets in bytes), kept here for the
/// future implementation: 0 header, 1-2 length, 3-4 packet sequence number, 5 retry count,
/// 6-9 device number, 10-11 command word, then 2 bytes CRC16 (little-endian, calculated
/// over header through parameters) and a 6-byte tail marker.
/// Example: AA 13 00 44 0B 00 FF FF FF FF 61 67 EF 6B E8 2D 2D 0D 0A 2D 2D
/// </remarks>
/// <param name="sender">The <see cref="AsyncEventingBasicConsumer"/> that received the message.</param>
/// <param name="ea">Delivery data; the body is UTF-8 JSON.</param>
private async Task Consumer0_ReceivedAsync(object sender, BasicDeliverEventArgs ea)
{
    try
    {
        // The ack now sits inside the try, like in the other handlers, so a failing ack
        // is logged here instead of escaping into the client library.
        var consumer = (AsyncEventingBasicConsumer)sender;
        await consumer.Channel.BasicAckAsync(ea.DeliveryTag, false);

        var message = Encoding.UTF8.GetString(ea.Body.ToArray());
        Console.WriteLine("收到消息 {0}", message);
        // Logging ea.Body directly only printed the ReadOnlyMemory type name.
        _logger.Error(message);

        var QQQ = System.Text.Json.JsonSerializer.Deserialize<RawPayload>(message);
        if (QQQ == null || QQQ.payload == null)
        {
            throw new FormatException("MQTT message has no payload");
        }

        // The payload arrives Base64 encoded.
        byte[] frame = Convert.FromBase64String(QQQ.payload);
        this._logger.Info("Topic: " + QQQ.topic);
        this._logger.Info("PayLoad: " + Tools.ByteToString(frame));
    }
    catch (Exception ex)
    {
        // This catch used to be empty.
        _logger.Error(ex.Message);
        _logger.Error(ex.StackTrace);
    }
}
}
}