diff --git a/file_sink.go b/file_sink.go index 610d6a3..bb347de 100644 --- a/file_sink.go +++ b/file_sink.go @@ -18,10 +18,16 @@ import ( "time" ) +// Special file paths that receive special handling by FileSink. const ( - stdout = "/dev/stdout" - stderr = "/dev/stderr" - devnull = "/dev/null" + // FileStdout represents the standard output stream path for FileSink. + FileStdout = "/dev/stdout" + + // FileStderr represents the standard error stream path for FileSink. + FileStderr = "/dev/stderr" + + // FileDevNull represents the null device path where FileSink discards data. + FileDevNull = "/dev/null" ) // FileSink writes the []byte representation of an Event to a file @@ -81,9 +87,15 @@ func (*FileSink) Type() NodeType { // Process writes the []byte representation of an Event to a file // as a string. -func (fs *FileSink) Process(_ context.Context, e *Event) (*Event, error) { +func (fs *FileSink) Process(ctx context.Context, e *Event) (*Event, error) { + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } + // '/dev/null' should just return success - if fs.Path == devnull { + if fs.Path == FileDevNull { return nil, nil } @@ -101,11 +113,18 @@ func (fs *FileSink) Process(_ context.Context, e *Event) (*Event, error) { fs.l.Lock() defer fs.l.Unlock() + // Check context again after acquiring the lock. + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } + var writer io.Writer switch fs.Path { - case stdout: + case FileStdout: writer = os.Stdout - case stderr: + case FileStderr: writer = os.Stderr default: if fs.f == nil { @@ -146,7 +165,7 @@ func (fs *FileSink) Process(_ context.Context, e *Event) (*Event, error) { // handle obtaining the relevant lock on the struct. func (fs *FileSink) reopen() error { switch fs.Path { - case stdout, stderr, devnull: + case FileStdout, FileStderr, FileDevNull: return nil } @@ -176,7 +195,7 @@ func (fs *FileSink) reopen() error { // Reopen will close, rotate and reopen the Sink's file. func (fs *FileSink) Reopen() error { switch fs.Path { - case stdout, stderr, devnull: + case FileStdout, FileStderr, FileDevNull: return nil } @@ -194,7 +213,7 @@ func (fs *FileSink) Name() string { func (fs *FileSink) open() error { // Return early if the file is open, or we're using a special path. switch fs.Path { - case devnull, stdout, stderr: + case FileDevNull, FileStdout, FileStderr: return nil default: if fs.f != nil { @@ -241,7 +260,7 @@ func (fs *FileSink) open() error { func (fs *FileSink) rotate() error { switch fs.Path { - case stdout, stderr, devnull: + case FileStdout, FileStderr, FileDevNull: return nil } @@ -279,7 +298,7 @@ func (fs *FileSink) rotate() error { func (fs *FileSink) pruneFiles() error { switch { - case fs.Path == stdout, fs.Path == stderr, fs.Path == devnull: + case fs.Path == FileStdout, fs.Path == FileStderr, fs.Path == FileDevNull: return nil case fs.MaxFiles == 0: return nil diff --git a/file_sink_test.go b/file_sink_test.go index b79f86e..9a5c2f2 100644 --- a/file_sink_test.go +++ b/file_sink_test.go @@ -6,12 +6,13 @@ package eventlogger import ( "bytes" "context" - "github.com/stretchr/testify/require" "os" "path/filepath" "reflect" "testing" "time" + + "github.com/stretchr/testify/require" ) func TestFileSink_NewDir(t *testing.T) { @@ -51,13 +52,13 @@ func TestFileSink_Reopen(t *testing.T) { IsFile bool }{ "stdout": { - Path: stdout, + Path: FileStdout, }, "stderr": { - Path: stderr, + Path: FileStderr, }, "dev/null": { - Path: devnull, + Path: FileDevNull, }, "default-file": { IsFile: true, @@ -358,6 +359,7 @@ func TestFileSink_pruneFiles(t *testing.T) { t.Errorf("Expected %d files, got %d", want, got) } } + func TestFileSink_FileMode(t *testing.T) { t.Parallel() @@ -407,3 +409,93 @@ func TestFileSink_DirMode(t *testing.T) { t.Errorf("Expected file mode %q, got %q", parentDirMode.Perm(), actualDirMode.Perm()) } } + +func TestFileSink_ContextCancellation(t *testing.T) { + t.Parallel() + + tests := map[string]struct { + path string + }{ + "regular-file-path": { + path: t.TempDir(), + }, + "stdout": { + path: FileStdout, + }, + "stderr": { + path: FileStderr, + }, + "devnull": { + path: FileDevNull, + }, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + t.Parallel() + + fs := &FileSink{ + Path: tc.path, + FileName: "sink.log", + } + + // Create and immediately cancel the context. + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + event := &Event{ + Formatted: map[string][]byte{JSONFormat: []byte(`{"msg":"test data"}`)}, + Payload: "test", + } + + // Process should return context error immediately. + _, err := fs.Process(ctx, event) + require.Error(t, err) + require.Equal(t, context.Canceled, err) + }) + } +} + +func TestFileSink_ContextCancellationBetweenWrites(t *testing.T) { + t.Parallel() + + tmpDir := t.TempDir() + fs := &FileSink{ + Path: tmpDir, + FileName: "sink.log", + } + + // Create a context that we'll cancel between writes. + ctx, cancel := context.WithCancel(context.Background()) + + // Process one event successfully. + event := &Event{ + Formatted: map[string][]byte{JSONFormat: []byte(`{"msg":"first event"}`)}, + Payload: "first event", + } + _, err := fs.Process(ctx, event) + require.NoError(t, err) + + // Verify the first event was processed and written to the sink. + filePath := filepath.Join(tmpDir, "sink.log") + content, err := os.ReadFile(filePath) + require.NoError(t, err) + require.Equal(t, `{"msg":"first event"}`, string(content)) + + // Cancel the context and next time we process an event, + // it should fail with context error. + cancel() + + event2 := &Event{ + Formatted: map[string][]byte{JSONFormat: []byte(`{"msg":"second event"}`)}, + Payload: "second event", + } + _, err = fs.Process(ctx, event2) + require.Error(t, err) + require.Equal(t, context.Canceled, err) + + // Verify the file still only contains the first event (second write didn't happen). + contentAfter, err := os.ReadFile(filePath) + require.NoError(t, err) + require.Equal(t, `{"msg":"first event"}`, string(contentAfter)) +}