Skip to content

fix: refresh DAG list SSE after file changes#2319

Merged
yohamta0 merged 3 commits into
mainfrom
fix/dag-list-sse-invalidation
Jun 24, 2026
Merged

fix: refresh DAG list SSE after file changes#2319
yohamta0 merged 3 commits into
mainfrom
fix/dag-list-sse-invalidation

Conversation

@yohamta0

@yohamta0 yohamta0 commented Jun 23, 2026

Copy link
Copy Markdown
Collaborator

Summary

  • start the existing app filesystem watcher when SSE routes are configured
  • bridge app file-change events into multiplexed SSE topic wakeups
  • add a regression test for externally edited DAG files refreshing the workflow list topic

Root Cause

The workflow list consumes the multiplexed dagslist SSE topic. When the event store is enabled, that topic is on-demand and only refreshes after explicit wakeups. API mutations and DAG-run events already wake it, but external DAG file writes did not because setupSSERoute disabled the app filesystem stream that detects those changes. The detail/spec page fetched the DAG directly, so it could show the new schedule while the already-open workflow list still displayed the old schedule.

Testing

  • go test ./internal/service/frontend/... -count=1
  • go test ./internal/persis/file/dag -run TestIndexInvalidationOnMutations|TestListUsesIndex -count=1
  • git diff --check

Closes #2316


Summary by cubic

Fixes stale workflow lists by waking the SSE dagslist topic when DAG files change. Starts the app filesystem watcher and bridges its events to multiplexed invalidation; falls back to polling when the watcher isn’t available.

  • Bug Fixes
    • Start AppStreamService during SSE setup; warn and continue if init fails.
    • Bridge app stream events to wake dagslist, DAG, runs, queues, docs, queue items, and reset topics.
    • Keep dagslist polling without the app stream; switch it to on-demand with publish‑on‑wake when the watcher is running.
    • Tests: add SSE regression cases; serialize PowerShell conformance cases to avoid flakiness.

Written for commit 37d642c. Summary will update on new commits.

Review in cubic

Summary by CodeRabbit

  • New Features
    • Improved real-time Server-Sent Events behavior to react to application events (including DAG, queue, document, and reset) and refresh relevant topics automatically.
    • When an app-level SSE stream is available, certain document and DAG list topics now use on-demand refresh with publish-on-wake for more responsive updates.
  • Bug Fixes
    • Fixed cases where dedicated SSE topic behavior could be unintentionally disabled or polled inconsistently depending on app stream availability.
  • Tests
    • Added coverage for DAGs list waking after DAG/config file changes, and for preserving polling behavior without an app stream.

@coderabbitai

coderabbitai Bot commented Jun 23, 2026

Copy link
Copy Markdown

Review Change Stack

📝 Walkthrough

Walkthrough

setupSSERoute now creates an sse.AppStreamService and stores it in srv.appStream (previously forced to nil). After building the SSE multiplexer, it starts an invalidation bridge that subscribes to the app stream and wakes the corresponding multiplexed SSE topics on DAG/run/queue/doc/reset events. Topic refresh behavior in registerDedicatedSSEFetchers is now conditional: when appStream is available, document topics and DAGsList use on-demand refresh and publish-on-wake; otherwise they revert to standard polling. Tests validate both scenarios and confirm that writing a DAG file triggers the DAGsList SSE topic.

Changes

App stream → multiplexer invalidation bridge

