Introduction

本书主要介绍 Rust 中 async/await 语法和异步运行时的原理和工作机制,并不涉实际的异步代码编写。全书的内容主要分为以下几个章节:

  • 异步编程:介绍 Rust 异步编程的基础概念,以及在 Rust 中应用的异步模型。

  • async/await:介绍Rust为支持异步编程而提供的语言层面的支持,包括 async/await 语法和它们的工作原理。

  • IO 模型:介绍几种主要的 IO 模型,包括阻塞 IO、非阻塞 IO、IO 多路复用和异步 IO,其中 IO 多路复用是后文介绍 Epoll 的基础。

  • Epoll:介绍 Epoll 的工作原理并提供一个简单的 Epoll server 的实现例子。Epoll 是 Linux 中 IO 多路复用的一种实现,是后文介绍异步运行时的基础。

  • 异步运行时:通过实现一个简单的异步运行时来介绍 ReactorWakerExecutorTask 的基本概念。

References

异步编程

为了避免歧义,本书中的异步编程特指在 Rust 中使用 async/await 关键字进行编写异步代码。

通过 async 关键字创建的异步函数或者异步块会返回一个实现了 Future trait 的类型,其本质上是一个协程对象。将 async/await 关键字和异步运行时结合使用就可以实现对多个协程对象的调度执行,从而达到并发执行的效果。

在 Rust 中主要应用的是 进程—线程—协程 异步模型,如下所示:

下层是进程,进程是持有资源的最小单位;中层是线程,线程不持有资源,是CPU调度的最小单位;上层是协程,协程既不持有资源、也不在意CPU的调度,它仅仅关注的是“协作式的、自然的流程切换”。

异步运行时就负责调度执行上述的协程对象。例如在一个协程在等待 IO 时,这个协程会主动出让自己的执行权给异步运行时,这时异步运行时可以调度运行其他的协程,从而最大化地利用CPU时间片。在 IO 密集型的应用中,异步编程将能够极大地提高执行效率。

async/await

fnclosureblock前使用 async 关键字,会将标记的代码转化为一个 Future。因此,async 标记的代码不会立即运行,只有在 Future 上调用 .await 时才会计算运行 Future。而在 await 一个 Future 时,会暂停当前函数的执行,直到 executor 完成对该 Future 的计算。

以上是对 async/await 语义的基本介绍。在本章中,我们将会更加深入地介绍 async/await 的使用和它们的底层原理。

async/await 的使用

async/await 是 Rust 中特殊的语法,它使得让出当前线程的控制权而不阻塞线程成为可能,从而允许在等待一个操作完成时可以运行其他代码。

有两种主要的方式使用 asyncasync fnasync {}。这两中使用方式都会返回一个实现了 Future trait 的值:

// `foo()` 返回一个实现了 `Future<Output = u8>` 的类型。
// `foo().await` 将会产生一个 u8 类型的值。
async fn foo() -> u8 { 5 }

fn bar() -> impl Future<Output = u8> {
    // 这个 `async` 块会产生一个实现了 `Future<Output = u8>` 的类型。
    async {
        let x: u8 = foo().await;
        x + 5
    }
}

async fnasync {} 返回的 Future 是惰性的:在真正开始运行之前它什么也不会做。运行一个 Future 的最普遍的方式是 await 这个 FutureFuture.await

await 一个 Future 时,会暂停当前函数的运行,直到完成对 Future 的运行。如果这个 Future 被阻塞住了(例如等待网络IO),它会让出当前线程的控制权。当 Future 中的阻塞操作就绪时(例如等待的网络IO返回了响应),executor 会通过 poll 会恢复 Future 的运行。

async lifetime

与普通的函数不一样,async fn 会获取引用或其他非静态生命周期的参数,然后返回被这些参数的生命周期约束的 Future

async fn foo(x: &u8) -> u8 { *x }

// 这与上面的函数完全等价
fn foo_expanded<'a>(x: &'a u8) -> impl Future<Output = u8> + 'a {
    async move { *x }
}

这意味着,async fn 返回的 Future 必须在非静态生命周期参数仍然有效时 .await。在大多数情况下,我们在调用 async 函数后会立马 .await(例如 foo(&x).await),因此 async lifetime 不会对执行产生什么影响。但是,如果我们存储这种 Future 或者发送给其他的 task 或者 thread,就可能会造成问题。

把带有引用参数的 async fn 转化为静态 Future 的解决方法是:把参数和对 async fn 的调用封装到 async 块中:

fn bad() -> impl Future<Output = u8> {
    let x = 5;
    borrow_x(&x) // ERROR: `x` does not live long enough
}

fn good() -> impl Future<Output = u8> {
    async {
        let x = 5;
        borrow_x(&x).await
    }
}

通过把参数移动到 async 块中,我们把它的生命周期扩展到了匹配调用 good 返回的 Future 的生命周期。

async move

async 块和闭包允许像普通闭包那样使用 move 关键字。一个 async move 块会获取变量的所有权,但是这会导致无法与其他的代码共享这些变量:

// 不同的 async 块可以访问相同的变量s,只要它们都在s的作用域范围内执行
async fn blocks() {
    let s = String::new("Hello World");
    let future_one = async {
        println!("{:?}", s);
    };
    let future_two = async {
        println!("{:?}", s);
    };
    
    futures::future::join(future_one, future_two); // need run in cargo with futures crate
}

// s 被 move 进行 async 块中,因此只能在该 async 块内才能访问 
fn move_block() -> impl Future<Output = ()> {
    let s = String::from("Hello World");
    async move {
        println!("{:?}", s);
    }
}

Future trait

在前文中,我们提到使用 async 标记的 fnblockclosure 都会返回一个 Future,本节将会详细地介绍 Future 的概念。

在标准库中,Future 的定义如下所示:

pub trait Future {
    type Output;	// Future计算完成时产生的值的类型
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

Future 表示一个异步计算,或者说会在未来完成计算的操作。Future 的核心是 poll 方法,当调用 poll 方法时会尝试计算 Future 得到最终的值。如果值还没有准备好(例如等待某些事件发生),则此方法不会阻塞,而是会直接返回一个结果表示 Future 还没有计算完毕。

注意:Future trait 中涉及到的 Pin 将会在后面的章节中介绍。

poll

在上面对 Future 的介绍中,我们简要提到了 poll 方法,下面我们会对 poll 方法进行更详细的介绍。当调用 Futurepoll 方法时会返回一个枚举类型的值:

  • Poll::Pending,表示这个 Future 还没计算完成
  • Poll::Ready(val),表示这个 Future 计算完毕,并附带计算结果:val

如果 Future 没有计算完成,例如想要等待一个 IO 事件发生,那么在 poll 方法体内,我们通常会调用传递给 poll 方法的 Contextwaker 方法拿到一个 Waker(通常把 Waker 叫做唤醒器),然后注册这个 Waker 到一个“事件通知系统”中,最后返回 Pending 表示 Future 没有计算完成。

在未来某一时刻,Future 等待的 IO 事件就绪了,那么“事件通知系统”就会利用我们注册的 Waker 通过某种唤醒机制唤醒这个 Future,通过 poll 继续计算执行该 Future

通过 Waker 唤醒器,我们可以只在 Future 想要等待的事件就绪时,才去唤醒 Future。这样我们就不需要通过一个死循环不断的调用 poll 方法来驱动 Future 的执行,这是异步编程之所以高效的关键所在。

小栗子

下面我们使用一个具体的例子来介绍 Future trait 的使用。

假设我们准备读取一个 socket,但是它可能还没有准备好数据。如果数据准备好了,我们就可以读取它然后然后返回 Poll::Ready(data),但是如果数据没有准备好,我们可以注册一个唤醒器到“事件通知系统”中:

struct SocketRead<'a> {
	socket: &'a Socket
}

impl<'a> Future for SocketRead<'a> {
	type Output = Vec<u8>;
	
	fn poll(self: Pin<&mut Self>, cx: &mut Context<'_'>) -> Poll<Self::Output> {
		let data = self.socket.no_block_read::<Option<Vec<u8>>>(1024);
		match data {
			Some(data) => Poll::Ready(data),
			None => {
				REACTOR.registe_waker_and_event(self.socket, Type::Read, cx.waker().clone());
				Poll::Pending
			}
		}
	}
}

代码中的 REACTOR 就是前文中所提到过的“事件通知系统”。当 socket 中有数据可读时,REACTOR 就会使用注册的 Waker 唤醒负责 SocketRead ,然后调用 poll 方法再次计算该 Future

Leaf / Non-leaf Future

在前文中我们提到使用 async 关键字可以创建一个 Future 类型,而在上面的小栗子中我们通过实现 Future trait 的方式也创建了一个 Future 类型,那么这两个 Future 有什么区别呢?

Leaf Future

通过为我们的自定义类型实现 Future trait 的方式创建的 Future 被称为 Leaf Future。例如上面的小栗子中的 SocketRead 类型:

struct SocketRead<'a> {
	socket: &'a Socket
}

impl<'a> Future for SocketRead<'a> {
	/
}

Leaf Future 中通常会涉及到对 IO 的操作,例如从一个 socket 中读取数据,并且对 IO 的操作是非阻塞式的。

当调用异步运行时提供的异步读 socket 的方法时就会返回上述的 Future

impl async_runtime {
	fn read_socket(&self) -> SocketRead {
		// ...
	}
}

let mut leaf_future: SocketRead = async_runtime.read_socket();

通常情况下,这些 Leaf Future 都是由异步运行时自己创建的,用户只需要使用 async/await 关键字即可。

Non-leaf Future

Non-leaf Future 是我们使用 async 关键字创建 Future,并且会由 async runtime 来调度运行。

Non-leaf Future 中可以创建多个 Leaf Future, 并且通过 await Leaf Future 来完成对 IO 的操作:

let non_leaf_future = async {
	let data = async_runtime.read_socket().await;
	println!("Receive data: {:?}", data);
	
	let data = async_runtime.read_socket().await;
	println!("Receive data: {:?}", data);
	
	let data = async_runtime.read_socket().await;
	println!("Receive data: {:?}", data);
}

await 一个 Leaf Future 时,如果返回的是 Pending,那么Non-Leaf Future 就会让出对当前线程的控制权,此时 async runtime 就能够调度执行其他的 Non-Leaf Future 。当 Non-Leaf Future 中的 IO 操作就绪时,async runtime 就会重新激活挂起的 Future,在上次离开的地方继续运行

Generator

Future 的底层依赖于生成器,因此在本节中我们将会介绍生成器的概念,以及生成器是如何转化为 Future 的。

Generator 定义

Generator 的定义位于标准库的 ops 模块中,具体如下所示:

pub trait Generator<R = ()> {
    type Yield;
    type Return;
    fn resume(
        self: Pin<&mut Self>, 
        arg: R
    ) -> GeneratorState<Self::Yield, Self::Return>;
}

pub enum GeneratorState<Y, R> {
    Yielded(Y),
    Complete(R),
}

Generator 通常也被称为协程,主要目的是为 async/await 语法提供构建块,但是未来也可能会扩展到为 Iterator 和其他类型提供符合人体工程学的定义。

Generator 的关联类型 Yield 对应于使用yield 表达式产出的值的类型。

Generator 的关联类型 Return 对应于使用 return 语句或者生成器中的最后一个表达式返回的值的类型。

注意:Generator trait 中涉及到的 Pin 将会在后面的章节中介绍。

resume

调用 Generatorresume 方法会恢复生成器的运行,如果还没有启动生成器的话则会启动生成器。

在执行生成器的过程中,如果遇到 yield 表达式,那么生成器就会在这个 yield 点挂起,并产出 yield 表达式的值:GeneratorState::Yielded(Y)。当再次调用 resume 方法时生成器就会在挂起的 yield 点恢复运行。

在运行过程中,如果遇到的是 return 语句或者生成器末尾的最后一个表达式,那么生成器执行完毕,并返回 GeneratorState::Complete(R)R 就是 return 语句或者末尾表达式的值。

如果生成器已经执行完毕,返回了 GeneratorState::Complete,那么当再次调用 Generatorresume 方法时将会导致 panic

Generator 使用

在闭包中使用 yield 关键字就可以创建一个生成器:

#![feature(generators, generator_trait)]

use std::pin::Pin;
use std::ops::{Generator, GeneratorState};


fn main() {
    let mut gen = || {
        for i in 0..10 {
            yield i;
        }
        
        return ();
    };
    
    loop {
        match Pin::new(&mut gen).resume(()) {
            GeneratorState::Yielded(y) => println!("Yielded: {}", y),
            GeneratorState::Complete(r) => {
                println!("Complete: {:?}", r);
                break;
            }
        }
    }
}

通过为自定义类型实现 Generator trait 来创建生成器:

#![feature(generators, generator_trait)]

use std::pin::Pin;
use std::ops::{Generator, GeneratorState};


fn main() {
    let mut gen = MyGenerator { counter: 1, completed: false };
    
    loop {
        match Pin::new(&mut gen).resume(()) {
            GeneratorState::Yielded(y) => println!("Yielded: {}", y),
            GeneratorState::Complete(r) => {
                println!("Complete: {}", r);
                break;
            }
        }
    }
}


