龙空技术网

高级开发必须掌握RabbitMQ之初识RabbitMQ

程序员界的彭于晏 4673

前言:

今天兄弟们对“phprabbitmq”大致比较关怀,小伙伴们都想要知道一些“phprabbitmq”的相关内容。那么小编同时在网上网罗了一些有关“phprabbitmq””的相关知识,希望同学们能喜欢,同学们一起来了解一下吧!

RabbitMQ

我们知道AMQP(高级消息队列协议) 是一个用于在分布式系统中存储转发消息进行通信网络协议。而RabbitMQ是实现AMQP协议的消息中间件的一种(主要用于应用程序的异步通信和解偶,消息的发送者无需关心消息使用者,反之亦然。),RabbitMQ的服务器端用Erlang语言编写,支持多种客户端,如:Python、.NET、Java、JMS、C、PHP等,支持AJAX。

既然RabbitMQ是对AMQP的具体实现。要想彻底的了解RabbitMQ的设计,我们必须要先来了解AMQP 模型。

这里大家如果有兴趣的可以去阅读官方文档AMQP_0-9-1_Model,这里我简单归纳总结一下 AMQP模型。

从上一篇章的应用解耦需求出发,我们来看看消息中间件(messaging middleware broker)及其上下游系统所扮演的角色。

很显然,在上面的使用场景分析中我们可以看出来,消息中间件(brokers) 主要承担一个消息(message)容器的角色,它接收从发布者(publishers) 亦称生产者(producers)那儿来的消息。并根据既定的路由规则把接收到的消息发送给处理消息的消费者(consumers) 处理(实际上rabbitmq是生产者(producers) 投递到交换机(exchange) ,然后exchange按照路由规则分发到特定的队列(queue) ,再推送给消费者(consumers),或者消费者(consumers)主动拉取。)。

OK,到了这一步,我们对用消息中间件的模型设计有了一定的了解,接下来,我们先看看官方的AMQP模型设计:

AMQP模型

AMQP协议模型设计的工作过程如下:消息(message) 被发布者(publisher) (国内更多称为生产者,后面我们统称生产者(producers))发送给交换机(exchange) (国内一般比喻成邮局或者邮箱)。然后交换机将收到的消息根据路由规则分发给绑定的队列(queue) 。最后AMQP的实现者消息中间件 (例如rabbitmq)会将消息投递给订阅了此队列的消费者(consumers) ,或者消费者按照需求来主动拉取。

图示如下:

从模型设计,我们可以清楚的了解AMQP的所有组件及其功能,接下来,我们介绍我们今天的主角,实现AMQP协议的消息中间件rabbitmq。

AMQP协议层角色相关的概念

生产者(producer):产生消息的应用,能够传递消息到消息中间件的应用。

消息中间件(brokers):消息传递的中间载体,即我们今天的主角rabbitmq。

消费者(consumers):接收并处理消息的应用。从消息中间件获取消息并处理。

连接(Connection):生产者 或 消费者 和 消息中间件之间需要建立起连接。AMQP应用层协议使用的是能够提供可靠投递的TCP连接,AMQP的连接通常是长连接,AMQP使用认证机制并且提供TLS(SSL)保护。当我们的生产者 或 消费者 不再需要连接到消息中间件的的时候,需要优雅的释放掉它们与消息中间件TCP连接,而不是直接将TCP连接关闭。

信道(channel):通常情况下生产者 或 消费者 需要与 消息中间件之间建立多个连接。无论怎样,同时开启多个TCP连接都是不合适的,因为这样做会消耗掉过多的系统资源。AMQP协议提供了信道(channel)这个概念来处理多连接,可以把通道理解成共享一个TCP连接的多个轻量化连接。一个特定通道上的通讯与其他通道上的通讯是完全隔离的,因此每个AMQP方法都需要携带一个通道号,这样客户端就可以指定此方法是为哪个信道准备的。

消息中间件相关的概念

虚拟主机(vHosts):虚拟主机概念,一个Virtual Host里面可以有若干个Exchange和Queue,我们可以控制用户在Virtual Host的权限。后面使用篇章再详细说明。

用户(User):最直接了当的认证方式,谁可以使用当前的消息中间件。

交换机(Exchange):交换机接收生产者发出的消息并且路由到由交换机类型和被称作绑定(bindings)的规则所决定的到队列中,交换机不存储消息。

