Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions internal/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,14 @@ func NewServer(
updater: updater.Default(),
}

// Wire stream-activity ↔ pool admission. Streams notify the pool when they
// start/stop; the pool reads the active stream count to pick its
// adaptive import cap.
if poolManager != nil && streamTracker != nil {
streamTracker.SetChangeNotifier(poolManager)
poolManager.SetStreamSource(streamTracker)
}

return server
}

Expand Down
38 changes: 38 additions & 0 deletions internal/api/stream_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,13 @@ import (
// Default timeout for stale streams (4 hours - covers most movie lengths)
const defaultStreamTimeout = 4 * time.Hour

// StreamChangeNotifier is notified whenever the active stream count changes.
// Implemented by pool.Manager; declared here to avoid an api -> pool import
// dependency for the StreamTracker itself.
type StreamChangeNotifier interface {
NotifyStreamChange()
}

// StreamTracker tracks active streams
type StreamTracker struct {
streams sync.Map
Expand All @@ -24,6 +31,15 @@ type StreamTracker struct {
mu sync.Mutex // For history protection
timeout time.Duration
metricsTracker usenet.MetricsTracker

// activeCount is the exact number of entries currently in the streams map.
// Maintained as an int64 counter so ActiveStreams() is O(1) and safe to
// call from hot paths (e.g. the pool admission gate).
activeCount atomic.Int64

// notifier, when set, is notified after every stream add/remove so the
// import-admission cap can react to streams starting/stopping.
notifier StreamChangeNotifier
}

type streamSample struct {
Expand Down Expand Up @@ -253,9 +269,29 @@ func (t *StreamTracker) AddStream(filePath, source, userName, clientIP, userAgen
samples: make([]streamSample, 0, 30), // Preallocate for 1 minute of samples (every 2s)
}
t.streams.Store(id, internal)
t.activeCount.Add(1)
t.notifyChange()
return stream
}

// SetChangeNotifier wires a notifier (typically a pool.Manager) that will be
// signalled whenever the active stream count changes. Pass nil to clear.
func (t *StreamTracker) SetChangeNotifier(n StreamChangeNotifier) {
t.notifier = n
}

// ActiveStreams returns the current number of tracked streams.
// Implements pool.StreamActivitySource (structurally).
func (t *StreamTracker) ActiveStreams() int {
return int(t.activeCount.Load())
}

func (t *StreamTracker) notifyChange() {
if t.notifier != nil {
t.notifier.NotifyStreamChange()
}
}

// Add adds a new stream and returns its ID (implements nzbfilesystem.StreamTracker)
func (t *StreamTracker) Add(filePath, source, userName, clientIP, userAgent string, totalSize int64) string {
return t.AddStream(filePath, source, userName, clientIP, userAgent, totalSize).ID
Expand Down Expand Up @@ -344,6 +380,8 @@ func (t *StreamTracker) Remove(id string) {
t.mu.Unlock()

t.streams.Delete(id)
t.activeCount.Add(-1)
t.notifyChange()
}
}

Expand Down
19 changes: 19 additions & 0 deletions internal/config/accessors.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,25 @@ func (c *Config) GetMaxImportConnections() int {
return c.Import.MaxImportConnections
}

// GetMaxConcurrentImports returns the global cap on concurrent NZB imports
// when no stream is active. 0 means unlimited (current default behaviour).
func (c *Config) GetMaxConcurrentImports() int {
if c.Import.MaxConcurrentImports < 0 {
return 0
}
return c.Import.MaxConcurrentImports
}

// GetMaxConcurrentImportsWhileStreaming returns the cap on concurrent NZB
// imports while at least one stream is active. 0 means unlimited (current
// default behaviour).
func (c *Config) GetMaxConcurrentImportsWhileStreaming() int {
if c.Import.MaxConcurrentImportsWhileStreaming < 0 {
return 0
}
return c.Import.MaxConcurrentImportsWhileStreaming
}

// GetMaxDownloadPrefetch returns max download prefetch with a default fallback.
func (c *Config) GetMaxDownloadPrefetch() int {
if c.Import.MaxDownloadPrefetch <= 0 {
Expand Down
7 changes: 7 additions & 0 deletions internal/config/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,13 @@ type ImportConfig struct {
QueueProcessingIntervalSeconds int `yaml:"queue_processing_interval_seconds" mapstructure:"queue_processing_interval_seconds" json:"queue_processing_interval_seconds"`
AllowedFileExtensions []string `yaml:"allowed_file_extensions" mapstructure:"allowed_file_extensions" json:"allowed_file_extensions"`
MaxImportConnections int `yaml:"max_import_connections" mapstructure:"max_import_connections" json:"max_import_connections"`
// MaxConcurrentImports caps the number of NZB imports that may run
// end-to-end at the same time when no stream is active. 0 = unlimited.
MaxConcurrentImports int `yaml:"max_concurrent_imports" mapstructure:"max_concurrent_imports" json:"max_concurrent_imports"`
// MaxConcurrentImportsWhileStreaming caps concurrent imports while at
// least one stream is active, so streams are not starved by imports.
// 0 = unlimited.
MaxConcurrentImportsWhileStreaming int `yaml:"max_concurrent_imports_while_streaming" mapstructure:"max_concurrent_imports_while_streaming" json:"max_concurrent_imports_while_streaming"`
MaxDownloadPrefetch int `yaml:"max_download_prefetch" mapstructure:"max_download_prefetch" json:"max_download_prefetch"`
SegmentSamplePercentage int `yaml:"segment_sample_percentage" mapstructure:"segment_sample_percentage" json:"segment_sample_percentage"`
ReadTimeoutSeconds int `yaml:"read_timeout_seconds" mapstructure:"read_timeout_seconds" json:"read_timeout_seconds"`
Expand Down
13 changes: 10 additions & 3 deletions internal/health/repair_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ func (m *mockPoolManager) HasPool() bool { return fal
func (m *mockPoolManager) GetMetrics() (pool.MetricsSnapshot, error) {
return pool.MetricsSnapshot{}, nil
}
func (m *mockPoolManager) ResetMetrics(_ context.Context, _, _ bool) error { return nil }
func (m *mockPoolManager) ResetProviderErrors(_ context.Context) error { return nil }
func (m *mockPoolManager) IncArticlesDownloaded() {}
func (m *mockPoolManager) ResetMetrics(_ context.Context, _, _ bool) error { return nil }
func (m *mockPoolManager) ResetProviderErrors(_ context.Context) error { return nil }
func (m *mockPoolManager) IncArticlesDownloaded() {}
func (m *mockPoolManager) UpdateDownloadProgress(_ string, _ int64) {}
func (m *mockPoolManager) IncArticlesPosted() {}
func (m *mockPoolManager) AddProvider(_ nntppool.Provider) error { return nil }
Expand All @@ -44,6 +44,13 @@ func (m *mockPoolManager) ResetProviderQuota(_ context.Context, _ string) error
return nil
}
func (m *mockPoolManager) SetProviderIDs(_ map[string]string) {}
func (m *mockPoolManager) AcquireImportSlot(_ context.Context) (func(), error) {
return func() {}, nil
}
func (m *mockPoolManager) SetAdmissionCaps(_ int, _ int) {}
func (m *mockPoolManager) SetStreamSource(_ pool.StreamActivitySource) {}
func (m *mockPoolManager) NotifyStreamChange() {}

// mockARRsService captures TriggerFileRescan calls and returns a configurable error.
type mockARRsService struct {
mu sync.Mutex
Expand Down
12 changes: 11 additions & 1 deletion internal/importer/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ func (proc *Processor) getCleanNzbName(nzbPath string, queueID int) string {
return baseName
}


func (proc *Processor) SetRecorder(recorder HistoryRecorder) {
proc.recorder = recorder
}
Expand Down Expand Up @@ -170,6 +169,17 @@ func (proc *Processor) checkCancellation(ctx context.Context) error {
// metadata files written to disk; it is populated even on partial failure so callers can clean up.
// Paths prefixed with "DIR:" indicate a metadata directory that should be removed entirely.
func (proc *Processor) ProcessNzbFile(ctx context.Context, filePath, relativePath string, queueID int, allowedExtensionsOverride *[]string, virtualDirOverride *string, extractedFiles []parser.ExtractedFileInfo, category *string, metadata *string, downloadID *string) (string, []string, error) {
// Gate this import behind the pool admission controller so we can cap how
// many NZB imports run concurrently end-to-end and yield to streams under
// load. The Acquire is a no-op when no caps are configured.
if proc.poolManager != nil {
release, err := proc.poolManager.AcquireImportSlot(ctx)
if err != nil {
return "", nil, fmt.Errorf("import admission cancelled: %w", err)
}
defer release()
}

cfg := proc.configGetter()

// Determine max connections to use
Expand Down
24 changes: 24 additions & 0 deletions internal/importer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ type Service struct {
healthRepo *database.HealthRepository // Health repository for updating health status
broadcaster *progress.ProgressBroadcaster // WebSocket progress broadcaster
userRepo *database.UserRepository // User repository for API key lookup
poolManager pool.Manager // Pool manager — used to push admission caps on config change
log *slog.Logger

// Runtime state
Expand Down Expand Up @@ -241,13 +242,25 @@ func NewService(config ServiceConfig, metadataService *metadata.MetadataService,
sabnzbdClient: sabnzbd.NewSABnzbdClient(),
broadcaster: broadcaster,
userRepo: userRepo,
poolManager: poolManager,
log: slog.Default().With("component", "importer-service"),
ctx: ctx,
cancel: cancel,
cancelFuncs: make(map[int64]context.CancelFunc),
paused: false,
}

// Push initial admission caps to the pool so imports are gated from the
// start. Zero values keep the controller disabled, matching prior behaviour.
if poolManager != nil && configGetter != nil {
if cfg := configGetter(); cfg != nil {
poolManager.SetAdmissionCaps(
cfg.GetMaxConcurrentImports(),
cfg.GetMaxConcurrentImportsWhileStreaming(),
)
}
}

// Set recorder for processor
processor.SetRecorder(service)

Expand Down Expand Up @@ -404,6 +417,17 @@ func (s *Service) RegisterConfigChangeHandler(configManager any) {
"old_workers", oldWorkers, "new_workers", newWorkers)
}
}

// Push updated import-admission caps to the pool. Zero values keep
// the admission gate disabled (unlimited).
if s.poolManager != nil {
capIdle := newConfig.GetMaxConcurrentImports()
capWhileStreaming := newConfig.GetMaxConcurrentImportsWhileStreaming()
s.poolManager.SetAdmissionCaps(capIdle, capWhileStreaming)
s.log.InfoContext(s.ctx, "Import admission caps updated",
"max_concurrent_imports", capIdle,
"max_concurrent_imports_while_streaming", capWhileStreaming)
}
})
}

Expand Down
9 changes: 9 additions & 0 deletions internal/nzbfilesystem/metadata_remote_file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,15 @@ func (m *mockPoolManager) ResetProviderQuota(_ context.Context, _ string) error

func (m *mockPoolManager) SetProviderIDs(_ map[string]string) {}

func (m *mockPoolManager) AcquireImportSlot(_ context.Context) (func(), error) {
return func() {}, nil
}

func (m *mockPoolManager) SetAdmissionCaps(_ int, _ int) {}

func (m *mockPoolManager) SetStreamSource(_ pool.StreamActivitySource) {}

func (m *mockPoolManager) NotifyStreamChange() {}

// TestSeekResetsOriginalRangeEnd tests that Seek properly resets originalRangeEnd
// This is critical for video playback - without this fix, seeking causes stale range
Expand Down
Loading
Loading