Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion benches/dependencies.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

use std::time::{Duration, Instant};

use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
#[cfg(feature = "profile")]
use pprof::criterion::{Output, PProfProfiler};
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -59,6 +59,7 @@ fn bench_dep_chain_submit(c: &mut Criterion) {
let mut group = c.benchmark_group("dep_chain_submit");

for depth in [10usize, 50, 200] {
group.throughput(Throughput::Elements(depth as u64));
group.bench_with_input(BenchmarkId::from_parameter(depth), &depth, |b, &depth| {
b.to_async(&rt).iter_custom(|iters| async move {
let mut total = Duration::ZERO;
Expand Down Expand Up @@ -103,6 +104,7 @@ fn bench_dep_chain_dispatch(c: &mut Criterion) {
group.sample_size(20);

for depth in [10usize, 25, 50] {
group.throughput(Throughput::Elements(depth as u64));
group.bench_with_input(BenchmarkId::from_parameter(depth), &depth, |b, &depth| {
b.to_async(&rt).iter_custom(|iters| async move {
let mut total = Duration::ZERO;
Expand Down Expand Up @@ -168,6 +170,7 @@ fn bench_dep_fan_in_dispatch(c: &mut Criterion) {
group.sample_size(20);

for width in [10usize, 50, 100] {
group.throughput(Throughput::Elements((width + 1) as u64));
group.bench_with_input(BenchmarkId::from_parameter(width), &width, |b, &width| {
b.to_async(&rt).iter_custom(|iters| async move {
let mut total = Duration::ZERO;
Expand Down
13 changes: 10 additions & 3 deletions benches/groups.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

use std::time::{Duration, Instant};

use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
use serde::{Deserialize, Serialize};
use taskmill::{
Domain, DomainKey, DomainTaskContext, Scheduler, SchedulerEvent, TaskError, TaskStore,
Expand Down Expand Up @@ -62,7 +62,9 @@ async fn dispatch_all(sched: &Scheduler, expected: usize) {
fn bench_dispatch_no_groups(c: &mut Criterion) {
let rt = Runtime::new().unwrap();

c.bench_function("dispatch_no_groups_500", |b| {
let mut group = c.benchmark_group("dispatch_no_groups");
group.throughput(Throughput::Elements(500));
group.bench_function("500", |b| {
b.to_async(&rt).iter_custom(|iters| async move {
let mut total = Duration::ZERO;
for _ in 0..iters {
Expand All @@ -89,14 +91,17 @@ fn bench_dispatch_no_groups(c: &mut Criterion) {
total
});
});
group.finish();
}

/// 500 tasks all in a single group with a high limit (no throttling).
/// The gate performs a group-map lookup on every dispatch.
fn bench_dispatch_one_group(c: &mut Criterion) {
let rt = Runtime::new().unwrap();

c.bench_function("dispatch_one_group_500", |b| {
let mut group = c.benchmark_group("dispatch_one_group");
group.throughput(Throughput::Elements(500));
group.bench_function("500", |b| {
b.to_async(&rt).iter_custom(|iters| async move {
let mut total = Duration::ZERO;
for _ in 0..iters {
Expand Down Expand Up @@ -128,6 +133,7 @@ fn bench_dispatch_one_group(c: &mut Criterion) {
total
});
});
group.finish();
}

/// Gate check overhead as the number of tracked groups grows.
Expand All @@ -138,6 +144,7 @@ fn bench_dispatch_group_scaling(c: &mut Criterion) {
let mut group = c.benchmark_group("dispatch_group_scaling");

for n_groups in [1usize, 10, 50, 100] {
group.throughput(Throughput::Elements(500));
group.bench_with_input(
BenchmarkId::from_parameter(n_groups),
&n_groups,
Expand Down
5 changes: 4 additions & 1 deletion benches/history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

use std::time::Duration;

use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
use serde::{Deserialize, Serialize};
use taskmill::{
Domain, DomainKey, DomainTaskContext, Scheduler, SchedulerEvent, TaskError, TaskStore,
Expand Down Expand Up @@ -77,6 +77,7 @@ async fn build_scheduler_with_history(n: usize) -> Scheduler {
fn bench_history_query(c: &mut Criterion) {
let rt = Runtime::new().unwrap();
let mut group = c.benchmark_group("history_query");
group.throughput(Throughput::Elements(1));
group.sample_size(20);
group.measurement_time(Duration::from_secs(30));

Expand Down Expand Up @@ -109,6 +110,7 @@ fn bench_history_query(c: &mut Criterion) {
fn bench_history_stats(c: &mut Criterion) {
let rt = Runtime::new().unwrap();
let mut group = c.benchmark_group("history_stats");
group.throughput(Throughput::Elements(1));
group.sample_size(20);
group.measurement_time(Duration::from_secs(30));

Expand Down Expand Up @@ -141,6 +143,7 @@ fn bench_history_stats(c: &mut Criterion) {
fn bench_history_by_type(c: &mut Criterion) {
let rt = Runtime::new().unwrap();
let mut group = c.benchmark_group("history_by_type");
group.throughput(Throughput::Elements(1));
group.sample_size(20);
group.measurement_time(Duration::from_secs(30));

Expand Down
9 changes: 7 additions & 2 deletions benches/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

use std::time::{Duration, Instant};

use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion};
use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
use serde::{Deserialize, Serialize};
use taskmill::{
BackoffStrategy, Domain, DomainKey, DomainTaskContext, RetryPolicy, Scheduler, SchedulerEvent,
Expand Down Expand Up @@ -94,6 +94,7 @@ fn bench_backoff_delay_computation(c: &mut Criterion) {
];

let mut group = c.benchmark_group("backoff_delay");
group.throughput(Throughput::Elements(20));
for (name, strategy) in strategies {
group.bench_with_input(
BenchmarkId::from_parameter(name),
Expand All @@ -115,7 +116,9 @@ fn bench_backoff_delay_computation(c: &mut Criterion) {
fn bench_dispatch_permanent_failure(c: &mut Criterion) {
let rt = Runtime::new().unwrap();

c.bench_function("dispatch_permanent_failure_500", |b| {
let mut group = c.benchmark_group("dispatch_permanent_failure");
group.throughput(Throughput::Elements(500));
group.bench_function("500", |b| {
b.to_async(&rt).iter_custom(|iters| async move {
let mut total = Duration::ZERO;
for _ in 0..iters {
Expand Down Expand Up @@ -160,6 +163,7 @@ fn bench_dispatch_permanent_failure(c: &mut Criterion) {
total
});
});
group.finish();
}

/// E2E: retryable failure path across all 4 backoff strategies.
Expand All @@ -168,6 +172,7 @@ fn bench_dispatch_permanent_failure(c: &mut Criterion) {
fn bench_dispatch_retryable_dead_letter(c: &mut Criterion) {
let rt = Runtime::new().unwrap();
let mut group = c.benchmark_group("retryable_dead_letter");
group.throughput(Throughput::Elements(100));

let strategies: &[(&str, BackoffStrategy)] = &[
(
Expand Down
35 changes: 28 additions & 7 deletions benches/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

use std::time::{Duration, Instant};

use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
use serde::{Deserialize, Serialize};
use taskmill::{
Domain, DomainKey, DomainTaskContext, Priority, Scheduler, SchedulerEvent, TaskError,
Expand Down Expand Up @@ -91,7 +91,9 @@ async fn build_scheduler(max_concurrency: usize) -> Scheduler {
fn bench_submit(c: &mut Criterion) {
let rt = Runtime::new().unwrap();

c.bench_function("submit_1000_tasks", |b| {
let mut group = c.benchmark_group("submit_tasks");
group.throughput(Throughput::Elements(1000));
group.bench_function("1000", |b| {
b.to_async(&rt).iter_custom(|iters| async move {
let mut total = Duration::ZERO;
for _ in 0..iters {
Expand All @@ -108,12 +110,15 @@ fn bench_submit(c: &mut Criterion) {
total
});
});
group.finish();
}

fn bench_submit_dedup_hit(c: &mut Criterion) {
let rt = Runtime::new().unwrap();

c.bench_function("submit_dedup_hit_1000", |b| {
let mut group = c.benchmark_group("submit_dedup_hit");
group.throughput(Throughput::Elements(999));
group.bench_function("1000", |b| {
b.to_async(&rt).iter_custom(|iters| async move {
let mut total = Duration::ZERO;
for _ in 0..iters {
Expand All @@ -136,12 +141,15 @@ fn bench_submit_dedup_hit(c: &mut Criterion) {
total
});
});
group.finish();
}

fn bench_dispatch_and_complete(c: &mut Criterion) {
let rt = Runtime::new().unwrap();

c.bench_function("dispatch_and_complete_1000", |b| {
let mut group = c.benchmark_group("dispatch_and_complete");
group.throughput(Throughput::Elements(1000));
group.bench_function("1000", |b| {
b.to_async(&rt).iter_custom(|iters| async move {
let mut total = Duration::ZERO;
for _ in 0..iters {
Expand Down Expand Up @@ -177,11 +185,13 @@ fn bench_dispatch_and_complete(c: &mut Criterion) {
total
});
});
group.finish();
}

fn bench_peek_next_varying_depth(c: &mut Criterion) {
let rt = Runtime::new().unwrap();
let mut group = c.benchmark_group("peek_next");
group.throughput(Throughput::Elements(1));

for size in [100, 1000, 5000] {
let store = rt.block_on(async {
Expand Down Expand Up @@ -215,6 +225,7 @@ fn bench_peek_next_varying_depth(c: &mut Criterion) {
fn bench_concurrency_scaling(c: &mut Criterion) {
let rt = Runtime::new().unwrap();
let mut group = c.benchmark_group("concurrency_scaling");
group.throughput(Throughput::Elements(500));

for concurrency in [1, 2, 4, 8] {
group.bench_with_input(
Expand Down Expand Up @@ -265,7 +276,9 @@ fn bench_concurrency_scaling(c: &mut Criterion) {
fn bench_batch_submit(c: &mut Criterion) {
let rt = Runtime::new().unwrap();

c.bench_function("batch_submit_1000", |b| {
let mut group = c.benchmark_group("batch_submit");
group.throughput(Throughput::Elements(1000));
group.bench_function("1000", |b| {
b.to_async(&rt).iter_custom(|iters| async move {
let mut total = Duration::ZERO;
for _ in 0..iters {
Expand All @@ -280,12 +293,15 @@ fn bench_batch_submit(c: &mut Criterion) {
total
});
});
group.finish();
}

fn bench_mixed_priority_dispatch(c: &mut Criterion) {
let rt = Runtime::new().unwrap();

c.bench_function("mixed_priority_dispatch_500", |b| {
let mut group = c.benchmark_group("mixed_priority_dispatch");
group.throughput(Throughput::Elements(500));
group.bench_function("500", |b| {
b.to_async(&rt).iter_custom(|iters| async move {
let mut total = Duration::ZERO;
for _ in 0..iters {
Expand Down Expand Up @@ -334,11 +350,13 @@ fn bench_mixed_priority_dispatch(c: &mut Criterion) {
total
});
});
group.finish();
}

fn bench_byte_progress_overhead(c: &mut Criterion) {
let rt = Runtime::new().unwrap();
let mut group = c.benchmark_group("byte_progress");
group.throughput(Throughput::Elements(500));

// Baseline: NoopExecutor (no byte reporting).
group.bench_function("noop_500", |b| {
Expand Down Expand Up @@ -435,7 +453,9 @@ fn bench_byte_progress_overhead(c: &mut Criterion) {
fn bench_byte_progress_snapshot(c: &mut Criterion) {
let rt = Runtime::new().unwrap();

c.bench_function("byte_progress_snapshot_100_tasks", |b| {
let mut group = c.benchmark_group("byte_progress_snapshot");
group.throughput(Throughput::Elements(100));
group.bench_function("100_tasks", |b| {
b.to_async(&rt).iter_custom(|iters| async move {
let mut total = Duration::ZERO;
for _ in 0..iters {
Expand Down Expand Up @@ -484,6 +504,7 @@ fn bench_byte_progress_snapshot(c: &mut Criterion) {
total
});
});
group.finish();
}

criterion_group!(
Expand Down
6 changes: 5 additions & 1 deletion benches/tags.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

use std::time::{Duration, Instant};

use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
use serde::{Deserialize, Serialize};
use taskmill::{
Domain, DomainKey, DomainTaskContext, Scheduler, TaskError, TaskStore, TaskSubmission,
Expand Down Expand Up @@ -59,6 +59,7 @@ async fn store_with_tagged_tasks(n: usize) -> TaskStore {
fn bench_submit_with_tags(c: &mut Criterion) {
let rt = Runtime::new().unwrap();
let mut group = c.benchmark_group("submit_with_tags");
group.throughput(Throughput::Elements(500));

for tag_count in [0usize, 5, 10, 20] {
group.bench_with_input(
Expand Down Expand Up @@ -100,6 +101,7 @@ fn bench_submit_with_tags(c: &mut Criterion) {
fn bench_query_by_tags(c: &mut Criterion) {
let rt = Runtime::new().unwrap();
let mut group = c.benchmark_group("query_by_tags");
group.throughput(Throughput::Elements(1));

for queue_depth in [100usize, 1000, 5000] {
let store = rt.block_on(store_with_tagged_tasks(queue_depth));
Expand Down Expand Up @@ -133,6 +135,7 @@ fn bench_query_by_tags(c: &mut Criterion) {
fn bench_count_by_tags(c: &mut Criterion) {
let rt = Runtime::new().unwrap();
let mut group = c.benchmark_group("count_by_tags");
group.throughput(Throughput::Elements(1));

for queue_depth in [100usize, 1000, 5000] {
let store = rt.block_on(store_with_tagged_tasks(queue_depth));
Expand Down Expand Up @@ -166,6 +169,7 @@ fn bench_count_by_tags(c: &mut Criterion) {
fn bench_tag_values_scan(c: &mut Criterion) {
let rt = Runtime::new().unwrap();
let mut group = c.benchmark_group("tag_values");
group.throughput(Throughput::Elements(1));

for queue_depth in [100usize, 1000, 5000] {
let store = rt.block_on(async {
Expand Down
Loading