Implementing Graceful Shutdown

Currently, the server runs forever. Pressing Ctrl+C terminates it abruptly. Let's implement a graceful shutdown that finishes in-flight requests before terminating.

The Problem

When you press Ctrl+C, the OS delivers an interrupt signal (SIGINT on Unix-like systems) whose default action terminates the process immediately. Any workers processing requests are cut off mid-execution. In a real server, you want to:

  1. Stop accepting new connections
  2. Let existing requests complete
  3. Clean up resources
  4. Exit cleanly

Improving the Thread Pool

We need to extend our thread pool to support an orderly shutdown. Let's modify src/lib.ox:

import std.sync.{mpsc, Arc, Mutex}
import std.thread

public struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc.Sender<Message>,
}

struct Worker {
    id: UIntSize,
    thread: thread.JoinHandle<()>?,
}

enum Message {
    NewJob(Job),
    Terminate,
}

type Job = Box<dyn Fn() + Send + 'static>

extension ThreadPool {
    public static fn new(size: UIntSize): ThreadPool {
        assert!(size > 0, "Thread pool size must be greater than 0")

        let (sender, receiver) = mpsc.channel()
        let receiver = Arc.new(Mutex.new(receiver))

        var workers = Vec.new()

        for id in 0..size {
            workers.push(Worker.new(id, Arc.clone(&receiver)))
        }

        ThreadPool { workers, sender }
    }

    public fn execute<F>(f: F)
    where
        F: Fn() + Send + 'static,
    {
        let job = Box.new(f)
        let message = Message.NewJob(job)
        self.sender.send(message).expect("Failed to send job")
    }

    // Graceful shutdown: finish existing jobs then terminate
    public consuming fn shutdown() {
        println!("Sending terminate message to all workers.")

        for _ in &self.workers {
            self.sender.send(Message.Terminate).expect("Failed to send terminate")
        }

        println!("Shutting down all workers.")

        for worker in &mut self.workers {
            println!("Shutting down worker \(worker.id)")

            if let Some(thread) = worker.thread.take() {
                thread.join().expect("Failed to join worker thread")
            }
        }
    }
}

extension Worker {
    fn new(id: UIntSize, receiver: Arc<Mutex<mpsc.Receiver<Message>>>): Worker {
        let thread = thread.spawn(move {
            loop {
                let message = receiver
                    .lock()
                    .expect("Mutex poisoned")
                    .recv()
                    .expect("Failed to receive message")

                match message {
                    Message.NewJob(job) -> {
                        println!("Worker \(id) got a job; executing.")
                        job()
                    }
                    Message.Terminate -> {
                        println!("Worker \(id) was told to terminate.")
                        break
                    }
                }
            }
        })

        Worker { id, thread: Some(thread) }
    }
}

// Graceful shutdown via Drop trait
extension ThreadPool: Drop {
    mutating fn drop() {
        // Intentionally empty: shutdown must be requested explicitly via
        // shutdown(). A pool dropped without calling it leaves its worker
        // threads running; they simply detach when the process exits.
    }
}
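Oxide's concurrency primitives mirror Rust's standard library, so the shutdown logic can be cross-checked in Rust. The sketch below (with assumed names such as Pool, not the chapter's API) shows the alternative design that the empty Drop above deliberately opts out of: performing the graceful shutdown automatically when the pool goes out of scope.

```rust
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

enum Message {
    NewJob(Box<dyn FnOnce() + Send + 'static>),
    Terminate,
}

pub struct Pool {
    workers: Vec<Option<thread::JoinHandle<()>>>,
    sender: mpsc::Sender<Message>,
}

impl Pool {
    pub fn new(size: usize) -> Pool {
        let (sender, receiver) = mpsc::channel::<Message>();
        let receiver = Arc::new(Mutex::new(receiver));
        let workers = (0..size)
            .map(|_| {
                let rx = Arc::clone(&receiver);
                Some(thread::spawn(move || loop {
                    // Lock, wait for one message, then release the lock
                    // when this statement's temporary guard is dropped.
                    let message = rx.lock().unwrap().recv().unwrap();
                    match message {
                        Message::NewJob(job) => job(),
                        Message::Terminate => break,
                    }
                }))
            })
            .collect();
        Pool { workers, sender }
    }

    pub fn execute<F: FnOnce() + Send + 'static>(&self, f: F) {
        self.sender.send(Message::NewJob(Box::new(f))).unwrap();
    }
}

impl Drop for Pool {
    // Dropping the pool performs the graceful shutdown automatically:
    // queue one Terminate per worker, then join every thread.
    fn drop(&mut self) {
        for _ in &self.workers {
            self.sender.send(Message::Terminate).unwrap();
        }
        for worker in &mut self.workers {
            if let Some(handle) = worker.take() {
                handle.join().unwrap();
            }
        }
    }
}
```

Either design works; the explicit shutdown() used in this chapter just makes the moment of shutdown visible at the call site instead of tying it to scope exit.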

Controlling Server Shutdown

The key improvement is the explicit shutdown() method. Now we can control when the server stops accepting requests.

Update src/main.ox to handle a fixed number of requests before shutdown:

import webserver.ThreadPool

import std.io.{BufRead, BufReader, Write}
import std.net.{TcpListener, TcpStream}
import std.fs.readToString

fn main() {
    let listener = TcpListener.bind("127.0.0.1:7878").expect("Failed to bind to port 7878")
    let pool = ThreadPool.new(4)

    println!("Server listening on http://127.0.0.1:7878")
    println!("The server will accept 2 requests, then shut down gracefully.")

    for stream in listener.incoming().take(2) {
        let stream = stream.expect("Failed to accept connection")

        pool.execute {
            handleConnection(stream)
        }
    }

    println!("Shutting down server.")
    pool.shutdown()
}

fn handleConnection(mut stream: TcpStream) {
    let bufReader = BufReader.new(&mut stream)
    let requestLine = bufReader.lines().next().expect("Should have first line")
        .expect("Should read first line")

    let (status, filename) = if requestLine == "GET / HTTP/1.1" {
        ("200 OK", "hello.html")
    } else {
        ("404 NOT FOUND", "404.html")
    }

    let contents = readToString(filename).unwrapOrElse { _ -> "Error reading file".toString() }
    let length = contents.len()

    let response = "HTTP/1.1 \(status)\r\nContent-Length: \(length)\r\n\r\n\(contents)"
    stream.writeAll(response.asBytes()).expect("Failed to write response")
}
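The routing and response-building logic is independent of the socket, which makes it easy to check in isolation. Here is a Rust sketch of those two decisions; the route and build_response helpers are illustrative names, not part of the server's API.

```rust
// Map a request line to a status line and filename
// (mirrors the if/else in handleConnection).
fn route(request_line: &str) -> (&'static str, &'static str) {
    if request_line == "GET / HTTP/1.1" {
        ("200 OK", "hello.html")
    } else {
        ("404 NOT FOUND", "404.html")
    }
}

// Build the full HTTP response for a body already read from disk.
// Oxide's \(...) interpolation becomes format! in Rust.
fn build_response(status: &str, contents: &str) -> String {
    format!(
        "HTTP/1.1 {}\r\nContent-Length: {}\r\n\r\n{}",
        status,
        contents.len(),
        contents
    )
}
```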

Key Changes

  1. Limited incoming connections:

    for stream in listener.incoming().take(2) {
    

    The .take(2) method limits iteration to 2 items. After 2 requests, the loop exits.

  2. Explicit shutdown:

    pool.shutdown()
    

    Call the shutdown method to gracefully terminate all workers.
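take is a general-purpose iterator adapter rather than anything specific to TCP listeners. The same behaviour in Rust, wrapped in a small helper (first_n is a hypothetical name used only for this illustration):

```rust
// Collect at most `n` items from any iterator, mirroring
// `listener.incoming().take(2)` in the server's accept loop.
fn first_n<I: Iterator>(iter: I, n: usize) -> Vec<I::Item> {
    iter.take(n).collect()
}
```

Note that take stops pulling from the source even when the source is endless, which is exactly why the accept loop exits after two connections.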

Advanced: Handling Signals (for Real Servers)

For a production server, you'd want to respond to signals like SIGTERM. Here's how you could handle that:

import std.sync.atomic.{AtomicBool, Ordering}
import std.sync.Arc

fn main() {
    let listener = TcpListener.bind("127.0.0.1:7878").expect("Failed to bind to port 7878")
    let pool = ThreadPool.new(4)
    let shouldRun = Arc.new(AtomicBool.new(true))

    // In a real app, you'd set shouldRun.store(false, Ordering.SeqCst)
    // when a signal handler is invoked

    println!("Server listening on http://127.0.0.1:7878")

    for stream in listener.incoming() {
        if !shouldRun.load(Ordering.SeqCst) {
            println!("Received shutdown signal, stopping acceptance of new connections")
            break
        }

        let stream = stream.expect("Failed to accept connection")

        pool.execute {
            handleConnection(stream)
        }
    }

    println!("Shutting down server.")
    pool.shutdown()
}
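The flag-checking loop can be exercised without a real signal by having the work itself clear the flag. A Rust sketch of the same idea (run_until_stopped is an illustrative helper, not part of the server):

```rust
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};

// Run `work` repeatedly until `should_run` is cleared, then return the
// number of completed iterations. Stands in for the accept loop above;
// in a real server a signal handler would clear the flag.
fn run_until_stopped(should_run: Arc<AtomicBool>, mut work: impl FnMut()) -> usize {
    let mut iterations = 0;
    while should_run.load(Ordering::SeqCst) {
        work();
        iterations += 1;
    }
    iterations
}
```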

Testing Graceful Shutdown

Compile and run the limited-request version:

cargo run

In another terminal, make requests:

curl http://127.0.0.1:7878 &
curl http://127.0.0.1:7878 &

You'll see output like:

Server listening on http://127.0.0.1:7878
The server will accept 2 requests, then shut down gracefully.
Worker 0 got a job; executing.
Worker 1 got a job; executing.
Shutting down server.
Sending terminate message to all workers.
Shutting down all workers.
Shutting down worker 0
Shutting down worker 1
Shutting down worker 2
Shutting down worker 3

Both requests complete before the workers are terminated.

What Happens During Graceful Shutdown

  1. Stop accepting connections - The loop exits, no new work enters the queue
  2. Send terminate messages - One per worker thread
  3. Wait for workers - Each worker thread processes its current job, then sees the Terminate message and exits
  4. Join threads - The main thread waits for all worker threads to finish
  5. Exit cleanly - Once all workers are done, the program terminates
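Step 3 relies on a channel guarantee: messages from a given sender arrive in the order they were sent, so a Terminate queued after the jobs cannot overtake them. A minimal Rust sketch of that ordering (Message and drain are illustrative names):

```rust
use std::sync::mpsc;

enum Message { Job(u32), Terminate }

// Process every queued job, then stop at the Terminate marker.
fn drain(receiver: mpsc::Receiver<Message>) -> Vec<u32> {
    let mut done = Vec::new();
    for msg in receiver {
        match msg {
            Message::Job(n) => done.push(n), // "process" the job
            Message::Terminate => break,     // then exit cleanly
        }
    }
    done
}
```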

Key Concepts

Arc (Atomic Reference Counting)

let receiver = Arc.new(Mutex.new(receiver))

Arc allows multiple threads to safely share ownership of the same data. When the last Arc clone is dropped, the data is deallocated.
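In Rust, whose Arc Oxide's mirrors, several threads can read the same vector through clones of a single Arc (shared_sum is an illustrative helper):

```rust
use std::sync::Arc;
use std::thread;

// Each clone bumps an atomic reference count; the data lives until the
// last clone is dropped, so every thread can safely read it.
fn shared_sum(data: Vec<i32>, threads: usize) -> i32 {
    let data = Arc::new(data);
    let handles: Vec<_> = (0..threads)
        .map(|_| {
            let data = Arc::clone(&data);
            thread::spawn(move || data.iter().sum::<i32>())
        })
        .collect();
    handles.into_iter().map(|h| h.join().unwrap()).sum()
}
```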

Mutex (Mutual Exclusion)

receiver.lock().expect("Mutex poisoned")

Mutex ensures only one thread accesses the receiver at a time. Calling lock() blocks until the lock is available.
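A classic demonstration of this in Rust: many threads each add one to a shared counter, and the Mutex serialises access so no increment is lost (locked_count is an illustrative helper):

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Spawn `threads` threads that each increment a shared counter once.
fn locked_count(threads: usize) -> usize {
    let counter = Arc::new(Mutex::new(0usize));
    let handles: Vec<_> = (0..threads)
        .map(|_| {
            let counter = Arc::clone(&counter);
            thread::spawn(move || {
                // lock() blocks until the mutex is free; the guard
                // releases it automatically at the end of the scope.
                *counter.lock().unwrap() += 1;
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    let n = *counter.lock().unwrap();
    n
}
```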

Message Passing

self.sender.send(message)

The channel lets threads communicate without sharing memory directly, following the concurrency slogan popularized by Go and embraced by Rust (and Oxide): "Do not communicate by sharing memory; instead, share memory by communicating."
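A self-contained Rust sketch of this style: a worker thread that owns no shared state and communicates purely over channels (double_all is an illustrative helper):

```rust
use std::sync::mpsc;
use std::thread;

// Send inputs to a worker over one channel, receive results over another.
fn double_all(inputs: Vec<i32>) -> Vec<i32> {
    let (job_tx, job_rx) = mpsc::channel::<i32>();
    let (result_tx, result_rx) = mpsc::channel::<i32>();

    let worker = thread::spawn(move || {
        for n in job_rx {          // ends when job_tx is dropped
            result_tx.send(n * 2).unwrap();
        }
    });

    for n in inputs {
        job_tx.send(n).unwrap();
    }
    drop(job_tx);                  // close the channel so the worker exits
    worker.join().unwrap();
    result_rx.into_iter().collect()
}
```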

Summary

We've implemented graceful shutdown by:

  • Adding an explicit shutdown() method to the thread pool
  • Finishing in-flight requests before terminating
  • Properly cleaning up all resources
  • Demonstrating how to limit the server to a fixed number of requests

The complete server now demonstrates:

  • TCP networking fundamentals
  • Thread pool design
  • Channel-based communication
  • Graceful shutdown patterns

This same pattern (stop accepting, drain in-flight work, join the workers) underpins graceful shutdown in real production servers, which typically add request timeouts and OS signal handling on top.