From 3fce5ceb7f0018088003825330b5ee03dc90bfa7 Mon Sep 17 00:00:00 2001 From: Jorge Raad Date: Wed, 13 May 2026 20:24:35 -0400 Subject: [PATCH 01/11] fix(offSecAgent): emit synthetic tool-result on stream abort/error MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When `consume()` is iterating the fullStream and the iterator throws (AbortSignal fired, idle-stream timeout, transport error), any tool whose `tool-call` was emitted but whose `tool-result` had not yet streamed was silently dropped — downstream consumers (TUI, persistence, external bus subscribers) were left with hung "running" entries. Track in-flight tool calls inside `consume()`. On error/abort emit synthetic `tool-result` bus events using the SDK's `error-text` output variant — the same shape the SDK itself produces when a tool's `execute()` throws — and append matching `tool-result` content parts to `messagesPath` so resumed sessions see consistent state. Hoist `latestMessages` and `messagesPath` from the constructor closure into class-owned refs so `consume()` can mutate them when emitting synthetic results. Closes #778 --- .../offensiveSecurityAgent.test.ts | 120 ++++++++++++++++++ .../offSecAgent/offensiveSecurityAgent.ts | 117 ++++++++++++++++- 2 files changed, 230 insertions(+), 7 deletions(-) diff --git a/src/core/agents/offSecAgent/offensiveSecurityAgent.test.ts b/src/core/agents/offSecAgent/offensiveSecurityAgent.test.ts index 7b134799c..d258654df 100644 --- a/src/core/agents/offSecAgent/offensiveSecurityAgent.test.ts +++ b/src/core/agents/offSecAgent/offensiveSecurityAgent.test.ts @@ -98,6 +98,16 @@ function buildStubAgent(overrides: { Object.defineProperty(agent, "resolveResult", { value: overrides.resolveResult, }); + // New fields used by emitSyntheticToolResults — initialized to safe defaults + // so the synthetic-result path is a no-op in tests that don't exercise it. + Object.defineProperty(agent, "latestMessagesRef", { + value: { current: null }, + writable: true, + }); + Object.defineProperty(agent, "messagesPath", { + value: null, + writable: true, + }); return agent; } @@ -234,6 +244,116 @@ describe("OffensiveSecurityAgent.consume()", () => { }); }); + describe("synthetic tool-result on stream abort/error", () => { + const toolCallChunk = { + type: "tool-call", + toolCallId: "tc1", + toolName: "execute_command", + }; + const otherToolCallChunk = { + type: "tool-call", + toolCallId: "tc2", + toolName: "read_file", + }; + const toolResultChunk = { + type: "tool-result", + toolCallId: "tc1", + result: { type: "text", value: "done" }, + }; + + it("emits synthetic tool-result for an in-flight tool when stream throws", async () => { + const agent = buildStubAgent({ + fullStream: yieldThenThrow( + [toolCallChunk], + new Error("connection reset"), + ), + }); + + const emittedResults: Array<{ + toolCallId: string; + result: unknown; + }> = []; + agent.eventBus.on("tool-result", (e) => { + emittedResults.push({ toolCallId: e.toolCallId, result: e.result }); + }); + + await expect(agent.consume()).rejects.toThrow("connection reset"); + + expect(emittedResults).toHaveLength(1); + expect(emittedResults[0].toolCallId).toBe("tc1"); + expect(emittedResults[0].result).toMatchObject({ + type: "error-text", + value: expect.stringContaining("connection reset"), + }); + }); + + it("does not emit a synthetic result when the matching tool-result already streamed", async () => { + const agent = buildStubAgent({ + fullStream: yieldChunks([toolCallChunk, toolResultChunk]), + }); + + const emittedResults: unknown[] = []; + agent.eventBus.on("tool-result", (e) => emittedResults.push(e)); + + await agent.consume(); + + expect(emittedResults).toHaveLength(0); + }); + + it("emits one synthetic per in-flight tool when multiple are open at abort", async () => { + const controller = new AbortController(); + controller.abort(); + + const agent = buildStubAgent({ + fullStream: yieldThenThrow( + [toolCallChunk, otherToolCallChunk], + new DOMException("Aborted", "AbortError"), + ), + abortSignal: controller.signal, + }); + + const emittedResults: Array<{ toolCallId: string; result: unknown }> = []; + agent.eventBus.on("tool-result", (e) => { + emittedResults.push({ toolCallId: e.toolCallId, result: e.result }); + }); + + await expect(agent.consume()).rejects.toBeInstanceOf(DOMException); + + expect(emittedResults).toHaveLength(2); + const ids = emittedResults.map((r) => r.toolCallId).sort(); + expect(ids).toEqual(["tc1", "tc2"]); + for (const r of emittedResults) { + expect(r.result).toMatchObject({ + type: "error-text", + value: expect.stringContaining("aborted"), + }); + } + }); + + it("uses 'Agent aborted by user' as reason when abortSignal is set", async () => { + const controller = new AbortController(); + controller.abort(); + + const agent = buildStubAgent({ + fullStream: yieldThenThrow( + [toolCallChunk], + new DOMException("Aborted", "AbortError"), + ), + abortSignal: controller.signal, + }); + + const emittedResults: Array<{ result: unknown }> = []; + agent.eventBus.on("tool-result", (e) => emittedResults.push({ result: e.result })); + + await expect(agent.consume()).rejects.toBeInstanceOf(DOMException); + + expect(emittedResults[0].result).toMatchObject({ + type: "error-text", + value: expect.stringContaining("aborted by user"), + }); + }); + }); + describe("resolveResult", () => { it("returns resolved value when resolveResult is provided", async () => { const agent = buildStubAgent({ diff --git a/src/core/agents/offSecAgent/offensiveSecurityAgent.ts b/src/core/agents/offSecAgent/offensiveSecurityAgent.ts index 715bce3c6..d9a811b6a 100644 --- a/src/core/agents/offSecAgent/offensiveSecurityAgent.ts +++ b/src/core/agents/offSecAgent/offensiveSecurityAgent.ts @@ -117,6 +117,18 @@ export class OffensiveSecurityAgent { /** The session this agent is operating within. */ private readonly _session: SessionInfo; + /** + * Shared ref to the cumulative message history. Updated by `onStepFinish`, + * read by `onFinish` and by `consume()` so synthetic abort tool-results + * can be appended in-place before the final `messagesPath` write. + */ + private readonly latestMessagesRef: { current: ModelMessage[] | null } = { + current: null, + }; + + /** Path to the on-disk `messages.json` for this agent's session. */ + private messagesPath: string | null = null; + /** * Async factory that creates a session when one is not provided, * then constructs the agent. Use this instead of `new` when you @@ -328,7 +340,8 @@ export class OffensiveSecurityAgent { if (!existsSync(messagesDir)) { mkdirSync(messagesDir, { recursive: true }); } - const messagesPath = join(messagesDir, "messages.json"); + this.messagesPath = join(messagesDir, "messages.json"); + const messagesPath = this.messagesPath; // Mutable so that summarization can clear stale history. const initialMessagesRef: { current: ModelMessage[] } = { @@ -346,15 +359,15 @@ export class OffensiveSecurityAgent { // JSON.stringify on every step when many agents run concurrently. const PERSIST_INTERVAL_MS = 15_000; let persistTimer: ReturnType | null = null; - let latestMessages: ModelMessage[] | null = null; + const latestMessagesRef = this.latestMessagesRef; const schedulePersist = () => { if (persistTimer) return; persistTimer = setTimeout(() => { persistTimer = null; - if (latestMessages) { - const toWrite = latestMessages; - latestMessages = null; + if (latestMessagesRef.current) { + const toWrite = latestMessagesRef.current; + latestMessagesRef.current = null; writeFile(messagesPath, JSON.stringify(toWrite)).catch(() => {}); } }, PERSIST_INTERVAL_MS); @@ -400,7 +413,7 @@ export class OffensiveSecurityAgent { toolChoice: "auto", sessionPath: input.session.rootPath, onStepFinish: async (event) => { - latestMessages = [ + latestMessagesRef.current = [ ...initialMessagesRef.current, ...event.response.messages, ]; @@ -429,10 +442,11 @@ export class OffensiveSecurityAgent { clearTimeout(persistTimer); persistTimer = null; } - const finalMessages = latestMessages ?? [ + const finalMessages = latestMessagesRef.current ?? [ ...initialMessagesRef.current, ...event.response.messages, ]; + latestMessagesRef.current = finalMessages; await writeFile(messagesPath, JSON.stringify(finalMessages)).catch( () => {}, ); @@ -497,14 +511,43 @@ export class OffensiveSecurityAgent { const sid = this.subagentId; const bus = this.eventBus; + // Track tool calls that have been announced (`tool-call`) but whose + // matching `tool-result` has not yet streamed. On abort / iterator + // throw we synthesize an `error-text` result for each so downstream + // consumers (UI, persistence, MQTT) see a terminal state per call + // rather than a hung "running" entry. + const inFlightTools = new Map(); + let streamError: unknown = null; + try { for await (const chunk of this.streamResult.fullStream) { + if (chunk.type === "tool-call") { + const tc = chunk as { toolCallId: string; toolName: string }; + inFlightTools.set(tc.toolCallId, { toolName: tc.toolName }); + } else if (chunk.type === "tool-result") { + const tr = chunk as { toolCallId: string }; + inFlightTools.delete(tr.toolCallId); + } bus.emitStreamPart(chunk, sid); } + } catch (err) { + streamError = err; } finally { + if (inFlightTools.size > 0) { + const reason = this.abortSignal?.aborted + ? "Agent aborted by user" + : streamError instanceof Error && streamError.message + ? streamError.message + : "Stream terminated unexpectedly"; + await this.emitSyntheticToolResults(inFlightTools, reason); + } this.persistentShell?.dispose(); } + if (streamError) { + throw streamError; + } + if (this.abortSignal?.aborted) { throw new DOMException("Agent aborted by user", "AbortError"); } @@ -522,6 +565,66 @@ export class OffensiveSecurityAgent { get response() { return this.streamResult.response; } + + /** + * Emit synthetic `tool-result` events for tool calls that were announced + * mid-stream but never received a real `tool-result` chunk (typically + * because the stream was aborted or threw). Also appends matching + * `tool-result` content parts to `latestMessagesRef` and writes the + * updated history to `messagesPath` so resumed sessions and on-disk + * replay see the same terminal state. + * + * Uses the SDK's `error-text` output variant — the same shape the SDK + * itself produces when a tool's `execute()` throws — so all downstream + * consumers can handle aborts identically to runtime exceptions. + */ + private async emitSyntheticToolResults( + inFlightTools: Map, + reason: string, + ): Promise { + const bus = this.eventBus; + const sid = this.subagentId; + + const syntheticParts: Array<{ + type: "tool-result"; + toolCallId: string; + toolName: string; + output: { type: "error-text"; value: string }; + }> = []; + + for (const [toolCallId, { toolName }] of inFlightTools) { + const output = { + type: "error-text" as const, + value: `Tool execution aborted: ${reason}`, + }; + bus.emit("tool-result", { + toolCallId, + toolName, + result: output, + subagentId: sid, + }); + syntheticParts.push({ + type: "tool-result", + toolCallId, + toolName, + output, + }); + } + + if (syntheticParts.length === 0 || !this.messagesPath) return; + + const current = this.latestMessagesRef.current ?? []; + const message: ModelMessage = { + role: "tool", + // biome-ignore lint/suspicious/noExplicitAny: `tool-result` content parts + // use the SDK's ToolResultOutput discriminated union; the cast bypasses + // the broader ModelMessage shape inference. + content: syntheticParts as any, + }; + const next = [...current, message]; + this.latestMessagesRef.current = next; + await writeFile(this.messagesPath, JSON.stringify(next)).catch(() => {}); + } } // These tools pause the agent and surface their own UI to the operator. From a694a8eae6ccfe5ab5ca52f250100c0c686c7f03 Mon Sep 17 00:00:00 2001 From: Jorge Raad Date: Wed, 13 May 2026 20:37:12 -0400 Subject: [PATCH 02/11] refactor: simplify synthetic tool-result emission MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Drop unnecessary type casts in consume() — TextStreamPart already narrows tool-call/tool-result chunks. - Collapse inFlightTools value from { toolName } object to bare string. - Remove dead latestMessagesRef.current reassignment in onFinish (only read by the abort/error path, which is mutually exclusive with onFinish). - Drop unused writable:true on test-only defineProperty calls. - Move inFlightTools.size guard into emitSyntheticToolResults for locality. --- .../offensiveSecurityAgent.test.ts | 10 +++----- .../offSecAgent/offensiveSecurityAgent.ts | 23 +++++++++---------- 2 files changed, 14 insertions(+), 19 deletions(-) diff --git a/src/core/agents/offSecAgent/offensiveSecurityAgent.test.ts b/src/core/agents/offSecAgent/offensiveSecurityAgent.test.ts index d258654df..da9654a88 100644 --- a/src/core/agents/offSecAgent/offensiveSecurityAgent.test.ts +++ b/src/core/agents/offSecAgent/offensiveSecurityAgent.test.ts @@ -98,16 +98,12 @@ function buildStubAgent(overrides: { Object.defineProperty(agent, "resolveResult", { value: overrides.resolveResult, }); - // New fields used by emitSyntheticToolResults — initialized to safe defaults - // so the synthetic-result path is a no-op in tests that don't exercise it. + // Fields read by emitSyntheticToolResults — null `messagesPath` makes the + // disk-write branch a no-op for tests that don't supply a real path. Object.defineProperty(agent, "latestMessagesRef", { value: { current: null }, - writable: true, - }); - Object.defineProperty(agent, "messagesPath", { - value: null, - writable: true, }); + Object.defineProperty(agent, "messagesPath", { value: null }); return agent; } diff --git a/src/core/agents/offSecAgent/offensiveSecurityAgent.ts b/src/core/agents/offSecAgent/offensiveSecurityAgent.ts index d9a811b6a..1df6d41d3 100644 --- a/src/core/agents/offSecAgent/offensiveSecurityAgent.ts +++ b/src/core/agents/offSecAgent/offensiveSecurityAgent.ts @@ -118,9 +118,9 @@ export class OffensiveSecurityAgent { private readonly _session: SessionInfo; /** - * Shared ref to the cumulative message history. Updated by `onStepFinish`, - * read by `onFinish` and by `consume()` so synthetic abort tool-results - * can be appended in-place before the final `messagesPath` write. + * Cumulative message history shared between the stream callbacks and + * `consume()`. Held as a ref so `consume()` can append synthetic + * tool-results in-place when the stream aborts mid-flight. */ private readonly latestMessagesRef: { current: ModelMessage[] | null } = { current: null, @@ -446,7 +446,6 @@ export class OffensiveSecurityAgent { ...initialMessagesRef.current, ...event.response.messages, ]; - latestMessagesRef.current = finalMessages; await writeFile(messagesPath, JSON.stringify(finalMessages)).catch( () => {}, ); @@ -516,17 +515,15 @@ export class OffensiveSecurityAgent { // throw we synthesize an `error-text` result for each so downstream // consumers (UI, persistence, MQTT) see a terminal state per call // rather than a hung "running" entry. - const inFlightTools = new Map(); + const inFlightTools = new Map(); // toolCallId -> toolName let streamError: unknown = null; try { for await (const chunk of this.streamResult.fullStream) { if (chunk.type === "tool-call") { - const tc = chunk as { toolCallId: string; toolName: string }; - inFlightTools.set(tc.toolCallId, { toolName: tc.toolName }); + inFlightTools.set(chunk.toolCallId, chunk.toolName); } else if (chunk.type === "tool-result") { - const tr = chunk as { toolCallId: string }; - inFlightTools.delete(tr.toolCallId); + inFlightTools.delete(chunk.toolCallId); } bus.emitStreamPart(chunk, sid); } @@ -579,9 +576,11 @@ export class OffensiveSecurityAgent { * consumers can handle aborts identically to runtime exceptions. */ private async emitSyntheticToolResults( - inFlightTools: Map, + inFlightTools: Map, reason: string, ): Promise { + if (inFlightTools.size === 0) return; + const bus = this.eventBus; const sid = this.subagentId; @@ -592,7 +591,7 @@ export class OffensiveSecurityAgent { output: { type: "error-text"; value: string }; }> = []; - for (const [toolCallId, { toolName }] of inFlightTools) { + for (const [toolCallId, toolName] of inFlightTools) { const output = { type: "error-text" as const, value: `Tool execution aborted: ${reason}`, @@ -611,7 +610,7 @@ export class OffensiveSecurityAgent { }); } - if (syntheticParts.length === 0 || !this.messagesPath) return; + if (!this.messagesPath) return; const current = this.latestMessagesRef.current ?? []; const message: ModelMessage = { From cad3a53b004e13f159f6285a58d1a3c84f846bc2 Mon Sep 17 00:00:00 2001 From: Jorge Raad Date: Wed, 13 May 2026 20:50:53 -0400 Subject: [PATCH 03/11] fix(lint): use ToolResultPart type, format test file, trim comments --- .../offensiveSecurityAgent.test.ts | 6 +-- .../offSecAgent/offensiveSecurityAgent.ts | 52 ++++--------------- 2 files changed, 13 insertions(+), 45 deletions(-) diff --git a/src/core/agents/offSecAgent/offensiveSecurityAgent.test.ts b/src/core/agents/offSecAgent/offensiveSecurityAgent.test.ts index da9654a88..3e689937f 100644 --- a/src/core/agents/offSecAgent/offensiveSecurityAgent.test.ts +++ b/src/core/agents/offSecAgent/offensiveSecurityAgent.test.ts @@ -98,8 +98,6 @@ function buildStubAgent(overrides: { Object.defineProperty(agent, "resolveResult", { value: overrides.resolveResult, }); - // Fields read by emitSyntheticToolResults — null `messagesPath` makes the - // disk-write branch a no-op for tests that don't supply a real path. Object.defineProperty(agent, "latestMessagesRef", { value: { current: null }, }); @@ -339,7 +337,9 @@ describe("OffensiveSecurityAgent.consume()", () => { }); const emittedResults: Array<{ result: unknown }> = []; - agent.eventBus.on("tool-result", (e) => emittedResults.push({ result: e.result })); + agent.eventBus.on("tool-result", (e) => + emittedResults.push({ result: e.result }), + ); await expect(agent.consume()).rejects.toBeInstanceOf(DOMException); diff --git a/src/core/agents/offSecAgent/offensiveSecurityAgent.ts b/src/core/agents/offSecAgent/offensiveSecurityAgent.ts index 1df6d41d3..8a90590c5 100644 --- a/src/core/agents/offSecAgent/offensiveSecurityAgent.ts +++ b/src/core/agents/offSecAgent/offensiveSecurityAgent.ts @@ -3,6 +3,7 @@ import type { StopCondition, StreamTextResult, TextStreamPart, + ToolResultPart, ToolSet, } from "ai"; import { hasToolCall } from "ai"; @@ -117,16 +118,11 @@ export class OffensiveSecurityAgent { /** The session this agent is operating within. */ private readonly _session: SessionInfo; - /** - * Cumulative message history shared between the stream callbacks and - * `consume()`. Held as a ref so `consume()` can append synthetic - * tool-results in-place when the stream aborts mid-flight. - */ + /** Shared with stream callbacks so `consume()` can append synthetic tool-results on abort. */ private readonly latestMessagesRef: { current: ModelMessage[] | null } = { current: null, }; - /** Path to the on-disk `messages.json` for this agent's session. */ private messagesPath: string | null = null; /** @@ -510,12 +506,8 @@ export class OffensiveSecurityAgent { const sid = this.subagentId; const bus = this.eventBus; - // Track tool calls that have been announced (`tool-call`) but whose - // matching `tool-result` has not yet streamed. On abort / iterator - // throw we synthesize an `error-text` result for each so downstream - // consumers (UI, persistence, MQTT) see a terminal state per call - // rather than a hung "running" entry. - const inFlightTools = new Map(); // toolCallId -> toolName + // toolCallId -> toolName for calls awaiting a matching tool-result. + const inFlightTools = new Map(); let streamError: unknown = null; try { @@ -563,40 +555,21 @@ export class OffensiveSecurityAgent { return this.streamResult.response; } - /** - * Emit synthetic `tool-result` events for tool calls that were announced - * mid-stream but never received a real `tool-result` chunk (typically - * because the stream was aborted or threw). Also appends matching - * `tool-result` content parts to `latestMessagesRef` and writes the - * updated history to `messagesPath` so resumed sessions and on-disk - * replay see the same terminal state. - * - * Uses the SDK's `error-text` output variant — the same shape the SDK - * itself produces when a tool's `execute()` throws — so all downstream - * consumers can handle aborts identically to runtime exceptions. - */ private async emitSyntheticToolResults( inFlightTools: Map, reason: string, ): Promise { if (inFlightTools.size === 0) return; - const bus = this.eventBus; const sid = this.subagentId; - - const syntheticParts: Array<{ - type: "tool-result"; - toolCallId: string; - toolName: string; - output: { type: "error-text"; value: string }; - }> = []; + const syntheticParts: ToolResultPart[] = []; for (const [toolCallId, toolName] of inFlightTools) { const output = { type: "error-text" as const, value: `Tool execution aborted: ${reason}`, }; - bus.emit("tool-result", { + this.eventBus.emit("tool-result", { toolCallId, toolName, result: output, @@ -612,15 +585,10 @@ export class OffensiveSecurityAgent { if (!this.messagesPath) return; - const current = this.latestMessagesRef.current ?? []; - const message: ModelMessage = { - role: "tool", - // biome-ignore lint/suspicious/noExplicitAny: `tool-result` content parts - // use the SDK's ToolResultOutput discriminated union; the cast bypasses - // the broader ModelMessage shape inference. - content: syntheticParts as any, - }; - const next = [...current, message]; + const next: ModelMessage[] = [ + ...(this.latestMessagesRef.current ?? []), + { role: "tool", content: syntheticParts }, + ]; this.latestMessagesRef.current = next; await writeFile(this.messagesPath, JSON.stringify(next)).catch(() => {}); } From 8b93e8d3d21d425d7c6e1941b8a4fcb654f32c1a Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Thu, 28 May 2026 15:34:48 +0000 Subject: [PATCH 04/11] =?UTF-8?q?fix(offSecAgent):=20address=20bugbot=20re?= =?UTF-8?q?view=20=E2=80=94=20prevent=20history=20loss=20and=20error=20mas?= =?UTF-8?q?king?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. Persist timer fallback: when latestMessagesRef is null (debounced persist timer already flushed to disk), read back from messages.json before appending synthetic tool-results. This prevents overwriting the full conversation history with only synthetic parts. 2. Error masking: wrap emitSyntheticToolResults in try/catch within the finally block so a throwing event listener cannot mask the original stream error. 3. Use node: protocol for fs/path imports (lint fix). Co-authored-by: Jorge Alejandro Raad --- .../offensiveSecurityAgent.test.ts | 128 +++++++++++++++++- .../offSecAgent/offensiveSecurityAgent.ts | 26 +++- 2 files changed, 147 insertions(+), 7 deletions(-) diff --git a/src/core/agents/offSecAgent/offensiveSecurityAgent.test.ts b/src/core/agents/offSecAgent/offensiveSecurityAgent.test.ts index 3e689937f..0af78bb2c 100644 --- a/src/core/agents/offSecAgent/offensiveSecurityAgent.test.ts +++ b/src/core/agents/offSecAgent/offensiveSecurityAgent.test.ts @@ -7,6 +7,8 @@ * Heavy transitive dependencies (tools, AI SDK, zod) are stubbed so * the test loads cleanly without external provider keys. */ +import { mkdirSync, readFileSync, rmSync, writeFileSync } from "node:fs"; +import { join } from "node:path"; import { describe, expect, it, vi } from "vitest"; // --------------------------------------------------------------------------- @@ -76,6 +78,8 @@ function buildStubAgent(overrides: { persistentShell?: { dispose: () => void }; abortSignal?: AbortSignal; resolveResult?: (sr: unknown) => unknown; + messagesPath?: string | null; + latestMessagesRef?: { current: unknown[] | null }; }): OffensiveSecurityAgent { const agent = Object.create( OffensiveSecurityAgent.prototype, @@ -99,9 +103,11 @@ function buildStubAgent(overrides: { value: overrides.resolveResult, }); Object.defineProperty(agent, "latestMessagesRef", { - value: { current: null }, + value: overrides.latestMessagesRef ?? { current: null }, + }); + Object.defineProperty(agent, "messagesPath", { + value: overrides.messagesPath ?? null, }); - Object.defineProperty(agent, "messagesPath", { value: null }); return agent; } @@ -370,4 +376,122 @@ describe("OffensiveSecurityAgent.consume()", () => { expect(result).toBeUndefined(); }); }); + + describe("synthetic tool-result: persist timer fallback (bugbot fix)", () => { + const toolCallChunk = { + type: "tool-call", + toolCallId: "tc1", + toolName: "execute_command", + }; + + it("reads persisted messages from disk when latestMessagesRef is null", async () => { + const tmpDir = join("/tmp", `pensar-test-${Date.now()}-fallback`); + mkdirSync(tmpDir, { recursive: true }); + const messagesPath = join(tmpDir, "messages.json"); + + const existingMessages = [ + { role: "user", content: [{ type: "text", text: "run nmap" }] }, + { role: "assistant", content: [{ type: "text", text: "Running..." }] }, + ]; + writeFileSync(messagesPath, JSON.stringify(existingMessages)); + + // latestMessagesRef.current is null — simulates persist timer having flushed + const agent = buildStubAgent({ + fullStream: yieldThenThrow([toolCallChunk], new Error("timeout")), + messagesPath, + latestMessagesRef: { current: null }, + }); + + try { + await agent.consume(); + } catch {} + + // Wait for async writeFile + await new Promise((r) => setTimeout(r, 50)); + + const written = JSON.parse(readFileSync(messagesPath, "utf-8")); + // Should have original 2 messages + synthetic tool message + expect(written).toHaveLength(3); + expect(written[0].role).toBe("user"); + expect(written[1].role).toBe("assistant"); + expect(written[2].role).toBe("tool"); + expect(written[2].content[0].toolCallId).toBe("tc1"); + expect(written[2].content[0].output.type).toBe("error-text"); + + rmSync(tmpDir, { recursive: true, force: true }); + }); + + it("does not lose history when persist timer nulled latestMessagesRef", async () => { + const tmpDir = join("/tmp", `pensar-test-${Date.now()}-noloss`); + mkdirSync(tmpDir, { recursive: true }); + const messagesPath = join(tmpDir, "messages.json"); + + const existingMessages = [ + { role: "user", content: [{ type: "text", text: "scan target" }] }, + ]; + writeFileSync(messagesPath, JSON.stringify(existingMessages)); + + const agent = buildStubAgent({ + fullStream: yieldThenThrow([toolCallChunk], new Error("network")), + messagesPath, + latestMessagesRef: { current: null }, + }); + + try { + await agent.consume(); + } catch {} + + await new Promise((r) => setTimeout(r, 50)); + + const written = JSON.parse(readFileSync(messagesPath, "utf-8")); + // Must NOT be just [{ role: "tool" ... }] — must include original messages + expect(written.length).toBeGreaterThan(1); + expect(written[0].role).toBe("user"); + + rmSync(tmpDir, { recursive: true, force: true }); + }); + }); + + describe("emitSyntheticToolResults error does not mask stream error (bugbot fix)", () => { + const toolCallChunk = { + type: "tool-call", + toolCallId: "tc1", + toolName: "execute_command", + }; + + it("propagates original stream error when event listener throws", async () => { + const agent = buildStubAgent({ + fullStream: yieldThenThrow( + [toolCallChunk], + new Error("original stream error"), + ), + }); + + // Make the event listener throw during synthetic emission + agent.eventBus.on("tool-result", () => { + throw new Error("listener explosion"); + }); + + // The original stream error must propagate, not the listener error + await expect(agent.consume()).rejects.toThrow("original stream error"); + }); + + it("still disposes shell when emitSyntheticToolResults throws", async () => { + const dispose = vi.fn(); + const agent = buildStubAgent({ + fullStream: yieldThenThrow( + [toolCallChunk], + new Error("stream broke"), + ), + persistentShell: { dispose }, + }); + + agent.eventBus.on("tool-result", () => { + throw new Error("listener crash"); + }); + + await expect(agent.consume()).rejects.toThrow("stream broke"); + expect(dispose).toHaveBeenCalledOnce(); + }); + }); }); diff --git a/src/core/agents/offSecAgent/offensiveSecurityAgent.ts b/src/core/agents/offSecAgent/offensiveSecurityAgent.ts index 8a90590c5..b8a1576de 100644 --- a/src/core/agents/offSecAgent/offensiveSecurityAgent.ts +++ b/src/core/agents/offSecAgent/offensiveSecurityAgent.ts @@ -7,9 +7,9 @@ import type { ToolSet, } from "ai"; import { hasToolCall } from "ai"; -import { existsSync, mkdirSync } from "fs"; -import { writeFile } from "fs/promises"; -import { join } from "path"; +import { existsSync, mkdirSync, readFileSync } from "node:fs"; +import { writeFile } from "node:fs/promises"; +import { join } from "node:path"; import { streamResponse } from "../../ai"; import { AgentEventBus } from "../../eventBus"; import { @@ -528,7 +528,11 @@ export class OffensiveSecurityAgent { : streamError instanceof Error && streamError.message ? streamError.message : "Stream terminated unexpectedly"; - await this.emitSyntheticToolResults(inFlightTools, reason); + try { + await this.emitSyntheticToolResults(inFlightTools, reason); + } catch { + // Swallow — never mask the original streamError with a listener error. + } } this.persistentShell?.dispose(); } @@ -585,8 +589,20 @@ export class OffensiveSecurityAgent { if (!this.messagesPath) return; + // latestMessagesRef may be null if the debounced persist timer already + // flushed to disk. Fall back to reading the on-disk snapshot so we don't + // overwrite the full conversation with only the synthetic parts. + let base: ModelMessage[] = this.latestMessagesRef.current ?? []; + if (base.length === 0 && existsSync(this.messagesPath)) { + try { + base = JSON.parse(readFileSync(this.messagesPath, "utf-8")); + } catch { + // Corrupt or unreadable — proceed with empty base. + } + } + const next: ModelMessage[] = [ - ...(this.latestMessagesRef.current ?? []), + ...base, { role: "tool", content: syntheticParts }, ]; this.latestMessagesRef.current = next; From 5d1bc1216b65224d80f43ec5bb42312f9265839b Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Thu, 28 May 2026 15:46:49 +0000 Subject: [PATCH 05/11] refactor(offSecAgent): simplify synthetic tool-result implementation - Replace { current: ... } ref-box with plain nullable property - Remove redundant early-return guard (caller already checks) - Hoist output object outside the loop (same value for all tools) - Remove intermediate local alias for the ref Co-authored-by: Jorge Alejandro Raad --- .../offensiveSecurityAgent.test.ts | 17 +++--- .../offSecAgent/offensiveSecurityAgent.ts | 53 ++++++------------- 2 files changed, 25 insertions(+), 45 deletions(-) diff --git a/src/core/agents/offSecAgent/offensiveSecurityAgent.test.ts b/src/core/agents/offSecAgent/offensiveSecurityAgent.test.ts index 0af78bb2c..ba3f85983 100644 --- a/src/core/agents/offSecAgent/offensiveSecurityAgent.test.ts +++ b/src/core/agents/offSecAgent/offensiveSecurityAgent.test.ts @@ -79,7 +79,7 @@ function buildStubAgent(overrides: { abortSignal?: AbortSignal; resolveResult?: (sr: unknown) => unknown; messagesPath?: string | null; - latestMessagesRef?: { current: unknown[] | null }; + latestMessages?: unknown[] | null; }): OffensiveSecurityAgent { const agent = Object.create( OffensiveSecurityAgent.prototype, @@ -102,8 +102,9 @@ function buildStubAgent(overrides: { Object.defineProperty(agent, "resolveResult", { value: overrides.resolveResult, }); - Object.defineProperty(agent, "latestMessagesRef", { - value: overrides.latestMessagesRef ?? { current: null }, + Object.defineProperty(agent, "latestMessages", { + value: overrides.latestMessages ?? null, + writable: true, }); Object.defineProperty(agent, "messagesPath", { value: overrides.messagesPath ?? null, @@ -384,7 +385,7 @@ describe("OffensiveSecurityAgent.consume()", () => { toolName: "execute_command", }; - it("reads persisted messages from disk when latestMessagesRef is null", async () => { + it("reads persisted messages from disk when latestMessages is null", async () => { const tmpDir = join("/tmp", `pensar-test-${Date.now()}-fallback`); mkdirSync(tmpDir, { recursive: true }); const messagesPath = join(tmpDir, "messages.json"); @@ -395,11 +396,11 @@ describe("OffensiveSecurityAgent.consume()", () => { ]; writeFileSync(messagesPath, JSON.stringify(existingMessages)); - // latestMessagesRef.current is null — simulates persist timer having flushed + // latestMessages is null — simulates persist timer having flushed const agent = buildStubAgent({ fullStream: yieldThenThrow([toolCallChunk], new Error("timeout")), messagesPath, - latestMessagesRef: { current: null }, + latestMessages: null, }); try { @@ -421,7 +422,7 @@ describe("OffensiveSecurityAgent.consume()", () => { rmSync(tmpDir, { recursive: true, force: true }); }); - it("does not lose history when persist timer nulled latestMessagesRef", async () => { + it("does not lose history when persist timer nulled latestMessages", async () => { const tmpDir = join("/tmp", `pensar-test-${Date.now()}-noloss`); mkdirSync(tmpDir, { recursive: true }); const messagesPath = join(tmpDir, "messages.json"); @@ -434,7 +435,7 @@ describe("OffensiveSecurityAgent.consume()", () => { const agent = buildStubAgent({ fullStream: yieldThenThrow([toolCallChunk], new Error("network")), messagesPath, - latestMessagesRef: { current: null }, + latestMessages: null, }); try { diff --git a/src/core/agents/offSecAgent/offensiveSecurityAgent.ts b/src/core/agents/offSecAgent/offensiveSecurityAgent.ts index b8a1576de..a8ab5130e 100644 --- a/src/core/agents/offSecAgent/offensiveSecurityAgent.ts +++ b/src/core/agents/offSecAgent/offensiveSecurityAgent.ts @@ -118,10 +118,8 @@ export class OffensiveSecurityAgent { /** The session this agent is operating within. */ private readonly _session: SessionInfo; - /** Shared with stream callbacks so `consume()` can append synthetic tool-results on abort. */ - private readonly latestMessagesRef: { current: ModelMessage[] | null } = { - current: null, - }; + /** Latest accumulated messages — shared between stream callbacks and `consume()` for abort persistence. */ + private latestMessages: ModelMessage[] | null = null; private messagesPath: string | null = null; @@ -355,15 +353,14 @@ export class OffensiveSecurityAgent { // JSON.stringify on every step when many agents run concurrently. const PERSIST_INTERVAL_MS = 15_000; let persistTimer: ReturnType | null = null; - const latestMessagesRef = this.latestMessagesRef; const schedulePersist = () => { if (persistTimer) return; persistTimer = setTimeout(() => { persistTimer = null; - if (latestMessagesRef.current) { - const toWrite = latestMessagesRef.current; - latestMessagesRef.current = null; + if (this.latestMessages) { + const toWrite = this.latestMessages; + this.latestMessages = null; writeFile(messagesPath, JSON.stringify(toWrite)).catch(() => {}); } }, PERSIST_INTERVAL_MS); @@ -409,7 +406,7 @@ export class OffensiveSecurityAgent { toolChoice: "auto", sessionPath: input.session.rootPath, onStepFinish: async (event) => { - latestMessagesRef.current = [ + this.latestMessages = [ ...initialMessagesRef.current, ...event.response.messages, ]; @@ -438,7 +435,7 @@ export class OffensiveSecurityAgent { clearTimeout(persistTimer); persistTimer = null; } - const finalMessages = latestMessagesRef.current ?? [ + const finalMessages = this.latestMessages ?? [ ...initialMessagesRef.current, ...event.response.messages, ]; @@ -563,49 +560,31 @@ export class OffensiveSecurityAgent { inFlightTools: Map, reason: string, ): Promise { - if (inFlightTools.size === 0) return; - const sid = this.subagentId; + const output = { type: "error-text" as const, value: `Tool execution aborted: ${reason}` }; const syntheticParts: ToolResultPart[] = []; for (const [toolCallId, toolName] of inFlightTools) { - const output = { - type: "error-text" as const, - value: `Tool execution aborted: ${reason}`, - }; - this.eventBus.emit("tool-result", { - toolCallId, - toolName, - result: output, - subagentId: sid, - }); - syntheticParts.push({ - type: "tool-result", - toolCallId, - toolName, - output, - }); + this.eventBus.emit("tool-result", { toolCallId, toolName, result: output, subagentId: sid }); + syntheticParts.push({ type: "tool-result", toolCallId, toolName, output }); } if (!this.messagesPath) return; - // latestMessagesRef may be null if the debounced persist timer already - // flushed to disk. Fall back to reading the on-disk snapshot so we don't + // latestMessages may be null if the debounced persist timer already + // flushed to disk. Fall back to the on-disk snapshot so we don't // overwrite the full conversation with only the synthetic parts. - let base: ModelMessage[] = this.latestMessagesRef.current ?? []; + let base: ModelMessage[] = this.latestMessages ?? []; if (base.length === 0 && existsSync(this.messagesPath)) { try { base = JSON.parse(readFileSync(this.messagesPath, "utf-8")); } catch { - // Corrupt or unreadable — proceed with empty base. + // Best-effort: corrupt file → proceed with empty base. } } - const next: ModelMessage[] = [ - ...base, - { role: "tool", content: syntheticParts }, - ]; - this.latestMessagesRef.current = next; + const next: ModelMessage[] = [...base, { role: "tool", content: syntheticParts }]; + this.latestMessages = next; await writeFile(this.messagesPath, JSON.stringify(next)).catch(() => {}); } } From 42d2c9b3591aaf14a3b9705d14199c246002634d Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Thu, 28 May 2026 15:51:55 +0000 Subject: [PATCH 06/11] style: fix formatting and import ordering (biome) Co-authored-by: Jorge Alejandro Raad --- .../offensiveSecurityAgent.test.ts | 5 +--- .../offSecAgent/offensiveSecurityAgent.ts | 30 ++++++++++++++----- 2 files changed, 24 insertions(+), 11 deletions(-) diff --git a/src/core/agents/offSecAgent/offensiveSecurityAgent.test.ts b/src/core/agents/offSecAgent/offensiveSecurityAgent.test.ts index ba3f85983..0bc9a2aee 100644 --- a/src/core/agents/offSecAgent/offensiveSecurityAgent.test.ts +++ b/src/core/agents/offSecAgent/offensiveSecurityAgent.test.ts @@ -480,10 +480,7 @@ describe("OffensiveSecurityAgent.consume()", () => { it("still disposes shell when emitSyntheticToolResults throws", async () => { const dispose = vi.fn(); const agent = buildStubAgent({ - fullStream: yieldThenThrow( - [toolCallChunk], - new Error("stream broke"), - ), + fullStream: yieldThenThrow([toolCallChunk], new Error("stream broke")), persistentShell: { dispose }, }); diff --git a/src/core/agents/offSecAgent/offensiveSecurityAgent.ts b/src/core/agents/offSecAgent/offensiveSecurityAgent.ts index a8ab5130e..2c6a17b84 100644 --- a/src/core/agents/offSecAgent/offensiveSecurityAgent.ts +++ b/src/core/agents/offSecAgent/offensiveSecurityAgent.ts @@ -1,3 +1,6 @@ +import { existsSync, mkdirSync, readFileSync } from "node:fs"; +import { writeFile } from "node:fs/promises"; +import { join } from "node:path"; import type { ModelMessage, StopCondition, @@ -7,9 +10,6 @@ import type { ToolSet, } from "ai"; import { hasToolCall } from "ai"; -import { existsSync, mkdirSync, readFileSync } from "node:fs"; -import { writeFile } from "node:fs/promises"; -import { join } from "node:path"; import { streamResponse } from "../../ai"; import { AgentEventBus } from "../../eventBus"; import { @@ -561,12 +561,25 @@ export class OffensiveSecurityAgent { reason: string, ): Promise { const sid = this.subagentId; - const output = { type: "error-text" as const, value: `Tool execution aborted: ${reason}` }; + const output = { + type: "error-text" as const, + value: `Tool execution aborted: ${reason}`, + }; const syntheticParts: ToolResultPart[] = []; for (const [toolCallId, toolName] of inFlightTools) { - this.eventBus.emit("tool-result", { toolCallId, toolName, result: output, subagentId: sid }); - syntheticParts.push({ type: "tool-result", toolCallId, toolName, output }); + this.eventBus.emit("tool-result", { + toolCallId, + toolName, + result: output, + subagentId: sid, + }); + syntheticParts.push({ + type: "tool-result", + toolCallId, + toolName, + output, + }); } if (!this.messagesPath) return; @@ -583,7 +596,10 @@ export class OffensiveSecurityAgent { } } - const next: ModelMessage[] = [...base, { role: "tool", content: syntheticParts }]; + const next: ModelMessage[] = [ + ...base, + { role: "tool", content: syntheticParts }, + ]; this.latestMessages = next; await writeFile(this.messagesPath, JSON.stringify(next)).catch(() => {}); } From e665eb3f6562bd8ed3aef9193ddfa4d3ac1763cb Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Mon, 1 Jun 2026 13:27:04 +0000 Subject: [PATCH 07/11] fix(offSecAgent): prevent orphan tool-results and persist timer race MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. Orphan tool-results: when the stream aborts mid-step before onStepFinish fires, the assistant message with tool-calls isn't persisted yet. Now emitSyntheticToolResults checks whether the base messages already contain the in-flight tool-calls; if not, it prepends a partial assistant message so resumed sessions see a valid tool-call → tool-result pair. 2. Persist timer race: cancel the debounced persist timer before writing synthetic results, preventing a fire-and-forget writeFile from racing with (and potentially overwriting) the abort write. Adds cancelPersistTimer instance method and baseContainsToolCalls helper. New test covers the no-duplicate-assistant-message case. Co-authored-by: Kyle Ryan --- .../offensiveSecurityAgent.test.ts | 59 ++++++++++++++++-- .../offSecAgent/offensiveSecurityAgent.ts | 60 +++++++++++++++++-- 2 files changed, 111 insertions(+), 8 deletions(-) diff --git a/src/core/agents/offSecAgent/offensiveSecurityAgent.test.ts b/src/core/agents/offSecAgent/offensiveSecurityAgent.test.ts index 0bc9a2aee..76a930efd 100644 --- a/src/core/agents/offSecAgent/offensiveSecurityAgent.test.ts +++ b/src/core/agents/offSecAgent/offensiveSecurityAgent.test.ts @@ -109,6 +109,9 @@ function buildStubAgent(overrides: { Object.defineProperty(agent, "messagesPath", { value: overrides.messagesPath ?? null, }); + Object.defineProperty(agent, "cancelPersistTimer", { + value: () => {}, + }); return agent; } @@ -411,13 +414,16 @@ describe("OffensiveSecurityAgent.consume()", () => { await new Promise((r) => setTimeout(r, 50)); const written = JSON.parse(readFileSync(messagesPath, "utf-8")); - // Should have original 2 messages + synthetic tool message - expect(written).toHaveLength(3); + // original 2 + assistant (tool-call) + tool (synthetic result) + expect(written).toHaveLength(4); expect(written[0].role).toBe("user"); expect(written[1].role).toBe("assistant"); - expect(written[2].role).toBe("tool"); + expect(written[2].role).toBe("assistant"); + expect(written[2].content[0].type).toBe("tool-call"); expect(written[2].content[0].toolCallId).toBe("tc1"); - expect(written[2].content[0].output.type).toBe("error-text"); + expect(written[3].role).toBe("tool"); + expect(written[3].content[0].toolCallId).toBe("tc1"); + expect(written[3].content[0].output.type).toBe("error-text"); rmSync(tmpDir, { recursive: true, force: true }); }); @@ -451,6 +457,51 @@ describe("OffensiveSecurityAgent.consume()", () => { rmSync(tmpDir, { recursive: true, force: true }); }); + + it("does not add assistant tool-call msg when base already contains them", async () => { + const tmpDir = join("/tmp", `pensar-test-${Date.now()}-nodup`); + mkdirSync(tmpDir, { recursive: true }); + const messagesPath = join(tmpDir, "messages.json"); + + // Simulate onStepFinish having already persisted the assistant tool-call + const existingMessages = [ + { role: "user", content: [{ type: "text", text: "scan" }] }, + { + role: "assistant", + content: [ + { + type: "tool-call", + toolCallId: "tc1", + toolName: "execute_command", + input: {}, + }, + ], + }, + ]; + writeFileSync(messagesPath, JSON.stringify(existingMessages)); + + const agent = buildStubAgent({ + fullStream: yieldThenThrow([toolCallChunk], new Error("abort")), + messagesPath, + latestMessages: null, + }); + + try { + await agent.consume(); + } catch {} + + await new Promise((r) => setTimeout(r, 50)); + + const written = JSON.parse(readFileSync(messagesPath, "utf-8")); + // Should NOT duplicate the assistant message — just user + assistant + tool + expect(written).toHaveLength(3); + expect(written[0].role).toBe("user"); + expect(written[1].role).toBe("assistant"); + expect(written[2].role).toBe("tool"); + expect(written[2].content[0].output.type).toBe("error-text"); + + rmSync(tmpDir, { recursive: true, force: true }); + }); }); describe("emitSyntheticToolResults error does not mask stream error (bugbot fix)", () => { diff --git a/src/core/agents/offSecAgent/offensiveSecurityAgent.ts b/src/core/agents/offSecAgent/offensiveSecurityAgent.ts index 2c6a17b84..341b59bdd 100644 --- a/src/core/agents/offSecAgent/offensiveSecurityAgent.ts +++ b/src/core/agents/offSecAgent/offensiveSecurityAgent.ts @@ -6,6 +6,7 @@ import type { StopCondition, StreamTextResult, TextStreamPart, + ToolCallPart, ToolResultPart, ToolSet, } from "ai"; @@ -123,6 +124,9 @@ export class OffensiveSecurityAgent { private messagesPath: string | null = null; + /** Cancels and flushes the debounced persist timer so abort writes don't race. */ + private cancelPersistTimer: (() => void) | null = null; + /** * Async factory that creates a session when one is not provided, * then constructs the agent. Use this instead of `new` when you @@ -366,6 +370,13 @@ export class OffensiveSecurityAgent { }, PERSIST_INTERVAL_MS); }; + this.cancelPersistTimer = () => { + if (persistTimer) { + clearTimeout(persistTimer); + persistTimer = null; + } + }; + // -- Init record (trace.jsonl first line) --------------------------------- // Hash only the base system prompt (excluding session workspace paths) // so the hash is stable across runs with identical prompt versions. @@ -584,6 +595,10 @@ export class OffensiveSecurityAgent { if (!this.messagesPath) return; + // Cancel the debounced persist timer so its fire-and-forget writeFile + // cannot race with the write below. + this.cancelPersistTimer?.(); + // latestMessages may be null if the debounced persist timer already // flushed to disk. Fall back to the on-disk snapshot so we don't // overwrite the full conversation with only the synthetic parts. @@ -596,13 +611,50 @@ export class OffensiveSecurityAgent { } } - const next: ModelMessage[] = [ - ...base, - { role: "tool", content: syntheticParts }, - ]; + // If the last message in base is NOT an assistant message containing + // these tool-calls, the current step's assistant turn hasn't been + // persisted yet (onStepFinish never fired). Reconstruct it so + // resumed sessions see a valid tool-call → tool-result pair. + const lastMsg = base[base.length - 1]; + const needsAssistantMsg = + !lastMsg || + lastMsg.role !== "assistant" || + !this.baseContainsToolCalls(lastMsg, inFlightTools); + + const appended: ModelMessage[] = []; + if (needsAssistantMsg) { + const toolCalls: ToolCallPart[] = [...inFlightTools.entries()].map( + ([toolCallId, toolName]) => ({ + type: "tool-call" as const, + toolCallId, + toolName, + input: {}, + }), + ); + appended.push({ role: "assistant", content: toolCalls }); + } + appended.push({ role: "tool", content: syntheticParts }); + + const next: ModelMessage[] = [...base, ...appended]; this.latestMessages = next; await writeFile(this.messagesPath, JSON.stringify(next)).catch(() => {}); } + + private baseContainsToolCalls( + msg: ModelMessage, + inFlightTools: Map, + ): boolean { + if (!Array.isArray(msg.content)) return false; + const contentToolIds = new Set( + (msg.content as Array<{ type: string; toolCallId?: string }>) + .filter((p) => p.type === "tool-call" && p.toolCallId) + .map((p) => p.toolCallId), + ); + for (const toolCallId of inFlightTools.keys()) { + if (!contentToolIds.has(toolCallId)) return false; + } + return true; + } } // These tools pause the agent and surface their own UI to the operator. From fd0ea0e2a00af2f65c640c119916c67a20a3b269 Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Tue, 2 Jun 2026 17:53:11 +0000 Subject: [PATCH 08/11] =?UTF-8?q?fix(offSecAgent):=20address=20bugbot=20?= =?UTF-8?q?=E2=80=94=20dispose=20shell=20first,=20guard=20onFinish=20race?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. Shell dispose no longer blocked on persist: moved dispose() to the top of the finally block so it runs immediately after the stream stops, before any async persistence I/O. 2. onFinish clobber guard: added syntheticsPersisted flag set after emitSyntheticToolResults writes. onFinish checks this flag and skips its write when synthetics already persisted the correct snapshot, preventing a stale overwrite. 3. Clarified that completed tools (removed from inFlightTools) are already persisted via onStepFinish — only genuinely orphaned in-flight calls get synthetic results. Co-authored-by: Kyle Ryan --- .../offSecAgent/offensiveSecurityAgent.ts | 29 ++++++++++++++----- 1 file changed, 21 insertions(+), 8 deletions(-) diff --git a/src/core/agents/offSecAgent/offensiveSecurityAgent.ts b/src/core/agents/offSecAgent/offensiveSecurityAgent.ts index 341b59bdd..42d874725 100644 --- a/src/core/agents/offSecAgent/offensiveSecurityAgent.ts +++ b/src/core/agents/offSecAgent/offensiveSecurityAgent.ts @@ -127,6 +127,9 @@ export class OffensiveSecurityAgent { /** Cancels and flushes the debounced persist timer so abort writes don't race. */ private cancelPersistTimer: (() => void) | null = null; + /** Set by emitSyntheticToolResults so onFinish skips its write (synthetics already persisted). */ + private syntheticsPersisted = false; + /** * Async factory that creates a session when one is not provided, * then constructs the agent. Use this instead of `new` when you @@ -446,13 +449,17 @@ export class OffensiveSecurityAgent { clearTimeout(persistTimer); persistTimer = null; } - const finalMessages = this.latestMessages ?? [ - ...initialMessagesRef.current, - ...event.response.messages, - ]; - await writeFile(messagesPath, JSON.stringify(finalMessages)).catch( - () => {}, - ); + // If emitSyntheticToolResults already wrote the abort snapshot, + // skip this write to avoid clobbering it with a stale version. + if (!this.syntheticsPersisted) { + const finalMessages = this.latestMessages ?? [ + ...initialMessagesRef.current, + ...event.response.messages, + ]; + await writeFile(messagesPath, JSON.stringify(finalMessages)).catch( + () => {}, + ); + } await input.onFinish?.(event); }, abortSignal: input.abortSignal, @@ -515,6 +522,9 @@ export class OffensiveSecurityAgent { const bus = this.eventBus; // toolCallId -> toolName for calls awaiting a matching tool-result. + // Completed tools (those that received a real tool-result) are removed + // and already persisted in latestMessages via onStepFinish — only + // genuinely orphaned calls need synthetic results on abort. const inFlightTools = new Map(); let streamError: unknown = null; @@ -530,6 +540,9 @@ export class OffensiveSecurityAgent { } catch (err) { streamError = err; } finally { + // Dispose shell immediately — don't block on persistence I/O. + this.persistentShell?.dispose(); + if (inFlightTools.size > 0) { const reason = this.abortSignal?.aborted ? "Agent aborted by user" @@ -542,7 +555,6 @@ export class OffensiveSecurityAgent { // Swallow — never mask the original streamError with a listener error. } } - this.persistentShell?.dispose(); } if (streamError) { @@ -638,6 +650,7 @@ export class OffensiveSecurityAgent { const next: ModelMessage[] = [...base, ...appended]; this.latestMessages = next; await writeFile(this.messagesPath, JSON.stringify(next)).catch(() => {}); + this.syntheticsPersisted = true; } private baseContainsToolCalls( From e9d444e940ca5eef950a453d0228fa50705f6530 Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Tue, 2 Jun 2026 18:25:36 +0000 Subject: [PATCH 09/11] fix(offSecAgent): persist completed tool results on parallel-tool abort MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When multiple tools run in parallel and the stream aborts mid-step (before onStepFinish fires), tools that already received a real tool-result had their results lost — only in-flight tools got synthetic results. Now consume() captures completed tool results alongside in-flight tracking. On abort, emitSyntheticToolResults reconstructs the full step: assistant message with ALL tool-call entries + a combined tool message with real results for completed tools and synthetic error-text for in-flight ones. Adds test covering the parallel-tool abort scenario. Co-authored-by: Kyle Ryan --- .../offensiveSecurityAgent.test.ts | 76 +++++++++++++++++++ .../offSecAgent/offensiveSecurityAgent.ts | 75 +++++++++++++----- 2 files changed, 131 insertions(+), 20 deletions(-) diff --git a/src/core/agents/offSecAgent/offensiveSecurityAgent.test.ts b/src/core/agents/offSecAgent/offensiveSecurityAgent.test.ts index 76a930efd..a3331513d 100644 --- a/src/core/agents/offSecAgent/offensiveSecurityAgent.test.ts +++ b/src/core/agents/offSecAgent/offensiveSecurityAgent.test.ts @@ -504,6 +504,82 @@ describe("OffensiveSecurityAgent.consume()", () => { }); }); + describe("parallel tool abort persists completed tools (bugbot fix)", () => { + it("includes completed tool results alongside synthetics on abort", async () => { + const tmpDir = join("/tmp", `pensar-test-${Date.now()}-parallel`); + mkdirSync(tmpDir, { recursive: true }); + const messagesPath = join(tmpDir, "messages.json"); + + const existingMessages = [ + { role: "user", content: [{ type: "text", text: "scan" }] }, + ]; + writeFileSync(messagesPath, JSON.stringify(existingMessages)); + + // Two tools called; tool A completes, tool B is still in-flight on abort. + const toolCallA = { + type: "tool-call", + toolCallId: "tcA", + toolName: "read_file", + }; + const toolCallB = { + type: "tool-call", + toolCallId: "tcB", + toolName: "execute_command", + }; + const toolResultA = { + type: "tool-result", + toolCallId: "tcA", + toolName: "read_file", + result: { type: "text", value: "file contents" }, + }; + + const agent = buildStubAgent({ + fullStream: yieldThenThrow( + [toolCallA, toolCallB, toolResultA], + new Error("connection lost"), + ), + messagesPath, + latestMessages: null, + }); + + try { + await agent.consume(); + } catch {} + + await new Promise((r) => setTimeout(r, 50)); + + const written = JSON.parse(readFileSync(messagesPath, "utf-8")); + // user + assistant (with both tool-calls) + tool (with both results) + expect(written).toHaveLength(3); + expect(written[0].role).toBe("user"); + expect(written[1].role).toBe("assistant"); + expect(written[1].content).toHaveLength(2); + const assistantToolIds = written[1].content.map( + (c: { toolCallId: string }) => c.toolCallId, + ); + expect(assistantToolIds.sort()).toEqual(["tcA", "tcB"]); + + expect(written[2].role).toBe("tool"); + expect(written[2].content).toHaveLength(2); + const toolResultIds = written[2].content.map( + (c: { toolCallId: string }) => c.toolCallId, + ); + expect(toolResultIds.sort()).toEqual(["tcA", "tcB"]); + + // Tool A has real result, Tool B has synthetic error-text + const resultA = written[2].content.find( + (c: { toolCallId: string }) => c.toolCallId === "tcA", + ); + const resultB = written[2].content.find( + (c: { toolCallId: string }) => c.toolCallId === "tcB", + ); + expect(resultA.output.type).toBe("text"); + expect(resultB.output.type).toBe("error-text"); + + rmSync(tmpDir, { recursive: true, force: true }); + }); + }); + describe("emitSyntheticToolResults error does not mask stream error (bugbot fix)", () => { const toolCallChunk = { type: "tool-call", diff --git a/src/core/agents/offSecAgent/offensiveSecurityAgent.ts b/src/core/agents/offSecAgent/offensiveSecurityAgent.ts index 42d874725..c0bbee488 100644 --- a/src/core/agents/offSecAgent/offensiveSecurityAgent.ts +++ b/src/core/agents/offSecAgent/offensiveSecurityAgent.ts @@ -521,11 +521,12 @@ export class OffensiveSecurityAgent { const sid = this.subagentId; const bus = this.eventBus; - // toolCallId -> toolName for calls awaiting a matching tool-result. - // Completed tools (those that received a real tool-result) are removed - // and already persisted in latestMessages via onStepFinish — only - // genuinely orphaned calls need synthetic results on abort. + // Track all tool calls in the current step so we can reconstruct a + // complete step on abort. inFlightTools holds calls still awaiting a + // result; completedResults holds results already streamed but not yet + // persisted (onStepFinish hasn't fired for this step). const inFlightTools = new Map(); + const completedResults: ToolResultPart[] = []; let streamError: unknown = null; try { @@ -533,10 +534,24 @@ export class OffensiveSecurityAgent { if (chunk.type === "tool-call") { inFlightTools.set(chunk.toolCallId, chunk.toolName); } else if (chunk.type === "tool-result") { - inFlightTools.delete(chunk.toolCallId); + const tc = chunk as { + toolCallId: string; + toolName: string; + result?: unknown; + output?: unknown; + }; + inFlightTools.delete(tc.toolCallId); + completedResults.push({ + type: "tool-result", + toolCallId: tc.toolCallId, + toolName: tc.toolName, + output: (tc.result ?? tc.output) as ToolResultPart["output"], + }); } bus.emitStreamPart(chunk, sid); } + // Stream completed normally — onStepFinish persists everything. + completedResults.length = 0; } catch (err) { streamError = err; } finally { @@ -550,7 +565,11 @@ export class OffensiveSecurityAgent { ? streamError.message : "Stream terminated unexpectedly"; try { - await this.emitSyntheticToolResults(inFlightTools, reason); + await this.emitSyntheticToolResults( + inFlightTools, + completedResults, + reason, + ); } catch { // Swallow — never mask the original streamError with a listener error. } @@ -581,6 +600,7 @@ export class OffensiveSecurityAgent { private async emitSyntheticToolResults( inFlightTools: Map, + completedResults: ToolResultPart[], reason: string, ): Promise { const sid = this.subagentId; @@ -623,29 +643,44 @@ export class OffensiveSecurityAgent { } } - // If the last message in base is NOT an assistant message containing - // these tool-calls, the current step's assistant turn hasn't been - // persisted yet (onStepFinish never fired). Reconstruct it so - // resumed sessions see a valid tool-call → tool-result pair. + // If onStepFinish never fired for the current step, the assistant + // message (with tool-calls) and any completed tool results within + // this step aren't in base yet. Reconstruct the full step so resumed + // sessions see valid tool-call → tool-result pairs for ALL tools. + const allToolCallIds = new Set([ + ...inFlightTools.keys(), + ...completedResults.map((r) => r.toolCallId), + ]); const lastMsg = base[base.length - 1]; - const needsAssistantMsg = + const needsStepReconstruction = !lastMsg || lastMsg.role !== "assistant" || - !this.baseContainsToolCalls(lastMsg, inFlightTools); + !this.baseContainsToolCalls(lastMsg, allToolCallIds); const appended: ModelMessage[] = []; - if (needsAssistantMsg) { - const toolCalls: ToolCallPart[] = [...inFlightTools.entries()].map( - ([toolCallId, toolName]) => ({ + if (needsStepReconstruction) { + // Reconstruct assistant message with ALL tool calls from this step. + const toolCalls: ToolCallPart[] = [ + ...[...inFlightTools.entries()].map(([toolCallId, toolName]) => ({ type: "tool-call" as const, toolCallId, toolName, input: {}, - }), - ); + })), + ...completedResults.map((r) => ({ + type: "tool-call" as const, + toolCallId: r.toolCallId, + toolName: r.toolName, + input: {}, + })), + ]; appended.push({ role: "assistant", content: toolCalls }); } - appended.push({ role: "tool", content: syntheticParts }); + // Combine completed + synthetic results into a single tool message. + appended.push({ + role: "tool", + content: [...completedResults, ...syntheticParts], + }); const next: ModelMessage[] = [...base, ...appended]; this.latestMessages = next; @@ -655,7 +690,7 @@ export class OffensiveSecurityAgent { private baseContainsToolCalls( msg: ModelMessage, - inFlightTools: Map, + toolCallIds: Set, ): boolean { if (!Array.isArray(msg.content)) return false; const contentToolIds = new Set( @@ -663,7 +698,7 @@ export class OffensiveSecurityAgent { .filter((p) => p.type === "tool-call" && p.toolCallId) .map((p) => p.toolCallId), ); - for (const toolCallId of inFlightTools.keys()) { + for (const toolCallId of toolCallIds) { if (!contentToolIds.has(toolCallId)) return false; } return true; From 9806bdfa150d95b06ce529eeca31df280f4b4869 Mon Sep 17 00:00:00 2001 From: Jorge Raad Date: Tue, 2 Jun 2026 14:34:39 -0400 Subject: [PATCH 10/11] refactor(offSecAgent): simplify abort reconstruction and trim comments - Unify in-flight + completed tool calls into a single map when reconstructing the assistant message on abort - Use Array.every for baseContainsToolCalls - Condense verbose comments added for synthetic tool-result handling --- .../offSecAgent/offensiveSecurityAgent.ts | 65 ++++++++----------- 1 file changed, 27 insertions(+), 38 deletions(-) diff --git a/src/core/agents/offSecAgent/offensiveSecurityAgent.ts b/src/core/agents/offSecAgent/offensiveSecurityAgent.ts index c0bbee488..86808dc8d 100644 --- a/src/core/agents/offSecAgent/offensiveSecurityAgent.ts +++ b/src/core/agents/offSecAgent/offensiveSecurityAgent.ts @@ -119,15 +119,14 @@ export class OffensiveSecurityAgent { /** The session this agent is operating within. */ private readonly _session: SessionInfo; - /** Latest accumulated messages — shared between stream callbacks and `consume()` for abort persistence. */ + /** Latest accumulated messages, shared with `consume()` for abort persistence. */ private latestMessages: ModelMessage[] | null = null; private messagesPath: string | null = null; - /** Cancels and flushes the debounced persist timer so abort writes don't race. */ + /** Cancels the debounced persist timer so abort writes don't race it. */ private cancelPersistTimer: (() => void) | null = null; - /** Set by emitSyntheticToolResults so onFinish skips its write (synthetics already persisted). */ private syntheticsPersisted = false; /** @@ -449,8 +448,7 @@ export class OffensiveSecurityAgent { clearTimeout(persistTimer); persistTimer = null; } - // If emitSyntheticToolResults already wrote the abort snapshot, - // skip this write to avoid clobbering it with a stale version. + // Skip if emitSyntheticToolResults already wrote the abort snapshot. if (!this.syntheticsPersisted) { const finalMessages = this.latestMessages ?? [ ...initialMessagesRef.current, @@ -521,10 +519,9 @@ export class OffensiveSecurityAgent { const sid = this.subagentId; const bus = this.eventBus; - // Track all tool calls in the current step so we can reconstruct a - // complete step on abort. inFlightTools holds calls still awaiting a - // result; completedResults holds results already streamed but not yet - // persisted (onStepFinish hasn't fired for this step). + // Tracks the current step so it can be reconstructed on abort: + // inFlightTools = calls still awaiting a result, completedResults = + // results already streamed but not yet persisted by onStepFinish. const inFlightTools = new Map(); const completedResults: ToolResultPart[] = []; let streamError: unknown = null; @@ -550,12 +547,12 @@ export class OffensiveSecurityAgent { } bus.emitStreamPart(chunk, sid); } - // Stream completed normally — onStepFinish persists everything. + // Completed normally — onStepFinish persists everything. completedResults.length = 0; } catch (err) { streamError = err; } finally { - // Dispose shell immediately — don't block on persistence I/O. + // Dispose first — don't block on persistence I/O. this.persistentShell?.dispose(); if (inFlightTools.size > 0) { @@ -571,7 +568,7 @@ export class OffensiveSecurityAgent { reason, ); } catch { - // Swallow — never mask the original streamError with a listener error. + // Never mask the original streamError with a listener error. } } } @@ -627,26 +624,22 @@ export class OffensiveSecurityAgent { if (!this.messagesPath) return; - // Cancel the debounced persist timer so its fire-and-forget writeFile - // cannot race with the write below. + // Cancel the debounced timer so its writeFile can't race the write below. this.cancelPersistTimer?.(); - // latestMessages may be null if the debounced persist timer already - // flushed to disk. Fall back to the on-disk snapshot so we don't - // overwrite the full conversation with only the synthetic parts. + // latestMessages is null once the debounced timer has flushed; fall back + // to the on-disk snapshot so we don't overwrite history with synthetics. let base: ModelMessage[] = this.latestMessages ?? []; if (base.length === 0 && existsSync(this.messagesPath)) { try { base = JSON.parse(readFileSync(this.messagesPath, "utf-8")); } catch { - // Best-effort: corrupt file → proceed with empty base. + // Corrupt file → proceed with empty base. } } - // If onStepFinish never fired for the current step, the assistant - // message (with tool-calls) and any completed tool results within - // this step aren't in base yet. Reconstruct the full step so resumed - // sessions see valid tool-call → tool-result pairs for ALL tools. + // When onStepFinish hasn't fired, this step's assistant + tool messages + // aren't in base yet; reconstruct it so resumed sessions see valid pairs. const allToolCallIds = new Set([ ...inFlightTools.keys(), ...completedResults.map((r) => r.toolCallId), @@ -659,24 +652,23 @@ export class OffensiveSecurityAgent { const appended: ModelMessage[] = []; if (needsStepReconstruction) { - // Reconstruct assistant message with ALL tool calls from this step. - const toolCalls: ToolCallPart[] = [ - ...[...inFlightTools.entries()].map(([toolCallId, toolName]) => ({ + const stepTools: Array<[string, string]> = [ + ...inFlightTools, + ...completedResults.map((r): [string, string] => [ + r.toolCallId, + r.toolName, + ]), + ]; + const toolCalls: ToolCallPart[] = stepTools.map( + ([toolCallId, toolName]) => ({ type: "tool-call" as const, toolCallId, toolName, input: {}, - })), - ...completedResults.map((r) => ({ - type: "tool-call" as const, - toolCallId: r.toolCallId, - toolName: r.toolName, - input: {}, - })), - ]; + }), + ); appended.push({ role: "assistant", content: toolCalls }); } - // Combine completed + synthetic results into a single tool message. appended.push({ role: "tool", content: [...completedResults, ...syntheticParts], @@ -698,10 +690,7 @@ export class OffensiveSecurityAgent { .filter((p) => p.type === "tool-call" && p.toolCallId) .map((p) => p.toolCallId), ); - for (const toolCallId of toolCallIds) { - if (!contentToolIds.has(toolCallId)) return false; - } - return true; + return [...toolCallIds].every((id) => contentToolIds.has(id)); } } From 6f3aabee379d5ebd6906937d5c55c03a291017cc Mon Sep 17 00:00:00 2001 From: Jorge Raad Date: Tue, 2 Jun 2026 14:40:50 -0400 Subject: [PATCH 11/11] fix(offSecAgent): clear completed results per step and gate persisted flag on write success - Clear completedResults at each finish-step so a later-step abort no longer re-appends earlier steps' tool calls/results into the snapshot (duplicated/corrupted resume history) - Only set syntheticsPersisted after a successful writeFile, so a failed abort-snapshot write lets onFinish retry instead of being suppressed - Add regression tests for both --- .../offensiveSecurityAgent.test.ts | 109 ++++++++++++++++++ .../offSecAgent/offensiveSecurityAgent.ts | 13 ++- 2 files changed, 120 insertions(+), 2 deletions(-) diff --git a/src/core/agents/offSecAgent/offensiveSecurityAgent.test.ts b/src/core/agents/offSecAgent/offensiveSecurityAgent.test.ts index a3331513d..e1faba0c6 100644 --- a/src/core/agents/offSecAgent/offensiveSecurityAgent.test.ts +++ b/src/core/agents/offSecAgent/offensiveSecurityAgent.test.ts @@ -580,6 +580,115 @@ describe("OffensiveSecurityAgent.consume()", () => { }); }); + describe("multi-step abort does not duplicate prior steps (bugbot fix)", () => { + it("clears completed results at finish-step so earlier steps aren't re-appended", async () => { + const tmpDir = join("/tmp", `pensar-test-${Date.now()}-multistep`); + mkdirSync(tmpDir, { recursive: true }); + const messagesPath = join(tmpDir, "messages.json"); + + // Step 1 already persisted by onStepFinish: tool A called and resolved. + const persistedAfterStep1 = [ + { role: "user", content: [{ type: "text", text: "scan" }] }, + { + role: "assistant", + content: [ + { type: "tool-call", toolCallId: "tcA", toolName: "read_file" }, + ], + }, + { + role: "tool", + content: [ + { + type: "tool-result", + toolCallId: "tcA", + toolName: "read_file", + output: { type: "text", value: "file contents" }, + }, + ], + }, + ]; + writeFileSync(messagesPath, JSON.stringify(persistedAfterStep1)); + + // Stream replays step 1's chunks, finishes the step, then opens tool B + // in step 2 before erroring. + const agent = buildStubAgent({ + fullStream: yieldThenThrow( + [ + { type: "tool-call", toolCallId: "tcA", toolName: "read_file" }, + { + type: "tool-result", + toolCallId: "tcA", + toolName: "read_file", + result: { type: "text", value: "file contents" }, + }, + { type: "finish-step" }, + { + type: "tool-call", + toolCallId: "tcB", + toolName: "execute_command", + }, + ], + new Error("connection lost"), + ), + messagesPath, + latestMessages: persistedAfterStep1, + }); + + try { + await agent.consume(); + } catch {} + await new Promise((r) => setTimeout(r, 50)); + + const written = JSON.parse(readFileSync(messagesPath, "utf-8")); + const toolCallIds = written + .filter((m: { role: string }) => m.role === "assistant") + .flatMap((m: { content: { toolCallId?: string }[] }) => + m.content.map((c) => c.toolCallId), + ); + // tcA must appear exactly once — the abort snapshot must not re-append it. + expect(toolCallIds.filter((id: string) => id === "tcA")).toHaveLength(1); + expect(toolCallIds.filter((id: string) => id === "tcB")).toHaveLength(1); + + rmSync(tmpDir, { recursive: true, force: true }); + }); + }); + + describe("failed abort write leaves flag unset (bugbot fix)", () => { + const toolCallChunk = { + type: "tool-call", + toolCallId: "tc1", + toolName: "execute_command", + }; + + it("does not mark synthetics persisted when the write fails", async () => { + // A path inside a non-existent directory makes writeFile reject. + const messagesPath = join( + "/tmp", + `pensar-test-${Date.now()}-missing`, + "nope", + "messages.json", + ); + + const agent = buildStubAgent({ + fullStream: yieldThenThrow([toolCallChunk], new Error("stream broke")), + messagesPath, + latestMessages: [ + { role: "user", content: [{ type: "text", text: "scan" }] }, + ], + }); + + try { + await agent.consume(); + } catch {} + await new Promise((r) => setTimeout(r, 50)); + + expect( + (agent as unknown as { syntheticsPersisted?: boolean }) + .syntheticsPersisted, + ).toBeFalsy(); + }); + }); + describe("emitSyntheticToolResults error does not mask stream error (bugbot fix)", () => { const toolCallChunk = { type: "tool-call", diff --git a/src/core/agents/offSecAgent/offensiveSecurityAgent.ts b/src/core/agents/offSecAgent/offensiveSecurityAgent.ts index 86808dc8d..ca67b4de5 100644 --- a/src/core/agents/offSecAgent/offensiveSecurityAgent.ts +++ b/src/core/agents/offSecAgent/offensiveSecurityAgent.ts @@ -544,6 +544,10 @@ export class OffensiveSecurityAgent { toolName: tc.toolName, output: (tc.result ?? tc.output) as ToolResultPart["output"], }); + } else if (chunk.type === "finish-step") { + // onStepFinish has persisted this step; drop its results so a + // later abort doesn't re-append already-saved tool calls/results. + completedResults.length = 0; } bus.emitStreamPart(chunk, sid); } @@ -676,8 +680,13 @@ export class OffensiveSecurityAgent { const next: ModelMessage[] = [...base, ...appended]; this.latestMessages = next; - await writeFile(this.messagesPath, JSON.stringify(next)).catch(() => {}); - this.syntheticsPersisted = true; + try { + await writeFile(this.messagesPath, JSON.stringify(next)); + // Only suppress onFinish's write once the snapshot is safely on disk. + this.syntheticsPersisted = true; + } catch { + // Write failed — leave the flag false so onFinish still attempts a write. + } } private baseContainsToolCalls(