Concurrency

Fearless Concurrency

Safe concurrent programming with Rust









Why Concurrency?

Modern hardware has many cores – we want to use them all!

But concurrent programming is notoriously hard:

  • Data races: two threads access the same data at the same time, at least one of them writes, and nothing synchronizes them
  • Deadlocks: two threads wait for each other forever
  • Hard to reproduce: bugs depend on scheduling, appear randomly

In most languages, these bugs show up at runtime (if you’re lucky).

In Rust, the compiler rejects data races at compile time. (Deadlocks are still possible, so lock ordering remains your job.)









Goal Today

  • [?] Send and Sync








Creating Threads

Use thread::spawn with a closure

use std::thread;
use std::time::Duration;

fn test_spawn() {
    thread::spawn(|| {
        for i in 1..10 {
            println!("spawned: {i}");
            thread::sleep(Duration::from_millis(1));
        }
    });

    for i in 1..3 {
        println!("main: {i}");
        thread::sleep(Duration::from_millis(1));
    }
}

Output (varies each run!):

main: 1
spawned: 1
main: 2
spawned: 2
spawned: 3

Boo! When the main thread ends, the process exits and spawned threads are killed, even if they have not finished!









Waiting for Threads with join

thread::spawn returns a JoinHandle; calling .join() waits for the thread to finish

use std::thread;
use std::time::Duration;

fn test_spawn_join() {
    let handle = thread::spawn(|| {
        for i in 1..10 {
            println!("spawned: {i}");
            thread::sleep(Duration::from_millis(1));
        }
    });

    for i in 1..3 {
        println!("main: {i}");
        thread::sleep(Duration::from_millis(1));
    }

    handle.join().unwrap();  // wait for spawned thread to finish
}

handle.join() forces the main thread to wait for the spawned thread to finish.
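
join() also hands back whatever the spawned closure returned. The sketch below is a minimal illustration of that; the helper name sum_on_thread is hypothetical and not part of the original slides.

```rust
use std::thread;

// Hypothetical helper: sums 1..=n on a spawned thread and returns the result.
fn sum_on_thread(n: u64) -> u64 {
    // `move` gives the closure its own copy of `n`.
    let handle = thread::spawn(move || (1..=n).sum::<u64>());
    // join() returns a Result: Ok(value) if the thread finished normally,
    // Err(..) if the spawned thread panicked.
    handle.join().expect("spawned thread panicked")
}

fn main() {
    println!("sum = {}", sum_on_thread(10));
}
```

Because the result travels through the JoinHandle, no shared variable is needed for a simple "compute and return" thread.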









QUIZ

What happens when we compile this?

use std::thread;

fn test_spawn_with_vec() {
    let v = vec![1, 2, 3];

    let handle = thread::spawn(|| {
        println!("Here's a vector: {v:?}");
    });

    handle.join().unwrap();
}









Threads and Ownership: The Problem

The closure borrows v, but the spawned thread might outlive main!

error[E0373]: closure may outlive the current function,
              but it borrows `v`, which is owned by the current function
 --> src/main.rs:6:32
  |
6 |     let handle = thread::spawn(|| {
  |                                ^^ may outlive borrowed value `v`
7 |         println!("Here's a vector: {v:?}");
  |                                     - `v` is borrowed here
  |
help: to force the closure to take ownership of `v`,
      use the `move` keyword
  |
6 |     let handle = thread::spawn(move || {
  |                                ++++

What if main dropped v before the thread used it? Dangling reference!









Fix: move Closures

Use move to transfer ownership of v into the thread

use std::thread;

fn main() {
    let v = vec![1, 2, 3];

    let handle = thread::spawn(move || {
        println!("Here's a vector: {v:?}");
    });

    handle.join().unwrap();
}

Now the thread owns v. Main can no longer use it.









QUIZ

What happens here?

use std::thread;

fn main() {
    let v = vec![1, 2, 3];

    let handle = thread::spawn(move || {
        println!("Here's a vector: {v:?}");
    });

    println!("v is: {v:?}");
    handle.join().unwrap();
}









Cannot Use v After Move!

error[E0382]: borrow of moved value: `v`
  --> src/main.rs:10:22
   |
4  |     let v = vec![1, 2, 3];
   |         - move occurs because `v` has type `Vec<i32>`,
              which does not implement the `Copy` trait
6  |     let handle = thread::spawn(move || {
   |                                ------- value moved into closure here
7  |         println!("Here's a vector: {v:?}");
   |                                     - variable moved due to use in closure
...
10 |     println!("v is: {v:?}");
   |                      ^ value borrowed here after move

The ownership rules we already know prevent sharing data unsafely between threads!
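
If the parent really does need to keep using its data, one option not covered on these slides is scoped threads (std::thread::scope, available since Rust 1.63). The scope guarantees that every thread spawned inside it is joined before it returns, so borrowing is allowed. This is a sketch under that assumption; the helper name sum_halves is hypothetical.

```rust
use std::thread;

// Hypothetical helper: two scoped threads borrow the slice and each sums one half.
fn sum_halves(v: &[i32]) -> i32 {
    let (left, right) = v.split_at(v.len() / 2);
    // All threads spawned on `s` are joined before `scope` returns,
    // so the closures may borrow `left` and `right` without `move`.
    thread::scope(|s| {
        let a = s.spawn(|| left.iter().sum::<i32>());
        let b = s.spawn(|| right.iter().sum::<i32>());
        a.join().unwrap() + b.join().unwrap()
    })
}

fn main() {
    let v = vec![1, 2, 3, 4];
    let total = sum_halves(&v);
    // `v` was only borrowed, so it is still usable here.
    println!("total = {total}, v = {v:?}");
}
```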









Goal Today

  • [+] Threads: thread::spawn, move closures, join()
  • [?] Send and Sync








Message Passing with Channels

“Do not communicate by sharing memory; instead, share memory by communicating.” – Go proverb

A channel has two ends:

  • Transmitter (tx): sends data
  • Receiver (rx): receives data

The channel is closed when the receiver is dropped or when every transmitter is dropped.
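
A closed channel shows up as an Err on the surviving end. The following sketch (the helper name closed_channel_errors is made up for illustration) exercises both directions with std::sync::mpsc.

```rust
use std::sync::mpsc;

// Hypothetical helper: returns (send_failed, recv_failed).
fn closed_channel_errors() -> (bool, bool) {
    // Case 1: the receiver is gone, so send() returns Err
    // (the error hands the unsent value back to the caller).
    let (tx, rx) = mpsc::channel::<i32>();
    drop(rx);
    let send_failed = tx.send(1).is_err();

    // Case 2: every transmitter is gone, so recv() returns Err
    // instead of blocking forever.
    let (tx, rx) = mpsc::channel::<i32>();
    drop(tx);
    let recv_failed = rx.recv().is_err();

    (send_failed, recv_failed)
}

fn main() {
    let (send_failed, recv_failed) = closed_channel_errors();
    println!("send failed: {send_failed}, recv failed: {recv_failed}");
}
```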









Creating a Channel

use std::sync::mpsc;
use std::thread;

fn test_channel() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let msg = String::from("hello");
        tx.send(msg).unwrap();
    });

    let received = rx.recv().unwrap();
    println!("Got: {received}");
}

When you run this:

Got: hello

  • mpsc = multiple producer, single consumer
  • tx.send(msg) sends a value (transfers ownership!)
  • rx.recv() blocks until a value arrives









QUIZ

What happens here?

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let msg = String::from("hello");
        tx.send(msg).unwrap();
        println!("sent: {msg}");
    });

    let received = rx.recv().unwrap();
    println!("Got: {received}");
}









send Transfers Ownership!

error[E0382]: borrow of moved value: `msg`
  --> src/main.rs:10:27
   |
 8 |         let msg = String::from("hello");
   |             --- move occurs because `msg` has type `String`,
                  which does not implement the `Copy` trait
 9 |         tx.send(msg).unwrap();
   |                 --- value moved here
10 |         println!("sent: {msg}");
   |                          ^^^ value borrowed here after move

send takes ownership of msg – you can’t use it afterwards.

but why???









This prevents the sender from modifying data while the receiver reads it!
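
If the sender still needs the value afterwards, one simple workaround is to send a clone and keep the original. This is only a sketch of that idea; the helper name send_and_keep is hypothetical.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical helper: returns (value kept by the sender, value received).
fn send_and_keep(msg: String) -> (String, String) {
    let (tx, rx) = mpsc::channel();

    let handle = thread::spawn(move || {
        // The clone is moved into the channel; `msg` itself stays with the sender.
        tx.send(msg.clone()).unwrap();
        msg
    });

    let received = rx.recv().unwrap();
    let kept = handle.join().unwrap();
    (kept, received)
}

fn main() {
    let (kept, received) = send_and_keep(String::from("hello"));
    println!("sender kept: {kept}, receiver got: {received}");
}
```

The two strings are independent copies, so neither thread can observe the other modifying its data.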









Sending Multiple Values

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn test_multiple_messages() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let msgs = vec!["hello", "from", "the", "other", "side"];
        for msg in msgs {
            tx.send(String::from(msg)).unwrap();
            thread::sleep(Duration::from_millis(500));
        }
    });

    for received in rx {         // rx works as an iterator!
        println!("Got: {received}");
    }
}

When we run it, we get:

Got: hello
Got: from
Got: the
Got: other
Got: side

Iterating over rx with a for loop:

  1. blocks waiting for each value and
  2. finishes when the channel closes (here, when tx is dropped at the end of the spawned thread).









Multiple Producers

Clone tx to send from multiple threads to the same receiver

use std::sync::mpsc;
use std::thread;

fn test_multiple_producers() {
    let (tx1, rx) = mpsc::channel();

    let tx2 = tx1.clone();
    thread::spawn(move || {
        let msgs = vec!["hello", "from", "the", "other", "side"];
        for msg in msgs {
            tx1.send(String::from(msg)).unwrap();
        }
    });

    thread::spawn(move || {
        let msgs = vec!["i", "must've", "called", "a", "thousand", "times"];
        for msg in msgs {
            tx2.send(String::from(msg)).unwrap();
        }
    });

    for received in rx {
        println!("Got: {received}");
    }
}

Output order is nondeterministic – depends on scheduling!
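
With several producers, the receiving loop only ends once every clone of the transmitter has been dropped. The sketch below, using a hypothetical helper collect_ids, spawns a configurable number of producers and sorts the results, since arrival order is not fixed.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical helper: each of `workers` threads sends its own id once.
fn collect_ids(workers: u32) -> Vec<u32> {
    let (tx, rx) = mpsc::channel();

    for id in 0..workers {
        let tx = tx.clone(); // one transmitter per producer thread
        thread::spawn(move || {
            tx.send(id).unwrap();
        });
    }

    // Drop the original transmitter; otherwise the channel never closes
    // and the iteration below would block forever.
    drop(tx);

    let mut ids: Vec<u32> = rx.iter().collect();
    ids.sort(); // arrival order depends on scheduling, so sort before using
    ids
}

fn main() {
    println!("ids = {:?}", collect_ids(4));
}
```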









Goal Today

  • [+] Threads: thread::spawn, move closures, join()
  • [+] Message Passing: mpsc::channel(), send, recv, multiple producers
  • [?] Send and Sync








Shared State: Mutex

Sometimes threads need to share data (not just pass messages).

Examples?

  • ???
  • ???







A Mutex (mutual exclusion) guards shared data:

  1. Lock the mutex before accessing data
  2. Use the data
  3. Unlock when done (happens automatically!)

Like a microphone at a panel discussion – only one speaker at a time.









Using Mutex<T>

use std::sync::Mutex;

fn main() {
    let m = Mutex::new(5);

    {
        let mut num = m.lock().unwrap(); // acquire lock
        *num = 6;                        // mutate the data
    }                                    // lock released automatically!

    println!("m = {:?}", m); // m = Mutex { data: 6, poisoned: false, .. }
}

  • Mutex::new(5) wraps the value 5 in a mutex
  • .lock() blocks until the lock is acquired; returns a MutexGuard (inside a Result, hence the unwrap)
  • MutexGuard unlocks automatically when it goes out of scope (via its Drop implementation)
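
To see that the guard really is what holds the lock, the sketch below probes the mutex with try_lock, which returns Err instead of blocking when the lock is taken. The helper name lock_states is hypothetical, and try_lock is used here only to observe the state.

```rust
use std::sync::Mutex;

// Hypothetical helper: returns (lockable while guard is alive, lockable after drop).
fn lock_states() -> (bool, bool) {
    let m = Mutex::new(0);

    let guard = m.lock().unwrap();
    // The guard is alive, so the lock is held and try_lock fails.
    let while_held = m.try_lock().is_ok();

    drop(guard); // dropping the guard releases the lock early
    // The temporary guard from this try_lock is dropped at the end of the statement.
    let after_drop = m.try_lock().is_ok();

    (while_held, after_drop)
}

fn main() {
    let (while_held, after_drop) = lock_states();
    println!("lockable while held: {while_held}, after drop: {after_drop}");
}
```

Calling drop(guard) explicitly is an alternative to the extra block scope when the lock should be released before the end of the function.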









QUIZ

What happens here?

use std::sync::Mutex;
use std::thread;

fn test_mutex_many() {
    let counter = Mutex::new(0);
    let mut handles = vec![];

    for _ in 0..10 {
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();
            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap());
}









Cannot Move counter Into Multiple Threads!

error[E0382]: borrow of moved value: `counter`
  --> src/main.rs:15:29
   |
5  |     let counter = Mutex::new(0);
   |         ------- move occurs because `counter` has type `Mutex<i32>`,
                     which does not implement the `Copy` trait
...
9  |         let handle = thread::spawn(move || {
   |                                    ------- value moved into closure here
10 |             let mut num = counter.lock().unwrap();
   |                           ------- use occurs due to use in closure
...

counter gets moved into the first thread’s closure. The second iteration tries to move it again – but it’s already gone!

We need multiple ownership across threads.









Solution: Arc<T> (Atomic Reference Counting)

Arc<T> uses atomic operations to allow multiple owners across threads!

  • “RC” is for Reference Counting; data tracks how many owners there are
  • “A” is for Atomic; reference count updates are thread-safe
  • Data is dropped when the number of owners goes to 0

use std::sync::{Arc, Mutex};
use std::thread;

fn test_mutex_many() {
    let counter = Arc::new(Mutex::new(0));
    let mut handles = vec![];

    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();
            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap());
}

Output:

Result: 10
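
The reference count can be observed with Arc::strong_count. The sketch below (the helper name owner_counts is hypothetical) records the count before cloning, after cloning, and after the thread holding the clone has finished.

```rust
use std::sync::Arc;
use std::thread;

// Hypothetical helper: returns the owner count (before clone, after clone, after join).
fn owner_counts() -> (usize, usize, usize) {
    let data = Arc::new(vec![1, 2, 3]);
    let before = Arc::strong_count(&data);

    let for_thread = Arc::clone(&data); // bumps the count, does not copy the Vec
    let during = Arc::strong_count(&data);

    thread::spawn(move || {
        println!("thread sees: {for_thread:?}");
        // `for_thread` is dropped when the closure ends, decrementing the count.
    })
    .join()
    .unwrap();

    let after = Arc::strong_count(&data);
    (before, during, after)
}

fn main() {
    println!("owner counts: {:?}", owner_counts());
}
```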









Arc + Mutex: The Pattern

  • Arc gives multiple ownership across threads
  • Mutex gives interior mutability (only one thread can access the data at a time)
  • Together: shared mutable state that is safe

Note: Arc has a performance cost (atomic operations).

(If you need multiple owners only in single-threaded code, use Rc instead; it avoids the atomic operations.)









Goal Today

  • [+] Threads: thread::spawn, move closures, join()
  • [+] Message Passing: mpsc::channel(), send, recv, multiple producers
  • [+] Shared State: Mutex<T>, Arc<T>, Arc<Mutex<T>>
  • [?] Send and Sync: compiler-checked thread safety







Summary: Fearless Concurrency

Rust prevents many concurrency bugs at compile time using the same tools we already know:

  • Ownership prevents data races (only one owner)
  • Borrowing prevents dangling references across threads
  • Send/Sync traits prevent non-thread-safe types from crossing thread boundaries

Tool             What it does
thread::spawn    create a new thread
move closures    transfer ownership into a thread
mpsc::channel    send data between threads (ownership transfer)
Mutex<T>         shared mutable access with locking
Arc<T>           thread-safe reference counting
Send / Sync      compile-time thread safety checks
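
The Send / Sync checks can be made visible with a small trait-bound trick: a generic function that only accepts Send (or Sync) types. This is a sketch, not something from the slides; is_send and is_sync are hypothetical helper names.

```rust
use std::sync::{Arc, Mutex};

// Hypothetical helpers: these only compile for types that satisfy the bound.
fn is_send<T: Send>(_value: &T) -> bool {
    true
}

fn is_sync<T: Sync>(_value: &T) -> bool {
    true
}

fn main() {
    let shared = Arc::new(Mutex::new(0));
    println!("Arc<Mutex<i32>> is Send: {}", is_send(&shared));
    println!("Arc<Mutex<i32>> is Sync: {}", is_sync(&shared));

    // The lines below would be rejected at compile time,
    // because Rc<i32> does not implement Send:
    // let local = std::rc::Rc::new(0);
    // is_send(&local);
}
```

The same bound is what thread::spawn places on its closure, which is why moving an Rc into a spawned thread fails to compile.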









Material Inspired by

  • https://doc.rust-lang.org/book/ch16-01-threads.html
  • https://doc.rust-lang.org/book/ch16-02-message-passing.html
  • https://doc.rust-lang.org/book/ch16-03-shared-state.html
  • https://doc.rust-lang.org/book/ch16-04-extensible-concurrency-sync-and-send.html