Converting to a Multithreaded Web Server

The single-threaded server works, but it can only handle one request at a time. Let's add concurrency using a thread pool so multiple clients can be served simultaneously.

The Challenge

We could spawn a new thread for each incoming connection:

for stream in listener.incoming() {
    let stream = stream.expect("Failed to accept connection")

    thread.spawn(move {
        handleConnection(stream)
    })
}

While this works, it's inefficient. Every spawned thread allocates its own stack and kernel resources, and the operating system can only schedule so many threads before performance degrades.

Thread Pool Design

A better approach is a thread pool: a fixed number of worker threads that wait for jobs.

The workflow:

  1. Main thread accepts connections
  2. Main thread puts the work (handling a connection) in a job queue
  3. Worker threads take jobs from the queue and process them
  4. When a job is done, the worker waits for the next job
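
To make this concrete, here is the interface we are building toward (a sketch; ThreadPool.new and execute are implemented over the rest of this chapter):

let pool = ThreadPool.new(4)

// Instead of spawning a thread per connection,
// hand the closure to an idle worker.
pool.execute {
    println!("Handled by one of the four workers")
}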

Creating the Library Structure

Create src/lib.ox:

import std.sync.{mpsc, Arc, Mutex}
import std.thread

public struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc.Sender<Message>,
}

struct Worker {
    id: UIntSize,
    thread: thread.JoinHandle<()>?,
}

enum Message {
    NewJob(Job),
    Terminate,
}

type Job = Box<dyn Fn() + Send + 'static>

extension ThreadPool {
    public static fn new(size: UIntSize): ThreadPool {
        assert!(size > 0, "Thread pool size must be greater than 0")

        let (sender, receiver) = mpsc.channel()
        let receiver = Arc.new(Mutex.new(receiver))

        var workers = Vec.new()

        for id in 0..size {
            workers.push(Worker.new(id, Arc.clone(&receiver)))
        }

        ThreadPool { workers, sender }
    }

    public fn execute(f: Job) {
        let message = Message.NewJob(f)
        self.sender.send(message).expect("Failed to send job")
    }
}

extension Worker {
    fn new(id: UIntSize, receiver: Arc<Mutex<mpsc.Receiver<Message>>>): Worker {
        let thread = thread.spawn(move {
            loop {
                let message = receiver.lock().expect("Mutex poisoned").recv().expect("Failed to receive message")

                match message {
                    Message.NewJob(job) -> {
                        println!("Worker \(id) got a job; executing.")
                        job()
                    }
                    Message.Terminate -> {
                        println!("Worker \(id) was told to terminate.")
                        break
                    }
                }
            }
        })

        Worker { id, thread: Some(thread) }
    }
}

extension ThreadPool: Drop {
    mutating fn drop() {
        println!("Sending terminate message to all workers.")

        for _ in &self.workers {
            self.sender.send(Message.Terminate).expect("Failed to send terminate message")
        }

        println!("Shutting down all workers.")

        for worker in &mut self.workers {
            println!("Shutting down worker \(worker.id)")

            if let Some(thread) = worker.thread.take() {
                thread.join().expect("Failed to join worker thread")
            }
        }
    }
}

Understanding the Design

The Job Type

type Job = Box<dyn Fn() + Send + 'static>

This defines a type alias for:

  • Box<...> - A boxed value on the heap
  • dyn Fn() - A dynamic trait object for a function that takes no arguments and returns nothing
  • Send - The function can be sent between threads
  • 'static - The function has no borrowed data with a limited lifetime
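
As a sketch (assuming closures can be boxed with Box.new, in the same style as the listings above), any closure meeting these bounds can become a Job:

// A closure that captures no borrowed data satisfies Send + 'static.
let job: Job = Box.new {
    println!("Running a queued job")
}

// Calling the boxed closure executes the job, just as the workers do.
job()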

The Message Enum

enum Message {
    NewJob(Job),
    Terminate,
}

The job queue sends Message enums:

  • NewJob(job) - A new job to execute
  • Terminate - Signal to shut down

Thread Pool Creation

public static fn new(size: UIntSize): ThreadPool {
    let (sender, receiver) = mpsc.channel()
    let receiver = Arc.new(Mutex.new(receiver))

    var workers = Vec.new()

    for id in 0..size {
        workers.push(Worker.new(id, Arc.clone(&receiver)))
    }

    ThreadPool { workers, sender }
}

We:

  • Create a multi-producer, single-consumer channel
  • Wrap the receiver in Arc<Mutex<...>> so multiple threads can share it
  • Spawn size worker threads, each with a clone of the receiver
  • Return the thread pool with the sender
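
The wrapping matters because mpsc is single-consumer: the receiver cannot be cloned, so the workers must share the one receiver. A sketch of how the sharing works:

// Arc provides shared ownership across threads;
// Mutex ensures only one worker reads from the receiver at a time.
let (sender, receiver) = mpsc.channel()
let receiver = Arc.new(Mutex.new(receiver))

// Each clone is a new handle to the same Mutex-protected receiver.
let forWorker0 = Arc.clone(&receiver)
let forWorker1 = Arc.clone(&receiver)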

Worker Thread Loop

let thread = thread.spawn(move {
    loop {
        let message = receiver.lock().expect("Mutex poisoned").recv().expect("Failed to receive message")

        match message {
            Message.NewJob(job) -> {
                job()
            }
            Message.Terminate -> {
                break
            }
        }
    }
})

Each worker:

  • Continuously loops waiting for messages
  • Locks the mutex to access the receiver
  • Blocks until a message arrives
  • Either executes the job or terminates
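
One subtlety: the lock guard returned by lock() in the statement above is a temporary, so (assuming Rust-style temporary lifetimes) it is released as soon as recv() returns, before the job runs. Holding the guard in a named binding instead would serialize the workers, as this sketch illustrates:

// Correct: the guard is dropped at the end of the statement,
// so other workers can receive jobs while this one executes.
let message = receiver.lock().expect("Mutex poisoned").recv().expect("Failed to receive message")

// Problematic: the named guard keeps the receiver locked for as long
// as it stays in scope, blocking every other worker in the meantime.
let guard = receiver.lock().expect("Mutex poisoned")
let message = guard.recv().expect("Failed to receive message")
// ...if the job ran here, no other worker could receive until guard is dropped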

Sending Jobs

public fn execute(f: Job) {
    let message = Message.NewJob(f)
    self.sender.send(message).expect("Failed to send job")
}

The main thread sends jobs through the channel. Thanks to mpsc (multi-producer), multiple threads could send jobs (though in our case only the main thread does).
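
For example (a sketch, assuming the sender can be cloned like its Rust mpsc counterpart), a second producer would get its own clone of the sender:

let (sender, receiver) = mpsc.channel()

// Every clone of the sender feeds the same queue.
let sender2 = sender.clone()
thread.spawn(move {
    let job: Job = Box.new { println!("Submitted from another thread") }
    sender2.send(Message.NewJob(job)).expect("Failed to send job")
})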

Graceful Shutdown

extension ThreadPool: Drop {
    mutating fn drop() {
        for _ in &self.workers {
            self.sender.send(Message.Terminate).expect("Failed to send terminate message")
        }

        for worker in &mut self.workers {
            if let Some(thread) = worker.thread.take() {
                thread.join().expect("Failed to join worker thread")
            }
        }
    }
}

When the thread pool is dropped (goes out of scope):

  • Send a Terminate message to each worker
  • Wait for each worker thread to finish with join()

This ensures clean shutdown.
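
For example (a sketch), the shutdown sequence runs automatically when the pool goes out of scope:

{
    let pool = ThreadPool.new(2)
    pool.execute { println!("job 1") }
    pool.execute { println!("job 2") }
}   // pool dropped here: Terminate is sent to both workers, then both are joined

println!("All workers have shut down.")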

Using the Thread Pool in main.ox

Update src/main.ox:

import webserver.ThreadPool

import std.io.{BufRead, BufReader, Write}
import std.net.{TcpListener, TcpStream}
import std.fs.readToString

fn main() {
    let listener = TcpListener.bind("127.0.0.1:7878").expect("Failed to bind to port 7878")
    let pool = ThreadPool.new(4)

    println!("Server listening on http://127.0.0.1:7878")

    for stream in listener.incoming() {
        let stream = stream.expect("Failed to accept connection")

        pool.execute {
            handleConnection(stream)
        }
    }
}

fn handleConnection(var stream: TcpStream) {
    let bufReader = BufReader.new(&mut stream)
    let requestLine = bufReader.lines().next().expect("Should have first line")
        .expect("Should read first line")

    let (status, filename) = if requestLine == "GET / HTTP/1.1" {
        ("200 OK", "hello.html")
    } else {
        ("404 NOT FOUND", "404.html")
    }

    let contents = readToString(filename).unwrapOrElse { _ -> "Error reading file".toString() }
    let length = contents.len()

    let response = "HTTP/1.1 \(status)\r\nContent-Length: \(length)\r\n\r\n\(contents)"
    stream.writeAll(response.asBytes()).expect("Failed to write response")
}

Key Changes

  1. Create a thread pool with 4 workers:

    let pool = ThreadPool.new(4)
    
  2. Execute jobs in the pool instead of spawning threads:

    pool.execute {
        handleConnection(stream)
    }
    

The closure captures the stream and will be executed by one of the worker threads.

Testing the Multithreaded Server

Compile and run:

cargo run

Open multiple browser tabs to http://127.0.0.1:7878. The server can now handle them concurrently!

You can also test with concurrent curl requests:

curl http://127.0.0.1:7878 &
curl http://127.0.0.1:7878 &
curl http://127.0.0.1:7878 &
curl http://127.0.0.1:7878 &
wait

All four requests should complete without waiting for the previous one to finish.

How the Thread Pool Handles Concurrency

When you make multiple requests:

  1. The main thread accepts each connection
  2. Instead of spawning a new thread, it sends the connection to the thread pool
  3. An available worker thread takes the job from the queue
  4. The worker processes the request while the main thread can accept more connections
  5. With 4 workers, up to 4 requests are handled simultaneously
  6. Additional requests queue up and are processed when workers become free
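
You can make the queueing visible with slow jobs (a sketch, assuming a thread.sleep function and a Duration type analogous to the std modules imported earlier):

import std.time.Duration

let pool = ThreadPool.new(4)

// 8 one-second jobs on 4 workers: the first 4 start immediately,
// the remaining 4 wait in the queue until a worker frees up.
for i in 0..8 {
    pool.execute {
        println!("Job \(i) started")
        thread.sleep(Duration.fromSecs(1))
        println!("Job \(i) finished")
    }
}

When the pool is dropped at the end of the scope, the Terminate messages queue behind the remaining jobs, so all eight jobs finish before shutdown: roughly two seconds in total rather than eight.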

This is much more efficient than creating a new thread per request!

Performance Improvement

With the thread pool approach:

  • Fixed resources - Always 4 threads running, not thousands
  • Better latency - No thread creation overhead per request
  • Predictable performance - System resources are bounded
  • Concurrent handling - Multiple requests processed simultaneously

Summary

We've implemented a thread pool that:

  • Maintains a fixed number of worker threads
  • Uses channels for job distribution
  • Provides graceful shutdown
  • Enables concurrent request handling

Next, we'll add a mechanism to gracefully stop the server.