diff --git a/components/egress/go.mod b/components/egress/go.mod index 3e202394c..71003776b 100644 --- a/components/egress/go.mod +++ b/components/egress/go.mod @@ -8,6 +8,7 @@ require ( github.com/stretchr/testify v1.11.1 go.opentelemetry.io/otel v1.41.0 go.opentelemetry.io/otel/metric v1.41.0 + go.uber.org/automaxprocs v1.6.0 golang.org/x/sys v0.41.0 ) @@ -38,6 +39,8 @@ require ( google.golang.org/grpc v1.79.3 // indirect google.golang.org/protobuf v1.36.11 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect + k8s.io/apimachinery v0.34.2 // indirect + k8s.io/klog/v2 v2.130.1 // indirect ) replace github.com/alibaba/opensandbox/internal => ../internal diff --git a/components/egress/go.sum b/components/egress/go.sum index 4e9d88688..04b00defe 100644 --- a/components/egress/go.sum +++ b/components/egress/go.sum @@ -25,6 +25,8 @@ github.com/miekg/dns v1.1.61 h1:nLxbwF3XxhwVSm8g9Dghm9MHPaUZuqhPiGL+675ZmEs= github.com/miekg/dns v1.1.61/go.mod h1:mnAarhS3nWaW+NVP2wTkYVIZyHNJ098SJZUki3eykwQ= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g= +github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U= github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= @@ -45,6 +47,8 @@ go.opentelemetry.io/otel/trace v1.41.0 h1:Vbk2co6bhj8L59ZJ6/xFTskY+tGAbOnCtQGVVa go.opentelemetry.io/otel/trace v1.41.0/go.mod h1:U1NU4ULCoxeDKc09yCWdWe+3QoyweJcISEVa1RBzOis= go.opentelemetry.io/proto/otlp v1.9.0 h1:l706jCMITVouPOqEnii2fIAuO3IVGBRPV5ICjceRb/A= go.opentelemetry.io/proto/otlp v1.9.0/go.mod h1:xE+Cx5E/eEHw+ISFkwPLwCZefwVjY+pqKg1qcK03+/4= +go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs= +go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= @@ -78,3 +82,7 @@ gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntN gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +k8s.io/apimachinery v0.34.2 h1:zQ12Uk3eMHPxrsbUJgNF8bTauTVR2WgqJsTmwTE/NW4= +k8s.io/apimachinery v0.34.2/go.mod h1:/GwIlEcWuTX9zKIg2mbw0LRFIsXwrfoVxn+ef0X13lw= +k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk= +k8s.io/klog/v2 v2.130.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE= diff --git a/components/egress/main.go b/components/egress/main.go index 7492f7894..e5eef88ef 100644 --- a/components/egress/main.go +++ b/components/egress/main.go @@ -23,6 +23,9 @@ import ( "syscall" "time" + _ "github.com/alibaba/opensandbox/internal/safego" + _ "go.uber.org/automaxprocs/maxprocs" + "github.com/alibaba/opensandbox/egress/pkg/constants" "github.com/alibaba/opensandbox/egress/pkg/dnsproxy" "github.com/alibaba/opensandbox/egress/pkg/events" @@ -31,6 +34,7 @@ import ( "github.com/alibaba/opensandbox/egress/pkg/policy" "github.com/alibaba/opensandbox/egress/pkg/telemetry" slogger "github.com/alibaba/opensandbox/internal/logger" + "github.com/alibaba/opensandbox/internal/safego" "github.com/alibaba/opensandbox/internal/version" ) @@ -119,6 +123,7 @@ func withLogger(ctx context.Context) context.Context { base = base.With(extra...) } logger := base.Named("opensandbox.egress") + safego.InitPanicLogger(ctx, logger) return log.WithLogger(ctx, logger) } diff --git a/components/egress/pkg/dnsproxy/proxy.go b/components/egress/pkg/dnsproxy/proxy.go index ee040bce2..162fa0c54 100644 --- a/components/egress/pkg/dnsproxy/proxy.go +++ b/components/egress/pkg/dnsproxy/proxy.go @@ -34,6 +34,7 @@ import ( "github.com/alibaba/opensandbox/egress/pkg/policy" "github.com/alibaba/opensandbox/egress/pkg/telemetry" slogger "github.com/alibaba/opensandbox/internal/logger" + "github.com/alibaba/opensandbox/internal/safego" ) const defaultListenAddr = "127.0.0.1:15353" @@ -122,11 +123,11 @@ func (p *Proxy) Start(ctx context.Context) error { errCh := make(chan error, len(p.servers)) for _, srv := range p.servers { s := srv - go func() { + safego.Go(func() { if err := s.ListenAndServe(); err != nil { errCh <- err } - }() + }) } timer := time.NewTimer(200 * time.Millisecond) @@ -138,7 +139,7 @@ func (p *Proxy) Start(ctx context.Context) error { // listeners bound; start upstream probes only after DNS servers are up } - go p.runUpstreamProbes(ctx) + safego.Go(func() { p.runUpstreamProbes(ctx) }) return nil } diff --git a/components/egress/pkg/dnsproxy/upstream.go b/components/egress/pkg/dnsproxy/upstream.go index b7bf9b50a..5dd5034d7 100644 --- a/components/egress/pkg/dnsproxy/upstream.go +++ b/components/egress/pkg/dnsproxy/upstream.go @@ -24,6 +24,8 @@ import ( "github.com/miekg/dns" + "github.com/alibaba/opensandbox/internal/safego" + "github.com/alibaba/opensandbox/egress/pkg/constants" "github.com/alibaba/opensandbox/egress/pkg/log" ) @@ -97,10 +99,12 @@ func (p *Proxy) probeUpstreams() { var wg sync.WaitGroup for i := range all { wg.Add(1) - go func(idx int, addr string) { + idx := i + addr := all[i] + safego.Go(func() { defer wg.Done() healthy[idx] = p.probeOneUpstream(addr, timeout) - }(i, all[i]) + }) } wg.Wait() diff --git a/components/egress/pkg/events/broadcaster.go b/components/egress/pkg/events/broadcaster.go index 223e1af4f..9f53fd43e 100644 --- a/components/egress/pkg/events/broadcaster.go +++ b/components/egress/pkg/events/broadcaster.go @@ -21,6 +21,7 @@ import ( "time" "github.com/alibaba/opensandbox/egress/pkg/log" + "github.com/alibaba/opensandbox/internal/safego" ) const defaultQueueSize = 128 @@ -76,7 +77,7 @@ func (b *Broadcaster) AddSubscriber(sub Subscriber) { b.subscribers = append(b.subscribers, ch) b.mu.Unlock() - go func() { + safego.Go(func() { for { select { case <-b.ctx.Done(): @@ -88,7 +89,7 @@ func (b *Broadcaster) AddSubscriber(sub Subscriber) { sub.HandleBlocked(b.ctx, ev) } } - }() + }) } // Publish sends an event to all subscribers; drops and logs when a subscriber queue is full. diff --git a/components/egress/policy_server.go b/components/egress/policy_server.go index cc9d381ec..54e08794c 100644 --- a/components/egress/policy_server.go +++ b/components/egress/policy_server.go @@ -30,6 +30,7 @@ import ( "github.com/alibaba/opensandbox/egress/pkg/log" "github.com/alibaba/opensandbox/egress/pkg/nftables" "github.com/alibaba/opensandbox/egress/pkg/policy" + "github.com/alibaba/opensandbox/internal/safego" ) type policyUpdater interface { @@ -81,22 +82,22 @@ func startPolicyServer(proxy policyUpdater, nft nftApplier, enforcementMode stri handler.server = srv errCh := make(chan error, 1) - go func() { + safego.Go(func() { if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { errCh <- err } - }() + }) select { case err := <-errCh: return nil, err case <-time.After(200 * time.Millisecond): // assume healthy start; keep logging future errors - go func() { + safego.Go(func() { if err := <-errCh; err != nil { log.Errorf("policy server error: %v", err) } - }() + }) return srv, nil } } diff --git a/components/execd/DEVELOPMENT.md b/components/execd/DEVELOPMENT.md index f44e2547d..706efe9de 100644 --- a/components/execd/DEVELOPMENT.md +++ b/components/execd/DEVELOPMENT.md @@ -159,7 +159,7 @@ logs.Debug("received event: type=%s", eventType) Always use `safego.Go` to prevent panics: ```go -import "github.com/alibaba/opensandbox/execd/pkg/util/safego" +import "github.com/alibaba/opensandbox/internal/safego" safego.Go(func() { processInBackground() diff --git a/components/execd/go.mod b/components/execd/go.mod index f27e0c550..70b119ac2 100644 --- a/components/execd/go.mod +++ b/components/execd/go.mod @@ -53,7 +53,7 @@ require ( github.com/yusufpapurcu/wmi v1.2.4 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.27.1 // indirect - go.yaml.in/yaml/v2 v2.4.2 // indirect + go.yaml.in/yaml/v2 v2.4.3 // indirect golang.org/x/arch v0.8.0 // indirect golang.org/x/crypto v0.48.0 // indirect golang.org/x/net v0.50.0 // indirect @@ -63,8 +63,8 @@ require ( gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect k8s.io/klog/v2 v2.130.1 // indirect - k8s.io/utils v0.0.0-20250604170112-4c0f3b243397 // indirect - sigs.k8s.io/json v0.0.0-20241014173422-cfa47c3a1cc8 // indirect + k8s.io/utils v0.0.0-20251002143259-bc988d571ff4 // indirect + sigs.k8s.io/json v0.0.0-20250730193827-2d320260d730 // indirect sigs.k8s.io/randfill v1.0.0 // indirect sigs.k8s.io/structured-merge-diff/v6 v6.3.0 // indirect ) diff --git a/components/execd/go.sum b/components/execd/go.sum index 4c7e15e99..732739a94 100644 --- a/components/execd/go.sum +++ b/components/execd/go.sum @@ -125,8 +125,8 @@ go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.27.1 h1:08RqriUEv8+ArZRYSTXy1LeBScaMpVSTBhCeaZYfMYc= go.uber.org/zap v1.27.1/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= -go.yaml.in/yaml/v2 v2.4.2 h1:DzmwEr2rDGHl7lsFgAHxmNz/1NlQ7xLIrlN2h5d1eGI= -go.yaml.in/yaml/v2 v2.4.2/go.mod h1:081UH+NErpNdqlCXm3TtEran0rJZGxAYx9hb/ELlsPU= +go.yaml.in/yaml/v2 v2.4.3 h1:6gvOSjQoTB3vt1l+CU+tSyi/HOjfOjRLJ4YwYZGwRO0= +go.yaml.in/yaml/v2 v2.4.3/go.mod h1:zSxWcmIDjOzPXpjlTTbAsKokqkDNAVtZO0WOMiT90s8= golang.org/x/arch v0.0.0-20210923205945-b76863e36670/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8= golang.org/x/arch v0.8.0 h1:3wRIsP3pM4yUptoR96otTUOXI367OS0+c9eeRi9doIc= golang.org/x/arch v0.8.0/go.mod h1:FEVrYAQjsQXMVJ1nsMoVVXPZg6p2JE2mx8psSWTDQys= @@ -182,12 +182,12 @@ k8s.io/client-go v0.34.2 h1:Co6XiknN+uUZqiddlfAjT68184/37PS4QAzYvQvDR8M= k8s.io/client-go v0.34.2/go.mod h1:2VYDl1XXJsdcAxw7BenFslRQX28Dxz91U9MWKjX97fE= k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk= k8s.io/klog/v2 v2.130.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE= -k8s.io/utils v0.0.0-20250604170112-4c0f3b243397 h1:hwvWFiBzdWw1FhfY1FooPn3kzWuJ8tmbZBHi4zVsl1Y= -k8s.io/utils v0.0.0-20250604170112-4c0f3b243397/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= +k8s.io/utils v0.0.0-20251002143259-bc988d571ff4 h1:SjGebBtkBqHFOli+05xYbK8YF1Dzkbzn+gDM4X9T4Ck= +k8s.io/utils v0.0.0-20251002143259-bc988d571ff4/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= nullprogram.com/x/optparse v1.0.0/go.mod h1:KdyPE+Igbe0jQUrVfMqDMeJQIJZEuyV7pjYmp6pbG50= rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4= -sigs.k8s.io/json v0.0.0-20241014173422-cfa47c3a1cc8 h1:gBQPwqORJ8d8/YNZWEjoZs7npUVDpVXUUOFfW6CgAqE= -sigs.k8s.io/json v0.0.0-20241014173422-cfa47c3a1cc8/go.mod h1:mdzfpAEoE6DHQEN0uh9ZbOCuHbLK5wOm7dK4ctXE9Tg= +sigs.k8s.io/json v0.0.0-20250730193827-2d320260d730 h1:IpInykpT6ceI+QxKBbEflcR5EXP7sU1kvOlxwZh5txg= +sigs.k8s.io/json v0.0.0-20250730193827-2d320260d730/go.mod h1:mdzfpAEoE6DHQEN0uh9ZbOCuHbLK5wOm7dK4ctXE9Tg= sigs.k8s.io/randfill v1.0.0 h1:JfjMILfT8A6RbawdsK2JXGBR5AQVfd+9TbzrlneTyrU= sigs.k8s.io/randfill v1.0.0/go.mod h1:XeLlZ/jmk4i1HRopwe7/aU3H5n1zNUcX6TM94b3QxOY= sigs.k8s.io/structured-merge-diff/v6 v6.3.0 h1:jTijUJbW353oVOd9oTlifJqOGEkUw2jB/fXCbTiQEco= diff --git a/components/execd/main.go b/components/execd/main.go index 79678c1d9..798fb0851 100644 --- a/components/execd/main.go +++ b/components/execd/main.go @@ -20,12 +20,12 @@ import ( "github.com/alibaba/opensandbox/internal/version" + _ "github.com/alibaba/opensandbox/internal/safego" _ "go.uber.org/automaxprocs/maxprocs" "github.com/alibaba/opensandbox/execd/pkg/clone3compat" "github.com/alibaba/opensandbox/execd/pkg/flag" "github.com/alibaba/opensandbox/execd/pkg/log" - _ "github.com/alibaba/opensandbox/execd/pkg/util/safego" "github.com/alibaba/opensandbox/execd/pkg/web" "github.com/alibaba/opensandbox/execd/pkg/web/controller" ) diff --git a/components/execd/pkg/jupyter/execute/execute.go b/components/execd/pkg/jupyter/execute/execute.go index 82ba5df00..7d0e1bfea 100644 --- a/components/execd/pkg/jupyter/execute/execute.go +++ b/components/execd/pkg/jupyter/execute/execute.go @@ -23,6 +23,7 @@ import ( "sync" "time" + "github.com/alibaba/opensandbox/internal/safego" "github.com/google/uuid" "github.com/gorilla/websocket" @@ -90,7 +91,7 @@ func (c *Client) Connect(wsURL string) error { c.registerDefaultHandlers() // Start message receiving goroutine - go c.receiveMessages() + safego.Go(func() { c.receiveMessages() }) return nil } @@ -286,7 +287,7 @@ func (c *Client) handleExecutionStatus(msg *Message, state *streamExecutionState return } state.executeDone = true - go c.finalizeExecution(state, resultChan) + safego.Go(func() { c.finalizeExecution(state, resultChan) }) } func (c *Client) finalizeExecution(state *streamExecutionState, resultChan chan *ExecutionResult) { diff --git a/components/execd/pkg/log/log.go b/components/execd/pkg/log/log.go index 6ced8a7f3..e50626822 100644 --- a/components/execd/pkg/log/log.go +++ b/components/execd/pkg/log/log.go @@ -15,9 +15,11 @@ package log import ( + "context" "os" slogger "github.com/alibaba/opensandbox/internal/logger" + "github.com/alibaba/opensandbox/internal/safego" ) const logFileEnvKey = "EXECD_LOG_FILE" @@ -28,6 +30,7 @@ var current slogger.Logger // Legacy levels: 0/1/2=fatal, 3=error, 4=warn, 5/6=info, 7+=debug. func Init(level int) { current = newLogger(mapLevel(level)) + safego.InitPanicLogger(context.Background(), current) } func mapLevel(level int) string { diff --git a/components/execd/pkg/runtime/bash_session_test.go b/components/execd/pkg/runtime/bash_session_test.go index b18af3de0..d054f0fa5 100644 --- a/components/execd/pkg/runtime/bash_session_test.go +++ b/components/execd/pkg/runtime/bash_session_test.go @@ -29,6 +29,7 @@ import ( "github.com/stretchr/testify/require" "github.com/alibaba/opensandbox/execd/pkg/jupyter/execute" + "github.com/alibaba/opensandbox/internal/safego" ) func TestBashSession_NonZeroExitEmitsError(t *testing.T) { @@ -519,9 +520,9 @@ func TestBashSession_CloseKillsRunningProcess(t *testing.T) { Timeout: 60 * time.Second, Hooks: ExecuteResultHook{}, } - go func() { + safego.Go(func() { runDone <- session.run(context.Background(), req) - }() + }) // Give the child process time to start. time.Sleep(200 * time.Millisecond) @@ -557,9 +558,9 @@ func TestBashSession_DeleteBashSessionKillsRunningProcess(t *testing.T) { Timeout: 60 * time.Second, Hooks: ExecuteResultHook{}, } - go func() { + safego.Go(func() { runDone <- c.RunInBashSession(context.Background(), req) - }() + }) time.Sleep(200 * time.Millisecond) @@ -585,10 +586,10 @@ func TestBashSession_CloseWithNoActiveRun(t *testing.T) { require.NoError(t, session.start()) done := make(chan struct{}, 1) - go func() { + safego.Go(func() { _ = session.close() done <- struct{}{} - }() + }) select { case <-done: diff --git a/components/execd/pkg/runtime/command.go b/components/execd/pkg/runtime/command.go index 208b541ab..65fdc05ba 100644 --- a/components/execd/pkg/runtime/command.go +++ b/components/execd/pkg/runtime/command.go @@ -30,9 +30,10 @@ import ( "syscall" "time" + "github.com/alibaba/opensandbox/internal/safego" + "github.com/alibaba/opensandbox/execd/pkg/jupyter/execute" "github.com/alibaba/opensandbox/execd/pkg/log" - "github.com/alibaba/opensandbox/execd/pkg/util/safego" ) // getShell returns the preferred shell, falling back to sh if bash is not available. @@ -157,7 +158,7 @@ func (c *Controller) runCommand(ctx context.Context, request *ExecuteCodeRequest c.storeCommandKernel(session, kernel) request.Hooks.OnExecuteInit(session) - go func() { + safego.Go(func() { for { select { case <-ctx.Done(): @@ -172,7 +173,7 @@ func (c *Controller) runCommand(ctx context.Context, request *ExecuteCodeRequest } } } - }() + }) err = cmd.Wait() close(done) diff --git a/components/execd/pkg/runtime/command_windows.go b/components/execd/pkg/runtime/command_windows.go index 888bd5e89..4938520d0 100644 --- a/components/execd/pkg/runtime/command_windows.go +++ b/components/execd/pkg/runtime/command_windows.go @@ -28,7 +28,7 @@ import ( "github.com/alibaba/opensandbox/execd/pkg/jupyter/execute" "github.com/alibaba/opensandbox/execd/pkg/log" - "github.com/alibaba/opensandbox/execd/pkg/util/safego" + "github.com/alibaba/opensandbox/internal/safego" ) // runCommand executes shell commands and streams their output on Windows. diff --git a/components/execd/pkg/runtime/interrupt.go b/components/execd/pkg/runtime/interrupt.go index 67902a3d6..b9cd2a545 100644 --- a/components/execd/pkg/runtime/interrupt.go +++ b/components/execd/pkg/runtime/interrupt.go @@ -25,6 +25,8 @@ import ( "syscall" "time" + "github.com/alibaba/opensandbox/internal/safego" + "github.com/alibaba/opensandbox/execd/pkg/log" ) @@ -60,10 +62,10 @@ func (c *Controller) killPid(pid int) error { log.Warning("SIGTERM failed for pid %d: %v, trying SIGKILL", pid, err) } else { done := make(chan error, 1) - go func() { + safego.Go(func() { _, err := process.Wait() done <- err - }() + }) select { case err := <-done: diff --git a/components/execd/pkg/runtime/interrupt_windows.go b/components/execd/pkg/runtime/interrupt_windows.go index 611c7ea7d..6e1044d77 100644 --- a/components/execd/pkg/runtime/interrupt_windows.go +++ b/components/execd/pkg/runtime/interrupt_windows.go @@ -24,6 +24,7 @@ import ( "time" "github.com/alibaba/opensandbox/execd/pkg/log" + "github.com/alibaba/opensandbox/internal/safego" ) // Interrupt stops execution in the specified session. @@ -55,10 +56,10 @@ func (c *Controller) killPid(pid int) error { // Best-effort wait to reduce zombies; os.Process.Wait only works for child processes. done := make(chan error, 1) - go func() { + safego.Go(func() { _, err := process.Wait() done <- err - }() + }) select { case <-done: diff --git a/components/execd/pkg/runtime/pty_session.go b/components/execd/pkg/runtime/pty_session.go index 74a0e38ef..453004f7c 100644 --- a/components/execd/pkg/runtime/pty_session.go +++ b/components/execd/pkg/runtime/pty_session.go @@ -27,6 +27,7 @@ import ( "sync/atomic" "syscall" + "github.com/alibaba/opensandbox/internal/safego" "github.com/creack/pty" "github.com/alibaba/opensandbox/execd/pkg/log" @@ -183,8 +184,8 @@ func (s *ptySession) StartPTY() error { s.doneCh = make(chan struct{}) s.stdin = ptmx // write to the PTY master to feed stdin - go s.broadcastPTY() - go s.waitAndExit(cmd, ptmx) + safego.Go(func() { s.broadcastPTY() }) + safego.Go(func() { s.waitAndExit(cmd, ptmx) }) return nil } @@ -251,9 +252,9 @@ func (s *ptySession) StartPipe() error { s.doneCh = make(chan struct{}) s.stdin = stdinW - go s.broadcastPipe(stdoutR, true) - go s.broadcastPipe(stderrR, false) - go s.waitAndExitPipe(cmd, stdinW, stdoutR, stderrR) + safego.Go(func() { s.broadcastPipe(stdoutR, true) }) + safego.Go(func() { s.broadcastPipe(stderrR, false) }) + safego.Go(func() { s.waitAndExitPipe(cmd, stdinW, stdoutR, stderrR) }) return nil } diff --git a/components/execd/pkg/runtime/pty_session_test.go b/components/execd/pkg/runtime/pty_session_test.go index 52ad1f8a2..029bb74be 100644 --- a/components/execd/pkg/runtime/pty_session_test.go +++ b/components/execd/pkg/runtime/pty_session_test.go @@ -27,6 +27,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "github.com/alibaba/opensandbox/internal/safego" ) // replayContains polls the replay buffer until it contains substr or timeout expires. @@ -54,7 +56,7 @@ func TestPTYSession_BasicExecution(t *testing.T) { stdoutR, _, detach := s.AttachOutput() defer detach() - go io.Copy(io.Discard, stdoutR) //nolint:errcheck + safego.Go(func() { _, _ = io.Copy(io.Discard, stdoutR) }) //nolint:errcheck _, err := s.WriteStdin([]byte("echo hello_pty\n")) require.NoError(t, err) @@ -87,7 +89,7 @@ func TestPTYSession_ResizeWinsize(t *testing.T) { // Attach output so the broadcast goroutine has a sink and fills the replay buffer. stdoutR, _, detach := s.AttachOutput() defer detach() - go io.Copy(io.Discard, stdoutR) //nolint:errcheck + safego.Go(func() { _, _ = io.Copy(io.Discard, stdoutR) }) //nolint:errcheck // Wait for bash to start (prompt appears). time.Sleep(150 * time.Millisecond) @@ -113,7 +115,7 @@ func TestPTYSession_ANSISequences(t *testing.T) { stdoutR, _, detach := s.AttachOutput() defer detach() - go io.Copy(io.Discard, stdoutR) //nolint:errcheck + safego.Go(func() { _, _ = io.Copy(io.Discard, stdoutR) }) //nolint:errcheck // Send printf with explicit ESC bytes via $'\033'. _, err := s.WriteStdin([]byte("printf $'\\033[1;32mGREEN\\033[0m\\n'\n")) @@ -144,20 +146,20 @@ func TestPTYSession_PipeMode(t *testing.T) { require.NotNil(t, stderrR) stdoutCh := make(chan string, 32) - go func() { + safego.Go(func() { sc := bufio.NewScanner(stdoutR) for sc.Scan() { stdoutCh <- sc.Text() } - }() + }) stderrCh := make(chan string, 32) - go func() { + safego.Go(func() { sc := bufio.NewScanner(stderrR) for sc.Scan() { stderrCh <- sc.Text() } - }() + }) _, err := s.WriteStdin([]byte("echo hello_pipe\necho err_pipe >&2\n")) require.NoError(t, err) @@ -179,7 +181,7 @@ func TestPTYSession_ReconnectReplay(t *testing.T) { // First connection — drain output so replay buffer fills. stdoutR1, _, detach1 := s.AttachOutput() - go io.Copy(io.Discard, stdoutR1) //nolint:errcheck + safego.Go(func() { _, _ = io.Copy(io.Discard, stdoutR1) }) //nolint:errcheck _, err := s.WriteStdin([]byte("echo first_output\n")) require.NoError(t, err) @@ -201,7 +203,7 @@ func TestPTYSession_ReconnectReplay(t *testing.T) { // Second connection. stdoutR2, _, detach2 := s.AttachOutput() defer detach2() - go io.Copy(io.Discard, stdoutR2) //nolint:errcheck + safego.Go(func() { _, _ = io.Copy(io.Discard, stdoutR2) }) //nolint:errcheck offsetAfterFirst := int64(len(replay)) @@ -228,7 +230,7 @@ func TestPTYSession_SendSIGINT(t *testing.T) { stdoutR, _, detach := s.AttachOutput() defer detach() - go io.Copy(io.Discard, stdoutR) //nolint:errcheck + safego.Go(func() { _, _ = io.Copy(io.Discard, stdoutR) }) //nolint:errcheck // Start a sleep inside the PTY. _, err := s.WriteStdin([]byte("sleep 30\n")) @@ -274,7 +276,7 @@ func TestPTYSession_ExitCode(t *testing.T) { require.NoError(t, s.StartPTY()) stdoutR, _, detach := s.AttachOutput() - go io.Copy(io.Discard, stdoutR) //nolint:errcheck + safego.Go(func() { _, _ = io.Copy(io.Discard, stdoutR) }) //nolint:errcheck _, _ = s.WriteStdin([]byte("exit 42\n")) diff --git a/components/execd/pkg/web/controller/pty_ws.go b/components/execd/pkg/web/controller/pty_ws.go index 6273557f5..8138304c9 100644 --- a/components/execd/pkg/web/controller/pty_ws.go +++ b/components/execd/pkg/web/controller/pty_ws.go @@ -23,6 +23,7 @@ import ( "sync" "time" + "github.com/alibaba/opensandbox/internal/safego" "github.com/gin-gonic/gin" "github.com/gorilla/websocket" @@ -50,7 +51,7 @@ const ( // 2. Acquire exclusive WS lock → 409 if already held // 3. Upgrade HTTP → WebSocket // 4. Start bash if not already running -// 5+6. AtomicAttachOutputWithSnapshot (snapshot + attach under outMu — no loss window) +// 5+6. AtomicAttachOutputWithSnapshot (snapshot + attach under outMu — no loss window) // 7. defer: detach → pumpWg.Wait → UnlockWS // 8. Send replay frame if snapshot non-empty // 9. Send connected frame @@ -187,21 +188,25 @@ func PTYSessionWebSocket(ctx *gin.Context) { } // 10a. RFC 6455 binary ping goroutine (30 s interval). - go ptyPingLoop(conn, &connMu, cancelCh, cancelOnce) + safego.Go(func() { ptyPingLoop(conn, &connMu, cancelCh, cancelOnce) }) // 10b. Launch stdout pump. pumpWg.Add(1) - go ptyStreamPump(stdoutR, model.BinStdout, "stdout", id, conn, &connMu, &pumpWg, cancelCh, cancelOnce) + safego.Go(func() { + ptyStreamPump(stdoutR, model.BinStdout, "stdout", id, conn, &connMu, &pumpWg, cancelCh, cancelOnce) + }) // 10c. Launch stderr pump (pipe mode only). if stderrR != nil { pumpWg.Add(1) - go ptyStreamPump(stderrR, model.BinStderr, "stderr", id, conn, &connMu, &pumpWg, cancelCh, cancelOnce) + safego.Go(func() { + ptyStreamPump(stderrR, model.BinStderr, "stderr", id, conn, &connMu, &pumpWg, cancelCh, cancelOnce) + }) } // 10d. Exit watcher: waits for the process to exit, then sends exit frame // and closes the WS connection immediately (unblocks ReadJSON in the read loop). - go ptyExitWatcher(session, writeJSON, closeConn, cancelCh, cancelOnce) + safego.Go(func() { ptyExitWatcher(session, writeJSON, closeConn, cancelCh, cancelOnce) }) // 11. Client read loop. ptyClientReadLoop(conn, session, id, writeJSON, cancelCh, cancelOnce) diff --git a/components/execd/pkg/web/controller/sse.go b/components/execd/pkg/web/controller/sse.go index 9e87bda6b..573315260 100644 --- a/components/execd/pkg/web/controller/sse.go +++ b/components/execd/pkg/web/controller/sse.go @@ -20,12 +20,12 @@ import ( "net/http" "time" + "github.com/alibaba/opensandbox/internal/safego" "k8s.io/apimachinery/pkg/util/wait" "github.com/alibaba/opensandbox/execd/pkg/jupyter/execute" "github.com/alibaba/opensandbox/execd/pkg/log" "github.com/alibaba/opensandbox/execd/pkg/runtime" - "github.com/alibaba/opensandbox/execd/pkg/util/safego" "github.com/alibaba/opensandbox/execd/pkg/web/model" ) diff --git a/components/internal/go.mod b/components/internal/go.mod index 36f520872..c6d0da78e 100644 --- a/components/internal/go.mod +++ b/components/internal/go.mod @@ -10,6 +10,7 @@ require ( go.opentelemetry.io/otel/sdk/metric v1.41.0 go.opentelemetry.io/otel/trace v1.41.0 go.uber.org/zap v1.27.1 + k8s.io/apimachinery v0.34.2 ) require ( @@ -29,4 +30,5 @@ require ( google.golang.org/genproto/googleapis/rpc v0.0.0-20260209200024-4cfbd4190f57 // indirect google.golang.org/grpc v1.79.3 // indirect google.golang.org/protobuf v1.36.11 // indirect + k8s.io/klog/v2 v2.130.1 // indirect ) diff --git a/components/internal/go.sum b/components/internal/go.sum index 27436c2c6..765e58a17 100644 --- a/components/internal/go.sum +++ b/components/internal/go.sum @@ -61,3 +61,7 @@ google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBN google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +k8s.io/apimachinery v0.34.2 h1:zQ12Uk3eMHPxrsbUJgNF8bTauTVR2WgqJsTmwTE/NW4= +k8s.io/apimachinery v0.34.2/go.mod h1:/GwIlEcWuTX9zKIg2mbw0LRFIsXwrfoVxn+ef0X13lw= +k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk= +k8s.io/klog/v2 v2.130.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE= diff --git a/components/execd/pkg/util/safego/safe.go b/components/internal/safego/safe.go similarity index 83% rename from components/execd/pkg/util/safego/safe.go rename to components/internal/safego/safe.go index 1f6bb67b9..b2e8515dc 100644 --- a/components/execd/pkg/util/safego/safe.go +++ b/components/internal/safego/safe.go @@ -16,14 +16,14 @@ package safego import ( "context" - "log" "net/http" "runtime" + "github.com/alibaba/opensandbox/internal/logger" runtimeutil "k8s.io/apimachinery/pkg/util/runtime" ) -func InitPanicLogger(_ context.Context) { +func InitPanicLogger(_ context.Context, log logger.Logger) { runtimeutil.PanicHandlers = []func(context.Context, any){ func(_ context.Context, r any) { if r == http.ErrAbortHandler { // nolint:errorlint @@ -34,9 +34,9 @@ func InitPanicLogger(_ context.Context) { stacktrace := make([]byte, size) stacktrace = stacktrace[:runtime.Stack(stacktrace, false)] if _, ok := r.(string); ok { - log.Printf("Observed a panic: %s\n%s", r, stacktrace) + log.Errorf("Observed a panic: %s\n%s", r, stacktrace) } else { - log.Printf("Observed a panic: %#v (%v)\n%s", r, r, stacktrace) + log.Errorf("Observed a panic: %#v (%v)\n%s", r, r, stacktrace) } }, } diff --git a/components/execd/pkg/util/safego/safe_test.go b/components/internal/safego/safe_test.go similarity index 88% rename from components/execd/pkg/util/safego/safe_test.go rename to components/internal/safego/safe_test.go index 6d0d2ebc3..629b5eb01 100644 --- a/components/execd/pkg/util/safego/safe_test.go +++ b/components/internal/safego/safe_test.go @@ -18,13 +18,16 @@ import ( "context" "sync" "testing" + + "github.com/alibaba/opensandbox/internal/logger" ) func Test_Go(t *testing.T) { ctx, cancelFunc := context.WithCancel(context.Background()) defer cancelFunc() - InitPanicLogger(ctx) + logg, _ := logger.New(logger.Config{}) + InitPanicLogger(ctx, logg) var wg sync.WaitGroup wg.Add(1)