Executing Graphs of Asynchronous Tasks in Rust

2024-08-24

Modelling a workflow consisting of many tasks with dependencies on each other is a fairly common problem, being found in build systems, CI/CD pipelines, or regular old project management. I recently found myself having to write a deployment automation system in Rust consisting of many asynchronous tasks, and after finding only a single crate attempting to tackle this problem, I decided I probably wouldn't be reinventing the wheel by rolling my own solution.

The problem

Imagine the scenario pictured below as a Directed Acyclic Graph (DAG), with edges from dependents to their dependencies (this will be relevant later). In this example, task C can only be executed once tasks A and B have been completed, and task D relies on C having been completed.

polytree.svg

Let's model the tasks as the following code, where each task is a struct with an async funtion task.

struct A;
struct B;
struct C;
struct D;

impl A {
    async fn task() -> Self {
        Self
    }
}

impl B {
    async fn task() -> Self {
        Self
    }
}

impl C {
    async fn task(_a: &A, _b: &B) -> Self {
        Self
    }
}

impl D {
    async fn task(_c: &C) -> Self {
        Self
    }
}

The baseline solution

The simplest solution is to just ignore all the benefits of async, and execute the tasks sequentially, only starting the next task once the previous one has finished. As long as we run our tasks following a topological ordering of the graph, this solution will eventually give us our desired result.

async fn run() -> D {
    let a = A::task().await;
    let b = B::task().await;
    let c = C::task(&a, &b).await;
    let d = D::task(&c).await;
    d
}

Awaiting futures in the future

