前言:
此刻姐妹们对“phpstaticclass”大体比较关切,同学们都需要学习一些“phpstaticclass”的相关知识。那么小编同时在网络上汇集了一些对于“phpstaticclass””的相关资讯,希望朋友们能喜欢,大家快快来学习一下吧!解决方案
之前遇到一个问题:同一个应用分布式部署时,如何消费同一个消息?
了解了rabbitmq原理后,发现无法直接实现该方式,但是有类似的方式可以参考就是topic模式
最后的解决方案是,服务启动时生成多个通配符队列,这样就不用担心同一个服务需要修改代码才能部署连接rabbitmq
配置如下:
package com.peabody.rabbitmq.config;import com.fasterxml.jackson.annotation.JsonAutoDetect;import com.fasterxml.jackson.annotation.PropertyAccessor;import com.fasterxml.jackson.databind.ObjectMapper;import com.peabody.rabbitmq.envents.BusReceiver;import org.springframework.amqp.core.*;import org.springframework.amqp.rabbit.connection.ConnectionFactory;import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;import org.springframework.amqp.support.converter.ContentTypeDelegatingMessageConverter;import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;import org.springframework.amqp.support.converter.MessageConverter;import org.springframework.beans.factory.annotation.Value;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import java.util.HashMap;import java.util.Map;@Configurationpublic class BusConfig { private static final String EXCHANGE_NAME = "peabody-exchange"; private static final String ROUTING_KEY = "peabody-route"; @Value("${spring.application.name}") private String appName; @Bean Queue queue() { String queueName = new Base64UrlNamingStrategy(appName + ".").generateName(); return new Queue(queueName, false); } @Bean TopicExchange exchange() { return new TopicExchange(EXCHANGE_NAME); } @Bean Binding binding(Queue queue, TopicExchange exchange) { return BindingBuilder.bind(queue()).to(exchange()).with(ROUTING_KEY); } @Bean SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory, MessageListenerAdapter messageListenerAdapter, Queue queue) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); container.setQueueNames(queue().getName()); container.setMessageListener(messageListenerAdapter); return container; } @Bean MessageListenerAdapter messageListenerAdapter(BusReceiver busReceiver, MessageConverter messageConverter) { //自定义消息处理方法 MessageListenerAdapter adapter = new MessageListenerAdapter(busReceiver); adapter.setMessageConverter(messageConverter); Map<String, String> queueOrTagToMethodName = new HashMap<>(); queueOrTagToMethodName.put(queue().getName(), "handleMessage"); adapter.setQueueOrTagToMethodName(queueOrTagToMethodName); return new MessageListenerAdapter(adapter); } @Bean public MessageConverter messageConverter() {// 自定义消息转换,如json ObjectMapper objectMapper = new ObjectMapper(); objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); return new ContentTypeDelegatingMessageConverter(new Jackson2JsonMessageConverter(objectMapper)); }}
处理方法随意自行定义:
package com.peabody.rabbitmq.envents;import org.springframework.stereotype.Component;@Componentpublic class BusReceiver { public void handleMessage(String content) { System.out.print(content+"消费者1"); }}
温馨备注提示
多个消费者绑定同一个是无法同时消费的,一个消息只能被一个消费者消费
也就是下图的结构是无法实现上述问题的,因为rabbitmq有队列的负载均衡机制,消息会被负载到每个消费者进行消费,无法同时被多个消费者消费,多个消费者监听同一个队列,RabbitMQ会进行消息的轮询分发。
寄几可以试试看
消费者1
package com.peabody.topic;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.QueueingConsumer;import com.peabody.utils.ConnectionUtil;public class Consumer1 { private final static String QUEUE_NAME = "topic_queue_4"; private final static String EXCHANGE_NAME = "topic_exchange2"; public static void main(String[] args) throws Exception{ //1、获取连接 Connection connection = ConnectionUtil.getConnection("10.68.252.126",5672,"/","guest","guest"); //2、声明通道 Channel channel = connection.createChannel(); //3、声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //4、绑定队列到交换机,指定路由key为update.# channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"update.#"); //同一时刻服务器只会发送一条消息给消费者 channel.basicQos(1); //5、定义队列的消费者 QueueingConsumer queueingConsumer = new QueueingConsumer(channel); //6、监听队列,手动返回完成状态 channel.basicConsume(QUEUE_NAME,false,queueingConsumer); //6、获取消息 while (true){ QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" 消费者1:" + message + "'"); //消费者1接收一条消息后休眠10毫秒 Thread.sleep(10); //返回确认状态 channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false); } }}
消费者2
package com.peabody.topic;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.QueueingConsumer;import com.peabody.utils.ConnectionUtil;public class Consumer2 { private final static String QUEUE_NAME = "topic_queue_4"; private final static String EXCHANGE_NAME = "topic_exchange2"; public static void main(String[] args) throws Exception{ //1、获取连接 Connection connection = ConnectionUtil.getConnection("10.68.252.126",5672,"/","guest","guest"); //2、声明通道 Channel channel = connection.createChannel(); //3、声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //4、绑定队列到交换机,指定路由key为select.# channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"update.#"); //同一时刻服务器只会发送一条消息给消费者 channel.basicQos(1); //5、定义队列的消费者 QueueingConsumer queueingConsumer = new QueueingConsumer(channel); //6、监听队列,手动返回完成状态 channel.basicConsume(QUEUE_NAME,false,queueingConsumer); //7、获取消息 while (true){ QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" 消费者2:" + message + "'"); //消费者2接收一条消息后休眠10毫秒 Thread.sleep(1000); //返回确认状态 channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false); } }}
工具
package com.peabody.utils;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;public class ConnectionUtil { public static Connection getConnection(String host,int port,String vHost,String userName,String passWord) throws Exception{ //1、定义连接工厂 ConnectionFactory factory = new ConnectionFactory(); //2、设置服务器地址 factory.setHost(host); //3、设置端口 factory.setPort(port); //4、设置虚拟主机、用户名、密码 factory.setVirtualHost(vHost); factory.setUsername(userName); factory.setPassword(passWord); //5、通过连接工厂获取连接 Connection connection = factory.newConnection(); return connection; }}
生产者
package com.peabody.topic;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.peabody.utils.ConnectionUtil;public class Producer { private final static String EXCHANGE_NAME = "topic_exchange2"; public static void main(String[] args) throws Exception { //1、获取连接 Connection connection = ConnectionUtil.getConnection("10.68.252.126", 5672, "/", "guest", "guest"); //2、声明信道 Channel channel = connection.createChannel(); //3、声明交换器,类型为topic channel.exchangeDeclare(EXCHANGE_NAME, "topic"); //4、创建消息 String message = "hello rabbitmq111"; //5、发布消息 channel.basicPublish(EXCHANGE_NAME, "update.Name", null, message.getBytes()); System.out.println("生产者发送" + message + "'"); //6、关闭通道 channel.close(); //7、关闭连接 connection.close(); }}
版权声明:
本站文章均来自互联网搜集,如有侵犯您的权益,请联系我们删除,谢谢。
标签: #phpstaticclass