Modelling a workflow consisting of many tasks that depend on each other is a fairly common problem, found in build systems, CI/CD pipelines, and regular old project management. I recently found myself having to write a deployment automation system in Rust consisting of many asynchronous tasks, and after finding only a single crate attempting to tackle this problem, I decided I probably wouldn't be reinventing the wheel by rolling my own solution.
The problem
Imagine the scenario pictured below as a Directed Acyclic Graph (DAG), with edges from dependents to their dependencies (this will be relevant later). In this example, task C can only be executed once tasks A and B have been completed, and task D relies on C having been completed.
Let's model the tasks with the following code, where each task is a struct with an async function named task.
```rust
struct A;
struct B;
struct C;
struct D;

impl A {
    async fn task() -> Self { Self }
}

impl B {
    async fn task() -> Self { Self }
}

impl C {
    async fn task(_a: &A, _b: &B) -> Self { Self }
}

impl D {
    async fn task(_c: &C) -> Self { Self }
}
```
The baseline solution
The simplest solution is to ignore all the benefits of async and execute the tasks sequentially, only starting the next task once the previous one has finished. As long as every task runs after all of its dependencies (which, since our edges point from dependents to their dependencies, means following a reverse topological ordering of the graph), this solution will eventually give us the desired result.
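```rust
async fn run() -> D {
    // Each task runs to completion before the next one starts,
    // and every dependency runs before its dependents.
    let a = A::task().await;
    let b = B::task().await;
    let c = C::task(&a, &b).await;
    D::task(&c).await
}
```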
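The dependency-first order doesn't have to be worked out by hand. Below is a minimal sketch, assuming the graph is stored as a map from each task's name to the names of its dependencies (execution_order is a hypothetical helper, not part of any crate): a depth-first search from the root that emits each task only after all of its dependencies. Because our edges point from dependents to dependencies, this post-order is the reverse of a topological ordering.

```rust
use std::collections::{HashMap, HashSet};

/// Hypothetical helper: returns every task reachable from `root`, ordered so
/// that each task appears after all of its dependencies. Assumes the graph is acyclic.
fn execution_order<'a>(deps: &HashMap<&'a str, Vec<&'a str>>, root: &'a str) -> Vec<&'a str> {
    fn visit<'a>(
        node: &'a str,
        deps: &HashMap<&'a str, Vec<&'a str>>,
        seen: &mut HashSet<&'a str>,
        order: &mut Vec<&'a str>,
    ) {
        // Skip tasks already scheduled through another dependent.
        if !seen.insert(node) {
            return;
        }
        // Schedule every dependency first...
        if let Some(children) = deps.get(node) {
            for &dep in children {
                visit(dep, deps, seen, order);
            }
        }
        // ...then the task itself (post-order).
        order.push(node);
    }

    let mut seen = HashSet::new();
    let mut order = Vec::new();
    visit(root, deps, &mut seen, &mut order);
    order
}

fn main() {
    // Edges point from dependents to their dependencies, as in our example.
    let deps = HashMap::from([("C", vec!["A", "B"]), ("D", vec!["C"])]);
    println!("{:?}", execution_order(&deps, "D"));
}
```

Any order this produces is a valid sequence for the baseline run() above; the graphs later in this post, where a task has several dependents, are handled the same way thanks to the seen set.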
Awaiting futures in the future
The entire selling point of async is being able to do other work (such as running other tasks) while a task is waiting on I/O. We can benefit from this by creating a future for each task, but only driving them once we run the final d.await (so long as D is the graph's root, awaiting it drives every other task). By using tokio::join to await multiple dependencies, in this case A and B, B no longer has to wait for A to finish: B can start making progress as soon as A yields control of the thread while awaiting I/O.
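```rust
async fn run() -> D {
    // Create the futures without awaiting them; nothing runs yet.
    let a = A::task();
    let b = B::task();
    let c = async {
        // A and B are polled concurrently within the same task.
        let (a, b) = tokio::join!(a, b);
        C::task(&a, &b).await
    };
    let d = async {
        let (c,) = tokio::join!(c);
        D::task(&c).await
    };
    // Awaiting the root drives the whole graph.
    d.await
}
```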
Multiprogramming is so 1960
The previous solution still has a major problem: all of the tasks are driven by a single future, so they can only make progress on one thread, one at a time. While this might be fine if each task spends most of its time waiting, we waste time whenever B is ready to run but can't, because A is executing and has exclusive control of the thread. This is where tokio::spawn comes into play: it hands each future to the runtime as a separate task, which starts executing in the background right away rather than when it is awaited. On tokio's multi-threaded runtime, A and B can then run in parallel on different threads.[1]
```rust
async fn run() -> D {
    // Spawned tasks start running in the background immediately.
    let a = tokio::spawn(A::task());
    let b = tokio::spawn(B::task());
    let c = tokio::spawn(async move {
        // Awaiting a JoinHandle yields Result<_, JoinError>.
        let (a, b) = tokio::try_join!(a, b).unwrap();
        C::task(&a, &b).await
    });
    let d = tokio::spawn(async move {
        let (c,) = tokio::try_join!(c).unwrap();
        D::task(&c).await
    });
    d.await.unwrap()
}
```
Wait, it's all Polytrees
This is where we could pat ourselves on the back in contentment, if not for the fact that we've only been handling graphs in which every task has a single dependent (like the polytree above), rather than arbitrary DAGs. In the DAG below, both C and D depend on A, but the JoinHandle returned by tokio::spawn doesn't implement Clone, meaning that only one of C or D can await the completion of A.
```rust
#[derive(Clone)]
struct A;
#[derive(Clone)]
struct B;
#[derive(Clone)]
struct C;
#[derive(Clone)]
struct D;

// A, B and C keep their task functions from before; D now also depends on A.
impl D {
    async fn task(_a: &A, _c: &C) -> Self { Self }
}
```
Fortunately, FutureExt::shared can make any future with a cloneable output cloneable itself (note the #[derive(Clone)] on each task). Under the hood, every clone of a Shared future may poll it, from any thread, but only one poll at a time actually advances the inner future towards completion; the other clones are woken once it completes, and each receives a clone of the output. This is of no concern to us, as each task is being run in the background by virtue of having been spawned, regardless of whether or not we poll its handle.[2]
```rust
use futures::FutureExt as _;

async fn run() -> D {
    // Unwrap the JoinError and make each handle Shared,
    // so it can be cloned and awaited by several dependents.
    let a = tokio::spawn(A::task()).map(|r| r.unwrap()).shared();
    let b = tokio::spawn(B::task()).map(|r| r.unwrap()).shared();
    let c = {
        // C gets its own handle to A, since D awaits A too.
        let a = a.clone();
        tokio::spawn(async move {
            let (a, b) = tokio::join!(a, b);
            C::task(&a, &b).await
        })
        .map(|r| r.unwrap())
        .shared()
    };
    let d = tokio::spawn(async move {
        let (a, c) = tokio::join!(a, c);
        D::task(&a, &c).await
    })
    .map(|r| r.unwrap())
    .shared();
    d.await
}
```
A friendly facade
Having arrived at a solution, we can hide most of the complexity behind a small API. Because joining a spawned task is fallible, it's simplest to require all tasks to be fallible (returning a Result), with the task's error type implementing From<JoinError>. To that end, we define a Task<R, E> trait for cloneable, fallible futures, and write wrappers over tokio's spawning functions that return one.

Since Shared requires a cloneable output, both R and E must be Clone. Errors are rarely cloneable, so you might consider creating an error type used exclusively by tasks that wraps your application's Error type in an Arc.
```rust
use futures::{Future, FutureExt as _};
use tokio::task::{self, JoinError};

/// A cloneable, fallible future that any number of dependents can await.
pub trait Task<R, E>: Future<Output = Result<R, E>> + Clone + Send + 'static {}

// Blanket impl: every future meeting the bounds is automatically a Task.
impl<T, R, E> Task<R, E> for T where T: Future<Output = Result<R, E>> + Clone + Send + 'static {}

/// Spawns an async task and returns a cloneable handle to its result.
pub fn spawn<F, R, E>(future: F) -> impl Task<R, E>
where
    F: Future<Output = Result<R, E>> + Send + 'static,
    R: Clone + Send + Sync + 'static,
    E: Clone + Send + Sync + From<JoinError> + 'static,
{
    // `r?` flattens Result<Result<R, E>, JoinError> into Result<R, E>.
    tokio::spawn(future).map(|r| r?).shared()
}

/// Runs a synchronous task on tokio's blocking thread pool.
pub fn spawn_blocking<F, R, E>(f: F) -> impl Task<R, E>
where
    F: FnOnce() -> Result<R, E> + Send + 'static,
    R: Clone + Send + Sync + 'static,
    E: Clone + Send + Sync + From<JoinError> + 'static,
{
    task::spawn_blocking(f).map(|r| r?).shared()
}
```
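The |r| r? closure does more than its size suggests: awaiting the JoinHandle yields a Result<Result<R, E>, JoinError>, and ? unwraps the outer layer while converting a JoinError into E through From. Here is the same flattening isolated in a std-only sketch, where JoinFailed is a hypothetical stand-in for tokio's JoinError and Error a hypothetical task error.

```rust
/// Hypothetical stand-in for tokio::task::JoinError.
#[derive(Debug, PartialEq)]
struct JoinFailed;

/// Hypothetical task error that can absorb a failed join.
#[derive(Debug, PartialEq)]
enum Error {
    Join,
    Task(&'static str),
}

impl From<JoinFailed> for Error {
    fn from(_: JoinFailed) -> Self {
        Error::Join
    }
}

/// Same shape as the closure passed to `map` in `spawn`: on Ok, `?` yields the
/// inner Result<R, Error> unchanged; on Err, it returns Err(Error::from(JoinFailed)).
fn flatten<R>(r: Result<Result<R, Error>, JoinFailed>) -> Result<R, Error> {
    r?
}

fn main() {
    println!("{:?}", flatten::<u8>(Ok(Ok(7))));
    println!("{:?}", flatten::<u8>(Ok(Err(Error::Task("deploy failed")))));
    println!("{:?}", flatten::<u8>(Err(JoinFailed)));
}
```

This works because the closure's return type is inferred from its body to be Result<R, E>, so ? can early-return an E built from the JoinError without any explicit map_err.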
A final example
As a final example, let's look at two problems one could commonly encounter: A is not Clone, and B's task is fully synchronous, as shown below.
```rust
#[derive(Clone)]
struct Error;

impl From<JoinError> for Error {
    fn from(_: JoinError) -> Self {
        Self
    }
}

/// `A` is not Clone
struct A;
#[derive(Clone)]
struct B;
#[derive(Clone)]
struct C;
#[derive(Clone)]
struct D;

impl A {
    async fn task() -> Result<Self, Error> { Ok(Self) }
}

impl B {
    /// `B::task` is sync
    fn task() -> Result<Self, Error> { Ok(Self) }
}

impl C {
    async fn task(_a: &A, _b: &B) -> Result<Self, Error> { Ok(Self) }
}

impl D {
    async fn task(_a: &A, _b: &B, _c: &C) -> Result<Self, Error> { Ok(Self) }
}
```
Fortunately, both problems are straightforward to solve. A can be wrapped in an Arc, so that a single instance lives on the heap and only the reference-counted pointer to it needs to be cloned. The synchronous B::task can be run with our spawn_blocking wrapper, which uses tokio's task::spawn_blocking to run it on a dedicated thread pool instead of blocking the async worker threads.
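```rust
use std::sync::Arc;
use futures::TryFutureExt as _;

async fn run() -> Result<D, Error> {
    // A isn't Clone, so share a single instance behind an Arc.
    let a = spawn(A::task().map_ok(Arc::new));
    // B::task is synchronous, so run it on the blocking thread pool.
    let b = spawn_blocking(B::task);
    let c = {
        let a = a.clone();
        let b = b.clone();
        spawn(async move {
            let (a, b) = tokio::try_join!(a, b)?;
            // &Arc<A> deref-coerces to the &A that C::task expects.
            C::task(&a, &b).await
        })
    };
    let d = spawn(async move {
        let (a, b, c) = tokio::try_join!(a, b, c)?;
        D::task(&a, &b, &c).await
    });
    d.await
}
```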
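The Arc half of that fix can be checked in isolation. In this std-only sketch, std::thread stands in for spawned tasks so no dependencies are needed, and describe and share_between_dependents are hypothetical helpers: two dependents each hold a clone of the Arc, both see the same single instance of A, and a function taking &A accepts &Arc<A> through deref coercion, which is why C::task(&a, &b) compiles unchanged in the final example above.

```rust
use std::sync::Arc;
use std::thread;

/// Mirrors `A` from the example: deliberately not Clone.
struct A {
    name: String,
}

/// Takes `&A`, like `C::task` and `D::task`.
fn describe(a: &A) -> String {
    format!("saw {}", a.name)
}

/// Hands one `A` to two dependents and reports what they saw, whether they
/// shared the same allocation, and the reference count once both finished.
fn share_between_dependents() -> (String, String, bool, usize) {
    let a = Arc::new(A { name: "A".to_string() });
    let for_c = Arc::clone(&a);
    let for_d = Arc::clone(&a);
    let same_instance = Arc::ptr_eq(&for_c, &for_d);

    // `&for_c` is `&Arc<A>`; deref coercion turns it into the `&A` describe expects.
    let c = thread::spawn(move || describe(&for_c));
    let d = thread::spawn(move || describe(&for_d));
    let (seen_by_c, seen_by_d) = (c.join().unwrap(), d.join().unwrap());

    // The clones were dropped when their threads finished, leaving only `a`.
    (seen_by_c, seen_by_d, same_instance, Arc::strong_count(&a))
}

fn main() {
    let (c, d, same, count) = share_between_dependents();
    println!("C {c}, D {d}; same instance: {same}; remaining references: {count}");
}
```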
Footnotes:
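[1] tokio::try_join is used because joining a spawned task is fallible: awaiting a JoinHandle yields a Result<T, JoinError>, which is an Err if the task panicked or was cancelled. We unwrap the errors for simplicity's sake.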