并发控制:进程通信之消息队列教程
消息队列是消息的链接表,存储在内核中,用队列标识符标识(XSI的标识符)。消息队列的每个消息对象至少两个对象:消息类型(长整型表示)和消息主体。结构体定义如下:
struct msg
{
long type;
char data[50];
};//此消息结构体由用户定义,开头一定是消息类型
每个消息队列都有一个msgid\_ds的结构体(XSI IPC都有)。主要内容如下:
struct msgid_ds
{
struct ipc_perm;
msgnum_t msg_qnum; //number of message on queue
msg_len_t msg_qbytes; //max length of message on queue
pid_t msg_lspid; //pid of last msgsnd()
pid_t msg_lrpid; //pid of last msgrcv()
};
该结构体的内容可以通过msgctl()获取(使用方法见进程通信之共享内存3.2)。
消息队列的使用方式通常是:发送方调用msgget()创建消息队列,调用msgsnd()发送消息;接收方调用msgget()获取消息队列ID,调用msgrcv()接收消息,在退出时调用msgctl()删除该消息队列。每个函数的具体使用方法如下:
/*
函数名:msgget
key:IPC的键,通常调用ftok生成或直接指定
msgflg:
IPC_CREAT:创建新的IPC,如果有,则打开
IPC_EXCL:通常和IPC_CREAT一起使用,如果IPC已经存在,创建失败,errno显示EEXIST
关于读写权限见下图
返回值:消息队列ID
*/
int msgget(key_t key, int msgflg);
/*
函数名:msgsnd
msqid:消息队列标识符
msg_ptr:消息格式的结构体
msg_sz:消息主体大小(不包含消息类型)
msgflg:如果为0,则在发送到消息队列前一直等待;如果为IPC_NOWAIT,则当队列满时,出错返回EAGAIN
*/
int msgsnd(int msqid, const void *msg_ptr, size_t msg_sz, int msgflg);
/*
函数名:msgrcv
参数基与msgsnd一致
msgtype:期望接收的消息类型,如果为0,则表示接收消息队列头的一条消息
*/
msgrcv(int msqid, void *msg_ptr, size_t msg_sz, long int msgtype, int msgflg);
/*
函数名:msgctl
通常用来删除消息队列
*/
int msgctl(int msqid, int command, struct msqid_ds *buf);
//删除消息队列时用法为:
msgctl(msgid,IPC_RMID,NULL);
关于msgget函数第二个参数msgflag,除了IPC\_CREAT和IPC\_EXCL,还有读写控制权限,如下图:
通常在调用一个XSI IPC时,都用如下形式:
msgget(key,IPC_CREAT|0666);//创建或打开一个消息队列并赋予所有读写权限
下面看一个完整的例子:写进程创建并写入消息;读进程读取消息,并在退出时删除消息队列
1 //写进程
2 #include <sys/msg.h>
3 #include <string.h>
4 #include <stdio.h>
5 #include <stdlib.h>
6 #include <errno.h>
7 #include <fcntl.h>
8 #include <sys/ipc.h>
9
10 struct MSG
11 {
12 long type;
13 char msg[50];
14 };
15
16 int main()
17 {
18 key_t key=ftok("/tmp",1);
19 int msgid=msgget(key,IPC_CREAT|0666);
20 if(msgid<0)
21 {
22 printf("msgget error\n");
23 msgid=msgget(key,O_RDONLY); //获取ID
24 msgctl(msgid,IPC_RMID,NULL); //删除消息队列
25 exit(1);
26 }
27 MSG m;
28 char c;
29 while(1)
30 {
31 printf("Input type\n");
32 scanf("%ld",&(m.type));
33 scanf("%c",&c); //处理换行
34 printf("Input message\n");
35 fgets(m.msg,sizeof(m)-sizeof(long),stdin);
36 if(msgsnd(msgid,(void*)&m,50,0)==EAGAIN)
37 printf("Queue has no volumn\n");
38 if(strncmp(m.msg,"Quit",4)==0)
39 break;
40
41 }
42 exit(0);
43 }
44
45
46
47
48 //读进程,头文件与写进程相同
49 struct MSG
50 {
51 long type;
52 char msg[50];
53 };
54
55 int main()
56 {
57 MSG m;
58 key_t key=ftok("/tmp",1);
59 int msgid=msgget(key,IPC_CREAT|0666);
60 if(msgid<0)
61 {
62 printf("msgget error\n");
63 exit(1);
64 }
65 while(1)
66 {
67 if(msgrcv(msgid,(void*)&m,50,0,0)<0) //接收任何消息类型
68 {
69 printf("receive error\n");
70 if(errno==E2BIG) printf("E2BIG");
71 if(errno==EACCES) printf("EACCES");
72 if(errno==EAGAIN) printf("EAGAIN");
73 if(errno==EFAULT) printf("EFAULT");
74 if(errno==EIDRM) printf("EIDRM");
75 if(errno==EINTR) printf("EINTR");
76 if(errno==EINVAL) printf("EINVAL");
77 if(errno==ENOMSG)printf("ENOMSG");
78 exit(1);
79 }
80 printf("type:%ld msg:%s\n",m.type,m.msg);
81 if(strncmp(m.msg,"Quit",4)==0) break;
82 }
83 msgctl(msgid,IPC_RMID,NULL);
84 exit(0);
85 }
注:消息队列一旦创建,除非显示删除(调用msgctl,或命令行ipcrm -q msgid),将一直存在于内存中。当一个进程删除了某个消息队列后,其他进程再次调用该消息队列将出错。