diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml new file mode 100644 index 00000000000..50e88387d62 --- /dev/null +++ b/.github/workflows/publish.yml @@ -0,0 +1,75 @@ +# Security notes: +# - GitHub-owned actions use tag pins, third-party actions use SHA pins +# - Only two external actions: actions/checkout (GitHub-owned) and +# jfrog/setup-jfrog-cli (allowlisted, handles OIDC auth) +# - Rust toolchain uses the runner's pre-installed rustup +# - The publish environment requires the anthropic-1.52.3 branch +name: Publish to Artifactory + +on: + push: + branches: + - anthropic-1.52.3 + +permissions: + contents: read + id-token: write + +jobs: + publish: + runs-on: ubuntu-latest + environment: publish + steps: + - uses: actions/checkout@v4 + + - name: Setup Rust + run: | + rustup default stable + rustup show + + - name: Setup JFrog CLI + id: setup-jfrog + uses: jfrog/setup-jfrog-cli@ff5cb544114ffc152db9cea1cd3d5978d5074946 # v4.5.11 + env: + JF_URL: https://artifactory.infra.ant.dev + with: + oidc-provider-name: github + oidc-audience: jfrog-github + + - name: Configure Cargo Registry + run: | + mkdir -p ~/.cargo + cat >> ~/.cargo/config.toml << 'EOF' + [registries.crates-internal] + index = "sparse+https://artifactory.infra.ant.dev/artifactory/api/cargo/crates-internal/index/" + credential-provider = ["cargo:token"] + EOF + + - name: Debug - Verify Access + env: + CARGO_REGISTRIES_CRATES_INTERNAL_TOKEN: "Bearer ${{ steps.setup-jfrog.outputs.oidc-token }}" + run: | + echo "=== Token check ===" + echo "Token length: ${#CARGO_REGISTRIES_CRATES_INTERNAL_TOKEN}" + echo "Token starts with Bearer: $(echo "$CARGO_REGISTRIES_CRATES_INTERNAL_TOKEN" | grep -c '^Bearer ')" + + echo "" + echo "=== JFrog CLI ping ===" + jf rt ping + + echo "" + echo "=== Cargo config ===" + cat ~/.cargo/config.toml + + echo "" + echo "=== curl test (with auth) ===" + curl -sI -H "Authorization: ${CARGO_REGISTRIES_CRATES_INTERNAL_TOKEN}" \ + "https://artifactory.infra.ant.dev/artifactory/api/cargo/crates-internal/index/to/ki/tokio" \ + 2>&1 | grep -E "HTTP|WWW-Auth" || true + + - name: Publish tokio to Artifactory + env: + CARGO_REGISTRIES_CRATES_INTERNAL_TOKEN: "Bearer ${{ steps.setup-jfrog.outputs.oidc-token }}" + run: | + cd tokio + cargo publish --registry crates-internal --allow-dirty diff --git a/ANTHROPIC.md b/ANTHROPIC.md new file mode 100644 index 00000000000..22572962bd4 --- /dev/null +++ b/ANTHROPIC.md @@ -0,0 +1,46 @@ +# Anthropic Tokio Fork + +This is Anthropic's fork of [tokio](https://github.com/tokio-rs/tokio), published to our internal Artifactory registry as `crates-internal`. + +## Version Convention + +Versions use the upstream major.minor with a fork-owned patch number `P = N * 1000 + upstream_patch`, plus a constant `+anthropic` build-metadata tag: + +- `1.49.1000+anthropic` = first fork release based on tokio 1.49.0 +- `1.49.2000+anthropic` = second fork release +- `1.49.3001+anthropic` = third fork release, after rebasing onto tokio 1.49.1 +- `1.52.7003+anthropic` = seventh fork release, after rebasing onto tokio 1.52.3 + +## Features Added + +### `stall-detection` + +Detects when a tokio worker thread is stalled (blocked in a task poll for too long) and reports diagnostics including stack traces via `tracing`. + +```rust +let rt = tokio::runtime::Builder::new_multi_thread() + .enable_stall_detection() + .stall_detection_poll_interval(Duration::from_millis(100)) + .stall_detection_escalation_threshold(Duration::from_secs(10)) + .build() + .unwrap(); +``` + +## Publishing + +Publishing happens automatically when changes are pushed to the `anthropic-1.52.3` branch. The GitHub Actions workflow uses OIDC authentication with Artifactory. + +### Prerequisites + +1. The `anthropics/tokio` repo must be added to the OIDC config in `anthropics/terraform-config` +2. A `publish-cli` GitHub environment must be configured in repo settings +3. The `jfrog/setup-jfrog-cli` action must be allowed in repo settings + +## Using in the Monorepo + +In the workspace `Cargo.toml`: + +```toml +[patch.crates-io] +tokio = { version = "1.52.7003+anthropic", registry = "crates-internal" } +``` diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 00000000000..3de83592d91 --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,41 @@ +# Anthropic Tokio Fork - Development Guide + +## Version Bumping + +Every PR merged to an `anthropic-*` branch MUST bump the patch version in `tokio/Cargo.toml`. + +### Files to update + +- `tokio/Cargo.toml` - main tokio crate + +The other workspace crates (`tokio-macros`, `tokio-stream`, `tokio-test`, `tokio-util`) are not published from this fork; the published `tokio` crate depends on them via crates.io. + +### Version format + +`..

