前言:
现在看官们对“docker rabbitmq持久化”大约比较关心,你们都想要学习一些“docker rabbitmq持久化”的相关文章。那么小编同时在网摘上收集了一些对于“docker rabbitmq持久化””的相关资讯,希望小伙伴们能喜欢,各位老铁们快快来学习一下吧!什么是延迟队列
延迟队列是一个带有延迟功能的消息队列,本质上也是消息队列, 对于某些业务场景,我们需要让消费者在指定的时间延迟后消费该消息
订单提交后一定时间内没支付的话,提示用户该支付比如说在头条上写文章也有个定时发布功能,需要延迟指定的时间后去修改文章状态
上述这两个业务场景虽然说可以利用定时任务去完成,但这种方案在数据量大的情况下会给数据库增加无形中的负担,而且定时任务在计算时间上也有时差。
RabbitMQ本身是不直接支持延时队列的,RabbitMQ的延迟队列基于消息的存活时间TTL(Time To Live)和死信交换机DLE(Dead Letter Exchanges)实现:
TTL:RabbitMQ可以对队列和消息各自设置存活时间,规则是两者中较小的值,即队列无消费者连接的消息过期时间,或者消息在队列中一直未被消费的过期时间DLE:过期的消息通过绑定的死信交换机,路由到指定的死信队列,消费者实际上消费的是死信队列上的消息
Docker安装RabbitMQ
#下载镜像docker pull rabbitmq:3-management#运行rabbitmqdocker run -it --rm -d --name rabbitmq -p 15672:15672 -p 5672:5672 rabbitmq:3-management#进入容器并开启管理功能docker exec -it rabbitmq /bin/bash#进入容器后开启管理功能rabbitmq-plugins enable rabbitmq_management
运行起来后可以访问
安装延迟队列插件rabbitmq_delayed_messsage_exchange
首先我们需要下载 rabbitmq_delayed_message_exchange 插件,这是一个 GitHub 上面的开源项目,我们直接下载即可:
下载成功后复制到docker容器中去
docker cp ./rabbitmq_delayed_message_exchange-3.8.0.ez rabbitmq:/plugins
接下来再次进入容器中
#查看插件rabbitmq-plugins list#安装插件rabbitmq-plugins enable rabbitmq_delayed_message_exchange
实现延迟队列
开发环境:SpringBoot+RabbitMQ
编写配置类
package com.taobao.mqdelaydemo.config;import com.taobao.mqdelaydemo.DelayedTopic;import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.CustomExchange;import org.springframework.amqp.core.Queue;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import java.util.HashMap;import java.util.Map;@Configurationpublic class MqConfig { @Bean public Queue queue(){ return new Queue(DelayedTopic.DELAY_QUEUE); } /** * 延迟交换机 */ @Bean public CustomExchange customExchange(){ Map<String,Object> map = new HashMap<>(); map.put("x-delayed-type","direct"); //设置持久化参数 return new CustomExchange(DelayedTopic.DELAY_EXCHANGE,"x-delayed-message",true, false, map); } /** * 绑定延迟队列和交换机 */ @Bean public Binding binding(@Qualifier("queue") Queue queue, @Qualifier("customExchange") CustomExchange customExchange){ return BindingBuilder.bind(queue).to(customExchange).with(DelayedTopic.DELAY_ROUTING_KEY).noargs(); }}
编写自定义类
package com.taobao.mqdelaydemo;public interface DelayedTopic { String DELAY_EXCHANGE = "delay_exchange"; String DELAY_QUEUE = "delay_queue"; String DELAY_ROUTING_KEY = "delay_routing_key";}
编写消费发送者
package com.taobao.mqdelaydemo.component;import com.taobao.mqdelaydemo.DelayedTopic;import org.springframework.amqp.core.AmqpTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import java.time.LocalDateTime;@Componentpublic class MqSender { @Autowired AmqpTemplate amqpTemplate; public void sender() { String msg = "test delayed message"; System.out.println("Send Time:" + LocalDateTime.now() + ", Send:" + msg); //延迟6s执行 this.amqpTemplate.convertAndSend(DelayedTopic.DELAY_EXCHANGE, DelayedTopic.DELAY_ROUTING_KEY, msg, x -> { x.getMessageProperties().setDelay(6000); return x; }); }}
编写消息的消费者
package com.taobao.mqdelaydemo.component;import com.rabbitmq.client.Channel;import com.taobao.mqdelaydemo.DelayedTopic;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import java.io.IOException;import java.time.LocalDateTime;@Componentpublic class MqReceiver { @RabbitListener(queues = DelayedTopic.DELAY_QUEUE) public void receive(Message message, Channel channel) throws IOException { String str = new String(message.getBody()); System.out.println("Receive Time:" + LocalDateTime.now() + ", Receive:" + str); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); }}
测试消息
package com.taobao.mqdelaydemo.controller;import com.taobao.mqdelaydemo.component.MqSender;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;@RestController@RequestMappingpublic class TestController { @Autowired MqSender mqSender; @GetMapping("/sender") public String sender(){ this.mqSender.sender(); return "ok"; }}
延迟6秒后发送消息
扩展
除了延迟队列外,如果不考虑到消息的持久化,并且系统如果是单机版的话可以使用jdk的Timer和TimerTask两个类来实现,也可以达到一些场景的目的
public class MyTimerTask extends TimerTask { @Override public void run() { System.out.println("MyTimerTask:"+ LocalDateTime.now()); }}public static void main(String[] args) { Timer timer = new Timer(); System.out.println("Main Thread Start at:" + LocalDateTime.now()); MyTimerTask myTimerTask = new MyTimerTask(); timer.schedule(myTimerTask, 5000); System.out.println("Main Thread Stop at:" + LocalDateTime.now());}
参考
标签: #docker rabbitmq持久化