diff --git a/.changeset/computed-output-dispatch.md b/.changeset/computed-output-dispatch.md new file mode 100644 index 00000000..43162e00 --- /dev/null +++ b/.changeset/computed-output-dispatch.md @@ -0,0 +1,8 @@ +--- +"@stackables/bridge": patch +"@stackables/bridge-core": patch +"@stackables/bridge-parser": patch +"@stackables/bridge-compiler": patch +--- + +Bridge output array mappings now support computed root dispatch indices such as `o[item.index] <- source[] as item { ... }`. Runtime execution, streaming patches, serialization, and compiled execution all preserve the computed slot so mapped items land at the intended output positions. diff --git a/.changeset/trace-tool-metadata-fixes.md b/.changeset/trace-tool-metadata-fixes.md new file mode 100644 index 00000000..6be20f96 --- /dev/null +++ b/.changeset/trace-tool-metadata-fixes.md @@ -0,0 +1,7 @@ +--- +"@stackables/bridge": patch +"@stackables/bridge-core": patch +"@stackables/bridge-compiler": patch +--- + +Tool-level tracing metadata is now applied consistently across runtime and compiled execution. Internal helpers such as string-interpolation `concat` no longer emit trace entries when their metadata sets `trace: false`, and stream tools such as `httpSSE` now produce trace entries when tracing is enabled in normal `executeBridge()` execution. diff --git a/packages/bridge-compiler/src/codegen.ts b/packages/bridge-compiler/src/codegen.ts index 533475bc..1acc0082 100644 --- a/packages/bridge-compiler/src/codegen.ts +++ b/packages/bridge-compiler/src/codegen.ts @@ -242,6 +242,31 @@ function emitParsedConst(raw: string): string { } } +function parseToolTemplate( + raw: string, +): Array<{ kind: "text"; value: string } | { kind: "ref"; value: string }> { + const parts: Array< + { kind: "text"; value: string } | { kind: "ref"; value: string } + > = []; + const regex = /\{([^{}]+)\}/g; + let cursor = 0; + + for (const match of raw.matchAll(regex)) { + const index = match.index ?? 0; + if (index > cursor) { + parts.push({ kind: "text", value: raw.slice(cursor, index) }); + } + parts.push({ kind: "ref", value: match[1]!.trim() }); + cursor = index + match[0].length; + } + + if (cursor < raw.length) { + parts.push({ kind: "text", value: raw.slice(cursor) }); + } + + return parts; +} + // ── Code-generation context ───────────────────────────────────────────────── interface ToolInfo { @@ -709,6 +734,10 @@ class CodegenContext { ); lines.push(` const __batchQueues = new Map();`); lines.push(` const __trace = __opts?.__trace;`); + lines.push(` const __shouldTrace = (fn) => fn?.bridge?.trace !== false;`); + lines.push( + ` const __toolTemplatePart = (value) => value == null ? "" : String(value);`, + ); lines.push(` function __toolExecutionLogLevel(fn) {`); lines.push(` const log = fn?.bridge?.log;`); lines.push(` if (log === false || log == null) return false;`); @@ -848,8 +877,9 @@ class CodegenContext { ); lines.push(` const chunk = pending.slice(start, start + chunkSize);`); lines.push(` const inputs = chunk.map((item) => item.input);`); + lines.push(` const __doTrace = __trace && __shouldTrace(fn);`); lines.push( - ` const startTime = (__trace || __ctx.logger) ? performance.now() : 0;`, + ` const startTime = (__doTrace || __ctx.logger) ? performance.now() : 0;`, ); lines.push(` try {`); lines.push(` const batchPromise = fn(inputs, __ctx);`); @@ -867,7 +897,7 @@ class CodegenContext { lines.push(` result = await batchPromise;`); lines.push(` }`); lines.push( - ` if (__trace) __trace(queue.toolName, startTime, performance.now(), inputs, result, null);`, + ` if (__doTrace) __trace(queue.toolName, startTime, performance.now(), inputs, result, null);`, ); lines.push(` const __execLevel = __toolExecutionLogLevel(fn);`); lines.push( @@ -884,7 +914,7 @@ class CodegenContext { ); lines.push(` } catch (err) {`); lines.push( - ` if (__trace) __trace(queue.toolName, startTime, performance.now(), inputs, null, err);`, + ` if (__doTrace) __trace(queue.toolName, startTime, performance.now(), inputs, null, err);`, ); lines.push(` const __errorLevel = __toolErrorLogLevel(fn);`); lines.push( @@ -897,14 +927,17 @@ class CodegenContext { // Sync tool caller — no await, no timeout, enforces no-promise return. lines.push(` function __callSync(fn, input, toolName) {`); lines.push(` if (__signal?.aborted) throw new __BridgeAbortError();`); - lines.push(` const start = __trace ? performance.now() : 0;`); + lines.push(` const __doTrace = __trace && __shouldTrace(fn);`); + lines.push( + ` const start = (__doTrace || __ctx.logger) ? performance.now() : 0;`, + ); lines.push(` try {`); lines.push(` const result = fn(input, __ctx);`); lines.push( ` if (result && typeof result.then === "function") throw new Error("Tool \\"" + toolName + "\\" declared {sync:true} but returned a Promise");`, ); lines.push( - ` if (__trace) __trace(toolName, start, performance.now(), input, result, null);`, + ` if (__doTrace) __trace(toolName, start, performance.now(), input, result, null);`, ); lines.push(` const __execLevel = __toolExecutionLogLevel(fn);`); lines.push( @@ -913,7 +946,7 @@ class CodegenContext { lines.push(` return result;`); lines.push(` } catch (err) {`); lines.push( - ` if (__trace) __trace(toolName, start, performance.now(), input, null, err);`, + ` if (__doTrace) __trace(toolName, start, performance.now(), input, null, err);`, ); lines.push(` const __errorLevel = __toolErrorLogLevel(fn);`); lines.push( @@ -931,7 +964,10 @@ class CodegenContext { // Async tool caller — full promise handling with optional timeout. lines.push(` async function __call(fn, input, toolName) {`); lines.push(` if (__signal?.aborted) throw new __BridgeAbortError();`); - lines.push(` const start = __trace ? performance.now() : 0;`); + lines.push(` const __doTrace = __trace && __shouldTrace(fn);`); + lines.push( + ` const start = (__doTrace || __ctx.logger) ? performance.now() : 0;`, + ); lines.push(` try {`); lines.push(` const p = fn(input, __ctx);`); lines.push(` let result;`); @@ -946,7 +982,7 @@ class CodegenContext { lines.push(` result = await p;`); lines.push(` }`); lines.push( - ` if (__trace) __trace(toolName, start, performance.now(), input, result, null);`, + ` if (__doTrace) __trace(toolName, start, performance.now(), input, result, null);`, ); lines.push(` const __execLevel = __toolExecutionLogLevel(fn);`); lines.push( @@ -955,7 +991,7 @@ class CodegenContext { lines.push(` return result;`); lines.push(` } catch (err) {`); lines.push( - ` if (__trace) __trace(toolName, start, performance.now(), input, null, err);`, + ` if (__doTrace) __trace(toolName, start, performance.now(), input, null, err);`, ); lines.push(` const __errorLevel = __toolErrorLogLevel(fn);`); lines.push( @@ -1335,6 +1371,11 @@ class CodegenContext { tw.target, ` ${JSON.stringify(tw.target)}: ${expr}`, ); + } else if (tw.kind === "template") { + inputEntries.set( + tw.target, + ` ${JSON.stringify(tw.target)}: ${this.renderToolTemplateExpr(tw.value, toolDef)}`, + ); } } @@ -1562,6 +1603,10 @@ class CodegenContext { if (tw.kind === "pull") { const expr = this.resolveToolDepSource(tw.source, depToolDef); inputParts.push(` ${JSON.stringify(tw.target)}: ${expr}`); + } else if (tw.kind === "template") { + inputParts.push( + ` ${JSON.stringify(tw.target)}: ${this.renderToolTemplateExpr(tw.value, depToolDef)}`, + ); } } @@ -1645,6 +1690,20 @@ class CodegenContext { return baseExpr + restPath.map((p) => `[${JSON.stringify(p)}]`).join(""); } + private renderToolTemplateExpr(template: string, toolDef: ToolDef): string { + const parts = parseToolTemplate(template); + if (parts.length === 0) { + return JSON.stringify(template); + } + return parts + .map((part) => + part.kind === "text" + ? JSON.stringify(part.value) + : `__toolTemplatePart(${this.resolveToolDepSource(part.value, toolDef)})`, + ) + .join(" + "); + } + /** Find a tool info by tool name. */ private findToolByName(name: string): ToolInfo | undefined { for (const [, info] of this.tools) { @@ -1749,6 +1808,9 @@ class CodegenContext { const elemWires = outputWires.filter( (w) => w !== rootWire && w.to.path.length > 0, ); + const dispatchIndexExpr = rootWire.dispatchIndexRef + ? this.refToExprInElementScope(rootWire.dispatchIndexRef, "_el0") + : undefined; let arrayExpr = this.wireToExpr(rootWire); // Check for catch control on root wire (e.g., `catch continue` returns []) const rootCatchCtrl = @@ -1839,8 +1901,11 @@ class CodegenContext { 0, 4, cf.kind === "continue" ? "for-continue" : "break", + dispatchIndexExpr, ) - : ` _result.push(${this.buildElementBody(elemWires, arrayIterators, 0, 4)});`; + : dispatchIndexExpr + ? ` _result[${dispatchIndexExpr}] = ${this.buildElementBody(elemWires, arrayIterators, 0, 4)};` + : ` _result.push(${this.buildElementBody(elemWires, arrayIterators, 0, 4)});`; lines.push(` const _result = [];`); lines.push(` __loop0: for (const _el0 of (${arrayExpr} ?? [])) {`); @@ -1857,7 +1922,7 @@ class CodegenContext { lines.push(` }`); lines.push(` }`); lines.push(` return _result;`); - } else { + } else if (!dispatchIndexExpr) { lines.push( ` return await Promise.all((${arrayExpr} ?? []).map(async (_el0) => {`, ); @@ -1868,6 +1933,17 @@ class CodegenContext { ` return ${this.buildElementBody(elemWires, arrayIterators, 0, 4)};`, ); lines.push(` }));`); + } else { + lines.push(` const _result = [];`); + lines.push(` for (const _el0 of (${arrayExpr} ?? [])) {`); + for (const pl of preambleLines) { + lines.push(` ${pl}`); + } + lines.push( + ` _result[${dispatchIndexExpr}] = ${this.buildElementBody(elemWires, arrayIterators, 0, 4)};`, + ); + lines.push(` }`); + lines.push(` return _result;`); } this.elementLocalVars.clear(); } else if (cf?.kind === "continue" && cf.levels === 1) { @@ -1879,9 +1955,22 @@ class CodegenContext { 4, "continue", ); - lines.push(` return (${arrayExpr} ?? []).flatMap((_el0) => {`); - lines.push(body); - lines.push(` });`); + if (!dispatchIndexExpr) { + lines.push(` return (${arrayExpr} ?? []).flatMap((_el0) => {`); + lines.push(body); + lines.push(` });`); + } else { + lines.push(` const _result = [];`); + lines.push(` for (const _el0 of (${arrayExpr} ?? [])) {`); + lines.push(` const _entry = (() => {`); + lines.push(body.replace(/^[ ]{4}/gm, " ")); + lines.push(` })();`); + lines.push(` if (Array.isArray(_entry) && _entry.length > 0) {`); + lines.push(` _result[${dispatchIndexExpr}] = _entry[0];`); + lines.push(` }`); + lines.push(` }`); + lines.push(` return _result;`); + } } else if ( cf?.kind === "break" || cf?.kind === "continue" || @@ -1899,8 +1988,11 @@ class CodegenContext { 0, 4, cf.kind === "continue" ? "for-continue" : "break", + dispatchIndexExpr, ) - : ` _result.push(${this.buildElementBody(elemWires, arrayIterators, 0, 4)});`; + : dispatchIndexExpr + ? ` _result[${dispatchIndexExpr}] = ${this.buildElementBody(elemWires, arrayIterators, 0, 4)};` + : ` _result.push(${this.buildElementBody(elemWires, arrayIterators, 0, 4)});`; lines.push(` const _result = [];`); lines.push(` __loop0: for (const _el0 of (${arrayExpr} ?? [])) {`); lines.push(` try {`); @@ -1915,7 +2007,15 @@ class CodegenContext { lines.push(` return _result;`); } else { const body = this.buildElementBody(elemWires, arrayIterators, 0, 4); - lines.push(` return (${arrayExpr} ?? []).map((_el0) => (${body}));`); + if (!dispatchIndexExpr) { + lines.push(` return (${arrayExpr} ?? []).map((_el0) => (${body}));`); + } else { + lines.push(` const _result = [];`); + lines.push(` for (const _el0 of (${arrayExpr} ?? [])) {`); + lines.push(` _result[${dispatchIndexExpr}] = ${body};`); + lines.push(` }`); + lines.push(` return _result;`); + } } return; } @@ -2182,6 +2282,29 @@ class CodegenContext { spreadExprs?: string[], ): string { const pad = " ".repeat(indent); + const keys = [...node.children.keys()]; + + if ( + !spreadExprs?.length && + keys.length > 0 && + keys.every((key) => /^\d+$/.test(key)) + ) { + const assignments: string[] = []; + for (const [key, child] of node.children) { + const childSpreadExprs = (child as { spreadExprs?: string[] }) + .spreadExprs; + const valueExpr = + child.expr != null && child.children.size === 0 && !childSpreadExprs + ? child.expr + : childSpreadExprs || child.children.size > 0 + ? this.serializeOutputTree(child, indent + 2, childSpreadExprs) + : (child.expr ?? "undefined"); + assignments.push(`${pad}_result[${key}] = ${valueExpr};`); + } + const innerPad = " ".repeat(indent - 2); + return `(() => {\n${pad}const _result = [];\n${assignments.join("\n")}\n${pad}return _result;\n${innerPad}})()`; + } + const entries: string[] = []; // Add spread expressions first (they come before field overrides) @@ -2544,6 +2667,7 @@ class CodegenContext { depth: number, indent: number, mode: "break" | "continue" | "for-continue", + resultIndexExpr?: string, ): string { const elVar = `_el${depth}`; const pad = " ".repeat(indent); @@ -2568,7 +2692,9 @@ class CodegenContext { if (mode === "continue") { return `${pad} return [${body}];`; } - return `${pad} _result.push(${body});`; + return resultIndexExpr + ? `${pad} _result[${resultIndexExpr}] = ${body};` + : `${pad} _result.push(${body});`; } // Build the check expression using elementWireToExpr to include fallbacks @@ -2606,16 +2732,16 @@ class CodegenContext { // mode === "for-continue" — same as break but uses native 'continue' keyword if (mode === "for-continue") { if (isNullish) { - return `${pad} if (${checkExpr} == null) ${controlStatement}\n${pad} _result.push(${this.buildElementBody(elemWires, arrayIterators, depth, indent)});`; + return `${pad} if (${checkExpr} == null) ${controlStatement}\n${resultIndexExpr ? `${pad} _result[${resultIndexExpr}] = ${this.buildElementBody(elemWires, arrayIterators, depth, indent)};` : `${pad} _result.push(${this.buildElementBody(elemWires, arrayIterators, depth, indent)});`}`; } - return `${pad} if (!${checkExpr}) ${controlStatement}\n${pad} _result.push(${this.buildElementBody(elemWires, arrayIterators, depth, indent)});`; + return `${pad} if (!${checkExpr}) ${controlStatement}\n${resultIndexExpr ? `${pad} _result[${resultIndexExpr}] = ${this.buildElementBody(elemWires, arrayIterators, depth, indent)};` : `${pad} _result.push(${this.buildElementBody(elemWires, arrayIterators, depth, indent)});`}`; } // mode === "break" if (isNullish) { - return `${pad} if (${checkExpr} == null) ${controlStatement}\n${pad} _result.push(${this.buildElementBody(elemWires, arrayIterators, depth, indent)});`; + return `${pad} if (${checkExpr} == null) ${controlStatement}\n${resultIndexExpr ? `${pad} _result[${resultIndexExpr}] = ${this.buildElementBody(elemWires, arrayIterators, depth, indent)};` : `${pad} _result.push(${this.buildElementBody(elemWires, arrayIterators, depth, indent)});`}`; } - return `${pad} if (!${checkExpr}) ${controlStatement}\n${pad} _result.push(${this.buildElementBody(elemWires, arrayIterators, depth, indent)});`; + return `${pad} if (!${checkExpr}) ${controlStatement}\n${resultIndexExpr ? `${pad} _result[${resultIndexExpr}] = ${this.buildElementBody(elemWires, arrayIterators, depth, indent)};` : `${pad} _result.push(${this.buildElementBody(elemWires, arrayIterators, depth, indent)});`}`; } // ── Wire → expression ──────────────────────────────────────────────────── @@ -3200,6 +3326,18 @@ class CodegenContext { } } + private refToExprInElementScope(ref: NodeRef, elVar: string): string { + const prevElVar = this.currentElVar; + this.elementVarStack.push(elVar); + this.currentElVar = elVar; + try { + return ref.element ? this.refToElementExpr(ref) : this.refToExpr(ref); + } finally { + this.elementVarStack.pop(); + this.currentElVar = prevElVar; + } + } + /** * Collect the tool function references (as JS expressions) for all * element-scoped non-internal tools used by the given element wires. diff --git a/packages/bridge-compiler/src/execute-bridge.ts b/packages/bridge-compiler/src/execute-bridge.ts index 467b2359..60a30f3f 100644 --- a/packages/bridge-compiler/src/execute-bridge.ts +++ b/packages/bridge-compiler/src/execute-bridge.ts @@ -8,6 +8,8 @@ import type { BridgeDocument, + Bridge, + ToolDef, ToolMap, Logger, ToolTrace, @@ -225,6 +227,54 @@ function flattenTools( // ── Public API ────────────────────────────────────────────────────────────── +/** + * Collect the tool function names (flat-key form, e.g. "std.httpCall") that + * a specific bridge operation actually references. This walks the bridge's + * handle bindings → ToolDef → extends chain to resolve the `fn` field. + */ +function getUsedToolFnNames( + document: BridgeDocument, + operation: string, +): string[] { + const [type, field] = operation.split("."); + const bridge = document.instructions.find( + (i): i is Bridge => + i.kind === "bridge" && i.type === type && i.field === field, + ); + if (!bridge) return []; + + const toolDefs = document.instructions.filter( + (i): i is ToolDef => i.kind === "tool", + ); + + const fnNames: string[] = []; + for (const h of bridge.handles) { + if (h.kind !== "tool") continue; + const resolved = resolveToolFn(h.name, toolDefs); + fnNames.push(resolved ?? h.name); + } + return fnNames; +} + +/** Walk the ToolDef extends chain to find the actual `fn` name. */ +function resolveToolFn( + name: string, + toolDefs: ToolDef[], + seen?: Set, +): string | undefined { + const def = toolDefs.find((t) => t.name === name); + if (!def) return undefined; + if (def.fn) return def.fn; + if (def.extends) { + // Guard against circular extends + const visited = seen ?? new Set(); + if (visited.has(name)) return undefined; + visited.add(name); + return resolveToolFn(def.extends, toolDefs, visited); + } + return undefined; +} + /** * Execute a bridge operation using AOT-compiled code. * @@ -289,6 +339,36 @@ export async function executeBridge( const allTools: ToolMap = { std: bundledStd, ...userTools }; const flatTools = flattenTools(allTools as Record); + // Stream tools (async generators with `.bridge.stream`) are unsupported by + // the compiled code path. Fall back to the core interpreter which wraps + // them in StreamHandle / eagerly consumes them. + // Only check tools actually referenced by this bridge operation — not every + // tool in the namespace — so unrelated stream tools don't force a fallback. + const usedFnNames = getUsedToolFnNames(document, operation); + for (const fnName of usedFnNames) { + const val = flatTools[fnName]; + if ( + typeof val === "function" && + val.bridge && + typeof val.bridge === "object" && + (val as { bridge: { stream?: boolean } }).bridge.stream === true + ) { + return executeCoreBridge({ + document, + operation, + input, + tools: userTools, + context, + signal, + toolTimeoutMs, + logger, + trace: options.trace, + requestedFields: options.requestedFields, + ...(maxDepth !== undefined ? { maxDepth } : {}), + }); + } + } + // Set up tracing if requested const traceLevel = options.trace ?? "off"; let tracer: TraceCollector | undefined; diff --git a/packages/bridge-core/src/ExecutionTree.ts b/packages/bridge-core/src/ExecutionTree.ts index 82629915..d4e6532c 100644 --- a/packages/bridge-core/src/ExecutionTree.ts +++ b/packages/bridge-core/src/ExecutionTree.ts @@ -5,6 +5,11 @@ import { trunkDependsOnElement, } from "./scheduleTools.ts"; import { internal } from "./tools/index.ts"; +import { + createDispatchStreamItem, + StreamHandle, + isStreamHandle, +} from "./execute-bridge-stream.ts"; import type { EffectiveToolLog, ToolTrace } from "./tracing.ts"; import { isOtelActive, @@ -43,6 +48,7 @@ import { pathEquals, roundMs, sameTrunk, + setNested, TRUNK_KEY_CACHE, trunkKey, UNSAFE_KEYS, @@ -64,7 +70,10 @@ import { } from "./requested-fields.ts"; import { raceTimeout } from "./utils.ts"; import type { TraceWireBits } from "./enumerate-traversals.ts"; -import { buildTraceBitsMap, enumerateTraversalIds } from "./enumerate-traversals.ts"; +import { + buildTraceBitsMap, + enumerateTraversalIds, +} from "./enumerate-traversals.ts"; function stableMemoizeKey(value: unknown): string { if (value === undefined) { @@ -179,6 +188,12 @@ export class ExecutionTree implements TreeContext { * Public to satisfy `ToolLookupContext` — used by `toolLookup.ts`. */ toolFns?: ToolMap; + /** + * When true, stream tools (`{ stream: true }`) return `StreamHandle` + * sentinels instead of being eagerly consumed. Set by + * `executeBridgeStream()`. + */ + streamMode: boolean = false; /** Shadow-tree nesting depth (0 for root). */ private depth: number; /** Pre-computed `trunkKey({ ...this.trunk, element: true })`. See packages/bridge-core/performance.md (#4). */ @@ -328,7 +343,6 @@ export class ExecutionTree implements TreeContext { if (this.signal?.aborted) { throw new BridgeAbortError(); } - const tracer = this.tracer; const logger = this.logger; const toolContext: ToolContext = { logger: logger ?? {}, @@ -336,7 +350,14 @@ export class ExecutionTree implements TreeContext { }; const timeoutMs = this.toolTimeoutMs; - const { sync: isSyncTool, batch, doTrace, log } = resolveToolMeta(fnImpl); + const { + sync: isSyncTool, + batch, + stream: isStreamTool, + doTrace, + log, + } = resolveToolMeta(fnImpl); + const tracer = doTrace ? this.tracer : undefined; if (batch) { return this.callBatchedTool( @@ -352,6 +373,132 @@ export class ExecutionTree implements TreeContext { ); } + // ── Stream tool handling ────────────────────────────────────── + // Stream tools return async generators. In stream mode, wrap the + // generator in a StreamHandle sentinel so executeBridgeStream can + // iterate it incrementally. In normal mode, eagerly consume the + // generator into an array for backward compatibility. + if (isStreamTool) { + const generator = fnImpl(input, toolContext); + const traceStart = tracer?.now(); + const metricAttrs = { + "bridge.tool.name": toolName, + "bridge.tool.fn": fnName, + }; + + if (this.streamMode) { + const traceEntry = + tracer && traceStart != null + ? tracer.entry({ + tool: toolName, + fn: fnName, + input, + durationMs: 0, + startedAt: traceStart, + }) + : undefined; + + if (traceEntry && tracer) { + tracer.record(traceEntry); + } + + const signal = this.signal; + const instrumentedGenerator = (async function* () { + const wallStart = performance.now(); + const items: unknown[] = []; + try { + for await (const item of generator) { + items.push(item); + yield item; + } + const durationMs = roundMs(performance.now() - wallStart); + toolCallCounter.add(1, metricAttrs); + toolDurationHistogram.record(durationMs, metricAttrs); + if (traceEntry && tracer && traceStart != null) { + traceEntry.durationMs = roundMs(tracer.now() - traceStart); + if (tracer.level === "full") { + traceEntry.output = items; + } + } + logToolSuccess(logger, log.execution, toolName, fnName, durationMs); + } catch (err) { + const durationMs = roundMs(performance.now() - wallStart); + toolCallCounter.add(1, metricAttrs); + toolDurationHistogram.record(durationMs, metricAttrs); + toolErrorCounter.add(1, metricAttrs); + if (traceEntry && tracer && traceStart != null) { + traceEntry.durationMs = roundMs(tracer.now() - traceStart); + traceEntry.error = (err as Error).message; + } + logToolError(logger, log.errors, toolName, fnName, err as Error); + if ( + signal?.aborted && + err instanceof DOMException && + err.name === "AbortError" + ) { + throw new BridgeAbortError(); + } + throw err; + } + })(); + + return new StreamHandle(instrumentedGenerator, toolName); + } + // Eager consumption: collect all yielded values into an array + return (async () => { + const wallStart = performance.now(); + try { + const items: unknown[] = []; + for await (const item of generator) { + items.push(item); + } + const durationMs = roundMs(performance.now() - wallStart); + toolCallCounter.add(1, metricAttrs); + toolDurationHistogram.record(durationMs, metricAttrs); + if (tracer && traceStart != null) { + tracer.record( + tracer.entry({ + tool: toolName, + fn: fnName, + input, + output: items, + durationMs: roundMs(tracer.now() - traceStart), + startedAt: traceStart, + }), + ); + } + logToolSuccess(logger, log.execution, toolName, fnName, durationMs); + return items; + } catch (err) { + const durationMs = roundMs(performance.now() - wallStart); + toolCallCounter.add(1, metricAttrs); + toolDurationHistogram.record(durationMs, metricAttrs); + toolErrorCounter.add(1, metricAttrs); + if (tracer && traceStart != null) { + tracer.record( + tracer.entry({ + tool: toolName, + fn: fnName, + input, + error: (err as Error).message, + durationMs: roundMs(tracer.now() - traceStart), + startedAt: traceStart, + }), + ); + } + logToolError(logger, log.errors, toolName, fnName, err as Error); + if ( + this.signal?.aborted && + err instanceof DOMException && + err.name === "AbortError" + ) { + throw new BridgeAbortError(); + } + throw err; + } + })(); + } + // ── Fast path: no instrumentation configured ────────────────── // When there is no internal tracer, no logger, and OpenTelemetry // has its default no-op provider, skip all instrumentation to @@ -596,7 +743,7 @@ export class ExecutionTree implements TreeContext { for (let start = 0; start < pending.length; start += chunkSize) { const chunk = pending.slice(start, start + chunkSize); const batchInput = chunk.map((item) => item.input); - const tracer = this.tracer; + const tracer = doTrace ? this.tracer : undefined; const logger = this.logger; const metricAttrs = { "bridge.tool.name": queue.toolName, @@ -745,9 +892,197 @@ export class ExecutionTree implements TreeContext { child.signal = this.signal; child.source = this.source; child.filename = this.filename; + child.streamMode = this.streamMode; return child; } + /** + * Wrap a StreamHandle generator so that each yielded item is transformed + * through array-mapping wires (shadow-tree creation + materialisation). + * Returns a new StreamHandle whose generator yields mapped objects. + */ + private wrapStreamWithMapping( + handle: StreamHandle, + prefix: string[], + dispatchIndexRef?: NodeRef, + ): StreamHandle { + const parent = this; + async function* mappedGenerator() { + for await (const item of handle.generator) { + const shadow = parent.shadow(); + shadow.state[parent.elementTrunkKey] = item; + const materialized = await parent.materializeShadows([shadow], prefix); + const mapped = (materialized as unknown[])?.[0]; + if (dispatchIndexRef) { + const rawIndex = await shadow.pullSingle(dispatchIndexRef); + yield createDispatchStreamItem( + parent.normalizeDispatchIndex(rawIndex), + mapped ?? item, + ); + continue; + } + yield mapped ?? item; + } + } + return new StreamHandle( + mappedGenerator(), + handle.toolName, + handle.bridgeLoc, + ); + } + + private normalizeDispatchIndex(value: unknown): number { + const index = typeof value === "number" ? value : Number(value); + if (!Number.isInteger(index) || index < 0) { + throw new Error( + `Computed output index must resolve to a non-negative integer, got ${JSON.stringify(value)}`, + ); + } + return index; + } + + private getDispatchIndexRef(pathPrefix: string[]): NodeRef | undefined { + const bridge = this.bridge; + if (!bridge) return undefined; + const { type, field } = this.trunk; + return bridge.wires.find( + (wire) => + wire.to.module === SELF_MODULE && + wire.to.type === type && + wire.to.field === field && + pathEquals(wire.to.path, pathPrefix) && + wire.dispatchIndexRef !== undefined, + )?.dispatchIndexRef; + } + + private async materializeDispatchedShadows( + shadows: ExecutionTree[], + pathPrefix: string[], + dispatchIndexRef: NodeRef, + ): Promise { + const materialized = await this.materializeShadows(shadows, pathPrefix); + if (!Array.isArray(materialized)) return []; + + const result: unknown[] = []; + await Promise.all( + shadows.map(async (shadow, index) => { + const rawDispatchIndex = await shadow.pullSingle(dispatchIndexRef); + result[this.normalizeDispatchIndex(rawDispatchIndex)] = + materialized[index]; + }), + ); + return result; + } + + /** + * Wrap a StreamHandle destined for a tool input with element-wire mapping. + * + * When a bridge writes `buf <- api[] as chunk { .role <- chunk.x }`, the + * element wires target the tool trunk (same as the root wire's target). + * This method finds those element wires, creates per-item shadow trees, + * resolves them, and yields mapped objects. + * + * If no element wires are found at `prefix`, returns the handle unchanged. + */ + wrapStreamForToolInput( + handle: StreamHandle, + prefix: string[], + target: Trunk, + ): StreamHandle { + const bridge = this.bridge; + if (!bridge) return handle; + + // Build the set of all array-mapping prefixes from the bridge so we can + // attribute each element wire to exactly one mapping. An element wire + // belongs to the mapping with the LONGEST matching prefix. + const allPrefixes: string[][] = bridge.arrayIterators + ? Object.keys(bridge.arrayIterators).map((k) => (k ? k.split(".") : [])) + : []; + + // Collect element wires on the tool trunk at the given prefix. + // Group by target sub-path for correct multi-wire resolution. + // This must include constant and expression-derived wires in addition to + // direct element pulls, otherwise fields like `.index = 2` or + // `.index <- item.pos + 1` are lost before stream tools such as + // `std.accumulate` see the mapped item. + const wiresByPath = new Map(); + for (const w of bridge.wires) { + const to = w.to; + if ( + to.module !== target.module || + to.type !== target.type || + to.field !== target.field + ) + continue; + if (to.path.length <= prefix.length) continue; + if (!prefix.every((seg, i) => to.path[i] === seg)) continue; + + const isElementMappedWire = + to.element === true || + "value" in w || + "cond" in w || + "condAnd" in w || + "condOr" in w || + ("from" in w && + (((w as Extract).from?.element ?? false) || + this.isElementScopedTrunk( + (w as Extract).from, + ))); + if (!isElementMappedWire) continue; + + // Attribute this wire to its owning mapping (longest matching prefix). + // Skip wires that belong to a different (longer) mapping prefix. + let longestMatch = -1; + for (const p of allPrefixes) { + if ( + p.length > longestMatch && + p.length <= to.path.length && + p.every((seg, i) => to.path[i] === seg) + ) { + longestMatch = p.length; + } + } + if (longestMatch > prefix.length) continue; + + const subPath = to.path.slice(prefix.length); + const key = subPath.join("\0"); + let group = wiresByPath.get(key); + if (!group) { + group = []; + wiresByPath.set(key, group); + } + group.push(w); + } + + if (wiresByPath.size === 0) return handle; + + const toolElementKey = `${target.module}:${target.type}:${target.field}:*`; + const parent = this; + const entries = Array.from(wiresByPath.entries()); + + async function* mappedGenerator() { + for await (const item of handle.generator) { + const shadow = parent.shadow(); + shadow.state[toolElementKey] = item; + const obj: Record = {}; + const results = await Promise.all( + entries.map(([, wires]) => shadow.resolveWires(wires)), + ); + for (let i = 0; i < entries.length; i++) { + const subPath = entries[i]![0].split("\0"); + const val = results[i]; + if (val !== undefined) setNested(obj, subPath, val); + } + yield obj; + } + } + return new StreamHandle( + mappedGenerator(), + handle.toolName, + handle.bridgeLoc, + ); + } + /** * Wrap raw array items into shadow trees, honouring `break` / `continue` * sentinels. Shared by `pullOutputField`, `response`, and `run`. @@ -852,7 +1187,7 @@ export class ExecutionTree implements TreeContext { ref.pathSafe?.[i] ?? (i === 0 ? (ref.rootSafe ?? false) : false); if (result == null) { - if ((i === 0 && ref.element) || accessSafe) { + if (ref.element || accessSafe) { result = undefined; continue; } @@ -1235,6 +1570,10 @@ export class ExecutionTree implements TreeContext { const result = this.resolveWires(matches); if (!array) return result; const resolved = await result; + if (isStreamHandle(resolved)) { + const dispatchIndexRef = this.getDispatchIndexRef(path); + return this.wrapStreamWithMapping(resolved, path, dispatchIndexRef); + } if (isLoopControlSignal(resolved)) return []; return this.createShadowArray(resolved as any[]); } @@ -1306,8 +1645,19 @@ export class ExecutionTree implements TreeContext { // Array mapping on a sub-field: resolve the array source, // create shadow trees, and materialise with field mappings. const resolved = await this.resolveWires(regularWires); + const dispatchIndexRef = this.getDispatchIndexRef(prefix); + if (isStreamHandle(resolved)) { + return this.wrapStreamWithMapping(resolved, prefix, dispatchIndexRef); + } if (!Array.isArray(resolved)) return null; const shadows = this.createShadowArray(resolved); + if (dispatchIndexRef) { + return this.materializeDispatchedShadows( + shadows, + prefix, + dispatchIndexRef, + ); + } return this.materializeShadows(shadows, prefix); } @@ -1380,6 +1730,16 @@ export class ExecutionTree implements TreeContext { return obj; } + private createOutputContainer( + fieldNames: Iterable, + ): unknown[] | Record { + const names = [...fieldNames]; + if (names.length > 0 && names.every((name) => /^\d+$/.test(name))) { + return []; + } + return {}; + } + /** * Materialise all output wires into a plain JS object. * @@ -1422,7 +1782,7 @@ export class ExecutionTree implements TreeContext { return this.state[this.elementTrunkKey]; } - // Root wire (`o <- src`) — whole-object passthrough + // Root wire (`o <- src`) — whole-object passthrough or array-mapped output. const hasRootWire = bridge.wires.some( (w) => "from" in w && @@ -1432,10 +1792,39 @@ export class ExecutionTree implements TreeContext { w.to.path.length === 0, ); if (hasRootWire) { + // Array-mapped output (`o <- items[] as x { ... }`) has BOTH a root wire + // AND element-level wires. Detect and handle via pullOutputField([], true) + // which wraps StreamHandles with element-wire mapping. + const hasElementWires = bridge.wires.some( + (w) => + "from" in w && + ((w.from as NodeRef).element === true || + this.isElementScopedTrunk(w.from as NodeRef) || + w.to.element === true) && + w.to.module === SELF_MODULE && + w.to.type === type && + w.to.field === field, + ); + if (hasElementWires) { + const dispatchIndexRef = this.getDispatchIndexRef([]); + const shadowsOrStream = await this.pullOutputField([], true); + if (isStreamHandle(shadowsOrStream)) { + return shadowsOrStream; + } + if (dispatchIndexRef) { + return this.materializeDispatchedShadows( + shadowsOrStream as ExecutionTree[], + [], + dispatchIndexRef, + ); + } + return this.materializeShadows(shadowsOrStream as ExecutionTree[], []); + } return this.pullOutputField([]); } - // Object output — collect unique top-level field names + // Object output — collect unique top-level field names. + // Skip element wires — they are consumed inside shadow trees. const outputFields = new Set(); for (const wire of bridge.wires) { if ( @@ -1444,13 +1833,14 @@ export class ExecutionTree implements TreeContext { wire.to.field === field && wire.to.path.length > 0 ) { + if ("from" in wire && (wire.from as NodeRef).element === true) continue; outputFields.add(wire.to.path[0]!); } } if (outputFields.size === 0) return undefined; - const result: Record = {}; + const result = this.createOutputContainer(outputFields) as any; await Promise.all( [...outputFields].map(async (name) => { @@ -1529,11 +1919,22 @@ export class ExecutionTree implements TreeContext { ); if (hasRootWire && hasElementWires) { - const [shadows] = await Promise.all([ - this.pullOutputField([], true) as Promise, + const dispatchIndexRef = this.getDispatchIndexRef([]); + const [shadowsOrStream] = await Promise.all([ + this.pullOutputField([], true), ...forcePromises, ]); - return this.materializeShadows(shadows, []); + if (isStreamHandle(shadowsOrStream)) { + return shadowsOrStream; + } + if (dispatchIndexRef) { + return this.materializeDispatchedShadows( + shadowsOrStream as ExecutionTree[], + [], + dispatchIndexRef, + ); + } + return this.materializeShadows(shadowsOrStream as ExecutionTree[], []); } // Whole-object passthrough: `o <- api.user` (non-spread root wire) @@ -1545,7 +1946,8 @@ export class ExecutionTree implements TreeContext { return result; } - // Object output — collect unique top-level field names + // Object output — collect unique top-level field names. + // Skip element wires — they are consumed inside shadow trees. const outputFields = new Set(); for (const wire of bridge.wires) { if ( @@ -1554,6 +1956,7 @@ export class ExecutionTree implements TreeContext { wire.to.field === field && wire.to.path.length > 0 ) { + if ("from" in wire && (wire.from as NodeRef).element === true) continue; outputFields.add(wire.to.path[0]!); } } @@ -1572,7 +1975,7 @@ export class ExecutionTree implements TreeContext { // Apply sparse fieldset filter const activeFields = filterOutputFields(outputFields, requestedFields); - const result: Record = {}; + const result = this.createOutputContainer(activeFields) as any; // First resolve spread wires (in order) to build base object // Each spread source's properties are merged into result diff --git a/packages/bridge-core/src/execute-bridge-stream.ts b/packages/bridge-core/src/execute-bridge-stream.ts new file mode 100644 index 00000000..d30cd528 --- /dev/null +++ b/packages/bridge-core/src/execute-bridge-stream.ts @@ -0,0 +1,415 @@ +/** + * Streaming execution for bridge operations. + * + * `executeBridgeStream()` follows the GraphQL incremental delivery + * specification: the first payload includes eagerly resolved data + * (with stream fields initialised to `[]`), and subsequent payloads + * deliver items from async-generator tools one at a time. + * + * Tools that declare `{ stream: true }` on their `.bridge` metadata + * must return `AsyncGenerator`. The engine detects these via + * `StreamHandle` sentinels injected by `callTool()`. + */ + +import { ExecutionTree } from "./ExecutionTree.ts"; +import { attachBridgeErrorDocumentContext } from "./formatBridgeError.ts"; +import { attachBridgeErrorMetadata } from "./tree-types.ts"; +import { TraceCollector } from "./tracing.ts"; +import type { ToolTrace } from "./tracing.ts"; +import type { SourceLocation, ToolMap } from "./types.ts"; +import { SELF_MODULE } from "./types.ts"; +import type { ExecuteBridgeOptions } from "./execute-bridge.ts"; +import { + std as bundledStd, + STD_VERSION as BUNDLED_STD_VERSION, +} from "@stackables/bridge-stdlib"; +import { resolveStd, checkHandleVersions } from "./version-check.ts"; + +// ── Stream handle sentinel ────────────────────────────────────────────────── + +/** + * Internal sentinel that wraps an async generator returned by a stream tool. + * + * When the execution tree resolves a stream tool, `callTool()` wraps the + * generator in a `StreamHandle` instead of consuming it. After `tree.run()` + * completes, `executeBridgeStream` scans the result tree for sentinels, + * replaces them with `[]`, and iterates the generators to produce + * incremental payloads. + */ +export class StreamHandle { + constructor( + public readonly generator: AsyncGenerator, + public readonly toolName: string, + public bridgeLoc?: SourceLocation, + ) {} +} + +export type DispatchStreamItem = { + index: number; + item: unknown; +}; + +export function createDispatchStreamItem( + index: number, + item: unknown, +): DispatchStreamItem { + return { index, item }; +} + +export function isDispatchStreamItem( + value: unknown, +): value is DispatchStreamItem { + return ( + value != null && + typeof value === "object" && + "index" in value && + "item" in value && + typeof (value as { index: unknown }).index === "number" + ); +} + +/** Type guard for `StreamHandle` sentinels embedded in resolved data. */ +export function isStreamHandle(value: unknown): value is StreamHandle { + return value instanceof StreamHandle; +} + +// ── Incremental delivery types ────────────────────────────────────────────── + +/** First payload of an incremental delivery sequence. */ +export type StreamInitialPayload = { + data: T; + hasNext: boolean; + traces?: ToolTrace[]; + executionTraceId?: bigint; +}; + +/** A single incremental item patch. */ +export interface StreamIncrementalItem { + items: unknown[]; + path: (string | number)[]; +} + +/** Subsequent payload with incremental patches. */ +export type StreamIncrementalPayload = { + incremental: StreamIncrementalItem[]; + hasNext: boolean; + executionTraceId?: bigint; +}; + +/** Union of all payload types yielded by `executeBridgeStream()`. */ +export type StreamPayload = + | StreamInitialPayload + | StreamIncrementalPayload; + +// ── Helpers ───────────────────────────────────────────────────────────────── + +interface FoundStream { + handle: StreamHandle; + path: (string | number)[]; +} + +const SKIP_STREAM_SLOT = Symbol("bridge.skipStreamSlot"); + +/** + * Walk the resolved data tree, replacing `StreamHandle` sentinels with `[]` + * and collecting them with their paths for later iteration. + */ +function extractStreams( + data: unknown, + path: (string | number)[], + found: FoundStream[], +): unknown { + if (isStreamHandle(data)) { + found.push({ handle: data, path: [...path] }); + const lastSeg = path[path.length - 1]; + if (typeof lastSeg === "number") return SKIP_STREAM_SLOT; + if (typeof lastSeg === "string" && /^\d+$/.test(lastSeg)) { + return SKIP_STREAM_SLOT; + } + return []; + } + if (Array.isArray(data)) { + const result: unknown[] = []; + for (let i = 0; i < data.length; i++) { + const extracted = extractStreams(data[i], [...path, i], found); + if (extracted !== SKIP_STREAM_SLOT) { + result.push(extracted); + } + } + return result; + } + if (data != null && typeof data === "object") { + const numericKeys = Object.keys(data).every((key) => /^\d+$/.test(key)); + if (numericKeys) { + const result: unknown[] = []; + for (const [key, value] of Object.entries(data)) { + const extracted = extractStreams(value, [...path, Number(key)], found); + if (extracted !== SKIP_STREAM_SLOT) { + result[Number(key)] = extracted; + } + } + return result; + } + const result: Record = {}; + for (const [key, value] of Object.entries(data)) { + const extracted = extractStreams(value, [...path, key], found); + if (extracted !== SKIP_STREAM_SLOT) { + result[key] = extracted; + } + } + return result; + } + return data; +} + +// ── Main entry point ──────────────────────────────────────────────────────── + +/** + * Execute a bridge operation with incremental streaming delivery. + * + * Returns an `AsyncGenerator` that yields payloads following the GraphQL + * incremental delivery specification: + * + * 1. **Initial payload** — contains all eagerly resolved data. Fields + * backed by stream tools (`{ stream: true }`) are initialised to `[]`. + * Includes `hasNext: true` when stream generators are pending. + * + * 2. **Incremental payloads** — one per yielded item from each stream + * generator. Each payload carries an `incremental` array of patches + * with `items` and `path` (matching the GraphQL spec format). + * + * 3. **Final payload** — delivered with `hasNext: false` to signal + * completion. + * + * When no stream tools are present, a single payload with + * `hasNext: false` is yielded (equivalent to `executeBridge()`). + * + * @example + * ```ts + * const stream = executeBridgeStream({ + * document, + * operation: "Query.searchProducts", + * input: { query: "shoes" }, + * tools: { aiSearch }, + * }); + * + * for await (const payload of stream) { + * if ("data" in payload) { + * console.log("Initial:", payload.data); + * } else { + * console.log("Incremental:", payload.incremental); + * } + * if (!payload.hasNext) break; + * } + * ``` + */ +export async function* executeBridgeStream( + options: ExecuteBridgeOptions, +): AsyncGenerator, void, undefined> { + const { document: doc, operation, input = {}, context = {} } = options; + + const parts = operation.split("."); + if (parts.length !== 2 || !parts[0] || !parts[1]) { + throw new Error( + `Invalid operation "${operation}" — expected "Type.field" (e.g. "Query.myField")`, + ); + } + + const [type, field] = parts as [string, string]; + const trunk = { module: SELF_MODULE, type, field }; + + const userTools = options.tools ?? {}; + + const { namespace: activeStd, version: activeStdVersion } = resolveStd( + doc.version, + bundledStd, + BUNDLED_STD_VERSION, + userTools, + ); + + const allTools: ToolMap = { std: activeStd, ...userTools }; + checkHandleVersions(doc.instructions, allTools, activeStdVersion); + + const tree = new ExecutionTree(trunk, doc, allTools, context); + + tree.source = doc.source; + tree.filename = doc.filename; + // Enable stream mode — callTool will wrap async generators in StreamHandle + tree.streamMode = true; + + if (options.logger) tree.logger = options.logger; + if (options.signal) tree.signal = options.signal; + if ( + options.toolTimeoutMs !== undefined && + Number.isFinite(options.toolTimeoutMs) && + options.toolTimeoutMs >= 0 + ) { + tree.toolTimeoutMs = Math.floor(options.toolTimeoutMs); + } + if ( + options.maxDepth !== undefined && + Number.isFinite(options.maxDepth) && + options.maxDepth >= 0 + ) { + tree.maxDepth = Math.floor(options.maxDepth); + } + + const traceLevel = options.trace ?? "off"; + if (traceLevel !== "off") { + tree.tracer = new TraceCollector(traceLevel); + } + + tree.enableExecutionTrace(); + + let rawData: unknown; + try { + rawData = await tree.run(input, options.requestedFields); + } catch (err) { + if (err && typeof err === "object") { + (err as { executionTraceId?: bigint }).executionTraceId = + tree.getExecutionTrace(); + } + throw attachBridgeErrorDocumentContext(err, doc); + } + + // Scan resolved data for stream sentinels + const streams: FoundStream[] = []; + const data = extractStreams(rawData, [], streams) as T; + + const traces = tree.getTraces(); + const executionTraceId = tree.getExecutionTrace(); + + if (streams.length === 0) { + // No stream tools — single payload, equivalent to executeBridge() + yield { + data, + hasNext: false, + traces, + executionTraceId, + }; + return; + } + + // Yield initial payload with stream fields initialised to [] + yield { + data, + hasNext: true, + traces, + executionTraceId, + }; + + // Iterate all stream generators concurrently. + // Each yielded item becomes an incremental payload. + const signal = options.signal; + + // Track active generators + type ActiveStream = { + iterator: AsyncGenerator; + path: (string | number)[]; + index: number; + done: boolean; + bridgeLoc?: SourceLocation; + /** When true, yields replace at a fixed index instead of appending. */ + dispatch: boolean; + }; + + const active: ActiveStream[] = streams.map((s) => { + // Dispatch mode: when the stream sits at a numeric path segment + // (e.g. o[0]), each yield replaces at that fixed index instead of + // appending. Detected purely from the output path. + const lastSeg = s.path[s.path.length - 1]; + const dispatch = + typeof lastSeg === "number" || + (typeof lastSeg === "string" && /^\d+$/.test(lastSeg)); + return { + iterator: s.handle.generator, + path: s.path, + index: 0, + done: false, + bridgeLoc: s.handle.bridgeLoc, + dispatch, + }; + }); + + // Pull from all active generators concurrently + while (active.some((s) => !s.done)) { + if (signal?.aborted) break; + + // Race: get next item from any active generator + const pending = active + .filter((s) => !s.done) + .map(async (stream) => { + try { + const result = await stream.iterator.next(); + return { stream, result }; + } catch (err) { + throw attachBridgeErrorMetadata(err, { + bridgeLoc: stream.bridgeLoc, + }); + } + }); + + // Wait for all active generators. If any generator throws, propagate the + // error instead of silently truncating the stream. + const results = await Promise.allSettled(pending); + + const rejected = results.find( + (settled): settled is PromiseRejectedResult => + settled.status === "rejected", + ); + if (rejected) { + const err = rejected.reason; + if (err && typeof err === "object") { + (err as { executionTraceId?: bigint }).executionTraceId = + tree.getExecutionTrace(); + } + throw attachBridgeErrorDocumentContext(err, doc); + } + + const incremental: StreamIncrementalItem[] = []; + + for (const settled of results) { + if (settled.status !== "fulfilled") { + continue; + } + const { stream, result } = settled.value; + if (result.done) { + stream.done = true; + continue; + } + const dispatchItem = isDispatchStreamItem(result.value) + ? result.value + : undefined; + incremental.push({ + items: [dispatchItem ? dispatchItem.item : result.value], + path: dispatchItem + ? [...stream.path, dispatchItem.index] + : stream.dispatch + ? [...stream.path] + : [...stream.path, stream.index], + }); + // In dispatch mode, index stays at 0 — each yield replaces the + // previous value. In normal mode, index auto-increments for append. + if (!stream.dispatch && !dispatchItem) { + stream.index++; + } + } + + if (incremental.length > 0) { + const hasNext = active.some((s) => !s.done); + yield { + incremental, + hasNext, + executionTraceId: tree.getExecutionTrace(), + }; + if (!hasNext) return; + } + } + + // Final termination signal if we broke out of the loop + yield { + incremental: [], + hasNext: false, + executionTraceId: tree.getExecutionTrace(), + }; +} diff --git a/packages/bridge-core/src/index.ts b/packages/bridge-core/src/index.ts index 46fdaf71..b5e343c3 100644 --- a/packages/bridge-core/src/index.ts +++ b/packages/bridge-core/src/index.ts @@ -15,6 +15,18 @@ export type { ExecuteBridgeResult, } from "./execute-bridge.ts"; +export { + executeBridgeStream, + StreamHandle, + isStreamHandle, +} from "./execute-bridge-stream.ts"; +export type { + StreamInitialPayload, + StreamIncrementalItem, + StreamIncrementalPayload, + StreamPayload, +} from "./execute-bridge-stream.ts"; + // ── Version check ─────────────────────────────────────────────────────────── export { @@ -70,6 +82,8 @@ export type { SourceLocation, ScalarToolCallFn, ScalarToolFn, + StreamToolCallFn, + StreamToolFn, ToolCallFn, ToolContext, ToolDef, diff --git a/packages/bridge-core/src/resolveWires.ts b/packages/bridge-core/src/resolveWires.ts index 81e12efc..069d792e 100644 --- a/packages/bridge-core/src/resolveWires.ts +++ b/packages/bridge-core/src/resolveWires.ts @@ -33,6 +33,39 @@ import type { TraceWireBits } from "./enumerate-traversals.ts"; */ type WireWithGates = Exclude; +function throwPrimaryWireError(err: unknown, wire: Wire): never { + if (isFatalError(err)) { + throw err; + } + + throw wrapBridgeRuntimeError(err, { + bridgeLoc: wire.loc, + }); +} + +function attachStreamBridgeLoc( + value: unknown, + bridgeLoc: Wire["loc"], +): unknown { + if (!bridgeLoc || !value || typeof value !== "object") { + return value; + } + + const candidate = value as { + generator?: { next?: unknown }; + toolName?: unknown; + bridgeLoc?: Wire["loc"]; + }; + if ( + typeof candidate.toolName === "string" && + typeof candidate.generator?.next === "function" && + candidate.bridgeLoc === undefined + ) { + candidate.bridgeLoc = bridgeLoc; + } + return value; +} + // ── Public entry point ────────────────────────────────────────────────────── /** @@ -80,11 +113,22 @@ export function resolveWires( const ref = getSimplePullRef(w); if (ref) { recordPrimary(ctx, w); - return ctx.pullSingle( - ref, - pullChain, - "from" in w ? (w.fromLoc ?? w.loc) : w.loc, - ); + try { + const result = ctx.pullSingle( + ref, + pullChain, + "from" in w ? (w.fromLoc ?? w.loc) : w.loc, + ); + if (isPromise(result)) { + return result.then( + (resolved) => attachStreamBridgeLoc(resolved, w.loc), + (err) => throwPrimaryWireError(err, w), + ); + } + return attachStreamBridgeLoc(result, w.loc); + } catch (err) { + throwPrimaryWireError(err, w); + } } } const orderedWires = orderOverdefinedWires(ctx, wires); @@ -139,7 +183,7 @@ async function resolveWiresAsync( value = await applyFallbackGates(ctx, w, value, pullChain); // Overdefinition Boundary - if (value != null) return value; + if (value != null) return attachStreamBridgeLoc(value, w.loc); } catch (err: unknown) { // Layer 3: Catch Gate if (isFatalError(err)) throw err; @@ -176,7 +220,11 @@ export async function applyFallbackGates( ): Promise { if (!w.fallbacks?.length) return value; - for (let fallbackIndex = 0; fallbackIndex < w.fallbacks.length; fallbackIndex++) { + for ( + let fallbackIndex = 0; + fallbackIndex < w.fallbacks.length; + fallbackIndex++ + ) { const fallback = w.fallbacks[fallbackIndex]; const isFalsyGateOpen = fallback.type === "falsy" && !value; const isNullishGateOpen = fallback.type === "nullish" && value == null; diff --git a/packages/bridge-core/src/scheduleTools.ts b/packages/bridge-core/src/scheduleTools.ts index bb853135..a1c9925c 100644 --- a/packages/bridge-core/src/scheduleTools.ts +++ b/packages/bridge-core/src/scheduleTools.ts @@ -20,6 +20,7 @@ import { resolveToolSource, type ToolLookupContext, } from "./toolLookup.ts"; +import { isStreamHandle, StreamHandle } from "./execute-bridge-stream.ts"; // ── Context interface ─────────────────────────────────────────────────────── @@ -50,6 +51,12 @@ export interface SchedulerContext extends ToolLookupContext { schedule(target: Trunk, pullChain?: Set): MaybePromise; /** Resolve a set of matched wires (delegates to resolveWires.ts). */ resolveWires(wires: Wire[], pullChain?: Set): MaybePromise; + /** Wrap a stream destined for a tool input with element-wire mapping. */ + wrapStreamForToolInput( + handle: StreamHandle, + prefix: string[], + target: Trunk, + ): StreamHandle; } function getBridgeLocFromGroups(groupEntries: [string, Wire[]][]): Wire["loc"] { @@ -61,6 +68,14 @@ function getBridgeLocFromGroups(groupEntries: [string, Wire[]][]): Wire["loc"] { return undefined; } +function attachToolConsumerBridgeLoc(value: T, bridgeLoc: Wire["loc"]): T { + if (!bridgeLoc || !isStreamHandle(value) || value.bridgeLoc !== undefined) { + return value; + } + value.bridgeLoc = bridgeLoc; + return value; +} + // ── Helpers ───────────────────────────────────────────────────────────────── /** Derive tool name from a trunk. */ @@ -284,7 +299,19 @@ export function scheduleFinish( const path = groupEntries[i]![1][0]!.to.path; const value = resolvedValues[i]; resolved.push([path, value]); - if (path.length === 0 && value != null && typeof value === "object") { + if (path.length === 0 && isStreamHandle(value)) { + // Stream-through: wrap with element mapping (if any) then pass generator + const mapped = ctx.wrapStreamForToolInput( + value as StreamHandle, + path, + target, + ); + input._source = mapped.generator; + } else if ( + path.length === 0 && + value != null && + typeof value === "object" + ) { Object.assign(input, value); } else { setNested(input, path, value); @@ -309,7 +336,19 @@ export function scheduleFinish( const memoizeKey = ctx.memoizedToolKeys.has(trunkKey(target)) ? trunkKey(target) : undefined; - return ctx.callTool(toolName, toolName, directFn, input, memoizeKey); + const result = ctx.callTool( + toolName, + toolName, + directFn, + input, + memoizeKey, + ); + if (isPromise(result)) { + return result.then((resolved) => + attachToolConsumerBridgeLoc(resolved, bridgeLoc), + ); + } + return attachToolConsumerBridgeLoc(result, bridgeLoc); } // Define pass-through: synthetic trunks created by define inlining @@ -366,7 +405,18 @@ export async function scheduleToolDef( }), ); for (const [path, value] of resolved) { - if (path.length === 0 && value != null && typeof value === "object") { + if (path.length === 0 && isStreamHandle(value)) { + const mapped = ctx.wrapStreamForToolInput( + value as StreamHandle, + path, + target, + ); + input._source = mapped.generator; + } else if ( + path.length === 0 && + value != null && + typeof value === "object" + ) { Object.assign(input, value); } else { setNested(input, path, value); @@ -392,7 +442,14 @@ export async function scheduleToolDef( const memoizeKey = ctx.memoizedToolKeys.has(trunkKey(target)) ? trunkKey(target) : undefined; - return await ctx.callTool(toolName, toolDef.fn!, fn, input, memoizeKey); + const result = await ctx.callTool( + toolName, + toolDef.fn!, + fn, + input, + memoizeKey, + ); + return attachToolConsumerBridgeLoc(result, bridgeLoc); } catch (err) { if (!onErrorWire) throw err; if ("value" in onErrorWire) return JSON.parse(onErrorWire.value); diff --git a/packages/bridge-core/src/toolLookup.ts b/packages/bridge-core/src/toolLookup.ts index e7d7c2db..3acb680d 100644 --- a/packages/bridge-core/src/toolLookup.ts +++ b/packages/bridge-core/src/toolLookup.ts @@ -202,6 +202,7 @@ export async function resolveToolWires( // Pull wires resolved in parallel (independent deps shouldn't wait on each other) const pullWires = toolDef.wires.filter((w) => w.kind === "pull"); + const templateWires = toolDef.wires.filter((w) => w.kind === "template"); if (pullWires.length > 0) { const resolved = await Promise.all( pullWires.map(async (wire) => ({ @@ -213,6 +214,17 @@ export async function resolveToolWires( setNested(input, parsePath(target), value); } } + if (templateWires.length > 0) { + const resolved = await Promise.all( + templateWires.map(async (wire) => ({ + target: wire.target, + value: await resolveToolTemplate(ctx, wire.value, toolDef), + })), + ); + for (const { target, value } of resolved) { + setNested(input, parsePath(target), value); + } + } } // ── Tool source resolution ────────────────────────────────────────────────── @@ -263,6 +275,33 @@ export async function resolveToolSource( return value; } +const TOOL_TEMPLATE_REF_RE = /\{([^{}]+)\}/g; + +export async function resolveToolTemplate( + ctx: ToolLookupContext, + template: string, + toolDef: ToolDef, +): Promise { + const matches = [...template.matchAll(TOOL_TEMPLATE_REF_RE)]; + if (matches.length === 0) return template; + + const values = await Promise.all( + matches.map((match) => resolveToolSource(ctx, match[1]!.trim(), toolDef)), + ); + + let result = ""; + let cursor = 0; + for (let i = 0; i < matches.length; i++) { + const match = matches[i]!; + result += template.slice(cursor, match.index); + const value = values[i]; + result += value == null ? "" : String(value); + cursor = match.index + match[0].length; + } + result += template.slice(cursor); + return result; +} + // ── Tool dependency execution ─────────────────────────────────────────────── /** diff --git a/packages/bridge-core/src/tracing.ts b/packages/bridge-core/src/tracing.ts index 94e8bb29..62917e58 100644 --- a/packages/bridge-core/src/tracing.ts +++ b/packages/bridge-core/src/tracing.ts @@ -251,6 +251,8 @@ export type ResolvedToolMeta = { sync: boolean; /** Batch mode contract, when declared. */ batch?: BatchToolMetadata; + /** Whether the tool is an async generator yielding incremental results. */ + stream: boolean; /** Emit an OTel span for this call. Default: `true`. */ doTrace: boolean; log: EffectiveToolLog; @@ -280,6 +282,7 @@ export function resolveToolMeta(fn: (...args: any[]) => any): ResolvedToolMeta { : typeof bridge?.batch === "object" ? bridge.batch : undefined, + stream: bridge?.stream === true, doTrace: bridge?.trace !== false, log: resolveToolLog(bridge), }; diff --git a/packages/bridge-core/src/types.ts b/packages/bridge-core/src/types.ts index cb5b2230..29905f35 100644 --- a/packages/bridge-core/src/types.ts +++ b/packages/bridge-core/src/types.ts @@ -61,6 +61,7 @@ export type Wire = | { from: NodeRef; to: NodeRef; + dispatchIndexRef?: NodeRef; loc?: SourceLocation; fromLoc?: SourceLocation; pipe?: true; @@ -73,7 +74,12 @@ export type Wire = catchFallbackRef?: NodeRef; catchControl?: ControlFlowInstruction; } - | { value: string; to: NodeRef; loc?: SourceLocation } + | { + value: string; + to: NodeRef; + dispatchIndexRef?: NodeRef; + loc?: SourceLocation; + } | { cond: NodeRef; condLoc?: SourceLocation; @@ -84,6 +90,7 @@ export type Wire = elseValue?: string; elseLoc?: SourceLocation; to: NodeRef; + dispatchIndexRef?: NodeRef; loc?: SourceLocation; fallbacks?: WireFallback[]; catchLoc?: SourceLocation; @@ -101,6 +108,7 @@ export type Wire = rightSafe?: true; }; to: NodeRef; + dispatchIndexRef?: NodeRef; loc?: SourceLocation; fallbacks?: WireFallback[]; catchLoc?: SourceLocation; @@ -118,6 +126,7 @@ export type Wire = rightSafe?: true; }; to: NodeRef; + dispatchIndexRef?: NodeRef; loc?: SourceLocation; fallbacks?: WireFallback[]; catchLoc?: SourceLocation; @@ -244,6 +253,7 @@ export type ToolDep = export type ToolWire = | { target: string; kind: "constant"; value: string } | { target: string; kind: "pull"; source: string } + | { target: string; kind: "template"; value: string } | { kind: "onError"; value: string } | { kind: "onError"; source: string }; @@ -261,6 +271,8 @@ export type { ToolContext, ScalarToolCallFn, ScalarToolFn, + StreamToolCallFn, + StreamToolFn, ToolCallFn, ToolMap, ToolMetadata, diff --git a/packages/bridge-parser/src/bridge-format.ts b/packages/bridge-parser/src/bridge-format.ts index 17a254f6..b0b7d236 100644 --- a/packages/bridge-parser/src/bridge-format.ts +++ b/packages/bridge-parser/src/bridge-format.ts @@ -181,6 +181,8 @@ function serializeToolBlock(tool: ToolDef): string { } else { lines.push(` .${wire.target} = ${wire.value}`); } + } else if (wire.kind === "template") { + lines.push(` .${wire.target} <- ${JSON.stringify(wire.value)}`); } else { lines.push(` .${wire.target} <- ${wire.source}`); } @@ -932,6 +934,15 @@ function serializeBridgeBlock(bridge: Bridge): string { } } + function serializeDispatchIndexRef(ref: NodeRef, iterName: string): string { + if (ref.element) { + return ref.path.length > 0 + ? `${iterName}.${serPath(ref.path)}` + : iterName; + } + return sRef(ref, true); + } + // ── Helper: serialize an expression fork tree for a ref (used for cond) ── function serializeExprOrRef(ref: NodeRef): string { const tk = refTrunkKey(ref); @@ -1020,7 +1031,9 @@ function serializeBridgeBlock(bridge: Bridge): string { serializedArrays.add(arrayKey); const iterName = arrayIterators[arrayKey]; const fromStr = sRef(w.from, true) + "[]"; - const toStr = sRef(w.to, false); + const toStr = w.dispatchIndexRef + ? `${sRef(w.to, false)}[${serializeDispatchIndexRef(w.dispatchIndexRef, iterName)}]` + : sRef(w.to, false); lines.push(`${toStr} <- ${fromStr} as ${iterName} {`); serializeArrayElements(w.to.path, iterName, " "); lines.push(`}`); diff --git a/packages/bridge-parser/src/parser/parser.ts b/packages/bridge-parser/src/parser/parser.ts index cdede099..c0a7f097 100644 --- a/packages/bridge-parser/src/parser/parser.ts +++ b/packages/bridge-parser/src/parser/parser.ts @@ -201,7 +201,18 @@ class BridgeParser extends CstParser { { ALT: () => { this.CONSUME(Arrow, { LABEL: "arrowOp" }); - this.SUBRULE(this.dottedName, { LABEL: "source" }); + this.OR2([ + { + ALT: () => { + this.CONSUME(StringLiteral, { LABEL: "stringSource" }); + }, + }, + { + ALT: () => { + this.SUBRULE(this.dottedName, { LABEL: "source" }); + }, + }, + ]); }, }, ]); @@ -1076,7 +1087,7 @@ class BridgeParser extends CstParser { } if (la.tokenType === LSquare) { const la2 = this.LA(2); - return la2.tokenType === NumberLiteral; + return la2.tokenType !== RSquare; } return false; }, @@ -1097,7 +1108,20 @@ class BridgeParser extends CstParser { { ALT: () => { this.CONSUME(LSquare); - this.CONSUME(NumberLiteral, { LABEL: "arrayIndex" }); + this.OR2([ + { + ALT: () => { + this.CONSUME(NumberLiteral, { LABEL: "arrayIndex" }); + }, + }, + { + ALT: () => { + this.SUBRULE3(this.addressPath, { + LABEL: "dynamicIndexPath", + }); + }, + }, + ]); this.CONSUME(RSquare); }, }, @@ -1615,12 +1639,24 @@ function extractDottedPathStr(node: CstNode): string { function extractAddressPath(node: CstNode): { root: string; segments: string[]; + dynamicIndices?: Array<{ + position: number; + root: string; + segments: string[]; + }>; safe?: boolean; rootSafe?: boolean; segmentSafe?: boolean[]; } { const root = extractNameToken(sub(node, "root")!); - type Seg = { offset: number; value: string }; + type Seg = + | { offset: number; kind: "static"; value: string } + | { + offset: number; + kind: "dynamic"; + root: string; + segments: string[]; + }; const items: Seg[] = []; const safeNavTokens = (node.children.safeNav as IToken[] | undefined) ?? []; const hasSafeNav = safeNavTokens.length > 0; @@ -1632,6 +1668,7 @@ function extractAddressPath(node: CstNode): { items.push({ offset: seg.location?.startOffset ?? findFirstToken(seg)?.startOffset ?? 0, + kind: "static", value: extractPathSegment(seg), }); } @@ -1641,7 +1678,23 @@ function extractAddressPath(node: CstNode): { `Line ${idxTok.startLine}: Array indices must be integers, found "${idxTok.image}"`, ); } - items.push({ offset: idxTok.startOffset, value: idxTok.image }); + items.push({ + offset: idxTok.startOffset, + kind: "static", + value: idxTok.image, + }); + } + for (const dynNode of subs(node, "dynamicIndexPath")) { + const dyn = extractAddressPath(dynNode); + items.push({ + offset: + dynNode.location?.startOffset ?? + findFirstToken(dynNode)?.startOffset ?? + 0, + kind: "dynamic", + root: dyn.root, + segments: dyn.segments, + }); } items.sort((a, b) => a.offset - b.offset); @@ -1671,9 +1724,28 @@ function extractAddressPath(node: CstNode): { segmentSafe.push(isSafe); } + const segments: string[] = []; + const dynamicIndices: Array<{ + position: number; + root: string; + segments: string[]; + }> = []; + for (const item of items) { + if (item.kind === "static") { + segments.push(item.value); + continue; + } + dynamicIndices.push({ + position: segments.length, + root: item.root, + segments: item.segments, + }); + } + return { root, - segments: items.map((i) => i.value), + segments, + ...(dynamicIndices.length > 0 ? { dynamicIndices } : {}), ...(hasSafeNav ? { safe: true } : {}), ...(rootSafe ? { rootSafe } : {}), ...(segmentSafe.some((s) => s) ? { segmentSafe } : {}), @@ -1858,8 +1930,9 @@ function processElementLines( elemLines: CstNode[], arrayToPath: string[], iterScope: string | string[], - bridgeType: string, - bridgeField: string, + elemTrunkModule: string, + elemTrunkType: string, + elemTrunkField: string, wires: Wire[], arrayIterators: Record, buildSourceExpr: ( @@ -1934,9 +2007,9 @@ function processElementLines( if (iterNames[index] !== root) continue; const elementDepth = iterNames.length - 1 - index; return { - module: SELF_MODULE, - type: bridgeType, - field: bridgeField, + module: elemTrunkModule, + type: elemTrunkType, + field: elemTrunkField, element: true, ...(elementDepth > 0 ? { elementDepth } : {}), path: segments, @@ -1989,9 +2062,9 @@ function processElementLines( { value, to: { - module: SELF_MODULE, - type: bridgeType, - field: bridgeField, + module: elemTrunkModule, + type: elemTrunkType, + field: elemTrunkField, element: true, path: elemToPath, }, @@ -2009,9 +2082,9 @@ function processElementLines( const segs = parseTemplateString(raw); const elemToRef: NodeRef = { - module: SELF_MODULE, - type: bridgeType, - field: bridgeField, + module: elemTrunkModule, + type: elemTrunkType, + field: elemTrunkField, path: elemToPath, }; @@ -2125,9 +2198,9 @@ function processElementLines( ); } const innerToRef: NodeRef = { - module: SELF_MODULE, - type: bridgeType, - field: bridgeField, + module: elemTrunkModule, + type: elemTrunkType, + field: elemTrunkField, path: elemToPath, }; wires.push( @@ -2166,8 +2239,9 @@ function processElementLines( subs(nestedArrayNode, "elementLine"), elemToPath, [...iterNames, innerIterName], - bridgeType, - bridgeField, + elemTrunkModule, + elemTrunkType, + elemTrunkField, wires, arrayIterators, buildSourceExpr, @@ -2188,9 +2262,9 @@ function processElementLines( // ── Element pull wire (expression or plain) ── const elemToRef: NodeRef = { - module: SELF_MODULE, - type: bridgeType, - field: bridgeField, + module: elemTrunkModule, + type: elemTrunkType, + field: elemTrunkField, path: elemToPath, }; @@ -2446,9 +2520,9 @@ function processElementLines( { from: fromRef, to: { - module: SELF_MODULE, - type: bridgeType, - field: bridgeField, + module: elemTrunkModule, + type: elemTrunkType, + field: elemTrunkField, element: true, path: elemToPath, }, @@ -2464,8 +2538,9 @@ function processElementLines( elemToPath, [], iterNames, - bridgeType, - bridgeField, + elemTrunkModule, + elemTrunkType, + elemTrunkField, wires, buildSourceExpr, extractCoalesceAlt, @@ -2491,8 +2566,9 @@ function processElementScopeLines( arrayToPath: string[], pathPrefix: string[], iterScope: string | string[], - bridgeType: string, - bridgeField: string, + elemTrunkModule: string, + elemTrunkType: string, + elemTrunkField: string, wires: Wire[], buildSourceExpr: ( node: CstNode, @@ -2553,9 +2629,9 @@ function processElementScopeLines( if (iterNames[index] !== root) continue; const elementDepth = iterNames.length - 1 - index; return { - module: SELF_MODULE, - type: bridgeType, - field: bridgeField, + module: elemTrunkModule, + type: elemTrunkType, + field: elemTrunkField, element: true, ...(elementDepth > 0 ? { elementDepth } : {}), path: segments, @@ -2625,9 +2701,9 @@ function processElementScopeLines( { from: fromRef, to: { - module: SELF_MODULE, - type: bridgeType, - field: bridgeField, + module: elemTrunkModule, + type: elemTrunkType, + field: elemTrunkField, element: true, path: spreadToPath, }, @@ -2643,8 +2719,9 @@ function processElementScopeLines( arrayToPath, fullSegs, iterNames, - bridgeType, - bridgeField, + elemTrunkModule, + elemTrunkType, + elemTrunkField, wires, buildSourceExpr, extractCoalesceAlt, @@ -2667,9 +2744,9 @@ function processElementScopeLines( { value, to: { - module: SELF_MODULE, - type: bridgeType, - field: bridgeField, + module: elemTrunkModule, + type: elemTrunkType, + field: elemTrunkField, element: true, path: elemToPath, }, @@ -2683,9 +2760,9 @@ function processElementScopeLines( // ── Pull wire: .field <- source [modifiers] ── if (sc.scopeArrow) { const elemToRef: NodeRef = { - module: SELF_MODULE, - type: bridgeType, - field: bridgeField, + module: elemTrunkModule, + type: elemTrunkType, + field: elemTrunkField, path: elemToPath, }; @@ -3209,8 +3286,19 @@ function buildToolDef( const value = extractBareValue(sub(wireNode, "value")!); wires.push({ target, kind: "constant", value }); } else if (wc.arrowOp) { - const source = extractDottedName(sub(wireNode, "source")!); - wires.push({ target, kind: "pull", source }); + const stringSourceToken = ( + wc.stringSource as IToken[] | undefined + )?.[0]; + if (stringSourceToken) { + wires.push({ + target, + kind: "template", + value: stringSourceToken.image.slice(1, -1), + }); + } else { + const source = extractDottedName(sub(wireNode, "source")!); + wires.push({ target, kind: "pull", source }); + } } continue; } @@ -3542,6 +3630,30 @@ function buildBridgeBody( return Array.isArray(iterScope) ? iterScope : [iterScope]; } + function assertNoDynamicIndices( + extracted: { + dynamicIndices?: Array<{ + position: number; + root: string; + segments: string[]; + }>; + }, + lineNum: number, + context: string, + ): void { + if (!extracted.dynamicIndices?.length) return; + throw new Error( + `Line ${lineNum}: Computed indices are only supported on root output array mappings, not in ${context}.`, + ); + } + + // Mutable trunk identity for element wires. Updated before entering an + // array-mapping block so that `resolveIterRef` (and any helper that calls + // it, such as `buildSourceExpr` / `desugarTemplateString`) emits element + // refs targeting the correct trunk — tool trunk or output trunk depending + // on the root wire's target. + let elemTrunk = { module: SELF_MODULE, type: bridgeType, field: bridgeField }; + function resolveIterRef( root: string, segments: string[], @@ -3552,9 +3664,9 @@ function buildBridgeBody( if (names[index] !== root) continue; const elementDepth = names.length - 1 - index; return { - module: SELF_MODULE, - type: bridgeType, - field: bridgeField, + module: elemTrunk.module, + type: elemTrunk.type, + field: elemTrunk.field, element: true, ...(elementDepth > 0 ? { elementDepth } : {}), path: [...segments], @@ -3589,14 +3701,6 @@ function buildBridgeBody( return ref; } - function assertNoTargetIndices(ref: NodeRef, lineNum: number): void { - if (ref.path.some((seg) => /^\d+$/.test(seg))) { - throw new Error( - `Line ${lineNum}: Explicit array index in wire target is not supported. Use array mapping (\`[] as iter { }\`) instead.`, - ); - } - } - // ── Helper: process block-scoped with-declarations inside array maps ── /** @@ -3837,9 +3941,13 @@ function buildBridgeBody( const wc = wireNode.children; const lineNum = line(findFirstToken(wireNode)); const wireLoc = locFromNode(wireNode); - const { root: targetRoot, segments: targetSegs } = extractAddressPath( - sub(wireNode, "target")!, + const extractedTarget = extractAddressPath(sub(wireNode, "target")!); + assertNoDynamicIndices( + extractedTarget, + lineNum, + "loop-scoped tool targets", ); + const { root: targetRoot, segments: targetSegs } = extractedTarget; if (!writableHandles.has(targetRoot)) { throw new Error( @@ -3848,7 +3956,6 @@ function buildBridgeBody( } const toRef = resolveAddress(targetRoot, targetSegs, lineNum); - assertNoTargetIndices(toRef, lineNum); if (wc.equalsOp) { const value = extractBareValue(sub(wireNode, "constValue")!); @@ -4135,8 +4242,9 @@ function buildBridgeBody( const pipeNodes = subs(sourceNode, "pipeSegment"); if (pipeNodes.length === 0) { - const { root, segments, safe, rootSafe, segmentSafe } = - extractAddressPath(headNode); + const extracted = extractAddressPath(headNode); + assertNoDynamicIndices(extracted, lineNum, "source expression"); + const { root, segments, safe, rootSafe, segmentSafe } = extracted; let ref: NodeRef; const iterRef = resolveIterRef(root, segments, iterScope); if (iterRef) { @@ -4162,7 +4270,9 @@ function buildBridgeBody( // Validate all pipe handles for (const pipeNode of pipeChainNodes) { - const { root } = extractAddressPath(pipeNode); + const extracted = extractAddressPath(pipeNode); + assertNoDynamicIndices(extracted, lineNum, "pipe handle"); + const { root } = extracted; if (!handleRes.has(root)) { throw new Error( `Line ${lineNum}: Undeclared handle in pipe: "${root}". Add 'with as ${root}' to the bridge header.`, @@ -4170,13 +4280,15 @@ function buildBridgeBody( } } + const extractedSource = extractAddressPath(actualSourceNode); + assertNoDynamicIndices(extractedSource, lineNum, "pipe source expression"); const { root: srcRoot, segments: srcSegments, safe, rootSafe: srcRootSafe, segmentSafe: srcSegmentSafe, - } = extractAddressPath(actualSourceNode); + } = extractedSource; let prevOutRef: NodeRef; const iterRef = resolveIterRef(srcRoot, srcSegments, iterScope); if (iterRef) { @@ -4189,8 +4301,9 @@ function buildBridgeBody( const reversed = [...pipeChainNodes].reverse(); for (let idx = 0; idx < reversed.length; idx++) { const pNode = reversed[idx]; - const { root: handleName, segments: handleSegs } = - extractAddressPath(pNode); + const extractedHandle = extractAddressPath(pNode); + assertNoDynamicIndices(extractedHandle, lineNum, "pipe handle"); + const { root: handleName, segments: handleSegs } = extractedHandle; const fieldName = handleSegs.length > 0 ? handleSegs.join(".") : "in"; const res = handleRes.get(handleName)!; const forkInstance = 100000 + nextForkSeq++; @@ -4542,7 +4655,9 @@ function buildBridgeBody( // Check for element/iterator-relative refs const headNode = sub(srcNode, "head")!; const pipeSegs = subs(srcNode, "pipeSegment"); - const { root, segments, safe: sourceSafe } = extractAddressPath(headNode); + const extracted = extractAddressPath(headNode); + assertNoDynamicIndices(extracted, lineNum, "expression operand"); + const { root, segments, safe: sourceSafe } = extracted; const iterRef = pipeSegs.length === 0 ? resolveIterRef(root, segments, iterScope) @@ -4598,7 +4713,9 @@ function buildBridgeBody( let innerSafe = safe; const headNode = sub(innerSourceNode, "head")!; const pipeSegs = subs(innerSourceNode, "pipeSegment"); - const { root, segments, safe: srcSafe } = extractAddressPath(headNode); + const extracted = extractAddressPath(headNode); + assertNoDynamicIndices(extracted, lineNum, "parenthesized expression"); + const { root, segments, safe: srcSafe } = extracted; const iterRef = pipeSegs.length === 0 ? resolveIterRef(root, segments, iterScope) @@ -5026,7 +5143,6 @@ function buildBridgeBody( } const toRef = resolveAddress(targetRoot, fullSegs, scopeLineNum); - assertNoTargetIndices(toRef, scopeLineNum); // ── Constant wire: .field = value ── if (sc.scopeEquals) { @@ -5610,11 +5726,9 @@ function buildBridgeBody( const wireLoc = locFromNode(wireNode); // Parse target - const { root: targetRoot, segments: targetSegs } = extractAddressPath( - sub(wireNode, "target")!, - ); + const extractedTarget = extractAddressPath(sub(wireNode, "target")!); + const { root: targetRoot, segments: targetSegs } = extractedTarget; const toRef = resolveAddress(targetRoot, targetSegs, lineNum); - assertNoTargetIndices(toRef, lineNum); // ── Constant wire: target = value ── if (wc.equalsOp) { @@ -5766,6 +5880,35 @@ function buildBridgeBody( // Array mapping? const arrayMappingNode = (wc.arrayMapping as CstNode[] | undefined)?.[0]; if (arrayMappingNode) { + const iterName = extractNameToken(sub(arrayMappingNode, "iterName")!); + let dispatchIndexRef: NodeRef | undefined; + if (extractedTarget.dynamicIndices?.length) { + if ( + toRef.module !== SELF_MODULE || + toRef.type !== bridgeType || + toRef.field !== bridgeField || + extractedTarget.dynamicIndices.length !== 1 || + extractedTarget.dynamicIndices[0]!.position !== targetSegs.length + ) { + throw new Error( + `Line ${lineNum}: Computed output indices are only supported on array-mapping targets like o[item.index] <- src[] as item { ... } or o.messages[item.index] <- src[] as item { ... }`, + ); + } + + const dynamicIndex = extractedTarget.dynamicIndices[0]!; + const iterRef = resolveIterRef( + dynamicIndex.root, + dynamicIndex.segments, + iterName, + ); + if (!iterRef) { + throw new Error( + `Line ${lineNum}: Computed output index must come from the current array iterator.`, + ); + } + dispatchIndexRef = iterRef; + } + const firstSourceNode = sub(wireNode, "firstSource"); const firstParenNode = sub(wireNode, "firstParenExpr"); const srcRef = firstParenNode @@ -5819,6 +5962,7 @@ function buildBridgeBody( ? { catchFallbackRef: arrayCatchFallbackRef } : {}), ...(arrayCatchControl ? { catchControl: arrayCatchControl } : {}), + ...(dispatchIndexRef ? { dispatchIndexRef } : {}), }; wires.push( withLoc({ from: srcRef, to: toRef, ...arrayWireAttrs }, wireLoc), @@ -5826,12 +5970,20 @@ function buildBridgeBody( wires.push(...arrayFallbackInternalWires); wires.push(...arrayCatchFallbackInternalWires); - const iterName = extractNameToken(sub(arrayMappingNode, "iterName")!); assertNotReserved(iterName, lineNum, "iterator handle"); const arrayToPath = toRef.path; arrayIterators[arrayToPath.join(".")] = iterName; // Process element lines (supports nested array mappings recursively) + // Set elemTrunk to the root wire's target so that element wires + // (and iterator refs resolved via buildSourceExpr) target the + // correct trunk — tool trunk or output trunk. + const savedElemTrunk = elemTrunk; + elemTrunk = { + module: toRef.module, + type: toRef.type, + field: toRef.field, + }; const elemWithDecls = subs(arrayMappingNode, "elementWithDecl"); const elemToolWithDecls = subs(arrayMappingNode, "elementToolWithDecl"); const { writableHandles, cleanup: toolCleanup } = @@ -5846,8 +5998,9 @@ function buildBridgeBody( subs(arrayMappingNode, "elementLine"), arrayToPath, iterName, - bridgeType, - bridgeField, + toRef.module, + toRef.type, + toRef.field, wires, arrayIterators, buildSourceExpr, @@ -5863,9 +6016,12 @@ function buildBridgeBody( ); cleanup(); toolCleanup(); + elemTrunk = savedElemTrunk; continue; } + assertNoDynamicIndices(extractedTarget, lineNum, "wire targets"); + const firstSourceNode = sub(wireNode, "firstSource"); const firstParenNode = sub(wireNode, "firstParenExpr"); const sourceParts: { ref: NodeRef; isPipeFork: boolean }[] = []; diff --git a/packages/bridge-stdlib/src/index.ts b/packages/bridge-stdlib/src/index.ts index 01fc32a9..7f0c51e7 100644 --- a/packages/bridge-stdlib/src/index.ts +++ b/packages/bridge-stdlib/src/index.ts @@ -9,6 +9,8 @@ */ import { audit } from "./tools/audit.ts"; import { createHttpCall } from "./tools/http-call.ts"; +import { createHttpCallSSE } from "./tools/http-call-sse.ts"; +import { createAccumulate } from "./tools/accumulate.ts"; import * as arrays from "./tools/arrays.ts"; import * as strings from "./tools/strings.ts"; @@ -27,12 +29,16 @@ export const STD_VERSION = "1.5.0"; * Referenced in `.bridge` files as `std.str.toUpperCase`, `std.arr.first`, etc. */ const httpCallFn = createHttpCall(); +const httpCallSSEFn = createHttpCallSSE(); +const accumulateFn = createAccumulate(); export const std = { str: strings, arr: arrays, audit, httpCall: httpCallFn, + httpCallSSE: httpCallSSEFn, + accumulate: accumulateFn, } as const; /** @@ -45,3 +51,4 @@ export const builtinToolNames: readonly string[] = Object.keys(std).map( ); export { createHttpCall } from "./tools/http-call.ts"; +export { createHttpCallSSE } from "./tools/http-call-sse.ts"; diff --git a/packages/bridge-stdlib/src/tools/accumulate.ts b/packages/bridge-stdlib/src/tools/accumulate.ts new file mode 100644 index 00000000..46cc813b --- /dev/null +++ b/packages/bridge-stdlib/src/tools/accumulate.ts @@ -0,0 +1,131 @@ +import type { StreamToolCallFn, ToolMetadata } from "@stackables/bridge-types"; + +/** + * Deep-merge `item` into `accumulator` in-place. + * + * Merge rules: + * - `undefined` / `null` values in `item` are skipped (keep accumulator value) + * - Both arrays → merge element-by-element (recurse for objects) + * - Both objects → recurse + * - Both strings → concatenate + * - Otherwise → overwrite with the new value + */ +function deepMergeStream( + accumulator: Record, + item: Record, +): void { + for (const key of Object.keys(item)) { + const newVal = item[key]; + if (newVal === undefined || newVal === null) continue; + const oldVal = accumulator[key]; + if (Array.isArray(oldVal) && Array.isArray(newVal)) { + for (let i = 0; i < newVal.length; i++) { + const oldElem = oldVal[i]; + const newElem = newVal[i]; + if (newElem === undefined || newElem === null) continue; + if ( + oldElem != null && + typeof oldElem === "object" && + !Array.isArray(oldElem) && + typeof newElem === "object" && + !Array.isArray(newElem) + ) { + deepMergeStream( + oldElem as Record, + newElem as Record, + ); + } else { + oldVal[i] = newElem; + } + } + } else if ( + oldVal != null && + typeof oldVal === "object" && + !Array.isArray(oldVal) && + typeof newVal === "object" && + !Array.isArray(newVal) + ) { + deepMergeStream( + oldVal as Record, + newVal as Record, + ); + } else if (typeof oldVal === "string" && typeof newVal === "string") { + accumulator[key] = oldVal + newVal; + } else { + accumulator[key] = newVal; + } + } +} + +/** + * `std.accumulate` — a stream-to-stream tool that wraps a source stream + * with deep-merge accumulation and optional throttling. + * + * Receives a source async generator via `input._source` (set by the engine + * when a StreamHandle is wired to this tool's trunk at the root path). + * Iterates the source, deep-merges each item into an accumulator, and + * yields the full accumulated state. + * + * Options (set via tool wires): + * - `.interval` (number, ms) — minimum time between emissions. When set, + * intermediate items are merged silently and only emitted once the + * interval has elapsed. The final accumulated state is always emitted. + * + * Bridge usage: + * ```bridge + * tool buf from std.accumulate { + * .interval = 100 + * } + * + * bridge Mutation.deepseekStream { + * with deepseekApi as api + * with buf + * with output as o + * + * buf <- api[] as result { + * .role <- result.choices[0].delta.role + * .content <- result.choices[0].delta.content + * } + * o[0] <- buf[] as a { + * ...a + * } + * } + * ``` + */ +export function createAccumulate(): StreamToolCallFn & { + bridge: ToolMetadata; +} { + async function* accumulate( + input: Record, + ): AsyncGenerator { + const source: AsyncIterable | undefined = input._source; + if (!source) return; + + const interval = typeof input.interval === "number" ? input.interval : 0; + const accumulator: Record = {}; + let lastYieldTime = 0; + let pending = false; + + for await (const item of source) { + if (item != null && typeof item === "object" && !Array.isArray(item)) { + deepMergeStream(accumulator, item as Record); + } + const now = Date.now(); + if (interval <= 0 || now - lastYieldTime >= interval) { + yield structuredClone(accumulator); + lastYieldTime = now; + pending = false; + } else { + pending = true; + } + } + + // Always emit the final accumulated state + if (pending) { + yield structuredClone(accumulator); + } + } + + accumulate.bridge = { stream: true } as const; + return accumulate; +} diff --git a/packages/bridge-stdlib/src/tools/http-call-sse.ts b/packages/bridge-stdlib/src/tools/http-call-sse.ts new file mode 100644 index 00000000..f1ac50b4 --- /dev/null +++ b/packages/bridge-stdlib/src/tools/http-call-sse.ts @@ -0,0 +1,122 @@ +import type { StreamToolCallFn, ToolMetadata } from "@stackables/bridge-types"; + +/** + * Create an SSE (Server-Sent Events) HTTP tool — a stream tool that makes + * an HTTP request and yields each SSE `data:` frame as a parsed JSON object. + * + * Designed for APIs that return `text/event-stream` responses (e.g. OpenAI, + * Deepseek, Anthropic streaming completions). + * + * Input shape matches `httpCall`: + * { baseUrl, method?, path?, headers?, ...shorthandFields } + * + * Routing rules (same as httpCall): + * - GET: shorthand fields → query string parameters + * - POST/PUT/PATCH/DELETE: shorthand fields → JSON body + * - `headers` object passed as HTTP headers + * - `baseUrl` + `path` concatenated for the URL + * + * Each yielded value is the parsed JSON from one `data:` line. + * Lines with `data: [DONE]` terminate the stream. + * + * @param fetchFn - Fetch implementation (override for testing) + */ +export function createHttpCallSSE( + fetchFn: typeof fetch = globalThis.fetch, +): StreamToolCallFn & { bridge: ToolMetadata } { + async function* httpCallSSE( + input: Record, + ): AsyncGenerator { + const { + baseUrl = "", + method = "POST", + path = "", + headers: inputHeaders = {}, + cache: _cache, + ...rest + } = input; + + // Build URL + const url = new URL(baseUrl + path); + + // Collect headers + const headers: Record = {}; + for (const [key, value] of Object.entries(inputHeaders)) { + if (value != null) headers[key] = String(value); + } + + // GET: shorthand fields → query string + if (method === "GET") { + for (const [key, value] of Object.entries(rest)) { + if (value != null) { + url.searchParams.set(key, String(value)); + } + } + } + + // Non-GET: shorthand fields → JSON body + let body: string | undefined; + if (method !== "GET") { + const bodyObj: Record = {}; + for (const [key, value] of Object.entries(rest)) { + if (value != null) bodyObj[key] = value; + } + if (Object.keys(bodyObj).length > 0) { + body = JSON.stringify(bodyObj); + headers["Content-Type"] ??= "application/json"; + } + } + + const response = await fetchFn(url.toString(), { method, headers, body }); + + if (!response.ok) { + throw new Error( + `httpCallSSE: HTTP ${response.status} ${response.statusText}`, + ); + } + + if (!response.body) { + throw new Error("httpCallSSE: response has no readable body"); + } + + const reader = response.body.getReader(); + const decoder = new TextDecoder(); + let buffer = ""; + + try { + while (true) { + const { done, value } = await reader.read(); + if (done) break; + + buffer += decoder.decode(value, { stream: true }); + + // SSE events are separated by blank lines (\n\n) + const segments = buffer.split("\n\n"); + // Keep the last (possibly incomplete) segment in the buffer + buffer = segments.pop()!; + + for (const segment of segments) { + if (!segment.trim()) continue; + + for (const line of segment.split("\n")) { + if (!line.startsWith("data:")) continue; + + const data = line.slice(5).trimStart(); + if (data === "[DONE]") return; + + try { + yield JSON.parse(data); + } catch { + // Non-JSON data line — skip + } + } + } + } + } finally { + reader.releaseLock(); + } + } + + httpCallSSE.bridge = { stream: true } as const; + return httpCallSSE; +} diff --git a/packages/bridge-stdlib/test/builtin-tools.test.ts b/packages/bridge-stdlib/test/builtin-tools.test.ts index c557cc67..8ca6ba44 100644 --- a/packages/bridge-stdlib/test/builtin-tools.test.ts +++ b/packages/bridge-stdlib/test/builtin-tools.test.ts @@ -100,9 +100,7 @@ describe("filter tool", () => { test("filters by multiple criteria", () => { const result = std.arr.filter({ in: data, role: "user", name: "Charlie" }); - assert.deepEqual(result, [ - { id: 3, name: "Charlie", role: "user" }, - ]); + assert.deepEqual(result, [{ id: 3, name: "Charlie", role: "user" }]); }); test("returns empty array when no match", () => { @@ -219,12 +217,14 @@ describe("std bundle", () => { test("std namespace contains transform tools", () => { assert.ok(std.audit, "audit present"); assert.ok(std.httpCall, "httpCall present"); + assert.ok(std.httpCallSSE, "httpCallSSE present"); + assert.ok(std.accumulate, "accumulate present"); assert.ok(std.str.toUpperCase, "upperCase present"); assert.ok(std.str.toLowerCase, "lowerCase present"); assert.ok(std.arr.find, "findObject present"); assert.ok(std.arr.first, "pickFirst present"); assert.ok(std.arr.toArray, "toArray present"); - assert.equal(Object.keys(std).length, 4); + assert.equal(Object.keys(std).length, 6); }); test("httpCall is callable with std. prefix", () => { diff --git a/packages/bridge-types/src/index.ts b/packages/bridge-types/src/index.ts index d53087fd..a21cc48b 100644 --- a/packages/bridge-types/src/index.ts +++ b/packages/bridge-types/src/index.ts @@ -98,6 +98,26 @@ export interface ToolMetadata { */ batch?: true | BatchToolMetadata; + /** + * If true, the tool is an async generator that yields incremental results. + * + * Stream tools must return an `AsyncGenerator` (use `async function*`). + * Each yielded value is delivered as an incremental patch via + * `executeBridgeStream()`. The engine collects yielded items into an + * array at the target output path. + * + * ```ts + * async function* aiStream(input) { + * yield { token: "Hello" }; + * yield { token: " world" }; + * } + * aiStream.bridge = { stream: true }; + * ``` + * + * Mutually exclusive with `batch` and `sync`. + */ + stream?: boolean; + // ─── Observability ──────────────────────────────────────────────────── /** @@ -145,6 +165,29 @@ export type BatchToolFn< bridge?: ToolMetadata; }; +/** + * Stream tool call function — async generator signature for tools + * declared with `{ stream: true }` metadata. + * + * The engine iterates the generator, collecting each yielded value as + * an incremental patch delivered through `executeBridgeStream()`. + */ +export type StreamToolCallFn< + Input extends Record = Record, + Output = any, +> = ( + input: Input, + context?: ToolContext, +) => AsyncGenerator; + +/** Stream tool function with optional `.bridge` metadata attached. */ +export type StreamToolFn< + Input extends Record = Record, + Output = any, +> = StreamToolCallFn & { + bridge?: ToolMetadata; +}; + /** * Recursive tool map — supports namespaced tools via nesting. * @@ -157,8 +200,10 @@ export type ToolMap = { [key: string]: | ToolCallFn | BatchToolCallFn + | StreamToolCallFn | ScalarToolFn | BatchToolFn + | StreamToolFn | ((...args: any[]) => any) | ToolMap; }; diff --git a/packages/bridge/test/bridge-format.test.ts b/packages/bridge/test/bridge-format.test.ts index 15cd9836..095ad496 100644 --- a/packages/bridge/test/bridge-format.test.ts +++ b/packages/bridge/test/bridge-format.test.ts @@ -767,6 +767,23 @@ sb.q <- i.query source: "auth.access_token", }); }); + + test("parses tool wire expressions", () => { + const result = parseBridge(`version 1.5 + +tool deepseekApi from httpCall { + with context as ctx + .headers.Authorization <- ctx.token ? "Bearer {ctx.token}" : "" + .timeoutMs <- ctx.baseTimeout + 250 +}`); + const tool = result.instructions.find( + (i): i is ToolDef => i.kind === "tool" && i.name === "deepseekApi", + )!; + assert.deepEqual( + tool.wires.map((wire) => wire.kind), + ["expr", "expr"], + ); + }); }); // ── Tool roundtrip ────────────────────────────────────────────────────────── @@ -824,6 +841,20 @@ sg.to <- i.to sg.content <- i.body o.messageId <- sg.id +}`; + const instructions = parseBridge(input); + assertDeepStrictEqualIgnoringLoc( + parseBridge(serializeBridge(instructions)), + instructions, + ); + }); + + test("tool wire expressions roundtrip", () => { + const input = `version 1.5 +tool deepseekApi from httpCall { + with context as ctx + .headers.Authorization <- ctx.token ? "Bearer {ctx.token}" : "" + .timeoutMs <- ctx.baseTimeout + 250 }`; const instructions = parseBridge(input); assertDeepStrictEqualIgnoringLoc( diff --git a/packages/bridge/test/execute-bridge.test.ts b/packages/bridge/test/execute-bridge.test.ts index a4b936e8..9ee53957 100644 --- a/packages/bridge/test/execute-bridge.test.ts +++ b/packages/bridge/test/execute-bridge.test.ts @@ -150,6 +150,43 @@ bridge Query.getUser { }); }); + describe("tool wire expressions", () => { + const bridgeText = `version 1.5 +tool deepseekApi from httpCall { + with context as ctx + .headers.Authorization <- ctx.token ? "Bearer {ctx.token}" : "" + .timeoutMs <- ctx.baseTimeout + 250 +} +bridge Query.demo { + with deepseekApi as api + with output as o + + o.auth <- api.headers.Authorization + o.timeoutMs <- api.timeoutMs +}`; + + test("tool defs evaluate ternary and arithmetic inputs", async () => { + let captured: any; + const tools = { + httpCall: async (input: any) => { + captured = input; + return input; + }, + }; + + const { data } = await run(bridgeText, "Query.demo", {}, tools, { + context: { token: "secret", baseTimeout: 750 }, + }); + + assert.equal(captured.headers.Authorization, "Bearer secret"); + assert.equal(captured.timeoutMs, 1000); + assert.deepEqual(data, { + auth: "Bearer secret", + timeoutMs: 1000, + }); + }); + }); + // ── Array output (o <- items[] as x { ... }) ──────────────────────────────── describe("array output", () => { @@ -930,6 +967,59 @@ bridge Query.echo { assert.ok(traces.length > 0); assert.ok(traces.some((t) => t.tool === "myTool")); }); + + test("internal concat helper does not emit trace entries", async () => { + const { data, traces } = await ctx.executeFn({ + document: parseBridge(`version 1.5 +bridge Query.echo { + with input as i + with output as o + + o.result <- "Hello, {i.name}!" +}`), + operation: "Query.echo", + input: { name: "World" }, + trace: "full", + }); + + assert.deepEqual(data, { result: "Hello, World!" }); + assert.deepEqual(traces, []); + }); + + test("stream tools emit trace entries when tracing is enabled", async () => { + async function* httpSSE(input: { q: string }) { + yield { chunk: `${input.q}-1` }; + yield { chunk: `${input.q}-2` }; + } + httpSSE.bridge = { stream: true } as const; + + const { data, traces } = await ctx.executeFn({ + document: parseBridge(`version 1.5 +bridge Query.echo { + with httpSSE as s + with input as i + with output as o + + s.q <- i.q + o.items <- s +}`), + operation: "Query.echo", + input: { q: "token" }, + tools: { httpSSE }, + trace: "full", + }); + + assert.deepEqual(data, { + items: [{ chunk: "token-1" }, { chunk: "token-2" }], + }); + assert.equal(traces.length, 1); + assert.equal(traces[0]?.tool, "httpSSE"); + assert.deepEqual(traces[0]?.input, { q: "token" }); + assert.deepEqual(traces[0]?.output, [ + { chunk: "token-1" }, + { chunk: "token-2" }, + ]); + }); }); // ── Error handling ────────────────────────────────────────────────────────── diff --git a/packages/bridge/test/parser-compat.test.ts b/packages/bridge/test/parser-compat.test.ts index 3dfdea1e..0e2ce136 100644 --- a/packages/bridge/test/parser-compat.test.ts +++ b/packages/bridge/test/parser-compat.test.ts @@ -176,6 +176,16 @@ tool myTool from std.httpCall { }`, ); + compat( + "tool wire expression with ternary and arithmetic", + `version 1.5 +tool myTool from std.httpCall { + with context as ctx + .headers.Authorization <- ctx.token ? "Bearer {ctx.token}" : "" + .timeoutMs <- ctx.baseTimeout + 250 +}`, + ); + compat( "null + error coalesce combined", `version 1.5 diff --git a/packages/bridge/test/runtime-error-format.test.ts b/packages/bridge/test/runtime-error-format.test.ts index 6572c569..439b0752 100644 --- a/packages/bridge/test/runtime-error-format.test.ts +++ b/packages/bridge/test/runtime-error-format.test.ts @@ -125,6 +125,34 @@ bridge Query.location { o.lon <- geo[0].lon }`; +const bridgeAliasHttpErrorText = `version 1.5 + +tool deepseekApi from std.httpCall { + .baseUrl = "https://api.deepseek.com" + .method = POST + .path = "/chat/completions" + .headers.Content-Type = "application/json" +} + +bridge Mutation.deepseekChat { + with deepseekApi as api + with input as i + with context as ctx + with output as o + + api.headers.Authorization <- "Bearer {ctx.DEEPSEEK_API_KEY}" + api.model = "deepseek-chat" + api.stream = false + api.messages <- i.messages + + alias api.choices as choices + + o <- choices[] as c { + .role <- c.message.role + .content <- c.message.content + } +}`; + function maxCaretCount(formatted: string): number { return Math.max( 0, @@ -431,4 +459,43 @@ describe("runtime error formatting", () => { }, ); }); + + test("simple alias pulls wrap async tool failures with the alias location", async () => { + const document = parseBridge(bridgeAliasHttpErrorText, { + filename: "playground.bridge", + }); + + await assert.rejects( + () => + executeBridge({ + document, + operation: "Mutation.deepseekChat", + input: { messages: [] }, + context: { DEEPSEEK_API_KEY: "secret" }, + tools: { + std: { + httpCall: async () => { + throw new SyntaxError( + `Unexpected token 'A', "Authentica"... is not valid JSON`, + ); + }, + }, + }, + }), + (err: unknown) => { + const formatted = formatBridgeError(err); + assert.match( + formatted, + /Bridge Execution Error: Unexpected token 'A', "Authentica"\.\.\. is not valid JSON/, + ); + assert.match(formatted, /playground\.bridge:21:/); + assert.match(formatted, /alias api\.choices as choices/); + assert.doesNotMatch( + formatted, + /Bridge Execution Error: Unexpected token 'A'.*\n$/s, + ); + return true; + }, + ); + }); }); diff --git a/packages/bridge/test/scope-and-edges.test.ts b/packages/bridge/test/scope-and-edges.test.ts index 964f92f9..1a919093 100644 --- a/packages/bridge/test/scope-and-edges.test.ts +++ b/packages/bridge/test/scope-and-edges.test.ts @@ -7,6 +7,7 @@ import { parsePath, serializeBridge, } from "../src/index.ts"; +import type { Bridge } from "../src/index.ts"; import { createGateway } from "./_gateway.ts"; // ═══════════════════════════════════════════════════════════════════════════ @@ -373,7 +374,7 @@ describe("array index in output path", () => { assert.deepStrictEqual(segments, ["results", "0", "lat"]); }); - test("explicit index on output LHS should either error at parse or work at runtime", () => { + test("explicit index on output LHS is allowed for dispatch syntax", () => { const bridgeText = `version 1.5 bridge Query.thing { with api as a @@ -385,30 +386,87 @@ o.items[0].name <- a.firstName }`; - // Currently: parses fine but wire path ["items","0","name"] never matches - // at runtime because response() strips indices from the GraphQL path. - // This is the silent-failure scenario — the worst option. - // - // Expected: either throw at parse time (Option A — preferred) - // or make it work at runtime (Option B). - let parsed = false; - let parseError: Error | undefined; - try { - parseBridge(bridgeText); - parsed = true; - } catch (e) { - parseError = e as Error; - } + // Numeric indices on the target side are now allowed to support + // dispatch syntax: `o[0] <- stream[] as chunk { ... }`. + // The parser produces path ["items","0","name"] on the target. + const doc = parseBridge(bridgeText); + const bridge = doc.instructions.find( + (i): i is Bridge => i.kind === "bridge", + )!; + const wire = bridge.wires.find( + (w) => + "from" in w && + w.to.path.length === 3 && + w.to.path[0] === "items" && + w.to.path[1] === "0" && + w.to.path[2] === "name", + ); + assert.ok(wire, "wire with numeric index in target path should exist"); + }); - if (parsed) { - assert.fail( - "KNOWN ISSUE: explicit index on output LHS parses but silently produces null at runtime. " + - "Parser should reject `o.items[0].name` — use array mapping blocks instead.", - ); - } else { - // Fixed: parser rejects explicit indices on the target side - assert.ok(parseError!.message.length > 0, "should give a useful error"); - } + test("computed index on root output array mapping is preserved", () => { + const bridgeText = `version 1.5 +bridge Query.thing { + with api as a + with output as o + + o[c.index] <- a.items[] as c { + .name <- c.name + } + +}`; + + const doc = parseBridge(bridgeText); + const bridge = doc.instructions.find( + (i): i is Bridge => i.kind === "bridge", + )!; + const wire = bridge.wires.find( + (w) => "from" in w && w.to.path.length === 0, + ); + assert.ok(wire, "root array mapping wire should exist"); + assert.deepEqual(wire.dispatchIndexRef, { + module: "_", + type: "Query", + field: "thing", + element: true, + path: ["index"], + }); + + const serialized = serializeBridge(doc); + assert.match(serialized, /o\[c\.index\] <- a\.items\[\] as c \{/); + }); + + test("computed index on nested output array mapping is preserved", () => { + const bridgeText = `version 1.5 +bridge Query.thing { + with api as a + with output as o + + o.messages[c.index] <- a.items[] as c { + .name <- c.name + } + +}`; + + const doc = parseBridge(bridgeText); + const bridge = doc.instructions.find( + (i): i is Bridge => i.kind === "bridge", + )!; + const wire = bridge.wires.find( + (w) => + "from" in w && w.to.path.length === 1 && w.to.path[0] === "messages", + ); + assert.ok(wire, "nested array mapping wire should exist"); + assert.deepEqual(wire.dispatchIndexRef, { + module: "_", + type: "Query", + field: "thing", + element: true, + path: ["index"], + }); + + const serialized = serializeBridge(doc); + assert.match(serialized, /o\.messages\[c\.index\] <- a\.items\[\] as c \{/); }); }); diff --git a/packages/bridge/test/stream.test.ts b/packages/bridge/test/stream.test.ts new file mode 100644 index 00000000..686d2e75 --- /dev/null +++ b/packages/bridge/test/stream.test.ts @@ -0,0 +1,1452 @@ +import assert from "node:assert/strict"; +import { test } from "node:test"; +import { + parseBridgeFormat as parseBridge, + executeBridgeStream, + isStreamHandle, + formatBridgeError, +} from "../src/index.ts"; +import type { + StreamPayload, + StreamInitialPayload, + StreamIncrementalPayload, +} from "../src/index.ts"; +import { forEachEngine } from "./_dual-run.ts"; + +// node:test describe is re-exported via forEachEngine; import for stream-only tests +import { describe } from "node:test"; + +// ── Helpers ────────────────────────────────────────────────────────────────── + +function parse(bridgeText: string) { + const raw = parseBridge(bridgeText); + return JSON.parse(JSON.stringify(raw)) as ReturnType; +} + +async function collectPayloads( + stream: AsyncGenerator, void, undefined>, +): Promise[]> { + const payloads: StreamPayload[] = []; + for await (const payload of stream) { + payloads.push(payload); + if (!payload.hasNext) break; + } + return payloads; +} + +// ── Stream tool factories ──────────────────────────────────────────────────── + +function createStreamTool(items: unknown[]) { + async function* streamTool() { + for (const item of items) { + yield item; + } + } + streamTool.bridge = { stream: true } as const; + return streamTool; +} + +// ══════════════════════════════════════════════════════════════════════════════ +// Tests +// ══════════════════════════════════════════════════════════════════════════════ + +describe("executeBridgeStream", () => { + const simpleBridge = `version 1.5 +bridge Query.search { + with searchApi as api + with input as i + with output as o + + api.query <- i.query + o.name <- api.name + o.items <- api.items +}`; + + describe("no stream tools — single payload", () => { + test("returns single payload with hasNext: false", async () => { + const tools = { + searchApi: async () => ({ + name: "Results", + items: [{ sku: "A" }, { sku: "B" }], + }), + }; + + const document = parse(simpleBridge); + const stream = executeBridgeStream({ + document, + operation: "Query.search", + input: { query: "shoes" }, + tools, + }); + + const payloads = await collectPayloads(stream); + assert.equal(payloads.length, 1); + + const first = payloads[0]! as StreamInitialPayload; + assert.equal("data" in first, true); + assert.equal(first.hasNext, false); + assert.deepEqual(first.data, { + name: "Results", + items: [{ sku: "A" }, { sku: "B" }], + }); + }); + }); + + describe("stream tool — direct output wiring", () => { + const streamBridge = `version 1.5 +bridge Query.products { + with productStream as ps + with staticInfo as info + with input as i + with output as o + + ps.query <- i.query + info.category <- i.category + o.title <- info.title + o.items <- ps +}`; + + test("yields initial data then incremental items", async () => { + const streamItems = [ + { sku: "PROD-001", name: "Widget" }, + { sku: "PROD-002", name: "Gadget" }, + { sku: "PROD-003", name: "Doohickey" }, + ]; + + const productStream = createStreamTool(streamItems); + const tools = { + productStream, + staticInfo: (input: { category: string }) => ({ + title: `${input.category} Products`, + }), + }; + + const document = parse(streamBridge); + const stream = executeBridgeStream({ + document, + operation: "Query.products", + input: { query: "all", category: "Electronics" }, + tools, + }); + + const payloads = await collectPayloads(stream); + + // First payload should have data with items: [] + const initial = payloads[0]! as StreamInitialPayload; + assert.equal("data" in initial, true); + assert.equal(initial.hasNext, true); + assert.deepEqual((initial.data as any).title, "Electronics Products"); + assert.deepEqual((initial.data as any).items, []); + + // Subsequent payloads should have incremental items + const incrementals = payloads.slice(1) as StreamIncrementalPayload[]; + assert.ok(incrementals.length > 0, "Should have incremental payloads"); + + // Collect all items from incremental payloads + const allItems: unknown[] = []; + for (const inc of incrementals) { + if (inc.incremental) { + for (const entry of inc.incremental) { + allItems.push(...entry.items); + } + } + } + assert.deepEqual(allItems, streamItems); + + // Last payload should have hasNext: false + const last = payloads[payloads.length - 1]!; + assert.equal(last.hasNext, false); + }); + + test("stream with single item", async () => { + const productStream = createStreamTool([{ sku: "ONLY-1" }]); + const tools = { + productStream, + staticInfo: () => ({ title: "One Item" }), + }; + + const document = parse(streamBridge); + const payloads = await collectPayloads( + executeBridgeStream({ + document, + operation: "Query.products", + input: { query: "one", category: "Test" }, + tools, + }), + ); + + const initial = payloads[0]! as StreamInitialPayload; + assert.deepEqual((initial.data as any).items, []); + assert.equal(initial.hasNext, true); + + // Should eventually get the single item + const allItems: unknown[] = []; + for (const p of payloads.slice(1)) { + if ("incremental" in p) { + for (const entry of p.incremental) { + allItems.push(...entry.items); + } + } + } + assert.deepEqual(allItems, [{ sku: "ONLY-1" }]); + }); + + test("empty stream yields only initial payload", async () => { + const productStream = createStreamTool([]); + const tools = { + productStream, + staticInfo: () => ({ title: "Empty" }), + }; + + const document = parse(streamBridge); + const payloads = await collectPayloads( + executeBridgeStream({ + document, + operation: "Query.products", + input: { query: "none", category: "Test" }, + tools, + }), + ); + + // With an empty stream generator, the handle is still detected, + // so initial has hasNext: true, then a final payload with hasNext: false + const initial = payloads[0]! as StreamInitialPayload; + assert.deepEqual((initial.data as any).items, []); + + const last = payloads[payloads.length - 1]!; + assert.equal(last.hasNext, false); + }); + }); + + describe("incremental payload format", () => { + const streamBridge = `version 1.5 +bridge Query.feed { + with feedStream as fs + with output as o + + o.entries <- fs +}`; + + test("incremental items have correct path and index", async () => { + const feedStream = createStreamTool(["a", "b", "c"]); + + const document = parse(streamBridge); + const payloads = await collectPayloads( + executeBridgeStream({ + document, + operation: "Query.feed", + input: {}, + tools: { feedStream }, + }), + ); + + const incrementals = payloads.filter( + (p): p is StreamIncrementalPayload => "incremental" in p, + ); + + // Each incremental item should reference path ["entries", index] + let index = 0; + for (const inc of incrementals) { + for (const entry of inc.incremental) { + assert.deepEqual(entry.path, ["entries", index]); + index++; + } + } + }); + }); + + describe("stream tool with input wiring", () => { + const bridgeText = `version 1.5 +bridge Query.aiResponse { + with aiStream as ai + with input as i + with output as o + + ai.prompt <- i.prompt + ai.model <- i.model + o.tokens <- ai +}`; + + test("stream tool receives wired input", async () => { + let receivedInput: any; + async function* aiStream(input: { prompt: string; model: string }) { + receivedInput = input; + yield { token: "Hello" }; + yield { token: " world" }; + } + aiStream.bridge = { stream: true } as const; + + const document = parse(bridgeText); + const payloads = await collectPayloads( + executeBridgeStream({ + document, + operation: "Query.aiResponse", + input: { prompt: "Hi", model: "gpt-4" }, + tools: { aiStream }, + }), + ); + + assert.deepEqual(receivedInput, { prompt: "Hi", model: "gpt-4" }); + + const allTokens: unknown[] = []; + for (const p of payloads) { + if ("incremental" in p) { + for (const entry of p.incremental) { + allTokens.push(...entry.items); + } + } + } + assert.deepEqual(allTokens, [{ token: "Hello" }, { token: " world" }]); + }); + }); + + describe("abort signal", () => { + test("stops stream iteration when aborted", async () => { + const controller = new AbortController(); + + async function* slowStream() { + yield { id: 1 }; + // Signal abort after first yield + controller.abort(); + yield { id: 2 }; + yield { id: 3 }; + } + slowStream.bridge = { stream: true } as const; + + describe("stream errors", () => { + test("stream tool errors propagate after the initial payload", async () => { + async function* failingStream() { + yield { chunk: "partial" }; + throw new Error("HTTP 401"); + } + failingStream.bridge = { stream: true } as const; + + const document = parse(`version 1.5 + bridge Query.items { + with api as a + with output as o + + o.items <- a + }`); + + const stream = executeBridgeStream({ + document, + operation: "Query.items", + tools: { api: failingStream }, + trace: "full", + }); + + const first = (await stream.next()).value as StreamInitialPayload<{ + items: Array<{ chunk: string }>; + }>; + assert.ok("data" in first); + assert.equal(first.hasNext, true); + + const second = await stream.next(); + assert.equal(second.done, false); + assert.ok("incremental" in second.value); + assert.deepStrictEqual(second.value.incremental, [ + { items: [{ chunk: "partial" }], path: ["items", 0] }, + ]); + + await assert.rejects(() => stream.next(), /HTTP 401/); + }); + + test("stream tool errors keep bridge location for rich formatting", async () => { + async function* failingStream() { + yield { chunk: "partial" }; + throw new Error("HTTP 401"); + } + failingStream.bridge = { stream: true } as const; + + const bridgeText = `version 1.5 +bridge Query.items { + with api as a + with output as o + + o.items <- a +}`; + const document = parse(bridgeText); + + const stream = executeBridgeStream({ + document, + operation: "Query.items", + tools: { api: failingStream }, + }); + + await stream.next(); + await stream.next(); + + let thrown: unknown; + try { + await stream.next(); + } catch (err) { + thrown = err; + } + + assert.ok(thrown instanceof Error); + const formatted = formatBridgeError(thrown); + assert.match(formatted, /Bridge Execution Error: HTTP 401/); + assert.match(formatted, /--> :\d+:3/); + assert.match(formatted, /o\.items <- a/); + assert.match(formatted, /\^+/); + }); + + test("tool-consumed streams keep the consuming tool wire location", async () => { + async function* failingStream() { + yield { + choices: [{ delta: { role: "assistant", content: "hi" } }], + }; + throw new Error("httpCallSSE: HTTP 401"); + } + failingStream.bridge = { stream: true } as const; + + async function* passthroughBuffer(input: { + _source: AsyncGenerator; + }) { + for await (const item of input._source) { + yield item; + } + } + passthroughBuffer.bridge = { stream: true } as const; + + const bridgeText = `version 1.5 +bridge Mutation.deepseekStream { + with api + with buf + with output as o + + buf <- api[] as chunk { + .role <- chunk.choices[0].delta.role + .content <- chunk.choices[0].delta.content + } + + o[0] <- buf +}`; + const document = parse(bridgeText); + + const stream = executeBridgeStream({ + document, + operation: "Mutation.deepseekStream", + tools: { + api: failingStream, + buf: passthroughBuffer, + }, + }); + + await stream.next(); + await stream.next(); + + let thrown: unknown; + try { + await stream.next(); + } catch (err) { + thrown = err; + } + + assert.ok(thrown instanceof Error); + const formatted = formatBridgeError(thrown); + assert.match( + formatted, + /Bridge Execution Error: httpCallSSE: HTTP 401/, + ); + assert.match(formatted, /buf <- api\[\] as chunk \{/); + assert.doesNotMatch(formatted, /o\[0\] <- buf/); + }); + }); + + const bridgeText = `version 1.5 +bridge Query.data { + with slowStream as s + with output as o + o.items <- s +}`; + + const document = parse(bridgeText); + const payloads = await collectPayloads( + executeBridgeStream({ + document, + operation: "Query.data", + input: {}, + tools: { slowStream }, + signal: controller.signal, + }), + ); + + // Should have initial + at most a few incrementals before abort + const initial = payloads[0]! as StreamInitialPayload; + assert.equal("data" in initial, true); + }); + }); + + describe("isStreamHandle", () => { + test("returns false for non-stream values", () => { + assert.equal(isStreamHandle(null), false); + assert.equal(isStreamHandle(undefined), false); + assert.equal(isStreamHandle(42), false); + assert.equal(isStreamHandle("hello"), false); + assert.equal(isStreamHandle({}), false); + assert.equal(isStreamHandle([]), false); + }); + }); + + describe("traces and executionTraceId", () => { + test("initial payload includes traces", async () => { + const bridgeText = `version 1.5 +bridge Query.simple { + with myTool as t + with output as o + o.value <- t +}`; + const myTool = () => 42; + const document = parse(bridgeText); + const payloads = await collectPayloads( + executeBridgeStream({ + document, + operation: "Query.simple", + input: {}, + tools: { myTool }, + }), + ); + + const initial = payloads[0]! as StreamInitialPayload; + assert.ok("traces" in initial); + assert.ok("executionTraceId" in initial); + }); + }); + + describe("array mapping on streamed events", () => { + test("nested field: maps streamed items through element wires", async () => { + const bridgeText = `version 1.5 +bridge Query.products { + with productStream as ps + with output as o + + o.items <- ps[] as item { + .name <- item.rawName + .sku <- item.rawSku + } +}`; + const streamItems = [ + { rawName: "Widget", rawSku: "W-001" }, + { rawName: "Gadget", rawSku: "G-002" }, + { rawName: "Doohickey", rawSku: "D-003" }, + ]; + const productStream = createStreamTool(streamItems); + + const document = parse(bridgeText); + const payloads = await collectPayloads( + executeBridgeStream({ + document, + operation: "Query.products", + input: {}, + tools: { productStream }, + }), + ); + + // Initial payload should have items: [] + const initial = payloads[0]! as StreamInitialPayload; + assert.equal("data" in initial, true); + assert.equal(initial.hasNext, true); + assert.deepEqual((initial.data as any).items, []); + + // Incremental payloads should have mapped items + const allItems: unknown[] = []; + for (const p of payloads.slice(1)) { + if ("incremental" in p) { + for (const entry of p.incremental) { + allItems.push(...entry.items); + } + } + } + + // Items should be transformed through array mapping + assert.deepEqual(allItems, [ + { name: "Widget", sku: "W-001" }, + { name: "Gadget", sku: "G-002" }, + { name: "Doohickey", sku: "D-003" }, + ]); + }); + + test("root-level: maps streamed items at output root", async () => { + const bridgeText = `version 1.5 +bridge Query.labels { + with labelStream as ls + with output as o + + o <- ls[] as item { + .label <- item.text + .id <- item.key + } +}`; + const streamItems = [ + { text: "Alpha", key: "a" }, + { text: "Beta", key: "b" }, + ]; + const labelStream = createStreamTool(streamItems); + + const document = parse(bridgeText); + const payloads = await collectPayloads( + executeBridgeStream({ + document, + operation: "Query.labels", + input: {}, + tools: { labelStream }, + }), + ); + + const initial = payloads[0]! as StreamInitialPayload; + assert.equal(initial.hasNext, true); + assert.deepEqual(initial.data, []); + + const allItems: unknown[] = []; + for (const p of payloads.slice(1)) { + if ("incremental" in p) { + for (const entry of p.incremental) { + allItems.push(...entry.items); + } + } + } + + assert.deepEqual(allItems, [ + { label: "Alpha", id: "a" }, + { label: "Beta", id: "b" }, + ]); + }); + + test("mixed: static fields alongside mapped stream", async () => { + const bridgeText = `version 1.5 +bridge Query.catalog { + with meta as m + with productStream as ps + with output as o + + o.title <- m.title + o.items <- ps[] as item { + .name <- item.rawName + } +}`; + const productStream = createStreamTool([ + { rawName: "Item A" }, + { rawName: "Item B" }, + ]); + const meta = () => ({ title: "My Catalog" }); + + const document = parse(bridgeText); + const payloads = await collectPayloads( + executeBridgeStream({ + document, + operation: "Query.catalog", + input: {}, + tools: { productStream, meta }, + }), + ); + + const initial = payloads[0]! as StreamInitialPayload; + assert.deepEqual((initial.data as any).title, "My Catalog"); + assert.deepEqual((initial.data as any).items, []); + assert.equal(initial.hasNext, true); + + const allItems: unknown[] = []; + for (const p of payloads.slice(1)) { + if ("incremental" in p) { + for (const entry of p.incremental) { + allItems.push(...entry.items); + } + } + } + + assert.deepEqual(allItems, [{ name: "Item A" }, { name: "Item B" }]); + }); + + test("empty stream with array mapping yields no incremental items", async () => { + const bridgeText = `version 1.5 +bridge Query.empty { + with emptyStream as es + with output as o + + o.items <- es[] as item { + .name <- item.rawName + } +}`; + const emptyStream = createStreamTool([]); + + const document = parse(bridgeText); + const payloads = await collectPayloads( + executeBridgeStream({ + document, + operation: "Query.empty", + input: {}, + tools: { emptyStream }, + }), + ); + + const initial = payloads[0]! as StreamInitialPayload; + assert.deepEqual((initial.data as any).items, []); + + const last = payloads[payloads.length - 1]!; + assert.equal(last.hasNext, false); + + // No actual items should be delivered incrementally + const allItems: unknown[] = []; + for (const p of payloads) { + if ("incremental" in p) { + for (const entry of p.incremental) { + allItems.push(...entry.items); + } + } + } + assert.deepEqual(allItems, []); + }); + + test("subtool inside array mapping on stream", async () => { + const bridgeText = `version 1.5 +bridge Query.catalog { + with meta as m + with productStream as ps + with output as o + + o.title <- m.title + o.items <- ps[] as item { + with asynctool as t + t.in <- item.rawName + .name <- t.rawValue + } +}`; + const productStream = createStreamTool([ + { rawName: "Widget" }, + { rawName: "Gadget" }, + ]); + const meta = () => ({ title: "My Catalog" }); + const asynctool = async (input: { in: string }) => ({ + rawValue: input.in.toUpperCase(), + }); + + const document = parse(bridgeText); + const payloads = await collectPayloads( + executeBridgeStream({ + document, + operation: "Query.catalog", + input: {}, + tools: { productStream, meta, asynctool }, + }), + ); + + const initial = payloads[0]! as StreamInitialPayload; + assert.deepEqual((initial.data as any).title, "My Catalog"); + assert.deepEqual((initial.data as any).items, []); + assert.equal(initial.hasNext, true); + + const allItems: unknown[] = []; + for (const p of payloads.slice(1)) { + if ("incremental" in p) { + for (const entry of p.incremental) { + allItems.push(...entry.items); + } + } + } + + assert.deepEqual(allItems, [{ name: "WIDGET" }, { name: "GADGET" }]); + }); + + test("computed dispatch index emits patches at item-provided positions", async () => { + const bridgeText = `version 1.5 +bridge Query.chat { + with chunkStream as s + with output as o + + o[c.index] <- s[] as c { + .role <- c.role + .content <- c.content + } +}`; + const chunkStream = createStreamTool([ + { index: 1, role: "assistant", content: "second" }, + { index: 0, role: "assistant", content: "first" }, + ]); + + const document = parse(bridgeText); + const payloads = await collectPayloads( + executeBridgeStream({ + document, + operation: "Query.chat", + input: {}, + tools: { chunkStream }, + }), + ); + + const initial = payloads[0]! as StreamInitialPayload; + assert.deepEqual(initial.data, []); + + const incrementals = payloads + .slice(1) + .filter((p): p is StreamIncrementalPayload => "incremental" in p); + assert.deepEqual( + incrementals.flatMap((p) => p.incremental), + [ + { + items: [{ role: "assistant", content: "second" }], + path: [1], + }, + { + items: [{ role: "assistant", content: "first" }], + path: [0], + }, + ], + ); + }); + + test("nested computed dispatch index emits patches at item-provided positions", async () => { + const bridgeText = `version 1.5 +bridge Query.chat { + with chunkStream as s + with output as o + + o.messages[c.index] <- s[] as c { + .role <- c.role + .content <- c.content + } +}`; + const chunkStream = createStreamTool([ + { index: 1, role: "assistant", content: "second" }, + { index: 0, role: "assistant", content: "first" }, + ]); + + const document = parse(bridgeText); + const payloads = await collectPayloads( + executeBridgeStream({ + document, + operation: "Query.chat", + input: {}, + tools: { chunkStream }, + }), + ); + + const initial = payloads[0]! as StreamInitialPayload; + assert.deepEqual(initial.data, { messages: [] }); + + const incrementals = payloads + .slice(1) + .filter((p): p is StreamIncrementalPayload => "incremental" in p); + assert.deepEqual( + incrementals.flatMap((p) => p.incremental), + [ + { + items: [{ role: "assistant", content: "second" }], + path: ["messages", 1], + }, + { + items: [{ role: "assistant", content: "first" }], + path: ["messages", 0], + }, + ], + ); + }); + }); +}); + +// ══════════════════════════════════════════════════════════════════════════════ +// Dual-engine tests (runtime + compiled) +// ══════════════════════════════════════════════════════════════════════════════ + +forEachEngine("stream tools — eager consumption", (run) => { + test("explicit root index materializes as a flat array", async () => { + const single = () => ({ role: "assistant", content: "hi" }); + const { data } = await run( + `version 1.5 +bridge Query.chat { + with single as s + with output as o + + o[0] <- s +}`, + "Query.chat", + {}, + { single }, + ); + assert.deepEqual(data, [{ role: "assistant", content: "hi" }]); + }); + + test("direct stream output consumed into array", async () => { + const itemStream = createStreamTool([{ id: 1 }, { id: 2 }, { id: 3 }]); + const { data } = await run( + `version 1.5 +bridge Query.items { + with itemStream as s + with input as i + with output as o + + s.query <- i.query + o.items <- s +}`, + "Query.items", + { query: "test" }, + { itemStream }, + ); + assert.deepEqual(data, { items: [{ id: 1 }, { id: 2 }, { id: 3 }] }); + }); + + test("array mapping on stream items", async () => { + const productStream = createStreamTool([ + { rawName: "Widget" }, + { rawName: "Gadget" }, + ]); + const { data } = await run( + `version 1.5 +bridge Query.products { + with productStream as ps + with output as o + + o.items <- ps[] as item { + .name <- item.rawName + } +}`, + "Query.products", + {}, + { productStream }, + ); + assert.deepEqual(data, { + items: [{ name: "Widget" }, { name: "Gadget" }], + }); + }); + + test("computed dispatch index materializes stream output by explicit slot", async () => { + const chunkStream = createStreamTool([ + { index: 1, role: "assistant", content: "second" }, + { index: 0, role: "assistant", content: "first" }, + ]); + const { data } = await run( + `version 1.5 +bridge Query.chat { + with chunkStream as s + with output as o + + o[c.index] <- s[] as c { + .role <- c.role + .content <- c.content + } +}`, + "Query.chat", + {}, + { chunkStream }, + ); + assert.deepEqual(data, [ + { role: "assistant", content: "first" }, + { role: "assistant", content: "second" }, + ]); + }); + + test("nested computed dispatch index materializes stream output by explicit slot", async () => { + const chunkStream = createStreamTool([ + { index: 1, role: "assistant", content: "second" }, + { index: 0, role: "assistant", content: "first" }, + ]); + const { data } = await run( + `version 1.5 +bridge Query.chat { + with chunkStream as s + with output as o + + o.messages[c.index] <- s[] as c { + .role <- c.role + .content <- c.content + } +}`, + "Query.chat", + {}, + { chunkStream }, + ); + assert.deepEqual(data, { + messages: [ + { role: "assistant", content: "first" }, + { role: "assistant", content: "second" }, + ], + }); + }); + + test("subtool inside array mapping on stream", async () => { + const productStream = createStreamTool([ + { rawName: "Widget" }, + { rawName: "Gadget" }, + ]); + const meta = () => ({ title: "My Catalog" }); + const asynctool = async (input: { in: string }) => ({ + rawValue: input.in.toUpperCase(), + }); + const { data } = await run( + `version 1.5 +bridge Query.catalog { + with meta as m + with productStream as ps + with output as o + + o.title <- m.title + o.items <- ps[] as item { + with asynctool as t + t.in <- item.rawName + .name <- t.rawValue + } +}`, + "Query.catalog", + {}, + { productStream, meta, asynctool }, + ); + assert.deepEqual(data, { + title: "My Catalog", + items: [{ name: "WIDGET" }, { name: "GADGET" }], + }); + }); +}); + +// ══════════════════════════════════════════════════════════════════════════════ +// Dispatch index (o[0]) + std.accumulate tests +// ══════════════════════════════════════════════════════════════════════════════ + +describe("dispatch index with accumulation", () => { + test("o[0] with std.accumulate accumulates and dispatches to fixed index", async () => { + // Simulates SSE streaming: each chunk has a delta with partial content + const chunks = [ + { choices: [{ delta: { role: "assistant", content: "" } }] }, + { choices: [{ delta: { content: "Hello" } }] }, + { choices: [{ delta: { content: " world" } }] }, + ]; + const streamTool = createStreamTool(chunks); + + const doc = parse(`version 1.5 + +tool buf from std.accumulate {} + +bridge Query.chat { + with chatApi as api + with buf + with output as o + + buf <- api[] as chunk { + .role <- chunk.choices[0].delta.role + .content <- chunk.choices[0].delta.content + } + + o[0] <- buf +}`); + + const payloads = await collectPayloads( + executeBridgeStream({ + document: doc, + operation: "Query.chat", + tools: { chatApi: streamTool }, + }), + ); + + // Initial payload: stream field initialised to [] + const initial = payloads[0] as StreamInitialPayload; + assert.ok("data" in initial); + assert.deepStrictEqual(initial.data, []); + + // Incremental payloads with actual items + const incrementals = payloads.filter( + (p): p is StreamIncrementalPayload => + "incremental" in p && p.incremental.length > 0, + ); + + // All incremental items target path [0] (fixed index 0) + for (const inc of incrementals) { + for (const item of inc.incremental) { + assert.deepStrictEqual( + item.path, + [0], + "dispatch should target fixed index 0", + ); + } + } + + // Final accumulated state should have merged role + concatenated content + const lastInc = incrementals[incrementals.length - 1]!; + const lastItem = lastInc.incremental[lastInc.incremental.length - 1]!; + assert.deepStrictEqual(lastItem.items[0], { + role: "assistant", + content: "Hello world", + }); + }); + + test("o[0] yields executionTraceId on incremental payloads", async () => { + const streamTool = createStreamTool([{ val: 1 }, { val: 2 }]); + + const doc = parse(`version 1.5 + +tool buf from std.accumulate {} + +bridge Query.items { + with api as a + with buf + with output as o + + buf <- a[] as x { + .val <- x.val + } + + o[0] <- buf +}`); + + const payloads = await collectPayloads( + executeBridgeStream({ + document: doc, + operation: "Query.items", + tools: { api: streamTool }, + trace: "full", + }), + ); + + // Initial has executionTraceId + const initial = payloads[0] as StreamInitialPayload; + assert.ok(initial.executionTraceId != null, "initial should have trace id"); + + // Incremental payloads also have executionTraceId + const incrementals = payloads.filter( + (p): p is StreamIncrementalPayload => "incremental" in p, + ); + for (const inc of incrementals) { + assert.ok( + inc.executionTraceId != null, + "incremental should have trace id", + ); + } + }); + + test("stream tools add trace entries to the initial payload", async () => { + const streamTool = createStreamTool([{ val: 1 }, { val: 2 }]); + + const doc = parse(`version 1.5 + +bridge Query.items { + with api as a + with output as o + + o.items <- a +}`); + + const payloads = await collectPayloads( + executeBridgeStream({ + document: doc, + operation: "Query.items", + tools: { api: streamTool }, + trace: "full", + }), + ); + + const initial = payloads[0] as StreamInitialPayload<{ + items: Array<{ val: number }>; + }>; + assert.ok("data" in initial); + assert.equal(initial.hasNext, true); + assert.equal(initial.traces?.length, 1); + assert.equal(initial.traces?.[0]?.tool, "api"); + assert.deepEqual(initial.traces?.[0]?.input, {}); + assert.deepEqual(initial.traces?.[0]?.output, [{ val: 1 }, { val: 2 }]); + }); + + test("mapping before accumulation with element wires", async () => { + // Map SSE deltas through element wires before feeding into accumulator. + const chunks = [ + { choices: [{ delta: { role: "assistant", content: "" } }] }, + { choices: [{ delta: { content: "Hi" } }] }, + { choices: [{ delta: { content: "!" } }] }, + ]; + const streamTool = createStreamTool(chunks); + + const doc = parse(`version 1.5 + +tool buf from std.accumulate {} + +bridge Query.chat { + with chatApi as api + with buf + with output as o + + buf <- api[] as chunk { + .role <- chunk.choices[0].delta.role + .content <- chunk.choices[0].delta.content + } + + o[0] <- buf[] as a { + .role <- a.role + .content <- a.content + } +}`); + + const payloads = await collectPayloads( + executeBridgeStream({ + document: doc, + operation: "Query.chat", + tools: { chatApi: streamTool }, + }), + ); + + const incrementals = payloads.filter( + (p): p is StreamIncrementalPayload => + "incremental" in p && p.incremental.length > 0, + ); + + const lastInc = incrementals[incrementals.length - 1]!; + const lastItem = lastInc.incremental[lastInc.incremental.length - 1]!; + assert.deepStrictEqual(lastItem.items[0], { + role: "assistant", + content: "Hi!", + }); + }); + + test("interval throttles emissions, final state always emitted", async () => { + // All items yield synchronously within the same tick. + // With a large interval, only the first + final should be emitted. + const chunks = [{ a: "1" }, { b: "2" }, { c: "3" }, { d: "4" }, { e: "5" }]; + const streamTool = createStreamTool(chunks); + + const doc = parse(`version 1.5 + +tool buf from std.accumulate { + .interval = 1000 +} + +bridge Query.items { + with src as s + with buf + with output as o + + buf <- s[] as x { + .a <- x.a + .b <- x.b + .c <- x.c + .d <- x.d + .e <- x.e + } + + o[0] <- buf +}`); + + const payloads = await collectPayloads( + executeBridgeStream({ + document: doc, + operation: "Query.items", + tools: { src: streamTool }, + }), + ); + + const incrementals = payloads.filter( + (p): p is StreamIncrementalPayload => + "incremental" in p && p.incremental.length > 0, + ); + + // Large interval + synchronous source → first item emits (Date.now() ≫ 0), + // then remaining items are batched. Final state always emitted. + // Expect exactly 2 incremental payloads (first + final). + assert.ok( + incrementals.length <= 2, + `expected at most 2 incremental payloads, got ${incrementals.length}`, + ); + + // Final accumulated state has all keys merged + const lastInc = incrementals[incrementals.length - 1]!; + const lastItem = lastInc.incremental[lastInc.incremental.length - 1]!; + assert.deepStrictEqual(lastItem.items[0], { + a: "1", + b: "2", + c: "3", + d: "4", + e: "5", + }); + }); + + test("o <- buf[] as s maps accumulated stream to auto-indexed array", async () => { + const chunks = [ + { choices: [{ delta: { role: "assistant", content: "" } }] }, + { choices: [{ delta: { content: "Hi" } }] }, + { choices: [{ delta: { content: "!" } }] }, + ]; + const streamTool = createStreamTool(chunks); + + const doc = parse(`version 1.5 + +tool buf from std.accumulate {} + +bridge Query.chat { + with chatApi as api + with buf + with output as o + + buf <- api[] as chunk { + .role <- chunk.choices[0].delta.role + .content <- chunk.choices[0].delta.content + } + + o <- buf[] as s { + .r <- s.role + .c <- s.content + } +}`); + + const payloads = await collectPayloads( + executeBridgeStream({ + document: doc, + operation: "Query.chat", + tools: { chatApi: streamTool }, + }), + ); + + // Initial payload: stream field initialised to [] + const initial = payloads[0] as StreamInitialPayload; + assert.ok("data" in initial); + assert.deepStrictEqual(initial.data, []); + + // Incremental payloads with auto-incrementing indices + const incrementals = payloads.filter( + (p): p is StreamIncrementalPayload => + "incremental" in p && p.incremental.length > 0, + ); + + assert.ok(incrementals.length > 0, "should have incremental payloads"); + + // Each incremental should have an auto-incrementing index + const indices = incrementals.flatMap((inc) => + inc.incremental.map( + (item: StreamIncrementalPayload["incremental"][number]) => + item.path[item.path.length - 1], + ), + ); + // Auto-indexed: 0, 1, 2, ... + for (let i = 0; i < indices.length; i++) { + assert.strictEqual(indices[i], i, "should auto-increment index"); + } + + // Last payload should contain mapped accumulated state + const lastInc = incrementals[incrementals.length - 1]!; + const lastItem = lastInc.incremental[lastInc.incremental.length - 1]!; + assert.deepStrictEqual(lastItem.items[0], { + r: "assistant", + c: "Hi!", + }); + }); + + test("o[0] <- buf[] as s maps accumulated stream with dispatch", async () => { + const chunks = [ + { choices: [{ delta: { role: "assistant", content: "" } }] }, + { choices: [{ delta: { content: "Hi" } }] }, + { choices: [{ delta: { content: "!" } }] }, + ]; + const streamTool = createStreamTool(chunks); + + const doc = parse(`version 1.5 + +tool buf from std.accumulate {} + +bridge Query.chat { + with chatApi as api + with buf + with output as o + + buf <- api[] as chunk { + .role <- chunk.choices[0].delta.role + .content <- chunk.choices[0].delta.content + } + + o[0] <- buf[] as s { + .r <- s.role + .c <- s.content + } +}`); + + const payloads = await collectPayloads( + executeBridgeStream({ + document: doc, + operation: "Query.chat", + tools: { chatApi: streamTool }, + }), + ); + + // Incremental payloads with dispatch index + const incrementals = payloads.filter( + (p): p is StreamIncrementalPayload => + "incremental" in p && p.incremental.length > 0, + ); + + assert.ok(incrementals.length > 0, "should have incremental payloads"); + + // All items should target fixed index 0 (dispatch mode) + for (const inc of incrementals) { + for (const item of inc.incremental) { + assert.deepStrictEqual( + item.path, + [0], + "dispatch should target fixed index 0", + ); + } + } + + // Last payload should contain mapped accumulated state + const lastInc = incrementals[incrementals.length - 1]!; + const lastItem = lastInc.incremental[lastInc.incremental.length - 1]!; + assert.deepStrictEqual(lastItem.items[0], { + r: "assistant", + c: "Hi!", + }); + }); + + test("o[c.index] <- buf[] as c maps accumulated stream with computed dispatch", async () => { + const chunks = [ + { choices: [{ delta: { role: "assistant", content: "" } }] }, + { choices: [{ delta: { content: "Hi" } }] }, + { choices: [{ delta: { content: "!" } }] }, + ]; + const streamTool = createStreamTool(chunks); + + const doc = parse(`version 1.5 + +tool buf from std.accumulate {} + +bridge Query.chat { + with chatApi as api + with buf + with output as o + + buf <- api[] as chunk { + .role <- chunk.choices[0].delta.role + .content <- chunk.choices[0].delta.content + .index = 2 + } + + o[c.index] <- buf[] as c { + .role <- c.role + .content <- c.content + } +}`); + + const payloads = await collectPayloads( + executeBridgeStream({ + document: doc, + operation: "Query.chat", + tools: { chatApi: streamTool }, + }), + ); + + const initial = payloads[0] as StreamInitialPayload; + assert.ok("data" in initial); + assert.deepStrictEqual(initial.data, []); + + const incrementals = payloads.filter( + (p): p is StreamIncrementalPayload => + "incremental" in p && p.incremental.length > 0, + ); + + assert.ok(incrementals.length > 0, "should have incremental payloads"); + for (const inc of incrementals) { + for (const item of inc.incremental) { + assert.deepStrictEqual(item.path, [2]); + } + } + + const lastInc = incrementals[incrementals.length - 1]!; + const lastItem = lastInc.incremental[lastInc.incremental.length - 1]!; + assert.deepStrictEqual(lastItem.items[0], { + role: "assistant", + content: "Hi!", + }); + }); +}); diff --git a/packages/docs-site/worker-configuration.d.ts b/packages/docs-site/worker-configuration.d.ts index a271d040..2113d00f 100644 --- a/packages/docs-site/worker-configuration.d.ts +++ b/packages/docs-site/worker-configuration.d.ts @@ -1,7 +1,10 @@ /* eslint-disable */ -// Generated by Wrangler by running `wrangler types` (hash: 3687270ff097b92829087e49bb8b5282) +// Generated by Wrangler by running `wrangler types` (hash: e41227086db6ad8bb19b68d77b165868) // Runtime types generated with workerd@1.20260301.1 2026-02-24 global_fetch_strictly_public,nodejs_compat declare namespace Cloudflare { + interface GlobalProps { + mainModule: typeof import("./dist/_worker.js/index"); + } interface Env { SHARES: KVNamespace; ASSETS: Fetcher; diff --git a/packages/playground/src/engine.ts b/packages/playground/src/engine.ts index db386ee4..14e1d6f9 100644 --- a/packages/playground/src/engine.ts +++ b/packages/playground/src/engine.ts @@ -8,11 +8,13 @@ import { parseBridgeChevrotain, parseBridgeDiagnostics, executeBridge, + executeBridgeStream, formatBridgeError, prettyPrintToSource, buildTraversalManifest, decodeExecutionTrace, } from "@stackables/bridge"; +import type { StreamIncrementalItem } from "@stackables/bridge"; export { prettyPrintToSource }; import type { BridgeDiagnostic, @@ -28,6 +30,7 @@ import { std, getBridgeTraces, createHttpCall, + createHttpCallSSE, } from "@stackables/bridge"; // ── Playground HTTP cache: module-level, clearable from the UI ──────────────── @@ -62,6 +65,8 @@ const playgroundHttpCall = createHttpCall( playgroundHttpCache, ); +const playgroundHttpCallSSE = createHttpCallSSE(globalThis.fetch); + /** Flush all cached HTTP responses in the playground. */ export function clearHttpCache(): void { _httpCacheMap.clear(); @@ -188,7 +193,11 @@ export async function runBridge( try { transformedSchema = bridgeTransform(schema, instructions, { tools: { - std: { ...std, httpCall: playgroundHttpCall }, + std: { + ...std, + httpCall: playgroundHttpCall, + httpCallSSE: playgroundHttpCallSSE, + }, }, trace: "full", logger: collectingLogger, @@ -654,7 +663,13 @@ export async function runBridgeStandalone( document, operation, input, - tools: { std: { ...std, httpCall: playgroundHttpCall } }, + tools: { + std: { + ...std, + httpCall: playgroundHttpCall, + httpCallSSE: playgroundHttpCallSSE, + }, + }, context, trace: "full", logger: collectingLogger, @@ -686,6 +701,236 @@ export async function runBridgeStandalone( } } +// ── Streaming standalone execution ────────────────────────────────────────── + +/** + * Apply incremental stream patches to a mutable data tree. + * + * Each patch has `items` (values to insert) and `path` (JSON pointer + * where the last segment is the array index). We navigate to the + * parent array and set items at the target index. This works for both + * append mode (sequential indices) and dispatch mode (fixed index 0). + */ +function applyIncrementalPatches( + data: unknown, + patches: StreamIncrementalItem[], +): void { + for (const patch of patches) { + // The path includes the index as the last segment. + // Navigate to the parent container (the array). + const parentPath = patch.path.slice(0, -1); + let target: any = data; + for (const seg of parentPath) { + if (target == null) break; + target = target[seg]; + } + if (Array.isArray(target)) { + const index = patch.path[patch.path.length - 1] as number; + for (let i = 0; i < patch.items.length; i++) { + target[index + i] = patch.items[i]; + } + } + } +} + +/** + * Execute a bridge operation with incremental streaming. + * + * Uses `executeBridgeStream` under the hood. Calls `onPartial` after + * each incremental payload so the UI can show progressive results. + * Falls back to `runBridgeStandalone` semantics when no stream tools + * are present (single payload). + */ +export async function runBridgeStreamStandalone( + bridgeText: string, + operation: string, + inputJson = "{}", + requestedFields = "", + contextJson = "{}", + onPartial?: (result: RunResult) => void, +): Promise { + // 1. Parse Bridge DSL + let document; + try { + const result = parseBridgeDiagnostics(bridgeText, { + filename: "playground.bridge", + }); + document = result.document; + } catch (err: unknown) { + return { + errors: [ + `Bridge parse error: ${err instanceof Error ? err.message : String(err)}`, + ], + }; + } + + // 2. Parse input JSON + let input: Record; + try { + input = inputJson.trim() + ? (JSON.parse(inputJson) as Record) + : {}; + } catch (err: unknown) { + return { + errors: [ + `Input JSON error: ${err instanceof Error ? err.message : String(err)}`, + ], + }; + } + + // 3. Parse context JSON + let context: Record; + try { + context = contextJson.trim() + ? (JSON.parse(contextJson) as Record) + : {}; + } catch (err: unknown) { + return { + errors: [ + `Context JSON error: ${err instanceof Error ? err.message : String(err)}`, + ], + }; + } + + // 4. Parse requested fields + const fields = requestedFields + .split(",") + .map((f) => f.trim()) + .filter(Boolean); + + // 5. Build logger + const logs: LogEntry[] = []; + + function formatLog(args: unknown[]): string { + if (args.length === 0) return ""; + const fmt = String(args[0]); + let i = 1; + const msg = fmt.replace(/%[sdioOjf%]/g, (token) => { + if (token === "%%") return "%"; + if (i >= args.length) return token; + const val = args[i++]; + switch (token) { + case "%d": + case "%i": + case "%f": + return String(Number(val)); + case "%o": + case "%O": + case "%j": + try { + return JSON.stringify(val); + } catch { + return String(val); + } + default: + return String(val); + } + }); + const rest = args.slice(i).map(String); + return rest.length > 0 ? `${msg} ${rest.join(" ")}` : msg; + } + + const collectingLogger: Logger = { + debug: (...args: unknown[]) => + logs.push({ level: "debug", message: formatLog(args) }), + info: (...args: unknown[]) => + logs.push({ level: "info", message: formatLog(args) }), + warn: (...args: unknown[]) => + logs.push({ level: "warn", message: formatLog(args) }), + error: (...args: unknown[]) => + logs.push({ level: "error", message: formatLog(args) }), + }; + + // 6. Stream execution + _onCacheHit = (key: string) => { + try { + const url = new URL(key); + logs.push({ + level: "info", + message: `⚡ cache hit: ${url.pathname}${url.search}`, + }); + } catch { + logs.push({ level: "info", message: `⚡ cache hit: ${key}` }); + } + }; + try { + const stream = executeBridgeStream({ + document, + operation, + input, + tools: { + std: { + ...std, + httpCall: playgroundHttpCall, + httpCallSSE: playgroundHttpCallSSE, + }, + }, + context, + trace: "full", + logger: collectingLogger, + ...(fields.length > 0 ? { requestedFields: fields } : {}), + }); + + let data: unknown; + let traces: ToolTrace[] | undefined; + let executionTraceId: bigint | undefined; + + for await (const payload of stream) { + if ("data" in payload) { + // Initial payload + data = payload.data; + traces = payload.traces; + executionTraceId = payload.executionTraceId; + } else { + // Incremental payload — apply patches in-place + applyIncrementalPatches(data, payload.incremental); + if (payload.executionTraceId != null) { + executionTraceId = payload.executionTraceId; + } + } + + const partial: RunResult = { + data, + traces: traces && traces.length > 0 ? traces : undefined, + logs: logs.length > 0 ? [...logs] : undefined, + executionTraceId, + }; + + if (payload.hasNext && onPartial) { + onPartial(partial); + } + + if (!payload.hasNext) { + return partial; + } + } + + // Should not reach here, but just in case + return { + data, + traces: traces && traces.length > 0 ? traces : undefined, + logs: logs.length > 0 ? logs : undefined, + executionTraceId, + }; + } catch (err: unknown) { + const trace = + err && typeof err === "object" && "executionTraceId" in err + ? (err as { executionTraceId?: bigint }).executionTraceId + : undefined; + return { + errors: [ + formatBridgeError(err, { + source: document.source, + filename: document.filename, + }), + ], + ...(trace != null ? { executionTraceId: trace } : {}), + }; + } finally { + _onCacheHit = null; + } +} + // ── Traversal manifest helpers ────────────────────────────────────────────── export type { TraversalEntry }; diff --git a/packages/playground/src/examples.ts b/packages/playground/src/examples.ts index 27d04c38..8325566a 100644 --- a/packages/playground/src/examples.ts +++ b/packages/playground/src/examples.ts @@ -1510,4 +1510,193 @@ bridge Query.enrichedUsers { "userIds": [1, 2, 999] }`, }, + { + id: "deepseek-sync", + name: "Deepseek Sync", + description: + "Integrate with the Deepseek API to power a synchronous chatbot that accepts a message history and returns a response", + schema: `scalar JSONObejct + +type Query { + _: Boolean +} + +type Mutation { + deepseekChat(messages: JSONObejct): [Message] +} + +type Message { + role: String! + content: String! +}`, + bridge: `version 1.5 + +# 1. Define the reusable HTTP tool +tool deepseekApi from std.httpCall { + .baseUrl = "https://api.deepseek.com" + .method = POST + .path = "/chat/completions" + .headers.Content-Type = "application/json" +} + +# 2. Define the GraphQL endpoint / Bridge operation +bridge Mutation.deepseekChat { + with deepseekApi as api + with input as i + with context as ctx + with output as o + + # Securely pass the API key from context (so it isn't logged in the input) + api.headers.Authorization <- "Bearer {ctx.DEEPSEEK_API_KEY}" + + # Construct the JSON body payload + api.model = "deepseek-chat" + api.stream = false + + # Build the messages array dynamically using the user's prompt + api.messages <- i.messages + + # Map the response directly to the output object + o <- api.choices[] as c { + .role <- c.message.role + .content <- c.message.content + } +}`, + queries: [ + { + name: "Deepseek Chat", + query: `mutation { + deepseekChat(messages: [ + { + role: "system", + content: "You are a helpful assistant." + }, + { + role: "user", + content: "Tell me a joke" + } + ]) { + content + role + } +}`, + }, + ], + standaloneQueries: [ + { + operation: "Mutation.deepseekChat", + outputFields: "", + input: { + messages: [ + { + role: "system", + content: "You are a helpful assistant.", + }, + { + role: "user", + content: "Tell me a joke", + }, + ], + }, + }, + ], + context: `{ "DEEPSEEK_API_KEY": "" }`, + }, + { + id: "deepseek-stream", + name: "Deepseek Stream", + mode: "standalone", + description: + "Stream a Deepseek chat completion via SSE — tokens arrive incrementally using std.httpCallSSE", + schema: `scalar JSONObject + +type Query { + _: Boolean +} + +type Mutation { + deepseekStream(messages: JSONObject): [StreamChunk] +} + +type StreamChunk { + content: String + role: String +}`, + bridge: `version 1.5 + +# SSE stream tool — each Server-Sent Event becomes one yielded item +tool deepseekApi from std.httpCallSSE { + .baseUrl = "https://api.deepseek.com" + .method = POST + .path = "/chat/completions" + .headers.Content-Type = "application/json" +} + +# Accumulator — deep-merges SSE deltas into a single state. +# Throttled to emit at most once per 100ms to avoid flooding the client. +tool buf from std.accumulate { + .interval = 100 +} + +bridge Mutation.deepseekStream { + with deepseekApi as api + with buf + with input as i + with context as ctx + with output as o + + api.headers.Authorization <- "Bearer {ctx.DEEPSEEK_API_KEY}" + + api.model = "deepseek-chat" + api.stream = true + api.messages <- i.messages + + # Map SSE deltas before accumulation, dispatch to index 0 + buf <- api[] as chunk { + .role <- chunk.choices[0].delta.role + .content <- chunk.choices[0].delta.content + } + + o[0] <- buf +}`, + queries: [ + { + name: "Deepseek Stream", + query: `mutation { + deepseekStream(messages: [ + { + role: "system", + content: "You are a helpful assistant." + }, + { + role: "user", + content: "Tell me a joke" + } + ]) { + content + role + } +}`, + }, + ], + standaloneQueries: [ + { + operation: "Mutation.deepseekStream", + outputFields: "", + input: { + messages: [ + { + role: "system", + content: "You are a helpful assistant.", + }, + { + role: "user", + content: "Tell me a joke", + }, + ], + }, + }, + ], + context: `{ "DEEPSEEK_API_KEY": "" }`, + }, ]; diff --git a/packages/playground/src/usePlaygroundState.ts b/packages/playground/src/usePlaygroundState.ts index 5d15d83c..f3d28156 100644 --- a/packages/playground/src/usePlaygroundState.ts +++ b/packages/playground/src/usePlaygroundState.ts @@ -3,7 +3,7 @@ import { examples } from "./examples"; import type { QueryTab } from "./Playground"; import { runBridge, - runBridgeStandalone, + runBridgeStreamStandalone, getDiagnostics, extractBridgeOperations, extractOutputFields, @@ -244,12 +244,13 @@ export function usePlaygroundState( try { let r: RunResult; if (mode === "standalone") { - r = await runBridgeStandalone( + r = await runBridgeStreamStandalone( bridge, activeQuery.operation ?? "", activeQuery.inputJson ?? "{}", activeQuery.outputFields ?? "", context, + (partial) => setResults((prev) => ({ ...prev, [qId]: partial })), ); } else { r = await runBridge(schema, bridge, activeQuery.query, {}, context);