From 0a8d8353d063c8be0f2b5848897c18f9a11f5766 Mon Sep 17 00:00:00 2001 From: DJ Majumdar Date: Fri, 13 Mar 2026 08:49:29 -0700 Subject: [PATCH 1/2] feat(taskmill): add hierarchical child tasks with two-phase execution Parent tasks can spawn children during execution via ChildSpawner. Parents enter a Waiting state until all children finish, then a finalize() phase runs before the parent completes. Configurable fail_fast flag controls whether first child failure cancels siblings. Cancelling a parent cascades to all children. Also fixes a pool self-deadlock in store.complete/fail where the connection was held across maybe_prune(), and changes the default retention policy to MaxCount(10,000). --- migrations/001_tasks.sql | 16 +- src/lib.rs | 7 +- src/registry.rs | 107 ++++++++- src/scheduler/dispatch.rs | 197 +++++++++++++++- src/scheduler/mod.rs | 461 +++++++++++++++++++++++++++++++++++++- src/store.rs | 439 +++++++++++++++++++++++++++++++++++- src/task.rs | 52 +++++ 7 files changed, 1241 insertions(+), 38 deletions(-) diff --git a/migrations/001_tasks.sql b/migrations/001_tasks.sql index 6e92f71..a90205f 100644 --- a/migrations/001_tasks.sql +++ b/migrations/001_tasks.sql @@ -18,6 +18,8 @@ CREATE TABLE IF NOT EXISTS tasks ( started_at TEXT, requeue INTEGER NOT NULL DEFAULT 0, requeue_priority INTEGER, + parent_id INTEGER, + fail_fast INTEGER NOT NULL DEFAULT 1, UNIQUE(key) ); @@ -43,7 +45,9 @@ CREATE TABLE IF NOT EXISTS task_history ( created_at TEXT NOT NULL, started_at TEXT, completed_at TEXT NOT NULL DEFAULT (datetime('now')), - duration_ms INTEGER + duration_ms INTEGER, + parent_id INTEGER, + fail_fast INTEGER NOT NULL DEFAULT 1 ); -- Index for IO learning: recent completions by task type. @@ -62,3 +66,13 @@ CREATE INDEX IF NOT EXISTS idx_history_completed -- Index for filtering history by status (e.g. listing failed tasks). 
CREATE INDEX IF NOT EXISTS idx_history_status ON task_history (status, completed_at DESC); + +-- Index for looking up children of a parent task (active queue). +CREATE INDEX IF NOT EXISTS idx_tasks_parent + ON tasks (parent_id) + WHERE parent_id IS NOT NULL; + +-- Index for looking up children of a parent task (history). +CREATE INDEX IF NOT EXISTS idx_task_history_parent + ON task_history (parent_id) + WHERE parent_id IS NOT NULL; diff --git a/src/lib.rs b/src/lib.rs index ee3db34..c7ea34b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -32,7 +32,7 @@ pub mod task; // Convenience re-exports. pub use backpressure::{CompositePressure, PressureSource, ThrottlePolicy}; pub use priority::Priority; -pub use registry::{StateMap, TaskContext, TaskExecutor}; +pub use registry::{ChildSpawner, StateMap, TaskContext, TaskExecutor}; pub use resource::sampler::{SamplerConfig, SmoothedReader}; pub use resource::{ResourceReader, ResourceSampler, ResourceSnapshot}; pub use scheduler::{ @@ -41,8 +41,9 @@ pub use scheduler::{ }; pub use store::{RetentionPolicy, StoreConfig, StoreError, TaskStore}; pub use task::{ - generate_dedup_key, HistoryStatus, SubmitOutcome, TaskError, TaskHistoryRecord, TaskLookup, - TaskRecord, TaskResult, TaskStatus, TaskSubmission, TypeStats, TypedTask, + generate_dedup_key, HistoryStatus, ParentResolution, SubmitOutcome, TaskError, + TaskHistoryRecord, TaskLookup, TaskRecord, TaskResult, TaskStatus, TaskSubmission, TypeStats, + TypedTask, }; #[cfg(feature = "sysinfo-monitor")] diff --git a/src/registry.rs b/src/registry.rs index f903c80..56e3406 100644 --- a/src/registry.rs +++ b/src/registry.rs @@ -7,7 +7,8 @@ use tokio::sync::RwLock; use tokio_util::sync::CancellationToken; use crate::scheduler::ProgressReporter; -use crate::task::{TaskError, TaskRecord, TaskResult, TypedTask}; +use crate::store::{StoreError, TaskStore}; +use crate::task::{SubmitOutcome, TaskError, TaskRecord, TaskResult, TaskSubmission, TypedTask}; // ── State Map 
──────────────────────────────────────────────────────── @@ -69,6 +70,56 @@ impl StateMap { } } +// ── Child Spawner ──────────────────────────────────────────────── + +/// Handle for spawning child tasks from within an executor. +/// +/// Wraps a [`TaskStore`] reference and the parent task ID so that +/// child submissions automatically inherit the parent relationship. +/// Holds a `Notify` reference to wake the scheduler run loop after +/// spawning, so children are dispatched promptly. +#[derive(Clone)] +pub struct ChildSpawner { + store: TaskStore, + parent_id: i64, + work_notify: Arc, +} + +impl ChildSpawner { + pub(crate) fn new( + store: TaskStore, + parent_id: i64, + work_notify: Arc, + ) -> Self { + Self { + store, + parent_id, + work_notify, + } + } + + /// Submit a single child task. Sets `parent_id` automatically. + pub async fn spawn(&self, mut sub: TaskSubmission) -> Result { + sub.parent_id = Some(self.parent_id); + let outcome = self.store.submit(&sub).await?; + self.work_notify.notify_one(); + Ok(outcome) + } + + /// Submit multiple child tasks in a single transaction. + pub async fn spawn_batch( + &self, + submissions: &mut [TaskSubmission], + ) -> Result, StoreError> { + for sub in submissions.iter_mut() { + sub.parent_id = Some(self.parent_id); + } + let outcomes = self.store.submit_batch(submissions).await?; + self.work_notify.notify_one(); + Ok(outcomes) + } +} + // ── Task Context ───────────────────────────────────────────────────── /// Execution context passed to a [`TaskExecutor`]. @@ -86,6 +137,8 @@ pub struct TaskContext { pub progress: ProgressReporter, /// Shared application state set via [`SchedulerBuilder::app_state`]. pub(crate) app_state: StateSnapshot, + /// Spawner for creating child tasks. `None` for non-hierarchical contexts. 
+ pub(crate) child_spawner: Option, } impl TaskContext { @@ -115,6 +168,31 @@ impl TaskContext { pub fn state(&self) -> Option<&T> { self.app_state.get::() } + + /// Spawn a child task that will be tracked under this task as parent. + /// + /// The child's `parent_id` is set automatically. Returns the submit + /// outcome. Panics if this context was not created with hierarchy + /// support (should not happen in normal scheduler operation). + pub async fn spawn_child(&self, sub: TaskSubmission) -> Result { + let spawner = self + .child_spawner + .as_ref() + .expect("spawn_child called on a context without ChildSpawner"); + spawner.spawn(sub).await + } + + /// Spawn multiple child tasks in a single transaction. + pub async fn spawn_children( + &self, + mut submissions: Vec, + ) -> Result, StoreError> { + let spawner = self + .child_spawner + .as_ref() + .expect("spawn_children called on a context without ChildSpawner"); + spawner.spawn_batch(&mut submissions).await + } } /// Executes tasks of a registered type. @@ -156,6 +234,21 @@ pub trait TaskExecutor: Send + Sync + 'static { &'a self, ctx: &'a TaskContext, ) -> impl Future> + Send + 'a; + + /// Called after all children of a parent task have completed. + /// + /// Only invoked for tasks that spawned children via + /// [`TaskContext::spawn_child`]. The default implementation is a no-op + /// that returns zero IO bytes. + /// + /// Use this for cleanup or assembly work (e.g. calling + /// `CompleteMultipartUpload` after all parts have been uploaded). + fn finalize<'a>( + &'a self, + _ctx: &'a TaskContext, + ) -> impl Future> + Send + 'a { + async { Ok(TaskResult::zero()) } + } } /// Registry mapping task type names to their executors. 
@@ -176,6 +269,11 @@ pub(crate) trait ErasedExecutor: Send + Sync + 'static { &'a self, ctx: &'a TaskContext, ) -> std::pin::Pin> + Send + 'a>>; + + fn finalize_erased<'a>( + &'a self, + ctx: &'a TaskContext, + ) -> std::pin::Pin> + Send + 'a>>; } impl ErasedExecutor for T { @@ -185,6 +283,13 @@ impl ErasedExecutor for T { ) -> std::pin::Pin> + Send + 'a>> { Box::pin(self.execute(ctx)) } + + fn finalize_erased<'a>( + &'a self, + ctx: &'a TaskContext, + ) -> std::pin::Pin> + Send + 'a>> { + Box::pin(self.finalize(ctx)) + } } impl TaskTypeRegistry { diff --git a/src/scheduler/dispatch.rs b/src/scheduler/dispatch.rs index e55b173..9d9e619 100644 --- a/src/scheduler/dispatch.rs +++ b/src/scheduler/dispatch.rs @@ -5,9 +5,9 @@ use tokio::sync::Mutex; use tokio_util::sync::CancellationToken; use crate::priority::Priority; -use crate::registry::TaskContext; +use crate::registry::{ChildSpawner, TaskContext}; use crate::store::TaskStore; -use crate::task::TaskRecord; +use crate::task::{ParentResolution, TaskRecord}; use super::progress::ProgressReporter; use super::SchedulerEvent; @@ -34,12 +34,16 @@ pub(crate) struct ActiveTask { #[derive(Clone)] pub(crate) struct ActiveTaskMap { inner: Arc>>, + /// IDs of parent tasks ready for finalization. Populated by + /// `handle_parent_resolution` when all children complete. + pub(crate) pending_finalizers: Arc>>, } impl ActiveTaskMap { pub fn new() -> Self { Self { inner: Arc::new(Mutex::new(HashMap::new())), + pending_finalizers: Arc::new(Mutex::new(Vec::new())), } } @@ -178,15 +182,37 @@ impl ActiveTaskMap { /// /// Inserts the task into the active map, starts a progress listener, /// and spawns the executor. +/// Whether to call `execute` or `finalize` on the executor. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(crate) enum ExecutionPhase { + Execute, + Finalize, +} + +/// Shared scheduler resources passed to each spawned task. 
+pub(crate) struct SpawnContext { + pub store: TaskStore, + pub active: ActiveTaskMap, + pub event_tx: tokio::sync::broadcast::Sender, + pub max_retries: i32, + pub app_state: crate::registry::StateSnapshot, + pub work_notify: Arc, +} + pub(crate) async fn spawn_task( task: TaskRecord, executor: Arc, - store: TaskStore, - active: ActiveTaskMap, - event_tx: tokio::sync::broadcast::Sender, - max_retries: i32, - app_state: crate::registry::StateSnapshot, + ctx: SpawnContext, + phase: ExecutionPhase, ) { + let SpawnContext { + store, + active, + event_tx, + max_retries, + app_state, + work_notify, + } = ctx; let child_token = CancellationToken::new(); // Insert into active map before spawning to avoid races. @@ -203,6 +229,7 @@ pub(crate) async fn spawn_task( .await; // Build execution context. + let child_spawner = ChildSpawner::new(store.clone(), task.id, work_notify.clone()); let ctx = TaskContext { record: task.clone(), token: child_token.clone(), @@ -213,6 +240,7 @@ pub(crate) async fn spawn_task( event_tx.clone(), ), app_state, + child_spawner: Some(child_spawner), }; // Emit dispatched event. @@ -246,26 +274,77 @@ pub(crate) async fn spawn_task( let token_for_spawn = child_token.clone(); tokio::spawn(async move { let task_id = task.id; - let result = executor.execute_erased(&ctx).await; + let result = match phase { + ExecutionPhase::Execute => executor.execute_erased(&ctx).await, + ExecutionPhase::Finalize => executor.finalize_erased(&ctx).await, + }; // Drop the context (and its progress reporter) — executor is done. drop(ctx); match result { Ok(tr) => { + // For the execute phase, check if the task spawned children. + // If so, transition to waiting instead of completing. 
+ if phase == ExecutionPhase::Execute { + match store.active_children_count(task_id).await { + Ok(count) if count > 0 => { + if let Err(e) = store.set_waiting(task_id).await { + tracing::error!(task_id, error = %e, "failed to set task to waiting"); + } + active.remove(task_id).await; + let _ = event_tx.send(SchedulerEvent::Waiting { + task_id, + children_count: count, + }); + // Children may have completed before we set waiting. + // Re-check to avoid a missed finalization. + handle_parent_resolution( + task_id, + &store, + &active, + &event_tx, + max_retries, + &work_notify, + ) + .await; + // Wake the scheduler to dispatch children (or finalizer). + work_notify.notify_one(); + return; + } + Err(e) => { + tracing::error!(task_id, error = %e, "failed to check children count"); + // Fall through to normal completion. + } + _ => { + // No children — complete normally. + } + } + } + if let Err(e) = store.complete(task_id, &tr).await { tracing::error!(task_id, error = %e, "failed to record task completion"); } // Remove from active tracking AFTER the store write completes. - // This keeps the concurrency slot occupied, preventing the - // scheduler from dispatching new tasks that would create - // concurrent SQLite write transactions (which cause SQLITE_BUSY). active.remove(task_id).await; let _ = event_tx.send(SchedulerEvent::Completed { task_id, task_type: task.task_type.clone(), key: task.key.clone(), }); + + // If this was a child task, check if parent is ready. + if let Some(parent_id) = task.parent_id { + handle_parent_resolution( + parent_id, + &store, + &active, + &event_tx, + max_retries, + &work_notify, + ) + .await; + } } Err(te) => { // If cancelled (preempted), the scheduler already paused it. 
@@ -301,10 +380,104 @@ pub(crate) async fn spawn_task( task_id, task_type: task.task_type.clone(), key: task.key.clone(), - error: te.message, + error: te.message.clone(), will_retry, }); + + // If this child failed permanently and parent is fail_fast, + // cancel siblings and fail the parent. + if !will_retry { + if let Some(parent_id) = task.parent_id { + // Check if parent uses fail_fast. + if let Ok(Some(parent)) = store.task_by_id(parent_id).await { + if parent.fail_fast { + // Cancel remaining siblings. + if let Ok(running_ids) = store.cancel_children(parent_id).await { + for rid in &running_ids { + if let Some(at) = active.remove(*rid).await { + at.token.cancel(); + let _ = store.delete(*rid).await; + let _ = event_tx.send(SchedulerEvent::Cancelled { + task_id: *rid, + task_type: at.record.task_type.clone(), + key: at.record.key.clone(), + }); + } + } + } + // Fail the parent. + let msg = format!("child task {task_id} failed: {}", te.message); + if let Err(e) = store.fail(parent_id, &msg, false, 0, 0, 0).await { + tracing::error!( + parent_id, + error = %e, + "failed to record parent failure" + ); + } + let _ = event_tx.send(SchedulerEvent::Failed { + task_id: parent_id, + task_type: parent.task_type.clone(), + key: parent.key.clone(), + error: msg, + will_retry: false, + }); + } else { + // Not fail_fast — check if all children done. + handle_parent_resolution( + parent_id, + &store, + &active, + &event_tx, + max_retries, + &work_notify, + ) + .await; + } + } + } + } } } }); } + +/// Check if a waiting parent is ready for finalization or has failed, +/// and dispatch the finalize phase if ready. +async fn handle_parent_resolution( + parent_id: i64, + store: &TaskStore, + active: &ActiveTaskMap, + event_tx: &tokio::sync::broadcast::Sender, + _max_retries: i32, + work_notify: &Arc, +) { + match store.try_resolve_parent(parent_id).await { + Ok(Some(ParentResolution::ReadyToFinalize)) => { + // Enqueue parent for finalize dispatch. 
+ active.pending_finalizers.lock().await.push(parent_id); + // Wake the scheduler to dispatch the finalize phase. + work_notify.notify_one(); + } + Ok(Some(ParentResolution::Failed(reason))) => { + // All children done but some failed — fail the parent. + if let Ok(Some(parent)) = store.task_by_id(parent_id).await { + if let Err(e) = store.fail(parent_id, &reason, false, 0, 0, 0).await { + tracing::error!(parent_id, error = %e, "failed to record parent failure"); + } + let _ = event_tx.send(SchedulerEvent::Failed { + task_id: parent_id, + task_type: parent.task_type.clone(), + key: parent.key.clone(), + error: reason, + will_retry: false, + }); + } + } + Ok(Some(ParentResolution::StillWaiting)) | Ok(None) => { + // Children still active or parent not found — nothing to do. + } + Err(e) => { + tracing::error!(parent_id, error = %e, "failed to resolve parent"); + } + } +} diff --git a/src/scheduler/mod.rs b/src/scheduler/mod.rs index 84ce287..65e19d1 100644 --- a/src/scheduler/mod.rs +++ b/src/scheduler/mod.rs @@ -37,6 +37,8 @@ pub struct SchedulerSnapshot { pub pending_count: i64, /// Number of tasks paused (preempted). pub paused_count: i64, + /// Number of parent tasks waiting for children to complete. + pub waiting_count: i64, /// Progress estimates for every running task. pub progress: Vec, /// Aggregate backpressure (0.0–1.0). @@ -100,6 +102,9 @@ pub enum SchedulerEvent { /// Optional human-readable message from the executor. message: Option, }, + /// A parent task entered the waiting state after its executor returned + /// and it has active children. + Waiting { task_id: i64, children_count: i64 }, /// The scheduler was globally paused via [`Scheduler::pause_all`]. Paused, /// The scheduler was resumed via [`Scheduler::resume_all`]. @@ -175,7 +180,7 @@ struct SchedulerInner { /// Global pause flag — when `true`, the run loop skips dispatching. paused: AtomicBool, /// Wakes the run loop when new work is submitted or the scheduler is resumed. 
- work_notify: Notify, + work_notify: Arc, } /// IO-aware priority scheduler. @@ -240,7 +245,7 @@ impl Scheduler { sampler_token: CancellationToken::new(), app_state, paused: AtomicBool::new(false), - work_notify: Notify::new(), + work_notify: Arc::new(Notify::new()), }), } } @@ -405,6 +410,20 @@ impl Scheduler { /// it is deleted from the store. Returns `true` if the task was found /// and cancelled. pub async fn cancel(&self, task_id: i64) -> Result { + // Cancel children first (cascade). + let running_child_ids = self.inner.store.cancel_children(task_id).await?; + for child_id in &running_child_ids { + if let Some(at) = self.inner.active.remove(*child_id).await { + at.token.cancel(); + let _ = self.inner.store.delete(*child_id).await; + let _ = self.inner.event_tx.send(SchedulerEvent::Cancelled { + task_id: *child_id, + task_type: at.record.task_type.clone(), + key: at.record.key.clone(), + }); + } + } + // Check if it's an active (running) task first. if let Some(at) = self.inner.active.remove(task_id).await { at.token.cancel(); @@ -417,7 +436,7 @@ impl Scheduler { return Ok(true); } - // Not active — try to delete from the queue (pending/paused). + // Not active — try to delete from the queue (pending/paused/waiting). let deleted = self.inner.store.delete(task_id).await?; Ok(deleted) } @@ -486,11 +505,68 @@ impl Scheduler { dispatch::spawn_task( task, executor, - self.inner.store.clone(), - self.inner.active.clone(), - self.inner.event_tx.clone(), - self.inner.max_retries, - self.inner.app_state.snapshot().await, + dispatch::SpawnContext { + store: self.inner.store.clone(), + active: self.inner.active.clone(), + event_tx: self.inner.event_tx.clone(), + max_retries: self.inner.max_retries, + app_state: self.inner.app_state.snapshot().await, + work_notify: Arc::clone(&self.inner.work_notify), + }, + dispatch::ExecutionPhase::Execute, + ) + .await; + + Ok(true) + } + + /// Try to dispatch a parent task for its finalize phase. 
+ /// + /// Returns `true` if a finalizer was dispatched. + async fn try_dispatch_finalizer(&self) -> Result { + // Pop the next pending finalizer. + let parent_id = { + let mut finalizers = self.inner.active.pending_finalizers.lock().await; + if finalizers.is_empty() { + return Ok(false); + } + finalizers.remove(0) + }; + + // Transition the parent from waiting to running for finalize. + self.inner.store.set_running_for_finalize(parent_id).await?; + + // Fetch the parent record (now running). + let Some(task) = self.inner.store.task_by_id(parent_id).await? else { + return Ok(false); + }; + + // Look up executor. + let Some(executor) = self.inner.registry.get(&task.task_type) else { + tracing::error!( + task_type = task.task_type, + "no executor registered for finalize — failing parent" + ); + self.inner + .store + .fail(parent_id, "no executor for finalize", false, 0, 0, 0) + .await?; + return Ok(true); + }; + let executor = Arc::clone(executor); + + dispatch::spawn_task( + task, + executor, + dispatch::SpawnContext { + store: self.inner.store.clone(), + active: self.inner.active.clone(), + event_tx: self.inner.event_tx.clone(), + max_retries: self.inner.max_retries, + app_state: self.inner.app_state.snapshot().await, + work_notify: Arc::clone(&self.inner.work_notify), + }, + dispatch::ExecutionPhase::Finalize, ) .await; @@ -529,7 +605,7 @@ impl Scheduler { } } - /// Resume paused tasks and dispatch pending work. + /// Resume paused tasks, dispatch finalizers, and dispatch pending work. async fn poll_and_dispatch(&self) { if self.is_paused() { return; @@ -549,6 +625,18 @@ impl Scheduler { } } + // Dispatch any pending finalizers (parent tasks ready for finalize phase). + loop { + match self.try_dispatch_finalizer().await { + Ok(true) => continue, + Ok(false) => break, + Err(e) => { + tracing::error!(error = %e, "scheduler finalizer dispatch error"); + break; + } + } + } + // Try to dispatch tasks until we can't. 
loop { match self.try_dispatch().await { @@ -630,6 +718,7 @@ impl Scheduler { let running = self.inner.active.records().await; let pending_count = self.inner.store.pending_count().await?; let paused_count = self.inner.store.paused_count().await?; + let waiting_count = self.inner.store.waiting_count().await?; let progress = self.estimated_progress().await; let pressure = self.inner.gate.pressure().await; let pressure_breakdown = self.inner.gate.pressure_breakdown().await; @@ -639,6 +728,7 @@ impl Scheduler { running, pending_count, paused_count, + waiting_count, progress, pressure, pressure_breakdown, @@ -1063,6 +1153,8 @@ mod tests { payload: None, expected_read_bytes: 0, expected_write_bytes: 0, + parent_id: None, + fail_fast: true, }) .await .unwrap(); @@ -1108,6 +1200,8 @@ mod tests { payload: None, expected_read_bytes: 0, expected_write_bytes: 0, + parent_id: None, + fail_fast: true, }) .await .unwrap(); @@ -1130,6 +1224,8 @@ mod tests { payload: None, expected_read_bytes: 0, expected_write_bytes: 0, + parent_id: None, + fail_fast: true, }; let first = sched.submit(&sub).await.unwrap(); @@ -1158,6 +1254,8 @@ mod tests { payload: None, expected_read_bytes: 0, expected_write_bytes: 0, + parent_id: None, + fail_fast: true, }) .await .unwrap() @@ -1189,6 +1287,8 @@ mod tests { payload: None, expected_read_bytes: 0, expected_write_bytes: 0, + parent_id: None, + fail_fast: true, }) .await .unwrap() @@ -1216,6 +1316,8 @@ mod tests { payload: None, expected_read_bytes: 0, expected_write_bytes: 0, + parent_id: None, + fail_fast: true, }) .await .unwrap(); @@ -1247,6 +1349,8 @@ mod tests { payload: None, expected_read_bytes: 0, expected_write_bytes: 0, + parent_id: None, + fail_fast: true, }) .await .unwrap(); @@ -1316,6 +1420,8 @@ mod tests { payload: None, expected_read_bytes: 0, expected_write_bytes: 0, + parent_id: None, + fail_fast: true, }) .await .unwrap(); @@ -1350,6 +1456,8 @@ mod tests { payload: None, expected_read_bytes: 0, expected_write_bytes: 0, + 
parent_id: None, + fail_fast: true, }) .await .unwrap(); @@ -1431,6 +1539,8 @@ mod tests { payload: None, expected_read_bytes: 0, expected_write_bytes: 0, + parent_id: None, + fail_fast: true, }) .await .unwrap(); @@ -1453,6 +1563,8 @@ mod tests { payload: None, expected_read_bytes: 0, expected_write_bytes: 0, + parent_id: None, + fail_fast: true, }) .await .unwrap(); @@ -1476,6 +1588,8 @@ mod tests { payload: None, expected_read_bytes: 0, expected_write_bytes: 0, + parent_id: None, + fail_fast: true, }) .await .unwrap(); @@ -1523,4 +1637,333 @@ mod tests { let result = sched.lookup_typed(&task).await.unwrap(); assert!(matches!(result, crate::task::TaskLookup::Active(_))); } + + // ── Hierarchy tests ───────────────────────────────────────────── + + /// An executor that spawns N child tasks during execution. + struct SpawningExecutor { + num_children: usize, + } + + impl TaskExecutor for SpawningExecutor { + async fn execute<'a>(&'a self, ctx: &'a TaskContext) -> Result { + for i in 0..self.num_children { + let sub = TaskSubmission { + task_type: "child".into(), + key: Some(format!("child-{i}")), + priority: ctx.record.priority, + payload: None, + expected_read_bytes: 0, + expected_write_bytes: 0, + parent_id: None, // spawn_child sets this + fail_fast: true, + }; + ctx.spawn_child(sub).await.map_err(|e| TaskError { + message: e.to_string(), + retryable: false, + actual_read_bytes: 0, + actual_write_bytes: 0, + })?; + } + Ok(TaskResult::zero()) + } + } + + /// An executor that records whether finalize was called. 
+ struct FinalizeTrackingExecutor { + children: usize, + finalized: Arc, + } + + impl TaskExecutor for FinalizeTrackingExecutor { + async fn execute<'a>(&'a self, ctx: &'a TaskContext) -> Result { + for i in 0..self.children { + let sub = TaskSubmission { + task_type: "child".into(), + key: Some(format!("ft-child-{i}")), + priority: ctx.record.priority, + payload: None, + expected_read_bytes: 0, + expected_write_bytes: 0, + parent_id: None, + fail_fast: true, + }; + ctx.spawn_child(sub).await.map_err(|e| TaskError { + message: e.to_string(), + retryable: false, + actual_read_bytes: 0, + actual_write_bytes: 0, + })?; + } + Ok(TaskResult::zero()) + } + + async fn finalize<'a>(&'a self, _ctx: &'a TaskContext) -> Result { + self.finalized + .store(true, std::sync::atomic::Ordering::SeqCst); + Ok(TaskResult::zero()) + } + } + + #[tokio::test] + async fn parent_enters_waiting_when_children_spawned() { + let store = TaskStore::open_memory().await.unwrap(); + let mut registry = TaskTypeRegistry::new(); + registry.register_erased("parent", arc_erased(SpawningExecutor { num_children: 2 })); + registry.register_erased("child", arc_erased(InstantExecutor)); + + let sched = Scheduler::new( + store, + SchedulerConfig::default(), + Arc::new(registry), + CompositePressure::new(), + ThrottlePolicy::default_three_tier(), + ); + let mut rx = sched.subscribe(); + + // Submit parent task. + sched + .submit(&TaskSubmission { + task_type: "parent".into(), + key: Some("p1".into()), + priority: Priority::NORMAL, + payload: None, + expected_read_bytes: 0, + expected_write_bytes: 0, + parent_id: None, + fail_fast: true, + }) + .await + .unwrap(); + + // Dispatch parent. + sched.try_dispatch().await.unwrap(); + tokio::time::sleep(Duration::from_millis(100)).await; + + // Should get Dispatched, then Waiting events for the parent. + let mut saw_waiting = false; + for _ in 0..10 { + if let Ok(evt) = rx.try_recv() { + if matches!(evt, SchedulerEvent::Waiting { .. 
}) { + saw_waiting = true; + break; + } + } + } + assert!(saw_waiting, "expected Waiting event for parent"); + + // Parent should be in waiting status in the store. + let parent_key = crate::task::generate_dedup_key("parent", Some(b"p1")); + let parent = sched + .store() + .task_by_key(&parent_key) + .await + .unwrap() + .unwrap(); + assert_eq!(parent.status, crate::task::TaskStatus::Waiting); + + // Two children should be pending. + assert_eq!(sched.store().pending_count().await.unwrap(), 2); + } + + #[tokio::test] + async fn parent_auto_completes_after_children_finish() { + let store = TaskStore::open_memory().await.unwrap(); + let mut registry = TaskTypeRegistry::new(); + registry.register_erased("parent", arc_erased(SpawningExecutor { num_children: 2 })); + registry.register_erased("child", arc_erased(InstantExecutor)); + + let sched = Scheduler::new( + store, + SchedulerConfig::default(), + Arc::new(registry), + CompositePressure::new(), + ThrottlePolicy::default_three_tier(), + ); + let mut rx = sched.subscribe(); + + sched + .submit(&TaskSubmission { + task_type: "parent".into(), + key: Some("p-complete".into()), + priority: Priority::NORMAL, + payload: None, + expected_read_bytes: 0, + expected_write_bytes: 0, + parent_id: None, + fail_fast: true, + }) + .await + .unwrap(); + + // Run scheduler loop. + let token = CancellationToken::new(); + let sched_clone = sched.clone(); + let token_clone = token.clone(); + let handle = tokio::spawn(async move { + sched_clone.run(token_clone).await; + }); + + // Wait for parent Completed event. + let parent_key = crate::task::generate_dedup_key("parent", Some(b"p-complete")); + let deadline = tokio::time::Instant::now() + Duration::from_secs(5); + let mut parent_completed = false; + while tokio::time::Instant::now() < deadline { + match tokio::time::timeout(Duration::from_millis(200), rx.recv()).await { + Ok(Ok(SchedulerEvent::Completed { task_type, .. 
})) if task_type == "parent" => { + parent_completed = true; + break; + } + _ => {} + } + } + + // Check before shutdown closes the pool. + let lookup = sched.store().task_lookup(&parent_key).await.unwrap(); + + token.cancel(); + let _ = handle.await; + + assert!(parent_completed, "expected parent Completed event"); + assert!( + matches!(lookup, crate::task::TaskLookup::History(ref h) if h.status == crate::task::HistoryStatus::Completed), + "expected parent in history as completed, got: {lookup:?}" + ); + } + + #[tokio::test] + async fn finalize_called_after_children_complete() { + let finalized = Arc::new(std::sync::atomic::AtomicBool::new(false)); + + let store = TaskStore::open_memory().await.unwrap(); + let mut registry = TaskTypeRegistry::new(); + registry.register_erased( + "parent", + arc_erased(FinalizeTrackingExecutor { + children: 1, + finalized: finalized.clone(), + }), + ); + registry.register_erased("child", arc_erased(InstantExecutor)); + + let sched = Scheduler::new( + store, + SchedulerConfig::default(), + Arc::new(registry), + CompositePressure::new(), + ThrottlePolicy::default_three_tier(), + ); + let mut rx = sched.subscribe(); + + sched + .submit(&TaskSubmission { + task_type: "parent".into(), + key: Some("p-finalize".into()), + priority: Priority::NORMAL, + payload: None, + expected_read_bytes: 0, + expected_write_bytes: 0, + parent_id: None, + fail_fast: true, + }) + .await + .unwrap(); + + let token = CancellationToken::new(); + let sched_clone = sched.clone(); + let token_clone = token.clone(); + let handle = tokio::spawn(async move { + sched_clone.run(token_clone).await; + }); + + // Wait for parent Completed event rather than a fixed sleep. + let deadline = tokio::time::Instant::now() + Duration::from_secs(5); + while tokio::time::Instant::now() < deadline { + match tokio::time::timeout(Duration::from_millis(100), rx.recv()).await { + Ok(Ok(SchedulerEvent::Completed { task_type, .. 
})) if task_type == "parent" => { + break; + } + _ => {} + } + } + + token.cancel(); + let _ = handle.await; + + assert!( + finalized.load(std::sync::atomic::Ordering::SeqCst), + "finalize() should have been called" + ); + } + + #[tokio::test] + async fn cancel_parent_cascades_to_children() { + let store = TaskStore::open_memory().await.unwrap(); + let mut registry = TaskTypeRegistry::new(); + registry.register_erased("parent", arc_erased(SpawningExecutor { num_children: 3 })); + registry.register_erased("child", arc_erased(SlowExecutor)); + + let sched = Scheduler::new( + store, + SchedulerConfig::default(), + Arc::new(registry), + CompositePressure::new(), + ThrottlePolicy::default_three_tier(), + ); + + let parent_id = sched + .submit(&TaskSubmission { + task_type: "parent".into(), + key: Some("p-cancel".into()), + priority: Priority::NORMAL, + payload: None, + expected_read_bytes: 0, + expected_write_bytes: 0, + parent_id: None, + fail_fast: true, + }) + .await + .unwrap() + .id() + .unwrap(); + + // Dispatch parent (which spawns children). + sched.try_dispatch().await.unwrap(); + tokio::time::sleep(Duration::from_millis(100)).await; + + // Cancel parent — should cascade to children. + let cancelled = sched.cancel(parent_id).await.unwrap(); + assert!(cancelled); + + // All children should be gone. + assert_eq!(sched.store().pending_count().await.unwrap(), 0); + assert_eq!(sched.store().running_count().await.unwrap(), 0); + } + + #[tokio::test] + async fn no_children_completes_normally() { + // Task without children should complete as before (backward compat). 
+ let sched = setup(arc_erased(InstantExecutor)).await; + + sched + .submit(&TaskSubmission { + task_type: "test".into(), + key: Some("no-kids".into()), + priority: Priority::NORMAL, + payload: None, + expected_read_bytes: 0, + expected_write_bytes: 0, + parent_id: None, + fail_fast: true, + }) + .await + .unwrap(); + + sched.try_dispatch().await.unwrap(); + tokio::time::sleep(Duration::from_millis(50)).await; + + let key = crate::task::generate_dedup_key("test", Some(b"no-kids")); + let lookup = sched.store().task_lookup(&key).await.unwrap(); + assert!(matches!(lookup, crate::task::TaskLookup::History(_))); + } } diff --git a/src/store.rs b/src/store.rs index 12dd188..89656d8 100644 --- a/src/store.rs +++ b/src/store.rs @@ -7,8 +7,8 @@ use sqlx::{Row, SqlitePool}; use crate::priority::Priority; use crate::task::{ - HistoryStatus, SubmitOutcome, TaskHistoryRecord, TaskLookup, TaskRecord, TaskResult, - TaskStatus, TaskSubmission, TypeStats, MAX_PAYLOAD_BYTES, + HistoryStatus, ParentResolution, SubmitOutcome, TaskHistoryRecord, TaskLookup, TaskRecord, + TaskResult, TaskStatus, TaskSubmission, TypeStats, MAX_PAYLOAD_BYTES, }; /// Serde-friendly error type for Tauri IPC and API boundaries. @@ -79,7 +79,7 @@ impl Default for StoreConfig { fn default() -> Self { Self { max_connections: 16, - retention_policy: None, + retention_policy: Some(RetentionPolicy::MaxCount(10_000)), prune_interval: 100, } } @@ -140,7 +140,7 @@ impl TaskStore { let store = Self { pool, - retention_policy: None, + retention_policy: Some(RetentionPolicy::MaxCount(10_000)), prune_interval: 100, completion_count: std::sync::Arc::new(AtomicU64::new(0)), }; @@ -157,6 +157,8 @@ impl TaskStore { } /// Restart recovery: reset any `running` tasks back to `pending`. + /// `waiting` parents are left as-is — their children will be reset to + /// pending and will eventually re-trigger parent finalization. 
async fn recover_running(&self) -> Result<(), StoreError> { let result = sqlx::query( "UPDATE tasks SET status = 'pending', started_at = NULL WHERE status = 'running'", @@ -210,10 +212,11 @@ impl TaskStore { let key = sub.effective_key(); let priority = sub.priority.value() as i32; + let fail_fast_val: i32 = if sub.fail_fast { 1 } else { 0 }; tracing::debug!(task_type = %sub.task_type, "store.submit: INSERT start"); let result = sqlx::query( - "INSERT OR IGNORE INTO tasks (task_type, key, priority, payload, expected_read_bytes, expected_write_bytes) - VALUES (?, ?, ?, ?, ?, ?)", + "INSERT OR IGNORE INTO tasks (task_type, key, priority, payload, expected_read_bytes, expected_write_bytes, parent_id, fail_fast) + VALUES (?, ?, ?, ?, ?, ?, ?, ?)", ) .bind(&sub.task_type) .bind(&key) @@ -221,6 +224,8 @@ impl TaskStore { .bind(&sub.payload) .bind(sub.expected_read_bytes) .bind(sub.expected_write_bytes) + .bind(sub.parent_id) + .bind(fail_fast_val) .execute(&self.pool) .await?; tracing::debug!(task_type = %sub.task_type, "store.submit: INSERT end"); @@ -294,9 +299,10 @@ impl TaskStore { for sub in submissions { let key = sub.effective_key(); let priority = sub.priority.value() as i32; + let fail_fast_val: i32 = if sub.fail_fast { 1 } else { 0 }; let result = sqlx::query( - "INSERT OR IGNORE INTO tasks (task_type, key, priority, payload, expected_read_bytes, expected_write_bytes) - VALUES (?, ?, ?, ?, ?, ?)", + "INSERT OR IGNORE INTO tasks (task_type, key, priority, payload, expected_read_bytes, expected_write_bytes, parent_id, fail_fast) + VALUES (?, ?, ?, ?, ?, ?, ?, ?)", ) .bind(&sub.task_type) .bind(&key) @@ -304,6 +310,8 @@ impl TaskStore { .bind(&sub.payload) .bind(sub.expected_read_bytes) .bind(sub.expected_write_bytes) + .bind(sub.parent_id) + .bind(fail_fast_val) .execute(&mut *conn) .await?; @@ -450,11 +458,12 @@ impl TaskStore { }; // Insert into history. 
+ let fail_fast_val: i32 = if task.fail_fast { 1 } else { 0 }; sqlx::query( "INSERT INTO task_history (task_type, key, priority, status, payload, expected_read_bytes, expected_write_bytes, actual_read_bytes, actual_write_bytes, - retry_count, last_error, created_at, started_at, duration_ms) - VALUES (?, ?, ?, 'completed', ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + retry_count, last_error, created_at, started_at, duration_ms, parent_id, fail_fast) + VALUES (?, ?, ?, 'completed', ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", ) .bind(&task.task_type) .bind(&task.key) @@ -472,6 +481,8 @@ impl TaskStore { .map(|dt| dt.format("%Y-%m-%d %H:%M:%S").to_string()), ) .bind(duration_ms) + .bind(task.parent_id) + .bind(fail_fast_val) .execute(&mut *conn) .await?; @@ -501,6 +512,7 @@ impl TaskStore { } sqlx::query("COMMIT").execute(&mut *conn).await?; + drop(conn); // Release the pool connection before pruning. tracing::debug!(task_id = id, "store.complete: COMMIT ok"); self.maybe_prune().await; @@ -558,11 +570,12 @@ impl TaskStore { None }; + let fail_fast_val: i32 = if task.fail_fast { 1 } else { 0 }; sqlx::query( "INSERT INTO task_history (task_type, key, priority, status, payload, expected_read_bytes, expected_write_bytes, actual_read_bytes, actual_write_bytes, - retry_count, last_error, created_at, started_at, duration_ms) - VALUES (?, ?, ?, 'failed', ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + retry_count, last_error, created_at, started_at, duration_ms, parent_id, fail_fast) + VALUES (?, ?, ?, 'failed', ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", ) .bind(&task.task_type) .bind(&task.key) @@ -577,6 +590,8 @@ impl TaskStore { .bind(task.created_at.format("%Y-%m-%d %H:%M:%S").to_string()) .bind(task.started_at.map(|dt| dt.format("%Y-%m-%d %H:%M:%S").to_string())) .bind(duration_ms) + .bind(task.parent_id) + .bind(fail_fast_val) .execute(&mut *conn) .await?; @@ -587,6 +602,7 @@ impl TaskStore { } sqlx::query("COMMIT").execute(&mut *conn).await?; + drop(conn); // Release the pool connection before pruning. 
tracing::debug!(task_id = id, "store.fail: COMMIT ok"); self.maybe_prune().await; @@ -853,6 +869,146 @@ impl TaskStore { } } + // ── Hierarchy ─────────────────────────────────────────────────── + + /// Transition a running parent task to `waiting` status. + /// + /// Called after the parent's executor returns when it has spawned children. + pub async fn set_waiting(&self, id: i64) -> Result<(), StoreError> { + sqlx::query( + "UPDATE tasks SET status = 'waiting', started_at = NULL WHERE id = ? AND status = 'running'", + ) + .bind(id) + .execute(&self.pool) + .await?; + Ok(()) + } + + /// Transition a waiting parent task back to `running` for finalization. + pub async fn set_running_for_finalize(&self, id: i64) -> Result<(), StoreError> { + sqlx::query( + "UPDATE tasks SET status = 'running', started_at = datetime('now') WHERE id = ? AND status = 'waiting'", + ) + .bind(id) + .execute(&self.pool) + .await?; + Ok(()) + } + + /// Count of active (non-terminal) children for a parent task. + pub async fn active_children_count(&self, parent_id: i64) -> Result { + let count: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM tasks WHERE parent_id = ?") + .bind(parent_id) + .fetch_one(&self.pool) + .await?; + Ok(count.0) + } + + /// List active children of a parent task. + pub async fn children(&self, parent_id: i64) -> Result, StoreError> { + let rows = sqlx::query("SELECT * FROM tasks WHERE parent_id = ? ORDER BY id ASC") + .bind(parent_id) + .fetch_all(&self.pool) + .await?; + Ok(rows.iter().map(row_to_task_record).collect()) + } + + /// Count of children that completed successfully in history. + pub async fn completed_children_count(&self, parent_id: i64) -> Result { + let count: (i64,) = sqlx::query_as( + "SELECT COUNT(*) FROM task_history WHERE parent_id = ? AND status = 'completed'", + ) + .bind(parent_id) + .fetch_one(&self.pool) + .await?; + Ok(count.0) + } + + /// Count of children that failed permanently in history. 
+ pub async fn failed_children_count(&self, parent_id: i64) -> Result { + let count: (i64,) = sqlx::query_as( + "SELECT COUNT(*) FROM task_history WHERE parent_id = ? AND status = 'failed'", + ) + .bind(parent_id) + .fetch_one(&self.pool) + .await?; + Ok(count.0) + } + + /// Cancel all active children of a parent task. + /// + /// Deletes pending/paused children from the active queue and returns the + /// IDs of running children (whose cancellation tokens the scheduler must + /// cancel separately). + pub async fn cancel_children(&self, parent_id: i64) -> Result, StoreError> { + // Collect IDs of running children (scheduler needs to cancel their tokens). + let running_rows = + sqlx::query("SELECT id FROM tasks WHERE parent_id = ? AND status = 'running'") + .bind(parent_id) + .fetch_all(&self.pool) + .await?; + let running_ids: Vec = running_rows.iter().map(|r| r.get("id")).collect(); + + // Delete only pending and paused children from the active queue. + sqlx::query("DELETE FROM tasks WHERE parent_id = ? AND status IN ('pending', 'paused')") + .bind(parent_id) + .execute(&self.pool) + .await?; + + Ok(running_ids) + } + + /// Check, via separate sequential queries, whether a waiting parent is ready to finalize or has failed. + /// + /// Called after a child completes or fails. The parent must be in `waiting` + /// status for resolution to proceed. + pub async fn try_resolve_parent( + &self, + parent_id: i64, + ) -> Result, StoreError> { + // Check the parent exists and is waiting. + let parent = self.task_by_id(parent_id).await?; + let Some(parent) = parent else { + return Ok(None); + }; + if parent.status != TaskStatus::Waiting { + return Ok(None); + } + + let active_count = self.active_children_count(parent_id).await?; + if active_count > 0 { + return Ok(Some(ParentResolution::StillWaiting)); + } + + // All children are terminal — check for failures.
+ let failed_count = self.failed_children_count(parent_id).await?; + if failed_count > 0 { + return Ok(Some(ParentResolution::Failed(format!( + "{failed_count} child task(s) failed" + )))); + } + + Ok(Some(ParentResolution::ReadyToFinalize)) + } + + /// Count of waiting tasks (parents waiting for children). + pub async fn waiting_count(&self) -> Result { + let count: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM tasks WHERE status = 'waiting'") + .fetch_one(&self.pool) + .await?; + Ok(count.0) + } + + /// Waiting tasks (parents waiting for children). + pub async fn waiting_tasks(&self) -> Result, StoreError> { + let rows = sqlx::query( + "SELECT * FROM tasks WHERE status = 'waiting' ORDER BY priority ASC, id ASC", + ) + .fetch_all(&self.pool) + .await?; + Ok(rows.iter().map(row_to_task_record).collect()) + } + // ── Pruning ───────────────────────────────────────────────────── /// Prune history records older than `max_age_days` days. @@ -948,6 +1104,8 @@ fn row_to_task_record(row: &sqlx::sqlite::SqliteRow) -> TaskRecord { let requeue_val: i32 = row.get("requeue"); let requeue_priority_val: Option = row.get("requeue_priority"); + let parent_id: Option = row.get("parent_id"); + let fail_fast_val: i32 = row.get("fail_fast"); TaskRecord { id: row.get("id"), @@ -964,6 +1122,8 @@ fn row_to_task_record(row: &sqlx::sqlite::SqliteRow) -> TaskRecord { started_at: started_at_str.map(|s| parse_datetime(&s)), requeue: requeue_val != 0, requeue_priority: requeue_priority_val.map(|p| Priority::new(p as u8)), + parent_id, + fail_fast: fail_fast_val != 0, } } @@ -973,6 +1133,8 @@ fn row_to_history_record(row: &sqlx::sqlite::SqliteRow) -> TaskHistoryRecord { let created_at_str: String = row.get("created_at"); let started_at_str: Option = row.get("started_at"); let completed_at_str: String = row.get("completed_at"); + let parent_id: Option = row.get("parent_id"); + let fail_fast_val: i32 = row.get("fail_fast"); TaskHistoryRecord { id: row.get("id"), @@ -991,6 +1153,8 @@ fn 
row_to_history_record(row: &sqlx::sqlite::SqliteRow) -> TaskHistoryRecord { started_at: started_at_str.map(|s| parse_datetime(&s)), completed_at: parse_datetime(&completed_at_str), duration_ms: row.get("duration_ms"), + parent_id, + fail_fast: fail_fast_val != 0, } } @@ -1010,6 +1174,8 @@ mod tests { payload: Some(b"hello".to_vec()), expected_read_bytes: 1000, expected_write_bytes: 500, + parent_id: None, + fail_fast: true, } } @@ -1185,6 +1351,8 @@ mod tests { payload: None, expected_read_bytes: 0, expected_write_bytes: 0, + parent_id: None, + fail_fast: true, }; let sub_b = TaskSubmission { task_type: "type_b".into(), @@ -1193,6 +1361,8 @@ mod tests { payload: None, expected_read_bytes: 0, expected_write_bytes: 0, + parent_id: None, + fail_fast: true, }; let first = store.submit(&sub_a).await.unwrap(); @@ -1214,6 +1384,8 @@ mod tests { payload: Some(b"same-data".to_vec()), expected_read_bytes: 0, expected_write_bytes: 0, + parent_id: None, + fail_fast: true, }; let first = store.submit(&sub).await.unwrap(); @@ -1719,6 +1891,8 @@ mod tests { payload: Some(vec![0u8; MAX_PAYLOAD_BYTES + 1]), expected_read_bytes: 0, expected_write_bytes: 0, + parent_id: None, + fail_fast: true, }; // The oversized payload should fail the entire batch — no partial inserts. @@ -1729,4 +1903,245 @@ mod tests { let count = store.pending_count().await.unwrap(); assert_eq!(count, 0); } + + // ── Hierarchy tests ───────────────────────────────────────────── + + #[tokio::test] + async fn parent_child_relationship_persisted() { + let store = test_store().await; + + // Submit parent. + let parent_sub = make_submission("parent", Priority::NORMAL); + let parent_id = store.submit(&parent_sub).await.unwrap().id().unwrap(); + + // Submit child with parent_id. + let mut child_sub = make_submission("child-1", Priority::NORMAL); + child_sub.parent_id = Some(parent_id); + let child_id = store.submit(&child_sub).await.unwrap().id().unwrap(); + + // Verify parent_id is persisted. 
+ let child = store.task_by_id(child_id).await.unwrap().unwrap(); + assert_eq!(child.parent_id, Some(parent_id)); + + // Parent has no parent_id. + let parent = store.task_by_id(parent_id).await.unwrap().unwrap(); + assert_eq!(parent.parent_id, None); + } + + #[tokio::test] + async fn active_children_count_tracks_children() { + let store = test_store().await; + + let parent_sub = make_submission("parent", Priority::NORMAL); + let parent_id = store.submit(&parent_sub).await.unwrap().id().unwrap(); + + // No children yet. + assert_eq!(store.active_children_count(parent_id).await.unwrap(), 0); + + // Add two children. + for i in 0..2 { + let mut sub = make_submission(&format!("child-{i}"), Priority::NORMAL); + sub.parent_id = Some(parent_id); + store.submit(&sub).await.unwrap(); + } + assert_eq!(store.active_children_count(parent_id).await.unwrap(), 2); + } + + #[tokio::test] + async fn set_waiting_and_waiting_tasks() { + let store = test_store().await; + + let sub = make_submission("waiter", Priority::NORMAL); + store.submit(&sub).await.unwrap(); + let task = store.pop_next().await.unwrap().unwrap(); + + store.set_waiting(task.id).await.unwrap(); + + let t = store.task_by_id(task.id).await.unwrap().unwrap(); + assert_eq!(t.status, TaskStatus::Waiting); + + let waiting = store.waiting_tasks().await.unwrap(); + assert_eq!(waiting.len(), 1); + assert_eq!(store.waiting_count().await.unwrap(), 1); + } + + #[tokio::test] + async fn try_resolve_parent_ready_to_finalize() { + let store = test_store().await; + + // Create parent, pop it, set to waiting. + let parent_sub = make_submission("parent", Priority::NORMAL); + let parent_id = store.submit(&parent_sub).await.unwrap().id().unwrap(); + store.pop_next().await.unwrap(); + store.set_waiting(parent_id).await.unwrap(); + + // Create and complete a child. 
+ let mut child_sub = make_submission("child", Priority::NORMAL); + child_sub.parent_id = Some(parent_id); + store.submit(&child_sub).await.unwrap(); + let child = store.pop_next().await.unwrap().unwrap(); + store.complete(child.id, &TaskResult::zero()).await.unwrap(); + + // Parent should be ready to finalize. + let resolution = store.try_resolve_parent(parent_id).await.unwrap(); + assert_eq!(resolution, Some(ParentResolution::ReadyToFinalize)); + } + + #[tokio::test] + async fn try_resolve_parent_still_waiting() { + let store = test_store().await; + + let parent_sub = make_submission("parent", Priority::NORMAL); + let parent_id = store.submit(&parent_sub).await.unwrap().id().unwrap(); + store.pop_next().await.unwrap(); + store.set_waiting(parent_id).await.unwrap(); + + // Create two children, complete only one. + for i in 0..2 { + let mut sub = make_submission(&format!("child-{i}"), Priority::NORMAL); + sub.parent_id = Some(parent_id); + store.submit(&sub).await.unwrap(); + } + let child = store.pop_next().await.unwrap().unwrap(); + store.complete(child.id, &TaskResult::zero()).await.unwrap(); + + let resolution = store.try_resolve_parent(parent_id).await.unwrap(); + assert_eq!(resolution, Some(ParentResolution::StillWaiting)); + } + + #[tokio::test] + async fn try_resolve_parent_failed() { + let store = test_store().await; + + let parent_sub = make_submission("parent", Priority::NORMAL); + let parent_id = store.submit(&parent_sub).await.unwrap().id().unwrap(); + store.pop_next().await.unwrap(); + store.set_waiting(parent_id).await.unwrap(); + + // Create child, fail it permanently. 
+ let mut child_sub = make_submission("child", Priority::NORMAL); + child_sub.parent_id = Some(parent_id); + store.submit(&child_sub).await.unwrap(); + let child = store.pop_next().await.unwrap().unwrap(); + store.fail(child.id, "boom", false, 0, 0, 0).await.unwrap(); + + let resolution = store.try_resolve_parent(parent_id).await.unwrap(); + assert_eq!( + resolution, + Some(ParentResolution::Failed("1 child task(s) failed".into())) + ); + } + + #[tokio::test] + async fn cancel_children_removes_pending_returns_running() { + let store = test_store().await; + + let parent_sub = make_submission("parent", Priority::NORMAL); + let parent_id = store.submit(&parent_sub).await.unwrap().id().unwrap(); + + // Pop the parent first so it's running (it was submitted first). + let _parent = store.pop_next().await.unwrap().unwrap(); + + // Create 3 children. + for i in 0..3 { + let mut sub = make_submission(&format!("child-{i}"), Priority::NORMAL); + sub.parent_id = Some(parent_id); + store.submit(&sub).await.unwrap(); + } + + // Pop one child so it's running. + let running_child = store.pop_next().await.unwrap().unwrap(); + + // Cancel children. + let running_ids = store.cancel_children(parent_id).await.unwrap(); + assert_eq!(running_ids.len(), 1); + assert_eq!(running_ids[0], running_child.id); + + // Pending children should be deleted. + assert_eq!(store.active_children_count(parent_id).await.unwrap(), 1); + // Only the running one remains. + } + + #[tokio::test] + async fn parent_id_persisted_in_history() { + let store = test_store().await; + + let parent_sub = make_submission("parent", Priority::NORMAL); + let parent_id = store.submit(&parent_sub).await.unwrap().id().unwrap(); + + // Pop the parent first (it was submitted first, same priority). 
+ let _parent = store.pop_next().await.unwrap().unwrap(); + + let mut child_sub = make_submission("child", Priority::NORMAL); + child_sub.parent_id = Some(parent_id); + store.submit(&child_sub).await.unwrap(); + let child = store.pop_next().await.unwrap().unwrap(); + + store.complete(child.id, &TaskResult::zero()).await.unwrap(); + + // Check history record has parent_id. + let hist = store.history(10, 0).await.unwrap(); + assert_eq!(hist.len(), 1); + assert_eq!(hist[0].parent_id, Some(parent_id)); + } + + #[tokio::test] + async fn fail_fast_field_persisted() { + let store = test_store().await; + + let mut sub = make_submission("ff", Priority::NORMAL); + sub.fail_fast = false; + store.submit(&sub).await.unwrap(); + + let task = store.pop_next().await.unwrap().unwrap(); + assert!(!task.fail_fast); + + store.complete(task.id, &TaskResult::zero()).await.unwrap(); + + let hist = store.history(10, 0).await.unwrap(); + assert!(!hist[0].fail_fast); + } + + #[tokio::test] + async fn set_running_for_finalize() { + let store = test_store().await; + + let sub = make_submission("fin", Priority::NORMAL); + store.submit(&sub).await.unwrap(); + let task = store.pop_next().await.unwrap().unwrap(); + store.set_waiting(task.id).await.unwrap(); + + store.set_running_for_finalize(task.id).await.unwrap(); + let t = store.task_by_id(task.id).await.unwrap().unwrap(); + assert_eq!(t.status, TaskStatus::Running); + assert!(t.started_at.is_some()); + } + + #[tokio::test] + async fn recover_preserves_waiting_parents() { + let store = test_store().await; + + // Create a parent and a child. 
+ let parent_sub = make_submission("parent", Priority::NORMAL); + let parent_id = store.submit(&parent_sub).await.unwrap().id().unwrap(); + store.pop_next().await.unwrap(); + store.set_waiting(parent_id).await.unwrap(); + + let mut child_sub = make_submission("child", Priority::NORMAL); + child_sub.parent_id = Some(parent_id); + store.submit(&child_sub).await.unwrap(); + let child = store.pop_next().await.unwrap().unwrap(); + assert_eq!(child.status, TaskStatus::Running); + + // Simulate crash recovery. + store.recover_running().await.unwrap(); + + // Parent should still be waiting. + let parent = store.task_by_id(parent_id).await.unwrap().unwrap(); + assert_eq!(parent.status, TaskStatus::Waiting); + + // Child should be reset to pending. + let child = store.task_by_id(child.id).await.unwrap().unwrap(); + assert_eq!(child.status, TaskStatus::Pending); + } } diff --git a/src/task.rs b/src/task.rs index c75d13c..050a0a7 100644 --- a/src/task.rs +++ b/src/task.rs @@ -15,6 +15,10 @@ pub enum TaskStatus { Pending, Running, Paused, + /// Parent task whose executor has returned but whose children are still + /// active. Transitions to `Running` (for finalize) or terminal once all + /// children complete. + Waiting, } impl TaskStatus { @@ -23,6 +27,7 @@ impl TaskStatus { Self::Pending => "pending", Self::Running => "running", Self::Paused => "paused", + Self::Waiting => "waiting", } } } @@ -35,6 +40,7 @@ impl std::str::FromStr for TaskStatus { "pending" => Ok(Self::Pending), "running" => Ok(Self::Running), "paused" => Ok(Self::Paused), + "waiting" => Ok(Self::Waiting), other => Err(format!("unknown TaskStatus: {other}")), } } @@ -86,6 +92,12 @@ pub struct TaskRecord { pub started_at: Option>, pub requeue: bool, pub requeue_priority: Option, + /// Parent task ID for hierarchical tasks. `None` for top-level tasks. + pub parent_id: Option, + /// When `true` (default), the first child failure cancels siblings and + /// fails the parent immediately. 
When `false`, the parent waits for all + /// children to finish before resolving. + pub fail_fast: bool, } impl TaskRecord { @@ -121,6 +133,10 @@ pub struct TaskHistoryRecord { pub started_at: Option>, pub completed_at: DateTime, pub duration_ms: Option, + /// Parent task ID for hierarchical tasks. + pub parent_id: Option, + /// Fail-fast flag copied from this task's own active record when it was archived. + pub fail_fast: bool, } /// Reported by the executor on successful completion. @@ -201,6 +217,14 @@ pub struct TaskSubmission { pub payload: Option>, pub expected_read_bytes: i64, pub expected_write_bytes: i64, + /// Parent task ID for hierarchical tasks. Set automatically by + /// [`TaskContext::spawn_child`]. + pub parent_id: Option, + /// When `true` (default), the first child failure cancels siblings and + /// fails the parent immediately. When `false`, the parent waits for all + /// children to finish before resolving. Only meaningful for parent tasks + /// that spawn children. + pub fail_fast: bool, } impl TaskSubmission { @@ -235,6 +259,8 @@ impl TaskSubmission { payload: Some(payload), expected_read_bytes, expected_write_bytes, + parent_id: None, + fail_fast: true, }) } } @@ -293,6 +319,8 @@ impl TaskSubmission { payload: Some(payload), expected_read_bytes: task.expected_read_bytes(), expected_write_bytes: task.expected_write_bytes(), + parent_id: None, + fail_fast: true, }) } } @@ -325,6 +353,30 @@ pub struct TypeStats { pub failure_rate: f64, } +/// Resolution of a parent task after a child completes or fails. +/// +/// Returned by [`TaskStore::try_resolve_parent`] to tell the scheduler +/// what action to take on the parent. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum ParentResolution { + /// All children are done and none failed — parent is ready for finalize. + ReadyToFinalize, + /// At least one child failed (terminal) — parent should fail. + Failed(String), + /// Children are still active — no action needed yet.
+ StillWaiting, +} + +impl TaskResult { + /// A result with zero IO bytes. + pub fn zero() -> Self { + Self { + actual_read_bytes: 0, + actual_write_bytes: 0, + } + } +} + #[cfg(test)] mod tests { use super::*; From 8e9811d26b731e3a36a9244acab4e83fb7ab00a8 Mon Sep 17 00:00:00 2001 From: DJ Majumdar Date: Fri, 13 Mar 2026 18:53:17 -0700 Subject: [PATCH 2/2] docs: add module-level rustdoc and crate quick-start example - Add module-level doc comments (`//!`) to all public and internal modules (backpressure, priority, registry, resource, sampler, sysinfo_monitor, scheduler, dispatch, gate, progress, store, task) - Add a `# Quick start` code example to the crate root (`lib.rs`) - Add doc examples for `PressureSource`, `ResourceSampler`, and `ProgressReporter` - Fix broken intra-doc links by qualifying cross-module references (e.g. `[TaskContext::state](crate::TaskContext::state)`) - Tighten visibility: make `StateMap`, `ChildSpawner`, `SmoothedReader`, and `run_sampler` `pub(crate)` instead of `pub` - Remove `ChildSpawner` and `SmoothedReader` from the public re-exports --- src/backpressure.rs | 28 ++++++++++++++ src/lib.rs | 66 ++++++++++++++++++++++++++++++--- src/priority.rs | 8 ++++ src/registry.rs | 19 ++++++++-- src/resource/mod.rs | 30 ++++++++++++++- src/resource/sampler.rs | 10 ++++- src/resource/sysinfo_monitor.rs | 6 +++ src/scheduler/dispatch.rs | 10 +++-- src/scheduler/gate.rs | 6 +++ src/scheduler/mod.rs | 14 +++++-- src/scheduler/progress.rs | 24 +++++++++++- src/store.rs | 8 +++- src/task.rs | 19 +++++++--- 13 files changed, 221 insertions(+), 27 deletions(-) diff --git a/src/backpressure.rs b/src/backpressure.rs index 77dd4a7..c32fbf5 100644 --- a/src/backpressure.rs +++ b/src/backpressure.rs @@ -1,9 +1,36 @@ +//! Composable backpressure for throttling task dispatch. +//! +//! Implement [`PressureSource`] to feed external signals (API load, memory +//! pressure, queue depth, etc.) into the scheduler. Multiple sources are +//! 
combined via [`CompositePressure`], and [`ThrottlePolicy`] maps the +//! aggregate pressure to per-priority throttle decisions. + use crate::priority::Priority; /// A source of pressure that signals the scheduler to slow down. /// /// Consumers implement this trait to feed external signals (API load, memory /// pressure, queue depth, etc.) into the scheduler's throttle decisions. +/// +/// # Example +/// +/// ```ignore +/// use std::sync::atomic::{AtomicU32, Ordering}; +/// use taskmill::PressureSource; +/// +/// struct ApiLoadPressure { +/// active_requests: AtomicU32, +/// max_requests: u32, +/// } +/// +/// impl PressureSource for ApiLoadPressure { +/// fn pressure(&self) -> f32 { +/// let current = self.active_requests.load(Ordering::Relaxed); +/// (current as f32 / self.max_requests as f32).min(1.0) +/// } +/// fn name(&self) -> &str { "api-load" } +/// } +/// ``` pub trait PressureSource: Send + Sync + 'static { /// Current pressure level between 0.0 (idle) and 1.0 (saturated). fn pressure(&self) -> f32; @@ -68,6 +95,7 @@ pub struct CompositePressure { } impl CompositePressure { + /// Create an empty composite with no pressure sources. pub fn new() -> Self { Self { sources: Vec::new(), diff --git a/src/lib.rs b/src/lib.rs index c7ea34b..2b67f46 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -15,11 +15,67 @@ //! - Emits lifecycle events including progress for UI integration (via broadcast channel) //! - Supports graceful shutdown with configurable drain timeout //! +//! # Quick start +//! +//! ```no_run +//! use std::sync::Arc; +//! use taskmill::{ +//! Scheduler, TaskExecutor, TaskContext, TaskResult, TaskError, +//! TypedTask, Priority, +//! }; +//! use serde::{Serialize, Deserialize}; +//! use tokio_util::sync::CancellationToken; +//! +//! // 1. Define a task payload. +//! #[derive(Serialize, Deserialize)] +//! struct Thumbnail { path: String, size: u32 } +//! +//! impl TypedTask for Thumbnail { +//! const TASK_TYPE: &'static str = "thumbnail"; +//! 
fn expected_read_bytes(&self) -> i64 { 4_096 } +//! fn expected_write_bytes(&self) -> i64 { 1_024 } +//! } +//! +//! // 2. Implement the executor. +//! struct ThumbnailExecutor; +//! +//! impl TaskExecutor for ThumbnailExecutor { +//! async fn execute<'a>( +//! &'a self, ctx: &'a TaskContext, +//! ) -> Result { +//! let thumb: Thumbnail = ctx.deserialize_typed().unwrap().unwrap(); +//! ctx.progress.report(0.5, Some("resizing".into())); +//! // ... do work, check ctx.token.is_cancelled() ... +//! Ok(TaskResult { actual_read_bytes: 4_096, actual_write_bytes: 1_024 }) +//! } +//! } +//! +//! # async fn run() -> Result<(), Box> { +//! // 3. Build and run the scheduler. +//! let scheduler = Scheduler::builder() +//! .store_path("tasks.db") +//! .typed_executor::(Arc::new(ThumbnailExecutor)) +//! .max_concurrency(4) +//! .with_resource_monitoring() +//! .build() +//! .await?; +//! +//! // 4. Submit work. +//! let task = Thumbnail { path: "/photos/a.jpg".into(), size: 256 }; +//! scheduler.submit_typed(&task).await?; +//! +//! // 5. Run until cancelled. +//! let token = CancellationToken::new(); +//! scheduler.run(token).await; +//! # Ok(()) +//! # } +//! ``` +//! //! # Feature flags //! -//! - **`sysinfo-monitor`** (default): Enables the built-in `SysinfoSampler` for -//! cross-platform CPU and disk IO monitoring. Disable for mobile targets or -//! when providing a custom `ResourceSampler`. +//! - **`sysinfo-monitor`** (default): Enables the built-in [`SysinfoSampler`](resource::sysinfo_monitor::SysinfoSampler) +//! for cross-platform CPU and disk IO monitoring. Disable for mobile targets or +//! when providing a custom [`ResourceSampler`]. pub mod backpressure; pub mod priority; @@ -32,8 +88,8 @@ pub mod task; // Convenience re-exports. 
pub use backpressure::{CompositePressure, PressureSource, ThrottlePolicy}; pub use priority::Priority; -pub use registry::{ChildSpawner, StateMap, TaskContext, TaskExecutor}; -pub use resource::sampler::{SamplerConfig, SmoothedReader}; +pub use registry::{TaskContext, TaskExecutor}; +pub use resource::sampler::SamplerConfig; pub use resource::{ResourceReader, ResourceSampler, ResourceSnapshot}; pub use scheduler::{ EstimatedProgress, ProgressReporter, Scheduler, SchedulerBuilder, SchedulerConfig, diff --git a/src/priority.rs b/src/priority.rs index aa9767d..470fe25 100644 --- a/src/priority.rs +++ b/src/priority.rs @@ -1,3 +1,11 @@ +//! Priority levels for task scheduling. +//! +//! [`Priority`] is a `u8` newtype where lower values mean higher priority. +//! Named constants ([`REALTIME`](Priority::REALTIME), [`HIGH`](Priority::HIGH), +//! [`NORMAL`](Priority::NORMAL), [`BACKGROUND`](Priority::BACKGROUND), +//! [`IDLE`](Priority::IDLE)) cover common tiers, and any value 0–255 is valid +//! for fine-grained control. + use std::cmp::Ordering; use std::fmt; diff --git a/src/registry.rs b/src/registry.rs index 56e3406..1c73b85 100644 --- a/src/registry.rs +++ b/src/registry.rs @@ -1,3 +1,11 @@ +//! Executor registration, shared state, and the [`TaskContext`] passed to each task. +//! +//! Register one [`TaskExecutor`] per task type with the scheduler. At dispatch +//! time the scheduler looks up the executor by name and calls +//! [`execute`](TaskExecutor::execute) with a [`TaskContext`] containing the +//! persisted record, a cancellation token, a progress reporter, and any +//! shared application state. + use std::any::{Any, TypeId}; use std::collections::HashMap; use std::future::Future; @@ -22,7 +30,7 @@ use crate::task::{SubmitOutcome, TaskError, TaskRecord, TaskResult, TaskSubmissi /// so that library consumers (e.g. shoebox inside a Tauri app) can inject /// state after the scheduler has been constructed by the parent. 
#[derive(Default)] -pub struct StateMap { +pub(crate) struct StateMap { inner: RwLock>>, } @@ -79,7 +87,7 @@ impl StateMap { /// Holds a `Notify` reference to wake the scheduler run loop after /// spawning, so children are dispatched promptly. #[derive(Clone)] -pub struct ChildSpawner { +pub(crate) struct ChildSpawner { store: TaskStore, parent_id: i64, work_notify: Arc, @@ -135,7 +143,7 @@ pub struct TaskContext { pub token: CancellationToken, /// Report progress back to the scheduler (0.0–1.0). pub progress: ProgressReporter, - /// Shared application state set via [`SchedulerBuilder::app_state`]. + /// Shared application state set via [`SchedulerBuilder::app_state`](crate::SchedulerBuilder::app_state). pub(crate) app_state: StateSnapshot, /// Spawner for creating child tasks. `None` for non-hierarchical contexts. pub(crate) child_spawner: Option, @@ -151,7 +159,8 @@ impl TaskContext { } /// Retrieve shared application state registered via - /// [`SchedulerBuilder::app_state`] or [`Scheduler::register_state`]. + /// [`SchedulerBuilder::app_state`](crate::SchedulerBuilder::app_state) or + /// [`Scheduler::register_state`](crate::Scheduler::register_state). /// /// Returns `None` if the type was never registered. Multiple types can /// coexist — each is keyed by its concrete `TypeId`. @@ -293,6 +302,7 @@ impl ErasedExecutor for T { } impl TaskTypeRegistry { + /// Create an empty registry. pub fn new() -> Self { Self { types: HashMap::new(), @@ -326,6 +336,7 @@ impl TaskTypeRegistry { self.types.len() } + /// Returns `true` if no executors have been registered. pub fn is_empty(&self) -> bool { self.types.is_empty() } diff --git a/src/resource/mod.rs b/src/resource/mod.rs index 5b7aae4..4ad9c01 100644 --- a/src/resource/mod.rs +++ b/src/resource/mod.rs @@ -1,3 +1,11 @@ +//! System resource monitoring for IO-aware scheduling. +//! +//! Implement [`ResourceSampler`] to feed CPU and disk IO metrics into the +//! scheduler. 
The built-in [`sysinfo_monitor`] module provides a cross-platform +//! sampler using the `sysinfo` crate (enabled by the `sysinfo-monitor` feature). +//! The scheduler reads the latest smoothed snapshot via [`ResourceReader`] when +//! making IO-budget dispatch decisions. + pub mod sampler; #[cfg(feature = "sysinfo-monitor")] @@ -32,7 +40,27 @@ impl Default for ResourceSnapshot { /// The sampler loop handles EWMA smoothing separately. /// /// To override the built-in monitor (e.g. for container cgroup-aware monitoring), -/// implement this trait and pass it to the scheduler. +/// implement this trait and pass it via +/// [`SchedulerBuilder::resource_sampler`](crate::SchedulerBuilder::resource_sampler). +/// +/// # Example +/// +/// ```ignore +/// use taskmill::{ResourceSampler, ResourceSnapshot}; +/// +/// struct CgroupSampler { /* ... */ } +/// +/// impl ResourceSampler for CgroupSampler { +/// fn sample(&mut self) -> ResourceSnapshot { +/// // Read from /sys/fs/cgroup/... +/// ResourceSnapshot { +/// cpu_usage: 0.42, +/// io_read_bytes_per_sec: 50_000_000.0, +/// io_write_bytes_per_sec: 20_000_000.0, +/// } +/// } +/// } +/// ``` pub trait ResourceSampler: Send + Sync + 'static { /// Take a raw sample. Called periodically by the sampler loop. /// Returns a snapshot with absolute values (not smoothed — the sampler diff --git a/src/resource/sampler.rs b/src/resource/sampler.rs index 160e686..62a1dc4 100644 --- a/src/resource/sampler.rs +++ b/src/resource/sampler.rs @@ -1,3 +1,9 @@ +//! Background sampling loop and EWMA smoothing for resource metrics. +//! +//! [`SamplerConfig`] controls the polling interval and smoothing factor. +//! The sampler loop periodically calls [`ResourceSampler::sample`](super::ResourceSampler), +//! applies EWMA smoothing, and publishes the result for the scheduler to read. 
+ use std::sync::Arc; use std::time::Duration; @@ -38,7 +44,7 @@ fn ewma(old: f64, raw: f64, alpha: f64) -> f64 { /// The sampler loop writes to this; the scheduler reads from it. /// Uses `RwLock` so readers never block each other. #[derive(Clone)] -pub struct SmoothedReader { +pub(crate) struct SmoothedReader { inner: Arc>, } @@ -77,7 +83,7 @@ impl ResourceReader for SmoothedReader { /// Periodically calls `sampler.sample()`, applies EWMA smoothing, and /// stores the result in the `reader`. The scheduler reads /// `reader.latest()` when making IO budget decisions. -pub async fn run_sampler( +pub(crate) async fn run_sampler( sampler: Arc>>, reader: SmoothedReader, config: SamplerConfig, diff --git a/src/resource/sysinfo_monitor.rs b/src/resource/sysinfo_monitor.rs index a2e7592..25f53e3 100644 --- a/src/resource/sysinfo_monitor.rs +++ b/src/resource/sysinfo_monitor.rs @@ -1,3 +1,8 @@ +//! Cross-platform resource sampler using the [`sysinfo`](https://docs.rs/sysinfo) crate. +//! +//! Tracks CPU utilization and aggregate disk IO throughput across all mounted +//! disks. Gated behind the `sysinfo-monitor` feature (enabled by default). + use std::time::Instant; use sysinfo::{Disks, System}; @@ -17,6 +22,7 @@ pub struct SysinfoSampler { } impl SysinfoSampler { + /// Create a new sampler, taking an initial CPU and disk reading. pub fn new() -> Self { let mut sys = System::new(); sys.refresh_cpu_usage(); diff --git a/src/scheduler/dispatch.rs b/src/scheduler/dispatch.rs index 9d9e619..85206f3 100644 --- a/src/scheduler/dispatch.rs +++ b/src/scheduler/dispatch.rs @@ -1,3 +1,5 @@ +//! Task spawning, active-task tracking, preemption, and parent-child resolution. + use std::collections::HashMap; use std::sync::Arc; @@ -178,10 +180,6 @@ impl ActiveTaskMap { // ── Spawn ────────────────────────────────────────────────────────── -/// Spawn a task executor and wire up completion/failure handling. 
-/// -/// Inserts the task into the active map, starts a progress listener, -/// and spawns the executor. /// Whether to call `execute` or `finalize` on the executor. #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub(crate) enum ExecutionPhase { @@ -199,6 +197,10 @@ pub(crate) struct SpawnContext { pub work_notify: Arc, } +/// Spawn a task executor and wire up completion/failure handling. +/// +/// Inserts the task into the active map, starts a progress listener, +/// and spawns the executor on a new tokio task. pub(crate) async fn spawn_task( task: TaskRecord, executor: Arc, diff --git a/src/scheduler/gate.rs b/src/scheduler/gate.rs index af04120..3485c8f 100644 --- a/src/scheduler/gate.rs +++ b/src/scheduler/gate.rs @@ -1,3 +1,9 @@ +//! Dispatch gate: admission control for task dispatch. +//! +//! The [`DispatchGate`] trait decides whether a popped task should run or be +//! requeued. The built-in [`DefaultDispatchGate`] applies backpressure +//! throttling and IO-budget checks. + use std::future::Future; use std::pin::Pin; use std::sync::Arc; diff --git a/src/scheduler/mod.rs b/src/scheduler/mod.rs index 65e19d1..4adfadd 100644 --- a/src/scheduler/mod.rs +++ b/src/scheduler/mod.rs @@ -1,3 +1,10 @@ +//! The scheduler: configuration, event stream, and the main run loop. +//! +//! [`Scheduler`] coordinates task execution — popping from the store, +//! applying backpressure and IO-budget checks, preempting lower-priority +//! work, and emitting [`SchedulerEvent`]s for UI integration. Use +//! [`SchedulerBuilder`] for ergonomic construction. + pub(crate) mod dispatch; pub(crate) mod gate; pub mod progress; @@ -827,6 +834,7 @@ pub struct SchedulerBuilder { } impl SchedulerBuilder { + /// Create a new builder with default configuration. pub fn new() -> Self { Self { store_path: None, @@ -843,7 +851,7 @@ impl SchedulerBuilder { } } - /// Set the SQLite database path. Either this or [`store`] must be called. + /// Set the SQLite database path. 
Either this or [`store`](Self::store) must be called. pub fn store_path(mut self, path: &str) -> Self { self.store_path = Some(path.to_string()); self @@ -946,7 +954,7 @@ impl SchedulerBuilder { } /// Register shared application state accessible from every executor via - /// [`TaskContext::state`]. + /// [`TaskContext::state`](crate::TaskContext::state). /// /// Multiple types can be registered — each is keyed by its concrete /// `TypeId`. Calling this twice with the same `T` overwrites the @@ -977,7 +985,7 @@ impl SchedulerBuilder { /// have an `Arc` and need to retain a handle for use outside the /// scheduler (e.g. to populate `OnceLock` fields after build). Avoids /// double-wrapping (`Arc>`), which would cause - /// [`TaskContext::state`] downcasts to fail. + /// [`TaskContext::state`](crate::TaskContext::state) downcasts to fail. /// /// Multiple types can be registered — each is keyed by its concrete /// `TypeId`. diff --git a/src/scheduler/progress.rs b/src/scheduler/progress.rs index bee4b13..0b03f33 100644 --- a/src/scheduler/progress.rs +++ b/src/scheduler/progress.rs @@ -1,3 +1,9 @@ +//! Progress reporting and throughput-based extrapolation. +//! +//! Executors call [`ProgressReporter::report`] to emit percentage updates. +//! The scheduler combines these with historical throughput data to produce +//! [`EstimatedProgress`] snapshots for dashboard UIs. + use serde::{Deserialize, Serialize}; use crate::store::TaskStore; @@ -9,8 +15,22 @@ use super::SchedulerEvent; /// Handle passed to executors for reporting progress back to the scheduler. /// -/// Progress reports are emitted as `SchedulerEvent::Progress` events, -/// making them available to the UI via the same broadcast channel. +/// Progress reports are emitted as [`SchedulerEvent::Progress`] +/// events, making them available to the UI via the same broadcast channel. 
+/// +/// # Example +/// +/// ```ignore +/// // Inside a TaskExecutor::execute implementation: +/// async fn execute<'a>(&'a self, ctx: &'a TaskContext) -> Result<TaskResult, TaskError> { +/// let items = vec![/* ... */]; +/// for (i, item) in items.iter().enumerate() { +/// // process item... +/// ctx.progress.report_fraction(i as u64 + 1, items.len() as u64, None); +/// } +/// Ok(TaskResult { actual_read_bytes: 0, actual_write_bytes: 0 }) +/// } +/// ``` #[derive(Clone)] pub struct ProgressReporter { task_id: i64, diff --git a/src/store.rs b/src/store.rs index 89656d8..cc240d9 100644 --- a/src/store.rs +++ b/src/store.rs @@ -1,3 +1,9 @@ +//! SQLite-backed persistence layer for the task queue and history. +//! +//! [`TaskStore`] manages the active task queue and completed/failed history +//! in a single SQLite database. It handles deduplication, priority upgrades, +//! retries, parent-child hierarchy, and automatic history pruning. + use std::sync::atomic::{AtomicU64, Ordering}; use chrono::{DateTime, Utc}; @@ -845,7 +851,7 @@ impl TaskStore { /// Look up a task by its dedup key, checking the active queue first /// and falling back to history. /// - /// This is the low-level building block for [`Scheduler::task_lookup`]. + /// This is the low-level building block for [`Scheduler::task_lookup`](crate::Scheduler::task_lookup). /// The `key` parameter is the pre-computed SHA-256 dedup key (as /// returned by [`generate_dedup_key`](crate::task::generate_dedup_key) /// or [`TaskSubmission::effective_key`]). diff --git a/src/task.rs b/src/task.rs index 050a0a7..58955c8 100644 --- a/src/task.rs +++ b/src/task.rs @@ -1,3 +1,10 @@ +//! Task types, submission parameters, and the [`TypedTask`] trait. +//! +//! This module defines the data structures that flow through the scheduler: +//! [`TaskSubmission`] for enqueuing work, [`TaskRecord`] for in-flight tasks, +//! [`TaskHistoryRecord`] for completed/failed results, and [`TypedTask`] for +//!
strongly-typed task payloads with built-in serialization. + use chrono::{DateTime, Utc}; use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; @@ -218,7 +225,7 @@ pub struct TaskSubmission { pub expected_read_bytes: i64, pub expected_write_bytes: i64, /// Parent task ID for hierarchical tasks. Set automatically by - /// [`TaskContext::spawn_child`]. + /// [`TaskContext::spawn_child`](crate::TaskContext::spawn_child). pub parent_id: Option<i64>, /// When `true` (default), the first child failure cancels siblings and /// fails the parent immediately. When `false`, the parent waits for all @@ -269,8 +276,9 @@ impl TaskSubmission { /// IO estimates. /// /// Implementing this trait collapses the 6 fields of [`TaskSubmission`] into a -/// derive-friendly pattern. Use [`Scheduler::submit_typed`] to submit and -/// [`TaskContext::deserialize_typed`] on the executor side. +/// derive-friendly pattern. Use [`Scheduler::submit_typed`](crate::Scheduler::submit_typed) +/// to submit and [`TaskContext::deserialize_typed`](crate::TaskContext::deserialize_typed) +/// on the executor side. /// /// # Example /// @@ -327,7 +335,8 @@ impl TaskSubmission { /// Unified lookup result for querying a task by its dedup inputs. /// -/// Returned by [`TaskStore::task_lookup`] and [`Scheduler::task_lookup`]. +/// Returned by [`TaskStore::task_lookup`](crate::TaskStore::task_lookup) and +/// [`Scheduler::task_lookup`](crate::Scheduler::task_lookup). /// Tells the caller whether a task is currently active (pending, running, /// or paused) or has finished (completed or failed), without requiring /// them to manually compute the dedup key or query two tables. @@ -355,7 +364,7 @@ pub struct TypeStats { /// Resolution of a parent task after a child completes or fails.
/// -/// Returned by [`TaskStore::try_resolve_parent`] to tell the scheduler +/// Returned by [`TaskStore::try_resolve_parent`](crate::TaskStore::try_resolve_parent) to tell the scheduler /// what action to take on the parent. #[derive(Debug, Clone, PartialEq, Eq)] pub enum ParentResolution {