diff --git a/go.mod b/go.mod index e2af672..7dd0dd3 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,7 @@ require ( github.com/javi11/nntppool/v4 v4.11.1 github.com/javi11/nxg v0.1.0 github.com/javi11/nzbparser v0.5.4 - github.com/javi11/par2go v0.0.8 + github.com/javi11/par2go v0.0.9 github.com/klauspost/compress v1.18.2 github.com/mattn/go-sqlite3 v1.14.32 github.com/mnightingale/rapidyenc v0.0.0-20251128204712-7aafef1eaf1c diff --git a/go.sum b/go.sum index e1b6229..74b39eb 100644 --- a/go.sum +++ b/go.sum @@ -351,8 +351,8 @@ github.com/javi11/nxg v0.1.0 h1:CTThldYlaVIPIhpkrMw0HcTD0NLrW1uYMoDILjjEOtM= github.com/javi11/nxg v0.1.0/go.mod h1:+GvYpp+y1oq+qBOWxFMvfTjtin/0zCeomWfjiPkiu8A= github.com/javi11/nzbparser v0.5.4 h1:0aYyORZipp7iX8eNpT/efnzCeVO+9C0sE2HWCGc/JaI= github.com/javi11/nzbparser v0.5.4/go.mod h1:ikF7WI3BUGs5IHQJmKzmtTkX29NZW5nvUdo6ZWFZgL4= -github.com/javi11/par2go v0.0.8 h1:dnEaRIHIxJZ9dQg2AZ6h/mD9L11xFpdAfaA+vWg2pAI= -github.com/javi11/par2go v0.0.8/go.mod h1:GXuKNiRmZGv4rGia0FR23016IbcrsTMvLHihytsSnMU= +github.com/javi11/par2go v0.0.9 h1:L8L5+N9VdBG/Yn/kTurhLqBPpepTuPx6QJCgtWenQFA= +github.com/javi11/par2go v0.0.9/go.mod h1:GXuKNiRmZGv4rGia0FR23016IbcrsTMvLHihytsSnMU= github.com/jaypipes/ghw v0.13.0 h1:log8MXuB8hzTNnSktqpXMHc0c/2k/WgjOMSUtnI1RV4= github.com/jaypipes/ghw v0.13.0/go.mod h1:In8SsaDqlb1oTyrbmTC14uy+fbBMvp+xdqX51MidlD8= github.com/jaypipes/pcidb v1.0.1 h1:WB2zh27T3nwg8AE8ei81sNRb9yWBii3JGNJtT7K9Oic= diff --git a/internal/poster/poster.go b/internal/poster/poster.go index eb23b6e..801e453 100644 --- a/internal/poster/poster.go +++ b/internal/poster/poster.go @@ -36,14 +36,35 @@ var ( ErrPosterClosed = errors.New("poster is closed") ) +// defaultBodyBufferSize is the baseline allocation for the body buffer pool +// (slightly larger than the default article size of 750KB). +const defaultBodyBufferSize = 768 * 1024 + +// maxBodyBufferPoolSize is the upper bound on buffers we accept back into the +// pool. Anything larger came from an anomalously large article and would +// permanently inflate the pool's working set if reused, so we drop it. +const maxBodyBufferPoolSize = 4 * defaultBodyBufferSize + // bodyBufferPool provides reusable buffers for article body read-ahead to reduce GC pressure var bodyBufferPool = sync.Pool{ New: func() any { - // Pre-allocate 768KB (slightly larger than default article size of 750KB) - return make([]byte, 768*1024) + return make([]byte, defaultBodyBufferSize) }, } +// putBodyBuffer returns a buffer to the pool, dropping it if it grew beyond +// maxBodyBufferPoolSize so one outlier article cannot inflate the pool forever. +func putBodyBuffer(buf []byte) { + if buf == nil { + return + } + full := buf[:cap(buf)] + if cap(full) > maxBodyBufferPoolSize { + return + } + bodyBufferPool.Put(full) //nolint:staticcheck // SA6002: slices have pointer semantics, no wrapper needed +} + // Poster defines the interface for posting articles to Usenet type Poster interface { // Post posts files from a directory to Usenet @@ -109,6 +130,18 @@ type articleWithBody struct { poolBuf []byte // original pooled buffer (may be larger than body) } +// uploadJob is submitted to the shared poster worker pool. Each job posts one +// article. The done callback is invoked exactly once, on success or failure, +// so the submitter can track per-Post completion and capture the first error. +// The worker is responsible for returning poolBuf to the body buffer pool. +type uploadJob struct { + ctx context.Context + art *article.Article + body []byte + poolBuf []byte + done func(err error) +} + // Stats tracks posting statistics type Stats struct { ArticlesPosted int64 @@ -130,6 +163,46 @@ type poster struct { jobProgress progress.JobProgress closed atomic.Bool closeOnce sync.Once + + // Shared upload worker pool. Sized once to the total number of upload-pool + // connections so concurrent Post() calls share workers instead of each + // spawning its own pool. See issue #184. Initialized lazily via workerInit + // so that tests which build the struct directly still get a working pool + // on the first Post() call. + workerInit sync.Once + numOfConnections int + uploadJobs chan uploadJob + shutdown chan struct{} + workerWG sync.WaitGroup +} + +// ensureWorkersStarted spins up the shared upload worker pool exactly once. +// Safe to call from both New() (eager) and Post() (lazy fallback for tests). +func (p *poster) ensureWorkersStarted() { + p.workerInit.Do(func() { + if p.uploadJobs == nil { + p.uploadJobs = make(chan uploadJob) + } + if p.shutdown == nil { + p.shutdown = make(chan struct{}) + } + if p.numOfConnections <= 0 { + n := 0 + if p.uploadPool != nil { + for _, pr := range p.uploadPool.Stats().Providers { + n += pr.MaxConnections + } + } + if n < 1 { + n = 1 + } + p.numOfConnections = n + } + for i := 0; i < p.numOfConnections; i++ { + p.workerWG.Add(1) + go p.uploadWorker() + } + }) } // New creates a new poster using dependency injection for the connection pool manager @@ -155,6 +228,7 @@ func New(ctx context.Context, cfg config.Config, poolManager pool.PoolManager, j } postCheckCfg := cfg.GetPostCheckConfig() + p := &poster{ cfg: cfg.GetPostingConfig(), checkCfg: postCheckCfg, @@ -164,6 +238,12 @@ func New(ctx context.Context, cfg config.Config, poolManager pool.PoolManager, j jobProgress: jobProgress, } + // Size the shared upload worker pool to the total number of upload-pool + // connections across all providers. This caps in-process posting goroutines + // to actual upstream capacity rather than letting each Post() call spawn + // its own pool, which previously multiplied with MaxConcurrentUploads × PAR2. + p.ensureWorkersStarted() + // Log post check configuration for debugging if postCheckCfg.Enabled != nil { slog.DebugContext(ctx, "Poster initialized", @@ -185,10 +265,49 @@ func New(ctx context.Context, cfg config.Config, poolManager pool.PoolManager, j func (p *poster) Close() { p.closeOnce.Do(func() { p.closed.Store(true) + // Only signal workers if they were ever started. A Post() call would + // have triggered ensureWorkersStarted; otherwise shutdown is nil. + if p.shutdown != nil { + close(p.shutdown) + p.workerWG.Wait() + } slog.Info("Poster closed - no new Post() calls will be accepted") }) } +// uploadWorker drains uploadJob entries from the shared queue and posts each +// article. Exactly numOfConnections workers run for the lifetime of the +// poster, so total posting concurrency is bounded regardless of how many +// concurrent Post() calls are in flight. +func (p *poster) uploadWorker() { + defer p.workerWG.Done() + for { + select { + case <-p.shutdown: + return + case job := <-p.uploadJobs: + p.runUploadJob(job) + } + } +} + +// runUploadJob executes a single upload job and reports the outcome through +// job.done. The job's body buffer is always returned to the pool, even on +// error or cancellation. +func (p *poster) runUploadJob(job uploadJob) { + defer putBodyBuffer(job.poolBuf) + + if err := job.ctx.Err(); err != nil { + job.done(err) + return + } + if err := pausable.CheckPause(job.ctx); err != nil { + job.done(err) + return + } + job.done(p.postArticleWithBody(job.ctx, job.art, job.body)) +} + // Post posts files from a directory to Usenet func (p *poster) Post( ctx context.Context, @@ -214,6 +333,10 @@ func (p *poster) PostWithRelativePaths( return ErrPosterClosed } + // Lazy fallback for struct-literal construction (tests). New() already + // invokes this; second call is a no-op via sync.Once. + p.ensureWorkersStarted() + wg := sync.WaitGroup{} var failedPosts atomic.Int64 @@ -335,12 +458,6 @@ func (p *poster) postLoop(ctx context.Context, postQueue chan *Post, checkQueue // Only close channels that this goroutine writes to defer close(checkQueue) - numOfConnections := 0 - - for _, pr := range p.uploadPool.Stats().Providers { - numOfConnections += pr.MaxConnections - } - for post := range postQueue { select { case <-ctx.Done(): @@ -374,17 +491,16 @@ func (p *poster) postLoop(ctx context.Context, postQueue chan *Post, checkQueue case <-postCtx.Done(): return default: - // Get buffer from pool, resize if needed + // Get buffer from pool, resize if needed. Outliers are dropped + // on Put (see putBodyBuffer) so they cannot inflate the pool. poolBuf := bodyBufferPool.Get().([]byte) if cap(poolBuf) < int(art.Size) { - // Buffer too small, allocate a larger one (won't go back to pool) poolBuf = make([]byte, art.Size) } body := poolBuf[:art.Size] if _, err := post.file.ReadAt(body, art.Offset); err != nil { - // Return buffer to pool on error - bodyBufferPool.Put(poolBuf[:cap(poolBuf)]) //nolint:staticcheck // SA6002: slices have pointer semantics, no wrapper needed + putBodyBuffer(poolBuf) slog.ErrorContext(ctx, "Error pre-reading article", "error", err, "offset", art.Offset) return } @@ -392,65 +508,82 @@ func (p *poster) postLoop(ctx context.Context, postQueue chan *Post, checkQueue select { case readAheadChan <- articleWithBody{article: art, body: body, poolBuf: poolBuf}: case <-postCtx.Done(): - // Return buffer to pool if context cancelled - bodyBufferPool.Put(poolBuf[:cap(poolBuf)]) //nolint:staticcheck // SA6002: slices have pointer semantics, no wrapper needed + putBodyBuffer(poolBuf) return } } } }() - // Create a pool with error handling - use all available connections - // Note: We intentionally don't use WithCancelOnError() to prevent cascading failures - // when a single article fails (e.g., TLS timeout). Other articles should continue. - pool := concpool.New().WithContext(ctx).WithMaxGoroutines(numOfConnections).WithFirstError() + // Submit articles to the shared upload worker pool. Total posting + // concurrency across all in-flight Post() calls is capped at + // p.numOfConnections; this prevents the goroutine + buffer explosion + // that caused OOM under MaxConcurrentUploads > 1 with PAR2 enabled. + // + // We intentionally do NOT cancel sibling articles when one fails + // (e.g. TLS timeout) — they should continue. + var articleWG sync.WaitGroup + var firstErr atomic.Pointer[error] + recordErr := func(err error) { + if err == nil { + return + } + e := err + firstErr.CompareAndSwap(nil, &e) + } // Collect completed articles for batch NZB addition (reduces lock contention) var completedArticles []*article.Article var completedMu sync.Mutex - // Consume from read-ahead channel and submit to posting pool for artWithBody := range readAheadChan { art := artWithBody.article body := artWithBody.body poolBuf := artWithBody.poolBuf - pool.Go(func(ctx context.Context) error { - // Return buffer to pool when done (even on error) - defer func() { - if poolBuf != nil { - bodyBufferPool.Put(poolBuf[:cap(poolBuf)]) //nolint:staticcheck // SA6002: slices have pointer semantics, no wrapper needed + articleWG.Add(1) + job := uploadJob{ + ctx: postCtx, + art: art, + body: body, + poolBuf: poolBuf, + done: func(err error) { + defer articleWG.Done() + if err != nil { + recordErr(err) + return } - }() - - if ctx.Err() != nil { - return ctx.Err() - } - - // Check for pause before processing article - if err := pausable.CheckPause(ctx); err != nil { - return err - } - - if err := p.postArticleWithBody(ctx, art, body); err != nil { - return err - } + post.progress.UpdateProgress(int64(art.Size)) + completedMu.Lock() + completedArticles = append(completedArticles, art) + completedMu.Unlock() + }, + } - // Update progress if manager is available - post.progress.UpdateProgress(int64(art.Size)) + select { + case p.uploadJobs <- job: + case <-p.shutdown: + putBodyBuffer(poolBuf) + recordErr(ErrPosterClosed) + articleWG.Done() + case <-postCtx.Done(): + putBodyBuffer(poolBuf) + recordErr(postCtx.Err()) + articleWG.Done() + } + } - // Collect completed article for batch NZB addition - completedMu.Lock() - completedArticles = append(completedArticles, art) - completedMu.Unlock() + // Wait for all submitted articles to finish. + articleWG.Wait() - return nil - }) + // Collect first error (if any). Matches the previous WithFirstError + // semantic from conc/pool: other workers continued; we surface the + // first failure to the caller. + var errs error + if ePtr := firstErr.Load(); ePtr != nil { + errs = *ePtr } - // Wait for all workers to complete and collect errors - errs := pool.Wait() - // Read-ahead goroutine has finished (readAheadChan was drained above) // but cancel the per-post ctx so any straggler observes Done. postCancel() diff --git a/internal/poster/poster_test.go b/internal/poster/poster_test.go index 77b8231..26f7dfe 100644 --- a/internal/poster/poster_test.go +++ b/internal/poster/poster_test.go @@ -1348,6 +1348,153 @@ func TestCheckLoop_Basic(t *testing.T) { // Test helper functions +// TestSharedUploadPool verifies that the global poster worker pool introduced +// for issue #184 (https://github.com/javi11/postie/issues/184) actually caps +// concurrent in-flight article posts at numOfConnections regardless of how +// many concurrent Post() calls are happening, and that Close() drains cleanly. +func TestSharedUploadPool(t *testing.T) { + t.Run("concurrent posts share workers and stay bounded", func(t *testing.T) { + ctx := context.Background() + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + const maxConnections = 4 + const concurrentPosts = 8 + + // Track simultaneously-active article posts. The mock blocks briefly so + // many articles overlap; the in-flight peak must not exceed + // maxConnections. + var inflight atomic.Int32 + var peak atomic.Int32 + + mockPool := mocks.NewMockNNTPClient(ctrl) + mockPool.EXPECT().Stats().Return(nntppool.ClientStats{ + Providers: []nntppool.ProviderStats{{MaxConnections: maxConnections}}, + }).AnyTimes() + mockPool.EXPECT(). + PostYenc(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + DoAndReturn(func(_ context.Context, _ any, _ any, _ any) (*nntppool.PostResult, error) { + cur := inflight.Add(1) + for { + p := peak.Load() + if cur <= p || peak.CompareAndSwap(p, cur) { + break + } + } + time.Sleep(5 * time.Millisecond) + inflight.Add(-1) + return &nntppool.PostResult{}, nil + }). + AnyTimes() + + nzbGen := mocks.NewMockNZBGenerator(ctrl) + nzbGen.EXPECT().AddArticle(gomock.Any()).Return().AnyTimes() + + mockJobProgress := mocks.NewMockJobProgress(ctrl) + mockProgress := mocks.NewMockProgress(ctrl) + mockJobProgress.EXPECT().AddProgress(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(mockProgress).AnyTimes() + mockJobProgress.EXPECT().FinishProgress(gomock.Any()).AnyTimes() + mockProgress.EXPECT().UpdateProgress(gomock.Any()).AnyTimes() + mockProgress.EXPECT().Finish().AnyTimes() + mockProgress.EXPECT().GetID().Return(uuid.New()).AnyTimes() + + checkCfg := createTestPostCheckConfig() + disabled := false + checkCfg.Enabled = &disabled + + p := &poster{ + cfg: createTestConfig(), + checkCfg: checkCfg, + uploadPool: mockPool, + stats: &Stats{StartTime: time.Now()}, + throttle: NewThrottle(1024*1024, time.Second), + jobProgress: mockJobProgress, + } + + // Build a content blob large enough to produce several articles per + // Post() so workers actually contend for slots. + content := strings.Repeat("test data ", 500) + + var wg sync.WaitGroup + errCh := make(chan error, concurrentPosts) + for range concurrentPosts { + testFile := createTestFile(t, content) + defer func() { _ = os.Remove(testFile) }() + wg.Add(1) + go func(f string) { + defer wg.Done() + errCh <- p.Post(ctx, []string{f}, "", nzbGen) + }(testFile) + } + wg.Wait() + close(errCh) + for err := range errCh { + assert.NoError(t, err) + } + + assert.LessOrEqual(t, int(peak.Load()), maxConnections, + "peak concurrent in-flight article posts should not exceed numOfConnections") + assert.Equal(t, int32(0), inflight.Load(), + "all in-flight posts should have completed before Wait() returned") + + p.Close() + }) + + t.Run("Close drains workers and rejects new Post calls", func(t *testing.T) { + ctx := context.Background() + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockPool := createMockNNTPClient(ctrl) + mockPool.EXPECT().PostYenc(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(&nntppool.PostResult{}, nil).AnyTimes() + + nzbGen := mocks.NewMockNZBGenerator(ctrl) + nzbGen.EXPECT().AddArticle(gomock.Any()).Return().AnyTimes() + + mockJobProgress := mocks.NewMockJobProgress(ctrl) + mockProgress := mocks.NewMockProgress(ctrl) + mockJobProgress.EXPECT().AddProgress(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(mockProgress).AnyTimes() + mockJobProgress.EXPECT().FinishProgress(gomock.Any()).AnyTimes() + mockProgress.EXPECT().UpdateProgress(gomock.Any()).AnyTimes() + mockProgress.EXPECT().Finish().AnyTimes() + mockProgress.EXPECT().GetID().Return(uuid.New()).AnyTimes() + + checkCfg := createTestPostCheckConfig() + disabled := false + checkCfg.Enabled = &disabled + + p := &poster{ + cfg: createTestConfig(), + checkCfg: checkCfg, + uploadPool: mockPool, + stats: &Stats{StartTime: time.Now()}, + throttle: NewThrottle(1024*1024, time.Second), + jobProgress: mockJobProgress, + } + + testFile := createTestFile(t, strings.Repeat("test data ", 100)) + defer func() { _ = os.Remove(testFile) }() + + require.NoError(t, p.Post(ctx, []string{testFile}, "", nzbGen)) + + // Close must drain workers (no goroutine leak) and short-circuit + // subsequent Post() calls with ErrPosterClosed. + closed := make(chan struct{}) + go func() { + p.Close() + close(closed) + }() + select { + case <-closed: + case <-time.After(2 * time.Second): + t.Fatal("Close did not drain workers within 2s") + } + + err := p.Post(ctx, []string{testFile}, "", nzbGen) + assert.ErrorIs(t, err, ErrPosterClosed) + }) +} + func createTestConfig() config.PostingConfig { enabled := true return config.PostingConfig{ diff --git a/internal/processor/processor.go b/internal/processor/processor.go index ef8e91c..c875aef 100644 --- a/internal/processor/processor.go +++ b/internal/processor/processor.go @@ -7,6 +7,7 @@ import ( "log/slog" "os" "path/filepath" + "runtime" "strings" "sync" "time" @@ -140,6 +141,12 @@ func (p *Processor) Start(ctx context.Context) error { processTicker := time.NewTicker(time.Second * 2) // Process queue frequently defer processTicker.Stop() + // Diagnostic watchdog: periodically log goroutine count and heap usage at + // debug level so silent OOM-kills (e.g. on memory-constrained NAS hosts) + // leave a trend line in the logs before the process dies. See issue #184. + watchdogTicker := time.NewTicker(10 * time.Second) + defer watchdogTicker.Stop() + // Main processing loop for { select { @@ -149,6 +156,15 @@ func (p *Processor) Start(ctx context.Context) error { if err := p.processQueueItems(ctx); err != nil { slog.ErrorContext(ctx, "Error processing queue", "error", err) } + case <-watchdogTicker.C: + var ms runtime.MemStats + runtime.ReadMemStats(&ms) + slog.DebugContext(ctx, "Runtime watchdog", + "goroutines", runtime.NumGoroutine(), + "heap_alloc_bytes", ms.HeapAlloc, + "heap_inuse_bytes", ms.HeapInuse, + "sys_bytes", ms.Sys, + ) } } } diff --git a/internal/queue/queue.go b/internal/queue/queue.go index bcabf30..4bf35f7 100644 --- a/internal/queue/queue.go +++ b/internal/queue/queue.go @@ -7,6 +7,7 @@ import ( "fmt" "log/slog" "os" + "sync" "time" "github.com/javi11/postie/internal/database" @@ -62,6 +63,14 @@ type Queue struct { db *sql.DB runCtx context.Context runCancel context.CancelFunc + + // addMu serializes check-then-insert in the duplicate-checking Add* methods. + // goqite.Send uses its own *sql.DB handle so a SQL transaction cannot span + // IsPathInQueue + Send; we close the TOCTOU window at the application level. + // See issue #184: under MaxConcurrentUploads > 1 the watcher and processor + // can both race to add the same path between the IsPathInQueue check and + // the Send call. + addMu sync.Mutex } type QueueItem struct { @@ -209,6 +218,9 @@ func (q *Queue) recoverInProgressItems(ctx context.Context) { // AddFile adds a file to the queue for processing func (q *Queue) AddFile(ctx context.Context, path string, size int64) error { + q.addMu.Lock() + defer q.addMu.Unlock() + // Check if the path already exists in pending queue, completed items, or errored items exists, err := q.IsPathInQueue(path) if err != nil { @@ -266,6 +278,9 @@ func (q *Queue) AddFileWithoutDuplicateCheck(ctx context.Context, path string, s // AddFileWithPriority adds a file to the queue with a specific priority func (q *Queue) AddFileWithPriority(ctx context.Context, path string, size int64, priority int) error { + q.addMu.Lock() + defer q.addMu.Unlock() + // Check if the path already exists in pending queue, completed items, or errored items exists, err := q.IsPathInQueue(path) if err != nil { @@ -325,6 +340,9 @@ func (q *Queue) AddFileWithPriorityWithoutDuplicateCheck(ctx context.Context, pa // (priority, input folder for relative-path output, delete-original flag). // Skips the add if the path is already tracked anywhere in the queue. func (q *Queue) AddFileWithOptions(ctx context.Context, path string, size int64, opts AddOptions) error { + q.addMu.Lock() + defer q.addMu.Unlock() + exists, err := q.IsPathInQueue(path) if err != nil { return fmt.Errorf("failed to check if path exists: %w", err)