Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/execd-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ jobs:

sleep 5
python3 tests/smoke_api.py
python3 tests/command_resume_test.py
- name: Show logs
if: always()
run: |
Expand Down
1 change: 1 addition & 0 deletions components/execd/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ English | [中文](README_zh.md)
- Proper signal forwarding with process groups
- Real-time stdout/stderr streaming
- Context-aware interruption
- **SSE resume (foreground `POST /command` and `POST /code`)**: stdout/stderr are copied to a bounded ring buffer while the primary SSE is active. If the client disconnects, **`GET /command/{id}/resume?after_eid=<eid>`** replays buffered events (`eid > after_eid`) and may attach as the sole live consumer if the command is still running. An active primary stream returns **`409 Conflict`**. See `specs/execd-api.yaml`.

### Filesystem

Expand Down
1 change: 1 addition & 0 deletions components/execd/README_zh.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
- 通过进程组管理正确转发信号
- 实时 stdout/stderr 流式输出
- 支持上下文感知的中断
- **SSE 断线续传(前台 `POST /command` / `POST /code`)**:主 SSE 存活期间 stdout/stderr 会写入有界环形缓冲;客户端断开后可用 **`GET /command/{id}/resume?after_eid=<eid>`** 按 `eid` 重放并可在命令仍运行时独占续传;主连接仍占用时返回 **`409 Conflict`**。OpenAPI 见 `specs/execd-api.yaml`。

### 文件系统

Expand Down
32 changes: 16 additions & 16 deletions components/execd/RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ Thanks to these contributors ❤️
- @csdbianhua

---
- Docker Hub: opensandbox/execd:v1.0.9
- Aliyun Registry: sandbox-registry.cn-zhangjiakou.cr.aliyuncs.com/opensandbox/execd:v1.0.9
- Docker Hub: opensandbox/execd:v1.0.8
- Aliyun Registry: sandbox-registry.cn-zhangjiakou.cr.aliyuncs.com/opensandbox/execd:v1.0.8

# components/execd 1.0.7

Expand Down Expand Up @@ -70,8 +70,8 @@ Thanks to these contributors ❤️
- @dependabot

---
- Docker Hub: opensandbox/execd:v1.0.9
- Aliyun Registry: sandbox-registry.cn-zhangjiakou.cr.aliyuncs.com/opensandbox/execd:v1.0.9
- Docker Hub: opensandbox/execd:v1.0.7
- Aliyun Registry: sandbox-registry.cn-zhangjiakou.cr.aliyuncs.com/opensandbox/execd:v1.0.7

# components/execd 1.0.6

Expand All @@ -93,8 +93,8 @@ Thanks to these contributors ❤️
- @dependabot

---
- Docker Hub: opensandbox/execd:v1.0.9
- Aliyun Registry: sandbox-registry.cn-zhangjiakou.cr.aliyuncs.com/opensandbox/execd:v1.0.9
- Docker Hub: opensandbox/execd:v1.0.6
- Aliyun Registry: sandbox-registry.cn-zhangjiakou.cr.aliyuncs.com/opensandbox/execd:v1.0.6

# components/execd 1.0.5

Expand All @@ -111,8 +111,8 @@ Thanks to these contributors ❤️
- @Pangjiping

---
- Docker Hub: opensandbox/execd:v1.0.9
- Aliyun Registry: sandbox-registry.cn-zhangjiakou.cr.aliyuncs.com/opensandbox/execd:v1.0.9
- Docker Hub: opensandbox/execd:v1.0.5
- Aliyun Registry: sandbox-registry.cn-zhangjiakou.cr.aliyuncs.com/opensandbox/execd:v1.0.5

# components/execd 1.0.4

Expand All @@ -135,8 +135,8 @@ Thanks to these contributors ❤️
- @ninan-nn

---
- Docker Hub: opensandbox/execd:v1.0.9
- Aliyun Registry: sandbox-registry.cn-zhangjiakou.cr.aliyuncs.com/opensandbox/execd:v1.0.9
- Docker Hub: opensandbox/execd:v1.0.4
- Aliyun Registry: sandbox-registry.cn-zhangjiakou.cr.aliyuncs.com/opensandbox/execd:v1.0.4

# components/execd 1.0.3

Expand All @@ -159,8 +159,8 @@ Thanks to these contributors ❤️
- @jwx0925

---
- Docker Hub: opensandbox/execd:v1.0.9
- Aliyun Registry: sandbox-registry.cn-zhangjiakou.cr.aliyuncs.com/opensandbox/execd:v1.0.9
- Docker Hub: opensandbox/execd:v1.0.3
- Aliyun Registry: sandbox-registry.cn-zhangjiakou.cr.aliyuncs.com/opensandbox/execd:v1.0.3

