Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .github/workflows/benchmarks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,11 @@ jobs:
- name: Install critcmp
run: cargo install critcmp

- name: Build benchmarks
run: cargo bench --all-features --no-run

- name: Run benchmarks
run: cargo bench --all-features -- --save-baseline current --output-format bencher 2>&1 | tee bench-output.txt
run: cargo bench --all-features -- --save-baseline current --output-format bencher 2>/dev/null | tee bench-output.txt

# ── Push to main: update timeline + cache baseline ──

Expand Down
50 changes: 34 additions & 16 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,36 @@ Read more about the [motivation and use cases](docs/why-taskmill.md).
## Quick example

```rust
use std::sync::Arc;
use tokio_util::sync::CancellationToken;
use serde::{Serialize, Deserialize};
use taskmill::{
Module, Scheduler, IoBudget, TaskSubmission, TaskExecutor,
TaskContext, TaskError,
Domain, DomainKey, DomainHandle, Scheduler, TypedExecutor,
TypedTask, TaskTypeConfig, TaskContext, TaskError, IoBudget,
};

// 1. Define a domain and a typed task.
struct Media;
impl DomainKey for Media { const NAME: &'static str = "media"; }

#[derive(Serialize, Deserialize)]
struct Thumbnail { path: String, size: u32 }

impl TypedTask for Thumbnail {
type Domain = Media;
const TASK_TYPE: &'static str = "thumbnail";

fn config() -> TaskTypeConfig {
TaskTypeConfig::new().expected_io(IoBudget::disk(4096, 1024))
}

fn key(&self) -> Option<String> { Some(self.path.clone()) }
}

// 2. Implement a typed executor — no manual deserialization.
struct ThumbnailGenerator;

impl TaskExecutor for ThumbnailGenerator {
async fn execute<'a>(
&'a self, ctx: &'a TaskContext,
) -> Result<(), TaskError> {
impl TypedExecutor<Thumbnail> for ThumbnailGenerator {
async fn execute(&self, thumb: Thumbnail, ctx: &TaskContext) -> Result<(), TaskError> {
ctx.progress().report(0.5, Some("resizing".into()));
ctx.record_read_bytes(4096);
ctx.record_write_bytes(1024);
Expand All @@ -31,21 +48,20 @@ impl TaskExecutor for ThumbnailGenerator {

#[tokio::main]
async fn main() {
// 3. Build the scheduler with a typed domain.
let scheduler = Scheduler::builder()
.store_path("tasks.db")
.module(Module::new("media")
.executor("thumbnail", Arc::new(ThumbnailGenerator)))
.domain(Domain::<Media>::new()
.task::<Thumbnail>(ThumbnailGenerator))
.max_concurrency(8)
.with_resource_monitoring()
.build()
.await
.unwrap();

let media = scheduler.module("media");
let sub = TaskSubmission::new("thumbnail")
.payload_json(&serde_json::json!({"path": "/photos/img.jpg"}))
.expected_io(IoBudget::disk(4096, 1024));
media.submit(sub).await.unwrap();
// 4. Submit via a typed domain handle — compile-time domain enforcement.
let media: DomainHandle<Media> = scheduler.domain::<Media>();
media.submit(Thumbnail { path: "/photos/img.jpg".into(), size: 256 }).await.unwrap();

let token = CancellationToken::new();
scheduler.run(token).await;
Expand Down Expand Up @@ -78,10 +94,12 @@ async fn main() {

| Guide | What it covers |
|-------|----------------|
| [Quick Start](docs/quick-start.md) | Installation, first executor, builder setup, Tauri integration |
| [Quick Start](docs/quick-start.md) | Installation, domains, typed executors, builder setup, Tauri integration |
| [Multi-Module Applications](docs/multi-module-apps.md) | Composing multiple domains, cross-domain patterns, concurrency budgets |
| [Writing a Reusable Module](docs/library-modules.md) | Publishing a domain as a library crate |
| [Priorities & Preemption](docs/priorities-and-preemption.md) | Priority levels, task groups, preemption, and throttle behavior |
| [IO & Backpressure](docs/io-and-backpressure.md) | IO budgets, resource monitoring, pressure sources, and tuning |
| [Progress & Events](docs/progress-and-events.md) | Progress reporting, lifecycle events, dashboard snapshots |
| [Progress & Events](docs/progress-and-events.md) | Progress reporting, lifecycle events, typed event streams |
| [Persistence & Recovery](docs/persistence-and-recovery.md) | Crash recovery, deduplication, history retention |
| [Configuration](docs/configuration.md) | All options, recommended defaults, workload-specific tuning |
| [Query APIs](docs/query-apis.md) | TaskStore queries for dashboards, debugging, and analytics |
Expand Down
10 changes: 7 additions & 3 deletions benches/dependencies.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,21 @@
//!
//! Run with: `cargo bench --bench dependencies`

use std::sync::Arc;
use std::time::Duration;

use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
use taskmill::{
Module, Scheduler, SchedulerEvent, TaskContext, TaskError, TaskExecutor, TaskStore,
Domain, DomainKey, Scheduler, SchedulerEvent, TaskContext, TaskError, TaskExecutor, TaskStore,
TaskSubmission,
};
use tokio::runtime::Runtime;
use tokio_util::sync::CancellationToken;

struct BenchDomain;
impl DomainKey for BenchDomain {
const NAME: &'static str = "bench";
}

struct NoopExecutor;

impl TaskExecutor for NoopExecutor {
Expand All @@ -24,7 +28,7 @@ impl TaskExecutor for NoopExecutor {
async fn build_scheduler(max_concurrency: usize) -> Scheduler {
Scheduler::builder()
.store(TaskStore::open_memory().await.unwrap())
.module(Module::new("bench").executor("test", Arc::new(NoopExecutor)))
.domain(Domain::<BenchDomain>::new().raw_executor("test", NoopExecutor))
.max_concurrency(max_concurrency)
.poll_interval(Duration::from_millis(10))
.build()
Expand Down
14 changes: 9 additions & 5 deletions benches/groups.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,21 @@
//!
//! Run with: `cargo bench --bench groups`

use std::sync::Arc;
use std::time::Duration;

use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
use taskmill::{
Module, Scheduler, SchedulerEvent, TaskContext, TaskError, TaskExecutor, TaskStore,
Domain, DomainKey, Scheduler, SchedulerEvent, TaskContext, TaskError, TaskExecutor, TaskStore,
TaskSubmission,
};
use tokio::runtime::Runtime;
use tokio_util::sync::CancellationToken;

struct BenchDomain;
impl DomainKey for BenchDomain {
const NAME: &'static str = "bench";
}

struct NoopExecutor;

impl TaskExecutor for NoopExecutor {
Expand Down Expand Up @@ -50,7 +54,7 @@ fn bench_dispatch_no_groups(c: &mut Criterion) {
b.to_async(&rt).iter(|| async {
let sched = Scheduler::builder()
.store(TaskStore::open_memory().await.unwrap())
.module(Module::new("bench").executor("test", Arc::new(NoopExecutor)))
.domain(Domain::<BenchDomain>::new().raw_executor("test", NoopExecutor))
.max_concurrency(8)
.poll_interval(Duration::from_millis(10))
.build()
Expand Down Expand Up @@ -78,7 +82,7 @@ fn bench_dispatch_one_group(c: &mut Criterion) {
b.to_async(&rt).iter(|| async {
let sched = Scheduler::builder()
.store(TaskStore::open_memory().await.unwrap())
.module(Module::new("bench").executor("test", Arc::new(NoopExecutor)))
.domain(Domain::<BenchDomain>::new().raw_executor("test", NoopExecutor))
.max_concurrency(8)
.group_concurrency("g0", 500) // high limit — no artificial throttling
.poll_interval(Duration::from_millis(10))
Expand Down Expand Up @@ -120,7 +124,7 @@ fn bench_dispatch_group_scaling(c: &mut Criterion) {

let mut builder = Scheduler::builder()
.store(TaskStore::open_memory().await.unwrap())
.module(Module::new("bench").executor("test", Arc::new(NoopExecutor)))
.domain(Domain::<BenchDomain>::new().raw_executor("test", NoopExecutor))
.max_concurrency(8)
.poll_interval(Duration::from_millis(10));

Expand Down
10 changes: 7 additions & 3 deletions benches/history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,21 @@
//!
//! Run with: `cargo bench --bench history`

use std::sync::Arc;
use std::time::Duration;

use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
use taskmill::{
Module, Scheduler, SchedulerEvent, TaskContext, TaskError, TaskExecutor, TaskStore,
Domain, DomainKey, Scheduler, SchedulerEvent, TaskContext, TaskError, TaskExecutor, TaskStore,
TaskSubmission,
};
use tokio::runtime::Runtime;
use tokio_util::sync::CancellationToken;

struct BenchDomain;
impl DomainKey for BenchDomain {
const NAME: &'static str = "bench";
}

struct NoopExecutor;

impl TaskExecutor for NoopExecutor {
Expand All @@ -25,7 +29,7 @@ impl TaskExecutor for NoopExecutor {
async fn build_scheduler_with_history(n: usize) -> Scheduler {
let sched = Scheduler::builder()
.store(TaskStore::open_memory().await.unwrap())
.module(Module::new("bench").executor("test", Arc::new(NoopExecutor)))
.domain(Domain::<BenchDomain>::new().raw_executor("test", NoopExecutor))
.max_concurrency(32)
.poll_interval(Duration::from_millis(10))
.build()
Expand Down
24 changes: 15 additions & 9 deletions benches/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,23 @@
//!
//! Run with: `cargo bench --bench retry`

use std::sync::Arc;
use std::time::Duration;

use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion};
use taskmill::{
BackoffStrategy, Module, RetryPolicy, Scheduler, SchedulerEvent, TaskContext, TaskError,
TaskExecutor, TaskStore, TaskSubmission,
BackoffStrategy, Domain, DomainKey, RetryPolicy, Scheduler, SchedulerEvent, TaskContext,
TaskError, TaskExecutor, TaskStore, TaskSubmission,
};
use tokio::runtime::Runtime;
use tokio_util::sync::CancellationToken;

// ── Domain Key ─────────────────────────────────────────────────────

struct BenchDomain;
impl DomainKey for BenchDomain {
const NAME: &'static str = "bench";
}

// ── Executors ───────────────────────────────────────────────────────

struct FailPermanentExecutor;
Expand Down Expand Up @@ -95,7 +101,7 @@ fn bench_dispatch_permanent_failure(c: &mut Criterion) {
b.to_async(&rt).iter(|| async {
let sched = Scheduler::builder()
.store(TaskStore::open_memory().await.unwrap())
.module(Module::new("bench").executor("fail", Arc::new(FailPermanentExecutor)))
.domain(Domain::<BenchDomain>::new().raw_executor("fail", FailPermanentExecutor))
.max_concurrency(8)
.max_retries(0)
.poll_interval(Duration::from_millis(10))
Expand Down Expand Up @@ -183,11 +189,11 @@ fn bench_dispatch_retryable_dead_letter(c: &mut Criterion) {
async move {
let sched = Scheduler::builder()
.store(TaskStore::open_memory().await.unwrap())
.module(Module::new("bench").executor_with_retry_policy(
"fail",
Arc::new(FailRetryableExecutor),
policy,
))
.domain(
Domain::<BenchDomain>::new()
.raw_executor("fail", FailRetryableExecutor)
.default_retry(policy),
)
.max_concurrency(8)
.poll_interval(Duration::from_millis(10))
.build()
Expand Down
27 changes: 16 additions & 11 deletions benches/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,21 @@
//!
//! Run with: `cargo bench -p taskmill`

use std::sync::Arc;

use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
use taskmill::{
Module, Priority, Scheduler, SchedulerEvent, TaskContext, TaskError, TaskExecutor, TaskStore,
TaskSubmission,
Domain, DomainKey, Priority, Scheduler, SchedulerEvent, TaskContext, TaskError, TaskExecutor,
TaskStore, TaskSubmission,
};
use tokio::runtime::Runtime;
use tokio_util::sync::CancellationToken;

// ── Domain Key ─────────────────────────────────────────────────────

struct BenchDomain;
impl DomainKey for BenchDomain {
const NAME: &'static str = "bench";
}

// ── Test Executors ──────────────────────────────────────────────────

struct NoopExecutor;
Expand Down Expand Up @@ -46,7 +51,7 @@ impl TaskExecutor for ByteProgressExecutor {
async fn build_scheduler(max_concurrency: usize) -> Scheduler {
Scheduler::builder()
.store(TaskStore::open_memory().await.unwrap())
.module(Module::new("bench").executor("test", Arc::new(NoopExecutor)))
.domain(Domain::<BenchDomain>::new().raw_executor("test", NoopExecutor))
.max_concurrency(max_concurrency)
.poll_interval(std::time::Duration::from_millis(10))
.build()
Expand Down Expand Up @@ -301,12 +306,12 @@ fn bench_byte_progress_overhead(c: &mut Criterion) {
b.to_async(&rt).iter(|| async {
let sched = Scheduler::builder()
.store(TaskStore::open_memory().await.unwrap())
.module(Module::new("bench").executor(
.domain(Domain::<BenchDomain>::new().raw_executor(
"byte-test",
Arc::new(ByteProgressExecutor {
ByteProgressExecutor {
total: 1_048_576,
chunk_size: 1024,
}),
},
))
.max_concurrency(8)
.poll_interval(std::time::Duration::from_millis(10))
Expand Down Expand Up @@ -352,12 +357,12 @@ fn bench_byte_progress_snapshot(c: &mut Criterion) {
b.to_async(&rt).iter(|| async {
let sched = Scheduler::builder()
.store(TaskStore::open_memory().await.unwrap())
.module(Module::new("bench").executor(
.domain(Domain::<BenchDomain>::new().raw_executor(
"byte-test",
Arc::new(ByteProgressExecutor {
ByteProgressExecutor {
total: 10_485_760,
chunk_size: 65_536,
}),
},
))
.max_concurrency(100)
.poll_interval(std::time::Duration::from_millis(10))
Expand Down
10 changes: 7 additions & 3 deletions benches/tags.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,19 @@
//!
//! Run with: `cargo bench --bench tags`

use std::sync::Arc;
use std::time::Duration;

use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
use taskmill::{
Module, Scheduler, TaskContext, TaskError, TaskExecutor, TaskStore, TaskSubmission,
Domain, DomainKey, Scheduler, TaskContext, TaskError, TaskExecutor, TaskStore, TaskSubmission,
};
use tokio::runtime::Runtime;

struct BenchDomain;
impl DomainKey for BenchDomain {
const NAME: &'static str = "bench";
}

struct NoopExecutor;

impl TaskExecutor for NoopExecutor {
Expand Down Expand Up @@ -51,7 +55,7 @@ fn bench_submit_with_tags(c: &mut Criterion) {
b.to_async(&rt).iter(|| async move {
let sched = Scheduler::builder()
.store(TaskStore::open_memory().await.unwrap())
.module(Module::new("bench").executor("test", Arc::new(NoopExecutor)))
.domain(Domain::<BenchDomain>::new().raw_executor("test", NoopExecutor))
.max_concurrency(4)
.poll_interval(Duration::from_millis(10))
.build()
Expand Down
Loading
Loading