Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 35 additions & 13 deletions internal/poster/poster.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"log/slog"
"math/rand"
"net"
"os"
"path/filepath"
"strings"
Expand Down Expand Up @@ -1090,15 +1091,20 @@ func (p *poster) postArticleWithBody(ctx context.Context, art *article.Article,
postCtx, cancel := context.WithTimeout(ctx, 2*time.Minute)
defer cancel()

// Retry once on broken pipe: the pool may have returned a stale connection that
// the server silently closed. After the broken pipe error, the pool discards that
// connection; the retry picks a fresh one. bytes.NewReader(body) is cheap to
// recreate so the body is fully re-readable on each attempt.
// Retry on stale pooled connection: the pool may hand back a connection the
// server silently closed (broken pipe / connection reset) or that has gone
// half-open and stops responding (read i/o timeout). After such errors the
// pool discards the bad connection; the retry picks a fresh one.
// bytes.NewReader(body) is cheap to recreate so the body is fully re-readable
// on each attempt. Cap at 3 attempts to avoid masking real server problems
// while tolerating a second stale pick after a long throttle pause.
var lastErr error
for attempt := range 2 {
for attempt := range 3 {
if attempt > 0 {
slog.WarnContext(ctx, "Retrying article post after broken pipe (stale pooled connection)",
"messageID", art.MessageID)
slog.WarnContext(ctx, "Retrying article post after stale pooled connection",
"messageID", art.MessageID,
"attempt", attempt,
"prevErr", lastErr.Error())
}

_, lastErr = p.uploadPool.PostYenc(postCtx, headers, bytes.NewReader(body), meta)
Expand All @@ -1110,7 +1116,7 @@ func (p *poster) postArticleWithBody(ctx context.Context, art *article.Article,
return context.Canceled
}

if !isBrokenPipe(lastErr) {
if !isStaleConnError(lastErr) {
break
}
}
Expand All @@ -1133,15 +1139,31 @@ func (p *poster) postArticleWithBody(ctx context.Context, art *article.Article,
return nil
}

// isBrokenPipe reports whether err represents a broken-pipe / connection-reset
// condition, indicating that the remote server closed the TCP connection before
// the client finished writing (typically a stale idle pooled connection).
func isBrokenPipe(err error) bool {
// isStaleConnError reports whether err looks like the signature of a stale
// pooled connection, i.e. a failure worth retrying on a fresh connection:
//
//   - the peer already closed the socket (EPIPE / ECONNRESET, matched with
//     errors.Is so wrapped errors are recognised);
//   - a net.Error whose Timeout method reports true, e.g. a read deadline
//     firing on a half-open connection before any response arrived;
//   - as a last resort, an error whose text contains "broken pipe",
//     "connection reset by peer" or "i/o timeout", for errors that were
//     flattened to strings and no longer carry their wrapped cause.
//
// A nil err is never stale.
//
// The caller is expected to short-circuit cancellation / deadline expiry of
// its own context before calling this, so that a timeout seen here is a
// per-socket deadline rather than the per-article envelope.
func isStaleConnError(err error) bool {
	if err == nil {
		return false
	}
	if errors.Is(err, syscall.EPIPE) || errors.Is(err, syscall.ECONNRESET) {
		return true
	}
	var ne net.Error
	if errors.As(err, &ne) && ne.Timeout() {
		return true
	}
	// Text fallback. All three markers live in one expression: an earlier
	// two-marker return placed ahead of this one made the "i/o timeout" check
	// unreachable.
	msg := err.Error()
	return strings.Contains(msg, "broken pipe") ||
		strings.Contains(msg, "connection reset by peer") ||
		strings.Contains(msg, "i/o timeout")
}

// checkArticle checks if an article exists using the check pool
Expand Down
4 changes: 2 additions & 2 deletions internal/poster/poster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1251,7 +1251,7 @@ func TestPostLoop_Basic(t *testing.T) {
assert.Contains(t, err.Error(), "error posting article")
})

t.Run("postArticleWithBody fails when both attempts return broken pipe", func(t *testing.T) {
t.Run("postArticleWithBody fails when all attempts return broken pipe", func(t *testing.T) {
ctx := context.Background()
content := "test content"

Expand All @@ -1260,7 +1260,7 @@ func TestPostLoop_Basic(t *testing.T) {

mockPool := mocks.NewMockNNTPClient(ctrl)
mockPool.EXPECT().PostYenc(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Return(nil, fmt.Errorf("write tcp: %w", syscall.EPIPE)).Times(2)
Return(nil, fmt.Errorf("write tcp: %w", syscall.EPIPE)).Times(3)

p := &poster{
uploadPool: mockPool,
Expand Down
90 changes: 90 additions & 0 deletions internal/poster/stale_conn_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package poster

import (
"errors"
"fmt"
"io"
"net"
"syscall"
"testing"
"time"
)

// fakeTimeoutErr is a minimal net.Error whose Timeout method always reports
// true. The tests place it inside a *net.OpError to stand in for the timeout
// cause of a read-deadline failure without touching a real socket.
type fakeTimeoutErr struct{}

// Compile-time guarantee that the fake keeps satisfying net.Error; without it
// a signature drift would silently stop the tests from exercising the
// errors.As(net.Error) branch.
var _ net.Error = fakeTimeoutErr{}

func (fakeTimeoutErr) Error() string   { return "fake: deadline exceeded" }
func (fakeTimeoutErr) Timeout() bool   { return true }
func (fakeTimeoutErr) Temporary() bool { return true }

// fakeNonTimeoutNetErr is a minimal net.Error whose Timeout method always
// reports false. It lets the tests confirm that merely being a net.Error is
// not enough to be classified as a stale connection.
type fakeNonTimeoutNetErr struct{}

// Compile-time guarantee that the fake keeps satisfying net.Error. The
// "non-timeout" case expects false, which it would also get if this type
// stopped being a net.Error, so the assertion keeps that case meaningful.
var _ net.Error = fakeNonTimeoutNetErr{}

func (fakeNonTimeoutNetErr) Error() string   { return "fake: refused" }
func (fakeNonTimeoutNetErr) Timeout() bool   { return false }
func (fakeNonTimeoutNetErr) Temporary() bool { return false }

// TestIsStaleConnError is a table-driven check of isStaleConnError. Each row
// pairs one error value (bare errno, wrapped errno, typed net.Error, or plain
// text) with the classification expected for it, and every row runs as its own
// parallel subtest.
func TestIsStaleConnError(t *testing.T) {
	t.Parallel()

	type testCase struct {
		name string
		err  error
		want bool
	}

	// A read-side *net.OpError carrying the timeout fake as its cause.
	opTimeout := &net.OpError{Op: "read", Net: "tcp", Err: fakeTimeoutErr{}}

	cases := []testCase{
		{"nil", nil, false},
		{"EPIPE", syscall.EPIPE, true},
		{"ECONNRESET", syscall.ECONNRESET, true},
		{"wrapped EPIPE", fmt.Errorf("post: %w", syscall.EPIPE), true},
		{"wrapped net.Error timeout", fmt.Errorf("nntp: %w", opTimeout), true},
		{"string broken pipe", errors.New("write: broken pipe"), true},
		{"string connection reset", errors.New("read: connection reset by peer"), true},
		{"string i/o timeout", errors.New("read tcp 1.2.3.4:42->5.6.7.8:563: i/o timeout"), true},
		{"net.Error non-timeout", fakeNonTimeoutNetErr{}, false},
		{"io.EOF", io.EOF, false},
		{"generic error", errors.New("posting forbidden"), false},
	}

	for i := range cases {
		tc := cases[i]
		t.Run(tc.name, func(t *testing.T) {
			t.Parallel()
			got := isStaleConnError(tc.err)
			if got == tc.want {
				return
			}
			t.Errorf("isStaleConnError(%v) = %v, want %v", tc.err, got, tc.want)
		})
	}
}

// TestIsStaleConnError_RealDeadline feeds isStaleConnError an error produced
// by a real TCP read deadline rather than a hand-built fake (defense in depth
// against future stdlib changes to error wrapping).
//
// It dials a loopback listener that never accepts or writes, so the read can
// only end when the 50ms deadline fires. The test is skipped when the
// environment does not allow listening or dialing on 127.0.0.1.
func TestIsStaleConnError_RealDeadline(t *testing.T) {
	t.Parallel()

	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		t.Skipf("cannot listen: %v", err)
	}
	defer ln.Close()

	conn, err := net.Dial("tcp", ln.Addr().String())
	if err != nil {
		t.Skipf("cannot dial: %v", err)
	}
	defer conn.Close()

	// Without a deadline the Read below would block indefinitely, so a failure
	// to arm it must abort the test instead of being ignored.
	if err := conn.SetReadDeadline(time.Now().Add(50 * time.Millisecond)); err != nil {
		t.Fatalf("cannot set read deadline: %v", err)
	}

	buf := make([]byte, 1)
	_, readErr := conn.Read(buf)
	if readErr == nil {
		t.Fatalf("expected read deadline error, got nil")
	}

	// Pin the premise: the stdlib error must still be a typed net.Error
	// timeout. Otherwise the assertion below could pass purely through the
	// "i/o timeout" text fallback and a wrapping change would go unnoticed.
	var ne net.Error
	if !errors.As(readErr, &ne) || !ne.Timeout() {
		t.Fatalf("read error %v (%T) is not a net.Error timeout", readErr, readErr)
	}

	if !isStaleConnError(readErr) {
		t.Errorf("isStaleConnError(%v) = false, want true", readErr)
	}
}
Loading