diff --git a/README.md b/README.md index 6854dca..3f6451a 100644 --- a/README.md +++ b/README.md @@ -2,11 +2,11 @@ CLI for [Arc](https://github.com/Basekick-Labs/arc) — operator-facing client for Arc time-series databases. -> **Status:** v0.1.0-dev (PR1 scaffold). Manages connection profiles. `query` / `write` / `import` / `db` / `auth` / `cluster` subcommands ship in follow-up PRs. +> **Status:** v0.2.0-dev (PR2). Manages connection profiles, runs SQL queries, and writes line protocol with table / JSON / CSV / Arrow IPC output. `import` / `db` / `auth` / `cluster` subcommands ship in follow-up PRs. ## Why -Today operating Arc means hand-crafting `curl` calls: copying the bootstrap token from a stderr banner, building JSON query bodies, remembering header names like `x-arc-database`, and decoding column-major responses by eye. `arcctl` replaces that with a familiar CLI workflow modeled on `influx`, `kubectl`, and `clickhouse-client`. +Today operating Arc means hand-crafting `curl` calls: copying the bootstrap token from a stderr banner, building JSON query bodies, remembering header names like `x-arc-database`, and decoding `{"columns":[...],"data":[...]}` responses by eye. `arcctl` replaces that with a familiar CLI workflow modeled on `influx`, `kubectl`, and `clickhouse-client`. ## Install @@ -79,18 +79,71 @@ If none are set, commands fail with a clear "no active connection" error. Honors `ARCCTL_CONFIG` env var for test/CI overrides; otherwise `~/.arcctl/config.toml`. 
+## Querying + +```bash +# Pretty table (default) +arcctl query "SELECT host, value FROM cpu ORDER BY value LIMIT 10" + +# Override the database for one call +arcctl query --database metrics "SELECT count(*) FROM cpu" + +# Read SQL from a file +arcctl query -f reports/p99.sql + +# Pipe SQL from another command +echo "SELECT 1" | arcctl query + +# Machine-parseable output +arcctl query "SELECT * FROM cpu" -o json | jq '.data[0]' +arcctl query "SELECT * FROM cpu" -o csv > out.csv + +# Arrow IPC stream — feed it to pyarrow / duckdb / polars +arcctl query "SELECT * FROM cpu" -o arrow | duckdb -c "SELECT * FROM read_arrow('/dev/stdin')" +``` + +The output formats: + +- `-o table` (default) — pretty-printed bordered table; honors `--no-header` and `--limit N` +- `-o json` — the raw `{"columns":[...],"data":[...]}` response, jq-friendly +- `-o csv` — RFC 4180 with a header row by default +- `-o arrow` — binary Arrow IPC stream on stdout; server-side execution time goes to stderr + +## Writing + +```bash +# Stdin pipe (most common in CI / log forwarders) +echo "cpu,host=server-1 value=42.5 $(date +%s)000000000" | arcctl write + +# From a file +arcctl write -f payload.lp --database metrics --precision ms + +# Explicit precision (default is nanoseconds, matching the server) +echo "cpu v=1 1700000000" | arcctl write --precision s +``` + +`--precision` accepts `ns`, `us`, `ms`, or `s` (anything else is rejected client-side before the request goes out). The body is streamed end-to-end — `cat huge.lp | arcctl write` never buffers the whole payload in memory. + +## TLS + +For HTTPS endpoints, certificate verification is on by default. To skip verification (lab / self-signed certs only), use either: + +- `--insecure` on a single command, or +- `insecure_tls = true` in the connection profile (set once via `arcctl config create --insecure`) + +When verification is skipped, a `WARNING:` line is printed to stderr. 
The flag is a no-op on `http://` endpoints and the warning is suppressed. + ## Roadmap This repo is being built in [phased PRs](https://github.com/Basekick-Labs/arcctl/pulls): -- **PR1** (this) — scaffold, `config` subcommand tree, multi-connection store -- **PR2** — `arcctl query`, `arcctl write` +- ~~**PR1** — scaffold, `config` subcommand tree, multi-connection store~~ ✅ shipped +- ~~**PR2** — `arcctl query`, `arcctl write`, output formats: table/json/csv/arrow~~ ✅ shipped - **PR3** — `arcctl db {list,create,drop,show}`, `arcctl measurement list` - **PR4** — `arcctl import {csv,lp,parquet,msgpack}` - **PR5** — `arcctl auth {token,whoami}` - **PR6** — `arcctl cluster {status,nodes}`, `arcctl compaction`, `arcctl retention` -- **PR7** — `-o csv` and `-o arrow` output formats -- **PR8** — release workflow + Homebrew tap + multi-arch Docker, cut v1.0.0 +- **PR7** — release workflow + Homebrew tap + multi-arch Docker, cut v1.0.0 Target: arcctl 1.x speaks to Arc 26.06+. diff --git a/internal/client/arrow.go b/internal/client/arrow.go new file mode 100644 index 0000000..6f56bb7 --- /dev/null +++ b/internal/client/arrow.go @@ -0,0 +1,101 @@ +package client + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "strconv" +) + +// ArrowResponse wraps the raw Arrow IPC stream from /api/v1/query/arrow. +// +// Wire contract (verified against arc/internal/api/query_arrow.go): +// - On error: HTTP non-2xx + JSON `{"success": false, "error": "..."}`. +// - On success: HTTP 200 + `Content-Type: application/vnd.apache.arrow.stream`, +// body is a streaming Arrow IPC payload. Server-side execution time +// is emitted as the `Arc-Execution-Time-Ms` HTTP trailer, available +// only after the body has been read to EOF. +// +// The caller is responsible for Close()-ing this response (which closes +// the underlying HTTP body); call ExecutionTimeMs() only after reading +// Body to EOF. +type ArrowResponse struct { + // Body is the Arrow IPC stream. 
Caller reads/copies as needed. + Body io.ReadCloser + + // resp is held so we can read trailers after the body's drained. + resp *http.Response +} + +// QueryArrow runs a SQL query and returns the response wrapping the +// Arrow IPC stream. The caller MUST Close() the returned response. +// +// On a 4xx/5xx response, the error is decoded from the JSON body +// (same shape as QueryJSON) and the response body is fully consumed +// + closed before the function returns — callers don't need to clean +// up on the error path. +func (c *Client) QueryArrow(ctx context.Context, sql, database string) (*ArrowResponse, error) { + body, err := json.Marshal(queryRequest{SQL: sql}) + if err != nil { + return nil, fmt.Errorf("encode query: %w", err) + } + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.cfg.Endpoint+"/api/v1/query/arrow", bytes.NewReader(body)) + if err != nil { + return nil, fmt.Errorf("build request: %w", err) + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Accept", "application/vnd.apache.arrow.stream") + c.setCommonHeaders(req, database) + + resp, err := c.http.Do(req) + if err != nil { + return nil, fmt.Errorf("arrow query: %w", err) + } + + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + // Drain + close the error body ourselves so the caller's + // error-path doesn't need a defer. + respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 64<<10)) + _ = resp.Body.Close() + return nil, decodeServerError(resp.StatusCode, respBody) + } + + return &ArrowResponse{ + Body: resp.Body, + resp: resp, + }, nil +} + +// Close closes the underlying HTTP body. nil-safe (returns nil if the +// response or its Body is nil). Callers should defer this once and not +// double-close; while net/http's bodyEOFSignal happens to be idempotent +// today, the io.ReadCloser contract does not require it. 
+func (a *ArrowResponse) Close() error { + if a == nil || a.Body == nil { + return nil + } + return a.Body.Close() +} + +// ExecutionTimeMs returns the server's execution time from the +// `Arc-Execution-Time-Ms` HTTP trailer. Only valid after Body has been +// read to EOF — earlier calls return (0, false). Calling before EOF is +// not an error, just a missed read. +func (a *ArrowResponse) ExecutionTimeMs() (int64, bool) { + if a == nil || a.resp == nil { + return 0, false + } + v := a.resp.Trailer.Get(ArrowExecutionTimeTrailer) + if v == "" { + return 0, false + } + n, err := strconv.ParseInt(v, 10, 64) + if err != nil { + return 0, false + } + return n, true +} diff --git a/internal/client/client.go b/internal/client/client.go new file mode 100644 index 0000000..5a4a1fd --- /dev/null +++ b/internal/client/client.go @@ -0,0 +1,126 @@ +// Package client is the HTTP wire-format adapter for the Arc server. +// +// Arc speaks JSON-over-HTTP for queries (row-major response, see +// QueryResult), Arrow IPC streaming for large query results, and +// line-protocol POST for writes. This package wraps those endpoints +// so the command layer doesn't have to know wire-format details. +// +// Conventions: +// - The `x-arc-database` header selects the target database for +// every request (query + write). If empty the server defaults to +// "default". +// - Authorization is always `Bearer <token>`. +// - Errors come back as JSON `{"success": false, "error": "..."}` +// on the query endpoints and `{"error": "..."}` on writes. The +// query and write methods in this package normalise both into a Go error. +// +// The client deliberately uses a configured `*http.Client` rather than +// `http.DefaultClient` so timeouts and TLS verification are explicit. +package client + +import ( + "crypto/tls" + "fmt" + "net" + "net/http" + "strings" + "time" +) + +// HeaderDatabase is the request header Arc uses to select the target +// database for both query and write endpoints. 
+const HeaderDatabase = "x-arc-database" + +// ArrowExecutionTimeTrailer is the HTTP response trailer Arc emits at +// the end of an Arrow IPC stream carrying server-side execution time +// in milliseconds. Clients must read the response body to EOF before +// reading this trailer (HTTP/1.1 trailer semantics). +const ArrowExecutionTimeTrailer = "Arc-Execution-Time-Ms" + +// Config holds the per-client tuning knobs. Endpoint and Token are +// required; everything else has a sensible default. +type Config struct { + // Endpoint is the Arc HTTP base URL, e.g. "http://localhost:8000" + // (no trailing slash; we add the API paths ourselves). + Endpoint string + + // Token is the Bearer token from Arc's first-run banner. + Token string + + // Database is the default database name to send via x-arc-database + // when the per-call override is empty. May itself be empty, in + // which case Arc defaults to "default" server-side. + Database string + + // InsecureTLS skips certificate verification. Off by default. + // When true, the caller is responsible for warning the user. + InsecureTLS bool + + // Timeout is the per-request HTTP timeout. Default 60s. + // Writes and small queries finish well under this; for large + // `-o arrow` streams we override on the request. + Timeout time.Duration +} + +// Client is a stateful adapter around *http.Client + auth headers. +// One Client per Arc cluster; safe for concurrent use. +type Client struct { + cfg Config + http *http.Client +} + +// New builds a Client. Returns an error only on missing required +// config; transport construction never fails. +func New(cfg Config) (*Client, error) { + if cfg.Endpoint == "" { + return nil, fmt.Errorf("client: endpoint required") + } + if cfg.Token == "" { + return nil, fmt.Errorf("client: token required") + } + cfg.Endpoint = strings.TrimRight(cfg.Endpoint, "/") + if cfg.Timeout == 0 { + cfg.Timeout = 60 * time.Second + } + + // Clone DefaultTransport so we don't mutate the package global. 
+ // We need to set TLSClientConfig per-Client (InsecureTLS varies). + transport := http.DefaultTransport.(*http.Transport).Clone() + transport.TLSClientConfig = &tls.Config{InsecureSkipVerify: cfg.InsecureTLS} //nolint:gosec // opt-in via --insecure / insecure_tls + transport.DialContext = (&net.Dialer{ + Timeout: 10 * time.Second, + KeepAlive: 30 * time.Second, + }).DialContext + + return &Client{ + cfg: cfg, + http: &http.Client{ + Transport: transport, + Timeout: cfg.Timeout, + }, + }, nil +} + +// Endpoint returns the base URL the client is configured against. +// Useful for `-v` verbose output. +func (c *Client) Endpoint() string { return c.cfg.Endpoint } + +// resolveDatabase picks the per-call override if set, else the +// client's default. Both can be empty (Arc falls back to "default"). +func (c *Client) resolveDatabase(override string) string { + if override != "" { + return override + } + return c.cfg.Database +} + +// setCommonHeaders writes Authorization, User-Agent, and (optional) +// x-arc-database onto a *http.Request. Used by every query and write +// method so the headers are set in exactly one place. +func (c *Client) setCommonHeaders(req *http.Request, database string) { + req.Header.Set("Authorization", "Bearer "+c.cfg.Token) + if db := c.resolveDatabase(database); db != "" { + req.Header.Set(HeaderDatabase, db) + } + req.Header.Set("User-Agent", "arcctl") +} diff --git a/internal/client/client_test.go b/internal/client/client_test.go new file mode 100644 index 0000000..3873a5e --- /dev/null +++ b/internal/client/client_test.go @@ -0,0 +1,358 @@ +package client + +import ( + "context" + "io" + "net/http" + "net/http/httptest" + "strings" + "testing" +) + +// freshClient returns a Client pointed at the given test server. 
+func freshClient(t *testing.T, srv *httptest.Server, db string) *Client { + t.Helper() + c, err := New(Config{ + Endpoint: srv.URL, + Token: "test-token", + Database: db, + }) + if err != nil { + t.Fatalf("New: %v", err) + } + return c +} + +func TestNew_RequiresEndpointAndToken(t *testing.T) { + cases := []struct { + name string + cfg Config + }{ + {"no endpoint", Config{Token: "x"}}, + {"no token", Config{Endpoint: "http://x"}}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + if _, err := New(tc.cfg); err == nil { + t.Fatal("expected error, got nil") + } + }) + } +} + +func TestNew_TrimsTrailingSlash(t *testing.T) { + c, err := New(Config{Endpoint: "http://localhost:8000/", Token: "t"}) + if err != nil { + t.Fatalf("New: %v", err) + } + if got := c.Endpoint(); got != "http://localhost:8000" { + t.Fatalf("trailing slash not trimmed: got %q", got) + } +} + +func TestQueryJSON_SetsHeaders(t *testing.T) { + var ( + gotAuth string + gotDB string + gotPath string + gotMethod string + gotCT string + ) + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + gotAuth = r.Header.Get("Authorization") + gotDB = r.Header.Get("x-arc-database") + gotPath = r.URL.Path + gotMethod = r.Method + gotCT = r.Header.Get("Content-Type") + w.Header().Set("Content-Type", "application/json") + _, _ = io.WriteString(w, `{"columns":["n"],"data":[[1]],"row_count":1,"execution_time_ms":1.5}`) + })) + defer srv.Close() + + c := freshClient(t, srv, "metrics") + qr, err := c.QueryJSON(context.Background(), "SELECT 1", "") + if err != nil { + t.Fatalf("QueryJSON: %v", err) + } + if gotAuth != "Bearer test-token" { + t.Errorf("Authorization = %q, want Bearer test-token", gotAuth) + } + if gotDB != "metrics" { + t.Errorf("x-arc-database = %q, want metrics", gotDB) + } + if gotPath != "/api/v1/query" { + t.Errorf("path = %q, want /api/v1/query", gotPath) + } + if gotMethod != "POST" { + t.Errorf("method = %q, want POST", gotMethod) + } + 
if gotCT != "application/json" { + t.Errorf("Content-Type = %q, want application/json", gotCT) + } + if qr.RowCount != 1 || len(qr.Columns) != 1 || qr.Columns[0] != "n" { + t.Errorf("decoded result wrong: %+v", qr) + } +} + +func TestQueryJSON_DatabaseOverride(t *testing.T) { + // Client default = "metrics", per-call override = "logs". + // Per-call wins. + var got string + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + got = r.Header.Get("x-arc-database") + _, _ = io.WriteString(w, `{"columns":[],"data":[],"row_count":0}`) + })) + defer srv.Close() + + c := freshClient(t, srv, "metrics") + if _, err := c.QueryJSON(context.Background(), "SELECT 1", "logs"); err != nil { + t.Fatalf("QueryJSON: %v", err) + } + if got != "logs" { + t.Errorf("override ignored: got %q, want logs", got) + } +} + +func TestQueryJSON_NoDatabaseHeaderWhenBothEmpty(t *testing.T) { + // Empty client default + empty per-call -> header should be absent + // so Arc applies its server-side default ("default"). 
+ gotSet := false + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _, gotSet = r.Header[http.CanonicalHeaderKey("x-arc-database")] + _, _ = io.WriteString(w, `{"columns":[],"data":[],"row_count":0}`) + })) + defer srv.Close() + + c := freshClient(t, srv, "") + if _, err := c.QueryJSON(context.Background(), "SELECT 1", ""); err != nil { + t.Fatalf("QueryJSON: %v", err) + } + if gotSet { + t.Error("x-arc-database header sent even though both client + per-call were empty") + } +} + +func TestQueryJSON_DecodesArcErrorBody(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusBadRequest) + _, _ = io.WriteString(w, `{"success":false,"error":"table not found"}`) + })) + defer srv.Close() + + c := freshClient(t, srv, "") + _, err := c.QueryJSON(context.Background(), "SELECT 1", "") + if err == nil { + t.Fatal("expected error, got nil") + } + if !strings.Contains(err.Error(), "table not found") { + t.Errorf("error %q does not contain server message", err) + } + if !strings.Contains(err.Error(), "HTTP 400") { + t.Errorf("error %q does not include HTTP status", err) + } +} + +func TestQueryJSON_DecodesNonJSONErrorBody(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusBadGateway) + _, _ = io.WriteString(w, `nginx upstream gone`) + })) + defer srv.Close() + + c := freshClient(t, srv, "") + _, err := c.QueryJSON(context.Background(), "SELECT 1", "") + if err == nil { + t.Fatal("expected error, got nil") + } + if !strings.Contains(err.Error(), "HTTP 502") { + t.Errorf("error %q lacks status", err) + } + if !strings.Contains(err.Error(), "nginx") { + t.Errorf("error %q does not include raw body", err) + } +} + +func TestQueryArrow_StreamsBody(t *testing.T) { + // Verifies body streaming AND the trailer-after-EOF contract. 
We + // use the http.TrailerPrefix sentinel (the documented way for an + // http.Handler to emit a trailer that wasn't pre-declared in the + // response head); that's the same surface area Arc's production + // handler uses via fasthttp's respHeader.AddTrailer + + // respHeader.Set after SetBodyStreamWriter completes. + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/api/v1/query/arrow" { + t.Errorf("unexpected path %q", r.URL.Path) + } + w.Header().Set("Content-Type", "application/vnd.apache.arrow.stream") + w.Header().Set(http.TrailerPrefix+ArrowExecutionTimeTrailer, "42") + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte("ARROW-IPC-PRETEND")) + })) + defer srv.Close() + + c := freshClient(t, srv, "") + resp, err := c.QueryArrow(context.Background(), "SELECT 1", "") + if err != nil { + t.Fatalf("QueryArrow: %v", err) + } + defer resp.Close() + + // Before EOF: trailer is not yet readable on net/http response. + // We don't assert this — production callers always io.Copy first. 
+ body, err := io.ReadAll(resp.Body) + if err != nil { + t.Fatalf("read body: %v", err) + } + if string(body) != "ARROW-IPC-PRETEND" { + t.Errorf("body = %q", body) + } + ms, ok := resp.ExecutionTimeMs() + if !ok { + t.Fatal("trailer not read after body EOF") + } + if ms != 42 { + t.Errorf("trailer = %d, want 42", ms) + } +} + +func TestQueryArrow_DecodesErrorBeforeStream(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusForbidden) + _, _ = io.WriteString(w, `{"success":false,"error":"forbidden by RBAC"}`) + })) + defer srv.Close() + + c := freshClient(t, srv, "") + resp, err := c.QueryArrow(context.Background(), "SELECT 1", "") + if err == nil { + _ = resp.Close() + t.Fatal("expected error, got nil") + } + if !strings.Contains(err.Error(), "forbidden by RBAC") { + t.Errorf("error %q lacks server message", err) + } +} + +func TestWriteLineProtocol_Success(t *testing.T) { + var ( + gotPath string + gotCT string + gotDB string + gotPrec string + gotBodyStr string + ) + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + gotPath = r.URL.Path + gotCT = r.Header.Get("Content-Type") + gotDB = r.Header.Get("x-arc-database") + gotPrec = r.URL.Query().Get("precision") + b, _ := io.ReadAll(r.Body) + gotBodyStr = string(b) + w.WriteHeader(http.StatusNoContent) + })) + defer srv.Close() + + c := freshClient(t, srv, "metrics") + err := c.WriteLineProtocol( + context.Background(), + strings.NewReader("cpu,host=a v=1 1234"), + "", + PrecisionMS, + ) + if err != nil { + t.Fatalf("WriteLineProtocol: %v", err) + } + if gotPath != "/api/v1/write/line-protocol" { + t.Errorf("path = %q", gotPath) + } + if gotCT != "text/plain" { + t.Errorf("content-type = %q", gotCT) + } + if gotDB != "metrics" { + t.Errorf("x-arc-database = %q", gotDB) + } + if gotPrec != "ms" { + t.Errorf("precision query = %q", gotPrec) + } + if gotBodyStr != "cpu,host=a v=1 1234" { + 
t.Errorf("body = %q", gotBodyStr) + } +} + +func TestWriteLineProtocol_PrecisionValidation(t *testing.T) { + // No server needed — validation runs before the request. + c := freshClient(t, httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {})), "") + err := c.WriteLineProtocol(context.Background(), strings.NewReader(""), "", Precision("furlong")) + if err == nil || !strings.Contains(err.Error(), "invalid precision") { + t.Errorf("expected invalid precision error, got %v", err) + } +} + +func TestWriteLineProtocol_NoPrecisionQueryWhenEmpty(t *testing.T) { + gotRawQuery := "sentinel" + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + gotRawQuery = r.URL.RawQuery + w.WriteHeader(http.StatusNoContent) + })) + defer srv.Close() + + c := freshClient(t, srv, "metrics") + if err := c.WriteLineProtocol(context.Background(), strings.NewReader("x v=1"), "", ""); err != nil { + t.Fatalf("WriteLineProtocol: %v", err) + } + if gotRawQuery != "" { + t.Errorf("expected empty query string when precision is empty, got %q", gotRawQuery) + } +} + +func TestWriteLineProtocol_DecodesErrorBody(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusBadRequest) + _, _ = io.WriteString(w, `{"error":"malformed line at offset 17"}`) + })) + defer srv.Close() + + c := freshClient(t, srv, "") + err := c.WriteLineProtocol(context.Background(), strings.NewReader("garbage"), "", "") + if err == nil { + t.Fatal("expected error, got nil") + } + if !strings.Contains(err.Error(), "malformed line") { + t.Errorf("error %q lacks server message", err) + } +} + +func TestValidPrecision(t *testing.T) { + for _, p := range []string{"", "ns", "us", "ms", "s"} { + if !ValidPrecision(p) { + t.Errorf("ValidPrecision(%q) = false, want true", p) + } + } + for _, p := range []string{"furlong", "NS", "nanoseconds", " ns"} { + if ValidPrecision(p) { + 
t.Errorf("ValidPrecision(%q) = true, want false", p) + } + } +} + +func TestQueryResult_RowAt(t *testing.T) { + // Row-major: Data[i] is the i-th row. + qr := &QueryResult{ + Columns: []string{"a", "b"}, + Data: [][]any{ + {1.0, "x"}, + {2.0, "y"}, + {3.0, "z"}, + }, + RowCount: 3, + } + if got := qr.RowAt(1); got[0] != 2.0 || got[1] != "y" { + t.Errorf("RowAt(1) = %v", got) + } + if got := qr.RowAt(-1); got != nil { + t.Errorf("RowAt(-1) = %v, want nil", got) + } + if got := qr.RowAt(3); got != nil { + t.Errorf("RowAt(3) = %v, want nil", got) + } +} diff --git a/internal/client/query.go b/internal/client/query.go new file mode 100644 index 0000000..4791b3e --- /dev/null +++ b/internal/client/query.go @@ -0,0 +1,127 @@ +package client + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" +) + +// QueryResult is the decoded JSON response from /api/v1/query. +// +// The JSON endpoint returns ROW-MAJOR data: Data[i] is the i-th row, +// a slice of length len(Columns) holding that row's cells. +// (The msgpack endpoint is columnar; the arrow endpoint is binary IPC. +// This shape is JSON-only — see arc/internal/api/query_msgpack_response.go). +type QueryResult struct { + // Columns is the list of column names in display order. + Columns []string `json:"columns"` + + // Data is row-major: Data[rowIdx] is one row's cells, in the same + // order as Columns. Each cell is whatever JSON decoded into — + // typically string, float64, bool, nil, or json.Number depending + // on the column's source type. + Data [][]any `json:"data"` + + // RowCount is the number of rows. The server sets this as + // len(Data); we surface it explicitly to handle the empty case + // (Data may be `null` over the wire when no rows match). + RowCount int `json:"row_count"` + + // ExecutionTimeMs is the server-side execution time. + ExecutionTimeMs float64 `json:"execution_time_ms"` +} + +// queryRequest is the on-the-wire body shape for /api/v1/query. 
+type queryRequest struct { + SQL string `json:"sql"` +} + +// errorResponse is the JSON body Arc returns on a failed query. +// `Error` is always populated; `Success` is `false` (but we don't +// rely on that — we key off the HTTP status code). +type errorResponse struct { + Success bool `json:"success"` + Error string `json:"error"` +} + +// QueryJSON runs a SQL query against the Arc server and returns the +// row-major result. The database arg overrides the client's +// default (empty == use client default == eventually Arc's "default"). +// +// Errors map as follows: +// - Network / timeout failures: returned as-is, wrapped. +// - 2xx but malformed body: returned as a parse error. +// - Non-2xx with a JSON error body: returned as `arc: <error> (HTTP <status>)`. +// - Non-2xx with a non-JSON body (e.g. plain-text 502 from a proxy): +// returned as `arc: HTTP <status>: <body>`. +func (c *Client) QueryJSON(ctx context.Context, sql, database string) (*QueryResult, error) { + body, err := json.Marshal(queryRequest{SQL: sql}) + if err != nil { + return nil, fmt.Errorf("encode query: %w", err) + } + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.cfg.Endpoint+"/api/v1/query", bytes.NewReader(body)) + if err != nil { + return nil, fmt.Errorf("build request: %w", err) + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Accept", "application/json") + c.setCommonHeaders(req, database) + + resp, err := c.http.Do(req) + if err != nil { + return nil, fmt.Errorf("query: %w", err) + } + defer resp.Body.Close() + + // Bound the body read so a misbehaving proxy can't OOM the CLI. + // 64 MiB is generous for a JSON query response (Arrow IPC is the + // path for "big" results); Arc enforces its own server-side cap. 
+ respBody, err := io.ReadAll(io.LimitReader(resp.Body, 64<<20)) + if err != nil { + return nil, fmt.Errorf("read response: %w", err) + } + + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return nil, decodeServerError(resp.StatusCode, respBody) + } + + var qr QueryResult + if err := json.Unmarshal(respBody, &qr); err != nil { + return nil, fmt.Errorf("decode query response: %w", err) + } + return &qr, nil +} + +// decodeServerError translates a non-2xx response into a Go error. +// Tries JSON first (Arc's normal failure shape), falls back to a +// truncated raw body for proxies / load balancers that don't speak +// Arc's protocol. +func decodeServerError(status int, body []byte) error { + var er errorResponse + if err := json.Unmarshal(body, &er); err == nil && er.Error != "" { + return fmt.Errorf("arc: %s (HTTP %d)", er.Error, status) + } + // Non-JSON or JSON-but-no-error-field. Truncate so a multi-MB + // HTML error page from nginx doesn't fill the terminal. + const maxRawLen = 512 + raw := string(body) + if len(raw) > maxRawLen { + raw = raw[:maxRawLen] + "...[truncated]" + } + return fmt.Errorf("arc: HTTP %d: %s", status, raw) +} + +// RowAt returns the i-th row of a QueryResult. Returns nil if i is +// out of range. Since Data is already row-major this is a direct +// index — the helper exists so renderers don't have to repeat the +// bounds check. +func (qr *QueryResult) RowAt(i int) []any { + if i < 0 || i >= len(qr.Data) { + return nil + } + return qr.Data[i] +} diff --git a/internal/client/write.go b/internal/client/write.go new file mode 100644 index 0000000..ac58fda --- /dev/null +++ b/internal/client/write.go @@ -0,0 +1,99 @@ +package client + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" +) + +// Precision is the timestamp precision sent on a line-protocol write +// via the `?precision=` query param. Empty maps to nanoseconds (Arc's +// default). 
+type Precision string
+
+const (
+	PrecisionNS Precision = "ns"
+	PrecisionUS Precision = "us"
+	PrecisionMS Precision = "ms"
+	PrecisionS  Precision = "s"
+)
+
+// ValidPrecision reports whether s is one of the four precisions Arc
+// accepts. Empty string is treated as valid (server applies its default).
+func ValidPrecision(s string) bool {
+	switch Precision(s) {
+	case "", PrecisionNS, PrecisionUS, PrecisionMS, PrecisionS:
+		return true
+	}
+	return false
+}
+
+// WriteLineProtocol POSTs raw line-protocol bytes to /api/v1/write/line-protocol.
+//
+// The body io.Reader is streamed — we never buffer it fully. Callers
+// passing an os.File or a stdin pipe get true streaming behaviour.
+//
+// `precision` may be empty (Arc applies nanosecond default).
+// `database` overrides the client's default database.
+//
+// Returns nil on any 2xx response (Arc currently answers 204 No
+// Content) and a decoded server error otherwise.
+func (c *Client) WriteLineProtocol(ctx context.Context, body io.Reader, database string, precision Precision) error {
+	if !ValidPrecision(string(precision)) {
+		return fmt.Errorf("invalid precision %q (must be one of ns, us, ms, s)", precision)
+	}
+
+	u, err := url.Parse(c.cfg.Endpoint + "/api/v1/write/line-protocol")
+	if err != nil {
+		return fmt.Errorf("build write URL: %w", err)
+	}
+	if precision != "" {
+		q := u.Query()
+		q.Set("precision", string(precision))
+		u.RawQuery = q.Encode()
+	}
+
+	req, err := http.NewRequestWithContext(ctx, http.MethodPost, u.String(), body)
+	if err != nil {
+		return fmt.Errorf("build write request: %w", err)
+	}
+	req.Header.Set("Content-Type", "text/plain")
+	c.setCommonHeaders(req, database)
+
+	resp, err := c.http.Do(req)
+	if err != nil {
+		return fmt.Errorf("write: %w", err)
+	}
+	defer resp.Body.Close()
+
+	if resp.StatusCode >= 200 && resp.StatusCode < 300 {
+		// Drain any small body Arc emits (currently 204 with empty
+		// body, but defensive against future changes).
+		_, _ = io.Copy(io.Discard, io.LimitReader(resp.Body, 4<<10))
+		return nil
+	}
+
+	// Best effort: a read error here still leaves whatever prefix of
+	// the body arrived, which decodeWriteError can report.
+	respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 64<<10))
+	return decodeWriteError(resp.StatusCode, respBody)
+}
+
+// decodeWriteError handles the write endpoint's error shape, which
+// differs from query: `{"error": "..."}` with no `success` field.
+// Falls back to the raw body (truncated to at most 512 bytes, cut on a
+// UTF-8 rune boundary) for non-JSON responses such as proxy error
+// pages, and to the standard HTTP status text when the body is empty.
+func decodeWriteError(status int, body []byte) error {
+	var er struct {
+		Error string `json:"error"`
+	}
+	if err := json.Unmarshal(body, &er); err == nil && er.Error != "" {
+		return fmt.Errorf("arc: %s (HTTP %d)", er.Error, status)
+	}
+	if len(body) == 0 {
+		// Nothing to quote from the server; avoid a dangling
+		// "arc: HTTP 502: " with no explanation after the colon.
+		return fmt.Errorf("arc: HTTP %d: %s", status, http.StatusText(status))
+	}
+	const maxRawLen = 512
+	raw := string(body)
+	if len(raw) > maxRawLen {
+		// Back up over UTF-8 continuation bytes (10xxxxxx) so the cut
+		// never lands inside a multi-byte rune. A rune has at most
+		// three continuation bytes, so three steps always suffice for
+		// valid UTF-8 and binary junk loses at most three bytes.
+		cut := maxRawLen
+		for maxRawLen-cut < 3 && raw[cut]&0xC0 == 0x80 {
+			cut--
+		}
+		raw = raw[:cut] + "...[truncated]"
+	}
+	return fmt.Errorf("arc: HTTP %d: %s", status, raw)
+}
diff --git a/internal/commands/query.go b/internal/commands/query.go
new file mode 100644
index 0000000..9a7f7c7
--- /dev/null
+++ b/internal/commands/query.go
@@ -0,0 +1,229 @@
+// Query subcommand: runs SQL against an Arc cluster.
+//
+// Three input modes:
+//  1. `arcctl query "SELECT ..."` (positional arg)
+//  2. `arcctl query -f file.sql` (file containing the SQL)
+//  3. `arcctl query` reading SQL from stdin (when neither arg nor -f
+//     is given and stdin is a pipe)
+//
+// Four output formats: table (default), json, csv, arrow. Arrow streams
+// raw IPC bytes to stdout; the other three flow through internal/output.
+package commands
+
+import (
+	"context"
+	"fmt"
+	"io"
+	"os"
+	"strings"
+	"time"
+
+	"github.com/spf13/cobra"
+
+	"github.com/basekick-labs/arcctl/internal/client"
+	"github.com/basekick-labs/arcctl/internal/config"
+	"github.com/basekick-labs/arcctl/internal/output"
+)
+
+func newQueryCmd() *cobra.Command {
+	var (
+		connectionName string
+		endpoint       string
+		token          string
+		database       string
+		insecure       bool
+		outputFormat   string
+		sqlFile        string
+		noHeader       bool
+		limit          int
+		timeout        time.Duration
+	)
+	c := &cobra.Command{
+		Use:   "query [SQL]",
+		Short: "Run a SQL query against an Arc cluster",
+		Long: `Run a SQL query against an Arc cluster.
+
+SQL input precedence: positional argument > --file > stdin (only when
+neither is supplied). Output defaults to a pretty table; -o json|csv
+emit machine-parseable formats; -o arrow streams binary Arrow IPC to
+stdout for piping into pyarrow / duckdb / etc.`,
+		Example: `  arcctl query "SELECT count(*) FROM cpu"
+  arcctl query --database metrics "SELECT * FROM cpu LIMIT 10"
+  arcctl query -f long_query.sql -o csv > out.csv
+  echo "SELECT 1" | arcctl query
+  arcctl query "SELECT * FROM cpu" -o arrow | python -c 'import pyarrow.ipc as ipc, sys; print(ipc.open_stream(sys.stdin.buffer).read_all())'`,
+		Args: cobra.MaximumNArgs(1),
+		RunE: func(cmd *cobra.Command, args []string) error {
+			if !output.ValidFormat(outputFormat) {
+				return fmt.Errorf("invalid --output %q (valid: table, json, csv, arrow)", outputFormat)
+			}
+			if timeout <= 0 {
+				return fmt.Errorf("--timeout must be > 0 (got %s)", timeout)
+			}
+			if limit < 0 {
+				return fmt.Errorf("--limit must be >= 0 (got %d)", limit)
+			}
+
+			sql, err := readSQL(cmd, args, sqlFile)
+			if err != nil {
+				return err
+			}
+			if strings.TrimSpace(sql) == "" {
+				return fmt.Errorf("empty SQL (pass a positional arg, -f, or pipe via stdin)")
+			}
+
+			cli, _, err := buildClient(cmd.ErrOrStderr(), connectionName, endpoint, token, insecure, timeout)
+			if err != nil {
+				return err
+			}
+
+			ctx, cancel := context.WithTimeout(cmd.Context(), timeout)
+			defer cancel()
+
+			if outputFormat == output.FormatArrow {
+				return runArrowQuery(ctx, cli, sql, database, cmd.OutOrStdout(), cmd.ErrOrStderr())
+			}
+
+			qr, err := cli.QueryJSON(ctx, sql, database)
+			if err != nil {
+				return err
+			}
+			return output.RenderQueryResult(cmd.OutOrStdout(), qr, outputFormat, noHeader, limit)
+		},
+	}
+	c.Flags().StringVarP(&connectionName, "connection", "c", "", "named connection (overrides active)")
+	c.Flags().StringVar(&endpoint, "endpoint", "", "ad-hoc Arc endpoint URL")
+	c.Flags().StringVar(&token, "token", "", "ad-hoc bearer token")
+	c.Flags().StringVar(&database, "database", "", "target database (defaults to connection's default_database)")
+	c.Flags().BoolVar(&insecure, "insecure", false, "skip TLS certificate verification (logs a warning to stderr)")
+	c.Flags().StringVarP(&outputFormat, "output", "o", output.FormatTable, "output format: table|json|csv|arrow")
+	c.Flags().StringVarP(&sqlFile, "file", "f", "", "read SQL from a file instead of the positional arg")
+	c.Flags().BoolVar(&noHeader, "no-header", false, "suppress column header row (table + csv)")
+	c.Flags().IntVar(&limit, "limit", 0, "cap output rows client-side (0 = no cap; server result is already bounded by the SQL)")
+	c.Flags().DurationVar(&timeout, "timeout", 60*time.Second, "per-request HTTP timeout")
+	return c
+}
+
+// readSQL resolves the SQL string from the three input modes. Order:
+//  1. Positional arg
+//  2. -f file
+//  3. Stdin (only if stdin is not a TTY, to avoid hanging on a missing
+//     arg)
+func readSQL(cmd *cobra.Command, args []string, sqlFile string) (string, error) {
+	if len(args) > 0 {
+		return args[0], nil
+	}
+	if sqlFile != "" {
+		b, err := os.ReadFile(sqlFile)
+		if err != nil {
+			return "", fmt.Errorf("read SQL file: %w", err)
+		}
+		return string(b), nil
+	}
+	stdin := cmd.InOrStdin()
+	if isPipe(stdin) {
+		b, err := io.ReadAll(stdin)
+		if err != nil {
+			return "", fmt.Errorf("read SQL from stdin: %w", err)
+		}
+		return string(b), nil
+	}
+	return "", fmt.Errorf("no SQL provided (pass a positional arg, -f, or pipe via stdin)")
+}
+
+// isPipe reports whether r is a non-TTY pipe/file (i.e. data is
+// actually available). Avoids hanging on an interactive terminal when
+// the user forgot to pass SQL.
+//
+// Returns true for any non-*os.File reader because that's what cobra's
+// cmd.InOrStdin() returns in tests (typically *bytes.Buffer); tests
+// always want "yes, read whatever's there." For real *os.File stdin
+// we check ModeCharDevice == 0 to distinguish "pipe/file redirect"
+// from "interactive TTY."
+func isPipe(r io.Reader) bool {
+	f, ok := r.(*os.File)
+	if !ok {
+		return true
+	}
+	fi, err := f.Stat()
+	if err != nil {
+		return false
+	}
+	return (fi.Mode() & os.ModeCharDevice) == 0
+}
+
+// runArrowQuery streams the Arrow IPC body straight from the server to
+// stdout. Server-side execution time is read from the response trailer
+// (available only after Body has been drained to EOF) and printed to
+// stderr after the stream finishes.
+//
+// On a mid-stream error from io.Copy (network drop, client kill, server
+// reset) we have already written N bytes of a partial IPC stream to
+// stdout — downstream consumers like pyarrow will surface that as a
+// hard parse error. Print an explanatory stderr line in addition to
+// returning the error so the operator sees both the corrupt binary on
+// stdout AND the explanation on stderr.
+func runArrowQuery(ctx context.Context, cli *client.Client, sql, database string, stdout, stderr io.Writer) error {
+	resp, err := cli.QueryArrow(ctx, sql, database)
+	if err != nil {
+		return err
+	}
+	defer resp.Close()
+
+	n, err := io.Copy(stdout, resp.Body)
+	if err != nil {
+		fmt.Fprintf(stderr, "arrow: stream interrupted after %d bytes — stdout contains a truncated Arrow IPC payload that will not parse cleanly\n", n)
+		return fmt.Errorf("stream arrow body: %w", err)
+	}
+	if execMs, ok := resp.ExecutionTimeMs(); ok {
+		fmt.Fprintf(stderr, "arrow: %d bytes, server execution %dms\n", n, execMs)
+	} else {
+		fmt.Fprintf(stderr, "arrow: %d bytes\n", n)
+	}
+	return nil
+}
+
+// buildClient resolves the connection (per CLAUDE.md precedence) and
+// constructs an HTTP client. The connection's stored InsecureTLS is
+// honored unless the user passes --insecure on the command line
+// (flag wins, OR semantics).
+//
+// Shared by `query` and `write` so the resolution rules stay in one
+// place. Returns the resolved connection name purely for `-v` /
+// error messages.
+//
+// The TLS-disabled warning goes to the passed-in stderr writer (not
+// os.Stderr directly) so tests using cmd.SetErr(buf) can capture it.
+func buildClient(stderr io.Writer, connectionName, endpoint, token string, insecureFlag bool, timeout time.Duration) (*client.Client, string, error) {
+	cfg, err := config.Load()
+	if err != nil {
+		return nil, "", err
+	}
+	conn, name, err := cfg.Resolve(config.ResolveOptions{
+		ConnectionName: connectionName,
+		Endpoint:       endpoint,
+		Token:          token,
+	})
+	if err != nil {
+		return nil, "", err
+	}
+	insecure := conn.InsecureTLS || insecureFlag
+	if insecure && strings.HasPrefix(strings.ToLower(conn.Endpoint), "https://") {
+		// Only warn when TLS verify would actually have been applied.
+		// On http:// endpoints the skip-verify flag is a no-op and the
+		// warning would mislead.
+		if insecureFlag {
+			fmt.Fprintln(stderr, "WARNING: TLS certificate verification disabled (--insecure)")
+		} else {
+			fmt.Fprintln(stderr, "WARNING: TLS certificate verification disabled (connection has insecure_tls=true)")
+		}
+	}
+	cli, err := client.New(client.Config{
+		Endpoint:    conn.Endpoint,
+		Token:       conn.Token,
+		Database:    conn.DefaultDatabase,
+		InsecureTLS: insecure,
+		Timeout:     timeout,
+	})
+	return cli, name, err
+}
diff --git a/internal/commands/root.go b/internal/commands/root.go
index 57bab01..0b699c2 100644
--- a/internal/commands/root.go
+++ b/internal/commands/root.go
@@ -28,6 +28,10 @@ First-time setup:
 		SilenceUsage: true,
 	}
 
