diff --git a/.github/workflows/execd-test.yml b/.github/workflows/execd-test.yml index 16d9b8c62..5357883cb 100644 --- a/.github/workflows/execd-test.yml +++ b/.github/workflows/execd-test.yml @@ -101,6 +101,7 @@ jobs: sleep 5 python3 tests/smoke_api.py + python3 tests/command_resume_test.py - name: Show logs if: always() run: | diff --git a/components/execd/README.md b/components/execd/README.md index f929ecd67..ae6a219e4 100644 --- a/components/execd/README.md +++ b/components/execd/README.md @@ -54,6 +54,7 @@ English | [中文](README_zh.md) - Proper signal forwarding with process groups - Real-time stdout/stderr streaming - Context-aware interruption +- **SSE resume (foreground `POST /command` and `POST /code`)**: stdout/stderr are copied to a bounded ring buffer while the primary SSE is active. If the client disconnects, **`GET /command/{id}/resume?after_eid=`** replays buffered events (`eid > after_eid`) and may attach as the sole live consumer if the command is still running. An active primary stream returns **`409 Conflict`**. See `specs/execd-api.yaml`. ### Filesystem diff --git a/components/execd/README_zh.md b/components/execd/README_zh.md index 7aa70191f..8047f19c2 100644 --- a/components/execd/README_zh.md +++ b/components/execd/README_zh.md @@ -52,6 +52,7 @@ - 通过进程组管理正确转发信号 - 实时 stdout/stderr 流式输出 - 支持上下文感知的中断 +- **SSE 断线续传(前台 `POST /command` / `POST /code`)**:主 SSE 存活期间 stdout/stderr 会写入有界环形缓冲;客户端断开后可用 **`GET /command/{id}/resume?after_eid=`** 按 `eid` 重放并可在命令仍运行时独占续传;主连接仍占用时返回 **`409 Conflict`**。OpenAPI 见 `specs/execd-api.yaml`。 ### 文件系统 diff --git a/components/execd/RELEASE_NOTES.md b/components/execd/RELEASE_NOTES.md index 1b89a2d58..4bae9f307 100644 --- a/components/execd/RELEASE_NOTES.md +++ b/components/execd/RELEASE_NOTES.md @@ -35,8 +35,8 @@ Thanks to these contributors ❤️ - @csdbianhua --- -- Docker Hub: opensandbox/execd:v1.0.9 -- Aliyun Registry: sandbox-registry.cn-zhangjiakou.cr.aliyuncs.com/opensandbox/execd:v1.0.9 +- Docker Hub: opensandbox/execd:v1.0.8 +- Aliyun Registry: sandbox-registry.cn-zhangjiakou.cr.aliyuncs.com/opensandbox/execd:v1.0.8 # components/execd 1.0.7 @@ -70,8 +70,8 @@ Thanks to these contributors ❤️ - @dependabot --- -- Docker Hub: opensandbox/execd:v1.0.9 -- Aliyun Registry: sandbox-registry.cn-zhangjiakou.cr.aliyuncs.com/opensandbox/execd:v1.0.9 +- Docker Hub: opensandbox/execd:v1.0.7 +- Aliyun Registry: sandbox-registry.cn-zhangjiakou.cr.aliyuncs.com/opensandbox/execd:v1.0.7 # components/execd 1.0.6 @@ -93,8 +93,8 @@ Thanks to these contributors ❤️ - @dependabot --- -- Docker Hub: opensandbox/execd:v1.0.9 -- Aliyun Registry: sandbox-registry.cn-zhangjiakou.cr.aliyuncs.com/opensandbox/execd:v1.0.9 +- Docker Hub: opensandbox/execd:v1.0.6 +- Aliyun Registry: sandbox-registry.cn-zhangjiakou.cr.aliyuncs.com/opensandbox/execd:v1.0.6 # components/execd 1.0.5 @@ -111,8 +111,8 @@ Thanks to these contributors ❤️ - @Pangjiping --- -- Docker Hub: opensandbox/execd:v1.0.9 -- Aliyun Registry: sandbox-registry.cn-zhangjiakou.cr.aliyuncs.com/opensandbox/execd:v1.0.9 +- Docker Hub: opensandbox/execd:v1.0.5 +- Aliyun Registry: sandbox-registry.cn-zhangjiakou.cr.aliyuncs.com/opensandbox/execd:v1.0.5 # components/execd 1.0.4 @@ -135,8 +135,8 @@ Thanks to these contributors ❤️ - @ninan-nn --- -- Docker Hub: opensandbox/execd:v1.0.9 -- Aliyun Registry: sandbox-registry.cn-zhangjiakou.cr.aliyuncs.com/opensandbox/execd:v1.0.9 +- Docker Hub: opensandbox/execd:v1.0.4 +- Aliyun Registry: sandbox-registry.cn-zhangjiakou.cr.aliyuncs.com/opensandbox/execd:v1.0.4 # components/execd 1.0.3 @@ -159,8 +159,8 @@ Thanks to these contributors ❤️ - @jwx0925 --- -- Docker Hub: opensandbox/execd:v1.0.9 -- Aliyun Registry: sandbox-registry.cn-zhangjiakou.cr.aliyuncs.com/opensandbox/execd:v1.0.9 +- Docker Hub: opensandbox/execd:v1.0.3 +- Aliyun Registry: sandbox-registry.cn-zhangjiakou.cr.aliyuncs.com/opensandbox/execd:v1.0.3 # components/execd 1.0.2 @@ -188,8 +188,8 @@ Thanks to these contributors ❤️ - @ninan-nn --- -- Docker Hub: opensandbox/execd:v1.0.9 -- Aliyun Registry: sandbox-registry.cn-zhangjiakou.cr.aliyuncs.com/opensandbox/execd:v1.0.9 +- Docker Hub: opensandbox/execd:v1.0.2 +- Aliyun Registry: sandbox-registry.cn-zhangjiakou.cr.aliyuncs.com/opensandbox/execd:v1.0.2 # components/execd 1.0.1 @@ -221,8 +221,8 @@ Thanks to these contributors ❤️ - @jwx0925 --- -- Docker Hub: opensandbox/execd:v1.0.9 -- Aliyun Registry: sandbox-registry.cn-zhangjiakou.cr.aliyuncs.com/opensandbox/execd:v1.0.9 +- Docker Hub: opensandbox/execd:v1.0.1 +- Aliyun Registry: sandbox-registry.cn-zhangjiakou.cr.aliyuncs.com/opensandbox/execd:v1.0.1 # components/execd 1.0.0 diff --git a/components/execd/pkg/runtime/bash_session.go b/components/execd/pkg/runtime/bash_session.go index 5e5d01d3c..dd5f77e3b 100644 --- a/components/execd/pkg/runtime/bash_session.go +++ b/components/execd/pkg/runtime/bash_session.go @@ -236,7 +236,7 @@ func (s *bashSession) run(ctx context.Context, request *ExecuteCodeRequest) erro continue } if request.Hooks.OnExecuteStdout != nil { - request.Hooks.OnExecuteStdout(line) + request.Hooks.OnExecuteStdout(request.nextStdoutStderrEventID(), line) } } } diff --git a/components/execd/pkg/runtime/bash_session_test.go b/components/execd/pkg/runtime/bash_session_test.go index b18af3de0..853baad92 100644 --- a/components/execd/pkg/runtime/bash_session_test.go +++ b/components/execd/pkg/runtime/bash_session_test.go @@ -55,7 +55,7 @@ func TestBashSession_NonZeroExitEmitsError(t *testing.T) { Timeout: 5 * time.Second, Hooks: ExecuteResultHook{ OnExecuteInit: func(s string) { sessionID = s }, - OnExecuteStdout: func(s string) { stdoutLine = s }, + OnExecuteStdout: func(_ int64, s string) { stdoutLine = s }, OnExecuteError: func(err *execute.ErrorOutput) { errCh <- err }, OnExecuteComplete: func(_ time.Duration) { completeCh <- struct{}{} @@ -104,7 +104,7 @@ func TestBashSession_envAndExitCode(t *testing.T) { require.Equal(t, session.config.Session, ctx, "unexpected session in OnExecuteInit") initCalls++ }, - OnExecuteStdout: func(text string) { + OnExecuteStdout: func(_ int64, text string) { t.Log(text) stdoutLines = append(stdoutLines, text) }, @@ -177,7 +177,7 @@ func TestBashSession_envLargeOutputChained(t *testing.T) { require.Equal(t, session.config.Session, ctx, "unexpected session in OnExecuteInit") initCalls++ }, - OnExecuteStdout: func(text string) { + OnExecuteStdout: func(_ int64, text string) { t.Log(text) stdoutLines = append(stdoutLines, text) }, @@ -222,7 +222,7 @@ func TestBashSession_cwdPersistsWithoutOverride(t *testing.T) { targetDir := t.TempDir() var stdoutLines []string hooks := ExecuteResultHook{ - OnExecuteStdout: func(line string) { + OnExecuteStdout: func(_ int64, line string) { stdoutLines = append(stdoutLines, line) }, } @@ -264,7 +264,7 @@ func TestBashSession_requestCwdOverridesAfterCd(t *testing.T) { var stdoutLines []string hooks := ExecuteResultHook{ - OnExecuteStdout: func(line string) { + OnExecuteStdout: func(_ int64, line string) { stdoutLines = append(stdoutLines, line) }, } @@ -306,7 +306,7 @@ func TestBashSession_envDumpNotLeakedWhenNoTrailingNewline(t *testing.T) { var stdoutLines []string hooks := ExecuteResultHook{ - OnExecuteStdout: func(line string) { + OnExecuteStdout: func(_ int64, line string) { stdoutLines = append(stdoutLines, line) }, } @@ -334,7 +334,7 @@ func TestBashSession_envDumpNotLeakedWhenNoOutput(t *testing.T) { var stdoutLines []string hooks := ExecuteResultHook{ - OnExecuteStdout: func(line string) { + OnExecuteStdout: func(_ int64, line string) { stdoutLines = append(stdoutLines, line) }, } @@ -365,7 +365,7 @@ func TestBashSession_heredoc(t *testing.T) { t.Cleanup(func() { _ = controller.DeleteBashSession(sessionID) }) hooks := ExecuteResultHook{ - OnExecuteStdout: func(line string) { + OnExecuteStdout: func(_ int64, line string) { fmt.Printf("[stdout] %s\n", line) }, OnExecuteComplete: func(d time.Duration) { @@ -418,7 +418,7 @@ func TestBashSession_execReplacesShell(t *testing.T) { var stdoutLines []string hooks := ExecuteResultHook{ - OnExecuteStdout: func(line string) { + OnExecuteStdout: func(_ int64, line string) { stdoutLines = append(stdoutLines, line) }, } @@ -458,7 +458,7 @@ func TestBashSession_complexExec(t *testing.T) { var stdoutLines []string hooks := ExecuteResultHook{ - OnExecuteStdout: func(line string) { + OnExecuteStdout: func(_ int64, line string) { stdoutLines = append(stdoutLines, line) }, } diff --git a/components/execd/pkg/runtime/command.go b/components/execd/pkg/runtime/command.go index 208b541ab..a18c34041 100644 --- a/components/execd/pkg/runtime/command.go +++ b/components/execd/pkg/runtime/command.go @@ -128,11 +128,11 @@ func (c *Controller) runCommand(ctx context.Context, request *ExecuteCodeRequest wg.Add(2) safego.Go(func() { defer wg.Done() - c.tailStdPipe(stdoutPath, request.Hooks.OnExecuteStdout, done) + c.tailStdPipe(stdoutPath, request.wrapStdoutPipeHook(), done) }) safego.Go(func() { defer wg.Done() - c.tailStdPipe(stderrPath, request.Hooks.OnExecuteStderr, done) + c.tailStdPipe(stderrPath, request.wrapStderrPipeHook(), done) }) err = cmd.Start() diff --git a/components/execd/pkg/runtime/command_test.go b/components/execd/pkg/runtime/command_test.go index e282d40a1..1866aa2b5 100644 --- a/components/execd/pkg/runtime/command_test.go +++ b/components/execd/pkg/runtime/command_test.go @@ -133,10 +133,10 @@ func TestRunCommand_Echo(t *testing.T) { Timeout: 5 * time.Second, Hooks: ExecuteResultHook{ OnExecuteInit: func(s string) { sessionID = s }, - OnExecuteStdout: func(s string) { + OnExecuteStdout: func(_ int64, s string) { stdoutLines = append(stdoutLines, s) }, - OnExecuteStderr: func(s string) { + OnExecuteStderr: func(_ int64, s string) { stderrLines = append(stderrLines, s) }, OnExecuteError: func(err *execute.ErrorOutput) { @@ -188,8 +188,8 @@ func TestRunCommand_Error(t *testing.T) { Timeout: 5 * time.Second, Hooks: ExecuteResultHook{ OnExecuteInit: func(s string) { sessionID = s }, - OnExecuteStdout: func(s string) { stdoutLines = append(stdoutLines, s) }, - OnExecuteStderr: func(s string) { stderrLines = append(stderrLines, s) }, + OnExecuteStdout: func(_ int64, s string) { stdoutLines = append(stdoutLines, s) }, + OnExecuteStderr: func(_ int64, s string) { stderrLines = append(stderrLines, s) }, OnExecuteError: func(err *execute.ErrorOutput) { gotErr = err completeCh <- struct{}{} diff --git a/components/execd/pkg/runtime/command_windows.go b/components/execd/pkg/runtime/command_windows.go index 888bd5e89..ffa7bef89 100644 --- a/components/execd/pkg/runtime/command_windows.go +++ b/components/execd/pkg/runtime/command_windows.go @@ -53,10 +53,10 @@ func (c *Controller) runCommand(ctx context.Context, request *ExecuteCodeRequest done := make(chan struct{}, 1) safego.Go(func() { - c.tailStdPipe(c.stdoutFileName(session), request.Hooks.OnExecuteStdout, done) + c.tailStdPipe(c.stdoutFileName(session), request.wrapStdoutPipeHook(), done) }) safego.Go(func() { - c.tailStdPipe(c.stderrFileName(session), request.Hooks.OnExecuteStderr, done) + c.tailStdPipe(c.stderrFileName(session), request.wrapStderrPipeHook(), done) }) err = cmd.Start() diff --git a/components/execd/pkg/runtime/jupyter.go b/components/execd/pkg/runtime/jupyter.go index f4732d515..fc697d25a 100644 --- a/components/execd/pkg/runtime/jupyter.go +++ b/components/execd/pkg/runtime/jupyter.go @@ -122,9 +122,16 @@ func dispatchExecutionResultHooks(request *ExecuteCodeRequest, result *execute.E for _, stream := range result.Stream { switch stream.Name { case execute.StreamStdout: - request.Hooks.OnExecuteStdout(stream.Text) + if stream.Text != "" && request.Hooks.OnExecuteStdout != nil { + eid := request.nextStdoutStderrEventID() + request.Hooks.OnExecuteStdout(eid, stream.Text) + } case execute.StreamStderr: - request.Hooks.OnExecuteStderr(stream.Text) + if stream.Text != "" && request.Hooks.OnExecuteStderr != nil { + eid := request.nextStdoutStderrEventID() + request.Hooks.OnExecuteStderr(eid, stream.Text) + } + default: } } } diff --git a/components/execd/pkg/runtime/types.go b/components/execd/pkg/runtime/types.go index cd0615c63..3beedc23b 100644 --- a/components/execd/pkg/runtime/types.go +++ b/components/execd/pkg/runtime/types.go @@ -17,6 +17,7 @@ package runtime import ( "fmt" "sync" + "sync/atomic" "time" "github.com/alibaba/opensandbox/execd/pkg/jupyter/execute" @@ -27,8 +28,8 @@ type ExecuteResultHook struct { OnExecuteInit func(context string) OnExecuteResult func(result map[string]any, count int) OnExecuteStatus func(status string) - OnExecuteStdout func(stdout string) //nolint:predeclared - OnExecuteStderr func(stderr string) //nolint:predeclared + OnExecuteStdout func(eid int64, stdout string) //nolint:predeclared + OnExecuteStderr func(eid int64, stderr string) //nolint:predeclared OnExecuteError func(err *execute.ErrorOutput) OnExecuteComplete func(executionTime time.Duration) } @@ -44,6 +45,35 @@ type ExecuteCodeRequest struct { Uid *uint32 `json:"uid,omitempty"` Gid *uint32 `json:"gid,omitempty"` Hooks ExecuteResultHook + + eventSeq atomic.Uint64 +} + +func (req *ExecuteCodeRequest) nextStdoutStderrEventID() int64 { + if req == nil { + return 0 + } + return int64(req.eventSeq.Add(1)) +} + +func (req *ExecuteCodeRequest) wrapStdoutPipeHook() func(string) { + return func(text string) { + if text == "" || req.Hooks.OnExecuteStdout == nil { + return + } + eid := req.nextStdoutStderrEventID() + req.Hooks.OnExecuteStdout(eid, text) + } +} + +func (req *ExecuteCodeRequest) wrapStderrPipeHook() func(string) { + return func(text string) { + if text == "" || req.Hooks.OnExecuteStderr == nil { + return + } + eid := req.nextStdoutStderrEventID() + req.Hooks.OnExecuteStderr(eid, text) + } } // SetDefaultHooks installs stdout logging fallbacks for unset hooks. @@ -55,10 +85,10 @@ func (req *ExecuteCodeRequest) SetDefaultHooks() { req.Hooks.OnExecuteStatus = func(status string) { fmt.Printf("OnExecuteStatus: %s\n", status) } } if req.Hooks.OnExecuteStdout == nil { - req.Hooks.OnExecuteStdout = func(stdout string) { fmt.Printf("OnExecuteStdout: %s\n", stdout) } + req.Hooks.OnExecuteStdout = func(eid int64, stdout string) { fmt.Printf("OnExecuteStdout: eid=%d %s\n", eid, stdout) } } if req.Hooks.OnExecuteStderr == nil { - req.Hooks.OnExecuteStderr = func(stderr string) { fmt.Printf("OnExecuteStderr: %s\n", stderr) } + req.Hooks.OnExecuteStderr = func(eid int64, stderr string) { fmt.Printf("OnExecuteStderr: eid=%d %s\n", eid, stderr) } } if req.Hooks.OnExecuteError == nil { req.Hooks.OnExecuteError = func(err *execute.ErrorOutput) { fmt.Printf("OnExecuteError: %++v\n", err) } diff --git a/components/execd/pkg/sbuf/config.go b/components/execd/pkg/sbuf/config.go new file mode 100644 index 000000000..12dda926d --- /dev/null +++ b/components/execd/pkg/sbuf/config.go @@ -0,0 +1,37 @@ +// Copyright 2026 Alibaba Group Holding Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sbuf + +// Config controls per-stream bounds and append policy. +type Config struct { + // MaxEvents is the maximum number of events retained per stream. Oldest events are dropped when exceeded. + // Zero defaults to DefaultMaxEvents. + MaxEvents int + // MaxBytes is the approximate upper bound on total payload bytes per stream (sum of len(Payload)). + // Oldest events are dropped until under the limit. Zero means no byte limit. + MaxBytes int64 + // StrictMonotonic rejects Append when eid <= last eid for that stream. Recommended for execd SSE eids. + StrictMonotonic bool +} + +const DefaultMaxEvents = 1024 + +func (c *Config) normalized() Config { + out := *c + if out.MaxEvents <= 0 { + out.MaxEvents = DefaultMaxEvents + } + return out +} diff --git a/components/execd/pkg/sbuf/errors.go b/components/execd/pkg/sbuf/errors.go new file mode 100644 index 000000000..8a37ac6e5 --- /dev/null +++ b/components/execd/pkg/sbuf/errors.go @@ -0,0 +1,23 @@ +// Copyright 2026 Alibaba Group Holding Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sbuf + +import "errors" + +var ( + // ErrOutOfOrder is returned when StrictMonotonic is enabled and eid is not greater than the last appended eid. + ErrOutOfOrder = errors.New("sbuf: eid out of order for stream") + ErrEmptyStreamID = errors.New("sbuf: empty stream id") +) diff --git a/components/execd/pkg/sbuf/event.go b/components/execd/pkg/sbuf/event.go new file mode 100644 index 000000000..d65219e83 --- /dev/null +++ b/components/execd/pkg/sbuf/event.go @@ -0,0 +1,21 @@ +// Copyright 2026 Alibaba Group Holding Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sbuf + +// Event is one stored record (e.g. a single SSE JSON line body). Payload is owned by the buffer after Append. +type Event struct { + EID int64 + Payload []byte +} diff --git a/components/execd/pkg/sbuf/ring.go b/components/execd/pkg/sbuf/ring.go new file mode 100644 index 000000000..37426328a --- /dev/null +++ b/components/execd/pkg/sbuf/ring.go @@ -0,0 +1,91 @@ +// Copyright 2026 Alibaba Group Holding Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sbuf + +// ring is a FIFO queue with a fixed max length; push drops oldest when full. +type ring struct { + maxLen int + slots []eventSlot + head int + n int + bytes int64 +} + +type eventSlot struct { + eid int64 + payload []byte +} + +func newRing(maxLen int) *ring { + if maxLen < 1 { + maxLen = 1 + } + return &ring{ + maxLen: maxLen, + slots: make([]eventSlot, maxLen), + } +} + +func (r *ring) push(eid int64, payload []byte, maxBytes int64) { + pld := append([]byte(nil), payload...) + size := int64(len(pld)) + + if r.n == r.maxLen { + r.evictHead() + } + idx := (r.head + r.n) % r.maxLen + r.slots[idx] = eventSlot{eid: eid, payload: pld} + r.n++ + r.bytes += size + + if maxBytes > 0 { + for r.bytes > maxBytes && r.n > 0 { + r.evictHead() + } + } +} + +func (r *ring) evictHead() { + if r.n == 0 { + return + } + old := r.slots[r.head] + r.bytes -= int64(len(old.payload)) + r.slots[r.head] = eventSlot{} + r.head = (r.head + 1) % r.maxLen + r.n-- +} + +func (r *ring) iterAfter(afterEid int64, fn func(eid int64, payload []byte)) { + for i := range r.n { + idx := (r.head + i) % r.maxLen + s := r.slots[idx] + if s.eid > afterEid { + fn(s.eid, s.payload) + } + } +} + +// snapshotAfter returns a copy slice for safe iteration outside the ring lock. +func (r *ring) snapshotAfter(afterEid int64) []Event { + var out []Event + r.iterAfter(afterEid, func(eid int64, payload []byte) { + out = append(out, Event{ + EID: eid, + Payload: append([]byte(nil), payload...), + }) + }) + return out +} diff --git a/components/execd/pkg/sbuf/store.go b/components/execd/pkg/sbuf/store.go new file mode 100644 index 000000000..ead4666a4 --- /dev/null +++ b/components/execd/pkg/sbuf/store.go @@ -0,0 +1,108 @@ +// Copyright 2026 Alibaba Group Holding Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package sbuf provides bounded, per-stream FIFO buffers for SSE (or similar) events keyed by eid, +// used to serve disconnect resume (catch-up by event id). +// It is storage-only: callers assign eids and decide when to delete a stream. +package sbuf + +import ( + "sync" +) + +// Store holds bounded event rings keyed by caller-defined stream IDs (e.g. command execution id). +type Store struct { + cfg Config + mu sync.Mutex + streams map[string]*streamBuf +} + +type streamBuf struct { + mu sync.Mutex + lastEid int64 + ring *ring + maxBytes int64 +} + +// NewStore creates an empty store. cfg is copied after normalization. +func NewStore(cfg Config) *Store { + cfg = cfg.normalized() + return &Store{ + cfg: cfg, + streams: make(map[string]*streamBuf), + } +} + +// Append adds one event to the stream's ring. Payload is copied. +// With StrictMonotonic, returns ErrOutOfOrder if eid <= previous eid for this stream. +func (s *Store) Append(streamID string, eid int64, payload []byte) error { + if streamID == "" { + return ErrEmptyStreamID + } + sb := s.getOrCreate(streamID) + sb.mu.Lock() + defer sb.mu.Unlock() + + if s.cfg.StrictMonotonic { + if eid <= sb.lastEid { + return ErrOutOfOrder + } + } + sb.lastEid = eid + sb.ring.push(eid, payload, sb.maxBytes) + return nil +} + +func (s *Store) getOrCreate(streamID string) *streamBuf { + s.mu.Lock() + defer s.mu.Unlock() + if sb, ok := s.streams[streamID]; ok { + return sb + } + sb := &streamBuf{ + ring: newRing(s.cfg.MaxEvents), + maxBytes: s.cfg.MaxBytes, + } + s.streams[streamID] = sb + return sb +} + +// EventsAfter returns a snapshot of events with EID > afterEid in order. +// If the stream does not exist, ok is false and events is nil. +func (s *Store) EventsAfter(streamID string, afterEid int64) (events []Event, ok bool) { + s.mu.Lock() + sb, found := s.streams[streamID] + s.mu.Unlock() + if !found { + return nil, false + } + sb.mu.Lock() + defer sb.mu.Unlock() + return sb.ring.snapshotAfter(afterEid), true +} + +// Delete removes a stream buffer. No-op if missing. +func (s *Store) Delete(streamID string) { + s.mu.Lock() + defer s.mu.Unlock() + delete(s.streams, streamID) +} + +// Has reports whether a stream currently exists. +func (s *Store) Has(streamID string) bool { + s.mu.Lock() + defer s.mu.Unlock() + _, ok := s.streams[streamID] + return ok +} diff --git a/components/execd/pkg/sbuf/store_benchmark_test.go b/components/execd/pkg/sbuf/store_benchmark_test.go new file mode 100644 index 000000000..e769db981 --- /dev/null +++ b/components/execd/pkg/sbuf/store_benchmark_test.go @@ -0,0 +1,136 @@ +// Copyright 2026 Alibaba Group Holding Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sbuf + +import ( + "fmt" + "sync/atomic" + "testing" +) + +// payload used across benchmarks (typical SSE JSON line order of magnitude). +var benchPayload = []byte(`{"type":"stdout","eid":1,"text":"hello","timestamp":0}`) + +// BenchmarkRing_pushNoEvict measures ring.push when the ring is not full (no evictHead). +func BenchmarkRing_pushNoEvict(b *testing.B) { + r := newRing(1 << 20) + b.SetBytes(int64(len(benchPayload))) + var eid atomic.Int64 + b.ResetTimer() + for i := 0; i < b.N; i++ { + r.push(eid.Add(1), benchPayload, 0) + } +} + +// BenchmarkRing_pushWithEvict measures push when the ring stays at capacity (each push may evict oldest). +func BenchmarkRing_pushWithEvict(b *testing.B) { + const cap = 64 + r := newRing(cap) + // Fill ring so every subsequent push evicts one slot. + for i := int64(1); i <= cap; i++ { + r.push(i, benchPayload, 0) + } + b.SetBytes(int64(len(benchPayload))) + var eid atomic.Int64 + eid.Store(cap) + b.ResetTimer() + for i := 0; i < b.N; i++ { + r.push(eid.Add(1), benchPayload, 0) + } +} + +// BenchmarkStore_Append_noEvict is Append on a warm stream with a large MaxEvents (no ring eviction). +func BenchmarkStore_Append_noEvict(b *testing.B) { + s := NewStore(Config{MaxEvents: 1 << 20, StrictMonotonic: true}) + if err := s.Append("s", 1, benchPayload); err != nil { + b.Fatal(err) + } + var eid int64 = 1 + b.SetBytes(int64(len(benchPayload))) + b.ResetTimer() + for i := 0; i < b.N; i++ { + eid++ + if err := s.Append("s", eid, benchPayload); err != nil { + b.Fatal(err) + } + } +} + +// BenchmarkStore_Append_evicting keeps a small ring so almost every Append evicts the oldest event. +func BenchmarkStore_Append_evicting(b *testing.B) { + s := NewStore(Config{MaxEvents: 64, StrictMonotonic: true}) + if err := s.Append("s", 1, benchPayload); err != nil { + b.Fatal(err) + } + var eid int64 = 1 + b.SetBytes(int64(len(benchPayload))) + b.ResetTimer() + for i := 0; i < b.N; i++ { + eid++ + if err := s.Append("s", eid, benchPayload); err != nil { + b.Fatal(err) + } + } +} + +// BenchmarkStore_EventsAfter measures snapshot copy cost after many appends. +func BenchmarkStore_EventsAfter(b *testing.B) { + const n = 1000 + s := NewStore(Config{MaxEvents: n + 10, StrictMonotonic: true}) + for i := int64(1); i <= n; i++ { + if err := s.Append("s", i, benchPayload); err != nil { + b.Fatal(err) + } + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, _ = s.EventsAfter("s", 0) + } +} + +// BenchmarkStore_Append_ParallelDifferentStreams: one stream per goroutine (minimal lock contention on streamBuf). +func BenchmarkStore_Append_ParallelDifferentStreams(b *testing.B) { + s := NewStore(Config{MaxEvents: 1 << 16, StrictMonotonic: true}) + b.SetBytes(int64(len(benchPayload))) + var id atomic.Int64 + b.RunParallel(func(pb *testing.PB) { + // Unique stream id per goroutine iteration batch. + my := id.Add(1) + sid := fmt.Sprintf("s-%d", my) + var e int64 + for pb.Next() { + e++ + if err := s.Append(sid, e, benchPayload); err != nil { + b.Fatal(err) + } + } + }) +} + +// BenchmarkStore_Append_ParallelSameStream: all goroutines append to one stream (serialized on streamBuf.mu). +// StrictMonotonic is off: parallel workers would observe eids out of arrival order if enforced. +func BenchmarkStore_Append_ParallelSameStream(b *testing.B) { + s := NewStore(Config{MaxEvents: 1 << 20, StrictMonotonic: false}) + var eid atomic.Int64 + b.SetBytes(int64(len(benchPayload))) + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + n := eid.Add(1) + if err := s.Append("s", n, benchPayload); err != nil { + b.Fatal(err) + } + } + }) +} diff --git a/components/execd/pkg/sbuf/store_test.go b/components/execd/pkg/sbuf/store_test.go new file mode 100644 index 000000000..aeb1554d9 --- /dev/null +++ b/components/execd/pkg/sbuf/store_test.go @@ -0,0 +1,80 @@ +// Copyright 2026 Alibaba Group Holding Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sbuf + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestStore_EventsAfter_NotFound(t *testing.T) { + s := NewStore(Config{MaxEvents: 8, StrictMonotonic: true}) + ev, ok := s.EventsAfter("missing", 0) + require.False(t, ok) + require.Nil(t, ev) +} + +func TestStore_AppendStrictMonotonic(t *testing.T) { + s := NewStore(Config{MaxEvents: 8, StrictMonotonic: true}) + require.NoError(t, s.Append("stream-a", 1, []byte(`{"a":1}`))) + require.ErrorIs(t, s.Append("stream-a", 1, []byte(`dup`)), ErrOutOfOrder) + require.NoError(t, s.Append("stream-a", 2, []byte(`{"a":2}`))) + + ev, ok := s.EventsAfter("stream-a", 0) + require.True(t, ok) + require.Len(t, ev, 2) + require.Equal(t, int64(1), ev[0].EID) + require.Equal(t, `{"a":1}`, string(ev[0].Payload)) + + ev2, _ := s.EventsAfter("stream-a", 1) + require.Len(t, ev2, 1) + require.Equal(t, int64(2), ev2[0].EID) +} + +func TestStore_MaxEventsEvictsOldest(t *testing.T) { + s := NewStore(Config{MaxEvents: 3, StrictMonotonic: true}) + for i := int64(1); i <= 5; i++ { + require.NoError(t, s.Append("s", i, []byte{byte(i)})) + } + ev, ok := s.EventsAfter("s", 0) + require.True(t, ok) + require.Len(t, ev, 3) + require.Equal(t, int64(3), ev[0].EID) + require.Equal(t, byte(3), ev[0].Payload[0]) +} + +func TestStore_MaxBytesEvicts(t *testing.T) { + s := NewStore(Config{MaxEvents: 100, MaxBytes: 10, StrictMonotonic: true}) + require.NoError(t, s.Append("s", 1, []byte("1234567890"))) + require.NoError(t, s.Append("s", 2, []byte("1234567890"))) + ev, ok := s.EventsAfter("s", 0) + require.True(t, ok) + require.Len(t, ev, 1) + require.Equal(t, int64(2), ev[0].EID) +} + +func TestStore_Delete(t *testing.T) { + s := NewStore(Config{MaxEvents: 8, StrictMonotonic: true}) + require.NoError(t, s.Append("x", 1, []byte("a"))) + require.True(t, s.Has("x")) + s.Delete("x") + require.False(t, s.Has("x")) +} + +func TestStore_EmptyStreamID(t *testing.T) { + s := NewStore(Config{}) + require.ErrorIs(t, s.Append("", 1, nil), ErrEmptyStreamID) +} diff --git a/components/execd/pkg/web/controller/codeinterpreting.go b/components/execd/pkg/web/controller/codeinterpreting.go index e30131f37..ef8ddc8f3 100644 --- a/components/execd/pkg/web/controller/codeinterpreting.go +++ b/components/execd/pkg/web/controller/codeinterpreting.go @@ -43,6 +43,11 @@ type CodeInterpretingController struct { // chunkWriter serializes SSE event writes to prevent interleaved output. chunkWriter sync.Mutex + + resumeStreamMu sync.Mutex + resumeStreamID string + // resumeEnabled opts into disconnect resume (event buffer + live hub) for RunCommand / RunCode. + resumeEnabled bool } type codeExecutionRunner interface { @@ -131,6 +136,11 @@ func (c *CodeInterpretingController) RunCode() { ctx, cancel := context.WithCancel(c.ctx.Request.Context()) defer cancel() + c.resumeEnabled = true + defer func() { + deferResumeCleanup(c) + c.resumeEnabled = false + }() runCodeRequest := c.buildExecuteCodeRequest(request) eventsHandler := c.setServerEventsHandler(ctx) diff --git a/components/execd/pkg/web/controller/command.go b/components/execd/pkg/web/controller/command.go index d4da90df5..8c02b3e3e 100644 --- a/components/execd/pkg/web/controller/command.go +++ b/components/execd/pkg/web/controller/command.go @@ -16,12 +16,14 @@ package controller import ( "context" + "errors" "fmt" "net/http" "strconv" "time" "github.com/alibaba/opensandbox/execd/pkg/flag" + "github.com/alibaba/opensandbox/execd/pkg/log" "github.com/alibaba/opensandbox/execd/pkg/runtime" "github.com/alibaba/opensandbox/execd/pkg/web/model" ) @@ -50,6 +52,11 @@ func (c *CodeInterpretingController) RunCommand() { ctx, cancel := context.WithCancel(c.ctx.Request.Context()) defer cancel() + c.resumeEnabled = true + defer func() { + deferResumeCleanup(c) + c.resumeEnabled = false + }() runCodeRequest := c.buildExecuteCommandRequest(request) eventsHandler := c.setServerEventsHandler(ctx) @@ -125,6 +132,80 @@ func (c *CodeInterpretingController) GetBackgroundCommandOutput() { c.ctx.String(http.StatusOK, "%s", output) } +// ResumeCommandStream sends buffered events after after_eid, then if the command is still running +// and no other client holds the live slot, streams further events until completion or client disconnect. +func (c *CodeInterpretingController) ResumeCommandStream() { + commandID := c.ctx.Param("id") + if commandID == "" { + c.RespondError(http.StatusBadRequest, model.ErrorCodeInvalidRequest, "missing command execution id") + return + } + afterEid := c.QueryInt64(c.ctx.Query(model.CommandResumeAfterEidQuery), 0) + + hub := commandStreams.getHub(commandID) + st, errSt := codeRunner.GetCommandStatus(commandID) + if errSt != nil && hub == nil { + c.RespondError(http.StatusNotFound, model.ErrorCodeInvalidRequest, errSt.Error()) + return + } + + events, bufferOK := resumeBuffer.EventsAfter(commandID, afterEid) + if !bufferOK && hub == nil { + c.RespondError(http.StatusNotFound, model.ErrorCodeInvalidRequest, "command stream resume buffer not available") + return + } + + if st != nil && st.Running && hub != nil && hub.isHolderAlive() { + c.RespondError( + http.StatusConflict, + model.ErrorCodeInvalidRequest, + "primary SSE stream is still active; disconnect it before resuming", + ) + return + } + + c.setupSSEResponse() + lastReplayMaxEid := afterEid + for _, ev := range events { + c.writeSingleEvent("ResumeBuffer", ev.Payload, false, fmt.Sprintf("buffer eid=%d", ev.EID), 0) + if ev.EID > lastReplayMaxEid { + lastReplayMaxEid = ev.EID + } + } + + st2, _ := codeRunner.GetCommandStatus(commandID) + if st2 == nil || !st2.Running { + if len(events) > 0 { + log.Info("resume stream: command_id=%s after_eid=%d snapshot_events=%d (replay only)", + commandID, afterEid, len(events)) + } + return + } + + hub = commandStreams.getHub(commandID) + if hub == nil { + return + } + + h, err := commandStreams.tryAttachResume(commandID, c.ctx.Writer, c.ctx.Request.Context()) + if err != nil { + if errors.Is(err, errLiveStreamPrimaryActive) { + log.Error("ResumeCommandStream: attach conflict after buffered history (another client may have attached)") + } + return + } + + // Catch up events appended while the snapshot slice was replayed (holder still nil); same mutex as writeFrame. + tailN := h.flushResumeTail(commandID, lastReplayMaxEid) + log.Info("resume stream: command_id=%s after_eid=%d snapshot_events=%d post_attach_tail=%d (live)", + commandID, afterEid, len(events), tailN) + + select { + case <-h.waitDone(): + case <-c.ctx.Request.Context().Done(): + } +} + func (c *CodeInterpretingController) buildExecuteCommandRequest(request model.RunCommandRequest) *runtime.ExecuteCodeRequest { timeout := time.Duration(request.TimeoutMs) * time.Millisecond if request.Background { diff --git a/components/execd/pkg/web/controller/command_stream.go b/components/execd/pkg/web/controller/command_stream.go new file mode 100644 index 000000000..6ec07d7d0 --- /dev/null +++ b/components/execd/pkg/web/controller/command_stream.go @@ -0,0 +1,237 @@ +// Copyright 2026 Alibaba Group Holding Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Bounded event buffer (sbuf) plus at-most-one live SSE writer per command id for disconnect resume; +// GET /command/:id/resume sends buffered events then may take over as the sole live consumer. + +package controller + +import ( + "bytes" + "context" + "errors" + "io" + "net/http" + "sync" + + "github.com/alibaba/opensandbox/execd/pkg/log" + "github.com/alibaba/opensandbox/execd/pkg/sbuf" +) + +var ( + resumeBuffer *sbuf.Store + + errLiveStreamPrimaryActive = errors.New("primary SSE stream is still active") + errLiveHubNotFound = errors.New("command live hub not found") +) + +func init() { + resumeBuffer = sbuf.NewStore(sbuf.Config{StrictMonotonic: true}) +} + +func deferResumeCleanup(c *CodeInterpretingController) { + c.resumeStreamMu.Lock() + id := c.resumeStreamID + c.resumeStreamID = "" + c.resumeStreamMu.Unlock() + if id == "" { + return + } + commandStreams.closeAndRemove(id) + resumeBuffer.Delete(id) + log.Info("command stream: hub and resume buffer cleaned up id=%s", id) +} + +// --- live SSE routing (mutually exclusive main vs resume) --- + +type streamRegistry struct { + mu sync.Mutex + m map[string]*streamHub +} + +var commandStreams = &streamRegistry{m: make(map[string]*streamHub)} + +type streamHub struct { + streamID string + mu sync.Mutex + holder *streamHolder + done chan struct{} +} + +type streamHolder struct { + writer http.ResponseWriter + ctx context.Context +} + +func (r *streamRegistry) registerPrimary(id string, w http.ResponseWriter, ctx context.Context) { + r.mu.Lock() + h := &streamHub{ + streamID: id, + done: make(chan struct{}), + } + r.m[id] = h + h.mu.Lock() + h.holder = &streamHolder{writer: w, ctx: ctx} + h.mu.Unlock() + r.mu.Unlock() + + log.Info("command stream: primary hub registered id=%s", id) + watchHolderRelease(h, ctx) +} + +// watchHolderRelease clears h.holder when ctx is cancelled. All holder mutations use h.mu only +// (see tryAttachResume, registerPrimary, writeFrame) so r.mu and h.mu are not split across h.holder. +func watchHolderRelease(h *streamHub, ctx context.Context) { + go func() { + <-ctx.Done() + h.mu.Lock() + defer h.mu.Unlock() + if h.holder != nil && h.holder.ctx == ctx { + h.holder = nil + } + }() +} + +func (r *streamRegistry) getHub(id string) *streamHub { + r.mu.Lock() + defer r.mu.Unlock() + return r.m[id] +} + +func (r *streamRegistry) closeAndRemove(id string) { + r.mu.Lock() + h := r.m[id] + delete(r.m, id) + r.mu.Unlock() + if h != nil { + h.closeDone() + } +} + +func (h *streamHub) closeDone() { + h.mu.Lock() + defer h.mu.Unlock() + select { + case <-h.done: + default: + close(h.done) + } +} + +func (h *streamHub) waitDone() <-chan struct{} { + return h.done +} + +func (h *streamHub) isHolderAlive() bool { + if h == nil { + return false + } + h.mu.Lock() + defer h.mu.Unlock() + return h.holder != nil && h.holder.ctx.Err() == nil +} + +func (r *streamRegistry) tryAttachResume(id string, w http.ResponseWriter, ctx context.Context) (*streamHub, error) { + r.mu.Lock() + h := r.m[id] + if h == nil { + r.mu.Unlock() + return nil, errLiveHubNotFound + } + h.mu.Lock() + if h.holder != nil && h.holder.ctx.Err() == nil { + h.mu.Unlock() + r.mu.Unlock() + return nil, errLiveStreamPrimaryActive + } + h.holder = &streamHolder{writer: w, ctx: ctx} + h.mu.Unlock() + r.mu.Unlock() + + watchHolderRelease(h, ctx) + return h, nil +} + +func (r *streamRegistry) writeSSE(id string, data []byte, bufEid int64, handler, summary string) { + r.mu.Lock() + h := r.m[id] + r.mu.Unlock() + if h == nil { + if bufEid > 0 { + _ = resumeBuffer.Append(id, bufEid, bytes.Clone(data)) + } + return + } + h.writeFrame(data, bufEid, handler, summary) +} + +// flushResumeTail writes all buffered events with EID > afterEid to the current holder while holding h.mu. +// Live writeFrame calls block on the same mutex, so chunks appended only to the ring during the initial +// snapshot replay cannot be missed on this connection (see ResumeCommandStream). +// Returns how many extra events were written after the initial snapshot replay. +func (h *streamHub) flushResumeTail(commandID string, afterEid int64) int { + if h == nil { + return 0 + } + h.mu.Lock() + defer h.mu.Unlock() + if h.holder == nil { + return 0 + } + + tail, ok := resumeBuffer.EventsAfter(commandID, afterEid) + if !ok || len(tail) == 0 { + return 0 + } + writer := h.holder.writer + written := 0 + for _, ev := range tail { + payload := append(append([]byte(nil), ev.Payload...), '\n', '\n') + nw, err := writer.Write(payload) + if err == nil && nw != len(payload) { + err = io.ErrShortWrite + } + if err != nil { + log.Error("flushResumeTail: write eid=%d: %v", ev.EID, err) + return written + } + if flusher, ok := writer.(http.Flusher); ok { + flusher.Flush() + } + written++ + } + return written +} + +func (h *streamHub) writeFrame(data []byte, bufEid int64, handler, summary string) { + h.mu.Lock() + defer h.mu.Unlock() + + payload := append(data, '\n', '\n') + if h.holder != nil { + n, err := h.holder.writer.Write(payload) + if err == nil && n != len(payload) { + err = io.ErrShortWrite + } + if err != nil { + log.Error("StreamEvent.%s write data %s error: %v", handler, summary, err) + } else if flusher, ok := h.holder.writer.(http.Flusher); ok { + flusher.Flush() + } + } + + if bufEid > 0 { + _ = resumeBuffer.Append(h.streamID, bufEid, bytes.Clone(data)) + } +} diff --git a/components/execd/pkg/web/controller/command_stream_test.go b/components/execd/pkg/web/controller/command_stream_test.go new file mode 100644 index 000000000..72d913388 --- /dev/null +++ b/components/execd/pkg/web/controller/command_stream_test.go @@ -0,0 +1,49 @@ +// Copyright 2026 Alibaba Group Holding Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package controller + +import ( + "context" + "net/http/httptest" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestFlushResumeTail_NilHubNoPanic(t *testing.T) { + var h *streamHub + h.flushResumeTail("any", 0) +} + +func TestFlushResumeTail_WritesBufferedEvents(t *testing.T) { + cmdID := "flush-resume-test-cmd" + payload := []byte(`{"type":"stdout","eid":1}`) + require.NoError(t, resumeBuffer.Append(cmdID, 1, payload)) + + w := httptest.NewRecorder() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + h := &streamHub{ + streamID: cmdID, + done: make(chan struct{}), + holder: &streamHolder{writer: w, ctx: ctx}, + } + h.flushResumeTail(cmdID, 0) + + body := w.Body.String() + require.Contains(t, body, "stdout") + require.Contains(t, body, `"eid":1`) +} diff --git a/components/execd/pkg/web/controller/pty_ws.go b/components/execd/pkg/web/controller/pty_ws.go index 6273557f5..ea36b13e4 100644 --- a/components/execd/pkg/web/controller/pty_ws.go +++ b/components/execd/pkg/web/controller/pty_ws.go @@ -50,7 +50,7 @@ const ( // 2. Acquire exclusive WS lock → 409 if already held // 3. Upgrade HTTP → WebSocket // 4. Start bash if not already running -// 5+6. AtomicAttachOutputWithSnapshot (snapshot + attach under outMu — no loss window) +// 5+6. AtomicAttachOutputWithSnapshot (snapshot + attach under outMu — no loss window) // 7. defer: detach → pumpWg.Wait → UnlockWS // 8. Send replay frame if snapshot non-empty // 9. Send connected frame diff --git a/components/execd/pkg/web/controller/sse.go b/components/execd/pkg/web/controller/sse.go index 9e87bda6b..1fe789f12 100644 --- a/components/execd/pkg/web/controller/sse.go +++ b/components/execd/pkg/web/controller/sse.go @@ -49,13 +49,20 @@ func (c *basicController) setupSSEResponse() { func (c *CodeInterpretingController) setServerEventsHandler(ctx context.Context) runtime.ExecuteResultHook { return runtime.ExecuteResultHook{ OnExecuteInit: func(session string) { + if c.resumeEnabled { + c.resumeStreamMu.Lock() + c.resumeStreamID = session + c.resumeStreamMu.Unlock() + commandStreams.registerPrimary(session, c.ctx.Writer, c.ctx.Request.Context()) + } + event := model.ServerStreamEvent{ Type: model.StreamEventTypeInit, Text: session, Timestamp: time.Now().UnixMilli(), } payload := event.ToJSON() - c.writeSingleEvent("OnExecuteInit", payload, true, event.Summary()) + c.writeSingleEvent("OnExecuteInit", payload, true, event.Summary(), 0) safego.Go(func() { c.ping(ctx) }) }, @@ -80,7 +87,7 @@ func (c *CodeInterpretingController) setServerEventsHandler(ctx context.Context) Timestamp: time.Now().UnixMilli(), } payload := event.ToJSON() - c.writeSingleEvent("OnExecuteResult", payload, true, event.Summary()) + c.writeSingleEvent("OnExecuteResult", payload, true, event.Summary(), 0) } if len(mutated) > 0 { event := model.ServerStreamEvent{ @@ -89,7 +96,7 @@ func (c *CodeInterpretingController) setServerEventsHandler(ctx context.Context) Timestamp: time.Now().UnixMilli(), } payload := event.ToJSON() - c.writeSingleEvent("OnExecuteResult", payload, true, event.Summary()) + c.writeSingleEvent("OnExecuteResult", payload, true, event.Summary(), 0) } }, OnExecuteComplete: func(executionTime time.Duration) { @@ -99,7 +106,7 @@ func (c *CodeInterpretingController) setServerEventsHandler(ctx context.Context) Timestamp: time.Now().UnixMilli(), } payload := event.ToJSON() - c.writeSingleEvent("OnExecuteComplete", payload, true, event.Summary()) + c.writeSingleEvent("OnExecuteComplete", payload, true, event.Summary(), 0) }, OnExecuteError: func(err *execute.ErrorOutput) { if err == nil { @@ -112,7 +119,7 @@ func (c *CodeInterpretingController) setServerEventsHandler(ctx context.Context) Timestamp: time.Now().UnixMilli(), } payload := event.ToJSON() - c.writeSingleEvent("OnExecuteError", payload, true, event.Summary()) + c.writeSingleEvent("OnExecuteError", payload, true, event.Summary(), 0) }, OnExecuteStatus: func(status string) { event := model.ServerStreamEvent{ @@ -121,43 +128,60 @@ func (c *CodeInterpretingController) setServerEventsHandler(ctx context.Context) Timestamp: time.Now().UnixMilli(), } payload := event.ToJSON() - c.writeSingleEvent("OnExecuteStatus", payload, true, event.Summary()) + c.writeSingleEvent("OnExecuteStatus", payload, true, event.Summary(), 0) }, - OnExecuteStdout: func(text string) { + OnExecuteStdout: func(eid int64, text string) { if text == "" { return } event := model.ServerStreamEvent{ + Eid: eid, Type: model.StreamEventTypeStdout, Text: text, Timestamp: time.Now().UnixMilli(), } payload := event.ToJSON() - c.writeSingleEvent("OnExecuteStdout", payload, true, event.Summary()) + c.writeSingleEvent("OnExecuteStdout", payload, true, event.Summary(), eid) }, - OnExecuteStderr: func(text string) { + OnExecuteStderr: func(eid int64, text string) { if text == "" { return } event := model.ServerStreamEvent{ + Eid: eid, Type: model.StreamEventTypeStderr, Text: text, Timestamp: time.Now().UnixMilli(), } payload := event.ToJSON() - c.writeSingleEvent("OnExecuteStderr", payload, true, event.Summary()) + c.writeSingleEvent("OnExecuteStderr", payload, true, event.Summary(), eid) }, } } -// writeSingleEvent serializes one SSE frame. -func (c *CodeInterpretingController) writeSingleEvent(handler string, data []byte, verbose bool, summary string) { +// writeSingleEvent serializes one SSE frame. When resumeStreamID is set, writes go through commandStreams (live hub + buffer). +// bufEid is stdout/stderr event id for the resume buffer; 0 skips Append (control events, resume catch-up frames). +func (c *CodeInterpretingController) writeSingleEvent(handler string, data []byte, verbose bool, summary string, bufEid int64) { if c == nil || c.ctx == nil || c.ctx.Writer == nil { return } + var streamID string + if c.resumeEnabled { + c.resumeStreamMu.Lock() + streamID = c.resumeStreamID + c.resumeStreamMu.Unlock() + } + if streamID != "" { + commandStreams.writeSSE(streamID, data, bufEid, handler, summary) + if verbose { + log.Info("StreamEvent.%s write data %s", handler, summary) + } + return + } + select { case <-c.ctx.Request.Context().Done(): log.Error("StreamEvent.%s: client disconnected", handler) @@ -167,11 +191,6 @@ func (c *CodeInterpretingController) writeSingleEvent(handler string, data []byt c.chunkWriter.Lock() defer c.chunkWriter.Unlock() - defer func() { - if flusher, ok := c.ctx.Writer.(http.Flusher); ok { - flusher.Flush() - } - }() payload := append(data, '\n', '\n') n, err := c.ctx.Writer.Write(payload) @@ -181,10 +200,15 @@ func (c *CodeInterpretingController) writeSingleEvent(handler string, data []byt if err != nil { log.Error("StreamEvent.%s write data %s error: %v", handler, summary, err) - } else { - if verbose { - log.Info("StreamEvent.%s write data %s", handler, summary) - } + return + } + + if flusher, ok := c.ctx.Writer.(http.Flusher); ok { + flusher.Flush() + } + + if verbose { + log.Info("StreamEvent.%s write data %s", handler, summary) } } @@ -200,6 +224,6 @@ func (c *CodeInterpretingController) ping(ctx context.Context) { Timestamp: time.Now().UnixMilli(), } payload := event.ToJSON() - c.writeSingleEvent("Ping", payload, false, event.Summary()) + c.writeSingleEvent("Ping", payload, false, event.Summary(), 0) }, 3*time.Second, ctx.Done()) } diff --git a/components/execd/pkg/web/model/codeinterpreting.go b/components/execd/pkg/web/model/codeinterpreting.go index 771b6d75b..c473b94e4 100644 --- a/components/execd/pkg/web/model/codeinterpreting.go +++ b/components/execd/pkg/web/model/codeinterpreting.go @@ -16,7 +16,6 @@ package model import ( "encoding/json" - "errors" "fmt" "strings" @@ -47,30 +46,6 @@ type CodeContextRequest struct { Cwd string `json:"cwd,omitempty"` } -// RunCommandRequest represents a shell command execution request. -type RunCommandRequest struct { - Command string `json:"command" validate:"required"` - Cwd string `json:"cwd,omitempty"` - Background bool `json:"background,omitempty"` - // TimeoutMs caps execution duration; 0 uses server default. - TimeoutMs int64 `json:"timeout,omitempty" validate:"omitempty,gte=1"` - - Uid *uint32 `json:"uid,omitempty"` - Gid *uint32 `json:"gid,omitempty"` - Envs map[string]string `json:"envs,omitempty"` -} - -func (r *RunCommandRequest) Validate() error { - validate := validator.New() - if err := validate.Struct(r); err != nil { - return err - } - if r.Gid != nil && r.Uid == nil { - return errors.New("uid is required when gid is provided") - } - return nil -} - type ServerStreamEventType string const ( @@ -87,6 +62,7 @@ const ( // ServerStreamEvent is emitted to clients over SSE. type ServerStreamEvent struct { + Eid int64 `json:"eid,omitempty"` Type ServerStreamEventType `json:"type,omitempty"` Text string `json:"text,omitempty"` ExecutionCount int `json:"execution_count,omitempty"` @@ -105,6 +81,9 @@ func (s ServerStreamEvent) ToJSON() []byte { // Summary renders a lightweight, log-friendly string without JSON. func (s ServerStreamEvent) Summary() string { parts := []string{fmt.Sprintf("type=%s", s.Type)} + if s.Eid > 0 { + parts = append(parts, fmt.Sprintf("eid=%d", s.Eid)) + } if s.Text != "" { parts = append(parts, fmt.Sprintf("text=%s", truncateString(s.Text, 100))) } diff --git a/components/execd/pkg/web/model/codeinterpreting_test.go b/components/execd/pkg/web/model/codeinterpreting_test.go index f0903bf05..0d83b645f 100644 --- a/components/execd/pkg/web/model/codeinterpreting_test.go +++ b/components/execd/pkg/web/model/codeinterpreting_test.go @@ -67,6 +67,7 @@ func TestRunCommandRequestValidateUidGid(t *testing.T) { func TestServerStreamEventToJSON(t *testing.T) { event := ServerStreamEvent{ + Eid: 42, Type: StreamEventTypeStdout, Text: "hello", ExecutionCount: 3, @@ -75,6 +76,7 @@ func TestServerStreamEventToJSON(t *testing.T) { data := event.ToJSON() var decoded ServerStreamEvent require.NoError(t, json.Unmarshal(data, &decoded)) + require.Equal(t, event.Eid, decoded.Eid) require.Equal(t, event.Type, decoded.Type) require.Equal(t, event.Text, decoded.Text) require.Equal(t, event.ExecutionCount, decoded.ExecutionCount) @@ -90,11 +92,12 @@ func TestServerStreamEventSummary(t *testing.T) { { name: "basic stdout", event: ServerStreamEvent{ + Eid: 7, Type: StreamEventTypeStdout, Text: "hello", ExecutionCount: 2, }, - contains: []string{"type=stdout", "text=hello"}, + contains: []string{"type=stdout", "eid=7", "text=hello"}, }, { name: "truncated text and error", diff --git a/components/execd/pkg/web/model/command.go b/components/execd/pkg/web/model/command.go index 0d35aa823..8876eaa06 100644 --- a/components/execd/pkg/web/model/command.go +++ b/components/execd/pkg/web/model/command.go @@ -14,7 +14,38 @@ package model -import "time" +import ( + "errors" + "time" + + "github.com/go-playground/validator/v10" +) + +const CommandResumeAfterEidQuery = "after_eid" + +// RunCommandRequest represents a shell command execution request. +type RunCommandRequest struct { + Command string `json:"command" validate:"required"` + Cwd string `json:"cwd,omitempty"` + Background bool `json:"background,omitempty"` + // TimeoutMs caps execution duration; 0 uses server default. + TimeoutMs int64 `json:"timeout,omitempty" validate:"omitempty,gte=1"` + + Uid *uint32 `json:"uid,omitempty"` + Gid *uint32 `json:"gid,omitempty"` + Envs map[string]string `json:"envs,omitempty"` +} + +func (r *RunCommandRequest) Validate() error { + validate := validator.New() + if err := validate.Struct(r); err != nil { + return err + } + if r.Gid != nil && r.Uid == nil { + return errors.New("uid is required when gid is provided") + } + return nil +} // CommandStatusResponse represents command status for REST APIs. type CommandStatusResponse struct { diff --git a/components/execd/pkg/web/model/error.go b/components/execd/pkg/web/model/error.go index 46c21ac5c..22ae50e23 100644 --- a/components/execd/pkg/web/model/error.go +++ b/components/execd/pkg/web/model/error.go @@ -26,6 +26,7 @@ const ( ErrorCodeFileNotFound ErrorCode = "FILE_NOT_FOUND" ErrorCodeUnknown ErrorCode = "UNKNOWN" ErrorCodeContextNotFound ErrorCode = "CONTEXT_NOT_FOUND" + ErrorCodeNotImplemented ErrorCode = "NOT_IMPLEMENTED" ErrorCodeNotSupported ErrorCode = "NOT_SUPPORTED" ) diff --git a/components/execd/pkg/web/router.go b/components/execd/pkg/web/router.go index 47f1931e5..665f4b170 100644 --- a/components/execd/pkg/web/router.go +++ b/components/execd/pkg/web/router.go @@ -74,6 +74,7 @@ func NewRouter(accessToken string) *gin.Engine { command.POST("", withCode(func(c *controller.CodeInterpretingController) { c.RunCommand() })) command.DELETE("", withCode(func(c *controller.CodeInterpretingController) { c.InterruptCommand() })) command.GET("/status/:id", withCode(func(c *controller.CodeInterpretingController) { c.GetCommandStatus() })) + command.GET("/:id/resume", withCode(func(c *controller.CodeInterpretingController) { c.ResumeCommandStream() })) command.GET("/:id/logs", withCode(func(c *controller.CodeInterpretingController) { c.GetBackgroundCommandOutput() })) } diff --git a/components/execd/tests/command_resume_test.py b/components/execd/tests/command_resume_test.py new file mode 100644 index 000000000..4bbc2d74a --- /dev/null +++ b/components/execd/tests/command_resume_test.py @@ -0,0 +1,274 @@ +#!/usr/bin/env python3 +# Copyright 2026 Alibaba Group Holding Ltd. +# +# Manual local test: POST /command (streaming output) -> disconnect -> GET /resume (catch-up + live tail), +# repeat at least 3 disconnect/resume rounds, then read until execution_complete on the last connection. +# +# Configure EXECD_URL (and optional EXECD_TOKEN). Examples: +# EXECD_URL=localhost:44772 +# EXECD_URL=https://remote.example +# python3 components/execd/tests/command_resume_test.py + +from __future__ import annotations + +import http.client +import json +import os +import ssl +import sys +import urllib.parse +from typing import Any + +API_ACCESS_TOKEN_HEADER = "X-EXECD-ACCESS-TOKEN" + + +class RunCollector: + """Aggregates stdout and execution_complete across connections for final assertions.""" + + def __init__(self) -> None: + self.stdout_by_eid: dict[int, str] = {} + self.primary_stdout_lines = 0 + self.resume_stdout_lines = 0 + self.saw_complete = False + + def record(self, tag: str, ev: dict[str, Any]) -> None: + t = ev.get("type") + if t == "execution_complete": + self.saw_complete = True + return + if t != "stdout": + return + eid = int(ev.get("eid") or 0) + txt = (ev.get("text") or "").strip() + if eid in self.stdout_by_eid: + assert self.stdout_by_eid[eid] == txt, ( + f"duplicate eid {eid} with different text: {self.stdout_by_eid[eid]!r} vs {txt!r}" + ) + else: + self.stdout_by_eid[eid] = txt + if tag == "primary": + self.primary_stdout_lines += 1 + elif tag.startswith("resume"): + self.resume_stdout_lines += 1 + + def assert_ok(self) -> None: + assert self.saw_complete, "expected execution_complete" + assert self.resume_stdout_lines > 0, ( + "resume delivered no stdout lines; disconnect resume may not be working (check 409, STDOUT_PER_CHOP)" + ) + assert len(self.stdout_by_eid) == OUTPUT_LINES, ( + f"expected {OUTPUT_LINES} stdout lines, got distinct eid count={len(self.stdout_by_eid)}" + ) + for n in range(1, OUTPUT_LINES + 1): + assert n in self.stdout_by_eid, f"missing eid={n}" + assert self.stdout_by_eid[n] == f"tick{n}", ( + f"eid={n} text should be tick{n}, got {self.stdout_by_eid[n]!r}" + ) + assert self.primary_stdout_lines >= 1, "primary connection should receive at least one stdout line" + assert self.primary_stdout_lines + self.resume_stdout_lines == OUTPUT_LINES, ( + "primary + resume stdout line counts should equal total stdout lines (each line counted once): " + f"primary={self.primary_stdout_lines} resume={self.resume_stdout_lines} expected={OUTPUT_LINES}" + ) + print( + "ASSERT ok: execution_complete + resume delivered output + tick1..tick" + + str(OUTPUT_LINES) + + " with eid 1.." + + str(OUTPUT_LINES) + + " complete", + flush=True, + ) + +# Execd base URL (host:port is ok; http:// is prepended if missing). +EXECD_URL = os.environ.get("EXECD_URL", "http://127.0.0.1:44772") +if "://" not in EXECD_URL: + EXECD_URL = "http://" + EXECD_URL + +TOKEN = os.environ.get("EXECD_TOKEN", "") + +# Close each connection after this many stdout lines (three disconnect/resume rounds before the final read). +STDOUT_PER_CHOP = 15 + +# One primary disconnect plus (RESUME_CHOPS - 1) partial resume disconnects; last resume reads until complete. +RESUME_CHOPS = 3 + +# Bounded output: sleep 0.1s between lines, OUTPUT_LINES total; wall time ~ OUTPUT_LINES * 0.1s. +OUTPUT_LINES = 200 + +TIMEOUT_MS = 300_000 + +COMMAND = ( + "sh -c 'n=0; while [ \"$n\" -lt " + + str(OUTPUT_LINES) + + " ]; do n=$((n+1)); echo tick$n; sleep 0.1; done'" +) + + +def parse_frames(buf: bytes) -> tuple[list[dict[str, Any]], bytes]: + out: list[dict[str, Any]] = [] + while True: + i = buf.find(b"\n\n") + if i < 0: + return out, buf + raw = buf[:i].strip() + buf = buf[i + 2 :] + if not raw: + continue + try: + out.append(json.loads(raw.decode("utf-8"))) + except (json.JSONDecodeError, UnicodeDecodeError): + pass + + +def connect(scheme: str, host: str, port: int) -> http.client.HTTPConnection: + if scheme == "https": + return http.client.HTTPSConnection( + host, port, timeout=600, context=ssl.create_default_context() + ) + return http.client.HTTPConnection(host, port, timeout=600) + + +def parse_url(base: str) -> tuple[str, str, int, str]: + u = urllib.parse.urlparse(base.rstrip("/")) + scheme = (u.scheme or "http").lower() + host = u.hostname or "127.0.0.1" + port = u.port or (443 if scheme == "https" else 80) + return scheme, host, port, u.path or "" + + +def path_join(prefix: str, p: str) -> str: + if not prefix: + return p if p.startswith("/") else "/" + p + return prefix.rstrip("/") + (p if p.startswith("/") else "/" + p) + + +def headers() -> dict[str, str]: + h: dict[str, str] = { + "Content-Type": "application/json", + "Accept": "text/event-stream", + } + if TOKEN: + h[API_ACCESS_TOKEN_HEADER] = TOKEN + return h + + +def pump( + resp: http.client.HTTPResponse, + tag: str, + max_eid: int, + *, + stop_after_stdout: int | None, + collector: RunCollector | None = None, +) -> tuple[int, bool, str | None]: + """Read SSE; update max_eid. If stop_after_stdout is a number, stop after that many stdout lines; if None, read until execution_complete.""" + buf = b"" + cmd_id: str | None = None + stdout_n = 0 + complete = False + while True: + chunk = resp.read(8192) + if not chunk: + break + buf += chunk + frames, buf = parse_frames(buf) + for ev in frames: + if collector is not None: + collector.record(tag, ev) + t = ev.get("type") + if t == "init": + cmd_id = ev.get("text") + print(f"[{tag}] init id={cmd_id}") + elif t in ("stdout", "stderr"): + eid = int(ev.get("eid") or 0) + max_eid = max(max_eid, eid) + txt = ev.get("text", "") + print(f"[{tag}] {t} eid={eid} {txt!r}") + if t == "stdout": + stdout_n += 1 + elif t == "execution_complete": + complete = True + print(f"[{tag}] execution_complete ms={ev.get('execution_time')}") + elif t != "ping": + print(f"[{tag}] {t}") + + if complete: + return max_eid, True, cmd_id + if stop_after_stdout is not None and stdout_n >= stop_after_stdout: + return max_eid, False, cmd_id + return max_eid, complete, cmd_id + + +def main() -> int: + scheme, host, port, prefix = parse_url(EXECD_URL) + h = headers() + cmd_path = path_join(prefix, "/command") + resume_tmpl = path_join(prefix, "/command/{id}/resume") + collector = RunCollector() + + body = json.dumps({"command": COMMAND, "timeout": TIMEOUT_MS}) + conn = connect(scheme, host, port) + conn.request("POST", cmd_path, body.encode("utf-8"), h) + r = conn.getresponse() + if r.status != 200: + print(f"POST /command HTTP {r.status}", r.read().decode("utf-8", "replace"), file=sys.stderr) + conn.close() + return 1 + + max_eid = 0 + max_eid, done, cid = pump( + r, + "primary", + max_eid, + stop_after_stdout=STDOUT_PER_CHOP, + collector=collector, + ) + conn.close() + if not cid: + print("no init", file=sys.stderr) + return 1 + if done: + print("command finished on primary connection (unexpected)", file=sys.stderr) + return 0 + + for round_i in range(RESUME_CHOPS): + path = resume_tmpl.format(id=cid) + f"?after_eid={max_eid}" + tag = f"resume{round_i + 1}" + c2 = connect(scheme, host, port) + c2.request("GET", path, headers=h) + r2 = c2.getresponse() + if r2.status == 409: + print( + f"{tag} HTTP 409: primary SSE still active; retry later or increase STDOUT_PER_CHOP", + file=sys.stderr, + ) + print(r2.read().decode("utf-8", "replace"), file=sys.stderr) + c2.close() + return 1 + if r2.status != 200: + print(f"{tag} HTTP {r2.status}", r2.read().decode("utf-8", "replace"), file=sys.stderr) + c2.close() + return 1 + + last = round_i == RESUME_CHOPS - 1 + max_eid, done, _ = pump( + r2, + tag, + max_eid, + stop_after_stdout=None if last else STDOUT_PER_CHOP, + collector=collector, + ) + c2.close() + if done: + try: + collector.assert_ok() + except AssertionError as e: + print(f"ASSERT failed: {e}", file=sys.stderr) + return 1 + print("done.") + return 0 + + print("done (unexpected: should have completed in last resume)", file=sys.stderr) + return 1 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/specs/execd-api.yaml b/specs/execd-api.yaml index 2a46a484b..229b8a7d2 100644 --- a/specs/execd-api.yaml +++ b/specs/execd-api.yaml @@ -502,6 +502,56 @@ paths: "500": $ref: "#/components/responses/InternalServerError" + /command/{id}/resume: + get: + summary: Resume command SSE stream (replay and optional live tail) + description: | + Replays stdout/stderr events from the server-side ring buffer for events with + `eid` strictly greater than `after_eid`, then—if the command is still running and + no other client holds the primary SSE slot—continues streaming live events until + completion or client disconnect. Event shape matches `POST /command` (`ServerStreamEvent`). + + This endpoint is mutually exclusive with the primary `POST /command` SSE: if that + connection is still active, the server responds with 409 Conflict. + operationId: resumeCommandStream + tags: + - Command + parameters: + - name: id + in: path + required: true + description: Command ID returned by RunCommand + schema: + type: string + example: cmd-abc123 + - name: after_eid + in: query + required: false + description: | + Only events with `eid` greater than this value are replayed from the buffer first + (then optional live tail). Omit or use `0` to replay from the oldest buffered events. + schema: + type: integer + format: int64 + minimum: 0 + default: 0 + example: 42 + responses: + "200": + description: Stream of command execution events (replay then optional live continuation) + content: + text/event-stream: + schema: + $ref: "#/components/schemas/ServerStreamEvent" + "400": + $ref: "#/components/responses/BadRequest" + "404": + $ref: "#/components/responses/NotFound" + "409": + $ref: "#/components/responses/Conflict" + "500": + $ref: "#/components/responses/InternalServerError" + /command/{id}/logs: get: summary: Get background command stdout/stderr (non-streamed) @@ -1396,6 +1446,16 @@ components: code: FILE_NOT_FOUND message: "file not found" + Conflict: + description: Request conflicts with current server state (e.g. resource in use) + content: + application/json: + schema: + $ref: "#/components/schemas/ErrorResponse" + example: + code: INVALID_REQUEST_BODY + message: "primary SSE stream is still active; disconnect it before resuming" + InternalServerError: description: Runtime server error during operation content: