龙空技术网

RabbitMQ详解

闪念基因 6715

前言:

而今各位老铁们对“rabbit算法”大体比较珍视,我们都想要学习一些“rabbit算法”的相关内容。那么小编同时在网络上网罗了一些关于“rabbit算法””的相关文章,希望你们能喜欢,各位老铁们快快来学习一下吧!

RabbitMQ的优点:

开源, 性能有效, 稳定性好提供可靠性消息投递模式(confirm), 返回模式(return)等与Spring完美整合, API丰富集群模式丰富, 支持表达式配置, 高可用HA模式, 镜像队列模型可以保证数据不丢失的前提下做到高可靠性, 可用性RabbitMQ高性能原因:由Erlang语言开发,继承其天生的并发性,稳定性和安全性有保障RabbitMQ的协议:

AMQP(Advanced Message Queuing Protocol)高级消息队列协议,是一个异步消息传递所使用应用层协议规范,为面向消息中间件设计,基于此协议的客户端与消息中间件可以无视消息来源传递消息,不受客户端、消息中间件、不同的开发语言环境等条件的限制。

设计概念解释:

Server : 又称Broker, 接受客户端连接, 实现AMQP实体服务Connection : 连接, 应用程序与Broker的网络连接Channel : 网络信道, 几乎所有的操作都在Channel中进行, Channel是进行消息读写的通道。客户端可以建立多个Channel, 每个Channel代表一个会话任务。Message : 消息, 服务器和应用程序之间传送的数据, 有Properties和Body组成。Properties可以对消息进行修饰, 比如消息的优先级, 延迟等高级特性; Body就是消息体内容。Virtual Host : 虚拟地址, 用于进行逻辑隔离, 最上层的消息路由。一个Virtual Host里面可以有若干个Exchange和Queue, 同一个Virtual Host里面不能有相同名称的Exchange或QueueExchange : 交换机, 用于接收消息, 根据路由键转发消息到绑定的队列Binding : Exchange和Queue之间的虚拟连接, binding中可以包含routing keyRouting Key : 一个路由规则, 虚拟机可用它来确定如何路由一个特定消息Queue : 也成Message Queue, 消息队列, 用于保存消息并将它们转发给消费者RabbitMQ整体架构RabbitMQ成员简介Binding-绑定Exchange和Exchange, Queue之间的连接关系绑定中可以包含RoutingKey或者参数Queue-消息队列消息队列, 实际存储消息数据Durability : 是否持久化Auto delete : 如选yes,代表当最后一个监听被移除之后, 该Queue会自动被删除Message-消息服务和应用程序之间传送的数据本质上就是一段数据, 由Properties和Payload(Body)组成常用属性 : delivery mode, headers(自定义属性)其他属性content_type, content_encoding, prioritycorrelation_id : 可以认为是消息的唯一idreplay_to : 重回队列设定expiration : 消息过期时间message_id : 消息idtimestamp, type, user_id, app_id, cluster_idVirtual Host-虚拟主机虚拟地址, 用于进行逻辑隔离, 最上层的消息路由一个Virtual Host里面可以有若干个Exchange和Queue同一个Virtual Host里面不能有相同名称的Exchange或QueueExchange交换机

接收消息,并根据路由键转发消息到所绑定的队列

注:交换机不会存储消息,如果消息发送到没有绑定消费队列的交换机,消息则丢失。

