using Microsoft.VisualBasic;
using MQTTnet;
using MQTTnet.Protocol;
using NLog;
using Proto;

namespace IotManager.services
{
    /// <summary>
    /// Hosted background service that owns a single MQTT client: it builds the client from
    /// the <c>MQTT:*</c> configuration keys, connects it, polls the connection state every
    /// 30 seconds, and tries to reconnect whenever the connection is down.
    /// </summary>
    /// <remarks>
    /// The client and the connection flag are static, so <see cref="PublishAsync"/> can be
    /// called without a reference to the service instance.
    /// </remarks>
    public class MqttClientHostedService : BackgroundService
    {
        /// <summary>The shared MQTT client; <c>null</c> before initialization and after disposal.</summary>
        public static IMqttClient? _mqttClient;

        /// <summary>Connection options built by the initialization step from configuration.</summary>
        public MqttClientOptions? _options;

        /// <summary>
        /// <c>true</c> after the client raised its connected event; reset to <c>false</c> on
        /// disconnect, on a failed connection attempt, and on disposal.
        /// </summary>
        public static bool IsConnectioned { get; set; }

        /// <summary>Application configuration the <c>MQTT:*</c> settings are read from.</summary>
        public IConfiguration? Configuration { get; set; }

        private static readonly Logger _logger = LogManager.GetCurrentClassLogger();

        // Port used when "MQTT:Port" is missing or is not a valid TCP port.
        private const int DefaultPort = 1883;

        // Reconnect state.
        private readonly SemaphoreSlim _reconnectLock = new(1, 1);
        private bool _isReconnecting = false;
        private int _reconnectAttempts = 0;
        private const int MAX_RECONNECT_ATTEMPTS = 10;
        private const int RECONNECT_DELAY_SECONDS = 5;

        // Token handed to ExecuteAsync; stored so reconnect delays and connection attempts
        // are abandoned promptly when the host shuts down.
        private CancellationToken _stoppingToken;

        /// <summary>Creates the service.</summary>
        /// <param name="configuration">Configuration that provides the <c>MQTT:*</c> settings.</param>
        public MqttClientHostedService(IConfiguration configuration)
        {
            Configuration = configuration;
        }

        /// <summary>
        /// Initializes the client, then loops until <paramref name="stoppingToken"/> is
        /// cancelled, triggering a reconnect whenever the client is not connected.
        /// The client is always released on exit.
        /// </summary>
        /// <param name="stoppingToken">Signalled by the host when the service must stop.</param>
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            _stoppingToken = stoppingToken;

            try
            {
                await InitializeMqttClient();

                // Main loop: keep monitoring the connection state.
                while (!stoppingToken.IsCancellationRequested)
                {
                    try
                    {
                        if (!IsConnectioned && !_isReconnecting)
                        {
                            await ReconnectWithRetry();
                        }

                        // Check the connection state every 30 seconds.
                        await Task.Delay(TimeSpan.FromSeconds(30), stoppingToken);
                    }
                    catch (OperationCanceledException)
                    {
                        // The service is stopping.
                        break;
                    }
                    catch (Exception ex)
                    {
                        _logger.Error(ex, $"MQTT监控循环异常: {ex.Message}");
                        await Task.Delay(TimeSpan.FromSeconds(10), stoppingToken);
                    }
                }
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                // Cancellation arrived during the error back-off delay above; treat it as a
                // normal stop instead of letting it skip the cleanup.
            }
            finally
            {
                // Release resources on every exit path.
                await DisposeAsync();
            }
        }

        /// <summary>
        /// Creates the client, builds <see cref="_options"/> from configuration, registers the
        /// event handlers and performs the first connection attempt. Failures are logged, not thrown.
        /// </summary>
        private async Task InitializeMqttClient()
        {
            try
            {
                var factory = new MQTTnet.MqttClientFactory();
                _mqttClient = factory.CreateMqttClient();

                var host = Configuration?["MQTT:IP"];

                // int.TryParse writes 0 to its out argument on failure, so the default has to
                // be applied afterwards rather than pre-assigned.
                if (!int.TryParse(Configuration?["MQTT:Port"], out int port) || port is < 1 or > 65535)
                {
                    port = DefaultPort;
                }

                var userName = Configuration?["MQTT:UserName"];
                var password = Configuration?["MQTT:PassWord"];
                string clientId = "IOTConsumer#" + Guid.NewGuid().ToString("N");

                _options = new MqttClientOptionsBuilder()
                    .WithTcpServer(host, port)
                    .WithClientId(clientId)
                    .WithCredentials(userName, password)
                    .WithWillQualityOfServiceLevel(MqttQualityOfServiceLevel.AtMostOnce)
                    .WithProtocolVersion(MQTTnet.Formatter.MqttProtocolVersion.V500)
                    .Build();

                // Register event handlers.
                _mqttClient.ConnectedAsync += OnConnectedAsync;
                _mqttClient.DisconnectedAsync += OnDisconnectedAsync;
                _mqttClient.ApplicationMessageReceivedAsync += OnApplicationMessageReceivedAsync;

                await ConnectAsync();
            }
            catch (Exception ex)
            {
                _logger.Error(ex, $"MQTT初始化错误: {ex.Message}");
                _logger.Error(ex.StackTrace);
            }
        }

