龙空技术网

rabbitmq为啥不能多个消费者同时消费同一个队列?起飞

程序猿peapeay 1876

前言:

此刻姐妹们对“phpstaticclass”大体比较关切,同学们都需要学习一些“phpstaticclass”的相关知识。那么小编同时在网络上汇集了一些对于“phpstaticclass””的相关资讯,希望朋友们能喜欢,大家快快来学习一下吧!

解决方案

之前遇到一个问题:同一个应用分布式部署时,如何消费同一个消息?

了解了rabbitmq原理后,发现无法直接实现该方式,但是有类似的方式可以参考就是topic模式

最后的解决方案是,服务启动时生成多个通配符队列,这样就不用担心同一个服务需要修改代码才能部署连接rabbitmq

服务启动时生成多个通配符队列

配置如下:

package com.peabody.rabbitmq.config;import com.fasterxml.jackson.annotation.JsonAutoDetect;import com.fasterxml.jackson.annotation.PropertyAccessor;import com.fasterxml.jackson.databind.ObjectMapper;import com.peabody.rabbitmq.envents.BusReceiver;import org.springframework.amqp.core.*;import org.springframework.amqp.rabbit.connection.ConnectionFactory;import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;import org.springframework.amqp.support.converter.ContentTypeDelegatingMessageConverter;import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;import org.springframework.amqp.support.converter.MessageConverter;import org.springframework.beans.factory.annotation.Value;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import java.util.HashMap;import java.util.Map;@Configurationpublic class BusConfig {    private static final String EXCHANGE_NAME = "peabody-exchange";    private static final String ROUTING_KEY = "peabody-route";        @Value("${spring.application.name}")    private String appName;    @Bean    Queue queue() {        String queueName = new Base64UrlNamingStrategy(appName + ".").generateName();        return new Queue(queueName, false);    }    @Bean    TopicExchange exchange() {        return new TopicExchange(EXCHANGE_NAME);    }    @Bean    Binding binding(Queue queue, TopicExchange exchange) {        return BindingBuilder.bind(queue()).to(exchange()).with(ROUTING_KEY);    }    @Bean    SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory, MessageListenerAdapter messageListenerAdapter, Queue queue) {        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);        container.setQueueNames(queue().getName());        container.setMessageListener(messageListenerAdapter);        return container;    }    @Bean    MessageListenerAdapter messageListenerAdapter(BusReceiver busReceiver, MessageConverter messageConverter) {        //自定义消息处理方法        MessageListenerAdapter adapter = new MessageListenerAdapter(busReceiver);        adapter.setMessageConverter(messageConverter);        Map<String, String> queueOrTagToMethodName = new HashMap<>();        queueOrTagToMethodName.put(queue().getName(), "handleMessage");        adapter.setQueueOrTagToMethodName(queueOrTagToMethodName);        return new MessageListenerAdapter(adapter);    }    @Bean    public MessageConverter messageConverter() {//        自定义消息转换,如json        ObjectMapper objectMapper = new ObjectMapper();        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);        return new ContentTypeDelegatingMessageConverter(new Jackson2JsonMessageConverter(objectMapper));    }}

处理方法随意自行定义:

package com.peabody.rabbitmq.envents;import org.springframework.stereotype.Component;@Componentpublic class BusReceiver {    public void handleMessage(String content) {        System.out.print(content+"消费者1");    }}

温馨备注提示

多个消费者绑定同一个是无法同时消费的,一个消息只能被一个消费者消费

也就是下图的结构是无法实现上述问题的,因为rabbitmq有队列的负载均衡机制,消息会被负载到每个消费者进行消费,无法同时被多个消费者消费,多个消费者监听同一个队列,RabbitMQ会进行消息的轮询分发。

绑定同一个队列

寄几可以试试看

消费者1

package com.peabody.topic;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.QueueingConsumer;import com.peabody.utils.ConnectionUtil;public class Consumer1 {    private final static String QUEUE_NAME = "topic_queue_4";    private final static String EXCHANGE_NAME = "topic_exchange2";    public static void main(String[] args) throws Exception{        //1、获取连接        Connection connection = ConnectionUtil.getConnection("10.68.252.126",5672,"/","guest","guest");        //2、声明通道        Channel channel = connection.createChannel();        //3、声明队列        channel.queueDeclare(QUEUE_NAME, false, false, false, null);        //4、绑定队列到交换机,指定路由key为update.#        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"update.#");        //同一时刻服务器只会发送一条消息给消费者        channel.basicQos(1);        //5、定义队列的消费者        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);        //6、监听队列,手动返回完成状态        channel.basicConsume(QUEUE_NAME,false,queueingConsumer);        //6、获取消息        while (true){            QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();            String message = new String(delivery.getBody());            System.out.println(" 消费者1:" + message + "'");            //消费者1接收一条消息后休眠10毫秒            Thread.sleep(10);            //返回确认状态            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);        }    }}

消费者2

package com.peabody.topic;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.QueueingConsumer;import com.peabody.utils.ConnectionUtil;public class Consumer2 {    private final static String QUEUE_NAME = "topic_queue_4";    private final static String EXCHANGE_NAME = "topic_exchange2";    public static void main(String[] args) throws Exception{        //1、获取连接        Connection connection = ConnectionUtil.getConnection("10.68.252.126",5672,"/","guest","guest");        //2、声明通道        Channel channel = connection.createChannel();        //3、声明队列        channel.queueDeclare(QUEUE_NAME, false, false, false, null);        //4、绑定队列到交换机,指定路由key为select.#        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"update.#");        //同一时刻服务器只会发送一条消息给消费者        channel.basicQos(1);        //5、定义队列的消费者        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);        //6、监听队列,手动返回完成状态        channel.basicConsume(QUEUE_NAME,false,queueingConsumer);        //7、获取消息        while (true){            QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();            String message = new String(delivery.getBody());            System.out.println(" 消费者2:" + message + "'");            //消费者2接收一条消息后休眠10毫秒            Thread.sleep(1000);            //返回确认状态            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);        }    }}

工具

package com.peabody.utils;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;public class ConnectionUtil {    public static Connection getConnection(String host,int port,String vHost,String userName,String passWord) throws Exception{        //1、定义连接工厂        ConnectionFactory factory = new ConnectionFactory();        //2、设置服务器地址        factory.setHost(host);        //3、设置端口        factory.setPort(port);        //4、设置虚拟主机、用户名、密码        factory.setVirtualHost(vHost);        factory.setUsername(userName);        factory.setPassword(passWord);        //5、通过连接工厂获取连接        Connection connection = factory.newConnection();        return connection;    }}

生产者

package com.peabody.topic;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.peabody.utils.ConnectionUtil;public class Producer {    private final static String EXCHANGE_NAME = "topic_exchange2";    public static void main(String[] args) throws Exception {        //1、获取连接        Connection connection = ConnectionUtil.getConnection("10.68.252.126", 5672, "/", "guest", "guest");        //2、声明信道        Channel channel = connection.createChannel();        //3、声明交换器,类型为topic        channel.exchangeDeclare(EXCHANGE_NAME, "topic");        //4、创建消息        String message = "hello rabbitmq111";        //5、发布消息        channel.basicPublish(EXCHANGE_NAME, "update.Name", null, message.getBytes());        System.out.println("生产者发送" + message + "'");        //6、关闭通道        channel.close();        //7、关闭连接        connection.close();    }}

标签: #phpstaticclass