diff --git a/.changeset/soft-spoons-matter.md b/.changeset/soft-spoons-matter.md new file mode 100644 index 0000000..1777753 --- /dev/null +++ b/.changeset/soft-spoons-matter.md @@ -0,0 +1,5 @@ +--- +"@ai-sdk-tool/harness": patch +--- + +Document the new harness DX for observing every stream part, surfacing buffered assistant text before tool calls, using the non-streaming `GeneratingAgent.generate()` path, and depending on the narrower `LoopAgent` contract for loop-only integrations. diff --git a/.gitignore b/.gitignore index fc0c78a..85a00c6 100644 --- a/.gitignore +++ b/.gitignore @@ -19,3 +19,4 @@ work/ # IDE config (may contain tokens) *pencode.json .omx/ +packages/minimal-agent/.minimal-agent diff --git a/packages/harness/README.md b/packages/harness/README.md index 503dbd0..a5ddea8 100644 --- a/packages/harness/README.md +++ b/packages/harness/README.md @@ -88,6 +88,10 @@ const agent = await createAgent({ const result = await runAgentLoop({ agent, messages: [{ role: "user", content: "What time is it?" }], + onTextBeforeToolCall: (text, boundary, ctx) => { + const toolLabel = boundary.toolName ?? boundary.toolCallId ?? boundary.type; + console.log(`[${ctx.iteration}] Assistant before ${toolLabel}: ${text}`); + }, onToolCall: (call, ctx) => { console.log(`[${ctx.iteration}] Tool call: ${call.toolName}`); }, @@ -103,7 +107,7 @@ console.log(`Finished after ${result.iterations} iterations`); ### `createAgent(config)` -Creates an `Agent` instance that wraps a Vercel AI SDK `streamText` call. +Creates a `GeneratingAgent` instance that wraps Vercel AI SDK text generation with shared harness defaults, tools, guardrails, and optional MCP tool loading. Use `stream(opts)` for streaming turns and `generate(opts)` for a one-shot non-streaming turn. 
```typescript import { createAgent } from "@ai-sdk-tool/harness"; @@ -116,9 +120,28 @@ const agent = await createAgent({ extraStopConditions, // StopCondition[] — additional independent stop triggers experimental_repairToolCall, // repair callback for malformed tool calls }); + +const stream = agent.stream({ messages }); +const generated = await agent.generate({ messages }); ``` -**Returns:** `Agent` — an object with `config` and `stream(opts)` method. +**Returns:** `GeneratingAgent` — an object with `config`, `stream(opts)`, `generate(opts)`, and `close()` methods. Loop-only integrations can depend on the narrower `LoopAgent` type, which requires only `config` and synchronous `stream(opts)`. + +#### Non-streaming generation + +Use `agent.generate(opts)` when a consumer needs the same agent configuration but does not need incremental stream parts. The method uses the same model, tools, provider option merging, `streamDefaults`, `prepareStep`, MCP-loaded tools, guardrails, and repair callback path as streaming turns. + +```typescript +const result = await agent.generate({ + messages: [{ role: "user", content: "Summarize this diff." }], + temperature: 0.2, +}); + +console.log(result.text); +console.log(result.response.messages); +``` + +Prefer `runAgentLoop` for multi-iteration streaming tool loops and `agent.generate()` for simple request/response integrations, tests, and batch jobs that only need the final generated result. `agent.generate()` resolves async `instructions` itself; `runAgentLoop` does the same before each streaming turn. Direct `agent.stream()` is intentionally synchronous, so direct stream callers should pass `system` explicitly when using async instructions. 
--- @@ -130,20 +153,24 @@ Runs the agent in a loop until a stop condition is met or `maxIterations` is rea import { runAgentLoop } from "@ai-sdk-tool/harness"; const result = await runAgentLoop({ - agent, // Agent — required + agent, // LoopAgent — required messages, // ModelMessage[] — initial conversation history maxIterations, // number — max loop iterations (default: unlimited) abortSignal, // AbortSignal — for cancellation // Hooks - onPrepareStep, // (context) => partial AgentStreamOptions override, applied before onBeforeTurn - onBeforeTurn, // (context) => partial AgentStreamOptions override - onInterrupt, // ({ iteration, reason }, context) => void | Promise - shouldContinue, // (finishReason, context) => boolean — custom continuation logic - onToolLifecycle, // (lifecycle, context) => void | Promise - onToolCall, // (call, context) => void | Promise - onStepComplete, // (step) => void | Promise - onError, // (error, context) => void | Promise | { shouldContinue?, recovery? } + onPrepareStep, // (context) => partial AgentStreamOptions override, applied before onBeforeTurn + onBeforeTurn, // (context) => partial AgentStreamOptions override + onStreamStart, // (context) => observer fired after Agent.stream() is created + onFirstStreamPart, // (part, context) => observer fired once per non-empty iteration + onStreamPart, // (part, context) => observer fired for every fullStream part + onTextBeforeToolCall, // (text, boundary, context) => buffered text emitted before each tool boundary + onInterrupt, // ({ iteration, reason }, context) => void | Promise + shouldContinue, // (finishReason, context) => boolean — custom continuation logic + onToolLifecycle, // (lifecycle, context) => void | Promise + onToolCall, // (call, context) => void | Promise + onStepComplete, // (step) => void | Promise + onError, // (error, context) => void | Promise | { shouldContinue?, recovery? 
} }); ``` @@ -157,6 +184,30 @@ interface RunAgentLoopResult { } ``` +#### Stream observation hooks + +Use the loop hooks when you want harness-managed streaming without reimplementing `stream.fullStream` consumption. Observer hook errors are logged and swallowed so a buggy UI callback does not abort an otherwise valid model stream. Hooks are still awaited inline, so keep callbacks fast to avoid delaying stream consumption or tool execution. + +```typescript +await runAgentLoop({ + agent, + messages, + onStreamPart: (part, ctx) => { + // Receives every AI SDK fullStream part: start, text-delta, tool-call, etc. + console.debug(`[${ctx.iteration}] ${part.type}`); + }, + onTextBeforeToolCall: (text, boundary) => { + // Useful for pre-tool acknowledgements such as "I'll check that now." + showAssistantAcknowledgement(text); + if (boundary.toolName) { + showToolPending(boundary.toolName); + } + }, +}); +``` + +`onTextBeforeToolCall` buffers `text-delta` content inside the current loop iteration, flushes it immediately before the next tool boundary (`tool-input-start`, `tool-input-end`, or `tool-call`), and resets the buffer so the same acknowledgement is not emitted again for later parts of the same tool. The boundary argument is harness-normalized metadata (`type`, optional `toolCallId`, optional `toolName`, and the raw `part`); `toolName` is optional because some AI SDK boundaries only include an id. Tool calls that arrive without preceding text do not fire this hook. Text that is still buffered when a step boundary (`start-step` or `finish-step`) arrives is discarded without firing the hook, so text from a step that never reaches a tool boundary is not delivered through `onTextBeforeToolCall`; use `onStreamPart` to observe it. 
+ --- ### `CheckpointHistory` @@ -437,8 +488,12 @@ const history = new CheckpointHistory({ import type { Agent, AgentConfig, + GeneratingAgent, + LoopAgent, AgentStreamOptions, AgentStreamResult, + AgentGenerateOptions, + AgentGenerateResult, AgentFinishReason, LoopContinueContext, LoopStepInfo, diff --git a/packages/harness/src/agent.test.ts b/packages/harness/src/agent.test.ts index 0fa21c2..210208b 100644 --- a/packages/harness/src/agent.test.ts +++ b/packages/harness/src/agent.test.ts @@ -4,48 +4,79 @@ import { AgentError, AgentErrorCode } from "./errors"; import { clearMCPCache } from "./mcp-init"; import type { AgentConfig } from "./types"; -const { streamTextMock, resolveMCPOptionMock, stepCountIsMock, toolMock } = - vi.hoisted(() => { - const streamTextMock = vi.fn(() => { - const fullStream: AsyncIterable<{ finishReason: string; type: string }> = - { - [Symbol.asyncIterator]() { - let done = false; - return { - next: () => { - if (done) { - return Promise.resolve({ done: true, value: undefined }); - } - done = true; - return Promise.resolve({ - done: false, - value: { type: "finish-step", finishReason: "stop" }, - }); - }, - }; +const { + generateTextMock, + streamTextMock, + resolveMCPOptionMock, + stepCountIsMock, + toolMock, +} = vi.hoisted(() => { + const generateTextMock = vi.fn(() => + Promise.resolve({ + content: [{ type: "text", text: "generated" }], + files: [], + finishReason: "stop", + providerMetadata: undefined, + reasoning: [], + reasoningText: undefined, + response: { messages: [{ role: "assistant", content: "generated" }] }, + sources: [], + steps: [], + text: "generated", + toolCalls: [], + toolResults: [], + totalUsage: undefined, + usage: undefined, + warnings: undefined, + }) + ); + + const streamTextMock = vi.fn(() => { + const fullStream: AsyncIterable<{ finishReason: string; type: string }> = { + [Symbol.asyncIterator]() { + let done = false; + return { + next: () => { + if (done) { + return Promise.resolve({ done: true, value: 
undefined }); + } + done = true; + return Promise.resolve({ + done: false, + value: { type: "finish-step", finishReason: "stop" }, + }); }, }; - return { - finishReason: Promise.resolve("stop"), - fullStream, - response: Promise.resolve({ messages: [] }), - totalUsage: Promise.resolve(undefined), - usage: Promise.resolve(undefined), - }; - }); + }, + }; + return { + finishReason: Promise.resolve("stop"), + fullStream, + response: Promise.resolve({ messages: [] }), + totalUsage: Promise.resolve(undefined), + usage: Promise.resolve(undefined), + }; + }); - const resolveMCPOptionMock = vi.fn().mockResolvedValue({ - close: vi.fn().mockResolvedValue(undefined), - tools: { mcp_tool: {} }, - }); + const resolveMCPOptionMock = vi.fn().mockResolvedValue({ + close: vi.fn().mockResolvedValue(undefined), + tools: { mcp_tool: {} }, + }); - const stepCountIsMock = vi.fn(() => undefined); - const toolMock = vi.fn((config) => config); + const stepCountIsMock = vi.fn(() => undefined); + const toolMock = vi.fn((config) => config); - return { streamTextMock, resolveMCPOptionMock, stepCountIsMock, toolMock }; - }); + return { + generateTextMock, + streamTextMock, + resolveMCPOptionMock, + stepCountIsMock, + toolMock, + }; +}); vi.mock("ai", () => ({ + generateText: generateTextMock, stepCountIs: stepCountIsMock, streamText: streamTextMock, tool: toolMock, @@ -71,6 +102,29 @@ function getLastStreamTextCall() { return lastCall ? lastCall[0] : undefined; } +function getLastGenerateTextCall() { + const calls = generateTextMock.mock.calls as unknown as Array< + Array<{ stopWhen?: unknown }> + >; + if (calls.length === 0) { + throw new Error("Expected generateText to be called"); + } + const lastCall = calls.at(-1); + return lastCall ? 
lastCall[0] : undefined; +} + +function getGenerateStopWhen() { + const lastCall = getLastGenerateTextCall(); + if (!lastCall) { + throw new Error("Expected generateText call arguments"); + } + const stopWhen = lastCall.stopWhen; + if (!stopWhen) { + throw new Error("Expected generateText stopWhen to be defined"); + } + return stopWhen; +} + function getStopWhen() { const lastCall = getLastStreamTextCall(); if (!lastCall) { @@ -87,6 +141,7 @@ describe("createAgent", () => { beforeEach(() => { clearMCPCache(); resolveMCPOptionMock.mockClear(); + generateTextMock.mockClear(); streamTextMock.mockClear(); stepCountIsMock.mockClear(); toolMock.mockClear(); @@ -98,8 +153,10 @@ describe("createAgent", () => { expect(agent).toHaveProperty("config"); expect(agent).toHaveProperty("close"); + expect(agent).toHaveProperty("generate"); expect(agent).toHaveProperty("stream"); expect(typeof agent.close).toBe("function"); + expect(typeof agent.generate).toBe("function"); expect(typeof agent.stream).toBe("function"); }); @@ -124,6 +181,163 @@ describe("createAgent", () => { expect(agent.config.maxStepsPerTurn).toBeUndefined(); }); + it("generates a non-streaming turn through generateText", async () => { + const agent = await createAgent({ + instructions: "base-system", + model: createMockModel(), + }); + + const result = await agent.generate({ + messages: [{ role: "user", content: "Hello" }], + seed: 7, + temperature: 0, + }); + + expect(result.text).toBe("generated"); + expect(generateTextMock).toHaveBeenCalledWith( + expect.objectContaining({ + messages: [{ role: "user", content: "Hello" }], + model: agent.config.model, + seed: 7, + system: "base-system", + temperature: 0, + tools: {}, + }) + ); + expect(streamTextMock).not.toHaveBeenCalled(); + }); + + it("applies streamDefaults before calling generateText", async () => { + const agent = await createAgent({ + model: createMockModel(), + streamDefaults: { + providerOptions: { openai: { parallelToolCalls: false } }, + seed: 11, + 
temperature: 0.2, + }, + }); + + await agent.generate({ messages: [] }); + + expect(generateTextMock).toHaveBeenCalledWith( + expect.objectContaining({ + providerOptions: { openai: { parallelToolCalls: false } }, + seed: 11, + temperature: 0.2, + }) + ); + }); + + it("resolves async instructions before calling generateText", async () => { + const instructions = vi.fn().mockResolvedValue("dynamic-system"); + const agent = await createAgent({ + instructions, + model: createMockModel(), + }); + + await agent.generate({ messages: [] }); + + expect(instructions).toHaveBeenCalledOnce(); + expect(generateTextMock).toHaveBeenCalledWith( + expect.objectContaining({ + system: "dynamic-system", + }) + ); + }); + + it("keeps explicit generate system above async instructions", async () => { + const instructions = vi.fn().mockResolvedValue("dynamic-system"); + const agent = await createAgent({ + instructions, + model: createMockModel(), + }); + + await agent.generate({ messages: [], system: "call-system" }); + + expect(instructions).not.toHaveBeenCalled(); + expect(generateTextMock).toHaveBeenCalledWith( + expect.objectContaining({ + system: "call-system", + }) + ); + }); + + it("lets prepareStep rewrite generate options before invoking generateText", async () => { + const prepareStep = vi.fn(({ system }) => ({ + messages: [{ role: "system", content: "prepared" }], + providerOptions: { anthropic: { cacheControl: { type: "ephemeral" } } }, + system: `${system ?? 
""}-next`, + })); + const agent = await createAgent({ + model: createMockModel(), + instructions: "base-system", + prepareStep, + }); + + await agent.generate({ + messages: [{ role: "user", content: "Hello" }], + providerOptions: { openai: { parallelToolCalls: false } }, + }); + + expect(prepareStep).toHaveBeenCalledWith( + expect.objectContaining({ + messages: [{ role: "user", content: "Hello" }], + system: "base-system", + }) + ); + expect(generateTextMock).toHaveBeenCalledWith( + expect.objectContaining({ + messages: [{ role: "system", content: "prepared" }], + providerOptions: { + openai: { parallelToolCalls: false }, + anthropic: { cacheControl: { type: "ephemeral" } }, + }, + system: "base-system-next", + }) + ); + }); + + it("uses MCP-resolved tools for generateText", async () => { + const mcpTools = { mcp_tool: {} }; + resolveMCPOptionMock.mockResolvedValueOnce({ + close: vi.fn().mockResolvedValue(undefined), + tools: mcpTools, + }); + const agent = await createAgent({ model: createMockModel(), mcp: true }); + + await agent.generate({ messages: [] }); + + expect(generateTextMock).toHaveBeenCalledWith( + expect.objectContaining({ tools: mcpTools }) + ); + }); + + it("passes stopWhen guardrails through generateText", async () => { + const extraStopCondition = vi.fn(() => false); + const agent = await createAgent({ + extraStopConditions: [extraStopCondition], + guardrails: { maxToolCallsPerTurn: 5 }, + maxStepsPerTurn: 2, + model: createMockModel(), + }); + + await agent.generate({ messages: [] }); + + expect(generateTextMock).toHaveBeenCalledWith( + expect.objectContaining({ + stopWhen: expect.any(Array), + }) + ); + const stopWhen = getGenerateStopWhen(); + expect(stopWhen).toHaveLength(3); + expect( + stopWhen[1]({ + steps: [{ toolCalls: [] }, { toolCalls: [] }], + }) + ).toBe(true); + expect(stopWhen[2]).toBe(extraStopCondition); + }); + it("uses text-response stop condition when maxStepsPerTurn is omitted and guardrails are not set", async () => { const 
agent = await createAgent({ model: createMockModel() }); diff --git a/packages/harness/src/agent.ts b/packages/harness/src/agent.ts index a3d683e..ed0e6cb 100644 --- a/packages/harness/src/agent.ts +++ b/packages/harness/src/agent.ts @@ -3,18 +3,20 @@ * Core agent factory for the harness package. */ -import { streamText, tool } from "ai"; +import { generateText, streamText, tool } from "ai"; import { AgentError, AgentErrorCode } from "./errors"; import type { AgentExecutionContext } from "./execution-context"; import type { ToolDefinition, ToolSource } from "./tool-source"; import type { - Agent, AgentConfig, + AgentGenerateOptions, + AgentGenerateResult, AgentGuardrails, AgentPrepareStepContext, AgentPrepareStepResult, AgentStreamOptions, AgentStreamResult, + GeneratingAgent, ToolCallPart, } from "./types"; @@ -142,6 +144,33 @@ const createStreamTextResult = ( experimental_repairToolCall: config.experimental_repairToolCall, }); +const createGenerateTextResult = ( + config: AgentConfig, + preparedOptions: AgentGenerateOptions +) => + generateText({ + model: config.model, + tools: config.tools, + system: preparedOptions.system, + messages: preparedOptions.messages, + providerOptions: preparedOptions.providerOptions, + maxOutputTokens: preparedOptions.maxOutputTokens, + seed: preparedOptions.seed, + stopWhen: [ + config.guardrails + ? createGuardedStopCondition(config.guardrails) + : textResponseReceived(), + ...(config.maxStepsPerTurn === undefined + ? [] + : [createStepCountStopCondition(config.maxStepsPerTurn)]), + ...(config.extraStopConditions ?? 
[]), + ], + temperature: preparedOptions.temperature, + abortSignal: preparedOptions.abortSignal, + experimental_context: preparedOptions.experimentalContext, + experimental_repairToolCall: config.experimental_repairToolCall, + }); + const serializeToolCall = ( toolCall: Pick ) => JSON.stringify({ input: toolCall.input, toolName: toolCall.toolName }); @@ -211,10 +240,10 @@ const createStepCountStopCondition = steps.length >= maxStepsPerTurn; /** - * Creates an {@link Agent} instance that wraps a Vercel AI SDK `streamText` call. + * Creates a harness agent instance that wraps Vercel AI SDK text generation. * * @param config - Agent configuration including model, tools, and instructions. - * @returns An `Agent` object with a `stream()` method for initiating a single turn. + * @returns A `GeneratingAgent` object with `stream()` and `generate()` methods for initiating a single turn. * * @example * ```typescript @@ -225,7 +254,9 @@ const createStepCountStopCondition = * }); * ``` */ -export async function createAgent(config: AgentConfig): Promise { +export async function createAgent( + config: AgentConfig +): Promise { let mergedTools = { ...(config.tools ?? {}), ...(await toToolSet(config.toolSources)), @@ -258,6 +289,29 @@ export async function createAgent(config: AgentConfig): Promise { return { config: effectiveConfig, close: closeFn, + /** + * Initiates a single non-streaming turn with the given messages. + * Returns the AI SDK `generateText` result after applying the same defaults, + * prepareStep overrides, tools, guardrails, and execution context as `stream()`. + */ + async generate(opts: AgentGenerateOptions): Promise { + const instructions = effectiveConfig.instructions; + const system = + opts.system ?? + effectiveConfig.streamDefaults?.system ?? + (typeof instructions === "function" + ? 
await instructions() + : instructions); + const baseOptions = buildBaseStreamOptions(effectiveConfig, { + ...opts, + system, + }); + const prepared = effectiveConfig.prepareStep?.(baseOptions); + return createGenerateTextResult( + effectiveConfig, + applyPreparedOverrides(baseOptions, prepared) + ); + }, /** * Initiates a single streaming turn with the given messages. * Returns a result object with `fullStream`, `finishReason`, and `response`. diff --git a/packages/harness/src/index.ts b/packages/harness/src/index.ts index 89e2b89..c897e4d 100644 --- a/packages/harness/src/index.ts +++ b/packages/harness/src/index.ts @@ -244,10 +244,13 @@ export { } from "./tool-stream-parts"; export type * from "./types"; export type { + Agent, AgentConfig, AgentGuardrails, AgentStreamOptions, AgentStreamResult, + GeneratingAgent, + LoopAgent, RunnableAgent, } from "./types"; export type { UsageMeasurement } from "./usage"; diff --git a/packages/harness/src/loop.test.ts b/packages/harness/src/loop.test.ts index 7d75a94..0b3a5d7 100644 --- a/packages/harness/src/loop.test.ts +++ b/packages/harness/src/loop.test.ts @@ -1,7 +1,7 @@ import { describe, expect, it, vi } from "vitest"; import { AgentError, AgentErrorCode } from "./errors"; import { runAgentLoop } from "./loop"; -import type { Agent, AgentStreamResult } from "./types"; +import type { AgentStreamResult, LoopAgent } from "./types"; /** * Creates a mock agent that simulates streaming behavior. 
@@ -10,17 +10,18 @@ import type { Agent, AgentStreamResult } from "./types"; function createMockAgent( finishReasons: string[], options?: { + streamPartsPerIteration?: Record[][]; throwOnIteration?: number; toolCallsPerIteration?: Array< Array<{ toolName: string; args: Record }> >; } -): Agent { +): LoopAgent { let callIndex = 0; return { config: { - model: {} as Agent["config"]["model"], + model: {} as LoopAgent["config"]["model"], }, stream(): AgentStreamResult { const currentIndex = callIndex; @@ -33,9 +34,19 @@ function createMockAgent( const finishReason = finishReasons[currentIndex] ?? "stop"; const toolCallsForThisIteration = options?.toolCallsPerIteration?.[currentIndex] ?? []; + const streamPartsForThisIteration = + options?.streamPartsPerIteration?.[currentIndex]; // Simulate async iterator for fullStream async function* fullStreamGenerator() { + if (streamPartsForThisIteration) { + for (const part of streamPartsForThisIteration) { + await Promise.resolve(); + yield part; + } + return; + } + for (const call of toolCallsForThisIteration) { await Promise.resolve(); yield { @@ -181,8 +192,8 @@ describe("runAgentLoop", () => { it("applies onBeforeTurn overrides before streaming", async () => { const streamCalls: Array<{ system?: string }> = []; - const agent: Agent = { - config: { model: {} as Agent["config"]["model"] }, + const agent: LoopAgent = { + config: { model: {} as LoopAgent["config"]["model"] }, stream(opts): AgentStreamResult { streamCalls.push({ system: opts.system }); const fullStream: AsyncIterable = { @@ -218,8 +229,8 @@ describe("runAgentLoop", () => { system?: string; }> = []; - const agent: Agent = { - config: { model: {} as Agent["config"]["model"] }, + const agent: LoopAgent = { + config: { model: {} as LoopAgent["config"]["model"] }, stream(opts): AgentStreamResult { streamCalls.push({ experimentalContext: opts.experimentalContext as @@ -400,6 +411,196 @@ describe("runAgentLoop", () => { } }); + it("calls onStreamPart for every emitted 
stream part across iterations", async () => { + const agent = createMockAgent(["tool-calls", "stop"], { + streamPartsPerIteration: [ + [ + { type: "start" }, + { textDelta: "Thinking", type: "text-delta" }, + { + toolCallId: "call_0_lookup", + toolName: "lookup", + type: "tool-call", + }, + ], + [{ type: "start" }, { textDelta: "Done", type: "text-delta" }], + ], + }); + + const observed: Array<{ iteration: number; type: string }> = []; + + await runAgentLoop({ + agent, + messages: [{ role: "user", content: "Hello" }], + onStreamPart: (part, context) => { + observed.push({ iteration: context.iteration, type: part.type }); + }, + }); + + expect(observed).toEqual([ + { iteration: 0, type: "start" }, + { iteration: 0, type: "text-delta" }, + { iteration: 0, type: "tool-call" }, + { iteration: 1, type: "start" }, + { iteration: 1, type: "text-delta" }, + ]); + }); + + it("emits buffered text before each tool call and resets between boundaries", async () => { + const agent = createMockAgent(["tool-calls", "stop"], { + streamPartsPerIteration: [ + [ + { textDelta: "I will ", type: "text-delta" }, + { textDelta: "look this up.", type: "text-delta" }, + { toolCallId: "call_search", toolName: "search", type: "tool-call" }, + { textDelta: " Then I will ", type: "text-delta" }, + { textDelta: "summarize.", type: "text-delta" }, + { + toolCallId: "call_summarize", + toolName: "summarize", + type: "tool-call", + }, + ], + [], + ], + }); + + const boundaries: Array<{ + iteration: number; + text: string; + toolName: string; + }> = []; + const toolCalls: string[] = []; + + await runAgentLoop({ + agent, + messages: [{ role: "user", content: "Hello" }], + onTextBeforeToolCall: (text, boundary, context) => { + boundaries.push({ + iteration: context.iteration, + text, + toolName: boundary.toolName ?? 
"(unknown)", + }); + }, + onToolCall: (part) => { + toolCalls.push(part.toolName); + }, + }); + + expect(boundaries).toEqual([ + { + iteration: 0, + text: "I will look this up.", + toolName: "search", + }, + { + iteration: 0, + text: " Then I will summarize.", + toolName: "summarize", + }, + ]); + expect(toolCalls).toEqual(["search", "summarize"]); + }); + + it("flushes pre-tool text at early tool input boundaries without double emitting", async () => { + const agent = createMockAgent(["tool-calls", "stop"], { + streamPartsPerIteration: [ + [ + { textDelta: "Checking ", type: "text-delta" }, + { textDelta: "now", type: "text-delta" }, + { + toolCallId: "call_search", + toolName: "search", + type: "tool-input-start", + }, + { + input: { query: "weather" }, + toolCallId: "call_search", + toolName: "search", + type: "tool-input-end", + }, + { toolCallId: "call_search", toolName: "search", type: "tool-call" }, + ], + [], + ], + }); + + const boundaries: Array<{ text: string; type: string }> = []; + + await runAgentLoop({ + agent, + messages: [{ role: "user", content: "Hello" }], + onTextBeforeToolCall: (text, boundary) => { + boundaries.push({ text, type: boundary.type }); + }, + }); + + expect(boundaries).toEqual([ + { text: "Checking now", type: "tool-input-start" }, + ]); + }); + + it("does not emit pre-tool text for no-text tool calls", async () => { + const agent = createMockAgent(["tool-calls", "stop"], { + streamPartsPerIteration: [ + [{ toolCallId: "call_search", toolName: "search", type: "tool-call" }], + [], + ], + }); + + const boundaries: string[] = []; + + await runAgentLoop({ + agent, + messages: [{ role: "user", content: "Hello" }], + onTextBeforeToolCall: (text) => { + boundaries.push(text); + }, + }); + + expect(boundaries).toEqual([]); + }); + + it("isolates onStreamPart and onTextBeforeToolCall observer errors", async () => { + const agent = createMockAgent(["tool-calls", "stop"], { + streamPartsPerIteration: [ + [ + { textDelta: "Need data", type: 
"text-delta" }, + { toolCallId: "call_search", toolName: "search", type: "tool-call" }, + ], + [], + ], + }); + const consoleErrorSpy = vi + .spyOn(console, "error") + .mockImplementation(() => undefined); + + try { + const result = await runAgentLoop({ + agent, + messages: [{ role: "user", content: "Hello" }], + onStreamPart: () => { + throw new Error("stream observer bug"); + }, + onTextBeforeToolCall: () => { + throw new Error("pre-tool observer bug"); + }, + }); + + expect(result.iterations).toBe(2); + expect(consoleErrorSpy).toHaveBeenCalledWith( + expect.stringContaining("onStreamPart"), + expect.any(Error) + ); + expect(consoleErrorSpy).toHaveBeenCalledWith( + expect.stringContaining("onTextBeforeToolCall"), + expect.any(Error) + ); + } finally { + consoleErrorSpy.mockRestore(); + } + }); + it("calls onToolCall for each tool call", async () => { const agent = createMockAgent(["tool-calls", "stop"], { toolCallsPerIteration: [ @@ -435,10 +636,9 @@ describe("runAgentLoop", () => { toolName?: string; }> = []; - const agent: Agent = { - close: async () => undefined, + const agent: LoopAgent = { config: { - model: {} as Agent["config"]["model"], + model: {} as LoopAgent["config"]["model"], }, stream(): AgentStreamResult { async function* fullStreamGenerator() { diff --git a/packages/harness/src/loop.ts b/packages/harness/src/loop.ts index b5c40f8..13c65d9 100644 --- a/packages/harness/src/loop.ts +++ b/packages/harness/src/loop.ts @@ -12,11 +12,66 @@ import { getToolLifecycleState } from "./tool-stream-parts"; import type { AgentFinishReason, AgentStreamOptions, + AgentStreamPart, + AgentToolBoundaryPart, + AgentToolTextBoundary, LoopContinueContext, RunAgentLoopOptions, RunAgentLoopResult, } from "./types"; +const getTextDelta = (part: AgentStreamPart): string | undefined => { + if ( + part && + typeof part === "object" && + "type" in part && + part.type === "text-delta" + ) { + if ("textDelta" in part && typeof part.textDelta === "string") { + return 
part.textDelta; + } + if ("text" in part && typeof part.text === "string") { + return part.text; + } + if ("delta" in part && typeof part.delta === "string") { + return part.delta; + } + } + + return; +}; + +const isToolBoundaryPart = ( + part: AgentStreamPart +): part is AgentToolBoundaryPart => + part.type === "tool-input-start" || + part.type === "tool-input-end" || + part.type === "tool-call"; + +const isStepBoundaryPart = (part: AgentStreamPart): boolean => + part.type === "start-step" || part.type === "finish-step"; + +const getStringField = ( + part: AgentToolBoundaryPart, + field: "id" | "toolCallId" | "toolName" +): string | undefined => { + const value = (part as Record)[field]; + if (typeof value === "string") { + return value; + } + + return; +}; + +const createToolTextBoundary = ( + part: AgentToolBoundaryPart +): AgentToolTextBoundary => ({ + part, + toolCallId: getStringField(part, "toolCallId") ?? getStringField(part, "id"), + toolName: getStringField(part, "toolName"), + type: part.type, +}); + async function invokeObserverHook( hook: ((...args: Args) => void | Promise) | undefined, hookName: string, @@ -62,7 +117,9 @@ export async function runAgentLoop( onFirstStreamPart, onInterrupt, onStepComplete, + onStreamPart, onStreamStart, + onTextBeforeToolCall, onToolCall, onToolLifecycle, } = options; @@ -118,8 +175,11 @@ export async function runAgentLoop( await invokeObserverHook(onStreamStart, "onStreamStart", context); let firstPartSeen = false; + let textBeforeToolCall = ""; for await (const part of stream.fullStream) { + await invokeObserverHook(onStreamPart, "onStreamPart", part, context); + if (!firstPartSeen) { firstPartSeen = true; await invokeObserverHook( @@ -130,6 +190,25 @@ export async function runAgentLoop( ); } + const textDelta = getTextDelta(part); + if (textDelta) { + textBeforeToolCall += textDelta; + } + + if (isStepBoundaryPart(part)) { + textBeforeToolCall = ""; + } else if (isToolBoundaryPart(part) && textBeforeToolCall) { + 
const bufferedText = textBeforeToolCall; + textBeforeToolCall = ""; + await invokeObserverHook( + onTextBeforeToolCall, + "onTextBeforeToolCall", + bufferedText, + createToolTextBoundary(part), + context + ); + } + const lifecycle = getToolLifecycleState( part as { toolCallId?: string; toolName?: string; type: string } ); diff --git a/packages/harness/src/runtime/agent-session.ts b/packages/harness/src/runtime/agent-session.ts index cd8785b..605ec83 100644 --- a/packages/harness/src/runtime/agent-session.ts +++ b/packages/harness/src/runtime/agent-session.ts @@ -4,7 +4,7 @@ import { runAgentLoop } from "../loop"; import type { SessionManager } from "../session"; import type { SkillInfo } from "../skills"; import type { SnapshotStore } from "../snapshot-store"; -import type { Agent, AgentConfig, RunnableAgent } from "../types"; +import type { AgentConfig, LoopAgent, RunnableAgent } from "../types"; import { createRuntimeUUID } from "../uuid"; import type { AgentHistoryConfig, @@ -35,8 +35,11 @@ export interface CreateAgentSessionParams { snapshotStore?: SnapshotStore; } -function isAgent(value: RunnableAgent): value is Agent { - return "config" in value; +function isLoopAgent(value: RunnableAgent): value is LoopAgent { + return ( + "config" in value && + typeof (value as { stream?: unknown }).stream === "function" + ); } function makeDefineAgentContext(params: { @@ -303,10 +306,10 @@ class AgentSessionImpl }); } - private getLoopAgent(): Agent { - if (!isAgent(this.currentRuntimeAgent)) { + private getLoopAgent(): LoopAgent { + if (!isLoopAgent(this.currentRuntimeAgent)) { throw new TypeError( - "AgentSession runtimeAgent must be a full Agent with config to run turns" + "AgentSession runtimeAgent must include config and stream() to run turns" ); } diff --git a/packages/harness/src/types.ts b/packages/harness/src/types.ts index 73661d4..368d5dd 100644 --- a/packages/harness/src/types.ts +++ b/packages/harness/src/types.ts @@ -4,6 +4,7 @@ */ import type { + 
generateText, LanguageModel, ModelMessage, streamText, @@ -29,9 +30,34 @@ export type { ToolSet, } from "ai"; +type CoreGenerateResult = Awaited>; type CoreStreamResult = ReturnType; type StreamTextOptions = Parameters[0]; +/** A single part emitted by `Agent.stream().fullStream`. */ +export type AgentStreamPart = + CoreStreamResult["fullStream"] extends AsyncIterable + ? Part + : never; + +/** Stream parts that indicate a tool boundary in the AI SDK stream. */ +export type AgentToolBoundaryPart = Extract< + AgentStreamPart, + { type: "tool-call" | "tool-input-end" | "tool-input-start" } +>; + +/** + * Harness-normalized boundary metadata emitted with text buffered before a + * tool boundary. `toolName` is optional because some AI SDK boundary parts + * (notably `tool-input-end`) only carry a tool-call id. + */ +export interface AgentToolTextBoundary { + part: AgentToolBoundaryPart; + toolCallId?: string; + toolName?: string; + type: AgentToolBoundaryPart["type"]; +} + export type AgentInstructions = string | (() => Promise); export interface AgentGuardrails { @@ -78,11 +104,15 @@ export interface AgentConfig { } /** An agent instance returned by {@link createAgent}. */ -export interface Agent { +export interface Agent extends LoopAgent { /** Release MCP connections and resources. Safe to call multiple times (idempotent). No-op if no MCP was configured. */ close(): Promise; - config: AgentConfig; - stream(opts: AgentStreamOptions): AgentStreamResult; +} + +/** A full agent returned by {@link createAgent}, including non-streaming generation. */ +export interface GeneratingAgent extends Agent { + /** Initiate a single non-streaming turn using Vercel AI SDK `generateText`. */ + generate(opts: AgentGenerateOptions): Promise; } /** Shared runtime stream surface consumed by shell packages. */ @@ -92,6 +122,12 @@ export interface RunnableAgent { ): AgentStreamResult | Promise; } +/** Minimal configured agent surface required by {@link runAgentLoop}. 
*/ +export interface LoopAgent { + config: AgentConfig; + stream(opts: AgentStreamOptions): AgentStreamResult; +} + /** Options passed to {@link Agent.stream} for a single turn. */ export interface AgentStreamOptions { abortSignal?: AbortSignal; @@ -106,6 +142,8 @@ export interface AgentStreamOptions { export interface BeforeTurnResult extends Partial {} +export interface AgentGenerateOptions extends AgentStreamOptions {} + export interface AgentStreamDefaults extends Omit, "abortSignal" | "messages"> {} @@ -115,6 +153,9 @@ export interface AgentPrepareStepContext extends AgentStreamOptions { export interface AgentPrepareStepResult extends Partial {} +/** Result of a single non-streaming turn from {@link GeneratingAgent.generate}. */ +export type AgentGenerateResult = CoreGenerateResult; + /** Result of a single streaming turn from {@link Agent.stream}. */ export interface AgentStreamResult { /** Promise resolving to the finish reason. Await: `const reason = await result.finishReason` */ @@ -190,6 +231,17 @@ export interface LoopHooks { context: LoopContinueContext ) => BeforeTurnResult | Promise | undefined; onStepComplete?: (step: LoopStepInfo) => void | Promise; + /** + * Observer-only hook fired for every part emitted by + * `Agent.stream().fullStream`, before more specific stream hooks such as + * `onFirstStreamPart`, `onToolLifecycle`, and `onToolCall`. + * + * Errors thrown from this callback are logged and swallowed. + */ + onStreamPart?: ( + part: AgentStreamPart, + context: LoopContinueContext + ) => void | Promise; /** * Fires immediately after {@link Agent.stream} is invoked and before the * `fullStream` iteration begins. This is the closest hook to "LLM request @@ -208,6 +260,21 @@ export interface LoopHooks { * with a `phase` argument. */ onStreamStart?: (context: LoopContinueContext) => void | Promise; + /** + * Observer-only hook fired when visible assistant text has been buffered + * before the next tool boundary in the same loop iteration.
This lets chat + * surfaces send a generic "I'll look that up" style acknowledgement before + * the tool actually runs, without hard-coding a weather/search/etc. branch. + * + * Errors thrown from this callback are logged and swallowed. The callback is + * still awaited inline, so keep it fast to avoid delaying stream consumption + * and tool execution. + */ + onTextBeforeToolCall?: ( + text: string, + boundary: AgentToolTextBoundary, + context: LoopContinueContext + ) => void | Promise; onToolCall?: ( call: ToolCallPart, context: LoopContinueContext @@ -224,7 +291,7 @@ export interface LoopHooks { /** Options for {@link runAgentLoop}. */ export interface RunAgentLoopOptions extends LoopHooks { abortSignal?: AbortSignal; - agent: Agent; + agent: LoopAgent; maxIterations?: number; messages: ModelMessage[]; }