        /// <summary>
        /// Performs one connection attempt with the current options. Does nothing when the
        /// client or the options are missing; a failure is logged and clears
        /// <see cref="IsConnectioned"/>.
        /// </summary>
        private async Task ConnectAsync()
        {
            try
            {
                if (_mqttClient == null || _options == null)
                {
                    return;
                }

                _logger.Info("正在连接MQTT服务器...");
                await _mqttClient.ConnectAsync(_options, _stoppingToken);
            }
            catch (Exception ex)
            {
                _logger.Error(ex, $"MQTT连接失败: {ex.Message}");
                IsConnectioned = false;
            }
        }

        /// <summary>Connected handler: marks the client connected, resets the retry counter and re-subscribes.</summary>
        private async Task OnConnectedAsync(MqttClientConnectedEventArgs arg)
        {
            _logger.Info("MQTT连接成功");
            IsConnectioned = true;
            _reconnectAttempts = 0; // Reset the reconnect counter.

            // Re-subscribe topics (if needed).
            await ResubscribeTopics();
        }

        /// <summary>
        /// Disconnected handler: marks the client disconnected and starts a reconnect unless
        /// the disconnect reason is a normal disconnection.
        /// </summary>
        private async Task OnDisconnectedAsync(MqttClientDisconnectedEventArgs e)
        {
            _logger.Warn($"MQTT连接断开,原因: {e.Reason}");
            IsConnectioned = false;

            // Anything other than a normal disconnect triggers a reconnect.
            if (e.Reason != MqttClientDisconnectReason.NormalDisconnection)
            {
                await ReconnectWithRetry();
            }
        }

        /// <summary>
        /// Performs at most one delayed reconnect attempt. Returns immediately when another
        /// attempt holds the lock, when the client is already connected, or when the service
        /// is stopping.
        /// </summary>
        private async Task ReconnectWithRetry()
        {
            if (_stoppingToken.IsCancellationRequested)
            {
                return;
            }

            // Non-blocking lock acquisition keeps concurrent reconnects from overlapping.
            if (!await _reconnectLock.WaitAsync(0))
            {
                return;
            }

            try
            {
                if (_isReconnecting || IsConnectioned)
                {
                    return;
                }

                _isReconnecting = true;
                _reconnectAttempts++;

                _logger.Info($"尝试重连MQTT服务器 (第{_reconnectAttempts}次)...");

                // Linear back-off: the wait grows by RECONNECT_DELAY_SECONDS per attempt.
                // The 300 s cap is only a safety net; the counter is reset after
                // MAX_RECONNECT_ATTEMPTS, so the product stays far below it.
                int delaySeconds = Math.Min(RECONNECT_DELAY_SECONDS * _reconnectAttempts, 300);
                await Task.Delay(TimeSpan.FromSeconds(delaySeconds), _stoppingToken);

                await ConnectAsync();

                // Once the maximum number of attempts is reached, start counting again.
                if (_reconnectAttempts >= MAX_RECONNECT_ATTEMPTS)
                {
                    _reconnectAttempts = 0;
                    _logger.Warn("已达到最大重连次数,将重新开始重连计数");
                }
            }
            catch (OperationCanceledException) when (_stoppingToken.IsCancellationRequested)
            {
                // The service is stopping; abandon the attempt without reporting an error.
            }
            catch (Exception ex)
            {
                _logger.Error(ex, $"重连失败: {ex.Message}");
            }
            finally
            {
                _isReconnecting = false;
                _reconnectLock.Release();
            }
        }

