feat: connection handling improvements #589
|
With this approach you don't get everything your connections can handle, since the queue is controlled inside the nntppool library by round-robin connection handling. Do you have an issue while streaming? Did you try reducing the number of connections used for import? |
|
Thanks, that's fair pushback, and yes, reducing the per-import connection count is the obvious first lever. It was the first thing I tried, but the issue here isn’t steady-state throughput, it’s the burst profile.

The concrete repro is what I called out in S4/S7 in the PR: multiple NZB parses landing concurrently, each parse independently fanning out fetchAllFirstSegments via cp.Body / cp.BodyAsync with effectively no shared accounting between them. So you end up with N parses × C import connections all arriving at nntppool’s dispatcher at roughly the same time. Reducing C helps average-case behaviour, but it doesn’t bound the burst. And if we reduce it far enough to make the burst safe under contention, the import path ends up permanently underutilised whenever there’s no active streaming workload.

The lane split (I7) is the part that solves the playback starvation issue. Imports are constrained to roughly 40% while streams are active, but can elastically consume effectively all available capacity when the stream lane is idle. The global cap (I5) mainly exists to give those lanes a shared denominator. You’re absolutely right that, in isolation, it’s weaker than nntppool’s weighted RR scheduler. I’m happy to drop the reserve behaviour entirely so that:

streamCap + importCap == Σ(conn × inflight)

That addresses the “capacity left on the table” concern without losing the lane isolation behaviour that’s preventing stream starvation. Streaming by itself was never really the issue. The failures only surfaced when imports and streams collided concurrently, alongside the disconnect / Close() behaviour noted in S9, which is orthogonal to the actual connection budgeting work.

We could also remove I5 and see whether S2 still passes, which is why I created the tests and harness up front. That would let nntppool handle the global ceiling with its round-robin approach. I can test that in a little while.

The whole issue is the conntrack table plus the number of goroutines left in a wait state, which together cause starvation. Without the above, any version of altmount I run in k8s with a gluetun WireGuard sidecar becomes massively unstable when streaming and importing happen at the same time, essentially crashing the kernel-based WireGuard interface. |
|
In fact, we could drop the global semaphore (I5) and replace the fixed-partition lanes with a stream-protected reservation: globalSem(N) + importSem(N − streamFloor). Streams acquire only globalSem. Imports acquire both. Stream floor = ceil(N × 0.6). Does that sound better? I think that should still allow the storm scenarios to pass. |
|
Sounds good, let's do that and see how it performs. |
|
Pushed an update. I’ve changed the lane model so it’s now a stream-protected reservation rather than a hard fixed partition. That should directly address the utilisation concern.

Concretely: globalSem(N) is still the absolute ceiling. Imports are now gated by: importSem(N - streamFloor) where: streamFloor = ceil(N * 0.6). Streams only acquire the global semaphore. Imports acquire

So the net effect is:
When imports are idle, streams can elastically use the full
When imports saturate, they can only consume up to
Imports are slowed down under stream load, but they do not stop completely, because every stream completion releases a global slot back into the shared pool.

On the

That’s the actual failure mode I’m trying to protect against in the gluetun/k8s setup: the WireGuard interface falls over because we exhaust conntrack before RR has anything useful to balance. So the semaphore and RR are not substitutes for each other; they’re operating at different layers.

Same reasoning on reducing

Lowering the per-job count helps within a single parse, but it still doesn’t bound the cross-job burst, which is the S7 case. The two controls stack rather than replace each other, and I’ve documented that in

I’ve also added a new invariant, TestStorm_StreamingElasticityWhenImportsIdle asserts that

The original S2/S4/S7 tests are still pinning the important behaviours. S4 has just been reworked to assert the reservation model rather than the old partition model. I’ve also added pool-internal tests for:

The deadlock stress is currently running 32 workers for 500ms, around 33k ops, with no deadlock. Finally, I added a design rationale section at the top of |
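For readers following the thread, the reservation model described in this comment can be sketched roughly as below. This is a minimal illustration and not the PR's code: the `Budget` type and its method names are hypothetical, it uses non-blocking try-acquire calls so the behaviour is easy to check (the thread describes blocking semaphores), and it computes ceil(N × 0.6) with integer arithmetic.

```go
package main

import "fmt"

// Budget is a hypothetical sketch of the stream-protected reservation:
// streams take one token from global; imports take one token from
// importLane and then one from global.
type Budget struct {
	global     chan struct{} // capacity N: absolute ceiling
	importLane chan struct{} // capacity N - streamFloor: import ceiling
}

func NewBudget(n int) *Budget {
	streamFloor := (n*6 + 9) / 10 // ceil(n * 0.6) without floating point
	return &Budget{
		global:     make(chan struct{}, n),
		importLane: make(chan struct{}, n-streamFloor),
	}
}

// TryAcquireStream takes a global slot only and reports false when the
// ceiling is reached.
func (b *Budget) TryAcquireStream() bool {
	select {
	case b.global <- struct{}{}:
		return true
	default:
		return false
	}
}

func (b *Budget) ReleaseStream() { <-b.global }

// TryAcquireImport takes the import-lane slot first, then a global slot,
// and rolls the lane slot back if the global ceiling is already reached.
func (b *Budget) TryAcquireImport() bool {
	select {
	case b.importLane <- struct{}{}:
	default:
		return false
	}
	select {
	case b.global <- struct{}{}:
		return true
	default:
		<-b.importLane
		return false
	}
}

func (b *Budget) ReleaseImport() {
	<-b.global
	<-b.importLane
}

func main() {
	b := NewBudget(10) // streamFloor = 6, import ceiling = 4
	imports := 0
	for b.TryAcquireImport() {
		imports++
	}
	streams := 0
	for b.TryAcquireStream() {
		streams++
	}
	fmt.Println(imports, streams) // prints "4 6"
}
```

With N = 10, saturating imports stop at 4 slots and streams can still take the remaining 6; with no imports running, streams can take all 10.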
|
I still think this solution has a problem: if you are starved of connections, you can reach the point where imports time out while waiting for an import slot. Wouldn't it be better to acquire the slot before grabbing the import job from the database and release it once the import is done? That way you are not playing with the internal timeouts of each part. |
|
Yeah, I think you’re right that it’s a real failure mode. The test I added (TestStorm_ImporterParallelNzbFanOut) proves the bound (MaxInFlight <= importCap), but it doesn’t prove wait time per segment. With sustained stream pressure, an individual cp.Body inside an import could sit waiting on AcquireImportSlot long enough to hit its own 30s context timeout. The semaphore itself doesn’t time out, but the caller’s context does, so segment-level gating does leak timeout pressure into the import path. That isn’t really pinned by I7b / S7. The trade-off I picked was per-wire-call rather than per-job, mostly for utilisation:
So they’re not quite equivalent. Per-call gives better throughput, but per-job gives a much harder ceiling on how many import jobs can reach the dispatcher. There’s probably a middle option too: keep per-wire-call gating, but make AcquireImportSlot non-blocking-with-fallback, so callers back off rather than queueing goroutines on the semaphore. Honest answer though: for the gluetun/k8s case that started this, your model is probably the better fit. The goal here is conntrack/goroutine protection more than squeezing every bit of import throughput out of the lane. I can rework it so the slot is acquired in processNextItem in the queue manager, held across the whole processItem, and released on completion. That would also simplify the parser-side code nicely, since it drops the three AcquireImportSlot call sites in parser.go, and the invariant becomes “one slot per active import job” rather than “one slot per wire call |
Force-pushed from 6da86c2 to 9692218.
|
It now acquires the slot the way you suggested. It has been running for 24 hours on my cluster without a conntrack table storm :) |
|
Hmm, what do you think about something that only adjusts the import side while streaming, instead of also enforcing a 40%/60% split? That way we always prioritise streaming, no matter what. Something like this: https://github.com/javi11/altmount/pull/595/changes |
|
Hmm, yeah, OK. The 60/40 split is janky, agreed. The one thing I want to make sure we don’t lose in the swap: #589 has a chunk of work that has nothing to do with streams-vs-imports budgeting, and I don’t want it to evaporate when the admission controller lands. Specifically:

- S1/S3: bounded retry + jitter (Attempts(5) → Attempts(2), [50,150]ms jitter so concurrent retriers desync instead of thundering-herd against a recovering provider).
- S6: bounded closer-worker pool, so a 50-seek scrub burst produces a goroutine delta of ~4 instead of ~50.
- S9: the Close() interrupt. This is the big one for me: today a client disconnect mid-read pins the goroutine and its connection slot for the full segment latency, because Close waits on mvf.mu that the in-flight Read is holding. With Plex/Jellyfin scrubbing that multiplies across every disconnect cycle, and it’s a real contributor to the conntrack/goroutine pileup that started all this.

None of those are about admission control; they’re connection-lifecycle bugs that stand on their own.

So how do you want to slice this? My suggestion: I strip #589 right down to just those fixes + the storm harness (fakepool / segments / goroutines) and STREAMING_INVARIANTS.md, pull the admission/lane stuff out of it entirely, and we land #595 as the budgeting solution. That way each PR does one thing and the diffs stay reviewable.

One offer on #595 while I’m here: the fakepool + goroutines harness might be handy for race-testing the ImportAdmission grant-forwarding path under -race, since lost-wakeup bugs are exactly the kind of thing it’s built to catch. Happy to wire up a storm test against it if useful. |
Reworks the streaming and metadata-virtual-file paths to bound retry, goroutine, and connection-slot fan-out under realistic Plex/Jellyfin scrub patterns and provider-recovery storms. Adds a deterministic fakepool test seam so each invariant is pinned by a regression test that asserts MaxInFlight / goroutine / call-count bounds directly.

Cross-process budgeting between streams and imports (admission control) is intentionally NOT in this PR; that work belongs in the separate admission-controller PR. This change touches only the connection-lifecycle layer.

S1 + S3 (usenet/usenet_reader.go)
retry.Attempts(5) -> retry.Attempts(2) so a permanently failing segment produces at most 2 wire calls instead of 5. Replace the 20ms FixedDelay with retry.CombineDelay(BackOffDelay, RandomDelay) + retry.MaxJitter(100ms) so concurrent retriers desync into [50, 150]ms instead of thundering-herd against a recovering provider. ErrArticleNotFound still never retries; DeadlineExceeded still retries immediately on a fresh round-robin connection.

S6 (nzbfilesystem/metadata_remote_file.go)
Replace closeWg.Go's unbounded goroutine fan-out in closeCurrentReader with a per-file bounded closer pool (closerWorkerCount = 4). A 50-seek scrub burst produces a goroutine delta of ~4 instead of ~50, each holding any in-flight pool slot it had open. When the channel fills, the next Seek runs Close inline as backpressure.

S5 + EOF straddle (nzbfilesystem/metadata_remote_file.go)
Add randomReadCache: a per-file LRU (size 8) of full-segment bytes for the ephemeral ReadAt path. 200 small reads within an 8-segment hot window now produce 8 wire calls instead of 200. Clamp the slice bound to min(end-off+1, len(p)) so reads that straddle EOF in a partially-filled final segment cannot panic with "slice bounds out of range".
S8 (api/speedtest_coordinator.go + provider_speedtest_handler.go)
handleTestProviderSpeed previously called nntppool.NewClient inline per HTTP request, opening Provider.MaxConnections fresh slots per invocation. Route through pool.Manager.GetPool when the provider is active; otherwise through a process-wide speedtestCoordinator with singleflight dedupe by providerID and a 5-minute per-provider client cache. A janitor goroutine ticks every clientTTL/5 and evicts expired entries; shutdown is idempotent.

S9 (nzbfilesystem/metadata_remote_file.go + usenet/usenet_reader.go)
When a streaming HTTP client disconnects mid-read, the request goroutine calls mvf.Close synchronously; a concurrent mvf.Read holds mvf.mu for the full segment-download latency (15s x retries). Add UsenetReader.Interrupt (non-blocking ctx-cancel + cond broadcast + segment closer) and an atomic interruptHandle on MetadataVirtualFile so Close fires the interrupt BEFORE contending for mvf.mu. Close now returns in microseconds regardless of segment latency. Also move streamTracker.Remove + streamID write inside mvf.mu's critical section to satisfy the race detector.

Test infrastructure
Narrow pool.NntpClient interface so tests can substitute a deterministic fake without standing up real NNTP connections. internal/testsupport/fakepool exposes per-message-ID latency / error injection and an in-flight high-water counter (MaxInFlight) used as the primary observability primitive for the invariants. Companion helpers: testsupport/segments (deterministic payload / message-ID generation) and testsupport/goroutines (snapshot + AssertReturnedToBaseline for closer-accumulation scenarios). internal/usenet/STREAMING_INVARIANTS.md is the contract document listing every pinned invariant (I1-I4, I6, I9-I12) with the corresponding test.
Pinned by the new storm tests:
- TestStorm_RetryAmplifiesPerMessageCallCount (S1)
- TestStorm_RetryUsesFixedDelayInsteadOfExponentialBackoff (S3)
- TestStorm_RandomReadAtCreatesEphemeralReaderPerCall (S5)
- TestRandomReadCache_EOFReadDoesNotPanic
- TestStorm_SeekSpamAccumulatesCloserGoroutines (S6)
- TestStorm_SpeedTestBypassesPoolManager (S8)
- TestStorm_ClientDisconnectHoldsPoolSlotForUpTo30s (S9)
- TestStorm_ConcurrentDisconnectsPinManyGoroutines (S9)

go test -race -count=1 clean across the touched packages.
Force-pushed from b768e8c to 32625f6.
|
I've stripped out all the admission/lane work. It now deals exclusively with connection-lifecycle and observability: S1/S3 — bounded retry with jitter (Attempts(2), [50, 150]ms jitter) so a permanently failing segment caps at 2 wire calls and concurrent retriers desync against a recovering provider instead of thundering-herd. Cross-process budgeting between streams and imports is intentionally not in this PR — that's #595's job. These two PRs are now genuinely orthogonal and reviewable in isolation. Happy to wire the storm harness against ImportAdmission's grant-forwarding path under -race once #595 is closer to landing — the lost-wakeup class of bug is exactly what the harness was built to catch. |
|
I have both this and #595 combined in my dev branch - will let it run a while |
|
Let me know the result. Can you also remove the README in usenet? |
|
Yep, that was there just to document the conntrack issues. I'll get rid of it. It has been solid all day combined with #595. |
// endpoint can't blow up a connection budget by request-rate alone.
// Initialized in NewServer; tests that construct Server directly
// trigger lazy init via speedtestOnce.
// See STREAMING_INVARIANTS.md I11.
|
Will trim. That comment block is overkill for a field decl — the lazy-init / TTL detail belongs in the coordinator file itself, not on the Server struct. Cutting it to a one-liner pointing at I11.
// by the ephemeral ReadAt path to coalesce random-access scrubbing.
// Lazily initialized on first ephemeral ReadAt. Held under mvf.mu.
// See STREAMING_INVARIANTS.md I9.
randomReadCache *lru.Cache[int, []byte]
|
But we already have the segment cache. Why do we need this?
|
They're not duplicates: they operate at different layers, and only one of them is always on.
segcache is opt-in (config-gated), persistent, disk-backed, and cross-file. It's plumbed through UsenetReader via the SegmentStore interface — Get before the wire call, Put after. So it's reachable from the normal streaming path that goes through a long-lived UsenetReader.
The random-read LRU sits above UsenetReader in MetadataVirtualFile.ReadAtContext's ephemeral path. That path doesn't use the long-lived reader — createReaderAtOffset builds a fresh UsenetReader per ephemeral call. Two consequences:
- Two ReadAts at different byte offsets within the same segment each build a separate UsenetReader, each downloading the same ~750KB segment. SegmentStore would dedupe at the message-ID level if it were enabled, but most users don't turn it on (requires cache_path config + disk space + opt-in flag).
- Even when segcache is enabled, the LRU avoids the disk round-trip + JSON catalog lookup + sha256 keying for the hot scrubbing window. It's a pure in-memory map keyed by segment index.
So they stack rather than overlap:
- segcache = long-lived, cross-file, on-disk, opt-in, granularity = message-ID
- random-read LRU = short-lived (per-file), in-memory, always-on, granularity = segment index within this file's ephemeral path
Pulling the LRU out would re-introduce S5 for everyone running without segcache configured, which is the default.
// (storm S5). Only viable for plain (unencrypted, non-nested)
// segments — encrypted streams don't map cleanly to segment
// boundaries.
if n, served := mvf.tryServeFromRandomReadCache(readCtx, p, off, end); served {
|
I don't understand this random-reader approach. What will it improve?
|
Plex/Jellyfin scrubs aren't one seek — they're a burst of small probe reads (~20–50 × 4–64KB) refreshing the keyframe map, and almost all land inside the same handful of ~750KB segments.
Without coalescing, the ephemeral path builds a fresh UsenetReader per probe — each one downloading the full segment to serve a ~16KB window out of it. 200 probes across an 8-segment hot window = 200 wire calls, 200 import slots pinned, 200 conntrack entries. With the LRU it's 8 wire calls (one per unique segment) and the rest serve from RAM.
TestStorm_RandomReadAtCreatesEphemeralReaderPerCall pins that ratio directly. S5 was the biggest single contributor to conntrack pressure under active playback before this landed.
Cache is intentionally tiny (8 segments/file) and bypassed for encrypted / nested-source files since their segment boundaries don't map to plaintext byte ranges.
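To make the 200-probes-to-8-wire-calls claim concrete, here is a small self-contained sketch of a per-file segment LRU in front of a fetch function. It is an illustration only: the real code uses an lru.Cache keyed by segment index, whereas `segmentLRU`, `reader`, and `fetch` below are hypothetical names built on container/list, and the EOF handling relies on copy stopping at the shorter slice rather than on the exact min(end-off+1, len(p)) clamp.

```go
package main

import (
	"container/list"
	"fmt"
)

type entry struct {
	index int
	data  []byte
}

// segmentLRU is a tiny LRU of full-segment bytes keyed by segment index.
type segmentLRU struct {
	capacity int
	order    *list.List            // front = most recently used
	items    map[int]*list.Element // segment index -> element in order
}

func newSegmentLRU(capacity int) *segmentLRU {
	return &segmentLRU{capacity: capacity, order: list.New(), items: make(map[int]*list.Element)}
}

func (c *segmentLRU) get(index int) ([]byte, bool) {
	el, ok := c.items[index]
	if !ok {
		return nil, false
	}
	c.order.MoveToFront(el)
	return el.Value.(*entry).data, true
}

func (c *segmentLRU) put(index int, data []byte) {
	if el, ok := c.items[index]; ok {
		el.Value.(*entry).data = data
		c.order.MoveToFront(el)
		return
	}
	c.items[index] = c.order.PushFront(&entry{index: index, data: data})
	if c.order.Len() > c.capacity {
		oldest := c.order.Back()
		c.order.Remove(oldest)
		delete(c.items, oldest.Value.(*entry).index)
	}
}

// reader serves ReadAt-style requests from fixed-size segments, fetching a
// segment at most once while it stays in the cache.
type reader struct {
	segSize   int64
	cache     *segmentLRU
	fetch     func(index int) []byte // stands in for the wire call
	wireCalls int
}

func (r *reader) readAt(p []byte, off int64) int {
	index := int(off / r.segSize)
	seg, ok := r.cache.get(index)
	if !ok {
		seg = r.fetch(index)
		r.wireCalls++
		r.cache.put(index, seg)
	}
	start := off % r.segSize
	if start >= int64(len(seg)) {
		return 0 // offset is past the readable bytes of a short final segment
	}
	// copy stops at min(len(p), bytes left in the segment), so a read that
	// straddles EOF cannot slice out of range.
	return copy(p, seg[start:])
}

func main() {
	r := &reader{segSize: 1024, cache: newSegmentLRU(8), fetch: func(int) []byte { return make([]byte, 1024) }}
	buf := make([]byte, 16)
	for i := 0; i < 200; i++ { // 200 small probes over an 8-segment window
		r.readAt(buf, int64(i%8)*1024+int64(i))
	}
	fmt.Println("wire calls:", r.wireCalls) // prints 8
}
```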
|
I added 4 more comments regarding the random cache. |
|
I just cocked up and deleted the branch, which has closed the PR (it happened when I rebased on main to get past the new commits). It's been a long day. I'll open a new PR if that's OK? |
Connection-lifecycle storms + storm test harness
This branch fixes a class of connection-management bugs in the segment-download pipeline that surface as: retry storms against a recovering provider, slow Close on client disconnect, goroutine accumulation during Plex/Jellyfin scrubbing, wasteful re-fetches on random ReadAt, and connection-budget exhaustion via the speed-test endpoint.

The investigation reproduced each one end-to-end with deterministic tests, fixed each at the production code level, and inverted each test to pin the post-fix invariant so the class can't regress. The resulting contract is documented in internal/usenet/STREAMING_INVARIANTS.md.

What was wrong
- retry.Attempts(5) with a fixed 20ms delay
- ReadAts within the same segment; UsenetReader per call
- closeCurrentReader did closeWg.Go per close (unbounded fan-out)
- handleTestProviderSpeed called nntppool.NewClient inline per request: no cache, no dedupe, no rate-limit
- Close for the full segment latency × every disconnect; mvf.Close waited for mvf.mu, which the in-flight Read was holding

What landed
Each item below is named, pinned by a test, and described in detail in STREAMING_INVARIANTS.md. The full contract there is the authoritative reference; this list is the executive summary.

I6: Bounded retry with jitter (Attempts(2) + MaxJitter(100ms))
Permanently failing segments now produce at most 2 wire calls (was 5). Retry delays carry [50, 150]ms jitter so concurrent readers desynchronize instead of thundering-herd against a recovering provider. ErrArticleNotFound is never retried (I3); DeadlineExceeded still retries immediately on a fresh round-robin connection.

I9: Per-file LRU coalesces random ReadAt
MetadataVirtualFile.ReadAtContext's ephemeral path uses an 8-segment LRU of full segment bytes. 200 random ReadAts within an 8-segment hot window now produce 8 wire calls instead of 200. Cache is bypassed for encrypted and nested-source files because their plaintext doesn't align with segment boundaries. Worst-case memory: ~6MB per open file (8 × 768KB default segment size).

Also fixes an EOF-straddle panic in the cache slice path. When a FUSE read crosses end-of-file in a partially-filled final segment, the read window is clamped to min(end−off+1, len(p)) so the slice can't run past the segment's readable size.

I10: Bounded closer-worker pool
closeCurrentReader hands the detached reader to a per-file bounded worker pool (closerWorkerCount=4). If the channel fills, the next Seek runs Close inline as backpressure rather than growing the goroutine count. A 50-seek burst now produces a goroutine delta of 4 instead of ~50.

I11: Speed-test through pool.Manager + singleton coordinator
handleTestProviderSpeed now:
- pool.Manager.GetPool: if the provider is already in the running pool, the test runs against that client (no extra connections).
- speedtestCoordinator: a singleflight.Group keyed by providerID dedupes concurrent requests, and a per-provider nntppool.Client cache (5-minute TTL) reuses the same client across requests within the window.

A janitor goroutine ticks every clientTTL/5 and evicts expired entries (so an idle pod doesn't retain N *nntppool.Client instances for the rest of its lifetime); shutdown is idempotent. A monitoring script polling the endpoint can no longer exhaust the provider's connection budget.

I12: MetadataVirtualFile.Close returns fast on client disconnect
New interruptHandle atomic tracks the latest reader. Close fires ctx-cancel on the UsenetReader before contending for mvf.mu, so the blocked BodyPriority returns context.Canceled, the concurrent Read releases mvf.mu, and Close completes in microseconds regardless of segment latency. Required a new UsenetReader.Interrupt (non-blocking, idempotent cancel + cond broadcast + segment closer).

Also fixes a streamID race in Close that the race detector flagged during S9 development: streamTracker.Remove and the streamID = "" write are now inside the mvf.mu critical section.

This is the biggest win in the PR for the gluetun/k8s case that motivated the investigation: without the S9 fix, every Plex/Jellyfin scrub-disconnect pinned a goroutine and its connection slot for the full segment latency, and the conntrack-table growth that produced eventually destabilised the kernel WireGuard interface.
Test infrastructure introduced
To make the storms reproducible deterministically, the branch ships three reusable test packages:
- internal/testsupport/fakepool: in-process fake of the nntppool.Client interface (new pool/nntpclient.go defines the narrow seam). Per-message-ID latency/error injection, in-flight counters (MaxInFlight, PerMessageCalls, BodyPriorityCalls), and a Gate primitive for pinning concurrency at a known point.
- internal/testsupport/segments: deterministic payload/message-ID generator. FileBytes(n, size) is the reassembly oracle used by sequential-read tests.
- internal/testsupport/goroutines: Snapshot + AssertReturnedToBaseline for catching closer-accumulation and leak scenarios.

These are not internal helpers smuggled into _test.go files; they're proper packages so storm tests across internal/usenet, internal/nzbfilesystem, internal/importer/parser, and internal/api all share the same harness and counters.

The only production-side compatibility note is that pool.Manager.GetPool now returns the narrow pool.NntpClient interface instead of *nntppool.Client. Both are satisfied by the production type; the seam is only there to let tests substitute a fake.

How the storm methodology worked
Each fix landed via the same three-step pattern, designed so future regressions surface as test failures rather than user reports:
- goroutine delta ≈ 50 for a 50-seek burst). Include a comment block naming CURRENT BEHAVIOR, TARGET INVARIANT, and the marker "If this fires LOW/HIGH, the fix has landed — invert this assertion."
- goroutine delta ≤ closerWorkerCount). The entry moves from "Storms reproduced today" to "Currently pinned" in STREAMING_INVARIANTS.md.

Every storm covered by this PR is now in the "Currently pinned" section. The "Storms reproduced today" staging area is empty.
Test plan
- go build ./... clean
- go vet ./... clean
- go test -count=1 clean across internal/pool, internal/usenet, internal/nzbfilesystem, internal/importer/..., internal/api, internal/health
- go test -race -count=1 clean for internal/usenet, internal/nzbfilesystem, internal/pool