Files
Web_BLVLOG_Server_Mvc_Prod/MyTTT/Program.cs
2025-11-20 16:20:37 +08:00

233 lines
6.4 KiB
C#

using MQTTnet.Packets;
using MQTTnet;
using Proto;
using System.Runtime.CompilerServices;
using MQTTnet.Protocol;
using System.Collections;
using System.Threading;
using System.Reflection.PortableExecutable;
namespace MyTTT
{
internal class Program
{
static void Main(string[] args)
{
CountdownEvent c = new CountdownEvent(2);
for (int i = 1; i < 3; i++)
{
Task.Factory.StartNew((dd)=>
{
Console.WriteLine(dd.ToString());
Task.Delay(5000).Wait();
c.Signal();
},i);
}
c.Wait();
Console.WriteLine( "333333333333333333");
Console.ReadKey();
return;
byte[] vvv = new byte[] { 0xb0,0x00 };
BitArray bitArray = new BitArray(vvv);
//卡身份
var qqq1 = bitArray.Cast<bool>().Skip(1).Take(3).ToArray();
//这个是触发来源
var qqq2 = bitArray.Cast<bool>().Skip(4).Take(4).ToArray();
var qqq = new ActorSystem();
Props pp = Props.FromProducer(() =>
{
return new ParentsActor();
});
pp.WithChildSupervisorStrategy(new OneForOneStrategy((pid, reason) =>
{
return reason switch
{
Exception => SupervisorDirective.Restart
};
}, 10, new TimeSpan(0, 1, 0)));
PID pid = qqq.Root.Spawn(pp);
Console.WriteLine("333333333333333");
Console.ReadLine();
PID ppp = qqq.ProcessRegistry.Find("ABCDEF").FirstOrDefault();
qqq.Root.Send(ppp, "connect");
Console.ReadLine();
qqq.Root.Send(ppp, "subscribe");
Console.WriteLine("Hello, World!");
Console.ReadKey();
}
}
public class ParentsActor : IActor
{
public Task ReceiveAsync(IContext context)
{
PID child;
if (context.Children is null || context.Children.Count == 0)
{
var props = Props.FromProducer(() => new ChildrenActor());
child = context.SpawnNamed(props, "ABCDEF");
}
else
{
child = context.Children.First();
}
switch (context.Message)
{
case Msg:
case AAA:
context.Forward(child);
break;
default:
break;
}
return Task.CompletedTask;
}
}
public class ChildrenActor : IActor
{
private IMqttClient mqttClient;
private readonly MqttClientOptions? _mqttClientOptions;
private bool _isConnected;
public ChildrenActor()
{
var mqttFactory = new MqttClientFactory();
string ip = "120.24.73.62";
int? port = 1883;
string username = "blw";
string pwd = "blw@1234";
mqttClient = mqttFactory.CreateMqttClient();
_mqttClientOptions = new MqttClientOptionsBuilder()
.WithClientId(System.Guid.NewGuid().ToString("N"))
.WithTcpServer(ip, port)
.WithCredentials(username, pwd)
.Build();
mqttClient.DisconnectedAsync += MqttClient_DisconnectedAsync;
mqttClient.ApplicationMessageReceivedAsync += MqttClient_ApplicationMessageReceivedAsync;
}
private Task MqttClient_DisconnectedAsync(MqttClientDisconnectedEventArgs arg)
{
throw new NotImplementedException();
}
private Task MqttClient_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg)
{
throw new Exception("638493y2h");
}
async public Task ReceiveAsync(IContext context)
{
switch (context.Message)
{
case "connect":
await ConnectAsync();
break;
case "publish":
await PublishAsync("","");
break;
case "subscribe":
await SubscribeAsync("");
break;
default:
context.Respond("Unknown message");
break;
}
}
private async Task ConnectAsync()
{
try
{
mqttClient.ConnectedAsync += MqttClient_ConnectedAsync;
await mqttClient.ConnectAsync(_mqttClientOptions, CancellationToken.None);
_isConnected = true;
}
catch (Exception ex)
{
// Log the exception and trigger a restart
Console.WriteLine($"Failed to connect to MQTT: {ex.Message}");
throw; // This will cause the actor to restart according to the configured restart strategy
}
}
private Task MqttClient_ConnectedAsync(MqttClientConnectedEventArgs arg)
{
return Task.CompletedTask;
}
private async Task PublishAsync(string topic, string message)
{
if (!_isConnected)
{
await ConnectAsync();
}
var mqttMessage = new MqttApplicationMessageBuilder()
.WithTopic(topic)
.WithPayload(message)
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)
.Build();
await mqttClient.PublishAsync(mqttMessage, CancellationToken.None);
}
private async Task SubscribeAsync(string topic)
{
if (!_isConnected)
{
await ConnectAsync();
}
var subscribeOptions = new MqttClientSubscribeOptions();
var topicFilter = new MqttTopicFilter
{
Topic = "blw/logmonitor/delete/report"
};
subscribeOptions.TopicFilters.Add(topicFilter);
await mqttClient.SubscribeAsync(subscribeOptions, CancellationToken.None);
}
}
public record Msg
{
public int Age;
public string mmm { get; set; }
}
public record AAA
{
public string mmm { get; set; }
}
}