Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 31 additions & 12 deletions file_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,16 @@ import (
"time"
)

// Special file paths that receive special handling by FileSink.
const (
stdout = "/dev/stdout"
stderr = "/dev/stderr"
devnull = "/dev/null"
// FileStdout represents the standard output stream path for FileSink.
FileStdout = "/dev/stdout"

// FileStderr represents the standard error stream path for FileSink.
FileStderr = "/dev/stderr"

// FileDevNull represents the null device path where FileSink discards data.
FileDevNull = "/dev/null"
)

// FileSink writes the []byte representation of an Event to a file
Expand Down Expand Up @@ -81,9 +87,15 @@ func (*FileSink) Type() NodeType {

// Process writes the []byte representation of an Event to a file
// as a string.
func (fs *FileSink) Process(_ context.Context, e *Event) (*Event, error) {
func (fs *FileSink) Process(ctx context.Context, e *Event) (*Event, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}

// '/dev/null' should just return success
if fs.Path == devnull {
if fs.Path == FileDevNull {
return nil, nil
}

Expand All @@ -101,11 +113,18 @@ func (fs *FileSink) Process(_ context.Context, e *Event) (*Event, error) {
fs.l.Lock()
defer fs.l.Unlock()

// Check context again after acquiring the lock.
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}

var writer io.Writer
switch fs.Path {
case stdout:
case FileStdout:
writer = os.Stdout
case stderr:
case FileStderr:
writer = os.Stderr
default:
if fs.f == nil {
Expand Down Expand Up @@ -146,7 +165,7 @@ func (fs *FileSink) Process(_ context.Context, e *Event) (*Event, error) {
// handle obtaining the relevant lock on the struct.
func (fs *FileSink) reopen() error {
switch fs.Path {
case stdout, stderr, devnull:
case FileStdout, FileStderr, FileDevNull:
return nil
}

Expand Down Expand Up @@ -176,7 +195,7 @@ func (fs *FileSink) reopen() error {
// Reopen will close, rotate and reopen the Sink's file.
func (fs *FileSink) Reopen() error {
switch fs.Path {
case stdout, stderr, devnull:
case FileStdout, FileStderr, FileDevNull:
return nil
}

Expand All @@ -194,7 +213,7 @@ func (fs *FileSink) Name() string {
func (fs *FileSink) open() error {
// Return early if the file is open, or we're using a special path.
switch fs.Path {
case devnull, stdout, stderr:
case FileDevNull, FileStdout, FileStderr:
return nil
default:
if fs.f != nil {
Expand Down Expand Up @@ -241,7 +260,7 @@ func (fs *FileSink) open() error {

func (fs *FileSink) rotate() error {
switch fs.Path {
case stdout, stderr, devnull:
case FileStdout, FileStderr, FileDevNull:
return nil
}

Expand Down Expand Up @@ -279,7 +298,7 @@ func (fs *FileSink) rotate() error {

func (fs *FileSink) pruneFiles() error {
switch {
case fs.Path == stdout, fs.Path == stderr, fs.Path == devnull:
case fs.Path == FileStdout, fs.Path == FileStderr, fs.Path == FileDevNull:
return nil
case fs.MaxFiles == 0:
return nil
Expand Down
100 changes: 96 additions & 4 deletions file_sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@ package eventlogger
import (
"bytes"
"context"
"github.com/stretchr/testify/require"
"os"
"path/filepath"
"reflect"
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestFileSink_NewDir(t *testing.T) {
Expand Down Expand Up @@ -51,13 +52,13 @@ func TestFileSink_Reopen(t *testing.T) {
IsFile bool
}{
"stdout": {
Path: stdout,
Path: FileStdout,
},
"stderr": {
Path: stderr,
Path: FileStderr,
},
"dev/null": {
Path: devnull,
Path: FileDevNull,
},
"default-file": {
IsFile: true,
Expand Down Expand Up @@ -358,6 +359,7 @@ func TestFileSink_pruneFiles(t *testing.T) {
t.Errorf("Expected %d files, got %d", want, got)
}
}

func TestFileSink_FileMode(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -407,3 +409,93 @@ func TestFileSink_DirMode(t *testing.T) {
t.Errorf("Expected file mode %q, got %q", parentDirMode.Perm(), actualDirMode.Perm())
}
}

func TestFileSink_ContextCancellation(t *testing.T) {
t.Parallel()

tests := map[string]struct {
path string
}{
"regular-file-path": {
path: t.TempDir(),
},
"stdout": {
path: FileStdout,
},
"stderr": {
path: FileStderr,
},
"devnull": {
path: FileDevNull,
},
}

for name, tc := range tests {
t.Run(name, func(t *testing.T) {
t.Parallel()

fs := &FileSink{
Path: tc.path,
FileName: "sink.log",
}

// Create and immediately cancel the context.
ctx, cancel := context.WithCancel(context.Background())
cancel()

event := &Event{
Formatted: map[string][]byte{JSONFormat: []byte(`{"msg":"test data"}`)},
Payload: "test",
}

// Process should return context error immediately.
_, err := fs.Process(ctx, event)
require.Error(t, err)
require.Equal(t, context.Canceled, err)
})
}
}

func TestFileSink_ContextCancellationBetweenWrites(t *testing.T) {
t.Parallel()

tmpDir := t.TempDir()
fs := &FileSink{
Path: tmpDir,
FileName: "sink.log",
}

// Create a context that we'll cancel between writes.
ctx, cancel := context.WithCancel(context.Background())

// Process one event successfully.
event := &Event{
Formatted: map[string][]byte{JSONFormat: []byte(`{"msg":"first event"}`)},
Payload: "first event",
}
_, err := fs.Process(ctx, event)
require.NoError(t, err)

// Verify the first event was processed and written to the sink.
filePath := filepath.Join(tmpDir, "sink.log")
content, err := os.ReadFile(filePath)
require.NoError(t, err)
require.Equal(t, `{"msg":"first event"}`, string(content))

// Cancel the context and next time we process an event,
// it should fail with context error.
cancel()

event2 := &Event{
Formatted: map[string][]byte{JSONFormat: []byte(`{"msg":"second event"}`)},
Payload: "second event",
}
_, err = fs.Process(ctx, event2)
require.Error(t, err)
require.Equal(t, context.Canceled, err)

// Verify the file still only contains the first event (second write didn't happen).
contentAfter, err := os.ReadFile(filePath)
require.NoError(t, err)
require.Equal(t, `{"msg":"first event"}`, string(contentAfter))
}