diff --git a/internal/poster/poster.go b/internal/poster/poster.go index 0ce2a5f..32d37b6 100644 --- a/internal/poster/poster.go +++ b/internal/poster/poster.go @@ -50,6 +50,12 @@ type Poster interface { // PostWithRelativePaths posts files with custom display names (relative paths) for subjects // relativePaths maps absolute file path to the display name to use in the subject PostWithRelativePaths(ctx context.Context, files []string, rootDir string, nzbGen nzb.NZBGenerator, relativePaths map[string]string) error + // PostWithCallback is the full-featured posting entry point. It returns as + // soon as all uploads complete; the post-check propagation delay and STAT + // verification run asynchronously on the poster's long-lived checkLoop. + // completedItemID + onCheckExhausted let the caller persist failed-to-verify + // articles to a deferred-check queue. Both may be empty/nil. + PostWithCallback(ctx context.Context, files []string, rootDir string, nzbGen nzb.NZBGenerator, relativePaths map[string]string, completedItemID string, onCheckExhausted CheckExhaustedCallback) error // Stats returns posting statistics Stats() Stats // Close closes the poster @@ -68,6 +74,13 @@ const ( PostStatusPosting ) +// CheckExhaustedCallback is invoked by the long-lived checkLoop when an +// uploaded file's articles cannot be verified after exhausting MaxRePost. +// It is called on a background goroutine, after Post() has already returned +// to the caller. The callback is responsible for persisting failed articles +// for deferred verification (e.g. via queue.AddPendingArticleChecks). 
+type CheckExhaustedCallback func(ctx context.Context, articles []FailedArticleInfo, totalArticles int, completedItemID string) error + // Post represents a file to be posted type Post struct { FilePath string @@ -78,9 +91,29 @@ type Post struct { file *os.File mu sync.Mutex filesize int64 - wg *sync.WaitGroup + wg *sync.WaitGroup // per-call, drained when upload (not check) completes failed *atomic.Int64 progress progress.Progress + // postedAt records when this file's upload finished. Used by checkLoop to + // apply the propagation delay per-file with credit for time already elapsed. + postedAt time.Time + // errSink is the per-call channel that postLoop uses to surface fatal + // upload errors back to the caller of Post(). Set by addPost. Never used + // by checkLoop — check failures route through onCheckExhausted instead. + errSink chan<- error + // nzbGen is the per-call NZB generator. Set by addPost. + nzbGen nzb.NZBGenerator + // completedItemID identifies this post in the persistent queue. Used by + // checkLoop to pass to onCheckExhausted on permanent verification failure. + // Empty when the caller doesn't have a queue ID (e.g. CLI usage). + completedItemID string + // onCheckExhausted is invoked by checkLoop when articles exhaust MaxRePost. + // May be nil — in that case checkLoop logs and drops the failure. + onCheckExhausted CheckExhaustedCallback + // retryParent links a retry post back to the original post's wg/failed/etc. + // Retries do not increment the per-call wg (which counts only initial + // uploads), so they need a way to find the original's metadata. + retryParent *Post } // FailedArticleInfo contains information about an article that failed verification @@ -129,6 +162,17 @@ type poster struct { jobProgress progress.JobProgress closed atomic.Bool closeOnce sync.Once + + // Long-lived loop infrastructure. 
postLoop and checkLoop are spawned once + // in New() and shared by every Post() call, so the propagation check no + // longer blocks Post()'s critical path. + postQueue chan *Post + checkQueue chan *Post // nil when post-check is disabled + loopCtx context.Context + loopCancel context.CancelFunc + loopsWG sync.WaitGroup + checkOn bool // captured at construction; whether checkLoop is running + loopsOnce sync.Once } // New creates a new poster using dependency injection for the connection pool manager @@ -154,6 +198,12 @@ func New(ctx context.Context, cfg config.Config, poolManager pool.PoolManager, j } postCheckCfg := cfg.GetPostCheckConfig() + checkOn := postCheckCfg.Enabled != nil && *postCheckCfg.Enabled + + // Loop ctx is detached from the caller's ctx so the loops outlive any + // individual Post() call. They are torn down in Close(). + loopCtx, loopCancel := context.WithCancel(context.Background()) + p := &poster{ cfg: cfg.GetPostingConfig(), checkCfg: postCheckCfg, @@ -161,6 +211,13 @@ func New(ctx context.Context, cfg config.Config, poolManager pool.PoolManager, j verifyPool: verifyPool, stats: stats, jobProgress: jobProgress, + postQueue: make(chan *Post, 100), + loopCtx: loopCtx, + loopCancel: loopCancel, + checkOn: checkOn, + } + if checkOn { + p.checkQueue = make(chan *Post, 100) } // Log post check configuration for debugging @@ -178,13 +235,90 @@ func New(ctx context.Context, cfg config.Config, poolManager pool.PoolManager, j p.throttle = NewThrottle(throttleRate, time.Second) } + p.ensureLoops() + if checkOn { + slog.DebugContext(ctx, "Post check enabled - started checkLoop goroutine") + } else { + slog.InfoContext(ctx, "Post check disabled - skipping article verification") + } + return p, nil } +// ensureLoops lazily initializes per-poster channels/ctx and spawns postLoop +// (and checkLoop, if enabled) exactly once. 
Called by New() and again by the +// post entry points so hand-constructed posters (used in unit tests) work +// without explicit setup. +func (p *poster) ensureLoops() { + p.loopsOnce.Do(func() { + if p.postQueue == nil { + p.postQueue = make(chan *Post, 100) + } + if p.loopCtx == nil { + p.loopCtx, p.loopCancel = context.WithCancel(context.Background()) + } + // Honor an explicit checkOn even if checkCfg.Enabled is nil — supports + // tests that pre-set the field. Otherwise derive from config. + if !p.checkOn && p.checkCfg.Enabled != nil && *p.checkCfg.Enabled { + p.checkOn = true + } + if p.checkOn && p.checkQueue == nil { + p.checkQueue = make(chan *Post, 100) + } + + p.loopsWG.Add(1) + go func() { + defer p.loopsWG.Done() + p.postLoop() + }() + if p.checkOn { + p.loopsWG.Add(1) + go func() { + defer p.loopsWG.Done() + p.checkLoop() + }() + } + }) +} + func (p *poster) Close() { p.closeOnce.Do(func() { p.closed.Store(true) - slog.Info("Poster closed - no new Post() calls will be accepted") + + // If loops were never spawned (hand-constructed poster never used), + // nothing to drain. + if p.postQueue == nil { + return + } + + slog.Info("Poster closing - draining loops") + + // Closing postQueue signals postLoop to drain pending posts and exit. + // postLoop closes checkQueue on its way out, which signals checkLoop. + close(p.postQueue) + + // Wait for both loops with a soft timeout. If they don't exit within + // 30s (e.g. hung NNTP STAT), cancel loopCtx to force them out. 
+ done := make(chan struct{}) + go func() { + p.loopsWG.Wait() + close(done) + }() + + select { + case <-done: + case <-time.After(30 * time.Second): + slog.Warn("Poster Close: loops did not drain within 30s, cancelling") + if p.loopCancel != nil { + p.loopCancel() + } + <-done + } + + if p.loopCancel != nil { + p.loopCancel() + } + slog.Info("Poster closed") }) } @@ -195,12 +329,12 @@ func (p *poster) Post( rootDir string, nzbGen nzb.NZBGenerator, ) error { - return p.PostWithRelativePaths(ctx, files, rootDir, nzbGen, nil) + return p.PostWithCallback(ctx, files, rootDir, nzbGen, nil, "", nil) } -// PostWithRelativePaths posts files with custom display names (relative paths) for subjects -// relativePaths maps absolute file path to the display name to use in the subject -// If relativePaths is nil or a file is not found in the map, the filename is used +// PostWithRelativePaths posts files with custom display names (relative paths) for subjects. +// relativePaths maps absolute file path to the display name to use in the subject. +// If relativePaths is nil or a file is not found in the map, the filename is used. func (p *poster) PostWithRelativePaths( ctx context.Context, files []string, @@ -208,148 +342,123 @@ func (p *poster) PostWithRelativePaths( nzbGen nzb.NZBGenerator, relativePaths map[string]string, ) error { - // Check if poster has been closed + return p.PostWithCallback(ctx, files, rootDir, nzbGen, relativePaths, "", nil) +} + +// PostWithCallback submits files to the long-lived post pipeline and returns +// as soon as all uploads complete. Post-check verification runs asynchronously +// on the shared checkLoop and surfaces permanent failures via onCheckExhausted. 
+func (p *poster) PostWithCallback( + ctx context.Context, + files []string, + rootDir string, + nzbGen nzb.NZBGenerator, + relativePaths map[string]string, + completedItemID string, + onCheckExhausted CheckExhaustedCallback, +) error { if p.closed.Load() { return ErrPosterClosed } - wg := sync.WaitGroup{} - var failedPosts atomic.Int64 - - // Create a context that can be canceled - ctx, cancel := context.WithCancel(ctx) - defer cancel() - - // Create error channel to collect errors. Buffered enough that the deferred - // writers (postLoop, checkLoop, deferred-check sender) cannot block each - // other if more than one error fires before the main goroutine drains. - errChan := make(chan error, 4) - - // Create channels for post and check queues - postQueue := make(chan *Post, 100) - checkQueue := make(chan *Post, 100) + if len(files) == 0 { + return nil + } - // Track posts in flight (initial + retries) so we close postQueue only once - // every initial post AND every retry is fully accounted for. Closing earlier - // races with checkLoop's retry sends and panics on closed-channel send. - var postsInFlight sync.WaitGroup + // Lazy-init in case this poster was hand-constructed (tests). + p.ensureLoops() - // Start a goroutine to process posts - go p.postLoop(ctx, postQueue, checkQueue, errChan, nzbGen, &postsInFlight) + var wg sync.WaitGroup + var failedPosts atomic.Int64 - // Start a goroutine to process checks only if post check is enabled - if p.checkCfg.Enabled != nil && *p.checkCfg.Enabled { - go p.checkLoop(ctx, checkQueue, postQueue, errChan, nzbGen, &postsInFlight) - slog.DebugContext(ctx, "Post check enabled - started checkLoop goroutine") - } else { - slog.InfoContext(ctx, "Post check disabled - skipping article verification") - } + // errChan is per-call: postLoop reports fatal upload errors here. checkLoop + // never writes to errChan — check failures route through onCheckExhausted. 
+ errChan := make(chan error, 4) wg.Add(len(files)) + added := 0 for i, file := range files { - // Check if context is canceled before adding more posts select { case <-ctx.Done(): + // Drain wg for files we never enqueued. + for j := added; j < len(files); j++ { + wg.Done() + } return ctx.Err() default: } - // Get the display name (relative path) for this file, or empty string to use filename displayName := "" if relativePaths != nil { displayName = relativePaths[file] } - if err := p.addPost(ctx, file, displayName, i+1, len(files), &wg, &failedPosts, postQueue, nzbGen, &postsInFlight); err != nil { + if err := p.addPost(ctx, file, displayName, i+1, len(files), &wg, &failedPosts, nzbGen, errChan, completedItemID, onCheckExhausted); err != nil { + // addPost on its failure path may or may not have enqueued; if it + // returned early before enqueuing, the wg slot it claimed is still + // outstanding. Drain it here plus any remaining files. + for j := added; j < len(files); j++ { + wg.Done() + } return fmt.Errorf("error adding file %s to posting queue: %w", file, err) } + added++ } - // Close postQueue only when no posts are in-flight (initial + any retries - // queued by checkLoop). This avoids the closed-channel panic on retry sends - // and lets postLoop/checkLoop drain naturally. - go func() { - postsInFlight.Wait() - close(postQueue) - }() - - // Wait for all posts to complete or an error to occur + // Wait for all uploads to complete, or for a fatal error. 
done := make(chan struct{}) go func() { wg.Wait() close(done) }() - // Collect any deferred check error that arrives after posting completes - var deferredErr *DeferredCheckError - select { case <-ctx.Done(): - cancel() // Cancel the context to stop all operations return ctx.Err() case err := <-errChan: - // Check if this is a non-fatal DeferredCheckError - if errors.As(err, &deferredErr) { - // Don't cancel - this is non-fatal, wait for completion - } else { - cancel() // Cancel the context to stop all operations - return err - } + // Fatal upload error. The wg may not have drained; we leave the post + // goroutines to handle their own cleanup via the per-Post errSink (the + // caller's ctx is the source of truth for cancellation). + return err case <-done: - // All posts completed normally - } - - // If we got a deferred error, wait for done signal too - if deferredErr != nil { - select { - case <-done: - case <-ctx.Done(): - return ctx.Err() - } - } - - // Check for any additional error that may have arrived - select { - case err := <-errChan: - if !errors.As(err, &deferredErr) { - return err - } - default: } if n := failedPosts.Load(); n > 0 { return fmt.Errorf("failed to post %d files", n) } - - // Return deferred error if present (non-fatal - caller should handle) - if deferredErr != nil { - return deferredErr - } - return nil } -// postLoop processes posts from the queue -func (p *poster) postLoop(ctx context.Context, postQueue chan *Post, checkQueue chan *Post, errChan chan<- error, nzbGen nzb.NZBGenerator, postsInFlight *sync.WaitGroup) { - // Only close channels that this goroutine writes to - defer close(checkQueue) +// postLoop processes posts from the long-lived poster.postQueue. It reads +// per-call state (errSink, wg, failed, nzbGen) off the Post struct so a +// single shared loop can serve every Post() call without coupling. 
+func (p *poster) postLoop() { + ctx := p.loopCtx + // Close the checkQueue when this loop exits so checkLoop drains naturally. + if p.checkQueue != nil { + defer close(p.checkQueue) + } numOfConnections := 0 - for _, pr := range p.uploadPool.Stats().Providers { numOfConnections += pr.MaxConnections } - for post := range postQueue { + for post := range p.postQueue { select { case <-ctx.Done(): - errChan <- ctx.Err() + p.failPostUpload(ctx, post, ctx.Err()) + // Drain the rest of the queue, failing each post, so callers' wg + // counters decrement and Post() returns rather than hanging. + for rest := range p.postQueue { + p.failPostUpload(ctx, rest, ctx.Err()) + } return default: // Check if we should pause before processing the post if err := pausable.CheckPause(ctx); err != nil { - errChan <- err - return + p.failPostUpload(ctx, post, err) + continue } // Set post status to Posting @@ -455,8 +564,10 @@ func (p *poster) postLoop(ctx context.Context, postQueue chan *Post, checkQueue postCancel() // Batch add completed articles to NZB generator (reduces lock contention) - for _, art := range completedArticles { - nzbGen.AddArticle(art) + if post.nzbGen != nil { + for _, art := range completedArticles { + post.nzbGen.AddArticle(art) + } } p.jobProgress.FinishProgress(post.progress.GetID()) @@ -472,9 +583,6 @@ func (p *poster) postLoop(ctx context.Context, postQueue chan *Post, checkQueue } post.mu.Unlock() - // Mark this post as done in the queue tracking - postsInFlight.Done() - // Close the underlying file so the descriptor isn't leaked on // failure. Long-running daemons that hit intermittent NNTP // errors otherwise exhaust the process fd ulimit and stall. 
@@ -484,156 +592,196 @@ func (p *poster) postLoop(ctx context.Context, postQueue chan *Post, checkQueue } } - if !errors.Is(errs, context.Canceled) { - errChan <- fmt.Errorf("failed to post file %s after %d retries: %v", post.FilePath, p.cfg.MaxRetries, errs) + // Account the failure: bump the per-call failed counter and + // report the error on the per-call errSink (best-effort, the + // caller may have already returned). Then release the wg slot. + if post.failed != nil && post.retryParent == nil { + post.failed.Add(1) } - - return + if !errors.Is(errs, context.Canceled) && post.errSink != nil { + select { + case post.errSink <- fmt.Errorf("failed to post file %s after %d retries: %v", post.FilePath, p.cfg.MaxRetries, errs): + default: + } + } + if post.retryParent == nil && post.wg != nil { + post.wg.Done() + } + continue } post.mu.Lock() post.Status = PostStatusPosted + post.postedAt = time.Now() post.mu.Unlock() - if p.checkCfg.Enabled != nil && *p.checkCfg.Enabled { - checkQueue <- post + // Upload succeeded — release the per-call wg slot now (was + // previously delayed until after check). For retries (retryParent + // != nil) the original post already released the slot. + if post.retryParent == nil && post.wg != nil { + post.wg.Done() + } + if p.checkOn && p.checkQueue != nil { + select { + case p.checkQueue <- post: + case <-ctx.Done(): + if post.file != nil { + _ = post.file.Close() + } + } continue } - // Post complete without check - mark as done in queue tracking - postsInFlight.Done() - - // Close file + // No check configured — close file and we're done. if post.file != nil { if err := post.file.Close(); err != nil { slog.WarnContext(ctx, "Error closing file handle", "error", err, "file", post.FilePath) } } + } + } +} - post.wg.Done() +// failPostUpload is invoked when postLoop cannot upload a post (e.g. ctx +// cancellation drained from the queue before processing). It releases the +// per-call wg slot so Post() returns rather than hanging. 
+func (p *poster) failPostUpload(ctx context.Context, post *Post, cause error) { + if post == nil { + return + } + if post.file != nil { + _ = post.file.Close() + } + if post.failed != nil && post.retryParent == nil { + post.failed.Add(1) + } + if post.errSink != nil && !errors.Is(cause, context.Canceled) { + select { + case post.errSink <- fmt.Errorf("post %s aborted: %w", post.FilePath, cause): + default: } } + if post.retryParent == nil && post.wg != nil { + post.wg.Done() + } + _ = ctx // accepted for symmetry with future logging; unused otherwise } -// checkLoop processes posts from the check queue -func (p *poster) checkLoop(ctx context.Context, checkQueue chan *Post, postQueue chan *Post, errChan chan<- error, nzbGen nzb.NZBGenerator, postsInFlight *sync.WaitGroup) { - numOfConnections := 0 +// checkLoop processes posts from the long-lived poster.checkQueue. It is +// fully decoupled from any individual Post() call: by the time a post arrives +// here, postLoop has already released the per-call wg slot, so checkLoop +// failures cannot block the caller. Permanent verification failures route +// through post.onCheckExhausted (set per-call via Post struct) instead of an +// error channel. +func (p *poster) checkLoop() { + ctx := p.loopCtx - // Use verify pool's providers for connection count + numOfConnections := 0 for _, pr := range p.verifyPool.Stats().Providers { numOfConnections += pr.MaxConnections } - // Collect articles that exhaust immediate retries for deferred checking - var allDeferredArticles []FailedArticleInfo - var deferredMu sync.Mutex - totalArticlesProcessed := 0 - deferredEnabled := p.checkCfg.DeferredCheckDelay.ToDuration() > 0 - firstPost := true - - for post := range checkQueue { + for post := range p.checkQueue { select { case <-ctx.Done(): - errChan <- ctx.Err() + // Drain remaining posts so file descriptors close cleanly. 
+ if post.file != nil { + _ = post.file.Close() + } + for rest := range p.checkQueue { + if rest.file != nil { + _ = rest.file.Close() + } + } return default: - // Check if we should pause before processing the check if err := pausable.CheckPause(ctx); err != nil { - errChan <- err - return + slog.WarnContext(ctx, "checkLoop paused error", "error", err) + if post.file != nil { + _ = post.file.Close() + } + continue } // Create the progress task immediately so the user sees "checking" status // during the RetryDelay wait, preventing the queue item from appearing stuck. post.progress = p.jobProgress.AddProgress(uuid.New(), fmt.Sprintf("%s (check)", filepath.Base(post.FilePath)), progress.ProgressTypeChecking, post.filesize) - // Wait for articles to propagate to the verify server before checking. - // Only needed for the first file: subsequent files are posted after enough - // time has already elapsed during the first file's posting and checking. - if firstPost { - firstPost = false - if delay := p.checkCfg.RetryDelay.ToDuration(); delay > 0 { - post.progress.SetWaitDeadline(time.Now().Add(delay)) + // Wait for this file's articles to propagate to the verify server before + // checking. We sleep only the remainder of RetryDelay that hasn't already + // elapsed since this file finished posting — files that sat in the queue + // long enough incur no extra wait, while files checked right after upload + // still get the full propagation grace period. + if delay := p.checkCfg.RetryDelay.ToDuration(); delay > 0 { + post.mu.Lock() + postedAt := post.postedAt + post.mu.Unlock() + + var remaining time.Duration + if postedAt.IsZero() { + // Defensive: postedAt should always be set by postLoop, but if + // it isn't (e.g. unusual code paths) wait the full delay. 
+ remaining = delay + } else { + remaining = delay - time.Since(postedAt) + } + + if remaining > 0 { + post.progress.SetWaitDeadline(time.Now().Add(remaining)) select { case <-ctx.Done(): post.progress.SetWaitDeadline(time.Time{}) - errChan <- ctx.Err() + if post.file != nil { + _ = post.file.Close() + } return - case <-time.After(delay): + case <-time.After(remaining): } post.progress.SetWaitDeadline(time.Time{}) } } - // Create a pool with error handling - use all available CPU cores - // Note: We intentionally don't use WithCancelOnError() to prevent cascading failures - // when a single article check fails. Other checks should continue. + // Create a pool with error handling - use all available connections. + // Don't WithCancelOnError(): per-article failures should not abort + // the rest; we collect failures and retry/defer at file granularity. pool := concpool.New().WithContext(ctx).WithMaxGoroutines(numOfConnections).WithFirstError() - articlesChecked := 0 - articleErrors := 0 var failedArticles []*article.Article var mu sync.Mutex - totalArticlesProcessed += len(post.Articles) - - // Submit all articles to the pool for _, art := range post.Articles { pool.Go(func(ctx context.Context) error { if ctx.Err() != nil { return ctx.Err() } - - // Check for pause before checking article if err := pausable.CheckPause(ctx); err != nil { return err } - if err := p.checkArticle(ctx, art); err != nil { - // Track failed article mu.Lock() failedArticles = append(failedArticles, art) - articleErrors++ mu.Unlock() return err } - - // Update progress atomically (non-blocking) - mu.Lock() - articlesChecked++ - mu.Unlock() - - // Update progress if manager is available post.progress.UpdateProgress(int64(art.Size)) return nil }) } + poolErr := pool.Wait() - // Wait for all workers to complete and collect errors - errors := pool.Wait() - - // If we have failed articles, handle them if len(failedArticles) > 0 { post.mu.Lock() post.Retries++ + retryCount := post.Retries 
post.mu.Unlock() - // If we haven't exceeded max retries, add only failed articles back to queue - if post.Retries < int(p.checkCfg.MaxRePost) { - // Refresh article headers before re-posting. - // The MessageID is intentionally preserved: the NZB generator keys entries - // by (filename, messageID), so regenerating the ID would append a duplicate - // segment rather than replacing the existing one, corrupting the NZB. - // Re-using the same MessageID is safe because: - // - If the first post was never accepted, the server will accept it again. - // - If it was accepted but propagation is slow, the deferred-check path - // handles that case and we should not be re-posting at all. - // The header validation below guards against 441 "Missing required fields" - // errors that can occur when other headers are missing or empty. + // If we still have retries left, re-post the failed articles. + if retryCount < int(p.checkCfg.MaxRePost) { + // Refresh article headers before re-posting (preserve MessageID; + // the NZB generator keys entries by (filename, messageID)). for _, art := range failedArticles { - // Ensure required NNTP headers are non-empty if art.From == "" { if defaultFrom := p.cfg.PostHeaders.DefaultFrom; defaultFrom != "" { art.From = defaultFrom @@ -649,177 +797,118 @@ func (p *poster) checkLoop(ctx context.Context, checkQueue chan *Post, postQueue } } - // Create a new post with only the failed articles + // Build the retry post. retryParent links it back to the + // original so postLoop skips wg/failed bookkeeping (already + // accounted for on the original). 
failedPost := &Post{ - FilePath: post.FilePath, - Articles: failedArticles, - Status: PostStatusPending, - file: post.file, - filesize: post.filesize, - wg: post.wg, - failed: post.failed, - Retries: post.Retries, - progress: p.jobProgress.AddProgress(uuid.New(), fmt.Sprintf("%s (retry)", filepath.Base(post.FilePath)), progress.ProgressTypeUploading, post.filesize), + FilePath: post.FilePath, + Articles: failedArticles, + Status: PostStatusPending, + file: post.file, + filesize: post.filesize, + wg: nil, // retries don't gate the per-call wg + failed: nil, + Retries: retryCount, + progress: p.jobProgress.AddProgress(uuid.New(), fmt.Sprintf("%s (retry)", filepath.Base(post.FilePath)), progress.ProgressTypeUploading, post.filesize), + errSink: nil, // Post() has already returned; no caller to surface to + nzbGen: post.nzbGen, + completedItemID: post.completedItemID, + onCheckExhausted: post.onCheckExhausted, + retryParent: post, } - slog.InfoContext(ctx, - "Retrying failed articles", + slog.InfoContext(ctx, "Retrying failed articles", "file", post.FilePath, - "attempt", post.Retries, + "attempt", retryCount, "max_retries", p.checkCfg.MaxRePost, ) - // Track this retry in the queue before sending - postsInFlight.Add(1) - - // Send retry. postQueue is kept open by the postsInFlight - // gate (the Add above runs before this send), so a closed- - // channel panic here is no longer reachable. Only ctx - // cancellation can interrupt the send. - // - // Bookkeeping: addPost added +1 for the original post; the - // Add above added +1 for the retry. Once the retry is - // queued, the original post is no longer "in flight" — the - // retry represents it from here on — so Done the original. select { - case postQueue <- failedPost: - postsInFlight.Done() - continue + case p.postQueue <- failedPost: case <-ctx.Done(): - // Decrement both: the retry that won't be sent and the - // original post we are abandoning. 
- postsInFlight.Done() - postsInFlight.Done() slog.WarnContext(ctx, "Context canceled while trying to send retry", "file", post.FilePath) + if post.file != nil { + _ = post.file.Close() + } return } + continue } - // Max retries exhausted - check if deferred checking is enabled - if deferredEnabled { - // Collect failed articles for deferred verification instead of failing - deferredMu.Lock() - for _, art := range failedArticles { - allDeferredArticles = append(allDeferredArticles, FailedArticleInfo{ - MessageID: art.MessageID, - Groups: art.Groups, - }) + // Max retries exhausted. + p.jobProgress.FinishProgress(post.progress.GetID()) + if post.file != nil { + if err := post.file.Close(); err != nil { + slog.WarnContext(ctx, "Error closing file handle on verify failure", "error", err, "file", post.FilePath) } - deferredMu.Unlock() + } - slog.InfoContext(ctx, - "Articles deferred for later verification", + // Build the failed-article info list for downstream handling. + infos := make([]FailedArticleInfo, 0, len(failedArticles)) + for _, art := range failedArticles { + infos = append(infos, FailedArticleInfo{ + MessageID: art.MessageID, + Groups: art.Groups, + }) + } + + if p.checkCfg.DeferredCheckDelay.ToDuration() > 0 && post.onCheckExhausted != nil { // deferred checks are enabled only when a positive delay is configured + slog.InfoContext(ctx, "Articles deferred for later verification", "file", post.FilePath, - "deferred_count", len(failedArticles), + "deferred_count", len(infos), "retries_exhausted", p.checkCfg.MaxRePost, ) - - // Mark as verified (optimistically) - deferred check will update later post.mu.Lock() - post.Status = PostStatusVerified + post.Status = PostStatusVerified // optimistic; deferred worker updates later post.mu.Unlock() - - postsInFlight.Done() - p.jobProgress.FinishProgress(post.progress.GetID()) - - if post.file != nil { - if err := post.file.Close(); err != nil { - slog.WarnContext(ctx, "Error closing file handle", "error", err, "file", post.FilePath) - } + if err := post.onCheckExhausted(ctx, infos, len(post.Articles), 
post.completedItemID); err != nil { + slog.ErrorContext(ctx, "onCheckExhausted callback failed", + "file", post.FilePath, "error", err) } - - post.wg.Done() - continue - } - - // Deferred checking not enabled - fail as before - post.mu.Lock() - post.Status = PostStatusFailed - post.Error = fmt.Errorf("failed to verify articles after %d retries", p.checkCfg.MaxRePost) - post.mu.Unlock() - - // Mark this post as done in queue tracking - it failed permanently - postsInFlight.Done() - - if post.failed != nil { - post.failed.Add(1) - } - - if post.file != nil { - if cerr := post.file.Close(); cerr != nil { - slog.WarnContext(ctx, "Error closing file handle on verify failure", "error", cerr, "file", post.FilePath) - } - } - - errChan <- fmt.Errorf("failed to verify file %s after %d retries", post.FilePath, p.checkCfg.MaxRePost) - return - } else if errors != nil { - // This is a safety check - if we have errors but no failed articles, something went wrong - post.mu.Lock() - post.Status = PostStatusFailed - post.Error = fmt.Errorf("verification failed but no articles were marked as failed: %v", errors) - post.mu.Unlock() - - // Mark this post as done in queue tracking - it failed with unexpected error - postsInFlight.Done() - - if post.failed != nil { - post.failed.Add(1) + } else { + // No deferred path or no callback: log and drop. Post() has + // already returned to the caller; we cannot retroactively + // turn this into a synchronous error. + post.mu.Lock() + post.Status = PostStatusFailed + post.Error = fmt.Errorf("failed to verify articles after %d retries", p.checkCfg.MaxRePost) + post.mu.Unlock() + slog.WarnContext(ctx, "Articles failed verification (no deferred handler)", + "file", post.FilePath, + "failed_count", len(infos), + "retries_exhausted", p.checkCfg.MaxRePost, + ) } - + continue + } else if poolErr != nil { + // Pool error with no per-article failures collected — context + // cancellation or a checkArticle bug. Log and move on. 
+ slog.WarnContext(ctx, "checkLoop pool error with no failed articles", + "file", post.FilePath, "error", poolErr) if post.file != nil { - if cerr := post.file.Close(); cerr != nil { - slog.WarnContext(ctx, "Error closing file handle on verify failure", "error", cerr, "file", post.FilePath) - } + _ = post.file.Close() } - - errChan <- fmt.Errorf("unexpected error verifying file %s: %v", post.FilePath, errors) - return + continue } - // Mark as verified + // All articles verified. post.mu.Lock() post.Status = PostStatusVerified post.mu.Unlock() - - // Mark this post as done in queue tracking - verification successful - postsInFlight.Done() - p.jobProgress.FinishProgress(post.progress.GetID()) - - // Close file if post.file != nil { if err := post.file.Close(); err != nil { slog.WarnContext(ctx, "Error closing file handle", "error", err, "file", post.FilePath) } } - - post.wg.Done() - } - } - - // After processing all posts, if there are deferred articles, send a DeferredCheckError - // This is a non-fatal error that signals the caller to store these for later verification. - // Guard the send with ctx so a leaked checkLoop cannot block forever if the main - // goroutine has already returned. 
- if len(allDeferredArticles) > 0 { - slog.InfoContext(ctx, "Sending deferred check error", - "deferred_articles", len(allDeferredArticles), - "total_articles", totalArticlesProcessed) - select { - case errChan <- &DeferredCheckError{ - FailedArticles: allDeferredArticles, - TotalArticles: totalArticlesProcessed, - }: - case <-ctx.Done(): } } } -// addPost adds a file to the posting queue -// displayName is the name to use in the subject (e.g., "Folder/subfolder/file.mp4") -// If displayName is empty, the filename is used -func (p *poster) addPost(ctx context.Context, filePath string, displayName string, fileNumber int, totalFiles int, wg *sync.WaitGroup, failedPosts *atomic.Int64, postQueue chan<- *Post, nzbGen nzb.NZBGenerator, postsInFlight *sync.WaitGroup) error { +// addPost adds a file to the posting queue. +// displayName is the name to use in the subject (e.g., "Folder/subfolder/file.mp4"). +// If displayName is empty, the filename is used. +func (p *poster) addPost(ctx context.Context, filePath string, displayName string, fileNumber int, totalFiles int, wg *sync.WaitGroup, failedPosts *atomic.Int64, nzbGen nzb.NZBGenerator, errSink chan<- error, completedItemID string, onCheckExhausted CheckExhaustedCallback) error { file, err := os.Open(filePath) if err != nil { return fmt.Errorf("error opening file: %w", err) @@ -1002,28 +1091,32 @@ func (p *poster) addPost(ctx context.Context, filePath string, displayName strin } post := &Post{ - FilePath: filePath, - Articles: articles, - Status: PostStatusPending, - file: file, - filesize: fileInfo.Size(), - wg: wg, - failed: failedPosts, - progress: p.jobProgress.AddProgress(uuid.New(), filepath.Base(filePath), progress.ProgressTypeUploading, fileInfo.Size()), + FilePath: filePath, + Articles: articles, + Status: PostStatusPending, + file: file, + filesize: fileInfo.Size(), + wg: wg, + failed: failedPosts, + progress: p.jobProgress.AddProgress(uuid.New(), filepath.Base(filePath), progress.ProgressTypeUploading, 
fileInfo.Size()), + errSink: errSink, + nzbGen: nzbGen, + completedItemID: completedItemID, + onCheckExhausted: onCheckExhausted, } - // Track this post as in-flight until it's sent to the queue - postsInFlight.Add(1) + // Send to the poster-lifetime postQueue. If the poster has been Closed + // concurrently, the queue will be closed and the send will panic — guard + // with a closed-check. + if p.closed.Load() { + _ = file.Close() + return ErrPosterClosed + } - // Use select to safely send to channel and handle context cancellation select { - case postQueue <- post: - // Successfully sent to queue + case p.postQueue <- post: return nil case <-ctx.Done(): - // Context canceled, decrement counter since we didn't send - postsInFlight.Done() - // Close the file and return error if err := file.Close(); err != nil { slog.WarnContext(ctx, "Error closing file after context cancellation", "error", err, "file", filePath) } diff --git a/internal/poster/poster_test.go b/internal/poster/poster_test.go index e184388..c9441ee 100644 --- a/internal/poster/poster_test.go +++ b/internal/poster/poster_test.go @@ -371,12 +371,17 @@ func TestPost(t *testing.T) { jobProgress: mockJobProgress, } + // After Phase 2, verification failures are async and never propagate + // to Post() as a synchronous error — the upload itself succeeded. + // Permanent verification failures route through onCheckExhausted (when + // deferred mode is enabled) or are logged-only (otherwise). This test + // keeps deferred disabled so the check failure is logged and dropped; + // Post() returns nil because the file did upload. err := p.Post(ctx, []string{testFile}, "", nzbGen) + assert.NoError(t, err) - assert.Error(t, err) - assert.Contains(t, err.Error(), "failed to verify file") - - // Close after test completes + // Close after test completes — also drains the checkLoop so the + // post-check failure is processed before we return. 
p.Close() // Finish controller after all operations complete @@ -822,23 +827,23 @@ func TestAddPost(t *testing.T) { p := &poster{ cfg: cfg, jobProgress: mockJobProgress, + postQueue: make(chan *Post, 10), } var wg sync.WaitGroup - var postsInFlight sync.WaitGroup var failedPosts atomic.Int64 - postQueue := make(chan *Post, 10) + errSink := make(chan error, 4) nzbGen := mocks.NewMockNZBGenerator(ctrl) wg.Add(1) ctx := context.Background() - err := p.addPost(ctx, testFile, "", 1, 1, &wg, &failedPosts, postQueue, nzbGen, &postsInFlight) + err := p.addPost(ctx, testFile, "", 1, 1, &wg, &failedPosts, nzbGen, errSink, "", nil) assert.NoError(t, err) // Check that a post was added to the queue select { - case post := <-postQueue: + case post := <-p.postQueue: assert.Equal(t, testFile, post.FilePath) assert.Equal(t, PostStatusPending, post.Status) assert.Greater(t, len(post.Articles), 1) // Should have multiple articles due to small segment size @@ -863,13 +868,12 @@ func TestAddPost(t *testing.T) { defer ctrl.Finish() var wg sync.WaitGroup - var postsInFlight sync.WaitGroup var failedPosts atomic.Int64 - postQueue := make(chan *Post, 10) + errSink := make(chan error, 4) nzbGen := mocks.NewMockNZBGenerator(ctrl) ctx := context.Background() - err := p.addPost(ctx, "nonexistent.txt", "", 1, 1, &wg, &failedPosts, postQueue, nzbGen, &postsInFlight) + err := p.addPost(ctx, "nonexistent.txt", "", 1, 1, &wg, &failedPosts, nzbGen, errSink, "", nil) assert.Error(t, err) assert.Contains(t, err.Error(), "error opening file") @@ -1035,6 +1039,11 @@ func TestPostIntegration(t *testing.T) { assert.NoError(t, err) + // Phase 2: Post() returns once uploads complete; verification runs + // asynchronously. Close() drains the long-lived loops so we can read + // stats deterministically. 
+ p.Close() + // Verify stats were updated stats := p.Stats() assert.Equal(t, int64(1), stats.ArticlesPosted) diff --git a/internal/processor/processor.go b/internal/processor/processor.go index 027881f..0a66c77 100644 --- a/internal/processor/processor.go +++ b/internal/processor/processor.go @@ -468,6 +468,41 @@ func (p *Processor) processFile(ctx context.Context, msg *goqite.Message, job *q } defer jobPostie.Close() + // Wire the asynchronous post-check callback. The poster's long-lived + // checkLoop calls this when articles exhaust MaxRePost retries; we persist + // them to the deferred-check queue so PostCheckRetryWorker can recheck + // them later. The callback runs after Post() has already returned, so the + // processor's main flow no longer blocks on verification. + completedItemID := string(msg.ID) + deferredDelay := p.config.GetPostCheckConfig().DeferredCheckDelay.ToDuration() + jobPostie.SetVerificationCallback(completedItemID, func(cbCtx context.Context, articles []poster.FailedArticleInfo, totalArticles int, itemID string) error { + if len(articles) == 0 { + return nil + } + nextRetryAt := time.Now().Add(deferredDelay) + pendingChecks := make([]queue.PendingArticleCheck, 0, len(articles)) + for _, art := range articles { + groupsJSON, _ := json.Marshal(art.Groups) + pendingChecks = append(pendingChecks, queue.PendingArticleCheck{ + MessageID: art.MessageID, + Groups: string(groupsJSON), + NextRetryAt: nextRetryAt, + }) + } + if addErr := p.queue.AddPendingArticleChecks(cbCtx, itemID, pendingChecks); addErr != nil { + slog.ErrorContext(cbCtx, "Failed to store deferred article checks", + "error", addErr, "completedItemID", itemID, "count", len(pendingChecks)) + return addErr + } + if statusErr := p.queue.UpdateCompletedItemVerificationStatus(cbCtx, itemID, "pending_verification"); statusErr != nil { + slog.ErrorContext(cbCtx, "Failed to update verification status", + "error", statusErr, "completedItemID", itemID) + } + slog.InfoContext(cbCtx, "Stored 
deferred article checks asynchronously", + "completedItemID", itemID, "deferred_articles", len(pendingChecks), "total_articles", totalArticles) + return nil + }) + // Determine the input folder for maintaining folder structure var inputFolder string switch { diff --git a/pkg/postie/postfolder_test.go b/pkg/postie/postfolder_test.go index 72b2912..b6e80e7 100644 --- a/pkg/postie/postfolder_test.go +++ b/pkg/postie/postfolder_test.go @@ -22,6 +22,10 @@ func (m *mockPoster) Post(_ context.Context, files []string, _ string, nzbGen nz return nil } +func (m *mockPoster) PostWithCallback(_ context.Context, files []string, _ string, nzbGen nzb.NZBGenerator, _ map[string]string, _ string, _ poster.CheckExhaustedCallback) error { + return m.PostWithRelativePaths(context.Background(), files, "", nzbGen, nil) +} + func (m *mockPoster) PostWithRelativePaths(_ context.Context, files []string, _ string, nzbGen nzb.NZBGenerator, _ map[string]string) error { addFakeArticles(nzbGen, files) return nil diff --git a/pkg/postie/postie.go b/pkg/postie/postie.go index 5f48069..1731a89 100644 --- a/pkg/postie/postie.go +++ b/pkg/postie/postie.go @@ -32,6 +32,26 @@ type Postie struct { maintainOriginalExtension bool jobProgress progress.JobProgress queue QueueInterface + + // Verification callback set by the caller (e.g. the processor) before + // invoking Post(). Forwarded to poster.PostWithCallback so the long-lived + // checkLoop can persist permanent verification failures asynchronously. + verifyItemID string + verifyCallback poster.CheckExhaustedCallback +} + +// SetVerificationCallback configures the asynchronous post-check exhaustion +// callback for this Postie instance. completedItemID identifies the queue +// item; cb is invoked once per file whose articles cannot be verified after +// MaxRePost retries. Either parameter may be empty/nil — in that case the +// poster logs the failure and drops it (same behavior as the legacy CLI). 
+// +// Postie is per-job (constructed and Closed inside processor.processFile), +// so this setter is normally called once before any Post() call. Concurrent +// Post() calls on the same Postie are not currently supported. +func (p *Postie) SetVerificationCallback(completedItemID string, cb poster.CheckExhaustedCallback) { + p.verifyItemID = completedItemID + p.verifyCallback = cb } // QueueInterface defines the queue methods needed by Postie @@ -272,7 +292,7 @@ func (p *Postie) postInParallel( return nil } - if err := p.poster.Post(ctx, createdPar2Paths, rootDir, nzbGen); err != nil { + if err := p.poster.PostWithCallback(ctx, createdPar2Paths, rootDir, nzbGen, nil, p.verifyItemID, p.verifyCallback); err != nil { if !errors.Is(err, context.Canceled) { slog.ErrorContext(ctx, fmt.Sprintf("Error during upload of par2 files: %s. Upload will continue without par2.", createdPar2Paths), "error", err) } @@ -286,7 +306,7 @@ func (p *Postie) postInParallel( var deferredErr *poster.DeferredCheckError errg.Go(func() error { - if err := p.poster.Post(ctx, []string{f.Path}, rootDir, nzbGen); err != nil { + if err := p.poster.PostWithCallback(ctx, []string{f.Path}, rootDir, nzbGen, nil, p.verifyItemID, p.verifyCallback); err != nil { // Check if this is a non-fatal deferred check error var de *poster.DeferredCheckError if errors.As(err, &de) { @@ -385,7 +405,7 @@ func (p *Postie) post( } var deferredErr *poster.DeferredCheckError - if err := p.poster.Post(ctx, filesPath, rootDir, nzbGen); err != nil { + if err := p.poster.PostWithCallback(ctx, filesPath, rootDir, nzbGen, nil, p.verifyItemID, p.verifyCallback); err != nil { // Check if this is a non-fatal deferred check error if errors.As(err, &deferredErr) { slog.InfoContext(ctx, "Some articles deferred for later verification", "file", f.Path, "deferred", len(deferredErr.FailedArticles)) @@ -517,7 +537,7 @@ func (p *Postie) postFolder(ctx context.Context, files []fileinfo.FileInfo, root var deferredErr 
*poster.DeferredCheckError // Post all files (including PAR2) together with relative paths for subjects - if err := p.poster.PostWithRelativePaths(ctx, allFilePaths, rootDir, nzbGen, relativePaths); err != nil { + if err := p.poster.PostWithCallback(ctx, allFilePaths, rootDir, nzbGen, relativePaths, p.verifyItemID, p.verifyCallback); err != nil { if errors.As(err, &deferredErr) { slog.InfoContext(ctx, "Some articles deferred for later verification", "folder", folderName, "deferred", len(deferredErr.FailedArticles)) } else { @@ -579,7 +599,7 @@ func (p *Postie) postFolder(ctx context.Context, files []fileinfo.FileInfo, root } par2RelPaths := buildPar2RelativePaths(files, createdPar2Paths) - if err := p.poster.PostWithRelativePaths(ctx, createdPar2Paths, rootDir, nzbGen, par2RelPaths); err != nil { + if err := p.poster.PostWithCallback(ctx, createdPar2Paths, rootDir, nzbGen, par2RelPaths, p.verifyItemID, p.verifyCallback); err != nil { if !errors.Is(err, context.Canceled) { slog.ErrorContext(ctx, "Error during upload of par2 files. Upload will continue without par2.", "error", err) } @@ -590,7 +610,7 @@ func (p *Postie) postFolder(ctx context.Context, files []fileinfo.FileInfo, root // Post main files with relative paths for subjects errg.Go(func() error { - if err := p.poster.PostWithRelativePaths(ctx, allFilePaths, rootDir, nzbGen, relativePaths); err != nil { + if err := p.poster.PostWithCallback(ctx, allFilePaths, rootDir, nzbGen, relativePaths, p.verifyItemID, p.verifyCallback); err != nil { // Check if this is a non-fatal deferred check error var de *poster.DeferredCheckError if errors.As(err, &de) {