diff --git a/.changeset/prompt-processing-indicators.md b/.changeset/prompt-processing-indicators.md new file mode 100644 index 0000000..98a8b9b --- /dev/null +++ b/.changeset/prompt-processing-indicators.md @@ -0,0 +1,37 @@ +--- +"@ai-sdk-tool/harness": patch +"@ai-sdk-tool/tui": patch +"@ai-sdk-tool/headless": patch +"plugsuits": patch +--- + +Surface the "prompt processing" state that previously looked frozen, and fix follow-up correctness gaps found during post-implementation review. + +- Harness: new `LoopHooks.onStreamStart` / `onFirstStreamPart` hooks wrap the `agent.stream()` call site so consumers driving turns through `runAgentLoop` can react to the prompt-processing latency gap. `onFirstStreamPart` receives the current stream part as its first argument (`TextStreamPart`) so consumers can inspect `part.type` to filter framing chunks (`start`, `text-start`, …) from visible content. `TextStreamPart` is re-exported from the harness root for convenience. Docstring clarifies that the TUI has its own independent `onStreamStart` on `AgentTUIConfig`. +- TUI: shows a `Processing...` loader during turn preparation and transitions to `Working...` once the LLM request is in flight. The startup token probe is now non-blocking (fire-and-forget) so the editor accepts input immediately; the context-usage footer starts from the estimated count and quietly upgrades to the real value. During a blocking compaction the foreground loader temporarily switches to `Compacting...` and restores the previous label when the block ends, so users see the actual reason for a long wait. `text-start` stream parts are now treated as visible, clearing the streaming loader as soon as the assistant view mounts (no more empty-view flicker). +- Headless: emits a `turn-start` lifecycle annotation and a matching `onStreamStart` callback before each LLM request; the event is dropped from `trajectory.json` (transient UX signal, no `step_id`) so persisted consumers see identical output. The event fires exactly once per logical turn — overflow and no-output retries no longer re-emit it. New tests cover normal ordering, `new-turn` vs `intermediate-step` phases, retry single-emission, and non-persistence in `trajectory.json`. +- Headless: the persisted `schema_version` is corrected from the internal `ATIF-v1.6` label to the actual current Harbor spec version `ATIF-v1.4` (). Documentation across `packages/headless/AGENTS.md`, `packages/headless/README.md`, and `packages/cea/benchmark/AGENTS.md` now separates the internal JSONL streaming protocol (which carries lifecycle annotations such as `approval`, `compaction`, `interrupt`, `turn-start`) from the ATIF-v1.4 trajectory that `TrajectoryCollector` writes to disk. +- Headless: `StepMetrics` gains the remaining ATIF-v1.4 optional fields (`logprobs`, `prompt_token_ids`, `completion_token_ids`) and `TrajectoryJson.final_metrics` now aggregates `total_cost_usd`. `TrajectoryJson.extra` is typed as a closed record of exactly the three ATIF persistence buckets (`approval_events`, `compaction_events`, `interrupt_events`); new lifecycle types must extend the interface explicitly so the Harbor persistence contract stays type-enforced. +- CEA: the `--atif` CLI help text and the benchmark pipeline now reference ATIF-v1.4 (matching the corrected `schema_version`). The bundled `packages/cea/benchmark/test_trajectory.py` validator now calls Harbor's official `TrajectoryValidator` when `harbor` is importable and falls back to a stricter local shape check otherwise; it enforces per-step metric shapes and rejects `bool` values where ATIF requires a real number. +- Addressed PR review feedback: + - `turn-start` and `onStreamStart` now fire strictly after `agent.stream()` successfully returns, so stream-creation failures no longer produce a false "stream started" signal (reported by Gemini, Codex, and Cubic reviewers). + - The background startup usage probe is serialized against per-turn probes by a generation token; a stale startup probe can no longer overwrite newer usage data and skew context-pressure metrics. + - The blocking-compaction spinner swap only stashes the original foreground label on first entry and only restores it when the foreground loader is still live, eliminating both the "Compacting..." wording sticking after unblock and the "Processing..." spinner resurrecting after the first stream part arrived. + - Restored the post-`onSetup` `updateHeader()` call that was accidentally dropped when the startup probe became non-blocking, so any header/footer state that `onSetup` initialises renders immediately instead of waiting for the first probe to resolve. + - The bundled Python ATIF validator (`test_trajectory.py`) no longer accepts `bool` values where ATIF v1.4 requires a real number — `isinstance(True, int)` is `True` in Python, so the old check let invalid metric payloads slip through. Added `_is_real_number` / `_is_real_int` helpers that exclude `bool`. + - Observer hooks (`onStreamStart`, `onFirstStreamPart`) no longer abort a valid stream when the callback throws. Errors are logged via `console.error` and swallowed in the harness loop, headless runner, and TUI session loop, with the contract documented on `LoopHooks`. + - Repaired a regression where `LoopHooks.onToolCall` had silently dropped out of the public `LoopHooks` interface while still being destructured inside `runAgentLoop`. The field is restored to its original signature; consumers that already relied on it are unaffected, and the destructuring now type-checks again. + - Corrected the `LoopHooks.onFirstStreamPart` signature as a pre-adoption fix (Cubic P2): the previous `(context) => void` shape promised in its docstring that consumers could filter on part type, but the callback never received the part. The signature now passes `(part: TextStreamPart, context)` so consumers can actually inspect `part.type`. Zero existing consumers were found across the monorepo (the hook was introduced earlier in this PR), so this is a type-only correction with no runtime migration. New regression tests in `loop.test.ts` cover single-fire semantics, per-iteration firing, empty-stream skip, and observer-error isolation. +- Pinned the ATIF v1.4 compliance contract in-source: `trajectory-collector.ts`, `TrajectoryJson`, `AtifStep`, `TrajectoryEvent`, `collectTrajectoryEvent`, and `runHeadless` now carry module/interface-level JSDoc spelling out the Harbor spec version, the allowed `steps[*].source` values, the `extra.*` persistence rule, and the stream-vs-snapshot boundary. `packages/headless/AGENTS.md` gains an "ATIF v1.4 COMPLIANCE" section listing the same invariants, and the `atif-events.test.ts` suite now declares itself as the executable compliance contract. These are docs-only, but they turn future spec drifts into obvious code-review red flags instead of silent regressions. +- Review cycle 1 follow-ups (Oracle + Gemini + Codex + Cubic + CodeRabbit): + - Guarded `TrajectoryCollector.writeTo` against persisting an invalid zero-step trajectory (Harbor's own validator rejects `steps: []`). The method now returns `boolean` — `true` when a file was written, `false` when the write was intentionally skipped to keep `trajectory.json` ATIF-v1.4 compliant. + - Moved the TUI `showLoader("Processing...")` call inside the stream-turn `try/finally` so a thrown `prepareMessages` (or `onBeforeTurn`/usage probe/compaction check) no longer leaves the spinner stuck on screen. + - Tightened the startup usage-probe guard: in addition to the generation token, `measureUsageIfAvailable` now captures `messageHistory.getRevision()` at call time and drops its result when the history has mutated mid-probe, preventing stale empty-message usage from overwriting per-turn measurements. + - Narrowed `TrajectoryJson.extra` to the three canonical lifecycle buckets (`approval_events`, `compaction_events`, `interrupt_events`) by dropping the `Record` intersection. New lifecycle types must now extend the interface explicitly, keeping the ATIF persistence contract type-enforced. + - Hardened the Python validator: `_is_real_number` now rejects `NaN`, `Infinity`, and `-Infinity` (all of which `json.loads` will happily produce from non-strict JSON) via an explicit `math.isfinite` check. + - Corrected documentation drift across `packages/headless/AGENTS.md`, `packages/headless/README.md`, `packages/headless/src/types.ts`, `packages/headless/src/trajectory-collector.ts`, and the root `AGENTS.md`: `approval`/`compaction`/`interrupt` are persisted under `trajectory.extra.*`, not JSONL-only; only `turn-start` and `error` are transient. + - Regression test added for the `writeTo` zero-step guard: `does not write an invalid zero-step trajectory when the stream fails before any step`. +- Review cycle 2 follow-ups (Oracle re-audit): + - Headless `measureUsageIfAvailable` now carries the same generation + revision guards the TUI already had. A slow background probe that resolves after a compaction or a newer per-turn probe no longer overwrites fresh usage data. + - ATIF v1.4 step source contract aligned across code, Python validator, and benchmark docs: `user`, `agent`, and `system` are all permitted (Harbor v1.2+). Previous divergence between `AtifStep.source` and `test_trajectory.py`'s `valid_sources = {user, agent}` is resolved. + - Root `README.md` headless event list now includes `turn-start` and points at Harbor's ATIF-v1.4 schema for the persisted trajectory. diff --git a/AGENTS.md b/AGENTS.md index 85872fc..096b6a1 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -182,7 +182,7 @@ try { - File edits in CEA favor hashline-aware operations (`LINE#HASH` + `expected_file_hash`) for stale-safe modifications. - Manual tool-loop continuation is intentionally constrained to normalized `tool-calls` finish reasons. -- Headless mode emits structured ATIF JSONL lifecycle types (`metadata`, `step`, `approval`, `compaction`, `error`, `interrupt`) consumed by benchmark tooling. +- Headless mode emits a JSONL event stream with lifecycle types `metadata`, `step`, `approval`, `compaction`, `error`, `interrupt`, and `turn-start`. The persisted `trajectory.json` produced by `TrajectoryCollector` follows Harbor's ATIF-v1.4 schema (): `approval`, `compaction`, and `interrupt` are bundled into `extra.*` buckets; `turn-start` and `error` are JSONL-only. - `SkillsEngine` discovers skills from up to five directories: bundled, global skills, global commands, project skills, project commands. ## COMMANDS diff --git a/README.md b/README.md index 0ced259..d7d4dda 100644 --- a/README.md +++ b/README.md @@ -104,7 +104,7 @@ Available commands: pnpm run headless -- "Fix the type error in src/index.ts" ``` -Outputs structured ATIF JSONL events (`metadata`, `step`, `approval`, `compaction`, `error`, `interrupt`) for programmatic consumption. +Outputs a JSONL event stream (`metadata`, `step`, `approval`, `compaction`, `error`, `interrupt`, `turn-start`) for programmatic consumption. The persisted `trajectory.json` conforms to Harbor's ATIF-v1.4 schema. ## Architecture diff --git a/packages/cea/benchmark/AGENTS.md b/packages/cea/benchmark/AGENTS.md index b51b801..fcc43cc 100644 --- a/packages/cea/benchmark/AGENTS.md +++ b/packages/cea/benchmark/AGENTS.md @@ -36,7 +36,7 @@ Control agent behavior via environment variables: | `AGENT_ENABLE_THINKING` | `1`, `true`, `yes` | Enable `--think` flag (captures reasoning content) | | `AGENT_ENABLE_TOOL_FALLBACK` | `1`, `true`, `yes` | Enable `--tool-fallback` flag (XML-based tool calling for non-native models) | -## Event Flow (ATIF-v1.6) +## Event Flow ``` headless.ts (Docker) output.jsonl harbor_agent.py @@ -51,9 +51,11 @@ headless.ts (Docker) output.jsonl harbor_agent.py │ │ │ ├─► emit InterruptEvent ───► interrupt ─────────────► lifecycle annotation │ │ │ + ├─► emit TurnStartEvent ───► turn-start ────────────► lifecycle annotation (not persisted) + │ │ │ └─► emit StepEvent(agent) ─► step (agent) ──────────► Step(source="agent") │ │ - └───────────────────────► trajectory.json (ATIF-v1.6, written by headless) + └───────────────────────► trajectory.json (ATIF-v1.4, written by headless) ``` ## Event Types (output.jsonl) @@ -66,13 +68,15 @@ headless.ts (Docker) output.jsonl harbor_agent.py | `compaction` | `event`, `tokensBefore`, `tokensAfter?`, `durationMs?` | History compaction events | | `error` | `error`, `timestamp` | Fatal errors | | `interrupt` | `reason`, `timestamp` | Intentional caller interruption | +| `turn-start` | `phase`, `timestamp` | Lifecycle annotation emitted once per logical turn right after `agent.stream()` dispatch; dropped by `TrajectoryCollector` and absent from `trajectory.json` | ## Verification ### 1. Event Type Distribution ```bash cat jobs//*/agent/output.jsonl | jq -r '.type' | sort | uniq -c -# Expected output like: 1 metadata N step M compaction K approval optional interrupt (no unexpected 'error' lines) +# Expected output like: 1 metadata N step N turn-start M compaction K approval optional interrupt (no unexpected 'error' lines) +# Note: turn-start count should match the number of logical turns (== agent step count for linear conversations). ``` ### 2. Step ID Sequence @@ -88,7 +92,7 @@ python -m harbor.utils.trajectory_validator jobs//*/agent/trajectory.jso ``` Validator expectations: -- `steps[*].source` is currently `user` or `agent` +- `steps[*].source` is `user`, `agent`, or `system` (ATIF v1.4 permits all three; system steps support observations since v1.2) - bundled tool observations live in `steps[*].observation.results` - persisted lifecycle annotations, when present, live under `extra.approval_events`, `extra.compaction_events`, and `extra.interrupt_events` diff --git a/packages/cea/benchmark/scorer.py b/packages/cea/benchmark/scorer.py index 81318cb..5e657f4 100644 --- a/packages/cea/benchmark/scorer.py +++ b/packages/cea/benchmark/scorer.py @@ -25,7 +25,7 @@ def parse_timestamp(ts: str | None) -> datetime | None: def score_trajectory(trajectory: dict) -> dict: - """Compute performance metrics from an ATIF-v1.6 trajectory dict.""" + """Compute performance metrics from an ATIF-v1.4 trajectory dict.""" steps = trajectory.get("steps", []) fm = trajectory.get("final_metrics", {}) or {} compaction_events = trajectory.get("extra", {}).get("compaction_events", []) diff --git a/packages/cea/benchmark/test_trajectory.py b/packages/cea/benchmark/test_trajectory.py index f6403d6..865911b 100644 --- a/packages/cea/benchmark/test_trajectory.py +++ b/packages/cea/benchmark/test_trajectory.py @@ -1,17 +1,28 @@ #!/usr/bin/env python3 -"""ATIF-v1.6 trajectory validation test. +"""ATIF-v1.4 trajectory validation test. Usage: python3 test_trajectory.py """ from __future__ import annotations import json +import math import sys from pathlib import Path +def _is_real_number(value: object) -> bool: + if isinstance(value, bool) or not isinstance(value, (int, float)): + return False + return math.isfinite(float(value)) + + +def _is_real_int(value: object) -> bool: + return isinstance(value, int) and not isinstance(value, bool) + + def validate_trajectory(path: str) -> list[str]: - """Validate trajectory.json against ATIF-v1.6 spec. Returns list of errors.""" + """Validate trajectory.json against ATIF-v1.4 spec. Returns list of errors.""" errors = [] try: @@ -21,9 +32,9 @@ def validate_trajectory(path: str) -> list[str]: return [f"Cannot read/parse file: {e}"] # 1. schema_version - if t.get("schema_version") != "ATIF-v1.6": + if t.get("schema_version") != "ATIF-v1.4": errors.append( - f"schema_version: expected 'ATIF-v1.6', got {t.get('schema_version')!r}" + f"schema_version: expected 'ATIF-v1.4', got {t.get('schema_version')!r}" ) # 2. session_id present @@ -53,8 +64,9 @@ def validate_trajectory(path: str) -> list[str]: if step_ids != expected: errors.append(f"step_ids: expected {expected}, got {step_ids}") - # 6. each step has required fields - valid_sources = {"user", "agent"} + # 6. each step has required fields. ATIF v1.4 permits "user", "agent", + # and "system" as step sources (system steps support observations since v1.2). + valid_sources = {"user", "agent", "system"} for i, step in enumerate(steps): if not isinstance(step, dict): continue @@ -84,8 +96,51 @@ def validate_trajectory(path: str) -> list[str]: if not isinstance(fm, dict): errors.append("final_metrics must be a dictionary") return errors - if not isinstance(fm.get("total_steps"), int): + if not _is_real_int(fm.get("total_steps")): errors.append("final_metrics.total_steps: must be an integer") + for token_field in ( + "total_prompt_tokens", + "total_completion_tokens", + "total_cached_tokens", + "total_cost_usd", + ): + value = fm.get(token_field) + if value is not None and not _is_real_number(value): + errors.append( + f"final_metrics.{token_field}: must be a number or null, got {type(value).__name__}" + ) + + # 8b. per-step metrics shape + for i, step in enumerate(steps): + if not isinstance(step, dict): + continue + metrics = step.get("metrics") + if metrics is None: + continue + if not isinstance(metrics, dict): + errors.append(f"steps[{i}].metrics: must be a dictionary when present") + continue + for num_field in ( + "prompt_tokens", + "completion_tokens", + "cached_tokens", + "cost_usd", + ): + value = metrics.get(num_field) + if value is not None and not _is_real_number(value): + errors.append( + f"steps[{i}].metrics.{num_field}: must be a number when present" + ) + for list_field in ( + "logprobs", + "prompt_token_ids", + "completion_token_ids", + ): + value = metrics.get(list_field) + if value is not None and not isinstance(value, list): + errors.append( + f"steps[{i}].metrics.{list_field}: must be a list when present" + ) # 9. persisted lifecycle annotations under extra extra = t.get("extra") @@ -104,6 +159,22 @@ def validate_trajectory(path: str) -> list[str]: return errors +def run_harbor_validator(path: str) -> list[str] | None: + """Run Harbor's official trajectory_validator when the harbor package is + importable. Returns None when Harbor isn't installed so the caller can + fall back to the bundled validator.""" + try: + from harbor.utils.trajectory_validator import TrajectoryValidator + except ImportError: + return None + + validator = TrajectoryValidator() + is_valid = validator.validate(path) + if is_valid: + return [] + return [f"harbor: {err}" for err in validator.get_errors()] + + def main() -> None: if len(sys.argv) < 2: print("Usage: python3 test_trajectory.py ") @@ -112,6 +183,11 @@ def main() -> None: path = sys.argv[1] errors = validate_trajectory(path) + harbor_errors = run_harbor_validator(path) + harbor_used = harbor_errors is not None + if harbor_errors: + errors.extend(harbor_errors) + if errors: print(f"VALIDATION FAILED: {path}") for e in errors: @@ -128,7 +204,7 @@ def main() -> None: print(f" session_id: {t.get('session_id')}") print(f" steps: {len(steps)}") print( - f" final_metrics: total_prompt={fm.get('total_prompt_tokens')}, total_completion={fm.get('total_completion_tokens')}" + f" final_metrics: total_prompt={fm.get('total_prompt_tokens')}, total_completion={fm.get('total_completion_tokens')}, total_cost={fm.get('total_cost_usd')}" ) extra = t.get("extra", {}) or {} print( @@ -137,6 +213,9 @@ def main() -> None: f"compaction={len(extra.get('compaction_events', []))}, " f"interrupt={len(extra.get('interrupt_events', []))}" ) + print( + f" harbor_validator: {'passed' if harbor_used else 'skipped (harbor package not installed)'}" + ) sys.exit(0) diff --git a/packages/cea/src/entrypoints/main.ts b/packages/cea/src/entrypoints/main.ts index c769737..2ab2119 100644 --- a/packages/cea/src/entrypoints/main.ts +++ b/packages/cea/src/entrypoints/main.ts @@ -627,7 +627,7 @@ const mainCommand = defineCommand({ atif: { type: "boolean", description: - "Generate trajectory.json in ATIF-v1.6 format (Harbor compatible)", + "Generate trajectory.json in ATIF-v1.4 format (Harbor compatible)", default: false, }, }, diff --git a/packages/harness/src/index.ts b/packages/harness/src/index.ts index e8e190d..6d0c0db 100644 --- a/packages/harness/src/index.ts +++ b/packages/harness/src/index.ts @@ -1,4 +1,4 @@ -export type { LanguageModelUsage } from "ai"; +export type { LanguageModelUsage, TextStreamPart } from "ai"; export { createAgent } from "./agent"; export type { BackgroundMemoryExtractorConfig } from "./background-memory-extractor"; export { BackgroundMemoryExtractor } from "./background-memory-extractor"; diff --git a/packages/harness/src/loop.test.ts b/packages/harness/src/loop.test.ts index 10dbba2..7d75a94 100644 --- a/packages/harness/src/loop.test.ts +++ b/packages/harness/src/loop.test.ts @@ -1,4 +1,4 @@ -import { describe, expect, it } from "vitest"; +import { describe, expect, it, vi } from "vitest"; import { AgentError, AgentErrorCode } from "./errors"; import { runAgentLoop } from "./loop"; import type { Agent, AgentStreamResult } from "./types"; @@ -304,6 +304,102 @@ describe("runAgentLoop", () => { expect(result.messages.length).toBe(3); }); + it("invokes onFirstStreamPart exactly once with the first emitted part", async () => { + const agent = createMockAgent(["tool-calls", "stop"], { + toolCallsPerIteration: [ + [ + { toolName: "get_time", args: {} }, + { toolName: "get_weather", args: { location: "Paris" } }, + ], + [], + ], + }); + + const observed: Array<{ type: string; iteration: number }> = []; + + await runAgentLoop({ + agent, + messages: [{ role: "user", content: "Hello" }], + onFirstStreamPart: (part, context) => { + observed.push({ type: part.type, iteration: context.iteration }); + }, + }); + + expect(observed).toHaveLength(1); + expect(observed[0]?.type).toBe("tool-call"); + expect(observed[0]?.iteration).toBe(0); + }); + + it("invokes onFirstStreamPart on each iteration when the stream has content", async () => { + const agent = createMockAgent(["tool-calls", "tool-calls", "stop"], { + toolCallsPerIteration: [ + [{ toolName: "first_call", args: {} }], + [{ toolName: "second_call", args: {} }], + [], + ], + }); + + const observed: Array<{ type: string; iteration: number }> = []; + + await runAgentLoop({ + agent, + messages: [{ role: "user", content: "Hello" }], + onFirstStreamPart: (part, context) => { + observed.push({ type: part.type, iteration: context.iteration }); + }, + }); + + expect(observed).toHaveLength(2); + expect(observed[0]?.iteration).toBe(0); + expect(observed[1]?.iteration).toBe(1); + }); + + it("skips onFirstStreamPart when the stream yields no parts", async () => { + const agent = createMockAgent(["stop"], { + toolCallsPerIteration: [[]], + }); + + const observed: unknown[] = []; + + await runAgentLoop({ + agent, + messages: [{ role: "user", content: "Hello" }], + onFirstStreamPart: (part) => { + observed.push(part); + }, + }); + + expect(observed).toHaveLength(0); + }); + + it("isolates onFirstStreamPart observer errors from the stream flow", async () => { + const agent = createMockAgent(["tool-calls", "stop"], { + toolCallsPerIteration: [[{ toolName: "noop", args: {} }], []], + }); + + const consoleErrorSpy = vi + .spyOn(console, "error") + .mockImplementation(() => undefined); + + try { + const result = await runAgentLoop({ + agent, + messages: [{ role: "user", content: "Hello" }], + onFirstStreamPart: () => { + throw new Error("observer bug"); + }, + }); + + expect(result.iterations).toBe(2); + expect(consoleErrorSpy).toHaveBeenCalledWith( + expect.stringContaining("onFirstStreamPart"), + expect.any(Error) + ); + } finally { + consoleErrorSpy.mockRestore(); + } + }); + it("calls onToolCall for each tool call", async () => { const agent = createMockAgent(["tool-calls", "stop"], { toolCallsPerIteration: [ diff --git a/packages/harness/src/loop.ts b/packages/harness/src/loop.ts index f274d97..b5c40f8 100644 --- a/packages/harness/src/loop.ts +++ b/packages/harness/src/loop.ts @@ -17,6 +17,21 @@ import type { RunAgentLoopResult, } from "./types"; +async function invokeObserverHook( + hook: ((...args: Args) => void | Promise) | undefined, + hookName: string, + ...args: Args +): Promise { + if (!hook) { + return; + } + try { + await hook(...args); + } catch (error) { + console.error(`[harness] ${hookName} threw; continuing stream:`, error); + } +} + /** * Runs an {@link Agent} in a loop until a stop condition is met or `maxIterations` is reached. * @@ -44,8 +59,10 @@ export async function runAgentLoop( agent, abortSignal, onError, + onFirstStreamPart, onInterrupt, onStepComplete, + onStreamStart, onToolCall, onToolLifecycle, } = options; @@ -98,7 +115,21 @@ export async function runAgentLoop( const stream = agent.stream(streamOptions); + await invokeObserverHook(onStreamStart, "onStreamStart", context); + + let firstPartSeen = false; + for await (const part of stream.fullStream) { + if (!firstPartSeen) { + firstPartSeen = true; + await invokeObserverHook( + onFirstStreamPart, + "onFirstStreamPart", + part, + context + ); + } + const lifecycle = getToolLifecycleState( part as { toolCallId?: string; toolName?: string; type: string } ); diff --git a/packages/harness/src/types.ts b/packages/harness/src/types.ts index c018ba9..73661d4 100644 --- a/packages/harness/src/types.ts +++ b/packages/harness/src/types.ts @@ -7,6 +7,7 @@ import type { LanguageModel, ModelMessage, streamText, + TextStreamPart, ToolCallPart, ToolSet, } from "ai"; @@ -158,6 +159,26 @@ export interface LoopHooks { | Promise< { shouldContinue?: boolean; recovery?: ModelMessage[] } | undefined >; + /** + * Fires exactly once per loop iteration, on the **first** part of any + * kind emitted from `stream.fullStream`. The part itself is passed as + * the first argument so consumers can inspect `part.type` if they only + * care about visible output — e.g., ignore framing parts (`start`, + * `text-start`, …) and react only to `text-delta`, `tool-call`, etc. + * + * Important: the hook fires on the very first part regardless of its + * visibility. Filtering inside the callback lets you *decide what to + * do* with that first part; it does NOT cause the hook to re-fire on + * a later visible part. + * + * Observer-only contract: errors thrown from the callback are logged + * via `console.error` and swallowed so a buggy observer cannot abort a + * valid stream. + */ + onFirstStreamPart?: ( + part: TextStreamPart, + context: LoopContinueContext + ) => void | Promise; onInterrupt?: ( interruption: { iteration: number; @@ -169,6 +190,24 @@ export interface LoopHooks { context: LoopContinueContext ) => BeforeTurnResult | Promise | undefined; onStepComplete?: (step: LoopStepInfo) => void | Promise; + /** + * Fires immediately after {@link Agent.stream} is invoked and before the + * `fullStream` iteration begins. This is the closest hook to "LLM request + * sent, waiting for first chunk" — intended for consumers that need to + * display a loading indicator during the prompt-processing latency gap. + * + * Observer-only contract: this hook must not influence the stream. Errors + * thrown from the callback are logged via `console.error` and swallowed + * so a buggy observer cannot abort a valid stream. + * + * The callback is awaited before iteration starts, so keep it fast. + * + * Note: only runtimes that drive their turns through `runAgentLoop` + * observe this hook. The TUI (`createAgentTUI`) implements its own + * stream loop and exposes an independent `AgentTUIConfig.onStreamStart` + * with a `phase` argument. + */ + onStreamStart?: (context: LoopContinueContext) => void | Promise; onToolCall?: ( call: ToolCallPart, context: LoopContinueContext diff --git a/packages/headless/AGENTS.md b/packages/headless/AGENTS.md index e3733de..18c7187 100644 --- a/packages/headless/AGENTS.md +++ b/packages/headless/AGENTS.md @@ -9,9 +9,20 @@ This package provides a non-interactive, JSONL event-streaming runtime for agent The package depends on `@ai-sdk-tool/harness` for `CheckpointHistory`, `AgentStreamResult`, and `shouldContinueManualToolLoop`. The agent itself is passed in as a config parameter. -## JSONL EVENT PROTOCOL (ATIF-v1.6) +## JSONL EVENT PROTOCOL -Every event is a JSON object on its own line. Events conform to the ATIF-v1.6 specification for trajectory logging. +Every event is a JSON object on its own line. + +> **This JSONL stream is NOT the ATIF schema.** ATIF is the format of the +> persisted `trajectory.json` file produced by `TrajectoryCollector` — see +> [Harbor's ATIF specification](https://www.harborframework.com/docs/agents/trajectory-format). +> The current ATIF version is **v1.4**. This JSONL protocol is an internal +> streaming contract used by the runner to drive UI, telemetry, and the +> trajectory collector. Persisted lifecycle annotations (`approval`, +> `compaction`, `interrupt`) are bundled into ATIF `extra.*` buckets by the +> collector — they are NOT `steps[*].source` values, but they do survive +> on disk. Transient annotations (`turn-start`, `error`) stay JSONL-only +> and are never written to `trajectory.json`. ### Design Decisions - **NO sessionId on individual events**: A single `MetadataEvent` at the start carries the `session_id`. @@ -31,6 +42,7 @@ Every event is a JSON object on its own line. Events conform to the ATIF-v1.6 sp | `compaction` | system | Lifecycle event for history compaction | | `error` | system | Fatal or iteration-limit error | | `interrupt` | system | Intentional caller interruption (`caller-abort`) | +| `turn-start` | system | Lifecycle annotation emitted right after `agent.stream()` is invoked, before the first chunk arrives | ### Examples @@ -77,6 +89,11 @@ Every event is a JSON object on its own line. Events conform to the ATIF-v1.6 sp {"type":"error","timestamp":"2026-04-03T10:00:20.000Z","error":"Max iterations (50) reached"} ``` +**TurnStartEvent**: +```json +{"type":"turn-start","timestamp":"2026-04-03T10:00:04.500Z","phase":"new-turn"} +``` + ## KEY EXPORTS ### `runHeadless(config: HeadlessRunnerConfig): Promise` @@ -114,7 +131,7 @@ import type { | `runner.ts` | `runHeadless` | Main agent loop with JSONL emission | | `emit.ts` | `emitEvent` | Default stdout JSONL event sink | | `signals.ts` | `registerSignalHandlers` | Process signal lifecycle management | -| `types.ts` | `TrajectoryEvent`, etc. | ATIF-v1.6 event type definitions | +| `types.ts` | `TrajectoryEvent`, etc. | JSONL stream event types (internal) + ATIF-v1.4 persisted types | ## CONVENTIONS @@ -130,3 +147,16 @@ import type { - Estimating token counts (use `metrics` from SDK only). - Manual `step_id` management outside the runner. - Using `console.log` for non-event output. + +## ATIF v1.4 COMPLIANCE (persisted trajectory.json) + +`TrajectoryCollector` writes the only output that MUST conform to Harbor's ATIF v1.4 spec (). When editing `trajectory-collector.ts` or `collectTrajectoryEvent` in `runner.ts`, keep the following invariants: + +- **schema_version is the literal `"ATIF-v1.4"`**. Never bump unilaterally — bump only when the upstream Harbor spec bumps and this implementation has been audited against the new version's required/optional fields. +- **`steps[*].source` ∈ `{user, agent, system}`**. Never widen this set; new event types go to `extra.*` or are dropped from persistence. +- **Persisted lifecycle annotations live under `extra.*`**: `approval_events`, `compaction_events`, `interrupt_events`. Extending this set is acceptable (ATIF `extra` is forward-compatible) but each new bucket requires a new collector method and a corresponding `collectTrajectoryEvent` case. +- **Adding a JSONL event type is additive for stdout** but requires an explicit routing decision in `collectTrajectoryEvent`: persist under `extra.*`, or drop. Leaving the `default: return;` fallthrough is a valid choice for transient signals (the pattern `turn-start` uses). +- **`final_metrics` keys are null-when-absent, not omitted**. The shape is load-bearing for downstream tools (Harbor validator, scorer). +- **Metrics come from SDK `stream.usage`** — never estimated; never hand-filled. + +Violating these invariants silently breaks Harbor benchmark runs, since the persisted trajectory will fail `harbor.utils.trajectory_validator` or produce unusable scorer output. diff --git a/packages/headless/README.md b/packages/headless/README.md index 33787d9..ce4a37f 100644 --- a/packages/headless/README.md +++ b/packages/headless/README.md @@ -50,7 +50,7 @@ await runHeadless({ }); ``` -**Example output (ATIF-v1.6):** +**Example output (JSONL stream):** ```jsonl {"type":"metadata","timestamp":"2026-04-03T10:00:00.000Z","session_id":"ses-abc123","agent":{"name":"code-editing-agent","version":"1.0.0","model_name":"gpt-4o"}} @@ -169,7 +169,7 @@ registerSignalHandlers({ ## JSONL Event Types -Headless output now follows the **ATIF-v1.6** protocol documented in `packages/headless/AGENTS.md`. +The runner streams an internal JSONL event protocol documented in `packages/headless/AGENTS.md`. The persisted `trajectory.json` produced by `TrajectoryCollector` conforms to Harbor's **ATIF-v1.4** schema (). Lifecycle annotations on the JSONL stream split into two categories: `approval`, `compaction`, and `interrupt` are persisted into `trajectory.extra.*` buckets (not as `steps[*].source` values); `turn-start` and `error` are transient and stay JSONL-only. ### Event overview @@ -182,6 +182,7 @@ Headless output now follows the **ATIF-v1.6** protocol documented in `packages/h | `compaction` | system | Lifecycle event for history compaction | | `error` | system | Fatal error or iteration-limit event | | `interrupt` | system | Intentional caller interruption (`caller-abort`) | +| `turn-start` | system | Lifecycle annotation emitted right after `agent.stream()` is dispatched, before the first chunk arrives | ### `metadata` @@ -278,6 +279,16 @@ Headless output now follows the **ATIF-v1.6** protocol documented in `packages/h } ``` +### `turn-start` + +```typescript +{ + type: "turn-start", + timestamp: string, + phase: "new-turn" | "intermediate-step", +} +``` + ### TypeScript types ```typescript @@ -291,6 +302,7 @@ import type { CompactionEvent, ErrorEvent, InterruptEvent, + TurnStartEvent, } from "@ai-sdk-tool/headless"; ``` diff --git a/packages/headless/src/__tests__/atif-events.test.ts b/packages/headless/src/__tests__/atif-events.test.ts index ba35796..e7064b9 100644 --- a/packages/headless/src/__tests__/atif-events.test.ts +++ b/packages/headless/src/__tests__/atif-events.test.ts @@ -1,3 +1,18 @@ +/** + * ATIF v1.4 compliance test suite. + * + * These tests are load-bearing: they encode the Harbor ATIF v1.4 shape + * contract (https://www.harborframework.com/docs/agents/trajectory-format) + * as executable assertions. A regression here means `trajectory.json` + * output no longer passes `harbor.utils.trajectory_validator`, which + * breaks terminal-bench runs and any downstream scorer. + * + * Before loosening any assertion here, confirm the change is permitted by + * the current Harbor spec version this package targets (`ATIF-v1.4`) and + * that the Python validator in `packages/cea/benchmark/test_trajectory.py` + * is updated in lock-step. + */ + import { describe, expect, it } from "vitest"; import { TrajectoryCollector } from "../trajectory-collector"; import type { @@ -568,6 +583,109 @@ describe("TrajectoryCollector ATIF compliance", () => { }); }); + it("finalize() aggregates total_cost_usd across step metrics", () => { + const collector = new TrajectoryCollector(); + collector.addMetadata({ + type: "metadata", + timestamp, + session_id: "ses-cost", + agent: { name: "a", version: "1", model_name: "m" }, + }); + collector.addStep({ + type: "step", + step_id: 1, + timestamp, + source: "user", + message: "hi", + }); + collector.addStep({ + type: "step", + step_id: 2, + timestamp, + source: "agent", + message: "a", + metrics: { cost_usd: 0.12, prompt_tokens: 100 }, + }); + collector.addStep({ + type: "step", + step_id: 3, + timestamp, + source: "agent", + message: "b", + metrics: { cost_usd: 0.08, prompt_tokens: 80 }, + }); + + const trajectory = collector.finalize(); + expect(trajectory.final_metrics.total_cost_usd).toBeCloseTo(0.2, 10); + expect(trajectory.final_metrics.total_prompt_tokens).toBe(180); + }); + + it("finalize() returns null total_cost_usd when no step reported a cost", () => { + const collector = new TrajectoryCollector(); + collector.addMetadata({ + type: "metadata", + timestamp, + session_id: "ses-no-cost", + agent: { name: "a", version: "1", model_name: "m" }, + }); + collector.addStep({ + type: "step", + step_id: 1, + timestamp, + source: "user", + message: "hi", + }); + collector.addStep({ + type: "step", + step_id: 2, + timestamp, + source: "agent", + message: "a", + metrics: { prompt_tokens: 100 }, + }); + + const trajectory = collector.finalize(); + expect(trajectory.final_metrics.total_cost_usd).toBeNull(); + }); + + it("finalize() preserves ATIF-v1.4 optional fields (logprobs, prompt_token_ids, completion_token_ids)", () => { + const collector = new TrajectoryCollector(); + collector.addMetadata({ + type: "metadata", + timestamp, + session_id: "ses-v14", + agent: { name: "a", version: "1", model_name: "m" }, + }); + collector.addStep({ + type: "step", + step_id: 1, + timestamp, + source: "user", + message: "hi", + }); + collector.addStep({ + type: "step", + step_id: 2, + timestamp, + source: "agent", + message: "reply", + metrics: { + completion_token_ids: [1722, 310, 5533], + logprobs: [-0.1, -0.05, -0.02], + prompt_token_ids: [1, 2, 3], + prompt_tokens: 3, + }, + }); + + const trajectory = collector.finalize(); + const agentStep = trajectory.steps[1]; + expect(agentStep?.metrics?.logprobs).toStrictEqual([-0.1, -0.05, -0.02]); + expect(agentStep?.metrics?.prompt_token_ids).toStrictEqual([1, 2, 3]); + expect(agentStep?.metrics?.completion_token_ids).toStrictEqual([ + 1722, 310, 5533, + ]); + }); + it("finalize() observation source_call_ids reference valid tool_call_ids", () => { const collector = new TrajectoryCollector(); collector.addMetadata({ diff --git a/packages/headless/src/runner.test.ts b/packages/headless/src/runner.test.ts index a6e188d..6912735 100644 --- a/packages/headless/src/runner.test.ts +++ b/packages/headless/src/runner.test.ts @@ -129,6 +129,228 @@ describe("runHeadless", () => { expect(history.getAll()[0]?.originalContent).toBe("안녕"); }); + it("emits a single turn-start event per logical turn in the normal path", async () => { + const events: TrajectoryEvent[] = []; + const history = new CheckpointHistory(); + + await runHeadless({ + agent: { + stream: () => + createMockStream([{ role: "assistant", content: "hello" }]), + }, + emitEvent: (event) => { + events.push(event); + }, + initialUserMessage: { content: "hi" }, + messageHistory: history, + modelId: "mock-model", + sessionId: "session-turn-start-order", + }); + + const turnStartEvents = events.filter((e) => e.type === "turn-start"); + expect(turnStartEvents).toHaveLength(1); + expect(turnStartEvents[0]).toMatchObject({ + type: "turn-start", + phase: "new-turn", + }); + + const userIndex = events.findIndex( + (e) => e.type === "step" && "source" in e && e.source === "user" + ); + const turnStartIndex = events.findIndex((e) => e.type === "turn-start"); + const agentIndex = events.findIndex( + (e) => e.type === "step" && "source" in e && e.source === "agent" + ); + expect(userIndex).toBeGreaterThanOrEqual(0); + expect(turnStartIndex).toBeGreaterThan(userIndex); + expect(agentIndex).toBeGreaterThan(turnStartIndex); + }); + + it("continues streaming when onStreamStart throws (observer errors are isolated)", async () => { + const events: TrajectoryEvent[] = []; + const history = new CheckpointHistory(); + const consoleErrorSpy = vi + .spyOn(console, "error") + .mockImplementation(() => undefined); + + try { + await runHeadless({ + agent: { + stream: () => + createMockStream([{ role: "assistant", content: "done" }]), + }, + emitEvent: (event) => { + events.push(event); + }, + initialUserMessage: { content: "hi" }, + messageHistory: history, + modelId: "mock-model", + onStreamStart: () => { + throw new Error("observer bug: should not break the stream"); + }, + sessionId: "session-observer-throws", + }); + + expect(events.some((e) => e.type === "turn-start")).toBe(true); + const agentSteps = events.filter( + (e) => e.type === "step" && "source" in e && e.source === "agent" + ); + expect(agentSteps).toHaveLength(1); + expect(consoleErrorSpy).toHaveBeenCalledWith( + expect.stringContaining("onStreamStart"), + expect.any(Error) + ); + } finally { + consoleErrorSpy.mockRestore(); + } + }); + + it("does not emit turn-start when agent.stream() rejects before dispatch", async () => { + const events: TrajectoryEvent[] = []; + const history = new CheckpointHistory(); + const streamError = new Error("Provider refused the request"); + + await runHeadless({ + agent: { + stream: () => Promise.reject(streamError), + }, + emitEvent: (event) => { + events.push(event); + }, + initialUserMessage: { content: "hi" }, + messageHistory: history, + modelId: "mock-model", + sessionId: "session-stream-failure", + }).catch(() => undefined); + + expect(events.some((e) => e.type === "turn-start")).toBe(false); + }); + + it("emits turn-start after agent.stream() succeeds (before first chunk)", async () => { + const events: TrajectoryEvent[] = []; + const history = new CheckpointHistory(); + const eventOrder: string[] = []; + let streamCallCount = 0; + + await runHeadless({ + agent: { + stream: () => { + streamCallCount += 1; + eventOrder.push("agent.stream() called"); + return createMockStream([{ role: "assistant", content: "response" }]); + }, + }, + emitEvent: (event) => { + if (event.type === "turn-start") { + eventOrder.push("turn-start emitted"); + } + events.push(event); + }, + initialUserMessage: { content: "hi" }, + messageHistory: history, + modelId: "mock-model", + sessionId: "session-turn-start-order-vs-stream", + }); + + expect(streamCallCount).toBe(1); + const streamIdx = eventOrder.indexOf("agent.stream() called"); + const turnStartIdx = eventOrder.indexOf("turn-start emitted"); + expect(streamIdx).toBeGreaterThanOrEqual(0); + expect(turnStartIdx).toBeGreaterThan(streamIdx); + }); + + it("emits an intermediate-step turn-start on tool-continuation turns", async () => { + const events: TrajectoryEvent[] = []; + const history = new CheckpointHistory(); + let streamCallCount = 0; + + await runHeadless({ + agent: { + stream: () => { + streamCallCount += 1; + if (streamCallCount === 1) { + return createToolCallStream( + [{ role: "assistant", content: "call_1" }], + "tool-calls" + ); + } + return createMockStream([{ role: "assistant", content: "done" }]); + }, + }, + emitEvent: (event) => { + events.push(event); + }, + initialUserMessage: { content: "do it" }, + messageHistory: history, + modelId: "mock-model", + sessionId: "session-intermediate-turn-start", + }); + + expect(streamCallCount).toBe(2); + const turnStartPhases = events + .filter((e) => e.type === "turn-start") + .map((e) => (e as { phase: string }).phase); + expect(turnStartPhases).toEqual(["new-turn", "intermediate-step"]); + }); + + it("excludes turn-start events from the ATIF trajectory JSON", async () => { + const events: TrajectoryEvent[] = []; + const history = new CheckpointHistory(); + const atifOutputPath = `/tmp/plugsuits-turn-start-${Date.now()}.json`; + + await runHeadless({ + agent: { + stream: () => + createMockStream([{ role: "assistant", content: "hello" }]), + }, + atifOutputPath, + emitEvent: (event) => { + events.push(event); + }, + initialUserMessage: { content: "hi" }, + messageHistory: history, + modelId: "mock-model", + sessionId: "session-trajectory-persistence-check", + }); + + expect(events.some((e) => e.type === "turn-start")).toBe(true); + + const { readFileSync, unlinkSync } = await import("node:fs"); + const persisted = JSON.parse(readFileSync(atifOutputPath, "utf-8")) as { + schema_version: string; + steps: Array<{ step_id: number; source: string }>; + extra?: Record; + }; + unlinkSync(atifOutputPath); + + expect(persisted.schema_version).toBe("ATIF-v1.4"); + const stepSources = persisted.steps.map((s) => s.source); + expect(stepSources).not.toContain("turn-start"); + const serialized = JSON.stringify(persisted); + expect(serialized).not.toContain("turn-start"); + }); + + it("does not write an invalid zero-step trajectory when the stream fails before any step", async () => { + const history = new CheckpointHistory(); + const atifOutputPath = `/tmp/plugsuits-zero-step-${Date.now()}.json`; + const { existsSync } = await import("node:fs"); + + await runHeadless({ + agent: { + stream: () => + Promise.reject(new Error("provider unreachable")) as never, + }, + atifOutputPath, + emitEvent: () => undefined, + // Deliberately omit initialUserMessage so no user step precedes the abort. + messageHistory: history, + modelId: "mock-model", + sessionId: "session-zero-step-guard", + }).catch(() => undefined); + + expect(existsSync(atifOutputPath)).toBe(false); + }); + it("does not emit a synthetic user event when no initial user message is given", async () => { const events: TrajectoryEvent[] = []; @@ -374,7 +596,7 @@ describe("runHeadless", () => { expect(capturedMessages[1]?.[0]?.role).toBe("user"); }); - it("emits ATIF-v1.6 step events for user messages and agent tool responses", async () => { + it("emits ATIF-v1.4 step events for user messages and agent tool responses", async () => { const collectedEvents: TrajectoryEvent[] = []; await runHeadless({ @@ -482,6 +704,9 @@ describe("runHeadless", () => { ); expect(agentEvents).toHaveLength(1); expect(agentEvents[0]).toMatchObject({ message: "ok" }); + const turnStartEvents = events.filter((e) => e.type === "turn-start"); + expect(turnStartEvents).toHaveLength(1); + expect(turnStartEvents[0]).toMatchObject({ phase: "new-turn" }); }); it("retries once on no output generated error", async () => { @@ -521,6 +746,8 @@ describe("runHeadless", () => { ); expect(agentEvents).toHaveLength(1); expect(agentEvents[0]).toMatchObject({ message: "ok" }); + const turnStartEvents = events.filter((e) => e.type === "turn-start"); + expect(turnStartEvents).toHaveLength(1); }); it("retries multiple times on no output generated error before succeeding", async () => { @@ -559,6 +786,8 @@ describe("runHeadless", () => { (e) => e.type === "step" && "source" in e && e.source === "agent" ); expect(agentEvents).toHaveLength(1); + const turnStartEvents = events.filter((e) => e.type === "turn-start"); + expect(turnStartEvents).toHaveLength(1); }); it("emits an interrupt event and stops when the caller aborts", async () => { diff --git a/packages/headless/src/runner.ts b/packages/headless/src/runner.ts index a4f6311..19b3ab4 100644 --- a/packages/headless/src/runner.ts +++ b/packages/headless/src/runner.ts @@ -170,6 +170,14 @@ function getRecommendedMaxOutputTokens( return history.getRecommendedMaxOutputTokens(messages); } +/** + * Routes a JSONL stream event to the ATIF-v1.4 trajectory collector, if + * persistence is enabled. The `default` case INTENTIONALLY drops event + * types that are not part of ATIF v1.4 (e.g. `turn-start`, `error`) — they + * stay on the JSONL stream only. Adding a case here must be accompanied by + * a matching `extra.*` persistence path in `TrajectoryCollector.finalize` + * so `trajectory.json` stays ATIF v1.4 compliant. + */ function collectTrajectoryEvent( collector: TrajectoryCollector | null, event: TrajectoryEvent @@ -294,6 +302,9 @@ async function runTodoReminderLoop(params: { export async function runHeadless(config: HeadlessRunnerConfig): Promise { const emitEventSink = config.emitEvent ?? defaultEmitEvent; + // When `atifOutputPath` is set, the collected trajectory is written to + // disk in ATIF v1.4 format. The JSONL stream to stdout is a separate + // surface (internal protocol, not ATIF) and continues regardless. const trajectoryCollector = config.atifOutputPath ? new TrajectoryCollector() : null; @@ -316,6 +327,8 @@ export async function runHeadless(config: HeadlessRunnerConfig): Promise { let totalIterationCount = 0; type StreamTurnResult = Awaited>; type StreamUsage = StreamTurnResult["usage"] | undefined; + let usageProbeGeneration = 0; + const measureUsageIfAvailable = async ( messages: ModelMessage[] ): Promise => { @@ -323,6 +336,13 @@ export async function runHeadless(config: HeadlessRunnerConfig): Promise { return false; } + usageProbeGeneration += 1; + const thisGeneration = usageProbeGeneration; + const historyWithRevision = config.messageHistory as { + getRevision?: () => number; + }; + const startingRevision = historyWithRevision.getRevision?.(); + const measured = normalizeUsageMeasurement( await config.measureUsage(messages) ); @@ -330,6 +350,16 @@ export async function runHeadless(config: HeadlessRunnerConfig): Promise { return false; } + if (thisGeneration !== usageProbeGeneration) { + return false; + } + if ( + startingRevision !== undefined && + historyWithRevision.getRevision?.() !== startingRevision + ) { + return false; + } + config.messageHistory.updateActualUsage({ inputTokens: measured.inputTokens, outputTokens: measured.outputTokens, @@ -635,6 +665,7 @@ export async function runHeadless(config: HeadlessRunnerConfig): Promise { } let overflowRetried = false; let noOutputRetryCount = 0; + let hasEmittedTurnStart = false; const executeStream = async ( streamMessages: ModelMessage[], streamMaxOutputTokens: number | undefined @@ -699,6 +730,23 @@ export async function runHeadless(config: HeadlessRunnerConfig): Promise { ); }), ]); + + if (!hasEmittedTurnStart) { + hasEmittedTurnStart = true; + emitAndCollect({ + type: "turn-start", + phase, + timestamp: new Date().toISOString(), + }); + try { + await config.onStreamStart?.(phase); + } catch (hookError) { + console.error( + "[headless] onStreamStart threw; continuing stream:", + hookError + ); + } + } const nextStepId = stepId + 1; const processStreamResult = await processStream({ emitEvent: emitAndCollect, diff --git a/packages/headless/src/trajectory-collector.ts b/packages/headless/src/trajectory-collector.ts index 1d485b5..419b7d4 100644 --- a/packages/headless/src/trajectory-collector.ts +++ b/packages/headless/src/trajectory-collector.ts @@ -1,3 +1,36 @@ +/** + * ATIF-v1.4 trajectory collector. + * + * This module is the ONLY surface in the headless package that produces the + * persisted `trajectory.json` file, and that file MUST conform to Harbor's + * ATIF v1.4 specification: + * + * https://www.harborframework.com/docs/agents/trajectory-format + * + * ATIF v1.4 compliance rules (load-bearing — do not relax without bumping + * the Harbor spec version this package claims to target): + * + * • `schema_version` is the literal string "ATIF-v1.4". + * • Every {@link AtifStep.step_id} is a sequential integer starting at 1. + * • `steps[*].source` is limited to `"user" | "agent" | "system"`; no + * lifecycle event type is ever persisted as a step source. + * • Persisted lifecycle annotations go under `extra.approval_events`, + * `extra.compaction_events`, and `extra.interrupt_events`. New + * lifecycle types must NOT introduce new top-level fields; pick an + * existing `extra.*` bucket or drop the event from persistence. + * • Transient lifecycle events (`turn-start`, `error`) are JSONL-only + * and are dropped by the collector. + * • `final_metrics` must include every key defined in ATIF v1.4 even if + * the value is `null` (null-when-absent, not omitted). + * • Trajectories with zero steps are NOT persisted — `writeTo` returns + * `false` in that case to avoid producing a file that fails Harbor's + * own validator. + * • Metrics are pulled from the SDK `stream.usage`; never estimated. + * + * The JSONL event stream emitted to stdout by the runner is a DIFFERENT + * surface (internal protocol, not ATIF) — see `types.ts` for that contract. + */ + import { mkdirSync, writeFileSync } from "node:fs"; import { dirname } from "node:path"; import type { @@ -11,6 +44,13 @@ import type { ToolCallData, } from "./types"; +/** + * Shape of a single ATIF v1.4 `Step` entry inside `trajectory.json`. + * + * Any field added here MUST be defined by ATIF v1.4 (or be ignored by the + * spec as part of forward-compatible `extra`). Do not introduce ad-hoc + * step-level fields — put them under `extra` instead. + */ interface AtifStep { extra?: Record; is_copied_context?: boolean; @@ -26,6 +66,14 @@ interface AtifStep { tool_calls?: ToolCallData[]; } +/** + * Shape of the persisted ATIF v1.4 `Trajectory` JSON document. + * + * `schema_version` is the literal "ATIF-v1.4" and must match the Harbor + * spec version this package targets. Bump it only when the underlying + * Harbor spec bumps and this implementation has been audited against the + * new version's required/optional fields. + */ export interface TrajectoryJson { agent: { name: string; version: string; model_name: string }; extra?: { @@ -34,12 +82,13 @@ export interface TrajectoryJson { interrupt_events?: InterruptEvent[]; }; final_metrics: { - total_prompt_tokens: number | null; - total_completion_tokens: number | null; total_cached_tokens: number | null; + total_completion_tokens: number | null; + total_cost_usd: number | null; + total_prompt_tokens: number | null; total_steps: number; }; - schema_version: "ATIF-v1.6"; + schema_version: "ATIF-v1.4"; session_id: string; steps: AtifStep[]; } @@ -103,14 +152,16 @@ export class TrajectoryCollector { } private collectFinalMetrics(): { - total_prompt_tokens: number | null; - total_completion_tokens: number | null; total_cached_tokens: number | null; + total_completion_tokens: number | null; + total_cost_usd: number | null; + total_prompt_tokens: number | null; total_steps: number; } { const prompt: MetricAccumulator = { hasValue: false, total: 0 }; const completion: MetricAccumulator = { hasValue: false, total: 0 }; const cached: MetricAccumulator = { hasValue: false, total: 0 }; + const cost: MetricAccumulator = { hasValue: false, total: 0 }; for (const step of this.steps) { if (!("metrics" in step)) { @@ -125,12 +176,14 @@ export class TrajectoryCollector { addMetric(prompt, metrics.prompt_tokens); addMetric(completion, metrics.completion_tokens); addMetric(cached, metrics.cached_tokens); + addMetric(cost, metrics.cost_usd); } return { total_prompt_tokens: toMetricTotal(prompt), total_completion_tokens: toMetricTotal(completion), total_cached_tokens: toMetricTotal(cached), + total_cost_usd: toMetricTotal(cost), total_steps: this.steps.length, }; } @@ -150,7 +203,7 @@ export class TrajectoryCollector { finalize(): TrajectoryJson { const trajectory: TrajectoryJson = { - schema_version: "ATIF-v1.6", + schema_version: "ATIF-v1.4", session_id: this.metadata?.session_id ?? "unknown", agent: this.metadata?.agent ?? { ...DEFAULT_AGENT }, steps: this.steps.map((s) => this.toAtifStep(s)), @@ -178,10 +231,24 @@ export class TrajectoryCollector { return trajectory; } - writeTo(outputPath: string): void { + /** + * Persists the trajectory to disk in ATIF v1.4 format. + * + * Returns `true` when a file was written, `false` when the write was + * skipped because the trajectory would violate the ATIF v1.4 shape + * contract (currently: no steps emitted). Skipping is preferred over + * writing an invalid document — Harbor's own validator rejects + * `steps: []`, so a zero-step file is worse than no file for any + * downstream consumer. + */ + writeTo(outputPath: string): boolean { + if (this.steps.length === 0) { + return false; + } const trajectory = this.finalize(); mkdirSync(dirname(outputPath), { recursive: true }); writeFileSync(outputPath, JSON.stringify(trajectory, null, 2), "utf-8"); + return true; } reset(): void { diff --git a/packages/headless/src/types.ts b/packages/headless/src/types.ts index 04dbd15..6b351d1 100644 --- a/packages/headless/src/types.ts +++ b/packages/headless/src/types.ts @@ -1,10 +1,23 @@ import type { BeforeTurnResult } from "@ai-sdk-tool/harness"; /** - * ATIF-v1.6 native event types for trajectory logging. + * JSONL streaming protocol for headless trajectory logging. + * + * Note: this is NOT the ATIF schema itself. The persisted trajectory + * written to disk via {@link TrajectoryCollector} conforms to ATIF-v1.4 + * (https://www.harborframework.com/docs/agents/trajectory-format). The + * event union below is the internal stdout JSONL contract that the + * runner emits during execution; the collector consumes these events + * and produces the ATIF trajectory as output. + * + * Lifecycle annotations split into two categories: + * - `approval`, `compaction`, `interrupt` are persisted under + * `trajectory.extra.*` buckets (not as `steps[*].source` values). + * - `turn-start` and `error` stay JSONL-only and are dropped by the + * collector. * * All emitted step events (UserStepEvent, AgentStepEvent) conform to the ATIF specification. - * Metadata is emitted once at run start. Compaction and error events are lifecycle annotations. + * Metadata is emitted once at run start. */ // ============================================================================ @@ -37,12 +50,21 @@ export interface ObservationData { /** * Token usage metrics from the agent's model invocation. - * All fields come from the SDK's stream.usage and are never estimated. + * + * Fields follow ATIF-v1.4 (https://www.harborframework.com/docs/agents/trajectory-format). + * `completion_token_ids` was added in v1.3 (RL), `prompt_token_ids` in v1.4. + * + * Numeric fields come from the SDK's stream.usage and are never estimated. + * Token-id and logprob fields are only populated when the provider exposes + * them; consumers must treat them as opt-in. */ export interface StepMetrics { cached_tokens?: number; + completion_token_ids?: number[]; completion_tokens?: number; cost_usd?: number; + logprobs?: number[]; + prompt_token_ids?: number[]; prompt_tokens?: number; } @@ -128,6 +150,18 @@ export interface InterruptEvent { type: "interrupt"; } +/** + * Emitted immediately after the agent's LLM request is dispatched but before + * any stream chunk has arrived. Consumers can use this as a "prompt + * processing started" signal to render a loading indicator. Lifecycle + * annotation — carries no `step_id`. + */ +export interface TurnStartEvent { + phase: "new-turn" | "intermediate-step"; + timestamp: string; + type: "turn-start"; +} + export type { HistorySnapshot } from "@ai-sdk-tool/harness"; export interface HeadlessRunnerConfig { @@ -158,6 +192,9 @@ export interface HeadlessRunnerConfig { phase: "new-turn" | "intermediate-step" ) => BeforeTurnResult | Promise | undefined; onInterrupt?: (event: InterruptEvent) => Promise | void; + onStreamStart?: ( + phase: "new-turn" | "intermediate-step" + ) => void | Promise; onTodoReminder?: () => Promise<{ hasReminder: boolean; message: string | null; @@ -197,7 +234,20 @@ export interface MetadataEvent { // ============================================================================ /** - * The complete union of all trajectory event types. + * The complete union of all JSONL stream event types. + * + * This union defines the INTERNAL JSONL protocol — it is NOT the ATIF + * schema. Adding a new event type here is additive and non-breaking, but + * the corresponding switch in `collectTrajectoryEvent` must explicitly + * decide whether the new type is: + * + * • persisted into an ATIF v1.4 `extra.*` bucket (lifecycle annotation + * that Harbor accepts as forward-compatible extension), or + * • dropped (lives only on the JSONL stream; `trajectory.json` stays + * ATIF v1.4 compliant). + * + * It MUST NEVER be persisted as a top-level ATIF field or as a new + * `steps[*].source` value — that would violate the Harbor spec. */ export type TrajectoryEvent = | StepEvent @@ -205,4 +255,5 @@ export type TrajectoryEvent = | ErrorEvent | ApprovalEvent | InterruptEvent - | MetadataEvent; + | MetadataEvent + | TurnStartEvent; diff --git a/packages/tui/src/agent-tui.ts b/packages/tui/src/agent-tui.ts index 3182d12..97dd094 100644 --- a/packages/tui/src/agent-tui.ts +++ b/packages/tui/src/agent-tui.ts @@ -105,6 +105,10 @@ interface ContextPressureThresholds { const style = (prefix: string, text: string): string => `${prefix}${text}${ANSI_RESET}`; +const ignore = (): void => { + return; +}; + class StatusSpinner extends Text { private readonly frames = ["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]; private currentFrame = 0; @@ -652,6 +656,9 @@ export interface AgentTUIConfig { iteration: number; phase: "new-turn" | "intermediate-step"; }) => Promise | void; + onStreamStart?: ( + phase: "new-turn" | "intermediate-step" + ) => void | Promise; onTurnComplete?: ( messages: CheckpointMessage[], usage?: { @@ -752,6 +759,8 @@ export async function createAgentTUI(config: AgentTUIConfig): Promise { let inputResolver: null | ((value: string | null) => void) = null; let lastCtrlCPressAt = 0; let foregroundStatus: StatusSpinner | null = null; + let foregroundStatusMessage: string | null = null; + let foregroundStatusBeforeBlocking: string | null = null; const backgroundStatuses = new Map(); let blockingCompactionActive = false; let commandInputListenerActive = false; @@ -828,6 +837,7 @@ export async function createAgentTUI(config: AgentTUIConfig): Promise { }; const clearStatus = (): void => { + foregroundStatusMessage = null; if (!foregroundStatus) { tui.requestRender(); return; @@ -842,6 +852,7 @@ export async function createAgentTUI(config: AgentTUIConfig): Promise { foregroundStatus.stop(); } foregroundStatus = createStatusSpinner(message); + foregroundStatusMessage = message; renderForegroundStatus(); }; @@ -907,12 +918,27 @@ export async function createAgentTUI(config: AgentTUIConfig): Promise { "Compacting...", "running" ); + if ( + foregroundStatusBeforeBlocking === null && + foregroundStatusMessage !== null + ) { + foregroundStatusBeforeBlocking = foregroundStatusMessage; + } + if (foregroundStatusMessage !== null) { + showLoader("Compacting..."); + } userCompactionCallbacks?.onBlockingChange?.(event); return; } blockingCompactionActive = false; clearBackgroundStatus("blocking-compaction"); + if (foregroundStatusBeforeBlocking !== null) { + if (foregroundStatusMessage !== null) { + showLoader(foregroundStatusBeforeBlocking); + } + foregroundStatusBeforeBlocking = null; + } updateHeader(); tui.requestRender(); userCompactionCallbacks?.onBlockingChange?.(event); @@ -1012,6 +1038,8 @@ export async function createAgentTUI(config: AgentTUIConfig): Promise { } }; + let usageProbeGeneration = 0; + const measureUsageIfAvailable = async ( messages: ModelMessage[] ): Promise => { @@ -1019,6 +1047,10 @@ export async function createAgentTUI(config: AgentTUIConfig): Promise { return false; } + usageProbeGeneration += 1; + const thisGeneration = usageProbeGeneration; + const startingRevision = config.messageHistory.getRevision?.(); + const measured = normalizeUsageMeasurement( await config.measureUsage(messages) ); @@ -1026,6 +1058,16 @@ export async function createAgentTUI(config: AgentTUIConfig): Promise { return false; } + if (thisGeneration !== usageProbeGeneration) { + return false; + } + if ( + startingRevision !== undefined && + config.messageHistory.getRevision?.() !== startingRevision + ) { + return false; + } + config.messageHistory.updateActualUsage({ inputTokens: measured.inputTokens, outputTokens: measured.outputTokens, @@ -1043,6 +1085,10 @@ export async function createAgentTUI(config: AgentTUIConfig): Promise { await measureUsageIfAvailable(messages); }; + const runBackgroundStartupProbe = (): void => { + measureUsageIfAvailable([]).then(ignore, ignore); + }; + const cancelActiveStream = (): boolean => { if (!activeStreamController || activeStreamController.signal.aborted) { return false; @@ -1495,18 +1541,20 @@ export async function createAgentTUI(config: AgentTUIConfig): Promise { overflowRetried = false, noOutputRetryCount = 0 ): Promise<"completed" | "continue" | "interrupted"> => { - const preparedTurn = await prepareMessages(phase); - let messagesForLLM = preparedTurn.messages; - const turnOverrides = preparedTurn.turnOverrides; - - showLoader("Working..."); const streamAbortController = new AbortController(); activeStreamController = streamAbortController; streamInterruptRequested = false; try { + showLoader("Processing..."); + + const preparedTurn = await prepareMessages(phase); + let messagesForLLM = preparedTurn.messages; + const turnOverrides = preparedTurn.turnOverrides; + const budget = await resolveTurnBudget(phase, messagesForLLM); messagesForLLM = budget.messagesForLLM; + const stream = await config.agent.stream( mergeAgentStreamOptions({ abortSignal: streamAbortController.signal, @@ -1516,6 +1564,16 @@ export async function createAgentTUI(config: AgentTUIConfig): Promise { }) ); + showLoader("Working..."); + try { + await config.onStreamStart?.(phase); + } catch (hookError) { + console.error( + "[tui] onStreamStart threw; continuing stream:", + hookError + ); + } + const clearStreamingLoader = createStreamingLoaderClearer(); await renderAgentStream( @@ -1823,9 +1881,10 @@ export async function createAgentTUI(config: AgentTUIConfig): Promise { try { await config.onSetup?.(); - await measureUsageIfAvailable([]); updateHeader(); + runBackgroundStartupProbe(); + while (!shouldExit) { const input = await waitForInput(); if (input === null) { diff --git a/packages/tui/src/stream-handlers.ts b/packages/tui/src/stream-handlers.ts index 87462d0..e9e5ce1 100644 --- a/packages/tui/src/stream-handlers.ts +++ b/packages/tui/src/stream-handlers.ts @@ -417,10 +417,12 @@ export const isVisibleStreamPart = ( case "text-end": case "reasoning-end": case "start": - case "text-start": - case "reasoning-start": case "tool-input-end": return false; + case "text-start": + return true; + case "reasoning-start": + return flags.showReasoning; case "reasoning-delta": return flags.showReasoning; case "tool-result":