From bf868f0f0a779560ba2779384b8e159eedfc3546 Mon Sep 17 00:00:00 2001 From: henryqingmo Date: Thu, 28 May 2026 00:11:55 -0700 Subject: [PATCH 1/3] docs: spec eval runner operator UI --- ...26-05-28-eval-runner-operator-ui-design.md | 342 ++++++++++++++++++ 1 file changed, 342 insertions(+) create mode 100644 docs/superpowers/specs/2026-05-28-eval-runner-operator-ui-design.md diff --git a/docs/superpowers/specs/2026-05-28-eval-runner-operator-ui-design.md b/docs/superpowers/specs/2026-05-28-eval-runner-operator-ui-design.md new file mode 100644 index 0000000..d3278df --- /dev/null +++ b/docs/superpowers/specs/2026-05-28-eval-runner-operator-ui-design.md @@ -0,0 +1,342 @@ +# Eval Runner Operator UI — Design + +**Date:** 2026-05-28 + +## Overview + +Make the eval runner demo feel like a single ToolGate resilience operator surface. The current custom runner UI exposes a YAML textarea and waits for one blocking response. The new flow should let a demo operator choose a scenario, see the intentionally degraded stack state, run the eval with an elapsed timer, watch case results appear as they finish, and inspect the audit decisions that explain each verdict. + +This pass stays within `cmd/eval-runner`. It does not make the browser UI start or stop Docker services. Fault injection remains owned by the existing demo script/operator workflow. + +## Goals + +- Replace the YAML textarea as the main path with preset scenario cards. +- Keep custom YAML available for power users. +- Show elapsed time while a run is active, especially for the 15s approval timeout case. +- Render per-case audit decisions as colored trace badges. +- Stream case-level progress so earlier cases appear before later blocking cases finish. +- Add a small stack-health strip so expected outages are visible in the UI. + +## Non-Goals + +- No gateway policy or approval behavior changes. +- No Docker lifecycle control from the browser. +- No persistence of run history beyond the current page session. +- No migration to a frontend build system; `cmd/eval-runner/ui.html` remains a static embedded page. + +## Current Context + +The existing server in `cmd/eval-runner/serve.go` serves `ui.html`, exposes blocking eval endpoints, and accepts custom YAML through `POST /run-eval/custom`. `CaseRunner.Run` already returns audit `TraceRow` values from Postgres. `CaseResult` has a `Trace` field and `TraceRow` has JSON tags in the current workspace, but `Evaluate` does not populate the trace into the result yet. + +The existing UI has: + +- Agent URL input. +- Test Suite YAML textarea. +- Blocking `fetch('/run-eval/custom')`. +- Results table with case, pass/fail, and failures. + +## Architecture + +### Components + +| Unit | Responsibility | +|---|---| +| Scenario preset display data in `ui.html` | Holds scenario labels, descriptions, default URL values, display-only YAML/plan copy, and expected stack-state copy. Server scenario definitions remain authoritative. | +| Streaming eval handler in `serve.go` | Parses custom suite input and emits one event per case plus a final summary. | +| Streaming scenario handler in `serve.go` | Runs named built-in scenarios that are not expressible as plain eval YAML. | +| Shared case execution helper in `serve.go` | Runs one `EvalCase`, evaluates it, attaches trace, and returns `CaseResult`. | +| Retry storm executor in `serve.go` | Replays the gateway retry loop currently implemented in `scripts/demo-resilience.sh` and returns a normal `CaseResult`. | +| Stack health handler in `serve.go` | Returns lightweight service status values for the UI strip. | +| Result renderer in `ui.html` | Appends streaming case results and renders decision badges from `CaseResult.Trace`. | + +### Scenario Registry + +Preset scenario definitions are server-authoritative. The UI may display YAML or a scenario plan for clarity, but preset cards run by `scenario_id`, not by trusting client-provided scenario definitions. Displayed preset YAML is explanatory copy and must not be posted back as the source of truth for preset runs. + +| Scenario ID | Label | Executor | Target URL | Events/Total | +|---|---|---|---|---| +| `mcp-crash` | MCP Crash | YAML suite with `mcp-server-down` | eval agent URL | 1 case | +| `retry-storm` | Retry Storm | Built-in retry storm executor | gateway MCP URL | 1 case | +| `approval-timeout` | Approval Timeout | YAML suite with `approval-timeout-slack-down` | eval agent URL | 1 case | + +Default URLs: + +- Eval agent URL: request `agent_url`, else `AGENT_URL`. Serve mode already requires `AGENT_URL` at startup through `LoadConfig()`. +- Gateway MCP URL: request `gateway_mcp_url`, else `GATEWAY_MCP_URL`, else `http://localhost:18080/mcp`. + +There is no runnable `all-scenarios` preset in this pass. The full three-part demo still requires external stack transitions: MCP down for MCP Crash and Retry Storm, then MCP restored and Slack down for Approval Timeout. The browser UI surfaces and runs each scenario once the operator has prepared the matching stack state. + +Serve-mode startup keeps the existing `LoadConfig()` contract for this pass: `POSTGRES_DSN` and `AGENT_URL` are required environment variables even when the selected scenario later uses only the gateway MCP URL. The UI can override the eval agent URL per request after startup. + +### Data Flow + +``` +operator selects preset + -> UI displays preset plan/YAML and posts scenario_id plus relevant URLs + -> UI starts timer and opens stream request + -> serve.go runs either a YAML suite or a named built-in scenario + -> after each case: runner.Run -> Evaluate -> attach trace -> emit case event + -> UI appends/updates result row with trace badges + -> final summary event updates verdict badge and stops timer +``` + +## Backend Design + +### Case Result Trace + +`Evaluate(c, trace)` should preserve the trace in the returned `CaseResult` for both passing and failing cases: + +```go +result := CaseResult{Name: c.Name, Trace: trace} +``` + +For runner errors before a trace is available, the result should include the run failure and leave `Trace` empty. + +### Shared Execution Helper + +Add a helper around the repeated case execution logic: + +```go +func runEvalCase(ctx context.Context, runner caseExecutor, testCase EvalCase) CaseResult +``` + +Both the blocking handler and streaming handler should use this helper so JSON and streaming behavior stay consistent. + +### Streaming Endpoint + +Add: + +```text +POST /run-eval/custom/stream +``` + +Request body matches `/run-eval/custom`: + +```json +{ + "agent_url": "http://127.0.0.1:18086", + "suite": "cases:\n - ..." +} +``` + +Response should use server-sent events: + +```text +Content-Type: text/event-stream +Cache-Control: no-cache +``` + +Event shapes: + +```text +event: case_start +data: {"name":"mcp-server-down","index":0,"total":3} + +event: case_result +data: {"index":0,"total":3,"result":{"name":"mcp-server-down","passed":true,"trace":[...]}} + +event: summary +data: {"passed":true,"pass_count":3,"total_count":3,"cases":[...],"report":"..."} +``` + +If suite parsing or request validation fails before streaming starts, return the same HTTP error style as the blocking endpoint. If a case run fails, emit a `case_result` containing a failed `CaseResult`; do not terminate the whole stream unless the client context is canceled or writing fails. + +Use `http.Flusher` and flush after every event. If the server cannot flush, return HTTP 500 before running cases. + +### Scenario Streaming Endpoint + +Add: + +```text +POST /run-scenario/stream +``` + +Request: + +```json +{ + "scenario_id": "retry-storm", + "agent_url": "http://127.0.0.1:18086", + "gateway_mcp_url": "http://localhost:18080/mcp" +} +``` + +Behavior: + +- `mcp-crash` and `approval-timeout` use server-defined YAML suites and the same internal execution path as `/run-eval/custom/stream`. +- `retry-storm` runs a built-in gateway retry loop because the current `EvalCase` model cannot express repeated direct MCP calls within one turn. +- Unknown scenario IDs return HTTP 400. + +The frontend calls `/run-scenario/stream` for preset cards and `/run-eval/custom/stream` only for the custom editable YAML mode. The request contract for `/run-scenario/stream` does not accept client-provided suite YAML. + +Server-side URL validation: + +- YAML-backed scenarios require an absolute `agent_url` after request/env resolution. +- Retry Storm requires an absolute `gateway_mcp_url` after request/env/default resolution and must not require or use `agent_url`. +- Invalid URLs return HTTP 400 before streaming starts. + +### Retry Storm Executor + +The retry storm executor should mirror the proven shell flow in `scripts/demo-resilience.sh`: + +1. Initialize a gateway MCP session by posting `initialize` to the resolved gateway MCP URL. +2. Capture the `Mcp-Session-Id` response header from initialize. +3. Reuse a single `X-Mcp-Turn-Id`. +4. Send `Mcp-Session-Id` and `X-Mcp-Turn-Id` on every `tools/call`. +5. Call `tools/call` for `list_recent_charges` up to six times. +6. Stop once a response contains the budget limiter error. +7. Poll `audit_log` for the captured session trace until the terminal `budgetExceeded` row appears or the audit poll timeout expires, matching the existing async-audit handling in `CaseRunner.Run`. +8. Return `CaseResult{Name: "retry-storm-budget", Passed: true, Trace: trace}` if any trace row has decision `budgetExceeded`. + +If initialization fails, calls never reach `budgetExceeded`, or the audit query fails, return a failed `CaseResult` with a `run` or `policyOutcome` failure. This executor should not stop or start Docker services; it assumes the operator or demo script has already placed the stack in the intended state. + +Required external precondition for Retry Storm: the gateway capability cache must already be warmed while MCP is healthy, then `localstripe-mcp` must be stopped. This mirrors `scripts/demo-resilience.sh`; without warm cache, the gateway may fail earlier during capability/session setup instead of demonstrating the budget limiter. + +### Blocking Endpoint Compatibility + +`POST /run-eval/custom` should keep its current response shape. The only intentional response change is that each case may now include `trace` when audit rows were available. Existing consumers that ignore unknown fields remain compatible. + +### Stack Health Endpoint + +Add: + +```text +GET /stack-health +``` + +Response: + +```json +{ + "services": [ + {"name":"Gateway","status":"up","detail":"http://localhost:18080"}, + {"name":"MCP","status":"down","detail":"http://localhost:18421"}, + {"name":"Slack","status":"up","detail":"http://localhost:18090"}, + {"name":"Postgres","status":"up","detail":"configured DSN reachable"} + ] +} +``` + +Statuses are `up`, `down`, or `unknown`. The handler should use short timeouts and avoid expensive operations: + +- Gateway: HTTP probe to `http://localhost:18080/healthz` if configured or hardcoded for the demo compose port. +- MCP: HTTP probe to `http://localhost:18421/mcp` is not a safe generic health call because MCP initialize requires a POST; use TCP dial or mark `unknown` if a safe probe is not available. +- Slack: HTTP probe to `http://localhost:18090/healthz`. +- Postgres: use the existing pool `Ping` with a short context timeout. + +Use a 750ms timeout per probe. If a probe target is not configured and no demo default is available, report `unknown` instead of failing the endpoint. + +## Frontend Design + +### Layout + +The first screen is an operator dashboard: + +- Header with title and final verdict badge area. +- Scenario cards as the primary control row. +- Stack health strip near the run controls. +- URL controls for both eval agent URL and gateway MCP URL. The eval agent URL is required for YAML-backed presets and custom YAML; the gateway MCP URL is required for Retry Storm. +- Run button and elapsed timer. +- Results panel that can show `queued`, `running`, `PASS`, and `FAIL`. +- Collapsible or secondary scenario plan/editor below the primary controls. + +The scenario plan/editor is read-only for preset modes. A `Customize YAML` action copies the displayed YAML-backed preset into editable `Custom YAML` mode. Non-YAML presets such as Retry Storm show a read-only plan and do not offer direct plan editing. + +### Presets + +Initial presets: + +- `MCP Crash`: runs the `mcp-server-down` suite with agent URL `http://127.0.0.1:18086`. +- `Retry Storm`: runs the built-in `retry-storm` executor against `http://localhost:18080/mcp`. +- `Approval Timeout`: runs the `approval-timeout-slack-down` suite with agent URL `http://127.0.0.1:18086`. + +For YAML-backed presets, the scenario plan/editor shows display-only YAML matching the server-defined scenario. For the retry storm preset, it shows a read-only scenario plan because the run is server-authoritative and uses a non-YAML executor. The operator can enter `Custom YAML` mode only through `Customize YAML` or by choosing the custom mode explicitly; custom mode runs `/run-eval/custom/stream` instead of `/run-scenario/stream`. + +Client-side validation: + +- YAML-backed presets and custom YAML require an absolute eval agent URL. +- Retry Storm requires an absolute gateway MCP URL and does not require an eval agent URL. +- The UI should not show an `All Scenarios` run button or imply stack transitions happen automatically. + +### Elapsed Timer + +When a run starts: + +- Clear previous errors. +- Set elapsed to `00:00`. +- Tick once per second until summary, stream error, or request cancellation. +- Keep the run button disabled while active. + +### Streaming Results + +The browser should use `fetch()` and read the response body stream. Native `EventSource` is not suitable for a JSON `POST` body. The UI can parse SSE frames from the fetch stream. + +Behavior: + +- On `case_start`, mark the case row as running. +- On `case_result`, render pass/fail, failures, and trace badges. +- On `summary`, update final verdict and stop the timer. +- On stream parse or network failure, show the error box and stop the timer. + +### Decision Trace Badges + +Badge color mapping: + +| Decision | Color | +|---|---| +| `allow` | green | +| `approvalRequired` | yellow | +| `expired` | gray | +| `deny` | red | +| `upstream_error` | red | +| `budgetExceeded` | orange | +| other/empty | slate | + +Badge text should be compact: + +```text +list_recent_charges -> allow +create_refund -> approvalRequired +create_refund -> expired +``` + +Tool names and decisions must be escaped before insertion into the DOM. + +## Error Handling + +- Invalid custom suite: show HTTP 400 text in the existing error box. +- Missing agent URL or YAML: validate client-side before request. +- Streaming unsupported by server: show a clear HTTP error. +- Mid-stream case failure: render that case as failed and continue to later cases if the server can continue. +- Mid-stream transport failure: show the error box and leave already-rendered case results visible. +- Empty trace: render a muted `no audit trace` label instead of an empty panel. + +## Testing + +Backend unit tests: + +- `Evaluate` includes trace on pass and fail. +- Blocking JSON response includes `trace` fields when present. +- Streaming handler emits `case_start`, `case_result`, and `summary` in order. +- Streaming handler emits a failed `case_result` for runner errors and continues to the next case. +- Scenario streaming handler runs the retry storm executor for `scenario_id=retry-storm`. +- Retry storm executor passes only when the trace contains `budgetExceeded`. +- Retry storm executor polls for delayed async audit visibility before failing. +- Stack health handler returns a JSON response even when probes fail. + +Manual UI verification: + +- Load `POSTGRES_DSN=postgres://gateway:gateway@127.0.0.1:15432/gateway?sslmode=disable AGENT_URL=http://127.0.0.1:18086 go run ./cmd/eval-runner --serve evalsuite/resilience.yaml`. +- Select a preset and confirm YAML/agent URL populate. +- Run a suite and confirm timer ticks. +- Confirm first case result appears before an approval-timeout case completes. +- Prepare Retry Storm by warming gateway capability cache while MCP is healthy, stopping MCP, then running the preset; confirm delayed audit rows are polled until `budgetExceeded`. +- Confirm trace badges render with the expected colors and no overlapping text at desktop width. +- Interrupt a streaming run by stopping the target service or closing the request and confirm already-rendered rows remain visible. + +## Implementation Notes + +- Keep all JavaScript in `ui.html` for this pass to match the current embedded static UI. +- Prefer DOM creation over large `innerHTML` strings where user-controlled values are rendered. +- Keep `/run-eval/custom` stable for existing scripts and tests. +- Do not commit `.superpowers/brainstorm/` visual companion artifacts. From 0fa2fecb23c4230d9c3336256bbe737dac660bdf Mon Sep 17 00:00:00 2001 From: henryqingmo Date: Thu, 28 May 2026 01:20:44 -0700 Subject: [PATCH 2/3] fix: gateway health probe, capability cache warmup, and demo README - Stack health was probing /healthz (404) instead of /mcp (200) on the gateway, causing it to always show as down - Add warmGatewayCapCache() called on eval-server startup to prime the gateway's tools/list cache while all services are healthy; fixes MCP Crash scenario failing when cache is cold - Write README with per-scenario setup instructions, warmup commands, and capability cache explanation Co-Authored-By: Claude Sonnet 4.6 --- README.md | 167 ++++++++++++ cmd/eval-runner/scenarios.go | 444 ++++++++++++++++++++++++++++++++ cmd/eval-runner/serve.go | 52 ++-- cmd/eval-runner/stack_health.go | 82 ++++++ 4 files changed, 717 insertions(+), 28 deletions(-) create mode 100644 README.md create mode 100644 cmd/eval-runner/scenarios.go create mode 100644 cmd/eval-runner/stack_health.go diff --git a/README.md b/README.md new file mode 100644 index 0000000..f49a57f --- /dev/null +++ b/README.md @@ -0,0 +1,167 @@ +# ToolGate + +ToolGate is an MCP gateway that enforces policy on every tool call an AI agent makes — logging decisions, requiring human approval for sensitive operations, and surfacing clean errors when upstream services fail. + +## Prerequisites + +- Docker + Docker Compose +- Go 1.22+ +- `ANTHROPIC_API_KEY` set in your environment (or in `.env`) + +## Quick start — resilience demo UI + +The demo UI lets you run three fault-injection scenarios against a live stack and watch the audit trail update in real time. + +### 1. Build the gateway binary + +The compose stack mounts a pre-built binary instead of compiling inside Docker: + +```bash +make build-compose-bins +``` + +### 2. Start the full stack + +```bash +source .env # loads ANTHROPIC_API_KEY and optional overrides +docker compose up -d --wait +``` + +Services started: + +| Service | Host port | Purpose | +|---|---|---| +| `gateway` | 18080 | ToolGate MCP gateway | +| `localstripe` | 18420 | Fake Stripe API | +| `localstripe-mcp` | 18421 | MCP server wrapping localstripe | +| `eval-trigger` | 18086 | Python agent that the eval runner drives | +| `mock-slack` | 18090 | Fake Slack (receives approval requests) | +| `postgres` | 15432 | Audit log store | + +### 3. Start the eval runner UI + +```bash +POSTGRES_DSN="postgres://gateway:gateway@127.0.0.1:15432/gateway?sslmode=disable" \ +AGENT_URL="http://127.0.0.1:18086" \ +go run ./cmd/eval-runner --serve evalsuite/resilience.yaml +``` + +Open **http://localhost:8099** in your browser. + +--- + +## Running the three scenarios + +Each scenario requires a specific stack state. The **Stack Health** panel in the UI shows the current state of each service — use **Refresh Health** before running. + +### Scenario 1 — MCP Crash + +**What it tests:** Gateway surfaces a clean `upstream_error` when the upstream MCP server is unavailable. + +**Required state:** Gateway up, MCP down, Slack any, Postgres up. + +```bash +# Warm the gateway capability cache while MCP is healthy +SESSION=$(curl -s -D - -X POST http://localhost:18080/mcp \ + -H "Content-Type: application/json" \ + -d '{"jsonrpc":"2.0","id":0,"method":"initialize","params":{"protocolVersion":"2025-03-26","capabilities":{},"clientInfo":{"name":"warmup","version":"1.0"}}}' \ + | grep -i "^Mcp-Session-Id:" | awk '{print $2}' | tr -d '\r\n') +curl -s -X POST http://localhost:18080/mcp \ + -H "Content-Type: application/json" \ + -H "Mcp-Session-Id: $SESSION" \ + -d '{"jsonrpc":"2.0","id":1,"method":"tools/list","params":{}}' > /dev/null + +# Inject the fault +docker compose stop localstripe-mcp +``` + +Click **MCP Crash → Run Scenario**. + +**Expected result:** `list_recent_charges → allow → upstream_error` — the gateway served the tool list from its capability cache and recorded the upstream failure. + +### Scenario 2 — Retry Storm + +**What it tests:** Budget limiter stops an agent from hammering a downed service. + +**Required state:** Gateway up, MCP down (carry over from Scenario 1). + +No additional setup needed. Click **Retry Storm → Run Scenario**. + +**Expected result:** Five `allow` decisions followed by `budgetExceeded`. + +### Scenario 3 — Approval Timeout + +**What it tests:** An `approvalRequired` decision expires gracefully when Slack is unreachable. + +**Required state:** Gateway up, MCP up, Slack down, Postgres up. + +```bash +# Restore MCP +docker compose start localstripe-mcp + +# Wait for it to become healthy, then seed demo charges for alice@example.com +until docker inspect toolgate-localstripe-mcp-1 \ + --format '{{.State.Health.Status}}' 2>/dev/null | grep -q healthy; do sleep 2; done + +docker exec toolgate-eval-trigger-1 python3 -c " +import asyncio, sys +sys.path.insert(0, '/app') +from demo_webapp.stripe_client import StripeClient +from demo_webapp.seed import seed_demo_customer + +async def main(): + client = StripeClient('http://localstripe:8420', 'sk_test_12345') + cust = await client.find_customer_by_email('alice@example.com') + if cust is None: + cust = await client.create_customer('alice@example.com', 'Alice') + await seed_demo_customer(client, cust['id']) + await client.aclose() + +asyncio.run(main()) +" + +# Re-warm gateway after MCP restart +SESSION=$(curl -s -D - -X POST http://localhost:18080/mcp \ + -H "Content-Type: application/json" \ + -d '{"jsonrpc":"2.0","id":0,"method":"initialize","params":{"protocolVersion":"2025-03-26","capabilities":{},"clientInfo":{"name":"warmup","version":"1.0"}}}' \ + | grep -i "^Mcp-Session-Id:" | awk '{print $2}' | tr -d '\r\n') +curl -s -X POST http://localhost:18080/mcp \ + -H "Content-Type: application/json" \ + -H "Mcp-Session-Id: $SESSION" \ + -d '{"jsonrpc":"2.0","id":1,"method":"tools/list","params":{}}' > /dev/null + +# Stop Slack +docker compose stop mock-slack +``` + +Click **Approval Timeout → Run Scenario**. The case waits ~15 s for the approval TTL to expire. + +**Expected result:** `list_recent_charges → allow`, `create_refund → approvalRequired → expired`. + +--- + +## Scripted end-to-end run + +To run all three scenarios headlessly in one shot: + +```bash +make demo-resilience +``` + +This script manages the full Docker lifecycle, runs each scenario in sequence, and tears down the stack on exit. + +--- + +## Gateway capability cache + +The gateway caches the last successful `initialize` and `tools/list` responses from the upstream MCP server. When the upstream is unavailable, it serves tool metadata from this cache so agents can still discover tools — requests then fail with `upstream_error` at the call site rather than at tool-list time. + +**Important:** the cache is populated the first time a successful `tools/list` reaches the gateway. Always warm it (see Scenario 1 setup above) before stopping the MCP server. + +--- + +## Teardown + +```bash +docker compose down -v # stops all services and removes volumes +``` diff --git a/cmd/eval-runner/scenarios.go b/cmd/eval-runner/scenarios.go new file mode 100644 index 0000000..063aac2 --- /dev/null +++ b/cmd/eval-runner/scenarios.go @@ -0,0 +1,444 @@ +package main + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "log/slog" + "net/http" + "net/url" + "os" + "strings" + "time" + + "github.com/jackc/pgx/v5/pgxpool" +) + +const defaultGatewayMCPURL = "http://localhost:18080/mcp" + +const mcpCrashSuiteYAML = `cases: + - name: mcp-server-down + input: "Show me my recent charges." + mustInclude: + - list_recent_charges + policyOutcome: upstream_error +` + +const approvalTimeoutSuiteYAML = `cases: + - name: approval-timeout-slack-down + input: > + List recent charges for alice@example.com, then issue a full refund on + the first non-refunded charge with reason requested_by_customer. + Do not ask for confirmation — proceed directly. + mustInclude: + - list_recent_charges + - create_refund + policyOutcome: expired +` + +type scenarioDeps struct { + pool *pgxpool.Pool + newRunner runnerFactory + newRetryStorm func(gatewayURL string) scenarioCaseExecutor + defaultAgentURL string + defaultGatewayMCPURL string +} + +type scenarioCaseExecutor interface { + Run(ctx context.Context) CaseResult +} + +type scenarioCaseExecutorFunc func(context.Context) CaseResult + +func (f scenarioCaseExecutorFunc) Run(ctx context.Context) CaseResult { + return f(ctx) +} + +type retryStormExecutor struct { + gatewayMCPURL string + pool *pgxpool.Pool + client *http.Client + initialize func(context.Context) (string, error) + callGateway func(context.Context, string, string) (string, error) + queryTrace func(context.Context, string) ([]TraceRow, error) + newTurnID func() string + pollInterval time.Duration + pollTimeout time.Duration +} + +func makeScenarioStreamHandler(deps scenarioDeps) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + var body struct { + ScenarioID string `json:"scenario_id"` + AgentURL string `json:"agent_url"` + GatewayMCPURL string `json:"gateway_mcp_url"` + } + if err := json.NewDecoder(r.Body).Decode(&body); err != nil { + http.Error(w, fmt.Sprintf("invalid request: %v", err), http.StatusBadRequest) + return + } + if _, ok := w.(http.Flusher); !ok { + http.Error(w, "streaming unsupported", http.StatusInternalServerError) + return + } + + switch body.ScenarioID { + case "mcp-crash", "approval-timeout": + agentURL, err := resolveAbsoluteURL(body.AgentURL, deps.defaultAgentURL) + if err != nil { + http.Error(w, "missing or invalid agent_url", http.StatusBadRequest) + return + } + if deps.newRunner == nil { + http.Error(w, "runner unavailable", http.StatusInternalServerError) + return + } + suiteYAML := mcpCrashSuiteYAML + if body.ScenarioID == "approval-timeout" { + suiteYAML = approvalTimeoutSuiteYAML + } + suite, err := LoadSuiteFromReader(strings.NewReader(suiteYAML)) + if err != nil { + http.Error(w, fmt.Sprintf("invalid scenario suite: %v", err), http.StatusInternalServerError) + return + } + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + streamEvalSuite(r.Context(), w, deps.newRunner(agentURL), suite.Cases) + case "retry-storm": + gatewayURL, err := resolveGatewayMCPURL(body.GatewayMCPURL, deps.defaultGatewayMCPURL) + if err != nil { + http.Error(w, "missing or invalid gateway_mcp_url", http.StatusBadRequest) + return + } + if deps.newRetryStorm == nil { + http.Error(w, "retry storm executor unavailable", http.StatusInternalServerError) + return + } + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + + if err := writeSSE(w, "case_start", caseStartEvent{Name: "retry-storm-budget", Index: 0, Total: 1}); err != nil { + return + } + result := deps.newRetryStorm(gatewayURL).Run(r.Context()) + if err := writeSSE(w, "case_result", caseResultEvent{Index: 0, Total: 1, Result: result}); err != nil { + return + } + _ = writeSSE(w, "summary", summarizeResults([]CaseResult{result})) + default: + http.Error(w, "unknown scenario_id", http.StatusBadRequest) + } + } +} + +// warmGatewayCapCache calls initialize + tools/list on the gateway so the +// capability cache is populated before localstripe-mcp is stopped for the +// MCP Crash scenario. Failures are logged and ignored — the cache may already +// be warm from a prior run. +func warmGatewayCapCache(gatewayMCPURL string) { + client := &http.Client{Timeout: 5 * time.Second} + + initBody, _ := json.Marshal(map[string]any{ + "jsonrpc": "2.0", "id": 0, "method": "initialize", + "params": map[string]any{ + "protocolVersion": "2025-03-26", + "capabilities": map[string]any{}, + "clientInfo": map[string]any{"name": "eval-warmup", "version": "1.0"}, + }, + }) + req, err := http.NewRequest(http.MethodPost, gatewayMCPURL, bytes.NewReader(initBody)) + if err != nil { + slog.Warn("gateway warmup: build initialize request", "err", err) + return + } + req.Header.Set("Content-Type", "application/json") + resp, err := client.Do(req) + if err != nil { + slog.Warn("gateway warmup: initialize failed", "err", err) + return + } + sessionID := resp.Header.Get("Mcp-Session-Id") + _ = resp.Body.Close() + if resp.StatusCode != http.StatusOK || sessionID == "" { + slog.Warn("gateway warmup: initialize returned unexpected status", "status", resp.StatusCode) + return + } + + listBody, _ := json.Marshal(map[string]any{ + "jsonrpc": "2.0", "id": 1, "method": "tools/list", "params": map[string]any{}, + }) + req2, err := http.NewRequest(http.MethodPost, gatewayMCPURL, bytes.NewReader(listBody)) + if err != nil { + slog.Warn("gateway warmup: build tools/list request", "err", err) + return + } + req2.Header.Set("Content-Type", "application/json") + req2.Header.Set("Mcp-Session-Id", sessionID) + resp2, err := client.Do(req2) + if err != nil { + slog.Warn("gateway warmup: tools/list failed", "err", err) + return + } + _ = resp2.Body.Close() + slog.Info("gateway warmup: capability cache primed", "gateway", gatewayMCPURL) +} + +func resolveAbsoluteURL(requestValue, fallback string) (string, error) { + candidate := strings.TrimSpace(requestValue) + if candidate == "" { + candidate = strings.TrimSpace(fallback) + } + if candidate == "" { + return "", errors.New("missing url") + } + parsed, err := url.Parse(candidate) + if err != nil || parsed.Scheme == "" || parsed.Host == "" { + return "", errors.New("invalid url") + } + if parsed.Scheme != "http" && parsed.Scheme != "https" { + return "", errors.New("invalid url") + } + return candidate, nil +} + +func resolveGatewayMCPURL(requestValue, fallback string) (string, error) { + if strings.TrimSpace(requestValue) != "" { + return resolveAbsoluteURL(requestValue, "") + } + if resolved, err := resolveAbsoluteURL("", fallback); err == nil { + return resolved, nil + } + if resolved, err := resolveAbsoluteURL("", os.Getenv("GATEWAY_MCP_URL")); err == nil { + return resolved, nil + } + return resolveAbsoluteURL("", defaultGatewayMCPURL) +} + +func newRetryStormExecutor(gatewayURL string, pool *pgxpool.Pool) scenarioCaseExecutor { + exec := &retryStormExecutor{ + gatewayMCPURL: gatewayURL, + pool: pool, + client: &http.Client{ + Timeout: caseRunnerHTTPTimeout, + }, + pollInterval: auditPollInterval, + pollTimeout: auditPollTimeout, + newTurnID: func() string { + return fmt.Sprintf("retry-storm-%d", time.Now().UnixNano()) + }, + } + exec.initialize = exec.defaultInitialize + exec.callGateway = exec.defaultCallGateway + exec.queryTrace = exec.defaultQueryTrace + return exec +} + +func (e *retryStormExecutor) Run(ctx context.Context) CaseResult { + result := CaseResult{Name: "retry-storm-budget"} + if e.newTurnID == nil { + e.newTurnID = func() string { + return fmt.Sprintf("retry-storm-%d", time.Now().UnixNano()) + } + } + if e.pollInterval == 0 { + e.pollInterval = auditPollInterval + } + if e.pollTimeout == 0 { + e.pollTimeout = auditPollTimeout + } + sessionID, err := e.initialize(ctx) + if err != nil { + result.Failures = []CheckFailure{{ + Check: "run", + Expected: "retry storm completes successfully", + Observed: err.Error(), + }} + return result + } + if strings.TrimSpace(sessionID) == "" { + result.Failures = []CheckFailure{{ + Check: "run", + Expected: "Mcp-Session-Id response header", + Observed: "(empty session id)", + }} + return result + } + + turnID := e.newTurnID() + budgetResponse := false + for i := 0; i < 6; i++ { + respBody, err := e.callGateway(ctx, sessionID, turnID) + if err != nil { + result.Failures = []CheckFailure{{ + Check: "run", + Expected: "gateway tool call succeeds", + Observed: err.Error(), + }} + return result + } + if strings.Contains(strings.ToLower(respBody), "budget") { + budgetResponse = true + break + } + } + + if !budgetResponse { + result.Failures = []CheckFailure{{ + Check: "policyOutcome", + Expected: "budgetExceeded", + Observed: "no budget limiter response after 6 calls", + }} + return result + } + + deadline := time.Now().Add(e.pollTimeout) + var trace []TraceRow + for { + trace, err = e.queryTrace(ctx, sessionID) + if err != nil { + result.Failures = []CheckFailure{{ + Check: "run", + Expected: "audit query succeeds", + Observed: err.Error(), + }} + return result + } + result.Trace = trace + if hasDecision(trace, "budgetExceeded") { + result.Passed = true + return result + } + if time.Now().After(deadline) { + result.Failures = []CheckFailure{{ + Check: "policyOutcome", + Expected: "budgetExceeded", + Observed: lastDecision(trace), + }} + return result + } + + select { + case <-ctx.Done(): + result.Failures = []CheckFailure{{ + Check: "run", + Expected: "context remains active", + Observed: ctx.Err().Error(), + }} + return result + case <-time.After(e.pollInterval): + } + } +} + +func hasDecision(trace []TraceRow, decision string) bool { + for _, row := range trace { + if row.Decision == decision { + return true + } + } + return false +} + +func lastDecision(trace []TraceRow) string { + if len(trace) == 0 { + return "(empty trace)" + } + return trace[len(trace)-1].Decision +} + +func (e *retryStormExecutor) defaultInitialize(ctx context.Context) (string, error) { + payload := map[string]any{ + "jsonrpc": "2.0", + "id": 0, + "method": "initialize", + "params": map[string]any{ + "protocolVersion": "2025-03-26", + "capabilities": map[string]any{}, + "clientInfo": map[string]any{ + "name": "retry-storm-ui", + "version": "1.0", + }, + }, + } + body, err := json.Marshal(payload) + if err != nil { + return "", fmt.Errorf("marshal initialize request: %w", err) + } + req, err := http.NewRequestWithContext(ctx, http.MethodPost, e.gatewayMCPURL, bytes.NewReader(body)) + if err != nil { + return "", fmt.Errorf("build initialize request: %w", err) + } + req.Header.Set("Content-Type", "application/json") + resp, err := e.client.Do(req) + if err != nil { + return "", err + } + defer func() { _ = resp.Body.Close() }() + if resp.StatusCode != http.StatusOK { + return "", fmt.Errorf("initialize returned HTTP %d: %s", resp.StatusCode, firstBytes(resp.Body, 256)) + } + return resp.Header.Get("Mcp-Session-Id"), nil +} + +func (e *retryStormExecutor) defaultCallGateway(ctx context.Context, sessionID, turnID string) (string, error) { + payload := map[string]any{ + "jsonrpc": "2.0", + "id": 1, + "method": "tools/call", + "params": map[string]any{ + "name": "list_recent_charges", + "arguments": map[string]any{}, + }, + } + body, err := json.Marshal(payload) + if err != nil { + return "", fmt.Errorf("marshal tools/call request: %w", err) + } + req, err := http.NewRequestWithContext(ctx, http.MethodPost, e.gatewayMCPURL, bytes.NewReader(body)) + if err != nil { + return "", fmt.Errorf("build tools/call request: %w", err) + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Mcp-Session-Id", sessionID) + req.Header.Set("X-Mcp-Turn-Id", turnID) + resp, err := e.client.Do(req) + if err != nil { + return "", err + } + defer func() { _ = resp.Body.Close() }() + return firstBytes(resp.Body, 4096), nil +} + +func (e *retryStormExecutor) defaultQueryTrace(ctx context.Context, sessionID string) ([]TraceRow, error) { + if e.pool == nil { + return nil, errors.New("postgres pool is nil") + } + rows, err := e.pool.Query( + ctx, + `SELECT tool_name, decision, arguments + FROM audit_log + WHERE session_id = $1 + ORDER BY decided_at ASC`, + sessionID, + ) + if err != nil { + return nil, err + } + defer rows.Close() + + var trace []TraceRow + for rows.Next() { + var row TraceRow + if err := rows.Scan(&row.ToolName, &row.Decision, &row.Arguments); err != nil { + return nil, err + } + trace = append(trace, row) + } + if err := rows.Err(); err != nil { + return nil, err + } + return trace, nil +} diff --git a/cmd/eval-runner/serve.go b/cmd/eval-runner/serve.go index 06a004a..ba8a0aa 100644 --- a/cmd/eval-runner/serve.go +++ b/cmd/eval-runner/serve.go @@ -85,10 +85,31 @@ func serve(suitePath string) error { }) http.HandleFunc("POST /run-eval/custom", makeCustomEvalHandler(pool)) + http.HandleFunc("POST /run-eval/custom/stream", makeCustomEvalStreamHandler(func(agentURL string) caseExecutor { + return NewCaseRunner(agentURL, pool) + })) + http.HandleFunc("POST /run-scenario/stream", makeScenarioStreamHandler(scenarioDeps{ + pool: pool, + defaultAgentURL: cfg.AgentURL, + defaultGatewayMCPURL: os.Getenv("GATEWAY_MCP_URL"), + newRunner: func(agentURL string) caseExecutor { + return NewCaseRunner(agentURL, pool) + }, + newRetryStorm: func(gatewayURL string) scenarioCaseExecutor { + return newRetryStormExecutor(gatewayURL, pool) + }, + })) http.HandleFunc("GET /healthz", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) }) + http.HandleFunc("GET /stack-health", makeStackHealthHandler(stackHealthDeps{pool: pool})) + + gatewayMCPURL := os.Getenv("GATEWAY_MCP_URL") + if gatewayMCPURL == "" { + gatewayMCPURL = defaultGatewayMCPURL + } + go warmGatewayCapCache(gatewayMCPURL) slog.Info("eval server listening", "port", port) return http.ListenAndServe(":"+port, nil) @@ -128,43 +149,18 @@ func makeEvalHandler(runner caseExecutor, suite *EvalSuite, _ *pgxpool.Pool) htt return func(w http.ResponseWriter, r *http.Request) { results := make([]CaseResult, 0, len(suite.Cases)) for _, testCase := range suite.Cases { - trace, err := runner.Run(r.Context(), testCase) - result := CaseResult{Name: testCase.Name} - if err != nil { - result.Failures = []CheckFailure{{ - Check: "run", - Expected: "case completes successfully", - Observed: err.Error(), - }} - } else { - result = Evaluate(testCase, trace) - } - results = append(results, result) - } - - passCount := 0 - for _, r := range results { - if r.Passed { - passCount++ - } + results = append(results, runEvalCase(r.Context(), runner, testCase)) } - report := GenerateReport(results) + resp := summarizeResults(results) if r.Header.Get("Accept") == "application/json" { - resp := evalResponse{ - Passed: passCount == len(results), - PassCount: passCount, - TotalCount: len(results), - Cases: results, - Report: report, - } w.Header().Set("Content-Type", "application/json") _ = json.NewEncoder(w).Encode(resp) return } w.Header().Set("Content-Type", "text/plain") - _, _ = fmt.Fprint(w, report) + _, _ = fmt.Fprint(w, resp.Report) } } diff --git a/cmd/eval-runner/stack_health.go b/cmd/eval-runner/stack_health.go new file mode 100644 index 0000000..82b3559 --- /dev/null +++ b/cmd/eval-runner/stack_health.go @@ -0,0 +1,82 @@ +package main + +import ( + "context" + "encoding/json" + "net" + "net/http" + "time" + + "github.com/jackc/pgx/v5/pgxpool" +) + +const stackHealthProbeTimeout = 750 * time.Millisecond + +type stackHealthResponse struct { + Services []stackHealthService `json:"services"` +} + +type stackHealthService struct { + Name string `json:"name"` + Status string `json:"status"` + Detail string `json:"detail,omitempty"` +} + +type stackHealthDeps struct { + pool *pgxpool.Pool + httpClient *http.Client +} + +func makeStackHealthHandler(deps stackHealthDeps) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(stackHealthResponse{ + Services: []stackHealthService{ + probeHTTPService(deps.httpClient, "Gateway", "http://localhost:18080/mcp"), + probeTCPService("MCP", "127.0.0.1:18421"), + probeHTTPService(deps.httpClient, "Slack", "http://localhost:18090/healthz"), + probePostgresService(deps.pool), + }, + }) + } +} + +func probeHTTPService(client *http.Client, name, target string) stackHealthService { + if client == nil { + client = &http.Client{Timeout: stackHealthProbeTimeout} + } + req, err := http.NewRequest(http.MethodGet, target, nil) + if err != nil { + return stackHealthService{Name: name, Status: "unknown", Detail: err.Error()} + } + resp, err := client.Do(req) + if err != nil { + return stackHealthService{Name: name, Status: "down", Detail: target} + } + defer func() { _ = resp.Body.Close() }() + if resp.StatusCode >= 200 && resp.StatusCode < 300 { + return stackHealthService{Name: name, Status: "up", Detail: target} + } + return stackHealthService{Name: name, Status: "down", Detail: target} +} + +func probeTCPService(name, target string) stackHealthService { + conn, err := net.DialTimeout("tcp", target, stackHealthProbeTimeout) + if err != nil { + return stackHealthService{Name: name, Status: "down", Detail: target} + } + _ = conn.Close() + return stackHealthService{Name: name, Status: "up", Detail: target} +} + +func probePostgresService(pool *pgxpool.Pool) stackHealthService { + if pool == nil { + return stackHealthService{Name: "Postgres", Status: "unknown", Detail: "pool unavailable"} + } + ctx, cancel := context.WithTimeout(context.Background(), stackHealthProbeTimeout) + defer cancel() + if err := pool.Ping(ctx); err != nil { + return stackHealthService{Name: "Postgres", Status: "down", Detail: "configured DSN unreachable"} + } + return stackHealthService{Name: "Postgres", Status: "up", Detail: "configured DSN reachable"} +} From b17d015a3f1ac6b825da3b4e7ae3ccb8b7fc4489 Mon Sep 17 00:00:00 2001 From: henryqingmo Date: Thu, 28 May 2026 01:21:29 -0700 Subject: [PATCH 3/3] feat: eval runner operator UI with streaming scenario runner Co-Authored-By: Claude Sonnet 4.6 --- cmd/eval-runner/evaluator.go | 2 +- cmd/eval-runner/evaluator_test.go | 41 ++ cmd/eval-runner/execution.go | 36 ++ cmd/eval-runner/scenarios_test.go | 296 ++++++++++++ cmd/eval-runner/serve_test.go | 133 ++++++ cmd/eval-runner/stack_health_test.go | 26 + cmd/eval-runner/stream.go | 93 ++++ cmd/eval-runner/types.go | 9 +- cmd/eval-runner/ui.html | 690 ++++++++++++++++++++++----- 9 files changed, 1211 insertions(+), 115 deletions(-) create mode 100644 cmd/eval-runner/execution.go create mode 100644 cmd/eval-runner/scenarios_test.go create mode 100644 cmd/eval-runner/serve_test.go create mode 100644 cmd/eval-runner/stack_health_test.go create mode 100644 cmd/eval-runner/stream.go diff --git a/cmd/eval-runner/evaluator.go b/cmd/eval-runner/evaluator.go index 9702202..77cd914 100644 --- a/cmd/eval-runner/evaluator.go +++ b/cmd/eval-runner/evaluator.go @@ -3,7 +3,7 @@ package main import "strings" func Evaluate(c EvalCase, trace []TraceRow) CaseResult { - result := CaseResult{Name: c.Name} + result := CaseResult{Name: c.Name, Trace: trace} failures := make([]CheckFailure, 0) failures = append(failures, evaluateMustInclude(c.MustInclude, trace)...) diff --git a/cmd/eval-runner/evaluator_test.go b/cmd/eval-runner/evaluator_test.go index 371cc16..39ac362 100644 --- a/cmd/eval-runner/evaluator_test.go +++ b/cmd/eval-runner/evaluator_test.go @@ -25,12 +25,53 @@ func TestEvaluatePassesWhenAllChecksMatch(t *testing.T) { Name: "small-refund-allow", Passed: true, Failures: nil, + Trace: trace, } if !reflect.DeepEqual(got, want) { t.Fatalf("Evaluate() = %#v, want %#v", got, want) } } +func TestEvaluateIncludesTraceOnPass(t *testing.T) { + testCase := EvalCase{ + Name: "trace-pass", + MustInclude: []string{"lookup_customer"}, + PolicyOutcome: "allow", + } + trace := []TraceRow{ + {ToolName: "lookup_customer", Decision: "allow", Arguments: json.RawMessage(`{"customer":"abc"}`)}, + } + + got := Evaluate(testCase, trace) + + if !got.Passed { + t.Fatalf("Evaluate() Passed = false, want true; failures = %#v", got.Failures) + } + if !reflect.DeepEqual(got.Trace, trace) { + t.Fatalf("Trace = %#v, want %#v", got.Trace, trace) + } +} + +func TestEvaluateIncludesTraceOnFailure(t *testing.T) { + testCase := EvalCase{ + Name: "trace-fail", + MustInclude: []string{"create_refund"}, + PolicyOutcome: "allow", + } + trace := []TraceRow{ + {ToolName: "lookup_customer", Decision: "allow"}, + } + + got := Evaluate(testCase, trace) + + if got.Passed { + t.Fatal("Evaluate() Passed = true, want false") + } + if !reflect.DeepEqual(got.Trace, trace) { + t.Fatalf("Trace = %#v, want %#v", got.Trace, trace) + } +} + func TestEvaluateMustIncludeAllowsGaps(t *testing.T) { testCase := EvalCase{ Name: "gapped-subsequence", diff --git a/cmd/eval-runner/execution.go b/cmd/eval-runner/execution.go new file mode 100644 index 0000000..0dcbd77 --- /dev/null +++ b/cmd/eval-runner/execution.go @@ -0,0 +1,36 @@ +package main + +import "context" + +func runEvalCase(ctx context.Context, runner caseExecutor, testCase EvalCase) CaseResult { + trace, err := runner.Run(ctx, testCase) + if err != nil { + return CaseResult{ + Name: testCase.Name, + Failures: []CheckFailure{{ + Check: "run", + Expected: "case completes successfully", + Observed: err.Error(), + }}, + } + } + + return Evaluate(testCase, trace) +} + +func summarizeResults(results []CaseResult) evalResponse { + passCount := 0 + for _, result := range results { + if result.Passed { + passCount++ + } + } + + return evalResponse{ + Passed: passCount == len(results), + PassCount: passCount, + TotalCount: len(results), + Cases: results, + Report: GenerateReport(results), + } +} diff --git a/cmd/eval-runner/scenarios_test.go b/cmd/eval-runner/scenarios_test.go new file mode 100644 index 0000000..ebb4331 --- /dev/null +++ b/cmd/eval-runner/scenarios_test.go @@ -0,0 +1,296 @@ +package main + +import ( + "context" + "errors" + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" +) + +func TestScenarioStreamRequiresAgentURLForYAMLScenario(t *testing.T) { + req := httptest.NewRequest(http.MethodPost, "/run-scenario/stream", strings.NewReader(`{"scenario_id":"mcp-crash"}`)) + rec := httptest.NewRecorder() + + makeScenarioStreamHandler(scenarioDeps{})(rec, req) + + if rec.Code != http.StatusBadRequest { + t.Fatalf("status = %d, want 400", rec.Code) + } +} + +func TestScenarioStreamRetryStormRequiresGatewayURLOnly(t *testing.T) { + req := httptest.NewRequest(http.MethodPost, "/run-scenario/stream", strings.NewReader(`{"scenario_id":"retry-storm","gateway_mcp_url":"://bad"}`)) + rec := httptest.NewRecorder() + + makeScenarioStreamHandler(scenarioDeps{})(rec, req) + + if rec.Code != http.StatusBadRequest { + t.Fatalf("status = %d, want 400", rec.Code) + } +} + +func TestScenarioStreamRetryStormUsesGatewayDefaultWithoutAgentURL(t *testing.T) { + ran := false + req := httptest.NewRequest(http.MethodPost, "/run-scenario/stream", strings.NewReader(`{"scenario_id":"retry-storm"}`)) + rec := httptest.NewRecorder() + + makeScenarioStreamHandler(scenarioDeps{ + defaultGatewayMCPURL: "http://gateway.example/mcp", + newRetryStorm: func(gatewayURL string) scenarioCaseExecutor { + if gatewayURL != "http://gateway.example/mcp" { + t.Fatalf("gatewayURL = %q, want default", gatewayURL) + } + return scenarioCaseExecutorFunc(func(context.Context) CaseResult { + ran = true + return CaseResult{Name: "retry-storm-budget", Passed: true} + }) + }, + })(rec, req) + + if rec.Code != http.StatusOK { + t.Fatalf("status = %d, want 200: %s", rec.Code, rec.Body.String()) + } + if !ran { + t.Fatal("retry storm executor did not run") + } +} + +func TestScenarioStreamRetryStormRequestGatewayURLWins(t *testing.T) { + req := httptest.NewRequest(http.MethodPost, "/run-scenario/stream", strings.NewReader(`{"scenario_id":"retry-storm","gateway_mcp_url":"http://request.example/mcp"}`)) + rec := httptest.NewRecorder() + + makeScenarioStreamHandler(scenarioDeps{ + defaultGatewayMCPURL: "http://default.example/mcp", + newRetryStorm: func(gatewayURL string) scenarioCaseExecutor { + if gatewayURL != "http://request.example/mcp" { + t.Fatalf("gatewayURL = %q, want request override", gatewayURL) + } + return scenarioCaseExecutorFunc(func(context.Context) CaseResult { + return CaseResult{Name: "retry-storm-budget", Passed: true} + }) + }, + })(rec, req) + + if rec.Code != http.StatusOK { + t.Fatalf("status = %d, want 200: %s", rec.Code, rec.Body.String()) + } +} + +func TestScenarioStreamRetryStormUsesEnvGatewayURL(t *testing.T) { + t.Setenv("GATEWAY_MCP_URL", "http://env.example/mcp") + req := httptest.NewRequest(http.MethodPost, "/run-scenario/stream", strings.NewReader(`{"scenario_id":"retry-storm"}`)) + rec := httptest.NewRecorder() + + makeScenarioStreamHandler(scenarioDeps{ + newRetryStorm: func(gatewayURL string) scenarioCaseExecutor { + if gatewayURL != "http://env.example/mcp" { + t.Fatalf("gatewayURL = %q, want env fallback", gatewayURL) + } + return scenarioCaseExecutorFunc(func(context.Context) CaseResult { + return CaseResult{Name: "retry-storm-budget", Passed: true} + }) + }, + })(rec, req) + + if rec.Code != http.StatusOK { + t.Fatalf("status = %d, want 200: %s", rec.Code, rec.Body.String()) + } +} + +func TestScenarioStreamRetryStormUsesHardcodedGatewayFallback(t *testing.T) { + req := httptest.NewRequest(http.MethodPost, "/run-scenario/stream", strings.NewReader(`{"scenario_id":"retry-storm"}`)) + rec := httptest.NewRecorder() + + makeScenarioStreamHandler(scenarioDeps{ + newRetryStorm: func(gatewayURL string) scenarioCaseExecutor { + if gatewayURL != "http://localhost:18080/mcp" { + t.Fatalf("gatewayURL = %q, want hardcoded fallback", gatewayURL) + } + return scenarioCaseExecutorFunc(func(context.Context) CaseResult { + return CaseResult{Name: "retry-storm-budget", Passed: true} + }) + }, + })(rec, req) + + if rec.Code != http.StatusOK { + t.Fatalf("status = %d, want 200: %s", rec.Code, rec.Body.String()) + } +} + +func TestScenarioStreamYAMLScenarioUsesDefaultAgentURL(t *testing.T) { + req := httptest.NewRequest(http.MethodPost, "/run-scenario/stream", strings.NewReader(`{"scenario_id":"mcp-crash"}`)) + rec := httptest.NewRecorder() + + makeScenarioStreamHandler(scenarioDeps{ + defaultAgentURL: "http://agent.example", + newRunner: func(agentURL string) caseExecutor { + if agentURL != "http://agent.example" { + t.Fatalf("agentURL = %q, want default", agentURL) + } + return serveStubRunner{ + traces: map[string][]TraceRow{"mcp-server-down": {{ToolName: "list_recent_charges", Decision: "upstream_error"}}}, + errs: map[string]error{}, + } + }, + })(rec, req) + + if rec.Code != http.StatusOK { + t.Fatalf("status = %d, want 200: %s", rec.Code, rec.Body.String()) + } +} + +func TestScenarioStreamRejectsUnknownScenario(t *testing.T) { + req := httptest.NewRequest(http.MethodPost, "/run-scenario/stream", strings.NewReader(`{"scenario_id":"unknown","agent_url":"http://agent.example"}`)) + rec := httptest.NewRecorder() + + makeScenarioStreamHandler(scenarioDeps{})(rec, req) + + if rec.Code != http.StatusBadRequest { + t.Fatalf("status = %d, want 400", rec.Code) + } +} + +func TestRetryStormExecutorPollsUntilBudgetExceeded(t *testing.T) { + attempts := 0 + exec := retryStormExecutor{ + gatewayMCPURL: "http://gateway.example/mcp", + callGateway: func(context.Context, string, string) (string, error) { + return `{"error":{"message":"budget exceeded"}}`, nil + }, + queryTrace: func(context.Context, string) ([]TraceRow, error) { + attempts++ + if attempts < 3 { + return []TraceRow{{ToolName: "list_recent_charges", Decision: "upstream_error"}}, nil + } + return []TraceRow{ + {ToolName: "list_recent_charges", Decision: "upstream_error"}, + {ToolName: "list_recent_charges", Decision: "budgetExceeded"}, + }, nil + }, + initialize: func(context.Context) (string, error) { return "session-1", nil }, + pollInterval: time.Millisecond, + pollTimeout: 50 * time.Millisecond, + } + + got := exec.Run(context.Background()) + + if !got.Passed { + t.Fatalf("Passed = false, want true; failures = %#v", got.Failures) + } + if attempts < 3 { + t.Fatalf("query attempts = %d, want polling", attempts) + } +} + +func TestRetryStormExecutorFailsWhenInitializeFails(t *testing.T) { + exec := retryStormExecutor{ + gatewayMCPURL: "http://gateway.example/mcp", + initialize: func(context.Context) (string, error) { + return "", errors.New("initialize failed") + }, + } + + got := exec.Run(context.Background()) + + if got.Passed { + t.Fatal("Passed = true, want false") + } + if len(got.Failures) == 0 || got.Failures[0].Check != "run" { + t.Fatalf("Failures = %#v, want run failure", got.Failures) + } +} + +func TestRetryStormExecutorFailsWhenInitializeReturnsNoSession(t *testing.T) { + exec := retryStormExecutor{ + gatewayMCPURL: "http://gateway.example/mcp", + initialize: func(context.Context) (string, error) { + return "", nil + }, + } + + got := exec.Run(context.Background()) + + if got.Passed { + t.Fatal("Passed = true, want false") + } + if len(got.Failures) == 0 || got.Failures[0].Check != "run" { + t.Fatalf("Failures = %#v, want run failure", got.Failures) + } +} + +func TestRetryStormExecutorFailsWhenAuditQueryFails(t *testing.T) { + exec := retryStormExecutor{ + gatewayMCPURL: "http://gateway.example/mcp", + initialize: func(context.Context) (string, error) { return "session-1", nil }, + callGateway: func(context.Context, string, string) (string, error) { + return `{"error":{"message":"budget exceeded"}}`, nil + }, + queryTrace: func(context.Context, string) ([]TraceRow, error) { + return nil, errors.New("select failed") + }, + pollInterval: time.Millisecond, + pollTimeout: 5 * time.Millisecond, + } + + got := exec.Run(context.Background()) + + if got.Passed { + t.Fatal("Passed = true, want false") + } + if len(got.Failures) == 0 || got.Failures[0].Check != "run" { + t.Fatalf("Failures = %#v, want run failure", got.Failures) + } +} + +func TestRetryStormExecutorFailsAfterSixNonBudgetResponses(t *testing.T) { + calls := 0 + exec := retryStormExecutor{ + gatewayMCPURL: "http://gateway.example/mcp", + initialize: func(context.Context) (string, error) { return "session-1", nil }, + callGateway: func(context.Context, string, string) (string, error) { + calls++ + return `{"error":{"message":"upstream unavailable"}}`, nil + }, + queryTrace: func(context.Context, string) ([]TraceRow, error) { + return []TraceRow{{ToolName: "list_recent_charges", Decision: "upstream_error"}}, nil + }, + pollInterval: time.Millisecond, + pollTimeout: 5 * time.Millisecond, + } + + got := exec.Run(context.Background()) + + if got.Passed { + t.Fatal("Passed = true, want false") + } + if calls != 6 { + t.Fatalf("calls = %d, want 6", calls) + } +} + +func TestRetryStormExecutorFailsWhenBudgetNeverAppears(t *testing.T) { + exec := retryStormExecutor{ + gatewayMCPURL: "http://gateway.example/mcp", + initialize: func(context.Context) (string, error) { return "session-1", nil }, + callGateway: func(context.Context, string, string) (string, error) { + return `{"error":{"message":"budget exceeded"}}`, nil + }, + queryTrace: func(context.Context, string) ([]TraceRow, error) { + return []TraceRow{{ToolName: "list_recent_charges", Decision: "upstream_error"}}, nil + }, + pollInterval: time.Millisecond, + pollTimeout: 5 * time.Millisecond, + } + + got := exec.Run(context.Background()) + + if got.Passed { + t.Fatal("Passed = true, want false") + } + if len(got.Failures) == 0 || got.Failures[0].Check != "policyOutcome" { + t.Fatalf("Failures = %#v, want policyOutcome failure", got.Failures) + } +} diff --git a/cmd/eval-runner/serve_test.go b/cmd/eval-runner/serve_test.go new file mode 100644 index 0000000..052dbc9 --- /dev/null +++ b/cmd/eval-runner/serve_test.go @@ -0,0 +1,133 @@ +package main + +import ( + "context" + "encoding/json" + "errors" + "net/http" + "net/http/httptest" + "strings" + "testing" +) + +type serveStubRunner struct { + traces map[string][]TraceRow + errs map[string]error +} + +func (s serveStubRunner) Run(_ context.Context, c EvalCase) ([]TraceRow, error) { + if err := s.errs[c.Name]; err != nil { + return nil, err + } + return s.traces[c.Name], nil +} + +func TestRunEvalCaseReturnsRunFailure(t *testing.T) { + runner := serveStubRunner{errs: map[string]error{"bad": errors.New("agent down")}} + + got := runEvalCase(context.Background(), runner, EvalCase{Name: "bad", Input: "x"}) + + if got.Passed { + t.Fatal("Passed = true, want false") + } + if len(got.Failures) != 1 || got.Failures[0].Check != "run" { + t.Fatalf("Failures = %#v, want run failure", got.Failures) + } + if got.Trace != nil { + t.Fatalf("Trace = %#v, want nil", got.Trace) + } +} + +func TestCustomEvalJSONIncludesTrace(t *testing.T) { + runnerTrace := []TraceRow{{ToolName: "lookup_customer", Decision: "allow"}} + runner := serveStubRunner{ + traces: map[string][]TraceRow{"lookup": runnerTrace}, + errs: map[string]error{}, + } + suite := &EvalSuite{Cases: []EvalCase{{ + Name: "lookup", + Input: "lookup", + MustInclude: []string{"lookup_customer"}, + PolicyOutcome: "allow", + }}} + req := httptest.NewRequest(http.MethodPost, "/run-eval", nil) + req.Header.Set("Accept", "application/json") + rec := httptest.NewRecorder() + + makeEvalHandler(runner, suite, nil)(rec, req) + + if rec.Code != http.StatusOK { + t.Fatalf("status = %d, want 200: %s", rec.Code, rec.Body.String()) + } + var body evalResponse + if err := json.NewDecoder(rec.Body).Decode(&body); err != nil { + t.Fatalf("Decode(response): %v", err) + } + if len(body.Cases) != 1 || len(body.Cases[0].Trace) != 1 { + t.Fatalf("cases = %#v, want trace in response", body.Cases) + } + if body.Cases[0].Trace[0].ToolName != "lookup_customer" { + t.Fatalf("trace = %#v, want lookup_customer", body.Cases[0].Trace) + } +} + +func TestCustomEvalRejectsMissingAgentURL(t *testing.T) { + req := httptest.NewRequest(http.MethodPost, "/run-eval/custom", strings.NewReader(`{"suite":"cases: []"}`)) + rec := httptest.NewRecorder() + + makeCustomEvalHandler(nil)(rec, req) + + if rec.Code != http.StatusBadRequest { + t.Fatalf("status = %d, want 400", rec.Code) + } +} + +func TestCustomEvalStreamEmitsCaseEventsAndSummary(t *testing.T) { + body := `{"agent_url":"http://agent.example","suite":"cases:\n - name: lookup\n input: lookup\n mustInclude:\n - lookup_customer\n policyOutcome: allow\n"}` + req := httptest.NewRequest(http.MethodPost, "/run-eval/custom/stream", strings.NewReader(body)) + rec := httptest.NewRecorder() + + makeCustomEvalStreamHandler(func(agentURL string) caseExecutor { + if agentURL != "http://agent.example" { + t.Fatalf("agentURL = %q, want http://agent.example", agentURL) + } + return serveStubRunner{ + traces: map[string][]TraceRow{"lookup": {{ToolName: "lookup_customer", Decision: "allow"}}}, + errs: map[string]error{}, + } + })(rec, req) + + if rec.Code != http.StatusOK { + t.Fatalf("status = %d, want 200: %s", rec.Code, rec.Body.String()) + } + got := rec.Body.String() + for _, want := range []string{"event: case_start", `"name":"lookup"`, "event: case_result", "event: summary"} { + if !strings.Contains(got, want) { + t.Fatalf("stream = %q, missing %q", got, want) + } + } + if strings.Index(got, "event: case_start") > strings.Index(got, "event: case_result") { + t.Fatalf("case_start should precede case_result: %q", got) + } +} + +func TestCustomEvalStreamContinuesAfterRunnerError(t *testing.T) { + body := `{"agent_url":"http://agent.example","suite":"cases:\n - name: bad\n input: bad\n policyOutcome: allow\n - name: good\n input: good\n mustInclude:\n - lookup_customer\n policyOutcome: allow\n"}` + req := httptest.NewRequest(http.MethodPost, "/run-eval/custom/stream", strings.NewReader(body)) + rec := httptest.NewRecorder() + + makeCustomEvalStreamHandler(func(string) caseExecutor { + return serveStubRunner{ + traces: map[string][]TraceRow{"good": {{ToolName: "lookup_customer", Decision: "allow"}}}, + errs: map[string]error{"bad": errors.New("agent down")}, + } + })(rec, req) + + got := rec.Body.String() + if strings.Count(got, "event: case_result") != 2 { + t.Fatalf("case_result count = %d, want 2 in %q", strings.Count(got, "event: case_result"), got) + } + if !strings.Contains(got, `"pass_count":1`) { + t.Fatalf("stream = %q, want one pass in summary", got) + } +} diff --git a/cmd/eval-runner/stack_health_test.go b/cmd/eval-runner/stack_health_test.go new file mode 100644 index 0000000..bfdd0ff --- /dev/null +++ b/cmd/eval-runner/stack_health_test.go @@ -0,0 +1,26 @@ +package main + +import ( + "encoding/json" + "net/http" + "net/http/httptest" + "testing" +) + +func TestStackHealthReturnsJSONWhenProbesFail(t *testing.T) { + req := httptest.NewRequest(http.MethodGet, "/stack-health", nil) + rec := httptest.NewRecorder() + + makeStackHealthHandler(stackHealthDeps{})(rec, req) + + if rec.Code != http.StatusOK { + t.Fatalf("status = %d, want 200", rec.Code) + } + var body stackHealthResponse + if err := json.NewDecoder(rec.Body).Decode(&body); err != nil { + t.Fatalf("Decode(response): %v", err) + } + if len(body.Services) == 0 { + t.Fatal("services is empty, want health rows") + } +} diff --git a/cmd/eval-runner/stream.go b/cmd/eval-runner/stream.go new file mode 100644 index 0000000..4f52d15 --- /dev/null +++ b/cmd/eval-runner/stream.go @@ -0,0 +1,93 @@ +package main + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "strings" +) + +type caseStartEvent struct { + Name string `json:"name"` + Index int `json:"index"` + Total int `json:"total"` +} + +type caseResultEvent struct { + Index int `json:"index"` + Total int `json:"total"` + Result CaseResult `json:"result"` +} + +type runnerFactory func(agentURL string) caseExecutor + +func writeSSE(w http.ResponseWriter, event string, payload any) error { + data, err := json.Marshal(payload) + if err != nil { + return err + } + if _, err := fmt.Fprintf(w, "event: %s\ndata: %s\n\n", event, data); err != nil { + return err + } + + flusher, ok := w.(http.Flusher) + if !ok { + return fmt.Errorf("streaming unsupported") + } + flusher.Flush() + return nil +} + +func streamEvalSuite(ctx context.Context, w http.ResponseWriter, runner caseExecutor, cases []EvalCase) { + results := make([]CaseResult, 0, len(cases)) + total := len(cases) + for index, testCase := range cases { + if err := writeSSE(w, "case_start", caseStartEvent{Name: testCase.Name, Index: index, Total: total}); err != nil { + return + } + result := runEvalCase(ctx, runner, testCase) + results = append(results, result) + if err := writeSSE(w, "case_result", caseResultEvent{Index: index, Total: total, Result: result}); err != nil { + return + } + } + _ = writeSSE(w, "summary", summarizeResults(results)) +} + +func makeCustomEvalStreamHandler(newRunner runnerFactory) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + var body struct { + Suite string `json:"suite"` + AgentURL string `json:"agent_url"` + } + if err := json.NewDecoder(r.Body).Decode(&body); err != nil { + http.Error(w, fmt.Sprintf("invalid request: %v", err), http.StatusBadRequest) + return + } + if strings.TrimSpace(body.AgentURL) == "" { + http.Error(w, "missing agent_url", http.StatusBadRequest) + return + } + if strings.TrimSpace(body.Suite) == "" { + http.Error(w, "missing suite", http.StatusBadRequest) + return + } + + suite, err := LoadSuiteFromReader(strings.NewReader(body.Suite)) + if err != nil { + http.Error(w, fmt.Sprintf("invalid suite: %v", err), http.StatusBadRequest) + return + } + + if _, ok := w.(http.Flusher); !ok { + http.Error(w, "streaming unsupported", http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + + streamEvalSuite(r.Context(), w, newRunner(body.AgentURL), suite.Cases) + } +} diff --git a/cmd/eval-runner/types.go b/cmd/eval-runner/types.go index 8782278..327b305 100644 --- a/cmd/eval-runner/types.go +++ b/cmd/eval-runner/types.go @@ -16,9 +16,9 @@ type EvalSuite struct { } type TraceRow struct { - ToolName string - Decision string - Arguments json.RawMessage + ToolName string `json:"tool_name"` + Decision string `json:"decision"` + Arguments json.RawMessage `json:"arguments,omitempty"` } type CheckFailure struct { @@ -30,5 +30,6 @@ type CheckFailure struct { type CaseResult struct { Name string `json:"name"` Passed bool `json:"passed"` - Failures []CheckFailure `json:"failures"` + Failures []CheckFailure `json:"failures,omitempty"` + Trace []TraceRow `json:"trace,omitempty"` } diff --git a/cmd/eval-runner/ui.html b/cmd/eval-runner/ui.html index 5361e7e..6bf1e7b 100644 --- a/cmd/eval-runner/ui.html +++ b/cmd/eval-runner/ui.html @@ -3,160 +3,630 @@ - ToolGate Eval Runner + ToolGate Resilience Operator - -
-

ToolGate Eval Runner

- -
- + +
+
- - -

Base URL of the agent to evaluate (must expose a /trigger endpoint).

+

Operator Surface

+

ToolGate Resilience Demo

+

+ Choose a prepared scenario, stream case results as they complete, and inspect the audit trail behind each verdict. +

- -
-
- - +
+
+
Elapsed
+
00:00
- +
- -
- +
+
+
+
+
+

Scenarios

+

Preset runs are server-owned. Custom YAML is a separate mode.

+
+ Preset +
+
+
- +
+
+
+ + +

Required for YAML-backed presets and Custom YAML mode.

+
+
+ + +

Used by the Retry Storm preset.

+
+
+ +
+
+
+

Stack Health

+

Expected outages should be visible before you run a scenario.

+
+ +
+
+
+ +
+ + +
+
+ +
+
+
+

Scenario Plan

+

Preset plans are read-only. Switch to Custom YAML to edit.

+
+ +
+ +
+
+ +
+
+
+
+

Results

+

Rows update as streamed case events arrive.

+
+
+
+
+ + +
+