using MQTTnet.Packets; using MQTTnet; using Proto; using System.Runtime.CompilerServices; using MQTTnet.Protocol; using System.Collections; using System.Threading; using System.Reflection.PortableExecutable; namespace MyTTT { internal class Program { static void Main(string[] args) { CountdownEvent c = new CountdownEvent(2); for (int i = 1; i < 3; i++) { Task.Factory.StartNew((dd)=> { Console.WriteLine(dd.ToString()); Task.Delay(5000).Wait(); c.Signal(); },i); } c.Wait(); Console.WriteLine( "333333333333333333"); Console.ReadKey(); return; byte[] vvv = new byte[] { 0xb0,0x00 }; BitArray bitArray = new BitArray(vvv); //卡身份 var qqq1 = bitArray.Cast().Skip(1).Take(3).ToArray(); //这个是触发来源 var qqq2 = bitArray.Cast().Skip(4).Take(4).ToArray(); var qqq = new ActorSystem(); Props pp = Props.FromProducer(() => { return new ParentsActor(); }); pp.WithChildSupervisorStrategy(new OneForOneStrategy((pid, reason) => { return reason switch { Exception => SupervisorDirective.Restart }; }, 10, new TimeSpan(0, 1, 0))); PID pid = qqq.Root.Spawn(pp); Console.WriteLine("333333333333333"); Console.ReadLine(); PID ppp = qqq.ProcessRegistry.Find("ABCDEF").FirstOrDefault(); qqq.Root.Send(ppp, "connect"); Console.ReadLine(); qqq.Root.Send(ppp, "subscribe"); Console.WriteLine("Hello, World!"); Console.ReadKey(); } } public class ParentsActor : IActor { public Task ReceiveAsync(IContext context) { PID child; if (context.Children is null || context.Children.Count == 0) { var props = Props.FromProducer(() => new ChildrenActor()); child = context.SpawnNamed(props, "ABCDEF"); } else { child = context.Children.First(); } switch (context.Message) { case Msg: case AAA: context.Forward(child); break; default: break; } return Task.CompletedTask; } } public class ChildrenActor : IActor { private IMqttClient mqttClient; private readonly MqttClientOptions? _mqttClientOptions; private bool _isConnected; public ChildrenActor() { var mqttFactory = new MqttClientFactory(); string ip = "120.24.73.62"; int? port = 1883; string username = "blw"; string pwd = "blw@1234"; mqttClient = mqttFactory.CreateMqttClient(); _mqttClientOptions = new MqttClientOptionsBuilder() .WithClientId(System.Guid.NewGuid().ToString("N")) .WithTcpServer(ip, port) .WithCredentials(username, pwd) .Build(); mqttClient.DisconnectedAsync += MqttClient_DisconnectedAsync; mqttClient.ApplicationMessageReceivedAsync += MqttClient_ApplicationMessageReceivedAsync; } private Task MqttClient_DisconnectedAsync(MqttClientDisconnectedEventArgs arg) { throw new NotImplementedException(); } private Task MqttClient_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg) { throw new Exception("638493y2h"); } async public Task ReceiveAsync(IContext context) { switch (context.Message) { case "connect": await ConnectAsync(); break; case "publish": await PublishAsync("",""); break; case "subscribe": await SubscribeAsync(""); break; default: context.Respond("Unknown message"); break; } } private async Task ConnectAsync() { try { mqttClient.ConnectedAsync += MqttClient_ConnectedAsync; await mqttClient.ConnectAsync(_mqttClientOptions, CancellationToken.None); _isConnected = true; } catch (Exception ex) { // Log the exception and trigger a restart Console.WriteLine($"Failed to connect to MQTT: {ex.Message}"); throw; // This will cause the actor to restart according to the configured restart strategy } } private Task MqttClient_ConnectedAsync(MqttClientConnectedEventArgs arg) { return Task.CompletedTask; } private async Task PublishAsync(string topic, string message) { if (!_isConnected) { await ConnectAsync(); } var mqttMessage = new MqttApplicationMessageBuilder() .WithTopic(topic) .WithPayload(message) .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce) .Build(); await mqttClient.PublishAsync(mqttMessage, CancellationToken.None); } private async Task SubscribeAsync(string topic) { if (!_isConnected) { await ConnectAsync(); } var subscribeOptions = new MqttClientSubscribeOptions(); var topicFilter = new MqttTopicFilter { Topic = "blw/logmonitor/delete/report" }; subscribeOptions.TopicFilters.Add(topicFilter); await mqttClient.SubscribeAsync(subscribeOptions, CancellationToken.None); } } public record Msg { public int Age; public string mmm { get; set; } } public record AAA { public string mmm { get; set; } } }