交换机的属性Name : 交换机名称Type : 交换机类型, direct, topic, fanout, headersDurability : 是否需要持久化, true为持久化Auto Delete : 当最后一个绑定到Exchange上的队列删除后, 自动删除该ExchangeInternal : 当前Exchange是否用于RabbitMQ内部使用, 默认为False, 这个属性很少会用到Arguments : 扩展参数, 用于扩展AMQP协议制定化使用交换机的四种类型Direct exchange(直连交换机)是根据消息携带的路由键(routing key)将消息投递给对应队列的注意 : Direct模式可以使用RabbitMQ自带的Exchange(default Exchange), 所以不需要将Exchange进行任何绑定(binding)操作, 消息传递时, RoutingKey必须完全匹配才会被队列接收, 否则该消息会被抛弃Fanout exchange(扇型交换机)将消息路由给绑定到它身上的所有队列不处理路由键, 只需要简单的将队列绑定到交换机上发送到交换机的消息都会被转发到与该交换机绑定的所有队列上Fanout交换机转发消息是最快的Topic exchange(主题交换机)队列通过路由键绑定到交换机上,然后,交换机根据消息里的路由值,将消息路由给一个或多个绑定队列(模糊匹配)“#” : 匹配一个或多个词“*” : 匹配一个词Headers exchange(头交换机)类似主题交换机,但是头交换机使用多个消息属性来代替路由键建立路由规则。通过判断消息头的值能否与指定的绑定相匹配来确立路由规则。RabbitMQ常用的5种工作模式1、点对点(简单)的队列不需要交换机一个生产者,一个消费者2、工作队列(公平性)不需要交换机一个生产者,多个消费者,但是一个消息只会发送给一个队列(竞争的消费者模式)默认是轮询,即会将消息轮流发给多个消费者,但这样对消费得比较慢的消费者不公平可采用公平分配,即能者多劳channel.basicQos(1);// 限定:发送一条信息给消费者A,消费者A未反馈处理结果之前,不会再次发送信息给消费者Aboolean autoAck = false;// 取消自动反馈 channel.basicConsume(QUEUE_NAME, autoAck, consumer);// 接收信息channel.basicAck(envelope.getDeliveryTag(), false);// 反馈消息处理完毕3、发布/订阅一个生产者,多个消费者每一个消费者都有自己的一个队列生产者没有直接发消息到队列中,而是发送到交换机每个消费者的队列都绑定到交换机上消息通过交换机到达每个消费者的队列

该模式就是Fanout Exchange(扇型交换机)将消息路由给绑定到它身上的所有队列

4、路由

生产者发送消息到交换机并指定一个路由key,消费者队列绑定到交换机时要制定路由key(key匹配就能接受消息,key不匹配就不能接受消息)

该模式采用Direct exchange(直连交换机)

5、主题(通配符)

此模式实在路由key模式的基础上,使用了通配符来管理消费者接收消息。生产者P发送消息到交换机X,交换机根据绑定队列的routing key的值进行通配符匹配

符号#:匹配一个或者多个词lazy.# 可以匹配lazy.irs或者lazy.irs.cor

符号*:只能匹配一个词lazy.* 可以匹配lazy.irs或者lazy.cor

该模式采用Topic exchange(主题交换机)

消息可靠性传递或回退(生产者端)

生产者发送消息出去之后,不知道到底有没有发送到RabbitMQ服务器, 默认是不知道的。而且有的时候我们在发送消息之后,后面的逻辑出问题了,我们不想要发送之前的消息了,需要撤回该怎么做。

AMQP 事务机制txSelect 将当前channel设置为transaction模式txCommit 提交当前事务txRollback 事务回滚Confirm 模式

消息的确认, 是指生产者投递消息后, 如果Broker收到消息, 则会给我们产生一个应答

生产者进行接收应答, 用来确定这条消息是否正常发送到Broker, 这种方式也是消息的可靠性投递的核心保障

在channel上开启确认模式 : channel.confirmSelect()在channel上添加监听 : addConfirmListener, 监听成功和失败的返回结果, 根据具体的结果对消息进行重新发送, 或记录日志等后续处理Return消息机制

Return Listener用于处理一些不可路由的消息

正常情况下消息生产者通过指定一个Exchange和RoutingKey, 把消息送到某一个队列中去, 然后消费者监听队列, 进行消费,但在某些情况下, 如果在发送消息的时候, 当前的exchange不存在或者指定的路由key路由不到,这个时候如果我们需要监听这种不可达的消息, 就要使用Return Listener。

在基础API中有一个关键的配置项Mandatory : 如果为true, 则监听器会接收到路由不可达的消息, 然后进行后续处理(补偿或人工处理), 如果为false, 那么broker端自动删除该消息。

如何保障消息可靠传递保障消息的成功发出保障MQ节点的成功接收发送端收到MQ节点(Broker)的确认应答完善的消息补偿机制

方案:

1、消息落库, 对消息状态进行标记

