Merge pull request 'RabbitMQ添加多消费者支持,实现多线程消费' (#429) from eithday/Admin.NET.Pro:v2 into v2
Reviewed-on: https://code.adminnet.top/Admin.NET/Admin.NET.Pro/pulls/429
This commit is contained in:
commit
e943e8afae
@ -14,5 +14,10 @@
|
|||||||
"HostName": "127.0.0.1",
|
"HostName": "127.0.0.1",
|
||||||
"Port": 5672,
|
"Port": 5672,
|
||||||
"VirtualHost": "/"
|
"VirtualHost": "/"
|
||||||
|
},
|
||||||
|
|
||||||
|
//RabbitMQ消费者配置
|
||||||
|
"RabbitMqConsumerOptions": {
|
||||||
|
"ConsumerCount": 1 // 消费者数量,多消费者实现多线程消费队列
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -4,9 +4,11 @@
|
|||||||
//
|
//
|
||||||
// 不得利用本项目从事危害国家安全、扰乱社会秩序、侵犯他人合法权益等法律法规禁止的活动!任何基于本项目二次开发而产生的一切法律纠纷和责任,我们不承担任何责任!
|
// 不得利用本项目从事危害国家安全、扰乱社会秩序、侵犯他人合法权益等法律法规禁止的活动!任何基于本项目二次开发而产生的一切法律纠纷和责任,我们不承担任何责任!
|
||||||
|
|
||||||
|
using Admin.NET.Core;
|
||||||
using Microsoft.AspNetCore.Builder;
|
using Microsoft.AspNetCore.Builder;
|
||||||
using Microsoft.AspNetCore.Hosting;
|
using Microsoft.AspNetCore.Hosting;
|
||||||
using Microsoft.Extensions.Hosting;
|
using Microsoft.Extensions.Hosting;
|
||||||
|
using NewLife.Reflection;
|
||||||
|
|
||||||
namespace Admin.NET.Application;
|
namespace Admin.NET.Application;
|
||||||
|
|
||||||
@ -15,8 +17,11 @@ public class Startup : AppStartup
|
|||||||
{
|
{
|
||||||
public void ConfigureServices(IServiceCollection services)
|
public void ConfigureServices(IServiceCollection services)
|
||||||
{
|
{
|
||||||
|
var option = App.GetConfig<RabbitMqConsumerOptions>("RabbitMqConsumerOptions");
|
||||||
|
var consumerCount = option.ConsumerCount.ToIntOrDefault(1);
|
||||||
|
|
||||||
// 注册RabbitMQ服务
|
// 注册RabbitMQ服务
|
||||||
services.AddRabbitMQ<RabbitMqHandler>();
|
services.AddRabbitMQ<RabbitMqHandler>(consumerCount);
|
||||||
|
|
||||||
// 后台服务异常,不影响主程序(配置的服务器连接异常)
|
// 后台服务异常,不影响主程序(配置的服务器连接异常)
|
||||||
services.Configure<HostOptions>(options =>
|
services.Configure<HostOptions>(options =>
|
||||||
|
|||||||
@ -136,4 +136,26 @@ public static class StringExtension
|
|||||||
return paramDict.TryGetValue(key, out string value) ? value : string.Empty;
|
return paramDict.TryGetValue(key, out string value) ? value : string.Empty;
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// string转int
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="str"></param>
|
||||||
|
/// <param name="defaultValue"></param>
|
||||||
|
/// <returns></returns>
|
||||||
|
public static int ToIntOrDefault(this string? str, int defaultValue = 0)
|
||||||
|
{
|
||||||
|
return int.TryParse(str, out var value) ? value : defaultValue;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// string转long
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="str"></param>
|
||||||
|
/// <param name="defaultValue"></param>
|
||||||
|
/// <returns></returns>
|
||||||
|
public static long ToLongOrDefault(this string? str, long defaultValue = 0)
|
||||||
|
{
|
||||||
|
return long.TryParse(str, out var value) ? value : defaultValue;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
@ -12,8 +12,9 @@ public static class RabbitMQSetup
|
|||||||
/// 注册RabbitMQ
|
/// 注册RabbitMQ
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <param name="services"></param>
|
/// <param name="services"></param>
|
||||||
|
/// <param name="consumerCount">消费者数量,默认1个</param>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
public static void AddRabbitMQ<THandler>(this IServiceCollection services)
|
public static void AddRabbitMQ<THandler>(this IServiceCollection services, int consumerCount = 1)
|
||||||
where THandler : class, IMessageHandler
|
where THandler : class, IMessageHandler
|
||||||
{
|
{
|
||||||
if (App.GetConfig<string>("EventBus:EventSourceType", true) != "RabbitMQ") return;
|
if (App.GetConfig<string>("EventBus:EventSourceType", true) != "RabbitMQ") return;
|
||||||
@ -21,6 +22,11 @@ public static class RabbitMQSetup
|
|||||||
services.AddSingleton<IMessageHandler, THandler>();
|
services.AddSingleton<IMessageHandler, THandler>();
|
||||||
services.AddSingleton<RabbitMqConnection>();
|
services.AddSingleton<RabbitMqConnection>();
|
||||||
services.AddSingleton<RabbitMqProducer>();
|
services.AddSingleton<RabbitMqProducer>();
|
||||||
services.AddHostedService<RabbitMqConsumer>();
|
|
||||||
|
// 注册多个消费者服务
|
||||||
|
for (int i = 0; i < consumerCount; i++)
|
||||||
|
{
|
||||||
|
services.AddHostedService<RabbitMqConsumer>();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
18
Admin.NET/Admin.NET.Core/RabbitMQ/RabbitMqConsumerOptions.cs
Normal file
18
Admin.NET/Admin.NET.Core/RabbitMQ/RabbitMqConsumerOptions.cs
Normal file
@ -0,0 +1,18 @@
|
|||||||
|
// Admin.NET 项目的版权、商标、专利和其他相关权利均受相应法律法规的保护。使用本项目应遵守相关法律法规和许可证的要求。
|
||||||
|
//
|
||||||
|
// 本项目主要遵循 MIT 许可证和 Apache 许可证(版本 2.0)进行分发和使用。许可证位于源代码树根目录中的 LICENSE-MIT 和 LICENSE-APACHE 文件。
|
||||||
|
//
|
||||||
|
// 不得利用本项目从事危害国家安全、扰乱社会秩序、侵犯他人合法权益等法律法规禁止的活动!任何基于本项目二次开发而产生的一切法律纠纷和责任,我们不承担任何责任!
|
||||||
|
|
||||||
|
namespace Admin.NET.Core;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// RabbitMQ消费者配置
|
||||||
|
/// </summary>
|
||||||
|
public class RabbitMqConsumerOptions : IConfigurableOptions
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// 消费者数量
|
||||||
|
/// </summary>
|
||||||
|
public string ConsumerCount { get; set; }
|
||||||
|
}
|
||||||
@ -36,6 +36,7 @@ public static class ProjectOptions
|
|||||||
services.AddConfigurableOptions<OAuthOptions>();
|
services.AddConfigurableOptions<OAuthOptions>();
|
||||||
services.AddConfigurableOptions<SMSOptions>();
|
services.AddConfigurableOptions<SMSOptions>();
|
||||||
services.AddConfigurableOptions<RabbitMqOptions>();
|
services.AddConfigurableOptions<RabbitMqOptions>();
|
||||||
|
services.AddConfigurableOptions<RabbitMqConsumerOptions>();
|
||||||
services.AddConfigurableOptions<AlipayOptions>();
|
services.AddConfigurableOptions<AlipayOptions>();
|
||||||
services.AddConfigurableOptions<MqttOptions>();
|
services.AddConfigurableOptions<MqttOptions>();
|
||||||
services.AddConfigurableOptions<HttpRemotesOptions>();
|
services.AddConfigurableOptions<HttpRemotesOptions>();
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user