As I explained in one of my last articles, Futures are one part of an asynchronous environment. We need syntax, a type, and a runtime to be able to run code in a non-blocking way. Futures are the type part of the Rust async story.
Content
- The two perspectives
- What actually is a Future?
- When to use Futures?
- Why does using Futures save time?
- How to use Futures?
- What's so different or hard about Futures?
- How to execute more than one Future?
- What is the future of Futures?
- Summary
The two perspectives
If you are coming from NodeJS, Futures in Rust don't make much sense at first. In NodeJS, everything happens asynchronously. Therefore, to be able to say "Hey, I really need to wait for the answer to this HTTP GET call", you put `.then()` on a `Promise`, so you make sure the code inside the `.then()` only runs once the HTTP call is finished.
In Rust, everything is blocking and synchronous by default, so you might ask yourself: "Why bother with the complexity? That's exactly what I wanted in the first place!"
Rust is a systems programming language. So to craft applications in Rust, you always have to wear two hats: the "Systems Hat" (⛑) and the "Programmer Hat" (🎩). The Systems Hat (⛑) lets you consider what is actually best from a machine point of view; the Programmer Hat (🎩) takes care of the syntax and how the software is written.
If you are coming from NodeJS, the Systems Hat is taken care of by Google's V8 runtime, so you can focus on syntax. In Rust you get help from crates, although you need to make certain decisions yourself.
The Systems Hat is the reason why we want to use Futures. Therefore, you need to handle Futures as a type in your application, and afterwards make sure that you use a runtime to actually execute them. If you are consuming Futures (for example when a crate you are using returns a `Future`), you have to understand where the data comes from. In this case, both the Programmer Hat and the Systems Hat are needed.
What actually is a Future? 🎩
A `Future` in Rust is actually a trait, which, if you want to implement it, looks like this:
```rust
trait Future {
    type Item;
    type Error;

    fn poll(&mut self) -> Poll<Self::Item, Self::Error>;
}
```
The `poll` method is important. It will get called by your runtime and will return either `Async::Ready` or `Async::NotReady`.
If you want to fetch different magazine issues from a remote location or from the file system, you can create and return your own `Future` for that:
```rust
// `issues` is any asynchronous reader, for example a `tokio::fs::File`
// or a TCP stream
struct Magazine<R: AsyncRead> {
    issues: R,
}
```

and `impl` the `Future` trait on it:
```rust
use std::io;
use futures::{Async, Future, Poll};
use tokio::io::AsyncRead;

impl<R: AsyncRead> Future for Magazine<R> {
    // here we return a single byte
    type Item = u8;
    type Error = io::Error;

    // this method is getting called by the runtime. Every time we can read
    // a byte into the buffer, we return `Async::Ready`
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        let mut buffer = [0; 1];
        match self.issues.poll_read(&mut buffer) {
            Ok(Async::Ready(_num_bytes_read)) => Ok(Async::Ready(buffer[0])),
            Ok(Async::NotReady) => Ok(Async::NotReady),
            Err(e) => Err(e),
        }
    }
}
```
The runtime will fill the buffer with the information you are asking for, and the `Future` will return `Async::Ready` once the buffer is full.
This is the basic essence of a `Future` in Rust.
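To see how such a hand-rolled `Future` could be driven, here is a minimal sketch, assuming the generic `Magazine` type from above, tokio 0.1 and futures 0.1, and a hypothetical file `issue.bin`:

```rust
use futures::Future;

fn main() {
    // open a file asynchronously, wrap it in our `Magazine` future and
    // print the first byte; the closing `map`/`map_err` turn the types
    // into `Item = ()` / `Error = ()` so `tokio::run` accepts the future
    let task = tokio::fs::File::open("issue.bin")
        .and_then(|file| Magazine { issues: file })
        .map(|byte| println!("first byte: {}", byte))
        .map_err(|err| eprintln!("read error: {}", err));

    tokio::run(task);
}
```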
When to use Futures? 🎩
Implementing the `Future` trait for a type is your way of telling the machine: "Hey, this might take a while, consider this when executing this part of the code."
Rust is a synchronous language. Everything happens line by line, and each line waits until its result is processed. This is not a problem until you want to do tasks which take a longer time to process.
If you create a microservice with Rust, and you need to query three other microservices for some data, and afterwards you merge the data and write it into a database, that's a use case where you want to consider using Futures.
Why does using Futures save time? ⛑
In our test scenario, each of these HTTP calls could take, let's say, one minute. Instead of waiting three minutes for all of the calls to finish, you want to run them in parallel, so you still only wait one minute instead of three.
Therefore you create three methods which each return a `Future`, collect them, and then pass them on to a runtime (to tokio via `tokio::run`, for example).
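A minimal sketch of this idea, assuming three hypothetical functions `fetch_one`, `fetch_two` and `fetch_three` that each return a future over some data (and share the same error type), plus a hypothetical `merge_and_store` helper:

```rust
use futures::Future;

// the three requests are started together and `join3` resolves once
// all of them are ready
fn fetch_all() -> impl Future<Item = (), Error = ()> {
    fetch_one()
        .join3(fetch_two(), fetch_three())
        // `merge_and_store` is assumed to combine the three results,
        // write them to the database and return `()`
        .map(|(a, b, c)| merge_and_store(a, b, c))
        .map_err(|err| eprintln!("request error: {}", err))
}

// somewhere in main:
// tokio::run(fetch_all());
```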
How to use Futures? 🎩 + ⛑
If a crate returns a `Future`, you can use `.and_then()` to work with the result once the `Future` is ready. Using `.map()` on a `Future` lets you alter the type of the `Future`.
Inside the async Rust world, we have to take care of different types of data. Instead of plain Strings and numbers, we mostly deal with streams of values, so you will most likely work with Streams.
A `Stream` is an extension of a `Future`. Where a `Future` is about producing a single value, a `Stream` yields values as long as they are present.
- Stream: A `Stream` is, like a `Future`, a trait you can `impl` on a type. It lets you iterate over the yielded values (which are `Some(_)` or `None`).
- Sink: For writing data continuously (in an asynchronous way) to either a socket or to the file system.
So Streams are for reading data, Sinks are for writing it. We have two types of Streams in our web ecosystem (a small sketch of how a Stream is consumed follows after this list):
- Streams of bytes (like HTTP bodies or TCP streams)
- Streams of messages (like WebSocket frames or UDP packets), where each message arrives as a unit of a defined size
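Here is a minimal sketch of consuming such a `Stream`, assuming futures 0.1 / tokio 0.1 and a hypothetical `incoming_messages()` function that returns some `impl Stream` of `String` messages:

```rust
use futures::{Future, Stream};

// `for_each` runs the closure for every value the stream yields and
// resolves once the stream is exhausted
let task = incoming_messages()
    .for_each(|message| {
        println!("got: {}", message);
        Ok(())
    })
    .map_err(|err| eprintln!("stream error: {}", err));

tokio::run(task);
```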
Code example
Let's look at a use case where a crate you are using returns a `Future`. When doing HTTP calls, `reqwest` is a good example. On the `Future` returned from `reqwest`, you can use `.and_then()` to process the results (🎩):
```rust
// We have to use "r#" before "async" because "async" is a reserved keyword.
use reqwest::r#async::Client;
// `Future` and `Stream` bring `.and_then()`, `.map()` and `.concat2()` into scope.
use futures::{Future, Stream};

// The return type has to be `Future<Item=(), Error=()>` to be able
// to use `tokio::run`.
// If it has a different type, you have to drive it with a runtime's
// `block_on` instead.
fn fetch_data() -> impl Future<Item=(), Error=()> {
    // `url` is a placeholder for the endpoint you want to call
    Client::new()
        .get(url)
        .send()
        .and_then(|res| {
            // the body is a `Stream` of chunks; `concat2()` collects
            // them into a single item
            res.into_body().concat2()
        })
        .map_err(|err| println!("request error: {}", err))
        .map(|body| {
            // here you can use the body to write it to a file
            // or return it via Ok()
            // If you return it via, for example, Ok(users),
            // then you need to adjust the return type in impl Future<Item=TYPE>
            // Examples can be found here:
            // https://github.com/gruberb/futures_playground
            // For now, let's just turn the body into a vector
            let v = body.to_vec();
        })
}
```
Once you have created the method which returns a `Future` (`fetch_data()`), you have to pass it on to a runtime like tokio (⛑):

```rust
tokio::run(fetch_data());
```
High Level Overview
- You receive a `Future` from an external crate
- A `Future` is likely to return a `Stream` of values, so you have to turn this `Stream` into a type you can work with in a synchronous way (like a Vector or String); a sketch for getting such a value back into synchronous code follows after this list
- You return the whole `Future` from a method via `-> impl Future<Item=(), Error=()>`, where the parentheses `()` are placeholders for the actual types you want to return
- You pass the method on to a runtime like tokio via `tokio::run(method())`
- The `run` call will start the runtime, set up the required resources, put this future on a thread pool and start polling it
- It will then try to pass the work on to the operating system
- Each time the runtime polls your `Future`, if the underlying I/O resource your `Future` is waiting on is not ready, it will return `NotReady`. The runtime sees this `NotReady` return value and puts your `Future` to sleep
- Once an event from the underlying I/O resource comes in, the runtime checks whether this I/O resource is associated with your `Future` and starts polling again. This time, your `Future` will be able to return `Ready` with a value, since the underlying I/O resource has provided one
- The runtime will then set the status of the `Future` to ready and process the `.and_then()` part of the code
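If you want the resulting value back in synchronous code instead of ending the chain with `Item=()`, here is a minimal sketch using tokio 0.1's `Runtime::block_on`, assuming a hypothetical `fetch_body()` that returns an `impl Future<Item = Vec<u8>, Error = std::io::Error>`:

```rust
use tokio::runtime::Runtime;

// `block_on` drives the future on the runtime and blocks the current
// thread until it resolves, handing the value back synchronously
let mut runtime = Runtime::new().expect("failed to start runtime");
let body: Vec<u8> = runtime
    .block_on(fetch_body())
    .expect("request failed");
println!("received {} bytes", body.len());
```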
Unlike in NodeJS, the `Future` only gets executed when it is handed to `tokio::run`, and not before. In Node, as soon as you create a `Promise`, it starts doing its work immediately.
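A minimal sketch of this lazy behaviour, assuming futures 0.1 and tokio 0.1:

```rust
use futures::future;

// building the future does not run anything yet
let fut = future::lazy(|| {
    println!("now I am running");
    Ok::<(), ()>(())
});

// nothing has been printed so far; only this call executes the future
tokio::run(fut);
```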
What's so different or hard about Futures? ⛑
Let's walk through our example above:
- We create a new client with `Client::new()` and `.send()` our request
- We get a `Response` back:
```rust
pub struct Response {
    status: StatusCode,
    headers: HeaderMap,
    url: Box<Url>,
    body: Decoder,
    ...
}
```
- The body itself is a `Decoder`, which can be turned into a `Body` via `.into_body()`.
- `Body` itself implements a `Stream` (as mentioned earlier).
- Now we can look into the Futures API from Rust and find out: we can turn a Stream of bytes into a single item via `.concat2()`
```rust
...
.and_then(|res| {
    res.into_body().concat2()
})
...
```
- We use this single item in the `.map()` part as `body`
- With the help of your code editor, you will find out that this `body` is actually a `Chunk`, returned from the library `hyper`
- We can now turn this `Chunk` into a `Vec<u8>`:
.map(|body| {
let v = body.to_vec();
// do whatever with v
})
...
From then on, we are back in "normal" Rust land and can forget what just happened 🙃.
You can find the full example in this GitHub repository. There I receive JSON and write it to a file.
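As a hedged sketch of what that last step could look like (the repository has the real version), assuming the `serde`/`serde_json` crates, a hypothetical `User` struct, and the `v: Vec<u8>` from the `.map()` closure above:

```rust
use serde::Deserialize;

// a hypothetical shape for the JSON we received
#[derive(Debug, Deserialize)]
struct User {
    id: u64,
    name: String,
}

// back in synchronous Rust land: parse the collected bytes and
// write them to a file
let users: Vec<User> = serde_json::from_slice(&v).expect("invalid JSON");
std::fs::write("users.json", &v).expect("could not write file");
println!("stored {} users", users.len());
```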
This is one of the reasons why dealing with Futures is so hard in the beginning. You have to think at a much lower level than, for example, in NodeJS. In addition, the `async/await` syntax is not final yet, which leads to much more boilerplate code.
These mental steps help you to not get lost when dealing with Futures:
- What's the return type or value I am getting from this library?
- How can I access the `Stream` of values on this response?
- What will the library turn this `Stream` into when I collect all the values via `.concat2()`?
- How can I turn this new type into a Vector or another Rust std type so I can pass it on to synchronous methods?
Basically, you always want to figure out how to access the stream of values, collect them, and then process the resulting object.
How to execute more than one Future? 🎩
Generally you want to collect your values as `Stream`s, so that for each item you get over a `Stream`, you can spawn off a new `Future` to handle it.
The Rust futures API has a type called `FuturesUnordered` to which you can push more than one `Future`:
```rust
use futures::stream::futures_unordered::FuturesUnordered;
use hyper::{client::ResponseFuture, Client};

// `URL` is a placeholder for a `hyper::Uri` pointing at the service
fn setup_requests() -> FuturesUnordered<ResponseFuture> {
    let mut list_of_futures = FuturesUnordered::new();
    let client = Client::new();

    let first = client.get(URL);
    list_of_futures.push(first);

    let second = client.get(URL);
    list_of_futures.push(second);

    list_of_futures
}
```
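A minimal sketch of how this collection could then be driven, assuming the `setup_requests()` function above: `FuturesUnordered` implements `Stream`, so the responses can be consumed as they complete.

```rust
use futures::{Future, Stream};

// each response is handled as soon as it arrives, regardless of the
// order in which the requests were pushed
let work = setup_requests()
    .for_each(|response| {
        println!("status: {}", response.status());
        Ok(())
    })
    .map_err(|err| eprintln!("request error: {}", err));

tokio::run(work);
```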
In this example we used `hyper` for our HTTP calls. You can find the rest of the code over here on GitHub.
The syntax looks slightly different if you are using `reqwest`. Here you `.join()` multiple requests and return them as "one `Future`":
```rust
use futures::Future;
use reqwest::r#async::{Client, Response};

// `URL` and `STRUCT_TYPE` are placeholders for the endpoint and the
// type you want to deserialize the JSON response into
fn fetch() -> impl Future<Item=(), Error=()> {
    let client = Client::new();

    let json = |mut res: Response| {
        res.json::<STRUCT_TYPE>()
    };

    let request1 = client
        .get(URL)
        .send()
        .and_then(json);

    let request2 = client
        .get(URL)
        .send()
        .and_then(json);

    request1.join(request2)
        .map(|(res1, res2)| {
            println!("{:?}", res1);
            println!("{:?}", res2);
        })
        .map_err(|err| {
            println!("stdout error: {}", err);
        })
}
```
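As before, the joined `Future` only runs once it is handed to the runtime:

```rust
tokio::run(fetch());
```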
The full code can also be found on GitHub.
What is the future of Futures? 🎩 + ⛑
Futures are getting into stable Rust at 1.37, so around June. There are also changes around the syntax and the runtime, which will reduce the amount of code you have to write to get this stream of values out of a `Future` and into a synchronous Rust format.
You can also use the `Runtime` crate, which saves you almost all of the boilerplate code, although going through the process above helps you understand Futures on a deeper level.
Summary
If you perform asynchronous operations, like fetching files from the operating system or making HTTP requests to a remote server, then Futures let you work with the return values in a non-blocking way.
If this were synchronous, you would have to block the thread the operation is running on and wait for the result before continuing. To do this in an asynchronous way, we have a runtime which manages threads itself and takes on Futures. It will fill in the value of the `Future` when the operating system returns a result to the runtime.
Once the `Future` is fulfilled, the runtime sets `Async::Ready` and the `.and_then()` part of the code will get executed.