龙空技术网

延迟队列-及存在的问题和解决方案

空山细兩 2061

前言:

此时咱们对“js 设置延迟”大致比较关心,同学们都需要知道一些“js 设置延迟”的相关知识。那么小编同时在网上搜集了一些对于“js 设置延迟””的相关资讯,希望兄弟们能喜欢,看官们快快来学习一下吧!

延迟队列的实现1:

将消息发送到指定了过期时间的队列。即在声明某个队列时,声明这个队列中所有消息的过期时间,这样,只要进入此队列的消息,它们的过期时间都是相同的,如以下代码,指定了过期时间为60秒,及过期的消息被路由的目标,如果不设置的话,默认过期时间为30分钟。

Map<String,Object> map =

Map.of("x-dead-letter-exchange","dead_exchange",

"x-dead-letter-routing-key","dead_routing",

"x-message-ttl",60000);

channel.queueDeclare("daly_queue",true,false,false,map);

截图:

在队列上,统一设置一个过期时间的缺点是,不能根据业务需求,设置某一个消息的过期时间。

或声明一个没有过期时间的队列,但在发布消息时,指定消息的过期时间,如:

正常声明队列:

Map<String,Object> map =

Map.of("x-dead-letter-exchange","dead_exchange",

"x-dead-letter-routing-key","dead_routing");

channel.queueDeclare("daly_queue",true,false,false,map);

发布时,设置过期时间:

prop = new AMQP.BasicProperties().builder()

.expiration("3000").build();

channel.basicPublish("daly_exchange",

"daly_routing",

prop,

"B过期时间为3秒".getBytes());

但上述方法,设置每一个消息不同的过期时间,依然存在以下问题:

延迟队列的问题:

1:A消息到达队列Q1并设置A消息的过期时间为10秒。

2:B消息也到达队列Q1,并设置B消息的过期时间为1秒。

3:这种情况下,因为队列中的数据是FIFO,排队执行的,所以,虽然B消息已经过期,但因A消息没有过期无法先将B消息发送给消费者,这就是延迟队列的最大问题。

4:解决方案:使用插件。

问题2示例图-演示

以下演示,用于说明后发的信息虽然已经过期,但因为前一个消息并没有过期,所以,即使后面的消息过期,也不会被消费。

开发生产者声明列信交换机,死信队列及绑定关系。声明正常交换机,正常队列,队列中消息过期时路由目的地,及绑定关系。发布消息时,设置消息的过期时间。

package wj.rabbitmq.daly;

import com.rabbitmq.client.AMQP;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import lombok.extern.slf4j.Slf4j;

import wj.mq.utils.ConnUtils;

import java.util.Map;

/**

* 延迟队列生产者

*/

@Slf4j

public class DalySender {

public static void main(String[] args) throws Exception {

Connection con = ConnUtils.newConnection();

Channel channel = con.createChannel();

//声明接收死信的的交换机和队列

channel.exchangeDeclare("dead_exchange","direct",true);

//声明接收列信交换机数据的队列

channel.queueDeclare("dead_queue",true,false,

false,null);

//声明死信交换机与死信队列的绑定关系

channel.queueBind("dead_queue","dead_exchange",

"dead_routing");

//声明正常接收信息的交换机

channel.exchangeDeclare("daly_exchange","direct",true);

//声明正常接收数据的队列,并添加列信路由到哪儿去

Map<String,Object> map =

Map.of("x-dead-letter-exchange","dead_exchange",

"x-dead-letter-routing-key","dead_routing");

channel.queueDeclare("daly_queue",true,false,

false,map);

//声明正常的绑定关系

channel.queueBind("daly_queue","daly_exchange","daly_routing");

//先发送一个过期时间为30秒

AMQP.BasicProperties prop =

new AMQP.BasicProperties().builder()

.expiration("30000")

.build();

channel.basicPublish("daly_exchange",

"daly_routing",

prop,

"A过期时间为30秒".getBytes());

log.info("信息A过期30秒,发送完成。");

//再发送一个过期时间为3秒

prop = new AMQP.BasicProperties().builder()

.expiration("3000").build();

channel.basicPublish("daly_exchange",

"daly_routing",

prop,

"B过期时间为3秒".getBytes());

log.info("信息B过期3秒发送完成。");

con.close();

}

}

开发消费者