在声明交换机时还可以附带许多其他的属性,其中最重要的几个分别是:

● Name(名称)

● Durability (持久化):消息代理重启后,交换机是否还存在。交换机可以有两个状态:持久(durable)、暂存(transient)。持久化的交换机会在消息中间件(broker)重启后依旧存在,而暂存的交换机则不会(它们需要在消息中间件再次上线后重新被声明)。然而并不是所有的应用场景都需要持久化的交换机。

● Auto-delete (自动删除):当所有与之绑定的消息队列都完成了对此交换机的使用后,是否自动删掉它。

● Arguments(参数):alternate-exchange等,指定无法路由时的辅助路由。

消息队列(Queue):存储还未被消费者消费的消息的容器,队列具有以下属性:

● Name(名称)

● Durable(持久化):消息中间件重启后,队列是否依旧存在。持久化队列(Durable queues)会被存储在磁盘上,当消息中间件(broker)重启之后,它依旧存在。没有被持久化的队列称作暂存队列(Transient queues)。这里需要注意队列的持久化和它存储的未被消费消息的持久化是2个概念,队列的持久化并不会使存储的消息持久化。假如消息中间件(broker)重启之后,持久化队列会被重新声明,但它里面存储的消息只有设置过持久化的消息才能被重新恢复。

● Exclusive(专用队列):可以这样理解当创建这个队列的Connection关闭后队列即被删除,不存在其它的使用可能性。

● Auto-delete(自动删除):当没有消费者订阅这个队列的时候就会被删除。

● Arguments(参数):消息中间件用来完成类似设置最大长度死信队列等的参数。

消息队列在声明(declare)后才能被使用。如果一个队列尚不存在,声明一个队列会创建它。如果声明的队列已经存在,并且属性完全相同,那么此次声明不会对原有队列产生任何影响。 如果声明中的属性(名称除外)与已存在队列的属性有差异,那么则会申明失败。

消息(message): 生产者产生的和消费者处理的消息。RabbitMQ的消息是有属性概念的。有些属性是被消息中间件所使用的,但是大多数是开放给接收它们的应用解释器用的。我们也可以为消息定义消息头(headers)。消息属性需要在消息被发布的时候定义。例如:

● Content type(内容类型)

● Content encoding(内容编码)

● Routing key(路由键)

● Delivery mode (persistent or not) 投递模式(持久化 或 非持久化)这里需要注意,消息的持久化依赖与队列的持久化,我们需要同步设置。

● Message priority(消息优先权)

● Message publishing timestamp(消息发布的时间戳)

● Expiration period(消息有效期)

● Publisher application id(发布应用的ID)

路由键(routing key):路由关键字,交换机exchange的路由规则利用这个关键字进行消息投递到消息队列。( 路由键长度不能超过255个字节)

绑定(Binding):Binding可以理解为交换机Exchange路由消息到消息队列的路由规则关系(即消息队列和交换机的绑定)。当交换机Exchange收到生产者传递的消息Message时会解析其Routing Key,Exchange根据Routing Key与交换机类型Exchange Type将Message路由到消息队列中去。

概念固然很多,但是这个时候一定要保持耐心,知道概念是第一步,理解它是第二步,运用它是第三步,只有到了第三步,我们才算是真正的能解决问题。

我这里给出 rabbitmq JAVAClient 的API,做技术的,看API更加有助于理解。这里如果不能全部理解也不要着急,一步步消化。