The entire selling point of async is being able to do other stuff (such as running other tasks) while a task is waiting for I/O. We can benefit from this by creating futures for each task, but only executing all of them once we run the final d.await (so long as D is the graph's root). By using tokio::join to await multiple dependencies, in this case A and B, B doesn't have to wait for A to finish. B can start progressing as soon as A relinquishes its use of the processor when awaiting I/O.

async fn run() -> D {
    let a = A::task();
    let b = B::task();
    let c = async {
        let (a, b) = tokio::join!(a, b);
        C::task(&a, &b).await
    };
    let d = async {
        let (c,) = tokio::join!(c);
        D::task(&c).await
    };
    d.await
}

Multiprogramming is so 1960

The previous solution still has a major problem, namely that all of the tasks are running on a single thread. While this might be fine if each task spends most of its time waiting, we will be wasting time whenever B is ready to run, but can't because A is executing and has exclusive control over the processor. This is where tokio::spawn comes into play, by immediately starting the future's execution in the background. A and B may run in parallel on different threads. 1

async fn run() -> D {
    let a = tokio::spawn(A::task());
    let b = tokio::spawn(B::task());
    let c = tokio::spawn(async {
        let (a, b) = tokio::try_join!(a, b).unwrap();
        C::task(&a, &b).await
    });
    let d = tokio::spawn(async {
        let (c,) = tokio::try_join!(c).unwrap();
        D::task(&c).await
    });
    d.await.unwrap()
}

Wait, it's all Polytrees

This is where we could pat ourselves on the back in contentment, if not for the fact that we've been operating on Polytrees instead of arbitrary DAGs. In the below DAG, both C and D depend on A, but the JoinHandle returned by tokio::spawn doesn't implement Clone, meaning that only one of C or D can await the completion of A.

dag.svg

#[derive(Clone)]
struct A;
#[derive(Clone)]
struct B;
#[derive(Clone)]
struct C;
#[derive(Clone)]
struct D;

impl D {
    async fn task(_a: &A, _c: &C) -> Self {
        Self
    }
}

Fortunately FutureExt::shared can make any future with a cloneable output cloneable (note the #[derive(Clone)] for each task). Under the hood, FutureExt::shared allows all threads to poll the future, but only allows a single thread to actually advance it towards completion at any given time. However, this is of no concern to us, as each task is being run in the background, regardless of whether or not we poll it, by virtue of having been spawned. 2

use futures::FutureExt as _;

async fn run() -> D {
    let a = tokio::spawn(A::task()).map(|r| r.unwrap()).shared();
    let b = tokio::spawn(B::task()).map(|r| r.unwrap()).shared();
    let c = {
        let a = a.clone();
        tokio::spawn(async {
            let (a, b) = tokio::join!(a, b);
            C::task(&a, &b).await
        })
        .map(|r| r.unwrap())
        .shared()
    };
    let d = tokio::spawn(async {
        let (a, c) = tokio::join!(a, c);
        D::task(&a, &c).await
    })
    .map(|r| r.unwrap())
    .shared();
    d.await
}

A friendly facade

Having arrived at the solution, we can hide all of the complexity. Because joining a spawned task is fallible, it's simplest if we require all tasks to be fallible (returning a Result), with the task's error implementing From<JoinError>. As such, we define a Task<R, E> trait for cloneable fallible futures, and implement a wrapper over tokio's spawning functions.

It's uncommon for errors to be cloneable, hence you might consider creating an error type used exclusively in tasks that wraps your application's Error type in an Arc.

use futures::{Future, FutureExt as _};
use tokio::task::{self, JoinError};

pub trait Task<R, E>: Future<Output = Result<R, E>> + Clone + Send + 'static {}
impl<T, R, E> Task<R, E> for T where T: Future<Output = Result<R, E>> + Clone + Send + 'static {}

pub fn spawn<F, R, E>(future: F) -> impl Task<R, E>
where
    F: Future<Output = Result<R, E>> + Send + 'static,
    R: Clone + Send + Sync + 'static,
    E: Clone + Send + Sync + From<JoinError> + 'static,
{
    tokio::spawn(future).map(|r| r?).shared()
}

pub fn spawn_blocking<F, R, E>(f: F) -> impl Task<R, E>
where
    F: FnOnce() -> Result<R, E> + Send + 'static,
    R: Clone + Send + Sync + 'static,
    E: Clone + Send + Sync + From<JoinError> + 'static,
{
    task::spawn_blocking(f).map(|r| r?).shared()
}

A final example

We can take a look at a final example with two problems that one could commonly encounter: A is not Clone, and B's task is fully synchronous, as shown below.

final.svg

#[derive(Clone)]
struct Error;

impl From<JoinError> for Error {
    fn from(value: JoinError) -> Self {
        Self
    }
}

/// `A` is not Clone
struct A;
#[derive(Clone)]
struct B;
#[derive(Clone)]
struct C;
#[derive(Clone)]
struct D;

impl A {
    async fn task() -> Result<Self, Error> {
        Ok(Self)
    }
}

impl B {
    /// `B::task` is sync
    fn task() -> Result<Self, Error> {
        Ok(Self)
    }
}

impl C {
    async fn task(_a: &A, _b: &B) -> Result<Self, Error> {
        Ok(Self)
    }
}

impl D {
    async fn task(_a: &A, _b: &B, _c: &C) -> Result<Self, Error> {
        Ok(Self)
    }
}

Fortunately, solving both of these problems is trivial. A can be wrapped in an Arc, allowing a single instance to live on the heap, with references to it that can be cloned. To run a synchronous B, task::spawn_blocking can be used in our spawn_blocking implementation.

use std::sync::Arc;

use futures::TryFutureExt as _;

async fn run() -> Result<D, Error> {
    let a = spawn(A::task().map_ok(Arc::new));
    let b = spawn_blocking(B::task);
    let c = {
        let a = a.clone();
        let b = b.clone();
        spawn(async {
            let (a, b) = tokio::try_join!(a, b)?;
            C::task(&a, &b).await
        })
    };
    let d = spawn(async {
        let (a, b, c) = tokio::try_join!(a, b, c)?;
        D::task(&a, &b, &c).await
    });
    d.await
}

Footnotes:

1

tokio::try_join is required because joining a spawned task is fallible. We unwrap the errors for simplicity's sake.

2

Since JoinError isn't cloneable, we have to deal with the error before the future can become shared.

© Wojciech Graj 2024