From 58816835b09665172fcfac1039c1305521e4475e Mon Sep 17 00:00:00 2001 From: HaiyiMei Date: Wed, 13 May 2026 12:40:36 +0000 Subject: [PATCH] fix: guard addStreamMessage with isStreaming() to avoid stream-completed race After #1141 started calling streamingPrompt.complete() on result message under non-warm mode, there is a brief window where isRunning() still returns true but streamingPrompt.completed is already true. Webhook handlers that called existingRunner.addStreamMessage(...) on the basis of isRunning() alone would throw "Cannot add message to completed stream", silently dropping the user's message after Cyrus had already posted "I've queued up your message as guidance". IAgentRunner.isStreaming?() is documented in agent-runner-types.ts as "Use this to guard calls to addStreamMessage()". ChatSessionHandler.ts already does this. Add it at the four EdgeWorker.ts sites that did not: - handlePromptWithStreamingCheck (the main prompted-webhook entry) - resumeAgentSession (early-return when stream is alive) - issue-update streaming delivery (CYPACK-954 path) - base-branch change notification streaming When the guard fails, callers fall through to query({resume: ...}) and the user's comment is delivered via the normal resume path instead of being lost. Test mock factory for the issue-update path updated to mirror isRunning into isStreaming so the streaming branch is reachable. Repro: send a comment in an active agent session thread; if it arrives in the window between SDK result emission and runner teardown (~tens of ms), Cyrus acknowledges "queued up as guidance" but logs the throw and drops the message. With this fix the comment falls through to resume. --- packages/edge-worker/src/EdgeWorker.ts | 12 ++++++++---- ...EdgeWorker.issue-update-multiple-sessions.test.ts | 1 + 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/packages/edge-worker/src/EdgeWorker.ts b/packages/edge-worker/src/EdgeWorker.ts index 095364d68..adb40e661 100644 --- a/packages/edge-worker/src/EdgeWorker.ts +++ b/packages/edge-worker/src/EdgeWorker.ts @@ -1512,7 +1512,8 @@ Your base branch \`${branchName}\` has received ${commitCount} new commit(s). Co if ( isRunning && existingRunner?.supportsStreamingInput && - existingRunner.addStreamMessage + existingRunner.addStreamMessage && + existingRunner.isStreaming?.() ) { existingRunner.addStreamMessage(notification); this.logger.debug( @@ -3521,7 +3522,8 @@ ${taskSection}`; if ( isRunning && existingRunner?.supportsStreamingInput && - existingRunner.addStreamMessage + existingRunner.addStreamMessage && + existingRunner.isStreaming?.() ) { existingRunner.addStreamMessage(fullPrompt); delivered = true; @@ -6616,7 +6618,8 @@ ${input.userComment} if ( existingRunner?.isRunning() && existingRunner.supportsStreamingInput && - existingRunner.addStreamMessage + existingRunner.addStreamMessage && + existingRunner.isStreaming?.() ) { log.debug( `Adding prompt to existing stream for ${sessionId} (${logContext})`, @@ -6703,7 +6706,8 @@ ${input.userComment} if ( existingRunner?.isRunning() && existingRunner.supportsStreamingInput && - existingRunner.addStreamMessage + existingRunner.addStreamMessage && + existingRunner.isStreaming?.() ) { let fullPrompt = promptBody; if (attachmentManifest) { diff --git a/packages/edge-worker/test/EdgeWorker.issue-update-multiple-sessions.test.ts b/packages/edge-worker/test/EdgeWorker.issue-update-multiple-sessions.test.ts index cb6d4bcd6..14f3751fb 100644 --- a/packages/edge-worker/test/EdgeWorker.issue-update-multiple-sessions.test.ts +++ b/packages/edge-worker/test/EdgeWorker.issue-update-multiple-sessions.test.ts @@ -83,6 +83,7 @@ describe("EdgeWorker - Issue Update Session Delivery (CYPACK-954)", () => { agentRunner: hasRunner ? { isRunning: vi.fn().mockReturnValue(isRunning), + isStreaming: vi.fn().mockReturnValue(isRunning), supportsStreamingInput: supportsStreaming, addStreamMessage: vi.fn(), stop: vi.fn(),