增加电量报警模块
但是只是一部分功能
This commit is contained in:
@@ -123,18 +123,7 @@ namespace RCUHost.Implement
|
||||
try
|
||||
{
|
||||
var redis = CSRedisCacheHelper.redis1;
|
||||
|
||||
// 检查并创建消费者组
|
||||
try
|
||||
{
|
||||
// 尝试创建消费者组,如果已存在则忽略错误
|
||||
redis.XGroupCreate(UDPAllDataKey, group, "0", true);
|
||||
}
|
||||
catch (Exception)
|
||||
{
|
||||
// 消费者组可能已存在,忽略错误
|
||||
}
|
||||
|
||||
|
||||
var data = redis.XReadGroup(group, consumer, 500, 10, new ValueTuple<string, string>(UDPAllDataKey, ">"));
|
||||
|
||||
if (data != null)
|
||||
@@ -149,10 +138,11 @@ namespace RCUHost.Implement
|
||||
{
|
||||
var id1 = SerializeNo.Item1;
|
||||
var str = SerializeNo.Item2[1];
|
||||
var GGG = JsonConvert.DeserializeObject<GaiXie>(str);
|
||||
redis.XAck(UDPAllDataKey, group, id1);
|
||||
redis.XDel(UDPAllDataKey, id1);
|
||||
ProcessData(GGG);
|
||||
|
||||
//var GGG = JsonConvert.DeserializeObject<GaiXie>(str);
|
||||
ProcessData(str);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -166,19 +156,12 @@ namespace RCUHost.Implement
|
||||
public static CancellationTokenSource source = new CancellationTokenSource();
|
||||
public static CancellationToken token = source.Token;
|
||||
// 存储长时间运行的Task
|
||||
private static List<Task> _consumerTasks = new List<Task>();
|
||||
private static List<Task> _consumerTasks = new List<Task>();
|
||||
/// <summary>
|
||||
/// 启动 HostServer
|
||||
/// </summary>
|
||||
public void Start()
|
||||
{
|
||||
// 停止之前的Task
|
||||
StopConsumerTasks();
|
||||
|
||||
// 创建新的取消令牌源
|
||||
source = new CancellationTokenSource();
|
||||
token = source.Token;
|
||||
|
||||
CSRedisCacheHelper.redis1.Del(UDPAllDataKey);
|
||||
CSRedisCacheHelper.redis1.XGroupCreate(UDPAllDataKey, "UDPData", "0", true);
|
||||
|
||||
@@ -198,50 +181,31 @@ namespace RCUHost.Implement
|
||||
udpClient.BeginReceive(ReceiveCallback, new UdpState(udpClient));
|
||||
logger.Error("Host Server启动成功,端口:3339");
|
||||
|
||||
// 清空之前的Task列表
|
||||
_consumerTasks.Clear();
|
||||
|
||||
// 创建15个消费者Task
|
||||
for (int i = 1; i <= 15; i++)
|
||||
{
|
||||
int index = i;
|
||||
var task = Task.Factory.StartNew(() =>
|
||||
var task = Task.Factory.StartNew((state) =>
|
||||
{
|
||||
string osos = index.ToString();
|
||||
while (!token.IsCancellationRequested)
|
||||
string osos = state.ToString();
|
||||
while (true)
|
||||
{
|
||||
try
|
||||
{
|
||||
StreamConsume("UDPData", "Crics" + osos, "task" + osos);
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
// 任务被取消,正常退出
|
||||
break;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
logger.Error("消费者Task错误:" + ex.Message);
|
||||
}
|
||||
StreamConsume("UDPData", "Crics" + osos, "task" + osos);
|
||||
}
|
||||
}, token, TaskCreationOptions.LongRunning, TaskScheduler.Default);
|
||||
|
||||
_consumerTasks.Add(task);
|
||||
}, i, TaskCreationOptions.LongRunning);
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
logger.Error("Host Server启动失败,端口:3339");
|
||||
Close();
|
||||
StopConsumerTasks();
|
||||
throw ex;//不能去掉,否则重启iis服务,通讯服务不会再次启动
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/// <summary>
|
||||
/// 停止消费者Task
|
||||
/// </summary>
|
||||
private static void StopConsumerTasks()
|
||||
public static void StopConsumerTasks()
|
||||
{
|
||||
if (source != null)
|
||||
{
|
||||
@@ -250,7 +214,7 @@ namespace RCUHost.Implement
|
||||
// 取消所有Task
|
||||
source.Cancel();
|
||||
// 等待Task完成
|
||||
Task.WaitAll(_consumerTasks.ToArray(), TimeSpan.FromSeconds(5));
|
||||
//Task.WaitAll(_consumerTasks.ToArray(), TimeSpan.FromSeconds(1));
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
@@ -264,52 +228,6 @@ namespace RCUHost.Implement
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
//public Random rs = new Random();
|
||||
///// <summary>
|
||||
///// 定时上报 错误
|
||||
///// </summary>
|
||||
///// <param name="sender"></param>
|
||||
///// <param name="e"></param>
|
||||
//void FaultReport_Elapsed(object sender, System.Timers.ElapsedEventArgs e)
|
||||
//{
|
||||
// try
|
||||
// {
|
||||
|
||||
// //FaultReport.Stop();
|
||||
// string FaultKey = CacheKey.FaultKey_;
|
||||
// //string FFFFFKey = HotelCode + "_" + host.RoomNumber + "_" + hostModal.Modal.ModalAddress;
|
||||
// var data = CSRedisCacheHelper.HMGetAll(5, FaultKey);
|
||||
// foreach (var item in data)
|
||||
// {
|
||||
// var key = item.Key;
|
||||
// var val = item.Value;
|
||||
// var v = JsonConvert.DeserializeObject<XuanZhuResponse_Fault>(val);
|
||||
// TimeSpan span = DateTime.Now - v.updatetime;
|
||||
// if (span.TotalSeconds <= 20)
|
||||
// {
|
||||
// var fault_data = new Tuple<string, XuanZhuResponse>(v.url, v.data);
|
||||
// Task.Factory.StartNew((State) =>
|
||||
// {
|
||||
// var DDD = State as Tuple<string, XuanZhuResponse>;
|
||||
// string Res_P = MyHttp.SendHttpData(DDD.Item1, DDD.Item1);
|
||||
// //XuanZhuOperation.ReportService(DDD.Item1, DDD.Item2);
|
||||
// logger.Error("Fault Return:" + "");
|
||||
// }, fault_data);
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// catch (Exception)
|
||||
// {
|
||||
|
||||
// }
|
||||
// finally
|
||||
// {
|
||||
// var ts = rs.Next(10000, 15000);
|
||||
// FaultReport.Interval = ts;
|
||||
// FaultReport.Start();
|
||||
// }
|
||||
//}
|
||||
/// <summary>
|
||||
/// 关闭 HostServer
|
||||
/// </summary>
|
||||
@@ -509,7 +427,6 @@ namespace RCUHost.Implement
|
||||
return _customer;
|
||||
}
|
||||
public IHostRepository HostRepository { get; set; }
|
||||
public ISysHotelRepository SysHotelRepository { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// 异步接收回调函数
|
||||
@@ -522,7 +439,7 @@ namespace RCUHost.Implement
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
try
|
||||
{
|
||||
IPEndPoint remoteEP111 = new IPEndPoint(IPAddress.Any, 0);
|
||||
@@ -532,11 +449,12 @@ namespace RCUHost.Implement
|
||||
if (receiveBuffer111 != null && receiveBuffer111.Length > 0)
|
||||
{
|
||||
// 使用线程池处理数据,避免阻塞接收线程
|
||||
ThreadPool.QueueUserWorkItem((stateObj) =>
|
||||
{
|
||||
var dataState = (Tuple<byte[], IPEndPoint>)stateObj;
|
||||
ProcessUdpData(dataState.Item1, dataState.Item2);
|
||||
}, Tuple.Create(receiveBuffer111, remoteEP111));
|
||||
//ThreadPool.QueueUserWorkItem((stateObj) =>
|
||||
//{
|
||||
// var dataState = (Tuple<byte[], IPEndPoint>)stateObj;
|
||||
//}, Tuple.Create(receiveBuffer111, remoteEP111));
|
||||
|
||||
ProcessUdpData(receiveBuffer111, remoteEP111);
|
||||
}
|
||||
}
|
||||
catch (ObjectDisposedException)
|
||||
@@ -645,6 +563,8 @@ namespace RCUHost.Implement
|
||||
|
||||
//因为0E不会被拦截所以可以在这里写
|
||||
bool nenver = false;
|
||||
|
||||
#region 弃用,有新的日志平台,这里挺耗资源的
|
||||
if (nenver)
|
||||
//if (VVV.CmdType == 0x0E || VVV.CmdType == 0X01)
|
||||
{
|
||||
@@ -706,6 +626,7 @@ namespace RCUHost.Implement
|
||||
Interlocked.Exchange(ref StepTongJi.LookDataCounter, 0);
|
||||
}
|
||||
}
|
||||
#endregion
|
||||
|
||||
if (VVV.CmdType == 0x08)
|
||||
{
|
||||
@@ -739,10 +660,13 @@ namespace RCUHost.Implement
|
||||
//如果不存在就处理
|
||||
else
|
||||
{
|
||||
GaiXie g = new GaiXie();
|
||||
g.Data = receiveBuffer111;
|
||||
g.IPEndPoint = remoteEP111.ToString();
|
||||
var data = Newtonsoft.Json.JsonConvert.SerializeObject(g);
|
||||
//GaiXie g = new GaiXie();
|
||||
//g.Data = receiveBuffer111;
|
||||
//g.IPEndPoint = remoteEP111.ToString();
|
||||
//var data = Newtonsoft.Json.JsonConvert.SerializeObject(g);
|
||||
|
||||
string s1 = Tools.ByteToString(receiveBuffer111);
|
||||
string data = s1 + "#" + remoteEP111.ToString();
|
||||
CSRedisCacheHelper.StreamAdd(1, "All_UDPPackage_Data", data);
|
||||
}
|
||||
}
|
||||
@@ -760,12 +684,14 @@ namespace RCUHost.Implement
|
||||
}
|
||||
else
|
||||
{
|
||||
GaiXie g = new GaiXie();
|
||||
g.Data = receiveBuffer111;
|
||||
g.IPEndPoint = remoteEP111.ToString();
|
||||
var data = Newtonsoft.Json.JsonConvert.SerializeObject(g);
|
||||
//GaiXie g = new GaiXie();
|
||||
//g.Data = receiveBuffer111;
|
||||
//g.IPEndPoint = remoteEP111.ToString();
|
||||
//var data = Newtonsoft.Json.JsonConvert.SerializeObject(g);
|
||||
|
||||
string s1 = Tools.ByteToString_NoWhiteSpace(receiveBuffer111);
|
||||
string data = s1 + "#" + remoteEP111.ToString();
|
||||
CSRedisCacheHelper.StreamAdd(1, "All_UDPPackage_Data", data);
|
||||
var ts1 = new Tuple<ReceiverContext, string>(context, hotelCode);
|
||||
}
|
||||
}
|
||||
#endregion
|
||||
@@ -1302,21 +1228,24 @@ namespace RCUHost.Implement
|
||||
/// </summary>
|
||||
/// <param name="context"></param>
|
||||
//private void ProcessData(ReceiverContext context111, string hotelCode)
|
||||
private void ProcessData(GaiXie gga)
|
||||
//private void ProcessData(GaiXie gga)
|
||||
private void ProcessData(string gga)
|
||||
{
|
||||
//GaiXie gga = new GaiXie();
|
||||
string[] III = gga.IPEndPoint.Split(':');
|
||||
string[] ggA = gga.Split('#');
|
||||
byte[] s1 = Tools.GetBytesFromString(ggA[0]);
|
||||
string s2 = ggA[1];
|
||||
|
||||
string[] III = s2.Split(':');
|
||||
int PPP = int.Parse(III[1]);
|
||||
//string[] III = gga.IPEndPoint.Split(':');
|
||||
//int PPP = int.Parse(III[1]);
|
||||
IPEndPoint ip = new IPEndPoint(IPAddress.Parse(III[0]), PPP);
|
||||
ReceiverContext context111 = new ReceiverContext(gga.Data, ip, GetNextCustomer());
|
||||
ReceiverContext context111 = new ReceiverContext(s1, ip, GetNextCustomer());
|
||||
context111.SystemHeader = DecodeSystemHeader(context111.Data);
|
||||
|
||||
|
||||
byte cmdType = context111.SystemHeader.Value.CmdType;
|
||||
if (cmdType == 0x68)
|
||||
{
|
||||
//logger.Error("收到升级返回:" + Tools.ByteToString(gga.Data));
|
||||
}
|
||||
ushort MyFrameNO = context111.SystemHeader.Value.FrameNo;
|
||||
|
||||
byte[] framenolist = BitConverter.GetBytes(MyFrameNO);
|
||||
@@ -1342,10 +1271,8 @@ namespace RCUHost.Implement
|
||||
CSRedisCacheHelper.Publish("redis-heartbeat", Newtonsoft.Json.JsonConvert.SerializeObject(k));
|
||||
}
|
||||
}
|
||||
if (cmdType != 0x01 && cmdType != 0xb1)
|
||||
//if (!(cmdType==0x01||cmdType==0xb1) )
|
||||
//在线状态 不排队 注册0x01
|
||||
//if (true)
|
||||
if (cmdType != 0x01 && cmdType != 0xb1)
|
||||
{
|
||||
#region 只要有数据包,就是心跳
|
||||
string EndPointStr = context111.RemoteEndPoint.ToString();
|
||||
@@ -1362,7 +1289,7 @@ namespace RCUHost.Implement
|
||||
o.CurrentTime = DateTime.Now;
|
||||
o.EndPoint = EndPointStr;
|
||||
o.UnixTime = Tools.GetUnixTime();
|
||||
o.MAC = BitConverter.ToString(gga.Data.Skip(9).Take(2).ToArray());
|
||||
o.MAC = BitConverter.ToString(s1.Skip(9).Take(2).ToArray());
|
||||
//新来的数据
|
||||
var n = Newtonsoft.Json.JsonConvert.SerializeObject(o);
|
||||
|
||||
@@ -1416,7 +1343,6 @@ namespace RCUHost.Implement
|
||||
|
||||
#region 如果是0E 01就直接回复
|
||||
//36 就是新版本的0E
|
||||
//if (cmdType == 0x0E || cmdType == 0x01)
|
||||
if (cmdType == 0x0E || cmdType == 0x01 || cmdType == 0x36)
|
||||
{
|
||||
StepTongJi.SendInfo(1.1, "0E 01指令回复开始", context111.MessageID, context111.IsMonitor);
|
||||
|
||||
Reference in New Issue
Block a user