C++服务器框架:协程库——I/O协程调度
继承自协程调度器,封装了
epoll,支持为socket fd注册读写事件回调函数。
1 I/O协程调度概述
I/O事件调度功能对服务器开发⾄关重要,因为服务器通常需要处理⼤量来⾃客户端的socket fd,使⽤I/O事件调度可以将开发者从判断socket fd是否可读或可写的⼯作中解放出来,使得程序员只需要关⼼socket fd的I/O操作,实现I/O协程调度意义重⼤。
I/O协程调度可以看成是增强版的协程调度。在前⾯的协程调度模块中,调度器对协程的调度是⽆条件执⾏的,在调度器已经启动调度的情况下,任务⼀旦添加成功,就会排队等待调度器执⾏。调度器不⽀持删除调度任务,并且调度器在正常退出之前⼀定会执⾏完全部的调度任务,所以在某种程度上可以认为,把⼀个协程添加到调度器的任务队列,就相当于调⽤了协程的resume⽅法。
I/O协程调度支持协程调度的全部功能,因为I/O协程调度器是直接继承协程调度器实现的。除了协程调度,I/O协程调度还增加了I/O事件调度的功能,这个功能是针对描述符(一般是套接字描述符)的。I/O协程调度支持为描述符注册可读和可写事件的回调函数,当描述符可读或可写时,执行对应的回调函数。(这里可以直接把回调函数等效成协程,所以这个功能被称为I/O协程调度)。
很多的库都可以实现类似的工作,比如libevent,libuv,libev等,这些库被称为异步事件库或异步I/O库。有的库不仅可以处理socket fd事件,还可以处理定时器事件和信号事件。这些事件库的实现原理基本类似,都是先将套接字设置成非阻塞状态,然后将套接字与回调函数绑定,接下来进入一个基于I/O多路复用的事件循环,等待事件发生,然后调用对应的回调函数。
sylar在协程调度模块的基础上,封装了epoll。支持对I/O事件的调度功能,可以为socket句柄添加读事件(EPOLLIN)和写事件(EPOLLOUT),并且支持删除事件功能。I/OManager主要通过FdContext结构体存储文件描述符fd,
注册的事件event,执行任务cb/fiber,其中fd和event用于epoll_wait,cb/fiber用于执行任务。当有任务时,使用管道pipe来唤醒epoll_wait先执行其他任务。
1.2 相关概念
在I/O操作时,操作系统通常会将数据缓存在文件系统的页缓存(page cache)中。这意味着数据的传输过程分为两个阶段:
- 第一阶段:数据首先被拷贝到操作系统内核的缓冲区中。
- 第二阶段:数据从操作系统内核的缓冲区拷贝到应用程序的地址空间。
1.2.1 阻塞I/O
在默认情况下,所有的socket都是被阻塞的,也就是阻塞I/O,这样会导致两个阶段的阻塞:
- 进程可能需要等待数据到达。 -
数据从内核拷贝到用户空间时,进程也可能被阻塞。
1.2.2 非阻塞I/O
通过设置,可以将socket变为非阻塞模式(non-blocking),在非阻塞模式下:
- 如果内核还没有准备好数据,那么它不会阻塞用户进程,而是立即返回一个错误。
- 当内核准备好数据并且再次收到用户进程的
system call时,数据会被拷贝到用户内存。然而,这个拷贝过程(第二阶段)仍然可能导致进程被阻塞。
1.2.3 异步I/O
异步I/O(Asynchronous I/O)允许在两个阶段都不会阻塞用户进程:
- 用户进程发起read操作后,内核收到system call会立即返回,不会阻塞用户进程。
- 当内核准备好数据并完成拷贝到用户空间后,内核会通过某种机制(如信号或回调函数)通知用户进程,整个过程中用户进程不会被阻塞。
异步I/O是真正的异步操作,它在两个阶段都不会阻塞用户进程。阻塞I/O在两个阶段都可能阻塞用户进程。非阻塞I/O在第一阶段不阻塞用户进程,但在第二阶段拷贝数据时可能会阻塞。I/O多路复用是同步I/O的一种形式,它允许进程同时监视多个I/O操作,但在I/O操作就绪时,进程仍然需要被阻塞以处理这些操作。
2 I/O多路复用
服务器需要与多个客户端建立连接时,会涉及处理大量的socket文件描述符。为了有效管理这些文件描述符并进行I/O操作,可以利用I/O多路复用技术。
当用户进程调用select函数时,整个进程会被阻塞。同时,内核会监视所有由select负责的socket文件描述符。一旦任何一个socket中的数据准备好了,select就会返回,此时用户进程可以调用recvfrom函数来接收数据。内核收到系统调用后将数据拷贝到用户进程中。
在Linux中,主要有三种常用的I/O多路复用方式:select、poll和epoll。通常情况下,将socket设置为非阻塞模式(O_NONBLOCK),这样在进行I/O操作时,用户进程虽然仍然会被阻塞,但是是被select函数阻塞,而不是被socket I/O阻塞。
- 当
select返回了,那一定是socket中的数据准备好了,recvfrom也不会阻塞了,所以设不设置socket为非阻塞模式似乎没什么区别?
将socket设置为非阻塞模式(使用O_NONBLOCK标志)可以确保即使select报告socket可读,recvfrom调用也不会阻塞。这是因为在非阻塞模式下,如果数据没有准备好,recvfrom会立即返回一个错误,而不是等待数据。此外,在Linux下,select可能会将socket报告为“准备读取”,即使实际上并没有数据可读。这可能是由于多种原因,包括但不限于网络条件、socket状态的变化,或者是内核内部的实现细节。因此,即使select返回,也不能保证recvfrom调用一定会成功。最后,在不应阻塞的socket上使用非阻塞模式(O_NONBLOCK)可以避免潜在的阻塞,确保应用程序能够继续响应其他事件或进行其他操作。
2.1 select
1 | /* |
当进程调用select时将会被阻塞,fd_set的数据结构为bitmap,通过FD_SET方法将需要监听的文件描述符集合fdset对应的bitmap置为1(例如文件描述符集合为4,9,那么就将bitmap的第4位和第9位置为1),select会截取bitmap前n位进行监听。select会将需要关注的fd_set拷贝到内核态监听,当有数据来时,内核将有数据的fd_set置位(bitmap对应的文件描述符置位为相应的操作,读、写、异常),select返回。因为不知道是哪个文件描述符来数据了,所以再遍历fdset寻找就绪的文件描述符。
select目前几乎在所有的平台上支持,其良好跨平台支持也是它的一个优点。
select的缺点: 1.
单个进程能够监视的文件描述符的数量存在最大限制,在Linux上一般为1024,可以通过修改宏定义甚至重新编译内核的方式提升这一限制,但是这样也会造成效率的降低。
2.
fd_set是不可重用的,每次需要使用FD_ZERO方法清空。
3.
每次调用select都需要将fd_set拷贝到内核态,有开销。
4.
不知道是哪个文件描述符的数据来了,所以要遍历,时间复杂度为O(n)。
2.2 poll
1 | /* |
poll数据结构与select不同,poll采用数组存储pollfd,并将fd和关注的事件(POLLIN等)分别保存到pollfd的fd和events中。
poll与select工作原理相同,但要注意的是,当数据来时,poll将revents置位(POLLIN等),然后poll函数返回。仍然要遍历数组来看是哪个文件描述符来了,并且将revents置为0,这样就能重复使用pollfd。
poll优点:
- 解决了
select的1024上限。 - 解决了
selectfd_set不可重用,pollfd可以通过重置revents恢复如初。
poll缺点:
- 每次调用
poll都需要将pollfd拷贝到内核态,有开销。 - 不知道是哪个文件描述符的数据来了,所以要遍历,时间复杂度为
O(n)。
2.3 epoll
epoll是在2.6内核中提出的,是之前的select和poll的增强版本。相对于select和poll来说,epoll更加灵活,没有描述符限制。epoll使用一个文件描述符管理多个描述符,将用户关系的文件描述符的事件存放到内核的一个事件表中,这样在用户空间和内核空间的copy只需一次。
epoll通过以下3个接口操作:
更多详细内容可以查看深度Linux - epoll性能那么高,为什么?
2.3.1 epoll_create
1 | //创建一个epoll的句柄,size用来告诉内核这个监听的数目一共有多大 |
创建一个epoll的句柄,size用来告诉内核这个监听的数目一共有多大,这个参数不同于select()中的第一个参数,给出最大监听的fd+1的值,参数size并不是限制了epoll所能监听的描述符最大个数,只是对内核初始分配内部数据结构的一个建议。但看了源码,只要size大于0就可以了,没有实质性的作用。
当创建好epoll句柄后,它就会占用一个fd值,在Linux下如果查看/proc/进程id/fd/,是能够看到这个fd的,所以在使用完epoll后,必须调用close()关闭,否则可能导致fd被耗尽。
通过源码得知,每创建一个epollfd,内核就会分配一个eventpoll结构体与之对应,其中维护了一个RBTree来存放所有要监听的struct epitem(表示一个被监听的fd)。
2.3.2 epoll_ctl
从用户空间将epoll_event结构copy到内核空间
1 | /* |
events可以是以下几个宏的集合: - EPOLLIN
:表示对应的文件描述符可以读(包括对端SOCKET正常关闭); -
EPOLLOUT:表示对应的文件描述符可以写; -
EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);
- EPOLLERR:表示对应的文件描述符发生错误; -
EPOLLHUP:表示对应的文件描述符被挂断; -
EPOLLET: 将EPOLL设为边缘触发(Edge
Triggered)模式,这是相对于水平触发(Level Triggered)来说的。 -
EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里。
通过源码得知,同一个fd不能重复添加。内核会自动添加这两个事件epds.events |= POLLERR | POLLHUP;并且使用copy_from_user从用户空间将epoll_event结构copy到内核空间。
1 | if (ep_op_has_event(op) && |
2.3.3 epoll_wait
1 | /* |
收集在epoll监控的事件中已经发生的事件,如果epoll中没有任何一个事件发生,则最多等待timeout毫秒后返回。epoll_wait的返回值表示当前发生的事件个数,如果返回0,则表示本次调用中没有事件发生,如果返回-1,则表示发生错误,需要检查errno判断错误类型。
通过源码得知,通过__put_user将数据从内核空间拷贝到用户空间。
1 | if (__put_user(revents, &uevent->events) || |
2.3.4 epoll工作模式
epoll有两种工作模式,LT(水平触发)模式与ET(边缘触发)模式。默认情况下,epoll采用LT模式工作。两个的区别是:
Level_triggered(水平触发):当被监控的文件描述符上有可读写事件发生时,epoll_wait会通知处理程序去读写。如果这次没有把数据一次性全部读写完(如读写缓冲区太小),那么下次调用epoll_wait时,它还会通知在没读写完的文件描述符上继续读写,当然如果一直不去读写,会一直通知。如果系统中有大量不需要读写的就绪文件描述符,而它们每次都会返回,这样会大大降低处理程序检索自己关心的就绪文件描述符的效率。Edge_triggered(边缘触发):当被监控的文件描述符上有可读写事件发生时,epoll_wait会通知处理程序去读写。如果这次没有把数据全部读写完(如读写缓冲区太小),那么下次调用epoll_wait时,它不会通知,也就是它只会通知一次,直到该文件描述符上出现第二次可读写事件才会再次通知。这种模式比水平触发效率高,系统不会充斥大量用户不关心的就绪文件描述符。
在LT模式下开发基于epoll的应用要简单一些,不太容易出错,而在ET模式下事件发生时,如果没有彻底地将缓冲区的数据处理完,则会导致缓冲区的用户请求得不到响应。注意,默认情况下Nginx采用ET模式使用epoll的。
2.3.5 epoll优点
监视的描述符数量不受限制:它所支持的
fd上限是最大可以打开文件的数目,这个数字一般远大于2048,举个例子,在1GB内存的机器上大约是10万左右,具体数目可以cat/proc/sys/fs/file-max察看,一般来说这个数目和系统内存关系很大。IO的效率不会随着监视fd的数量的增长而下降:epoll不同于select和poll轮询的方式,而是通过每个fd定义的回调函数来实现的。只有就绪的fd才会执行回调函数ep_poll_callback()。ep_poll_callback()的调用时机是由被监听的fd的具体实现,比如socket或者某个设备驱动来决定的,因为等待队列头是他们持有的,epoll和当前进程只是单纯的等待。
epoll使用一个文件描述符管理多个描述符:将用户关系的文件描述符的事件存放到内核的一个事件表中,这样在用户空间和内核空间的copy只需一次。
2.4
epoll与select、poll的比较
select |
poll |
epoll |
|
|---|---|---|---|
| 数据结构 | bitmap |
数组 | 红黑树+链表 |
| 最大连接数 | 1024 |
无上限 | 无上限 |
fd拷贝 |
每次调用select拷贝 | 每次调用poll拷贝 |
首次调用epoll_ctl拷贝,每次调用epoll_wait不拷贝 |
| 工作效率 | 轮询:O(n) |
轮询:O(n) |
回调:O(1) |
3 I/OManager实现
言归正传,sylar的I/O协程调度模块基于epoll实现,只支持Linux平台。对每个fd,sylar支持两类事件,一类是可读事件,对应EPOLLIN,一类是可写事件,对应EPOLLOUT,sylar的事件枚举值直接继承自epoll。当然epoll本身除了支持了EPOLLIN和EPOLLOUT两类事件外,还支持其他事件,比如EPOLLRDHUP,
EPOLLERR,
EPOLLHUP等,对于这些事件,sylar的做法是将其进行归类,分别对应到EPOLLIN和EPOLLOUT中,也就是所有的事件都可以表示为可读或可写事件,甚至有的事件还可以同时表示可读及可写事件,比如EPOLLERR事件发生时,fd将同时触发可读和可写事件。
对于IO协程调度来说,每次调度都包含一个三元组信息,分别是描述符-事件类型(可读或可写)-回调函数,调度器记录全部需要调度的三元组信息,其中描述符和事件类型用于epoll_wait,回调函数用于协程调度。这个三元组信息在源码上通过FdContext结构体来存储,在执行epoll_wait时通过epoll_event的私有数据指针data.ptr来保存FdContext结构体信息。IO协程调度器在idle时会epoll_wait所有注册的fd,如果有fd满足条件,epoll_wait返回,从私有数据中拿到fd的上下文信息,并且执行其中的回调函数。(实际是idle协程只负责收集所有已触发的fd的回调函数并将其加入调度器的任务队列,真正的执行时机是idle协程退出后,调度器在下一轮调度时执行)。与协程调度器不一样的是,IO协程调度器支持取消事件。取消事件表示不关心某个fd的某个事件了,如果某个fd的可读或可写事件都被取消了,那这个fd会从调度器的epoll_wait中删除。
3.1 IOManager类
sylar的IO协程调度器对应IOManager,这个类直接继承自Scheduler:
1 | class IOManager : public Scheduler { |
首先是读写事件的定义,这里直接继承epoll的枚举值,如下:
1
2
3
4
5
6
7/// @brief IO事件,继承自epoll_event对事件的定义
/// @details 这里只关心socket fd的读和写事件,其他epoll事件会归类到这两类事件中
enum Event {
NONE = 0x0, // 无事件
READ = 0x1, // 读事件(EPOLLIN)
WRITE = 0x4 // 写事件(EPOLLOUT)
};
3.2 FdContext结构体
接下来是对描述符-事件类型-回调函数三元组的定义,这个三元组也称为fd上下文,使用结构体FdContext来表示。由于fd有可读和可写两种事件,每种事件的回调函数也可以不一样,所以每个fd都需要保存两个事件类型-回调函数组合。FdContext结构体定义如下:
1 | /// @brief Socket事件上下文 |
3.3 成员变量
IOManager包含一个epoll实例的句柄m_epfd以及用于tickle的一对pipe fd,还有全部的fd上下文m_fdContexts,如下:
1 | int m_epfd = 0; /// epoll文件句柄 |
3.4 IOManager()
IOManager类的构造函数,接收三个参数,分别是线程数量,是否使用调度线程,以及调度器的名字。在构造函数中,首先创建epoll句柄,然后创建一个pipe,并将读端添加到epoll中,这个pipe的作用是用于唤醒epoll_wait,因为epoll_wait是阻塞的,如果没有事件发生,epoll_wait会一直阻塞,所以需要一个pipe来唤醒epoll_wait。最后调用contextResize函数初始化fd上下文数组,然后启动调度器。
1 | IOManager::IOManager(size_t threads, bool use_caller, const std::string& name) |
3.5 ~IOManager()
IOManager类的析构函数,关闭epoll句柄和pipe,并且释放fd上下文数组。
1 | IOManager::~IOManager() { |
3.6 addEvent()
addEvent函数用于添加fd的事件,fd的事件类型,以及事件的回调函数。首先通过RWMutex加写锁,然后通过fd获取fd上下文,如果fd上下文不存在,则创建一个新的fd上下文,然后根据事件类型设置事件上下文的调度器和回调函数,最后将fd上下文的事件类型添加到fd上下文的事件中。
1 | int IOManager::addEvent(int fd, Event event, std::function<void()> cb) { |
3.7 delEvent()
delEvent函数用于删除fd的事件。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33bool IOManager::delEvent(int fd, Event event) {
// 找到fd对应的上下文 fdcontext
RWMutexType::ReadLock lock(m_mutex);
if ((int)m_fdContexts.size() <= fd) return false; // 如果fd对应的上下文不存在,那么直接返回false
FdContext* fd_ctx = m_fdContexts[fd];
lock.unlock();
FdContext::MutexType::Lock lock2(fd_ctx -> mutex);
if (SYLAR_UNLIKELY(!(fd_ctx -> events & event))) return false; // 如果若没有要删除的事件,直接返回false
// 清除指定的事件,表示不关心这个事件了,如果清除之后结果为0,则从epoll_wait中删除该文件描述符
Event new_events = (Event)(fd_ctx -> events & ~event); // 清除指定的事件
int op = new_events ? EPOLL_CTL_MOD : EPOLL_CTL_DEL; // 如果还有事件,那么就是修改事件,否则就是删除事件
epoll_event epevent;
epevent.events = EPOLLET | new_events; // 水平触发模式,新的注册事件
epevent.data.ptr = fd_ctx; // 将fd_ctx存到data的指针中
int rt = epoll_ctl(m_epfd, op, fd, &epevent); // 注册事件
if (rt) {
SYLAR_LOG_ERROR(g_logger) << "epoll_ctl(" << m_epfd << ", "
<< (EpollCtlOp)op << ", " << fd << ", " << (EPOLL_EVENTS)epevent.events << "):"
<< rt << " (" << errno << ") (" << strerror(errno) << ")";
return false;
}
--m_pendingEventCount; // 减少待处理事件数量
fd_ctx -> events = new_events; // 更新事件
FdContext::EventContext& event_ctx = fd_ctx -> getEventContext(event); // 拿到对应事件的EventContext
fd_ctx -> resetEventContext(event_ctx); // 重置EventContext
return true;
}
3.8 cancelEvent()
cancelEvent函数用于取消fd的事件,取消事件表示不关心某个fd的某个事件了,如果某个fd的可读或可写事件都被取消了,那这个fd会从调度器的epoll_wait中删除。
- 取消事件会触发该事件。
1 | bool IOManager::cancelEvent(int fd, Event event) { |
3.9 cancelAll()
cancelAll函数用于取消fd的所有事件。
1 | bool IOManager::cancelAll(int fd) { |
3.10 GetThis()
获得当前IO调度器:
1 | IOManager* IOManager::GetThis() { |
3.11 tickle()
通知调度协程、也就是Scheduler::run()从idle中退出。IOManager的idle协程每次从idle中退出之后,都会重新把任务队列里的所有任务执行完了再重新进入idle,如果没有调度线程处理于idle状态,那也就没必要发通知了。
1 | void IOManager::tickle() { |
3.12 idle()
重写Scheduler的idle函数,调度器无调度任务时会阻塞idle协程上,对IO调度器而言,idle状态应该关注两件事,一是有没有新的调度任务,对应Schduler::schedule(),如果有新的调度任务,那应该立即退出idle状态,并执行对应的任务;二是关注当前注册的所有IO事件有没有触发,如果有触发,那么应该执行IO事件对应的回调函数。
1 | void IOManager::idle() { |
3.13 stopping()
重写Scheduler的stopping函数,判断是否可以停止,同时获取最近一个定时器的超时时间,如果有定时器,那么就等到定时器超时时间。
1 | bool IOManager::stopping() { |
4 总结
- 总的来说,
sylar的IO协程调度模块可分为两部分,第一部分是对协程调度器的改造,将epoll与协程调度融合,重新实现tickle和idle,并保证原有的功能不变。第二部分是基于epoll实现IO事件的添加、删除、调度、取消等功能。 IO协程调度关注的是FdContext信息,也就是描述符-事件-回调函数三元组,IOManager需要保存所有关注的三元组,并且在epoll_wait检测到描述符事件就绪时执行对应的回调函数。epoll是线程安全的,即使调度器有多个调度线程,它们也可以共用同一个epoll实例,而不用担心互斥。由于空闲时所有线程都阻塞的epoll_wait上,所以也不用担心CPU占用问题。addEvent是一次性的,比如说,注册了一个读事件,当fd可读时会触发该事件,但触发完之后,这次注册的事件就失效了,后面fd再次可读时,并不会继续执行该事件回调,如果要持续触发事件的回调,那每次事件处理完都要手动再addEvent。这样在应对fd的WRITE事件时会比较好处理,因为fd可写是常态,如果注册一次就一直有效,那么可写事件就必须在执行完之后就删除掉。cancelEvent和cancelAll都会触发一次事件,但delEvent不会。FdContext的寻址问题,sylar直接使用fd的值作为FdContext数组的下标,这样可以快速找到一个fd对应的FdContext。由于关闭的fd会被重复利用,所以这里也不用担心FdContext数组膨胀太快,或是利用率低的问题。IO协程调度器的退出,不但所有协程要完成调度,所有IO事件也要完成调度。sylar的IO协程调度器应该配合非阻塞IO来使用,如果使用阻塞模式,可能会阻塞进程,参考为什么 IO 多路复用要搭配非阻塞 IO?