进程间通信--消息队列教程
消息队列(message queue):是进程间通讯的一种常用的方式,可以传递多种类型的数据流,可以实现异步传输。
消息队列是两个或者多个应用程序约定好的一种交互方式,体现在彼此知晓共同的消息队列 ID, 该收取哪种类型的消息。
下面开始介绍应用的过程,附着的代码是在某公司开发feature时候加入的
1. 创建IPC key
key\_t ftok(const char* pathname, int proj\_id)
- pathname 是一个文件的路径,需要注意的是在实际进程间通讯时,需要保证pathname一直存在且不会被修改, proj\_id是一个标识符,常用一个字符表示
- 两个进程想要实现消息队列通信,必须有相同的key,因此pathname,proj\_id是相同的
- 创建成功返回key\_t值,失败为-1,原因可查看errno
- 代码如下:
#define NEIGBOR_INFO_GET "/tmp"
#define NEIGBOR_INFO_GET_PROJ 'x'
...
key = ftok(NEIGBOR_INFO_GET, NEIGBOR_INFO_GET_PROJ);
if(key == -1) {
printf("[%s][%d] creat key error!\n", __FUNCTION__, __LINE__);
return res;
}
2. 获取消息队列 ID
int msgget(key\_t key, int msgflg)
- key是ftok创建所得,flag是对消息队列的操作,常用IPC\_CREAT,即如果消息队列已存在,则返回消息队列 ID,否则创建新的消息队列, 返回 ID
- 消息队列在创建的时候是可以设置读写权限的 | 0666即 设置为可读可写
- flag如果指定为IPC\_CREAT | IPC\_EXCL ,在消息队列已存在下,会返回-1, errno为EEXIST
- 返回值为消息队列的 ID, 否则为-1, 原因可查看errno
- 代码如下:
...<br></br>int gs_getneigborinfo_qid;
...
gs_getneigborinfo_qid = msgget(key, IPC_CREAT | 0666);
if(gs_getneigborinfo_qid == -1) {
printf("[%s][%d], create message deque error!\n", __FUNCTION__, __LINE__);
return res;
}
3. 消息类型定义 msgbuf
msgbuf定义有一定的规范,常用如下:
struct msgbuf {
long type;
char mtext[1];
};
每一个msg中必须含有一个 long 类型的type,用于区分消息类型,后面的数据是可以自定义的,如下:
...
#define CHASSISID_MAX_SIZE 255
#define PORTID_MAX_SIZE 255
...
struct neighborInfo {
long type;
char chassis_id[CHASSISID_MAX_SIZE + 1];
char port_id[PORTID_MAX_SIZE + 1];
};
4. 发送消息 msgsnd
int msgsnd(int msgid, const void *msgp, size\_t msgsz, int msgflg)
- msgid:指定的消息队列 ID
- msgp:待发送的数据buff
- msgsz:待发送数据大小,常用sizeof(struct msgbuf) - sizeof(long)
- msgflg:常为0:表示阻塞发送,ICP\_NOWAIT:非阻塞发送
- 成功返回0,否则为-1, 错误码为errno
- 代码如下:
...
#define RESPONSE_INFO_TYPE 1
#define REQUEST_INFO_TYPE 2
struct neighborInfo temp;
memset(&temp, 0, sizeof(temp));
int rs = -1;
temp.type = REQUEST_INFO_TYPE;
...
rs = msgsnd(gs_getneigborinfo_qid, &temp, sizeof(temp) - sizeof(long), IPC_NOWAIT);
if(rs == -1) {
printf("[%s][%d] msg send error!\n", __FUNCTION__, __LINE__);
// pthread_mutex_unlock(&gs_pthread_lock);
return rs;
}
5. 接收消息 msgrcv
ssize\_t msgrcv(int msgid, void *msgp, size\_t msgsz, long msgtyp, int msgflg)
- msgtyp:需要接收的消息类型,如果为0, 则读取消息队列的第一条消息,不管其消息类型
- 读取成功返回读取的消息长度,否则为-1,错误码查看errno
- 代码如下:
...<br></br>// need to recv info
memset(&temp, 0, sizeof(temp));
int count = 200;
do {
rs= msgrcv(gs_getneigborinfo_qid, &temp, sizeof(temp), RESPONSE_INFO_TYPE, IPC_NOWAIT);
count--;
if(rs == -1) {
usleep(1000); // wait 1ms to rerecive data
}
} while(rs == -1 && count > 0 && errno == ENOMSG); <br></br>...
6. 消息队列控制 msgctl
int msgctl(int msqid, int cmd, struct msqid\_ds *buf);
- msgid:消息队列 ID
- cmd:控制命令 IPC\_STAT::将消息队列信息存放到buf中,IPC\_RMID:删除消息队列
- buf: 消息队列信息存放
struct msqid_ds {
struct ipc_perm msg_perm; /* Ownership and permissions */
time_t msg_stime; /* Time of last msgsnd(2) */
time_t msg_rtime; /* Time of last msgrcv(2) */
time_t msg_ctime; /* Time of last change */
unsigned long __msg_cbytes; /* Current number of bytes in
queue (nonstandard) */
msgqnum_t msg_qnum; /* Current number of messages
in queue */
msglen_t msg_qbytes; /* Maximum number of bytes
allowed in queue */
pid_t msg_lspid; /* PID of last msgsnd(2) */
pid_t msg_lrpid; /* PID of last msgrcv(2) */
};
The ipc_perm structure is defined as follows (the highlighted fields are settable using IPC_SET):
struct ipc_perm {
key_t __key; /* Key supplied to msgget(2) */
uid_t uid; /* Effective UID of owner */
gid_t gid; /* Effective GID of owner */
uid_t cuid; /* Effective UID of creator */
gid_t cgid; /* Effective GID of creator */
unsigned short mode; /* Permissions */
unsigned short __seq; /* Sequence number */
};