基本概念
约 915 字大约 3 分钟
2025-09-26
Future
Future 表示一个 "现在可能还没准备好、未来某时会就绪的值"。Rust 以 Future trait 为核心,各类异步操作通过实现它来暴露可被轮询的状态
重要
异步操作会实现 Future trait,该 trait 内部的 poll 方会返回两种状态: Poll::Pending 或 Poll::Ready<T>
- 若 Future 完成了,那么将返回
Poll::Ready(result),result 会作为为最终的结果 - 若 Future 未完成,那么则必须尽快返回
Poll::Pending,不能阻塞,与此同时会把 Context 提供的Waker保存下来。等后续可继续推进时调用waker.wake()把任务重新入队;执行器线程从队列取出任务后会再次调用poll
socket.rs
// Tips: 用函数指针代替真实的 Waker
pub struct Socket {
pub has_data: bool,
pub buffer: Vec<u8>,
pub waker: Option<fn()>,
}
pub struct SocketRead {
pub socket: Socket,
}
impl Socket {
pub fn new(buffer: Vec<u8>) -> Self {
Self {
has_data: false,
buffer,
waker: None,
}
}
pub fn has_data(&self) -> bool {
self.has_data
}
pub fn read(&mut self) -> Vec<u8> {
self.has_data = false;
self.buffer.clone()
}
pub fn set_waker(&mut self, waker: fn()) {
self.waker = Some(waker);
}
pub fn on_data_ready(&mut self) {
self.has_data = true;
if let Some(w) = self.waker.take() {
w(); // 通知执行器可以再 poll
}
}
}
impl SocketRead {
pub fn new(socket: Socket) -> Self {
Self { socket }
}
}main.rs
mod socket;
use socket::{Socket, SocketRead};
#[derive(Debug)]
enum Poll<T> {
Pending,
Ready(T),
}
trait SimpleFuture {
type Output;
fn poll(&mut self, wake: fn()) -> Poll<Self::Output>;
}
impl SimpleFuture for SocketRead {
type Output = Vec<u8>;
fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
if self.socket.has_data() {
Poll::Ready(self.socket.read())
} else {
self.socket.set_waker(wake);
Poll::Pending
}
}
}
fn simple_wake() {
println!("waker called -> ready to poll again");
}
fn main() {
let mut future = SocketRead::new(Socket::new(b"hello".to_vec()));
match future.poll(simple_wake) {
Poll::Ready(data) => println!("immediately got: {:?}", data),
Poll::Pending => {
println!("pending, simulate data arrival");
future.socket.on_data_ready();
match future.poll(simple_wake) {
Poll::Ready(data) => println!("after wake got: {:?}", data),
Poll::Pending => println!("still pending"),
}
}
}
}Waker
Waker 由 执行器 构造并通过 Context 传递给 Future。Waker 唤醒的本质是让任务重新入队
详情
wake可以跨线程调用,只保证 "尽快" 重新调度- 一次或多次调用
wake都是安全的,漏掉唤醒则会让 Future 永远停留在Pending - Waker 实现了
clone可以复制和存储,除此之外还可以使用wake_by_ref在只有引用时唤醒,避免 clone
实现一个计时器 Future
timer_calc.rs
use std::{
future::Future,
pin::Pin,
sync::{Arc, Mutex},
task::{Context, Poll, Waker},
thread,
time::Duration,
};
#[derive(Default)]
pub struct SharedState {
completed: bool,
waker: Option<Waker>,
}
pub struct TimerFuture {
shared_state: Arc<Mutex<SharedState>>,
}
impl TimerFuture {
pub fn new(duration: Duration) -> Self {
let shared_state = Arc::new(Mutex::new(SharedState::default()));
let thread_state = shared_state.clone();
thread::spawn(move || {
thread::sleep(duration);
let mut state = thread_state.lock().unwrap();
state.completed = true;
// 计时结束后唤醒任务;提前 take 掉避免持锁唤醒
if let Some(w) = state.waker.take() {
w.wake();
}
});
Self { shared_state }
}
}
impl Future for TimerFuture {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut state = self.shared_state.lock().unwrap();
if state.completed {
Poll::Ready(())
} else {
// 任务可能在不同执行器/线程间移动,因此需保存本次 poll 的最新 waker
// 使用 will_wake 避免不必要的 clone
let should_update = state
.waker
.as_ref()
.map_or(true, |w| !w.will_wake(cx.waker()));
if should_update {
state.waker = Some(cx.waker().clone());
}
Poll::Pending
}
}
}main.rs
#[tokio::main]
async fn main() {
TimerFuture::new(Duration::from_secs(1)).await;
println!("timer done");
}Executor
执行器 Executor 用于在 Rust 异步运行时里调度与驱动 Future。执行器会把 Future 包装成可调度的任务,通常是 Task 或 JoinHandle,然后再按策略分配给线程运行
执行器在任务就绪或被唤醒后调用 Future 的 poll,直到返回 Poll::Ready<T>;遇到 Pending 会让出,等待下一次唤醒
重要
- 跨线程执行通常要求任务满足
Send + 'static
async
await
Future 是惰性的,除非驱动它们来完成,否则就什么都不做。在 async 函数中使用 await 只是把“驱动”的责任交给上层调用者;顶层的 async 则必须交给执行器去驱动,否则 Future 永远不会运行