Rust Async
async
- Especially for IO intensive applications
- Reading from a file, but many disks cannot actually read or write in parallel, so there is not much gain unless we have several disks in the computer or the disks are mounted from other devices via nfs or Samba.
- Network client operations such as downloading pages from the Internet or accessing APIs through the network.
- Network server operations where our system needs to handle multiple operations that involve network operations.
- Accessing databases (via the network), which in reality is just accessing APIs over some protocol (which is probably not HTTP).
- Context switching
- Cooperative multitasking vs pre-emptive multitasking (interleaving or time-slicing)
  - Even in the pre-emptive case a thread can give up control by waiting for some input (network) or by calling sleep.
- Using async needs a runtime, a.k.a. a main loop. There are several crates providing an async runtime, and tokio is the most popular one.
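To make "cooperative multitasking" and "main loop" a bit more concrete, here is a minimal sketch that uses only the standard library. It is not how tokio works internally. The names NoopWaker, YieldOnce, job and run_both are made up for this illustration, and the loop simply polls both futures again and again instead of waiting to be woken up as a real runtime would. It only shows that two async functions can take turns on a single thread, each one giving up control at its await point.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// A waker that does nothing; enough for a loop that polls unconditionally.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// A future that is Pending on the first poll and Ready on the second one.
// Awaiting it hands control back to whoever is polling us.
struct YieldOnce {
    yielded: bool,
}

impl Future for YieldOnce {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        if self.yielded {
            Poll::Ready(())
        } else {
            self.yielded = true;
            Poll::Pending
        }
    }
}

type Log = Rc<RefCell<Vec<String>>>;

// Records when it starts and ends, giving up control once in between.
async fn job(log: Log, name: &'static str) {
    log.borrow_mut().push(format!("{name} start"));
    YieldOnce { yielded: false }.await;
    log.borrow_mut().push(format!("{name} end"));
}

// A toy "main loop": poll both futures in turn until both are finished.
fn run_both() -> Vec<String> {
    let log: Log = Rc::new(RefCell::new(Vec::new()));
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);

    let mut a = Box::pin(job(log.clone(), "a"));
    let mut b = Box::pin(job(log.clone(), "b"));
    let (mut a_done, mut b_done) = (false, false);

    while !(a_done && b_done) {
        if !a_done {
            a_done = a.as_mut().poll(&mut cx).is_ready();
        }
        if !b_done {
            b_done = b.as_mut().poll(&mut cx).is_ready();
        }
    }

    let entries = log.borrow().clone();
    entries
}

fn main() {
    // Prints: a start, b start, a end, b end (one per line)
    for line in run_both() {
        println!("{line}");
    }
}
```

Both jobs start before either of them ends, even though everything runs on one thread. A real runtime such as tokio adds the parts that are missing here: it only polls a future again after its waker was called, and it knows how to wait for timers and IO.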
Asynchronous cooking
Let's make some mashed potatoes
- Take a pot, fill it with water, and put it on the stove. Turn on the stove.
- Sit next to it and wait till the water boils.
- Take out some potatoes, peel them, wash them, and cut them into little pieces.
- Put the potato in the water on the stove.
- Sit next to it till the potato becomes soft.
- Take out the potato and smash it.
You might have a better recipe, but I guess this covers it. However, most likely you will notice that there are two steps where you basically sit idle: watching the water boil and waiting for the potato to become soft.
Even if you don't have anything else to cook or anything else to do, you could finish this job sooner.
You could take step 3 (peeling the potatoes) and do it while the water is warming up.
You could also use the time you spend waiting for the potato to cook in step 5 to do something else. Cook some other meal, fill the washing machine, etc.
Basically you could take the time you (the CPU) are waiting for some external task to complete, and do something else.
This is basically asynchronous cooking.
tokio
Before we delve into the learning of async and tokio, let's see the results of two examples.
One of them is an HTTP client to download many pages.
The other is an HTTP server to serve many pages.
For both we have a sync and async version.
-
If we run the clients downloading many external pages we can observe the speed improvement from sync to async. In this case we don't really care how those servers handle the requests.
-
If we run the servers we either need our sync and async clients to demonstrate the difference, or we can launch some other program to observe the results.
Hello World in async
Let's see the standard "Hello World!" example using async.
Just remember, in this example, although we have the async code it does not actually do anything asynchronous. It only shows the basic syntax, the basic mechanism.
First we create a new crate called demo and change to its directory.
cargo new demo
cd demo
In order to write async code using the tokio crate we need to add it to our project:
cargo add tokio -F full
Alternatively we can manually edit the Cargo.toml file to add the dependency:
[package]
name = "demo"
version = "0.1.0"
edition = "2024"
[dependencies]
tokio = { version = "1.47.1", features = ["full"] }
We use the full feature here as that makes it easy to use all the features. Later you might fine-tune which features to include. However, full is a good start.
The code
We prefix our async functions with the async keyword and we decorate the main function with #[tokio::main] to make that function the entry point of our runtime.
In reality the real main function cannot be async. With this small piece of syntactic sugar tokio wraps our main function in a real main function, but let's not worry about it now.
There are other ways to use tokio, but this is probably the most common and also the easiest way.
We also need to add .await at the end of our async function calls to make them actually run.
#[tokio::main]
async fn main() {
    println!("Hello, world!");
    hello_world().await;
}

async fn hello_world() {
    println!("Hello, async world!");
}
Running the code
There is nothing special about running the program. We use the regular cargo run with the -q flag to silence cargo's own output:
$ cargo run -q
Hello, world!
Hello, async world!
- tokio::main
- async
- await
sync vs async
Before we continue down the rabbit hole of async, let's see the difference in the usage of regular (synchronous) and async functions.
In this example we have a regular (synchronous) function brilliantly named sync_func and an async function called async_func.
You can observe that the only difference between these two functions is the async prefix on the async function.
This won't be the case once we really start to explore async, but for this example we want to have two functions that behave identically once they start to run.
The question is: what is the difference in the way we call these functions?
We sprinkled the main function with print statements so we'll see what gets executed when.
When we call sync_func() it gets executed immediately and returns 42.
When we call async_func() it returns a Future, that is, a value of a type that implements the Future trait.
At this point the body of the function is not executed and thus we can observe that nothing is printed between "Before calling async_func" and "Before await".
We can then write future.await. It is not really a function call, as one can observe by the lack of parentheses () at the end of await. It is an instruction to the async runtime (that is tokio in our case) to drive this future to completion, suspending the current function and letting other tasks run whenever the future has to wait. In our case the only pending work is the body of async_func, so at this point it gets executed.
The await expression finishes when async_func ends and it evaluates to whatever async_func returns. In this case 42.
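The laziness of futures can also be observed without any runtime. The following is only a sketch using the standard library: NoopWaker, demo and the calls counter are names invented for this illustration, and polling by hand is something we normally leave to tokio. The counter is incremented inside the async function, so we can check that creating the future does not run the body, while polling it does.

```rust
use std::future::Future;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// A waker that does nothing; we poll by hand so nobody needs to be woken up.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Counts how many times its body was executed and returns 42.
async fn async_func(calls: Arc<AtomicU32>) -> u32 {
    calls.fetch_add(1, Ordering::SeqCst);
    42
}

// Returns (calls before polling, calls after polling, result of the poll).
fn demo() -> (u32, u32, Option<u32>) {
    let calls = Arc::new(AtomicU32::new(0));

    // Calling the async function only creates the future.
    let future = async_func(calls.clone());
    let before = calls.load(Ordering::SeqCst);

    // Polling the future is what actually runs the body.
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut future = Box::pin(future);
    let result = match future.as_mut().poll(&mut cx) {
        Poll::Ready(value) => Some(value),
        Poll::Pending => None,
    };
    let after = calls.load(Ordering::SeqCst);

    (before, after, result)
}

fn main() {
    let (before, after, result) = demo();
    println!("calls before polling: {before}");
    println!("calls after polling: {after}");
    println!("result: {result:?}");
}
```

In everyday code we never poll by hand. Writing .await inside an async function, or handing the future to the runtime, does the polling for us.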
fn sync_func() -> u32 { println!("sync_func called"); 42 } async fn async_func() -> u32 { println!("async_func called"); 42 } #[tokio::main] async fn main() { println!("Start"); let value = sync_func(); println!("Value: {}", value); println!("Before calling async_func"); let future = async_func(); println!("Before await"); let res = future.await; println!("Result: {}", res); println!("End"); }
The output:
Start
sync_func called
Value: 42
Before calling async_func
Before await
async_func called
Result: 42
End
Cargo.toml
[package]
name = "demo"
version = "0.1.0"
edition = "2024"
[dependencies]
tokio = { version = "1.47.1", features = ["full"] }
- tokio::main
- async
- await
- Future
Run async functions
What happens when we call async functions?
In order to demonstrate this we have a function called say that gets some text and a number. It sleeps for the given number of seconds using an asynchronous sleep.
This imitates some IO operation our code waits for. In a real application instead of that sleep we might have a call to download a page via HTTP, we might access an API, or a database.
Once the waiting is done we print the string so we can observe the order of what happens.
Then we have a number of functions, each one calling the say function twice with the exact same parameters. We'll observe what happens in each case.
If you'd like to see the behaviour yourself you can run the program using cargo run and providing the name of one of the cases as an argument:
cargo run call
call
async fn call_say() {
    let _ = say("Hello", 2);
    let _ = say("Hi", 1);
}
In the call_say case we just call say twice but don't await the returned futures, so they don't run at all. If we left out the let _ = and simply discarded the futures, Rust would even warn us about it:
note: futures do nothing unless you .await or poll them
The total elapsed time is effectively 0 seconds.
await
async fn await_say() {
    say("Hello", 2).await;
    say("Hi", 1).await;
}
The first call waits for 2 seconds and prints, then the second call waits for 1 second and prints. Although we see all the bells and whistles of async code, in reality the calls run sequentially.
The total elapsed time is 3 seconds.
spawn
async fn spawn_say() {
    tokio::spawn(say("Hello", 2));
    tokio::spawn(say("Hi", 1));
}
In this case we hand the futures over to the tokio runtime using the spawn function, which runs them as independent tasks.
Both tasks start to run right away; however, our program might finish before they can print anything. This is the case in our example. The total elapsed time is 0 seconds and nothing is printed. This also means that our program does not wait for the (faked) remote call to return. Probably not what we want.
wait
async fn wait_say() {
    tokio::spawn(say("Hello", 2));
    tokio::spawn(say("Hi", 1));
    tokio::time::sleep(tokio::time::Duration::from_secs(4)).await;
}
In this case, after we launch the asynchronous function calls using spawn, we explicitly wait 4 seconds. Here too both function calls start at the same time. The calling function waits long enough for us to see their output.
The total elapsed time is 4 seconds because we explicitly waited for 4 seconds. In reality, knowing the code, we could have waited for only 2 seconds and most likely both function calls would have finished, but we can't be sure. Maybe we would want to wait for 2.1 seconds just to make sure, and even then we might miss one of the calls if for some reason it gets stuck before it finishes.
Not ideal, as we have to guess how long to wait and even with that we can never be really sure.
join
async fn join_say() {
    tokio::join!(
        say("Hello", 2),
        say("Hi", 1),
    );
}
Here we use the join! macro, which runs both futures concurrently within the current task and waits for both of them to finish. Both functions start at the same time. One of them finishes after 1 second, and the other one after 2 seconds. The total elapsed time is 2 seconds.
This is much better than what we had previously, but the drawback is that we can do this only if we know all the tasks at compile time.
join_set
async fn join_set_say() {
    let mut tasks = tokio::task::JoinSet::new();
    tasks.spawn(say("Hello", 2));
    tasks.spawn(say("Hi", 1));
    tasks.join_all().await;
}
Using JoinSet we can spawn any number of tasks dynamically and wait for all of them to finish. The tasks start as soon as we add them to the JoinSet and each one ends when its work is done. The join_all method waits till all of them have finished.
In our case the total elapsed time is 2 seconds.
async fn say(text: &str, sec: u64) {
    tokio::time::sleep(tokio::time::Duration::from_secs(sec)).await;
    println!("{text}");
}

// The futures are created but never awaited, so nothing runs.
async fn call_say() {
    let _ = say("Hello", 2);
    let _ = say("Hi", 1);
}

// The calls run one after the other.
async fn await_say() {
    say("Hello", 2).await;
    say("Hi", 1).await;
}

// The tasks are started, but nobody waits for them.
async fn spawn_say() {
    tokio::spawn(say("Hello", 2));
    tokio::spawn(say("Hi", 1));
}

// The tasks are started and we wait for a fixed amount of time.
async fn wait_say() {
    tokio::spawn(say("Hello", 2));
    tokio::spawn(say("Hi", 1));
    tokio::time::sleep(tokio::time::Duration::from_secs(4)).await;
}

// Both futures run concurrently and we wait for both of them.
async fn join_say() {
    tokio::join!(
        say("Hello", 2),
        say("Hi", 1),
    );
}

// The tasks are started and we wait till all of them have finished.
async fn join_set_say() {
    let mut tasks = tokio::task::JoinSet::new();
    tasks.spawn(say("Hello", 2));
    tasks.spawn(say("Hi", 1));
    tasks.join_all().await;
}

#[tokio::main]
async fn main() {
    let which = get_args();
    let start = std::time::Instant::now();
    match which {
        Case::Call => call_say().await,
        Case::Await => await_say().await,
        Case::Spawn => spawn_say().await,
        Case::Wait => wait_say().await,
        Case::Join => join_say().await,
        Case::JoinSet => join_set_say().await,
    }
    let elapsed = start.elapsed();
    println!("Elapsed: {:.2?}", elapsed);
}

#[derive(Debug, PartialEq)]
enum Case {
    Call,
    Await,
    Spawn,
    Wait,
    Join,
    JoinSet,
}

impl std::str::FromStr for Case {
    type Err = String;

    fn from_str(input: &str) -> Result<Self, Self::Err> {
        match input {
            "call" => Ok(Case::Call),
            "await" => Ok(Case::Await),
            "spawn" => Ok(Case::Spawn),
            "wait" => Ok(Case::Wait),
            "join" => Ok(Case::Join),
            "join_set" => Ok(Case::JoinSet),
            _ => Err(format!("Invalid case: {}", input)),
        }
    }
}

// Reads the name of the case to run from the command line.
fn get_args() -> Case {
    let args: Vec<String> = std::env::args().collect();
    if args.len() < 2 {
        eprintln!("Usage: {} <which>", args[0]);
        std::process::exit(1);
    }
    let which = args[1].parse().unwrap_or_else(|err| {
        eprintln!("Error: {}", err);
        std::process::exit(1);
    });
    which
}
[package]
name = "demo"
version = "0.1.0"
edition = "2024"
[dependencies]
tokio = { version = "1.47.1", features = ["full"] }
- tokio::main
- async
- await
- tokio::spawn
- tokio::join!
- tokio::task::JoinSet
- JoinSet.spawn
- JoinSet.join_all
JoinSet scheduling
Earlier it was mentioned that a task starts running when it is added to a JoinSet using the spawn function, but I wanted to have an example to really see it.
So in this example we launch two tasks using spawn and then we call sleep to let some time pass. Looking at the output you can see that the faster task (the one that uses a 1 second sleep as a fake wait for IO) finished before the first 2-second sleep was over.
Then we sleep another 2 seconds and by that time the task that takes 3 seconds has also finished.
async fn say(text: &str, sec: u64) {
    tokio::time::sleep(tokio::time::Duration::from_secs(sec)).await;
    println!("{text}");
}

#[tokio::main]
async fn main() {
    let start = std::time::Instant::now();
    println!("start");

    let mut tasks = tokio::task::JoinSet::new();
    tasks.spawn(say("Hello", 3));
    tasks.spawn(say("Hi", 1));
    println!("launched both");

    tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
    println!("waited");
    tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
    println!("waited again");

    tasks.join_all().await;
    println!("done");

    let elapsed = start.elapsed();
    println!("Elapsed: {:.2?}", elapsed);
}
The output
start
launched both
Hi
waited
Hello
waited again
done
Elapsed: 4.00s
Cargo.toml
[package]
name = "demo"
version = "0.1.0"
edition = "2024"
[dependencies]
tokio = { version = "1.47.1", features = ["full"] }
- JoinSet
- spawn
- join_all
Count in parallel with random sleep
TODO: It was mentioned to me that rand is blocking and thus this code is problematic. I need to understand that better and update the example if necessary.
#[tokio::main] async fn main() { println!("Start"); tokio::join!( count("a", 10), count("b", 10), ); println!("End"); } async fn count(which: &str, n: u32) { for i in 0..n { print(which,i).await; } } async fn print(which: &str, n: u32) { let random_int = rand::random::<u64>() % 11; tokio::time::sleep(tokio::time::Duration::from_millis(random_int)).await; println!("{which} {n} random: {random_int}"); }
[package]
name = "demo"
version = "0.1.0"
edition = "2024"
[dependencies]
rand = "0.9.2"
tokio = { version = "1.47.1", features = ["full"] }
Process and Ctrl-c
use tokio::select; #[tokio::main] async fn main() { let mut count = 0; loop { select! { _ = wait_a_bit() => println!("Waited a bit!"), _ = tokio::signal::ctrl_c() => { count += 1; if count >= 2 { println!("Exiting..."); break; } println!("Received Ctrl+C! Press Ctrl-C again if you'd like to quit?"); } } } } async fn wait_a_bit() { println!("Waiting a bit..."); tokio::time::sleep(std::time::Duration::from_secs(1)).await; }
[package]
name = "demo"
version = "0.1.0"
edition = "2024"
[dependencies]
tokio = { version = "1.47.1", features = ["full"] }
CPU heavy Fibonacci
[package]
name = "demo"
version = "0.1.0"
edition = "2024"
[dependencies]
tokio = { version = "1.39.2", features = ["full"] }
Start
Before block
Start to do something
Start reading
End reading 801 bytes
End to do something
End
use tokio::io::AsyncReadExt; async fn do_something() { println!("Start to do something"); tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; println!("End to do something"); } fn fib(n: u32) -> u32 { if n == 0 || n == 1 { return 1; } fib(n-1) + fib(n-2) } async fn read_file() { println!("Start reading"); let mut fh = tokio::fs::File::open("src/main.rs").await.unwrap(); let mut content = vec![]; fh.read_to_end(&mut content).await.unwrap(); println!("End reading {} bytes", content.len()); let n = 40; let res = fib(n); println!("fib({n}) = {res}"); } async fn run() { //do_something().await; //read_file().await; tokio::join!( do_something(), read_file(), ); } fn main() { println!("Start"); let rt = tokio::runtime::Runtime::new().unwrap(); let future = run(); println!("Before block"); rt.block_on(future); println!("End"); }
Fake async and CPU heavy task
#[tokio::main] async fn main() { let start = std::time::Instant::now(); println!("Start"); let mut tasks = tokio::task::JoinSet::new(); for n in 1..=40 { tasks.spawn(fake_async_work(format!("task-{n}"))); } for n in 1..=40 { tasks.spawn(fake_cpu_heavy_work(format!("task-{n}"))); } while let Some(res) = tasks.join_next().await { match res { Ok(_) => {} //println!("Task finished successfully"), Err(e) => println!("Task failed: {e}"), } } println!("End"); let elapsed = start.elapsed(); println!("Elapsed time: {:.2?}", elapsed); } async fn fake_async_work(name: String) { println!("Simulate some asynchronous work {name} {}", std::thread::current().name().unwrap_or("unknown")); tokio::time::sleep(std::time::Duration::from_secs(1)).await; println!("Done with async work {name}"); } async fn fake_cpu_heavy_work(name: String) { println!("Simulate some CPU-heavy work {name}"); std::thread::sleep(std::time::Duration::from_secs(1)); println!("Done with CPU-heavy work {name}"); }
[package]
name = "demo"
version = "0.1.0"
edition = "2024"
[dependencies]
tokio = { version = "1.47.1", features = ["full"] }
async read file using join!
use tokio::io::AsyncReadExt; #[tokio::main] async fn main() { println!("Start"); let future = run(); println!("Before await"); future.await; println!("End"); } async fn run() { // sequentially do_something().await; read_file().await; // concurrently // tokio::join!( // do_something(), // read_file(), // ); } async fn do_something() { println!("Start to do something"); tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; println!("End to do something"); } async fn read_file() { println!("Start reading"); let mut fh = tokio::fs::File::open("src/main.rs").await.unwrap(); let mut content = vec![]; fh.read_to_end(&mut content).await.unwrap(); println!("End reading {} bytes", content.len()); }
Sequentially
Start
Before await
Start to do something
End to do something
Start reading
End reading 780 bytes
End
concurrently with join!
Start
Before await
Start to do something
Start reading
End reading 780 bytes
End to do something
End
Cargo.toml
[package]
name = "demo"
version = "0.1.0"
edition = "2024"
[dependencies]
tokio = { version = "1.39.2", features = ["full"] }
Async tail -f
use std::{env, path::PathBuf, time::Duration};
use tokio::fs::File;
use tokio::io::{AsyncBufReadExt, AsyncSeekExt, BufReader, SeekFrom};
use tokio::time::sleep;

#[tokio::main]
async fn main() {
    let filename = get_filename();
    tail_file(filename).await;
}

async fn tail_file(filename: PathBuf) {
    let mut file = File::open(&filename).await.expect("Failed to open file");
    // Start at the end of the file so we only print what is appended later
    file.seek(SeekFrom::End(0)).await.expect("Failed to seek");
    loop {
        // Pause between checks so we don't spin in a busy loop at the end of the file
        sleep(Duration::from_millis(500)).await;
        let mut reader = BufReader::new(&mut file);
        let mut buf = String::new();
        while reader
            .read_line(&mut buf)
            .await
            .expect("Failed to read line")
            > 0
        {
            print!("{}", buf);
            buf.clear();
        }
    }
}

fn get_filename() -> PathBuf {
    let args: Vec<String> = env::args().collect();
    if args.len() != 2 {
        eprintln!("Usage: {} <filename>", args[0]);
        std::process::exit(1);
    }
    PathBuf::from(&args[1])
}
[package]
name = "async-tail"
version = "0.1.0"
edition = "2024"
[dependencies]
tokio = { version = "1", features = ["full"] }
Async multi-tail -f
use std::env; use std::path::PathBuf; use tokio::fs::File; use tokio::io::{AsyncBufReadExt, AsyncSeekExt, BufReader, SeekFrom}; use tokio::time::{Duration, sleep}; async fn tail_file(path: PathBuf) { println!("Tailing file: {}", path.display()); let mut file = match File::open(&path).await { Ok(f) => f, Err(e) => { eprintln!("Failed to open {}: {}", path.display(), e); return; } }; file.seek(SeekFrom::End(0)).await.expect("Failed to seek"); loop { sleep(Duration::from_millis(50)).await; let mut reader = BufReader::new(&mut file); let mut buf = String::new(); while reader .read_line(&mut buf) .await .expect("Failed to read line") > 0 { print!("{} {}", path.file_name().unwrap().to_string_lossy(), buf); buf.clear(); } } } fn get_filenames() -> Vec<PathBuf> { let args: Vec<String> = env::args().skip(1).collect(); if args.is_empty() { eprintln!("Usage: async-multi-tail <file1> <file2> ..."); return Vec::new(); } let paths: Vec<PathBuf> = args.into_iter().map(PathBuf::from).collect(); paths } #[tokio::main] async fn main() { let file_paths = get_filenames(); let mut handles = Vec::new(); for file_path in file_paths { handles.push(tokio::spawn(tail_file(file_path))); } for handle in handles { let _ = handle.await; } }
[package]
name = "async-multi-tail"
version = "0.1.0"
edition = "2024"
[dependencies]
tokio = { version = "1", features = ["fs", "io-util", "macros", "rt-multi-thread", "time"] }
Async tail with notify
use std::{env, fs::File, io::{BufRead, BufReader, Seek, SeekFrom}, path::PathBuf}; use tokio::sync::mpsc; use notify::{Watcher, RecursiveMode, EventKind, RecommendedWatcher}; #[tokio::main] async fn main() { let args: Vec<String> = env::args().collect(); if args.len() != 2 { eprintln!("Usage: {} <filename>", args[0]); std::process::exit(1); } let filename = PathBuf::from(&args[1]); let (tx, mut rx) = mpsc::channel(1); let tx2 = tx.clone(); // Set up file watcher let mut watcher = RecommendedWatcher::new( move |res: Result<notify::Event, notify::Error>| { if let Ok(event) = res { if matches!(event.kind, EventKind::Modify(_)) { let _ = tx2.try_send(()); } } }, notify::Config::default() ).expect("Failed to create watcher"); watcher.watch(&filename, RecursiveMode::NonRecursive).expect("Failed to watch file"); // Open file and seek to end let mut file = File::open(&filename).expect("Failed to open file"); let mut pos = file.seek(SeekFrom::End(0)).expect("Failed to seek"); loop { // Wait for a file change event rx.recv().await; // Re-open file and seek to last position let mut file = File::open(&filename).expect("Failed to open file"); file.seek(SeekFrom::Start(pos)).expect("Failed to seek"); let mut reader = BufReader::new(file); let mut buf = String::new(); while reader.read_line(&mut buf).expect("Failed to read line") > 0 { print!("{}", buf); buf.clear(); } pos = reader.stream_position().expect("Failed to get position"); } }
[package]
name = "async-tail"
version = "0.1.0"
edition = "2024"
[dependencies]
tokio = { version = "1", features = ["full"] }
notify = "6"
Join several tasks
use tokio::time::{Duration, sleep}; #[tokio::main] async fn main() { let a = wait_random_time(String::from("a")); let b = wait_random_time(String::from("b")); tokio::join!(a, b); println!("---"); let n = 3; let mut tasks = tokio::task::JoinSet::new(); for i in 0..n { let name = format!("x-{i}"); let task = wait_random_time(name); let _ = tasks.spawn(task); } while let Some(res) = tasks.join_next().await { match res { Ok(_) => {} //println!("Task finished successfully"), Err(e) => println!("Task failed: {e}"), } } } async fn wait_random_time(name: String) { let random_int = rand::random::<u64>() % 100; println!("{name} waiting for {random_int} milliseconds"); sleep(Duration::from_millis(random_int)).await; }
[package]
name = "demo"
version = "0.1.0"
edition = "2024"
[dependencies]
rand = "0.9.2"
tokio = { version = "1.47.1", features = ["full"] }
Read STDIN Async
use tokio::io::{AsyncBufReadExt, BufReader}; use tokio::select; #[tokio::main] async fn main() { loop { select! { input = read_user_input() => { println!("User input received: {}", input); }, _ = tokio::signal::ctrl_c() => { println!("Received Ctrl+C! Exiting..."); break; }, } println!("After select"); } println!("After loop"); std::process::exit(0); } async fn read_user_input() -> String { let mut reader = BufReader::new(tokio::io::stdin()); println!("Type something and press Enter (async):"); let mut input = String::new(); reader.read_line(&mut input).await.unwrap(); input }
[package]
name = "demo"
version = "0.1.0"
edition = "2024"
[dependencies]
tokio = { version = "1.47.1", features = ["full"] }
sync vs async manually
fn sync_func() -> u32 {
    println!("sync_func called");
    42
}

async fn async_func() -> u32 {
    println!("async_func called");
    42
}

fn main() {
    println!("Start");
    let rt = tokio::runtime::Runtime::new().unwrap();

    let value = sync_func();
    println!("Value: {}", value);

    let future = async_func();
    println!("Before block");
    let res = rt.block_on(future);
    println!("Result: {}", res);

    println!("End");
}
Start
sync_func called
Value: 42
Before block
async_func called
Result: 42
End
[package]
name = "demo"
version = "0.1.0"
edition = "2024"
[dependencies]
tokio = { version = "1.47.1", features = ["full"] }
Simple async example with Tokio
- We have a function called do_something that fakes real work by sleeping a bit. It is marked as an async function.
- In the main function we set up the Runtime.
- Calling do_something() does not actually execute it. It returns a Future.
async fn do_something() {
    println!("Start to do something");
    tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
    println!("End to do something");
}

fn main() {
    println!("Start");
    let rt = tokio::runtime::Runtime::new().unwrap();

    let future = do_something();
    println!("Before block");
    rt.block_on(future);
    println!("End");
}
[package]
name = "demo"
version = "0.1.0"
edition = "2024"
[dependencies]
tokio = { version = "1.39.2", features = ["full"] }
Start
Before block
Start to do something
End to do something
End
JoinSet random order of completion
We can combine several tasks in a JoinSet and then call join_all to wait for all of them to finish. They might finish in any order.
#[tokio::main] async fn main() { simple_logger::SimpleLogger::new().init().unwrap(); log::info!("Start"); let mut tasks = tokio::task::JoinSet::new(); tasks.spawn(async move { log::info!("Task long starts"); tokio::time::sleep(tokio::time::Duration::from_secs(2)).await; log::info!("Task long is done"); }); std::thread::sleep(std::time::Duration::from_secs(1)); log::info!("Long task started"); for i in 0..5 { let time = rand::random::<u64>() % 100; log::info!("Task {i} setup for {time}"); tasks.spawn(async move { log::info!("Task {i} starts"); tokio::time::sleep(tokio::time::Duration::from_millis(time)).await; log::info!("Task {i} is done"); }); } log::info!("All tasks started"); std::thread::sleep(std::time::Duration::from_secs(1)); log::info!("Wait done"); tasks.join_all().await; log::info!("End"); }
2025-09-17T12:07:02.366Z INFO [demo] Start
2025-09-17T12:07:02.366Z INFO [demo] Task long starts
2025-09-17T12:07:03.366Z INFO [demo] Long task started
2025-09-17T12:07:03.366Z INFO [demo] Task 0 setup for 95
2025-09-17T12:07:03.366Z INFO [demo] Task 1 setup for 87
2025-09-17T12:07:03.366Z INFO [demo] Task 2 setup for 97
2025-09-17T12:07:03.366Z INFO [demo] Task 3 setup for 77
2025-09-17T12:07:03.366Z INFO [demo] Task 4 setup for 87
2025-09-17T12:07:03.366Z INFO [demo] All tasks started
2025-09-17T12:07:03.366Z INFO [demo] Task 0 starts
2025-09-17T12:07:03.366Z INFO [demo] Task 1 starts
2025-09-17T12:07:03.366Z INFO [demo] Task 3 starts
2025-09-17T12:07:03.366Z INFO [demo] Task 4 starts
2025-09-17T12:07:03.366Z INFO [demo] Task 2 starts
2025-09-17T12:07:03.445Z INFO [demo] Task 3 is done
2025-09-17T12:07:03.454Z INFO [demo] Task 4 is done
2025-09-17T12:07:03.454Z INFO [demo] Task 1 is done
2025-09-17T12:07:03.463Z INFO [demo] Task 0 is done
2025-09-17T12:07:03.465Z INFO [demo] Task 2 is done
2025-09-17T12:07:04.366Z INFO [demo] Wait done
2025-09-17T12:07:04.367Z INFO [demo] Task long is done
2025-09-17T12:07:04.367Z INFO [demo] End
[package]
name = "demo"
version = "0.1.0"
edition = "2024"
[dependencies]
log = "0.4.28"
rand = "0.9.2"
simple_logger = "5.0.0"
tokio = { version = "1.47.1", features = ["full"] }
Basic TCP echo server
Based on the example in the README of tokio.
Start with:
cargo run
From another terminal:
telnet localhost 8080
use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::TcpListener; /// Applies ROT13 transformation to a u8 slice. /// Only ASCII letters are rotated; other bytes are unchanged. pub fn rot13(input: &[u8]) -> Vec<u8> { input .iter() .map(|&b| match b { b'a'..=b'z' => (b - b'a' + 13) % 26 + b'a', b'A'..=b'Z' => (b - b'A' + 13) % 26 + b'A', _ => b, }) .collect() } #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> { let listener = TcpListener::bind("127.0.0.1:8080").await?; loop { println!("Waiting for a connection..."); let (mut socket, _) = listener.accept().await?; tokio::spawn(async move { let mut buf = [0; 1024]; loop { println!("Getting data..."); let n = match socket.read(&mut buf).await { // socket closed Ok(0) => return, Ok(n) => n, Err(e) => { eprintln!("failed to read from socket; err = {:?}", e); return; } }; let received_text = String::from_utf8_lossy(&buf[0..n]); println!("Received text: {}", received_text); // Write the data back let encripted = rot13(&buf[0..n]); if let Err(e) = socket.write_all(&encripted[0..n]).await { eprintln!("failed to write to socket; err = {:?}", e); return; } } }); } }
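The rot13 helper used by the server can be tried on its own, without any networking. The sketch below copies that function and applies it to a sample text (the sample strings are just an illustration). Applying ROT13 twice gives back the original text, so sending the server's reply back to the server decodes it.

```rust
/// Applies ROT13 transformation to a u8 slice.
/// Only ASCII letters are rotated; other bytes are unchanged.
pub fn rot13(input: &[u8]) -> Vec<u8> {
    input
        .iter()
        .map(|&b| match b {
            b'a'..=b'z' => (b - b'a' + 13) % 26 + b'a',
            b'A'..=b'Z' => (b - b'A' + 13) % 26 + b'A',
            _ => b,
        })
        .collect()
}

fn main() {
    // Encode a sample text
    let encoded = rot13(b"Hello, World!");
    println!("{}", String::from_utf8_lossy(&encoded)); // Uryyb, Jbeyq!

    // Encoding the result again restores the original text
    let decoded = rot13(&encoded);
    println!("{}", String::from_utf8_lossy(&decoded)); // Hello, World!
}
```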
[package]
name = "demo"
version = "0.1.0"
edition = "2024"
[dependencies]
tokio = { version = "1.47.1", features = ["full"] }
Download many pages blocking http-reqwest
- Using the reqwest crate in blocking mode.
- Takes 7 seconds to run.
[package]
name = "download-many-pages"
version = "0.1.0"
edition = "2024"
[dependencies]
reqwest = { version = "0.12.23", features = ["blocking"] }
use std::fs::File; use std::io::{BufRead, BufReader}; use reqwest::Error; fn download_page(url: &str) -> Result<String, Error> { let response = reqwest::blocking::get(url)?; let content = response.text()?; Ok(content) } fn read_urls_from_file(path: &str) -> Vec<String> { let file = File::open(path).expect("Failed to open file"); let reader = BufReader::new(file); reader .lines() .filter_map(|line| { let line = line.ok()?.trim().to_string(); if line.is_empty() || line.starts_with('#') { None } else { Some(line) } }) .collect() } fn main() { let start = std::time::Instant::now(); let args = std::env::args().collect::<Vec<String>>(); if args.len() < 2 { eprintln!("Usage: {} <urls_file>", args[0]); std::process::exit(1); } let urls_file = &args[1]; let urls = read_urls_from_file(urls_file); println!("URLs: {:?}", urls); for url in &urls { match download_page(url) { Ok(content) => { println!("Downloaded {} bytes from {}", content.len(), url); } Err(e) => println!("Error downloading {}: {}", url, e), } } let elapsed = start.elapsed(); println!("Elapsed time: {:.2?}", elapsed); }
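The program expects a text file with one URL per line. Judging by read_urls_from_file, empty lines and lines starting with # are skipped and surrounding whitespace is removed. Here is a small sketch of the same filtering on an in-memory string, so it can be tried without a file. The function name parse_urls and the sample URLs are made up for this illustration.

```rust
// Keeps the non-empty lines that are not comments, with whitespace trimmed.
fn parse_urls(text: &str) -> Vec<String> {
    text.lines()
        .map(|line| line.trim())
        .filter(|line| !line.is_empty() && !line.starts_with('#'))
        .map(|line| line.to_string())
        .collect()
}

fn main() {
    // A sample of what the urls file might contain
    let text = "# sites to download\nhttps://example.com/\n\n  https://example.org/  \n";

    for url in parse_urls(text) {
        println!("{url}");
    }
}
```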
Download many pages bad async
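I think this code was generated by some AI, and while it uses the async constructs, in reality it sends the requests sequentially: each download is awaited inside the loop, so the next request starts only after the previous one has finished.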
use std::fs::File; use std::io::{BufRead, BufReader}; use reqwest::Error; async fn download_page(url: &str) -> Result<String, Error> { let response = reqwest::get(url).await?; let content = response.text().await?; Ok(content) } fn read_urls_from_file(path: &str) -> Vec<String> { let file = File::open(path).expect("Failed to open file"); let reader = BufReader::new(file); reader .lines() .filter_map(|line| { let line = line.ok()?.trim().to_string(); if line.is_empty() || line.starts_with('#') { None } else { Some(line) } }) .collect() } #[tokio::main] async fn main() { let start = std::time::Instant::now(); let args = std::env::args().collect::<Vec<String>>(); if args.len() < 2 { eprintln!("Usage: {} <urls_file>", args[0]); std::process::exit(1); } let urls_file = &args[1]; let urls = read_urls_from_file(urls_file); println!("URLs: {:?}", urls); for url in &urls { match download_page(url).await { Ok(content) => { println!("Downloaded {} bytes from {}", content.len(), url); } Err(e) => println!("Error downloading {}: {}", url, e), } } let elapsed = start.elapsed(); println!("Elapsed time: {:.2?}", elapsed); }
[package]
name = "download-many-pages"
version = "0.1.0"
edition = "2024"
[dependencies]
reqwest = "0.12.23"
tokio = { version = "1.47.1", features = ["full"] }
Download many pages async
use std::fs::File;
use std::io::{BufRead, BufReader};

use reqwest::Error;

fn get_urls_file() -> String {
    let args = std::env::args().collect::<Vec<String>>();
    if args.len() < 2 {
        eprintln!("Usage: {} <urls_file>", args[0]);
        std::process::exit(1);
    }
    let urls_file = &args[1];
    urls_file.to_string()
}

fn read_urls_from_file(path: &str) -> Vec<String> {
    let file = File::open(path).expect("Failed to open file");
    let reader = BufReader::new(file);
    reader
        .lines()
        .filter_map(|line| {
            let line = line.ok()?.trim().to_string();
            if line.is_empty() || line.starts_with('#') {
                None
            } else {
                Some(line)
            }
        })
        .collect()
}

async fn download_page(url: &str) -> Result<String, Error> {
    let response = reqwest::get(url).await?;
    let content = response.text().await?;
    Ok(content)
}

async fn download_many_urls(urls: &[String]) {
    let mut tasks = tokio::task::JoinSet::new();
    for url in urls {
        println!("Downloading: {url}");
        // Each spawned task owns its own copy of the URL.
        tasks.spawn({
            let url = url.clone();
            async move {
                match download_page(&url).await {
                    Ok(content) => {
                        println!("Downloaded {} bytes from {url}", content.len());
                    }
                    Err(err) => println!("Error downloading {url}: {err}"),
                }
            }
        });
    }
    println!("Started {} tasks. Waiting...", tasks.len());
    // All the requests are already in flight; wait for every task to finish.
    tasks.join_all().await;
}

#[tokio::main]
async fn main() {
    let start = std::time::Instant::now();
    let urls_file = get_urls_file();
    let urls = read_urls_from_file(&urls_file);
    println!("URLs: {urls:?}");
    download_many_urls(&urls).await;
    let elapsed = start.elapsed();
    println!("Elapsed time: {elapsed:.2?}");
}
- Takes 0.3-0.5 sec to run this.
[package]
name = "download-many-pages"
version = "0.1.0"
edition = "2024"
[dependencies]
reqwest = "0.12.23"
tokio = { version = "1.47.1", features = ["full"] }
Crawler
use reqwest::Error;
use scraper::{Html, Selector};

fn get_url() -> String {
    let args = std::env::args().collect::<Vec<String>>();
    if args.len() < 2 {
        eprintln!("Usage: {} <url>", args[0]);
        std::process::exit(1);
    }
    let url = &args[1];
    url.to_string()
}

async fn download_page(url: String) -> Result<(String, String), Error> {
    let response = reqwest::get(&url).await?;
    let content = response.text().await?;
    Ok((url, content))
}

async fn crawl(url: &str, limit: u32) {
    let mut tasks = tokio::task::JoinSet::new();
    println!("Downloading: {url}");
    // The starting page counts as the first download.
    let mut count = 1;
    tasks.spawn({
        let url = url.to_string();
        async move { download_page(url).await }
    });
    println!("Started tasks. Waiting...");
    while !tasks.is_empty() {
        let result = tasks.join_next().await;
        match result {
            Some(Ok(content)) => {
                match content {
                    Ok((url, html)) => {
                        println!("Downloaded {} bytes from {url}", html.len());
                        parse_links(&html).iter().for_each(|link| {
                            if link.starts_with("http://") {
                                //println!("Found unsecure: {link}");
                            }
                            if link.starts_with("https://") {
                                //println!("Found secure link: {link}");
                                if count < limit {
                                    count += 1;
                                    tasks.spawn({
                                        let link = link.to_string();
                                        async move { download_page(link).await }
                                    });
                                }
                            }
                            // internal links
                        });
                    }
                    // `url` here would be the starting URL, not the page that failed,
                    // so we only print the error itself.
                    Err(err) => println!("Error downloading: {err}"),
                }
            }
            Some(Err(err)) => {
                println!("Task failed: {err}");
            }
            None => break,
        }
    }
}

#[tokio::main]
async fn main() {
    let start = std::time::Instant::now();
    let url = get_url();
    println!("Starting with: {url}");
    crawl(&url, 5).await;
    let elapsed = start.elapsed();
    println!("Elapsed time: {elapsed:.2?}");
}

fn parse_links(html: &str) -> Vec<String> {
    let document = Html::parse_document(html);
    let selector = Selector::parse("a[href]").unwrap();
    document
        .select(&selector)
        .filter_map(|element| element.value().attr("href"))
        .map(|href| href.to_string())
        .collect()
}
[package]
name = "download-many-pages"
version = "0.1.0"
edition = "2024"
[dependencies]
reqwest = "0.12.23"
scraper = "0.24.0"
tokio = { version = "1.47.1", features = ["full"] }
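The crawler above counts the pages it starts to download, but it does not remember which links it has already seen, so the same page might be fetched more than once. One possible way to handle that, sketched here with the standard library only, is to keep a set of seen URLs. The `Frontier` type and its `accept` method are hypothetical names for this illustration and are not part of the crawler above.

```rust
use std::collections::HashSet;

/// Hypothetical helper: decides which links are worth spawning a download for.
struct Frontier {
    seen: HashSet<String>,
    limit: usize,
}

impl Frontier {
    fn new(limit: usize) -> Self {
        Self {
            seen: HashSet::new(),
            limit,
        }
    }

    /// Returns true if `link` should be downloaded:
    /// it must be an https link, not seen before, and we must be under the limit.
    fn accept(&mut self, link: &str) -> bool {
        if !link.starts_with("https://") || self.seen.len() >= self.limit {
            return false;
        }
        // `insert` returns false if the link was already in the set.
        self.seen.insert(link.to_string())
    }
}

fn main() {
    let mut frontier = Frontier::new(2);
    let links = [
        "https://a.example/",
        "http://b.example/",
        "https://a.example/",
        "https://c.example/",
        "https://d.example/",
    ];
    for link in links {
        println!("{link}: {}", frontier.accept(link));
    }
}
```

In the crawler the starting URL would be passed to `accept` first, so that it counts towards the limit in the same way the `count` variable does, and a task would be spawned only when `accept` returns true.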
Demo
Demo 1
[package]
name = "demo"
version = "0.1.0"
edition = "2024"
[dependencies]
rand = "0.9.2"
tokio = { version = "1.47.1", features = ["full"] }
#[tokio::main]
async fn main() {
    let result = multi_step(0).await;
    println!("Final result: {}", result);
}

async fn multi_step(input: u64) -> u64 {
    let step1 = get_data(input).await;
    let step2 = get_data(step1).await;
    let step3 = get_data(step2).await;
    step3
}

async fn get_data(input: u64) -> u64 {
    // pretend we get some data from the network
    let random_int = rand::random::<u64>() % 11;
    tokio::time::sleep(tokio::time::Duration::from_millis(random_int)).await;
    input + random_int
}
Demo 2
[package]
name = "demo"
version = "0.1.0"
edition = "2024"
[dependencies]
tokio = { version = "1.47.1", features = ["full"] }
use tokio::fs;
use tokio::time::{sleep, Duration};

async fn has_server_log_changed(mut last_modified: Option<std::time::SystemTime>) -> Option<std::time::SystemTime> {
    match fs::metadata("server.log").await {
        Ok(metadata) => {
            let modified = metadata.modified().ok()?;
            if let Some(last) = last_modified {
                if modified > last {
                    println!("server.log has changed!");
                }
            }
            Some(modified)
        },
        Err(_) => {
            println!("server.log not found");
            None
        }
    }
}

// Example usage: poll for changes every second
// async fn poll_server_log_changes() {
//     let mut last_modified = None;
//     loop {
//         last_modified = has_server_log_changed(last_modified).await;
//         sleep(Duration::from_secs(1)).await;
//     }
// }

use tokio::select;
use tokio::io::{AsyncBufReadExt, BufReader};

#[tokio::main]
async fn main() {
    let mut last_modified = None;
    let mut quit = false;
    loop {
        select! {
            input = read_user_input() => {
                println!("User input received: {}", input);
            },
            _ = tokio::signal::ctrl_c() => {
                println!("Received Ctrl+C! Exiting...");
                quit = true;
                break;
            },
            new_modified = has_server_log_changed(last_modified) => {
                if let Some(modified) = new_modified {
                    if let Some(last) = last_modified {
                        if modified > last {
                            println!("changed");
                        }
                    }
                    last_modified = Some(modified);
                }
            }
        }
        println!("After select");
        // Add a short sleep to avoid busy looping
        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
    }
    println!("After loop");
    std::process::exit(0);
}

async fn read_user_input() -> String{
    let mut reader = BufReader::new(tokio::io::stdin());
    println!("Type something and press Enter (async):");
    let mut input = String::new();
    reader.read_line(&mut input).await.unwrap();
    input
}

// #[tokio::main]
// async fn main() {
//     tokio::spawn(run_callback_in_3_seconds());
//     // _ = tokio::signal::ctrl_c() => {
//     //     if quit {
//     //         println!("Exiting...");
//     //         break;
//     //     }
//     //     quit = true;
//     //     println!("Received Ctrl+C! Press Ctrl-C again if you'd like to quit?");
//     // },
//     loop {
//         wait_a_bit().await;
//     }
//     // let mut quit = false;
//     // select! {
//     //     _ = do_some_work() => println!("done"),
//     //     _ = tokio::signal::ctrl_c() => {
//     //         if quit {
//     //             println!("Exiting...");
//     //             break;
//     //         }
//     //         quit = true;
//     //         println!("Received Ctrl+C! Press Ctrl-C again if you'd like to quit?");
//     //     },
//     //     _ = run_callback_in_3_seconds() => {
//     //         quit = false;
//     //     }
//     // }
// }

async fn run_callback_in_3_seconds() {
    println!("Setup a callback in 3 seconds...");
    tokio::time::sleep(std::time::Duration::from_secs(3)).await;
    // Place your callback logic here
    std::process::exit(0);
}

async fn do_some_work() {
    loop {
        println!("Doing some work...");
        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
    }
}

async fn wait_a_bit() {
    println!("Waiting a bit...");
    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
Resources
-
Crust of Rust: async/await by Jon Gjengset - 2:30 hours long video from 2021.09.01
-
Getting started with Tokio. The ultimate starter guide to writing async Rust by Dreams of Code - 7 min video from 2023.03.02
-
RustLatam 2019 - Without Boats: Zero-Cost Async IO 32 min video from 2019.04.22
-
Protohackers Networking challenges
-
https://crates.io/crates/notify
-
https://github.com/Sherlock-Holo/async-notify
-
https://crates.io/crates/inotify
-
Learning Async Rust With Entirely Too Many Web Servers from 2023.08.11
-
Actors with Tokio 2021.02.12
-
Stop Worrying and Learn to Loop-Select. 2025.02.21
-
More Actors with Tokio. 2025.02.19
Other runtimes:
- Embassy (embassy.dev) for embedded devices
- glommio
- https://crates.io/crates/smol
- https://crates.io/crates/futures
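All of these runtimes do the same basic job: they poll futures until those are ready. To give a feeling for what that means, here is a minimal sketch of a single-threaded `block_on` written with the standard library only. The names `ThreadWaker`, `block_on` and `add_one` are made up for this illustration. It is not how tokio or any of the crates above are implemented, and it has no timers, no IO and no task spawning.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Wakes up the thread that is blocked inside `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Hypothetical minimal executor: drives one future to completion on the current thread.
fn block_on<F: Future>(future: F) -> F::Output {
    let mut future = pin!(future);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match future.as_mut().poll(&mut cx) {
            Poll::Ready(value) => return value,
            // Nothing to do until someone calls `wake`, so put the thread to sleep.
            Poll::Pending => thread::park(),
        }
    }
}

async fn add_one(n: u32) -> u32 {
    n + 1
}

fn main() {
    let result = block_on(async {
        let a = add_one(1).await;
        add_one(a).await
    });
    println!("result = {result}"); // prints "result = 3"
}
```

A real runtime adds a lot on top of this loop: a queue of many tasks, timers, and integration with the operating system so that a task is woken up when its socket or file is ready.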