基于Future的Rust操作系统内核异步编程
2024/11/14 by crpboy
前言
本文是有关使用async_task进行操作系统内核异步编程的解析, 不涉及tokio运行时的解析.
建议先阅读官方文档了解基础的异步编程语法.
rust使用async和await进行异步编程, 其底层实现为Future特性, 具体语法不再赘述.
可以参考官方文档. Rust语言圣经 Rust 中的异步编程.
async
在rust当中, async是使用无栈协程机制 (关于无栈协程, 可以看这个视频) 实现的, 其内部是一个状态机, 下面我将通过一个例子来说明.
观察下面这个函数, 其中func_1/2/3都是异步函数, 可能无法立即得到返回.
|
|
异步函数可以被转化为一个状态机模型. 本例中, 异步函数被三个.await语句拆分成为了四个部分, 每个.await都有 完成 / 等待 两种执行结果, 将它上下的执行部分通过 完成 / 等待 这两条状态转移边连接起来.
画个图就很清楚了.
假如子函数func_n能够执行完毕, 那就推动当前状态机进度, 从process_n走到process_n+1, 否则当前函数就阻塞在了func_n, 我们可以考虑让权, 等func_n满足恢复执行的条件了, 再去尝试推动函数进展. 关于让权和恢复的过程, 我会在后续的执行器部分详细说明.
Future
事实上, async fn是Future的一个语法糖, 它上将原函数包装为了一个带有Future特性的匿名类.
为了深入了解, 我们得来看一下Future的具体定义:
|
|
Future的关键是poll函数, 它会通过返回Poll::Ready(T)或Poll::Pending向上级函数反馈当前Future的执行情况信息. 而关于Future.poll的具体用法, 大部分的教材都是这么说的:
你可以通过对带有
Future特性的类调用poll来尝试"推动它的进展"
什么叫"推动进展"? 其实就是在Future对应的状态机上, 尝试从上一个状态转移到下一个状态.
-
如果子函数 (严格来说是子Future, 下略) 返回了
Ready, 当前函数会流畅地继续执行下去. -
如果子函数返回了
Pending, 那么当前函数调用链都会逐级返回Pending, 因此整条函数调用链被阻塞, 一般会在这里选择让权等待.
一个异步函数内的.await调用相当于对于子函数进行一次poll的调用, 假如子函数阻塞在了Pending状态, 那么后续将通过执行器重新进行poll调用直到返回Ready为止.
Waker
Waker与Context的关系
在上文当中我刻意省略了有关poll中的Context参数的内容. 目前Context当中只有Waker被真正使用, 所以可以暂时认为Context和Waker是等价的. 官方也在文档里说明了这一点.
现在让我们回过头来重新看一下poll函数:
|
|
这个传入的cx就是用于传递Waker信息的, 我们可以通过cx.waker()将内部的Waker取出进行调用.
那么Waker到底是用来干什么的呢? 很多教材都会说这是个"唤醒器". 但什么时候需要唤醒? 唤醒的对象是谁? 什么时候需要保存唤醒器? 唤醒信号是如何通知的? 唤醒的具体过程又是如何? 一概不知.
问题太多, 所以我们先按下不表, 必须等完整的了解Waker和Executor的实现之后, 才能够解答这个问题.
Waker的实现
接下来让我们来看看Waker的定义:
|
|
可以发现Waker的内部实现为RawWaker, RawWaker内部保存了data和vtable.
data用于保存Waker对应的任务的相关数据, 在async_task当中, 它指向了RawTask这个结构体, 其中保存了一个任务的所有信息, 可以用于后续状态的恢复.
vtable保存了一个&'static RawWakerVTable的指针, 我们来看看内部实现:
|
|
可以看到, vtable内部保存着四个函数指针, 对应了waker的四个基本函数. 但这四个函数的具体实现取决于waker对应的运行时环境. 在调用Waker.wake_by_ref()等函数的时候, 就会从vtable当中取出对应的函数入口进行执行.
我们以async_task库为例, 在async_task当中, 会在创建一个Task的时候, 在内部创建一个链接着它自定义实现的上述四个函数入口的vtable, 并在创建waker的时候, 将vtable传递给waker.
这样Waker就可以通过调用cx.waker().wake_by_ref()等函数, 使用async_task自定义的方法来进行唤醒了.
至此我们也可以知道前文当中, 为什么关于Waker会产生这么多的疑惑: 因为Waker的内部函数实现取决于它使用的运行时库的实现, 不同的库的调度策略并不一致. 所以, 接下来我将对于async_task运行时及其对应的Executor实现进行进一步的解释.
Executor
Executor的典型实现
与Future和Waker等Rust原生类不同, 在内核编程当中Executor是需要自己进行实现的, 更具体地来说, 你需要给出执行器的调度策略, 但并不需要关心内部的执行流程.
下面是一个典型的async_task下的执行器的实现, 使用了双端队列进行维护. 其中spawn函数用于生成一个新任务, run用于执行任务.
|
|
executor::spawn
我们来看生成一个新任务的过程.
|
|
首先, 我们通过定义schedule闭包, 定义了当前执行器的调度策略. 请注意, 在async_task当中, 所有的执行器调度都是通过这个schedule发生的, async_task内部向executor传递调度信息的唯一方式, 就是通过这个schedule.
然后, 通过调用async_task::spawn, 可以生成一个新的任务, 其中就传入了schedule作为调度策略.
|
|
这里的async_task::spawn会返回两个返回值, 它们的类型分别为Runnable和Task<future::Output>.
我们查看async_task::spawn的内部实现, 会发现它最终是由内部的Builder.spawn_unchecked函数进行实现的:
|
|
在函数内部, 会调用RawTask::allocate来进行RawTask的生成. 查看RawTask发现, RawTask是真正保存了任务的所有信息的结构体:
|
|
而当RawTask生成完毕之后, 会通过ptr传回一个指针以供Runnable和Task创建.
可以看出, Runnable和Task 都从同一个ptr创建而来, 是对同一个RawTask的两种不同的数据保存形式.
事实上, Runnable保留了使用者与任务的信息交互接口, 在我们实际交互的时候, 通常都是调用Runnable的接口完成
而Task用于保存任务实际的执行环境信息, 在本文中, 使用的时候我们不会直接对它进行修改.
executor::run 及 schedule 的调用方式
先来回顾一下执行器执行的过程
|
|
我们发现, 每一次运行run, 程序都会从执行器的双端队列队首通过pop_front取出一个待执行的Runnable对象, 尝试对它进行执行. 这里在内部实现上就是对于runnable对应的Future调用了一次poll轮询.
那么问题来了, 有pop_front就应该有push. 我们目前的代码里, 唯一涉及向队列push的操作, 就只有schedule当中的push_front和push_back. 那么schedule一定在run函数被调用后的某处也跟着一起被调用了才对.
但奇怪的是, 在run函数当中压根就没有对于schedule的调用操作. 事实上, 就算进入Runnable.run内部寻找, 也并没有对于schedule函数的直接调用. 那么schedule到底在哪里被调用了呢?
最终我们在Task的wake函数实现当中找到了如下代码:
|
|
这段代码尝试通过修改SCHEDULED标志位为1, 并在修改成功之后对任务调用schedule, 以此将其插入到执行队列当中. 至此我们终于找到了调用schedule的源头——wake函数.
而这个wake函数, 就是被链接到Waker的vtable里的那个wake函数! 换句话来说, 插入执行队列的操作最终是由Waker.wake来完成的.
很有趣, 找了一圈居然重新找到了先前Future::poll里传入的Waker上! 想要将任务重新恢复到执行器里, 靠的正是唤醒器的唤醒操作, 确实很合理.
重归Waker
现在, 是时候重新回到Waker, 解答我们之前的疑问了.
调用waker::wake的时机
Waker::wake的调用时机是怎样的呢? 我们来回顾一下Future::poll吧:
|
|
Context当中包含了Waker信息, Waker通过poll调用链上逐级传递下去. 一旦出现需要返回Pending的情况, 那么当前的执行过程就不得不中止了. 这种情况下, 对于waker有两种可能的操作:
- 保存当前的
Waker, 等待当前状态能够进一步推进的时候, 调用waker将其放入执行器的队首使其快速恢复执行. - 直接再次唤醒
Waker, 将对应的任务插入到执行器的队尾.
这两种处理方式在实践的时候都有使用, 前者一般用于IO密集型任务, 会在IO信号到来的时候通过先前Pending时保存的waker进行唤醒; 而后者一般用于普通的任务调度过程, 实现了一个较为传统的队列维护过程.
waker到底唤醒了谁 / 从哪来
这个问题还需要进一步的探讨. 为了研究清楚这个问题, 我写了一段嵌套调用Future然后返回Pending的代码, 发现每次poll的时候都会经历一次完整的函数调用链.
所以我们可以得出结论: 每一次尝试poll的时候都会从根Future出发, 经历一次完整的poll调用链, 直到最深层返回Pending或者Ready为止.
这也很好解释, 因为我们的Waker是逐级向下传递的, 保存的就是最开始Future.poll时传入的Waker. 所以每次尝试唤醒的时候也会调用这个传递下来的Waker.
那么这个初始的Waker又是怎么传入的? 查看async_task::run, 发现是在run的过程当中调用poll时传入的. 回顾一下, Waker内部其实就是vtable和一个指向RawTask的指针, 所以这个Waker的生成是很方便的.
总结
至此, 我们终于搞明白了一个完整的执行流程当中, 任务的信息是如何被传递的:
- 任务以
Future参数形式传入spawn函数 spawn函数通过Future生成RawTask, 再通过RawTask派生出Runnable和Task- 先调用一次
runnable.schedule, 将任务插入到执行器队列当中, 至此进入执行器执行的过程当中 - 执行器进行一次执行, 先将
Runnable形态的任务从队首取出 - 对于取出的
Runnable调用它的run函数,run内部会对它对应的Future调用一次poll, 并传入携带有自身wake函数实现方式的Waker信息 - 假如在执行过程当中遇到了一次
Pending状态的传回, 那么当前任务需要阻塞, 同时需要保存Waker便于下次唤醒 - 在适当的时机 (比如某些信号来临的时候) 重新取出先前保存的
Waker信息, 调用其中的wake函数 wake函数会通过Task::wake调用schedule函数, 最终通过schedule函数将任务信息重新插入回到任务执行队列当中. 至此实现了任务的循环调用过程.
完结撒花~