diff --git a/.gitignore b/.gitignore index 96ef6c0..3ead7d2 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,5 @@ /target Cargo.lock +.devcontainer +plans +.claude \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml index 1713ba6..6964cac 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,6 +23,7 @@ chrono = { version = "0.4", features = ["serde"] } serde = { version = "1", features = ["derive"] } serde_json = "1" sha2 = "0.10" +fastrand = "2" sysinfo = { version = "0.33", optional = true } [dev-dependencies] diff --git a/docs/migrating-to-0.4.md b/docs/migrating-to-0.4.md index ecf92ce..779487a 100644 --- a/docs/migrating-to-0.4.md +++ b/docs/migrating-to-0.4.md @@ -81,7 +81,7 @@ match event { ```rust match event { SchedulerEvent::Completed(header) => { /* header.task_id, header.label, ... */ } - SchedulerEvent::Failed { header, error, will_retry } => { ... } + SchedulerEvent::Failed { header, error, will_retry, retry_after } => { ... } SchedulerEvent::Progress { header, percent, message } => { ... } } @@ -116,3 +116,83 @@ let sub = TaskSubmission::from_typed(&task); ``` Remove any `?` operators on `payload_json()` or `from_typed()` calls. Errors are still caught before the task is persisted — they just surface at submit time instead. + +## Adaptive retry with configurable backoff + +### `SchedulerEvent::Failed` gains `retry_after` field + +The `Failed` event variant now includes an optional `retry_after: Option<Duration>` field indicating how long until the next retry attempt. Update any exhaustive pattern matches: + +**Before:** +```rust +SchedulerEvent::Failed { header, error, will_retry } => { ... } +``` + +**After:** +```rust +SchedulerEvent::Failed { header, error, will_retry, retry_after } => { ... 
} +// retry_after is Some(duration) when backoff is active, None for immediate retry or permanent failure +``` + +### New `SchedulerEvent::DeadLettered` variant + +A new event variant is emitted when a task exhausts its retries: + +```rust +SchedulerEvent::DeadLettered { header, error, retry_count } => { + // Task failed with a retryable error but hit its max_retries limit. + // Look up its history row with scheduler.dead_letter_tasks(..) and pass that row's id to scheduler.retry_dead_letter(..) to re-submit. +} +``` + +Add a match arm for this variant if your match is exhaustive. + +### `HistoryStatus` gains `DeadLetter` variant + +Tasks that exhaust their retries now receive `HistoryStatus::DeadLetter` instead of `HistoryStatus::Failed`. This distinguishes "might succeed if retried" from "permanently broken." Add a match arm for `DeadLetter` in any exhaustive match on `HistoryStatus`. + +### `TaskError` gains `retry_after_ms` field + +`TaskError` has a new `retry_after_ms: Option<u64>` field. If you construct `TaskError` via struct literals, add `retry_after_ms: None`. The existing constructors (`new`, `retryable`, `permanent`, `cancelled`) are unaffected. 
+ +Executors can now signal a retry delay: + +```rust +Err(TaskError::retryable("rate limited").retry_after(Duration::from_secs(60))) +``` + +### New builder methods (non-breaking) + +`SchedulerBuilder` gains two new methods for per-type retry policies: + +```rust +// Register with a retry policy (backoff strategy + max_retries) +.executor_with_retry_policy("api-call", Arc::new(ApiExecutor), RetryPolicy { + strategy: BackoffStrategy::Exponential { + initial: Duration::from_secs(1), + max: Duration::from_secs(300), + multiplier: 2.0, + }, + max_retries: 5, +}) + +// Register with both TTL and retry policy +.executor_with_options("upload", Arc::new(UploadExecutor), + Some(Duration::from_secs(600)), // TTL + Some(RetryPolicy::default()), // retry policy +) +``` + +### New dead-letter query and resubmit APIs (non-breaking) + +```rust +// Query tasks that exhausted retries +let dead = scheduler.dead_letter_tasks(10, 0).await?; + +// Re-submit a dead-lettered task (resets retry count) +scheduler.retry_dead_letter(task_history_id).await?; +``` + +### Schema migration + +Migration `008_retry_backoff.sql` adds a nullable `max_retries INTEGER` column to both `tasks` and `task_history`. Existing tasks read back `max_retries = None` and fall back to the global `SchedulerConfig::max_retries`. diff --git a/docs/progress-and-events.md b/docs/progress-and-events.md index e0e89b4..bb74d73 100644 --- a/docs/progress-and-events.md +++ b/docs/progress-and-events.md @@ -62,7 +62,7 @@ tokio::spawn(async move { SchedulerEvent::Completed(header) => { mark_done(header.task_id); } - SchedulerEvent::Failed { header, error, will_retry } => { + SchedulerEvent::Failed { header, error, will_retry, .. 
} => { if !will_retry { show_error(header.task_id, error); } @@ -79,7 +79,7 @@ tokio::spawn(async move { |-------|---------------| | `Dispatched(TaskEventHeader)` | Task picked from queue and executor spawned | | `Completed(TaskEventHeader)` | Task finished successfully | -| `Failed { header, error, will_retry }` | Task failed — `will_retry` tells you if it's being requeued | +| `Failed { header, error, will_retry, retry_after }` | Task failed — `will_retry` tells you if it's being requeued, `retry_after` is the backoff delay | | `Preempted(TaskEventHeader)` | Task paused for higher-priority work | | `Cancelled(TaskEventHeader)` | Task cancelled via `scheduler.cancel()` | | `Progress { header, percent, message }` | Progress update from executor | @@ -88,6 +88,7 @@ tokio::spawn(async move { | `RecurringSkipped { header, reason }` | A recurring instance was skipped (e.g., pile-up prevention) | | `RecurringCompleted { header, occurrences }` | A recurring schedule finished all its occurrences | | `TaskUnblocked { task_id }` | A blocked task's dependencies are all satisfied — it transitions to `pending` | +| `DeadLettered { header, error, retry_count }` | Task exhausted all retries — can be re-submitted via `retry_dead_letter()` | | `DependencyFailed { task_id, failed_dependency }` | A blocked task was cancelled because a dependency failed permanently | | `Paused` | Scheduler globally paused via `pause_all()` | | `Resumed` | Scheduler resumed via `resume_all()` | @@ -100,7 +101,7 @@ Task-specific events share a `TaskEventHeader` with `task_id`, `task_type`, `key |-----------------------|-----------| | A progress bar | `Progress`, `Completed`, `Failed` | | An activity log | All events | -| Error alerting | `Failed` where `will_retry` is false | +| Error alerting | `Failed` where `will_retry` is false, `DeadLettered` | | A "pause/resume" button | `Paused`, `Resumed` | | Upload status indicators | `Dispatched`, `Progress`, `Completed`, `Failed`, `Preempted` | | Stale task 
cleanup UI | `TaskExpired` | diff --git a/migrations/008_retry_backoff.sql b/migrations/008_retry_backoff.sql new file mode 100644 index 0000000..c8a6a9e --- /dev/null +++ b/migrations/008_retry_backoff.sql @@ -0,0 +1,5 @@ +-- Per-task max_retries (resolved from per-type policy or global default at submit time). +-- NULL means "use global default" — for backward compatibility with tasks +-- created before this migration. +ALTER TABLE tasks ADD COLUMN max_retries INTEGER; +ALTER TABLE task_history ADD COLUMN max_retries INTEGER; diff --git a/src/lib.rs b/src/lib.rs index 35ac374..3b1e4b2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,7 +10,10 @@ //! - Monitors system CPU, disk, and network throughput to adjust concurrency //! - Supports [composable backpressure](PressureSource) from arbitrary external sources //! - Preempts lower-priority work when high-priority tasks arrive -//! - [Retries](TaskError::retryable) failed tasks at the same priority level +//! - [Retries](TaskError::retryable) failed tasks with configurable [backoff](BackoffStrategy) +//! ([`Constant`](BackoffStrategy::Constant), [`Linear`](BackoffStrategy::Linear), +//! [`Exponential`](BackoffStrategy::Exponential), [`ExponentialJitter`](BackoffStrategy::ExponentialJitter)) +//! and per-type [retry policies](RetryPolicy) //! - Records completed/failed [task history](TaskHistoryRecord) for queries and IO learning //! - Supports [batch submission](Scheduler::submit_batch) with intra-batch dedup and chunking //! - Emits [lifecycle events](SchedulerEvent) including progress for UI integration @@ -28,11 +31,11 @@ //! //! ```text //! submit → blocked ─(deps met)─→ pending ──────────────→ running → completed -//! ↑ ↓ ↘ paused ↗ ↘ failed (retryable → pending) +//! ↑ ↓ ↘ paused ↗ ↘ failed (retryable → pending, with backoff delay) //! (run_after elapsed) ↘ failed (permanent → history) -//! │ ↘ cancelled (via cancel() or supersede) -//! pending (gated) ↘ expired (TTL, cascade to children) -//! cancelled +//! 
│ ↘ dead_letter (retries exhausted → history) +//! pending (gated) ↘ cancelled (via cancel() or supersede) +//! cancelled ↘ expired (TTL, cascade to children) //! superseded //! expired (TTL) //! blocked ─(dep failed)─→ dep_failed (history) @@ -49,8 +52,12 @@ //! progress reporter. //! 4. **Terminal** — on success the task moves to the history table. On failure, //! a [`retryable`](TaskError::retryable) error requeues it (up to -//! [`SchedulerBuilder::max_retries`]); a non-retryable error moves it to -//! history as failed. +//! [`SchedulerBuilder::max_retries`] or per-type [`RetryPolicy::max_retries`]) +//! with a configurable [`BackoffStrategy`] delay; a non-retryable +//! ([`permanent`](TaskError::permanent)) error moves it to history as failed. +//! Tasks that exhaust all retries enter [`dead_letter`](HistoryStatus::DeadLetter) +//! state — queryable and manually re-submittable via +//! [`Scheduler::retry_dead_letter`]. //! //! ## Deduplication & duplicate strategies //! @@ -282,7 +289,7 @@ //! For long-running transfers (file copies, uploads, downloads), executors can //! report byte-level progress via [`TaskContext::set_bytes_total`] and //! [`TaskContext::add_bytes`]. The scheduler maintains per-task atomic counters -//! on the [`IoTracker`](registry::IoTracker) — updates are lock-free and +//! on the `IoTracker` — updates are lock-free and //! impose no overhead on the executor hot path. //! //! 
When [`SchedulerBuilder::progress_interval`] is set, a background ticker @@ -792,11 +799,11 @@ pub use scheduler::{ }; pub use store::{RetentionPolicy, StoreConfig, StoreError, TaskStore}; pub use task::{ - generate_dedup_key, BatchOutcome, BatchSubmission, DependencyFailurePolicy, DuplicateStrategy, - HistoryStatus, IoBudget, ParentResolution, RecurringSchedule, RecurringScheduleInfo, - SubmitOutcome, TaskError, TaskHistoryRecord, TaskLookup, TaskRecord, TaskStatus, - TaskSubmission, TtlFrom, TypeStats, TypedTask, MAX_TAGS_PER_TASK, MAX_TAG_KEY_LEN, - MAX_TAG_VALUE_LEN, + generate_dedup_key, BackoffStrategy, BatchOutcome, BatchSubmission, DependencyFailurePolicy, + DuplicateStrategy, HistoryStatus, IoBudget, ParentResolution, RecurringSchedule, + RecurringScheduleInfo, RetryPolicy, SubmitOutcome, TaskError, TaskHistoryRecord, TaskLookup, + TaskRecord, TaskStatus, TaskSubmission, TtlFrom, TypeStats, TypedTask, MAX_TAGS_PER_TASK, + MAX_TAG_KEY_LEN, MAX_TAG_VALUE_LEN, }; #[cfg(feature = "sysinfo-monitor")] diff --git a/src/registry/mod.rs b/src/registry/mod.rs index 3d111e1..a52d58f 100644 --- a/src/registry/mod.rs +++ b/src/registry/mod.rs @@ -18,6 +18,7 @@ use std::collections::HashMap; use std::future::Future; use std::sync::Arc; +use crate::task::retry::RetryPolicy; use crate::task::TaskError; pub(crate) use child_spawner::{ChildSpawner, ParentContext}; @@ -104,6 +105,7 @@ pub trait TaskExecutor: Send + Sync + 'static { pub struct TaskTypeRegistry { types: HashMap>, type_ttls: HashMap, + type_retry_policies: HashMap, } /// Object-safe wrapper around [`TaskExecutor`] for dynamic dispatch in the registry. @@ -157,6 +159,7 @@ impl TaskTypeRegistry { Self { types: HashMap::new(), type_ttls: HashMap::new(), + type_retry_policies: HashMap::new(), } } @@ -183,11 +186,27 @@ impl TaskTypeRegistry { self.type_ttls.insert(name.to_string(), ttl); } + /// Register an executor with a per-type retry policy. 
+ pub fn register_with_retry_policy( + &mut self, + name: &str, + executor: Arc, + policy: RetryPolicy, + ) { + self.register(name, executor); + self.type_retry_policies.insert(name.to_string(), policy); + } + /// Look up the per-type default TTL for a task type. pub fn type_ttl(&self, name: &str) -> Option<&std::time::Duration> { self.type_ttls.get(name) } + /// Look up the per-type retry policy for a task type. + pub fn type_retry_policy(&self, name: &str) -> Option<&RetryPolicy> { + self.type_retry_policies.get(name) + } + /// Look up the executor for a task type. pub(crate) fn get(&self, name: &str) -> Option<&Arc> { self.types.get(name) @@ -227,6 +246,22 @@ impl TaskTypeRegistry { self.register_erased(name, executor); self.type_ttls.insert(name.to_string(), ttl); } + + /// Set a retry policy for an already-registered task type. + pub(crate) fn set_retry_policy(&mut self, name: &str, policy: RetryPolicy) { + self.type_retry_policies.insert(name.to_string(), policy); + } + + /// Register a pre-erased executor with a per-type retry policy. 
+ pub(crate) fn register_erased_with_retry_policy( + &mut self, + name: &str, + executor: Arc, + policy: RetryPolicy, + ) { + self.register_erased(name, executor); + self.type_retry_policies.insert(name.to_string(), policy); + } } impl Default for TaskTypeRegistry { @@ -238,6 +273,7 @@ impl Default for TaskTypeRegistry { #[cfg(test)] mod tests { use super::*; + use crate::task::retry::{BackoffStrategy, RetryPolicy}; struct NoopExecutor; @@ -264,4 +300,48 @@ mod tests { reg.register("dup", Arc::new(NoopExecutor)); reg.register("dup", Arc::new(NoopExecutor)); } + + #[test] + fn register_with_retry_policy_stores_policy() { + let mut reg = TaskTypeRegistry::new(); + let policy = RetryPolicy { + strategy: BackoffStrategy::Exponential { + initial: std::time::Duration::from_secs(1), + max: std::time::Duration::from_secs(60), + multiplier: 2.0, + }, + max_retries: 5, + }; + reg.register_with_retry_policy("api-call", Arc::new(NoopExecutor), policy); + + assert!(reg.get("api-call").is_some()); + let retrieved = reg.type_retry_policy("api-call").unwrap(); + assert_eq!(retrieved.max_retries, 5); + } + + #[test] + fn type_retry_policy_returns_none_for_missing() { + let mut reg = TaskTypeRegistry::new(); + reg.register("plain", Arc::new(NoopExecutor)); + + assert!(reg.type_retry_policy("plain").is_none()); + assert!(reg.type_retry_policy("nonexistent").is_none()); + } + + #[test] + fn register_erased_with_retry_policy_stores_policy() { + let mut reg = TaskTypeRegistry::new(); + let policy = RetryPolicy { + strategy: BackoffStrategy::Constant { + delay: std::time::Duration::from_secs(10), + }, + max_retries: 7, + }; + let executor: Arc = Arc::new(NoopExecutor); + reg.register_erased_with_retry_policy("erased-type", executor, policy); + + assert!(reg.get("erased-type").is_some()); + let retrieved = reg.type_retry_policy("erased-type").unwrap(); + assert_eq!(retrieved.max_retries, 7); + } } diff --git a/src/resource/network_pressure.rs b/src/resource/network_pressure.rs index 
213b76c..a933f35 100644 --- a/src/resource/network_pressure.rs +++ b/src/resource/network_pressure.rs @@ -1,4 +1,4 @@ -//! Built-in [`PressureSource`](crate::PressureSource) derived from network bandwidth utilization. +//! Built-in [`PressureSource`] derived from network bandwidth utilization. //! //! [`NetworkPressure`] reads the latest [`ResourceSnapshot`](crate::ResourceSnapshot) //! and computes pressure as the ratio of observed network throughput to a diff --git a/src/scheduler/builder.rs b/src/scheduler/builder.rs index 27dc334..c649356 100644 --- a/src/scheduler/builder.rs +++ b/src/scheduler/builder.rs @@ -11,11 +11,20 @@ use crate::registry::TaskExecutor; use crate::resource::sampler::{SamplerConfig, SmoothedReader}; use crate::resource::{ResourceReader, ResourceSampler}; use crate::store::{StoreConfig, StoreError, TaskStore}; +use crate::task::retry::RetryPolicy; use crate::task::TypedTask; use super::event::{SchedulerConfig, ShutdownMode}; use super::Scheduler; +/// A registered executor with its optional per-type TTL and retry policy. +type ExecutorEntry = ( + String, + Arc, + Option, + Option, +); + /// Ergonomic builder for constructing a [`Scheduler`] with all its dependencies. /// /// Hides the `Arc>` wiring and manages the resource sampler lifecycle. @@ -41,11 +50,7 @@ pub struct SchedulerBuilder { store_path: Option, store_config: StoreConfig, store: Option, - executors: Vec<( - String, - Arc, - Option, - )>, + executors: Vec, config: SchedulerConfig, pressure_sources: Vec>, policy: Option, @@ -103,6 +108,7 @@ impl SchedulerBuilder { name.to_string(), executor as Arc, None, + None, )); self } @@ -121,6 +127,43 @@ impl SchedulerBuilder { name.to_string(), executor as Arc, Some(ttl), + None, + )); + self + } + + /// Register a task executor with a per-type retry policy. + /// + /// Tasks of this type will use this policy's backoff strategy and + /// max_retries instead of the global default. 
+ pub fn executor_with_retry_policy( + mut self, + name: &str, + executor: Arc, + policy: RetryPolicy, + ) -> Self { + self.executors.push(( + name.to_string(), + executor as Arc, + None, + Some(policy), + )); + self + } + + /// Register an executor with both a TTL and a retry policy. + pub fn executor_with_options( + mut self, + name: &str, + executor: Arc, + ttl: Option, + retry_policy: Option, + ) -> Self { + self.executors.push(( + name.to_string(), + executor as Arc, + ttl, + retry_policy, )); self } @@ -330,14 +373,24 @@ impl SchedulerBuilder { // Build registry. let mut registry = crate::registry::TaskTypeRegistry::new(); - for (name, executor, ttl) in self.executors { + for (name, executor, ttl, retry_policy) in self.executors { if registry.get(&name).is_some() { panic!("task type '{name}' already registered"); } - if let Some(ttl) = ttl { - registry.register_erased_with_ttl(&name, executor, ttl); - } else { - registry.register_erased(&name, executor); + match (ttl, retry_policy) { + (Some(ttl), Some(policy)) => { + registry.register_erased_with_ttl(&name, executor, ttl); + registry.set_retry_policy(&name, policy); + } + (Some(ttl), None) => { + registry.register_erased_with_ttl(&name, executor, ttl); + } + (None, Some(policy)) => { + registry.register_erased_with_retry_policy(&name, executor, policy); + } + (None, None) => { + registry.register_erased(&name, executor); + } } } diff --git a/src/scheduler/dispatch.rs b/src/scheduler/dispatch.rs index 4b337f5..884ba83 100644 --- a/src/scheduler/dispatch.rs +++ b/src/scheduler/dispatch.rs @@ -266,6 +266,7 @@ pub(crate) struct SpawnContext { pub active: ActiveTaskMap, pub event_tx: tokio::sync::broadcast::Sender, pub max_retries: i32, + pub registry: Arc, pub app_state: crate::registry::StateSnapshot, pub work_notify: Arc, pub scheduler: super::WeakScheduler, @@ -288,6 +289,7 @@ pub(crate) async fn spawn_task( active, event_tx, max_retries, + registry, app_state, work_notify, scheduler, @@ -455,7 +457,34 @@ 
pub(crate) async fn spawn_task( active.remove(task_id); return; } - let will_retry = te.retryable && task.retry_count < max_retries; + + // Resolve effective retry policy for this task type. + let policy = registry.type_retry_policy(&task.task_type); + let effective_max_retries = task + .max_retries + .unwrap_or(policy.map(|p| p.max_retries).unwrap_or(max_retries)); + let backoff_strategy = policy.map(|p| &p.strategy); + + let will_retry = te.retryable && task.retry_count < effective_max_retries; + + // Compute retry delay for event reporting. + let retry_delay = if will_retry { + if let Some(ms) = te.retry_after_ms { + Some(std::time::Duration::from_millis(ms)) + } else if let Some(strategy) = backoff_strategy { + let d = strategy.delay_for(task.retry_count); + if d.is_zero() { + None + } else { + Some(d) + } + } else { + None + } + } else { + None + }; + tracing::warn!( task_id, task_type = task.task_type, @@ -464,19 +493,40 @@ pub(crate) async fn spawn_task( will_retry, "task failed" ); + let fail_backoff = crate::store::FailBackoff { + strategy: backoff_strategy, + executor_retry_after_ms: te.retry_after_ms, + }; if let Err(e) = store - .fail_with_record(&task, &te.message, te.retryable, max_retries, &metrics) + .fail_with_record( + &task, + &te.message, + te.retryable, + effective_max_retries, + &metrics, + &fail_backoff, + ) .await { tracing::error!(task_id, error = %e, "failed to record task failure"); } // Remove from active tracking AFTER the store write completes. 
active.remove(task_id); - let _ = event_tx.send(SchedulerEvent::Failed { - header: task.event_header(), - error: te.message.clone(), - will_retry, - }); + let dead_lettered = te.retryable && !will_retry; + if dead_lettered { + let _ = event_tx.send(SchedulerEvent::DeadLettered { + header: task.event_header(), + error: te.message.clone(), + retry_count: task.retry_count + 1, + }); + } else { + let _ = event_tx.send(SchedulerEvent::Failed { + header: task.event_header(), + error: te.message.clone(), + will_retry, + retry_after: retry_delay, + }); + } work_notify.notify_one(); // If permanent failure, propagate to dependency chain. @@ -521,7 +571,14 @@ pub(crate) async fn spawn_task( // Fail the parent. let msg = format!("child task {task_id} failed: {}", te.message); if let Err(e) = store - .fail_with_record(&parent, &msg, false, 0, &IoBudget::default()) + .fail_with_record( + &parent, + &msg, + false, + 0, + &IoBudget::default(), + &Default::default(), + ) .await { tracing::error!( @@ -534,6 +591,7 @@ pub(crate) async fn spawn_task( header: parent.event_header(), error: msg, will_retry: false, + retry_after: None, }); } else { // Not fail_fast — check if all children done. @@ -579,7 +637,14 @@ async fn handle_parent_resolution( // All children done but some failed — fail the parent. 
if let Ok(Some(parent)) = store.task_by_id(parent_id).await { if let Err(e) = store - .fail_with_record(&parent, &reason, false, 0, &IoBudget::default()) + .fail_with_record( + &parent, + &reason, + false, + 0, + &IoBudget::default(), + &Default::default(), + ) .await { tracing::error!(parent_id, error = %e, "failed to record parent failure"); @@ -588,6 +653,7 @@ async fn handle_parent_resolution( header: parent.event_header(), error: reason, will_retry: false, + retry_after: None, }); } } diff --git a/src/scheduler/event.rs b/src/scheduler/event.rs index 2cda5c8..237bfe1 100644 --- a/src/scheduler/event.rs +++ b/src/scheduler/event.rs @@ -73,7 +73,7 @@ pub struct TaskEventHeader { /// Events emitted by the scheduler for UI integration and observability. /// /// Subscribe via the `tokio::sync::broadcast::Receiver` returned by -/// [`Scheduler::subscribe`] or passed through the builder. +/// [`Scheduler::subscribe`](super::Scheduler::subscribe) or passed through the builder. #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(tag = "type", content = "data")] pub enum SchedulerEvent { @@ -86,6 +86,9 @@ pub enum SchedulerEvent { header: TaskEventHeader, error: String, will_retry: bool, + /// How long until the next retry attempt. `None` if the task will not + /// be retried or if the retry is immediate (no backoff). + retry_after: Option, }, /// A task was preempted by higher-priority work. Preempted(TaskEventHeader), @@ -138,14 +141,24 @@ pub enum SchedulerEvent { }, /// A blocked task became pending after all its dependencies completed. TaskUnblocked { task_id: i64 }, + /// A task exhausted its retries and was moved to dead-letter state. + /// + /// The task failed with a retryable error but has reached its `max_retries` + /// limit. Use [`Scheduler::retry_dead_letter`](super::Scheduler::retry_dead_letter) + /// to re-submit. 
+ DeadLettered { + header: TaskEventHeader, + error: String, + retry_count: i32, + }, /// A blocked task was cancelled because a dependency failed. DependencyFailed { task_id: i64, failed_dependency: i64, }, - /// The scheduler was globally paused via [`Scheduler::pause_all`]. + /// The scheduler was globally paused via [`Scheduler::pause_all`](super::Scheduler::pause_all). Paused, - /// The scheduler was resumed via [`Scheduler::resume_all`]. + /// The scheduler was resumed via [`Scheduler::resume_all`](super::Scheduler::resume_all). Resumed, } @@ -161,7 +174,8 @@ impl SchedulerEvent { | Self::Superseded { old: header, .. } | Self::TaskExpired { header, .. } | Self::RecurringSkipped { header, .. } - | Self::RecurringCompleted { header, .. } => Some(header), + | Self::RecurringCompleted { header, .. } + | Self::DeadLettered { header, .. } => Some(header), Self::Waiting { .. } | Self::BatchSubmitted { .. } | Self::TaskUnblocked { .. } diff --git a/src/scheduler/mod.rs b/src/scheduler/mod.rs index 4cf4f75..1b269c5 100644 --- a/src/scheduler/mod.rs +++ b/src/scheduler/mod.rs @@ -7,14 +7,14 @@ //! integration. Use [`SchedulerBuilder`] for ergonomic construction. //! //! The `Scheduler` implementation is split across focused submodules: -//! - [`submit`] — task submission, lookup, cancellation, and superseding -//! - [`run_loop`] — the main event loop, dispatch, and shutdown -//! - [`control`] — pause/resume, concurrency limits, and group limits -//! - [`queries`] — read-only queries (active tasks, progress, snapshots) -//! - [`builder`] — ergonomic construction via [`SchedulerBuilder`] -//! - [`dispatch`] — task spawning, active-task tracking, and preemption -//! - [`gate`] — admission control (IO budget, backpressure, group limits) -//! - [`event`] — event types and scheduler configuration +//! - `submit` — task submission, lookup, cancellation, and superseding +//! - `run_loop` — the main event loop, dispatch, and shutdown +//! 
- `control` — pause/resume, concurrency limits, and group limits +//! - `queries` — read-only queries (active tasks, progress, snapshots) +//! - `builder` — ergonomic construction via [`SchedulerBuilder`] +//! - `dispatch` — task spawning, active-task tracking, and preemption +//! - `gate` — admission control (IO budget, backpressure, group limits) +//! - `event` — event types and scheduler configuration //! - [`progress`] — progress reporting, byte-level tracking, and extrapolation //! //! See the [crate-level docs](crate) for a full walkthrough of the task diff --git a/src/scheduler/progress.rs b/src/scheduler/progress.rs index 5d590d4..063feef 100644 --- a/src/scheduler/progress.rs +++ b/src/scheduler/progress.rs @@ -16,12 +16,12 @@ //! For streaming transfers, executors call [`ProgressReporter::set_bytes_total`] //! and [`ProgressReporter::add_bytes`] (or the convenience wrappers on //! [`TaskContext`](crate::TaskContext)). These update lock-free atomic counters -//! on the task's [`IoTracker`]. +//! on the task's `IoTracker`. //! -//! A background [`run_progress_ticker`] task (opt-in via +//! A background progress ticker task (opt-in via //! [`SchedulerBuilder::progress_interval`](super::SchedulerBuilder::progress_interval)) //! polls these counters at a fixed interval, feeds them through an -//! EWMA-smoothed [`ThroughputTracker`], and emits [`TaskProgress`] events on +//! EWMA-smoothed `ThroughputTracker`, and emits [`TaskProgress`] events on //! a dedicated broadcast channel. Each event includes throughput (bytes/sec) //! and an estimated time remaining. @@ -46,7 +46,7 @@ use super::SchedulerEvent; /// /// Progress reports are emitted as [`SchedulerEvent::Progress`] /// events, making them available to the UI via the same broadcast channel. -/// The reporter also updates the in-memory [`ActiveTaskMap`] directly, +/// The reporter also updates the in-memory `ActiveTaskMap` directly, /// eliminating the need for a per-task broadcast listener. 
/// /// # Example diff --git a/src/scheduler/queries.rs b/src/scheduler/queries.rs index 4f685aa..a93992a 100644 --- a/src/scheduler/queries.rs +++ b/src/scheduler/queries.rs @@ -65,7 +65,7 @@ impl Scheduler { /// Find active tasks matching all specified tag filters (AND semantics). /// - /// Delegates to [`TaskStore::tasks_by_tags`]. + /// Delegates to [`TaskStore::tasks_by_tags`](crate::TaskStore::tasks_by_tags). pub async fn tasks_by_tags( &self, filters: &[(&str, &str)], @@ -76,7 +76,7 @@ impl Scheduler { /// Count active tasks grouped by a tag key's values. /// - /// Delegates to [`TaskStore::count_by_tag`]. + /// Delegates to [`TaskStore::count_by_tag`](crate::TaskStore::count_by_tag). pub async fn count_by_tag( &self, key: &str, @@ -87,11 +87,24 @@ impl Scheduler { /// List distinct values for a tag key across active tasks, with counts. /// - /// Delegates to [`TaskStore::tag_values`]. + /// Delegates to [`TaskStore::tag_values`](crate::TaskStore::tag_values). pub async fn tag_values(&self, key: &str) -> Result, StoreError> { self.inner.store.tag_values(key).await } + /// Dead-lettered tasks (retries exhausted), newest first. + /// + /// These are tasks that failed with a retryable error but exhausted their + /// retry limit. Use [`retry_dead_letter`](Self::retry_dead_letter) to + /// re-submit them. + pub async fn dead_letter_tasks( + &self, + limit: i64, + offset: i64, + ) -> Result, StoreError> { + self.inner.store.dead_letter_tasks(limit, offset).await + } + /// Capture a single status snapshot for dashboard UIs. 
/// /// Gathers running tasks, queue depths, progress estimates, and diff --git a/src/scheduler/run_loop.rs b/src/scheduler/run_loop.rs index 8c5183d..d8641c3 100644 --- a/src/scheduler/run_loop.rs +++ b/src/scheduler/run_loop.rs @@ -22,6 +22,7 @@ impl Scheduler { active: self.inner.active.clone(), event_tx: self.inner.event_tx.clone(), max_retries: self.inner.max_retries, + registry: Arc::clone(&self.inner.registry), app_state: self.inner.app_state.snapshot().await, work_notify: Arc::clone(&self.inner.work_notify), scheduler: self.downgrade(), @@ -99,6 +100,7 @@ impl Scheduler { false, 0, &IoBudget::default(), + &Default::default(), ) .await?; return Ok(true); @@ -154,6 +156,7 @@ impl Scheduler { false, 0, &IoBudget::default(), + &Default::default(), ) .await?; return Ok(true); diff --git a/src/scheduler/submit.rs b/src/scheduler/submit.rs index f029bfb..1814456 100644 --- a/src/scheduler/submit.rs +++ b/src/scheduler/submit.rs @@ -8,8 +8,8 @@ use crate::priority::Priority; use crate::registry::{IoTracker, TaskContext}; use crate::store::StoreError; use crate::task::{ - generate_dedup_key, BatchOutcome, BatchSubmission, SubmitOutcome, TaskLookup, TaskRecord, - TaskSubmission, TypedTask, + generate_dedup_key, BatchOutcome, BatchSubmission, HistoryStatus, SubmitOutcome, TaskLookup, + TaskRecord, TaskSubmission, TypedTask, }; use super::progress::ProgressReporter; @@ -298,7 +298,7 @@ impl Scheduler { /// Cancel all active tasks matching a tag key-value pair. /// - /// Finds tasks via [`TaskStore::tasks_by_tags`] and cancels each one. + /// Finds tasks via [`TaskStore::tasks_by_tags`](crate::TaskStore::tasks_by_tags) and cancels each one. /// Returns the ids of tasks that were successfully cancelled. pub async fn cancel_by_tag(&self, key: &str, value: &str) -> Result, StoreError> { let tasks = self @@ -396,6 +396,65 @@ impl Scheduler { } } + /// Re-submit a dead-lettered task. 
+ /// + /// Loads the history record, verifies it has `dead_letter` status, constructs + /// a new [`TaskSubmission`] from its fields, submits it (going through normal + /// dedup and validation), and removes the history row on success. + /// + /// The re-submitted task starts with `retry_count = 0`. + pub async fn retry_dead_letter(&self, history_id: i64) -> Result { + let record = self + .inner + .store + .history_by_id(history_id) + .await? + .ok_or_else(|| StoreError::NotFound(format!("history record {history_id}")))?; + + if record.status != HistoryStatus::DeadLetter { + return Err(StoreError::InvalidState(format!( + "history record {history_id} has status {:?}, expected DeadLetter", + record.status + ))); + } + + // Reconstruct a submission from the history record. + // If label != task_type, the original submission had an explicit dedup + // key (label preserves the original key string). Otherwise, the key + // was derived from the payload hash. + let mut sub = TaskSubmission::new(&record.task_type) + .priority(record.priority) + .expected_io(record.expected_io) + .tags(record.tags.clone()); + + if record.label != record.task_type { + sub = sub.key(&record.label); + } + + if let Some(payload) = &record.payload { + sub = sub.payload_raw(payload.clone()); + } + if let Some(parent_id) = record.parent_id { + sub.parent_id = Some(parent_id); + } + sub.fail_fast = record.fail_fast; + if let Some(ref gk) = record.group_key { + sub.group_key = Some(gk.clone()); + } + if let Some(mr) = record.max_retries { + sub.max_retries = Some(mr); + } + + let outcome = self.submit(&sub).await?; + + // Remove the dead-letter history row on successful submission. + if outcome.is_inserted() { + self.inner.store.delete_history(history_id).await?; + } + + Ok(outcome) + } + /// Fire the `on_cancel` hook for a task (fire-and-forget). 
/// /// Takes a snapshot of the app state and spawns a tokio task that runs diff --git a/src/scheduler/tests.rs b/src/scheduler/tests.rs index d483b02..54d8253 100644 --- a/src/scheduler/tests.rs +++ b/src/scheduler/tests.rs @@ -1443,3 +1443,70 @@ async fn chain_of_supersedes() { let task = sched.store().task_by_key(&key).await.unwrap().unwrap(); assert_eq!(task.payload.as_deref(), Some(b"C".as_slice())); } + +// ── Phase 5: Dead-letter integration tests ────────────────────── + +#[tokio::test] +async fn retry_dead_letter_resubmits_with_reset_retry_count() { + // Use max_retries=0 so a retryable failure immediately dead-letters. + let store = TaskStore::open_memory().await.unwrap(); + let mut registry = TaskTypeRegistry::new(); + registry.register_erased("test", arc_erased(FailingExecutor)); + let mut config = SchedulerConfig::default(); + config.max_retries = 0; + + let sched = Scheduler::new( + store, + config, + Arc::new(registry), + CompositePressure::new(), + ThrottlePolicy::default_three_tier(), + ); + + let mut rx = sched.subscribe(); + + // Submit and dispatch — will fail with retryable error and dead-letter. + sched + .submit( + &TaskSubmission::new("test") + .key("dl-retry") + .payload_raw(b"payload".to_vec()), + ) + .await + .unwrap(); + + sched.try_dispatch().await.unwrap(); + tokio::time::sleep(Duration::from_millis(100)).await; + + // Should have emitted a DeadLettered event. + let mut got_dead_lettered = false; + while let Ok(evt) = rx.try_recv() { + if matches!(evt, SchedulerEvent::DeadLettered { .. }) { + got_dead_lettered = true; + } + } + assert!(got_dead_lettered, "expected DeadLettered event"); + + // Should be in dead_letter_tasks. + let dl = sched.dead_letter_tasks(10, 0).await.unwrap(); + assert_eq!(dl.len(), 1); + assert_eq!(dl[0].status, HistoryStatus::DeadLetter); + let history_id = dl[0].id; + assert_eq!(dl[0].retry_count, 1); // retry_count was incremented + + // Now re-submit from dead-letter. 
Replace the executor with one that + // succeeds so the re-submitted task can complete. + // (retry_dead_letter only re-submits — it doesn't dispatch.) + let outcome = sched.retry_dead_letter(history_id).await.unwrap(); + assert!(outcome.is_inserted()); + + // Dead-letter history row should be removed. + let dl_after = sched.dead_letter_tasks(10, 0).await.unwrap(); + assert!(dl_after.is_empty()); + + // New task should be in the active queue with retry_count=0. + let key = crate::task::generate_dedup_key("test", Some(b"dl-retry")); + let task = sched.store().task_by_key(&key).await.unwrap().unwrap(); + assert_eq!(task.retry_count, 0); + assert_eq!(task.payload.as_deref(), Some(b"payload".as_slice())); +} diff --git a/src/store/hierarchy.rs b/src/store/hierarchy.rs index ad57810..1be175c 100644 --- a/src/store/hierarchy.rs +++ b/src/store/hierarchy.rs @@ -311,7 +311,14 @@ mod tests { store.submit(&child_sub).await.unwrap(); let child = store.pop_next().await.unwrap().unwrap(); store - .fail(child.id, "boom", false, 0, &IoBudget::default()) + .fail( + child.id, + "boom", + false, + 0, + &IoBudget::default(), + &Default::default(), + ) .await .unwrap(); diff --git a/src/store/lifecycle.rs b/src/store/lifecycle.rs index 7318a32..40c8627 100644 --- a/src/store/lifecycle.rs +++ b/src/store/lifecycle.rs @@ -1,11 +1,25 @@ //! Task lifecycle transitions: pop, complete, fail, pause, resume, and //! dependency resolution. -use crate::task::{DependencyFailurePolicy, IoBudget, TaskRecord}; +use crate::task::{BackoffStrategy, DependencyFailurePolicy, IoBudget, TaskRecord}; use super::row_mapping::row_to_task_record; use super::{StoreError, TaskStore}; +/// Backoff parameters for retry delay computation. +/// +/// Bundles the optional backoff strategy and executor-signaled override into a +/// single argument to keep `fail()` / `fail_with_record()` under the clippy +/// argument-count lint. 
+#[derive(Debug, Default, Clone)] +pub struct FailBackoff<'a> { + /// Per-type backoff strategy. `None` means immediate retry. + pub strategy: Option<&'a BackoffStrategy>, + /// Executor-requested retry delay in milliseconds. Overrides the strategy + /// when set. + pub executor_retry_after_ms: Option, +} + /// Insert a task record into the history table. /// /// Shared by `complete()`, `fail()`, and `cancel_to_history()` to eliminate @@ -19,7 +33,7 @@ pub(crate) async fn insert_history( last_error: Option<&str>, ) -> Result<(), StoreError> { let fail_fast_val: i32 = if task.fail_fast { 1 } else { 0 }; - let retry_count = if status == "failed" { + let retry_count = if status == "failed" || status == "dead_letter" { task.retry_count + 1 } else { task.retry_count @@ -29,8 +43,8 @@ pub(crate) async fn insert_history( expected_read_bytes, expected_write_bytes, expected_net_rx_bytes, expected_net_tx_bytes, actual_read_bytes, actual_write_bytes, actual_net_rx_bytes, actual_net_tx_bytes, retry_count, last_error, created_at, started_at, duration_ms, parent_id, fail_fast, group_key, - ttl_seconds, ttl_from, expires_at, run_after) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + ttl_seconds, ttl_from, expires_at, run_after, max_retries) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", ) .bind(&task.task_type) .bind(&task.key) @@ -67,6 +81,7 @@ pub(crate) async fn insert_history( task.run_after .map(|dt| dt.format("%Y-%m-%d %H:%M:%S").to_string()), ) + .bind(task.max_retries) .execute(&mut **conn) .await?; @@ -104,7 +119,7 @@ impl TaskStore { WHERE id = ( SELECT id FROM tasks WHERE status = 'pending' - AND (run_after IS NULL OR run_after <= datetime('now')) + AND (run_after IS NULL OR run_after <= strftime('%Y-%m-%d %H:%M:%f', 'now')) ORDER BY priority ASC, id ASC LIMIT 1 )", @@ -170,7 +185,7 @@ impl TaskStore { WHERE id = ( SELECT id FROM tasks WHERE status = 'pending' - AND (run_after IS 
NULL OR run_after <= datetime('now')) + AND (run_after IS NULL OR run_after <= strftime('%Y-%m-%d %H:%M:%f', 'now')) ORDER BY priority ASC, id ASC LIMIT 1 ) @@ -353,8 +368,8 @@ impl TaskStore { ttl_seconds, ttl_from, expires_at, run_after, recurring_interval_secs, recurring_max_executions, recurring_execution_count, - recurring_paused) - VALUES (?, ?, ?, ?, 'pending', ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 0)", + recurring_paused, max_retries) + VALUES (?, ?, ?, ?, 'pending', ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 0, ?)", ) .bind(&task.task_type) .bind(&task.key) @@ -375,6 +390,7 @@ impl TaskStore { .bind(task.recurring_interval_secs) .bind(task.recurring_max_executions) .bind(execution_count) + .bind(task.max_retries) .execute(&mut **conn) .await?; @@ -403,6 +419,9 @@ impl TaskStore { /// Mark a task as failed. If `retryable` and under max retries, requeue /// it as pending with the same priority. Otherwise move to history as failed. + /// + /// `backoff` controls the delay before the next retry attempt. See + /// `fail_inner` for details. 
pub async fn fail( &self, id: i64, @@ -410,6 +429,7 @@ impl TaskStore { retryable: bool, max_retries: i32, metrics: &IoBudget, + backoff: &FailBackoff<'_>, ) -> Result<(), StoreError> { tracing::debug!(task_id = id, "store.fail: BEGIN tx"); let mut conn = self.begin_write().await?; @@ -423,7 +443,16 @@ impl TaskStore { let Some(row) = row else { return Ok(()) }; let task = row_to_task_record(&row); - Self::fail_inner(&mut conn, &task, error, retryable, max_retries, metrics).await?; + Self::fail_inner( + &mut conn, + &task, + error, + retryable, + max_retries, + metrics, + backoff, + ) + .await?; sqlx::query("COMMIT").execute(&mut *conn).await?; drop(conn); @@ -443,12 +472,22 @@ impl TaskStore { retryable: bool, max_retries: i32, metrics: &IoBudget, + backoff: &FailBackoff<'_>, ) -> Result<(), StoreError> { tracing::debug!(task_id = task.id, "store.fail_with_record: BEGIN tx"); let mut conn = self.begin_write().await?; tracing::debug!(task_id = task.id, "store.fail_with_record: BEGIN acquired"); - Self::fail_inner(&mut conn, task, error, retryable, max_retries, metrics).await?; + Self::fail_inner( + &mut conn, + task, + error, + retryable, + max_retries, + metrics, + backoff, + ) + .await?; sqlx::query("COMMIT").execute(&mut *conn).await?; drop(conn); @@ -460,6 +499,13 @@ impl TaskStore { } /// Shared failure logic: retry or move to history. + /// + /// When retrying, computes the backoff delay from (in priority order): + /// 1. `executor_retry_after_ms` — executor-signaled override + /// 2. `backoff` strategy — per-type backoff computation + /// 3. Immediate retry (no delay) — backward-compatible default + /// + /// The delay is applied by setting `run_after` on the requeued task. 
async fn fail_inner( conn: &mut sqlx::pool::PoolConnection, task: &TaskRecord, @@ -467,23 +513,53 @@ impl TaskStore { retryable: bool, max_retries: i32, metrics: &IoBudget, + backoff: &FailBackoff<'_>, ) -> Result<(), StoreError> { if retryable && task.retry_count < max_retries { - // Requeue with incremented retry count, same priority. - sqlx::query( - "UPDATE tasks SET status = 'pending', started_at = NULL, - retry_count = retry_count + 1, last_error = ? - WHERE id = ?", - ) - .bind(error) - .bind(task.id) - .execute(&mut **conn) - .await?; + // Compute delay: executor override > backoff strategy > immediate. + let delay = if let Some(ms) = backoff.executor_retry_after_ms { + std::time::Duration::from_millis(ms) + } else if let Some(strategy) = backoff.strategy { + strategy.delay_for(task.retry_count) + } else { + std::time::Duration::ZERO + }; + + if delay.is_zero() { + // Immediate retry — current behavior. + sqlx::query( + "UPDATE tasks SET status = 'pending', started_at = NULL, + retry_count = retry_count + 1, last_error = ? + WHERE id = ?", + ) + .bind(error) + .bind(task.id) + .execute(&mut **conn) + .await?; + } else { + // Delayed retry — set run_after. + let run_after = + chrono::Utc::now() + chrono::Duration::milliseconds(delay.as_millis() as i64); + let run_after_str = run_after.format("%Y-%m-%d %H:%M:%S%.3f").to_string(); + sqlx::query( + "UPDATE tasks SET status = 'pending', started_at = NULL, + retry_count = retry_count + 1, last_error = ?, + run_after = ? + WHERE id = ?", + ) + .bind(error) + .bind(&run_after_str) + .bind(task.id) + .execute(&mut **conn) + .await?; + } } else { - // Permanent failure — move to history. + // Terminal failure — move to history. + // Distinguish: retryable + exhausted → dead_letter; non-retryable → failed. 
+ let status = if retryable { "dead_letter" } else { "failed" }; let duration_ms = compute_duration_ms(task); - insert_history(conn, task, "failed", metrics, duration_ms, Some(error)).await?; + insert_history(conn, task, status, metrics, duration_ms, Some(error)).await?; super::delete_task_tags(conn, task.id).await?; sqlx::query("DELETE FROM tasks WHERE id = ?") @@ -928,6 +1004,7 @@ mod tests { use crate::task::{HistoryStatus, IoBudget, TaskStatus, TaskSubmission}; use super::super::TaskStore; + use super::FailBackoff; async fn test_store() -> TaskStore { TaskStore::open_memory().await.unwrap() @@ -997,7 +1074,14 @@ mod tests { let task = store.pop_next().await.unwrap().unwrap(); store - .fail(task.id, "transient error", true, 3, &IoBudget::default()) + .fail( + task.id, + "transient error", + true, + 3, + &IoBudget::default(), + &FailBackoff::default(), + ) .await .unwrap(); @@ -1016,20 +1100,35 @@ mod tests { let task = store.pop_next().await.unwrap().unwrap(); store - .fail(task.id, "err1", true, 1, &IoBudget::default()) + .fail( + task.id, + "err1", + true, + 1, + &IoBudget::default(), + &FailBackoff::default(), + ) .await .unwrap(); let task = store.pop_next().await.unwrap().unwrap(); assert_eq!(task.retry_count, 1); store - .fail(task.id, "err2", true, 1, &IoBudget::disk(100, 50)) + .fail( + task.id, + "err2", + true, + 1, + &IoBudget::disk(100, 50), + &FailBackoff::default(), + ) .await .unwrap(); assert!(store.task_by_key(&key).await.unwrap().is_none()); - let hist = store.failed_tasks(10).await.unwrap(); + // Exhausted retries now produce dead_letter status (phase 5). 
+ let hist = store.dead_letter_tasks(10, 0).await.unwrap(); assert_eq!(hist.len(), 1); - assert_eq!(hist[0].status, HistoryStatus::Failed); + assert_eq!(hist[0].status, HistoryStatus::DeadLetter); } #[tokio::test] @@ -1205,7 +1304,14 @@ mod tests { store.submit(&sub).await.unwrap(); let task = store.pop_next().await.unwrap().unwrap(); store - .fail(task.id, "boom", false, 0, &IoBudget::default()) + .fail( + task.id, + "boom", + false, + 0, + &IoBudget::default(), + &FailBackoff::default(), + ) .await .unwrap(); @@ -1290,4 +1396,458 @@ mod tests { let task = store.pop_next().await.unwrap().unwrap(); assert_eq!(task.tags.get("color").unwrap(), "blue"); } + + // ── max_retries persistence (Phase 3) ──────────────────────────── + + #[tokio::test] + async fn max_retries_round_trips_through_insert_and_select() { + let store = test_store().await; + let sub = TaskSubmission::new("test") + .key("mr-roundtrip") + .max_retries(5); + + let id = store.submit(&sub).await.unwrap().id().unwrap(); + let task = store.task_by_id(id).await.unwrap().unwrap(); + assert_eq!(task.max_retries, Some(5)); + } + + #[tokio::test] + async fn max_retries_none_when_not_set() { + let store = test_store().await; + let sub = TaskSubmission::new("test").key("mr-none"); + + let id = store.submit(&sub).await.unwrap().id().unwrap(); + let task = store.task_by_id(id).await.unwrap().unwrap(); + assert_eq!(task.max_retries, None); + } + + #[tokio::test] + async fn max_retries_preserved_in_history_on_complete() { + let store = test_store().await; + let sub = TaskSubmission::new("test") + .key("mr-hist-complete") + .max_retries(7); + + store.submit(&sub).await.unwrap(); + let task = store.pop_next().await.unwrap().unwrap(); + assert_eq!(task.max_retries, Some(7)); + + store.complete(task.id, &IoBudget::default()).await.unwrap(); + + let key = sub.effective_key(); + let history = store.history_by_key(&key).await.unwrap(); + assert!(!history.is_empty()); + assert_eq!(history[0].max_retries, Some(7)); + } + + 
#[tokio::test] + async fn max_retries_preserved_in_history_on_fail() { + let store = test_store().await; + let sub = TaskSubmission::new("test") + .key("mr-hist-fail") + .max_retries(3); + + store.submit(&sub).await.unwrap(); + let task = store.pop_next().await.unwrap().unwrap(); + + // Permanent failure (non-retryable). + store + .fail( + task.id, + "boom", + false, + 0, + &IoBudget::default(), + &FailBackoff::default(), + ) + .await + .unwrap(); + + let key = sub.effective_key(); + let history = store.history_by_key(&key).await.unwrap(); + assert!(!history.is_empty()); + assert_eq!(history[0].max_retries, Some(3)); + assert_eq!(history[0].status, HistoryStatus::Failed); + } + + #[tokio::test] + async fn max_retries_null_reads_back_as_none() { + let store = test_store().await; + // Submit without max_retries (NULL in DB). + let sub = TaskSubmission::new("test").key("mr-null"); + store.submit(&sub).await.unwrap(); + let task = store.pop_next().await.unwrap().unwrap(); + assert_eq!(task.max_retries, None); + + // Complete it and verify history also has None. 
+ store.complete(task.id, &IoBudget::default()).await.unwrap(); + let key = sub.effective_key(); + let history = store.history_by_key(&key).await.unwrap(); + assert_eq!(history[0].max_retries, None); + } + + // ── Phase 4: Retry with backoff ───────────────────────────────── + + #[tokio::test] + async fn backoff_constant_sets_run_after() { + use crate::task::BackoffStrategy; + use std::time::Duration; + + let store = test_store().await; + let sub = make_submission("const-backoff", Priority::NORMAL); + let key = sub.effective_key(); + store.submit(&sub).await.unwrap(); + let task = store.pop_next().await.unwrap().unwrap(); + + let strategy = BackoffStrategy::Constant { + delay: Duration::from_secs(60), + }; + store + .fail( + task.id, + "transient", + true, + 3, + &IoBudget::default(), + &FailBackoff { + strategy: Some(&strategy), + ..Default::default() + }, + ) + .await + .unwrap(); + + let requeued = store.task_by_key(&key).await.unwrap().unwrap(); + assert_eq!(requeued.status, TaskStatus::Pending); + assert_eq!(requeued.retry_count, 1); + // run_after should be set roughly 60s in the future. 
+ let run_after = requeued.run_after.expect("run_after should be set"); + let diff = run_after - chrono::Utc::now(); + assert!( + diff.num_seconds() >= 55 && diff.num_seconds() <= 65, + "expected run_after ~60s in the future, got {}s", + diff.num_seconds() + ); + } + + #[tokio::test] + async fn backoff_exponential_increases_across_retries() { + use crate::task::BackoffStrategy; + use std::time::Duration; + + let store = test_store().await; + let sub = make_submission("exp-backoff", Priority::NORMAL); + let key = sub.effective_key(); + store.submit(&sub).await.unwrap(); + + let strategy = BackoffStrategy::Exponential { + initial: Duration::from_secs(10), + max: Duration::from_secs(3600), + multiplier: 2.0, + }; + + // First failure (retry_count=0): delay = 10s + let task = store.pop_next().await.unwrap().unwrap(); + assert_eq!(task.retry_count, 0); + store + .fail( + task.id, + "err", + true, + 5, + &IoBudget::default(), + &FailBackoff { + strategy: Some(&strategy), + ..Default::default() + }, + ) + .await + .unwrap(); + let requeued = store.task_by_key(&key).await.unwrap().unwrap(); + let run_after_1 = requeued.run_after.expect("run_after should be set"); + let diff_1 = (run_after_1 - chrono::Utc::now()).num_seconds(); + assert!( + (7..=13).contains(&diff_1), + "retry 0: expected ~10s delay, got {diff_1}s" + ); + + // Manually clear run_after so we can pop the task for the next retry. 
+ sqlx::query("UPDATE tasks SET run_after = NULL WHERE key = ?") + .bind(&key) + .execute(store.pool()) + .await + .unwrap(); + + // Second failure (retry_count=1): delay = 20s + let task = store.pop_next().await.unwrap().unwrap(); + assert_eq!(task.retry_count, 1); + store + .fail( + task.id, + "err", + true, + 5, + &IoBudget::default(), + &FailBackoff { + strategy: Some(&strategy), + ..Default::default() + }, + ) + .await + .unwrap(); + let requeued = store.task_by_key(&key).await.unwrap().unwrap(); + let run_after_2 = requeued.run_after.expect("run_after should be set"); + let diff_2 = (run_after_2 - chrono::Utc::now()).num_seconds(); + assert!( + (17..=23).contains(&diff_2), + "retry 1: expected ~20s delay, got {diff_2}s" + ); + } + + #[tokio::test] + async fn executor_retry_after_overrides_strategy() { + use crate::task::BackoffStrategy; + use std::time::Duration; + + let store = test_store().await; + let sub = make_submission("override-backoff", Priority::NORMAL); + let key = sub.effective_key(); + store.submit(&sub).await.unwrap(); + let task = store.pop_next().await.unwrap().unwrap(); + + // Strategy says 10s, but executor override says 120s. + let strategy = BackoffStrategy::Constant { + delay: Duration::from_secs(10), + }; + store + .fail( + task.id, + "rate limited", + true, + 3, + &IoBudget::default(), + &FailBackoff { + strategy: Some(&strategy), + executor_retry_after_ms: Some(120_000), + }, + ) + .await + .unwrap(); + + let requeued = store.task_by_key(&key).await.unwrap().unwrap(); + let run_after = requeued.run_after.expect("run_after should be set"); + let diff = (run_after - chrono::Utc::now()).num_seconds(); + // Should be ~120s, not ~10s. 
+ assert!( + (115..=125).contains(&diff), + "expected ~120s delay from executor override, got {diff}s" + ); + } + + #[tokio::test] + async fn no_backoff_requeues_immediately() { + let store = test_store().await; + let sub = make_submission("no-backoff", Priority::NORMAL); + let key = sub.effective_key(); + store.submit(&sub).await.unwrap(); + let task = store.pop_next().await.unwrap().unwrap(); + + // No strategy, no executor override → immediate retry. + store + .fail( + task.id, + "err", + true, + 3, + &IoBudget::default(), + &FailBackoff::default(), + ) + .await + .unwrap(); + + let requeued = store.task_by_key(&key).await.unwrap().unwrap(); + assert_eq!(requeued.status, TaskStatus::Pending); + assert_eq!(requeued.retry_count, 1); + // run_after should remain None (immediate dispatch). + assert!( + requeued.run_after.is_none(), + "run_after should be None for immediate retry" + ); + } + + #[tokio::test] + async fn permanent_error_skips_retry_moves_to_history() { + use crate::task::BackoffStrategy; + use std::time::Duration; + + let store = test_store().await; + let sub = make_submission("permanent-err", Priority::NORMAL); + let key = sub.effective_key(); + store.submit(&sub).await.unwrap(); + let task = store.pop_next().await.unwrap().unwrap(); + + // Even with a backoff strategy, non-retryable errors go straight to history. + let strategy = BackoffStrategy::Constant { + delay: Duration::from_secs(60), + }; + store + .fail( + task.id, + "fatal error", + false, + 3, + &IoBudget::default(), + &FailBackoff { + strategy: Some(&strategy), + ..Default::default() + }, + ) + .await + .unwrap(); + + // Should be gone from the active queue. + assert!(store.task_by_key(&key).await.unwrap().is_none()); + + // Should be in history as failed. 
+ let hist = store.failed_tasks(10).await.unwrap(); + assert_eq!(hist.len(), 1); + assert_eq!(hist[0].status, HistoryStatus::Failed); + assert_eq!(hist[0].last_error.as_deref(), Some("fatal error")); + } + + // ── Phase 5: Dead-letter state ────────────────────────────────── + + #[tokio::test] + async fn exhausted_retries_produce_dead_letter_status() { + let store = test_store().await; + let sub = make_submission("dl-exhausted", Priority::NORMAL); + let key = sub.effective_key(); + store.submit(&sub).await.unwrap(); + + // First failure: retry_count=0, max_retries=1 → requeue. + let task = store.pop_next().await.unwrap().unwrap(); + assert_eq!(task.retry_count, 0); + store + .fail( + task.id, + "transient", + true, + 1, + &IoBudget::default(), + &FailBackoff::default(), + ) + .await + .unwrap(); + + // Second failure: retry_count=1, max_retries=1 → exhausted → dead_letter. + let task = store.pop_next().await.unwrap().unwrap(); + assert_eq!(task.retry_count, 1); + store + .fail( + task.id, + "still transient", + true, + 1, + &IoBudget::disk(100, 50), + &FailBackoff::default(), + ) + .await + .unwrap(); + + // Should be gone from active queue. + assert!(store.task_by_key(&key).await.unwrap().is_none()); + + // Should be in history as dead_letter (not failed). + let hist = store.history_by_key(&key).await.unwrap(); + assert_eq!(hist.len(), 1); + assert_eq!(hist[0].status, HistoryStatus::DeadLetter); + assert_eq!(hist[0].last_error.as_deref(), Some("still transient")); + assert_eq!(hist[0].retry_count, 2); // retry_count incremented + } + + #[tokio::test] + async fn non_retryable_error_still_produces_failed_status() { + let store = test_store().await; + let sub = make_submission("dl-permanent", Priority::NORMAL); + let key = sub.effective_key(); + store.submit(&sub).await.unwrap(); + let task = store.pop_next().await.unwrap().unwrap(); + + // Non-retryable error with remaining retries → should be "failed", not "dead_letter". 
+ store + .fail( + task.id, + "permanent error", + false, + 3, + &IoBudget::default(), + &FailBackoff::default(), + ) + .await + .unwrap(); + + assert!(store.task_by_key(&key).await.unwrap().is_none()); + + let hist = store.history_by_key(&key).await.unwrap(); + assert_eq!(hist.len(), 1); + assert_eq!(hist[0].status, HistoryStatus::Failed); + + // Should NOT appear in dead_letter_tasks query. + let dl = store.dead_letter_tasks(10, 0).await.unwrap(); + assert!(dl.is_empty()); + } + + #[tokio::test] + async fn dead_letter_tasks_query_returns_only_dead_lettered() { + let store = test_store().await; + + // Create a dead-lettered task (retryable, exhausted). + let sub_dl = make_submission("dl-query-dl", Priority::NORMAL); + store.submit(&sub_dl).await.unwrap(); + let task = store.pop_next().await.unwrap().unwrap(); + store + .fail( + task.id, + "transient", + true, + 0, // max_retries=0 → immediately exhausted + &IoBudget::default(), + &FailBackoff::default(), + ) + .await + .unwrap(); + + // Create a failed task (non-retryable). + let sub_fail = make_submission("dl-query-fail", Priority::NORMAL); + store.submit(&sub_fail).await.unwrap(); + let task = store.pop_next().await.unwrap().unwrap(); + store + .fail( + task.id, + "permanent", + false, + 3, + &IoBudget::default(), + &FailBackoff::default(), + ) + .await + .unwrap(); + + // Create a completed task. + let sub_ok = make_submission("dl-query-ok", Priority::NORMAL); + store.submit(&sub_ok).await.unwrap(); + let task = store.pop_next().await.unwrap().unwrap(); + store.complete(task.id, &IoBudget::default()).await.unwrap(); + + // dead_letter_tasks should return only the dead-lettered one. + let dl = store.dead_letter_tasks(10, 0).await.unwrap(); + assert_eq!(dl.len(), 1); + assert_eq!(dl[0].status, HistoryStatus::DeadLetter); + assert_eq!(dl[0].task_type, "test"); + + // failed_tasks should return only the permanently failed one. 
+ let failed = store.failed_tasks(10).await.unwrap(); + assert_eq!(failed.len(), 1); + assert_eq!(failed[0].status, HistoryStatus::Failed); + } } diff --git a/src/store/mod.rs b/src/store/mod.rs index 29cacdd..cb5f1f8 100644 --- a/src/store/mod.rs +++ b/src/store/mod.rs @@ -28,6 +28,8 @@ mod query; pub(crate) mod row_mapping; mod submit; +pub use lifecycle::FailBackoff; + use std::sync::atomic::{AtomicU64, Ordering}; use serde::{Deserialize, Serialize}; @@ -56,6 +58,10 @@ pub enum StoreError { CyclicDependency, #[error("invalid tag: {0}")] InvalidTag(String), + #[error("not found: {0}")] + NotFound(String), + #[error("invalid state: {0}")] + InvalidState(String), } impl From for StoreError { @@ -249,6 +255,11 @@ impl TaskStore { .await?; Self::run_alter_migration(&self.pool, include_str!("../../migrations/007_tags.sql")) .await?; + Self::run_alter_migration( + &self.pool, + include_str!("../../migrations/008_retry_backoff.sql"), + ) + .await?; Ok(()) } @@ -421,6 +432,23 @@ impl TaskStore { self.pool.close().await; } + /// Delete a history record by id. Returns true if a row was deleted. + /// + /// Also removes associated history tags. + pub async fn delete_history(&self, history_id: i64) -> Result { + let mut conn = self.begin_write().await?; + sqlx::query("DELETE FROM task_history_tags WHERE history_rowid = ?") + .bind(history_id) + .execute(&mut *conn) + .await?; + let result = sqlx::query("DELETE FROM task_history WHERE id = ?") + .bind(history_id) + .execute(&mut *conn) + .await?; + sqlx::query("COMMIT").execute(&mut *conn).await?; + Ok(result.rows_affected() > 0) + } + /// Delete a task from the active queue by id. Returns true if a row was deleted. 
pub async fn delete(&self, id: i64) -> Result { let mut conn = self.begin_write().await?; diff --git a/src/store/query.rs b/src/store/query.rs index 05e8041..fcb843a 100644 --- a/src/store/query.rs +++ b/src/store/query.rs @@ -268,6 +268,24 @@ impl TaskStore { Ok(records) } + /// Dead-lettered tasks from history (retries exhausted). + pub async fn dead_letter_tasks( + &self, + limit: i64, + offset: i64, + ) -> Result, StoreError> { + let rows = sqlx::query( + "SELECT * FROM task_history WHERE status = 'dead_letter' ORDER BY completed_at DESC LIMIT ? OFFSET ?", + ) + .bind(limit) + .bind(offset) + .fetch_all(&self.pool) + .await?; + let mut records: Vec = rows.iter().map(row_to_history_record).collect(); + self.populate_history_tags(&mut records).await?; + Ok(records) + } + /// Failed tasks from history. pub async fn failed_tasks(&self, limit: i32) -> Result, StoreError> { let rows = sqlx::query( @@ -339,7 +357,7 @@ impl TaskStore { /// This is the low-level building block for [`Scheduler::task_lookup`](crate::Scheduler::task_lookup). /// The `key` parameter is the pre-computed SHA-256 dedup key (as /// returned by [`generate_dedup_key`](crate::task::generate_dedup_key) - /// or [`TaskSubmission::effective_key`]). + /// or `TaskSubmission::effective_key`). pub async fn task_lookup(&self, key: &str) -> Result { // Check active queue first (pending / running / paused). // task_by_key already populates tags. diff --git a/src/store/row_mapping.rs b/src/store/row_mapping.rs index ba20a2a..e6d0db1 100644 --- a/src/store/row_mapping.rs +++ b/src/store/row_mapping.rs @@ -10,8 +10,11 @@ use crate::task::{ }; pub(crate) fn parse_datetime(s: &str) -> DateTime { - // SQLite stores as "YYYY-MM-DD HH:MM:SS". Parse with chrono. - chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S") + // SQLite stores as "YYYY-MM-DD HH:MM:SS" or "YYYY-MM-DD HH:MM:SS.mmm" + // (the latter from backoff-computed run_after). 
Try with fractional seconds + // first, then fall back to whole-second precision. + chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S%.f") + .or_else(|_| chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S")) .map(|ndt| ndt.and_utc()) .unwrap_or_default() } @@ -71,6 +74,7 @@ pub(crate) fn row_to_task_record(row: &sqlx::sqlite::SqliteRow) -> TaskRecord { .unwrap_or(DependencyFailurePolicy::Cancel), // Tags are populated separately from the task_tags table. tags: std::collections::HashMap::new(), + max_retries: row.get("max_retries"), } } @@ -129,5 +133,6 @@ pub(crate) fn row_to_history_record(row: &sqlx::sqlite::SqliteRow) -> TaskHistor run_after: run_after_str.map(|s| parse_datetime(&s)), // Tags are populated separately from the task_history_tags table. tags: std::collections::HashMap::new(), + max_retries: row.get("max_retries"), } } diff --git a/src/store/submit.rs b/src/store/submit.rs index 4eaf06e..2ee738a 100644 --- a/src/store/submit.rs +++ b/src/store/submit.rs @@ -94,8 +94,8 @@ pub(crate) async fn submit_one( let on_dep_failure_str = sub.on_dependency_failure.as_str(); let result = sqlx::query( - "INSERT OR IGNORE INTO tasks (task_type, key, label, priority, payload, expected_read_bytes, expected_write_bytes, expected_net_rx_bytes, expected_net_tx_bytes, parent_id, fail_fast, group_key, ttl_seconds, ttl_from, expires_at, run_after, recurring_interval_secs, recurring_max_executions, on_dep_failure) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + "INSERT OR IGNORE INTO tasks (task_type, key, label, priority, payload, expected_read_bytes, expected_write_bytes, expected_net_rx_bytes, expected_net_tx_bytes, parent_id, fail_fast, group_key, ttl_seconds, ttl_from, expires_at, run_after, recurring_interval_secs, recurring_max_executions, on_dep_failure, max_retries) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", ) .bind(&sub.task_type) .bind(&key) @@ -116,6 +116,7 @@ pub(crate) async fn submit_one( 
.bind(recurring_interval_secs) .bind(recurring_max_executions) .bind(on_dep_failure_str) + .bind(sub.max_retries) .execute(&mut **conn) .await?; @@ -267,7 +268,7 @@ pub(crate) async fn supersede_existing( expected_net_rx_bytes = ?, expected_net_tx_bytes = ?, retry_count = 0, last_error = NULL, status = 'pending', requeue = 0, requeue_priority = NULL, fail_fast = ?, group_key = ?, - ttl_seconds = ?, ttl_from = ?, expires_at = ? + ttl_seconds = ?, ttl_from = ?, expires_at = ?, max_retries = ? WHERE id = ?", ) .bind(&sub.label) @@ -282,6 +283,7 @@ pub(crate) async fn supersede_existing( .bind(ttl_seconds) .bind(ttl_from_str) .bind(&expires_at) + .bind(sub.max_retries) .bind(replaced_id) .execute(&mut **conn) .await?; @@ -307,8 +309,8 @@ pub(crate) async fn supersede_existing( "INSERT INTO tasks (task_type, key, label, priority, payload, expected_read_bytes, expected_write_bytes, expected_net_rx_bytes, expected_net_tx_bytes, parent_id, fail_fast, group_key, - ttl_seconds, ttl_from, expires_at) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + ttl_seconds, ttl_from, expires_at, max_retries) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", ) .bind(&sub.task_type) .bind(key) @@ -325,6 +327,7 @@ pub(crate) async fn supersede_existing( .bind(ttl_seconds) .bind(ttl_from_str) .bind(&expires_at) + .bind(sub.max_retries) .execute(&mut **conn) .await?; @@ -677,7 +680,14 @@ mod tests { store.submit(&sub_high).await.unwrap(); store - .fail(task.id, "boom", false, 0, &IoBudget::default()) + .fail( + task.id, + "boom", + false, + 0, + &IoBudget::default(), + &Default::default(), + ) .await .unwrap(); diff --git a/src/task/error.rs b/src/task/error.rs index 48f80ff..77a675b 100644 --- a/src/task/error.rs +++ b/src/task/error.rs @@ -1,5 +1,7 @@ //! Task error types for executor failure reporting. +use std::time::Duration; + use serde::{Deserialize, Serialize}; /// Reported by the executor on failure. 
@@ -7,13 +9,16 @@ use serde::{Deserialize, Serialize}; /// The scheduler uses the [`retryable`](Self::retryable) flag to decide /// whether to requeue the task or move it to history as permanently failed: /// -/// - **Non-retryable** ([`TaskError::new`]): the task moves directly to the -/// history table with status `failed`. Use this for logic errors, invalid -/// payloads, or conditions that won't change on retry. +/// - **Non-retryable** ([`TaskError::new`] / [`TaskError::permanent`]): the +/// task moves directly to the history table with status `failed`. Use this +/// for logic errors, invalid payloads, or conditions that won't change on retry. /// - **Retryable** ([`TaskError::retryable`]): the task is requeued as /// `pending` with an incremented retry count, keeping the same priority. /// After [`SchedulerConfig::max_retries`](crate::SchedulerConfig::max_retries) /// attempts (default 3), the task fails permanently. +/// +/// Retryable errors can optionally include a [`retry_after`](Self::retry_after) +/// delay to override the backoff strategy's computed delay for a single retry. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct TaskError { pub message: String, @@ -21,6 +26,11 @@ pub struct TaskError { /// Whether this error represents a cancellation (not a real failure). #[serde(default)] pub cancelled: bool, + /// Executor-requested retry delay in milliseconds. When set, overrides the + /// backoff strategy's computed delay for this single retry. Useful for + /// respecting `Retry-After` headers from upstream services. + #[serde(default)] + pub retry_after_ms: Option, } impl TaskError { @@ -31,9 +41,19 @@ impl TaskError { message: message.into(), retryable: false, cancelled: false, + retry_after_ms: None, } } + /// Create a **permanent** (non-retryable) error. The task will fail + /// immediately and move to history, skipping any remaining retries. + /// + /// Alias for [`TaskError::new`] — use whichever reads better at the + /// call site. 
+    pub fn permanent(message: impl Into<String>) -> Self {
+        Self::new(message)
+    }
+
     /// Create a **retryable** error. The task will be requeued as pending
     /// and retried up to [`SchedulerConfig::max_retries`](crate::SchedulerConfig::max_retries)
     /// times before failing permanently.
@@ -42,6 +62,7 @@ impl TaskError {
             message: message.into(),
             retryable: true,
             cancelled: false,
+            retry_after_ms: None,
         }
     }
 
@@ -56,6 +77,7 @@ impl TaskError {
             message: "task cancelled".into(),
             retryable: false,
             cancelled: true,
+            retry_after_ms: None,
         }
     }
 
@@ -63,6 +85,19 @@ impl TaskError {
     pub fn is_cancelled(&self) -> bool {
         self.cancelled
     }
+
+    /// Request that the scheduler wait at least `delay` before retrying this
+    /// task, overriding the type's backoff strategy for this one attempt.
+    /// Delays beyond `u64::MAX` milliseconds saturate instead of wrapping.
+    pub fn retry_after(mut self, delay: Duration) -> Self {
+        self.retry_after_ms = Some(delay.as_millis().min(u128::from(u64::MAX)) as u64);
+        self
+    }
+
+    /// Returns the executor-requested retry delay, if any.
+    pub fn retry_delay(&self) -> Option<Duration> {
+        self.retry_after_ms.map(Duration::from_millis)
+    }
 }
 
 impl std::fmt::Display for TaskError {
@@ -79,6 +114,7 @@ impl From for TaskError {
             message,
             retryable: false,
             cancelled: false,
+            retry_after_ms: None,
         }
     }
 }
@@ -89,6 +125,7 @@ impl From<&str> for TaskError {
             message: message.to_string(),
             retryable: false,
             cancelled: false,
+            retry_after_ms: None,
         }
     }
 }
@@ -99,6 +136,7 @@ impl From for TaskError {
             message: e.to_string(),
             retryable: false,
             cancelled: false,
+            retry_after_ms: None,
         }
     }
 }
@@ -109,6 +147,7 @@ impl From for TaskError {
             message: e.to_string(),
             retryable: false,
             cancelled: false,
+            retry_after_ms: None,
         }
     }
 }
