Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions cmd/agent/container/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,14 @@ func (cmd *SetupContainerCmd) Run(ctx context.Context) error {
return err
}

// The tunnel logger forwards messages over the tunnel asynchronously, so
// flush before returning to make sure the final log lines (e.g. the last
// stderr output of a failing lifecycle hook) reach the client before the
// connection is torn down. ctx is still alive while this defer runs.
if f, ok := logger.(tunnelserver.Flusher); ok {
defer f.Flush()
}

workspaceInfo, setupInfo, err := cmd.parseWorkspaceAndSetupInfo(logger)
if err != nil {
return err
Expand Down
40 changes: 40 additions & 0 deletions e2e/tests/up/handles_errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,46 @@ var _ = ginkgo.Describe(
ginkgo.SpecTimeout(framework.GetTimeout()),
)

ginkgo.It(
"forwards the final lifecycle hook log line before the agent exits",
func(ctx context.Context) {
f, err := setupDockerProvider(initialDir+"/bin", "docker")
framework.ExpectNoError(err)
ginkgo.DeferCleanup(func(cleanupCtx context.Context) {
_ = f.DevPodProviderDelete(cleanupCtx, "docker")
})

tempDir, err := framework.CopyToTempDir(
"tests/up/testdata/docker-lifecycle-stderr",
)
framework.ExpectNoError(err)
ginkgo.DeferCleanup(framework.CleanupTempDir, initialDir, tempDir)
ginkgo.DeferCleanup(func(cleanupCtx context.Context) {
_ = f.DevPodWorkspaceDelete(cleanupCtx, tempDir, "--force")
})

// The postCreateCommand runs a script that prints a burst of
// stderr lines ending with a marker, then exits non-zero. The
// agent forwards lifecycle hook output over the tunnel
// asynchronously; the burst keeps the sender busy so the final
// marker is still queued when the hook fails, and the agent
// must flush it before tearing down. Without the flush the
// marker is dropped. The marker lives in the script (not the
// inline command) so it cannot leak into devpod's "failed to
// run <command>" error and cause a false positive. Regression
// guard for the dropped-last-line bug.
stdout, stderr, err := f.DevPodUpStreams(ctx, tempDir, "--log-output=json")
framework.ExpectError(err, "expected lifecycle hook failure")
framework.ExpectNoError(
findMessage(
strings.NewReader(stdout+stderr),
"DEVPOD_LIFECYCLE_FLUSH_MARKER",
),
)
},
ginkgo.SpecTimeout(framework.GetTimeout()),
)

ginkgo.It(
"ensure workspace cleanup when failing to create a workspace",
func(ctx context.Context) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"name": "lifecycle-stderr",
"image": "mcr.microsoft.com/devcontainers/go:1",
"postCreateCommand": "sh .devcontainer/postCreateCommand.sh"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#!/bin/sh
# Regression fixture for the dropped-last-line bug (see the e2e spec
# "forwards the final lifecycle hook log line before the agent exits").
#
# Emit a burst of stderr lines ending with a marker, then fail. The agent
# forwards lifecycle hook output over the tunnel asynchronously; the burst
# keeps the sender busy so the marker is still queued when the hook exits
# non-zero. Without the flush-on-shutdown fix the agent tears down before the
# marker is sent and it is dropped (the observed drain window is only ~11
# lines, so the 300-line burst leaves a wide margin).
#
# The marker lives in this script rather than the inline postCreateCommand so
# it cannot leak into devpod's "failed to run <command>" error message and
# produce a false positive.
i=0
while [ "$i" -lt 300 ]; do
echo "filler line $i" >&2
i=$((i + 1))
done
echo DEVPOD_LIFECYCLE_FLUSH_MARKER >&2
exit 1
162 changes: 106 additions & 56 deletions pkg/agent/tunnelserver/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,36 +27,93 @@ func NewTunnelLogger(ctx context.Context, client tunnel.TunnelClient, debug bool
ctx: ctx,
client: client,
level: level,
logChan: make(chan *tunnel.LogMessage, 1000), // Buffer size of 1000 messages
logChan: make(chan logEntry, 1000), // Buffer size of 1000 messages
Comment thread
coderabbitai[bot] marked this conversation as resolved.
done: make(chan struct{}),
}

go logger.worker()

return logger
}

// Flusher is implemented by loggers that buffer output and can block until
// everything queued so far has been delivered. NewTunnelLogger returns a value
// satisfying this interface.
type Flusher interface {
Flush()
}

var _ Flusher = (*tunnelLogger)(nil)

// logEntry is what flows through the logger channel. It is either a log
// message to forward over the tunnel, or a flush request whose ack channel is
// closed once all preceding messages have been sent.
type logEntry struct {
msg *tunnel.LogMessage
flush chan struct{}
}

type tunnelLogger struct {
ctx context.Context
level logrus.Level
client tunnel.TunnelClient
logChan chan *tunnel.LogMessage
logChan chan logEntry
done chan struct{} // closed when worker() exits
fields logrus.Fields
}

func (s *tunnelLogger) worker() {
defer close(s.done)
for {
select {
case msg := <-s.logChan:
ctx, cancel := context.WithTimeout(s.ctx, 5*time.Second)
_, _ = s.client.Log(ctx, msg)
// ignore error since we can't use the logger itself
cancel()
case entry := <-s.logChan:
s.handle(entry)
case <-s.ctx.Done():
return
// Best-effort drain so the final lines aren't lost when the
// agent shuts down before the channel has been emptied.
for {
select {
case entry := <-s.logChan:
s.handle(entry)
default:
return
}
}
}
}
}

// flushTimeout caps the entire drain. handle() applies a per-message timeout,
// so without an overall bound a backlog on a stalled tunnel could block
// shutdown for buffer-size × per-message timeout. The cap is generous so a
// healthy drain of a full backlog always completes.
const flushTimeout = 30 * time.Second

// Flush blocks until all messages queued before the call have been forwarded
// over the tunnel, until the worker has stopped, or until flushTimeout elapses.
// It gates on worker completion rather than s.ctx so it stays a real barrier
// even when s.ctx is already cancelled (e.g. the deferred flush on the shutdown
// path, or Fatal/Fatalf).
func (s *tunnelLogger) Flush() {
ack := make(chan struct{})
timer := time.NewTimer(flushTimeout)
defer timer.Stop()

select {
case s.logChan <- logEntry{flush: ack}:
case <-s.done:
return
case <-timer.C:
return
}

select {
case <-ack:
case <-s.done:
case <-timer.C:
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}

// formatMessage appends structured fields to the message.
func (s *tunnelLogger) formatMessage(message string) string {
if len(s.fields) == 0 {
Expand All @@ -75,99 +132,74 @@ func (s *tunnelLogger) Debug(args ...any) {
return
}

s.logChan <- &tunnel.LogMessage{
LogLevel: tunnel.LogLevel_DEBUG,
Message: s.formatMessage(fmt.Sprintln(args...)),
}
s.enqueue(tunnel.LogLevel_DEBUG, fmt.Sprintln(args...))
}

func (s *tunnelLogger) Debugf(format string, args ...any) {
if s.level < logrus.DebugLevel {
return
}

s.logChan <- &tunnel.LogMessage{
LogLevel: tunnel.LogLevel_DEBUG,
Message: s.formatMessage(fmt.Sprintf(format, args...) + "\n"),
}
s.enqueue(tunnel.LogLevel_DEBUG, fmt.Sprintf(format, args...)+"\n")
}

func (s *tunnelLogger) Info(args ...any) {
if s.level < logrus.InfoLevel {
return
}

s.logChan <- &tunnel.LogMessage{
LogLevel: tunnel.LogLevel_INFO,
Message: s.formatMessage(fmt.Sprintln(args...)),
}
s.enqueue(tunnel.LogLevel_INFO, fmt.Sprintln(args...))
}

func (s *tunnelLogger) Infof(format string, args ...any) {
if s.level < logrus.InfoLevel {
return
}

s.logChan <- &tunnel.LogMessage{
LogLevel: tunnel.LogLevel_INFO,
Message: s.formatMessage(fmt.Sprintf(format, args...) + "\n"),
}
s.enqueue(tunnel.LogLevel_INFO, fmt.Sprintf(format, args...)+"\n")
}

func (s *tunnelLogger) Warn(args ...any) {
if s.level < logrus.WarnLevel {
return
}

s.logChan <- &tunnel.LogMessage{
LogLevel: tunnel.LogLevel_WARNING,
Message: s.formatMessage(fmt.Sprintln(args...)),
}
s.enqueue(tunnel.LogLevel_WARNING, fmt.Sprintln(args...))
}

func (s *tunnelLogger) Warnf(format string, args ...any) {
if s.level < logrus.WarnLevel {
return
}

s.logChan <- &tunnel.LogMessage{
LogLevel: tunnel.LogLevel_WARNING,
Message: s.formatMessage(fmt.Sprintf(format, args...) + "\n"),
}
s.enqueue(tunnel.LogLevel_WARNING, fmt.Sprintf(format, args...)+"\n")
}

func (s *tunnelLogger) Error(args ...any) {
if s.level < logrus.ErrorLevel {
return
}

s.logChan <- &tunnel.LogMessage{
LogLevel: tunnel.LogLevel_ERROR,
Message: s.formatMessage(fmt.Sprintln(args...)),
}
s.enqueue(tunnel.LogLevel_ERROR, fmt.Sprintln(args...))
}

func (s *tunnelLogger) Errorf(format string, args ...any) {
if s.level < logrus.ErrorLevel {
return
}

s.logChan <- &tunnel.LogMessage{
LogLevel: tunnel.LogLevel_ERROR,
Message: s.formatMessage(fmt.Sprintf(format, args...) + "\n"),
}
s.enqueue(tunnel.LogLevel_ERROR, fmt.Sprintf(format, args...)+"\n")
}

func (s *tunnelLogger) Fatal(args ...any) {
if s.level < logrus.FatalLevel {
return
}

s.logChan <- &tunnel.LogMessage{
LogLevel: tunnel.LogLevel_ERROR,
Message: s.formatMessage(fmt.Sprintln(args...)),
}
s.enqueue(tunnel.LogLevel_ERROR, fmt.Sprintln(args...))

// make sure the message is delivered before we exit
s.Flush()
os.Exit(1)
}

Expand All @@ -176,11 +208,10 @@ func (s *tunnelLogger) Fatalf(format string, args ...any) {
return
}

s.logChan <- &tunnel.LogMessage{
LogLevel: tunnel.LogLevel_ERROR,
Message: s.formatMessage(fmt.Sprintf(format, args...) + "\n"),
}
s.enqueue(tunnel.LogLevel_ERROR, fmt.Sprintf(format, args...)+"\n")

// make sure the message is delivered before we exit
s.Flush()
os.Exit(1)
}

Expand All @@ -189,21 +220,15 @@ func (s *tunnelLogger) Done(args ...any) {
return
}

s.logChan <- &tunnel.LogMessage{
LogLevel: tunnel.LogLevel_DONE,
Message: s.formatMessage(fmt.Sprintln(args...)),
}
s.enqueue(tunnel.LogLevel_DONE, fmt.Sprintln(args...))
}

func (s *tunnelLogger) Donef(format string, args ...any) {
if s.level < logrus.InfoLevel {
return
}

s.logChan <- &tunnel.LogMessage{
LogLevel: tunnel.LogLevel_DONE,
Message: s.formatMessage(fmt.Sprintf(format, args...) + "\n"),
}
s.enqueue(tunnel.LogLevel_DONE, fmt.Sprintf(format, args...)+"\n")
}

func (s *tunnelLogger) Print(level logrus.Level, args ...any) {
Expand Down Expand Up @@ -308,10 +333,35 @@ func (s *tunnelLogger) WithFields(fields logrus.Fields) log.Logger {
client: s.client,
level: s.level,
logChan: s.logChan,
done: s.done,
fields: newFields,
}
}

func (*tunnelLogger) LogrLogSink() logr.LogSink {
return nil
}

// enqueue formats and queues a message for the worker to forward.
func (s *tunnelLogger) enqueue(level tunnel.LogLevel, message string) {
s.logChan <- logEntry{msg: &tunnel.LogMessage{
LogLevel: level,
Message: s.formatMessage(message),
}}
}

// handle forwards a single message over the tunnel, or acknowledges a flush
// request. Sends use a context detached from s.ctx's cancellation so that
// queued messages can still be delivered during shutdown as long as the
// underlying connection is alive.
func (s *tunnelLogger) handle(entry logEntry) {
if entry.flush != nil {
close(entry.flush)
return
}

ctx, cancel := context.WithTimeout(context.WithoutCancel(s.ctx), 5*time.Second)
_, _ = s.client.Log(ctx, entry.msg)
// ignore error since we can't use the logger itself
cancel()
}
Loading