diff --git a/cmd/execute.go b/cmd/execute.go index 981e532..22f63c3 100644 --- a/cmd/execute.go +++ b/cmd/execute.go @@ -2,6 +2,7 @@ package cmd import ( "context" + "os" "time" "github.com/hdresearch/vers-cli/internal/handlers" @@ -10,13 +11,22 @@ import ( ) var executeTimeout int +var executeSSH bool +var executeWorkDir string // executeCmd represents the execute command var executeCmd = &cobra.Command{ - Use: "execute [vm-id|alias] [args...]", - Short: "Run a command on a specific VM", - Long: `Execute a command within the Vers environment on the specified VM. -If no VM is specified, the current HEAD VM is used.`, + Use: "exec [vm-id|alias] [args...]", + Aliases: []string{"execute"}, + Short: "Run a command on a specific VM", + Long: `Execute a command on the specified VM via the orchestrator API. + +The command runs through the in-VM agent, which means it automatically +inherits environment variables configured for your account. + +If no VM is specified, the current HEAD VM is used. + +Use --ssh to bypass the API and connect directly via SSH (legacy behavior).`, Args: cobra.MinimumNArgs(1), RunE: func(cmd *cobra.Command, args []string) error { // Use custom timeout if specified, otherwise use default APIMedium @@ -44,17 +54,35 @@ If no VM is specified, the current HEAD VM is used.`, command = args[1:] } - view, err := handlers.HandleExecute(apiCtx, application, handlers.ExecuteReq{Target: target, Command: command}) + var timeoutSec uint64 + if executeTimeout > 0 { + timeoutSec = uint64(executeTimeout) + } + + view, err := handlers.HandleExecute(apiCtx, application, handlers.ExecuteReq{ + Target: target, + Command: command, + WorkingDir: executeWorkDir, + TimeoutSec: timeoutSec, + UseSSH: executeSSH, + }) if err != nil { return err } pres.RenderExecute(application, view) + + // Exit with the command's exit code + if view.ExitCode != 0 { + os.Exit(view.ExitCode) + } return nil }, } func init() { rootCmd.AddCommand(executeCmd) - executeCmd.Flags().String("host", "", "Specify the host IP to connect to (overrides default)") + executeCmd.Flags().SetInterspersed(false) // stop flag parsing after first positional arg executeCmd.Flags().IntVarP(&executeTimeout, "timeout", "t", 0, "Timeout in seconds (default: 30s, use 0 for no limit)") + executeCmd.Flags().BoolVar(&executeSSH, "ssh", false, "Use direct SSH instead of the VERS API") + executeCmd.Flags().StringVarP(&executeWorkDir, "workdir", "w", "", "Working directory for the command") } diff --git a/internal/handlers/execute.go b/internal/handlers/execute.go index 2c2aaf2..2e9a684 100644 --- a/internal/handlers/execute.go +++ b/internal/handlers/execute.go @@ -1,8 +1,12 @@ package handlers import ( + "bufio" "context" + "encoding/base64" + "encoding/json" "fmt" + "io" "github.com/hdresearch/vers-cli/internal/app" "github.com/hdresearch/vers-cli/internal/presenters" @@ -13,8 +17,28 @@ import ( ) type ExecuteReq struct { - Target string - Command []string + Target string + Command []string + WorkingDir string + Env map[string]string + TimeoutSec uint64 + UseSSH bool +} + +// streamResponse represents a single NDJSON line from the exec stream. +// The orchestrator flattens the agent protocol into: +// +// {"type":"chunk","stream":"stdout","data_b64":"...","cursor":N,"exec_id":"..."} +// {"type":"exit","exit_code":0,"cursor":N,"exec_id":"..."} +// {"type":"error","code":"...","message":"..."} +type streamResponse struct { + Type string `json:"type"` + Stream string `json:"stream,omitempty"` + DataB64 string `json:"data_b64,omitempty"` + ExitCode *int `json:"exit_code,omitempty"` + Cursor uint64 `json:"cursor,omitempty"` + Code string `json:"code,omitempty"` + Message string `json:"message,omitempty"` } func HandleExecute(ctx context.Context, a *app.App, r ExecuteReq) (presenters.ExecuteView, error) { @@ -27,21 +51,106 @@ func HandleExecute(ctx context.Context, a *app.App, r ExecuteReq) (presenters.Ex v.UsedHEAD = t.UsedHEAD v.HeadID = t.HeadID + if r.UseSSH { + return handleExecuteSSH(ctx, a, r, t, v) + } + + return handleExecuteAPI(ctx, a, r, t, v) +} + +// handleExecuteAPI runs the command via the orchestrator exec/stream API. +func handleExecuteAPI(ctx context.Context, a *app.App, r ExecuteReq, t utils.TargetResult, v presenters.ExecuteView) (presenters.ExecuteView, error) { + // Wrap the command in bash -c so shell features work + command := []string{"bash", "-c", utils.ShellJoin(r.Command)} + + body, err := vmSvc.ExecStream(ctx, t.Ident, vmSvc.ExecRequest{ + Command: command, + Env: r.Env, + WorkingDir: r.WorkingDir, + TimeoutSec: r.TimeoutSec, + }) + if err != nil { + return v, fmt.Errorf("exec: %w", err) + } + defer body.Close() + + exitCode, err := streamExecOutput(body, a.IO.Out, a.IO.Err) + if err != nil { + return v, fmt.Errorf("exec stream: %w", err) + } + + v.ExitCode = exitCode + return v, nil +} + +// handleExecuteSSH runs the command via direct SSH (legacy fallback). +func handleExecuteSSH(ctx context.Context, a *app.App, r ExecuteReq, t utils.TargetResult, v presenters.ExecuteView) (presenters.ExecuteView, error) { info, err := vmSvc.GetConnectInfo(ctx, a.Client, t.Ident) if err != nil { return v, fmt.Errorf("failed to get VM information: %w", err) } - sshHost := info.Host cmdStr := utils.ShellJoin(r.Command) - - client := sshutil.NewClient(sshHost, info.KeyPath, info.VMDomain) + client := sshutil.NewClient(info.Host, info.KeyPath, info.VMDomain) err = client.Execute(ctx, cmdStr, a.IO.Out, a.IO.Err) if err != nil { if exitErr, ok := err.(*ssh.ExitError); ok { - return v, fmt.Errorf("command exited with code %d", exitErr.ExitStatus()) + v.ExitCode = exitErr.ExitStatus() + return v, nil } return v, fmt.Errorf("failed to execute command: %w", err) } return v, nil } + +// streamExecOutput reads NDJSON from the exec stream, writes stdout/stderr +// to the provided writers, and returns the exit code. +func streamExecOutput(body io.Reader, stdout, stderr io.Writer) (int, error) { + scanner := bufio.NewScanner(body) + // Allow large lines (agent can send up to 10MB of output) + scanner.Buffer(make([]byte, 0, 64*1024), 16*1024*1024) + + exitCode := 0 + + for scanner.Scan() { + line := scanner.Bytes() + if len(line) == 0 { + continue + } + + var resp streamResponse + if err := json.Unmarshal(line, &resp); err != nil { + // Skip unparseable lines + continue + } + + switch resp.Type { + case "chunk": + data, err := base64.StdEncoding.DecodeString(resp.DataB64) + if err != nil { + continue + } + switch resp.Stream { + case "stdout": + stdout.Write(data) + case "stderr": + stderr.Write(data) + } + + case "exit": + if resp.ExitCode != nil { + exitCode = *resp.ExitCode + } + return exitCode, nil + + case "error": + return 1, fmt.Errorf("exec error [%s]: %s", resp.Code, resp.Message) + } + } + + if err := scanner.Err(); err != nil { + return 1, fmt.Errorf("stream read error: %w", err) + } + + return exitCode, nil +} diff --git a/internal/presenters/execute_types.go b/internal/presenters/execute_types.go index 335cfca..87e5376 100644 --- a/internal/presenters/execute_types.go +++ b/internal/presenters/execute_types.go @@ -3,4 +3,5 @@ package presenters type ExecuteView struct { UsedHEAD bool HeadID string + ExitCode int } diff --git a/internal/services/vm/exec.go b/internal/services/vm/exec.go new file mode 100644 index 0000000..c1122db --- /dev/null +++ b/internal/services/vm/exec.go @@ -0,0 +1,122 @@ +package vm + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + + "github.com/hdresearch/vers-cli/internal/auth" +) + +// ExecRequest matches the orchestrator's VmExecRequest. +type ExecRequest struct { + Command []string `json:"command"` + Env map[string]string `json:"env,omitempty"` + WorkingDir string `json:"working_dir,omitempty"` + Stdin string `json:"stdin,omitempty"` + TimeoutSec uint64 `json:"timeout_secs,omitempty"` +} + +// ExecResponse matches the orchestrator's VmExecResponse. +type ExecResponse struct { + ExitCode int `json:"exit_code"` + Stdout string `json:"stdout"` + Stderr string `json:"stderr"` +} + +// ExecStreamChunk is a single line from the NDJSON exec stream. +type ExecStreamChunk struct { + Type string `json:"type"` // "chunk" or "exit" + Stream string `json:"stream,omitempty"` // "stdout" or "stderr" + Data string `json:"data,omitempty"` // base64-encoded bytes + ExitCode *int `json:"exit_code,omitempty"` // only on type=="exit" +} + +// Exec runs a command on a VM via the orchestrator API (non-streaming). +func Exec(ctx context.Context, vmID string, req ExecRequest) (*ExecResponse, error) { + apiKey, err := auth.GetAPIKey() + if err != nil { + return nil, fmt.Errorf("failed to get API key: %w", err) + } + + baseURL, err := auth.GetVersUrl() + if err != nil { + return nil, fmt.Errorf("failed to get API URL: %w", err) + } + + body, err := json.Marshal(req) + if err != nil { + return nil, fmt.Errorf("failed to marshal request: %w", err) + } + + url := fmt.Sprintf("%s/api/v1/vm/%s/exec", baseURL.String(), vmID) + httpReq, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(body)) + if err != nil { + return nil, fmt.Errorf("failed to create request: %w", err) + } + + httpReq.Header.Set("Authorization", "Bearer "+apiKey) + httpReq.Header.Set("Content-Type", "application/json") + + resp, err := http.DefaultClient.Do(httpReq) + if err != nil { + return nil, fmt.Errorf("request failed: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + errBody, _ := io.ReadAll(resp.Body) + return nil, fmt.Errorf("API error %d: %s", resp.StatusCode, string(errBody)) + } + + var result ExecResponse + if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { + return nil, fmt.Errorf("failed to parse response: %w", err) + } + + return &result, nil +} + +// ExecStream runs a command on a VM via the orchestrator streaming API. +// It returns the response body for the caller to consume as NDJSON. +func ExecStream(ctx context.Context, vmID string, req ExecRequest) (io.ReadCloser, error) { + apiKey, err := auth.GetAPIKey() + if err != nil { + return nil, fmt.Errorf("failed to get API key: %w", err) + } + + baseURL, err := auth.GetVersUrl() + if err != nil { + return nil, fmt.Errorf("failed to get API URL: %w", err) + } + + body, err := json.Marshal(req) + if err != nil { + return nil, fmt.Errorf("failed to marshal request: %w", err) + } + + url := fmt.Sprintf("%s/api/v1/vm/%s/exec/stream", baseURL.String(), vmID) + httpReq, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(body)) + if err != nil { + return nil, fmt.Errorf("failed to create request: %w", err) + } + + httpReq.Header.Set("Authorization", "Bearer "+apiKey) + httpReq.Header.Set("Content-Type", "application/json") + + resp, err := http.DefaultClient.Do(httpReq) + if err != nil { + return nil, fmt.Errorf("request failed: %w", err) + } + + if resp.StatusCode != http.StatusOK { + defer resp.Body.Close() + errBody, _ := io.ReadAll(resp.Body) + return nil, fmt.Errorf("API error %d: %s", resp.StatusCode, string(errBody)) + } + + return resp.Body, nil +}