From 40a68ce9c9e779f38b31aee821ad9c68080e934c Mon Sep 17 00:00:00 2001 From: javi11 Date: Sat, 9 May 2026 10:52:34 +0200 Subject: [PATCH 1/2] fix: apply post-check propagation delay per file with elapsed-time credit Track postedAt on each Post when its upload completes, and in checkLoop wait only the remainder of PostCheck.RetryDelay that hasn't already elapsed since that file finished posting. Removes the firstPost-gated single wait, which caused files 2..N within a multi-file Post() call to skip the propagation grace period entirely while file 1 paid the full delay. Refs #184. --- internal/poster/poster.go | 37 +++++++++++++++++++++++++++---------- 1 file changed, 27 insertions(+), 10 deletions(-) diff --git a/internal/poster/poster.go b/internal/poster/poster.go index 0ce2a5f..738357e 100644 --- a/internal/poster/poster.go +++ b/internal/poster/poster.go @@ -81,6 +81,10 @@ type Post struct { wg *sync.WaitGroup failed *atomic.Int64 progress progress.Progress + // postedAt records when this file's upload finished. Used by checkLoop to + // apply the propagation delay per-file with credit for time already elapsed, + // rather than only sleeping before the first file. + postedAt time.Time } // FailedArticleInfo contains information about an article that failed verification @@ -493,6 +497,7 @@ func (p *poster) postLoop(ctx context.Context, postQueue chan *Post, checkQueue post.mu.Lock() post.Status = PostStatusPosted + post.postedAt = time.Now() post.mu.Unlock() if p.checkCfg.Enabled != nil && *p.checkCfg.Enabled { @@ -532,8 +537,6 @@ func (p *poster) checkLoop(ctx context.Context, checkQueue chan *Post, postQueue deferredEnabled := p.checkCfg.DeferredCheckDelay.ToDuration() > 0 - firstPost := true - for post := range checkQueue { select { case <-ctx.Done(): @@ -550,19 +553,33 @@ func (p *poster) checkLoop(ctx context.Context, checkQueue chan *Post, postQueue // during the RetryDelay wait, preventing the queue item from appearing stuck. post.progress = p.jobProgress.AddProgress(uuid.New(), fmt.Sprintf("%s (check)", filepath.Base(post.FilePath)), progress.ProgressTypeChecking, post.filesize) - // Wait for articles to propagate to the verify server before checking. - // Only needed for the first file: subsequent files are posted after enough - // time has already elapsed during the first file's posting and checking. - if firstPost { - firstPost = false - if delay := p.checkCfg.RetryDelay.ToDuration(); delay > 0 { - post.progress.SetWaitDeadline(time.Now().Add(delay)) + // Wait for this file's articles to propagate to the verify server before + // checking. We sleep only the remainder of RetryDelay that hasn't already + // elapsed since this file finished posting — files that sat in the queue + // long enough incur no extra wait, while files checked right after upload + // still get the full propagation grace period. + if delay := p.checkCfg.RetryDelay.ToDuration(); delay > 0 { + post.mu.Lock() + postedAt := post.postedAt + post.mu.Unlock() + + var remaining time.Duration + if postedAt.IsZero() { + // Defensive: postedAt should always be set by postLoop, but if + // it isn't (e.g. unusual code paths) wait the full delay. + remaining = delay + } else { + remaining = delay - time.Since(postedAt) + } + + if remaining > 0 { + post.progress.SetWaitDeadline(time.Now().Add(remaining)) select { case <-ctx.Done(): post.progress.SetWaitDeadline(time.Time{}) errChan <- ctx.Err() return - case <-time.After(delay): + case <-time.After(remaining): } post.progress.SetWaitDeadline(time.Time{}) } From d195180d21ad89a9b0658947136ae2b474b47a77 Mon Sep 17 00:00:00 2001 From: javi11 Date: Sat, 9 May 2026 11:25:49 +0200 Subject: [PATCH 2/2] fix: run post-check verification asynchronously off Post() critical path Issue #184: with MaxConcurrentUploads=1, every uploaded file paid the full PostCheck.RetryDelay because each file was its own Post() call that spawned a fresh per-call checkLoop and only returned after STAT verification. Refactor postLoop and checkLoop to be poster-lifetime goroutines spawned once (lazily) per poster instance, sharing the postQueue and checkQueue. Post() now returns as soon as all uploads complete; the propagation delay and STAT verification run in the background on the long-lived checkLoop. Permanent verification failures (articles exhausting MaxRePost) are now surfaced via a per-call CheckExhaustedCallback set on the Postie wrapper via SetVerificationCallback. The processor wires this callback to queue.AddPendingArticleChecks, so the existing PostCheckRetryWorker keeps draining failures from the deferred-check DB queue exactly as before. Behavior changes: - Post() no longer returns DeferredCheckError synchronously; verification results are reported asynchronously through the existing deferred path. - Non-deferred verification exhaustion (post_check.deferred_check_delay=0) is now logged-only instead of returned as an error from Post(); the upload itself still succeeds. - poster.Close() now drains the long-lived loops with a 30s soft timeout before force-cancelling. Compile-time API additions: - poster.Poster gains PostWithCallback(ctx, files, rootDir, nzbGen, relativePaths, completedItemID, onCheckExhausted). - pkg/postie.Postie gains SetVerificationCallback(itemID, cb). Existing call sites in pkg/postie and the CLI keep working unchanged because callbacks default to nil (matches the legacy "drop on failure" behavior). DeferredCheckError type and the old synchronous catch sites in pkg/postie + processor remain as harmless dead code; can be removed in a follow-up. Refs #184. Builds on #224 (Phase 1, per-file postedAt). --- internal/poster/poster.go | 694 ++++++++++++++++++-------------- internal/poster/poster_test.go | 31 +- internal/processor/processor.go | 35 ++ pkg/postie/postfolder_test.go | 4 + pkg/postie/postie.go | 32 +- 5 files changed, 470 insertions(+), 326 deletions(-) diff --git a/internal/poster/poster.go b/internal/poster/poster.go index 738357e..32d37b6 100644 --- a/internal/poster/poster.go +++ b/internal/poster/poster.go @@ -50,6 +50,12 @@ type Poster interface { // PostWithRelativePaths posts files with custom display names (relative paths) for subjects // relativePaths maps absolute file path to the display name to use in the subject PostWithRelativePaths(ctx context.Context, files []string, rootDir string, nzbGen nzb.NZBGenerator, relativePaths map[string]string) error + // PostWithCallback is the full-featured posting entry point. It returns as + // soon as all uploads complete; the post-check propagation delay and STAT + // verification run asynchronously on the poster's long-lived checkLoop. + // completedItemID + onCheckExhausted let the caller persist failed-to-verify + // articles to a deferred-check queue. Both may be empty/nil. + PostWithCallback(ctx context.Context, files []string, rootDir string, nzbGen nzb.NZBGenerator, relativePaths map[string]string, completedItemID string, onCheckExhausted CheckExhaustedCallback) error // Stats returns posting statistics Stats() Stats // Close closes the poster @@ -68,6 +74,13 @@ const ( PostStatusPosting ) +// CheckExhaustedCallback is invoked by the long-lived checkLoop when an +// uploaded file's articles cannot be verified after exhausting MaxRePost. +// It is called on a background goroutine, after Post() has already returned +// to the caller. The callback is responsible for persisting failed articles +// for deferred verification (e.g. via queue.AddPendingArticleChecks). +type CheckExhaustedCallback func(ctx context.Context, articles []FailedArticleInfo, totalArticles int, completedItemID string) error + // Post represents a file to be posted type Post struct { FilePath string @@ -78,13 +91,29 @@ type Post struct { file *os.File mu sync.Mutex filesize int64 - wg *sync.WaitGroup + wg *sync.WaitGroup // per-call, drained when upload (not check) completes failed *atomic.Int64 progress progress.Progress // postedAt records when this file's upload finished. Used by checkLoop to - // apply the propagation delay per-file with credit for time already elapsed, - // rather than only sleeping before the first file. + // apply the propagation delay per-file with credit for time already elapsed. postedAt time.Time + // errSink is the per-call channel that postLoop uses to surface fatal + // upload errors back to the caller of Post(). Set by addPost. Never used + // by checkLoop — check failures route through onCheckExhausted instead. + errSink chan<- error + // nzbGen is the per-call NZB generator. Set by addPost. + nzbGen nzb.NZBGenerator + // completedItemID identifies this post in the persistent queue. Used by + // checkLoop to pass to onCheckExhausted on permanent verification failure. + // Empty when the caller doesn't have a queue ID (e.g. CLI usage). + completedItemID string + // onCheckExhausted is invoked by checkLoop when articles exhaust MaxRePost. + // May be nil — in that case checkLoop logs and drops the failure. + onCheckExhausted CheckExhaustedCallback + // retryParent links a retry post back to the original post's wg/failed/etc. + // Retries do not increment the per-call wg (which counts only initial + // uploads), so they need a way to find the original's metadata. + retryParent *Post } // FailedArticleInfo contains information about an article that failed verification @@ -133,6 +162,17 @@ type poster struct { jobProgress progress.JobProgress closed atomic.Bool closeOnce sync.Once + + // Long-lived loop infrastructure. postLoop and checkLoop are spawned once + // in New() and shared by every Post() call, so the propagation check no + // longer blocks Post()'s critical path. + postQueue chan *Post + checkQueue chan *Post // nil when post-check is disabled + loopCtx context.Context + loopCancel context.CancelFunc + loopsWG sync.WaitGroup + checkOn bool // captured at construction; whether checkLoop is running + loopsOnce sync.Once } // New creates a new poster using dependency injection for the connection pool manager @@ -158,6 +198,12 @@ func New(ctx context.Context, cfg config.Config, poolManager pool.PoolManager, j } postCheckCfg := cfg.GetPostCheckConfig() + checkOn := postCheckCfg.Enabled != nil && *postCheckCfg.Enabled + + // Loop ctx is detached from the caller's ctx so the loops outlive any + // individual Post() call. They are torn down in Close(). + loopCtx, loopCancel := context.WithCancel(context.Background()) + p := &poster{ cfg: cfg.GetPostingConfig(), checkCfg: postCheckCfg, @@ -165,6 +211,13 @@ func New(ctx context.Context, cfg config.Config, poolManager pool.PoolManager, j verifyPool: verifyPool, stats: stats, jobProgress: jobProgress, + postQueue: make(chan *Post, 100), + loopCtx: loopCtx, + loopCancel: loopCancel, + checkOn: checkOn, + } + if checkOn { + p.checkQueue = make(chan *Post, 100) } // Log post check configuration for debugging @@ -182,13 +235,90 @@ func New(ctx context.Context, cfg config.Config, poolManager pool.PoolManager, j p.throttle = NewThrottle(throttleRate, time.Second) } + p.ensureLoops() + if checkOn { + slog.DebugContext(ctx, "Post check enabled - started checkLoop goroutine") + } else { + slog.InfoContext(ctx, "Post check disabled - skipping article verification") + } + return p, nil } +// ensureLoops lazily initializes per-poster channels/ctx and spawns postLoop +// (and checkLoop, if enabled) exactly once. Called by New() and again by the +// post entry points so hand-constructed posters (used in unit tests) work +// without explicit setup. +func (p *poster) ensureLoops() { + p.loopsOnce.Do(func() { + if p.postQueue == nil { + p.postQueue = make(chan *Post, 100) + } + if p.loopCtx == nil { + p.loopCtx, p.loopCancel = context.WithCancel(context.Background()) + } + // Honor an explicit checkOn even if checkCfg.Enabled is nil — supports + // tests that pre-set the field. Otherwise derive from config. + if !p.checkOn && p.checkCfg.Enabled != nil && *p.checkCfg.Enabled { + p.checkOn = true + } + if p.checkOn && p.checkQueue == nil { + p.checkQueue = make(chan *Post, 100) + } + + p.loopsWG.Add(1) + go func() { + defer p.loopsWG.Done() + p.postLoop() + }() + if p.checkOn { + p.loopsWG.Add(1) + go func() { + defer p.loopsWG.Done() + p.checkLoop() + }() + } + }) +} + func (p *poster) Close() { p.closeOnce.Do(func() { p.closed.Store(true) - slog.Info("Poster closed - no new Post() calls will be accepted") + + // If loops were never spawned (hand-constructed poster never used), + // nothing to drain. + if p.postQueue == nil { + return + } + + slog.Info("Poster closing - draining loops") + + // Closing postQueue signals postLoop to drain pending posts and exit. + // postLoop closes checkQueue on its way out, which signals checkLoop. + close(p.postQueue) + + // Wait for both loops with a soft timeout. If they don't exit within + // 30s (e.g. hung NNTP STAT), cancel loopCtx to force them out. + done := make(chan struct{}) + go func() { + p.loopsWG.Wait() + close(done) + }() + + select { + case <-done: + case <-time.After(30 * time.Second): + slog.Warn("Poster Close: loops did not drain within 30s, cancelling") + if p.loopCancel != nil { + p.loopCancel() + } + <-done + } + + if p.loopCancel != nil { + p.loopCancel() + } + slog.Info("Poster closed") }) } @@ -199,12 +329,12 @@ func (p *poster) Post( rootDir string, nzbGen nzb.NZBGenerator, ) error { - return p.PostWithRelativePaths(ctx, files, rootDir, nzbGen, nil) + return p.PostWithCallback(ctx, files, rootDir, nzbGen, nil, "", nil) } -// PostWithRelativePaths posts files with custom display names (relative paths) for subjects -// relativePaths maps absolute file path to the display name to use in the subject -// If relativePaths is nil or a file is not found in the map, the filename is used +// PostWithRelativePaths posts files with custom display names (relative paths) for subjects. +// relativePaths maps absolute file path to the display name to use in the subject. +// If relativePaths is nil or a file is not found in the map, the filename is used. func (p *poster) PostWithRelativePaths( ctx context.Context, files []string, @@ -212,148 +342,123 @@ func (p *poster) PostWithRelativePaths( nzbGen nzb.NZBGenerator, relativePaths map[string]string, ) error { - // Check if poster has been closed + return p.PostWithCallback(ctx, files, rootDir, nzbGen, relativePaths, "", nil) +} + +// PostWithCallback submits files to the long-lived post pipeline and returns +// as soon as all uploads complete. Post-check verification runs asynchronously +// on the shared checkLoop and surfaces permanent failures via onCheckExhausted. +func (p *poster) PostWithCallback( + ctx context.Context, + files []string, + rootDir string, + nzbGen nzb.NZBGenerator, + relativePaths map[string]string, + completedItemID string, + onCheckExhausted CheckExhaustedCallback, +) error { if p.closed.Load() { return ErrPosterClosed } - wg := sync.WaitGroup{} - var failedPosts atomic.Int64 - - // Create a context that can be canceled - ctx, cancel := context.WithCancel(ctx) - defer cancel() - - // Create error channel to collect errors. Buffered enough that the deferred - // writers (postLoop, checkLoop, deferred-check sender) cannot block each - // other if more than one error fires before the main goroutine drains. - errChan := make(chan error, 4) - - // Create channels for post and check queues - postQueue := make(chan *Post, 100) - checkQueue := make(chan *Post, 100) + if len(files) == 0 { + return nil + } - // Track posts in flight (initial + retries) so we close postQueue only once - // every initial post AND every retry is fully accounted for. Closing earlier - // races with checkLoop's retry sends and panics on closed-channel send. - var postsInFlight sync.WaitGroup + // Lazy-init in case this poster was hand-constructed (tests). + p.ensureLoops() - // Start a goroutine to process posts - go p.postLoop(ctx, postQueue, checkQueue, errChan, nzbGen, &postsInFlight) + var wg sync.WaitGroup + var failedPosts atomic.Int64 - // Start a goroutine to process checks only if post check is enabled - if p.checkCfg.Enabled != nil && *p.checkCfg.Enabled { - go p.checkLoop(ctx, checkQueue, postQueue, errChan, nzbGen, &postsInFlight) - slog.DebugContext(ctx, "Post check enabled - started checkLoop goroutine") - } else { - slog.InfoContext(ctx, "Post check disabled - skipping article verification") - } + // errChan is per-call: postLoop reports fatal upload errors here. checkLoop + // never writes to errChan — check failures route through onCheckExhausted. + errChan := make(chan error, 4) wg.Add(len(files)) + added := 0 for i, file := range files { - // Check if context is canceled before adding more posts select { case <-ctx.Done(): + // Drain wg for files we never enqueued. + for j := added; j < len(files); j++ { + wg.Done() + } return ctx.Err() default: } - // Get the display name (relative path) for this file, or empty string to use filename displayName := "" if relativePaths != nil { displayName = relativePaths[file] } - if err := p.addPost(ctx, file, displayName, i+1, len(files), &wg, &failedPosts, postQueue, nzbGen, &postsInFlight); err != nil { + if err := p.addPost(ctx, file, displayName, i+1, len(files), &wg, &failedPosts, nzbGen, errChan, completedItemID, onCheckExhausted); err != nil { + // addPost on its failure path may or may not have enqueued; if it + // returned early before enqueuing, the wg slot it claimed is still + // outstanding. Drain it here plus any remaining files. + for j := added; j < len(files); j++ { + wg.Done() + } return fmt.Errorf("error adding file %s to posting queue: %w", file, err) } + added++ } - // Close postQueue only when no posts are in-flight (initial + any retries - // queued by checkLoop). This avoids the closed-channel panic on retry sends - // and lets postLoop/checkLoop drain naturally. - go func() { - postsInFlight.Wait() - close(postQueue) - }() - - // Wait for all posts to complete or an error to occur + // Wait for all uploads to complete, or for a fatal error. done := make(chan struct{}) go func() { wg.Wait() close(done) }() - // Collect any deferred check error that arrives after posting completes - var deferredErr *DeferredCheckError - select { case <-ctx.Done(): - cancel() // Cancel the context to stop all operations return ctx.Err() case err := <-errChan: - // Check if this is a non-fatal DeferredCheckError - if errors.As(err, &deferredErr) { - // Don't cancel - this is non-fatal, wait for completion - } else { - cancel() // Cancel the context to stop all operations - return err - } + // Fatal upload error. The wg may not have drained; we leave the post + // goroutines to handle their own cleanup via the per-Post errSink (the + // caller's ctx is the source of truth for cancellation). + return err case <-done: - // All posts completed normally - } - - // If we got a deferred error, wait for done signal too - if deferredErr != nil { - select { - case <-done: - case <-ctx.Done(): - return ctx.Err() - } - } - - // Check for any additional error that may have arrived - select { - case err := <-errChan: - if !errors.As(err, &deferredErr) { - return err - } - default: } if n := failedPosts.Load(); n > 0 { return fmt.Errorf("failed to post %d files", n) } - - // Return deferred error if present (non-fatal - caller should handle) - if deferredErr != nil { - return deferredErr - } - return nil } -// postLoop processes posts from the queue -func (p *poster) postLoop(ctx context.Context, postQueue chan *Post, checkQueue chan *Post, errChan chan<- error, nzbGen nzb.NZBGenerator, postsInFlight *sync.WaitGroup) { - // Only close channels that this goroutine writes to - defer close(checkQueue) +// postLoop processes posts from the long-lived poster.postQueue. It reads +// per-call state (errSink, wg, failed, nzbGen) off the Post struct so a +// single shared loop can serve every Post() call without coupling. +func (p *poster) postLoop() { + ctx := p.loopCtx + // Close the checkQueue when this loop exits so checkLoop drains naturally. + if p.checkQueue != nil { + defer close(p.checkQueue) + } numOfConnections := 0 - for _, pr := range p.uploadPool.Stats().Providers { numOfConnections += pr.MaxConnections } - for post := range postQueue { + for post := range p.postQueue { select { case <-ctx.Done(): - errChan <- ctx.Err() + p.failPostUpload(ctx, post, ctx.Err()) + // Drain the rest of the queue, failing each post, so callers' wg + // counters decrement and Post() returns rather than hanging. + for rest := range p.postQueue { + p.failPostUpload(ctx, rest, ctx.Err()) + } return default: // Check if we should pause before processing the post if err := pausable.CheckPause(ctx); err != nil { - errChan <- err - return + p.failPostUpload(ctx, post, err) + continue } // Set post status to Posting @@ -459,8 +564,10 @@ func (p *poster) postLoop(ctx context.Context, postQueue chan *Post, checkQueue postCancel() // Batch add completed articles to NZB generator (reduces lock contention) - for _, art := range completedArticles { - nzbGen.AddArticle(art) + if post.nzbGen != nil { + for _, art := range completedArticles { + post.nzbGen.AddArticle(art) + } } p.jobProgress.FinishProgress(post.progress.GetID()) @@ -476,9 +583,6 @@ func (p *poster) postLoop(ctx context.Context, postQueue chan *Post, checkQueue } post.mu.Unlock() - // Mark this post as done in the queue tracking - postsInFlight.Done() - // Close the underlying file so the descriptor isn't leaked on // failure. Long-running daemons that hit intermittent NNTP // errors otherwise exhaust the process fd ulimit and stall. @@ -488,11 +592,22 @@ func (p *poster) postLoop(ctx context.Context, postQueue chan *Post, checkQueue } } - if !errors.Is(errs, context.Canceled) { - errChan <- fmt.Errorf("failed to post file %s after %d retries: %v", post.FilePath, p.cfg.MaxRetries, errs) + // Account the failure: bump the per-call failed counter and + // report the error on the per-call errSink (best-effort, the + // caller may have already returned). Then release the wg slot. + if post.failed != nil && post.retryParent == nil { + post.failed.Add(1) } - - return + if !errors.Is(errs, context.Canceled) && post.errSink != nil { + select { + case post.errSink <- fmt.Errorf("failed to post file %s after %d retries: %v", post.FilePath, p.cfg.MaxRetries, errs): + default: + } + } + if post.retryParent == nil && post.wg != nil { + post.wg.Done() + } + continue } post.mu.Lock() @@ -500,53 +615,95 @@ func (p *poster) postLoop(ctx context.Context, postQueue chan *Post, checkQueue post.postedAt = time.Now() post.mu.Unlock() - if p.checkCfg.Enabled != nil && *p.checkCfg.Enabled { - checkQueue <- post + // Upload succeeded — release the per-call wg slot now (was + // previously delayed until after check). For retries (retryParent + // != nil) the original post already released the slot. + if post.retryParent == nil && post.wg != nil { + post.wg.Done() + } + if p.checkOn && p.checkQueue != nil { + select { + case p.checkQueue <- post: + case <-ctx.Done(): + if post.file != nil { + _ = post.file.Close() + } + } continue } - // Post complete without check - mark as done in queue tracking - postsInFlight.Done() - - // Close file + // No check configured — close file and we're done. if post.file != nil { if err := post.file.Close(); err != nil { slog.WarnContext(ctx, "Error closing file handle", "error", err, "file", post.FilePath) } } + } + } +} - post.wg.Done() +// failPostUpload is invoked when postLoop cannot upload a post (e.g. ctx +// cancellation drained from the queue before processing). It releases the +// per-call wg slot so Post() returns rather than hanging. +func (p *poster) failPostUpload(ctx context.Context, post *Post, cause error) { + if post == nil { + return + } + if post.file != nil { + _ = post.file.Close() + } + if post.failed != nil && post.retryParent == nil { + post.failed.Add(1) + } + if post.errSink != nil && !errors.Is(cause, context.Canceled) { + select { + case post.errSink <- fmt.Errorf("post %s aborted: %w", post.FilePath, cause): + default: } } + if post.retryParent == nil && post.wg != nil { + post.wg.Done() + } + _ = ctx // accepted for symmetry with future logging; unused otherwise } -// checkLoop processes posts from the check queue -func (p *poster) checkLoop(ctx context.Context, checkQueue chan *Post, postQueue chan *Post, errChan chan<- error, nzbGen nzb.NZBGenerator, postsInFlight *sync.WaitGroup) { - numOfConnections := 0 +// checkLoop processes posts from the long-lived poster.checkQueue. It is +// fully decoupled from any individual Post() call: by the time a post arrives +// here, postLoop has already released the per-call wg slot, so checkLoop +// failures cannot block the caller. Permanent verification failures route +// through post.onCheckExhausted (set per-call via Post struct) instead of an +// error channel. +func (p *poster) checkLoop() { + ctx := p.loopCtx - // Use verify pool's providers for connection count + numOfConnections := 0 for _, pr := range p.verifyPool.Stats().Providers { numOfConnections += pr.MaxConnections } - // Collect articles that exhaust immediate retries for deferred checking - var allDeferredArticles []FailedArticleInfo - var deferredMu sync.Mutex - totalArticlesProcessed := 0 - deferredEnabled := p.checkCfg.DeferredCheckDelay.ToDuration() > 0 - for post := range checkQueue { + for post := range p.checkQueue { select { case <-ctx.Done(): - errChan <- ctx.Err() + // Drain remaining posts so file descriptors close cleanly. + if post.file != nil { + _ = post.file.Close() + } + for rest := range p.checkQueue { + if rest.file != nil { + _ = rest.file.Close() + } + } return default: - // Check if we should pause before processing the check if err := pausable.CheckPause(ctx); err != nil { - errChan <- err - return + slog.WarnContext(ctx, "checkLoop paused error", "error", err) + if post.file != nil { + _ = post.file.Close() + } + continue } // Create the progress task immediately so the user sees "checking" status @@ -577,7 +734,9 @@ func (p *poster) checkLoop(ctx context.Context, checkQueue chan *Post, postQueue select { case <-ctx.Done(): post.progress.SetWaitDeadline(time.Time{}) - errChan <- ctx.Err() + if post.file != nil { + _ = post.file.Close() + } return case <-time.After(remaining): } @@ -585,72 +744,44 @@ func (p *poster) checkLoop(ctx context.Context, checkQueue chan *Post, postQueue } } - // Create a pool with error handling - use all available CPU cores - // Note: We intentionally don't use WithCancelOnError() to prevent cascading failures - // when a single article check fails. Other checks should continue. + // Create a pool with error handling - use all available connections. + // Don't WithCancelOnError(): per-article failures should not abort + // the rest; we collect failures and retry/defer at file granularity. pool := concpool.New().WithContext(ctx).WithMaxGoroutines(numOfConnections).WithFirstError() - articlesChecked := 0 - articleErrors := 0 var failedArticles []*article.Article var mu sync.Mutex - totalArticlesProcessed += len(post.Articles) - - // Submit all articles to the pool for _, art := range post.Articles { pool.Go(func(ctx context.Context) error { if ctx.Err() != nil { return ctx.Err() } - - // Check for pause before checking article if err := pausable.CheckPause(ctx); err != nil { return err } - if err := p.checkArticle(ctx, art); err != nil { - // Track failed article mu.Lock() failedArticles = append(failedArticles, art) - articleErrors++ mu.Unlock() return err } - - // Update progress atomically (non-blocking) - mu.Lock() - articlesChecked++ - mu.Unlock() - - // Update progress if manager is available post.progress.UpdateProgress(int64(art.Size)) return nil }) } + poolErr := pool.Wait() - // Wait for all workers to complete and collect errors - errors := pool.Wait() - - // If we have failed articles, handle them if len(failedArticles) > 0 { post.mu.Lock() post.Retries++ + retryCount := post.Retries post.mu.Unlock() - // If we haven't exceeded max retries, add only failed articles back to queue - if post.Retries < int(p.checkCfg.MaxRePost) { - // Refresh article headers before re-posting. - // The MessageID is intentionally preserved: the NZB generator keys entries - // by (filename, messageID), so regenerating the ID would append a duplicate - // segment rather than replacing the existing one, corrupting the NZB. - // Re-using the same MessageID is safe because: - // - If the first post was never accepted, the server will accept it again. - // - If it was accepted but propagation is slow, the deferred-check path - // handles that case and we should not be re-posting at all. - // The header validation below guards against 441 "Missing required fields" - // errors that can occur when other headers are missing or empty. + // If we still have retries left, re-post the failed articles. + if retryCount < int(p.checkCfg.MaxRePost) { + // Refresh article headers before re-posting (preserve MessageID; + // the NZB generator keys entries by (filename, messageID)). for _, art := range failedArticles { - // Ensure required NNTP headers are non-empty if art.From == "" { if defaultFrom := p.cfg.PostHeaders.DefaultFrom; defaultFrom != "" { art.From = defaultFrom @@ -666,177 +797,118 @@ func (p *poster) checkLoop(ctx context.Context, checkQueue chan *Post, postQueue } } - // Create a new post with only the failed articles + // Build the retry post. retryParent links it back to the + // original so postLoop skips wg/failed bookkeeping (already + // accounted for on the original). failedPost := &Post{ - FilePath: post.FilePath, - Articles: failedArticles, - Status: PostStatusPending, - file: post.file, - filesize: post.filesize, - wg: post.wg, - failed: post.failed, - Retries: post.Retries, - progress: p.jobProgress.AddProgress(uuid.New(), fmt.Sprintf("%s (retry)", filepath.Base(post.FilePath)), progress.ProgressTypeUploading, post.filesize), + FilePath: post.FilePath, + Articles: failedArticles, + Status: PostStatusPending, + file: post.file, + filesize: post.filesize, + wg: nil, // retries don't gate the per-call wg + failed: nil, + Retries: retryCount, + progress: p.jobProgress.AddProgress(uuid.New(), fmt.Sprintf("%s (retry)", filepath.Base(post.FilePath)), progress.ProgressTypeUploading, post.filesize), + errSink: nil, // Post() has already returned; no caller to surface to + nzbGen: post.nzbGen, + completedItemID: post.completedItemID, + onCheckExhausted: post.onCheckExhausted, + retryParent: post, } - slog.InfoContext(ctx, - "Retrying failed articles", + slog.InfoContext(ctx, "Retrying failed articles", "file", post.FilePath, - "attempt", post.Retries, + "attempt", retryCount, "max_retries", p.checkCfg.MaxRePost, ) - // Track this retry in the queue before sending - postsInFlight.Add(1) - - // Send retry. postQueue is kept open by the postsInFlight - // gate (the Add above runs before this send), so a closed- - // channel panic here is no longer reachable. Only ctx - // cancellation can interrupt the send. - // - // Bookkeeping: addPost added +1 for the original post; the - // Add above added +1 for the retry. Once the retry is - // queued, the original post is no longer "in flight" — the - // retry represents it from here on — so Done the original. select { - case postQueue <- failedPost: - postsInFlight.Done() - continue + case p.postQueue <- failedPost: case <-ctx.Done(): - // Decrement both: the retry that won't be sent and the - // original post we are abandoning. - postsInFlight.Done() - postsInFlight.Done() slog.WarnContext(ctx, "Context canceled while trying to send retry", "file", post.FilePath) + if post.file != nil { + _ = post.file.Close() + } return } + continue } - // Max retries exhausted - check if deferred checking is enabled - if deferredEnabled { - // Collect failed articles for deferred verification instead of failing - deferredMu.Lock() - for _, art := range failedArticles { - allDeferredArticles = append(allDeferredArticles, FailedArticleInfo{ - MessageID: art.MessageID, - Groups: art.Groups, - }) + // Max retries exhausted. + p.jobProgress.FinishProgress(post.progress.GetID()) + if post.file != nil { + if err := post.file.Close(); err != nil { + slog.WarnContext(ctx, "Error closing file handle on verify failure", "error", err, "file", post.FilePath) } - deferredMu.Unlock() + } - slog.InfoContext(ctx, - "Articles deferred for later verification", + // Build the failed-article info list for downstream handling. + infos := make([]FailedArticleInfo, 0, len(failedArticles)) + for _, art := range failedArticles { + infos = append(infos, FailedArticleInfo{ + MessageID: art.MessageID, + Groups: art.Groups, + }) + } + + if deferredEnabled && post.onCheckExhausted != nil { + slog.InfoContext(ctx, "Articles deferred for later verification", "file", post.FilePath, - "deferred_count", len(failedArticles), + "deferred_count", len(infos), "retries_exhausted", p.checkCfg.MaxRePost, ) - - // Mark as verified (optimistically) - deferred check will update later post.mu.Lock() - post.Status = PostStatusVerified + post.Status = PostStatusVerified // optimistic; deferred worker updates later post.mu.Unlock() - - postsInFlight.Done() - p.jobProgress.FinishProgress(post.progress.GetID()) - - if post.file != nil { - if err := post.file.Close(); err != nil { - slog.WarnContext(ctx, "Error closing file handle", "error", err, "file", post.FilePath) - } - } - - post.wg.Done() - continue - } - - // Deferred checking not enabled - fail as before - post.mu.Lock() - post.Status = PostStatusFailed - post.Error = fmt.Errorf("failed to verify articles after %d retries", p.checkCfg.MaxRePost) - post.mu.Unlock() - - // Mark this post as done in queue tracking - it failed permanently - postsInFlight.Done() - - if post.failed != nil { - post.failed.Add(1) - } - - if post.file != nil { - if cerr := post.file.Close(); cerr != nil { - slog.WarnContext(ctx, "Error closing file handle on verify failure", "error", cerr, "file", post.FilePath) + if err := post.onCheckExhausted(ctx, infos, len(post.Articles), post.completedItemID); err != nil { + slog.ErrorContext(ctx, "onCheckExhausted callback failed", + "file", post.FilePath, "error", err) } + } else { + // No deferred path or no callback: log and drop. Post() has + // already returned to the caller; we cannot retroactively + // turn this into a synchronous error. + post.mu.Lock() + post.Status = PostStatusFailed + post.Error = fmt.Errorf("failed to verify articles after %d retries", p.checkCfg.MaxRePost) + post.mu.Unlock() + slog.WarnContext(ctx, "Articles failed verification (no deferred handler)", + "file", post.FilePath, + "failed_count", len(infos), + "retries_exhausted", p.checkCfg.MaxRePost, + ) } - - errChan <- fmt.Errorf("failed to verify file %s after %d retries", post.FilePath, p.checkCfg.MaxRePost) - return - } else if errors != nil { - // This is a safety check - if we have errors but no failed articles, something went wrong - post.mu.Lock() - post.Status = PostStatusFailed - post.Error = fmt.Errorf("verification failed but no articles were marked as failed: %v", errors) - post.mu.Unlock() - - // Mark this post as done in queue tracking - it failed with unexpected error - postsInFlight.Done() - - if post.failed != nil { - post.failed.Add(1) - } - + continue + } else if poolErr != nil { + // Pool error with no per-article failures collected — context + // cancellation or a checkArticle bug. Log and move on. + slog.WarnContext(ctx, "checkLoop pool error with no failed articles", + "file", post.FilePath, "error", poolErr) if post.file != nil { - if cerr := post.file.Close(); cerr != nil { - slog.WarnContext(ctx, "Error closing file handle on verify failure", "error", cerr, "file", post.FilePath) - } + _ = post.file.Close() } - - errChan <- fmt.Errorf("unexpected error verifying file %s: %v", post.FilePath, errors) - return + continue } - // Mark as verified + // All articles verified. post.mu.Lock() post.Status = PostStatusVerified post.mu.Unlock() - - // Mark this post as done in queue tracking - verification successful - postsInFlight.Done() - p.jobProgress.FinishProgress(post.progress.GetID()) - - // Close file if post.file != nil { if err := post.file.Close(); err != nil { slog.WarnContext(ctx, "Error closing file handle", "error", err, "file", post.FilePath) } } - - post.wg.Done() - } - } - - // After processing all posts, if there are deferred articles, send a DeferredCheckError - // This is a non-fatal error that signals the caller to store these for later verification. - // Guard the send with ctx so a leaked checkLoop cannot block forever if the main - // goroutine has already returned. - if len(allDeferredArticles) > 0 { - slog.InfoContext(ctx, "Sending deferred check error", - "deferred_articles", len(allDeferredArticles), - "total_articles", totalArticlesProcessed) - select { - case errChan <- &DeferredCheckError{ - FailedArticles: allDeferredArticles, - TotalArticles: totalArticlesProcessed, - }: - case <-ctx.Done(): } } } -// addPost adds a file to the posting queue -// displayName is the name to use in the subject (e.g., "Folder/subfolder/file.mp4") -// If displayName is empty, the filename is used -func (p *poster) addPost(ctx context.Context, filePath string, displayName string, fileNumber int, totalFiles int, wg *sync.WaitGroup, failedPosts *atomic.Int64, postQueue chan<- *Post, nzbGen nzb.NZBGenerator, postsInFlight *sync.WaitGroup) error { +// addPost adds a file to the posting queue. +// displayName is the name to use in the subject (e.g., "Folder/subfolder/file.mp4"). +// If displayName is empty, the filename is used. +func (p *poster) addPost(ctx context.Context, filePath string, displayName string, fileNumber int, totalFiles int, wg *sync.WaitGroup, failedPosts *atomic.Int64, nzbGen nzb.NZBGenerator, errSink chan<- error, completedItemID string, onCheckExhausted CheckExhaustedCallback) error { file, err := os.Open(filePath) if err != nil { return fmt.Errorf("error opening file: %w", err) @@ -1019,28 +1091,32 @@ func (p *poster) addPost(ctx context.Context, filePath string, displayName strin } post := &Post{ - FilePath: filePath, - Articles: articles, - Status: PostStatusPending, - file: file, - filesize: fileInfo.Size(), - wg: wg, - failed: failedPosts, - progress: p.jobProgress.AddProgress(uuid.New(), filepath.Base(filePath), progress.ProgressTypeUploading, fileInfo.Size()), + FilePath: filePath, + Articles: articles, + Status: PostStatusPending, + file: file, + filesize: fileInfo.Size(), + wg: wg, + failed: failedPosts, + progress: p.jobProgress.AddProgress(uuid.New(), filepath.Base(filePath), progress.ProgressTypeUploading, fileInfo.Size()), + errSink: errSink, + nzbGen: nzbGen, + completedItemID: completedItemID, + onCheckExhausted: onCheckExhausted, } - // Track this post as in-flight until it's sent to the queue - postsInFlight.Add(1) + // Send to the poster-lifetime postQueue. If the poster has been Closed + // concurrently, the queue will be closed and the send will panic — guard + // with a closed-check. + if p.closed.Load() { + _ = file.Close() + return ErrPosterClosed + } - // Use select to safely send to channel and handle context cancellation select { - case postQueue <- post: - // Successfully sent to queue + case p.postQueue <- post: return nil case <-ctx.Done(): - // Context canceled, decrement counter since we didn't send - postsInFlight.Done() - // Close the file and return error if err := file.Close(); err != nil { slog.WarnContext(ctx, "Error closing file after context cancellation", "error", err, "file", filePath) } diff --git a/internal/poster/poster_test.go b/internal/poster/poster_test.go index e184388..c9441ee 100644 --- a/internal/poster/poster_test.go +++ b/internal/poster/poster_test.go @@ -371,12 +371,17 @@ func TestPost(t *testing.T) { jobProgress: mockJobProgress, } + // After Phase 2, verification failures are async and never propagate + // to Post() as a synchronous error — the upload itself succeeded. + // Permanent verification failures route through onCheckExhausted (when + // deferred mode is enabled) or are logged-only (otherwise). This test + // keeps deferred disabled so the check failure is logged and dropped; + // Post() returns nil because the file did upload. err := p.Post(ctx, []string{testFile}, "", nzbGen) + assert.NoError(t, err) - assert.Error(t, err) - assert.Contains(t, err.Error(), "failed to verify file") - - // Close after test completes + // Close after test completes — also drains the checkLoop so the + // post-check failure is processed before we return. p.Close() // Finish controller after all operations complete @@ -822,23 +827,23 @@ func TestAddPost(t *testing.T) { p := &poster{ cfg: cfg, jobProgress: mockJobProgress, + postQueue: make(chan *Post, 10), } var wg sync.WaitGroup - var postsInFlight sync.WaitGroup var failedPosts atomic.Int64 - postQueue := make(chan *Post, 10) + errSink := make(chan error, 4) nzbGen := mocks.NewMockNZBGenerator(ctrl) wg.Add(1) ctx := context.Background() - err := p.addPost(ctx, testFile, "", 1, 1, &wg, &failedPosts, postQueue, nzbGen, &postsInFlight) + err := p.addPost(ctx, testFile, "", 1, 1, &wg, &failedPosts, nzbGen, errSink, "", nil) assert.NoError(t, err) // Check that a post was added to the queue select { - case post := <-postQueue: + case post := <-p.postQueue: assert.Equal(t, testFile, post.FilePath) assert.Equal(t, PostStatusPending, post.Status) assert.Greater(t, len(post.Articles), 1) // Should have multiple articles due to small segment size @@ -863,13 +868,12 @@ func TestAddPost(t *testing.T) { defer ctrl.Finish() var wg sync.WaitGroup - var postsInFlight sync.WaitGroup var failedPosts atomic.Int64 - postQueue := make(chan *Post, 10) + errSink := make(chan error, 4) nzbGen := mocks.NewMockNZBGenerator(ctrl) ctx := context.Background() - err := p.addPost(ctx, "nonexistent.txt", "", 1, 1, &wg, &failedPosts, postQueue, nzbGen, &postsInFlight) + err := p.addPost(ctx, "nonexistent.txt", "", 1, 1, &wg, &failedPosts, nzbGen, errSink, "", nil) assert.Error(t, err) assert.Contains(t, err.Error(), "error opening file") @@ -1035,6 +1039,11 @@ func TestPostIntegration(t *testing.T) { assert.NoError(t, err) + // Phase 2: Post() returns once uploads complete; verification runs + // asynchronously. Close() drains the long-lived loops so we can read + // stats deterministically. + p.Close() + // Verify stats were updated stats := p.Stats() assert.Equal(t, int64(1), stats.ArticlesPosted) diff --git a/internal/processor/processor.go b/internal/processor/processor.go index 027881f..0a66c77 100644 --- a/internal/processor/processor.go +++ b/internal/processor/processor.go @@ -468,6 +468,41 @@ func (p *Processor) processFile(ctx context.Context, msg *goqite.Message, job *q } defer jobPostie.Close() + // Wire the asynchronous post-check callback. The poster's long-lived + // checkLoop calls this when articles exhaust MaxRePost retries; we persist + // them to the deferred-check queue so PostCheckRetryWorker can recheck + // them later. The callback runs after Post() has already returned, so the + // processor's main flow no longer blocks on verification. + completedItemID := string(msg.ID) + deferredDelay := p.config.GetPostCheckConfig().DeferredCheckDelay.ToDuration() + jobPostie.SetVerificationCallback(completedItemID, func(cbCtx context.Context, articles []poster.FailedArticleInfo, totalArticles int, itemID string) error { + if len(articles) == 0 { + return nil + } + nextRetryAt := time.Now().Add(deferredDelay) + pendingChecks := make([]queue.PendingArticleCheck, 0, len(articles)) + for _, art := range articles { + groupsJSON, _ := json.Marshal(art.Groups) + pendingChecks = append(pendingChecks, queue.PendingArticleCheck{ + MessageID: art.MessageID, + Groups: string(groupsJSON), + NextRetryAt: nextRetryAt, + }) + } + if addErr := p.queue.AddPendingArticleChecks(cbCtx, itemID, pendingChecks); addErr != nil { + slog.ErrorContext(cbCtx, "Failed to store deferred article checks", + "error", addErr, "completedItemID", itemID, "count", len(pendingChecks)) + return addErr + } + if statusErr := p.queue.UpdateCompletedItemVerificationStatus(cbCtx, itemID, "pending_verification"); statusErr != nil { + slog.ErrorContext(cbCtx, "Failed to update verification status", + "error", statusErr, "completedItemID", itemID) + } + slog.InfoContext(cbCtx, "Stored deferred article checks asynchronously", + "completedItemID", itemID, "deferred_articles", len(pendingChecks), "total_articles", totalArticles) + return nil + }) + // Determine the input folder for maintaining folder structure var inputFolder string switch { diff --git a/pkg/postie/postfolder_test.go b/pkg/postie/postfolder_test.go index 72b2912..b6e80e7 100644 --- a/pkg/postie/postfolder_test.go +++ b/pkg/postie/postfolder_test.go @@ -22,6 +22,10 @@ func (m *mockPoster) Post(_ context.Context, files []string, _ string, nzbGen nz return nil } +func (m *mockPoster) PostWithCallback(_ context.Context, files []string, _ string, nzbGen nzb.NZBGenerator, _ map[string]string, _ string, _ poster.CheckExhaustedCallback) error { + return m.PostWithRelativePaths(context.Background(), files, "", nzbGen, nil) +} + func (m *mockPoster) PostWithRelativePaths(_ context.Context, files []string, _ string, nzbGen nzb.NZBGenerator, _ map[string]string) error { addFakeArticles(nzbGen, files) return nil diff --git a/pkg/postie/postie.go b/pkg/postie/postie.go index 5f48069..1731a89 100644 --- a/pkg/postie/postie.go +++ b/pkg/postie/postie.go @@ -32,6 +32,26 @@ type Postie struct { maintainOriginalExtension bool jobProgress progress.JobProgress queue QueueInterface + + // Verification callback set by the caller (e.g. the processor) before + // invoking Post(). Forwarded to poster.PostWithCallback so the long-lived + // checkLoop can persist permanent verification failures asynchronously. + verifyItemID string + verifyCallback poster.CheckExhaustedCallback +} + +// SetVerificationCallback configures the asynchronous post-check exhaustion +// callback for this Postie instance. completedItemID identifies the queue +// item; cb is invoked once per file whose articles cannot be verified after +// MaxRePost retries. Either parameter may be empty/nil — in that case the +// poster logs the failure and drops it (same behavior as the legacy CLI). +// +// Postie is per-job (constructed and Closed inside processor.processFile), +// so this setter is normally called once before any Post() call. Concurrent +// Post() calls on the same Postie are not currently supported. +func (p *Postie) SetVerificationCallback(completedItemID string, cb poster.CheckExhaustedCallback) { + p.verifyItemID = completedItemID + p.verifyCallback = cb } // QueueInterface defines the queue methods needed by Postie @@ -272,7 +292,7 @@ func (p *Postie) postInParallel( return nil } - if err := p.poster.Post(ctx, createdPar2Paths, rootDir, nzbGen); err != nil { + if err := p.poster.PostWithCallback(ctx, createdPar2Paths, rootDir, nzbGen, nil, p.verifyItemID, p.verifyCallback); err != nil { if !errors.Is(err, context.Canceled) { slog.ErrorContext(ctx, fmt.Sprintf("Error during upload of par2 files: %s. Upload will continue without par2.", createdPar2Paths), "error", err) } @@ -286,7 +306,7 @@ func (p *Postie) postInParallel( var deferredErr *poster.DeferredCheckError errg.Go(func() error { - if err := p.poster.Post(ctx, []string{f.Path}, rootDir, nzbGen); err != nil { + if err := p.poster.PostWithCallback(ctx, []string{f.Path}, rootDir, nzbGen, nil, p.verifyItemID, p.verifyCallback); err != nil { // Check if this is a non-fatal deferred check error var de *poster.DeferredCheckError if errors.As(err, &de) { @@ -385,7 +405,7 @@ func (p *Postie) post( } var deferredErr *poster.DeferredCheckError - if err := p.poster.Post(ctx, filesPath, rootDir, nzbGen); err != nil { + if err := p.poster.PostWithCallback(ctx, filesPath, rootDir, nzbGen, nil, p.verifyItemID, p.verifyCallback); err != nil { // Check if this is a non-fatal deferred check error if errors.As(err, &deferredErr) { slog.InfoContext(ctx, "Some articles deferred for later verification", "file", f.Path, "deferred", len(deferredErr.FailedArticles)) @@ -517,7 +537,7 @@ func (p *Postie) postFolder(ctx context.Context, files []fileinfo.FileInfo, root var deferredErr *poster.DeferredCheckError // Post all files (including PAR2) together with relative paths for subjects - if err := p.poster.PostWithRelativePaths(ctx, allFilePaths, rootDir, nzbGen, relativePaths); err != nil { + if err := p.poster.PostWithCallback(ctx, allFilePaths, rootDir, nzbGen, relativePaths, p.verifyItemID, p.verifyCallback); err != nil { if errors.As(err, &deferredErr) { slog.InfoContext(ctx, "Some articles deferred for later verification", "folder", folderName, "deferred", len(deferredErr.FailedArticles)) } else { @@ -579,7 +599,7 @@ func (p *Postie) postFolder(ctx context.Context, files []fileinfo.FileInfo, root } par2RelPaths := buildPar2RelativePaths(files, createdPar2Paths) - if err := p.poster.PostWithRelativePaths(ctx, createdPar2Paths, rootDir, nzbGen, par2RelPaths); err != nil { + if err := p.poster.PostWithCallback(ctx, createdPar2Paths, rootDir, nzbGen, par2RelPaths, p.verifyItemID, p.verifyCallback); err != nil { if !errors.Is(err, context.Canceled) { slog.ErrorContext(ctx, "Error during upload of par2 files. Upload will continue without par2.", "error", err) } @@ -590,7 +610,7 @@ func (p *Postie) postFolder(ctx context.Context, files []fileinfo.FileInfo, root // Post main files with relative paths for subjects errg.Go(func() error { - if err := p.poster.PostWithRelativePaths(ctx, allFilePaths, rootDir, nzbGen, relativePaths); err != nil { + if err := p.poster.PostWithCallback(ctx, allFilePaths, rootDir, nzbGen, relativePaths, p.verifyItemID, p.verifyCallback); err != nil { // Check if this is a non-fatal deferred check error var de *poster.DeferredCheckError if errors.As(err, &de) {