diff --git a/src/flows.ts b/src/flows.ts index 5fc1c05..41ac175 100644 --- a/src/flows.ts +++ b/src/flows.ts @@ -1,10 +1,21 @@ export { FlowRunner } from "./flows/runtime.js"; -export { acp, action, checkpoint, compute, defineFlow, shell } from "./flows/definition.js"; +export { + acp, + action, + checkpoint, + compute, + decision, + defineFlow, + shell, +} from "./flows/definition.js"; export type { AcpNodeDefinition, ActionNodeDefinition, CheckpointNodeDefinition, ComputeNodeDefinition, + DecisionNodeDefinition, + DecisionResolverInput, + DecisionResult, FlowDefinition, FlowEdge, FlowNodeCommon, diff --git a/src/flows/definition.ts b/src/flows/definition.ts index e92d18a..2da07f0 100644 --- a/src/flows/definition.ts +++ b/src/flows/definition.ts @@ -3,6 +3,7 @@ import type { ActionNodeDefinition, CheckpointNodeDefinition, ComputeNodeDefinition, + DecisionNodeDefinition, FlowDefinition, FunctionActionNodeDefinition, ShellActionNodeDefinition, @@ -62,3 +63,12 @@ export function checkpoint( ...definition, }; } + +export function decision( + definition: Omit, +): DecisionNodeDefinition { + return { + nodeType: "decision", + ...definition, + }; +} diff --git a/src/flows/runtime.ts b/src/flows/runtime.ts index 047a6b1..478c493 100644 --- a/src/flows/runtime.ts +++ b/src/flows/runtime.ts @@ -26,15 +26,20 @@ import { } from "../session.js"; import { SESSION_RECORD_SCHEMA } from "../types.js"; import type { PromptInput, SessionRecord } from "../types.js"; -import { acp, action, checkpoint, compute, defineFlow, shell } from "./definition.js"; +import { acp, action, checkpoint, compute, decision, defineFlow, shell } from "./definition.js"; import { formatShellActionSummary, runShellAction } from "./executors/shell.js"; import { resolveNext, resolveNextForOutcome, validateFlowDefinition } from "./graph.js"; +import { extractJsonObject } from "./json.js"; import { FlowRunStore } from "./store.js"; import type { AcpNodeDefinition, ActionNodeDefinition, CheckpointNodeDefinition, ComputeNodeDefinition, + DecisionNodeDefinition, + DecisionResolverInput, + DecisionResult, + FlowArtifactRef, FlowDefinition, FlowNodeCommon, FlowNodeContext, @@ -56,12 +61,15 @@ import type { ShellActionResult, } from "./types.js"; -export { acp, action, checkpoint, compute, defineFlow, shell }; +export { acp, action, checkpoint, compute, decision, defineFlow, shell }; export type { AcpNodeDefinition, ActionNodeDefinition, CheckpointNodeDefinition, ComputeNodeDefinition, + DecisionNodeDefinition, + DecisionResolverInput, + DecisionResult, FlowDefinition, FlowEdge, FlowNodeCommon, @@ -112,6 +120,7 @@ type TracedPromptResult = { export class FlowRunner { private readonly resolveAgent; + private readonly resolveDecision?; private readonly defaultCwd; private readonly permissionMode; private readonly mcpServers?; @@ -129,6 +138,7 @@ export class FlowRunner { constructor(options: FlowRunnerOptions) { this.resolveAgent = options.resolveAgent; + this.resolveDecision = options.resolveDecision; this.defaultCwd = options.resolveAgent(undefined).cwd; this.permissionMode = options.permissionMode; this.mcpServers = options.mcpServers; @@ -432,6 +442,8 @@ export class FlowRunner { return await this.executeCheckpointNode(runDir, state, nodeId, node, context); case "acp": return await this.executeAcpNode(runDir, state, flow, node, context); + case "decision": + return await this.executeDecisionNode(runDir, state, flow, node, context); default: { const exhaustive: never = node; throw new Error(`Unsupported flow node: ${String(exhaustive)}`); @@ -464,6 +476,101 @@ export class FlowRunner { }; } + private async executeDecisionNode( + runDir: string, + state: FlowRunState, + flow: FlowDefinition, + node: DecisionNodeDefinition, + context: FlowNodeContext, + ): Promise { + const nodeTimeoutMs = node.timeoutMs ?? this.defaultNodeTimeoutMs; + + return await this.runWithHeartbeat( + runDir, + state, + state.currentNode ?? "", + node, + nodeTimeoutMs, + async () => { + const promptText = await Promise.resolve(node.prompt(context)); + const optionKeys = Object.keys(node.options).join(", "); + this.updateStatusDetail(state, node.statusDetail ?? `Deciding: ${optionKeys}`); + + if (this.resolveDecision) { + const result = await this.resolveDecision({ + prompt: promptText, + options: node.options, + model: node.model, + }); + validateDecisionResult(result, node.options); + return { + output: result, + promptText, + rawText: JSON.stringify(result), + sessionInfo: null, + agentInfo: null, + trace: null, + }; + } + + // Fallback: use an isolated ACP agent session with a structured prompt + const resolvedAgent = this.resolveAgent(node.profile); + const agentInfo = { ...resolvedAgent }; + + const optionLines = Object.entries(node.options) + .map(([id, desc]) => ` - "${id}": ${desc}`) + .join("\n"); + + const acpPrompt = [ + promptText, + "", + "Choose exactly one of the following options:", + optionLines, + "", + "Respond with ONLY a JSON object in this exact format:", + '{"choice": "", "reasoning": ""}', + "", + "Do not include any other text.", + ].join("\n"); + + const promptArtifact = await this.store.writeArtifact(runDir, state, acpPrompt, { + mediaType: "text/plain", + extension: "txt", + nodeId: state.currentNode, + attemptId: state.currentAttemptId, + }); + + const { rawText, binding, trace } = await this.runIsolatedAcpPromptWithTrace( + runDir, + state, + flow, + node.profile, + agentInfo, + normalizePromptInput(acpPrompt), + promptArtifact, + nodeTimeoutMs, + ); + + let parsed: DecisionResult; + try { + parsed = extractJsonObject(rawText) as DecisionResult; + validateDecisionResult(parsed, node.options); + } catch (error) { + throw attachStepTrace(error, trace); + } + + return { + output: parsed, + promptText: acpPrompt, + rawText, + sessionInfo: binding, + agentInfo, + trace, + }; + }, + ); + } + private async executeActionNode( runDir: string, state: FlowRunState, @@ -672,89 +779,27 @@ export class FlowRunner { }); if (node.session?.isolated) { - const isolatedBinding = createIsolatedSessionBinding( - flow.name, - state.runId, - state.currentAttemptId ?? randomUUID(), - node.profile, - agentInfo, - ); - const initialIsolatedRecord = createSyntheticSessionRecord({ - binding: isolatedBinding, - createdAt: state.currentNodeStartedAt ?? isoNow(), - updatedAt: state.currentNodeStartedAt ?? isoNow(), - conversation: createSessionConversation(state.currentNodeStartedAt ?? isoNow()), - acpxState: undefined, - lastSeq: 0, - }); - await this.store.ensureSessionBundle( - runDir, - state, - isolatedBinding, - initialIsolatedRecord, - ); - await this.store.appendTrace(runDir, state, { - scope: "acp", - type: "acp_prompt_prepared", - nodeId: state.currentNode, - attemptId: state.currentAttemptId, - sessionId: isolatedBinding.bundleId, - payload: { - sessionId: isolatedBinding.bundleId, - promptArtifact, - }, - }); - const isolatedPrompt = await this.runIsolatedPrompt( + const { rawText, binding, trace } = await this.runIsolatedAcpPromptWithTrace( runDir, state, - isolatedBinding, + flow, + node.profile, agentInfo, prompt, + promptArtifact, nodeTimeoutMs, ); - const rawResponseArtifact = await this.store.writeArtifact( - runDir, - state, - isolatedPrompt.rawText, - { - mediaType: "text/plain", - extension: "txt", - nodeId: state.currentNode, - attemptId: state.currentAttemptId, - sessionId: isolatedBinding.bundleId, - }, - ); - await this.store.appendTrace(runDir, state, { - scope: "acp", - type: "acp_response_parsed", - nodeId: state.currentNode, - attemptId: state.currentAttemptId, - sessionId: isolatedBinding.bundleId, - payload: { - sessionId: isolatedBinding.bundleId, - conversation: isolatedPrompt.conversation, - rawResponseArtifact, - }, - }); - const trace: FlowStepTrace = { - sessionId: isolatedBinding.bundleId, - promptArtifact, - rawResponseArtifact, - conversation: isolatedPrompt.conversation, - }; let parsedOutput: unknown; try { - parsedOutput = node.parse - ? await node.parse(isolatedPrompt.rawText, context) - : isolatedPrompt.rawText; + parsedOutput = node.parse ? await node.parse(rawText, context) : rawText; } catch (error) { throw attachStepTrace(error, trace); } return { output: parsedOutput, promptText, - rawText: isolatedPrompt.rawText, - sessionInfo: isolatedBinding, + rawText, + sessionInfo: binding, agentInfo, trace, }; @@ -1107,6 +1152,95 @@ export class FlowRunner { ); } + private async runIsolatedAcpPromptWithTrace( + runDir: string, + state: FlowRunState, + flow: FlowDefinition, + profile: string | undefined, + agentInfo: ResolvedFlowAgent, + prompt: PromptInput, + promptArtifact: FlowArtifactRef, + nodeTimeoutMs: number | undefined, + ): Promise<{ + rawText: string; + binding: FlowSessionBinding; + rawResponseArtifact: FlowArtifactRef; + trace: FlowStepTrace; + }> { + const isolatedBinding = createIsolatedSessionBinding( + flow.name, + state.runId, + state.currentAttemptId ?? randomUUID(), + profile, + agentInfo, + ); + const initialIsolatedRecord = createSyntheticSessionRecord({ + binding: isolatedBinding, + createdAt: state.currentNodeStartedAt ?? isoNow(), + updatedAt: state.currentNodeStartedAt ?? isoNow(), + conversation: createSessionConversation(state.currentNodeStartedAt ?? isoNow()), + acpxState: undefined, + lastSeq: 0, + }); + await this.store.ensureSessionBundle(runDir, state, isolatedBinding, initialIsolatedRecord); + await this.store.appendTrace(runDir, state, { + scope: "acp", + type: "acp_prompt_prepared", + nodeId: state.currentNode, + attemptId: state.currentAttemptId, + sessionId: isolatedBinding.bundleId, + payload: { sessionId: isolatedBinding.bundleId, promptArtifact }, + }); + + const isolatedPrompt = await this.runIsolatedPrompt( + runDir, + state, + isolatedBinding, + agentInfo, + prompt, + nodeTimeoutMs, + ); + + const rawResponseArtifact = await this.store.writeArtifact( + runDir, + state, + isolatedPrompt.rawText, + { + mediaType: "text/plain", + extension: "txt", + nodeId: state.currentNode, + attemptId: state.currentAttemptId, + sessionId: isolatedBinding.bundleId, + }, + ); + await this.store.appendTrace(runDir, state, { + scope: "acp", + type: "acp_response_parsed", + nodeId: state.currentNode, + attemptId: state.currentAttemptId, + sessionId: isolatedBinding.bundleId, + payload: { + sessionId: isolatedBinding.bundleId, + conversation: isolatedPrompt.conversation, + rawResponseArtifact, + }, + }); + + const trace: FlowStepTrace = { + sessionId: isolatedBinding.bundleId, + promptArtifact, + rawResponseArtifact, + conversation: isolatedPrompt.conversation, + }; + + return { + rawText: isolatedPrompt.rawText, + binding: isolatedBinding, + rawResponseArtifact, + trace, + }; + } + private async runIsolatedPrompt( runDir: string, state: FlowRunState, @@ -1421,6 +1555,33 @@ function extractAttachedStepTrace(error: unknown): FlowStepTrace | null | undefi return (error as Error & { flowStepTrace?: FlowStepTrace | null }).flowStepTrace; } +function validateDecisionResult( + result: unknown, + options: Record, +): asserts result is DecisionResult { + if ( + result == null || + typeof result !== "object" || + typeof (result as DecisionResult).choice !== "string" + ) { + throw new Error( + `Decision result must have shape { choice: string, reasoning?: string }, got: ${JSON.stringify(result)}`, + ); + } + const reasoning = (result as Record).reasoning; + if (reasoning !== undefined && typeof reasoning !== "string") { + throw new Error( + `Decision result reasoning must be a string if provided, got: ${JSON.stringify(result)}`, + ); + } + const choice = (result as DecisionResult).choice; + if (!(choice in options)) { + throw new Error( + `Decision choice "${choice}" is not one of the valid options: ${Object.keys(options).join(", ")}`, + ); + } +} + function toInlineOutput(value: unknown): undefined | null | boolean | number | string | object { if (value == null || typeof value === "number" || typeof value === "boolean") { return value; diff --git a/src/flows/store.ts b/src/flows/store.ts index cc41bbd..f16f62b 100644 --- a/src/flows/store.ts +++ b/src/flows/store.ts @@ -452,6 +452,14 @@ function snapshotNode(node: FlowNodeDefinition) { ...(node.summary ? { summary: node.summary } : {}), hasRun: typeof node.run === "function", }; + case "decision": + return { + ...common, + options: structuredClone(node.options), + ...(node.profile ? { profile: node.profile } : {}), + ...(node.model ? { model: node.model } : {}), + hasPrompt: true, + }; } } diff --git a/src/flows/types.ts b/src/flows/types.ts index 9ef4e13..d5ec3d1 100644 --- a/src/flows/types.ts +++ b/src/flows/types.ts @@ -108,11 +108,31 @@ export type CheckpointNodeDefinition = FlowNodeCommon & { run?: (context: FlowNodeContext) => MaybePromise; }; +export type DecisionNodeDefinition = FlowNodeCommon & { + nodeType: "decision"; + prompt: (context: FlowNodeContext) => MaybePromise; + options: Record; + model?: string; + profile?: string; +}; + +export type DecisionResolverInput = { + prompt: string; + options: Record; + model?: string; +}; + +export type DecisionResult = { + choice: string; + reasoning?: string; +}; + export type FlowNodeDefinition = | AcpNodeDefinition | ComputeNodeDefinition | ActionNodeDefinition - | CheckpointNodeDefinition; + | CheckpointNodeDefinition + | DecisionNodeDefinition; export type FlowPermissionRequirements = { requiredMode: PermissionMode; @@ -132,6 +152,7 @@ export type FlowDefinition = { export type FlowNodeSnapshot = FlowNodeCommon & { nodeType: FlowNodeDefinition["nodeType"]; profile?: string; + model?: string; session?: { handle?: string; isolated?: boolean; @@ -141,6 +162,7 @@ export type FlowNodeSnapshot = FlowNodeCommon & { value?: string; }; summary?: string; + options?: Record; actionExecution?: "function" | "shell"; hasPrompt?: boolean; hasParse?: boolean; @@ -339,6 +361,7 @@ export type ResolvedFlowAgent = { export type FlowRunnerOptions = { resolveAgent: (profile?: string) => ResolvedFlowAgent; + resolveDecision?: (input: DecisionResolverInput) => Promise; permissionMode: PermissionMode; mcpServers?: McpServer[]; nonInteractivePermissions?: NonInteractivePermissionPolicy; diff --git a/test/flows.test.ts b/test/flows.test.ts index 57ffff0..2243a2b 100644 --- a/test/flows.test.ts +++ b/test/flows.test.ts @@ -11,6 +11,7 @@ import { action, checkpoint, compute, + decision, defineFlow, shell, } from "../src/flows/runtime.js"; @@ -1203,6 +1204,287 @@ test("FlowRunner stores successful node results separately from outputs", async }); }); +test("FlowRunner executes decision node with resolveDecision and routes via switch edge", async () => { + await withTempHome(async () => { + const outputRoot = await fs.mkdtemp(path.join(os.tmpdir(), "acpx-decision-")); + try { + const runner = new FlowRunner({ + resolveAgent: () => ({ + agentName: "mock", + agentCommand: MOCK_AGENT_COMMAND, + cwd: outputRoot, + }), + permissionMode: "approve-all", + resolveDecision: async ({ options }) => { + return { choice: Object.keys(options)[0]!, reasoning: "test pick" }; + }, + outputRoot, + }); + + const flow = defineFlow({ + name: "decision-route-test", + startAt: "decide", + nodes: { + decide: decision({ + prompt: (ctx) => `Evaluate tone for ${(ctx.input as { audience: string }).audience}`, + options: { + good: "Tone is appropriate", + too_formal: "Too formal", + too_casual: "Too casual", + }, + }), + good_path: compute({ run: () => ({ result: "good" }) }), + formal_path: compute({ run: () => ({ result: "formal" }) }), + casual_path: compute({ run: () => ({ result: "casual" }) }), + }, + edges: [ + { + from: "decide", + switch: { + on: "$.choice", + cases: { + good: "good_path", + too_formal: "formal_path", + too_casual: "casual_path", + }, + }, + }, + ], + }); + + const result = await runner.run(flow, { audience: "engineers" }); + assert.equal(result.state.status, "completed"); + assert.deepEqual(result.state.outputs.good_path, { result: "good" }); + assert.equal(result.state.outputs.formal_path, undefined); + assert.equal(result.state.outputs.casual_path, undefined); + const decisionOutput = result.state.outputs.decide as { + choice: string; + reasoning: string; + }; + assert.equal(decisionOutput.choice, "good"); + assert.equal(decisionOutput.reasoning, "test pick"); + } finally { + await fs.rm(outputRoot, { recursive: true, force: true }); + } + }); +}); + +test("FlowRunner decision node rejects invalid choice not in options", async () => { + await withTempHome(async () => { + const outputRoot = await fs.mkdtemp(path.join(os.tmpdir(), "acpx-decision-invalid-")); + try { + const runner = new FlowRunner({ + resolveAgent: () => ({ + agentName: "mock", + agentCommand: MOCK_AGENT_COMMAND, + cwd: outputRoot, + }), + permissionMode: "approve-all", + resolveDecision: async () => ({ + choice: "nonexistent", + reasoning: "bad choice", + }), + outputRoot, + }); + + const flow = defineFlow({ + name: "decision-invalid-test", + startAt: "decide", + nodes: { + decide: decision({ + prompt: () => "Pick one", + options: { a: "Option A", b: "Option B" }, + }), + }, + edges: [], + }); + + await assert.rejects(async () => await runner.run(flow, {}), /not one of the valid options/); + + const runDir = await waitForRunDir(outputRoot, "decision-invalid-test"); + const state = await readRunJson(runDir); + assert.equal(state.status, "failed"); + assert.match(String(state.error), /not one of the valid options/); + } finally { + await fs.rm(outputRoot, { recursive: true, force: true }); + } + }); +}); + +test("FlowRunner decision node prompt receives upstream outputs in context", async () => { + await withTempHome(async () => { + const outputRoot = await fs.mkdtemp(path.join(os.tmpdir(), "acpx-decision-ctx-")); + try { + let capturedPrompt = ""; + const runner = new FlowRunner({ + resolveAgent: () => ({ + agentName: "mock", + agentCommand: MOCK_AGENT_COMMAND, + cwd: outputRoot, + }), + permissionMode: "approve-all", + resolveDecision: async ({ prompt }) => { + capturedPrompt = prompt; + return { choice: "yes", reasoning: "looks good" }; + }, + outputRoot, + }); + + const flow = defineFlow({ + name: "decision-context-test", + startAt: "prep", + nodes: { + prep: compute({ run: () => ({ summary: "all good" }) }), + decide: decision({ + prompt: (ctx) => `Review: ${(ctx.outputs.prep as { summary: string }).summary}`, + options: { yes: "Approved", no: "Rejected" }, + }), + }, + edges: [{ from: "prep", to: "decide" }], + }); + + const result = await runner.run(flow, {}); + assert.equal(result.state.status, "completed"); + assert.equal(capturedPrompt, "Review: all good"); + } finally { + await fs.rm(outputRoot, { recursive: true, force: true }); + } + }); +}); + +test("FlowRunner decision node accepts result without reasoning field", async () => { + await withTempHome(async () => { + const outputRoot = await fs.mkdtemp(path.join(os.tmpdir(), "acpx-decision-no-reason-")); + try { + const runner = new FlowRunner({ + resolveAgent: () => ({ + agentName: "mock", + agentCommand: MOCK_AGENT_COMMAND, + cwd: outputRoot, + }), + permissionMode: "approve-all", + resolveDecision: async () => ({ choice: "a" }) as never, + outputRoot, + }); + + const flow = defineFlow({ + name: "decision-no-reason-test", + startAt: "decide", + nodes: { + decide: decision({ + prompt: () => "Pick", + options: { a: "A", b: "B" }, + }), + }, + edges: [], + }); + + const result = await runner.run(flow, {}); + assert.equal(result.state.status, "completed"); + assert.equal((result.state.outputs.decide as { choice: string }).choice, "a"); + } finally { + await fs.rm(outputRoot, { recursive: true, force: true }); + } + }); +}); + +test("FlowRunner decision node rejects non-string reasoning", async () => { + await withTempHome(async () => { + const outputRoot = await fs.mkdtemp(path.join(os.tmpdir(), "acpx-decision-bad-reason-")); + try { + const runner = new FlowRunner({ + resolveAgent: () => ({ + agentName: "mock", + agentCommand: MOCK_AGENT_COMMAND, + cwd: outputRoot, + }), + permissionMode: "approve-all", + resolveDecision: async () => ({ choice: "a", reasoning: 42 }) as never, + outputRoot, + }); + + const flow = defineFlow({ + name: "decision-bad-reason-test", + startAt: "decide", + nodes: { + decide: decision({ + prompt: () => "Pick", + options: { a: "A", b: "B" }, + }), + }, + edges: [], + }); + + await assert.rejects(async () => await runner.run(flow, {}), /reasoning must be a string/); + + const runDir = await waitForRunDir(outputRoot, "decision-bad-reason-test"); + const state = await readRunJson(runDir); + assert.equal(state.status, "failed"); + assert.match(String(state.error), /reasoning must be a string/); + } finally { + await fs.rm(outputRoot, { recursive: true, force: true }); + } + }); +}); + +test("FlowRunner decision node falls back to isolated ACP session when no resolveDecision", async () => { + await withTempHome(async () => { + const cwd = await fs.mkdtemp(path.join(os.tmpdir(), "acpx-decision-acp-cwd-")); + const outputRoot = await fs.mkdtemp(path.join(os.tmpdir(), "acpx-decision-acp-")); + try { + const runner = new FlowRunner({ + resolveAgent: () => ({ + agentName: "mock", + agentCommand: MOCK_AGENT_COMMAND, + cwd, + }), + permissionMode: "approve-all", + ttlMs: 1_000, + outputRoot, + }); + + const flow = defineFlow({ + name: "decision-acp-fallback", + startAt: "decide", + nodes: { + decide: decision({ + prompt: () => 'echo {"choice":"approve","reasoning":"looks good"}', + options: { approve: "Approved", reject: "Rejected" }, + }), + approved: compute({ run: () => ({ result: "approved" }) }), + rejected: compute({ run: () => ({ result: "rejected" }) }), + }, + edges: [ + { + from: "decide", + switch: { + on: "$.choice", + cases: { + approve: "approved", + reject: "rejected", + }, + }, + }, + ], + }); + + const result = await runner.run(flow, {}); + assert.equal(result.state.status, "completed"); + const decisionOutput = result.state.outputs.decide as { + choice: string; + reasoning: string; + }; + assert.equal(decisionOutput.choice, "approve"); + assert.equal(decisionOutput.reasoning, "looks good"); + assert.deepEqual(result.state.outputs.approved, { result: "approved" }); + assert.equal(result.state.outputs.rejected, undefined); + } finally { + await fs.rm(cwd, { recursive: true, force: true }); + await fs.rm(outputRoot, { recursive: true, force: true }); + } + }); +}); + async function withTempHome(run: (homeDir: string) => Promise): Promise { const previousHome = process.env.HOME; const previousQueueOwnerArgs = process.env.ACPX_QUEUE_OWNER_ARGS;