基于Future的Rust操作系统内核异步编程
2024/11/14 by crpboy
前言
本文是有关使用async_task
进行操作系统内核异步编程的解析, 不涉及tokio
运行时的解析.
建议先阅读官方文档了解基础的异步编程语法.
rust使用async
和await
进行异步编程, 其底层实现为Future
特性, 具体语法不再赘述.
可以参考官方文档. Rust语言圣经 Rust 中的异步编程.
async
在rust当中, async
是使用无栈协程机制 (关于无栈协程, 可以看这个视频) 实现的, 其内部是一个状态机, 下面我将通过一个例子来说明.
观察下面这个函数, 其中func_1/2/3
都是异步函数, 可能无法立即得到返回.
|
|
异步函数可以被转化为一个状态机模型. 本例中, 异步函数被三个.await
语句拆分成为了四个部分, 每个.await
都有 完成 / 等待 两种执行结果, 将它上下的执行部分通过 完成 / 等待 这两条状态转移边连接起来.
画个图就很清楚了.
假如子函数func_n
能够执行完毕, 那就推动当前状态机进度, 从process_n
走到process_n+1
, 否则当前函数就阻塞在了func_n
, 我们可以考虑让权, 等func_n
满足恢复执行的条件了, 再去尝试推动函数进展. 关于让权和恢复的过程, 我会在后续的执行器部分详细说明.
Future
事实上, async fn
是Future
的一个语法糖, 它上将原函数包装为了一个带有Future
特性的匿名类.
为了深入了解, 我们得来看一下Future
的具体定义:
|
|
Future的关键是poll
函数, 它会通过返回Poll::Ready(T)
或Poll::Pending
向上级函数反馈当前Future的执行情况信息. 而关于Future.poll
的具体用法, 大部分的教材都是这么说的:
你可以通过对带有
Future
特性的类调用poll
来尝试"推动它的进展"
什么叫"推动进展"? 其实就是在Future对应的状态机上, 尝试从上一个状态转移到下一个状态.
-
如果子函数 (严格来说是子Future, 下略) 返回了
Ready
, 当前函数会流畅地继续执行下去. -
如果子函数返回了
Pending
, 那么当前函数调用链都会逐级返回Pending
, 因此整条函数调用链被阻塞, 一般会在这里选择让权等待.
一个异步函数内的.await
调用相当于对于子函数进行一次poll
的调用, 假如子函数阻塞在了Pending
状态, 那么后续将通过执行器重新进行poll
调用直到返回Ready
为止.
Waker
Waker与Context的关系
在上文当中我刻意省略了有关poll
中的Context
参数的内容. 目前Context
当中只有Waker
被真正使用, 所以可以暂时认为Context
和Waker
是等价的. 官方也在文档里说明了这一点.
现在让我们回过头来重新看一下poll
函数:
|
|
这个传入的cx
就是用于传递Waker
信息的, 我们可以通过cx.waker()
将内部的Waker
取出进行调用.
那么Waker
到底是用来干什么的呢? 很多教材都会说这是个"唤醒器". 但什么时候需要唤醒? 唤醒的对象是谁? 什么时候需要保存唤醒器? 唤醒信号是如何通知的? 唤醒的具体过程又是如何? 一概不知.
问题太多, 所以我们先按下不表, 必须等完整的了解Waker
和Executor
的实现之后, 才能够解答这个问题.
Waker的实现
接下来让我们来看看Waker
的定义:
|
|
可以发现Waker
的内部实现为RawWaker
, RawWaker
内部保存了data
和vtable
.
data
用于保存Waker
对应的任务的相关数据, 在async_task
当中, 它指向了RawTask
这个结构体, 其中保存了一个任务的所有信息, 可以用于后续状态的恢复.
vtable
保存了一个&'static RawWakerVTable
的指针, 我们来看看内部实现:
|
|
可以看到, vtable
内部保存着四个函数指针, 对应了waker
的四个基本函数. 但这四个函数的具体实现取决于waker
对应的运行时环境. 在调用Waker.wake_by_ref()
等函数的时候, 就会从vtable
当中取出对应的函数入口进行执行.
我们以async_task
库为例, 在async_task
当中, 会在创建一个Task
的时候, 在内部创建一个链接着它自定义实现的上述四个函数入口的vtable
, 并在创建waker
的时候, 将vtable
传递给waker
.
这样Waker
就可以通过调用cx.waker().wake_by_ref()
等函数, 使用async_task
自定义的方法来进行唤醒了.
至此我们也可以知道前文当中, 为什么关于Waker
会产生这么多的疑惑: 因为Waker
的内部函数实现取决于它使用的运行时库的实现, 不同的库的调度策略并不一致. 所以, 接下来我将对于async_task
运行时及其对应的Executor
实现进行进一步的解释.
Executor
Executor的典型实现
与Future
和Waker
等Rust原生类不同, 在内核编程当中Executor
是需要自己进行实现的, 更具体地来说, 你需要给出执行器的调度策略, 但并不需要关心内部的执行流程.
下面是一个典型的async_task
下的执行器的实现, 使用了双端队列进行维护. 其中spawn
函数用于生成一个新任务, run
用于执行任务.
|
|
executor::spawn
我们来看生成一个新任务的过程.
|
|
首先, 我们通过定义schedule
闭包, 定义了当前执行器的调度策略. 请注意, 在async_task
当中, 所有的执行器调度都是通过这个schedule
发生的, async_task
内部向executor
传递调度信息的唯一方式, 就是通过这个schedule
.
然后, 通过调用async_task::spawn
, 可以生成一个新的任务, 其中就传入了schedule
作为调度策略.
|
|
这里的async_task::spawn
会返回两个返回值, 它们的类型分别为Runnable
和Task<future::Output>
.
我们查看async_task::spawn
的内部实现, 会发现它最终是由内部的Builder.spawn_unchecked
函数进行实现的:
|
|
在函数内部, 会调用RawTask::allocate
来进行RawTask
的生成. 查看RawTask
发现, RawTask
是真正保存了任务的所有信息的结构体:
|
|
而当RawTask
生成完毕之后, 会通过ptr
传回一个指针以供Runnable
和Task
创建.
可以看出, Runnable
和Task
都从同一个ptr
创建而来, 是对同一个RawTask
的两种不同的数据保存形式.
事实上, Runnable
保留了使用者与任务的信息交互接口, 在我们实际交互的时候, 通常都是调用Runnable
的接口完成
而Task
用于保存任务实际的执行环境信息, 在本文中, 使用的时候我们不会直接对它进行修改.
executor::run 及 schedule 的调用方式
先来回顾一下执行器执行的过程
|
|
我们发现, 每一次运行run
, 程序都会从执行器的双端队列队首通过pop_front
取出一个待执行的Runnable
对象, 尝试对它进行执行. 这里在内部实现上就是对于runnable
对应的Future
调用了一次poll
轮询.
那么问题来了, 有pop_front
就应该有push
. 我们目前的代码里, 唯一涉及向队列push
的操作, 就只有schedule
当中的push_front
和push_back
. 那么schedule
一定在run
函数被调用后的某处也跟着一起被调用了才对.
但奇怪的是, 在run
函数当中压根就没有对于schedule
的调用操作. 事实上, 就算进入Runnable.run
内部寻找, 也并没有对于schedule
函数的直接调用. 那么schedule
到底在哪里被调用了呢?
最终我们在Task
的wake
函数实现当中找到了如下代码:
|
|
这段代码尝试通过修改SCHEDULED
标志位为1, 并在修改成功之后对任务调用schedule
, 以此将其插入到执行队列当中. 至此我们终于找到了调用schedule
的源头——wake
函数.
而这个wake
函数, 就是被链接到Waker
的vtable
里的那个wake
函数! 换句话来说, 插入执行队列的操作最终是由Waker.wake
来完成的.
很有趣, 找了一圈居然重新找到了先前Future::poll
里传入的Waker
上! 想要将任务重新恢复到执行器里, 靠的正是唤醒器的唤醒操作, 确实很合理.
重归Waker
现在, 是时候重新回到Waker
, 解答我们之前的疑问了.
调用waker::wake的时机
Waker::wake
的调用时机是怎样的呢? 我们来回顾一下Future::poll
吧:
|
|
Context
当中包含了Waker
信息, Waker
通过poll
调用链上逐级传递下去. 一旦出现需要返回Pending
的情况, 那么当前的执行过程就不得不中止了. 这种情况下, 对于waker
有两种可能的操作:
- 保存当前的
Waker
, 等待当前状态能够进一步推进的时候, 调用waker
将其放入执行器的队首使其快速恢复执行. - 直接再次唤醒
Waker
, 将对应的任务插入到执行器的队尾.
这两种处理方式在实践的时候都有使用, 前者一般用于IO密集型任务, 会在IO信号到来的时候通过先前Pending
时保存的waker
进行唤醒; 而后者一般用于普通的任务调度过程, 实现了一个较为传统的队列维护过程.
waker到底唤醒了谁 / 从哪来
这个问题还需要进一步的探讨. 为了研究清楚这个问题, 我写了一段嵌套调用Future
然后返回Pending
的代码, 发现每次poll
的时候都会经历一次完整的函数调用链.
所以我们可以得出结论: 每一次尝试poll
的时候都会从根Future
出发, 经历一次完整的poll
调用链, 直到最深层返回Pending
或者Ready
为止.
这也很好解释, 因为我们的Waker
是逐级向下传递的, 保存的就是最开始Future.poll
时传入的Waker
. 所以每次尝试唤醒的时候也会调用这个传递下来的Waker
.
那么这个初始的Waker
又是怎么传入的? 查看async_task::run
, 发现是在run
的过程当中调用poll
时传入的. 回顾一下, Waker
内部其实就是vtable
和一个指向RawTask
的指针, 所以这个Waker
的生成是很方便的.
总结
至此, 我们终于搞明白了一个完整的执行流程当中, 任务的信息是如何被传递的:
- 任务以
Future
参数形式传入spawn
函数 spawn
函数通过Future
生成RawTask
, 再通过RawTask
派生出Runnable
和Task
- 先调用一次
runnable.schedule
, 将任务插入到执行器队列当中, 至此进入执行器执行的过程当中 - 执行器进行一次执行, 先将
Runnable
形态的任务从队首取出 - 对于取出的
Runnable
调用它的run
函数,run
内部会对它对应的Future
调用一次poll
, 并传入携带有自身wake
函数实现方式的Waker
信息 - 假如在执行过程当中遇到了一次
Pending
状态的传回, 那么当前任务需要阻塞, 同时需要保存Waker
便于下次唤醒 - 在适当的时机 (比如某些信号来临的时候) 重新取出先前保存的
Waker
信息, 调用其中的wake
函数 wake
函数会通过Task::wake
调用schedule
函数, 最终通过schedule
函数将任务信息重新插入回到任务执行队列当中. 至此实现了任务的循环调用过程.
完结撒花~