龙空技术网

13.Rabbitmq使用介绍

80后程序员在北京 83

前言:

现时朋友们对“动态缓冲监视程序作用”大致比较重视,各位老铁们都需要学习一些“动态缓冲监视程序作用”的相关知识。那么小编在网摘上搜集了一些对于“动态缓冲监视程序作用””的相关资讯,希望咱们能喜欢,咱们快快来学习一下吧!

一、前言二.一些RabbitMQ概念介绍三.消息传输介绍四.使用RabbitMQ好处五.代码六.总结一、前言

什么是MQ? 消息总线(Message Queue),是一种跨进程、异步的通信机制,用于上下游传递消息。由消息系统来确保消息的可靠传递。

MQ是干什么用的? 应用解耦、异步、流量削锋、数据分发、错峰流控、日志收集等等...

MQ衡量标准 服务性能、数据存储、集群架构

二.一些RabbitMQ概念介绍

Broker:消息队列服务器实体     broker 是指一个或多个 erlang node 的逻辑分组,且 node 上运行着 RabbitMQ 应用程序。cluster 是在 broker 的基础之上,增加了 node 之间共享元数据的约束。每个节点运行的服务程序,功能为维护该节点的队列的增删以及转发队列操作请求。Exchange:消息交换机,指定消息规则,处理消息和队列之间的关系        exchange是属于Vhost的。同一个Vhost不能有重复的exchange名称。         exchange的绑定功能,可以绑定queue,也可以绑定exchangequeue:队列载体,消息投入队列中master queue:每个队列都分为一个主队列和若干个镜像队列。mirror queue:镜像队列,作为master queue的备份。在master queue所在节点挂掉之后,系统把mirror queue提升为master queue,负责处理客户端队列操作请求。注意,mirror queue只做镜像,设计目的不是为了承担客户端读写压力。binding:绑定,把exchange和queue按照路由规则绑定起来Routing Key:路由关键字。exchange根据这个进行消息投递vhost:虚拟消息服务器,每个RabbitMQ服务器都能够创建虚拟消息服务器。Vhost之间相互完全隔离,不同Vhost之间无法共享Exchange和Queue。因此Vhost之间数据无法共享和分享RabbitMQ的Vhost主要是用来划分不同业务模块。不同业务模块之间没有信息交互。vhost 可以理解为虚拟 broker ,即 mini-RabbitMQ server。其内部均含有独立的 queue、exchange 和 binding 等,但最最重要的是,其拥有独立的权限系统,可以做到 vhost 范围的用户控制。当然,从 RabbitMQ 的全局角度,vhost 可以作为不同权限隔离的手段producer:生产者,投递消息的程序consumer:消费者,接受消息的程序channel:信道(重要概念),打开信道才能进行通信,一个channel代码一个会话任务channel 是真实 TCP 连接之上的虚拟连接
三.消息传输介绍
JMS协议:    JMS(Java Messaging Service)是Java平台上有关面向消息中间件的技术规范,它便于消息系统中的Java应用程序进行消息交换,并且通过提供标准的产生、发送、接收消息的接口简化企业应用的开发。   JMS本身只定义了一系列的接口规范,是一种与厂商无关的 API,用来访问消息收发系统。它类似于 JDBC(Java Database Connectivity):   消息是 JMS 中的一种类型对象,由两部分组成:报头和消息主体。报头由路由信息以及有关该消息的元数据组成。消息主体则携带着应用程序的数据或有效负载。根据有效负载 的类型来划分,可以将消息分为几种类型,它们分别携带:简单文本 (TextMessage)、可序列化的对象 (ObjectMessage)、属性集合 (MapMessage)、字节流 (BytesMessage)、原始值流 (StreamMessage),还有无有效负载的消息 (Message)。
AMQP协议:   JMS 非常棒,微软也开发了NMS(.NET 消息传递服务)来支持他们的平台和编程语言,它的效果也还不错,但使用两套不同标准的应用之间该怎么异步消息通信呢,因此,需要一个异步消息的通用标准。JMS、NMS都没有标准的底层协议,并且API是与编程语言绑定的。为了解决这个问题,AMQP诞生了。  AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现有RabbitMQ等。
消息基于什么传输:     由于TCP连接的创建和销毁开销较大,且并发数受系统资源限制,会造成性能瓶颈。RabbitMQ使用信道的方式来传输数据。channel是建立在真实的TCP连接内的虚拟连接,且每条TCP连接上的信道数量没有限制“dead letter”queue 的用途?   当消息被 RabbitMQ server 投递到 consumer 后,但 consumer 却通过 Basic.Reject 进行了拒绝时(同时设置 requeue=false),那么该消息会被放入“dead letter”queue 中。该 queue 可用于排查 message 被 reject 或 undeliver 的原因。消息持久化:     binding 关系可以表示为 exchange – binding – queue 。从文档中我们知道,若要求投递的 message 能够不丢失,要求 message 本身设置 persistent 属性,要求 exchange 和 queue 都设置 durable 属性。其实这问题可以这么想,若 exchange 或 queue 未设置 durable 属性,则在其 crash 之后就会无法恢复,那么即使 message 设置了 persistent 属性,仍然存在 message 虽然能恢复但却无处容身的问题;同理,若 message 本身未设置 persistent 属性,则 message 的持久化更无从谈起。交换机: 过程消息发布到交换器时,消息将拥有一个路由键(routing key),在消息创建时设定。通过队列路由键,可以把队列绑定到交换器上。消息到达交换器后,RabbitMQ会将消息的路由键与队列的路由键进行匹配fanout(不绑定queue)fanout类型的Exchange路由规则非常简单,它会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中。direct(只能绑定一个queue)direct类型的Exchange路由规则也很简单,它会把消息路由到那些binding key与routing key完全匹配的Queue中topic(绑定多个queue)topic这个规则就是模糊匹配,可以通过通配符满足一部分规则就可以传送。
四.使用RabbitMQ好处
特性:   应用解耦(系统拆分)    异步处理(预约挂号业务处理成功后,异步发送短信、推送消息、日志记录等)   消息分发   流量削峰       消息缓冲       监控
一个生产者 多个消费者:       每个消费者都有自己的队列,生产者没有直接把消息发送给队列,而是先发送给交换机exchange每个队列都要绑定到交换机上。生产者发送的消息是经过交换机的,然后到达队列,就能实现一个消息被多个消费者消费。        若该队列至少有一个消费者订阅,消息将以循环(round-robin)的方式发送给消费者。每条消息只会分发给一个订阅的消费者。
事务:       对事务的支持是AMQP协议的一个重要特性。假设当生产者将一个持久化消息发送给服务器时,因为consume命令本身没有任何Response返回,所以即使服务器崩溃,没有持久化该消息,生产者也无法获知该消息已经丢失。如果此时使用事务,即通过txSelect()开启一个事务,然后发送消息给服务器,然后通过txCommit()提交该事务,即可以保证,如果txCommit()提交了,则该消息一定会持久化,如果txCommit()还未提交即服务器崩溃,则该消息不会服务器就收。当然Rabbit MQ也提供了txRollback()命令用于回滚某一个事务。
Confirm机制:       使用事务固然可以保证只有提交的事务,才会被服务器执行。但是这样同时也将客户端与消息服务器同步起来,这背离了消息队列解耦的本质。         Rabbit MQ提供了一个更加轻量级的机制来保证生产者可以感知服务器消息是否已被路由到正确的队列中——Confirm。如果设置channel为confirm状态,则通过该channel发送的消息都会被分配一个唯一的ID,然后一旦该消息被正确的路由到匹配的队列中后,服务器会返回给生产者一个Confirm,该Confirm包含该消息的ID,这样生产者就会知道该消息已被正确分发。对于持久化消息,只有该消息被持久化后,才会返回Confirm。Confirm机制的最大优点在于异步,生产者在发送消息以后,即可继续执行其他任务。而服务器返回Confirm后,会触发生产者的回调函数,生产者在回调函数中处理Confirm信息。如果消息服务器发生异常,导致该消息丢失,会返回给生产者一个nack,表示消息已经丢失,这样生产者就可以通过重发消息,保证消息不丢失。Confirm机制在性能上要比事务优越很多。但是Confirm机制,无法进行回滚,就是一旦服务器崩溃,生产者无法得到Confirm信息,生产者其实本身也不知道该消息吃否已经被持久化,只有继续重发来保证消息不丢失,但是如果原先已经持久化的消息,并不会被回滚,这样队列中就会存在两条相同的消息,系统需要支持去重。
消息恢复:       consumer从durable queue中取回一条消息之后并发回了ACK消息,RabbitMQ就会将其标记,方便后续垃圾回收.如果一条持久化的消息没有被consumer取走,RabbitMQ重启之后会自动重建exchange和queue(以及bingding关系),消息通过持久化日志重建再次进入对应的queues,exchanges。持久化:   消息的持久化需要在消息投递的时候设置delivery mode值为2.由于消息实际存储于queue之中,"皮之不存毛将焉附"逻辑上,消息持久化同时要求exchange和queue也是持久化的.这是消息持久化必须满足的三个条件.   持久化的代价就是性能损失,磁盘IO远远慢于RAM(使用SSD会显著提高消息持久化的性能) , 持久化会大大降低RabbitMQ每秒可处理的消息.两者的性能差距可能在10倍以上.
多个Consumer订阅同一个队列:       如果一个rabbit queue有多个consumer,具体到队列中的某条消息只会发送到其中的一个Consumer。RabbitMQ的消息状态:    Ready:等待消费状态。    Unacked:等待被确认状态,当前消息已经被发送到了客户端。当客户端端断开后,如果这条消息没有被确认,这条消息重新进入Ready中。
RabbitMQ的高可用性:    rabbitmq有三种模式:单机模式,普通集群模式,镜像集群模式   普通集群模式:意思就是在多台机器上启动多个rabbitmq实例,每个机器启动一个。但是你创建的queue,只会放在一个rabbtimq实例上,但是每个实例都同步queue的元数据。这样的话,好处在于,你任何一个机器宕机了,没事儿,别的机器都可以用。坏处在于,第一,这个性能开销也太大了吧,消息同步所有机器,导致网络带宽压力和消耗很重!第二,这么玩儿,就没有扩展性可言了,如果某个queue负载很重,你加机器,新增的机器也包含了这个queue的所有数据,并没有办法线性扩展你的queue   镜像集群模式:通过rabbitmq管理控制台在后台新增一个策略,这个策略是镜像集群模式的策略,指定的时候可以要求数据同步到所有节点的,也可以要求就同步到指定数量的节点,然后你再次创建queue的时候,应用这个策略,就会自动将数据同步到其他的节点上去了。
五.代码
//confirm机制@Componentpublic class TopicSender   implements RabbitTemplate.ConfirmCallback {    private RabbitTemplate rabbitTemplate;    @Autowired    public TopicSender(RabbitTemplate rabbitTemplate) {        this.rabbitTemplate = rabbitTemplate;        rabbitTemplate.setConfirmCallback(this);    }    public void send2() {        for (int i = 0; i < 3; i++) {            String context = "hi, i am messages " + i;            System.out.println("Sender : " + context);            CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());            System.out.println("callbackSender UUID: " + correlationData.getId());            this.rabbitTemplate.convertAndSend("topicExchange", "topic.messages", context, correlationData);            try {                Thread.sleep(1000);            } catch (InterruptedException e) {                e.printStackTrace();            }        }    }    //实现回调    @Override    public void confirm(CorrelationData correlationData, boolean b, String s) {        System.out.println("confirm: " + correlationData.getId() + ",s=" + s + ",b:" + b);    }}
六.总结

上面的逻辑是从控制器带service的整个逻辑实现,其中原理就是利用线程池实现异步多线程调用方法,最后利用闭锁CountDownLatch完成了线程的整个等待结果的过程,最后实现有序结果的放回。

标签: #动态缓冲监视程序作用