reactor
Reactor
pub(crate) struct Reactor {
pub epoll: Epoll,
pub wakers: Mutex<BTreeMap<RawFd, Waker>>,
}
字段 epoll
存储创建的 Epoll
实例,wakers
存储等待的IO事件的文件描述符和对应的 waker
。
我们稍后将会创建 Epoll
的静态变量,为了内部可变性,就把 BTreeMap<RawFd, Waker>
包在 Mutex
中。
添加事件
impl Reactor {
pub(crate) fn add_event(&self, fd: RawFd, op: EpollEventType, waker: Waker) -> io::Result<()> {
info!("(Reactor) add event: {}", fd);
self.epoll.add_event(fd, op)?;
self.wakers.lock().unwrap().insert(fd, waker);
Ok(())
}
}
在 Reactor
的添加事件的方法中,首先调用 epoll
的 add_event
方法注册文件描述符和监听的事件,然后把描述符和对应的 waker
存储在 BTreeMap<RawFd, Waker>
中。
reactor 循环
fn reactor_main_loop() -> io::Result<()> {
info!("Start reactor main loop");
let max_event = 32;
let event: libc::epoll_event = unsafe { mem::zeroed() };
let mut events = vec![event; max_event];
let reactor = &REACTOR;
loop {
let nfd = reactor.epoll.wait(&mut events)?;
info!("(Reactor) wake up. nfd = {}", nfd);
#[allow(clippy::needless_range_loop)]
for i in 0..nfd {
let fd = events[i].u64 as RawFd;
if let Some(waker) = reactor.wakers.lock().unwrap().remove(&fd) {
info!("(Reactor) delete event: {}", fd);
reactor.epoll.del_event(fd)?;
waker.wake();
}
}
}
}
在 reactor_main_loop
函数中,我们使用一个 loop
循环,在循环中调用 epoll
的 wait
方法获取所有就绪的 IO 事件的文件描述符,如果没有事件就绪,wait
方法就会阻塞 reactor
线程,避免 CPU 空转。
然后遍历就绪的描述符,从 wakers
中获取描述符对应的 waker
,之后调用 epoll
的 delete_event
方法删除描述符,表示这个事件已经处理完毕。
最后,调用 waker
的 wake
方法,把因为等待IO事件而挂起的 task
发送到 executor
的执行队列中。
REACTOR 静态变量
lazy_static! {
pub(crate) static ref REACTOR: Reactor = {
// Start reactor main loop
std::thread::spawn(move || {
reactor_main_loop()
});
Reactor {
epoll: Epoll::new().expect("failed to create epoll"),
wakers: Mutex::new(BTreeMap::new())
}
};
}
Executor
在主线程运行,负责调度执行 task
,而 reactor_main_loop
内部使用一个无线循环不断地获取就绪的 fd
,并唤醒挂起的 task
。为了避免 reactor_main_loop
阻塞 Executor
,我们就开一个线程去执行 reactor_main_loop
。
之所以把 REACTOR
创建成全局静态变量,是为了在其他的模块中方便地调用 REACTOR
的方法。