Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion migrations/001_tasks.sql
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ CREATE TABLE IF NOT EXISTS tasks (
started_at TEXT,
requeue INTEGER NOT NULL DEFAULT 0,
requeue_priority INTEGER,
parent_id INTEGER,
fail_fast INTEGER NOT NULL DEFAULT 1,
UNIQUE(key)
);

Expand All @@ -43,7 +45,9 @@ CREATE TABLE IF NOT EXISTS task_history (
created_at TEXT NOT NULL,
started_at TEXT,
completed_at TEXT NOT NULL DEFAULT (datetime('now')),
duration_ms INTEGER
duration_ms INTEGER,
parent_id INTEGER,
fail_fast INTEGER NOT NULL DEFAULT 1
);

-- Index for IO learning: recent completions by task type.
Expand All @@ -62,3 +66,13 @@ CREATE INDEX IF NOT EXISTS idx_history_completed
-- Index for filtering history by status (e.g. listing failed tasks).
CREATE INDEX IF NOT EXISTS idx_history_status
ON task_history (status, completed_at DESC);

-- Index for looking up children of a parent task (active queue).
CREATE INDEX IF NOT EXISTS idx_tasks_parent
ON tasks (parent_id)
WHERE parent_id IS NOT NULL;

-- Index for looking up children of a parent task (history).
CREATE INDEX IF NOT EXISTS idx_task_history_parent
ON task_history (parent_id)
WHERE parent_id IS NOT NULL;
28 changes: 28 additions & 0 deletions src/backpressure.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,36 @@
//! Composable backpressure for throttling task dispatch.
//!
//! Implement [`PressureSource`] to feed external signals (API load, memory
//! pressure, queue depth, etc.) into the scheduler. Multiple sources are
//! combined via [`CompositePressure`], and [`ThrottlePolicy`] maps the
//! aggregate pressure to per-priority throttle decisions.

use crate::priority::Priority;

/// A source of pressure that signals the scheduler to slow down.
///
/// Consumers implement this trait to feed external signals (API load, memory
/// pressure, queue depth, etc.) into the scheduler's throttle decisions.
///
/// # Example
///
/// ```ignore
/// use std::sync::atomic::{AtomicU32, Ordering};
/// use taskmill::PressureSource;
///
/// struct ApiLoadPressure {
/// active_requests: AtomicU32,
/// max_requests: u32,
/// }
///
/// impl PressureSource for ApiLoadPressure {
/// fn pressure(&self) -> f32 {
/// let current = self.active_requests.load(Ordering::Relaxed);
/// (current as f32 / self.max_requests as f32).min(1.0)
/// }
/// fn name(&self) -> &str { "api-load" }
/// }
/// ```
pub trait PressureSource: Send + Sync + 'static {
/// Current pressure level between 0.0 (idle) and 1.0 (saturated).
fn pressure(&self) -> f32;
Expand Down Expand Up @@ -68,6 +95,7 @@ pub struct CompositePressure {
}

