diff --git a/src/module.rs b/src/module.rs index db480f8..f706019 100644 --- a/src/module.rs +++ b/src/module.rs @@ -695,10 +695,7 @@ impl ModuleHandle { /// All active tasks in this module (any status). pub fn active_tasks(&self) -> Vec { - self.scheduler - .inner - .active - .records_with_prefix(&self.prefix) + self.scheduler.inner.active.records(Some(&self.prefix)) } /// Dead-lettered tasks in this module, newest first. @@ -733,7 +730,7 @@ impl ModuleHandle { .scheduler .inner .active - .progress_snapshots_with_prefix(&self.prefix); + .progress_snapshots(Some(&self.prefix)); let mut results = Vec::with_capacity(snapshots.len()); for (record, reported, reported_at) in snapshots { results.push( @@ -755,7 +752,7 @@ impl ModuleHandle { .scheduler .inner .active - .byte_progress_snapshots_with_prefix(&self.prefix); + .byte_progress_snapshots(Some(&self.prefix)); snapshots .into_iter() .filter(|(_, _, _, _, completed, _, _, _)| *completed > 0) @@ -868,7 +865,7 @@ impl ModuleHandle { /// doesn't exist or belongs to a different module. async fn task_belongs(&self, task_id: i64) -> Result { // Fast path: check the in-memory active map first. - let records = self.scheduler.inner.active.records(); + let records = self.scheduler.inner.active.records(None); if let Some(r) = records.iter().find(|r| r.id == task_id) { return Ok(r.task_type.starts_with(self.prefix.as_ref())); } diff --git a/src/scheduler/dispatch.rs b/src/scheduler/dispatch.rs index 5f2653d..1922686 100644 --- a/src/scheduler/dispatch.rs +++ b/src/scheduler/dispatch.rs @@ -86,39 +86,42 @@ impl ActiveTaskMap { self.inner.lock().unwrap().remove(&id) } - /// Snapshot of all active task records. - pub fn records(&self) -> Vec { - self.inner - .lock() - .unwrap() - .values() + /// Snapshot of active task records, optionally filtered to those whose + /// `task_type` starts with `prefix`. + pub fn records(&self, prefix: Option<&str>) -> Vec { + let map = self.inner.lock().unwrap(); + map.values() + .filter(|at| prefix.map_or(true, |p| at.record.task_type.starts_with(p))) .map(|at| at.record.clone()) .collect() } - /// Snapshot of progress data for all active tasks. + /// Snapshot of progress data for active tasks, optionally filtered to + /// those whose `task_type` starts with `prefix`. pub fn progress_snapshots( &self, + prefix: Option<&str>, ) -> Vec<( TaskRecord, Option, Option>, )> { - self.inner - .lock() - .unwrap() - .values() + let map = self.inner.lock().unwrap(); + map.values() + .filter(|at| prefix.map_or(true, |p| at.record.task_type.starts_with(p))) .map(|at| (at.record.clone(), at.reported_progress, at.reported_at)) .collect() } - /// Snapshot of byte-level progress for all active tasks. + /// Snapshot of byte-level progress for active tasks, optionally filtered + /// to those whose `task_type` starts with `prefix`. /// /// Returns `(task_id, task_type, key, label, bytes_completed, bytes_total, parent_id, started_at)`. /// Single lock acquisition — reads atomic counters and copies scalar fields only. - pub fn byte_progress_snapshots(&self) -> Vec { + pub fn byte_progress_snapshots(&self, prefix: Option<&str>) -> Vec { let map = self.inner.lock().unwrap(); map.values() + .filter(|at| prefix.map_or(true, |p| at.record.task_type.starts_with(p))) .map(|at| { let (completed, total) = at.io.progress_snapshot(); ( @@ -229,52 +232,6 @@ impl ActiveTaskMap { handles } - /// Snapshot of all active task records whose `task_type` starts with `prefix`. - pub fn records_with_prefix(&self, prefix: &str) -> Vec { - let map = self.inner.lock().unwrap(); - map.values() - .filter(|at| at.record.task_type.starts_with(prefix)) - .map(|at| at.record.clone()) - .collect() - } - - /// Snapshot of progress data for active tasks matching `prefix`. - pub fn progress_snapshots_with_prefix( - &self, - prefix: &str, - ) -> Vec<( - TaskRecord, - Option, - Option>, - )> { - let map = self.inner.lock().unwrap(); - map.values() - .filter(|at| at.record.task_type.starts_with(prefix)) - .map(|at| (at.record.clone(), at.reported_progress, at.reported_at)) - .collect() - } - - /// Snapshot of byte-level progress for active tasks matching `prefix`. - pub fn byte_progress_snapshots_with_prefix(&self, prefix: &str) -> Vec { - let map = self.inner.lock().unwrap(); - map.values() - .filter(|at| at.record.task_type.starts_with(prefix)) - .map(|at| { - let (completed, total) = at.io.progress_snapshot(); - ( - at.record.id, - at.record.task_type.clone(), - at.record.key.clone(), - at.record.label.clone(), - completed, - total, - at.record.parent_id, - at.started_at, - ) - }) - .collect() - } - /// Pause active tasks whose `task_type` starts with `prefix`: cancel their /// tokens and move them to paused state in the store. Returns count paused. pub async fn pause_module( @@ -382,17 +339,12 @@ pub(crate) async fn spawn_task( } = ctx; // Extract the owning module name from the task type prefix (e.g. "media" from "media::thumb"). - let owning_module: String = task - .task_type - .split_once("::") - .map(|(n, _)| n.to_string()) - .unwrap_or_default(); + let owning_module: String = task.module_name().unwrap_or_default().to_string(); // Clone the pre-snapshotted module state — no lock needed, already lock-free. let module_state_snapshot: StateSnapshot = task - .task_type - .split_once("::") - .and_then(|(name, _)| module_state.get(name).cloned()) + .module_name() + .and_then(|name| module_state.get(name).cloned()) .unwrap_or_default(); let child_token = CancellationToken::new(); @@ -426,7 +378,7 @@ pub(crate) async fn spawn_task( ); // Increment the module running counter for this task. - if let Some(module_name) = task.task_type.split_once("::").map(|(n, _)| n) { + if let Some(module_name) = task.module_name() { if let Some(counter) = module_running.get(module_name) { counter.fetch_add(1, AtomicOrdering::Relaxed); } @@ -462,7 +414,7 @@ pub(crate) async fn spawn_task( let task_id = task.id; // Helper: decrement the module running counter when this task leaves "running". let decrement_module = || { - if let Some(name) = task.task_type.split_once("::").map(|(n, _)| n) { + if let Some(name) = task.module_name() { if let Some(counter) = module_running_for_task.get(name) { counter.fetch_sub(1, AtomicOrdering::Relaxed); } diff --git a/src/scheduler/gate.rs b/src/scheduler/gate.rs index 5e30105..4cd6539 100644 --- a/src/scheduler/gate.rs +++ b/src/scheduler/gate.rs @@ -169,7 +169,7 @@ impl DispatchGate for DefaultDispatchGate { } // Module concurrency check. - if let Some(module_name) = task.task_type.split_once("::").map(|(n, _)| n) { + if let Some(module_name) = task.module_name() { let cap = ctx.module_caps.read().unwrap().get(module_name).copied(); if let Some(cap) = cap { let running = ctx diff --git a/src/scheduler/progress.rs b/src/scheduler/progress.rs index 063feef..3334c59 100644 --- a/src/scheduler/progress.rs +++ b/src/scheduler/progress.rs @@ -240,7 +240,7 @@ pub(crate) async fn run_progress_ticker( break; } _ = tokio::time::sleep(interval) => { - let snapshots = active.byte_progress_snapshots(); + let snapshots = active.byte_progress_snapshots(None); let mut active_ids = std::collections::HashSet::with_capacity(snapshots.len()); diff --git a/src/scheduler/queries.rs b/src/scheduler/queries.rs index a93992a..cb5bbca 100644 --- a/src/scheduler/queries.rs +++ b/src/scheduler/queries.rs @@ -8,7 +8,7 @@ use super::{EstimatedProgress, Scheduler, SchedulerSnapshot}; impl Scheduler { /// Snapshot of currently active (in-memory) tasks. pub async fn active_tasks(&self) -> Vec { - self.inner.active.records() + self.inner.active.records(None) } /// Get estimated progress for all running tasks. @@ -16,7 +16,7 @@ impl Scheduler { /// Combines executor-reported progress with throughput-based extrapolation /// using historical average duration for each task type. pub async fn estimated_progress(&self) -> Vec { - let snapshots: Vec<_> = self.inner.active.progress_snapshots(); + let snapshots: Vec<_> = self.inner.active.progress_snapshots(None); let mut results = Vec::with_capacity(snapshots.len()); for (record, reported, reported_at) in snapshots { results.push( @@ -32,7 +32,7 @@ impl Scheduler { /// Returns instantaneous values (throughput = 0) — for smoothed throughput /// and ETA, use [`subscribe_progress`](Self::subscribe_progress). pub fn byte_progress(&self) -> Vec { - let snapshots = self.inner.active.byte_progress_snapshots(); + let snapshots = self.inner.active.byte_progress_snapshots(None); snapshots .into_iter() .filter(|(_, _, _, _, completed, _, _, _)| *completed > 0) @@ -111,7 +111,7 @@ impl Scheduler { /// backpressure in one call — exactly what a Tauri command would /// return to the frontend. pub async fn snapshot(&self) -> Result { - let running = self.inner.active.records(); + let running = self.inner.active.records(None); let pending_count = self.inner.store.pending_count().await?; let paused_count = self.inner.store.paused_count().await?; let waiting_count = self.inner.store.waiting_count().await?; diff --git a/src/store/hierarchy.rs b/src/store/hierarchy.rs index 1be175c..84e774a 100644 --- a/src/store/hierarchy.rs +++ b/src/store/hierarchy.rs @@ -104,7 +104,7 @@ impl TaskStore { super::lifecycle::insert_history( &mut conn, &task, - "cancelled", + super::lifecycle::HistoryStatus::Cancelled, &crate::task::IoBudget::default(), None, None, diff --git a/src/store/lifecycle/cancel_expire.rs b/src/store/lifecycle/cancel_expire.rs index f84f821..7ee487d 100644 --- a/src/store/lifecycle/cancel_expire.rs +++ b/src/store/lifecycle/cancel_expire.rs @@ -4,7 +4,7 @@ use crate::store::row_mapping::row_to_task_record; use crate::store::{StoreError, TaskStore}; use crate::task::{IoBudget, TaskRecord}; -use super::{compute_duration_ms, insert_history}; +use super::{compute_duration_ms, insert_history, HistoryStatus}; impl TaskStore { /// Pause a running task (for preemption). Sets status to paused. @@ -48,7 +48,7 @@ impl TaskStore { insert_history( &mut conn, &task, - "cancelled", + HistoryStatus::Cancelled, &IoBudget::default(), duration_ms, None, @@ -87,7 +87,7 @@ impl TaskStore { insert_history( &mut conn, task, - "cancelled", + HistoryStatus::Cancelled, &IoBudget::default(), duration_ms, None, @@ -177,7 +177,7 @@ impl TaskStore { insert_history( &mut conn, &task, - "expired", + HistoryStatus::Expired, &IoBudget::default(), None, None, @@ -199,7 +199,7 @@ impl TaskStore { insert_history( &mut conn, &child, - "expired", + HistoryStatus::Expired, &IoBudget::default(), None, None, @@ -270,7 +270,7 @@ impl TaskStore { insert_history( &mut conn, &task, - "expired", + HistoryStatus::Expired, &IoBudget::default(), None, None, diff --git a/src/store/lifecycle/complete.rs b/src/store/lifecycle/complete.rs index d8e5027..8508647 100644 --- a/src/store/lifecycle/complete.rs +++ b/src/store/lifecycle/complete.rs @@ -4,7 +4,7 @@ use crate::store::row_mapping::row_to_task_record; use crate::store::{StoreError, TaskStore}; use crate::task::IoBudget; -use super::{compute_duration_ms, insert_history}; +use super::{compute_duration_ms, insert_history, HistoryStatus}; impl TaskStore { /// Mark a task as completed and move it to history. @@ -74,7 +74,7 @@ impl TaskStore { insert_history( conn, task, - "completed", + HistoryStatus::Completed, metrics, duration_ms, task.last_error.as_deref(), diff --git a/src/store/lifecycle/dependencies.rs b/src/store/lifecycle/dependencies.rs index 642d61b..edbff94 100644 --- a/src/store/lifecycle/dependencies.rs +++ b/src/store/lifecycle/dependencies.rs @@ -4,7 +4,7 @@ use crate::store::row_mapping::row_to_task_record; use crate::store::{StoreError, TaskStore}; use crate::task::{DependencyFailurePolicy, IoBudget}; -use super::insert_history; +use super::{insert_history, HistoryStatus}; impl TaskStore { /// After a task completes, check if any blocked tasks are now unblocked. @@ -126,7 +126,7 @@ impl TaskStore { insert_history( conn, &task, - "dependency_failed", + HistoryStatus::DependencyFailed, &IoBudget::default(), None, Some(&format!("dependency task {} failed", failed_task_id)), diff --git a/src/store/lifecycle/fail.rs b/src/store/lifecycle/fail.rs index d29714c..172ab80 100644 --- a/src/store/lifecycle/fail.rs +++ b/src/store/lifecycle/fail.rs @@ -4,7 +4,7 @@ use crate::store::row_mapping::row_to_task_record; use crate::store::{StoreError, TaskStore}; use crate::task::{BackoffStrategy, IoBudget}; -use super::{compute_duration_ms, insert_history}; +use super::{compute_duration_ms, insert_history, HistoryStatus}; /// Backoff parameters for retry delay computation. /// @@ -160,7 +160,11 @@ impl TaskStore { } else { // Terminal failure — move to history. // Distinguish: retryable + exhausted → dead_letter; non-retryable → failed. - let status = if retryable { "dead_letter" } else { "failed" }; + let status = if retryable { + HistoryStatus::DeadLetter + } else { + HistoryStatus::Failed + }; let duration_ms = compute_duration_ms(task); insert_history(conn, task, status, metrics, duration_ms, Some(error)).await?; diff --git a/src/store/lifecycle/mod.rs b/src/store/lifecycle/mod.rs index 84c8626..aae4244 100644 --- a/src/store/lifecycle/mod.rs +++ b/src/store/lifecycle/mod.rs @@ -16,6 +16,37 @@ use crate::task::{IoBudget, TaskRecord}; use super::StoreError; +/// Terminal status values for tasks moved to history. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(crate) enum HistoryStatus { + Completed, + Failed, + DeadLetter, + Cancelled, + Expired, + Superseded, + DependencyFailed, +} + +impl HistoryStatus { + pub fn as_str(self) -> &'static str { + match self { + Self::Completed => "completed", + Self::Failed => "failed", + Self::DeadLetter => "dead_letter", + Self::Cancelled => "cancelled", + Self::Expired => "expired", + Self::Superseded => "superseded", + Self::DependencyFailed => "dependency_failed", + } + } + + /// Whether this status increments `retry_count` in history. + pub fn increments_retries(self) -> bool { + matches!(self, Self::Failed | Self::DeadLetter) + } +} + /// Insert a task record into the history table. /// /// Shared by `complete()`, `fail()`, and `cancel_to_history()` to eliminate @@ -23,13 +54,13 @@ use super::StoreError; pub(crate) async fn insert_history( conn: &mut sqlx::pool::PoolConnection, task: &TaskRecord, - status: &str, + status: HistoryStatus, metrics: &IoBudget, duration_ms: Option, last_error: Option<&str>, ) -> Result<(), StoreError> { let fail_fast_val: i32 = if task.fail_fast { 1 } else { 0 }; - let retry_count = if status == "failed" || status == "dead_letter" { + let retry_count = if status.increments_retries() { task.retry_count + 1 } else { task.retry_count @@ -46,7 +77,7 @@ pub(crate) async fn insert_history( .bind(&task.key) .bind(&task.label) .bind(task.priority.value() as i32) - .bind(status) + .bind(status.as_str()) .bind(&task.payload) .bind(task.expected_io.disk_read) .bind(task.expected_io.disk_write) diff --git a/src/store/submit/dedup.rs b/src/store/submit/dedup.rs index 76d15ee..0dc69fd 100644 --- a/src/store/submit/dedup.rs +++ b/src/store/submit/dedup.rs @@ -76,7 +76,7 @@ pub(crate) async fn supersede_existing( crate::store::lifecycle::insert_history( conn, &existing, - "superseded", + crate::store::lifecycle::HistoryStatus::Superseded, &crate::task::IoBudget::default(), existing .started_at diff --git a/src/task/mod.rs b/src/task/mod.rs index 34004fa..7edc378 100644 --- a/src/task/mod.rs +++ b/src/task/mod.rs @@ -246,6 +246,11 @@ pub struct TaskRecord { } impl TaskRecord { + /// Extract the module name prefix from `task_type` (e.g. `"media"` from `"media::thumb"`). + pub fn module_name(&self) -> Option<&str> { + self.task_type.split_once("::").map(|(n, _)| n) + } + /// Deserialize the payload blob into a typed value. /// /// Returns `None` if the payload is absent, or an error if deserialization fails. diff --git a/src/task/submit_builder.rs b/src/task/submit_builder.rs index 6c6503c..55b58be 100644 --- a/src/task/submit_builder.rs +++ b/src/task/submit_builder.rs @@ -211,6 +211,12 @@ impl SubmitBuilder { self } + fn merge_module_tags(sub: &mut TaskSubmission, module_tags: &HashMap) { + for (k, v) in module_tags { + sub.tags.entry(k.clone()).or_insert_with(|| v.clone()); + } + } + /// Apply all default layers and per-call overrides, returning the /// scheduler and the fully resolved [`TaskSubmission`]. /// @@ -255,9 +261,7 @@ impl SubmitBuilder { sub.ttl = td.ttl; // TypedTask tags are the base; module tags add new keys only. sub.tags = td.tags; - for (k, v) in &self.module_defaults.tags { - sub.tags.entry(k.clone()).or_insert_with(|| v.clone()); - } + Self::merge_module_tags(&mut sub, &self.module_defaults.tags); if let Some(p) = self.module_defaults.priority { sub.priority = p; } @@ -290,9 +294,7 @@ impl SubmitBuilder { } } // Module tags: add keys not already on the submission (submission wins). - for (k, v) in &self.module_defaults.tags { - sub.tags.entry(k.clone()).or_insert_with(|| v.clone()); - } + Self::merge_module_tags(&mut sub, &self.module_defaults.tags); } // ── 4. Apply per-call overrides (layer 1 — always highest priority) ──