😎增加事件总线广播消息功能

This commit is contained in:
zuohuaijun 2024-07-18 17:48:15 +08:00
parent 3eb7f2e14f
commit 1faff96638

View File

@ -4,6 +4,8 @@
//
// 不得利用本项目从事危害国家安全、扰乱社会秩序、侵犯他人合法权益等法律法规禁止的活动!任何基于本项目二次开发而产生的一切法律纠纷和责任,我们不承担任何责任!
using NewLife.Caching.Queues;
using Newtonsoft.Json;
using System.Threading.Channels;
namespace Admin.NET.Core;
@ -11,6 +13,12 @@ namespace Admin.NET.Core;
/// <summary>
/// Redis自定义事件源存储器
/// </summary>
/// <remarks>
/// 在集群部署时,一般每一个消息只由一个服务节点消费一次。
/// 有些特殊情情要通知到服务器群中的每一个节点(比如需要强制加载某些配置、重点服务等)
/// 在这种情况下就要以“broadcast:”开头来定义EventId
/// 本系统会把“broadcast:”开头的事件视为“广播消息”保证集群中的每一个服务节点都能消费得到这个消息
/// </remarks>
public sealed class RedisEventSourceStorer : IEventSourceStorer, IDisposable
{
/// <summary>
@ -23,11 +31,9 @@ public sealed class RedisEventSourceStorer : IEventSourceStorer, IDisposable
/// </summary>
private readonly Channel<IEventSource> _channel;
///// <summary>
///// Redis 连接对象
///// </summary>
//private readonly FullRedis _redis;
private IProducerConsumer<ChannelEventSource> _queue;
private IProducerConsumer<ChannelEventSource> _queueSingle;
private RedisStream<string> _queueBroadcast;
/// <summary>
/// 路由键
@ -54,24 +60,48 @@ public sealed class RedisEventSourceStorer : IEventSourceStorer, IDisposable
//_redis = redis as FullRedis;
_routeKey = routeKey;
// 创建消息订阅者
_queue = cacheProvider.GetQueue<ChannelEventSource>(routeKey);
_eventConsumer = new EventConsumer<ChannelEventSource>(_queue);
// 创建广播消息订阅者即所有服务器节点都能收到消息用来发布重启、Reload配置等消息
FullRedis redis = (FullRedis)cacheProvider.Cache;
var clusterOpt = App.GetConfig<ClusterOptions>("Cluster", true);
_queueBroadcast = redis.GetStream<string>(routeKey + ":broadcast");
_queueBroadcast.Group = clusterOpt.ServerId;//根据服务器标识分配到不同的分组里
_queueBroadcast.Expire = TimeSpan.FromSeconds(10);//消息10秒过期
_queueBroadcast.ConsumeAsync(OnConsumeBroadcast);
// 创建队列消息订阅者,只要有一个服务节点消费了消息即可
_queueSingle = redis.GetQueue<ChannelEventSource>(routeKey + ":single");
_eventConsumer = new EventConsumer<ChannelEventSource>(_queueSingle);
// 订阅消息写入 Channel
_eventConsumer.Received += (send, cr) =>
{
// 反序列化消息
//var eventSource = JsonConvert.DeserializeObject<ChannelEventSource>(cr);
// 写入内存管道存储器
_channel.Writer.WriteAsync(cr);
var oriColor = Console.ForegroundColor;
ChannelEventSource ces = (ChannelEventSource)cr;
ConsumeChannelEventSource(ces);
};
// 启动消费者
_eventConsumer.Start();
}
private Task OnConsumeBroadcast(string source, Message message, CancellationToken token)
{
ChannelEventSource ces = JsonConvert.DeserializeObject<ChannelEventSource>(source);
ConsumeChannelEventSource(ces);
return Task.CompletedTask;
}
private void ConsumeChannelEventSource(ChannelEventSource ces)
{
// 打印测试事件
if (ces.EventId != null && ces.EventId.IndexOf(":Test") > 0)
{
var oriColor = Console.ForegroundColor;
Console.ForegroundColor = ConsoleColor.Green;
Console.WriteLine($"有消息要处理{ces.EventId},{ces.Payload}");
Console.ForegroundColor = oriColor;
}
_channel.Writer.WriteAsync(ces);
}
/// <summary>
/// 将事件源写入存储器
/// </summary>
@ -82,29 +112,28 @@ public sealed class RedisEventSourceStorer : IEventSourceStorer, IDisposable
{
// 空检查
if (eventSource == default)
{
throw new ArgumentNullException(nameof(eventSource));
}
// 这里判断是否是 ChannelEventSource 或者 自定义的 EventSource
if (eventSource is ChannelEventSource source)
{
// 序列化消息
//var data = JsonSerializer.Serialize(source);
//// 获取一个订阅对象
//var queue = _redis.GetQueue<ChannelEventSource>(_routeKey);
// 异步发布
await Task.Factory.StartNew(() =>
{
//queue.Add(source);
_queue.Add(source);
if (source.EventId != null && source.EventId.StartsWith("broadcast:"))
{
string str = JsonConvert.SerializeObject(source);
_queueBroadcast.Add(str);
}
else
{
_queueSingle.Add(source);
}
}, cancellationToken, TaskCreationOptions.LongRunning, System.Threading.Tasks.TaskScheduler.Default);
}
else
{
// 这里处理动态订阅问题
// 处理动态订阅问题
await _channel.Writer.WriteAsync(eventSource, cancellationToken);
}
}