UNIVPLMDataIntegration/Admin.NET/Admin.NET.Core/Job/MqttHostedService.cs

226 lines
8.6 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// Admin.NET 项目的版权、商标、专利和其他相关权利均受相应法律法规的保护。使用本项目应遵守相关法律法规和许可证的要求。
//
// 本项目主要遵循 MIT 许可证和 Apache 许可证(版本 2.0)进行分发和使用。许可证位于源代码树根目录中的 LICENSE-MIT 和 LICENSE-APACHE 文件。
//
// 不得利用本项目从事危害国家安全、扰乱社会秩序、侵犯他人合法权益等法律法规禁止的活动!任何基于本项目二次开发而产生的一切法律纠纷和责任,我们不承担任何责任!
namespace Admin.NET.Core;
using Furion.Logging.Extensions;
using Microsoft.Extensions.Hosting;
using MQTTnet;
using MQTTnet.Protocol;
using MQTTnet.Server;
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
/// <summary>
/// Hosted service that runs an embedded MQTT server for the lifetime of the application
/// and fans every server event out to the registered <see cref="MqttEventHandler"/> instances.
/// </summary>
/// <remarks>
/// The server is only created when <c>MqttOptions.Enabled</c> is true. Handlers are registered
/// statically through <see cref="RegistMqttEventHandler"/> and are invoked in descending
/// <c>Order</c>.
/// </remarks>
public class MqttHostedService(IOptions<MqttOptions> mqttOptions, ISqlSugarClient db) : IHostedService, ISingleton
{
    private const string ServerClientId = "Admin.NET.MQTT";

    /// <summary>
    /// The running MQTT server. Stays <c>null</c> until <see cref="StartAsync"/> has created it,
    /// which never happens when MQTT is disabled in the options.
    /// </summary>
    public static MqttServer MqttServer { get; set; }

    private readonly MqttOptions _mqttOptions = mqttOptions.Value;
    private readonly ISqlSugarClient _db = db;
    private bool _isLogging = false;
    private bool _consoleOutput = false;

    // Registration can happen while server callbacks are enumerating the handlers, so the
    // collection is treated as copy-on-write: writers build a new array under the lock and
    // publish it atomically; readers iterate whatever snapshot they observed.
    private static readonly object HandlersGate = new();
    private static volatile MqttEventHandler[] mqttEventHandlers = Array.Empty<MqttEventHandler>();

    /// <summary>
    /// Registers an event handler and assigns its dispatch priority.
    /// </summary>
    /// <param name="eh">Handler to register.</param>
    /// <param name="order">Priority written to <c>eh.Order</c>; handlers with a larger value run first.</param>
    /// <exception cref="ArgumentNullException"><paramref name="eh"/> is null.</exception>
    public static void RegistMqttEventHandler(MqttEventHandler eh, int order)
    {
        ArgumentNullException.ThrowIfNull(eh);

        eh.Order = order;
        lock (HandlersGate)
        {
            var updated = new List<MqttEventHandler>(mqttEventHandlers) { eh };
            // CompareTo instead of subtraction: "b.Order - a.Order" overflows for extreme values.
            updated.Sort((a, b) => b.Order.CompareTo(a.Order));
            mqttEventHandlers = updated.ToArray();
        }
    }

    /// <summary>
    /// Fire-and-forget publish of a server-originated message.
    /// </summary>
    /// <remarks>
    /// Kept as <c>async void</c> for existing callers. Because nobody can observe the returned
    /// task, failures are caught and logged here instead of escaping as an unhandled exception.
    /// Prefer <see cref="PublishMessageAsync"/> when the caller can await.
    /// </remarks>
    /// <param name="topic">Topic to publish to.</param>
    /// <param name="message">Payload text.</param>
    public async void PublicMessage(string topic, string message)
    {
        try
        {
            await PublishMessageAsync(topic, message);
        }
        catch (Exception ex)
        {
            var error = $"MQTT server failed to publish to topic '{topic}': {ex}";
            if (_consoleOutput)
            {
                Console.WriteLine(error);
            }
            error.LogError();
        }
    }

    /// <summary>
    /// Publishes a server-originated message and lets the caller await the outcome.
    /// </summary>
    /// <param name="topic">Topic to publish to.</param>
    /// <param name="message">Payload text.</param>
    /// <returns>A task that completes once the message has been injected into the server.</returns>
    /// <exception cref="InvalidOperationException">The MQTT server has not been created.</exception>
    public async Task PublishMessageAsync(string topic, string message)
    {
        var server = MqttServer
            ?? throw new InvalidOperationException("The MQTT server has not been started (is MQTT enabled in the configuration?).");

        var applicationMessage = new MqttApplicationMessageBuilder()
            .WithTopic(topic)
            .WithPayload(message)
            .Build();

        Log($"服务器发布主题: {topic}, 内容:{message}");

        // The sender id is the configured server id; the publish interceptor below uses the
        // same value to recognise and skip messages that originate from this method.
        await server.InjectApplicationMessage(new InjectedMqttApplicationMessage(applicationMessage)
        {
            SenderClientId = _mqttOptions.MqttServerId
        });
    }

