目录
I/O 多路转接之 poll
poll 函数接口
参数说明
events 和 revents 的取值:
返回结果
socket 就绪条件
poll 的优点
poll 的缺点
poll 示例: 使用 poll 监控标准输入
I/O 多路转接之 epoll
epoll 初识
epoll 的相关系统调用
epoll_create
epoll_ctl
epoll_wait
epoll 工作原理
epoll 的优点(和 select 的缺点对应)
对比总结 select, poll, epoll 之间的优点和缺点(重要).
总结对比表
epoll 工作方式
例子
水平触发 Level Triggered 工作模式
边缘触发 Edge Triggered 工作模式
对比 LT 和 ET
理解 ET 模式和非阻塞文件描述符
epoll 的使用场景
epoll 中的惊群问题
1. 惊群效应产生的原因
2. 惊群问题的解决方法
2.1 多线程环境下的解决方法
2.2 多进程环境下的解决方法
lighttpd 的做法:无视惊群
nginx 的做法:使用全局锁确保同一时刻只有一个子进程在等待
epoll 示例: epoll 服务器(LT 模式)
tcp_epoll_server.hpp
epoll 示例: epoll 服务器(ET 模式)
tcp_socket.hpp
tcp_epoll_server.hpp
I/O 多路转接之 poll
poll 函数接口
#include <poll.h> int poll(struct pollfd *fds, nfds_t nfds, int timeout); // pollfd 结构 struct pollfd { int fd; /* file descriptor */ short events; /* requested events */ short revents; /* returned events */ };参数说明
fds 是一个 poll 函数监听的结构列表. 每一个元素中, 包含了三部分内容: 文件描述符, 监听的事件集合, 返回的事件集合.
nfds 表示 fds 数组的长度.
timeout 表示 poll 函数的超时时间, 单位是毫秒(ms).
events 和 revents 的取值:
| 事 件 | 描 述 | 是否可作为输入 | 是否可作为输出 |
|---|---|---|---|
| POLLIN | 数据(包括普通数据和优先数据)可读 | 是 | 是 |
| POLLRDNORM | 普通数据可读 | 是 | 是 |
| POLLRDBAND | 优先级带数据可读(Linux 不支持) | 是 | 是 |
| POLLPRI | 高优先级数据可读,比如 TCP 带外数据 | 是 | 是 |
| POLLOUT | 数据(包括普通数据和优先数据)可写 | 是 | 是 |
| POLLWRNORM | 普通数据可写 | 是 | 是 |
| POLLWRBAND | 优先级带数据可写 | 是 | 是 |
| POLLRDHUP | TCP 连接被对方关闭,或者对方关闭了写操作。它由 GNU 引入 | 是 | 是 |
| POLLERR | 错误 | 否 | 是 |
| POLLHUP | 挂起。比如管道的写端被关闭后,该端描述符上将收到 POLLHUP 事件 | 否 | 是 |
| POLLNVAL | 文件描述符没有打开 | 否 | 是 |
返回结果
返回值小于 0, 表示出错;
返回值等于 0, 表示 poll 函数等待超时;
返回值大于 0, 表示 poll 由于监听的文件描述符就绪而返回.
socket 就绪条件
同 select12、多路转接 select-CSDN博客
poll 的优点
不同于 select 使用三个位图来表示三个 fdset 的方式, poll 使用一个 pollfd 的指针实现.
pollfd 结构包含了要监视的 event 和发生的 event, 不再使用 select"参数-值"传递的方式, 接口使用比 select 更方便.
poll 并没有最大数量限制 (但是数量过大后性能也是会下降).
poll 的缺点
poll 中监听的文件描述符数目增多时
和 select 函数一样, poll 返回后, 需要轮询 pollfd 来获取就绪的描述符.
每次调用 poll 都需要把大量的 pollfd 结构从用户态拷贝到内核中.
同时连接的大量客户端在一时刻可能只有很少的处于就绪状态, 因此随着监视的描述符数量的增长, 其效率也会线性下降.
poll 示例: 使用 poll 监控标准输入
#include <poll.h> #include <unistd.h> #include <stdio.h> int main() { struct pollfd poll_fd; poll_fd.fd = 0; poll_fd.events = POLLIN; for (;;) { int ret = poll(&poll_fd, 1, 1000); if (ret < 0) { perror("poll"); continue; } if (ret == 0) { printf("poll timeout\n"); continue; } if (poll_fd.revents == POLLIN) { char buf[1024] = {0}; read(0, buf, sizeof(buf) - 1); printf("stdin:%s", buf); } } }I/O 多路转接之 epoll
epoll 初识
按照 man 手册的说法:是为处理大批量句柄而作了改进的 poll.
它是在 2.5.44 内核中被引进的(epoll(4) is a new API introduced in Linux kernel 2.5.44)
它几乎具备了之前所说的一切优点, 被公认为 Linux2.6 下性能最好的多路 I/O 就绪通知方法.
epoll 的相关系统调用
epoll 有 3 个相关的系统调用.
epoll_create
int epoll_create(int size);创建一个 epoll 的句柄.
自从 linux2.6.8 之后, size 参数是被忽略的.
用完之后, 必须调用 close()关闭.
epoll_ctl
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);epoll 的事件注册函数.
它不同于 select()是在监听事件时告诉内核要监听什么类型的事件, 而是在这里先注册要监听的事件类型.
第一个参数是 epoll_create()的返回值(epoll 的句柄).
第二个参数表示动作, 用三个宏来表示.
第三个参数是需要监听的 fd.
第四个参数是告诉内核需要监听什么事.
第二个参数的取值:
EPOLL_CTL_ADD: 注册新的 fd 到 epfd 中;
EPOLL_CTL_MOD: 修改已经注册的 fd 的监听事件;
EPOLL_CTL_DEL: 从 epfd 中删除一个 fd;
struct epoll_event 结构如下:
events 可以是以下几个宏的集合:
EPOLLIN : 表示对应的文件描述符可以读 (包括对端 SOCKET 正常关闭);
EPOLLOUT : 表示对应的文件描述符可以写;
EPOLLPRI : 表示对应的文件描述符有紧急的数据可读 (这里应该表示有带外数据到来);
EPOLLERR : 表示对应的文件描述符发生错误;
EPOLLHUP : 表示对应的文件描述符被挂断;
EPOLLET : 将 EPOLL 设为边缘触发(Edge Triggered)模式, 这是相对于水平触发(Level Triggered)来说的.
EPOLLONESHOT : 只监听一次事件, 当监听完这次事件之后, 如果还需要继续监听这个 socket 的话, 需要再次把这个 socket 加入到 EPOLL 队列里.
epoll_wait
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);收集在 epoll 监控的事件中已经发送的事件.
参数 events 是分配好的 epoll_event 结构体数组.
epoll 将会把发生的事件赋值到 events 数组中 (events 不可以是空指针, 内核只负责把数据复制到这个 events 数组中, 不会去帮助我们在用户态中分配内存).
maxevents 告之内核这个 events 有多大, 这个 maxevents 的值不能大于创建 epoll_create()时的 size.
参数 timeout 是超时时间 (毫秒, 0 会立即返回, -1 是永久阻塞).
如果函数调用成功, 返回对应 I/O 上已准备好的文件描述符数目, 如返回 0 表示已超时, 返回小于 0 表示函数失败.
epoll 工作原理
当某一进程调用 epoll_create 方法时, Linux 内核会创建一个 eventpoll 结构体, 这个结构体中有两个成员与 epoll 的使用方式密切相关.
struct eventpoll{ .... /*红黑树的根节点,这颗树中存储着所有添加到 epoll 中的需要监控的事件*/ struct rb_root rbr; /*双链表中则存放着将要通过 epoll_wait 返回给用户的满足条件的事件*/ struct list_head rdlist; .... };每一个 epoll 对象都有一个独立的 eventpoll 结构体, 用于存放通过 epoll_ctl 方法向 epoll 对象中添加进来的事件.
这些事件都会挂载在红黑树中, 如此, 重复添加的事件就可以通过红黑树而高效的识别出来(红黑树的插入时间效率是 lgn, 其中 n 为树的高度).
而所有添加到 epoll 中的事件都会与设备(网卡)驱动程序建立回调关系, 也就是说, 当响应的事件发生时会调用这个回调方法.
这个回调方法在内核中叫 ep_poll_callback,它会将发生的事件添加到 rdlist 双链表中.
在 epoll 中, 对于每一个事件, 都会建立一个 epitem 结构体.
struct epitem{ struct rb_node rbn;//红黑树节点 struct list_head rdllink;//双向链表节点 struct epoll_filefd ffd; //事件句柄信息 struct eventpoll *ep; //指向其所属的 eventpoll 对象 struct epoll_event event; //期待发生的事件类型 }当调用 epoll_wait 检查是否有事件发生时, 只需要检查 eventpoll 对象中的 rdlist 双链表中是否有 epitem 元素即可.
如果 rdlist 不为空, 则把发生的事件复制到用户态, 同时将事件数量返回给用户. 这个操作的时间复杂度是 O(1).
总结一下, epoll 的使用过程就是三部曲:
调用 epoll_create 创建一个 epoll 句柄;
调用 epoll_ctl, 将要监控的文件描述符进行注册;
调用 epoll_wait, 等待文件描述符就绪;
epoll 的优点(和 select 的缺点对应)
接口使用方便: 虽然拆分成了三个函数, 但是反而使用起来更方便高效. 不需要每次循环都设置关注的文件描述符, 也做到了输入输出参数分离开
数据拷贝轻量: 只在合适的时候调用 EPOLL_CTL_ADD 将文件描述符结构拷贝到内核中, 这个操作并不频繁(而 select/poll 都是每次循环都要进行拷贝)
事件回调机制: 避免使用遍历, 而是使用回调函数的方式, 将就绪的文件描述符结构加入到就绪队列中, epoll_wait 返回直接访问就绪队列就知道哪些文件描述符就绪. 这个操作时间复杂度 O(1). 即使文件描述符数目很多, 效率也不会受到影响.
没有数量限制: 文件描述符数目无上限.
注意
网上有些博客说, epoll 中使用了内存映射机制
内存映射机制: 内核直接将就绪队列通过 mmap 的方式映射到用户态. 避免了拷贝内存这样的额外性能开销.
这种说法是不准确的. 我们定义的 struct epoll_event 是我们在用户空间中分配好的内存. 势必还是需要将内核的数据拷贝到这个用户空间的内存中的.
对比总结 select, poll, epoll 之间的优点和缺点(重要).
1. select
| 优点 | 缺点 |
|---|---|
| - 跨平台支持最好(POSIX 标准,几乎所有系统) - 接口简单,易于理解 | -最大文件描述符数量受限:默认FD_SETSIZE=1024,修改需要重新编译内核-每次调用都要复制 fd_set从用户态到内核态,开销随 fd 数量线性增长 -O(n) 扫描:内核需要线性遍历所有 fd 检查就绪状态,fd 数量大时效率极低 -fd_set 每次需要重新填充:用户态需要维护三组位图,重复赋值 -不支持边缘触发,仅水平触发 |
2. poll
| 优点 | 缺点 |
|---|---|
| -无最大 fd 数量限制(受系统内存限制) -数据结构更灵活: pollfd分离events和revents,无需每次重置输入值- 同样是跨平台(POSIX) | -每次调用仍要复制整个 fd 数组从用户态到内核态 -O(n) 扫描:内核仍需遍历所有传入的 fd 检查就绪状态,与 select 同级别性能瓶颈 -不适用高并发低活跃场景:大量空闲连接也会被无差别遍历 - 仅水平触发 |
3. epoll
| 优点 | 缺点 |
|---|---|
| -无 fd 数量上限(受系统内存限制) -就绪事件直接返回:通过回调机制,仅将活跃的 fd 加入就绪链表, epoll_wait只需拷贝这些事件到用户态,无需扫描全部 fd →O(1) 时间复杂度-fd 注册一次性拷贝: epoll_ctl将 fd 加入内核事件表(红黑树),后续调用无需重复拷贝整个集合-支持 LT 和 ET 两种模式,边缘触发可进一步减少系统调用次数 -内存使用更高效:红黑树 + 就绪链表结构 | -仅 Linux 支持(非跨平台,BSD 有 kqueue,但接口不同) -接口复杂:需要 epoll_create、epoll_ctl、epoll_wait三步,使用门槛较高-边缘触发模式下编程需要谨慎:必须一次性读写完数据,否则可能丢失事件 -对短连接、小量 fd 场景未必优于 poll:额外维护红黑树的开销可能反而不如简单遍历 |
总结对比表
| 特性 | select | poll | epoll |
|---|---|---|---|
| 最大 fd 数量 | 1024(通常) | 无上限(受内存限制) | 无上限(受内存限制) |
| 内核扫描开销 | O(n) 全量扫描 | O(n) 全量扫描 | O(1) 仅返回就绪事件 |
| 用户态→内核态拷贝 | 每次调用全量拷贝 fd_set | 每次调用全量拷贝 fd 数组 | 仅 epoll_ctl 时拷贝一次 |
| 工作模式 | 仅水平触发 (LT) | 仅水平触发 (LT) | LT / ET |
| 内核实现 | 位图(fd_set) | 数组/链表 | 红黑树 + 就绪链表 |
| 适用场景 | 连接数少(< 1024) | 连接数中等,活跃连接较多 | 海量连接,活跃连接比例很低 |
| 跨平台性 | 几乎全平台 | 几乎全平台 | 仅 Linux |
Q:为什么 epoll 在海量连接、低活跃场景下性能远优于 select/poll?
A:因为 select/poll 每次调用都要遍历全部连接(O(n)),即使只有一个活跃;而 epoll 通过回调机制只处理活跃连接(O(1) 返回就绪事件列表)。
Q:边缘触发(ET)有何好处?
A:减少epoll_wait调用次数,因为只要数据没读完,下次不会再触发,迫使应用程序一次性循环读完所有数据,从而提升效率。
Q:epoll 真的完全避免了内存拷贝吗?
A:不是。epoll_wait返回时,仍需将内核就绪事件拷贝到用户空间的epoll_event数组中。真正避免的是每次调用都重新拷贝整个 fd 集合(select/poll 的痛点)。
epoll 工作方式
例子
你正在吃鸡, 眼看进入了决赛圈, 你妈饭做好了, 喊你吃饭的时候有两种方式:
1. 如果你妈喊你一次, 你没动, 那么你妈会继续喊你第二次, 第三次...(亲妈, 水平触发)
2. 如果你妈喊你一次, 你没动, 你妈就不管你了(后妈, 边缘触发)
epoll 有 2 种工作方式-水平触发(LT)和边缘触发(ET)
假如有这样一个例子:
我们已经把一个 tcp socket 添加到 epoll 描述符
这个时候 socket 的另一端被写入了 2KB 的数据
调用 epoll_wait, 并且它会返回. 说明它已经准备好读取操作
然后调用 read, 只读取了 1KB 的数据
继续调用 epoll_wait......
水平触发 Level Triggered 工作模式
epoll 默认状态下就是 LT 工作模式.
当 epoll 检测到 socket 上事件就绪的时候, 可以不立刻进行处理. 或者只处理一部分.
如上面的例子, 由于只读了 1K 数据, 缓冲区中还剩 1K 数据, 在第二次调用 epoll_wait 时, epoll_wait 仍然会立刻返回并通知 socket 读事件就绪.
直到缓冲区上所有的数据都被处理完, epoll_wait 才不会立刻返回.
支持阻塞读写和非阻塞读写
边缘触发 Edge Triggered 工作模式
如果我们在第 1 步将 socket 添加到 epoll 描述符的时候使用了 EPOLLET 标志, epoll 进入 ET 工作模式.
当 epoll 检测到 socket 上事件就绪时, 必须立刻处理.
如上面的例子, 虽然只读了 1K 的数据, 缓冲区还剩 1K 的数据, 在第二次调用 epoll_wait 的时候, epoll_wait 不会再返回了.
也就是说, ET 模式下, 文件描述符上的事件就绪后, 只有一次处理机会.
ET 的性能比 LT 性能更高(epoll_wait 返回的次数少了很多). Nginx 默认采用 ET 模式使用 epoll.
只支持非阻塞的读写
select 和 poll 其实也是工作在 LT 模式下. epoll 既可以支持 LT, 也可以支持 ET.
对比 LT 和 ET
LT 是 epoll 的默认行为.
使用 ET 能够减少 epoll 触发的次数. 但是代价就是强逼着程序猿一次响应就绪过程中就把所有的数据都处理完.
相当于一个文件描述符就绪之后, 不会反复被提示就绪, 看起来就比 LT 更高效一些. 但是如果是在 LT 情况下如果也能做到每次就绪的文件描述符都立刻处理, 不让这个就绪被重复提示的话, 其实性能也是一样的.
另一方面, ET 的代码复杂程度更高了.
理解 ET 模式和非阻塞文件描述符
使用 ET 模式的 epoll, 需要将文件描述设置为非阻塞. 这个不是接口上的要求, 而是"工程实践"上的要求.
假设这样的场景: 服务器接收到一个 10k 的请求, 会向客户端返回一个应答数据. 如果客户端收不到应答, 不会发送第二个 10k 请求.
如果服务端写的代码是阻塞式的 read, 并且一次只 read 1k 数据的话(read 不能保证一次就把所有的数据都读出来, 参考 man 手册的说明, 可能被信号打断), 剩下的 9k 数据就会待在缓冲区中.
此时由于 epoll 是 ET 模式, 并不会认为文件描述符读就绪, epoll_wait 就不会再次返回. 剩下的 9k 数据会一直在缓冲区中, 直到下一次客户端再给服务器写数据. epoll_wait 才能返回
但是问题来了.
服务器只读到 1k 个数据, 要 10k 读完才会给客户端返回响应数据.
客户端要读到服务器的响应, 才会发送下一个请求
客户端发送了下一个请求, epoll_wait 才会返回, 才能去读缓冲区中剩余的数据.
所以, 为了解决上述问题(阻塞 read 不一定能一下把完整的请求读完), 于是就可以使用非阻塞轮训的方式来读缓冲区, 保证一定能把完整的请求都读出来.
而如果是 LT 没这个问题. 只要缓冲区中的数据没读完, 就能够让 epoll_wait 返回文件描述符读就绪.
epoll 的使用场景
epoll 的高性能, 是有一定的特定场景的. 如果场景选择的不适宜, epoll 的性能可能适得其反.
对于多连接, 且多连接中只有一部分连接比较活跃时, 比较适合使用 epoll.
例如, 典型的一个需要处理上万个客户端的服务器, 例如各种互联网 APP 的入口服务器, 这样的服务器就很适合 epoll.
如果只是系统内部, 服务器和服务器之间进行通信, 只有少数的几个连接, 这种情况下用 epoll 就并不合适. 具体要根据需求和场景特点来决定使用哪种 IO 模型.
epoll 中的惊群问题
1. 惊群效应产生的原因
在多线程或多进程环境下,有些开发者为了提高程序的稳定性,会安排多个线程(或多个进程)同时在同一 socket 描述符上调用epoll_wait进行监听。当一个新的连接请求到达时,操作系统无法决定应该由哪一个线程或进程来处理这个事件,于是它可能会“唤醒”其中几个等待的线程或进程。最终,实际上只有一个线程或进程能够成功执行accept并处理该连接,其他被唤醒的线程或进程在尝试accept时会失败,并得到errno为EAGAIN的错误。这种现象就被称为惊群效应。
惊群效应显然会带来额外的 CPU 资源消耗和性能下降,因为多余被唤醒的线程/进程白白浪费了调度开销。
2. 惊群问题的解决方法
2.1 多线程环境下的解决方法
在多线程环境下,不建议让多个线程同时在一个 socket 上执行epoll_wait。更好的做法是:只让一个线程负责调用epoll_wait并监听监听 socket,当新的连接请求到来时,由该线程调用accept建立新连接,然后将这个新连接交给其他工作线程去处理后续的数据读写。这样就从根源上避免了多线程同时等待同一个 socket 的情况,惊群效应自然消失。
2.2 多进程环境下的解决方法
很多开源软件(如 lighttpd、nginx)采用 master/workers 的多进程模式来提高吞吐量和并发能力。对于惊群问题,它们有不同的处理思路。
lighttpd 的做法:无视惊群
lighttpd 仍然让每个子进程都在自己的监听 socket 上调用epoll_wait。当新连接到来时,操作系统依然可能唤醒多个子进程,但最终只有一个子进程能成功accept(返回新连接),其他被唤醒的子进程在accept时会遇到EAGAIN错误。lighttpd 的做法是捕获并忽略这个错误,继续运行。也就是说,它并不刻意避免惊群,而是接受它带来的少量额外开销,认为这在可接受范围内。
nginx 的做法:使用全局锁确保同一时刻只有一个子进程在等待
nginx 采用了更精细的控制:同一时刻,永远只让一个子进程在监听 socket 上执行epoll_wait。它实现了一个全局的互斥锁(pthread_mutex_t),每个子进程在准备调用epoll_wait之前,需要先尝试获取这个锁。只有成功获得锁的子进程,才会真正去执行epoll_wait并接受新连接;其他没有获得锁的子进程则跳过本次监听,去做其他事情(例如处理已有连接上的 I/O 事件)。
下面是一段 nginx 相关的核心代码逻辑(简化说明):
ngx_int_t ngx_trylock_accept_mutex(ngx_cycle_t *cycle) { if (ngx_shmtx_trylock(&ngx_accept_mutex)) { // 尝试获取全局锁 if (ngx_enable_accept_events(cycle) == NGX_ERROR) { ngx_shmtx_unlock(&ngx_accept_mutex); return NGX_ERROR; } ngx_accept_mutex_held = 1; return NGX_OK; } if (ngx_accept_mutex_held) { if (ngx_disable_accept_events(cycle) == NGX_ERROR) { return NGX_ERROR; } ngx_accept_mutex_held = 0; } return NGX_OK; }此外,nginx 还考虑了负载均衡:只有当当前子进程的负载较轻(例如ngx_accept_disabled <= 0)时,它才会去尝试获取锁并监听新连接;如果负载过高,该子进程会暂时放弃竞争监听权,优先处理已有连接。
void ngx_process_events_and_timers(ngx_cycle_t *cycle) { if (ngx_accept_disabled > 0) { ngx_accept_disabled--; // 负载过高时,不参与 accept } else { if (ngx_trylock_accept_mutex(cycle) == NGX_ERROR) { return; } } // ... 后续调用 epoll_wait 等 }这样,nginx 不仅避免了惊群效应,还能在多进程之间实现更均衡的负载分配。
epoll 示例: epoll 服务器(LT 模式)
tcp_epoll_server.hpp
/////////////////////////////////////////////////////////// // 封装一个 Epoll 服务器, 只考虑读就绪的情况 // 该服务器仅处理读事件(EPOLLIN),不考虑写事件或其他特殊事件 /////////////////////////////////////////////////////////// #pragma once #include <vector> #include <functional> #include <sys/epoll.h> #include "tcp_socket.hpp" // 处理回调函数类型:接收请求字符串,填充响应字符串 typedef std::function<void (const std::string&, std::string*)> Handler; // Epoll 类:封装 epoll 相关操作,简化接口 class Epoll { public: // 构造函数:创建 epoll 实例 // 参数 10 在 Linux 2.6.8 之后已无实际意义,仅需 >0 Epoll() { epoll_fd_ = epoll_create(10); } // 析构函数:关闭 epoll 文件描述符 ~Epoll() { close(epoll_fd_); } // 将一个 socket 添加到 epoll 监听集合中,仅监听读事件(EPOLLIN) bool Add(const TcpSocket& sock) const { int fd = sock.GetFd(); printf("[Epoll Add] fd = %d\n", fd); epoll_event ev; ev.data.fd = fd; // 保存 fd,便于后续识别 ev.events = EPOLLIN; // 只关心可读事件 int ret = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev); if (ret < 0) { perror("epoll_ctl ADD"); return false; } return true; } // 从 epoll 监听集合中移除一个 socket bool Del(const TcpSocket& sock) const { int fd = sock.GetFd(); printf("[Epoll Del] fd = %d\n", fd); // 最后一个参数传 NULL 即可,内核会忽略它 int ret = epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, NULL); if (ret < 0) { perror("epoll_ctl DEL"); return false; } return true; } // 等待就绪事件,将就绪的 socket 放入 output 容器 // 注意:该函数会阻塞,直到有事件发生(timeout = -1) bool Wait(std::vector<TcpSocket> *output) const { output->clear(); // 准备事件数组,最多容纳 1000 个就绪事件 epoll_event events[1000]; int nfds = epoll_wait(epoll_fd_, events, sizeof(events) / sizeof(events[0]), -1); if (nfds < 0) { perror("epoll_wait"); return false; } // [注意!] 此处必须是循环到 nfds, 不能多循环 // nfds 就是实际就绪的 fd 个数,循环 events 数组的前 nfds 个即可 for (int i = 0; i < nfds; ++i) { TcpSocket sock(events[i].data.fd); // 通过 fd 构造临时 socket 对象 output->push_back(sock); // 存入输出列表 } return true; } private: int epoll_fd_; // epoll 文件描述符 }; // TcpEpollServer:基于 epoll 的 TCP 服务器类 class TcpEpollServer { public: TcpEpollServer(const std::string& ip, uint16_t port) : ip_(ip), port_(port) { // 仅保存 IP 和端口,实际创建在 Start 中进行 } // 启动服务器,传入业务处理回调 handler bool Start(Handler handler) { // 1. 创建监听 socket TcpSocket listen_sock; CHECK_RET(listen_sock.Socket()); // 假设 CHECK_RET 是一个宏,检查返回值并返回 bool // 2. 绑定地址 CHECK_RET(listen_sock.Bind(ip_, port_)); // 3. 开始监听,backlog = 5 CHECK_RET(listen_sock.Listen(5)); // 4. 创建 Epoll 对象,并将监听 socket 加入 epoll 集 Epoll epoll; epoll.Add(listen_sock); // 5. 进入主事件循环 for (;;) { // 6. 调用 epoll_wait 获取就绪的 socket 列表 std::vector<TcpSocket> output; if (!epoll.Wait(&output)) { continue; // 出错则跳过本次循环,继续等待 } // 7. 根据就绪的文件描述符类型进行不同处理 for (size_t i = 0; i < output.size(); ++i) { // 情况一:就绪的是监听 socket,表示有新的连接到来 if (output[i].GetFd() == listen_sock.GetFd()) { TcpSocket new_sock; // 接受新连接,得到客户端 socket listen_sock.Accept(&new_sock); // 将新的客户端 socket 也加入 epoll 监听,以便后续处理其读就绪事件 epoll.Add(new_sock); } // 情况二:就绪的是普通通信 socket(已连接 socket) else { std::string req, resp; // 尝试接收数据 bool ret = output[i].Recv(&req); if (!ret) { // [注意!!] 如果接收失败(对端关闭或出错),则从 epoll 中删除此 socket // 并关闭它。注意顺序:先删除再关闭(Del 内部调用 epoll_ctl 时需要有效 fd) epoll.Del(output[i]); output[i].Close(); continue; // 跳过后续处理,继续下一个就绪 socket } // 调用业务处理回调,生成响应数据 handler(req, &resp); // 将响应数据发送回客户端 output[i].Send(resp); } } // end for (output) } // end for (;;) return true; } private: std::string ip_; uint16_t port_; };dict_server.cc 只需要将 server 对象的类型改成 TcpEpollServer 即可.
epoll 示例: epoll 服务器(ET 模式)
基于 LT 版本稍加修改即可
修改 tcp_socket.hpp, 新增非阻塞读和非阻塞写接口
对于 accept 返回的 new_sock 加上 EPOLLET 这样的选项
注意: 此代码暂时未考虑 listen_sock ET 的情况. 如果将 listen_sock 设为 ET, 则需要非阻塞轮询的方式 accept, 否则会导致同一时刻大量的客户端同时连接的时候, 只能 accept 一次的问题.
tcp_socket.hpp
// 以下代码添加在 TcpSocket 类中 // 非阻塞 IO 接口 bool SetNoBlock() { int fl = fcntl(fd_, F_GETFL); if (fl < 0) { perror("fcntl F_GETFL"); return false; } int ret = fcntl(fd_, F_SETFL, fl | O_NONBLOCK); if (ret < 0) { perror("fcntl F_SETFL"); return false; } return true; } bool RecvNoBlock(std::string* buf) const { // 对于非阻塞 IO 读数据, 如果 TCP 接受缓冲区为空, 就会返回错误 // 错误码为 EAGAIN 或者 EWOULDBLOCK, 这种情况也是意料之中, 需要重试 // 如果当前读到的数据长度小于尝试读的缓冲区的长度, 就退出循环 // 这种写法其实不算特别严谨(没有考虑粘包问题) buf->clear(); char tmp[1024 * 10] = {0}; for (;;) { ssize_t read_size = recv(fd_, tmp, sizeof(tmp) - 1, 0); if (read_size < 0) { if (errno == EWOULDBLOCK || errno == EAGAIN) { continue; } perror("recv"); return false; } if (read_size == 0) { // 对端关闭, 返回 false return false; } tmp[read_size] = '\0'; *buf += tmp; if (read_size < (ssize_t)sizeof(tmp) - 1) { break; } } return true; } bool SendNoBlock(const std::string& buf) const { // 对于非阻塞 IO 的写入, 如果 TCP 的发送缓冲区已经满了, 就会出现出错的情况 // 此时的错误号是 EAGAIN 或者 EWOULDBLOCK. 这种情况下不应放弃治疗 // 而要进行重试 ssize_t cur_pos = 0; // 记录当前写到的位置 ssize_t left_size = buf.size(); for (;;) { ssize_t write_size = send(fd_, buf.data() + cur_pos, left_size, 0); if (write_size < 0) { if (errno == EAGAIN || errno == EWOULDBLOCK) { // 重试写入 continue; } return false; } cur_pos += write_size; left_size -= write_size; // 这个条件说明写完需要的数据了 if (left_size <= 0) { break; } } return true; }tcp_epoll_server.hpp
/////////////////////////////////////////////////////////// // 封装一个 Epoll ET 服务器 // 修改点: // 1. 对于 new sock, 加上 EPOLLET 标记 // 2. 修改 TcpSocket 支持非阻塞读写 // [注意!] listen_sock 如果设置成 ET, 就需要非阻塞调用 accept 了 // 稍微麻烦一点, 此处暂时不实现 /////////////////////////////////////////////////////////// #pragma once #include <vector> #include <functional> #include <sys/epoll.h> #include "tcp_socket.hpp" typedef std::function<void (const std::string&, std::string*)> Handler; class Epoll { public: Epoll() { epoll_fd_ = epoll_create(10); } ~Epoll() { close(epoll_fd_); } bool Add(const TcpSocket& sock, bool epoll_et = false) const { int fd = sock.GetFd(); printf("[Epoll Add] fd = %d\n", fd); epoll_event ev; ev.data.fd = fd; if (epoll_et) { ev.events = EPOLLIN | EPOLLET; } else { ev.events = EPOLLIN; } int ret = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev); if (ret < 0) { perror("epoll_ctl ADD"); return false; } return true; } bool Del(const TcpSocket& sock) const { int fd = sock.GetFd(); printf("[Epoll Del] fd = %d\n", fd); int ret = epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, NULL); if (ret < 0) { perror("epoll_ctl DEL"); return false; } return true; } bool Wait(std::vector<TcpSocket> *output) const { output->clear(); epoll_event events[1000]; int nfds = epoll_wait(epoll_fd_, events, sizeof(events) / sizeof(events[0]), -1); if (nfds < 0) { perror("epoll_wait"); return false; } // [注意!] 此处必须是循环到 nfds, 不能多循环 for (int i = 0; i < nfds; ++i) { TcpSocket sock(events[i].data.fd); output->push_back(sock); } return true; } private: int epoll_fd_; }; class TcpEpollServer { public: TcpEpollServer(const std::string& ip, uint16_t port) : ip_(ip), port_(port) { } bool Start(Handler handler) { // 1. 创建 socket TcpSocket listen_sock; CHECK_RET(listen_sock.Socket()); // 2. 绑定 CHECK_RET(listen_sock.Bind(ip_, port_)); // 3. 监听 CHECK_RET(listen_sock.Listen(5)); // 4. 创建 Epoll 对象, 并将 listen_sock 加入进去 Epoll epoll; epoll.Add(listen_sock); // 5. 进入事件循环 for (;;) { // 6. 进行 epoll_wait std::vector<TcpSocket> output; if (!epoll.Wait(&output)) { continue; } // 7. 根据就绪的文件描述符的种类决定如何处理 for (size_t i = 0; i < output.size(); ++i) { if (output[i].GetFd() == listen_sock.GetFd()) { // 如果是 listen_sock, 就调用 accept TcpSocket new_sock; listen_sock.Accept(&new_sock); epoll.Add(new_sock, true); } else { // 如果是 new_sock, 就进行一次读写 std::string req, resp; bool ret = output[i].RecvNoBlock(&req); if (!ret) { // [注意!!] 需要把不用的 socket 关闭 // 先后顺序别搞反. 不过在 epoll 删除的时候其实就已经关闭 socket 了 epoll.Del(output[i]); output[i].Close(); continue; } handler(req, &resp); output[i].SendNoBlock(resp); printf("[client %d] req: %s, resp: %s\n", output[i].GetFd(), req.c_str(), resp.c_str()); } // end for } // end for (;;) } return true; } private: std::string ip_; uint16_t port_; };