Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 26 additions & 4 deletions crates/workflow/src/definition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,16 +87,19 @@ impl<D: WorkflowData> StepDefinition<D> {
}
}

#[must_use]
pub fn with_retry(mut self, policy: RetryPolicy) -> Self {
self.retry_policy = Some(policy);
self
}

#[must_use]
pub fn with_timeout(mut self, timeout: Duration) -> Self {
self.timeout = Some(timeout);
self
}

#[must_use]
pub fn with_failure_action(mut self, action: FailureAction) -> Self {
self.on_failure = action;
self
Expand All @@ -105,8 +108,16 @@ impl<D: WorkflowData> StepDefinition<D> {
/// Set dependencies for this step.
/// The step will only run after ALL specified dependencies have completed successfully.
/// Empty slice means no dependencies - step can run immediately in parallel with others.
pub fn depends_on(mut self, deps: &[&str]) -> Self {
self.depends_on = deps.iter().map(|s| StepId::new(*s)).collect();
///
/// Accepts anything iterable into `&str`-like values:
/// `&["step_a", "step_b"]`, `vec![s1, s2]` (where `si: String`), etc.
#[must_use]
pub fn depends_on<I, S>(mut self, deps: I) -> Self
where
I: IntoIterator<Item = S>,
S: AsRef<str>,
{
self.depends_on = deps.into_iter().map(|s| StepId::new(s.as_ref())).collect();
self
}

Expand All @@ -124,15 +135,21 @@ impl<D: WorkflowData> StepDefinition<D> {
///
/// **Skipped dependencies**: A skipped dependency (e.g., via `run_if`) counts as
/// "completed" for dependency satisfaction purposes.
pub fn depends_on_any(mut self, deps: &[&str]) -> Self {
self.depends_on_any = deps.iter().map(|s| StepId::new(*s)).collect();
#[must_use]
pub fn depends_on_any<I, S>(mut self, deps: I) -> Self
where
I: IntoIterator<Item = S>,
S: AsRef<str>,
{
self.depends_on_any = deps.into_iter().map(|s| StepId::new(s.as_ref())).collect();
self
}

/// Set a delay before starting the step (after dependencies are satisfied).
///
/// If both `delay` and `scheduled_at` are set, the step will wait for the
/// scheduled time AND THEN wait for the delay duration (they stack).
#[must_use]
pub fn with_delay(mut self, delay: Duration) -> Self {
self.delay = Some(delay);
self
Expand All @@ -146,6 +163,7 @@ impl<D: WorkflowData> StepDefinition<D> {
///
/// If both `delay` and `scheduled_at` are set, the step will wait for the
/// scheduled time AND THEN wait for the delay duration (they stack).
#[must_use]
pub fn scheduled_at(mut self, time: DateTime<Utc>) -> Self {
self.scheduled_at = Some(time);
self
Expand All @@ -164,6 +182,7 @@ impl<D: WorkflowData> StepDefinition<D> {
///
/// **Note**: The condition closure is not serializable, so workflows with `run_if`
/// cannot be persisted and resumed from external storage.
#[must_use]
pub fn run_if<F>(mut self, condition: F) -> Self
where
F: Fn(&WorkflowContext<D>) -> bool + Send + Sync + 'static,
Expand Down Expand Up @@ -216,16 +235,19 @@ impl<D: WorkflowData> WorkflowDefinition<D> {
}
}

#[must_use]
pub fn add_step(mut self, step: StepDefinition<D>) -> Self {
self.steps.push(step);
self
}

#[must_use]
pub fn with_default_retry(mut self, policy: RetryPolicy) -> Self {
self.default_retry_policy = policy;
self
}

#[must_use]
pub fn with_default_timeout(mut self, timeout: Duration) -> Self {
self.default_timeout = timeout;
self
Expand Down
Loading
Loading