executor
Executor
pub struct Executor {
task_queue: Receiver<Arc<Task>>,
waker_cache: BTreeMap<TaskId, Waker>,
}
task_queue 是一个 channel 的接收端,当 spawn 或者 wake 一个 task 时,就会发送 Arc<Task> 到 task_queue 中。
waker_cache 使用 BTreeMap 缓存可能会重复使用的 Waker,这是为了减小构造 Waker 的开销。
实际上,
Executor中的task_queue只是一个管道的接收端,并不是队列,只是我个人更习惯称之为队列。
方法实现
impl Executor {
fn new(task_queue: Receiver<Arc<Task>>) -> Self {
Self {
task_queue,
waker_cache: BTreeMap::new(),
}
}
fn run_ready_task(&mut self) {
while let Ok(task) = self.task_queue.recv() {
let waker = self
.waker_cache
.entry(task.task_id())
.or_insert_with(|| Waker::from(task.clone()));
let mut context = Context::from_waker(waker);
match task.poll(&mut context) {
Poll::Ready(_) => {
self.waker_cache.remove(&task.task_id());
}
Poll::Pending => {}
}
}
}
pub fn run(&mut self) {
self.run_ready_task();
}
}
new 方法接收 Receiver<Arc<Task>> 参数,然后创建一个执行器实例。
run_ready_task 方法中:
-
从
task_queue中接收task: Arc<Task>,然后从waker_cache中查找是否存在对应的waker,如果没有则构造一个Waker。 -
使用
Context的from_waker方法通过waker的引用创建context。 -
调用
task的poll方法,传入&mut context参数,开始执行task。- 如果返回的是
Poll::Ready,说明task执行完毕,从waker_cache中删除缓存的waker。 - 如果返回的是
Poll::Pending,则什么都不做(最终执行的Leaf-Future中会注册等待的事件和waker)。
- 如果返回的是
Spawner
在初始状态下,executor 的执行队列中是空的,我们需要一种机制能够让用户手动地创建 task 并将 task 发送到 executor 的执行队列中,最后开启 executor 的执行。Spawner 抽象便提供了这种机制:
#[derive(Clone)]
pub struct Spawner {
task_sender: Sender<Arc<Task>>,
}
Spawner 中的 task_sender 和 Task 的 task_sender 一样,都是为了把 task 发送到 executor 的执行队列中。
方法实现
impl Spawner {
fn new(task_sender: Sender<Arc<Task>>) -> Self {
Self { task_sender }
}
pub fn spawn(&self, future: impl Future<Output = ()> + 'static + Send) {
let task = Task::new(future, self.task_sender.clone());
self.task_sender
.send(Arc::new(task))
.expect("send task failed");
}
}
new 方法接收Sender<Arc<Task>> 参数,然后创建一个 Spawner 实例。
spawn 方法中,使用传入的 future 参数创建一个 Task 实例,然后把这个 task 发送到 executor 的执行队列中。当 executor 开始执行的时候就可以从队列中接收 task,驱动 task 的执行了。
创建 Spawner & Executor
定义一个公开的函数,创建 Spawner 和 Executor 实例:
pub fn spawner_and_executor() -> (Spawner, Executor) {
let (task_sender, task_queue) = bounded(10000);
let spawner = Spawner::new(task_sender);
let executor = Executor::new(task_queue);
(spawner, executor)
}
在 spawner_and_executor 函数中,我们使用 crossbeam-channel 提供的 unbounded 函数创建一个容量为 10000 的管道,分别返回管道的发送端和接收端,然后创建 Spawner 和 Executor 实例并返回。