Layer / File(s) Summary
AppStream initialization and invalidation bridge wiring
internal/service/frontend/server.go
setupSSERoute creates an AppStreamService instead of setting srv.appStream = nil. After the multiplexer is constructed, it calls startAppStreamInvalidationBridge. Three new methods subscribe to the app stream, dispatch per-event-type wakeup calls to the multiplexer (DAG, run, queue, doc, reset), and on reset wake all file-backed topic types.
Conditional topic refresh behavior based on appStream availability
internal/service/frontend/server.go
registerDedicatedSSEFetchers now computes appStreamAvailable := srv.appStream != nil and only applies on-demand refresh and publish-on-wake for document topics (Doc, DocTree) and DAGsList when appStream is available; without appStream, these topics use standard polling.
Test infrastructure and helper updates
internal/service/frontend/server_test.go
Adds os and filepath imports, introduces testAppStream helper to create and cleanup AppStreamService instances, and updates existing SSE fetcher registration tests to initialize Server with appStream.
Unit and integration tests for invalidation behavior
internal/service/frontend/server_test.go
TestRegisterDedicatedSSEFetchersKeepsDAGsListPollingWithoutAppStream verifies that DAGsList topic continues polling without appStream. TestDAGFileChangeWakesDAGsListSSETopic configures a Server with temp directories, registers a TopicTypeDAGsList fetcher, starts an HTTP SSE stream, asserts an initial fetch, writes external-edit.yaml to the DAGs directory, asserts a second fetch occurs, then cancels the context and shuts down appStream and sseMultiplexer.

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 0.00% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Title check ✅ Passed The PR title accurately summarizes the main objective: refreshing the DAG list SSE after file changes, which directly addresses the linked issue #2316 about stale workflow list displays.
Linked Issues check ✅ Passed The PR directly addresses issue #2316 by starting the app filesystem watcher, bridging file-change events to SSE topic wakeups, and ensuring the dagslist topic refreshes when DAG files are externally modified.
Out of Scope Changes check ✅ Passed All changes are in-scope, focusing on SSE route setup, app stream initialization, invalidation bridge implementation, and related test coverage with no unrelated modifications.
Description check ✅ Passed The PR description covers all required template sections with substantive content and clear context about the bug fix, testing approach, and linked issue.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch fix/dag-list-sse-invalidation

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (1)
internal/service/frontend/server_test.go (1)

252-260: 📐 Maintainability & Code Quality | 🔵 Trivial | ⚡ Quick win

Add an explicit assertion that SSE bridge prerequisites initialized.

Right after setupSSERoute, assert srv.appStream and srv.sseMultiplexer are non-nil. Otherwise this test can fail later via timeout, hiding the root setup failure.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@internal/service/frontend/server_test.go` around lines 252 - 260, The test
lacks assertions to verify that the SSE bridge prerequisites were properly
initialized after calling setupSSERoute. Add explicit assertions right after the
setupSSERoute call to check that srv.appStream and srv.sseMultiplexer are both
non-nil, so that any setup failure is caught immediately rather than manifesting
as a timeout later in the test execution.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@internal/service/frontend/server.go`:
- Around line 1199-1207: The app stream initialization failure at the
NewAppStreamService call only logs a warning and allows execution to continue
with srv.appStream remaining nil. This causes registerDedicatedSSEFetchers to
later put TopicTypeDAGsList into on-demand mode when eventService is enabled,
but the on-demand mode cannot function without a valid app stream, breaking DAG
list refresh on external edits. Either fail fast by returning the error when
NewAppStreamService fails instead of just logging a warning, or add a nil check
for srv.appStream in registerDedicatedSSEFetchers before switching
TopicTypeDAGsList to on-demand mode, keeping it in polling mode when the app
stream is unavailable.

---

Nitpick comments:
In `@internal/service/frontend/server_test.go`:
- Around line 252-260: The test lacks assertions to verify that the SSE bridge
prerequisites were properly initialized after calling setupSSERoute. Add
explicit assertions right after the setupSSERoute call to check that
srv.appStream and srv.sseMultiplexer are both non-nil, so that any setup failure
is caught immediately rather than manifesting as a timeout later in the test
execution.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: c2607268-43cc-44fc-9919-6b062e1d32e6

📥 Commits

Reviewing files that changed from the base of the PR and between 2f08f01 and e9c82db.

📒 Files selected for processing (2)
  • internal/service/frontend/server.go
  • internal/service/frontend/server_test.go

Comment thread internal/service/frontend/server.go
@yohamta0

Copy link
Copy Markdown
Collaborator Author

@coderabbitai review

@coderabbitai

coderabbitai Bot commented Jun 23, 2026

Copy link
Copy Markdown
✅ Action performed

Review finished.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

@yohamta0 yohamta0 merged commit c98ebe4 into main Jun 24, 2026
11 checks passed
@yohamta0 yohamta0 deleted the fix/dag-list-sse-invalidation branch June 24, 2026 00:20
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

bug: The schedule shown on the workflow page is different from the one on the detail page.

1 participant