From c955db951b4cc0fdd89eee16227119c327a605e1 Mon Sep 17 00:00:00 2001 From: javi11 Date: Mon, 11 May 2026 09:16:13 +0200 Subject: [PATCH] fix(poster): retry on stale-connection i/o timeouts, not just broken pipes The poster's in-function retry only matched broken-pipe / connection-reset errors. When the upstream NNTP server silently closes an idle pooled socket faster than the pool's IdleTimeout, the next reuse can fail with a read "i/o timeout" instead. Those errors bypassed the retry and exhausted the outer 5-retry budget, surfacing as repeated "after 5 retries" failures after upgrading nntppool. Rename isBrokenPipe to isStaleConnError and broaden it to also match wrapped net.Error timeouts and the "i/o timeout" string. Bump the in-function retry from 2 to 3 attempts so a second stale pick after a long throttle pause is also tolerated. The postCtx DeadlineExceeded short-circuit still guards against retrying on real envelope expiry. --- internal/poster/poster.go | 48 +++++++++++----- internal/poster/poster_test.go | 4 +- internal/poster/stale_conn_test.go | 90 ++++++++++++++++++++++++++++++ 3 files changed, 127 insertions(+), 15 deletions(-) create mode 100644 internal/poster/stale_conn_test.go diff --git a/internal/poster/poster.go b/internal/poster/poster.go index 0ce2a5f..eb23b6e 100644 --- a/internal/poster/poster.go +++ b/internal/poster/poster.go @@ -8,6 +8,7 @@ import ( "fmt" "log/slog" "math/rand" + "net" "os" "path/filepath" "strings" @@ -1090,15 +1091,20 @@ func (p *poster) postArticleWithBody(ctx context.Context, art *article.Article, postCtx, cancel := context.WithTimeout(ctx, 2*time.Minute) defer cancel() - // Retry once on broken pipe: the pool may have returned a stale connection that - // the server silently closed. After the broken pipe error, the pool discards that - // connection; the retry picks a fresh one. bytes.NewReader(body) is cheap to - // recreate so the body is fully re-readable on each attempt. 
+ // Retry on stale pooled connection: the pool may hand back a connection the + // server silently closed (broken pipe / connection reset) or that has gone + // half-open and stops responding (read i/o timeout). After such errors the + // pool discards the bad connection; the retry picks a fresh one. + // bytes.NewReader(body) is cheap to recreate so the body is fully re-readable + // on each attempt. Cap at 3 attempts to avoid masking real server problems + // while tolerating a second stale pick after a long throttle pause. var lastErr error - for attempt := range 2 { + for attempt := range 3 { if attempt > 0 { - slog.WarnContext(ctx, "Retrying article post after broken pipe (stale pooled connection)", - "messageID", art.MessageID) + slog.WarnContext(ctx, "Retrying article post after stale pooled connection", + "messageID", art.MessageID, + "attempt", attempt, + "prevErr", lastErr.Error()) } _, lastErr = p.uploadPool.PostYenc(postCtx, headers, bytes.NewReader(body), meta) @@ -1110,7 +1116,7 @@ func (p *poster) postArticleWithBody(ctx context.Context, art *article.Article, return context.Canceled } - if !isBrokenPipe(lastErr) { + if !isStaleConnError(lastErr) { break } } @@ -1133,15 +1139,31 @@ func (p *poster) postArticleWithBody(ctx context.Context, art *article.Article, return nil } -// isBrokenPipe reports whether err represents a broken-pipe / connection-reset -// condition, indicating that the remote server closed the TCP connection before -// the client finished writing (typically a stale idle pooled connection). -func isBrokenPipe(err error) bool { +// isStaleConnError reports whether err looks like a stale pooled connection: +// the remote server silently closed the TCP socket (broken pipe / connection +// reset) or the connection went half-open and the read deadline fired before +// any response arrived (i/o timeout). In all of these cases the pool discards +// the bad connection, so retrying picks a fresh one. 
+// +// True context cancellation / deadline exceeded on the caller's postCtx is +// short-circuited at the call site before this is invoked, so any net.Error +// timeout reaching here is a per-socket read deadline rather than the +// per-article envelope. +func isStaleConnError(err error) bool { + if err == nil { + return false + } if errors.Is(err, syscall.EPIPE) || errors.Is(err, syscall.ECONNRESET) { return true } + var ne net.Error + if errors.As(err, &ne) && ne.Timeout() { + return true + } msg := err.Error() - return strings.Contains(msg, "broken pipe") || strings.Contains(msg, "connection reset by peer") + return strings.Contains(msg, "broken pipe") || + strings.Contains(msg, "connection reset by peer") || + strings.Contains(msg, "i/o timeout") } // checkArticle checks if an article exists using the check pool diff --git a/internal/poster/poster_test.go b/internal/poster/poster_test.go index e184388..77b8231 100644 --- a/internal/poster/poster_test.go +++ b/internal/poster/poster_test.go @@ -1251,7 +1251,7 @@ func TestPostLoop_Basic(t *testing.T) { assert.Contains(t, err.Error(), "error posting article") }) - t.Run("postArticleWithBody fails when both attempts return broken pipe", func(t *testing.T) { + t.Run("postArticleWithBody fails when all attempts return broken pipe", func(t *testing.T) { ctx := context.Background() content := "test content" @@ -1260,7 +1260,7 @@ func TestPostLoop_Basic(t *testing.T) { mockPool := mocks.NewMockNNTPClient(ctrl) mockPool.EXPECT().PostYenc(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). 
- Return(nil, fmt.Errorf("write tcp: %w", syscall.EPIPE)).Times(2) + Return(nil, fmt.Errorf("write tcp: %w", syscall.EPIPE)).Times(3) p := &poster{ uploadPool: mockPool, diff --git a/internal/poster/stale_conn_test.go b/internal/poster/stale_conn_test.go new file mode 100644 index 0000000..bf9cb50 --- /dev/null +++ b/internal/poster/stale_conn_test.go @@ -0,0 +1,90 @@ +package poster + +import ( + "errors" + "fmt" + "io" + "net" + "syscall" + "testing" + "time" +) + +// fakeTimeoutErr satisfies net.Error with Timeout()==true, standing in for the +// os.ErrDeadlineExceeded that a TCP read deadline wraps in *net.OpError. +type fakeTimeoutErr struct{} + +func (fakeTimeoutErr) Error() string { return "fake: deadline exceeded" } +func (fakeTimeoutErr) Timeout() bool { return true } +func (fakeTimeoutErr) Temporary() bool { return true } + +type fakeNonTimeoutNetErr struct{} + +func (fakeNonTimeoutNetErr) Error() string { return "fake: refused" } +func (fakeNonTimeoutNetErr) Timeout() bool { return false } +func (fakeNonTimeoutNetErr) Temporary() bool { return false } + +func TestIsStaleConnError(t *testing.T) { + t.Parallel() + + realTimeout := &net.OpError{ + Op: "read", + Net: "tcp", + Err: fakeTimeoutErr{}, + } + + tests := []struct { + name string + err error + want bool + }{ + {name: "nil", err: nil, want: false}, + {name: "EPIPE", err: syscall.EPIPE, want: true}, + {name: "ECONNRESET", err: syscall.ECONNRESET, want: true}, + {name: "wrapped EPIPE", err: fmt.Errorf("post: %w", syscall.EPIPE), want: true}, + {name: "wrapped net.Error timeout", err: fmt.Errorf("nntp: %w", realTimeout), want: true}, + {name: "string broken pipe", err: errors.New("write: broken pipe"), want: true}, + {name: "string connection reset", err: errors.New("read: connection reset by peer"), want: true}, + {name: "string i/o timeout", err: errors.New("read tcp 1.2.3.4:42->5.6.7.8:563: i/o timeout"), want: true}, + {name: "net.Error non-timeout", err: fakeNonTimeoutNetErr{}, want: false}, + {name: "io.EOF", err: io.EOF, want: 
false}, + {name: "generic error", err: errors.New("posting forbidden"), want: false}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + if got := isStaleConnError(tt.err); got != tt.want { + t.Errorf("isStaleConnError(%v) = %v, want %v", tt.err, got, tt.want) + } + }) + } +} + +// Ensure a real net.Error from a deadline behaves as expected (defense in +// depth against future stdlib changes to error wrapping). +func TestIsStaleConnError_RealDeadline(t *testing.T) { + t.Parallel() + + ln, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Skipf("cannot listen: %v", err) + } + defer ln.Close() + + conn, err := net.Dial("tcp", ln.Addr().String()) + if err != nil { + t.Skipf("cannot dial: %v", err) + } + defer conn.Close() + + _ = conn.SetReadDeadline(time.Now().Add(50 * time.Millisecond)) + buf := make([]byte, 1) + _, readErr := conn.Read(buf) + if readErr == nil { + t.Fatalf("expected read deadline error, got nil") + } + if !isStaleConnError(readErr) { + t.Errorf("isStaleConnError(%v) = false, want true", readErr) + } +}