线程通信
约 1001 字大约 3 分钟
2025-09-26
channel
在并发编程中,消息传递(message passing) 已逐渐成为确保安全性的关键范式。Rust 标准库通过 信道(channel) 提供了对这一模式的支持
Channel 架构
- 发送端(transmitter):负责提交待传递的数据。
- 接收端(receiver):负责从通道中提取已发送的数据。
程序的一部分通过发送端提交消息,另一部分通过接收端提取并处理消息。当发送端或接收端任一被丢弃时,通道即被视为 关闭(closed),后续发送或接收将返回相应的错误结果。
注
该理念源自 Go 的并发准则:“不要通过共享内存来通信;应通过通信来共享内存。”
创建 channel
创建 channel 使用 mpsc::channel 函数。mpsc 是 多生产者,单消费者 的缩写。简而言之,Rust 标准库实现信道的方式意味着一个信道可以有多个产生值的 发送端(sending),但只能有一个消费这些值的接收端(receiving)
use std::sync::mpsc;
fn main() {
// 需要额外标注 channel 的类型参数
let (tx, rx) = mpsc::channel::<String>();
}该函数返回一个元组:第一个元素是发送侧 -- 发送端,而第二个元素是接收侧 -- 接收端。由于历史原因,tx 和 rx 通常作为发送端(transmitter)和 接收端(receiver)的传统缩写
注
mpsc::channel() 是泛型函数因此需要显示的声明 channel 的类型参数。
使用 channel
channel 的接收端有两个方法:recv() 和 try_recv()
recv:会阻塞主线程执行直到从信道中接收一个值,一旦发送了值,recv会在一个Result<T, E>中返回它。当信道发送端关闭,recv会返回一个错误表明不会再有新的值到来了。try_recv:不会阻塞,但会立即返回一个Result<T, E>。当信道为空时,try_recv将返回Err,而不是阻塞主线程。
use std::{sync::mpsc, thread};
fn main() {
let (tx, rx) = mpsc::channel::<String>();
thread::spawn(move || {
let val = String::from("hi");
tx.send(val).unwrap();
});
let received = rx.recv().unwrap();
println!("Got: {received}");
}注
线程在等待消息过程中还有其他工作时使用 try_recv 很有用:可以编写一个循环来频繁调用 try_recv,在有可用消息时进行处理,其余时候则处理一会其他工作直到再次检查。
channel 与所有权转移
发送端的 send 方法会获取参数的所有权,并将其移动到接收端。这意味着:一旦数据被发送,就不能在原线程中再次使用该值
use std::{sync::mpsc, thread};
fn main() {
let (tx, rx) = mpsc::channel::<String>();
thread::spawn(move || {
let val = String::from("hi");
tx.send(val).unwrap();
// 错误做法
println!("{val}");
});
let received = rx.recv().unwrap();
println!("Got: {received}");
}与上述案例不同的是,在 tx.send 后额外进行了一次 println,而 send 方法会消耗掉变量的所有权因此编译器提示 borrow of moved value
send 被设计消耗所有权的原因:
一旦将值 send 到另一个线程后,另一个线程可能会在使用它之前就将其修改或者丢弃。其他线程对值可能的修改会由于不一致或不存在的数据而导致错误或意外的结果
小案例
发送多个值
use std::{sync::mpsc, thread, time::Duration};
fn main() {
let (tx, rx) = mpsc::channel::<String>();
thread::spawn(move|| {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("thread"),
];
for i in vals {
tx.send(i).unwrap();
thread::sleep(Duration::from_millis(1));
}
});
for received in rx {
println!("Got: {received}");
}
}通过克隆发送端来创建多个生产者
use std::{sync::mpsc, thread, time::Duration};
fn main() {
let (tx, rx) = mpsc::channel::<String>();
let tx1 = tx.clone();
thread::spawn(move|| {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("thread"),
];
for v in vals {
tx1.send(v).unwrap();
thread::sleep(Duration::from_millis(1));
}
});
thread::spawn(move || {
let vals = vec![
String::from("more"),
String::from("messages"),
String::from("for"),
String::from("you"),
];
for v in vals {
tx.send(v).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
for received in rx {
println!("Got: {received}");
}
}