
Streams

In Rust's async model, a Stream is an asynchronous sequence of values.

Compare:

Abstraction         Produces
Future<T>           One T
Stream<Item = T>    Many Ts over time

Examples of streams:

  • Lines from a socket
  • Messages from a channel
  • Incoming HTTP request bodies
  • Periodic timer ticks

Why Futures are not enough

A future completes once.

But many async problems look like:

wait → value → wait → value → wait → value → done

That’s a stream.

The Stream trait (core definition)

From futures-core:

use std::pin::Pin;
use std::task::{Context, Poll};

pub trait Stream {
    type Item;

    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>>;
}

Return values explained

Poll::Pending           // no item yet
Poll::Ready(Some(item)) // next item produced
Poll::Ready(None)       // stream is finished

This mirrors Iterator, but async.

Streams vs Iterators (important comparison)

Iterator        Stream
next()          poll_next()
Synchronous     Asynchronous
Blocking        Non-blocking
Pull-based      Pull-based

You can think of:

Stream ≈ async Iterator
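To make that correspondence concrete, here is a counter written against the synchronous Iterator trait; the async equivalent, CounterStream, is built later in this chapter. (The Counter name is just for illustration.)

```rust
// A synchronous counter: the Iterator counterpart of the async
// CounterStream built later in this chapter.
struct Counter {
    current: u32,
    max: u32,
}

impl Iterator for Counter {
    type Item = u32;

    // next() returns Option<T> directly; poll_next() wraps the same
    // Option in Poll so it can also say "not ready yet".
    fn next(&mut self) -> Option<u32> {
        if self.current >= self.max {
            None // like Poll::Ready(None): the sequence is finished
        } else {
            let value = self.current;
            self.current += 1;
            Some(value) // like Poll::Ready(Some(value))
        }
    }
}

fn main() {
    let items: Vec<u32> = Counter { current: 0, max: 3 }.collect();
    assert_eq!(items, vec![0, 1, 2]);
}
```

The only structural difference from the async version is the return type: a stream adds Poll around the Option so the executor can park the task until an item is ready.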

How Streams work with async/await

Rust does not yet have native async fn next() syntax.

So we use:

  • while let Some(item) = stream.next().await
  • Provided by the StreamExt extension trait

use futures::stream::StreamExt;

while let Some(item) = stream.next().await {
    println!("{item}");
}

Under the hood:

  • .next() returns a Future
  • .await polls poll_next
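A minimal sketch of that machinery, using a local copy of the trait so it compiles without any crates. Next, VecStream, and noop_waker are illustrative names invented for this sketch; the real future type lives in futures-util.

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

// Local copy of the Stream trait from above, so this sketch
// compiles without any external crates.
trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

// Simplified sketch of the future behind `stream.next()`. The real
// type is futures-util's `Next`; this version assumes the stream
// is Unpin for brevity.
struct Next<'a, S>(&'a mut S);

impl<S: Stream + Unpin> Future for Next<'_, S> {
    type Output = Option<S::Item>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Awaiting next() forwards each poll straight to poll_next:
        // Pending stays pending; Ready(item) resolves the future.
        Pin::new(&mut *self.0).poll_next(cx)
    }
}

// A tiny stream for the demo: yields the values of a Vec in order.
struct VecStream(Vec<u32>);

impl Stream for VecStream {
    type Item = u32;
    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<u32>> {
        if self.0.is_empty() {
            Poll::Ready(None)
        } else {
            Poll::Ready(Some(self.0.remove(0)))
        }
    }
}

// A waker that does nothing, so we can poll by hand without an executor.
fn noop_waker() -> Waker {
    fn clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}

fn main() {
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);

    let mut s = VecStream(vec![1, 2]);
    // Each Next future resolves to one item, exactly what .await sees.
    assert_eq!(Pin::new(&mut Next(&mut s)).poll(&mut cx), Poll::Ready(Some(1)));
    assert_eq!(Pin::new(&mut Next(&mut s)).poll(&mut cx), Poll::Ready(Some(2)));
    assert_eq!(Pin::new(&mut Next(&mut s)).poll(&mut cx), Poll::Ready(None));
}
```

Each `.next().await` in a loop constructs a fresh one-shot future like this, which is why the while-let pattern works without any extra state.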

Manual Stream implementation (from scratch)

Let’s build a tiny stream that yields numbers with pauses.

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use futures::Stream;
use tokio::time::{Duration, Sleep};

struct CounterStream {
    current: u32,
    max: u32,
    delay: Pin<Box<Sleep>>,
}

impl Stream for CounterStream {
    type Item = u32;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        // Finished: signal the end of the stream.
        if self.current >= self.max {
            return Poll::Ready(None);
        }

        // Timer not done yet; it has registered the waker for us.
        if self.delay.as_mut().poll(cx).is_pending() {
            return Poll::Pending;
        }

        // Timer fired: produce the next value and re-arm the timer.
        let value = self.current;
        self.current += 1;
        self.delay = Box::pin(tokio::time::sleep(Duration::from_secs(1)));

        Poll::Ready(Some(value))
    }
}

What’s happening

  • Each call waits 1 second
  • Produces a number
  • Resets the timer
  • Ends after max

Using the stream

use futures::StreamExt;

#[tokio::main]
async fn main() {
    let mut stream = CounterStream {
        current: 0,
        max: 5,
        delay: Box::pin(tokio::time::sleep(Duration::from_secs(1))),
    };

    while let Some(value) = stream.next().await {
        println!("got {value}");
    }
}

Streams from async I/O (real-world example)

TCP lines as a stream

use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::net::TcpStream;

async fn read_lines(stream: TcpStream) {
    let reader = BufReader::new(stream);
    let mut lines = reader.lines();

    // next_line() comes from tokio's AsyncBufReadExt rather than the
    // Stream trait, but the shape is identical: await, then get
    // Some(line) or None.
    while let Some(line) = lines.next_line().await.unwrap() {
        println!("received: {line}");
    }
}

  • Each line arrives asynchronously
  • Backpressure is handled naturally
  • No blocking threads

Channels are streams

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(10);

    tokio::spawn(async move {
        for i in 0..5 {
            tx.send(i).await.unwrap();
        }
    });

    while let Some(value) = rx.recv().await {
        println!("got {value}");
    }
}

Conceptually:

  • recv() = stream item
  • Channel = async stream of messages

Stream combinators (powerful stuff)

From futures::stream::StreamExt:

stream
    .filter(|&x| async move { x % 2 == 0 })
    .map(|x| x * 2)
    .take(5)
    .for_each(|x| async move {
        println!("{x}");
    })
    .await;

Note the |&x| pattern in filter: the closure receives a reference to each item, and copying the value out lets the returned future own it (returning a future that borrows the argument does not compile).

This is like:

  • Async iterator pipelines
  • With backpressure

Backpressure (critical concept)

Streams:

  • Only produce items when polled
  • Naturally slow down producers
  • Prevent memory blowups

This is why streams scale so well.

Common pitfalls

  • Forgetting to .await:

    stream.next(); // returns a Future; nothing happens until awaited

  • Blocking inside stream poll: never block inside poll_next.
  • Holding references across yield points: streams are state machines, so the same rules as futures apply.
Mental model

Think of a stream as:

“Call me when you want the next value. I might not have one yet. I’ll tell you when I do.”

Or:

“An async iterator driven by an executor.”