增加一些功能,比如 修正重启后,队列消费会报错的问题

This commit is contained in:
2026-04-02 15:16:13 +08:00
parent 182186e1fb
commit c13ab0cb56
25 changed files with 589 additions and 176 deletions

View File

@@ -118,6 +118,19 @@ namespace RCUHost.Implement
}
public static string UDPAllDataKey = "All_UDPPackage_Data";
private void EnsureConsumerGroup(string group)
{
try
{
// 检查或创建消费者组
var groups = CSRedisCacheHelper.redis1.XInfoGroups(UDPAllDataKey);
}
catch (Exception ex)
{
logger.Warn("创建消费者组失败:" + ex.Message);
}
}
public void StreamConsume(string group = "UDPData", string consumer = "Crics1", string task_key = "task1")
{
try
@@ -133,7 +146,7 @@ namespace RCUHost.Implement
{
var idarray = item.Item2;
string nsa = string.Concat(task_key, "#", idarray.Count().ToString());
CSRedisCacheHelper.Publish("udp_package_consumer", nsa);
//CSRedisCacheHelper.Publish("udp_package_consumer", nsa);
foreach (var SerializeNo in idarray)
{
var id1 = SerializeNo.Item1;
@@ -141,7 +154,6 @@ namespace RCUHost.Implement
redis.XAck(UDPAllDataKey, group, id1);
redis.XDel(UDPAllDataKey, id1);
//var GGG = JsonConvert.DeserializeObject<GaiXie>(str);
ProcessData(str);
}
}
@@ -155,16 +167,19 @@ namespace RCUHost.Implement
// 静态CancellationTokenSource和CancellationToken
public static CancellationTokenSource source = new CancellationTokenSource();
public static CancellationToken token = source.Token;
// 存储长时间运行的Task
private static List<Task> _consumerTasks = new List<Task>();
/// <summary>
/// 启动 HostServer
/// </summary>
public void Start()
{
source.Cancel();
Thread.Sleep(5000);
CSRedisCacheHelper.redis1.Del(UDPAllDataKey);
CSRedisCacheHelper.redis1.XGroupCreate(UDPAllDataKey, "UDPData", "0", true);
source = new CancellationTokenSource();
token = source.Token;
var DDD = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.ffffff");
CSRedisCacheHelper.Forever<string>(CacheKey.ServerStartTime, DDD);
@@ -184,14 +199,14 @@ namespace RCUHost.Implement
// 创建15个消费者Task
for (int i = 1; i <= 15; i++)
{
var task = Task.Factory.StartNew((state) =>
Task.Factory.StartNew((state) =>
{
string osos = state.ToString();
while (true)
while (source.IsCancellationRequested == false)
{
StreamConsume("UDPData", "Crics" + osos, "task" + osos);
}
}, i, TaskCreationOptions.LongRunning);
}, i, token);
}
}
catch (Exception ex)
@@ -202,32 +217,6 @@ namespace RCUHost.Implement
}
}
/// <summary>
/// 停止消费者Task
/// </summary>
public static void StopConsumerTasks()
{
if (source != null)
{
try
{
// 取消所有Task
source.Cancel();
// 等待Task完成
//Task.WaitAll(_consumerTasks.ToArray(), TimeSpan.FromSeconds(1));
}
catch (Exception ex)
{
logger.Error("停止消费者Task错误:" + ex.Message);
}
finally
{
source.Dispose();
_consumerTasks.Clear();
}
}
}
/// <summary>
/// 关闭 HostServer
/// </summary>
@@ -667,7 +656,10 @@ namespace RCUHost.Implement
string s1 = Tools.ByteToString(receiveBuffer111);
string data = s1 + "#" + remoteEP111.ToString();
CSRedisCacheHelper.StreamAdd(1, "All_UDPPackage_Data", data);
//ProcessData(data);
CSRedisCacheHelper.StreamAdd(1, UDPAllDataKey, data);
}
}
else
@@ -691,7 +683,9 @@ namespace RCUHost.Implement
string s1 = Tools.ByteToString_NoWhiteSpace(receiveBuffer111);
string data = s1 + "#" + remoteEP111.ToString();
CSRedisCacheHelper.StreamAdd(1, "All_UDPPackage_Data", data);
//ProcessData(data);
CSRedisCacheHelper.StreamAdd(1, UDPAllDataKey, data);
}
}
#endregion
@@ -1399,6 +1393,12 @@ namespace RCUHost.Implement
int length = context111.Data.Length - offset - 2;
string HostNUMBER = HostNNN;
//询问房态没有 数据体
if (cmdType == 0x32 && length == 0)
{
NewXieYi(context111, hotelCode, HostNNN, framenolist, cmdType, EndPoint, MyFrameNO);
}
if (length > 0)
{
#region
@@ -1758,6 +1758,7 @@ namespace RCUHost.Implement
byte[] framenolist = ts.Item4;
byte cmdType = ts.Item5;
string EndPoint = ts.Item6;
#region
if (cmdType == 0x33)
{
@@ -1855,11 +1856,12 @@ namespace RCUHost.Implement
string mns1 = Newtonsoft.Json.JsonConvert.SerializeObject(ns);
CSRedisCacheHelper.Publish("redis-rcu-card_action", mns1);
string HostID = CSRedisCacheHelper.HMGet<string>(5, CacheKey.HostId_HostNumber, HostNNN)[0];
string HostID_O = CSRedisCacheHelper.HMGet<string>(5, CacheKey.HostId_HostNumber, HostNNN)[0];
if (!string.IsNullOrEmpty(HostID))
if (!string.IsNullOrEmpty(HostID_O))
{
string HostID = HostID_O.Split('#')[0];
string KKey = CacheKey.HostModalStatus_Prefix + "_" + HostID + "_" + "004000001";
var OldHostModal = CSRedisCacheHelper.Get_Partition<HostModal_Cache>(KKey);
if (OldHostModal != null)
@@ -1882,6 +1884,7 @@ namespace RCUHost.Implement
}
#endregion
#region
// AA 55 35 00 54 33 53 41 34 10 80 EB 03 6B 24
// 01 //协议版本
// 01 //取电状态
@@ -1917,7 +1920,6 @@ namespace RCUHost.Implement
//P26~P29通道能耗单位Wh1度电 = 1KWh
//P30~P33通道总能耗单位Wh1度电 = 1KWh
#region
if (cmdType == 0x36)
{
@@ -2042,7 +2044,15 @@ namespace RCUHost.Implement
//CSRedisCacheHelper.Publish("redis-rcu-timer_data", Newtonsoft.Json.JsonConvert.SerializeObject(ns));
string HostID = CSRedisCacheHelper.HMGet<string>(5, CacheKey.HostId_HostNumber, HostNNN)[0];
string HostID_O = CSRedisCacheHelper.HMGet<string>(5, CacheKey.HostId_HostNumber, HostNNN)[0];
if (string.IsNullOrEmpty(HostID_O))
{
return;
}
string[] NNN111 = HostID_O.Split('#');
string HostID = NNN111[0];
string RoomTypeID = NNN111[1];
int room_type_id = int.Parse(RoomTypeID);
string RoomNUMBER = CSRedisCacheHelper.HMGet<string>(5, CacheKey.RoomNumber_HostNumber, HostNNN)[0];
if (string.IsNullOrEmpty(RoomNUMBER))
{
@@ -2050,40 +2060,65 @@ namespace RCUHost.Implement
}
//List<DingShiReportDate> DeviceList = new List<DingShiReportDate>();
//var DS1 = CSRedisCacheHelper.Get_Partition<List<DingShiReportDate>>(CacheKey.DingShiReportData + "_" + HostID, 3);
//if (DS1 != null)
//{
// DeviceList = DS1;
//}
//else
//{
// //logger.Error("定时上报的HostId:" + HostID);
// //定时上报要查询 当前房间配置了哪些设备
// //这里根据主机Id 号查询当前房间,有哪些设备
// if (!string.IsNullOrEmpty(HostID))
// {
// lock (oo)
// {
// //var list2 = HostModalRepository.LoadByHostID(int.Parse(HostID));
// //var list = list2.Where(r => r.Modal.IsUploadBaoJing).OrderBy(r => r.Modal.ModalAddress);//只装置标志启动的
// var qs= QuanJuVar.BaoJingUpLoad.Where(A=>A.RoomType.ID==room_type_id);
// List<DingShiReportDate> dingls = new List<DingShiReportDate>();
// //foreach (HostModal item in list)
// //{
// // DingShiReportDate ddd = new DingShiReportDate();
// // ddd.HostID = item.HostID;
// // ddd.Address = item.Modal.ModalAddress;
// // ddd.DeviceType = item.Modal.Type.ToString();
// // dingls.Add(ddd);
// //}
// foreach (var item in qs)
// {
// DingShiReportDate ddd = new DingShiReportDate();
// ddd.HostID = int.Parse(HostID);
// ddd.Address = item.ModalAddress;
// ddd.DeviceType = item.Type.ToString();
// dingls.Add(ddd);
// }
// DeviceList = dingls;
// CSRedisCacheHelper.Set_PartitionWithForever<List<DingShiReportDate>>(CacheKey.DingShiReportData + "_" + HostID, dingls, 3);
// }
// }
//}
List<DingShiReportDate> DeviceList = new List<DingShiReportDate>();
var DS1 = CSRedisCacheHelper.Get_Partition<List<DingShiReportDate>>(CacheKey.DingShiReportData + "_" + HostID, 3);
if (DS1 != null)
//定时上报要查询 当前房间配置了哪些设备
//这里根据主机Id 号查询当前房间,有哪些设备
if (!string.IsNullOrEmpty(HostID))
{
DeviceList = DS1;
}
else
{
//logger.Error("定时上报的HostId:" + HostID);
//定时上报要查询 当前房间配置了哪些设备
//这里根据主机Id 号查询当前房间,有哪些设备
if (!string.IsNullOrEmpty(HostID))
var qs = QuanJuVar.BaoJingUpLoad.Where(A => A.RoomType.ID == room_type_id);
foreach (var item in qs)
{
lock (oo)
{
var list2 = HostModalRepository.LoadByHostID(int.Parse(HostID));
var list = list2.Where(r => r.Modal.IsUploadBaoJing).OrderBy(r => r.Modal.ModalAddress);//只装置标志启动的
List<DingShiReportDate> dingls = new List<DingShiReportDate>();
foreach (HostModal item in list)
{
DingShiReportDate ddd = new DingShiReportDate();
ddd.HostID = item.HostID;
ddd.Address = item.Modal.ModalAddress;
ddd.DeviceType = item.Modal.Type.ToString();
dingls.Add(ddd);
}
DeviceList = dingls;
CSRedisCacheHelper.Set_PartitionWithForever<List<DingShiReportDate>>(CacheKey.DingShiReportData + "_" + HostID, dingls, 3);
}
DingShiReportDate ddd = new DingShiReportDate();
ddd.HostID = int.Parse(HostID);
ddd.Address = item.ModalAddress;
ddd.DeviceType = item.Type.ToString();
DeviceList.Add(ddd);
}
}
List<DingShiReportDate> DeviceStatusList = new List<DingShiReportDate>();
foreach (DingShiReportDate item in DeviceList)
{
@@ -2170,7 +2205,7 @@ namespace RCUHost.Implement
//从取电动作那里取数据
if (Version == 0x01)
{
string HostID_redis = CSRedisCacheHelper.HMGet<string>(5, CacheKey.HostId_HostNumber, HostNNN)[0];
//string HostID_redis = CSRedisCacheHelper.HMGet<string>(5, CacheKey.HostId_HostNumber, HostNNN)[0];
if (!string.IsNullOrEmpty(HostID))
{
string KKey = CacheKey.HostModalStatus_Prefix + "_" + HostID + "_" + "004000001";