Rust in Parallel
Every program spends some of its time executing code on the CPU and some of its time waiting for data to arrive. In computation-heavy applications the program might use the GPU instead of the CPU, which makes the situation even more complex. For simplicity, let's focus on the CPU doing work for our program versus our program waiting for data to arrive from the network. During the latter our program does not need the CPU.
Let's see a few examples:
I/O bound application
We are building a crawler. It downloads a web page, finds the links in it, then downloads the pages that were linked, and goes on like this for 1000 pages.
At first the program uses the CPU to build up the HTTP request and send it to the server. Once the request has been sent our program waits for the answer to arrive. This might take several seconds, and during that time our program just waits. Once the content of the page arrives our program starts using the CPU again to extract the links and build up the new requests. It sends out the first request and starts to wait again. When the response arrives it stores it, sends out the second request from the first page, and starts to wait again.
There is a lot of waiting with only a little CPU work in between.
This is called an I/O-bound, I/O-intensive, or network-intensive application.
CPU bound application
We have a thousand CSV files with millions of data points in each one. We need to do some computation on each file.
Our program will read the first file into memory. During the reading the CPU does not have much to do, but the reading only takes a few milliseconds (not seconds, as in the previous example). Then our program does some heavy computation on the file, during which the CPU is fully engaged. When it is done it takes the second file. Again it spends a little time waiting and then a relatively long time doing the computations.
Most of the time the CPU is working for our program.
This is called a CPU-bound or CPU-intensive application.
Finish sooner
In general we want our applications to finish sooner. We prefer to wait 5 minutes for the results instead of 2 hours.
If our application is CPU-bound then we either need a faster CPU or we need to use more CPUs. Today most computers have multiple cores, which effectively means we have more than one CPU in every computer. So if we can divide our task into smaller tasks, and arrange for some of these tasks to run on one core (CPU) while other tasks run on another core, then we can reduce the overall time needed to do all the work. We add some extra work, the overhead of managing all this, which means we use more of the computer's resources to complete the tasks, but we finish them sooner. In this situation we have two techniques at our disposal: creating multiple processes or multiple threads, and letting the operating system run them in parallel. Of these two, threads incur a lot less overhead than processes.
If our application is I/O-bound then there are two possibilities. In one case the network is saturated; that is, we use the whole capacity of the cable. This might happen if our bandwidth is small, some other people in our office also run network-intensive applications, or we are downloading movies. In this case we might need to buy more bandwidth, or convince our co-workers to take the day off. However, this is rarely the case. In most situations the network could transfer a lot more data, but it takes time for the data to travel the distance, and it takes a lot of time for the other server to process our request and send the data.
In such a case it might be beneficial to send out several requests at the same time. Then the waiting times of the requests overlap, and we might also be able to do the computation part of one request while another is still waiting for its response. In this case we don't even need multiple CPUs to reduce the time we need to wait, as the CPU wasn't doing much in the first place, but having multiple cores (CPUs) might still help. The techniques at our disposal are: async programming, threads, and processes.
Threads in Rust
I'd recommend reading the Fearless Concurrency chapter in The Rust Book and checking out std::thread.
Available cores
The first thing we should do before trying to use multiple cores is to find out how many cores (CPUs) we have.
fn main() {
    println!("{}", std::thread::available_parallelism().unwrap());
}
Simple thread (with fake work)
- We can easily start a new thread using the spawn function of the std::thread module.
- We even have two loops, one in the main thread and one in the spawned thread, to "do some work". It seems to work.
- There is a slight problem though. Our main program might end before the spawned thread can do its actual work, and in this example we don't even see that.
use std::thread;

fn main() {
    println!("Before starting: {:?}", thread::current().id());

    thread::spawn(|| {
        println!("In thread {:?}", thread::current().id());
        for i in 1..10000000 {
            let _n = i + i;
        }
    });

    for i in 1..10000000 {
        let _n = i + i;
    }

    println!("After ending: {:?}", thread::current().id());
}
Before starting: ThreadId(1)
In thread ThreadId(2)
After ending: ThreadId(1)
Simple thread
- In this case, when the main thread does no "extra work", it is obvious that the spawned thread did not even get a chance to start working.
use std::thread;

fn main() {
    println!("Before starting: {:?}", thread::current().id());

    thread::spawn(|| {
        println!("In thread {:?}", thread::current().id());
    });

    println!("After ending: {:?}", thread::current().id());
}
Before starting: ThreadId(1)
After ending: ThreadId(1)
- thread
- spawn
Simple thread with join
- The solution is to save the handle of the thread and then use join to wait for its termination.
use std::thread;

fn main() {
    println!("Before starting: {:?}", thread::current().id());

    let handle = thread::spawn(|| {
        println!("In thread {:?}", thread::current().id());
    });

    println!("Before join: {:?}", thread::current().id());
    handle.join().unwrap();

    println!("After ending: {:?}", thread::current().id());
}
Before starting: ThreadId(1)
Before join: ThreadId(1)
In thread ThreadId(2)
After ending: ThreadId(1)
- thread
- spawn
- join
Show that threads work in parallel
- spawn will create a new thread.
- We can use thread::current().id() to get the id of the current thread.
- join in the main thread will block till the other thread stops.
- We can see that "Loop in main thread ended" is printed before "Spawned thread ended", but then the main thread waits.
use std::thread;
use std::time::Duration;

fn main() {
    println!("Before starting: {:?}", thread::current().id());

    let handle = thread::spawn(|| {
        for i in 1..=10 {
            println!(
                "Hi number {} from the spawned thread! {:?}",
                i,
                thread::current().id()
            );
            thread::sleep(Duration::from_millis(1));
        }
        println!("Spawned thread ended");
    });

    //thread::sleep(Duration::from_millis(1));
    //thread::sleep(Duration::from_micros(1));

    for i in 1..=5 {
        println!(
            "Hi number {} from the main thread! {:?}",
            i,
            thread::current().id()
        );
        thread::sleep(Duration::from_millis(1));
    }
    println!("Loop in main thread ended");

    handle.join().unwrap(); // waiting for the other thread to end.

    println!("After ending: {:?}", thread::current().id());
    println!("Exiting");
}
Before starting: ThreadId(1)
Hi number 1 from the main thread! ThreadId(1)
Hi number 1 from the spawned thread! ThreadId(2)
Hi number 2 from the main thread! ThreadId(1)
Hi number 2 from the spawned thread! ThreadId(2)
Hi number 3 from the main thread! ThreadId(1)
Hi number 3 from the spawned thread! ThreadId(2)
Hi number 4 from the main thread! ThreadId(1)
Hi number 4 from the spawned thread! ThreadId(2)
Loop in main thread ended
Hi number 5 from the spawned thread! ThreadId(2)
Hi number 6 from the spawned thread! ThreadId(2)
Hi number 7 from the spawned thread! ThreadId(2)
Hi number 8 from the spawned thread! ThreadId(2)
Hi number 9 from the spawned thread! ThreadId(2)
Spawned thread ended
After ending: ThreadId(1)
Exiting
- thread
- spawn
- sleep
- join
- current
Return value from thread
- TODO
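A minimal sketch in the meantime: the value of the closure's last expression is what join() returns wrapped in Ok, so the main thread can collect the result of a spawned thread.

```rust
use std::thread;

fn main() {
    // The last expression of the closure becomes the value
    // wrapped in the Ok returned by join().
    let handle = thread::spawn(|| {
        let sum: u64 = (1..=100).sum();
        sum
    });

    let result = handle.join().unwrap();
    println!("The thread returned {result}"); // 5050
    assert_eq!(result, 5050);
}
```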
Handle panic! in threads
use std::env;
use std::thread;

// TODO what if the subthread causes a segmentation fault

fn main() {
    println!("Before starting: {:?}", thread::current().id());

    let handle = thread::spawn(|| {
        println!("In thread {:?}", thread::current().id());
        if let Ok(val) = env::var("PANIC") {
            panic!("We have a panic {val}");
        }
        42
    });

    println!("Before join: {:?}", thread::current().id());
    //handle.join().unwrap();
    match handle.join() {
        Ok(val) => println!("The thread returned {val:?}"),
        Err(err) => println!("There was a panic in the thread {err:?}"),
    }

    println!("After ending: {:?}", thread::current().id());
}
cargo run
PANIC=23 cargo run
- spawn
- join
- match
Main thread polling the spawned threads
use std::time::Duration;

fn main() {
    println!("Before");

    let mut handles = vec![];
    for _ in 1..=3 {
        handles.push(std::thread::spawn(|| {
            for ix in 0..=4 {
                println!("{:?} {} ", std::thread::current().id(), ix);
                std::thread::sleep(Duration::from_millis(10));
            }
        }));
    }

    // Enable this to see that the other thread might or might not run before
    // the main thread gets the chance to print.
    //std::thread::sleep(Duration::from_millis(10));
    println!("Started");

    // for handle in handles {
    //     handle.join().unwrap();
    // }

    loop {
        println!("In Main");
        if handles.iter().all(|handle| handle.is_finished()) {
            break;
        }
        std::thread::sleep(Duration::from_millis(10));
    }

    println!("After");
}
Before
Started
In Main
ThreadId(2) 0
ThreadId(3) 0
ThreadId(4) 0
In Main
ThreadId(2) 1
ThreadId(3) 1
ThreadId(4) 1
In Main
ThreadId(2) 2
ThreadId(3) 2
ThreadId(4) 2
In Main
ThreadId(3) 3
ThreadId(2) 3
ThreadId(4) 3
In Main
ThreadId(3) 4
ThreadId(4) 4
ThreadId(2) 4
In Main
In Main
After
- is_finished
- iter
- all
Threads with messages
- We can facilitate communication between the main thread and the spawned thread.
- In this example the spawned thread is sending a message to the main thread.
- The move keyword tells Rust that the variables declared before spawning that are also used in the spawned code need to be moved (tx in this case).
- We can use recv, which blocks the main thread, to wait for a message from the spawned thread. In that case we have to know how many messages to expect, and if we are still waiting for a message when the spawned thread has already exited then we either get stuck or get a panic!.
- Iterating over the receiver, as in the second loop, is a better solution.
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("Hello");
        tx.send(val).unwrap();
        for i in 1..=10 {
            thread::sleep(Duration::from_millis(1));
            tx.send(i.to_string()).unwrap();
        }
        // thread::sleep(Duration::from_millis(1));
        println!("Spawned thread ends");
    });

    let received = rx.recv().unwrap();
    println!("Received: {}", received);

    // complex way for receiving
    for _j in 1..=5 {
        let received = rx.recv().unwrap();
        println!("Received: {}", received);
    }
    println!();

    // simple code for receiving
    for received in rx {
        println!("Received: {}", received);
    }
    println!();

    println!("Main thread ends");
}
Received: Hello
Received: 1
Received: 2
Received: 3
Received: 4
Received: 5
Received: 6
Received: 7
Received: 8
Received: 9
Spawned thread ends
Received: 10
Main thread ends
- channel
- send
- recv
- move
Two threads sending messages
- Sending messages from more than one spawned thread to the main thread.
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx1, rx) = mpsc::channel();
    let tx2 = tx1.clone();

    println!("{:?}: start", thread::current().id());

    thread::spawn(move || {
        for i in 1..=5 {
            thread::sleep(Duration::from_millis(1));
            tx1.send(format!("{:?}: {}", thread::current().id(), i))
                .unwrap();
        }
        println!("Spawned thread {:?} ends", thread::current().id());
    });

    thread::spawn(move || {
        for i in 1..=5 {
            thread::sleep(Duration::from_millis(1));
            tx2.send(format!("{:?}: {}", thread::current().id(), i))
                .unwrap();
        }
        println!("Spawned thread {:?} ends", thread::current().id());
    });

    for received in rx {
        println!("Received: {received}");
    }

    println!("Main thread ends");
}
ThreadId(1): start
Received: ThreadId(2): 1
Received: ThreadId(3): 1
Received: ThreadId(2): 2
Received: ThreadId(3): 2
Received: ThreadId(2): 3
Received: ThreadId(3): 3
Received: ThreadId(2): 4
Received: ThreadId(3): 4
Spawned thread ThreadId(2) ends
Received: ThreadId(2): 5
Spawned thread ThreadId(3) ends
Received: ThreadId(3): 5
Main thread ends
- clone
Testing speed improvements with threads
- A CPU intensive task - computing the Fibonacci numbers up to N 10 times.
- Once in a single threaded process and once in a multi-threaded process with 10 threads.
Results
N  | single thread | multiple threads
40 | 7.1 sec       | 1.3 sec
41 | 11.4 sec      | 2.1 sec
42 | 18.1 sec      | 3.3 sec
use std::env;
use std::process;
use std::sync::mpsc;
use std::thread;
use std::time::Instant;

fn main() {
    let args = env::args().collect::<Vec<String>>();
    if args.len() != 3 {
        eprintln!("Usage: {} n [linear|threads]", args[0]);
        process::exit(1);
    }
    let n = args[1]
        .parse::<u64>()
        .unwrap_or_else(|_| panic!("Could not convert {} to integer", args[1]));
    let repetition = 10;

    let start = Instant::now();
    if args[2] == "linear" {
        linear(n, repetition);
    } else if args[2] == "threads" {
        in_threads(n, repetition);
    } else {
        println!("Invalid parameter {}", args[2])
    }
    let duration = start.elapsed();
    println!("Time elapsed in expensive_function() is: {:?}", duration);
}

fn fibonacci(n: u64) -> u64 {
    if n == 0 || n == 1 {
        return 1;
    }
    fibonacci(n - 1) + fibonacci(n - 2)
}

fn linear(n: u64, repetition: i32) {
    println!("single thread");
    for _x in 1..=repetition {
        println!("{n}: {}", fibonacci(n));
    }
}

fn in_threads(n: u64, repetition: i32) {
    println!("multiple threads");
    let (tx, rx) = mpsc::channel();
    for _x in 1..=repetition {
        let txr = tx.clone();
        thread::spawn(move || {
            let res = fibonacci(n);
            txr.send(res.to_string()).unwrap();
            println!("spawned {:?} finished", thread::current().id());
        });
    }
    drop(tx); // need to drop in the main thread

    for received in rx {
        println!("Received: {}", received);
    }
}
multiple threads
spawned ThreadId(2) finished
spawned ThreadId(4) finished
spawned ThreadId(3) finished
spawned ThreadId(5) finished
spawned ThreadId(7) finished
spawned ThreadId(8) finished
spawned ThreadId(6) finished
spawned ThreadId(9) finished
Received: 10946
Received: 10946
Received: 10946
Received: 10946
Received: 10946
Received: 10946
Received: 10946
Received: 10946
spawned ThreadId(10) finished
Received: 10946
spawned ThreadId(11) finished
Received: 10946
Time elapsed in expensive_function() is: 1.851333ms
Save many files (both CPU and IO intensive)
- In this example we demonstrate the speed improvement of threading.
- The program will count the number of prime numbers up to a given number. This part is CPU intensive.
- Then it will create many small files. This part is IO intensive.
use std::fs::File;
use std::io::Write;
use std::path::Path;

use tempdir::TempDir;

fn main() {
    let args: Vec<String> = std::env::args().collect();
    if args.len() != 3 {
        println!("Usage: {} NUMBER_OF_FILES PRIME_LIMIT", args[0]);
        std::process::exit(1);
    }
    let number_of_files = args[1].parse::<i32>().unwrap();
    let limit = args[2].parse::<u32>().unwrap();

    single_thread(number_of_files, limit);
    for threads in 2..=5 {
        multiple_thread(number_of_files, limit, threads);
    }
}

fn multiple_thread(number_of_files: i32, limit: u32, threads: i32) {
    let temp_dir = TempDir::new("demo").unwrap();
    //println!("temp_dir: {:?}", temp_dir);
    let temp_dir_path = temp_dir.into_path();

    let start = std::time::Instant::now();
    let mut handles = vec![];
    for part in 0..threads {
        let temp_dir_path = temp_dir_path.clone();
        let batch_size = number_of_files / threads;
        let start = part * batch_size + 1;
        let end = if part < threads - 1 {
            (part + 1) * batch_size
        } else {
            number_of_files
        };
        handles.push(std::thread::spawn(move || {
            //println!("In thread {:?}", std::thread::current().id());
            create_files(start, end, limit, &temp_dir_path);
        }));
    }
    for handle in handles {
        handle.join().unwrap();
    }
    println!(
        "Elapsed time for {} threads: {:?}",
        threads,
        start.elapsed()
    );

    verify_number_of_created_file(&temp_dir_path, number_of_files);
}

fn single_thread(number_of_files: i32, limit: u32) {
    let temp_dir = TempDir::new("demo").unwrap();
    //println!("temp_dir: {:?}", temp_dir);
    let temp_dir_path = temp_dir.into_path();

    let start_time = std::time::Instant::now();
    let start = 1;
    let end = number_of_files;
    create_files(start, end, limit, &temp_dir_path);
    println!("Elapsed time: {:?}", start_time.elapsed());

    verify_number_of_created_file(&temp_dir_path, number_of_files);
}

fn verify_number_of_created_file(temp_dir: &Path, number_of_files: i32) {
    //println!("Number of files: {}", number_of_files);
    let content = temp_dir.read_dir().unwrap().count() as i32;
    std::fs::remove_dir_all(temp_dir).unwrap();
    assert_eq!(number_of_files, content);
}

fn create_files(start: i32, end: i32, limit: u32, temp_dir: &Path) {
    for ix in start..=end {
        let file_path = temp_dir.join(format!("{}.txt", ix));
        //println!("{:?}", file_path);
        let mut file = File::create(file_path).unwrap();
        let primes = count_primes(limit);
        writeln!(&mut file, "{} {}.txt", primes, ix).unwrap();
    }
}

fn count_primes(limit: u32) -> u32 {
    let mut count = 0;
    for number in 2..=limit {
        if is_prime(number) {
            count += 1;
        }
    }
    count
}

fn is_prime(number: u32) -> bool {
    for div in 2..number {
        if number % div == 0 {
            return false;
        }
    }
    true
}
- Compute primes up to 1 (that is, do almost nothing). Create 100,000 files. This is mostly IO intensive.
- We can see a 35-40% speed improvement going from the single-threaded version to 2 threads, but no further improvement from additional threads.
cargo run -q 100000 1
Elapsed time: 1.619909245s
Elapsed time for 2 threads: 1.003796038s
Elapsed time for 3 threads: 1.019957126s
Elapsed time for 4 threads: 998.978148ms
Elapsed time for 5 threads: 1.042834678s
- Compute primes up to 500 (CPU intensive). Create 100,000 files. This has both CPU and IO part.
- We can see the speed increase with each additional thread, but the improvement diminishes as more threads are added.
cargo run -q 100000 500
Elapsed time: 11.111724788s
Elapsed time for 2 threads: 5.728736066s
Elapsed time for 3 threads: 3.934323686s
Elapsed time for 4 threads: 3.066895233s
Elapsed time for 5 threads: 2.547374277s
Rust threads read-only access to shared variables
See several examples:
- Shared read-only variable with a number in it.
- Shared read-only variable with a string in it.
- Shared read-only variable with a string in it using Arc.
Shared read-only variable with numeric value
- The integer is copied
use std::time::Duration;

fn main() {
    let answer = 42;
    println!("Before: {answer} {:p}", &answer);

    let mut handles = vec![];
    for _ in 1..=3 {
        handles.push(std::thread::spawn(move || {
            println!("{:?} {} {:p}", std::thread::current().id(), answer, &answer);
            std::thread::sleep(Duration::from_millis(10));
        }));
    }
    println!("Started: {answer} {:p}", &answer);

    for handle in handles {
        handle.join().unwrap();
    }
    println!("After: {answer} {:p}", &answer);
}
Before: 42 0x7ffd30c3fb2c
Started: 42 0x7ffd30c3fb2c
ThreadId(2) 42 0x79e7a53ffafc
ThreadId(3) 42 0x79e7a4fffafc
ThreadId(4) 42 0x79e7a4bffafc
After: 42 0x7ffd30c3fb2c
Shared read-only variable with string value
- The string must be cloned for this to work.
use std::time::Duration;

fn main() {
    let answer = String::from("Hello World!");
    println!("Before: {answer} {:p} {:?}", &answer, answer.as_ptr());

    let mut handles = vec![];
    for _ in 1..=3 {
        let answer = answer.clone();
        handles.push(std::thread::spawn(move || {
            println!(
                "{:?} {} {:p} {:?}",
                std::thread::current().id(),
                answer,
                &answer,
                answer.as_ptr()
            );
            std::thread::sleep(Duration::from_millis(10));
        }));
    }
    println!("Started: {answer} {:p} {:?}", &answer, answer.as_ptr());

    for handle in handles {
        handle.join().unwrap();
    }
    println!("After: {answer} {:p} {:?}", &answer, answer.as_ptr());
}
Before: Hello World! 0x7fffa8e341c8 0x5c8f8538db80
Started: Hello World! 0x7fffa8e341c8 0x5c8f8538db80
ThreadId(2) Hello World! 0x7d60e73ffb70 0x5c8f8538dba0
ThreadId(4) Hello World! 0x7d60e6bffb70 0x5c8f8538e090
ThreadId(3) Hello World! 0x7d60e6fffb70 0x5c8f8538de50
After: Hello World! 0x7fffa8e341c8 0x5c8f8538db80
Shared read-only variable with string value with Arc
- We can use Arc (Atomically Reference Counted) to wrap the data in reference counting.
- The clone on the Arc only increments the reference counter; it does not copy the data.
use std::sync::Arc;
use std::time::Duration;

fn main() {
    let answer = Arc::new(String::from("Hello World!"));
    println!("Before: {answer} {:p} {:?}", &answer, answer.as_ptr());

    let mut handles = vec![];
    for _ in 1..=3 {
        let answer = answer.clone();
        handles.push(std::thread::spawn(move || {
            println!(
                "{:?} {} {:p} {:?}",
                std::thread::current().id(),
                answer,
                &answer,
                answer.as_ptr()
            );
            std::thread::sleep(Duration::from_millis(10));
        }));
    }
    println!("Started: {answer} {:p} {:?}", &answer, answer.as_ptr());

    for handle in handles {
        handle.join().unwrap();
    }
    println!("After: {answer} {:p} {:?}", &answer, answer.as_ptr());
}
Before: Hello World! 0x7fffeddb81b0 0x61db86afeb80
Started: Hello World! 0x7fffeddb81b0 0x61db86afeb80
ThreadId(2) Hello World! 0x78f1c1bffa78 0x61db86afeb80
ThreadId(3) Hello World! 0x78f1c17ffa78 0x61db86afeb80
ThreadId(4) Hello World! 0x78f1c13ffa78 0x61db86afeb80
After: Hello World! 0x7fffeddb81b0 0x61db86afeb80
Pass reference of read-only vector to thread
- Arc allows us to have reference counting.
- Here the clone only copies the reference and not the whole data structure.
use std::sync::Arc;

macro_rules! prt {
    ($text: expr, $var: expr) => {
        println!("{:11} {:?} {:p} {:?}", $text, $var, &$var, $var.as_ptr());
    };
}

fn main() {
    let animals = Arc::new(vec![
        String::from("crab"),
        String::from("ant"),
        String::from("cat"),
        String::from("dog"),
        String::from("bat"),
    ]);
    prt!("Before:", animals);

    let mut handles = vec![];
    for _ in 1..=3 {
        let animals = animals.clone();
        handles.push(std::thread::spawn(move || {
            list_animals(&animals);
        }));
    }
    prt!("Started:", animals);

    for handle in handles {
        handle.join().unwrap();
    }
    prt!("After:", animals);
}

fn list_animals(animals: &Vec<String>) {
    prt!(format!("{:?}", std::thread::current().id()), animals);
    //for animal in animals {
    //    println!("{:?} {}", std::thread::current().id(), animal);
    //}
}
Before: ["crab", "ant", "cat", "dog", "bat"] 0x7ffc60e9fbf8 0x6225d6993480
Started: ["crab", "ant", "cat", "dog", "bat"] 0x7ffc60e9fbf8 0x6225d6993480
ThreadId(2) ["crab", "ant", "cat", "dog", "bat"] 0x75e378bff678 0x6225d6993480
ThreadId(3) ["crab", "ant", "cat", "dog", "bat"] 0x75e3787ff678 0x6225d6993480
ThreadId(4) ["crab", "ant", "cat", "dog", "bat"] 0x75e373dff678 0x6225d6993480
After: ["crab", "ant", "cat", "dog", "bat"] 0x7ffc60e9fbf8 0x6225d6993480
- Arc
- clone
Pass reference of read-only vector to thread improved
- In this solution the call to clone the Arc was moved inside the spawn.
use std::sync::Arc;

macro_rules! prt {
    ($text: expr, $var: expr) => {
        println!("{:11} {:?} {:p} {:?}", $text, $var, &$var, $var.as_ptr());
    };
}

fn main() {
    let animals = Arc::new(vec![
        String::from("crab"),
        String::from("ant"),
        String::from("cat"),
        String::from("dog"),
        String::from("bat"),
    ]);
    prt!("Before:", animals);

    let mut handles = vec![];
    for _ in 1..=3 {
        handles.push(std::thread::spawn({
            let animals = animals.clone();
            move || {
                list_animals(&animals);
            }
        }));
    }
    prt!("Started:", animals);

    for handle in handles {
        handle.join().unwrap();
    }
    prt!("After:", animals);
}

fn list_animals(animals: &Vec<String>) {
    prt!(format!("{:?}", std::thread::current().id()), animals);
    //for animal in animals {
    //    println!("{}", animal);
    //}
}
Before: ["crab", "ant", "cat", "dog", "bat"] 0x7fff3f32bef8 0x5adca2be2480
Started: ["crab", "ant", "cat", "dog", "bat"] 0x7fff3f32bef8 0x5adca2be2480
ThreadId(2) ["crab", "ant", "cat", "dog", "bat"] 0x793310bff678 0x5adca2be2480
ThreadId(4) ["crab", "ant", "cat", "dog", "bat"] 0x7933103ff678 0x5adca2be2480
ThreadId(3) ["crab", "ant", "cat", "dog", "bat"] 0x7933107ff678 0x5adca2be2480
After: ["crab", "ant", "cat", "dog", "bat"] 0x7fff3f32bef8 0x5adca2be2480
- Arc
- clone
Process read-only string slices in parallel
- Reference counting with Arc.
use std::sync::Arc;
use std::thread;

macro_rules! prt {
    ($text: expr, $var: expr) => {
        prt!($text, $var, $var)
    };
    ($text: expr, $var: expr, $show: expr) => {
        println!("{:11} {:p} {:?} {:?}", $text, &$var, $var.as_ptr(), $show);
    };
}

fn main() {
    let text = Arc::new(String::from("The black cat climbed the green tree"));
    let parts = 3;
    println!("len: {} parts: {}", text.len(), parts);
    prt!("Before:", text);

    let mut handles = vec![];
    let size = text.len() / parts;
    for part in 0..parts {
        let start = part * size;
        let end = if start + size + size <= text.len() {
            start + size
        } else {
            text.len()
        };
        //println!("{start:3}-{end:3} {}", &text[start..end]);
        handles.push(thread::spawn({
            let text = text.clone();
            move || {
                prt!(
                    format!("{:?}", thread::current().id()),
                    text,
                    &text[start..end]
                );
            }
        }));
    }
    prt!("Started:", text);

    for handle in handles {
        handle.join().unwrap();
    }
    prt!("After:", text);
}
len: 36 parts: 3
Before: 0x7fff1ba74ea8 0x58b13bed0ae0 "The black cat climbed the green tree"
Started: 0x7fff1ba74ea8 0x58b13bed0ae0 "The black cat climbed the green tree"
ThreadId(2) 0x75d6e05ffb70 0x58b13bed0ae0 "The black ca"
ThreadId(3) 0x75d6e01ffb70 0x58b13bed0ae0 "t climbed th"
ThreadId(4) 0x75d6dfdffb70 0x58b13bed0ae0 "e green tree"
After: 0x7fff1ba74ea8 0x58b13bed0ae0 "The black cat climbed the green tree"
Filling the memory showing that Arc works
use std::sync::Arc;
use std::time::Duration;

fn main() {
    let mut text = String::from("x");
    // This will create a string that fills the memory.
    // One more iteration and it would crash the program.
    for i in 0..=33 {
        text.push_str(&text.clone());
        println!("len: {} {}", i, text.len());
    }

    // Without Arc the thread creates a copy of text pushing it over the
    // total size of memory and crashing the program.
    // With Arc the long string is not copied and thus the process can run.
    let text = Arc::new(text);

    println!("Before: {} {:p} {:?}", text.len(), &text, text.as_ptr());

    let mut handles = vec![];
    for _ in 1..=3 {
        let text: Arc<String> = text.clone();
        handles.push(std::thread::spawn(move || {
            println!(
                "{:?} {} {:p} {:?}",
                std::thread::current().id(),
                text.len(),
                &text,
                text.as_ptr()
            );
            std::thread::sleep(Duration::from_millis(10));
        }));
    }
    println!("Started: {} {:p} {:?}", text.len(), &text, text.as_ptr());

    for handle in handles {
        handle.join().unwrap();
    }
    println!("After: {} {:p} {:?}", text.len(), &text, text.as_ptr());
}
Pass and return ownership
- An alternative way to handle this situation is to return the vector from the thread.
- This way we pass the ownership back to the caller.
- This only works properly if the threads do not need the same variable at the same time.
- So either they run sequentially, in which case we don't gain CPU time, or each thread needs a different variable.
macro_rules! prt {
    ($text: expr, $var: expr) => {
        println!("{:11} {:?} {:p} {:?}", $text, $var, &$var, $var.as_ptr());
    };
}

fn main() {
    let animals = vec![
        String::from("crab"),
        String::from("ant"),
        String::from("cat"),
        String::from("dog"),
        String::from("bat"),
    ];
    prt!("Before:", animals);

    let handle = std::thread::spawn(move || {
        list_animals(&animals);
        animals
    });

    // Here we cannot access animals
    //prt!("Started:", animals);
    println!("Started:");

    let animals = handle.join().unwrap();
    prt!("After:", animals);
}

fn list_animals(animals: &Vec<String>) {
    prt!(format!("{:?}", std::thread::current().id()), animals);
}
Before: ["crab", "ant", "cat", "dog", "bat"] 0x7ffe2316f6f0 0x62aa56783480
Started:
ThreadId(2) ["crab", "ant", "cat", "dog", "bat"] 0x730f1d3ff558 0x62aa56783480
After: ["crab", "ant", "cat", "dog", "bat"] 0x7ffe2316fb10 0x62aa56783480
Thread scope
- Using thread::scope there is an even simpler solution: scoped threads are guaranteed to finish before the scope returns, so they can borrow the variable directly.
macro_rules! prt {
    ($text: expr, $var: expr) => {
        println!("{:11} {:?} {:p} {:?}", $text, $var, &$var, $var.as_ptr());
    };
}

fn main() {
    let animals = vec![
        String::from("crab"),
        String::from("ant"),
        String::from("cat"),
        String::from("dog"),
        String::from("bat"),
    ];
    prt!("Before:", animals);

    std::thread::scope(|scope| {
        scope.spawn(|| list_animals(&animals));
        scope.spawn(|| list_animals(&animals));
        scope.spawn(|| list_animals(&animals));
    });

    prt!("After:", animals);
}

fn list_animals(animals: &Vec<String>) {
    // Enable this to show that they work in parallel
    // for animal in animals {
    //     println!("   {} in {:?}", animal, std::thread::current().id());
    //     std::thread::sleep(std::time::Duration::from_millis(rand::random::<u8>() as u64));
    // }
    prt!(format!("{:?}", std::thread::current().id()), animals);
}
Before: ["crab", "ant", "cat", "dog", "bat"] 0x7ffc77a51ef8 0x59798b84f480
ThreadId(2) ["crab", "ant", "cat", "dog", "bat"] 0x76e603fff688 0x59798b84f480
ThreadId(4) ["crab", "ant", "cat", "dog", "bat"] 0x76e6037ff688 0x59798b84f480
ThreadId(3) ["crab", "ant", "cat", "dog", "bat"] 0x76e603bff688 0x59798b84f480
After: ["crab", "ant", "cat", "dog", "bat"] 0x7ffc77a51ef8 0x59798b84f480
chdir in threads
- The current working directory is per process, so separate threads cannot have different current working directories.
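A minimal sketch demonstrating this, assuming a Unix-like system where /tmp exists: a directory change made in a spawned thread is visible in the main thread as well.

```rust
use std::env;
use std::thread;

fn main() {
    println!("Main starts in: {:?}", env::current_dir().unwrap());

    let handle = thread::spawn(|| {
        // Changing the working directory in a spawned thread...
        env::set_current_dir("/tmp").unwrap();
        println!("Thread changed to: {:?}", env::current_dir().unwrap());
    });
    handle.join().unwrap();

    // ...affects the main thread too, because the working
    // directory belongs to the whole process.
    println!("Main is now in: {:?}", env::current_dir().unwrap());
    assert!(env::current_dir().unwrap().ends_with("tmp"));
}
```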
Environment variables in threads
- The environment variables are per process, so separate threads cannot have different values.
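A minimal sketch demonstrating this: a value set by the main thread is seen by a spawned thread. The variable name DEMO_VALUE is arbitrary, chosen for illustration. (Note that in the 2024 edition of Rust env::set_var is an unsafe function, because another thread might be reading the environment at the same time; in earlier editions the unsafe block is merely redundant.)

```rust
use std::env;
use std::thread;

fn main() {
    // set_var affects the whole process, not just the current thread.
    unsafe {
        env::set_var("DEMO_VALUE", "42");
    }

    let handle = thread::spawn(|| {
        // The spawned thread sees the value set by the main thread.
        env::var("DEMO_VALUE").unwrap()
    });

    let from_thread = handle.join().unwrap();
    println!("{from_thread}"); // 42
    assert_eq!(from_thread, "42");
}
```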
Counter in a loop in the same process and thread
- This simple example shows how we can change a variable from multiple threads.
- This is the example without any threads:
fn main() {
    let limit = 10_000_000;

    let result = count_in_process(limit);
    println!("{}", result);
    assert_eq!(result, limit);
}

fn count_in_process(limit: i32) -> i32 {
    let mut counter = 0;
    for _ in 0..limit {
        counter += 1;
    }
    counter
}
10000000
Mutex - without threads
- Mutex = mutual exclusion.
A few examples to get used to the syntax of Mutex without even using a thread.
use std::sync;

fn main() {
    integer();
    string();
    vector();
}

fn integer() {
    let number = sync::Mutex::new(12);
    {
        if let Ok(mut guarded_number) = number.lock() {
            *guarded_number += 23;
        }
    }
    if let Ok(value) = number.into_inner() {
        println!("{value}");
    }
}

fn string() {
    let text = sync::Mutex::new(String::new());
    {
        let other_text = String::from("The black cat");
        if let Ok(mut guarded_text) = text.lock() {
            guarded_text.push_str(&other_text);
        }
    }
    if let Ok(value) = text.into_inner() {
        println!("{value}");
    }
}

fn vector() {
    let animals = sync::Mutex::new(vec![]);
    {
        if let Ok(mut guarded_animals) = animals.lock() {
            guarded_animals.push(String::from("tiger"));
        }
    }
    if let Ok(value) = animals.into_inner() {
        println!("{value:?}");
    }
}
35
The black cat
["tiger"]
- Mutex
Lock with Mutex
use std::sync;
use std::thread;

// TODO: run many times and show that they work in parallel
// and still get the right updates

fn main() {
    let text = sync::Mutex::new(String::new());
    let threads = 3;

    thread::scope(|scope| {
        for _ in 1..=threads {
            scope.spawn(|| {
                //println!("{:?}", thread::current().id());
                let mut guarded_text = text.lock().unwrap();
                let extra = format!("{:?} ", thread::current().id());
                guarded_text.push_str(&extra);
            });
        }
    });

    if let Ok(val) = text.into_inner() {
        println!("Text: {val}");
    }
}
Text: ThreadId(2) ThreadId(3) ThreadId(4)
- Mutex
Counter with threads (shared variable) using Mutex
- The solution is to use Mutex.
- This solution is actually slower than the single-threaded one because the threads are waiting for each other to free the guards.
fn main() {
    let limit = 1_000_000;
    let threads = 10;

    let result = count_with_mutex(threads, limit);
    println!("{}", result);
    assert_eq!(result, limit * threads);
}

fn count_with_mutex(threads: i32, limit: i32) -> i32 {
    let counter = std::sync::Mutex::new(0);

    std::thread::scope(|scope| {
        for _ in 0..threads {
            scope.spawn(|| {
                println!("Start {:?}", std::thread::current().id());
                for _ in 0..limit {
                    let mut guarded_counter = counter.lock().unwrap();
                    *guarded_counter += 1;
                }
            });
        }
    });

    counter.into_inner().unwrap()
}
Start ThreadId(2)
Start ThreadId(5)
Start ThreadId(4)
Start ThreadId(6)
Start ThreadId(3)
Start ThreadId(7)
Start ThreadId(9)
Start ThreadId(8)
Start ThreadId(10)
Start ThreadId(11)
10000000
- Mutex
- lock
Counter with threads (local counting)
- This solution does the work locally and updates the shared variable only at the end
fn main() {
    let limit = 1_000_000;
    let threads = 10;

    let result = count_with_mutex(threads, limit);
    println!("{}", result);
    assert_eq!(result, limit * threads);
}

fn count_with_mutex(threads: i32, limit: i32) -> i32 {
    let counter = std::sync::Mutex::new(0);

    std::thread::scope(|scope| {
        for _ in 0..threads {
            scope.spawn(|| {
                println!("Start {:?}", std::thread::current().id());
                let mut my_counter = 0;
                for _ in 0..limit {
                    my_counter += 1;
                }
                let mut guard = counter.lock().unwrap();
                *guard += my_counter;
            });
        }
    });

    counter.into_inner().unwrap()
}
Start ThreadId(2)
Start ThreadId(5)
Start ThreadId(4)
Start ThreadId(6)
Start ThreadId(3)
Start ThreadId(7)
Start ThreadId(9)
Start ThreadId(8)
Start ThreadId(10)
Start ThreadId(11)
10000000
- Mutex
- lock
Counter with message passing
- Solution using messages
fn main() {
    let threads = 10;
    let limit = 1000;

    let result = count_with_messages(threads, limit);
    println!("{}", result);
    assert_eq!(result, threads * limit);
}

fn count_with_messages(threads: i32, limit: i32) -> i32 {
    let mut counter = 0;
    let (tx, rx) = std::sync::mpsc::channel();

    for _ in 1..=threads {
        let local_tx = tx.clone();
        std::thread::spawn(move || {
            //println!("{:?}", std::thread::current().id());
            for _ in 1..=limit {
                local_tx.send(1).unwrap();
            }
        });
    }
    drop(tx); // we need to close this Sender so the next loop will end properly

    for received in rx {
        counter += received;
    }

    counter
}
- mpsc
- channel
- send
- drop
- TODO
thread-local variables
use std::cell::RefCell;

thread_local! {
    pub static TEXT: RefCell<String> = const { RefCell::new(String::new()) };
}

fn main() {
    let threads = 3;

    std::thread::scope(|scope| {
        for i in 1..=threads {
            scope.spawn(move || {
                let msg = format!("doubled: {}", i * 2);
                TEXT.with_borrow_mut(|val| val.push_str(&msg));
                do_something(i)
            });
        }
    });
}

fn do_something(index: i32) {
    TEXT.with_borrow(|val| {
        println!("{index} '{val}' {:?}", std::thread::current().id());
    });
}
1 'doubled: 2' ThreadId(2)
2 'doubled: 4' ThreadId(3)
3 'doubled: 6' ThreadId(4)
- thread_local!
- RefCell
Exercise: character counting
- Given a string count how many times each character appears in the string.
Input: "abcax"
Output:
a: 2
b: 1
c: 1
x: 1
- First implement a function that does this in a single thread.
- Then create a threaded solution with a shared HashMap where each thread updates the shared HashMap.
- Then create a threaded solution with local HashMaps and then updating the central HashMap at the end of the thread.
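For the shared-HashMap variant, one possible shape is a `Mutex<HashMap<char, u32>>` that every thread locks for each update. This is only a sketch, not the official solution; the function name `count_shared` and the chunk size are made up for illustration:

```rust
use std::collections::HashMap;
use std::sync::Mutex;
use std::thread;

// Count characters with several threads that all update one shared HashMap.
fn count_shared(text: &str, chunk_size: usize) -> HashMap<char, u32> {
    let chars: Vec<char> = text.chars().collect();
    let counter: Mutex<HashMap<char, u32>> = Mutex::new(HashMap::new());
    let counter_ref = &counter;
    thread::scope(|scope| {
        for chunk in chars.chunks(chunk_size) {
            scope.spawn(move || {
                for ch in chunk {
                    // every single update locks the shared map
                    *counter_ref.lock().unwrap().entry(*ch).or_insert(0) += 1;
                }
            });
        }
    });
    counter.into_inner().unwrap()
}

fn main() {
    let counts = count_shared("abcax", 2);
    let mut keys: Vec<&char> = counts.keys().collect();
    keys.sort();
    for key in keys {
        println!("{key}: {}", counts[key]);
    }
}
```

As with the counter examples above, locking on every update is expected to be slow; the local-HashMap variant only needs one lock per thread at the end.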
Exercise: word count
Implement the default behavior of the wc command of Linux/Unix. For each file show:
- number of lines
- number of words
- number of bytes
$ wc intro.md files.md strings.md
182 519 5658 intro.md
162 273 3133 files.md
345 943 9708 strings.md
689 1735 18499 total
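The counting part can be sketched with iterator methods; `wc_counts` is a made-up helper name, and note that the real wc counts newline characters while `lines()` also counts a final line that lacks a trailing newline (for files ending in a newline the two agree):

```rust
// Count lines, words and bytes of a piece of text, roughly like wc does.
fn wc_counts(text: &str) -> (usize, usize, usize) {
    let lines = text.lines().count();
    let words = text.split_whitespace().count();
    let bytes = text.len();
    (lines, words, bytes)
}

fn main() {
    // In the full exercise the text would come from std::fs::read_to_string
    // for each file given on the command line.
    let text = "one two\nthree\n";
    let (lines, words, bytes) = wc_counts(text);
    println!("{lines:>8}{words:>8}{bytes:>8}");
}
```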
Exercise: count characters, words
- Given many files (e.g. clone the Rust-maven repository or the slides repository or the ladino-dictionary-data repo)
- Count how many times each character appears.
- Count how many times each word appears.
Exercise: run several functions on the same text
- TODO
In earlier parts of the course I have implemented several functions that searched a text string for certain characters. Now run those functions on a large text file.
Allow the user to set the number of threads we would like to use.
Exercise: Download many files in threads
In the chapter about http we had an example of a blocking http client and an example downloading all the pages of the Rust Maven site using Tokio and async calls.
Implement the "download many files" application using threads and the blocking http client. Make the main thread collect the sizes of the downloaded pages.
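One possible shape for this exercise, following the message-passing pattern above: one thread per URL, each sending its result back to the main thread on a channel. The `download` function here is a made-up stand-in so the sketch runs without the network; in the real solution it would call a blocking http client (e.g. the reqwest crate) and return the length of the response body:

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical stand-in for a blocking HTTP GET. Here we just derive
// a fake "size" from the URL so the sketch is runnable offline.
fn download(url: &str) -> usize {
    url.len() * 100
}

fn main() {
    let urls = vec![
        String::from("https://rust.code-maven.com/"),
        String::from("https://rust.code-maven.com/slides"),
    ];

    let (tx, rx) = mpsc::channel();
    for url in urls {
        let tx = tx.clone();
        thread::spawn(move || {
            let size = download(&url);
            tx.send((url, size)).unwrap();
        });
    }
    drop(tx); // close the original Sender so the receiving loop can end

    let mut total = 0;
    for (url, size) in rx {
        println!("{url}: {size} bytes");
        total += size;
    }
    println!("total: {total} bytes");
}
```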
Solution: count characters, words
use std::collections::HashMap;

fn main() {
    let (threads, files) = get_args();

    if threads == 1 {
        let mut total: HashMap<char, u32> = HashMap::new();
        for file in files {
            let text = std::fs::read_to_string(file).unwrap();
            let data = count_characters(&text);
            println!("{:#?}", &data);
            add(&mut total, &data);
            println!("{:#?}", total);
        }
    }
}

fn get_args() -> (u32, Vec<String>) {
    let args = std::env::args().collect::<Vec<String>>();
    if args.len() < 3 {
        eprintln!("Usage: {} THREADS, FILEs", args[0]);
        std::process::exit(1);
    }
    (args[1].parse().unwrap(), args[2..].to_owned())
}

fn count_characters(text: &str) -> HashMap<char, u32> {
    let mut counter: HashMap<char, u32> = HashMap::new();
    for ch in text.chars() {
        *counter.entry(ch).or_insert(0) += 1;
    }
    counter
}

fn add(total: &mut HashMap<char, u32>, other: &HashMap<char, u32>) {
    for (key, value) in other.iter() {
        *total.entry(*key).or_insert(0) += value;
    }
}
Solution: run several functions on the same text
fn main() {
    // let (threads, filename) = get_args();
    println!("Hello, world!");
}

// fn get_args() -> (u32, String) {
//     let args = std::env::args().collect::<Vec<String>>();
//     if args.len() != 3 {
//         eprintln!("Usage: {} THREADS, FILE", args[0]);
//         std::process::exit(1);
//     }
//     (args[1].parse().unwrap(), args[2].to_owned())
// }

// The idea here is to create CPU intensive functions to be able to show the impact of threading.
// It is not to create the most optimal functions.

// return the first character that repeats itself
#[allow(dead_code)]
fn find_double_characters(text: &str, mut nth: u32) -> Option<char> {
    if text.len() < 2 {
        return None;
    }
    if nth < 1 {
        return None;
    }

    let mut resp = None;
    for chr in text.chars() {
        if resp.is_none() {
            resp = Some(chr);
            continue;
        }
        if Some(chr) == resp {
            if nth == 1 {
                return resp;
            } else {
                nth -= 1;
            }
        }
        resp = Some(chr);
    }

    None
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_find_double_characters() {
        assert_eq!(find_double_characters("", 1), None);
        assert_eq!(find_double_characters("aa", 0), None);
        assert_eq!(find_double_characters("a", 1), None);
        assert_eq!(find_double_characters("aa", 1), Some('a'));
        assert_eq!(find_double_characters("aabb", 1), Some('a'));
        assert_eq!(find_double_characters("aabb", 2), Some('b'));
    }
}
- TODO: How to share workload? e.g. We would like to create 10,000 files with the sequence number of the file being both the content and the filename.
- TODO: What if we have a vector of 10,000 values and we would like to save each one of them in a separate file?
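One way to share such a workload is to split the list of items into chunks, one per thread, and let each scoped thread handle its share. This is only a sketch under made-up names (`make_filenames` is hypothetical), and instead of actually writing 10,000 files it only builds the filenames:

```rust
use std::thread;

// Split the ids among a fixed number of threads; each thread processes
// its own chunk and returns the results, which are collected in order.
fn make_filenames(ids: &[u32], threads: usize) -> Vec<String> {
    let chunk_size = (ids.len() + threads - 1) / threads;
    let mut results: Vec<String> = vec![];
    thread::scope(|scope| {
        let handles: Vec<_> = ids
            .chunks(chunk_size)
            .map(|chunk| {
                scope.spawn(move || {
                    chunk
                        .iter()
                        .map(|id| format!("{id}.txt"))
                        .collect::<Vec<_>>()
                })
            })
            .collect();
        for handle in handles {
            results.extend(handle.join().unwrap());
        }
    });
    results
}

fn main() {
    let ids: Vec<u32> = (1..=10).collect();
    let filenames = make_filenames(&ids, 4);
    println!("{filenames:?}");
}
```

In the real task the closure would call std::fs::write with the filename and the sequence number as content; saving each element of a vector to its own file follows the same chunking pattern.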
Threadpool
Use threadpool with messages
- threadpool
- Using channel messages to let the main thread know the processing is done.
use std::sync::mpsc::channel;
use threadpool::ThreadPool;

fn main() {
    let numbers = (0..100).collect::<Vec<u32>>();
    //numbers.iter().map(|val| do_something(*val)).for_each(drop);

    let n_workers = 4;
    let pool = ThreadPool::new(n_workers);

    let (tx, rx) = channel();
    let count = numbers.len();
    for num in numbers {
        let tx = tx.clone();
        pool.execute(move || {
            do_something(num);
            tx.send(1)
                .expect("channel will be there waiting for the pool");
        });
    }

    #[allow(clippy::unnecessary_fold)]
    let result = rx.iter().take(count).fold(0, |a, b| a + b);
    assert_eq!(result, count);
}

fn do_something(num: u32) {
    println!("{} {:?}", num, std::thread::current().id());
}
- threadpool
Threaded map
Deprecated: A threaded version of the map function
- TODO
- It seems this crate is not available any more.
- Using the threaded-map crate we can use the threadpool as the map function.
use threaded_map::ThreadedMappable;

fn main() {
    let items = vec![1, 2, 3, 4, 5, 6];

    let target = items.iter().map(|num| double(*num)).collect::<Vec<_>>();
    println!("{:?}", target);

    let results = items
        .into_iter()
        .threaded_map(double, None)
        .collect::<Vec<_>>();
    println!("{:?}", results);
    assert_eq!(results, target);
}

fn double(n: i32) -> i32 {
    println!("{:?}", std::thread::current().id());
    n * 2
}
[package]
name = "map-with-thread"
version = "0.1.0"
edition = "2021"
[dependencies]
threaded-map = "0.2.0"
rayon
What is rayon
replace map with par_iter
use rayon::prelude::*;

fn main() {
    let numbers = (1..10).collect::<Vec<_>>();

    let linear = numbers.iter().map(|number| number * 2).collect::<Vec<_>>();

    let parallel = numbers
        .par_iter()
        .map(|number| number * 2)
        .collect::<Vec<_>>();

    println!("{numbers:?}");
    println!("{linear:?}");
    println!("{parallel:?}");
    assert_eq!(linear, parallel);

    let _parallel = numbers
        .par_iter()
        .map(|number| {
            println!("{:?}", std::thread::current().id());
            number * 2
        })
        .collect::<Vec<_>>();
}
[1, 2, 3, 4, 5, 6, 7, 8, 9]
[2, 4, 6, 8, 10, 12, 14, 16, 18]
[2, 4, 6, 8, 10, 12, 14, 16, 18]
ThreadId(8)
ThreadId(9)
ThreadId(2)
ThreadId(17)
ThreadId(6)
ThreadId(5)
ThreadId(7)
ThreadId(3)
ThreadId(15)
- rayon
- par_iter
- map
Tasks with different processing time
- TODO
Experimental example to show how Rayon distributes the load when the tasks have random processing times (between 1 and 1000 ms) and/or when there are a few long-running tasks and many short tasks.
//use rand::Rng;
use rayon::prelude::*;
use std::collections::HashMap;

fn main() {
    //let numbers = (1..1000).map(|_| rand::thread_rng().gen_range(0..=300) as u64).collect::<Vec<_>>();
    let mut numbers = vec![3000_u64, 3000];
    numbers.extend([1; 1000]);
    // println!("{numbers:?}");

    let parallel = numbers
        .par_iter()
        .map(|number| {
            //println!("{:?} sleeping for {}", std::thread::current().id(), number);
            std::thread::sleep(std::time::Duration::from_millis(*number));
            format!("{:?}", std::thread::current().id())
        })
        .collect::<Vec<_>>();
    // println!("{parallel:?}");

    let mut counter: HashMap<String, u32> = HashMap::new();
    for id in parallel {
        *counter.entry(id).or_insert(0) += 1;
    }
    for (key, value) in counter {
        println!("{key} {value}");
    }
}
map with threads
TODO: This is experimental code that needs to be improved
use rayon::prelude::*;
use std::marker::Send;
use std::sync::mpsc;
use std::thread;

fn main() {
    let numbers: Vec<i32> = (1..=10).collect();
    println!("{:?}", numbers);

    let doubles = numbers.iter().map(|num| 2 * num).collect::<Vec<i32>>();
    println!("{:?}", doubles);

    let doubles = numbers.iter().map(double_function).collect::<Vec<i32>>();
    println!("{:?}", doubles);

    let doubles = map_thread(&numbers, double_function, 3);
    println!("{:?}", doubles);

    let doubles = numbers
        .into_par_iter()
        .map(double_function2)
        .collect::<Vec<i32>>();
    //let doubles = numbers.into_par_iter().map(|val| double_function(&val)).collect::<Vec<i32>>();
    println!("{:?}", doubles);
}

fn double_function(num: &i32) -> i32 {
    //println!("{:?}", std::thread::current().id());
    2 * num
}

fn double_function2(num: i32) -> i32 {
    //println!("{:?}", std::thread::current().id());
    2 * num
}

fn map_thread<Tin: Send + Copy + 'static, Tout: Ord + Send + Copy + 'static>(
    params: &[Tin],
    func: fn(&Tin) -> Tout,
    max_threads: i32,
) -> Vec<Tout> {
    //params.iter().map(double_function).collect::<Vec<Tout>>()
    //params.iter().map(func).collect::<Vec<Tout>>()
    let (tx, rx) = mpsc::channel();
    let mut thread_count = 0;
    let mut started = 0;
    let mut finished = 0;
    let mut results: Vec<(i32, Tout)> = vec![];

    for paramx in params.iter() {
        let number = *paramx;
        started += 1;
        let mytx = tx.clone();
        thread::Builder::new()
            .name(format!("{}", started))
            .spawn(move || {
                let id: i32 = thread::current().name().unwrap().parse().unwrap();
                let res = func(&number);
                mytx.send((id, res)).unwrap();
            })
            .unwrap();
        thread_count += 1;
        if thread_count >= max_threads {
            let received = rx.recv().unwrap();
            results.push(received);
            finished += 1;
        }
    }

    for received in rx {
        finished += 1;
        results.push(received);
        if finished >= started {
            break;
        }
    }

    results.sort();
    results.iter().map(|item| item.1).collect::<Vec<Tout>>()
}
async
tokio
- TODO
- See the http section for examples.
- TODO: show an example of a computationally heavy task that I run in async (several in parallel) and how they hand execution back to the event loop.
Simple async example with Tokio
[package]
name = "simple"
version = "0.1.0"
edition = "2021"
[dependencies]
tokio = { version = "1.39.2", features = ["full"] }
async fn do_something() {
    println!("Start to do something");
    tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
    println!("End to do something");
}

fn main() {
    println!("Start");
    let rt = tokio::runtime::Runtime::new().unwrap();
    let future = do_something();
    println!("Before block");
    rt.block_on(future);
    println!("End");
}
Start
Before block
Start to do something
End to do something
End