From 0125f0f7f50938cf5690407777a704b4ffbbd861 Mon Sep 17 00:00:00 2001 From: DJ Majumdar Date: Fri, 13 Mar 2026 20:07:45 -0700 Subject: [PATCH 1/3] refactor!: simplify executor API with incremental IO tracking Replace `TaskResult` return value with `Result<(), TaskError>` and introduce `IoTracker` for incremental byte reporting via `ctx.record_read_bytes()`/`ctx.record_write_bytes()`. Rename `TaskResult` to `TaskMetrics`, simplify `TaskError` with convenience constructors, add `ctx.payload::()` helper (deprecating `deserialize_typed`), expose `scheduler` on `TaskContext`, and rename `TaskSubmission::key` to `dedup_key` for clarity. --- src/lib.rs | 12 ++-- src/registry.rs | 112 +++++++++++++++++++++++++++++------- src/scheduler/dispatch.rs | 26 ++++++--- src/scheduler/mod.rs | 116 +++++++++++++++----------------------- src/scheduler/progress.rs | 4 +- src/store.rs | 94 ++++++++++-------------------- src/task.rs | 81 ++++++++++++++++++-------- 7 files changed, 254 insertions(+), 191 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 2b67f46..9fdc92a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -20,7 +20,7 @@ //! ```no_run //! use std::sync::Arc; //! use taskmill::{ -//! Scheduler, TaskExecutor, TaskContext, TaskResult, TaskError, +//! Scheduler, TaskExecutor, TaskContext, TaskError, //! TypedTask, Priority, //! }; //! use serde::{Serialize, Deserialize}; @@ -42,11 +42,13 @@ //! impl TaskExecutor for ThumbnailExecutor { //! async fn execute<'a>( //! &'a self, ctx: &'a TaskContext, -//! ) -> Result { -//! let thumb: Thumbnail = ctx.deserialize_typed().unwrap().unwrap(); +//! ) -> Result<(), TaskError> { +//! let thumb: Thumbnail = ctx.payload()?; //! ctx.progress.report(0.5, Some("resizing".into())); //! // ... do work, check ctx.token.is_cancelled() ... -//! Ok(TaskResult { actual_read_bytes: 4_096, actual_write_bytes: 1_024 }) +//! ctx.record_read_bytes(4_096); +//! ctx.record_write_bytes(1_024); +//! Ok(()) //! } //! } //! @@ -98,7 +100,7 @@ pub use scheduler::{ pub use store::{RetentionPolicy, StoreConfig, StoreError, TaskStore}; pub use task::{ generate_dedup_key, HistoryStatus, ParentResolution, SubmitOutcome, TaskError, - TaskHistoryRecord, TaskLookup, TaskRecord, TaskResult, TaskStatus, TaskSubmission, TypeStats, + TaskHistoryRecord, TaskLookup, TaskMetrics, TaskRecord, TaskStatus, TaskSubmission, TypeStats, TypedTask, }; diff --git a/src/registry.rs b/src/registry.rs index 1c73b85..3d7a403 100644 --- a/src/registry.rs +++ b/src/registry.rs @@ -9,14 +9,15 @@ use std::any::{Any, TypeId}; use std::collections::HashMap; use std::future::Future; +use std::sync::atomic::{AtomicI64, Ordering}; use std::sync::Arc; use tokio::sync::RwLock; use tokio_util::sync::CancellationToken; -use crate::scheduler::ProgressReporter; +use crate::scheduler::{ProgressReporter, Scheduler}; use crate::store::{StoreError, TaskStore}; -use crate::task::{SubmitOutcome, TaskError, TaskRecord, TaskResult, TaskSubmission, TypedTask}; +use crate::task::{SubmitOutcome, TaskError, TaskRecord, TaskSubmission, TypedTask}; // ── State Map ──────────────────────────────────────────────────────── @@ -128,6 +129,34 @@ impl ChildSpawner { } } +// ── IO Tracker ──────────────────────────────────────────────────── + +/// Accumulated IO metrics reported by the executor during execution. +/// +/// Accessible via [`TaskContext::record_read_bytes`], +/// [`TaskContext::record_write_bytes`], etc. The scheduler reads the +/// final snapshot after the executor returns. +pub(crate) struct IoTracker { + pub read_bytes: AtomicI64, + pub write_bytes: AtomicI64, +} + +impl IoTracker { + pub fn new() -> Self { + Self { + read_bytes: AtomicI64::new(0), + write_bytes: AtomicI64::new(0), + } + } + + pub fn snapshot(&self) -> crate::task::TaskMetrics { + crate::task::TaskMetrics { + read_bytes: self.read_bytes.load(Ordering::Relaxed), + write_bytes: self.write_bytes.load(Ordering::Relaxed), + } + } +} + // ── Task Context ───────────────────────────────────────────────────── /// Execution context passed to a [`TaskExecutor`]. @@ -143,17 +172,45 @@ pub struct TaskContext { pub token: CancellationToken, /// Report progress back to the scheduler (0.0–1.0). pub progress: ProgressReporter, + /// Handle to the scheduler that dispatched this task. Allows executors to + /// submit continuation tasks, look up other tasks, etc. without needing + /// a separate `OnceLock` or `Arc` in application state. + pub scheduler: Scheduler, /// Shared application state set via [`SchedulerBuilder::app_state`](crate::SchedulerBuilder::app_state). pub(crate) app_state: StateSnapshot, /// Spawner for creating child tasks. `None` for non-hierarchical contexts. pub(crate) child_spawner: Option, + /// IO bytes accumulator — read by the scheduler after execution. + pub(crate) io: Arc, } impl TaskContext { + /// Deserialize the payload as a [`TypedTask`]. + /// + /// Returns an error if the payload is missing or deserialization fails. + /// This is the primary way to extract a typed task inside an executor. + /// + /// # Example + /// + /// ```ignore + /// async fn execute(&self, ctx: &TaskContext) -> Result<(), TaskError> { + /// let task: MyTask = ctx.payload()?; + /// // ... do work ... + /// Ok(()) + /// } + /// ``` + pub fn payload(&self) -> Result { + self.record + .deserialize_payload() + .map_err(TaskError::from)? + .ok_or_else(|| TaskError::new("missing payload")) + } + /// Deserialize the payload as a [`TypedTask`]. /// /// Convenience wrapper around [`TaskRecord::deserialize_payload`] that /// mirrors the typed submission API. + #[deprecated(since = "2.0.0", note = "use `ctx.payload::()` instead")] pub fn deserialize_typed(&self) -> Result, serde_json::Error> { self.record.deserialize_payload() } @@ -178,6 +235,22 @@ impl TaskContext { self.app_state.get::() } + /// Record actual bytes read during this task's execution. + /// + /// Can be called multiple times — values are accumulated. The scheduler + /// reads the total after the executor returns. + pub fn record_read_bytes(&self, bytes: i64) { + self.io.read_bytes.fetch_add(bytes, Ordering::Relaxed); + } + + /// Record actual bytes written during this task's execution. + /// + /// Can be called multiple times — values are accumulated. The scheduler + /// reads the total after the executor returns. + pub fn record_write_bytes(&self, bytes: i64) { + self.io.write_bytes.fetch_add(bytes, Ordering::Relaxed); + } + /// Spawn a child task that will be tracked under this task as parent. /// /// The child's `parent_id` is set automatically. Returns the submit @@ -217,7 +290,7 @@ impl TaskContext { /// # Example /// /// ```ignore -/// use taskmill::{TaskExecutor, TaskContext, TaskResult, TaskError}; +/// use taskmill::{TaskExecutor, TaskContext, TaskError}; /// /// struct MyExecutor; /// @@ -225,9 +298,9 @@ impl TaskContext { /// async fn execute<'a>( /// &'a self, /// ctx: &'a TaskContext, -/// ) -> Result { +/// ) -> Result<(), TaskError> { /// ctx.progress.report(0.5, Some("halfway".into())); -/// Ok(TaskResult { actual_read_bytes: 0, actual_write_bytes: 0 }) +/// Ok(()) /// } /// } /// ``` @@ -237,26 +310,26 @@ pub trait TaskExecutor: Send + Sync + 'static { /// - `ctx`: Execution context with the task record, cancellation token, /// and progress reporter. /// - /// On success, return actual IO bytes consumed. On failure, return a - /// `TaskError` indicating whether retry is appropriate. + /// On success, return `Ok(())`. Use [`TaskContext::record_read_bytes`] + /// and [`TaskContext::record_write_bytes`] to report IO during execution. + /// On failure, return a [`TaskError`] indicating whether retry is appropriate. fn execute<'a>( &'a self, ctx: &'a TaskContext, - ) -> impl Future> + Send + 'a; + ) -> impl Future> + Send + 'a; /// Called after all children of a parent task have completed. /// /// Only invoked for tasks that spawned children via - /// [`TaskContext::spawn_child`]. The default implementation is a no-op - /// that returns zero IO bytes. + /// [`TaskContext::spawn_child`]. The default implementation is a no-op. /// /// Use this for cleanup or assembly work (e.g. calling /// `CompleteMultipartUpload` after all parts have been uploaded). fn finalize<'a>( &'a self, _ctx: &'a TaskContext, - ) -> impl Future> + Send + 'a { - async { Ok(TaskResult::zero()) } + ) -> impl Future> + Send + 'a { + async { Ok(()) } } } @@ -277,26 +350,26 @@ pub(crate) trait ErasedExecutor: Send + Sync + 'static { fn execute_erased<'a>( &'a self, ctx: &'a TaskContext, - ) -> std::pin::Pin> + Send + 'a>>; + ) -> std::pin::Pin> + Send + 'a>>; fn finalize_erased<'a>( &'a self, ctx: &'a TaskContext, - ) -> std::pin::Pin> + Send + 'a>>; + ) -> std::pin::Pin> + Send + 'a>>; } impl ErasedExecutor for T { fn execute_erased<'a>( &'a self, ctx: &'a TaskContext, - ) -> std::pin::Pin> + Send + 'a>> { + ) -> std::pin::Pin> + Send + 'a>> { Box::pin(self.execute(ctx)) } fn finalize_erased<'a>( &'a self, ctx: &'a TaskContext, - ) -> std::pin::Pin> + Send + 'a>> { + ) -> std::pin::Pin> + Send + 'a>> { Box::pin(self.finalize(ctx)) } } @@ -364,11 +437,8 @@ mod tests { struct NoopExecutor; impl TaskExecutor for NoopExecutor { - async fn execute<'a>(&'a self, _ctx: &'a TaskContext) -> Result { - Ok(TaskResult { - actual_read_bytes: 0, - actual_write_bytes: 0, - }) + async fn execute<'a>(&'a self, _ctx: &'a TaskContext) -> Result<(), TaskError> { + Ok(()) } } diff --git a/src/scheduler/dispatch.rs b/src/scheduler/dispatch.rs index 85206f3..f91d5ec 100644 --- a/src/scheduler/dispatch.rs +++ b/src/scheduler/dispatch.rs @@ -7,9 +7,9 @@ use tokio::sync::Mutex; use tokio_util::sync::CancellationToken; use crate::priority::Priority; -use crate::registry::{ChildSpawner, TaskContext}; +use crate::registry::{ChildSpawner, IoTracker, TaskContext}; use crate::store::TaskStore; -use crate::task::{ParentResolution, TaskRecord}; +use crate::task::{ParentResolution, TaskMetrics, TaskRecord}; use super::progress::ProgressReporter; use super::SchedulerEvent; @@ -195,6 +195,7 @@ pub(crate) struct SpawnContext { pub max_retries: i32, pub app_state: crate::registry::StateSnapshot, pub work_notify: Arc, + pub scheduler: super::Scheduler, } /// Spawn a task executor and wire up completion/failure handling. @@ -214,6 +215,7 @@ pub(crate) async fn spawn_task( max_retries, app_state, work_notify, + scheduler, } = ctx; let child_token = CancellationToken::new(); @@ -232,6 +234,7 @@ pub(crate) async fn spawn_task( // Build execution context. let child_spawner = ChildSpawner::new(store.clone(), task.id, work_notify.clone()); + let io = Arc::new(IoTracker::new()); let ctx = TaskContext { record: task.clone(), token: child_token.clone(), @@ -241,8 +244,10 @@ pub(crate) async fn spawn_task( task.key.clone(), event_tx.clone(), ), + scheduler, app_state, child_spawner: Some(child_spawner), + io: io.clone(), }; // Emit dispatched event. @@ -281,11 +286,14 @@ pub(crate) async fn spawn_task( ExecutionPhase::Finalize => executor.finalize_erased(&ctx).await, }; + // Read IO bytes from the context tracker. + let metrics = io.snapshot(); + // Drop the context (and its progress reporter) — executor is done. drop(ctx); match result { - Ok(tr) => { + Ok(()) => { // For the execute phase, check if the task spawned children. // If so, transition to waiting instead of completing. if phase == ExecutionPhase::Execute { @@ -324,7 +332,10 @@ pub(crate) async fn spawn_task( } } - if let Err(e) = store.complete(task_id, &tr).await { + if let Err(e) = store + .complete(task_id, &metrics) + .await + { tracing::error!(task_id, error = %e, "failed to record task completion"); } // Remove from active tracking AFTER the store write completes. @@ -369,8 +380,7 @@ pub(crate) async fn spawn_task( &te.message, te.retryable, max_retries, - te.actual_read_bytes, - te.actual_write_bytes, + &metrics, ) .await { @@ -409,7 +419,7 @@ pub(crate) async fn spawn_task( } // Fail the parent. let msg = format!("child task {task_id} failed: {}", te.message); - if let Err(e) = store.fail(parent_id, &msg, false, 0, 0, 0).await { + if let Err(e) = store.fail(parent_id, &msg, false, 0, &TaskMetrics::default()).await { tracing::error!( parent_id, error = %e, @@ -463,7 +473,7 @@ async fn handle_parent_resolution( Ok(Some(ParentResolution::Failed(reason))) => { // All children done but some failed — fail the parent. if let Ok(Some(parent)) = store.task_by_id(parent_id).await { - if let Err(e) = store.fail(parent_id, &reason, false, 0, 0, 0).await { + if let Err(e) = store.fail(parent_id, &reason, false, 0, &TaskMetrics::default()).await { tracing::error!(parent_id, error = %e, "failed to record parent failure"); } let _ = event_tx.send(SchedulerEvent::Failed { diff --git a/src/scheduler/mod.rs b/src/scheduler/mod.rs index 4adfadd..1eeaf63 100644 --- a/src/scheduler/mod.rs +++ b/src/scheduler/mod.rs @@ -23,7 +23,7 @@ use crate::registry::{TaskExecutor, TaskTypeRegistry}; use crate::resource::sampler::{SamplerConfig, SmoothedReader}; use crate::resource::{ResourceReader, ResourceSampler}; use crate::store::{StoreConfig, StoreError, TaskStore}; -use crate::task::{generate_dedup_key, SubmitOutcome, TaskLookup, TaskSubmission, TypedTask}; +use crate::task::{generate_dedup_key, SubmitOutcome, TaskLookup, TaskMetrics, TaskSubmission, TypedTask}; use dispatch::ActiveTaskMap; use gate::{DefaultDispatchGate, GateContext}; @@ -499,8 +499,7 @@ impl Scheduler { &format!("no executor registered for type '{}'", task.task_type), false, 0, - 0, - 0, + &TaskMetrics::default(), ) .await?; return Ok(true); @@ -519,6 +518,7 @@ impl Scheduler { max_retries: self.inner.max_retries, app_state: self.inner.app_state.snapshot().await, work_notify: Arc::clone(&self.inner.work_notify), + scheduler: self.clone(), }, dispatch::ExecutionPhase::Execute, ) @@ -556,7 +556,7 @@ impl Scheduler { ); self.inner .store - .fail(parent_id, "no executor for finalize", false, 0, 0, 0) + .fail(parent_id, "no executor for finalize", false, 0, &TaskMetrics::default()) .await?; return Ok(true); }; @@ -572,6 +572,7 @@ impl Scheduler { max_retries: self.inner.max_retries, app_state: self.inner.app_state.snapshot().await, work_notify: Arc::clone(&self.inner.work_notify), + scheduler: self.clone(), }, dispatch::ExecutionPhase::Finalize, ) @@ -1081,37 +1082,30 @@ impl Default for SchedulerBuilder { mod tests { use super::*; use crate::registry::{TaskContext, TaskExecutor}; - use crate::task::{TaskError, TaskResult}; + use crate::task::TaskError; struct InstantExecutor; impl TaskExecutor for InstantExecutor { - async fn execute<'a>(&'a self, _ctx: &'a TaskContext) -> Result { - Ok(TaskResult { - actual_read_bytes: 100, - actual_write_bytes: 50, - }) + async fn execute<'a>(&'a self, ctx: &'a TaskContext) -> Result<(), TaskError> { + ctx.record_read_bytes(100); + ctx.record_write_bytes(50); + Ok(()) } } struct SlowExecutor; impl TaskExecutor for SlowExecutor { - async fn execute<'a>(&'a self, ctx: &'a TaskContext) -> Result { + async fn execute<'a>(&'a self, ctx: &'a TaskContext) -> Result<(), TaskError> { tokio::select! { _ = ctx.token.cancelled() => { - Err(TaskError { - message: "cancelled".into(), - retryable: false, - actual_read_bytes: 0, - actual_write_bytes: 0, - }) + Err(TaskError::new("cancelled")) } _ = tokio::time::sleep(Duration::from_secs(60)) => { - Ok(TaskResult { - actual_read_bytes: 100, - actual_write_bytes: 50, - }) + ctx.record_read_bytes(100); + ctx.record_write_bytes(50); + Ok(()) } } } @@ -1121,13 +1115,8 @@ mod tests { struct FailingExecutor; impl TaskExecutor for FailingExecutor { - async fn execute<'a>(&'a self, _ctx: &'a TaskContext) -> Result { - Err(TaskError { - message: "boom".into(), - retryable: true, - actual_read_bytes: 0, - actual_write_bytes: 0, - }) + async fn execute<'a>(&'a self, _ctx: &'a TaskContext) -> Result<(), TaskError> { + Err(TaskError::retryable("boom")) } } @@ -1156,7 +1145,7 @@ mod tests { sched .submit(&TaskSubmission { task_type: "test".into(), - key: Some("k1".into()), + dedup_key: Some("k1".into()), priority: Priority::NORMAL, payload: None, expected_read_bytes: 0, @@ -1203,7 +1192,7 @@ mod tests { sched .submit(&TaskSubmission { task_type: "unknown".into(), - key: Some("k".into()), + dedup_key: Some("k".into()), priority: Priority::NORMAL, payload: None, expected_read_bytes: 0, @@ -1227,7 +1216,7 @@ mod tests { let sub = TaskSubmission { task_type: "test".into(), - key: Some("dup".into()), + dedup_key: Some("dup".into()), priority: Priority::NORMAL, payload: None, expected_read_bytes: 0, @@ -1257,7 +1246,7 @@ mod tests { let id = sched .submit(&TaskSubmission { task_type: "test".into(), - key: Some("cancel-me".into()), + dedup_key: Some("cancel-me".into()), priority: Priority::NORMAL, payload: None, expected_read_bytes: 0, @@ -1290,7 +1279,7 @@ mod tests { let id = sched .submit(&TaskSubmission { task_type: "test".into(), - key: Some("cancel-running".into()), + dedup_key: Some("cancel-running".into()), priority: Priority::NORMAL, payload: None, expected_read_bytes: 0, @@ -1319,7 +1308,7 @@ mod tests { sched .submit(&TaskSubmission { task_type: "test".into(), - key: Some("evt".into()), + dedup_key: Some("evt".into()), priority: Priority::NORMAL, payload: None, expected_read_bytes: 0, @@ -1352,7 +1341,7 @@ mod tests { sched .submit(&TaskSubmission { task_type: "test".into(), - key: Some("shared".into()), + dedup_key: Some("shared".into()), priority: Priority::NORMAL, payload: None, expected_read_bytes: 0, @@ -1423,7 +1412,7 @@ mod tests { sched .submit(&TaskSubmission { task_type: "test".into(), - key: Some(key.to_string()), + dedup_key: Some(key.to_string()), priority: Priority::NORMAL, payload: None, expected_read_bytes: 0, @@ -1459,7 +1448,7 @@ mod tests { sched .submit(&TaskSubmission { task_type: "test".into(), - key: Some(key.to_string()), + dedup_key: Some(key.to_string()), priority: Priority::NORMAL, payload: None, expected_read_bytes: 0, @@ -1519,13 +1508,10 @@ mod tests { struct StateCheckExecutor; impl TaskExecutor for StateCheckExecutor { - async fn execute<'a>(&'a self, ctx: &'a TaskContext) -> Result { + async fn execute<'a>(&'a self, ctx: &'a TaskContext) -> Result<(), TaskError> { let state = ctx.state::().expect("state should be set"); state.flag.store(true, Ordering::SeqCst); - Ok(TaskResult { - actual_read_bytes: 0, - actual_write_bytes: 0, - }) + Ok(()) } } @@ -1542,7 +1528,7 @@ mod tests { sched .submit(&TaskSubmission { task_type: "test".into(), - key: Some("state-test".into()), + dedup_key: Some("state-test".into()), priority: Priority::NORMAL, payload: None, expected_read_bytes: 0, @@ -1566,7 +1552,7 @@ mod tests { sched .submit(&TaskSubmission { task_type: "test".into(), - key: Some("lookup-1".into()), + dedup_key: Some("lookup-1".into()), priority: Priority::NORMAL, payload: None, expected_read_bytes: 0, @@ -1591,7 +1577,7 @@ mod tests { sched .submit(&TaskSubmission { task_type: "test".into(), - key: Some("lookup-done".into()), + dedup_key: Some("lookup-done".into()), priority: Priority::NORMAL, payload: None, expected_read_bytes: 0, @@ -1654,11 +1640,11 @@ mod tests { } impl TaskExecutor for SpawningExecutor { - async fn execute<'a>(&'a self, ctx: &'a TaskContext) -> Result { + async fn execute<'a>(&'a self, ctx: &'a TaskContext) -> Result<(), TaskError> { for i in 0..self.num_children { let sub = TaskSubmission { task_type: "child".into(), - key: Some(format!("child-{i}")), + dedup_key: Some(format!("child-{i}")), priority: ctx.record.priority, payload: None, expected_read_bytes: 0, @@ -1666,14 +1652,9 @@ mod tests { parent_id: None, // spawn_child sets this fail_fast: true, }; - ctx.spawn_child(sub).await.map_err(|e| TaskError { - message: e.to_string(), - retryable: false, - actual_read_bytes: 0, - actual_write_bytes: 0, - })?; + ctx.spawn_child(sub).await?; } - Ok(TaskResult::zero()) + Ok(()) } } @@ -1684,11 +1665,11 @@ mod tests { } impl TaskExecutor for FinalizeTrackingExecutor { - async fn execute<'a>(&'a self, ctx: &'a TaskContext) -> Result { + async fn execute<'a>(&'a self, ctx: &'a TaskContext) -> Result<(), TaskError> { for i in 0..self.children { let sub = TaskSubmission { task_type: "child".into(), - key: Some(format!("ft-child-{i}")), + dedup_key: Some(format!("ft-child-{i}")), priority: ctx.record.priority, payload: None, expected_read_bytes: 0, @@ -1696,20 +1677,15 @@ mod tests { parent_id: None, fail_fast: true, }; - ctx.spawn_child(sub).await.map_err(|e| TaskError { - message: e.to_string(), - retryable: false, - actual_read_bytes: 0, - actual_write_bytes: 0, - })?; + ctx.spawn_child(sub).await?; } - Ok(TaskResult::zero()) + Ok(()) } - async fn finalize<'a>(&'a self, _ctx: &'a TaskContext) -> Result { + async fn finalize<'a>(&'a self, _ctx: &'a TaskContext) -> Result<(), TaskError> { self.finalized .store(true, std::sync::atomic::Ordering::SeqCst); - Ok(TaskResult::zero()) + Ok(()) } } @@ -1733,7 +1709,7 @@ mod tests { sched .submit(&TaskSubmission { task_type: "parent".into(), - key: Some("p1".into()), + dedup_key: Some("p1".into()), priority: Priority::NORMAL, payload: None, expected_read_bytes: 0, @@ -1793,7 +1769,7 @@ mod tests { sched .submit(&TaskSubmission { task_type: "parent".into(), - key: Some("p-complete".into()), + dedup_key: Some("p-complete".into()), priority: Priority::NORMAL, payload: None, expected_read_bytes: 0, @@ -1866,7 +1842,7 @@ mod tests { sched .submit(&TaskSubmission { task_type: "parent".into(), - key: Some("p-finalize".into()), + dedup_key: Some("p-finalize".into()), priority: Priority::NORMAL, payload: None, expected_read_bytes: 0, @@ -1922,7 +1898,7 @@ mod tests { let parent_id = sched .submit(&TaskSubmission { task_type: "parent".into(), - key: Some("p-cancel".into()), + dedup_key: Some("p-cancel".into()), priority: Priority::NORMAL, payload: None, expected_read_bytes: 0, @@ -1956,7 +1932,7 @@ mod tests { sched .submit(&TaskSubmission { task_type: "test".into(), - key: Some("no-kids".into()), + dedup_key: Some("no-kids".into()), priority: Priority::NORMAL, payload: None, expected_read_bytes: 0, diff --git a/src/scheduler/progress.rs b/src/scheduler/progress.rs index 0b03f33..4c36e69 100644 --- a/src/scheduler/progress.rs +++ b/src/scheduler/progress.rs @@ -22,13 +22,13 @@ use super::SchedulerEvent; /// /// ```ignore /// // Inside a TaskExecutor::execute implementation: -/// async fn execute<'a>(&'a self, ctx: &'a TaskContext) -> Result { +/// async fn execute<'a>(&'a self, ctx: &'a TaskContext) -> Result<(), TaskError> { /// let items = vec![/* ... */]; /// for (i, item) in items.iter().enumerate() { /// // process item... /// ctx.progress.report_fraction(i as u64 + 1, items.len() as u64, None); /// } -/// Ok(TaskResult { actual_read_bytes: 0, actual_write_bytes: 0 }) +/// Ok(()) /// } /// ``` #[derive(Clone)] diff --git a/src/store.rs b/src/store.rs index cc240d9..672084d 100644 --- a/src/store.rs +++ b/src/store.rs @@ -13,8 +13,8 @@ use sqlx::{Row, SqlitePool}; use crate::priority::Priority; use crate::task::{ - HistoryStatus, ParentResolution, SubmitOutcome, TaskHistoryRecord, TaskLookup, TaskRecord, - TaskResult, TaskStatus, TaskSubmission, TypeStats, MAX_PAYLOAD_BYTES, + HistoryStatus, ParentResolution, SubmitOutcome, TaskHistoryRecord, TaskLookup, TaskMetrics, + TaskRecord, TaskStatus, TaskSubmission, TypeStats, MAX_PAYLOAD_BYTES, }; /// Serde-friendly error type for Tauri IPC and API boundaries. @@ -435,7 +435,9 @@ impl TaskStore { } /// Mark a task as completed and move it to history. - pub async fn complete(&self, id: i64, result: &TaskResult) -> Result<(), StoreError> { + pub async fn complete(&self, id: i64, metrics: &TaskMetrics) -> Result<(), StoreError> { + let actual_read_bytes = metrics.read_bytes; + let actual_write_bytes = metrics.write_bytes; tracing::debug!(task_id = id, "store.complete: BEGIN tx"); let mut conn = self.begin_write().await?; @@ -477,8 +479,8 @@ impl TaskStore { .bind(&task.payload) .bind(task.expected_read_bytes) .bind(task.expected_write_bytes) - .bind(result.actual_read_bytes) - .bind(result.actual_write_bytes) + .bind(actual_read_bytes) + .bind(actual_write_bytes) .bind(task.retry_count) .bind(&task.last_error) .bind(task.created_at.format("%Y-%m-%d %H:%M:%S").to_string()) @@ -534,8 +536,7 @@ impl TaskStore { error: &str, retryable: bool, max_retries: i32, - actual_read_bytes: i64, - actual_write_bytes: i64, + metrics: &TaskMetrics, ) -> Result<(), StoreError> { tracing::debug!(task_id = id, "store.fail: BEGIN tx"); let mut conn = self.begin_write().await?; @@ -589,8 +590,8 @@ impl TaskStore { .bind(&task.payload) .bind(task.expected_read_bytes) .bind(task.expected_write_bytes) - .bind(actual_read_bytes) - .bind(actual_write_bytes) + .bind(metrics.read_bytes) + .bind(metrics.write_bytes) .bind(task.retry_count + 1) .bind(error) .bind(task.created_at.format("%Y-%m-%d %H:%M:%S").to_string()) @@ -1175,7 +1176,7 @@ mod tests { fn make_submission(key: &str, priority: Priority) -> TaskSubmission { TaskSubmission { task_type: "test".into(), - key: Some(key.into()), + dedup_key: Some(key.into()), priority, payload: Some(b"hello".to_vec()), expected_read_bytes: 1000, @@ -1263,13 +1264,7 @@ mod tests { // Complete the running task — should reset to pending with requeue_priority. store - .complete( - task.id, - &TaskResult { - actual_read_bytes: 0, - actual_write_bytes: 0, - }, - ) + .complete(task.id, &TaskMetrics::default()) .await .unwrap(); @@ -1339,7 +1334,7 @@ mod tests { store.submit(&sub_high).await.unwrap(); // Permanent failure — requeue flag is dropped. - store.fail(task.id, "boom", false, 0, 0, 0).await.unwrap(); + store.fail(task.id, "boom", false, 0, &TaskMetrics::default()).await.unwrap(); // Key should be free for reuse. let outcome = store.submit(&sub).await.unwrap(); @@ -1352,7 +1347,7 @@ mod tests { let sub_a = TaskSubmission { task_type: "type_a".into(), - key: Some("shared-key".into()), + dedup_key: Some("shared-key".into()), priority: Priority::NORMAL, payload: None, expected_read_bytes: 0, @@ -1362,7 +1357,7 @@ mod tests { }; let sub_b = TaskSubmission { task_type: "type_b".into(), - key: Some("shared-key".into()), + dedup_key: Some("shared-key".into()), priority: Priority::NORMAL, payload: None, expected_read_bytes: 0, @@ -1385,7 +1380,7 @@ mod tests { let sub = TaskSubmission { task_type: "ingest".into(), - key: None, + dedup_key: None, priority: Priority::NORMAL, payload: Some(b"same-data".to_vec()), expected_read_bytes: 0, @@ -1447,10 +1442,7 @@ mod tests { store .complete( task.id, - &TaskResult { - actual_read_bytes: 2000, - actual_write_bytes: 1000, - }, + &TaskMetrics { read_bytes: 2000, write_bytes: 1000 }, ) .await .unwrap(); @@ -1474,7 +1466,7 @@ mod tests { let task = store.pop_next().await.unwrap().unwrap(); store - .fail(task.id, "transient error", true, 3, 0, 0) + .fail(task.id, "transient error", true, 3, &TaskMetrics::default()) .await .unwrap(); @@ -1494,11 +1486,11 @@ mod tests { let task = store.pop_next().await.unwrap().unwrap(); // First fail: retry_count 0 < 1, requeued with retry_count=1. - store.fail(task.id, "err1", true, 1, 0, 0).await.unwrap(); + store.fail(task.id, "err1", true, 1, &TaskMetrics::default()).await.unwrap(); let task = store.pop_next().await.unwrap().unwrap(); assert_eq!(task.retry_count, 1); // Second fail: retry_count 1 >= max_retries 1, moves to history. - store.fail(task.id, "err2", true, 1, 100, 50).await.unwrap(); + store.fail(task.id, "err2", true, 1, &TaskMetrics { read_bytes: 100, write_bytes: 50 }).await.unwrap(); // Should be in history now. assert!(store.task_by_key(&key).await.unwrap().is_none()); @@ -1547,13 +1539,7 @@ mod tests { store.submit(&sub).await.unwrap(); let task = store.pop_next().await.unwrap().unwrap(); store - .complete( - task.id, - &TaskResult { - actual_read_bytes: 0, - actual_write_bytes: 0, - }, - ) + .complete(task.id, &TaskMetrics::default()) .await .unwrap(); @@ -1574,10 +1560,7 @@ mod tests { store .complete( task.id, - &TaskResult { - actual_read_bytes: 1000, - actual_write_bytes: 500, - }, + &TaskMetrics { read_bytes: 1000, write_bytes: 500 }, ) .await .unwrap(); @@ -1655,10 +1638,7 @@ mod tests { store .complete( task.id, - &TaskResult { - actual_read_bytes: 100, - actual_write_bytes: 50, - }, + &TaskMetrics { read_bytes: 100, write_bytes: 50 }, ) .await .unwrap(); @@ -1778,13 +1758,7 @@ mod tests { store.submit(&sub).await.unwrap(); let task = store.pop_next().await.unwrap().unwrap(); store - .complete( - task.id, - &TaskResult { - actual_read_bytes: 0, - actual_write_bytes: 0, - }, - ) + .complete(task.id, &TaskMetrics::default()) .await .unwrap(); } @@ -1862,13 +1836,7 @@ mod tests { store.submit(&sub).await.unwrap(); let task = store.pop_next().await.unwrap().unwrap(); store - .complete( - task.id, - &TaskResult { - actual_read_bytes: 0, - actual_write_bytes: 0, - }, - ) + .complete(task.id, &TaskMetrics::default()) .await .unwrap(); @@ -1892,7 +1860,7 @@ mod tests { let sub = make_submission("ok", Priority::NORMAL); let big = TaskSubmission { task_type: "test".into(), - key: Some("big".into()), + dedup_key: Some("big".into()), priority: Priority::NORMAL, payload: Some(vec![0u8; MAX_PAYLOAD_BYTES + 1]), expected_read_bytes: 0, @@ -1986,7 +1954,7 @@ mod tests { child_sub.parent_id = Some(parent_id); store.submit(&child_sub).await.unwrap(); let child = store.pop_next().await.unwrap().unwrap(); - store.complete(child.id, &TaskResult::zero()).await.unwrap(); + store.complete(child.id, &TaskMetrics::default()).await.unwrap(); // Parent should be ready to finalize. let resolution = store.try_resolve_parent(parent_id).await.unwrap(); @@ -2009,7 +1977,7 @@ mod tests { store.submit(&sub).await.unwrap(); } let child = store.pop_next().await.unwrap().unwrap(); - store.complete(child.id, &TaskResult::zero()).await.unwrap(); + store.complete(child.id, &TaskMetrics::default()).await.unwrap(); let resolution = store.try_resolve_parent(parent_id).await.unwrap(); assert_eq!(resolution, Some(ParentResolution::StillWaiting)); @@ -2029,7 +1997,7 @@ mod tests { child_sub.parent_id = Some(parent_id); store.submit(&child_sub).await.unwrap(); let child = store.pop_next().await.unwrap().unwrap(); - store.fail(child.id, "boom", false, 0, 0, 0).await.unwrap(); + store.fail(child.id, "boom", false, 0, &TaskMetrics::default()).await.unwrap(); let resolution = store.try_resolve_parent(parent_id).await.unwrap(); assert_eq!( @@ -2083,7 +2051,7 @@ mod tests { store.submit(&child_sub).await.unwrap(); let child = store.pop_next().await.unwrap().unwrap(); - store.complete(child.id, &TaskResult::zero()).await.unwrap(); + store.complete(child.id, &TaskMetrics::default()).await.unwrap(); // Check history record has parent_id. let hist = store.history(10, 0).await.unwrap(); @@ -2102,7 +2070,7 @@ mod tests { let task = store.pop_next().await.unwrap().unwrap(); assert!(!task.fail_fast); - store.complete(task.id, &TaskResult::zero()).await.unwrap(); + store.complete(task.id, &TaskMetrics::default()).await.unwrap(); let hist = store.history(10, 0).await.unwrap(); assert!(!hist[0].fail_fast); diff --git a/src/task.rs b/src/task.rs index 58955c8..c9c59a8 100644 --- a/src/task.rs +++ b/src/task.rs @@ -146,20 +146,43 @@ pub struct TaskHistoryRecord { pub fail_fast: bool, } -/// Reported by the executor on successful completion. -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct TaskResult { - pub actual_read_bytes: i64, - pub actual_write_bytes: i64, +/// Accumulated IO metrics captured by the scheduler after an executor finishes. +/// +/// Executors report metrics incrementally via [`TaskContext::record_read_bytes`](crate::TaskContext::record_read_bytes) +/// and [`TaskContext::record_write_bytes`](crate::TaskContext::record_write_bytes). +/// This struct is the snapshot read by the scheduler — executors never construct it directly. +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +pub struct TaskMetrics { + pub read_bytes: i64, + pub write_bytes: i64, } /// Reported by the executor on failure. +/// +/// Use the convenience constructors [`TaskError::new`] and +/// [`TaskError::retryable`] for the common case where IO bytes are zero. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct TaskError { pub message: String, pub retryable: bool, - pub actual_read_bytes: i64, - pub actual_write_bytes: i64, +} + +impl TaskError { + /// Create a non-retryable error. + pub fn new(message: impl Into) -> Self { + Self { + message: message.into(), + retryable: false, + } + } + + /// Create a retryable error. + pub fn retryable(message: impl Into) -> Self { + Self { + message: message.into(), + retryable: true, + } + } } impl std::fmt::Display for TaskError { @@ -170,6 +193,30 @@ impl std::fmt::Display for TaskError { impl std::error::Error for TaskError {} +impl From for TaskError { + fn from(message: String) -> Self { + Self::new(message) + } +} + +impl From<&str> for TaskError { + fn from(message: &str) -> Self { + Self::new(message) + } +} + +impl From for TaskError { + fn from(e: serde_json::Error) -> Self { + Self::new(e.to_string()) + } +} + +impl From for TaskError { + fn from(e: crate::store::StoreError) -> Self { + Self::new(e.to_string()) + } +} + /// Result of a task submission attempt. #[derive(Debug, Clone, PartialEq, Eq)] pub enum SubmitOutcome { @@ -219,7 +266,7 @@ pub struct TaskSubmission { /// Optional dedup key. When `None`, the key is auto-generated by hashing /// `task_type` and `payload`, so two submissions with the same type and /// payload are deduplicated automatically. - pub key: Option, + pub dedup_key: Option, pub priority: Priority, pub payload: Option>, pub expected_read_bytes: i64, @@ -241,7 +288,7 @@ impl TaskSubmission { /// - Explicit key: `hash(task_type + ":" + key)` /// - No key: `hash(task_type + ":" + payload)` pub fn effective_key(&self) -> String { - match &self.key { + match &self.dedup_key { Some(k) => generate_dedup_key(&self.task_type, Some(k.as_bytes())), None => generate_dedup_key(&self.task_type, self.payload.as_deref()), } @@ -261,7 +308,7 @@ impl TaskSubmission { let payload = serde_json::to_vec(data)?; Ok(Self { task_type: task_type.to_string(), - key: None, + dedup_key: None, priority, payload: Some(payload), expected_read_bytes, @@ -322,7 +369,7 @@ impl TaskSubmission { let payload = serde_json::to_vec(task)?; Ok(Self { task_type: T::TASK_TYPE.to_string(), - key: None, + dedup_key: None, priority: task.priority(), payload: Some(payload), expected_read_bytes: task.expected_read_bytes(), @@ -376,16 +423,6 @@ pub enum ParentResolution { StillWaiting, } -impl TaskResult { - /// A result with zero IO bytes. - pub fn zero() -> Self { - Self { - actual_read_bytes: 0, - actual_write_bytes: 0, - } - } -} - #[cfg(test)] mod tests { use super::*; @@ -420,7 +457,7 @@ mod tests { assert_eq!(sub.priority, Priority::NORMAL); assert_eq!(sub.expected_read_bytes, 4096); assert_eq!(sub.expected_write_bytes, 1024); - assert!(sub.key.is_none()); + assert!(sub.dedup_key.is_none()); // Payload round-trips correctly. let recovered: Thumbnail = serde_json::from_slice(sub.payload.as_ref().unwrap()).unwrap(); From 2fc9e8849a25f41c13bf6f3391b8b764546b12e5 Mon Sep 17 00:00:00 2001 From: DJ Majumdar Date: Fri, 13 Mar 2026 20:32:32 -0700 Subject: [PATCH 2/3] docs: expand crate and module documentation with concepts, examples, and cross-links Add crate-level sections covering the task lifecycle, deduplication, priority/preemption, IO budgeting, child tasks, and dispatch loop internals. Enrich module docs for backpressure, registry, resource, scheduler, store, and task with builder cross-links, usage examples, and detailed field descriptions. --- src/backpressure.rs | 9 +- src/lib.rs | 214 ++++++++++++++++++++++++++++++++++++-- src/registry.rs | 17 ++- src/resource/mod.rs | 12 ++- src/scheduler/mod.rs | 49 +++++++-- src/scheduler/progress.rs | 9 +- src/store.rs | 45 ++++++++ src/task.rs | 30 ++++-- 8 files changed, 349 insertions(+), 36 deletions(-) diff --git a/src/backpressure.rs b/src/backpressure.rs index c32fbf5..9f6e7c2 100644 --- a/src/backpressure.rs +++ b/src/backpressure.rs @@ -1,9 +1,12 @@ //! Composable backpressure for throttling task dispatch. //! //! Implement [`PressureSource`] to feed external signals (API load, memory -//! pressure, queue depth, etc.) into the scheduler. Multiple sources are -//! combined via [`CompositePressure`], and [`ThrottlePolicy`] maps the -//! aggregate pressure to per-priority throttle decisions. +//! pressure, queue depth, etc.) into the scheduler. Register sources via +//! [`SchedulerBuilder::pressure_source`](crate::SchedulerBuilder::pressure_source). +//! Multiple sources are combined via [`CompositePressure`] (max wins), and +//! [`ThrottlePolicy`] maps the aggregate pressure to per-[`Priority`] +//! throttle decisions. Customize the policy with +//! [`SchedulerBuilder::throttle_policy`](crate::SchedulerBuilder::throttle_policy). use crate::priority::Priority; diff --git a/src/lib.rs b/src/lib.rs index 9fdc92a..f358875 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -4,16 +4,80 @@ //! //! Taskmill provides a generic task scheduling system that: //! - Persists tasks to SQLite so the queue survives restarts -//! - Schedules by priority (0 = highest, 255 = lowest) with named tiers +//! - Schedules by priority (0 = highest, 255 = lowest) with [named tiers](Priority) //! - Deduplicates tasks by key — submitting an already-queued key is a no-op //! - Tracks expected and actual IO bytes per task for budget-based scheduling //! - Monitors system CPU and disk throughput to adjust concurrency -//! - Supports composable backpressure from arbitrary external sources +//! - Supports [composable backpressure](PressureSource) from arbitrary external sources //! - Preempts lower-priority work when high-priority tasks arrive -//! - Retries failed tasks at the same priority level -//! - Records completed/failed task history for queries and IO learning -//! - Emits lifecycle events including progress for UI integration (via broadcast channel) -//! - Supports graceful shutdown with configurable drain timeout +//! - [Retries](TaskError::retryable) failed tasks at the same priority level +//! - Records completed/failed [task history](TaskHistoryRecord) for queries and IO learning +//! - Emits [lifecycle events](SchedulerEvent) including progress for UI integration +//! - Supports [graceful shutdown](ShutdownMode) with configurable drain timeout +//! +//! # Concepts +//! +//! ## Task lifecycle +//! +//! A task flows through a linear pipeline: +//! +//! ```text +//! submit → pending → running → completed +//! ↘ paused ↗ ↘ failed (retryable → pending) +//! ↘ failed (permanent → history) +//! ``` +//! +//! 1. **Submit** — [`Scheduler::submit`] (or [`submit_typed`](Scheduler::submit_typed)) +//! enqueues a [`TaskSubmission`] into the SQLite store. +//! 2. **Pending** — the task waits in a priority queue. The scheduler's run loop +//! pops the highest-priority pending task on each tick. +//! 3. **Running** — the scheduler calls [`TaskExecutor::execute`] with a +//! [`TaskContext`] containing the task record, a cancellation token, and a +//! progress reporter. +//! 4. **Terminal** — on success the task moves to the history table. On failure, +//! a [`retryable`](TaskError::retryable) error requeues it (up to +//! [`SchedulerBuilder::max_retries`]); a non-retryable error moves it to +//! history as failed. +//! +//! ## Deduplication +//! +//! Every task has a dedup key derived from its type name and either an explicit +//! key string or the serialized payload (via SHA-256). Submitting a task whose +//! key already exists returns [`SubmitOutcome::Duplicate`] (or +//! [`Upgraded`](SubmitOutcome::Upgraded) if the new submission has higher +//! priority). This makes it safe to call `submit` idempotently. +//! +//! ## Priority & preemption +//! +//! [`Priority`] is a `u8` newtype where **lower values = higher priority**. +//! Named constants ([`REALTIME`](Priority::REALTIME), +//! [`HIGH`](Priority::HIGH), [`NORMAL`](Priority::NORMAL), +//! [`BACKGROUND`](Priority::BACKGROUND), [`IDLE`](Priority::IDLE)) cover +//! common tiers. When a task at or above the +//! [`preempt_priority`](SchedulerBuilder::preempt_priority) threshold is +//! submitted, lower-priority running tasks are cancelled and paused so the +//! urgent work runs immediately. +//! +//! ## IO budgeting +//! +//! Each task declares expected read/write bytes (via [`TypedTask`] or +//! [`TaskSubmission`] fields). The scheduler tracks running IO totals and, +//! when [resource monitoring](SchedulerBuilder::with_resource_monitoring) is +//! enabled, compares them against observed system disk throughput to avoid +//! over-saturating the disk. Executors report actual IO via +//! [`TaskContext::record_read_bytes`] / [`record_write_bytes`](TaskContext::record_write_bytes), +//! which feeds back into historical throughput averages for future scheduling +//! decisions. +//! +//! ## Child tasks & two-phase execution +//! +//! An executor can spawn child tasks via [`TaskContext::spawn_child`]. When +//! children exist, the parent enters a **waiting** state after its executor +//! returns. Once all children complete, the parent's +//! [`TaskExecutor::finalize`] method is called — useful for assembly work +//! like `CompleteMultipartUpload`. If any child fails and +//! [`fail_fast`](TaskSubmission::fail_fast) is `true` (the default), siblings +//! are cancelled and the parent fails immediately. //! //! # Quick start //! @@ -73,11 +137,147 @@ //! # } //! ``` //! +//! # Common patterns +//! +//! ## Shared application state +//! +//! Register shared services (database pools, HTTP clients, etc.) at build time +//! and retrieve them from any executor via [`TaskContext::state`]: +//! +//! ```ignore +//! struct AppServices { db: DatabasePool, http: reqwest::Client } +//! +//! let scheduler = Scheduler::builder() +//! .store_path("tasks.db") +//! .app_state(AppServices { /* ... */ }) +//! .executor("ingest", Arc::new(IngestExecutor)) +//! .build() +//! .await?; +//! +//! // Inside the executor: +//! async fn execute<'a>(&'a self, ctx: &'a TaskContext) -> Result<(), TaskError> { +//! let svc = ctx.state::().expect("AppServices not registered"); +//! svc.db.query("...").await?; +//! Ok(()) +//! } +//! ``` +//! +//! State can also be injected after construction via +//! [`Scheduler::register_state`] — useful when a library (e.g. shoebox) +//! receives a pre-built scheduler from a parent application. +//! +//! ## Backpressure +//! +//! Implement [`PressureSource`] to feed external signals into the scheduler's +//! throttle decisions. The default [`ThrottlePolicy`] pauses `BACKGROUND` +//! tasks above 50% pressure and `NORMAL` tasks above 75%: +//! +//! ```ignore +//! use std::sync::atomic::{AtomicU32, Ordering}; +//! use taskmill::{PressureSource, Scheduler}; +//! +//! struct ApiLoad { active: AtomicU32, max: u32 } +//! +//! impl PressureSource for ApiLoad { +//! fn pressure(&self) -> f32 { +//! self.active.load(Ordering::Relaxed) as f32 / self.max as f32 +//! } +//! fn name(&self) -> &str { "api-load" } +//! } +//! +//! let scheduler = Scheduler::builder() +//! .store_path("tasks.db") +//! .pressure_source(Box::new(ApiLoad { active: AtomicU32::new(0), max: 100 })) +//! // .throttle_policy(custom_policy) // optional override +//! .build() +//! .await?; +//! ``` +//! +//! ## Events & progress +//! +//! Subscribe to [`SchedulerEvent`]s to drive a UI or collect metrics: +//! +//! ```ignore +//! let mut rx = scheduler.subscribe(); +//! tokio::spawn(async move { +//! while let Ok(event) = rx.recv().await { +//! match event { +//! SchedulerEvent::Progress { task_id, percent, message, .. } => { +//! update_progress_bar(task_id, percent, message); +//! } +//! SchedulerEvent::Completed { task_id, .. } => { +//! mark_done(task_id); +//! } +//! _ => {} +//! } +//! } +//! }); +//! ``` +//! +//! For a single-call dashboard snapshot, use [`Scheduler::snapshot`] which +//! returns a serializable [`SchedulerSnapshot`] with queue depths, running +//! tasks, progress estimates, and backpressure. +//! +//! ## Child tasks +//! +//! Spawn child tasks from an executor to model fan-out work. The parent +//! automatically waits for all children before its [`finalize`](TaskExecutor::finalize) +//! method is called: +//! +//! ```ignore +//! impl TaskExecutor for MultipartUploadExecutor { +//! async fn execute<'a>(&'a self, ctx: &'a TaskContext) -> Result<(), TaskError> { +//! let upload: MultipartUpload = ctx.payload()?; +//! for part in &upload.parts { +//! ctx.spawn_child(TaskSubmission { +//! task_type: "upload-part".into(), +//! dedup_key: Some(part.etag.clone()), +//! priority: ctx.record.priority, +//! payload: Some(serde_json::to_vec(part)?), +//! expected_read_bytes: part.size as i64, +//! expected_write_bytes: 0, +//! parent_id: None, // set automatically by spawn_child +//! fail_fast: true, +//! }).await?; +//! } +//! Ok(()) +//! } +//! +//! async fn finalize<'a>(&'a self, ctx: &'a TaskContext) -> Result<(), TaskError> { +//! // All parts uploaded — complete the multipart upload. +//! let upload: MultipartUpload = ctx.payload()?; +//! complete_multipart(&upload).await?; +//! Ok(()) +//! } +//! } +//! ``` +//! +//! # How the dispatch loop works +//! +//! Understanding the run loop helps when tuning [`SchedulerConfig`]: +//! +//! 1. The loop wakes on three conditions: a new task was submitted (via +//! [`Notify`](tokio::sync::Notify)), the +//! [`poll_interval`](SchedulerBuilder::poll_interval) elapsed (default +//! 500ms), or the cancellation token fired. +//! 2. Paused tasks are resumed if no active preemptors exist at their +//! priority level. +//! 3. Pending finalizers (parents whose children all completed) are +//! dispatched first. +//! 4. The highest-priority pending task is peeked (without claiming it). +//! 5. The dispatch gate checks concurrency limits, IO budget, and +//! backpressure. If the gate rejects, no slot is consumed. +//! 6. If admitted, the task is atomically claimed (`peek` → `pop_by_id`) +//! and spawned as a Tokio task. +//! 7. Steps 4–6 repeat until the queue is empty or the gate rejects. +//! //! # Feature flags //! //! - **`sysinfo-monitor`** (default): Enables the built-in [`SysinfoSampler`](resource::sysinfo_monitor::SysinfoSampler) //! for cross-platform CPU and disk IO monitoring. Disable for mobile targets or -//! when providing a custom [`ResourceSampler`]. +//! when providing a custom [`ResourceSampler`]. Without this feature, calling +//! [`SchedulerBuilder::with_resource_monitoring`] requires a custom sampler +//! via [`resource_sampler()`](SchedulerBuilder::resource_sampler). pub mod backpressure; pub mod priority; diff --git a/src/registry.rs b/src/registry.rs index 3d7a403..8642c46 100644 --- a/src/registry.rs +++ b/src/registry.rs @@ -1,10 +1,13 @@ //! Executor registration, shared state, and the [`TaskContext`] passed to each task. //! -//! Register one [`TaskExecutor`] per task type with the scheduler. At dispatch +//! Register one [`TaskExecutor`] per task type via +//! [`SchedulerBuilder::executor`](crate::SchedulerBuilder::executor) or +//! [`typed_executor`](crate::SchedulerBuilder::typed_executor). At dispatch //! time the scheduler looks up the executor by name and calls //! [`execute`](TaskExecutor::execute) with a [`TaskContext`] containing the //! persisted record, a cancellation token, a progress reporter, and any -//! shared application state. +//! shared application state registered via +//! [`SchedulerBuilder::app_state`](crate::SchedulerBuilder::app_state). use std::any::{Any, TypeId}; use std::collections::HashMap; @@ -178,9 +181,15 @@ pub struct TaskContext { pub scheduler: Scheduler, /// Shared application state set via [`SchedulerBuilder::app_state`](crate::SchedulerBuilder::app_state). pub(crate) app_state: StateSnapshot, - /// Spawner for creating child tasks. `None` for non-hierarchical contexts. + /// Spawner for creating child tasks via [`spawn_child`](Self::spawn_child) + /// and [`spawn_children`](Self::spawn_children). Present for all tasks + /// dispatched by the scheduler — the parent relationship is set automatically + /// when children are spawned. pub(crate) child_spawner: Option, - /// IO bytes accumulator — read by the scheduler after execution. + /// IO bytes accumulator fed by [`record_read_bytes`](Self::record_read_bytes) + /// and [`record_write_bytes`](Self::record_write_bytes). The scheduler reads + /// the final totals after the executor returns and stores them in history + /// for future IO budget estimation. pub(crate) io: Arc, } diff --git a/src/resource/mod.rs b/src/resource/mod.rs index 4ad9c01..7a68657 100644 --- a/src/resource/mod.rs +++ b/src/resource/mod.rs @@ -1,10 +1,14 @@ //! System resource monitoring for IO-aware scheduling. //! //! Implement [`ResourceSampler`] to feed CPU and disk IO metrics into the -//! scheduler. The built-in [`sysinfo_monitor`] module provides a cross-platform -//! sampler using the `sysinfo` crate (enabled by the `sysinfo-monitor` feature). -//! The scheduler reads the latest smoothed snapshot via [`ResourceReader`] when -//! making IO-budget dispatch decisions. +//! scheduler, or use the built-in [`sysinfo_monitor`] module (enabled by the +//! `sysinfo-monitor` feature) for cross-platform monitoring. Enable via +//! [`SchedulerBuilder::with_resource_monitoring`](crate::SchedulerBuilder::with_resource_monitoring) +//! or provide a custom sampler with +//! [`SchedulerBuilder::resource_sampler`](crate::SchedulerBuilder::resource_sampler). +//! +//! The scheduler reads the latest EWMA-smoothed snapshot via [`ResourceReader`] +//! when making IO-budget dispatch decisions. pub mod sampler; diff --git a/src/scheduler/mod.rs b/src/scheduler/mod.rs index 1eeaf63..9fd257d 100644 --- a/src/scheduler/mod.rs +++ b/src/scheduler/mod.rs @@ -1,9 +1,13 @@ //! The scheduler: configuration, event stream, and the main run loop. //! -//! [`Scheduler`] coordinates task execution — popping from the store, -//! applying backpressure and IO-budget checks, preempting lower-priority -//! work, and emitting [`SchedulerEvent`]s for UI integration. Use -//! [`SchedulerBuilder`] for ergonomic construction. +//! [`Scheduler`] coordinates task execution — popping from the +//! [`TaskStore`], applying [backpressure](crate::backpressure) +//! and IO-budget checks, preempting lower-priority work, and emitting +//! [`SchedulerEvent`]s for UI integration. Use [`SchedulerBuilder`] for +//! ergonomic construction. +//! +//! See the [crate-level docs](crate) for a full walkthrough of the task +//! lifecycle, common patterns, and how the dispatch loop works. pub(crate) mod dispatch; pub(crate) mod gate; @@ -131,20 +135,47 @@ pub enum ShutdownMode { } /// Scheduler configuration. +/// +/// All fields have sensible defaults (see [`Default`] impl). Most users +/// configure via [`SchedulerBuilder`] methods rather than constructing +/// this directly. pub struct SchedulerConfig { /// Maximum concurrent running tasks. Adjusted dynamically via - /// [`Scheduler::set_max_concurrency`]. + /// [`Scheduler::set_max_concurrency`]. Default: 4. + /// + /// Increase for IO-bound workloads where tasks spend most of their time + /// waiting on network or disk. Decrease for CPU-bound work or when running + /// on battery/mobile. pub max_concurrency: usize, /// Maximum retries before permanent failure. Default: 3. + /// + /// Only applies to tasks that return [`TaskError::retryable`](crate::TaskError::retryable). Non-retryable + /// errors fail immediately regardless of this setting. pub max_retries: i32, - /// Priority threshold: tasks at or above this priority (lower numeric value) - /// trigger preemption of lower-priority running tasks. + /// Priority threshold for preemption. Tasks at or above this priority + /// (lower numeric value = higher priority) trigger preemption of + /// lower-priority running tasks. Default: [`Priority::REALTIME`]. + /// + /// Set to [`Priority::HIGH`] if you want `HIGH`-priority tasks to also + /// preempt. Set to `Priority::new(0)` to effectively disable preemption + /// (only priority 0 would trigger it). pub preempt_priority: Priority, /// Interval between scheduler polls when idle. Default: 500ms. + /// + /// The scheduler also wakes immediately on task submission, so this mainly + /// affects how quickly paused tasks are resumed and how often housekeeping + /// runs. Lower values increase responsiveness at the cost of CPU usage. + /// On mobile targets, the notify-based wake means the CPU can sleep between + /// submissions regardless of this interval. pub poll_interval: Duration, - /// How many recent tasks to consider for IO throughput estimation. + /// How many recent completed tasks to sample for IO throughput estimation. + /// Default: 20. + /// + /// Used by the IO budget gate to estimate how much disk bandwidth running + /// tasks consume. Larger values smooth out outliers but adapt more slowly + /// to changing workloads. pub throughput_sample_size: i32, - /// Shutdown behavior. Default: Hard. + /// Shutdown behavior. Default: [`ShutdownMode::Hard`]. pub shutdown_mode: ShutdownMode, } diff --git a/src/scheduler/progress.rs b/src/scheduler/progress.rs index 4c36e69..199638d 100644 --- a/src/scheduler/progress.rs +++ b/src/scheduler/progress.rs @@ -1,8 +1,11 @@ //! Progress reporting and throughput-based extrapolation. //! -//! Executors call [`ProgressReporter::report`] to emit percentage updates. -//! The scheduler combines these with historical throughput data to produce -//! [`EstimatedProgress`] snapshots for dashboard UIs. +//! Executors call [`ProgressReporter::report`] (via [`TaskContext::progress`](crate::TaskContext::progress)) +//! to emit percentage updates as [`SchedulerEvent::Progress`] +//! events. The scheduler combines these with historical throughput data to +//! produce [`EstimatedProgress`] snapshots, available via +//! [`Scheduler::estimated_progress`](super::Scheduler::estimated_progress) or +//! the [`SchedulerSnapshot`](super::SchedulerSnapshot). use serde::{Deserialize, Serialize}; diff --git a/src/store.rs b/src/store.rs index 672084d..b78618e 100644 --- a/src/store.rs +++ b/src/store.rs @@ -3,6 +3,12 @@ //! [`TaskStore`] manages the active task queue and completed/failed history //! in a single SQLite database. It handles deduplication, priority upgrades, //! retries, parent-child hierarchy, and automatic history pruning. +//! +//! Most users interact with the store through [`Scheduler`](crate::Scheduler) +//! methods like [`submit`](crate::Scheduler::submit) and +//! [`task_lookup`](crate::Scheduler::task_lookup). Direct access is available +//! via [`Scheduler::store()`](crate::Scheduler::store) for queries and +//! diagnostics. use std::sync::atomic::{AtomicU64, Ordering}; @@ -92,6 +98,45 @@ impl Default for StoreConfig { } /// SQLite-backed persistence layer for the task queue and history. +/// +/// Most users interact with [`TaskStore`] indirectly through [`Scheduler`](crate::Scheduler), +/// but direct access is available via [`Scheduler::store()`](crate::Scheduler::store) for +/// queries and diagnostics. +/// +/// # Example +/// +/// ```no_run +/// # async fn example() -> Result<(), taskmill::store::StoreError> { +/// use taskmill::store::TaskStore; +/// use taskmill::task::{TaskSubmission, TaskMetrics, TaskStatus}; +/// use taskmill::priority::Priority; +/// +/// let store = TaskStore::open_memory().await?; +/// +/// // Submit a task. +/// let sub = TaskSubmission { +/// task_type: "thumbnail".into(), +/// dedup_key: Some("photo-1".into()), +/// priority: Priority::NORMAL, +/// payload: Some(br#"{"path":"/a.jpg"}"#.to_vec()), +/// expected_read_bytes: 4096, +/// expected_write_bytes: 1024, +/// parent_id: None, +/// fail_fast: true, +/// }; +/// let outcome = store.submit(&sub).await?; +/// assert!(outcome.is_inserted()); +/// +/// // Pop the highest-priority task and mark it running. +/// let task = store.pop_next().await?.unwrap(); +/// assert_eq!(task.status, TaskStatus::Running); +/// +/// // Complete it — moves to history. +/// store.complete(task.id, &TaskMetrics { read_bytes: 4096, write_bytes: 1024 }).await?; +/// assert!(store.task_by_id(task.id).await?.is_none()); // gone from active queue +/// # Ok(()) +/// # } +/// ``` #[derive(Clone)] pub struct TaskStore { pool: SqlitePool, diff --git a/src/task.rs b/src/task.rs index c9c59a8..4e5ddcf 100644 --- a/src/task.rs +++ b/src/task.rs @@ -4,6 +4,11 @@ //! [`TaskSubmission`] for enqueuing work, [`TaskRecord`] for in-flight tasks, //! [`TaskHistoryRecord`] for completed/failed results, and [`TypedTask`] for //! strongly-typed task payloads with built-in serialization. +//! +//! Submit tasks via [`Scheduler::submit`](crate::Scheduler::submit) or +//! [`Scheduler::submit_typed`](crate::Scheduler::submit_typed). Executors +//! receive a [`TaskContext`](crate::TaskContext) with the deserialized record +//! and report results via [`TaskError`]. use chrono::{DateTime, Utc}; use serde::de::DeserializeOwned; @@ -159,8 +164,16 @@ pub struct TaskMetrics { /// Reported by the executor on failure. /// -/// Use the convenience constructors [`TaskError::new`] and -/// [`TaskError::retryable`] for the common case where IO bytes are zero. +/// The scheduler uses the [`retryable`](Self::retryable) flag to decide +/// whether to requeue the task or move it to history as permanently failed: +/// +/// - **Non-retryable** ([`TaskError::new`]): the task moves directly to the +/// history table with status `failed`. Use this for logic errors, invalid +/// payloads, or conditions that won't change on retry. +/// - **Retryable** ([`TaskError::retryable`]): the task is requeued as +/// `pending` with an incremented retry count, keeping the same priority. +/// After [`SchedulerConfig::max_retries`](crate::SchedulerConfig::max_retries) +/// attempts (default 3), the task fails permanently. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct TaskError { pub message: String, @@ -168,7 +181,8 @@ pub struct TaskError { } impl TaskError { - /// Create a non-retryable error. + /// Create a **non-retryable** error. The task will fail permanently and + /// move to the history table. pub fn new(message: impl Into) -> Self { Self { message: message.into(), @@ -176,7 +190,9 @@ impl TaskError { } } - /// Create a retryable error. + /// Create a **retryable** error. The task will be requeued as pending + /// and retried up to [`SchedulerConfig::max_retries`](crate::SchedulerConfig::max_retries) + /// times before failing permanently. pub fn retryable(message: impl Into) -> Self { Self { message: message.into(), @@ -324,8 +340,10 @@ impl TaskSubmission { /// /// Implementing this trait collapses the 6 fields of [`TaskSubmission`] into a /// derive-friendly pattern. Use [`Scheduler::submit_typed`](crate::Scheduler::submit_typed) -/// to submit and [`TaskContext::deserialize_typed`](crate::TaskContext::deserialize_typed) -/// on the executor side. +/// to submit and [`TaskContext::payload`](crate::TaskContext::payload) on the +/// executor side to deserialize. Each `TypedTask` must have a corresponding +/// [`TaskExecutor`](crate::TaskExecutor) registered under the same +/// [`TASK_TYPE`](Self::TASK_TYPE) name. /// /// # Example /// From 1df171477569444e48114b92131648e2019d22fd Mon Sep 17 00:00:00 2001 From: DJ Majumdar Date: Fri, 13 Mar 2026 20:54:24 -0700 Subject: [PATCH 3/3] style: reformat long method chains and struct literals --- src/scheduler/dispatch.rs | 23 ++++++-------- src/scheduler/mod.rs | 12 ++++++-- src/store.rs | 64 ++++++++++++++++++++++++++++++++------- 3 files changed, 73 insertions(+), 26 deletions(-) diff --git a/src/scheduler/dispatch.rs b/src/scheduler/dispatch.rs index f91d5ec..2ec890c 100644 --- a/src/scheduler/dispatch.rs +++ b/src/scheduler/dispatch.rs @@ -332,10 +332,7 @@ pub(crate) async fn spawn_task( } } - if let Err(e) = store - .complete(task_id, &metrics) - .await - { + if let Err(e) = store.complete(task_id, &metrics).await { tracing::error!(task_id, error = %e, "failed to record task completion"); } // Remove from active tracking AFTER the store write completes. @@ -375,13 +372,7 @@ pub(crate) async fn spawn_task( "task failed" ); if let Err(e) = store - .fail( - task_id, - &te.message, - te.retryable, - max_retries, - &metrics, - ) + .fail(task_id, &te.message, te.retryable, max_retries, &metrics) .await { tracing::error!(task_id, error = %e, "failed to record task failure"); @@ -419,7 +410,10 @@ pub(crate) async fn spawn_task( } // Fail the parent. let msg = format!("child task {task_id} failed: {}", te.message); - if let Err(e) = store.fail(parent_id, &msg, false, 0, &TaskMetrics::default()).await { + if let Err(e) = store + .fail(parent_id, &msg, false, 0, &TaskMetrics::default()) + .await + { tracing::error!( parent_id, error = %e, @@ -473,7 +467,10 @@ async fn handle_parent_resolution( Ok(Some(ParentResolution::Failed(reason))) => { // All children done but some failed — fail the parent. if let Ok(Some(parent)) = store.task_by_id(parent_id).await { - if let Err(e) = store.fail(parent_id, &reason, false, 0, &TaskMetrics::default()).await { + if let Err(e) = store + .fail(parent_id, &reason, false, 0, &TaskMetrics::default()) + .await + { tracing::error!(parent_id, error = %e, "failed to record parent failure"); } let _ = event_tx.send(SchedulerEvent::Failed { diff --git a/src/scheduler/mod.rs b/src/scheduler/mod.rs index 9fd257d..604b4f7 100644 --- a/src/scheduler/mod.rs +++ b/src/scheduler/mod.rs @@ -27,7 +27,9 @@ use crate::registry::{TaskExecutor, TaskTypeRegistry}; use crate::resource::sampler::{SamplerConfig, SmoothedReader}; use crate::resource::{ResourceReader, ResourceSampler}; use crate::store::{StoreConfig, StoreError, TaskStore}; -use crate::task::{generate_dedup_key, SubmitOutcome, TaskLookup, TaskMetrics, TaskSubmission, TypedTask}; +use crate::task::{ + generate_dedup_key, SubmitOutcome, TaskLookup, TaskMetrics, TaskSubmission, TypedTask, +}; use dispatch::ActiveTaskMap; use gate::{DefaultDispatchGate, GateContext}; @@ -587,7 +589,13 @@ impl Scheduler { ); self.inner .store - .fail(parent_id, "no executor for finalize", false, 0, &TaskMetrics::default()) + .fail( + parent_id, + "no executor for finalize", + false, + 0, + &TaskMetrics::default(), + ) .await?; return Ok(true); }; diff --git a/src/store.rs b/src/store.rs index b78618e..0dc3a6c 100644 --- a/src/store.rs +++ b/src/store.rs @@ -1379,7 +1379,10 @@ mod tests { store.submit(&sub_high).await.unwrap(); // Permanent failure — requeue flag is dropped. - store.fail(task.id, "boom", false, 0, &TaskMetrics::default()).await.unwrap(); + store + .fail(task.id, "boom", false, 0, &TaskMetrics::default()) + .await + .unwrap(); // Key should be free for reuse. let outcome = store.submit(&sub).await.unwrap(); @@ -1487,7 +1490,10 @@ mod tests { store .complete( task.id, - &TaskMetrics { read_bytes: 2000, write_bytes: 1000 }, + &TaskMetrics { + read_bytes: 2000, + write_bytes: 1000, + }, ) .await .unwrap(); @@ -1531,11 +1537,26 @@ mod tests { let task = store.pop_next().await.unwrap().unwrap(); // First fail: retry_count 0 < 1, requeued with retry_count=1. - store.fail(task.id, "err1", true, 1, &TaskMetrics::default()).await.unwrap(); + store + .fail(task.id, "err1", true, 1, &TaskMetrics::default()) + .await + .unwrap(); let task = store.pop_next().await.unwrap().unwrap(); assert_eq!(task.retry_count, 1); // Second fail: retry_count 1 >= max_retries 1, moves to history. - store.fail(task.id, "err2", true, 1, &TaskMetrics { read_bytes: 100, write_bytes: 50 }).await.unwrap(); + store + .fail( + task.id, + "err2", + true, + 1, + &TaskMetrics { + read_bytes: 100, + write_bytes: 50, + }, + ) + .await + .unwrap(); // Should be in history now. assert!(store.task_by_key(&key).await.unwrap().is_none()); @@ -1605,7 +1626,10 @@ mod tests { store .complete( task.id, - &TaskMetrics { read_bytes: 1000, write_bytes: 500 }, + &TaskMetrics { + read_bytes: 1000, + write_bytes: 500, + }, ) .await .unwrap(); @@ -1683,7 +1707,10 @@ mod tests { store .complete( task.id, - &TaskMetrics { read_bytes: 100, write_bytes: 50 }, + &TaskMetrics { + read_bytes: 100, + write_bytes: 50, + }, ) .await .unwrap(); @@ -1999,7 +2026,10 @@ mod tests { child_sub.parent_id = Some(parent_id); store.submit(&child_sub).await.unwrap(); let child = store.pop_next().await.unwrap().unwrap(); - store.complete(child.id, &TaskMetrics::default()).await.unwrap(); + store + .complete(child.id, &TaskMetrics::default()) + .await + .unwrap(); // Parent should be ready to finalize. let resolution = store.try_resolve_parent(parent_id).await.unwrap(); @@ -2022,7 +2052,10 @@ mod tests { store.submit(&sub).await.unwrap(); } let child = store.pop_next().await.unwrap().unwrap(); - store.complete(child.id, &TaskMetrics::default()).await.unwrap(); + store + .complete(child.id, &TaskMetrics::default()) + .await + .unwrap(); let resolution = store.try_resolve_parent(parent_id).await.unwrap(); assert_eq!(resolution, Some(ParentResolution::StillWaiting)); @@ -2042,7 +2075,10 @@ mod tests { child_sub.parent_id = Some(parent_id); store.submit(&child_sub).await.unwrap(); let child = store.pop_next().await.unwrap().unwrap(); - store.fail(child.id, "boom", false, 0, &TaskMetrics::default()).await.unwrap(); + store + .fail(child.id, "boom", false, 0, &TaskMetrics::default()) + .await + .unwrap(); let resolution = store.try_resolve_parent(parent_id).await.unwrap(); assert_eq!( @@ -2096,7 +2132,10 @@ mod tests { store.submit(&child_sub).await.unwrap(); let child = store.pop_next().await.unwrap().unwrap(); - store.complete(child.id, &TaskMetrics::default()).await.unwrap(); + store + .complete(child.id, &TaskMetrics::default()) + .await + .unwrap(); // Check history record has parent_id. let hist = store.history(10, 0).await.unwrap(); @@ -2115,7 +2154,10 @@ mod tests { let task = store.pop_next().await.unwrap().unwrap(); assert!(!task.fail_fast); - store.complete(task.id, &TaskMetrics::default()).await.unwrap(); + store + .complete(task.id, &TaskMetrics::default()) + .await + .unwrap(); let hist = store.history(10, 0).await.unwrap(); assert!(!hist[0].fail_fast);