From e0d84357440b2889f5f3121540d8ae3f22afda63 Mon Sep 17 00:00:00 2001 From: DJ Majumdar Date: Wed, 18 Mar 2026 03:26:54 -0700 Subject: [PATCH 1/4] bench: add coverage for retry, dependencies, tags, groups, and history MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds five new Criterion benchmark suites to cover gaps identified in the existing happy-path-only bench suite: - benches/retry.rs — backoff delay computation (all 4 strategies) plus E2E permanent-failure and retryable dead-letter paths - benches/dependencies.rs — chain submission cost, sequential dispatch critical path, and fan-in blocked→pending transition - benches/tags.rs — submission overhead by tag count, tasks_by_tags / count_by_tags / tag_values queries at varying depths - benches/groups.rs — gate-check overhead baseline, single-group lookup, and group-map scaling (1–100 groups) - benches/history.rs — paginated history query, aggregate stats, and filter-by-type at 100/1000/5000 history rows All E2E benches use iter_custom where setup must be isolated from the measured portion. History and tag query benches pre-populate the store once per sample before starting the timer. --- Cargo.toml | 20 ++++ benches/dependencies.rs | 200 +++++++++++++++++++++++++++++++++++ benches/groups.rs | 162 ++++++++++++++++++++++++++++ benches/history.rs | 148 ++++++++++++++++++++++++++ benches/retry.rs | 228 ++++++++++++++++++++++++++++++++++++++++ benches/tags.rs | 176 +++++++++++++++++++++++++++++++ 6 files changed, 934 insertions(+) create mode 100644 benches/dependencies.rs create mode 100644 benches/groups.rs create mode 100644 benches/history.rs create mode 100644 benches/retry.rs create mode 100644 benches/tags.rs diff --git a/Cargo.toml b/Cargo.toml index 02417c7..06ebb99 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,3 +33,23 @@ criterion = { version = "0.5", features = ["async_tokio"] } [[bench]] name = "scheduler" harness = false + +[[bench]] +name = "retry" +harness = false + +[[bench]] +name = "dependencies" +harness = false + +[[bench]] +name = "tags" +harness = false + +[[bench]] +name = "groups" +harness = false + +[[bench]] +name = "history" +harness = false diff --git a/benches/dependencies.rs b/benches/dependencies.rs new file mode 100644 index 0000000..57a3df7 --- /dev/null +++ b/benches/dependencies.rs @@ -0,0 +1,200 @@ +//! Benchmarks for task dependency graph submission and dispatch. +//! +//! Run with: `cargo bench --bench dependencies` + +use std::sync::Arc; +use std::time::Duration; + +use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion}; +use taskmill::{ + Module, Scheduler, SchedulerEvent, TaskContext, TaskError, TaskExecutor, TaskStore, + TaskSubmission, +}; +use tokio::runtime::Runtime; +use tokio_util::sync::CancellationToken; + +struct NoopExecutor; + +impl TaskExecutor for NoopExecutor { + async fn execute<'a>(&'a self, _ctx: &'a TaskContext) -> Result<(), TaskError> { + Ok(()) + } +} + +async fn build_scheduler(max_concurrency: usize) -> Scheduler { + Scheduler::builder() + .store(TaskStore::open_memory().await.unwrap()) + .module(Module::new("bench").executor("test", Arc::new(NoopExecutor))) + .max_concurrency(max_concurrency) + .poll_interval(Duration::from_millis(10)) + .build() + .await + .unwrap() +} + +// ── Benchmarks ────────────────────────────────────────────────────── + +/// Submission cost for a linear dependency chain: t0 → t1 → … → tN. +/// Measures store write cost, edge insertion, and per-submission cycle detection. +fn bench_dep_chain_submit(c: &mut Criterion) { + let rt = Runtime::new().unwrap(); + let mut group = c.benchmark_group("dep_chain_submit"); + + for depth in [10usize, 50, 200] { + group.bench_with_input(BenchmarkId::from_parameter(depth), &depth, |b, &depth| { + b.to_async(&rt).iter(|| async move { + let sched = build_scheduler(8).await; + + let first = sched + .submit(&TaskSubmission::new("bench::test").key("d-0")) + .await + .unwrap() + .id() + .unwrap(); + + let mut prev = first; + for i in 1..depth { + prev = sched + .submit( + &TaskSubmission::new("bench::test") + .key(format!("d-{i}")) + .depends_on(prev), + ) + .await + .unwrap() + .id() + .unwrap(); + } + }); + }); + } + + group.finish(); +} + +/// End-to-end dispatch of a linear chain at varying depths. +/// Each task is blocked until its predecessor completes — the critical path +/// is fully sequential regardless of concurrency. +fn bench_dep_chain_dispatch(c: &mut Criterion) { + let rt = Runtime::new().unwrap(); + let mut group = c.benchmark_group("dep_chain_dispatch"); + group.sample_size(20); + + for depth in [10usize, 25, 50] { + group.bench_with_input(BenchmarkId::from_parameter(depth), &depth, |b, &depth| { + b.to_async(&rt).iter(|| async move { + let sched = build_scheduler(8).await; + + let first = sched + .submit(&TaskSubmission::new("bench::test").key("c-0")) + .await + .unwrap() + .id() + .unwrap(); + + let mut prev = first; + let mut last_id = first; + for i in 1..depth { + let id = sched + .submit( + &TaskSubmission::new("bench::test") + .key(format!("c-{i}")) + .depends_on(prev), + ) + .await + .unwrap() + .id() + .unwrap(); + prev = id; + last_id = id; + } + + let mut rx = sched.subscribe(); + let token = CancellationToken::new(); + let sched_clone = sched.clone(); + let token_clone = token.clone(); + let handle = tokio::spawn(async move { sched_clone.run(token_clone).await }); + + // Wait for the final task in the chain to complete. + while let Ok(event) = rx.recv().await { + if let SchedulerEvent::Completed(h) = event { + if h.task_id == last_id { + break; + } + } + } + + token.cancel(); + let _ = handle.await; + }); + }); + } + + group.finish(); +} + +/// Fan-in: N independent tasks converging on a single aggregator. +/// Measures the `blocked → pending` transition cost when the last upstream completes. +fn bench_dep_fan_in_dispatch(c: &mut Criterion) { + let rt = Runtime::new().unwrap(); + let mut group = c.benchmark_group("dep_fan_in_dispatch"); + group.sample_size(20); + + for width in [10usize, 50, 100] { + group.bench_with_input(BenchmarkId::from_parameter(width), &width, |b, &width| { + b.to_async(&rt).iter(|| async move { + // Concurrency high enough to run all upstreams in parallel. + let sched = build_scheduler(width + 1).await; + + let mut upstream_ids = Vec::with_capacity(width); + for i in 0..width { + let id = sched + .submit(&TaskSubmission::new("bench::test").key(format!("up-{i}"))) + .await + .unwrap() + .id() + .unwrap(); + upstream_ids.push(id); + } + + let aggregator_id = sched + .submit( + &TaskSubmission::new("bench::test") + .key("aggregator") + .depends_on_all(upstream_ids), + ) + .await + .unwrap() + .id() + .unwrap(); + + let mut rx = sched.subscribe(); + let token = CancellationToken::new(); + let sched_clone = sched.clone(); + let token_clone = token.clone(); + let handle = tokio::spawn(async move { sched_clone.run(token_clone).await }); + + while let Ok(event) = rx.recv().await { + if let SchedulerEvent::Completed(h) = event { + if h.task_id == aggregator_id { + break; + } + } + } + + token.cancel(); + let _ = handle.await; + }); + }); + } + + group.finish(); +} + +criterion_group!( + benches, + bench_dep_chain_submit, + bench_dep_chain_dispatch, + bench_dep_fan_in_dispatch, +); +criterion_main!(benches); diff --git a/benches/groups.rs b/benches/groups.rs new file mode 100644 index 0000000..c522df6 --- /dev/null +++ b/benches/groups.rs @@ -0,0 +1,162 @@ +//! Benchmarks for per-group concurrency gate checks. +//! +//! Run with: `cargo bench --bench groups` + +use std::sync::Arc; +use std::time::Duration; + +use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion}; +use taskmill::{ + Module, Scheduler, SchedulerEvent, TaskContext, TaskError, TaskExecutor, TaskStore, + TaskSubmission, +}; +use tokio::runtime::Runtime; +use tokio_util::sync::CancellationToken; + +struct NoopExecutor; + +impl TaskExecutor for NoopExecutor { + async fn execute<'a>(&'a self, _ctx: &'a TaskContext) -> Result<(), TaskError> { + Ok(()) + } +} + +async fn dispatch_all(sched: &Scheduler, expected: usize) { + let mut rx = sched.subscribe(); + let token = CancellationToken::new(); + let sched_clone = sched.clone(); + let token_clone = token.clone(); + let handle = tokio::spawn(async move { sched_clone.run(token_clone).await }); + + let mut completed = 0; + while completed < expected { + if let Ok(SchedulerEvent::Completed { .. }) = rx.recv().await { + completed += 1; + } + } + + token.cancel(); + let _ = handle.await; +} + +// ── Benchmarks ────────────────────────────────────────────────────── + +/// Baseline: 500 tasks with no group assignment. +/// The gate skips the group check entirely. +fn bench_dispatch_no_groups(c: &mut Criterion) { + let rt = Runtime::new().unwrap(); + + c.bench_function("dispatch_no_groups_500", |b| { + b.to_async(&rt).iter(|| async { + let sched = Scheduler::builder() + .store(TaskStore::open_memory().await.unwrap()) + .module(Module::new("bench").executor("test", Arc::new(NoopExecutor))) + .max_concurrency(8) + .poll_interval(Duration::from_millis(10)) + .build() + .await + .unwrap(); + + for i in 0..500usize { + sched + .submit(&TaskSubmission::new("bench::test").key(format!("ng-{i}"))) + .await + .unwrap(); + } + + dispatch_all(&sched, 500).await; + }); + }); +} + +/// 500 tasks all in a single group with a high limit (no throttling). +/// The gate performs a group-map lookup on every dispatch. +fn bench_dispatch_one_group(c: &mut Criterion) { + let rt = Runtime::new().unwrap(); + + c.bench_function("dispatch_one_group_500", |b| { + b.to_async(&rt).iter(|| async { + let sched = Scheduler::builder() + .store(TaskStore::open_memory().await.unwrap()) + .module(Module::new("bench").executor("test", Arc::new(NoopExecutor))) + .max_concurrency(8) + .group_concurrency("g0", 500) // high limit — no artificial throttling + .poll_interval(Duration::from_millis(10)) + .build() + .await + .unwrap(); + + for i in 0..500usize { + sched + .submit( + &TaskSubmission::new("bench::test") + .key(format!("og-{i}")) + .group("g0"), + ) + .await + .unwrap(); + } + + dispatch_all(&sched, 500).await; + }); + }); +} + +/// Gate check overhead as the number of tracked groups grows. +/// 500 tasks spread evenly across N groups, each with a high per-group limit. +/// Measures how group-map size affects per-dispatch gate latency. +fn bench_dispatch_group_scaling(c: &mut Criterion) { + let rt = Runtime::new().unwrap(); + let mut group = c.benchmark_group("dispatch_group_scaling"); + + for n_groups in [1usize, 10, 50, 100] { + group.bench_with_input( + BenchmarkId::from_parameter(n_groups), + &n_groups, + |b, &n_groups| { + b.to_async(&rt).iter(|| async move { + let tasks_per_group = 500 / n_groups; + let total = tasks_per_group * n_groups; + + let mut builder = Scheduler::builder() + .store(TaskStore::open_memory().await.unwrap()) + .module(Module::new("bench").executor("test", Arc::new(NoopExecutor))) + .max_concurrency(8) + .poll_interval(Duration::from_millis(10)); + + // Register all groups up front so the gate map is fully populated. + for g in 0..n_groups { + builder = builder.group_concurrency(format!("grp-{g}"), 500); + } + + let sched = builder.build().await.unwrap(); + + for g in 0..n_groups { + for t in 0..tasks_per_group { + sched + .submit( + &TaskSubmission::new("bench::test") + .key(format!("mg-{g}-{t}")) + .group(format!("grp-{g}")), + ) + .await + .unwrap(); + } + } + + dispatch_all(&sched, total).await; + }); + }, + ); + } + + group.finish(); +} + +criterion_group!( + benches, + bench_dispatch_no_groups, + bench_dispatch_one_group, + bench_dispatch_group_scaling, +); +criterion_main!(benches); diff --git a/benches/history.rs b/benches/history.rs new file mode 100644 index 0000000..d9a859d --- /dev/null +++ b/benches/history.rs @@ -0,0 +1,148 @@ +//! Benchmarks for history table queries and aggregate stats. +//! +//! Run with: `cargo bench --bench history` + +use std::sync::Arc; +use std::time::Duration; + +use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion}; +use taskmill::{ + Module, Scheduler, SchedulerEvent, TaskContext, TaskError, TaskExecutor, TaskStore, + TaskSubmission, +}; +use tokio::runtime::Runtime; +use tokio_util::sync::CancellationToken; + +struct NoopExecutor; + +impl TaskExecutor for NoopExecutor { + async fn execute<'a>(&'a self, _ctx: &'a TaskContext) -> Result<(), TaskError> { + Ok(()) + } +} + +/// Build a scheduler and run `n` noop tasks to completion, populating history. +async fn build_scheduler_with_history(n: usize) -> Scheduler { + let sched = Scheduler::builder() + .store(TaskStore::open_memory().await.unwrap()) + .module(Module::new("bench").executor("test", Arc::new(NoopExecutor))) + .max_concurrency(32) + .poll_interval(Duration::from_millis(10)) + .build() + .await + .unwrap(); + + for i in 0..n { + sched + .submit(&TaskSubmission::new("bench::test").key(format!("h-{i}"))) + .await + .unwrap(); + } + + let mut rx = sched.subscribe(); + let token = CancellationToken::new(); + let sched_clone = sched.clone(); + let token_clone = token.clone(); + tokio::spawn(async move { sched_clone.run(token_clone).await }); + + let mut completed = 0; + while completed < n { + if let Ok(SchedulerEvent::Completed { .. }) = rx.recv().await { + completed += 1; + } + } + + token.cancel(); + sched +} + +// ── Benchmarks ────────────────────────────────────────────────────── + +/// Paginated recent-history query at varying history table sizes. +fn bench_history_query(c: &mut Criterion) { + let rt = Runtime::new().unwrap(); + let mut group = c.benchmark_group("history_query"); + group.sample_size(20); + + for history_size in [100usize, 1000, 5000] { + group.bench_with_input( + BenchmarkId::from_parameter(history_size), + &history_size, + |b, &history_size| { + b.to_async(&rt).iter_custom(|iters| async move { + let sched = build_scheduler_with_history(history_size).await; + let store = sched.store(); + let start = std::time::Instant::now(); + for _ in 0..iters { + let _ = store.history(50, 0).await.unwrap(); + } + start.elapsed() + }); + }, + ); + } + + group.finish(); +} + +/// Aggregate stats query (`COUNT`, `AVG` duration and IO) at varying history sizes. +fn bench_history_stats(c: &mut Criterion) { + let rt = Runtime::new().unwrap(); + let mut group = c.benchmark_group("history_stats"); + group.sample_size(20); + + for history_size in [100usize, 1000, 5000] { + group.bench_with_input( + BenchmarkId::from_parameter(history_size), + &history_size, + |b, &history_size| { + b.to_async(&rt).iter_custom(|iters| async move { + let sched = build_scheduler_with_history(history_size).await; + let store = sched.store(); + let start = std::time::Instant::now(); + for _ in 0..iters { + let _ = store.history_stats("bench::test").await.unwrap(); + } + start.elapsed() + }); + }, + ); + } + + group.finish(); +} + +/// Filter-by-type history query at varying history sizes. +fn bench_history_by_type(c: &mut Criterion) { + let rt = Runtime::new().unwrap(); + let mut group = c.benchmark_group("history_by_type"); + group.sample_size(20); + + for history_size in [100usize, 1000, 5000] { + group.bench_with_input( + BenchmarkId::from_parameter(history_size), + &history_size, + |b, &history_size| { + b.to_async(&rt).iter_custom(|iters| async move { + let sched = build_scheduler_with_history(history_size).await; + let store = sched.store(); + let start = std::time::Instant::now(); + for _ in 0..iters { + let _ = store.history_by_type("bench::test", 100).await.unwrap(); + } + start.elapsed() + }); + }, + ); + } + + group.finish(); +} + +criterion_group!( + benches, + bench_history_query, + bench_history_stats, + bench_history_by_type, +); +criterion_main!(benches); diff --git a/benches/retry.rs b/benches/retry.rs new file mode 100644 index 0000000..ef7074e --- /dev/null +++ b/benches/retry.rs @@ -0,0 +1,228 @@ +//! Benchmarks for retry and backoff strategies. +//! +//! Run with: `cargo bench --bench retry` + +use std::sync::Arc; +use std::time::Duration; + +use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion}; +use taskmill::{ + BackoffStrategy, Module, RetryPolicy, Scheduler, SchedulerEvent, TaskContext, TaskError, + TaskExecutor, TaskStore, TaskSubmission, +}; +use tokio::runtime::Runtime; +use tokio_util::sync::CancellationToken; + +// ── Executors ─────────────────────────────────────────────────────── + +struct FailPermanentExecutor; + +impl TaskExecutor for FailPermanentExecutor { + async fn execute<'a>(&'a self, _ctx: &'a TaskContext) -> Result<(), TaskError> { + Err(TaskError::permanent("bench: permanent failure")) + } +} + +struct FailRetryableExecutor; + +impl TaskExecutor for FailRetryableExecutor { + async fn execute<'a>(&'a self, _ctx: &'a TaskContext) -> Result<(), TaskError> { + Err(TaskError::retryable("bench: transient failure")) + } +} + +// ── Benchmarks ────────────────────────────────────────────────────── + +/// Pure math: compute backoff delay for each strategy at 20 consecutive retry counts. +/// No scheduler involved — isolates the computation cost of `delay_for`. +fn bench_backoff_delay_computation(c: &mut Criterion) { + let strategies: &[(&str, BackoffStrategy)] = &[ + ( + "constant", + BackoffStrategy::Constant { + delay: Duration::from_millis(100), + }, + ), + ( + "linear", + BackoffStrategy::Linear { + initial: Duration::from_millis(100), + increment: Duration::from_millis(50), + max: Duration::from_secs(30), + }, + ), + ( + "exponential", + BackoffStrategy::Exponential { + initial: Duration::from_millis(100), + max: Duration::from_secs(60), + multiplier: 2.0, + }, + ), + ( + "exponential_jitter", + BackoffStrategy::ExponentialJitter { + initial: Duration::from_millis(100), + max: Duration::from_secs(60), + multiplier: 2.0, + }, + ), + ]; + + let mut group = c.benchmark_group("backoff_delay"); + for (name, strategy) in strategies { + group.bench_with_input( + BenchmarkId::from_parameter(name), + strategy, + |b, strategy| { + b.iter(|| { + for retry in 0..20i32 { + black_box(strategy.delay_for(black_box(retry))); + } + }); + }, + ); + } + group.finish(); +} + +/// E2E: permanent (non-retryable) failure path. +/// 500 tasks each fail immediately with a permanent error and move to history. +fn bench_dispatch_permanent_failure(c: &mut Criterion) { + let rt = Runtime::new().unwrap(); + + c.bench_function("dispatch_permanent_failure_500", |b| { + b.to_async(&rt).iter(|| async { + let sched = Scheduler::builder() + .store(TaskStore::open_memory().await.unwrap()) + .module(Module::new("bench").executor("fail", Arc::new(FailPermanentExecutor))) + .max_concurrency(8) + .max_retries(0) + .poll_interval(Duration::from_millis(10)) + .build() + .await + .unwrap(); + + for i in 0..500 { + sched + .submit(&TaskSubmission::new("bench::fail").key(format!("pf-{i}"))) + .await + .unwrap(); + } + + let mut rx = sched.subscribe(); + let token = CancellationToken::new(); + let sched_clone = sched.clone(); + let token_clone = token.clone(); + let handle = tokio::spawn(async move { sched_clone.run(token_clone).await }); + + let mut terminal = 0; + while terminal < 500 { + if let Ok(SchedulerEvent::Failed { will_retry: false, .. }) = rx.recv().await { + terminal += 1; + } + } + + token.cancel(); + let _ = handle.await; + }); + }); +} + +/// E2E: retryable failure path across all 4 backoff strategies. +/// 100 tasks × (2 retries + 1 initial attempt) = 300 executor calls before dead-lettering. +/// All strategies use zero delay so the bench does not wait on timers. +fn bench_dispatch_retryable_dead_letter(c: &mut Criterion) { + let rt = Runtime::new().unwrap(); + let mut group = c.benchmark_group("retryable_dead_letter"); + + let strategies: &[(&str, BackoffStrategy)] = &[ + ("constant", BackoffStrategy::Constant { delay: Duration::ZERO }), + ( + "linear", + BackoffStrategy::Linear { + initial: Duration::ZERO, + increment: Duration::ZERO, + max: Duration::ZERO, + }, + ), + ( + "exponential", + BackoffStrategy::Exponential { + initial: Duration::ZERO, + max: Duration::ZERO, + multiplier: 2.0, + }, + ), + ( + "exponential_jitter", + BackoffStrategy::ExponentialJitter { + initial: Duration::ZERO, + max: Duration::ZERO, + multiplier: 2.0, + }, + ), + ]; + + for (name, strategy) in strategies { + let policy = RetryPolicy { + strategy: strategy.clone(), + max_retries: 2, + }; + group.bench_function(*name, |b| { + b.to_async(&rt).iter(|| { + let policy = policy.clone(); + async move { + let sched = Scheduler::builder() + .store(TaskStore::open_memory().await.unwrap()) + .module( + Module::new("bench").executor_with_retry_policy( + "fail", + Arc::new(FailRetryableExecutor), + policy, + ), + ) + .max_concurrency(8) + .poll_interval(Duration::from_millis(10)) + .build() + .await + .unwrap(); + + for i in 0..100 { + sched + .submit(&TaskSubmission::new("bench::fail").key(format!("rf-{i}"))) + .await + .unwrap(); + } + + let mut rx = sched.subscribe(); + let token = CancellationToken::new(); + let sched_clone = sched.clone(); + let token_clone = token.clone(); + let handle = + tokio::spawn(async move { sched_clone.run(token_clone).await }); + + let mut dead_lettered = 0; + while dead_lettered < 100 { + if let Ok(SchedulerEvent::DeadLettered { .. }) = rx.recv().await { + dead_lettered += 1; + } + } + + token.cancel(); + let _ = handle.await; + } + }); + }); + } + + group.finish(); +} + +criterion_group!( + benches, + bench_backoff_delay_computation, + bench_dispatch_permanent_failure, + bench_dispatch_retryable_dead_letter, +); +criterion_main!(benches); diff --git a/benches/tags.rs b/benches/tags.rs new file mode 100644 index 0000000..3e66d98 --- /dev/null +++ b/benches/tags.rs @@ -0,0 +1,176 @@ +//! Benchmarks for task metadata tags: submission cost and query performance. +//! +//! Run with: `cargo bench --bench tags` + +use std::sync::Arc; +use std::time::Duration; + +use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion}; +use taskmill::{Module, Scheduler, TaskContext, TaskError, TaskExecutor, TaskStore, TaskSubmission}; +use tokio::runtime::Runtime; + +struct NoopExecutor; + +impl TaskExecutor for NoopExecutor { + async fn execute<'a>(&'a self, _ctx: &'a TaskContext) -> Result<(), TaskError> { + Ok(()) + } +} + +/// Pre-populate a store with `n` tasks, each carrying `("bucket", "b0")`. +async fn store_with_tagged_tasks(n: usize) -> TaskStore { + let store = TaskStore::open_memory().await.unwrap(); + for i in 0..n { + store + .submit( + &TaskSubmission::new("bench::test") + .key(format!("tq-{i}")) + .tag("bucket", "b0"), + ) + .await + .unwrap(); + } + store +} + +// ── Benchmarks ────────────────────────────────────────────────────── + +/// Submission cost at varying tag counts per task. +/// Measures the write overhead of tag insertion and validation. +fn bench_submit_with_tags(c: &mut Criterion) { + let rt = Runtime::new().unwrap(); + let mut group = c.benchmark_group("submit_with_tags"); + + for tag_count in [0usize, 5, 10, 20] { + group.bench_with_input( + BenchmarkId::from_parameter(tag_count), + &tag_count, + |b, &tag_count| { + b.to_async(&rt).iter(|| async move { + let sched = Scheduler::builder() + .store(TaskStore::open_memory().await.unwrap()) + .module(Module::new("bench").executor("test", Arc::new(NoopExecutor))) + .max_concurrency(4) + .poll_interval(Duration::from_millis(10)) + .build() + .await + .unwrap(); + + for i in 0..500 { + let mut sub = + TaskSubmission::new("bench::test").key(format!("st-{i}")); + for t in 0..tag_count { + sub = sub.tag(format!("key-{t}"), format!("val-{i}-{t}")); + } + sched.submit(&sub).await.unwrap(); + } + }); + }, + ); + } + + group.finish(); +} + +/// `tasks_by_tags` with a single filter at varying queue depths. +/// All tasks match the filter, so result size equals queue depth. +fn bench_query_by_tags(c: &mut Criterion) { + let rt = Runtime::new().unwrap(); + let mut group = c.benchmark_group("query_by_tags"); + + for queue_depth in [100usize, 1000, 5000] { + group.bench_with_input( + BenchmarkId::from_parameter(queue_depth), + &queue_depth, + |b, &queue_depth| { + b.to_async(&rt).iter_custom(|iters| async move { + let store = store_with_tagged_tasks(queue_depth).await; + let start = std::time::Instant::now(); + for _ in 0..iters { + let _ = store + .tasks_by_tags(&[("bucket", "b0")], None) + .await + .unwrap(); + } + start.elapsed() + }); + }, + ); + } + + group.finish(); +} + +/// `count_by_tags` with a single filter at varying queue depths. +/// Aggregation query — does not fetch rows, only counts. +fn bench_count_by_tags(c: &mut Criterion) { + let rt = Runtime::new().unwrap(); + let mut group = c.benchmark_group("count_by_tags"); + + for queue_depth in [100usize, 1000, 5000] { + group.bench_with_input( + BenchmarkId::from_parameter(queue_depth), + &queue_depth, + |b, &queue_depth| { + b.to_async(&rt).iter_custom(|iters| async move { + let store = store_with_tagged_tasks(queue_depth).await; + let start = std::time::Instant::now(); + for _ in 0..iters { + let _ = store + .count_by_tags(&[("bucket", "b0")], None) + .await + .unwrap(); + } + start.elapsed() + }); + }, + ); + } + + group.finish(); +} + +/// `tag_values` distinct-value scan at varying queue depths. +/// Tasks are spread across 10 bucket values to exercise the GROUP BY path. +fn bench_tag_values_scan(c: &mut Criterion) { + let rt = Runtime::new().unwrap(); + let mut group = c.benchmark_group("tag_values"); + + for queue_depth in [100usize, 1000, 5000] { + group.bench_with_input( + BenchmarkId::from_parameter(queue_depth), + &queue_depth, + |b, &queue_depth| { + b.to_async(&rt).iter_custom(|iters| async move { + let store = TaskStore::open_memory().await.unwrap(); + for i in 0..queue_depth { + store + .submit( + &TaskSubmission::new("bench::test") + .key(format!("tv-{i}")) + .tag("bucket", format!("b{}", i % 10)), + ) + .await + .unwrap(); + } + let start = std::time::Instant::now(); + for _ in 0..iters { + let _ = store.tag_values("bucket").await.unwrap(); + } + start.elapsed() + }); + }, + ); + } + + group.finish(); +} + +criterion_group!( + benches, + bench_submit_with_tags, + bench_query_by_tags, + bench_count_by_tags, + bench_tag_values_scan, +); +criterion_main!(benches); From c477df8dd5e8e6ae54f113bc0e3d58c1775cf8e6 Mon Sep 17 00:00:00 2001 From: DJ Majumdar Date: Wed, 18 Mar 2026 03:36:43 -0700 Subject: [PATCH 2/4] docs: add multi-module guides and improve API documentation Add two new guides (multi-module-apps.md, library-modules.md) and update existing docs to address gaps in cross-module documentation: qualified task type names, cross-module dependencies, module pause/resume, scheduler.modules()/active_tasks(), cancel_all() semantics, DuplicateStrategy scoping, module concurrency gating, and new glossary terms. Improve panic messages on Scheduler::module() and ctx.module(). --- docs/design.md | 15 ++ docs/glossary.md | 4 + docs/io-and-backpressure.md | 7 +- docs/library-modules.md | 347 ++++++++++++++++++++++++ docs/migrating-to-0.4.md | 2 +- docs/multi-module-apps.md | 426 ++++++++++++++++++++++++++++++ docs/persistence-and-recovery.md | 4 +- docs/priorities-and-preemption.md | 49 ++++ docs/query-apis.md | 57 +++- docs/quick-start.md | 28 +- src/module.rs | 16 +- src/registry/context.rs | 2 +- src/scheduler/mod.rs | 2 +- 13 files changed, 947 insertions(+), 12 deletions(-) create mode 100644 docs/library-modules.md create mode 100644 docs/multi-module-apps.md diff --git a/docs/design.md b/docs/design.md index 99fbb53..4edad11 100644 --- a/docs/design.md +++ b/docs/design.md @@ -127,6 +127,21 @@ Taskmill is designed for concurrent access from multiple async tasks and Tauri c Each spawned task gets its own `CancellationToken`. All trait objects require `Send + Sync + 'static`. +### Module concurrency gating + +Per-module concurrency is enforced in the dispatch gate alongside the global concurrency check. Each module has two atomic counters: + +| Component | Type | Where | Purpose | +|-----------|------|-------|---------| +| `module_caps` | `RwLock>` | `SchedulerInner` | Per-module concurrency cap. Initialized from `Module::max_concurrency` at build time. Updated at runtime by `ModuleHandle::set_max_concurrency`. | +| `module_running` | `Arc>` | `SchedulerInner` | Live count of running tasks per module. Incremented when a task is dispatched; decremented on every terminal transition (complete, fail, cancel, pause). Shared with spawned tasks via `Arc`. | + +A task blocked on its *module* concurrency limit does **not** block dispatch for other modules — the scheduler moves on to the next candidate in the priority queue. The dispatch gate checks are AND-gates: both the global `max_concurrency` and the per-module cap must have headroom. + +The module is identified at dispatch time by extracting the prefix from the qualified task type (e.g., `"media"` from `"media::thumbnail"`). + +Per-module pause uses a separate `HashMap` (`module_paused`). When a module is paused, the dispatch loop skips candidates whose task type matches that module's prefix. + ## Extension points Taskmill is designed to be extended without forking: diff --git a/docs/glossary.md b/docs/glossary.md index f078ce1..fea454e 100644 --- a/docs/glossary.md +++ b/docs/glossary.md @@ -30,3 +30,7 @@ Quick reference for terms used throughout the taskmill documentation. | **Typed task** | A struct that implements the `TypedTask` trait, giving you compile-time type safety for task payloads, priorities, and IO budgets instead of stringly-typed submissions. Register with `Module::typed_executor::()` and submit with `handle.submit_typed(&task)`. See [Quick Start](quick-start.md#typed-tasks). | | **SubmitBuilder** | The fluent builder returned by `ModuleHandle::submit()` and `ModuleHandle::submit_typed()`. Implements `IntoFuture` so bare `.await` applies all defaults; chain override methods (`.priority()`, `.run_after()`, `.parent()`, etc.) before `.await` to override individual fields for that call only. | | **Module-scoped state** | Application state registered on a `Module` via `.app_state()`, visible only to executors within that module. `TaskContext::state::()` checks module state first and falls back to global state registered on `SchedulerBuilder`. See [Configuration](configuration.md#application-state). | +| **Qualified task type** | The full database-stored task type including the module prefix, e.g. `"media::thumbnail"`. Required when using store-level query APIs (`history_stats`, `task_lookup`, `avg_throughput`). `ModuleHandle` methods apply the prefix automatically, so you typically only need the short form when submitting tasks. | +| **Cross-module dependency** | A dependency edge where the dependent task and its prerequisite belong to different modules. Functionally identical to same-module dependencies — the module boundary does not affect dependency resolution or failure propagation. See [Multi-Module Applications](multi-module-apps.md#cross-module-task-dependencies). | +| **Module starvation** | A condition where one module's tasks are never dispatched because higher-priority tasks from other modules continuously consume available concurrency slots. Priority ordering is global across all modules. Mitigated by assigning appropriate priorities and using group concurrency to reserve capacity. See [Multi-Module Applications](multi-module-apps.md#module-starvation-understanding-priority-competition). | +| **Late-binding state** | Application state injected into the scheduler after `build()` via `scheduler.register_state()`. Useful for library crates that receive a pre-built `Scheduler` as a dependency. Must be called before `scheduler.run()` — calling it after tasks are dispatching has no ordering guarantees with in-flight executors. | diff --git a/docs/io-and-backpressure.md b/docs/io-and-backpressure.md index 1e12f79..e7887d8 100644 --- a/docs/io-and-backpressure.md +++ b/docs/io-and-backpressure.md @@ -63,11 +63,12 @@ Use store queries to see how your tasks actually perform and refine future estim ```rust let store = scheduler.store(); -// Average throughput for a task type (from recent completions) -let (avg_read_bps, avg_write_bps) = store.avg_throughput("thumbnail", 20).await?; +// Average throughput for a task type (from recent completions). +// Note: use the qualified name including the module prefix. +let (avg_read_bps, avg_write_bps) = store.avg_throughput("media::thumbnail", 20).await?; // Aggregate stats: count, avg duration, avg IO, failure rate -let stats = store.history_stats("thumbnail").await?; +let stats = store.history_stats("media::thumbnail").await?; ``` ## Resource monitoring diff --git a/docs/library-modules.md b/docs/library-modules.md new file mode 100644 index 0000000..e8348c5 --- /dev/null +++ b/docs/library-modules.md @@ -0,0 +1,347 @@ +# Writing a Reusable Module + +This guide covers how to package a taskmill module as a standalone Rust crate that other applications can pull in as a dependency. A library module owns its executors, typed tasks, and scoped state — the host application registers it with `Scheduler::builder().module(...)` and everything works without further wiring. + +## Design goals for library modules + +A good library module is: + +- **Self-contained.** It brings its own executors, task types, defaults, and state. The host should not need to know internal details. +- **Conflict-free.** It uses a unique module name that avoids collisions with the host's modules or other libraries. +- **Decoupled from host state.** Executors only access module-scoped state. They never reach into the host's global state map for types the host did not explicitly promise. +- **Testable in isolation.** The module can be exercised against an in-memory store without a full application scaffold. + +## Naming: avoid conflicts, export a constant + +Module names are global within a scheduler. If two `.module()` calls use the same name, `build()` returns an error. Avoid generic names like `"upload"` or `"sync"` — prefix with your crate or organization name. + +Export the name as a constant so callers can reference it without string duplication: + +```rust +/// Module name for the acme-cdn crate. +pub const MODULE_NAME: &str = "acme-cdn"; +``` + +Task types are automatically prefixed at build time. A task type `"upload"` in the `"acme-cdn"` module is stored in the database as `"acme-cdn::upload"`. Callers never need to construct the prefixed form themselves — the `ModuleHandle` does it. + +## What to export from your crate + +A minimal library module exports: + +1. **A module constructor** that returns a `Module`. +2. **A module name constant** for the host to look up the handle. +3. **Typed task structs** so the host can submit tasks. +4. **A configuration struct** if the module needs runtime settings. + +```rust +use std::sync::Arc; +use std::time::Duration; +use serde::{Serialize, Deserialize}; +use taskmill::{ + Module, TypedTask, IoBudget, Priority, RetryPolicy, BackoffStrategy, +}; + +// ── Public API ────────────────────────────────────────────────── + +pub const MODULE_NAME: &str = "acme-cdn"; + +/// Configuration for the acme-cdn module. +pub struct AcmeCdnConfig { + pub endpoint: String, + pub api_key: String, + pub max_upload_concurrency: usize, +} + +/// Upload a file to the CDN. +#[derive(Serialize, Deserialize)] +pub struct UploadTask { + pub file_id: String, + pub path: String, + pub size: u64, +} + +impl TypedTask for UploadTask { + const TASK_TYPE: &'static str = "upload"; + + fn expected_io(&self) -> IoBudget { + IoBudget::net(0, self.size as i64) + } + + fn priority(&self) -> Priority { Priority::NORMAL } + + fn key(&self) -> Option { + Some(self.file_id.clone()) + } + + fn label(&self) -> Option { + Some(format!("Upload {}", self.file_id)) + } +} + +/// Purge a file from the CDN edge cache. +#[derive(Serialize, Deserialize)] +pub struct PurgeTask { + pub file_id: String, +} + +impl TypedTask for PurgeTask { + const TASK_TYPE: &'static str = "purge"; + fn key(&self) -> Option { Some(self.file_id.clone()) } +} + +/// Build the acme-cdn module. Call this from the host's scheduler builder. +pub fn acme_cdn_module(config: AcmeCdnConfig) -> Module { + Module::new(MODULE_NAME) + .typed_executor::(Arc::new(UploadExecutor)) + .typed_executor::(Arc::new(PurgeExecutor)) + .max_concurrency(config.max_upload_concurrency) + .default_retry_policy(RetryPolicy { + strategy: BackoffStrategy::Exponential { + initial: Duration::from_secs(2), + max: Duration::from_secs(120), + multiplier: 2.0, + }, + max_retries: 5, + }) + .default_tag("provider", "acme-cdn") + .app_state(config) +} + +// ── Internal (not exported) ───────────────────────────────────── + +struct UploadExecutor; +struct PurgeExecutor; +// ... impl TaskExecutor for each ... +``` + +The host wires it in one line: + +```rust +let scheduler = Scheduler::builder() + .store_path("app.db") + .module(acme_cdn_module(AcmeCdnConfig { + endpoint: "https://cdn.acme.io".into(), + api_key: std::env::var("ACME_KEY").unwrap(), + max_upload_concurrency: 4, + })) + .build() + .await?; + +let cdn = scheduler.module(acme_cdn::MODULE_NAME); +cdn.submit_typed(&UploadTask { + file_id: "abc123".into(), + path: "/data/photo.jpg".into(), + size: 2_000_000, +}).await?; +``` + +## Module state vs. required global state + +### The anti-pattern: reaching into host state + +Executors can access any type via `ctx.state::()`. It is tempting to grab a database pool or HTTP client that the host registered globally: + +```rust +// BAD: invisible coupling to the host's global state +impl TaskExecutor for UploadExecutor { + async fn execute<'a>(&'a self, ctx: &'a TaskContext) -> Result<(), TaskError> { + let db = ctx.state::().expect("host must register AppDb"); + // ... + } +} +``` + +This compiles, but it means the library silently requires the host to register `AppDb` as global state. Nothing in the type system enforces it. If the host forgets, the executor panics at runtime. + +### The pattern: inject dependencies via module constructor + +Pass everything the module needs through its constructor and register it as module-scoped state: + +```rust +pub fn acme_cdn_module(config: AcmeCdnConfig) -> Module { + Module::new(MODULE_NAME) + .typed_executor::(Arc::new(UploadExecutor)) + .app_state(config) // module-scoped — only visible to this module's executors + // ... +} +``` + +Inside the executor: + +```rust +impl TaskExecutor for UploadExecutor { + async fn execute<'a>(&'a self, ctx: &'a TaskContext) -> Result<(), TaskError> { + let config = ctx.state::() + .expect("AcmeCdnConfig is registered by acme_cdn_module()"); + // safe — this module always registers its own config + } +} +``` + +`ctx.state::()` checks module-scoped state first, then falls back to global state. Because the module constructor registers `AcmeCdnConfig` via `Module::app_state()`, the executor always finds it regardless of what the host does with global state. + +## Exposing a typed handle wrapper + +For a polished library API, wrap `ModuleHandle` in a domain-specific struct so callers don't need to know internal task type names: + +```rust +/// Typed handle for submitting CDN tasks. +pub struct CdnHandle { + inner: taskmill::ModuleHandle, +} + +impl CdnHandle { + /// Get the CDN handle from a scheduler. + /// + /// Panics if the acme-cdn module was not registered. + pub fn from_scheduler(scheduler: &taskmill::Scheduler) -> Self { + Self { + inner: scheduler.module(MODULE_NAME), + } + } + + /// Get the CDN handle, returning `None` if the module is not registered. + pub fn try_from_scheduler(scheduler: &taskmill::Scheduler) -> Option { + scheduler.try_module(MODULE_NAME).map(|h| Self { inner: h }) + } + + /// Upload a file to the CDN. + pub fn upload(&self, task: &UploadTask) -> taskmill::SubmitBuilder { + self.inner.submit_typed(task) + } + + /// Purge a file from the CDN edge cache. + pub fn purge(&self, task: &PurgeTask) -> taskmill::SubmitBuilder { + self.inner.submit_typed(task) + } + + /// Subscribe to CDN module events only. + pub fn subscribe(&self) -> taskmill::ModuleReceiver { + self.inner.subscribe() + } +} +``` + +The host uses domain methods instead of generic `submit_typed`: + +```rust +let cdn = CdnHandle::from_scheduler(&scheduler); +cdn.upload(&UploadTask { file_id: "abc".into(), path: "/f.jpg".into(), size: 1024 }).await?; +``` + +## Late-binding state injection + +Sometimes a library module needs state that is only available after the scheduler is built — for example, a shared HTTP client pool created during application startup. + +Use `scheduler.register_state()` to inject global state after `build()` but before `run()`: + +```rust +let scheduler = Scheduler::builder() + .store_path("app.db") + .module(acme_cdn_module(config)) + .build() + .await?; + +// State created after build — perhaps from another init step. +let http_pool = Arc::new(HttpPool::new()); +scheduler.register_state(http_pool).await; + +// Now start the scheduler. +let token = CancellationToken::new(); +scheduler.run(token).await; +``` + +The executor accesses it through the normal `ctx.state::()` path. Since `register_state` writes to global state, all modules can see it. + +**Race condition warning:** `register_state()` is safe to call before `scheduler.run()`. After `run()` starts dispatching tasks, there are no ordering guarantees with in-flight executors. An executor that runs before the state is registered will see `None` from `ctx.state::()`. Always register late-binding state before calling `run()`. + +## Handling scheduler.try_module() for optional integration + +If your library provides optional integration with another module, use `try_module()` to avoid panicking when it is not registered: + +```rust +impl TaskExecutor for UploadExecutor { + async fn execute<'a>(&'a self, ctx: &'a TaskContext) -> Result<(), TaskError> { + // Core upload logic... + do_upload(ctx).await?; + + // Optional: notify analytics module if present + if let Some(analytics) = ctx.try_module("analytics") { + analytics.submit_typed(&TrackEvent { + action: "cdn_upload".into(), + // ... + }).await.ok(); // best-effort — don't fail the upload + } + + Ok(()) + } +} +``` + +This pattern lets your library cooperate with modules from other crates without requiring them. Document which optional module names your library looks for so the host knows what to wire up. + +## Testing your module in isolation + +`TaskStore::open_memory()` creates an in-memory SQLite database. Combine it with `Scheduler::builder().store()` to test your module without touching the filesystem: + +```rust +#[tokio::test] +async fn upload_task_completes() { + let store = TaskStore::open_memory().await.unwrap(); + + let scheduler = Scheduler::builder() + .store(store) + .module(acme_cdn_module(AcmeCdnConfig { + endpoint: "http://localhost:9999".into(), + api_key: "test-key".into(), + max_upload_concurrency: 2, + })) + .build() + .await + .unwrap(); + + let cdn = scheduler.module(MODULE_NAME); + let mut events = cdn.subscribe(); + + cdn.submit_typed(&UploadTask { + file_id: "test-1".into(), + path: "/tmp/test.jpg".into(), + size: 1024, + }).await.unwrap(); + + let token = CancellationToken::new(); + let sched = scheduler.clone(); + let cancel = token.clone(); + tokio::spawn(async move { sched.run(cancel).await }); + + // Wait for completion event. + let event = tokio::time::timeout( + Duration::from_secs(5), + events.recv(), + ).await.unwrap().unwrap(); + + token.cancel(); +} +``` + +Key points for testing: + +- `TaskStore::open_memory()` gives you a fresh database per test. No cleanup needed. +- Register only your module — no need for the host's modules or global state. +- Spawn `scheduler.run()` in a background task and cancel the token when done. +- Use `cdn.subscribe()` (module-filtered) to receive only your module's events. + +## Checklist before publishing + +- [ ] **Unique module name.** Use an organization or crate prefix (e.g. `"acme-cdn"`, not `"cdn"`). Export it as `pub const MODULE_NAME`. +- [ ] **No global state access.** Executors only use `ctx.state::()` for types registered via `Module::app_state()` in your own module constructor. Document any late-binding state requirements. +- [ ] **Constructor takes config.** All knobs (endpoints, keys, concurrency limits) are parameters of the module constructor function, not hardcoded. +- [ ] **TypedTask for every task type.** Use `typed_executor` over string-based `executor` registration. Export the typed task structs so callers can use `submit_typed`. +- [ ] **Serde derives on task structs.** Both `Serialize` and `Deserialize` — the host needs `Serialize` for submission, your executor needs `Deserialize` for `ctx.payload::()`. +- [ ] **TASK_TYPE uses short form.** `const TASK_TYPE: &'static str = "upload"`, not `"acme-cdn::upload"`. The module prefixes it automatically. +- [ ] **key() returns a stable dedup key.** If your task has a natural identity (file ID, URL), return it from `TypedTask::key()` so duplicate submissions are handled correctly. +- [ ] **Default retry policy set.** Use `Module::default_retry_policy()` with sensible backoff for your use case. +- [ ] **Module concurrency capped.** Set `Module::max_concurrency()` to a reasonable default. The host can adjust at runtime via `ModuleHandle::set_max_concurrency()`. +- [ ] **Tests use open_memory().** No filesystem side effects. Tests should pass in CI without special setup. +- [ ] **Typed handle wrapper (optional).** If your module has more than two task types, consider wrapping `ModuleHandle` in a domain-specific API struct. +- [ ] **Document optional integrations.** If your executors use `ctx.try_module("other")`, list those optional module names in your crate docs. diff --git a/docs/migrating-to-0.4.md b/docs/migrating-to-0.4.md index 038b1c9..f1ce3a2 100644 --- a/docs/migrating-to-0.4.md +++ b/docs/migrating-to-0.4.md @@ -41,7 +41,7 @@ let scheduler = Scheduler::builder() .await?; ``` -At least one `.module()` call is required — `build()` returns an error if no modules are registered. There is no default module. +At least one `.module()` call is required — `build()` returns an error if no modules are registered. There is no default module. Library authors publishing reusable modules should see [Writing a Reusable Module](library-modules.md) for naming conventions, state isolation, and conflict avoidance. ### Task type namespacing diff --git a/docs/multi-module-apps.md b/docs/multi-module-apps.md new file mode 100644 index 0000000..26d89f6 --- /dev/null +++ b/docs/multi-module-apps.md @@ -0,0 +1,426 @@ +# Multi-Module Applications + +Most production applications need more than one module. An upload service has ingestion, processing, and notification stages. A media app has transcoding, thumbnail generation, and CDN sync. This guide covers how to assemble multiple modules on a single `Scheduler` and the interactions you need to understand. + +## When you need multiple modules + +A single module is fine when all your task types share the same defaults (priority, retry policy, concurrency cap, state). Once you have task types with different operational characteristics — or you're pulling in library crates that bring their own modules — you need multiple modules. + +Common reasons: + +- **Different concurrency budgets.** Uploads should run 4-wide; thumbnail generation can run 16-wide. +- **Different retry policies.** API calls need exponential backoff; local file operations retry immediately. +- **Scoped state.** Each module carries its own configuration without polluting a global namespace. +- **Library integration.** Third-party crates register their own modules — you compose them alongside your own. + +## App layout: one module function per feature + +Define each module as a standalone function. This keeps registration clean and makes modules testable in isolation. + +```rust +use std::sync::Arc; +use std::time::Duration; +use taskmill::{Module, Priority, RetryPolicy, BackoffStrategy}; + +pub fn ingest_module(config: IngestConfig) -> Module { + Module::new("ingest") + .typed_executor::(Arc::new(FetchExecutor)) + .typed_executor::(Arc::new(ValidateExecutor)) + .app_state(config) + .max_concurrency(4) + .default_priority(Priority::NORMAL) +} + +pub fn process_module() -> Module { + Module::new("process") + .typed_executor::(Arc::new(TranscodeExecutor)) + .typed_executor::(Arc::new(ThumbnailExecutor)) + .max_concurrency(8) + .default_priority(Priority::BACKGROUND) +} + +pub fn notify_module(config: NotifyConfig) -> Module { + Module::new("notify") + .typed_executor::(Arc::new(EmailExecutor)) + .app_state(config) + .default_retry_policy(RetryPolicy { + strategy: BackoffStrategy::Exponential { + initial: Duration::from_secs(5), + max: Duration::from_secs(300), + multiplier: 2.0, + }, + max_retries: 5, + }) +} +``` + +Register all modules at build time: + +```rust +let scheduler = Scheduler::builder() + .store_path("app.db") + .module(ingest_module(ingest_config)) + .module(process_module()) + .module(notify_module(notify_config)) + .app_state(SharedDb::new()) // global state visible to all modules + .max_concurrency(16) // global cap + .build() + .await?; +``` + +`build()` returns an error if two modules share the same name. Use distinct, descriptive names — see [Writing a Reusable Module](library-modules.md#naming-avoid-conflicts-export-a-constant) for naming guidance. + +## Global state vs. module state — what goes where + +State registered on `SchedulerBuilder::app_state()` is **global** — visible to executors in every module. State registered on `Module::app_state()` is **module-scoped** — visible only to executors in that module. + +`TaskContext::state::()` checks module-scoped state first, then falls back to global state. + +| What | Where to register | Why | +|------|-------------------|-----| +| Database pool, HTTP client | Global (`builder.app_state()`) | Shared infrastructure used by many modules | +| Module-specific config (API keys, bucket names) | Module (`Module::app_state()`) | Only relevant to one module's executors | +| Feature flags, metrics collector | Global | Cross-cutting concerns | + +```rust +// In an executor: +let db = ctx.state::().expect("SharedDb not registered"); // global +let cfg = ctx.state::().expect("IngestConfig not registered"); // module-scoped +``` + +If two modules register the same type `T` as module state, each module's executors see their own instance. The global instance (if any) is shadowed within each module. + +## Sharing the scheduler: Clone and ModuleHandle + +`Scheduler` is `Clone` (via `Arc`) — pass it freely to async tasks, Tauri commands, or API handlers. Grab module handles at startup for convenient access: + +```rust +let scheduler = build_scheduler().await?; + +// Grab handles once — they're Clone too. +let ingest = scheduler.module("ingest"); +let process = scheduler.module("process"); + +// Use from anywhere. +tokio::spawn(async move { + ingest.submit_typed(&FetchTask { url: "...".into() }).await.unwrap(); +}); +``` + +`scheduler.module("name")` panics if the module isn't registered — use it at well-known call sites where a typo is a programming error. For dynamic lookups (e.g., plugin systems), use `scheduler.try_module("name")` which returns `Option`. + +## Cross-module task dependencies + +A task in one module can depend on a task in another module. Cross-module dependencies work identically to same-module dependencies — the module boundary does not affect dependency resolution or failure propagation. + +### The pattern + +Submit a task in module A, capture its ID, then use that ID as a dependency in module B: + +```rust +let ingest = scheduler.module("ingest"); +let process = scheduler.module("process"); + +// Submit in the ingest module. +let outcome = ingest.submit_typed(&FetchTask { + url: source_url.clone(), +}).await?; + +let fetch_id = outcome.id().expect("not a duplicate"); + +// This task in the process module won't start until the fetch completes. +process.submit_typed(&TranscodeTask { + source: source_url, +}) + .depends_on(fetch_id) + .await?; +``` + +### Failure cascade across modules + +If the ingest task fails permanently, its dependents in the process module follow the `DependencyFailurePolicy` — the module boundary is irrelevant. The default policy (`Cancel`) moves the dependent to history as `DependencyFailed` and cascades to further dependents. + +```rust +use taskmill::DependencyFailurePolicy; + +// Run the transcode anyway, even if the fetch failed. +process.submit_typed(&TranscodeTask { source }) + .depends_on(fetch_id) + .on_dependency_failure(DependencyFailurePolicy::Ignore) + .await?; +``` + +### From within an executor + +Executors can submit to other modules via `ctx.module("name")`: + +```rust +impl TaskExecutor for FetchExecutor { + async fn execute<'a>(&'a self, ctx: &'a TaskContext) -> Result<(), TaskError> { + let data = fetch_remote(&ctx.payload::()?).await?; + + // Submit a follow-up in a different module. + ctx.module("process").submit_typed(&TranscodeTask { + source: data.path, + }).await.map_err(|e| TaskError::permanent(e.to_string()))?; + + Ok(()) + } +} +``` + +Use `ctx.try_module("analytics")` if the target module is optional (e.g., an analytics plugin that may not be registered). + +## Concurrency budgets across modules + +### Global cap and per-module cap as AND-gates + +Both the global `max_concurrency` and per-module `max_concurrency` must have headroom for a task to be dispatched. They are **caps, not reservations** — setting a module's cap to 4 does not guarantee it gets 4 slots. + +``` +Global max_concurrency: 16 + ├── ingest: max_concurrency(4) ← at most 4, but only if global has room + ├── process: max_concurrency(8) ← at most 8, but only if global has room + └── notify: (no cap) ← limited only by the global cap +``` + +A task is dispatched when **all** of these pass: +1. `active_count < global max_concurrency` +2. `module_running_count < module max_concurrency` (if set) +3. `group_running_count < group concurrency limit` (if the task has a group) +4. Backpressure / IO budget check passes + +### Module starvation: understanding priority competition + +A module with only `BACKGROUND`-priority tasks can be indefinitely deferred when other modules continuously submit `NORMAL` work. This is by design — priority ordering is global across all modules. + +If you need guaranteed throughput for a module: +- **Raise the priority** of its most important tasks to `NORMAL` or `HIGH`. +- **Use task groups** with a dedicated concurrency reservation. A group limit acts as a soft floor: tasks in the group bypass the global priority queue as long as the group has available slots. + +### Using group concurrency as a soft floor + +```rust +// Reserve 2 concurrent slots for background sync, even under load. +let scheduler = Scheduler::builder() + .module( + Module::new("sync") + .typed_executor::(Arc::new(SyncExecutor)) + .default_group("sync-reserved") + .default_priority(Priority::BACKGROUND) + ) + .group_concurrency("sync-reserved", 2) + .max_concurrency(16) + .build() + .await?; +``` + +## Coordinating with tags: logical jobs across modules + +Tags let you group tasks that belong to the same logical "job" across multiple modules. This is the idiomatic way to cancel, query, or monitor a pipeline that spans modules. + +### Tagging tasks at submit time + +```rust +let job_id = generate_job_id(); + +scheduler.module("ingest") + .submit_typed(&FetchTask { url: source.clone() }) + .tag("job_id", &job_id) + .await?; + +scheduler.module("process") + .submit_typed(&TranscodeTask { source }) + .tag("job_id", &job_id) + .await?; +``` + +### Querying job progress across modules + +Use `scheduler.modules()` to iterate over all modules and aggregate: + +```rust +let mut total_pending = 0i64; +let mut total_running = 0; +for handle in scheduler.modules() { + let snap = handle.snapshot().await?; + total_pending += snap.pending_count; + total_running += snap.running.len(); +} +``` + +### Bulk-cancelling a job via scheduler.modules() + +```rust +for handle in scheduler.modules() { + handle.cancel_where(|task| { + task.tags.get("job_id").map(String::as_str) == Some(&job_id) + }).await?; +} +``` + +## Module-level pause and resume + +Each module can be independently paused and resumed without affecting other modules. + +```rust +let ingest = scheduler.module("ingest"); + +// Pause — stops new task dispatch for this module. Running tasks are +// interrupted (cancellation token triggered) and moved to paused status. +// Pending tasks are also moved to paused. +ingest.pause().await?; + +// Resume — clears the module pause flag and moves paused tasks back to pending. +ingest.resume().await?; + +// Check state. +assert!(ingest.is_paused()); // after pause() +``` + +### Interaction with global pause + +The scheduler must be globally **unpaused** AND the module must be **unpaused** for dispatch to proceed. Module pause is additive: + +| `scheduler.pause_all()` | `handle.pause()` | Dispatch? | +|--------------------------|-------------------|-----------| +| No | No | Yes | +| No | Yes | No | +| Yes | No | No | +| Yes | Yes | No | + +`handle.resume()` clears the module flag but does **not** override a global pause — database tasks stay `paused` until the global scheduler is also resumed. + +### Use case + +A library module with a background sync feature that the user can toggle from settings: + +```rust +// User toggles sync off in the UI. +scheduler.module("sync").pause().await?; + +// Other modules continue running normally. +scheduler.module("ingest").submit_typed(&task).await?; // still works + +// User turns sync back on. +scheduler.module("sync").resume().await?; +``` + +## Building a cross-module dashboard + +### scheduler.snapshot() vs. per-module snapshot() + +`scheduler.snapshot()` returns a `SchedulerSnapshot` with global aggregates — total running, pending, pressure, and progress across all modules. + +`handle.snapshot()` returns a `ModuleSnapshot` with per-module detail — running tasks, pending count, paused count, progress, and byte-level tracking for that module only. + +For a per-module dashboard, iterate: + +```rust +for handle in scheduler.modules() { + let snap = handle.snapshot().await?; + println!( + "{}: {} running, {} pending, {} paused", + handle.name(), + snap.running.len(), + snap.pending_count, + snap.paused_count, + ); +} +``` + +### scheduler.active_tasks() for a unified running view + +`scheduler.active_tasks()` returns all running tasks from all modules in a single `Vec`. Equivalent to aggregating each module's `active_tasks()`, but more convenient for global views. + +```rust +let running = scheduler.active_tasks().await?; +for task in &running { + println!("[{}] {} (priority {})", task.task_type, task.label, task.priority.value()); +} +``` + +## Error isolation between modules + +Modules share a scheduler but their errors don't interact (beyond explicit dependencies). + +### Per-module dead-letter monitoring + +Each module has its own dead-letter view: + +```rust +let failed = scheduler.module("ingest").dead_letter_tasks(10, 0).await?; +for task in &failed { + println!("[ingest] {} failed: {}", task.label, task.last_error.as_deref().unwrap_or("unknown")); +} +``` + +### Different retry policies per module + +Set retry policies at the module level so each module handles failures appropriately: + +```rust +// API calls: exponential backoff, 5 retries. +Module::new("api") + .default_retry_policy(RetryPolicy { + strategy: BackoffStrategy::Exponential { + initial: Duration::from_secs(1), + max: Duration::from_secs(120), + multiplier: 2.0, + }, + max_retries: 5, + }) + +// Local file operations: immediate retry, 3 attempts. +Module::new("files") + .default_retry_policy(RetryPolicy { + strategy: BackoffStrategy::Fixed(Duration::ZERO), + max_retries: 3, + }) +``` + +### Module-filtered events + +Subscribe to events for a single module without filtering the global stream: + +```rust +let mut rx = scheduler.module("ingest").subscribe(); +tokio::spawn(async move { + while let Ok(event) = rx.recv().await { + // Only ingest:: events arrive here. + handle_ingest_event(event).await; + } +}); +``` + +## Testing multi-module setups + +Use `TaskStore::open_memory()` for fast, isolated tests that don't touch the filesystem: + +```rust +use taskmill::{Scheduler, TaskStore, Module}; + +#[tokio::test] +async fn test_cross_module_pipeline() { + let store = TaskStore::open_memory().await.unwrap(); + let scheduler = Scheduler::builder() + .store(store) + .module(ingest_module(test_config())) + .module(process_module()) + .max_concurrency(4) + .build() + .await + .unwrap(); + + let token = CancellationToken::new(); + let sched = scheduler.clone(); + let run_handle = tokio::spawn(async move { sched.run(token.clone()).await }); + + // Submit tasks, assert outcomes... + + token.cancel(); + run_handle.await.unwrap(); +} +``` + +Each test gets its own in-memory database, so tests run in parallel without interference. diff --git a/docs/persistence-and-recovery.md b/docs/persistence-and-recovery.md index 4f87a22..f918616 100644 --- a/docs/persistence-and-recovery.md +++ b/docs/persistence-and-recovery.md @@ -51,6 +51,8 @@ let sub = TaskSubmission::new("upload") .key("/photos/img.jpg"); // dedup on file path, not full payload ``` +> **Dedup keys include the module prefix.** The key is derived from the *qualified* task type (e.g., `"media::thumbnail"`, not `"thumbnail"`), so `"media::thumbnail"` and `"cdn::thumbnail"` have separate key spaces. A `Supersede` submission in module A will never supersede a task in module B, even if both tasks have the same logical payload. + ### Key lifecycle A key is "occupied" while the task is active — pending, running, paused, waiting, or retrying. When the task completes or permanently fails (moves to history), the key is freed and the same work can be submitted again. @@ -82,7 +84,7 @@ let outcome = scheduler.module("app").submit_batch(batch).await?; ### Looking up tasks by dedup key -Check whether a task has been submitted (or has already completed). In 0.4 the task type in the database is the qualified name: +Check whether a task has been submitted (or has already completed). In 0.4 the task type in the database is the qualified name including the module prefix: ```rust use taskmill::TaskLookup; diff --git a/docs/priorities-and-preemption.md b/docs/priorities-and-preemption.md index 483854d..4d88411 100644 --- a/docs/priorities-and-preemption.md +++ b/docs/priorities-and-preemption.md @@ -121,6 +121,55 @@ scheduler.remove_group_limit("bucket-prod").await; Group limits are checked *in addition to* `max_concurrency` — a task must pass both the global and group gate to be dispatched. +## Module-level pause and resume + +Individual modules can be paused and resumed independently, without affecting other modules. This is useful for features like a user-togglable sync, or temporarily disabling a module during maintenance. + +```rust +let sync = scheduler.module("sync"); + +// Pause — stops dispatch, interrupts running tasks, moves everything to paused. +let paused_count = sync.pause().await?; + +// Resume — moves paused tasks back to pending, re-enables dispatch. +let resumed_count = sync.resume().await?; +``` + +**What `pause()` does:** +1. Sets the per-module `is_paused` flag (prevents new dispatch). +2. Triggers the cancellation token of running tasks and moves them to `paused` in the database. +3. Moves pending tasks to `paused` in the database. + +**What `resume()` does:** +1. Clears the per-module `is_paused` flag. +2. Moves `paused` tasks back to `pending` (unless the global scheduler is also paused). + +### Interaction with global pause + +The scheduler must be globally unpaused **and** the module must be unpaused for dispatch to proceed. Module pause is additive — `handle.resume()` does not override `scheduler.pause_all()`. + +| Global paused? | Module paused? | Dispatch? | +|----------------|----------------|-----------| +| No | No | Yes | +| No | Yes | No | +| Yes | No | No | +| Yes | Yes | No | + +### Use case: user-togglable features + +```rust +// User disables background sync in settings. +scheduler.module("sync").pause().await?; + +// Other modules continue normally. +scheduler.module("media").submit_typed(&thumb).await?; + +// User re-enables sync. +scheduler.module("sync").resume().await?; +``` + +See [Multi-Module Applications](multi-module-apps.md#module-level-pause-and-resume) for more patterns. + ## Throttle behavior Throttling is independent of preemption. While preemption *interrupts* running tasks, throttling controls whether pending tasks are *dispatched in the first place* based on current system [pressure](glossary.md#backpressure). diff --git a/docs/query-apis.md b/docs/query-apis.md index 196fb99..6554dcd 100644 --- a/docs/query-apis.md +++ b/docs/query-apis.md @@ -29,6 +29,55 @@ Most queries are available in two places: | `task_by_key(key)` | `Option` | Look up an active task by dedup key. | | `running_io_totals()` | `(i64, i64)` | Sum of expected disk read and write bytes across running tasks. Useful for comparing against system capacity. | +## Module-scoped queries (ModuleHandle) + +These methods are available on `ModuleHandle` and are automatically scoped to the module's task type prefix. + +| Method | Returns | Description | +|--------|---------|-------------| +| `handle.snapshot()` | `ModuleSnapshot` | Running tasks, pending/paused counts, progress, byte-level tracking, and pause state for this module. The primary entry point for per-module dashboards. | +| `handle.active_tasks()` | `Vec` | In-memory running tasks for this module (no DB call). | +| `handle.estimated_progress()` | `Vec` | Extrapolated progress for each running task in this module. | +| `handle.byte_progress()` | `Vec` | Live byte-level progress for running tasks in this module. | +| `handle.dead_letter_tasks(limit, offset)` | `Vec` | Paginated dead-letter (permanently failed) tasks for this module. | +| `handle.tasks_by_tags(filters, status)` | `Vec` | Active tasks in this module matching the given tag filters and optional status. | +| `handle.count_by_tag(key, status)` | `Vec<(String, i64)>` | Tag value counts for a given key within this module. | +| `handle.tag_values(key)` | `Vec<(String, i64)>` | Distinct values for a tag key within this module. | + +## Cross-module operations (Scheduler) + +These methods operate across all modules and are available directly on `Scheduler`. + +| Method | Returns | Description | +|--------|---------|-------------| +| `scheduler.modules()` | `Vec` | All registered module handles in registration order. Use to iterate for cross-cutting operations: cancel by tag, aggregate snapshots, build per-module dashboards. | +| `scheduler.active_tasks()` | `Vec` | Running tasks from all modules combined. Equivalent to aggregating each module's `active_tasks()`. | +| `scheduler.task(id)` | `Option` | Look up any active task by ID, regardless of which module owns it. | +| `scheduler.snapshot()` | `SchedulerSnapshot` | Global aggregates: total running, pending, pressure, progress, and recurring schedules. | + +See [Multi-Module Applications](multi-module-apps.md#building-a-cross-module-dashboard) for dashboard patterns using these APIs. + +## Cancellation + +### Single task + +`handle.cancel(task_id)` cancels one task. Returns `true` if the task was found and cancelled. + +### Bulk cancellation + +| Method | Returns | Description | +|--------|---------|-------------| +| `handle.cancel_all()` | `Vec` | Cancel all tasks belonging to this module. | +| `handle.cancel_where(predicate)` | `Vec` | Cancel module tasks matching a predicate. | + +`cancel_all()` and `cancel_where()` affect tasks in any active status: + +- **Pending** tasks are moved to history as `Cancelled`. +- **Running** tasks have their cancellation token triggered and are moved to history as `Cancelled`. +- **Paused** tasks are cancelled immediately (no executor is running). +- **Waiting** tasks (parents waiting for children) are cancelled, and their children are also cancelled (cascade). +- **Children in other modules** are cancelled if they were linked via `.parent()` from a task in this module. + ## Dependency queries | Method | Returns | Description | @@ -38,6 +87,10 @@ Most queries are available in two places: | `blocked_tasks()` | `Vec` | All tasks currently in `blocked` status, waiting for dependencies. | | `blocked_count()` | `i64` | Count of blocked tasks. Also available in `SchedulerSnapshot::blocked_count`. | +Dependencies work across module boundaries. A task in `"process"` can depend on a task in `"ingest"` — the module boundary does not affect dependency resolution or failure propagation. See [Multi-Module Applications](multi-module-apps.md#cross-module-task-dependencies) for patterns. + +> **Task type names in queries** — All store-level queries that accept a `task_type` string expect the *qualified* name including the module prefix: `"media::thumbnail"`, not `"thumbnail"`. Use `ModuleHandle` methods where possible — they apply the prefix automatically. + ## History queries | Method | Returns | Description | @@ -47,7 +100,9 @@ Most queries are available in two places: | `history_by_key(key)` | `Vec` | All past runs matching a dedup key. | | `failed_tasks(limit)` | `Vec` | Recent failures with error messages. | -History records include a `status` field that can be `completed`, `failed`, `cancelled`, `superseded`, or `expired`. Filter by status to find expired tasks (e.g., for analytics on TTL effectiveness). +History records include a `status` field that can be `completed`, `failed`, `cancelled`, `superseded`, `expired`, or `dead_letter`. Filter by status to find expired tasks (e.g., for analytics on TTL effectiveness). + +The `history_by_type(task_type)` parameter requires the qualified name including the module prefix — e.g. `"media::thumbnail"`, not `"thumbnail"`. ## Aggregate queries diff --git a/docs/quick-start.md b/docs/quick-start.md index a94cf96..bec4b18 100644 --- a/docs/quick-start.md +++ b/docs/quick-start.md @@ -289,12 +289,14 @@ let token = CancellationToken::new(); scheduler.run(token).await; ``` -Libraries that receive a pre-built scheduler can still inject global state after construction: +Libraries that receive a pre-built scheduler can still inject global state after construction (call before `scheduler.run()`): ```rust scheduler.register_state(Arc::new(LibraryState { /* ... */ })).await; ``` +For applications with more than two modules, or when integrating third-party library modules, see [Multi-Module Applications](multi-module-apps.md) for guidance on cross-module dependencies, concurrency budgets, and coordinated cancellation. + ## Delayed and recurring tasks ### Delayed tasks @@ -397,6 +399,26 @@ media.submit( ).await?; ``` +### Cross-module dependencies + +Dependencies work across module boundaries. A task in one module can depend on a task in another module — the module boundary does not affect dependency resolution or failure propagation. + +```rust +let ingest = scheduler.module("ingest"); +let process = scheduler.module("process"); + +// Submit in the ingest module, capture the ID. +let outcome = ingest.submit_typed(&FetchTask { url: source.clone() }).await?; +let fetch_id = outcome.id().expect("not a duplicate"); + +// This task in the process module won't start until the fetch completes. +process.submit_typed(&TranscodeTask { source }) + .depends_on(fetch_id) + .await?; +``` + +See [Multi-Module Applications](multi-module-apps.md#cross-module-task-dependencies) for more patterns including failure cascades and in-executor cross-module submission. + ### Failure handling By default, if a dependency fails permanently the dependent is cancelled and recorded as `DependencyFailed` in history. Change this per-submission: @@ -458,4 +480,6 @@ Work through the topic guides in order: 4. [Persistence & Recovery](persistence-and-recovery.md) — understand crash safety and deduplication 5. [Configuration](configuration.md) — tune for your workload 6. [Query APIs](query-apis.md) — build dashboards and debug stuck tasks -7. [Design](design.md) — understand the architecture for advanced use +7. [Multi-Module Applications](multi-module-apps.md) — assemble multiple modules, cross-module dependencies, tags, and dashboards +8. [Writing a Reusable Module](library-modules.md) — publish a module as a library crate +9. [Design](design.md) — understand the architecture for advanced use diff --git a/src/module.rs b/src/module.rs index da22fe8..3d2758e 100644 --- a/src/module.rs +++ b/src/module.rs @@ -534,8 +534,20 @@ impl ModuleHandle { /// Cancel all tasks belonging to this module. /// - /// Queries with `task_type LIKE '{prefix}%'` and cancels each task. - /// Returns the IDs that were successfully cancelled. + /// Queries all active tasks with `task_type LIKE '{prefix}%'` and cancels + /// each one. Returns the IDs that were successfully cancelled. + /// + /// **Behavior by task status:** + /// - **Pending** — moved to history as `Cancelled`. + /// - **Running** — cancellation token is triggered; the task is moved to + /// history as `Cancelled` (the executor should check `ctx.token()` and + /// exit cleanly). + /// - **Paused** — cancelled immediately (no executor is running). + /// - **Waiting** (parent with live children) — parent is cancelled, and + /// its children are cascade-cancelled regardless of which module owns them. + /// + /// Children in other modules are cancelled if they were linked via + /// `.parent()` from a task in this module. pub async fn cancel_all(&self) -> Result, StoreError> { let tasks = self .scheduler diff --git a/src/registry/context.rs b/src/registry/context.rs index 1763855..eed2027 100644 --- a/src/registry/context.rs +++ b/src/registry/context.rs @@ -210,7 +210,7 @@ impl TaskContext { /// Panics if `name` is not a registered module or the scheduler has shut down. pub fn module(&self, name: &str) -> ModuleHandle { self.try_module(name) - .unwrap_or_else(|| panic!("module '{name}' is not registered")) + .unwrap_or_else(|| panic!("module '{name}' is not registered — did you forget to add .module(...) to the SchedulerBuilder?")) } /// Returns a scoped handle for the named module, or `None` if the module is diff --git a/src/scheduler/mod.rs b/src/scheduler/mod.rs index edcf835..6051c3b 100644 --- a/src/scheduler/mod.rs +++ b/src/scheduler/mod.rs @@ -269,7 +269,7 @@ impl Scheduler { /// For dynamic / runtime lookup, use [`try_module`](Self::try_module) instead. pub fn module(&self, name: &str) -> crate::module::ModuleHandle { self.try_module(name) - .unwrap_or_else(|| panic!("module '{name}' is not registered")) + .unwrap_or_else(|| panic!("module '{name}' is not registered — did you forget to add .module(...) to the SchedulerBuilder?")) } /// Get a scoped handle for the named module, returning `None` if it is not registered. From 792727cce2861791a05127cd00d71284b5433ac9 Mon Sep 17 00:00:00 2001 From: DJ Majumdar Date: Wed, 18 Mar 2026 03:37:11 -0700 Subject: [PATCH 3/4] style: format bench files --- benches/retry.rs | 27 ++++++++++++++++----------- benches/tags.rs | 7 ++++--- 2 files changed, 20 insertions(+), 14 deletions(-) diff --git a/benches/retry.rs b/benches/retry.rs index ef7074e..6dd7a2b 100644 --- a/benches/retry.rs +++ b/benches/retry.rs @@ -118,7 +118,10 @@ fn bench_dispatch_permanent_failure(c: &mut Criterion) { let mut terminal = 0; while terminal < 500 { - if let Ok(SchedulerEvent::Failed { will_retry: false, .. }) = rx.recv().await { + if let Ok(SchedulerEvent::Failed { + will_retry: false, .. + }) = rx.recv().await + { terminal += 1; } } @@ -137,7 +140,12 @@ fn bench_dispatch_retryable_dead_letter(c: &mut Criterion) { let mut group = c.benchmark_group("retryable_dead_letter"); let strategies: &[(&str, BackoffStrategy)] = &[ - ("constant", BackoffStrategy::Constant { delay: Duration::ZERO }), + ( + "constant", + BackoffStrategy::Constant { + delay: Duration::ZERO, + }, + ), ( "linear", BackoffStrategy::Linear { @@ -175,13 +183,11 @@ fn bench_dispatch_retryable_dead_letter(c: &mut Criterion) { async move { let sched = Scheduler::builder() .store(TaskStore::open_memory().await.unwrap()) - .module( - Module::new("bench").executor_with_retry_policy( - "fail", - Arc::new(FailRetryableExecutor), - policy, - ), - ) + .module(Module::new("bench").executor_with_retry_policy( + "fail", + Arc::new(FailRetryableExecutor), + policy, + )) .max_concurrency(8) .poll_interval(Duration::from_millis(10)) .build() @@ -199,8 +205,7 @@ fn bench_dispatch_retryable_dead_letter(c: &mut Criterion) { let token = CancellationToken::new(); let sched_clone = sched.clone(); let token_clone = token.clone(); - let handle = - tokio::spawn(async move { sched_clone.run(token_clone).await }); + let handle = tokio::spawn(async move { sched_clone.run(token_clone).await }); let mut dead_lettered = 0; while dead_lettered < 100 { diff --git a/benches/tags.rs b/benches/tags.rs index 3e66d98..e212374 100644 --- a/benches/tags.rs +++ b/benches/tags.rs @@ -6,7 +6,9 @@ use std::sync::Arc; use std::time::Duration; use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion}; -use taskmill::{Module, Scheduler, TaskContext, TaskError, TaskExecutor, TaskStore, TaskSubmission}; +use taskmill::{ + Module, Scheduler, TaskContext, TaskError, TaskExecutor, TaskStore, TaskSubmission, +}; use tokio::runtime::Runtime; struct NoopExecutor; @@ -57,8 +59,7 @@ fn bench_submit_with_tags(c: &mut Criterion) { .unwrap(); for i in 0..500 { - let mut sub = - TaskSubmission::new("bench::test").key(format!("st-{i}")); + let mut sub = TaskSubmission::new("bench::test").key(format!("st-{i}")); for t in 0..tag_count { sub = sub.tag(format!("key-{t}"), format!("val-{i}-{t}")); } From bec2a1d60d1c1441a9b2b2039f5cdc334ba81109 Mon Sep 17 00:00:00 2001 From: DJ Majumdar Date: Wed, 18 Mar 2026 03:38:18 -0700 Subject: [PATCH 4/4] ci: add benchmark workflow with PR comparison and timeline Runs Criterion benchmarks on push to main (stores baseline on _benchmarks branch for timeline tracking) and on PRs (compares against cached baseline via critcmp, posts results as PR comment). --- .github/workflows/benchmarks.yml | 107 +++++++++++++++++++++++++++++++ 1 file changed, 107 insertions(+) create mode 100644 .github/workflows/benchmarks.yml diff --git a/.github/workflows/benchmarks.yml b/.github/workflows/benchmarks.yml new file mode 100644 index 0000000..b0fcd6a --- /dev/null +++ b/.github/workflows/benchmarks.yml @@ -0,0 +1,107 @@ +name: Benchmarks + +on: + push: + branches: [main] + pull_request: + branches: [main] + +permissions: + contents: write + pull-requests: write + +env: + CARGO_TERM_COLOR: always + +jobs: + benchmark: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - name: Install Rust + uses: dtolnay/rust-toolchain@stable + + - name: Cache cargo + uses: Swatinem/rust-cache@v2 + + - name: Install critcmp + run: cargo install critcmp + + - name: Run benchmarks + run: cargo bench --all-features -- --save-baseline current --output-format bencher 2>/dev/null | tee bench-output.txt + + # ── Push to main: update timeline + cache baseline ── + + - name: Update benchmark timeline + if: github.event_name == 'push' + uses: benchmark-action/github-action-benchmark@v1 + with: + tool: cargo + output-file-path: bench-output.txt + gh-pages-branch: _benchmarks + benchmark-data-dir-path: timeline + auto-push: true + github-token: ${{ secrets.GITHUB_TOKEN }} + + - name: Cache baseline for PR comparison + if: github.event_name == 'push' + run: critcmp --export current > baseline.json + + - name: Save baseline to cache + if: github.event_name == 'push' + uses: actions/cache/save@v4 + with: + path: baseline.json + key: benchmark-baseline-${{ github.sha }} + + # ── Pull request: compare against cached baseline ── + + - name: Restore baseline + if: github.event_name == 'pull_request' + uses: actions/cache/restore@v4 + with: + path: baseline.json + key: benchmark-baseline-${{ github.event.pull_request.base.sha }} + restore-keys: benchmark-baseline- + + - name: Compare benchmarks + if: github.event_name == 'pull_request' + id: compare + run: | + if [ ! -f baseline.json ]; then + echo "::warning::No benchmark baseline found. Merge to main to generate one." + exit 0 + fi + critcmp --export current > pr.json + echo 'result<> $GITHUB_OUTPUT + critcmp baseline.json pr.json >> $GITHUB_OUTPUT + echo 'EOF' >> $GITHUB_OUTPUT + + - name: Find existing comment + if: github.event_name == 'pull_request' && steps.compare.outputs.result + uses: peter-evans/find-comment@v3 + id: find_comment + with: + issue-number: ${{ github.event.pull_request.number }} + comment-author: github-actions[bot] + body-includes: "## Benchmark Comparison" + + - name: Post or update comment + if: github.event_name == 'pull_request' && steps.compare.outputs.result + uses: peter-evans/create-or-update-comment@v4 + with: + issue-number: ${{ github.event.pull_request.number }} + comment-id: ${{ steps.find_comment.outputs.comment-id }} + edit-mode: replace + body: | + ## Benchmark Comparison + +
+ Click to expand + + ``` + ${{ steps.compare.outputs.result }} + ``` + +