Concurrency with Async

One of async programming's greatest strengths is efficiently handling multiple operations at once. In this section, we'll explore how to run async operations concurrently, communicate between tasks, and coordinate complex workflows.

Sequential vs. Concurrent Execution

First, let's understand the difference between sequential and concurrent async code:

import tokio
import tokio.time.{ sleep, Duration }

async fn fetchData(name: &str, delayMs: UInt64): String {
    println!("Starting fetch for \(name)...")
    await sleep(Duration.fromMillis(delayMs))
    println!("Finished fetch for \(name)")
    "data_\(name)".toString()
}

#[tokio.main]
async fn main() {
    // Sequential: total time ~3 seconds
    let a = await fetchData("A", 1000)
    let b = await fetchData("B", 1000)
    let c = await fetchData("C", 1000)
    println!("Sequential results: \(a), \(b), \(c)")
}

Each await waits for completion before starting the next fetch. With three 1-second fetches, this takes about 3 seconds total.

Running Futures Concurrently with join!

To run futures concurrently, use tokio.join!:

import tokio

#[tokio.main]
async fn main() {
    // Concurrent: total time ~1 second (they run together!)
    let (a, b, c) = await tokio.join!(
        fetchData("A", 1000),
        fetchData("B", 1000),
        fetchData("C", 1000)
    )
    println!("Concurrent results: \(a), \(b), \(c)")
}

All three fetches start immediately and run concurrently. Since they each take 1 second and run in parallel, the total time is about 1 second.

Note the prefix await before tokio.join! - the macro produces a future that we then await.

Spawning Independent Tasks

Sometimes you want a task to run independently in the background. Use tokio.spawn:

import tokio

#[tokio.main]
async fn main() {
    // Spawn a background task
    let handle = tokio.spawn(async {
        await sleep(Duration.fromSecs(2))
        println!("Background task complete!")
        42
    })

    // Main task continues immediately
    println!("Main task doing work...")
    await sleep(Duration.fromSecs(1))
    println!("Main task still working...")

    // Wait for the spawned task to complete
    let result = await handle
    println!("Background task returned: \(result.unwrap())")
}

Output:

Main task doing work...
Main task still working...
Background task complete!
Background task returned: 42

The spawned task runs concurrently with the main task. tokio.spawn returns a JoinHandle that you can await to get the result.

Important: Move Semantics with Spawn

Spawned tasks may outlive the current function, so they must own their data:

async fn processUser(user: User) {
    // ERROR: task may outlive this function, but borrows `user`
    tokio.spawn(async {
        println!("Processing \(user.name)")
    })

    // CORRECT: move ownership into the task
    tokio.spawn(async move {
        println!("Processing \(user.name)")
    })
}

Message Passing Between Tasks

Tasks often need to communicate. Tokio provides async channels for this:

import tokio
import tokio.sync.mpsc

#[tokio.main]
async fn main() {
    // Create a channel with buffer size 32
    let (tx, mut rx) = mpsc.channel(32)

    // Spawn a producer task
    let producer = tokio.spawn(async move {
        for i in 0..5 {
            await tx.send(format!("Message \(i)"))
            await sleep(Duration.fromMillis(100))
        }
        // tx is dropped here, closing the channel
    })

    // Spawn a consumer task
    let consumer = tokio.spawn(async move {
        while let Some(msg) = await rx.recv() {
            println!("Received: \(msg)")
        }
        println!("Channel closed")
    })

    // Wait for both tasks
    await tokio.join!(producer, consumer)
}

Output:

Received: Message 0
Received: Message 1
Received: Message 2
Received: Message 3
Received: Message 4
Channel closed

Key points:

  • mpsc.channel(n) creates a multi-producer, single-consumer channel
  • tx.send() is async and may wait if the buffer is full
  • rx.recv() is async and returns null when the channel closes
  • Dropping all senders closes the channel

Multiple Producers

Clone the sender to allow multiple tasks to send:

import tokio
import tokio.sync.mpsc

#[tokio.main]
async fn main() {
    let (tx, mut rx) = mpsc.channel(32)

    // Spawn multiple producer tasks
    for i in 0..3 {
        let tx = tx.clone()  // Clone for each task
        tokio.spawn(async move {
            for j in 0..3 {
                await tx.send(format!("Producer \(i), message \(j)"))
                await sleep(Duration.fromMillis(50))
            }
        })
    }

    // Drop the original sender so channel closes when tasks finish
    drop(tx)

    // Receive all messages
    while let Some(msg) = await rx.recv() {
        println!("\(msg)")
    }
}

Racing Futures with select!

Sometimes you want the result of whichever future completes first:

import tokio

async fn fetchFastest(): String {
    await tokio.select! {
        result = fetchFromServerA() -> result,
        result = fetchFromServerB() -> result,
    }
}

#[tokio.main]
async fn main() {
    let fastest = await fetchFastest()
    println!("Got response: \(fastest)")
}

