reactor
Reactor
pub(crate) struct Reactor {
pub epoll: Epoll,
pub wakers: Mutex<BTreeMap<RawFd, Waker>>,
}
字段 epoll 存储创建的 Epoll 实例,wakers 存储等待的IO事件的文件描述符和对应的 waker。
我们稍后将会创建 Epoll 的静态变量,为了内部可变性,就把 BTreeMap<RawFd, Waker> 包在 Mutex 中。
添加事件
impl Reactor {
pub(crate) fn add_event(&self, fd: RawFd, op: EpollEventType, waker: Waker) -> io::Result<()> {
info!("(Reactor) add event: {}", fd);
self.epoll.add_event(fd, op)?;
self.wakers.lock().unwrap().insert(fd, waker);
Ok(())
}
}
在 Reactor 的添加事件的方法中,首先调用 epoll 的 add_event 方法注册文件描述符和监听的事件,然后把描述符和对应的 waker 存储在 BTreeMap<RawFd, Waker> 中。
reactor 循环
fn reactor_main_loop() -> io::Result<()> {
info!("Start reactor main loop");
let max_event = 32;
let event: libc::epoll_event = unsafe { mem::zeroed() };
let mut events = vec![event; max_event];
let reactor = &REACTOR;
loop {
let nfd = reactor.epoll.wait(&mut events)?;
info!("(Reactor) wake up. nfd = {}", nfd);
#[allow(clippy::needless_range_loop)]
for i in 0..nfd {
let fd = events[i].u64 as RawFd;
if let Some(waker) = reactor.wakers.lock().unwrap().remove(&fd) {
info!("(Reactor) delete event: {}", fd);
reactor.epoll.del_event(fd)?;
waker.wake();
}
}
}
}
在 reactor_main_loop 函数中,我们使用一个 loop 循环,在循环中调用 epoll 的 wait 方法获取所有就绪的 IO 事件的文件描述符,如果没有事件就绪,wait 方法就会阻塞 reactor 线程,避免 CPU 空转。
然后遍历就绪的描述符,从 wakers 中获取描述符对应的 waker,之后调用 epoll 的 delete_event 方法删除描述符,表示这个事件已经处理完毕。
最后,调用 waker 的 wake 方法,把因为等待IO事件而挂起的 task 发送到 executor 的执行队列中。
REACTOR 静态变量
lazy_static! {
pub(crate) static ref REACTOR: Reactor = {
// Start reactor main loop
std::thread::spawn(move || {
reactor_main_loop()
});
Reactor {
epoll: Epoll::new().expect("failed to create epoll"),
wakers: Mutex::new(BTreeMap::new())
}
};
}
Executor 在主线程运行,负责调度执行 task,而 reactor_main_loop 内部使用一个无线循环不断地获取就绪的 fd,并唤醒挂起的 task。为了避免 reactor_main_loop 阻塞 Executor,我们就开一个线程去执行 reactor_main_loop。
之所以把 REACTOR 创建成全局静态变量,是为了在其他的模块中方便地调用 REACTOR 的方法。