开发一个消费者,只需要从死信队列中,读取过期的信息即可。

package wj.rabbitmq.daly;

import com.rabbitmq.client.*;

import lombok.extern.slf4j.Slf4j;

import wj.mq.utils.ConnUtils;

/**

* 只接收死信队列里面的数据即可,不要消费正常的队列

* 以便于让正常的队列里面消息过期

*/

@Slf4j

public class DalyReceiver {

public static void main(String[] args) throws Exception{

Connection con = ConnUtils.newConnection();

Channel channel = con.createChannel();

System.err.println("准备消费死信队列里面,即过期的信息");

channel.basicConsume("dead_queue",

true,

(consumerTag, message) -> {

log.info("死信,即过期消息:"+new String(message.getBody()));

}, consumerTag -> {

//ignore

});

}

}

运行

运行生产者,输出以下日志:

14:07:53.138 信息A过期30秒,发送完成。

14:07:53.142 信息B过期3秒发送完成。

运行消费者,输出以下日志,可见,并没有因为B先过期,而先收到B。

14:08:23.140 死信,即过期消息:A过期时间为30秒

14:08:23.140 死信,即过期消息:B过期时间为3秒

延迟队列问题的解决方案(使用插件)

插件下载地址:

rabbitmq所有可用插件列表:

查看已经启用的插件

通过命令rabbitmq-plugins list可以查看rabbitmq的插件列表,及已经启用的插件,前面添加了*号的,为已经启用的插件。可见,启用的插件为management和web,其他插件,都没有启动。但也没有我们要使用的dalyed_message_exchange插件,所以,还需要额外的安装这个插件。

插件相关命令

官方参考地址:

rabbitmq-plugins list 用于列示所有插件

rabbitmq-plugins enable <plugin-name> 启用插件

rabbitmq-plugins disable <plugin-name> 禁用插件

rabbitmq-plugins directories -s 用于查看plugins可安装的目录 (以下示例,要进入容器执行)

安装插件

下载rabbitmq_delayed_message_exchange-<version>.ez文件,进入docker容器,将文件放到plugins目录下,此目录为:/opt/bitnami/rabbitmq/plugins。

然后执行安装插件的命令:

如果不是用docker容器运行的,则可以直接将*.ez文件放到rabbitmq的插件目录下为:/usr/lib/rabbitmq/rabbitmq-server-<version>/plugins。

然后就可以执行 rabbitmq-plugins enable rabbitmq_delayed_message_exchange。

验证插件

安装插件后,再次查看ui端口的交换机创建处,可以见到新的交换机类型:x-delayed-message .

测试发送延迟消息(Java项目)开发生产者声明延迟交换机,x-delay-message 。声明普通队列。声明延迟交换机与普通队列的绑定关系。发送延迟消息,通过设置x-delay,单位毫秒。先发送一个延迟长的消息如1分钟。再发送一个延迟少的消息,如6秒。测试必须要先收到延迟少的消息,就算是延迟交换机运行成功。

package wj.rabbitmq.delayexchange;

import com.rabbitmq.client.AMQP;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import lombok.extern.slf4j.Slf4j;

import wj.mq.utils.ConnUtils;

import java.nio.charset.StandardCharsets;

import java.util.HashMap;

import java.util.Map;

/**

* <pre>

* 使用延迟交换机发送延迟消息

* 发送的消息会被阻塞到延迟交换机中,先到期的消息会先被发送到队列中,并且不再设置过期时间

* 发送的队列的消息的,立刻会被消息

* </pre>

*/

@Slf4j