impl CompositePressure {
/// Create an empty composite with no pressure sources.
pub fn new() -> Self {
Self {
sources: Vec::new(),
Expand Down
71 changes: 64 additions & 7 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,67 @@
//! - Emits lifecycle events including progress for UI integration (via broadcast channel)
//! - Supports graceful shutdown with configurable drain timeout
//!
//! # Quick start
//!
//! ```no_run
//! use std::sync::Arc;
//! use taskmill::{
//! Scheduler, TaskExecutor, TaskContext, TaskResult, TaskError,
//! TypedTask, Priority,
//! };
//! use serde::{Serialize, Deserialize};
//! use tokio_util::sync::CancellationToken;
//!
//! // 1. Define a task payload.
//! #[derive(Serialize, Deserialize)]
//! struct Thumbnail { path: String, size: u32 }
//!
//! impl TypedTask for Thumbnail {
//! const TASK_TYPE: &'static str = "thumbnail";
//! fn expected_read_bytes(&self) -> i64 { 4_096 }
//! fn expected_write_bytes(&self) -> i64 { 1_024 }
//! }
//!
//! // 2. Implement the executor.
//! struct ThumbnailExecutor;
//!
//! impl TaskExecutor for ThumbnailExecutor {
//! async fn execute<'a>(
//! &'a self, ctx: &'a TaskContext,
//! ) -> Result<TaskResult, TaskError> {
//! let thumb: Thumbnail = ctx.deserialize_typed().unwrap().unwrap();
//! ctx.progress.report(0.5, Some("resizing".into()));
//! // ... do work, check ctx.token.is_cancelled() ...
//! Ok(TaskResult { actual_read_bytes: 4_096, actual_write_bytes: 1_024 })
//! }
//! }
//!
//! # async fn run() -> Result<(), Box<dyn std::error::Error>> {
//! // 3. Build and run the scheduler.
//! let scheduler = Scheduler::builder()
//! .store_path("tasks.db")
//! .typed_executor::<Thumbnail, _>(Arc::new(ThumbnailExecutor))
//! .max_concurrency(4)
//! .with_resource_monitoring()
//! .build()
//! .await?;
//!
//! // 4. Submit work.
//! let task = Thumbnail { path: "/photos/a.jpg".into(), size: 256 };
//! scheduler.submit_typed(&task).await?;
//!
//! // 5. Run until cancelled.
//! let token = CancellationToken::new();
//! scheduler.run(token).await;
//! # Ok(())
//! # }
//! ```
//!
//! # Feature flags
//!
//! - **`sysinfo-monitor`** (default): Enables the built-in `SysinfoSampler` for
//! cross-platform CPU and disk IO monitoring. Disable for mobile targets or
//! when providing a custom `ResourceSampler`.
//! - **`sysinfo-monitor`** (default): Enables the built-in [`SysinfoSampler`](resource::sysinfo_monitor::SysinfoSampler)
//! for cross-platform CPU and disk IO monitoring. Disable for mobile targets or
//! when providing a custom [`ResourceSampler`].

pub mod backpressure;
pub mod priority;
Expand All @@ -32,17 +88,18 @@ pub mod task;
// Convenience re-exports.
pub use backpressure::{CompositePressure, PressureSource, ThrottlePolicy};
pub use priority::Priority;
pub use registry::{StateMap, TaskContext, TaskExecutor};
pub use resource::sampler::{SamplerConfig, SmoothedReader};
pub use registry::{TaskContext, TaskExecutor};
pub use resource::sampler::SamplerConfig;
pub use resource::{ResourceReader, ResourceSampler, ResourceSnapshot};
pub use scheduler::{
EstimatedProgress, ProgressReporter, Scheduler, SchedulerBuilder, SchedulerConfig,
SchedulerEvent, SchedulerSnapshot, ShutdownMode,
};
pub use store::{RetentionPolicy, StoreConfig, StoreError, TaskStore};
pub use task::{
generate_dedup_key, HistoryStatus, SubmitOutcome, TaskError, TaskHistoryRecord, TaskLookup,
TaskRecord, TaskResult, TaskStatus, TaskSubmission, TypeStats, TypedTask,
generate_dedup_key, HistoryStatus, ParentResolution, SubmitOutcome, TaskError,
TaskHistoryRecord, TaskLookup, TaskRecord, TaskResult, TaskStatus, TaskSubmission, TypeStats,
TypedTask,
};

#[cfg(feature = "sysinfo-monitor")]
Expand Down
8 changes: 8 additions & 0 deletions src/priority.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
//! Priority levels for task scheduling.
//!
//! [`Priority`] is a `u8` newtype where lower values mean higher priority.
//! Named constants ([`REALTIME`](Priority::REALTIME), [`HIGH`](Priority::HIGH),
//! [`NORMAL`](Priority::NORMAL), [`BACKGROUND`](Priority::BACKGROUND),
//! [`IDLE`](Priority::IDLE)) cover common tiers, and any value 0–255 is valid
//! for fine-grained control.

use std::cmp::Ordering;
use std::fmt;

Expand Down
124 changes: 120 additions & 4 deletions src/registry.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
//! Executor registration, shared state, and the [`TaskContext`] passed to each task.
//!
//! Register one [`TaskExecutor`] per task type with the scheduler. At dispatch
//! time the scheduler looks up the executor by name and calls
//! [`execute`](TaskExecutor::execute) with a [`TaskContext`] containing the
//! persisted record, a cancellation token, a progress reporter, and any
//! shared application state.

use std::any::{Any, TypeId};
use std::collections::HashMap;
use std::future::Future;
Expand All @@ -7,7 +15,8 @@ use tokio::sync::RwLock;
use tokio_util::sync::CancellationToken;

use crate::scheduler::ProgressReporter;
use crate::task::{TaskError, TaskRecord, TaskResult, TypedTask};
use crate::store::{StoreError, TaskStore};
use crate::task::{SubmitOutcome, TaskError, TaskRecord, TaskResult, TaskSubmission, TypedTask};

// ── State Map ────────────────────────────────────────────────────────

Expand All @@ -21,7 +30,7 @@ use crate::task::{TaskError, TaskRecord, TaskResult, TypedTask};
/// so that library consumers (e.g. shoebox inside a Tauri app) can inject
/// state after the scheduler has been constructed by the parent.
#[derive(Default)]
pub struct StateMap {
pub(crate) struct StateMap {
inner: RwLock<HashMap<TypeId, Arc<dyn Any + Send + Sync>>>,
}

Expand Down Expand Up @@ -69,6 +78,56 @@ impl StateMap {
}
}

// ── Child Spawner ────────────────────────────────────────────────

/// Handle for spawning child tasks from within an executor.
///
/// Wraps a [`TaskStore`] reference and the parent task ID so that
/// child submissions automatically inherit the parent relationship.
/// Holds a `Notify` reference to wake the scheduler run loop after
/// spawning, so children are dispatched promptly.
#[derive(Clone)]
pub(crate) struct ChildSpawner {
store: TaskStore,
parent_id: i64,
work_notify: Arc<tokio::sync::Notify>,
}

impl ChildSpawner {
pub(crate) fn new(
store: TaskStore,
parent_id: i64,
work_notify: Arc<tokio::sync::Notify>,
) -> Self {
Self {
store,
parent_id,
work_notify,
}
}

/// Submit a single child task. Sets `parent_id` automatically.
pub async fn spawn(&self, mut sub: TaskSubmission) -> Result<SubmitOutcome, StoreError> {
sub.parent_id = Some(self.parent_id);
let outcome = self.store.submit(&sub).await?;
self.work_notify.notify_one();
Ok(outcome)
}

/// Submit multiple child tasks in a single transaction.
pub async fn spawn_batch(
&self,
submissions: &mut [TaskSubmission],
) -> Result<Vec<SubmitOutcome>, StoreError> {
for sub in submissions.iter_mut() {
sub.parent_id = Some(self.parent_id);
}
let outcomes = self.store.submit_batch(submissions).await?;
self.work_notify.notify_one();
Ok(outcomes)
}
}

// ── Task Context ─────────────────────────────────────────────────────

/// Execution context passed to a [`TaskExecutor`].
Expand All @@ -84,8 +143,10 @@ pub struct TaskContext {
pub token: CancellationToken,
/// Report progress back to the scheduler (0.0–1.0).
pub progress: ProgressReporter,
/// Shared application state set via [`SchedulerBuilder::app_state`].
/// Shared application state set via [`SchedulerBuilder::app_state`](crate::SchedulerBuilder::app_state).
pub(crate) app_state: StateSnapshot,
/// Spawner for creating child tasks. `None` for non-hierarchical contexts.
pub(crate) child_spawner: Option<ChildSpawner>,
}

impl TaskContext {
Expand All @@ -98,7 +159,8 @@ impl TaskContext {
}

/// Retrieve shared application state registered via
/// [`SchedulerBuilder::app_state`] or [`Scheduler::register_state`].
/// [`SchedulerBuilder::app_state`](crate::SchedulerBuilder::app_state) or
/// [`Scheduler::register_state`](crate::Scheduler::register_state).
///
/// Returns `None` if the type was never registered. Multiple types can
/// coexist — each is keyed by its concrete `TypeId`.
Expand All @@ -115,6 +177,31 @@ impl TaskContext {
pub fn state<T: Send + Sync + 'static>(&self) -> Option<&T> {
self.app_state.get::<T>()
}

/// Spawn a child task that will be tracked under this task as parent.
///
/// The child's `parent_id` is set automatically. Returns the submit
/// outcome, or `None` if this context was not created with hierarchy
/// support (should not happen in normal scheduler operation).
pub async fn spawn_child(&self, sub: TaskSubmission) -> Result<SubmitOutcome, StoreError> {
let spawner = self
.child_spawner
.as_ref()
.expect("spawn_child called on a context without ChildSpawner");
spawner.spawn(sub).await
}

/// Spawn multiple child tasks in a single transaction.
pub async fn spawn_children(
&self,
mut submissions: Vec<TaskSubmission>,
) -> Result<Vec<SubmitOutcome>, StoreError> {
let spawner = self
.child_spawner
.as_ref()
.expect("spawn_children called on a context without ChildSpawner");
spawner.spawn_batch(&mut submissions).await
}
}

/// Executes tasks of a registered type.
Expand Down Expand Up @@ -156,6 +243,21 @@ pub trait TaskExecutor: Send + Sync + 'static {
&'a self,
ctx: &'a TaskContext,
) -> impl Future<Output = Result<TaskResult, TaskError>> + Send + 'a;

/// Called after all children of a parent task have completed.
///
/// Only invoked for tasks that spawned children via
/// [`TaskContext::spawn_child`]. The default implementation is a no-op
/// that returns zero IO bytes.
///
/// Use this for cleanup or assembly work (e.g. calling
/// `CompleteMultipartUpload` after all parts have been uploaded).
fn finalize<'a>(
&'a self,
_ctx: &'a TaskContext,
) -> impl Future<Output = Result<TaskResult, TaskError>> + Send + 'a {
async { Ok(TaskResult::zero()) }
}
}

