Featured image of post 基于Future的Rust操作系统内核异步编程

基于Future的Rust操作系统内核异步编程

基于async_task库解析Rust操作系统内核异步编程细节

基于Future的Rust操作系统内核异步编程

2024/11/14 by crpboy

前言

本文是有关使用async_task进行操作系统内核异步编程的解析, 不涉及tokio运行时的解析.

建议先阅读官方文档了解基础的异步编程语法.

rust使用asyncawait进行异步编程, 其底层实现为Future特性, 具体语法不再赘述.

可以参考官方文档. Rust语言圣经 Rust 中的异步编程.

async

在rust当中, async是使用无栈协程机制 (关于无栈协程, 可以看这个视频) 实现的, 其内部是一个状态机, 下面我将通过一个例子来说明.

观察下面这个函数, 其中func_1/2/3都是异步函数, 可能无法立即得到返回.

1
2
3
4
5
6
7
8
9
async fn func_main() {
  println!("process 1");
  func_1().await;
  println!("process 2");
  func_2().await;
  println!("process 3");
  func_3().await;
  println!("process 4");
}

异步函数可以被转化为一个状态机模型. 本例中, 异步函数被三个.await语句拆分成为了四个部分, 每个.await都有 完成 / 等待 两种执行结果, 将它上下的执行部分通过 完成 / 等待 这两条状态转移边连接起来.

画个图就很清楚了.

future_fsm.drawio

假如子函数func_n能够执行完毕, 那就推动当前状态机进度, 从process_n走到process_n+1, 否则当前函数就阻塞在了func_n, 我们可以考虑让权, 等func_n满足恢复执行的条件了, 再去尝试推动函数进展. 关于让权和恢复的过程, 我会在后续的执行器部分详细说明.

Future

事实上, async fnFuture的一个语法糖, 它上将原函数包装为了一个带有Future特性的匿名类.

为了深入了解, 我们得来看一下Future的具体定义:

1
2
3
4
5
6
7
8
pub trait Future {
    type Output;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
pub enum Poll<T> {
    Ready(T),
    Pending,
}

Future的关键是poll函数, 它会通过返回Poll::Ready(T)Poll::Pending向上级函数反馈当前Future的执行情况信息. 而关于Future.poll的具体用法, 大部分的教材都是这么说的:

你可以通过对带有Future特性的类调用poll来尝试"推动它的进展"

什么叫"推动进展"? 其实就是在Future对应的状态机上, 尝试从上一个状态转移到下一个状态.

  • 如果子函数 (严格来说是子Future, 下略) 返回了Ready, 当前函数会流畅地继续执行下去.

