diff --git a/.changes/unreleased/Bugfix-20260421-102623.yaml b/.changes/unreleased/Bugfix-20260421-102623.yaml new file mode 100644 index 0000000..0e3c3e5 --- /dev/null +++ b/.changes/unreleased/Bugfix-20260421-102623.yaml @@ -0,0 +1,3 @@ +kind: Bugfix +body: Fix handling of partial log lines. +time: 2026-04-21T10:26:23.397872-04:00 diff --git a/src/pkg/logs.go b/src/pkg/logs.go index 52c16ff..98df7a8 100644 --- a/src/pkg/logs.go +++ b/src/pkg/logs.go @@ -40,6 +40,31 @@ func (s *LogStreamer) AddProcessor(processor LogProcessor) { s.processors = append(s.processors, processor) } +type logStream struct { + buf *SafeBuffer + fn func(LogProcessor, string) string +} + +func (s *LogStreamer) streams() []logStream { + return []logStream{ + {s.Stderr, LogProcessor.ProcessStderr}, + {s.Stdout, LogProcessor.ProcessStdout}, + } +} + +func (s *LogStreamer) processLine(stream logStream) { + line, _ := stream.buf.ReadString('\n') + if line == "" { + return + } + line = strings.TrimSuffix(line, "\n") + for _, processor := range s.processors { + line = stream.fn(processor, line) + } + s.logBuffer.Value = line + s.logBuffer = s.logBuffer.Next() +} + func (s *LogStreamer) GetLogBuffer() []string { output := make([]string, 0) s.logBuffer.Do(func(line any) { @@ -63,26 +88,9 @@ func (s *LogStreamer) Run(ctx context.Context) { s.logger.Trace().Msg("Shutting down log streamer ...") return case <-ticker.C: - for len(s.Stderr.String()) > 0 { - line, err := s.Stderr.ReadString('\n') - if err == nil { - line = strings.TrimSuffix(line, "\n") - for _, processor := range s.processors { - line = processor.ProcessStderr(line) - } - s.logBuffer.Value = line - s.logBuffer = s.logBuffer.Next() - } - } - for len(s.Stdout.String()) > 0 { - line, err := s.Stdout.ReadString('\n') - if err == nil { - line = strings.TrimSuffix(line, "\n") - for _, processor := range s.processors { - line = processor.ProcessStdout(line) - } - s.logBuffer.Value = line - s.logBuffer = s.logBuffer.Next() + for _, stream := range s.streams() { + for strings.Contains(stream.buf.String(), "\n") { + s.processLine(stream) } } } @@ -94,7 +102,7 @@ func (s *LogStreamer) Flush(outcome JobOutcome) { ticker := time.NewTicker(200 * time.Millisecond) defer ticker.Stop() timeout := time.After(30 * time.Second) - for len(s.Stderr.String()) > 0 || len(s.Stdout.String()) > 0 { + for strings.Contains(s.Stderr.String(), "\n") || strings.Contains(s.Stdout.String(), "\n") { select { case <-ticker.C: // Continue waiting @@ -107,6 +115,10 @@ done: s.logger.Trace().Msg("Finished log streamer flush ...") s.quit <- true time.Sleep(200 * time.Millisecond) // Allow 'Run' goroutine to quit + // Drain any partial line that never received a terminating newline. + for _, stream := range s.streams() { + s.processLine(stream) + } s.logger.Trace().Msg("Flushing log processors ...") for i := len(s.processors) - 1; i >= 0; i-- { s.processors[i].Flush(outcome) diff --git a/src/pkg/logs_test.go b/src/pkg/logs_test.go new file mode 100644 index 0000000..8025235 --- /dev/null +++ b/src/pkg/logs_test.go @@ -0,0 +1,66 @@ +package pkg + +import ( + "context" + "testing" + "time" + + "github.com/rocktavious/autopilot/v2023" + "github.com/rs/zerolog" +) + +type captureProcessor struct { + lines []string +} + +func (c *captureProcessor) ProcessStdout(line string) string { + c.lines = append(c.lines, line) + return line +} + +func (c *captureProcessor) ProcessStderr(line string) string { + c.lines = append(c.lines, line) + return line +} + +func (c *captureProcessor) Flush(_ JobOutcome) {} + +func TestLogStreamerPartialLineStdout(t *testing.T) { + cap := &captureProcessor{} + s := NewLogStreamer(zerolog.Nop(), cap) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go s.Run(ctx) + + // Write first half — no newline yet. + _, _ = s.Stdout.Write([]byte("par")) + time.Sleep(100 * time.Millisecond) + + // Complete the line, then write a trailing partial with no newline. + _, _ = s.Stdout.Write([]byte("tial\ntrailing-no-newline")) + time.Sleep(100 * time.Millisecond) + + s.Flush(JobOutcome{}) + + autopilot.Equals(t, []string{"partial", "trailing-no-newline"}, cap.lines) +} + +func TestLogStreamerPartialLineStderr(t *testing.T) { + cap := &captureProcessor{} + s := NewLogStreamer(zerolog.Nop(), cap) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go s.Run(ctx) + + _, _ = s.Stderr.Write([]byte("par")) + time.Sleep(100 * time.Millisecond) + + _, _ = s.Stderr.Write([]byte("tial\ntrailing-no-newline")) + time.Sleep(100 * time.Millisecond) + + s.Flush(JobOutcome{}) + + autopilot.Equals(t, []string{"partial", "trailing-no-newline"}, cap.lines) +}