struct MyGenerator {
    counter: i32,
    completed: bool
}


impl<R> Generator<R> for MyGenerator {
    type Yield = i32;
    type Return = char;
    
    fn resume(self: Pin<&mut Self>, _arg: R) -> GeneratorState<Self::Yield, Self::Return> {
        if self.completed {
            panic!("MyGenerator has been completed.");
        }
        
        let counter = self.counter;
        if counter < 10 {
            self.get_mut().counter = counter + 1;
            GeneratorState::Yielded(counter)
        } else {
            self.get_mut().completed = true;
            GeneratorState::Complete('🎉')
        }
    }
}

把生成器当作迭代器使用:

#![feature(generators, generator_trait)]

use std::pin::Pin;
use std::iter::Iterator;
use std::ops::{Generator, GeneratorState};


fn main() {
    let gen = MyGenerator { counter: 0, completed: false };
    
    for val in gen {
        println!("Got: {}", val);
    }

}


struct MyGenerator {
    counter: i32,
    completed: bool
}


impl<R> Generator<R> for MyGenerator {
    type Yield = i32;
    type Return = ();
    
    fn resume(self: Pin<&mut Self>, _arg: R) -> GeneratorState<Self::Yield, Self::Return> {
        if self.completed {
            panic!("MyGenerator has been completed.");
        }
        
        let counter = self.counter;
        if counter < 10 {
            self.get_mut().counter = counter + 1;
            GeneratorState::Yielded(counter)
        } else {
            self.get_mut().completed = true;
            GeneratorState::Complete(())
        }
    }
}

impl Iterator for MyGenerator {
    type Item = i32;
    
    fn next(&mut self) -> Option<Self::Item> {
        match Pin::new(self).resume(()) {
            GeneratorState::Yielded(y) => Some(y),
            GeneratorState::Complete(_) => None
        }
    }
}

From Generator to Future

Rust 的 core 库中的 future 模块定义了将生成器转化为 Future 的函数(为了便于阅读去掉了注释部分):

pub const fn from_generator<T>(gen: T) -> impl Future<Output = T::Return>
	where T: Generator<ResumeTy, Yield = ()>
{
    struct GenFuture<T: Generator<ResumeTy, Yield = ()>>(T);

    impl<T: Generator<ResumeTy, Yield = ()>> !Unpin for GenFuture<T> {}

    impl<T: Generator<ResumeTy, Yield = ()>> Future for GenFuture<T> {
        type Output = T::Return;
        
        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            let gen = unsafe { Pin::map_unchecked_mut(self, |s| &mut s.0) };
   
            match gen.resume(ResumeTy(NonNull::from(cx).cast::<Context<'static>>())) {
                GeneratorState::Yielded(()) => Poll::Pending,
                GeneratorState::Complete(x) => Poll::Ready(x),
            }
        }
    }

    GenFuture(gen)
}

从源码中可以看出,实际上我们使用 async 创建的 Future 是一个实现了 Future trait 的结构体 GenFuture,这个结构体的内部是一个生成器。

在我们调用 Futurepoll 方法时,实际上就是在调用底层的生成器的 resume 方法,并且生成器返回的 GeneratorState::Yielded/Complete(val) 会被分别转化为 poll 的返回类型:Poll::Pending/Ready(val)

小栗子

在本节的最后,我们通过一个小栗子把前面讲的 async/awaitFutureGenerator 的知识串联起来。

有如下的代码:

#![allow(unused)]
fn main() {
#[inline(never)]
async fn foo() -> i32 {
    10
}

#[inline(never)]
async fn bar() -> i32 {
    foo().await
}
}

HIR 是 Rust 代码编译的中间产物,可以帮助我们直到代码在脱糖后是什么样子。可以使用 Rust PlaygroundHIR 功能编译上述代码,结果如下:

#[inline(never)]
async fn foo()
    ->
        /*impl Trait*/ #[lang = "from_generator"](move |mut _task_context|
        { { let _t = { 10 }; _t } })

#[inline(never)]
async fn bar()
    ->
        /*impl Trait*/ #[lang = "from_generator"](move |mut _task_context|
        {
                {
                        let _t =
                            {
                                    match #[lang = "into_future"](foo()) {
                                            mut pinned =>
                                                loop {
                                                        match unsafe {
                                                                            #[lang = "poll"](#[lang = "new_unchecked"](&mut pinned),
                                                                                #[lang = "get_context"](_task_context))
                                                                        } {
                                                                #[lang = "Ready"] { 0: result } => break result,
                                                                #[lang = "Pending"] {} => { }
                                                            }
                                                        _task_context = (yield ());
                                                    },
                                        }
                                };
                        _t
                    }
            })

原生的 HIR 代码难以阅读,我们将其转化为下面的 Rust 伪代码:

#[inline(never)]
async fn foo() -> impl Future<Output = i32> {
    from_generator(move |mut _task_context| {
        let _t = 10;
        _t
    })
}

#[inline(never)]
async fn bar() -> impl Future<Output = i32> {
    from_generator(move |mut _task_context| {
        let _t = {
            match into_future(foo()) {
                mut pinned => {
                    loop {
                        match unsafe Pin::new_unchecked(&mut pinned).poll(get_context(_task_context)) {
                            Poll::Ready(result) => break result,
                            Poll::Pending => {}
                        }
                        _task_context = (yield ());
                    }
                }
            }
        };
        _t
    })
}

可以看到 async 函数体内的代码被转化成了一个生成器,然后再调用 from_generator 函数传入生成器创建一个 Future ,这与我们上面介绍的 from_generator 函数的功能一致。

await 部分则被转化为了一个无限循环,在循环的内部会调用 awaitFuturepoll 方法,如果结果是 Poll::Ready,则终止循环并返回 result,继续执行剩余的代码;如果结果是 Poll::Pending,则会使用 yield 挂起生成器,将控制权转移给调用方。当调用方激活这个挂起的生成器时,生成器就会恢复运行,执行循环体中的代码。

因此,只有当 awaitFuture 执行完毕时,才会继续往下执行 async 块中的代码,这样就确保了能够以同步的方式编写异步代码,让我们能拥有良好的开发体验。

状态机

在上一节中,我们讲到生成器执行到 yield 表达式时,会在这个 yield 点挂起,当再次激活生成器时会在挂起的 yield 点恢复运行,那么生成器是怎么保存在 yield 点挂起时的状态呢?

事实上,编译器会把生成器转化为一个状态机,状态机中会保存每一个 yield 点的生成器的执行状态。

假如我们写了一个如下所示的生成器:

#![feature(generators, generator_trait)]

use std::pin::Pin;
use std::ops::{Generator, GeneratorState};

fn main() {
    let mut gen = || {
        yield 1;
        yield 2;

        ()
    };

    loop {
        match Pin::new(&mut gen).resume(()) {
            GeneratorState::Yielded(y) => println!("Yielded: {}", y),
            GeneratorState::Complete(c) => {
                println!("Complete: {:?}", c);
                break;
            }
        }
    }
}

编译器会把生成器转化为下面的代码:

#![feature(generators, generator_trait)]

use std::mem;
use std::pin::Pin;
use std::ops::{Generator, GeneratorState};

fn main() {
    let mut gen = MyGenerator::new();

    loop {
        match Pin::new(&mut gen).resume(()) {
            GeneratorState::Yielded(y) => println!("Yielded: {}", y),
            GeneratorState::Complete(c) => {
                println!("Complete: {:?}", c);
                break;
            }
        }
    }
}

enum MyGenerator {
    Enter,
    State1(i32),
    State2(i32),
    Exit
}

impl<R> Generator<R> for MyGenerator {
    type Yield = i32;
    type Return = ();

    fn resume(self: Pin<&mut Self>, _arg: R) -> GeneratorState<Self::Yield, Self::Return> {
        let mut_gen = self.get_mut();
        match mem::replace(mut_gen, MyGenerator::Exit) {
            MyGenerator::Enter => {
                *mut_gen = MyGenerator::State1(1);
                GeneratorState::Yielded(1)
            }
            MyGenerator::State1(_) => {
                *mut_gen = MyGenerator::State2(2);
                GeneratorState::Yielded(2)
            }
            MyGenerator::State2(_) => {
                *mut_gen = MyGenerator::Exit;
                GeneratorState::Complete(())
            }
            MyGenerator::Exit => panic!("Generator has been completed.")
        }
    }
}

impl MyGenerator {
    fn new() -> Self {
        Self::Enter
    }
}

同时,由于每个 async 函数最终都会生成一个状态机,并且每个可执行文件都会捆绑一个异步运行时,这会导致异步的 Rust 代码在编译后产生更大的二进制体积,这也是 async Rust 的一个小缺点。

Pin

前文的 Future traitGeneartor 和状态机中都出现了 Pin,那么 Pin 到底有什么用呢? 在本节中,我们将会详细地介绍它。

自引用结构

在 Safe Rust 中,我们无法创建自引用结构体:

fn main() {
    let s = "Hello World".to_string();
    let _ = SelfReference {
        a: s,
        b: &s
    };
}

struct SelfReference<'a> {
	a: String,
	b: &'a String
}

如果编译,将会发生报错:

error[E0382]: borrow of moved value: `s`
 --> src/main.rs:5:12
  |
2 |     let s = "Hello World".to_string();
  |         - move occurs because `s` has type `String`, which does not implement the `Copy` trait
3 |     let _ = SelfReference {
4 |         a: s,
  |            - value moved here
5 |         b: &s
  |            ^^ value borrowed here after move

这是因为 s 已经发生了 move,因此 b 就不能借用已经 move 了的 s

为了创建自引用结构,我们需要使用裸指针:

fn main() {
    let mut sr_1 = SelfReference::new("Hello");
    sr_1.init();
    
    let mut sr_2 = SelfReference::new("World");
    sr_2.init();
    
    println!("sr_1: {{ a: {}, b: {} }}", sr_1.get_a(), sr_1.get_b());
    println!("sr_2: {{ a: {}, b: {} }}", sr_2.get_a(), sr_2.get_b());
}

#[derive(Debug)]
struct SelfReference {
	a: String,
	b: *const String
}

impl SelfReference {
    fn new(msg: &str) -> Self {
        Self {
            a: msg.to_string(),
            b: std::ptr::null()
        }
    }
    
    fn init(&mut self) {
        let ptr_to_a = &self.a as *const _;
        self.b = ptr_to_a;
    }
    
    fn get_a(&self) -> &str {
        &self.a
    }
    
    fn get_b(&self) -> &str {
        unsafe {
            &*self.b
        }
    }
}

编译运行,结果如下所示:

sr_1: { a: Hello, b: Hello }
sr_2: { a: World, b: World }

接下来,让我们交换 sr_1sr_2 的内存位置的数据,即 sr_1sr_2 互相 move 给对方:

fn main() {
    let mut sr_1 = SelfReference::new("Hello");
    sr_1.init();
    
    let mut sr_2 = SelfReference::new("World");
    sr_2.init();
    
    println!("Before swap:");
    println!("sr_1: {{ a: {}, b: {} }}", sr_1.get_a(), sr_1.get_b());
    println!("sr_2: {{ a: {}, b: {} }}", sr_2.get_a(), sr_2.get_b());
    
    std::mem::swap(&mut sr_1, &mut sr_2);
    println!("\nAfter swap:");
    println!("sr_1: {{ a: {}, b: {} }}", sr_1.get_a(), sr_1.get_b());
    println!("sr_2: {{ a: {}, b: {} }}", sr_2.get_a(), sr_2.get_b());
}

#[derive(Debug)]
struct SelfReference {
	 a: String,
	 b: *const String
}
impl SelfReference {
   fn new(msg: &str) -> Self {
       Self {
           a: msg.to_string(),
           b: std::ptr::null()
       }
   } 
   fn init(&mut self) {
       let ptr_to_a = &self.a as *const _;
       self.b = ptr_to_a;
   }   
   fn get_a(&self) -> &str {
       &self.a
   }    
   fn get_b(&self) -> &str {
       unsafe {
           &*self.b
       }
   }
}

编译运行,结果如下所示:

Before swap:
sr_1: { a: Hello, b: Hello }
sr_2: { a: World, b: World }

After swap:
sr_1: { a: World, b: Hello }
sr_2: { a: Hello, b: World }

可以看出,在交换 sr_1sr_2 后,字段 a 的数据也发生了交换,但是字段 b 的数据没有改变,仍然指向之前的位置,如图所示:

swap problem

这意味着,srsr_1sr_2)将不再是自引用结构体,并保存了一个指向其他对象的裸指针。因此,sr 的字段 b 的生命周期将不再和其结构体本身相关联,我们将难以保证 sr.b 指针不会变成悬垂指针。

在上面的例子中,由于使用 swap 函数导致出现了我们不想要的结果,在后续的代码中对 sr 的使用很可能会出现段错误、UB 等其他类型的错误。

Let's pin it!

Rust 是一门极为注重内存安全的语言,为了能够安全地使用自引用结构,Rust 发明了 Pin

Pin

Pin 位于 std 库的 pin 模块中,源代码定义如下所示:

#[stable(feature = "pin", since = "1.33.0")]
#[lang = "pin"]
#[fundamental]
#[repr(transparent)]
#[derive(Copy, Clone)]
pub struct Pin<P> {
    pointer: P,
}

#[stable(feature = "pin", since = "1.33.0")]
impl<P: Deref> Deref for Pin<P> {
    type Target = P::Target;
    fn deref(&self) -> &P::Target {
        Pin::get_ref(Pin::as_ref(self))
    }
}

#[stable(feature = "pin", since = "1.33.0")]
impl<P: DerefMut<Target: Unpin>> DerefMut for Pin<P> {
    fn deref_mut(&mut self) -> &mut P::Target {
        Pin::get_mut(Pin::as_mut(self))
    }
}

Pin 实现了 DerefDerefMut trait,因此 Pin 是一个智能指针。并且 Pin 的内部包裹了另一个指针 P,因此我们一般使用 Pin<P<T>> 的方式来表示一个 Pin 结构(T 是指针 P 指向的类型)。

既然有 Pin,那么自然就有 Unpin,那么 Unpin 是什么呢?Unpin 是一个 auto trait,编译器会默认为所有的类型实现 Unpin,除非这些类型实现了 !Unpin

要想获取 Pin<P<T>>T 的可变引用 &mut T,可以使用 Pin 提供的 get_mut 方法,这也是 Pin 提供的 api唯一可以安全地获取 &mut T 的方法,其函数签名如下所示:

pub fn get_mut(self) -> &'a mut T
where
    T: Unpin,

发现了吗?要想安全地拿到 &mut TT 就必须实现 Unpin。如果 T 实现了 !Unpin,那么就不可能安全地拿到 T 的可变引用,我们自然也就无法使用 std::mem::swap(x: &mut T, y: &mut T) 等类似的函数 move T,就不会发生前文的例子中出现的未定义行为。

因此,Pin<P<T>> 利用 Rust 的类型系统保证:如果 T 实现了 !Unpin,那么就不可能在 Safe Rust 中获取 T 的可变引用。相反,如果 T 实现了 Unpin,那么 Pin 就仅仅是对 P<T> 的一层包装,我么可以随意地拿到 &mut T

接下来,我们将会使用 Pin 解决上面的那个例子中出现的问题。

Pin to stack

Pin 到栈上是指我们想要 Pin 住的值在栈上,使用 Pin::new_unchecked 函数把 &mut T 包装成 Pin<&mut T> 即可:

#![feature(negative_impls)]
use std::pin::Pin;

fn main() {
    let mut sr_1 = SelfReference::new("Hello");
    let mut sr_1 = unsafe { Pin::new_unchecked(&mut sr_1) };
    sr_1.as_mut().init();
    
    let mut sr_2 = SelfReference::new("World");
    let mut sr_2 = unsafe { Pin::new_unchecked(&mut sr_2) };
    sr_2.as_mut().init();
    
    println!("Before swap:");
    println!("sr_1: {{ a: {}, b: {} }}", sr_1.as_ref().get_a(), sr_1.as_ref().get_b());
    println!("sr_2: {{ a: {}, b: {} }}", sr_2.as_ref().get_a(), sr_2.as_ref().get_b());
    
    println!("If we want to swap:");
    std::mem::swap(sr_1.get_mut(), sr_2.get_mut());
}

#[derive(Debug)]
struct SelfReference {
	a: String,
	b: *const String
}

impl !Unpin for SelfReference {}

impl SelfReference {
    fn new(msg: &str) -> Self {
        Self {
            a: msg.to_string(),
            b: std::ptr::null()
        }
    }
    
    fn init(self: Pin<&mut Self>) {
        let ptr_to_a = &self.a as *const _;
        unsafe {
            self.get_unchecked_mut().b = ptr_to_a;
        }
    }
    
    fn get_a(self: Pin<&Self>) -> &str {
        &self.get_ref().a
    }
    
    fn get_b(self: Pin<&Self>) -> &str {
        unsafe {
            &*self.b
        }
    }
}

此时代码将不会通过编译:

error[E0277]: `SelfReference` cannot be unpinned
   --> src/main.rs:18:25
    |
18  |     std::mem::swap(sr_1.get_mut(), sr_2.get_mut());
    |                         ^^^^^^^ the trait `Unpin` is not implemented for `SelfReference`
    |
    = note: consider using `Box::pin`
note: required by a bound in `Pin::<&'a mut T>::get_mut`

error[E0277]: `SelfReference` cannot be unpinned
   --> src/main.rs:18:41
    |
18  |     std::mem::swap(sr_1.get_mut(), sr_2.get_mut());
    |                                         ^^^^^^^ the trait `Unpin` is not implemented for `SelfReference`
    |
    = note: consider using `Box::pin`
note: required by a bound in `Pin::<&'a mut T>::get_mut`

这说明当我们把 &mut SelfReference Pin 到栈上之后,无法通过 get_mut 方法拿到 &mut SelfReference,那么自然就无法使用 swap 函数,在编译阶段就保证了不会出现内存安全问题。

Pin::new_unchecked 是一个 unsafe 函数,这是因为需要使用者自己遵守约定只使用 Pin 提供的 api 来获取并使用可变引用。

假如使用者提前 dropPin,这样就可以直接获取 T 的可变引用,仍然会导致内存安全问题:

#![feature(negative_impls)]
use std::pin::Pin;

fn main() {
    let mut sr_1 = SelfReference::new("Hello");
    let mut sr_1_pin = unsafe { Pin::new_unchecked(&mut sr_1) };
    sr_1_pin.as_mut().init();
    
    let mut sr_2 = SelfReference::new("World");
    let mut sr_2_pin = unsafe { Pin::new_unchecked(&mut sr_2) };
    sr_2_pin.as_mut().init();
    
    println!("Before swap:");
    println!("sr_1: {{ a: {}, b: {} }}", sr_1_pin.as_ref().get_a(), sr_1_pin.as_ref().get_b());
    println!("sr_2: {{ a: {}, b: {} }}", sr_2_pin.as_ref().get_a(), sr_2_pin.as_ref().get_b());
    
    drop(sr_1_pin);
    drop(sr_2_pin);
    

    println!("\nAfter swap:");
    std::mem::swap(&mut sr_1, &mut sr_2);
    
    let sr_1_pin = unsafe { Pin::new_unchecked(&mut sr_1) };
    let sr_2_pin = unsafe { Pin::new_unchecked(&mut sr_2) };
    println!("sr_1: {{ a: {}, b: {} }}", sr_1_pin.as_ref().get_a(), sr_1_pin.as_ref().get_b());
    println!("sr_2: {{ a: {}, b: {} }}", sr_2_pin.as_ref().get_a(), sr_2_pin.as_ref().get_b());
}
#[derive(Debug)]
struct SelfReference {
	a: String,
	b: *const String
}
impl !Unpin for SelfReference {}
impl SelfReference {
   fn new(msg: &str) -> Self {
       Self {
           a: msg.to_string(),
           b: std::ptr::null()
       }
   }    
   fn init(self: Pin<&mut Self>) {
       let ptr_to_a = &self.a as *const _;
       unsafe {
           self.get_unchecked_mut().b = ptr_to_a;
       }
   }
   fn get_a(self: Pin<&Self>) -> &str {
       &self.get_ref().a
   }   
   fn get_b(self: Pin<&Self>) -> &str {
       unsafe {
           &*self.b
       }
   }
}    

编译运行,将会出现和之前的例子中一样的问题:

Before swap:
sr_1: { a: Hello, b: Hello }
sr_2: { a: World, b: World }

After swap:
sr_1: { a: World, b: Hello }
sr_2: { a: Hello, b: World }

Pin to heap

Pin 到堆上是指把我们想要 Pin 住的值装箱到堆上面,使用Box::pin 函数即可把 T 包装成 Pin<Box<T>>

#![feature(negative_impls)]
use std::pin::Pin;

fn main() {
    let mut sr_1 = SelfReference::new("Hello");
    let mut sr_2 = SelfReference::new("World");
    
    println!("Before swap:");
    println!("sr_1: {{ a: {}, b: {} }}", sr_1.as_ref().get_a(), sr_1.as_ref().get_b());
    println!("sr_2: {{ a: {}, b: {} }}", sr_2.as_ref().get_a(), sr_2.as_ref().get_b());
    
    println!("If we want to swap:");
    std::mem::swap(sr_1.as_mut().get_mut(), sr_2.as_mut().get_mut());
    
}

#[derive(Debug)]
struct SelfReference {
	a: String,
	b: *const String
}

impl !Unpin for SelfReference {}

impl SelfReference {
    fn new(msg: &str) -> Pin<Box<Self>> {
        let sr = Self {
            a: msg.to_string(),
            b: std::ptr::null()
        };
        let mut boxed = Box::pin(sr);
        let ptr_to_a = &boxed.a as *const _;
        unsafe {
            boxed.as_mut().get_unchecked_mut().b = ptr_to_a;
        }
        
        boxed
    }
    
    fn get_a(self: Pin<&Self>) -> &str {
        &self.get_ref().a
    }
    
    fn get_b(self: Pin<&Self>) -> &str {
        unsafe {
            &*self.b
        }
    }
}

此时代码将不会通过编译:

error[E0277]: `SelfReference` cannot be unpinned
   --> src/main.rs:13:34
    |
13  |     std::mem::swap(sr_1.as_mut().get_mut(), sr_2.as_mut().get_mut());
    |                                  ^^^^^^^ the trait `Unpin` is not implemented for `SelfReference`
    |
    = note: consider using `Box::pin`
note: required by a bound in `Pin::<&'a mut T>::get_mut`

error[E0277]: `SelfReference` cannot be unpinned
   --> src/main.rs:13:59
    |
13  |     std::mem::swap(sr_1.as_mut().get_mut(), sr_2.as_mut().get_mut());
    |                                                           ^^^^^^^ the trait `Unpin` is not implemented for `SelfReference`
    |
    = note: consider using `Box::pin`
note: required by a bound in `Pin::<&'a mut T>::get_mut`

Pin 到堆上的优点是不需要使用者编写 unsafe 函数来构造 Pin,也不需要使用者自己遵守约定只使用 Pin 提供的 api 来获取可变引用,因为 Pin 到堆上后,用户只能使用 Pin<Box<T>>;缺点是 Pin 到堆上会有额外的性能开销。

Pin and async

在前文中我们给出了 FutureGenerator 的定义:

pub trait Future {
    type Output;	
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

pub trait Generator<R = ()> {
    type Yield;
    type Return;
    fn resume(
        self: Pin<&mut Self>, 
        arg: R
    ) -> GeneratorState<Self::Yield, Self::Return>;
}

还有将 Generator 转化为 Future 的函数:

pub const fn from_generator<T>(gen: T) -> impl Future<Output = T::Return>
    where T: Generator<ResumeTy, Yield = ()>
{
    struct GenFuture<T: Generator<ResumeTy, Yield = ()>>(T);

    impl<T: Generator<ResumeTy, Yield = ()>> !Unpin for GenFuture<T> {}

    impl<T: Generator<ResumeTy, Yield = ()>> Future for GenFuture<T> {
        type Output = T::Return;
        
        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            let gen = unsafe { Pin::map_unchecked_mut(self, |s| &mut s.0) };
   
            match gen.resume(ResumeTy(NonNull::from(cx).cast::<Context<'static>>())) {
                GeneratorState::Yielded(()) => Poll::Pending,
                GeneratorState::Complete(x) => Poll::Ready(x),
            }
        }
    }

    GenFuture(gen)
}

可以看到要调用 FuturePoll 方法和 Generatorresume 方法必须使用 Pin<&mut Self> 才行。并且在 from_generator 函数中为 GenFuture 实现了 !Unpin

经过前面的学习,我们知道为 T 实现了 !Unpin 后,就无法在 Safe Rust 中获取 T 的可变引用,而 Rust 会主动为 Future 实现 !Unpin,那么为什么 Rust 需要 PinFuture 呢?

假设我们编写了一个生成器:

#![feature(generators, generator_trait)]

fn main(){
    let _gen = || {
        let s = "Hello World".to_string();
        let borrowed_s = &s;
        
        yield borrowed_s.len();
        
        println!("{}", borrowed_s);
    };
}

编译后将会发生报错:

error[E0626]: borrow may still be in use when generator yields
 --> src/main.rs:6:26
  |
6 |         let borrowed_s = &s;
  |                          ^^
7 |         
8 |         yield borrowed_s.len();
  |         ---------------------- possible yield occurs here

编译器提示我们生成器中存在跨 yield 借用,那么为什么编译器不允许跨 yield 借用呢?

想要知道原因,我们还要继续深入底层,上述的生成器会被编译成一个状态机:

#![feature(generators, generator_trait)]

use std::pin::Pin;
use std::ops::{Generator, GeneratorState};

fn main() {
    let mut gen = Gen::new();
    
    loop {
        match Pin::new(&mut gen).resume(()) {
            GeneratorState::Yielded(y) => println!("Yielded: {}", y),
            GeneratorState::Complete(c) => {
                println!("Complete: {:?}", c);
                break;
            }
        }
    }
}

enum Gen {
    Enter,
    Yielded{
        s: String,
        borrowed_s: *const String
    },
    Exit
}


impl<R> Generator<R> for Gen {
    type Yield = usize;
    type Return = ();
    
