龙空技术网

RocketMQ原理-消息发送流程

Java架构学习指南 282

前言:

此刻朋友们对“rocketmq指定队列发送”大约比较珍视,各位老铁们都想要知道一些“rocketmq指定队列发送”的相关知识。那么小编也在网上收集了一些关于“rocketmq指定队列发送””的相关知识,希望大家能喜欢,看官们快快来了解一下吧!

RocketMQ 源码版本 4.9.1

概述整体架构

各角色说明:

NameServer,负责提供路由服务Producer,生产者,负责发送消息Broker,消息队列,负责存储消息并提供相关的API操作Consumer,消费者,负责消费消息生产-消费模型

消息发送方式:同步,异步,单向

消息类型:普通消息(包含延迟消息),顺序消息(全局顺序与局部顺序),事务消息

本文通过同步发送普通消息的Demo,来了解消息发送的主要流程。

生产者

下面看到一个生产者发送消息的 demo

主要做了几件事:

初始化一个生产者(DefaultMQProducer)对象设置 NameServer 的地址启动生产者发送消息启动

启动过程中,主要的几个定时任务:

NameServer 地址定时更新任务(没有显示指定 NameServer 的场景),支持动态更新 NameServer 地址本地路由信息定时更新任务,定时从 NameServer 拉取最新的路由信息更新到本地缓存Broker心跳检测任务,定时向Broker集群发送心跳包,并清除已下线的Broker

注意:由于生产者与消费者底层都是通过 MQClientInstance 类与 Broker 服务通信,而消息拉取服务,重负载均衡服务都是针对消费者端的服务,因此即使生产者启动了这两个服务,实际也不会生效。

消息发送

消息发送过程中,主要包含两个核心步骤:

路由信息获取,根据消息的 Topic 拿到可用 Broker 的服务路由负载均衡机制,即选择合适的消息队列路由获取

步骤如下:

从本地缓存获取指定 Topic 的路由信息,如果获取到则返回结果从 NameServer 获取指定 Topic 的路由信息,如果获取到则更新本地缓存,返回结果从 NameServer 获取默认 Topic 的路由信息,如果获取到则更新本地缓存,返回结果负载均衡

宗旨就是均匀地把消息发送到各个 Broker 中的消息队列。

负载均衡策略分两种:

默认的负载均衡策略,即通过轮询的方式选择消息队列,在线程级别维护了一个队列下标计数器故障延迟的负载均衡策略,选择消息队列时,在一定时间内会规避掉故障的 Broker故障延迟机制

普通的负载均衡策略虽然也有规避故障 Broker 的逻辑,但它只能作用在一次消息发送的重试场景。

实现逻辑如下:

在线程级别维护一个队列下标计数器每次选择队列时会将计数器的值 + 1,再和当前可用队列总数取模,最终计算出目标队列的下标在重试发送消息时,会传入上次发送失败的 BrokerName,如果发现目标队列所属的 Broker 与上次发送失败的 Broker 名称相同,则继续步骤1,2 重新选择队列(通过这种方式规避故障的 Broker)

如果希望在多次发送消息过程中,规避掉发生故障的Broker,则可通过 sendLatencyFaultEnable 配置开启故障延迟机制。

整体流程如下:

核心点:

在消息发送失败后,会根据消息发送的延迟时间将对应的 Broker 隔离一段时间,称为故障隔离期故障隔离期内生产者认为该 Broker 不可用,即在下次发送消息时会规避掉该 Broker。

隔离时间依据上一次消息发送的延迟时间来定,延迟时间越长则相应的隔离时间也越久。如下是 RocketMQ 定义的延迟时间与隔离时间的对应关系。

总结

本文主要包含以下内容:

生产者启动流程生产者消息发送主流程路由信息获取流程生产者负载均衡策略,并主要介绍了故障延迟机制

通过阅读这篇文章,可以对 RocketMQ 消息发送流程有一个整体的认识,了解了它通过负载均衡策略,故障延迟机制实现了系统可扩展,高可用的特性。

标签: #rocketmq指定队列发送