Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ require (
github.com/javi11/nntppool/v4 v4.11.1
github.com/javi11/nxg v0.1.0
github.com/javi11/nzbparser v0.5.4
github.com/javi11/par2go v0.0.8
github.com/javi11/par2go v0.0.9
github.com/klauspost/compress v1.18.2
github.com/mattn/go-sqlite3 v1.14.32
github.com/mnightingale/rapidyenc v0.0.0-20251128204712-7aafef1eaf1c
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -351,8 +351,8 @@ github.com/javi11/nxg v0.1.0 h1:CTThldYlaVIPIhpkrMw0HcTD0NLrW1uYMoDILjjEOtM=
github.com/javi11/nxg v0.1.0/go.mod h1:+GvYpp+y1oq+qBOWxFMvfTjtin/0zCeomWfjiPkiu8A=
github.com/javi11/nzbparser v0.5.4 h1:0aYyORZipp7iX8eNpT/efnzCeVO+9C0sE2HWCGc/JaI=
github.com/javi11/nzbparser v0.5.4/go.mod h1:ikF7WI3BUGs5IHQJmKzmtTkX29NZW5nvUdo6ZWFZgL4=
github.com/javi11/par2go v0.0.8 h1:dnEaRIHIxJZ9dQg2AZ6h/mD9L11xFpdAfaA+vWg2pAI=
github.com/javi11/par2go v0.0.8/go.mod h1:GXuKNiRmZGv4rGia0FR23016IbcrsTMvLHihytsSnMU=
github.com/javi11/par2go v0.0.9 h1:L8L5+N9VdBG/Yn/kTurhLqBPpepTuPx6QJCgtWenQFA=
github.com/javi11/par2go v0.0.9/go.mod h1:GXuKNiRmZGv4rGia0FR23016IbcrsTMvLHihytsSnMU=
github.com/jaypipes/ghw v0.13.0 h1:log8MXuB8hzTNnSktqpXMHc0c/2k/WgjOMSUtnI1RV4=
github.com/jaypipes/ghw v0.13.0/go.mod h1:In8SsaDqlb1oTyrbmTC14uy+fbBMvp+xdqX51MidlD8=
github.com/jaypipes/pcidb v1.0.1 h1:WB2zh27T3nwg8AE8ei81sNRb9yWBii3JGNJtT7K9Oic=
Expand Down
231 changes: 182 additions & 49 deletions internal/poster/poster.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,35 @@ var (
ErrPosterClosed = errors.New("poster is closed")
)

// Sizing limits for the pooled article body buffers.
const (
	// defaultBodyBufferSize is the size of a freshly allocated pool buffer:
	// 768KB, a little above the default article size of 750KB.
	defaultBodyBufferSize = 768 * 1024

	// maxBodyBufferPoolSize is the largest buffer capacity accepted back into
	// the pool. A bigger buffer was allocated for an unusually large article;
	// keeping it would permanently grow the pool's working set, so it is
	// dropped instead of being reused.
	maxBodyBufferPoolSize = defaultBodyBufferSize * 4
)

// bodyBufferPool provides reusable buffers for article body read-ahead to
// reduce GC pressure. When the pool is empty it allocates a new buffer of
// defaultBodyBufferSize bytes.
var bodyBufferPool = sync.Pool{
	New: func() any {
		// Single allocation path driven by the named constant; the former
		// literal-sized return made this statement unreachable.
		return make([]byte, defaultBodyBufferSize)
	},
}

// putBodyBuffer hands buf back to bodyBufferPool at its full capacity.
// Nil buffers are ignored, and buffers whose capacity exceeds
// maxBodyBufferPoolSize are discarded so that a single oversized article
// cannot permanently enlarge the pool.
func putBodyBuffer(buf []byte) {
	switch capacity := cap(buf); {
	case buf == nil, capacity > maxBodyBufferPoolSize:
		// Nothing to recycle, or too large to be worth keeping.
		return
	default:
		bodyBufferPool.Put(buf[:capacity]) //nolint:staticcheck // SA6002: slices have pointer semantics, no wrapper needed
	}
}

// Poster defines the interface for posting articles to Usenet
type Poster interface {
// Post posts files from a directory to Usenet
Expand Down Expand Up @@ -109,6 +130,18 @@ type articleWithBody struct {
poolBuf []byte // original pooled buffer (may be larger than body)
}

// uploadJob is submitted to the shared poster worker pool. Each job posts one
// article. The done callback is invoked exactly once, on success or failure,
// so the submitter can track per-Post completion and capture the first error.
// The worker is responsible for returning poolBuf to the body buffer pool.
type uploadJob struct {
// ctx is checked for cancellation and pause before posting, and is passed
// on to the article post call.
ctx context.Context
// art is the article to post.
art *article.Article
// body is the article payload handed to the post call.
body []byte
// poolBuf is the pooled buffer that the worker returns to the body buffer
// pool once the job has run.
poolBuf []byte
// done receives the job outcome: nil on success, otherwise the
// cancellation, pause, or posting error.
done func(err error)
}

// Stats tracks posting statistics
type Stats struct {
ArticlesPosted int64
Expand All @@ -130,6 +163,46 @@ type poster struct {
jobProgress progress.JobProgress
closed atomic.Bool
closeOnce sync.Once

// Shared upload worker pool. Sized once to the total number of upload-pool
// connections so concurrent Post() calls share workers instead of each
// spawning its own pool. See issue #184. Initialized lazily via workerInit
// so that tests which build the struct directly still get a working pool
// on the first Post() call.
workerInit sync.Once
numOfConnections int
uploadJobs chan uploadJob
shutdown chan struct{}
workerWG sync.WaitGroup
}

// ensureWorkersStarted launches the shared upload worker pool. All work is
// guarded by workerInit, so the eager call from New() and the lazy call from
// Post() (the fallback for posters built as struct literals in tests) start
// the workers only once between them.
func (p *poster) ensureWorkersStarted() {
	p.workerInit.Do(func() {
		// Keep any channel that was already populated; create the rest.
		if p.shutdown == nil {
			p.shutdown = make(chan struct{})
		}
		if p.uploadJobs == nil {
			p.uploadJobs = make(chan uploadJob)
		}

		// When no worker count was preset, use the sum of MaxConnections
		// across the upload pool's providers, with a floor of one worker.
		if p.numOfConnections <= 0 {
			total := 0
			if p.uploadPool != nil {
				for _, provider := range p.uploadPool.Stats().Providers {
					total += provider.MaxConnections
				}
			}
			if total < 1 {
				total = 1
			}
			p.numOfConnections = total
		}

		// Register every worker with the WaitGroup up front, then start them.
		p.workerWG.Add(p.numOfConnections)
		for started := 0; started < p.numOfConnections; started++ {
			go p.uploadWorker()
		}
	})
}

// New creates a new poster using dependency injection for the connection pool manager
Expand All @@ -155,6 +228,7 @@ func New(ctx context.Context, cfg config.Config, poolManager pool.PoolManager, j
}

postCheckCfg := cfg.GetPostCheckConfig()

p := &poster{
cfg: cfg.GetPostingConfig(),
checkCfg: postCheckCfg,
Expand All @@ -164,6 +238,12 @@ func New(ctx context.Context, cfg config.Config, poolManager pool.PoolManager, j
jobProgress: jobProgress,
}

// Size the shared upload worker pool to the total number of upload-pool
// connections across all providers. This caps in-process posting goroutines
// to actual upstream capacity rather than letting each Post() call spawn
// its own pool, which previously multiplied with MaxConcurrentUploads × PAR2.
p.ensureWorkersStarted()

// Log post check configuration for debugging
if postCheckCfg.Enabled != nil {
slog.DebugContext(ctx, "Poster initialized",
Expand All @@ -185,10 +265,49 @@ func New(ctx context.Context, cfg config.Config, poolManager pool.PoolManager, j
// Close marks the poster as closed, signals the shared upload workers to
// stop, and blocks until every worker goroutine has returned. A worker that
// is in the middle of a job finishes that job first. Only the first call
// does any work; later calls are no-ops.
func (p *poster) Close() {
	p.closeOnce.Do(func() {
		p.closed.Store(true)

		// Go through workerInit before touching p.shutdown. If the workers
		// were already started this is a no-op that synchronises with that
		// initialisation; if a lazy start is in progress it waits for it to
		// finish. If nothing has started them yet, it claims the Once and
		// creates only the shutdown channel, so a Post() call that raced
		// past the closed check cannot start workers nobody will ever stop.
		// That call then sees the closed shutdown channel when submitting.
		p.workerInit.Do(func() {
			if p.shutdown == nil {
				p.shutdown = make(chan struct{})
			}
		})

		close(p.shutdown)
		p.workerWG.Wait()
		slog.Info("Poster closed - no new Post() calls will be accepted")
	})
}

// uploadWorker is the body of one shared upload worker goroutine. It takes
// jobs from uploadJobs and runs them one at a time until the shutdown channel
// is closed, then marks itself done on workerWG. ensureWorkersStarted starts
// numOfConnections of these, which bounds total posting concurrency however
// many Post() calls are in flight.
func (p *poster) uploadWorker() {
	defer p.workerWG.Done()

	for {
		select {
		case next := <-p.uploadJobs:
			p.runUploadJob(next)
		case <-p.shutdown:
			return
		}
	}
}

// runUploadJob processes one upload job and reports its outcome through
// job.done, which is called exactly once. The steps run in order (context
// cancellation check, pause check, article post) and the first error stops
// the chain. The job's pooled buffer goes back to the body buffer pool when
// the function returns, whatever the outcome.
func (p *poster) runUploadJob(job uploadJob) {
	defer putBodyBuffer(job.poolBuf)

	// Each step runs only if the previous one succeeded.
	result := job.ctx.Err()
	if result == nil {
		result = pausable.CheckPause(job.ctx)
	}
	if result == nil {
		result = p.postArticleWithBody(job.ctx, job.art, job.body)
	}
	job.done(result)
}

// Post posts files from a directory to Usenet
func (p *poster) Post(
ctx context.Context,
Expand All @@ -214,6 +333,10 @@ func (p *poster) PostWithRelativePaths(
return ErrPosterClosed
}

// Lazy fallback for struct-literal construction (tests). New() already
// invokes this; second call is a no-op via sync.Once.
p.ensureWorkersStarted()

wg := sync.WaitGroup{}
var failedPosts atomic.Int64

Expand Down Expand Up @@ -335,12 +458,6 @@ func (p *poster) postLoop(ctx context.Context, postQueue chan *Post, checkQueue
// Only close channels that this goroutine writes to
defer close(checkQueue)

numOfConnections := 0

for _, pr := range p.uploadPool.Stats().Providers {
numOfConnections += pr.MaxConnections
}

for post := range postQueue {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -374,83 +491,99 @@ func (p *poster) postLoop(ctx context.Context, postQueue chan *Post, checkQueue
case <-postCtx.Done():
return
default:
// Get buffer from pool, resize if needed
// Get buffer from pool, resize if needed. Outliers are dropped
// on Put (see putBodyBuffer) so they cannot inflate the pool.
poolBuf := bodyBufferPool.Get().([]byte)
if cap(poolBuf) < int(art.Size) {
// Buffer too small, allocate a larger one (won't go back to pool)
poolBuf = make([]byte, art.Size)
}
body := poolBuf[:art.Size]

if _, err := post.file.ReadAt(body, art.Offset); err != nil {
// Return buffer to pool on error
bodyBufferPool.Put(poolBuf[:cap(poolBuf)]) //nolint:staticcheck // SA6002: slices have pointer semantics, no wrapper needed
putBodyBuffer(poolBuf)
slog.ErrorContext(ctx, "Error pre-reading article", "error", err, "offset", art.Offset)
return
}

select {
case readAheadChan <- articleWithBody{article: art, body: body, poolBuf: poolBuf}:
case <-postCtx.Done():
// Return buffer to pool if context cancelled
bodyBufferPool.Put(poolBuf[:cap(poolBuf)]) //nolint:staticcheck // SA6002: slices have pointer semantics, no wrapper needed
putBodyBuffer(poolBuf)
return
}
}
}
}()

// Create a pool with error handling - use all available connections
// Note: We intentionally don't use WithCancelOnError() to prevent cascading failures
// when a single article fails (e.g., TLS timeout). Other articles should continue.
pool := concpool.New().WithContext(ctx).WithMaxGoroutines(numOfConnections).WithFirstError()
// Submit articles to the shared upload worker pool. Total posting
// concurrency across all in-flight Post() calls is capped at
// p.numOfConnections; this prevents the goroutine + buffer explosion
// that caused OOM under MaxConcurrentUploads > 1 with PAR2 enabled.
//
// We intentionally do NOT cancel sibling articles when one fails
// (e.g. TLS timeout) — they should continue.
var articleWG sync.WaitGroup
var firstErr atomic.Pointer[error]
recordErr := func(err error) {
if err == nil {
return
}
e := err
firstErr.CompareAndSwap(nil, &e)
}

// Collect completed articles for batch NZB addition (reduces lock contention)
var completedArticles []*article.Article
var completedMu sync.Mutex

// Consume from read-ahead channel and submit to posting pool
for artWithBody := range readAheadChan {
art := artWithBody.article
body := artWithBody.body
poolBuf := artWithBody.poolBuf

pool.Go(func(ctx context.Context) error {
// Return buffer to pool when done (even on error)
defer func() {
if poolBuf != nil {
bodyBufferPool.Put(poolBuf[:cap(poolBuf)]) //nolint:staticcheck // SA6002: slices have pointer semantics, no wrapper needed
articleWG.Add(1)
job := uploadJob{
ctx: postCtx,
art: art,
body: body,
poolBuf: poolBuf,
done: func(err error) {
defer articleWG.Done()
if err != nil {
recordErr(err)
return
}
}()

if ctx.Err() != nil {
return ctx.Err()
}

// Check for pause before processing article
if err := pausable.CheckPause(ctx); err != nil {
return err
}

if err := p.postArticleWithBody(ctx, art, body); err != nil {
return err
}
post.progress.UpdateProgress(int64(art.Size))
completedMu.Lock()
completedArticles = append(completedArticles, art)
completedMu.Unlock()
},
}

// Update progress if manager is available
post.progress.UpdateProgress(int64(art.Size))
select {
case p.uploadJobs <- job:
case <-p.shutdown:
putBodyBuffer(poolBuf)
recordErr(ErrPosterClosed)
articleWG.Done()
case <-postCtx.Done():
putBodyBuffer(poolBuf)
recordErr(postCtx.Err())
articleWG.Done()
}
}

// Collect completed article for batch NZB addition
completedMu.Lock()
completedArticles = append(completedArticles, art)
completedMu.Unlock()
// Wait for all submitted articles to finish.
articleWG.Wait()

return nil
})
// Collect first error (if any). Matches the previous WithFirstError
// semantic from conc/pool: other workers continued; we surface the
// first failure to the caller.
var errs error
if ePtr := firstErr.Load(); ePtr != nil {
errs = *ePtr
}

// Wait for all workers to complete and collect errors
errs := pool.Wait()

// Read-ahead goroutine has finished (readAheadChan was drained above)
// but cancel the per-post ctx so any straggler observes Done.
postCancel()
Expand Down
Loading
Loading