前言:
当前看官们对“net项目架构”大致比较关心,看官们都需要学习一些“net项目架构”的相关资讯。那么小编同时在网络上网罗了一些对于“net项目架构””的相关知识,希望同学们能喜欢,各位老铁们快快来学习一下吧!在我最近的项目中,我面临着将 Azure Blob 存储中的数据与本地文件存储同步的挑战。 然而,在同步时,应该对 blob 或文件的元数据应用多个过滤和转换,例如 按特定位置过滤,将文件路径转换为 blob 路由等……
在勾勒出数据流之后,我很快意识到,过滤和转换只是简单的链式操作,就像在管道中一样。 我决定使用管道和过滤器模式作为架构的应用程序逻辑部分,但在 .NET 中实现管道的最佳方法是什么?
我们想要完成的是将几个任务分解为一系列逻辑步骤。 此外,我们的目标是在每个单独的步骤中遵循单一职责原则。 一个步骤应该只关注一件事。 如果我们需要其他东西,我们只需在两者之间添加另一个步骤。 同样非常可取的是,让管道可以通过依赖注入,同时保持可配置。
综上所述,我们要完成以下需求:
一种按逻辑顺序快速设置管道的方法。管道可以使用依赖注入来构造。管道应该是可配置的。管道应该将一个事件流作为输入并输出另一个事件流。 事件类型可能会因此发生变化。方法
在研究过程中,我发现了很多在 .NET 中实现管道架构的方法,但在我看来,它们都过于复杂且不够灵活。 这就是为什么我决定为 .NET 重塑这种架构风格,使用 Reactive Extensions 使其更加简洁和直观。
根据我的经验,处理事件流的最佳方法是 Reactive Extensions。 Reactive Extensions 是一个库,它使您能够使用类似 LINQ 的运算符处理异步数据流。 这是使用观察者模式实现的。
简单来说,您有一个 IObservable<T>,它发出数据/事件。 然后,您可以像使用 LINQ 的 IEnumerable<T> 一样处理这些数据,过滤它,转换它等等,然后数据到达订阅者,称为 IObserver<T>,订阅者订阅流并处理结果。
仅 Reactive Extensions 库就足以满足我们一半的要求。 我们可以设置一个事件源,一个管道,它过滤和转换数据,最后是一个订阅者来处理事件。
然而,我们仍然缺少使用依赖注入的能力,我个人认为,很难在单个类的 Reactive.Linq 查询的大链中实现单一责任。
带有 IObservable 的管道架构
首先,我们需要一个接口,我们可以用它来声明一个管道。 接口应该清楚什么进什么出。 我决定将其命名为 IPipe<TIn, TOut>。 此 Pipe 必须包含一个方法,该方法将 IObservable<TIn> 作为输入并返回 IObservable<TOut>。 我们称之为 Handle()。
public interface IPipe<TIn, TOut>{ IObservable<TOut> Handle(IObservable<TIn> source);}
这个管道的实例现在可以一个接一个地链接起来,同时是类型安全的。 为了能够将多个管道链接在一起,最好有一个简单的扩展方法,而不是将那些 Handle() 调用嵌套在另一个调用中。 这就是我们创建以下扩展方法的原因:
public static IObservable<TOut> Pipe<TIn, TOut>( this IObservable<TIn> source, IPipe<TIn, TOut> pipe) => pipe.Handle(source);
这样我们就可以一个接一个地链接管道,而无需在代码中深度嵌套我们的调用链。
我们现在可以创建并将单个负责的管道链接在一起,从而创建一个相当干净的管道。 但是,我们仍然缺少依赖注入部分。
为了能够从依赖注入创建管道,必须将管道和依赖项添加到 DI 容器中。 问题就在这里:当从全局 DI 容器创建管道时,我们如何专门配置和重用管道?
看看这个过滤管道的例子:
internal record MessageFilterPipeOptions(Guid ReceiverId);internal class MessageFilterPipe : IPipe<ChatMessage, ChatMessage>{ private readonly MessageFilterPipeOptions options; private readonly ILogger<MessageFilterPipe> logger; public MessageFilterPipe( MessageFilterPipeOptions options, ILogger<MessageFilterPipe> logger ) { this.options = options ?? throw new ArgumentNullException(nameof(options)); this.logger = logger ?? throw new ArgumentNullException(nameof(logger)); } public IObservable<ChatMessage> Handle(IObservable<ChatMessage> source) { return source .Where(m => m.ReceiverId == options.ReceiverId) .Do(m => logger.LogInformation($"Filter let message: '{m.Message}' pass through.")); }}
要设置它,我们需要注入一个 ILogger,它通常被添加到全局 DI-Container 和一个特定的 Options-Instance,我们当然不希望它在全局可用。 为实现这一点,我们需要每个管道有一个单独的 DI 容器,我们在其中为每个管道添加特定的配置。
我决定为此创建一个 IPipelineBuilder。 构建器应该有自己的 DI 容器,它源自全局容器。 在这个容器中,我们可以安全地为我们的管道添加选项。 此外,我们将使用此构建器以类型安全的方式设置管道本身。
internal class PipelineBuilder : IPipelineBuilder{ private readonly IServiceCollection serviceCollection; public PipelineBuilder(IServiceCollection serviceCollection) { this.serviceCollection = serviceCollection?.Clone() ?? throw new ArgumentNullException(nameof(serviceCollection)); } public IPipelineBuilder ConfigureOptions( Action<IPipelineConfigurationBuilder> configure ) { var configurationBuilder = new PipelineConfigurationBuilder(serviceCollection); configure(configurationBuilder); return new PipelineBuilder(serviceCollection); } public IPipelineBuilder<TOut> ConfigurePipeline<TOut>( Func<IPipelineSourceBuilder, IPipelineStepBuilder<TOut>> build ) { var sourceBuilder = new PipelineSourceBuilder(serviceCollection); var constructor = (PipelineStepBuilder<TOut>)build(sourceBuilder); return new PipelineBuilder<TOut>(constructor.Source); }}internal class PipelineBuilder<TOut> : IPipelineBuilder<TOut>{ private readonly IObservable<TOut> source; public PipelineBuilder(IObservable<TOut> source) { this.source = source ?? throw new ArgumentNullException(nameof(source)); } public IObservable<TOut> Build() { return source; }}
PipelineBuilder 有两个方法:ConfigureOptions() 和 ConfigurePipeline()。 您可以使用第一个方法将特定的管道选项添加到构建器的 DI 容器中,第二种方法用于将 Observable-Pipeline 链接在一起。
我的目标是让管道的创建尽可能简单,需要的参数尽可能少。 出于这个原因,我决定强制将 Source-Observable 添加到 IPipelineStepBuilder,因为这为我们节省了多个额外的通用类型参数。
var pipeline = pipelineBuilder .ConfigureOptions(builder => builder .Add(new MessageFilterPipeOptions(Guid.Empty)) ) .ConfigurePipeline(builder => builder .AddSource(chatMessages) .AddStep<LoggerPipe, ChatMessage>() .AddStep<MessageFilterPipe, ChatMessage>() .AddStep<MessageTransformPipe, IdentifiedChatMessage>() .AddStep(new ConsoleLoggerPipe()) ) .Build();
如您所见,添加源后,您可以添加一个管道来处理 ChatMessage。 这一步由 LoggerPipe 处理,并将 ChatMessage 返回给下一个管道。 MessageFilterPipe 还处理 ChatMessage,并且只允许满足特定条件的消息通过。 MessageTransformPipe 显示了如何在管道内发生转换并返回另一种类型的 IdentifiedChatMessage。
另一个好处是,您可以决定将 IPipe<TIn,TOut> 的实例添加到管道中,这不需要您添加通用参数,或者让它由构建器中的 DI-Container 构建,您可以在其中构建 遗憾的是必须指定管道的传出类型,因为这无法推断。
调用 Build() 后,您将收到一个 IObservable<TOut>,您可以订阅它。
剩下要做的唯一一件事就是向正在设置管道的类提供 IPipelineBuilder 的实例。 为此,我们为 IServiceCollection 实现一个扩展方法:
public static class ServiceCollectionExtensions{ public static IServiceCollection AddObservablePipelines(this IServiceCollection services) => services .AddTransient<IServiceCollection>(_ => services) .AddTransient<IPipelineBuilder, PipelineBuilder>();}
这使得 DI-Container 可供 PipelineBuilder 克隆,并将构建器添加为临时服务,它始终生成一个新的、干净的 IPipelineBuilder 实例。
总结
使用这些模块,我们能够最大限度地减少所需的代码,以创建一个干净的管道。 管道可以托管无限管道,并且可以使用依赖注入以类型安全的方式构建。 此外,我们能够添加特定的配置选项,这样我们就可以托管多个管道,这些管道具有相同的管道,但具有不同的配置。
管道架构本身强制执行更清晰的代码,因为它会非常清楚一步一步发生的事情。 当您遵循单一职责原则并提供良好的命名时,在构建您的管道时,您将毫无问题地识别错误发生的位置,或在何处添加额外的功能。 此外,通过在管道之间或末端添加额外的管道,可以非常容易地扩展功能。
标签: #net项目架构