diff --git a/crates/workflow/src/definition.rs b/crates/workflow/src/definition.rs index 4d900b035..3ff7644c3 100644 --- a/crates/workflow/src/definition.rs +++ b/crates/workflow/src/definition.rs @@ -87,16 +87,19 @@ impl StepDefinition { } } + #[must_use] pub fn with_retry(mut self, policy: RetryPolicy) -> Self { self.retry_policy = Some(policy); self } + #[must_use] pub fn with_timeout(mut self, timeout: Duration) -> Self { self.timeout = Some(timeout); self } + #[must_use] pub fn with_failure_action(mut self, action: FailureAction) -> Self { self.on_failure = action; self @@ -105,8 +108,16 @@ impl StepDefinition { /// Set dependencies for this step. /// The step will only run after ALL specified dependencies have completed successfully. /// Empty slice means no dependencies - step can run immediately in parallel with others. - pub fn depends_on(mut self, deps: &[&str]) -> Self { - self.depends_on = deps.iter().map(|s| StepId::new(*s)).collect(); + /// + /// Accepts anything iterable into `&str`-like values: + /// `&["step_a", "step_b"]`, `vec![s1, s2]` (where `si: String`), etc. + #[must_use] + pub fn depends_on(mut self, deps: I) -> Self + where + I: IntoIterator, + S: AsRef, + { + self.depends_on = deps.into_iter().map(|s| StepId::new(s.as_ref())).collect(); self } @@ -124,8 +135,13 @@ impl StepDefinition { /// /// **Skipped dependencies**: A skipped dependency (e.g., via `run_if`) counts as /// "completed" for dependency satisfaction purposes. - pub fn depends_on_any(mut self, deps: &[&str]) -> Self { - self.depends_on_any = deps.iter().map(|s| StepId::new(*s)).collect(); + #[must_use] + pub fn depends_on_any(mut self, deps: I) -> Self + where + I: IntoIterator, + S: AsRef, + { + self.depends_on_any = deps.into_iter().map(|s| StepId::new(s.as_ref())).collect(); self } @@ -133,6 +149,7 @@ impl StepDefinition { /// /// If both `delay` and `scheduled_at` are set, the step will wait for the /// scheduled time AND THEN wait for the delay duration (they stack). + #[must_use] pub fn with_delay(mut self, delay: Duration) -> Self { self.delay = Some(delay); self @@ -146,6 +163,7 @@ impl StepDefinition { /// /// If both `delay` and `scheduled_at` are set, the step will wait for the /// scheduled time AND THEN wait for the delay duration (they stack). + #[must_use] pub fn scheduled_at(mut self, time: DateTime) -> Self { self.scheduled_at = Some(time); self @@ -164,6 +182,7 @@ impl StepDefinition { /// /// **Note**: The condition closure is not serializable, so workflows with `run_if` /// cannot be persisted and resumed from external storage. + #[must_use] pub fn run_if(mut self, condition: F) -> Self where F: Fn(&WorkflowContext) -> bool + Send + Sync + 'static, @@ -216,16 +235,19 @@ impl WorkflowDefinition { } } + #[must_use] pub fn add_step(mut self, step: StepDefinition) -> Self { self.steps.push(step); self } + #[must_use] pub fn with_default_retry(mut self, policy: RetryPolicy) -> Self { self.default_retry_policy = policy; self } + #[must_use] pub fn with_default_timeout(mut self, timeout: Duration) -> Self { self.default_timeout = timeout; self diff --git a/crates/workflow/src/engine.rs b/crates/workflow/src/engine.rs index 763858695..e13742739 100644 --- a/crates/workflow/src/engine.rs +++ b/crates/workflow/src/engine.rs @@ -18,7 +18,7 @@ use backoff::{backoff::Backoff, ExponentialBackoffBuilder}; use chrono::Utc; use parking_lot::RwLock; use tokio::{ - sync::{mpsc, watch}, + sync::{mpsc, watch, Notify}, time::timeout, }; @@ -29,6 +29,15 @@ use crate::{ types::*, }; +/// Terminal outcome of a workflow run, used by [`WorkflowEngine::finalize`] +/// to derive both the persisted [`WorkflowStatus`] and the matching +/// [`WorkflowEvent`] in one step. +enum WorkflowOutcome { + Completed { duration: Duration }, + Failed { failed_step: StepId, error: String }, + Cancelled, +} + #[derive(Default)] struct StepTracker { completed: HashSet, @@ -187,6 +196,10 @@ pub struct WorkflowEngine = InMemoryStore> shutdown_tx: Arc>, /// Count of active workflow executions active_workflows: Arc, + /// Per-instance completion notifiers. Inserted by `start_workflow`, + /// fired and removed by `finalize`. Lets `wait_for_completion` + /// block on a `Notify` instead of polling the state store. + completion_notifiers: Arc>>>, _phantom: PhantomData, } @@ -206,6 +219,7 @@ impl + 'static> WorkflowEngine { event_bus: Arc::new(EventBus::new()), shutdown_tx: Arc::new(shutdown_tx), active_workflows: Arc::new(AtomicUsize::new(0)), + completion_notifiers: Arc::new(RwLock::new(HashMap::new())), _phantom: PhantomData, } } @@ -447,6 +461,13 @@ impl + 'static> WorkflowEngine { self.state_store.save(state).await?; + // Register the completion slot before publishing `WorkflowStarted` + // so a `wait_for_completion` call that races the spawned task + // always finds something to subscribe to. `finalize` removes it. + self.completion_notifiers + .write() + .insert(instance_id, Arc::new(Notify::new())); + self.event_bus .publish(WorkflowEvent::WorkflowStarted { instance_id, @@ -498,6 +519,9 @@ impl + 'static> WorkflowEngine { loop { if self.state_store.is_cancelled(instance_id).await? { + // The status is already Cancelled (set by `cancel_workflow`); + // we just need to fan the event out and exit. Use the + // event-bus directly to avoid a redundant state write. self.event_bus .publish(WorkflowEvent::WorkflowCancelled { instance_id }) .await; @@ -568,16 +592,13 @@ impl + 'static> WorkflowEngine { t.clear_waiting(idx); } - // Collect steps ready to launch, deduplicating indices. - // A step with depends_on_any([A, B]) appears in pending_check once - // per completed dependency, but must only launch once. - let mut seen = HashSet::new(); - let mut ready: Vec = Vec::new(); - for idx in newly_ready_from_wait { - if seen.insert(idx) { - ready.push(idx); - } - } + // Wait-ready indices come from the `waiting_until` HashMap + // and are unique by construction. Dedup is only needed for + // `deps_ready_indices`, where a step with + // `depends_on_any([A, B])` is pushed onto `pending_check` + // once per completed dependency. + let mut ready: Vec = newly_ready_from_wait; + let mut seen: HashSet = ready.iter().copied().collect(); for idx in deps_ready_indices { let step = &definition.steps[idx]; @@ -585,7 +606,6 @@ impl + 'static> WorkflowEngine { if let Some(duration) = wait_duration { if duration > Duration::ZERO { - // Step needs to wait - add to waiting_until let ready_at = now + duration; tracing::debug!( step_id = %step.id, @@ -622,26 +642,20 @@ impl + 'static> WorkflowEngine { && pending_check.is_empty() { let failed_step = tracker.read().failed.iter().next().cloned(); - // Use &'static str to avoid allocation in common error paths - let error_message: &'static str = if failed_step.is_some() { + let error = if failed_step.is_some() { "Workflow failed due to step dependency failure" } else { "Workflow deadlocked: no steps ready and none running" }; - - self.state_store - .update(instance_id, |s| { - s.status = WorkflowStatus::Failed; - }) - .await?; - self.event_bus - .publish(WorkflowEvent::WorkflowFailed { - instance_id, + self.finalize( + instance_id, + WorkflowOutcome::Failed { failed_step: failed_step .unwrap_or_else(|| StepId::new("internal_scheduler")), - error: error_message.to_string(), - }) - .await; + error: error.to_string(), + }, + ) + .await?; return Ok(()); } @@ -867,34 +881,16 @@ impl + 'static> WorkflowEngine { t.failed.iter().next().cloned() }; - if let Some(ref step) = failed_step { - self.state_store - .update(instance_id, |s| { - s.status = WorkflowStatus::Failed; - }) - .await?; - self.event_bus - .publish(WorkflowEvent::WorkflowFailed { - instance_id, - failed_step: step.clone(), - error: "One or more steps failed".into(), - }) - .await; - } else { - self.state_store - .update(instance_id, |s| { - s.status = WorkflowStatus::Completed; - }) - .await?; - - let duration = start_time.elapsed(); - self.event_bus - .publish(WorkflowEvent::WorkflowCompleted { - instance_id, - duration, - }) - .await; - } + let outcome = match failed_step { + Some(failed_step) => WorkflowOutcome::Failed { + failed_step, + error: "One or more steps failed".into(), + }, + None => WorkflowOutcome::Completed { + duration: start_time.elapsed(), + }, + }; + self.finalize(instance_id, outcome).await?; Ok(()) } @@ -1006,7 +1002,25 @@ impl + 'static> WorkflowEngine { _ => ("Step failed".to_string(), false), }; - let will_retry = should_retry && attempt < max_attempts; + // Resolve whether we'll retry AND the delay together. + // `next_backoff()` returns `None` when the policy is + // exhausted — honor that instead of silently swapping + // in a 1s default, which would mask a misconfigured + // policy and turn "stop" into "retry forever". + let retry_delay = if should_retry && attempt < max_attempts { + let next = backoff.next_backoff(); + if next.is_none() { + tracing::warn!( + step_id = %step.id, + attempt, + "Backoff exhausted; falling through to on_failure" + ); + } + next + } else { + None + }; + let will_retry = retry_delay.is_some(); // Update step state self.state_store @@ -1035,12 +1049,7 @@ impl + 'static> WorkflowEngine { }) .await; - if will_retry { - // Calculate backoff delay - let delay = backoff - .next_backoff() - .unwrap_or_else(|| Duration::from_secs(1)); - + if let Some(delay) = retry_delay { self.event_bus .publish(WorkflowEvent::StepRetrying { instance_id, @@ -1090,16 +1099,49 @@ impl + 'static> WorkflowEngine { /// Cancel a running workflow pub async fn cancel_workflow(&self, instance_id: WorkflowInstanceId) -> WorkflowResult<()> { + self.finalize(instance_id, WorkflowOutcome::Cancelled).await + } + + /// Persist a terminal status, emit the matching workflow event, and + /// wake any `wait_for_completion` callers. + /// + /// Used by every termination path (deadlock, step failure, normal + /// completion, explicit cancel) so the "update state + publish event + + /// notify" trio stays in lockstep — historically these were open-coded + /// in three different sites and could drift. + async fn finalize( + &self, + instance_id: WorkflowInstanceId, + outcome: WorkflowOutcome, + ) -> WorkflowResult<()> { + let new_status = match outcome { + WorkflowOutcome::Completed { .. } => WorkflowStatus::Completed, + WorkflowOutcome::Failed { .. } => WorkflowStatus::Failed, + WorkflowOutcome::Cancelled => WorkflowStatus::Cancelled, + }; self.state_store .update(instance_id, |s| { - s.status = WorkflowStatus::Cancelled; + s.status = new_status; }) .await?; - - self.event_bus - .publish(WorkflowEvent::WorkflowCancelled { instance_id }) - .await; - + let event = match outcome { + WorkflowOutcome::Completed { duration } => WorkflowEvent::WorkflowCompleted { + instance_id, + duration, + }, + WorkflowOutcome::Failed { failed_step, error } => WorkflowEvent::WorkflowFailed { + instance_id, + failed_step, + error, + }, + WorkflowOutcome::Cancelled => WorkflowEvent::WorkflowCancelled { instance_id }, + }; + self.event_bus.publish(event).await; + // Wake any waiters and drop the slot. Callers arriving after this + // point fall back to reading the state store directly. + if let Some(notify) = self.completion_notifiers.write().remove(&instance_id) { + notify.notify_waiters(); + } Ok(()) } @@ -1111,60 +1153,96 @@ impl + 'static> WorkflowEngine { self.state_store.load(instance_id).await } - /// Wait for a workflow to complete with adaptive polling + /// Wait for a workflow to reach a terminal state. + /// + /// Subscribes to the per-instance completion notifier registered by + /// `start_workflow`; the spawned execution task fires it via + /// [`finalize`](Self::finalize) when the workflow ends. If the slot + /// is already gone (workflow finished before this call), the current + /// state is read directly. /// - /// Returns Ok with success message on completion, Err on failure/timeout/cancellation. - /// Automatically cleans up terminal workflow states. + /// Returns `Ok` with a success message on completion, `Err` on + /// failure/timeout/cancellation. Automatically cleans up terminal + /// workflow states. pub async fn wait_for_completion( &self, instance_id: WorkflowInstanceId, label: &str, timeout_duration: Duration, ) -> Result { - let start = std::time::Instant::now(); - let mut poll_interval = Duration::from_millis(100); - let max_poll_interval = Duration::from_millis(2000); - let poll_backoff = Duration::from_millis(200); - - loop { - if start.elapsed() > timeout_duration { - return Err(format!( - "Workflow timeout after {}s for {}", - timeout_duration.as_secs(), - label - )); - } + // Snapshot the notifier handle (if any) and prepare the wait + // future *before* checking the state, so we can't miss a + // `notify_waiters` that fires between the state read and the + // await. + let notifier = self.completion_notifiers.read().get(&instance_id).cloned(); + let waiter = notifier.as_ref().map(|n| n.notified()); + + // First state read: handles "already terminal" and "no notifier" + // (workflow finalised before we got here) without ever sleeping. + let state = self + .get_status(instance_id) + .await + .map_err(|e| format!("Failed to get workflow status: {e:?}"))?; + if let Some(result) = Self::result_from_state(&state, label) { + self.state_store.cleanup_if_terminal(instance_id).await; + return result; + } - let state = self - .get_status(instance_id) - .await - .map_err(|e| format!("Failed to get workflow status: {e:?}"))?; + let Some(waiter) = waiter else { + // Notifier slot is gone but state isn't terminal yet — this + // shouldn't happen because `finalize` updates state before + // removing the slot. Fall back to a single timed poll. + return Err(format!( + "Workflow {label} has no completion notifier and is still {:?}", + state.status + )); + }; - let result = match state.status { - WorkflowStatus::Completed => { - Ok(format!("{label} completed successfully via workflow")) - } - WorkflowStatus::Failed => { - let current_step = state.current_step.as_ref(); - let step_name = current_step - .map(|s| s.to_string()) - .unwrap_or_else(|| "unknown".to_string()); - let error_msg = current_step - .and_then(|step_id| state.step_states.get(step_id)) - .and_then(|s| s.last_error.as_deref()) - .unwrap_or("Unknown error"); - Err(format!("Workflow failed at step {step_name}: {error_msg}")) - } - WorkflowStatus::Cancelled => Err(format!("Workflow cancelled for {label}")), - WorkflowStatus::Pending | WorkflowStatus::Paused | WorkflowStatus::Running => { - tokio::time::sleep(poll_interval).await; - poll_interval = (poll_interval + poll_backoff).min(max_poll_interval); - continue; - } - }; + match timeout(timeout_duration, waiter).await { + Ok(()) => { + let state = self + .get_status(instance_id) + .await + .map_err(|e| format!("Failed to get workflow status: {e:?}"))?; + let result = Self::result_from_state(&state, label).unwrap_or_else(|| { + Err(format!( + "Workflow {label} was notified but state is still {:?}", + state.status + )) + }); + self.state_store.cleanup_if_terminal(instance_id).await; + result + } + Err(_) => Err(format!( + "Workflow timeout after {}s for {}", + timeout_duration.as_secs(), + label + )), + } + } - self.state_store.cleanup_if_terminal(instance_id).await; - return result; + /// Map a workflow state to a `wait_for_completion` result, or `None` + /// if the workflow is still in a non-terminal status. + fn result_from_state(state: &WorkflowState, label: &str) -> Option> { + match state.status { + WorkflowStatus::Completed => { + Some(Ok(format!("{label} completed successfully via workflow"))) + } + WorkflowStatus::Failed => { + let current_step = state.current_step.as_ref(); + let step_name = current_step + .map(|s| s.to_string()) + .unwrap_or_else(|| "unknown".to_string()); + let error_msg = current_step + .and_then(|step_id| state.step_states.get(step_id)) + .and_then(|s| s.last_error.as_deref()) + .unwrap_or("Unknown error"); + Some(Err(format!( + "Workflow failed at step {step_name}: {error_msg}" + ))) + } + WorkflowStatus::Cancelled => Some(Err(format!("Workflow cancelled for {label}"))), + WorkflowStatus::Pending | WorkflowStatus::Paused | WorkflowStatus::Running => None, } } @@ -1176,6 +1254,7 @@ impl + 'static> WorkflowEngine { event_bus: Arc::clone(&self.event_bus), shutdown_tx: Arc::clone(&self.shutdown_tx), active_workflows: Arc::clone(&self.active_workflows), + completion_notifiers: Arc::clone(&self.completion_notifiers), _phantom: PhantomData, } } diff --git a/crates/workflow/src/event.rs b/crates/workflow/src/event.rs index 89dffdc85..d3bb19928 100644 --- a/crates/workflow/src/event.rs +++ b/crates/workflow/src/event.rs @@ -116,29 +116,8 @@ impl EventBus { /// Subscriber failures (timeout or panic) are logged but don't affect /// other subscribers or the caller. pub async fn publish(&self, event: WorkflowEvent) { - let subscribers: Vec<_> = self.subscribers.read().await.iter().cloned().collect(); - let timeout = self.subscriber_timeout; - - for (idx, subscriber) in subscribers.into_iter().enumerate() { - let event = event.clone(); - #[expect( - clippy::disallowed_methods, - reason = "fire-and-forget event notification; subscriber timeout and failure are logged internally" - )] - tokio::spawn(async move { - let result = tokio::time::timeout(timeout, subscriber.on_event(&event)).await; - match result { - Ok(()) => {} - Err(_) => { - warn!( - subscriber_index = idx, - timeout_secs = timeout.as_secs(), - "Event subscriber timed out" - ); - } - } - }); - } + // Drop the handles — caller doesn't wait for subscribers. + let _ = self.spawn_subscriber_tasks(event).await; } /// Publish an event and wait for all subscribers to complete @@ -147,15 +126,30 @@ impl EventBus { /// (or timeout). Use this when you need to ensure all subscribers /// have processed the event before continuing. pub async fn publish_and_wait(&self, event: WorkflowEvent) { + let handles = self.spawn_subscriber_tasks(event).await; + for handle in handles { + let _ = handle.await; + } + } + + /// Spawn one task per subscriber to deliver `event`. Returns join + /// handles so callers can choose to drop (fire-and-forget) or await. + async fn spawn_subscriber_tasks( + &self, + event: WorkflowEvent, + ) -> Vec> { let subscribers: Vec<_> = self.subscribers.read().await.iter().cloned().collect(); let timeout = self.subscriber_timeout; - let handles: Vec<_> = subscribers + subscribers .into_iter() .enumerate() .map(|(idx, subscriber)| { let event = event.clone(); - #[expect(clippy::disallowed_methods, reason = "parallel event fan-out; handles are collected and awaited by publish_and_wait")] + #[expect( + clippy::disallowed_methods, + reason = "parallel event fan-out; caller decides whether to await the handles" + )] tokio::spawn(async move { let result = tokio::time::timeout(timeout, subscriber.on_event(&event)).await; if result.is_err() { @@ -167,12 +161,7 @@ impl EventBus { } }) }) - .collect(); - - // Wait for all spawned tasks, ignoring individual failures (panics) - for handle in handles { - let _ = handle.await; - } + .collect() } } diff --git a/crates/workflow/src/state.rs b/crates/workflow/src/state.rs index c975a732b..dac19bc4b 100644 --- a/crates/workflow/src/state.rs +++ b/crates/workflow/src/state.rs @@ -150,7 +150,6 @@ impl StateStore for InMemoryStore { let initial_count = states.len(); states.retain(|_, state| { - // Keep active workflows if matches!( state.status, WorkflowStatus::Running | WorkflowStatus::Pending | WorkflowStatus::Paused @@ -158,11 +157,14 @@ impl StateStore for InMemoryStore { return true; } - // For terminal workflows, check age + // A future `updated_at` (clock skew, manual fiddling) returns a + // negative duration that fails `to_std()`. Treat that as + // "max age" so the workflow is still eligible for cleanup + // instead of being kept forever. let age = now .signed_duration_since(state.updated_at) .to_std() - .unwrap_or_default(); + .unwrap_or(Duration::MAX); age < ttl }); diff --git a/crates/workflow/src/types.rs b/crates/workflow/src/types.rs index 4c324b14a..67bbd640e 100644 --- a/crates/workflow/src/types.rs +++ b/crates/workflow/src/types.rs @@ -79,6 +79,18 @@ impl StepId { } } +impl From<&str> for StepId { + fn from(s: &str) -> Self { + Self(s.to_string()) + } +} + +impl From for StepId { + fn from(s: String) -> Self { + Self(s) + } +} + impl fmt::Display for StepId { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "{}", self.0) diff --git a/crates/workflow/tests/workflow_test.rs b/crates/workflow/tests/workflow_test.rs index ec16ed70e..24b12dc0a 100644 --- a/crates/workflow/tests/workflow_test.rs +++ b/crates/workflow/tests/workflow_test.rs @@ -528,7 +528,7 @@ async fn test_dag_with_dependencies() { end_times: Arc::clone(&end_times), }), ) - .depends_on(&["step_a", "step_b"]), + .depends_on(["step_a", "step_b"]), ); let workflow_id = workflow.id.clone(); @@ -617,7 +617,7 @@ async fn test_dag_dependency_failure_blocks_dependents() { counter: Arc::clone(&b_executed), }), ) - .depends_on(&["step_a"]), + .depends_on(["step_a"]), ); let workflow_id = workflow.id.clone(); @@ -650,15 +650,15 @@ fn test_dag_validation_cycle_detection() { let mut workflow = WorkflowDefinition::new("cyclic_workflow", "Cyclic Test") .add_step( StepDefinition::new("step_a", "Step A", Arc::new(AlwaysSucceedStep)) - .depends_on(&["step_c"]), + .depends_on(["step_c"]), ) .add_step( StepDefinition::new("step_b", "Step B", Arc::new(AlwaysSucceedStep)) - .depends_on(&["step_a"]), + .depends_on(["step_a"]), ) .add_step( StepDefinition::new("step_c", "Step C", Arc::new(AlwaysSucceedStep)) - .depends_on(&["step_b"]), + .depends_on(["step_b"]), ); let result = workflow.validate(); @@ -680,7 +680,7 @@ fn test_dag_validation_missing_dependency() { )) .add_step( StepDefinition::new("step_b", "Step B", Arc::new(AlwaysSucceedStep)) - .depends_on(&["nonexistent_step"]), + .depends_on(["nonexistent_step"]), ); let result = workflow.validate(); @@ -707,11 +707,11 @@ fn test_dag_validation_valid_workflow() { )) .add_step( StepDefinition::new("step_c", "Step C", Arc::new(AlwaysSucceedStep)) - .depends_on(&["step_a", "step_b"]), + .depends_on(["step_a", "step_b"]), ) .add_step( StepDefinition::new("step_d", "Step D", Arc::new(AlwaysSucceedStep)) - .depends_on(&["step_c"]), + .depends_on(["step_c"]), ); let result = workflow.validate(); @@ -968,7 +968,7 @@ async fn test_run_if_context_based() { "Conditional Step", Arc::new(TrackingStep { counter: executed }), ) - .depends_on(&["set_key_step"]) + .depends_on(["set_key_step"]) .run_if(|ctx| ctx.data.test_key.as_deref() == Some("execute_next")), ); @@ -1037,7 +1037,7 @@ async fn test_depends_on_any() { end_times: Arc::clone(&end_times), }), ) - .depends_on_any(&["step_a", "step_b"]), + .depends_on_any(["step_a", "step_b"]), ); let workflow_id = workflow.id.clone(); @@ -1126,8 +1126,8 @@ async fn test_depends_on_any_combined_with_depends_on() { end_times: Arc::clone(&end_times), }), ) - .depends_on(&["step_a"]) // Must wait for A - .depends_on_any(&["step_b", "step_d"]), // AND any of B or D + .depends_on(["step_a"]) // Must wait for A + .depends_on_any(["step_b", "step_d"]), // AND any of B or D ); let workflow_id = workflow.id.clone(); @@ -1180,7 +1180,7 @@ fn test_dag_validation_depends_on_any_missing() { )) .add_step( StepDefinition::new("step_b", "Step B", Arc::new(AlwaysSucceedStep)) - .depends_on_any(&["nonexistent_step"]), + .depends_on_any(["nonexistent_step"]), ); let result = workflow.validate(); @@ -1237,7 +1237,7 @@ async fn test_depends_on_any_all_fail() { counter: c_executed, }), ) - .depends_on_any(&["step_a", "step_b"]), + .depends_on_any(["step_a", "step_b"]), ); let workflow_id = workflow.id.clone(); @@ -1314,7 +1314,7 @@ async fn test_depends_on_any_one_fails_one_succeeds() { counter: c_executed, }), ) - .depends_on_any(&["step_a", "step_b"]), + .depends_on_any(["step_a", "step_b"]), ); let workflow_id = workflow.id.clone(); diff --git a/model_gateway/src/workflow/mcp_registration.rs b/model_gateway/src/workflow/mcp_registration.rs index eccd54b12..3ab783579 100644 --- a/model_gateway/src/workflow/mcp_registration.rs +++ b/model_gateway/src/workflow/mcp_registration.rs @@ -191,7 +191,7 @@ pub fn create_mcp_registration_workflow() -> WorkflowDefinition ) .with_timeout(Duration::from_secs(1)) .with_failure_action(FailureAction::FailWorkflow) - .depends_on(&["connect_mcp_server"]), + .depends_on(["connect_mcp_server"]), ) } diff --git a/model_gateway/src/workflow/steps/local/mod.rs b/model_gateway/src/workflow/steps/local/mod.rs index e0f1a0623..ebeb15487 100644 --- a/model_gateway/src/workflow/steps/local/mod.rs +++ b/model_gateway/src/workflow/steps/local/mod.rs @@ -108,7 +108,7 @@ pub fn create_worker_removal_workflow() -> WorkflowDefinition WorkflowDefinition WorkflowDefinition WorkflowDefinition WorkflowDefinition WorkflowDefinition WorkflowDefinition WorkflowDefinition WorkflowDefinition WorkflowDefinition WorkflowDefinition WorkflowDefinition