-	root.AddCommand(newConfigCmd())
+	root.AddCommand(
+		newConfigCmd(),
+		newQueryCmd(),
+		newWriteCmd(),
+	)
 	return root
 }
diff --git a/internal/commands/write.go b/internal/commands/write.go
new file mode 100644
index 0000000..5dc88b8
--- /dev/null
+++ b/internal/commands/write.go
@@ -0,0 +1,108 @@
+// Write subcommand: POSTs line protocol to /api/v1/write/line-protocol.
+//
+// Input: either `-f file.lp` (file path) or stdin (when -f is not
+// supplied). Body is streamed — we never buffer the full payload — so
+// piping `cat huge.lp | arcctl write` works at line-rate.
+package commands
+
+import (
+	"context"
+	"fmt"
+	"io"
+	"os"
+	"time"
+
+	"github.com/spf13/cobra"
+
+	"github.com/basekick-labs/arcctl/internal/client"
+)
+
+func newWriteCmd() *cobra.Command {
+	var (
+		connectionName string
+		endpoint       string
+		token          string
+		database       string
+		insecure       bool
+		filePath       string
+		precision      string
+		timeout        time.Duration
+	)
+	c := &cobra.Command{
+		Use:   "write",
+		Short: "Write line-protocol records to an Arc cluster",
+		Long: `Write line-protocol records to an Arc cluster (POST /api/v1/write/line-protocol).
+
+Body source: --file (-f) takes precedence; otherwise reads from stdin.
+The body is streamed — large files / pipes do NOT buffer in memory.
+
+Precision must be one of ns, us, ms, s. The server treats an unset
+precision as nanoseconds.`,
+		Example: `  echo "cpu,host=a value=42 1234567890000000000" | arcctl write --database metrics
+  arcctl write -f payload.lp --database metrics --precision ms
+  cat /var/log/lp/*.lp | arcctl write -c prod --database metrics`,
+		// Input comes only from -f or stdin. Reject stray positionals so a
+		// forgotten -f fails fast instead of silently waiting on stdin.
+		Args: cobra.NoArgs,
+		RunE: func(cmd *cobra.Command, args []string) error {
+			if !client.ValidPrecision(precision) {
+				return fmt.Errorf("invalid --precision %q (must be one of ns, us, ms, s)", precision)
+			}
+			if timeout <= 0 {
+				return fmt.Errorf("--timeout must be > 0 (got %s)", timeout)
+			}
+
+			body, closer, err := openWriteBody(cmd, filePath)
+			if err != nil {
+				return err
+			}
+			if closer != nil {
+				defer closer.Close()
+			}
+
+			cli, _, err := buildClient(cmd.ErrOrStderr(), connectionName, endpoint, token, insecure, timeout)
+			if err != nil {
+				return err
+			}
+
+			ctx, cancel := context.WithTimeout(cmd.Context(), timeout)
+			defer cancel()
+
+			if err := cli.WriteLineProtocol(ctx, body, database, client.Precision(precision)); err != nil {
+				return err
+			}
+			fmt.Fprintln(cmd.OutOrStdout(), "OK")
+			return nil
+		},
+	}
+	c.Flags().StringVarP(&connectionName, "connection", "c", "", "named connection (overrides active)")
+	c.Flags().StringVar(&endpoint, "endpoint", "", "ad-hoc Arc endpoint URL")
+	c.Flags().StringVar(&token, "token", "", "ad-hoc bearer token")
+	c.Flags().StringVar(&database, "database", "", "target database (defaults to connection's default_database)")
+	c.Flags().BoolVar(&insecure, "insecure", false, "skip TLS certificate verification (logs a warning to stderr)")
+	c.Flags().StringVarP(&filePath, "file", "f", "", "read line protocol from a file instead of stdin")
+	c.Flags().StringVar(&precision, "precision", "", "timestamp precision: ns|us|ms|s (default: server-side default = ns)")
+	c.Flags().DurationVar(&timeout, "timeout", 60*time.Second, "per-request HTTP timeout")
+	return c
+}
+
+// openWriteBody returns the io.Reader to stream into the POST body.
+// When `path` is non-empty it opens the file (caller closes via the
+// returned io.Closer). Otherwise it returns stdin, which the caller
+// MUST NOT close (no closer is returned for that path).
+//
+// We deliberately do NOT block on a TTY stdin like `query` does: an
+// operator who runs `arcctl write` interactively and types lines is a
+// supported (if rare) workflow. The hang-on-empty-TTY foot-gun
+// matters for query because empty-SQL would error anyway; for write
+// the server accepts an empty body as a no-op.
+func openWriteBody(cmd *cobra.Command, path string) (io.Reader, io.Closer, error) {
+	if path != "" {
+		f, err := os.Open(path)
+		if err != nil {
+			return nil, nil, fmt.Errorf("open %s: %w", path, err)
+		}
+		return f, f, nil
+	}
+	return cmd.InOrStdin(), nil, nil
+}
diff --git a/internal/output/query.go b/internal/output/query.go
new file mode 100644
index 0000000..59f302b
--- /dev/null
+++ b/internal/output/query.go
@@ -0,0 +1,166 @@
+// Render strategies for query results.
+//
+// Three formats covered in PR2: table (default, pretty-printed),
+// json (raw API shape, jq-friendly), csv (RFC 4180 with header row).
+// Arrow IPC is a separate code path in internal/commands/query.go
+// because it streams from the server rather than holding a decoded
+// QueryResult.
+package output
+
+import (
+	"encoding/csv"
+	"encoding/json"
+	"fmt"
+	"io"
+	"strconv"
+
+	"github.com/basekick-labs/arcctl/internal/client"
+)
+
+// Format names; the value of `-o/--output`.
+const (
+	FormatTable = "table"
+	FormatJSON  = "json"
+	FormatCSV   = "csv"
+	FormatArrow = "arrow"
+)
+
+// ValidFormat reports whether s is one of the four supported formats.
+func ValidFormat(s string) bool {
+	switch s {
+	case FormatTable, FormatJSON, FormatCSV, FormatArrow:
+		return true
+	}
+	return false
+}
+
+// RenderQueryResult writes a decoded QueryResult to w in the chosen
+// format. `format` must be FormatTable, FormatJSON, or FormatCSV;
+// FormatArrow is streamed from the server and does not flow through
+// here (the command layer handles it directly).
+//
+// `noHeader` suppresses column headers for table + csv. Ignored by json.
+// `limit` caps the number of rows written (0 = no cap).
+func RenderQueryResult(w io.Writer, qr *client.QueryResult, format string, noHeader bool, limit int) error {
+	if qr == nil {
+		return fmt.Errorf("nil query result")
+	}
+	switch format {
+	case "", FormatTable:
+		return renderTable(w, qr, noHeader, limit)
+	case FormatJSON:
+		return renderJSON(w, qr)
+	case FormatCSV:
+		return renderCSV(w, qr, noHeader, limit)
+	case FormatArrow:
+		return fmt.Errorf("arrow format is streamed directly from the server; pass --output arrow to `arcctl query` for binary IPC on stdout")
+	}
+	return fmt.Errorf("unknown output format %q (valid: table, json, csv, arrow)", format)
+}
+
+func renderTable(w io.Writer, qr *client.QueryResult, noHeader bool, limit int) error {
+	// Arc returns `columns:[] data:[]` for a query that hit no files
+	// (e.g. SELECT * FROM <table-with-no-data>). Without this branch
+	// tablewriter would emit zero bytes, leaving the operator wondering
+	// whether the query even ran. Mirror what psql / clickhouse-client
+	// do for the empty case.
+	if len(qr.Columns) == 0 {
+		_, err := fmt.Fprintln(w, "(0 rows)")
+		return err
+	}
+	rows := buildRows(qr, limit)
+	if noHeader {
+		return Table(w, nil, rows)
+	}
+	return Table(w, qr.Columns, rows)
+}
+
+func renderJSON(w io.Writer, qr *client.QueryResult) error {
+	enc := json.NewEncoder(w)
+	enc.SetIndent("", "  ")
+	return enc.Encode(qr)
+}
+
+func renderCSV(w io.Writer, qr *client.QueryResult, noHeader bool, limit int) error {
+	cw := csv.NewWriter(w)
+	if !noHeader {
+		if err := cw.Write(qr.Columns); err != nil {
+			return err
+		}
+	}
+	rowCount := qr.RowCount
+	if limit > 0 && limit < rowCount {
+		rowCount = limit
+	}
+	for i := 0; i < rowCount; i++ {
+		row := qr.RowAt(i)
+		strRow := make([]string, len(row))
+		for j, v := range row {
+			strRow[j] = formatCell(v)
+		}
+		if err := cw.Write(strRow); err != nil {
+			return err
+		}
+	}
+	cw.Flush()
+	return cw.Error()
+}
+
+// buildRows converts the row-major QueryResult.Data into a slice of
+// stringified rows suitable for tablewriter. The transformation is
+// per-cell only (formatCell); the outer shape passes through.
+func buildRows(qr *client.QueryResult, limit int) [][]string {
+	rowCount := qr.RowCount
+	if limit > 0 && limit < rowCount {
+		rowCount = limit
+	}
+	rows := make([][]string, 0, rowCount)
+	for i := 0; i < rowCount; i++ {
+		row := qr.RowAt(i)
+		strRow := make([]string, len(row))
+		for j, v := range row {
+			strRow[j] = formatCell(v)
+		}
+		rows = append(rows, strRow)
+	}
+	return rows
+}
+
+// formatCell converts a cell value (from JSON-decoded `any`) into a
+// display string. Special-cases the common types so we don't
+// fall through to "%v" for every cell (which formats float64 as
+// "1.23e+09" and slices/maps awkwardly).
+func formatCell(v any) string {
+	switch x := v.(type) {
+	case nil:
+		return ""
+	case string:
+		return x
+	case bool:
+		if x {
+			return "true"
+		}
+		return "false"
+	case float64:
+		// JSON numbers decode as float64. Integers up to 2^53 round-trip
+		// losslessly; print whole numbers without ".000000" tail.
+		//
+		// The range guard matters: converting a float64 outside int64's
+		// range is implementation-dependent in Go, so 2^63 could print
+		// as MaxInt64 on some platforms. Out-of-range values (and NaN,
+		// which fails every comparison) fall through to FormatFloat.
+		if x >= -(1<<63) && x < 1<<63 && x == float64(int64(x)) {
+			return strconv.FormatInt(int64(x), 10)
+		}
+		return strconv.FormatFloat(x, 'g', -1, 64)
+	case json.Number:
+		return string(x)
+	}
+	// Slices, maps, anything else — fall back to compact JSON so the
+	// cell is at least machine-parseable in csv output.
+	b, err := json.Marshal(v)
+	if err != nil {
+		return fmt.Sprintf("%v", v)
+	}
+	return string(b)
+}
diff --git a/internal/output/query_test.go b/internal/output/query_test.go
new file mode 100644
index 0000000..6588895
--- /dev/null
+++ b/internal/output/query_test.go
@@ -0,0 +1,177 @@
+package output
+
+import (
+	"bytes"
+	"encoding/csv"
+	"encoding/json"
+	"strings"
+	"testing"
+
+	"github.com/basekick-labs/arcctl/internal/client"
+)
+
+func sampleQR() *client.QueryResult {
+	// Row-major: Data[i] = row i's cells in Columns order.
+	return &client.QueryResult{
+		Columns: []string{"host", "value", "ok"},
+		Data: [][]any{
+			{"server-1", 42.5, true},
+			{"server-2", 100.0, false},
+		},
+		RowCount:        2,
+		ExecutionTimeMs: 1.5,
+	}
+}
+
+func TestValidFormat(t *testing.T) {
+	for _, f := range []string{"table", "json", "csv", "arrow"} {
+		if !ValidFormat(f) {
+			t.Errorf("ValidFormat(%q) = false", f)
+		}
+	}
+	for _, f := range []string{"", "yaml", "xml", "ARROW"} {
+		if ValidFormat(f) {
+			t.Errorf("ValidFormat(%q) = true", f)
+		}
+	}
+}
+
+func TestRenderQueryResult_JSON(t *testing.T) {
+	var buf bytes.Buffer
+	if err := RenderQueryResult(&buf, sampleQR(), FormatJSON, false, 0); err != nil {
+		t.Fatalf("Render: %v", err)
+	}
+	// Round-trip back to confirm we emitted valid JSON with the
+	// same shape as the input.
+	var got client.QueryResult
+	if err := json.Unmarshal(buf.Bytes(), &got); err != nil {
+		t.Fatalf("output is not valid JSON: %v\n%s", err, buf.String())
+	}
+	if got.RowCount != 2 || len(got.Columns) != 3 {
+		t.Errorf("round-trip shape wrong: %+v", got)
+	}
+}
+
+func TestRenderQueryResult_CSV(t *testing.T) {
+	var buf bytes.Buffer
+	if err := RenderQueryResult(&buf, sampleQR(), FormatCSV, false, 0); err != nil {
+		t.Fatalf("Render: %v", err)
+	}
+	r := csv.NewReader(&buf)
+	records, err := r.ReadAll()
+	if err != nil {
+		t.Fatalf("output is not valid CSV: %v", err)
+	}
+	if len(records) != 3 {
+		t.Fatalf("got %d records, want 3 (header + 2 rows)", len(records))
+	}
+	if records[0][0] != "host" {
+		t.Errorf("header row = %v", records[0])
+	}
+	if records[1][0] != "server-1" || records[1][1] != "42.5" {
+		t.Errorf("row 1 = %v", records[1])
+	}
+	if records[2][1] != "100" {
+		t.Errorf("row 2 value (whole-float should print as int): %v", records[2])
+	}
+	if records[2][2] != "false" {
+		t.Errorf("row 2 = %v", records[2])
+	}
+}
+
+func TestRenderQueryResult_CSV_NoHeader(t *testing.T) {
+	var buf bytes.Buffer
+	if err := RenderQueryResult(&buf, sampleQR(), FormatCSV, true, 0); err != nil {
+		t.Fatalf("Render: %v", err)
+	}
+	r := csv.NewReader(&buf)
+	records, err := r.ReadAll()
+	if err != nil {
+		t.Fatalf("invalid CSV: %v", err)
+	}
+	if len(records) != 2 {
+		t.Errorf("expected 2 rows (no header), got %d: %v", len(records), records)
+	}
+}
+
+func TestRenderQueryResult_TableEmptyResult(t *testing.T) {
+	// Arc's "no files found" response — empty columns AND empty data.
+	// Must print SOMETHING so the operator knows the query ran.
+	var buf bytes.Buffer
+	empty := &client.QueryResult{Columns: nil, Data: nil, RowCount: 0}
+	if err := RenderQueryResult(&buf, empty, FormatTable, false, 0); err != nil {
+		t.Fatalf("Render: %v", err)
+	}
+	if !strings.Contains(buf.String(), "(0 rows)") {
+		t.Errorf("empty table output = %q, want '(0 rows)'", buf.String())
+	}
+}
+
+func TestRenderQueryResult_TableHasHeaders(t *testing.T) {
+	var buf bytes.Buffer
+	if err := RenderQueryResult(&buf, sampleQR(), FormatTable, false, 0); err != nil {
+		t.Fatalf("Render: %v", err)
+	}
+	out := buf.String()
+	for _, want := range []string{"HOST", "VALUE", "OK", "server-1", "42", "true"} {
+		if !strings.Contains(out, want) {
+			t.Errorf("table missing %q\n%s", want, out)
+		}
+	}
+}
+
+func TestRenderQueryResult_LimitCaps(t *testing.T) {
+	var buf bytes.Buffer
+	if err := RenderQueryResult(&buf, sampleQR(), FormatCSV, false, 1); err != nil {
+		t.Fatalf("Render: %v", err)
+	}
+	r := csv.NewReader(&buf)
+	records, err := r.ReadAll()
+	if err != nil {
+		t.Fatalf("invalid CSV: %v", err)
+	}
+	// header + 1 row = 2 records
+	if len(records) != 2 {
+		t.Errorf("expected 2 records (header + 1 row), got %d", len(records))
+	}
+}
+
+func TestRenderQueryResult_ArrowRejected(t *testing.T) {
+	var buf bytes.Buffer
+	err := RenderQueryResult(&buf, sampleQR(), FormatArrow, false, 0)
+	if err == nil || !strings.Contains(err.Error(), "arrow format is streamed") {
+		t.Errorf("expected arrow-streamed-elsewhere error, got %v", err)
+	}
+}
+
+func TestRenderQueryResult_UnknownFormat(t *testing.T) {
+	var buf bytes.Buffer
+	err := RenderQueryResult(&buf, sampleQR(), "xml", false, 0)
+	if err == nil || !strings.Contains(err.Error(), "unknown output format") {
+		t.Errorf("expected unknown-format error, got %v", err)
+	}
+}
+
+func TestFormatCell(t *testing.T) {
+	cases := []struct {
+		in   any
+		want string
+	}{
+		{nil, ""},
+		{"hello", "hello"},
+		{true, "true"},
+		{false, "false"},
+		{42.0, "42"},
+		{42.5, "42.5"},
+		{float64(1 << 63), "9.223372036854776e+18"},
+		{1e300, "1e+300"},
+		{json.Number("999999999999999"), "999999999999999"},
+		{[]any{1, 2}, "[1,2]"},
+	}
+	for _, c := range cases {
+		got := formatCell(c.in)
+		if got != c.want {
+			t.Errorf("formatCell(%v) = %q, want %q", c.in, got, c.want)
+		}
+	}
+}