本文更新于 {{ days }} 天前,内容可能已经过时。
安全且高效地处理并发编程是 Rust 的一个主要目标,在我们日常编写 Rust 应用也经常用到并发编程,本文将介绍下 Rust 关于并发编程相关的语法和特性。
Rust 并发
多线程
Rust 允许通过标准库 std::thread 创建新的线程,该库使用 1:1 的线程模型,这代表一个语言级线程使用一个系统线程。下面是一个使用标准库 std::thread 创建 Rust 线程的例子,两个线程将交替输出文本。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
| use std::thread;
use std::time::Duration;
fn main() {
let handle = thread::spawn(|| {
for i in 1..10 {
println!("hi number {} from the spawned thread!", i);
thread::sleep(Duration::from_millis(1));
}
});
for i in 1..5 {
println!("hi number {} from the main thread!", i);
thread::sleep(Duration::from_millis(1));
}
handle.join().unwrap(); // 等待子线程执行完成
}
|
Rust 的线程模型具备以下特点:
特性 | 描述 |
---|
调度模型 | 1:1 (OS线程) |
栈大小 | 固定 |
内存开销 | 较高 (MB级别) |
调度开销 | 由OS决定 |
数据竞争 (data race) | 编译时检测 |
错误处理 | 线程 panic 仅该线程崩溃 主线程 panic 整个程序终止 |
在上面创建线程的例子中我们使用闭包,闭包是一个可以捕获环境的匿名函数。
1
2
3
4
5
6
7
| fn main() {
let v = vec![1, 2, 3];
let print = || {
println!("Here's a vector: {:?}", v); // 自动捕获, 借用 v
};
print();
}
|
然而在线程创建中,线程使用的闭包的生命周期是独立于 main 函数的,Rust 不知道这个新建的线程会执行多久,所以无法知晓捕获的引用是否一直有效。以下代码会编译失败。
1
2
3
4
5
6
7
8
9
10
| use std::thread;
fn main() {
let v = vec![1, 2, 3];
let handle = thread::spawn(|| {
// error[E0373]: closure may outlive the current function, but it borrows `v`, which is owned by the current function
println!("Here's a vector: {:?}", v);
});
handle.join().unwrap();
}
|
通过 move 关键词可以将数据的所有权移动到新的线程中。move 之后原来的线程将不再允许使用对应的变量,否则会编译失败。
1
2
3
4
5
6
7
8
9
10
11
12
| use std::thread;
fn main() {
let v = vec![1, 2, 3];
let handle = thread::spawn(move || {
println!("Here's a vector: {:?}", v);
});
handle.join().unwrap();
// println!("Test: {:?}", v);
// ^ value borrowed here after move
}
|
Rust 存在类似 Java 的 ThreadLocal 一样的实现,通过 thread_local! 宏可以创建一个线程局部变量,用于在单个线程内隐式传递一些数据。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
| use std::cell::RefCell;
use std::thread;
thread_local!{
// thread_local! 定义的变量是不可变的, 通过 RefCell 来实现可变
static CTX: RefCell<u32> = RefCell::new(1);
}
fn main() {
let handle1 = thread::spawn(move || {
CTX.with(|ctx| {
println!("handle1: thread={:?}, ctx={:?}", thread::current().id(), *ctx.borrow());
*ctx.borrow_mut() = 100;
thread::sleep(Duration::from_secs(2));
println!("handle1: thread={:?}, ctx={:?}", thread::current().id(), *ctx.borrow());
});
});
let handle2 = thread::spawn(move || {
CTX.with(|ctx| {
println!("handle2: thread={:?}, ctx={:?}", thread::current().id(), *ctx.borrow());
thread::sleep(Duration::from_secs(1));
*ctx.borrow_mut() = 200;
println!("handle2: thread={:?}, ctx={:?}", thread::current().id(), *ctx.borrow());
});
});
CTX.with(|ctx| {
println!("main: thread={:?}, ctx={:?}", thread::current().id(), *ctx.borrow());
handle1.join().unwrap();
handle2.join().unwrap();
println!("main: thread={:?}, ctx={:?}", thread::current().id(), *ctx.borrow());
});
}
|
消息通信
不要通过共享内存来通信,而是通过通信来共享内存。
Rust 可以通过类似于 Golang 中的 channel 来实现消息传递。Rust 的标准库 std::sync::mpsc (multiple producer, single consumer) 提供了一个多生产者、单消费者的 channel 模型,允许跨线程生产、消费数据。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
| use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
// 创建 channel 的生产者和消费者
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
thread::sleep(Duration::from_secs(1));
tx.send(1).unwrap();
thread::sleep(Duration::from_secs(1));
tx.send(2).unwrap();
tx.send(3).unwrap();
});
// 阻塞接收消息
let v = rx.recv().unwrap();
println!("{}", v);
// 非阻塞接收消息
match rx.try_recv() {
Ok(v) => println!("{}", v),
Err(_) => println!("no message"),
}
// 阻塞遍历消息
for v in rx {
println!("{}", v);
}
}
// 输出:
// 1
// no message
// 2
// 3
|
Rust 的 channel 具备以下特点:
特性 | 描述 |
---|
内存安全 | 编译期保障 (所有权和生命周期) |
传递方式 | 方法调用 (send、recv) |
生产消费模式 | 多生产者、单消费者 |
错误处理 | 返回 Result,需显式处理 |
阻塞操作 | recv |
非阻塞操作 | try_recv |
超时机制 | recv_timeout |
通道关闭 | 生产者全部 drop 后自动关闭 |
Rust 的 mpsc 默认返回一个生产者,通过 clone 可以实现多生产者。由于 move 关键词已经将 tx 的所有权转移到了闭包中,因此在另一个线程中无法使用 tx,所以需要提前 clone 创建新的生产者,然后再通过 move 关键词将新的生产者的所有权转移到新的闭包中。
1
2
3
4
5
6
7
8
9
10
11
| let (tx, rx) = mpsc::channel();
let tx2 = tx.clone();
thread::spawn(move || {
tx.send(1).unwrap();
});
thread::spawn(move || {
tx2.send(2).unwrap();
});
for v in rx {
println!("{}", v);
}
|
Rust 默认的 mpsc::channel 可以被理解为一个无限 Buffer 的通道,如果需要创建一个指定 Buffer 的通道,需要使用 mpsc::sync_channel。
互斥和读写锁
回到上文提及的共享内存,Rust 提供了锁的结构来实现共享状态,比如 sync::Mutex<T> 互斥锁、sync::RwLock<T> 读写锁 和 atomic::Arc<T> 原子引用计数,用于确保编译期线程安全。
1
2
3
4
5
6
7
8
9
10
| use std::sync::Mutex;
fn main() {
let m = Mutex::new(1);
let mut num = m.lock().unwrap();
*num = 2;
drop(num); // 释放锁
let v = m.into_inner().unwrap(); // 获取锁中的值 (已移动 m 的所有权)
println!("{}", v);
}
|
这里我们显式调用 drop 释放锁,实际上 drop 是一个释放资源的函数,它通过转移数据的所有权使得后续代码不能再引用该数据。
1
2
| // std::mem::drop
pub fn drop<T>(_x: T) // 传入参数会直接移动所有权
|
根据以上例子,发现本质上加锁数据的生命周期结束后,锁会被释放。因此,上述代码可以省略锁释放的步骤。
1
2
3
4
5
6
7
| let m = Mutex::new(1);
{
let mut num = m.lock().unwrap();
*num = 2;
}
let v = m.into_inner().unwrap(); // 获取锁中的值 (已移动 m 的所有权)
println!("{}", v);
|
sync::RwLock 读写锁的使用类似 sync::Mutex<T>。
1
2
3
4
| let rw = RwLock::new(1);
let r1 = rw.read().unwrap();
let r2 = rw.read().unwrap();
rw.write().unwrap(); // 这里会一直等待锁
|
Mutex 和 RwLock 本身是线程安全的,但是不能直接在多线程中使用的,因为 Rust 的所有权存在单一所有者原则,限制了一个值同一时间只能被一个所有者持有。为了解决这一问题,需要引入智能指针 Arc 原子引用计数指针。
- Arc 通过原子操作维护引用计数,允许多个线程持有对同一数据的共享所有权 (Rc 升级版)
- 当最后一个 Arc 实例被销毁时,内部的数据才会被释放
与 Rc 类似,Arc 通过 clone 增加引用计数。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
| use std::sync::{Arc, Mutex};
use std::thread;
fn main() {
let m = Arc::new(Mutex::new(1));
let m1 = Arc::clone(&m); // 增加引用计数
let m2 = Arc::clone(&m); // 增加引用计数
let handle1 = thread::spawn(move || { // 通过 move 转移 m1 的所有权
let mut num = m1.lock().unwrap();
*num += 1;
});
let handle2 = thread::spawn(move || { // 通过 move 转移 m2 的所有权
let mut num = m2.lock().unwrap();
*num += 2;
});
handle1.join().unwrap();
handle2.join().unwrap();
let v = Arc::into_inner(m).unwrap().into_inner().unwrap();
println!("{}", v);
}
|
Rust 的锁具备以下特点:
特性 | 描述 |
---|
内存安全 | 编译期保障 (所有权和生命周期) |
使用方式 | 通过泛型封装需要共享的数据 |
锁管理 | 手动加锁、自动释放 (生命周期) |
并发安全 | 并发安全,但由于所有权需结合 Arc 智能指针使用 |
死锁检测 | 无法检测 |
Sync 和 Send trait
Rust 在语言级别存在两个并发相关的 trait,而非通过标准库实现。
- Send trait:表明该类型的所有权可以在线程间传递,几乎所有的 Rust 类型都是 Send 的
- Sync trait:表明该类型可以安全的在多个线程中拥有其值的引用
存在一些特例:
- Rc<T>:引用计数智能指针,本身设计就是非线程安全的,因此未实现 Send 和 Sync
- RefCell<T>:运行时借用绕过编译限制,所进行的借用检查也不是线程安全的,因此未实现 Sync
- 裸指针 *const T / *mut T:默认未实现 Send 和 Sync
一般情况不需要手动实现 Sync 和 Send trait,由 Send 和 Sync 的类型组成的类型,自动就是 Send 和 Sync 的,在语言层面这两个是属于标记 trait,不需要实现任何方法。
编译器利用这两个 trait 来约束并发代码实现,编译时检测代码是否符合规定。
- std::thread::spawn 要求闭包时 Send 的,意味着闭包捕获的所有变量的所有权都必须能被移动到新的线程
- std::sync::Arc<T> 要求 T 必须是 Send + Sync 的,才能保证它能被安全地在线程间共享和访问
1
2
3
4
| pub fn spawn<F, T>(f: F) -> JoinHandle<T>
where
F: FnOnce() -> T + Send + 'static,
T: Send + 'static,
|
Rust 异步
Rust 的 1:1 线程模型相比用户级协程存在较大开销,且在一些 IO 密集型的场景下会阻塞线程。Rust 提供了一套异步规范,通过 async 和 await 的语法实现类似于 JavaScript / TypeScript 的异步,能够像在阻塞代码中一样以直接的风格编写非阻塞代码。
Futures 和 async 语法
- future:在 Rust 中称实现了 Futurn trait 的类型为 future 类型
- async:该关键词用于标记代码块和函数,表示它们可以被中断并恢复,对应块和函数需返回一个 future 变量
- await:该关键词用于等待 future 就绪,该操作会交出当前函数的控制权,当 future 就绪后 运行时 会唤醒该任务并从 await 的地方继续执行
1
2
3
4
| pub trait Future {
type Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
|
在 Rust 的异步机制设计中,标准库接口和运行时 (Runtime) 是解耦。前者定义了规范,后者作为规范的实现,知名的 Rust 异步运行时有 tokio、async-std。
下面我们使用 tokio 和 reqwest 第三方库展示一个 async/await 的例子,需要创建一个 cargo 项目并且在 Cargo.toml 中添加相关依赖。
1
2
3
4
| [dependencies]
futures = "0.3"
reqwest = { version = "0.12", features = ["json"] }
tokio = { version = "1", features = ["full"] }
|
通过 tokio 创建一个 Rust 的 N:M 的异步运行时,并通过 block_on 阻塞并等待传入的 future 就绪。
1
2
3
4
5
6
7
8
| fn main() {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
// reqwest::get 函数返回一个 future, 当执行到 await 时才会真正发起网络请求
let resp = reqwest::get("https://www.toutiao.com").await.unwrap();
println!("{:?}", resp);
})
}
|
tokio 提供更便捷的宏来实现上面的效果。
1
2
3
4
5
| #[tokio::main]
async fn main() {
let resp = reqwest::get("https://www.toutiao.com").await.unwrap();
println!("{:?}", resp);
}
|
并发与 async
下面将最开始创建线程的例子改写成使用 tokio 异步运行时并发执行逻辑,thread::spawn 被替换为 tokio::spawn,thread::sleep 被替换为 tokio::time::sleep,相关的异步 API 也被侵入式的传入 await 操作。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
| use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() {
let handle = tokio::spawn(async {
for i in 1..10 {
println!("hi number {} from the spawned task!", i);
sleep(Duration::from_millis(1)).await;
}
});
for i in 1..5 {
println!("hi number {} from the main task!", i);
sleep(Duration::from_millis(1)).await;
}
handle.await.unwrap(); // 等待异步任务完成
}
|
同样,在消息传递上 tokio 也提供了异步版本的 channel、Mutex。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
| use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
let (tx, mut rx) = mpsc::channel(10);
let tx2 = tx.clone();
tokio::spawn(async move {
tx.send(1).await.unwrap();
});
tokio::spawn(async move {
tx2.send(2).await.unwrap();
});
while let Some(v) = rx.recv().await {
println!("{}", v);
}
}
|
tokio 也为异步任务提供了创建异步局部变量的宏 task_local!,与 thread 不同的是需要显式指定作用域。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
| use std::cell::RefCell;
use tokio::task_local;
use tokio::time::{sleep, Duration};
task_local!{
static CTX: RefCell<u32>; // task_local! 定义的变量是不可变的, 通过 RefCell 来实现可变
}
#[tokio::main]
async fn main() {
let handle1 = tokio::spawn(async move {
CTX.scope(RefCell::new(1), async move { // 需要显式指定作用域
CTX.with(|ctx| {
println!("handle1: ctx={:?}", *ctx.borrow());
*ctx.borrow_mut() = 100;
println!("handle1: ctx={:?}", *ctx.borrow());
})
}).await;
});
tokio::join!(handle1);
// panic: 因为 task_local 需要显式指定作用域
// CTX.with(|ctx| {
// println!("handle1: ctx={:?}", *ctx.borrow());
// });
}
|
Rust tokio 的异步也支持通过 join 的方式等待异步任务执行完成。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| use futures::future::join_all;
use tokio::time::{sleep, Duration};
async fn do_something(task_id: i32) {
for i in 1..5 {
println!("task: {}, i: {:?}", task_id, i);
sleep(Duration::from_millis(50)).await;
}
}
#[tokio::main]
async fn main() {
let handle1 = tokio::spawn(do_something(1));
let handle2 = tokio::spawn(do_something(2));
tokio::join!(handle1, handle2);
// 以下效果相同
// let futures = vec![handle1, handle2];
// join_all(futures).await;
}
|
tokio 还提供了一个特殊的宏 select! 用于实现异步任务 future 竞争,哪个 future 最先准备就绪就先执行其对应的后续流程。
1
2
3
4
5
6
| let handle1 = tokio::spawn(do_something(1));
let handle2 = tokio::spawn(do_something(2));
tokio::select! {
_ = handle1 => println!("handle1 done"),
_ = handle2 => println!("handle2 done"),
}
|
了解了一些基础 Rust tokio 异步的用法,现在总结下 Rust 标准库、Rust 异步库之间的区别。
特性 | Rust 标准库 std::thread | Rust 异步 tokio |
---|
调度模型 | 1:1 (OS线程) | M:N (Task) |
栈大小 | 固定 (默认2M, 需预先指定) | 动态增长 (初始2K~4K) |
内存开销 | 较高 (MB级别) | 较低 (KB级别) |
数据竞争 (data race) | 编译时检测 | 编译时检测 |
调度开销 | 由OS决定 | 用户态轻量级调度 |
调度方式 | 由OS决定 | await 主动让出线程 (协作式) |
运行时依赖 | 无额外依赖 | 依赖第三方运行时 (tokio) |
错误处理 | 线程 panic 仅该线程崩溃 主线程 panic 整个程序终止 | 任务 panic 仅该任务崩溃 |
创建方式 | std::thread:spawn 创建线程 | async fn 定义异步任务 tokio::spawn 提交任务给运行时 |
数据传递 | thread_local! 可创建线程局部变量,隐式传递数据 | task_local! 可创建任务局部变量,隐式传递数据 |
执行时机 | spawn 后立刻执行 | spawn 后立刻执行或 await 时执行 |
代码风格 | 同步风格 | 侵入式,需引入 async/await |
流式处理
Rust 通过迭代器适配器实现 Iterator 的 trait 来支持数据的流式串行处理,如下例子。
1
2
3
4
5
| let values = [1, 2, 3];
let mut iter = values.iter().map(|x| x * 2);
while let Some(value) = iter.next() {
println!("Value: {}", value);
}
|
在 Rust 异步中也有类似流式处理特性,不过该实现并未实现在标准库中,而是官方提供的 futures 库。tokio 也有它们的 Stream 和 StreamExt 实现,前者是 Stream trait 的定义,后者 Ext 是 Stream trait 的实现 (提供 map、filter 等函数实现)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
| use futures::stream::{self, StreamExt};
#[tokio::main]
async fn main() {
let values = [1, 2, 3];
let mut iter = stream::iter(values.iter()).map(|x| x * 2);
while let Some(value) = iter.next().await {
println!("Value: {}", value);
}
let values = [4, 1, 2, 3];
let mut iter = stream::iter(values.iter())
.map(|x| async move {
sleep(Duration::from_millis(100 * x)).await;
x * 2
})
.buffer_unordered(3); // 并发度3
let results: Vec<_> = iter.collect().await;
println!("Results: {:?}", results); // => [2, 4, 8, 6]
}
|
特性 | Iterator | Stream |
---|
执行模式 | 同步阻塞 (直接返回结果) | 异步非阻塞 (需 .await 获取值) |
阻塞行为 | 调用 next() 时可能阻塞当前线程 | 调用 .await 时让出控制权,允许执行其他任务 |
并发能力 | 顺序执行,不支持并发 | 支持并发处理 |
适用场景 | 纯 CPU 计算、本地数据处理 | I/O 密集型操作、高并发数据流 (异步资源) |
Rust 还允许以多种方式组合多个流,比如按顺序连接流、并发合并流、按元素配对合并流等,下面举几个例子。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
| async fn process(v: u64) -> u64 {
sleep(Duration::from_millis(100 * v)).await;
v
}
let stream1 = stream::iter([1, 3, 5]).then(process);
let stream2 = stream::iter([2, 4, 6]).then(process);
let chain = stream1.chain(stream2);
let results: Vec<_> = chain.collect().await;
println!("Results: {:?}", results); // => [1, 3, 5, 2, 4, 6]
let zip = stream1.zip(stream2);
let results: Vec<_> = zip.collect().await;
println!("Results: {:?}", results); // => [(1, 2), (3, 4), (5, 6)]
let stream1 = stream::iter([1, 3, 5]).map(process);
let stream2 = stream::iter([2, 4, 6]).map(process);
let chain = stream1.chain(stream2).buffer_unordered(10);
let results: Vec<_> = chain.collect().await;
println!("Results: {:?}", results); // => [1, 2, 3, 4, 5, 6]
|
Future trait
让我们回顾 Futurn trait 的定义。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
| use std::pin::Pin;
use std::task::{Context, Poll};
pub trait Future {
type Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
enum Poll<T> {
Ready(T),
Pending,
}
pub struct Context<'a> {
waker: &'a Waker;
}
|
- type Output:关联类型,用于在 trait 中声明一个占位类型,具体类型由实现该 trait 的类型指定
- 与泛型实现 Future<T> 的区别是,在实现 trait 时需要指定类型,而在使用该 trait 时无需重复指定类型
- Poll<T>:类似 Option 的枚举值,作为 poll 方法的返回值,用于判断 future 是否已就绪
- Pin<&mut Self>:表示一个固定的可变引用,用于固定 future 值在内存中的位置
- Rust 出于性能代价,数据被移动时不会更新其引用
- future 可能包含自引用 (自身字段指向自身其他字段的引用),自引用在变量被复制或重新分配时可能失效
- 使用 Pin 确保 poll 方法只能在 future 被固定后使用
- &mut Context<’_>:任务执行上下文的可变引用,用于异步任务执行时传递参数
- 当 future 准备好继续执行时,调用 wake() 函数通知调度器
- 当调度器收到唤醒信号后,会重新调度 future 执行 poll 方法
poll方法有什么作用?
async/await 实际上是 Rust 的语法糖,Rust 将调用 await 的代码编译为调用 poll 的代码。
1
2
3
4
5
6
7
8
| process(v).await
// 转变为
match process(v).poll() {
Ready(process) => { xxxxx } // 执行 await 之后的逻辑
Pending => { xxxxx } // 转移控制权, 等待就绪
}
|
为什么需要固定future的内存?
让我们回顾一个简单的数据移动导致引用失效的例子。
1
2
3
4
5
6
7
| fn main() {
let s1 = String::from("hello");
let r = &s1; // r 是 s1 的引用
let s2 = s1; // s1 被移动到 s2
// println!("{}", r); // 错误:借用检查器禁止使用 r
// 因为 s1 已被移动,r 变为悬垂引用
}
|
自定义类型可以通过编译时保证自引用代码无法通过编译,而在 future 中可能存在类似的自引用结构,因此需要通过 Pin 固定内存,确保内存无法移动且引用不会失效。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
| async fn example() {
let data = "hello".to_string();
let ptr = &data; // 捕获自身引用
// 模拟异步操作
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
println!("{}", ptr); // 使用自引用
}
// 编译器会对应生成一个存在自引用的 future 状态机, 保存异步任务的状态
struct ExampleFuture {
data: String,
ptr: *const String, // 自引用
sleep: tokio::time::Sleep,
state: State,
}
|
Stream trait 的定义也类似于 Future。
1
2
3
4
5
6
7
| use std::pin::Pin;
use std::task::{Context, Poll};
trait Stream {
type Item;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}
|
结语
本文介绍了 Rust 的系统线程和异步任务使用,可以看出 Rust 的并发模型依然遵循了 Rust 的理念,强调编译时安全,通过所有权、生命周期、类型检测等机制防止数据竞争。不过,相比于 Golang 这类语言,Rust 的并发模型上手难度确实相对高不少。