step1:消息入库step2:消息发送step3:消费端消息确认step4:更新库中消息状态为已确认step5:定时任务读取数据库中未确认的消息step6:未收到确认结果的消息重新发送step7:如果重试几次之后仍然失败, 则将消息状态更改为投递失败的终态, 后面需要人工介入

2、消息的延迟投递, 做二次确认, 回调检查

step1 : 第一次消息发送, 必须业务数据落库之后才能进行消息发送step2 : 第二次消息延迟发送, 设定延迟一段时间发送第二次check消息step3 : 消费端监听Broker, 进行消息消费step4 : 消费成功之后, 发送确认消息到确认消息队列step5 : Callback Service监听step4中的确认消息队列, 维护消息状态, 是否消费成功等状态step6 : Callback Service监听step2发送的Delay Check的消息队列, 检测内部的消息状态, 如果消息是发送成功状态, 则流程结束, 如果消息是失败状态, 或者查不到当前消息状态时, 会通知生产者, 进行消息重发, 重新上述步骤重试机制和幂等性保障(消费者端)重试机制

消费者在消费消息的时候,如果消费者业务逻辑出现程序异常,会使用消息重试机制。

情况1: 消费者获取到消息后,调用第三方接口,但接口暂时无法访问,是否需要重试? (需要重试机制)情况2: 消费者获取到消息后,抛出数据转换异常,是否需要重试?(不需要重试机制)需要发布进行解决。

对于情况2,如果消费者代码抛出异常是需要发布新版本才能解决的问题,那么不需要重试,重试也无济于事。应该采用日志记录+定时任务job健康检查+人工进行补偿

重试机制的实现

在SpringBoot中,@RabbitListener(queue="")用于消费者监听队列。底层使用Aop进行拦截,如果程序没有抛出异常,则自动提交事务。如果抛出异常,该消息会缓存到RabbitMQ服务器,自动实施重试机制,一直到成功为止。可以配置重试间隔时间和重试的次数。

幂等性保障

幂等性:多次执行, 结果保持一致

网络延迟传输中,消费出现异常或者是消费延迟消费,会造成MQ进行重试补偿,在重试过程中,可能会造成重复消费。

解决方案:

唯一ID+指纹码机制唯一ID + 指纹码机制,利用数据库主键去重SELECT COUNT(1) FROM T_ORDER WHERE ID = 唯一ID +指纹码好处:实现简单坏处:高并发下有数据库写入的性能瓶颈解决方案:跟进ID进行分库分表进行算法路由利用Redis的原子性去实现在接收到消息后将消息ID作为key执行 setnx 命令,如果执行成功就表示没有处理过这条消息,可以进行消费了,执行失败表示消息已经被消费了。自动签收与手动签收(消费端)

默认是自动签收

channel.basicConsume(QUEUE_NAME, false, defaultConsumer);//关闭自动签收,变为手动签收
channel.basicAck(envelope.getDeliveryTag(), false);// 手工签收, 第二个参数表示是否批量签收
消费端限流

消息队列中囤积了大量的消息, 或者某些时刻生产的消息远远大于消费者处理能力的时候, 这个时候如果消费者一次取出大量的消息, 但是客户端又无法处理, 就会出现问题, 甚至可能导致服务崩溃, 所以需要对消费端进行限流

RabbitMQ提供了一种qos(服务质量保证)功能, 即在非自动确认消息的前提下, 如果一定数目的消息(通过consumer或者channel设置qos的值)未被确认前, 不进行消费新的消息

自动签收要设置成false, 建议实际工作中也设置成falsevoid basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;prefetchSize : 消息大小限制, 一般设置为0, 消费端不做限制prefetchCount : 会告诉RabbitMQ不要同时给一个消费者推送多于N个消息, 即一旦有N个消息还没有ack, 则该consumer将block(阻塞), 直到有消息ackglobal : true/false 是否将上面设置应用于channel, 简单来说就是上面的限制是channel级别的还是consumer级别 注意 :

prefetchSize和global这两项,RabbitMQ没有实现,暂且不关注,prefetchCount在autoAck设置false的情况下生效,即在自动确认的情况下这个值是不生效的

限流可实现公平队列。

标签: #rabbit算法