龙空技术网

posix消息队列和systemV消息队列

爱音乐的程序员小新人 71

前言:

今天咱们对“sv 队列”大致比较关注,你们都想要学习一些“sv 队列”的相关内容。那么小编也在网上网罗了一些关于“sv 队列””的相关内容,希望姐妹们能喜欢,看官们一起来了解一下吧!

一、概述:

消息队列可认为是一个消息链表。有足够写权限的线程可往队列中放置消息,有足够读权限的

线程可从队列中取走消息。posix消息队列和systemV消息队列主要如下差异:

1、一般来说posix的接口要比systemV的简单,但是systemV的可已移植性更好几乎所有的unix系统都支持。

2、对posix消息队列的读总是返回最高优先级的最早消息,对systemV消息队列的读则可以返回

任意指定优先级的消息。

3、当往一个空队列放置一个消息时,posix消息队列允许产生一个信号或者启动一个线程,systemV消息队列则不提供类似的机制。

二、posix消息队列

1、有如下主要函数:

#include <mqueue.h> mqd_t mq_open(const char *name, int oflag, /* mode_t mode, struct mq_attr *attr */); int mq_close(mqd_t mqdes); int mq_unlink(const char *name);int mq_getattr(mqd_t mqdes, struct mq_attr *attr); int mq_setattr(mqd_t mqdes, struct mq_attr *attr, struct mq_attr *oattr); int mq_send(mqd_t mqdes, const char *ptr, size_t len, unsigned int prio); ssize_t mq_receive(mqd_t mqdes, char *ptr, size_t len, unsigned int *prio); int mq_notify(mqd_t mqdes, const struct sigevent *notification);123456789

mqd_t mq_open(const char name, int oflag, / mode_t mode, struct mq_attr attr /);

功能:创建一个新的消息队列或者打开一个已经存在的消息队列。

返回值:若成功返回消息队列的描述符,失败返回-1

参数:

name : 消息队列的名称(必须以/开头,且后面不能再有/)

oflag: 标志。O_RDONLY,O_WRONLY,O_RDWR 三个中之一,还有可选的选项:O_NONBLOCK,O_CREAT,O_EXCL。

O_RDONLY 只读

O_RDWR 读写

O_WRONLY 只写

O_CREAT 没有该对象则创建

O_EXCL 如果O_CREAT指定,但name不存在,就返回错误

O_NONBLOCK 以非阻塞方式打开消息队列

mode: 权限

S_IWUSR 用户/属主写

S_IRUSR 用户/属主读

S_IWGRP 组成员写

S_IRGRP 组成员读

S_IWOTH 其他用户写

S_IROTH 其他用户读

attr:给新队列指定某些属性,如果为空就使用默认属性。

struct mq_attr{

long mq_flags;//标志,其值为0表示阻塞,O_NONBlOCK表示非阻塞。可用mq_getatter和mq_setattr设置和获取

long mq_maxmsg;//消息队列的消息个数的最大值,只能在创建消息队列的时候设置

long mq_msgsize;//每个消息的最大值,只能在创建消息队列的时候设置

long mq_curmsgs://当前队列的消息个数,可用mq_getatter获取

}

创建消息队列成功后可在/dev/mqueue中查看。

int mq_close(mqd_t mqdes)

功能:关闭一个消息队列,调用之后表示进程不在使用该描述符并不代表消息队列会从系统中删除

参数: mqdes消息队列描述符

返回值:若成功返回0,失败返回-1

int mq_unlink(const char *name);

功能:从系统中删除一个名称为name的消息队列

参数: name 消息队列名称

返回值:若成功返回0,失败返回-1

说明:每个消息队列有一个保存当前打开着描述符数的引用计数器,mq_close会使计数器减一,只用调用mq_unlink并且引用计数器为0时该消息队列才会从系统中删除。

int mq_getattr(mqd_t mqdes, struct mq_attr *attr);

功能: mq_getattr返回队列的四个属性

参数:attr 属性返回到attr指的结构体中

返回值:若成功返回0,失败返回-1

int mq_setattr(mqd_t mqdes, struct mq_attr *attr, struct mq_attr *oattr);

功能: mq_setattr设置队列的mq_flags属性

参数:attr指向新设置属性的结构体,oattr指向返回旧的属性的结构体。

返回值:若成功返回0,失败返回-1

int mq_send(mqd_t mqdes, const char *ptr, size_t len, unsigned int prio);

功能: 往消息队列中放置一个消息,消息队列已满,mq_send()函数将阻塞,知道有可用空间再次允许放置消息。

如果O_NONBLOCK被指定,满队时mq_send()将不会阻塞,而是返回EAGAIN错误。当一条消息被插入队列时它会被放置在队列中具有相同优先级的所有的消息之后。

参数:mqdes: 消息队列描述符。

msg_ptr:要发送消息的指针。

msg_len:消息长度(不能大于属性值mq_msgsize的值)。

