From 89949d4dc7fc94c50754f67519402c13adde1d54 Mon Sep 17 00:00:00 2001 From: zuohuaijun Date: Mon, 22 Jul 2024 22:52:56 +0800 Subject: [PATCH] =?UTF-8?q?=F0=9F=98=8E=E4=BB=A3=E7=A0=81=E6=B8=85?= =?UTF-8?q?=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Admin.NET.Core/Admin.NET.Core.csproj | 2 +- .../Admin.NET.Core/Attribute/DictAttribute.cs | 2 +- .../EventBus/RedisEventSourceStorer.cs | 34 +++++++++++-------- .../Service/Wechat/SysWechatPayService.cs | 12 +++---- 4 files changed, 27 insertions(+), 23 deletions(-) diff --git a/Admin.NET/Admin.NET.Core/Admin.NET.Core.csproj b/Admin.NET/Admin.NET.Core/Admin.NET.Core.csproj index 7fe0210b..0930f88d 100644 --- a/Admin.NET/Admin.NET.Core/Admin.NET.Core.csproj +++ b/Admin.NET/Admin.NET.Core/Admin.NET.Core.csproj @@ -38,7 +38,7 @@ - + diff --git a/Admin.NET/Admin.NET.Core/Attribute/DictAttribute.cs b/Admin.NET/Admin.NET.Core/Attribute/DictAttribute.cs index d252cf03..84e52f64 100644 --- a/Admin.NET/Admin.NET.Core/Attribute/DictAttribute.cs +++ b/Admin.NET/Admin.NET.Core/Attribute/DictAttribute.cs @@ -40,7 +40,7 @@ public class DictAttribute : ValidationAttribute, ITransient // 是否忽略空字符串 if (AllowEmptyStrings && string.IsNullOrEmpty(valueAsString)) return ValidationResult.Success; - var sysDictDataServiceProvider = validationContext.GetRequiredService(); + var sysDictDataServiceProvider = App.GetRequiredService(); var dictDataList = sysDictDataServiceProvider.GetDataList(DictTypeCode).Result; // 使用HashSet来提高查找效率 diff --git a/Admin.NET/Admin.NET.Core/EventBus/RedisEventSourceStorer.cs b/Admin.NET/Admin.NET.Core/EventBus/RedisEventSourceStorer.cs index 475bdf41..f507e7ba 100644 --- a/Admin.NET/Admin.NET.Core/EventBus/RedisEventSourceStorer.cs +++ b/Admin.NET/Admin.NET.Core/EventBus/RedisEventSourceStorer.cs @@ -1,4 +1,4 @@ -// Admin.NET 项目的版权、商标、专利和其他相关权利均受相应法律法规的保护。使用本项目应遵守相关法律法规和许可证的要求。 +// Admin.NET 项目的版权、商标、专利和其他相关权利均受相应法律法规的保护。使用本项目应遵守相关法律法规和许可证的要求。 // // 本项目主要遵循 MIT 许可证和 Apache 许可证(版本 2.0)进行分发和使用。许可证位于源代码树根目录中的 LICENSE-MIT 和 LICENSE-APACHE 文件。 // @@ -35,10 +35,7 @@ public sealed class RedisEventSourceStorer : IEventSourceStorer, IDisposable private RedisStream _queueBroadcast; - /// - /// 路由键 - /// - private readonly string _routeKey; + private ILogger _logger; /// /// 构造函数 @@ -48,6 +45,8 @@ public sealed class RedisEventSourceStorer : IEventSourceStorer, IDisposable /// 存储器最多能够处理多少消息,超过该容量进入等待写入 public RedisEventSourceStorer(ICacheProvider cacheProvider, string routeKey, int capacity) { + _logger = App.GetRequiredService>(); + // 配置通道,设置超出默认容量后进入等待 var boundedChannelOptions = new BoundedChannelOptions(capacity) { @@ -58,7 +57,6 @@ public sealed class RedisEventSourceStorer : IEventSourceStorer, IDisposable _channel = Channel.CreateBounded(boundedChannelOptions); //_redis = redis as FullRedis; - _routeKey = routeKey; // 创建广播消息订阅者,即所有服务器节点都能收到消息(用来发布重启、Reload配置等消息) FullRedis redis = (FullRedis)cacheProvider.Cache; @@ -73,23 +71,29 @@ public sealed class RedisEventSourceStorer : IEventSourceStorer, IDisposable _eventConsumer = new EventConsumer(_queueSingle); // 订阅消息写入 Channel - _eventConsumer.Received += (send, cr) => + _eventConsumer.Received += async (send, cr) => { - var oriColor = Console.ForegroundColor; - ChannelEventSource ces = (ChannelEventSource)cr; - ConsumeChannelEventSource(ces); + // var oriColor = Console.ForegroundColor; + try + { + ChannelEventSource ces = (ChannelEventSource)cr; + await ConsumeChannelEventSourceAsync(ces, ces.CancellationToken); + } + catch (Exception e) + { + _logger.LogError(e, "处理Received中的消息产生错误!"); + } }; _eventConsumer.Start(); } - private Task OnConsumeBroadcast(string source, Message message, CancellationToken token) + private async Task OnConsumeBroadcast(string source, Message message, CancellationToken token) { ChannelEventSource ces = JsonConvert.DeserializeObject(source); - ConsumeChannelEventSource(ces); - return Task.CompletedTask; + await ConsumeChannelEventSourceAsync(ces, token); } - private void ConsumeChannelEventSource(ChannelEventSource ces) + private async Task ConsumeChannelEventSourceAsync(ChannelEventSource ces, CancellationToken cancel = default) { // 打印测试事件 if (ces.EventId != null && ces.EventId.IndexOf(":Test") > 0) @@ -99,7 +103,7 @@ public sealed class RedisEventSourceStorer : IEventSourceStorer, IDisposable Console.WriteLine($"有消息要处理{ces.EventId},{ces.Payload}"); Console.ForegroundColor = oriColor; } - _channel.Writer.WriteAsync(ces); + await _channel.Writer.WriteAsync(ces, cancel); } /// diff --git a/Admin.NET/Admin.NET.Core/Service/Wechat/SysWechatPayService.cs b/Admin.NET/Admin.NET.Core/Service/Wechat/SysWechatPayService.cs index 4220ac1d..04dd1969 100644 --- a/Admin.NET/Admin.NET.Core/Service/Wechat/SysWechatPayService.cs +++ b/Admin.NET/Admin.NET.Core/Service/Wechat/SysWechatPayService.cs @@ -19,10 +19,10 @@ public class SysWechatPayService : IDynamicApiController, ITransient private readonly WechatTenpayClient _wechatTenpayClient; - public SysWechatPayService(SqlSugarRepository sysWechatPayRep - , SqlSugarRepository sysWechatRefundRep - , IOptions wechatPayOptions - , IOptions payCallBackOptions) + public SysWechatPayService(SqlSugarRepository sysWechatPayRep, + SqlSugarRepository sysWechatRefundRep, + IOptions wechatPayOptions, + IOptions payCallBackOptions) { _sysWechatPayRep = sysWechatPayRep; this._sysWechatRefundRep = sysWechatRefundRep; @@ -378,7 +378,7 @@ public class SysWechatPayService : IDynamicApiController, ITransient wechatPay.TradeType = response.TradeType; // 交易类型 wechatPay.TradeState = response.TradeState; // 交易状态 wechatPay.TradeStateDescription = response.TradeStateDescription; // 交易状态描述 - wechatPay.BankType = response.BankType; // 付款银行类型 + wechatPay.BankType = response.BankType; // 付款银行类型 wechatPay.PayerTotal = response.Amount?.PayerTotal; // 用户支付金额 wechatPay.SuccessTime = response.SuccessTime; // 支付完成时间 await _sysWechatPayRep.AsUpdateable(wechatPay).IgnoreColumns(true).ExecuteCommandAsync(); @@ -420,7 +420,7 @@ public class SysWechatPayService : IDynamicApiController, ITransient wechatPay.TradeType = response.TradeType; // 交易类型 wechatPay.TradeState = response.TradeState; // 交易状态 wechatPay.TradeStateDescription = response.TradeStateDescription; // 交易状态描述 - wechatPay.BankType = response.BankType; // 付款银行类型 + wechatPay.BankType = response.BankType; // 付款银行类型 wechatPay.PayerTotal = response.Amount?.PayerTotal; // 用户支付金额 wechatPay.SuccessTime = response.SuccessTime; // 支付完成时间 await _sysWechatPayRep.AsUpdateable(wechatPay).IgnoreColumns(true).ExecuteCommandAsync();