龙空技术网

.NET 管道架构的最佳方法

启辰8 350

前言:

当前看官们对“net项目架构”大致比较关心,看官们都需要学习一些“net项目架构”的相关资讯。那么小编同时在网络上网罗了一些对于“net项目架构””的相关知识,希望同学们能喜欢,各位老铁们快快来学习一下吧!

在我最近的项目中,我面临着将 Azure Blob 存储中的数据与本地文件存储同步的挑战。 然而,在同步时,应该对 blob 或文件的元数据应用多个过滤和转换,例如 按特定位置过滤,将文件路径转换为 blob 路由等……

在勾勒出数据流之后,我很快意识到,过滤和转换只是简单的链式操作,就像在管道中一样。 我决定使用管道和过滤器模式作为架构的应用程序逻辑部分,但在 .NET 中实现管道的最佳方法是什么?

我们想要完成的是将几个任务分解为一系列逻辑步骤。 此外,我们的目标是在每个单独的步骤中遵循单一职责原则。 一个步骤应该只关注一件事。 如果我们需要其他东西,我们只需在两者之间添加另一个步骤。 同样非常可取的是,让管道可以通过依赖注入,同时保持可配置。

综上所述,我们要完成以下需求:

一种按逻辑顺序快速设置管道的方法。管道可以使用依赖注入来构造。管道应该是可配置的。管道应该将一个事件流作为输入并输出另一个事件流。 事件类型可能会因此发生变化。方法

在研究过程中,我发现了很多在 .NET 中实现管道架构的方法,但在我看来,它们都过于复杂且不够灵活。 这就是为什么我决定为 .NET 重塑这种架构风格,使用 Reactive Extensions 使其更加简洁和直观。

根据我的经验,处理事件流的最佳方法是 Reactive Extensions。 Reactive Extensions 是一个库,它使您能够使用类似 LINQ 的运算符处理异步数据流。 这是使用观察者模式实现的。

简单来说,您有一个 IObservable<T>,它发出数据/事件。 然后,您可以像使用 LINQ 的 IEnumerable<T> 一样处理这些数据,过滤它,转换它等等,然后数据到达订阅者,称为 IObserver<T>,订阅者订阅流并处理结果。

仅 Reactive Extensions 库就足以满足我们一半的要求。 我们可以设置一个事件源,一个管道,它过滤和转换数据,最后是一个订阅者来处理事件。

然而,我们仍然缺少使用依赖注入的能力,我个人认为,很难在单个类的 Reactive.Linq 查询的大链中实现单一责任。

带有 IObservable 的管道架构

首先,我们需要一个接口,我们可以用它来声明一个管道。 接口应该清楚什么进什么出。 我决定将其命名为 IPipe<TIn, TOut>。 此 Pipe 必须包含一个方法,该方法将 IObservable<TIn> 作为输入并返回 IObservable<TOut>。 我们称之为 Handle()。

public interface IPipe<TIn, TOut>{  IObservable<TOut> Handle(IObservable<TIn> source);}

这个管道的实例现在可以一个接一个地链接起来,同时是类型安全的。 为了能够将多个管道链接在一起,最好有一个简单的扩展方法,而不是将那些 Handle() 调用嵌套在另一个调用中。 这就是我们创建以下扩展方法的原因:

public static IObservable<TOut> Pipe<TIn, TOut>(    this IObservable<TIn> source,    IPipe<TIn, TOut> pipe) => pipe.Handle(source);

这样我们就可以一个接一个地链接管道,而无需在代码中深度嵌套我们的调用链。

我们现在可以创建并将单个负责的管道链接在一起,从而创建一个相当干净的管道。 但是,我们仍然缺少依赖注入部分。

为了能够从依赖注入创建管道,必须将管道和依赖项添加到 DI 容器中。 问题就在这里:当从全局 DI 容器创建管道时,我们如何专门配置和重用管道?

看看这个过滤管道的例子:

internal record MessageFilterPipeOptions(Guid ReceiverId);internal class MessageFilterPipe : IPipe<ChatMessage, ChatMessage>{    private readonly MessageFilterPipeOptions options;    private readonly ILogger<MessageFilterPipe> logger;    public MessageFilterPipe(        MessageFilterPipeOptions options,        ILogger<MessageFilterPipe> logger    ) {        this.options = options            ?? throw new ArgumentNullException(nameof(options));        this.logger = logger            ?? throw new ArgumentNullException(nameof(logger));    }    public IObservable<ChatMessage> Handle(IObservable<ChatMessage> source) {        return source            .Where(m => m.ReceiverId == options.ReceiverId)            .Do(m => logger.LogInformation($"Filter let message: '{m.Message}' pass through."));    }}

