Streams
In async Rust, a Stream is an asynchronous sequence of values.
Compare:
| Abstraction | Produces |
|---|---|
| Future<Output = T> | One T |
| Stream<Item = T> | Many Ts over time |
Examples of streams:
- Lines from a socket
- Messages from a channel
- Incoming HTTP request bodies
- Periodic timer ticks
Why Futures are not enough
A future completes once.
But many async problems look like:
wait → value → wait → value → wait → value → done
That’s a stream.
The Stream trait (core definition)
From futures-core:
use std::pin::Pin;
use std::task::{Context, Poll};

pub trait Stream {
    type Item;

    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>>;
}
Return values explained
Poll::Pending // no item yet
Poll::Ready(Some(item)) // next item produced
Poll::Ready(None) // stream is finished
This mirrors Iterator::next, which returns Option<Item>; the extra Poll layer adds the "not ready yet" case.
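To make the three return values concrete, here is a minimal sketch that polls a stream by hand, without an executor. It assumes a local copy of the trait (declared here only so the block compiles with the standard library alone) and two hypothetical helpers, Countdown and drain. The do-nothing waker is acceptable only because the demo loop polls again regardless of wakeups.

```rust
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Local mirror of the futures-core trait so the sketch needs no crates.
pub trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

// Hypothetical stream: yields n, n-1, ..., 1 and reports Pending once
// before every item to imitate "not ready yet".
pub struct Countdown {
    remaining: u32,
    ready: bool,
}

impl Stream for Countdown {
    type Item = u32;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<u32>> {
        if self.remaining == 0 {
            return Poll::Ready(None); // stream is finished
        }
        if !self.ready {
            self.ready = true;
            cx.waker().wake_by_ref(); // ask to be polled again
            return Poll::Pending; // no item yet
        }
        self.ready = false;
        let item = self.remaining;
        self.remaining -= 1;
        Poll::Ready(Some(item)) // next item produced
    }
}

// A waker that does nothing; enough for a busy-polling demo.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Polls until the stream ends, recording every return value as text.
pub fn drain(n: u32) -> Vec<String> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut stream = Countdown { remaining: n, ready: false };
    let mut log = Vec::new();
    loop {
        match Pin::new(&mut stream).poll_next(&mut cx) {
            Poll::Pending => log.push("Pending".to_string()),
            Poll::Ready(Some(v)) => log.push(format!("Ready(Some({v}))")),
            Poll::Ready(None) => {
                log.push("Ready(None)".to_string());
                return log;
            }
        }
    }
}

fn main() {
    for line in drain(2) {
        println!("{line}");
    }
}
```

A real executor would not spin like this: it would park the task after Pending and poll again only once the waker fires.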
Streams vs Iterators (important comparison)
| Iterator | Stream |
|---|---|
| next() | poll_next() |
| Synchronous | Asynchronous |
| Blocking | Non-blocking |
| Pull-based | Pull-based |
You can think of:
Stream ≈ async Iterator
How Streams work with async/await
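Rust does not yet have stable built-in syntax for looping over a stream (there is no async for loop).
So we use:
while let Some(item) = stream.next().await
- next() is provided by StreamExt
- The stream must be Unpin to call next(); pin it first (for example with Box::pin) if it is not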
use futures::stream::StreamExt;
while let Some(item) = stream.next().await {
    println!("{item}");
}
Under the hood:
- .next() returns a Future
- .await polls that Future, which in turn calls poll_next
Manual Stream implementation (from scratch)
Let’s build a tiny stream that yields numbers with pauses.
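use std::future::Future; // brings Future::poll into scope for the Sleep timer
use std::pin::Pin;
use std::task::{Context, Poll};
use futures::Stream;
use tokio::time::{Duration, Sleep};

struct CounterStream {
    current: u32,
    max: u32,
    // Sleep is not Unpin, so it lives in a pinned Box; CounterStream stays Unpin.
    delay: Pin<Box<Sleep>>,
}

impl Stream for CounterStream {
    type Item = u32;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        // All items produced: signal the end of the stream.
        if self.current >= self.max {
            return Poll::Ready(None);
        }
        // Timer still running: Sleep has registered the waker, so just return.
        if self.delay.as_mut().poll(cx).is_pending() {
            return Poll::Pending;
        }
        let value = self.current;
        self.current += 1;
        // Arm a fresh timer for the next item.
        self.delay = Box::pin(tokio::time::sleep(Duration::from_secs(1)));
        Poll::Ready(Some(value))
    }
}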
What’s happening
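- poll_next returns Pending until the 1-second timer fires
- Once it fires, the stream yields the current number
- The timer is reset for the next item
- The stream ends after max items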
Using the stream
use futures::StreamExt;

// Assumes CounterStream and the imports from the previous block are in scope.
#[tokio::main]
async fn main() {
    let mut stream = CounterStream {
        current: 0,
        max: 5,
        delay: Box::pin(tokio::time::sleep(Duration::from_secs(1))),
    };
    while let Some(value) = stream.next().await {
        println!("got {value}");
    }
}
Streams from async I/O (real-world example)
TCP lines as a stream
use std::io;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::net::TcpStream;

async fn read_lines(stream: TcpStream) -> io::Result<()> {
    let reader = BufReader::new(stream);
    let mut lines = reader.lines();
    // next_line() resolves to Ok(None) once the peer closes the connection.
    while let Some(line) = lines.next_line().await? {
        println!("received: {line}");
    }
    Ok(())
}
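- Each line arrives asynchronously
- Lines are read only as fast as the loop consumes them, so TCP flow control pushes back on the sender
- No blocking threads
Note that tokio's Lines type is not itself a Stream: next_line() is an ordinary async method. Wrapping it in tokio_stream::wrappers::LinesStream gives a value that implements Stream.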
Channels are streams
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // Bounded channel: at most 10 messages are buffered.
    let (tx, mut rx) = mpsc::channel(10);
    tokio::spawn(async move {
        for i in 0..5 {
            tx.send(i).await.unwrap();
        }
        // tx is dropped here, so recv() eventually returns None.
    });
    while let Some(value) = rx.recv().await {
        println!("got {value}");
    }
}
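Conceptually:
- recv() = next stream item
- Channel = async stream of messages
In tokio 1.x, mpsc::Receiver does not implement the Stream trait itself; tokio_stream::wrappers::ReceiverStream adapts it when combinators are needed.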
Stream combinators (powerful stuff)
From futures::stream::StreamExt:
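stream
    // filter passes &Item and the returned future must not borrow it,
    // so the bool is computed up front and wrapped in a ready future.
    .filter(|x| futures::future::ready(x % 2 == 0))
    .map(|x| x * 2)
    .take(5)
    .for_each(|x| async move {
        println!("{x}");
    })
    .await;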
This is like:
- Async iterator pipelines
- With backpressure
Backpressure (critical concept)
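Streams:
- Only produce items when polled
- Slow producers down whenever the consumer polls less often
- Keep memory bounded, provided any buffers between producer and consumer are bounded
This is a large part of why streams scale well.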
Common pitfalls
- Forgetting to .await:
stream.next(); // returns a Future, does nothing until awaited
- Blocking inside stream poll: Never block inside poll_next.
- Holding references across yield points: Streams are state machines → same rules as futures.
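The first pitfall follows from futures being lazy in general, not from anything specific to streams. A minimal standard-library sketch, using a hypothetical async function bump in place of stream.next(), shows that creating a future runs none of its body:

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// A waker that does nothing; the future below completes on its first poll.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Hypothetical async operation with a visible side effect.
async fn bump(counter: &AtomicU32) {
    counter.fetch_add(1, Ordering::SeqCst);
}

// Returns the counter after dropping an un-awaited future,
// and again after polling one to completion.
pub fn lazy_demo() -> (u32, u32) {
    let counter = AtomicU32::new(0);

    // Created but never polled: the body does not run.
    drop(bump(&counter));
    let after_drop = counter.load(Ordering::SeqCst);

    // Polling, which is what .await does inside async code, runs the body.
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(bump(&counter));
    assert!(matches!(fut.as_mut().poll(&mut cx), Poll::Ready(())));
    let after_poll = counter.load(Ordering::SeqCst);

    (after_drop, after_poll)
}

fn main() {
    println!("{:?}", lazy_demo());
}
```

The compiler's unused_must_use warning on a bare stream.next(); statement points at the same mistake.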
Mental model
Think of a stream as:
“Call me when you want the next value. I might not have one yet. I’ll tell you when I do.”
Or:
“An async iterator driven by an executor.”