龙空技术网

RabbitMQ延迟队列实现

西二旗的哥 235

前言:

现在看官们对“docker rabbitmq持久化”大约比较关心,你们都想要学习一些“docker rabbitmq持久化”的相关文章。那么小编同时在网摘上收集了一些对于“docker rabbitmq持久化””的相关资讯,希望小伙伴们能喜欢,各位老铁们快快来学习一下吧!

什么是延迟队列

延迟队列是一个带有延迟功能的消息队列,本质上也是消息队列, 对于某些业务场景,我们需要让消费者在指定的时间延迟后消费该消息

订单提交后一定时间内没支付的话,提示用户该支付比如说在头条上写文章也有个定时发布功能,需要延迟指定的时间后去修改文章状态

上述这两个业务场景虽然说可以利用定时任务去完成,但这种方案在数据量大的情况下会给数据库增加无形中的负担,而且定时任务在计算时间上也有时差。

RabbitMQ本身是不直接支持延时队列的,RabbitMQ的延迟队列基于消息的存活时间TTL(Time To Live)和死信交换机DLE(Dead Letter Exchanges)实现:

TTL:RabbitMQ可以对队列和消息各自设置存活时间,规则是两者中较小的值,即队列无消费者连接的消息过期时间,或者消息在队列中一直未被消费的过期时间DLE:过期的消息通过绑定的死信交换机,路由到指定的死信队列,消费者实际上消费的是死信队列上的消息

Docker安装RabbitMQ

#下载镜像docker pull rabbitmq:3-management#运行rabbitmqdocker run -it --rm -d --name rabbitmq -p 15672:15672 -p 5672:5672 rabbitmq:3-management#进入容器并开启管理功能docker exec -it rabbitmq /bin/bash#进入容器后开启管理功能rabbitmq-plugins enable rabbitmq_management

运行起来后可以访问

安装延迟队列插件rabbitmq_delayed_messsage_exchange

首先我们需要下载 rabbitmq_delayed_message_exchange 插件,这是一个 GitHub 上面的开源项目,我们直接下载即可:

下载成功后复制到docker容器中去

docker cp ./rabbitmq_delayed_message_exchange-3.8.0.ez rabbitmq:/plugins

接下来再次进入容器中

#查看插件rabbitmq-plugins list#安装插件rabbitmq-plugins enable rabbitmq_delayed_message_exchange

查看插件列表

安装成功后在add a new exchange中可以查看

实现延迟队列

开发环境:SpringBoot+RabbitMQ

编写配置类

package com.taobao.mqdelaydemo.config;import com.taobao.mqdelaydemo.DelayedTopic;import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.CustomExchange;import org.springframework.amqp.core.Queue;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import java.util.HashMap;import java.util.Map;@Configurationpublic class MqConfig {    @Bean    public Queue queue(){        return new Queue(DelayedTopic.DELAY_QUEUE);    }    /**     * 延迟交换机     */    @Bean    public CustomExchange customExchange(){        Map<String,Object> map = new HashMap<>();        map.put("x-delayed-type","direct");        //设置持久化参数        return new CustomExchange(DelayedTopic.DELAY_EXCHANGE,"x-delayed-message",true, false, map);    }    /**     * 绑定延迟队列和交换机     */    @Bean    public Binding binding(@Qualifier("queue") Queue queue,                           @Qualifier("customExchange") CustomExchange customExchange){        return BindingBuilder.bind(queue).to(customExchange).with(DelayedTopic.DELAY_ROUTING_KEY).noargs();    }}

编写自定义类

package com.taobao.mqdelaydemo;public interface DelayedTopic {    String DELAY_EXCHANGE = "delay_exchange";    String DELAY_QUEUE = "delay_queue";    String DELAY_ROUTING_KEY = "delay_routing_key";}

编写消费发送者

package com.taobao.mqdelaydemo.component;import com.taobao.mqdelaydemo.DelayedTopic;import org.springframework.amqp.core.AmqpTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import java.time.LocalDateTime;@Componentpublic class MqSender {    @Autowired    AmqpTemplate amqpTemplate;    public void sender() {        String msg = "test delayed message";        System.out.println("Send Time:" + LocalDateTime.now() + ", Send:" + msg);        //延迟6s执行        this.amqpTemplate.convertAndSend(DelayedTopic.DELAY_EXCHANGE, DelayedTopic.DELAY_ROUTING_KEY, msg, x -> {            x.getMessageProperties().setDelay(6000);            return x;        });    }}

编写消息的消费者

package com.taobao.mqdelaydemo.component;import com.rabbitmq.client.Channel;import com.taobao.mqdelaydemo.DelayedTopic;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import java.io.IOException;import java.time.LocalDateTime;@Componentpublic class MqReceiver {    @RabbitListener(queues = DelayedTopic.DELAY_QUEUE)    public void receive(Message message, Channel channel) throws IOException {        String str = new String(message.getBody());        System.out.println("Receive Time:" + LocalDateTime.now() + ", Receive:" + str);        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);    }}

测试消息

package com.taobao.mqdelaydemo.controller;import com.taobao.mqdelaydemo.component.MqSender;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;@RestController@RequestMappingpublic class TestController {    @Autowired    MqSender mqSender;    @GetMapping("/sender")    public String sender(){        this.mqSender.sender();        return "ok";    }}

延迟6秒后发送消息

扩展

除了延迟队列外,如果不考虑到消息的持久化,并且系统如果是单机版的话可以使用jdk的Timer和TimerTask两个类来实现,也可以达到一些场景的目的

public class MyTimerTask extends TimerTask {    @Override    public void run() {        System.out.println("MyTimerTask:"+ LocalDateTime.now());    }}public static void main(String[] args) {  Timer timer = new Timer();  System.out.println("Main Thread Start at:" + LocalDateTime.now());  MyTimerTask myTimerTask = new MyTimerTask();  timer.schedule(myTimerTask, 5000);  System.out.println("Main Thread Stop at:" + LocalDateTime.now());}

延迟5秒执行

参考

标签: #docker rabbitmq持久化