refactor(workflow): cleanup pass on the wfaas crate#1500
Conversation
A multi-agent review of `crates/workflow/` surfaced one correctness-relevant change, two bug fixes, two duplications worth collapsing, and a handful of small polish items. **`wait_for_completion` is now event-driven.** It used to poll the state store on a 100ms→2s adaptive cycle, taking a write/read lock and cloning the full `WorkflowState<D>` (including the workflow context) on every iteration just to learn that the workflow had ended. The engine now registers a `tokio::sync::Notify` per instance in `start_workflow` and fires it from a new `finalize` helper. `wait_for_completion` snapshots the notifier, reads state once (covers the "already terminal" case race-free), then awaits the notifier under the user's timeout. Removes per-request polling latency (was up to 2s tail) and ~10 deep state clones per worker registration / removal / update. **`finalize` collapses three duplicated terminal sites.** The deadlock branch, the post-loop completion branch, and the explicit `cancel_workflow` path were each open-coding the `state_store.update + event_bus.publish` pair. They now share `finalize(instance_id, WorkflowOutcome)` so the trio (state, event, notifier) stays in lockstep. **`EventBus::publish` and `publish_and_wait`** differed only in whether they awaited the spawned subscriber tasks. Extracted a single `spawn_subscriber_tasks` helper; `publish` drops the handles, `publish_and_wait` awaits them. ~30 lines saved, no behaviour change. **Bugs fixed:** - `cleanup_old_workflows` used `unwrap_or_default()` on the `signed_duration_since().to_std()` result. A `updated_at` in the future (clock skew, manual fiddling) returned `Duration::ZERO` and the workflow was kept forever. Now treats those as `Duration::MAX` so they stay eligible for TTL eviction. - `execute_step_with_retry` masked an exhausted `backoff::next_backoff()` with `unwrap_or_else(|| Duration::from_secs(1))`, silently turning "stop retrying" into "retry forever at 1s". Now folds backoff exhaustion into the retry decision and falls through to the `on_failure` hook. **Polish:** - `#[must_use]` on every builder method on `StepDefinition` and `WorkflowDefinition` so accidental drops like `step.with_timeout(...);` are flagged. - `depends_on` / `depends_on_any` now accept `IntoIterator<Item: AsRef<str>>` instead of `&[&str]`, removing the call-site `&` and accepting any string-like iterable. `From<&str>` and `From<String>` impls on `StepId` for callers who already have one. - Dropped a dead `seen` HashSet seeding loop over `newly_ready_from_wait` — those indices come from a HashMap and are unique by construction; dedup is only required for the `pending_check`-derived `deps_ready_indices`. Net `+292 / -188` across 11 files; the `wait_for_completion` refactor accounts for most of the new lines (helper + better doc comments). All 3388 workspace tests pass. Signed-off-by: Simo Lin <25425177+slin1237@users.noreply.github.com>
📝 WalkthroughWalkthroughThe workflow engine API is generalized to accept flexible iterables for step dependencies, completion tracking is refactored to use per-instance notifiers, termination logic is unified through a finalize method, retry behavior is corrected to respect exhausted backoff, and all test and consumer call sites are migrated to the new syntax. ChangesWorkflow Engine Enhancement & API Migration
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Possibly related PRs
Suggested labels
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Comment |
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: ec0607c8fc
ℹ️ About Codex in GitHub
Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".
| // The status is already Cancelled (set by `cancel_workflow`); | ||
| // we just need to fan the event out and exit. Use the | ||
| // event-bus directly to avoid a redundant state write. | ||
| self.event_bus |
There was a problem hiding this comment.
Notify waiters on externally cancelled workflows
When a workflow is marked Cancelled by a shared/custom StateStore (or any path other than this engine's cancel_workflow) after wait_for_completion has already read the non-terminal state, this branch publishes the cancellation event and returns without going through finalize, so the per-instance Notify is never fired or removed. The old polling waiter would observe the cancelled state on its next poll, but the new event-driven waiter will sleep until its timeout even though the workflow has already terminated.
Useful? React with 👍 / 👎.
There was a problem hiding this comment.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
crates/workflow/src/engine.rs (1)
521-528:⚠️ Potential issue | 🟠 Major | ⚡ Quick winDuplicate
WorkflowCancelledevent published.When
cancel_workflowis called, it invokesfinalize(..., Cancelled)which already publishesWorkflowCancelled(line 1137-1139). Whenexecute_workflowsubsequently detects cancellation viais_cancelled()and reaches this branch, it publishes the same event again.The comment states "we just need to fan the event out", but
finalizehas already done this.Proposed fix
if self.state_store.is_cancelled(instance_id).await? { - // The status is already Cancelled (set by `cancel_workflow`); - // we just need to fan the event out and exit. Use the - // event-bus directly to avoid a redundant state write. - self.event_bus - .publish(WorkflowEvent::WorkflowCancelled { instance_id }) - .await; + // The status is already Cancelled and the event was published + // by `cancel_workflow` via `finalize`. Just exit cleanly. return Ok(()); }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@crates/workflow/src/engine.rs` around lines 521 - 528, The duplicate WorkflowCancelled event is published because cancel_workflow -> finalize(...) already emits WorkflowEvent::WorkflowCancelled; in execute_workflow when detecting cancellation via self.state_store.is_cancelled(instance_id).await? you should not republish the same event. Remove or guard the event_bus.publish(WorkflowEvent::WorkflowCancelled { instance_id }) call in execute_workflow (the branch that currently fans out the event) so it only returns Ok(()) after detecting cancellation, or add a check to avoid publishing if finalize has already emitted it; reference self.state_store.is_cancelled, execute_workflow, event_bus.publish, WorkflowEvent::WorkflowCancelled, cancel_workflow, and finalize to locate and update the code.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@crates/workflow/src/event.rs`:
- Around line 129-132: The code currently ignores JoinError by doing let _ =
handle.await in publish_and_wait after calling spawn_subscriber_tasks(event);
update publish_and_wait to handle each handle.await result instead of discarding
it: inspect the Result from handle.await, log or propagate JoinError with
context (including subscriber/task id or event info) and ensure failures are
surfaced (e.g., collect errors and return Err or processLogger/error macro);
reference the spawn_subscriber_tasks and the handles iteration where
handle.await is used to implement this error handling.
---
Outside diff comments:
In `@crates/workflow/src/engine.rs`:
- Around line 521-528: The duplicate WorkflowCancelled event is published
because cancel_workflow -> finalize(...) already emits
WorkflowEvent::WorkflowCancelled; in execute_workflow when detecting
cancellation via self.state_store.is_cancelled(instance_id).await? you should
not republish the same event. Remove or guard the
event_bus.publish(WorkflowEvent::WorkflowCancelled { instance_id }) call in
execute_workflow (the branch that currently fans out the event) so it only
returns Ok(()) after detecting cancellation, or add a check to avoid publishing
if finalize has already emitted it; reference self.state_store.is_cancelled,
execute_workflow, event_bus.publish, WorkflowEvent::WorkflowCancelled,
cancel_workflow, and finalize to locate and update the code.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 96393705-915b-456d-88cf-3dd0e38fe40b
📒 Files selected for processing (11)
crates/workflow/src/definition.rscrates/workflow/src/engine.rscrates/workflow/src/event.rscrates/workflow/src/state.rscrates/workflow/src/types.rscrates/workflow/tests/workflow_test.rsmodel_gateway/src/workflow/mcp_registration.rsmodel_gateway/src/workflow/steps/local/mod.rsmodel_gateway/src/workflow/steps/mod.rsmodel_gateway/src/workflow/wasm_module_registration.rsmodel_gateway/src/workflow/wasm_module_removal.rs
| let handles = self.spawn_subscriber_tasks(event).await; | ||
| for handle in handles { | ||
| let _ = handle.await; | ||
| } |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Verify that JoinHandle errors are currently discarded in publish_and_wait
rg -n --type=rust -C3 'publish_and_wait|for handle in handles|handle\.await' crates/workflow/src/event.rs
# Verify existing timeout logging and absence/presence of join-error logging paths
rg -n --type=rust -C2 'timed out|JoinError|is_panic|subscriber task failed' crates/workflow/src/event.rsRepository: lightseekorg/smg
Length of output: 749
🏁 Script executed:
#!/bin/bash
# Find spawn_subscriber_tasks implementation to understand task type
rg -n --type=rust -A10 'fn spawn_subscriber_tasks' crates/workflow/src/event.rs
# Look for tests of publish_and_wait
rg -n --type=rust 'test.*publish_and_wait|publish_and_wait.*test' crates/workflow/src/
# Check if warn! macro is used elsewhere in this file
rg -n --type=rust 'warn!' crates/workflow/src/event.rs
# Check imports to see what logging is available
rg -n --type=rust '^use|^extern' crates/workflow/src/event.rs | head -20Repository: lightseekorg/smg
Length of output: 737
🏁 Script executed:
#!/bin/bash
# Search for tests that might test publish_and_wait behavior
fd -t f '\.rs$' --type f | xargs grep -l 'publish_and_wait' 2>/dev/null
# Look for other handle.await patterns in rust files to see if this is a widespread pattern
rg -n --type=rust 'handle\.await' crates/workflow/src/
# Check if there's a tests directory
find . -type d -name 'tests' -o -type d -name 'test' | head -5Repository: lightseekorg/smg
Length of output: 276
🏁 Script executed:
#!/bin/bash
# Check for tests in workflow crate
find crates/workflow -name '*.rs' -type f | xargs grep -l 'publish_and_wait\|#\[test\]\|#\[tokio::test\]' 2>/dev/null
# Look for any #[test] or #[tokio::test] in event.rs
rg -n --type=rust '#\[.*test' crates/workflow/src/event.rs
# Check if there's a mod tests section at the bottom of event.rs
tail -50 crates/workflow/src/event.rsRepository: lightseekorg/smg
Length of output: 1713
🏁 Script executed:
#!/bin/bash
# Check workflow_test.rs for publish_and_wait tests
rg -n --type=rust -B2 -A15 'publish_and_wait' crates/workflow/tests/workflow_test.rs
# Also check if event.rs has any comments about error handling intention
rg -n --type=rust -B2 -A2 'let _ = handle' crates/workflow/src/event.rsRepository: lightseekorg/smg
Length of output: 226
🏁 Script executed:
#!/bin/bash
# Check what's in workflow_test.rs
wc -l crates/workflow/tests/workflow_test.rs
# See if there are any tests that call publish_and_wait indirectly
rg -n --type=rust 'pub_event|EventPublisher|publish' crates/workflow/tests/workflow_test.rs | head -20
# Check for any error handling patterns in the spawn_subscriber_tasks implementation
rg -n --type=rust -A30 'fn spawn_subscriber_tasks' crates/workflow/src/event.rs | grep -A20 'into_iter'Repository: lightseekorg/smg
Length of output: 1008
Handle subscriber task join failures instead of discarding them.
At line 131, let _ = handle.await drops any JoinError from subscriber task panics or cancellations. This prevents visibility into subscriber failures during event processing in publish_and_wait.
Suggested fix
pub async fn publish_and_wait(&self, event: WorkflowEvent) {
let handles = self.spawn_subscriber_tasks(event).await;
for handle in handles {
- let _ = handle.await;
+ if let Err(join_err) = handle.await {
+ warn!(
+ is_panic = join_err.is_panic(),
+ error = %join_err,
+ "Event subscriber task failed"
+ );
+ }
}
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| let handles = self.spawn_subscriber_tasks(event).await; | |
| for handle in handles { | |
| let _ = handle.await; | |
| } | |
| let handles = self.spawn_subscriber_tasks(event).await; | |
| for handle in handles { | |
| if let Err(join_err) = handle.await { | |
| warn!( | |
| is_panic = join_err.is_panic(), | |
| error = %join_err, | |
| "Event subscriber task failed" | |
| ); | |
| } | |
| } |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@crates/workflow/src/event.rs` around lines 129 - 132, The code currently
ignores JoinError by doing let _ = handle.await in publish_and_wait after
calling spawn_subscriber_tasks(event); update publish_and_wait to handle each
handle.await result instead of discarding it: inspect the Result from
handle.await, log or propagate JoinError with context (including subscriber/task
id or event info) and ensure failures are surfaced (e.g., collect errors and
return Err or processLogger/error macro); reference the spawn_subscriber_tasks
and the handles iteration where handle.await is used to implement this error
handling.
| // The status is already Cancelled (set by `cancel_workflow`); | ||
| // we just need to fan the event out and exit. Use the | ||
| // event-bus directly to avoid a redundant state write. |
There was a problem hiding this comment.
🟣 Pre-existing: cancel_workflow → finalize now publishes WorkflowCancelled and fires notify_waiters. Then the main loop hits this is_cancelled branch and publishes WorkflowCancelled a second time. This double-publish was already present before the refactor (old cancel_workflow also published the event directly), but now that finalize centralises state + event + notify, the loop's publish is redundant and the comment should reflect that.
Consider removing the publish call here and just doing return Ok(()), since finalize already handled everything.
There was a problem hiding this comment.
Code Review
This pull request refactors the workflow engine to replace adaptive polling with a notification-based completion system using tokio::sync::Notify. It introduces a centralized finalize method to ensure that state updates, event publishing, and waiter notifications remain synchronized across all termination paths. Additionally, the StepDefinition builder API was improved with #[must_use] attributes and more flexible dependency input types. Feedback was provided regarding a redundant event publication in the cancellation logic that should be removed to avoid duplicate events.
| self.event_bus | ||
| .publish(WorkflowEvent::WorkflowCancelled { instance_id }) | ||
| .await; |
There was a problem hiding this comment.
This publish call is redundant. The cancel_workflow method now calls the finalize helper, which already handles updating the state to Cancelled and publishing the WorkflowCancelled event. When the background execution task detects the cancellation via is_cancelled, it should simply exit the loop.
return Ok(());| // Register the completion slot before publishing `WorkflowStarted` | ||
| // so a `wait_for_completion` call that races the spawned task | ||
| // always finds something to subscribe to. `finalize` removes it. | ||
| self.completion_notifiers | ||
| .write() | ||
| .insert(instance_id, Arc::new(Notify::new())); |
There was a problem hiding this comment.
🟡 Nit: If the spawned execute_workflow task returns Err (e.g. a state_store operation fails via ?), finalize is never called and this notifier entry is never removed or fired. Any wait_for_completion caller would then hang until its timeout rather than failing fast.
Consider either (a) adding a drop guard in the spawned task that fires notify_waiters() on all exit paths, or (b) calling finalize with WorkflowOutcome::Failed in the Err branch of the spawned task below (line ~491).
There was a problem hiding this comment.
Clean, well-reasoned refactor. The finalize consolidation, event-driven wait_for_completion, backoff-exhaustion fix, and clock-skew cleanup all look correct.
Comments posted: 1 🟣 Pre-existing, 1 🟡 Nit — no blocking issues.
- 🟣 Pre-existing (
engine.rs:522):WorkflowCancelledevent is published twice on cancel — once byfinalize(viacancel_workflow) and once by the main loop'sis_cancelledhandler. Now thatfinalizecentralises the trio, the loop's publish is redundant. - 🟡 Nit (
engine.rs:464): If the spawnedexecute_workflowtask exits viaErr(state store failure),finalizeis never called and thecompletion_notifiersentry leaks. A drop guard orfinalizecall in the error branch would makewait_for_completionfail fast instead of timing out.
Summary
Three-agent review of
crates/workflow/(reuse / quality / efficiency). Findings collapsed into one cleanup PR — no new features, behaviour-preserving except where flagged below.One correctness-relevant change
wait_for_completionis now event-driven. Was pollingstate_storeon a 100ms→2s adaptive cycle, taking the global state lock and cloning the fullWorkflowState<D>(incl. workflow context) every iteration. Now registers atokio::sync::Notifyper instance instart_workflowand fires it from a newfinalizehelper.wait_for_completionsnapshots the notifier, reads state once (covers "already terminal" race-free), then awaits the notifier under the user's timeout. Removes per-request polling latency (was up to 2s tail) and ~10 deep state clones per worker registration / removal / update.Two bug fixes
cleanup_old_workflowswas keeping future-timestamped workflows forever.signed_duration_since().to_std().unwrap_or_default()returnedDuration::ZEROfor a futureupdated_at(clock skew, manual fiddling), so the age was always 0 and never exceeded the TTL. Now treats it asDuration::MAXso they stay eligible for eviction.execute_step_with_retrywas silently turning backoff exhaustion into infinite retry.backoff.next_backoff().unwrap_or_else(|| Duration::from_secs(1))masked the policy's "stop" signal. Now folds backoff exhaustion into the retry decision and falls through to the existingon_failurehook +StepResult::Failurereturn.Two duplications collapsed
finalizehelper absorbs the three open-coded "update state to terminal + publish workflow event" sites (deadlock branch, post-loop branch,cancel_workflow). Also fires the new completion notifier so the trio (state, event, notifier) stays in lockstep.spawn_subscriber_taskshelper inEventBus.publishandpublish_and_waitdiffered only in whether they awaited the join handles; both now go through one helper.Polish
#[must_use]on every builder method onStepDefinition/WorkflowDefinition, sostep.with_timeout(d);(statement) is flagged.depends_on/depends_on_anynow acceptIntoIterator<Item: AsRef<str>>instead of&[&str]. Existing&["a", "b"]callers updated to["a", "b"](clippyneedless_borrows_for_generic_args).From<&str>andFrom<String>forStepId.seenHashSet seeding loop in the scheduler —newly_ready_from_waitindices come from aHashMapand are unique by construction; dedup is only needed for thepending_check-deriveddeps_ready_indices.What was deliberately NOT done (Tier 2)
These were flagged but scoped out:
state_storebyWorkflowInstanceId(DashMap) — biggest concurrency win for worker bursts but touches every state read site. Separate PR.execute_step_with_retry. Separate PR.execute_workflow(~420-line function) intodrain_completions/find_ready_steps/launch_steps/wait_next_event. Separate PR.wait_for_shutdownpolling →Notify— minor; once per process.app_context.as_ref().ok_or_else(...)(~30 sites in consumer steps) andstart_workflow + wait_for_completionboilerplate (~7 sites injob_queue.rs) — both live outside this crate.Test plan
cargo +nightly fmt --all(silent)cargo clippy --all-targets --all-features -- -D warnings(clean)cargo test— 3388 passed / 0 failed across the workspace.depends_on(_any)accepting more types and gaining#[must_use]on builders. Existing callers inmodel_gateway/src/workflow/updated for the clippy lint.Summary by CodeRabbit
New Features
Bug Fixes
Improvements
#[must_use]) to builder methods for better development experience.