龙空技术网

Spring Boot + Redis 实现延时队列,写得太好了

爱马士团团长 1492

前言:

眼前各位老铁们对“redis队列消费慢”大概比较关怀,咱们都需要剖析一些“redis队列消费慢”的相关资讯。那么小编同时在网络上网罗了一些对于“redis队列消费慢””的相关内容,希望姐妹们能喜欢,大家一起来学习一下吧!

来源:blog.csdn.net/qq330983778/article/details/99341671

业务流程

首先我们分析下这个流程

用户提交任务。首先将任务推送至延迟队列中。延迟队列接收到任务后,首先将任务推送至job pool中,然后计算其执行时间。然后生成延迟任务(仅仅包含任务id)放入某个桶中时间组件时刻轮询各个桶,当时间到达的时候从job pool中获得任务元信息。监测任务的合法性如果已经删除则pass。继续轮询。如果任务合法则再次计算时间如果合法则计算时间,如果时间合法:根据topic将任务放入对应的ready queue,然后从bucket中移除。如果时间不合法,则重新计算时间再次放入bucket,并移除之前的bucket中的内容消费端轮询对应topic的ready queue。获取job后做自己的业务逻辑。与此同时,服务端将已经被消费端获取的job按照其设定的TTR,重新计算执行时间,并将其放入bucket。完成消费后,发送finish消息,服务端根据job id删除对应信息。对象

我们现在可以了解到中间存在的几个组件

延迟队列,为Redis延迟队列。实现消息传递Job pool 任务池保存job元信息。根据文章描述使用K/V的数据结构,key为ID,value为jobDelay Bucket 用来保存业务的延迟任务。文章中描述使用轮询方式放入某一个Bucket可以知道其并没有使用topic来区分,个人这里默认使用顺序插入Timer 时间组件,负责扫描各个Bucket。根据文章描述存在多个Timer,但是同一个Timer同一时间只能扫描一个BucketReady Queue 负责存放需要被完成的任务,但是根据描述根据Topic的不同存在多个Ready Queue。

其中Timer负责轮询,Job pool、Delay Bucket、Ready Queue都是不同职责的集合。

任务状态ready:可执行状态,delay:不可执行状态,等待时钟周期。reserved:已被消费者读取,但没有完成消费。deleted:已被消费完成或者已被删除。对外提供的接口额外的内容首先根据状态状态描述,finish和delete操作都是将任务设置成deleted状态。根据文章描述的操作,在执行finish或者delete的操作的时候任务已经从元数据中移除,此时deleted状态可能只存在极短时间,所以实际实现中就直接删除了。文章中并没有说明响应超时后如何处理,所以个人现在将其重新投入了待处理队列。文章中因为使用了集群,所以使用redis的setnx锁来保证多个时间循环处理多个桶的时候不会出现重复循环。这里因为是简单的实现,所以就很简单的每个桶设置一个时间队列处理。也是为了方便简单处理。关于分布式锁可以看我之前的文章里面有描述。实现

现在我们根据设计内容完成设计。这一块设计我们分四步完成

任务及相关对象

目前需要两个对象,一个是任务对象(job)一个负责保存任务引用的对象(delay job)

任务对象

@Data@AllArgsConstructor@NoArgsConstructorpublic class Job implements Serializable {    /**     * 延迟任务的唯一标识,用于检索任务     */    @JsonSerialize(using = ToStringSerializer.class)    private Long id;    /**     * 任务类型(具体业务类型)     */    private String topic;    /**     * 任务的延迟时间     */    private long delayTime;    /**     * 任务的执行超时时间     */    private long ttrTime;    /**     * 任务具体的消息内容,用于处理具体业务逻辑用     */    private String message;    /**     * 重试次数     */    private int retryCount;    /**     * 任务状态     */    private JobStatus status;}

任务引用对象

@Data@AllArgsConstructorpublic class DelayJob implements Serializable {    /**     * 延迟任务的唯一标识     */    private long jodId;        /**     * 任务的执行时间     */    private long delayDate;    /**     * 任务类型(具体业务类型)     */    private String topic;    public DelayJob(Job job) {        this.jodId = job.getId();        this.delayDate = System.currentTimeMillis() + job.getDelayTime();        this.topic = job.getTopic();    }    public DelayJob(Object value, Double score) {        this.jodId = Long.parseLong(String.valueOf(value));        this.delayDate = System.currentTimeMillis() + score.longValue();    }}
容器

目前我们需要完成三个容器的创建,Job任务池、延迟任务容器、待完成任务容器。Spring Boot 学习笔记分享看下。

