diff --git a/.env.example b/.env.example index da088470d2..5a4f545a7e 100644 --- a/.env.example +++ b/.env.example @@ -60,12 +60,10 @@ # AGENT_HTTP_URL= # AGENT_GRPC_URL= -## Runner task broker +## Runner fleets ## -## Required together when Runner components should use an external broker. -# TASK_BROKER_BASE_URL= -# TASK_BROKER_FLEET_ID= -# TASK_BROKER_AUTH_TOKEN= +## Register a fleet via POST /admin/api/runner/fleets, then set fleet_id on Runner nodes. +## Configure fleet-manager with SUPERPLANE_URL and the fleet auth_token from registration. ## Usage service # USAGE_GRPC_URL= diff --git a/db/migrations/20260518120000_add-runner-fleets.down.sql b/db/migrations/20260518120000_add-runner-fleets.down.sql new file mode 100644 index 0000000000..e69de29bb2 diff --git a/db/migrations/20260518120000_add-runner-fleets.up.sql b/db/migrations/20260518120000_add-runner-fleets.up.sql new file mode 100644 index 0000000000..5a7819d10d --- /dev/null +++ b/db/migrations/20260518120000_add-runner-fleets.up.sql @@ -0,0 +1,33 @@ +BEGIN; + +CREATE TABLE IF NOT EXISTS runner_fleets ( + id UUID PRIMARY KEY DEFAULT uuid_generate_v4(), + name VARCHAR(255) NOT NULL, + auth_token TEXT NOT NULL DEFAULT '', + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE UNIQUE INDEX runner_fleets_name_idx ON runner_fleets (name); + +CREATE TABLE IF NOT EXISTS runner_tasks ( + id UUID PRIMARY KEY DEFAULT uuid_generate_v4(), + fleet_id UUID NOT NULL REFERENCES runner_fleets(id) ON DELETE CASCADE, + fleet_task_id TEXT NOT NULL, + execution_id UUID NOT NULL REFERENCES workflow_node_executions(id) ON DELETE CASCADE, + status VARCHAR(32) NOT NULL DEFAULT 'queued', + spec JSONB NOT NULL DEFAULT '{}'::jsonb, + exit_code INTEGER, + output TEXT NOT NULL DEFAULT '', + error TEXT NOT NULL DEFAULT '', + result JSONB, + task_log JSONB, + dispatched_at TIMESTAMPTZ, + completed_at TIMESTAMPTZ, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE UNIQUE INDEX runner_tasks_fleet_task_idx ON runner_tasks (fleet_id, fleet_task_id); +CREATE INDEX runner_tasks_execution_id_idx ON runner_tasks (execution_id); +CREATE INDEX runner_tasks_fleet_status_created_idx ON runner_tasks (fleet_id, status, created_at); + +COMMIT; diff --git a/db/structure.sql b/db/structure.sql index 91dab8c781..324996b5df 100644 --- a/db/structure.sql +++ b/db/structure.sql @@ -5,7 +5,7 @@ \restrict abcdef123 -- Dumped from database version 17.5 (Debian 17.5-1.pgdg130+1) --- Dumped by pg_dump version 17.10 (Ubuntu 17.10-1.pgdg22.04+1) +-- Dumped by pg_dump version 17.9 (Ubuntu 17.9-1.pgdg22.04+1) SET statement_timeout = 0; SET lock_timeout = 0; @@ -432,6 +432,42 @@ CREATE TABLE public.role_metadata ( ); +-- +-- Name: runner_fleets; Type: TABLE; Schema: public; Owner: - +-- + +CREATE TABLE public.runner_fleets ( + id uuid DEFAULT public.uuid_generate_v4() NOT NULL, + name character varying(255) NOT NULL, + fleet_url text, + auth_token text DEFAULT ''::text NOT NULL, + created_at timestamp with time zone DEFAULT now() NOT NULL, + mode character varying(32) DEFAULT 'bridge'::character varying NOT NULL +); + + +-- +-- Name: runner_tasks; Type: TABLE; Schema: public; Owner: - +-- + +CREATE TABLE public.runner_tasks ( + id uuid DEFAULT public.uuid_generate_v4() NOT NULL, + fleet_id uuid NOT NULL, + fleet_task_id text NOT NULL, + execution_id uuid NOT NULL, + created_at timestamp with time zone DEFAULT now() NOT NULL, + status character varying(32) DEFAULT 'queued'::character varying NOT NULL, + spec jsonb DEFAULT '{}'::jsonb NOT NULL, + exit_code integer, + output text DEFAULT ''::text NOT NULL, + error text DEFAULT ''::text NOT NULL, + result jsonb, + task_log jsonb, + dispatched_at timestamp with time zone, + completed_at timestamp with time zone +); + + -- -- Name: schema_migrations; Type: TABLE; Schema: public; Owner: - -- @@ -988,6 +1024,22 @@ ALTER TABLE ONLY public.role_metadata ADD CONSTRAINT role_metadata_pkey PRIMARY KEY (id); +-- +-- Name: runner_fleets runner_fleets_pkey; Type: CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.runner_fleets + ADD CONSTRAINT runner_fleets_pkey PRIMARY KEY (id); + + +-- +-- Name: runner_tasks runner_tasks_pkey; Type: CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.runner_tasks + ADD CONSTRAINT runner_tasks_pkey PRIMARY KEY (id); + + -- -- Name: schema_migrations schema_migrations_pkey; Type: CONSTRAINT; Schema: public; Owner: - -- @@ -1638,6 +1690,34 @@ CREATE INDEX idx_workflows_live_version_id ON public.workflows USING btree (live CREATE INDEX idx_workflows_organization_id ON public.workflows USING btree (organization_id); +-- +-- Name: runner_fleets_name_idx; Type: INDEX; Schema: public; Owner: - +-- + +CREATE UNIQUE INDEX runner_fleets_name_idx ON public.runner_fleets USING btree (name); + + +-- +-- Name: runner_tasks_execution_id_idx; Type: INDEX; Schema: public; Owner: - +-- + +CREATE INDEX runner_tasks_execution_id_idx ON public.runner_tasks USING btree (execution_id); + + +-- +-- Name: runner_tasks_fleet_status_created_idx; Type: INDEX; Schema: public; Owner: - +-- + +CREATE INDEX runner_tasks_fleet_status_created_idx ON public.runner_tasks USING btree (fleet_id, status, created_at); + + +-- +-- Name: runner_tasks_fleet_task_idx; Type: INDEX; Schema: public; Owner: - +-- + +CREATE UNIQUE INDEX runner_tasks_fleet_task_idx ON public.runner_tasks USING btree (fleet_id, fleet_task_id); + + -- -- Name: unique_human_user_in_organization; Type: INDEX; Schema: public; Owner: - -- @@ -1828,6 +1908,22 @@ ALTER TABLE ONLY public.organization_invite_links ADD CONSTRAINT organization_invite_links_organization_id_fkey FOREIGN KEY (organization_id) REFERENCES public.organizations(id) ON DELETE CASCADE; +-- +-- Name: runner_tasks runner_tasks_execution_id_fkey; Type: FK CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.runner_tasks + ADD CONSTRAINT runner_tasks_execution_id_fkey FOREIGN KEY (execution_id) REFERENCES public.workflow_node_executions(id) ON DELETE CASCADE; + + +-- +-- Name: runner_tasks runner_tasks_fleet_id_fkey; Type: FK CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.runner_tasks + ADD CONSTRAINT runner_tasks_fleet_id_fkey FOREIGN KEY (fleet_id) REFERENCES public.runner_fleets(id) ON DELETE CASCADE; + + -- -- Name: users users_account_id_fkey; Type: FK CONSTRAINT; Schema: public; Owner: - -- @@ -2129,7 +2225,7 @@ ALTER TABLE ONLY public.workflows \restrict abcdef123 -- Dumped from database version 17.5 (Debian 17.5-1.pgdg130+1) --- Dumped by pg_dump version 17.10 (Ubuntu 17.10-1.pgdg22.04+1) +-- Dumped by pg_dump version 17.9 (Ubuntu 17.9-1.pgdg22.04+1) SET statement_timeout = 0; SET lock_timeout = 0; @@ -2148,7 +2244,7 @@ SET row_security = off; -- COPY public.schema_migrations (version, dirty) FROM stdin; -20260515120000 f +20260518120000 f \. @@ -2165,7 +2261,7 @@ COPY public.schema_migrations (version, dirty) FROM stdin; \restrict abcdef123 -- Dumped from database version 17.5 (Debian 17.5-1.pgdg130+1) --- Dumped by pg_dump version 17.10 (Ubuntu 17.10-1.pgdg22.04+1) +-- Dumped by pg_dump version 17.9 (Ubuntu 17.9-1.pgdg22.04+1) SET statement_timeout = 0; SET lock_timeout = 0; diff --git a/docker-compose.dev.yml b/docker-compose.dev.yml index f6b61d7b10..3ad33faf32 100644 --- a/docker-compose.dev.yml +++ b/docker-compose.dev.yml @@ -77,11 +77,6 @@ services: # Runner + node webhooks: URLs sent to externals use this origin (must reach this process on PUBLIC_API_PORT). # Tunnel example: ngrok http 8000 then set WEBHOOKS_BASE_URL=https://.ngrok-free.app (no trailing slash). WEBHOOKS_BASE_URL: ${WEBHOOKS_BASE_URL:-http://localhost:8000} - # Task broker /v1 (Runner component). Base URL has no trailing slash. - TASK_BROKER_BASE_URL: ${TASK_BROKER_BASE_URL:-} - TASK_BROKER_FLEET_ID: ${TASK_BROKER_FLEET_ID:-} - # Bearer; must match broker AUTH_TOKEN when auth is enabled - TASK_BROKER_AUTH_TOKEN: ${TASK_BROKER_AUTH_TOKEN:-} ALLOWED_WS_ORIGINS: ${ALLOWED_WS_ORIGINS:-http://localhost:8000} AGENT_ENABLED: ${AGENT_ENABLED:-no} AGENT_HTTP_URL: ${AGENT_HTTP_URL:-} diff --git a/docs/components/Core.mdx b/docs/components/Core.mdx index 227235e5ae..0738477649 100644 --- a/docs/components/Core.mdx +++ b/docs/components/Core.mdx @@ -730,7 +730,7 @@ Runs shell commands on a fleet runner. - **Execution mode**: Host (default) or Docker. - **Container base image**: Choose a common public image, or **Other (custom image)** to enter any OCI reference. - **Custom container image**: Shown only for **Other**; use a normal reference (`my.registry.example.com/org/repo:1.2.3` or `debian:bookworm-slim@sha256:…`). Private registries require the runner to be configured with registry credentials. -- **Execution timeout**: Optional wall-clock limit in seconds (1–86400). Leave at **0** to use the broker default. +- **Execution timeout**: Optional wall-clock limit in seconds (1–86400). Leave at **0** to use the fleet default. - **Commands**: One or more shell commands, one per line. - **Environment variables**: Optional key/value pairs available during command execution. Values can be literal strings (with expression support) or organization secret keys. @@ -739,7 +739,7 @@ Runs shell commands on a fleet runner. - **Failed**: The commands finished with non-zero exit code. ### Structured result -If the completed broker task includes valid JSON in **result**, SuperPlane includes it on the `runner.finished` event payload next to **status** and **exit_code** (the exact shape depends on your runner / task implementation). +If the completed fleet task includes valid JSON in **result**, SuperPlane includes it on the `runner.finished` event payload next to **status** and **exit_code** (the exact shape depends on your runner / task implementation). ### Example Output diff --git a/go.mod b/go.mod index 4a34433faa..f07bf4245b 100644 --- a/go.mod +++ b/go.mod @@ -53,6 +53,19 @@ require ( cloud.google.com/go/auth/oauth2adapt v0.2.8 // indirect github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0 // indirect github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2 // indirect + github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.1 // indirect + github.com/aws/aws-sdk-go-v2/config v1.31.5 // indirect + github.com/aws/aws-sdk-go-v2/credentials v1.18.9 // indirect + github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.5 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.7 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.7 // indirect + github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3 // indirect + github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs v1.58.0 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.1 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.5 // indirect + github.com/aws/aws-sdk-go-v2/service/sso v1.29.0 // indirect + github.com/aws/aws-sdk-go-v2/service/ssooidc v1.34.1 // indirect + github.com/aws/aws-sdk-go-v2/service/sts v1.38.1 // indirect github.com/aws/smithy-go v1.24.0 // indirect github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect diff --git a/go.sum b/go.sum index cc215d8450..14b6837e29 100644 --- a/go.sum +++ b/go.sum @@ -91,6 +91,32 @@ github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj github.com/armon/go-radix v1.0.0/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8= github.com/aws/aws-sdk-go-v2 v1.41.1 h1:ABlyEARCDLN034NhxlRUSZr4l71mh+T5KAeGh6cerhU= github.com/aws/aws-sdk-go-v2 v1.41.1/go.mod h1:MayyLB8y+buD9hZqkCW3kX1AKq07Y5pXxtgB+rRFhz0= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.1 h1:i8p8P4diljCr60PpJp6qZXNlgX4m2yQFpYk+9ZT+J4E= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.1/go.mod h1:ddqbooRZYNoJ2dsTwOty16rM+/Aqmk/GOXrK8cg7V00= +github.com/aws/aws-sdk-go-v2/config v1.31.5 h1:wsZr2kq1XeKU/D2QDcW5xEB1zHPdHAuQqnR0yaygAQQ= +github.com/aws/aws-sdk-go-v2/config v1.31.5/go.mod h1:IpXejRuSIyOSCyT4BomfIJ5gWRcDoX/NJaAHh9Cp8jE= +github.com/aws/aws-sdk-go-v2/credentials v1.18.9 h1:zKrnPtmO7j2FpMqudayjCzNxyO8KtPQGCIzqEosKQbg= +github.com/aws/aws-sdk-go-v2/credentials v1.18.9/go.mod h1:gAotjkj0roLrwvBxECN1Q8ILfkVsw3Ntph6FP1LnZ8Q= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.5 h1:ul7hICbZ5Z/Pp9VnLVGUVe7rqYLXCyIiPU7hQ0sRkow= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.5/go.mod h1:5cIWJ0N6Gjj+72Q6l46DeaNtcxXHV42w/Uq3fIfeUl4= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.7 h1:UCxq0X9O3xrlENdKf1r9eRJoKz/b0AfGkpp3a7FPlhg= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.7/go.mod h1:rHRoJUNUASj5Z/0eqI4w32vKvC7atoWR0jC+IkmVH8k= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.7 h1:Y6DTZUn7ZUC4th9FMBbo8LVE+1fyq3ofw+tRwkUd3PY= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.7/go.mod h1:x3XE6vMnU9QvHN/Wrx2s44kwzV2o2g5x/siw4ZUJ9g8= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3 h1:bIqFDwgGXXN1Kpp99pDOdKMTTb5d2KyU5X/BZxjOkRo= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3/go.mod h1:H5O/EsxDWyU+LP/V8i5sm8cxoZgc2fdNR9bxlOFrQTo= +github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs v1.58.0 h1:XH0kj0KcoKd+BAadpiS83/Wf+25q4FmH3gDei4u+PzA= +github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs v1.58.0/go.mod h1:ptJgRWK9opQK1foOTBKUg3PokkKA0/xcTXWIxwliaIY= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.1 h1:oegbebPEMA/1Jny7kvwejowCaHz1FWZAQ94WXFNCyTM= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.1/go.mod h1:kemo5Myr9ac0U9JfSjMo9yHLtw+pECEHsFtJ9tqCEI8= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.5 h1:Cx1M/UUgYu9UCQnIMKaOhkVaFvLy1HneD6T4sS/DlKg= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.5/go.mod h1:fTRNLgrTvPpEzGqc9QkeO4hu/3ng+mdtUbL8shUwXz4= +github.com/aws/aws-sdk-go-v2/service/sso v1.29.0 h1:H4QPAHLE1bHSQrZV6Hz+CPpJG+Mtf+rkl6NFb/Y7sv8= +github.com/aws/aws-sdk-go-v2/service/sso v1.29.0/go.mod h1:BnyjuIX0l+KXJVl2o9Ki3Zf0M4pA2hQYopFCRUj9ADU= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.34.1 h1:8yI3jK5JZ310S8RpgdZdzwvlvBu3QbG8DP7Be/xJ6yo= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.34.1/go.mod h1:HPzXfFgrLd02lYpcFYdDz5xZs94LOb+lWlvbAGaeMsk= +github.com/aws/aws-sdk-go-v2/service/sts v1.38.1 h1:3kWmIg5iiWPMBJyq/I55Fki5fyfoMtrn/SkUIpxPwHQ= +github.com/aws/aws-sdk-go-v2/service/sts v1.38.1/go.mod h1:yi0b3Qez6YamRVJ+Rbi19IgvjfjPODgVRhkWA6RTMUM= github.com/aws/smithy-go v1.24.0 h1:LpilSUItNPFr1eY85RYgTIg5eIEPtvFbskaFcmmIUnk= github.com/aws/smithy-go v1.24.0/go.mod h1:LEj2LM3rBRQJxPZTB4KuzZkaZYnZPnvgIhb4pu07mx0= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= diff --git a/pkg/components/runner/broker.go b/pkg/components/runner/broker.go deleted file mode 100644 index 9de0f49c41..0000000000 --- a/pkg/components/runner/broker.go +++ /dev/null @@ -1,293 +0,0 @@ -package runner - -import ( - "bytes" - "context" - "encoding/json" - "fmt" - "io" - "net/http" - "net/url" - "os" - "strings" - "time" - - "github.com/superplanehq/superplane/pkg/core" -) - -const ( - brokerHTTPTimeout = 30 * time.Second - - // Task-broker may return 409 until fleet_task_id is linked (cancel right after create). - cancel409MaxAttempts = 3 - cancel409RetryBackoff = 200 * time.Millisecond -) - -type BrokerClient struct { - httpClient core.HTTPContext - baseURL string - fleetID string - authToken string -} - -func NewBrokerClient(httpClient core.HTTPContext) (*BrokerClient, error) { - baseURL := os.Getenv("TASK_BROKER_BASE_URL") - if baseURL == "" { - return nil, fmt.Errorf("TASK_BROKER_BASE_URL is not set") - } - - fleetID := os.Getenv("TASK_BROKER_FLEET_ID") - if fleetID == "" { - return nil, fmt.Errorf("TASK_BROKER_FLEET_ID is not set") - } - - authToken := os.Getenv("TASK_BROKER_AUTH_TOKEN") - if authToken == "" { - return nil, fmt.Errorf("TASK_BROKER_AUTH_TOKEN is not set") - } - - return &BrokerClient{ - httpClient: httpClient, - baseURL: baseURL, - fleetID: fleetID, - authToken: authToken, - }, nil -} - -// Create Task -// -// POST /v1/tasks -// -// Example request: -// { -// "fleet_id": "aws-standard-1", -// "commands": ["echo \"Hello, World!\""], -// "environment": [{"name": "APP_ENV", "value": "production"}], -// "webhook_url": "https://example.com/webhook", -// "execution_mode": "host", -// "docker_image": "debian:bookworm-slim", -// "execution_timeout_seconds": 600 -// } -// -// Example response: -// { -// "id": "1234567890" -// } - -type brokerCreateTaskRequest struct { - FleetID string `json:"fleet_id"` - - Commands []string `json:"commands"` - Environment []BrokerEnvironmentVariable `json:"environment,omitempty"` - WebhookURL string `json:"webhook_url"` - ExecutionMode string `json:"execution_mode,omitempty"` - DockerImage string `json:"docker_image,omitempty"` - ExecutionTimeoutSeconds *int `json:"execution_timeout_seconds,omitempty"` -} - -// BrokerEnvironmentVariable is forwarded to the task broker as JSON. -type BrokerEnvironmentVariable struct { - Name string `json:"name"` - Value string `json:"value"` -} - -// CreateTaskParams is forwarded to the task broker POST /v1/tasks. -type CreateTaskParams struct { - Commands []string - WebhookURL string - Environment []BrokerEnvironmentVariable - ExecutionMode string - DockerImage string - TimeoutSeconds int // 0 = omit (broker / fleet default) -} - -type brokerCreateTaskResponse struct { - ID string `json:"id"` -} - -func (b *BrokerClient) CreateTask(p CreateTaskParams) (string, error) { - mode := strings.ToLower(strings.TrimSpace(p.ExecutionMode)) - if mode == "" { - mode = ExecutionModeHost - } - - req := brokerCreateTaskRequest{ - FleetID: b.fleetID, - Commands: p.Commands, - Environment: p.Environment, - WebhookURL: p.WebhookURL, - ExecutionMode: mode, - DockerImage: strings.TrimSpace(p.DockerImage), - } - if p.TimeoutSeconds > 0 { - t := p.TimeoutSeconds - req.ExecutionTimeoutSeconds = &t - } - - bodyBytes, err := json.Marshal(req) - if err != nil { - return "", fmt.Errorf("marshal request: %w", err) - } - - httpCtx, cancel := context.WithTimeout(context.Background(), brokerHTTPTimeout) - defer cancel() - - httpReq, err := http.NewRequestWithContext(httpCtx, http.MethodPost, b.baseURL+"/v1/tasks", bytes.NewReader(bodyBytes)) - if err != nil { - return "", fmt.Errorf("new request: %w", err) - } - httpReq.Header.Set("Content-Type", "application/json") - httpReq.Header.Set("Authorization", "Bearer "+b.authToken) - - resp, err := b.httpClient.Do(httpReq) - if err != nil { - return "", fmt.Errorf("broker request: %w", err) - } - defer resp.Body.Close() - - body, err := io.ReadAll(resp.Body) - if err != nil { - return "", fmt.Errorf("read response body: %w", err) - } - - if resp.StatusCode != http.StatusCreated { - return "", fmt.Errorf( - "broker rejected task: status=%d body=%s", - resp.StatusCode, - strings.TrimSpace(string(body)), - ) - } - - var out brokerCreateTaskResponse - if err := json.Unmarshal(body, &out); err != nil { - return "", fmt.Errorf("unmarshal create task response: %w", err) - } - - return out.ID, nil -} - -// Task is the broker task payload (GET /v1/tasks/:id and webhook body). -type Task struct { - TaskID string `json:"task_id"` - Status string `json:"status"` - ExitCode *int `json:"exit_code,omitempty"` - Output string `json:"output,omitempty"` - Error string `json:"error,omitempty"` - Result json.RawMessage `json:"result,omitempty"` - - TaskLog *TaskLogSink `json:"task_log,omitempty"` - - CloudWatchLogGroup string `json:"cloudwatch_log_group,omitempty"` - CloudWatchLogStream string `json:"cloudwatch_log_stream,omitempty"` -} - -func (t *Task) effectiveExitCode() int { - if t == nil || t.ExitCode == nil { - return 0 - } - return *t.ExitCode -} - -func (t *Task) IsInTerminalState() bool { - return t.Status == "succeeded" || t.Status == "failed" || t.Status == "canceled" -} - -func (b *BrokerClient) CancelTask(brokerTaskID string) error { - brokerTaskID = strings.TrimSpace(brokerTaskID) - if brokerTaskID == "" { - return fmt.Errorf("broker task id is empty") - } - - cancelPath := b.baseURL + "/v1/tasks/" + url.PathEscape(brokerTaskID) + "/cancel" - - var lastErr error - for attempt := range cancel409MaxAttempts { - if attempt > 0 { - time.Sleep(cancel409RetryBackoff) - } - - httpCtx, cancel := context.WithTimeout(context.Background(), brokerHTTPTimeout) - httpReq, err := http.NewRequestWithContext(httpCtx, http.MethodPost, cancelPath, http.NoBody) - if err != nil { - cancel() - return fmt.Errorf("new request: %w", err) - } - httpReq.Header.Set("Authorization", "Bearer "+b.authToken) - - resp, err := b.httpClient.Do(httpReq) - if err != nil { - cancel() - return fmt.Errorf("broker request: %w", err) - } - - body, err := io.ReadAll(resp.Body) - if closeErr := resp.Body.Close(); closeErr != nil && err == nil { - err = closeErr - } - cancel() - if err != nil { - return fmt.Errorf("read response body: %w", err) - } - - switch resp.StatusCode { - case http.StatusOK, http.StatusNotFound: - return nil - case http.StatusConflict: - lastErr = fmt.Errorf( - "broker rejected cancel: status=%d body=%s", - resp.StatusCode, - strings.TrimSpace(string(body)), - ) - default: - return fmt.Errorf( - "broker rejected cancel: status=%d body=%s", - resp.StatusCode, - strings.TrimSpace(string(body)), - ) - } - } - - return fmt.Errorf("broker cancel: exceeded retries: %w", lastErr) -} - -func (b *BrokerClient) FetchTaskStatus(taskID string) (*Task, error) { - httpCtx, cancel := context.WithTimeout(context.Background(), brokerHTTPTimeout) - defer cancel() - - httpReq, err := http.NewRequestWithContext(httpCtx, http.MethodGet, b.baseURL+"/v1/tasks/"+url.PathEscape(taskID), nil) - if err != nil { - return nil, fmt.Errorf("new request: %w", err) - } - httpReq.Header.Set("Authorization", "Bearer "+b.authToken) - - resp, err := b.httpClient.Do(httpReq) - if err != nil { - return nil, fmt.Errorf("broker request: %w", err) - } - defer resp.Body.Close() - - body, err := io.ReadAll(resp.Body) - if err != nil { - return nil, fmt.Errorf("read response body: %w", err) - } - - if resp.StatusCode != http.StatusOK { - return nil, fmt.Errorf("broker rejected task: status=%d body=%s", resp.StatusCode, strings.TrimSpace(string(body))) - } - - var out Task - if err := json.Unmarshal(body, &out); err != nil { - return nil, fmt.Errorf("unmarshal task response: %w", err) - } - - return &out, nil -} - -func (b *BrokerClient) ProcessWebhook(body []byte) (*Task, error) { - var out Task - if err := json.Unmarshal(body, &out); err != nil { - return nil, fmt.Errorf("unmarshal webhook response: %w", err) - } - - return &out, nil -} diff --git a/pkg/components/runner/commands.go b/pkg/components/runner/commands.go index 98fcf273de..b3eb3e274e 100644 --- a/pkg/components/runner/commands.go +++ b/pkg/components/runner/commands.go @@ -14,6 +14,12 @@ const ( EnvironmentValueSourceSecret = "secret" ) +// BrokerEnvironmentVariable is forwarded to fleet-manager as JSON. +type BrokerEnvironmentVariable struct { + Name string `json:"name"` + Value string `json:"value"` +} + var environmentVariableNameRegex = regexp.MustCompile(`^[A-Za-z_][A-Za-z0-9_]*$`) func normalizeCommands(commands string) []string { diff --git a/pkg/components/runner/metadata.go b/pkg/components/runner/metadata.go index 62598c299d..a42acd77f3 100644 --- a/pkg/components/runner/metadata.go +++ b/pkg/components/runner/metadata.go @@ -1,10 +1,10 @@ package runner import ( - "encoding/json" "strings" "github.com/superplanehq/superplane/pkg/core" + runnermodels "github.com/superplanehq/superplane/pkg/runners/models" ) const ( @@ -12,15 +12,8 @@ const ( ExecutionMetadataTaskLog = "runner_task_log" ) -// TaskLogSink matches the task-broker / fleet-manager JSON shape for CloudWatch-backed live logs. -type TaskLogSink struct { - Type string `json:"type"` - CloudWatch *struct { - LogGroupName string `json:"log_group_name"` - LogStreamName string `json:"log_stream_name"` - Region string `json:"region,omitempty"` - } `json:"cloudwatch,omitempty"` -} +// TaskLogSink matches the fleet-manager JSON shape for CloudWatch-backed live logs. +type TaskLogSink = runnermodels.FleetTaskLog func mergeExecutionMetadata(meta core.MetadataWriter, patch map[string]any) error { if meta == nil { @@ -66,7 +59,16 @@ func mergeRunnerTaskLog(meta core.MetadataWriter, brokerTaskID string, sink *Tas return mergeExecutionMetadata(meta, patch) } -func taskLogFromBrokerTask(t *Task) *TaskLogSink { +// FinishFleetTask merges task log metadata and emits the terminal runner event. +func FinishFleetTask(meta core.MetadataWriter, state core.ExecutionStateContext, fleetTask *runnermodels.FleetTask, brokerTaskID string) error { + sink := taskLogFromFleetTask(fleetTask) + if err := mergeRunnerTaskLog(meta, brokerTaskID, sink); err != nil { + return err + } + return (&Runner{}).processTaskStatus(state, fleetTask) +} + +func taskLogFromFleetTask(t *runnermodels.FleetTask) *TaskLogSink { if t == nil { return nil } @@ -90,29 +92,3 @@ func taskLogFromBrokerTask(t *Task) *TaskLogSink { }, } } - -func taskLogFromRawWebhook(raw map[string]any) *TaskLogSink { - if raw == nil { - return nil - } - if v, ok := raw["task_log"]; ok && v != nil { - b, err := json.Marshal(v) - if err != nil { - return nil - } - var sink TaskLogSink - if err := json.Unmarshal(b, &sink); err != nil { - return nil - } - if strings.TrimSpace(sink.Type) != "" { - return &sink - } - } - g, _ := raw["cloudwatch_log_group"].(string) - s, _ := raw["cloudwatch_log_stream"].(string) - t := &Task{ - CloudWatchLogGroup: g, - CloudWatchLogStream: s, - } - return taskLogFromBrokerTask(t) -} diff --git a/pkg/components/runner/run_commands.go b/pkg/components/runner/run_commands.go index 0f0dac5cc0..ed1f60c66b 100644 --- a/pkg/components/runner/run_commands.go +++ b/pkg/components/runner/run_commands.go @@ -3,32 +3,40 @@ package runner import ( "bytes" "encoding/json" - "errors" "fmt" "net/http" "strings" - "time" "github.com/google/uuid" "github.com/superplanehq/superplane/pkg/configuration" "github.com/superplanehq/superplane/pkg/core" "github.com/superplanehq/superplane/pkg/registry" + "github.com/superplanehq/superplane/pkg/runners" + runnermodels "github.com/superplanehq/superplane/pkg/runners/models" ) const ( PassedOutputChannel = "passed" FailedOutputChannel = "failed" RunnerFinishedEventType = "runner.finished" - - pollInterval = 2 * time.Minute - hookActionPoll = "poll" ) func init() { registry.RegisterAction("runner", &Runner{}) } -type Runner struct{} +// fleetStore is the subset of runners.Store used by the runner component. +// Defined here so tests can provide a lightweight mock without importing pkg/runners. +type fleetStore interface { + FindFleet(id uuid.UUID) (*runnermodels.RunnerFleet, error) + EnqueueJob(fleetID, executionID uuid.UUID, spec runnermodels.JobSpec) (*runnermodels.RunnerTask, error) +} + +// Runner implements the runner component. +type Runner struct { + // store looks up fleets and persists runner tasks; nil uses the Postgres store. + store fleetStore +} var dockerExecutionOnly = []configuration.VisibilityCondition{ {Field: "execution_mode", Values: []string{ExecutionModeDocker}}, @@ -78,7 +86,7 @@ func (c *Runner) Documentation() string { - **Execution mode**: Host (default) or Docker. - **Container base image**: Choose a common public image, or **Other (custom image)** to enter any OCI reference. - **Custom container image**: Shown only for **Other**; use a normal reference (` + "`my.registry.example.com/org/repo:1.2.3`" + ` or ` + "`debian:bookworm-slim@sha256:…`" + `). Private registries require the runner to be configured with registry credentials. -- **Execution timeout**: Optional wall-clock limit in seconds (1–86400). Leave at **0** to use the broker default. +- **Execution timeout**: Optional wall-clock limit in seconds (1–86400). Leave at **0** to use the fleet default. - **Commands**: One or more shell commands, one per line. - **Environment variables**: Optional key/value pairs available during command execution. Values can be literal strings (with expression support) or organization secret keys. @@ -87,12 +95,18 @@ func (c *Runner) Documentation() string { - **Failed**: The commands finished with non-zero exit code. ## Structured result -If the completed broker task includes valid JSON in **result**, SuperPlane includes it on the ` + "`runner.finished`" + ` event payload next to **status** and **exit_code** (the exact shape depends on your runner / task implementation). +If the completed fleet task includes valid JSON in **result**, SuperPlane includes it on the ` + "`runner.finished`" + ` event payload next to **status** and **exit_code** (the exact shape depends on your runner / task implementation). ` } func (c *Runner) Configuration() []configuration.Field { return []configuration.Field{ + { + Name: "fleet_id", + Label: "Machine type", + Type: configuration.FieldTypeRunnerFleet, + Required: true, + }, { Name: "execution_mode", Label: "Execution mode", @@ -227,9 +241,9 @@ func (c *Runner) Configuration() []configuration.Field { Name: "execution_timeout_seconds", Label: "Execution timeout (seconds)", Type: configuration.FieldTypeNumber, - Required: false, // legacy nodes omit this; 0 means broker default + Required: false, // legacy nodes omit this; 0 means fleet default Default: 0, - Description: "Hard time limit for the whole task, including image pull and command run. Use 0 for the broker default.", + Description: "Hard time limit for the whole task, including image pull and command run. Use 0 for the fleet default.", TypeOptions: &configuration.TypeOptions{ Number: &configuration.NumberTypeOptions{ Min: intPtr(0), @@ -250,12 +264,7 @@ func (c *Runner) Setup(ctx core.SetupContext) error { return err } - if err := validateRunnerSpec(spec); err != nil { - return err - } - - _, err = ctx.Webhook.Setup() - return err + return validateRunnerSpec(spec) } func (c *Runner) ProcessQueueItem(ctx core.ProcessQueueContext) (*uuid.UUID, error) { @@ -277,136 +286,59 @@ func (c *Runner) Execute(ctx core.ExecutionContext) error { return err } - webhookURL, err := ctx.Webhook.Setup() - if err != nil { - return fmt.Errorf("webhook setup: %w", err) - } - cmds := normalizeCommands(spec.Commands) - broker, err := NewBrokerClient(ctx.HTTP) - if err != nil { - return fmt.Errorf("new broker client: %w", err) - } - mode := normalizeExecutionMode(spec.ExecutionMode) - params := CreateTaskParams{ - Commands: cmds, - WebhookURL: webhookURL, - Environment: environment, - ExecutionMode: mode, - DockerImage: resolvedDockerImageRef(spec), - TimeoutSeconds: spec.ExecutionTimeoutSeconds, - } - taskID, err := broker.CreateTask(params) + fleet, err := c.resolveFleet(spec) if err != nil { - return fmt.Errorf("create task: %w", err) + return fmt.Errorf("resolve fleet: %w", err) } - hookParams := map[string]any{"task_id": taskID} + store := c.effectiveStore() - err = ctx.ExecutionState.SetKV("task_id", taskID) + var timeoutPtr *int + if spec.ExecutionTimeoutSeconds > 0 { + t := spec.ExecutionTimeoutSeconds + timeoutPtr = &t + } + task, err := store.EnqueueJob(fleet.ID, ctx.ID, runnermodels.JobSpec{ + Commands: cmds, + Environment: toFleetEnv(environment), + ExecutionMode: mode, + DockerImage: resolvedDockerImageRef(spec), + ExecutionTimeoutSeconds: timeoutPtr, + }) if err != nil { - return fmt.Errorf("set task id in kv: %w", err) + return fmt.Errorf("enqueue runner job: %w", err) + } + taskID := task.ID.String() + if err := ctx.ExecutionState.SetKV("task_id", taskID); err != nil { + return fmt.Errorf("set task_id in kv: %w", err) } - if err := mergeRunnerBrokerTaskID(ctx.Metadata, taskID); err != nil { return fmt.Errorf("runner execution metadata: %w", err) } - - return ctx.Requests.ScheduleActionCall(hookActionPoll, hookParams, pollInterval) + return nil } func (c *Runner) Hooks() []core.Hook { - return []core.Hook{{Name: hookActionPoll, Type: core.HookTypeInternal}} + return nil } func (c *Runner) HandleHook(ctx core.ActionHookContext) error { - switch ctx.Name { - case hookActionPoll: - return c.handlePoll(ctx) - default: - return fmt.Errorf("unknown hook: %s", ctx.Name) - } -} - -func (c *Runner) handlePoll(ctx core.ActionHookContext) error { - if ctx.ExecutionState.IsFinished() { - return nil - } - - taskID, ok := ctx.Parameters["task_id"].(string) - if !ok { - return fmt.Errorf("task_id is missing from parameters") - } - - broker, err := NewBrokerClient(ctx.HTTP) - if err != nil { - return fmt.Errorf("new broker client: %w", err) - } - - task, err := broker.FetchTaskStatus(taskID) - if err != nil { - ctx.Logger.WithError(err).Warn("runner: broker poll failed, will retry") - return ctx.Requests.ScheduleActionCall(hookActionPoll, map[string]any{"task_id": taskID}, pollInterval) - } - - sink := taskLogFromBrokerTask(task) - if err := mergeRunnerTaskLog(ctx.Metadata, taskID, sink); err != nil { - ctx.Logger.WithError(err).Warn("runner: execution metadata update failed") - } - - if task.IsInTerminalState() { - return c.processTaskStatus(ctx.ExecutionState, task) - } - - return ctx.Requests.ScheduleActionCall(hookActionPoll, map[string]any{"task_id": taskID}, pollInterval) + return fmt.Errorf("unknown hook: %s", ctx.Name) } func (c *Runner) HandleWebhook(ctx core.WebhookRequestContext) (int, *core.WebhookResponseBody, error) { - broker, err := NewBrokerClient(ctx.HTTP) - if err != nil { - return http.StatusInternalServerError, nil, fmt.Errorf("new broker client: %w", err) - } - - var raw map[string]any - if err := json.Unmarshal(ctx.Body, &raw); err != nil { - raw = nil - } - - task, err := broker.ProcessWebhook(ctx.Body) - if err != nil { - return http.StatusInternalServerError, nil, fmt.Errorf("process webhook: %w", err) - } - - if !task.IsInTerminalState() { - ctx.Logger.Warn("runner: broker webhook received non-terminal state") - } - - executionCtx, err := ctx.FindExecutionByKV("task_id", task.TaskID) - if err != nil { - return http.StatusNotFound, nil, nil - } - - sink := taskLogFromBrokerTask(task) - if sink == nil { - sink = taskLogFromRawWebhook(raw) - } - if executionCtx.Metadata != nil { - if err := mergeRunnerTaskLog(executionCtx.Metadata, task.TaskID, sink); err != nil { - ctx.Logger.WithError(err).Warn("runner: execution metadata update failed") - } - } - - err = c.processTaskStatus(executionCtx.ExecutionState, task) - if err != nil { - return http.StatusInternalServerError, nil, fmt.Errorf("process task status: %w", err) - } + return http.StatusNotFound, nil, nil +} - return http.StatusOK, nil, nil +// ProcessFleetTask applies a terminal fleet task outcome from fleet-manager bridge complete. +func (c *Runner) ProcessFleetTask(state core.ExecutionStateContext, task *runnermodels.FleetTask) error { + return c.processTaskStatus(state, task) } -func (c *Runner) processTaskStatus(state core.ExecutionStateContext, task *Task) error { +func (c *Runner) processTaskStatus(state core.ExecutionStateContext, task *runnermodels.FleetTask) error { if state.IsFinished() { return nil } @@ -416,18 +348,18 @@ func (c *Runner) processTaskStatus(state core.ExecutionStateContext, task *Task) } channel := FailedOutputChannel - if strings.ToLower(strings.TrimSpace(task.Status)) == "succeeded" && task.effectiveExitCode() == 0 { + if strings.ToLower(strings.TrimSpace(task.Status)) == "succeeded" && task.EffectiveExitCode() == 0 { channel = PassedOutputChannel } - out := map[string]any{"status": task.Status, "exit_code": task.effectiveExitCode()} - if v := brokerResultAsAny(task.Result); v != nil { + out := map[string]any{"status": task.Status, "exit_code": task.EffectiveExitCode()} + if v := fleetResultAsAny(task.Result); v != nil { out["result"] = v } return state.Emit(channel, RunnerFinishedEventType, []any{out}) } -func brokerResultAsAny(raw json.RawMessage) any { +func fleetResultAsAny(raw json.RawMessage) any { b := bytes.TrimSpace(raw) if len(b) == 0 || !json.Valid(b) { return nil @@ -439,28 +371,42 @@ func brokerResultAsAny(raw json.RawMessage) any { return v } -func (c *Runner) Cancel(ctx core.ExecutionContext) error { - if ctx.ExecutionState.IsFinished() { - return nil - } +func (c *Runner) Cancel(ctx core.ExecutionContext) error { return nil } +func (c *Runner) Cleanup(ctx core.SetupContext) error { return nil } - taskID, err := ctx.ExecutionState.GetKV("task_id") - if err != nil { - if errors.Is(err, core.ErrExecutionKVNotFound) { - return nil - } - return fmt.Errorf("get task_id kv: %w", err) +func (c *Runner) resolveFleet(spec Spec) (*runnermodels.RunnerFleet, error) { + return c.resolveFleetByID(spec.FleetID) +} + +func (c *Runner) resolveFleetByID(fleetIDStr string) (*runnermodels.RunnerFleet, error) { + fleetIDStr = strings.TrimSpace(fleetIDStr) + if fleetIDStr == "" { + return nil, fmt.Errorf("fleet_id is required") } - broker, err := NewBrokerClient(ctx.HTTP) + fleetID, err := uuid.Parse(fleetIDStr) if err != nil { - return err + return nil, fmt.Errorf("invalid fleet_id %q: %w", fleetIDStr, err) } - if err := broker.CancelTask(taskID); err != nil { - return fmt.Errorf("cancel task: %w", err) + store := c.effectiveStore() + return store.FindFleet(fleetID) +} + +func (c *Runner) effectiveStore() fleetStore { + if c.store != nil { + return c.store } - return nil + return runners.NewPostgresStore() } -func (c *Runner) Cleanup(ctx core.SetupContext) error { return nil } +func toFleetEnv(env []BrokerEnvironmentVariable) []runnermodels.FleetEnvironmentVariable { + if len(env) == 0 { + return nil + } + out := make([]runnermodels.FleetEnvironmentVariable, len(env)) + for i, v := range env { + out[i] = runnermodels.FleetEnvironmentVariable{Name: v.Name, Value: v.Value} + } + return out +} diff --git a/pkg/components/runner/run_commands_test.go b/pkg/components/runner/run_commands_test.go index 51c581597b..6564fe9d63 100644 --- a/pkg/components/runner/run_commands_test.go +++ b/pkg/components/runner/run_commands_test.go @@ -2,20 +2,56 @@ package runner import ( "encoding/json" - "io" - "net/http" "reflect" - "strings" "testing" + "github.com/google/uuid" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/superplanehq/superplane/pkg/configuration" "github.com/superplanehq/superplane/pkg/core" + runnermodels "github.com/superplanehq/superplane/pkg/runners/models" "github.com/superplanehq/superplane/test/support/contexts" + "gorm.io/datatypes" ) +// mockStore is a test-only in-memory implementation of fleetStore. +type mockStore struct { + fleet *runnermodels.RunnerFleet + tasks []*runnermodels.RunnerTask +} + +func (m *mockStore) FindFleet(id uuid.UUID) (*runnermodels.RunnerFleet, error) { + return m.fleet, nil +} + +func (m *mockStore) EnqueueJob(fleetID, executionID uuid.UUID, spec runnermodels.JobSpec) (*runnermodels.RunnerTask, error) { + id := uuid.New() + t := &runnermodels.RunnerTask{ + ID: id, + FleetID: fleetID, + FleetTaskID: id.String(), + ExecutionID: executionID, + Status: runnermodels.TaskStatusQueued, + Spec: datatypes.NewJSONType(spec), + } + m.tasks = append(m.tasks, t) + return t, nil +} + +func testFleet() *runnermodels.RunnerFleet { + return &runnermodels.RunnerFleet{ + ID: uuid.New(), + Name: "fleet-1", + AuthToken: "token-1", + } +} + +func testRunner(fleet *runnermodels.RunnerFleet) *Runner { + return &Runner{store: &mockStore{fleet: fleet}} +} + func TestNormalizeCommands(t *testing.T) { t.Parallel() got := normalizeCommands("echo a\n\n echo b \n") @@ -112,23 +148,35 @@ func TestValidateEnvironment(t *testing.T) { } } -func TestRunnerExecuteSendsEnvironmentToBroker(t *testing.T) { - t.Setenv("TASK_BROKER_BASE_URL", "https://broker.example") - t.Setenv("TASK_BROKER_FLEET_ID", "fleet-1") - t.Setenv("TASK_BROKER_AUTH_TOKEN", "token-1") +func TestRunnerExecuteEnqueuesJob(t *testing.T) { + fleet := testFleet() + store := &mockStore{fleet: fleet} + state := &contexts.ExecutionStateContext{KVs: map[string]string{}} - httpContext := &contexts.HTTPContext{ - Responses: []*http.Response{ - {StatusCode: http.StatusCreated, Body: io.NopCloser(strings.NewReader(`{"id":"task-123"}`))}, - }, - } + err := (&Runner{store: store}).Execute(core.ExecutionContext{ + ID: uuid.New(), + Configuration: map[string]any{"fleet_id": fleet.ID.String(), "commands": "echo hello"}, + HTTP: &contexts.HTTPContext{}, + ExecutionState: state, + Requests: &contexts.RequestContext{}, + }) + require.NoError(t, err) + require.Len(t, store.tasks, 1) + assert.Equal(t, []string{"echo hello"}, store.tasks[0].Spec.Data().Commands) + assert.Equal(t, runnermodels.TaskStatusQueued, store.tasks[0].Status) + assert.NotEmpty(t, state.KVs["task_id"]) +} + +func TestRunnerExecuteSendsEnvironmentInJobSpec(t *testing.T) { + fleet := testFleet() + store := &mockStore{fleet: fleet} state := &contexts.ExecutionStateContext{KVs: map[string]string{}} - requests := &contexts.RequestContext{} - component := &Runner{} - err := component.Execute(core.ExecutionContext{ + err := (&Runner{store: store}).Execute(core.ExecutionContext{ + ID: uuid.New(), Configuration: map[string]any{ + "fleet_id": fleet.ID.String(), "commands": "echo hello", "environment": []map[string]any{ { @@ -146,69 +194,27 @@ func TestRunnerExecuteSendsEnvironmentToBroker(t *testing.T) { }, }, }, - HTTP: httpContext, + HTTP: &contexts.HTTPContext{}, Secrets: &contexts.SecretsContext{Values: map[string][]byte{"api/token": []byte("secret'value;$PATH")}}, - Webhook: &contexts.NodeWebhookContext{}, ExecutionState: state, - Requests: requests, + Requests: &contexts.RequestContext{}, }) require.NoError(t, err) - require.Len(t, httpContext.Requests, 1) - - body, err := io.ReadAll(httpContext.Requests[0].Body) - require.NoError(t, err) - - var req brokerCreateTaskRequest - require.NoError(t, json.Unmarshal(body, &req)) - - assert.Equal(t, "fleet-1", req.FleetID) - assert.Equal(t, []string{"echo hello"}, req.Commands) - assert.Equal(t, "host", req.ExecutionMode) - assert.Equal(t, []BrokerEnvironmentVariable{ + require.Len(t, store.tasks, 1) + assert.Equal(t, []runnermodels.FleetEnvironmentVariable{ {Name: "COMMIT_AUTHOR", Value: "alice@example.com"}, {Name: "API_TOKEN", Value: "secret'value;$PATH"}, - }, req.Environment) - assert.Equal(t, "task-123", state.KVs["task_id"]) - assert.Equal(t, hookActionPoll, requests.Action) -} - -func TestRunnerExecuteOmitsEmptyEnvironment(t *testing.T) { - t.Setenv("TASK_BROKER_BASE_URL", "https://broker.example") - t.Setenv("TASK_BROKER_FLEET_ID", "fleet-1") - t.Setenv("TASK_BROKER_AUTH_TOKEN", "token-1") - - httpContext := &contexts.HTTPContext{ - Responses: []*http.Response{ - {StatusCode: http.StatusCreated, Body: io.NopCloser(strings.NewReader(`{"id":"task-123"}`))}, - }, - } - - err := (&Runner{}).Execute(core.ExecutionContext{ - Configuration: map[string]any{"commands": "echo hello"}, - HTTP: httpContext, - Webhook: &contexts.NodeWebhookContext{}, - ExecutionState: &contexts.ExecutionStateContext{KVs: map[string]string{}}, - Requests: &contexts.RequestContext{}, - }) - - require.NoError(t, err) - require.Len(t, httpContext.Requests, 1) - - body, err := io.ReadAll(httpContext.Requests[0].Body) - require.NoError(t, err) - assert.NotContains(t, string(body), "environment") + }, store.tasks[0].Spec.Data().Environment) } func TestRunnerExecuteFailsWhenSecretCannotBeResolved(t *testing.T) { - t.Setenv("TASK_BROKER_BASE_URL", "https://broker.example") - t.Setenv("TASK_BROKER_FLEET_ID", "fleet-1") - t.Setenv("TASK_BROKER_AUTH_TOKEN", "token-1") - - httpContext := &contexts.HTTPContext{} + fleet := testFleet() - err := (&Runner{}).Execute(core.ExecutionContext{ + err := testRunner(fleet).Execute(core.ExecutionContext{ + ID: uuid.New(), Configuration: map[string]any{ + "fleet_id": fleet.ID.String(), "commands": "echo hello", "environment": []map[string]any{ { @@ -221,15 +227,13 @@ func TestRunnerExecuteFailsWhenSecretCannotBeResolved(t *testing.T) { }, }, }, - HTTP: httpContext, + HTTP: &contexts.HTTPContext{}, Secrets: &contexts.SecretsContext{Values: map[string][]byte{}}, - Webhook: &contexts.NodeWebhookContext{}, ExecutionState: &contexts.ExecutionStateContext{KVs: map[string]string{}}, Requests: &contexts.RequestContext{}, }) require.ErrorContains(t, err, "failed to resolve environment variable API_TOKEN") - assert.Empty(t, httpContext.Requests) } func secretRef(secret, key string) configuration.SecretKeyRef { @@ -241,7 +245,7 @@ func TestRunnerProcessTaskStatusIncludesResult(t *testing.T) { state := &contexts.ExecutionStateContext{KVs: map[string]string{}} exit := 0 - task := &Task{ + task := &runnermodels.FleetTask{ Status: "succeeded", ExitCode: &exit, Result: json.RawMessage(`{"items":[1,2],"ok":true}`), @@ -263,7 +267,7 @@ func TestRunnerProcessTaskStatusOmitsInvalidResult(t *testing.T) { state := &contexts.ExecutionStateContext{KVs: map[string]string{}} exit := 0 - task := &Task{ + task := &runnermodels.FleetTask{ Status: "succeeded", ExitCode: &exit, Result: json.RawMessage(`not-json`), @@ -280,89 +284,10 @@ func TestRunnerProcessTaskStatusCanceledUsesFailedChannel(t *testing.T) { state := &contexts.ExecutionStateContext{KVs: map[string]string{}} exit := 130 - task := &Task{ + task := &runnermodels.FleetTask{ Status: "canceled", ExitCode: &exit, } require.NoError(t, (&Runner{}).processTaskStatus(state, task)) require.Equal(t, FailedOutputChannel, state.Channel) } - -func TestBrokerCancelTaskSuccess(t *testing.T) { - t.Setenv("TASK_BROKER_BASE_URL", "https://broker.example") - t.Setenv("TASK_BROKER_FLEET_ID", "fleet-1") - t.Setenv("TASK_BROKER_AUTH_TOKEN", "token-1") - - httpContext := &contexts.HTTPContext{ - Responses: []*http.Response{ - {StatusCode: http.StatusOK, Body: io.NopCloser(strings.NewReader(`{"id":"fleet-1","state":"canceled","status":"canceled"}`))}, - }, - } - - broker, err := NewBrokerClient(httpContext) - require.NoError(t, err) - - require.NoError(t, broker.CancelTask("broker-task-99")) - require.Len(t, httpContext.Requests, 1) - assert.Equal(t, http.MethodPost, httpContext.Requests[0].Method) - assert.Equal(t, "/v1/tasks/broker-task-99/cancel", httpContext.Requests[0].URL.Path) - assert.Equal(t, "Bearer token-1", httpContext.Requests[0].Header.Get("Authorization")) -} - -func TestBrokerCancelTask404Noop(t *testing.T) { - t.Setenv("TASK_BROKER_BASE_URL", "https://broker.example") - t.Setenv("TASK_BROKER_FLEET_ID", "fleet-1") - t.Setenv("TASK_BROKER_AUTH_TOKEN", "token-1") - - httpContext := &contexts.HTTPContext{ - Responses: []*http.Response{ - {StatusCode: http.StatusNotFound, Body: io.NopCloser(strings.NewReader(`{"error":"task not found"}`))}, - }, - } - - broker, err := NewBrokerClient(httpContext) - require.NoError(t, err) - - require.NoError(t, broker.CancelTask("missing")) -} - -func TestBrokerCancelTask409RetriesThenSucceeds(t *testing.T) { - t.Setenv("TASK_BROKER_BASE_URL", "https://broker.example") - t.Setenv("TASK_BROKER_FLEET_ID", "fleet-1") - t.Setenv("TASK_BROKER_AUTH_TOKEN", "token-1") - - httpContext := &contexts.HTTPContext{ - Responses: []*http.Response{ - {StatusCode: http.StatusConflict, Body: io.NopCloser(strings.NewReader(`{"error":"task not yet assigned upstream"}`))}, - {StatusCode: http.StatusConflict, Body: io.NopCloser(strings.NewReader(`{"error":"task not yet assigned upstream"}`))}, - {StatusCode: http.StatusOK, Body: io.NopCloser(strings.NewReader(`{"id":"f1","state":"cancel_requested","status":"claimed"}`))}, - }, - } - - broker, err := NewBrokerClient(httpContext) - require.NoError(t, err) - - require.NoError(t, broker.CancelTask("t1")) - require.Len(t, httpContext.Requests, 3) -} - -func TestRunnerCancelCallsBroker(t *testing.T) { - t.Setenv("TASK_BROKER_BASE_URL", "https://broker.example") - t.Setenv("TASK_BROKER_FLEET_ID", "fleet-1") - t.Setenv("TASK_BROKER_AUTH_TOKEN", "token-1") - - httpContext := &contexts.HTTPContext{ - Responses: []*http.Response{ - {StatusCode: http.StatusOK, Body: io.NopCloser(strings.NewReader(`{"id":"up-1","state":"already_terminal","status":"succeeded"}`))}, - }, - } - - state := &contexts.ExecutionStateContext{KVs: map[string]string{"task_id": "broker-42"}} - err := (&Runner{}).Cancel(core.ExecutionContext{ - HTTP: httpContext, - ExecutionState: state, - }) - require.NoError(t, err) - require.Len(t, httpContext.Requests, 1) - assert.Equal(t, "/v1/tasks/broker-42/cancel", httpContext.Requests[0].URL.Path) -} diff --git a/pkg/components/runner/spec.go b/pkg/components/runner/spec.go index bf69b470da..f9f4e662ad 100644 --- a/pkg/components/runner/spec.go +++ b/pkg/components/runner/spec.go @@ -37,7 +37,10 @@ type Spec struct { ExecutionMode string `mapstructure:"execution_mode"` DockerImagePreset string `mapstructure:"docker_image_preset"` DockerImage string `mapstructure:"docker_image"` - ExecutionTimeoutSeconds int `mapstructure:"execution_timeout_seconds"` // 0 = omit (broker default) + ExecutionTimeoutSeconds int `mapstructure:"execution_timeout_seconds"` // 0 = omit (fleet default) + + // FleetID selects a registered runner fleet by UUID. + FleetID string `mapstructure:"fleet_id"` } func decodeRunnerSpec(raw any) (Spec, error) { @@ -116,5 +119,9 @@ func validateRunnerSpec(spec Spec) error { } } + if strings.TrimSpace(spec.FleetID) == "" { + return fmt.Errorf("fleet_id is required") + } + return nil } diff --git a/pkg/components/runner/spec_test.go b/pkg/components/runner/spec_test.go index e8b441b887..e1661de802 100644 --- a/pkg/components/runner/spec_test.go +++ b/pkg/components/runner/spec_test.go @@ -30,19 +30,22 @@ func TestNormalizeExecutionMode(t *testing.T) { } } +const testFleetID = "550e8400-e29b-41d4-a716-446655440000" + func TestValidateRunnerSpec(t *testing.T) { t.Parallel() - if err := validateRunnerSpec(Spec{Commands: "echo hi", ExecutionMode: ExecutionModeHost}); err != nil { + if err := validateRunnerSpec(Spec{Commands: "echo hi", ExecutionMode: ExecutionModeHost, FleetID: testFleetID}); err != nil { t.Fatalf("valid host spec: %v", err) } - // Legacy persisted config: commands only (no execution_mode / execution_timeout_seconds keys). - if err := validateRunnerSpec(Spec{Commands: "echo hi"}); err != nil { - t.Fatalf("valid legacy host spec (empty execution_mode): %v", err) + // Persisted config: commands only (no execution_mode / execution_timeout_seconds keys). + if err := validateRunnerSpec(Spec{Commands: "echo hi", FleetID: testFleetID}); err != nil { + t.Fatalf("valid host spec (empty execution_mode): %v", err) } if err := validateRunnerSpec(Spec{ Commands: "echo hi", ExecutionMode: ExecutionModeDocker, DockerImagePreset: "debian:bookworm-slim", + FleetID: testFleetID, }); err != nil { t.Fatalf("valid docker quick pick: %v", err) } @@ -51,6 +54,7 @@ func TestValidateRunnerSpec(t *testing.T) { ExecutionMode: ExecutionModeDocker, DockerImagePreset: DockerImagePresetCustom, DockerImage: "my.registry.example.com/app:1.0.0", + FleetID: testFleetID, }); err != nil { t.Fatalf("valid docker custom: %v", err) } @@ -59,21 +63,26 @@ func TestValidateRunnerSpec(t *testing.T) { Commands: "echo hi", ExecutionMode: ExecutionModeDocker, DockerImage: "debian:bookworm-slim", + FleetID: testFleetID, }); err != nil { - t.Fatalf("valid docker legacy: %v", err) + t.Fatalf("valid docker without preset: %v", err) } if err := validateRunnerSpec(Spec{ Commands: "echo hi", ExecutionMode: ExecutionModeHost, ExecutionTimeoutSeconds: 120, + FleetID: testFleetID, }); err != nil { t.Fatalf("valid timeout: %v", err) } - if err := validateRunnerSpec(Spec{Commands: "", ExecutionMode: ExecutionModeHost}); err == nil { + if err := validateRunnerSpec(Spec{Commands: "", ExecutionMode: ExecutionModeHost, FleetID: testFleetID}); err == nil { t.Fatal("expected error for empty commands") } - if err := validateRunnerSpec(Spec{Commands: "echo hi", ExecutionMode: ExecutionModeDocker}); err == nil { + if err := validateRunnerSpec(Spec{Commands: "echo hi", ExecutionMode: ExecutionModeHost}); err == nil { + t.Fatal("expected error for missing fleet_id") + } + if err := validateRunnerSpec(Spec{Commands: "echo hi", ExecutionMode: ExecutionModeDocker, FleetID: testFleetID}); err == nil { t.Fatal("expected error for docker without image") } if err := validateRunnerSpec(Spec{ @@ -81,6 +90,7 @@ func TestValidateRunnerSpec(t *testing.T) { ExecutionMode: ExecutionModeDocker, DockerImagePreset: DockerImagePresetCustom, DockerImage: "", + FleetID: testFleetID, }); err == nil { t.Fatal("expected error for docker custom without image") } @@ -90,6 +100,7 @@ func TestValidateRunnerSpec(t *testing.T) { ExecutionMode: ExecutionModeDocker, DockerImagePreset: DockerImagePresetCustom, DockerImage: longImage, + FleetID: testFleetID, }); err == nil { t.Fatal("expected error for docker image reference that is too long") } @@ -97,6 +108,7 @@ func TestValidateRunnerSpec(t *testing.T) { Commands: "echo hi", ExecutionMode: ExecutionModeHost, ExecutionTimeoutSeconds: -1, + FleetID: testFleetID, }); err == nil { t.Fatal("expected error for negative timeout") } @@ -104,6 +116,7 @@ func TestValidateRunnerSpec(t *testing.T) { Commands: "echo hi", ExecutionMode: ExecutionModeHost, ExecutionTimeoutSeconds: maxExecutionTimeoutSecondsRequest + 1, + FleetID: testFleetID, }); err == nil { t.Fatal("expected error for timeout above max") } @@ -113,6 +126,7 @@ func TestDecodeRunnerSpec_WeakTypes(t *testing.T) { t.Parallel() raw := map[string]any{ "commands": "echo x", + "fleet_id": testFleetID, "execution_mode": "docker", "docker_image_preset": "debian:bookworm-slim", "docker_image": " alpine:latest ", @@ -138,6 +152,7 @@ func TestValidateConfigurationRunnerLegacyPreExecutionFields(t *testing.T) { r := &Runner{} legacy := map[string]any{ "commands": "echo hi", + "fleet_id": testFleetID, } if err := configuration.ValidateConfiguration(r.Configuration(), legacy); err != nil { t.Fatalf("ValidateConfiguration legacy runner: %v", err) @@ -163,6 +178,7 @@ func TestValidateConfigurationRunnerLegacyDockerImageOnly(t *testing.T) { err := configuration.ValidateConfiguration(r.Configuration(), map[string]any{ "execution_mode": ExecutionModeDocker, "commands": "echo hi", + "fleet_id": testFleetID, "execution_timeout_seconds": 0, "docker_image": "debian:bookworm-slim", }) diff --git a/pkg/configuration/field.go b/pkg/configuration/field.go index fc69665b1b..4a7565b47d 100644 --- a/pkg/configuration/field.go +++ b/pkg/configuration/field.go @@ -33,6 +33,7 @@ const ( FieldTypeAnyPredicateList = "any-predicate-list" FieldTypeGitRef = "git-ref" FieldTypeSecretKey = "secret-key" + FieldTypeRunnerFleet = "runner-fleet" ) type Field struct { diff --git a/pkg/features/features.go b/pkg/features/features.go index cc9e2bb88a..9d5eace7fb 100644 --- a/pkg/features/features.go +++ b/pkg/features/features.go @@ -19,8 +19,11 @@ const FeatureClaudeManagedAgents = "claude_managed_agents" // is released or enabled for the organization. const FeatureDashboards = "dashboards" +// FeatureRunner gates the Runner component and runner fleet APIs until generally available. +const FeatureRunner = "runner" + var registry = []Feature{ - {ID: "runner", Label: "Runners", Description: "Sandboxed Runners"}, + {ID: FeatureRunner, Label: "Runners", Description: "Sandboxed Runners"}, {ID: FeatureClaudeManagedAgents, Label: "Claude Managed Agents", Description: "Chat with a Claude-powered agent against the canvas"}, {ID: FeatureDashboards, Label: "Dashboards", Description: "Markdown dashboard panels on canvases"}, } diff --git a/pkg/models/canvas_node_execution.go b/pkg/models/canvas_node_execution.go index 56b9e1282c..e13bb96c86 100644 --- a/pkg/models/canvas_node_execution.go +++ b/pkg/models/canvas_node_execution.go @@ -319,6 +319,14 @@ func FindNodeExecution(workflowID, id uuid.UUID) (*CanvasNodeExecution, error) { return FindNodeExecutionInTransaction(database.Conn(), workflowID, id) } +func FindNodeExecutionByID(tx *gorm.DB, id uuid.UUID) (*CanvasNodeExecution, error) { + var execution CanvasNodeExecution + if err := tx.Where("id = ?", id).First(&execution).Error; err != nil { + return nil, err + } + return &execution, nil +} + func FindNodeExecutionInTransaction(tx *gorm.DB, workflowID, id uuid.UUID) (*CanvasNodeExecution, error) { var execution CanvasNodeExecution err := tx. diff --git a/pkg/public/admin_test.go b/pkg/public/admin_test.go index 41063f75ac..f78ef453ed 100644 --- a/pkg/public/admin_test.go +++ b/pkg/public/admin_test.go @@ -9,6 +9,7 @@ import ( "testing" "time" + "github.com/google/uuid" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/superplanehq/superplane/pkg/database" @@ -867,3 +868,22 @@ func TestAdminDisableOrgExperimentalFeature(t *testing.T) { assert.Equal(t, http.StatusNotFound, response.Code) }) } + +func TestAdminRegisterRunnerFleetRequiresAdmin(t *testing.T) { + r := support.Setup(t) + defer r.Close() + server, _, token := setupTestServer(r, t) + + body, err := json.Marshal(map[string]any{ + "name": "x-" + uuid.New().String(), + }) + require.NoError(t, err) + response := execRequest(server, requestParams{ + method: http.MethodPost, + path: "/admin/api/runner/fleets", + authCookie: token, + body: body, + contentType: "application/json", + }) + assert.Equal(t, http.StatusNotFound, response.Code) +} diff --git a/pkg/public/runner_live_log_stream.go b/pkg/public/runner_live_log_stream.go deleted file mode 100644 index 8ddc1f70ef..0000000000 --- a/pkg/public/runner_live_log_stream.go +++ /dev/null @@ -1,160 +0,0 @@ -package public - -import ( - "fmt" - "io" - "net/http" - "net/url" - "os" - "strings" - - "github.com/google/uuid" - "github.com/gorilla/mux" - runneraction "github.com/superplanehq/superplane/pkg/components/runner" - "github.com/superplanehq/superplane/pkg/database" - "github.com/superplanehq/superplane/pkg/models" - "github.com/superplanehq/superplane/pkg/public/middleware" -) - -func (s *Server) handleRunnerLiveLogStream(w http.ResponseWriter, r *http.Request) { - user, ok := middleware.GetUserFromContext(r.Context()) - if !ok { - http.Error(w, "Unauthorized", http.StatusUnauthorized) - return - } - - allowed, err := s.authService.CheckOrganizationPermission( - user.ID.String(), - user.OrganizationID.String(), - "canvases", - "read", - ) - if err != nil { - http.Error(w, "Authorization check failed", http.StatusInternalServerError) - return - } - if !allowed { - http.Error(w, "Forbidden", http.StatusForbidden) - return - } - - vars := mux.Vars(r) - canvasID, err := uuid.Parse(strings.TrimSpace(vars["canvas_id"])) - if err != nil { - http.Error(w, "Invalid canvas id", http.StatusBadRequest) - return - } - executionID, err := uuid.Parse(strings.TrimSpace(vars["execution_id"])) - if err != nil { - http.Error(w, "Invalid execution id", http.StatusBadRequest) - return - } - - if _, err := models.FindCanvas(user.OrganizationID, canvasID); err != nil { - http.Error(w, "Canvas not found", http.StatusNotFound) - return - } - - execution, err := models.FindNodeExecution(canvasID, executionID) - if err != nil { - http.Error(w, "Execution not found", http.StatusNotFound) - return - } - - node, err := models.FindCanvasNode(database.Conn(), canvasID, execution.NodeID) - if err != nil { - http.Error(w, "Node not found", http.StatusNotFound) - return - } - - ref := node.Ref.Data() - if ref.Component == nil || ref.Component.Name != "runner" { - http.Error(w, "Live logs are only available for Runner components", http.StatusBadRequest) - return - } - - meta := execution.Metadata.Data() - var brokerTaskID string - if v, ok := meta[runneraction.ExecutionMetadataBrokerTaskID]; ok && v != nil { - if s, ok2 := v.(string); ok2 { - brokerTaskID = strings.TrimSpace(s) - } else { - brokerTaskID = strings.TrimSpace(fmt.Sprint(v)) - } - } - if brokerTaskID == "" { - http.Error( - w, - "Logs are not available for this execution yet. Check again shortly.", - http.StatusNotFound, - ) - return - } - - base := strings.TrimRight(strings.TrimSpace(os.Getenv("TASK_BROKER_BASE_URL")), "/") - token := strings.TrimSpace(os.Getenv("TASK_BROKER_AUTH_TOKEN")) - if base == "" || token == "" { - http.Error(w, "Live logs are not configured on this installation", http.StatusServiceUnavailable) - return - } - upstream := base + "/v1/tasks/" + url.PathEscape(brokerTaskID) + "/live-logs" - - req, err := http.NewRequestWithContext(r.Context(), http.MethodGet, upstream, nil) - if err != nil { - http.Error(w, "Bad gateway", http.StatusBadGateway) - return - } - req.Header.Set("Authorization", "Bearer "+token) - req.Header.Set("Accept", "application/x-ndjson") - // Avoid upstream gzip; it adds latency and can buffer small NDJSON chunks. - req.Header.Set("Accept-Encoding", "identity") - - resp, err := http.DefaultClient.Do(req) - if err != nil { - http.Error(w, err.Error(), http.StatusBadGateway) - return - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - if ct := resp.Header.Get("Content-Type"); ct != "" { - w.Header().Set("Content-Type", ct) - } else { - w.Header().Set("Content-Type", "text/plain; charset=utf-8") - } - w.WriteHeader(resp.StatusCode) - _, _ = io.Copy(w, resp.Body) - return - } - - if ct := resp.Header.Get("Content-Type"); ct != "" { - w.Header().Set("Content-Type", ct) - } else { - w.Header().Set("Content-Type", "application/x-ndjson; charset=utf-8") - } - w.Header().Set("Cache-Control", "no-store") - w.WriteHeader(http.StatusOK) - flusher, flusherOK := w.(http.Flusher) - if flusherOK { - flusher.Flush() - } - - buf := make([]byte, 16*1024) - for { - n, readErr := resp.Body.Read(buf) - if n > 0 { - if _, writeErr := w.Write(buf[:n]); writeErr != nil { - return - } - if flusherOK { - flusher.Flush() - } - } - if readErr != nil { - if readErr == io.EOF { - break - } - return - } - } -} diff --git a/pkg/public/runner_live_log_stream_test.go b/pkg/public/runner_live_log_stream_test.go deleted file mode 100644 index 9cd09820d0..0000000000 --- a/pkg/public/runner_live_log_stream_test.go +++ /dev/null @@ -1,275 +0,0 @@ -package public - -import ( - "fmt" - "net/http" - "net/http/httptest" - "testing" - "time" - - "github.com/google/uuid" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - runneraction "github.com/superplanehq/superplane/pkg/components/runner" - "github.com/superplanehq/superplane/pkg/database" - "github.com/superplanehq/superplane/pkg/jwt" - "github.com/superplanehq/superplane/pkg/models" - "github.com/superplanehq/superplane/test/support" - "gorm.io/datatypes" - "gorm.io/gorm" -) - -func mustRunnerLiveLogServer(t *testing.T, r *support.ResourceRegistry) (*Server, *jwt.Signer) { - t.Helper() - signer := jwt.NewSigner("test") - server, err := NewServer( - r.Encryptor, - r.Registry, - signer, - support.NewOIDCProvider(), - "", - "http://localhost", - "http://localhost", - "test", - "/app/templates", - r.AuthService, - nil, - false, - ) - require.NoError(t, err) - require.NoError(t, server.RegisterGRPCGateway("localhost:50051")) - return server, signer -} - -func runnerLiveLogGET( - t *testing.T, - server *Server, - signer *jwt.Signer, - r *support.ResourceRegistry, - canvasID, executionID string, -) *httptest.ResponseRecorder { - t.Helper() - req := httptest.NewRequest( - http.MethodGet, - fmt.Sprintf("/api/v1/canvases/%s/node-executions/%s/runner-live-logs", canvasID, executionID), - nil, - ) - req.Header.Set("x-organization-id", r.Organization.ID.String()) - token, err := signer.Generate(r.Account.ID.String(), time.Hour) - require.NoError(t, err) - req.AddCookie(&http.Cookie{Name: "account_token", Value: token}) - rec := httptest.NewRecorder() - server.Router.ServeHTTP(rec, req) - return rec -} - -func createExecutionForCanvasRun( - t *testing.T, - run *models.CanvasRun, - rootEventID uuid.UUID, - nodeID string, -) *models.CanvasNodeExecution { - t.Helper() - now := time.Now() - execution := models.CanvasNodeExecution{ - ID: uuid.New(), - WorkflowID: run.WorkflowID, - NodeID: nodeID, - RootEventID: rootEventID, - RunID: run.ID, - EventID: rootEventID, - State: models.CanvasNodeExecutionStatePending, - Configuration: datatypes.NewJSONType(map[string]any{}), - CreatedAt: &now, - UpdatedAt: &now, - } - require.NoError(t, database.Conn().Create(&execution).Error) - return &execution -} - -func createCanvasWithComponentExecution( - t *testing.T, - r *support.ResourceRegistry, - componentName string, - nodeID string, - metadata map[string]any, -) (canvasID uuid.UUID, executionID uuid.UUID) { - t.Helper() - canvas, _ := support.CreateCanvas(t, r.Organization.ID, r.User, []models.CanvasNode{ - { - NodeID: "trigger-1", - Type: models.NodeTypeTrigger, - Ref: datatypes.NewJSONType(models.NodeRef{ - Trigger: &models.TriggerRef{Name: "start"}, - }), - }, - { - NodeID: nodeID, - Type: models.NodeTypeComponent, - Name: componentName, - Ref: datatypes.NewJSONType(models.NodeRef{ - Component: &models.ComponentRef{Name: componentName}, - }), - }, - }, nil) - - rootEvent := support.EmitCanvasEventForNode(t, canvas.ID, "trigger-1", "default", nil) - - var run *models.CanvasRun - require.NoError(t, database.Conn().Transaction(func(tx *gorm.DB) error { - var err error - run, err = models.FindOrCreateCanvasRunForRootEventInTransaction(tx, rootEvent) - if err != nil { - return err - } - return rootEvent.RoutedInTransaction(tx) - })) - - exec := createExecutionForCanvasRun(t, run, rootEvent.ID, nodeID) - if len(metadata) > 0 { - exec.Metadata = datatypes.NewJSONType(metadata) - require.NoError(t, database.Conn().Save(exec).Error) - } - return canvas.ID, exec.ID -} - -func TestHandleRunnerLiveLogStream(t *testing.T) { - r := support.Setup(t) - defer r.Close() - - server, signer := mustRunnerLiveLogServer(t, r) - - t.Run("no session cookie", func(t *testing.T) { - req := httptest.NewRequest(http.MethodGet, - fmt.Sprintf("/api/v1/canvases/%s/node-executions/%s/runner-live-logs", uuid.New(), uuid.New()), - nil, - ) - req.Header.Set("x-organization-id", r.Organization.ID.String()) - rec := httptest.NewRecorder() - server.Router.ServeHTTP(rec, req) - assert.Equal(t, http.StatusUnauthorized, rec.Code) - }) - - t.Run("invalid canvas id", func(t *testing.T) { - rec := runnerLiveLogGET(t, server, signer, r, "not-a-uuid", uuid.New().String()) - assert.Equal(t, http.StatusBadRequest, rec.Code) - assert.Contains(t, rec.Body.String(), "Invalid canvas id") - }) - - t.Run("invalid execution id", func(t *testing.T) { - canvasID, _ := createCanvasWithComponentExecution(t, r, "runner", "runner-1", nil) - rec := runnerLiveLogGET(t, server, signer, r, canvasID.String(), "bad-id") - assert.Equal(t, http.StatusBadRequest, rec.Code) - assert.Contains(t, rec.Body.String(), "Invalid execution id") - }) - - t.Run("canvas not found", func(t *testing.T) { - rec := runnerLiveLogGET(t, server, signer, r, uuid.New().String(), uuid.New().String()) - assert.Equal(t, http.StatusNotFound, rec.Code) - assert.Contains(t, rec.Body.String(), "Canvas not found") - }) - - t.Run("execution not found", func(t *testing.T) { - canvasID, _ := createCanvasWithComponentExecution(t, r, "runner", "runner-1", nil) - rec := runnerLiveLogGET(t, server, signer, r, canvasID.String(), uuid.New().String()) - assert.Equal(t, http.StatusNotFound, rec.Code) - assert.Contains(t, rec.Body.String(), "Execution not found") - }) - - t.Run("non-runner node", func(t *testing.T) { - canvasID, execID := createCanvasWithComponentExecution(t, r, "noop", "noop-1", map[string]any{ - runneraction.ExecutionMetadataBrokerTaskID: "tb-1", - }) - rec := runnerLiveLogGET(t, server, signer, r, canvasID.String(), execID.String()) - assert.Equal(t, http.StatusBadRequest, rec.Code) - assert.Contains(t, rec.Body.String(), "Runner components") - }) - - t.Run("broker task id missing", func(t *testing.T) { - canvasID, execID := createCanvasWithComponentExecution(t, r, "runner", "runner-1", map[string]any{}) - rec := runnerLiveLogGET(t, server, signer, r, canvasID.String(), execID.String()) - assert.Equal(t, http.StatusNotFound, rec.Code) - assert.Contains(t, rec.Body.String(), "not available for this execution") - }) - - t.Run("broker task id from non-string metadata", func(t *testing.T) { - canvasID, execID := createCanvasWithComponentExecution(t, r, "runner", "runner-1", map[string]any{ - runneraction.ExecutionMetadataBrokerTaskID: 99, - }) - t.Setenv("TASK_BROKER_BASE_URL", "http://127.0.0.1:1") - t.Setenv("TASK_BROKER_AUTH_TOKEN", "token") - rec := runnerLiveLogGET(t, server, signer, r, canvasID.String(), execID.String()) - assert.Equal(t, http.StatusBadGateway, rec.Code) - }) - - t.Run("task broker not configured", func(t *testing.T) { - t.Setenv("TASK_BROKER_BASE_URL", "") - t.Setenv("TASK_BROKER_AUTH_TOKEN", "") - canvasID, execID := createCanvasWithComponentExecution(t, r, "runner", "runner-1", map[string]any{ - runneraction.ExecutionMetadataBrokerTaskID: "tb-x", - }) - rec := runnerLiveLogGET(t, server, signer, r, canvasID.String(), execID.String()) - assert.Equal(t, http.StatusServiceUnavailable, rec.Code) - assert.Contains(t, rec.Body.String(), "not configured") - }) - - t.Run("upstream error response is proxied", func(t *testing.T) { - upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { - w.Header().Set("Content-Type", "text/plain") - w.WriteHeader(http.StatusTeapot) - _, _ = w.Write([]byte("upstream")) - })) - t.Cleanup(upstream.Close) - t.Setenv("TASK_BROKER_BASE_URL", upstream.URL) - t.Setenv("TASK_BROKER_AUTH_TOKEN", "secret") - - canvasID, execID := createCanvasWithComponentExecution(t, r, "runner", "runner-1", map[string]any{ - runneraction.ExecutionMetadataBrokerTaskID: "task-418", - }) - rec := runnerLiveLogGET(t, server, signer, r, canvasID.String(), execID.String()) - assert.Equal(t, http.StatusTeapot, rec.Code) - assert.Contains(t, rec.Body.String(), "upstream") - }) - - t.Run("upstream success streams body", func(t *testing.T) { - upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - assert.Equal(t, "/v1/tasks/task-ok/live-logs", r.URL.Path) - assert.Equal(t, "Bearer broker-token", r.Header.Get("Authorization")) - assert.Equal(t, "application/x-ndjson", r.Header.Get("Accept")) - assert.Equal(t, "identity", r.Header.Get("Accept-Encoding")) - w.Header().Set("Content-Type", "application/x-ndjson") - w.WriteHeader(http.StatusOK) - _, _ = w.Write([]byte(`{"type":"line","text":"log"}` + "\n")) - })) - t.Cleanup(upstream.Close) - t.Setenv("TASK_BROKER_BASE_URL", upstream.URL) - t.Setenv("TASK_BROKER_AUTH_TOKEN", "broker-token") - - canvasID, execID := createCanvasWithComponentExecution(t, r, "runner", "runner-1", map[string]any{ - runneraction.ExecutionMetadataBrokerTaskID: "task-ok", - }) - rec := runnerLiveLogGET(t, server, signer, r, canvasID.String(), execID.String()) - assert.Equal(t, http.StatusOK, rec.Code) - assert.Equal(t, "no-store", rec.Header().Get("Cache-Control")) - assert.Contains(t, rec.Body.String(), "log") - }) - - t.Run("upstream success forwards upstream content-type", func(t *testing.T) { - // httptest.Server sets Content-Type to text/plain when the handler does not set one. - upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { - w.WriteHeader(http.StatusOK) - _, _ = w.Write([]byte("x")) - })) - t.Cleanup(upstream.Close) - t.Setenv("TASK_BROKER_BASE_URL", upstream.URL) - t.Setenv("TASK_BROKER_AUTH_TOKEN", "t") - - canvasID, execID := createCanvasWithComponentExecution(t, r, "runner", "runner-1", map[string]any{ - runneraction.ExecutionMetadataBrokerTaskID: "task-ct", - }) - rec := runnerLiveLogGET(t, server, signer, r, canvasID.String(), execID.String()) - assert.Equal(t, http.StatusOK, rec.Code) - assert.Equal(t, "x", rec.Body.String()) - assert.NotEmpty(t, rec.Header().Get("Content-Type")) - }) -} diff --git a/pkg/public/server.go b/pkg/public/server.go index bf9c98c910..49c042e6e2 100644 --- a/pkg/public/server.go +++ b/pkg/public/server.go @@ -28,6 +28,7 @@ import ( "github.com/superplanehq/superplane/pkg/jwt" "github.com/superplanehq/superplane/pkg/logging" "github.com/superplanehq/superplane/pkg/registry" + runnerapi "github.com/superplanehq/superplane/pkg/runners/api" "github.com/superplanehq/superplane/pkg/workers/contexts" "go.opentelemetry.io/contrib/instrumentation/github.com/gorilla/mux/otelmux" "go.opentelemetry.io/otel/attribute" @@ -90,6 +91,7 @@ type Server struct { authHandler *authentication.Handler isDev bool usageService usage.Service + runnerAPI *runnerapi.Handler } // WebsocketHub returns the websocket hub for this server @@ -176,6 +178,11 @@ func NewServer( oidcProvider: oidcProvider, registry: registry, authService: authorizationService, + runnerAPI: runnerapi.New(runnerapi.Config{ + BaseURL: baseURL, + Registry: registry, + AuthService: authorizationService, + }), upgrader: &websocket.Upgrader{ CheckOrigin: makeOriginChecker(getAllowedOrigins()), ReadBufferSize: 1024, @@ -341,7 +348,12 @@ func (s *Server) RegisterGRPCGateway(grpcServerAddr string) error { s.Router.Handle( "/api/v1/canvases/{canvas_id}/node-executions/{execution_id}/runner-live-logs", - middleware.OrganizationAuthMiddleware(s.jwt)(http.HandlerFunc(s.handleRunnerLiveLogStream)), + middleware.OrganizationAuthMiddleware(s.jwt)(http.HandlerFunc(s.runnerAPI.LiveLogStream)), + ).Methods("GET") + + s.Router.Handle( + "/api/v1/runner-fleets", + middleware.OrganizationAuthMiddleware(s.jwt)(http.HandlerFunc(s.runnerAPI.OrgListFleets)), ).Methods("GET") // Protect the gRPC gateway routes with organization authentication @@ -566,6 +578,13 @@ func (s *Server) InitRouter(additionalMiddlewares ...mux.MiddlewareFunc) { publicRoute.HandleFunc("/.well-known/openid-configuration", s.handleOIDCConfiguration).Methods("GET") publicRoute.HandleFunc("/.well-known/jwks.json", s.handleOIDCJWKS).Methods("GET") + publicRoute. + HandleFunc(s.BasePath+"/runner-fleets/sync", s.runnerAPI.FleetSync). + Methods("POST") + publicRoute. + HandleFunc(s.BasePath+"/runner-fleets/tasks/{taskId}/complete", s.runnerAPI.FleetTaskComplete). + Methods("POST") + // // Webhook endpoints for triggers // @@ -607,6 +626,9 @@ func (s *Server) InitRouter(additionalMiddlewares ...mux.MiddlewareFunc) { adminRoute.HandleFunc("/impersonate/status", s.impersonationStatus).Methods("GET") adminRoute.HandleFunc("/accounts/{accountId}/promote", s.promoteAdmin).Methods("POST") adminRoute.HandleFunc("/accounts/{accountId}/demote", s.demoteAdmin).Methods("POST") + adminRoute.HandleFunc("/runner/fleets", s.runnerAPI.AdminRegisterFleet).Methods("POST") + adminRoute.HandleFunc("/runner/fleets", s.runnerAPI.AdminListFleets).Methods("GET") + adminRoute.HandleFunc("/runner/fleets/{fleetId}", s.runnerAPI.AdminDeleteFleet).Methods("DELETE") // Apply additional middlewares for _, middleware := range additionalMiddlewares { diff --git a/pkg/runners/api/admin.go b/pkg/runners/api/admin.go new file mode 100644 index 0000000000..033e3387af --- /dev/null +++ b/pkg/runners/api/admin.go @@ -0,0 +1,106 @@ +package api + +import ( + "encoding/json" + "errors" + "net/http" + "strings" + + "github.com/google/uuid" + "github.com/gorilla/mux" + log "github.com/sirupsen/logrus" + runnermodels "github.com/superplanehq/superplane/pkg/runners/models" + "gorm.io/gorm" +) + +type registerFleetRequest struct { + Name string `json:"name"` +} + +type fleetResponse struct { + ID string `json:"id"` + Name string `json:"name"` + CreatedAt string `json:"created_at,omitempty"` +} + +func fleetToResponse(f runnermodels.RunnerFleet) fleetResponse { + r := fleetResponse{ + ID: f.ID.String(), + Name: f.Name, + } + if f.CreatedAt != nil { + r.CreatedAt = f.CreatedAt.Format("2006-01-02T15:04:05Z07:00") + } + return r +} + +// AdminRegisterFleet registers a new runner fleet (bridge mode; fleet-manager pulls jobs from SuperPlane). +func (h *Handler) AdminRegisterFleet(w http.ResponseWriter, r *http.Request) { + var req registerFleetRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + http.Error(w, "Invalid request body", http.StatusBadRequest) + return + } + + if strings.TrimSpace(req.Name) == "" { + http.Error(w, "name is required", http.StatusBadRequest) + return + } + + authToken := uuid.New().String() + + fleet, err := h.store().CreateFleet(req.Name, authToken) + if err != nil { + log.Errorf("admin: failed to register runner fleet: %v", err) + http.Error(w, "Failed to register fleet", http.StatusInternalServerError) + return + } + + type fleetCreatedResponse struct { + fleetResponse + AuthToken string `json:"auth_token"` + } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusCreated) + json.NewEncoder(w).Encode(fleetCreatedResponse{fleetToResponse(*fleet), fleet.AuthToken}) +} + +// AdminListFleets lists all registered runner fleets. +func (h *Handler) AdminListFleets(w http.ResponseWriter, r *http.Request) { + fleets, err := h.store().ListFleets() + if err != nil { + log.Errorf("admin: failed to list runner fleets: %v", err) + http.Error(w, "Failed to list fleets", http.StatusInternalServerError) + return + } + + items := make([]fleetResponse, 0, len(fleets)) + for _, f := range fleets { + items = append(items, fleetToResponse(f)) + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(items) +} + +// AdminDeleteFleet deletes a registered runner fleet by ID. +func (h *Handler) AdminDeleteFleet(w http.ResponseWriter, r *http.Request) { + fleetIDStr := mux.Vars(r)["fleetId"] + fleetID, err := uuid.Parse(fleetIDStr) + if err != nil { + http.Error(w, "Fleet not found", http.StatusNotFound) + return + } + + if err := h.store().DeleteFleet(fleetID); err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + http.Error(w, "Fleet not found", http.StatusNotFound) + return + } + log.Errorf("admin: failed to delete runner fleet %s: %v", fleetID, err) + http.Error(w, "Failed to delete fleet", http.StatusInternalServerError) + return + } + + w.WriteHeader(http.StatusNoContent) +} diff --git a/pkg/runners/api/admin_test.go b/pkg/runners/api/admin_test.go new file mode 100644 index 0000000000..9aeaf2c9df --- /dev/null +++ b/pkg/runners/api/admin_test.go @@ -0,0 +1,66 @@ +package api + +import ( + "bytes" + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + + "github.com/google/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/superplanehq/superplane/test/support" +) + +func adminPOST(t *testing.T, h *Handler, body any) *httptest.ResponseRecorder { + t.Helper() + var buf bytes.Buffer + require.NoError(t, json.NewEncoder(&buf).Encode(body)) + req := httptest.NewRequest(http.MethodPost, "/admin/api/runner/fleets", &buf) + req.Header.Set("Content-Type", "application/json") + rec := httptest.NewRecorder() + h.AdminRegisterFleet(rec, req) + return rec +} + +func adminGET(t *testing.T, h *Handler) *httptest.ResponseRecorder { + t.Helper() + req := httptest.NewRequest(http.MethodGet, "/admin/api/runner/fleets", nil) + rec := httptest.NewRecorder() + h.AdminListFleets(rec, req) + return rec +} + +func TestAdminRegisterRunnerFleet(t *testing.T) { + r := support.Setup(t) + defer r.Close() + h := testHandler(t, r) + + t.Run("registers fleet and returns auth token", func(t *testing.T) { + rec := adminPOST(t, h, map[string]any{ + "name": "fleet-" + uuid.New().String(), + }) + require.Equal(t, http.StatusCreated, rec.Code) + + var created struct { + ID string `json:"id"` + Name string `json:"name"` + AuthToken string `json:"auth_token"` + } + require.NoError(t, json.Unmarshal(rec.Body.Bytes(), &created)) + assert.Contains(t, created.Name, "fleet-") + assert.NotEmpty(t, created.AuthToken) + assert.NotEmpty(t, created.ID) + }) + + t.Run("list fleets", func(t *testing.T) { + rec := adminGET(t, h) + require.Equal(t, http.StatusOK, rec.Code) + + var items []fleetResponse + require.NoError(t, json.Unmarshal(rec.Body.Bytes(), &items)) + require.NotEmpty(t, items) + assert.NotEmpty(t, items[0].Name) + }) +} diff --git a/pkg/runners/api/bridge.go b/pkg/runners/api/bridge.go new file mode 100644 index 0000000000..8ea1da270d --- /dev/null +++ b/pkg/runners/api/bridge.go @@ -0,0 +1,118 @@ +package api + +import ( + "encoding/json" + "io" + "net/http" + "strings" + + "github.com/google/uuid" + "github.com/gorilla/mux" + log "github.com/sirupsen/logrus" + runnermodels "github.com/superplanehq/superplane/pkg/runners/models" +) + +// FleetSync is POST /runner-fleets/sync (fleet-manager pulls the next job). +func (h *Handler) FleetSync(w http.ResponseWriter, r *http.Request) { + fleet, ok := h.authenticateRunnerFleet(w, r) + if !ok { + return + } + + task, err := h.store().ClaimNextQueuedJob(fleet.ID) + if err != nil { + log.Errorf("runner fleet sync: claim job: %v", err) + http.Error(w, "could not claim job", http.StatusInternalServerError) + return + } + + resp := runnermodels.FleetSyncResponse{Continue: task != nil} + if task != nil { + resp.Job = &runnermodels.FleetBridgeJob{ + ID: task.ID.String(), + Spec: task.Spec.Data(), + } + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(resp) +} + +// FleetTaskComplete is POST /runner-fleets/tasks/{taskId}/complete. +func (h *Handler) FleetTaskComplete(w http.ResponseWriter, r *http.Request) { + fleet, ok := h.authenticateRunnerFleet(w, r) + if !ok { + return + } + + taskIDStr := strings.TrimSpace(mux.Vars(r)["taskId"]) + taskID, err := uuid.Parse(taskIDStr) + if err != nil { + http.Error(w, "task not found", http.StatusNotFound) + return + } + + r.Body = http.MaxBytesReader(w, r.Body, MaxRequestBodyBytes) + defer r.Body.Close() + body, err := io.ReadAll(r.Body) + if err != nil { + if _, ok := err.(*http.MaxBytesError); ok { + http.Error(w, "request body too large", http.StatusRequestEntityTooLarge) + return + } + http.Error(w, "invalid request body", http.StatusBadRequest) + return + } + + var req runnermodels.FleetCompleteRequest + if err := json.Unmarshal(body, &req); err != nil { + http.Error(w, "invalid request body", http.StatusBadRequest) + return + } + + task, err := h.store().FindTask(taskID) + if err != nil { + http.Error(w, "task not found", http.StatusNotFound) + return + } + if task.FleetID != fleet.ID { + http.Error(w, "task not found", http.StatusNotFound) + return + } + + wasTerminal := task.IsTerminal() + task, err = h.store().CompleteJob(taskID, req) + if err != nil { + log.Errorf("runner fleet complete: %v", err) + http.Error(w, "could not complete task", http.StatusInternalServerError) + return + } + + if wasTerminal { + w.WriteHeader(http.StatusOK) + return + } + + if err := h.finishRunnerTask(task); err != nil { + log.Errorf("runner fleet complete: finish execution: %v", err) + http.Error(w, "could not finish execution", http.StatusInternalServerError) + return + } + + w.WriteHeader(http.StatusOK) +} + +func (h *Handler) authenticateRunnerFleet(w http.ResponseWriter, r *http.Request) (*runnermodels.RunnerFleet, bool) { + auth := strings.TrimSpace(r.Header.Get("Authorization")) + if !strings.HasPrefix(auth, "Bearer ") { + http.Error(w, "Unauthorized", http.StatusUnauthorized) + return nil, false + } + token := strings.TrimSpace(strings.TrimPrefix(auth, "Bearer ")) + fleet, err := h.store().FindFleetByAuthToken(token) + if err != nil { + http.Error(w, "Unauthorized", http.StatusUnauthorized) + return nil, false + } + return fleet, true +} diff --git a/pkg/runners/api/bridge_test.go b/pkg/runners/api/bridge_test.go new file mode 100644 index 0000000000..30496e96d4 --- /dev/null +++ b/pkg/runners/api/bridge_test.go @@ -0,0 +1,241 @@ +package api + +import ( + "bytes" + "encoding/json" + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" + + "github.com/google/uuid" + "github.com/gorilla/mux" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/superplanehq/superplane/pkg/database" + "github.com/superplanehq/superplane/pkg/models" + "github.com/superplanehq/superplane/pkg/runners" + runnermodels "github.com/superplanehq/superplane/pkg/runners/models" + "github.com/superplanehq/superplane/test/support" + "gorm.io/datatypes" + "gorm.io/gorm" +) + +func testHandler(t *testing.T, r *support.ResourceRegistry) *Handler { + t.Helper() + return New(Config{ + BaseURL: "http://localhost", + Registry: r.Registry, + AuthService: r.AuthService, + }) +} + +func fleetSyncPOST(t *testing.T, h *Handler, token string, body any) *httptest.ResponseRecorder { + t.Helper() + var buf bytes.Buffer + if body != nil { + require.NoError(t, json.NewEncoder(&buf).Encode(body)) + } + req := httptest.NewRequest(http.MethodPost, "/runner-fleets/sync", &buf) + req.Header.Set("Content-Type", "application/json") + if token != "" { + req.Header.Set("Authorization", "Bearer "+token) + } + rec := httptest.NewRecorder() + h.FleetSync(rec, req) + return rec +} + +func fleetCompletePOST(t *testing.T, h *Handler, taskID, token string, body any) *httptest.ResponseRecorder { + t.Helper() + var buf bytes.Buffer + if body != nil { + require.NoError(t, json.NewEncoder(&buf).Encode(body)) + } + req := httptest.NewRequest(http.MethodPost, "/runner-fleets/tasks/"+taskID+"/complete", &buf) + req = mux.SetURLVars(req, map[string]string{"taskId": taskID}) + req.Header.Set("Content-Type", "application/json") + if token != "" { + req.Header.Set("Authorization", "Bearer "+token) + } + rec := httptest.NewRecorder() + h.FleetTaskComplete(rec, req) + return rec +} + +func createCanvasWithComponentExecution( + t *testing.T, + r *support.ResourceRegistry, + componentName string, + nodeID string, + metadata map[string]any, +) (canvasID uuid.UUID, executionID uuid.UUID) { + t.Helper() + canvas, _ := support.CreateCanvas(t, r.Organization.ID, r.User, []models.CanvasNode{ + { + NodeID: "trigger-1", + Type: models.NodeTypeTrigger, + Ref: datatypes.NewJSONType(models.NodeRef{ + Trigger: &models.TriggerRef{Name: "start"}, + }), + }, + { + NodeID: nodeID, + Type: models.NodeTypeComponent, + Name: componentName, + Ref: datatypes.NewJSONType(models.NodeRef{ + Component: &models.ComponentRef{Name: componentName}, + }), + }, + }, nil) + + rootEvent := support.EmitCanvasEventForNode(t, canvas.ID, "trigger-1", "default", nil) + + var run *models.CanvasRun + require.NoError(t, database.Conn().Transaction(func(tx *gorm.DB) error { + var err error + run, err = models.FindOrCreateCanvasRunForRootEventInTransaction(tx, rootEvent) + if err != nil { + return err + } + return rootEvent.RoutedInTransaction(tx) + })) + + now := time.Now() + execution := models.CanvasNodeExecution{ + ID: uuid.New(), + WorkflowID: run.WorkflowID, + NodeID: nodeID, + RootEventID: rootEvent.ID, + RunID: run.ID, + EventID: rootEvent.ID, + State: models.CanvasNodeExecutionStatePending, + Configuration: datatypes.NewJSONType(map[string]any{}), + CreatedAt: &now, + UpdatedAt: &now, + } + if len(metadata) > 0 { + execution.Metadata = datatypes.NewJSONType(metadata) + } + require.NoError(t, database.Conn().Create(&execution).Error) + return canvas.ID, execution.ID +} + +func TestRunnerFleetBridgeSync(t *testing.T) { + r := support.Setup(t) + defer r.Close() + h := testHandler(t, r) + store := runners.NewPostgresStore() + + t.Run("unauthorized without bearer", func(t *testing.T) { + rec := fleetSyncPOST(t, h, "", map[string]any{}) + assert.Equal(t, http.StatusUnauthorized, rec.Code) + }) + + t.Run("unauthorized with bad token", func(t *testing.T) { + rec := fleetSyncPOST(t, h, "not-a-real-token", map[string]any{}) + assert.Equal(t, http.StatusUnauthorized, rec.Code) + }) + + t.Run("empty queue", func(t *testing.T) { + fleet, err := store.CreateFleet("bridge-empty-"+uuid.New().String(), uuid.New().String()) + require.NoError(t, err) + + rec := fleetSyncPOST(t, h, fleet.AuthToken, map[string]any{}) + require.Equal(t, http.StatusOK, rec.Code) + + var resp runnermodels.FleetSyncResponse + require.NoError(t, json.Unmarshal(rec.Body.Bytes(), &resp)) + assert.False(t, resp.Continue) + assert.Nil(t, resp.Job) + }) + + t.Run("returns queued job", func(t *testing.T) { + fleet, err := store.CreateFleet("bridge-jobs-"+uuid.New().String(), uuid.New().String()) + require.NoError(t, err) + + _, execID := createCanvasWithComponentExecution(t, r, "runner", "runner-1", nil) + _, err = store.EnqueueJob(fleet.ID, execID, runnermodels.JobSpec{ + Commands: []string{"echo hello"}, + }) + require.NoError(t, err) + + rec := fleetSyncPOST(t, h, fleet.AuthToken, map[string]any{}) + require.Equal(t, http.StatusOK, rec.Code) + + var resp runnermodels.FleetSyncResponse + require.NoError(t, json.Unmarshal(rec.Body.Bytes(), &resp)) + require.True(t, resp.Continue) + require.NotNil(t, resp.Job) + assert.Equal(t, []string{"echo hello"}, resp.Job.Spec.Commands) + }) +} + +func TestRunnerFleetBridgeComplete(t *testing.T) { + r := support.Setup(t) + defer r.Close() + h := testHandler(t, r) + store := runners.NewPostgresStore() + + t.Run("unauthorized", func(t *testing.T) { + rec := fleetCompletePOST(t, h, uuid.New().String(), "", runnermodels.FleetCompleteRequest{}) + assert.Equal(t, http.StatusUnauthorized, rec.Code) + }) + + t.Run("task not found", func(t *testing.T) { + fleet, err := store.CreateFleet("bridge-complete-missing-"+uuid.New().String(), uuid.New().String()) + require.NoError(t, err) + + rec := fleetCompletePOST(t, h, uuid.New().String(), fleet.AuthToken, runnermodels.FleetCompleteRequest{ExitCode: 0}) + assert.Equal(t, http.StatusNotFound, rec.Code) + }) + + t.Run("wrong fleet", func(t *testing.T) { + fleetA, err := store.CreateFleet("bridge-a-"+uuid.New().String(), uuid.New().String()) + require.NoError(t, err) + fleetB, err := store.CreateFleet("bridge-b-"+uuid.New().String(), uuid.New().String()) + require.NoError(t, err) + + _, execID := createCanvasWithComponentExecution(t, r, "runner", "runner-wf", nil) + task, err := store.EnqueueJob(fleetA.ID, execID, runnermodels.JobSpec{Commands: []string{"echo"}}) + require.NoError(t, err) + + rec := fleetCompletePOST(t, h, task.ID.String(), fleetB.AuthToken, runnermodels.FleetCompleteRequest{ExitCode: 0}) + assert.Equal(t, http.StatusNotFound, rec.Code) + }) + + t.Run("invalid json body", func(t *testing.T) { + fleet, err := store.CreateFleet("bridge-bad-json-"+uuid.New().String(), uuid.New().String()) + require.NoError(t, err) + _, execID := createCanvasWithComponentExecution(t, r, "runner", "runner-json", nil) + task, err := store.EnqueueJob(fleet.ID, execID, runnermodels.JobSpec{Commands: []string{"echo"}}) + require.NoError(t, err) + + req := httptest.NewRequest(http.MethodPost, "/runner-fleets/tasks/"+task.ID.String()+"/complete", strings.NewReader("{")) + req = mux.SetURLVars(req, map[string]string{"taskId": task.ID.String()}) + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Authorization", "Bearer "+fleet.AuthToken) + rec := httptest.NewRecorder() + h.FleetTaskComplete(rec, req) + assert.Equal(t, http.StatusBadRequest, rec.Code) + }) + + t.Run("idempotent when already terminal", func(t *testing.T) { + fleet, err := store.CreateFleet("bridge-idempotent-"+uuid.New().String(), uuid.New().String()) + require.NoError(t, err) + _, execID := createCanvasWithComponentExecution(t, r, "runner", "runner-idem", nil) + task, err := store.EnqueueJob(fleet.ID, execID, runnermodels.JobSpec{Commands: []string{"echo"}}) + require.NoError(t, err) + + claimed, err := store.ClaimNextQueuedJob(fleet.ID) + require.NoError(t, err) + require.NotNil(t, claimed) + + _, err = store.CompleteJob(task.ID, runnermodels.FleetCompleteRequest{ExitCode: 0}) + require.NoError(t, err) + + rec := fleetCompletePOST(t, h, task.ID.String(), fleet.AuthToken, runnermodels.FleetCompleteRequest{ExitCode: 0}) + assert.Equal(t, http.StatusOK, rec.Code) + }) +} diff --git a/pkg/runners/api/finish.go b/pkg/runners/api/finish.go new file mode 100644 index 0000000000..998237990a --- /dev/null +++ b/pkg/runners/api/finish.go @@ -0,0 +1,64 @@ +package api + +import ( + "fmt" + + "github.com/google/uuid" + runneraction "github.com/superplanehq/superplane/pkg/components/runner" + "github.com/superplanehq/superplane/pkg/core" + "github.com/superplanehq/superplane/pkg/database" + "github.com/superplanehq/superplane/pkg/grpc/actions/messages" + "github.com/superplanehq/superplane/pkg/logging" + "github.com/superplanehq/superplane/pkg/models" + "github.com/superplanehq/superplane/pkg/runners" + runnermodels "github.com/superplanehq/superplane/pkg/runners/models" + "github.com/superplanehq/superplane/pkg/workers/contexts" +) + +func (h *Handler) finishRunnerTask(runnerTask *runnermodels.RunnerTask) error { + if runnerTask == nil { + return fmt.Errorf("runner task is nil") + } + + tx := database.Conn() + execution, err := models.FindNodeExecutionByID(tx, runnerTask.ExecutionID) + if err != nil { + return fmt.Errorf("execution not found: %w", err) + } + + node, err := models.FindCanvasNode(tx, execution.WorkflowID, execution.NodeID) + if err != nil { + return fmt.Errorf("node not found: %w", err) + } + + newEvents := []models.CanvasEvent{} + onNewEvents := func(events []models.CanvasEvent) { + newEvents = append(newEvents, events...) + } + + execCtx := &core.ExecutionContext{ + ID: execution.ID, + WorkflowID: execution.WorkflowID.String(), + NodeID: execution.NodeID, + BaseURL: h.BaseURL, + Configuration: execution.Configuration.Data(), + HTTP: h.Registry.HTTPContext(), + Metadata: contexts.NewExecutionMetadataContext(tx, execution), + NodeMetadata: contexts.NewNodeMetadataContext(tx, node), + ExecutionState: contexts.NewExecutionStateContext(tx, execution, onNewEvents), + Requests: contexts.NewExecutionRequestContext(tx, execution), + Logger: logging.ForExecution(execution, nil), + Notifications: contexts.NewNotificationContext(tx, uuid.Nil, execution.WorkflowID), + CanvasMemory: contexts.NewCanvasMemoryContext(tx, execution.WorkflowID), + } + + fleetTask := runners.FleetTaskFromRunnerTask(runnerTask) + if err := runneraction.FinishFleetTask(execCtx.Metadata, execCtx.ExecutionState, fleetTask, runnerTask.ID.String()); err != nil { + return err + } + + for _, event := range newEvents { + messages.PublishCanvasEventCreatedMessage(&event) + } + return nil +} diff --git a/pkg/runners/api/handler.go b/pkg/runners/api/handler.go new file mode 100644 index 0000000000..2bae5e38d8 --- /dev/null +++ b/pkg/runners/api/handler.go @@ -0,0 +1,47 @@ +package api + +import ( + "github.com/superplanehq/superplane/pkg/authorization" + "github.com/superplanehq/superplane/pkg/registry" + "github.com/superplanehq/superplane/pkg/runners" +) + +// MaxRequestBodyBytes limits runner webhook and fleet-bridge request bodies. +const MaxRequestBodyBytes = 64 * 1024 + +// Handler serves runner fleet admin, bridge, and live-log HTTP endpoints. +type Handler struct { + BaseURL string + Registry *registry.Registry + AuthService authorization.Authorization + Store runners.Store +} + +// Config wires dependencies for a Handler. +type Config struct { + BaseURL string + Registry *registry.Registry + AuthService authorization.Authorization + Store runners.Store +} + +// New returns a Handler with a Postgres runner store when Store is nil. +func New(cfg Config) *Handler { + store := cfg.Store + if store == nil { + store = runners.NewPostgresStore() + } + return &Handler{ + BaseURL: cfg.BaseURL, + Registry: cfg.Registry, + AuthService: cfg.AuthService, + Store: store, + } +} + +func (h *Handler) store() runners.Store { + if h.Store != nil { + return h.Store + } + return runners.NewPostgresStore() +} diff --git a/pkg/runners/api/live_logs.go b/pkg/runners/api/live_logs.go new file mode 100644 index 0000000000..fa33d55c8b --- /dev/null +++ b/pkg/runners/api/live_logs.go @@ -0,0 +1,175 @@ +package api + +import ( + "net/http" + "strings" + + "github.com/google/uuid" + "github.com/gorilla/mux" + runneraction "github.com/superplanehq/superplane/pkg/components/runner" + "github.com/superplanehq/superplane/pkg/database" + "github.com/superplanehq/superplane/pkg/models" + "github.com/superplanehq/superplane/pkg/public/middleware" + "github.com/superplanehq/superplane/pkg/runners" + "github.com/superplanehq/superplane/pkg/runners/livelogs" + runnermodels "github.com/superplanehq/superplane/pkg/runners/models" +) + +// LiveLogStream tails CloudWatch logs for a runner node execution. +func (h *Handler) LiveLogStream(w http.ResponseWriter, r *http.Request) { + user, ok := middleware.GetUserFromContext(r.Context()) + if !ok { + http.Error(w, "Unauthorized", http.StatusUnauthorized) + return + } + + allowed, err := h.AuthService.CheckOrganizationPermission( + user.ID.String(), + user.OrganizationID.String(), + "canvases", + "read", + ) + if err != nil { + http.Error(w, "Authorization check failed", http.StatusInternalServerError) + return + } + if !allowed { + http.Error(w, "Forbidden", http.StatusForbidden) + return + } + + vars := mux.Vars(r) + canvasID, err := uuid.Parse(strings.TrimSpace(vars["canvas_id"])) + if err != nil { + http.Error(w, "Invalid canvas id", http.StatusBadRequest) + return + } + executionID, err := uuid.Parse(strings.TrimSpace(vars["execution_id"])) + if err != nil { + http.Error(w, "Invalid execution id", http.StatusBadRequest) + return + } + + if _, err := models.FindCanvas(user.OrganizationID, canvasID); err != nil { + http.Error(w, "Canvas not found", http.StatusNotFound) + return + } + + execution, err := models.FindNodeExecution(canvasID, executionID) + if err != nil { + http.Error(w, "Execution not found", http.StatusNotFound) + return + } + + node, err := models.FindCanvasNode(database.Conn(), canvasID, execution.NodeID) + if err != nil { + http.Error(w, "Node not found", http.StatusNotFound) + return + } + + ref := node.Ref.Data() + if ref.Component == nil || ref.Component.Name != "runner" { + http.Error(w, "Live logs are only available for Runner components", http.StatusBadRequest) + return + } + + group, stream, region, hasCloudWatch := resolveCloudWatchLogSink(h.store(), execution.Metadata.Data()) + var storedOutput string + if !hasCloudWatch { + if task, err := h.store().FindTaskByExecutionID(executionID); err == nil { + storedOutput = strings.TrimSpace(task.Output) + if storedOutput == "" { + storedOutput = strings.TrimSpace(task.Error) + } + } + } + if !hasCloudWatch && storedOutput == "" { + http.Error( + w, + "Logs are not available for this execution yet. For live streaming, configure CloudWatch on fleet-manager and AWS credentials on SuperPlane.", + http.StatusNotFound, + ) + return + } + + w.Header().Set("Content-Type", "application/x-ndjson; charset=utf-8") + w.Header().Set("Cache-Control", "no-store") + w.WriteHeader(http.StatusOK) + flusher, _ := w.(http.Flusher) + if flusher != nil { + flusher.Flush() + } + + if hasCloudWatch { + _ = livelogs.StreamCloudWatchLogToNDJSON(r.Context(), w, flusher, group, stream, region) + return + } + _ = livelogs.StreamTextOutputToNDJSON(w, flusher, storedOutput) +} + +func resolveCloudWatchLogSink(store runners.Store, meta any) (group, stream, region string, ok bool) { + m, _ := meta.(map[string]any) + if m == nil { + return "", "", "", false + } + + if v, exists := m[runneraction.ExecutionMetadataTaskLog]; exists && v != nil { + if sink := taskLogMapToSink(v); sink != nil && sink.CloudWatch != nil { + g := strings.TrimSpace(sink.CloudWatch.LogGroupName) + s := strings.TrimSpace(sink.CloudWatch.LogStreamName) + if g != "" && s != "" { + return g, s, strings.TrimSpace(sink.CloudWatch.Region), true + } + } + } + + if brokerID, ok2 := m[runneraction.ExecutionMetadataBrokerTaskID].(string); ok2 && strings.TrimSpace(brokerID) != "" { + if taskID, err := uuid.Parse(strings.TrimSpace(brokerID)); err == nil { + if task, err := store.FindTask(taskID); err == nil { + if sink := task.TaskLog.Data(); sink != nil && sink.CloudWatch != nil { + g := strings.TrimSpace(sink.CloudWatch.LogGroupName) + s := strings.TrimSpace(sink.CloudWatch.LogStreamName) + if g != "" && s != "" { + return g, s, strings.TrimSpace(sink.CloudWatch.Region), true + } + } + } + } + } + + return "", "", "", false +} + +func taskLogMapToSink(v any) *runnermodels.FleetTaskLog { + switch t := v.(type) { + case runnermodels.FleetTaskLog: + return &t + case *runnermodels.FleetTaskLog: + return t + case map[string]any: + ft := &runnermodels.FleetTaskLog{} + if typ, ok := t["type"].(string); ok { + ft.Type = typ + } + if cw, ok := t["cloudwatch"].(map[string]any); ok { + ft.CloudWatch = &struct { + LogGroupName string `json:"log_group_name"` + LogStreamName string `json:"log_stream_name"` + Region string `json:"region,omitempty"` + }{} + if g, ok := cw["log_group_name"].(string); ok { + ft.CloudWatch.LogGroupName = g + } + if s, ok := cw["log_stream_name"].(string); ok { + ft.CloudWatch.LogStreamName = s + } + if r, ok := cw["region"].(string); ok { + ft.CloudWatch.Region = r + } + } + if ft.Type != "" { + return ft + } + } + return nil +} diff --git a/pkg/runners/api/live_logs_test.go b/pkg/runners/api/live_logs_test.go new file mode 100644 index 0000000000..70f61fd007 --- /dev/null +++ b/pkg/runners/api/live_logs_test.go @@ -0,0 +1,142 @@ +package api + +import ( + "context" + "fmt" + "net/http" + "net/http/httptest" + "testing" + + "github.com/google/uuid" + "github.com/gorilla/mux" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + runneraction "github.com/superplanehq/superplane/pkg/components/runner" + "github.com/superplanehq/superplane/pkg/public/middleware" + "github.com/superplanehq/superplane/pkg/runners" + runnermodels "github.com/superplanehq/superplane/pkg/runners/models" + "github.com/superplanehq/superplane/test/support" +) + +func liveLogGET( + t *testing.T, + h *Handler, + r *support.ResourceRegistry, + canvasID, executionID string, +) *httptest.ResponseRecorder { + t.Helper() + req := httptest.NewRequest( + http.MethodGet, + fmt.Sprintf("/api/v1/canvases/%s/node-executions/%s/runner-live-logs", canvasID, executionID), + nil, + ) + req = mux.SetURLVars(req, map[string]string{ + "canvas_id": canvasID, + "execution_id": executionID, + }) + ctx := context.WithValue(req.Context(), middleware.UserContextKey, r.UserModel) + req = req.WithContext(ctx) + rec := httptest.NewRecorder() + h.LiveLogStream(rec, req) + return rec +} + +func TestHandleRunnerLiveLogStream(t *testing.T) { + r := support.Setup(t) + defer r.Close() + h := testHandler(t, r) + + t.Run("no user in context", func(t *testing.T) { + req := httptest.NewRequest(http.MethodGet, + fmt.Sprintf("/api/v1/canvases/%s/node-executions/%s/runner-live-logs", uuid.New(), uuid.New()), + nil, + ) + req = mux.SetURLVars(req, map[string]string{ + "canvas_id": uuid.New().String(), + "execution_id": uuid.New().String(), + }) + rec := httptest.NewRecorder() + h.LiveLogStream(rec, req) + assert.Equal(t, http.StatusUnauthorized, rec.Code) + }) + + t.Run("invalid canvas id", func(t *testing.T) { + rec := liveLogGET(t, h, r, "not-a-uuid", uuid.New().String()) + assert.Equal(t, http.StatusBadRequest, rec.Code) + assert.Contains(t, rec.Body.String(), "Invalid canvas id") + }) + + t.Run("invalid execution id", func(t *testing.T) { + canvasID, _ := createCanvasWithComponentExecution(t, r, "runner", "runner-1", nil) + rec := liveLogGET(t, h, r, canvasID.String(), "bad-id") + assert.Equal(t, http.StatusBadRequest, rec.Code) + assert.Contains(t, rec.Body.String(), "Invalid execution id") + }) + + t.Run("canvas not found", func(t *testing.T) { + rec := liveLogGET(t, h, r, uuid.New().String(), uuid.New().String()) + assert.Equal(t, http.StatusNotFound, rec.Code) + assert.Contains(t, rec.Body.String(), "Canvas not found") + }) + + t.Run("execution not found", func(t *testing.T) { + canvasID, _ := createCanvasWithComponentExecution(t, r, "runner", "runner-1", nil) + rec := liveLogGET(t, h, r, canvasID.String(), uuid.New().String()) + assert.Equal(t, http.StatusNotFound, rec.Code) + assert.Contains(t, rec.Body.String(), "Execution not found") + }) + + t.Run("non-runner node", func(t *testing.T) { + canvasID, execID := createCanvasWithComponentExecution(t, r, "noop", "noop-1", map[string]any{ + runneraction.ExecutionMetadataBrokerTaskID: "tb-1", + }) + rec := liveLogGET(t, h, r, canvasID.String(), execID.String()) + assert.Equal(t, http.StatusBadRequest, rec.Code) + assert.Contains(t, rec.Body.String(), "Runner components") + }) + + t.Run("broker task id missing", func(t *testing.T) { + canvasID, execID := createCanvasWithComponentExecution(t, r, "runner", "runner-1", map[string]any{}) + rec := liveLogGET(t, h, r, canvasID.String(), execID.String()) + assert.Equal(t, http.StatusNotFound, rec.Code) + assert.Contains(t, rec.Body.String(), "not available for this execution") + }) + + t.Run("stored task output without cloudwatch", func(t *testing.T) { + canvasID, execID := createCanvasWithComponentExecution(t, r, "runner", "runner-1", map[string]any{ + runneraction.ExecutionMetadataBrokerTaskID: uuid.New().String(), + }) + store := runners.NewPostgresStore() + fleet, err := store.CreateFleet("live-log-fleet-"+uuid.New().String(), uuid.New().String()) + require.NoError(t, err) + task, err := store.EnqueueJob(fleet.ID, execID, runnermodels.JobSpec{Commands: []string{"echo hi"}}) + require.NoError(t, err) + _, err = store.CompleteJob(task.ID, runnermodels.FleetCompleteRequest{ + ExitCode: 0, + Output: "hello from runner\nsecond line", + }) + require.NoError(t, err) + + rec := liveLogGET(t, h, r, canvasID.String(), execID.String()) + require.Equal(t, http.StatusOK, rec.Code) + assert.Contains(t, rec.Body.String(), `"type":"line"`) + assert.Contains(t, rec.Body.String(), "hello from runner") + }) + + t.Run("cloudwatch task log starts ndjson stream", func(t *testing.T) { + canvasID, execID := createCanvasWithComponentExecution(t, r, "runner", "runner-1", map[string]any{ + runneraction.ExecutionMetadataTaskLog: map[string]any{ + "type": "cloudwatch", + "cloudwatch": map[string]any{ + "log_group_name": "/test/group", + "log_stream_name": "task-1", + "region": "us-east-1", + }, + }, + }) + rec := liveLogGET(t, h, r, canvasID.String(), execID.String()) + assert.Equal(t, http.StatusOK, rec.Code) + assert.Equal(t, "application/x-ndjson; charset=utf-8", rec.Header().Get("Content-Type")) + assert.Equal(t, "no-store", rec.Header().Get("Cache-Control")) + }) +} diff --git a/pkg/runners/api/org_fleets.go b/pkg/runners/api/org_fleets.go new file mode 100644 index 0000000000..edb972ff50 --- /dev/null +++ b/pkg/runners/api/org_fleets.go @@ -0,0 +1,58 @@ +package api + +import ( + "encoding/json" + "net/http" + + log "github.com/sirupsen/logrus" + "github.com/superplanehq/superplane/pkg/features" + "github.com/superplanehq/superplane/pkg/models" + "github.com/superplanehq/superplane/pkg/public/middleware" +) + +type orgFleetOption struct { + ID string `json:"id"` + Name string `json:"name"` +} + +// OrgListFleets lists runner fleets for canvas configuration (machine type dropdown). +// Requires organization auth; returns an empty list when the runner feature is disabled for the org. +func (h *Handler) OrgListFleets(w http.ResponseWriter, r *http.Request) { + user, ok := middleware.GetUserFromContext(r.Context()) + if !ok { + http.Error(w, "Unauthorized", http.StatusUnauthorized) + return + } + + enabled, err := models.HasExperimentalFeature(user.OrganizationID, features.FeatureRunner) + if err != nil { + log.Errorf("runner fleets: check experimental feature: %v", err) + http.Error(w, "Failed to list machine types", http.StatusInternalServerError) + return + } + + items := []orgFleetOption{} + if !enabled { + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(items) + return + } + + fleets, err := h.store().ListFleets() + if err != nil { + log.Errorf("runner fleets: list: %v", err) + http.Error(w, "Failed to list machine types", http.StatusInternalServerError) + return + } + + items = make([]orgFleetOption, 0, len(fleets)) + for _, f := range fleets { + items = append(items, orgFleetOption{ + ID: f.ID.String(), + Name: f.Name, + }) + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(items) +} diff --git a/pkg/runners/api/org_fleets_test.go b/pkg/runners/api/org_fleets_test.go new file mode 100644 index 0000000000..26a40a3e90 --- /dev/null +++ b/pkg/runners/api/org_fleets_test.go @@ -0,0 +1,62 @@ +package api + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + + "github.com/google/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/superplanehq/superplane/pkg/models" + "github.com/superplanehq/superplane/pkg/public/middleware" + "github.com/superplanehq/superplane/test/support" +) + +func TestOrgListFleets(t *testing.T) { + r := support.Setup(t) + defer r.Close() + h := testHandler(t, r) + + t.Run("requires user in context", func(t *testing.T) { + rec := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodGet, "/api/v1/runner-fleets", nil) + h.OrgListFleets(rec, req) + assert.Equal(t, http.StatusUnauthorized, rec.Code) + }) + + t.Run("empty when runner feature disabled", func(t *testing.T) { + _ = models.DisableExperimentalFeature(r.Organization.ID, "runner") + + rec := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodGet, "/api/v1/runner-fleets", nil) + req = req.WithContext(context.WithValue(req.Context(), middleware.UserContextKey, r.UserModel)) + h.OrgListFleets(rec, req) + require.Equal(t, http.StatusOK, rec.Code) + + var items []orgFleetOption + require.NoError(t, json.Unmarshal(rec.Body.Bytes(), &items)) + assert.Empty(t, items) + }) + + t.Run("lists fleets when runner enabled", func(t *testing.T) { + require.NoError(t, models.EnableExperimentalFeature(r.Organization.ID, "runner")) + + created := adminPOST(t, h, map[string]any{"name": "fleet-" + uuid.New().String()}) + require.Equal(t, http.StatusCreated, created.Code) + + rec := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodGet, "/api/v1/runner-fleets", nil) + req = req.WithContext(context.WithValue(req.Context(), middleware.UserContextKey, r.UserModel)) + h.OrgListFleets(rec, req) + require.Equal(t, http.StatusOK, rec.Code) + + var items []orgFleetOption + require.NoError(t, json.Unmarshal(rec.Body.Bytes(), &items)) + require.NotEmpty(t, items) + assert.NotEmpty(t, items[0].ID) + assert.NotEmpty(t, items[0].Name) + }) +} diff --git a/pkg/runners/livelogs/output.go b/pkg/runners/livelogs/output.go new file mode 100644 index 0000000000..e69dd690e6 --- /dev/null +++ b/pkg/runners/livelogs/output.go @@ -0,0 +1,23 @@ +// Package livelogs streams runner task output as NDJSON for the live-log API. +package livelogs + +import ( + "io" + "net/http" + "strings" +) + +// StreamTextOutputToNDJSON writes stored task output as line records (local dev without CloudWatch). +func StreamTextOutputToNDJSON(w io.Writer, flusher http.Flusher, output string) error { + nw := ndjsonWriter{w: w, f: flusher} + text := strings.TrimSpace(output) + if text == "" { + return nil + } + for _, line := range strings.Split(text, "\n") { + if err := nw.writeRecord(map[string]any{"type": "line", "text": line}); err != nil { + return err + } + } + return nil +} diff --git a/pkg/runners/livelogs/stream.go b/pkg/runners/livelogs/stream.go new file mode 100644 index 0000000000..75eabe22f3 --- /dev/null +++ b/pkg/runners/livelogs/stream.go @@ -0,0 +1,194 @@ +// Package livelogs streams Amazon CloudWatch Logs task output as NDJSON. +package livelogs + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "os" + "strings" + "time" + + awsconfig "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs" +) + +const pollQuiet = 2 * time.Second + +type ndjsonWriter struct { + w io.Writer + f http.Flusher +} + +func (n *ndjsonWriter) writeRecord(rec map[string]any) error { + b, err := json.Marshal(rec) + if err != nil { + return err + } + if _, err := n.w.Write(append(b, '\n')); err != nil { + return err + } + if n.f != nil { + n.f.Flush() + } + return nil +} + +// StreamCloudWatchLogToNDJSON tails a CloudWatch Logs stream and writes newline-delimited JSON records. +func StreamCloudWatchLogToNDJSON(ctx context.Context, w io.Writer, flusher http.Flusher, group, stream, region string) error { + group = strings.TrimSpace(group) + stream = strings.TrimSpace(stream) + if group == "" || stream == "" { + return fmt.Errorf("log group and stream are required") + } + + region = strings.TrimSpace(region) + if region == "" { + region = strings.TrimSpace(os.Getenv("AWS_REGION")) + } + if region == "" { + region = strings.TrimSpace(os.Getenv("AWS_DEFAULT_REGION")) + } + if region == "" { + region = "us-east-1" + } + + awscfg, err := awsconfig.LoadDefaultConfig(ctx, awsconfig.WithRegion(region)) + if err != nil { + return fmt.Errorf("aws config: %w", err) + } + + client := cloudwatchlogs.NewFromConfig(awscfg) + nw := ndjsonWriter{w: w, f: flusher} + + var nextForward *string + var lastToken string + + for { + if err := ctx.Err(); err != nil { + return err + } + + out, err := client.GetLogEvents(ctx, &cloudwatchlogs.GetLogEventsInput{ + LogGroupName: awsString(group), + LogStreamName: awsString(stream), + NextToken: nextForward, + StartFromHead: awsBool(nextForward == nil), + Limit: awsInt32(256), + }) + if err != nil { + _ = nw.writeRecord(map[string]any{ + "type": "error", + "message": err.Error(), + }) + return err + } + + token := awsToString(out.NextForwardToken) + nextForward = out.NextForwardToken + + if len(out.Events) == 0 { + if token != "" && token == lastToken { + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(pollQuiet): + } + continue + } + } + + lastToken = token + + for _, ev := range out.Events { + msg := awsToString(ev.Message) + if rec, ok := parseRunnerControlRecord(msg); ok { + if err := nw.writeRecord(rec); err != nil { + return err + } + continue + } + if err := nw.writeRecord(map[string]any{"type": "line", "text": msg}); err != nil { + return err + } + } + + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(400 * time.Millisecond): + } + } +} + +func parseRunnerControlRecord(message string) (map[string]any, bool) { + var envelope struct { + Type string `json:"type"` + } + if err := json.Unmarshal([]byte(message), &envelope); err != nil { + return nil, false + } + switch envelope.Type { + case "cmd_start": + var rec struct { + Type string `json:"type"` + Index int `json:"index"` + Text string `json:"text"` + } + if err := json.Unmarshal([]byte(message), &rec); err != nil { + return nil, false + } + if rec.Index < 0 { + return nil, false + } + return map[string]any{ + "type": "cmd_start", + "index": rec.Index, + "text": rec.Text, + }, true + case "cmd_end": + var rec struct { + Type string `json:"type"` + Index int `json:"index"` + Status string `json:"status"` + DurationMS int64 `json:"duration_ms"` + } + if err := json.Unmarshal([]byte(message), &rec); err != nil { + return nil, false + } + if rec.Index < 0 || rec.DurationMS < 0 { + return nil, false + } + if rec.Status != "passed" && rec.Status != "failed" { + return nil, false + } + return map[string]any{ + "type": "cmd_end", + "index": rec.Index, + "status": rec.Status, + "duration_ms": rec.DurationMS, + }, true + default: + return nil, false + } +} + +func awsString(s string) *string { + if s == "" { + return nil + } + return &s +} + +func awsBool(b bool) *bool { return &b } + +func awsInt32(n int32) *int32 { return &n } + +func awsToString(p *string) string { + if p == nil { + return "" + } + return *p +} diff --git a/pkg/runners/livelogs/stream_test.go b/pkg/runners/livelogs/stream_test.go new file mode 100644 index 0000000000..28475a8cbb --- /dev/null +++ b/pkg/runners/livelogs/stream_test.go @@ -0,0 +1,39 @@ +package livelogs + +import "testing" + +func TestParseRunnerControlRecord(t *testing.T) { + t.Run("cmd_start", func(t *testing.T) { + rec, ok := parseRunnerControlRecord(`{"type":"cmd_start","index":2,"text":"echo hello"}`) + if !ok { + t.Fatalf("expected cmd_start to parse") + } + if rec["type"] != "cmd_start" || rec["index"] != 2 || rec["text"] != "echo hello" { + t.Fatalf("unexpected cmd_start record: %#v", rec) + } + }) + + t.Run("cmd_end", func(t *testing.T) { + rec, ok := parseRunnerControlRecord(`{"type":"cmd_end","index":2,"status":"passed","duration_ms":37}`) + if !ok { + t.Fatalf("expected cmd_end to parse") + } + if rec["type"] != "cmd_end" || rec["index"] != 2 || rec["status"] != "passed" || rec["duration_ms"] != int64(37) { + t.Fatalf("unexpected cmd_end record: %#v", rec) + } + }) + + t.Run("invalid", func(t *testing.T) { + cases := []string{ + `{"type":"line","text":"hello"}`, + `{"type":"cmd_start","index":-1,"text":"echo hello"}`, + `{"type":"cmd_end","index":0,"status":"running","duration_ms":10}`, + `not-json`, + } + for _, item := range cases { + if _, ok := parseRunnerControlRecord(item); ok { + t.Fatalf("expected invalid record to fail: %s", item) + } + } + }) +} diff --git a/pkg/runners/models/bridge.go b/pkg/runners/models/bridge.go new file mode 100644 index 0000000000..06f36cace1 --- /dev/null +++ b/pkg/runners/models/bridge.go @@ -0,0 +1,25 @@ +package models + +import "encoding/json" + +// FleetSyncResponse is returned by POST /api/v1/runner-fleets/sync. +type FleetSyncResponse struct { + Continue bool `json:"continue"` + Job *FleetBridgeJob `json:"job,omitempty"` +} + +// FleetBridgeJob is a queued job for fleet-manager to run locally. +type FleetBridgeJob struct { + ID string `json:"id"` + Spec JobSpec `json:"spec"` +} + +// FleetCompleteRequest is POST /api/v1/runner-fleets/tasks/{id}/complete from fleet-manager. +type FleetCompleteRequest struct { + ExitCode int `json:"exit_code"` + Output string `json:"output"` + Error string `json:"error,omitempty"` + Canceled bool `json:"canceled,omitempty"` + Result json.RawMessage `json:"result,omitempty"` + TaskLog *TaskLogSink `json:"task_log,omitempty"` +} diff --git a/pkg/runners/models/fleet.go b/pkg/runners/models/fleet.go new file mode 100644 index 0000000000..7576142f58 --- /dev/null +++ b/pkg/runners/models/fleet.go @@ -0,0 +1,16 @@ +package models + +import ( + "time" + + "github.com/google/uuid" +) + +type RunnerFleet struct { + ID uuid.UUID `gorm:"primaryKey;default:uuid_generate_v4()" json:"id"` + Name string `json:"name"` + AuthToken string `json:"-"` + CreatedAt *time.Time `json:"created_at"` +} + +func (RunnerFleet) TableName() string { return "runner_fleets" } diff --git a/pkg/runners/models/fleet_task.go b/pkg/runners/models/fleet_task.go new file mode 100644 index 0000000000..f32bc42c39 --- /dev/null +++ b/pkg/runners/models/fleet_task.go @@ -0,0 +1,39 @@ +package models + +import "encoding/json" + +// FleetTask is the task payload returned by fleet-manager (GET /v1/tasks/:id and webhook body). +type FleetTask struct { + TaskID string `json:"task_id"` + Status string `json:"status"` + ExitCode *int `json:"exit_code,omitempty"` + Output string `json:"output,omitempty"` + Error string `json:"error,omitempty"` + Result json.RawMessage `json:"result,omitempty"` + + TaskLog *FleetTaskLog `json:"task_log,omitempty"` + + CloudWatchLogGroup string `json:"cloudwatch_log_group,omitempty"` + CloudWatchLogStream string `json:"cloudwatch_log_stream,omitempty"` +} + +func (t *FleetTask) EffectiveExitCode() int { + if t == nil || t.ExitCode == nil { + return 0 + } + return *t.ExitCode +} + +func (t *FleetTask) IsInTerminalState() bool { + return t.Status == "succeeded" || t.Status == "failed" || t.Status == "canceled" +} + +// FleetTaskLog matches the fleet-manager JSON shape for CloudWatch-backed live logs. +type FleetTaskLog struct { + Type string `json:"type"` + CloudWatch *struct { + LogGroupName string `json:"log_group_name"` + LogStreamName string `json:"log_stream_name"` + Region string `json:"region,omitempty"` + } `json:"cloudwatch,omitempty"` +} diff --git a/pkg/runners/models/job_spec.go b/pkg/runners/models/job_spec.go new file mode 100644 index 0000000000..70d51284dc --- /dev/null +++ b/pkg/runners/models/job_spec.go @@ -0,0 +1,30 @@ +package models + +// JobSpec is the work payload SuperPlane queues for fleet-manager to pull. +type JobSpec struct { + Command []string `json:"command,omitempty"` + Commands []string `json:"commands,omitempty"` + Environment []FleetEnvironmentVariable `json:"environment,omitempty"` + ExecutionMode string `json:"execution_mode,omitempty"` + DockerImage string `json:"docker_image,omitempty"` + ExecutionTimeoutSeconds *int `json:"execution_timeout_seconds,omitempty"` +} + +// TaskLogSink describes where to read task logs (e.g. CloudWatch). +type TaskLogSink struct { + Type string `json:"type"` + CloudWatch *TaskLogSinkCloudWatch `json:"cloudwatch,omitempty"` +} + +// TaskLogSinkCloudWatch identifies a CloudWatch Logs stream. +type TaskLogSinkCloudWatch struct { + LogGroupName string `json:"log_group_name"` + LogStreamName string `json:"log_stream_name"` + Region string `json:"region,omitempty"` +} + +// FleetEnvironmentVariable is a name/value pair passed to fleet-manager. +type FleetEnvironmentVariable struct { + Name string `json:"name"` + Value string `json:"value"` +} diff --git a/pkg/runners/models/task.go b/pkg/runners/models/task.go new file mode 100644 index 0000000000..3d0a5f1dab --- /dev/null +++ b/pkg/runners/models/task.go @@ -0,0 +1,45 @@ +package models + +import ( + "time" + + "github.com/google/uuid" + "gorm.io/datatypes" +) + +const ( + TaskStatusQueued = "queued" + TaskStatusDispatched = "dispatched" + TaskStatusRunning = "running" + TaskStatusSucceeded = "succeeded" + TaskStatusFailed = "failed" + TaskStatusCanceled = "canceled" +) + +type RunnerTask struct { + ID uuid.UUID `gorm:"primaryKey;default:uuid_generate_v4()"` + FleetID uuid.UUID `gorm:"not null"` + FleetTaskID string `gorm:"not null"` + ExecutionID uuid.UUID `gorm:"not null"` + Status string `gorm:"not null;default:queued"` + Spec datatypes.JSONType[JobSpec] `gorm:"not null"` + ExitCode *int + Output string `gorm:"not null;default:''"` + Error string `gorm:"not null;default:''"` + Result datatypes.JSON + TaskLog datatypes.JSONType[*TaskLogSink] + DispatchedAt *time.Time + CompletedAt *time.Time + CreatedAt *time.Time +} + +func (RunnerTask) TableName() string { return "runner_tasks" } + +func (t *RunnerTask) IsTerminal() bool { + switch t.Status { + case TaskStatusSucceeded, TaskStatusFailed, TaskStatusCanceled: + return true + default: + return false + } +} diff --git a/pkg/runners/store.go b/pkg/runners/store.go new file mode 100644 index 0000000000..a8b507dbe8 --- /dev/null +++ b/pkg/runners/store.go @@ -0,0 +1,21 @@ +package runners + +import ( + "github.com/google/uuid" + "github.com/superplanehq/superplane/pkg/runners/models" +) + +type Store interface { + CreateFleet(name, authToken string) (*models.RunnerFleet, error) + ListFleets() ([]models.RunnerFleet, error) + FindFleet(id uuid.UUID) (*models.RunnerFleet, error) + FindFleetByAuthToken(token string) (*models.RunnerFleet, error) + DeleteFleet(id uuid.UUID) error + + FindTask(id uuid.UUID) (*models.RunnerTask, error) + FindTaskByExecutionID(executionID uuid.UUID) (*models.RunnerTask, error) + + EnqueueJob(fleetID, executionID uuid.UUID, spec models.JobSpec) (*models.RunnerTask, error) + ClaimNextQueuedJob(fleetID uuid.UUID) (*models.RunnerTask, error) + CompleteJob(taskID uuid.UUID, req models.FleetCompleteRequest) (*models.RunnerTask, error) +} diff --git a/pkg/runners/store_postgres.go b/pkg/runners/store_postgres.go new file mode 100644 index 0000000000..a5d52c0cdb --- /dev/null +++ b/pkg/runners/store_postgres.go @@ -0,0 +1,217 @@ +package runners + +import ( + "encoding/json" + "errors" + "strings" + "time" + + "github.com/google/uuid" + "github.com/superplanehq/superplane/pkg/database" + "github.com/superplanehq/superplane/pkg/runners/models" + "gorm.io/datatypes" + "gorm.io/gorm" +) + +type postgresStore struct{} + +func NewPostgresStore() Store { + return &postgresStore{} +} + +func (s *postgresStore) db() *gorm.DB { + return database.Conn() +} + +func (s *postgresStore) CreateFleet(name, authToken string) (*models.RunnerFleet, error) { + fleet := &models.RunnerFleet{ + Name: name, + AuthToken: authToken, + } + if err := s.db().Create(fleet).Error; err != nil { + return nil, err + } + return fleet, nil +} + +func (s *postgresStore) ListFleets() ([]models.RunnerFleet, error) { + var fleets []models.RunnerFleet + if err := s.db().Order("created_at ASC").Find(&fleets).Error; err != nil { + return nil, err + } + return fleets, nil +} + +func (s *postgresStore) FindFleet(id uuid.UUID) (*models.RunnerFleet, error) { + var fleet models.RunnerFleet + if err := s.db().First(&fleet, "id = ?", id).Error; err != nil { + return nil, err + } + return &fleet, nil +} + +func (s *postgresStore) FindFleetByAuthToken(token string) (*models.RunnerFleet, error) { + token = strings.TrimSpace(token) + if token == "" { + return nil, gorm.ErrRecordNotFound + } + var fleet models.RunnerFleet + if err := s.db().First(&fleet, "auth_token = ?", token).Error; err != nil { + return nil, err + } + return &fleet, nil +} + +func (s *postgresStore) DeleteFleet(id uuid.UUID) error { + return s.db().Delete(&models.RunnerFleet{}, "id = ?", id).Error +} + +func (s *postgresStore) FindTask(id uuid.UUID) (*models.RunnerTask, error) { + var task models.RunnerTask + if err := s.db().First(&task, "id = ?", id).Error; err != nil { + return nil, err + } + return &task, nil +} + +func (s *postgresStore) FindTaskByExecutionID(executionID uuid.UUID) (*models.RunnerTask, error) { + var task models.RunnerTask + if err := s.db().Where("execution_id = ?", executionID).First(&task).Error; err != nil { + return nil, err + } + return &task, nil +} + +func (s *postgresStore) EnqueueJob(fleetID, executionID uuid.UUID, spec models.JobSpec) (*models.RunnerTask, error) { + id := uuid.New() + task := &models.RunnerTask{ + ID: id, + FleetID: fleetID, + FleetTaskID: id.String(), + ExecutionID: executionID, + Status: models.TaskStatusQueued, + Spec: datatypes.NewJSONType(spec), + } + if err := s.db().Create(task).Error; err != nil { + return nil, err + } + return task, nil +} + +func (s *postgresStore) ClaimNextQueuedJob(fleetID uuid.UUID) (*models.RunnerTask, error) { + var task models.RunnerTask + err := s.db().Transaction(func(tx *gorm.DB) error { + var taskIDStr string + if err := tx.Raw(` + SELECT id::text FROM runner_tasks + WHERE fleet_id = ? AND status = ? + ORDER BY created_at ASC + LIMIT 1 + FOR UPDATE SKIP LOCKED + `, fleetID, models.TaskStatusQueued).Scan(&taskIDStr).Error; err != nil { + return err + } + if strings.TrimSpace(taskIDStr) == "" { + return nil + } + taskID, err := uuid.Parse(taskIDStr) + if err != nil { + return err + } + task.ID = taskID + now := time.Now().UTC() + res := tx.Model(&models.RunnerTask{}). + Where("id = ? AND status = ?", taskID, models.TaskStatusQueued). + Updates(map[string]any{ + "status": models.TaskStatusDispatched, + "dispatched_at": now, + }) + if res.Error != nil { + return res.Error + } + if res.RowsAffected == 0 { + task.ID = uuid.Nil + return nil + } + return tx.First(&task, "id = ?", taskID).Error + }) + if err != nil { + return nil, err + } + if task.ID == uuid.Nil { + return nil, nil + } + return &task, nil +} + +func (s *postgresStore) CompleteJob(taskID uuid.UUID, req models.FleetCompleteRequest) (*models.RunnerTask, error) { + var task models.RunnerTask + err := s.db().Transaction(func(tx *gorm.DB) error { + if err := tx.First(&task, "id = ?", taskID).Error; err != nil { + return err + } + if task.IsTerminal() { + return nil + } + + status := models.TaskStatusFailed + if req.Canceled { + status = models.TaskStatusCanceled + } else if req.ExitCode == 0 && strings.TrimSpace(req.Error) == "" { + status = models.TaskStatusSucceeded + } + + now := time.Now().UTC() + updates := map[string]any{ + "status": status, + "exit_code": req.ExitCode, + "output": req.Output, + "error": req.Error, + "completed_at": now, + } + if len(req.Result) > 0 { + updates["result"] = datatypes.JSON(req.Result) + } + if req.TaskLog != nil { + updates["task_log"] = datatypes.NewJSONType(req.TaskLog) + } + if err := tx.Model(&models.RunnerTask{}).Where("id = ?", taskID).Updates(updates).Error; err != nil { + return err + } + return tx.First(&task, "id = ?", taskID).Error + }) + if err != nil { + return nil, err + } + return &task, nil +} + +// FleetTaskFromRunnerTask builds a FleetTask for the runner component finish path. +func FleetTaskFromRunnerTask(t *models.RunnerTask) *models.FleetTask { + if t == nil { + return nil + } + ft := &models.FleetTask{ + TaskID: t.ID.String(), + Status: t.Status, + Output: t.Output, + Error: t.Error, + } + if t.ExitCode != nil { + ft.ExitCode = t.ExitCode + } + if len(t.Result) > 0 { + ft.Result = json.RawMessage(t.Result) + } + if sink := t.TaskLog.Data(); sink != nil { + ft.TaskLog = TaskLogToFleetLog(sink) + if sink.CloudWatch != nil { + ft.CloudWatchLogGroup = sink.CloudWatch.LogGroupName + ft.CloudWatchLogStream = sink.CloudWatch.LogStreamName + } + } + return ft +} + +// ErrTaskAlreadyTerminal is returned when completing an already-finished task. +var ErrTaskAlreadyTerminal = errors.New("runner task already terminal") diff --git a/pkg/runners/task_log.go b/pkg/runners/task_log.go new file mode 100644 index 0000000000..a4cb5bf17c --- /dev/null +++ b/pkg/runners/task_log.go @@ -0,0 +1,23 @@ +package runners + +import "github.com/superplanehq/superplane/pkg/runners/models" + +// TaskLogToFleetLog converts a stored task log sink to the fleet API shape. +func TaskLogToFleetLog(s *models.TaskLogSink) *models.FleetTaskLog { + if s == nil || s.Type == "" { + return nil + } + ft := &models.FleetTaskLog{Type: s.Type} + if s.CloudWatch != nil { + ft.CloudWatch = &struct { + LogGroupName string `json:"log_group_name"` + LogStreamName string `json:"log_stream_name"` + Region string `json:"region,omitempty"` + }{ + LogGroupName: s.CloudWatch.LogGroupName, + LogStreamName: s.CloudWatch.LogStreamName, + Region: s.CloudWatch.Region, + } + } + return ft +} diff --git a/web_src/src/App.tsx b/web_src/src/App.tsx index ac2aa16e76..53b7b973f6 100644 --- a/web_src/src/App.tsx +++ b/web_src/src/App.tsx @@ -27,6 +27,7 @@ import OrganizationsListAdmin from "./pages/admin/OrganizationsList"; import OrganizationDetailAdmin from "./pages/admin/OrganizationDetail"; import AccountsListAdmin from "./pages/admin/AccountsList"; import InstallationSettingsAdmin from "./pages/admin/InstallationSettings"; +import RunnerFleetsAdmin from "./pages/admin/RunnerFleets"; import ImpersonationBanner from "./components/ImpersonationBanner"; // Create a client @@ -85,6 +86,7 @@ function AppRouter() { } /> } /> } /> + } /> } /> diff --git a/web_src/src/hooks/useRunnerFleets.ts b/web_src/src/hooks/useRunnerFleets.ts new file mode 100644 index 0000000000..68a0e47ef1 --- /dev/null +++ b/web_src/src/hooks/useRunnerFleets.ts @@ -0,0 +1,29 @@ +import { withOrganizationHeader } from "@/lib/withOrganizationHeader"; +import { useQuery } from "@tanstack/react-query"; + +export type RunnerFleetOption = { + id: string; + name: string; +}; + +async function fetchRunnerFleets(organizationId: string): Promise { + const res = await fetch("/api/v1/runner-fleets", { + credentials: "include", + ...withOrganizationHeader({ organizationId }), + }); + if (!res.ok) { + const text = await res.text(); + throw new Error(text.trim() || "Failed to load machine types"); + } + const data = (await res.json()) as RunnerFleetOption[]; + return Array.isArray(data) ? data : []; +} + +export function useRunnerFleets(organizationId: string | undefined, enabled = true) { + return useQuery({ + queryKey: ["runner-fleets", organizationId], + queryFn: () => fetchRunnerFleets(organizationId!), + enabled: enabled && !!organizationId, + staleTime: 60_000, + }); +} diff --git a/web_src/src/pages/admin/AdminLayout.tsx b/web_src/src/pages/admin/AdminLayout.tsx index 388de18adf..9bb7d950e2 100644 --- a/web_src/src/pages/admin/AdminLayout.tsx +++ b/web_src/src/pages/admin/AdminLayout.tsx @@ -1,7 +1,7 @@ import SuperplaneLogo from "@/assets/superplane.svg"; import { Text } from "@/components/Text/text"; import { useAccount } from "@/contexts/useAccount"; -import { ArrowLeft, Building, Network, Shield, Users } from "lucide-react"; +import { ArrowLeft, Building, Network, Shield, Terminal, Users } from "lucide-react"; import React from "react"; import { Link, Navigate, NavLink, Outlet } from "react-router-dom"; @@ -59,6 +59,10 @@ const AdminLayout: React.FC = () => { Settings + + + Runner fleets + diff --git a/web_src/src/pages/admin/RunnerFleets.tsx b/web_src/src/pages/admin/RunnerFleets.tsx new file mode 100644 index 0000000000..0b421a3330 --- /dev/null +++ b/web_src/src/pages/admin/RunnerFleets.tsx @@ -0,0 +1,331 @@ +import { Button } from "@/components/ui/button"; +import { Dialog, DialogActions, DialogDescription, DialogTitle } from "@/components/Dialog/dialog"; +import { Input } from "@/components/Input/input"; +import { Label } from "@/components/ui/label"; +import { Text } from "@/components/Text/text"; +import { Icon } from "@/components/Icon"; +import { showErrorToast, showSuccessToast } from "@/lib/toast"; +import { Copy, Plus, Terminal, Trash2 } from "lucide-react"; +import React, { useCallback, useEffect, useState } from "react"; +import { formatDate } from "./formatDate"; + +type RunnerFleet = { + id: string; + name: string; + created_at?: string; +}; + +type FleetCreatedPayload = RunnerFleet & { + auth_token: string; +}; + +async function readErrorMessage(response: Response, fallback: string): Promise { + const text = await response.text(); + return text.trim() || fallback; +} + +function CopyableValue({ label, value }: { label: string; value: string }) { + const [copied, setCopied] = useState(false); + + const handleCopy = async () => { + try { + await navigator.clipboard.writeText(value); + setCopied(true); + setTimeout(() => setCopied(false), 2000); + } catch { + showErrorToast("Failed to copy"); + } + }; + + return ( +
+ +
+ + {value} + + +
+
+ ); +} + +function FleetCreatedDialog({ + fleet, + open, + onClose, +}: { + fleet: FleetCreatedPayload | null; + open: boolean; + onClose: () => void; +}) { + if (!fleet) return null; + + return ( + + Fleet registered + +

+ {fleet.name} is ready. Copy these values now — the auth token is only shown once. +

+ + +
+

Fleet-manager environment

+

+ SUPERPLANE_URL — your SuperPlane API origin (e.g.{" "} + http://localhost:8000) +

+

+ SUPERPLANE_FLEET_AUTH_TOKEN — the auth token above +

+
+
+ + + +
+ ); +} + +function DeleteFleetDialog({ + fleet, + open, + deleting, + onClose, + onConfirm, +}: { + fleet: RunnerFleet | null; + open: boolean; + deleting: boolean; + onClose: () => void; + onConfirm: () => void; +}) { + return ( + + Delete runner fleet + + {fleet ? ( + <> + Delete {fleet.name}? Queued runner tasks for this fleet will be removed. This cannot be + undone. + + ) : null} + + + + + + + ); +} + +const RunnerFleets: React.FC = () => { + const [fleets, setFleets] = useState([]); + const [loading, setLoading] = useState(true); + const [creating, setCreating] = useState(false); + const [deleting, setDeleting] = useState(false); + const [name, setName] = useState(""); + const [createdFleet, setCreatedFleet] = useState(null); + const [deleteTarget, setDeleteTarget] = useState(null); + + const loadFleets = useCallback(async () => { + setLoading(true); + try { + const res = await fetch("/admin/api/runner/fleets", { credentials: "include" }); + if (!res.ok) { + showErrorToast(await readErrorMessage(res, "Failed to load runner fleets")); + return; + } + const data = (await res.json()) as RunnerFleet[]; + setFleets(Array.isArray(data) ? data : []); + } catch { + showErrorToast("Failed to load runner fleets"); + } finally { + setLoading(false); + } + }, []); + + useEffect(() => { + void loadFleets(); + }, [loadFleets]); + + const handleCreate = async (e: React.FormEvent) => { + e.preventDefault(); + const trimmedName = name.trim(); + if (!trimmedName) { + showErrorToast("Fleet name is required"); + return; + } + + setCreating(true); + try { + const res = await fetch("/admin/api/runner/fleets", { + method: "POST", + credentials: "include", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ name: trimmedName }), + }); + if (!res.ok) { + showErrorToast(await readErrorMessage(res, "Failed to register fleet")); + return; + } + const created = (await res.json()) as FleetCreatedPayload; + setCreatedFleet(created); + setName(""); + showSuccessToast(`Fleet "${created.name}" registered`); + await loadFleets(); + } catch { + showErrorToast("Failed to register fleet"); + } finally { + setCreating(false); + } + }; + + const handleDelete = async () => { + if (!deleteTarget) return; + setDeleting(true); + try { + const res = await fetch(`/admin/api/runner/fleets/${deleteTarget.id}`, { + method: "DELETE", + credentials: "include", + }); + if (!res.ok) { + showErrorToast(await readErrorMessage(res, "Failed to delete fleet")); + return; + } + showSuccessToast(`Fleet "${deleteTarget.name}" deleted`); + setDeleteTarget(null); + await loadFleets(); + } catch { + showErrorToast("Failed to delete fleet"); + } finally { + setDeleting(false); + } + }; + + const copyFleetId = async (id: string) => { + try { + await navigator.clipboard.writeText(id); + showSuccessToast("Fleet ID copied"); + } catch { + showErrorToast("Failed to copy"); + } + }; + + if (loading && fleets.length === 0) { + return ( +
+
+ Loading runner fleets... +
+ ); + } + + return ( +
+ setCreatedFleet(null)} /> + setDeleteTarget(null)} + onConfirm={handleDelete} + /> + +

