diff --git a/.github/workflows/benchmarks.yml b/.github/workflows/benchmarks.yml index b849948..60a21f9 100644 --- a/.github/workflows/benchmarks.yml +++ b/.github/workflows/benchmarks.yml @@ -25,7 +25,17 @@ jobs: - name: Cache cargo uses: Swatinem/rust-cache@v2 + # critcmp is not a project dependency so Swatinem/rust-cache won't cover it. + # Cache the compiled binary explicitly; bump the key suffix to force a reinstall. + - name: Cache critcmp binary + id: cache-critcmp + uses: actions/cache@v4 + with: + path: ~/.cargo/bin/critcmp + key: critcmp-${{ runner.os }} + - name: Install critcmp + if: steps.cache-critcmp.outputs.cache-hit != 'true' run: cargo install critcmp - name: Build benchmarks diff --git a/Cargo.toml b/Cargo.toml index cf4cf4d..004e644 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,6 +32,7 @@ sysinfo = { version = "0.33", optional = true } [dev-dependencies] tokio = { version = "1", features = ["full", "test-util"] } criterion = { version = "0.5", features = ["async_tokio"] } +pprof = { version = "0.13", features = ["flamegraph", "criterion"] } [[bench]] name = "scheduler" diff --git a/benches/dependencies.rs b/benches/dependencies.rs index 52d2ecb..e0f8bc2 100644 --- a/benches/dependencies.rs +++ b/benches/dependencies.rs @@ -2,9 +2,10 @@ //! //! Run with: `cargo bench --bench dependencies` -use std::time::Duration; +use std::time::{Duration, Instant}; use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion}; +use pprof::criterion::{Output, PProfProfiler}; use taskmill::{ Domain, DomainKey, Scheduler, SchedulerEvent, TaskContext, TaskError, TaskExecutor, TaskStore, TaskSubmission, @@ -46,29 +47,33 @@ fn bench_dep_chain_submit(c: &mut Criterion) { for depth in [10usize, 50, 200] { group.bench_with_input(BenchmarkId::from_parameter(depth), &depth, |b, &depth| { - b.to_async(&rt).iter(|| async move { - let sched = build_scheduler(8).await; - - let first = sched - .submit(&TaskSubmission::new("bench::test").key("d-0")) - .await - .unwrap() - .id() - .unwrap(); - - let mut prev = first; - for i in 1..depth { - prev = sched - .submit( - &TaskSubmission::new("bench::test") - .key(format!("d-{i}")) - .depends_on(prev), - ) + b.to_async(&rt).iter_custom(|iters| async move { + let mut total = Duration::ZERO; + for _ in 0..iters { + let sched = build_scheduler(8).await; + let start = Instant::now(); + let first = sched + .submit(&TaskSubmission::new("bench::test").key("d-0")) .await .unwrap() .id() .unwrap(); + let mut prev = first; + for i in 1..depth { + prev = sched + .submit( + &TaskSubmission::new("bench::test") + .key(format!("d-{i}")) + .depends_on(prev), + ) + .await + .unwrap() + .id() + .unwrap(); + } + total += start.elapsed(); } + total }); }); } @@ -86,50 +91,55 @@ fn bench_dep_chain_dispatch(c: &mut Criterion) { for depth in [10usize, 25, 50] { group.bench_with_input(BenchmarkId::from_parameter(depth), &depth, |b, &depth| { - b.to_async(&rt).iter(|| async move { - let sched = build_scheduler(8).await; - - let first = sched - .submit(&TaskSubmission::new("bench::test").key("c-0")) - .await - .unwrap() - .id() - .unwrap(); - - let mut prev = first; - let mut last_id = first; - for i in 1..depth { - let id = sched - .submit( - &TaskSubmission::new("bench::test") - .key(format!("c-{i}")) - .depends_on(prev), - ) + b.to_async(&rt).iter_custom(|iters| async move { + let mut total = Duration::ZERO; + for _ in 0..iters { + let sched = build_scheduler(8).await; + let start = Instant::now(); + + let first = sched + .submit(&TaskSubmission::new("bench::test").key("c-0")) .await .unwrap() .id() .unwrap(); - prev = id; - last_id = id; - } - let mut rx = sched.subscribe(); - let token = CancellationToken::new(); - let sched_clone = sched.clone(); - let token_clone = token.clone(); - let handle = tokio::spawn(async move { sched_clone.run(token_clone).await }); - - // Wait for the final task in the chain to complete. - while let Ok(event) = rx.recv().await { - if let SchedulerEvent::Completed(h) = event { - if h.task_id == last_id { - break; + let mut prev = first; + let mut last_id = first; + for i in 1..depth { + let id = sched + .submit( + &TaskSubmission::new("bench::test") + .key(format!("c-{i}")) + .depends_on(prev), + ) + .await + .unwrap() + .id() + .unwrap(); + prev = id; + last_id = id; + } + + let mut rx = sched.subscribe(); + let token = CancellationToken::new(); + let sched_clone = sched.clone(); + let token_clone = token.clone(); + let handle = tokio::spawn(async move { sched_clone.run(token_clone).await }); + + while let Ok(event) = rx.recv().await { + if let SchedulerEvent::Completed(h) = event { + if h.task_id == last_id { + break; + } } } - } - token.cancel(); - let _ = handle.await; + total += start.elapsed(); + token.cancel(); + let _ = handle.await; + } + total }); }); } @@ -146,48 +156,54 @@ fn bench_dep_fan_in_dispatch(c: &mut Criterion) { for width in [10usize, 50, 100] { group.bench_with_input(BenchmarkId::from_parameter(width), &width, |b, &width| { - b.to_async(&rt).iter(|| async move { - // Concurrency high enough to run all upstreams in parallel. - let sched = build_scheduler(width + 1).await; - - let mut upstream_ids = Vec::with_capacity(width); - for i in 0..width { - let id = sched - .submit(&TaskSubmission::new("bench::test").key(format!("up-{i}"))) + b.to_async(&rt).iter_custom(|iters| async move { + let mut total = Duration::ZERO; + for _ in 0..iters { + // Concurrency high enough to run all upstreams in parallel. + let sched = build_scheduler(width + 1).await; + let start = Instant::now(); + + let mut upstream_ids = Vec::with_capacity(width); + for i in 0..width { + let id = sched + .submit(&TaskSubmission::new("bench::test").key(format!("up-{i}"))) + .await + .unwrap() + .id() + .unwrap(); + upstream_ids.push(id); + } + + let aggregator_id = sched + .submit( + &TaskSubmission::new("bench::test") + .key("aggregator") + .depends_on_all(upstream_ids), + ) .await .unwrap() .id() .unwrap(); - upstream_ids.push(id); - } - let aggregator_id = sched - .submit( - &TaskSubmission::new("bench::test") - .key("aggregator") - .depends_on_all(upstream_ids), - ) - .await - .unwrap() - .id() - .unwrap(); - - let mut rx = sched.subscribe(); - let token = CancellationToken::new(); - let sched_clone = sched.clone(); - let token_clone = token.clone(); - let handle = tokio::spawn(async move { sched_clone.run(token_clone).await }); - - while let Ok(event) = rx.recv().await { - if let SchedulerEvent::Completed(h) = event { - if h.task_id == aggregator_id { - break; + let mut rx = sched.subscribe(); + let token = CancellationToken::new(); + let sched_clone = sched.clone(); + let token_clone = token.clone(); + let handle = tokio::spawn(async move { sched_clone.run(token_clone).await }); + + while let Ok(event) = rx.recv().await { + if let SchedulerEvent::Completed(h) = event { + if h.task_id == aggregator_id { + break; + } } } - } - token.cancel(); - let _ = handle.await; + total += start.elapsed(); + token.cancel(); + let _ = handle.await; + } + total }); }); } @@ -195,10 +211,15 @@ fn bench_dep_fan_in_dispatch(c: &mut Criterion) { group.finish(); } +criterion_group! { + name = submit_benches; + config = Criterion::default() + .with_profiler(PProfProfiler::new(1000, Output::Flamegraph(None))); + targets = bench_dep_chain_submit +} criterion_group!( - benches, - bench_dep_chain_submit, + dispatch_benches, bench_dep_chain_dispatch, - bench_dep_fan_in_dispatch, + bench_dep_fan_in_dispatch ); -criterion_main!(benches); +criterion_main!(submit_benches, dispatch_benches); diff --git a/benches/groups.rs b/benches/groups.rs index 29be289..fc9eaf1 100644 --- a/benches/groups.rs +++ b/benches/groups.rs @@ -2,7 +2,7 @@ //! //! Run with: `cargo bench --bench groups` -use std::time::Duration; +use std::time::{Duration, Instant}; use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion}; use taskmill::{ @@ -51,24 +51,30 @@ fn bench_dispatch_no_groups(c: &mut Criterion) { let rt = Runtime::new().unwrap(); c.bench_function("dispatch_no_groups_500", |b| { - b.to_async(&rt).iter(|| async { - let sched = Scheduler::builder() - .store(TaskStore::open_memory().await.unwrap()) - .domain(Domain::::new().raw_executor("test", NoopExecutor)) - .max_concurrency(8) - .poll_interval(Duration::from_millis(10)) - .build() - .await - .unwrap(); - - for i in 0..500usize { - sched - .submit(&TaskSubmission::new("bench::test").key(format!("ng-{i}"))) + b.to_async(&rt).iter_custom(|iters| async move { + let mut total = Duration::ZERO; + for _ in 0..iters { + let sched = Scheduler::builder() + .store(TaskStore::open_memory().await.unwrap()) + .domain(Domain::::new().raw_executor("test", NoopExecutor)) + .max_concurrency(8) + .poll_interval(Duration::from_millis(10)) + .build() .await .unwrap(); - } + let start = Instant::now(); + + for i in 0..500usize { + sched + .submit(&TaskSubmission::new("bench::test").key(format!("ng-{i}"))) + .await + .unwrap(); + } - dispatch_all(&sched, 500).await; + dispatch_all(&sched, 500).await; + total += start.elapsed(); + } + total }); }); } @@ -79,29 +85,35 @@ fn bench_dispatch_one_group(c: &mut Criterion) { let rt = Runtime::new().unwrap(); c.bench_function("dispatch_one_group_500", |b| { - b.to_async(&rt).iter(|| async { - let sched = Scheduler::builder() - .store(TaskStore::open_memory().await.unwrap()) - .domain(Domain::::new().raw_executor("test", NoopExecutor)) - .max_concurrency(8) - .group_concurrency("g0", 500) // high limit — no artificial throttling - .poll_interval(Duration::from_millis(10)) - .build() - .await - .unwrap(); - - for i in 0..500usize { - sched - .submit( - &TaskSubmission::new("bench::test") - .key(format!("og-{i}")) - .group("g0"), - ) + b.to_async(&rt).iter_custom(|iters| async move { + let mut total = Duration::ZERO; + for _ in 0..iters { + let sched = Scheduler::builder() + .store(TaskStore::open_memory().await.unwrap()) + .domain(Domain::::new().raw_executor("test", NoopExecutor)) + .max_concurrency(8) + .group_concurrency("g0", 500) // high limit — no artificial throttling + .poll_interval(Duration::from_millis(10)) + .build() .await .unwrap(); + let start = Instant::now(); + + for i in 0..500usize { + sched + .submit( + &TaskSubmission::new("bench::test") + .key(format!("og-{i}")) + .group("g0"), + ) + .await + .unwrap(); + } + + dispatch_all(&sched, 500).await; + total += start.elapsed(); } - - dispatch_all(&sched, 500).await; + total }); }); } @@ -118,37 +130,43 @@ fn bench_dispatch_group_scaling(c: &mut Criterion) { BenchmarkId::from_parameter(n_groups), &n_groups, |b, &n_groups| { - b.to_async(&rt).iter(|| async move { - let tasks_per_group = 500 / n_groups; - let total = tasks_per_group * n_groups; - - let mut builder = Scheduler::builder() - .store(TaskStore::open_memory().await.unwrap()) - .domain(Domain::::new().raw_executor("test", NoopExecutor)) - .max_concurrency(8) - .poll_interval(Duration::from_millis(10)); - - // Register all groups up front so the gate map is fully populated. - for g in 0..n_groups { - builder = builder.group_concurrency(format!("grp-{g}"), 500); - } + b.to_async(&rt).iter_custom(|iters| async move { + let mut total = Duration::ZERO; + for _ in 0..iters { + let tasks_per_group = 500 / n_groups; + let total_tasks = tasks_per_group * n_groups; + + let mut builder = Scheduler::builder() + .store(TaskStore::open_memory().await.unwrap()) + .domain(Domain::::new().raw_executor("test", NoopExecutor)) + .max_concurrency(8) + .poll_interval(Duration::from_millis(10)); + + // Register all groups up front so the gate map is fully populated. + for g in 0..n_groups { + builder = builder.group_concurrency(format!("grp-{g}"), 500); + } - let sched = builder.build().await.unwrap(); - - for g in 0..n_groups { - for t in 0..tasks_per_group { - sched - .submit( - &TaskSubmission::new("bench::test") - .key(format!("mg-{g}-{t}")) - .group(format!("grp-{g}")), - ) - .await - .unwrap(); + let sched = builder.build().await.unwrap(); + let start = Instant::now(); + + for g in 0..n_groups { + for t in 0..tasks_per_group { + sched + .submit( + &TaskSubmission::new("bench::test") + .key(format!("mg-{g}-{t}")) + .group(format!("grp-{g}")), + ) + .await + .unwrap(); + } } - } - dispatch_all(&sched, total).await; + dispatch_all(&sched, total_tasks).await; + total += start.elapsed(); + } + total }); }, ); diff --git a/benches/history.rs b/benches/history.rs index 834b435..3500be0 100644 --- a/benches/history.rs +++ b/benches/history.rs @@ -66,20 +66,25 @@ fn bench_history_query(c: &mut Criterion) { let rt = Runtime::new().unwrap(); let mut group = c.benchmark_group("history_query"); group.sample_size(20); + group.measurement_time(Duration::from_secs(30)); for history_size in [100usize, 1000, 5000] { + let sched = rt.block_on(build_scheduler_with_history(history_size)); + let store = sched.store().clone(); group.bench_with_input( BenchmarkId::from_parameter(history_size), &history_size, - |b, &history_size| { - b.to_async(&rt).iter_custom(|iters| async move { - let sched = build_scheduler_with_history(history_size).await; - let store = sched.store(); - let start = std::time::Instant::now(); - for _ in 0..iters { - let _ = store.history(50, 0).await.unwrap(); + |b, _| { + let store = store.clone(); + b.to_async(&rt).iter_custom(|iters| { + let store = store.clone(); + async move { + let start = std::time::Instant::now(); + for _ in 0..iters { + let _ = store.history(50, 0).await.unwrap(); + } + start.elapsed() } - start.elapsed() }); }, ); @@ -93,20 +98,25 @@ fn bench_history_stats(c: &mut Criterion) { let rt = Runtime::new().unwrap(); let mut group = c.benchmark_group("history_stats"); group.sample_size(20); + group.measurement_time(Duration::from_secs(30)); for history_size in [100usize, 1000, 5000] { + let sched = rt.block_on(build_scheduler_with_history(history_size)); + let store = sched.store().clone(); group.bench_with_input( BenchmarkId::from_parameter(history_size), &history_size, - |b, &history_size| { - b.to_async(&rt).iter_custom(|iters| async move { - let sched = build_scheduler_with_history(history_size).await; - let store = sched.store(); - let start = std::time::Instant::now(); - for _ in 0..iters { - let _ = store.history_stats("bench::test").await.unwrap(); + |b, _| { + let store = store.clone(); + b.to_async(&rt).iter_custom(|iters| { + let store = store.clone(); + async move { + let start = std::time::Instant::now(); + for _ in 0..iters { + let _ = store.history_stats("bench::test").await.unwrap(); + } + start.elapsed() } - start.elapsed() }); }, ); @@ -120,20 +130,25 @@ fn bench_history_by_type(c: &mut Criterion) { let rt = Runtime::new().unwrap(); let mut group = c.benchmark_group("history_by_type"); group.sample_size(20); + group.measurement_time(Duration::from_secs(30)); for history_size in [100usize, 1000, 5000] { + let sched = rt.block_on(build_scheduler_with_history(history_size)); + let store = sched.store().clone(); group.bench_with_input( BenchmarkId::from_parameter(history_size), &history_size, - |b, &history_size| { - b.to_async(&rt).iter_custom(|iters| async move { - let sched = build_scheduler_with_history(history_size).await; - let store = sched.store(); - let start = std::time::Instant::now(); - for _ in 0..iters { - let _ = store.history_by_type("bench::test", 100).await.unwrap(); + |b, _| { + let store = store.clone(); + b.to_async(&rt).iter_custom(|iters| { + let store = store.clone(); + async move { + let start = std::time::Instant::now(); + for _ in 0..iters { + let _ = store.history_by_type("bench::test", 100).await.unwrap(); + } + start.elapsed() } - start.elapsed() }); }, ); diff --git a/benches/retry.rs b/benches/retry.rs index cb713a5..240e521 100644 --- a/benches/retry.rs +++ b/benches/retry.rs @@ -2,7 +2,7 @@ //! //! Run with: `cargo bench --bench retry` -use std::time::Duration; +use std::time::{Duration, Instant}; use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion}; use taskmill::{ @@ -98,42 +98,50 @@ fn bench_dispatch_permanent_failure(c: &mut Criterion) { let rt = Runtime::new().unwrap(); c.bench_function("dispatch_permanent_failure_500", |b| { - b.to_async(&rt).iter(|| async { - let sched = Scheduler::builder() - .store(TaskStore::open_memory().await.unwrap()) - .domain(Domain::::new().raw_executor("fail", FailPermanentExecutor)) - .max_concurrency(8) - .max_retries(0) - .poll_interval(Duration::from_millis(10)) - .build() - .await - .unwrap(); - - for i in 0..500 { - sched - .submit(&TaskSubmission::new("bench::fail").key(format!("pf-{i}"))) + b.to_async(&rt).iter_custom(|iters| async move { + let mut total = Duration::ZERO; + for _ in 0..iters { + let sched = Scheduler::builder() + .store(TaskStore::open_memory().await.unwrap()) + .domain( + Domain::::new().raw_executor("fail", FailPermanentExecutor), + ) + .max_concurrency(8) + .max_retries(0) + .poll_interval(Duration::from_millis(10)) + .build() .await .unwrap(); - } + let start = Instant::now(); - let mut rx = sched.subscribe(); - let token = CancellationToken::new(); - let sched_clone = sched.clone(); - let token_clone = token.clone(); - let handle = tokio::spawn(async move { sched_clone.run(token_clone).await }); - - let mut terminal = 0; - while terminal < 500 { - if let Ok(SchedulerEvent::Failed { - will_retry: false, .. - }) = rx.recv().await - { - terminal += 1; + for i in 0..500 { + sched + .submit(&TaskSubmission::new("bench::fail").key(format!("pf-{i}"))) + .await + .unwrap(); } - } - token.cancel(); - let _ = handle.await; + let mut rx = sched.subscribe(); + let token = CancellationToken::new(); + let sched_clone = sched.clone(); + let token_clone = token.clone(); + let handle = tokio::spawn(async move { sched_clone.run(token_clone).await }); + + let mut terminal = 0; + while terminal < 500 { + if let Ok(SchedulerEvent::Failed { + will_retry: false, .. + }) = rx.recv().await + { + terminal += 1; + } + } + + total += start.elapsed(); + token.cancel(); + let _ = handle.await; + } + total }); }); } @@ -184,44 +192,53 @@ fn bench_dispatch_retryable_dead_letter(c: &mut Criterion) { max_retries: 2, }; group.bench_function(*name, |b| { - b.to_async(&rt).iter(|| { + let policy = policy.clone(); + b.to_async(&rt).iter_custom(move |iters| { let policy = policy.clone(); async move { - let sched = Scheduler::builder() - .store(TaskStore::open_memory().await.unwrap()) - .domain( - Domain::::new() - .raw_executor("fail", FailRetryableExecutor) - .default_retry(policy), - ) - .max_concurrency(8) - .poll_interval(Duration::from_millis(10)) - .build() - .await - .unwrap(); - - for i in 0..100 { - sched - .submit(&TaskSubmission::new("bench::fail").key(format!("rf-{i}"))) + let mut total = Duration::ZERO; + for _ in 0..iters { + let policy = policy.clone(); + let sched = Scheduler::builder() + .store(TaskStore::open_memory().await.unwrap()) + .domain( + Domain::::new() + .raw_executor("fail", FailRetryableExecutor) + .default_retry(policy), + ) + .max_concurrency(8) + .poll_interval(Duration::from_millis(10)) + .build() .await .unwrap(); - } + let start = Instant::now(); - let mut rx = sched.subscribe(); - let token = CancellationToken::new(); - let sched_clone = sched.clone(); - let token_clone = token.clone(); - let handle = tokio::spawn(async move { sched_clone.run(token_clone).await }); + for i in 0..100 { + sched + .submit(&TaskSubmission::new("bench::fail").key(format!("rf-{i}"))) + .await + .unwrap(); + } + + let mut rx = sched.subscribe(); + let token = CancellationToken::new(); + let sched_clone = sched.clone(); + let token_clone = token.clone(); + let handle = + tokio::spawn(async move { sched_clone.run(token_clone).await }); - let mut dead_lettered = 0; - while dead_lettered < 100 { - if let Ok(SchedulerEvent::DeadLettered { .. }) = rx.recv().await { - dead_lettered += 1; + let mut dead_lettered = 0; + while dead_lettered < 100 { + if let Ok(SchedulerEvent::DeadLettered { .. }) = rx.recv().await { + dead_lettered += 1; + } } - } - token.cancel(); - let _ = handle.await; + total += start.elapsed(); + token.cancel(); + let _ = handle.await; + } + total } }); }); diff --git a/benches/scheduler.rs b/benches/scheduler.rs index a93179e..658d6bd 100644 --- a/benches/scheduler.rs +++ b/benches/scheduler.rs @@ -2,6 +2,8 @@ //! //! Run with: `cargo bench -p taskmill` +use std::time::{Duration, Instant}; + use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion}; use taskmill::{ Domain, DomainKey, Priority, Scheduler, SchedulerEvent, TaskContext, TaskError, TaskExecutor, @@ -65,14 +67,20 @@ fn bench_submit(c: &mut Criterion) { let rt = Runtime::new().unwrap(); c.bench_function("submit_1000_tasks", |b| { - b.to_async(&rt).iter(|| async { - let sched = build_scheduler(4).await; - for i in 0..1000 { - sched - .submit(&TaskSubmission::new("bench::test").key(format!("s-{i}"))) - .await - .unwrap(); + b.to_async(&rt).iter_custom(|iters| async move { + let mut total = Duration::ZERO; + for _ in 0..iters { + let sched = build_scheduler(4).await; + let start = Instant::now(); + for i in 0..1000 { + sched + .submit(&TaskSubmission::new("bench::test").key(format!("s-{i}"))) + .await + .unwrap(); + } + total += start.elapsed(); } + total }); }); } @@ -81,20 +89,26 @@ fn bench_submit_dedup_hit(c: &mut Criterion) { let rt = Runtime::new().unwrap(); c.bench_function("submit_dedup_hit_1000", |b| { - b.to_async(&rt).iter(|| async { - let sched = build_scheduler(4).await; - // First submit creates the task. - sched - .submit(&TaskSubmission::new("bench::test").key("same-key")) - .await - .unwrap(); - // Subsequent submits hit the dedup path. - for _ in 0..999 { + b.to_async(&rt).iter_custom(|iters| async move { + let mut total = Duration::ZERO; + for _ in 0..iters { + let sched = build_scheduler(4).await; + // First submit creates the task — not measured. sched .submit(&TaskSubmission::new("bench::test").key("same-key")) .await .unwrap(); + let start = Instant::now(); + // Subsequent submits hit the dedup path. + for _ in 0..999 { + sched + .submit(&TaskSubmission::new("bench::test").key("same-key")) + .await + .unwrap(); + } + total += start.elapsed(); } + total }); }); } @@ -103,33 +117,39 @@ fn bench_dispatch_and_complete(c: &mut Criterion) { let rt = Runtime::new().unwrap(); c.bench_function("dispatch_and_complete_1000", |b| { - b.to_async(&rt).iter(|| async { - let sched = build_scheduler(8).await; - - for i in 0..1000 { - sched - .submit(&TaskSubmission::new("bench::test").key(format!("d-{i}"))) - .await - .unwrap(); - } + b.to_async(&rt).iter_custom(|iters| async move { + let mut total = Duration::ZERO; + for _ in 0..iters { + let sched = build_scheduler(8).await; + let start = Instant::now(); + + for i in 0..1000 { + sched + .submit(&TaskSubmission::new("bench::test").key(format!("d-{i}"))) + .await + .unwrap(); + } - let mut rx = sched.subscribe(); - let token = CancellationToken::new(); - let sched_clone = sched.clone(); - let token_clone = token.clone(); - let handle = tokio::spawn(async move { - sched_clone.run(token_clone).await; - }); + let mut rx = sched.subscribe(); + let token = CancellationToken::new(); + let sched_clone = sched.clone(); + let token_clone = token.clone(); + let handle = tokio::spawn(async move { + sched_clone.run(token_clone).await; + }); - let mut completed = 0; - while completed < 1000 { - if let Ok(SchedulerEvent::Completed { .. }) = rx.recv().await { - completed += 1; + let mut completed = 0; + while completed < 1000 { + if let Ok(SchedulerEvent::Completed { .. }) = rx.recv().await { + completed += 1; + } } - } - token.cancel(); - let _ = handle.await; + total += start.elapsed(); + token.cancel(); + let _ = handle.await; + } + total }); }); } @@ -139,18 +159,26 @@ fn bench_peek_next_varying_depth(c: &mut Criterion) { let mut group = c.benchmark_group("peek_next"); for size in [100, 1000, 5000] { - group.bench_with_input(BenchmarkId::from_parameter(size), &size, |b, &size| { - b.to_async(&rt).iter(|| async move { - let store = TaskStore::open_memory().await.unwrap(); - for i in 0..size { - store - .submit(&TaskSubmission::new("test").key(format!("pk-{i}"))) - .await - .unwrap(); - } - // Bench just the peek_next call. - for _ in 0..100 { - let _ = store.peek_next().await.unwrap(); + let store = rt.block_on(async { + let store = TaskStore::open_memory().await.unwrap(); + for i in 0..size { + store + .submit(&TaskSubmission::new("test").key(format!("pk-{i}"))) + .await + .unwrap(); + } + store + }); + group.bench_with_input(BenchmarkId::from_parameter(size), &size, |b, _| { + let store = store.clone(); + b.to_async(&rt).iter_custom(|iters| { + let store = store.clone(); + async move { + let start = Instant::now(); + for _ in 0..iters { + let _ = store.peek_next().await.unwrap(); + } + start.elapsed() } }); }); @@ -168,33 +196,39 @@ fn bench_concurrency_scaling(c: &mut Criterion) { BenchmarkId::from_parameter(concurrency), &concurrency, |b, &concurrency| { - b.to_async(&rt).iter(|| async move { - let sched = build_scheduler(concurrency).await; - - for i in 0..500 { - sched - .submit(&TaskSubmission::new("bench::test").key(format!("cs-{i}"))) - .await - .unwrap(); - } + b.to_async(&rt).iter_custom(|iters| async move { + let mut total = Duration::ZERO; + for _ in 0..iters { + let sched = build_scheduler(concurrency).await; + let start = Instant::now(); + + for i in 0..500 { + sched + .submit(&TaskSubmission::new("bench::test").key(format!("cs-{i}"))) + .await + .unwrap(); + } - let mut rx = sched.subscribe(); - let token = CancellationToken::new(); - let sched_clone = sched.clone(); - let token_clone = token.clone(); - let handle = tokio::spawn(async move { - sched_clone.run(token_clone).await; - }); - - let mut completed = 0; - while completed < 500 { - if let Ok(SchedulerEvent::Completed { .. }) = rx.recv().await { - completed += 1; + let mut rx = sched.subscribe(); + let token = CancellationToken::new(); + let sched_clone = sched.clone(); + let token_clone = token.clone(); + let handle = tokio::spawn(async move { + sched_clone.run(token_clone).await; + }); + + let mut completed = 0; + while completed < 500 { + if let Ok(SchedulerEvent::Completed { .. }) = rx.recv().await { + completed += 1; + } } - } - token.cancel(); - let _ = handle.await; + total += start.elapsed(); + token.cancel(); + let _ = handle.await; + } + total }); }, ); @@ -207,12 +241,18 @@ fn bench_batch_submit(c: &mut Criterion) { let rt = Runtime::new().unwrap(); c.bench_function("batch_submit_1000", |b| { - b.to_async(&rt).iter(|| async { - let sched = build_scheduler(4).await; - let submissions: Vec<_> = (0..1000) - .map(|i| TaskSubmission::new("bench::test").key(format!("b-{i}"))) - .collect(); - sched.submit_batch(&submissions).await.unwrap(); + b.to_async(&rt).iter_custom(|iters| async move { + let mut total = Duration::ZERO; + for _ in 0..iters { + let sched = build_scheduler(4).await; + let submissions: Vec<_> = (0..1000) + .map(|i| TaskSubmission::new("bench::test").key(format!("b-{i}"))) + .collect(); + let start = Instant::now(); + sched.submit_batch(&submissions).await.unwrap(); + total += start.elapsed(); + } + total }); }); } @@ -221,46 +261,52 @@ fn bench_mixed_priority_dispatch(c: &mut Criterion) { let rt = Runtime::new().unwrap(); c.bench_function("mixed_priority_dispatch_500", |b| { - b.to_async(&rt).iter(|| async { - let sched = build_scheduler(4).await; - - let priorities = [ - Priority::IDLE, - Priority::BACKGROUND, - Priority::NORMAL, - Priority::HIGH, - Priority::REALTIME, - ]; - - for i in 0..500 { - let priority = priorities[i % priorities.len()]; - sched - .submit( - &TaskSubmission::new("bench::test") - .key(format!("mp-{i}")) - .priority(priority), - ) - .await - .unwrap(); - } + b.to_async(&rt).iter_custom(|iters| async move { + let mut total = Duration::ZERO; + for _ in 0..iters { + let sched = build_scheduler(4).await; + let start = Instant::now(); + + let priorities = [ + Priority::IDLE, + Priority::BACKGROUND, + Priority::NORMAL, + Priority::HIGH, + Priority::REALTIME, + ]; + + for i in 0..500 { + let priority = priorities[i % priorities.len()]; + sched + .submit( + &TaskSubmission::new("bench::test") + .key(format!("mp-{i}")) + .priority(priority), + ) + .await + .unwrap(); + } - let mut rx = sched.subscribe(); - let token = CancellationToken::new(); - let sched_clone = sched.clone(); - let token_clone = token.clone(); - let handle = tokio::spawn(async move { - sched_clone.run(token_clone).await; - }); + let mut rx = sched.subscribe(); + let token = CancellationToken::new(); + let sched_clone = sched.clone(); + let token_clone = token.clone(); + let handle = tokio::spawn(async move { + sched_clone.run(token_clone).await; + }); - let mut completed = 0; - while completed < 500 { - if let Ok(SchedulerEvent::Completed { .. }) = rx.recv().await { - completed += 1; + let mut completed = 0; + while completed < 500 { + if let Ok(SchedulerEvent::Completed { .. }) = rx.recv().await { + completed += 1; + } } - } - token.cancel(); - let _ = handle.await; + total += start.elapsed(); + token.cancel(); + let _ = handle.await; + } + total }); }); } @@ -271,79 +317,91 @@ fn bench_byte_progress_overhead(c: &mut Criterion) { // Baseline: NoopExecutor (no byte reporting). group.bench_function("noop_500", |b| { - b.to_async(&rt).iter(|| async { - let sched = build_scheduler(8).await; - - for i in 0..500 { - sched - .submit(&TaskSubmission::new("bench::test").key(format!("bp-noop-{i}"))) - .await - .unwrap(); - } + b.to_async(&rt).iter_custom(|iters| async move { + let mut total = Duration::ZERO; + for _ in 0..iters { + let sched = build_scheduler(8).await; + let start = Instant::now(); + + for i in 0..500 { + sched + .submit(&TaskSubmission::new("bench::test").key(format!("bp-noop-{i}"))) + .await + .unwrap(); + } - let mut rx = sched.subscribe(); - let token = CancellationToken::new(); - let sched_clone = sched.clone(); - let token_clone = token.clone(); - let handle = tokio::spawn(async move { - sched_clone.run(token_clone).await; - }); + let mut rx = sched.subscribe(); + let token = CancellationToken::new(); + let sched_clone = sched.clone(); + let token_clone = token.clone(); + let handle = tokio::spawn(async move { + sched_clone.run(token_clone).await; + }); - let mut completed = 0; - while completed < 500 { - if let Ok(SchedulerEvent::Completed { .. }) = rx.recv().await { - completed += 1; + let mut completed = 0; + while completed < 500 { + if let Ok(SchedulerEvent::Completed { .. }) = rx.recv().await { + completed += 1; + } } - } - token.cancel(); - let _ = handle.await; + total += start.elapsed(); + token.cancel(); + let _ = handle.await; + } + total }); }); // With byte progress reporting: 1MB in 1KB chunks per task. group.bench_function("byte_reporting_500", |b| { - b.to_async(&rt).iter(|| async { - let sched = Scheduler::builder() - .store(TaskStore::open_memory().await.unwrap()) - .domain(Domain::::new().raw_executor( - "byte-test", - ByteProgressExecutor { - total: 1_048_576, - chunk_size: 1024, - }, - )) - .max_concurrency(8) - .poll_interval(std::time::Duration::from_millis(10)) - .progress_interval(std::time::Duration::from_millis(100)) - .build() - .await - .unwrap(); - - for i in 0..500 { - sched - .submit(&TaskSubmission::new("bench::byte-test").key(format!("bp-{i}"))) + b.to_async(&rt).iter_custom(|iters| async move { + let mut total = Duration::ZERO; + for _ in 0..iters { + let sched = Scheduler::builder() + .store(TaskStore::open_memory().await.unwrap()) + .domain(Domain::::new().raw_executor( + "byte-test", + ByteProgressExecutor { + total: 1_048_576, + chunk_size: 1024, + }, + )) + .max_concurrency(8) + .poll_interval(std::time::Duration::from_millis(10)) + .progress_interval(std::time::Duration::from_millis(100)) + .build() .await .unwrap(); - } + let start = Instant::now(); - let mut rx = sched.subscribe(); - let token = CancellationToken::new(); - let sched_clone = sched.clone(); - let token_clone = token.clone(); - let handle = tokio::spawn(async move { - sched_clone.run(token_clone).await; - }); + for i in 0..500 { + sched + .submit(&TaskSubmission::new("bench::byte-test").key(format!("bp-{i}"))) + .await + .unwrap(); + } + + let mut rx = sched.subscribe(); + let token = CancellationToken::new(); + let sched_clone = sched.clone(); + let token_clone = token.clone(); + let handle = tokio::spawn(async move { + sched_clone.run(token_clone).await; + }); - let mut completed = 0; - while completed < 500 { - if let Ok(SchedulerEvent::Completed { .. }) = rx.recv().await { - completed += 1; + let mut completed = 0; + while completed < 500 { + if let Ok(SchedulerEvent::Completed { .. }) = rx.recv().await { + completed += 1; + } } - } - token.cancel(); - let _ = handle.await; + total += start.elapsed(); + token.cancel(); + let _ = handle.await; + } + total }); }); @@ -354,47 +412,53 @@ fn bench_byte_progress_snapshot(c: &mut Criterion) { let rt = Runtime::new().unwrap(); c.bench_function("byte_progress_snapshot_100_tasks", |b| { - b.to_async(&rt).iter(|| async { - let sched = Scheduler::builder() - .store(TaskStore::open_memory().await.unwrap()) - .domain(Domain::::new().raw_executor( - "byte-test", - ByteProgressExecutor { - total: 10_485_760, - chunk_size: 65_536, - }, - )) - .max_concurrency(100) - .poll_interval(std::time::Duration::from_millis(10)) - .build() - .await - .unwrap(); - - // Submit and dispatch 100 tasks. - for i in 0..100 { - sched - .submit(&TaskSubmission::new("bench::byte-test").key(format!("snap-{i}"))) + b.to_async(&rt).iter_custom(|iters| async move { + let mut total = Duration::ZERO; + for _ in 0..iters { + let sched = Scheduler::builder() + .store(TaskStore::open_memory().await.unwrap()) + .domain(Domain::::new().raw_executor( + "byte-test", + ByteProgressExecutor { + total: 10_485_760, + chunk_size: 65_536, + }, + )) + .max_concurrency(100) + .poll_interval(std::time::Duration::from_millis(10)) + .build() .await .unwrap(); - } - let token = CancellationToken::new(); - let sched_clone = sched.clone(); - let token_clone = token.clone(); - let handle = tokio::spawn(async move { - sched_clone.run(token_clone).await; - }); + // Submit and dispatch 100 tasks. + for i in 0..100 { + sched + .submit(&TaskSubmission::new("bench::byte-test").key(format!("snap-{i}"))) + .await + .unwrap(); + } + + let token = CancellationToken::new(); + let sched_clone = sched.clone(); + let token_clone = token.clone(); + let handle = tokio::spawn(async move { + sched_clone.run(token_clone).await; + }); - // Small sleep to let tasks start. - tokio::time::sleep(std::time::Duration::from_millis(5)).await; + // Small sleep to let tasks start — not timed. + tokio::time::sleep(std::time::Duration::from_millis(5)).await; - // Bench the snapshot call with byte_progress. - for _ in 0..100 { - let _ = sched.snapshot().await; - } + let start = Instant::now(); + // Bench the snapshot call with byte_progress. + for _ in 0..100 { + let _ = sched.snapshot().await; + } + total += start.elapsed(); - token.cancel(); - let _ = handle.await; + token.cancel(); + let _ = handle.await; + } + total }); }); } diff --git a/benches/tags.rs b/benches/tags.rs index 26f9f39..6423e96 100644 --- a/benches/tags.rs +++ b/benches/tags.rs @@ -2,7 +2,7 @@ //! //! Run with: `cargo bench --bench tags` -use std::time::Duration; +use std::time::{Duration, Instant}; use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion}; use taskmill::{ @@ -52,23 +52,28 @@ fn bench_submit_with_tags(c: &mut Criterion) { BenchmarkId::from_parameter(tag_count), &tag_count, |b, &tag_count| { - b.to_async(&rt).iter(|| async move { - let sched = Scheduler::builder() - .store(TaskStore::open_memory().await.unwrap()) - .domain(Domain::::new().raw_executor("test", NoopExecutor)) - .max_concurrency(4) - .poll_interval(Duration::from_millis(10)) - .build() - .await - .unwrap(); - - for i in 0..500 { - let mut sub = TaskSubmission::new("bench::test").key(format!("st-{i}")); - for t in 0..tag_count { - sub = sub.tag(format!("key-{t}"), format!("val-{i}-{t}")); + b.to_async(&rt).iter_custom(|iters| async move { + let mut total = Duration::ZERO; + for _ in 0..iters { + let sched = Scheduler::builder() + .store(TaskStore::open_memory().await.unwrap()) + .domain(Domain::::new().raw_executor("test", NoopExecutor)) + .max_concurrency(4) + .poll_interval(Duration::from_millis(10)) + .build() + .await + .unwrap(); + let start = Instant::now(); + for i in 0..500 { + let mut sub = TaskSubmission::new("bench::test").key(format!("st-{i}")); + for t in 0..tag_count { + sub = sub.tag(format!("key-{t}"), format!("val-{i}-{t}")); + } + sched.submit(&sub).await.unwrap(); } - sched.submit(&sub).await.unwrap(); + total += start.elapsed(); } + total }); }, ); @@ -84,20 +89,24 @@ fn bench_query_by_tags(c: &mut Criterion) { let mut group = c.benchmark_group("query_by_tags"); for queue_depth in [100usize, 1000, 5000] { + let store = rt.block_on(store_with_tagged_tasks(queue_depth)); group.bench_with_input( BenchmarkId::from_parameter(queue_depth), &queue_depth, - |b, &queue_depth| { - b.to_async(&rt).iter_custom(|iters| async move { - let store = store_with_tagged_tasks(queue_depth).await; - let start = std::time::Instant::now(); - for _ in 0..iters { - let _ = store - .tasks_by_tags(&[("bucket", "b0")], None) - .await - .unwrap(); + |b, _| { + let store = store.clone(); + b.to_async(&rt).iter_custom(|iters| { + let store = store.clone(); + async move { + let start = Instant::now(); + for _ in 0..iters { + let _ = store + .tasks_by_tags(&[("bucket", "b0")], None) + .await + .unwrap(); + } + start.elapsed() } - start.elapsed() }); }, ); @@ -113,20 +122,24 @@ fn bench_count_by_tags(c: &mut Criterion) { let mut group = c.benchmark_group("count_by_tags"); for queue_depth in [100usize, 1000, 5000] { + let store = rt.block_on(store_with_tagged_tasks(queue_depth)); group.bench_with_input( BenchmarkId::from_parameter(queue_depth), &queue_depth, - |b, &queue_depth| { - b.to_async(&rt).iter_custom(|iters| async move { - let store = store_with_tagged_tasks(queue_depth).await; - let start = std::time::Instant::now(); - for _ in 0..iters { - let _ = store - .count_by_tags(&[("bucket", "b0")], None) - .await - .unwrap(); + |b, _| { + let store = store.clone(); + b.to_async(&rt).iter_custom(|iters| { + let store = store.clone(); + async move { + let start = Instant::now(); + for _ in 0..iters { + let _ = store + .count_by_tags(&[("bucket", "b0")], None) + .await + .unwrap(); + } + start.elapsed() } - start.elapsed() }); }, ); @@ -142,27 +155,34 @@ fn bench_tag_values_scan(c: &mut Criterion) { let mut group = c.benchmark_group("tag_values"); for queue_depth in [100usize, 1000, 5000] { + let store = rt.block_on(async { + let store = TaskStore::open_memory().await.unwrap(); + for i in 0..queue_depth { + store + .submit( + &TaskSubmission::new("bench::test") + .key(format!("tv-{i}")) + .tag("bucket", format!("b{}", i % 10)), + ) + .await + .unwrap(); + } + store + }); group.bench_with_input( BenchmarkId::from_parameter(queue_depth), &queue_depth, - |b, &queue_depth| { - b.to_async(&rt).iter_custom(|iters| async move { - let store = TaskStore::open_memory().await.unwrap(); - for i in 0..queue_depth { - store - .submit( - &TaskSubmission::new("bench::test") - .key(format!("tv-{i}")) - .tag("bucket", format!("b{}", i % 10)), - ) - .await - .unwrap(); - } - let start = std::time::Instant::now(); - for _ in 0..iters { - let _ = store.tag_values("bucket").await.unwrap(); + |b, _| { + let store = store.clone(); + b.to_async(&rt).iter_custom(|iters| { + let store = store.clone(); + async move { + let start = Instant::now(); + for _ in 0..iters { + let _ = store.tag_values("bucket").await.unwrap(); + } + start.elapsed() } - start.elapsed() }); }, ); diff --git a/examples/profile_dep_chain.rs b/examples/profile_dep_chain.rs new file mode 100644 index 0000000..4558879 --- /dev/null +++ b/examples/profile_dep_chain.rs @@ -0,0 +1,109 @@ +/// One-shot timing breakdown of dep_chain_submit to identify where time is spent. +/// Run with: cargo run --release --example profile_dep_chain +use std::time::{Duration, Instant}; + +use taskmill::{ + Domain, DomainKey, Scheduler, TaskContext, TaskError, TaskExecutor, TaskStore, TaskSubmission, +}; + +struct BenchDomain; +impl DomainKey for BenchDomain { + const NAME: &'static str = "bench"; +} + +struct NoopExecutor; +impl TaskExecutor for NoopExecutor { + async fn execute<'a>(&'a self, _ctx: &'a TaskContext) -> Result<(), TaskError> { + Ok(()) + } +} + +async fn build_scheduler() -> Scheduler { + Scheduler::builder() + .store(TaskStore::open_memory().await.unwrap()) + .domain(Domain::::new().raw_executor("test", NoopExecutor)) + .max_concurrency(8) + .poll_interval(Duration::from_millis(10)) + .build() + .await + .unwrap() +} + +async fn run(depth: usize, iters: u32) { + // Warm up the allocator / SQLite page cache with a throw-away run. + let _ = build_scheduler().await; + + let mut build_times = Vec::with_capacity(iters as usize); + let mut first_submit_times = Vec::with_capacity(iters as usize); + let mut chain_submit_times = Vec::with_capacity(iters as usize); + + for _ in 0..iters { + let t0 = Instant::now(); + let sched = build_scheduler().await; + build_times.push(t0.elapsed()); + + let t1 = Instant::now(); + let first = sched + .submit(&TaskSubmission::new("bench::test").key("d-0")) + .await + .unwrap() + .id() + .unwrap(); + first_submit_times.push(t1.elapsed()); + + let t2 = Instant::now(); + let mut prev = first; + for i in 1..depth { + prev = sched + .submit( + &TaskSubmission::new("bench::test") + .key(format!("d-{i}")) + .depends_on(prev), + ) + .await + .unwrap() + .id() + .unwrap(); + } + chain_submit_times.push(t2.elapsed()); + } + + let avg = |v: &[Duration]| -> Duration { v.iter().sum::() / v.len() as u32 }; + let med = |v: &mut Vec| -> Duration { + v.sort(); + v[v.len() / 2] + }; + + let total_avg = avg(&build_times) + avg(&first_submit_times) + avg(&chain_submit_times); + let per_chained_submit_avg = avg(&chain_submit_times) + .checked_div((depth - 1) as u32) + .unwrap_or_default(); + + println!("depth={depth} iters={iters}"); + println!( + " build_scheduler avg={:>8.3?} med={:>8.3?}", + avg(&build_times), + med(&mut build_times) + ); + println!( + " first submit avg={:>8.3?} med={:>8.3?}", + avg(&first_submit_times), + med(&mut first_submit_times) + ); + println!( + " chain submits avg={:>8.3?} med={:>8.3?} ({} calls, {:.3?}/call)", + avg(&chain_submit_times), + med(&mut chain_submit_times), + depth - 1, + per_chained_submit_avg, + ); + println!(" total avg={total_avg:>8.3?}"); + println!(); +} + +#[tokio::main] +async fn main() { + for depth in [10, 50, 200] { + run(depth, 20).await; + } +}