diff --git a/README.md b/README.md index 49bb9f4..94f4dc2 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ CLI for [Arc](https://github.com/Basekick-Labs/arc) — operator-facing client for Arc time-series databases. -> **Status:** v0.3.0-dev (PR3). Manages connection profiles, runs SQL queries, writes line protocol, and administers databases + measurements. `import` / `auth` / `cluster` subcommands ship in follow-up PRs. +> **Status:** v0.4.0-dev (PR4). Manages connection profiles, runs SQL queries, writes line protocol, administers databases + measurements, and bulk-imports CSV / LP / Parquet / TLE files. `auth` / `cluster` subcommands ship in follow-up PRs. ## Why @@ -152,6 +152,32 @@ arcctl measurement list -c prod --database logs -o json `db list`, `db show`, and `measurement list` all support `-o table|json|csv` (no `-o arrow` — these endpoints return JSON, not Arrow IPC). +## Bulk import + +`arcctl import` ships four file-import flows backed by Arc's admin-only `/api/v1/import/*` endpoints. Upload bodies are streamed via `io.Pipe`, so even multi-GB files don't buffer in memory. + +```bash +# CSV — file has no measurement, so --measurement is required +arcctl import csv -f data.csv --database metrics --measurement cpu +arcctl import csv -f data.csv --database metrics --measurement cpu \ + --time-column ts --time-format epoch_ms --delimiter ';' --skip-rows 1 + +# Line protocol — measurement comes from the LP lines themselves; +# --measurement here is an optional filter. Server auto-detects gzip. +arcctl import lp -f telegraf.lp --database metrics +arcctl import lp -f data.lp.gz --database metrics --precision ms +arcctl import lp -f data.lp --database metrics --measurement cpu # filter + +# Parquet — preserves types end-to-end (faster + lossless vs CSV) +arcctl import parquet -f data.parquet --database metrics --measurement cpu + +# TLE (NORAD two-line element / satellite tracking) +arcctl import tle -f starlink.tle --database satellites +arcctl import tle -f starlink.tle --database satellites --measurement starlink +``` + +All four require an **admin** token (server-side `adminAuth`). Each prints either a pretty result block or `-o json` for scripting; server errors (auth, validation, "file is empty", quota) surface verbatim. + ## TLS For HTTPS endpoints, certificate verification is on by default. To skip verification (lab / self-signed certs only), use either: @@ -168,10 +194,11 @@ This repo is being built in [phased PRs](https://github.com/Basekick-Labs/arcctl - ~~**PR1** — scaffold, `config` subcommand tree, multi-connection store~~ ✅ shipped - ~~**PR2** — `arcctl query`, `arcctl write`, output formats: table/json/csv/arrow~~ ✅ shipped - ~~**PR3** — `arcctl db {list,show,create,drop}`, `arcctl measurement list`~~ ✅ shipped -- **PR4** — `arcctl import {csv,lp,parquet,msgpack}` +- ~~**PR4** — `arcctl import {csv,lp,parquet,tle}` (msgpack write deferred to a follow-up)~~ ✅ shipped - **PR5** — `arcctl auth {token,whoami}` - **PR6** — `arcctl cluster {status,nodes}`, `arcctl compaction`, `arcctl retention` -- **PR7** — release workflow + Homebrew tap + multi-arch Docker, cut v1.0.0 +- **PR7** — `arcctl write --format msgpack` (msgpack-write follow-up) +- **PR8** — release workflow + Homebrew tap + multi-arch Docker, cut v1.0.0 Target: arcctl 1.x speaks to Arc 26.06+. diff --git a/internal/client/import.go b/internal/client/import.go new file mode 100644 index 0000000..59ebfa2 --- /dev/null +++ b/internal/client/import.go @@ -0,0 +1,369 @@ +package client + +import ( + "context" + "encoding/json" + "fmt" + "io" + "mime/multipart" + "net/http" + "net/url" + "os" + "path/filepath" +) + +// ImportResult is the server's response shape for CSV + Parquet imports +// (POST /api/v1/import/{csv,parquet}). Mirrors arc/internal/api/import.go's +// ImportResult struct. +type ImportResult struct { + Database string `json:"database"` + Measurement string `json:"measurement"` + RowsImported int64 `json:"rows_imported"` + PartitionsCreated int `json:"partitions_created"` + TimeRangeMin string `json:"time_range_min,omitempty"` + TimeRangeMax string `json:"time_range_max,omitempty"` + Columns []string `json:"columns"` + DurationMs int64 `json:"duration_ms"` +} + +// LPImportResult is the server's response shape for Line Protocol imports +// (POST /api/v1/import/lp). LP files self-declare their measurements via +// line syntax, so the result reports one or more measurements rather than +// a single one. +type LPImportResult struct { + Database string `json:"database"` + Measurements []string `json:"measurements"` + RowsImported int64 `json:"rows_imported"` + Precision string `json:"precision"` + DurationMs int64 `json:"duration_ms"` +} + +// TLEImportResult is the server's response shape for TLE (two-line element) +// satellite-data imports (POST /api/v1/import/tle). +type TLEImportResult struct { + Database string `json:"database"` + Measurement string `json:"measurement"` + SatelliteCount int `json:"satellite_count"` + RowsImported int64 `json:"rows_imported"` + ParseWarnings []string `json:"parse_warnings,omitempty"` + DurationMs int64 `json:"duration_ms"` +} + +// importEnvelope is the outer { "status": "ok", "result": {...} } wrapper +// the server uses for every successful import response. We unmarshal the +// inner `result` into the format-specific struct. +type importEnvelope struct { + Status string `json:"status"` + Result json.RawMessage `json:"result"` +} + +// CSVImportOptions are the optional knobs accepted by the CSV import +// endpoint as query parameters. Zero-valued fields are not sent (the +// server applies its defaults). +type CSVImportOptions struct { + // TimeColumn names the column to use as the row timestamp. + // Server default: "time". + TimeColumn string + + // TimeFormat tells the server how to parse TimeColumn values. + // Valid: "", "epoch_s", "epoch_ms", "epoch_us", "epoch_ns". + // Empty means "let DuckDB infer" (works for ISO-8601 strings). + TimeFormat string + + // Delimiter is the field separator. Server default: ",". + Delimiter string + + // SkipRows is the count of header rows to skip before parsing. + // Server default: 0. + SkipRows int +} + +// ParquetImportOptions mirrors the (much smaller) Parquet option set. +type ParquetImportOptions struct { + // TimeColumn names the column to use as the row timestamp. + // Server default: "time". + TimeColumn string +} + +// LPImportOptions configures a Line Protocol import. +type LPImportOptions struct { + // Precision is one of "ns", "us", "ms", "s". Empty = server default ("ns"). + Precision Precision + + // MeasurementFilter, when non-empty, drops any LP line whose + // measurement doesn't match. Server returns 400 if no lines match. + MeasurementFilter string +} + +// TLEImportOptions configures a TLE import. +type TLEImportOptions struct { + // Measurement overrides the default ("satellite_tle"). Sent via the + // quirky x-arc-measurement header rather than a query param. + Measurement string +} + +// ImportCSV uploads a CSV file via POST /api/v1/import/csv. The file is +// streamed end-to-end (multipart writer wraps an os.File reader), so +// large files do not buffer in memory. +// +// Server requires the `measurement` query param (not derivable from the +// file). Server-side admin auth required. +func (c *Client) ImportCSV(ctx context.Context, filePath, database, measurement string, opts CSVImportOptions) (*ImportResult, error) { + // Client-boundary validation. The cobra layer also validates, but + // keeping these here means programmatic callers (test helpers, + // future subcommands that bypass cobra) get the same guarantees. + // (Gemini PR #3 rounds 4 + 5.) + if measurement == "" { + return nil, fmt.Errorf("measurement is required") + } + if opts.SkipRows < 0 { + return nil, fmt.Errorf("CSVImportOptions.SkipRows must be >= 0 (got %d)", opts.SkipRows) + } + if opts.Delimiter != "" && len([]rune(opts.Delimiter)) != 1 { + // DuckDB's read_csv `delim=` takes a single character. Anything + // else would surface as a confusing server-side parse error; + // fail fast with a clear message instead. + return nil, fmt.Errorf("CSVImportOptions.Delimiter must be a single character (got %q)", opts.Delimiter) + } + switch opts.TimeFormat { + case "", "epoch_s", "epoch_ms", "epoch_us", "epoch_ns": + // valid (empty means "let the server / DuckDB infer") + default: + return nil, fmt.Errorf("invalid CSVImportOptions.TimeFormat %q (must be one of: epoch_s, epoch_ms, epoch_us, epoch_ns)", opts.TimeFormat) + } + + q := url.Values{} + q.Set("measurement", measurement) + if opts.TimeColumn != "" { + q.Set("time_column", opts.TimeColumn) + } + if opts.TimeFormat != "" { + q.Set("time_format", opts.TimeFormat) + } + if opts.Delimiter != "" { + q.Set("delimiter", opts.Delimiter) + } + if opts.SkipRows > 0 { + q.Set("skip_rows", fmt.Sprintf("%d", opts.SkipRows)) + } + + envelope, err := c.uploadMultipart(ctx, "/api/v1/import/csv", filePath, database, q, nil) + if err != nil { + return nil, err + } + var out ImportResult + if err := json.Unmarshal(envelope.Result, &out); err != nil { + return nil, fmt.Errorf("decode csv import result: %w", err) + } + return &out, nil +} + +// ImportParquet uploads a Parquet file via POST /api/v1/import/parquet. +// Measurement is required (Parquet files don't carry a measurement name). +func (c *Client) ImportParquet(ctx context.Context, filePath, database, measurement string, opts ParquetImportOptions) (*ImportResult, error) { + // Client-boundary validation. (Gemini PR #3 round 5.) + if measurement == "" { + return nil, fmt.Errorf("measurement is required") + } + + q := url.Values{} + q.Set("measurement", measurement) + if opts.TimeColumn != "" { + q.Set("time_column", opts.TimeColumn) + } + + envelope, err := c.uploadMultipart(ctx, "/api/v1/import/parquet", filePath, database, q, nil) + if err != nil { + return nil, err + } + var out ImportResult + if err := json.Unmarshal(envelope.Result, &out); err != nil { + return nil, fmt.Errorf("decode parquet import result: %w", err) + } + return &out, nil +} + +// ImportLP uploads a Line Protocol file via POST /api/v1/import/lp. +// Measurement is OPTIONAL (LP lines self-declare); when set it acts as a +// server-side filter. Server auto-detects gzip via magic bytes; clients +// can pass either compressed or plain LP and the server figures it out. +func (c *Client) ImportLP(ctx context.Context, filePath, database string, opts LPImportOptions) (*LPImportResult, error) { + // ValidPrecision("") returns true (empty == "let server apply default"), + // so the bare check is sufficient — no need for an outer != "" guard. + if !ValidPrecision(string(opts.Precision)) { + return nil, fmt.Errorf("invalid precision %q (must be one of ns, us, ms, s)", opts.Precision) + } + + q := url.Values{} + if opts.Precision != "" { + q.Set("precision", string(opts.Precision)) + } + if opts.MeasurementFilter != "" { + q.Set("measurement", opts.MeasurementFilter) + } + + envelope, err := c.uploadMultipart(ctx, "/api/v1/import/lp", filePath, database, q, nil) + if err != nil { + return nil, err + } + var out LPImportResult + if err := json.Unmarshal(envelope.Result, &out); err != nil { + return nil, fmt.Errorf("decode lp import result: %w", err) + } + return &out, nil +} + +// ImportTLE uploads a TLE (two-line element / satellite tracking) file +// via POST /api/v1/import/tle. The measurement override is sent via +// header `x-arc-measurement` rather than a query param — quirky server +// design, mirrored faithfully here so the server's defaulting behavior +// works correctly. +func (c *Client) ImportTLE(ctx context.Context, filePath, database string, opts TLEImportOptions) (*TLEImportResult, error) { + var extraHeaders map[string]string + if opts.Measurement != "" { + extraHeaders = map[string]string{"x-arc-measurement": opts.Measurement} + } + + envelope, err := c.uploadMultipart(ctx, "/api/v1/import/tle", filePath, database, nil, extraHeaders) + if err != nil { + return nil, err + } + var out TLEImportResult + if err := json.Unmarshal(envelope.Result, &out); err != nil { + return nil, fmt.Errorf("decode tle import result: %w", err) + } + return &out, nil +} + +// uploadMultipart is the shared multipart-upload primitive used by every +// import command. Builds a multipart body with one `file` field that +// streams from disk, sets the database header + any per-call extras, +// posts to the given path, and returns the decoded envelope. +// +// The multipart body is built via io.Pipe so the file is NEVER fully +// buffered in memory — even a 500MB CSV streams chunk-by-chunk over the +// wire. The pipe writer runs in a goroutine so the HTTP client's reader +// drives the upload rate. +func (c *Client) uploadMultipart( + ctx context.Context, + path string, + filePath string, + database string, + query url.Values, + extraHeaders map[string]string, +) (*importEnvelope, error) { + if database == "" { + return nil, fmt.Errorf("database is required") + } + + // Open file up front so a missing-file error surfaces immediately + // (before we set up the goroutine + HTTP request). os.Open + // already includes the path in its PathError, so we wrap without + // duplicating it. + f, err := os.Open(filePath) + if err != nil { + return nil, err + } + // f is closed inside the goroutine after the multipart writer + // finishes; we MUST NOT defer Close here or we'd race with the + // goroutine's final read. + + // Compose URL with optional query params. + u := c.cfg.Endpoint + path + if len(query) > 0 { + u += "?" + query.Encode() + } + + // io.Pipe to stream the multipart body without buffering. + pr, pw := io.Pipe() + mw := multipart.NewWriter(pw) + + // Guarantee pr is closed when this function exits, regardless of + // path. Without this, an early failure in http.NewRequestWithContext + // or http.Do (DNS failure, TLS handshake failure, ctx cancelled + // before send) would leave pr open — the writer goroutine then + // blocks on pw.Write forever, leaking the goroutine and the + // underlying file handle. io.PipeReader.Close is idempotent per + // Go's documentation, so a redundant close on the happy path + // (where the transport already closed pr after reading the body + // to EOF) is harmless. (Gemini PR #3 round 3 finding — High.) + defer pr.Close() + + go func() { + // Inside the goroutine we own both `f` and `mw` and are + // responsible for closing the pipe on every path so the reader + // side (HTTP request body) eventually sees EOF or a real error + // rather than blocking forever. + defer f.Close() + + part, err := mw.CreateFormFile("file", filepath.Base(filePath)) + if err != nil { + // Don't call mw.Close() on error paths — it would write + // the trailing boundary into a stream whose preceding + // content is already known-bad, producing a multipart + // payload that's syntactically valid but semantically + // truncated. Just signal the error on the pipe and let + // the reader surface it. (Gemini PR #3 finding.) + _ = pw.CloseWithError(fmt.Errorf("create form file: %w", err)) + return + } + if _, err := io.Copy(part, f); err != nil { + _ = pw.CloseWithError(fmt.Errorf("stream file body: %w", err)) + return + } + // Happy path: close the multipart writer so the trailing + // boundary is written, then close the pipe cleanly so the + // reader sees EOF. + if err := mw.Close(); err != nil { + _ = pw.CloseWithError(fmt.Errorf("close multipart writer: %w", err)) + return + } + _ = pw.Close() + }() + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, u, pr) + if err != nil { + return nil, fmt.Errorf("build import request: %w", err) + } + + // Database goes via x-arc-database header for consistency with + // query/write (the server also accepts ?db= as a fallback). + c.setCommonHeaders(req, database) + for k, v := range extraHeaders { + // Refuse to clobber the multipart Content-Type. Setting that + // header from extraHeaders would silently lose the boundary + // and break every upload in confusing ways; force callers to + // route Content-Type through the canonical path instead. + if http.CanonicalHeaderKey(k) == "Content-Type" { + continue + } + req.Header.Set(k, v) + } + // Set Content-Type LAST so neither setCommonHeaders nor the + // extraHeaders loop can accidentally overwrite the boundary + // parameter computed by mw.FormDataContentType(). + req.Header.Set("Content-Type", mw.FormDataContentType()) + + resp, err := c.http.Do(req) + if err != nil { + return nil, fmt.Errorf("import upload: %w", err) + } + defer resp.Body.Close() + + // Import responses include a small JSON result; cap the read at + // 16 MiB to defend against a misbehaving proxy returning HTML. + body, err := io.ReadAll(io.LimitReader(resp.Body, 16<<20)) + if err != nil { + return nil, fmt.Errorf("read import response: %w", err) + } + + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return nil, decodeWriteError(resp.StatusCode, body) + } + + var env importEnvelope + if err := json.Unmarshal(body, &env); err != nil { + return nil, fmt.Errorf("decode import envelope: %w", err) + } + return &env, nil +} diff --git a/internal/client/import_test.go b/internal/client/import_test.go new file mode 100644 index 0000000..c15eda1 --- /dev/null +++ b/internal/client/import_test.go @@ -0,0 +1,637 @@ +package client + +import ( + "context" + "encoding/json" + "io" + "mime" + "net" + "net/http" + "net/http/httptest" + "net/url" + "os" + "path/filepath" + "runtime" + "strings" + "testing" + "time" +) + +// writeTempFile writes content to a fresh temp file and returns its path. +// t.Cleanup handles removal. +func writeTempFile(t *testing.T, name, content string) string { + t.Helper() + dir := t.TempDir() + p := filepath.Join(dir, name) + if err := os.WriteFile(p, []byte(content), 0o600); err != nil { + t.Fatalf("WriteFile: %v", err) + } + return p +} + +// parseMultipartForm decodes the file field out of a multipart request. +// Returns (filename, contents). +func parseMultipartForm(t *testing.T, r *http.Request) (string, string) { + t.Helper() + if err := r.ParseMultipartForm(32 << 20); err != nil { + t.Fatalf("ParseMultipartForm: %v", err) + } + f, hdr, err := r.FormFile("file") + if err != nil { + t.Fatalf("FormFile: %v", err) + } + defer f.Close() + body, err := io.ReadAll(f) + if err != nil { + t.Fatalf("read file: %v", err) + } + return hdr.Filename, string(body) +} + +// envelope helper for happy-path test servers. +func writeImportEnvelope(t *testing.T, w http.ResponseWriter, result any) { + t.Helper() + raw, err := json.Marshal(result) + if err != nil { + t.Fatalf("marshal result: %v", err) + } + env := importEnvelope{Status: "ok", Result: raw} + out, err := json.Marshal(env) + if err != nil { + t.Fatalf("marshal envelope: %v", err) + } + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write(out) +} + +func TestImportCSV_SendsMultipartAndQueryParams(t *testing.T) { + var ( + gotPath string + gotMethod string + gotQuery url.Values + gotCT string + gotDB string + gotAuth string + gotFilename string + gotFileContent string + ) + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + gotPath = r.URL.Path + gotMethod = r.Method + gotQuery = r.URL.Query() + gotCT = r.Header.Get("Content-Type") + gotDB = r.Header.Get("x-arc-database") + gotAuth = r.Header.Get("Authorization") + gotFilename, gotFileContent = parseMultipartForm(t, r) + writeImportEnvelope(t, w, ImportResult{ + Database: "metrics", Measurement: "cpu", + RowsImported: 3, PartitionsCreated: 1, + Columns: []string{"time", "host", "value"}, DurationMs: 17, + }) + })) + defer srv.Close() + + c := freshClient(t, srv, "metrics") + filePath := writeTempFile(t, "data.csv", "time,host,value\n1,a,1.0\n2,b,2.0\n3,c,3.0\n") + + result, err := c.ImportCSV(context.Background(), filePath, "metrics", "cpu", CSVImportOptions{ + TimeColumn: "time", + TimeFormat: "epoch_s", + Delimiter: ",", + SkipRows: 1, + }) + if err != nil { + t.Fatalf("ImportCSV: %v", err) + } + + if gotMethod != "POST" { + t.Errorf("method = %q", gotMethod) + } + if gotPath != "/api/v1/import/csv" { + t.Errorf("path = %q", gotPath) + } + if !strings.HasPrefix(gotCT, "multipart/form-data;") { + t.Errorf("content-type = %q; expected multipart/form-data;...", gotCT) + } + if gotDB != "metrics" { + t.Errorf("x-arc-database = %q", gotDB) + } + if gotAuth != "Bearer test-token" { + t.Errorf("auth = %q", gotAuth) + } + if gotQuery.Get("measurement") != "cpu" { + t.Errorf("measurement query = %q", gotQuery.Get("measurement")) + } + if gotQuery.Get("time_column") != "time" { + t.Errorf("time_column query = %q", gotQuery.Get("time_column")) + } + if gotQuery.Get("time_format") != "epoch_s" { + t.Errorf("time_format query = %q", gotQuery.Get("time_format")) + } + if gotQuery.Get("delimiter") != "," { + t.Errorf("delimiter query = %q", gotQuery.Get("delimiter")) + } + if gotQuery.Get("skip_rows") != "1" { + t.Errorf("skip_rows query = %q", gotQuery.Get("skip_rows")) + } + if gotFilename != "data.csv" { + t.Errorf("uploaded filename = %q", gotFilename) + } + if !strings.HasPrefix(gotFileContent, "time,host,value\n") { + t.Errorf("uploaded body = %q", gotFileContent) + } + if result.RowsImported != 3 || result.Measurement != "cpu" { + t.Errorf("decoded result = %+v", result) + } +} + +func TestImportCSV_OmitsEmptyOptionalQueryParams(t *testing.T) { + var gotQuery url.Values + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + gotQuery = r.URL.Query() + // Drain the body so the upload goroutine completes. + _, _, _ = r.FormFile("file") // best-effort + _ = r.ParseMultipartForm(1 << 20) + writeImportEnvelope(t, w, ImportResult{Database: "metrics", Measurement: "cpu"}) + })) + defer srv.Close() + + c := freshClient(t, srv, "metrics") + filePath := writeTempFile(t, "data.csv", "x\n1\n") + if _, err := c.ImportCSV(context.Background(), filePath, "metrics", "cpu", CSVImportOptions{}); err != nil { + t.Fatalf("ImportCSV: %v", err) + } + // Only `measurement` should be present. + if _, ok := gotQuery["time_column"]; ok { + t.Errorf("time_column sent even though empty: %v", gotQuery) + } + if _, ok := gotQuery["time_format"]; ok { + t.Errorf("time_format sent even though empty: %v", gotQuery) + } + if _, ok := gotQuery["delimiter"]; ok { + t.Errorf("delimiter sent even though empty: %v", gotQuery) + } + if _, ok := gotQuery["skip_rows"]; ok { + t.Errorf("skip_rows sent even though 0: %v", gotQuery) + } + if gotQuery.Get("measurement") != "cpu" { + t.Errorf("measurement not sent: %v", gotQuery) + } +} + +func TestImportCSV_MissingFile(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + t.Fatal("server should not be called when file is missing") + })) + defer srv.Close() + + c := freshClient(t, srv, "metrics") + _, err := c.ImportCSV(context.Background(), "/nonexistent/path.csv", "metrics", "cpu", CSVImportOptions{}) + if err == nil { + t.Fatal("expected error, got nil") + } + if !strings.Contains(err.Error(), "open") { + t.Errorf("error %q lacks open context", err) + } +} + +func TestImportCSV_MissingDatabase(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + t.Fatal("server should not be called") + })) + defer srv.Close() + c := freshClient(t, srv, "") + _, err := c.ImportCSV(context.Background(), "/nonexistent", "", "cpu", CSVImportOptions{}) + if err == nil || !strings.Contains(err.Error(), "database is required") { + t.Errorf("expected database-required error, got %v", err) + } +} + +// TestImportCSV_RejectsEmptyMeasurement and the three TimeFormat/ +// Delimiter tests below pin the client-boundary validations added in +// Gemini PR #3 round 5. Each fails before the server is contacted, +// so the test handler's t.Fatal would fire on any regression. + +func TestImportCSV_RejectsEmptyMeasurement(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + t.Fatal("server should not be called with empty measurement") + })) + defer srv.Close() + c := freshClient(t, srv, "metrics") + filePath := writeTempFile(t, "x.csv", "x\n1\n") + _, err := c.ImportCSV(context.Background(), filePath, "metrics", "", CSVImportOptions{}) + if err == nil || !strings.Contains(err.Error(), "measurement is required") { + t.Errorf("expected measurement-required error, got %v", err) + } +} + +func TestImportCSV_RejectsMultiCharDelimiter(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + t.Fatal("server should not be called with multi-char delimiter") + })) + defer srv.Close() + c := freshClient(t, srv, "metrics") + filePath := writeTempFile(t, "x.csv", "x\n1\n") + _, err := c.ImportCSV(context.Background(), filePath, "metrics", "cpu", CSVImportOptions{Delimiter: "||"}) + if err == nil || !strings.Contains(err.Error(), "Delimiter must be a single character") { + t.Errorf("expected delimiter-single-char error, got %v", err) + } +} + +func TestImportCSV_AcceptsSingleRuneDelimiter(t *testing.T) { + // A multi-byte UTF-8 single rune (e.g. en-dash, U+2013, 3 bytes) + // should pass the `len([]rune) == 1` check. + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _ = r.ParseMultipartForm(1 << 20) + writeImportEnvelope(t, w, ImportResult{Database: "metrics", Measurement: "cpu"}) + })) + defer srv.Close() + c := freshClient(t, srv, "metrics") + filePath := writeTempFile(t, "x.csv", "x\n1\n") + if _, err := c.ImportCSV(context.Background(), filePath, "metrics", "cpu", CSVImportOptions{Delimiter: "—"}); err != nil { + t.Errorf("single multi-byte rune should be accepted, got %v", err) + } +} + +func TestImportCSV_RejectsInvalidTimeFormat(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + t.Fatal("server should not be called with invalid time_format") + })) + defer srv.Close() + c := freshClient(t, srv, "metrics") + filePath := writeTempFile(t, "x.csv", "x\n1\n") + _, err := c.ImportCSV(context.Background(), filePath, "metrics", "cpu", CSVImportOptions{TimeFormat: "epoch_milliseconds"}) + if err == nil || !strings.Contains(err.Error(), "invalid CSVImportOptions.TimeFormat") { + t.Errorf("expected time-format-enum error, got %v", err) + } +} + +func TestImportCSV_AcceptsEmptyTimeFormat(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _ = r.ParseMultipartForm(1 << 20) + writeImportEnvelope(t, w, ImportResult{Database: "metrics", Measurement: "cpu"}) + })) + defer srv.Close() + c := freshClient(t, srv, "metrics") + filePath := writeTempFile(t, "x.csv", "x\n1\n") + if _, err := c.ImportCSV(context.Background(), filePath, "metrics", "cpu", CSVImportOptions{TimeFormat: ""}); err != nil { + t.Errorf("empty TimeFormat should be accepted (means infer), got %v", err) + } +} + +func TestImportParquet_RejectsEmptyMeasurement(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + t.Fatal("server should not be called with empty measurement") + })) + defer srv.Close() + c := freshClient(t, srv, "metrics") + filePath := writeTempFile(t, "x.parquet", "fake") + _, err := c.ImportParquet(context.Background(), filePath, "metrics", "", ParquetImportOptions{}) + if err == nil || !strings.Contains(err.Error(), "measurement is required") { + t.Errorf("expected measurement-required error, got %v", err) + } +} + +// TestImportCSV_RejectsNegativeSkipRows pins the client-layer +// validation Gemini added in PR #3 round 4. The cobra layer rejects +// negative --skip-rows at RunE, but the client must ALSO reject so +// any future caller bypassing cobra can't silently drop the value. +func TestImportCSV_RejectsNegativeSkipRows(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + t.Fatal("server should not be called when SkipRows is negative") + })) + defer srv.Close() + + c := freshClient(t, srv, "metrics") + filePath := writeTempFile(t, "data.csv", "x\n1\n") + _, err := c.ImportCSV(context.Background(), filePath, "metrics", "cpu", CSVImportOptions{SkipRows: -3}) + if err == nil { + t.Fatal("expected error, got nil") + } + if !strings.Contains(err.Error(), "SkipRows must be >= 0") { + t.Errorf("error %q lacks SkipRows context", err) + } +} + +func TestImportCSV_ServerError(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusBadRequest) + _, _ = io.WriteString(w, `{"error":"measurement query parameter is required"}`) + })) + defer srv.Close() + + c := freshClient(t, srv, "metrics") + filePath := writeTempFile(t, "data.csv", "x\n1\n") + _, err := c.ImportCSV(context.Background(), filePath, "metrics", "cpu", CSVImportOptions{}) + if err == nil { + t.Fatal("expected error, got nil") + } + if !strings.Contains(err.Error(), "measurement query parameter is required") { + t.Errorf("error %q lacks server message", err) + } + if !strings.Contains(err.Error(), "HTTP 400") { + t.Errorf("error %q lacks status", err) + } +} + +func TestImportParquet_BasicShape(t *testing.T) { + var ( + gotPath string + gotQuery url.Values + ) + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + gotPath = r.URL.Path + gotQuery = r.URL.Query() + _ = r.ParseMultipartForm(1 << 20) + writeImportEnvelope(t, w, ImportResult{ + Database: "metrics", Measurement: "cpu", RowsImported: 100, + }) + })) + defer srv.Close() + + c := freshClient(t, srv, "metrics") + filePath := writeTempFile(t, "data.parquet", "fake parquet bytes") + if _, err := c.ImportParquet(context.Background(), filePath, "metrics", "cpu", ParquetImportOptions{TimeColumn: "ts"}); err != nil { + t.Fatalf("ImportParquet: %v", err) + } + if gotPath != "/api/v1/import/parquet" { + t.Errorf("path = %q", gotPath) + } + if gotQuery.Get("measurement") != "cpu" { + t.Errorf("measurement query = %q", gotQuery.Get("measurement")) + } + if gotQuery.Get("time_column") != "ts" { + t.Errorf("time_column query = %q", gotQuery.Get("time_column")) + } +} + +func TestImportLP_PrecisionAndMeasurementFilter(t *testing.T) { + var gotQuery url.Values + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + gotQuery = r.URL.Query() + _ = r.ParseMultipartForm(1 << 20) + writeImportEnvelope(t, w, LPImportResult{ + Database: "metrics", Measurements: []string{"cpu"}, + RowsImported: 42, Precision: "ms", DurationMs: 5, + }) + })) + defer srv.Close() + + c := freshClient(t, srv, "metrics") + filePath := writeTempFile(t, "data.lp", "cpu v=1 1700000000000\n") + result, err := c.ImportLP(context.Background(), filePath, "metrics", LPImportOptions{ + Precision: PrecisionMS, + MeasurementFilter: "cpu", + }) + if err != nil { + t.Fatalf("ImportLP: %v", err) + } + if gotQuery.Get("precision") != "ms" { + t.Errorf("precision query = %q", gotQuery.Get("precision")) + } + if gotQuery.Get("measurement") != "cpu" { + t.Errorf("measurement filter query = %q", gotQuery.Get("measurement")) + } + if result.RowsImported != 42 || result.Measurements[0] != "cpu" { + t.Errorf("decoded result = %+v", result) + } +} + +func TestImportLP_InvalidPrecision(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + t.Fatal("server should not be called") + })) + defer srv.Close() + + c := freshClient(t, srv, "metrics") + filePath := writeTempFile(t, "data.lp", "cpu v=1\n") + _, err := c.ImportLP(context.Background(), filePath, "metrics", LPImportOptions{Precision: Precision("furlong")}) + if err == nil || !strings.Contains(err.Error(), "invalid precision") { + t.Errorf("expected invalid-precision error, got %v", err) + } +} + +func TestImportLP_NoOptionalQueryParamsWhenEmpty(t *testing.T) { + var gotQuery url.Values + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + gotQuery = r.URL.Query() + _ = r.ParseMultipartForm(1 << 20) + writeImportEnvelope(t, w, LPImportResult{Database: "metrics"}) + })) + defer srv.Close() + + c := freshClient(t, srv, "metrics") + filePath := writeTempFile(t, "data.lp", "cpu v=1\n") + if _, err := c.ImportLP(context.Background(), filePath, "metrics", LPImportOptions{}); err != nil { + t.Fatalf("ImportLP: %v", err) + } + if _, ok := gotQuery["precision"]; ok { + t.Errorf("precision sent even though empty: %v", gotQuery) + } + if _, ok := gotQuery["measurement"]; ok { + t.Errorf("measurement sent even though empty: %v", gotQuery) + } +} + +func TestImportTLE_MeasurementGoesViaHeader(t *testing.T) { + var ( + gotPath string + gotMeasHdr string + gotMeasQuery string + ) + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + gotPath = r.URL.Path + gotMeasHdr = r.Header.Get("x-arc-measurement") + gotMeasQuery = r.URL.Query().Get("measurement") + _ = r.ParseMultipartForm(1 << 20) + writeImportEnvelope(t, w, TLEImportResult{ + Database: "satellites", Measurement: "starlink", + SatelliteCount: 12, RowsImported: 12, DurationMs: 3, + }) + })) + defer srv.Close() + + c := freshClient(t, srv, "satellites") + filePath := writeTempFile(t, "starlink.tle", "STARLINK-1\n1 11111U\n2 22222\n") + if _, err := c.ImportTLE(context.Background(), filePath, "satellites", TLEImportOptions{Measurement: "starlink"}); err != nil { + t.Fatalf("ImportTLE: %v", err) + } + if gotPath != "/api/v1/import/tle" { + t.Errorf("path = %q", gotPath) + } + // Server uses HEADER not query param for TLE measurement override. + if gotMeasHdr != "starlink" { + t.Errorf("x-arc-measurement header = %q; expected starlink", gotMeasHdr) + } + if gotMeasQuery != "" { + t.Errorf("measurement query param = %q; expected empty (TLE uses header)", gotMeasQuery) + } +} + +func TestImportTLE_NoMeasurementHeaderWhenOmitted(t *testing.T) { + // When user doesn't pass --measurement, arcctl must NOT send the + // header so the server can apply its default ("satellite_tle"). + hdrPresent := true + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _, hdrPresent = r.Header[http.CanonicalHeaderKey("x-arc-measurement")] + _ = r.ParseMultipartForm(1 << 20) + writeImportEnvelope(t, w, TLEImportResult{Database: "sats", Measurement: "satellite_tle"}) + })) + defer srv.Close() + + c := freshClient(t, srv, "sats") + filePath := writeTempFile(t, "f.tle", "X\n1\n2\n") + if _, err := c.ImportTLE(context.Background(), filePath, "sats", TLEImportOptions{}); err != nil { + t.Fatalf("ImportTLE: %v", err) + } + if hdrPresent { + t.Errorf("x-arc-measurement header was sent despite empty override") + } +} + +func TestUploadMultipart_StreamsLargeFileWithoutBuffering(t *testing.T) { + // Smoke test: write a 5 MiB file and confirm the server receives + // exactly the same bytes. This isn't a real memory-pressure test + // (Go heap behavior under test conditions is noisy), but it does + // confirm the io.Pipe + goroutine plumbing works correctly under + // non-trivial payload sizes. + const size = 5 * 1024 * 1024 + var sentBytes int + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _ = r.ParseMultipartForm(1 << 30) + _, _, sent := drainMultipartFile(t, r) + sentBytes = sent + writeImportEnvelope(t, w, ImportResult{Database: "x", Measurement: "y", RowsImported: 0}) + })) + defer srv.Close() + + dir := t.TempDir() + p := filepath.Join(dir, "big.csv") + buf := make([]byte, size) + for i := range buf { + buf[i] = byte('a' + (i % 26)) + } + if err := os.WriteFile(p, buf, 0o600); err != nil { + t.Fatalf("write big file: %v", err) + } + + c := freshClient(t, srv, "x") + if _, err := c.ImportCSV(context.Background(), p, "x", "y", CSVImportOptions{}); err != nil { + t.Fatalf("ImportCSV: %v", err) + } + if sentBytes != size { + t.Errorf("server received %d bytes, want %d", sentBytes, size) + } +} + +// drainMultipartFile is a helper for the streaming test — counts the +// bytes the server actually received in the file field. +func drainMultipartFile(t *testing.T, r *http.Request) (string, string, int) { + t.Helper() + f, hdr, err := r.FormFile("file") + if err != nil { + t.Fatalf("FormFile: %v", err) + } + defer f.Close() + n, err := io.Copy(io.Discard, f) + if err != nil { + t.Fatalf("drain: %v", err) + } + return hdr.Filename, "", int(n) +} + +// TestUploadMultipart_NoGoroutineLeakOnEarlyDoFailure runs the import +// path against a refused-connection port and verifies that no goroutine +// is leaked across many iterations. +// +// Context: Gemini PR #3 round-3 raised the concern that if +// c.http.Do returns an error before the transport begins reading the +// request body, the io.Pipe writer goroutine could block on pw.Write +// forever and leak both the goroutine and the underlying file handle. +// The fix is `defer pr.Close()` immediately after io.Pipe creation, so +// the pipe is closed on every return path regardless of transport +// behavior. +// +// Empirically (Go 1.25), net/http's transport DOES close req.Body on +// the connection-refused path, so the leak isn't reproducible by simply +// refusing the TCP connection — `delta=0` even without the fix. This +// test therefore exists as a guard rather than a strict reproducer: if +// a future Go change (or a different early-failure path like +// pre-send ctx-cancel) starts leaking, the linear growth would surface +// here. We accept that the mutation test passes today — the defense-in- +// depth value of the fix isn't reproducible in current Go, but the +// `defer pr.Close()` is still correct hygiene. +func TestUploadMultipart_NoGoroutineLeakOnEarlyDoFailure(t *testing.T) { + ln, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("listen: %v", err) + } + deadAddr := ln.Addr().String() + _ = ln.Close() + + c, err := New(Config{ + Endpoint: "http://" + deadAddr, + Token: "t", + Timeout: 2 * time.Second, + }) + if err != nil { + t.Fatalf("New: %v", err) + } + filePath := writeTempFile(t, "x.csv", "a,b\n1,2\n") + + runtime.GC() + runtime.Gosched() + time.Sleep(50 * time.Millisecond) + before := runtime.NumGoroutine() + + const iterations = 8 + for i := 0; i < iterations; i++ { + _, err := c.ImportCSV(context.Background(), filePath, "metrics", "cpu", CSVImportOptions{}) + if err == nil { + t.Fatal("expected error from refused connection, got nil") + } + } + + deadline := time.Now().Add(2 * time.Second) + var after int + for time.Now().Before(deadline) { + runtime.GC() + after = runtime.NumGoroutine() + if after-before < iterations { + break + } + time.Sleep(50 * time.Millisecond) + } + + if after-before >= iterations { + t.Errorf("goroutine leak: before=%d after=%d delta=%d iterations=%d", before, after, after-before, iterations) + } +} + +// TestContentTypeIsValidMultipart verifies the Content-Type header the +// client sends parses as real multipart/form-data with a boundary +// parameter, rather than relying on a brittle prefix check. +func TestContentTypeIsValidMultipart(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + ct := r.Header.Get("Content-Type") + mediaType, params, err := mime.ParseMediaType(ct) + if err != nil { + t.Errorf("Content-Type %q invalid: %v", ct, err) + } + if mediaType != "multipart/form-data" { + t.Errorf("media type = %q, want multipart/form-data", mediaType) + } + if params["boundary"] == "" { + t.Errorf("Content-Type missing boundary param: %q", ct) + } + _ = r.ParseMultipartForm(1 << 20) + writeImportEnvelope(t, w, ImportResult{}) + })) + defer srv.Close() + + c := freshClient(t, srv, "metrics") + filePath := writeTempFile(t, "x.csv", "a\n1\n") + if _, err := c.ImportCSV(context.Background(), filePath, "metrics", "y", CSVImportOptions{}); err != nil { + t.Fatalf("ImportCSV: %v", err) + } +} diff --git a/internal/commands/import.go b/internal/commands/import.go new file mode 100644 index 0000000..d4a1a22 --- /dev/null +++ b/internal/commands/import.go @@ -0,0 +1,437 @@ +// import subcommand: bulk-load files into an Arc cluster. +// +// All four formats use POST /api/v1/import/{csv,parquet,lp,tle} on the +// server, all are multipart uploads with field name "file", and all +// require an admin token (the server's adminAuth middleware). The body +// is streamed via io.Pipe — `arcctl import csv -f huge.csv` does NOT +// buffer the whole file in memory. +// +// CSV and Parquet require --measurement (file has no notion of one). +// LP measurements are self-declared in the line syntax (--measurement +// is an optional server-side filter). TLE defaults to "satellite_tle" +// when --measurement is omitted. +package commands + +import ( + "context" + "encoding/json" + "fmt" + "strings" + "time" + + "github.com/spf13/cobra" + + "github.com/basekick-labs/arcctl/internal/client" + "github.com/basekick-labs/arcctl/internal/output" +) + +func newImportCmd() *cobra.Command { + c := &cobra.Command{ + Use: "import", + Short: "Bulk-load files into an Arc cluster", + Long: `Bulk-load files into an Arc cluster. + +Four formats, all admin-only on the server side (POST /api/v1/import/*). +Body is streamed end-to-end — even multi-hundred-MB files don't buffer +in memory. + + csv — text CSV with explicit time-column + parser options + lp — InfluxDB-style line protocol (auto-detects gzip on the wire) + parquet — Apache Parquet (preserves types; fastest path) + tle — Two-line element satellite tracking data + +CSV and Parquet require --measurement (the file doesn't carry one). +LP lines self-declare measurements (so --measurement is an optional +filter). TLE defaults to "satellite_tle".`, + } + c.AddCommand( + newImportCSVCmd(), + newImportLPCmd(), + newImportParquetCmd(), + newImportTLECmd(), + ) + return c +} + +// importCommonFlags is the set every import subcommand needs: +// connection, database, file path. Factored so adding PR5+ subcommands +// stays one-line. +type importCommonFlags struct { + connectionName string + endpoint string + token string + insecure bool + database string + filePath string + outputFormat string + timeout time.Duration +} + +// addImportCommonFlags registers the flags shared by every import +// subcommand. The format-specific subcommands add their own on top. +// +// Marks --file as required via cobra's built-in mechanism. Cobra +// enforces it before RunE runs and produces a uniform error message +// ("required flag(s) \"file\" not set"), so individual subcommands +// don't need manual `if filePath == ""` guards. (Gemini PR #3 r5.) +func addImportCommonFlags(c *cobra.Command, f *importCommonFlags) { + addCommonConnectionFlags(c, &f.connectionName, &f.endpoint, &f.token, &f.insecure) + c.Flags().StringVar(&f.database, "database", "", "target database (required; can also come from active connection's default_database)") + c.Flags().StringVarP(&f.filePath, "file", "f", "", "path to the input file (required)") + c.Flags().StringVarP(&f.outputFormat, "output", "o", output.FormatTable, "output format: table|json") + addTimeoutFlag(c, &f.timeout) + _ = c.MarkFlagRequired("file") +} + +// resolveImportDatabase picks --database if set, else the active +// connection's default_database. Empty → error before any network call. +// Mirrors the precedence used by `measurement list`. +func resolveImportDatabase(flag string, cli *client.Client) (string, error) { + db := flag + if db == "" { + db = cli.DefaultDatabase() + } + if db == "" { + return "", fmt.Errorf("no database specified (pass --database or set default_database on the active connection)") + } + return db, nil +} + +// validImportOutputFormat reports whether the format is one of the two +// supported by import commands (table, json — no csv since the result +// IS the import outcome, not tabular data; no arrow since these +// endpoints don't stream). +func validImportOutputFormat(s string) bool { + switch s { + case output.FormatTable, output.FormatJSON: + return true + } + return false +} + +// ---- csv ------------------------------------------------------------------ + +func newImportCSVCmd() *cobra.Command { + var ( + common importCommonFlags + measurement string + timeColumn string + timeFormat string + delimiter string + skipRows int + ) + c := &cobra.Command{ + Use: "csv", + Short: "Import a CSV file into a measurement", + Example: ` arcctl import csv -f data.csv --database metrics --measurement cpu + arcctl import csv -f data.csv --database metrics --measurement cpu \ + --time-column ts --time-format epoch_ms --delimiter ';' --skip-rows 1`, + RunE: func(cmd *cobra.Command, args []string) error { + if common.timeout <= 0 { + return fmt.Errorf("--timeout must be > 0 (got %s)", common.timeout) + } + if !validImportOutputFormat(common.outputFormat) { + return fmt.Errorf("invalid --output %q (valid: table, json)", common.outputFormat) + } + // --file and --measurement are enforced by cobra's + // MarkFlagRequired below — no manual check needed here. + if skipRows < 0 { + return fmt.Errorf("--skip-rows must be >= 0 (got %d)", skipRows) + } + + cli, _, err := buildClient(cmd.ErrOrStderr(), common.connectionName, common.endpoint, common.token, common.insecure, common.timeout) + if err != nil { + return err + } + db, err := resolveImportDatabase(common.database, cli) + if err != nil { + return err + } + ctx, cancel := context.WithTimeout(cmd.Context(), common.timeout) + defer cancel() + + result, err := cli.ImportCSV(ctx, common.filePath, db, measurement, client.CSVImportOptions{ + TimeColumn: timeColumn, + TimeFormat: timeFormat, + Delimiter: delimiter, + SkipRows: skipRows, + }) + if err != nil { + return err + } + return renderImportResult(cmd, result, common.outputFormat) + }, + } + addImportCommonFlags(c, &common) + c.Flags().StringVar(&measurement, "measurement", "", "target measurement name (required)") + _ = c.MarkFlagRequired("measurement") + c.Flags().StringVar(&timeColumn, "time-column", "", "column to use as the row timestamp (server default: time)") + c.Flags().StringVar(&timeFormat, "time-format", "", "epoch_s|epoch_ms|epoch_us|epoch_ns (empty = let DuckDB infer, works for ISO-8601 strings)") + c.Flags().StringVar(&delimiter, "delimiter", "", "field separator (server default: ,)") + c.Flags().IntVar(&skipRows, "skip-rows", 0, "number of header rows to skip before parsing") + return c +} + +// ---- lp ------------------------------------------------------------------- + +func newImportLPCmd() *cobra.Command { + var ( + common importCommonFlags + precision string + measurementFilter string + ) + c := &cobra.Command{ + Use: "lp", + Short: "Import a Line Protocol file into a database", + Long: `Import a Line Protocol file into a database (POST /api/v1/import/lp). + +LP lines carry their own measurement names, so --measurement here acts +as a server-side filter rather than a destination. The server auto- +detects gzip via magic bytes — pass either a .lp or a .lp.gz file. + +Server-side cap: 500 MB decompressed.`, + Example: ` arcctl import lp -f telegraf-snapshot.lp --database metrics + arcctl import lp -f data.lp.gz --database metrics --precision ms + arcctl import lp -f data.lp --database metrics --measurement cpu # filter to cpu`, + RunE: func(cmd *cobra.Command, args []string) error { + if common.timeout <= 0 { + return fmt.Errorf("--timeout must be > 0 (got %s)", common.timeout) + } + if !validImportOutputFormat(common.outputFormat) { + return fmt.Errorf("invalid --output %q (valid: table, json)", common.outputFormat) + } + // --file is enforced by cobra's MarkFlagRequired in + // addImportCommonFlags. + if precision != "" && !client.ValidPrecision(precision) { + return fmt.Errorf("invalid --precision %q (must be one of ns, us, ms, s)", precision) + } + + cli, _, err := buildClient(cmd.ErrOrStderr(), common.connectionName, common.endpoint, common.token, common.insecure, common.timeout) + if err != nil { + return err + } + db, err := resolveImportDatabase(common.database, cli) + if err != nil { + return err + } + ctx, cancel := context.WithTimeout(cmd.Context(), common.timeout) + defer cancel() + + result, err := cli.ImportLP(ctx, common.filePath, db, client.LPImportOptions{ + Precision: client.Precision(precision), + MeasurementFilter: measurementFilter, + }) + if err != nil { + return err + } + return renderLPImportResult(cmd, result, common.outputFormat) + }, + } + addImportCommonFlags(c, &common) + c.Flags().StringVar(&precision, "precision", "", "timestamp precision: ns|us|ms|s (default: server-side default = ns)") + c.Flags().StringVar(&measurementFilter, "measurement", "", "filter to a single measurement (LP lines for other measurements are dropped)") + return c +} + +// ---- parquet -------------------------------------------------------------- + +func newImportParquetCmd() *cobra.Command { + var ( + common importCommonFlags + measurement string + timeColumn string + ) + c := &cobra.Command{ + Use: "parquet", + Short: "Import a Parquet file into a measurement", + Long: `Import a Parquet file into a measurement (POST /api/v1/import/parquet). + +Parquet preserves column types end-to-end — faster + lossless compared +to CSV for the same data.`, + Example: ` arcctl import parquet -f data.parquet --database metrics --measurement cpu + arcctl import parquet -f data.parquet --database metrics --measurement cpu --time-column ts`, + RunE: func(cmd *cobra.Command, args []string) error { + if common.timeout <= 0 { + return fmt.Errorf("--timeout must be > 0 (got %s)", common.timeout) + } + if !validImportOutputFormat(common.outputFormat) { + return fmt.Errorf("invalid --output %q (valid: table, json)", common.outputFormat) + } + // --file and --measurement are enforced by cobra's + // MarkFlagRequired below. + + cli, _, err := buildClient(cmd.ErrOrStderr(), common.connectionName, common.endpoint, common.token, common.insecure, common.timeout) + if err != nil { + return err + } + db, err := resolveImportDatabase(common.database, cli) + if err != nil { + return err + } + ctx, cancel := context.WithTimeout(cmd.Context(), common.timeout) + defer cancel() + + result, err := cli.ImportParquet(ctx, common.filePath, db, measurement, client.ParquetImportOptions{ + TimeColumn: timeColumn, + }) + if err != nil { + return err + } + return renderImportResult(cmd, result, common.outputFormat) + }, + } + addImportCommonFlags(c, &common) + c.Flags().StringVar(&measurement, "measurement", "", "target measurement name (required)") + _ = c.MarkFlagRequired("measurement") + c.Flags().StringVar(&timeColumn, "time-column", "", "column to use as the row timestamp (server default: time)") + return c +} + +// ---- tle ------------------------------------------------------------------ + +func newImportTLECmd() *cobra.Command { + var ( + common importCommonFlags + measurement string + ) + c := &cobra.Command{ + Use: "tle", + Short: "Import a TLE (two-line element) satellite-tracking file", + Long: `Import a TLE (two-line element) satellite-tracking file (POST /api/v1/import/tle). + +TLE is the standard NORAD/NASA format for orbital state vectors. The +server parses each three-line record (name + line1 + line2) and ingests +one row per satellite into the target measurement. + +--measurement defaults to "satellite_tle" if omitted (matches the +server's default behavior).`, + Example: ` arcctl import tle -f starlink.tle --database satellites + arcctl import tle -f starlink.tle --database satellites --measurement starlink`, + RunE: func(cmd *cobra.Command, args []string) error { + if common.timeout <= 0 { + return fmt.Errorf("--timeout must be > 0 (got %s)", common.timeout) + } + if !validImportOutputFormat(common.outputFormat) { + return fmt.Errorf("invalid --output %q (valid: table, json)", common.outputFormat) + } + // --file is enforced by cobra's MarkFlagRequired in + // addImportCommonFlags. --measurement is optional for TLE + // (server defaults to "satellite_tle"), so no required mark. + + cli, _, err := buildClient(cmd.ErrOrStderr(), common.connectionName, common.endpoint, common.token, common.insecure, common.timeout) + if err != nil { + return err + } + db, err := resolveImportDatabase(common.database, cli) + if err != nil { + return err + } + ctx, cancel := context.WithTimeout(cmd.Context(), common.timeout) + defer cancel() + + result, err := cli.ImportTLE(ctx, common.filePath, db, client.TLEImportOptions{ + Measurement: measurement, + }) + if err != nil { + return err + } + return renderTLEImportResult(cmd, result, common.outputFormat) + }, + } + addImportCommonFlags(c, &common) + c.Flags().StringVar(&measurement, "measurement", "", "target measurement (server default: satellite_tle)") + return c +} + +// ---- render helpers -------------------------------------------------------- + +// renderImportResult writes a CSV/Parquet ImportResult in the chosen +// format. Two formats only: table (default) and json. CSV doesn't make +// sense here because the "result" is a single record describing the +// import outcome, not tabular data. +// +// JSON output normalizes nil slices to empty slices so `"columns": +// null` from the server (or omitted field) renders as `"columns": []` +// — same pattern as PR3 to keep consumers' assumptions stable. +func renderImportResult(cmd *cobra.Command, r *client.ImportResult, format string) error { + if format == output.FormatJSON { + // Shallow copy of the result is safe today because every field + // of ImportResult is a value type or slice; the reassignment + // `out.Columns = []string{}` points the COPY's slice header + // at a fresh slice without touching the caller's `r`. If + // ImportResult ever grows a pointer/map field this comment + // needs revisiting. + out := *r + if out.Columns == nil { + out.Columns = []string{} + } + enc := json.NewEncoder(cmd.OutOrStdout()) + enc.SetIndent("", " ") + return enc.Encode(&out) + } + w := cmd.OutOrStdout() + fmt.Fprintln(w, "OK") + fmt.Fprintf(w, " database: %s\n", r.Database) + fmt.Fprintf(w, " measurement: %s\n", r.Measurement) + fmt.Fprintf(w, " rows_imported: %d\n", r.RowsImported) + fmt.Fprintf(w, " partitions_created: %d\n", r.PartitionsCreated) + if r.TimeRangeMin != "" || r.TimeRangeMax != "" { + fmt.Fprintf(w, " time_range: [%s … %s]\n", r.TimeRangeMin, r.TimeRangeMax) + } + if len(r.Columns) > 0 { + fmt.Fprintf(w, " columns: %s\n", strings.Join(r.Columns, ", ")) + } + fmt.Fprintf(w, " duration_ms: %d\n", r.DurationMs) + return nil +} + +// renderLPImportResult writes an LP-specific import result. LP can ingest +// multiple measurements from one file, so the field is plural. +// +// JSON output normalizes nil Measurements to []string{} (PR3 pattern). +func renderLPImportResult(cmd *cobra.Command, r *client.LPImportResult, format string) error { + if format == output.FormatJSON { + // Shallow copy: see renderImportResult for the safety rationale. + out := *r + if out.Measurements == nil { + out.Measurements = []string{} + } + enc := json.NewEncoder(cmd.OutOrStdout()) + enc.SetIndent("", " ") + return enc.Encode(&out) + } + w := cmd.OutOrStdout() + fmt.Fprintln(w, "OK") + fmt.Fprintf(w, " database: %s\n", r.Database) + fmt.Fprintf(w, " measurements: %s\n", strings.Join(r.Measurements, ", ")) + fmt.Fprintf(w, " rows_imported: %d\n", r.RowsImported) + fmt.Fprintf(w, " precision: %s\n", r.Precision) + fmt.Fprintf(w, " duration_ms: %d\n", r.DurationMs) + return nil +} + +// renderTLEImportResult writes a TLE-specific import result. Includes +// the satellite count + any parser warnings (TLE files often have +// invalid checksums on individual satellites; the server keeps the +// good ones and reports the bad in ParseWarnings). +func renderTLEImportResult(cmd *cobra.Command, r *client.TLEImportResult, format string) error { + if format == output.FormatJSON { + enc := json.NewEncoder(cmd.OutOrStdout()) + enc.SetIndent("", " ") + return enc.Encode(r) + } + w := cmd.OutOrStdout() + fmt.Fprintln(w, "OK") + fmt.Fprintf(w, " database: %s\n", r.Database) + fmt.Fprintf(w, " measurement: %s\n", r.Measurement) + fmt.Fprintf(w, " satellite_count: %d\n", r.SatelliteCount) + fmt.Fprintf(w, " rows_imported: %d\n", r.RowsImported) + fmt.Fprintf(w, " duration_ms: %d\n", r.DurationMs) + if len(r.ParseWarnings) > 0 { + fmt.Fprintf(w, " parse_warnings (%d):\n", len(r.ParseWarnings)) + for _, line := range r.ParseWarnings { + fmt.Fprintf(w, " - %s\n", line) + } + } + return nil +} diff --git a/internal/commands/import_test.go b/internal/commands/import_test.go new file mode 100644 index 0000000..2c419ee --- /dev/null +++ b/internal/commands/import_test.go @@ -0,0 +1,256 @@ +package commands + +import ( + "bytes" + "strings" + "testing" + + "github.com/spf13/cobra" + + "github.com/basekick-labs/arcctl/internal/client" +) + +func newImportTestCmd() *cobra.Command { + return &cobra.Command{Use: "test"} +} + +// TestImportCSV_MarkFlagRequired_File pins the cobra MarkFlagRequired +// behavior for --file (added in Gemini PR #3 r5). Without the flag, +// cobra should error before RunE with the canonical +// `required flag(s) "file" not set` message. +func TestImportCSV_MarkFlagRequired_File(t *testing.T) { + cmd := newImportCSVCmd() + cmd.SetOut(&bytes.Buffer{}) + cmd.SetErr(&bytes.Buffer{}) + // Pass --measurement so the only missing required flag is --file. + cmd.SetArgs([]string{"--measurement", "m", "--endpoint", "http://127.0.0.1:1", "--token", "t"}) + err := cmd.Execute() + if err == nil { + t.Fatal("expected error, got nil") + } + if !strings.Contains(err.Error(), `required flag(s) "file" not set`) { + t.Errorf("error %q does not mention required --file flag", err) + } +} + +// TestImportCSV_MarkFlagRequired_Measurement is the same check for +// --measurement, which is required for CSV (the file doesn't carry a +// measurement name). +func TestImportCSV_MarkFlagRequired_Measurement(t *testing.T) { + cmd := newImportCSVCmd() + cmd.SetOut(&bytes.Buffer{}) + cmd.SetErr(&bytes.Buffer{}) + cmd.SetArgs([]string{"--file", "/tmp/whatever.csv", "--endpoint", "http://127.0.0.1:1", "--token", "t"}) + err := cmd.Execute() + if err == nil { + t.Fatal("expected error, got nil") + } + if !strings.Contains(err.Error(), `required flag(s) "measurement" not set`) { + t.Errorf("error %q does not mention required --measurement flag", err) + } +} + +// TestImportLP_MarkFlagRequired_File — LP does NOT require +// --measurement (measurement is self-declared in line syntax), so only +// --file should be marked required. +func TestImportLP_MarkFlagRequired_File(t *testing.T) { + cmd := newImportLPCmd() + cmd.SetOut(&bytes.Buffer{}) + cmd.SetErr(&bytes.Buffer{}) + cmd.SetArgs([]string{"--endpoint", "http://127.0.0.1:1", "--token", "t"}) + err := cmd.Execute() + if err == nil { + t.Fatal("expected error, got nil") + } + if !strings.Contains(err.Error(), `required flag(s) "file" not set`) { + t.Errorf("error %q does not mention required --file flag", err) + } + // Confirm --measurement is NOT marked required for LP. + if strings.Contains(err.Error(), "measurement") { + t.Errorf("error %q unexpectedly mentions --measurement (should be optional for LP)", err) + } +} + +// TestImportCSV_RejectsNegativeSkipRows pins the client-side validation +// Gemini flagged in arcctl PR #3: a negative --skip-rows used to pass +// silently into the client (which drops it via `> 0`), so the user got +// no error AND no skip. The RunE-level guard now fails fast. +// +// Driven through the cobra command rather than calling RunE directly so +// the test covers the wired flag path end-to-end. +func TestImportCSV_RejectsNegativeSkipRows(t *testing.T) { + cmd := newImportCSVCmd() + cmd.SetOut(&bytes.Buffer{}) + cmd.SetErr(&bytes.Buffer{}) + cmd.SetArgs([]string{ + "--file", "/tmp/whatever", + "--measurement", "m", + "--endpoint", "http://127.0.0.1:1", + "--token", "t", + "--skip-rows", "-5", + }) + err := cmd.Execute() + if err == nil { + t.Fatal("expected error, got nil") + } + if !strings.Contains(err.Error(), "--skip-rows must be >= 0") { + t.Errorf("error %q does not mention --skip-rows", err) + } +} + +func TestValidImportOutputFormat(t *testing.T) { + for _, f := range []string{"table", "json"} { + if !validImportOutputFormat(f) { + t.Errorf("validImportOutputFormat(%q) = false", f) + } + } + for _, f := range []string{"csv", "arrow", "", "YAML"} { + if validImportOutputFormat(f) { + t.Errorf("validImportOutputFormat(%q) = true (should be false — import endpoints return one-shot results, not tabular data)", f) + } + } +} + +func TestRenderImportResult_TableHappyPath(t *testing.T) { + var buf bytes.Buffer + cmd := newImportTestCmd() + cmd.SetOut(&buf) + r := &client.ImportResult{ + Database: "metrics", Measurement: "cpu", + RowsImported: 42, PartitionsCreated: 1, + TimeRangeMin: "2026-01-01", TimeRangeMax: "2026-01-02", + Columns: []string{"time", "host", "value"}, DurationMs: 17, + } + if err := renderImportResult(cmd, r, "table"); err != nil { + t.Fatalf("render: %v", err) + } + out := buf.String() + for _, want := range []string{"OK", "metrics", "cpu", "42", "time, host, value", "17"} { + if !strings.Contains(out, want) { + t.Errorf("missing %q in:\n%s", want, out) + } + } +} + +func TestRenderImportResult_TableNoTimeRangeNoColumns(t *testing.T) { + // Server-side error path: import succeeded but server only filled + // in the basics. The render must not print empty bracketed time + // ranges or empty "columns: " lines. + var buf bytes.Buffer + cmd := newImportTestCmd() + cmd.SetOut(&buf) + r := &client.ImportResult{Database: "x", Measurement: "y", RowsImported: 1} + if err := renderImportResult(cmd, r, "table"); err != nil { + t.Fatalf("render: %v", err) + } + out := buf.String() + if strings.Contains(out, "time_range:") { + t.Errorf("empty time-range should not render: %s", out) + } + if strings.Contains(out, "columns:") { + t.Errorf("empty columns should not render: %s", out) + } +} + +// Regression for the PR3-style nil-slice JSON encoding issue. +// `ImportResult.Columns` is a []string without `omitempty`; if the +// server returns null (or arcctl decodes the field as nil for any +// reason), JSON output must STILL emit `"columns": []` so downstream +// consumers don't see `null`. +func TestRenderImportResult_JSONEmptyColumns_IsArrayNotNull(t *testing.T) { + var buf bytes.Buffer + cmd := newImportTestCmd() + cmd.SetOut(&buf) + r := &client.ImportResult{ + Database: "x", Measurement: "y", RowsImported: 0, + Columns: nil, // server returned no columns + } + if err := renderImportResult(cmd, r, "json"); err != nil { + t.Fatalf("render: %v", err) + } + out := buf.String() + if strings.Contains(out, "null") { + t.Errorf("JSON output contains `null`. Got:\n%s", out) + } + if !strings.Contains(out, `"columns": []`) { + t.Errorf("expected `\"columns\": []`, got:\n%s", out) + } +} + +func TestRenderLPImportResult_TableHappyPath(t *testing.T) { + var buf bytes.Buffer + cmd := newImportTestCmd() + cmd.SetOut(&buf) + r := &client.LPImportResult{ + Database: "metrics", Measurements: []string{"cpu", "mem"}, + RowsImported: 100, Precision: "ms", DurationMs: 8, + } + if err := renderLPImportResult(cmd, r, "table"); err != nil { + t.Fatalf("render: %v", err) + } + out := buf.String() + for _, want := range []string{"OK", "metrics", "cpu, mem", "100", "ms"} { + if !strings.Contains(out, want) { + t.Errorf("missing %q in:\n%s", want, out) + } + } +} + +func TestRenderLPImportResult_JSONEmptyMeasurements_IsArrayNotNull(t *testing.T) { + var buf bytes.Buffer + cmd := newImportTestCmd() + cmd.SetOut(&buf) + r := &client.LPImportResult{ + Database: "x", Measurements: nil, RowsImported: 0, + } + if err := renderLPImportResult(cmd, r, "json"); err != nil { + t.Fatalf("render: %v", err) + } + out := buf.String() + if strings.Contains(out, "null") { + t.Errorf("JSON output contains `null`. Got:\n%s", out) + } + if !strings.Contains(out, `"measurements": []`) { + t.Errorf("expected `\"measurements\": []`, got:\n%s", out) + } +} + +func TestRenderTLEImportResult_WithWarnings(t *testing.T) { + var buf bytes.Buffer + cmd := newImportTestCmd() + cmd.SetOut(&buf) + r := &client.TLEImportResult{ + Database: "sats", Measurement: "satellite_tle", + SatelliteCount: 2, RowsImported: 2, DurationMs: 5, + ParseWarnings: []string{"entry 1 line 1 checksum mismatch"}, + } + if err := renderTLEImportResult(cmd, r, "table"); err != nil { + t.Fatalf("render: %v", err) + } + out := buf.String() + if !strings.Contains(out, "parse_warnings (1):") { + t.Errorf("expected parse_warnings header, got:\n%s", out) + } + if !strings.Contains(out, "entry 1 line 1 checksum mismatch") { + t.Errorf("expected warning line, got:\n%s", out) + } +} + +func TestRenderTLEImportResult_NoWarningsBlock(t *testing.T) { + // TLE's ParseWarnings uses omitempty on the server, so a nil + // slice means "no warnings" — table output must NOT print an + // empty warnings block. + var buf bytes.Buffer + cmd := newImportTestCmd() + cmd.SetOut(&buf) + r := &client.TLEImportResult{ + Database: "sats", Measurement: "satellite_tle", + SatelliteCount: 5, RowsImported: 5, + } + if err := renderTLEImportResult(cmd, r, "table"); err != nil { + t.Fatalf("render: %v", err) + } + if strings.Contains(buf.String(), "parse_warnings") { + t.Errorf("empty warnings should not render: %s", buf.String()) + } +} diff --git a/internal/commands/root.go b/internal/commands/root.go index d72fff2..fc1f2fd 100644 --- a/internal/commands/root.go +++ b/internal/commands/root.go @@ -1,8 +1,8 @@ // Package commands wires the arcctl cobra command tree. // // Each top-level command lives in its own file. PR1 shipped `root` + -// `config`; PR2 added `query` + `write`; PR3 added `db` + `measurement`. -// Later PRs add: import, auth, cluster, ops. +// `config`; PR2 added `query` + `write`; PR3 added `db` + `measurement`; +// PR4 added `import`. Later PRs add: auth, cluster, ops. package commands import "github.com/spf13/cobra" @@ -35,6 +35,7 @@ First-time setup: newWriteCmd(), newDBCmd(), newMeasurementCmd(), + newImportCmd(), ) return root }