Rust Async

async

  • Especially for IO intensive applications

  • Reading from a file, but many disks actually cannot do parallel reading or writing so there is not much gain. Unless we have several disks in the computer or unless the disks are mounted from some other devices via nfs or Samba.

  • Network client operations such as downloading pages from the Internet or accessing APIs through the network.

  • Network server operations where our system needs to handle multiple operations that involve network operations.

  • Accessing databases (via the network), which in reality is just accessing APIs over some protocol. (Which is probably not http.)

  • context switching

  • Cooperative multitasking vs Pre-emptive multitasking (interleaving or time-slicing)

    • Even in the pre-emptive case a thread can give up control by waiting for some input (network) or by calling sleep.
  • Async code needs a runtime (also known as a main loop) to drive it. Several crates provide an async runtime; tokio is the most popular one.
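
To make the idea of a runtime as a "main loop" more concrete, here is a minimal sketch of one built only from the standard library. It is not how tokio works internally; the names ThreadWaker, block_on and add are made up for this illustration. The loop polls a single future and parks the thread until the future asks to be polled again.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Hypothetical helper: a waker that unparks the thread driving the future.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// A toy "main loop": poll the future, sleep until woken, poll again.
fn block_on<F: Future>(future: F) -> F::Output {
    let mut future = pin!(future);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match future.as_mut().poll(&mut cx) {
            Poll::Ready(value) => return value,
            // Nothing to do until the future signals progress.
            Poll::Pending => thread::park(),
        }
    }
}

async fn add(a: u32, b: u32) -> u32 {
    a + b
}

fn main() {
    let result = block_on(add(19, 23));
    println!("result: {result}");
}
```

A real runtime such as tokio also provides timers, IO readiness notifications and a scheduler for many tasks, which is why we use a crate instead of writing this loop ourselves.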

Asynchronous cooking

Let's make some mashed potatoes

  1. Take a pot fill it with water and put it on the stove. Turn on the stove.
  2. Sit next to it and wait till the water boils.
  3. Take out some potatoes, peel them, wash them, and cut them into little pieces.
  4. Put the potato in the water on the stove.
  5. Sit next to it till the potato becomes soft.
  6. Take out the potato and smash it.

You might have a better recipe, but I guess this covers it. However, you will most likely notice that there are two steps where you basically sit idle: waiting for the water to boil and waiting for the potatoes to become soft.

Even if you don't have anything else to cook or anything else to do, you could finish this job sooner.

You could take step 3 (peeling the potatoes) and do it while the water is warming up.

You could also use the time you spend waiting for the potatoes to cook in step 5 to do something else: cook some other meal, fill the washing machine, etc.

Basically, you could take the time you (the CPU) are waiting for some external task to complete and do something else.

This is basically asynchronous cooking.

tokio

Before we delve into the learning of async and tokio, let's see the results of two examples.

One of them is an HTTP client to download many pages.

The other is an HTTP server to serve many pages.

For both we have a sync and async version.

  • If we run the clients downloading many external pages we can observe the speed improvement from sync to async. In this case we don't really care how those servers handle the requests.

  • If we run the servers, we either need our sync and async clients to demonstrate the difference, or we can launch some other program to observe the results.

Hello World in async

Let's see the standard "Hello World!" example using async.

Just remember, in this example, although we have the async code it does not actually do anything asynchronous. It only shows the basic syntax, the basic mechanism.

First we create a new crate called demo and change to its directory.

cargo new demo
cd demo

In order to write async code using the tokio crate we need to add it to our project:

cargo add tokio -F full

Alternatively we can manually edit the Cargo.toml file to add the dependency:

[package]
name = "demo"
version = "0.1.0"
edition = "2024"

[dependencies]
tokio = { version = "1.47.1", features = ["full"] }

We use the full feature here as that makes it easy to use all the features. Later you might fine-tune which features to include. However, full is a good start.

The code

We prefix our async functions with the async keyword, and we decorate the main function with #[tokio::main], which sets up the runtime for us. In reality the real main function cannot be async; the #[tokio::main] attribute is syntactic sugar that wraps our async main in a regular main function that starts the runtime, but let's not worry about that now.

There are other ways to use tokio, but this is probably the most common and also the easiest.

We also need to add .await to our async function calls to make them actually run.

#[tokio::main]
async fn main() {
    println!("Hello, world!");

    hello_world().await;
}

async fn hello_world() {
    println!("Hello, async world!");
}

Running the code

There is nothing special about running the program. We use the regular cargo run with the -q flag to silence cargo's own output:

$ cargo run -q
Hello, world!
Hello, async world!

  • tokio::main
  • async
  • await

sync vs async

Before we continue down the rabbit hole of async, let's see the difference in the usage of regular (synchronous) and async functions.

In this example we have a regular (synchronous) function brilliantly named sync_func and an async function called async_func. You can observe that the only difference between these two functions is the async prefix on the async function. This won't be the case once we really start to explore async, but for this example we want to have two functions that behave identically once they start to run.

The question is: what is the difference in the way we call these functions?

We sprinkled the main function with print-statements so we'll see what gets executed when.

When we call the sync_func() it gets executed immediately and returns 42, the value that the function returns.

When we call async_func() it returns a Future, that is, a value of a type that implements the Future trait. At this point the body of the function has not been executed, and thus we can observe that nothing is printed between "Before calling async_func" and "Before await".

We can then write future.await. It is not really a function call, as one can observe from the lack of parentheses () after await. It tells the async runtime (tokio in our case) that the current function can be suspended at this point until the future is ready, which lets other tasks run in the meantime. In our case the only pending work is the body of async_func, so at this point it gets executed.

The expression with the await will terminate when the async_func ends and it will return whatever the async_func returns. In this case 42.

fn sync_func() -> u32 {
    println!("sync_func called");
    42
}

async fn async_func() -> u32 {
    println!("async_func called");
    42
}

#[tokio::main]
async fn main() {
    println!("Start");
    let value = sync_func();
    println!("Value: {}", value);

    println!("Before calling async_func");
    let future = async_func();

    println!("Before await");
    let res = future.await;
    println!("Result: {}", res);

    println!("End");
}

The output:

Start
sync_func called
Value: 42
Before calling async_func
Before await
async_func called
Result: 42
End
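
To see the laziness of a Future without any runtime, the following sketch polls a future once by hand using only the standard library. The helper poll_once is a name invented for this illustration, and Waker::noop needs Rust 1.85 or newer. It is a sketch of the mechanism, not something you would normally write in application code.

```rust
use std::future::Future;
use std::pin::pin;
use std::task::{Context, Poll, Waker};

async fn async_func() -> u32 {
    println!("async_func called");
    42
}

/// Hypothetical helper: poll a future exactly once with a waker that does nothing.
fn poll_once<F: Future>(future: F) -> Option<F::Output> {
    let mut future = pin!(future);
    let mut cx = Context::from_waker(Waker::noop());
    match future.as_mut().poll(&mut cx) {
        Poll::Ready(value) => Some(value),
        Poll::Pending => None,
    }
}

fn main() {
    // Calling the function only builds the future; the body has not run yet.
    let future = async_func();
    println!("future created");

    // The body runs only now, when the future is polled.
    match poll_once(future) {
        Some(value) => println!("ready: {value}"),
        None => println!("not ready yet"),
    }
}
```

Because async_func never waits for anything, a single poll is enough to run it to completion. A future that waits for IO or a timer would return Pending here, and the runtime would poll it again later.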

Cargo.toml

[package]
name = "demo"
version = "0.1.0"
edition = "2024"

[dependencies]
tokio = { version = "1.47.1", features = ["full"] }

  • tokio::main
  • async
  • await
  • Future

Run async functions

What happens when we call async functions?

In order to demonstrate this we have a function called say that gets some text and a number. It sleeps for the given number of seconds using an asynchronous sleep. This imitates some IO operation our code waits for. In a real application, instead of that sleep we might have a call to download a page via HTTP, or we might access an API or a database. Once the waiting is done we print the string so we can observe the order in which things happen.

Then we have a number of functions, each one calling the say function with the exact same parameters. We'll observe what happens in each case.

If you'd like to see the behaviour yourself, you can run the program using cargo run and providing the name of one of the cases as an argument:

cargo run call

call

async fn call_say() {
    let _ = say("Hello", 2);
    let _ = say("Hi", 1);
}

In the call_say case we just call say twice but don't await the returned futures, so they don't run at all. Without the let _ = assignment Rust would even warn us about it:

note: futures do nothing unless you .await or poll them

The total elapsed time is effectively 0 seconds.

await

async fn await_say() {
    say("Hello", 2).await;
    say("Hi", 1).await;
}

The first call waits for 2 seconds and prints, then the second call waits for 1 second and prints. Although we see all the bells and whistles of async code, in reality the calls run sequentially.

The total elapsed time is 3 seconds.

spawn

async fn spawn_say() {
    tokio::spawn(say("Hello", 2));
    tokio::spawn(say("Hi", 1));
}

In this case we hand over the await-ing to the tokio runtime using the spawn function.

Both functions start to run at the same time; however, our program might finish before the functions can print anything. This is the case in our example. The total elapsed time is effectively 0 seconds and nothing is printed. This also means that our program does not wait for the (faked) remote call to return. Probably not what we want.

wait

async fn wait_say() {
    tokio::spawn(say("Hello", 2));
    tokio::spawn(say("Hi", 1));
    tokio::time::sleep(tokio::time::Duration::from_secs(4)).await;
}

In this case, after we launch the asynchronous function calls using spawn, we explicitly wait 4 seconds. In this case too, both function calls start at the same time. The calling function waits long enough to see their output.

The total elapsed time is 4 seconds because we explicitly waited for 4 seconds. In reality, knowing the code, we could have waited for only 2 seconds and most likely both function calls would have finished, but we can't be sure. Maybe we would want to wait for 2.1 seconds just to make sure, and even then we might miss the second call if for some reason it gets stuck before it finishes.

Not ideal as we have to guess how long to wait and even with that we can never be really sure.

join

async fn join_say() {
    tokio::join!(
        say("Hello", 2),
        say("Hi", 1),
    );
}

Here we use the join! macro, which runs the 2 futures concurrently and waits for both of them to finish. Both functions start at the same time. One of them finishes after 1 second, and the other one after 2 seconds. The total elapsed time is 2 seconds.

This is much better than what we had previously, but the drawback is that we can do this only if we know all the tasks at compile time.

join_set

async fn join_set_say() {
    let mut tasks = tokio::task::JoinSet::new();
    tasks.spawn(say("Hello", 2));
    tasks.spawn(say("Hi", 1));
    tasks.join_all().await;
}

Using JoinSet we can spawn any number of tasks dynamically and wait for all of them to finish. The tasks start as soon as we add them to the JoinSet and finish whenever their work is done. The join_all method waits until all of them have finished.

In our case the total elapsed time is 2 seconds.

async fn say(text: &str, sec: u64) {
    tokio::time::sleep(tokio::time::Duration::from_secs(sec)).await;
    println!("{text}");
}

async fn call_say() {
    let _ = say("Hello", 2);
    let _ = say("Hi", 1);
}

async fn await_say() {
    say("Hello", 2).await;
    say("Hi", 1).await;
}

async fn spawn_say() {
    tokio::spawn(say("Hello", 2));
    tokio::spawn(say("Hi", 1));
}

async fn wait_say() {
    tokio::spawn(say("Hello", 2));
    tokio::spawn(say("Hi", 1));
    tokio::time::sleep(tokio::time::Duration::from_secs(4)).await;
}

async fn join_say() {
    tokio::join!(
        say("Hello", 2),
        say("Hi", 1),
    );
}

async fn join_set_say() {
    let mut tasks = tokio::task::JoinSet::new();
    tasks.spawn(say("Hello", 2));
    tasks.spawn(say("Hi", 1));
    tasks.join_all().await;
}

#[tokio::main]
async fn main() {
    let which = get_args();
    let start = std::time::Instant::now();
    match which {
        Case::Call => call_say().await,
        Case::Await => await_say().await,
        Case::Spawn => spawn_say().await,
        Case::Wait => wait_say().await,
        Case::Join => join_say().await,
        Case::JoinSet => join_set_say().await,
    }

    let elapsed = start.elapsed();
    println!("Elapsed: {:.2?}", elapsed);
}

#[derive(Debug, PartialEq)]
enum Case {
    Call,
    Await,
    Spawn,
    Wait,
    Join,
    JoinSet,
}
impl std::str::FromStr for Case {
    type Err = String;

    fn from_str(input: &str) -> Result<Self, Self::Err> {
        match input {
            "call" => Ok(Case::Call),
            "await" => Ok(Case::Await),
            "spawn" => Ok(Case::Spawn),
            "wait" => Ok(Case::Wait),
            "join" => Ok(Case::Join),
            "join_set" => Ok(Case::JoinSet),
            _ => Err(format!("Invalid case: {}", input)),
        }
    }
}

fn get_args() -> Case {
    let args: Vec<String> = std::env::args().collect();
    if args.len() < 2 {
        eprintln!("Usage: {} <which>", args[0]);
        std::process::exit(1);
    }
    let which = args[1].parse().unwrap_or_else(|err| {
        eprintln!("Error: {}", err);
        std::process::exit(1);
    });

    which
}

Cargo.toml

[package]
name = "demo"
version = "0.1.0"
edition = "2024"

[dependencies]
tokio = { version = "1.47.1", features = ["full"] }

  • tokio::main
  • async
  • await
  • tokio::spawn
  • tokio::join!
  • tokio::task::JoinSet
  • JoinSet.spawn
  • JoinSet.join_all

JoinSet scheduling

Earlier it was mentioned that a task starts running when it is added to a JoinSet using the spawn function, but I wanted to have an example to really see it.

So in this example we launch two tasks using spawn and then we call sleep to let some time pass. Looking at the output you can see that the faster task (the one that uses a 1-second sleep as a fake wait for IO) finished before the first 2-second sleep was over.

Then we sleep another 2 seconds and by that time the task that takes 3 seconds has also finished.

async fn say(text: &str, sec: u64) {
    tokio::time::sleep(tokio::time::Duration::from_secs(sec)).await;
    println!("{text}");
}


#[tokio::main]
async fn main() {
    let start = std::time::Instant::now();
    println!("start");

    let mut tasks = tokio::task::JoinSet::new();
    tasks.spawn(say("Hello", 3));
    tasks.spawn(say("Hi", 1));
    println!("launched both");

    tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
    println!("waited");

    tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
    println!("waited again");

    tasks.join_all().await;
    println!("done");

    let elapsed = start.elapsed();
    println!("Elapsed: {:.2?}", elapsed);
}

The output

start
launched both
Hi
waited
Hello
waited again
done
Elapsed: 4.00s

Cargo.toml

[package]
name = "demo"
version = "0.1.0"
edition = "2024"

[dependencies]
tokio = { version = "1.47.1", features = ["full"] }

  • JoinSet
  • spawn
  • join_all

Count in parallel with random sleep

TODO: It was mentioned to me that rand is blocking and thus this code is problematic. I need to understand that better and update the example if necessary.


#[tokio::main]
async fn main() {
    println!("Start");
    tokio::join!(
        count("a", 10),
        count("b", 10),
    );
    println!("End");
}

async fn count(which: &str, n: u32) {
    for i in 0..n {
        print(which,i).await;
    }
}


async fn print(which: &str, n: u32) {
    let random_int = rand::random::<u64>() % 11;
    tokio::time::sleep(tokio::time::Duration::from_millis(random_int)).await;
    println!("{which} {n} random: {random_int}");
}

Cargo.toml

[package]
name = "demo"
version = "0.1.0"
edition = "2024"

[dependencies]
rand = "0.9.2"
tokio = { version = "1.47.1", features = ["full"] }

Process and Ctrl-c

use tokio::select;

#[tokio::main]
async fn main() {
    let mut count = 0;
    loop {
        select! {
            _ = wait_a_bit() => println!("Waited a bit!"),
            _ = tokio::signal::ctrl_c() => {
                count += 1;
                if count >= 2 {
                    println!("Exiting...");
                    break;
                }
                println!("Received Ctrl+C! Press Ctrl+C again if you'd like to quit.");
            }
        }
    }
}

async fn wait_a_bit() {
    println!("Waiting a bit...");
    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}

Cargo.toml

[package]
name = "demo"
version = "0.1.0"
edition = "2024"

[dependencies]
tokio = { version = "1.47.1", features = ["full"] }

CPU heavy Fibonacci

Cargo.toml

[package]
name = "demo"
version = "0.1.0"
edition = "2024"

[dependencies]
tokio = { version = "1.39.2", features = ["full"] }

The output:

Start
Before block
Start to do something
Start reading
End   reading 801 bytes
End   to do something
End

use tokio::io::AsyncReadExt;

async fn do_something() {
    println!("Start to do something");
    tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
    println!("End   to do something");
}

fn fib(n: u32) -> u32 {
    if n == 0 || n == 1 {
        return 1;
    }
    fib(n-1) + fib(n-2)
}

async fn read_file() {
    println!("Start reading");
    let mut fh = tokio::fs::File::open("src/main.rs").await.unwrap();
    let mut content = vec![];
    fh.read_to_end(&mut content).await.unwrap();
    println!("End   reading {} bytes", content.len());

    let n = 40;
    let res = fib(n);
    println!("fib({n}) = {res}");
}

async fn run() {
    //do_something().await;
    //read_file().await;
    tokio::join!(
        do_something(),
        read_file(),
    );
}

fn main() {
    println!("Start");
    let rt = tokio::runtime::Runtime::new().unwrap();
    let future = run();

    println!("Before block");
    rt.block_on(future);

    println!("End");
}

Fake async and CPU heavy task


#[tokio::main]
async fn main() {
    let start = std::time::Instant::now();

    println!("Start");
    let mut tasks = tokio::task::JoinSet::new();
    for n in 1..=40 {
        tasks.spawn(fake_async_work(format!("task-{n}")));
    }
    for n in 1..=40 {
        tasks.spawn(fake_cpu_heavy_work(format!("task-{n}")));
    }

    while let Some(res) = tasks.join_next().await {
        match res {
            Ok(_) => {} //println!("Task finished successfully"),
            Err(e) => println!("Task failed: {e}"),
        }
    }

    println!("End");
    let elapsed = start.elapsed();
    println!("Elapsed time: {:.2?}", elapsed);

}



async fn fake_async_work(name: String) {
    println!("Simulate some asynchronous work {name} {}", std::thread::current().name().unwrap_or("unknown"));
    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
    println!("Done with async work {name}");
}

async fn fake_cpu_heavy_work(name: String) {
    println!("Simulate some CPU-heavy work {name}");
    std::thread::sleep(std::time::Duration::from_secs(1));
    println!("Done with CPU-heavy work {name}");
}

Cargo.toml

[package]
name = "demo"
version = "0.1.0"
edition = "2024"

[dependencies]
tokio = { version = "1.47.1", features = ["full"] }

async read file using join!

use tokio::io::AsyncReadExt;

#[tokio::main]
async fn main() {
    println!("Start");
    let future = run();

    println!("Before await");
    future.await;

    println!("End");
}

async fn run() {
    // sequentially
    do_something().await;
    read_file().await;

    // concurrently
    // tokio::join!(
    //     do_something(),
    //     read_file(),
    // );
}

async fn do_something() {
    println!("Start to do something");
    tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
    println!("End   to do something");
}

async fn read_file() {
    println!("Start reading");
    let mut fh = tokio::fs::File::open("src/main.rs").await.unwrap();
    let mut content = vec![];
    fh.read_to_end(&mut content).await.unwrap();
    println!("End   reading {} bytes", content.len());
}

Sequentially

Start
Before await
Start to do something
End   to do something
Start reading
End   reading 780 bytes
End

Concurrently with join!

Start
Before await
Start to do something
Start reading
End   reading 780 bytes
End   to do something
End

Cargo.toml

[package]
name = "demo"
version = "0.1.0"
edition = "2024"

[dependencies]
tokio = { version = "1.39.2", features = ["full"] }

Async tail -f

use std::{env, path::PathBuf, time::Duration};
use tokio::fs::File;
use tokio::io::{AsyncBufReadExt, AsyncSeekExt, BufReader, SeekFrom};
use tokio::time::sleep;

#[tokio::main]
async fn main() {
    let filename = get_filename();
    tail_file(filename).await;
}

async fn tail_file(filename: PathBuf) {
    let mut file = File::open(&filename).await.expect("Failed to open file");
    file.seek(SeekFrom::End(0)).await.expect("Failed to seek");

    loop {
        // Pause between polls so the loop does not spin at 100% CPU.
        sleep(Duration::from_millis(500)).await;
        let mut reader = BufReader::new(&mut file);
        let mut buf = String::new();
        while reader
            .read_line(&mut buf)
            .await
            .expect("Failed to read line")
            > 0
        {
            print!("{}", buf);
            buf.clear();
        }
    }
}

fn get_filename() -> PathBuf {
    let args: Vec<String> = env::args().collect();
    if args.len() != 2 {
        eprintln!("Usage: {} <filename>", args[0]);
        std::process::exit(1);
    }
    PathBuf::from(&args[1])
}

Cargo.toml

[package]
name = "async-tail"
version = "0.1.0"
edition = "2024"

[dependencies]
tokio = { version = "1", features = ["full"] }

Async multi-tail -f

use std::env;
use std::path::PathBuf;
use tokio::fs::File;
use tokio::io::{AsyncBufReadExt, AsyncSeekExt, BufReader, SeekFrom};
use tokio::time::{Duration, sleep};

async fn tail_file(path: PathBuf) {
    println!("Tailing file: {}", path.display());
    let mut file = match File::open(&path).await {
        Ok(f) => f,
        Err(e) => {
            eprintln!("Failed to open {}: {}", path.display(), e);
            return;
        }
    };
    file.seek(SeekFrom::End(0)).await.expect("Failed to seek");
    loop {
        sleep(Duration::from_millis(50)).await;
        let mut reader = BufReader::new(&mut file);
        let mut buf = String::new();
        while reader
            .read_line(&mut buf)
            .await
            .expect("Failed to read line")
            > 0
        {
            print!("{} {}", path.file_name().unwrap().to_string_lossy(), buf);
            buf.clear();
        }
    }
}

fn get_filenames() -> Vec<PathBuf> {
    let args: Vec<String> = env::args().skip(1).collect();
    if args.is_empty() {
        eprintln!("Usage: async-multi-tail <file1> <file2> ...");
        return Vec::new();
    }
    let paths: Vec<PathBuf> = args.into_iter().map(PathBuf::from).collect();

    paths
}

#[tokio::main]
async fn main() {
    let file_paths = get_filenames();
    let mut handles = Vec::new();
    for file_path in file_paths {
        handles.push(tokio::spawn(tail_file(file_path)));
    }
    for handle in handles {
        let _ = handle.await;
    }
}

Cargo.toml

[package]
name = "async-multi-tail"
version = "0.1.0"
edition = "2024"

[dependencies]
tokio = { version = "1", features = ["fs", "io-util", "macros", "rt-multi-thread", "time"] }

Async tail with notify

use std::{env, fs::File, io::{BufRead, BufReader, Seek, SeekFrom}, path::PathBuf};
use tokio::sync::mpsc;
use notify::{Watcher, RecursiveMode, EventKind, RecommendedWatcher};

#[tokio::main]
async fn main() {
    let args: Vec<String> = env::args().collect();
    if args.len() != 2 {
        eprintln!("Usage: {} <filename>", args[0]);
        std::process::exit(1);
    }
    let filename = PathBuf::from(&args[1]);

    let (tx, mut rx) = mpsc::channel(1);
    let tx2 = tx.clone();

    // Set up file watcher
    let mut watcher = RecommendedWatcher::new(
        move |res: Result<notify::Event, notify::Error>| {
            if let Ok(event) = res {
                if matches!(event.kind, EventKind::Modify(_)) {
                    let _ = tx2.try_send(());
                }
            }
        },
        notify::Config::default()
    ).expect("Failed to create watcher");
    watcher.watch(&filename, RecursiveMode::NonRecursive).expect("Failed to watch file");

    // Open file and seek to end
    let mut file = File::open(&filename).expect("Failed to open file");
    let mut pos = file.seek(SeekFrom::End(0)).expect("Failed to seek");

    loop {
        // Wait for a file change event
        rx.recv().await;
        // Re-open file and seek to last position
        let mut file = File::open(&filename).expect("Failed to open file");
        file.seek(SeekFrom::Start(pos)).expect("Failed to seek");
        let mut reader = BufReader::new(file);
        let mut buf = String::new();
        while reader.read_line(&mut buf).expect("Failed to read line") > 0 {
            print!("{}", buf);
            buf.clear();
        }
        pos = reader.stream_position().expect("Failed to get position");
    }
}

Cargo.toml

[package]
name = "async-tail"
version = "0.1.0"
edition = "2024"

[dependencies]
tokio = { version = "1", features = ["full"] }
notify = "6"

Join several tasks

use tokio::time::{Duration, sleep};

#[tokio::main]
async fn main() {
    let a = wait_random_time(String::from("a"));
    let b = wait_random_time(String::from("b"));
    tokio::join!(a, b);

    println!("---");

    let n = 3;
    let mut tasks = tokio::task::JoinSet::new();
    for i in 0..n {
        let name = format!("x-{i}");
        let task = wait_random_time(name);
        let _ = tasks.spawn(task);
    }
    while let Some(res) = tasks.join_next().await {
        match res {
            Ok(_) => {} //println!("Task finished successfully"),
            Err(e) => println!("Task failed: {e}"),
        }
    }
}

async fn wait_random_time(name: String) {
    let random_int = rand::random::<u64>() % 100;
    println!("{name} waiting for {random_int} milliseconds");
    sleep(Duration::from_millis(random_int)).await;
}

Cargo.toml

[package]
name = "demo"
version = "0.1.0"
edition = "2024"

[dependencies]
rand = "0.9.2"
tokio = { version = "1.47.1", features = ["full"] }

Read STDIN Async

use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::select;

#[tokio::main]
async fn main() {
    loop {
        select! {
            input = read_user_input() => {
                println!("User input received: {}", input);
            },
            _ = tokio::signal::ctrl_c() => {
                println!("Received Ctrl+C! Exiting...");
                break;
            },
        }
        println!("After select");
    }
    println!("After loop");
    std::process::exit(0);
}

async fn read_user_input() -> String {
    let mut reader = BufReader::new(tokio::io::stdin());
    println!("Type something and press Enter (async):");
    let mut input = String::new();
    reader.read_line(&mut input).await.unwrap();
    input
}

Cargo.toml

[package]
name = "demo"
version = "0.1.0"
edition = "2024"

[dependencies]
tokio = { version = "1.47.1", features = ["full"] }

sync vs async manually

fn sync_func() -> u32 {
    println!("sync_func called");
    42
}

async fn async_func() -> u32 {
    println!("async_func called");
    42
}

fn main() {
    println!("Start");
    let rt = tokio::runtime::Runtime::new().unwrap();

    let value = sync_func();
    println!("Value: {}", value);

    let future = async_func();

    println!("Before block");
    let res = rt.block_on(future);
    println!("Result: {}", res);

    println!("End");
}

The output:

Start
sync_func called
Value: 42
Before block
async_func called
Result: 42
End

Cargo.toml

[package]
name = "demo"
version = "0.1.0"
edition = "2024"

[dependencies]
tokio = { version = "1.47.1", features = ["full"] }

Simple async example with Tokio

  • We have a function called do_something that fakes real work by sleeping a bit. It is marked as an async function.
  • In the main function we set up the Runtime.
  • Calling do_something() does not actually execute it. It returns a Future.

async fn do_something() {
    println!("Start to do something");
    tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
    println!("End   to do something");
}

fn main() {
    println!("Start");
    let rt = tokio::runtime::Runtime::new().unwrap();
    let future = do_something();

    println!("Before block");
    rt.block_on(future);

    println!("End");
}

Cargo.toml

[package]
name = "demo"
version = "0.1.0"
edition = "2024"

[dependencies]
tokio = { version = "1.39.2", features = ["full"] }

The output:

Start
Before block
Start to do something
End   to do something
End

JoinSet random order of completion

We can collect several tasks in a JoinSet and then call join_all to wait for all of them to finish. The tasks may finish in any order.

#[tokio::main]
async fn main() {
    simple_logger::SimpleLogger::new().init().unwrap();
    log::info!("Start");
    let mut tasks = tokio::task::JoinSet::new();

    tasks.spawn(async move {
        log::info!("Task long starts");
        tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
        log::info!("Task long is done");
    });
    std::thread::sleep(std::time::Duration::from_secs(1));
    log::info!("Long task started");

    for i in 0..5 {
        let time = rand::random::<u64>() % 100;
        log::info!("Task {i} setup for {time}");
        tasks.spawn(async move {
            log::info!("Task {i} starts");
            tokio::time::sleep(tokio::time::Duration::from_millis(time)).await;
            log::info!("Task {i} is done");
        });
    }

    log::info!("All tasks started");
    std::thread::sleep(std::time::Duration::from_secs(1));
    log::info!("Wait done");

    tasks.join_all().await;
    log::info!("End");
}

The output:

2025-09-17T12:07:02.366Z INFO  [demo] Start
2025-09-17T12:07:02.366Z INFO  [demo] Task long starts
2025-09-17T12:07:03.366Z INFO  [demo] Long task started
2025-09-17T12:07:03.366Z INFO  [demo] Task 0 setup for 95
2025-09-17T12:07:03.366Z INFO  [demo] Task 1 setup for 87
2025-09-17T12:07:03.366Z INFO  [demo] Task 2 setup for 97
2025-09-17T12:07:03.366Z INFO  [demo] Task 3 setup for 77
2025-09-17T12:07:03.366Z INFO  [demo] Task 4 setup for 87
2025-09-17T12:07:03.366Z INFO  [demo] All tasks started
2025-09-17T12:07:03.366Z INFO  [demo] Task 0 starts
2025-09-17T12:07:03.366Z INFO  [demo] Task 1 starts
2025-09-17T12:07:03.366Z INFO  [demo] Task 3 starts
2025-09-17T12:07:03.366Z INFO  [demo] Task 4 starts
2025-09-17T12:07:03.366Z INFO  [demo] Task 2 starts
2025-09-17T12:07:03.445Z INFO  [demo] Task 3 is done
2025-09-17T12:07:03.454Z INFO  [demo] Task 4 is done
2025-09-17T12:07:03.454Z INFO  [demo] Task 1 is done
2025-09-17T12:07:03.463Z INFO  [demo] Task 0 is done
2025-09-17T12:07:03.465Z INFO  [demo] Task 2 is done
2025-09-17T12:07:04.366Z INFO  [demo] Wait done
2025-09-17T12:07:04.367Z INFO  [demo] Task long is done
2025-09-17T12:07:04.367Z INFO  [demo] End

Cargo.toml

[package]
name = "demo"
version = "0.1.0"
edition = "2024"

[dependencies]
log = "0.4.28"
rand = "0.9.2"
simple_logger = "5.0.0"
tokio = { version = "1.47.1", features = ["full"] }

Basic TCP echo server

Based on the example in the README of tokio. Instead of echoing the input unchanged, this version sends back its ROT13 transformation.

Start with:

cargo run

From another terminal:

telnet localhost 8080

use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;

/// Applies ROT13 transformation to a u8 slice.
/// Only ASCII letters are rotated; other bytes are unchanged.
pub fn rot13(input: &[u8]) -> Vec<u8> {
    input
        .iter()
        .map(|&b| match b {
            b'a'..=b'z' => (b - b'a' + 13) % 26 + b'a',
            b'A'..=b'Z' => (b - b'A' + 13) % 26 + b'A',
            _ => b,
        })
        .collect()
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let listener = TcpListener::bind("127.0.0.1:8080").await?;

    loop {
        println!("Waiting for a connection...");
        let (mut socket, _) = listener.accept().await?;

        tokio::spawn(async move {
            let mut buf = [0; 1024];

            loop {
                println!("Getting data...");
                let n = match socket.read(&mut buf).await {
                    // socket closed
                    Ok(0) => return,
                    Ok(n) => n,
                    Err(e) => {
                        eprintln!("failed to read from socket; err = {:?}", e);
                        return;
                    }
                };

                let received_text = String::from_utf8_lossy(&buf[0..n]);
                println!("Received text: {}", received_text);

                // Write the ROT13-transformed data back
                let rotated = rot13(&buf[0..n]);
                if let Err(e) = socket.write_all(&rotated).await {
                    eprintln!("failed to write to socket; err = {:?}", e);
                    return;
                }
            }
        });
    }
}
[package]
name = "demo"
version = "0.1.0"
edition = "2024"

[dependencies]
tokio = { version = "1.47.1", features = ["full"] }
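
ROT13 shifts each letter by 13 of the 26 positions, so applying it twice restores the original text. Sending the server's reply back to it should therefore return the original message. A minimal std-only sketch that checks this property, using the same `rot13` logic as the server above (the sample message is illustrative):

```rust
/// Applies ROT13 to ASCII letters; all other bytes pass through unchanged.
/// Same logic as the `rot13` function in the server above.
pub fn rot13(input: &[u8]) -> Vec<u8> {
    input
        .iter()
        .map(|&b| match b {
            b'a'..=b'z' => (b - b'a' + 13) % 26 + b'a',
            b'A'..=b'Z' => (b - b'A' + 13) % 26 + b'A',
            _ => b,
        })
        .collect()
}

fn main() {
    // telnet usually terminates each line with \r\n
    let message = b"Hello, World!\r\n";
    let once = rot13(message);
    let twice = rot13(&once);

    println!("{}", String::from_utf8_lossy(&once).trim_end());
    // Applying the transformation a second time restores the input.
    assert_eq!(twice, message.to_vec());
}
```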

Download many pages blocking http-reqwest

[package]
name = "download-many-pages"
version = "0.1.0"
edition = "2024"

[dependencies]
reqwest = { version = "0.12.23", features = ["blocking"] }

use std::fs::File;
use std::io::{BufRead, BufReader};

use reqwest::Error;

fn download_page(url: &str) -> Result<String, Error> {
    let response = reqwest::blocking::get(url)?;
    let content = response.text()?;
    Ok(content)
}

fn read_urls_from_file(path: &str) -> Vec<String> {
    let file = File::open(path).expect("Failed to open file");
    let reader = BufReader::new(file);
    reader
        .lines()
        .filter_map(|line| {
            let line = line.ok()?.trim().to_string();
            if line.is_empty() || line.starts_with('#') {
                None
            } else {
                Some(line)
            }
        })
        .collect()
}


fn main() {
    let start = std::time::Instant::now();
    let args = std::env::args().collect::<Vec<String>>();
    if args.len() < 2 {
        eprintln!("Usage: {} <urls_file>", args[0]);
        std::process::exit(1);
    }
    let urls_file = &args[1];
    let urls = read_urls_from_file(urls_file);
    println!("URLs: {:?}", urls);

    for url in &urls {
        match download_page(url) {
            Ok(content) => {
                println!("Downloaded {} bytes from {}", content.len(), url);
            }
            Err(e) => println!("Error downloading {}: {}", url, e),
        }
    }
    let elapsed = start.elapsed();
    println!("Elapsed time: {:.2?}", elapsed);
}
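
The URL file is read line by line. Blank lines and lines starting with `#` are skipped, so the file can contain comments. A small sketch of the same filtering applied to an in-memory string (the `parse_urls` helper and the sample URLs are made up for illustration and are not part of the program above):

```rust
/// Hypothetical helper: the same filtering as `read_urls_from_file`,
/// but working on a string instead of a file.
fn parse_urls(text: &str) -> Vec<String> {
    text.lines()
        .map(str::trim)
        .filter(|line| !line.is_empty() && !line.starts_with('#'))
        .map(String::from)
        .collect()
}

fn main() {
    let sample = "\
# sites to download
https://example.com/

   https://example.org/about
# https://example.net/ is disabled
";
    let urls = parse_urls(sample);
    // Only the two lines that are neither blank nor comments remain.
    println!("{urls:?}");
    assert_eq!(urls.len(), 2);
}
```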

Download many pages bad async

I think this code was generated by some AI. It uses the async constructs, but every download is awaited inside the loop before the next one is started, so in reality it sends the requests sequentially.


use std::fs::File;
use std::io::{BufRead, BufReader};

use reqwest::Error;

async fn download_page(url: &str) -> Result<String, Error> {
    let response = reqwest::get(url).await?;
    let content = response.text().await?;
    Ok(content)
}

fn read_urls_from_file(path: &str) -> Vec<String> {
    let file = File::open(path).expect("Failed to open file");
    let reader = BufReader::new(file);
    reader
        .lines()
        .filter_map(|line| {
            let line = line.ok()?.trim().to_string();
            if line.is_empty() || line.starts_with('#') {
                None
            } else {
                Some(line)
            }
        })
        .collect()
}


#[tokio::main]
async fn main() {
    let start = std::time::Instant::now();
    let args = std::env::args().collect::<Vec<String>>();
    if args.len() < 2 {
        eprintln!("Usage: {} <urls_file>", args[0]);
        std::process::exit(1);
    }
    let urls_file = &args[1];
    let urls = read_urls_from_file(urls_file);
    println!("URLs: {:?}", urls);

    for url in &urls {
        match download_page(url).await {
            Ok(content) => {
                println!("Downloaded {} bytes from {}", content.len(), url);
            }
            Err(e) => println!("Error downloading {}: {}", url, e),
        }
    }
    let elapsed = start.elapsed();
    println!("Elapsed time: {:.2?}", elapsed);
}
[package]
name = "download-many-pages"
version = "0.1.0"
edition = "2024"

[dependencies]
reqwest = "0.12.23"
tokio = { version = "1.47.1", features = ["full"] }
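
Awaiting inside the loop makes the total time roughly the sum of the individual download times. Starting all the work first and waiting afterwards takes roughly as long as the slowest download. The following sketch illustrates only that timing difference. As an assumption made to keep it dependency-free, it uses OS threads and `std::thread::sleep` as a stand-in for network latency rather than async tasks, and the `fake_download` helper is hypothetical:

```rust
use std::thread;
use std::time::{Duration, Instant};

/// Hypothetical stand-in for a download: just waits for `millis` ms.
fn fake_download(millis: u64) -> u64 {
    thread::sleep(Duration::from_millis(millis));
    millis
}

/// Start each job only after the previous one has finished.
fn run_sequentially(jobs: &[u64]) -> Duration {
    let start = Instant::now();
    for &millis in jobs {
        fake_download(millis);
    }
    start.elapsed()
}

/// Start every job first, then wait for all of them.
fn run_concurrently(jobs: &[u64]) -> Duration {
    let start = Instant::now();
    let handles: Vec<_> = jobs
        .iter()
        .map(|&millis| thread::spawn(move || fake_download(millis)))
        .collect();
    for handle in handles {
        handle.join().expect("worker thread panicked");
    }
    start.elapsed()
}

fn main() {
    let jobs = [50, 50, 50, 50];
    println!("sequential: {:.2?}", run_sequentially(&jobs));
    println!("concurrent: {:.2?}", run_concurrently(&jobs));
}
```

With async tasks the same overlap is possible without one thread per download, because a task that is waiting for the network gives control back to the runtime.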

Download many pages async

use std::fs::File;
use std::io::{BufRead, BufReader};

use reqwest::Error;

fn get_urls_file() -> String {
    let args = std::env::args().collect::<Vec<String>>();
    if args.len() < 2 {
        eprintln!("Usage: {} <urls_file>", args[0]);
        std::process::exit(1);
    }
    let urls_file = &args[1];
    urls_file.to_string()
}

fn read_urls_from_file(path: &str) -> Vec<String> {
    let file = File::open(path).expect("Failed to open file");
    let reader = BufReader::new(file);
    reader
        .lines()
        .filter_map(|line| {
            let line = line.ok()?.trim().to_string();
            if line.is_empty() || line.starts_with('#') {
                None
            } else {
                Some(line)
            }
        })
        .collect()
}

async fn download_page(url: &str) -> Result<String, Error> {
    let response = reqwest::get(url).await?;
    let content = response.text().await?;
    Ok(content)
}

async fn download_many_urls(urls: &[String]) {
    let mut tasks = tokio::task::JoinSet::new();
    // Spawn one task per URL so the downloads can run concurrently.
    for url in urls {
        println!("Downloading: {url}");
        tasks.spawn({
            let url = url.clone();
            async move {
                match download_page(&url).await {
                    Ok(content) => {
                        println!("Downloaded {} bytes from {url}", content.len());
                    }
                    Err(err) => println!("Error downloading {url}: {err}"),
                }
            }
        });
    }

    println!("Started {} tasks. Waiting...", tasks.len());
    // Wait until every spawned task has finished.
    tasks.join_all().await;
}

#[tokio::main]
async fn main() {
    let start = std::time::Instant::now();

    let urls_file = get_urls_file();
    let urls = read_urls_from_file(&urls_file);
    println!("URLs: {urls:?}");

    download_many_urls(&urls).await;

    let elapsed = start.elapsed();
    println!("Elapsed time: {elapsed:.2?}");
}
  • Takes 0.3-0.5 sec to run this.
[package]
name = "download-many-pages"
version = "0.1.0"
edition = "2024"

[dependencies]
reqwest = "0.12.23"
tokio = { version = "1.47.1", features = ["full"] }

Crawler

use reqwest::Error;
use scraper::{Html, Selector};

fn get_url() -> String {
    let args = std::env::args().collect::<Vec<String>>();
    if args.len() < 2 {
        eprintln!("Usage: {} <url>", args[0]);
        std::process::exit(1);
    }
    let url = &args[1];
    url.to_string()
}

async fn download_page(url: String) -> Result<(String, String), Error> {
    let response = reqwest::get(&url).await?;
    let content = response.text().await?;
    Ok((url, content))
}

async fn crawl(url: &str, limit: u32) {
    let mut tasks = tokio::task::JoinSet::new();
    println!("Downloading: {url}");
    let mut count = 1;
    tasks.spawn({
        let url = url.to_string();
        async move { download_page(url).await }
    });

    println!("Started tasks. Waiting...");
    while !tasks.is_empty() {
        let result = tasks.join_next().await;
        match result {
            Some(Ok(content)) => {
                match content {
                    Ok((url, html)) => {
                        println!("Downloaded {} bytes from {url}", html.len());
                        parse_links(&html).iter().for_each(|link| {
                            if link.starts_with("http://") {
                                //println!("Found unsecure: {link}");
                            }

                            if link.starts_with("https://") {
                                //println!("Found secure link: {link}");
                                if count < limit {
                                    count += 1;
                                    tasks.spawn({
                                        let link = link.to_string();
                                        async move { download_page(link).await }
                                    });
                                }
                            }
                            // internal links
                        });
                    }
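                    // The failed URL is not available in this arm; `url` here
                    // would refer to the start URL passed to `crawl`.
                    Err(err) => println!("Error downloading page: {err}"),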
                }
            }
            Some(Err(err)) => {
                println!("Task failed: {err}");
            }
            None => break,
        }
    }
}

#[tokio::main]
async fn main() {
    let start = std::time::Instant::now();

    let url = get_url();
    println!("Starting with: {url}");
    crawl(&url, 5).await;

    let elapsed = start.elapsed();
    println!("Elapsed time: {elapsed:.2?}");
}

fn parse_links(html: &str) -> Vec<String> {
    let document = Html::parse_document(html);
    let selector = Selector::parse("a[href]").unwrap();

    document
        .select(&selector)
        .filter_map(|element| element.value().attr("href"))
        .map(|href| href.to_string())
        .collect()
}
[package]
name = "download-many-pages"
version = "0.1.0"
edition = "2024"

[dependencies]
reqwest = "0.12.23"
scraper = "0.24.0"
tokio = { version = "1.47.1", features = ["full"] }
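
The crawler above follows every `https://` link until the limit is reached, so the same page can be downloaded more than once when several pages link to it. One possible way to avoid that is to remember which URLs were already scheduled. A minimal sketch of that bookkeeping, independent of tokio and reqwest (the `Frontier` type and its method names are hypothetical and not part of the code above):

```rust
use std::collections::HashSet;

/// Hypothetical bookkeeping for a crawler: which URLs were already
/// scheduled, and how many downloads are allowed in total.
struct Frontier {
    seen: HashSet<String>,
    limit: usize,
}

impl Frontier {
    fn new(limit: usize) -> Self {
        Self { seen: HashSet::new(), limit }
    }

    /// Returns true if `link` should be downloaded: it is an https link,
    /// it was not scheduled before, and the limit is not yet reached.
    fn should_fetch(&mut self, link: &str) -> bool {
        if !link.starts_with("https://") || self.seen.len() >= self.limit {
            return false;
        }
        // `insert` returns false when the value was already in the set.
        self.seen.insert(link.to_string())
    }
}

fn main() {
    let mut frontier = Frontier::new(2);
    let links = [
        "https://example.com/",
        "https://example.com/",     // duplicate
        "http://example.com/plain", // not https
        "/about",                   // internal link
        "https://example.org/",
        "https://example.net/",     // over the limit
    ];
    for link in links {
        println!("{link}: {}", frontier.should_fetch(link));
    }
}
```

In `crawl`, the `count < limit` check could then be replaced by a call such as `frontier.should_fetch(link)`, with the start URL registered first.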

Demo

Demo 1

[package]
name = "demo"
version = "0.1.0"
edition = "2024"

[dependencies]
rand = "0.9.2"
tokio = { version = "1.47.1", features = ["full"] }
#[tokio::main]
async fn main() {
    let result = multi_step(0).await;   
    println!("Final result: {}", result);
}



async fn multi_step(input: u64) -> u64 {
    let step1 = get_data(input).await;
    let step2 = get_data(step1).await;
    let step3 = get_data(step2).await;
    step3
}

async fn get_data(input: u64) -> u64 {
    // pretend we get some data from the network
    let random_int = rand::random::<u64>() % 11;
    tokio::time::sleep(tokio::time::Duration::from_millis(random_int)).await;
    input + random_int
}

Demo 2

[package]
name = "demo"
version = "0.1.0"
edition = "2024"

[dependencies]
tokio = { version = "1.47.1", features = ["full"] }
use tokio::fs;
use tokio::time::{sleep, Duration};
async fn has_server_log_changed(last_modified: Option<std::time::SystemTime>) -> Option<std::time::SystemTime> {
    match fs::metadata("server.log").await {
        Ok(metadata) => {
            let modified = metadata.modified().ok()?;
            if let Some(last) = last_modified {
                if modified > last {
                    println!("server.log has changed!");
                }
            }
            Some(modified)
        },
        Err(_) => {
            println!("server.log not found");
            None
        }
    }
}

// Example usage: poll for changes every second
// async fn poll_server_log_changes() {
//     let mut last_modified = None;
//     loop {
//         last_modified = has_server_log_changed(last_modified).await;
//         sleep(Duration::from_secs(1)).await;
//     }
// }
use tokio::select;
use tokio::io::{AsyncBufReadExt, BufReader};

#[tokio::main]
async fn main() {
    let mut last_modified = None;
    let mut quit = false;
    loop {
        select! {
            input = read_user_input() => {
                println!("User input received: {}", input);
            },
            _ = tokio::signal::ctrl_c() => {
                println!("Received Ctrl+C! Exiting...");
                quit = true;
                break;
            },
            new_modified = has_server_log_changed(last_modified) => {
                if let Some(modified) = new_modified {
                    if let Some(last) = last_modified {
                        if modified > last {
                            println!("changed");
                        }
                    }
                    last_modified = Some(modified);
                }
            }
        }
        println!("After select");
        // Add a short sleep to avoid busy looping
        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
    }
    println!("After loop");
    std::process::exit(0);

}

async fn read_user_input() -> String{
    let mut reader = BufReader::new(tokio::io::stdin());
    println!("Type something and press Enter (async):");
    let mut input = String::new();
    reader.read_line(&mut input).await.unwrap();
    input
}   




// #[tokio::main]
// async fn main() {
//     tokio::spawn(run_callback_in_3_seconds());
//         //     _ = tokio::signal::ctrl_c() => {                    
//     //             if quit {
//     //                 println!("Exiting...");
//     //                 break;
//     //             }
//     //             quit = true;
//     //             println!("Received Ctrl+C! Press Ctrl-C again if you'd like to quit?");
//     //     },

//     loop {
//         wait_a_bit().await;
//     }

//     // let mut quit = false;
//     // select! {
//     //     _ = do_some_work() => println!("done"),
//     //     _ = tokio::signal::ctrl_c() => {                    
//     //             if quit {
//     //                 println!("Exiting...");
//     //                 break;
//     //             }
//     //             quit = true;
//     //             println!("Received Ctrl+C! Press Ctrl-C again if you'd like to quit?");
//     //     },
//     //     _ = run_callback_in_3_seconds() => {
//     //         quit = false;
//     //     }
//     // }
// }
async fn run_callback_in_3_seconds() {
    println!("Setup a callback in 3 seconds...");
    tokio::time::sleep(std::time::Duration::from_secs(3)).await;
    // Place your callback logic here
    std::process::exit(0);
}
async fn do_some_work() {
    loop {
        println!("Doing some work...");
        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
    }
}



async fn wait_a_bit() {
        println!("Waiting a bit...");
        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
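
The change detection in `has_server_log_changed` boils down to comparing two modification times. The first observation has nothing to compare against, and later ones report a change only when the timestamp has moved forward. A sketch of just that comparison, using fixed timestamps instead of a real file (the `is_newer` helper is hypothetical):

```rust
use std::time::{Duration, SystemTime};

/// Hypothetical helper: true only when a previous timestamp exists
/// and the current one is strictly later.
fn is_newer(last: Option<SystemTime>, current: SystemTime) -> bool {
    matches!(last, Some(previous) if current > previous)
}

fn main() {
    let t1 = SystemTime::UNIX_EPOCH + Duration::from_secs(100);
    let t2 = t1 + Duration::from_secs(5);

    println!("first look: {}", is_newer(None, t1));
    println!("unchanged:  {}", is_newer(Some(t1), t1));
    println!("modified:   {}", is_newer(Some(t1), t2));
}
```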

Resources

Other runtimes:

  • Embassy (embassy.dev) for embedded systems, as an alternative to an RTOS
  • glommio
  • https://crates.io/crates/smol
  • https://crates.io/crates/futures