3 changes: 3 additions & 0 deletions .gitignore
@@ -1,2 +1,5 @@
/target
Cargo.lock
.devcontainer
plans
.claude
1 change: 1 addition & 0 deletions Cargo.toml
@@ -23,6 +23,7 @@ chrono = { version = "0.4", features = ["serde"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
sha2 = "0.10"
fastrand = "2"
sysinfo = { version = "0.33", optional = true }

[dev-dependencies]
82 changes: 81 additions & 1 deletion docs/migrating-to-0.4.md
@@ -81,7 +81,7 @@ match event {
```rust
match event {
    SchedulerEvent::Completed(header) => { /* header.task_id, header.label, ... */ }
    SchedulerEvent::Failed { header, error, will_retry } => { ... }
    SchedulerEvent::Failed { header, error, will_retry, retry_after } => { ... }
    SchedulerEvent::Progress { header, percent, message } => { ... }
}

@@ -116,3 +116,83 @@ let sub = TaskSubmission::from_typed(&task);
```

Remove any `?` operators on `payload_json()` or `from_typed()` calls. Errors are still caught before the task is persisted — they just surface at submit time instead.

## Adaptive retry with configurable backoff

### `SchedulerEvent::Failed` gains `retry_after` field

The `Failed` event variant now carries a `retry_after: Option<Duration>` field giving the delay before the next retry. Update any pattern that lists the variant's fields without a trailing `..`:

**Before:**
```rust
SchedulerEvent::Failed { header, error, will_retry } => { ... }
```

**After:**
```rust
SchedulerEvent::Failed { header, error, will_retry, retry_after } => { ... }
// retry_after is Some(duration) when backoff is active, None for immediate retry or permanent failure
```
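
As an illustration of how a consumer might branch on the new field, here is a minimal sketch. `Event` is a stand-in enum defined only for this example (the real `SchedulerEvent::Failed` also carries a `header`, and its `error` is a `TaskError`, not a `String`), and `describe` is a hypothetical helper, not part of the crate.

```rust
use std::time::Duration;

// Stand-in for the crate's event type, reduced to the fields discussed above.
// This is an assumption for illustration, not the real `SchedulerEvent`.
enum Event {
    Failed {
        error: String,
        will_retry: bool,
        retry_after: Option<Duration>,
    },
}

// Hypothetical helper: turn a failure event into a one-line summary.
fn describe(event: &Event) -> String {
    match event {
        // Backoff is active: the task is requeued after `delay`.
        Event::Failed { error, will_retry: true, retry_after: Some(delay) } => {
            format!("{error}: retrying in {}s", delay.as_secs())
        }
        // Requeued without a delay.
        Event::Failed { error, will_retry: true, retry_after: None } => {
            format!("{error}: retrying immediately")
        }
        // Not requeued; `retry_after` carries no information here.
        Event::Failed { error, will_retry: false, .. } => {
            format!("{error}: not retrying")
        }
    }
}
```

Code that does not care about the delay can keep matching with `..` and ignore the field entirely.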

### New `SchedulerEvent::DeadLettered` variant

A new event variant is emitted when a task exhausts its retries:

```rust
SchedulerEvent::DeadLettered { header, error, retry_count } => {
    // Task failed with a retryable error but hit its max_retries limit.
    // Use scheduler.retry_dead_letter(header.task_id) to re-submit.
}
```

Add a match arm for this variant if your match is exhaustive.

### `HistoryStatus` gains `DeadLetter` variant

Tasks that exhaust their retries now receive `HistoryStatus::DeadLetter` instead of `HistoryStatus::Failed`. This distinguishes "might succeed if retried" from "permanently broken." Add a match arm for `DeadLetter` in any exhaustive match on `HistoryStatus`.

### `TaskError` gains `retry_after_ms` field

`TaskError` has a new `retry_after_ms: Option<u64>` field. If you construct `TaskError` via struct literals, add `retry_after_ms: None`. The existing constructors (`new`, `retryable`, `permanent`, `cancelled`) are unaffected.

Executors can now signal a retry delay:

```rust
Err(TaskError::retryable("rate limited").retry_after(Duration::from_secs(60)))
```
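
To make the relationship between the `retry_after(...)` builder call and the `retry_after_ms` field concrete, here is a rough sketch using a stand-in type. `SketchError` and its fields other than `retry_after_ms` are assumptions made for this example; the crate's actual `TaskError` layout and conversion may differ.

```rust
use std::time::Duration;

// Stand-in error type for illustration only; not the crate's `TaskError`.
#[derive(Debug, PartialEq)]
struct SketchError {
    message: String,
    retryable: bool,
    retry_after_ms: Option<u64>,
}

impl SketchError {
    // Mirrors the shape of a `retryable(...)` constructor: no delay by default.
    fn retryable(message: impl Into<String>) -> Self {
        Self { message: message.into(), retryable: true, retry_after_ms: None }
    }

    // Builder-style setter: store the requested delay as whole milliseconds,
    // saturating at `u64::MAX` instead of truncating on overflow.
    fn retry_after(mut self, delay: Duration) -> Self {
        self.retry_after_ms = Some(u64::try_from(delay.as_millis()).unwrap_or(u64::MAX));
        self
    }
}
```

Under this sketch, `SketchError::retryable("rate limited").retry_after(Duration::from_secs(60))` stores `Some(60_000)`, while an error built without the setter keeps `None`.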

### New builder methods (non-breaking)

`SchedulerBuilder` gains two new methods for per-type retry policies:

```rust
// Register with a retry policy (backoff strategy + max_retries)
.executor_with_retry_policy("api-call", Arc::new(ApiExecutor), RetryPolicy {
    strategy: BackoffStrategy::Exponential {
        initial: Duration::from_secs(1),
        max: Duration::from_secs(300),
        multiplier: 2.0,
    },
    max_retries: 5,
})

// Register with both TTL and retry policy
.executor_with_options("upload", Arc::new(UploadExecutor),
    Some(Duration::from_secs(600)), // TTL
    Some(RetryPolicy::default()),   // retry policy
)
```
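
For intuition about what the `Exponential` parameters mean, the sketch below computes a capped exponential delay. It assumes the conventional formula `min(initial * multiplier^attempt, max)` with a zero-based attempt counter; the crate's actual computation is not shown in this guide and may differ, and `exponential_delay` is a hypothetical function written for this example.

```rust
use std::time::Duration;

// Capped exponential backoff: initial * multiplier^attempt, clamped to `max`.
// `attempt` is zero-based, so the first retry waits exactly `initial`.
fn exponential_delay(initial: Duration, max: Duration, multiplier: f64, attempt: u32) -> Duration {
    let scaled = initial.as_secs_f64() * multiplier.powi(attempt as i32);
    // Clamp in floating point first so very large attempts cannot overflow `Duration`.
    Duration::from_secs_f64(scaled.min(max.as_secs_f64()))
}
```

With the values from the `"api-call"` example (`initial` of 1s, `max` of 300s, `multiplier` of 2.0), this formula yields 1s, 2s, 4s, 8s and 16s across the five permitted retries; the 300s cap would only take effect from the tenth attempt onward.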

### New dead-letter query and resubmit APIs (non-breaking)

```rust
// Query tasks that exhausted retries
let dead = scheduler.dead_letter_tasks(10, 0).await?;

// Re-submit a dead-lettered task (resets retry count)
scheduler.retry_dead_letter(task_history_id).await?;
```

### Schema migration

Migration `008_retry_backoff.sql` adds a nullable `max_retries INTEGER` column to both `tasks` and `task_history`. Existing tasks read back `max_retries = None` and fall back to the global `SchedulerConfig::max_retries`.
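
The fallback described above amounts to a single `Option` resolution. The sketch below shows the idea; the function name and the `i32` types are assumptions for illustration, since the Rust types behind the column and `SchedulerConfig::max_retries` are not stated here.

```rust
// Resolve the effective retry limit for a task row.
// `stored` is the nullable `max_retries` column; `global_default` stands in
// for `SchedulerConfig::max_retries`.
fn effective_max_retries(stored: Option<i32>, global_default: i32) -> i32 {
    // Rows created before the migration read back NULL and use the global value.
    stored.unwrap_or(global_default)
}
```

A pre-migration row therefore behaves exactly as it did before, while rows written after the migration keep the limit recorded for them.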
7 changes: 4 additions & 3 deletions docs/progress-and-events.md
@@ -62,7 +62,7 @@ tokio::spawn(async move {
SchedulerEvent::Completed(header) => {
mark_done(header.task_id);
}
SchedulerEvent::Failed { header, error, will_retry } => {
SchedulerEvent::Failed { header, error, will_retry, .. } => {
if !will_retry {
show_error(header.task_id, error);
}
@@ -79,7 +79,7 @@ tokio::spawn(async move {
|-------|---------------|
| `Dispatched(TaskEventHeader)` | Task picked from queue and executor spawned |
| `Completed(TaskEventHeader)` | Task finished successfully |
| `Failed { header, error, will_retry }` | Task failed — `will_retry` tells you if it's being requeued |
| `Failed { header, error, will_retry, retry_after }` | Task failed; `will_retry` tells you whether it is being requeued, and `retry_after` is the backoff delay (`None` for an immediate retry or a permanent failure) |
| `Preempted(TaskEventHeader)` | Task paused for higher-priority work |
| `Cancelled(TaskEventHeader)` | Task cancelled via `scheduler.cancel()` |
| `Progress { header, percent, message }` | Progress update from executor |
@@ -88,6 +88,7 @@ tokio::spawn(async move {
| `RecurringSkipped { header, reason }` | A recurring instance was skipped (e.g., pile-up prevention) |
| `RecurringCompleted { header, occurrences }` | A recurring schedule finished all its occurrences |
| `TaskUnblocked { task_id }` | A blocked task's dependencies are all satisfied — it transitions to `pending` |
| `DeadLettered { header, error, retry_count }` | Task exhausted all retries — can be re-submitted via `retry_dead_letter()` |
| `DependencyFailed { task_id, failed_dependency }` | A blocked task was cancelled because a dependency failed permanently |
| `Paused` | Scheduler globally paused via `pause_all()` |
| `Resumed` | Scheduler resumed via `resume_all()` |
@@ -100,7 +101,7 @@ Task-specific events share a `TaskEventHeader` with `task_id`, `task_type`, `key
|-----------------------|-----------|
| A progress bar | `Progress`, `Completed`, `Failed` |
| An activity log | All events |
| Error alerting | `Failed` where `will_retry` is false |
| Error alerting | `Failed` where `will_retry` is false, `DeadLettered` |
| A "pause/resume" button | `Paused`, `Resumed` |
| Upload status indicators | `Dispatched`, `Progress`, `Completed`, `Failed`, `Preempted` |
| Stale task cleanup UI | `TaskExpired` |
5 changes: 5 additions & 0 deletions migrations/008_retry_backoff.sql
@@ -0,0 +1,5 @@
-- Per-task max_retries (resolved from per-type policy or global default at submit time).
-- NULL means "use global default" — for backward compatibility with tasks
-- created before this migration.
ALTER TABLE tasks ADD COLUMN max_retries INTEGER;
ALTER TABLE task_history ADD COLUMN max_retries INTEGER;
33 changes: 20 additions & 13 deletions src/lib.rs
@@ -10,7 +10,10 @@
//! - Monitors system CPU, disk, and network throughput to adjust concurrency
//! - Supports [composable backpressure](PressureSource) from arbitrary external sources
//! - Preempts lower-priority work when high-priority tasks arrive
//! - [Retries](TaskError::retryable) failed tasks at the same priority level
//! - [Retries](TaskError::retryable) failed tasks with configurable [backoff](BackoffStrategy)
//! ([`Constant`](BackoffStrategy::Constant), [`Linear`](BackoffStrategy::Linear),
//! [`Exponential`](BackoffStrategy::Exponential), [`ExponentialJitter`](BackoffStrategy::ExponentialJitter))
//! and per-type [retry policies](RetryPolicy)
//! - Records completed/failed [task history](TaskHistoryRecord) for queries and IO learning
//! - Supports [batch submission](Scheduler::submit_batch) with intra-batch dedup and chunking
//! - Emits [lifecycle events](SchedulerEvent) including progress for UI integration
@@ -28,11 +31,11 @@
//!
//! ```text
//! submit → blocked ─(deps met)─→ pending ──────────────→ running → completed
//! ↑ ↓ ↘ paused ↗ ↘ failed (retryable → pending)
//! ↑ ↓ ↘ paused ↗ ↘ failed (retryable → pending, with backoff delay)
//! (run_after elapsed) ↘ failed (permanent → history)
//! │ ↘ cancelled (via cancel() or supersede)
//! pending (gated) ↘ expired (TTL, cascade to children)
//! cancelled
//! │ ↘ dead_letter (retries exhausted → history)
//! pending (gated) ↘ cancelled (via cancel() or supersede)
//! cancelled ↘ expired (TTL, cascade to children)
//! superseded
//! expired (TTL)
//! blocked ─(dep failed)─→ dep_failed (history)
@@ -49,8 +52,12 @@
//! progress reporter.
//! 4. **Terminal** — on success the task moves to the history table. On failure,
//! a [`retryable`](TaskError::retryable) error requeues it (up to
//! [`SchedulerBuilder::max_retries`]); a non-retryable error moves it to
//! history as failed.
//! [`SchedulerBuilder::max_retries`] or per-type [`RetryPolicy::max_retries`])
//! with a configurable [`BackoffStrategy`] delay; a non-retryable
//! ([`permanent`](TaskError::permanent)) error moves it to history as failed.
//! Tasks that exhaust all retries enter [`dead_letter`](HistoryStatus::DeadLetter)
//! state — queryable and manually re-submittable via
//! [`Scheduler::retry_dead_letter`].
//!
//! ## Deduplication & duplicate strategies
//!
@@ -282,7 +289,7 @@
//! For long-running transfers (file copies, uploads, downloads), executors can
//! report byte-level progress via [`TaskContext::set_bytes_total`] and
//! [`TaskContext::add_bytes`]. The scheduler maintains per-task atomic counters
//! on the [`IoTracker`](registry::IoTracker) — updates are lock-free and
//! on the `IoTracker` — updates are lock-free and
//! impose no overhead on the executor hot path.
//!
//! When [`SchedulerBuilder::progress_interval`] is set, a background ticker
@@ -792,11 +799,11 @@ pub use scheduler::{
};
pub use store::{RetentionPolicy, StoreConfig, StoreError, TaskStore};
pub use task::{
    generate_dedup_key, BatchOutcome, BatchSubmission, DependencyFailurePolicy, DuplicateStrategy,
    HistoryStatus, IoBudget, ParentResolution, RecurringSchedule, RecurringScheduleInfo,
    SubmitOutcome, TaskError, TaskHistoryRecord, TaskLookup, TaskRecord, TaskStatus,
    TaskSubmission, TtlFrom, TypeStats, TypedTask, MAX_TAGS_PER_TASK, MAX_TAG_KEY_LEN,
    MAX_TAG_VALUE_LEN,
    generate_dedup_key, BackoffStrategy, BatchOutcome, BatchSubmission, DependencyFailurePolicy,
    DuplicateStrategy, HistoryStatus, IoBudget, ParentResolution, RecurringSchedule,
    RecurringScheduleInfo, RetryPolicy, SubmitOutcome, TaskError, TaskHistoryRecord, TaskLookup,
    TaskRecord, TaskStatus, TaskSubmission, TtlFrom, TypeStats, TypedTask, MAX_TAGS_PER_TASK,
    MAX_TAG_KEY_LEN, MAX_TAG_VALUE_LEN,
};

#[cfg(feature = "sysinfo-monitor")]
80 changes: 80 additions & 0 deletions src/registry/mod.rs
@@ -18,6 +18,7 @@ use std::collections::HashMap;
use std::future::Future;
use std::sync::Arc;

use crate::task::retry::RetryPolicy;
use crate::task::TaskError;

pub(crate) use child_spawner::{ChildSpawner, ParentContext};
@@ -104,6 +105,7 @@ pub trait TaskExecutor: Send + Sync + 'static {
pub struct TaskTypeRegistry {
    types: HashMap<String, Arc<dyn ErasedExecutor>>,
    type_ttls: HashMap<String, std::time::Duration>,
    type_retry_policies: HashMap<String, RetryPolicy>,
}

/// Object-safe wrapper around [`TaskExecutor`] for dynamic dispatch in the registry.
@@ -157,6 +159,7 @@ impl TaskTypeRegistry {
        Self {
            types: HashMap::new(),
            type_ttls: HashMap::new(),
            type_retry_policies: HashMap::new(),
        }
    }

@@ -183,11 +186,27 @@ impl TaskTypeRegistry {
        self.type_ttls.insert(name.to_string(), ttl);
    }

    /// Register an executor with a per-type retry policy.
    pub fn register_with_retry_policy<E: TaskExecutor>(
        &mut self,
        name: &str,
        executor: Arc<E>,
        policy: RetryPolicy,
    ) {
        self.register(name, executor);
        self.type_retry_policies.insert(name.to_string(), policy);
    }

    /// Look up the per-type default TTL for a task type.
    pub fn type_ttl(&self, name: &str) -> Option<&std::time::Duration> {
        self.type_ttls.get(name)
    }

    /// Look up the per-type retry policy for a task type.
    pub fn type_retry_policy(&self, name: &str) -> Option<&RetryPolicy> {
        self.type_retry_policies.get(name)
    }

    /// Look up the executor for a task type.
    pub(crate) fn get(&self, name: &str) -> Option<&Arc<dyn ErasedExecutor>> {
        self.types.get(name)
@@ -227,6 +246,22 @@ impl TaskTypeRegistry {
        self.register_erased(name, executor);
        self.type_ttls.insert(name.to_string(), ttl);
    }

    /// Set a retry policy for an already-registered task type.
    pub(crate) fn set_retry_policy(&mut self, name: &str, policy: RetryPolicy) {
        self.type_retry_policies.insert(name.to_string(), policy);
    }

    /// Register a pre-erased executor with a per-type retry policy.
    pub(crate) fn register_erased_with_retry_policy(
        &mut self,
        name: &str,
        executor: Arc<dyn ErasedExecutor>,
        policy: RetryPolicy,
    ) {
        self.register_erased(name, executor);
        self.type_retry_policies.insert(name.to_string(), policy);
    }
}

impl Default for TaskTypeRegistry {
@@ -238,6 +273,7 @@ impl Default for TaskTypeRegistry {
#[cfg(test)]
mod tests {
    use super::*;
    use crate::task::retry::{BackoffStrategy, RetryPolicy};

    struct NoopExecutor;

@@ -264,4 +300,48 @@ mod tests {
        reg.register("dup", Arc::new(NoopExecutor));
        reg.register("dup", Arc::new(NoopExecutor));
    }

    #[test]
    fn register_with_retry_policy_stores_policy() {
        let mut reg = TaskTypeRegistry::new();
        let policy = RetryPolicy {
            strategy: BackoffStrategy::Exponential {
                initial: std::time::Duration::from_secs(1),
                max: std::time::Duration::from_secs(60),
                multiplier: 2.0,
            },
            max_retries: 5,
        };
        reg.register_with_retry_policy("api-call", Arc::new(NoopExecutor), policy);

        assert!(reg.get("api-call").is_some());
        let retrieved = reg.type_retry_policy("api-call").unwrap();
        assert_eq!(retrieved.max_retries, 5);
    }

    #[test]
    fn type_retry_policy_returns_none_for_missing() {
        let mut reg = TaskTypeRegistry::new();
        reg.register("plain", Arc::new(NoopExecutor));

        assert!(reg.type_retry_policy("plain").is_none());
        assert!(reg.type_retry_policy("nonexistent").is_none());
    }

    #[test]
    fn register_erased_with_retry_policy_stores_policy() {
        let mut reg = TaskTypeRegistry::new();
        let policy = RetryPolicy {
            strategy: BackoffStrategy::Constant {
                delay: std::time::Duration::from_secs(10),
            },
            max_retries: 7,
        };
        let executor: Arc<dyn ErasedExecutor> = Arc::new(NoopExecutor);
        reg.register_erased_with_retry_policy("erased-type", executor, policy);

        assert!(reg.get("erased-type").is_some());
        let retrieved = reg.type_retry_policy("erased-type").unwrap();
        assert_eq!(retrieved.max_retries, 7);
    }
}
2 changes: 1 addition & 1 deletion src/resource/network_pressure.rs
@@ -1,4 +1,4 @@
//! Built-in [`PressureSource`](crate::PressureSource) derived from network bandwidth utilization.
//! Built-in [`PressureSource`] derived from network bandwidth utilization.
//!
//! [`NetworkPressure`] reads the latest [`ResourceSnapshot`](crate::ResourceSnapshot)
//! and computes pressure as the ratio of observed network throughput to a