job任务池,为普通的K/V结构,提供基础的操作。Spring Boot 基础就不介绍了,推荐下这个实战教程:

@Component@Slf4jpublic class JobPool {        @Autowired    private RedisTemplate redisTemplate;    private String NAME = "job.pool";        private BoundHashOperations getPool () {        BoundHashOperations ops = redisTemplate.boundHashOps(NAME);        return ops;    }    /**     * 添加任务     * @param job     */    public void addJob (Job job) {        log.info("任务池添加任务:{}", JSON.toJSONString(job));        getPool().put(job.getId(),job);        return ;    }    /**     * 获得任务     * @param jobId     * @return     */    public Job getJob(Long jobId) {        Object o = getPool().get(jobId);        if (o instanceof Job) {            return (Job) o;        }        return null;    }    /**     * 移除任务     * @param jobId     */    public void removeDelayJob (Long jobId) {        log.info("任务池移除任务:{}",jobId);        // 移除任务        getPool().delete(jobId);    }}

延迟任务,使用可排序的ZSet保存数据,提供取出最小值等操作

@Slf4j@Componentpublic class DelayBucket {    @Autowired    private RedisTemplate redisTemplate;    private static AtomicInteger index = new AtomicInteger(0);    @Value("${thread.size}")    private int bucketsSize;    private List <String> bucketNames = new ArrayList <>();    @Bean    public List <String> createBuckets() {        for (int i = 0; i < bucketsSize; i++) {            bucketNames.add("bucket" + i);        }        return bucketNames;    }    /**     * 获得桶的名称     * @return     */    private String getThisBucketName() {        int thisIndex = index.addAndGet(1);        int i1 = thisIndex % bucketsSize;        return bucketNames.get(i1);    }    /**     * 获得桶集合     * @param bucketName     * @return     */    private BoundZSetOperations getBucket(String bucketName) {        return redisTemplate.boundZSetOps(bucketName);    }    /**     * 放入延时任务     * @param job     */    public void addDelayJob(DelayJob job) {        log.info("添加延迟任务:{}", JSON.toJSONString(job));        String thisBucketName = getThisBucketName();        BoundZSetOperations bucket = getBucket(thisBucketName);        bucket.add(job,job.getDelayDate());    }    /**     * 获得最新的延期任务     * @return     */    public DelayJob getFirstDelayTime(Integer index) {        String name = bucketNames.get(index);        BoundZSetOperations bucket = getBucket(name);        Set<ZSetOperations.TypedTuple> set = bucket.rangeWithScores(0, 1);        if (CollectionUtils.isEmpty(set)) {            return null;        }        ZSetOperations.TypedTuple typedTuple = (ZSetOperations.TypedTuple) set.toArray()[0];        Object value = typedTuple.getValue();        if (value instanceof DelayJob) {            return (DelayJob) value;        }        return null;    }    /**     * 移除延时任务     * @param index     * @param delayJob     */    public void removeDelayTime(Integer index,DelayJob delayJob) {        String name = bucketNames.get(index);        BoundZSetOperations bucket = getBucket(name);        bucket.remove(delayJob);    }}

待完成任务,内部使用topic进行细分,每个topic对应一个list集合

@Component@Slf4jpublic class ReadyQueue {    @Autowired    private RedisTemplate redisTemplate;    private String NAME = "process.queue";    private String getKey(String topic) {        return NAME + topic;    }    /**     * 获得队列     * @param topic     * @return     */    private BoundListOperations getQueue (String topic) {        BoundListOperations ops = redisTemplate.boundListOps(getKey(topic));        return ops;    }    /**     * 设置任务     * @param delayJob     */    public void pushJob(DelayJob delayJob) {        log.info("执行队列添加任务:{}",delayJob);        BoundListOperations listOperations = getQueue(delayJob.getTopic());        listOperations.leftPush(delayJob);    }    /**     * 移除并获得任务     * @param topic     * @return     */    public DelayJob popJob(String topic) {        BoundListOperations listOperations = getQueue(topic);        Object o = listOperations.leftPop();        if (o instanceof DelayJob) {            log.info("执行队列取出任务:{}", JSON.toJSONString((DelayJob) o));            return (DelayJob) o;        }        return null;    }    }
轮询处理

设置了线程池为每个bucket设置一个轮询操作

@Componentpublic class DelayTimer implements ApplicationListener <ContextRefreshedEvent> {    @Autowired    private DelayBucket delayBucket;    @Autowired    private JobPool     jobPool;    @Autowired    private ReadyQueue  readyQueue;        @Value("${thread.size}")    private int length;        @Override     public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {        ExecutorService executorService = new ThreadPoolExecutor(                length,                 length,                0L, TimeUnit.MILLISECONDS,                new LinkedBlockingQueue <Runnable>());        for (int i = 0; i < length; i++) {            executorService.execute(                    new DelayJobHandler(                            delayBucket,                            jobPool,                            readyQueue,                            i));        }            }}
测试请求
/** * 测试用请求 * @author daify **/@RestController@RequestMapping("delay")public class DelayController {        @Autowired    private JobService jobService;    /**     * 添加     * @param request     * @return     */    @RequestMapping(value = "add",method = RequestMethod.POST)    public String addDefJob(Job request) {        DelayJob delayJob = jobService.addDefJob(request);        return JSON.toJSONString(delayJob);    }    /**     * 获取     * @return     */    @RequestMapping(value = "pop",method = RequestMethod.GET)    public String getProcessJob(String topic) {        Job process = jobService.getProcessJob(topic);        return JSON.toJSONString(process);    }    /**     * 完成一个执行的任务     * @param jobId     * @return     */    @RequestMapping(value = "finish",method = RequestMethod.DELETE)    public String finishJob(Long jobId) {        jobService.finishJob(jobId);        return "success";    }    @RequestMapping(value = "delete",method = RequestMethod.DELETE)    public String deleteJob(Long jobId) {        jobService.deleteJob(jobId);        return "success";    }    }
测试添加延迟任务

通过postman请求:localhost:8000/delay/add

此时这条延时任务被添加进了线程池中

2019-08-12 21:21:36.589  INFO 21444 --- [nio-8000-exec-6] d.samples.redis.delay.container.JobPool  : 任务池添加任务:{"delayTime":10000,"id":3,"message":"tag:testid:3","retryCount":0,"status":"DELAY","topic":"test","ttrTime":10000}2019-08-12 21:21:36.609  INFO 21444 --- [nio-8000-exec-6] d.s.redis.delay.container.DelayBucket    : 添加延迟任务:{"delayDate":1565616106609,"jodId":3,"topic":"test"}

根据设置10秒钟之后任务会被添加至ReadyQueue中

2019-08-12 21:21:46.744  INFO 21444 --- [pool-1-thread-4] d.s.redis.delay.container.ReadyQueue     : 执行队列添加任务:DelayJob(jodId=3, delayDate=1565616106609, topic=test)
获得任务

这时候我们请求localhost:8000/delay/pop

这个时候任务被响应,修改状态的同时设置其超时时间,然后放置在DelayBucket中

2019-08-09 19:36:02.342  INFO 58456 --- [nio-8000-exec-3] d.s.redis.delay.container.ReadyQueue     : 执行队列取出任务:{"delayDate":1565321728704,"jodId":1,"topic":"测试"}2019-08-09 19:36:02.364  INFO 58456 --- [nio-8000-exec-3] d.samples.redis.delay.container.JobPool  : 任务池添加任务:{"delayTime":10000,"id":1,"message":"延迟10秒,超时30秒","retryCount":0,"status":"RESERVED","topic":"测试","ttrTime":30000}2019-08-09 19:36:02.384  INFO 58456 --- [nio-8000-exec-3] d.s.redis.delay.container.DelayBucket    : 添加延迟任务:{"delayDate":1565321792364,"jodId":1,"topic":"测试"}

按照设计在30秒后,任务假如没有被消费将会重新放置在ReadyQueue中

2019-08-12 21:21:48.239  INFO 21444 --- [nio-8000-exec-7] d.s.redis.delay.container.ReadyQueue     : 执行队列取出任务:{"delayDate":1565616106609,"jodId":3,"topic":"test"}2019-08-12 21:21:48.261  INFO 21444 --- [nio-8000-exec-7] d.samples.redis.delay.container.JobPool  : 任务池添加任务:{"delayTime":10000,"id":3,"message":"tag:testid:3","retryCount":0,"status":"RESERVED","topic":"test","ttrTime":10000}
任务的删除/消费

现在我们请求:localhost:8000/delay/delete

此时在Job pool中此任务将会被移除,此时元数据已经不存在,但任务还在DelayBucket中循环,然而在循环中当检测到元数据已经不存的话此延时任务会被移除。

2019-08-12 21:21:54.880  INFO 21444 --- [nio-8000-exec-8] d.samples.redis.delay.container.JobPool  : 任务池移除任务:32019-08-12 21:21:59.104  INFO 21444 --- [pool-1-thread-5] d.s.redis.delay.handler.DelayJobHandler  : 移除不存在任务:{"delayDate":1565616118261,"jodId":3,"topic":"test"}

(完)

标签: #redis队列消费慢