龙空技术网

RabbitMQ使用及延迟队列的实现

马士兵教育 141

前言:

而今姐妹们对“消息队列实现过程是什么意思”大致比较注重,看官们都想要了解一些“消息队列实现过程是什么意思”的相关内容。那么小编在网络上汇集了一些关于“消息队列实现过程是什么意思””的相关文章,希望各位老铁们能喜欢,我们快快来了解一下吧!

选用RabbitMQ的原因

在我们的微服务项目里,为了更好的分析系统功能点性能、功能使用频率、加速问题追踪排查。我们设计在gateway收集请求、返回信息,将数据发送到请求分析系统。

系统设计时考虑如下几个问题:

不希望gateway的性能被请求分析系统(analysis)的消费能力所限制。所以需要gateway和请求分析系统之间解耦。并且由于gateway的访问量巨大,如果直接发送到请求分析系统,analysis需要做大量的无状态副本。http请求可能会存在消息丢失问题。所以综合考虑我们选用RabbitMQ来做消息收集,MQ的优点服务间解耦服务的异步调用流量削峰但是采用MQ也有缺点:增加了系统整体的复杂度几个基本概念AMQP

RabbitMQ是一个采用AMQP协议的高级消息队列。AMQP是一个定义了客户端到消息服务中间件中间互操作的一套标准。主要采用AMQP模型即:exchange,queue,binding等。

producers ——> exchange ——> queue ——> customers

Broker

即一个erlang node节点上运行着的RabbitMQ程序,即一个服务实体

cluster

在Broker基础之上,建立了了多node间的共享元数据的约束。

vHost

虚拟主机,主要用于资源隔离。一个broker有多个vhost

routing key

路由键,负责queue和exchange的绑定关系。消息被投递到exchange后,会根据routing key被投递到一个或多个队列中。

Exchange

交换机,用于接收消息。可以认为是routing key和queue的绑定表。交换机有多个类型。

dirct:直连交换机,根据routing key精确匹配队列投递消息。Topic:主题交换机,routing key的单词之间使用.隔开;使用通配符“*”匹配两个.之间一个单词,“#”匹配两个.之间多个单词,Fanout:扇形交换机,将消息投递到所有绑定的queue上。所以这里routing key被忽略headers:头交换机,根据消息头中的参数将消息投递到对应的交换机,存在与、或两种模式,即a&&b a||b。

他们的投递效率:fanout > dirct > topic > headers.一般headers效率太低不会采用。

queue

消息队列,消息最初由exchange接收消息,并根据消息的routing key转发到队列。customer通过监听队列获取消息。

channel

信道,RabbitMQ采用的是一个TCP连接上建立多个信道的方式来传输消息。

元数据

每个组件存在一些自己的元数据,比如:

queue(名称、routingkey,kv属性,例如x-message-ttl等)exchange(名称、type类型、kv属性等)binding(ex routing key queue的绑定关系)消息状态ready 即可以被消费的消息unacked 即已经被customer消费但尚未进行ack、unack、reject的消息.unack和reject区别:unack支持multiple参数,一次性可以批量处理小于当前ID的所有消息;而reject不支持。

说完基础概念,接下来探讨一下关于消息投递相关的问题

MQ如何保证消息的不丢失

消息会先通过exchange,然后被投递到queue。所以为了保证消息不丢失就要使用持久化exchange和queue。如图

生产者如何保障消息的百分百可靠性投递

rabbitMQ提供了两种方式供producers使用,即事务机制和confirm机制。

事务机制

即在投递消息之前通过channel.txSelect()开启事务。发送结束提交事务txCommit();异常回滚txRollback()。事务方案的效率过低,一般我们不采用这个方案。

confirm机制

发送消息时会给消息一个唯一ID(递增),当确认消息从exchange成功投递到了queue时,回通过回调异步通知producers;如果因为MQ内部报错那么会异步回调一个nack。

顺便再说一下Springboot 使用时的两个回调,confirmCallback和returnCallback

同步确认机制:使用confirm机制,在发送完消息之后,使用channel.waitForConfirms()等待回调。需要配置publisher-confirm-type: simple使用。异步确认机制:使用回调函数接收回调。publisher-confirm-type: correlated

效率: 事务 <<<<<<< 同步 < 异步

使用confirmCallback确认时,需要在发送时通过rabbitTemplate.convertAndSend传入一个correlationData对象。里面可以放我们的例如单号类型的信息。对消息进行一个发送确认。

customer如何确保成功消费

消费者通过channel的ack/nack/reject操作对消息进行消费、拒绝,重新入队等操作。

Ack机制分:

1,自动ACK:默认ack机制。当消息被推送到消费者就认为消费成功。存在处理报错导致数据丢失问题。一般不采用

2,手动ACK:在代码合适位置手动确认。可以根据业务处理选择ack.或者nack,是否重新入队等操作。

认定死信消息被拒绝病requeue=false消息过期队列达到最大长度队列长度限制

声明队列时可以通过参数设置队列最大长度或者最大byte,当超过最大后,队列会丢弃最老的消息或者转入死信队列

死信队列

当消息被认定为死信时,如果queue配置了x-dead-letter-exchange,将会把死信消息转发到DLX,死信交换机,再根据x-dead-letter-routing-key指定的routingk ey或者queue默认的routing key转发到queue上

延迟队列实现

RabbitMQ不支持延迟队列,但可以通过过期时间+死信队列实现

步骤;

先创建一个exchangeA(DLX),binding queueA。配置routing keyA,这组作为队列。消费者监听这个queue。再创建一个exchangeB,binding queueB,配置routing keyB,queue添加参数设置:x-message-ttl配置超时时间,毫秒;x-dead-letter-exchange配置死信交换机。即x-dead-letter-exchange=exchangeA;x-dead-letter-routing-key配置当消息死信时传入DLX时用的routing key,即x-dead-letter-routing-key=routing keyA,如不配置,则使用queueB的routing key,即routing keyB。还要保证queueB没有消费者监听。producers发送消息到exchangeB,当消息到达过期时间,将被认定为死信,转发到DLX,DLX根据routing key将消息路由到queueA上,消费者监听queueA,获取到消息。也可以将exchangB绑定多个queue,分别使用不同的过期时间。从而达到不同的延迟效果。

标签: #消息队列实现过程是什么意思