While building a Rust server to serve Astro sites, I ran into something called StreamTrait. I had no idea what this was, or how to work with it. I hacked together a few examples and got it working, but I wanted to understand how it worked.
I decided to try and build my own Ext trait. I implemented the next, take, and map methods, and integrated them with the Future trait.
What Are Ext Traits?
An Ext trait in Rust is a pattern used to extend the functionality of an existing type or trait. By defining methods in an Ext trait, you can add custom behavior to types without modifying their original implementation. This is particularly useful for adding utility methods to widely used traits like Stream.
The Ext pattern is often used because working directly with core traits like Stream can be difficult. The Stream trait itself defines a minimal set of low-level methods, such as poll_next, that are essential for implementing a stream but are not user-friendly for day-to-day use. Extension traits like StreamExt provide higher-level utility methods—such as next, filter, and map—built on top of these low-level primitives. This makes streams much easier to work with.
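To make the pattern concrete outside of async code, here is a minimal sketch of an extension trait over the standard Iterator trait. The names IteratorExt, count_where, and count_even are made up for this illustration and are not part of the standard library; the shape (provided methods plus a blanket impl) is the same one StreamExt uses.

```rust
// Hypothetical extension trait: adds a helper method to every Iterator.
pub trait IteratorExt: Iterator {
    // Counts the items for which `pred` returns true.
    fn count_where<P>(self, mut pred: P) -> usize
    where
        Self: Sized,
        P: FnMut(&Self::Item) -> bool,
    {
        let mut count = 0;
        for item in self {
            if pred(&item) {
                count += 1;
            }
        }
        count
    }
}

// Blanket impl: every type that implements Iterator gets the method for free.
impl<I: Iterator> IteratorExt for I {}

// Example use: the new method is available on any iterator once the trait is in scope.
pub fn count_even(values: &[i32]) -> usize {
    values.iter().count_where(|v| **v % 2 == 0)
}
```

The core trait stays small, and the convenience methods live in a separate trait that callers opt into by importing it.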
In this tutorial, I’ll build a custom Ext trait that adds next, map, and take methods to Stream. The next method returns a Future that resolves to the next item in the stream, while map and take return new streams that wrap the original one.
Building the StreamExt Trait
Let’s start by defining the StreamExt trait. It extends the Stream trait with next, map, and take methods:
use futures_core::stream::Stream;

// Extension trait
pub trait StreamExt: Stream {
    fn next(&mut self) -> Next<'_, Self>
    where
        Self: Unpin,
    {
        Next { stream: self }
    }

    fn map<F, T>(self, f: F) -> Map<Self, F>
    where
        Self: Sized,
        F: FnMut(Self::Item) -> T,
    {
        Map { stream: self, f }
    }

    fn take(self, n: usize) -> Take<Self>
    where
        Self: Sized,
    {
        Take {
            stream: self,
            remaining: n,
        }
    }
}
// Implement the extension trait for all types that implement Stream
impl<T: ?Sized> StreamExt for T where T: Stream {}
Here’s what’s happening:
- The StreamExt trait defines a next method that returns a Next struct.
- It also defines a map method, which takes a closure and applies it to each item in the stream.
- The take method limits the number of items produced by a stream.
- We implement StreamExt for all types that implement Stream, ensuring broad compatibility.
Introducing the Next Future
The next method returns a Next struct, which implements the Future trait. This is where the magic happens:
use futures_core::task::{Context, Poll};
use std::pin::Pin;

// Future returned by the `next` method
pub struct Next<'a, S: ?Sized> {
    stream: &'a mut S,
}

impl<S: Stream + Unpin + ?Sized> std::future::Future for Next<'_, S> {
    type Output = Option<S::Item>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let stream = Pin::new(&mut *self.get_mut().stream);
        stream.poll_next(cx)
    }
}
Key points:
- Next holds a mutable reference to the stream.
- It implements the Future trait, with Output being Option<S::Item>.
- The poll method drives the stream’s poll_next method to retrieve the next item.
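To see that hand-off from poll to poll_next without a runtime, here is a rough, std-only sketch that polls a Next future by hand. It makes a few assumptions: the local Stream trait is a stand-in for futures_core::Stream so the example needs no crates, and Counter, NoopWaker, and collect_with_next are hypothetical names introduced only for this illustration.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Local stand-in for `futures_core::Stream` (assumption: same method shape).
pub trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

// Hypothetical stream that yields 1, 2, ..., `max` and then ends.
pub struct Counter {
    current: u32,
    max: u32,
}

impl Stream for Counter {
    type Item = u32;

    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<u32>> {
        if self.current < self.max {
            self.current += 1;
            Poll::Ready(Some(self.current))
        } else {
            Poll::Ready(None)
        }
    }
}

// Same shape as the `Next` future from the article.
pub struct Next<'a, S: ?Sized> {
    stream: &'a mut S,
}

impl<S: Stream + Unpin + ?Sized> Future for Next<'_, S> {
    type Output = Option<S::Item>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        Pin::new(&mut *self.get_mut().stream).poll_next(cx)
    }
}

// A waker that does nothing; enough here because `Counter` is always ready.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Builds a fresh `Next` for every item and polls it once, collecting the results.
pub fn collect_with_next(max: u32) -> Vec<u32> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut stream = Counter { current: 0, max };
    let mut items = Vec::new();
    loop {
        let mut fut = Next { stream: &mut stream };
        match Pin::new(&mut fut).poll(&mut cx) {
            Poll::Ready(Some(item)) => items.push(item),
            // End of stream; `Pending` cannot occur with `Counter`, so stop there too.
            Poll::Ready(None) | Poll::Pending => break,
        }
    }
    items
}
```

Each Next only borrows the stream for one item, which is why a new one can be created on every loop iteration.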
Introducing the Map Adapter
The map method transforms each item in the stream using a provided closure. Here’s how it works:
// Stream adapter for `map`
pub struct Map<S, F> {
    stream: S,
    f: F,
}

impl<S, F, T> Stream for Map<S, F>
where
    S: Stream + std::marker::Unpin,
    F: FnMut(S::Item) -> T + std::marker::Unpin,
{
    type Item = T;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        match Pin::new(&mut this.stream).poll_next(cx) {
            Poll::Ready(Some(item)) => Poll::Ready(Some((this.f)(item))),
            Poll::Ready(None) => Poll::Ready(None),
            Poll::Pending => Poll::Pending,
        }
    }
}
Key points:
- Map wraps a stream and a closure.
- It implements Stream, transforming each item from the inner stream using the closure.
- The poll_next method applies the closure to each item as it is polled.
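As an aside, the three-arm match in poll_next can probably be collapsed with Poll::map and Option::map from the standard library. The sketch below shows that variant and also demonstrates that map can change the item type (u32 to String here). It assumes a local Stream trait standing in for futures_core::Stream; Counter, NoopWaker, and labels are hypothetical helpers used only for this example.

```rust
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Local stand-in for `futures_core::Stream` (assumption: same method shape).
pub trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

// Hypothetical stream that yields 1, 2, ..., `max` and then ends.
pub struct Counter {
    current: u32,
    max: u32,
}

impl Stream for Counter {
    type Item = u32;

    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<u32>> {
        if self.current < self.max {
            self.current += 1;
            Poll::Ready(Some(self.current))
        } else {
            Poll::Ready(None)
        }
    }
}

pub struct Map<S, F> {
    stream: S,
    f: F,
}

impl<S, F, T> Stream for Map<S, F>
where
    S: Stream + Unpin,
    F: FnMut(S::Item) -> T + Unpin,
{
    type Item = T;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
        let this = self.get_mut();
        // `Poll::map` touches only the `Ready` payload; `Option::map` only `Some`.
        Pin::new(&mut this.stream)
            .poll_next(cx)
            .map(|item| item.map(&mut this.f))
    }
}

// A waker that does nothing; enough here because `Counter` is always ready.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Maps each number to a label, showing the item type changing from u32 to String.
pub fn labels(max: u32) -> Vec<String> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut mapped = Map {
        stream: Counter { current: 0, max },
        f: |n: u32| format!("item-{}", n),
    };
    let mut out = Vec::new();
    while let Poll::Ready(Some(label)) = Pin::new(&mut mapped).poll_next(&mut cx) {
        out.push(label);
    }
    out
}
```

Both versions behave the same; the explicit match is arguably easier to read when you are first learning how Poll and Option nest.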
Introducing the Take Adapter
The take method limits the number of items a stream produces. Here’s how it works:
// Stream adapter for `take`
pub struct Take<S> {
    stream: S,
    remaining: usize,
}

impl<S: Stream + Unpin> Stream for Take<S> {
    type Item = S::Item;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        if this.remaining == 0 {
            Poll::Ready(None)
        } else {
            match Pin::new(&mut this.stream).poll_next(cx) {
                Poll::Ready(Some(item)) => {
                    this.remaining -= 1;
                    Poll::Ready(Some(item))
                }
                other => other,
            }
        }
    }
}
Key points:
- Take tracks the number of remaining items to produce.
- It stops producing items when the count reaches zero.
- It passes through items from the inner stream until the limit is reached.
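One consequence of checking remaining first is that Take stops polling the inner stream once the limit is hit, which is what makes it safe on infinite streams. The sketch below checks this with a poll counter. It assumes a local Stream trait standing in for futures_core::Stream; Ones, NoopWaker, and run_take are hypothetical names introduced for the example.

```rust
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Local stand-in for `futures_core::Stream` (assumption: same method shape).
pub trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

// Hypothetical infinite stream that records how many times it was polled.
pub struct Ones {
    polls: usize,
}

impl Stream for Ones {
    type Item = u8;

    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<u8>> {
        self.polls += 1;
        Poll::Ready(Some(1))
    }
}

// Same logic as the `Take` adapter from the article.
pub struct Take<S> {
    stream: S,
    remaining: usize,
}

impl<S: Stream + Unpin> Stream for Take<S> {
    type Item = S::Item;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        if this.remaining == 0 {
            // Limit reached: report the end without touching the inner stream.
            return Poll::Ready(None);
        }
        match Pin::new(&mut this.stream).poll_next(cx) {
            Poll::Ready(Some(item)) => {
                this.remaining -= 1;
                Poll::Ready(Some(item))
            }
            other => other,
        }
    }
}

// A waker that does nothing; enough here because `Ones` is always ready.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Polls `Take` `attempts` times.
// Returns (items yielded, number of polls the inner stream saw).
pub fn run_take(limit: usize, attempts: usize) -> (usize, usize) {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut take = Take {
        stream: Ones { polls: 0 },
        remaining: limit,
    };
    let mut yielded = 0;
    for _ in 0..attempts {
        if let Poll::Ready(Some(_)) = Pin::new(&mut take).poll_next(&mut cx) {
            yielded += 1;
        }
    }
    (yielded, take.stream.polls)
}
```

Calling run_take(3, 10) polls the adapter ten times, but the inner stream is only polled for the three items that are let through.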
Using the next, map, and take Methods
Now that we have our StreamExt trait with next, map, and take, let’s see them in action:
use futures::stream;

#[tokio::main]
async fn main() {
    let mut stream = stream::repeat_with(|| 42).take(10).map(|x| x * 2);

    // Using the `next` method from our custom `StreamExt`
    while let Some(item) = stream.next().await {
        println!("{}", item);
    }
}
Here’s what’s happening:
- We create a stream that repeatedly yields the number 42.
- We use the take method to limit the stream to 10 items, ensuring it terminates.
- We use the map method to multiply each item by 2.
- We use the next method to retrieve items from the transformed stream one at a time.
- The .await keyword drives the futures to completion, printing each transformed item.
Why Use .await?
Finally, you might wonder why .await is necessary. The answer lies in the asynchronous nature of streams:
- The next method returns a Next struct, which implements Future.
- Calling .await drives this future to completion, executing its poll method.
- The poll method, in turn, invokes poll_next on the underlying stream, retrieving the next item asynchronously.
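Strictly speaking, .await hands the future to whatever executor is running the task, and it is the executor that keeps calling poll until it gets Poll::Ready. A toy version of that loop can be sketched with only the standard library. This is not how tokio works internally (a real executor sleeps until the waker fires instead of spinning), and ReadyOnSecondPoll, NoopWaker, block_on, and demo are hypothetical names for this illustration.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical future that is not ready the first time it is polled.
pub struct ReadyOnSecondPoll {
    polled_once: bool,
}

impl Future for ReadyOnSecondPoll {
    type Output = &'static str;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.polled_once {
            Poll::Ready("done")
        } else {
            self.polled_once = true;
            // Ask to be polled again; a real future would do this when data arrives.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

// A waker that does nothing, because the toy executor below simply busy-loops.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Toy executor: polls the future in a loop until it completes.
// Returns the output together with the number of polls it took.
pub fn block_on<F: Future>(fut: F) -> (F::Output, usize) {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    let mut polls = 0;
    loop {
        polls += 1;
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return (out, polls);
        }
    }
}

// Runs the two-step future on the toy executor.
pub fn demo() -> (&'static str, usize) {
    block_on(ReadyOnSecondPoll { polled_once: false })
}
```

The same thing happens with stream.next().await: the Next future may return Poll::Pending any number of times before the underlying stream has an item, and the executor polls it again each time it is woken.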
Recap and Takeaways
By combining Ext traits with futures, we can create ergonomic and powerful abstractions for working with streams in Rust. I’ve pushed this example to GitHub if you want to try it out yourself.