龙空技术网

RockerMQ4.x 延时消息原理、流程梳理~

java小悠 143

前言:

当前大家对“c语言延时函数5s”大概比较重视,姐妹们都需要学习一些“c语言延时函数5s”的相关知识。那么小编也在网络上搜集了一些关于“c语言延时函数5s””的相关知识,希望看官们能喜欢,小伙伴们一起来了解一下吧!

前言

相信用过RocketMq的小伙伴都知道,RocketMq支持延时消息,通过设置指定的延时级别就可以让消息实现不同时效的延时功能,今天带大家了解的就是延时消息的原理~

如何实现延时?

在带大家正式了解RocketMq延时原理之前,先问大家一个问题,如果我们自己来实现延时功能,我们会如何实现?

Sleep

经典永不过时,相信Sleep是我们最早接触的具有延时功能的函数,下面代码就可以简单实现延时5s后执行业务逻辑

java复制代码public static void main(String[] args) throws InterruptedException {   TimeUnit.SECONDS.sleep(5);   System.out.println("执行业务代码");}
Timer

Timer 类是在 JDK 1.3 版本中引入的。它位于 java.util 包中,用于支持简单的定时任务调度。

不过Timer也有着许多缺陷,谨慎使用~

java复制代码public static void main(String[] args) throws InterruptedException {  // schedule实现延时  Timer timer = new Timer();  timer.schedule(new TimerTask() {    @Override    public void run() {      System.out.println("执行业务逻辑");    }  }, 5000);}
ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor 类是在 JDK 1.5 版本中引入的。它是 ThreadPoolExecutor 类的子类,专门用于支持定时任务的调度和执行。

java复制代码public static void main(String[] args) throws InterruptedException {  ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);  // schedule实现延时  executor.schedule(() -> {    System.out.println("执行业务代码");  }, 5, TimeUnit.SECONDS);}
时间轮

有关时间轮我就不细说了,在各大开源框架,诸如: Netty、Dubbo、Kafka都少不了它的影子

RocketMq如何实现的延时消息?

在上面带大家了解了常见的延时方案之后,我们再来探索RcketMq的延时原理~

RocketMq官方延时案例

下面是RocketMq官方提供的案例,我们可以看到代码中通过setDelayTimeLevel设定了延时级别,对应的延时时间就是10s

java复制代码package org.apache.rocketmq.example.schedule;public class ScheduledMessageProducer {  public static final String PRODUCER_GROUP = "ExampleProducerGroup";  public static final String DEFAULT_NAMESRVADDR = "127.0.0.1:9876";  public static final String TOPIC = "TestTopic";  public static void main(String[] args) throws Exception {    DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);    producer.start();    int totalMessagesToSend = 100;    for (int i = 0; i < totalMessagesToSend; i++) {      Message message = new Message(TOPIC, ("Hello scheduled message " + i).getBytes());      // 设定延时级别      message.setDelayTimeLevel(3);      // 发送消息      SendResult result = producer.send(message);      System.out.print(result);    }    // Shutdown producer after use.    producer.shutdown();  }}
存储延时消息

Producer回将消息发送到Broker,接下来看看Broker是怎么处理延时消息的.

org.apache.rocketmq.store.CommitLog#asyncPutMessage中会对延时消息进行特殊处理

如果不是事务消息且延时级别 > 0,说明该消息是延时消息,需要进行特殊处理检查延时级别是否超过最大值18,如果超过则重置为18保存消息的原始topic和queueId,并将该消息的topic覆盖为延时消息专用的topic,且将queueId设置为延时级别 - 1

java复制代码public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {   // ......  String topic = msg.getTopic();    final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());  // 事务消息不支持延时  if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE      || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {        // 如果延时级别 > 0说明是延时消息    if (msg.getDelayTimeLevel() > 0) {      if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {        msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());      }      // 延时消息使用同一个topic: SCHEDULE_TOPIC_XXXX      // 且queueId: 延时级别 - 1      topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;      int queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());      // 存储原始消息的topic、queueId      MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());      MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));      msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));      // 更新延时消息的topic、queueId      msg.setTopic(topic);      msg.setQueueId(queueId);    }  }    // ......}

经过上面的一番桶特殊处理及之后一些逻辑后,消息会被存储到commitLog中去

取出延时消息

Broker将延迟消息写到了commitLog中后,由于Broker替换了消息原始topic,所以订阅该topic的消费者此时还无法消费该消息。

org.apache.rocketmq.store.schedule.ScheduleMessageService,Broker启动时会启动ScheduleMessageService

而在ScheduleMessageService中会为每个延时级别都开启一个延时任务,延时能力正是利用我前面提到的ScheduledThreadPoolExecutor

同时也会开启一个定时任务,固定时间持久化每个队列的消费偏移量

java复制代码public class ScheduleMessageService extends ConfigManager {    private static final long FIRST_DELAY_TIME = 1000L;  // Broker启动时会初始化这个Map,key是延迟等级,共计18个  // key: 延时级别,value: 延时时间(毫秒)  private final ConcurrentMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable =        new ConcurrentHashMap<Integer, Long>(32);    // key: 延时级别,value: 消费偏移量  private final ConcurrentMap<Integer /* level */, Long/* offset */> offsetTable =        new ConcurrentHashMap<Integer, Long>(32);  public void start() {    if (started.compareAndSet(false, true)) {      this.load();      this.deliverExecutorService = new ScheduledThreadPoolExecutor(this.maxDelayLevel, new ThreadFactoryImpl("ScheduleMessageTimerThread_"));      if (this.enableAsyncDeliver) {        this.handleExecutorService = new ScheduledThreadPoolExecutor(this.maxDelayLevel, new ThreadFactoryImpl("ScheduleMessageExecutorHandleThread_"));      }            // 遍历延时级别map      for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {        Integer level = entry.getKey();        Long timeDelay = entry.getValue();        Long offset = this.offsetTable.get(level);        if (null == offset) {          offset = 0L;        }        // 为每个延时级别开一个延时任务,延时1s执行,使用的是ScheduledThreadPoolExecutor        if (timeDelay != null) {          // enableAsyncDeliver 默认false          if (this.enableAsyncDeliver) {            this.handleExecutorService.schedule(new HandlePutResultTask(level), FIRST_DELAY_TIME, TimeUnit.MILLISECONDS);          }          this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME, TimeUnit.MILLISECONDS);        }      }      // 消费偏移量持久化定时任务,每10s执行一次      this.deliverExecutorService.scheduleAtFixedRate(new Runnable() {        @Override        public void run() {          try {            if (started.get()) {              ScheduleMessageService.this.persist();            }          } catch (Throwable e) {            log.error("scheduleAtFixedRate flush exception", e);          }        }      }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval(), TimeUnit.MILLISECONDS);    }  }}

DeliverDelayedMessageTimerTask封装了delayLevel、offset,实现了Runnable重写了run方法,在run方法中如果处于运行中,则调用executeOnTimeup方法执行具体逻辑

java复制代码class DeliverDelayedMessageTimerTask implements Runnable {  private final int delayLevel;  private final long offset;  public DeliverDelayedMessageTimerTask(int delayLevel, long offset) {    this.delayLevel = delayLevel;    this.offset = offset;  }  @Override  public void run() {    try {      if (isStarted()) {				// 核心处理逻辑        this.executeOnTimeup();      }    } catch (Exception e) {      // XXX: warn and notify me      log.error("ScheduleMessageService, executeOnTimeup exception", e);      this.scheduleNextTimerTask(this.offset, DELAY_FOR_A_PERIOD);    }  }}
java复制代码public void executeOnTimeup() {    // 根据topic和延时级别找到对应的ConsumeQueue  ConsumeQueue cq =    ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,                                                                     delayLevel2QueueId(delayLevel));  // ......  long nextOffset = this.offset;  try {    int i = 0;    ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();    for (; i < bufferCQ.getSize() && isStarted(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {            // ......      // 当前时间      long now = System.currentTimeMillis();      // 计算投递时间,时间存储在了tag hashcode 中      long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);      nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);      // 剩余时间      long countdown = deliverTimestamp - now;      if (countdown > 0) {        // 还未到投递时间,重新加入到延时任务,此时延时100ms        this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);        return;      }      // 从commitLog读取message      MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);      if (msgExt == null) {        continue;      }      // 恢复message原始的topic和queueId,之前保存过      MessageExtBrokerInner msgInner = ScheduleMessageService.this.messageTimeup(msgExt);      if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {        log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}",                  msgInner.getTopic(), msgInner);        continue;      }      // 将消息再次写入commitlog中,topic是原始topic,这样消费者就可以去消费了      boolean deliverSuc;      // enableAsyncDeliver默认为false      if (ScheduleMessageService.this.enableAsyncDeliver) {        deliverSuc = this.asyncDeliver(msgInner, msgExt.getMsgId(), nextOffset, offsetPy, sizePy);      } else {        deliverSuc = this.syncDeliver(msgInner, msgExt.getMsgId(), nextOffset, offsetPy, sizePy);      }      // 写入commitLog失败,延时重试      if (!deliverSuc) {        this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);        return;      }    }    // 更新消费进度    nextOffset = this.offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);  } catch (Exception e) {    log.error("ScheduleMessageService, messageTimeup execute error, offset = {}", nextOffset, e);  } finally {    bufferCQ.release();  }  // 更新offset,重新添加延时任务处理延时消息  this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);}
根据topic和延时级别找到对应的ConsumeQueue计算校验消息是否到达投递时间如果还没到达则重新加入延时任务到达投递时间,则从commitLog中取出消息,将消息的topic、queueId设置为原始值,并重新写入到commitLog中更新消费偏移量nextOffset,并重新添加延时任务处理延时消息总结

RocketMq延时消息大致流程并不复杂,下面简单总结下

当我们需要发送延时消息的时候,需要对消息设置延时级别,标明该消息是延时消息Broker收到延时消息后,会重新设置延时消息的topic、queueId,并备份消息原始的topic、queueId,之后写入到commitLogBroker会启动ScheduleMessageService,ScheduleMessageService会为每个延时级别启动一个延时任务(利用ScheduledThreadPoolExecutor)根据topic、延时级别找到对应的consumerQueue,然后检查消息是否到达投递时间如果还没达到投递时间,则重新添加到延时任务中如果已经达到投递时间,则将消息topic、queueId归还为原始值,再重新写入commitLog,这样消费者就能感知到进行消费了。最后更新消费偏移量,并重新加入到延时任务进行下一次处理

原文链接:

标签: #c语言延时函数5s