增加MqttEventHandler类用于处理Mqtt业务相关的事件,使框架初始化的核心代码不需要根据业务的功能产生变化

This commit is contained in:
yzp 2025-02-21 15:24:35 +08:00
parent 9a4407017d
commit f4afd8e900
7 changed files with 330 additions and 81 deletions

View File

@ -1,10 +1,14 @@
{ {
"$schema": "https://gitee.com/dotnetchina/Furion/raw/v4/schemas/v4/furion-schema.json", "$schema": "https://gitee.com/dotnetchina/Furion/raw/v4/schemas/v4/furion-schema.json",
// MQTT // MQTT
"Mqtt": { "Mqtt": {
"Enabled": false, // "MqttServerId": "MqttServer", // ClientId
"Port": "5001", "Enabled": true, //
"Logging": true, //
"ConsoleOutput": true, //
"ConnectionBacklog": 1000,
"Port": "1883",
"IPAddress": "" "IPAddress": ""
} }
} }

View File

@ -4,9 +4,11 @@
// //
// 不得利用本项目从事危害国家安全、扰乱社会秩序、侵犯他人合法权益等法律法规禁止的活动!任何基于本项目二次开发而产生的一切法律纠纷和责任,我们不承担任何责任! // 不得利用本项目从事危害国家安全、扰乱社会秩序、侵犯他人合法权益等法律法规禁止的活动!任何基于本项目二次开发而产生的一切法律纠纷和责任,我们不承担任何责任!
namespace Admin.NET.Core; namespace Admin.NET.Core;
using Furion.Logging.Extensions;
using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Hosting;
using MQTTnet;
using MQTTnet.Protocol; using MQTTnet.Protocol;
using MQTTnet.Server; using MQTTnet.Server;
using System; using System;
@ -22,17 +24,46 @@ public class MqttHostedService(IOptions<MqttOptions> mqttOptions, ISqlSugarClien
private const string ServerClientId = "Admin.NET.MQTT"; private const string ServerClientId = "Admin.NET.MQTT";
public static MqttServer MqttServer { get; set; } public static MqttServer MqttServer { get; set; }
private readonly MqttOptions _mqttOptions = mqttOptions.Value; private readonly MqttOptions _mqttOptions = mqttOptions.Value;
private readonly ISqlSugarClient _db = db; private readonly ISqlSugarClient _db = db;
private bool _isLogging = false;
private bool _consoleOutput = false;
static List<MqttEventHandler> mqttEventHandlers = new List<MqttEventHandler>();
public static void RegistMqttEventHandler(MqttEventHandler eh, int order)
{
eh.Order = order;
mqttEventHandlers.Add(eh);
mqttEventHandlers.Sort((a,b) => b.Order - a.Order);
}
public async void PublicMessage(string topic, string message)
{
// 创建一个 MQTT 应用消息
var applicationMessage = new MqttApplicationMessageBuilder()
.WithTopic(topic)
.WithPayload(message)
.Build();
// 记录日志
Log($"服务器发布主题: {topic}, 内容:{message}");
await MqttServer.InjectApplicationMessage(new InjectedMqttApplicationMessage(applicationMessage)
{
SenderClientId = _mqttOptions.MqttServerId
});
}
public async Task StartAsync(CancellationToken cancellationToken) public async Task StartAsync(CancellationToken cancellationToken)
{ {
if (!_mqttOptions.Enabled) return; if (!_mqttOptions.Enabled) return;
_isLogging = _mqttOptions.Logging;
_consoleOutput = _mqttOptions.ConsoleOutput;
var options = new MqttServerOptionsBuilder() var options = new MqttServerOptionsBuilder()
.WithDefaultEndpoint() // 默认地址127.0.0.1 .WithDefaultEndpoint() // 默认地址127.0.0.1
.WithDefaultEndpointPort(_mqttOptions.Port) // 端口号 .WithDefaultEndpointPort(_mqttOptions.Port) // 端口号
//.WithDefaultEndpointBoundIPAddress(_mqttOptions.IPAddress) // IP地址 //.WithDefaultEndpointBoundIPAddress(_mqttOptions.IPAddress) // IP地址
.WithConnectionBacklog(1000) // 最大连接数 .WithConnectionBacklog(_mqttOptions.ConnectionBacklog) // 最大连接数
.WithPersistentSessions() .WithPersistentSessions()
.Build(); .Build();
@ -68,10 +99,13 @@ public class MqttHostedService(IOptions<MqttOptions> mqttOptions, ISqlSugarClien
/// </summary> /// </summary>
/// <param name="arg"></param> /// <param name="arg"></param>
/// <returns></returns> /// <returns></returns>
private Task MqttServer_StoppedAsync(EventArgs arg) private async Task MqttServer_StoppedAsync(EventArgs arg)
{ {
Console.WriteLine($"【MQTT】服务已关闭...... {DateTime.Now}"); Console.WriteLine($"【MQTT】服务已关闭...... {DateTime.Now}");
return Task.CompletedTask; foreach (var eh in mqttEventHandlers)
{
await eh.StoppedAsync(arg);
}
} }
/// <summary> /// <summary>
@ -79,32 +113,14 @@ public class MqttHostedService(IOptions<MqttOptions> mqttOptions, ISqlSugarClien
/// </summary> /// </summary>
/// <param name="arg"></param> /// <param name="arg"></param>
/// <returns></returns> /// <returns></returns>
private Task MqttServer_ValidatingConnectionAsync(ValidatingConnectionEventArgs arg) private async Task MqttServer_ValidatingConnectionAsync(ValidatingConnectionEventArgs arg)
{ {
arg.ReasonCode = MqttConnectReasonCode.Success; foreach (var eh in mqttEventHandlers)
{
// 验证账号 await eh.ValidatingConnectionAsync(arg);
var user = _db.Queryable<SysUser>().First(u => u.Account == arg.UserName); if (arg.ReasonCode != MqttConnectReasonCode.Success)
if (user == null) break;
{
arg.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
Console.WriteLine($"客户端验证客户端ID=【{arg.ClientId}】用户名或密码验证错误 {DateTime.Now} ");
throw Oops.Oh("MQTT客户端验证账号不能为空或不存在");
} }
// 验证密码
var password = arg.Password;
if (CryptogramUtil.CryptoType == CryptogramEnum.MD5.ToString())
{
if (user.Password.Equals(MD5Encryption.Encrypt(password))) return Task.CompletedTask;
}
else
{
if (CryptogramUtil.Decrypt(user.Password).Equals(password)) return Task.CompletedTask;
}
arg.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
Console.WriteLine($"客户端验证客户端ID=【{arg.ClientId}】用户名或密码验证错误 {DateTime.Now} ");
throw Oops.Oh("MQTT客户端验证密码错误");
} }
/// <summary> /// <summary>
@ -112,10 +128,13 @@ public class MqttHostedService(IOptions<MqttOptions> mqttOptions, ISqlSugarClien
/// </summary> /// </summary>
/// <param name="arg"></param> /// <param name="arg"></param>
/// <returns></returns> /// <returns></returns>
private Task MqttServer_ClientConnectedAsync(ClientConnectedEventArgs arg) private async Task MqttServer_ClientConnectedAsync(ClientConnectedEventArgs arg)
{ {
Console.WriteLine($"客户端连接客户端ID=【{arg.ClientId}】已连接:用户名=【{arg.UserName}】地址=【{arg.RemoteEndPoint}】 {DateTime.Now}"); Log($"客户端连接客户端ID=【{arg.ClientId}】已连接:用户名=【{arg.UserName}】地址=【{arg.RemoteEndPoint}】 {DateTime.Now}");
return Task.CompletedTask; foreach (var eh in mqttEventHandlers)
{
await eh.ClientConnectedAsync(arg);
}
} }
/// <summary> /// <summary>
@ -124,10 +143,13 @@ public class MqttHostedService(IOptions<MqttOptions> mqttOptions, ISqlSugarClien
/// <param name="arg"></param> /// <param name="arg"></param>
/// <returns></returns> /// <returns></returns>
/// <exception cref="NotImplementedException"></exception> /// <exception cref="NotImplementedException"></exception>
private Task MqttServer_ClientDisconnectedAsync(ClientDisconnectedEventArgs arg) private async Task MqttServer_ClientDisconnectedAsync(ClientDisconnectedEventArgs arg)
{ {
Console.WriteLine($"客户端断开客户端ID=【{arg.ClientId}】已断开:用户名=【{arg.UserName}】地址=【{arg.RemoteEndPoint}】 {DateTime.Now}"); Log($"客户端断开客户端ID=【{arg.ClientId}】已断开:用户名=【{arg.UserName}】地址=【{arg.RemoteEndPoint}】 {DateTime.Now}");
return Task.CompletedTask; foreach (var eh in mqttEventHandlers)
{
await eh.ClientDisconnectedAsync(arg);
}
} }
/// <summary> /// <summary>
@ -135,10 +157,13 @@ public class MqttHostedService(IOptions<MqttOptions> mqttOptions, ISqlSugarClien
/// </summary> /// </summary>
/// <param name="arg"></param> /// <param name="arg"></param>
/// <returns></returns> /// <returns></returns>
private Task MqttServer_ClientSubscribedTopicAsync(ClientSubscribedTopicEventArgs arg) private async Task MqttServer_ClientSubscribedTopicAsync(ClientSubscribedTopicEventArgs arg)
{ {
Console.WriteLine($"订阅主题客户端ID=【{arg.ClientId}】订阅主题=【{arg.TopicFilter}】 {DateTime.Now}"); Log($"订阅主题客户端ID=【{arg.ClientId}】订阅主题=【{arg.TopicFilter}】 {DateTime.Now}");
return Task.CompletedTask; foreach (var eh in mqttEventHandlers)
{
await eh.ClientSubscribedTopicAsync(arg);
}
} }
/// <summary> /// <summary>
@ -146,10 +171,13 @@ public class MqttHostedService(IOptions<MqttOptions> mqttOptions, ISqlSugarClien
/// </summary> /// </summary>
/// <param name="arg"></param> /// <param name="arg"></param>
/// <returns></returns> /// <returns></returns>
private Task MqttServer_ClientUnsubscribedTopicAsync(ClientUnsubscribedTopicEventArgs arg) private async Task MqttServer_ClientUnsubscribedTopicAsync(ClientUnsubscribedTopicEventArgs arg)
{ {
Console.WriteLine($"取消订阅客户端ID=【{arg.ClientId}】取消订阅主题=【{arg.TopicFilter}】 {DateTime.Now}"); Log($"取消订阅客户端ID=【{arg.ClientId}】取消订阅主题=【{arg.TopicFilter}】 {DateTime.Now}");
return Task.CompletedTask; foreach (var eh in mqttEventHandlers)
{
await eh.ClientUnsubscribedTopicAsync(arg);
}
} }
/// <summary> /// <summary>
@ -157,13 +185,16 @@ public class MqttHostedService(IOptions<MqttOptions> mqttOptions, ISqlSugarClien
/// </summary> /// </summary>
/// <param name="arg"></param> /// <param name="arg"></param>
/// <returns></returns> /// <returns></returns>
private Task MqttServer_InterceptingPublishAsync(InterceptingPublishEventArgs arg) private async Task MqttServer_InterceptingPublishAsync(InterceptingPublishEventArgs arg)
{ {
if (string.Equals(arg.ClientId, ServerClientId)) if (string.Equals(arg.ClientId, _mqttOptions.MqttServerId))
return Task.CompletedTask; return;
Console.WriteLine($"拦截消息客户端ID=【{arg.ClientId}】 Topic主题=【{arg.ApplicationMessage.Topic}】 消息=【{Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)}】 qos等级=【{arg.ApplicationMessage.QualityOfServiceLevel}】 {DateTime.Now}"); Log($"拦截消息客户端ID=【{arg.ClientId}】 Topic主题=【{arg.ApplicationMessage.Topic}】 消息=【{Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)}】 qos等级=【{arg.ApplicationMessage.QualityOfServiceLevel}】 {DateTime.Now}");
return Task.CompletedTask; foreach (var eh in mqttEventHandlers)
{
await eh.InterceptingPublishAsync(arg);
}
} }
/// <summary> /// <summary>
@ -171,14 +202,25 @@ public class MqttHostedService(IOptions<MqttOptions> mqttOptions, ISqlSugarClien
/// </summary> /// </summary>
/// <param name="arg"></param> /// <param name="arg"></param>
/// <returns></returns> /// <returns></returns>
private Task MqttServer_ApplicationMessageNotConsumedAsync(ApplicationMessageNotConsumedEventArgs arg) private async Task MqttServer_ApplicationMessageNotConsumedAsync(ApplicationMessageNotConsumedEventArgs arg)
{ {
Console.WriteLine($"接收消息发送端ID=【{arg.SenderId}】 Topic主题=【{arg.ApplicationMessage.Topic}】 消息=【{Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)}】 qos等级=【{arg.ApplicationMessage.QualityOfServiceLevel}】 {DateTime.Now}"); Console.WriteLine($"接收消息发送端ID=【{arg.SenderId}】 Topic主题=【{arg.ApplicationMessage.Topic}】 消息=【{Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)}】 qos等级=【{arg.ApplicationMessage.QualityOfServiceLevel}】 {DateTime.Now}");
return Task.CompletedTask; foreach (var eh in mqttEventHandlers)
{
await eh.ApplicationMessageNotConsumedAsync(arg);
}
} }
public Task StopAsync(CancellationToken cancellationToken) public Task StopAsync(CancellationToken cancellationToken)
{ {
return Task.CompletedTask; return Task.CompletedTask;
}
protected void Log(string msg)
{
if (_consoleOutput)
Console.WriteLine(msg);
if (_isLogging)
msg.LogDebug();
} }
} }

View File

@ -0,0 +1,90 @@
// Admin.NET 项目的版权、商标、专利和其他相关权利均受相应法律法规的保护。使用本项目应遵守相关法律法规和许可证的要求。
//
// 本项目主要遵循 MIT 许可证和 Apache 许可证(版本 2.0)进行分发和使用。许可证位于源代码树根目录中的 LICENSE-MIT 和 LICENSE-APACHE 文件。
//
// 不得利用本项目从事危害国家安全、扰乱社会秩序、侵犯他人合法权益等法律法规禁止的活动!任何基于本项目二次开发而产生的一切法律纠纷和责任,我们不承担任何责任!
using MQTTnet.Server;
using MQTTnet;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.InteropServices;
using System.Text;
using System.Threading.Tasks;
using MQTTnet.Protocol;
using Furion.Logging.Extensions;
using Furion.HttpRemote;
namespace Admin.NET.Core;
/// <summary>
/// Mqtt事件拦截器
/// </summary>
/// <remarks>
/// 为了让底层的 Mqtt 代码与业务层代码分离,应该不同的业务继函 MqttEventHandler 类,实现自己的代码后
/// 在 Startup.cs 的 Configure 函数中,调用 MqttHostedService 对象的 RegistMqttEventHandler 函数,来注册自己的事件处理器
/// “业务相关的代码千万不要写在 MqttHostedService 中”
/// </remarks>
public class MqttEventHandler
{
/// <summary>
/// 排序,数据越大越先执行
/// </summary>
public int Order = 0;
protected void Log(string msg)
{
var mqttOptions = App.GetOptions<MqttOptions>();
if (mqttOptions.ConsoleOutput)
Console.WriteLine(msg);
if (mqttOptions.Logging)
msg.LogDebug();
}
public virtual async Task ValidatingConnectionAsync(ValidatingConnectionEventArgs arg)
{
}
public virtual async Task StartedAsync(EventArgs arg)
{
}
public virtual async Task StoppedAsync(EventArgs arg)
{
}
/// <summary>
/// 客户端发布的数据
/// </summary>
/// <param name="arg"></param>
/// <returns></returns>
public virtual async Task InterceptingPublishAsync(InterceptingPublishEventArgs arg)
{
}
/// <summary>
/// 没有客户端接收的数据
/// </summary>
/// <param name="arg"></param>
/// <returns></returns>
public virtual async Task ApplicationMessageNotConsumedAsync(ApplicationMessageNotConsumedEventArgs arg)
{
}
public virtual async Task ClientSubscribedTopicAsync(ClientSubscribedTopicEventArgs arg)
{
}
public virtual async Task ClientUnsubscribedTopicAsync(ClientUnsubscribedTopicEventArgs arg)
{
}
public virtual async Task ClientDisconnectedAsync(ClientDisconnectedEventArgs arg)
{
}
public virtual async Task ClientConnectedAsync(ClientConnectedEventArgs arg)
{
}
}

View File

@ -0,0 +1,43 @@
// Admin.NET 项目的版权、商标、专利和其他相关权利均受相应法律法规的保护。使用本项目应遵守相关法律法规和许可证的要求。
//
// 本项目主要遵循 MIT 许可证和 Apache 许可证(版本 2.0)进行分发和使用。许可证位于源代码树根目录中的 LICENSE-MIT 和 LICENSE-APACHE 文件。
//
// 不得利用本项目从事危害国家安全、扰乱社会秩序、侵犯他人合法权益等法律法规禁止的活动!任何基于本项目二次开发而产生的一切法律纠纷和责任,我们不承担任何责任!
using MQTTnet.Protocol;
using MQTTnet.Server;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace Admin.NET.Core;
public class MqttEventHandlerForAdminNet : MqttEventHandler
{
public override async Task ValidatingConnectionAsync(ValidatingConnectionEventArgs arg)
{
ISqlSugarClient _db = App.GetRequiredService<ISqlSugarClient>();
// 验证账号
var user = _db.Queryable<SysUser>().First(u => u.Account == arg.UserName);
if (user == null)
{
arg.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
Log($"客户端验证客户端ID=【{arg.ClientId}】用户名不存在 {DateTime.Now} ");
return;
}
// 验证密码
var password = arg.Password;
if (CryptogramUtil.CryptoType == CryptogramEnum.MD5.ToString())
{
if (user.Password.Equals(MD5Encryption.Encrypt(password))) return;
}
else
{
if (CryptogramUtil.Decrypt(user.Password).Equals(password)) return;
}
arg.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
Log($"客户端验证客户端ID=【{arg.ClientId}】用户名或密码验证错误 {DateTime.Now} ");
}
}

View File

@ -1,28 +1,48 @@
// Admin.NET 项目的版权、商标、专利和其他相关权利均受相应法律法规的保护。使用本项目应遵守相关法律法规和许可证的要求。 // Admin.NET 项目的版权、商标、专利和其他相关权利均受相应法律法规的保护。使用本项目应遵守相关法律法规和许可证的要求。
// //
// 本项目主要遵循 MIT 许可证和 Apache 许可证(版本 2.0)进行分发和使用。许可证位于源代码树根目录中的 LICENSE-MIT 和 LICENSE-APACHE 文件。 // 本项目主要遵循 MIT 许可证和 Apache 许可证(版本 2.0)进行分发和使用。许可证位于源代码树根目录中的 LICENSE-MIT 和 LICENSE-APACHE 文件。
// //
// 不得利用本项目从事危害国家安全、扰乱社会秩序、侵犯他人合法权益等法律法规禁止的活动!任何基于本项目二次开发而产生的一切法律纠纷和责任,我们不承担任何责任! // 不得利用本项目从事危害国家安全、扰乱社会秩序、侵犯他人合法权益等法律法规禁止的活动!任何基于本项目二次开发而产生的一切法律纠纷和责任,我们不承担任何责任!
namespace Admin.NET.Core; namespace Admin.NET.Core;
/// <summary> /// <summary>
/// MQTT 配置选项 /// MQTT 配置选项
/// </summary> /// </summary>
public sealed class MqttOptions : IConfigurableOptions public sealed class MqttOptions : IConfigurableOptions
{ {
/// <summary> /// <summary>
/// 是否启用 /// 服务器主动发布时用的ClientId
/// </summary> /// </summary>
public bool Enabled { get; set; } public string MqttServerId { get; set; }
/// <summary> /// <summary>
/// 端口 /// 是否启用
/// </summary> /// </summary>
public int Port { get; set; } public bool Enabled { get; set; }
/// <summary> /// <summary>
/// IP地址 /// 输出文件日志
/// </summary> /// </summary>
public string IPAddress { get; set; } public bool Logging { get; set; }
/// <summary>
/// 控制台输出
/// </summary>
public bool ConsoleOutput { get; set; }
/// <summary>
/// ConnectionBacklog
/// </summary>
public int ConnectionBacklog { get; set; }
/// <summary>
/// 端口
/// </summary>
public int Port { get; set; }
/// <summary>
/// IP地址
/// </summary>
public string IPAddress { get; set; }
} }

View File

@ -0,0 +1,34 @@
// Admin.NET 项目的版权、商标、专利和其他相关权利均受相应法律法规的保护。使用本项目应遵守相关法律法规和许可证的要求。
//
// 本项目主要遵循 MIT 许可证和 Apache 许可证(版本 2.0)进行分发和使用。许可证位于源代码树根目录中的 LICENSE-MIT 和 LICENSE-APACHE 文件。
//
// 不得利用本项目从事危害国家安全、扰乱社会秩序、侵犯他人合法权益等法律法规禁止的活动!任何基于本项目二次开发而产生的一切法律纠纷和责任,我们不承担任何责任!
using Admin.NET.Core.Service.Mqtt.Dto;
namespace Admin.NET.Core.Service;
/// <summary>
/// 系统职位服务 🧩
/// </summary>
[ApiDescriptionSettings(Order = 460, Description = "MQTT 服务")]
public class MqttService : IDynamicApiController, ITransient
{
MqttHostedService _mqttHostedService;
public MqttService(MqttHostedService mqttProvider)
{
_mqttHostedService = mqttProvider;
}
/// <summary>
/// 发布消息 🔖
/// </summary>
/// <param name="input"></param>
/// <returns></returns>
[AllowAnonymous]
[DisplayName("发布消息")]
public async Task PublicMessage(PublicMessageInput input)
{
_mqttHostedService.PublicMessage(input.Topic, input.Message);
}
}

View File

@ -22,6 +22,8 @@ using Microsoft.AspNetCore.ResponseCompression;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.FileProviders; using Microsoft.Extensions.FileProviders;
using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Options;
using MQTTnet.AspNetCore;
using Newtonsoft.Json; using Newtonsoft.Json;
using OnceMi.AspNetCore.OSS; using OnceMi.AspNetCore.OSS;
using RabbitMQ.Client; using RabbitMQ.Client;
@ -410,6 +412,7 @@ public class Startup : AppStartup
} }
}); });
IOptions<MqttOptions> mqttOptions = App.GetRequiredService<IOptions<MqttOptions>>();
app.UseEndpoints(endpoints => app.UseEndpoints(endpoints =>
{ {
// 注册集线器 // 注册集线器
@ -418,6 +421,19 @@ public class Startup : AppStartup
endpoints.MapControllerRoute( endpoints.MapControllerRoute(
name: "default", name: "default",
pattern: "{controller=Home}/{action=Index}/{id?}"); pattern: "{controller=Home}/{action=Index}/{id?}");
if (mqttOptions.Value.Enabled)
{
endpoints.MapConnectionHandler<MqttConnectionHandler>(
"/mqtt",
httpConnectionDispatcherOptions => httpConnectionDispatcherOptions.WebSockets.SubProtocolSelector =
protocolList => protocolList.FirstOrDefault() ?? string.Empty);
}
}); });
// 注册自己业务的 Mqtt 处理器,如果有不同的认证方式,要把以下这句注释掉
if (mqttOptions.Value.Enabled)
{
MqttHostedService.RegistMqttEventHandler(new MqttEventHandlerForAdminNet());
}
} }
} }