From 80a4a2dc3943fd8aaf8ec4d0b46d328baeb0648a Mon Sep 17 00:00:00 2001 From: bb7133 Date: Thu, 4 Jun 2026 14:19:21 -0700 Subject: [PATCH] log: support discard action on write timeout --- config.go | 10 ++++++++++ log.go | 56 +++++++++++++++++++++++++++++++++++++++++++---------- log_test.go | 56 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 112 insertions(+), 10 deletions(-) diff --git a/config.go b/config.go index c802295..d44020d 100644 --- a/config.go +++ b/config.go @@ -25,6 +25,13 @@ const ( defaultLogMaxSize = 300 // MB ) +const ( + // LogTimeoutActionPanic panics when log write/sync is stuck. + LogTimeoutActionPanic = "panic" + // LogTimeoutActionDiscard discards subsequent log write/sync operations when log write/sync is stuck. + LogTimeoutActionDiscard = "discard" +) + // FileLogConfig serializes file log related config in toml/json. type FileLogConfig struct { // Log filename, leave empty to disable file log. @@ -82,6 +89,9 @@ type Config struct { // Timeout for writing log, if TiDB hang on writing log, make it panic. // The value is seconds, 0 means no timeout Timeout int `toml:"timeout" json:"timeout"` + // TimeoutAction controls the action when log write/sync timeout occurs. + // Valid values: panic, discard. Empty means panic. + TimeoutAction string `toml:"timeout-action" json:"timeout-action"` } // ZapProperties records some information about zap. diff --git a/log.go b/log.go index 00db327..d4e597e 100644 --- a/log.go +++ b/log.go @@ -109,8 +109,8 @@ func InitLoggerWithWriteSyncer(cfg *Config, output, errOutput zapcore.WriteSynce return nil, nil, err } if cfg.Timeout > 0 { - output = LockWithTimeout(output, cfg.Timeout) - errOutput = LockWithTimeout(errOutput, cfg.Timeout) + output = LockWithTimeoutAction(output, cfg.Timeout, cfg.TimeoutAction) + errOutput = LockWithTimeoutAction(errOutput, cfg.Timeout, cfg.TimeoutAction) } core := NewTextCore(encoder, output, level) @@ -128,27 +128,55 @@ func InitLoggerWithWriteSyncer(cfg *Config, output, errOutput zapcore.WriteSynce // LockWithTimeout wraps a WriteSyncer make it safe for concurrent use, just like zapcore.Lock() // timeout seconds. func LockWithTimeout(ws zapcore.WriteSyncer, timeout int) zapcore.WriteSyncer { + return LockWithTimeoutAction(ws, timeout, LogTimeoutActionPanic) +} + +// LockWithTimeoutAction wraps a WriteSyncer and controls the action when log write/sync is stuck. +func LockWithTimeoutAction(ws zapcore.WriteSyncer, timeout int, action string) zapcore.WriteSyncer { r := &lockWithTimeoutWrapper{ - ws: ws, - lock: make(chan struct{}, 1), - t: time.NewTicker(time.Second), - timeout: timeout, + ws: ws, + lock: make(chan struct{}, 1), + t: time.NewTicker(time.Second), + timeout: timeout, + timeoutAction: normalizeTimeoutAction(action), } return r } type lockWithTimeoutWrapper struct { - ws zapcore.WriteSyncer - lock chan struct{} - t *time.Ticker - timeout int + ws zapcore.WriteSyncer + lock chan struct{} + t *time.Ticker + timeout int + timeoutAction string + timedOut atomic.Bool +} + +func normalizeTimeoutAction(action string) string { + switch action { + case LogTimeoutActionDiscard: + return LogTimeoutActionDiscard + default: + return LogTimeoutActionPanic + } } // getLockOrBlock returns true when get lock success, and false otherwise. func (s *lockWithTimeoutWrapper) getLockOrBlock() bool { + if s.timeoutAction == LogTimeoutActionDiscard && s.timedOut.Load() { + select { + case s.lock <- struct{}{}: + s.timedOut.Store(false) + return true + default: + return false + } + } + for i := 0; i < s.timeout; { select { case s.lock <- struct{}{}: + s.timedOut.Store(false) return true case <-s.t.C: i++ @@ -164,6 +192,10 @@ func (s *lockWithTimeoutWrapper) unlock() { func (s *lockWithTimeoutWrapper) Write(bs []byte) (int, error) { succ := s.getLockOrBlock() if !succ { + if s.timeoutAction == LogTimeoutActionDiscard { + s.timedOut.Store(true) + return len(bs), nil + } panic(fmt.Sprintf("Timeout of %ds when trying to write log", s.timeout)) } defer s.unlock() @@ -174,6 +206,10 @@ func (s *lockWithTimeoutWrapper) Write(bs []byte) (int, error) { func (s *lockWithTimeoutWrapper) Sync() error { succ := s.getLockOrBlock() if !succ { + if s.timeoutAction == LogTimeoutActionDiscard { + s.timedOut.Store(true) + return nil + } panic(fmt.Sprintf("Timeout of %ds when trying to sync log", s.timeout)) } defer s.unlock() diff --git a/log_test.go b/log_test.go index 198dea8..995d941 100644 --- a/log_test.go +++ b/log_test.go @@ -19,6 +19,7 @@ import ( "bytes" "net/url" "testing" + "time" "github.com/stretchr/testify/require" "go.uber.org/zap" @@ -149,9 +150,64 @@ func TestTimeout(t *testing.T) { <-panicCh } +func TestTimeoutActionDiscard(t *testing.T) { + blocking := newBlockingOnceWriteSyncer() + ws := LockWithTimeoutAction(blocking, 1, LogTimeoutActionDiscard) + + done := make(chan struct{}) + go func() { + _, _ = ws.Write([]byte("first")) + close(done) + }() + + <-blocking.started + + _, err := ws.Write([]byte("discarded")) + require.NoError(t, err) + + start := time.Now() + _, err = ws.Write([]byte("discarded-fast")) + require.NoError(t, err) + require.Less(t, time.Since(start), 500*time.Millisecond) + + close(blocking.release) + <-done + + _, err = ws.Write([]byte("written")) + require.NoError(t, err) + require.Contains(t, blocking.String(), "written") +} + type hang struct{} func (_ hang) Write(_ []byte) (int, error) { <-make(chan struct{}) // block forever return 0, nil } + +type blockingOnceWriteSyncer struct { + bytes.Buffer + started chan struct{} + release chan struct{} + writes int +} + +func newBlockingOnceWriteSyncer() *blockingOnceWriteSyncer { + return &blockingOnceWriteSyncer{ + started: make(chan struct{}), + release: make(chan struct{}), + } +} + +func (s *blockingOnceWriteSyncer) Write(bs []byte) (int, error) { + s.writes++ + if s.writes == 1 { + close(s.started) + <-s.release + } + return s.Buffer.Write(bs) +} + +func (*blockingOnceWriteSyncer) Sync() error { + return nil +}