首页 / 消息队列 / 进程通信——POSIX 消息队列
进程通信——POSIX 消息队列
内容导读
互联网集市收集整理的这篇技术教程文章主要介绍了进程通信——POSIX 消息队列,小编现在分享给大家,供广大互联网技能从业者学习和参考。文章包含7843字,纯文字阅读大概需要12分钟。
内容图文
1.消息队列
消息队列与管道
相同点: 都借助内核空间,进行通信,若没有unlink,即使进程close,消息队列也会继续存在。 又因为借助内核空间,则存在用户空间和内核空间的互相数据拷贝,而消耗效率。 不同点: 消息队列类似短信,不需要对端在线。 管道类似电话,需要对端在线。
2.API
(1)mq_open
mqd_t mq_open(const char *name, int oflag); mqd_t mq_open(const char *name, int oflag, mode_t mode, struct mq_attr *attr);
mq_open() creates a new POSIX message queue or opens an existing queue.
flag必须值
O_RDONLY, O_WRONLY, O_RDWR
flag可选值
O_CLOEXEC, O_CREAT, O_EXCL, O_UNBLOCK
O_CREAT: 如果消息队列不存在,则新建,否则不起任何作用,相当于open O_EXCL 必须和O_CREAT连用,如果消息队列已存在,返回错误,errno=EEXIST O_NONBLOCK 消息队列的读写,默认是阻塞的,即, send时,如果队列满了会阻塞, recv时,如果队列空,会阻塞。 设置O_NONBLOCK,若异常,则直接返回错误,errno=EAGAIN
attr设置队列参数,attr==NULL则为默认参数。
(2)mq_receive
ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned int *msg_prio); #include <time.h> #include <mqueue.h> ssize_t mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned int *msg_prio, const struct timespec *abs_timeout);
优先级最高的,最老的消息出队, 输出于msg_ptr, msg_len指定 msg_ptr的 len, msg_len必须 大于等于 消息队列的项的大小(用mq_getattr获得)
若队列为空, 默认阻塞,直到不为空,或信号打断(返回错误,errno == EINTR), 设置O_NONBLOCK,直接返回错误(errno==EAGAIN), 使用 mq_timedreceive,可以设置阻塞的时间(绝对时间)
返回值: 成功返回接收的字节数
(3)mq_getattr
int mq_getattr(mqd_t mqdes, struct mq_attr *attr); int mq_setattr(mqd_t mqdes, const struct mq_attr *newattr, struct mq_attr *oldattr);
struct mq_attr { long mq_flags; /* Flags: 0 or O_NONBLOCK */ long mq_maxmsg; /* Max. # of messages on queue */ long mq_msgsize; /* Max. message size (bytes) */ long mq_curmsgs; /* # of messages currently in queue */ };
mq_attr 都是在 新建 队列时,初始化。
mq_flags 只会显示0或O_NONBLOCK mq_maxmsg 队列最多消息数 mq_msgsize 队列消息最大长度 mq_curmsgs 队列当前消息个数
mq_setattr 只能设置 mq_flags 添加O_NONBLOCK
(4)mq_close
int mq_close(mqd_t mqdes);
关闭消息队列描述符。
并关闭已开启的通知请求,这样其他线程可以申请通知请求。
消息队列描述符在进程结束时或exec时自动关闭。
(5)mq_unlink
mqd_t mq_unlink(const char *name);
移除消息队列名称,
当所有打开该队列的进程都关闭了该队列,队列自己销毁。
3.应用
(1)单纯的接收
#include <stdlib.h> #include <errno.h> #include <stdio.h> #include <fcntl.h> /* For O_* constants */ #include <sys/stat.h> /* For mode constants */ #include <mqueue.h> #define MQ_NAME "/mq_test" int main() { struct mq_attr attr = {0}; mqd_t mqid; int mq_len = 60, bytes; char *buf; attr.mq_msgsize = mq_len; attr.mq_maxmsg = 10; __again_open__: if (0 > (mqid = mq_open("/mq_test", O_RDONLY | O_CREAT | O_EXCL, 0777, &attr))) { if (errno == EEXIST) { mq_unlink(MQ_NAME); goto __again_open__; } perror("mq_open"); return -1; } if (0 > (buf = (char *)malloc(mq_len))) { perror("malloc"); return -1; } __again_recv__: if (0 > (bytes = mq_receive(mqid, buf, mq_len, NULL))) { if (errno == EINTR) goto __again_recv__; perror("mq_receive"); return -1; } printf("recv %d bytes : %s\n", bytes, buf); goto __again_recv__; return 0; }
(2)单纯发送
#include <string.h> #include <errno.h> #include <stdio.h> #include <fcntl.h> /* For O_* constants */ #include <sys/stat.h> /* For mode constants */ #include <mqueue.h> #define MQ_NAME "/mq_test" int main(int argc, char **argv) { int bytes, msg_len; mqd_t mqid; struct mq_attr attr; char *msg = argv[1]; if (0 > (mqid = mq_open(MQ_NAME, O_WRONLY | O_NONBLOCK))) { perror("mq_open"); return -1; } __again__: if (0 > (bytes = mq_send(mqid, msg, strlen(msg), 10))) { if (errno == EAGAIN) { usleep(100); goto __again__; } perror("mq_send"); return -1; } printf("success to send %d bytes\n", bytes); return 0; }
--------------POSIX消息队列与异步通知
POSIX消息队列相对于SystemV消息队列的重要不同:
POSIX消息队列支持 信号 和 线程 实现异步通知。
1.API
mqd_t mq_notify(mqd_t mqdes, const struct sigevent *notification);
用于线程 注册 或 撤销 一次 消息通知。
union sigval { /* Data passed with notification */ int sival_int; /* Integer value */ void *sival_ptr; /* Pointer value */ }; struct sigevent { int sigev_notify; /* Notification method */ int sigev_signo; /* Notification signal */ union sigval sigev_value; /* Data passed with notification */ void (*sigev_notify_function) (union sigval); /* Function for thread notification */ void *sigev_notify_attributes; /* Thread function attributes */ };
sigev_notify 用于指定 通知方式 SIGEV_NONE, 指定本线程接受通知,但事件发生后,不会进行通知 SIGEV_SIGNAL:使用信号通知 SIGEV_THREAD:使用线程通知 sigev_signo 如果使用 信号通知,则用于 指定 通知的信号 sigev_value 如果使用 信号通知,并且 sigaction 指定 SA_SIGINFO,则用于信号传参 如果使用 线程通知,则用于 线程传参 sigev_notify_function: 指定 线程 起始函数 sigev_notify_atrributes: 指定 线程 属性,使用pthread_attr_init 设置
只能在 一个线程/ 进程 上注册 通知。 只会在 消息队列 为空时,有消息 入队 时,释放通知。 一次 注册通知,只会通知一次,之后 通知会被 移除。 当 消息队列是 阻塞态,且有 进程或线程 正在 recv ,即阻塞在上面时,不会发送通知(建议 使用 NONBLOCK 的队列)。
3.应用
#include <pthread.h> #include <mqueue.h> #include <assert.h> #include <stdio.h> #include <stdlib.h> #include <unistd.h> #define handle_error(msg) \ do { perror(msg); exit(EXIT_FAILURE); } while (0) static void /* Thread start function */ tfunc(union sigval sv) { struct mq_attr attr; ssize_t nr; void *buf; mqd_t mqdes = *((mqd_t *) sv.sival_ptr); /* Determine max. msg size; allocate buffer to receive msg */ if (mq_getattr(mqdes, &attr) == -1) handle_error("mq_getattr"); buf = malloc(attr.mq_msgsize); if (buf == NULL) handle_error("malloc"); nr = mq_receive(mqdes, buf, attr.mq_msgsize, NULL); if (nr == -1) handle_error("mq_receive"); printf("Read %ld bytes from MQ\n", (long) nr); free(buf); exit(EXIT_SUCCESS); /* Terminate the process */ } int main(int argc, char *argv[]) { mqd_t mqdes; struct sigevent not; assert(argc == 2); mqdes = mq_open(argv[1], O_RDONLY); if (mqdes == (mqd_t) -1) handle_error("mq_open"); not.sigev_notify = SIGEV_THREAD; not.sigev_notify_function = tfunc; not.sigev_notify_attributes = NULL; not.sigev_value.sival_ptr = &mqdes; /* Arg. to thread func. */ if (mq_notify(mqdes, ¬) == -1) handle_error("mq_notify"); pause(); /* Process will be terminated by thread function */ }
#include <unistd.h> #include <errno.h> #include <stdio.h> #include <signal.h> #include <fcntl.h> /* For O_* constants */ #include <sys/stat.h> /* For mode constants */ #include <mqueue.h> #define MQ_NAME "/mq_test" int g_catch_usr1_flag; void catch_usr1(int sig) { g_catch_usr1_flag = 1; } int main() { int msg_len = 256, bytes; struct mq_attr attr; mqd_t mqid; struct sigevent notification; struct sigaction act; char msg[msg_len + 1]; attr.mq_maxmsg = 10; attr.mq_msgsize = msg_len; __try_open__: if (0 > (mqid = mq_open(MQ_NAME, O_RDONLY | O_CREAT | O_EXCL | O_NONBLOCK, 0777, &attr))) { if (errno == EEXIST) { if (0 > mq_unlink(MQ_NAME)) { perror("mq_unlink"); return -1; } goto __try_open__; } perror("mq_open"); } signal(SIGUSR1, catch_usr1); notification.sigev_notify = SIGEV_SIGNAL; notification.sigev_signo = SIGUSR1; if (0 > mq_notify(mqid, ¬ification)) { perror("mq_notify"); return -1; } while (1) { if (0 == g_catch_usr1_flag) { usleep(300); continue; } g_catch_usr1_flag = 0; if (0 > mq_notify(mqid, ¬ification)) { perror("mq_notify"); return -1; } if (0 > (bytes = mq_receive(mqid, msg, msg_len, NULL))) { perror("mq_receive"); return -1; } msg[bytes] = 0; printf("recv %d bytes : %s\n", bytes, msg); } return 0; }
内容总结
以上是互联网集市为您收集整理的进程通信——POSIX 消息队列全部内容,希望文章能够帮你解决进程通信——POSIX 消息队列所遇到的程序开发问题。 如果觉得互联网集市技术教程内容还不错,欢迎将互联网集市网站推荐给程序员好友。
内容备注
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 gblab@vip.qq.com 举报,一经查实,本站将立刻删除。
内容手机端
扫描二维码推送至手机访问。