75 changes: 75 additions & 0 deletions .github/workflows/publish.yml
@@ -0,0 +1,75 @@
# Security notes:
# - GitHub-owned actions use tag pins, third-party actions use SHA pins
# - Only two external actions: actions/checkout (GitHub-owned) and
# jfrog/setup-jfrog-cli (allowlisted, handles OIDC auth)
# - Rust toolchain uses the runner's pre-installed rustup
# - The publish environment requires the anthropic-1.52.3 branch
name: Publish to Artifactory

on:
  push:
    branches:
      - anthropic-1.52.3

permissions:
  contents: read
  id-token: write

jobs:
  publish:
    runs-on: ubuntu-latest
    environment: publish
    steps:
      - uses: actions/checkout@v4

      - name: Setup Rust
        run: |
          rustup default stable
          rustup show

      - name: Setup JFrog CLI
        id: setup-jfrog
        uses: jfrog/setup-jfrog-cli@ff5cb544114ffc152db9cea1cd3d5978d5074946 # v4.5.11
        env:
          JF_URL: https://artifactory.infra.ant.dev
        with:
          oidc-provider-name: github
          oidc-audience: jfrog-github

      - name: Configure Cargo Registry
        run: |
          mkdir -p ~/.cargo
          cat >> ~/.cargo/config.toml << 'EOF'
          [registries.crates-internal]
          index = "sparse+https://artifactory.infra.ant.dev/artifactory/api/cargo/crates-internal/index/"
          credential-provider = ["cargo:token"]
          EOF

      - name: Debug - Verify Access
        env:
          CARGO_REGISTRIES_CRATES_INTERNAL_TOKEN: "Bearer ${{ steps.setup-jfrog.outputs.oidc-token }}"
        run: |
          echo "=== Token check ==="
          echo "Token length: ${#CARGO_REGISTRIES_CRATES_INTERNAL_TOKEN}"
          echo "Token starts with Bearer: $(echo "$CARGO_REGISTRIES_CRATES_INTERNAL_TOKEN" | grep -c '^Bearer ')"

          echo ""
          echo "=== JFrog CLI ping ==="
          jf rt ping

          echo ""
          echo "=== Cargo config ==="
          cat ~/.cargo/config.toml

          echo ""
          echo "=== curl test (with auth) ==="
          curl -sI -H "Authorization: ${CARGO_REGISTRIES_CRATES_INTERNAL_TOKEN}" \
            "https://artifactory.infra.ant.dev/artifactory/api/cargo/crates-internal/index/to/ki/tokio" \
            2>&1 | grep -E "HTTP|WWW-Auth" || true

      - name: Publish tokio to Artifactory
        env:
          CARGO_REGISTRIES_CRATES_INTERNAL_TOKEN: "Bearer ${{ steps.setup-jfrog.outputs.oidc-token }}"
        run: |
          cd tokio
          cargo publish --registry crates-internal --allow-dirty
46 changes: 46 additions & 0 deletions ANTHROPIC.md
@@ -0,0 +1,46 @@
# Anthropic Tokio Fork

This is Anthropic's fork of [tokio](https://github.com/tokio-rs/tokio), published to our internal Artifactory registry, `crates-internal`.

## Version Convention

Versions keep the upstream major.minor and use a fork-owned patch number `P = N * 1000 + upstream_patch`, where `N` is the fork release counter, plus a constant `+anthropic` build-metadata tag:

- `1.49.1000+anthropic` = first fork release based on tokio 1.49.0
- `1.49.2000+anthropic` = second fork release
- `1.49.3001+anthropic` = third fork release, after rebasing onto tokio 1.49.1
- `1.52.7003+anthropic` = seventh fork release, after rebasing onto tokio 1.52.3
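
The convention above can be sketched as a small helper. This is an illustration only: `fork_patch` and `fork_version` are hypothetical names that do not exist in the fork, and the sketch assumes the upstream patch number stays below 1000 so the two fields cannot collide.

```rust
/// Hypothetical helper (not part of the fork): computes the fork-owned
/// patch number P = N * 1000 + upstream_patch.
fn fork_patch(release_counter: u32, upstream_patch: u32) -> u32 {
    // Assumption: upstream patch numbers never reach 1000; otherwise the
    // encoding would be ambiguous.
    assert!(upstream_patch < 1000, "upstream patch must be below 1000");
    release_counter * 1000 + upstream_patch
}

/// Hypothetical helper: formats the full version string, including the
/// constant `+anthropic` build-metadata tag.
fn fork_version(major: u32, minor: u32, release_counter: u32, upstream_patch: u32) -> String {
    let patch = fork_patch(release_counter, upstream_patch);
    format!("{major}.{minor}.{patch}+anthropic")
}
```

For instance, `fork_version(1, 52, 7, 3)` corresponds to the seventh fork release on top of tokio 1.52.3.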

## Features Added

### `stall-detection`

Detects when a tokio worker thread is stalled (blocked in a task poll for too long) and reports diagnostics including stack traces via `tracing`.

```rust
use std::time::Duration;

let rt = tokio::runtime::Builder::new_multi_thread()
    .enable_stall_detection()
    .stall_detection_poll_interval(Duration::from_millis(100))
    .stall_detection_escalation_threshold(Duration::from_secs(10))
    .build()
    .unwrap();
```

## Publishing

Publishing happens automatically when changes are pushed to the `anthropic-1.52.3` branch. The GitHub Actions workflow uses OIDC authentication with Artifactory.

### Prerequisites

1. The `anthropics/tokio` repo must be added to the OIDC config in `anthropics/terraform-config`
2. A `publish` GitHub environment (the name referenced by `environment:` in `.github/workflows/publish.yml`) must be configured in repo settings
3. The `jfrog/setup-jfrog-cli` action must be allowed in repo settings

## Using in the Monorepo

In the workspace `Cargo.toml`:

```toml
[patch.crates-io]
tokio = { version = "1.52.7003+anthropic", registry = "crates-internal" }
```
41 changes: 41 additions & 0 deletions CLAUDE.md
@@ -0,0 +1,41 @@
# Anthropic Tokio Fork - Development Guide

## Version Bumping

Every PR merged to an `anthropic-*` branch MUST bump the patch version in `tokio/Cargo.toml`.

### Files to update

- `tokio/Cargo.toml` - main tokio crate

The other workspace crates (`tokio-macros`, `tokio-stream`, `tokio-test`, `tokio-util`) are not published from this fork; the published `tokio` crate depends on them via crates.io.

### Version format

`<upstream_major>.<upstream_minor>.<P>+anthropic` where `P = N * 1000 + upstream_patch`

`N` is the anthropic release counter and never resets. It increments on every PR, including across rebases onto new upstream patch versions, so the patch number always increases within a given upstream major.minor.

Examples based on upstream 1.49.0:
- `1.49.1000+anthropic` (first anthropic release)
- `1.49.2000+anthropic` (second anthropic release)

After rebasing onto upstream 1.49.1 (N continues):
- `1.49.3001+anthropic`
- `1.49.4001+anthropic`

The `+anthropic` suffix is a semver build metadata tag and does not affect dependency resolution.
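
Since the build metadata carries no ordering information, the release counter and the upstream patch can both be recovered from the numeric patch field. The following is a minimal sketch of that decoding; `parse_fork_version` is a hypothetical helper, not something provided by the fork or by Cargo.

```rust
/// Hypothetical helper: parses `<major>.<minor>.<P>+anthropic` and returns
/// `(major, minor, N, upstream_patch)`, where `P = N * 1000 + upstream_patch`.
/// Returns `None` if the string is not three dot-separated integers.
fn parse_fork_version(version: &str) -> Option<(u32, u32, u32, u32)> {
    // Drop the build metadata (everything after '+'); it does not affect ordering.
    let core = version.split('+').next()?;
    let mut parts = core.split('.');
    let major: u32 = parts.next()?.parse().ok()?;
    let minor: u32 = parts.next()?.parse().ok()?;
    let p: u32 = parts.next()?.parse().ok()?;
    if parts.next().is_some() {
        return None; // more than three numeric components
    }
    // Integer division recovers N; the remainder is the upstream patch.
    Some((major, minor, p / 1000, p % 1000))
}
```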

## Publishing

Publishing to the `crates-internal` Artifactory registry happens automatically via GitHub Actions when changes are pushed to a branch listed in the workflow's `push` trigger (currently only `anthropic-1.52.3`). See `.github/workflows/publish.yml`.

## Stall Detection Feature

The `stall-detection` feature is our primary addition. See `ANTHROPIC.md` for user-facing documentation.

Key implementation files:
- `tokio/src/runtime/stall_detection.rs` - monitor thread, signal handler, frame-pointer walker
- `tokio/src/runtime/scheduler/multi_thread/worker.rs` - generation counter increments
- `tokio/src/runtime/metrics/worker.rs` - WorkerMetrics fields
- `tokio/src/runtime/builder.rs` - builder API methods
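
The file list mentions a monitor thread and generation counter increments. As a rough mental model only, a generation-counter check might look like the sketch below. `WorkerProbe` and `StallTracker` are hypothetical names, not the types used in `stall_detection.rs`, and the sketch ignores details a real monitor has to handle, such as telling an idle (parked) worker apart from one stuck inside a poll.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant};

/// Hypothetical stand-in for the per-worker state that the monitor reads.
struct WorkerProbe {
    generation: AtomicU64,
}

impl WorkerProbe {
    fn new() -> Self {
        Self { generation: AtomicU64::new(0) }
    }

    /// Called by the worker each time it finishes polling a task.
    fn bump(&self) {
        self.generation.fetch_add(1, Ordering::Relaxed);
    }
}

/// Monitor-side bookkeeping for a single worker.
struct StallTracker {
    last_seen: u64,
    unchanged_since: Instant,
}

impl StallTracker {
    fn new(probe: &WorkerProbe, now: Instant) -> Self {
        Self {
            last_seen: probe.generation.load(Ordering::Relaxed),
            unchanged_since: now,
        }
    }

    /// Returns how long the generation has been unchanged once that
    /// duration reaches `threshold`; returns `None` otherwise.
    fn check(&mut self, probe: &WorkerProbe, now: Instant, threshold: Duration) -> Option<Duration> {
        let current = probe.generation.load(Ordering::Relaxed);
        if current != self.last_seen {
            // The worker made progress: restart the clock.
            self.last_seen = current;
            self.unchanged_since = now;
            return None;
        }
        let stalled_for = now.duration_since(self.unchanged_since);
        (stalled_for >= threshold).then_some(stalled_for)
    }
}
```

In this model the monitor would call `check` once per poll interval for each worker and report a stall whenever it returns `Some`.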
5 changes: 5 additions & 0 deletions examples/Cargo.toml
@@ -26,6 +26,7 @@ once_cell = "1.5.2"

[target.'cfg(target_os = "linux")'.dev-dependencies]
libc = "0.2"
tokio = { version = "1.0.0", path = "../tokio", features = ["full", "tracing", "stall-detection"] }

[target.'cfg(all(tokio_unstable, target_os = "linux"))'.dev-dependencies]
tokio = { version = "1.0.0", path = "../tokio", features = ["full", "tracing", "taskdump"] }
@@ -113,5 +114,9 @@ path = "dump.rs"
name = "prewarm-fd-table"
path = "prewarm-fd-table.rs"

[[example]]
name = "stall_detection"
path = "stall_detection.rs"

[lints]
workspace = true
125 changes: 125 additions & 0 deletions examples/stall_detection.rs
@@ -0,0 +1,125 @@
//! Demonstrates tokio's stall detection feature.
//!
//! The stall detection monitor runs a background thread that periodically checks
//! whether any worker thread has been stuck in a single `poll()` call for too
//! long. When a stall is detected, it captures a stack trace (on Linux) and
//! reports the event via tracing and/or a user-provided callback.
//!
//! This example spawns:
//! - A "good" task that cooperates by yielding and using async sleep.
//! - A "bad" task that blocks the worker with `std::thread::sleep`.
//! - A "busy" task that hogs the CPU with a tight loop.
//!
//! Run with:
//!
//! cargo run --example stall_detection
//!
//! Expected output: the tracing subscriber shows WARN-level messages from the
//! stall detection monitor, including user-space and kernel stack traces. The
//! `on_stall` callback silently counts stall events for programmatic use.

#[cfg(target_os = "linux")]
fn main() {
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;
    use std::time::Duration;

    // Set up a tracing subscriber so the monitor's tracing::warn! output is visible.
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::WARN)
        .init();

    // Track how many stall events the callback receives.
    let stall_count = Arc::new(AtomicUsize::new(0));
    let stall_count_cb = stall_count.clone();

    let rt = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(2)
        .enable_all()
        // Enable stall detection with a short poll interval for this demo.
        // In production you would typically use the default (100ms) or tune
        // it to your latency requirements.
        .stall_detection_poll_interval(Duration::from_millis(50))
        // Register a callback that fires whenever a stall is detected or resolved.
        // The tracing output from the monitor is sufficient for display; the callback
        // just counts stalls here to demonstrate programmatic access to stall events.
        .on_stall(move |_info| {
            stall_count_cb.fetch_add(1, Ordering::Relaxed);
        })
        .build()
        .expect("failed to build runtime");

    rt.block_on(async {
        println!("=== Stall Detection Demo ===\n");

        // --- Good task: cooperates with the runtime ---
        let good = tokio::spawn(async {
            println!("[good task] starting -- does async work and yields properly");
            for i in 0..5 {
                // Async sleep yields control back to the runtime.
                tokio::time::sleep(Duration::from_millis(100)).await;
                println!("[good task] iteration {}", i + 1);
            }
            println!("[good task] done");
        });

        // Give the good task a moment to start.
        tokio::time::sleep(Duration::from_millis(50)).await;

        // --- Bad task: blocks the worker with std::thread::sleep ---
        let bad_blocking = tokio::spawn(async {
            println!("[bad task] starting -- about to block the worker for 500ms!");
            // THIS IS THE BUG: using std::thread::sleep inside an async task
            // blocks the entire worker thread. The stall detector will catch this.
            std::thread::sleep(Duration::from_millis(500));
            println!("[bad task] done blocking");
        });

        // Give the bad task a moment to start and get detected.
        tokio::time::sleep(Duration::from_millis(100)).await;

        // --- Busy task: hogs the CPU with a tight loop ---
        let bad_busy = tokio::spawn(async {
            println!("[busy task] starting -- about to spin the CPU for 300ms!");
            // Another common mistake: a CPU-intensive loop that never yields.
            let start = std::time::Instant::now();
            let mut sum: u64 = 0;
            while start.elapsed() < Duration::from_millis(300) {
                sum = sum.wrapping_add(1);
            }
            // Use `sum` to prevent the loop from being optimized away.
            println!("[busy task] done spinning (sum = {})", sum);
        });

        // Wait for all tasks to finish.
        let _ = good.await;
        let _ = bad_blocking.await;
        let _ = bad_busy.await;

        // Let the monitor have a final poll cycle to report resolved stalls.
        tokio::time::sleep(Duration::from_millis(200)).await;

        let total = stall_count.load(Ordering::Relaxed);
        println!("\n=== Demo Complete ===");
        println!("Total stall events reported: {}", total);
        if total > 0 {
            println!(
                "The stall detector successfully caught the blocking operations!\n\
                 In production, use tokio::task::spawn_blocking() for blocking work."
            );
        } else {
            println!(
                "No stalls detected. This can happen if the OS scheduled things \
                 favorably. Try running again or increasing the blocking durations."
            );
        }
    });
}

#[cfg(not(target_os = "linux"))]
fn main() {
    println!(
        "Stall detection is currently supported on Linux only.\n\
         Run this example on a Linux system to see it in action."
    );
}
2 changes: 1 addition & 1 deletion tokio-macros/Cargo.toml
@@ -4,7 +4,7 @@ name = "tokio-macros"
# - Remove path dependencies (if any)
# - Update CHANGELOG.md.
# - Create "tokio-macros-x.y.z" git tag.
-version = "2.7.0"
+version = "2.7.0+anthropic.1"
edition = "2021"
rust-version = "1.71"
authors = ["Tokio Contributors <team@tokio.rs>"]
2 changes: 1 addition & 1 deletion tokio-stream/Cargo.toml
@@ -4,7 +4,7 @@ name = "tokio-stream"
# - Remove path dependencies (if any)
# - Update CHANGELOG.md.
# - Create "tokio-stream-0.1.x" git tag.
-version = "0.1.18"
+version = "0.1.18+anthropic.1"
edition = "2021"
rust-version = "1.71"
authors = ["Tokio Contributors <team@tokio.rs>"]
2 changes: 1 addition & 1 deletion tokio-test/Cargo.toml
@@ -4,7 +4,7 @@ name = "tokio-test"
# - Remove path dependencies (if any)
# - Update CHANGELOG.md.
# - Create "tokio-test-0.4.x" git tag.
-version = "0.4.5"
+version = "0.4.5+anthropic.1"
edition = "2021"
rust-version = "1.71"
authors = ["Tokio Contributors <team@tokio.rs>"]
2 changes: 1 addition & 1 deletion tokio-util/Cargo.toml
@@ -4,7 +4,7 @@ name = "tokio-util"
# - Remove path dependencies (if any)
# - Update CHANGELOG.md.
# - Create "tokio-util-0.7.x" git tag.
-version = "0.7.18"
+version = "0.7.18+anthropic.1"
edition = "2021"
rust-version = "1.71"
authors = ["Tokio Contributors <team@tokio.rs>"]
5 changes: 4 additions & 1 deletion tokio/Cargo.toml
@@ -6,7 +6,7 @@ name = "tokio"
# - README.md
# - Update CHANGELOG.md.
# - Create "v1.x.y" git tag.
-version = "1.52.3"
+version = "1.52.7003+anthropic"
edition = "2021"
rust-version = "1.71"
authors = ["Tokio Contributors <team@tokio.rs>"]
@@ -81,6 +81,7 @@ signal = [
"windows-sys/Win32_Foundation",
"windows-sys/Win32_System_Console",
]
stall-detection = ["rt", "libc", "dep:backtrace", "tracing"]
sync = []
test-util = ["rt", "sync", "time"]
time = []
@@ -98,6 +99,8 @@ pin-project-lite = "0.2.11"
bytes = { version = "1.2.1", optional = true }
mio = { version = "1.2.0", optional = true, default-features = false }
parking_lot = { version = "0.12.0", optional = true }
backtrace = { version = "0.3.58", optional = true }
tracing = { version = "0.1.29", default-features = false, features = ["std"], optional = true }

[target.'cfg(any(not(target_family = "wasm"), all(target_os = "wasi", not(target_env = "p1"))))'.dependencies]
socket2 = { version = "0.6.3", optional = true, features = ["all"] }
10 changes: 10 additions & 0 deletions tokio/src/macros/cfg.rs
@@ -732,6 +732,16 @@ macro_rules! cfg_metrics_variant {
}
}

macro_rules! cfg_stall_detection {
    ($($item:item)*) => {
        $(
            #[cfg(feature = "stall-detection")]
            #[cfg_attr(docsrs, doc(cfg(feature = "stall-detection")))]
            $item
        )*
    }
}

macro_rules! cfg_io_uring {
($($item:item)*) => {
$(
4 changes: 4 additions & 0 deletions tokio/src/runtime/blocking/mod.rs
@@ -6,6 +6,10 @@
mod pool;
pub(crate) use pool::{spawn_blocking, BlockingPool, Spawner};

cfg_stall_detection! {
    pub(crate) use pool::BlockingPoolSnapshot;
}

cfg_fs! {
pub(crate) use pool::spawn_mandatory_blocking;
}