Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 4 additions & 7 deletions src/module.rs
Original file line number Diff line number Diff line change
Expand Up @@ -695,10 +695,7 @@ impl ModuleHandle {

/// All active tasks in this module (any status).
pub fn active_tasks(&self) -> Vec<TaskRecord> {
self.scheduler
.inner
.active
.records_with_prefix(&self.prefix)
self.scheduler.inner.active.records(Some(&self.prefix))
}

/// Dead-lettered tasks in this module, newest first.
Expand Down Expand Up @@ -733,7 +730,7 @@ impl ModuleHandle {
.scheduler
.inner
.active
.progress_snapshots_with_prefix(&self.prefix);
.progress_snapshots(Some(&self.prefix));
let mut results = Vec::with_capacity(snapshots.len());
for (record, reported, reported_at) in snapshots {
results.push(
Expand All @@ -755,7 +752,7 @@ impl ModuleHandle {
.scheduler
.inner
.active
.byte_progress_snapshots_with_prefix(&self.prefix);
.byte_progress_snapshots(Some(&self.prefix));
snapshots
.into_iter()
.filter(|(_, _, _, _, completed, _, _, _)| *completed > 0)
Expand Down Expand Up @@ -868,7 +865,7 @@ impl ModuleHandle {
/// doesn't exist or belongs to a different module.
async fn task_belongs(&self, task_id: i64) -> Result<bool, StoreError> {
// Fast path: check the in-memory active map first.
let records = self.scheduler.inner.active.records();
let records = self.scheduler.inner.active.records(None);
if let Some(r) = records.iter().find(|r| r.id == task_id) {
return Ok(r.task_type.starts_with(self.prefix.as_ref()));
}
Expand Down
90 changes: 21 additions & 69 deletions src/scheduler/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,39 +86,42 @@ impl ActiveTaskMap {
self.inner.lock().unwrap().remove(&id)
}

/// Snapshot of all active task records.
pub fn records(&self) -> Vec<TaskRecord> {
self.inner
.lock()
.unwrap()
.values()
/// Snapshot of active task records, optionally filtered to those whose
/// `task_type` starts with `prefix`.
pub fn records(&self, prefix: Option<&str>) -> Vec<TaskRecord> {
let map = self.inner.lock().unwrap();
map.values()
.filter(|at| prefix.map_or(true, |p| at.record.task_type.starts_with(p)))
.map(|at| at.record.clone())
.collect()
}

/// Snapshot of progress data for all active tasks.
/// Snapshot of progress data for active tasks, optionally filtered to
/// those whose `task_type` starts with `prefix`.
pub fn progress_snapshots(
&self,
prefix: Option<&str>,
) -> Vec<(
TaskRecord,
Option<f32>,
Option<chrono::DateTime<chrono::Utc>>,
)> {
self.inner
.lock()
.unwrap()
.values()
let map = self.inner.lock().unwrap();
map.values()
.filter(|at| prefix.map_or(true, |p| at.record.task_type.starts_with(p)))
.map(|at| (at.record.clone(), at.reported_progress, at.reported_at))
.collect()
}

/// Snapshot of byte-level progress for all active tasks.
/// Snapshot of byte-level progress for active tasks, optionally filtered
/// to those whose `task_type` starts with `prefix`.
///
/// Returns `(task_id, task_type, key, label, bytes_completed, bytes_total, parent_id, started_at)`.
/// Single lock acquisition — reads atomic counters and copies scalar fields only.
pub fn byte_progress_snapshots(&self) -> Vec<ByteProgressSnapshot> {
pub fn byte_progress_snapshots(&self, prefix: Option<&str>) -> Vec<ByteProgressSnapshot> {
let map = self.inner.lock().unwrap();
map.values()
.filter(|at| prefix.map_or(true, |p| at.record.task_type.starts_with(p)))
.map(|at| {
let (completed, total) = at.io.progress_snapshot();
(
Expand Down Expand Up @@ -229,52 +232,6 @@ impl ActiveTaskMap {
handles
}

/// Snapshot of all active task records whose `task_type` starts with `prefix`.
pub fn records_with_prefix(&self, prefix: &str) -> Vec<TaskRecord> {
let map = self.inner.lock().unwrap();
map.values()
.filter(|at| at.record.task_type.starts_with(prefix))
.map(|at| at.record.clone())
.collect()
}

/// Snapshot of progress data for active tasks matching `prefix`.
pub fn progress_snapshots_with_prefix(
&self,
prefix: &str,
) -> Vec<(
TaskRecord,
Option<f32>,
Option<chrono::DateTime<chrono::Utc>>,
)> {
let map = self.inner.lock().unwrap();
map.values()
.filter(|at| at.record.task_type.starts_with(prefix))
.map(|at| (at.record.clone(), at.reported_progress, at.reported_at))
.collect()
}

/// Snapshot of byte-level progress for active tasks matching `prefix`.
pub fn byte_progress_snapshots_with_prefix(&self, prefix: &str) -> Vec<ByteProgressSnapshot> {
let map = self.inner.lock().unwrap();
map.values()
.filter(|at| at.record.task_type.starts_with(prefix))
.map(|at| {
let (completed, total) = at.io.progress_snapshot();
(
at.record.id,
at.record.task_type.clone(),
at.record.key.clone(),
at.record.label.clone(),
completed,
total,
at.record.parent_id,
at.started_at,
)
})
.collect()
}

/// Pause active tasks whose `task_type` starts with `prefix`: cancel their
/// tokens and move them to paused state in the store. Returns count paused.
pub async fn pause_module(
Expand Down Expand Up @@ -382,17 +339,12 @@ pub(crate) async fn spawn_task(
} = ctx;

// Extract the owning module name from the task type prefix (e.g. "media" from "media::thumb").
let owning_module: String = task
.task_type
.split_once("::")
.map(|(n, _)| n.to_string())
.unwrap_or_default();
let owning_module: String = task.module_name().unwrap_or_default().to_string();

// Clone the pre-snapshotted module state — no lock needed, already lock-free.
let module_state_snapshot: StateSnapshot = task
.task_type
.split_once("::")
.and_then(|(name, _)| module_state.get(name).cloned())
.module_name()
.and_then(|name| module_state.get(name).cloned())
.unwrap_or_default();
let child_token = CancellationToken::new();

Expand Down Expand Up @@ -426,7 +378,7 @@ pub(crate) async fn spawn_task(
);

// Increment the module running counter for this task.
if let Some(module_name) = task.task_type.split_once("::").map(|(n, _)| n) {
if let Some(module_name) = task.module_name() {
if let Some(counter) = module_running.get(module_name) {
counter.fetch_add(1, AtomicOrdering::Relaxed);
}
Expand Down Expand Up @@ -462,7 +414,7 @@ pub(crate) async fn spawn_task(
let task_id = task.id;
// Helper: decrement the module running counter when this task leaves "running".
let decrement_module = || {
if let Some(name) = task.task_type.split_once("::").map(|(n, _)| n) {
if let Some(name) = task.module_name() {
if let Some(counter) = module_running_for_task.get(name) {
counter.fetch_sub(1, AtomicOrdering::Relaxed);
}
Expand Down
2 changes: 1 addition & 1 deletion src/scheduler/gate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ impl DispatchGate for DefaultDispatchGate {
}

// Module concurrency check.
if let Some(module_name) = task.task_type.split_once("::").map(|(n, _)| n) {
if let Some(module_name) = task.module_name() {
let cap = ctx.module_caps.read().unwrap().get(module_name).copied();
if let Some(cap) = cap {
let running = ctx
Expand Down
2 changes: 1 addition & 1 deletion src/scheduler/progress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ pub(crate) async fn run_progress_ticker(
break;
}
_ = tokio::time::sleep(interval) => {
let snapshots = active.byte_progress_snapshots();
let snapshots = active.byte_progress_snapshots(None);

let mut active_ids = std::collections::HashSet::with_capacity(snapshots.len());

Expand Down
8 changes: 4 additions & 4 deletions src/scheduler/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@ use super::{EstimatedProgress, Scheduler, SchedulerSnapshot};
impl Scheduler {
/// Snapshot of currently active (in-memory) tasks.
pub async fn active_tasks(&self) -> Vec<crate::task::TaskRecord> {
self.inner.active.records()
self.inner.active.records(None)
}

/// Get estimated progress for all running tasks.
///
/// Combines executor-reported progress with throughput-based extrapolation
/// using historical average duration for each task type.
pub async fn estimated_progress(&self) -> Vec<EstimatedProgress> {
let snapshots: Vec<_> = self.inner.active.progress_snapshots();
let snapshots: Vec<_> = self.inner.active.progress_snapshots(None);
let mut results = Vec::with_capacity(snapshots.len());
for (record, reported, reported_at) in snapshots {
results.push(
Expand All @@ -32,7 +32,7 @@ impl Scheduler {
/// Returns instantaneous values (throughput = 0) — for smoothed throughput
/// and ETA, use [`subscribe_progress`](Self::subscribe_progress).
pub fn byte_progress(&self) -> Vec<TaskProgress> {
let snapshots = self.inner.active.byte_progress_snapshots();
let snapshots = self.inner.active.byte_progress_snapshots(None);
snapshots
.into_iter()
.filter(|(_, _, _, _, completed, _, _, _)| *completed > 0)
Expand Down Expand Up @@ -111,7 +111,7 @@ impl Scheduler {
/// backpressure in one call — exactly what a Tauri command would
/// return to the frontend.
pub async fn snapshot(&self) -> Result<SchedulerSnapshot, StoreError> {
let running = self.inner.active.records();
let running = self.inner.active.records(None);
let pending_count = self.inner.store.pending_count().await?;
let paused_count = self.inner.store.paused_count().await?;
let waiting_count = self.inner.store.waiting_count().await?;
Expand Down
2 changes: 1 addition & 1 deletion src/store/hierarchy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ impl TaskStore {
super::lifecycle::insert_history(
&mut conn,
&task,
"cancelled",
super::lifecycle::HistoryStatus::Cancelled,
&crate::task::IoBudget::default(),
None,
None,
Expand Down
12 changes: 6 additions & 6 deletions src/store/lifecycle/cancel_expire.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::store::row_mapping::row_to_task_record;
use crate::store::{StoreError, TaskStore};
use crate::task::{IoBudget, TaskRecord};

use super::{compute_duration_ms, insert_history};
use super::{compute_duration_ms, insert_history, HistoryStatus};

impl TaskStore {
/// Pause a running task (for preemption). Sets status to paused.
Expand Down Expand Up @@ -48,7 +48,7 @@ impl TaskStore {
insert_history(
&mut conn,
&task,
"cancelled",
HistoryStatus::Cancelled,
&IoBudget::default(),
duration_ms,
None,
Expand Down Expand Up @@ -87,7 +87,7 @@ impl TaskStore {
insert_history(
&mut conn,
task,
"cancelled",
HistoryStatus::Cancelled,
&IoBudget::default(),
duration_ms,
None,
Expand Down Expand Up @@ -177,7 +177,7 @@ impl TaskStore {
insert_history(
&mut conn,
&task,
"expired",
HistoryStatus::Expired,
&IoBudget::default(),
None,
None,
Expand All @@ -199,7 +199,7 @@ impl TaskStore {
insert_history(
&mut conn,
&child,
"expired",
HistoryStatus::Expired,
&IoBudget::default(),
None,
None,
Expand Down Expand Up @@ -270,7 +270,7 @@ impl TaskStore {
insert_history(
&mut conn,
&task,
"expired",
HistoryStatus::Expired,
&IoBudget::default(),
None,
None,
Expand Down
4 changes: 2 additions & 2 deletions src/store/lifecycle/complete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::store::row_mapping::row_to_task_record;
use crate::store::{StoreError, TaskStore};
use crate::task::IoBudget;

use super::{compute_duration_ms, insert_history};
use super::{compute_duration_ms, insert_history, HistoryStatus};

impl TaskStore {
/// Mark a task as completed and move it to history.
Expand Down Expand Up @@ -74,7 +74,7 @@ impl TaskStore {
insert_history(
conn,
task,
"completed",
HistoryStatus::Completed,
metrics,
duration_ms,
task.last_error.as_deref(),
Expand Down
4 changes: 2 additions & 2 deletions src/store/lifecycle/dependencies.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::store::row_mapping::row_to_task_record;
use crate::store::{StoreError, TaskStore};
use crate::task::{DependencyFailurePolicy, IoBudget};

use super::insert_history;
use super::{insert_history, HistoryStatus};

impl TaskStore {
/// After a task completes, check if any blocked tasks are now unblocked.
Expand Down Expand Up @@ -126,7 +126,7 @@ impl TaskStore {
insert_history(
conn,
&task,
"dependency_failed",
HistoryStatus::DependencyFailed,
&IoBudget::default(),
None,
Some(&format!("dependency task {} failed", failed_task_id)),
Expand Down
8 changes: 6 additions & 2 deletions src/store/lifecycle/fail.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::store::row_mapping::row_to_task_record;
use crate::store::{StoreError, TaskStore};
use crate::task::{BackoffStrategy, IoBudget};

use super::{compute_duration_ms, insert_history};
use super::{compute_duration_ms, insert_history, HistoryStatus};

/// Backoff parameters for retry delay computation.
///
Expand Down Expand Up @@ -160,7 +160,11 @@ impl TaskStore {
} else {
// Terminal failure — move to history.
// Distinguish: retryable + exhausted → dead_letter; non-retryable → failed.
let status = if retryable { "dead_letter" } else { "failed" };
let status = if retryable {
HistoryStatus::DeadLetter
} else {
HistoryStatus::Failed
};
let duration_ms = compute_duration_ms(task);

insert_history(conn, task, status, metrics, duration_ms, Some(error)).await?;
Expand Down
Loading
Loading