进程间通信(InterProcess Communication, 简称IPC),是指在不同进程之间交换或传递数据。
以Linux系统为例。
方式一:无名管道(pipe)
无名管道用于从父进程向子进程单向传递数据。
在Linux系统的<unistd.h>头文件中,定义了用于开启一个无名管道的函数pipe()
。
函数原型如下:
int pipe(int* ___pipedes);
这里的pipedes地址,需要传入的实际上是一个2位大小的int型数组地址。我们假设这个数组为pipe_fd[2]
,那么pipe_fd[0]
是管道读取端的文件标识符,pipe_fd[1]
是管道写入端的文件标识符。
以下是一个程序示例。在这个示例中,由该程序通过调用fork()函数创建出一个自身的副本,这个副本会作为该程序所属进程的子进程而存在。
#include <iostream>
#include <unistd.h>
int main()
{
// 创建两个管道的文件标识符,分别标识管道的两头
int pipe_fd[2] = {0};
char buffer[35] = {0};
// pid记录由父线程fork出来的子线程的线程ID
pid_t pid = 0;
if(pipe(pipe_fd) < 0) // pipe函数用于开启管道,传入值是管道文件标识符数组,创建成功后两个文件标识符原地传出,1为写,0为读,是单向的
{
std::cout << "管道创建失败!" << std::endl;
}
if((pid = fork()) < 0) // fork函数用于复制当前线程为一个副本作为当前线程的子线程。这个线程会被安排到管道另一端。
{
std::cout << "子线程创建失败!" << std::endl;
}
else if(pid > 0) // 如果这个线程的pid是大于0的,那么这个线程是父线程,得到的是子线程的pid
{
close(pipe_fd[0]); // 关闭这个进程的读取端管道,父进程只负责写
write(pipe_fd[1], "Hi there, this is DYY.", 22);
}
else // 如果是等于0的,那么这个线程是子线程
{
close(pipe_fd[1]); // 关闭这个进程的写入端管道,子进程只负责读
read(pipe_fd[0], buffer, 35);
std::cout << "从管道另一端读取到数据:" << buffer << std::endl;
}
return 0;
}
方式二:有名管道(FIFO)
FIFO用于在两个没有联系的进程之间单向地传递数据。
在开启FIFO通信之后,Linux会在硬盘中开启一个特殊的文件用于进行FIFO通信。在管道文件所在目录下执行ls -l
之后,可以观察到这个文件的类型属性是p
。
也就是说,FIFO的本质就是系统给两个进程建立了一个文件,写进程在文件里写数据,读进程在文件里读取数据。
这里有一个程序,这个程序的作用是建立一个FIFO文件(即开启一个FIFO有名管道)。
#include <sys/stat.h>
#include <stdio.h>
int main()
{
if(mkfifo("data/fifotest", 0666) != 0)
{
printf("FIFO有名管道创建失败!\n");
}
return 0;
}
注意这一个语句:
mkfifo("data/fifotest", 0666);
这个函数存在于<sys/stat.h>头文件内。函数定义如下:
int mkfifo (const char *__path, __mode_t __mode)
两个传入参数,前一个是FIFO文件的存放路径,后一个是该文件的八进制权限(类型是unsigned int)。
另外,mkfifo本身也是一个Linux命令,可以通过命令的方式建立FIFO文件,但是在这里不展开。
一个FIFO文件创建之后,需要被两个进程打开,其中一个进程以读取形式打开,另外一个进程以写入形式打开。
默认情况下,在其中一个进程通过调用open函数打开FIFO文件之后,程序会阻塞,直到FIFO的另一端也调用了open函数打开了FIFO。但是在open函数中,可以通过调整函数输入参数来进行非阻塞的操作。
open函数需要两个传入参数,前一个是文件位置,后一个是打开参数。
在这里,对应FIFO类型文件有几种打开参数:
O_WRONLY:阻塞式写入(Write only)
O_RDONLY:阻塞式读取(Read only)
O_WRONLY | O_NONBLOCK:非阻塞式写入(Write only | Non_block)
O_RDONLY | O_NONBLOCK:非阻塞式读取(Read only | Non_block)
下面是一个例子。
写入端:
#include <cstdio>
#include <sys/stat.h>
#include <fcntl.h> // File Control Operations
#include <stdlib.h>
#include <unistd.h>
int main()
{
int fd;
char buffer[1024] = "Hi there, this is DYY sending a mail via a FIFO.";
if((fd = open("data/fifotest", O_WRONLY)) < 0)
{
printf("尝试打开文件FIFO文件失败!是否没有运行打开FIFO的程序?\n");
exit(1);
}
if(write(fd, buffer, 48) < 0) // 通过write函数向文件中写入。
// 三个传入参数,第一个是文件描述符(这里的文件描述符应该指向FIFO文件),第二个是要写入的字符串,第三个是写入长度。
{
printf("写入管道失败。\n");
close(fd);
exit(2);
}
// sleep(2);
close(fd);
return 0;
}
读取端:
#include <unistd.h>
#include <sys/stat.h>
#include <stdlib.h>
#include <fcntl.h>
#include <cstdio>
int main()
{
int fd;
char buffer[1024] = {0};
if((fd = open("data/fifotest", O_RDONLY)) < 0)
{
printf("打开FIFO失败。\n");
exit(1);
}
if(read(fd, buffer, 49) < 0) // 通过read函数从文件中读出数据。
// 三个传入参数,第一个是文件描述符(此处应该指向FIFO文件),第二个是存储读取到的数据的存储字符串,第三个是读取最大长度。
{
printf("读取FIFO数据失败!");
close(fd);
exit(2);
}
printf("%s\n", buffer);
// sleep(2);
close(fd);
return 0;
}
在运行这两个程序之前,首先先创建FIFO文件,然后依次运行这两个程序。
运行第一个程序之后,程序会阻塞,直到运行第二个程序(这两个程序的运行顺序无关,一端不论是读还是写永远会等待另一端连接),程序随即结束。
读取端可以读取到这一行信息:
Hi there, this is DYY sending a mail via a FIFO.
使用FIFO的限制是,两端必须提前知道FIFO文件的位置。也就是说,系统要提前给两端的程序提供FIFO接口。
方式三:消息队列
消息队列是存在于内核中的IPC机制,本质上是存储在内核中的一张链表。
消息队列独立于读程序和写程序而存在。不像之前的管道必须要依附于两个进程才能发挥作用,消息队列是解耦合于任何程序的。写程序只需要向消息队列里面扔消息,扔进去就可以跑了;读程序只需要从消息队列里面拿数据,不用等写程序往里面扔。
举个例子,就好像你在宿舍点外卖。外卖员是一个进程,他把外卖标注好你的名字放在校门口就可以跑了;你自己是另一个进程,你只要在校门口根据自己的信息拿掉自己的外卖就可以了。在整个过程中,外卖、外卖员和你自己是三个独立的个体,有联系但是互不影响,完成自己的任务就可以。
因此,消息队列的好处就是,它某种程度上是异步的,两端不需要阻塞。
需要注意的是,经过实验,发送和接收消息需要root权限。
消息队列中,一个消息以一个结构体类型的形式存在。这个结构体的形式如下:
struct msg_buf // 用于投入队列的消息格式,是一个结构体
{
long mtype; // 第一个成员是消息的类型
char mtext[]; // 第二个成员是消息的内容
};
在读写消息的时候,发送端会以这种形式传输自己的消息,接收端以这种形式解读传过来的数据。
这里有一个简单的例子,两个进程之间会做两次握手。
发送端:
// 消息队列进程发送端
#include <iostream>
#include <sys/msg.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <string.h>
struct msg_buf // 用于投入队列的消息格式,是一个结构体
{
long mtype; // 第一个成员是消息的类型
char mtext[256]; // 第二个成员是消息的内容
};
int main()
{
msg_buf buffer;
int msg_iden = msgget(114514, IPC_CREAT); // 创建消息队列
if(msg_iden == -1)
{
std::cout << "消息队列创建失败!程序即将退出。" << std::endl;
exit(1);
}
buffer.mtype = 114; // 设置所要发送的消息类型是114
for(int i = 0; i < 256; i++)
{
buffer.mtext[i] = 0;
}
sprintf(buffer.mtext, "Hi there, this is DYY sending a mail via a message queue through client %d.", getpid());
if(msgsnd(msg_iden, &buffer, strlen(buffer.mtext), 0) == -1) // 发送消息
{
std::cout << "消息发送失败。" << std::endl;
exit(2);
}
int recv_length = msgrcv(msg_iden, &buffer, 256, 514, 0); // 接收返回的消息
if(recv_length == -1)
{
std::cout << "回信接收失败。" << std::endl;
exit(3);
}
else
{
std::cout << "接收到来自另一端的回信:" << std::endl << buffer.mtext << std::endl;
}
return 0;
}
接收端:
// 消息队列进程接收端
#include <iostream>
#include <sys/msg.h>
#include <stdlib.h>
#include <string.h>
struct msg_buf // 用于投入队列的消息格式,是一个结构体
{
long mtype; // 第一个成员是消息的类型
char mtext[256]; // 第二个成员是消息的内容
};
int main()
{
msg_buf buffer;
int msg_iden = msgget(114514, IPC_CREAT); // 创建消息队列
if(msg_iden == -1)
{
std::cout << "消息队列创建失败!程序即将退出。" << std::endl;
exit(1);
}
int recv_stat = msgrcv(msg_iden, &buffer, 256, 114, 0);
if(recv_stat == -1)
{
std::cout << "消息接收错误。" << std::endl;
exit(2);
}
else
{
std::cout << "接收到来自发送端的消息:" << std::endl;
std::cout << buffer.mtext << std::endl;
}
memset(buffer.mtext, 0, 256);
buffer.mtype = 514;
strcpy(buffer.mtext, "Roger. Now replying.");
if(msgsnd(msg_iden, &buffer, sizeof(buffer.mtext), 0) == -1)
{
std::cout << "回信接收失败。" << std::endl;
exit(3);
}
else
{
std::cout << "对方接收已确认。" << std::endl;
}
return 0;
}
发送进程的显示内容如下:
接收到来自另一端的回信:
Roger. Now replying.
接收进程的显示内容如下:
接收到来自发送端的消息:
Hi there, this is DYY sending a mail via a message queue through client 2244.
对方接收已确认。
下面来详细讲这两个程序里用到的消息队列相关的API和参数。再次提醒,调用这些API需要使用root权限。
#include <sys/msg.h>
// 创建或打开消息队列:成功返回队列ID,失败返回-1
int msgget(key_t key, int flag);
// 添加消息:成功返回0,失败返回-1
int msgsnd(int msqid, const void *ptr, size_t size, int flag);
// 读取消息:成功返回消息数据的长度,失败返回-1
int msgrcv(int msqid, void *ptr, size_t size, long type,int flag);
// 控制消息队列:成功返回0,失败返回-1
int msgctl(int msqid, int cmd, struct msqid_ds *buf);
msgget函数:
// 创建或打开消息队列:成功返回队列ID,失败返回-1
int msgget(key_t key, int flag);
这个函数的参数解释如下:
key
: 某个消息队列的名字。msgflg
:由九个权限标志构成,用法和创建文件时使用的mode模式标志是一样的。例如这个程序中所使用到的IPC_CREAT
,表示如果消息队列对象不存在,则创建一个消息队列对象,否则进行打开操作。
如果函数执行成功,就返回一个非负整数,用来表示这个消息队列的ID;如果执行失败,返回-1。
因此,对于成功IPC,比较关键的一点是两者需要有相同的key值。key值可以预先给两个程序定义好,也可以由ftok函数生成。
msgsnd函数
// 添加消息
int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);
这个函数的参数解释如下:
- msqid:由msgget函数生成并返回的消息队列ID。
- msgp:指向消息队列结构体的指针。
- msgsz:需要传输的消息队列结构体中mtext数组的大小。
- msgflg:传输参数。准确的说是控制当前消息队列满时的操作。
默认情况下,消息队列满时,这个函数会等待消息队列腾出空间,但是如果有IPC_NOWAIT参数,那么函数就不会等待,在空间满时直接返回EAGAIN错误。
函数执行成功返回0,否则返回-1。
msgrcv函数
// 读取消息
ssize_t msgrcv(int msqid, void *msgp,
size_t msgsz, long msgtyp, int msgflg);
这个函数的参数解释如下:
-
msqid:由msgget函数生成并返回的消息队列ID。
-
msgp:指向消息队列结构体的指针。
-
msgsz:需要传输的消息队列结构体中mtext数组的最大大小。
-
msgtype:它可以实现接收优先级的简单形式:
-
- msgtype = 0时,返回队列第一条信息;
-
- msgtype > 0时,返回队列第一条类型等于msgtype的消息;
-
- msgtype < 0时,返回队列第一条类型小于等于msgtype绝对值的消息。
-
msgflg:传输参数。 以下是一些传输参数的例子:
-
- MSG_NOERROR:不加该参数时,接收到比msgsz更大的消息时返回E2BIG错误,将消息保留在消息队列中;添加该参数时,按msgsz的大小限制截断。
-
- IPC_NOWAIT:同上,没有消息队列时直接返回ENOMSG错误而不等待。
函数执行成功返回实际放到接收缓冲区里去的字符个数;失败则返回-1。
以下是Linux编程手册中的参数描述,针对于上述两个函数:
The msgflg argument is a bit mask constructed by ORing together zero or more of the following flags:
IPC_NOWAIT
Return immediately if no message of the requested type is in the queue. The system call fails with er‐
rno set to ENOMSG.
MSG_COPY (since Linux 3.8)
Nondestructively fetch a copy of the message at the ordinal position in the queue specified by msgtyp
(messages are considered to be numbered starting at 0).
This flag must be specified in conjunction with IPC_NOWAIT, with the result that, if there is no mes‐
sage available at the given position, the call fails immediately with the error ENOMSG. Because they
alter the meaning of msgtyp in orthogonal ways, MSG_COPY and MSG_EXCEPT may not both be specified in
msgflg.
The MSG_COPY flag was added for the implementation of the kernel checkpoint-restore facility and is
available only if the kernel was built with the CONFIG_CHECKPOINT_RESTORE option.
MSG_EXCEPT
Used with msgtyp greater than 0 to read the first message in the queue with message type that differs
from msgtyp.
MSG_NOERROR
To truncate the message text if longer than msgsz bytes.
msgctl函数
// 控制消息队列的行为
int msgctl(int msqid, int cmd, struct msqid_ds *buf);
这个函数的参数解释如下:
- msqid: 由msgget函数生成并返回的消息队列ID。
- cmd:将要采取的动作,下面会详细讲。
- buf:msqid_ds结构体在<sys/msg.h>的头文件中定义如下:
struct msqid_ds {
struct ipc_perm msg_perm; /* Ownership and permissions */
time_t msg_stime; /* Time of last msgsnd(2) */
time_t msg_rtime; /* Time of last msgrcv(2) */
time_t msg_ctime; /* Time of creation or last
modification by msgctl() */
unsigned long msg_cbytes; /* # of bytes in queue */
msgqnum_t msg_qnum; /* # number of messages in queue */
msglen_t msg_qbytes; /* Maximum # of bytes in queue */
pid_t msg_lspid; /* PID of last msgsnd(2) */
pid_t msg_lrpid; /* PID of last msgrcv(2) */
};
- msg_stime:最后一次调用 msgsnd(2) 系统的时间。
- msg_rtime:最后一次调用 msgrcv(2) 系统的时间。
- msg_ctime:创建队列的时间或者最后一次进行 msgctl() IPC_SET 操作的时间。
- msg_cbytes:当前消息队列中所有消息的字节数。 这是一个非标准的Linux扩展,在POSIX中没有指定。
- msg_qnum:当前消息队列中消息的数量。
- msg_qbytes:在消息队列中允许的消息文本的最大字节数。
- msg_lspid:执行最后一次 msgsnd(2) 系统调用的进程的 ID。
- msg_lrpid:执行最后一次 msgrcv(2) 系统调用的进程的 ID。
- ipc_perm结构体定义如下:
struct ipc_perm {
key_t __key; /* Key supplied to msgget(2) */
uid_t uid; /* Effective UID of owner */
gid_t gid; /* Effective GID of owner */
uid_t cuid; /* Effective UID of creator */
gid_t cgid; /* Effective GID of creator */
unsigned short mode; /* Permissions */
unsigned short __seq; /* Sequence number */
};
mode的定义如下:
0400 Read by user
0200 Write by user
0040 Read by group
0020 Write by group
0004 Read by others
0002 Write by others
下面来讲cmd的三个命令:
- IPC_STAT:获取当前消息队列的状态,并赋值给一个buf;
- IPC_SET:将buf中预设的状态赋值给当前的消息队列;
- IPC_RMID:删除消息队列。buf传入空指针。
方式四:共享内存
共享内存可能可以说是最好理解也是最直接的办法。原理实际上和多线程并发时的共享内存类似,因此同样需要注意进程同步。
shmget函数
函数原型如下:
// 创建或者获取一个共享内存。
int shmget(key_t key, size_t size, int shmflg);
- key:同之前的消息队列一样,两端需要同一个key来访问同一段共享内存。
- size:共享内存的大小。
- shmflg:参数。常见参数如下:
-
- IPC_CREAT:创建或根据key访问已知的共享内存。
-
- IPC_EXCL:与IPC_CREAT一起使用,如果已经存在,不访问这段内存,直接返回错误。
如果函数执行成功,返回共享内存的ID,如果执行失败返回-1。
shmat函数
函数原型如下:
// 连接到共享内存。
void *shmat(int shmid, const void *shmaddr, int shmflg);
- shmid:shmget函数成功执行返回的共享内存ID。
- shmaddr:
-
- 如果传入指针为空,则由系统自动分配内存;
-
- 如果传入指针非空,在没有SHM_RND参数的情况下,必须传入一个页面对齐的地址,让共享内存连接到这个地址上;有上述参数时,系统会自动在这个地址周围寻找一个页面对齐的可用地址。
- shmflg:
-
- SHM_RND参数见上。
-
- SHM_RDONLY:将内存设置为只读。
-
- SHM_EXEC:使放入内存的数据可执行。
shmdt函数
函数原型如下:
// 断开与共享内存的连接。
int shmdt(const void *shmaddr);
传入共享内存地址。成功返回0,失败返回-1。
注意,这个函数不会物理删除这段内存空间,只是解绑而已。
shmctl函数
函数原型如下:
// 控制共享内存相关信息。
int shmctl(int shmid, int cmd, struct shmid_ds *buf);
- shmid:共享内存ID。
- cmd:控制命令,下文详细讲。
- buf:一个shmid_ds类型的缓冲区指针。
结构体定义如下:
struct shmid_ds {
struct ipc_perm shm_perm; /* Ownership and permissions */
size_t shm_segsz; /* Size of segment (bytes) */
time_t shm_atime; /* Last attach time */
time_t shm_dtime; /* Last detach time */
time_t shm_ctime; /* Creation time/time of last
modification via shmctl() */
pid_t shm_cpid; /* PID of creator */
pid_t shm_lpid; /* PID of last shmat(2)/shmdt(2) */
shmatt_t shm_nattch; /* No. of current attaches */
...
};
ipc_perm结构体的定义见上。可用命令也与上方类似。
共享内存需要搭配信号量使用,但是信号量本身的概念需要一定的学习才可以理解。因此信号量的部分会放在下半节中。