Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/tasks/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@ mod spawner;
mod task;

pub use blocking::*;
pub use manager::*;
pub use manager::{TaskId, TaskManager};
pub use spawner::*;
pub use task::*;
72 changes: 65 additions & 7 deletions crates/tasks/src/manager.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use core::future::Future;
use core::pin::Pin;
use core::task::{Context, Poll};
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use std::sync::{Arc, Mutex};

use futures::future::BoxFuture;
use futures::FutureExt;
Expand All @@ -13,6 +14,22 @@ use tracing::trace;

use crate::{CpuBlockingTaskPool, TaskSpawner};

/// A unique identifier for a task spawned on a [`TaskManager`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct TaskId(u64);

impl TaskId {
pub(crate) fn new(id: u64) -> Self {
Self(id)
}
}

impl std::fmt::Display for TaskId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}

/// Usage for this task manager is mainly to spawn tasks that can be cancelled, and capture
/// panicked tasks (which in the context of the task manager are considered critical) for graceful
/// shutdown.
Expand All @@ -38,7 +55,6 @@ pub struct TaskManager {
inner: Arc<Inner>,
}

#[derive(Debug)]
pub(crate) struct Inner {
/// A handle to the Tokio runtime.
pub(crate) handle: Handle,
Expand All @@ -50,6 +66,23 @@ pub(crate) struct Inner {
pub(crate) on_cancel: CancellationToken,
/// Pool dedicated to CPU-bound blocking work.
pub(crate) blocking_pool: CpuBlockingTaskPool,
/// Counter for generating unique task IDs.
pub(crate) next_task_id: AtomicU64,
/// Stores the ID of the task that initiated a graceful shutdown, if any.
pub(crate) shutdown_initiator: Mutex<Option<TaskId>>,
}

impl core::fmt::Debug for Inner {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Inner")
.field("handle", &self.handle)
.field("tracker", &self.tracker)
.field("on_cancel", &self.on_cancel)
.field("blocking_pool", &self.blocking_pool)
.field("next_task_id", &self.next_task_id)
.field("shutdown_initiator", &self.shutdown_initiator)
.finish()
}
}

impl TaskManager {
Expand All @@ -67,6 +100,8 @@ impl TaskManager {
blocking_pool,
tracker: TaskTracker::new(),
on_cancel: CancellationToken::new(),
next_task_id: AtomicU64::new(0),
shutdown_initiator: Mutex::new(None),
}),
}
}
Expand Down Expand Up @@ -112,6 +147,13 @@ impl TaskManager {
ShutdownFuture { fut }
}

/// Returns the ID of the task that initiated a graceful shutdown, if any.
///
/// This is useful during development to identify which task caused the manager to shut down.
pub fn shutdown_initiator(&self) -> Option<TaskId> {
*self.inner.shutdown_initiator.lock().unwrap()
}

