Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 34 additions & 6 deletions cmd/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cmd

import (
"context"
"os"
"time"

"github.com/hdresearch/vers-cli/internal/handlers"
Expand All @@ -10,13 +11,22 @@ import (
)

var executeTimeout int
var executeSSH bool
var executeWorkDir string

// executeCmd represents the execute command
var executeCmd = &cobra.Command{
Use: "execute [vm-id|alias] <command> [args...]",
Short: "Run a command on a specific VM",
Long: `Execute a command within the Vers environment on the specified VM.
If no VM is specified, the current HEAD VM is used.`,
Use: "exec [vm-id|alias] <command> [args...]",
Aliases: []string{"execute"},
Short: "Run a command on a specific VM",
Long: `Execute a command on the specified VM via the orchestrator API.

The command runs through the in-VM agent, which means it automatically
inherits environment variables configured for your account.

If no VM is specified, the current HEAD VM is used.

Use --ssh to bypass the API and connect directly via SSH (legacy behavior).`,
Args: cobra.MinimumNArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
// Use custom timeout if specified, otherwise use default APIMedium
Expand Down Expand Up @@ -44,17 +54,35 @@ If no VM is specified, the current HEAD VM is used.`,
command = args[1:]
}

view, err := handlers.HandleExecute(apiCtx, application, handlers.ExecuteReq{Target: target, Command: command})
var timeoutSec uint64
if executeTimeout > 0 {
timeoutSec = uint64(executeTimeout)
}

view, err := handlers.HandleExecute(apiCtx, application, handlers.ExecuteReq{
Target: target,
Command: command,
WorkingDir: executeWorkDir,
TimeoutSec: timeoutSec,
UseSSH: executeSSH,
})
if err != nil {
return err
}
pres.RenderExecute(application, view)

// Exit with the command's exit code
if view.ExitCode != 0 {
os.Exit(view.ExitCode)
}
return nil
},
}

func init() {
rootCmd.AddCommand(executeCmd)
executeCmd.Flags().String("host", "", "Specify the host IP to connect to (overrides default)")
executeCmd.Flags().SetInterspersed(false) // stop flag parsing after first positional arg
executeCmd.Flags().IntVarP(&executeTimeout, "timeout", "t", 0, "Timeout in seconds (default: 30s, use 0 for no limit)")
executeCmd.Flags().BoolVar(&executeSSH, "ssh", false, "Use direct SSH instead of the VERS API")
executeCmd.Flags().StringVarP(&executeWorkDir, "workdir", "w", "", "Working directory for the command")
}
121 changes: 115 additions & 6 deletions internal/handlers/execute.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
package handlers

import (
"bufio"
"context"
"encoding/base64"
"encoding/json"
"fmt"
"io"

"github.com/hdresearch/vers-cli/internal/app"
"github.com/hdresearch/vers-cli/internal/presenters"
Expand All @@ -13,8 +17,28 @@ import (
)

type ExecuteReq struct {
Target string
Command []string
Target string
Command []string
WorkingDir string
Env map[string]string
TimeoutSec uint64
UseSSH bool
}

// streamResponse represents a single NDJSON line from the exec stream.
// The orchestrator flattens the agent protocol into:
//
// {"type":"chunk","stream":"stdout","data_b64":"...","cursor":N,"exec_id":"..."}
// {"type":"exit","exit_code":0,"cursor":N,"exec_id":"..."}
// {"type":"error","code":"...","message":"..."}
type streamResponse struct {
Type string `json:"type"`
Stream string `json:"stream,omitempty"`
DataB64 string `json:"data_b64,omitempty"`
ExitCode *int `json:"exit_code,omitempty"`
Cursor uint64 `json:"cursor,omitempty"`
Code string `json:"code,omitempty"`
Message string `json:"message,omitempty"`
}

func HandleExecute(ctx context.Context, a *app.App, r ExecuteReq) (presenters.ExecuteView, error) {
Expand All @@ -27,21 +51,106 @@ func HandleExecute(ctx context.Context, a *app.App, r ExecuteReq) (presenters.Ex
v.UsedHEAD = t.UsedHEAD
v.HeadID = t.HeadID

if r.UseSSH {
return handleExecuteSSH(ctx, a, r, t, v)
}

return handleExecuteAPI(ctx, a, r, t, v)
}

// handleExecuteAPI runs the command via the orchestrator exec/stream API.
func handleExecuteAPI(ctx context.Context, a *app.App, r ExecuteReq, t utils.TargetResult, v presenters.ExecuteView) (presenters.ExecuteView, error) {
// Wrap the command in bash -c so shell features work
command := []string{"bash", "-c", utils.ShellJoin(r.Command)}

body, err := vmSvc.ExecStream(ctx, t.Ident, vmSvc.ExecRequest{
Command: command,
Env: r.Env,
WorkingDir: r.WorkingDir,
TimeoutSec: r.TimeoutSec,
})
if err != nil {
return v, fmt.Errorf("exec: %w", err)
}
defer body.Close()

exitCode, err := streamExecOutput(body, a.IO.Out, a.IO.Err)
if err != nil {
return v, fmt.Errorf("exec stream: %w", err)
}

v.ExitCode = exitCode
return v, nil
}

// handleExecuteSSH runs the command via direct SSH (legacy fallback).
func handleExecuteSSH(ctx context.Context, a *app.App, r ExecuteReq, t utils.TargetResult, v presenters.ExecuteView) (presenters.ExecuteView, error) {
info, err := vmSvc.GetConnectInfo(ctx, a.Client, t.Ident)
if err != nil {
return v, fmt.Errorf("failed to get VM information: %w", err)
}

sshHost := info.Host
cmdStr := utils.ShellJoin(r.Command)

client := sshutil.NewClient(sshHost, info.KeyPath, info.VMDomain)
client := sshutil.NewClient(info.Host, info.KeyPath, info.VMDomain)
err = client.Execute(ctx, cmdStr, a.IO.Out, a.IO.Err)
if err != nil {
if exitErr, ok := err.(*ssh.ExitError); ok {
return v, fmt.Errorf("command exited with code %d", exitErr.ExitStatus())
v.ExitCode = exitErr.ExitStatus()
return v, nil
}
return v, fmt.Errorf("failed to execute command: %w", err)
}
return v, nil
}

// streamExecOutput reads NDJSON from the exec stream, writes stdout/stderr
// to the provided writers, and returns the exit code.
func streamExecOutput(body io.Reader, stdout, stderr io.Writer) (int, error) {
scanner := bufio.NewScanner(body)
// Allow large lines (agent can send up to 10MB of output)
scanner.Buffer(make([]byte, 0, 64*1024), 16*1024*1024)

exitCode := 0

for scanner.Scan() {
line := scanner.Bytes()
if len(line) == 0 {
continue
}

var resp streamResponse
if err := json.Unmarshal(line, &resp); err != nil {
// Skip unparseable lines
continue
}

switch resp.Type {
case "chunk":
data, err := base64.StdEncoding.DecodeString(resp.DataB64)
if err != nil {
continue
}
switch resp.Stream {
case "stdout":
stdout.Write(data)
case "stderr":
stderr.Write(data)
}

case "exit":
if resp.ExitCode != nil {
exitCode = *resp.ExitCode
}
return exitCode, nil

case "error":
return 1, fmt.Errorf("exec error [%s]: %s", resp.Code, resp.Message)
}
}

if err := scanner.Err(); err != nil {
return 1, fmt.Errorf("stream read error: %w", err)
}

return exitCode, nil
}
1 change: 1 addition & 0 deletions internal/presenters/execute_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ package presenters
type ExecuteView struct {
UsedHEAD bool
HeadID string
ExitCode int
}
122 changes: 122 additions & 0 deletions internal/services/vm/exec.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package vm

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"

"github.com/hdresearch/vers-cli/internal/auth"
)

// ExecRequest matches the orchestrator's VmExecRequest.
type ExecRequest struct {
Command []string `json:"command"`
Env map[string]string `json:"env,omitempty"`
WorkingDir string `json:"working_dir,omitempty"`
Stdin string `json:"stdin,omitempty"`
TimeoutSec uint64 `json:"timeout_secs,omitempty"`
}

// ExecResponse matches the orchestrator's VmExecResponse.
type ExecResponse struct {
ExitCode int `json:"exit_code"`
Stdout string `json:"stdout"`
Stderr string `json:"stderr"`
}

// ExecStreamChunk is a single line from the NDJSON exec stream.
type ExecStreamChunk struct {
Type string `json:"type"` // "chunk" or "exit"
Stream string `json:"stream,omitempty"` // "stdout" or "stderr"
Data string `json:"data,omitempty"` // base64-encoded bytes
ExitCode *int `json:"exit_code,omitempty"` // only on type=="exit"
}

// Exec runs a command on a VM via the orchestrator API (non-streaming).
func Exec(ctx context.Context, vmID string, req ExecRequest) (*ExecResponse, error) {
apiKey, err := auth.GetAPIKey()
if err != nil {
return nil, fmt.Errorf("failed to get API key: %w", err)
}

baseURL, err := auth.GetVersUrl()
if err != nil {
return nil, fmt.Errorf("failed to get API URL: %w", err)
}

body, err := json.Marshal(req)
if err != nil {
return nil, fmt.Errorf("failed to marshal request: %w", err)
}

url := fmt.Sprintf("%s/api/v1/vm/%s/exec", baseURL.String(), vmID)
httpReq, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(body))
if err != nil {
return nil, fmt.Errorf("failed to create request: %w", err)
}

httpReq.Header.Set("Authorization", "Bearer "+apiKey)
httpReq.Header.Set("Content-Type", "application/json")

resp, err := http.DefaultClient.Do(httpReq)
if err != nil {
return nil, fmt.Errorf("request failed: %w", err)
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
errBody, _ := io.ReadAll(resp.Body)
return nil, fmt.Errorf("API error %d: %s", resp.StatusCode, string(errBody))
}

var result ExecResponse
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return nil, fmt.Errorf("failed to parse response: %w", err)
}

return &result, nil
}

// ExecStream runs a command on a VM via the orchestrator streaming API.
// It returns the response body for the caller to consume as NDJSON.
func ExecStream(ctx context.Context, vmID string, req ExecRequest) (io.ReadCloser, error) {
apiKey, err := auth.GetAPIKey()
if err != nil {
return nil, fmt.Errorf("failed to get API key: %w", err)
}

baseURL, err := auth.GetVersUrl()
if err != nil {
return nil, fmt.Errorf("failed to get API URL: %w", err)
}

body, err := json.Marshal(req)
if err != nil {
return nil, fmt.Errorf("failed to marshal request: %w", err)
}

url := fmt.Sprintf("%s/api/v1/vm/%s/exec/stream", baseURL.String(), vmID)
httpReq, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(body))
if err != nil {
return nil, fmt.Errorf("failed to create request: %w", err)
}

httpReq.Header.Set("Authorization", "Bearer "+apiKey)
httpReq.Header.Set("Content-Type", "application/json")

resp, err := http.DefaultClient.Do(httpReq)
if err != nil {
return nil, fmt.Errorf("request failed: %w", err)
}

if resp.StatusCode != http.StatusOK {
defer resp.Body.Close()
errBody, _ := io.ReadAll(resp.Body)
return nil, fmt.Errorf("API error %d: %s", resp.StatusCode, string(errBody))
}

return resp.Body, nil
}
Loading