UNIVPLMDataIntegration/Admin.NET/Admin.NET.Core/Mqtt/MqttHostedService.cs

189 lines
7.8 KiB
C#
Raw Normal View History

2025-02-20 02:30:35 +08:00
// Admin.NET 项目的版权、商标、专利和其他相关权利均受相应法律法规的保护。使用本项目应遵守相关法律法规和许可证的要求。
//
// 本项目主要遵循 MIT 许可证和 Apache 许可证(版本 2.0)进行分发和使用。许可证位于源代码树根目录中的 LICENSE-MIT 和 LICENSE-APACHE 文件。
//
// 不得利用本项目从事危害国家安全、扰乱社会秩序、侵犯他人合法权益等法律法规禁止的活动!任何基于本项目二次开发而产生的一切法律纠纷和责任,我们不承担任何责任!
namespace Admin.NET.Core;
using Microsoft.Extensions.Hosting;
using MQTTnet.Protocol;
using MQTTnet.Server;
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
/// <summary>
/// MQTT 服务
/// </summary>
public class MqttHostedService(IOptions<MqttOptions> mqttOptions, ISqlSugarClient db) : IHostedService, IDisposable
{
private const string ServerClientId = "Admin.NET.MQTT";
public MqttServer MqttServer { get; set; }
private readonly MqttOptions _mqttOptions = mqttOptions.Value;
private readonly ISqlSugarClient _db = db;
public async Task StartAsync(CancellationToken cancellationToken)
{
if (!_mqttOptions.Enabled) return;
var options = new MqttServerOptionsBuilder()
.WithDefaultEndpoint() // 默认地址127.0.0.1
.WithDefaultEndpointPort(_mqttOptions.Port) // 端口号
//.WithDefaultEndpointBoundIPAddress(_mqttOptions.IPAddress) // IP地址
.WithConnectionBacklog(1000) // 最大连接数
.WithPersistentSessions()
.Build();
MqttServer = new MqttServerFactory().CreateMqttServer(options);
MqttServer.StartedAsync += MqttServer_StartedAsync; // 启动后事件
MqttServer.StoppedAsync += MqttServer_StoppedAsync; // 关闭后事件
MqttServer.ValidatingConnectionAsync += MqttServer_ValidatingConnectionAsync; // 客户端验证事件
MqttServer.ClientConnectedAsync += MqttServer_ClientConnectedAsync; // 客户端连接事件
MqttServer.ClientDisconnectedAsync += MqttServer_ClientDisconnectedAsync; // 客户端断开事件
MqttServer.ClientSubscribedTopicAsync += MqttServer_ClientSubscribedTopicAsync; // 订阅主题事件
MqttServer.ClientUnsubscribedTopicAsync += MqttServer_ClientUnsubscribedTopicAsync; // 取消订阅事件
MqttServer.InterceptingPublishAsync += MqttServer_InterceptingPublishAsync; // 拦截接收消息
MqttServer.ApplicationMessageNotConsumedAsync += MqttServer_ApplicationMessageNotConsumedAsync; // 消息未被消费
await MqttServer.StartAsync();
}
/// <summary>
/// 启动后事件
/// </summary>
/// <param name="arg"></param>
/// <returns></returns>
private Task MqttServer_StartedAsync(EventArgs arg)
{
Console.WriteLine($"【MQTT】服务已启动端口{_mqttOptions.Port}...... {DateTime.Now}");
return Task.CompletedTask;
}
/// <summary>
/// 关闭后事件
/// </summary>
/// <param name="arg"></param>
/// <returns></returns>
private Task MqttServer_StoppedAsync(EventArgs arg)
{
Console.WriteLine($"【MQTT】服务已关闭...... {DateTime.Now}");
return Task.CompletedTask;
}
/// <summary>
/// 客户端验证事件
/// </summary>
/// <param name="arg"></param>
/// <returns></returns>
private Task MqttServer_ValidatingConnectionAsync(ValidatingConnectionEventArgs arg)
{
arg.ReasonCode = MqttConnectReasonCode.Success;
// 验证账号
var user = _db.Queryable<SysUser>().First(u => u.Account == arg.UserName);
if (user == null)
{
arg.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
Console.WriteLine($"客户端验证客户端ID=【{arg.ClientId}】用户名或密码验证错误 {DateTime.Now} ");
throw Oops.Oh("MQTT客户端验证账号不能为空或不存在");
}
// 验证密码
var password = arg.Password;
if (CryptogramUtil.CryptoType == CryptogramEnum.MD5.ToString())
{
if (user.Password.Equals(MD5Encryption.Encrypt(password))) return Task.CompletedTask;
}
else
{
if (CryptogramUtil.Decrypt(user.Password).Equals(password)) return Task.CompletedTask;
}
arg.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
Console.WriteLine($"客户端验证客户端ID=【{arg.ClientId}】用户名或密码验证错误 {DateTime.Now} ");
throw Oops.Oh("MQTT客户端验证密码错误");
}
/// <summary>
/// 客户端连接事件
/// </summary>
/// <param name="arg"></param>
/// <returns></returns>
private Task MqttServer_ClientConnectedAsync(ClientConnectedEventArgs arg)
{
Console.WriteLine($"客户端连接客户端ID=【{arg.ClientId}】已连接:用户名=【{arg.UserName}】地址=【{arg.RemoteEndPoint}】 {DateTime.Now}");
return Task.CompletedTask;
}
/// <summary>
/// 客户端断开事件
/// </summary>
/// <param name="arg"></param>
/// <returns></returns>
/// <exception cref="NotImplementedException"></exception>
private Task MqttServer_ClientDisconnectedAsync(ClientDisconnectedEventArgs arg)
{
Console.WriteLine($"客户端断开客户端ID=【{arg.ClientId}】已断开:用户名=【{arg.UserName}】地址=【{arg.RemoteEndPoint}】 {DateTime.Now}");
return Task.CompletedTask;
}
/// <summary>
/// 订阅主题事件
/// </summary>
/// <param name="arg"></param>
/// <returns></returns>
private Task MqttServer_ClientSubscribedTopicAsync(ClientSubscribedTopicEventArgs arg)
{
Console.WriteLine($"订阅主题客户端ID=【{arg.ClientId}】订阅主题=【{arg.TopicFilter}】 {DateTime.Now}");
return Task.CompletedTask;
}
/// <summary>
/// 取消订阅事件
/// </summary>
/// <param name="arg"></param>
/// <returns></returns>
private Task MqttServer_ClientUnsubscribedTopicAsync(ClientUnsubscribedTopicEventArgs arg)
{
Console.WriteLine($"取消订阅客户端ID=【{arg.ClientId}】取消订阅主题=【{arg.TopicFilter}】 {DateTime.Now}");
return Task.CompletedTask;
}
/// <summary>
/// 拦截接收消息
/// </summary>
/// <param name="arg"></param>
/// <returns></returns>
private Task MqttServer_InterceptingPublishAsync(InterceptingPublishEventArgs arg)
{
if (string.Equals(arg.ClientId, ServerClientId))
return Task.CompletedTask;
Console.WriteLine($"拦截消息客户端ID=【{arg.ClientId}】 Topic主题=【{arg.ApplicationMessage.Topic}】 消息=【{Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)}】 qos等级=【{arg.ApplicationMessage.QualityOfServiceLevel}】 {DateTime.Now}");
return Task.CompletedTask;
}
/// <summary>
/// 消息未被消费
/// </summary>
/// <param name="arg"></param>
/// <returns></returns>
private Task MqttServer_ApplicationMessageNotConsumedAsync(ApplicationMessageNotConsumedEventArgs arg)
{
Console.WriteLine($"接收消息发送端ID=【{arg.SenderId}】 Topic主题=【{arg.ApplicationMessage.Topic}】 消息=【{Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)}】 qos等级=【{arg.ApplicationMessage.QualityOfServiceLevel}】 {DateTime.Now}");
return Task.CompletedTask;
}
public Task StopAsync(CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
public void Dispose()
{
throw new NotImplementedException();
}
}