// Admin.NET 项目的版权、商标、专利和其他相关权利均受相应法律法规的保护。使用本项目应遵守相关法律法规和许可证的要求。 // // 本项目主要遵循 MIT 许可证和 Apache 许可证(版本 2.0)进行分发和使用。许可证位于源代码树根目录中的 LICENSE-MIT 和 LICENSE-APACHE 文件。 // // 不得利用本项目从事危害国家安全、扰乱社会秩序、侵犯他人合法权益等法律法规禁止的活动!任何基于本项目二次开发而产生的一切法律纠纷和责任,我们不承担任何责任! namespace Admin.NET.Core; using Microsoft.Extensions.Hosting; using MQTTnet.Protocol; using MQTTnet.Server; using System; using System.Text; using System.Threading; using System.Threading.Tasks; /// /// MQTT 服务 /// public class MqttHostedService(IOptions mqttOptions, ISqlSugarClient db) : IHostedService, IDisposable { private const string ServerClientId = "Admin.NET.MQTT"; public MqttServer MqttServer { get; set; } private readonly MqttOptions _mqttOptions = mqttOptions.Value; private readonly ISqlSugarClient _db = db; public async Task StartAsync(CancellationToken cancellationToken) { if (!_mqttOptions.Enabled) return; var options = new MqttServerOptionsBuilder() .WithDefaultEndpoint() // 默认地址127.0.0.1 .WithDefaultEndpointPort(_mqttOptions.Port) // 端口号 //.WithDefaultEndpointBoundIPAddress(_mqttOptions.IPAddress) // IP地址 .WithConnectionBacklog(1000) // 最大连接数 .WithPersistentSessions() .Build(); MqttServer = new MqttServerFactory().CreateMqttServer(options); MqttServer.StartedAsync += MqttServer_StartedAsync; // 启动后事件 MqttServer.StoppedAsync += MqttServer_StoppedAsync; // 关闭后事件 MqttServer.ValidatingConnectionAsync += MqttServer_ValidatingConnectionAsync; // 客户端验证事件 MqttServer.ClientConnectedAsync += MqttServer_ClientConnectedAsync; // 客户端连接事件 MqttServer.ClientDisconnectedAsync += MqttServer_ClientDisconnectedAsync; // 客户端断开事件 MqttServer.ClientSubscribedTopicAsync += MqttServer_ClientSubscribedTopicAsync; // 订阅主题事件 MqttServer.ClientUnsubscribedTopicAsync += MqttServer_ClientUnsubscribedTopicAsync; // 取消订阅事件 MqttServer.InterceptingPublishAsync += MqttServer_InterceptingPublishAsync; // 拦截接收消息 MqttServer.ApplicationMessageNotConsumedAsync += MqttServer_ApplicationMessageNotConsumedAsync; // 消息未被消费 await MqttServer.StartAsync(); } /// /// 启动后事件 /// /// /// private Task MqttServer_StartedAsync(EventArgs arg) { Console.WriteLine($"【MQTT】服务已启动,端口:{_mqttOptions.Port}...... {DateTime.Now}"); return Task.CompletedTask; } /// /// 关闭后事件 /// /// /// private Task MqttServer_StoppedAsync(EventArgs arg) { Console.WriteLine($"【MQTT】服务已关闭...... {DateTime.Now}"); return Task.CompletedTask; } /// /// 客户端验证事件 /// /// /// private Task MqttServer_ValidatingConnectionAsync(ValidatingConnectionEventArgs arg) { arg.ReasonCode = MqttConnectReasonCode.Success; // 验证账号 var user = _db.Queryable().First(u => u.Account == arg.UserName); if (user == null) { arg.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword; Console.WriteLine($"客户端验证:客户端ID=【{arg.ClientId}】用户名或密码验证错误 {DateTime.Now} "); throw Oops.Oh("MQTT客户端验证:账号不能为空或不存在!"); } // 验证密码 var password = arg.Password; if (CryptogramUtil.CryptoType == CryptogramEnum.MD5.ToString()) { if (user.Password.Equals(MD5Encryption.Encrypt(password))) return Task.CompletedTask; } else { if (CryptogramUtil.Decrypt(user.Password).Equals(password)) return Task.CompletedTask; } arg.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword; Console.WriteLine($"客户端验证:客户端ID=【{arg.ClientId}】用户名或密码验证错误 {DateTime.Now} "); throw Oops.Oh("MQTT客户端验证:密码错误!"); } /// /// 客户端连接事件 /// /// /// private Task MqttServer_ClientConnectedAsync(ClientConnectedEventArgs arg) { Console.WriteLine($"客户端连接:客户端ID=【{arg.ClientId}】已连接:用户名=【{arg.UserName}】地址=【{arg.RemoteEndPoint}】 {DateTime.Now}"); return Task.CompletedTask; } /// /// 客户端断开事件 /// /// /// /// private Task MqttServer_ClientDisconnectedAsync(ClientDisconnectedEventArgs arg) { Console.WriteLine($"客户端断开:客户端ID=【{arg.ClientId}】已断开:用户名=【{arg.UserName}】地址=【{arg.RemoteEndPoint}】 {DateTime.Now}"); return Task.CompletedTask; } /// /// 订阅主题事件 /// /// /// private Task MqttServer_ClientSubscribedTopicAsync(ClientSubscribedTopicEventArgs arg) { Console.WriteLine($"订阅主题:客户端ID=【{arg.ClientId}】订阅主题=【{arg.TopicFilter}】 {DateTime.Now}"); return Task.CompletedTask; } /// /// 取消订阅事件 /// /// /// private Task MqttServer_ClientUnsubscribedTopicAsync(ClientUnsubscribedTopicEventArgs arg) { Console.WriteLine($"取消订阅:客户端ID=【{arg.ClientId}】取消订阅主题=【{arg.TopicFilter}】 {DateTime.Now}"); return Task.CompletedTask; } /// /// 拦截接收消息 /// /// /// private Task MqttServer_InterceptingPublishAsync(InterceptingPublishEventArgs arg) { if (string.Equals(arg.ClientId, ServerClientId)) return Task.CompletedTask; Console.WriteLine($"拦截消息:客户端ID=【{arg.ClientId}】 Topic主题=【{arg.ApplicationMessage.Topic}】 消息=【{Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)}】 qos等级=【{arg.ApplicationMessage.QualityOfServiceLevel}】 {DateTime.Now}"); return Task.CompletedTask; } /// /// 消息未被消费 /// /// /// private Task MqttServer_ApplicationMessageNotConsumedAsync(ApplicationMessageNotConsumedEventArgs arg) { Console.WriteLine($"接收消息:发送端ID=【{arg.SenderId}】 Topic主题=【{arg.ApplicationMessage.Topic}】 消息=【{Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)}】 qos等级=【{arg.ApplicationMessage.QualityOfServiceLevel}】 {DateTime.Now}"); return Task.CompletedTask; } public Task StopAsync(CancellationToken cancellationToken) { return Task.CompletedTask; } public void Dispose() { throw new NotImplementedException(); } }