    fn resume(self: Pin<&mut Self>, _arg: R) -> GeneratorState<Self::Yield, Self::Return> {
        let mut_gen = self.get_mut();
        match mut_gen {
            Gen::Enter => {
                let s = "Hello World".to_string();
                let borrowed_s = &s;
                let len = borrowed_s.len();
                
                *mut_gen = Gen::Yielded {
                    s,
                    borrowed_s: std::ptr::null()
                };
                if let Gen::Yielded { s, borrowed_s } = mut_gen {
                    *borrowed_s = s as *const _;
                }
                
                GeneratorState::Yielded(len)
            }
            Gen::Yielded{ borrowed_s, .. } => {
                let borrowed_s: &String = unsafe { &**borrowed_s };
                println!("{}", borrowed_s);
                *mut_gen = Gen::Exit;
                
                GeneratorState::Complete(())
            }
            Gen::Exit => panic!("Generator has been completed.")
        }
    }
}

impl Gen {
    fn new() -> Self {
        Self::Enter
    }
}

编译上述代码,结果似乎就是我们所期待的:

Yielded: 11
Hello World
Complete: ()

从上述的代码中可以看出,生成的状态机中存在自引用结构。因此如果生成器中存在跨 yield 点借用,那么就可能产生内存安全问题,编译器干脆就禁止存在跨 yield 点借用的生成器通过编译。

例如,如果我们使用 swap 函数 move 生成器就可能发生异常:

#![feature(generators, generator_trait)]

use std::pin::Pin;
use std::ops::{Generator, GeneratorState};

fn main() {
    let mut gen_1 = Gen::new();
    let mut gen_2 = Gen::new();
    
    match Pin::new(&mut gen_1).resume(()) {
        GeneratorState::Yielded(y) => println!("Yielded: {}", y),
        GeneratorState::Complete(c) => println!("Complete: {:?}", c)
    }
    match Pin::new(&mut gen_2).resume(()) {
        GeneratorState::Yielded(y) => println!("Yielded: {}", y),
        GeneratorState::Complete(c) => println!("Complete: {:?}", c)
    }
    
    std::mem::swap(&mut gen_1, &mut gen_2);
    
    match Pin::new(&mut gen_1).resume(()) {
        GeneratorState::Yielded(y) => println!("Yielded: {}", y),
        GeneratorState::Complete(c) => println!("Complete: {:?}", c)
    }
    match Pin::new(&mut gen_2).resume(()) {
        GeneratorState::Yielded(y) => println!("Yielded: {}", y),
        GeneratorState::Complete(c) => println!("Complete: {:?}", c)
    }
}
enum Gen {
   Enter,
   Yielded{
       s: String,
       borrowed_s: *const String
   },
   Exit
}
impl<R> Generator<R> for Gen {
   type Yield = usize;
   type Return = ();    
   fn resume(self: Pin<&mut Self>, _arg: R) -> GeneratorState<Self::Yield, Self::Return> {
       let mut_gen = self.get_mut();
       match mut_gen {
           Gen::Enter => {
               let s = "Hello World".to_string();
               let borrowed_s = &s;
               let len = borrowed_s.len();
               
               *mut_gen = Gen::Yielded {
                   s,
                   borrowed_s: std::ptr::null()
               };
               if let Gen::Yielded { s, borrowed_s } = mut_gen {
                   *borrowed_s = s as *const _;
               }               
               GeneratorState::Yielded(len)
           }
           Gen::Yielded{ borrowed_s, .. } => {
               let borrowed_s: &String = unsafe { &**borrowed_s };
               println!("{}", borrowed_s);
               *mut_gen = Gen::Exit;               
               GeneratorState::Complete(())
           }
           Gen::Exit => panic!("Generator has been completed.")
       }
   }
}
impl Gen {
   fn new() -> Self {
       Self::Enter
   }
}

编译运行将会发生段错误:

/playground/tools/entrypoint.sh: line 11:    12 Segmentation fault
Yielded: 11
Yielded: 11
Hello World
Complete: ()

为了防止 move 掉生成器,我们需要为 Gen 实现 !Unpin

#![feature(negative_impls)]
#![feature(generators, generator_trait)]

use std::pin::Pin;
use std::ops::{Generator, GeneratorState};

fn main() {
    let mut gen_1 = Gen::new();
    let mut gen_2 = Gen::new();
    
    let mut boxed_pin_1 = Box::pin(gen_1);
    let mut boxed_pin_2 = Box::pin(gen_2);
    
    match boxed_pin_1.as_mut().resume(()) {
        GeneratorState::Yielded(y) => println!("Yielded: {}", y),
        GeneratorState::Complete(c) => println!("Complete: {:?}", c)
    }
    match boxed_pin_2.as_mut().resume(()) {
        GeneratorState::Yielded(y) => println!("Yielded: {}", y),
        GeneratorState::Complete(c) => println!("Complete: {:?}", c)
    }
    
    std::mem::swap(boxed_pin_1.as_mut().get_mut(), boxed_pin_2.as_mut().get_mut());
}

enum Gen {
    Enter,
    Yielded{
        s: String,
        borrowed_s: *const String
    },
    Exit
}

impl !Unpin for Gen {}

impl<R> Generator<R> for Gen {
   type Yield = usize;
   type Return = ();    
   fn resume(self: Pin<&mut Self>, _arg: R) -> GeneratorState<Self::Yield, Self::Return> {
       let mut_gen = unsafe { self.get_unchecked_mut() };
       match mut_gen {
           Gen::Enter => {
               let s = "Hello World".to_string();
               let borrowed_s = &s;
               let len = borrowed_s.len();
               
               *mut_gen = Gen::Yielded {
                   s,
                   borrowed_s: std::ptr::null()
               };
               if let Gen::Yielded { s, borrowed_s } = mut_gen {
                   *borrowed_s = s as *const _;
               }               
               GeneratorState::Yielded(len)
           }
           Gen::Yielded{ borrowed_s, .. } => {
               let borrowed_s: &String = unsafe { &**borrowed_s };
               println!("{}", borrowed_s);
               *mut_gen = Gen::Exit;               
               GeneratorState::Complete(())
           }
           Gen::Exit => panic!("Generator has been completed.")
       }
   }
}
impl Gen {
   fn new() -> Self {
       Self::Enter
   }
}

编译修改后的代码将会直接报错:

error[E0277]: `Gen` cannot be unpinned
   --> src/main.rs:23:41
    |
23  |     std::mem::swap(boxed_pin_1.as_mut().get_mut(), boxed_pin_2.as_mut().get_mut());
    |                                         ^^^^^^^ the trait `Unpin` is not implemented for `Gen`
    |
    = note: consider using `Box::pin`
note: required by a bound in `Pin::<&'a mut T>::get_mut`

error[E0277]: `Gen` cannot be unpinned
   --> src/main.rs:23:73
    |
23  |     std::mem::swap(boxed_pin_1.as_mut().get_mut(), boxed_pin_2.as_mut().get_mut());
    |                                                                         ^^^^^^^ the trait `Unpin` is not implemented for `Gen`
    |
    = note: consider using `Box::pin`
note: required by a bound in `Pin::<&'a mut T>::get_mut`

通过为生成器实现 !Unpin,我们有效的防止了可能会出现的内存安全问题。

但是,我们无法为使用闭包编写的生成器实现 !Unpin,那么怎么让我们的初版代码编译通过呢?答案是使用 static 关键字标记生成器,这就相当于为我们的生成器实现了 !Unpin

#![feature(generators, generator_trait)]

use std::ops::{Generator, GeneratorState};


fn main(){
    let gen = static || {
        let s = "Hello World".to_string();
        let borrowed_s = &s;
        
        yield borrowed_s.len();
        
        println!("{}", borrowed_s);
    };
    
    let mut boxed_pin_gen = Box::pin(gen);
    
    loop {
        match boxed_pin_gen.as_mut().resume(()) {
            GeneratorState::Yielded(y) => println!("Yielded: {}", y),
            GeneratorState::Complete(c) => {
                println!("Complete: {:?}", c);
                break;
            }
        }
    }
}

编译运行,一切正常:

Yielded: 11
Hello World
Complete: ()

小总结

async 创建的 Future 在编译后会生成一个状态机,如果 async 代码中存在跨 await 借用,那么对应的底层生成器中也会存在跨 yield 点借用,最终生成的状态机中就会存在自引用结构,为了避免可能发生的内存安全问题,Rust 自动为 Future 实现了 !Unpin,并且只能使用 Pin<&mut Self> 来调用 Futurepoll 方法和 Generatorresume 方法,从而避免了使用者在 Safe Rust 中获取 FutureGenerator 的可变引用,最终避免了使用者使用 swap 之类的函数 moveFutureGenerator 而造成的内存安全问题。

Pin 总结

官方的 Async Book 上给出了关于 Pin 的黄金八条:

  1. 如果 T: Unpin(默认会实现),那么 Pin<'a, T> 完全等价于 &'a mut T。换言之: Unpin 意味着这个类型被移走也没关系,就算已经被固定了,所以 Pin 对这样的类型毫无影响。

  2. 如果 T: !Unpin, 获取已经被固定的 T 类型示例的 &mut T需要 unsafe

  3. 标准库中的大部分类型实现 Unpin,在 Rust 中遇到的多数普通类型也是一样。但是, async/await 生成的 Future 是个例外。

  4. 你可以在 nightly 通过特性标记来给类型添加 !Unpin 约束,或者在 stable 给你的类型加 std::marker::PhatomPinned 字段。

  5. 你可以将数据固定到栈上或堆上。

  6. 固定 !Unpin 对象到栈上需要 unsafe

  7. 固定 !Unpin 对象到堆上不需要 unsafeBox::pin可以快速完成这种固定。

  8. 对于 T: !Unpin 的被固定数据,你必须维护好数据内存不会无效的约定,或者叫固定时起直到释放。这是 Pin 约定中的重要部分。

IO 模型

IO 访问

对于一次 IO 访问(例如 read 操作),通常有两个不同的阶段:

  1. 等待数据准备 (Waiting for the data to be ready)
  2. 将数据从内核拷贝到进程中 (Copying the data from the kernel to the process)

例如在一个 socket 上读取数据,首先需要等待数据到达网络,当数据到达时将数据拷贝到内核缓冲区中,再将数据从内核缓冲区中拷贝到用户进程的缓冲区中。

正是由于 IO 访问经历的两个阶段,Linux 系统产生了下面五种 IO 模型:

  • 阻塞 IO(blocking IO)
  • 非阻塞 IO(nonblocking IO)
  • IO 多路复用(IO multiplexing)
  • 信号驱动 IO(signal driven IO)
  • 异步 IO(asynchronous IO)

IO 模型与 Future

在介绍 Future trait 的那一章中我们提到:如果一个 Future 没有计算完成,例如想要等待一个 IO 事件发生,那么通常会注册 waker 到一个“事件通知系统”中,当这个 IO 事件就绪时,“事件通知系统”就会通过 waker 唤醒之前的 Future 继续执行。

那么“事件通知系统”要怎么知道 Future 想要等待的 IO 事件什么时候就绪呢?这与 IO 模型有关,因此在本章中我们将会介绍几种不同的 IO 模型以及它们的特点。

阻塞 IO

在 Linux 中,阻塞 IO 是最流行的 IO 模型,默认情况下所有的 socket 都是阻塞的(blocking)。对于阻塞 IO 来说,读操作的流程如下所示:

Blocking IO Model

当用户进程发起 recvfrom 系统调用后,内核开始 IO 的第一个阶段:等待数据准备好,把数据从硬件拷贝到内核缓冲区(对于网络 IO,要先等待数据报文到达)。当数据准备好后,开始 IO 的第二个阶段:把数据从内核缓冲区拷贝到用户进程的缓冲区。当两个 IO 阶段都完成后,recvfrom 系统调用返回,也就是说用户进程从发起 recvfrom 系统调用直到返回都是处于阻塞状态。

因此,对于阻塞 IO 来说,用户进程在 IO 的两个阶段都被 recvfrom 系统调用阻塞了

非阻塞 IO

在 Linux 中,我们可以把一个 socket 设置为非阻塞(nonblocking)。对于非阻塞 IO 来说,读操作的流程如下所示:

Nonblocking IO Model

当用户进程发起 recvfrom 系统调用后,如果数据没有准备好,recvfrom 系统调用会立即返回 EWOULDBLOCK 错误。用户进程可以通过一个死循环不断发起 recvfrom 系统调用,一旦数据准备好了,就进入 IO 的第二个阶段:把数据从内核缓冲区拷贝到用户用进程的缓冲区,当拷贝完成后,recvfrom 系统调用正常返回。

因此,对于 Nonblocking IO 来说,用户进程需要不断轮询内核数据准备好了没有,并且用户进程在 IO 的第二个阶段仍然会被 recvfrom 系统调用阻塞

信号驱动 IO

对于信号驱动 IO 来说,读操作的流程如下所示:

Signal Driven IO

当用户进程发起 sigaction 系统调用后,这个系统调用会马上返回。内核在准备好数据后会向用户进程发送 SIGIO 信号,用户进程收到信号之后会在信号处理程序中发起 recvfrom 系统调用将数据从内核缓冲区复制到用户进程缓冲区中,至此 IO 的两个阶段全部完成。

因此,对于信号驱动 IO 来说,用户进程在 IO 的第二个阶段被 recvfrom 系统调用阻塞了

IO 多路复用

IO 多路复用是指通过一种机制实现在单个线程中可以监视多个文件描述符(例如 socket 描述符),当文件描述读/写就绪时,用户进程就可以获取就绪的文件句柄。selectpollepoll 都是 IO 多路复用的一种实现。

select 为例,读操作的流程如下所示:

IO Multiplexing Model

当用户进程发起 select 系统调用后,用户进程被阻塞,而内核会监控 select 负责的所有文件描述符,当任意一个文件描述符的数据准备好时,select 会返回就绪的文件描述符。此时,用户进程就可以对就绪的文件描述符发起 recvfrom 系统调用,开始 IO 的第二个阶段:将数据从内核缓冲区拷贝到用户进程的缓冲区,当拷贝结束后 recvfrom 调用正常返回。

因此,对于 IO 多路复用来说,用户进程在 IO 的两个阶段都被阻塞了:在 IO 的第一个阶段被 select 系统调用阻塞,在 IO 的第二个阶段被 recvfrom 系统调用阻塞

异步 IO

对于异步 IO 来说,读操作的流程如下所示:

Asynchronous IO Model

当用户进程发起异步框架 AIO 提供的 aio_read 系统调用后,这个系统调用会马上返回。内核会准备好数据然后把数据从内核缓冲区拷贝到用户进程缓冲区,当 IO 的两个阶段都完成后,内核会发送一个信号通知用户进程 read 操作完成了。

因此,对于异步 IO 来说,用户进程在 IO 的两个阶段都不会被阻塞

总结

POSIX 对同步 IO 和异步 IO 的定义如下:

  • 同步 IO 操作会导致发起请求的进程被阻塞,直到 IO 操作完成。
  • 异步 IO 操作导致发起请求的进程被阻塞。

根据 PISIX 的定义,可以把 IO 模型分为以下两类:

最后,各个 IO 模型的比较如下所示:

Comparison IO Model

Epoll

Epoll 本质上是一种 IO 事件通知机制,是前文所述的在 Linux 中 IO 多路复用的一种实现。在本章中,我们将会简略介绍 Epoll 的原理,并使用 Epoll 实现一个简单的 echo server

在最后一章《异步运行时》中,我们也会使用 Epoll 作为基础来实现一个 ReactorReactor 的概念会在后面介绍)。

Epoll 介绍

Epoll 工作流程

Epoll 的大致工作流程如下所示:

  1. int epoll_create(int size)

内核会产生一个 Epoll 实例数据结构并返回一个文件描述符,这个特殊的描述符是 epoll 实例的句柄。

size 参数只是告诉内核 Epoll 处理的事件的大致数目,而不是能够处理的事件的最大个数。在 Linux 最新的一些内核版本中,size 参数没有任何意义。


  1. int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)

将被监听的描述符添加到红黑树或从红黑树中删除或者对监听事件进行修改。

op 参数用于说明操作类型:

  • EPOLL_CTL_ADD:添加一个需要监视的描述符
  • EPOLL_CTL_DEL:删除一个描述符
  • EPOLL_CTL_MOD:修改一个描述符

struct epoll_event 结构描述一个文件描述符 fdepoll 行为:

typedef union epoll_data {
    void *ptr; 				/* 指向用户自定义数据 */
    int fd; 				/* 注册的文件描述符 */
    uint32_t u32; 			/* 32-bit integer */
    uint64_t u64; 			/* 64-bit integer */
} epoll_data_t;

struct epoll_event {
    uint32_t events; 		/* 描述epoll事件 */
    epoll_data_t data; 		/* 见上面的结构体 */
};

常用的 epoll 事件如下所示:

  • EPOLLIN:描述符处于可读状态
  • EPOLLOUT:描述符处于可写状态
  • EPOLLET:将 epoll event 通知模式设置成 edge trigger
  • EPOLLONESHOT:第一次进行通知,之后不再监测
  • EPOLLHUP:本端描述符产生一个挂断事件,默认监测事件
  • EPOLLRDHUP:对端描述符产生一个挂断事件
  • EPOLLPRI:由带外数据触发
  • EPOLLERR:描述符产生错误时触发,默认检测事件

  1. int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout)

阻塞等待注册的事件发生,返回触发的事件的数目,并将触发的事件写入 events 数组中。

maxevents 是返回的 event 的最大数量。events 数组的长度应该与 maxevents 一致。timeoutepoll_wait 调用阻塞的时间上限。

Epoll 触发机制

Epoll 监控多个文件描述符的 IO 事件,支持边缘触发(edge trigger,ET)和水平触发(level trigger,LT)。

水平触发

对于读操作,只要文件描述符的读缓冲区不为空,触发可读事件。

对于写操作,只要文件描述的写缓冲区不满,触发可写事件。

边缘触发

当文件描述符的缓冲区状态发生变化时触发。

对于读操作:

  • 当读缓冲区数据为空变为非空时,触发可读事件。
  • 当读缓冲区接收到新数据时,即读缓冲区待读数据变多时,触发可读事件。
  • 当读缓冲区有数据可读,且进程对相应的文件描述符进行 EPOLL_CTL_MOD 修改 EPOLLIN 事件时,触发可读事件。

对于写操作:

  • 当写缓冲区由不可写变为可写时,触发可写事件。
  • 当有旧数据被发送走,即读缓冲区中的内容变少的时候,触发可写事件。
  • 当写缓冲区有空间可写,且进程对相应的文件描述符进行 EPOLL_CTL_MOD 修改 EPOLLOUT 事件时,触发可写事件。

Epoll server example

在本节中,我们将会编写一个简单的 epoll server,来看一下 epoll 是如何工作的。libc crate 中提供了与 epoll 相关的系统调用,因此这个小项目需要添加 libc crate 依赖。

源代码的仓库地址:rust epoll example

epoll 调用宏

为了方便地调用 epoll 相关的 api,我们可以编写如下所示的宏:

#[macro_export]
macro_rules! syscall {
    ($fn: ident ( $($arg: expr),* $(,)* ) ) => {{
        let res = unsafe { libc::$fn($($arg, )*) };
        if res == -1 {
            Err(std::io::Error::last_os_error())
        } else {
            Ok(res)
        }
    }};
}

例如,现在我们可以这样调用 epoll_wait

syscall!(epoll_wait(
            epoll_fd,
            events.as_mut_ptr() as *mut libc::epoll_event,
            1024,
            1000
))

宏展开后的代码如下所示:

