龙空技术网

RocketMQ 重试机制详解及最佳实践

阿里开发者 520

前言:

今天小伙伴们对“rocketmq 命令”可能比较关切,朋友们都需要剖析一些“rocketmq 命令”的相关文章。那么小编同时在网摘上搜集了一些对于“rocketmq 命令””的相关资讯,希望兄弟们能喜欢,我们一起来了解一下吧!

本文主要介绍在使用 RocketMQ 时为什么需要重试与兜底机制,生产者与消费者触发重试的条件和具体行为,如何在 RocketMQ 中合理使用重试机制,帮助构建弹性,高可用系统的最佳实践。

作者:斜阳

引言

本文主要介绍在使用 RocketMQ 时为什么需要重试与兜底机制,生产者与消费者触发重试的条件和具体行为,如何在 RocketMQ 中合理使用重试机制,帮助构建弹性,高可用系统的最佳实践

RocketMQ 的重试机制包括三部分,分别是生产者重试,服务端内部数据复制遇到非预期问题时重试,消费者消费重试。本文中仅讨论生产者重试和消费者消费重试两种面向用户侧的实现。

生产者发送重试

RocketMQ 的生产者在发送消息到服务端时,可能会因为网络问题,服务异常等原因导致调用失败,这时候应该怎么办?如何尽可能的保证消息不丢失呢?

1. 生产者重试次数

RocketMQ 在客户端中内置了请求重试逻辑,支持在初始化时配置消息发送最大重试次数(默认为 2 次),失败时会按照设置的重试次数重新发送。直到消息发送成功,或者达到最大重试次数时结束,并在最后一次失败后返回调用错误的响应。对于同步发送和异步发送,均支持消息发送重试

同步发送:调用线程会一直阻塞,直到某次重试成功或最终重试失败(返回错误码或抛出异常)。异步发送:调用线程不会阻塞,但调用结果会通过回调的形式,以异常事件或者成功事件返回。

2. 生产者重试间隔

在介绍生产者重试前,我们先来了解下流控的概念,流控一般是指服务端压力过大,容量不足时服务端会限制客户端收发消息的行为,是服务端自我保护的一种设计。RocketMQ 会根据当前是否触发了流控而采用不同的重试策略:

非流控错误场景:其他触发条件触发重试后,均会立即进行重试,无等待间隔

流控错误场景:系统会按照预设的指数退避策略进行延迟重试

为什么要引入退避和随机抖动?

如果故障是由过载流控引起的,重试会增加服务端负载,导致情况进一步恶化,因此客户端在遇到流控时会在两次尝试之间等待一段时间。每次尝试后的等待时间都呈指数级延长。指数回退可能导致很长的回退时间,因为指数函数增长很快。指数退避算法通过以下参数控制重试行为,更多信息,请参见 connection-backoff.md。

INITIAL_BACKOFF:第一次失败重试前后需等待多久,默认值:1 秒;

MULTIPLIER :指数退避因子,即退避倍率,默认值:1.6;

JITTER :随机抖动因子,默认值:0.2;

MAX_BACKOFF :等待间隔时间上限,默认值:120 秒;

MIN_CONNECT_TIMEOUT :最短重试间隔,默认值:20 秒。

ConnectWithBackoff()  current_backoff = INITIAL_BACKOFF  current_deadline = now() + INITIAL_BACKOFF  while (TryConnect(Max(current_deadline, now() + MIN_CONNECT_TIMEOUT))!= SUCCESS)    SleepUntil(current_deadline)    current_backoff = Min(current_backoff * MULTIPLIER, MAX_BACKOFF)    current_deadline = now() + current_backoff + UniformRandom(-JITTER * current_backoff, JITTER * current_backoff)

特别说明:对于事务消息,只会进行透明重试(transparent retries),网络超时或异常等场景不会进行重试。

3. 重试带来的副作用

不停的重试看起来很美好,但也是有副作用的,主要包括两方面:消息重复,服务端压力增大

远程调用的不确定性,因请求超时触发消息发送重试流程,此时客户端无法感知服务端的处理结果;客户端进行的消息发送重试可能会导致消费方重复消费,应该按照用户ID、业务主键等信息幂等处理消息较多的重试次数也会增大服务端的处理压力

4. 用户的最佳实践是什么

1)合理设置发送超时时间,发送的最大次数

发送的最大次数在初始化客户端时配置在 ClientConfiguration;对于某些实时调用类场景,可能会导致消息发送请求链路被阻塞导致业务请求整体耗时高或耗时;需要合理评估每次调用请求的超时时间以及最大重试次数,避免影响全链路的耗时。

2)如何保证发送消息不丢失

由于分布式环境的复杂性,例如网络不可达时 RocketMQ 客户端发送请求重试机制并不能保证消息发送一定成功。业务方需要捕获异常,并做好冗余保护处理,常见的解决方案有两种:

向调用方返回业务处理失败;尝试将失败的消息存储到数据库,然后由后台线程定时重试,保证业务逻辑的最终一致性。

3)关注流控异常导致无法重试

触发流控的根本原因是系统容量不足,如果因为突发原因触发消息流控,且客户端内置的重试流程执行失败,则建议执行服务端扩容,将请求调用临时替换到其他系统进行应急处理。

4)早期版本客户端如何使用故障延迟机制进行发送重试?

对于 RocketMQ 4.x 和 3.x 以下客户端开启故障延迟机制可以用:

producer.setSendLatencyFaultEnable(true)

配置重试次数使用:

producer.setRetryTimesWhenSendFailed()

producer.setRetryTimesWhenSendAsyncFailed()

消费者消费重试

消息中间件做异步解耦时的一个典型问题是如果下游服务处理消息事件失败,那应该怎么做呢?

RocketMQ 的消息确认机制以及消费重试策略可以帮助分析如下问题:

如何保证业务完整处理消息?

消费重试策略可以在设计实现消费者逻辑时保证每条消息处理的完整性,避免部分消息消费异常导致业务状态不一致。

业务应用异常时处理中的消息状态如何恢复?

当系统出现异常(宕机故障)等场景时,处理中的消息状态如何恢复,消费重试具体行为是什么。

1. 什么是消费重试?什么时候认为消费失败?

消费者在接收到消息后将调用用户的消费函数执行业务逻辑。如果客户端返回消费失败 ReconsumeLater,抛出非预期异常,或消息处理超时(包括在 PushConsumer 中排队超时),只要服务端服务端一定时间内没收到响应,将认为消费失败

消费重试是什么?

消费者在消费某条消息失败后,服务端会根据重试策略重新向客户端投递该消息。超过一次定数后若还未消费成功,则该消息将不再继续重试,直接被发送到死信队列中;

重试过程状态机:消息在重试流程中的状态和变化逻辑;重试间隔:上一次消费失败或超时后,下次重新尝试消费的间隔时间;最大重试次数:消息可被重试消费的最大次数。

2. 消息重试的场景

需要注意重试是应对异常情况,给予程序再次消费失败消息的机会,不应该被用作常态化的链路。

推荐使用场景:

业务处理失败,失败原因跟当前的消息内容相关,预期一段时间后可执行成功;是一个小概率事件,对于大批的消息只有很少量的失败,后面的消息大概率会消费成功,是非常态化的。

正例:消费逻辑是扣减库存,极少量商品因为乐观锁版本冲突导致扣减失败,重试一般立刻成功。

错误使用场景:

消费处理逻辑中使用消费失败来做条件判断的结果分流,是不合理的。

反例:订单在数据库中状态已经是已取消,此时如果收到发货的消息,处理时不应返回消费失败,而应该返回成功并标记不用发货。

消费处理中使用消费失败来做处理速率限流,是不合理的。

限流的目的是将超出流量的消息暂时堆积在队列中达到削峰的作用,而不是让消息进入重试链路。

这种做法会让消息反复在服务端和客户端之间传递,增大了系统的开销,主要包括以下方面:RocketMQ 内部重试涉及写放大,每一次重试将生成新的重试消息,大量重试将带来严重的 IO 压力;重试有复杂的退避逻辑,内部实现为梯度定时器,该定时器本身不具备高吞吐的特性,大量重试将导致重试消息无法及时出队。重试的间隔将不稳定,将导致大量重试消息延后消费,即削峰的周期被大幅度延长。

3. 不要以重试替代限流

上述误用的场景实际上是组合了限流和重试能力来进行削峰,RocketMQ 推荐的削峰最佳手段为组合限流和堆积,业务以保护自身为前提,需要对消费流量进行限流,并利用 RocketMQ 提供的堆积能力将超出业务当前处理的消息滞后消费,以达到削峰的目的。下图中超过处理能力的消息都应该被堆积在服务端,而不是通过消费失败进行重试。

如果不想依赖额外的产品/组件来完成该功能,也可以利用一些本地工具类,比如 Guava 的 RateLimiter 来完成单机限流。如下所示,声明一个 50 QPS 的 RateLimiter,在消费前以阻塞的方式 acquire 一个令牌,获取到即处理消息,未获取到阻塞。

RateLimiter rateLimiter = RateLimiter.create(50);PushConsumer pushConsumer = provider.newPushConsumerBuilder()    .setClientConfiguration(clientConfiguration)    // 设置订阅组名称    .setConsumerGroup(consumerGroup)    // 设置订阅的过滤器    .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))    .setMessageListener(messageView -> {        // 阻塞直到获得一个令牌,也可以配置一个超时时间        rateLimiter.acquire();        LOGGER.info("Consume message={}", messageView);        return ConsumeResult.SUCCESS;    })    .build();

4. PushConsumer 消费重试策略

PushConsumer 消费消息时,消息的几个主要状态如下:

Ready:已就绪状态。消息在消息队列RocketMQ版服务端已就绪,可以被消费者消费;Inflight:处理中状态。消息被消费者客户端获取,处于消费中还未返回消费结果的状态;Commit:提交状态。消费成功的状态,消费者返回成功响应即可结束消息的状态机;DLQ:死信状态

消费逻辑的最终兜底机制,若消息一直处理失败并不断进行重试,直到超过最大重试次数还未成功,此时消息不会再重试。

该消息会被投递至死信队列。您可以通过消费死信队列的消息进行业务恢复。

最大重试次数

PushConsumer 的最大重试次数由创建时决定。

例如,最大重试次数为 3 次,则该消息最多可被投递 4 次,1 次为原始消息,3 次为重试投递次数。

重试间隔时间无序消息(非顺序消息):重试间隔为阶梯时间,具体时间如下:

说明:若重试次数超过 16 次,后面每次重试间隔都为 2 小时。

顺序消息:重试间隔为固定时间,默认为 3 秒。

点击查看原文,获取更多福利!

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

标签: #rocketmq 命令