diff --git a/cmd/agent/container/setup.go b/cmd/agent/container/setup.go index 55db5a291..8e3d90583 100644 --- a/cmd/agent/container/setup.go +++ b/cmd/agent/container/setup.go @@ -104,6 +104,14 @@ func (cmd *SetupContainerCmd) Run(ctx context.Context) error { return err } + // The tunnel logger forwards messages over the tunnel asynchronously, so + // flush before returning to make sure the final log lines (e.g. the last + // stderr output of a failing lifecycle hook) reach the client before the + // connection is torn down. ctx is still alive while this defer runs. + if f, ok := logger.(tunnelserver.Flusher); ok { + defer f.Flush() + } + workspaceInfo, setupInfo, err := cmd.parseWorkspaceAndSetupInfo(logger) if err != nil { return err diff --git a/e2e/tests/up/handles_errors.go b/e2e/tests/up/handles_errors.go index 575acf583..096f1d21b 100644 --- a/e2e/tests/up/handles_errors.go +++ b/e2e/tests/up/handles_errors.go @@ -63,6 +63,46 @@ var _ = ginkgo.Describe( ginkgo.SpecTimeout(framework.GetTimeout()), ) + ginkgo.It( + "forwards the final lifecycle hook log line before the agent exits", + func(ctx context.Context) { + f, err := setupDockerProvider(initialDir+"/bin", "docker") + framework.ExpectNoError(err) + ginkgo.DeferCleanup(func(cleanupCtx context.Context) { + _ = f.DevPodProviderDelete(cleanupCtx, "docker") + }) + + tempDir, err := framework.CopyToTempDir( + "tests/up/testdata/docker-lifecycle-stderr", + ) + framework.ExpectNoError(err) + ginkgo.DeferCleanup(framework.CleanupTempDir, initialDir, tempDir) + ginkgo.DeferCleanup(func(cleanupCtx context.Context) { + _ = f.DevPodWorkspaceDelete(cleanupCtx, tempDir, "--force") + }) + + // The postCreateCommand runs a script that prints a burst of + // stderr lines ending with a marker, then exits non-zero. The + // agent forwards lifecycle hook output over the tunnel + // asynchronously; the burst keeps the sender busy so the final + // marker is still queued when the hook fails, and the agent + // must flush it before tearing down. Without the flush the + // marker is dropped. The marker lives in the script (not the + // inline command) so it cannot leak into devpod's "failed to + // run " error and cause a false positive. Regression + // guard for the dropped-last-line bug. + stdout, stderr, err := f.DevPodUpStreams(ctx, tempDir, "--log-output=json") + framework.ExpectError(err, "expected lifecycle hook failure") + framework.ExpectNoError( + findMessage( + strings.NewReader(stdout+stderr), + "DEVPOD_LIFECYCLE_FLUSH_MARKER", + ), + ) + }, + ginkgo.SpecTimeout(framework.GetTimeout()), + ) + ginkgo.It( "ensure workspace cleanup when failing to create a workspace", func(ctx context.Context) { diff --git a/e2e/tests/up/testdata/docker-lifecycle-stderr/.devcontainer/devcontainer.json b/e2e/tests/up/testdata/docker-lifecycle-stderr/.devcontainer/devcontainer.json new file mode 100644 index 000000000..282004078 --- /dev/null +++ b/e2e/tests/up/testdata/docker-lifecycle-stderr/.devcontainer/devcontainer.json @@ -0,0 +1,5 @@ +{ + "name": "lifecycle-stderr", + "image": "mcr.microsoft.com/devcontainers/go:1", + "postCreateCommand": "sh .devcontainer/postCreateCommand.sh" +} diff --git a/e2e/tests/up/testdata/docker-lifecycle-stderr/.devcontainer/postCreateCommand.sh b/e2e/tests/up/testdata/docker-lifecycle-stderr/.devcontainer/postCreateCommand.sh new file mode 100644 index 000000000..4918cb2ea --- /dev/null +++ b/e2e/tests/up/testdata/docker-lifecycle-stderr/.devcontainer/postCreateCommand.sh @@ -0,0 +1,21 @@ +#!/bin/sh +# Regression fixture for the dropped-last-line bug (see the e2e spec +# "forwards the final lifecycle hook log line before the agent exits"). +# +# Emit a burst of stderr lines ending with a marker, then fail. The agent +# forwards lifecycle hook output over the tunnel asynchronously; the burst +# keeps the sender busy so the marker is still queued when the hook exits +# non-zero. Without the flush-on-shutdown fix the agent tears down before the +# marker is sent and it is dropped (the observed drain window is only ~11 +# lines, so the 300-line burst leaves a wide margin). +# +# The marker lives in this script rather than the inline postCreateCommand so +# it cannot leak into devpod's "failed to run " error message and +# produce a false positive. +i=0 +while [ "$i" -lt 300 ]; do + echo "filler line $i" >&2 + i=$((i + 1)) +done +echo DEVPOD_LIFECYCLE_FLUSH_MARKER >&2 +exit 1 diff --git a/pkg/agent/tunnelserver/logger.go b/pkg/agent/tunnelserver/logger.go index 75c40f2fd..7edcd3e77 100644 --- a/pkg/agent/tunnelserver/logger.go +++ b/pkg/agent/tunnelserver/logger.go @@ -27,7 +27,8 @@ func NewTunnelLogger(ctx context.Context, client tunnel.TunnelClient, debug bool ctx: ctx, client: client, level: level, - logChan: make(chan *tunnel.LogMessage, 1000), // Buffer size of 1000 messages + logChan: make(chan logEntry, 1000), // Buffer size of 1000 messages + done: make(chan struct{}), } go logger.worker() @@ -35,28 +36,84 @@ func NewTunnelLogger(ctx context.Context, client tunnel.TunnelClient, debug bool return logger } +// Flusher is implemented by loggers that buffer output and can block until +// everything queued so far has been delivered. NewTunnelLogger returns a value +// satisfying this interface. +type Flusher interface { + Flush() +} + +var _ Flusher = (*tunnelLogger)(nil) + +// logEntry is what flows through the logger channel. It is either a log +// message to forward over the tunnel, or a flush request whose ack channel is +// closed once all preceding messages have been sent. +type logEntry struct { + msg *tunnel.LogMessage + flush chan struct{} +} + type tunnelLogger struct { ctx context.Context level logrus.Level client tunnel.TunnelClient - logChan chan *tunnel.LogMessage + logChan chan logEntry + done chan struct{} // closed when worker() exits fields logrus.Fields } func (s *tunnelLogger) worker() { + defer close(s.done) for { select { - case msg := <-s.logChan: - ctx, cancel := context.WithTimeout(s.ctx, 5*time.Second) - _, _ = s.client.Log(ctx, msg) - // ignore error since we can't use the logger itself - cancel() + case entry := <-s.logChan: + s.handle(entry) case <-s.ctx.Done(): - return + // Best-effort drain so the final lines aren't lost when the + // agent shuts down before the channel has been emptied. + for { + select { + case entry := <-s.logChan: + s.handle(entry) + default: + return + } + } } } } +// flushTimeout caps the entire drain. handle() applies a per-message timeout, +// so without an overall bound a backlog on a stalled tunnel could block +// shutdown for buffer-size × per-message timeout. The cap is generous so a +// healthy drain of a full backlog always completes. +const flushTimeout = 30 * time.Second + +// Flush blocks until all messages queued before the call have been forwarded +// over the tunnel, until the worker has stopped, or until flushTimeout elapses. +// It gates on worker completion rather than s.ctx so it stays a real barrier +// even when s.ctx is already cancelled (e.g. the deferred flush on the shutdown +// path, or Fatal/Fatalf). +func (s *tunnelLogger) Flush() { + ack := make(chan struct{}) + timer := time.NewTimer(flushTimeout) + defer timer.Stop() + + select { + case s.logChan <- logEntry{flush: ack}: + case <-s.done: + return + case <-timer.C: + return + } + + select { + case <-ack: + case <-s.done: + case <-timer.C: + } +} + // formatMessage appends structured fields to the message. func (s *tunnelLogger) formatMessage(message string) string { if len(s.fields) == 0 { @@ -75,10 +132,7 @@ func (s *tunnelLogger) Debug(args ...any) { return } - s.logChan <- &tunnel.LogMessage{ - LogLevel: tunnel.LogLevel_DEBUG, - Message: s.formatMessage(fmt.Sprintln(args...)), - } + s.enqueue(tunnel.LogLevel_DEBUG, fmt.Sprintln(args...)) } func (s *tunnelLogger) Debugf(format string, args ...any) { @@ -86,10 +140,7 @@ func (s *tunnelLogger) Debugf(format string, args ...any) { return } - s.logChan <- &tunnel.LogMessage{ - LogLevel: tunnel.LogLevel_DEBUG, - Message: s.formatMessage(fmt.Sprintf(format, args...) + "\n"), - } + s.enqueue(tunnel.LogLevel_DEBUG, fmt.Sprintf(format, args...)+"\n") } func (s *tunnelLogger) Info(args ...any) { @@ -97,10 +148,7 @@ func (s *tunnelLogger) Info(args ...any) { return } - s.logChan <- &tunnel.LogMessage{ - LogLevel: tunnel.LogLevel_INFO, - Message: s.formatMessage(fmt.Sprintln(args...)), - } + s.enqueue(tunnel.LogLevel_INFO, fmt.Sprintln(args...)) } func (s *tunnelLogger) Infof(format string, args ...any) { @@ -108,10 +156,7 @@ func (s *tunnelLogger) Infof(format string, args ...any) { return } - s.logChan <- &tunnel.LogMessage{ - LogLevel: tunnel.LogLevel_INFO, - Message: s.formatMessage(fmt.Sprintf(format, args...) + "\n"), - } + s.enqueue(tunnel.LogLevel_INFO, fmt.Sprintf(format, args...)+"\n") } func (s *tunnelLogger) Warn(args ...any) { @@ -119,10 +164,7 @@ func (s *tunnelLogger) Warn(args ...any) { return } - s.logChan <- &tunnel.LogMessage{ - LogLevel: tunnel.LogLevel_WARNING, - Message: s.formatMessage(fmt.Sprintln(args...)), - } + s.enqueue(tunnel.LogLevel_WARNING, fmt.Sprintln(args...)) } func (s *tunnelLogger) Warnf(format string, args ...any) { @@ -130,10 +172,7 @@ func (s *tunnelLogger) Warnf(format string, args ...any) { return } - s.logChan <- &tunnel.LogMessage{ - LogLevel: tunnel.LogLevel_WARNING, - Message: s.formatMessage(fmt.Sprintf(format, args...) + "\n"), - } + s.enqueue(tunnel.LogLevel_WARNING, fmt.Sprintf(format, args...)+"\n") } func (s *tunnelLogger) Error(args ...any) { @@ -141,10 +180,7 @@ func (s *tunnelLogger) Error(args ...any) { return } - s.logChan <- &tunnel.LogMessage{ - LogLevel: tunnel.LogLevel_ERROR, - Message: s.formatMessage(fmt.Sprintln(args...)), - } + s.enqueue(tunnel.LogLevel_ERROR, fmt.Sprintln(args...)) } func (s *tunnelLogger) Errorf(format string, args ...any) { @@ -152,10 +188,7 @@ func (s *tunnelLogger) Errorf(format string, args ...any) { return } - s.logChan <- &tunnel.LogMessage{ - LogLevel: tunnel.LogLevel_ERROR, - Message: s.formatMessage(fmt.Sprintf(format, args...) + "\n"), - } + s.enqueue(tunnel.LogLevel_ERROR, fmt.Sprintf(format, args...)+"\n") } func (s *tunnelLogger) Fatal(args ...any) { @@ -163,11 +196,10 @@ func (s *tunnelLogger) Fatal(args ...any) { return } - s.logChan <- &tunnel.LogMessage{ - LogLevel: tunnel.LogLevel_ERROR, - Message: s.formatMessage(fmt.Sprintln(args...)), - } + s.enqueue(tunnel.LogLevel_ERROR, fmt.Sprintln(args...)) + // make sure the message is delivered before we exit + s.Flush() os.Exit(1) } @@ -176,11 +208,10 @@ func (s *tunnelLogger) Fatalf(format string, args ...any) { return } - s.logChan <- &tunnel.LogMessage{ - LogLevel: tunnel.LogLevel_ERROR, - Message: s.formatMessage(fmt.Sprintf(format, args...) + "\n"), - } + s.enqueue(tunnel.LogLevel_ERROR, fmt.Sprintf(format, args...)+"\n") + // make sure the message is delivered before we exit + s.Flush() os.Exit(1) } @@ -189,10 +220,7 @@ func (s *tunnelLogger) Done(args ...any) { return } - s.logChan <- &tunnel.LogMessage{ - LogLevel: tunnel.LogLevel_DONE, - Message: s.formatMessage(fmt.Sprintln(args...)), - } + s.enqueue(tunnel.LogLevel_DONE, fmt.Sprintln(args...)) } func (s *tunnelLogger) Donef(format string, args ...any) { @@ -200,10 +228,7 @@ func (s *tunnelLogger) Donef(format string, args ...any) { return } - s.logChan <- &tunnel.LogMessage{ - LogLevel: tunnel.LogLevel_DONE, - Message: s.formatMessage(fmt.Sprintf(format, args...) + "\n"), - } + s.enqueue(tunnel.LogLevel_DONE, fmt.Sprintf(format, args...)+"\n") } func (s *tunnelLogger) Print(level logrus.Level, args ...any) { @@ -308,6 +333,7 @@ func (s *tunnelLogger) WithFields(fields logrus.Fields) log.Logger { client: s.client, level: s.level, logChan: s.logChan, + done: s.done, fields: newFields, } } @@ -315,3 +341,27 @@ func (s *tunnelLogger) WithFields(fields logrus.Fields) log.Logger { func (*tunnelLogger) LogrLogSink() logr.LogSink { return nil } + +// enqueue formats and queues a message for the worker to forward. +func (s *tunnelLogger) enqueue(level tunnel.LogLevel, message string) { + s.logChan <- logEntry{msg: &tunnel.LogMessage{ + LogLevel: level, + Message: s.formatMessage(message), + }} +} + +// handle forwards a single message over the tunnel, or acknowledges a flush +// request. Sends use a context detached from s.ctx's cancellation so that +// queued messages can still be delivered during shutdown as long as the +// underlying connection is alive. +func (s *tunnelLogger) handle(entry logEntry) { + if entry.flush != nil { + close(entry.flush) + return + } + + ctx, cancel := context.WithTimeout(context.WithoutCancel(s.ctx), 5*time.Second) + _, _ = s.client.Log(ctx, entry.msg) + // ignore error since we can't use the logger itself + cancel() +}