public class DelayExchangeSender {

public static void main(String[] args) throws Exception {

Connection con = ConnUtils.newConnection();

Channel channel = con.createChannel();

//声明延迟交换机,即指定个x-delayed-message的交换机的基础类型

Map<String, Object> map = new HashMap<>();

map.put("x-delayed-type", "direct");//必须写,固定值

channel.exchangeDeclare(

"my-delayed-exchange",//指定交换机的名称,任意

"x-delayed-message",//指定交换机类型,必须是这个类型

true,//是否持久化

false,//是否自动删除

map);//必须传入这个参数

//声明普通队列

channel.queueDeclare("my-queue",

true, false,

false, null);

//声明绑定关系

channel.queueBind("my-queue",

"my-delayed-exchange",

"delay-routing");

//先发送延迟60秒的消息,其中x-delay为固定key

AMQP.BasicProperties prop = new AMQP.BasicProperties()

.builder().headers(Map.of("x-delay", 1000 * 60))//设置延迟时间为60秒

.build();

channel.basicPublish("my-delayed-exchange",

"delay-routing",

prop,

"Delay for 1 minutes".getBytes(StandardCharsets.UTF_8));

log.info("发送消息:[Delay for 1 minutes] 完成");

//再发送一个延迟时间少的信息,如6秒

prop = new AMQP.BasicProperties()

.builder().headers(Map.of("x-delay",1000*6))

.build();

channel.basicPublish("my-delayed-exchange",

"delay-routing",

prop,

"Delay for 6 seconds".getBytes(StandardCharsets.UTF_8));

log.info("发送消息: [Delay for 6 seconds] 完成");

}

}

如果先启动生产者,并已经声明了交换机与队列的绑定关系,则消费者可以不再次声明绑定关系。所以,消费者的代码就变的比较简单了,消费者仅是消费消息,并输入消费的时间:

开发消息者

package wj.rabbitmq.delayexchange;

import com.rabbitmq.client.*;

import lombok.extern.slf4j.Slf4j;

import wj.mq.utils.ConnUtils;

/**

* 延迟消息消费者

*/

@Slf4j

public class DelayExchangeReceiver {

public static void main(String[] args) throws Exception {

Connection con = ConnUtils.newConnection();

Channel channel = con.createChannel();

//直接消费这个队列即可

channel.basicConsume("my-queue", true,

(consumerTag, message) -> {

log.info("消费消息:{}", new String(message.getBody()));

}, consumerTag -> {

//ignore

});

}

}

启动

先启动生产者,发送信息,输出一下时间:

12:35:03.772 发送消息:[Delay for 1 minutes] 完成

12:35:03.774 发送消息: [Delay for 6 seconds] 完成

快速启动消费者,等待,并查看收到消息:

12:35:09.782 消费消息:Delay for 6 seconds

12:36:03.776 消费消息:Delay for 1 minutes

通过上面的代码,可以看出,消费者在6秒后,先收到了先延迟到期的消息,一分钟以后,再收到延迟一分钟的消息。虽然,先发送的是延迟一分钟的消息,但此消息,并没有阻塞后面延迟时间短的消息。

查看延迟交换机

可以看到rate out,此时间将会是一个倒计时,到时间以后再给发送给队列。

在没有收以消息之前,消息会被阻塞到交换机中,所以,没有到延迟时间的消息,不会被发送的队列中。

测试发送延迟消息(SpringBoot项目)

使用springboot项目,声明延迟交换机,需要使用CustumExchange自定义交换机类型。

开发配置类定义延迟交换机。定义普通队列。定义绑定关系。

package wj.mq.config.delay;

import java.util.Map;

import org.springframework.amqp.core.Binding;

import org.springframework.amqp.core.BindingBuilder;

import org.springframework.amqp.core.CustomExchange;

import org.springframework.amqp.core.Queue;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

/**

* 使用rabbitmq_delayed_message_exchange发送延迟消息

*/

@Configuration

public class DelayMessageExchangeConfig {

@Bean

public CustomExchange delayExchange() {

CustomExchange exchange = //

new CustomExchange("my-delayed-exchange", // 交换机名称

"x-delayed-message", // 交换机的类型,必须是此值

true, // durable

false, // autoDelete

Map.of("x-delayed-type", "direct")// 传递延迟类型

);

return exchange;

}

@Bean

public Queue myQueue() {

Queue queue = new Queue("my-queue", //队列名称

true,//durable

false,//exclusive

false);//auto delete

return queue;

}

@Bean

public Binding myBinding(CustomExchange customExchange, Queue myQueue) {

Binding binding = BindingBuilder.bind(myQueue)//Queue

.to(customExchange)//exchange

.with("my-routing")//路由routing

.noargs();

return binding;

}

}

开发生产者代码注入RabbitTemplate发送,并设置过期时间。发送信息,使用ApplicationRunner启动后即发送,也可以开发一个Conntroller,动态调用。

package wj.mq.rabbitmq.delay;

import org.springframework.amqp.AmqpException;

import org.springframework.amqp.core.Message;

import org.springframework.amqp.core.MessagePostProcessor;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.boot.ApplicationArguments;

