龙空技术网

使用RabbitMQ的Java接口实现round-robin

进击的IT程序员 151

前言:

而今朋友们对“操作系统roundrobin算法”都比较关怀,你们都需要知道一些“操作系统roundrobin算法”的相关知识。那么小编也在网络上汇集了一些对于“操作系统roundrobin算法””的相关文章,希望我们能喜欢,咱们一起来了解一下吧!

基本概念

简单队列的不足:

1. 耦合性高;

2. 如果生产者把生产队列该了,消费者也要同时改;

Work Queues工作队列,模型如下:

代码与实例

程序运行截图如下:

生产者:

两个消费者如下:

在源码中,消费者一个是1秒消费一次,一个是2秒消费一次,但从我们知道,RabbitMQ给他轮询分发。平均分配

程序结构如下:

Send.java

package work; import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import util.ConnectionUtils; import java.io.IOException;import java.util.concurrent.TimeoutException; public class Send {  private static final String QUEUE_NAME = "test_work_queue";  public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {  Connection connection = ConnectionUtils.getConnect(); Channel channel = connection.createChannel();  //声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null);  for(int i = 0 ; i < 50; i++){  String msg = "Hello World : " + i; System.out.println("send msg : " + msg); channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); Thread.sleep(i * 20); }  channel.close(); connection.close(); }}

Recv1.java

package work; import com.rabbitmq.client.*;import util.ConnectionUtils; import java.io.IOException;import java.util.concurrent.TimeoutException; public class Recv1 {  private static final String QUEUE_NAME = "test_work_queue";  public static void main(String[] args) throws IOException, TimeoutException {  Connection connection = ConnectionUtils.getConnect(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null);  System.out.println("recv1 running");  //消费者 Consumer consumer = new DefaultConsumer(channel){  @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  String msg = new String(body, "utf-8"); System.out.println("Recv[1] msg is : " + msg);  try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } finally {  System.out.println("Recv[1] done"); } } };  channel.basicConsume(QUEUE_NAME, true, consumer); }}

Recv2.java

package work; import com.rabbitmq.client.*;import util.ConnectionUtils; import java.io.IOException;import java.util.concurrent.TimeoutException; public class Recv2 {  private static final String QUEUE_NAME = "test_work_queue";  public static void main(String[] args) throws IOException, TimeoutException {  Connection connection = ConnectionUtils.getConnect(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null);  System.out.println("recv2 running");  //消费者 Consumer consumer = new DefaultConsumer(channel){  @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  String msg = new String(body, "utf-8"); System.out.println("Recv[2] msg is : " + msg);  try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally {  System.out.println("Recv[2] done"); } } };  channel.basicConsume(QUEUE_NAME, true, consumer); }}
想要源码的小伙伴可以私信我,想要更多学习资料的也可以私信我哈

标签: #操作系统roundrobin算法