# components/execd 1.0.2

Expand Down Expand Up @@ -188,8 +188,8 @@ Thanks to these contributors ❤️
- @ninan-nn

---
- Docker Hub: opensandbox/execd:v1.0.9
- Aliyun Registry: sandbox-registry.cn-zhangjiakou.cr.aliyuncs.com/opensandbox/execd:v1.0.9
- Docker Hub: opensandbox/execd:v1.0.2
- Aliyun Registry: sandbox-registry.cn-zhangjiakou.cr.aliyuncs.com/opensandbox/execd:v1.0.2

# components/execd 1.0.1

Expand Down Expand Up @@ -221,8 +221,8 @@ Thanks to these contributors ❤️
- @jwx0925

---
- Docker Hub: opensandbox/execd:v1.0.9
- Aliyun Registry: sandbox-registry.cn-zhangjiakou.cr.aliyuncs.com/opensandbox/execd:v1.0.9
- Docker Hub: opensandbox/execd:v1.0.1
- Aliyun Registry: sandbox-registry.cn-zhangjiakou.cr.aliyuncs.com/opensandbox/execd:v1.0.1

# components/execd 1.0.0

Expand Down
2 changes: 1 addition & 1 deletion components/execd/pkg/runtime/bash_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ func (s *bashSession) run(ctx context.Context, request *ExecuteCodeRequest) erro
continue
}
if request.Hooks.OnExecuteStdout != nil {
request.Hooks.OnExecuteStdout(line)
request.Hooks.OnExecuteStdout(request.nextStdoutStderrEventID(), line)
}
}
}
Expand Down
20 changes: 10 additions & 10 deletions components/execd/pkg/runtime/bash_session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func TestBashSession_NonZeroExitEmitsError(t *testing.T) {
Timeout: 5 * time.Second,
Hooks: ExecuteResultHook{
OnExecuteInit: func(s string) { sessionID = s },
OnExecuteStdout: func(s string) { stdoutLine = s },
OnExecuteStdout: func(_ int64, s string) { stdoutLine = s },
OnExecuteError: func(err *execute.ErrorOutput) { errCh <- err },
OnExecuteComplete: func(_ time.Duration) {
completeCh <- struct{}{}
Expand Down Expand Up @@ -104,7 +104,7 @@ func TestBashSession_envAndExitCode(t *testing.T) {
require.Equal(t, session.config.Session, ctx, "unexpected session in OnExecuteInit")
initCalls++
},
OnExecuteStdout: func(text string) {
OnExecuteStdout: func(_ int64, text string) {
t.Log(text)
stdoutLines = append(stdoutLines, text)
},
Expand Down Expand Up @@ -177,7 +177,7 @@ func TestBashSession_envLargeOutputChained(t *testing.T) {
require.Equal(t, session.config.Session, ctx, "unexpected session in OnExecuteInit")
initCalls++
},
OnExecuteStdout: func(text string) {
OnExecuteStdout: func(_ int64, text string) {
t.Log(text)
stdoutLines = append(stdoutLines, text)
},
Expand Down Expand Up @@ -222,7 +222,7 @@ func TestBashSession_cwdPersistsWithoutOverride(t *testing.T) {
targetDir := t.TempDir()
var stdoutLines []string
hooks := ExecuteResultHook{
OnExecuteStdout: func(line string) {
OnExecuteStdout: func(_ int64, line string) {
stdoutLines = append(stdoutLines, line)
},
}
Expand Down Expand Up @@ -264,7 +264,7 @@ func TestBashSession_requestCwdOverridesAfterCd(t *testing.T) {

var stdoutLines []string
hooks := ExecuteResultHook{
OnExecuteStdout: func(line string) {
OnExecuteStdout: func(_ int64, line string) {
stdoutLines = append(stdoutLines, line)
},
}
Expand Down Expand Up @@ -306,7 +306,7 @@ func TestBashSession_envDumpNotLeakedWhenNoTrailingNewline(t *testing.T) {

var stdoutLines []string
hooks := ExecuteResultHook{
OnExecuteStdout: func(line string) {
OnExecuteStdout: func(_ int64, line string) {
stdoutLines = append(stdoutLines, line)
},
}
Expand Down Expand Up @@ -334,7 +334,7 @@ func TestBashSession_envDumpNotLeakedWhenNoOutput(t *testing.T) {

var stdoutLines []string
hooks := ExecuteResultHook{
OnExecuteStdout: func(line string) {
OnExecuteStdout: func(_ int64, line string) {
stdoutLines = append(stdoutLines, line)
},
}
Expand Down Expand Up @@ -365,7 +365,7 @@ func TestBashSession_heredoc(t *testing.T) {
t.Cleanup(func() { _ = controller.DeleteBashSession(sessionID) })

hooks := ExecuteResultHook{
OnExecuteStdout: func(line string) {
OnExecuteStdout: func(_ int64, line string) {
fmt.Printf("[stdout] %s\n", line)
},
OnExecuteComplete: func(d time.Duration) {
Expand Down Expand Up @@ -418,7 +418,7 @@ func TestBashSession_execReplacesShell(t *testing.T) {

var stdoutLines []string
hooks := ExecuteResultHook{
OnExecuteStdout: func(line string) {
OnExecuteStdout: func(_ int64, line string) {
stdoutLines = append(stdoutLines, line)
},
}
Expand Down Expand Up @@ -458,7 +458,7 @@ func TestBashSession_complexExec(t *testing.T) {

var stdoutLines []string
hooks := ExecuteResultHook{
OnExecuteStdout: func(line string) {
OnExecuteStdout: func(_ int64, line string) {
stdoutLines = append(stdoutLines, line)
},
}
Expand Down
4 changes: 2 additions & 2 deletions components/execd/pkg/runtime/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,11 +128,11 @@ func (c *Controller) runCommand(ctx context.Context, request *ExecuteCodeRequest
wg.Add(2)
safego.Go(func() {
defer wg.Done()
c.tailStdPipe(stdoutPath, request.Hooks.OnExecuteStdout, done)
c.tailStdPipe(stdoutPath, request.wrapStdoutPipeHook(), done)
})
safego.Go(func() {
defer wg.Done()
c.tailStdPipe(stderrPath, request.Hooks.OnExecuteStderr, done)
c.tailStdPipe(stderrPath, request.wrapStderrPipeHook(), done)
})

err = cmd.Start()
Expand Down
8 changes: 4 additions & 4 deletions components/execd/pkg/runtime/command_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,10 @@ func TestRunCommand_Echo(t *testing.T) {
Timeout: 5 * time.Second,
Hooks: ExecuteResultHook{
OnExecuteInit: func(s string) { sessionID = s },
OnExecuteStdout: func(s string) {
OnExecuteStdout: func(_ int64, s string) {
stdoutLines = append(stdoutLines, s)
},
OnExecuteStderr: func(s string) {
OnExecuteStderr: func(_ int64, s string) {
stderrLines = append(stderrLines, s)
},
OnExecuteError: func(err *execute.ErrorOutput) {
Expand Down Expand Up @@ -188,8 +188,8 @@ func TestRunCommand_Error(t *testing.T) {
Timeout: 5 * time.Second,
Hooks: ExecuteResultHook{
OnExecuteInit: func(s string) { sessionID = s },
OnExecuteStdout: func(s string) { stdoutLines = append(stdoutLines, s) },
OnExecuteStderr: func(s string) { stderrLines = append(stderrLines, s) },
OnExecuteStdout: func(_ int64, s string) { stdoutLines = append(stdoutLines, s) },
OnExecuteStderr: func(_ int64, s string) { stderrLines = append(stderrLines, s) },
OnExecuteError: func(err *execute.ErrorOutput) {
gotErr = err
completeCh <- struct{}{}
Expand Down
4 changes: 2 additions & 2 deletions components/execd/pkg/runtime/command_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,10 @@ func (c *Controller) runCommand(ctx context.Context, request *ExecuteCodeRequest

done := make(chan struct{}, 1)
safego.Go(func() {
c.tailStdPipe(c.stdoutFileName(session), request.Hooks.OnExecuteStdout, done)
c.tailStdPipe(c.stdoutFileName(session), request.wrapStdoutPipeHook(), done)
})
safego.Go(func() {
c.tailStdPipe(c.stderrFileName(session), request.Hooks.OnExecuteStderr, done)
c.tailStdPipe(c.stderrFileName(session), request.wrapStderrPipeHook(), done)
})

err = cmd.Start()
Expand Down
11 changes: 9 additions & 2 deletions components/execd/pkg/runtime/jupyter.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,16 @@ func dispatchExecutionResultHooks(request *ExecuteCodeRequest, result *execute.E
for _, stream := range result.Stream {
switch stream.Name {
case execute.StreamStdout:
request.Hooks.OnExecuteStdout(stream.Text)
if stream.Text != "" && request.Hooks.OnExecuteStdout != nil {
eid := request.nextStdoutStderrEventID()
request.Hooks.OnExecuteStdout(eid, stream.Text)
}
case execute.StreamStderr:
request.Hooks.OnExecuteStderr(stream.Text)
if stream.Text != "" && request.Hooks.OnExecuteStderr != nil {
eid := request.nextStdoutStderrEventID()
request.Hooks.OnExecuteStderr(eid, stream.Text)
}
default:
}
}
}
Expand Down
38 changes: 34 additions & 4 deletions components/execd/pkg/runtime/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package runtime
import (
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/alibaba/opensandbox/execd/pkg/jupyter/execute"
Expand All @@ -27,8 +28,8 @@ type ExecuteResultHook struct {
OnExecuteInit func(context string)
OnExecuteResult func(result map[string]any, count int)
OnExecuteStatus func(status string)
OnExecuteStdout func(stdout string) //nolint:predeclared
OnExecuteStderr func(stderr string) //nolint:predeclared
OnExecuteStdout func(eid int64, stdout string) //nolint:predeclared
OnExecuteStderr func(eid int64, stderr string) //nolint:predeclared
OnExecuteError func(err *execute.ErrorOutput)
OnExecuteComplete func(executionTime time.Duration)
}
Expand All @@ -44,6 +45,35 @@ type ExecuteCodeRequest struct {
Uid *uint32 `json:"uid,omitempty"`
Gid *uint32 `json:"gid,omitempty"`
Hooks ExecuteResultHook

eventSeq atomic.Uint64
}

func (req *ExecuteCodeRequest) nextStdoutStderrEventID() int64 {
if req == nil {
return 0
}
return int64(req.eventSeq.Add(1))
}

func (req *ExecuteCodeRequest) wrapStdoutPipeHook() func(string) {
return func(text string) {
if text == "" || req.Hooks.OnExecuteStdout == nil {
return
}
eid := req.nextStdoutStderrEventID()
req.Hooks.OnExecuteStdout(eid, text)
}
}

func (req *ExecuteCodeRequest) wrapStderrPipeHook() func(string) {
return func(text string) {
if text == "" || req.Hooks.OnExecuteStderr == nil {
return
}
eid := req.nextStdoutStderrEventID()
req.Hooks.OnExecuteStderr(eid, text)
}
}

// SetDefaultHooks installs stdout logging fallbacks for unset hooks.
Expand All @@ -55,10 +85,10 @@ func (req *ExecuteCodeRequest) SetDefaultHooks() {
req.Hooks.OnExecuteStatus = func(status string) { fmt.Printf("OnExecuteStatus: %s\n", status) }
}
if req.Hooks.OnExecuteStdout == nil {
req.Hooks.OnExecuteStdout = func(stdout string) { fmt.Printf("OnExecuteStdout: %s\n", stdout) }
req.Hooks.OnExecuteStdout = func(eid int64, stdout string) { fmt.Printf("OnExecuteStdout: eid=%d %s\n", eid, stdout) }
}
if req.Hooks.OnExecuteStderr == nil {
req.Hooks.OnExecuteStderr = func(stderr string) { fmt.Printf("OnExecuteStderr: %s\n", stderr) }
req.Hooks.OnExecuteStderr = func(eid int64, stderr string) { fmt.Printf("OnExecuteStderr: eid=%d %s\n", eid, stderr) }
}
if req.Hooks.OnExecuteError == nil {
req.Hooks.OnExecuteError = func(err *execute.ErrorOutput) { fmt.Printf("OnExecuteError: %++v\n", err) }
Expand Down
37 changes: 37 additions & 0 deletions components/execd/pkg/sbuf/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright 2026 Alibaba Group Holding Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package sbuf

// Config controls per-stream bounds and append policy.
type Config struct {
// MaxEvents is the maximum number of events retained per stream. Oldest events are dropped when exceeded.
// Zero defaults to DefaultMaxEvents.
MaxEvents int
// MaxBytes is the approximate upper bound on total payload bytes per stream (sum of len(Payload)).
// Oldest events are dropped until under the limit. Zero means no byte limit.
MaxBytes int64
// StrictMonotonic rejects Append when eid <= last eid for that stream. Recommended for execd SSE eids.
StrictMonotonic bool
}

const DefaultMaxEvents = 1024

func (c *Config) normalized() Config {
out := *c
if out.MaxEvents <= 0 {
out.MaxEvents = DefaultMaxEvents
}
return out
}
Loading
Loading