diff --git a/Admin.NET/Admin.NET.Application/Configuration/Mqtt.json b/Admin.NET/Admin.NET.Application/Configuration/Mqtt.json index 427640ef..c9afa21a 100644 --- a/Admin.NET/Admin.NET.Application/Configuration/Mqtt.json +++ b/Admin.NET/Admin.NET.Application/Configuration/Mqtt.json @@ -1,10 +1,14 @@ -{ +{ "$schema": "https://gitee.com/dotnetchina/Furion/raw/v4/schemas/v4/furion-schema.json", // MQTT 配置 "Mqtt": { - "Enabled": false, // 是否开启 - "Port": "5001", + "MqttServerId": "MqttServer", // 服务器主动发布时用的ClientId + "Enabled": true, // 是否开启 + "Logging": true, // 记录文件日志 + "ConsoleOutput": true, // 输出控制台日志 + "ConnectionBacklog": 1000, + "Port": "1883", "IPAddress": "" } } \ No newline at end of file diff --git a/Admin.NET/Admin.NET.Core/Job/MqttHostedService.cs b/Admin.NET/Admin.NET.Core/Job/MqttHostedService.cs index cbea4875..b132df76 100644 --- a/Admin.NET/Admin.NET.Core/Job/MqttHostedService.cs +++ b/Admin.NET/Admin.NET.Core/Job/MqttHostedService.cs @@ -4,9 +4,11 @@ // // 不得利用本项目从事危害国家安全、扰乱社会秩序、侵犯他人合法权益等法律法规禁止的活动!任何基于本项目二次开发而产生的一切法律纠纷和责任,我们不承担任何责任! -namespace Admin.NET.Core; - +namespace Admin.NET.Core; + +using Furion.Logging.Extensions; using Microsoft.Extensions.Hosting; +using MQTTnet; using MQTTnet.Protocol; using MQTTnet.Server; using System; @@ -22,17 +24,46 @@ public class MqttHostedService(IOptions mqttOptions, ISqlSugarClien private const string ServerClientId = "Admin.NET.MQTT"; public static MqttServer MqttServer { get; set; } private readonly MqttOptions _mqttOptions = mqttOptions.Value; - private readonly ISqlSugarClient _db = db; - + private readonly ISqlSugarClient _db = db; + private bool _isLogging = false; + private bool _consoleOutput = false; + + static List mqttEventHandlers = new List(); + + public static void RegistMqttEventHandler(MqttEventHandler eh, int order) + { + eh.Order = order; + mqttEventHandlers.Add(eh); + mqttEventHandlers.Sort((a,b) => b.Order - a.Order); + } + + public async void PublicMessage(string topic, string message) + { + // 创建一个 MQTT 应用消息 + var applicationMessage = new MqttApplicationMessageBuilder() + .WithTopic(topic) + .WithPayload(message) + .Build(); + + // 记录日志 + Log($"服务器发布主题: {topic}, 内容:{message}"); + await MqttServer.InjectApplicationMessage(new InjectedMqttApplicationMessage(applicationMessage) + { + SenderClientId = _mqttOptions.MqttServerId + }); + } + public async Task StartAsync(CancellationToken cancellationToken) { if (!_mqttOptions.Enabled) return; + _isLogging = _mqttOptions.Logging; + _consoleOutput = _mqttOptions.ConsoleOutput; var options = new MqttServerOptionsBuilder() .WithDefaultEndpoint() // 默认地址127.0.0.1 .WithDefaultEndpointPort(_mqttOptions.Port) // 端口号 //.WithDefaultEndpointBoundIPAddress(_mqttOptions.IPAddress) // IP地址 - .WithConnectionBacklog(1000) // 最大连接数 + .WithConnectionBacklog(_mqttOptions.ConnectionBacklog) // 最大连接数 .WithPersistentSessions() .Build(); @@ -68,10 +99,13 @@ public class MqttHostedService(IOptions mqttOptions, ISqlSugarClien /// /// /// - private Task MqttServer_StoppedAsync(EventArgs arg) + private async Task MqttServer_StoppedAsync(EventArgs arg) { - Console.WriteLine($"【MQTT】服务已关闭...... {DateTime.Now}"); - return Task.CompletedTask; + Console.WriteLine($"【MQTT】服务已关闭...... {DateTime.Now}"); + foreach (var eh in mqttEventHandlers) + { + await eh.StoppedAsync(arg); + } } /// @@ -79,32 +113,14 @@ public class MqttHostedService(IOptions mqttOptions, ISqlSugarClien /// /// /// - private Task MqttServer_ValidatingConnectionAsync(ValidatingConnectionEventArgs arg) + private async Task MqttServer_ValidatingConnectionAsync(ValidatingConnectionEventArgs arg) { - arg.ReasonCode = MqttConnectReasonCode.Success; - - // 验证账号 - var user = _db.Queryable().First(u => u.Account == arg.UserName); - if (user == null) - { - arg.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword; - Console.WriteLine($"客户端验证:客户端ID=【{arg.ClientId}】用户名或密码验证错误 {DateTime.Now} "); - throw Oops.Oh("MQTT客户端验证:账号不能为空或不存在!"); + foreach (var eh in mqttEventHandlers) + { + await eh.ValidatingConnectionAsync(arg); + if (arg.ReasonCode != MqttConnectReasonCode.Success) + break; } - - // 验证密码 - var password = arg.Password; - if (CryptogramUtil.CryptoType == CryptogramEnum.MD5.ToString()) - { - if (user.Password.Equals(MD5Encryption.Encrypt(password))) return Task.CompletedTask; - } - else - { - if (CryptogramUtil.Decrypt(user.Password).Equals(password)) return Task.CompletedTask; - } - arg.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword; - Console.WriteLine($"客户端验证:客户端ID=【{arg.ClientId}】用户名或密码验证错误 {DateTime.Now} "); - throw Oops.Oh("MQTT客户端验证:密码错误!"); } /// @@ -112,10 +128,13 @@ public class MqttHostedService(IOptions mqttOptions, ISqlSugarClien /// /// /// - private Task MqttServer_ClientConnectedAsync(ClientConnectedEventArgs arg) + private async Task MqttServer_ClientConnectedAsync(ClientConnectedEventArgs arg) { - Console.WriteLine($"客户端连接:客户端ID=【{arg.ClientId}】已连接:用户名=【{arg.UserName}】地址=【{arg.RemoteEndPoint}】 {DateTime.Now}"); - return Task.CompletedTask; + Log($"客户端连接:客户端ID=【{arg.ClientId}】已连接:用户名=【{arg.UserName}】地址=【{arg.RemoteEndPoint}】 {DateTime.Now}"); + foreach (var eh in mqttEventHandlers) + { + await eh.ClientConnectedAsync(arg); + } } /// @@ -124,10 +143,13 @@ public class MqttHostedService(IOptions mqttOptions, ISqlSugarClien /// /// /// - private Task MqttServer_ClientDisconnectedAsync(ClientDisconnectedEventArgs arg) + private async Task MqttServer_ClientDisconnectedAsync(ClientDisconnectedEventArgs arg) { - Console.WriteLine($"客户端断开:客户端ID=【{arg.ClientId}】已断开:用户名=【{arg.UserName}】地址=【{arg.RemoteEndPoint}】 {DateTime.Now}"); - return Task.CompletedTask; + Log($"客户端断开:客户端ID=【{arg.ClientId}】已断开:用户名=【{arg.UserName}】地址=【{arg.RemoteEndPoint}】 {DateTime.Now}"); + foreach (var eh in mqttEventHandlers) + { + await eh.ClientDisconnectedAsync(arg); + } } /// @@ -135,10 +157,13 @@ public class MqttHostedService(IOptions mqttOptions, ISqlSugarClien /// /// /// - private Task MqttServer_ClientSubscribedTopicAsync(ClientSubscribedTopicEventArgs arg) + private async Task MqttServer_ClientSubscribedTopicAsync(ClientSubscribedTopicEventArgs arg) { - Console.WriteLine($"订阅主题:客户端ID=【{arg.ClientId}】订阅主题=【{arg.TopicFilter}】 {DateTime.Now}"); - return Task.CompletedTask; + Log($"订阅主题:客户端ID=【{arg.ClientId}】订阅主题=【{arg.TopicFilter}】 {DateTime.Now}"); + foreach (var eh in mqttEventHandlers) + { + await eh.ClientSubscribedTopicAsync(arg); + } } /// @@ -146,10 +171,13 @@ public class MqttHostedService(IOptions mqttOptions, ISqlSugarClien /// /// /// - private Task MqttServer_ClientUnsubscribedTopicAsync(ClientUnsubscribedTopicEventArgs arg) + private async Task MqttServer_ClientUnsubscribedTopicAsync(ClientUnsubscribedTopicEventArgs arg) { - Console.WriteLine($"取消订阅:客户端ID=【{arg.ClientId}】取消订阅主题=【{arg.TopicFilter}】 {DateTime.Now}"); - return Task.CompletedTask; + Log($"取消订阅:客户端ID=【{arg.ClientId}】取消订阅主题=【{arg.TopicFilter}】 {DateTime.Now}"); + foreach (var eh in mqttEventHandlers) + { + await eh.ClientUnsubscribedTopicAsync(arg); + } } /// @@ -157,13 +185,16 @@ public class MqttHostedService(IOptions mqttOptions, ISqlSugarClien /// /// /// - private Task MqttServer_InterceptingPublishAsync(InterceptingPublishEventArgs arg) + private async Task MqttServer_InterceptingPublishAsync(InterceptingPublishEventArgs arg) { - if (string.Equals(arg.ClientId, ServerClientId)) - return Task.CompletedTask; + if (string.Equals(arg.ClientId, _mqttOptions.MqttServerId)) + return; - Console.WriteLine($"拦截消息:客户端ID=【{arg.ClientId}】 Topic主题=【{arg.ApplicationMessage.Topic}】 消息=【{Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)}】 qos等级=【{arg.ApplicationMessage.QualityOfServiceLevel}】 {DateTime.Now}"); - return Task.CompletedTask; + Log($"拦截消息:客户端ID=【{arg.ClientId}】 Topic主题=【{arg.ApplicationMessage.Topic}】 消息=【{Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)}】 qos等级=【{arg.ApplicationMessage.QualityOfServiceLevel}】 {DateTime.Now}"); + foreach (var eh in mqttEventHandlers) + { + await eh.InterceptingPublishAsync(arg); + } } /// @@ -171,14 +202,25 @@ public class MqttHostedService(IOptions mqttOptions, ISqlSugarClien /// /// /// - private Task MqttServer_ApplicationMessageNotConsumedAsync(ApplicationMessageNotConsumedEventArgs arg) + private async Task MqttServer_ApplicationMessageNotConsumedAsync(ApplicationMessageNotConsumedEventArgs arg) { Console.WriteLine($"接收消息:发送端ID=【{arg.SenderId}】 Topic主题=【{arg.ApplicationMessage.Topic}】 消息=【{Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)}】 qos等级=【{arg.ApplicationMessage.QualityOfServiceLevel}】 {DateTime.Now}"); - return Task.CompletedTask; + foreach (var eh in mqttEventHandlers) + { + await eh.ApplicationMessageNotConsumedAsync(arg); + } } public Task StopAsync(CancellationToken cancellationToken) { return Task.CompletedTask; + } + + protected void Log(string msg) + { + if (_consoleOutput) + Console.WriteLine(msg); + if (_isLogging) + msg.LogDebug(); } } \ No newline at end of file diff --git a/Admin.NET/Admin.NET.Core/Mqtt/MqttEventHandler.cs b/Admin.NET/Admin.NET.Core/Mqtt/MqttEventHandler.cs new file mode 100644 index 00000000..ea3146f3 --- /dev/null +++ b/Admin.NET/Admin.NET.Core/Mqtt/MqttEventHandler.cs @@ -0,0 +1,90 @@ +// Admin.NET 项目的版权、商标、专利和其他相关权利均受相应法律法规的保护。使用本项目应遵守相关法律法规和许可证的要求。 +// +// 本项目主要遵循 MIT 许可证和 Apache 许可证(版本 2.0)进行分发和使用。许可证位于源代码树根目录中的 LICENSE-MIT 和 LICENSE-APACHE 文件。 +// +// 不得利用本项目从事危害国家安全、扰乱社会秩序、侵犯他人合法权益等法律法规禁止的活动!任何基于本项目二次开发而产生的一切法律纠纷和责任,我们不承担任何责任! + +using MQTTnet.Server; +using MQTTnet; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Runtime.InteropServices; +using System.Text; +using System.Threading.Tasks; +using MQTTnet.Protocol; +using Furion.Logging.Extensions; +using Furion.HttpRemote; + +namespace Admin.NET.Core; + +/// +/// Mqtt事件拦截器 +/// +/// +/// 为了让底层的 Mqtt 代码与业务层代码分离,应该不同的业务继函 MqttEventHandler 类,实现自己的代码后 +/// 在 Startup.cs 的 Configure 函数中,调用 MqttHostedService 对象的 RegistMqttEventHandler 函数,来注册自己的事件处理器 +/// “业务相关的代码千万不要写在 MqttHostedService 中” +/// + +public class MqttEventHandler +{ + /// + /// 排序,数据越大越先执行 + /// + public int Order = 0; + + protected void Log(string msg) + { + var mqttOptions = App.GetOptions(); + if (mqttOptions.ConsoleOutput) + Console.WriteLine(msg); + if (mqttOptions.Logging) + msg.LogDebug(); + } + + public virtual async Task ValidatingConnectionAsync(ValidatingConnectionEventArgs arg) + { + } + + public virtual async Task StartedAsync(EventArgs arg) + { + } + + public virtual async Task StoppedAsync(EventArgs arg) + { + } + + /// + /// 客户端发布的数据 + /// + /// + /// + public virtual async Task InterceptingPublishAsync(InterceptingPublishEventArgs arg) + { + } + + /// + /// 没有客户端接收的数据 + /// + /// + /// + public virtual async Task ApplicationMessageNotConsumedAsync(ApplicationMessageNotConsumedEventArgs arg) + { + } + + public virtual async Task ClientSubscribedTopicAsync(ClientSubscribedTopicEventArgs arg) + { + } + public virtual async Task ClientUnsubscribedTopicAsync(ClientUnsubscribedTopicEventArgs arg) + { + } + + public virtual async Task ClientDisconnectedAsync(ClientDisconnectedEventArgs arg) + { + } + + public virtual async Task ClientConnectedAsync(ClientConnectedEventArgs arg) + { + } +} diff --git a/Admin.NET/Admin.NET.Core/Mqtt/MqttEventHandlerForAdminNet.cs b/Admin.NET/Admin.NET.Core/Mqtt/MqttEventHandlerForAdminNet.cs new file mode 100644 index 00000000..b11e3062 --- /dev/null +++ b/Admin.NET/Admin.NET.Core/Mqtt/MqttEventHandlerForAdminNet.cs @@ -0,0 +1,43 @@ +// Admin.NET 项目的版权、商标、专利和其他相关权利均受相应法律法规的保护。使用本项目应遵守相关法律法规和许可证的要求。 +// +// 本项目主要遵循 MIT 许可证和 Apache 许可证(版本 2.0)进行分发和使用。许可证位于源代码树根目录中的 LICENSE-MIT 和 LICENSE-APACHE 文件。 +// +// 不得利用本项目从事危害国家安全、扰乱社会秩序、侵犯他人合法权益等法律法规禁止的活动!任何基于本项目二次开发而产生的一切法律纠纷和责任,我们不承担任何责任! + +using MQTTnet.Protocol; +using MQTTnet.Server; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Admin.NET.Core; +public class MqttEventHandlerForAdminNet : MqttEventHandler +{ + public override async Task ValidatingConnectionAsync(ValidatingConnectionEventArgs arg) + { + ISqlSugarClient _db = App.GetRequiredService(); + // 验证账号 + var user = _db.Queryable().First(u => u.Account == arg.UserName); + if (user == null) + { + arg.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword; + Log($"客户端验证:客户端ID=【{arg.ClientId}】用户名不存在 {DateTime.Now} "); + return; + } + + // 验证密码 + var password = arg.Password; + if (CryptogramUtil.CryptoType == CryptogramEnum.MD5.ToString()) + { + if (user.Password.Equals(MD5Encryption.Encrypt(password))) return; + } + else + { + if (CryptogramUtil.Decrypt(user.Password).Equals(password)) return; + } + arg.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword; + Log($"客户端验证:客户端ID=【{arg.ClientId}】用户名或密码验证错误 {DateTime.Now} "); + } +} diff --git a/Admin.NET/Admin.NET.Core/Option/MqttOptions.cs b/Admin.NET/Admin.NET.Core/Option/MqttOptions.cs index 69a4fa2f..60078e6a 100644 --- a/Admin.NET/Admin.NET.Core/Option/MqttOptions.cs +++ b/Admin.NET/Admin.NET.Core/Option/MqttOptions.cs @@ -1,28 +1,48 @@ -// Admin.NET 项目的版权、商标、专利和其他相关权利均受相应法律法规的保护。使用本项目应遵守相关法律法规和许可证的要求。 -// -// 本项目主要遵循 MIT 许可证和 Apache 许可证(版本 2.0)进行分发和使用。许可证位于源代码树根目录中的 LICENSE-MIT 和 LICENSE-APACHE 文件。 -// -// 不得利用本项目从事危害国家安全、扰乱社会秩序、侵犯他人合法权益等法律法规禁止的活动!任何基于本项目二次开发而产生的一切法律纠纷和责任,我们不承担任何责任! - -namespace Admin.NET.Core; - -/// -/// MQTT 配置选项 -/// -public sealed class MqttOptions : IConfigurableOptions -{ - /// - /// 是否启用 - /// - public bool Enabled { get; set; } - - /// - /// 端口 - /// - public int Port { get; set; } - - /// - /// IP地址 - /// - public string IPAddress { get; set; } +// Admin.NET 项目的版权、商标、专利和其他相关权利均受相应法律法规的保护。使用本项目应遵守相关法律法规和许可证的要求。 +// +// 本项目主要遵循 MIT 许可证和 Apache 许可证(版本 2.0)进行分发和使用。许可证位于源代码树根目录中的 LICENSE-MIT 和 LICENSE-APACHE 文件。 +// +// 不得利用本项目从事危害国家安全、扰乱社会秩序、侵犯他人合法权益等法律法规禁止的活动!任何基于本项目二次开发而产生的一切法律纠纷和责任,我们不承担任何责任! + +namespace Admin.NET.Core; + +/// +/// MQTT 配置选项 +/// +public sealed class MqttOptions : IConfigurableOptions +{ + /// + /// 服务器主动发布时用的ClientId + /// + public string MqttServerId { get; set; } + + /// + /// 是否启用 + /// + public bool Enabled { get; set; } + + /// + /// 输出文件日志 + /// + public bool Logging { get; set; } + + /// + /// 控制台输出 + /// + public bool ConsoleOutput { get; set; } + + /// + /// ConnectionBacklog + /// + public int ConnectionBacklog { get; set; } + + /// + /// 端口 + /// + public int Port { get; set; } + + /// + /// IP地址 + /// + public string IPAddress { get; set; } } \ No newline at end of file diff --git a/Admin.NET/Admin.NET.Core/Service/Mqtt/MqttService.cs b/Admin.NET/Admin.NET.Core/Service/Mqtt/MqttService.cs new file mode 100644 index 00000000..d9582097 --- /dev/null +++ b/Admin.NET/Admin.NET.Core/Service/Mqtt/MqttService.cs @@ -0,0 +1,34 @@ +// Admin.NET 项目的版权、商标、专利和其他相关权利均受相应法律法规的保护。使用本项目应遵守相关法律法规和许可证的要求。 +// +// 本项目主要遵循 MIT 许可证和 Apache 许可证(版本 2.0)进行分发和使用。许可证位于源代码树根目录中的 LICENSE-MIT 和 LICENSE-APACHE 文件。 +// +// 不得利用本项目从事危害国家安全、扰乱社会秩序、侵犯他人合法权益等法律法规禁止的活动!任何基于本项目二次开发而产生的一切法律纠纷和责任,我们不承担任何责任! + +using Admin.NET.Core.Service.Mqtt.Dto; + +namespace Admin.NET.Core.Service; + +/// +/// 系统职位服务 🧩 +/// +[ApiDescriptionSettings(Order = 460, Description = "MQTT 服务")] +public class MqttService : IDynamicApiController, ITransient +{ + MqttHostedService _mqttHostedService; + public MqttService(MqttHostedService mqttProvider) + { + _mqttHostedService = mqttProvider; + } + + /// + /// 发布消息 🔖 + /// + /// + /// + [AllowAnonymous] + [DisplayName("发布消息")] + public async Task PublicMessage(PublicMessageInput input) + { + _mqttHostedService.PublicMessage(input.Topic, input.Message); + } +} \ No newline at end of file diff --git a/Admin.NET/Admin.NET.Web.Core/Startup.cs b/Admin.NET/Admin.NET.Web.Core/Startup.cs index 487ffc0f..f97a092c 100644 --- a/Admin.NET/Admin.NET.Web.Core/Startup.cs +++ b/Admin.NET/Admin.NET.Web.Core/Startup.cs @@ -22,6 +22,8 @@ using Microsoft.AspNetCore.ResponseCompression; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.FileProviders; using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Options; +using MQTTnet.AspNetCore; using Newtonsoft.Json; using OnceMi.AspNetCore.OSS; using RabbitMQ.Client; @@ -410,6 +412,7 @@ public class Startup : AppStartup } }); + IOptions mqttOptions = App.GetRequiredService>(); app.UseEndpoints(endpoints => { // 注册集线器 @@ -418,6 +421,19 @@ public class Startup : AppStartup endpoints.MapControllerRoute( name: "default", pattern: "{controller=Home}/{action=Index}/{id?}"); + if (mqttOptions.Value.Enabled) + { + endpoints.MapConnectionHandler( + "/mqtt", + httpConnectionDispatcherOptions => httpConnectionDispatcherOptions.WebSockets.SubProtocolSelector = + protocolList => protocolList.FirstOrDefault() ?? string.Empty); + } }); + + // 注册自己业务的 Mqtt 处理器,如果有不同的认证方式,要把以下这句注释掉 + if (mqttOptions.Value.Enabled) + { + MqttHostedService.RegistMqttEventHandler(new MqttEventHandlerForAdminNet()); + } } } \ No newline at end of file