/// Wait until all spawned tasks are completed.
#[cfg(test)]
async fn wait(&self) {
Expand Down Expand Up @@ -187,15 +229,21 @@ mod tests {
// normal task completion shouldn't trigger graceful shutdown
let _ = spawner.build_task().spawn(future::ready(())).await;
assert!(!manager.inner.on_cancel.is_cancelled());
assert!(manager.shutdown_initiator().is_none(), "no shutdown initiator yet");

// but we can still spawn new tasks, and ongoing tasks shouldn't be cancelled
let result = spawner.spawn(async { true }).await;
assert!(result.is_ok());

// task with graceful shutdown should trigger graceful shutdown on success completion
let _ = spawner.build_task().graceful_shutdown().spawn(future::ready(())).await;
let shutdown_task = spawner.build_task().graceful_shutdown();
let shutdown_task_id = shutdown_task.id();
let _ = shutdown_task.spawn(future::ready(())).await;
assert!(manager.inner.on_cancel.is_cancelled());

// verify the shutdown initiator is set correctly
assert_eq!(manager.shutdown_initiator(), Some(shutdown_task_id));

// wait for the task manager to shutdown gracefully
manager.wait_for_shutdown().await;

Expand All @@ -220,16 +268,21 @@ mod tests {
let result = spawner.build_task().spawn(async { panic!("panicking") }).await;
assert!(result.unwrap_err().is_panic());
assert!(!manager.inner.on_cancel.is_cancelled());
assert!(manager.shutdown_initiator().is_none(), "no shutdown initiator yet");

// but we can still spawn new tasks, and ongoing tasks shouldn't be cancelled
let result = spawner.spawn(async { true }).await;
assert!(result.is_ok());

// task with graceful shutdown should trigger graceful shutdown on panic
let result =
spawner.build_task().graceful_shutdown().spawn(async { panic!("panicking") }).await;
let shutdown_task = spawner.build_task().graceful_shutdown();
let shutdown_task_id = shutdown_task.id();
let result = shutdown_task.spawn(async { panic!("panicking") }).await;
assert!(result.unwrap_err().is_panic());

// verify the shutdown initiator is set correctly
assert_eq!(manager.shutdown_initiator(), Some(shutdown_task_id));

// wait for the task manager to shutdown gracefully
manager.wait_for_shutdown().await;

Expand All @@ -254,16 +307,21 @@ mod tests {
let result = spawner.build_task().spawn_blocking(|| panic!("panicking")).await;
assert!(result.unwrap_err().is_panic());
assert!(!manager.inner.on_cancel.is_cancelled());
assert!(manager.shutdown_initiator().is_none(), "no shutdown initiator yet");

// but we can still spawn new tasks, and ongoing tasks shouldn't be cancelled
let result = spawner.spawn_blocking(|| true).await;
assert!(result.is_ok());

// blocking task with graceful shutdown should trigger graceful shutdown on panic
let result =
spawner.build_task().graceful_shutdown().spawn_blocking(|| panic!("panicking")).await;
let shutdown_task = spawner.build_task().graceful_shutdown();
let shutdown_task_id = shutdown_task.id();
let result = shutdown_task.spawn_blocking(|| panic!("panicking")).await;
assert!(result.unwrap_err().is_panic());

// verify the shutdown initiator is set correctly
assert_eq!(manager.shutdown_initiator(), Some(shutdown_task_id));

// wait for the task manager to shutdown gracefully
manager.wait_for_shutdown().await;

Expand Down
57 changes: 44 additions & 13 deletions crates/tasks/src/spawner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ use std::sync::Arc;

use futures::FutureExt;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error};
use tracing::{debug, error, info};

use crate::{CpuBlockingJoinHandle, Inner, JoinError, JoinHandle};
use crate::manager::Inner;
use crate::{CpuBlockingJoinHandle, JoinError, JoinHandle, TaskId};

/// A spawner for spawning tasks on the [`TaskManager`] that it was derived from.
///
Expand Down Expand Up @@ -56,6 +57,18 @@ impl TaskSpawner {
pub(crate) fn cancellation_token(&self) -> &CancellationToken {
&self.inner.on_cancel
}

pub(crate) fn next_task_id(&self) -> TaskId {
let id = self.inner.next_task_id.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
TaskId::new(id)
}

pub(crate) fn set_shutdown_initiator(&self, task_id: TaskId) {
let mut initiator = self.inner.shutdown_initiator.lock().unwrap();
if initiator.is_none() {
*initiator = Some(task_id);
}
}
}

/// A task spawner dedicated for spawning CPU-bound blocking tasks.
Expand Down Expand Up @@ -85,6 +98,8 @@ impl CPUBoundTaskSpawner {
pub struct TaskBuilder<'a> {
/// The task manager that the task will be spawned on.
spawner: &'a TaskSpawner,
/// The unique ID of the task.
id: TaskId,
/// The name of the task.
name: Option<String>,
/// Notifies the task manager to perform a graceful shutdown when the task is finished due to
Expand All @@ -95,7 +110,13 @@ pub struct TaskBuilder<'a> {
impl<'a> TaskBuilder<'a> {
/// Creates a new task builder associated with the given task manager.
pub(crate) fn new(spawner: &'a TaskSpawner) -> Self {
Self { spawner, name: None, graceful_shutdown: false }
let id = spawner.next_task_id();
Self { spawner, id, name: None, graceful_shutdown: false }
}

/// Returns the unique ID of the task.
pub fn id(&self) -> TaskId {
self.id
}

/// Sets the name of the task.
Expand Down Expand Up @@ -138,28 +159,33 @@ impl<'a> TaskBuilder<'a> {
where
F: Future + Send + 'static,
{
let task_id = self.id;
let task_name = self.name.clone();
let graceful_shutdown = self.graceful_shutdown;
let cancellation_token = self.spawner.cancellation_token().clone();
let spawner = self.spawner.clone();

// Tokio already catches panics in the spawned task, but we are unable to handle it directly
// inside the task without awaiting on it. So we catch it ourselves and resume it
// again for tokio to catch it.
AssertUnwindSafe(fut).catch_unwind().map(move |result| match result {
Ok(value) => {
if graceful_shutdown {
debug!(target: "tasks", task = ?task_name, "Task with graceful shutdown completed.");
cancellation_token.cancel();
info!(target: "tasks", id = %task_id, task = ?task_name, "Task with graceful shutdown completed.");
spawner.set_shutdown_initiator(task_id);
spawner.cancellation_token().cancel();
} else {
debug!(target: "tasks", id = %task_id, task = ?task_name, "Task completed.");
}
value
},
Err(error) => {
// get the panic reason message
let reason = error.downcast_ref::<String>();
error!(target = "tasks", task = ?task_name, ?reason, "Task panicked.");
error!(target = "tasks", id = %task_id, task = ?task_name, ?reason, "Task panicked.");

if graceful_shutdown {
cancellation_token.cancel();
spawner.set_shutdown_initiator(task_id);
spawner.cancellation_token().cancel();
}

std::panic::resume_unwind(error);
Expand All @@ -172,26 +198,31 @@ impl<'a> TaskBuilder<'a> {
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
let task_id = self.id;
let task_name = self.name.clone();
let graceful_shutdown = self.graceful_shutdown;
let cancellation_token = self.spawner.cancellation_token().clone();
let spawner = self.spawner.clone();

move || {
match panic::catch_unwind(AssertUnwindSafe(func)) {
Ok(value) => {
if graceful_shutdown {
debug!(target: "tasks", task = ?task_name, "Task with graceful shutdown completed.");
cancellation_token.cancel();
info!(target: "tasks", id = %task_id, task = ?task_name, "Task with graceful shutdown completed.");
spawner.set_shutdown_initiator(task_id);
spawner.cancellation_token().cancel();
} else {
debug!(target: "tasks", id = %task_id, task = ?task_name, "Task completed.");
}
value
}
Err(error) => {
// get the panic reason message
let reason = error.downcast_ref::<String>();
error!(target = "tasks", task = ?task_name, ?reason, "Task panicked.");
error!(target = "tasks", id = %task_id, task = ?task_name, ?reason, "Task panicked.");

if graceful_shutdown {
cancellation_token.cancel();
spawner.set_shutdown_initiator(task_id);
spawner.cancellation_token().cancel();
}

std::panic::resume_unwind(error);
Expand Down
Loading