//第一步配置连接工厂ConnectionFactory factory = new ConnectionFactory();//设置用户名factory.setUsername("test");//设置用户密码factory.setPassword("test321");//设置虚拟主机(虚拟主机和用户的权限管理 我们后面抽一小篇描述)factory.setVirtualHost("test");//设置主机地址factory.setHost("192.168.199.188");//设置端口factory.setPort(5672);Connection conn = null;//或者用下面配置连接工厂(分参数配置 和 URI 二选一)try {	 	factory.setUri("amqp://test:test321@192.168.199.188:5672/test");} catch (Exception e) { e.printStackTrace();}//创建连接try { conn = factory.newConnection();} catch (Exception e) { e.printStackTrace();}//创建channelChannel channel = null;if(conn != null){ try {		channel = conn.createChannel(); } catch (IOException e) {		e.printStackTrace(); }}//exchanges和queues是Client端应用所必须的。在使用之前必须先“declared”(声明),确保在使用之前已经存在,如果不存在则创建它,如果存在,申明不回有任何影响的,这些操作都包含在declare里。if(channel !=null){ /** * 申明exchange的API. * @param exchange 名称 * @param type 类型 * @param durable 持久化 defaul true * @param autoDelete 自动删除 defacult true * @param internal 内部使用 true if the exchange is internal, i.e. can't be directly published to by a client. * @param arguments 额外参数 * @return a declaration-confirm method to indicate the exchange was successfully declared * @throws java.io.IOException if an error is encountered */ /*Exchange.DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) throws IOException;*/ try { //交换机的参数配置(这里API给全,大家按需配置,重点是了解) Map<String, Object> arguments =new HashMap<>(); /* 当一个消息不能被route的时候,如果exchange设定了AE,则消息会被投递到AE。如果存在AE链,则会按此继续投递,直到消息被route或AE链结束或遇到已经尝试route过消息的AE。 */ arguments.put("alternate-exchange", "amq.direct"); //申明我们的exchange(具体的API调用) AMQP.Exchange.DeclareOk declareOk = channel.exchangeDeclare("test", BuiltinExchangeType.DIRECT, true, false,false,arguments); //申明死信队列用的(重点关注上面的API,这里大家也能看出来,预留后面用的上) channel.exchangeDeclare("dead", BuiltinExchangeType.DIRECT, true, false,false,null); } catch (IOException e) { e.printStackTrace(); } /** * 队列申明API * @param queue 名称 * @param durable 持久化 defaul true * @param exclusive 专用性 defaul true * @param autoDelete 自动删除 defacult true * @param arguments 额外参数 * @return a declaration-confirm method to indicate the queue was successfully declared * @throws java.io.IOException if an error is encountered */ /* Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException;*/ //队列的参数配置(这里API给全,大家按需配置) Map<String, Object> map = new HashMap<>(); //标志队列中的消息存活时间,也就是说队列中的消息超过了指定时间会被删除(数字类型,标志时间,以豪秒为单位) map.put("x-message-ttl", 1 * 24 * 60 * 60 * 10000); //队列自身的空闲存活时间,当前的queue在指定的时间内,没有consumer、basic.get也就是未被访问,就会被删除。(数字类型,标志时间,以豪秒为单位) map.put("x-expires",60 * 60 * 10000); //最大长度和最大占用空间,设置了最大长度的队列,在超过了最大长度后进行插入会删除之前插入的消息为本次的留出空间(默认操作是如此,我们可以设置overflow来改变,例如用在并发缓冲时),相应的最大占用大小也是这个道理,当超过了这个大小的时候,会删除之前插入的消息为本次的留出空间。 map.put("x-max-length",10000); map.put("x-max-length-bytes",100 * 1024 * 1024); //队列超出最大长度的处理方案 ,队列溢出的默认处理方案:drop-head (default) 或者拒绝消息 reject-publish 我们做并发限流的时候需要设置为超出队列拒绝 map.put("x-overflow","reject-publish"); //惰性队列会尽可能的将消息存入磁盘中,而在消费者消费到相应的消息时才会被加载到内存中,它的一个重要的设计目标是能够支持更长的队列,即支持更多的消息存储。当消费者由于各种各样的原因(比如消费者下线、宕机亦或者是由于维护而关闭等)而致使长时间内不能消费消息造成堆积时,惰性队列就很有必要了。 map.put("x-queue-mode","lazy"); //集群属性,我们这里暂不讨论 map.put("x-queue-master-locator",""); /* 消息因为超时或超过限制在队列里消失,这样我们就丢失了一些消息,也许里面就有一些是我们做需要获知的。而rabbitmq的死信功能则为我们带来了解决方案。设置了dead letter exchange与dead letter routingkey(要么都设定,要么都不设定)那些因为超时或超出限制而被删除的消息会被推动到我们设置的exchange中,再根据routingkey推到queue中. */ map.put("x-dead-letter-exchange","dead"); map.put("x-dead-letter-routing-key","key"); //队列所支持的优先级别,列如设置为5,表示队列支持0到5六个优先级别,5最高,0最低,当然这需要生产者在发送消息时指定消息的优先级别,消息按照优先级别从高到低的顺序分发给消费者 map.put("x-max-priority",5); try { //申明一个test队列 API channel.queueDeclare("test", true, false, false, map); } catch (IOException e) { e.printStackTrace(); } /** * 绑定队列和交换机的API * @param queue 队列名称 * @param exchange 交换机名称 * @param routingKey 绑定的 routingKey * @param arguments 额外的参数 * @return a binding-confirm method if the binding was successfully created * @throws java.io.IOException if an error is encountered */ /*Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;*/ try { //绑定交换机 和 队列 AMQP.Queue.BindOk bindOK = channel.queueBind("test", "test", "test", null); } catch (IOException e) { e.printStackTrace(); } //传递消息 byte[] messageBodyBytes = "Hello, world!".getBytes(); //我们可以为消息设置Header Map<String, Object> headers = new HashMap<String, Object>(); headers.put("latitude", 51.5252949); headers.put("longitude", -0.0905493); /** * 发布消息API * @param exchange 交换机名称 * @param routingKey 路由建 * @param mandatory true mandatory标志告诉服务器至少将该消息route到一个队列中,否则将消息返还给生产者 * @param props other properties for the message - 其他属性 * @param body the message body 消息体 * @throws java.io.IOException if an error is encountered */ /*void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body) throws IOException;*/ try { //发布一条消息 channel.basicPublish("test", "test", true, (new AMQP.BasicProperties.Builder() .headers(headers) .contentType("text/plain") .deliveryMode(2) .priority(1) .userId("test") .appId("test") .expiration("60000") .build()), messageBodyBytes); } catch (IOException e) { e.printStackTrace(); } //消息是否进队列监听处理 channel.addReturnListener(new ReturnListener() { @Override public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException { } }); /** * Request specific "quality of service" settings. * @param prefetchSize abbitmq会传递的最大消息总量大小, 0 代表不限制 * @param prefetchCount rabbitmq会传递的最大消息数, 0 代表不限制 * @param global true 代表设备本channel而不只是customer * @throws java.io.IOException if an error is encountered */ /*void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;*/ try { //我们可以通过basicQos设置prefetchCount 限制每个消费者在收到下一个确认回执前一次可以最大接受多少条消息。即如果设置prefetchCount =1,RabbitMQ向这个消费者发送一个消息后,再这个消息的消费者对这个消息进行ack之前,RabbitMQ不会向这个消费者发送新的消息 channel.basicQos(0,1,true); } catch (IOException e) { e.printStackTrace(); } /** * 队列pull消息API * @param queue the name of the queue * @param autoAck 自动确认消息 * @return a {@link GetResponse} containing the retrieved message data * @throws java.io.IOException if an error is encountered */ /*GetResponse basicGet(String queue, boolean autoAck) throws IOException;*/ try { //自动获取消息(pull)下面是Push方式,二选一 GetResponse response = channel.basicGet("test",true); } catch (IOException e) { e.printStackTrace(); } /** * Start a non-nolocal, non-exclusive consumer. * @param queue 队列名称 * @param autoAck 是否自动确认消息处理 * @param consumerTag 消费者标签 * @param callback an interface to the consumer object 消费者处理 * @return the consumerTag associated with the new consumer * @throws java.io.IOException if an error is encountered * @see #basicConsume(String, boolean, String, boolean, boolean, Map, Consumer) */ /*String basicConsume(String queue, boolean autoAck, String consumerTag, Consumer callback) throws IOException;*/	//不自动确认消息,下面消息确认机制篇再详细说明 boolean autoAck = false; Channel finalChannel = channel; try { channel.basicConsume("test", autoAck, "test-consumer-tag", new DefaultConsumer(finalChannel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //发布的每一条消息都会获得一个唯一的deliveryTag long deliveryTag = envelope.getDeliveryTag(); // 批量确认 finalChannel.basicAck(deliveryTag, true); } }); } catch (IOException e) { e.printStackTrace(); }}//最后关闭channelif(channel != null){ try { channel.close(); } catch (Exception e) { e.printStackTrace(); }}//最后关闭connif(conn != null){ try { conn.close(); } catch (IOException e) { e.printStackTrace(); }}

做JAVA的一般Spring使用比较多的,我这边本来打算给出Spring的API,但是我觉得Spring的API不利于我们学习,真正掌握rabbitmq的API更重要,这里我给出来,大家一则加深理解,后面更多可以当作API来查询使用。

标签: #phprabbitmq