msg_prio:优先级(消息在队列中将按照优先级从大到小的顺序排列消息;数字越大优先级越高。)。

返回值:若成功返回0,失败返回-1

ssize_t mq_receive(mqd_t mqdes, char *ptr, size_t len, unsigned int *prio);

功能:按优先级从高到低进行接收。即优先级值大的先接收。如果队列为空,mq_receive()函数将阻塞,知道消息队列中有新的消息。

如果O_NONBLOCK被指定,mq_receive()将不会阻塞,而是返回EAGAIN错误。

参数:mqdes: 消息队列描述符。

ptr : 指向接收消息缓冲区

len : ptr指向缓冲区的大小(必须大于等于属性值mq_msgsize的值,否则返回EMSGSIZE错误)

prio: 如果prio不为NULL,那么接收到的消息优先级会被复制到prio指向的位置

返回值:若成功返回接收消息的长度,失败返回-1。

编译要加librt.so -lrt

一个测试的小程序

#include <stdio.h>#include <string.h>#include <unistd.h>#include <mqueue.h>#include <fcntl.h>#include <stdlib.h>#include <pthread.h>#include <sys/stat.h>#define FILE_MODE (S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH)void *sendthread(mqd_t mqd){ int rv; char *str = "hello posix mq_queue"; int len = strlen(str); rv = mq_send(mqd, str, len, 2); if(rv == -1) { perror("mq_send error"); } printf("send msg =[%s]\n", str);}void *recvthread(mqd_t mqd){ int rv; char str[1024] = {0}; unsigned int pro; rv = mq_receive(mqd, str, 1024, &pro); if(rv == -1) { perror("mq_receive error"); } printf("recv len = [%d], msg =[%s],pro = [%d]\n",rv, str, pro);}int main(int argc, char *argv[]){ int rv,flag = 0; long maxmsg = 10; long msglen = 1024; pthread_t pid1,pid2;  struct mq_attr attr, new_attr, old_attr; attr.mq_maxmsg = maxmsg; attr.mq_msgsize = msglen; attr.mq_flags = 0; mqd_t mqd = mq_open("/mq_test", O_CREAT|O_RDWR, FILE_MODE, &attr); if(-1 == mqd) { perror("mq_open error"); return 0; } memset(&new_attr, 0, sizeof(struct mq_attr)); memset(&old_attr, 0, sizeof(struct mq_attr)); rv = mq_getattr(mqd, &new_attr); if(rv == -1) { perror("mq_getattr error"); return 0; } printf("new_attr.mq_maxmsg = [%ld],new_attr.mq_msgsize= [%ld]\n", new_attr.mq_maxmsg, new_attr.mq_msgsize); new_attr.mq_flags = O_NONBLOCK; rv = mq_setattr(mqd, &new_attr, &old_attr); if(rv == -1) { perror("mq_setattr error"); return 0; } printf("new_attr.mq_flags = [%ld],old_attr.mq_flags= [%ld]\n", new_attr.mq_flags, old_attr.mq_flags); rv = pthread_create(&pid1, NULL, (void *)sendthread,mqd);  if(rv < 0)  {  printf("Create pthread error!\n");  return 1;  }  rv = pthread_create(&pid2, NULL, (void *)recvthread,mqd);  if(rv < 0)  {  printf("Create pthread error!\n");  return 1;  }  pthread_join(pid1,NULL); pthread_join(pid2,NULL); mq_unlink("/mq_test"); return 0;}123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293

三、SystemV 消息队列

SystemV 消息队列使用消息标识符标识,无论任何进程只要具有权限就可以读写。

对于每个消息队列内核都会维护如下一个结构体

