feat(pool): elastic import admission to prioritize streams#595

Open
javi11 wants to merge 1 commit into main from session/zealous-bartik-e2c7e9

@javi11 javi11 commented May 20, 2026

Summary

Under high load, VFS/FUSE streams could starve when many ARR-driven NZB imports ran concurrently. nntppool v4 already has priority dispatch (streams use BodyPriority), but priority only kicks in once a connection frees — when every connection is mid-body for a long-running import, even priority requests wait.

This PR adds an elastic pool-level admission controller with two adaptive caps so imports yield to streams under load.

What changed

  • internal/pool/admission.go (new) — ImportAdmission, an adaptive FIFO counting semaphore with a StreamActivitySource interface, ctx-cancel-safe Acquire with grant forwarding (no lost wake-ups), and a fast-path no-op when both caps are 0.
  • internal/pool/manager.go: Manager interface gains AcquireImportSlot, SetAdmissionCaps, SetStreamSource, NotifyStreamChange.
  • internal/importer/processor.go: ProcessNzbFile acquires an admission slot before any pool work and releases on return. This is the single chokepoint for queue worker, scanner watcher and background-reprocess paths. The gate is at the import-entry boundary, so the existing 30 s body timeouts inside the parser are never consumed by queue waits.
  • internal/api/stream_tracker.go — atomic activeCount, ActiveStreams() int, SetChangeNotifier. AddStream/Remove increment/decrement and notify the pool.
  • internal/api/server.go — wires streamTracker ↔ poolManager in NewServer.
  • internal/config/manager.go + accessors.go — new ImportConfig.MaxConcurrentImports and MaxConcurrentImportsWhileStreaming plus accessors. Both default to 0 (unlimited → byte-identical to today).
  • internal/importer/service.go — pushes caps to the pool at startup and inside the existing OnConfigChange handler so live config reloads take effect.
  • internal/pool/admission_test.go (new) — race-detector-clean tests covering disabled fast-path, cap blocking, FIFO order, stream-aware shrink, growing caps waking waiters, ctx-cancel cleanup, and the grant-forwarding race.
  • Mock updates in internal/health/repair_e2e_test.go and internal/nzbfilesystem/metadata_remote_file_test.go for the expanded interface.

Why the gate is at NZB-entry, not per-Body

Body fetches carry a 30 s timeout (parser.go:537, parser.go:737). A gate held there would consume the timeout while waiting for admission — under contention an import could time out before reaching the network and trigger spurious retries. The gate sits before the import starts work; the timer doesn't start running while an import is queued at the gate.

Once admitted, every cp.Body / cp.BodyAsync runs untouched — same parallelism (MaxImportConnections), same 30 s budget, same retry behaviour.

Behaviour example

Provider exposes ~30 connections. Recommended config:

import:
  max_concurrent_imports: 6                   # cap when no streams active
  max_concurrent_imports_while_streaming: 1   # cap while any stream is active
  • No streams active → up to 6 NZBs running × ~5 segment fetches ≈ saturates pool, good import throughput.
  • Stream opens → no new NZB starts; in-flight ones drain over a few seconds and free connections; BodyPriority picks them up immediately.
  • Stream closes → NotifyStreamChange wakes up to 5 queued imports.

No bytes wasted, no retries, no preemption.
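
The arithmetic behind this example can be checked with a small sketch. The helper names effectiveCap and admissible are hypothetical and do not appear in the PR; the sketch assumes that a cap of 0 means unlimited and that shrinking the cap never preempts running imports.

```go
package main

import "fmt"

// effectiveCap returns the cap that applies for the current stream count.
func effectiveCap(capIdle, capStreaming, activeStreams int) int {
	if activeStreams > 0 {
		return capStreaming
	}
	return capIdle
}

// admissible reports how many queued imports may start right now.
func admissible(limit, running, queued int) int {
	if limit == 0 {
		return queued // unlimited
	}
	free := limit - running
	if free < 0 {
		free = 0 // cap shrank below in-flight work: nothing is preempted
	}
	if free > queued {
		free = queued
	}
	return free
}

func main() {
	// A stream opens while 6 imports run: no new import starts.
	fmt.Println(admissible(effectiveCap(6, 1, 1), 6, 10)) // 0
	// In-flight imports drain to 1, then the stream closes: 5 may start.
	fmt.Println(admissible(effectiveCap(6, 1, 0), 1, 10)) // 5
}
```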

Rollout

Defaults are 0/0 → controller disabled → behaviour byte-identical to current. Operators opt in via the two new YAML keys.

Not in this PR

  • Bounding the currently-unbounded BodyAsync call in parser.go:783 (per-NZB internal concurrency). Orthogonal to streams-vs-imports priority; logged as a follow-up.
  • Frontend config UI for the new knobs.

Test plan

  • go build ./... clean.
  • go vet ./... clean.
  • go test -race -count=1 clean for internal/pool, internal/config, internal/api, internal/health, internal/nzbfilesystem, internal/importer/....
  • Manual smoke under real load: trigger 20+ NZB imports, start a stream, observe TTFB stays low and queued imports remain pending until stream closes.
  • Verify regression: with both caps at 0, behaviour matches main.

Under high load, VFS/FUSE streams could starve when many ARR-driven NZB
imports ran concurrently. nntppool v4 already has priority dispatch
(streams use BodyPriority), but priority only kicks in once a connection
frees — when every connection is mid-body for a long-running import,
even priority requests wait.

Add a pool-level admission controller with two adaptive caps:
- max_concurrent_imports (capIdle) — when no stream is active
- max_concurrent_imports_while_streaming — when any stream is active

The gate sits at Processor.ProcessNzbFile entry (single chokepoint for
queue worker, scanner watcher, background reprocess) — before any pool
work, so the 30s body timeouts can never be eaten by queue waits.

StreamTracker keeps an atomic active-stream count and notifies the pool
on Add/Remove; the pool reads the count to pick the effective cap and
wakes queued waiters when streams end.

Both caps default to 0 (unlimited) — byte-identical behaviour until
configured.
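
As a rough illustration of the stream-count side described in this commit message, the following sketch keeps an atomic counter and calls a notifier on every change. The tracker type and its method bodies are assumptions for illustration and are not the code in internal/api/stream_tracker.go; the notifier is assumed to be set once before any stream starts.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// tracker is a hypothetical, stripped-down stand-in for StreamTracker.
type tracker struct {
	active atomic.Int64
	notify func() // set once before streams start; not safe to swap concurrently
}

// SetChangeNotifier registers the callback run after each count change.
func (t *tracker) SetChangeNotifier(fn func()) { t.notify = fn }

// ActiveStreams returns the current number of active streams.
func (t *tracker) ActiveStreams() int { return int(t.active.Load()) }

// Add records a new stream and notifies the listener.
func (t *tracker) Add() {
	t.active.Add(1)
	t.changed()
}

// Remove records a closed stream and notifies the listener.
func (t *tracker) Remove() {
	t.active.Add(-1)
	t.changed()
}

func (t *tracker) changed() {
	if t.notify != nil {
		t.notify()
	}
}

func main() {
	tr := &tracker{}
	tr.SetChangeNotifier(func() { fmt.Println("active streams:", tr.ActiveStreams()) })
	tr.Add()    // prints "active streams: 1"
	tr.Remove() // prints "active streams: 0"
}
```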
@iPromKnight
Contributor

I prefer this approach, yeah!
I was hesitant to make it a user-facing configurable - but it's right that it should be.

@javi11
Owner Author

javi11 commented May 20, 2026

Nice, let me finish the review and do a test, and we are good to go. Thanks for the contribution.