    /// <summary>
    /// Builds the MQTT server from the configured options, wires all event callbacks and starts it.
    /// Does nothing when MQTT is disabled.
    /// </summary>
    /// <param name="cancellationToken">Host start-up token (not used by the server start call).</param>
    public async Task StartAsync(CancellationToken cancellationToken)
    {
        if (!_mqttOptions.Enabled) return;

        _isLogging = _mqttOptions.Logging;
        _consoleOutput = _mqttOptions.ConsoleOutput;

        var options = new MqttServerOptionsBuilder()
            .WithDefaultEndpoint()                                   // enable the default TCP endpoint
            .WithDefaultEndpointPort(_mqttOptions.Port)              // listening port
            //.WithDefaultEndpointBoundIPAddress(_mqttOptions.IPAddress) // bind to a specific IP address
            .WithConnectionBacklog(_mqttOptions.ConnectionBacklog)   // listen backlog (pending connections), not a client limit
            .WithPersistentSessions()
            .Build();

        MqttServer = new MqttServerFactory().CreateMqttServer(options);
        MqttServer.StartedAsync += MqttServer_StartedAsync;                                         // server started
        MqttServer.StoppedAsync += MqttServer_StoppedAsync;                                         // server stopped
        MqttServer.ValidatingConnectionAsync += MqttServer_ValidatingConnectionAsync;               // client authentication
        MqttServer.ClientConnectedAsync += MqttServer_ClientConnectedAsync;                         // client connected
        MqttServer.ClientDisconnectedAsync += MqttServer_ClientDisconnectedAsync;                   // client disconnected
        MqttServer.ClientSubscribedTopicAsync += MqttServer_ClientSubscribedTopicAsync;             // topic subscribed
        MqttServer.ClientUnsubscribedTopicAsync += MqttServer_ClientUnsubscribedTopicAsync;         // topic unsubscribed
        MqttServer.InterceptingPublishAsync += MqttServer_InterceptingPublishAsync;                 // incoming publish
        MqttServer.ApplicationMessageNotConsumedAsync += MqttServer_ApplicationMessageNotConsumedAsync; // message had no consumer

        await MqttServer.StartAsync();
    }

    /// <summary>
    /// Server-started callback: writes a start-up banner to the console.
    /// </summary>
    /// <param name="arg">Event data (unused).</param>
    private Task MqttServer_StartedAsync(EventArgs arg)
    {
        Console.WriteLine($"【MQTT】服务已启动端口{_mqttOptions.Port}...... {DateTime.Now}");
        return Task.CompletedTask;
    }

    /// <summary>
    /// Server-stopped callback: writes a banner to the console and notifies every handler.
    /// </summary>
    /// <param name="arg">Event data forwarded to the handlers.</param>
    private async Task MqttServer_StoppedAsync(EventArgs arg)
    {
        Console.WriteLine($"【MQTT】服务已关闭...... {DateTime.Now}");
        foreach (var eh in mqttEventHandlers)
        {
            await eh.StoppedAsync(arg);
        }
    }

    /// <summary>
    /// Connection-validation callback. Handlers are consulted in order; the first one that sets a
    /// reason code other than <see cref="MqttConnectReasonCode.Success"/> ends the chain.
    /// </summary>
    /// <param name="arg">Connection attempt being validated; handlers set its reason code.</param>
    private async Task MqttServer_ValidatingConnectionAsync(ValidatingConnectionEventArgs arg)
    {
        foreach (var eh in mqttEventHandlers)
        {
            await eh.ValidatingConnectionAsync(arg);
            if (arg.ReasonCode != MqttConnectReasonCode.Success)
            {
                break;
            }
        }
    }

    /// <summary>
    /// Client-connected callback: logs the connection and notifies every handler.
    /// </summary>
    /// <param name="arg">Details of the connected client.</param>
    private async Task MqttServer_ClientConnectedAsync(ClientConnectedEventArgs arg)
    {
        Log($"客户端连接客户端ID=【{arg.ClientId}】已连接:用户名=【{arg.UserName}】地址=【{arg.RemoteEndPoint}】 {DateTime.Now}");
        foreach (var eh in mqttEventHandlers)
        {
            await eh.ClientConnectedAsync(arg);
        }
    }