  • 如果子函数返回了Pending, 那么当前函数调用链都会逐级返回Pending, 因此整条函数调用链被阻塞, 一般会在这里选择让权等待.

一个异步函数内的.await调用相当于对于子函数进行一次poll的调用, 假如子函数阻塞在了Pending状态, 那么后续将通过执行器重新进行poll调用直到返回Ready为止.

Waker

Waker与Context的关系

在上文当中我刻意省略了有关poll中的Context参数的内容. 目前Context当中只有Waker被真正使用, 所以可以暂时认为ContextWaker是等价的. 官方也在文档里说明了这一点.

现在让我们回过头来重新看一下poll函数:

1
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;

这个传入的cx就是用于传递Waker信息的, 我们可以通过cx.waker()将内部的Waker取出进行调用.

那么Waker到底是用来干什么的呢? 很多教材都会说这是个"唤醒器". 但什么时候需要唤醒? 唤醒的对象是谁? 什么时候需要保存唤醒器? 唤醒信号是如何通知的? 唤醒的具体过程又是如何? 一概不知.

问题太多, 所以我们先按下不表, 必须等完整的了解WakerExecutor的实现之后, 才能够解答这个问题.

Waker的实现

接下来让我们来看看Waker的定义:

1
2
3
4
5
6
7
8
pub struct Waker {
    waker: RawWaker,
    // ignore phantom data
}
pub struct RawWaker {
    data: *const (),
    vtable: &'static RawWakerVTable,
}

可以发现Waker的内部实现为RawWaker, RawWaker内部保存了datavtable.

data用于保存Waker对应的任务的相关数据, 在async_task当中, 它指向了RawTask这个结构体, 其中保存了一个任务的所有信息, 可以用于后续状态的恢复.

vtable保存了一个&'static RawWakerVTable的指针, 我们来看看内部实现:

1
2
3
4
5
6
pub struct RawWakerVTable {
    clone: unsafe fn(*const ()) -> RawWaker,
    wake: unsafe fn(*const ()),
    wake_by_ref: unsafe fn(*const ()),
    drop: unsafe fn(*const ()),
}

可以看到, vtable内部保存着四个函数指针, 对应了waker的四个基本函数. 但这四个函数的具体实现取决于waker对应的运行时环境. 在调用Waker.wake_by_ref()等函数的时候, 就会从vtable当中取出对应的函数入口进行执行.

我们以async_task库为例, 在async_task当中, 会在创建一个Task的时候, 在内部创建一个链接着它自定义实现的上述四个函数入口的vtable, 并在创建waker的时候, 将vtable传递给waker.

这样Waker就可以通过调用cx.waker().wake_by_ref()等函数, 使用async_task自定义的方法来进行唤醒了.

至此我们也可以知道前文当中, 为什么关于Waker会产生这么多的疑惑: 因为Waker的内部函数实现取决于它使用的运行时库的实现, 不同的库的调度策略并不一致. 所以, 接下来我将对于async_task运行时及其对应的Executor实现进行进一步的解释.

Executor

Executor的典型实现

FutureWaker等Rust原生类不同, 在内核编程当中Executor是需要自己进行实现的, 更具体地来说, 你需要给出执行器的调度策略, 但并不需要关心内部的执行流程.

下面是一个典型的async_task下的执行器的实现, 使用了双端队列进行维护. 其中spawn函数用于生成一个新任务, run用于执行任务.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
//! executor.rs

struct Executor {
    queue: Mutex<VecDeque<Runnable>>,
}

impl Executor {
    pub fn new() -> Self {
        Self {
            queue: Mutex::new(VecDeque::new()),
        }
    }
    fn push_back(&self, runnable: Runnable) {
        self.queue.lock().unwrap().push_back(runnable);
    }
    fn push_front(&self, runnable: Runnable) {
        self.queue.lock().unwrap().push_front(runnable);
    }
    fn pop_front(&self) -> Option<Runnable> {
        self.queue.lock().unwrap().pop_front()
    }
}

lazy_static! {
    static ref EXECUTOR: Executor = Executor::new();
}

/// Add a task into task queue
pub fn spawn<F, R>(future: F)
where
    F: Future<Output = R> + Send + 'static,
    R: Send + 'static,
{
    let schedule = move |runnable: Runnable, info: ScheduleInfo| {
        if info.woken_while_running {
            EXECUTOR.push_back(runnable);
        } else {
            EXECUTOR.push_front(runnable);
        }
    };
    let (runnable, handle) = async_task::spawn(
        future,
        WithInfo(schedule),
    );
    runnable.schedule();
    handle.detach();
}

/// run next task
pub fn run() {
    if let Some(runnable) = EXECUTOR.pop_front() {
        runnable.run();
    }
}

executor::spawn

我们来看生成一个新任务的过程.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
pub fn spawn<F, R>(future: F)
where
    F: Future<Output = R> + Send + 'static,
    R: Send + 'static,
{
    let schedule = move |runnable: Runnable, info: ScheduleInfo| {
        if info.woken_while_running {
            EXECUTOR.push_back(runnable);
        } else {
            EXECUTOR.push_front(runnable);
        }
    };
    let (runnable, handle) = async_task::spawn(
        future,
        WithInfo(schedule),
    );
    runnable.schedule();
    handle.detach();
}

首先, 我们通过定义schedule闭包, 定义了当前执行器的调度策略. 请注意, 在async_task当中, 所有的执行器调度都是通过这个schedule发生的, async_task内部向executor传递调度信息的唯一方式, 就是通过这个schedule.

然后, 通过调用async_task::spawn, 可以生成一个新的任务, 其中就传入了schedule作为调度策略.

1
let (runnable, handle) = async_task::spawn(future, WithInfo(schedule));

这里的async_task::spawn会返回两个返回值, 它们的类型分别为RunnableTask<future::Output>.

我们查看async_task::spawn的内部实现, 会发现它最终是由内部的Builder.spawn_unchecked函数进行实现的:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
//! Builder.spawn_unchecked
// 通过future和schedule创建一个RawTask, 再把RawTask的信息转为指针返回
let ptr = RawTask::<Fut, Fut::Output, S, M>::allocate(future, schedule, self);
// 通过ptr创建runnable
let runnable = Runnable::from_raw(ptr);
// 通过ptr创建task
let task = Task {
    ptr,
    _marker: PhantomData,
};
(runnable, task)

在函数内部, 会调用RawTask::allocate来进行RawTask的生成. 查看RawTask发现, RawTask是真正保存了任务的所有信息的结构体:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
pub(crate) struct RawTask<F, T, S, M> {
    /// The task header.
    pub(crate) header: *const Header<M>,
    /// The schedule function.
    pub(crate) schedule: *const S,
    /// The future.
    pub(crate) future: *mut F,
    /// The output of the future.
    pub(crate) output: *mut Result<T, Panic>,
}

而当RawTask生成完毕之后, 会通过ptr传回一个指针以供RunnableTask创建.

可以看出, RunnableTask 都从同一个ptr创建而来, 是对同一个RawTask的两种不同的数据保存形式.

事实上, Runnable保留了使用者与任务的信息交互接口, 在我们实际交互的时候, 通常都是调用Runnable的接口完成

Task用于保存任务实际的执行环境信息, 在本文中, 使用的时候我们不会直接对它进行修改.

executor::run 及 schedule 的调用方式

先来回顾一下执行器执行的过程

1
2
3
4
5
6
/// run next task
pub fn run() {
    if let Some(runnable) = EXECUTOR.pop_front() {
        runnable.run();
    }
}

我们发现, 每一次运行run, 程序都会从执行器的双端队列队首通过pop_front取出一个待执行的Runnable对象, 尝试对它进行执行. 这里在内部实现上就是对于runnable对应的Future调用了一次poll轮询.

那么问题来了, 有pop_front就应该有push. 我们目前的代码里, 唯一涉及向队列push的操作, 就只有schedule当中的push_frontpush_back. 那么schedule一定在run函数被调用后的某处也跟着一起被调用了才对.

但奇怪的是, 在run函数当中压根就没有对于schedule的调用操作. 事实上, 就算进入Runnable.run内部寻找, 也并没有对于schedule函数的直接调用. 那么schedule到底在哪里被调用了呢?

最终我们在Taskwake函数实现当中找到了如下代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
match (*raw.header).state.compare_exchange_weak(
    state,
    state | SCHEDULED,
    Ordering::AcqRel,
    Ordering::Acquire,
) {
    Ok(_) => {
        // If the task is not yet scheduled and isn't currently running, now is the
        // time to schedule it.
        if state & RUNNING == 0 {
            // Schedule the task.
            Self::schedule(ptr, ScheduleInfo::new(false));
        } else {
            // Drop the waker.
            Self::drop_waker(ptr);
        }
        break;
    }
    Err(s) => state = s,
}

这段代码尝试通过修改SCHEDULED标志位为1, 并在修改成功之后对任务调用schedule, 以此将其插入到执行队列当中. 至此我们终于找到了调用schedule的源头——wake函数.

而这个wake函数, 就是被链接到Wakervtable里的那个wake函数! 换句话来说, 插入执行队列的操作最终是由Waker.wake来完成的.

很有趣, 找了一圈居然重新找到了先前Future::poll里传入的Waker上! 想要将任务重新恢复到执行器里, 靠的正是唤醒器的唤醒操作, 确实很合理.

重归Waker

现在, 是时候重新回到Waker, 解答我们之前的疑问了.

调用waker::wake的时机

Waker::wake的调用时机是怎样的呢? 我们来回顾一下Future::poll吧:

1
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;

Context当中包含了Waker信息, Waker通过poll调用链上逐级传递下去. 一旦出现需要返回Pending的情况, 那么当前的执行过程就不得不中止了. 这种情况下, 对于waker有两种可能的操作:

  • 保存当前的Waker, 等待当前状态能够进一步推进的时候, 调用waker将其放入执行器的队首使其快速恢复执行.
  • 直接再次唤醒Waker, 将对应的任务插入到执行器的队尾.

这两种处理方式在实践的时候都有使用, 前者一般用于IO密集型任务, 会在IO信号到来的时候通过先前Pending时保存的waker进行唤醒; 而后者一般用于普通的任务调度过程, 实现了一个较为传统的队列维护过程.

waker到底唤醒了谁 / 从哪来

这个问题还需要进一步的探讨. 为了研究清楚这个问题, 我写了一段嵌套调用Future然后返回Pending的代码, 发现每次poll的时候都会经历一次完整的函数调用链.

所以我们可以得出结论: 每一次尝试poll的时候都会从根Future出发, 经历一次完整的poll调用链, 直到最深层返回Pending或者Ready为止.

这也很好解释, 因为我们的Waker是逐级向下传递的, 保存的就是最开始Future.poll时传入的Waker. 所以每次尝试唤醒的时候也会调用这个传递下来的Waker.

那么这个初始的Waker又是怎么传入的? 查看async_task::run, 发现是在run的过程当中调用poll时传入的. 回顾一下, Waker内部其实就是vtable和一个指向RawTask的指针, 所以这个Waker的生成是很方便的.

总结

至此, 我们终于搞明白了一个完整的执行流程当中, 任务的信息是如何被传递的:

  1. 任务以Future参数形式传入spawn函数
  2. spawn函数通过Future生成RawTask, 再通过RawTask派生出RunnableTask
  3. 先调用一次runnable.schedule, 将任务插入到执行器队列当中, 至此进入执行器执行的过程当中
  4. 执行器进行一次执行, 先将Runnable形态的任务从队首取出
  5. 对于取出的Runnable调用它的run函数, run内部会对它对应的Future调用一次poll, 并传入携带有自身wake函数实现方式的Waker信息
  6. 假如在执行过程当中遇到了一次Pending状态的传回, 那么当前任务需要阻塞, 同时需要保存Waker便于下次唤醒
  7. 在适当的时机 (比如某些信号来临的时候) 重新取出先前保存的Waker信息, 调用其中的wake函数
  8. wake函数会通过Task::wake调用schedule函数, 最终通过schedule函数将任务信息重新插入回到任务执行队列当中. 至此实现了任务的循环调用过程.

完结撒花~

2024 crpboy, All rights reserved.