This blog post is about a project called Romio that I’ve been working on over the past two or three weeks. Romio is a port of a small part of the Tokio project to the newer futures APIs.

I started the project to get some experience porting code from the old futures API to the new API. However, we realized that this code could also be useful to other people who want to experiment with networking code using the new async/await syntax, so with the help of others we polished it up during the RustFest Rome “impl days” and now its being released for people to experiment with.

I wanted to blog about my experience with this because its definitely validated for me that async/await is going to have a big impact, and I hope other people will be able to use this work to help us accelerate the feedback cycle on this feature.

Porting from futures 0.1 to futures 0.3

The portion of code that I ported was initially about 5000 lines of Rust. This code, originally the tokio-reactor, tokio-tcp, tokio-udp, and tokio-uds libraries, defines a number of types which implement Future, Stream, AsyncRead or AsyncWrite, which are used to implement core networking primitives on top of a reactor based on mio. In order to port to the new APIs, the main thing I had to do was adapt these types to conform to the new interfaces of the async traits that are exposed by futures 0.3.

Handling the explicit waker argument

The most pervasive difference that impacted tokio between futures 0.1 and 0.3 was the handling of wakers. In futures 0.3, the Future interface includes an argument which is a “waker” for the future to trigger that it wants to wake the current task. In 0.1, no such argument existed - instead, futures provided task::current function which would would provide a type you could use to wake the current task.

The futures 0.1 API led to a lot of confusion about tasks, waking them, and the poll-based API of the Future type. It also meant that tons of APIs in the tokio crate have a caveat that they will panic if you don’t call them in a “task context.” All of this goes away in porting to 0.3: a bunch of methods (which tend to begin with poll) now take the LocalWaker argument. These methods are intended to be called from within Future or Stream implementations, which provides the LocalWaker for you to use.

Overall, I think this helps clarify the APIs a lot, but did require some “compiler driven development” of figuring out where I needed to thread the waker through. Now that that’s done, it’s much easier to tell which APIs are intended for use from within an “async/await” context, and which are intended to be used in a manual implement of Future, because the ones for implementing manual futures are all called poll and take a LocalWaker as an argument.

Eliminating code duplication

Tokio uses an internal type called AtomicTask to atomically manage the task waker primitive that it gets from futures. This type is actually a fork of a type in the futures library which has evolved in 0.3 into the AtomicWaker type. The only API difference between the two was that tokio’s had the ability to access the inner task without immediately waking it - which tokio uses to avoid waking in a critical code section.

I was able to eliminate this duplication by making a pull request to provide an analogous method in the AtomicWaker API of futures. This made the future API more complete and useful, and also gave romio access to some optimizations that had been made to the futures version since tokio split out the original code.

Changes from tokio-io to futures::io

This was actually the biggest - and probably the most difficult - change in the conversion, even though it hasn’t been discussed much. Current tokio uses AsyncRead and AsyncWrite defined in the tokio-io crate. As the maintainership of futures and tokio became less entangled and we prepared for making a breaking change to futures, it was agreed that the AsyncRead and AsyncWrite interfaces were a broader concern than tokio, and belonged in the futures project. However, the interfaces have changed quite a lot between the two versions, and I was a bit confused about how to make the conversion for a while.

One of the most confusing aspect of the transition for me was figuring out what to do about the way that tokio handles vectored IO operations. tokio’s implementations of AsyncRead and AsyncWrite use unsafe code to set up some IoVecs on the stack in order to perform a vectored read. The new AsyncRead & AsyncWrite APIs now have a specialized poll_vectored_read extension, which expect the caller to have their own IoVecs set up. This seems much more idiomatic to me, and I think it made the implementations of AsyncRead and AsyncWrite more straightforward on futures 0.3 than they were on futures 0.1.

Pinning

The difference between futures 0.1 and futures 0.3 that has gotten the most attention was the addition of the Pin type to both the Future and Stream method signatures. I was really interested in seeing what the impact of pinning would be on something like a reactor. It turned out that the impact was minimal (as Carl Lerche predicted).

Nearly every future and stream in the code I ported was a concrete type, which meant that all of them implemented Unpin and - consequently - it was possible to treat a Pin<&mut Self> the same was an &mut self. All I really had to do was replace &mut self with mut self: Pin<&mut Self> and the code otherwise compiled, exactly as it had before.

The only exceptions were some futures in UDP that were generic over buffer types. Because it was never necessary to have pinned access to the buffers themselves, all I had to do was add a (safe) blanket impl of Unpin for these futures - with that line of code, I was once again able to treat a pinned reference the same way an unpinned reference would behave. (However, these generic futures have since been removed from romio in connection to other changes to the UdpSocket API).

This isn’t surprising: the code I ported doesn’t define any combinators, which wrap one or more generic future types and manipulate them. Combinators are the case in which you tend to need to write careful code to make sure you don’t invalidate the pin of the interior future. Our long term strategy is to enable users to replace any manually written combinators with async functions except for the core concurrency primitives like select and join, but we’re not quite there yet.

The hesitancy of the Buoyant team (who maintain, among other things, tokio and hyper) to migrate to the new futures API has a lot more to do with their other projects (like tower), which involve a lot of manually implemented combinators that can’t be migrated to async/await syntax yet. It makes sense that they don’t want to move one of their projects to the new API before they move the rest of them. But I’m glad to have drawn out that in some cases, such as the networking primitives that tokio provides, pinning has no negative outcomes.