{
    let res = unsafe {
        libc::epoll_wait(
            epoll_fd,
            events.as_mut_ptr() as *mut libc::epoll_event,
            1024,
            1000
        )
    };
    if res == -1 {
        Err(std::io::Error::last_os_error())
    } else {
        Ok(res)
}

epoll 模块

接下来,我们将会利用 epoll 提供的 api 来编写 IO 事件的注册、修改等功能。本模块需要导入的项:

use std::io;
use std::os::unix::io::RawFd;

use crate::syscall;

创建 epoll 实例

/// 包装epoll_create,创建一个epoll实例
pub fn epoll_create() -> io::Result<RawFd> {
    // 创建一个epoll实例,返回epoll对象的文件描述符fd
    let fd = syscall!(epoll_create1(0))?;

    // fcntl(fd, libc::F_GETFD) 函数返回与 fd 关联的 close_on_exec 标志
    // close_on_exec 用于确定在系统调用 execve() 后是否需要关闭文件描述符
    if let Ok(flags) = syscall!(fcntl(fd, libc::F_GETFD)) {

        // 设置在系统调用 execve() 后关闭文件描述符 fd
        let _ = syscall!(fcntl(fd, libc::F_SETFD, flags | libc::FD_CLOEXEC));
    }

    Ok(fd)
}

注册文件描述并监听事件

/// 包装 epoll_ctl,注册文件描述符和事件
pub fn add_interest(epoll_fd: RawFd, fd: RawFd, mut event: libc::epoll_event) -> io::Result<()> {
    // epoll_fd 是 epoll 实例的的文件描述符
    // fd 是要注册的目标文件描述符
    // event 是要在 fd 上监听的事件
    // libc::EPOLL_CTL_ADD 表示添加一个需要监视的文件描述符
    syscall!(epoll_ctl(epoll_fd, libc::EPOLL_CTL_ADD, fd, &mut event))?;

    Ok(())
}

修改注册的文件描述符

/// 包装 epoll_ctl,修改文件描述符
pub fn modify_interest(epoll_fd: RawFd, fd: RawFd, mut event: libc::epoll_event) -> io::Result<()> {
    // epoll_fd 是 epoll 实例的的文件描述符
    // fd 是要修改目标文件描述符
    // event 是要在 fd 上监听的事件
    // libc::EPOLL_CTL_MOD 表示修改文件描述符 fd
    syscall!(epoll_ctl(epoll_fd, libc::EPOLL_CTL_MOD, fd, &mut event))?;

    Ok(())
}

删除注册的文件描述符

/// 包装 epoll_ctl,删除文件描述符
pub fn remove_interest(epoll_fd: RawFd, fd: RawFd) -> io::Result<()> {
    // epoll_fd 是 epoll 实例的的文件描述符
    // fd 是要删除的目标文件描述符
    // libc::EPOLL_CTL_DEL 表示要删除文件描述符 fd
    syscall!(epoll_ctl(
        epoll_fd,
        libc::EPOLL_CTL_DEL,
        fd,
        std::ptr::null_mut() // 将监听的 event 设置为空
    ))?;

    Ok(())
}

关闭文件描述符

/// 关闭文件描述符 fd
pub fn close(fd: RawFd) {
    let _ = syscall!(close(fd));
}

创建一个读事件

const READ_FLAGS: i32 = libc::EPOLLONESHOT | libc::EPOLLIN;

/// 创建一个读事件
pub fn listener_read_event(key: u64) -> libc::epoll_event {
    // key 用于区分不同的文件描述符
    libc::epoll_event {
        events: READ_FLAGS as u32,
        u64: key,
    }
}

创建一个写事件

const WRITE_FLAGS: i32 = libc::EPOLLONESHOT | libc::EPOLLOUT;

/// 创建一个写事件
pub fn listener_write_event(key: u64) -> libc::epoll_event {
    // key 用于区分不同的文件描述符
    libc::epoll_event {
        events: WRITE_FLAGS as u32,
        u64: key,
    }
}

http 模块

http 模块中,我们将会编写处理 HTTP 请求相关的函数,需要导入的项:

use std::io;
use std::net::TcpStream;
use std::io::{Read, Write};
use std::os::unix::io::{AsRawFd, RawFd};

use crate::epoll::{
    close, listener_read_event, listener_write_event, modify_interest, remove_interest,
};

请求上下文

将与客户端建立的连接抽象成请求上下文:

/// 请求上下文,用于处理 HTTP 请求
#[derive(Debug)]
pub struct RequestContext {
    /// 与客户端建立的连接的 stream 流
    pub stream: TcpStream,
    /// 收到的 HTTP 请求的 content-length 的值
    pub content_length: usize,
    /// 收到的 HTTP 请求的数据写入的缓冲区
    pub buf: Vec<u8>,
}

接下来编写的函数,都是为 RequestContext 实现的方法。

创建请求上下文

pub fn new(stream: TcpStream) -> Self {
    Self {
        stream,
        buf: Vec::new(),
        content_length: 0,
    }
}

从 stream 流中读取数据

pub fn read_cb(&mut self, key: u64, epoll_fd: RawFd) -> io::Result<()> {
    let mut buf = [0u8; 4096];

    // 从 stream 流中读取数据写入到 buf 中
    match self.stream.read(&mut buf) {
        Ok(_) => {
            if let Ok(data) = std::str::from_utf8(&buf) {

                // 解析并且设置读取到的 HTTP 请求的 content-length 字段的值
                self.parse_and_set_content_length(data);
            }
        }
        Err(e) if e.kind() == io::ErrorKind::WouldBlock => {}
        Err(e) => {
            return Err(e);
        }
    };

    // 将读取的数据扩展到 RequestContext 的 buf 中
    self.buf.extend_from_slice(&buf);

    // 如果 buf 中的数据长度大于等于 content-length,说明从客户端发送的 HTTP 请求已经读取完毕
    if self.buf.len() >= self.content_length {
        println!("got all data: {} bytes", self.buf.len());

        // 将在 stream 上监听的事件修改为写事件
        modify_interest(epoll_fd, self.stream.as_raw_fd(), listener_write_event(key))?;
    } else {

        // 将在 stream 上监听的事件修改为读事件,继续读取剩下的 HTTP 请求
        modify_interest(epoll_fd, self.stream.as_raw_fd(), listener_read_event(key))?;
    }

    Ok(())
}

解析 HTTP 请求

/// 解析并且设置读取到的 HTTP 请求的 content-length 字段的值
pub fn parse_and_set_content_length(&mut self, data: &str) {
    if data.contains("HTTP") {
        if let Some(content_length) = data
            .lines()
            .find(|l| l.to_lowercase().starts_with("content-length: "))
        {
            if let Some(len) = content_length
                .to_lowercase()
                .strip_prefix("content-length: ")
            {
                self.content_length = len.parse::<usize>().expect("content-length is valid");
                println!("set content length: {} bytes", self.content_length);
            }
        }
    }
}

写入返回数据到 stream 流中

为了简单起见,我们直接返回一段固定的 HTTP 文本。

// 返回的响应为固定的 HTTP 文本
const HTTP_RESP: &[u8] = br#"HTTP/1.1 200 OK
content-type: text/html
content-length: 28

Hello! I am an epoll server."#;

/// 将要返回的 HTTP 数据写入到 stream 流中
pub fn write_cb(&mut self, key: u64, epoll_fd: RawFd) -> io::Result<()> {
    // 写入数据到 stream 流中
    match self.stream.write(HTTP_RESP) {
        Ok(_) => println!("answered from request {}", key),
        Err(e) => eprintln!("could not answer to request {}, {}", key, e),
    };

    // 关闭 stream 流
    self.stream.shutdown(std::net::Shutdown::Both)?;

    let fd = self.stream.as_raw_fd();
    // 移除在 epoll 中注册的文件描述符 fd
    remove_interest(epoll_fd, fd)?;

    // 关闭文件描述符 fd
    close(fd);

    Ok(())
}

main 模块

接下来,我们将会编写 server 的入口函数,主要的逻辑为:注册文件描述符 => 调用 epoll_wait 获取就绪的事件 => 根据不同的事件进行处理。

use std::collections::HashMap;
use std::io;
use std::net::TcpListener;
use std::os::unix::io::AsRawFd;

use rust_epoll_example::epoll::{add_interest, epoll_create, listener_read_event, modify_interest};
use rust_epoll_example::http::RequestContext;
use rust_epoll_example::syscall;

fn main() -> io::Result<()> {
    // 存储 RequestContext 实例,key 用来区分不同的 RequestContext
    let mut request_contexts: HashMap<u64, RequestContext> = HashMap::new();

    // 存储就绪的 event
    let mut events: Vec<libc::epoll_event> = Vec::with_capacity(1024);

    // key 对应 epoll_event 中的 u64 字段,用于区分文件描述、RequestContext
    let mut key = 100;

    // 创建一个 listener,并监听 8000 端口
    let listener = TcpListener::bind("127.0.0.1:8000")?;

    // 将 socket 设置为非阻塞
    listener.set_nonblocking(true)?;

    // 获取 listener 对应文件描述符
    let listener_fd = listener.as_raw_fd();

    // 创建 epoll 实例,返回 epoll 文件描述符
    let epoll_fd = epoll_create().expect("can create epoll queue");

    // 在 epoll 实例中注册 listener 文件描述符,并监听读事件
    // key 等于 100,对应 listener 文件描述符
    add_interest(epoll_fd, listener_fd, listener_read_event(key))?;

    loop {
        println!("requests in flight: {}", request_contexts.len());
        events.clear();

        // 将就绪的事件添加到 events vec 中,返回就绪的事件数量
        let res = match syscall!(epoll_wait(
            epoll_fd,
            events.as_mut_ptr() as *mut libc::epoll_event,
            1024,
            1000,
        )) {
            Ok(v) => v,
            Err(e) => panic!("error during epoll wait: {}", e),
        };

        // safe  as long as the kernel does nothing wrong - copied from mio
        // 根据就绪的事件数量设置 events vec 的长度
        unsafe { events.set_len(res as usize) };

        // 遍历处理就绪的事件
        for ev in &events {
            match ev.u64 {
                // key = 100 说明是在 listener fd 上监听的读事件就绪了
                100 => {
                    match listener.accept() {

                        // stream 是与客户端建立的连接的 stream 流
                        Ok((stream, addr)) => {
                            // 设置为非阻塞
                            stream.set_nonblocking(true)?;

                            // 有一个新的连接来了
                            println!("new client: {}", addr);
                            key += 1;

                            // 在 epoll 中注册 stream 文件描述符,并监听读事件
                            add_interest(epoll_fd, stream.as_raw_fd(), listener_read_event(key))?;

                            // 创建一个 RequestContext,并保存到 request_contexts 中
                            request_contexts.insert(key, RequestContext::new(stream));
                            // 上面使用的 key,用来区分不同的文件描述符和 RequestContext
                        }
                        Err(e) => eprintln!("couldn't accept: {}", e),
                    };

                    // 修改在 listener fd 上监听的的事件为读事件(继续等待新的连接到来)
                    modify_interest(epoll_fd, listener_fd, listener_read_event(100))?;
                }
                // key != 100,说明是其他的 fd 上监听的事件就绪了
                key => {
                    let mut to_delete = None;

                    // 获取这个 key 对应的 RequestContext
                    if let Some(context) = request_contexts.get_mut(&key) {

                        let events: u32 = ev.events;

                        // 匹配就绪的事件是读事件还是写事件
                        match events {

                            // 读事件就绪
                            v if v as i32 & libc::EPOLLIN == libc::EPOLLIN => {

                                // 读取请求数据
                                context.read_cb(key, epoll_fd)?;
                            }
                            // 写事件就绪
                            v if v as i32 & libc::EPOLLOUT == libc::EPOLLOUT => {

                                // 写入返回数据
                                context.write_cb(key, epoll_fd)?;

                                // 返回数据后,就删除对应的 RequestContext,
                                // 当客户端再次发起请求时会建立新的连接,创建新的 RequestContext
                                to_delete = Some(key);
                            }
                            v => println!("unexpected events: {}", v),
                        };
                    }

                    // 写事件处理完毕,删除对应的 RequestContext
                    if let Some(key) = to_delete {
                        request_contexts.remove(&key);
                    }
                }
            }
        }
    }
}

HTTP 协议是无状态的,我们在完整处理一次请求后就删除对应的请求上下文,当客户端再次发起请求时会建立新的连接,创建新的请求上下文。

至此,epoll server 编写完毕。

运行 server

使用 cargo run 启动 server,然后这个 server 会监听地址:http://127.0.0.1:8000

为了测试 server,编写一个 Python 小脚本,使用多线程循环发送 HTTP 请求:

import requests

from threading import Thread

with open('image.jpeg', 'rb') as f:
    FILE = f.read()


# send request to http://127.0.0.1:8000
def send_request(host, port):
    for _ in range(100):
        r = requests.post(f"http://{host}:{port}", data={'file': FILE})
        print(f"Receive response: '{r.text}' from {r.url}")


if __name__ == '__main__':
    t_lst = []
    for _ in range(4):
        t = Thread(target=send_request, args=('127.0.0.1', 8000))
        t_lst.append(t)
        t.start()

    for t in t_lst:
        t.join()

client 端的输出:

.....
.....
Receive response: 'Hello! I am an epoll server.' from http://127.0.0.1:8000/
Receive response: 'Hello! I am an epoll server.' from http://127.0.0.1:8000/
Receive response: 'Hello! I am an epoll server.' from http://127.0.0.1:8000/
Receive response: 'Hello! I am an epoll server.' from http://127.0.0.1:8000/
Receive response: 'Hello! I am an epoll server.' from http://127.0.0.1:8000/
Receive response: 'Hello! I am an epoll server.' from http://127.0.0.1:8000/
Receive response: 'Hello! I am an epoll server.' from http://127.0.0.1:8000/
Receive response: 'Hello! I am an epoll server.' from http://127.0.0.1:8000/
Receive response: 'Hello! I am an epoll server.' from http://127.0.0.1:8000/

server 端的输出:

.....
.....
requests in flight: 3
requests in flight: 3
requests in flight: 3
requests in flight: 3
requests in flight: 3
requests in flight: 3
requests in flight: 3
requests in flight: 3
requests in flight: 3
requests in flight: 3
requests in flight: 3
got all data: 9379840 bytes
requests in flight: 3
answered from request 195

正如我们所看到的那样,server 在同时处理多个请求!

异步运行时

在前面的章节中,我们讲到过异步运行时负责调度执行使用者创建的 Future,那么异步运行时到底是如何工作的呢?在本章中,我们将会实现一个简单的单线程异步运行时,提供异步的网络IO读写操作,以探讨运行时的具体工作机制。

本章节的源代码仓库地址:async-runtime

在正式开始之前,我们首先明确一下即将实现的运行时的工作原理:

  1. 用户使用 async fn 或者 async {} 的方式创建 Non-Leaf Future,然后使用 spawn 方法创建一个异步 task,并将这个 task 发送到 executor 的任务队列中。

  2. executortask_queue 中取出 task,调用taskpoll 方法,驱动 Non-Leaf Future 开始执行(如果已经开始执行了,则从上次的 await 断点处继续执行),就这样一直执行 Future 中的代码,直到遇到 Leaf Future.await

  3. 调用 Leaf Futurepoll 方法,如果 Leaf Future 对应的IO事件已经就绪,则直接返回 Poll::Ready(data);如果对应的IO事件没有就绪,则调用 Reactorregister 方法注册等待的IO事件和 waker,然后 Poll::PendingNon-Leaf Future 将会被挂起),executor 可以继续执行其他的 task

  4. Reactor 会把注册的文件描述符 fdwaker 保存在BTreeMap<fd, waker> 中,然后调用 Epoll 提供的方法注册在 fd 上想要等待的 eventEpoll 系统中。

  5. Reactor 调用 Epoll 提供的 wait 方法获取所有就绪的文件描述符 fds,然后遍历 fds,通过 fd 匹配之前在 BTreeMap 中存储的 waker,然后调用 wakerwake 方法把 task 发送到 executor 的执行队列中,这样之前挂起的 Non-Leaf Future 就能够继续执行了。

通过上面的原理讲解我们可以知道,异步代码之所以高效的原因就是避免了IO对线程的阻塞:

  • 当执行一个 task 时,如果遇到了没有就绪的 IO 操作,就注册 wakerReactor 中,然后挂起这个 taskexecutor 就可以继续执行其他的 task

  • task 等待的 IO 事件就绪时,Reactor 就会通过 waker 唤醒关联的 task,然后就可以执行之前挂起的 task 了。

epoll

就像 Epoll servere example 一节中那样,为了方便地调用 libc 提供的 api,我们先创建一个 syscall 宏:

macro_rules! syscall {
    ($fn: ident ( $($arg: expr),* $(,)* ) ) => {{
        let res = unsafe { libc::$fn($($arg, )*) };
        if res == -1 {
            Err(io::Error::last_os_error())
        } else {
            Ok(res)
        }
    }};
}

Epoll

抽象出 EpollEpollEventType 类型:

pub(crate) struct Epoll {
    fd: RawFd,
}

pub(crate) enum EpollEventType {
    // Only event types used in this example
    In,
    Out,
}

RawFd 表示原始文件描述符。

方法实现

new

创建一个 Epoll 实例:

pub(crate) fn new() -> io::Result<Self> {
    let fd = syscall!(epoll_create1(libc::EPOLL_CLOEXEC))?;
    Ok(Epoll { fd })
}

添加事件/修改事件

fn run_ctl(&self, epoll_ctl: libc::c_int, fd: RawFd, op: EpollEventType) -> io::Result<()> {
    let mut event: libc::epoll_event = unsafe { mem::zeroed() };
    event.u64 = fd as u64;
    event.events = match op {
        EpollEventType::In => libc::EPOLLIN as u32,
        EpollEventType::Out => libc::EPOLLOUT as u32,
    };

    let event_p: *mut _ = &mut event as *mut _;
    syscall!(epoll_ctl(self.fd, epoll_ctl, fd, event_p))?;

    Ok(())
}

pub(crate) fn add_event(&self, fd: RawFd, op: EpollEventType) -> io::Result<()> {
    self.run_ctl(libc::EPOLL_CTL_ADD, fd, op)
}

#[allow(dead_code)]
pub(crate) fn mod_event(&self, fd: RawFd, op: EpollEventType) -> io::Result<()> {
    self.run_ctl(libc::EPOLL_CTL_MOD, fd, op)
}

add_eventmod_event 都是通过调用run_ctl 方法实现的。在 run_ctl 方法中根据 op 类型设置要注册/修改的事件类型,然后调用 epoll_ctl 方法来注册/修改事件。

删除事件

pub(crate) fn del_event(&self, fd: RawFd) -> io::Result<()> {
    syscall!(epoll_ctl(
        self.fd,
        libc::EPOLL_CTL_DEL,
        fd,
        std::ptr::null_mut() as *mut libc::epoll_event
    ))?;

    Ok(())
}

删除在 Epoll 实例中注册描述符 fd

等待就绪事件

pub(crate) fn wait(&self, events: &mut [libc::epoll_event]) -> io::Result<usize> {
    let nfd = syscall!(epoll_wait(
        self.fd,
        events.as_mut_ptr(),
        events.len() as i32,
        -1
    ))?;

    Ok(nfd as usize)
}

