龙空技术网

Java 使用 Resilience4j 框架实现客户端 API 调用的限速/节流机制

信码由缰 190

前言:

今天兄弟们对“java读字节流”大致比较关怀,小伙伴们都想要分析一些“java读字节流”的相关知识。那么小编在网摘上网罗了一些有关“java读字节流””的相关知识,希望你们能喜欢,各位老铁们一起来了解一下吧!

在本系列的上一篇文章中,我们了解了 Resilience4j 以及如何使用其 [Retry 模块]()。现在让我们了解 RateLimiter - 它是什么,何时以及如何使用它,以及在实施速率限制(或者也称为“节流”)时要注意什么。

代码示例

本文附有GitHub 上的工作代码示例。

什么是 Resilience4j?

请参阅上一篇文章中的描述,快速了解 [Resilience4j 的一般工作原理]()

什么是限速?

我们可以从两个角度来看待速率限制——作为服务提供者和作为服务消费者。

服务端限速

作为服务提供商,我们实施速率限制以保护我们的资源免受过载和拒绝服务 (DoS) 攻击。

为了满足我们与所有消费者的服务水平协议 (SLA),我们希望确保一个导致流量激增的消费者不会影响我们对他人的服务质量。

我们通过设置在给定时间单位内允许消费者发出多少请求的限制来做到这一点。我们通过适当的响应拒绝任何超出限制的请求,例如 HTTP 状态 429(请求过多)。这称为服务器端速率限制。

速率限制以每秒请求数 (rps)、每分钟请求数 (rpm) 或类似形式指定。某些服务在不同的持续时间(例如 50 rpm 且不超过 2500 rph)和一天中的不同时间(例如,白天 100 rps 和晚上 150 rps)有多个速率限制。该限制可能适用于单个用户(由用户 ID、IP 地址、API 访问密钥等标识)或多租户应用程序中的租户。

客户端限速

作为服务的消费者,我们希望确保我们不会使服务提供者过载。此外,我们不想招致意外的成本——无论是金钱上的还是服务质量方面的。

如果我们消费的服务是有弹性的,就会发生这种情况。服务提供商可能不会限制我们的请求,而是会因额外负载而向我们收取额外费用。有些甚至在短时间内禁止行为不端的客户。消费者为防止此类问题而实施的速率限制称为客户端速率限制。

何时使用 RateLimiter?

resilience4j-ratelimiter 用于客户端速率限制。

服务器端速率限制需要诸如缓存和多个服务器实例之间的协调之类的东西,这是 resilience4j 不支持的。对于服务器端的速率限制,有 API 网关和 API 过滤器,例如 Kong API Gateway Repose API Filter。Resilience4j 的 RateLimiter 模块并不打算取代它们。

Resilience4j RateLimiter 概念

想要调用远程服务的线程首先向 RateLimiter 请求许可。如果 RateLimiter 允许,则线程继续。 否则,RateLimiter 会停放线程或将其置于等待状态。

RateLimiter 定期创建新权限。当权限可用时,线程会收到通知,然后可以继续。

一段时间内允许的调用次数称为 limitForPeriod。RateLimiter 刷新权限的频率由 limitRefreshPeriod 指定。timeoutDuration 指定线程可以等待多长时间获取权限。如果在等待时间结束时没有可用的权限,RateLimiter 将抛出 RequestNotPermitted 运行时异常。

使用Resilience4j RateLimiter 模块

RateLimiterRegistryRateLimiterConfig RateLimiter resilience4j-ratelimiter 的主要抽象。

RateLimiterRegistry 是一个用于创建和管理 RateLimiter 对象的工厂。

RateLimiterConfig 封装了 limitForPeriodlimitRefreshPeriod timeoutDuration 配置。每个 RateLimiter 对象都与一个 RateLimiterConfig 相关联。

RateLimiter 提供辅助方法来为包含远程调用的函数式接口或 lambda 表达式创建装饰器。

让我们看看如何使用 RateLimiter 模块中可用的各种功能。假设我们正在为一家航空公司建立一个网站,以允许其客户搜索和预订航班。我们的服务与 FlightSearchService 类封装的远程服务对话。

基本示例

第一步是创建一个 RateLimiterConfig

RateLimiterConfig config = RateLimiterConfig.ofDefaults();

这将创建一个 RateLimiterConfig,其默认值为 limitForPeriod (50)、limitRefreshPeriod(500ns) 和 timeoutDuration (5s)。

