Using Message Passing to Transfer Data Between Threads

One increasingly popular approach to ensuring safe concurrency is message passing, where threads communicate by sending each other messages containing data. Here's the idea in a slogan from the Go language documentation: "Do not communicate by sharing memory; instead, share memory by communicating."

To accomplish message-sending concurrency, Oxide's standard library provides an implementation of channels. A channel is a programming concept by which data is sent from one thread to another.

You can imagine a channel in programming as being like a directional channel of water, such as a stream or a river. If you put something like a rubber duck into a river, it will travel downstream to the end of the waterway.

A channel has two halves: a transmitter and a receiver. The transmitter half is the upstream location where you put rubber ducks into the river, and the receiver half is where the rubber duck ends up downstream. One part of your code calls methods on the transmitter with the data you want to send, and another part checks the receiving end for arriving messages. A channel is said to be closed if either the transmitter or receiver half is dropped.

Creating a Channel

Let's start by creating a channel that doesn't do anything:

import std.sync.mpsc

fn main() {
    let (tx, rx) = mpsc.channel()
}

We create a new channel using the mpsc.channel function. The name mpsc stands for multiple producer, single consumer. This means a channel can have multiple sending ends that produce values but only one receiving end that consumes those values. Think of multiple streams flowing into one big river: everything sent down any of the streams will end up in one river at the end.

The mpsc.channel function returns a tuple, the first element of which is the transmitter (often called tx) and the second element is the receiver (often called rx). We use let with a pattern to destructure the tuple.

Let's move the transmitting end into a spawned thread and have it send one string so the spawned thread is communicating with the main thread:

import std.sync.mpsc
import std.thread

fn main() {
    let (tx, rx) = mpsc.channel()

    thread.spawn move {
        let val = "hi".toString()
        tx.send(val).unwrap()
    }
}

We're using thread.spawn to create a new thread and then using move to move tx into the closure so the spawned thread owns tx. The spawned thread needs to own the transmitter to send messages through the channel.

The transmitter has a send method that takes the value we want to send. The send method returns a Result<T, E> type, so if the receiver has already been dropped and there's nowhere to send a value, the send operation will return an error. In this example, we're calling unwrap to panic in case of an error.
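To make the error case concrete, here is a minimal sketch of what happens when send is called after the receiver has been dropped. It is written in Rust rather than Oxide, on the assumption (taken from the Rust comparison in this chapter) that the two channel APIs behave the same way; the helper name send_after_receiver_dropped is hypothetical.

```rust
use std::sync::mpsc;

/// Sends `msg` after the receiver has been dropped and reports the outcome.
/// On failure, the unsent value is handed back to the caller.
fn send_after_receiver_dropped(msg: String) -> Result<(), String> {
    let (tx, rx) = mpsc::channel::<String>();
    drop(rx); // close the receiving half before sending

    // `SendError` wraps the value that could not be delivered.
    tx.send(msg).map_err(|mpsc::SendError(unsent)| unsent)
}

fn main() {
    match send_after_receiver_dropped(String::from("hi")) {
        Ok(()) => println!("sent"),
        Err(unsent) => println!("receiver is gone; got back: {unsent}"),
    }
}
```

Matching on the Result instead of calling unwrap lets the sending thread recover the value and decide what to do with it, rather than panicking.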

Receiving Values from the Channel

Now let's receive the value in the main thread:

import std.sync.mpsc
import std.thread

fn main() {
    let (tx, rx) = mpsc.channel()

    thread.spawn move {
        let val = "hi".toString()
        tx.send(val).unwrap()
    }

    let received = rx.recv().unwrap()
    println!("Got: \(received)")
}

The receiver has two useful methods: recv and tryRecv. We're using recv, short for receive, which will block the main thread's execution and wait until a value is sent down the channel. Once a value is sent, recv will return it in a Result<T, E>. Once every transmitter has been dropped and any messages already sent have been received, recv will return an error to signal that no more values will be coming.
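The two outcomes of recv can be seen side by side in the following sketch. It uses Rust syntax, assuming Oxide's channel behaves the same way, and the helper name recv_twice_after_drop is hypothetical. A message sent before the transmitter was dropped is still delivered; only the call after that reports an error.

```rust
use std::sync::mpsc::{self, RecvError};

/// Sends one message, drops the transmitter, then calls `recv` twice.
fn recv_twice_after_drop() -> (Result<String, RecvError>, Result<String, RecvError>) {
    let (tx, rx) = mpsc::channel();
    tx.send(String::from("hi")).unwrap();
    drop(tx); // no transmitters remain after this point

    let first = rx.recv(); // the message already in the channel
    let second = rx.recv(); // nothing left and no transmitter: an error
    (first, second)
}

fn main() {
    let (first, second) = recv_twice_after_drop();
    println!("first: {first:?}");
    println!("second: {second:?}");
}
```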

The tryRecv method doesn't block, but will instead return a Result<T, E> immediately: an Ok value holding a message if one is available and an Err value if there aren't any messages this time. Using tryRecv is useful if this thread has other work to do while waiting for messages: we could write a loop that calls tryRecv every so often, handles a message if one is available, and otherwise does other work for a little while until checking again.
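The polling loop described above might look like the following sketch. It is written in Rust, where the method is spelled try_recv, and assumes Oxide's tryRecv distinguishes "empty" from "disconnected" in the same way. The helper name poll_until_closed, the 10-millisecond sleep, and the idle counter are stand-ins chosen for illustration.

```rust
use std::sync::mpsc::{self, TryRecvError};
use std::thread;
use std::time::Duration;

/// Polls the channel without blocking. Returns the messages received and
/// the number of times the channel was found empty.
fn poll_until_closed(rx: mpsc::Receiver<String>) -> (Vec<String>, u32) {
    let mut messages = Vec::new();
    let mut idle_ticks = 0;
    loop {
        match rx.try_recv() {
            Ok(msg) => messages.push(msg),
            Err(TryRecvError::Empty) => {
                // Nothing available yet: a stand-in for "other work".
                idle_ticks += 1;
                thread::sleep(Duration::from_millis(10));
            }
            // Every transmitter is gone and the channel is drained.
            Err(TryRecvError::Disconnected) => break,
        }
    }
    (messages, idle_ticks)
}

fn main() {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        thread::sleep(Duration::from_millis(50));
        tx.send(String::from("hi")).unwrap();
    });

    let (messages, idle_ticks) = poll_until_closed(rx);
    println!("Got {messages:?} after {idle_ticks} idle ticks");
}
```

Handling the two error variants separately matters: treating "empty" as "closed" would end the loop before the first message arrives.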

We've used recv in this example for simplicity; we don't have any other work for the main thread to do other than wait for messages, so blocking the main thread is appropriate.

When we run this code, we'll see the value printed from the main thread:

Got: hi

Channels and Ownership Transference

The ownership rules play a vital role in message sending because they help you write safe, concurrent code. Preventing errors in concurrent programming is the advantage of thinking about ownership throughout your Oxide programs. Let's do an experiment to show how channels and ownership work together to prevent problems.

Consider what would happen if we tried to use val in the spawned thread after we've sent it down the channel:

import std.sync.mpsc
import std.thread

fn main() {
    let (tx, rx) = mpsc.channel()

    thread.spawn move {
        let val = "hi".toString()
        tx.send(val).unwrap()
        println!("val is \(val)")  // Error!
    }

    let received = rx.recv().unwrap()
    println!("Got: \(received)")
}

Here, we try to print val after we've sent it down the channel via tx.send. Allowing this would be a bad idea: once the value has been sent to another thread, that thread could modify or drop it before we try to use the value again. Potentially, the other thread's modifications could cause errors or unexpected results due to inconsistent or nonexistent data.

The compiler catches this mistake:

error[E0382]: borrow of moved value: `val`
  --> src/main.ox:10:28
   |
8  |         let val = "hi".toString()
   |             --- move occurs because `val` has type `String`, which does
   |                 not implement the `Copy` trait
9  |         tx.send(val).unwrap()
   |                 --- value moved here
10 |         println!("val is \(val)")
   |                            ^^^ value borrowed here after move

Our concurrency mistake has caused a compile-time error. The send method takes ownership of its argument, and when the value is moved, the receiver takes ownership of it. This stops us from accidentally using the value again after sending it; the ownership system checks that everything is okay.
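If the sending thread really does need the value afterward, one option is to send a clone and keep the original. The sketch below shows this in Rust, assuming Oxide's String can be cloned the same way; the helper name send_copy_and_keep_original is hypothetical. The two threads end up with independent copies, so neither can affect the other's data.

```rust
use std::sync::mpsc;
use std::thread;

/// Sends a clone of `val` so the spawned thread can keep using its own copy.
/// Returns (the value kept by the sender, the value received).
fn send_copy_and_keep_original() -> (String, String) {
    let (tx, rx) = mpsc::channel();

    let handle = thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val.clone()).unwrap(); // the clone moves into the channel
        val // the original was never moved, so it is still usable here
    });

    let received = rx.recv().unwrap();
    let kept = handle.join().unwrap();
    (kept, received)
}

fn main() {
    let (kept, received) = send_copy_and_keep_original();
    println!("val is {kept}");
    println!("Got: {received}");
}
```

Cloning costs an allocation and a copy, so it is worth doing only when both sides need the data.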

Sending Multiple Values and Seeing the Receiver Waiting

The previous code compiled and ran, but it didn't clearly show us that two separate threads were talking to each other over the channel. Let's make a modification that will prove the code is running concurrently: the spawned thread will send multiple messages and pause for a second between each message:

import std.sync.mpsc
import std.thread
import std.time.Duration

fn main() {
    let (tx, rx) = mpsc.channel()

    thread.spawn move {
        let vals = vec![
            "hi".toString(),
            "from".toString(),
            "the".toString(),
            "thread".toString(),
        ]

        for val in vals {
            tx.send(val).unwrap()
            thread.sleep(Duration.fromSecs(1))
        }
    }

    for received in rx {
        println!("Got: \(received)")
    }
}

This time, the spawned thread has a vector of strings that we want to send to the main thread. We iterate over them, sending each individually, and pause between each by calling the thread.sleep function with a Duration value of 1 second.

In the main thread, we're not calling the recv method explicitly anymore: instead, we're treating rx as an iterator. For each value received, we print it. When the channel is closed, iteration will end.

When running this code, you should see the following output with a 1-second pause between each line:

Got: hi
Got: from
Got: the
Got: thread

Because we don't have any code that pauses or delays in the for loop in the main thread, we can tell that the main thread is waiting to receive values from the spawned thread.
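Iterating over the receiver behaves like calling recv repeatedly until it returns an error. The Rust sketch below spells that loop out by hand, assuming Oxide's receiver iterator works the same way; the helper name drain_with_recv is hypothetical.

```rust
use std::sync::mpsc;
use std::thread;

/// Receives messages with explicit `recv` calls until the channel closes.
fn drain_with_recv(rx: mpsc::Receiver<String>) -> Vec<String> {
    let mut out = Vec::new();
    // `recv` blocks for each message and returns `Err` once the channel
    // is closed, which ends the loop.
    while let Ok(received) = rx.recv() {
        out.push(received);
    }
    out
}

fn main() {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        for word in ["hi", "from", "the", "thread"] {
            tx.send(word.to_string()).unwrap();
        }
        // `tx` is dropped when this closure returns, closing the channel.
    });

    for received in drain_with_recv(rx) {
        println!("Got: {received}");
    }
}
```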

Creating Multiple Producers by Cloning the Transmitter

Earlier we mentioned that mpsc stands for multiple producer, single consumer. Let's put mpsc to use and expand the code to create multiple threads that all send values to the same receiver. We can do so by cloning the transmitter:

import std.sync.mpsc
import std.thread
import std.time.Duration

fn main() {
    let (tx, rx) = mpsc.channel()

    // Clone the transmitter for the second thread
    let tx1 = tx.clone()

    // First producer thread
    thread.spawn move {
        let vals = vec![
            "hi".toString(),
            "from".toString(),
            "the".toString(),
            "thread".toString(),
        ]

        for val in vals {
            tx1.send(val).unwrap()
            thread.sleep(Duration.fromSecs(1))
        }
    }

    // Second producer thread
    thread.spawn move {
        let vals = vec![
            "more".toString(),
            "messages".toString(),
            "for".toString(),
            "you".toString(),
        ]

        for val in vals {
            tx.send(val).unwrap()
            thread.sleep(Duration.fromSecs(1))
        }
    }

    for received in rx {
        println!("Got: \(received)")
    }
}

This time, before we create the first spawned thread, we call clone on the transmitter. This will give us a new transmitter we can pass to the first spawned thread. We pass the original transmitter to a second spawned thread. This gives us two threads, each sending different messages to the one receiver.

When you run the code, your output will probably look something like this:

Got: hi
Got: more
Got: from
Got: messages
Got: the
Got: for
Got: thread
Got: you

You might see the values in another order, depending on your system. This is what makes concurrency interesting as well as difficult. If you experiment with thread.sleep, giving it various values in the different threads, the interleaving becomes less predictable and the output may differ from one run to the next.
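The same pattern extends to any number of producers. The Rust sketch below, which assumes Oxide's clone and drop behave as they do in Rust, spawns the producers in a loop and tags each message with a producer id and a sequence number; the helper name run_producers and the tagging scheme are illustrative choices. Because every thread receives a clone, the original transmitter has to be dropped explicitly, or the receiving loop would never see the channel close.

```rust
use std::sync::mpsc;
use std::thread;

/// Spawns `producers` threads that each send `per_producer` messages,
/// tagged as (producer id, sequence number), and collects all of them.
fn run_producers(producers: usize, per_producer: usize) -> Vec<(usize, usize)> {
    let (tx, rx) = mpsc::channel();

    for id in 0..producers {
        let tx = tx.clone(); // each thread owns its own transmitter
        thread::spawn(move || {
            for seq in 0..per_producer {
                tx.send((id, seq)).unwrap();
            }
        });
    }

    // Drop the original so the channel closes once every clone is gone;
    // otherwise the collection below would wait forever.
    drop(tx);

    rx.iter().collect()
}

fn main() {
    let received = run_producers(3, 4);
    println!("Got {} messages: {received:?}", received.len());
}
```

Messages from any one producer arrive in the order that producer sent them; only the interleaving between producers varies from run to run.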

Rust Comparison

The channel API works the same way in Rust and Oxide; only the path syntax and method-name casing differ:

Concept              Rust                     Oxide
Import               use std::sync::mpsc;     import std.sync.mpsc
Create channel       mpsc::channel()          mpsc.channel()
Send                 tx.send(val)             tx.send(val)
Receive              rx.recv()                rx.recv()
Try receive          rx.try_recv()            rx.tryRecv()
Clone transmitter    tx.clone()               tx.clone()

The ownership transfer semantics are identical: when you send a value through a channel, ownership moves to the receiver. This prevents data races by ensuring only one thread can access the data at a time.

Summary

  • Channels allow threads to communicate by passing messages
  • Use mpsc.channel() to create a transmitter/receiver pair
  • send transfers ownership of the value to the channel
  • recv blocks until a value is available; tryRecv returns immediately
  • Clone the transmitter to create multiple producers
  • The receiver can be used as an iterator
  • Ownership rules prevent accessing values after they're sent