diff --git a/src/core/agents/offSecAgent/offensiveSecurityAgent.test.ts b/src/core/agents/offSecAgent/offensiveSecurityAgent.test.ts index 7b134799..e1faba0c 100644 --- a/src/core/agents/offSecAgent/offensiveSecurityAgent.test.ts +++ b/src/core/agents/offSecAgent/offensiveSecurityAgent.test.ts @@ -7,6 +7,8 @@ * Heavy transitive dependencies (tools, AI SDK, zod) are stubbed so * the test loads cleanly without external provider keys. */ +import { mkdirSync, readFileSync, rmSync, writeFileSync } from "node:fs"; +import { join } from "node:path"; import { describe, expect, it, vi } from "vitest"; // --------------------------------------------------------------------------- @@ -76,6 +78,8 @@ function buildStubAgent(overrides: { persistentShell?: { dispose: () => void }; abortSignal?: AbortSignal; resolveResult?: (sr: unknown) => unknown; + messagesPath?: string | null; + latestMessages?: unknown[] | null; }): OffensiveSecurityAgent { const agent = Object.create( OffensiveSecurityAgent.prototype, @@ -98,6 +102,16 @@ function buildStubAgent(overrides: { Object.defineProperty(agent, "resolveResult", { value: overrides.resolveResult, }); + Object.defineProperty(agent, "latestMessages", { + value: overrides.latestMessages ?? null, + writable: true, + }); + Object.defineProperty(agent, "messagesPath", { + value: overrides.messagesPath ?? null, + }); + Object.defineProperty(agent, "cancelPersistTimer", { + value: () => {}, + }); return agent; } @@ -234,6 +248,118 @@ describe("OffensiveSecurityAgent.consume()", () => { }); }); + describe("synthetic tool-result on stream abort/error", () => { + const toolCallChunk = { + type: "tool-call", + toolCallId: "tc1", + toolName: "execute_command", + }; + const otherToolCallChunk = { + type: "tool-call", + toolCallId: "tc2", + toolName: "read_file", + }; + const toolResultChunk = { + type: "tool-result", + toolCallId: "tc1", + result: { type: "text", value: "done" }, + }; + + it("emits synthetic tool-result for an in-flight tool when stream throws", async () => { + const agent = buildStubAgent({ + fullStream: yieldThenThrow( + [toolCallChunk], + new Error("connection reset"), + ), + }); + + const emittedResults: Array<{ + toolCallId: string; + result: unknown; + }> = []; + agent.eventBus.on("tool-result", (e) => { + emittedResults.push({ toolCallId: e.toolCallId, result: e.result }); + }); + + await expect(agent.consume()).rejects.toThrow("connection reset"); + + expect(emittedResults).toHaveLength(1); + expect(emittedResults[0].toolCallId).toBe("tc1"); + expect(emittedResults[0].result).toMatchObject({ + type: "error-text", + value: expect.stringContaining("connection reset"), + }); + }); + + it("does not emit a synthetic result when the matching tool-result already streamed", async () => { + const agent = buildStubAgent({ + fullStream: yieldChunks([toolCallChunk, toolResultChunk]), + }); + + const emittedResults: unknown[] = []; + agent.eventBus.on("tool-result", (e) => emittedResults.push(e)); + + await agent.consume(); + + expect(emittedResults).toHaveLength(0); + }); + + it("emits one synthetic per in-flight tool when multiple are open at abort", async () => { + const controller = new AbortController(); + controller.abort(); + + const agent = buildStubAgent({ + fullStream: yieldThenThrow( + [toolCallChunk, otherToolCallChunk], + new DOMException("Aborted", "AbortError"), + ), + abortSignal: controller.signal, + }); + + const emittedResults: Array<{ toolCallId: string; result: unknown }> = []; + agent.eventBus.on("tool-result", (e) => { + emittedResults.push({ toolCallId: e.toolCallId, result: e.result }); + }); + + await expect(agent.consume()).rejects.toBeInstanceOf(DOMException); + + expect(emittedResults).toHaveLength(2); + const ids = emittedResults.map((r) => r.toolCallId).sort(); + expect(ids).toEqual(["tc1", "tc2"]); + for (const r of emittedResults) { + expect(r.result).toMatchObject({ + type: "error-text", + value: expect.stringContaining("aborted"), + }); + } + }); + + it("uses 'Agent aborted by user' as reason when abortSignal is set", async () => { + const controller = new AbortController(); + controller.abort(); + + const agent = buildStubAgent({ + fullStream: yieldThenThrow( + [toolCallChunk], + new DOMException("Aborted", "AbortError"), + ), + abortSignal: controller.signal, + }); + + const emittedResults: Array<{ result: unknown }> = []; + agent.eventBus.on("tool-result", (e) => + emittedResults.push({ result: e.result }), + ); + + await expect(agent.consume()).rejects.toBeInstanceOf(DOMException); + + expect(emittedResults[0].result).toMatchObject({ + type: "error-text", + value: expect.stringContaining("aborted by user"), + }); + }); + }); + describe("resolveResult", () => { it("returns resolved value when resolveResult is provided", async () => { const agent = buildStubAgent({ @@ -254,4 +380,352 @@ describe("OffensiveSecurityAgent.consume()", () => { expect(result).toBeUndefined(); }); }); + + describe("synthetic tool-result: persist timer fallback (bugbot fix)", () => { + const toolCallChunk = { + type: "tool-call", + toolCallId: "tc1", + toolName: "execute_command", + }; + + it("reads persisted messages from disk when latestMessages is null", async () => { + const tmpDir = join("/tmp", `pensar-test-${Date.now()}-fallback`); + mkdirSync(tmpDir, { recursive: true }); + const messagesPath = join(tmpDir, "messages.json"); + + const existingMessages = [ + { role: "user", content: [{ type: "text", text: "run nmap" }] }, + { role: "assistant", content: [{ type: "text", text: "Running..." }] }, + ]; + writeFileSync(messagesPath, JSON.stringify(existingMessages)); + + // latestMessages is null — simulates persist timer having flushed + const agent = buildStubAgent({ + fullStream: yieldThenThrow([toolCallChunk], new Error("timeout")), + messagesPath, + latestMessages: null, + }); + + try { + await agent.consume(); + } catch {} + + // Wait for async writeFile + await new Promise((r) => setTimeout(r, 50)); + + const written = JSON.parse(readFileSync(messagesPath, "utf-8")); + // original 2 + assistant (tool-call) + tool (synthetic result) + expect(written).toHaveLength(4); + expect(written[0].role).toBe("user"); + expect(written[1].role).toBe("assistant"); + expect(written[2].role).toBe("assistant"); + expect(written[2].content[0].type).toBe("tool-call"); + expect(written[2].content[0].toolCallId).toBe("tc1"); + expect(written[3].role).toBe("tool"); + expect(written[3].content[0].toolCallId).toBe("tc1"); + expect(written[3].content[0].output.type).toBe("error-text"); + + rmSync(tmpDir, { recursive: true, force: true }); + }); + + it("does not lose history when persist timer nulled latestMessages", async () => { + const tmpDir = join("/tmp", `pensar-test-${Date.now()}-noloss`); + mkdirSync(tmpDir, { recursive: true }); + const messagesPath = join(tmpDir, "messages.json"); + + const existingMessages = [ + { role: "user", content: [{ type: "text", text: "scan target" }] }, + ]; + writeFileSync(messagesPath, JSON.stringify(existingMessages)); + + const agent = buildStubAgent({ + fullStream: yieldThenThrow([toolCallChunk], new Error("network")), + messagesPath, + latestMessages: null, + }); + + try { + await agent.consume(); + } catch {} + + await new Promise((r) => setTimeout(r, 50)); + + const written = JSON.parse(readFileSync(messagesPath, "utf-8")); + // Must NOT be just [{ role: "tool" ... }] — must include original messages + expect(written.length).toBeGreaterThan(1); + expect(written[0].role).toBe("user"); + + rmSync(tmpDir, { recursive: true, force: true }); + }); + + it("does not add assistant tool-call msg when base already contains them", async () => { + const tmpDir = join("/tmp", `pensar-test-${Date.now()}-nodup`); + mkdirSync(tmpDir, { recursive: true }); + const messagesPath = join(tmpDir, "messages.json"); + + // Simulate onStepFinish having already persisted the assistant tool-call + const existingMessages = [ + { role: "user", content: [{ type: "text", text: "scan" }] }, + { + role: "assistant", + content: [ + { + type: "tool-call", + toolCallId: "tc1", + toolName: "execute_command", + input: {}, + }, + ], + }, + ]; + writeFileSync(messagesPath, JSON.stringify(existingMessages)); + + const agent = buildStubAgent({ + fullStream: yieldThenThrow([toolCallChunk], new Error("abort")), + messagesPath, + latestMessages: null, + }); + + try { + await agent.consume(); + } catch {} + + await new Promise((r) => setTimeout(r, 50)); + + const written = JSON.parse(readFileSync(messagesPath, "utf-8")); + // Should NOT duplicate the assistant message — just user + assistant + tool + expect(written).toHaveLength(3); + expect(written[0].role).toBe("user"); + expect(written[1].role).toBe("assistant"); + expect(written[2].role).toBe("tool"); + expect(written[2].content[0].output.type).toBe("error-text"); + + rmSync(tmpDir, { recursive: true, force: true }); + }); + }); + + describe("parallel tool abort persists completed tools (bugbot fix)", () => { + it("includes completed tool results alongside synthetics on abort", async () => { + const tmpDir = join("/tmp", `pensar-test-${Date.now()}-parallel`); + mkdirSync(tmpDir, { recursive: true }); + const messagesPath = join(tmpDir, "messages.json"); + + const existingMessages = [ + { role: "user", content: [{ type: "text", text: "scan" }] }, + ]; + writeFileSync(messagesPath, JSON.stringify(existingMessages)); + + // Two tools called; tool A completes, tool B is still in-flight on abort. + const toolCallA = { + type: "tool-call", + toolCallId: "tcA", + toolName: "read_file", + }; + const toolCallB = { + type: "tool-call", + toolCallId: "tcB", + toolName: "execute_command", + }; + const toolResultA = { + type: "tool-result", + toolCallId: "tcA", + toolName: "read_file", + result: { type: "text", value: "file contents" }, + }; + + const agent = buildStubAgent({ + fullStream: yieldThenThrow( + [toolCallA, toolCallB, toolResultA], + new Error("connection lost"), + ), + messagesPath, + latestMessages: null, + }); + + try { + await agent.consume(); + } catch {} + + await new Promise((r) => setTimeout(r, 50)); + + const written = JSON.parse(readFileSync(messagesPath, "utf-8")); + // user + assistant (with both tool-calls) + tool (with both results) + expect(written).toHaveLength(3); + expect(written[0].role).toBe("user"); + expect(written[1].role).toBe("assistant"); + expect(written[1].content).toHaveLength(2); + const assistantToolIds = written[1].content.map( + (c: { toolCallId: string }) => c.toolCallId, + ); + expect(assistantToolIds.sort()).toEqual(["tcA", "tcB"]); + + expect(written[2].role).toBe("tool"); + expect(written[2].content).toHaveLength(2); + const toolResultIds = written[2].content.map( + (c: { toolCallId: string }) => c.toolCallId, + ); + expect(toolResultIds.sort()).toEqual(["tcA", "tcB"]); + + // Tool A has real result, Tool B has synthetic error-text + const resultA = written[2].content.find( + (c: { toolCallId: string }) => c.toolCallId === "tcA", + ); + const resultB = written[2].content.find( + (c: { toolCallId: string }) => c.toolCallId === "tcB", + ); + expect(resultA.output.type).toBe("text"); + expect(resultB.output.type).toBe("error-text"); + + rmSync(tmpDir, { recursive: true, force: true }); + }); + }); + + describe("multi-step abort does not duplicate prior steps (bugbot fix)", () => { + it("clears completed results at finish-step so earlier steps aren't re-appended", async () => { + const tmpDir = join("/tmp", `pensar-test-${Date.now()}-multistep`); + mkdirSync(tmpDir, { recursive: true }); + const messagesPath = join(tmpDir, "messages.json"); + + // Step 1 already persisted by onStepFinish: tool A called and resolved. + const persistedAfterStep1 = [ + { role: "user", content: [{ type: "text", text: "scan" }] }, + { + role: "assistant", + content: [ + { type: "tool-call", toolCallId: "tcA", toolName: "read_file" }, + ], + }, + { + role: "tool", + content: [ + { + type: "tool-result", + toolCallId: "tcA", + toolName: "read_file", + output: { type: "text", value: "file contents" }, + }, + ], + }, + ]; + writeFileSync(messagesPath, JSON.stringify(persistedAfterStep1)); + + // Stream replays step 1's chunks, finishes the step, then opens tool B + // in step 2 before erroring. + const agent = buildStubAgent({ + fullStream: yieldThenThrow( + [ + { type: "tool-call", toolCallId: "tcA", toolName: "read_file" }, + { + type: "tool-result", + toolCallId: "tcA", + toolName: "read_file", + result: { type: "text", value: "file contents" }, + }, + { type: "finish-step" }, + { + type: "tool-call", + toolCallId: "tcB", + toolName: "execute_command", + }, + ], + new Error("connection lost"), + ), + messagesPath, + latestMessages: persistedAfterStep1, + }); + + try { + await agent.consume(); + } catch {} + await new Promise((r) => setTimeout(r, 50)); + + const written = JSON.parse(readFileSync(messagesPath, "utf-8")); + const toolCallIds = written + .filter((m: { role: string }) => m.role === "assistant") + .flatMap((m: { content: { toolCallId?: string }[] }) => + m.content.map((c) => c.toolCallId), + ); + // tcA must appear exactly once — the abort snapshot must not re-append it. + expect(toolCallIds.filter((id: string) => id === "tcA")).toHaveLength(1); + expect(toolCallIds.filter((id: string) => id === "tcB")).toHaveLength(1); + + rmSync(tmpDir, { recursive: true, force: true }); + }); + }); + + describe("failed abort write leaves flag unset (bugbot fix)", () => { + const toolCallChunk = { + type: "tool-call", + toolCallId: "tc1", + toolName: "execute_command", + }; + + it("does not mark synthetics persisted when the write fails", async () => { + // A path inside a non-existent directory makes writeFile reject. + const messagesPath = join( + "/tmp", + `pensar-test-${Date.now()}-missing`, + "nope", + "messages.json", + ); + + const agent = buildStubAgent({ + fullStream: yieldThenThrow([toolCallChunk], new Error("stream broke")), + messagesPath, + latestMessages: [ + { role: "user", content: [{ type: "text", text: "scan" }] }, + ], + }); + + try { + await agent.consume(); + } catch {} + await new Promise((r) => setTimeout(r, 50)); + + expect( + (agent as unknown as { syntheticsPersisted?: boolean }) + .syntheticsPersisted, + ).toBeFalsy(); + }); + }); + + describe("emitSyntheticToolResults error does not mask stream error (bugbot fix)", () => { + const toolCallChunk = { + type: "tool-call", + toolCallId: "tc1", + toolName: "execute_command", + }; + + it("propagates original stream error when event listener throws", async () => { + const agent = buildStubAgent({ + fullStream: yieldThenThrow( + [toolCallChunk], + new Error("original stream error"), + ), + }); + + // Make the event listener throw during synthetic emission + agent.eventBus.on("tool-result", () => { + throw new Error("listener explosion"); + }); + + // The original stream error must propagate, not the listener error + await expect(agent.consume()).rejects.toThrow("original stream error"); + }); + + it("still disposes shell when emitSyntheticToolResults throws", async () => { + const dispose = vi.fn(); + const agent = buildStubAgent({ + fullStream: yieldThenThrow([toolCallChunk], new Error("stream broke")), + persistentShell: { dispose }, + }); + + agent.eventBus.on("tool-result", () => { + throw new Error("listener crash"); + }); + + await expect(agent.consume()).rejects.toThrow("stream broke"); + expect(dispose).toHaveBeenCalledOnce(); + }); + }); }); diff --git a/src/core/agents/offSecAgent/offensiveSecurityAgent.ts b/src/core/agents/offSecAgent/offensiveSecurityAgent.ts index 715bce3c..ca67b4de 100644 --- a/src/core/agents/offSecAgent/offensiveSecurityAgent.ts +++ b/src/core/agents/offSecAgent/offensiveSecurityAgent.ts @@ -1,14 +1,16 @@ +import { existsSync, mkdirSync, readFileSync } from "node:fs"; +import { writeFile } from "node:fs/promises"; +import { join } from "node:path"; import type { ModelMessage, StopCondition, StreamTextResult, TextStreamPart, + ToolCallPart, + ToolResultPart, ToolSet, } from "ai"; import { hasToolCall } from "ai"; -import { existsSync, mkdirSync } from "fs"; -import { writeFile } from "fs/promises"; -import { join } from "path"; import { streamResponse } from "../../ai"; import { AgentEventBus } from "../../eventBus"; import { @@ -117,6 +119,16 @@ export class OffensiveSecurityAgent { /** The session this agent is operating within. */ private readonly _session: SessionInfo; + /** Latest accumulated messages, shared with `consume()` for abort persistence. */ + private latestMessages: ModelMessage[] | null = null; + + private messagesPath: string | null = null; + + /** Cancels the debounced persist timer so abort writes don't race it. */ + private cancelPersistTimer: (() => void) | null = null; + + private syntheticsPersisted = false; + /** * Async factory that creates a session when one is not provided, * then constructs the agent. Use this instead of `new` when you @@ -328,7 +340,8 @@ export class OffensiveSecurityAgent { if (!existsSync(messagesDir)) { mkdirSync(messagesDir, { recursive: true }); } - const messagesPath = join(messagesDir, "messages.json"); + this.messagesPath = join(messagesDir, "messages.json"); + const messagesPath = this.messagesPath; // Mutable so that summarization can clear stale history. const initialMessagesRef: { current: ModelMessage[] } = { @@ -346,20 +359,26 @@ export class OffensiveSecurityAgent { // JSON.stringify on every step when many agents run concurrently. const PERSIST_INTERVAL_MS = 15_000; let persistTimer: ReturnType | null = null; - let latestMessages: ModelMessage[] | null = null; const schedulePersist = () => { if (persistTimer) return; persistTimer = setTimeout(() => { persistTimer = null; - if (latestMessages) { - const toWrite = latestMessages; - latestMessages = null; + if (this.latestMessages) { + const toWrite = this.latestMessages; + this.latestMessages = null; writeFile(messagesPath, JSON.stringify(toWrite)).catch(() => {}); } }, PERSIST_INTERVAL_MS); }; + this.cancelPersistTimer = () => { + if (persistTimer) { + clearTimeout(persistTimer); + persistTimer = null; + } + }; + // -- Init record (trace.jsonl first line) --------------------------------- // Hash only the base system prompt (excluding session workspace paths) // so the hash is stable across runs with identical prompt versions. @@ -400,7 +419,7 @@ export class OffensiveSecurityAgent { toolChoice: "auto", sessionPath: input.session.rootPath, onStepFinish: async (event) => { - latestMessages = [ + this.latestMessages = [ ...initialMessagesRef.current, ...event.response.messages, ]; @@ -429,13 +448,16 @@ export class OffensiveSecurityAgent { clearTimeout(persistTimer); persistTimer = null; } - const finalMessages = latestMessages ?? [ - ...initialMessagesRef.current, - ...event.response.messages, - ]; - await writeFile(messagesPath, JSON.stringify(finalMessages)).catch( - () => {}, - ); + // Skip if emitSyntheticToolResults already wrote the abort snapshot. + if (!this.syntheticsPersisted) { + const finalMessages = this.latestMessages ?? [ + ...initialMessagesRef.current, + ...event.response.messages, + ]; + await writeFile(messagesPath, JSON.stringify(finalMessages)).catch( + () => {}, + ); + } await input.onFinish?.(event); }, abortSignal: input.abortSignal, @@ -497,12 +519,66 @@ export class OffensiveSecurityAgent { const sid = this.subagentId; const bus = this.eventBus; + // Tracks the current step so it can be reconstructed on abort: + // inFlightTools = calls still awaiting a result, completedResults = + // results already streamed but not yet persisted by onStepFinish. + const inFlightTools = new Map(); + const completedResults: ToolResultPart[] = []; + let streamError: unknown = null; + try { for await (const chunk of this.streamResult.fullStream) { + if (chunk.type === "tool-call") { + inFlightTools.set(chunk.toolCallId, chunk.toolName); + } else if (chunk.type === "tool-result") { + const tc = chunk as { + toolCallId: string; + toolName: string; + result?: unknown; + output?: unknown; + }; + inFlightTools.delete(tc.toolCallId); + completedResults.push({ + type: "tool-result", + toolCallId: tc.toolCallId, + toolName: tc.toolName, + output: (tc.result ?? tc.output) as ToolResultPart["output"], + }); + } else if (chunk.type === "finish-step") { + // onStepFinish has persisted this step; drop its results so a + // later abort doesn't re-append already-saved tool calls/results. + completedResults.length = 0; + } bus.emitStreamPart(chunk, sid); } + // Completed normally — onStepFinish persists everything. + completedResults.length = 0; + } catch (err) { + streamError = err; } finally { + // Dispose first — don't block on persistence I/O. this.persistentShell?.dispose(); + + if (inFlightTools.size > 0) { + const reason = this.abortSignal?.aborted + ? "Agent aborted by user" + : streamError instanceof Error && streamError.message + ? streamError.message + : "Stream terminated unexpectedly"; + try { + await this.emitSyntheticToolResults( + inFlightTools, + completedResults, + reason, + ); + } catch { + // Never mask the original streamError with a listener error. + } + } + } + + if (streamError) { + throw streamError; } if (this.abortSignal?.aborted) { @@ -522,6 +598,109 @@ export class OffensiveSecurityAgent { get response() { return this.streamResult.response; } + + private async emitSyntheticToolResults( + inFlightTools: Map, + completedResults: ToolResultPart[], + reason: string, + ): Promise { + const sid = this.subagentId; + const output = { + type: "error-text" as const, + value: `Tool execution aborted: ${reason}`, + }; + const syntheticParts: ToolResultPart[] = []; + + for (const [toolCallId, toolName] of inFlightTools) { + this.eventBus.emit("tool-result", { + toolCallId, + toolName, + result: output, + subagentId: sid, + }); + syntheticParts.push({ + type: "tool-result", + toolCallId, + toolName, + output, + }); + } + + if (!this.messagesPath) return; + + // Cancel the debounced timer so its writeFile can't race the write below. + this.cancelPersistTimer?.(); + + // latestMessages is null once the debounced timer has flushed; fall back + // to the on-disk snapshot so we don't overwrite history with synthetics. + let base: ModelMessage[] = this.latestMessages ?? []; + if (base.length === 0 && existsSync(this.messagesPath)) { + try { + base = JSON.parse(readFileSync(this.messagesPath, "utf-8")); + } catch { + // Corrupt file → proceed with empty base. + } + } + + // When onStepFinish hasn't fired, this step's assistant + tool messages + // aren't in base yet; reconstruct it so resumed sessions see valid pairs. + const allToolCallIds = new Set([ + ...inFlightTools.keys(), + ...completedResults.map((r) => r.toolCallId), + ]); + const lastMsg = base[base.length - 1]; + const needsStepReconstruction = + !lastMsg || + lastMsg.role !== "assistant" || + !this.baseContainsToolCalls(lastMsg, allToolCallIds); + + const appended: ModelMessage[] = []; + if (needsStepReconstruction) { + const stepTools: Array<[string, string]> = [ + ...inFlightTools, + ...completedResults.map((r): [string, string] => [ + r.toolCallId, + r.toolName, + ]), + ]; + const toolCalls: ToolCallPart[] = stepTools.map( + ([toolCallId, toolName]) => ({ + type: "tool-call" as const, + toolCallId, + toolName, + input: {}, + }), + ); + appended.push({ role: "assistant", content: toolCalls }); + } + appended.push({ + role: "tool", + content: [...completedResults, ...syntheticParts], + }); + + const next: ModelMessage[] = [...base, ...appended]; + this.latestMessages = next; + try { + await writeFile(this.messagesPath, JSON.stringify(next)); + // Only suppress onFinish's write once the snapshot is safely on disk. + this.syntheticsPersisted = true; + } catch { + // Write failed — leave the flag false so onFinish still attempts a write. + } + } + + private baseContainsToolCalls( + msg: ModelMessage, + toolCallIds: Set, + ): boolean { + if (!Array.isArray(msg.content)) return false; + const contentToolIds = new Set( + (msg.content as Array<{ type: string; toolCallId?: string }>) + .filter((p) => p.type === "tool-call" && p.toolCallId) + .map((p) => p.toolCallId), + ); + return [...toolCallIds].every((id) => contentToolIds.has(id)); + } } // These tools pause the agent and surface their own UI to the operator.