diff --git a/src/task/mod.rs b/src/task/mod.rs
index 22d7e3e..a2fc938 100644
--- a/src/task/mod.rs
+++ b/src/task/mod.rs
@@ -21,6 +21,8 @@
 //! - `Completed` / `Failed` / `Cancelled` / `Superseded` / `Expired` — standard outcomes
 //! - [`DependencyFailed`](HistoryStatus::DependencyFailed) — task was cancelled
 //!
because a dependency failed, per its [`DependencyFailurePolicy`] +//! - [`DeadLetter`](HistoryStatus::DeadLetter) — retries exhausted; task may +//! succeed if manually re-submitted //! //! Submit tasks via [`Scheduler::submit`](crate::Scheduler::submit), //! [`Scheduler::submit_typed`](crate::Scheduler::submit_typed), or @@ -30,6 +32,7 @@ pub mod dedup; mod error; +pub mod retry; mod submission; #[cfg(test)] mod tests; @@ -44,6 +47,7 @@ use crate::priority::Priority; pub use dedup::{generate_dedup_key, MAX_PAYLOAD_BYTES}; pub use error::TaskError; +pub use retry::{BackoffStrategy, RetryPolicy}; pub use submission::{ BatchOutcome, BatchSubmission, DependencyFailurePolicy, DuplicateStrategy, RecurringSchedule, SubmitOutcome, TaskSubmission, MAX_TAGS_PER_TASK, MAX_TAG_KEY_LEN, MAX_TAG_VALUE_LEN, @@ -137,8 +141,15 @@ pub enum HistoryStatus { Superseded, Expired, /// A dependency failed and this task was auto-cancelled per its - /// [`DependencyFailurePolicy`](crate::DependencyFailurePolicy). + /// [`DependencyFailurePolicy`]. DependencyFailed, + /// Retries exhausted — the task failed with a retryable error but has + /// reached its `max_retries` limit. Unlike `Failed` (permanent/non-retryable + /// error), dead-lettered tasks *might* succeed if retried later. + /// + /// Query with [`Scheduler::dead_letter_tasks`](crate::Scheduler::dead_letter_tasks) + /// and re-submit with [`Scheduler::retry_dead_letter`](crate::Scheduler::retry_dead_letter). 
+    DeadLetter,
 }
 
 impl HistoryStatus {
@@ -150,6 +161,7 @@ impl HistoryStatus {
             Self::Superseded => "superseded",
             Self::Expired => "expired",
             Self::DependencyFailed => "dependency_failed",
+            Self::DeadLetter => "dead_letter",
         }
     }
 }