要设置它,我们需要注入一个 ILogger,它通常被添加到全局 DI-Container 和一个特定的 Options-Instance,我们当然不希望它在全局可用。 为实现这一点,我们需要每个管道有一个单独的 DI 容器,我们在其中为每个管道添加特定的配置。

我决定为此创建一个 IPipelineBuilder。 构建器应该有自己的 DI 容器,它源自全局容器。 在这个容器中,我们可以安全地为我们的管道添加选项。 此外,我们将使用此构建器以类型安全的方式设置管道本身。

internal class PipelineBuilder : IPipelineBuilder{    private readonly IServiceCollection serviceCollection;    public PipelineBuilder(IServiceCollection serviceCollection) {        this.serviceCollection = serviceCollection?.Clone()            ?? throw new ArgumentNullException(nameof(serviceCollection));    }    public IPipelineBuilder ConfigureOptions(        Action<IPipelineConfigurationBuilder> configure    ) {        var configurationBuilder = new PipelineConfigurationBuilder(serviceCollection);        configure(configurationBuilder);        return new PipelineBuilder(serviceCollection);    }    public IPipelineBuilder<TOut> ConfigurePipeline<TOut>(        Func<IPipelineSourceBuilder, IPipelineStepBuilder<TOut>> build    ) {        var sourceBuilder = new PipelineSourceBuilder(serviceCollection);        var constructor = (PipelineStepBuilder<TOut>)build(sourceBuilder);        return new PipelineBuilder<TOut>(constructor.Source);    }}internal class PipelineBuilder<TOut> : IPipelineBuilder<TOut>{    private readonly IObservable<TOut> source;    public PipelineBuilder(IObservable<TOut> source) {        this.source = source            ?? throw new ArgumentNullException(nameof(source));    }    public IObservable<TOut> Build() {        return source;    }}

PipelineBuilder 有两个方法:ConfigureOptions() 和 ConfigurePipeline()。 您可以使用第一个方法将特定的管道选项添加到构建器的 DI 容器中,第二种方法用于将 Observable-Pipeline 链接在一起。

我的目标是让管道的创建尽可能简单,需要的参数尽可能少。 出于这个原因,我决定强制将 Source-Observable 添加到 IPipelineStepBuilder,因为这为我们节省了多个额外的通用类型参数。

var pipeline = pipelineBuilder    .ConfigureOptions(builder => builder        .Add(new MessageFilterPipeOptions(Guid.Empty))    )    .ConfigurePipeline(builder => builder        .AddSource(chatMessages)        .AddStep<LoggerPipe, ChatMessage>()        .AddStep<MessageFilterPipe, ChatMessage>()        .AddStep<MessageTransformPipe, IdentifiedChatMessage>()        .AddStep(new ConsoleLoggerPipe())    )    .Build();

如您所见,添加源后,您可以添加一个管道来处理 ChatMessage。 这一步由 LoggerPipe 处理,并将 ChatMessage 返回给下一个管道。 MessageFilterPipe 还处理 ChatMessage,并且只允许满足特定条件的消息通过。 MessageTransformPipe 显示了如何在管道内发生转换并返回另一种类型的 IdentifiedChatMessage。

另一个好处是,您可以决定将 IPipe<TIn,TOut> 的实例添加到管道中,这不需要您添加通用参数,或者让它由构建器中的 DI-Container 构建,您可以在其中构建 遗憾的是必须指定管道的传出类型,因为这无法推断。

调用 Build() 后,您将收到一个 IObservable<TOut>,您可以订阅它。

剩下要做的唯一一件事就是向正在设置管道的类提供 IPipelineBuilder 的实例。 为此,我们为 IServiceCollection 实现一个扩展方法:

public static class ServiceCollectionExtensions{    public static IServiceCollection AddObservablePipelines(this IServiceCollection services)        => services            .AddTransient<IServiceCollection>(_ => services)            .AddTransient<IPipelineBuilder, PipelineBuilder>();}

这使得 DI-Container 可供 PipelineBuilder 克隆,并将构建器添加为临时服务,它始终生成一个新的、干净的 IPipelineBuilder 实例。

总结

使用这些模块,我们能够最大限度地减少所需的代码,以创建一个干净的管道。 管道可以托管无限管道,并且可以使用依赖注入以类型安全的方式构建。 此外,我们能够添加特定的配置选项,这样我们就可以托管多个管道,这些管道具有相同的管道,但具有不同的配置。

管道架构本身强制执行更清晰的代码,因为它会非常清楚一步一步发生的事情。 当您遵循单一职责原则并提供良好的命名时,在构建您的管道时,您将毫无问题地识别错误发生的位置,或在何处添加额外的功能。 此外,通过在管道之间或末端添加额外的管道,可以非常容易地扩展功能。

标签: #net项目架构