struct msqid_ds {  struct ipc_perm msg_perm; //存一些权限,创建信息之类的  struct msg* msg_first; //ptr to first message on queue  struct msg* msg_last; //ptr to last message on queue  msglen_t msg_cbytes; //current # bytes on queue  msgqnum_t msg_qnum; //current # of message on queue  msglen_t msg_qbytes; <span style="white-space:pre"> </span>//max \# of bytes allowed on queue  pid_t msg_lspid; //pid of last msgsnd  pid_t msg_lrpid; //pid of last msgrcv  time_t msg_stime; //time of last msgsnd()  time_t msg_rtime; //time of last msgrcv()  time_t msg_ctime; //time of last msgctl() }; ![这里写图片描述]()```12345678910111213141516

主要函数

#include <sys/msg.h>int msgget(key_t key, int oflag)int msgsnd(int msqid, const void * ptr, size_t length, int flag)ssize_t msgrcv (int msqid, void *ptr, size_t length, long type, int flag)int msgctl(int msqid, int cmd, struct msqid_ds *buf)12345

int msgget(key_t key, int oflag)

功能:创建一个新的消息队列或者访问一个已存在的消息队列。

参数: key 值通常为IPC_PRIVATE 或者ftok()函数的返回值。

msgflg: 读写权限值的组合,还可以与IPC_CREAT或 IPC_EXCL|IPC_CREAT

返回值:若成功返回非负标识符,失败返回-1

说明:当创建一个新的消息队列时,msqid_ds结构如下成员会被初始化

msg_perm 结构的uid和cuid成员被设置为当前进程的有效用户ID,gid和cgid成员被设置为当前进程的有效组ID。

oflag中的读写权限位存放在msg_perm.mode中

msg_qnum、msg_lspid、msg_lrpid、msg_stime和msg_rtime被设置为0

msg_ctime被设置为当前时间

msg_qbytes被设置为系统限制值

int msgsnd(int msqid, const void * ptr, size_t length, int flag)

功能:往消息队列上放置一个消息

参数:msqid 消息队列标识符

ptr 结构指针,结构体的常规形式为

struct msgbuf

{

long mtype; /消息类型,必须大于0/

char mtext[1];/消息内容/

};

常规形式并不代表必须,消息类型是必须要带的,后面的消息内容可以是其他类型或者其他类型的组合。

length: 消息长度,可以为0,值为sizeof(struct msgbuf) - sizeof(long)

flag : 标志,0或者IPC_NOWAIT,IPC_NOWAIT使得msgsnd调用非阻塞。

返回值:成功返回0,失败返回-1

ssize_t msgrcv (int msqid, void *ptr, size_t length, long type, int flag)

功能:从消息队列中取一个消息

参数:msqid 消息队列标识符

ptr, 指向接受消息的结构

length 指定了ptr指向的数据缓冲区的数据部分大小。该函数返回值最大不能超过length。可用sizeof(struct msgbuf) - sizeof(long)表示;

type 指定从消息队列中读出消息的类型。

如果type为0 那就返回队列中的第一个消息,即最早入队列的消息。

如果type大于0 那就返回类型为type的第一个消息。

如果type小于0,那就返回类型值小于或者等于type绝对值的消息中的类型值最小的第一个消息。

flag 当flag值为IPC_NOWAIT时,队列中没有消息,msgrcv会返回ENOMSG错误,否则msgrcv会阻塞到队列中有消息可取或者消息队列被删除。‘’

返回值:成功时返回读取的数据的长度,失败 返回-1

int msgctl(int msqid, int cmd, struct msqid_ds *buf)

功能:对消息队列的各种操作

参数:msqid 消息队列标识符

cmd 指令 IPC_RMID删除消息队列,队列中的数据也删除

IPC_SET 设置msqid_ds中msg_perm.uid 、msg_perm.gid ,msg_perm.mode 和 msg_qbytes,他们值来自buff结构中相应的成员

IPC_STAT 获取消息队列的msqid_ds 结构,值存放在buff中。

返回值:成功返回0,失败返回-1.

一个简单的测试

#include <stdio.h>#include <stdlib.h>#include <errno.h>#include <sys/types.h>#include <sys/ipc.h>#include <sys/msg.h>#include <string.h>typedef struct msg_buf{ long msg_type; char msg_text[256];}Msg; int main(){ int msgid, ret; Msg msg; int key; key = ftok("./systemv_queue.c", 10); msgid = msgget(key, IPC_CREAT | IPC_EXCL | 0666); if(msgid == -1) { if(errno == EEXIST) { key = ftok("./systemv_queue.c", 10); msgid = msgget(key, IPC_CREAT | 0666); } else { printf("msgget error!\n"); } return -1;  } msg.msg_type = getpid(); strcpy(msg.msg_text, "Hello everyone!"); ret = msgsnd(msgid, &msg, strlen(msg.msg_text), IPC_NOWAIT); if(ret == -1) { printf("msgsnd error!\n"); return -1; } sleep(5); memset(&msg, 0, sizeof(Msg)); ret = msgrcv(msgid, &msg, sizeof(msg.msg_text), 0, IPC_NOWAIT); if(ret == -1) { printf("msgrcv error!\n"); return -1; } printf("ret = [%d], msg.msg_type = [%d], msg.msg_text = [%s]\n", ret, msg.msg_type, msg.msg_text); return 0;}123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960

查看系统的消息队列命令为:ipcs

ipcs -a 是默认的输出信息 打印出当前系统中所有的进程间通信方式的信息

ipcs -m 打印出使用共享内存进行进程间通信的信息

ipcs -q 打印出使用消息队列进行进程间通信的信息

ipcs -s 打印出使用信号进行进程间通信的信息

删除命令为ipcrm

ipcrm -M shmkey 移除用shmkey创建的共享内存段

ipcrm -m shmid 移除用shmid标识的共享内存段

ipcrm -Q msgkey 移除用msqkey创建的消息队列

ipcrm -q msqid 移除用msqid标识的消息队列

ipcrm -S semkey 移除用semkey创建的信号

ipcrm -s semid 移除用semid标识的信号

标签: #sv 队列