async fn fetchFromServerA(): String {
    await sleep(Duration.fromMillis(100))
    "Response from A".toString()
}

async fn fetchFromServerB(): String {
    await sleep(Duration.fromMillis(200))
    "Response from B".toString()
}

The select! macro races the futures and returns when the first one completes. The other futures are cancelled.

Select with Timeouts

A common pattern is racing an operation against a timeout:

import tokio
import tokio.time.timeout

async fn fetchWithTimeout(url: &str): Result<Response, Error> {
    match await timeout(Duration.fromSecs(5), fetch(url)) {
        Ok(response) -> Ok(response),
        Err(elapsed) -> Err(Error.Timeout(elapsed)),
    }
}

// Or using select! directly:
async fn fetchWithTimeoutSelect(url: &str): Result<Response, Error> {
    await tokio.select! {
        response = fetch(url) -> Ok(response),
        _ = sleep(Duration.fromSecs(5)) -> Err(Error.Timeout),
    }
}

Fair Scheduling with join!

The join! macro provides fair scheduling between futures:

import tokio

#[tokio.main]
async fn main() {
    await tokio.join!(
        countTo("A", 5),
        countTo("B", 5),
    )
}

async fn countTo(name: &str, n: Int) {
    for i in 1..=n {
        println!("\(name): \(i)")
        await tokio.task.yieldNow()  // Let other tasks run
    }
}

Output (interleaved):

A: 1
B: 1
A: 2
B: 2
A: 3
B: 3
...

The yieldNow() function explicitly yields control to the runtime, allowing other tasks to make progress.

Handling Multiple Channels

Use select! to handle messages from multiple sources:

import tokio
import tokio.sync.mpsc

#[tokio.main]
async fn main() {
    let (tx1, mut rx1) = mpsc.channel(10)
    let (tx2, mut rx2) = mpsc.channel(10)

    // Spawn producers
    tokio.spawn(async move {
        for i in 0..3 {
            await tx1.send(format!("From channel 1: \(i)"))
            await sleep(Duration.fromMillis(100))
        }
    })

    tokio.spawn(async move {
        for i in 0..3 {
            await tx2.send(format!("From channel 2: \(i)"))
            await sleep(Duration.fromMillis(150))
        }
    })

    // Handle messages from both channels
    loop {
        await tokio.select! {
            msg = rx1.recv() -> {
                match msg {
                    Some(m) -> println!("RX1: \(m)"),
                    null -> break,
                }
            },
            msg = rx2.recv() -> {
                match msg {
                    Some(m) -> println!("RX2: \(m)"),
                    null -> break,
                }
            },
        }
    }
}

Shared State Between Tasks

For shared mutable state, use tokio.sync.Mutex:

import tokio
import tokio.sync.Mutex
import std.sync.Arc

#[tokio.main]
async fn main() {
    let counter = Arc.new(Mutex.new(0))

    var handles = vec![]

    for _ in 0..10 {
        let counter = Arc.clone(&counter)
        let handle = tokio.spawn(async move {
            for _ in 0..100 {
                var lock = await counter.lock()
                *lock += 1
            }
        })
        handles.push(handle)
    }

    for handle in handles {
        await handle.unwrap()
    }

    println!("Counter: \(await counter.lock())")  // Prints: Counter: 1000
}

Note: tokio.sync.Mutex is designed for async code. It allows the task to yield while waiting for the lock, unlike std.sync.Mutex which blocks the thread.

Task Cancellation

When you drop a future before it completes, it's cancelled:

import tokio

#[tokio.main]
async fn main() {
    let handle = tokio.spawn(async {
        println!("Task starting...")
        await sleep(Duration.fromSecs(10))
        println!("Task complete!")  // Never printed if cancelled
    })

    // Wait a bit then cancel
    await sleep(Duration.fromSecs(1))
    handle.abort()  // Cancel the task

    // Check if it was cancelled
    match await handle {
        Ok(_) -> println!("Task completed"),
        Err(e) if e.isCancelled() -> println!("Task was cancelled"),
        Err(e) -> println!("Task failed: \(e)"),
    }
}

Cancellation happens at await points - if a task is in the middle of non-async code when cancelled, it will continue until the next await.

Summary

This section covered key concurrency patterns:

  • join! runs multiple futures concurrently and waits for all
  • tokio.spawn creates independent background tasks
  • Channels (mpsc) enable message passing between tasks
  • select! races futures and returns the first to complete
  • yieldNow() explicitly yields control to other tasks
  • Async Mutex provides safe shared mutable state
  • Task cancellation happens when futures are dropped

Remember: all await expressions use prefix syntax in Oxide. The examples above consistently show await tokio.join!(...), await tx.send(...), and similar patterns.

In the next section, we'll explore more advanced future patterns including custom timeouts and composing futures in sophisticated ways.