diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index a87607f..ffc9eb0 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -6,6 +6,12 @@ on: push: branches: [main] +permissions: + contents: read + +env: + GO_VERSION: '1.23' + jobs: lint: runs-on: ubuntu-latest @@ -13,24 +19,98 @@ jobs: - uses: actions/checkout@v4 - uses: actions/setup-go@v5 with: - go-version: '1.23' + go-version: ${{ env.GO_VERSION }} + cache: true + - name: golangci-lint + uses: golangci/golangci-lint-action@v7 + with: + version: v2.11 + args: --timeout=5m ./... + + fmt-check: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-go@v5 + with: + go-version: ${{ env.GO_VERSION }} + cache: true + - name: gofmt + run: | + unformatted=$(gofmt -l .) + if [ -n "$unformatted" ]; then + echo "::error::These files are not gofmt-formatted:" + echo "$unformatted" + exit 1 + fi + + govet: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-go@v5 + with: + go-version: ${{ env.GO_VERSION }} + cache: true + - name: go vet + run: go vet ./... + + staticcheck: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-go@v5 + with: + go-version: ${{ env.GO_VERSION }} + cache: true + - name: Install staticcheck + run: go install honnef.co/go/tools/cmd/staticcheck@v0.6.1 + - name: staticcheck + run: staticcheck ./... + + tidy: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-go@v5 + with: + go-version: ${{ env.GO_VERSION }} cache: true - - name: Lint - uses: golangci/golangci-lint-action@v6 + - name: Verify go mod tidy is clean + run: | + go mod tidy + git diff --exit-code -- go.mod go.sum + + test: + strategy: + fail-fast: false + matrix: + os: [ubuntu-latest, macos-latest] + runs-on: ${{ matrix.os }} + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-go@v5 with: - version: latest - args: --timeout=5m + go-version: ${{ env.GO_VERSION }} + cache: true + - name: Test + run: go test -race -count=1 ./... build: + needs: [lint, fmt-check, govet, staticcheck, tidy, test] runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 - uses: actions/setup-go@v5 with: - go-version: '1.23' + go-version: ${{ env.GO_VERSION }} cache: true - name: Build - run: go build -o pi-stream . + run: | + VERSION="${GITHUB_SHA::7}" + go build \ + -ldflags="-s -w -X github.com/crazy-goat/pi-stream/internal/cli.Version=${VERSION}" \ + -o pi-stream . - name: Upload binary uses: actions/upload-artifact@v4 with: diff --git a/.golangci.yml b/.golangci.yml new file mode 100644 index 0000000..1a069c2 --- /dev/null +++ b/.golangci.yml @@ -0,0 +1,30 @@ +version: "2" + +run: + timeout: 5m + issues-exit-code: 1 + tests: true + +linters: + default: none + enable: + - govet + - staticcheck + - unused + - ineffassign + - errcheck + - dupl + settings: + staticcheck: + checks: + - "all" + - "-QF*" + - "-ST*" + +formatters: + enable: + - gofmt + +issues: + max-issues-per-linter: 0 + max-same-issues: 0 diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..3a7ce70 --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2026 Piotr Hałas + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/Makefile b/Makefile index 9db8dba..85485d5 100644 --- a/Makefile +++ b/Makefile @@ -1,14 +1,82 @@ -.PHONY: build clean install all +.PHONY: build install test clean lint lint-fix fmt vet staticcheck check tidy help -BINARY := pi-stream +BINARY := pi-stream +PKG := github.com/crazy-goat/pi-stream/internal/cli +VERSION ?= $(shell git describe --tags --always --dirty 2>/dev/null || echo dev) +LDFLAGS := -s -w -X $(PKG).Version=$(VERSION) -all: build +GOLANGCI_LINT := $(shell command -v golangci-lint 2>/dev/null || echo "$$(go env GOPATH)/bin/golangci-lint") + +help: + @echo "Available targets:" + @echo " make build - Build the binary (with version embedded)" + @echo " make install - Build and copy binary to \$$HOME/.local/bin" + @echo " make test - Run tests with -race" + @echo " make lint - Run golangci-lint" + @echo " make lint-fix - Run golangci-lint with auto-fix" + @echo " make fmt - Format code with gofmt" + @echo " make vet - Run go vet" + @echo " make staticcheck - Run staticcheck" + @echo " make tidy - go mod tidy + diff check" + @echo " make check - Run fmt, vet, staticcheck, lint, test, build" + @echo " make clean - Remove built binary" build: - go build -o $(BINARY) . + go build -ldflags='$(LDFLAGS)' -o $(BINARY) . + +install: build + install -d $(HOME)/.local/bin + install -m 0755 $(BINARY) $(HOME)/.local/bin/$(BINARY) + @echo "Installed $(BINARY) to $(HOME)/.local/bin" + +test: + go test -race -count=1 ./... + +fmt: + @echo "Formatting code..." + @gofmt -w . + @echo "Done" + +vet: + @echo "Running go vet..." + @go vet ./... + @echo "Done" + +lint: + @echo "Running golangci-lint..." + @if [ -x "$(GOLANGCI_LINT)" ]; then \ + $(GOLANGCI_LINT) run ./...; \ + else \ + echo "golangci-lint not installed. Install with:"; \ + echo " curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $$(go env GOPATH)/bin"; \ + exit 1; \ + fi + +lint-fix: + @echo "Running golangci-lint with auto-fix..." + @if [ -x "$(GOLANGCI_LINT)" ]; then \ + $(GOLANGCI_LINT) run --fix ./...; \ + else \ + echo "golangci-lint not installed."; \ + exit 1; \ + fi + +staticcheck: + @echo "Running staticcheck..." + @if command -v staticcheck >/dev/null 2>&1; then \ + staticcheck ./...; \ + else \ + echo "staticcheck not installed. Install with:"; \ + echo " go install honnef.co/go/tools/cmd/staticcheck@latest"; \ + exit 1; \ + fi + +tidy: + go mod tidy + @git diff --exit-code -- go.mod go.sum || (echo "go.mod/go.sum changed; commit the tidy result"; exit 1) + +check: fmt vet staticcheck lint test build + @echo "All checks passed!" clean: rm -f $(BINARY) - -install: build - cp $(BINARY) ~/.local/bin/ diff --git a/README.md b/README.md new file mode 100644 index 0000000..60a613d --- /dev/null +++ b/README.md @@ -0,0 +1,86 @@ +# pi-stream + +A small streaming proxy for [`pi`](https://github.com/) in RPC mode. It launches +`pi --mode rpc`, sends a single prompt over its JSON-RPC stdin, and renders the +event stream from pi's stdout as styled terminal output: + +- thinking tokens in dim italic +- assistant text plain +- tool calls (`🔧 name args`) when the model decides to invoke a tool +- tool execution (`⚡ name: cmd`) when the tool actually runs +- truncated tool result with `✓` / `✗` status + +## Requirements + +- Go 1.23+ to build +- A working `pi` binary on `$PATH` (this proxy spawns it) + +## Install + +```sh +make install # builds and copies pi-stream into ~/.local/bin +# or +go install github.com/crazy-goat/pi-stream@latest +``` + +## Usage + +```sh +pi-stream [flags] +``` + +### Flags + +| Flag | Default | Description | +| ------------ | ------- | ------------------------------------------------------------ | +| `--model` | (auto) | Model name forwarded to pi (e.g. `"GLM 5.1"`). | +| `--thinking` | `high` | Thinking level: `off`, `minimal`, `low`, `medium`, `high`, `xhigh`. | +| `-t` | (none) | Comma-separated tool allowlist (e.g. `bash,read`). | +| `--session` | (none) | pi session file path; share between invocations for context. | +| `--version` | | Print version and exit. | + +### Examples + +```sh +# Quick one-shot prompt +pi-stream --model "GLM 5.1" --thinking off "tell me a one-line joke" + +# Let the model use bash +pi-stream --model "GLM 5.1" -t bash "list files in this repo and summarize" + +# Reuse a session across multiple calls +pi-stream --session /tmp/sess "first message" +pi-stream --session /tmp/sess "follow-up that should remember the first" +``` + +### Exit codes + +| Code | Meaning | +| ---- | -------------------------------------------------- | +| 0 | Normal completion (`agent_end` received) | +| 1 | pi reported an error envelope, or startup failed | +| 2 | Invalid CLI flags / missing prompt | +| 130 | Interrupted by `SIGINT` or `SIGTERM` (Ctrl+C) | + +## Development + +```sh +make build # compile +make test # go test -race ./... +make lint # golangci-lint run +make tidy # go mod tidy + diff check +``` + +Layout: + +``` +main.go # ~10-line entrypoint +internal/event/ # typed event structs for pi's JSON stream +internal/render/ # state-machine Renderer that styles events +internal/pi/ # subprocess lifecycle (Start, Events, Close) +internal/cli/ # flag parsing + event-loop orchestration +``` + +## License + +MIT diff --git a/internal/cli/cli.go b/internal/cli/cli.go new file mode 100644 index 0000000..42e99f2 --- /dev/null +++ b/internal/cli/cli.go @@ -0,0 +1,161 @@ +// Package cli wires the CLI flag set, the pi subprocess, and the renderer +// together. It is the small "orchestrator" layer that main() defers to. +package cli + +import ( + "context" + "encoding/json" + "errors" + "flag" + "fmt" + "io" + "strings" + + "github.com/crazy-goat/pi-stream/internal/event" + "github.com/crazy-goat/pi-stream/internal/pi" + "github.com/crazy-goat/pi-stream/internal/render" +) + +// Version is the human-readable build version, populated via -ldflags +// "-X github.com/crazy-goat/pi-stream/internal/cli.Version=...". +var Version = "dev" + +// Exit codes returned by Run. +const ( + ExitOK = 0 + ExitError = 1 + ExitUsage = 2 + ExitInterrupt = 130 +) + +// Run parses args (excluding the program name), launches pi, and streams +// rendered output to stdout. Diagnostics go to stderr. The returned int +// is the process exit code. +func Run(ctx context.Context, args []string, stdout, stderr io.Writer) int { + fs := flag.NewFlagSet("pi-stream", flag.ContinueOnError) + fs.SetOutput(stderr) + var ( + model = fs.String("model", "", `Model to use (e.g. "GLM 5.1")`) + thinking = fs.String("thinking", "high", "Thinking level (off|minimal|low|medium|high|xhigh)") + tools = fs.String("t", "", "Comma-separated tool allowlist") + session = fs.String("session", "", "pi session file path (shared between steps for context)") + version = fs.Bool("version", false, "Print version and exit") + ) + fs.Usage = func() { + _, _ = fmt.Fprintln(stderr, "usage: pi-stream [flags] ") + _, _ = fmt.Fprintln(stderr) + _, _ = fmt.Fprintln(stderr, "Flags:") + fs.PrintDefaults() + } + if err := fs.Parse(args); err != nil { + if errors.Is(err, flag.ErrHelp) { + return ExitOK + } + return ExitUsage + } + + if *version { + _, _ = fmt.Fprintln(stdout, Version) + return ExitOK + } + + rest := fs.Args() + if len(rest) == 0 { + fs.Usage() + return ExitUsage + } + prompt := strings.Join(rest, " ") + + proc, err := pi.Start(ctx, pi.Options{ + Model: *model, + Thinking: *thinking, + Tools: *tools, + Session: *session, + }, prompt, stderr) + if err != nil { + _, _ = fmt.Fprintf(stderr, "pi-stream: %v\n", err) + return ExitError + } + + exit := streamEvents(ctx, proc, stdout, stderr) + _ = proc.Close() + return exit +} + +// streamEvents drains the process event channel until completion, an +// error envelope, or context cancellation. +func streamEvents(ctx context.Context, proc *pi.Process, stdout, stderr io.Writer) int { + r := render.New(stdout) + events := proc.Events() + for { + select { + case <-ctx.Done(): + _ = proc.Kill() + return ExitInterrupt + case line, ok := <-events: + if !ok { + return ExitOK + } + var env event.Envelope + if err := json.Unmarshal(line, &env); err != nil { + continue + } + done, code := handleEvent(r, env, stderr) + if done { + return code + } + } + } +} + +// handleEvent processes a single envelope. It returns (true, code) when +// the stream should terminate (agent_end, fatal response/error envelope). +func handleEvent(r *render.Renderer, env event.Envelope, stderr io.Writer) (bool, int) { + switch env.Type { + case "response": + if env.Success != nil && !*env.Success { + _, _ = fmt.Fprintf(stderr, "pi error: %s\n", env.Error) + return true, ExitError + } + case "message_update": + if env.AssistantMessageEvent != nil { + handleMessage(r, env.AssistantMessageEvent) + } + case "tool_execution_start": + r.ToolExecStart(env.ToolName, env.Args) + case "tool_execution_update": + // Streaming output; nothing to render today. + case "tool_execution_end": + r.ToolExecEnd(env.ToolName, env.IsError, env.Result.SummaryText()) + case "turn_start": + r.TurnStart() + case "turn_end": + r.TurnEnd() + case "agent_end": + r.AgentEnd() + return true, ExitOK + case "error": + msg := env.Error + if env.AssistantMessageEvent != nil && env.AssistantMessageEvent.Error != "" { + msg = env.AssistantMessageEvent.Error + } + _, _ = fmt.Fprintf(stderr, "pi error: %s\n", msg) + return true, ExitError + } + return false, ExitOK +} + +func handleMessage(r *render.Renderer, msg *event.AssistantMessageEvent) { + switch msg.Type { + case "thinking_delta", "thinks_delta": + r.Thinking(msg.Delta) + case "text_delta": + r.Text(msg.Delta) + case "toolcall_end": + if msg.ToolCall != nil { + r.ToolCall(msg.ToolCall.Name, msg.ToolCall.Arguments) + } + // thinking_start, text_start, toolcall_start, toolcall_delta: + // no-op — renderer handles section transitions on the deltas. + } +} diff --git a/internal/cli/cli_test.go b/internal/cli/cli_test.go new file mode 100644 index 0000000..eaebea4 --- /dev/null +++ b/internal/cli/cli_test.go @@ -0,0 +1,118 @@ +package cli + +import ( + "bytes" + "strings" + "testing" + + "github.com/crazy-goat/pi-stream/internal/event" + "github.com/crazy-goat/pi-stream/internal/render" +) + +func TestHandleEventAgentEndStopsAndSucceeds(t *testing.T) { + t.Parallel() + r := render.New(&bytes.Buffer{}) + done, code := handleEvent(r, event.Envelope{Type: "agent_end"}, &bytes.Buffer{}) + if !done || code != ExitOK { + t.Errorf("agent_end: done=%v code=%d, want true,%d", done, code, ExitOK) + } +} + +func TestHandleEventErrorEnvelopeStopsWithErr(t *testing.T) { + t.Parallel() + var stderr bytes.Buffer + done, code := handleEvent(render.New(&bytes.Buffer{}), event.Envelope{Type: "error", Error: "boom"}, &stderr) + if !done || code != ExitError { + t.Errorf("error: done=%v code=%d, want true,%d", done, code, ExitError) + } + if !strings.Contains(stderr.String(), "boom") { + t.Errorf("stderr missing message: %q", stderr.String()) + } +} + +func TestHandleEventResponseFailureStopsWithErr(t *testing.T) { + t.Parallel() + var stderr bytes.Buffer + ok := false + done, code := handleEvent( + render.New(&bytes.Buffer{}), + event.Envelope{Type: "response", Success: &ok, Error: "denied"}, + &stderr, + ) + if !done || code != ExitError { + t.Errorf("response fail: done=%v code=%d", done, code) + } + if !strings.Contains(stderr.String(), "denied") { + t.Errorf("stderr missing message: %q", stderr.String()) + } +} + +func TestHandleEventTextDeltaRenders(t *testing.T) { + t.Parallel() + var out bytes.Buffer + r := render.New(&out) + done, code := handleEvent(r, event.Envelope{ + Type: "message_update", + AssistantMessageEvent: &event.AssistantMessageEvent{ + Type: "text_delta", + Delta: "hello", + }, + }, &bytes.Buffer{}) + if done || code != ExitOK { + t.Errorf("text_delta should not stop stream") + } + if got := out.String(); !strings.Contains(got, "hello") { + t.Errorf("expected text rendered, got %q", got) + } +} + +func TestHandleEventToolExecEndRenders(t *testing.T) { + t.Parallel() + var out bytes.Buffer + r := render.New(&out) + _, _ = handleEvent(r, event.Envelope{ + Type: "tool_execution_end", + ToolName: "bash", + IsError: false, + Result: &event.Result{Content: []event.ResultContent{ + {Text: "ok"}, + }}, + }, &bytes.Buffer{}) + if got := out.String(); !strings.Contains(got, "✓ bash → ok") { + t.Errorf("expected ✓ summary, got %q", got) + } +} + +func TestHandleMessageThinkingDelta(t *testing.T) { + t.Parallel() + var out bytes.Buffer + r := render.New(&out) + handleMessage(r, &event.AssistantMessageEvent{Type: "thinking_delta", Delta: "hmm"}) + if !strings.Contains(out.String(), "hmm") { + t.Errorf("thinking_delta not rendered: %q", out.String()) + } +} + +func TestHandleMessageToolCallEndRendersName(t *testing.T) { + t.Parallel() + var out bytes.Buffer + r := render.New(&out) + handleMessage(r, &event.AssistantMessageEvent{ + Type: "toolcall_end", + ToolCall: &event.ToolCall{Name: "bash", Arguments: map[string]any{"command": "ls"}}, + }) + got := out.String() + if !strings.Contains(got, "🔧 bash") { + t.Errorf("expected 🔧 bash, got %q", got) + } +} + +func TestHandleMessageToolCallEndWithoutToolCallIsNoOp(t *testing.T) { + t.Parallel() + var out bytes.Buffer + r := render.New(&out) + handleMessage(r, &event.AssistantMessageEvent{Type: "toolcall_end", ToolCall: nil}) + if out.String() != "" { + t.Errorf("expected no output, got %q", out.String()) + } +} diff --git a/internal/event/event.go b/internal/event/event.go new file mode 100644 index 0000000..05baec8 --- /dev/null +++ b/internal/event/event.go @@ -0,0 +1,66 @@ +// Package event defines typed structures for the JSON event stream +// emitted by `pi --mode rpc` over stdout. +// +// Each line on stdout is a JSON object that fits the Envelope shape. The +// embedded fields are sparsely populated depending on the envelope Type. +package event + +// Envelope is the top-level JSON object produced by pi for every event. +// Most fields are optional and only populated for the matching Type. +type Envelope struct { + Type string `json:"type"` + + // Populated for type=="response". + Success *bool `json:"success,omitempty"` + Error string `json:"error,omitempty"` + + // Populated for type=="message_update". + AssistantMessageEvent *AssistantMessageEvent `json:"assistantMessageEvent,omitempty"` + + // Populated for type=="tool_execution_*". + ToolName string `json:"toolName,omitempty"` + Args map[string]any `json:"args,omitempty"` + IsError bool `json:"isError,omitempty"` + Result *Result `json:"result,omitempty"` +} + +// AssistantMessageEvent describes a single token-level event emitted by the +// model: a thinking chunk, a text chunk, or a tool-call lifecycle event. +type AssistantMessageEvent struct { + Type string `json:"type"` + Delta string `json:"delta,omitempty"` + ToolCall *ToolCall `json:"toolCall,omitempty"` + Error string `json:"error,omitempty"` +} + +// ToolCall represents a tool invocation the model has decided to make. +// Arguments are only present once the call is complete (toolcall_end). +type ToolCall struct { + ID string `json:"id,omitempty"` + Name string `json:"name,omitempty"` + Arguments map[string]any `json:"arguments,omitempty"` +} + +// Result wraps the response returned by a tool execution. +type Result struct { + Content []ResultContent `json:"content"` +} + +// ResultContent is a single content block inside a tool result. Only the +// text variant is consumed by the renderer today. +type ResultContent struct { + Text string `json:"text,omitempty"` +} + +// SummaryText concatenates all text content blocks into a single string. +// Non-text blocks (if pi ever adds them) are ignored. +func (r *Result) SummaryText() string { + if r == nil { + return "" + } + var b []byte + for _, c := range r.Content { + b = append(b, c.Text...) + } + return string(b) +} diff --git a/internal/event/event_test.go b/internal/event/event_test.go new file mode 100644 index 0000000..e7953f2 --- /dev/null +++ b/internal/event/event_test.go @@ -0,0 +1,104 @@ +package event + +import ( + "encoding/json" + "testing" +) + +func TestEnvelopeUnmarshalResponseSuccess(t *testing.T) { + t.Parallel() + raw := `{"type":"response","success":true}` + var env Envelope + if err := json.Unmarshal([]byte(raw), &env); err != nil { + t.Fatalf("unmarshal: %v", err) + } + if env.Type != "response" { + t.Errorf("type = %q, want %q", env.Type, "response") + } + if env.Success == nil || !*env.Success { + t.Errorf("Success = %v, want true", env.Success) + } +} + +func TestEnvelopeUnmarshalResponseFailure(t *testing.T) { + t.Parallel() + raw := `{"type":"response","success":false,"error":"boom"}` + var env Envelope + if err := json.Unmarshal([]byte(raw), &env); err != nil { + t.Fatalf("unmarshal: %v", err) + } + if env.Success == nil || *env.Success { + t.Errorf("Success = %v, want false", env.Success) + } + if env.Error != "boom" { + t.Errorf("Error = %q, want %q", env.Error, "boom") + } +} + +func TestEnvelopeUnmarshalTextDelta(t *testing.T) { + t.Parallel() + raw := `{"type":"message_update","assistantMessageEvent":{"type":"text_delta","delta":"hello"}}` + var env Envelope + if err := json.Unmarshal([]byte(raw), &env); err != nil { + t.Fatalf("unmarshal: %v", err) + } + if env.AssistantMessageEvent == nil { + t.Fatal("AssistantMessageEvent is nil") + } + if env.AssistantMessageEvent.Type != "text_delta" { + t.Errorf("inner type = %q", env.AssistantMessageEvent.Type) + } + if env.AssistantMessageEvent.Delta != "hello" { + t.Errorf("delta = %q", env.AssistantMessageEvent.Delta) + } +} + +func TestEnvelopeUnmarshalToolCallEnd(t *testing.T) { + t.Parallel() + raw := `{"type":"message_update","assistantMessageEvent":{"type":"toolcall_end","toolCall":{"name":"bash","arguments":{"command":"ls"}}}}` + var env Envelope + if err := json.Unmarshal([]byte(raw), &env); err != nil { + t.Fatalf("unmarshal: %v", err) + } + tc := env.AssistantMessageEvent.ToolCall + if tc == nil { + t.Fatal("ToolCall is nil") + } + if tc.Name != "bash" { + t.Errorf("name = %q", tc.Name) + } + if got, _ := tc.Arguments["command"].(string); got != "ls" { + t.Errorf("command = %q", got) + } +} + +func TestEnvelopeUnmarshalToolExecutionEnd(t *testing.T) { + t.Parallel() + raw := `{"type":"tool_execution_end","toolName":"bash","isError":false,"result":{"content":[{"text":"hello"},{"text":" world"}]}}` + var env Envelope + if err := json.Unmarshal([]byte(raw), &env); err != nil { + t.Fatalf("unmarshal: %v", err) + } + if env.ToolName != "bash" { + t.Errorf("toolName = %q", env.ToolName) + } + if got := env.Result.SummaryText(); got != "hello world" { + t.Errorf("SummaryText = %q", got) + } +} + +func TestResultSummaryTextNil(t *testing.T) { + t.Parallel() + var r *Result + if got := r.SummaryText(); got != "" { + t.Errorf("nil SummaryText = %q, want \"\"", got) + } +} + +func TestResultSummaryTextEmpty(t *testing.T) { + t.Parallel() + r := &Result{} + if got := r.SummaryText(); got != "" { + t.Errorf("empty SummaryText = %q", got) + } +} diff --git a/internal/pi/pi.go b/internal/pi/pi.go new file mode 100644 index 0000000..246d353 --- /dev/null +++ b/internal/pi/pi.go @@ -0,0 +1,149 @@ +// Package pi launches a `pi --mode rpc` subprocess, sends the user prompt, +// and exposes the resulting JSON event stream as a channel of raw lines. +// +// The subprocess inherits the parent's environment and is wired so that: +// - the prompt is written to stdin as a single JSON line and stdin is then +// held open until Close is called; +// - stdout is line-scanned and forwarded over the Events channel; +// - stderr is forwarded verbatim to the writer supplied to Start. +// +// Cancellation is driven by the context passed to Start: when the context +// is canceled the subprocess receives a SIGKILL (Go's default for +// exec.CommandContext). +package pi + +import ( + "bufio" + "context" + "encoding/json" + "fmt" + "io" + "os/exec" +) + +// Buffer sizes for the stdout scanner. The initial buffer is small but the +// scanner will grow it on demand up to scanBufferMax before failing. +const ( + scanBufferInit = 64 * 1024 + scanBufferMax = 1024 * 1024 +) + +// Options configures a pi invocation. Every field is optional. +type Options struct { + // Model is forwarded as --model. Empty value omits the flag. + Model string + // Thinking is forwarded as --thinking. Empty value omits the flag. + Thinking string + // Tools is forwarded as -t (comma-separated allowlist). Empty value + // omits the flag. + Tools string + // Session is forwarded as --session. Empty value sends --no-session + // so that the subprocess is fully isolated by default. + Session string +} + +// BuildArgs returns the argv (excluding the binary name) for invoking pi +// with the given options. Exposed for testing. +func BuildArgs(opts Options) []string { + args := []string{"--mode", "rpc"} + if opts.Session != "" { + args = append(args, "--session", opts.Session) + } else { + args = append(args, "--no-session") + } + if opts.Model != "" { + args = append(args, "--model", opts.Model) + } + if opts.Thinking != "" { + args = append(args, "--thinking", opts.Thinking) + } + if opts.Tools != "" { + args = append(args, "-t", opts.Tools) + } + return args +} + +// Process is a running pi subprocess together with its event stream. +type Process struct { + cmd *exec.Cmd + stdin io.WriteCloser + stdout io.ReadCloser +} + +// Start launches `pi --mode rpc` with the given options, writes the prompt +// to its stdin, and returns a handle that can be used to consume events. +// stderr from the subprocess is copied to stderrOut in a background +// goroutine that exits when the subprocess closes its stderr. +func Start(ctx context.Context, opts Options, prompt string, stderrOut io.Writer) (*Process, error) { + cmd := exec.CommandContext(ctx, "pi", BuildArgs(opts)...) + + stdin, err := cmd.StdinPipe() + if err != nil { + return nil, fmt.Errorf("stdin pipe: %w", err) + } + stdout, err := cmd.StdoutPipe() + if err != nil { + return nil, fmt.Errorf("stdout pipe: %w", err) + } + stderrPipe, err := cmd.StderrPipe() + if err != nil { + return nil, fmt.Errorf("stderr pipe: %w", err) + } + + if err := cmd.Start(); err != nil { + return nil, fmt.Errorf("start pi: %w", err) + } + + go func() { + _, _ = io.Copy(stderrOut, stderrPipe) + }() + + enc := json.NewEncoder(stdin) + if err := enc.Encode(map[string]any{ + "type": "prompt", + "message": prompt, + }); err != nil { + _ = cmd.Process.Kill() + return nil, fmt.Errorf("send prompt: %w", err) + } + + return &Process{cmd: cmd, stdin: stdin, stdout: stdout}, nil +} + +// Events returns a channel that emits each line of pi's stdout as a +// freshly-copied byte slice. The channel is closed when stdout reaches +// EOF (typically because pi exited or the context was canceled). +// +// The returned slices are owned by the caller and survive the next read. +func (p *Process) Events() <-chan []byte { + ch := make(chan []byte, 32) + go func() { + defer close(ch) + sc := bufio.NewScanner(p.stdout) + sc.Buffer(make([]byte, 0, scanBufferInit), scanBufferMax) + for sc.Scan() { + line := sc.Bytes() + cp := make([]byte, len(line)) + copy(cp, line) + ch <- cp + } + }() + return ch +} + +// Close closes the subprocess's stdin and waits for it to exit. The +// returned error is the wait error (e.g. non-zero exit) and may be nil +// even if the subprocess crashed if pi's own exit code happened to be 0. +func (p *Process) Close() error { + _ = p.stdin.Close() + return p.cmd.Wait() +} + +// Kill terminates the subprocess immediately. It is safe to call after +// Close (or if the process never started successfully). +func (p *Process) Kill() error { + if p.cmd == nil || p.cmd.Process == nil { + return nil + } + return p.cmd.Process.Kill() +} diff --git a/internal/pi/pi_test.go b/internal/pi/pi_test.go new file mode 100644 index 0000000..5df7b19 --- /dev/null +++ b/internal/pi/pi_test.go @@ -0,0 +1,53 @@ +package pi + +import ( + "reflect" + "testing" +) + +func TestBuildArgsDefaults(t *testing.T) { + t.Parallel() + got := BuildArgs(Options{}) + want := []string{"--mode", "rpc", "--no-session"} + if !reflect.DeepEqual(got, want) { + t.Errorf("BuildArgs(zero) = %v, want %v", got, want) + } +} + +func TestBuildArgsWithSession(t *testing.T) { + t.Parallel() + got := BuildArgs(Options{Session: "/tmp/s"}) + want := []string{"--mode", "rpc", "--session", "/tmp/s"} + if !reflect.DeepEqual(got, want) { + t.Errorf("BuildArgs(session) = %v, want %v", got, want) + } +} + +func TestBuildArgsFull(t *testing.T) { + t.Parallel() + got := BuildArgs(Options{ + Model: "GLM 5.1", + Thinking: "high", + Tools: "bash,read", + Session: "/tmp/s", + }) + want := []string{ + "--mode", "rpc", + "--session", "/tmp/s", + "--model", "GLM 5.1", + "--thinking", "high", + "-t", "bash,read", + } + if !reflect.DeepEqual(got, want) { + t.Errorf("BuildArgs(full) =\n %v\nwant\n %v", got, want) + } +} + +func TestBuildArgsOmitsEmptyFields(t *testing.T) { + t.Parallel() + got := BuildArgs(Options{Model: "GLM 5.1"}) + want := []string{"--mode", "rpc", "--no-session", "--model", "GLM 5.1"} + if !reflect.DeepEqual(got, want) { + t.Errorf("BuildArgs(model only) = %v, want %v", got, want) + } +} diff --git a/internal/render/ansi.go b/internal/render/ansi.go new file mode 100644 index 0000000..14c81dd --- /dev/null +++ b/internal/render/ansi.go @@ -0,0 +1,10 @@ +package render + +// ANSI escape sequences used to style the streaming output. +const ( + ansiReset = "\033[0m" + ansiDim = "\033[2m" + ansiDimItalic = "\033[2;3m" + ansiBoldBlue = "\033[1;34m" + ansiBoldYellow = "\033[1;33m" +) diff --git a/internal/render/render.go b/internal/render/render.go new file mode 100644 index 0000000..2d2d0a6 --- /dev/null +++ b/internal/render/render.go @@ -0,0 +1,170 @@ +// Package render formats pi RPC events as styled terminal output. +// +// A Renderer is a small state machine that tracks the current "section" of +// output (thinking, text, tool call, ...) so that section boundaries are +// always preceded by a newline, regardless of whether the previous chunk +// already ended on one. +package render + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "strings" +) + +// summaryMaxLen caps the rendered tool-result summary so that a chatty +// command doesn't flood the screen. +const summaryMaxLen = 200 + +// State describes which kind of content the renderer most recently emitted. +type State int + +// Renderer state values. +const ( + StateIdle State = iota // no output yet + StateThink // thinking tokens + StateText // assistant text + StateTool // tool call / tool execution lines +) + +// Renderer streams styled events to an io.Writer while tracking enough +// state to insert section boundaries cleanly. +// +// A Renderer is not safe for concurrent use. +type Renderer struct { + out io.Writer + state State + atLineStart bool +} + +// New returns a Renderer that writes to out. The renderer assumes it starts +// at the beginning of a fresh line. +func New(out io.Writer) *Renderer { + return &Renderer{out: out, atLineStart: true} +} + +// Thinking emits a thinking delta in dim italic. Consecutive deltas stay on +// the same line; a transition from any other state inserts a newline first. +func (r *Renderer) Thinking(delta string) { + if r.state != StateThink { + r.ensureNewline() + } + r.state = StateThink + if delta == "" { + return + } + r.printf("%s%s%s", ansiDimItalic, delta, ansiReset) + r.atLineStart = strings.HasSuffix(delta, "\n") +} + +// Text emits an assistant text delta. A pending thinking section is +// flushed onto its own line first. +func (r *Renderer) Text(delta string) { + if r.state == StateThink { + r.ensureNewline() + } + r.state = StateText + r.write(delta) +} + +// ToolCall renders the "🔧 name args" line emitted when the model finishes +// describing a tool call. +func (r *Renderer) ToolCall(name string, args map[string]any) { + r.ensureNewline() + if len(args) > 0 { + r.printf("%s🔧 %s %s%s\n", ansiBoldBlue, name, marshalJSON(args), ansiReset) + } else { + r.printf("%s🔧 %s%s\n", ansiBoldBlue, name, ansiReset) + } + r.atLineStart = true + r.state = StateTool +} + +// ToolExecStart renders the "⚡ name: cmd" header for a tool that has begun +// running. For bash-like tools the literal command is rendered; for others +// the full args object is dumped as JSON. +func (r *Renderer) ToolExecStart(name string, args map[string]any) { + r.ensureNewline() + if cmd, ok := args["command"].(string); ok { + r.printf("%s⚡ %s: %s%s\n", ansiBoldYellow, name, cmd, ansiReset) + } else { + r.printf("%s⚡ %s %s%s\n", ansiBoldYellow, name, marshalJSON(args), ansiReset) + } + r.state = StateTool + r.atLineStart = true +} + +// ToolExecEnd renders the truncated summary line for a finished tool +// execution. isErr controls the ✓/✗ marker. +func (r *Renderer) ToolExecEnd(name string, isErr bool, summary string) { + r.ensureNewline() + summary = truncate(strings.TrimSpace(summary), summaryMaxLen) + status := "✓" + if isErr { + status = "✗" + } + r.printf("%s %s %s → %s%s\n", ansiDim, status, name, summary, ansiReset) + r.atLineStart = true + r.state = StateTool +} + +// TurnStart inserts a newline if the previous section left the cursor mid-line. +func (r *Renderer) TurnStart() { + if r.state != StateIdle { + r.ensureNewline() + } +} + +// TurnEnd ensures the turn ends on a fresh line. +func (r *Renderer) TurnEnd() { + r.ensureNewline() +} + +// AgentEnd ensures the overall stream ends on a fresh line. +func (r *Renderer) AgentEnd() { + r.ensureNewline() +} + +// State returns the current renderer state. Intended for tests. +func (r *Renderer) State() State { + return r.state +} + +func (r *Renderer) ensureNewline() { + if !r.atLineStart { + r.write("\n") + } +} + +func (r *Renderer) write(s string) { + if s == "" { + return + } + _, _ = fmt.Fprint(r.out, s) + r.atLineStart = strings.HasSuffix(s, "\n") +} + +// printf is a write-only formatted helper that drops the io error. We only +// ever write to a tty / pipe; if that fails the OS will deliver SIGPIPE. +func (r *Renderer) printf(format string, args ...any) { + _, _ = fmt.Fprintf(r.out, format, args...) +} + +func truncate(s string, maxLen int) string { + if len(s) <= maxLen { + return s + } + return s[:maxLen] + "..." +} + +// marshalJSON renders v as compact JSON without Go's default HTML escaping, +// so &, <, > survive intact in tool argument blobs. +func marshalJSON(v any) string { + var buf bytes.Buffer + enc := json.NewEncoder(&buf) + enc.SetEscapeHTML(false) + _ = enc.Encode(v) + return strings.TrimRight(buf.String(), "\n") +} diff --git a/internal/render/render_test.go b/internal/render/render_test.go new file mode 100644 index 0000000..3bc92bc --- /dev/null +++ b/internal/render/render_test.go @@ -0,0 +1,217 @@ +package render + +import ( + "bytes" + "strings" + "testing" +) + +func TestTextStreamsConcatenated(t *testing.T) { + t.Parallel() + var buf bytes.Buffer + r := New(&buf) + r.Text("hello") + r.Text(" world") + if got, want := buf.String(), "hello world"; got != want { + t.Errorf("output = %q, want %q", got, want) + } +} + +func TestThinkingDimItalic(t *testing.T) { + t.Parallel() + var buf bytes.Buffer + r := New(&buf) + r.Thinking("hmm") + got := buf.String() + if !strings.Contains(got, ansiDimItalic) || !strings.Contains(got, ansiReset) { + t.Errorf("missing ANSI codes: %q", got) + } + if !strings.Contains(got, "hmm") { + t.Errorf("missing payload: %q", got) + } +} + +func TestThinkingToTextInsertsNewline(t *testing.T) { + t.Parallel() + var buf bytes.Buffer + r := New(&buf) + r.Thinking("hmm") + r.Text("answer") + out := buf.String() + idxThink := strings.Index(out, "hmm") + idxText := strings.Index(out, "answer") + if idxThink < 0 || idxText < 0 || idxThink > idxText { + t.Fatalf("unexpected ordering in %q", out) + } + between := out[idxThink+len("hmm") : idxText] + if !strings.Contains(between, "\n") { + t.Errorf("expected newline between sections, got %q", between) + } +} + +func TestToolCallWithArgs(t *testing.T) { + t.Parallel() + var buf bytes.Buffer + r := New(&buf) + r.ToolCall("bash", map[string]any{"command": "ls -la"}) + got := buf.String() + if !strings.Contains(got, "🔧 bash") { + t.Errorf("missing prefix: %q", got) + } + if !strings.Contains(got, `"command":"ls -la"`) { + t.Errorf("missing args JSON: %q", got) + } + if !strings.HasSuffix(got, "\n") { + t.Errorf("output should end with newline: %q", got) + } +} + +func TestToolCallNoArgs(t *testing.T) { + t.Parallel() + var buf bytes.Buffer + r := New(&buf) + r.ToolCall("ping", nil) + got := buf.String() + if !strings.Contains(got, "🔧 ping") { + t.Errorf("missing prefix: %q", got) + } + if strings.Contains(got, "{}") { + t.Errorf("should not render empty args object: %q", got) + } +} + +func TestToolExecStartBash(t *testing.T) { + t.Parallel() + var buf bytes.Buffer + r := New(&buf) + r.ToolExecStart("bash", map[string]any{"command": "echo hi"}) + got := buf.String() + if !strings.Contains(got, "⚡ bash: echo hi") { + t.Errorf("expected ⚡ bash: echo hi, got %q", got) + } + if strings.Contains(got, `"command"`) { + t.Errorf("bash should render literal command, not JSON: %q", got) + } +} + +func TestToolExecStartNonBash(t *testing.T) { + t.Parallel() + var buf bytes.Buffer + r := New(&buf) + r.ToolExecStart("http", map[string]any{"url": "https://example.com"}) + got := buf.String() + if !strings.Contains(got, "⚡ http ") { + t.Errorf("expected ⚡ http prefix, got %q", got) + } + if !strings.Contains(got, `"url":"https://example.com"`) { + t.Errorf("expected JSON args, got %q", got) + } +} + +func TestToolExecEndSuccess(t *testing.T) { + t.Parallel() + var buf bytes.Buffer + r := New(&buf) + r.ToolExecEnd("bash", false, " hello ") + got := buf.String() + if !strings.Contains(got, "✓ bash → hello") { + t.Errorf("expected ✓ marker and trimmed summary, got %q", got) + } +} + +func TestToolExecEndError(t *testing.T) { + t.Parallel() + var buf bytes.Buffer + r := New(&buf) + r.ToolExecEnd("bash", true, "boom") + got := buf.String() + if !strings.Contains(got, "✗ bash → boom") { + t.Errorf("expected ✗ marker, got %q", got) + } +} + +func TestToolExecEndTruncates(t *testing.T) { + t.Parallel() + var buf bytes.Buffer + r := New(&buf) + long := strings.Repeat("a", 500) + r.ToolExecEnd("bash", false, long) + got := buf.String() + if !strings.Contains(got, "...") { + t.Errorf("expected truncation marker, got %q", got) + } + if strings.Count(got, "a") > summaryMaxLen+10 { + t.Errorf("summary not truncated; len(a)=%d in %q", strings.Count(got, "a"), got) + } +} + +func TestMarshalJSONNoHTMLEscape(t *testing.T) { + t.Parallel() + // With HTML escaping disabled, raw &, <, > survive verbatim. + // If escaping were enabled they would become &, <, > + // and "&&" / "" would not appear as substrings. + got := marshalJSON(map[string]any{"cmd": "a && b "}) + for _, want := range []string{"&&", ""} { + if !strings.Contains(got, want) { + t.Errorf("expected raw %q in %q", want, got) + } + } +} + +func TestEnsureNewlineMidLine(t *testing.T) { + t.Parallel() + var buf bytes.Buffer + r := New(&buf) + r.Text("hello") // mid-line + r.ToolCall("ping", nil) // must inject \n first + got := buf.String() + if !strings.HasPrefix(got, "hello\n") { + t.Errorf("expected newline injected before tool call, got %q", got) + } +} + +func TestTextAfterToolCallNoExtraNewline(t *testing.T) { + t.Parallel() + var buf bytes.Buffer + r := New(&buf) + r.ToolCall("ping", nil) // ends with \n + r.Text("done") + got := buf.String() + if strings.Contains(got, "\n\n") { + t.Errorf("unexpected blank line: %q", got) + } +} + +func TestTurnAndAgentEndOnFreshLine(t *testing.T) { + t.Parallel() + var buf bytes.Buffer + r := New(&buf) + r.Text("mid") + r.TurnEnd() + if !strings.HasSuffix(buf.String(), "\n") { + t.Errorf("TurnEnd should end on newline: %q", buf.String()) + } + r.Text("more") + r.AgentEnd() + if !strings.HasSuffix(buf.String(), "\n") { + t.Errorf("AgentEnd should end on newline: %q", buf.String()) + } +} + +func TestTruncate(t *testing.T) { + t.Parallel() + cases := []struct { + in string + maxLen int + want string + }{ + {"abc", 10, "abc"}, + {"abcdef", 3, "abc..."}, + {"", 5, ""}, + } + for _, c := range cases { + if got := truncate(c.in, c.maxLen); got != c.want { + t.Errorf("truncate(%q, %d) = %q, want %q", c.in, c.maxLen, got, c.want) + } + } +} diff --git a/main.go b/main.go index d9b7169..0cfe8fa 100644 --- a/main.go +++ b/main.go @@ -1,283 +1,24 @@ +// Command pi-stream is a streaming proxy for `pi --mode rpc`. It launches +// pi as a subprocess, forwards a single user prompt over its JSON-RPC +// stdin, and renders the event stream from pi's stdout as styled +// terminal output (thinking in dim italic, tool calls, tool execution +// results, plain text). +// +// All real work lives under internal/. This file only wires up signal +// handling and process exit. package main import ( - "bufio" - "bytes" - "encoding/json" - "flag" - "fmt" - "io" - "log" + "context" "os" - "os/exec" - "strings" -) - -type state int + "os/signal" + "syscall" -const ( - idle state = iota // no output yet - think // printing thinking (dim/italic) - text // printing normal text - tool // printing tool call/execution + "github.com/crazy-goat/pi-stream/internal/cli" ) -func (s state) String() string { - switch s { - case idle: - return "idle" - case think: - return "think" - case text: - return "text" - case tool: - return "tool" - } - return "?" -} - func main() { - model := flag.String("model", "", "Model to use (e.g. go-extra/deepseek-v4-flash)") - thinking := flag.String("thinking", "high", "Thinking level") - tools := flag.String("t", "", "Comma-separated tool allowlist") - session := flag.String("session", "", "pi session file path (shared between steps for context)") - flag.Parse() - - args := flag.Args() - if len(args) == 0 { - fmt.Fprintln(os.Stderr, "usage: pi-stream [--model m] [--thinking l] [-t tools] ") - os.Exit(1) - } - prompt := strings.Join(args, " ") - - piArgs := []string{"--mode", "rpc"} - if *session != "" { - piArgs = append(piArgs, "--session", *session) - } else { - piArgs = append(piArgs, "--no-session") - } - if *model != "" { - piArgs = append(piArgs, "--model", *model) - } - if *thinking != "" { - piArgs = append(piArgs, "--thinking", *thinking) - } - if *tools != "" { - piArgs = append(piArgs, "-t", *tools) - } - - cmd := exec.Command("pi", piArgs...) - stdin, err := cmd.StdinPipe() - if err != nil { - log.Fatalf("stdin pipe: %v", err) - } - stdout, err := cmd.StdoutPipe() - if err != nil { - log.Fatalf("stdout pipe: %v", err) - } - stderr, err := cmd.StderrPipe() - if err != nil { - log.Fatalf("stderr pipe: %v", err) - } - - if err := cmd.Start(); err != nil { - log.Fatalf("start pi: %v", err) - } - - go func() { - _, _ = io.Copy(os.Stderr, stderr) - }() - - // Send prompt - enc := json.NewEncoder(stdin) - if err := enc.Encode(map[string]interface{}{ - "type": "prompt", - "message": prompt, - }); err != nil { - log.Fatalf("send prompt: %v", err) - } - - scanner := bufio.NewScanner(stdout) - scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024) - - st := idle - bol := true // at beginning of line - - // Print s to stdout, track line position - emit := func(s string) { - fmt.Print(s) - bol = strings.HasSuffix(s, "\n") - } - - // Ensure we start on a new line if we're mid-line - ensureNewline := func() { - if !bol { - emit("\n") - } - } - - // JSON without HTML-escaping (&, <, > stay as-is) - marshalJSON := func(v interface{}) string { - var buf bytes.Buffer - enc := json.NewEncoder(&buf) - enc.SetEscapeHTML(false) - _ = enc.Encode(v) - return strings.TrimRight(buf.String(), "\n") - } - - for scanner.Scan() { - line := scanner.Text() - var event map[string]interface{} - if err := json.Unmarshal([]byte(line), &event); err != nil { - continue - } - - typ, _ := event["type"].(string) - switch typ { - case "response": - success, _ := event["success"].(bool) - if !success { - errMsg, _ := event["error"].(string) - fmt.Fprintf(os.Stderr, "pi error: %s\n", errMsg) - _ = cmd.Process.Kill() - os.Exit(1) - } - - case "message_update": - msgEvent, ok := event["assistantMessageEvent"].(map[string]interface{}) - if !ok { - continue - } - dt, _ := msgEvent["type"].(string) - - switch dt { - // ── Thinking ── - case "thinking_start", "think_start": - if st != idle && st != think { - ensureNewline() - } - st = think - - case "thinking_delta", "thinks_delta": - delta, _ := msgEvent["delta"].(string) - if st != think { - ensureNewline() - } - st = think - fmt.Printf("\033[2;3m%s\033[0m", delta) - bol = false - - // ── Text ── - case "text_start": - if st == think { - ensureNewline() - } - st = text - - case "text_delta": - delta, _ := msgEvent["delta"].(string) - if st == think { - ensureNewline() - } - st = text - emit(delta) - - // ── Tool call (LLM decides to use a tool) ── - case "toolcall_start": - ensureNewline() - st = tool - bol = true - - case "toolcall_delta": - // skip partial args - - case "toolcall_end": - tc, _ := msgEvent["toolCall"].(map[string]interface{}) - name, _ := tc["name"].(string) - args, _ := tc["arguments"].(map[string]interface{}) - if len(args) > 0 { - fmt.Printf("\033[1;34m🔧 %s %s\033[0m\n", name, marshalJSON(args)) - } else { - fmt.Printf("\033[1;34m🔧 %s\033[0m\n", name) - } - bol = true - st = tool - } - - // ── Tool execution (actual running of the tool) ── - case "tool_execution_start": - toolName, _ := event["toolName"].(string) - args, _ := event["args"].(map[string]interface{}) - if cmd, ok := args["command"].(string); ok { - fmt.Printf("\033[1;33m⚡ %s: %s\033[0m\n", toolName, cmd) - } else { - fmt.Printf("\033[1;33m⚡ %s %s\033[0m\n", toolName, marshalJSON(args)) - } - st = tool - bol = true - - case "tool_execution_update": - // streaming output, could show but skip for brevity - - case "tool_execution_end": - toolName, _ := event["toolName"].(string) - isErr, _ := event["isError"].(bool) - result, _ := event["result"].(map[string]interface{}) - content, _ := result["content"].([]interface{}) - var summary string - for _, c := range content { - if m, ok := c.(map[string]interface{}); ok { - if text, ok := m["text"].(string); ok { - summary += text - } - } - } - summary = strings.TrimSpace(summary) - if len(summary) > 200 { - summary = summary[:200] + "..." - } - status := "✓" - if isErr { - status = "✗" - } - fmt.Printf("\033[2m %s %s → %s\033[0m\n", status, toolName, summary) - bol = true - - // ── Turn boundaries ── - case "turn_start": - if st != idle { - ensureNewline() - } - case "turn_end": - ensureNewline() - - // ── Completion ── - case "agent_end": - ensureNewline() - stdin.Close() - _ = cmd.Wait() - os.Exit(0) - - case "error": - errMsg := "" - if msgEvent, ok := event["assistantMessageEvent"].(map[string]interface{}); ok { - errMsg, _ = msgEvent["error"].(string) - } - if errMsg == "" { - errMsg, _ = event["error"].(string) - } - fmt.Fprintf(os.Stderr, "pi error: %s\n", errMsg) - _ = cmd.Process.Kill() - os.Exit(1) - } - } - - if err := scanner.Err(); err != nil { - fmt.Fprintf(os.Stderr, "read error: %v\n", err) - os.Exit(1) - } - - stdin.Close() - _ = cmd.Wait() - os.Exit(0) + ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) + defer stop() + os.Exit(cli.Run(ctx, os.Args[1:], os.Stdout, os.Stderr)) }