假设我们与航空公司服务的合同规定我们可以以 1 rps 调用他们的搜索 API。然后我们将像这样创建 RateLimiterConfig

RateLimiterConfig config = RateLimiterConfig.custom()  .limitForPeriod(1)  .limitRefreshPeriod(Duration.ofSeconds(1))  .timeoutDuration(Duration.ofSeconds(1))  .build();

如果线程无法在指定的 1 秒 timeoutDuration 内获取权限,则会出错。

然后我们创建一个 RateLimiter 并装饰 searchFlights() 调用:

RateLimiterRegistry registry = RateLimiterRegistry.of(config);RateLimiter limiter = registry.rateLimiter("flightSearchService");// FlightSearchService and SearchRequest creation omittedSupplier<List<Flight>> flightsSupplier =  RateLimiter.decorateSupplier(limiter,    () -> service.searchFlights(request));

最后,我们多次使用装饰过的 Supplier<List<Flight>>

for (int i=0; i<3; i++) {  System.out.println(flightsSupplier.get());}

示例输出中的时间戳显示每秒发出一个请求:

Searching for flights; current time = 15:29:40 786...[Flight{flightNumber='XY 765', ... }, ... ]Searching for flights; current time = 15:29:41 791...[Flight{flightNumber='XY 765', ... }, ... ]

如果超出限制,我们会收到 RequestNotPermitted 异常:

Exception in thread "main" io.github.resilience4j.ratelimiter.RequestNotPermitted: RateLimiter 'flightSearchService' does not permit further calls at io.github.resilience4j.ratelimiter.RequestNotPermitted.createRequestNotPermitted(RequestNotPermitted.java:43) at io.github.resilience4j.ratelimiter.RateLimiter.waitForPermission(RateLimiter.java:580)... other lines omitted ...
装饰方法抛出已检异常

假设我们正在调用 FlightSearchService.searchFlightsThrowingException() ,它可以抛出一个已检 Exception。那么我们就不能使用 RateLimiter.decorateSupplier()。我们将使用 RateLimiter.decorateCheckedSupplier() 代替:

CheckedFunction0<List<Flight>> flights =  RateLimiter.decorateCheckedSupplier(limiter,    () -> service.searchFlightsThrowingException(request));try {  System.out.println(flights.apply());} catch (...) {  // exception handling}

RateLimiter.decorateCheckedSupplier() 返回一个 CheckedFunction0,它表示一个没有参数的函数。请注意对 CheckedFunction0 对象的 apply() 调用以调用远程操作。

如果我们不想使用 Suppliers ,RateLimiter 提供了更多的辅助装饰器方法,如 decorateFunction()decorateCheckedFunction()decorateRunnable()decorateCallable() 等,以与其他语言结构一起使用。decorateChecked* 方法用于装饰抛出已检查异常的方法。

应用多个速率限制

假设航空公司的航班搜索有多个速率限制:2 rps 40 rpm。 我们可以通过创建多个 RateLimiters 在客户端应用多个限制:

RateLimiterConfig rpsConfig = RateLimiterConfig.custom().  limitForPeriod(2).  limitRefreshPeriod(Duration.ofSeconds(1)).  timeoutDuration(Duration.ofMillis(2000)).build();RateLimiterConfig rpmConfig = RateLimiterConfig.custom().  limitForPeriod(40).  limitRefreshPeriod(Duration.ofMinutes(1)).  timeoutDuration(Duration.ofMillis(2000)).build();RateLimiterRegistry registry = RateLimiterRegistry.of(rpsConfig);RateLimiter rpsLimiter =  registry.rateLimiter("flightSearchService_rps", rpsConfig);RateLimiter rpmLimiter =  registry.rateLimiter("flightSearchService_rpm", rpmConfig);  

然后我们使用两个 RateLimiters 装饰 searchFlights() 方法:

Supplier<List<Flight>> rpsLimitedSupplier =  RateLimiter.decorateSupplier(rpsLimiter,    () -> service.searchFlights(request));Supplier<List<Flight>> flightsSupplier  = RateLimiter.decorateSupplier(rpmLimiter, rpsLimitedSupplier);

示例输出显示每秒发出 2 个请求,并且限制为 40 个请求:

Searching for flights; current time = 15:13:21 246...Searching for flights; current time = 15:13:21 249...Searching for flights; current time = 15:13:22 212...Searching for flights; current time = 15:13:40 215...Exception in thread "main" io.github.resilience4j.ratelimiter.RequestNotPermitted:RateLimiter 'flightSearchService_rpm' does not permit further callsat io.github.resilience4j.ratelimiter.RequestNotPermitted.createRequestNotPermitted(RequestNotPermitted.java:43)at io.github.resilience4j.ratelimiter.RateLimiter.waitForPermission(RateLimiter.java:580)
在运行时更改限制

如果需要,我们可以在运行时更改 limitForPeriod timeoutDuration 的值:

limiter.changeLimitForPeriod(2);limiter.changeTimeoutDuration(Duration.ofSeconds(2));

例如,如果我们的速率限制根据一天中的时间而变化,则此功能很有用 - 我们可以有一个计划线程来更改这些值。新值不会影响当前正在等待权限的线程。

RateLimiter和 Retry一起使用

假设我们想在收到 RequestNotPermitted 异常时重试,因为它是一个暂时性错误。我们会像往常一样创建 RateLimiter Retry 对象。然后我们装饰一个 Supplier 的供应商并用 Retry 包装它:

Supplier<List<Flight>> rateLimitedFlightsSupplier =  RateLimiter.decorateSupplier(rateLimiter,    () -> service.searchFlights(request));Supplier<List<Flight>> retryingFlightsSupplier =  Retry.decorateSupplier(retry, rateLimitedFlightsSupplier);

示例输出显示为 RequestNotPermitted 异常重试请求:

Searching for flights; current time = 15:29:39 847Flight search successful[Flight{flightNumber='XY 765', ... }, ... ]Searching for flights; current time = 17:10:09 218...[Flight{flightNumber='XY 765', flightDate='07/31/2020', from='NYC', to='LAX'}, ...]2020-07-27T17:10:09.484: Retry 'rateLimitedFlightSearch', waiting PT1S until attempt '1'. Last attempt failed with exception 'io.github.resilience4j.ratelimiter.RequestNotPermitted: RateLimiter 'flightSearchService' does not permit further calls'.Searching for flights; current time = 17:10:10 492...2020-07-27T17:10:10.494: Retry 'rateLimitedFlightSearch' recorded a successful retry attempt...[Flight{flightNumber='XY 765', flightDate='07/31/2020', from='NYC', to='LAX'}, ...]

我们创建装饰器的顺序很重要。如果我们将 Retry RateLimiter 包装在一起,它将不起作用。

RateLimiter 事件

RateLimiter 有一个 EventPublisher,它在调用远程操作时生成 RateLimiterOnSuccessEvent RateLimiterOnFailureEvent 类型的事件,以指示获取权限是否成功。我们可以监听这些事件并记录它们,例如:

RateLimiter limiter = registry.rateLimiter("flightSearchService");limiter.getEventPublisher().onSuccess(e -> System.out.println(e.toString()));limiter.getEventPublisher().onFailure(e -> System.out.println(e.toString()));

日志输出示例如下:

RateLimiterEvent{type=SUCCESSFUL_ACQUIRE, rateLimiterName='flightSearchService', creationTime=2020-07-21T19:14:33.127+05:30}... other lines omitted ...RateLimiterEvent{type=FAILED_ACQUIRE, rateLimiterName='flightSearchService', creationTime=2020-07-21T19:14:33.186+05:30}
RateLimiter 指标

假设在实施客户端节流后,我们发现 API 的响应时间增加了。这是可能的 - 正如我们所见,如果在线程调用远程操作时权限不可用,RateLimiter 会将线程置于等待状态。

如果我们的请求处理线程经常等待获得许可,则可能意味着我们的 limitForPeriod 太低。也许我们需要与我们的服务提供商合作并首先获得额外的配额。

监控 RateLimiter 指标可帮助我们识别此类容量问题,并确保我们在 RateLimiterConfig 上设置的值运行良好。

RateLimiter 跟踪两个指标:可用权限的数量(resilience4j.ratelimiter.available.permissions)和等待权限的线程数量(resilience4j.ratelimiter.waiting.threads)。

首先,我们像往常一样创建 RateLimiterConfigRateLimiterRegistry RateLimiter。然后,我们创建一个 MeterRegistry 并将 RateLimiterRegistry 绑定到它:

MeterRegistry meterRegistry = new SimpleMeterRegistry();TaggedRateLimiterMetrics.ofRateLimiterRegistry(registry)  .bindTo(meterRegistry);

运行几次限速操作后,我们显示捕获的指标:

Consumer<Meter> meterConsumer = meter -> {  String desc = meter.getId().getDescription();  String metricName = meter.getId().getName();  Double metricValue = StreamSupport.stream(meter.measure().spliterator(), false)    .filter(m -> m.getStatistic().name().equals("VALUE"))    .findFirst()    .map(m -> m.getValue())    .orElse(0.0);  System.out.println(desc + " - " + metricName + ": " + metricValue);};meterRegistry.forEachMeter(meterConsumer);

这是一些示例输出:

The number of available permissions - resilience4j.ratelimiter.available.permissions: -6.0The number of waiting threads - resilience4j.ratelimiter.waiting_threads: 7.0

resilience4j.ratelimiter.available.permissions 的负值显示为请求线程保留的权限数。在实际应用中,我们会定期将数据导出到监控系统,并在仪表板上进行分析。

实施客户端速率限制时的陷阱和良好实践使速率限制器成为单例

对给定远程服务的所有调用都应通过相同的 RateLimiter 实例。对于给定的远程服务,RateLimiter 必须是单例

如果我们不强制执行此操作,我们代码库的某些区域可能会绕过 RateLimiter 直接调用远程服务。为了防止这种情况,对远程服务的实际调用应该在核心、内部层和其他区域应该使用内部层暴露的限速装饰器。

我们如何确保未来的新开发人员理解这一意图?查看 Tom 的文章,其中揭示了一种解决此类问题的方法,即通过组织包结构来明确此类意图。此外,它还展示了如何通过在 ArchUnit 测试中编码意图来强制执行此操作。

为多个服务器实例配置速率限制器

为配置找出正确的值可能很棘手。如果我们在集群中运行多个服务实例,limitForPeriod 的值必须考虑到这一点。

例如,如果上游服务的速率限制为 100 rps,而我们的服务有 4 个实例,那么我们将配置 25 rps 作为每个实例的限制。

然而,这假设我们每个实例上的负载大致相同。 如果情况并非如此,或者如果我们的服务本身具有弹性并且实例数量可能会有所不同,那么 Resilience4j 的 RateLimiter 可能不适合

在这种情况下,我们需要一个速率限制器,将其数据保存在分布式缓存中,而不是像 Resilience4j RateLimiter 那样保存在内存中。但这会影响我们服务的响应时间。另一种选择是实现某种自适应速率限制。尽管 Resilience4j 可能会支持它,但尚不清楚何时可用。

选择正确的超时时间

对于 timeoutDuration 配置值,我们应该牢记 API 的预期响应时间。

如果我们将 timeoutDuration 设置得太高,响应时间和吞吐量就会受到影响。如果它太低,我们的错误率可能会增加。

由于此处可能涉及一些反复试验,因此一个好的做法是将我们在 RateLimiterConfig 中使用的值(如 timeoutDurationlimitForPeriod limitRefreshPeriod)作为我们服务之外的配置进行维护。然后我们可以在不更改代码的情况下更改它们。

调优客户端和服务器端速率限制器

实现客户端速率限制并不能保证我们永远不会受到上游服务的速率限制。

假设我们有来自上游服务的 2 rps 的限制,并且我们将 limitForPeriod 配置为 2,将 limitRefreshPeriod 配置为 1s。如果我们在第二秒的最后几毫秒发出两个请求,在此之前没有其他调用,RateLimiter 将允许它们。如果我们在下一秒的前几毫秒内再进行两次调用,RateLimiter 也会允许它们,因为有两个新权限可用。但是上游服务可能会拒绝这两个请求,因为服务器通常会实现基于滑动窗口的速率限制。

为了保证我们永远不会从上游服务中获得超过速率,我们需要将客户端中的固定窗口配置为短于服务中的滑动窗口。因此,如果我们在前面的示例中将 limitForPeriod 配置为 1 并将 limitRefreshPeriod 配置为 500ms,我们就不会出现超出速率限制的错误。但是,第一个请求之后的所有三个请求都会等待,从而增加响应时间并降低吞吐量。

结论

在本文中,我们学习了如何使用 Resilience4j 的 RateLimiter 模块来实现客户端速率限制。 我们通过实际示例研究了配置它的不同方法。我们学习了一些在实施速率限制时要记住的良好做法和注意事项。

您可以使用 [GitHub 上]()的代码演示一个完整的应用程序来说明这些想法。

本文译自: [Implementing Rate Limiting with Resilience4j - Reflectoring]()

标签: #java读字节流