修改一些可能在某些条件 下可能会触发的BUG

This commit is contained in:
2026-03-26 14:18:52 +08:00
parent d0c626c189
commit 696144b2ff
59 changed files with 109 additions and 82682 deletions

View File

@@ -123,6 +123,18 @@ namespace RCUHost.Implement
try
{
var redis = CSRedisCacheHelper.redis1;
// 检查并创建消费者组
try
{
// 尝试创建消费者组,如果已存在则忽略错误
redis.XGroupCreate(UDPAllDataKey, group, "0", true);
}
catch (Exception)
{
// 消费者组可能已存在,忽略错误
}
var data = redis.XReadGroup(group, consumer, 500, 10, new ValueTuple<string, string>(UDPAllDataKey, ">"));
if (data != null)
@@ -150,22 +162,31 @@ namespace RCUHost.Implement
logger.Error("消息队列失败:" + ex.Message);
}
}
// 静态CancellationTokenSource和CancellationToken
public static CancellationTokenSource source = new CancellationTokenSource();
public static CancellationToken token = source.Token;
// 存储长时间运行的Task
private static List<Task> _consumerTasks = new List<Task>();
/// <summary>
/// 启动 HostServer
/// </summary>
public void Start()
{
// 停止之前的Task
StopConsumerTasks();
// 创建新的取消令牌源
source = new CancellationTokenSource();
token = source.Token;
CSRedisCacheHelper.redis1.Del(UDPAllDataKey);
CSRedisCacheHelper.redis1.XGroupCreate(UDPAllDataKey, "UDPData", "0", true);
var DDD = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.ffffff");
CSRedisCacheHelper.Forever<string>(CacheKey.ServerStartTime, DDD);
//FaultReport.Interval = 10000;
//FaultReport.Elapsed += new System.Timers.ElapsedEventHandler(FaultReport_Elapsed);
//FaultReport.Start();
Close();
//CSRedisCacheHelper.StreamAdd(1,UDPAllDataKey, data);
//CSRedisCacheHelper.StreamConsume(1, UDPAllDataKey);
try
{
udpClient = new UdpClient(3339);
@@ -177,29 +198,71 @@ namespace RCUHost.Implement
udpClient.BeginReceive(ReceiveCallback, new UdpState(udpClient));
logger.Error("Host Server启动成功端口3339");
CSRedisCacheHelper.redis1.Del(UDPAllDataKey);
CSRedisCacheHelper.redis1.XGroupCreate(UDPAllDataKey, "UDPData", "0", true);
// 清空之前的Task列表
_consumerTasks.Clear();
// 创建15个消费者Task
for (int i = 1; i <= 15; i++)
{
Task.Factory.StartNew((iii) =>
int index = i;
var task = Task.Factory.StartNew(() =>
{
string osos = iii.ToString();
while (true)
string osos = index.ToString();
while (!token.IsCancellationRequested)
{
StreamConsume("UDPData", "Crics" + osos, "task" + osos);
try
{
StreamConsume("UDPData", "Crics" + osos, "task" + osos);
}
catch (OperationCanceledException)
{
// 任务被取消,正常退出
break;
}
catch (Exception ex)
{
logger.Error("消费者Task错误:" + ex.Message);
}
}
}, i, TaskCreationOptions.LongRunning);
}, token, TaskCreationOptions.LongRunning, TaskScheduler.Default);
_consumerTasks.Add(task);
}
}
catch (Exception ex)
{
logger.Error("Host Server启动失败端口3339");
Close();
StopConsumerTasks();
throw ex;//不能去掉否则重启iis服务通讯服务不会再次启动
}
}
/// <summary>
/// 停止消费者Task
/// </summary>
private static void StopConsumerTasks()
{
if (source != null)
{
try
{
// 取消所有Task
source.Cancel();
// 等待Task完成
Task.WaitAll(_consumerTasks.ToArray(), TimeSpan.FromSeconds(5));
}
catch (Exception ex)
{
logger.Error("停止消费者Task错误:" + ex.Message);
}
finally
{
source.Dispose();
_consumerTasks.Clear();
}
}
}
//public Random rs = new Random();
@@ -455,11 +518,15 @@ namespace RCUHost.Implement
private void ReceiveCallback(IAsyncResult ar)
{
UdpState state = ar.AsyncState as UdpState;
if (state == null || state.UdpClient == null)
{
return;
}
try
{
IPEndPoint remoteEP111 = new IPEndPoint(IPAddress.Any, 0);
byte[] receiveBuffer111 = state.UdpClient.EndReceive(ar, ref remoteEP111);
state.UdpClient.BeginReceive(ReceiveCallback, state);
// 2. 数据处理逻辑 - 异步处理,不阻塞接收线程
if (receiveBuffer111 != null && receiveBuffer111.Length > 0)
@@ -472,23 +539,31 @@ namespace RCUHost.Implement
}, Tuple.Create(receiveBuffer111, remoteEP111));
}
}
catch (ObjectDisposedException)
{
// UdpClient已被释放正常退出
}
catch (Exception ex)
{
logger.Error(string.Format("接收数据失败:{0}", ex.ToString()));
}
finally
{
//try
//{
// if (this.udpClient != null)
// {
// state.UdpClient.BeginReceive(ReceiveCallback, new UdpState(this.udpClient));
// }
//}
//catch
//{
//}
try
{
if (this.udpClient != null && state != null && state.UdpClient != null)
{
state.UdpClient.BeginReceive(ReceiveCallback, new UdpState(this.udpClient));
}
}
catch (ObjectDisposedException)
{
// UdpClient已被释放正常退出
}
catch
{
// 忽略其他错误
}
}
}