Streams: Async Sequences
While a Future represents a single value that will be available later, a
Stream represents a sequence of values that arrive over time. Streams are
the async equivalent of iterators.
Understanding Streams
Think about these real-world scenarios:
- Messages arriving in a chat application
- Log entries being written to a file
- Sensor readings coming from IoT devices
- Chunks of data downloading from the network
Each produces multiple values over time, not just one. That's what streams model.
// Iterator: produces values synchronously
for item in collection.iter() {
process(item)
}
// Stream: produces values asynchronously
while let Some(item) = await stream.next() {
await process(item)
}
Creating Streams
From Iterators
The simplest way to create a stream is from an existing iterator:
import futures.stream.{ self, StreamExt }
async fn processNumbers() {
let numbers = vec![1, 2, 3, 4, 5]
var stream = stream.iter(numbers)
while let Some(n) = await stream.next() {
println!("Got: \(n)")
}
}
From Channels
Channels naturally produce streams:
import tokio.sync.mpsc
async fn receiveMessages() {
let (tx, mut rx) = mpsc.channel(100)
// Producer sends messages
tokio.spawn(async move {
for i in 0..5 {
await tx.send(format!("Message \(i)"))
await sleep(Duration.fromMillis(100))
}
})
// Receiver processes the stream
while let Some(msg) = await rx.recv() {
println!("Received: \(msg)")
}
}
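The loop above reads from the receiver directly, so stream combinators can't be applied to it. A sketch of wrapping the receiver as a proper stream first, assuming a ReceiverStream wrapper (modeled on tokio_stream's wrapper of the same name) is available:

```
import tokio_stream.wrappers.ReceiverStream
import futures.stream.StreamExt

async fn receiveAsStream() {
    let (tx, rx) = mpsc.channel(100)
    tokio.spawn(async move {
        for i in 0..5 {
            await tx.send(i)
        }
    })
    // Wrapping the receiver lets combinators like map and filter apply
    var stream = ReceiverStream.new(rx).map { it * 10 }
    while let Some(n) = await stream.next() {
        println!("Got: \(n)")
    }
}
```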
Using stream! Macro
The async-stream crate provides a convenient macro:
import asyncStream.stream
fn countingStream(max: Int): impl Stream<Item = Int> {
stream! {
for i in 0..max {
await sleep(Duration.fromMillis(100))
yield i
}
}
}
async fn useCountingStream() {
var stream = countingStream(5)
while let Some(n) = await stream.next() {
println!("Count: \(n)")
}
}
Stream Combinators
Like iterators, streams have powerful combinators for transformation and
filtering. You need to import StreamExt to access these methods.
Map
Transform each element:
import futures.stream.StreamExt
async fn doubleStream() {
let numbers = stream.iter(vec![1, 2, 3, 4, 5])
var doubled = numbers.map { it * 2 }
while let Some(n) = await doubled.next() {
println!("\(n)") // 2, 4, 6, 8, 10
}
}
Filter
Keep only matching elements:
async fn filterStream() {
let numbers = stream.iter(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
var evens = numbers.filter { it % 2 == 0 }
while let Some(n) = await evens.next() {
println!("\(n)") // 2, 4, 6, 8, 10
}
}
Filter Map
Combines filter and map in one step: return Some to include an element, or null to skip it:
async fn filterMapStream() {
let items = stream.iter(vec!["1", "two", "3", "four", "5"])
var parsed = items.filterMap { s ->
s.parse<Int>().ok() // Only keeps successfully parsed numbers
}
while let Some(n) = await parsed.next() {
println!("\(n)") // 1, 3, 5
}
}
Take and Skip
Limit the stream:
async fn limitStream() {
let numbers = stream.iter(0..100)
// Skip first 10, then take 5
var limited = numbers.skip(10).take(5)
while let Some(n) = await limited.next() {
println!("\(n)") // 10, 11, 12, 13, 14
}
}
Collect
Gather all stream elements into a collection:
async fn collectStream() {
let numbers = stream.iter(vec![1, 2, 3, 4, 5])
let doubled = numbers.map { it * 2 }
// Collect all results into a Vec
let results: Vec<Int> = await doubled.collect()
println!("Results: \(results:?)") // [2, 4, 6, 8, 10]
}
Fold
Reduce a stream to a single value:
async fn sumStream() {
let numbers = stream.iter(vec![1, 2, 3, 4, 5])
let sum = await numbers.fold(0, { acc, n -> acc + n })
println!("Sum: \(sum)") // 15
}
Async Operations in Streams
Then
Apply an async function to each element:
async fn asyncTransform() {
let urls = stream.iter(vec![
"https://example.com/1",
"https://example.com/2",
"https://example.com/3",
])
// fetch is async, so we use then
var responses = urls.then { url -> fetch(url) }
while let Some(response) = await responses.next() {
println!("Got response: \(response.status())")
}
}
Buffered Processing
Process multiple items concurrently with bounded parallelism:
async fn bufferedFetch() {
let urls = stream.iter(getUrls())
// Create futures (doesn't execute yet)
let fetches = urls.map { url -> fetch(url) }
// Execute up to 5 concurrently
var results = fetches.bufferUnordered(5)
while let Some(result) = await results.next() {
println!("Completed: \(result:?)")
}
}
Results arrive in completion order, not input order. Use buffered(n) instead if
you need results in the original order.
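A sketch of the ordered variant, reusing the hypothetical getUrls and fetch helpers from above:

```
async fn orderedFetch() {
    let urls = stream.iter(getUrls())
    // Still at most 5 fetches in flight at once...
    var results = urls.map { url -> fetch(url) }.buffered(5)
    // ...but results come back in the same order as the input URLs
    while let Some(result) = await results.next() {
        println!("Completed: \(result:?)")
    }
}
```

The trade-off: buffered may hold completed results back while waiting for an earlier slow future, so bufferUnordered gives better latency when order doesn't matter.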
For-Await Loops
Oxide does not have a dedicated for await syntax. The closest idiom iterates over the Option returned by a single call to next():
import tokio_stream.StreamExt
async fn forAwaitExample() {
var stream = countingStream(5)
// Awaits once; iterating the Option yields at most one item
for n in await stream.next() {
println!("Got: \(n)")
}
}
Note: for item in await stream.next() awaits a single time and then iterates
over the resulting Option, so it processes at most one item. To consume the
entire stream, use while let:
async fn whileLetStream() {
var stream = countingStream(5)
while let Some(n) = await stream.next() {
println!("Got: \(n)")
}
}
Combining Streams
Merge
Combine multiple streams into one, interleaving items:
import futures.stream.{ select, StreamExt }
async fn mergeStreams() {
let streamA = stream.iter(vec![1, 3, 5])
let streamB = stream.iter(vec![2, 4, 6])
// Items from both streams interleave
var merged = select(streamA, streamB)
while let Some(n) = await merged.next() {
println!("\(n)") // Order depends on timing
}
}
Chain
Concatenate streams (one after another):
async fn chainStreams() {
let first = stream.iter(vec![1, 2, 3])
let second = stream.iter(vec![4, 5, 6])
var chained = first.chain(second)
while let Some(n) = await chained.next() {
println!("\(n)") // 1, 2, 3, 4, 5, 6 (in order)
}
}
Zip
Pair items from two streams:
async fn zipStreams() {
let names = stream.iter(vec!["Alice", "Bob", "Carol"])
let ages = stream.iter(vec![30, 25, 35])
var zipped = names.zip(ages)
while let Some((name, age)) = await zipped.next() {
println!("\(name) is \(age) years old")
}
}
Real-World Example: Log Tail
Here's a practical example - watching a log file for new lines:
import tokio.fs.File
import tokio.io.{ AsyncBufReadExt, BufReader }
async fn tailLog(path: &str) {
let file = await File.open(path).unwrap()
var reader = BufReader.new(file)
var line = String.new()
println!("Watching \(path) for new lines...")
loop {
line.clear()
match await reader.readLine(&mut line) {
Ok(0) -> {
// End of file, wait and try again
await sleep(Duration.fromMillis(100))
},
Ok(_) -> {
print!("\(line)") // Line already has newline
},
Err(e) -> {
println!("Error reading: \(e)")
break
},
}
}
}
Real-World Example: Event Throttling
Throttle UI events to prevent overwhelming handlers:
import tokio.time.{ interval, Duration }
import futures.stream.StreamExt
// State a hand-written Stream implementation would need; the throttle
// function below uses the stream! macro instead, keeping this state
// in local variables
struct ThrottledStream<S> {
inner: S,
interval: Interval,
pending: S.Item?,
}
fn throttle<S: Stream>(stream: S, period: Duration): impl Stream<Item = S.Item> {
var interval = interval(period)
var pending: S.Item? = null
var stream = Box.pin(stream)
stream! {
loop {
await tokio.select! {
item = stream.next() -> {
match item {
Some(i) -> pending = i,
null -> {
if let p = pending.take() {
yield p
}
return
},
}
},
_ = interval.tick() -> {
if let p = pending.take() {
yield p
}
},
}
}
}
}
// Usage: only emit at most one event per 100ms
async fn handleEvents() {
let events = getEventStream()
var throttled = throttle(events, Duration.fromMillis(100))
while let Some(event) = await throttled.next() {
await handleEvent(event)
}
}
Stream vs Iterator Comparison
| Operation | Iterator | Stream |
|---|---|---|
| Next item | iter.next() | await stream.next() |
| Transform | iter.map(f) | stream.map(f) |
| Filter | iter.filter(p) | stream.filter(p) |
| Collect | iter.collect() | await stream.collect() |
| Fold | iter.fold(init, f) | await stream.fold(init, f) |
| For loop | for x in iter | while let Some(x) = await stream.next() |
The patterns are nearly identical - the key difference is adding await where
async operations occur.
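A minimal sketch making the parallel concrete, reusing the fold example from earlier; only the await changes between the two versions:

```
fn sumIter(items: Vec<Int>): Int {
    // Synchronous: fold runs to completion immediately
    items.iter().fold(0, { acc, n -> acc + n })
}

async fn sumStream(items: Vec<Int>): Int {
    // Asynchronous: the fold itself must be awaited
    await stream.iter(items).fold(0, { acc, n -> acc + n })
}
```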
Summary
Streams are the async version of iterators:
- Create streams from iterators, channels, or the stream! macro
- Transform streams with familiar combinators: map, filter, take, etc.
- Process async operations with then and bufferUnordered
- Combine streams with merge, chain, and zip
- Iterate using while let Some(item) = await stream.next()
Remember: stream operations that produce values use prefix await:
await stream.next(), await stream.collect(), await stream.fold(...).
In the final section, we'll look at the underlying traits that make async
programming possible: Future, Pin, and Stream.