调用 epoll_wait 函数获取所有就绪的文件描述符,并将就绪的描述符存放到 events 中,最后返回就绪的描述符数量。

关闭Epoll

Epoll 实现 Drop trait,在清理 Epoll 时关闭 Epoll 的文件描述符:

impl Drop for Epoll {
    fn drop(&mut self) {
        syscall!(close(self.fd)).ok();
    }
}

reactor

Reactor

pub(crate) struct Reactor {
    pub epoll: Epoll,
    pub wakers: Mutex<BTreeMap<RawFd, Waker>>,
}

字段 epoll 存储创建的 Epoll 实例,wakers 存储等待的IO事件的文件描述符和对应的 waker

我们稍后将会创建 Epoll 的静态变量,为了内部可变性,就把 BTreeMap<RawFd, Waker> 包在 Mutex 中。

添加事件

impl Reactor {
    pub(crate) fn add_event(&self, fd: RawFd, op: EpollEventType, waker: Waker) -> io::Result<()> {
        info!("(Reactor) add event: {}", fd);
        self.epoll.add_event(fd, op)?;
        self.wakers.lock().unwrap().insert(fd, waker);
        Ok(())
    }
}

Reactor 的添加事件的方法中,首先调用 epolladd_event 方法注册文件描述符和监听的事件,然后把描述符和对应的 waker 存储在 BTreeMap<RawFd, Waker> 中。

reactor 循环

fn reactor_main_loop() -> io::Result<()> {
    info!("Start reactor main loop");
    let max_event = 32;
    let event: libc::epoll_event = unsafe { mem::zeroed() };
    let mut events = vec![event; max_event];
    let reactor = &REACTOR;

    loop {
        let nfd = reactor.epoll.wait(&mut events)?;
        info!("(Reactor) wake up. nfd = {}", nfd);

        #[allow(clippy::needless_range_loop)]
        for i in 0..nfd {
            let fd = events[i].u64 as RawFd;
            if let Some(waker) = reactor.wakers.lock().unwrap().remove(&fd) {
                info!("(Reactor) delete event: {}", fd);
                reactor.epoll.del_event(fd)?;
                waker.wake();
            }
        }
    }
}

reactor_main_loop 函数中,我们使用一个 loop 循环,在循环中调用 epollwait 方法获取所有就绪的 IO 事件的文件描述符,如果没有事件就绪,wait 方法就会阻塞 reactor 线程,避免 CPU 空转。

然后遍历就绪的描述符,从 wakers 中获取描述符对应的 waker,之后调用 epolldelete_event 方法删除描述符,表示这个事件已经处理完毕。

最后,调用 wakerwake 方法,把因为等待IO事件而挂起的 task 发送到 executor 的执行队列中。

REACTOR 静态变量

lazy_static! {
    pub(crate) static ref REACTOR: Reactor = {
        // Start reactor main loop
        std::thread::spawn(move || {
            reactor_main_loop()
        });

        Reactor {
            epoll: Epoll::new().expect("failed to create epoll"),
            wakers: Mutex::new(BTreeMap::new())
        }
    };
}

Executor 在主线程运行,负责调度执行 task,而 reactor_main_loop 内部使用一个无线循环不断地获取就绪的 fd,并唤醒挂起的 task。为了避免 reactor_main_loop 阻塞 Executor,我们就开一个线程去执行 reactor_main_loop

之所以把 REACTOR 创建成全局静态变量,是为了在其他的模块中方便地调用 REACTOR 的方法。

async_io

async_io 模块中,我们将会创建 Leaf Future,异步化网络IO的监听和读写操作。

Ipv4Addr

pub struct Ipv4Addr(libc::in_addr);

impl Ipv4Addr {
    pub fn new(a: u8, b: u8, c: u8, d: u8) -> Self {
        Ipv4Addr(libc::in_addr {
            s_addr: ((u32::from(a) << 24)
                | (u32::from(b) << 16)
                | (u32::from(c) << 8)
                | u32::from(d))
            .to_be(),
        })
    }
}

Ipv4Addr 就是 IPv4 地址,new 方法负责创建一个 Ipv4Addr 类型。

TcpListener

pub struct TcpListener(RawFd);

impl TcpListener {
    // NOTE: bind() may be block. So this should be an async function in reality.
    pub fn bind(addr: Ipv4Addr, port: u16) -> io::Result<TcpListener> {
        let backlog = 128;
        let sock = syscall!(socket(
            libc::PF_INET,
            libc::SOCK_STREAM | libc::SOCK_CLOEXEC,
            0
        ))?;
        let opt: i32 = 1;
        syscall!(setsockopt(
            sock,
            libc::SOL_SOCKET,
            libc::SO_REUSEADDR,
            &opt as *const _ as *const libc::c_void,
            std::mem::size_of_val(&opt) as u32
        ))?;

        let sin: libc::sockaddr_in = libc::sockaddr_in {
            sin_family: libc::AF_INET as libc::sa_family_t,
            sin_port: port.to_be(),
            sin_addr: addr.0,
            ..unsafe { mem::zeroed() }
        };
        let addr_p: *const libc::sockaddr = &sin as *const _ as *const _;
        let len = mem::size_of_val(&sin) as libc::socklen_t;

        syscall!(bind(sock, addr_p, len))?;
        syscall!(listen(sock, backlog))?;

        info!("(TcpListener) listen: {}", sock);
        let listener = TcpListener(sock);
        listener.nonblocking()?;
        Ok(listener)
    }

    pub(crate) fn accept(&self) -> io::Result<TcpStream> {
        let mut sin_client: libc::sockaddr_in = unsafe { mem::zeroed() };
        let addr_p: *mut libc::sockaddr = &mut sin_client as *mut _ as *mut _;
        let mut len: libc::socklen_t = unsafe { mem::zeroed() };
        let len_p: *mut _ = &mut len as *mut _;
        let sock_client = syscall!(accept(self.0, addr_p, len_p))?;
        info!("(TcpStream)  accept: {}", sock_client);
        Ok(TcpStream(sock_client))
    }

    pub fn incoming(&self) -> Incoming<'_> {
        Incoming(self)
    }

    fn nonblocking(&self) -> io::Result<()> {
        let flag = syscall!(fcntl(self.0, libc::F_GETFL, 0))?;
        syscall!(fcntl(self.0, libc::F_SETFL, flag | libc::O_NONBLOCK))?;
        Ok(())
    }
}

impl Drop for TcpListener {
    fn drop(&mut self) {
        info!("(TcpListener) close : {}", self.0);
        syscall!(close(self.0)).ok();
    }
}


pub struct Incoming<'a>(&'a TcpListener);

impl<'a> Incoming<'a> {
    pub fn next(&self) -> AcceptFuture<'a> {
        AcceptFuture(self.0)
    }
}

bind 方法负责绑定传入的的 IpV4 地址和端口号,创建一个 TcpListener 实例,需要注意的是要把 TcpListener 设置为非阻塞:listener.nonblocking(),这样在调用 accept 方法接收客户端连接时才不会阻塞。

accept 方法负责接收到来的客户端连接,然后创建 TcpStream,如果没有连接到来就返回一个 io error

nonblocking 方法调用 libc::fcntl 函数把 TcpListener 设置为非阻塞。

incoming 方法把 TcpListener 的引用包在 Incoming 中,然后返回一个 Incoming 的实例。

Incoming 表示 TcpListener 接收连接的流式处理,每当我们想要接收一个新的连接时,就调用 next 方法返回一个 AcceptFuture(后面会讲这个)。

TcpStream

pub struct TcpStream(RawFd);

impl TcpStream {
    fn nonblocking(&self) -> io::Result<()> {
        let flag = syscall!(fcntl(self.0, libc::F_GETFL, 0))?;
        syscall!(fcntl(self.0, libc::F_SETFL, flag | libc::O_NONBLOCK))?;
        Ok(())
    }

    pub fn read<'a>(&'a self, buf: &'a mut [u8]) -> ReadFuture<'a> {
        ReadFuture(self, buf)
    }

    pub fn write<'a>(&'a self, buf: &'a [u8]) -> WriteFuture<'a> {
        WriteFuture(self, buf)
    }

    pub fn raw_fd(&self) -> RawFd {
        self.0
    }
}

impl Drop for TcpStream {
    fn drop(&mut self) {
        info!("(TcpStream)  close : {}", self.0);
        syscall!(close(self.0)).ok();
    }
}

nonblocking 方法调用 libc::fcntl 函数把 TcpStream 设置为非阻塞。

read/write 方法分别返回 RreadFture/WriteFuture,和上面的 AcceptFuture 一样,我们将会在下面讲解这些 Future 的定义和作用。

Leaf Future

pub struct AcceptFuture<'a>(&'a TcpListener);
pub struct ReadFuture<'a>(&'a TcpStream, &'a mut [u8]);
pub struct WriteFuture<'a>(&'a TcpStream, &'a [u8]);

impl<'a> Future for AcceptFuture<'a> {
    type Output = Option<io::Result<TcpStream>>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        match self.0.accept() {
            Ok(stream) => {
                stream.nonblocking()?;
                Poll::Ready(Some(Ok(stream)))
            }
            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
                REACTOR.add_event((self.0).0, EpollEventType::In, cx.waker().clone())?;
                Poll::Pending
            }
            Err(e) => Poll::Ready(Some(Err(e))),
        }
    }
}

impl<'a> Future for ReadFuture<'a> {
    type Output = io::Result<usize>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let res = syscall!(read(
            (self.0).0,
            self.1.as_mut_ptr() as *mut libc::c_void,
            self.1.len()
        ));
        match res {
            Ok(n) => Poll::Ready(Ok(n as usize)),
            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
                REACTOR.add_event((self.0).0, EpollEventType::In, cx.waker().clone())?;
                Poll::Pending
            }
            Err(e) => Poll::Ready(Err(e)),
        }
    }
}

impl<'a> Future for WriteFuture<'a> {
    type Output = io::Result<usize>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let res = syscall!(write(
            (self.0).0,
            self.1.as_ptr() as *mut libc::c_void,
            self.1.len()
        ));
        match res {
            Ok(n) => Poll::Ready(Ok(n as usize)),
            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
                REACTOR.add_event((self.0).0, EpollEventType::Out, cx.waker().clone())?;
                Poll::Pending
            }
            Err(e) => Poll::Ready(Err(e)),
        }
    }
}

在同步的处理方式中,监听 TcpListener 和读写 TcpStream 是阻塞式的,即会阻塞线程直到相应的 IO 事件发生;

而在异步的处理方式中,监听 TcpListener 和读写 TcpStream 不会阻塞掉线程,而是会返回一个对应的 Leaf Future

  • Incomingnext 方法会返回一个 AcceptFuture
  • TcpStreamread/write 方法分别返回 ReadFuture/WriteFuture

AcceptFuture/ReadFuture/WriteFuture 实现 Future trait,这样它们就有了 poll 方法。上述三个 Futurepoll 方法的执行流程时类似的,因此下面我们只讲解 ReadFutrue 的执行流程。

当调用 ReadFutrue.await 时,会调用 ReadFutruepoll 方法,在 poll 方法内部:

  • 调用 libc::read 函数从 TcpStream 中读取数据,返回 res

  • 匹配 res 的值:

    • 如果是 Ok(n),则读取到了数据,此时直接返回 Poll::Ready(OK(n)),调用方继续执行 ReadFutrue.await 下面的代码。

    • 如果是 Err(e),并且 e.kind() == io::ErrorKind::WouldBlock,则说明 TcpStream 中没有数据可读,这时就调用 REACTORadd_event 方法注册文件描述符(关联读事件)和 waker。最后返回 Poll::Pending,调用方接收到 Poll::Pending 后就会调用 yield 表达式挂起当前的执行流(Task)。

    • 如果是其他的 Err(e),则说明读取数据发生了其他错误,此时返回 Poll::Ready(Err(e)) 表示读取失败,调用方继续执行 ReadFutrue.await 下面的代码。

当注册到 REACTOR 中的事件就绪时,REACTOR 就会使用注册的 waker 唤醒挂起的 Task,继续调用 ReadFutruepoll 方法,重复上述的执行流程。

task

Task 是对 async fn 或者 async {} 创建的 Non-Leaf Future 的抽象,一个 task 就代表一个异步执行的任务:

#[derive(Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
pub(crate) struct TaskId(u64);

impl TaskId {
    pub(crate) fn new() -> Self {
        static NEXT_ID: AtomicU64 = AtomicU64::new(0);
        TaskId(NEXT_ID.fetch_add(1, Ordering::Relaxed))
    }
}

pub(crate) struct Task {
    id: TaskId,
    future: Mutex<Pin<Box<dyn Future<Output = ()> + 'static + Send>>>,
    task_sender: Sender<Arc<Task>>,
}

