前言:
现时兄弟们对“java路由”大概比较注意,你们都需要了解一些“java路由”的相关资讯。那么小编在网摘上收集了一些对于“java路由””的相关资讯,希望兄弟们能喜欢,小伙伴们一起来了解一下吧!简介
本节主要演示使用直连接类型,将多个路由键绑定到同一个队列上。也可以将同一个键绑定到多个队列上(多重绑定multiple bindings),此时满足键的队列都能收到消息,不满足的直接被丢弃。
生产者
public class Producer { @Test public void testBasicPublish() throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(AMQP.PROTOCOL.PORT); factory.setUsername("mengday"); factory.setPassword("mengday"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // Routing 的路由规则使用直连接 String EXCHANGE_NAME = "exchange.direct.routing"; String[] routingKeys = {"debug", "info", "warning", "error"}; for (int i = 0; i < 20; i++){ int random = (int)(Math.random() * 4); String routingKey = routingKeys[random]; String message = "Hello RabbitMQ - " + routingKey + " - " + i; channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8")); } // 关闭资源 channel.close(); connection.close(); }}消费者1
public class Consumer1 { @Test public void testBasicConsumer1() throws Exception{ ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(AMQP.PROTOCOL.PORT); factory.setUsername("mengday"); factory.setPassword("mengday"); Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); String EXCHANGE_NAME = "exchange.direct.routing"; // 生成一个随机的名称,queueDeclare()方法没有任何参数,当最后一个消费者断开时就会删除掉该队列,当消费者结束后可以看到队列就删除了 String QUEUE_NAME = channel.queueDeclare().getQueue(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); // 在消费者端队列绑定 // 将一个对列绑定多个路由键 String[] routingKeys = {"debug", "info"}; for (int i = 0; i < routingKeys.length; i++) { channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, routingKeys[i]); } System.out.println("Consumer Wating Receive Message"); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [C] Received '" + message + "', 处理业务中..."); } }; channel.basicConsume(QUEUE_NAME, true, consumer); Thread.sleep(1000000); }}消费者2
public class Consumer2 { @Test public void testBasicConsumer2() throws Exception{ ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(AMQP.PROTOCOL.PORT); factory.setUsername("mengday"); factory.setPassword("mengday"); Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); String EXCHANGE_NAME = "exchange.direct.routing"; // 生成一个随机的名称 String QUEUE_NAME = channel.queueDeclare().getQueue(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); // 在消费者端队列绑定 // 将一个对列绑定多个路由键 String[] routingKeys = {"warning", "error"}; for (int i = 0; i < routingKeys.length; i++) { channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, routingKeys[i]); } System.out.println("Consumer Wating Receive Message"); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [C] Received '" + message + "', 处理业务中..."); } }; channel.basicConsume(QUEUE_NAME, true, consumer); Thread.sleep(1000000); }}运行结果
版权声明:
本站文章均来自互联网搜集,如有侵犯您的权益,请联系我们删除,谢谢。
标签: #java路由