龙空技术网

开源的基于Redis的NodeJS分布式消息队列——BullMQ

爱分享Coder 207

前言:

此时小伙伴们对“nodejs消息队列框架”可能比较珍视,同学们都需要分析一些“nodejs消息队列框架”的相关内容。那么小编也在网摘上搜集了一些对于“nodejs消息队列框架””的相关知识,希望朋友们能喜欢,小伙伴们快快来学习一下吧!

介绍

BullMQ是一个NodeJS库,它实现了基于Redis的快速而强大的队列系统。该库的设计使其可以实现以下目标:队列语义恰好是一次,即尝试一次准确地传递每条消息,但在最坏的情况下它将至少传递一次*。易于水平缩放。添加更多workers以并行处理作业。一致性、高性能。通过结合高效的.lua脚本和流水线,尝试从Redis获得最大的吞吐量。

Github

特点

如果是Message Queue的新手,你可能想知道为什么需要它们。队列可以用一种优雅的方式解决许多不同的问题,从平滑处理高峰到在微服务之间创建健壮的通信通道,或将繁重的工作从一台服务器转移到许多较小的worker,等等。

免轮询设计,CPU使用率最低基于Redis的分布式作业LIFO和FIFO作业优先事项延迟工作根据cron规范计划和重复的作业重试失败的作业每个worker的并发设置线程化(沙盒)处理功能从进程崩溃中自动恢复

BullMQ基于4个类,可以一起用来解决许多不同的问题。这些类是Queue,Worker,QueueScheduler和QueueEvents。首先应该了解的第一类是Queue类。此类表示队列,可用于将作业添加到队列以及其他一些基本操作,例如暂停,清除或从队列中获取数据。BullMQ中的作业基本上是用户创建的数据结构,可以存储在队列中。作业由工人处理。Worker是需要注意的第二类。工作者是能够处理作业的实例。可以有许多工作程序,可以在同一NodeJS进程中运行,也可以在不同的进程中以及在不同的计算机中运行。它们都会从队列中消费作业,并将作业标记为已完成或失败。

快速开始

BullMQ用typescript编写,尽管可以在普通javascript中使用,但有示例都将使用typescript编写。

安装

yarn add bullmq
引入
import { Queue } from 'bullmq'; const myQueue = new Queue('foo'); async function addJobs(){ await myQueue.add('myJobName', { foo: 'bar' }); await myQueue.add('myJobName', { qux: 'baz' }); } addJobs();

当然你需要在本地计算机上运行Redis服务才能成功运行示例。

作业被添加到队列中并且可以随时处理,至少有一个运行工作器的Nodejs进程:

import { Worker } from 'bullmq' const worker = new Worker(queueName, async job => { // Will print { foo: 'bar'} for the first job // and { qux: 'baz' } for the second. console.log(job.data):});

可以拥有许多所需的工作流程,BullMQ将以循环方式在workers之间分配工作。

可以以通过将侦听器附加到工作程序来侦听已完成(或失败)的工作

worker.on('completed', (job) => {console.log(`${job.id} has completed!`);});worker.on('failed', (job, err) => {console.log(`${job.id} has failed with ${err.message}`);});

有时需要在给定位置监听所有worker事件,因此,需要使用特殊的class QueueEvents:

import { QueueEvents } from 'bullmq'const queueEvents = new QueueEvents();queueEvents.on('completed', (jobId) => {console.log(`${jobId} has completed!`);});queueEvents.on('failed', (jobId, err) => {console.log(`${jobId} has failed with ${err.message}`);});

全局事件侦听器仅返回作业ID,而不返回作业实例。这是出于性能原因,如果需要完整的作业,则可以始终使用Queue ## getJob方法。

更多高级或者深入的用法可参考官方文档!

总结

BullMQ是一个基于Redis的分布式队列,通过队列能够解决你很多问题,就目前而言,其潜力很大,如果你正有这方面的需求,不妨尝试一下它!

标签: #nodejs消息队列框架 #nodejs消息队列库