reactor

Reactor

pub(crate) struct Reactor {
    pub epoll: Epoll,
    pub wakers: Mutex<BTreeMap<RawFd, Waker>>,
}

字段 epoll 存储创建的 Epoll 实例,wakers 存储等待的IO事件的文件描述符和对应的 waker

我们稍后将会创建 Epoll 的静态变量,为了内部可变性,就把 BTreeMap<RawFd, Waker> 包在 Mutex 中。

添加事件

impl Reactor {
    pub(crate) fn add_event(&self, fd: RawFd, op: EpollEventType, waker: Waker) -> io::Result<()> {
        info!("(Reactor) add event: {}", fd);
        self.epoll.add_event(fd, op)?;
        self.wakers.lock().unwrap().insert(fd, waker);
        Ok(())
    }
}

Reactor 的添加事件的方法中,首先调用 epolladd_event 方法注册文件描述符和监听的事件,然后把描述符和对应的 waker 存储在 BTreeMap<RawFd, Waker> 中。

reactor 循环

fn reactor_main_loop() -> io::Result<()> {
    info!("Start reactor main loop");
    let max_event = 32;
    let event: libc::epoll_event = unsafe { mem::zeroed() };
    let mut events = vec![event; max_event];
    let reactor = &REACTOR;

    loop {
        let nfd = reactor.epoll.wait(&mut events)?;
        info!("(Reactor) wake up. nfd = {}", nfd);

        #[allow(clippy::needless_range_loop)]
        for i in 0..nfd {
            let fd = events[i].u64 as RawFd;
            if let Some(waker) = reactor.wakers.lock().unwrap().remove(&fd) {
                info!("(Reactor) delete event: {}", fd);
                reactor.epoll.del_event(fd)?;
                waker.wake();
            }
        }
    }
}

reactor_main_loop 函数中,我们使用一个 loop 循环,在循环中调用 epollwait 方法获取所有就绪的 IO 事件的文件描述符,如果没有事件就绪,wait 方法就会阻塞 reactor 线程,避免 CPU 空转。

然后遍历就绪的描述符,从 wakers 中获取描述符对应的 waker,之后调用 epolldelete_event 方法删除描述符,表示这个事件已经处理完毕。

最后,调用 wakerwake 方法,把因为等待IO事件而挂起的 task 发送到 executor 的执行队列中。

REACTOR 静态变量

lazy_static! {
    pub(crate) static ref REACTOR: Reactor = {
        // Start reactor main loop
        std::thread::spawn(move || {
            reactor_main_loop()
        });

        Reactor {
            epoll: Epoll::new().expect("failed to create epoll"),
            wakers: Mutex::new(BTreeMap::new())
        }
    };
}

Executor 在主线程运行,负责调度执行 task,而 reactor_main_loop 内部使用一个无线循环不断地获取就绪的 fd,并唤醒挂起的 task。为了避免 reactor_main_loop 阻塞 Executor,我们就开一个线程去执行 reactor_main_loop

之所以把 REACTOR 创建成全局静态变量,是为了在其他的模块中方便地调用 REACTOR 的方法。