Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 107 additions & 0 deletions .github/workflows/benchmarks.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
name: Benchmarks

on:
push:
branches: [main]
pull_request:
branches: [main]

permissions:
contents: write
pull-requests: write

env:
CARGO_TERM_COLOR: always

jobs:
benchmark:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4

- name: Install Rust
uses: dtolnay/rust-toolchain@stable

- name: Cache cargo
uses: Swatinem/rust-cache@v2

- name: Install critcmp
run: cargo install critcmp

- name: Run benchmarks
run: cargo bench --all-features -- --save-baseline current --output-format bencher 2>/dev/null | tee bench-output.txt

# ── Push to main: update timeline + cache baseline ──

- name: Update benchmark timeline
if: github.event_name == 'push'
uses: benchmark-action/github-action-benchmark@v1
with:
tool: cargo
output-file-path: bench-output.txt
gh-pages-branch: _benchmarks
benchmark-data-dir-path: timeline
auto-push: true
github-token: ${{ secrets.GITHUB_TOKEN }}

- name: Cache baseline for PR comparison
if: github.event_name == 'push'
run: critcmp --export current > baseline.json

- name: Save baseline to cache
if: github.event_name == 'push'
uses: actions/cache/save@v4
with:
path: baseline.json
key: benchmark-baseline-${{ github.sha }}

# ── Pull request: compare against cached baseline ──

- name: Restore baseline
if: github.event_name == 'pull_request'
uses: actions/cache/restore@v4
with:
path: baseline.json
key: benchmark-baseline-${{ github.event.pull_request.base.sha }}
restore-keys: benchmark-baseline-

- name: Compare benchmarks
if: github.event_name == 'pull_request'
id: compare
run: |
if [ ! -f baseline.json ]; then
echo "::warning::No benchmark baseline found. Merge to main to generate one."
exit 0
fi
critcmp --export current > pr.json
echo 'result<<EOF' >> $GITHUB_OUTPUT
critcmp baseline.json pr.json >> $GITHUB_OUTPUT
echo 'EOF' >> $GITHUB_OUTPUT

- name: Find existing comment
if: github.event_name == 'pull_request' && steps.compare.outputs.result
uses: peter-evans/find-comment@v3
id: find_comment
with:
issue-number: ${{ github.event.pull_request.number }}
comment-author: github-actions[bot]
body-includes: "## Benchmark Comparison"

- name: Post or update comment
if: github.event_name == 'pull_request' && steps.compare.outputs.result
uses: peter-evans/create-or-update-comment@v4
with:
issue-number: ${{ github.event.pull_request.number }}
comment-id: ${{ steps.find_comment.outputs.comment-id }}
edit-mode: replace
body: |
## Benchmark Comparison

<details>
<summary>Click to expand</summary>

```
${{ steps.compare.outputs.result }}
```

</details>
20 changes: 20 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,23 @@ criterion = { version = "0.5", features = ["async_tokio"] }
[[bench]]
name = "scheduler"
harness = false

[[bench]]
name = "retry"
harness = false

[[bench]]
name = "dependencies"
harness = false

[[bench]]
name = "tags"
harness = false

[[bench]]
name = "groups"
harness = false

[[bench]]
name = "history"
harness = false
200 changes: 200 additions & 0 deletions benches/dependencies.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
//! Benchmarks for task dependency graph submission and dispatch.
//!
//! Run with: `cargo bench --bench dependencies`

use std::sync::Arc;
use std::time::Duration;

use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
use taskmill::{
Module, Scheduler, SchedulerEvent, TaskContext, TaskError, TaskExecutor, TaskStore,
TaskSubmission,
};
use tokio::runtime::Runtime;
use tokio_util::sync::CancellationToken;

struct NoopExecutor;

impl TaskExecutor for NoopExecutor {
async fn execute<'a>(&'a self, _ctx: &'a TaskContext) -> Result<(), TaskError> {
Ok(())
}
}

async fn build_scheduler(max_concurrency: usize) -> Scheduler {
Scheduler::builder()
.store(TaskStore::open_memory().await.unwrap())
.module(Module::new("bench").executor("test", Arc::new(NoopExecutor)))
.max_concurrency(max_concurrency)
.poll_interval(Duration::from_millis(10))
.build()
.await
.unwrap()
}

