龙空技术网

一篇详解Redis -- 延时队列

编码码农 11589

前言:

眼前兄弟们对“redis队列算法”都比较注意,小伙伴们都想要知道一些“redis队列算法”的相关知识。那么小编同时在网络上网罗了一些对于“redis队列算法””的相关文章,希望咱们能喜欢,朋友们一起来了解一下吧!

Redis的 list 数据结构常用来作为 异步消息队列 使用,使用 rpush/lpush 操作 入队 ,使用 lpop/rpop 来操作 出队

> rpush my-queue apple banana pear(integer) 3> llen my-queue(integer) 3> lpop my-queue"apple"> llen my-queue(integer) 2> lpop my-queue"banana"> llen my-queue(integer) 1> lpop my-queue"pear"> llen my-queue(integer) 0> lpop my-queue(nil)

空队列

如果队列为空,客户端会陷入 pop的死循环 , 空轮询 不仅拉高了 客户端的CPU , Redis的QPS 也会被拉高如果空轮询的客户端有几十个, Redis的慢查询 也会显著增加,可以尝试让客户端线程 sleep 1s但睡眠会导致消息的 延迟增大 ,可以使用 blpop/brpop (blocking, 阻塞读 )阻塞读在队列没有数据时,会立即进入 休眠 状态,一旦有数据到来,会立即被 唤醒 , 消息延迟几乎为0

空闲连接

如果线程一直阻塞在那里,Redis的客户端连接就成了 闲置连接闲置过久, 服务器 一般会 主动断开 连接, 减少闲置的资源占用 ,此时 blpop/brpop 会 抛出异常

锁冲突处理

分布式锁 加锁失败 的处理策略直接抛出异常 ,通知用户稍后重试sleep 后再重试将请求转移到 延时队列 ,过一会重试抛出异常这种方式比较适合由 用户直接发起 的请求sleepsleep会 阻塞 当前的消息处理线程,从而导致队列的后续消息处理出现 延迟如果 碰撞比较频繁 ,sleep方案不合适延时队列比较适合异步消息处理的场景,通过将当前冲突的请求转移到另一个队列 延后处理 来 避免冲突

延时队列

可以通过Redis的 zset 来实现延时队列将消息序列化成一个字符串作为zet的 value ,将该消息的 到期处理时间 作为 score然后 多线程轮询 zset获取 到期的任务 进行处理多线程是为了保障 可用性 ,但同时要考虑 并发安全 ,确保 任务不能被多次执行

public class RedisDelayingQueue<T> { @Data @AllArgsConstructor @NoArgsConstructor private static class TaskItem<T> { private String id; private T msg; } private Type taskType = new TypeReference<TaskItem<T>>() { }.getType(); private Jedis jedis; private String queueKey; public RedisDelayingQueue(Jedis jedis, String queueKey) { this.jedis = jedis; this.queueKey = queueKey; } public void delay(T msg) { TaskItem<T> task = new TaskItem<>(UUID.randomUUID().toString(), msg); jedis.zadd(queueKey, System.currentTimeMillis() + 5000, JSON.toJSONString(task)); } public void loop() { // 可以进一步优化,通过Lua脚本将zrangeByScore和zrem统一挪到Redis服务端进行原子化操作,减少抢夺失败出现的资源浪费 while (!Thread.interrupted()) { // 只取一条 Set<String> values = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1); if (values.isEmpty()) { try { Thread.sleep(500); } catch (InterruptedException e) { break; } continue; } String s = values.iterator().next(); if (jedis.zrem(queueKey, s) > 0) { // zrem是多线程多进程争夺任务的关键 TaskItem<T> task = JSON.parseObject(s, taskType); this.handleMsg(task.msg); } } } private void handleMsg(T msg) { try { System.out.println(msg); } catch (Throwable ignored) { // 一定要捕获异常,避免因为个别任务处理问题导致循环异常退出 } } public static void main(String[] args) { final RedisDelayingQueue<String> queue = new RedisDelayingQueue<>(new Jedis("localhost", 16379), "q-demo"); Thread producer = new Thread() { @Override public void run() { for (int i = 0; i < 10; i++) { queue.delay("zhongmingmao" + i); } } }; Thread consumer = new Thread() { @Override public void run() { queue.loop(); } }; producer.start(); consumer.start(); try { producer.join(); Thread.sleep(6000); consumer.interrupt(); consumer.join(); } catch (InterruptedException ignored) { } }}

标签: #redis队列算法