    /// <summary>
    /// Client-disconnected callback: logs the disconnect and notifies every handler.
    /// </summary>
    /// <param name="arg">Details of the disconnected client.</param>
    private async Task MqttServer_ClientDisconnectedAsync(ClientDisconnectedEventArgs arg)
    {
        Log($"客户端断开客户端ID=【{arg.ClientId}】已断开:用户名=【{arg.UserName}】地址=【{arg.RemoteEndPoint}】 {DateTime.Now}");
        foreach (var eh in mqttEventHandlers)
        {
            await eh.ClientDisconnectedAsync(arg);
        }
    }

    /// <summary>
    /// Topic-subscribed callback: logs the subscription and notifies every handler.
    /// </summary>
    /// <param name="arg">Client id and topic filter of the subscription.</param>
    private async Task MqttServer_ClientSubscribedTopicAsync(ClientSubscribedTopicEventArgs arg)
    {
        Log($"订阅主题客户端ID=【{arg.ClientId}】订阅主题=【{arg.TopicFilter}】 {DateTime.Now}");
        foreach (var eh in mqttEventHandlers)
        {
            await eh.ClientSubscribedTopicAsync(arg);
        }
    }

    /// <summary>
    /// Topic-unsubscribed callback: logs the removal and notifies every handler.
    /// </summary>
    /// <param name="arg">Client id and topic filter that was unsubscribed.</param>
    private async Task MqttServer_ClientUnsubscribedTopicAsync(ClientUnsubscribedTopicEventArgs arg)
    {
        Log($"取消订阅客户端ID=【{arg.ClientId}】取消订阅主题=【{arg.TopicFilter}】 {DateTime.Now}");
        foreach (var eh in mqttEventHandlers)
        {
            await eh.ClientUnsubscribedTopicAsync(arg);
        }
    }

    /// <summary>
    /// Publish-interception callback: logs the incoming message and notifies every handler.
    /// Messages whose client id equals the configured server id are skipped entirely.
    /// </summary>
    /// <param name="arg">The intercepted publish, including the application message.</param>
    private async Task MqttServer_InterceptingPublishAsync(InterceptingPublishEventArgs arg)
    {
        // Messages injected by this service carry the configured server id as sender;
        // they are neither logged nor dispatched to handlers.
        if (string.Equals(arg.ClientId, _mqttOptions.MqttServerId))
        {
            return;
        }

        // Decoding the payload and formatting the line is wasted work on every message
        // when neither sink is enabled, so only build the text if it will be written.
        if (_consoleOutput || _isLogging)
        {
            Log($"拦截消息客户端ID=【{arg.ClientId}】 Topic主题=【{arg.ApplicationMessage.Topic}】 消息=【{Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)}】 qos等级=【{arg.ApplicationMessage.QualityOfServiceLevel}】 {DateTime.Now}");
        }

        foreach (var eh in mqttEventHandlers)
        {
            await eh.InterceptingPublishAsync(arg);
        }
    }

    /// <summary>
    /// Not-consumed callback: writes the message to the console and notifies every handler.
    /// </summary>
    /// <remarks>
    /// The console line is written unconditionally; it does not go through <see cref="Log"/>.
    /// </remarks>
    /// <param name="arg">The message that had no consumer, with its sender id.</param>
    private async Task MqttServer_ApplicationMessageNotConsumedAsync(ApplicationMessageNotConsumedEventArgs arg)
    {
        Console.WriteLine($"接收消息发送端ID=【{arg.SenderId}】 Topic主题=【{arg.ApplicationMessage.Topic}】 消息=【{Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)}】 qos等级=【{arg.ApplicationMessage.QualityOfServiceLevel}】 {DateTime.Now}");
        foreach (var eh in mqttEventHandlers)
        {
            await eh.ApplicationMessageNotConsumedAsync(arg);
        }
    }

    /// <summary>
    /// Stops the MQTT server when the host shuts down so that the listener is released and the
    /// stopped callback (and with it every handler's stop notification) actually runs.
    /// </summary>
    /// <param name="cancellationToken">Host shutdown token (not used by the server stop call).</param>
    public async Task StopAsync(CancellationToken cancellationToken)
    {
        var server = MqttServer;

        // Nothing to do when MQTT is disabled or the server never came up.
        if (server is null || !server.IsStarted)
        {
            return;
        }

        await server.StopAsync();
    }

    /// <summary>
    /// Writes a diagnostic line to the sinks enabled in the options: the console when
    /// <c>ConsoleOutput</c> is set, the debug log when <c>Logging</c> is set.
    /// </summary>
    /// <param name="msg">Text to write.</param>
    protected void Log(string msg)
    {
        if (_consoleOutput)
        {
            Console.WriteLine(msg);
        }

        if (_isLogging)
        {
            msg.LogDebug();
        }
    }
}