Skip to content

Commit

Permalink
Add on_thread_park_id() as unstable method on runtime Builder
Browse files Browse the repository at this point in the history
  • Loading branch information
theron-eg committed Feb 29, 2024
1 parent 11dff31 commit dde346f
Showing 1 changed file with 89 additions and 7 deletions.
96 changes: 89 additions & 7 deletions tokio/src/runtime/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -592,7 +592,7 @@ impl Builder {
/// .worker_threads(1)
/// .on_thread_park({
/// let barrier = barrier.clone();
/// move |_| {
/// move || {
/// let barrier = barrier.clone();
/// if once.swap(false, Ordering::Relaxed) {
/// tokio::spawn(async move { barrier.wait().await; });
Expand Down Expand Up @@ -620,7 +620,7 @@ impl Builder {
/// let runtime = runtime::Builder::new_current_thread()
/// .on_thread_park({
/// let barrier = barrier.clone();
/// move |_| {
/// move || {
/// let barrier = barrier.clone();
/// if once.swap(false, Ordering::Relaxed) {
/// tokio::spawn(async move { barrier.wait().await; });
Expand All @@ -638,9 +638,9 @@ impl Builder {
#[cfg(not(loom))]
pub fn on_thread_park<F>(&mut self, f: F) -> &mut Self
where
F: Fn(usize) + Send + Sync + 'static,
F: Fn() + Send + Sync + 'static,
{
self.before_park = Some(std::sync::Arc::new(f));
self.before_park = Some(std::sync::Arc::new(move |_id| f()));
self
}

Expand All @@ -659,7 +659,7 @@ impl Builder {
/// # use tokio::runtime;
/// # pub fn main() {
/// let runtime = runtime::Builder::new_multi_thread()
/// .on_thread_unpark(|_| {
/// .on_thread_unpark(|| {
/// println!("thread unparking");
/// })
/// .build();
Expand All @@ -673,9 +673,9 @@ impl Builder {
#[cfg(not(loom))]
pub fn on_thread_unpark<F>(&mut self, f: F) -> &mut Self
where
F: Fn(usize) + Send + Sync + 'static,
F: Fn() + Send + Sync + 'static,
{
self.after_unpark = Some(std::sync::Arc::new(f));
self.after_unpark = Some(std::sync::Arc::new(move |_id| f()));
self
}

Expand Down Expand Up @@ -938,6 +938,88 @@ impl Builder {
self.seed_generator = RngSeedGenerator::new(seed);
self
}

/// Same behavior as `on_thread_park` except the id of the thread that is parked is passed
/// to the callback function `f`. The id corresponds to the same `usize` that is used in
/// calls to `RuntimeMetrics`.
///
/// Note: if both `on_thread_park` and `on_thread_park_id` are called, only the last one
/// will be saved.
///
/// # Examples
///
/// ## Stuck task detector
///
/// ```
/// # use std::sync::atomic::{AtomicBool, Ordering};
/// # use std::{thread, time};
/// # use tokio::runtime;
///
/// fn main() {
/// const WORKERS: usize = 4;
/// const UNPARKED: AtomicBool = AtomicBool::new(false);
/// static PARKED: [AtomicBool; WORKERS] = [UNPARKED; WORKERS];
///
/// let runtime = runtime::Builder::new_multi_thread()
/// .worker_threads(WORKERS)
/// .on_thread_park_id(|id| PARKED[id].store(true, Ordering::Release))
/// .on_thread_unpark_id(|id| PARKED[id].store(false, Ordering::Release))
/// .build()
/// .unwrap();
///
/// let metrics = runtime.handle().metrics();
/// thread::spawn(move || {
/// let mut stuck_secs = [0; WORKERS];
/// let mut prev_poll_counts = [None; WORKERS];
/// loop {
/// thread::sleep(time::Duration::from_secs(1));
/// for ii in 0..WORKERS {
/// if PARKED[ii].load(Ordering::Acquire) {
/// prev_poll_counts[ii] = None;
/// } else {
/// let poll_count = metrics.worker_poll_count(ii);
/// if Some(poll_count) == prev_poll_counts[ii] {
/// stuck_secs[ii] += 1;
/// println!("*** worker {} is stuck >= {} secs ***", ii, stuck_secs[ii]);
/// } else {
/// prev_poll_counts[ii] = Some(poll_count);
/// stuck_secs[ii] = 0;
/// }
/// }
/// }
/// }
/// });
///
/// // Spawn a "stuck" task that never yields back to tokio
/// runtime.spawn(async { loop {} });
/// runtime.block_on(async {
/// // Do some work
/// # loop { tokio::task::yield_now().await; }
/// });
/// }
/// ```
#[cfg(not(loom))]
pub fn on_thread_park_id<F>(&mut self, f: F) -> &mut Self
where
F: Fn(usize) + Send + Sync + 'static,
{
self.before_park = Some(std::sync::Arc::new(f));
self
}

/// Same behavior as `on_thread_unpark` except the id of the thread that is parked is passed
/// to the callback function `f`. The id corresponds to the same `usize` that is used in
/// calls to `RuntimeMetrics`.
///
/// See `on_thread_park_id` for example stuck thread detector.
#[cfg(not(loom))]
pub fn on_thread_unpark_id<F>(&mut self, f: F) -> &mut Self
where
F: Fn(usize) + Send + Sync + 'static,
{
self.after_unpark = Some(std::sync::Arc::new(f));
self
}
}

cfg_metrics! {
Expand Down

0 comments on commit dde346f

Please sign in to comment.