阻塞 / 非阻塞 VS 同步 / 异步
I/O 模型:
- 第一种阻塞 I/O 就是你去了书店,告诉老板你想要某本书,然后你就一直在那里等着,直到书店老板翻箱倒柜找到你想要的书。
- 第二种非阻塞 I/O 类似于你去了书店,问老板有没有一本书,老板告诉你没有,你就离开了。一周以后,你又来这个书店,再问这个老板,老板一查,有了,于是你买了这本书。
- 第三种基于非阻塞的 I/O 多路复用,你来到书店告诉老板:“老板,到货给我打电话吧,我再来付钱取书。”
- 第四种异步 I/O 就是你连去书店取书的过程也想省了,你留下地址,付了书费,让老板到货时寄给你,你直接在家里拿到就可以看了。
阻塞 I/O
阻塞 I/O 发起的 read 请求,线程会被挂起,一直等到内核数据准备好,并把数据从内核区域拷贝到应用程序的缓冲区中,当拷贝过程完成,read 请求调用才返回。接下来,应用程序就可以对缓冲区的数据进行数据解析。
非阻塞 I/O
非阻塞的 read 请求在数据未准备好的情况下立即返回,应用程序可以不断轮询内核,直到数据准备好,内核将数据拷贝到应用程序缓冲,并完成这次 read 调用。注意,这里最后一次 read 调用,获取数据的过程,是一个同步的过程。这里的同步指的是内核区域的数据拷贝到缓冲区的这个过程。
多路复用 + 非阻塞I/O
每次让应用程序去轮询内核的 I/O 是否准备好,是一个不经济的做法,因为在轮询的过程中应用进程啥也不能干。于是,像 select、poll 这样的 I/O 多路复用技术就隆重登场了。通过 I/O 事件分发,当内核数据准备好时,再通知应用程序进行操作。这个做法大大改善了应用进程对 CPU 的利用率,在没有被通知的情况下,应用进程可以使用 CPU 做其他的事情。
注意,这里 read 调用,获取数据的过程,也是一个同步的过程。
第一种阻塞 I/O 我想你已经比较了解了,在阻塞 I/O 的情况下,应用程序会被挂起,直到获取数据。第二种非阻塞 I/O 和第三种基于非阻塞 I/O 的多路复用技术,获取数据的操作不会被阻塞。
无论是第一种阻塞 I/O,还是第二种非阻塞 I/O,第三种基于非阻塞 I/O 的多路复用都是同步调用技术。
为什么这么说呢?
因为同步调用、 异步调用 的说法,是对于获取数据的过程而言的,前面几种最后获取数据的 read 操作调用,都是同步的,在 read 调用时, 内核 将数据从内核空间拷贝到应用程序空间,这个过程是在 read 函数中同步进行的,如果内核实现的拷贝效率很差,read 调用就会在这个同步过程中消耗比较长的时间。
异步I/O
而真正的异步调用则不用担心这个问题,我们接下来就来介绍第四种 I/O 技术,当我们发起 aio_read 之后,就立即返回,内核自动将数据从内核空间拷贝到应用程序空间,这个拷贝过程是异步的,内核自动完成的,和前面的同步操作不一样,应用程序并不需要主动发起拷贝动作。
IO多路复用:select poll epoll
select
返回int:若有就绪描述符则为其数目,若超时则为0,若出错则为-1
int select(int maxfd, // 待测试的描述符基数
fd_set *readset, // 读描述符集合
fd_set *writeset, // 写描述符集合
fd_set *exceptset, // 异常描述符集合
const struct timeval *timeout // 本次调用的超时时间
);
如何设置这些描述符集合呢?以下的宏可以帮助到我们。
void FD_ZERO(fd_set *fdset);
void FD_SET(int fd, fd_set *fdset);
void FD_CLR(int fd, fd_set *fdset);
int FD_ISSET(int fd, fd_set *fdset);
FD_ZERO 用来将这个向量的所有元素都设置成 0;
FD_SET 用来把对应套接字 fd 的元素,a[fd]设置成 1;
FD_CLR 用来把对应套接字 fd 的元素,a[fd]设置成 0;
FD_ISSET 对这个向量进行检测,判断出对应套接字的元素 a[fd]是 0 还是 1。
struct timeval {
long tv_sec; /* seconds */
long tv_usec; /* microseconds */
};
这个参数设置成不同的值,会有不同的可能:
第一个可能是设置成空 (NULL),表示如果没有 I/O 事件发生,则 select 一直等待下去。
第二个可能是设置一个非零的值,这个表示等待固定的一段时间后从 select 阻塞调用中返回。
第三个可能是将 tv_sec 和 tv_usec 都设置成 0,表示根本不等待,检测完毕立即返回。
这种情况使用得比较少。
select阻塞后,会在内核空间循环遍历fd,直到遍历到某个fd数据就绪,打上标记(记录到fd_set里面),返回就绪fd的数量。
在遍历完一次内核fd之后,如果发现没有fd就绪,并不会马上遍历第二次(因为会占用大量的cpu进行遍历)。这个时候select会阻塞当前用户进程,当客户端向服务端发送数据时,数据会通过网络传输到达服务端的网卡,网卡会通过DMA的方式将这个数据包写入到指定的内存中。处理完成之后会通过中断信号告诉cpu有新的数据包到达,cpu收到中断信号后会响应中断,然后调用中断的处理程序进行处理:
- 首先就是根据数据包的ip和端口号找到对应的socket,然后把数据保存到socket的接收队列
- 然后再检查这个socket对应的一个等待队列里面是否有进程正在阻塞等待
- 如果有的话,则会唤醒该进程,然后我们用户进程唤醒之后,则会继续遍历检查fd集合。
返回后用户空间也不知道是哪个fd准备就绪,也需要遍历,找到那个就绪的fd,然后进行处理。
总结
将socket是否就绪检查逻辑下沉到操作系统层面,避免大量系统调用。告诉你有事件就绪,但是没有告诉你具体是哪个FD。
优点:
- 不需要每个fd都进行一次系统调用,解决了频繁的用户态内核态切换问题。
缺点:
- 单进程监听的FD存在限制,默认1024
- 每次调用需要将FD从用户态拷贝到内核态
- 不知道具体是哪个文件描述符就绪,需要遍历全部文件描述符
- 入参的3个fd_set集合每次调用都需要重置
例子
int main(int argc, char **argv) {
if (argc != 2) {
error(1, 0, "usage: select01 <IPaddress>");
}
int socket_fd = tcp_client(argv[1], SERV_PORT);
char recv_line[MAXLINE], send_line[MAXLINE];
int n;
fd_set readmask; // 值,不是指针
fd_set allreads; // 值,不是指针
FD_ZERO(&allreads); // 12 行通过FD_ZERO 初始化了一个描述符集合,这个描述符读集合是空的:
FD_SET(0, &allreads); // 使用 FD_SET 将描述符 0,即标准输入
FD_SET(socket_fd, &allreads); //以及连接套接字描述符 3 设置为待检测
//接下来的 16-51 行是循环检测,这里我们没有阻塞在 fgets 或 read 调用,而是通过 select 来检测套接字描述字有数据可读,或者标准输入有数据可读。比如,当用户通过标准输入使得标准输入描述符可读时,返回的 readmask 的值为:[0 0 0 1]
for (;;) {
readmask = allreads; // 第 17 行是每次测试完之后,重新设置待测试的描述符集合。
int rc = select(socket_fd + 1, &readmask, NULL, NULL, NULL); //第 18 行则是使用 socket_fd+1 来表示待测试的描述符基数。切记需要 +1。
if (rc <= 0) {
error(1, errno, "select failed");
}
// 如果是连接描述字准备好可读了,第 24 行判断为真,使用 read 将套接字数据读出。
if (FD_ISSET(socket_fd, &readmask)) {
n = read(socket_fd, recv_line, MAXLINE);
if (n < 0) {
error(1, errno, "read error");
} else if (n == 0) {
error(1, 0, "server terminated \n");
}
recv_line[n] = 0;
fputs(recv_line, stdout);
fputs("\n", stdout);
}
// 可以使用 FD_ISSET 来判断哪个描述符准备好可读了。这个时候是标准输入可读,37-51 行程序读入后发送给对端。
if (FD_ISSET(STDIN_FILENO, &readmask)) {
if (fgets(send_line, MAXLINE, stdin) != NULL) {
int i = strlen(send_line);
if (send_line[i - 1] == '\n') {
send_line[i - 1] = 0;
}
printf("now sending %s\n", send_line);
size_t rt = write(socket_fd, send_line, strlen(send_line));
if (rt < 0) {
error(1, errno, "write failed ");
}
printf("send bytes: %zu \n", rt);
}
}
}
}
12行:
13 14行:
36行,当用户通过标准输入使得标准输入描述符可读时,返回的 readmask 的值为:
poll
int poll(struct pollfd *fds, // 要监听的文件描述符集合
unsigned long nfds, // 文件描述符数量
int timeout // 本次调用的超时时间
);
返回值int:大于0:就绪描述符则为其数目。等于0:超时。小于0:出错则为-1。
struct pollfd {
int fd; // 监听的文件描述符
short events; // 监听的事件
short revents; // 就绪的事件
};
这个结构体由三个部分组成,首先是描述符 fd,然后是描述符上待检测的事件类型 events,
注意这里的 events 可以表示多个不同的事件,具体的实现可以通过使用二进制掩码位操作来完成,
events事件分类:
1. 可读事件
#define POLLIN 0x0001 /* any readable data available */
#define POLLPRI 0x0002 /* OOB/Urgent readable data */
#define POLLRDNORM 0x0040 /* non-OOB/URG data available */
#define POLLRDBAND 0x0080 /* OOB/Urgent readable data */
2. 可写事件
#define POLLOUT 0x0004 /* file descriptor is writeable */
#define POLLWRNORM POLLOUT /* no write type differentiation */
#define POLLWRBAND 0x0100 /* OOB/Urgent data can be written */
还有另一大类事件,没有办法通过 poll 向系统内核递交检测请求,
只能通过“returned events”来加以检测,这类事件是各种错误事件。
#define POLLERR 0x0008 /* 一些错误发送 */
#define POLLHUP 0x0010 /* 描述符挂起*/
#define POLLNVAL 0x0020 /* 请求的事件无效*/
总结
跟select基本类似,主要优化了监听1024的限制。底层使用的是动态 数组存储监听文件描述符,所以没有1024的限制,也不需要每次重置fd_set。
在 poll 函数里,我们可以控制 pollfd 结构的 数组 大小,这意味着我们可以突破原来 select 函数最大描述符的限制,在这种情况下,应用程序调用者需要分配 pollfd 数组并通知 poll 函数该数组的大小。
优点:
- 不需要每个FD都进行一次系统调用,导致频繁的用户态内核态切换
- 因为可以突破 select 文件描述符的个数限制,在高并发的场景下尤其占优势。
缺点:
- 每次需要将FD从用户态拷贝到内核态
- 不知道具体是哪个文件描述符就绪,需要遍历全部文件描述符
例子
#define INIT_SIZE 128
int main(int argc, char **argv) {
int listen_fd, connected_fd;
int ready_number;
ssize_t n;
char buf[MAXLINE];
struct sockaddr_in client_addr;
listen_fd = tcp_server_listen(SERV_PORT);//一开始需要创建一个监听套接字,并绑定在本地的地址和端口上
//初始化pollfd数组,这个数组的第一个元素是listen_fd,其余的用来记录将要连接的connect_fd
struct pollfd event_set[INIT_SIZE];//在第 13 行,我初始化了一个 pollfd 数组,并命名为 event_set,之所以叫这个名字,是引用 pollfd 数组确实代表了检测的事件集合。这里数组的大小固定为 INIT_SIZE,这在实际的生产环境肯定是需要改进的。
event_set[0].fd = listen_fd;//14-15 行,将监听套接字 listen_fd 和对应的 POLLRDNORM 事件加入到 event_set 里
event_set[0].events = POLLRDNORM;
// 用-1表示这个数组位置还没有被占用
int i;
for (i = 1; i < INIT_SIZE; i++) { //如果对应 pollfd 里的文件描述字 fd 为负数,poll 函数将会忽略这个 pollfd,
event_set[i].fd = -1;//所以我们在第 18-21 行将 event_set 数组里其他没有用到的 fd 统统设置为 -1。这里 -1 也表示了当前 pollfd 没有被使用的意思。
}
for (;;) {
if ((ready_number = poll(event_set, INIT_SIZE, -1)) < 0) {
error(1, errno, "poll failed ");
}
if (event_set[0].revents & POLLRDNORM) {
socklen_t client_len = sizeof(client_addr);
connected_fd = accept(listen_fd, (struct sockaddr *) &client_addr, &client_len);//调用 accept 函数获取了连接描述字
//找到一个可以记录该连接套接字的位置
for (i = 1; i < INIT_SIZE; i++) {
if (event_set[i].fd < 0) {
event_set[i].fd = connected_fd;
event_set[i].events = POLLRDNORM;
break;
}
}
if (i == INIT_SIZE) {//如果在数组里找不到这样一个位置,说明我们的 event_set 已经被很多连接充满了,没有办法接收更多的连接了,这就是第 41-42 行所做的事情。
error(1, errno, "can not hold so many clients");
}
if (--ready_number <= 0) //第 45-46 行是一个加速优化能力,因为 poll 返回的一个整数,说明了这次 I/O 事件描述符的个数,如果处理完监听套接字之后,就已经完成了这次 I/O 复用所要处理的事情,那么我们就可以跳过后面的处理,再次进入 poll 调用。
continue;
}
//接下来的循环处理是查看event_set里面其他的事件,也就是已连接套接字的可读事件。这是通过遍历 event_set 数组来完成的。
for (i = 1; i < INIT_SIZE; i++) {
int socket_fd;
if ((socket_fd = event_set[i].fd) < 0)//如果数组里的 pollfd 的 fd 为 -1,说明这个 pollfd 没有递交有效的检测,直接跳过;
continue;
if (event_set[i].revents & (POLLRDNORM | POLLERR)) {//来到第 53 行,通过检测 revents 的事件类型是 POLLRDNORM 或者 POLLERR,我们可以进行读操作。
if ((n = read(socket_fd, buf, MAXLINE)) > 0) {//在第 54 行,读取数据正常之后,再通过 write 操作回显给客户端;
if (write(socket_fd, buf, n) < 0) {
error(1, errno, "write error");
}
} else if (n == 0 || errno == ECONNRESET) {//在第 58 行,如果读到 EOF 或者是连接重置,则关闭这个连接,并且把 event_set 对应的 pollfd 重置;
close(socket_fd);
event_set[i].fd = -1;
} else {
error(1, errno, "read error");
}
if (--ready_number <= 0)//和前面的优化加速处理一样,第 65-66 行是判断如果事件已经被完全处理完之后,直接跳过对 event_set 的循环处理,再次来到 poll 调用。
break;
}
}
}
}
epoll
epoll_create
epoll_create() 方法创建了一个 epoll 实例,从 Linux 2.6.8 开始,参数 size 被自动忽略,但是该值仍需要一个大于 0 的整数。这个 epoll 实例被用来调用 epoll_ctl 和 epoll_wait,如果这个 epoll 实例不再需要,比如服务器正常关机,需要调用 close() 方法释放 epoll 实例,这样系统内核可以回收 epoll 实例所分配使用的内核资源。
关于这个参数 size,在一开始的 epoll_create 实现中,是用来告知内核期望监控的文件描述字大小,然后内核使用这部分的信息来初始化内核数据结构,在新的实现中,这个参数不再被需要,因为内核可以动态分配需要的内核数据结构。我们只需要注意,每次将 size 设置成一个大于 0 的整数就可以了。
int epoll_create(int size);
返回值: 若成功返回一个大于0的值,表示epoll实例;若返回-1表示出错
epoll_ctl
在创建完 epoll 实例之后,可以通过调用 epoll_ctl 往这个 epoll 实例增加或删除监控的事件。
返回值: 若成功返回0;若返回-1表示出错
int epoll_ctl(int epfd, //poll_create创建的epoll实例描述字,可以简单理解成是 epoll 句柄。
int op, //表示增加还是删除一个监控事件
int fd, //注册的事件的文件描述符,比如一个监听套接字
struct epoll_event *event //注册的事件类型
);
op参数三个选项:
1.EPOLL_CTL_ADD: 向 epoll 实例注册文件描述符对应的事件;
2.EPOLL_CTL_DEL:向 epoll 实例删除文件描述符对应的事件;
3.EPOLL_CTL_MOD: 修改文件描述符对应的事件。
typedef union epoll_data {
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;
struct epoll_event {
uint32_t events; // epoll事件类型
epoll_data_t data; /* User data variable */
};
epoll事件类型:
1.EPOLLIN:表示对应的文件描述字可以读;
2.EPOLLOUT:表示对应的文件描述字可以写;
3.EPOLLRDHUP:表示套接字的一端已经关闭,或者半关闭;
4.EPOLLHUP:表示对应的文件描述字被挂起;
5.EPOLLET:设置为 edge-triggered,默认为 level-triggered。
epoll_wait
epoll_wait() 函数类似之前的 poll 和 select 函数,调用者进程被挂起,在等待内核 I/O 事件的分发。
第二个参数epoll_event:
- 返回给用户空间需要处理的 I/O 事件,这是一个数组,数组的大小由 epoll_wait 的返回值决定。
- 这个数组的每个元素都是一个需要待处理的 I/O 事件,其中 events 表示具体的事件类型,事件类型取值和 epoll_ctl 可设置的值一样。
- 这个 epoll_event 结构体里的 data 值就是在 epoll_ctl 那里设置的 data,也就是用户空间和内核空间调用时需要的数据。
返回值: 成功返回的是一个大于0的数,表示事件的个数;返回0表示的是超时时间到;若出错返回-1.
int epoll_wait(int epfd, // epoll 实例描述字,也就是 epoll 句柄。
struct epoll_event *events, //返回给用户空间需要处理的 I/O 事件,这是一个数组
int maxevents, //一个大于 0 的整数,表示 epoll_wait 可以返回的最大事件值。
int timeout//第四个参数是 epoll_wait 阻塞调用的超时值,如果这个值设置为 -1,表示不超时;
// 如果设置为 0 则立即返回,即使没有任何 I/O 事件发生。
);
edge-triggered VS level-triggered
第一个程序我们设置为 edge-triggered,即边缘触发。开启这个服务器程序,用 telnet 连接上,输入一些字符,我们看到,服务器端只从 epoll_wait 中苏醒过一次,就是第一次有数据可读的时候。
$./epoll02
epoll_wait wakeup
epoll_wait wakeup
get event on socket fd == 5
第二个程序我们设置为 level-triggered,即条件触发。然后按照同样的步骤来一次,观察服务器端,这一次我们可以看到,服务器端不断地从 epoll_wait 中苏醒,告诉我们有数据需要读取。
$./epoll03
epoll_wait wakeup
epoll_wait wakeup
get event on socket fd == 5
epoll_wait wakeup
get event on socket fd == 5
epoll_wait wakeup
get event on socket fd == 5
epoll_wait wakeup
get event on socket fd == 5
...
- 条件触发的意思是只要满足事件的条件,比如有数据需要读,就一直不断地把这个事件传递给用户;
- 边缘触发的意思是只有第一次满足条件的时候才触发,之后就不会再传递同样的事件了。
一般我们认为,边缘触发的效率比条件触发的效率要高,这一点也是 epoll 的杀手锏之一。
例子
#include "lib/common.h"
#define MAXEVENTS 128
char rot13_char(char c) {
if ((c >= 'a' && c <= 'm') || (c >= 'A' && c <= 'M'))
return c + 13;
else if ((c >= 'n' && c <= 'z') || (c >= 'N' && c <= 'Z'))
return c - 13;
else
return c;
}
int main(int argc, char **argv) {
int listen_fd, socket_fd;
int n, i;
int efd;
struct epoll_event event;
struct epoll_event *events;
listen_fd = tcp_nonblocking_server_listen(SERV_PORT);
efd = epoll_create1(0);//第 23 行调用 epoll_create0 创建了一个 epoll 实例。
if (efd == -1) {
error(1, errno, "epoll create failed");
}
//28-32 行,调用 epoll_ctl 将监听套接字对应的 I/O 事件进行了注册,这样在有新的连接建立之后,就可以感知到。
event.data.fd = listen_fd;
event.events = EPOLLIN | EPOLLET;//注意这里使用的是 edge-triggered(边缘触发)。
if (epoll_ctl(efd, EPOLL_CTL_ADD, listen_fd, &event) == -1) {
error(1, errno, "epoll_ctl add listen fd failed");
}
/* Buffer where events are returned */
events = calloc(MAXEVENTS, sizeof(event));//35 行,为返回的 event 数组分配了内存。
while (1) {
n = epoll_wait(efd, events, MAXEVENTS, -1);//主循环调用 epoll_wait 函数分发 I/O 事件,当 epoll_wait 成功返回时,
printf("epoll_wait wakeup\n");
for (i = 0; i < n; i++) {//通过遍历返回的 event 数组,就直接可以知道发生的 I/O 事件。
if ((events[i].events & EPOLLERR) ||//第 41-46 行判断了各种错误情况。
(events[i].events & EPOLLHUP) ||
(!(events[i].events & EPOLLIN))) {
fprintf(stderr, "epoll error\n");
close(events[i].data.fd);
continue;
} else if (listen_fd == events[i].data.fd) {//第 47-61 行是监听套接字上有事件发生的情况下,。
struct sockaddr_storage ss;
socklen_t slen = sizeof(ss);
int fd = accept(listen_fd, (struct sockaddr *) &ss, &slen);//调用 accept 获取已建立连接,
if (fd < 0) {
error(1, errno, "accept failed");
} else {
make_nonblocking(fd); //并将该连接设置为非阻塞,
event.data.fd = fd;
event.events = EPOLLIN | EPOLLET; //edge-triggered
if (epoll_ctl(efd, EPOLL_CTL_ADD, fd, &event) == -1) {//再调用 epoll_ctl 把已连接套接字对应的可读事件注册到 epoll 实例中
error(1, errno, "epoll_ctl add connection fd failed");//这里我们使用了 event_data 里面的 fd 字段,将连接套接字存储其中。
}
}
continue;
} else {//第 63-84 行,处理了已连接套接字上的可读事件,读取字节流,编码后再回应给客户端。
socket_fd = events[i].data.fd;
printf("get event on socket fd == %d \n", socket_fd);
while (1) {
char buf[512];
if ((n = read(socket_fd, buf, sizeof(buf))) < 0) {
if (errno != EAGAIN) {
error(1, errno, "read error");
close(socket_fd);
}
break;
} else if (n == 0) {
close(socket_fd);
break;
} else {
for (i = 0; i < n; ++i) {
buf[i] = rot13_char(buf[i]);
}
if (write(socket_fd, buf, n) < 0) {
error(1, errno, "write error");
}
}
}
}
}
}
free(events);
close(listen_fd);
}
注意:listen_fd 和 io通信的fd是不一样的,fd要通过accept从listen_fd里面拿
流程分析
epoll_create会创建出一个eventpoll,里面3个参数,
- wq等待队列,主要是被阻塞的待唤醒的线程,等待被事件唤醒。
- rdllist就绪列表,fd对应的socket如果是就绪的数据已经到了,就添加到就绪列表里面。
- rbr红黑树,要监听的文件描述符通过红黑树进行存储,高效的增删改查。
epoll_ctl
-
fd拷贝到内核空间,并封装成epitem
- rbn关联到红黑树
- ffd关联到就绪列表
- ep关联到epoll对象
- pwqlist关联到回调函数,用来唤醒进程,执行后面的流程
epoll_wait
如果就绪队列为空:
当有数据来时
用户空间收到事件,进行处理
如果就绪队列不为空:
返回就绪事件,不会阻塞进程
总结
高效处理高并发下的大量连接,同时有非常优异的性能
缺点:
- 跨平台性不够好,只支持linux,macOS等操作系统不支持
- 相较于epoll,select更轻量可移植性更强
- 在监听连接数和事件较少的场景下,select可能更优