diff --git a/Admin.NET/Admin.NET.Application/Configuration/Mqtt.json b/Admin.NET/Admin.NET.Application/Configuration/Mqtt.json index 427640ef..18d14166 100644 --- a/Admin.NET/Admin.NET.Application/Configuration/Mqtt.json +++ b/Admin.NET/Admin.NET.Application/Configuration/Mqtt.json @@ -1,10 +1,13 @@ -{ +{ "$schema": "https://gitee.com/dotnetchina/Furion/raw/v4/schemas/v4/furion-schema.json", // MQTT 配置 "Mqtt": { "Enabled": false, // 是否开启 - "Port": "5001", - "IPAddress": "" + "Port": "1883", // 端口 + "IPAddress": "", // IP地址 + "ConnectionBacklog": 1000, // 最大连接数 + "MqttServerId": "Admin.NET.MQTT", // 服务器主动发消息时的ClientId + "Logging": false // 记录日志 } } \ No newline at end of file diff --git a/Admin.NET/Admin.NET.Core/Admin.NET.Core.csproj b/Admin.NET/Admin.NET.Core/Admin.NET.Core.csproj index e349147c..c4570d6a 100644 --- a/Admin.NET/Admin.NET.Core/Admin.NET.Core.csproj +++ b/Admin.NET/Admin.NET.Core/Admin.NET.Core.csproj @@ -14,20 +14,20 @@ - + - - - + + + - + @@ -40,7 +40,7 @@ - + @@ -48,7 +48,7 @@ - + diff --git a/Admin.NET/Admin.NET.Core/Job/MqttHostedService.cs b/Admin.NET/Admin.NET.Core/Job/MqttHostedService.cs new file mode 100644 index 00000000..250952ed --- /dev/null +++ b/Admin.NET/Admin.NET.Core/Job/MqttHostedService.cs @@ -0,0 +1,239 @@ +// Admin.NET 项目的版权、商标、专利和其他相关权利均受相应法律法规的保护。使用本项目应遵守相关法律法规和许可证的要求。 +// +// 本项目主要遵循 MIT 许可证和 Apache 许可证(版本 2.0)进行分发和使用。许可证位于源代码树根目录中的 LICENSE-MIT 和 LICENSE-APACHE 文件。 +// +// 不得利用本项目从事危害国家安全、扰乱社会秩序、侵犯他人合法权益等法律法规禁止的活动!任何基于本项目二次开发而产生的一切法律纠纷和责任,我们不承担任何责任! + +namespace Admin.NET.Core; + +using Microsoft.Extensions.Hosting; +using MQTTnet; +using MQTTnet.Protocol; +using MQTTnet.Server; +using System; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +/// +/// MQTT 服务 +/// +public class MqttHostedService(IOptions mqttOptions) : IHostedService, ISingleton +{ + private readonly MqttOptions _mqttOptions = mqttOptions.Value; + public static MqttServer MqttServer { get; set; } + public static readonly List MqttEventInterceptors = []; // MQTT 事件拦截器集合 + + /// + /// 注册 MQTT 事件拦截器 + /// + /// + /// + public static void AddMqttEventInterceptor(MqttEventInterceptor mqttEventInterceptor, int order = 0) + { + mqttEventInterceptor.Order = order; + MqttEventInterceptors.Add(mqttEventInterceptor); + MqttEventInterceptors.Sort((a, b) => b.Order - a.Order); + } + + public async Task StartAsync(CancellationToken cancellationToken) + { + if (!_mqttOptions.Enabled) return; + + // 注册 MQTT 自定义客户端验证事件拦截器 + AddMqttEventInterceptor(new DefaultMqttEventInterceptor()); + + var options = new MqttServerOptionsBuilder() + .WithDefaultEndpoint() // 默认地址127.0.0.1 + .WithDefaultEndpointPort(_mqttOptions.Port) // 端口号 + //.WithDefaultEndpointBoundIPAddress(_mqttOptions.IPAddress) // IP地址 + .WithConnectionBacklog(_mqttOptions.ConnectionBacklog) // 最大连接数 + .WithPersistentSessions() + .Build(); + + MqttServer = new MqttServerFactory().CreateMqttServer(options); + MqttServer.StartedAsync += MqttServer_StartedAsync; // 启动后事件 + MqttServer.StoppedAsync += MqttServer_StoppedAsync; // 关闭后事件 + + MqttServer.ValidatingConnectionAsync += MqttServer_ValidatingConnectionAsync; // 客户端验证事件 + MqttServer.ClientConnectedAsync += MqttServer_ClientConnectedAsync; // 客户端连接事件 + MqttServer.ClientDisconnectedAsync += MqttServer_ClientDisconnectedAsync; // 客户端断开事件 + + MqttServer.ClientSubscribedTopicAsync += MqttServer_ClientSubscribedTopicAsync; // 订阅主题事件 + MqttServer.ClientUnsubscribedTopicAsync += MqttServer_ClientUnsubscribedTopicAsync; // 取消订阅事件 + MqttServer.InterceptingPublishAsync += MqttServer_InterceptingPublishAsync; // 拦截接收消息 + MqttServer.ApplicationMessageNotConsumedAsync += MqttServer_ApplicationMessageNotConsumedAsync; // 消息未被消费 + + await MqttServer.StartAsync(); + } + + /// + /// 启动后事件 + /// + /// + /// + private Task MqttServer_StartedAsync(EventArgs arg) + { + Console.WriteLine($"【MQTT】服务已启动,端口:{_mqttOptions.Port}...... {DateTime.Now}"); + return Task.CompletedTask; + } + + /// + /// 关闭后事件 + /// + /// + /// + private async Task MqttServer_StoppedAsync(EventArgs arg) + { + Console.WriteLine($"【MQTT】服务已关闭...... {DateTime.Now}"); + foreach (var eh in MqttEventInterceptors) + { + await eh.StoppedAsync(arg); + } + } + + /// + /// 客户端验证事件 + /// + /// + /// + private async Task MqttServer_ValidatingConnectionAsync(ValidatingConnectionEventArgs arg) + { + foreach (var eh in MqttEventInterceptors) + { + await eh.ValidatingConnectionAsync(arg); + if (arg.ReasonCode != MqttConnectReasonCode.Success) + break; + } + } + + /// + /// 客户端连接事件 + /// + /// + /// + private async Task MqttServer_ClientConnectedAsync(ClientConnectedEventArgs arg) + { + foreach (var eh in MqttEventInterceptors) + { + await eh.ClientConnectedAsync(arg); + } + + Logging($"客户端连接:客户端ID=【{arg.ClientId}】已连接:用户名=【{arg.UserName}】地址=【{arg.RemoteEndPoint}】 {DateTime.Now}"); + } + + /// + /// 客户端断开事件 + /// + /// + /// + /// + private async Task MqttServer_ClientDisconnectedAsync(ClientDisconnectedEventArgs arg) + { + foreach (var eh in MqttEventInterceptors) + { + await eh.ClientDisconnectedAsync(arg); + } + + Logging($"客户端断开:客户端ID=【{arg.ClientId}】已断开:用户名=【{arg.UserName}】地址=【{arg.RemoteEndPoint}】 {DateTime.Now}"); + } + + /// + /// 订阅主题事件 + /// + /// + /// + private async Task MqttServer_ClientSubscribedTopicAsync(ClientSubscribedTopicEventArgs arg) + { + foreach (var eh in MqttEventInterceptors) + { + await eh.ClientSubscribedTopicAsync(arg); + } + + Logging($"订阅主题:客户端ID=【{arg.ClientId}】订阅主题=【{arg.TopicFilter}】 {DateTime.Now}"); + } + + /// + /// 取消订阅事件 + /// + /// + /// + private async Task MqttServer_ClientUnsubscribedTopicAsync(ClientUnsubscribedTopicEventArgs arg) + { + foreach (var eh in MqttEventInterceptors) + { + await eh.ClientUnsubscribedTopicAsync(arg); + } + + Logging($"取消订阅:客户端ID=【{arg.ClientId}】取消订阅主题=【{arg.TopicFilter}】 {DateTime.Now}"); + } + + /// + /// 拦截发布的消息事件 + /// + /// + /// + private async Task MqttServer_InterceptingPublishAsync(InterceptingPublishEventArgs arg) + { + if (string.Equals(arg.ClientId, _mqttOptions.MqttServerId)) + return; + + foreach (var eh in MqttEventInterceptors) + { + await eh.InterceptingPublishAsync(arg); + } + + Logging($"拦截消息:客户端ID=【{arg.ClientId}】 Topic主题=【{arg.ApplicationMessage.Topic}】 消息=【{Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)}】 qos等级=【{arg.ApplicationMessage.QualityOfServiceLevel}】 {DateTime.Now}"); + } + + /// + /// 未被消费的消息事件 + /// + /// + /// + private async Task MqttServer_ApplicationMessageNotConsumedAsync(ApplicationMessageNotConsumedEventArgs arg) + { + foreach (var eh in MqttEventInterceptors) + { + await eh.ApplicationMessageNotConsumedAsync(arg); + } + + Logging($"接收消息:发送端ID=【{arg.SenderId}】 Topic主题=【{arg.ApplicationMessage.Topic}】 消息=【{Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)}】 qos等级=【{arg.ApplicationMessage.QualityOfServiceLevel}】 {DateTime.Now}"); + } + + /// + /// 发布主题消息 + /// + /// + /// + public async Task PublicMessageAsync(string topic, string message) + { + var applicationMessage = new MqttApplicationMessageBuilder() + .WithTopic(topic) + .WithPayload(message) + .Build(); + + await MqttServer.InjectApplicationMessage(new InjectedMqttApplicationMessage(applicationMessage) + { + SenderClientId = _mqttOptions.MqttServerId, + SenderUserName = _mqttOptions.MqttServerId, + }); + + Logging($"服务器发布主题:{topic}, 内容:{message}"); + } + + public Task StopAsync(CancellationToken cancellationToken) + { + return Task.CompletedTask; + } + + /// + /// 输出日志 + /// + /// + protected void Logging(string msg) + { + if (!_mqttOptions.Logging) return; + LoggingWriter.LogInformation(msg); + } +} \ No newline at end of file diff --git a/Admin.NET/Admin.NET.Core/Mqtt/DefaultMqttEventInterceptor.cs b/Admin.NET/Admin.NET.Core/Mqtt/DefaultMqttEventInterceptor.cs new file mode 100644 index 00000000..f77a0283 --- /dev/null +++ b/Admin.NET/Admin.NET.Core/Mqtt/DefaultMqttEventInterceptor.cs @@ -0,0 +1,47 @@ +// Admin.NET 项目的版权、商标、专利和其他相关权利均受相应法律法规的保护。使用本项目应遵守相关法律法规和许可证的要求。 +// +// 本项目主要遵循 MIT 许可证和 Apache 许可证(版本 2.0)进行分发和使用。许可证位于源代码树根目录中的 LICENSE-MIT 和 LICENSE-APACHE 文件。 +// +// 不得利用本项目从事危害国家安全、扰乱社会秩序、侵犯他人合法权益等法律法规禁止的活动!任何基于本项目二次开发而产生的一切法律纠纷和责任,我们不承担任何责任! + +using MQTTnet.Protocol; +using MQTTnet.Server; + +namespace Admin.NET.Core; + +/// +/// 默认 MQTT 事件拦截器 +/// +public class DefaultMqttEventInterceptor : MqttEventInterceptor +{ + public new int Order = int.MinValue; + + public override Task ValidatingConnectionAsync(ValidatingConnectionEventArgs arg) + { + var _db = App.GetRequiredService(); + // 验证账号 + var user = _db.Queryable().First(u => u.Account == arg.UserName); + if (user == null) + { + arg.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword; + Logging($"客户端验证:客户端ID=【{arg.ClientId}】用户名不存在 {DateTime.Now} "); + return Task.CompletedTask; + } + + // 验证密码 + var password = arg.Password; + if (CryptogramUtil.CryptoType == CryptogramEnum.MD5.ToString()) + { + if (user.Password.Equals(MD5Encryption.Encrypt(password))) + return Task.CompletedTask; + } + else + { + if (CryptogramUtil.Decrypt(user.Password).Equals(password)) + return Task.CompletedTask; + } + arg.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword; + Logging($"客户端验证:客户端ID=【{arg.ClientId}】用户名或密码验证错误 {DateTime.Now} "); + return Task.CompletedTask; + } +} \ No newline at end of file diff --git a/Admin.NET/Admin.NET.Core/Mqtt/MqttEventInterceptor.cs b/Admin.NET/Admin.NET.Core/Mqtt/MqttEventInterceptor.cs new file mode 100644 index 00000000..1442ddd0 --- /dev/null +++ b/Admin.NET/Admin.NET.Core/Mqtt/MqttEventInterceptor.cs @@ -0,0 +1,120 @@ +// Admin.NET 项目的版权、商标、专利和其他相关权利均受相应法律法规的保护。使用本项目应遵守相关法律法规和许可证的要求。 +// +// 本项目主要遵循 MIT 许可证和 Apache 许可证(版本 2.0)进行分发和使用。许可证位于源代码树根目录中的 LICENSE-MIT 和 LICENSE-APACHE 文件。 +// +// 不得利用本项目从事危害国家安全、扰乱社会秩序、侵犯他人合法权益等法律法规禁止的活动!任何基于本项目二次开发而产生的一切法律纠纷和责任,我们不承担任何责任! + +using MQTTnet.Server; + +namespace Admin.NET.Core; + +/// +/// MQTT 事件拦截器 +/// +public class MqttEventInterceptor +{ + /// + /// 数值越大越先执行 + /// + public int Order = 0; + + /// + /// 启动后事件 + /// + /// + /// + public virtual async Task StartedAsync(EventArgs arg) + { + await Task.CompletedTask; + } + + /// + /// 关闭后事件 + /// + /// + /// + public virtual async Task StoppedAsync(EventArgs arg) + { + await Task.CompletedTask; + } + + /// + /// 客户端验证事件 + /// + /// + /// + public virtual async Task ValidatingConnectionAsync(ValidatingConnectionEventArgs arg) + { + await Task.CompletedTask; + } + + /// + /// 客户端连接事件 + /// + /// + /// + public virtual async Task ClientConnectedAsync(ClientConnectedEventArgs arg) + { + await Task.CompletedTask; + } + + /// + /// 客户端断开事件 + /// + /// + /// + public virtual async Task ClientDisconnectedAsync(ClientDisconnectedEventArgs arg) + { + await Task.CompletedTask; + } + + /// + /// 订阅主题事件 + /// + /// + /// + public virtual async Task ClientSubscribedTopicAsync(ClientSubscribedTopicEventArgs arg) + { + await Task.CompletedTask; + } + + /// + /// 取消订阅事件 + /// + /// + /// + public virtual async Task ClientUnsubscribedTopicAsync(ClientUnsubscribedTopicEventArgs arg) + { + await Task.CompletedTask; + } + + /// + /// 拦截发布的消息事件 + /// + /// + /// + public virtual async Task InterceptingPublishAsync(InterceptingPublishEventArgs arg) + { + await Task.CompletedTask; + } + + /// + /// 未被消费的消息事件 + /// + /// + /// + public virtual async Task ApplicationMessageNotConsumedAsync(ApplicationMessageNotConsumedEventArgs arg) + { + await Task.CompletedTask; + } + + /// + /// 输出日志事件 + /// + /// + protected static void Logging(string msg) + { + if (!App.GetOptions().Logging) return; + LoggingWriter.LogInformation(msg); + } +} \ No newline at end of file diff --git a/Admin.NET/Admin.NET.Core/Mqtt/MqttHostedService.cs b/Admin.NET/Admin.NET.Core/Mqtt/MqttHostedService.cs deleted file mode 100644 index 2fc64a1d..00000000 --- a/Admin.NET/Admin.NET.Core/Mqtt/MqttHostedService.cs +++ /dev/null @@ -1,189 +0,0 @@ -// Admin.NET 项目的版权、商标、专利和其他相关权利均受相应法律法规的保护。使用本项目应遵守相关法律法规和许可证的要求。 -// -// 本项目主要遵循 MIT 许可证和 Apache 许可证(版本 2.0)进行分发和使用。许可证位于源代码树根目录中的 LICENSE-MIT 和 LICENSE-APACHE 文件。 -// -// 不得利用本项目从事危害国家安全、扰乱社会秩序、侵犯他人合法权益等法律法规禁止的活动!任何基于本项目二次开发而产生的一切法律纠纷和责任,我们不承担任何责任! - -namespace Admin.NET.Core; - -using Microsoft.Extensions.Hosting; -using MQTTnet.Protocol; -using MQTTnet.Server; -using System; -using System.Text; -using System.Threading; -using System.Threading.Tasks; - -/// -/// MQTT 服务 -/// -public class MqttHostedService(IOptions mqttOptions, ISqlSugarClient db) : IHostedService, IDisposable -{ - private const string ServerClientId = "Admin.NET.MQTT"; - public MqttServer MqttServer { get; set; } - private readonly MqttOptions _mqttOptions = mqttOptions.Value; - private readonly ISqlSugarClient _db = db; - - public async Task StartAsync(CancellationToken cancellationToken) - { - if (!_mqttOptions.Enabled) return; - - var options = new MqttServerOptionsBuilder() - .WithDefaultEndpoint() // 默认地址127.0.0.1 - .WithDefaultEndpointPort(_mqttOptions.Port) // 端口号 - //.WithDefaultEndpointBoundIPAddress(_mqttOptions.IPAddress) // IP地址 - .WithConnectionBacklog(1000) // 最大连接数 - .WithPersistentSessions() - .Build(); - - MqttServer = new MqttServerFactory().CreateMqttServer(options); - MqttServer.StartedAsync += MqttServer_StartedAsync; // 启动后事件 - MqttServer.StoppedAsync += MqttServer_StoppedAsync; // 关闭后事件 - - MqttServer.ValidatingConnectionAsync += MqttServer_ValidatingConnectionAsync; // 客户端验证事件 - MqttServer.ClientConnectedAsync += MqttServer_ClientConnectedAsync; // 客户端连接事件 - MqttServer.ClientDisconnectedAsync += MqttServer_ClientDisconnectedAsync; // 客户端断开事件 - - MqttServer.ClientSubscribedTopicAsync += MqttServer_ClientSubscribedTopicAsync; // 订阅主题事件 - MqttServer.ClientUnsubscribedTopicAsync += MqttServer_ClientUnsubscribedTopicAsync; // 取消订阅事件 - MqttServer.InterceptingPublishAsync += MqttServer_InterceptingPublishAsync; // 拦截接收消息 - MqttServer.ApplicationMessageNotConsumedAsync += MqttServer_ApplicationMessageNotConsumedAsync; // 消息未被消费 - - await MqttServer.StartAsync(); - } - - /// - /// 启动后事件 - /// - /// - /// - private Task MqttServer_StartedAsync(EventArgs arg) - { - Console.WriteLine($"【MQTT】服务已启动,端口:{_mqttOptions.Port}...... {DateTime.Now}"); - return Task.CompletedTask; - } - - /// - /// 关闭后事件 - /// - /// - /// - private Task MqttServer_StoppedAsync(EventArgs arg) - { - Console.WriteLine($"【MQTT】服务已关闭...... {DateTime.Now}"); - return Task.CompletedTask; - } - - /// - /// 客户端验证事件 - /// - /// - /// - private Task MqttServer_ValidatingConnectionAsync(ValidatingConnectionEventArgs arg) - { - arg.ReasonCode = MqttConnectReasonCode.Success; - - // 验证账号 - var user = _db.Queryable().First(u => u.Account == arg.UserName); - if (user == null) - { - arg.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword; - Console.WriteLine($"客户端验证:客户端ID=【{arg.ClientId}】用户名或密码验证错误 {DateTime.Now} "); - throw Oops.Oh("MQTT客户端验证:账号不能为空或不存在!"); - } - - // 验证密码 - var password = arg.Password; - if (CryptogramUtil.CryptoType == CryptogramEnum.MD5.ToString()) - { - if (user.Password.Equals(MD5Encryption.Encrypt(password))) return Task.CompletedTask; - } - else - { - if (CryptogramUtil.Decrypt(user.Password).Equals(password)) return Task.CompletedTask; - } - arg.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword; - Console.WriteLine($"客户端验证:客户端ID=【{arg.ClientId}】用户名或密码验证错误 {DateTime.Now} "); - throw Oops.Oh("MQTT客户端验证:密码错误!"); - } - - /// - /// 客户端连接事件 - /// - /// - /// - private Task MqttServer_ClientConnectedAsync(ClientConnectedEventArgs arg) - { - Console.WriteLine($"客户端连接:客户端ID=【{arg.ClientId}】已连接:用户名=【{arg.UserName}】地址=【{arg.RemoteEndPoint}】 {DateTime.Now}"); - return Task.CompletedTask; - } - - /// - /// 客户端断开事件 - /// - /// - /// - /// - private Task MqttServer_ClientDisconnectedAsync(ClientDisconnectedEventArgs arg) - { - Console.WriteLine($"客户端断开:客户端ID=【{arg.ClientId}】已断开:用户名=【{arg.UserName}】地址=【{arg.RemoteEndPoint}】 {DateTime.Now}"); - return Task.CompletedTask; - } - - /// - /// 订阅主题事件 - /// - /// - /// - private Task MqttServer_ClientSubscribedTopicAsync(ClientSubscribedTopicEventArgs arg) - { - Console.WriteLine($"订阅主题:客户端ID=【{arg.ClientId}】订阅主题=【{arg.TopicFilter}】 {DateTime.Now}"); - return Task.CompletedTask; - } - - /// - /// 取消订阅事件 - /// - /// - /// - private Task MqttServer_ClientUnsubscribedTopicAsync(ClientUnsubscribedTopicEventArgs arg) - { - Console.WriteLine($"取消订阅:客户端ID=【{arg.ClientId}】取消订阅主题=【{arg.TopicFilter}】 {DateTime.Now}"); - return Task.CompletedTask; - } - - /// - /// 拦截接收消息 - /// - /// - /// - private Task MqttServer_InterceptingPublishAsync(InterceptingPublishEventArgs arg) - { - if (string.Equals(arg.ClientId, ServerClientId)) - return Task.CompletedTask; - - Console.WriteLine($"拦截消息:客户端ID=【{arg.ClientId}】 Topic主题=【{arg.ApplicationMessage.Topic}】 消息=【{Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)}】 qos等级=【{arg.ApplicationMessage.QualityOfServiceLevel}】 {DateTime.Now}"); - return Task.CompletedTask; - } - - /// - /// 消息未被消费 - /// - /// - /// - private Task MqttServer_ApplicationMessageNotConsumedAsync(ApplicationMessageNotConsumedEventArgs arg) - { - Console.WriteLine($"接收消息:发送端ID=【{arg.SenderId}】 Topic主题=【{arg.ApplicationMessage.Topic}】 消息=【{Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)}】 qos等级=【{arg.ApplicationMessage.QualityOfServiceLevel}】 {DateTime.Now}"); - return Task.CompletedTask; - } - - public Task StopAsync(CancellationToken cancellationToken) - { - return Task.CompletedTask; - } - - public void Dispose() - { - throw new NotImplementedException(); - } -} \ No newline at end of file diff --git a/Admin.NET/Admin.NET.Core/Option/MqttOptions.cs b/Admin.NET/Admin.NET.Core/Option/MqttOptions.cs index 69a4fa2f..873a5d6a 100644 --- a/Admin.NET/Admin.NET.Core/Option/MqttOptions.cs +++ b/Admin.NET/Admin.NET.Core/Option/MqttOptions.cs @@ -1,28 +1,43 @@ -// Admin.NET 项目的版权、商标、专利和其他相关权利均受相应法律法规的保护。使用本项目应遵守相关法律法规和许可证的要求。 -// -// 本项目主要遵循 MIT 许可证和 Apache 许可证(版本 2.0)进行分发和使用。许可证位于源代码树根目录中的 LICENSE-MIT 和 LICENSE-APACHE 文件。 -// -// 不得利用本项目从事危害国家安全、扰乱社会秩序、侵犯他人合法权益等法律法规禁止的活动!任何基于本项目二次开发而产生的一切法律纠纷和责任,我们不承担任何责任! - -namespace Admin.NET.Core; - -/// -/// MQTT 配置选项 -/// -public sealed class MqttOptions : IConfigurableOptions -{ - /// - /// 是否启用 - /// - public bool Enabled { get; set; } - - /// - /// 端口 - /// - public int Port { get; set; } - - /// - /// IP地址 - /// - public string IPAddress { get; set; } +// Admin.NET 项目的版权、商标、专利和其他相关权利均受相应法律法规的保护。使用本项目应遵守相关法律法规和许可证的要求。 +// +// 本项目主要遵循 MIT 许可证和 Apache 许可证(版本 2.0)进行分发和使用。许可证位于源代码树根目录中的 LICENSE-MIT 和 LICENSE-APACHE 文件。 +// +// 不得利用本项目从事危害国家安全、扰乱社会秩序、侵犯他人合法权益等法律法规禁止的活动!任何基于本项目二次开发而产生的一切法律纠纷和责任,我们不承担任何责任! + +namespace Admin.NET.Core; + +/// +/// MQTT 配置选项 +/// +public sealed class MqttOptions : IConfigurableOptions +{ + /// + /// 是否启用 + /// + public bool Enabled { get; set; } + + /// + /// 端口 + /// + public int Port { get; set; } + + /// + /// IP地址 + /// + public string IPAddress { get; set; } + + /// + /// 最大连接数 + /// + public int ConnectionBacklog { get; set; } + + /// + /// 服务器主动发消息时的ClientId + /// + public string MqttServerId { get; set; } + + /// + /// 输出日志 + /// + public bool Logging { get; set; } } \ No newline at end of file diff --git a/Admin.NET/Admin.NET.Core/Service/Mqtt/Dto/MessageInput.cs b/Admin.NET/Admin.NET.Core/Service/Mqtt/Dto/MessageInput.cs new file mode 100644 index 00000000..3d105392 --- /dev/null +++ b/Admin.NET/Admin.NET.Core/Service/Mqtt/Dto/MessageInput.cs @@ -0,0 +1,23 @@ +// Admin.NET 项目的版权、商标、专利和其他相关权利均受相应法律法规的保护。使用本项目应遵守相关法律法规和许可证的要求。 +// +// 本项目主要遵循 MIT 许可证和 Apache 许可证(版本 2.0)进行分发和使用。许可证位于源代码树根目录中的 LICENSE-MIT 和 LICENSE-APACHE 文件。 +// +// 不得利用本项目从事危害国家安全、扰乱社会秩序、侵犯他人合法权益等法律法规禁止的活动!任何基于本项目二次开发而产生的一切法律纠纷和责任,我们不承担任何责任! + +namespace Admin.NET.Core; + +/// +/// 发布主题消息 +/// +public class PublicMessageInput +{ + /// + /// 主题名称 + /// + public string Topic { get; set; } + + /// + /// 消息内容 + /// + public string Message { get; set; } +} \ No newline at end of file diff --git a/Admin.NET/Admin.NET.Core/Service/Mqtt/SysMqttService.cs b/Admin.NET/Admin.NET.Core/Service/Mqtt/SysMqttService.cs new file mode 100644 index 00000000..bde16576 --- /dev/null +++ b/Admin.NET/Admin.NET.Core/Service/Mqtt/SysMqttService.cs @@ -0,0 +1,41 @@ +// Admin.NET 项目的版权、商标、专利和其他相关权利均受相应法律法规的保护。使用本项目应遵守相关法律法规和许可证的要求。 +// +// 本项目主要遵循 MIT 许可证和 Apache 许可证(版本 2.0)进行分发和使用。许可证位于源代码树根目录中的 LICENSE-MIT 和 LICENSE-APACHE 文件。 +// +// 不得利用本项目从事危害国家安全、扰乱社会秩序、侵犯他人合法权益等法律法规禁止的活动!任何基于本项目二次开发而产生的一切法律纠纷和责任,我们不承担任何责任! + +using MQTTnet.Server; + +namespace Admin.NET.Core.Service; + +/// +/// 系统 MQTT 服务 🧩 +/// +[ApiDescriptionSettings(Order = 90, Description = "MQTT 服务")] +public class SysMqttService() : IDynamicApiController, ITransient +{ + /// + /// 获取客户端列表 🔖 + /// + /// + [DisplayName("获取客户端列表")] + public async Task> GetClients() + { + if (MqttHostedService.MqttServer == null) + throw Oops.Oh("【MQTT】服务未启动"); + + return await MqttHostedService.MqttServer.GetClientsAsync(); + } + + /// + /// 发布主题消息 🔖 + /// + /// + /// + [DisplayName("发布主题消息")] + public async Task PublicMessage(PublicMessageInput input) + { + var mqttHostedService = App.GetRequiredService(); + await mqttHostedService.PublicMessageAsync(input.Topic, input.Message); + } +} \ No newline at end of file diff --git a/Admin.NET/Admin.NET.Core/Service/OnlineUser/SysOnlineUserService.cs b/Admin.NET/Admin.NET.Core/Service/OnlineUser/SysOnlineUserService.cs index 44a3da5b..c6441b0f 100644 --- a/Admin.NET/Admin.NET.Core/Service/OnlineUser/SysOnlineUserService.cs +++ b/Admin.NET/Admin.NET.Core/Service/OnlineUser/SysOnlineUserService.cs @@ -119,8 +119,7 @@ public class SysOnlineUserService : IDynamicApiController, ITransient if (await _sysConfigService.GetConfigValueByCode(ConfigConst.SysSingleLogin)) return; // 相同账号最后登录的用户Id集合 - var onlineUserRecords = await _sysOnlineUerRep.AsQueryable() - .GroupBy(u => u.UserId) + var onlineUsers = await _sysOnlineUerRep.AsQueryable().GroupBy(u => u.UserId) .Select(u => new { UserId = u.UserId, @@ -128,9 +127,10 @@ public class SysOnlineUserService : IDynamicApiController, ITransient Id = SqlFunc.AggregateMax(u.Id) }) .ToListAsync(); - var onlineUserIds = onlineUserRecords.Select(x => x.Id).ToList(); + if (onlineUsers.Count < 1) return; // 无效登录用户集合 + var onlineUserIds = onlineUsers.Select(u => u.Id).ToList(); var offlineUsers = await _sysOnlineUerRep.AsQueryable().Where(u => !onlineUserIds.Contains(u.Id)).ToListAsync(); foreach (var user in offlineUsers) { diff --git a/Admin.NET/Admin.NET.Web.Core/Admin.NET.Web.Core.csproj b/Admin.NET/Admin.NET.Web.Core/Admin.NET.Web.Core.csproj index 9a5475c5..4bb365c3 100644 --- a/Admin.NET/Admin.NET.Web.Core/Admin.NET.Web.Core.csproj +++ b/Admin.NET/Admin.NET.Web.Core/Admin.NET.Web.Core.csproj @@ -11,6 +11,7 @@ + diff --git a/Admin.NET/Admin.NET.Web.Core/Startup.cs b/Admin.NET/Admin.NET.Web.Core/Startup.cs index 487ffc0f..b7b2ad8e 100644 --- a/Admin.NET/Admin.NET.Web.Core/Startup.cs +++ b/Admin.NET/Admin.NET.Web.Core/Startup.cs @@ -22,6 +22,7 @@ using Microsoft.AspNetCore.ResponseCompression; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.FileProviders; using Microsoft.Extensions.Hosting; +using MQTTnet.AspNetCore; using Newtonsoft.Json; using OnceMi.AspNetCore.OSS; using RabbitMQ.Client; @@ -106,7 +107,8 @@ public class Startup : AppStartup // setting.MetadataPropertyHandling = MetadataPropertyHandling.Ignore; // 解决DateTimeOffset异常 // setting.DateParseHandling = DateParseHandling.None; // 解决DateTimeOffset异常 // setting.Converters.Add(new IsoDateTimeConverter { DateTimeStyles = DateTimeStyles.AssumeUniversal }); // 解决DateTimeOffset异常 - }; + } + ; services.AddControllersWithViews() .AddAppLocalization() @@ -410,14 +412,22 @@ public class Startup : AppStartup } }); + var mqttOptions = App.GetConfig("Mqtt", true); app.UseEndpoints(endpoints => { // 注册集线器 endpoints.MapHubs(); - + // 注册路由 endpoints.MapControllerRoute( name: "default", pattern: "{controller=Home}/{action=Index}/{id?}"); + // 注册 MQTT 支持 WebSocket + if (mqttOptions.Enabled) + { + endpoints.MapConnectionHandler("/mqtt", + httpConnectionDispatcherOptions => httpConnectionDispatcherOptions.WebSockets.SubProtocolSelector = + protocolList => protocolList.FirstOrDefault() ?? string.Empty); + } }); } } \ No newline at end of file diff --git a/Admin.NET/Admin.NET.Web.Entry/wwwroot/template/web_views_List.vue.vm b/Admin.NET/Admin.NET.Web.Entry/wwwroot/template/web_views_List.vue.vm index 19957ec7..171b618e 100644 --- a/Admin.NET/Admin.NET.Web.Entry/wwwroot/template/web_views_List.vue.vm +++ b/Admin.NET/Admin.NET.Web.Entry/wwwroot/template/web_views_List.vue.vm @@ -165,7 +165,7 @@ @: } else if(@column.EffectType == "DictSelector") { @: } else if(@column.EffectType == "EnumSelector") { @: @@ -43,7 +43,7 @@