前言:
现在各位老铁们对“redis队列算法”可能比较关切,看官们都需要学习一些“redis队列算法”的相关文章。那么小编也在网络上收集了一些对于“redis队列算法””的相关知识,希望我们能喜欢,小伙伴们一起来了解一下吧!场景说明:异步处理,可用于处理比较耗时的请求,例如批量发送邮件,如果直接在网页触发执行发送,程序会出现超时。高并发场景进行削峰,例如当某个时刻请求瞬间增加时,可以把请求写入到队列,后台再去慢慢处理这些请求。微服务架构中多模块之间解耦,例如订单模块下单之后,可以直接调用消息模块,仓库模块,但是当后续增多一个模块时,那么需要订单模块修改调用代码。可以将订单消息写入队列中,再由其他模块主动去队列获取订单消息然后进行相应处理。
以上场景适合使用消息队列来解决,消息队列的作用简单来说有三种作用,分别为异步,削峰,解耦。本遍文章主要讲解用redis实现多种类型的消息队列。
命令:
rpush + blpop 或 lpush + brpop
rpush : 往列表右侧推入数据
blpop : 客户端阻塞直到队列有值输出
简单队列:
simple.php
//simple.php$stmt = $pdo->prepare('select id, cid, name from zc_goods limit 200000');$stmt->execute();while ($row = $stmt->fetch(PDO::FETCH_ASSOC)) { $redis->rPush('goods:task', json_encode($row));}$redis->close();
获取20000万个商品,并把json化后的数据推入goods:task队列
queueBlpop.php
// 出队while (true) { // 阻塞设置超时时间为3秒 $task = $redis->blPop(array('goods:task'), 3); if ($task) { $redis->rPush('goods:success:task', $task[1]); $task = json_decode($task[1], true); echo $task['id'] . ':' . $task['cid'] . ':' . 'handle success'; echo PHP_EOL; } else { echo 'nothing' . PHP_EOL; sleep(5); }}
设置blpop阻塞时间为3秒,当有数据出队时保存到goods:success:task表示执行成功,当队列没有数据时,程序睡眠10秒重新检查goods:task是否有数据出队
cli 模式执行命令:
php simple.phpphp queueBlpop.php
优先级队列
思路:
blpop 有多个键时,blpop会从左至右遍历键,一旦一个键能弹出元素,客户端立即返回。例如:
blpop key1 key2 key3 key4
从key1到key4遍历,如果哪个key有值,则弹出这个值,若多个key同时有值时,优先弹出排在左边的key。
priority.php
// 设置优先级队列$high = 'goods:high:task';$mid = 'goods:mid:task';$low = 'goods:low:task';$stmt = $pdo->prepare('select id, cid, name from zc_goods limit 200000');$stmt->execute();while ($row = $stmt->fetch(PDO::FETCH_ASSOC)) { // cid 小于100放在低级队列 if ($row['cid'] < 100) { $redis->rPush($low, json_encode($row)); } // cid 100到600之间放在中级队列 elseif ($row['cid'] > 100 && $row['cid'] < 600) { $redis->rPush($mid, json_encode($row)); } // cid 大于600放在高级队列 else { $redis->rPush($high, json_encode($row)); }}$redis->close();priorityBlop.php
// 优先级队列$high = 'goods:high:task';$mid = 'goods:mid:task';$low = 'goods:low:task';// 出队while(true){ // 优先级高的队列放在左侧 $task = $redis->blPop(array($high, $mid, $low), 3); if ($task) { $task = json_decode($task[1], true); echo $task['id'] . ':' . $task['cid'] . ':' . 'handle success'; echo PHP_EOL; } else { echo 'nothing' . PHP_EOL; sleep(5); }}
优先级高的队列放在blpop命令左侧,依次排序,blpop命令会依次弹出high, mid, low队列的值
cli 模式执行命令:
php priority.phpphp priorityBlpop.php
延迟队列
思路:
可以用一个有序集合来保存延迟任务,member保存任务内容,score保存(当前时间 + 延时时间)。用时间作为score。程序只要用有序集合的第一条任务的score和当前时间做比较,如果当前时间比score小,说明有序集合的所有任务还没到执行时间。
delay.php
$stmt = $pdo->prepare('select id, cid, name from zc_goods limit 200000');$stmt->execute();while ($row = $stmt->fetch(PDO::FETCH_ASSOC)) { $redis->zAdd('goods:delay:task', time() + rand(1, 300), json_encode($row));}
将20万条任务导入有序集合goods:delay:task,所有任务延迟到之后的1秒到300秒内执行
delayHandle.php
while (true) { // 因为是有序集合,只要判断第一条记录的延时时间,例如第一条未到执行时间 // 相对说明集合的其他任务未到执行时间 $rs = $redis->zRange('goods:delay:task', 0, 0, true); // 集合没有任务,睡眠时间设置为5秒 if (empty($rs)) { echo 'no tasks , sleep 5 seconds' . PHP_EOL; sleep(5); continue; } $taskJson = key($rs); $delay = $rs[$taskJson]; $task = json_decode($taskJson, true); $now = time(); // 到时间执行延时任务 if ($delay <= $now) { // 对当前任务加锁,避免移动移动延时任务到任务队列时被其他客户端修改 if (!($identifier = acquireLock($task['id']))) { continue; } // 移动延时任务到任务队列 $redis->zRem('goods:delay:task', $taskJson); $redis->rPush('goods:task', $taskJson); echo $task['id'] . ' run ' . PHP_EOL; // 释放锁 releaseLock($task['id'], $identifier); } else { // 延时任务未到执行时间 $sleep = $delay - $now; // 最大值设置为2秒,保证如果有新的任务(延时时间1秒)进入集合时能够及时的被处理// $sleep = $sleep > 2 ? 2 :$sleep; echo 'wait ' . $sleep . ' seconds ' . PHP_EOL; sleep($sleep); }}
这个文件对有序集合内的延迟任务做处理,如果延迟任务到了执行时间,则把延迟任务移动到任务队列中
queueBlpop.php
// 出队while (true) { // 阻塞设置超时时间为3秒 $task = $redis->blPop(array('goods:task'), 3); if ($task) { $redis->rPush('goods:success:task', $task[1]); $task = json_decode($task[1], true); echo $task['id'] . ':' . $task['cid'] . ':' . 'handle success'; echo PHP_EOL; } else { echo 'nothing' . PHP_EOL; sleep(5); }}
处理任务队列中的任务
cli模式下执行命令:
php delay.phpphp delayHanlde.phpphp queueBlpop.php总结
以上分别讲解了简单队列,优先级队列,延迟队列的使用的思路。redis作为队列使用非常的简单,但是存在数据丢失的情况,如果你的系统不允许数据丢失,可以使用专门的消息中间件,如rabbitmq,kafka,rocketmq,nsq等等。关于队列的使用我们还需要考虑消息的可靠性,有序性,重复消费等等问题。今天就先到这里了,如果喜欢我的文章,可以关注我,我将继续为大家带来优质的文章。
标签: #redis队列算法