+anthropic` where `P = N * 1000 + upstream_patch` + +`N` is the anthropic release counter and never resets — it increments on every PR, including across rebases onto new upstream patch versions, so the version is always monotonic. + +Examples based on upstream 1.49.0: +- `1.49.1000+anthropic` (first anthropic release) +- `1.49.2000+anthropic` (second anthropic release) + +After rebasing onto upstream 1.49.1 (N continues): +- `1.49.3001+anthropic` +- `1.49.4001+anthropic` + +The `+anthropic` suffix is a semver build metadata tag and does not affect dependency resolution. + +## Publishing + +Publishing to the `crates-internal` Artifactory registry happens automatically via GitHub Actions when changes are pushed to an `anthropic-*` branch. See `.github/workflows/publish.yml`. + +## Stall Detection Feature + +The `stall-detection` feature is our primary addition. See `ANTHROPIC.md` for user-facing documentation. + +Key implementation files: +- `tokio/src/runtime/stall_detection.rs` - monitor thread, signal handler, frame-pointer walker +- `tokio/src/runtime/scheduler/multi_thread/worker.rs` - generation counter increments +- `tokio/src/runtime/metrics/worker.rs` - WorkerMetrics fields +- `tokio/src/runtime/builder.rs` - builder API methods diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 056ab17f753..6d2bd981731 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -26,6 +26,7 @@ once_cell = "1.5.2" [target.'cfg(target_os = "linux")'.dev-dependencies] libc = "0.2" +tokio = { version = "1.0.0", path = "../tokio", features = ["full", "tracing", "stall-detection"] } [target.'cfg(all(tokio_unstable, target_os = "linux"))'.dev-dependencies] tokio = { version = "1.0.0", path = "../tokio", features = ["full", "tracing", "taskdump"] } @@ -113,5 +114,9 @@ path = "dump.rs" name = "prewarm-fd-table" path = "prewarm-fd-table.rs" +[[example]] +name = "stall_detection" +path = "stall_detection.rs" + [lints] workspace = true diff --git a/examples/stall_detection.rs b/examples/stall_detection.rs new file mode 100644 index 00000000000..538161f21e7 --- /dev/null +++ b/examples/stall_detection.rs @@ -0,0 +1,125 @@ +//! Demonstrates tokio's stall detection feature. +//! +//! The stall detection monitor runs a background thread that periodically checks +//! whether any worker thread has been stuck in a single `poll()` call for too +//! long. When a stall is detected, it captures a stack trace (on Linux) and +//! reports the event via tracing and/or a user-provided callback. +//! +//! This example spawns: +//! - A "good" task that cooperates by yielding and using async sleep. +//! - A "bad" task that blocks the worker with `std::thread::sleep`. +//! - A "busy" task that hogs the CPU with a tight loop. +//! +//! Run with: +//! +//! cargo run --example stall_detection +//! +//! Expected output: the tracing subscriber shows WARN-level messages from the +//! stall detection monitor, including user-space and kernel stack traces. The +//! `on_stall` callback silently counts stall events for programmatic use. + +#[cfg(target_os = "linux")] +fn main() { + use std::sync::atomic::{AtomicUsize, Ordering}; + use std::sync::Arc; + use std::time::Duration; + + // Set up a tracing subscriber so the monitor's tracing::warn! output is visible. + tracing_subscriber::fmt() + .with_max_level(tracing::Level::WARN) + .init(); + + // Track how many stall events the callback receives. + let stall_count = Arc::new(AtomicUsize::new(0)); + let stall_count_cb = stall_count.clone(); + + let rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(2) + .enable_all() + // Enable stall detection with a short poll interval for this demo. + // In production you would typically use the default (100ms) or tune + // it to your latency requirements. + .stall_detection_poll_interval(Duration::from_millis(50)) + // Register a callback that fires whenever a stall is detected or resolved. + // The tracing output from the monitor is sufficient for display; the callback + // just counts stalls here to demonstrate programmatic access to stall events. + .on_stall(move |_info| { + stall_count_cb.fetch_add(1, Ordering::Relaxed); + }) + .build() + .expect("failed to build runtime"); + + rt.block_on(async { + println!("=== Stall Detection Demo ===\n"); + + // --- Good task: cooperates with the runtime --- + let good = tokio::spawn(async { + println!("[good task] starting -- does async work and yields properly"); + for i in 0..5 { + // Async sleep yields control back to the runtime. + tokio::time::sleep(Duration::from_millis(100)).await; + println!("[good task] iteration {}", i + 1); + } + println!("[good task] done"); + }); + + // Give the good task a moment to start. + tokio::time::sleep(Duration::from_millis(50)).await; + + // --- Bad task: blocks the worker with std::thread::sleep --- + let bad_blocking = tokio::spawn(async { + println!("[bad task] starting -- about to block the worker for 500ms!"); + // THIS IS THE BUG: using std::thread::sleep inside an async task + // blocks the entire worker thread. The stall detector will catch this. + std::thread::sleep(Duration::from_millis(500)); + println!("[bad task] done blocking"); + }); + + // Give the bad task a moment to start and get detected. + tokio::time::sleep(Duration::from_millis(100)).await; + + // --- Busy task: hogs the CPU with a tight loop --- + let bad_busy = tokio::spawn(async { + println!("[busy task] starting -- about to spin the CPU for 300ms!"); + // Another common mistake: a CPU-intensive loop that never yields. + let start = std::time::Instant::now(); + let mut sum: u64 = 0; + while start.elapsed() < Duration::from_millis(300) { + sum = sum.wrapping_add(1); + } + // Use `sum` to prevent the loop from being optimized away. + println!("[busy task] done spinning (sum = {})", sum); + }); + + // Wait for all tasks to finish. + let _ = good.await; + let _ = bad_blocking.await; + let _ = bad_busy.await; + + // Let the monitor have a final poll cycle to report resolved stalls. + tokio::time::sleep(Duration::from_millis(200)).await; + + let total = stall_count.load(Ordering::Relaxed); + println!("\n=== Demo Complete ==="); + println!("Total stall events reported: {}", total); + if total > 0 { + println!( + "The stall detector successfully caught the blocking operations!\n\ + In production, use tokio::task::spawn_blocking() for blocking work." + ); + } else { + println!( + "No stalls detected. This can happen if the OS scheduled things \ + favorably. Try running again or increasing the blocking durations." + ); + } + }); +} + +#[cfg(not(target_os = "linux"))] +fn main() { + println!( + "Stall detection is currently supported on Linux only.\n\ + Run this example on a Linux system to see it in action." + ); +} diff --git a/tokio-macros/Cargo.toml b/tokio-macros/Cargo.toml index 63918fe1b4f..9b2ef4ae8c6 100644 --- a/tokio-macros/Cargo.toml +++ b/tokio-macros/Cargo.toml @@ -4,7 +4,7 @@ name = "tokio-macros" # - Remove path dependencies (if any) # - Update CHANGELOG.md. # - Create "tokio-macros-x.y.z" git tag. -version = "2.7.0" +version = "2.7.0+anthropic.1" edition = "2021" rust-version = "1.71" authors = ["Tokio Contributors "] diff --git a/tokio-stream/Cargo.toml b/tokio-stream/Cargo.toml index 77f20761a62..52a41fed1bf 100644 --- a/tokio-stream/Cargo.toml +++ b/tokio-stream/Cargo.toml @@ -4,7 +4,7 @@ name = "tokio-stream" # - Remove path dependencies (if any) # - Update CHANGELOG.md. # - Create "tokio-stream-0.1.x" git tag. -version = "0.1.18" +version = "0.1.18+anthropic.1" edition = "2021" rust-version = "1.71" authors = ["Tokio Contributors "] diff --git a/tokio-test/Cargo.toml b/tokio-test/Cargo.toml index c1a72345941..5b2cb8d3ac9 100644 --- a/tokio-test/Cargo.toml +++ b/tokio-test/Cargo.toml @@ -4,7 +4,7 @@ name = "tokio-test" # - Remove path dependencies (if any) # - Update CHANGELOG.md. # - Create "tokio-test-0.4.x" git tag. -version = "0.4.5" +version = "0.4.5+anthropic.1" edition = "2021" rust-version = "1.71" authors = ["Tokio Contributors "] diff --git a/tokio-util/Cargo.toml b/tokio-util/Cargo.toml index 84cb3cd225c..a58cd08e962 100644 --- a/tokio-util/Cargo.toml +++ b/tokio-util/Cargo.toml @@ -4,7 +4,7 @@ name = "tokio-util" # - Remove path dependencies (if any) # - Update CHANGELOG.md. # - Create "tokio-util-0.7.x" git tag. -version = "0.7.18" +version = "0.7.18+anthropic.1" edition = "2021" rust-version = "1.71" authors = ["Tokio Contributors "] diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index 123781bbf81..66bd05a9b84 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -6,7 +6,7 @@ name = "tokio" # - README.md # - Update CHANGELOG.md. # - Create "v1.x.y" git tag. -version = "1.52.3" +version = "1.52.7003+anthropic" edition = "2021" rust-version = "1.71" authors = ["Tokio Contributors "] @@ -81,6 +81,7 @@ signal = [ "windows-sys/Win32_Foundation", "windows-sys/Win32_System_Console", ] +stall-detection = ["rt", "libc", "dep:backtrace", "tracing"] sync = [] test-util = ["rt", "sync", "time"] time = [] @@ -98,6 +99,8 @@ pin-project-lite = "0.2.11" bytes = { version = "1.2.1", optional = true } mio = { version = "1.2.0", optional = true, default-features = false } parking_lot = { version = "0.12.0", optional = true } +backtrace = { version = "0.3.58", optional = true } +tracing = { version = "0.1.29", default-features = false, features = ["std"], optional = true } [target.'cfg(any(not(target_family = "wasm"), all(target_os = "wasi", not(target_env = "p1"))))'.dependencies] socket2 = { version = "0.6.3", optional = true, features = ["all"] } diff --git a/tokio/src/macros/cfg.rs b/tokio/src/macros/cfg.rs index efbd163c7f4..38765525bf2 100644 --- a/tokio/src/macros/cfg.rs +++ b/tokio/src/macros/cfg.rs @@ -732,6 +732,16 @@ macro_rules! cfg_metrics_variant { } } +macro_rules! cfg_stall_detection { + ($($item:item)*) => { + $( + #[cfg(feature = "stall-detection")] + #[cfg_attr(docsrs, doc(cfg(feature = "stall-detection")))] + $item + )* + } +} + macro_rules! cfg_io_uring { ($($item:item)*) => { $( diff --git a/tokio/src/runtime/blocking/mod.rs b/tokio/src/runtime/blocking/mod.rs index c42924be77d..ee840a2c792 100644 --- a/tokio/src/runtime/blocking/mod.rs +++ b/tokio/src/runtime/blocking/mod.rs @@ -6,6 +6,10 @@ mod pool; pub(crate) use pool::{spawn_blocking, BlockingPool, Spawner}; +cfg_stall_detection! { + pub(crate) use pool::BlockingPoolSnapshot; +} + cfg_fs! { pub(crate) use pool::spawn_mandatory_blocking; } diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs index dae98bc9462..07a1a93f3cc 100644 --- a/tokio/src/runtime/blocking/pool.rs +++ b/tokio/src/runtime/blocking/pool.rs @@ -493,6 +493,31 @@ cfg_unstable_metrics! { } } +cfg_stall_detection! { + /// Snapshot of blocking-pool state for the stall-detection log. + /// + /// Values are read with relaxed ordering, so they may be momentarily + /// inconsistent with one another. + #[derive(Clone, Copy, Debug)] + pub(crate) struct BlockingPoolSnapshot { + pub(crate) num_threads: usize, + pub(crate) num_idle_threads: usize, + pub(crate) queue_depth: usize, + pub(crate) thread_cap: usize, + } + + impl Spawner { + pub(crate) fn stall_detection_snapshot(&self) -> BlockingPoolSnapshot { + BlockingPoolSnapshot { + num_threads: self.inner.metrics.num_threads.load(Ordering::Relaxed), + num_idle_threads: self.inner.metrics.num_idle_threads.load(Ordering::Relaxed), + queue_depth: self.inner.metrics.queue_depth.load(Ordering::Relaxed), + thread_cap: self.inner.thread_cap, + } + } + } +} + // Tells whether the error when spawning a thread is temporary. #[inline] fn is_temporary_os_thread_error(error: &io::Error) -> bool { diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index a37d47f1824..c48df0ef4da 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -146,6 +146,11 @@ pub struct Builder { /// Whether or not to enable eager hand-off for the I/O and time drivers (in /// `tokio_unstable`). enable_eager_driver_handoff: bool, + + /// Configuration for the stall detection monitor thread. + #[cfg(feature = "stall-detection")] + pub(super) stall_detection_config: + Option, } cfg_unstable! { @@ -341,6 +346,9 @@ impl Builder { // Eager driver handoff is disabled by default. enable_eager_driver_handoff: false, + + #[cfg(feature = "stall-detection")] + stall_detection_config: None, } } @@ -386,6 +394,154 @@ impl Builder { self } + /// Enable stall detection with default configuration (100ms poll interval, + /// 10s escalation threshold). + /// + /// When enabled, a background thread periodically checks whether any worker + /// thread is stalled (stuck in a long-running synchronous operation during + /// `Future::poll`). On Linux, the monitor captures a stack trace from the + /// stalled worker via a realtime signal (SIGRTMIN+1). + /// + /// Stall events are reported via `tracing` (at WARN level for resolved stalls, + /// ERROR level for escalations). A resolved stall logs the duration and the + /// stack trace captured at the time of detection. If a stall persists beyond the + /// escalation threshold (default 10 seconds), an intermediate warning is emitted. + /// + /// # Examples + /// + /// ``` + /// # #[cfg(not(target_family = "wasm"))] + /// # { + /// use tokio::runtime; + /// + /// let rt = runtime::Builder::new_multi_thread() + /// .enable_stall_detection() + /// .enable_all() + /// .build() + /// .unwrap(); + /// # } + /// ``` + #[cfg(feature = "stall-detection")] + #[cfg_attr(docsrs, doc(cfg(feature = "stall-detection")))] + pub fn enable_stall_detection(&mut self) -> &mut Self { + self.stall_detection_config = + Some(crate::runtime::stall_detection::StallDetectionConfig::default()); + self + } + + /// Enable stall detection with a custom poll interval. + /// + /// The poll interval controls how frequently the monitor checks for stalled + /// workers. Default: 100ms. + /// + /// If stall detection has not been explicitly enabled, calling this method + /// will enable it with default values before applying the custom interval. + /// + /// # Examples + /// + /// ``` + /// # #[cfg(not(target_family = "wasm"))] + /// # { + /// use tokio::runtime; + /// use std::time::Duration; + /// + /// let rt = runtime::Builder::new_multi_thread() + /// .stall_detection_poll_interval(Duration::from_millis(50)) + /// .enable_all() + /// .build() + /// .unwrap(); + /// # } + /// ``` + #[cfg(feature = "stall-detection")] + #[cfg_attr(docsrs, doc(cfg(feature = "stall-detection")))] + pub fn stall_detection_poll_interval(&mut self, interval: std::time::Duration) -> &mut Self { + self.stall_detection_config + .get_or_insert_with(crate::runtime::stall_detection::StallDetectionConfig::default) + .poll_interval = interval; + self + } + + /// Set the escalation threshold for stall detection. + /// + /// If a stall persists longer than this, an intermediate warning is emitted + /// via `tracing` at ERROR level. Default: 10s. + /// + /// If stall detection has not been explicitly enabled, calling this method + /// will enable it with default values before applying the custom threshold. + /// + /// # Examples + /// + /// ``` + /// # #[cfg(not(target_family = "wasm"))] + /// # { + /// use tokio::runtime; + /// use std::time::Duration; + /// + /// let rt = runtime::Builder::new_multi_thread() + /// .enable_stall_detection() + /// .stall_detection_escalation_threshold(Duration::from_secs(30)) + /// .enable_all() + /// .build() + /// .unwrap(); + /// # } + /// ``` + #[cfg(feature = "stall-detection")] + #[cfg_attr(docsrs, doc(cfg(feature = "stall-detection")))] + pub fn stall_detection_escalation_threshold( + &mut self, + threshold: std::time::Duration, + ) -> &mut Self { + self.stall_detection_config + .get_or_insert_with(crate::runtime::stall_detection::StallDetectionConfig::default) + .escalation_threshold = threshold; + self + } + + /// Set a callback to be invoked when a scheduler stall is detected or resolved. + /// + /// The callback receives a [`StallInfo`] with the worker index, duration, stack trace, + /// and whether the stall has resolved. + /// + /// Stall events are always reported via `tracing` regardless of whether a + /// callback is set. + /// + /// If stall detection has not been explicitly enabled, calling this method + /// will enable it with default values before applying the callback. + /// + /// [`StallInfo`]: crate::runtime::StallInfo + /// + /// # Examples + /// + /// ``` + /// # #[cfg(not(target_family = "wasm"))] + /// # { + /// use tokio::runtime; + /// + /// let rt = runtime::Builder::new_multi_thread() + /// .on_stall(|info| { + /// if info.resolved { + /// println!("Worker {} stall resolved after {:?}", info.worker, info.duration); + /// } else { + /// println!("Worker {} stalled for {:?}", info.worker, info.duration); + /// } + /// }) + /// .enable_all() + /// .build() + /// .unwrap(); + /// # } + /// ``` + #[cfg(feature = "stall-detection")] + #[cfg_attr(docsrs, doc(cfg(feature = "stall-detection")))] + pub fn on_stall(&mut self, callback: F) -> &mut Self + where + F: Fn(crate::runtime::stall_detection::StallInfo) + Send + Sync + 'static, + { + self.stall_detection_config + .get_or_insert_with(crate::runtime::stall_detection::StallDetectionConfig::default) + .on_stall = Some(std::sync::Arc::new(callback)); + self + } + /// Enables the alternative timer implementation, which is disabled by default. /// /// The alternative timer implementation is an unstable feature that may @@ -1337,39 +1493,39 @@ impl Builder { self.disable_lifo_slot = true; self } + } - /// Specifies the random number generation seed to use within all - /// threads associated with the runtime being built. - /// - /// This option is intended to make certain parts of the runtime - /// deterministic (e.g. the [`tokio::select!`] macro). In the case of - /// [`tokio::select!`] it will ensure that the order that branches are - /// polled is deterministic. - /// - /// In addition to the code specifying `rng_seed` and interacting with - /// the runtime, the internals of Tokio and the Rust compiler may affect - /// the sequences of random numbers. In order to ensure repeatable - /// results, the version of Tokio, the versions of all other - /// dependencies that interact with Tokio, and the Rust compiler version - /// should also all remain constant. - /// - /// # Examples - /// - /// ``` - /// # use tokio::runtime::{self, RngSeed}; - /// # pub fn main() { - /// let seed = RngSeed::from_bytes(b"place your seed here"); - /// let rt = runtime::Builder::new_current_thread() - /// .rng_seed(seed) - /// .build(); - /// # } - /// ``` - /// - /// [`tokio::select!`]: crate::select - pub fn rng_seed(&mut self, seed: RngSeed) -> &mut Self { - self.seed_generator = RngSeedGenerator::new(seed); - self - } + /// Specifies the random number generation seed to use within all + /// threads associated with the runtime being built. + /// + /// This option is intended to make certain parts of the runtime + /// deterministic (e.g. the [`tokio::select!`] macro). In the case of + /// [`tokio::select!`] it will ensure that the order that branches are + /// polled is deterministic. + /// + /// In addition to the code specifying `rng_seed` and interacting with + /// the runtime, the internals of Tokio and the Rust compiler may affect + /// the sequences of random numbers. In order to ensure repeatable + /// results, the version of Tokio, the versions of all other + /// dependencies that interact with Tokio, and the Rust compiler version + /// should also all remain constant. + /// + /// # Examples + /// + /// ``` + /// # use tokio::runtime::{self, RngSeed}; + /// # pub fn main() { + /// let seed = RngSeed::from_bytes(b"place your seed here"); + /// let rt = runtime::Builder::new_current_thread() + /// .rng_seed(seed) + /// .build(); + /// # } + /// ``` + /// + /// [`tokio::select!`]: crate::select + pub fn rng_seed(&mut self, seed: RngSeed) -> &mut Self { + self.seed_generator = RngSeedGenerator::new(seed); + self } cfg_unstable_metrics! { @@ -1639,11 +1795,18 @@ impl Builder { let (scheduler, handle, blocking_pool) = self.build_current_thread_runtime_components(None)?; - Ok(Runtime::from_parts( - Scheduler::CurrentThread(scheduler), - handle, - blocking_pool, - )) + #[allow(unused_mut)] + let mut rt = + Runtime::from_parts(Scheduler::CurrentThread(scheduler), handle, blocking_pool); + + #[cfg(feature = "stall-detection")] + if let Some(config) = self.stall_detection_config.take() { + let monitor = + crate::runtime::stall_detection::start_monitor(rt.handle().clone(), config); + rt.set_stall_monitor(monitor); + } + + Ok(rt) } fn build_current_thread_local_runtime(&mut self) -> io::Result { @@ -1903,7 +2066,19 @@ cfg_rt_multi_thread! { let _enter = handle.enter(); launch.launch(); - Ok(Runtime::from_parts(Scheduler::MultiThread(scheduler), handle, blocking_pool)) + #[allow(unused_mut)] + let mut rt = Runtime::from_parts(Scheduler::MultiThread(scheduler), handle, blocking_pool); + + #[cfg(feature = "stall-detection")] + if let Some(config) = self.stall_detection_config.take() { + let monitor = crate::runtime::stall_detection::start_monitor( + rt.handle().clone(), + config, + ); + rt.set_stall_monitor(monitor); + } + + Ok(rt) } } } diff --git a/tokio/src/runtime/metrics/runtime.rs b/tokio/src/runtime/metrics/runtime.rs index 8aeb608bd02..59db80e1ab8 100644 --- a/tokio/src/runtime/metrics/runtime.rs +++ b/tokio/src/runtime/metrics/runtime.rs @@ -101,6 +101,77 @@ impl RuntimeMetrics { self.handle.inner.injection_queue_depth() } + cfg_stall_detection! { + /// Returns the current poll generation counter for the given worker. + /// + /// Odd values indicate the worker is currently polling a task. + /// Even values indicate the worker is idle (between polls). + /// If two successive reads return the same odd value, the worker is likely stalled. + /// + /// # Arguments + /// + /// `worker` is the index of the worker being queried. The given value must + /// be between 0 and `num_workers()`. + /// + /// # Panics + /// + /// The method panics when `worker` represents an invalid worker, i.e. is + /// greater than or equal to `num_workers()`. + pub fn worker_poll_generation(&self, worker: usize) -> u64 { + self.handle + .inner + .worker_metrics(worker) + .poll_generation + .load(std::sync::atomic::Ordering::Acquire) + } + + /// Returns the OS thread ID of the thread currently running the given worker. + /// + /// On Linux, this is the value from `gettid()`. Returns 0 if not available + /// or on non-Linux platforms. + /// + /// # Arguments + /// + /// `worker` is the index of the worker being queried. The given value must + /// be between 0 and `num_workers()`. + /// + /// # Panics + /// + /// The method panics when `worker` represents an invalid worker, i.e. is + /// greater than or equal to `num_workers()`. + pub fn worker_os_thread_id(&self, worker: usize) -> u64 { + self.handle + .inner + .worker_metrics(worker) + .os_thread_id + .load(std::sync::atomic::Ordering::Acquire) + } + + /// Returns the name of the thread currently running the given worker. + /// + /// Captured once at worker startup from `std::thread::current().name()`. + /// Returns `None` if the worker has not started yet or its OS thread had + /// no name. + /// + /// # Arguments + /// + /// `worker` is the index of the worker being queried. The given value must + /// be between 0 and `num_workers()`. + /// + /// # Panics + /// + /// The method panics when `worker` represents an invalid worker, i.e. is + /// greater than or equal to `num_workers()`. + pub fn worker_thread_name(&self, worker: usize) -> Option<&str> { + self.handle + .inner + .worker_metrics(worker) + .thread_name + .get() + .map(String::as_str) + } + } + cfg_64bit_metrics! { /// Returns the amount of time the given worker thread has been busy. /// diff --git a/tokio/src/runtime/metrics/worker.rs b/tokio/src/runtime/metrics/worker.rs index 30926b2a6c2..762d2d16fd1 100644 --- a/tokio/src/runtime/metrics/worker.rs +++ b/tokio/src/runtime/metrics/worker.rs @@ -65,6 +65,20 @@ pub(crate) struct WorkerMetrics { #[cfg(tokio_unstable)] /// If `Some`, tracks the number of polls by duration range. pub(super) poll_count_histogram: Option, + + /// Generation counter for stall detection. Odd = polling, even = idle. + #[cfg(feature = "stall-detection")] + pub(crate) poll_generation: std::sync::atomic::AtomicU64, + + /// OS thread ID of the worker thread (e.g. from gettid() on Linux). 0 = unset. + #[cfg(feature = "stall-detection")] + pub(crate) os_thread_id: std::sync::atomic::AtomicU64, + + /// Name of the worker thread, captured once at startup from + /// `std::thread::current().name()`. Unset if the worker has not started + /// or its OS thread had no name. + #[cfg(feature = "stall-detection")] + pub(crate) thread_name: std::sync::OnceLock, } impl WorkerMetrics { diff --git a/tokio/src/runtime/mod.rs b/tokio/src/runtime/mod.rs index 713f60de05b..9cb7eb8db21 100644 --- a/tokio/src/runtime/mod.rs +++ b/tokio/src/runtime/mod.rs @@ -416,6 +416,12 @@ cfg_process_driver! { mod process; } +#[cfg(feature = "stall-detection")] +pub(crate) mod stall_detection; +#[cfg(feature = "stall-detection")] +#[cfg_attr(docsrs, doc(cfg(feature = "stall-detection")))] +pub use self::stall_detection::StallInfo; + #[cfg_attr(not(feature = "time"), allow(dead_code))] #[derive(Debug, Copy, Clone, PartialEq)] pub(crate) enum TimerFlavor { @@ -571,9 +577,9 @@ cfg_rt! { mod builder; pub use self::builder::Builder; + pub use crate::util::rand::RngSeed; cfg_unstable! { pub use self::builder::UnhandledPanic; - pub use crate::util::rand::RngSeed; /// Returns the index of the current worker thread, if called from a /// runtime worker thread. diff --git a/tokio/src/runtime/runtime.rs b/tokio/src/runtime/runtime.rs index b0091bc4b4e..f30a3068597 100644 --- a/tokio/src/runtime/runtime.rs +++ b/tokio/src/runtime/runtime.rs @@ -103,6 +103,10 @@ pub struct Runtime { /// Blocking pool handle, used to signal shutdown blocking_pool: BlockingPool, + + /// Stall detection monitor thread handle (dropped to stop the monitor). + #[cfg(feature = "stall-detection")] + stall_monitor: Option, } /// The flavor of a `Runtime`. @@ -138,9 +142,20 @@ impl Runtime { scheduler, handle, blocking_pool, + #[cfg(feature = "stall-detection")] + stall_monitor: None, } } + /// Sets the stall monitor handle. Called by the builder after construction. + #[cfg(feature = "stall-detection")] + pub(super) fn set_stall_monitor( + &mut self, + monitor: crate::runtime::stall_detection::StallMonitorHandle, + ) { + self.stall_monitor = Some(monitor); + } + /// Creates a new runtime instance with default configuration values. /// /// This results in the multi threaded scheduler, I/O driver, and time driver being diff --git a/tokio/src/runtime/scheduler/current_thread/mod.rs b/tokio/src/runtime/scheduler/current_thread/mod.rs index f0b072d577e..c270a828582 100644 --- a/tokio/src/runtime/scheduler/current_thread/mod.rs +++ b/tokio/src/runtime/scheduler/current_thread/mod.rs @@ -73,6 +73,10 @@ struct Core { /// Metrics batch metrics: MetricsBatch, + /// Generation counter for stall detection + #[cfg(feature = "stall-detection")] + local_generation: u64, + /// How often to check the global queue global_queue_interval: u32, @@ -174,6 +178,8 @@ impl CurrentThread { tick: 0, driver: Some(driver), metrics: MetricsBatch::new(&handle.shared.worker_metrics), + #[cfg(feature = "stall-detection")] + local_generation: 0, global_queue_interval, unhandled_panic: false, }))); @@ -202,6 +208,28 @@ impl CurrentThread { .shared .worker_metrics .set_thread_id(thread::current().id()); + #[cfg(feature = "stall-detection")] + { + #[cfg(target_os = "linux")] + // SAFETY: gettid() is always safe to call. + let tid = unsafe { libc::gettid() } as u64; + #[cfg(not(target_os = "linux"))] + let tid = 0u64; + + handle + .shared + .worker_metrics + .os_thread_id + .store(tid, std::sync::atomic::Ordering::Release); + + if let Some(name) = thread::current().name() { + let _ = handle + .shared + .worker_metrics + .thread_name + .set(name.to_string()); + } + } return core.block_on(future); } else { let notified = self.notify.notified(); @@ -370,7 +398,27 @@ impl Context { /// thread-local context. fn run_task(&self, mut core: Box, f: impl FnOnce() -> R) -> (Box, R) { core.metrics.start_poll(); + // Stall detection: mark poll start (odd generation) + #[cfg(feature = "stall-detection")] + { + core.local_generation += 1; // now odd = polling + self.handle + .shared + .worker_metrics + .poll_generation + .store(core.local_generation, std::sync::atomic::Ordering::Release); + } let mut ret = self.enter(core, || crate::task::coop::budget(f)); + // Stall detection: mark poll end (even generation) + #[cfg(feature = "stall-detection")] + { + ret.0.local_generation += 1; // now even = idle + self.handle + .shared + .worker_metrics + .poll_generation + .store(ret.0.local_generation, std::sync::atomic::Ordering::Release); + } ret.0.metrics.end_poll(); ret } diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs index e222edbf9b7..ecd0e2dc63c 100644 --- a/tokio/src/runtime/scheduler/multi_thread/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs @@ -157,6 +157,10 @@ struct Core { /// Per-worker runtime stats stats: Stats, + /// Generation counter for stall detection + #[cfg(feature = "stall-detection")] + local_generation: u64, + /// How often to check the global queue global_queue_interval: u32, @@ -300,6 +304,8 @@ pub(super) fn create( enable_eager_driver_handoff: config.enable_eager_driver_handoff, had_driver: park::HadDriver::No, park: Some(park), + #[cfg(feature = "stall-detection")] + local_generation: 0, global_queue_interval: stats.tuned_global_queue_interval(&config), stats, rand: FastRand::from_seed(config.seed_generator.next_seed()), @@ -376,6 +382,24 @@ where if core.is_some() { cx.worker.handle.shared.worker_metrics[cx.worker.index] .set_thread_id(thread::current().id()); + + #[cfg(feature = "stall-detection")] + { + #[cfg(target_os = "linux")] + // SAFETY: gettid() is always safe to call. + let tid = unsafe { libc::gettid() } as u64; + #[cfg(not(target_os = "linux"))] + let tid = 0u64; + + let metrics = + &cx.worker.handle.shared.worker_metrics[cx.worker.index]; + metrics + .os_thread_id + .store(tid, std::sync::atomic::Ordering::Release); + if let Some(name) = thread::current().name() { + let _ = metrics.thread_name.set(name.to_string()); + } + } } let mut cx_core = cx.core.borrow_mut(); @@ -532,6 +556,23 @@ fn run(worker: Arc) { worker.handle.shared.worker_metrics[worker.index].set_thread_id(thread::current().id()); + #[cfg(feature = "stall-detection")] + { + #[cfg(target_os = "linux")] + // SAFETY: gettid() is always safe to call. + let tid = unsafe { libc::gettid() } as u64; + #[cfg(not(target_os = "linux"))] + let tid = 0u64; + + let metrics = &worker.handle.shared.worker_metrics[worker.index]; + metrics + .os_thread_id + .store(tid, std::sync::atomic::Ordering::Release); + if let Some(name) = thread::current().name() { + let _ = metrics.thread_name.set(name.to_string()); + } + } + let handle = scheduler::Handle::MultiThread(worker.handle.clone()); crate::runtime::context::enter_runtime(&handle, true, |_| { @@ -563,6 +604,16 @@ impl Context { // a task that had the LIFO slot disabled. self.reset_lifo_enabled(&mut core); + // Ensure local_generation is even (= idle) when entering the run loop. + // This matters after block_in_place() hands off a Core mid-poll with an odd generation. + #[cfg(feature = "stall-detection")] + if core.local_generation % 2 == 1 { + core.local_generation += 1; + self.worker.handle.shared.worker_metrics[self.worker.index] + .poll_generation + .store(core.local_generation, std::sync::atomic::Ordering::Release); + } + // Start as "processing" tasks as polling tasks from the local queue // will be one of the first things we do. core.stats.start_processing_scheduled_tasks(); @@ -668,6 +719,15 @@ impl Context { // purposes. These tasks inherent the "parent"'s limits. core.stats.start_poll(); + // Stall detection: mark poll start (odd generation) + #[cfg(feature = "stall-detection")] + { + core.local_generation += 1; // now odd = polling + self.worker.handle.shared.worker_metrics[self.worker.index] + .poll_generation + .store(core.local_generation, std::sync::atomic::Ordering::Release); + } + // Make the core available to the runtime context *self.core.borrow_mut() = Some(core); @@ -708,12 +768,28 @@ impl Context { Some(task) => task, None => { self.reset_lifo_enabled(&mut core); + // Stall detection: mark poll end (even generation) + #[cfg(feature = "stall-detection")] + { + core.local_generation += 1; // now even = idle + self.worker.handle.shared.worker_metrics[self.worker.index] + .poll_generation + .store(core.local_generation, std::sync::atomic::Ordering::Release); + } core.stats.end_poll(); return Ok(core); } }; if !coop::has_budget_remaining() { + // Stall detection: mark poll end (even generation) + #[cfg(feature = "stall-detection")] + { + core.local_generation += 1; // now even = idle + self.worker.handle.shared.worker_metrics[self.worker.index] + .poll_generation + .store(core.local_generation, std::sync::atomic::Ordering::Release); + } core.stats.end_poll(); // Not enough budget left to run the LIFO task, push it to @@ -745,6 +821,19 @@ impl Context { super::counters::inc_lifo_capped(); } + // Stall detection: end previous poll (even) then start new poll (odd) + #[cfg(feature = "stall-detection")] + { + core.local_generation += 1; // now even = idle + self.worker.handle.shared.worker_metrics[self.worker.index] + .poll_generation + .store(core.local_generation, std::sync::atomic::Ordering::Release); + core.local_generation += 1; // now odd = polling + self.worker.handle.shared.worker_metrics[self.worker.index] + .poll_generation + .store(core.local_generation, std::sync::atomic::Ordering::Release); + } + // Run the LIFO task, then loop *self.core.borrow_mut() = Some(core); let task = self.worker.handle.shared.owned.assert_owner(task); diff --git a/tokio/src/runtime/stall_detection.rs b/tokio/src/runtime/stall_detection.rs new file mode 100644 index 00000000000..efd9aad68fa --- /dev/null +++ b/tokio/src/runtime/stall_detection.rs @@ -0,0 +1,909 @@ +//! Stall detection monitor thread with signal-based stack trace capture. +//! +//! When enabled, a background monitor thread periodically polls the per-worker +//! `poll_generation` counters. If a worker's generation counter remains at the +//! same odd value between two successive checks, the worker is likely stalled +//! in a long-running synchronous call within `Future::poll`. +//! +//! On Linux, the monitor can capture a stack trace from the stalled worker +//! thread by sending a realtime signal and collecting instruction +//! pointers from within the signal handler. +//! +//! The signal handler uses an async-signal-safe frame-pointer walker instead of +//! `backtrace::trace_unsynchronized` (which internally calls `_Unwind_Backtrace` +//! -> `dl_iterate_phdr` -> glibc loader lock, making it NOT async-signal-safe). +//! Address validation is performed by attempting a `write()` to a pre-opened pipe, +//! which returns EFAULT for unreadable memory. Both raw pointer reads and `write()` +//! are POSIX async-signal-safe. + +use crate::runtime::blocking::BlockingPoolSnapshot; +use std::sync::atomic::{AtomicBool, AtomicI32, Ordering}; +use std::sync::{Arc, Mutex}; + +/// Maximum number of stack frames to capture. +const MAX_FRAMES: usize = 64; + +// --- Pipe-based pointer validation --- + +/// File descriptor for the read end of the validation pipe. +static VALIDATE_PIPE_READ: AtomicI32 = AtomicI32::new(-1); +/// File descriptor for the write end of the validation pipe. +static VALIDATE_PIPE_WRITE: AtomicI32 = AtomicI32::new(-1); + +/// Initialize the validation pipe. Call once at monitor startup. +/// +/// The pipe is used by the signal handler to validate pointer readability +/// via `write()`, which is async-signal-safe and returns EFAULT for bad addresses. +#[cfg(target_os = "linux")] +fn init_validate_pipe() { + let mut fds = [0i32; 2]; + // SAFETY: pipe2() with valid pointer and flags is safe. We store the resulting + // fds in atomics for use in the signal handler. + unsafe { + if libc::pipe2(fds.as_mut_ptr(), libc::O_CLOEXEC | libc::O_NONBLOCK) == 0 { + VALIDATE_PIPE_READ.store(fds[0], Ordering::Release); + VALIDATE_PIPE_WRITE.store(fds[1], Ordering::Release); + } + } +} + +/// Drain accumulated data from the validation pipe's read end. +/// +/// Called on the monitor thread before each capture to prevent the pipe buffer +/// from filling up (which would cause `write()` to return EAGAIN in the handler). +#[cfg(target_os = "linux")] +fn drain_validate_pipe() { + let read_fd = VALIDATE_PIPE_READ.load(Ordering::Relaxed); + if read_fd >= 0 { + let mut buf = [0u8; 4096]; + // SAFETY: read_fd is a valid fd from pipe2(), buf is a valid local buffer. + unsafe { while libc::read(read_fd, buf.as_mut_ptr() as *mut _, buf.len()) > 0 {} } + } +} + +/// Close and clean up the validation pipe file descriptors. +#[cfg(target_os = "linux")] +fn cleanup_validate_pipe() { + let read_fd = VALIDATE_PIPE_READ.swap(-1, Ordering::Relaxed); + let write_fd = VALIDATE_PIPE_WRITE.swap(-1, Ordering::Relaxed); + // SAFETY: We only close fds that were successfully opened by pipe2(). + // The swap(-1) ensures each fd is closed at most once. + unsafe { + if read_fd >= 0 { + libc::close(read_fd); + } + if write_fd >= 0 { + libc::close(write_fd); + } + } +} + +/// Check if a pointer is readable. Async-signal-safe. +/// +/// Returns `true` if the memory at `ptr` for `len` bytes is readable. +/// Uses `write()` to a pipe: the kernel returns EFAULT if the source address +/// is not readable, and the pipe has O_NONBLOCK so it never blocks. +#[cfg(target_os = "linux")] +unsafe fn is_readable(ptr: *const u8, len: usize) -> bool { + let write_fd = VALIDATE_PIPE_WRITE.load(Ordering::Relaxed); + if write_fd < 0 { + return false; + } + + // write() returns EFAULT if the source address is not readable. + // If the pipe buffer is full (EAGAIN), we conservatively treat the + // pointer as valid -- the monitor thread drains before each capture. + // SAFETY: write_fd is a valid file descriptor opened by init_validate_pipe(), + // and ptr/len are provided by the caller who ensures they describe a memory + // region we want to probe. If ptr is invalid, write() returns EFAULT rather + // than causing undefined behavior. + let ret = unsafe { libc::write(write_fd, ptr as *const _, len) }; + // ret > 0: wrote successfully, memory is readable + // ret == -1 && errno == EAGAIN: pipe full, assume valid (conservative) + // ret == -1 && errno == EFAULT: memory not readable + if ret > 0 { + return true; + } + if ret == -1 { + // SAFETY: __errno_location() returns a valid pointer to the thread-local + // errno value. This is async-signal-safe. + let errno = unsafe { *libc::__errno_location() }; + if errno == libc::EAGAIN || errno == libc::EWOULDBLOCK { + return true; // conservative: assume readable + } + } + false +} + +// --- Frame-pointer walker --- + +/// Frame pointer layout on the stack. +/// +/// Each frame contains a pointer to the previous frame and the return address. +/// This matches the standard x86_64 and aarch64 frame pointer convention. +#[cfg(target_os = "linux")] +#[repr(C)] +struct StackFrame { + next: *const StackFrame, + return_address: usize, +} + +/// Walk the frame pointer chain starting from the ucontext. +/// +/// Fully async-signal-safe: only raw pointer reads and pipe-based address +/// validation (`write()` syscall). No locks, no heap allocation, no `dl_iterate_phdr`. +/// +/// Requires code to be compiled with frame pointers (`-C force-frame-pointers=yes`), +/// which is the default on x86_64-linux since Rust 1.72. +/// +/// # Safety +/// +/// - `ucontext` must be a valid pointer to a `libc::ucontext_t` (as provided +/// by the kernel to SA_SIGINFO signal handlers). +/// - `frames_ptr` must point to a buffer of at least `max_frames` elements. +#[cfg(target_os = "linux")] +unsafe fn walk_frame_pointers( + ucontext: *mut libc::c_void, + frames_ptr: *mut usize, + max_frames: usize, +) -> usize { + let uc = ucontext as *mut libc::ucontext_t; + if uc.is_null() { + return 0; + } + + let mut idx = 0; + + // First frame: use the instruction pointer from the ucontext. + // This is the IP at the point the signal interrupted the thread. + #[cfg(target_arch = "x86_64")] + // SAFETY: uc is non-null (checked above) and was provided by the kernel + // as a valid ucontext_t pointer to the SA_SIGINFO signal handler. + let (ip, fp) = unsafe { + let mctx = &(*uc).uc_mcontext; + ( + mctx.gregs[libc::REG_RIP as usize] as usize, + mctx.gregs[libc::REG_RBP as usize] as usize, + ) + }; + + #[cfg(target_arch = "aarch64")] + // SAFETY: uc is non-null (checked above) and was provided by the kernel + // as a valid ucontext_t pointer to the SA_SIGINFO signal handler. + let (ip, fp) = unsafe { + let mctx = &(*uc).uc_mcontext; + ( + mctx.pc as usize, + mctx.regs[29] as usize, // x29 = frame pointer on aarch64 + ) + }; + + // On unsupported architectures, bail out. + #[cfg(not(any(target_arch = "x86_64", target_arch = "aarch64")))] + { + let _ = (uc, frames_ptr, max_frames, idx); + return 0; + } + + #[cfg(any(target_arch = "x86_64", target_arch = "aarch64"))] + { + // Store the interrupted instruction pointer as frame 0. + if idx < max_frames { + // SAFETY: frames_ptr has at least max_frames elements (caller invariant), + // and idx < max_frames. + unsafe { *frames_ptr.add(idx) = ip }; + idx += 1; + } + + // Walk the frame pointer chain. + let mut current_fp = fp as *const StackFrame; + + while idx < max_frames { + // Validate the frame pointer: must be non-null, aligned, and readable. + if current_fp.is_null() { + break; + } + if (current_fp as usize) % std::mem::align_of::<*const u8>() != 0 { + break; + } + // SAFETY: is_readable probes the memory via write() to a pipe, + // which is async-signal-safe and will not crash on bad addresses. + if unsafe { !is_readable(current_fp as *const u8, std::mem::size_of::()) } { + break; + } + + // SAFETY: is_readable confirmed the StackFrame at current_fp is readable. + let frame = unsafe { &*current_fp }; + let ret_addr = frame.return_address; + + // A return address of 0 means we've reached the bottom of the stack. + if ret_addr == 0 { + break; + } + + // SAFETY: frames_ptr has at least max_frames elements, and idx < max_frames. + unsafe { *frames_ptr.add(idx) = ret_addr }; + idx += 1; + + // The next frame pointer should be higher on the stack (stack grows down). + // This prevents infinite loops from corrupted frame pointers. + let next_fp = frame.next; + if next_fp <= current_fp { + break; + } + + current_fp = next_fp; + } + + idx + } +} + +/// State for a single in-flight trace capture. +/// +/// Only one capture can be in flight at a time (enforced by the monitor thread, +/// which is the sole caller of `capture_trace`). +/// +/// # Safety protocol +/// +/// - Only the monitor thread writes `ready` (set before sending signal, +/// cleared after reading the result or on timeout). +/// - Only the signal handler writes `frames`, `len`, and `done`. +/// - The monitor reads `frames` and `len` only after observing `done == true`, +/// which establishes a happens-before relationship via Acquire/Release on +/// `done`. +struct CaptureState { + /// Set by monitor before sending signal, cleared after reading result. + ready: AtomicBool, + /// Set by signal handler after capture is complete. + done: AtomicBool, + /// Captured instruction pointers. + frames: std::cell::UnsafeCell<[usize; MAX_FRAMES]>, + /// Number of frames captured. + len: std::cell::UnsafeCell, +} + +// SAFETY: Access is synchronized via the ready/done atomic protocol. +// Only the monitor writes `ready`; only the handler writes `frames`, `len`, `done`. +// The monitor reads `frames` and `len` only after observing `done == true`. +unsafe impl Sync for CaptureState {} +unsafe impl Send for CaptureState {} + +static CAPTURE: CaptureState = CaptureState { + ready: AtomicBool::new(false), + done: AtomicBool::new(false), + frames: std::cell::UnsafeCell::new([0usize; MAX_FRAMES]), + len: std::cell::UnsafeCell::new(0), +}; + +/// Global lock held during stack trace capture to prevent multiple monitor +/// threads from interfering with each other's signal handler communication. +/// This is NOT held by the signal handler itself (which would be unsafe), +/// only by the monitor thread around the ready/signal/wait-for-done sequence. +static CAPTURE_LOCK: Mutex<()> = Mutex::new(()); + +// --- Signal handler infrastructure (Linux only) --- + +#[cfg(target_os = "linux")] +mod signal_impl { + use super::*; + use std::sync::Once; + + /// Dynamically chosen realtime signal number, or -1 if no free signal was found. + static STALL_SIGNAL: AtomicI32 = AtomicI32::new(-1); + static SIGNAL_INIT: Once = Once::new(); + + /// Signal handler invoked on the target worker thread. + /// + /// # Safety + /// + /// This is a signal handler. It is fully async-signal-safe: + /// - Frame pointer walking uses only raw pointer reads + /// - Address validation uses `write()` to a pipe (POSIX async-signal-safe) + /// - No locks, no heap allocation, no `dl_iterate_phdr` + /// - Requires code to be compiled with frame pointers + /// (`-C force-frame-pointers=yes`), which is the default on + /// x86_64-linux since Rust 1.72 + unsafe extern "C" fn stall_signal_handler( + _sig: libc::c_int, + _info: *mut libc::siginfo_t, + ucontext: *mut libc::c_void, + ) { + if !CAPTURE.ready.load(Ordering::Acquire) { + return; + } + + // Save errno before any syscalls. The interrupted thread may be in the + // middle of inspecting errno from a prior syscall; the write()/read() + // calls below would otherwise clobber it. + // SAFETY: __errno_location() returns the thread-local errno pointer and + // is async-signal-safe. + let saved_errno = unsafe { *libc::__errno_location() }; + + let frames_ptr = CAPTURE.frames.get(); + let len_ptr = CAPTURE.len.get(); + + // SAFETY: walk_frame_pointers uses only raw pointer reads and + // pipe-based address validation (write() syscall), both of which + // are async-signal-safe. No locks, no heap allocation, no dl_iterate_phdr. + // + // The raw pointer dereferences of `frames_ptr` and `len_ptr` are safe because + // the atomic protocol guarantees exclusive access: only the signal handler + // writes to these fields, and only one handler runs at a time. + unsafe { + let count = walk_frame_pointers(ucontext, (*frames_ptr).as_mut_ptr(), MAX_FRAMES); + *len_ptr = count; + } + CAPTURE.done.store(true, Ordering::Release); + + // SAFETY: same as the load above. + unsafe { *libc::__errno_location() = saved_errno }; + } + + /// Installs the stall signal handler. Safe to call multiple times; + /// the handler is only installed once. + /// + /// Probes realtime signals from `SIGRTMIN+1` through `SIGRTMAX` and installs + /// our handler on the first signal that has a default or ignored disposition. + /// This avoids overwriting handlers installed by other profiling tools. + pub(super) fn install_signal_handler() { + SIGNAL_INIT.call_once(|| { + // SAFETY: We call sigaction to probe and then install a handler. + // All arguments are valid: zeroed sigaction structs, valid signal numbers. + unsafe { + // Probe for a free realtime signal. + // Skip SIGRTMIN+0 which is often reserved by glibc/pthreads. + let min = libc::SIGRTMIN() + 1; + let max = libc::SIGRTMAX(); + + for sig in min..=max { + let mut old: libc::sigaction = std::mem::zeroed(); + if libc::sigaction(sig, std::ptr::null(), &mut old) != 0 { + continue; + } + + // Check if the signal is free (default or ignored disposition). + let handler = old.sa_sigaction; + if handler == libc::SIG_DFL || handler == libc::SIG_IGN { + // Found a free signal -- install our handler. + let mut sa: libc::sigaction = std::mem::zeroed(); + sa.sa_sigaction = + stall_signal_handler as unsafe extern "C" fn(_, _, _) as usize; + sa.sa_flags = libc::SA_SIGINFO | libc::SA_RESTART; + libc::sigemptyset(&mut sa.sa_mask); + + if libc::sigaction(sig, &sa, std::ptr::null_mut()) == 0 { + STALL_SIGNAL.store(sig, Ordering::Release); + return; + } + } + } + + // No free signal found. + tracing::warn!( + "tokio stall detection: no free realtime signal available, \ + stack traces will be unavailable" + ); + } + }); + } + + /// Captures a stack trace from the thread with the given OS thread ID. + /// + /// Returns instruction pointer addresses, or an empty vec on timeout + /// or if no realtime signal was available. + pub(super) fn capture_trace(os_tid: u64) -> Vec { + let sig = STALL_SIGNAL.load(Ordering::Acquire); + if sig < 0 { + return vec![]; // no signal available + } + + // Hold the capture lock to prevent concurrent monitor threads from + // interfering with each other's signal handler communication. + let _guard = CAPTURE_LOCK.lock().unwrap_or_else(|e| e.into_inner()); + + // Drain the validation pipe before capture to ensure the signal handler's + // write()-based pointer validation won't hit a full pipe buffer. + drain_validate_pipe(); + + // Reset state + CAPTURE.done.store(false, Ordering::Release); + CAPTURE.ready.store(true, Ordering::Release); + + // SAFETY: tgkill with our own pid and a valid tid sends a signal to + // the specified thread. os_tid was obtained from gettid() on the worker. + unsafe { + libc::syscall( + libc::SYS_tgkill, + libc::getpid() as libc::c_long, + os_tid as libc::c_long, + sig as libc::c_long, + ); + } + + // Wait for completion with timeout + let start = std::time::Instant::now(); + while !CAPTURE.done.load(Ordering::Acquire) { + if start.elapsed() > std::time::Duration::from_millis(500) { + CAPTURE.ready.store(false, Ordering::Release); + return vec![]; + } + std::thread::yield_now(); + } + + CAPTURE.ready.store(false, Ordering::Release); + + // SAFETY: We observed `done == true` with Acquire ordering, which + // synchronizes with the handler's Release store, guaranteeing that + // `frames` and `len` are fully written and visible to us. + unsafe { + let len = *CAPTURE.len.get(); + let frames = &*CAPTURE.frames.get(); + frames[..len].to_vec() + } + } +} + +/// Symbolicate a captured trace: resolve instruction pointers to human-readable +/// symbol names, file paths, and line numbers. +#[cfg(target_os = "linux")] +fn symbolicate_trace(ips: &[usize]) -> Vec { + use std::ffi::c_void; + + let mut result = Vec::new(); + for &ip in ips { + let mut resolved = String::new(); + backtrace::resolve(ip as *mut c_void, |symbol| { + if let Some(name) = symbol.name() { + resolved = format!("{name}"); + if let (Some(file), Some(line)) = (symbol.filename(), symbol.lineno()) { + resolved = format!("{name} at {}:{line}", file.display()); + } + } else { + resolved = format!("{ip:#x}"); + } + }); + if resolved.is_empty() { + resolved = format!("{ip:#x}"); + } + result.push(resolved); + } + result +} + +/// Stub for non-Linux platforms. Capture is currently Linux-only so this is +/// usually called with an empty slice; if any IPs are passed (e.g. from a +/// future non-Linux capture path), format them as hex so the callback still +/// gets a usable per-frame entry. +#[cfg(not(target_os = "linux"))] +fn symbolicate_trace(ips: &[usize]) -> Vec { + ips.iter().map(|ip| format!("{ip:#x}")).collect() +} + +/// Reads the kernel stack trace of a thread from procfs. +/// +/// This provides information about what the thread is doing in kernel space +/// (e.g., waiting in a futex, blocked on I/O, etc.), which complements the +/// user-space backtrace. +#[cfg(target_os = "linux")] +fn capture_kernel_stack(os_tid: u64) -> Option { + let pid = std::process::id(); + let path = format!("/proc/{pid}/task/{os_tid}/stack"); + std::fs::read_to_string(&path).ok() +} + + +// --- Per-worker monitor state --- + +#[derive(Clone, Copy, PartialEq)] +enum WorkerState { + Idle, + WaitingForResolution { + stall_start: std::time::Instant, + trace: usize, // index into stored_traces, stored_kernel_stacks, stored_thread_names, stored_blocking + escalated: bool, + }, +} + +/// Information about a detected stall event. +#[derive(Debug)] +pub struct StallInfo { + /// The worker index that stalled. + pub worker: usize, + /// How long the stall lasted (or has lasted so far for intermediate events). + pub duration: std::time::Duration, + /// Whether this is the final report (stall resolved) or an intermediate warning. + pub resolved: bool, + /// User-space stack trace frames (instruction pointers), if captured. + /// Use `backtrace::resolve` to symbolicate. + pub backtrace_frames: Vec, + /// Symbolicated form of `backtrace_frames`, one entry per IP. + /// + /// Populated on Linux when stack traces are available; empty otherwise + /// (non-Linux platforms, or when capture failed). Provided so consumers + /// such as Sentry forwarders can avoid re-symbolicating the same frames + /// the monitor thread already resolved for the log line. + pub symbolicated_frames: Vec, + /// Kernel stack trace, if available. + pub kernel_stack: Option, + /// Name of the stalled worker thread, captured at startup from + /// `std::thread::current().name()`. + /// + /// This is the value configured via `Builder::thread_name(...)` / + /// `Builder::thread_name_fn(...)` (default: `"tokio-runtime-worker"`), + /// not the kernel-truncated `comm`. `None` if the worker had no thread + /// name set. + pub thread_name: Option, +} + +/// Callback type for stall events. +pub(crate) type StallCallback = Arc; + +// --- Monitor configuration --- + +/// Configuration for the stall detection monitor thread. +pub(crate) struct StallDetectionConfig { + /// How often to poll generation counters. + pub(crate) poll_interval: std::time::Duration, + /// How long a stall must persist before emitting an intermediate "still stalled" warning. + pub(crate) escalation_threshold: std::time::Duration, + /// Optional callback for stall events. Stall events are always reported via + /// `tracing` regardless of whether a callback is set. + pub(crate) on_stall: Option, +} + +impl Clone for StallDetectionConfig { + fn clone(&self) -> Self { + Self { + poll_interval: self.poll_interval, + escalation_threshold: self.escalation_threshold, + on_stall: self.on_stall.clone(), + } + } +} + +impl std::fmt::Debug for StallDetectionConfig { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("StallDetectionConfig") + .field("poll_interval", &self.poll_interval) + .field("escalation_threshold", &self.escalation_threshold) + .field("on_stall", &self.on_stall.as_ref().map(|_| "")) + .finish() + } +} + +impl Default for StallDetectionConfig { + fn default() -> Self { + Self { + poll_interval: std::time::Duration::from_millis(100), + escalation_threshold: std::time::Duration::from_secs(10), + on_stall: None, + } + } +} + +// --- Monitor handle for lifecycle management --- + +/// Handle to the stall detection monitor thread. +/// +/// When dropped, signals the monitor to shut down and joins the thread. +pub(crate) struct StallMonitorHandle { + shutdown: Arc, + thread: Option>, +} + +impl std::fmt::Debug for StallMonitorHandle { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("StallMonitorHandle") + .field("shutdown", &self.shutdown.load(Ordering::Relaxed)) + .field("thread", &self.thread.as_ref().map(|t| t.thread().name())) + .finish() + } +} + +impl Drop for StallMonitorHandle { + fn drop(&mut self) { + self.shutdown.store(true, Ordering::Relaxed); + if let Some(thread) = self.thread.take() { + let _ = thread.join(); + } + } +} + +/// Starts the stall detection monitor thread. +/// +/// Returns a `StallMonitorHandle` that will shut down the monitor when dropped. +pub(crate) fn start_monitor( + handle: crate::runtime::Handle, + config: StallDetectionConfig, +) -> StallMonitorHandle { + let shutdown = Arc::new(AtomicBool::new(false)); + let shutdown_clone = shutdown.clone(); + + let thread = std::thread::Builder::new() + .name("tokio-stall-monitor".to_string()) + .spawn(move || { + run_monitor(handle, config, shutdown_clone); + }) + .expect("failed to spawn stall detection monitor thread"); + + StallMonitorHandle { + shutdown, + thread: Some(thread), + } +} + +/// Main loop of the stall detection monitor thread. +fn run_monitor( + handle: crate::runtime::Handle, + config: StallDetectionConfig, + shutdown: Arc, +) { + #[cfg(target_os = "linux")] + { + init_validate_pipe(); + signal_impl::install_signal_handler(); + } + + let num_workers = handle.metrics().num_workers(); + let mut prev_gen = vec![0u64; num_workers]; + let mut worker_states = vec![WorkerState::Idle; num_workers]; + let mut stored_traces: Vec> = Vec::new(); + let mut stored_kernel_stacks: Vec> = Vec::new(); + let mut stored_blocking: Vec = Vec::new(); + let mut stored_thread_names: Vec> = Vec::new(); + + while !shutdown.load(Ordering::Relaxed) { + std::thread::sleep(config.poll_interval); + + let metrics = handle.metrics(); + + for i in 0..num_workers { + let gen = metrics.worker_poll_generation(i); + + match worker_states[i] { + WorkerState::Idle => { + if gen == prev_gen[i] && gen % 2 == 1 { + // Stall detected! Capture trace immediately. + let trace = capture_worker_trace(&metrics, i); + let kernel_stack = capture_worker_kernel_stack(&metrics, i); + let thread_name = capture_worker_thread_name_for(&metrics, i); + let blocking = + handle.inner.blocking_spawner().stall_detection_snapshot(); + let trace_idx = stored_traces.len(); + stored_traces.push(trace); + stored_kernel_stacks.push(kernel_stack); + stored_thread_names.push(thread_name); + stored_blocking.push(blocking); + worker_states[i] = WorkerState::WaitingForResolution { + stall_start: std::time::Instant::now(), + trace: trace_idx, + escalated: false, + }; + } + } + WorkerState::WaitingForResolution { + stall_start, + trace, + ref mut escalated, + } => { + if gen != prev_gen[i] { + // Stall resolved + let duration = stall_start.elapsed(); + emit_resolved( + &config.on_stall, + i, + duration, + &stored_traces[trace], + &stored_kernel_stacks[trace], + &stored_thread_names[trace], + stored_blocking[trace], + ); + worker_states[i] = WorkerState::Idle; + } else if !*escalated && stall_start.elapsed() > config.escalation_threshold { + // Stall is really bad - emit intermediate warning + let duration = stall_start.elapsed(); + emit_escalation( + &config.on_stall, + i, + duration, + &stored_traces[trace], + &stored_kernel_stacks[trace], + &stored_thread_names[trace], + stored_blocking[trace], + ); + *escalated = true; + } + } + } + + prev_gen[i] = gen; + } + } + + #[cfg(target_os = "linux")] + cleanup_validate_pipe(); +} + +/// Attempt to capture a stack trace from the given worker. +/// +/// On Linux, this sends a realtime signal to the worker thread and collects the +/// backtrace from the signal handler. On other platforms, returns an empty vec. +fn capture_worker_trace(metrics: &crate::runtime::RuntimeMetrics, worker: usize) -> Vec { + #[cfg(target_os = "linux")] + { + let os_tid = metrics.worker_os_thread_id(worker); + if os_tid != 0 { + signal_impl::capture_trace(os_tid) + } else { + vec![] + } + } + #[cfg(not(target_os = "linux"))] + { + let _ = (metrics, worker); + vec![] + } +} + +/// Attempt to capture the kernel stack trace from the given worker. +/// +/// On Linux, reads from /proc//task//stack. On other platforms, +/// returns None. +fn capture_worker_kernel_stack( + metrics: &crate::runtime::RuntimeMetrics, + worker: usize, +) -> Option { + #[cfg(target_os = "linux")] + { + let os_tid = metrics.worker_os_thread_id(worker); + if os_tid != 0 { + capture_kernel_stack(os_tid) + } else { + None + } + } + #[cfg(not(target_os = "linux"))] + { + let _ = (metrics, worker); + None + } +} + +/// Look up the worker's thread name as recorded by the worker at startup. +fn capture_worker_thread_name_for( + metrics: &crate::runtime::RuntimeMetrics, + worker: usize, +) -> Option { + metrics.worker_thread_name(worker).map(str::to_owned) +} + +/// Format a symbolicated stack trace and optional kernel stack into a string. +fn format_trace( + trace_ips: &[usize], + symbolicated: &[String], + kernel_stack: &Option, +) -> String { + use std::fmt::Write; + let mut output = String::new(); + + if trace_ips.is_empty() { + writeln!(output, " (no stack trace available)").unwrap(); + } else { + #[cfg(target_os = "linux")] + { + writeln!(output, "User-space stack trace at time of detection:").unwrap(); + for (j, frame) in symbolicated.iter().enumerate() { + writeln!(output, " #{j}: {frame}").unwrap(); + } + } + + #[cfg(not(target_os = "linux"))] + { + let _ = symbolicated; + writeln!(output, " (stack traces not supported on this platform)").unwrap(); + } + } + + match kernel_stack { + Some(ref ks) if !ks.trim().is_empty() => { + writeln!(output, "Kernel stack at time of detection:").unwrap(); + for line in ks.lines() { + if !line.is_empty() { + writeln!(output, " {line}").unwrap(); + } + } + } + _ => { + writeln!( + output, + "Kernel stack: (not available, may require CAP_SYS_PTRACE or root)" + ) + .unwrap(); + } + } + + output +} + +/// Emit a stall-resolved event via tracing and optionally via callback. +fn emit_resolved( + on_stall: &Option, + worker: usize, + duration: std::time::Duration, + trace_ips: &[usize], + kernel_stack: &Option, + thread_name: &Option, + blocking: BlockingPoolSnapshot, +) { + // Always emit via tracing for observability. + let symbolicated = symbolicate_trace(trace_ips); + let trace_str = format_trace(trace_ips, &symbolicated, kernel_stack); + tracing::warn!( + worker = worker, + thread_name = thread_name.as_deref(), + duration_ms = duration.as_millis() as u64, + blocking_threads = blocking.num_threads, + blocking_thread_cap = blocking.thread_cap, + blocking_idle = blocking.num_idle_threads, + blocking_queued = blocking.queue_depth, + "Scheduler stall on worker {} resolved after {:.1}ms\n{}", + worker, + duration.as_secs_f64() * 1000.0, + trace_str, + ); + + // Additionally call the on_stall callback if one is configured. + if let Some(ref cb) = *on_stall { + cb(StallInfo { + worker, + duration, + resolved: true, + backtrace_frames: trace_ips.to_vec(), + symbolicated_frames: symbolicated, + kernel_stack: kernel_stack.clone(), + thread_name: thread_name.clone(), + }); + } +} + +/// Emit an escalation warning via tracing and optionally via callback. +fn emit_escalation( + on_stall: &Option, + worker: usize, + duration: std::time::Duration, + trace_ips: &[usize], + kernel_stack: &Option, + thread_name: &Option, + blocking: BlockingPoolSnapshot, +) { + // Always emit via tracing for observability. + let symbolicated = symbolicate_trace(trace_ips); + let trace_str = format_trace(trace_ips, &symbolicated, kernel_stack); + tracing::error!( + worker = worker, + thread_name = thread_name.as_deref(), + duration_s = duration.as_secs(), + blocking_threads = blocking.num_threads, + blocking_thread_cap = blocking.thread_cap, + blocking_idle = blocking.num_idle_threads, + blocking_queued = blocking.queue_depth, + "Worker {} has been stalled for {:.1}s and counting!\n{}", + worker, + duration.as_secs_f64(), + trace_str, + ); + + // Additionally call the on_stall callback if one is configured. + if let Some(ref cb) = *on_stall { + cb(StallInfo { + worker, + duration, + resolved: false, + backtrace_frames: trace_ips.to_vec(), + symbolicated_frames: symbolicated, + kernel_stack: kernel_stack.clone(), + thread_name: thread_name.clone(), + }); + } +} diff --git a/tokio/src/util/rand.rs b/tokio/src/util/rand.rs index 67c45693c9c..2ba8e0ba279 100644 --- a/tokio/src/util/rand.rs +++ b/tokio/src/util/rand.rs @@ -2,9 +2,7 @@ cfg_rt! { mod rt; pub(crate) use rt::RngSeedGenerator; - cfg_unstable! { - mod rt_unstable; - } + mod rt_unstable; } /// A seed for random number generation. diff --git a/tokio/tests/rt_stall_detection.rs b/tokio/tests/rt_stall_detection.rs new file mode 100644 index 00000000000..e4c4c6ebcf5 --- /dev/null +++ b/tokio/tests/rt_stall_detection.rs @@ -0,0 +1,443 @@ +#![warn(rust_2018_idioms)] +#![cfg(all( + feature = "stall-detection", + feature = "rt-multi-thread", + feature = "macros", + feature = "sync", + not(target_os = "wasi"), + target_has_atomic = "64" +))] + +use tokio::runtime::Runtime; + +#[test] +fn multi_thread_generation_starts_even() { + let rt = multi_thread(); + let metrics = rt.metrics(); + let gen = metrics.worker_poll_generation(0); + assert_eq!(gen % 2, 0, "generation should be even when idle"); +} + +#[test] +fn multi_thread_generation_even_after_work() { + let rt = multi_thread(); + let metrics = rt.metrics(); + + // Spawn actual tasks so the worker's run_task path increments the generation. + rt.block_on(async { + let mut handles = Vec::new(); + for _ in 0..10 { + handles.push(tokio::spawn(async { + tokio::task::yield_now().await; + })); + } + for h in handles { + h.await.unwrap(); + } + }); + + let gen_after = metrics.worker_poll_generation(0); + assert_eq!( + gen_after % 2, + 0, + "generation should be even when idle after work" + ); +} + +#[test] +fn multi_thread_generation_advances() { + let rt = multi_thread(); + let metrics = rt.metrics(); + + let gen_before = metrics.worker_poll_generation(0); + + // Spawn actual tasks so the worker's run_task path increments the generation. + rt.block_on(async { + let mut handles = Vec::new(); + for _ in 0..50 { + handles.push(tokio::spawn(async { + tokio::task::yield_now().await; + })); + } + for h in handles { + h.await.unwrap(); + } + }); + + let gen_after = metrics.worker_poll_generation(0); + assert_eq!(gen_after % 2, 0, "generation should be even when idle"); + assert!( + gen_after > gen_before, + "generation should have advanced: before={gen_before}, after={gen_after}" + ); +} + +#[test] +fn current_thread_generation_starts_even() { + let rt = current_thread(); + let metrics = rt.metrics(); + let gen = metrics.worker_poll_generation(0); + assert_eq!(gen % 2, 0, "generation should be even when idle"); +} + +#[test] +fn current_thread_generation_even_after_work() { + let rt = current_thread(); + let metrics = rt.metrics(); + + // Spawn actual tasks so the scheduler's run_task path increments the generation. + rt.block_on(async { + let mut handles = Vec::new(); + for _ in 0..10 { + handles.push(tokio::spawn(async { + tokio::task::yield_now().await; + })); + } + for h in handles { + h.await.unwrap(); + } + }); + + let gen_after = metrics.worker_poll_generation(0); + assert_eq!( + gen_after % 2, + 0, + "generation should be even when idle after work" + ); +} + +#[test] +fn current_thread_generation_advances() { + let rt = current_thread(); + let metrics = rt.metrics(); + + let gen_before = metrics.worker_poll_generation(0); + + // Spawn actual tasks so the scheduler's run_task path increments the generation. + rt.block_on(async { + let mut handles = Vec::new(); + for _ in 0..50 { + handles.push(tokio::spawn(async { + tokio::task::yield_now().await; + })); + } + for h in handles { + h.await.unwrap(); + } + }); + + let gen_after = metrics.worker_poll_generation(0); + assert_eq!(gen_after % 2, 0, "generation should be even when idle"); + assert!( + gen_after > gen_before, + "generation should have advanced: before={gen_before}, after={gen_after}" + ); +} + +fn multi_thread() -> Runtime { + tokio::runtime::Builder::new_multi_thread() + .worker_threads(1) + .enable_all() + .build() + .unwrap() +} + +fn current_thread() -> Runtime { + tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap() +} + +#[test] +fn multi_thread_enable_stall_detection_builds() { + // Verify that enable_stall_detection() doesn't panic or fail. + let rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(1) + .enable_stall_detection() + .enable_all() + .build() + .unwrap(); + + // The runtime should work normally with stall detection enabled. + rt.block_on(async { + tokio::spawn(async { + tokio::task::yield_now().await; + }) + .await + .unwrap(); + }); +} + +#[test] +fn current_thread_enable_stall_detection_builds() { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_stall_detection() + .enable_all() + .build() + .unwrap(); + + rt.block_on(async { + tokio::spawn(async { + tokio::task::yield_now().await; + }) + .await + .unwrap(); + }); +} + +#[test] +fn monitor_shuts_down_on_runtime_drop() { + // Build a runtime with stall detection, do some work, then drop it. + // The monitor thread should join cleanly (no hang or panic). + let rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(2) + .enable_stall_detection() + .enable_all() + .build() + .unwrap(); + + rt.block_on(async { + let mut handles = Vec::new(); + for _ in 0..20 { + handles.push(tokio::spawn(async { + tokio::task::yield_now().await; + })); + } + for h in handles { + h.await.unwrap(); + } + }); + + // Drop the runtime - the monitor should stop. + drop(rt); +} + +/// Test that the monitor detects a stall when a task blocks the worker. +/// We use std::thread::sleep inside a spawned task to simulate a blocking call. +/// The test verifies the runtime survives the stall detection cycle without panicking. +#[test] +fn monitor_detects_stall() { + use std::sync::{Arc, Barrier}; + + let rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(1) + .enable_stall_detection() + .enable_all() + .build() + .unwrap(); + + let barrier = Arc::new(Barrier::new(2)); + let barrier2 = barrier.clone(); + + // Spawn a task that blocks the worker for 300ms (enough for the 100ms + // poll interval to detect the stall). + rt.block_on(async { + let handle = tokio::spawn(async move { + std::thread::sleep(std::time::Duration::from_millis(300)); + barrier2.wait(); + }); + + // Wait for the blocking task to finish from a non-worker thread. + barrier.wait(); + handle.await.unwrap(); + }); + + // Give the monitor a moment to process the resolved stall. + std::thread::sleep(std::time::Duration::from_millis(200)); + + // The monitor should have detected and reported the stall via tracing. + // We verify the runtime survived the stall detection cycle without panicking. + drop(rt); +} + +/// Test that stall detection works with a custom short poll interval. +/// Uses a 50ms interval and a 500ms blocking sleep so the monitor has +/// multiple chances to observe the stall. +#[test] +fn detects_blocking_stall_with_short_interval() { + let rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(1) + .enable_stall_detection() + .stall_detection_poll_interval(std::time::Duration::from_millis(50)) + .stall_detection_escalation_threshold(std::time::Duration::from_secs(60)) + .enable_all() + .build() + .unwrap(); + + rt.block_on(async { + tokio::spawn(async { + // Simulate a stall - blocking the worker thread for 500ms. + // With a 50ms poll interval the monitor should detect the stall + // after ~100ms (two consecutive polls seeing the same odd generation). + std::thread::sleep(std::time::Duration::from_millis(500)); + }) + .await + .unwrap(); + }); + + // Give the monitor time to notice the stall resolved and emit the message. + std::thread::sleep(std::time::Duration::from_millis(200)); + + // The stall detection output is emitted via tracing. We verify the runtime + // didn't crash and that the generation counter reflects the completed work. + let metrics = rt.metrics(); + let gen = metrics.worker_poll_generation(0); + assert_eq!(gen % 2, 0, "generation should be even after stall resolves"); + assert!(gen > 0, "generation should have advanced"); + + drop(rt); +} + +/// Verify that the runtime shuts down promptly when stall detection is enabled. +/// The monitor thread should exit within one poll interval after shutdown is requested. +#[test] +fn runtime_shuts_down_promptly_with_stall_detection() { + let start = std::time::Instant::now(); + { + let rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(2) + .enable_stall_detection() + .stall_detection_poll_interval(std::time::Duration::from_millis(100)) + .build() + .unwrap(); + + rt.block_on(async { + tokio::task::yield_now().await; + }); + + // rt is dropped here + } + let elapsed = start.elapsed(); + // Should shut down in well under 1 second + assert!( + elapsed < std::time::Duration::from_secs(1), + "Runtime took {:?} to shut down", + elapsed + ); +} + +/// Verify that the runtime shuts down cleanly even when a worker is actively +/// stalled (blocking in std::thread::sleep). The monitor is in its detection loop +/// while shutdown is requested. We use shutdown_timeout to avoid waiting for the +/// blocking task to finish (which is expected tokio behavior for normal drop). +#[test] +fn runtime_shuts_down_during_active_stall() { + let start = std::time::Instant::now(); + { + let rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(2) + .enable_stall_detection() + .stall_detection_poll_interval(std::time::Duration::from_millis(50)) + .enable_all() + .build() + .unwrap(); + + rt.block_on(async { + // Spawn a task that blocks a worker thread for a long time. + tokio::spawn(async { + std::thread::sleep(std::time::Duration::from_secs(30)); + }); + // Give the monitor time to detect the stall. + tokio::time::sleep(std::time::Duration::from_millis(200)).await; + }); + + // Use shutdown_timeout so we don't wait for the blocking task. + // The stall monitor should still be stopped promptly. + rt.shutdown_timeout(std::time::Duration::from_millis(100)); + } + let elapsed = start.elapsed(); + // block_on takes ~200ms, shutdown_timeout allows 100ms for blocking pool, + // plus the monitor needs at most one poll_interval (50ms) to notice shutdown. + // Total should be well under 2 seconds. + assert!( + elapsed < std::time::Duration::from_secs(2), + "Runtime took {:?} to shut down during active stall", + elapsed + ); +} + +/// Verify that shutdown_timeout() also properly stops the monitor thread. +#[test] +fn shutdown_timeout_stops_monitor() { + let start = std::time::Instant::now(); + { + let rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(2) + .enable_stall_detection() + .stall_detection_poll_interval(std::time::Duration::from_millis(50)) + .build() + .unwrap(); + + rt.block_on(async { + tokio::task::yield_now().await; + }); + + rt.shutdown_timeout(std::time::Duration::from_millis(100)); + } + let elapsed = start.elapsed(); + assert!( + elapsed < std::time::Duration::from_secs(1), + "shutdown_timeout took {:?}", + elapsed + ); +} + +/// Verify that the on_stall callback receives a populated `StallInfo`, +/// including the worker thread name configured via `Builder::thread_name`. +/// +/// This in particular guards the multi-thread `fn run()` startup path that +/// records `WorkerMetrics::thread_name`: if a worker reaches its first stall +/// without that path having run, `thread_name` would still be unset. +#[test] +fn on_stall_callback_receives_thread_name() { + use std::sync::{Arc, Mutex}; + + let events: Arc>> = Arc::new(Mutex::new(Vec::new())); + let events_cb = events.clone(); + + let rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(1) + .thread_name("stall-test-worker") + .enable_stall_detection() + .stall_detection_poll_interval(std::time::Duration::from_millis(50)) + .stall_detection_escalation_threshold(std::time::Duration::from_secs(60)) + .on_stall(move |info| { + events_cb.lock().unwrap().push(info); + }) + .enable_all() + .build() + .unwrap(); + + rt.block_on(async { + tokio::spawn(async { + // Block the worker long enough for the 50ms poller to see two + // consecutive identical odd generations. + std::thread::sleep(std::time::Duration::from_millis(500)); + }) + .await + .unwrap(); + }); + + // Give the monitor a moment to observe the resolved stall and run the callback. + std::thread::sleep(std::time::Duration::from_millis(300)); + drop(rt); + + let events = events.lock().unwrap(); + assert!( + !events.is_empty(), + "expected at least one stall event, got none" + ); + + let info = &events[0]; + assert_eq!(info.worker, 0); + assert!(info.duration >= std::time::Duration::from_millis(100)); + assert_eq!( + info.thread_name.as_deref(), + Some("stall-test-worker"), + "thread_name should reflect Builder::thread_name(...)" + ); + // backtrace_frames and symbolicated_frames must be the same length so + // consumers can zip them. + assert_eq!(info.backtrace_frames.len(), info.symbolicated_frames.len()); +}