/// Registry mapping task type names to their executors.
Expand All @@ -176,6 +278,11 @@ pub(crate) trait ErasedExecutor: Send + Sync + 'static {
&'a self,
ctx: &'a TaskContext,
) -> std::pin::Pin<Box<dyn Future<Output = Result<TaskResult, TaskError>> + Send + 'a>>;

fn finalize_erased<'a>(
&'a self,
ctx: &'a TaskContext,
) -> std::pin::Pin<Box<dyn Future<Output = Result<TaskResult, TaskError>> + Send + 'a>>;
}

impl<T: TaskExecutor> ErasedExecutor for T {
Expand All @@ -185,9 +292,17 @@ impl<T: TaskExecutor> ErasedExecutor for T {
) -> std::pin::Pin<Box<dyn Future<Output = Result<TaskResult, TaskError>> + Send + 'a>> {
Box::pin(self.execute(ctx))
}

fn finalize_erased<'a>(
&'a self,
ctx: &'a TaskContext,
) -> std::pin::Pin<Box<dyn Future<Output = Result<TaskResult, TaskError>> + Send + 'a>> {
Box::pin(self.finalize(ctx))
}
}

impl TaskTypeRegistry {
/// Create an empty registry.
pub fn new() -> Self {
Self {
types: HashMap::new(),
Expand Down Expand Up @@ -221,6 +336,7 @@ impl TaskTypeRegistry {
self.types.len()
}

/// Returns `true` if no executors have been registered.
pub fn is_empty(&self) -> bool {
self.types.is_empty()
}
Expand Down
Loading
Loading