fix: forward task_started / task_notification between turns via a single-consumer session reader#713
Conversation
6eab829 to
43b7e08
Compare
44df1c5 to
bb8dae9
Compare
f6cb5e9 to
4faab61
Compare
|
Rebased onto main (picked up the #680 force-cancel backstop) and folded a review pass into the #336 commit. Two of the findings were real correctness regressions that the single-consumer model introduced, so flagging them here for visibility: 1. A non-process-death iterator error bricked the session permanently. The reader is the sole consumer of 2. A force-cancel racing an iterator throw was reported as an internal error. When the Also folded in: route the reader's lifecycle check through Added two regression tests for the bugs above; both verified to fail without their fix. One scope note on "Fixes #336": this resolves the between-turns desync, which is the core complaint, but it does not change the long-running-bg-task behavior @dmeehan1968 hit. We close the turn on |
Possible improvements after this mergesGrouping by what each one depends on, since that decides which are follow-ups we own and which are blocked on the SDK or the ACP protocol.
Wrapper-only (follow-ups we can do without waiting on anyone)1. Unify the render/usage path between 2. Recover the reader instead of tearing the session down. The brick fix in this PR tears the session down when the reader dies on an iterator error. That's correct and safe, but for a transient stream-protocol hiccup (e.g. 3. Cap the off-turn buffer per group, not per message. Blocked by the SDK4. Live streaming of followups. This PR forwards a followup as a block when its closing 5. Unblock the turn for long-running background tasks (the symptom @dmeehan1968 hit). We close the user turn on Blocked by the ACP protocol6. A dedicated session-update variant for task output. Lifecycle and followups currently piggyback on plain-text SummaryOf these, only 1-3 are ours to schedule; 1 is the highest value since it also fixes the remaining marker divergence and collapses the duplication the #757 rebase widened. 4-6 aren't pending work on our side, they're external dependencies: the actionable part is following the RFD and, once the SDK adds |
|
@josevalim @SteffenDE @benbrandt can you take a look? |
…nFollowupCollector Building blocks for the single-consumer session reader (issue agentclientprotocol#336), in a standalone module so each unit is testable in isolation: - TurnQueue: single-producer / single-consumer async queue. The reader is the sole producer; prompt() is the sole consumer per turn. close() and error() propagate to the consumer; clear() drops leftovers for an abandoned turn; take() guards against concurrent consumers. - classifyOffTurn: pure classification of an off-turn message into a task-lifecycle event (emit immediately) or a followup candidate. - OffTurnFollowupCollector: state machine that buffers off-turn followup candidates until the closing result/idle, then forwards them (for origin.kind === "task-notification") or discards them as aftermath. Bounded by a 256-entry cap.
4faab61 to
c2478a8
Compare
…der (agentclientprotocol#336) Background tasks launched with run_in_background=true emit task_started / task_notification system messages between turns. Previously they sat in the SDK's buffer until the next user prompt, where prompt()'s switch discarded them as a no-op — the agent appeared unresponsive to background-task completion and then acted on a stale prompt. A per-session reader (#runSessionReader) is now the sole consumer of session.query.next(). It forwards lifecycle events to the client immediately (in-turn or between turns), hands in-turn messages to the active prompt() via a TurnQueue, and forwards autonomous task-notification followups out-of-turn. prompt() consumes the queue instead of touching the iterator, so there is only ever one consumer of the AsyncGenerator. Key pieces: - #runSessionReader: single linear loop; lifecycle/raw/followup emits are scheduled on a detached, never-awaited readerSideEffects chain so a slow ACP client can't stall SDK consumption or block the next prompt. - #emitTaskLifecycleUpdate: renders task_started (SDK description) and task_notification (real SDK status, no fallback) as an agent_message_chunk routed to the reader's bound sessionId. - #emitFollowup + computeFollowupUsageUpdate: forward a followup's content via the same notification helpers prompt() uses, plus a usage_update derived from the followup's own snapshots, without touching accumulatedUsage / stopReason / pending prompts. - prompt() consumes turnQueue.take(); raw-SDK emit moved entirely to the reader (no double-emit); the error/cancel finally clears the queue so an abandoned turn's buffered messages don't leak into the next one. - teardownSession and process-died recovery abort, await readerDone, then drain readerSideEffects (bounded by a 2s timeout) before deleting the session, so queued emits finish first and a wedged client can't hang teardown. Tests: session-reader describe block covers lifecycle (description vs summary, real status, skip_transcript), raw-SDK emit (off-turn, no in-turn double-emit, FIFO order), followup forwarding (out-of-turn, no usage contamination, no authRequired from a stale login result, lifecycle mid-followup, real #emitFollowup end-to-end), aftermath discard, process-died propagation, teardown drain, and the single-consumer invariant. computeFollowupUsageUpdate has unit coverage for buffered snapshot, result.usage fallback, subagent-only fallback, and inferred context window. acp-agent-settings mocks gain next/close stubs so the reader parks cleanly.
c2478a8 to
1ec26b7
Compare
Fixes #336.
Summary
When Claude Code launches background tasks via the
Tasktool withrun_in_background=true,task_notification(andtask_started) system messages arriving between turns sit in the SDK's internal buffer until the user sends the next message — at which point the wrapper'sprompt()switch hits the// Todo: process via status apino-op and silently discards them. The user observes this as "the agent doesn't react to my background task completing — I have to type something to wake it, and then it acts on a stale prompt".The wrapper now runs a single-consumer session reader per session that is the sole consumer of
session.query. It forwardstask_started/task_notificationto the ACP client the moment they arrive (in-turn or between turns), hands in-turn messages to the activeprompt()via a queue, and forwards autonomoustask-notificationfollowups out-of-turn.prompt()no longer touches the SDK iterator directly.Architecture
A new module
src/session-reader.tsholds three unit-testable building blocks:TurnQueue— single-producer / single-consumer async queue. The reader is the only producer;prompt()is the only consumer per turn.close()/error()propagate to the consumer;clear()drops leftovers for an abandoned turn.classifyOffTurn— pure classifier: a message is eitherlifecycle(emit immediately) or afollowup-candidate(feed to the collector).OffTurnFollowupCollector— small state machine for messages seen while no turn is active. It buffers followup candidates until the closingresult/idle, then forwards them (whenresult.origin.kind === "task-notification") or discards them as aftermath. Bounded by a 256-entry cap.In
src/acp-agent.ts:#runSessionReader— the only caller ofsession.query.next(). Per message: optionally schedule a raw-SDK emit; if lifecycle, emit it; else if a turn is running, push toturnQueue; else feedoffTurn. Exits cleanly on abort / iterator close / iterator throw, closing the queue so any consumer is unblocked.#enqueueReaderSideEffect— serializes the reader's client-facing emits (raw SDK messages, lifecycle, followup) onto a per-session promise chain the reader never awaits, so a slow ACP client can't stall SDK consumption or block the next prompt.#emitTaskLifecycleUpdate— renderstask_started(using the SDK'sdescription) andtask_notification(using the SDK's realstatus) as anagent_message_chunk, routed to the reader's boundsessionId.#emitFollowup/computeFollowupUsageUpdate— forward an autonomous followup's content using the same notification helpersprompt()uses, plus ausage_updatederived from the followup's own snapshots, without ever touchingaccumulatedUsage,stopReason, or the user's pending prompts.prompt()consumessession.turnQueue.take()instead ofsession.query.next(). Raw-SDK emit moved entirely to the reader (no double-emit). Error/cancel paths clear the queue.Behaviour
task_started/task_notificationproduce a short bracketedagent_message_chunk—[task <id>] started: <description>or[task <id>] <status>: <summary>plusoutput: <path>when present — whether the event arrives during a turn or between turns.skip_transcript: truelifecycle messages produce no client-visible update.task-notificationfollowups (an SDK-driven mini-turn that runs between user turns) are forwarded to the client out-of-turn: their assistant/tool content via the normal notification helpers, plus ausage_updatecarrying the followup's cost and_claude/originmeta. They never contaminate the next user turn's usage or stop reason.Lifecycle & robustness
session.query.next(), eliminating the concurrent-next()-on-an-AsyncGenerator hazard.teardownSessionand the process-died recoveryabort(), then wait for the reader and drain its detachedreaderSideEffectschain (both bounded by a 2s timeout, via#drainReader) before deleting the session — so queued emits finish before the session disappears and a wedged SDK iterator (issue session/cancel doesn't abort in-flight TaskOutput block, pending session/prompt never resolves #680) can't hang teardown on an unboundedawait readerDone.TurnQueueleft in a terminal state (iterator threw a non-process-death error, or returned done mid-session) means the reader is gone and can never feed the session again.prompt()detects this (turnQueue.isTerminal()) and tears the session down so the client starts fresh, rather than every future prompt failing on the latched error.prompt()clears theturnQueueon both error and cancel, so messages the reader buffered for an abandoned turn don't leak into the next one. A force-cancel that races an iterator throw (the rejection winning thetake()vs. wake-up race) still returnsstopReason: "cancelled"rather than surfacing the trailing error.cancel()now keeps a separateinterruptedflag while a turn is winding down, so a queued follow-up prompt can resetcancelledwithout turning the interrupted turn's lateis_errorresult into an Internal error or cancelling the queued prompt._claude/sdkMessageemits are FIFO among themselves but are explicitly not ordered relative to the derivedsession/updatestream (the reader doesn't block on the client). Documented at the emit site.Tests
npm run test:run: 419 passed | 15 skipped.src/tests/session-reader.test.ts— unit tests forTurnQueue(push/take/close/error/clear, FIFO, concurrent-take guard),classifyOffTurn(every subtype),OffTurnFollowupCollector(all transitions, cap, emit-error swallowing).src/tests/acp-agent.test.ts→describe("session reader (issue #336)")— lifecycle (descriptionnotsummary; realstatus;skip_transcript), raw-SDK emit (off-turn, no in-turn double-emit, FIFO order), followup forwarding (out-of-turn, no usage contamination, noauthRequiredfrom a stalePlease run /login, lifecycle-mid-followup, real#emitFollowupend-to-end render), aftermath (orphan+idle, non-followup result, cancellation aftermath discarded), process-died propagation, teardown awaits reader + drains side-effects, single-consumer invariant.computeFollowupUsageUpdateunit cases: buffered snapshot, fallback toresult.usage, subagent-only fallback, model with no matchingmodelUsagekey → inferred context window.cancelled; an interrupt-and-send lateis_errorresult is treated as cancellation after a queued prompt resetscancelled; a followup's assembled text is forwarded when nothing streamed and deduped when it did (the fix: Forward unstreamed assistant text blocks #757 alignment for the off-turn path).Verification
Out of scope (deliberately not in this PR)
Two product enhancements were considered and intentionally left out:
Live streaming of autonomous followups. Today a followup's content is buffered and forwarded as a block when its closing
resultarrives. Streaming it token-by-token is blocked by the SDK contract, not by effort: only theresultmessage carriesorigin; theassistant/stream_eventcontent messages do not. To stream we'd have to emit content before knowing whether the group is a real followup or the aftermath of a cancelled/errored turn — and emitting aftermath optimistically reintroduces exactly the contamination this PR fixes, with no way to retract it over ACP. A safe implementation needs either (a) the SDK tagging followup content messages withorigin(not just theresult), or (b) a retractable preview channel in the ACP protocol. Neither exists today.A dedicated session-update variant /
_metamarker for background-task output. Followup and lifecycle updates currently piggyback onagent_message_chunk(the followup'susage_updatealready carries_claude/origin). A richer scheme — tagging every followup chunk with_metaor a dedicatedtask_*session-update so clients can render background output distinctly — is additive and backward-compatible, but only produces visible effect once an ACP client consumes it. Left for a follow-up coordinated with client-side rendering rather than shipped speculatively here.