The benefit of non-'static futures

On the other hand, the other big thing I’ve found is a lot of opportunities that async/await opens up in terms of API design. In particular, one problem that the futures 0.1 ecosystem has suffered from is the fact that all futures need to be 'static, so they can’t contain any references. This is, of course, just a special case of the general problem of “self-referential futures” that pinning was designed to solve, but its a variation on the theme that’s worth highlighting.

For example, in tokio, the UdpSocket::recv_dgram method is required to take self by value, so that it can return a static future. When that future evaluates, it evaluates to a tuple containing the original UdpSocket, so you can poll it again. Thanks to async/await, this sort of threading the handle through different future return values can be completely eliminated. The equivalent code in romio just takes a reference to the UdpSocket. You are expected to await this future, avoiding any problem with having a future that isn’t 'static.

This is just one aspect of the incredibly powerful impact async/await will have on asynchronous code in Rust: you will be able to recover idiomatic coding patterns that on futures 0.1 are just not possible. As Aaron expressed well earlier this year, “async/await is not just about avoiding combinators; it completely changes the game for borrowing.”

This is already paying dividends even in these fundamental APIs which don’t use the async/await feature internally at all. I’m really excited to see more code built with the async/await feature in mind (rather than just accessing it through a compatibility layer) so we can discover just how much of the ergonomics of blocking code we can recover with async/await syntax.

The Romio project going forward

Once I had gotten a fair bit of experience porting things, it occurred to me that the code I had in front of me could be made useful to other people. Right now, the only “production ready” asynchronous IO primitives are those defined in the tokio project, and tokio still uses futures 0.1. This means the only way to write nonblocking networking code using the async/await syntax is to use a compatibility layer adapting your code to futures 0.1, so that you can interact with tokio’s APIs. This has been a blocker for getting more people experimenting with the new async/await syntax.

For that reason, I retooled the project to be something that could be released to crates.io and used by other people. With the help of Aaron Turon, Yoshua Wuyts, and Katharina Fey, we managed during the RustFest Rome impl days to polish the code up and get a nice library ready for release. It’s now available on crates.io for you to use.

Romio was forked from tokio, but it is not a complete port and it isn’t intended to “compete” with tokio. Rather, romio is intended to unblock people who want to try using async/await. For this reason, there are a few things to note about romio:

  1. Romio only contains code relating to asynchronous networking APIs - TCP, UDP, and Unix Domain Sockets. It doesn’t include any of the other APIs from tokio, such the timers, the threadpool implementation, the codec traits, or the filesystem API. In other words, Romio is futures + mio and nothing else.
  2. Romio only exposes the most minimal APIs. It uses a port of the default reactor set up from tokio (a single background reactor thread for all IO), and doesn’t allow users to configure it. Full configurability of how the reactor runs is necessary to cover every use case, but it isn’t necessary to let people experiment with async IO. This means Romio has a small and simple API surface.
  3. Romio is currently versioned 0.3.0-alpha.1. It’ll continue to see regular new incompatible releases to coincide with the ongoing futures releases, as well as to fix up any problems in its own APIs. We intend to have the same rapid, “nightly-style” release cycle with romio as futures 0.3 currently has.

A crate by any other name

Romio was initially called Romio because we worked on it in Rome, but the name corresponds with a certain tragic hero of Shakespeare. As a result, the documentation and codebase are repleat with references to the works of Shakespeare, which has become the theme of the project.

And last, examples!

Here’s a small echo server, written using romio, futures, and the async/await syntax. All this does is listen on a TCP port and echo any messages it receives back to the client:

#![feature(async_await, await_macro, futures_api)]

use std::io;

use futures::StreamExt;
use futures::executor::{self, ThreadPool};
use futures::io::AsyncReadExt;
use futures::task::SpawnExt;

use romio::{TcpListener, TcpStream};

fn main() -> io::Result<()> {
    executor::block_on(async {
        let mut threadpool = ThreadPool::new()?;

        let addr = "127.0.0.1:7878".parse().unwrap();
        let listener = TcpListener::bind(&addr)?;
        let mut incoming = listener.incoming();

        println!("Listening on 127.0.0.1:7878");

        while let Some(stream) = await!(incoming.next()) {
            let stream = stream?;
            let addr = stream.peer_addr()?;

            threadpool.spawn(async move {
                println!("Accepting stream from: {}", addr);

                await!(echo_on(stream)).unwrap();

                println!("Closing stream from: {}", addr);
            }).unwrap();
        }

        Ok(())
    })
}

async fn echo_on(stream: TcpStream) -> io::Result<()> {
    let (mut reader, mut writer) = stream.split();
    await!(reader.copy_into(&mut writer))?;
    Ok(())
}

Conclusion

My experience writing this port was very positive: it provided a lot of solid confirmation for my belief that async/await is going to be revolutionary for network services in Rust. I hope that by making the code public I will enable other people to move these experimentations forward and provide feedback that will enable us to stabilize async/await as soon as possible.

If you want to experiment with some protocol-level asynchronous networking code using async/await syntax, please try Romio out and report any issues with Romio, the new futures APIs, or the async/await feature! If you’d like to get involved in maintaining Romio, feel free to ping me in the wg-net-async Discord channel or file an issue on GitHub.