Task 中有三个字段:

  • id:每个 task 都有一个唯一的 TaskIdTaskId 是有可能在不同的线程中创建的,因此使用原子类型 AtomicU64 来创建 TaskId 的实例,保证唯一性。

  • future:对用户创建的 Non-Leaf Future 的包装,使用 Pin 的目的是为了安全地使用自引用结构(Future 生成的状态机中可能存在自引用结构),使用 Mutex 的目的稍后讲解。

  • task_sender:一个 channel 的发送端,发送的 item 是 Arc<Task>,之所以使用 Arc<Task> 一方面是想要减小克隆 Task 的开销,另一方面与 Waker 的实现机制有关(稍后讲解)。

方法实现

impl Task {
    pub(crate) fn new(
        future: impl Future<Output = ()> + 'static + Send,
        task_sender: Sender<Arc<Task>>,
    ) -> Self {
        Task {
            id: TaskId::new(),
            future: Mutex::new(Box::pin(future)),
            task_sender,
        }
    }

    pub(crate) fn task_id(&self) -> TaskId {
        self.id
    }

    pub(crate) fn poll(&self, context: &mut Context) -> Poll<()> {
        self.future
            .lock()
            .expect("get lock failed")
            .as_mut()
            .poll(context)
    }
}

new 方法中传入参数 futuretask_sender 后创建一个 Task 实例:

  • 参数 Future 要求满足 'static 生命周期是因为 task 的存在时间可能是任意长的,因此需要 Future 具有静态生命周期。
  • 要求 Future 满足 Send 是因为 Task 需要跨线程发送。
  • 由于 Future 最终使用 Mutex 包了起来,因此 future 字段最终同时满足 Send + Sync + 'static
  • Task 的其他两个字段也满足 Send + Sync + 'static ,因此 Task 满足 Send + Sync + 'static

由于 task_sender 发送的 item 是 Arc<Task>executor 的执行队列中收到的也是 Arc<Task>,因此 poll 方法的定义中只能使用 &self 不变引用。

又因为 Futurepoll 方法调用需要可变引用,为了实现内部可变性,我们就用 MutexPin<Box<Future>> 包了起来,这就是使用 Mutex 的原因。

poll 方法中,首先调用 self.future.lock() 获取锁,然后将调用 .as_mut() 方法获取 Pin<&mut dyn Future>,最后再调用 Future 中的 poll 方法执行 Future

实现 Wake trait

Task 实现 Wake trait,这样就可以通过 Task 来构建一个 Waker

impl Wake for Task {
    fn wake(self: Arc<Self>) {
        self.task_sender
            .send(self.clone())
            .expect("send task failed");
    }

    fn wake_by_ref(self: &Arc<Self>) {
        self.task_sender
            .send(self.clone())
            .expect("send task failed");
    }
}

Wake 中的 wake/wake_by_ref 方法实现就是具体的唤醒 task 的机制,在这个实现中,我们把想要唤醒的 task 通过 task_sender 发送到 executor 的执行队列中,这样 executor 就可以执行这个 task 了,这也是在 Task 定义中,需要 task_sender 字段的原因。

此外,wake/wake_by_ref 方法中都需要 Arc<Task>,这是 task_sender 的 item 类型为 Arc<Task> 的原因之一。

构造 Waker

对于实现了 Wake traitTask,可以使用 std::task::Wakerfrom 方法构造一个 Waker

impl<W: Wake + Send + Sync + 'static> From<Arc<W>> for Waker {
    fn from(waker: Arc<W>) -> Waker {}
}

通过前面的分析,我们知道 Task 已经同时满足 Wake + Send + Sync + 'static,因此可以安全地使用 from 方法构造一个 Waker

executor

Executor

pub struct Executor {
    task_queue: Receiver<Arc<Task>>,
    waker_cache: BTreeMap<TaskId, Waker>,
}

task_queue 是一个 channel 的接收端,当 spawn 或者 wake 一个 task 时,就会发送 Arc<Task>task_queue 中。

waker_cache 使用 BTreeMap 缓存可能会重复使用的 Waker,这是为了减小构造 Waker 的开销。

实际上,Executor 中的 task_queue 只是一个管道的接收端,并不是队列,只是我个人更习惯称之为队列。

方法实现

impl Executor {
    fn new(task_queue: Receiver<Arc<Task>>) -> Self {
        Self {
            task_queue,
            waker_cache: BTreeMap::new(),
        }
    }

    fn run_ready_task(&mut self) {
        while let Ok(task) = self.task_queue.recv() {
            let waker = self
                .waker_cache
                .entry(task.task_id())
                .or_insert_with(|| Waker::from(task.clone()));

            let mut context = Context::from_waker(waker);
            match task.poll(&mut context) {
                Poll::Ready(_) => {
                    self.waker_cache.remove(&task.task_id());
                }
                Poll::Pending => {}
            }
        }
    }

    pub fn run(&mut self) {
        self.run_ready_task();
    }
}

new 方法接收 Receiver<Arc<Task>> 参数,然后创建一个执行器实例。

run_ready_task 方法中:

  • task_queue 中接收 task: Arc<Task>,然后从 waker_cache 中查找是否存在对应的 waker,如果没有则构造一个 Waker

  • 使用 Contextfrom_waker 方法通过 waker 的引用创建 context

  • 调用 taskpoll 方法,传入 &mut context 参数,开始执行task

    • 如果返回的是 Poll::Ready,说明 task 执行完毕,从 waker_cache 中删除缓存的 waker
    • 如果返回的是 Poll::Pending,则什么都不做(最终执行的 Leaf-Future 中会注册等待的事件和 waker)。

Spawner

在初始状态下,executor 的执行队列中是空的,我们需要一种机制能够让用户手动地创建 task 并将 task 发送到 executor 的执行队列中,最后开启 executor 的执行。Spawner 抽象便提供了这种机制:

#[derive(Clone)]
pub struct Spawner {
    task_sender: Sender<Arc<Task>>,
}

Spawner 中的 task_senderTasktask_sender 一样,都是为了把 task 发送到 executor 的执行队列中。

方法实现

impl Spawner {
    fn new(task_sender: Sender<Arc<Task>>) -> Self {
        Self { task_sender }
    }

    pub fn spawn(&self, future: impl Future<Output = ()> + 'static + Send) {
        let task = Task::new(future, self.task_sender.clone());
        self.task_sender
            .send(Arc::new(task))
            .expect("send task failed");
    }
}

new 方法接收Sender<Arc<Task>> 参数,然后创建一个 Spawner 实例。

spawn 方法中,使用传入的 future 参数创建一个 Task 实例,然后把这个 task 发送到 executor 的执行队列中。当 executor 开始执行的时候就可以从队列中接收 task,驱动 task 的执行了。

创建 Spawner & Executor

定义一个公开的函数,创建 SpawnerExecutor 实例:

pub fn spawner_and_executor() -> (Spawner, Executor) {
    let (task_sender, task_queue) = bounded(10000);
    let spawner = Spawner::new(task_sender);
    let executor = Executor::new(task_queue);
    (spawner, executor)
}

spawner_and_executor 函数中,我们使用 crossbeam-channel 提供的 unbounded 函数创建一个容量为 10000 的管道,分别返回管道的发送端和接收端,然后创建 SpawnerExecutor 实例并返回。

example

在这一节中,我们将使用之前实现的异步运行时创建一个 tcp echo server。需要导入的模块如下所示:

use std::env;
use std::io::Write;

use log::info;

use async_runtime::async_io::{Ipv4Addr, TcpListener, TcpStream};
use async_runtime::executor::{spawner_and_executor, Spawner};

日志打印

fn init_log() {
    // format = [file:line] msg
    env::set_var("RUST_LOG", "info");
    env_logger::Builder::from_default_env()
        .format(|buf, record| {
            writeln!(
                buf,
                "[{}:{:>3}] {}",
                record.file().unwrap_or("unknown"),
                record.line().unwrap_or(0),
                record.args(),
            )
        })
        .init();
}

init_log 函数是为了设置日志打印的消息格式,这跟异步运行时的使用没啥关系,这里就不再赘述了。

handle_client

async fn handle_client(stream: TcpStream) {
    let mut buf = [0u8; 1024];
    info!("(handle client) {}", stream.raw_fd());
    loop {
        let n = stream.read(&mut buf).await.unwrap();
        if n == 0 {
            break;
        }
        stream.write(&buf[..n]).await.unwrap();
    }
}

handle_client 函数中,我们首先创建一个 buf 数组,然后打印一个 handle client 的日志消息,接着开启一个无限循环:

  • 调用 stream.read(&mut buf) 方法后会返回一个 ReadFuture,在 ReadFuture 上调用 await 方法:

    • ReadFuture.await 会展开成一个无限循环,在循环内部会调用 ReadFuturepoll 方法。

    • 如果返回 Poll::Pending,则使用 Yield 表达式挂起当前的 task

    • 如果返回 poll::Ready 则中断循环并返回结果。

  • 如果 n== 0,则说明客户端已经断开了连接,则退出循环。

  • 调用 stream.write(&mut buf[.n]) 会返回一个 WriteFuture,在 WriteFuture 上调用 await 方法后执行流程与 ReadFuture 一致。

server_loop

async fn server_loop(spawner: Spawner) {
    let addr = Ipv4Addr::new(127, 0, 0, 1);
    let port = 8080;
    let listener = TcpListener::bind(addr, port).unwrap();

    let incoming = listener.incoming();

    while let Some(stream) = incoming.next().await {
        let stream = stream.unwrap();
        spawner.spawn(handle_client(stream));
    }
}

server_loop 函数中,我们调用 TcpListenerbind 方法创建一个 TcpListener 实例,然后调用 incoming 方法创建一个 Incoming 实例。

接着,在 While let 循环中,调用 incomingnext 方法返回一个 AcceptFuture 实例,在 AcceptFuture 上调用 await 方法后,如果返回的 Poll::Pending,则挂起当前的 task

当有客户端连接到来时,AcceptFuture 等待的 IO 事件就绪,会返回 io::Readult<TcpStream> 的实例并绑定到 stream 变量上,接着使用 spawner 调用 spawn 方法创建一个 task 处理与客户端的交互。

最后,又进入循环的开始位置,继续等待新的连接到来。

main 函数

fn main() {
    init_log();

    let (spawner, mut executor) = spawner_and_executor();

    spawner.spawn(server_loop(spawner.clone()));

    executor.run();
}

main 函数中,我们首先调用 init_log 函数设置日志消息格式,接着使用 spawner_and_executor 函数创建 SpawnerExecutor 的实例。

然后调用 spawner.spawn 方法创建一个 task 用于执行 server_loop

最后调用 executor.run() 方法开启 executor 的运行,开始调度执行各个 task

运行示例

开启运行后的 echo server 会监听地址:127.0.0.1:8080

cargo run --example echo_server
    Finished dev [unoptimized + debuginfo] target(s) in 0.69s
     Running `target/debug/examples/echo_server`
[src/async_io.rs: 56] (TcpListener) listen: 3
[src/reactor.rs: 27] (Reactor) add event: 3
[src/reactor.rs: 35] Start reactor main loop

使用 Python 写一个小脚本模拟 TCP 客户端:

import socket
import threading

HOST = '127.0.0.1'
PORT = 8080


def send_request():
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.connect((HOST, PORT))
    for i in range(1, 1025):
        s.send(f"HELLO WORLD[{i}]".encode())
        data = s.recv(1024).decode()
        print(f"RECEIVE DATA: '{data}' in THREAD[{threading.currentThread().name}]")
    s.close()


def main():
    t_lst = []
    for _ in range(10):
        t = threading.Thread(target=send_request)
        t_lst.append(t)
        t.start()

    for t in t_lst:
        t.join()


if __name__ == '__main__':
    main()

运行脚本,服务端会输出以下内容:

.....
.....
.....
[src/reactor.rs: 43] (Reactor) wake up. nfd = 2
[src/reactor.rs: 49] (Reactor) delete event: 6
[src/reactor.rs: 49] (Reactor) delete event: 7
[src/reactor.rs: 43] (Reactor) wake up. nfd = 3
[src/reactor.rs: 49] (Reactor) delete event: 9
[src/reactor.rs: 27] (Reactor) add event: 6
[src/reactor.rs: 49] (Reactor) delete event: 10
[src/reactor.rs: 49] (Reactor) delete event: 11
[src/reactor.rs: 43] (Reactor) wake up. nfd = 2

客户端的输出内容如下所示:

.....
.....
.....
RECEIVE DATA: 'HELLO WORLD[1022]' in THREAD[Thread-3]
RECEIVE DATA: 'HELLO WORLD[1015]' in THREAD[Thread-7]
RECEIVE DATA: 'HELLO WORLD[1013]' in THREAD[Thread-6]
RECEIVE DATA: 'HELLO WORLD[1021]' in THREAD[Thread-1]
RECEIVE DATA: 'HELLO WORLD[1023]' in THREAD[Thread-3]
RECEIVE DATA: 'HELLO WORLD[1008]' in THREAD[Thread-10]
RECEIVE DATA: 'HELLO WORLD[1014]' in THREAD[Thread-6]
RECEIVE DATA: 'HELLO WORLD[1016]' in THREAD[Thread-7]

可以看出,我们的 echo server 正确地返回了响应,wake up. nfd = 3 表示有3个事件同时就绪,这说明 server 确实在并发地处理多个请求!