From 9825dbf8abeb45a503cec8dd9aab1a3dcc6e6e89 Mon Sep 17 00:00:00 2001
From: javi11
Date: Wed, 20 May 2026 09:08:49 +0200
Subject: [PATCH] feat(pool): elastic import admission to prioritize streams
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Under high load, VFS/FUSE streams could starve when many ARR-driven NZB
imports ran concurrently. nntppool v4 already has priority dispatch
(streams use BodyPriority), but priority only kicks in once a connection
frees — when every connection is mid-body for a long-running import,
even priority requests wait.

Add a pool-level admission controller with two adaptive caps:

- max_concurrent_imports (capIdle) — applies when no stream is active
- max_concurrent_imports_while_streaming (capWhileStreaming) — applies
  when any stream is active

The gate sits at Processor.ProcessNzbFile entry (single chokepoint for
queue worker, scanner watcher, background reprocess) — before any pool
work, so the 30s body timeouts can never be eaten by queue waits.

StreamTracker keeps an atomic active-stream count and notifies the pool
on Add/Remove; the pool reads the count to pick the effective cap and
wakes queued waiters when streams end.

Both caps default to 0, and 0 means unlimited for that mode —
byte-identical behaviour until configured.
--- internal/api/server.go | 8 + internal/api/stream_tracker.go | 38 ++ internal/config/accessors.go | 19 + internal/config/manager.go | 7 + internal/health/repair_e2e_test.go | 13 +- internal/importer/processor.go | 12 +- internal/importer/service.go | 24 ++ .../metadata_remote_file_test.go | 9 + internal/pool/admission.go | 200 ++++++++++ internal/pool/admission_test.go | 342 ++++++++++++++++++ internal/pool/manager.go | 52 ++- 11 files changed, 717 insertions(+), 7 deletions(-) create mode 100644 internal/pool/admission.go create mode 100644 internal/pool/admission_test.go diff --git a/internal/api/server.go b/internal/api/server.go index 817de3aa..a3c0db88 100644 --- a/internal/api/server.go +++ b/internal/api/server.go @@ -114,6 +114,14 @@ func NewServer( updater: updater.Default(), } + // Wire stream-activity ↔ pool admission. Streams notify the pool when they + // start/stop; the pool reads the active stream count to pick its + // adaptive import cap. + if poolManager != nil && streamTracker != nil { + streamTracker.SetChangeNotifier(poolManager) + poolManager.SetStreamSource(streamTracker) + } + return server } diff --git a/internal/api/stream_tracker.go b/internal/api/stream_tracker.go index 10f9148b..1b7efd77 100644 --- a/internal/api/stream_tracker.go +++ b/internal/api/stream_tracker.go @@ -16,6 +16,13 @@ import ( // Default timeout for stale streams (4 hours - covers most movie lengths) const defaultStreamTimeout = 4 * time.Hour +// StreamChangeNotifier is notified whenever the active stream count changes. +// Implemented by pool.Manager; declared here to avoid an api -> pool import +// dependency for the StreamTracker itself. 
+type StreamChangeNotifier interface { + NotifyStreamChange() +} + // StreamTracker tracks active streams type StreamTracker struct { streams sync.Map @@ -24,6 +31,15 @@ type StreamTracker struct { mu sync.Mutex // For history protection timeout time.Duration metricsTracker usenet.MetricsTracker + + // activeCount is the exact number of entries currently in the streams map. + // Maintained as an int64 counter so ActiveStreams() is O(1) and safe to + // call from hot paths (e.g. the pool admission gate). + activeCount atomic.Int64 + + // notifier, when set, is notified after every stream add/remove so the + // import-admission cap can react to streams starting/stopping. + notifier StreamChangeNotifier } type streamSample struct { @@ -253,9 +269,29 @@ func (t *StreamTracker) AddStream(filePath, source, userName, clientIP, userAgen samples: make([]streamSample, 0, 30), // Preallocate for 1 minute of samples (every 2s) } t.streams.Store(id, internal) + t.activeCount.Add(1) + t.notifyChange() return stream } +// SetChangeNotifier wires a notifier (typically a pool.Manager) that will be +// signalled whenever the active stream count changes. Pass nil to clear. +func (t *StreamTracker) SetChangeNotifier(n StreamChangeNotifier) { + t.notifier = n +} + +// ActiveStreams returns the current number of tracked streams. +// Implements pool.StreamActivitySource (structurally). 
+func (t *StreamTracker) ActiveStreams() int { + return int(t.activeCount.Load()) +} + +func (t *StreamTracker) notifyChange() { + if t.notifier != nil { + t.notifier.NotifyStreamChange() + } +} + // Add adds a new stream and returns its ID (implements nzbfilesystem.StreamTracker) func (t *StreamTracker) Add(filePath, source, userName, clientIP, userAgent string, totalSize int64) string { return t.AddStream(filePath, source, userName, clientIP, userAgent, totalSize).ID @@ -344,6 +380,8 @@ func (t *StreamTracker) Remove(id string) { t.mu.Unlock() t.streams.Delete(id) + t.activeCount.Add(-1) + t.notifyChange() } } diff --git a/internal/config/accessors.go b/internal/config/accessors.go index ccabbbec..e02a5776 100644 --- a/internal/config/accessors.go +++ b/internal/config/accessors.go @@ -105,6 +105,25 @@ func (c *Config) GetMaxImportConnections() int { return c.Import.MaxImportConnections } +// GetMaxConcurrentImports returns the global cap on concurrent NZB imports +// when no stream is active. 0 means unlimited (current default behaviour). +func (c *Config) GetMaxConcurrentImports() int { + if c.Import.MaxConcurrentImports < 0 { + return 0 + } + return c.Import.MaxConcurrentImports +} + +// GetMaxConcurrentImportsWhileStreaming returns the cap on concurrent NZB +// imports while at least one stream is active. 0 means unlimited (current +// default behaviour). +func (c *Config) GetMaxConcurrentImportsWhileStreaming() int { + if c.Import.MaxConcurrentImportsWhileStreaming < 0 { + return 0 + } + return c.Import.MaxConcurrentImportsWhileStreaming +} + // GetMaxDownloadPrefetch returns max download prefetch with a default fallback. 
func (c *Config) GetMaxDownloadPrefetch() int { if c.Import.MaxDownloadPrefetch <= 0 { diff --git a/internal/config/manager.go b/internal/config/manager.go index d708fcdc..f1743cca 100644 --- a/internal/config/manager.go +++ b/internal/config/manager.go @@ -262,6 +262,13 @@ type ImportConfig struct { QueueProcessingIntervalSeconds int `yaml:"queue_processing_interval_seconds" mapstructure:"queue_processing_interval_seconds" json:"queue_processing_interval_seconds"` AllowedFileExtensions []string `yaml:"allowed_file_extensions" mapstructure:"allowed_file_extensions" json:"allowed_file_extensions"` MaxImportConnections int `yaml:"max_import_connections" mapstructure:"max_import_connections" json:"max_import_connections"` + // MaxConcurrentImports caps the number of NZB imports that may run + // end-to-end at the same time when no stream is active. 0 = unlimited. + MaxConcurrentImports int `yaml:"max_concurrent_imports" mapstructure:"max_concurrent_imports" json:"max_concurrent_imports"` + // MaxConcurrentImportsWhileStreaming caps concurrent imports while at + // least one stream is active, so streams are not starved by imports. + // 0 = unlimited. 
+ MaxConcurrentImportsWhileStreaming int `yaml:"max_concurrent_imports_while_streaming" mapstructure:"max_concurrent_imports_while_streaming" json:"max_concurrent_imports_while_streaming"` MaxDownloadPrefetch int `yaml:"max_download_prefetch" mapstructure:"max_download_prefetch" json:"max_download_prefetch"` SegmentSamplePercentage int `yaml:"segment_sample_percentage" mapstructure:"segment_sample_percentage" json:"segment_sample_percentage"` ReadTimeoutSeconds int `yaml:"read_timeout_seconds" mapstructure:"read_timeout_seconds" json:"read_timeout_seconds"` diff --git a/internal/health/repair_e2e_test.go b/internal/health/repair_e2e_test.go index eed05344..91f67d63 100644 --- a/internal/health/repair_e2e_test.go +++ b/internal/health/repair_e2e_test.go @@ -33,9 +33,9 @@ func (m *mockPoolManager) HasPool() bool { return fal func (m *mockPoolManager) GetMetrics() (pool.MetricsSnapshot, error) { return pool.MetricsSnapshot{}, nil } -func (m *mockPoolManager) ResetMetrics(_ context.Context, _, _ bool) error { return nil } -func (m *mockPoolManager) ResetProviderErrors(_ context.Context) error { return nil } -func (m *mockPoolManager) IncArticlesDownloaded() {} +func (m *mockPoolManager) ResetMetrics(_ context.Context, _, _ bool) error { return nil } +func (m *mockPoolManager) ResetProviderErrors(_ context.Context) error { return nil } +func (m *mockPoolManager) IncArticlesDownloaded() {} func (m *mockPoolManager) UpdateDownloadProgress(_ string, _ int64) {} func (m *mockPoolManager) IncArticlesPosted() {} func (m *mockPoolManager) AddProvider(_ nntppool.Provider) error { return nil } @@ -44,6 +44,13 @@ func (m *mockPoolManager) ResetProviderQuota(_ context.Context, _ string) error return nil } func (m *mockPoolManager) SetProviderIDs(_ map[string]string) {} +func (m *mockPoolManager) AcquireImportSlot(_ context.Context) (func(), error) { + return func() {}, nil +} +func (m *mockPoolManager) SetAdmissionCaps(_ int, _ int) {} +func (m *mockPoolManager) SetStreamSource(_ 
pool.StreamActivitySource) {} +func (m *mockPoolManager) NotifyStreamChange() {} + // mockARRsService captures TriggerFileRescan calls and returns a configurable error. type mockARRsService struct { mu sync.Mutex diff --git a/internal/importer/processor.go b/internal/importer/processor.go index a2e47742..e0e778da 100644 --- a/internal/importer/processor.go +++ b/internal/importer/processor.go @@ -81,7 +81,6 @@ func (proc *Processor) getCleanNzbName(nzbPath string, queueID int) string { return baseName } - func (proc *Processor) SetRecorder(recorder HistoryRecorder) { proc.recorder = recorder } @@ -170,6 +169,17 @@ func (proc *Processor) checkCancellation(ctx context.Context) error { // metadata files written to disk; it is populated even on partial failure so callers can clean up. // Paths prefixed with "DIR:" indicate a metadata directory that should be removed entirely. func (proc *Processor) ProcessNzbFile(ctx context.Context, filePath, relativePath string, queueID int, allowedExtensionsOverride *[]string, virtualDirOverride *string, extractedFiles []parser.ExtractedFileInfo, category *string, metadata *string, downloadID *string) (string, []string, error) { + // Gate this import behind the pool admission controller so we can cap how + // many NZB imports run concurrently end-to-end and yield to streams under + // load. The Acquire is a no-op when no caps are configured. 
+ if proc.poolManager != nil { + release, err := proc.poolManager.AcquireImportSlot(ctx) + if err != nil { + return "", nil, fmt.Errorf("import admission cancelled: %w", err) + } + defer release() + } + cfg := proc.configGetter() // Determine max connections to use diff --git a/internal/importer/service.go b/internal/importer/service.go index cbdf2641..3c53791d 100644 --- a/internal/importer/service.go +++ b/internal/importer/service.go @@ -186,6 +186,7 @@ type Service struct { healthRepo *database.HealthRepository // Health repository for updating health status broadcaster *progress.ProgressBroadcaster // WebSocket progress broadcaster userRepo *database.UserRepository // User repository for API key lookup + poolManager pool.Manager // Pool manager — used to push admission caps on config change log *slog.Logger // Runtime state @@ -241,6 +242,7 @@ func NewService(config ServiceConfig, metadataService *metadata.MetadataService, sabnzbdClient: sabnzbd.NewSABnzbdClient(), broadcaster: broadcaster, userRepo: userRepo, + poolManager: poolManager, log: slog.Default().With("component", "importer-service"), ctx: ctx, cancel: cancel, @@ -248,6 +250,17 @@ func NewService(config ServiceConfig, metadataService *metadata.MetadataService, paused: false, } + // Push initial admission caps to the pool so imports are gated from the + // start. Zero values keep the controller disabled, matching prior behaviour. + if poolManager != nil && configGetter != nil { + if cfg := configGetter(); cfg != nil { + poolManager.SetAdmissionCaps( + cfg.GetMaxConcurrentImports(), + cfg.GetMaxConcurrentImportsWhileStreaming(), + ) + } + } + // Set recorder for processor processor.SetRecorder(service) @@ -404,6 +417,17 @@ func (s *Service) RegisterConfigChangeHandler(configManager any) { "old_workers", oldWorkers, "new_workers", newWorkers) } } + + // Push updated import-admission caps to the pool. Zero values keep + // the admission gate disabled (unlimited). 
+ if s.poolManager != nil { + capIdle := newConfig.GetMaxConcurrentImports() + capWhileStreaming := newConfig.GetMaxConcurrentImportsWhileStreaming() + s.poolManager.SetAdmissionCaps(capIdle, capWhileStreaming) + s.log.InfoContext(s.ctx, "Import admission caps updated", + "max_concurrent_imports", capIdle, + "max_concurrent_imports_while_streaming", capWhileStreaming) + } }) } diff --git a/internal/nzbfilesystem/metadata_remote_file_test.go b/internal/nzbfilesystem/metadata_remote_file_test.go index 7d2da6ac..a7c1507e 100644 --- a/internal/nzbfilesystem/metadata_remote_file_test.go +++ b/internal/nzbfilesystem/metadata_remote_file_test.go @@ -390,6 +390,15 @@ func (m *mockPoolManager) ResetProviderQuota(_ context.Context, _ string) error func (m *mockPoolManager) SetProviderIDs(_ map[string]string) {} +func (m *mockPoolManager) AcquireImportSlot(_ context.Context) (func(), error) { + return func() {}, nil +} + +func (m *mockPoolManager) SetAdmissionCaps(_ int, _ int) {} + +func (m *mockPoolManager) SetStreamSource(_ pool.StreamActivitySource) {} + +func (m *mockPoolManager) NotifyStreamChange() {} // TestSeekResetsOriginalRangeEnd tests that Seek properly resets originalRangeEnd // This is critical for video playback - without this fix, seeking causes stale range diff --git a/internal/pool/admission.go b/internal/pool/admission.go new file mode 100644 index 00000000..3b750484 --- /dev/null +++ b/internal/pool/admission.go @@ -0,0 +1,200 @@ +package pool + +import ( + "context" + "sync" +) + +// StreamActivitySource reports how many streams are currently active. +// Implemented by api.StreamTracker; kept here so the dependency flows api -> pool. +type StreamActivitySource interface { + ActiveStreams() int +} + +// ImportAdmission is an adaptive counting semaphore that gates how many NZB +// imports may run concurrently end-to-end. It exposes two caps: +// +// - capIdle: maximum concurrent imports when no stream is active. 
+// - capWhileStreaming: maximum concurrent imports while any stream is active. +// +// A cap value of 0 means "unlimited" for that mode. When both caps are 0 the +// admission controller is a no-op, which is the default so existing +// deployments behave exactly as before until the new config knobs are set. +// +// The controller uses a FIFO waiter queue with manual select / channel signalling +// so we can support ctx cancellation without dropping wake-ups (the classic +// "lost wakeup" hazard with sync.Cond.Wait + cancellation). +type ImportAdmission struct { + mu sync.Mutex + capIdle int + capWhileStreaming int + inFlight int + waiters []*waiter + streamSource StreamActivitySource +} + +type waiter struct { + // ch is closed (or sent on) exactly once when the waiter is granted a slot. + // Buffered with capacity 1 so a granter never blocks; on a race with ctx + // cancellation, the cancelling goroutine drains and forwards the grant to + // the next waiter to avoid losing the wake-up. + ch chan struct{} +} + +// NewImportAdmission constructs an admission controller with both caps disabled +// (0/0 = unlimited). Use SetCaps and SetStreamSource to configure it. +func NewImportAdmission() *ImportAdmission { + return &ImportAdmission{} +} + +// SetCaps updates both caps. If capWhileStreaming > capIdle and capIdle > 0, +// the streaming cap is clamped to capIdle. After updating, queued waiters are +// woken if the effective cap grew. +func (a *ImportAdmission) SetCaps(capIdle, capWhileStreaming int) { + if capIdle < 0 { + capIdle = 0 + } + if capWhileStreaming < 0 { + capWhileStreaming = 0 + } + if capIdle > 0 && capWhileStreaming > capIdle { + capWhileStreaming = capIdle + } + + a.mu.Lock() + a.capIdle = capIdle + a.capWhileStreaming = capWhileStreaming + a.wakeWaitersLocked() + a.mu.Unlock() +} + +// SetStreamSource wires the activity signal. nil sources are tolerated and +// effectively pin behaviour to capIdle. 
+func (a *ImportAdmission) SetStreamSource(src StreamActivitySource) { + a.mu.Lock() + a.streamSource = src + a.wakeWaitersLocked() + a.mu.Unlock() +} + +// NotifyStreamChange should be called when the stream count changes so the +// controller can wake or hold waiters according to the new effective cap. +func (a *ImportAdmission) NotifyStreamChange() { + a.mu.Lock() + a.wakeWaitersLocked() + a.mu.Unlock() +} + +// Acquire blocks until an admission slot is available or ctx is cancelled. +// The returned release function MUST be called exactly once when the import is +// done. When both caps are 0 the call is a fast-path no-op. +func (a *ImportAdmission) Acquire(ctx context.Context) (release func(), err error) { + a.mu.Lock() + if a.disabledLocked() { + a.mu.Unlock() + return noopRelease, nil + } + + if a.inFlight < a.currentCapLocked() { + a.inFlight++ + a.mu.Unlock() + return a.releaseOnce(), nil + } + + w := &waiter{ch: make(chan struct{}, 1)} + a.waiters = append(a.waiters, w) + a.mu.Unlock() + + select { + case <-w.ch: + // Granted. inFlight was already incremented by the granter. + return a.releaseOnce(), nil + case <-ctx.Done(): + // We may have been granted concurrently. Resolve the race under the + // lock: if the channel has a pending wake, consume it and forward it + // to the next waiter; otherwise remove ourselves from the queue. + a.mu.Lock() + select { + case <-w.ch: + // Already granted. Hand the slot to the next waiter. + a.inFlight-- // undo the grant + a.wakeWaitersLocked() + default: + a.removeWaiterLocked(w) + } + a.mu.Unlock() + return noopRelease, ctx.Err() + } +} + +// disabledLocked reports true when both caps are 0 (controller is a no-op). +func (a *ImportAdmission) disabledLocked() bool { + return a.capIdle == 0 && a.capWhileStreaming == 0 +} + +// currentCapLocked returns the cap that applies right now. A cap of 0 means +// unlimited; we return a very large number so the comparison `inFlight < cap` +// is effectively always true. 
+func (a *ImportAdmission) currentCapLocked() int { + cap := a.capIdle + if a.streamSource != nil && a.streamSource.ActiveStreams() > 0 { + cap = a.capWhileStreaming + } + if cap <= 0 { + // Unlimited. + return 1 << 30 + } + return cap +} + +// wakeWaitersLocked wakes waiters in FIFO order while there is headroom under +// the current cap. Each wake-up increments inFlight, so callers that receive +// the signal must call their release exactly once. +func (a *ImportAdmission) wakeWaitersLocked() { + if a.disabledLocked() { + // Drain any waiters (shouldn't exist when both caps are 0, but be safe). + for _, w := range a.waiters { + select { + case w.ch <- struct{}{}: + a.inFlight++ + default: + } + } + a.waiters = nil + return + } + + cap := a.currentCapLocked() + for len(a.waiters) > 0 && a.inFlight < cap { + w := a.waiters[0] + a.waiters = a.waiters[1:] + a.inFlight++ + // Buffered chan capacity 1 — never blocks. + w.ch <- struct{}{} + } +} + +func (a *ImportAdmission) removeWaiterLocked(target *waiter) { + for i, w := range a.waiters { + if w == target { + a.waiters = append(a.waiters[:i], a.waiters[i+1:]...) + return + } + } +} + +func (a *ImportAdmission) releaseOnce() func() { + var once sync.Once + return func() { + once.Do(func() { + a.mu.Lock() + if a.inFlight > 0 { + a.inFlight-- + } + a.wakeWaitersLocked() + a.mu.Unlock() + }) + } +} + +func noopRelease() {} diff --git a/internal/pool/admission_test.go b/internal/pool/admission_test.go new file mode 100644 index 00000000..dddec08b --- /dev/null +++ b/internal/pool/admission_test.go @@ -0,0 +1,342 @@ +package pool + +import ( + "context" + "sync" + "sync/atomic" + "testing" + "time" +) + +// stubStreamSource is a tiny test source whose ActiveStreams value can be +// changed atomically. 
+type stubStreamSource struct { + n atomic.Int64 +} + +func (s *stubStreamSource) ActiveStreams() int { return int(s.n.Load()) } +func (s *stubStreamSource) set(v int64) { s.n.Store(v) } + +// waitFor polls cond up to d for true; returns false on timeout. +func waitFor(d time.Duration, cond func() bool) bool { + deadline := time.Now().Add(d) + for time.Now().Before(deadline) { + if cond() { + return true + } + time.Sleep(time.Millisecond) + } + return cond() +} + +func TestImportAdmission_DisabledIsNoOp(t *testing.T) { + a := NewImportAdmission() + // caps (0, 0) -> disabled: every Acquire returns immediately, no queueing. + for i := range 100 { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + release, err := a.Acquire(ctx) + cancel() + if err != nil { + t.Fatalf("Acquire %d failed: %v", i, err) + } + release() + } + + a.mu.Lock() + if a.inFlight != 0 { + t.Fatalf("disabled controller leaked inFlight=%d", a.inFlight) + } + a.mu.Unlock() +} + +func TestImportAdmission_BlocksAtCap(t *testing.T) { + a := NewImportAdmission() + a.SetCaps(2, 1) + + r1, err := a.Acquire(context.Background()) + if err != nil { + t.Fatalf("acquire 1: %v", err) + } + r2, err := a.Acquire(context.Background()) + if err != nil { + t.Fatalf("acquire 2: %v", err) + } + + // Third acquire must block. + acquired := make(chan struct{}) + go func() { + r, err := a.Acquire(context.Background()) + if err != nil { + t.Errorf("acquire 3: %v", err) + return + } + close(acquired) + r() + }() + + select { + case <-acquired: + t.Fatal("third Acquire should not have returned while cap=2 was full") + case <-time.After(50 * time.Millisecond): + // Good — it's blocked. + } + + // Release one — third should be granted. + r1() + select { + case <-acquired: + case <-time.After(time.Second): + t.Fatal("third Acquire did not unblock after release") + } + r2() +} + +func TestImportAdmission_FIFO(t *testing.T) { + a := NewImportAdmission() + a.SetCaps(1, 1) + + // Hold the single slot. 
+ hold, err := a.Acquire(context.Background()) + if err != nil { + t.Fatalf("hold: %v", err) + } + + const n = 5 + order := make(chan int, n) + var wg sync.WaitGroup + for idx := range n { + wg.Add(1) + go func() { + defer wg.Done() + r, err := a.Acquire(context.Background()) + if err != nil { + t.Errorf("waiter %d: %v", idx, err) + return + } + order <- idx + // Small pause so the next waiter cleanly sees us drop. + time.Sleep(2 * time.Millisecond) + r() + }() + // Ensure goroutines enqueue in order. The Acquire path takes the lock + // then registers as a waiter atomically, so a brief sleep is enough to + // avoid scheduling races for this test. + time.Sleep(2 * time.Millisecond) + } + + // Release the held slot — the queue should drain FIFO. + hold() + wg.Wait() + close(order) + + want := 0 + for got := range order { + if got != want { + t.Fatalf("FIFO violation: got %d, want %d", got, want) + } + want++ + } +} + +func TestImportAdmission_StreamAwareCap(t *testing.T) { + src := &stubStreamSource{} + a := NewImportAdmission() + a.SetStreamSource(src) + a.SetCaps(3, 1) + + // No streams -> capIdle=3. + r1, _ := a.Acquire(context.Background()) + r2, _ := a.Acquire(context.Background()) + r3, _ := a.Acquire(context.Background()) + + // Activate streams. Existing in-flight imports remain. + src.set(1) + a.NotifyStreamChange() + + // A fourth must block because capWhileStreaming=1 and inFlight=3. + blocked := make(chan struct{}) + go func() { + r, err := a.Acquire(context.Background()) + if err == nil { + close(blocked) + r() + } + }() + select { + case <-blocked: + t.Fatal("Acquire should be blocked while streams active and inFlight > capWhileStreaming") + case <-time.After(50 * time.Millisecond): + } + + // Release two: inFlight=1 == capWhileStreaming, still no admission. 
+ r1() + r2() + select { + case <-blocked: + t.Fatal("Acquire should still be blocked at capWhileStreaming") + case <-time.After(50 * time.Millisecond): + } + + // Stop streaming — cap returns to 3 and the waiter is granted. + src.set(0) + a.NotifyStreamChange() + select { + case <-blocked: + case <-time.After(time.Second): + t.Fatal("Acquire should have been granted after streams ended") + } + r3() +} + +func TestImportAdmission_GrowOnSetCapsWakesWaiters(t *testing.T) { + a := NewImportAdmission() + a.SetCaps(1, 1) + + hold, _ := a.Acquire(context.Background()) + + const n = 3 + granted := make(chan int, n) + var releases []func() + var rmu sync.Mutex + for idx := range n { + go func() { + r, err := a.Acquire(context.Background()) + if err != nil { + t.Errorf("waiter %d: %v", idx, err) + return + } + rmu.Lock() + releases = append(releases, r) + rmu.Unlock() + granted <- idx + }() + } + + // Wait for them to enqueue. + if !waitFor(time.Second, func() bool { + a.mu.Lock() + defer a.mu.Unlock() + return len(a.waiters) == n + }) { + t.Fatalf("expected %d waiters", n) + } + + // Grow capIdle to fit them all without releasing the holder. + a.SetCaps(1+n, 1) + + for i := range n { + select { + case <-granted: + case <-time.After(time.Second): + t.Fatalf("waiter %d not granted after SetCaps grew the cap", i) + } + } + + hold() + rmu.Lock() + for _, r := range releases { + r() + } + rmu.Unlock() +} + +func TestImportAdmission_CtxCancelRemovesWaiter(t *testing.T) { + a := NewImportAdmission() + a.SetCaps(1, 1) + + hold, _ := a.Acquire(context.Background()) + + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan error, 1) + go func() { + _, err := a.Acquire(ctx) + done <- err + }() + + // Wait for the waiter to be enqueued. 
+ if !waitFor(time.Second, func() bool { + a.mu.Lock() + defer a.mu.Unlock() + return len(a.waiters) == 1 + }) { + t.Fatal("waiter never enqueued") + } + + cancel() + select { + case err := <-done: + if err == nil { + t.Fatal("expected ctx error on cancel, got nil") + } + case <-time.After(time.Second): + t.Fatal("Acquire did not return after ctx cancellation") + } + + // Waiter slot must be reclaimed; inFlight must remain 1 (the holder). + a.mu.Lock() + if len(a.waiters) != 0 { + t.Fatalf("expected 0 waiters after cancel, got %d", len(a.waiters)) + } + if a.inFlight != 1 { + t.Fatalf("expected inFlight=1, got %d", a.inFlight) + } + a.mu.Unlock() + + hold() +} + +func TestImportAdmission_CtxCancelRaceForwardsGrant(t *testing.T) { + // Race condition: a waiter is granted at the same moment its ctx cancels. + // We must not consume the grant and starve the next waiter. + a := NewImportAdmission() + a.SetCaps(1, 1) + + hold, _ := a.Acquire(context.Background()) + + ctxA, cancelA := context.WithCancel(context.Background()) + doneA := make(chan error, 1) + go func() { + r, err := a.Acquire(ctxA) + if err == nil { + // Real callers must release on success even if their ctx fired + // concurrently; the controller only forwards the grant when the + // ctx.Done branch is the one selected. + r() + } + doneA <- err + }() + + // Second waiter (B) is queued behind A. + doneB := make(chan struct{}) + go func() { + r, err := a.Acquire(context.Background()) + if err == nil { + close(doneB) + r() + } + }() + + if !waitFor(time.Second, func() bool { + a.mu.Lock() + defer a.mu.Unlock() + return len(a.waiters) == 2 + }) { + t.Fatal("expected 2 waiters") + } + + // Race: release & cancel A simultaneously. Whichever fires first, B must + // eventually be granted (no lost wake-up). 
+ go cancelA() + hold() + + select { + case <-doneA: + case <-time.After(time.Second): + t.Fatal("A never returned") + } + select { + case <-doneB: + case <-time.After(time.Second): + t.Fatal("B never granted — lost wake-up bug") + } +} diff --git a/internal/pool/manager.go b/internal/pool/manager.go index 5137fc97..8a0205b7 100644 --- a/internal/pool/manager.go +++ b/internal/pool/manager.go @@ -58,6 +58,25 @@ type Manager interface { // SetProviderIDs sets a mapping between pool names and configuration IDs. SetProviderIDs(mapping map[string]string) + + // AcquireImportSlot blocks until an admission slot is available for an + // NZB import to start, or ctx is cancelled. The returned release function + // must be called exactly once when the import has finished (success or + // failure). When admission caps are unconfigured (both 0) it is a no-op. + AcquireImportSlot(ctx context.Context) (release func(), err error) + + // SetAdmissionCaps configures the two import-concurrency caps: + // capIdle applies when no stream is active; capWhileStreaming applies + // while any stream is active. A cap of 0 means unlimited. + SetAdmissionCaps(capIdle, capWhileStreaming int) + + // SetStreamSource wires the activity signal so admission can adapt to + // whether any stream is currently active. + SetStreamSource(src StreamActivitySource) + + // NotifyStreamChange must be called by the stream source whenever its + // active stream count changes, so the admission gate can re-evaluate. 
+ NotifyStreamChange() } // StatsRepository defines the interface for persisting pool statistics @@ -84,14 +103,16 @@ type manager struct { ctx context.Context logger *slog.Logger quotaWatchCancel context.CancelFunc + admission *ImportAdmission } // NewManager creates a new pool manager func NewManager(ctx context.Context, repo StatsRepository) Manager { return &manager{ - ctx: ctx, - repo: repo, - logger: slog.Default().With("component", "pool"), + ctx: ctx, + repo: repo, + logger: slog.Default().With("component", "pool"), + admission: NewImportAdmission(), } } @@ -427,6 +448,31 @@ func (m *manager) ResetProviderQuota(ctx context.Context, poolName string) error return m.resetProviderQuotaLocked(ctx, poolName) } +// AcquireImportSlot blocks until an import admission slot is available or ctx +// is cancelled. See ImportAdmission.Acquire. +func (m *manager) AcquireImportSlot(ctx context.Context) (func(), error) { + return m.admission.Acquire(ctx) +} + +// SetAdmissionCaps configures the import-concurrency caps. capIdle applies +// when no stream is active; capWhileStreaming applies while any stream is +// active. A cap of 0 means unlimited. +func (m *manager) SetAdmissionCaps(capIdle, capWhileStreaming int) { + m.admission.SetCaps(capIdle, capWhileStreaming) +} + +// SetStreamSource wires the source used to determine whether streams are +// currently active. +func (m *manager) SetStreamSource(src StreamActivitySource) { + m.admission.SetStreamSource(src) +} + +// NotifyStreamChange forwards a stream-count change to the admission gate so +// it can wake or hold waiters according to the new effective cap. +func (m *manager) NotifyStreamChange() { + m.admission.NotifyStreamChange() +} + // SetProviderIDs sets a mapping between pool names and configuration IDs func (m *manager) SetProviderIDs(mapping map[string]string) { m.mu.Lock()