@@ -165,6 +177,7 @@ impl std::str::FromStr for HistoryStatus {
             "superseded" => Ok(Self::Superseded),
             "expired" => Ok(Self::Expired),
             "dependency_failed" => Ok(Self::DependencyFailed),
+            "dead_letter" => Ok(Self::DeadLetter),
             other => Err(format!("unknown HistoryStatus: {other}")),
         }
     }
@@ -224,6 +237,10 @@ pub struct TaskRecord {
     pub on_dependency_failure: DependencyFailurePolicy,
     /// Key-value metadata tags for filtering, grouping, and display.
     pub tags: HashMap,
+    /// Per-task retry limit. `None` means use global default (backward compat
+    /// with pre-migration tasks). Resolved at submit time from: per-type
+    /// retry policy → global `SchedulerConfig::max_retries`.
+    pub max_retries: Option,
 }
 
 impl TaskRecord {
@@ -288,6 +305,8 @@ pub struct TaskHistoryRecord {
     pub run_after: Option>,
     /// Key-value metadata tags for filtering, grouping, and display.
     pub tags: HashMap,
+    /// Per-task retry limit. `None` means use global default.
+    pub max_retries: Option,
 }
 
 /// IO budget for a task: expected or actual disk and network IO bytes.
diff --git a/src/task/retry.rs b/src/task/retry.rs
new file mode 100644
index 0000000..c9ed897
--- /dev/null
+++ b/src/task/retry.rs
@@ -0,0 +1,270 @@
+//! Backoff strategies and retry policies for task failure handling.
+
+use std::time::Duration;
+
+use serde::{Deserialize, Serialize};
+
+/// Backoff strategy for retryable task failures.
+///
+/// Controls the delay between consecutive retries. The computed delay is
+/// applied via the existing `run_after` mechanism — the task stays `pending`
+/// but is invisible to the dispatch loop until the delay elapses.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub enum BackoffStrategy {
+    /// Fixed delay between every retry.
+    Constant { delay: Duration },
+    /// Linearly increasing delay: `initial + (retry_count * increment)`, capped at `max`.
+    Linear {
+        initial: Duration,
+        increment: Duration,
+        max: Duration,
+    },
+    /// Exponential delay: `initial * multiplier^retry_count`, capped at `max`.
+    Exponential {
+        initial: Duration,
+        max: Duration,
+        multiplier: f64,
+    },
+    /// Exponential with full jitter: uniform random in `[0, computed_delay]`.
+    ///
+    /// Recommended for tasks hitting the same endpoint — decorrelates retry
+    /// storms. Uses full jitter per AWS architecture blog recommendations.
+    ExponentialJitter {
+        initial: Duration,
+        max: Duration,
+        multiplier: f64,
+    },
+}
+
+impl BackoffStrategy {
+    /// Compute the delay for a given retry attempt (0-indexed: first retry = 0).
+    ///
+    /// Never panics. A delay too large for [`Duration`] saturates to the
+    /// variant's `max`, as does an exponential delay that comes out negative
+    /// or NaN because of an invalid `multiplier`. `Linear` treats a negative
+    /// `retry_count` as `0`.
+    pub fn delay_for(&self, retry_count: i32) -> Duration {
+        match self {
+            Self::Constant { delay } => *delay,
+            Self::Linear {
+                initial,
+                increment,
+                max,
+            } => {
+                // `retry_count as u32` would wrap a negative count to a huge factor.
+                let steps = retry_count.max(0).unsigned_abs();
+                initial
+                    .saturating_add(increment.saturating_mul(steps))
+                    .min(*max)
+            }
+            Self::Exponential {
+                initial,
+                max,
+                multiplier,
+            } => Self::exponential_delay(*initial, *max, *multiplier, retry_count),
+            Self::ExponentialJitter {
+                initial,
+                max,
+                multiplier,
+            } => {
+                let base = Self::exponential_delay(*initial, *max, *multiplier, retry_count);
+                // Saturate: `base` can exceed what fits in `u64` milliseconds.
+                let base_ms = base.as_millis().min(u128::from(u64::MAX)) as u64;
+                if base_ms == 0 {
+                    return Duration::ZERO;
+                }
+                Duration::from_millis(fastrand::u64(0..=base_ms))
+            }
+        }
+    }
+
+    /// `initial * multiplier^retry_count`, capped at `max`.
+    ///
+    /// `Duration::mul_f64` panics when the product overflows, is negative, or is
+    /// not finite, so the scaling goes through `Duration::try_from_secs_f64` and
+    /// falls back to `max` when the product is not representable.
+    fn exponential_delay(
+        initial: Duration,
+        max: Duration,
+        multiplier: f64,
+        retry_count: i32,
+    ) -> Duration {
+        Duration::try_from_secs_f64(initial.as_secs_f64() * multiplier.powi(retry_count))
+            .map_or(max, |scaled| scaled.min(max))
+    }
+}
+
+impl Default for BackoffStrategy {
+    /// No backoff — immediate retry (preserves current behavior).
+    fn default() -> Self {
+        Self::Constant {
+            delay: Duration::ZERO,
+        }
+    }
+}
+
+/// Per-task-type retry policy combining a backoff strategy with a retry limit.
+///
+/// Register via [`SchedulerBuilder::executor_with_retry_policy`](crate::SchedulerBuilder)
+/// to apply to all tasks of a given type. Tasks without a per-type policy fall
+/// back to the global [`SchedulerConfig::max_retries`](crate::SchedulerConfig::max_retries)
+/// with no backoff delay.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct RetryPolicy {
+    pub strategy: BackoffStrategy,
+    pub max_retries: i32,
+}
+
+impl Default for RetryPolicy {
+    fn default() -> Self {
+        Self {
+            strategy: BackoffStrategy::default(),
+            max_retries: 3,
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn constant_returns_fixed_delay() {
+        let strategy = BackoffStrategy::Constant {
+            delay: Duration::from_secs(5),
+        };
+        assert_eq!(strategy.delay_for(0), Duration::from_secs(5));
+        assert_eq!(strategy.delay_for(1), Duration::from_secs(5));
+        assert_eq!(strategy.delay_for(5), Duration::from_secs(5));
+        assert_eq!(strategy.delay_for(10), Duration::from_secs(5));
+    }
+
+    #[test]
+    fn linear_increases_by_increment() {
+        let strategy = BackoffStrategy::Linear {
+            initial: Duration::from_secs(1),
+            increment: Duration::from_secs(2),
+            max: Duration::from_secs(20),
+        };
+        assert_eq!(strategy.delay_for(0), Duration::from_secs(1));
+        assert_eq!(strategy.delay_for(1), Duration::from_secs(3));
+        assert_eq!(strategy.delay_for(5), Duration::from_secs(11));
+    }
+
+    #[test]
+    fn linear_clamps_at_max() {
+        let strategy = BackoffStrategy::Linear {
+            initial: Duration::from_secs(1),
+            increment: Duration::from_secs(2),
+            max: Duration::from_secs(10),
+        };
+        // retry_count=10 -> 1 + 2*10 = 21, clamped to 10
+        assert_eq!(strategy.delay_for(10), Duration::from_secs(10));
+    }
+
+    #[test]
+    fn linear_saturates_instead_of_overflowing() {
+        let strategy = BackoffStrategy::Linear {
+            initial: Duration::from_secs(1),
+            increment: Duration::MAX,
+            max: Duration::from_secs(10),
+        };
+        assert_eq!(strategy.delay_for(2), Duration::from_secs(10));
+        // A negative count is treated as the first attempt.
+        assert_eq!(strategy.delay_for(-1), Duration::from_secs(1));
+    }
+
+    #[test]
+    fn exponential_doubles_by_default() {
+        let strategy = BackoffStrategy::Exponential {
+            initial: Duration::from_secs(1),
+            max: Duration::from_secs(3600),
+            multiplier: 2.0,
+        };
+        assert_eq!(strategy.delay_for(0), Duration::from_secs(1));
+        assert_eq!(strategy.delay_for(1), Duration::from_secs(2));
+        assert_eq!(strategy.delay_for(5), Duration::from_secs(32));
+        assert_eq!(strategy.delay_for(10), Duration::from_secs(1024));
+    }
+
+    #[test]
+    fn exponential_clamps_at_max() {
+        let strategy = BackoffStrategy::Exponential {
+            initial: Duration::from_secs(1),
+            max: Duration::from_secs(60),
+            multiplier: 2.0,
+        };
+        // retry_count=10 -> 1 * 2^10 = 1024, clamped to 60
+        assert_eq!(strategy.delay_for(10), Duration::from_secs(60));
+    }
+
+    #[test]
+    fn exponential_saturates_instead_of_panicking() {
+        let strategy = BackoffStrategy::Exponential {
+            initial: Duration::from_secs(1),
+            max: Duration::from_secs(60),
+            multiplier: 2.0,
+        };
+        // 2^64 seconds no longer fits in a `Duration`.
+        assert_eq!(strategy.delay_for(64), Duration::from_secs(60));
+        assert_eq!(strategy.delay_for(i32::MAX), Duration::from_secs(60));
+    }
+
+    #[test]
+    fn exponential_jitter_within_bounds() {
+        let strategy = BackoffStrategy::ExponentialJitter {
+            initial: Duration::from_secs(1),
+            max: Duration::from_secs(3600),
+            multiplier: 2.0,
+        };
+        // Run multiple times to check bounds (jitter is random).
+        for _ in 0..100 {
+            let d = strategy.delay_for(0);
+            assert!(d <= Duration::from_secs(1), "got {:?}", d);
+
+            let d = strategy.delay_for(1);
+            assert!(d <= Duration::from_secs(2), "got {:?}", d);
+
+            let d = strategy.delay_for(5);
+            assert!(d <= Duration::from_secs(32), "got {:?}", d);
+        }
+    }
+
+    #[test]
+    fn exponential_jitter_clamps_at_max() {
+        let strategy = BackoffStrategy::ExponentialJitter {
+            initial: Duration::from_secs(1),
+            max: Duration::from_secs(60),
+            multiplier: 2.0,
+        };
+        for _ in 0..100 {
+            let d = strategy.delay_for(10);
+            assert!(d <= Duration::from_secs(60), "got {:?}", d);
+        }
+    }
+
+    #[test]
+    fn exponential_jitter_zero_initial() {
+        let strategy = BackoffStrategy::ExponentialJitter {
+            initial: Duration::ZERO,
+            max: Duration::from_secs(60),
+            multiplier: 2.0,
+        };
+        assert_eq!(strategy.delay_for(0), Duration::ZERO);
+        assert_eq!(strategy.delay_for(5), Duration::ZERO);
+    }
+
+    #[test]
+    fn default_strategy_is_zero_constant() {
+        let strategy = BackoffStrategy::default();
+        assert_eq!(strategy.delay_for(0), Duration::ZERO);
+        assert_eq!(strategy.delay_for(10), Duration::ZERO);
+    }
+
+    #[test]
+    fn default_retry_policy() {
+        let policy = RetryPolicy::default();
+        assert_eq!(policy.max_retries, 3);
+        assert_eq!(policy.strategy.delay_for(0), Duration::ZERO);
+    }
+}
diff --git a/src/task/submission.rs b/src/task/submission.rs
index 60b7e0c..21653b2
100644 --- a/src/task/submission.rs +++ b/src/task/submission.rs @@ -425,6 +425,11 @@ pub struct TaskSubmission { /// [`MAX_TAG_VALUE_LEN`], and [`MAX_TAGS_PER_TASK`] at submit time. #[serde(default)] pub tags: HashMap, + /// Per-task retry limit, resolved at submit time from the per-type retry + /// policy or the global `SchedulerConfig::max_retries`. `None` means the + /// global default will be used at retry time (backward compatibility). + #[serde(default)] + pub max_retries: Option, } impl TaskSubmission { @@ -464,6 +469,7 @@ impl TaskSubmission { dependencies: Vec::new(), on_dependency_failure: DependencyFailurePolicy::default(), tags: HashMap::new(), + max_retries: None, } } @@ -633,6 +639,13 @@ impl TaskSubmission { self } + /// Set the per-task retry limit. When set, this value is persisted and + /// used instead of the global `SchedulerConfig::max_retries`. + pub fn max_retries(mut self, max_retries: i32) -> Self { + self.max_retries = Some(max_retries); + self + } + /// Make this a recurring task with full schedule control. pub fn recurring_schedule(mut self, schedule: RecurringSchedule) -> Self { if let Some(delay) = schedule.initial_delay { diff --git a/src/task/tests.rs b/src/task/tests.rs index 4a8f96b..7205506 100644 --- a/src/task/tests.rs +++ b/src/task/tests.rs @@ -240,6 +240,7 @@ fn event_header_includes_tags() { tags: HashMap::new(), dependencies: Vec::new(), on_dependency_failure: super::submission::DependencyFailurePolicy::Cancel, + max_retries: None, }; record.tags.insert("env".into(), "prod".into()); record.tags.insert("owner".into(), "alice".into()); diff --git a/tests/integration.rs b/tests/integration.rs index 91529a5..ee75828 100644 --- a/tests/integration.rs +++ b/tests/integration.rs @@ -325,16 +325,10 @@ async fn retryable_error_exhausts_retries() { sched_clone.run(token_clone).await; }); - // Wait for permanent failure (will_retry = false). + // Wait for dead-letter event (retries exhausted). 
let deadline = tokio::time::Instant::now() + Duration::from_secs(5); - let failed = wait_for_event(&mut rx, deadline, |evt| { - matches!( - evt, - SchedulerEvent::Failed { - will_retry: false, - .. - } - ) + let dead_lettered = wait_for_event(&mut rx, deadline, |evt| { + matches!(evt, SchedulerEvent::DeadLettered { .. }) }) .await; @@ -342,8 +336,8 @@ async fn retryable_error_exhausts_retries() { let _ = handle.await; assert!( - failed.is_some(), - "task should permanently fail after retries exhausted" + dead_lettered.is_some(), + "task should be dead-lettered after retries exhausted" ); } @@ -1264,7 +1258,14 @@ async fn dep_fail_cancels_dependent() { // Fail A permanently. let a = store.pop_next().await.unwrap().unwrap(); store - .fail(a.id, "boom", false, 0, &taskmill::IoBudget::default()) + .fail( + a.id, + "boom", + false, + 0, + &taskmill::IoBudget::default(), + &Default::default(), + ) .await .unwrap(); @@ -1427,7 +1428,14 @@ async fn dep_already_failed() { let a = store.pop_next().await.unwrap().unwrap(); store - .fail(a.id, "boom", false, 0, &taskmill::IoBudget::default()) + .fail( + a.id, + "boom", + false, + 0, + &taskmill::IoBudget::default(), + &Default::default(), + ) .await .unwrap(); @@ -1493,7 +1501,14 @@ async fn dep_ignore_policy_unblocks() { // Fail A permanently. let a = store.pop_next().await.unwrap().unwrap(); store - .fail(a.id, "boom", false, 0, &taskmill::IoBudget::default()) + .fail( + a.id, + "boom", + false, + 0, + &taskmill::IoBudget::default(), + &Default::default(), + ) .await .unwrap(); @@ -1729,3 +1744,368 @@ async fn dep_blocked_tasks_survive_across_store_reopen() { let b = store.task_by_id(id_b).await.unwrap().unwrap(); assert_eq!(b.status, taskmill::TaskStatus::Pending); } + +// ═══════════════════════════════════════════════════════════════════ +// Phase 6: Dispatch Loop — Adaptive Retry Integration +// ═══════════════════════════════════════════════════════════════════ + +/// Always fails with a retryable error. 
+struct AlwaysRetryableExecutor;
+
+impl TaskExecutor for AlwaysRetryableExecutor {
+    async fn execute<'a>(&'a self, _ctx: &'a TaskContext) -> Result<(), TaskError> {
+        Err(TaskError::retryable("transient"))
+    }
+}
+
+/// Executor that always fails retryably and asks for the wrapped delay via `retry_after`.
+struct RetryAfterExecutor(Duration);
+
+impl TaskExecutor for RetryAfterExecutor {
+    async fn execute<'a>(&'a self, _ctx: &'a TaskContext) -> Result<(), TaskError> {
+        let Self(delay) = self;
+        Err(TaskError::retryable("rate limited").retry_after(*delay))
+    }
+}
+
+/// 6.5: A per-type retry policy takes precedence over the global default.
+///
+/// "type-a" registers its own policy (max_retries=5); "type-b" relies on the
+/// scheduler-wide limit (max_retries=3). Both always fail with a retryable
+/// error, so each task should be dead-lettered at its own limit.
+#[tokio::test]
+async fn per_type_retry_policy_overrides_global_default() {
+    use taskmill::{BackoffStrategy, RetryPolicy};
+
+    let immediate = BackoffStrategy::Constant {
+        delay: Duration::ZERO,
+    };
+    let policy_a = RetryPolicy {
+        strategy: immediate,
+        max_retries: 5,
+    };
+
+    // Same always-failing executor type for both; only type-a gets its own policy.
+    let sched = Scheduler::builder()
+        .store(TaskStore::open_memory().await.unwrap())
+        .executor_with_retry_policy("type-a", Arc::new(AlwaysRetryableExecutor), policy_a)
+        .executor("type-b", Arc::new(AlwaysRetryableExecutor))
+        .max_retries(3)
+        .max_concurrency(2)
+        .poll_interval(Duration::from_millis(50))
+        .build()
+        .await
+        .unwrap();
+
+    // Subscribe first, then drive the scheduler on a background task.
+    let mut events = sched.subscribe();
+    let token = CancellationToken::new();
+    let run_sched = sched.clone();
+    let run_token = token.clone();
+    let handle = tokio::spawn(async move { run_sched.run(run_token).await });
+
+    // One task per type.
+    for (task_type, key) in [("type-a", "a1"), ("type-b", "b1")] {
+        sched
+            .submit(&TaskSubmission::new(task_type).key(key))
+            .await
+            .unwrap();
+    }
+
+    // Poll in 100ms slices until both tasks are dead-lettered or 10s elapse.
+    let deadline = tokio::time::Instant::now() + Duration::from_secs(10);
+    // Retry count carried by each type's `DeadLettered` event, once seen.
+    let mut dead_a = None;
+    let mut dead_b = None;
+
+    while tokio::time::Instant::now() < deadline && (dead_a.is_none() || dead_b.is_none()) {
+        let next = tokio::time::timeout(Duration::from_millis(100), events.recv()).await;
+        if let Ok(Ok(SchedulerEvent::DeadLettered {
+            header,
+            retry_count,
+            ..
+        })) = next
+        {
+            if header.task_type == "type-a" {
+                dead_a = Some(retry_count);
+            } else if header.task_type == "type-b" {
+                dead_b = Some(retry_count);
+            }
+        }
+    }
+
+    // Stop the run loop and wait for the background task to finish.
+    token.cancel();
+    let _ = handle.await;
+
+    let a_retry_count: i32 = dead_a.expect("type-a should be dead-lettered");
+    let b_retry_count: i32 = dead_b.expect("type-b should be dead-lettered");
+    // The event carries task.retry_count + 1, where task.retry_count is the
+    // value the task had when it was popped for its final (failing) attempt.
+    // max_retries=5: retried at counts 0..4, dead-lettered when popped at
+    // count=5, so the event reports 5 + 1 = 6.
+    assert_eq!(
+        a_retry_count, 6,
+        "type-a: 5 retries + final attempt = retry_count 6"
+    );
+    // max_retries=3: retried at counts 0..2, dead-lettered when popped at
+    // count=3, so the event reports 3 + 1 = 4.
+    assert_eq!(
+        b_retry_count, 4,
+        "type-b: 3 retries + final attempt = retry_count 4"
+    );
+}
+
+/// 6.6: Exponential backoff delays task re-dispatch.
+///
+/// A task with exponential backoff (initial=200ms, multiplier=2) should not be
+/// re-dispatched until the delay elapses. We verify that the gaps between
+/// dispatches grow according to the backoff schedule.
+#[tokio::test]
+async fn exponential_backoff_delays_redispatch() {
+    use taskmill::{BackoffStrategy, RetryPolicy};
+
+    let policy = RetryPolicy {
+        strategy: BackoffStrategy::Exponential {
+            initial: Duration::from_millis(200),
+            max: Duration::from_secs(10),
+            multiplier: 2.0,
+        },
+        max_retries: 3,
+    };
+
+    let sched = Scheduler::builder()
+        .store(TaskStore::open_memory().await.unwrap())
+        .executor_with_retry_policy("backoff-test", Arc::new(AlwaysRetryableExecutor), policy)
+        .max_concurrency(1)
+        .poll_interval(Duration::from_millis(50))
+        .build()
+        .await
+        .unwrap();
+
+    let mut rx = sched.subscribe();
+    let token = CancellationToken::new();
+    let handle = tokio::spawn({
+        let s = sched.clone();
+        let t = token.clone();
+        async move { s.run(t).await }
+    });
+
+    sched
+        .submit(&TaskSubmission::new("backoff-test").key("bk1"))
+        .await
+        .unwrap();
+
+    let deadline = tokio::time::Instant::now() + Duration::from_secs(10);
+    // Instant at which each `Dispatched` event was observed by this test.
+    let mut dispatch_times: Vec<tokio::time::Instant> = Vec::new();
+    let mut done = false;
+
+    while tokio::time::Instant::now() < deadline && !done {
+        match tokio::time::timeout(Duration::from_millis(50), rx.recv()).await {
+            Ok(Ok(SchedulerEvent::Dispatched(_))) => {
+                dispatch_times.push(tokio::time::Instant::now());
+            }
+            Ok(Ok(SchedulerEvent::DeadLettered { .. })) => {
+                done = true;
+            }
+            _ => continue,
+        }
+    }
+
+    token.cancel();
+    let _ = handle.await;
+
+    assert!(done, "task should eventually dead-letter");
+    // A complete run has 4 dispatches (initial + 3 retries); the gap checks
+    // below only need the first 3, which is what this assertion guarantees.
+    assert!(
+        dispatch_times.len() >= 3,
+        "expected at least 3 dispatches, got {}",
+        dispatch_times.len()
+    );
+
+    // Indexing below cannot panic: `len() >= 3` was just asserted.
+    // Gap between dispatch 1→2 should be ≥150ms (backoff=200ms, allow some slack).
+    let first_gap = dispatch_times[1] - dispatch_times[0];
+    assert!(
+        first_gap >= Duration::from_millis(150),
+        "first retry gap should be >=150ms (backoff 200ms), got {:?}",
+        first_gap
+    );
+
+    // Gap between dispatch 2→3 should be ≥300ms (backoff=400ms=200*2^1).
+    let second_gap = dispatch_times[2] - dispatch_times[1];
+    assert!(
+        second_gap >= Duration::from_millis(300),
+        "second retry gap should be >=300ms (backoff 400ms), got {:?}",
+        second_gap
+    );
+}
+
+/// 6.7: `SchedulerEvent::Failed` includes correct `retry_after` duration.
+#[tokio::test]
+async fn failed_event_includes_retry_after_duration() {
+    use taskmill::{BackoffStrategy, RetryPolicy};
+
+    let policy = RetryPolicy {
+        strategy: BackoffStrategy::Constant {
+            delay: Duration::from_secs(5),
+        },
+        max_retries: 2,
+    };
+
+    let sched = Scheduler::builder()
+        .store(TaskStore::open_memory().await.unwrap())
+        .executor_with_retry_policy("retry-event", Arc::new(AlwaysRetryableExecutor), policy)
+        .max_concurrency(1)
+        .poll_interval(Duration::from_millis(50))
+        .build()
+        .await
+        .unwrap();
+
+    let mut rx = sched.subscribe();
+    let token = CancellationToken::new();
+    let handle = tokio::spawn({
+        let s = sched.clone();
+        let t = token.clone();
+        async move { s.run(t).await }
+    });
+
+    sched
+        .submit(&TaskSubmission::new("retry-event").key("re1"))
+        .await
+        .unwrap();
+
+    let deadline = tokio::time::Instant::now() + Duration::from_secs(5);
+    let mut found_retry_after = None;
+
+    while tokio::time::Instant::now() < deadline && found_retry_after.is_none() {
+        match tokio::time::timeout(Duration::from_millis(100), rx.recv()).await {
+            Ok(Ok(SchedulerEvent::Failed {
+                will_retry: true,
+                retry_after,
+                ..
+            })) => {
+                found_retry_after = Some(retry_after);
+            }
+            _ => continue,
+        }
+    }
+
+    token.cancel();
+    let _ = handle.await;
+
+    let retry_after =
+        found_retry_after.expect("should receive a Failed event with will_retry=true");
+    let delay = retry_after.expect("retry_after should be Some for constant 5s backoff");
+    assert_eq!(delay, Duration::from_secs(5));
+}
+
+/// 6.7b: Executor `retry_after` override appears in the Failed event.
+#[tokio::test]
+async fn failed_event_includes_executor_retry_after_override() {
+    // The executor asks for a 42s delay on every (retryable) failure.
+    let executor = Arc::new(RetryAfterExecutor(Duration::from_secs(42)));
+    let sched = Scheduler::builder()
+        .store(TaskStore::open_memory().await.unwrap())
+        .executor("retry-override", executor)
+        .max_retries(3)
+        .max_concurrency(1)
+        .poll_interval(Duration::from_millis(50))
+        .build()
+        .await
+        .unwrap();
+
+    // Subscribe first, then drive the scheduler on a background task.
+    let mut events = sched.subscribe();
+    let token = CancellationToken::new();
+    let run_sched = sched.clone();
+    let run_token = token.clone();
+    let handle = tokio::spawn(async move { run_sched.run(run_token).await });
+
+    sched
+        .submit(&TaskSubmission::new("retry-override").key("ro1"))
+        .await
+        .unwrap();
+
+    // Poll in 100ms slices for the first `Failed` event that will be retried.
+    let deadline = tokio::time::Instant::now() + Duration::from_secs(5);
+    let mut found_retry_after = None;
+
+    while tokio::time::Instant::now() < deadline && found_retry_after.is_none() {
+        let next = tokio::time::timeout(Duration::from_millis(100), events.recv()).await;
+        if let Ok(Ok(SchedulerEvent::Failed {
+            will_retry: true,
+            retry_after,
+            ..
+        })) = next
+        {
+            found_retry_after = Some(retry_after);
+        }
+    }
+
+    // Stop the run loop and wait for the background task to finish.
+    token.cancel();
+    let _ = handle.await;
+
+    // Outer `Option`: was an event seen? Inner `Option`: the event's `retry_after`.
+    let delay = found_retry_after
+        .expect("should receive a Failed event with will_retry=true")
+        .expect("retry_after should be Some with executor override");
+    assert_eq!(delay, Duration::from_secs(42));
+}
+
+/// 6.8: Backward compat — tasks with NULL `max_retries` use global default.
+///
+/// Tasks submitted without a per-type policy get NULL max_retries in the DB.
+/// The dispatch loop should fall back to the global `SchedulerConfig::max_retries`.
+#[tokio::test]
+async fn null_max_retries_uses_global_default() {
+    // No per-type policy: only the scheduler-wide limit of 2 applies.
+    let sched = Scheduler::builder()
+        .store(TaskStore::open_memory().await.unwrap())
+        .executor("legacy", Arc::new(AlwaysRetryableExecutor))
+        .max_retries(2)
+        .max_concurrency(1)
+        .poll_interval(Duration::from_millis(50))
+        .build()
+        .await
+        .unwrap();
+
+    // Subscribe first, then drive the scheduler on a background task.
+    let mut events = sched.subscribe();
+    let token = CancellationToken::new();
+    let run_sched = sched.clone();
+    let run_token = token.clone();
+    let handle = tokio::spawn(async move { run_sched.run(run_token).await });
+
+    sched
+        .submit(&TaskSubmission::new("legacy").key("leg1"))
+        .await
+        .unwrap();
+
+    // Poll in 100ms slices until the task is dead-lettered or 5s elapse.
+    let deadline = tokio::time::Instant::now() + Duration::from_secs(5);
+    let mut reported = None;
+
+    while tokio::time::Instant::now() < deadline && reported.is_none() {
+        let next = tokio::time::timeout(Duration::from_millis(100), events.recv()).await;
+        if let Ok(Ok(SchedulerEvent::DeadLettered { retry_count, .. })) = next {
+            reported = Some(retry_count);
+        }
+    }
+
+    // Stop the run loop and wait for the background task to finish.
+    token.cancel();
+    let _ = handle.await;
+
+    // With max_retries=2 the task is retried at counts 0 and 1 and dead-letters
+    // when popped at count=2; the event reports that count plus one.
+    let count = reported.expect("task should be dead-lettered");
+    assert_eq!(
+        count, 3,
+        "dead-letter should report retry_count=3 (2 retries + final attempt)"
+    );
+}