Files
Web_IoTBase_Sever_Prod/BooliveMQTT_Auth/services/MqttClientHostedService.cs
2025-12-11 14:04:39 +08:00

295 lines
9.5 KiB
C#
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
using Microsoft.VisualBasic;
using MQTTnet;
using MQTTnet.Protocol;
using NLog;
using Proto;
namespace IotManager.services
{
public class MqttClientHostedService : BackgroundService
{
public static IMqttClient? _mqttClient;
public MqttClientOptions? _options;
public static bool IsConnectioned { get; set; }
public IConfiguration? Configuration { get; set; }
private static Logger _logger = LogManager.GetCurrentClassLogger();
// 重连相关变量
private readonly SemaphoreSlim _reconnectLock = new(1, 1);
private bool _isReconnecting = false;
private int _reconnectAttempts = 0;
private const int MAX_RECONNECT_ATTEMPTS = 10;
private const int RECONNECT_DELAY_SECONDS = 5;
public MqttClientHostedService(IConfiguration configuration)
{
Configuration = configuration;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await InitializeMqttClient();
// 主循环,持续监控连接状态
while (!stoppingToken.IsCancellationRequested)
{
try
{
if (!IsConnectioned && !_isReconnecting)
{
await ReconnectWithRetry();
}
// 每30秒检查一次连接状态
await Task.Delay(TimeSpan.FromSeconds(30), stoppingToken);
}
catch (OperationCanceledException)
{
// 服务停止
break;
}
catch (Exception ex)
{
_logger.Error($"MQTT监控循环异常: {ex.Message}");
await Task.Delay(TimeSpan.FromSeconds(10), stoppingToken);
}
}
// 清理资源
await DisposeAsync();
}
private async Task InitializeMqttClient()
{
try
{
var factory = new MQTTnet.MqttClientFactory();
_mqttClient = factory.CreateMqttClient();
var IP = Configuration["MQTT:IP"];
int Port = 1883;
int.TryParse(Configuration["MQTT:Port"], out Port);
var UserName = Configuration["MQTT:UserName"];
var PWD = Configuration["MQTT:PassWord"];
string clientId = "IOTConsumer#" + Guid.NewGuid().ToString("N");
_options = new MqttClientOptionsBuilder()
.WithTcpServer(IP, Port)
.WithClientId(clientId)
.WithCredentials(UserName, PWD)
.WithWillQualityOfServiceLevel(MqttQualityOfServiceLevel.AtMostOnce)
.WithProtocolVersion(MQTTnet.Formatter.MqttProtocolVersion.V500)
.Build();
// 注册事件
_mqttClient.ConnectedAsync += OnConnectedAsync;
_mqttClient.DisconnectedAsync += OnDisconnectedAsync;
_mqttClient.ApplicationMessageReceivedAsync += OnApplicationMessageReceivedAsync;
await ConnectAsync();
}
catch (Exception ex)
{
_logger.Error($"MQTT初始化错误: {ex.Message}");
_logger.Error(ex.StackTrace);
}
}
private async Task ConnectAsync()
{
try
{
if (_mqttClient == null || _options == null)
return;
_logger.Info("正在连接MQTT服务器...");
await _mqttClient.ConnectAsync(_options);
}
catch (Exception ex)
{
_logger.Error($"MQTT连接失败: {ex.Message}");
IsConnectioned = false;
}
}
private async Task OnConnectedAsync(MqttClientConnectedEventArgs arg)
{
_logger.Info("MQTT连接成功");
IsConnectioned = true;
_reconnectAttempts = 0; // 重置重连计数
// 重新订阅主题(如果需要)
await ResubscribeTopics();
await Task.CompletedTask;
}
private async Task OnDisconnectedAsync(MqttClientDisconnectedEventArgs e)
{
_logger.Warn($"MQTT连接断开原因: {e.Reason}");
IsConnectioned = false;
// 如果不是正常断开,启动重连
if (e.Reason != MqttClientDisconnectReason.NormalDisconnection)
{
await ReconnectWithRetry();
}
await Task.CompletedTask;
}
private async Task ReconnectWithRetry()
{
// 使用锁防止多个重连同时进行
if (!await _reconnectLock.WaitAsync(0))
return;
try
{
if (_isReconnecting || IsConnectioned)
return;
_isReconnecting = true;
_reconnectAttempts++;
_logger.Info($"尝试重连MQTT服务器 (第{_reconnectAttempts}次)...");
// 指数退避策略:重连间隔逐渐增加
int delaySeconds = Math.Min(RECONNECT_DELAY_SECONDS * _reconnectAttempts, 300); // 最大5分钟
await Task.Delay(TimeSpan.FromSeconds(delaySeconds));
await ConnectAsync();
// 如果超过最大重连次数,重置计数器
if (_reconnectAttempts >= MAX_RECONNECT_ATTEMPTS)
{
_reconnectAttempts = 0;
_logger.Warn("已达到最大重连次数,将重新开始重连计数");
}
}
catch (Exception ex)
{
_logger.Error($"重连失败: {ex.Message}");
}
finally
{
_isReconnecting = false;
_reconnectLock.Release();
}
}
private async Task ResubscribeTopics()
{
try
{
// 这里重新订阅您需要的主题
// 例如await SubscribeAsync("your/topic");
_logger.Info("MQTT主题重新订阅完成");
}
catch (Exception ex)
{
_logger.Error($"重新订阅主题失败: {ex.Message}");
}
}
private async Task OnApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg)
{
try
{
var topic = arg.ApplicationMessage.Topic;
var payload = arg.ApplicationMessage.ConvertPayloadToString();
_logger.Debug($"收到MQTT消息 - 主题: {topic}, 负载: {payload}");
// 处理消息的业务逻辑
await ProcessMessage(topic, payload);
}
catch (Exception ex)
{
_logger.Error($"处理MQTT消息异常: {ex.Message}");
}
}
private async Task ProcessMessage(string topic, string payload)
{
// 实现您的消息处理逻辑
await Task.CompletedTask;
}
public async Task SubscribeAsync(string topic)
{
try
{
if (_mqttClient?.IsConnected == true)
{
var subscribeOptions = new MqttClientSubscribeOptionsBuilder()
.WithTopicFilter(topic)
.Build();
await _mqttClient.SubscribeAsync(subscribeOptions);
_logger.Info($"成功订阅主题: {topic}");
}
}
catch (Exception ex)
{
_logger.Error($"订阅主题失败 {topic}: {ex.Message}");
}
}
public static async Task PublishAsync(string topic, string payload)
{
try
{
if (_mqttClient?.IsConnected == true)
{
var message = new MqttApplicationMessageBuilder()
.WithTopic(topic)
.WithPayload(payload)
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtMostOnce)
.Build();
await _mqttClient.PublishAsync(message);
_logger.Debug($"MQTT消息发送成功 - 主题: {topic}");
}
else
{
_logger.Warn("MQTT客户端未连接消息发送失败");
}
}
catch (Exception ex)
{
_logger.Error($"MQTT消息发送失败: {ex.Message}");
}
}
public async ValueTask DisposeAsync()
{
try
{
if (_mqttClient != null)
{
_mqttClient.ConnectedAsync -= OnConnectedAsync;
_mqttClient.DisconnectedAsync -= OnDisconnectedAsync;
_mqttClient.ApplicationMessageReceivedAsync -= OnApplicationMessageReceivedAsync;
if (_mqttClient.IsConnected)
{
await _mqttClient.DisconnectAsync();
}
_mqttClient.Dispose();
}
_logger.Info("MQTT客户端已释放");
}
catch (Exception ex)
{
_logger.Error($"释放MQTT客户端时出错: {ex.Message}");
}
}
}
}