295 lines
9.5 KiB
C#
295 lines
9.5 KiB
C#
|
|
|
|||
|
|
using Microsoft.VisualBasic;
|
|||
|
|
using MQTTnet;
|
|||
|
|
using MQTTnet.Protocol;
|
|||
|
|
using NLog;
|
|||
|
|
using Proto;
|
|||
|
|
|
|||
|
|
namespace IotManager.services
|
|||
|
|
{
|
|||
|
|
public class MqttClientHostedService : BackgroundService
|
|||
|
|
{
|
|||
|
|
public static IMqttClient? _mqttClient;
|
|||
|
|
public MqttClientOptions? _options;
|
|||
|
|
public static bool IsConnectioned { get; set; }
|
|||
|
|
|
|||
|
|
public IConfiguration? Configuration { get; set; }
|
|||
|
|
|
|||
|
|
private static Logger _logger = LogManager.GetCurrentClassLogger();
|
|||
|
|
|
|||
|
|
// 重连相关变量
|
|||
|
|
private readonly SemaphoreSlim _reconnectLock = new(1, 1);
|
|||
|
|
private bool _isReconnecting = false;
|
|||
|
|
private int _reconnectAttempts = 0;
|
|||
|
|
private const int MAX_RECONNECT_ATTEMPTS = 10;
|
|||
|
|
private const int RECONNECT_DELAY_SECONDS = 5;
|
|||
|
|
|
|||
|
|
public MqttClientHostedService(IConfiguration configuration)
|
|||
|
|
{
|
|||
|
|
Configuration = configuration;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
|||
|
|
{
|
|||
|
|
await InitializeMqttClient();
|
|||
|
|
|
|||
|
|
// 主循环,持续监控连接状态
|
|||
|
|
while (!stoppingToken.IsCancellationRequested)
|
|||
|
|
{
|
|||
|
|
try
|
|||
|
|
{
|
|||
|
|
if (!IsConnectioned && !_isReconnecting)
|
|||
|
|
{
|
|||
|
|
await ReconnectWithRetry();
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 每30秒检查一次连接状态
|
|||
|
|
await Task.Delay(TimeSpan.FromSeconds(30), stoppingToken);
|
|||
|
|
}
|
|||
|
|
catch (OperationCanceledException)
|
|||
|
|
{
|
|||
|
|
// 服务停止
|
|||
|
|
break;
|
|||
|
|
}
|
|||
|
|
catch (Exception ex)
|
|||
|
|
{
|
|||
|
|
_logger.Error($"MQTT监控循环异常: {ex.Message}");
|
|||
|
|
await Task.Delay(TimeSpan.FromSeconds(10), stoppingToken);
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 清理资源
|
|||
|
|
await DisposeAsync();
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
private async Task InitializeMqttClient()
|
|||
|
|
{
|
|||
|
|
try
|
|||
|
|
{
|
|||
|
|
var factory = new MQTTnet.MqttClientFactory();
|
|||
|
|
_mqttClient = factory.CreateMqttClient();
|
|||
|
|
|
|||
|
|
|
|||
|
|
var IP = Configuration["MQTT:IP"];
|
|||
|
|
int Port = 1883;
|
|||
|
|
int.TryParse(Configuration["MQTT:Port"], out Port);
|
|||
|
|
var UserName = Configuration["MQTT:UserName"];
|
|||
|
|
var PWD = Configuration["MQTT:PassWord"];
|
|||
|
|
string clientId = "IOTConsumer#" + Guid.NewGuid().ToString("N");
|
|||
|
|
|
|||
|
|
_options = new MqttClientOptionsBuilder()
|
|||
|
|
.WithTcpServer(IP, Port)
|
|||
|
|
.WithClientId(clientId)
|
|||
|
|
.WithCredentials(UserName, PWD)
|
|||
|
|
.WithWillQualityOfServiceLevel(MqttQualityOfServiceLevel.AtMostOnce)
|
|||
|
|
.WithProtocolVersion(MQTTnet.Formatter.MqttProtocolVersion.V500)
|
|||
|
|
.Build();
|
|||
|
|
|
|||
|
|
// 注册事件
|
|||
|
|
_mqttClient.ConnectedAsync += OnConnectedAsync;
|
|||
|
|
_mqttClient.DisconnectedAsync += OnDisconnectedAsync;
|
|||
|
|
_mqttClient.ApplicationMessageReceivedAsync += OnApplicationMessageReceivedAsync;
|
|||
|
|
|
|||
|
|
await ConnectAsync();
|
|||
|
|
}
|
|||
|
|
catch (Exception ex)
|
|||
|
|
{
|
|||
|
|
_logger.Error($"MQTT初始化错误: {ex.Message}");
|
|||
|
|
_logger.Error(ex.StackTrace);
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
private async Task ConnectAsync()
|
|||
|
|
{
|
|||
|
|
try
|
|||
|
|
{
|
|||
|
|
if (_mqttClient == null || _options == null)
|
|||
|
|
return;
|
|||
|
|
|
|||
|
|
_logger.Info("正在连接MQTT服务器...");
|
|||
|
|
await _mqttClient.ConnectAsync(_options);
|
|||
|
|
}
|
|||
|
|
catch (Exception ex)
|
|||
|
|
{
|
|||
|
|
_logger.Error($"MQTT连接失败: {ex.Message}");
|
|||
|
|
IsConnectioned = false;
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
private async Task OnConnectedAsync(MqttClientConnectedEventArgs arg)
|
|||
|
|
{
|
|||
|
|
_logger.Info("MQTT连接成功");
|
|||
|
|
IsConnectioned = true;
|
|||
|
|
_reconnectAttempts = 0; // 重置重连计数
|
|||
|
|
|
|||
|
|
// 重新订阅主题(如果需要)
|
|||
|
|
await ResubscribeTopics();
|
|||
|
|
|
|||
|
|
await Task.CompletedTask;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
private async Task OnDisconnectedAsync(MqttClientDisconnectedEventArgs e)
|
|||
|
|
{
|
|||
|
|
_logger.Warn($"MQTT连接断开,原因: {e.Reason}");
|
|||
|
|
IsConnectioned = false;
|
|||
|
|
|
|||
|
|
// 如果不是正常断开,启动重连
|
|||
|
|
if (e.Reason != MqttClientDisconnectReason.NormalDisconnection)
|
|||
|
|
{
|
|||
|
|
await ReconnectWithRetry();
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
await Task.CompletedTask;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
private async Task ReconnectWithRetry()
|
|||
|
|
{
|
|||
|
|
// 使用锁防止多个重连同时进行
|
|||
|
|
if (!await _reconnectLock.WaitAsync(0))
|
|||
|
|
return;
|
|||
|
|
|
|||
|
|
try
|
|||
|
|
{
|
|||
|
|
if (_isReconnecting || IsConnectioned)
|
|||
|
|
return;
|
|||
|
|
|
|||
|
|
_isReconnecting = true;
|
|||
|
|
_reconnectAttempts++;
|
|||
|
|
|
|||
|
|
_logger.Info($"尝试重连MQTT服务器 (第{_reconnectAttempts}次)...");
|
|||
|
|
|
|||
|
|
// 指数退避策略:重连间隔逐渐增加
|
|||
|
|
int delaySeconds = Math.Min(RECONNECT_DELAY_SECONDS * _reconnectAttempts, 300); // 最大5分钟
|
|||
|
|
|
|||
|
|
await Task.Delay(TimeSpan.FromSeconds(delaySeconds));
|
|||
|
|
|
|||
|
|
await ConnectAsync();
|
|||
|
|
|
|||
|
|
// 如果超过最大重连次数,重置计数器
|
|||
|
|
if (_reconnectAttempts >= MAX_RECONNECT_ATTEMPTS)
|
|||
|
|
{
|
|||
|
|
_reconnectAttempts = 0;
|
|||
|
|
_logger.Warn("已达到最大重连次数,将重新开始重连计数");
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
catch (Exception ex)
|
|||
|
|
{
|
|||
|
|
_logger.Error($"重连失败: {ex.Message}");
|
|||
|
|
}
|
|||
|
|
finally
|
|||
|
|
{
|
|||
|
|
_isReconnecting = false;
|
|||
|
|
_reconnectLock.Release();
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
private async Task ResubscribeTopics()
|
|||
|
|
{
|
|||
|
|
try
|
|||
|
|
{
|
|||
|
|
// 这里重新订阅您需要的主题
|
|||
|
|
// 例如:await SubscribeAsync("your/topic");
|
|||
|
|
_logger.Info("MQTT主题重新订阅完成");
|
|||
|
|
}
|
|||
|
|
catch (Exception ex)
|
|||
|
|
{
|
|||
|
|
_logger.Error($"重新订阅主题失败: {ex.Message}");
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
private async Task OnApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg)
|
|||
|
|
{
|
|||
|
|
try
|
|||
|
|
{
|
|||
|
|
var topic = arg.ApplicationMessage.Topic;
|
|||
|
|
var payload = arg.ApplicationMessage.ConvertPayloadToString();
|
|||
|
|
|
|||
|
|
_logger.Debug($"收到MQTT消息 - 主题: {topic}, 负载: {payload}");
|
|||
|
|
|
|||
|
|
// 处理消息的业务逻辑
|
|||
|
|
await ProcessMessage(topic, payload);
|
|||
|
|
}
|
|||
|
|
catch (Exception ex)
|
|||
|
|
{
|
|||
|
|
_logger.Error($"处理MQTT消息异常: {ex.Message}");
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
private async Task ProcessMessage(string topic, string payload)
|
|||
|
|
{
|
|||
|
|
// 实现您的消息处理逻辑
|
|||
|
|
await Task.CompletedTask;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
public async Task SubscribeAsync(string topic)
|
|||
|
|
{
|
|||
|
|
try
|
|||
|
|
{
|
|||
|
|
if (_mqttClient?.IsConnected == true)
|
|||
|
|
{
|
|||
|
|
var subscribeOptions = new MqttClientSubscribeOptionsBuilder()
|
|||
|
|
.WithTopicFilter(topic)
|
|||
|
|
.Build();
|
|||
|
|
|
|||
|
|
await _mqttClient.SubscribeAsync(subscribeOptions);
|
|||
|
|
_logger.Info($"成功订阅主题: {topic}");
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
catch (Exception ex)
|
|||
|
|
{
|
|||
|
|
_logger.Error($"订阅主题失败 {topic}: {ex.Message}");
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
public static async Task PublishAsync(string topic, string payload)
|
|||
|
|
{
|
|||
|
|
try
|
|||
|
|
{
|
|||
|
|
if (_mqttClient?.IsConnected == true)
|
|||
|
|
{
|
|||
|
|
var message = new MqttApplicationMessageBuilder()
|
|||
|
|
.WithTopic(topic)
|
|||
|
|
.WithPayload(payload)
|
|||
|
|
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtMostOnce)
|
|||
|
|
.Build();
|
|||
|
|
|
|||
|
|
await _mqttClient.PublishAsync(message);
|
|||
|
|
_logger.Debug($"MQTT消息发送成功 - 主题: {topic}");
|
|||
|
|
}
|
|||
|
|
else
|
|||
|
|
{
|
|||
|
|
_logger.Warn("MQTT客户端未连接,消息发送失败");
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
catch (Exception ex)
|
|||
|
|
{
|
|||
|
|
_logger.Error($"MQTT消息发送失败: {ex.Message}");
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
public async ValueTask DisposeAsync()
|
|||
|
|
{
|
|||
|
|
try
|
|||
|
|
{
|
|||
|
|
if (_mqttClient != null)
|
|||
|
|
{
|
|||
|
|
_mqttClient.ConnectedAsync -= OnConnectedAsync;
|
|||
|
|
_mqttClient.DisconnectedAsync -= OnDisconnectedAsync;
|
|||
|
|
_mqttClient.ApplicationMessageReceivedAsync -= OnApplicationMessageReceivedAsync;
|
|||
|
|
|
|||
|
|
if (_mqttClient.IsConnected)
|
|||
|
|
{
|
|||
|
|
await _mqttClient.DisconnectAsync();
|
|||
|
|
}
|
|||
|
|
_mqttClient.Dispose();
|
|||
|
|
}
|
|||
|
|
_logger.Info("MQTT客户端已释放");
|
|||
|
|
}
|
|||
|
|
catch (Exception ex)
|
|||
|
|
{
|
|||
|
|
_logger.Error($"释放MQTT客户端时出错: {ex.Message}");
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
}
|