简介
Tokio is an asynchronous runtime for the Rust programming language. It provides the building blocks needed for writing networking applications. It gives the flexibility to target a wide range of systems, from large servers with dozens of cores to small embedded devices.
Tokio
是 rust
实现的异步库,提供一个异步运行时。它具有以下的特点:
- Fast: rust 本身就有 C 语言的性能,tokio 又建立在 async/await feature 上,使得非常快,特别在处理 I/O 问题上。
- Reliable: 得益于 rust 的特点。可靠性有保障。
- Easy: 借助 Rust 的 async/await 特性,编写异步应用程序的复杂性已大大降低。
- Flexible: Tokio 提供了多版本运行时。从多线程、窃取工作的运行时到轻量级的单线程运行时。
安装
在 Cargo.toml
的 [dependencies]
加入:
1
|
tokio = { version = "1", features = ["full"] }
|
在 src/main.rs
中添加代码:
1
2
3
4
5
6
7
8
9
10
11
12
|
use std::io::Result;
async fn hello() {
println!("hello")
}
fn main() -> Result<()> {
let rt = tokio::runtime::Runtime::new()?;
rt.block_on(hello());
Ok(())
}
|
cargo run
输出结果:
rust 宏 #[tokio::main]
能将异步 async fn main()
转化成同步的 fn main()
.例如:
1
2
3
4
|
#[tokio::main]
async fn main() {
println!("hello");
}
|
可以转化为:
1
2
3
4
5
6
|
fn main() {
let mut rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
println!("hello");
})
}
|
并发
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
use tokio::net::{TcpListener, TcpStream};
#[tokio::main]
async fn main() {
let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();
loop {
let (socket, _) = listener.accept().await.unwrap();
// 每次都新启动 task 处理 tcp 连接
tokio::spawn(async move {
process(socket).await;
});
}
}
async fn process(_: TcpStream) {}
|
tokio
task 是一个异步绿色线程。使用 tokio::spawn
创建异步块,返回类型为 JoinHandle
。使用 .await
获取返回值。
1
2
3
4
5
6
7
8
9
|
#[tokio::main]
async fn main() {
let handle = tokio::spawn(async move {
"return value"
});
let out = handle.await.unwrap();
println!("GOT {}", out);
}
|
task 被 tokio
内部调度器使用, 请确认内部有事可做。同时它们会在不同线程将执行。
tokio
中的任务非常轻量级。它们只需要一次分配和 64 字节的内存。应用程序可以随意产生数千甚至数百万个任务。
‘static
task 的类型是 'static
的,这意味着任务内部的变量不能存在外部引用
1
2
3
4
5
6
7
8
9
10
11
|
use tokio::task;
#[tokio::main]
async fn main() {
let v = vec![1, 2, 3];
// v 变量转移到 task 内部
task::spawn(async move {
println!("Here's a vec: {:?}", v);
});
}
|
Send
tokio::spawn
产生的任务必须实现 Send
。这允许 tokio
运行时在线程之间移动任务,同时它们在 .await
处挂起。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
use tokio::task::yield_now;
use std::rc::Rc;
#[tokio::main]
async fn main() {
tokio::spawn(async {
// The scope forces `rc` to drop before `.await`.
{
let rc = Rc::new("hello");
println!("{}", rc);
}
// `rc` is no longer used. It is **not** persisted when
// the task yields to the scheduler
yield_now().await;
});
}
|
共享状态
在 tokio
中有几种不同的方式来共享状态。
- 使用 Mutex 保护共享状态。
- 生成一个任务来管理状态并使用消息传递对其进行操作。
通常,对简单数据使用第一种方法,对需要异步工作的事物(例如 I/O 原语)使用第二种方法。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
use std::{
collections::HashMap,
sync::{Arc, Mutex},
};
use tokio::net::{TcpListener, TcpStream};
#[tokio::main]
async fn main() {
let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();
println!("Listening");
let db = Arc::new(Mutex::new(HashMap::new()));
loop {
let (socket, _) = listener.accept().await.unwrap();
let db = db.clone();
println!("Accepted");
tokio::spawn(async move {
process(socket, db).await;
});
}
}
async fn process(_: TcpStream, _db: Arc<Mutex<HashMap<i32, i32>>>) {}
|
注意,使用 std::sync::Mutex 而不是 tokio::sync::Mutex 来保护 HashMap。一个常见的错误是在异步代码中无条件地使用 tokio::sync::Mutex。异步互斥锁是在调用 .await 时锁定的互斥锁。
等待获取锁时,同步 mutex 将等待当前 lock
。这反过来又会阻止其他任务的处理。但是,切换到 tokio::sync::Mutex
通常没有帮助,因为异步 mutex 在内部使用同步 mutex。根据经验,只要争用率保持在低位且未在 .await
中上锁,即可从异步 mutex 内使用同步 mutex 即可。此外,考虑使用 parking_lot::Mutex
作为 std::sync::Mutex
的更快的替代方法。
使用 tokio
异步 mutex。
1
2
3
4
5
6
7
8
9
10
11
12
13
|
use tokio::sync::Mutex;
async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
let mut lock = mutex.lock().await;
*lock += 1;
}
#[tokio::main]
async fn main() {
let m = Mutex::new(1);
increment_and_do_stuff(&m).await;
println!("GOT {:?}", m);
}
|
sync
tokio
提供以下几种 channels 类型:
mpsc
tokio::spawn
中使用 Sender 会转移生命周期时,需要调用 clone()
克隆一份。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
let (tx, mut rx) = mpsc::channel(32);
let tx2 = tx.clone();
tokio::spawn(async move {
let _ = tx.send("sending from first handle").await;
});
tokio::spawn(async move {
let _ = tx2.send("sending from second handle").await;
});
if let Some(message) = rx.recv().await {
println!("GOT = {}", message);
}
}
|
Receiver 在作用域结束时,调用 close() 方法。
oneshot
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
use tokio::sync::oneshot;
#[tokio::main]
async fn main() {
let (tx, rx) = oneshot::channel::<i32>();
tokio::spawn(async move {
if !tx.is_closed() {
let _ = tx.send(3);
}
});
match rx.await {
Ok(v) => println!("got = {:?}", v),
Err(e) => println!("the sender dropped: {}", e),
}
}
|
broadcast
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
use tokio::sync::broadcast;
#[tokio::main]
async fn main() {
let (tx, mut rx1) = broadcast::channel(16);
let mut rx2 = tx.subscribe();
tokio::spawn(async move {
assert_eq!(rx1.recv().await.unwrap(), 10);
assert_eq!(rx1.recv().await.unwrap(), 20);
});
tokio::spawn(async move {
assert_eq!(rx2.recv().await.unwrap(), 10);
assert_eq!(rx2.recv().await.unwrap(), 20);
});
tx.send(10).unwrap();
tx.clone().send(20).unwrap();
}
|
watch
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
use tokio::sync::watch;
#[tokio::main]
async fn main() {
let (tx, mut rx) = watch::channel("");
tokio::spawn(async move {
tx.send("world").unwrap();
});
while rx.changed().await.is_ok() {
println!("received = {:?}", *rx.borrow());
}
}
|
I/O
tokio
中的 I/O 操作和标准库中一致,只是异步化了。它内部提供一个读特性(AsyncRead)和一个写特性(AsyncWrite)。并提供实现这个特性的结构(TcpStream, File, Stdout)。
异步读写
这两个特征提供了异步读取和写字节流的设施。这些特征上的方法通常不是直接调用的。相反,您将通过 AsyncReadExt
和 AsyncWriteExt
提供的实用方法使用它们。
当 read()
返回 Ok(0)
时,这意味着流已关闭。任何进一步的调用 read()
将立即返回 Ok(0)
。对于 TcpStream 连接,这意味着读取 socket 已关闭。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
use std::str;
use tokio::fs::File;
use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
#[tokio::main]
async fn main() -> io::Result<()> {
{
let mut ff = File::create("foo.txt").await?;
ff.write_all(b"hello world").await?;
}
let mut f = File::open("foo.txt").await?;
let mut buffer = [0; 20];
// read up to 10 bytes
let n = f.read(&mut buffer[..]).await?;
println!("The bytes: {:?}", str::from_utf8(&buffer[..n]).unwrap());
Ok(())
}
|
AsyncReadExt::read_to_end
将中流中读取所有字节直到 EOF
(文件结尾标识符)。
1
2
3
4
5
6
7
8
9
|
#[tokio::main]
async fn main() -> io::Result<()> {
let mut f = File::open("foo.txt").await?;
let mut buffer = Vec::new();
// read the whole file
f.read_to_end(&mut buffer).await?;
Ok(())
}
|
io::copy
tokio
提供 tokoio::io
模块,其中包含了一系列有用的方法。例如 tokio::io::copy
异步版本的 io 复制。
1
2
3
4
5
6
7
8
9
10
11
|
use tokio::fs::File;
use tokio::io;
#[tokio::main]
async fn main() -> io::Result<()> {
let mut reader: &[u8] = b"hello";
let mut file = File::create("foo.txt").await?;
io::copy(&mut reader, &mut file).await?;
Ok(())
}
|
使用字节数组也要实现 AsyncRead。
io::split
io::split
能够从拆分 reader 和 writer 类型
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
use tokio::{io::{self, AsyncReadExt, AsyncWriteExt}, net::TcpStream};
#[tokio::main]
async fn main() -> io::Result<()> {
let socket = TcpStream::connect("127.0.0.1:6142").await?;
let (mut rd, mut wr) = io::split(socket);
let write_task = tokio::spawn(async move {
wr.write_all(b"hello\r\n").await?;
wr.write_all(b"world\r\n").await?;
Ok::<_, io::Error>(())
});
let mut buf = vec![0; 128];
loop {
let n = rd.read(&mut buf).await?;
if n == 0 {
break;
}
println!("GOT {:?}", &buf[..n]);
}
Ok(())
}
|
手动 coping
使用 AsyncReadExt::read
和 AsyncWriteExt::write_all
实现数据拷贝:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
use tokio::{
io::{self, AsyncReadExt, AsyncWriteExt},
net::TcpListener,
};
#[tokio::main]
async fn main() -> io::Result<()> {
let listener = TcpListener::bind("127.0.0.1:6142").await?;
loop {
let (mut socket, _) = listener.accept().await?;
tokio::spawn(async move {
let mut buf = vec![0; 1024];
loop {
match socket.read(&mut buf).await {
// 返回 Ok(0) 表示 socket 连接已断开
Ok(0) => return,
Ok(n) => {
// 数据写入
if socket.write_all(&buf[..n]).await.is_err() {
// 错误返回
return;
}
}
Err(_) => {
// 处理错误
return;
}
}
}
});
}
}
|
忘记从读取循环中 break
通常会导致 100% CPU 无限循环情况。当 socket 关闭时,socket.read()
会直接返回。循环然后永远重复。