Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions .github/workflows/benchmarks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,17 @@ jobs:
- name: Cache cargo
uses: Swatinem/rust-cache@v2

# critcmp is not a project dependency so Swatinem/rust-cache won't cover it.
# Cache the compiled binary explicitly; bump the key suffix to force a reinstall.
- name: Cache critcmp binary
id: cache-critcmp
uses: actions/cache@v4
with:
path: ~/.cargo/bin/critcmp
key: critcmp-${{ runner.os }}

- name: Install critcmp
if: steps.cache-critcmp.outputs.cache-hit != 'true'
run: cargo install critcmp

- name: Build benchmarks
Expand Down
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ sysinfo = { version = "0.33", optional = true }
[dev-dependencies]
tokio = { version = "1", features = ["full", "test-util"] }
criterion = { version = "0.5", features = ["async_tokio"] }
pprof = { version = "0.13", features = ["flamegraph", "criterion"] }

[[bench]]
name = "scheduler"
Expand Down
207 changes: 114 additions & 93 deletions benches/dependencies.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@
//!
//! Run with: `cargo bench --bench dependencies`

use std::time::Duration;
use std::time::{Duration, Instant};

use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
use pprof::criterion::{Output, PProfProfiler};
use taskmill::{
Domain, DomainKey, Scheduler, SchedulerEvent, TaskContext, TaskError, TaskExecutor, TaskStore,
TaskSubmission,
Expand Down Expand Up @@ -46,29 +47,33 @@ fn bench_dep_chain_submit(c: &mut Criterion) {

for depth in [10usize, 50, 200] {
group.bench_with_input(BenchmarkId::from_parameter(depth), &depth, |b, &depth| {
b.to_async(&rt).iter(|| async move {
let sched = build_scheduler(8).await;

let first = sched
.submit(&TaskSubmission::new("bench::test").key("d-0"))
.await
.unwrap()
.id()
.unwrap();

let mut prev = first;
for i in 1..depth {
prev = sched
.submit(
&TaskSubmission::new("bench::test")
.key(format!("d-{i}"))
.depends_on(prev),
)
b.to_async(&rt).iter_custom(|iters| async move {
let mut total = Duration::ZERO;
for _ in 0..iters {
let sched = build_scheduler(8).await;
let start = Instant::now();
let first = sched
.submit(&TaskSubmission::new("bench::test").key("d-0"))
.await
.unwrap()
.id()
.unwrap();
let mut prev = first;
for i in 1..depth {
prev = sched
.submit(
&TaskSubmission::new("bench::test")
.key(format!("d-{i}"))
.depends_on(prev),
)
.await
.unwrap()
.id()
.unwrap();
}
total += start.elapsed();
}
total
});
});
}
Expand All @@ -86,50 +91,55 @@ fn bench_dep_chain_dispatch(c: &mut Criterion) {

for depth in [10usize, 25, 50] {
group.bench_with_input(BenchmarkId::from_parameter(depth), &depth, |b, &depth| {
b.to_async(&rt).iter(|| async move {
let sched = build_scheduler(8).await;

let first = sched
.submit(&TaskSubmission::new("bench::test").key("c-0"))
.await
.unwrap()
.id()
.unwrap();

let mut prev = first;
let mut last_id = first;
for i in 1..depth {
let id = sched
.submit(
&TaskSubmission::new("bench::test")
.key(format!("c-{i}"))
.depends_on(prev),
)
b.to_async(&rt).iter_custom(|iters| async move {
let mut total = Duration::ZERO;
for _ in 0..iters {
let sched = build_scheduler(8).await;
let start = Instant::now();

let first = sched
.submit(&TaskSubmission::new("bench::test").key("c-0"))
.await
.unwrap()
.id()
.unwrap();
prev = id;
last_id = id;
}

let mut rx = sched.subscribe();
let token = CancellationToken::new();
let sched_clone = sched.clone();
let token_clone = token.clone();
let handle = tokio::spawn(async move { sched_clone.run(token_clone).await });

// Wait for the final task in the chain to complete.
while let Ok(event) = rx.recv().await {
if let SchedulerEvent::Completed(h) = event {
if h.task_id == last_id {
break;
let mut prev = first;
let mut last_id = first;
for i in 1..depth {
let id = sched
.submit(
&TaskSubmission::new("bench::test")
.key(format!("c-{i}"))
.depends_on(prev),
)
.await
.unwrap()
.id()
.unwrap();
prev = id;
last_id = id;
}

let mut rx = sched.subscribe();
let token = CancellationToken::new();
let sched_clone = sched.clone();
let token_clone = token.clone();
let handle = tokio::spawn(async move { sched_clone.run(token_clone).await });

while let Ok(event) = rx.recv().await {
if let SchedulerEvent::Completed(h) = event {
if h.task_id == last_id {
break;
}
}
}
}

token.cancel();
let _ = handle.await;
total += start.elapsed();
token.cancel();
let _ = handle.await;
}
total
});
});
}
Expand All @@ -146,59 +156,70 @@ fn bench_dep_fan_in_dispatch(c: &mut Criterion) {

for width in [10usize, 50, 100] {
group.bench_with_input(BenchmarkId::from_parameter(width), &width, |b, &width| {
b.to_async(&rt).iter(|| async move {
// Concurrency high enough to run all upstreams in parallel.
let sched = build_scheduler(width + 1).await;

let mut upstream_ids = Vec::with_capacity(width);
for i in 0..width {
let id = sched
.submit(&TaskSubmission::new("bench::test").key(format!("up-{i}")))
b.to_async(&rt).iter_custom(|iters| async move {
let mut total = Duration::ZERO;
for _ in 0..iters {
// Concurrency high enough to run all upstreams in parallel.
let sched = build_scheduler(width + 1).await;
let start = Instant::now();

let mut upstream_ids = Vec::with_capacity(width);
for i in 0..width {
let id = sched
.submit(&TaskSubmission::new("bench::test").key(format!("up-{i}")))
.await
.unwrap()
.id()
.unwrap();
upstream_ids.push(id);
}

let aggregator_id = sched
.submit(
&TaskSubmission::new("bench::test")
.key("aggregator")
.depends_on_all(upstream_ids),
)
.await
.unwrap()
.id()
.unwrap();
upstream_ids.push(id);
}

let aggregator_id = sched
.submit(
&TaskSubmission::new("bench::test")
.key("aggregator")
.depends_on_all(upstream_ids),
)
.await
.unwrap()
.id()
.unwrap();

let mut rx = sched.subscribe();
let token = CancellationToken::new();
let sched_clone = sched.clone();
let token_clone = token.clone();
let handle = tokio::spawn(async move { sched_clone.run(token_clone).await });

while let Ok(event) = rx.recv().await {
if let SchedulerEvent::Completed(h) = event {
if h.task_id == aggregator_id {
break;
let mut rx = sched.subscribe();
let token = CancellationToken::new();
let sched_clone = sched.clone();
let token_clone = token.clone();
let handle = tokio::spawn(async move { sched_clone.run(token_clone).await });

while let Ok(event) = rx.recv().await {
if let SchedulerEvent::Completed(h) = event {
if h.task_id == aggregator_id {
break;
}
}
}
}

token.cancel();
let _ = handle.await;
total += start.elapsed();
token.cancel();
let _ = handle.await;
}
total
});
});
}

group.finish();
}

criterion_group! {
name = submit_benches;
config = Criterion::default()
.with_profiler(PProfProfiler::new(1000, Output::Flamegraph(None)));
targets = bench_dep_chain_submit
}
criterion_group!(
benches,
bench_dep_chain_submit,
dispatch_benches,
bench_dep_chain_dispatch,
bench_dep_fan_in_dispatch,
bench_dep_fan_in_dispatch
);
criterion_main!(benches);
criterion_main!(submit_benches, dispatch_benches);
Loading
Loading