修改长时间内存会炸

This commit is contained in:
2026-03-25 17:51:43 +08:00
parent 1840794f40
commit d0c626c189
61 changed files with 82737 additions and 271 deletions

View File

@@ -461,218 +461,15 @@ namespace RCUHost.Implement
byte[] receiveBuffer111 = state.UdpClient.EndReceive(ar, ref remoteEP111);
state.UdpClient.BeginReceive(ReceiveCallback, state);
if (receiveBuffer111.Length > 0)
// 2. 数据处理逻辑 - 异步处理,不阻塞接收线程
if (receiveBuffer111 != null && receiveBuffer111.Length > 0)
{
#region
Interlocked.Increment(ref YUANZI_TongJi.TotalReceiveCount);
#endregion
ReceiverContext context = new ReceiverContext(receiveBuffer111, remoteEP111, GetNextCustomer());
int length = context.Data.Length;
if (length < 3 || length != BitConverter.ToInt16(context.Data, 2))
// 使用线程池处理数据,避免阻塞接收线程
ThreadPool.QueueUserWorkItem((stateObj) =>
{
//错误数据
Interlocked.Increment(ref YUANZI_TongJi.TotalErrorPackageReceiveCount);
return;//非正确的数据包
}
context.SystemHeader = DecodeSystemHeader(context.Data);
if (context.SystemHeader.HasValue)
{
if (!String.Equals(new String(context.SystemHeader.Value.SystemID), SystemHeader.SYSTEM_ID, StringComparison.OrdinalIgnoreCase))
{
//错误数据
Interlocked.Increment(ref YUANZI_TongJi.TotalErrorPackageReceiveCount);
return;//非系统数据包
}
try
{
//透传是新增加的
var VVV = context.SystemHeader.Value;
string KKKHNH = CacheKey.TouChuanKey;
var HHHostNo = context.SystemHeader.Value.HostNumber.ToString();
var da1 = CSRedisCacheHelper.HMGet<string>(1, KKKHNH, HHHostNo);
if (!string.IsNullOrEmpty(da1[0]))
{
string nns = Tools.ByteToString(context.Data);
if (VVV.CmdType == 0x71)
{
BLWMQTT.MQTTPublishData("blw/touchuan/report/" + HHHostNo, nns);
}
}
#region
if (VVV.CmdType != 0xD6 && VVV.CmdType != 0x0C)
{
var hostnumber1 = VVV.HostNumber;
string hotelCode = hostnumber1.ToHotelCode().ToString();//获取酒店编码
var hostnumber2 = hostnumber1.ToString();
//时间拦截 超过2秒就不再处理
string ShiJianLanJie = CacheKey.TimeIntercept + "_" + hostnumber2;
string ShiJianLanJieSync = CacheKey.SyncTimeIntercept + "_" + hostnumber2;
if (VVV.CmdType == 0x0E)
{
//MemoryCacheHelper.Set(ShiJianLanJie, 1, DateTimeOffset.Now.AddSeconds(DataTongJi.LostPackage_Interval));
//有错误的0x19数据所有的 在0x19数据触发后的5秒内的后续数据就不再接收
string key123 = RoomStatusReceiver.PoolOverFlowKey + "_" + hostnumber2;
var OOObj = MemoryCacheHelper.Get(key123);
if (OOObj != null)
{
//错误数据
Interlocked.Increment(ref YUANZI_TongJi.TotalErrorPackageReceiveCount);
return;
}
}
///消息ID +1
Interlocked.Increment(ref StepTongJi.EveryMessageIDNo);
//因为0E不会被拦截所以可以在这里写
if (VVV.CmdType == 0x0E || VVV.CmdType == 0X01)
{
//StepTongJi.EveryMessageID = Guid.NewGuid().ToString("N");
//计数器+1
Interlocked.Increment(ref StepTongJi.LookDataCounter);
//开始监听
Interlocked.Increment(ref StepTongJi.Every_0E_01_MessageID);
if (StepTongJi.LookDataCounter == 500)
{
//监听的ID
//Interlocked.Exchange(ref StepTongJi.GlobalListenID, StepTongJi.EveryMessageID);
context.MessageID = Guid.NewGuid().ToString("N");
context.IsMonitor = true;
int maxWorker = 0;
int maxIo = 0;
int availWorker = 0;
int availIo = 0;
// 获取最大线程数
ThreadPool.GetMaxThreads(out maxWorker, out maxIo);
// 获取可用线程数
ThreadPool.GetAvailableThreads(out availWorker, out availIo);
// 计算忙碌线程数
int busyWorker = maxWorker - availWorker;
int busyIo = maxIo - availIo;
int minWorker = 0;
int minIo = 0;
// 获取最小线程数
ThreadPool.GetMinThreads(out minWorker, out minIo);
int tid = Thread.CurrentThread.ManagedThreadId;
string NNN1 = string.Format("工作线程{8}: 最小={0}, 最大={1}, 可用={2}, 使用中={3},IO线程: 最小={4}, 最大={5}, 可用={6}, 使用中={7}", minWorker, maxWorker, availWorker, busyWorker, minIo, maxIo, availIo, busyIo, tid);
StepTongJi.SendInfo(0.1, NNN1, context.MessageID, context.IsMonitor);
string NNN7 = string.Format("HostNUMBER:{0},端点:{1}", hostnumber2, context.RemoteEndPoint.ToString());
StepTongJi.SendInfo(0.2, NNN7, context.MessageID, context.IsMonitor);
StepInfo s = new StepInfo();
s.Step = 1;
s.MessageId = context.MessageID;
s.StepDescription = "进入UDPCallBack";
//string ti = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.ffffff");
string ti = CPUData.GetNowPrecise().ToString("yyyy-MM-dd HH:mm:ss.ffffff");
s.TriggerTime = ti;
s.Content = context.Data;
s.EveryMessageId = StepTongJi.EveryMessageIDNo;
s.Monitor_0E_01 = StepTongJi.Every_0E_01_MessageID;
s.HotelCode = hotelCode;
s.HostNumber = hostnumber2;
string NNN = Newtonsoft.Json.JsonConvert.SerializeObject(s);
CSRedisCacheHelper.Publish("redis-roomstatus-monitor", NNN);
Interlocked.Exchange(ref StepTongJi.LookDataCounter, 0);
}
}
if (VVV.CmdType == 0x08)
{
MemoryCacheHelper.Set(ShiJianLanJieSync, 1, DateTimeOffset.Now.AddSeconds(DataTongJi.LostPackage_Interval));
}
if (VVV.CmdType == 0x02)
{
MemoryCacheHelper.Set(ShiJianLanJie, 1, DateTimeOffset.Now.AddSeconds(50));
}
string LanJieKey = "Intercept";
ConcurrentBag<string> RoomNumberList = null;
bool isexists = DataTongJi.BlockLowerMachineList.TryGetValue(hotelCode, out RoomNumberList);
//如果存在数据
if (isexists)
{
//如果 设备序号存在就过滤掉
if (RoomNumberList != null && RoomNumberList.Count > 0)
{
//只过滤这些数据
if (RoomNumberList.ToList().Contains(hostnumber1.ToString()))
{
if (VVV.CmdType == 0x0E)
{
string KeyFilter = "StatusFilter";
RCUHost.RCUHostCommon.tools.LanJieData(KeyFilter, hotelCode);
}
LanJieData(LanJieKey, hotelCode);
StepTongJi.SendInfo(1.01, "进入黑名单,不再有后续处理", context.MessageID, context.IsMonitor);
}
//如果不存在就处理
else
{
GaiXie g = new GaiXie();
g.Data = receiveBuffer111;
g.IPEndPoint = remoteEP111.ToString();
var data = Newtonsoft.Json.JsonConvert.SerializeObject(g);
CSRedisCacheHelper.StreamAdd(1, "All_UDPPackage_Data", data);
}
}
else
{
if (VVV.CmdType == 0x0E)
{
string KeyFilter = "StatusFilter";
RCUHost.RCUHostCommon.tools.LanJieData(KeyFilter, hotelCode);
}
//如果存在,且 设备列表为0
LanJieData(LanJieKey, hotelCode);
StepTongJi.SendInfo(1.01, "进入黑名单,不再有后续处理", context.MessageID, context.IsMonitor);
}
}
else
{
GaiXie g = new GaiXie();
g.Data = receiveBuffer111;
g.IPEndPoint = remoteEP111.ToString();
var data = Newtonsoft.Json.JsonConvert.SerializeObject(g);
CSRedisCacheHelper.StreamAdd(1, "All_UDPPackage_Data", data);
var ts1 = new Tuple<ReceiverContext, string>(context, hotelCode);
}
}
#endregion
else
{
//错误数据
Interlocked.Increment(ref YUANZI_TongJi.TotalErrorPackageReceiveCount);
}
}
catch (Exception ex)
{
logger.Error("统计Error:" + ex.Message);
}
}
else
{
Interlocked.Increment(ref YUANZI_TongJi.TotalErrorPackageReceiveCount);
}
var dataState = (Tuple<byte[], IPEndPoint>)stateObj;
ProcessUdpData(dataState.Item1, dataState.Item2);
}, Tuple.Create(receiveBuffer111, remoteEP111));
}
}
catch (Exception ex)
@@ -695,6 +492,226 @@ namespace RCUHost.Implement
}
}
public void ProcessUdpData(byte[] receiveBuffer111, IPEndPoint remoteEP111)
{
if (receiveBuffer111.Length > 0)
{
#region
Interlocked.Increment(ref YUANZI_TongJi.TotalReceiveCount);
#endregion
ReceiverContext context = new ReceiverContext(receiveBuffer111, remoteEP111, GetNextCustomer());
int length = context.Data.Length;
if (length < 3 || length != BitConverter.ToInt16(context.Data, 2))
{
//错误数据
Interlocked.Increment(ref YUANZI_TongJi.TotalErrorPackageReceiveCount);
return;//非正确的数据包
}
context.SystemHeader = DecodeSystemHeader(context.Data);
if (context.SystemHeader.HasValue)
{
if (!String.Equals(new String(context.SystemHeader.Value.SystemID), SystemHeader.SYSTEM_ID, StringComparison.OrdinalIgnoreCase))
{
//错误数据
Interlocked.Increment(ref YUANZI_TongJi.TotalErrorPackageReceiveCount);
return;//非系统数据包
}
try
{
//透传是新增加的
var VVV = context.SystemHeader.Value;
string KKKHNH = CacheKey.TouChuanKey;
var HHHostNo = context.SystemHeader.Value.HostNumber.ToString();
var da1 = CSRedisCacheHelper.HMGet<string>(1, KKKHNH, HHHostNo);
if (!string.IsNullOrEmpty(da1[0]))
{
string nns = Tools.ByteToString(context.Data);
if (VVV.CmdType == 0x71)
{
BLWMQTT.MQTTPublishData("blw/touchuan/report/" + HHHostNo, nns);
}
}
#region
if (VVV.CmdType != 0xD6 && VVV.CmdType != 0x0C)
{
var hostnumber1 = VVV.HostNumber;
string hotelCode = hostnumber1.ToHotelCode().ToString();//获取酒店编码
var hostnumber2 = hostnumber1.ToString();
//时间拦截 超过2秒就不再处理
string ShiJianLanJie = CacheKey.TimeIntercept + "_" + hostnumber2;
string ShiJianLanJieSync = CacheKey.SyncTimeIntercept + "_" + hostnumber2;
if (VVV.CmdType == 0x0E)
{
//有错误的0x19数据所有的 在0x19数据触发后的5秒内的后续数据就不再接收
#region 0x19
string key123 = RoomStatusReceiver.PoolOverFlowKey + "_" + hostnumber2;
var OOObj = MemoryCacheHelper.Get(key123);
if (OOObj != null)
{
//错误数据
Interlocked.Increment(ref YUANZI_TongJi.TotalErrorPackageReceiveCount);
return;
}
#endregion
}
///消息ID +1
Interlocked.Increment(ref StepTongJi.EveryMessageIDNo);
//因为0E不会被拦截所以可以在这里写
bool nenver = false;
if (nenver)
//if (VVV.CmdType == 0x0E || VVV.CmdType == 0X01)
{
//StepTongJi.EveryMessageID = Guid.NewGuid().ToString("N");
//计数器+1
Interlocked.Increment(ref StepTongJi.LookDataCounter);
//开始监听
Interlocked.Increment(ref StepTongJi.Every_0E_01_MessageID);
if (StepTongJi.LookDataCounter == 500)
{
//监听的ID
//Interlocked.Exchange(ref StepTongJi.GlobalListenID, StepTongJi.EveryMessageID);
context.MessageID = Guid.NewGuid().ToString("N");
context.IsMonitor = true;
int maxWorker = 0;
int maxIo = 0;
int availWorker = 0;
int availIo = 0;
// 获取最大线程数
ThreadPool.GetMaxThreads(out maxWorker, out maxIo);
// 获取可用线程数
ThreadPool.GetAvailableThreads(out availWorker, out availIo);
// 计算忙碌线程数
int busyWorker = maxWorker - availWorker;
int busyIo = maxIo - availIo;
int minWorker = 0;
int minIo = 0;
// 获取最小线程数
ThreadPool.GetMinThreads(out minWorker, out minIo);
int tid = Thread.CurrentThread.ManagedThreadId;
string NNN1 = string.Format("工作线程{8}: 最小={0}, 最大={1}, 可用={2}, 使用中={3},IO线程: 最小={4}, 最大={5}, 可用={6}, 使用中={7}", minWorker, maxWorker, availWorker, busyWorker, minIo, maxIo, availIo, busyIo, tid);
StepTongJi.SendInfo(0.1, NNN1, context.MessageID, context.IsMonitor);
string NNN7 = string.Format("HostNUMBER:{0},端点:{1}", hostnumber2, context.RemoteEndPoint.ToString());
StepTongJi.SendInfo(0.2, NNN7, context.MessageID, context.IsMonitor);
StepInfo s = new StepInfo();
s.Step = 1;
s.MessageId = context.MessageID;
s.StepDescription = "进入UDPCallBack";
//string ti = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.ffffff");
string ti = CPUData.GetNowPrecise().ToString("yyyy-MM-dd HH:mm:ss.ffffff");
s.TriggerTime = ti;
s.Content = context.Data;
s.EveryMessageId = StepTongJi.EveryMessageIDNo;
s.Monitor_0E_01 = StepTongJi.Every_0E_01_MessageID;
s.HotelCode = hotelCode;
s.HostNumber = hostnumber2;
string NNN = Newtonsoft.Json.JsonConvert.SerializeObject(s);
CSRedisCacheHelper.Publish("redis-roomstatus-monitor", NNN);
Interlocked.Exchange(ref StepTongJi.LookDataCounter, 0);
}
}
if (VVV.CmdType == 0x08)
{
MemoryCacheHelper.Set(ShiJianLanJieSync, 1, DateTimeOffset.Now.AddSeconds(DataTongJi.LostPackage_Interval));
}
if (VVV.CmdType == 0x02)
{
MemoryCacheHelper.Set(ShiJianLanJie, 1, DateTimeOffset.Now.AddSeconds(50));
}
string LanJieKey = "Intercept";
ConcurrentBag<string> RoomNumberList = null;
bool isexists = DataTongJi.BlockLowerMachineList.TryGetValue(hotelCode, out RoomNumberList);
//如果存在数据
if (isexists)
{
//如果 设备序号存在就过滤掉
if (RoomNumberList != null && RoomNumberList.Count > 0)
{
//只过滤这些数据
if (RoomNumberList.Contains(hostnumber1.ToString()))
{
if (VVV.CmdType == 0x0E)
{
string KeyFilter = "StatusFilter";
RCUHost.RCUHostCommon.tools.LanJieData(KeyFilter, hotelCode);
}
LanJieData(LanJieKey, hotelCode);
StepTongJi.SendInfo(1.01, "进入黑名单,不再有后续处理", context.MessageID, context.IsMonitor);
}
//如果不存在就处理
else
{
GaiXie g = new GaiXie();
g.Data = receiveBuffer111;
g.IPEndPoint = remoteEP111.ToString();
var data = Newtonsoft.Json.JsonConvert.SerializeObject(g);
CSRedisCacheHelper.StreamAdd(1, "All_UDPPackage_Data", data);
}
}
else
{
if (VVV.CmdType == 0x0E)
{
string KeyFilter = "StatusFilter";
RCUHost.RCUHostCommon.tools.LanJieData(KeyFilter, hotelCode);
}
//如果存在,且 设备列表为0
LanJieData(LanJieKey, hotelCode);
StepTongJi.SendInfo(1.01, "进入黑名单,不再有后续处理", context.MessageID, context.IsMonitor);
}
}
else
{
GaiXie g = new GaiXie();
g.Data = receiveBuffer111;
g.IPEndPoint = remoteEP111.ToString();
var data = Newtonsoft.Json.JsonConvert.SerializeObject(g);
CSRedisCacheHelper.StreamAdd(1, "All_UDPPackage_Data", data);
var ts1 = new Tuple<ReceiverContext, string>(context, hotelCode);
}
}
#endregion
else
{
//错误数据
Interlocked.Increment(ref YUANZI_TongJi.TotalErrorPackageReceiveCount);
}
}
catch (Exception ex)
{
logger.Error("统计Error:" + ex.Message);
}
}
else
{
Interlocked.Increment(ref YUANZI_TongJi.TotalErrorPackageReceiveCount);
}
}
}
public static void LanJieData(string Key11, string hotelCode)
{
var Key = "UDPPackage_" + Key11.ToString();
@@ -1966,9 +1983,9 @@ namespace RCUHost.Implement
{
string dizhi = item.Value.FaultNo;
ts_faultitem t1 = new ts_faultitem();
t1.dev_type = short.Parse(dizhi.Substring(0, 3));
t1.dev_addr = short.Parse(dizhi.Substring(3, 3));
t1.dev_loop = short.Parse(dizhi.Substring(6, 3));
t1.dev_type = ushort.Parse(dizhi.Substring(0, 3));
t1.dev_addr = ushort.Parse(dizhi.Substring(3, 3));
t1.dev_loop = ushort.Parse(dizhi.Substring(6, 3));
t1.error_type = item.Value.Type;
t1.error_data = item.Value.Data;
lll2.Add(t1);
@@ -2624,7 +2641,7 @@ namespace RCUHost.Implement
CardEvent = NOCardInfo,
PMS_Status = PMS_CurrentStatus,
Bright_G = Bright_Va,
WeiXinSuo_DianLiang=usa
WeiXinSuo_DianLiang = usa
};
string mns = Newtonsoft.Json.JsonConvert.SerializeObject(ns2);