// ── Benchmarks ──────────────────────────────────────────────────────

/// Submission cost for a linear dependency chain: t0 → t1 → … → tN.
/// Measures store write cost, edge insertion, and per-submission cycle detection.
fn bench_dep_chain_submit(c: &mut Criterion) {
let rt = Runtime::new().unwrap();
let mut group = c.benchmark_group("dep_chain_submit");

for depth in [10usize, 50, 200] {
group.bench_with_input(BenchmarkId::from_parameter(depth), &depth, |b, &depth| {
b.to_async(&rt).iter(|| async move {
let sched = build_scheduler(8).await;

let first = sched
.submit(&TaskSubmission::new("bench::test").key("d-0"))
.await
.unwrap()
.id()
.unwrap();

let mut prev = first;
for i in 1..depth {
prev = sched
.submit(
&TaskSubmission::new("bench::test")
.key(format!("d-{i}"))
.depends_on(prev),
)
.await
.unwrap()
.id()
.unwrap();
}
});
});
}

group.finish();
}

/// End-to-end dispatch of a linear chain at varying depths.
/// Each task is blocked until its predecessor completes — the critical path
/// is fully sequential regardless of concurrency.
fn bench_dep_chain_dispatch(c: &mut Criterion) {
let rt = Runtime::new().unwrap();
let mut group = c.benchmark_group("dep_chain_dispatch");
group.sample_size(20);

for depth in [10usize, 25, 50] {
group.bench_with_input(BenchmarkId::from_parameter(depth), &depth, |b, &depth| {
b.to_async(&rt).iter(|| async move {
let sched = build_scheduler(8).await;

let first = sched
.submit(&TaskSubmission::new("bench::test").key("c-0"))
.await
.unwrap()
.id()
.unwrap();

let mut prev = first;
let mut last_id = first;
for i in 1..depth {
let id = sched
.submit(
&TaskSubmission::new("bench::test")
.key(format!("c-{i}"))
.depends_on(prev),
)
.await
.unwrap()
.id()
.unwrap();
prev = id;
last_id = id;
}

let mut rx = sched.subscribe();
let token = CancellationToken::new();
let sched_clone = sched.clone();
let token_clone = token.clone();
let handle = tokio::spawn(async move { sched_clone.run(token_clone).await });

// Wait for the final task in the chain to complete.
while let Ok(event) = rx.recv().await {
if let SchedulerEvent::Completed(h) = event {
if h.task_id == last_id {
break;
}
}
}

token.cancel();
let _ = handle.await;
});
});
}

group.finish();
}

/// Fan-in: N independent tasks converging on a single aggregator.
/// Measures the `blocked → pending` transition cost when the last upstream completes.
fn bench_dep_fan_in_dispatch(c: &mut Criterion) {
let rt = Runtime::new().unwrap();
let mut group = c.benchmark_group("dep_fan_in_dispatch");
group.sample_size(20);

for width in [10usize, 50, 100] {
group.bench_with_input(BenchmarkId::from_parameter(width), &width, |b, &width| {
b.to_async(&rt).iter(|| async move {
// Concurrency high enough to run all upstreams in parallel.
let sched = build_scheduler(width + 1).await;

let mut upstream_ids = Vec::with_capacity(width);
for i in 0..width {
let id = sched
.submit(&TaskSubmission::new("bench::test").key(format!("up-{i}")))
.await
.unwrap()
.id()
.unwrap();
upstream_ids.push(id);
}

let aggregator_id = sched
.submit(
&TaskSubmission::new("bench::test")
.key("aggregator")
.depends_on_all(upstream_ids),
)
.await
.unwrap()
.id()
.unwrap();

let mut rx = sched.subscribe();
let token = CancellationToken::new();
let sched_clone = sched.clone();
let token_clone = token.clone();
let handle = tokio::spawn(async move { sched_clone.run(token_clone).await });

while let Ok(event) = rx.recv().await {
if let SchedulerEvent::Completed(h) = event {
if h.task_id == aggregator_id {
break;
}
}
}

token.cancel();
let _ = handle.await;
});
});
}

group.finish();
}

criterion_group!(
benches,
bench_dep_chain_submit,
bench_dep_chain_dispatch,
bench_dep_fan_in_dispatch,
);
criterion_main!(benches);
Loading
Loading