From 29714577290b299f975b02e5e75d457667edc193 Mon Sep 17 00:00:00 2001 From: zuohuaijun Date: Thu, 20 Feb 2025 02:30:35 +0800 Subject: [PATCH] =?UTF-8?q?=F0=9F=98=8E=E5=A2=9E=E5=8A=A0=20MQTT=20?= =?UTF-8?q?=E6=9C=8D=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Configuration/Mqtt.json | 10 + .../Admin.NET.Core/Admin.NET.Core.csproj | 4 +- .../Admin.NET.Core/Mqtt/MqttHostedService.cs | 189 ++++++++++++++++++ .../Admin.NET.Core/Option/MqttOptions.cs | 28 +++ .../Admin.NET.Web.Core/ProjectOptions.cs | 1 + Admin.NET/Admin.NET.Web.Core/Startup.cs | 1 + .../Admin.NET.Plugin.ReZero.csproj | 2 +- 7 files changed, 232 insertions(+), 3 deletions(-) create mode 100644 Admin.NET/Admin.NET.Application/Configuration/Mqtt.json create mode 100644 Admin.NET/Admin.NET.Core/Mqtt/MqttHostedService.cs create mode 100644 Admin.NET/Admin.NET.Core/Option/MqttOptions.cs diff --git a/Admin.NET/Admin.NET.Application/Configuration/Mqtt.json b/Admin.NET/Admin.NET.Application/Configuration/Mqtt.json new file mode 100644 index 00000000..427640ef --- /dev/null +++ b/Admin.NET/Admin.NET.Application/Configuration/Mqtt.json @@ -0,0 +1,10 @@ +{ + "$schema": "https://gitee.com/dotnetchina/Furion/raw/v4/schemas/v4/furion-schema.json", + + // MQTT 配置 + "Mqtt": { + "Enabled": false, // 是否开启 + "Port": "5001", + "IPAddress": "" + } +} \ No newline at end of file diff --git a/Admin.NET/Admin.NET.Core/Admin.NET.Core.csproj b/Admin.NET/Admin.NET.Core/Admin.NET.Core.csproj index b8fadf22..e349147c 100644 --- a/Admin.NET/Admin.NET.Core/Admin.NET.Core.csproj +++ b/Admin.NET/Admin.NET.Core/Admin.NET.Core.csproj @@ -14,7 +14,7 @@ - + @@ -34,7 +34,7 @@ - + diff --git a/Admin.NET/Admin.NET.Core/Mqtt/MqttHostedService.cs b/Admin.NET/Admin.NET.Core/Mqtt/MqttHostedService.cs new file mode 100644 index 00000000..2fc64a1d --- /dev/null +++ b/Admin.NET/Admin.NET.Core/Mqtt/MqttHostedService.cs @@ -0,0 +1,189 @@ +// Admin.NET 项目的版权、商标、专利和其他相关权利均受相应法律法规的保护。使用本项目应遵守相关法律法规和许可证的要求。 +// +// 本项目主要遵循 MIT 许可证和 Apache 许可证(版本 2.0)进行分发和使用。许可证位于源代码树根目录中的 LICENSE-MIT 和 LICENSE-APACHE 文件。 +// +// 不得利用本项目从事危害国家安全、扰乱社会秩序、侵犯他人合法权益等法律法规禁止的活动!任何基于本项目二次开发而产生的一切法律纠纷和责任,我们不承担任何责任! + +namespace Admin.NET.Core; + +using Microsoft.Extensions.Hosting; +using MQTTnet.Protocol; +using MQTTnet.Server; +using System; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +/// +/// MQTT 服务 +/// +public class MqttHostedService(IOptions mqttOptions, ISqlSugarClient db) : IHostedService, IDisposable +{ + private const string ServerClientId = "Admin.NET.MQTT"; + public MqttServer MqttServer { get; set; } + private readonly MqttOptions _mqttOptions = mqttOptions.Value; + private readonly ISqlSugarClient _db = db; + + public async Task StartAsync(CancellationToken cancellationToken) + { + if (!_mqttOptions.Enabled) return; + + var options = new MqttServerOptionsBuilder() + .WithDefaultEndpoint() // 默认地址127.0.0.1 + .WithDefaultEndpointPort(_mqttOptions.Port) // 端口号 + //.WithDefaultEndpointBoundIPAddress(_mqttOptions.IPAddress) // IP地址 + .WithConnectionBacklog(1000) // 最大连接数 + .WithPersistentSessions() + .Build(); + + MqttServer = new MqttServerFactory().CreateMqttServer(options); + MqttServer.StartedAsync += MqttServer_StartedAsync; // 启动后事件 + MqttServer.StoppedAsync += MqttServer_StoppedAsync; // 关闭后事件 + + MqttServer.ValidatingConnectionAsync += MqttServer_ValidatingConnectionAsync; // 客户端验证事件 + MqttServer.ClientConnectedAsync += MqttServer_ClientConnectedAsync; // 客户端连接事件 + MqttServer.ClientDisconnectedAsync += MqttServer_ClientDisconnectedAsync; // 客户端断开事件 + + MqttServer.ClientSubscribedTopicAsync += MqttServer_ClientSubscribedTopicAsync; // 订阅主题事件 + MqttServer.ClientUnsubscribedTopicAsync += MqttServer_ClientUnsubscribedTopicAsync; // 取消订阅事件 + MqttServer.InterceptingPublishAsync += MqttServer_InterceptingPublishAsync; // 拦截接收消息 + MqttServer.ApplicationMessageNotConsumedAsync += MqttServer_ApplicationMessageNotConsumedAsync; // 消息未被消费 + + await MqttServer.StartAsync(); + } + + /// + /// 启动后事件 + /// + /// + /// + private Task MqttServer_StartedAsync(EventArgs arg) + { + Console.WriteLine($"【MQTT】服务已启动,端口:{_mqttOptions.Port}...... {DateTime.Now}"); + return Task.CompletedTask; + } + + /// + /// 关闭后事件 + /// + /// + /// + private Task MqttServer_StoppedAsync(EventArgs arg) + { + Console.WriteLine($"【MQTT】服务已关闭...... {DateTime.Now}"); + return Task.CompletedTask; + } + + /// + /// 客户端验证事件 + /// + /// + /// + private Task MqttServer_ValidatingConnectionAsync(ValidatingConnectionEventArgs arg) + { + arg.ReasonCode = MqttConnectReasonCode.Success; + + // 验证账号 + var user = _db.Queryable().First(u => u.Account == arg.UserName); + if (user == null) + { + arg.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword; + Console.WriteLine($"客户端验证:客户端ID=【{arg.ClientId}】用户名或密码验证错误 {DateTime.Now} "); + throw Oops.Oh("MQTT客户端验证:账号不能为空或不存在!"); + } + + // 验证密码 + var password = arg.Password; + if (CryptogramUtil.CryptoType == CryptogramEnum.MD5.ToString()) + { + if (user.Password.Equals(MD5Encryption.Encrypt(password))) return Task.CompletedTask; + } + else + { + if (CryptogramUtil.Decrypt(user.Password).Equals(password)) return Task.CompletedTask; + } + arg.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword; + Console.WriteLine($"客户端验证:客户端ID=【{arg.ClientId}】用户名或密码验证错误 {DateTime.Now} "); + throw Oops.Oh("MQTT客户端验证:密码错误!"); + } + + /// + /// 客户端连接事件 + /// + /// + /// + private Task MqttServer_ClientConnectedAsync(ClientConnectedEventArgs arg) + { + Console.WriteLine($"客户端连接:客户端ID=【{arg.ClientId}】已连接:用户名=【{arg.UserName}】地址=【{arg.RemoteEndPoint}】 {DateTime.Now}"); + return Task.CompletedTask; + } + + /// + /// 客户端断开事件 + /// + /// + /// + /// + private Task MqttServer_ClientDisconnectedAsync(ClientDisconnectedEventArgs arg) + { + Console.WriteLine($"客户端断开:客户端ID=【{arg.ClientId}】已断开:用户名=【{arg.UserName}】地址=【{arg.RemoteEndPoint}】 {DateTime.Now}"); + return Task.CompletedTask; + } + + /// + /// 订阅主题事件 + /// + /// + /// + private Task MqttServer_ClientSubscribedTopicAsync(ClientSubscribedTopicEventArgs arg) + { + Console.WriteLine($"订阅主题:客户端ID=【{arg.ClientId}】订阅主题=【{arg.TopicFilter}】 {DateTime.Now}"); + return Task.CompletedTask; + } + + /// + /// 取消订阅事件 + /// + /// + /// + private Task MqttServer_ClientUnsubscribedTopicAsync(ClientUnsubscribedTopicEventArgs arg) + { + Console.WriteLine($"取消订阅:客户端ID=【{arg.ClientId}】取消订阅主题=【{arg.TopicFilter}】 {DateTime.Now}"); + return Task.CompletedTask; + } + + /// + /// 拦截接收消息 + /// + /// + /// + private Task MqttServer_InterceptingPublishAsync(InterceptingPublishEventArgs arg) + { + if (string.Equals(arg.ClientId, ServerClientId)) + return Task.CompletedTask; + + Console.WriteLine($"拦截消息:客户端ID=【{arg.ClientId}】 Topic主题=【{arg.ApplicationMessage.Topic}】 消息=【{Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)}】 qos等级=【{arg.ApplicationMessage.QualityOfServiceLevel}】 {DateTime.Now}"); + return Task.CompletedTask; + } + + /// + /// 消息未被消费 + /// + /// + /// + private Task MqttServer_ApplicationMessageNotConsumedAsync(ApplicationMessageNotConsumedEventArgs arg) + { + Console.WriteLine($"接收消息:发送端ID=【{arg.SenderId}】 Topic主题=【{arg.ApplicationMessage.Topic}】 消息=【{Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)}】 qos等级=【{arg.ApplicationMessage.QualityOfServiceLevel}】 {DateTime.Now}"); + return Task.CompletedTask; + } + + public Task StopAsync(CancellationToken cancellationToken) + { + return Task.CompletedTask; + } + + public void Dispose() + { + throw new NotImplementedException(); + } +} \ No newline at end of file diff --git a/Admin.NET/Admin.NET.Core/Option/MqttOptions.cs b/Admin.NET/Admin.NET.Core/Option/MqttOptions.cs new file mode 100644 index 00000000..69a4fa2f --- /dev/null +++ b/Admin.NET/Admin.NET.Core/Option/MqttOptions.cs @@ -0,0 +1,28 @@ +// Admin.NET 项目的版权、商标、专利和其他相关权利均受相应法律法规的保护。使用本项目应遵守相关法律法规和许可证的要求。 +// +// 本项目主要遵循 MIT 许可证和 Apache 许可证(版本 2.0)进行分发和使用。许可证位于源代码树根目录中的 LICENSE-MIT 和 LICENSE-APACHE 文件。 +// +// 不得利用本项目从事危害国家安全、扰乱社会秩序、侵犯他人合法权益等法律法规禁止的活动!任何基于本项目二次开发而产生的一切法律纠纷和责任,我们不承担任何责任! + +namespace Admin.NET.Core; + +/// +/// MQTT 配置选项 +/// +public sealed class MqttOptions : IConfigurableOptions +{ + /// + /// 是否启用 + /// + public bool Enabled { get; set; } + + /// + /// 端口 + /// + public int Port { get; set; } + + /// + /// IP地址 + /// + public string IPAddress { get; set; } +} \ No newline at end of file diff --git a/Admin.NET/Admin.NET.Web.Core/ProjectOptions.cs b/Admin.NET/Admin.NET.Web.Core/ProjectOptions.cs index 0b47ed08..1a2e5c2b 100644 --- a/Admin.NET/Admin.NET.Web.Core/ProjectOptions.cs +++ b/Admin.NET/Admin.NET.Web.Core/ProjectOptions.cs @@ -38,6 +38,7 @@ public static class ProjectOptions services.AddConfigurableOptions(); services.AddConfigurableOptions(); services.AddConfigurableOptions(); + services.AddConfigurableOptions(); services.Configure(App.Configuration.GetSection("IpRateLimiting")); services.Configure(App.Configuration.GetSection("IpRateLimitPolicies")); services.Configure(App.Configuration.GetSection("ClientRateLimiting")); diff --git a/Admin.NET/Admin.NET.Web.Core/Startup.cs b/Admin.NET/Admin.NET.Web.Core/Startup.cs index 7a844fcb..487ffc0f 100644 --- a/Admin.NET/Admin.NET.Web.Core/Startup.cs +++ b/Admin.NET/Admin.NET.Web.Core/Startup.cs @@ -250,6 +250,7 @@ public class Startup : AppStartup // 注册启动执行任务 services.AddHostedService(); + services.AddHostedService(); } public void Configure(IApplicationBuilder app, IWebHostEnvironment env) diff --git a/Admin.NET/Plugins/Admin.NET.Plugin.ReZero/Admin.NET.Plugin.ReZero.csproj b/Admin.NET/Plugins/Admin.NET.Plugin.ReZero/Admin.NET.Plugin.ReZero.csproj index 6cf10507..8dd211d3 100644 --- a/Admin.NET/Plugins/Admin.NET.Plugin.ReZero/Admin.NET.Plugin.ReZero.csproj +++ b/Admin.NET/Plugins/Admin.NET.Plugin.ReZero/Admin.NET.Plugin.ReZero.csproj @@ -26,7 +26,7 @@ - +