        /// <summary>Placeholder that is invoked after every successful connection to restore subscriptions.</summary>
        private async Task ResubscribeTopics()
        {
            try
            {
                // Re-subscribe the required topics here,
                // for example: await SubscribeAsync("your/topic");
                await Task.CompletedTask;
                _logger.Info("MQTT主题重新订阅完成");
            }
            catch (Exception ex)
            {
                _logger.Error(ex, $"重新订阅主题失败: {ex.Message}");
            }
        }

        /// <summary>
        /// Message handler: logs the topic and payload and forwards them to
        /// <see cref="ProcessMessage"/>. Exceptions are logged and not rethrown.
        /// </summary>
        private async Task OnApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg)
        {
            try
            {
                var topic = arg.ApplicationMessage.Topic;
                var payload = arg.ApplicationMessage.ConvertPayloadToString();

                _logger.Debug($"收到MQTT消息 - 主题: {topic}, 负载: {payload}");

                // Business logic for the message.
                await ProcessMessage(topic, payload);
            }
            catch (Exception ex)
            {
                _logger.Error(ex, $"处理MQTT消息异常: {ex.Message}");
            }
        }

        /// <summary>Extension point for message handling; currently does nothing.</summary>
        /// <param name="topic">Topic the message was received on.</param>
        /// <param name="payload">Message payload converted to a string.</param>
        private Task ProcessMessage(string topic, string payload)
        {
            // Implement the message handling logic here.
            return Task.CompletedTask;
        }

        /// <summary>
        /// Subscribes to <paramref name="topic"/> when the client is connected; otherwise logs
        /// a warning. Failures are logged, not thrown.
        /// </summary>
        /// <param name="topic">Topic filter to subscribe to.</param>
        public async Task SubscribeAsync(string topic)
        {
            try
            {
                // Read the static field once so a concurrent disposal cannot null it between
                // the connection check and the call.
                var client = _mqttClient;
                if (client?.IsConnected == true)
                {
                    var subscribeOptions = new MqttClientSubscribeOptionsBuilder()
                        .WithTopicFilter(topic)
                        .Build();

                    await client.SubscribeAsync(subscribeOptions);
                    _logger.Info($"成功订阅主题: {topic}");
                }
                else
                {
                    _logger.Warn($"MQTT客户端未连接,订阅主题失败: {topic}");
                }
            }
            catch (Exception ex)
            {
                _logger.Error(ex, $"订阅主题失败 {topic}: {ex.Message}");
            }
        }

        /// <summary>
        /// Publishes <paramref name="payload"/> to <paramref name="topic"/> with QoS 0 when the
        /// client is connected; otherwise logs a warning. Failures are logged, not thrown.
        /// </summary>
        /// <param name="topic">Topic to publish to.</param>
        /// <param name="payload">Message payload.</param>
        public static async Task PublishAsync(string topic, string payload)
        {
            try
            {
                // Read the static field once so a concurrent disposal cannot null it between
                // the connection check and the call.
                var client = _mqttClient;
                if (client?.IsConnected == true)
                {
                    var message = new MqttApplicationMessageBuilder()
                        .WithTopic(topic)
                        .WithPayload(payload)
                        .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtMostOnce)
                        .Build();

                    await client.PublishAsync(message);
                    _logger.Debug($"MQTT消息发送成功 - 主题: {topic}");
                }
                else
                {
                    _logger.Warn("MQTT客户端未连接,消息发送失败");
                }
            }
            catch (Exception ex)
            {
                _logger.Error(ex, $"MQTT消息发送失败: {ex.Message}");
            }
        }

        /// <summary>
        /// Detaches the event handlers, disconnects if connected, and disposes the client.
        /// Safe to call more than once; errors are logged, not thrown.
        /// </summary>
        public async ValueTask DisposeAsync()
        {
            try
            {
                // Clear the shared reference first so publishers stop using the client and a
                // second call finds nothing left to dispose.
                var client = _mqttClient;
                _mqttClient = null;

                if (client != null)
                {
                    client.ConnectedAsync -= OnConnectedAsync;
                    client.DisconnectedAsync -= OnDisconnectedAsync;
                    client.ApplicationMessageReceivedAsync -= OnApplicationMessageReceivedAsync;

                    if (client.IsConnected)
                    {
                        await client.DisconnectAsync();
                    }

                    client.Dispose();
                }

                // The disconnected handler is already detached, so the flag has to be cleared here.
                IsConnectioned = false;

                _logger.Info("MQTT客户端已释放");
            }
            catch (Exception ex)
            {
                _logger.Error(ex, $"释放MQTT客户端时出错: {ex.Message}");
            }
        }
    }
}