龙空技术网

RabbitMQ入门教程(六):路由选择Routing

Java实用技术 1512

前言:

现时兄弟们对“java路由”大概比较注意,你们都需要了解一些“java路由”的相关资讯。那么小编在网摘上收集了一些对于“java路由””的相关资讯,希望兄弟们能喜欢,小伙伴们一起来了解一下吧!

简介

本节主要演示使用直连接类型,将多个路由键绑定到同一个队列上。也可以将同一个键绑定到多个队列上(多重绑定multiple bindings),此时满足键的队列都能收到消息,不满足的直接被丢弃。

生产者

public class Producer {    @Test    public void testBasicPublish() throws IOException, TimeoutException {        ConnectionFactory factory = new ConnectionFactory();        factory.setHost("127.0.0.1");        factory.setPort(AMQP.PROTOCOL.PORT);        factory.setUsername("mengday");        factory.setPassword("mengday");        Connection connection = factory.newConnection();        Channel channel = connection.createChannel();        // Routing 的路由规则使用直连接        String EXCHANGE_NAME = "exchange.direct.routing";        String[] routingKeys = {"debug", "info", "warning", "error"};        for (int i = 0; i < 20; i++){            int random = (int)(Math.random() * 4);            String routingKey = routingKeys[random];            String message = "Hello RabbitMQ - " + routingKey + " - " + i;            channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));        }        // 关闭资源        channel.close();        connection.close();    }}
消费者1
public class Consumer1 {    @Test    public void testBasicConsumer1() throws Exception{        ConnectionFactory factory = new ConnectionFactory();        factory.setHost("127.0.0.1");        factory.setPort(AMQP.PROTOCOL.PORT);        factory.setUsername("mengday");        factory.setPassword("mengday");        Connection connection = factory.newConnection();        final Channel channel = connection.createChannel();        String EXCHANGE_NAME = "exchange.direct.routing";        // 生成一个随机的名称,queueDeclare()方法没有任何参数,当最后一个消费者断开时就会删除掉该队列,当消费者结束后可以看到队列就删除了        String QUEUE_NAME = channel.queueDeclare().getQueue();        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);        // 在消费者端队列绑定        // 将一个对列绑定多个路由键        String[] routingKeys = {"debug", "info"};        for (int i = 0; i < routingKeys.length; i++) {            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, routingKeys[i]);        }        System.out.println("Consumer Wating Receive Message");        Consumer consumer = new DefaultConsumer(channel){            @Override            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                String message = new String(body, "UTF-8");                System.out.println(" [C] Received '" + message + "', 处理业务中...");            }        };        channel.basicConsume(QUEUE_NAME, true, consumer);        Thread.sleep(1000000);    }}
消费者2
public class Consumer2 {    @Test    public void testBasicConsumer2() throws Exception{        ConnectionFactory factory = new ConnectionFactory();        factory.setHost("127.0.0.1");        factory.setPort(AMQP.PROTOCOL.PORT);        factory.setUsername("mengday");        factory.setPassword("mengday");        Connection connection = factory.newConnection();        final Channel channel = connection.createChannel();        String EXCHANGE_NAME = "exchange.direct.routing";        // 生成一个随机的名称        String QUEUE_NAME = channel.queueDeclare().getQueue();        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);        // 在消费者端队列绑定        // 将一个对列绑定多个路由键        String[] routingKeys = {"warning", "error"};        for (int i = 0; i < routingKeys.length; i++) {            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, routingKeys[i]);        }        System.out.println("Consumer Wating Receive Message");        Consumer consumer = new DefaultConsumer(channel){            @Override            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                String message = new String(body, "UTF-8");                System.out.println(" [C] Received '" + message + "', 处理业务中...");            }        };        channel.basicConsume(QUEUE_NAME, true, consumer);        Thread.sleep(1000000);    }}
运行结果

标签: #java路由