Intro
In our last post we saw how to build a complete Future. While being a contrived example we saw what is expected from a future: do not block as much as possible and unpark the task only when necessary. In this post we will expand the future in order to return a stream
of values.
Streams are akin to iterators: they produce multiple outputs of the same type over time. The only difference here is how we consume them. Instead of using Rust's for
statement we will ask our reactor
to do it for us. How?
ForEach combinator
Instead of iterating manually between a stream
s items we use a specific combinator called for_each. But... ForEach
implements Future
so we can pass it to a reactor
or even chain, join etc... with other futures! It's really cool.
Let's build one, shall we?
impl Stream
The Stream
trait is very similar to the Future
trait:
pub trait Future {
type Item;
type Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error>;
// <-- CUT -->
}
pub trait Stream {
type Item;
type Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error>;
// <-- CUT -->
}
Both traits have many more functions; since those functions have defaults you do not need to implement them if you don't need to. Let's focus on the poll
function:
// Future
fn poll(&mut self) -> Poll<Self::Item, Self::Error>;
// Stream
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error>;
The Stream
trait can optionally return an Item. The Future must do so. The convention here is: if your stream has more data to produce and is readily available return a Ok(Async::Ready(Some(t)))
. If the data to produce is not yet ready return a Ok(Async::NotReady)
. If you are done return Ok(Async::Ready(None))
. In case of errors you can return Err(e)
as usual. So, to summarize:
Situation | Future |
Stream |
---|---|---|
Item to return ready | Ok(Async::Ready(t)) |
Ok(Async::Ready(Some(t))) |
Item to return not ready | Ok(Async::NotReady) |
Ok(Async::NotReady) |
No more items to return | N.A. | Ok(Async::Ready(None)) |
Error | Err(e) |
Err(e) |
Simple stream
Let's build a very simple stream
: one that returns the integers from 0 up to X. Let's see the code first:
struct MyStream {
current: u32,
max: u32,
}
impl MyStream {
pub fn new(max: u32) -> MyStream {
MyStream {
current: 0,
max: max,
}
}
}
impl Stream for MyStream {
type Item = u32;
type Error = Box<Error>;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
match self.current {
ref mut x if *x < self.max => {
*x = *x + 1;
Ok(Async::Ready(Some(*x)))
}
_ => Ok(Async::Ready(None)),
}
}
}
The important part here is the poll
function. poll
takes a mutable reference of self
so we can mutate the inner fields. What we do it to check if we have returned the last number (max
) and then either terminate the stream or return the actual number and increment it.
Note that while here we have an upper bound you are not forced to specify it. So we could return all the numbers from here to infinity (or overflow, whichever comes first :)).
Consume a stream
To consume a stream we can use the for_each
combinator we saw earlier. Let's print those numbers:
let mut reactor = Core::new().unwrap();
let my_stream = MyStream::new(5);
let fut = my_stream.for_each(|num| {
println!("num === {}", num);
ok(())
});
I will not spoil the surprise by posting the results :). Anyway note that each closure is a future
in itself. You can tell because we are calling the ok(())
lowercase function.
We can call other futures, chain them and so on as usual. In our example we are simply returning ok.
Spawn futures during the event loop
Sometimes when producing items in streams we might want to spawn other futures. There are various reasons to do so (mainly: do not block if you can help it!). Rust's future
s allow you to add futures to an existing event loop using the execute function of a reactor
. There is a gotcha, however: the spawned future must not return anything. The function signature is this one:
For example we use a slightly modified wait-in-another-thread future
written in the previous posts. We revise our stream like this:
impl Stream for MyStream {
type Item = u32;
type Error = Box<Error>;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
use futures::future::Executor;
match self.current {
ref mut x if *x < self.max => {
*x = *x + 1;
self.core_handle.execute(WaitInAnotherThread::new(
Duration::seconds(2),
format!("WAIT {:?}", x),
));
Ok(Async::Ready(Some(*x)))
}
_ => Ok(Async::Ready(None)),
}
}
}
The important part here is the execute
code. Here we are spawning another future in the event loop (a future that will just wait for two seconds and then print WAIT <number>
). Remember, this is supposed to be a daemon-like future
so it must not return anything (except maybe an error).
To test this we'll use this code:
fn main() {
let mut reactor = Core::new().unwrap();
// create a Stream returning 5 items
// Each item will spawn an "inner" future
// into the same reactor loop
let my_stream = MyStream::new(5, reactor.handle());
// we use for_each to consume
// the stream
let fut = my_stream.for_each(|num| {
println!("num === {:?}", num);
ok(())
});
// this is a manual future. it's the same as the
// future spawned into our stream
let wait = WaitInAnotherThread::new(Duration::seconds(3), "Manual3".to_owned());
// we join the futures to let them run concurrently
let future_joined = fut.map_err(|err| {}).join(wait);
// let's run the future
let ret = reactor.run(future_joined).unwrap();
println!("ret == {:?}", ret);
}
Here we also show how to join a stream
with a future
. The map_err
dance is needed, as before, to make errors compatible (see Rust futures: an uneducated, short and hopefully not boring tutorial - Part 2).
If we run this code you can expect an output like this:
num === 1
num === 2
num === 3
num === 4
num === 5
"Manual3" starting the secondary thread!
"Manual3" not ready yet! parking the task.
"WAIT 1" starting the secondary thread!
"WAIT 1" not ready yet! parking the task.
"WAIT 2" starting the secondary thread!
"WAIT 2" not ready yet! parking the task.
"WAIT 3" starting the secondary thread!
"WAIT 3" not ready yet! parking the task.
"WAIT 4" starting the secondary thread!
"WAIT 4" not ready yet! parking the task.
"WAIT 5" starting the secondary thread!
"WAIT 5" not ready yet! parking the task.
"WAIT 1" the time has come == 2017-12-06T10:23:30.853796527Z!
"WAIT 1" ready! the task will complete.
"WAIT 2" the time has come == 2017-12-06T10:23:30.853831227Z!
"WAIT 2" ready! the task will complete.
"WAIT 3" the time has come == 2017-12-06T10:23:30.853842927Z!
"WAIT 3" ready! the task will complete.
"WAIT 5" the time has come == 2017-12-06T10:23:30.853856927Z!
"WAIT 5" ready! the task will complete.
"WAIT 4" the time has come == 2017-12-06T10:23:30.853850427Z!
"WAIT 4" ready! the task will complete.
"Manual3" the time has come == 2017-12-06T10:23:31.853775627Z!
"Manual3" ready! the task will complete.
ret == ((), ())
Notice even if we spawned the task 5 after the task 4 the completion order is scrambled. Also, your output would likely be different.
But what if we did not join the "Wait for 3 seconds" future
?
The revised code is like this:
fn main() {
let mut reactor = Core::new().unwrap();
// create a Stream returning 5 items
// Each item will spawn an "inner" future
// into the same reactor loop
let my_stream = MyStream::new(5, reactor.handle());
// we use for_each to consume
// the stream
let fut = my_stream.for_each(|num| {
println!("num === {:?}", num);
ok(())
});
// let's run the future
let ret = reactor.run(fut).unwrap();
println!("ret == {:?}", ret);
}
We will notice that the code will return almost immediately with this output:
num === 1
num === 2
num === 3
num === 4
num === 5
ret == ()
The background futures did not get a chance to run.
Next steps
In the following posts we will, at least, cover the await!
macro to streamline our futures.
Happy Coding
Francesco Cogno
Top comments (3)
Hi Francesco, thanks for this series. However in this part5, I found that you didn't describe changes required last to
run()
examples to work (maybe intentionally as exercise?)Anyway, if anyone is struggling, here's a cheat sheet _.
Add
core_handle
toMyStream
Add thread_name to
WaitInAnotherThread
Futureand extend its constructor
WaitForAnotherThread Future must have Error type
()
. Also add thread_name to print outs.it's very good!
Hi Francesco,
It seems that the
poll
implementation returns 1..max (both ends inclusively).