import org.springframework.boot.ApplicationRunner;

import org.springframework.stereotype.Component;

import lombok.extern.slf4j.Slf4j;

@Slf4j

@Component

public class DelaySender implements ApplicationRunner {

@Autowired

private RabbitTemplate rabbitTemplate;

public void send() {

// 先发送一个延迟1分钟的消息

rabbitTemplate.convertAndSend("my-delayed-exchange", // 交换机名称

"my-routing", // 路由key

"Message of delay 1 minutes", // 消息对象

new MessagePostProcessor() {// 接收一个消息发送前的处理函数

@Override

public Message postProcessMessage(Message message)//

throws AmqpException {

// 处理消息,设置消息的过期时间设置x-delay头,60秒钟

message.getMessageProperties().setDelay(1000 * 60);

return message;

}

});

log.info("发送延迟1分钟的消息-完成");

// 再发送一个延迟6秒钟的消息

rabbitTemplate.convertAndSend("my-delayed-exchange", // 交换机名称

"my-routing", // 路由key

"Message of delay 6 seconds", // 消息对象

new MessagePostProcessor() {// 接收一个消息发送前的处理函数

@Override

public Message postProcessMessage(Message message)//

throws AmqpException {

// 处理消息,设置消息的过期时间设置x-delay头,6秒钟

message.getMessageProperties().setDelay(1000 * 6);

return message;

}

});

log.info("发送延迟6秒的消息-完成");

}

@Override

public void run(ApplicationArguments args) throws Exception {

send();//启动完成后就发送,也可以通过一个controller调用测试

}

}

开发消息者代码

通过@RabblitListener接收消息

package wj.mq.rabbitmq.delay;

import org.springframework.amqp.core.Message;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.messaging.handler.annotation.Payload;

import org.springframework.stereotype.Component;

import com.rabbitmq.client.Channel;

import lombok.extern.slf4j.Slf4j;

/**

* 消费延迟消息

*/

@Slf4j

@Component

public class DelayReceiver {

@RabbitListener(queues = "my-queue", ackMode = "AUTO")

public void cousumer(@Payload() Message message, Channel channel) {

log.info("接收到消息:{}", new String(message.getBody()));

}

}

运行测试

通过以下输出可以看出,虽然先发送的延迟1分钟的消息,但由于后发出的消息因延迟时间更少(6秒)而被先接收到。说明这个测试已经成功运行。

2022-10-01 21:28:55.408 INFO : 发送延迟1分钟的消息-完成

2022-10-01 21:28:55.417 INFO : 发送延迟6秒的消息-完成

2022-10-01 21:29:01.433 INFO : 接收到消息:Message of delay 6 seconds

2022-10-01 21:29:55.408 INFO : 接收到消息:Message of delay 1 minutes

延迟插件的运行原理延迟交换机,会将收到信息,先缓存到延迟交换机内部。消息延迟到期后,才会发送给目标队列。延迟插件窗口镜像自定义

由于每一次启动延迟插件,需要将rabbitmq_delayed_message_exchange-<version>.ez copy到bitnami/rabbitmq容器的目录: /opt/bitnami/rabbitmq/plugins/下。且如果容器删除后,必须重新copy。

可以创建一个新的镜像,将插件copy到镜像中,在启动时通过RABBITMQ_PLUGINS指定启动的插件,就可以了。

创建Dockerfile

创建新的镜像

from bitnami/rabbitmq:3.10.8

MAINTAINER WJ

COPY rabbitmq_delayed_message_exchange-3.10.2.ez /opt/bitnami/rabbitmq/plugins/

构建新的镜像

docker build -t rabbitmq:1.0 .

启动新的容器

并指定启用的插件。

#!/bin/bash

docker stop mq

docker rm mq

docker run --name mq -d \

-p 5672:5672 \

-p 15672:15672 \

-e RABBITMQ_USERNAME=admin \

-e RABBITMQ_PASSWORD=admin \

-e TZ=Asia/Shanghai \

-e RABBITMQ_PLUGINS=rabbitmq_management,rabbitmq_mqtt,rabbitmq_stream,rabbitmq_delayed_message_exchange \

-v ${PWD}/data:/bitnami \

mq:1.0

进入容器查看

可见,指定的插件,已经启用。

通过界面查看:

标签: #js 设置延迟