Runner fleets

+ +
+

+ + Register fleet +

+
+
+ + setName(e.target.value)} + placeholder="e.g. local-dev-fleet" + disabled={creating} + /> +
+ +
+
+ + {fleets.length === 0 ? ( +
+ No runner fleets yet. Register one above. +
+ ) : ( +
+ + + + + + + + + + + {fleets.map((fleet) => ( + + + + + + + ))} + +
NameFleet IDCreatedActions
+ + + {fleet.name} + + + + {fleet.id} + + + + {formatDate(fleet.created_at)} + + +
+
+ )} +
+ ); +}; + +export default RunnerFleets; diff --git a/web_src/src/pages/workflowv2/mappers/runner.tsx b/web_src/src/pages/workflowv2/mappers/runner.tsx index 116864aa11..06bc711d5d 100644 --- a/web_src/src/pages/workflowv2/mappers/runner.tsx +++ b/web_src/src/pages/workflowv2/mappers/runner.tsx @@ -46,6 +46,10 @@ export function runnerConfigurationDetails(configuration: unknown): Record; + const fleetID = typeof c.fleet_id === "string" ? c.fleet_id.trim() : ""; + if (fleetID) { + details["Machine type"] = fleetID; + } const rawMode = typeof c.execution_mode === "string" ? c.execution_mode.trim().toLowerCase() : ""; if (rawMode === EXECUTION_MODE_DOCKER) { details["Execution mode"] = "Docker"; diff --git a/web_src/src/ui/CanvasPage/RunnerLiveLogDialog/LiveLogStreamView.tsx b/web_src/src/ui/CanvasPage/RunnerLiveLogDialog/LiveLogStreamView.tsx index 7fc59e06d7..ecbc4facb7 100644 --- a/web_src/src/ui/CanvasPage/RunnerLiveLogDialog/LiveLogStreamView.tsx +++ b/web_src/src/ui/CanvasPage/RunnerLiveLogDialog/LiveLogStreamView.tsx @@ -9,7 +9,7 @@ export function LiveLogStreamView({ executionId }: { executionId: string }) { return (
- {error ? : null} + {error ? : null} {!error && !hasAnyLogs ? : null} {sections.map((section) => ( @@ -23,10 +23,11 @@ function NoLogsMessage() { return
No log lines yet.
; } -function ErrorMessage() { +function ErrorMessage({ message }: { message: string }) { + const fallback = "Something went wrong while fetching logs. Please try again later."; return ( -
- Something went wrong while fetching logs. Please try again later. +
+ {message.trim() || fallback}
); } diff --git a/web_src/src/ui/configurationFieldRenderer/RunnerFleetFieldRenderer.tsx b/web_src/src/ui/configurationFieldRenderer/RunnerFleetFieldRenderer.tsx new file mode 100644 index 0000000000..2db092a7cb --- /dev/null +++ b/web_src/src/ui/configurationFieldRenderer/RunnerFleetFieldRenderer.tsx @@ -0,0 +1,60 @@ +import { Select, SelectContent, SelectItem, SelectTrigger, SelectValue } from "@/components/ui/select"; +import { useRunnerFleets } from "@/hooks/useRunnerFleets"; +import type { ConfigurationField } from "../../api-client"; + +interface RunnerFleetFieldRendererProps { + field: ConfigurationField; + value: string; + onChange: (value: string | undefined) => void; + organizationId?: string; +} + +export function RunnerFleetFieldRenderer({ value, onChange, organizationId }: RunnerFleetFieldRendererProps) { + const { data: fleets, isLoading, error } = useRunnerFleets(organizationId); + + if (!organizationId?.trim()) { + return
Machine type requires an organization context
; + } + + if (error) { + return ( +
+ Failed to load machine types: {error instanceof Error ? error.message : "Unknown error"} +
+ ); + } + + if (isLoading) { + return
Loading machine types...
; + } + + if (!fleets?.length) { + return ( +
+ +

+ Ask your installation administrator to register a runner fleet. +

+
+ ); + } + + return ( + + ); +} diff --git a/web_src/src/ui/configurationFieldRenderer/index.tsx b/web_src/src/ui/configurationFieldRenderer/index.tsx index ec132b29ae..00e68e69d7 100644 --- a/web_src/src/ui/configurationFieldRenderer/index.tsx +++ b/web_src/src/ui/configurationFieldRenderer/index.tsx @@ -22,6 +22,7 @@ import { CronFieldRenderer } from "./CronFieldRenderer"; import { UserFieldRenderer } from "./UserFieldRenderer"; import { RoleFieldRenderer } from "./RoleFieldRenderer"; import { GroupFieldRenderer } from "./GroupFieldRenderer"; +import { RunnerFleetFieldRenderer } from "./RunnerFleetFieldRenderer"; import { GitRefFieldRenderer } from "./GitRefFieldRenderer"; import { TimezoneFieldRenderer } from "./TimezoneFieldRenderer"; import { SecretKeyFieldRenderer, type SecretKeyRefValue } from "./SecretKeyFieldRenderer"; @@ -346,6 +347,16 @@ export const ConfigurationFieldRenderer = ({ case "git-ref": return ; + case "runner-fleet": + return ( + + ); + case "user": if (!domainId) { return
User field requires domainId prop
;