DEV Community

Bastian Gruber

Explained: Futures in Rust for Web Development

As I explained in one of my last articles, Futures are one part of an asynchronous environment. We need syntax, a type and a runtime to be able to run code in a non-blocking way. Futures are the type part of the Rust async story.

Content

  1. The two perspectives
  2. What actually is a Future?
  3. When to use Futures?
  4. Why does using Futures save time?
  5. How to use Futures?
  6. What's so different or hard about Futures?
  7. How to execute more than one Future?
  8. What is the future of Futures?
  9. Summary

The two perspectives

If you are coming from NodeJS, Futures in Rust don't make much sense at first. In NodeJS, I/O happens asynchronously by default. So to be able to say "Hey, I really need to wait for the answer to this HTTP GET call", you put .then() on a Promise, which makes sure the code inside .then() only runs once the HTTP call has finished.

In Rust, everything is blocking and synchronous by default, so you might ask yourself: "Why bother with the complexity? That's exactly what I wanted in the first place!"

Rust is a systems programming language. So to craft applications in Rust, you always have to wear two hats: the "Systems Hat"(⛑) and the "Programmer Hat"(🎩). The Systems Hat(⛑) lets you consider what is actually best from a machine point of view, the Programmer Hat(🎩) takes care of the syntax and how the software is written.

If you are coming from NodeJS, the Systems Hat is taken care of by Google's V8 runtime, so you can focus on syntax. In Rust you get help from crates, but you need to make certain decisions yourself.

The Systems Hat is the reason why we want to use Futures. Therefore, you need to handle Futures as a type in your application, and afterwards make sure that you use a runtime to actually execute them. If you are consuming Futures (for example when a crate you are using is returning a Future), you have to understand where the data comes from. In this case, the Programmer Hat and the Systems Hat are needed.

What actually is a Future? 🎩

A Future in Rust is actually a trait. In the futures 0.1 crate, which the examples in this article use, it looks like this:

trait Future {
    type Item;
    type Error;

    fn poll(&mut self) -> Poll<Self::Item, Self::Error>;
}

The poll method is important. The runtime calls it, and it returns either Async::Ready with the value, Async::NotReady if the value is not available yet, or an error.
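To make the polling contract concrete, here is a minimal sketch that needs no crates. `Async`, `Poll` and `SimpleFuture` are local stand-ins shaped like the futures 0.1 types, and `Countdown` and `run_to_completion` are hypothetical names made up for this illustration. It is not how tokio works internally, only the bare idea of "poll until Ready".

```rust
#[derive(Debug, PartialEq)]
enum Async<T> {
    Ready(T),
    NotReady,
}

type Poll<T, E> = Result<Async<T>, E>;

// Local stand-in for the futures 0.1 `Future` trait.
trait SimpleFuture {
    type Item;
    type Error;
    fn poll(&mut self) -> Poll<Self::Item, Self::Error>;
}

/// Hypothetical future that answers `NotReady` `remaining` times before it is done.
struct Countdown {
    remaining: u32,
}

impl SimpleFuture for Countdown {
    type Item = &'static str;
    type Error = ();

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        if self.remaining == 0 {
            Ok(Async::Ready("done"))
        } else {
            self.remaining -= 1;
            Ok(Async::NotReady)
        }
    }
}

/// Toy "runtime": polls in a loop and counts how many calls were needed.
/// A real runtime would sleep until the OS reports progress instead of spinning.
fn run_to_completion<F: SimpleFuture>(mut future: F) -> Result<(F::Item, u32), F::Error> {
    let mut polls = 0;
    loop {
        polls += 1;
        if let Async::Ready(value) = future.poll()? {
            return Ok((value, polls));
        }
    }
}
```

Calling `run_to_completion(Countdown { remaining: 2 })` polls three times: two `NotReady` answers followed by the `Ready` value.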

If you want to fetch different magazine issues from a remote location or from the file system, you can create and return your own Future for that:

use std::io;

use futures::{Async, Future, Poll};
use tokio::io::AsyncRead;

// `R` is the source the issues are read from, for example a file or a
// TCP connection. It is assumed to implement tokio's `AsyncRead` trait.
struct Magazine<R>(R);

and impl the Future trait on it:

impl<R: AsyncRead> Future for Magazine<R> {
    // here we return a single byte
    type Item = u8;
    type Error = io::Error;

    // This method is called by the runtime. Every time we can read
    // a byte into the buffer, we return `Async::Ready`.
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        let mut buffer = [0; 1];
        match self.0.poll_read(&mut buffer) {
            Ok(Async::Ready(_num_bytes_read)) => Ok(Async::Ready(buffer[0])),
            Ok(Async::NotReady) => Ok(Async::NotReady),
            Err(e) => Err(e),
        }
    }
}

Each time the runtime calls poll, the Future tries to read a byte into the buffer, and it returns Async::Ready as soon as that byte is available.

This is the basic essence of a Future in Rust.

When to use Futures? 🎩

Implementing the Future trait for a type is your way of telling the machine "Hey, this might take a while, consider this when executing this part of the code".

Rust is a synchronous language by default. Everything happens line by line, and each line waits until its result is available. This is not a problem until you want to run tasks which take a long time to complete.

If you create a microservice with Rust, and you need to fetch data from three other microservices, merge the data and then write it into a database, that's a use case where you want to consider using Futures.

Why does using Futures save time? ⛑

In our test scenario, each of these HTTP calls could take, let's say, 1 minute. Instead of waiting 3 minutes for all of the calls to finish one after another, you want to run them concurrently, so you wait just about 1 minute in total.

Therefore you create three functions which each return a Future, collect them, and then pass them on to a runtime (to tokio via tokio::run, for example).
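The arithmetic behind "1 minute instead of 3" can be sketched without any network code. In the block below, `FakeRequest` is a hypothetical stand-in for an HTTP call, one poll round stands for one minute of waiting, and `Async`, `Poll` and `SimpleFuture` are local copies of the futures 0.1 shapes. It only models the timing, not real I/O.

```rust
enum Async<T> {
    Ready(T),
    NotReady,
}

type Poll<T, E> = Result<Async<T>, E>;

trait SimpleFuture {
    type Item;
    type Error;
    fn poll(&mut self) -> Poll<Self::Item, Self::Error>;
}

/// Hypothetical HTTP call that needs `minutes_left` more minutes to finish.
struct FakeRequest {
    minutes_left: u32,
}

impl SimpleFuture for FakeRequest {
    type Item = ();
    type Error = ();

    fn poll(&mut self) -> Poll<(), ()> {
        if self.minutes_left == 0 {
            Ok(Async::Ready(()))
        } else {
            // Pretend one minute passes between two polls.
            self.minutes_left -= 1;
            Ok(Async::NotReady)
        }
    }
}

/// Waits for each request to finish before starting the next one.
fn minutes_sequential(requests: Vec<FakeRequest>) -> u32 {
    let mut minutes = 0;
    for mut request in requests {
        while let Ok(Async::NotReady) = request.poll() {
            minutes += 1;
        }
    }
    minutes
}

/// Lets all pending requests make progress during the same minute.
fn minutes_concurrent(mut requests: Vec<FakeRequest>) -> u32 {
    let mut minutes = 0;
    loop {
        // Keep only the requests that are still waiting for their response.
        requests.retain_mut(|request| matches!(request.poll(), Ok(Async::NotReady)));
        if requests.is_empty() {
            return minutes;
        }
        minutes += 1;
    }
}
```

With three requests of one minute each, `minutes_sequential` adds the waits up, while `minutes_concurrent` only pays for the slowest request.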

How to use Futures? 🎩 + ⛑

If a crate returns a Future, you can use .and_then() to chain another Future that works with the result once the first one is ready. Using .map() on a Future lets you change the value (and therefore the Item type) it resolves to.
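To show what these two combinators do under the hood, here is a simplified, dependency-free sketch. `Map`, `AndThen`, `Ready` and `body_length` are hypothetical local types and names written for this illustration; the real futures 0.1 combinators are more general, but they follow the same idea of wrapping one future in another.

```rust
#[derive(Debug, PartialEq)]
enum Async<T> {
    Ready(T),
    NotReady,
}

type Poll<T, E> = Result<Async<T>, E>;

trait SimpleFuture {
    type Item;
    type Error;
    fn poll(&mut self) -> Poll<Self::Item, Self::Error>;
}

/// A future that is ready on the first poll (stand-in for a finished request).
struct Ready<T>(Option<T>);

impl<T> SimpleFuture for Ready<T> {
    type Item = T;
    type Error = String;

    fn poll(&mut self) -> Poll<T, String> {
        Ok(Async::Ready(self.0.take().expect("polled after completion")))
    }
}

/// `map`: transform the value once the inner future is ready.
struct Map<A, F> {
    future: A,
    f: Option<F>,
}

impl<A, F, U> SimpleFuture for Map<A, F>
where
    A: SimpleFuture,
    F: FnOnce(A::Item) -> U,
{
    type Item = U;
    type Error = A::Error;

    fn poll(&mut self) -> Poll<U, A::Error> {
        match self.future.poll()? {
            Async::Ready(value) => {
                let f = self.f.take().expect("polled after completion");
                Ok(Async::Ready(f(value)))
            }
            Async::NotReady => Ok(Async::NotReady),
        }
    }
}

/// `and_then`: when the first future is ready, build a second one from its value.
enum AndThen<A, B, F> {
    First(A, Option<F>),
    Second(B),
}

impl<A, B, F> SimpleFuture for AndThen<A, B, F>
where
    A: SimpleFuture,
    B: SimpleFuture<Error = A::Error>,
    F: FnOnce(A::Item) -> B,
{
    type Item = B::Item;
    type Error = A::Error;

    fn poll(&mut self) -> Poll<B::Item, A::Error> {
        let second = match self {
            AndThen::First(first, f) => match first.poll()? {
                Async::Ready(value) => {
                    let f = f.take().expect("polled after completion");
                    f(value)
                }
                Async::NotReady => return Ok(Async::NotReady),
            },
            AndThen::Second(second) => return second.poll(),
        };
        // Switch to the second future and poll it right away.
        *self = AndThen::Second(second);
        self.poll()
    }
}

/// Hypothetical pipeline: "fetch" a body, upper-case it in a second step,
/// then map the text to its length.
fn body_length() -> impl SimpleFuture<Item = usize, Error = String> {
    let upper: AndThen<_, Ready<String>, _> = AndThen::First(
        Ready(Some("hello")),
        Some(|body: &str| Ready(Some(body.to_uppercase()))),
    );
    Map { future: upper, f: Some(|text: String| text.len()) }
}
```

Note how `and_then` produces a new future from the value, while `map` only changes the value itself. Nothing in `body_length` runs until someone calls `poll` on the returned future.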

Inside the async Rust world, we have to take care of different types of data. Instead of plain Strings and numbers, for example, you will most likely deal with Streams of values.

A Stream is an extension of a Future. Where a Future is about producing a single value, a Stream keeps yielding values as long as they are present.

  • Stream: A Stream is, like a Future, a trait you can impl on a type. This lets you iterate over the return values (which are Some(_) or None).
  • Sink: For writing data continuously (in an asynchronous way) to either a socket or to a file system

So Streams are for reading data, Sinks are for writing it. We have two types of Streams in our web ecosystem:

  • Stream of Bytes (like HTTP bodies or TCP streams)
  • Stream of Messages (like WebSocket Frames or UDP Packets), where each message is a self-contained unit with clear boundaries
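As a rough illustration of a Stream of bytes, the sketch below defines a local `SimpleStream` trait shaped like the futures 0.1 `Stream` (its `poll` yields `Some(item)` until the stream ends with `None`). `BodyChunks` and `concat` are hypothetical names; `concat` only approximates what a method like `concat2()` does, and it spins where a real runtime would put the task to sleep.

```rust
use std::collections::VecDeque;

enum Async<T> {
    Ready(T),
    NotReady,
}

type Poll<T, E> = Result<Async<T>, E>;

/// Like a future, but `poll` can be called repeatedly; `None` marks the end.
trait SimpleStream {
    type Item;
    type Error;
    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error>;
}

/// Hypothetical HTTP body that arrives as several chunks of bytes.
struct BodyChunks {
    chunks: VecDeque<Vec<u8>>,
}

impl SimpleStream for BodyChunks {
    type Item = Vec<u8>;
    type Error = ();

    fn poll(&mut self) -> Poll<Option<Vec<u8>>, ()> {
        // Every chunk is already in memory here, so we never return NotReady.
        Ok(Async::Ready(self.chunks.pop_front()))
    }
}

/// Drains the stream and glues all chunks together into one buffer.
fn concat<S>(stream: &mut S) -> Result<Vec<u8>, S::Error>
where
    S: SimpleStream<Item = Vec<u8>>,
{
    let mut body = Vec::new();
    loop {
        match stream.poll()? {
            Async::Ready(Some(chunk)) => body.extend(chunk),
            Async::Ready(None) => return Ok(body),
            // A real runtime would park the task here instead of spinning.
            Async::NotReady => continue,
        }
    }
}
```

Once the chunks are collected into a single `Vec<u8>`, the rest of the program can treat the body like any other synchronous value.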

Code example

Let's look at a use case where a crate you are using returns a Future. When doing HTTP calls, reqwest is a good example. On the Future returned from reqwest, you can use .and_then() to process the results (🎩):

use futures::{Future, Stream};
// We have to use "r#" before "async" because "async" is a reserved keyword
// in the Rust 2018 edition.
use reqwest::r#async::Client;

// Placeholder address; replace it with the endpoint you want to call.
const URL: &str = "https://example.com";

// The return type has to be `Future<Item=(), Error=()>` to be able
// to use `tokio::run`.
// If it has a different type, you have to use `Runtime::block_on`
// from `tokio::runtime` instead.
fn fetch_data() -> impl Future<Item=(), Error=()> {
    Client::new()
        .get(URL)
        .send()
        .and_then(|res| {
            res.into_body().concat2()
        })
        .map_err(|err| println!("request error: {}", err))
        .map(|body| {
            // here you can use the body to write it to a file
            // or return it via Ok()
            // If you return it via for example Ok(users)
            // then you need to adjust the return type in impl Future<Item=TYPE
            // Examples can be found here:
            // https://github.com/gruberb/futures_playground

            // For now, let's just turn the body into a Vector
            let v = body.to_vec();
        })
}

Once you have created the function which returns a Future (fetch_data()), you have to pass it on to a runtime like tokio (⛑):

tokio::run(fetch_data());

High Level Overview

  1. You receive a Future from an external crate
  2. A Future is likely to return a Stream of values, so you have to form this Stream into a type you can work with in a synchronous way (like a Vector or String)
  3. You return the whole Future from a function via -> impl Future<Item=(), Error=()>, where the unit types () are placeholders for the actual types you want to return
  4. You pass the result of that function on to a runtime like tokio via tokio::run(method())
  5. The run call will start the runtime, set up the required resources and then put this future on a threadpool and start polling your future
  6. It will then try to pass the work on to the operating system
  7. Each time the runtime polls your Future, if the underlying I/O resource your Future is waiting on is not ready, it will return NotReady. The runtime sees this NotReady return value and puts your Future to sleep
  8. Once an event from the underlying I/O resource comes, the runtime checks if this I/O resource is associated with your Future, and starts polling again. This time, your Future will be able to return a Ready with a value, since the underlying I/O resource has provided a value
  9. The runtime will then set the status of the Future to ready and will process the .and_then() part of the code
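Steps 5 to 8 can be sketched as a toy runtime. The block below is an assumption-heavy illustration: it uses the `std::future::Future` trait and the `Wake` trait from today's standard library rather than futures 0.1 or tokio, and `SlowResource`, `ThreadWaker` and `block_on` are hypothetical names. A background thread plays the role of the operating system reporting that the I/O resource is ready.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};
use std::time::Duration;

/// Wakes the runtime thread; stands in for "an event from the I/O resource".
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Hypothetical I/O resource: becomes ready once a background thread sets the flag.
struct SlowResource {
    ready: Arc<AtomicBool>,
    started: bool,
}

impl Future for SlowResource {
    type Output = &'static str;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.ready.load(Ordering::SeqCst) {
            return Poll::Ready("value from the resource");
        }
        if !self.started {
            self.started = true;
            let ready = Arc::clone(&self.ready);
            let waker = cx.waker().clone();
            // Simulates the operating system finishing the work later on.
            thread::spawn(move || {
                thread::sleep(Duration::from_millis(20));
                ready.store(true, Ordering::SeqCst);
                waker.wake();
            });
        }
        Poll::Pending
    }
}

/// Toy runtime: poll, sleep while the future is pending, poll again when woken.
/// Returns the value together with the number of polls it took.
fn block_on<F: Future>(future: F) -> (F::Output, u32) {
    let mut future = Box::pin(future);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    let mut polls = 0;
    loop {
        polls += 1;
        match future.as_mut().poll(&mut cx) {
            Poll::Ready(value) => return (value, polls),
            Poll::Pending => thread::park(),
        }
    }
}
```

The first poll always comes back as pending, so the runtime thread parks itself. It is only polled again after the background thread has flipped the flag and called the waker, which mirrors the "put to sleep, then poll again on an event" cycle described above.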

Unlike in NodeJS, the Future is only executed once it is passed to tokio::run, and not before. In Node, a Promise starts running as soon as you create it, and the Promise object is returned immediately.
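This laziness is easy to demonstrate with a small, self-contained sketch. `CountingRequest` is a hypothetical future whose "network call" is just a counter increment, and `Async`, `Poll` and `SimpleFuture` are local stand-ins for the futures 0.1 types. The point is only that creating the value does no work; polling does.

```rust
use std::cell::Cell;

#[allow(dead_code)]
enum Async<T> {
    Ready(T),
    NotReady,
}

type Poll<T, E> = Result<Async<T>, E>;

trait SimpleFuture {
    type Item;
    type Error;
    fn poll(&mut self) -> Poll<Self::Item, Self::Error>;
}

/// Hypothetical request: the "network call" is just a counter increment.
struct CountingRequest<'a> {
    calls_made: &'a Cell<u32>,
}

impl<'a> SimpleFuture for CountingRequest<'a> {
    type Item = u32;
    type Error = ();

    fn poll(&mut self) -> Poll<u32, ()> {
        // The work happens here, inside poll, not when the value is created.
        self.calls_made.set(self.calls_made.get() + 1);
        Ok(Async::Ready(self.calls_made.get()))
    }
}

/// Returns the counter before and after the future is polled.
fn calls_before_and_after_poll() -> (u32, u32) {
    let calls_made = Cell::new(0);
    let mut request = CountingRequest { calls_made: &calls_made };
    // Nothing has run yet, even though the future already exists.
    let before = calls_made.get();
    // Polling is what a runtime does for you once you hand the future over.
    let _ = request.poll();
    (before, calls_made.get())
}
```

The counter stays at zero until `poll` is called, which is the opposite of a Promise that starts its work at creation time.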

What's so different or hard about Futures? ⛑

Let's walk through our example above:

  • We create a new client with Client::new() and .send() our request
  • We will get a Response back:
pub struct Response {
    status: StatusCode,
    headers: HeaderMap,
    url: Box<Url>,
    body: Decoder,
    ...
}
  • The body itself is a Decoder, which can be turned into a Body via .into_body().
  • Body itself implements a Stream (as mentioned earlier).
  • Now we can look into the futures API and find out: we can turn a Stream of bytes into a single item via .concat2()
...
        .and_then(|res| {
            res.into_body().concat2()
        })
...
  • We use this single item in the .map() part as body
  • With the help of your code editor, you will find out that this body is actually a Chunk, returned from the library Hyper
  • We can turn this Chunk now into a Vector
...
        .map(|body| {
            let v = body.to_vec();
            // do whatever with v
        })
...

From then on, we are back in "normal" Rust land and can forget what just happened 🙃.

You can find the full example in this GitHub repository. There I receive a JSON and write it to a file.

This is one of the reasons why dealing with Futures is so hard in the beginning. You have to think at a much lower level than, for example, in NodeJS. In addition, the async/await syntax is not final yet, which leads to much more boilerplate code.

These mental steps help you to not get lost when dealing with Futures:

  1. What's the return type or value I am getting from this library?
  2. How can I access the Stream of values on this response?
  3. What will the library turn this Stream into when I collect all the values via .concat2()?
  4. How can I turn this new type into a Vector or another Rust std format so I can pass it on to synchronous methods?

[Figure: how Futures work]

Basically, you always want to figure out how to access the stream of values, collect them, and then process the resulting object.

How to execute more than one Future? 🎩

Generally you want to collect your values as Streams, so that for each item you get over a Stream, you can spawn off a new Future to handle it.

The futures crate has a type called FuturesUnordered, to which you can add more than one Future:

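use futures::stream::futures_unordered::FuturesUnordered;
use hyper::{client::ResponseFuture, Client, Uri};

// Placeholder address; replace it with the service you want to call.
const URL: &str = "http://example.com";

fn setup_requests() -> FuturesUnordered<ResponseFuture> {
    let mut list_of_futures = FuturesUnordered::new();

    let client = Client::new();
    // hyper's `Client::get` takes a `Uri`, so we parse the string first
    let uri: Uri = URL.parse().expect("URL should be a valid URI");

    let first = client.get(uri.clone());
    list_of_futures.push(first);

    let second = client.get(uri);
    list_of_futures.push(second);

    list_of_futures
}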

In this example we used hyper for our HTTP calls. You can find the rest of the code over here on GitHub.
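The "unordered" part of the name means that results come out in the order the futures finish, not in the order they were added. The following dependency-free sketch imitates that behaviour. `FakeRequest` and `drain_unordered` are hypothetical names, and the round-based polling is a simplification of what the real type does with wake-up notifications.

```rust
enum Async<T> {
    Ready(T),
    NotReady,
}

type Poll<T, E> = Result<Async<T>, E>;

trait SimpleFuture {
    type Item;
    type Error;
    fn poll(&mut self) -> Poll<Self::Item, Self::Error>;
}

/// Hypothetical request that answers `NotReady` `polls_needed` times first.
struct FakeRequest {
    name: &'static str,
    polls_needed: u32,
}

impl SimpleFuture for FakeRequest {
    type Item = &'static str;
    type Error = ();

    fn poll(&mut self) -> Poll<&'static str, ()> {
        if self.polls_needed == 0 {
            Ok(Async::Ready(self.name))
        } else {
            self.polls_needed -= 1;
            Ok(Async::NotReady)
        }
    }
}

/// Polls every pending future in rounds and returns the results
/// in the order in which the futures completed.
fn drain_unordered<F: SimpleFuture>(mut pending: Vec<F>) -> Result<Vec<F::Item>, F::Error> {
    let mut finished = Vec::new();
    while !pending.is_empty() {
        let mut still_pending = Vec::new();
        for mut future in pending {
            match future.poll()? {
                Async::Ready(value) => finished.push(value),
                Async::NotReady => still_pending.push(future),
            }
        }
        pending = still_pending;
    }
    Ok(finished)
}
```

If a slow request is added before a fast one, the fast result still shows up first in the returned list.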

The syntax will look slightly different if you are using reqwest. Here you .join() multiple requests and return them as "one Future".

use futures::Future;
use reqwest::r#async::{Client, Response};

// `URL` and `STRUCT_TYPE` are placeholders: the address to call and a type
// that implements `serde::Deserialize` and `Debug` for the JSON response.
fn fetch() -> impl Future<Item=(), Error=()> {
    let client = Client::new();

    let json = |mut res: Response| {
        res.json::<STRUCT_TYPE>()
    };

    let request1 =
        client
            .get(URL)
            .send()
            .and_then(json);

    let request2 =
        client
            .get(URL)
            .send()
            .and_then(json);

    request1.join(request2)
        .map(|(res1, res2)| {
            println!("{:?}", res1);
            println!("{:?}", res2);
        })
        .map_err(|err| {
            println!("request error: {}", err);
        })
}

The full code can also be found on GitHub.
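Conceptually, joining two futures gives you a new future that polls both sides and is only ready once both have produced a value. The sketch below is a simplified stand-in for that idea, not the futures 0.1 implementation. `Join`, `join` and `FakeRequest` are hypothetical local names, and the trait is a local copy of the 0.1 shape.

```rust
#[derive(Debug, PartialEq)]
enum Async<T> {
    Ready(T),
    NotReady,
}

type Poll<T, E> = Result<Async<T>, E>;

trait SimpleFuture {
    type Item;
    type Error;
    fn poll(&mut self) -> Poll<Self::Item, Self::Error>;
}

/// Hypothetical request that answers `NotReady` `polls_needed` times first.
struct FakeRequest {
    name: &'static str,
    polls_needed: u32,
}

impl SimpleFuture for FakeRequest {
    type Item = &'static str;
    type Error = ();

    fn poll(&mut self) -> Poll<&'static str, ()> {
        if self.polls_needed == 0 {
            Ok(Async::Ready(self.name))
        } else {
            self.polls_needed -= 1;
            Ok(Async::NotReady)
        }
    }
}

/// Resolves to a tuple once both inner futures have resolved.
struct Join<A: SimpleFuture, B: SimpleFuture> {
    first: A,
    second: B,
    first_value: Option<A::Item>,
    second_value: Option<B::Item>,
}

impl<A, B> SimpleFuture for Join<A, B>
where
    A: SimpleFuture,
    B: SimpleFuture<Error = A::Error>,
{
    type Item = (A::Item, B::Item);
    type Error = A::Error;

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        // Poll only the sides that have not produced a value yet.
        if self.first_value.is_none() {
            if let Async::Ready(value) = self.first.poll()? {
                self.first_value = Some(value);
            }
        }
        if self.second_value.is_none() {
            if let Async::Ready(value) = self.second.poll()? {
                self.second_value = Some(value);
            }
        }
        match (self.first_value.take(), self.second_value.take()) {
            (Some(a), Some(b)) => Ok(Async::Ready((a, b))),
            (a, b) => {
                // Put back whatever is already finished and keep waiting.
                self.first_value = a;
                self.second_value = b;
                Ok(Async::NotReady)
            }
        }
    }
}

fn join<A, B>(first: A, second: B) -> Join<A, B>
where
    A: SimpleFuture,
    B: SimpleFuture<Error = A::Error>,
{
    Join { first, second, first_value: None, second_value: None }
}
```

The joined future keeps the tuple in the order the futures were passed in, no matter which side finishes first, and an error on either side ends the whole join.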

What is the future of Futures? 🎩 + ⛑

Futures are getting into stable Rust with version 1.37, so around June. There are also changes coming to the syntax and the runtime, which will reduce the amount of code you have to write to get this stream of values out of a Future and into a synchronous Rust format.

You can also use the Runtime crate, which saves you almost all of the boilerplate code. Still, going through the process above helps you understand Futures on a deeper level.

Summary

If you perform asynchronous operations, like fetching files from the operating system or making HTTP requests to a remote server, then Futures let you work with the return values in a non-blocking way.

If the operation were synchronous, you would have to block the thread it is running on and wait for the result before you could continue. To do this in an asynchronous way, we have a runtime which creates threads itself and takes on Futures. It will fill in the value inside the Future when the operating system returns a value to the runtime.

Once the Future is fulfilled, the runtime sets Async::Ready and the .and_then() part of the code will get executed.

Top comments (2)

jeikabu

Pretty good write up.

For me the biggest gotchas were:

  1. Pull (rather than push) model; without tokio::run() or something else "pumping" poll() nothing happens
  2. Understanding what's going on with futures 0.1/0.2/0.3 crates
  3. Dealing with Send and Sync

Victor Quiroz

Thanks for sharing this!

I come from a Node.js background and grasping the idea of async programming in Rust has been a roller coaster experience and I'm still trying to get my head around it :)