前言:
今天看官们对“zmq 消息队列”大致比较关心,我们都需要学习一些“zmq 消息队列”的相关知识。那么小编在网上汇集了一些对于“zmq 消息队列””的相关内容,希望我们能喜欢,大家快快来学习一下吧!1.消息队列
消息队列简称MQ,先入先出的数据结构,队列里存放的是message,主要是用作不同server、不同进程,不同线程间通信。
2.消息队列使用场景
(1)异步处理
比如在消息队列后面,做短信通知,终端状态推送,App推送,用户注册等。
下图是同步和异步处理的结果:
异步处理优势:
能够更快返回结果,减少等待,实现并发处理,提升系统总体性能。
(2)流量控制
如下图,使用消息队列可以隔离网关和后端服务,达到流量控制和保护后端服务的目的。
(3)服务解耦
实现端与端之间的解耦
(4)发布订阅
比如头条推荐系统,推荐用户感兴趣的频道新闻。
(5)高并发缓冲
kafka日志服务,监控上报。
3.基本概念和原理
(1)生产者、消费者
消息生产者Producer:发送消息到消息队列。
消息消费者Consumer:从消息队列接收消息。
Broker:就是MQ服务器
(2)点对点消息队列模型
消息的生产者和消费者可以不同时处于运行状态。每一个成功处理的消息都由消息消费者签收确认
(3)发布订阅消息模型
发布订阅消息模型中,支持向一个特定的主题Topic发布消息,多个订阅者接收这个主题的信息。在这种模型下,发布者和订阅者彼此不知道对方。
(4)消息的顺序性保证
基于Queue消息模型,利用FIFO先进先出的特性,可以保证消息的顺序性。
(5)ACK确认机制
消息队列提供了消息Acknowledge机制,即ACK机制,保证消息不丢失。当消费者确认消息已经被消费处理,发送一个ACK给消息队列,此时 消息队列就可以删除这个消息了。若消费者宕机/关闭,没有发送ACK,消息队列就认为这个消息没有被处理,会把这个消息重新发送给其它的消费者重新消费处理。
(6)消息持久化
有些消息是非常关键,必须要保存,随时能够恢复,就需要启用消息持久化,当消息队列宕机重启后,消息可以从持久化存储恢复,消息不丢失,可以继续向消费者处理。
(7)消息的同步和异步收发
同步方式:
同步收发场景下,消息生产者和消费者双向应答模式。同步表明的是一种关系,消息的接收如果以同步的方式(Pull)进行接收,如果队列中为空,此时接收将处于同步阻塞状态,会一直等待,直到消息的到达。
异步方式:
消息收发同样支持异步方式,异步发送消息,不需要等待消息队列的接收确认,异步接收消息的方式是以Push的方式触发消息消费者接收模型。
(8)消息的事务支持
消息的收发处理支持事务,次处理可能涉及多个消息的接收、处理,这处于同一个事务范围内,如果一个消息处理失败,事务回滚,消息重新回到队列中。
4.消息队列方案
以下是一些常用的方案,用作对比。
5.ZeroMQ
如果要详细学习ZeroMQ,可以参考这本书<ZeroMQ云时代极速消息通讯库>。
官方API:
英文指南:
中文指南:,文档里面的代码有些是过时,需要参考提供的github链接。
性能测试:
下载编译安装zmq
官方文档网址:
源码下载地址:
安装库
sudo apt-get install libtool
sudo apt-get install pkg-config
sudo apt-get install build-essential
sudo apt-get install autoconf
sudo apt-get install automake
1 # 下载2 git clone cd libzmq4 # 查看tag5 git tag6 # 版本 获取指定的版本,不要⽤主分⽀,可能有bug7 git checkout v4.3.28 ./autogen.sh9 ./configure && make check10 sudo make install11 sudo ldconfig12 cd ..
sudo make install的是时候可以看到具体的.so和.a。
libtool: install: /usr/bin/install -c src/.libs/libzmq.lai /usr/local/lib/libzmq.la
libtool: install: /usr/bin/install -c src/.libs/libzmq.a /usr/local/lib/libzmq.a
libtool: install: chmod 644 /usr/local/lib/libzmq.a
libtool: install: ranlib /usr/local/lib/libzmq.a
libtool: finish:
PATH="/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/snap/bin:/sbin" ldconfig -
n /usr/local/lib
我们在编译的时候需要加上libzmq库,⽐如gcc -o bin file.c -lzmq
测试
从简单的代码开始,⼀段传统的Hello World程序。我们会创建⼀个客户端和⼀个服务端,客户端发送Hello给服务端,服务端返回World。下⽂是C语⾔编写的服务端,它在5555端⼝打开⼀个ZMQ套接字,等待请求,收到后应答World。
#include <zmq.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <assert.h>
// gcc -o hwserver hwserver.c -lzmq
int main (void)
{
// Socket to talk to clients
void *context = zmq_ctx_new ();
// 与客户端通信的套接字
void *responder = zmq_socket (context, ZMQ_REP);
int rc = zmq_bind (responder, "tcp://*:5555");
assert (rc == 0);
while (1) {
// 等待客户端请求
char buffer [10];
zmq_recv (responder, buffer, 10, 0);
printf ("收到 Hello\n");
sleep (1); // Do some 'work'
// 返回应答
zmq_send (responder, "World", 5, 0);
}
return 0;
}
}
使⽤REQ-REP套接字发送和接受消息是需要遵循⼀定规律的。客户端⾸先使⽤zmq_send()发送消息,再⽤zmq_recv()接收,如此循环。如果打乱了这个顺序(如连续发送两次)则会报错。类似地,服务端必须先进⾏接收,后进⾏发送。
先运⾏服务端,再运⾏客户端。
#include <zmq.h> #include <string.h> #include <stdio.h> #include <unistd.h> int main (void) { printf ("Connecting to hello world server...\n"); void *context = zmq_ctx_new (); // 连接⾄服务端的套接字 void *requester = zmq_socket (context, ZMQ_REQ); zmq_connect (requester, "tcp://localhost:5555"); int request_nbr; for (request_nbr = 0; request_nbr != 10; request_nbr++) { char buffer [10]; printf ("正在发送 Hello %d...\n", request_nbr); zmq_send (requester, "Hello", 5, 0); zmq_recv (requester, buffer, 10, 0); printf ("接收到 World %d\n", request_nbr); } zmq_close (requester); zmq_ctx_destroy (context); return 0;}
ZeroMQ吞吐量和延时性
ZeroMQ解决了哪些问题呢?
(1)调用socket接口多
(2)TCP是一对一的连接;一对多,reactor模式
(3)不支持跨平台编程
(4)要处理分包、组包问题;
(5)流式传输时要处理粘包、半包问题;
(6)要处理网络异常,如连接异常中断,重连等
(7)服务端和客户端启动有先后
(8)处理IO模型
(9)实现消息缓存
(10)实现消息的加密
ZeroMQ模型1
请求响应模型和发布订阅模型
ZeroMQ模型2
推拉模型和请求-路由-响应模型。
今天的文章就分享到这里,欢迎关注,点赞,转发,收藏!
标签: #zmq 消息队列