diff --git a/internal/api/provider_speedtest_handler.go b/internal/api/provider_speedtest_handler.go index 7fe7b6ae..89b3d8a4 100644 --- a/internal/api/provider_speedtest_handler.go +++ b/internal/api/provider_speedtest_handler.go @@ -2,8 +2,6 @@ package api import ( "context" - "crypto/tls" - "fmt" "log/slog" "time" @@ -53,45 +51,17 @@ func (s *Server) handleTestProviderSpeed(c *fiber.Ctx) error { return RespondNotFound(c, "Provider", "") } - host := fmt.Sprintf("%s:%d", targetProvider.Host, targetProvider.Port) - var tlsCfg *tls.Config - if targetProvider.TLS { - tlsCfg = &tls.Config{ - InsecureSkipVerify: targetProvider.InsecureTLS, - ServerName: targetProvider.Host, - } - } - - pool, err := nntppool.NewClient(c.Context(), []nntppool.Provider{ - { - Host: host, - TLSConfig: tlsCfg, - Auth: nntppool.Auth{Username: targetProvider.Username, Password: targetProvider.Password}, - Connections: targetProvider.MaxConnections, - IdleTimeout: 60 * time.Second, - }, - }) - if err != nil { - return RespondInternalError(c, "Failed to create connection pool", err.Error()) - } - defer pool.Close() - - // Resolve provider name matching nntppool's resolveProviderName logic - providerName := host - if targetProvider.Username != "" { - providerName = host + "+" + targetProvider.Username - } - testCtx, cancel := context.WithTimeout(c.Context(), 5*time.Minute) defer cancel() - result, err := pool.SpeedTest(testCtx, nntppool.SpeedTestOptions{ - ProviderName: providerName, - }) + // Prefer the production pool when this provider is already part of + // it — otherwise fall back to the singleton coordinator. Either way + // we route through pool.Manager rather than creating a fresh + // nntppool.Client per request. See STREAMING_INVARIANTS.md I11. + result, err := s.runProviderSpeedTest(testCtx, targetProvider) if err != nil { return RespondInternalError(c, "Speed test failed", err.Error()) } - speed := result.WireSpeedBps / 1024 / 1024 // bytes/sec → MB/s // Update provider config with speed test result @@ -126,3 +96,50 @@ func (s *Server) handleTestProviderSpeed(c *fiber.Ctx) error { Duration: result.Elapsed.Seconds(), }) } + +// runProviderSpeedTest executes the speed test against the given +// provider, preferring the production pool when the provider is part +// of it. Falls back to the per-request speedtest coordinator (cached +// nntppool client + singleflight dedupe) when the provider isn't in +// the active pool. See STREAMING_INVARIANTS.md I11. +func (s *Server) runProviderSpeedTest(ctx context.Context, p *config.ProviderConfig) (*nntppool.SpeedTestResult, error) { + // Always consult the pool manager so a provider already in the + // running pool reuses its connections rather than dialing fresh. + // pool.Manager is required wiring; in tests it may return nil/err. + if s.poolManager != nil { + if cp, err := s.poolManager.GetPool(); err == nil && cp != nil { + if real, ok := cp.(*nntppool.Client); ok { + providerName := p.Host + if p.Username != "" { + providerName = p.Host + "+" + p.Username + } + // Try the production pool first. If the provider isn't + // in it, nntppool returns an error and we fall through + // to the coordinator. + if result, sterr := real.SpeedTest(ctx, nntppool.SpeedTestOptions{ + ProviderName: providerName, + }); sterr == nil { + return result, nil + } + } + } + } + + // Fall back to the dedicated speed-test client (cached + dedupe). + // Lazy-construct the coordinator if Server was assembled outside + // NewServer (e.g. in tests). speedtestOnce makes this safe under + // concurrent calls. + s.speedtestOnce.Do(func() { + if s.speedtest == nil { + s.speedtest = newSpeedtestCoordinator() + } + }) + v, err := s.speedtest.run(ctx, p, func(client *nntppool.Client, providerName string) (any, error) { + return client.SpeedTest(ctx, nntppool.SpeedTestOptions{ProviderName: providerName}) + }) + if err != nil { + return nil, err + } + result, _ := v.(*nntppool.SpeedTestResult) + return result, nil +} diff --git a/internal/api/provider_speedtest_storm_test.go b/internal/api/provider_speedtest_storm_test.go new file mode 100644 index 00000000..3d86b80e --- /dev/null +++ b/internal/api/provider_speedtest_storm_test.go @@ -0,0 +1,130 @@ +package api + +import ( + "context" + "net/http/httptest" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/gofiber/fiber/v2" + "github.com/javi11/altmount/internal/config" + "github.com/javi11/altmount/internal/pool" + "github.com/javi11/nntppool/v4" +) + +// provider_speedtest_storm_test.go reproduces the speed-test handler +// storm: every invocation calls nntppool.NewClient directly, bypassing +// pool.Manager entirely. The storm shape is structural — the handler +// doesn't even reach through the seam tests can intercept. +// +// See ../usenet/STREAMING_INVARIANTS.md. + +// countingPoolManager is a pool.Manager that records every call to its +// connection-acquiring methods. The S8 test asserts these counters +// remain zero despite N HTTP requests, proving the handler bypasses +// pool.Manager. +type countingPoolManager struct { + getPoolCalls atomic.Int64 + hasPoolCalls atomic.Int64 +} + +var _ pool.Manager = (*countingPoolManager)(nil) + +func (m *countingPoolManager) GetPool() (pool.NntpClient, error) { + m.getPoolCalls.Add(1) + return nil, nil +} +func (m *countingPoolManager) HasPool() bool { m.hasPoolCalls.Add(1); return false } +func (m *countingPoolManager) SetProviders(_ []nntppool.Provider) error { return nil } +func (m *countingPoolManager) ClearPool() error { return nil } +func (m *countingPoolManager) GetMetrics() (pool.MetricsSnapshot, error) { return pool.MetricsSnapshot{}, nil } +func (m *countingPoolManager) ResetMetrics(_ context.Context, _, _ bool) error { + return nil +} +func (m *countingPoolManager) ResetProviderErrors(_ context.Context) error { return nil } +func (m *countingPoolManager) IncArticlesDownloaded() {} +func (m *countingPoolManager) UpdateDownloadProgress(_ string, _ int64) {} +func (m *countingPoolManager) IncArticlesPosted() {} +func (m *countingPoolManager) AddProvider(_ nntppool.Provider) error { return nil } +func (m *countingPoolManager) RemoveProvider(_ string) error { return nil } +func (m *countingPoolManager) ResetProviderQuota(_ context.Context, _ string) error { + return nil +} +func (m *countingPoolManager) SetProviderIDs(_ map[string]string) {} + +// TestStorm_SpeedTestBypassesPoolManager pins the post-S8 invariant: +// the /providers/:id/speedtest handler routes through pool.Manager +// (and the singleton speedtestCoordinator for non-pool providers) +// rather than creating a fresh nntppool.Client per request. The test +// asserts the structural property: poolManager.GetPool MUST be called +// at least once per request (the prerequisite for reusing the +// production pool), so the application has a chance to dedupe and +// gate concurrent traffic. +// +// Driving the handler against an unreachable host (127.0.0.1 port 1) +// makes the underlying speed test fail fast; the structural assertion +// runs regardless of the outcome. +func TestStorm_SpeedTestBypassesPoolManager(t *testing.T) { + t.Parallel() + const concurrentRequests = 5 + + enabled := true + cfg := config.DefaultConfig() + cfg.Providers = []config.ProviderConfig{ + { + ID: "storm-test-provider", + Host: "127.0.0.1", + Port: 1, // closed port: nntppool's initial ping fails immediately + Username: "user", + Password: "pass", + MaxConnections: 1, // keep resource use minimal + Enabled: &enabled, + }, + } + + cm := &mockConfigManager{cfg: cfg} + cpm := &countingPoolManager{} + server := &Server{configManager: cm, poolManager: cpm} + + app := fiber.New() + app.Post("/api/config/providers/:id/speedtest", server.handleTestProviderSpeed) + + // Fire N concurrent requests. Each will fail (closed port) but the + // structural bypass is what we're measuring, not the outcome. + var wg sync.WaitGroup + start := time.Now() + for i := 0; i < concurrentRequests; i++ { + wg.Add(1) + go func() { + defer wg.Done() + req := httptest.NewRequest("POST", "/api/config/providers/storm-test-provider/speedtest", nil) + // Tight timeout — we don't care about the response, only the + // fact that the handler took the bypass path. + resp, err := app.Test(req, 3000) + if err == nil && resp.Body != nil { + _ = resp.Body.Close() + } + }() + } + wg.Wait() + elapsed := time.Since(start) + + getPoolCalls := cpm.getPoolCalls.Load() + t.Logf("%d concurrent speedtest requests completed in %s; "+ + "poolManager.GetPool calls=%d (invariant: > 0 per request)", + concurrentRequests, elapsed, getPoolCalls) + + // PINNED INVARIANT: handler MUST consult pool.Manager (so it can + // reuse the production pool for already-running providers and the + // singleton speedtestCoordinator for everything else). Without + // this, every request creates a fresh nntppool.Client and dials + // independently. + if getPoolCalls < 1 { + t.Errorf("INVARIANT regression (S8): poolManager.GetPool calls=%d, want >= 1 "+ + "after %d concurrent speedtest requests. handleTestProviderSpeed must "+ + "route through s.poolManager rather than calling nntppool.NewClient inline.", + getPoolCalls, concurrentRequests) + } +} diff --git a/internal/api/server.go b/internal/api/server.go index 817de3aa..d1f4c060 100644 --- a/internal/api/server.go +++ b/internal/api/server.go @@ -5,6 +5,7 @@ import ( "os" "runtime" "strings" + "sync" "sync/atomic" "time" @@ -67,6 +68,15 @@ type Server struct { migrationRepo *database.ImportMigrationRepository updater updater.Updater ready atomic.Bool + + // speedtest dedupes concurrent /providers/:id/speedtest requests + // and caches per-provider nntppool clients for a short TTL so the + // endpoint can't blow up a connection budget by request-rate alone. + // Initialized in NewServer; tests that construct Server directly + // trigger lazy init via speedtestOnce. + // See STREAMING_INVARIANTS.md I11. + speedtest *speedtestCoordinator + speedtestOnce sync.Once } // NewServer creates a new API server that can optionally register routes on the provided mux (for backwards compatibility) @@ -110,6 +120,7 @@ func NewServer( progressBroadcaster: progressBroadcaster, streamTracker: streamTracker, cacheSource: cacheSource, + speedtest: newSpeedtestCoordinator(), fuseManager: NewFuseManager(newMountFactory(nzbFilesystem, configManager, streamTracker)), updater: updater.Default(), } @@ -389,6 +400,9 @@ func (s *Server) Shutdown(ctx context.Context) { if s.fuseManager != nil { s.fuseManager.Stop() } + if s.speedtest != nil { + s.speedtest.shutdown() + } } // handleGetActiveStreams handles GET /api/files/active-streams diff --git a/internal/api/speedtest_coordinator.go b/internal/api/speedtest_coordinator.go new file mode 100644 index 00000000..c33122f5 --- /dev/null +++ b/internal/api/speedtest_coordinator.go @@ -0,0 +1,221 @@ +package api + +import ( + "context" + "crypto/tls" + "fmt" + "sync" + "time" + + "github.com/javi11/altmount/internal/config" + "github.com/javi11/nntppool/v4" + "golang.org/x/sync/singleflight" +) + +// speedtestCoordinator bounds the structural footprint of the +// /providers/:id/speedtest endpoint. +// +// Without coordination, every HTTP request spawned a fresh +// nntppool.NewClient with up to Provider.MaxConnections dial attempts. +// A user (or monitoring script) hitting the endpoint repeatedly opened +// N independent TCP/TLS connection sets in parallel. +// +// With this coordinator: +// - A singleflight.Group dedupes concurrent requests for the same +// provider — only one in-flight speed test per providerID. The +// other callers share its result. +// - Each per-provider nntppool.Client is held in a short-lived cache +// (TTL clientTTL). Subsequent requests within the TTL reuse the +// same client rather than opening a fresh connection set. +// - Speed tests issued against a provider already in the running +// pool prefer that pool (call sites pass a function returning the +// active client) so production traffic and speed tests share the +// same connections. +// +// Safe for concurrent use. +// +// See STREAMING_INVARIANTS.md (S8). +type speedtestCoordinator struct { + sf singleflight.Group + mu sync.Mutex + clients map[string]*cachedSpeedtestClient // keyed by providerID + + // stopCh signals the janitor goroutine to exit. Closed exactly once + // by shutdown via stopOnce. The field itself is immutable after + // construction so janitorLoop can read it without locking. + stopCh chan struct{} + // stopOnce gates shutdown so multiple calls are safe and so the + // janitor channel is closed exactly once. + stopOnce sync.Once + // wg tracks the janitor goroutine so shutdown can wait for it. + wg sync.WaitGroup +} + +type cachedSpeedtestClient struct { + client *nntppool.Client + expiresAt time.Time + host string // tracked for logging +} + +// clientTTL bounds how long a per-provider speed-test client stays +// cached. Short enough to absorb a burst of requests; long enough that +// repeated speed tests in a monitoring loop don't dial each time. +const clientTTL = 5 * time.Minute + +// janitorInterval controls how often expired clients are reaped. Set to +// clientTTL/5 so a freshly-expired entry is collected within ~1 minute +// of expiry instead of lingering until the next request for the same +// provider — without the sweep, an idle pod retains a full +// nntppool.Client (with its connection set and goroutines) per provider +// ever speed-tested in the process lifetime. +const janitorInterval = clientTTL / 5 + +func newSpeedtestCoordinator() *speedtestCoordinator { + sc := &speedtestCoordinator{ + clients: make(map[string]*cachedSpeedtestClient), + stopCh: make(chan struct{}), + } + sc.wg.Add(1) + go sc.janitorLoop() + return sc +} + +// janitorLoop periodically evicts expired cache entries. Exits when +// stopCh is closed. Holds the mutex only while iterating the map; each +// Close() happens after delete to keep the critical section bounded. +func (sc *speedtestCoordinator) janitorLoop() { + defer sc.wg.Done() + ticker := time.NewTicker(janitorInterval) + defer ticker.Stop() + for { + select { + case <-sc.stopCh: + return + case <-ticker.C: + sc.sweepExpired() + } + } +} + +// sweepExpired removes every entry whose TTL has elapsed and closes the +// underlying client. Splitting the Close calls out of the locked section +// keeps the mutex hold time bounded by map size rather than by however +// long nntppool.Client.Close takes (which can issue QUIT and wait for +// connection teardown). +func (sc *speedtestCoordinator) sweepExpired() { + now := time.Now() + var expired []*nntppool.Client + sc.mu.Lock() + for id, entry := range sc.clients { + if now.After(entry.expiresAt) { + expired = append(expired, entry.client) + delete(sc.clients, id) + } + } + sc.mu.Unlock() + for _, c := range expired { + if c != nil { + c.Close() + } + } +} + +// getOrBuildClient returns a cached client for the provider or builds +// a new one. The caller MUST NOT Close the returned client — the +// coordinator owns its lifetime. Returns the client and the +// nntppool-side provider name (used by SpeedTest). +func (sc *speedtestCoordinator) getOrBuildClient(ctx context.Context, p *config.ProviderConfig) (*nntppool.Client, string, error) { + host := fmt.Sprintf("%s:%d", p.Host, p.Port) + providerName := host + if p.Username != "" { + providerName = host + "+" + p.Username + } + + sc.mu.Lock() + if entry, ok := sc.clients[p.ID]; ok && time.Now().Before(entry.expiresAt) { + sc.mu.Unlock() + return entry.client, providerName, nil + } + // Drop stale entry before building a new one. + if entry, ok := sc.clients[p.ID]; ok { + entry.client.Close() + delete(sc.clients, p.ID) + } + sc.mu.Unlock() + + var tlsCfg *tls.Config + if p.TLS { + tlsCfg = &tls.Config{ + InsecureSkipVerify: p.InsecureTLS, + ServerName: p.Host, + } + } + + client, err := nntppool.NewClient(ctx, []nntppool.Provider{ + { + Host: host, + TLSConfig: tlsCfg, + Auth: nntppool.Auth{Username: p.Username, Password: p.Password}, + Connections: p.MaxConnections, + IdleTimeout: 60 * time.Second, + }, + }) + if err != nil { + return nil, "", err + } + + sc.mu.Lock() + // Another goroutine may have raced ahead and built a client too; + // keep whichever was inserted first and close the loser. + if existing, ok := sc.clients[p.ID]; ok && time.Now().Before(existing.expiresAt) { + sc.mu.Unlock() + client.Close() + return existing.client, providerName, nil + } + sc.clients[p.ID] = &cachedSpeedtestClient{ + client: client, + expiresAt: time.Now().Add(clientTTL), + host: host, + } + sc.mu.Unlock() + + return client, providerName, nil +} + +// run executes fn under the singleflight key for the given provider, +// so concurrent callers share a single speed-test result. fn receives +// the cached/built client and the nntppool provider name. +func (sc *speedtestCoordinator) run( + ctx context.Context, + p *config.ProviderConfig, + fn func(client *nntppool.Client, providerName string) (any, error), +) (any, error) { + v, err, _ := sc.sf.Do(p.ID, func() (any, error) { + client, providerName, err := sc.getOrBuildClient(ctx, p) + if err != nil { + return nil, err + } + return fn(client, providerName) + }) + return v, err +} + +// shutdown stops the janitor and closes all cached clients. Safe to +// call multiple times — stopOnce makes the janitor-stop idempotent; +// the final drain loop is naturally a no-op on an empty map. +func (sc *speedtestCoordinator) shutdown() { + sc.stopOnce.Do(func() { + close(sc.stopCh) + }) + sc.wg.Wait() + + sc.mu.Lock() + clients := sc.clients + sc.clients = make(map[string]*cachedSpeedtestClient) + sc.mu.Unlock() + for _, entry := range clients { + if entry.client != nil { + entry.client.Close() + } + } +} diff --git a/internal/api/speedtest_coordinator_test.go b/internal/api/speedtest_coordinator_test.go new file mode 100644 index 00000000..d842fa19 --- /dev/null +++ b/internal/api/speedtest_coordinator_test.go @@ -0,0 +1,177 @@ +package api + +import ( + "sync" + "testing" + "time" +) + +// speedtest_coordinator_test.go pins the janitor behaviour added to +// prevent the cache from retaining nntppool.Client instances after +// every speed-tested provider goes idle. Without the janitor, an idle +// pod would hold one *nntppool.Client per ever-speed-tested providerID +// for the lifetime of the process — the cache only shrinks when the +// same providerID is requested again. +// +// We don't dial real NNTP here. Test entries install nil clients; +// shutdown's nil-guard makes that safe. + +func installExpiredEntry(sc *speedtestCoordinator, id string) { + sc.mu.Lock() + sc.clients[id] = &cachedSpeedtestClient{ + client: nil, + expiresAt: time.Now().Add(-1 * time.Hour), + host: "test:" + id, + } + sc.mu.Unlock() +} + +func installFreshEntry(sc *speedtestCoordinator, id string) { + sc.mu.Lock() + sc.clients[id] = &cachedSpeedtestClient{ + client: nil, + expiresAt: time.Now().Add(1 * time.Hour), + host: "test:" + id, + } + sc.mu.Unlock() +} + +func cacheSize(sc *speedtestCoordinator) int { + sc.mu.Lock() + defer sc.mu.Unlock() + return len(sc.clients) +} + +// newTestCoordinator returns a coordinator with a stopped janitor so +// tests can drive sweepExpired manually. The wg is empty (no goroutine +// started) so shutdown returns immediately. +func newTestCoordinator() *speedtestCoordinator { + return &speedtestCoordinator{ + clients: make(map[string]*cachedSpeedtestClient), + stopCh: make(chan struct{}), + } +} + +// TestSpeedtestCoordinator_SweepEvictsExpiredEntries pins the headline +// behaviour: entries past expiresAt MUST be removed by sweepExpired +// without requiring a subsequent request for the same providerID. This +// is the leak fix — without the janitor calling sweepExpired on a +// ticker, the cache would only shrink on next access, leaving every +// ever-speed-tested provider's *nntppool.Client resident. +func TestSpeedtestCoordinator_SweepEvictsExpiredEntries(t *testing.T) { + sc := newTestCoordinator() + + installExpiredEntry(sc, "p1") + installExpiredEntry(sc, "p2") + installFreshEntry(sc, "p3") + + if got := cacheSize(sc); got != 3 { + t.Fatalf("setup: cache size = %d, want 3", got) + } + + sc.sweepExpired() + + if got := cacheSize(sc); got != 1 { + t.Errorf("after sweep: cache size = %d, want 1 (only p3 should survive)", got) + } + sc.mu.Lock() + _, hasP3 := sc.clients["p3"] + sc.mu.Unlock() + if !hasP3 { + t.Errorf("after sweep: p3 (fresh entry) was incorrectly evicted") + } +} + +// TestSpeedtestCoordinator_NewCoordinatorStartsAndStopsJanitor verifies +// the janitor goroutine is started by newSpeedtestCoordinator and that +// shutdown stops it. Without this pairing, every speedtest endpoint hit +// would either bypass eviction (if janitor never started) or leak the +// janitor goroutine itself (if shutdown didn't stop it). +func TestSpeedtestCoordinator_NewCoordinatorStartsAndStopsJanitor(t *testing.T) { + sc := newSpeedtestCoordinator() + + done := make(chan struct{}) + go func() { + sc.shutdown() + close(done) + }() + + select { + case <-done: + // Shutdown completed — janitor exited cleanly. + case <-time.After(2 * time.Second): + t.Fatalf("shutdown did not complete within 2s — janitor stuck or wg not paired correctly") + } + + // A second shutdown must be a safe no-op (stopOnce guards close). + sc.shutdown() +} + +// TestSpeedtestCoordinator_ShutdownDrainsAllEntries verifies shutdown +// drains the map regardless of expiresAt. The map must end up empty so +// the *speedtestCoordinator itself becomes GC-eligible. +func TestSpeedtestCoordinator_ShutdownDrainsAllEntries(t *testing.T) { + sc := newSpeedtestCoordinator() + installFreshEntry(sc, "p1") + installFreshEntry(sc, "p2") + + sc.shutdown() + + if got := cacheSize(sc); got != 0 { + t.Errorf("after shutdown: cache size = %d, want 0 (must drain all entries regardless of TTL)", got) + } +} + +// TestSpeedtestCoordinator_ConcurrentSweepAndAccess is a race-detector +// stress: while sweepExpired runs concurrently, callers install and +// look up entries. The mutex on sc.clients must serialise both paths. +func TestSpeedtestCoordinator_ConcurrentSweepAndAccess(t *testing.T) { + sc := newTestCoordinator() + + stop := make(chan struct{}) + var wg sync.WaitGroup + + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case <-stop: + return + default: + sc.sweepExpired() + } + } + }() + + for i := 0; i < 4; i++ { + wg.Add(1) + id := i + go func() { + defer wg.Done() + for j := 0; j < 500; j++ { + select { + case <-stop: + return + default: + } + if j%2 == 0 { + installExpiredEntry(sc, idStr(id)) + } else { + installFreshEntry(sc, idStr(id)) + } + _ = cacheSize(sc) + } + }() + } + + time.Sleep(200 * time.Millisecond) + close(stop) + wg.Wait() + // No assertion — passing -race is the assertion. +} + +// idStr is a tiny stdlib-free int formatter for single-digit IDs. +func idStr(i int) string { + return string(rune('0' + i)) +} diff --git a/internal/health/repair_e2e_test.go b/internal/health/repair_e2e_test.go index eed05344..40b9a4a9 100644 --- a/internal/health/repair_e2e_test.go +++ b/internal/health/repair_e2e_test.go @@ -24,7 +24,7 @@ import ( // mockPoolManager implements pool.Manager and always fails GetPool so segment validation fails. type mockPoolManager struct{} -func (m *mockPoolManager) GetPool() (*nntppool.Client, error) { +func (m *mockPoolManager) GetPool() (pool.NntpClient, error) { return nil, errors.New("no pool available (test mock)") } func (m *mockPoolManager) SetProviders(_ []nntppool.Provider) error { return nil } @@ -44,6 +44,7 @@ func (m *mockPoolManager) ResetProviderQuota(_ context.Context, _ string) error return nil } func (m *mockPoolManager) SetProviderIDs(_ map[string]string) {} + // mockARRsService captures TriggerFileRescan calls and returns a configurable error. type mockARRsService struct { mu sync.Mutex diff --git a/internal/importer/parser/parser_storm_test.go b/internal/importer/parser/parser_storm_test.go new file mode 100644 index 00000000..40cd2b73 --- /dev/null +++ b/internal/importer/parser/parser_storm_test.go @@ -0,0 +1,175 @@ +package parser + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/javi11/altmount/internal/config" + "github.com/javi11/altmount/internal/pool" + "github.com/javi11/altmount/internal/testsupport/fakepool" + "github.com/javi11/altmount/internal/testsupport/segments" + "github.com/javi11/nntppool/v4" + "github.com/javi11/nzbparser" +) + +// parser_storm_test.go pins the per-import invariants on the parser side. +// See ../../usenet/STREAMING_INVARIANTS.md for the contract. +// +// The parser is responsible only for bounding fan-out WITHIN a single +// import job via MaxImportConnections. Cross-job budgeting is out of +// scope for the parser. + +// fakeFullPoolManager satisfies the full pool.Manager surface so the +// parser can call GetPool, HasPool, and the metric inc helpers without +// nil-deref. +type fakeFullPoolManager struct { + client pool.NntpClient +} + +func newFakeFullPoolManager(client pool.NntpClient) *fakeFullPoolManager { + return &fakeFullPoolManager{client: client} +} + +var _ pool.Manager = (*fakeFullPoolManager)(nil) + +func (m *fakeFullPoolManager) GetPool() (pool.NntpClient, error) { return m.client, nil } +func (m *fakeFullPoolManager) SetProviders(_ []nntppool.Provider) error { return nil } +func (m *fakeFullPoolManager) ClearPool() error { return nil } +func (m *fakeFullPoolManager) HasPool() bool { return true } +func (m *fakeFullPoolManager) GetMetrics() (pool.MetricsSnapshot, error) { + return pool.MetricsSnapshot{}, nil +} +func (m *fakeFullPoolManager) ResetMetrics(_ context.Context, _, _ bool) error { return nil } +func (m *fakeFullPoolManager) ResetProviderErrors(_ context.Context) error { return nil } +func (m *fakeFullPoolManager) IncArticlesDownloaded() {} +func (m *fakeFullPoolManager) UpdateDownloadProgress(_ string, _ int64) {} +func (m *fakeFullPoolManager) IncArticlesPosted() {} +func (m *fakeFullPoolManager) AddProvider(_ nntppool.Provider) error { return nil } +func (m *fakeFullPoolManager) RemoveProvider(_ string) error { return nil } +func (m *fakeFullPoolManager) ResetProviderQuota(_ context.Context, _ string) error { + return nil +} +func (m *fakeFullPoolManager) SetProviderIDs(_ map[string]string) {} + +// stormConfigGetter returns a ConfigGetter whose MaxImportConnections is +// exactly N. This is the per-import-job cap on wire-call burstiness. +func stormConfigGetter(maxImportConnections int) config.ConfigGetter { + cfg := config.DefaultConfig() + cfg.Import.MaxImportConnections = maxImportConnections + return func() *config.Config { return cfg } +} + +// buildSyntheticNzbFiles returns numFiles nzbparser.NzbFile entries each +// pointing at a single fakepool-known message-ID. The parser's +// fetchAllFirstSegments path will issue one Body call per file — these +// are the calls we count. +func buildSyntheticNzbFiles(numFiles int) []nzbparser.NzbFile { + files := make([]nzbparser.NzbFile, numFiles) + for i := range files { + files[i] = nzbparser.NzbFile{ + Filename: segments.MessageID(i) + ".bin", + Segments: nzbparser.NzbSegments{ + {Bytes: 1024, Number: 1, ID: segments.MessageID(i)}, + }, + } + } + return files +} + +// TestStorm_ImporterFanOutRespectsMaxImportConnections pins the per-job +// invariant: the parser's first-segment fan-out for a SINGLE import MUST +// stay bounded by MaxImportConnections. Cross-job bounding is out of +// scope for the parser. +func TestStorm_ImporterFanOutRespectsMaxImportConnections(t *testing.T) { + t.Parallel() + const ( + filesPerImport = 20 + maxImportConnections = 6 + bodyLatency = 60 * time.Millisecond + ) + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + fp := fakepool.New() + for i := 0; i < filesPerImport; i++ { + fp.SetBehavior(segments.MessageID(i), fakepool.SegmentBehavior{ + Latency: bodyLatency, + Bytes: make([]byte, 64), + }) + } + + mgr := newFakeFullPoolManager(fp) + parser := NewParser(mgr, stormConfigGetter(maxImportConnections)) + + files := buildSyntheticNzbFiles(filesPerImport) + _, _, _ = parser.fetchAllFirstSegments(ctx, files, nil) + + mif := fp.MaxInFlight() + t.Logf("single import × %d files (MaxImportConnections=%d) "+ + "produced MaxInFlight=%d Body calls (invariant: MaxInFlight <= MaxImportConnections)", + filesPerImport, maxImportConnections, mif) + + if mif > int32(maxImportConnections) { + t.Errorf("INVARIANT regression: MaxInFlight=%d, want <= %d (MaxImportConnections). "+ + "fetchAllFirstSegments must size its concPool by MaxImportConnections — "+ + "if this fails, a single import is fanning out past its configured cap "+ + "and will saturate the slot semaphore on its own.", + mif, maxImportConnections) + } +} + +// TestStorm_ImporterParallelImportsAreNotInternallyGated asserts that the +// parser itself does not bound cross-import fan-out. N concurrent imports +// each fan out up to MaxImportConnections, so MaxInFlight scales as +// N × MaxImportConnections. Cross-job admission control, if introduced, +// belongs at a higher layer — not inside the parser. +func TestStorm_ImporterParallelImportsAreNotInternallyGated(t *testing.T) { + t.Parallel() + const ( + concurrentImports = 4 + filesPerImport = 5 + maxImportConnections = 4 + bodyLatency = 60 * time.Millisecond + ) + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + fp := fakepool.New() + for i := 0; i < filesPerImport; i++ { + fp.SetBehavior(segments.MessageID(i), fakepool.SegmentBehavior{ + Latency: bodyLatency, + Bytes: make([]byte, 64), + }) + } + + mgr := newFakeFullPoolManager(fp) + parser := NewParser(mgr, stormConfigGetter(maxImportConnections)) + + var wg sync.WaitGroup + for i := 0; i < concurrentImports; i++ { + wg.Add(1) + go func() { + defer wg.Done() + files := buildSyntheticNzbFiles(filesPerImport) + _, _, _ = parser.fetchAllFirstSegments(ctx, files, nil) + }() + } + wg.Wait() + + mif := fp.MaxInFlight() + t.Logf("%d concurrent imports × %d files (MaxImportConnections=%d) "+ + "produced MaxInFlight=%d (parser does NOT internally cap cross-import fan-out)", + concurrentImports, filesPerImport, maxImportConnections, mif) + + // The parser must NOT silently re-introduce an internal slot. If MaxInFlight + // stays <= maxImportConnections under N concurrent imports, the parser is + // gating internally — regression. + minExpected := int32(maxImportConnections + 1) + if mif < minExpected { + t.Errorf("regression: MaxInFlight=%d, expected > %d. Parser appears to be "+ + "gating cross-import fan-out internally.", + mif, maxImportConnections) + } +} diff --git a/internal/nzbfilesystem/disconnect_storm_test.go b/internal/nzbfilesystem/disconnect_storm_test.go new file mode 100644 index 00000000..dbbea1af --- /dev/null +++ b/internal/nzbfilesystem/disconnect_storm_test.go @@ -0,0 +1,194 @@ +package nzbfilesystem + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/javi11/altmount/internal/testsupport/fakepool" + "github.com/javi11/altmount/internal/testsupport/segments" +) + +// disconnect_storm_test.go pins the post-S9 invariant for HTTP-client +// disconnects: mvf.Close interrupts the in-flight UsenetReader via an +// atomic interruptHandle BEFORE trying to acquire mvf.mu. The blocked +// segment download returns ctx.Canceled, the concurrent Read releases +// the lock, and Close completes in microseconds regardless of segment +// latency. +// +// Without this, a Plex/Jellyfin scrub session that produces frequent +// disconnect cycles would pin a request goroutine (and its pool slot) +// for the per-attempt timeout × retry-count on every disconnect. +// +// See ../usenet/STREAMING_INVARIANTS.md. + +// TestStorm_ClientDisconnectHoldsPoolSlotForUpTo30s drives the +// disconnect scenario directly. Each "session" opens a MetadataVirtualFile, +// reads a few bytes (which kicks off prefetch goroutines holding +// BodyPriority calls in flight), cancels the read context (simulating +// HTTP client disconnect), and times how long mvf.Close takes. +// +// PINNED INVARIANT (post-S9 fix): cancellation propagates fast. +// mvf.Close calls interruptCurrentReader before contending for mvf.mu, +// which fires ctx-cancel on the in-flight UsenetReader. The blocked +// segment download returns ctx.Canceled, the concurrent Read releases +// mvf.mu, and Close completes in microseconds regardless of segment +// latency. Test asserts < 250ms with a 2s fake-pool latency. +func TestStorm_ClientDisconnectHoldsPoolSlotForUpTo30s(t *testing.T) { + t.Parallel() + const ( + segCount = 5 + segSize = 1024 + maxPrefetch = 4 + // Slow segment latency: simulates a real provider taking time + // to respond. mvf.Close waits for this to complete. + segLatency = 2 * time.Second + ) + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + fp := fakepool.New() + configurePoolForFile(fp, segCount, segSize, fakepool.SegmentBehavior{ + Latency: segLatency, + }) + + mvf := newTestMVF(t, ctx, fp, segCount, segSize, maxPrefetch) + + // Start a Read in a goroutine. This triggers reader initialization + // and downloadManager spawning. The Read itself will block waiting + // for segment 0's body to arrive. + readDone := make(chan struct{}) + readStarted := make(chan struct{}) + go func() { + defer close(readDone) + close(readStarted) + buf := make([]byte, 16) + _, _ = mvf.Read(buf) + }() + <-readStarted + // Give Read time to acquire mvf.mu and kick off prefetch. Without + // this delay, mvf.Close could acquire the lock first and nil out + // meta before Read's ensureReader runs. + time.Sleep(100 * time.Millisecond) + + // Wait until at least one BodyPriority call is in flight, confirming + // the prefetch pipeline is alive. + deadline := time.Now().Add(2 * time.Second) + for fp.InFlight() == 0 && time.Now().Before(deadline) { + time.Sleep(5 * time.Millisecond) + } + if fp.InFlight() == 0 { + t.Fatalf("no BodyPriority call ever went in flight; prefetch may not be running") + } + + // "Disconnect": close the file. This calls mvf.Close, which calls + // UsenetReader.Close, which waits up to 30s for in-flight downloads. + // Time how long it takes. + closeStart := time.Now() + _ = mvf.Close() + closeElapsed := time.Since(closeStart) + <-readDone // let the orphaned Read goroutine exit so race detector is happy + + t.Logf("mvf.Close took %s after disconnect with %s segment latency "+ + "(invariant: < %s regardless of segment latency)", + closeElapsed, segLatency, 250*time.Millisecond) + + // PINNED INVARIANT: Close interrupts in-flight downloads instead of + // waiting for them. Bound is loose to absorb scheduler jitter; the + // real number is in the microseconds on a healthy host. + const closeBudget = 250 * time.Millisecond + if closeElapsed > closeBudget { + t.Errorf("INVARIANT regression (S9): mvf.Close took %s, want <= %s with segLatency=%s. "+ + "interruptCurrentReader is no longer cancelling the in-flight UsenetReader before "+ + "contending for mvf.mu.", + closeElapsed, closeBudget, segLatency) + } +} + +// TestStorm_ConcurrentDisconnectsPinManyGoroutines is the cumulative +// version of the single-disconnect scenario above. It opens 10 concurrent +// "sessions", each starts streaming, each disconnects after a short +// delay. +// +// PINNED INVARIANT (post-S9 fix): max per-close latency stays under a +// small constant regardless of segLatency. The close path interrupts +// in-flight downloads via the atomic interruptHandle rather than +// waiting for them. +func TestStorm_ConcurrentDisconnectsPinManyGoroutines(t *testing.T) { + t.Parallel() + const ( + sessions = 10 + segCount = 5 + segSize = 1024 + maxPrefetch = 4 + segLatency = 1500 * time.Millisecond + ) + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + fp := fakepool.New() + for i := 0; i < segCount; i++ { + fp.SetBehavior(segments.MessageID(i), fakepool.SegmentBehavior{ + Latency: segLatency, + Bytes: segments.Payload(i, segSize), + }) + } + + var wg sync.WaitGroup + var maxCloseElapsed time.Duration + var mu sync.Mutex + + overallStart := time.Now() + for s := 0; s < sessions; s++ { + wg.Add(1) + go func() { + defer wg.Done() + mvf := newTestMVF(t, ctx, fp, segCount, segSize, maxPrefetch) + + // Signal when this session's Read has actually acquired + // mvf.mu and reached the in-flight wait. Using a per-session + // signal (not the shared fp.InFlight counter) avoids races + // where one session's Close runs before its own Read has + // started. + readStarted := make(chan struct{}) + readDone := make(chan struct{}) + go func() { + defer close(readDone) + close(readStarted) // signals before Read actually acquires lock + buf := make([]byte, 16) + _, _ = mvf.Read(buf) + }() + <-readStarted + // Give the Read goroutine a moment to acquire mvf.mu and + // kick off the prefetch pipeline. + time.Sleep(50 * time.Millisecond) + + closeStart := time.Now() + _ = mvf.Close() + elapsed := time.Since(closeStart) + <-readDone + + mu.Lock() + if elapsed > maxCloseElapsed { + maxCloseElapsed = elapsed + } + mu.Unlock() + }() + } + wg.Wait() + overallElapsed := time.Since(overallStart) + + t.Logf("%d concurrent disconnects; max close=%s overall=%s "+ + "(invariant: max close < %s regardless of segLatency=%s)", + sessions, maxCloseElapsed, overallElapsed, 250*time.Millisecond, segLatency) + + // PINNED INVARIANT: max close < 250ms regardless of segLatency. + const closeBudget = 250 * time.Millisecond + if maxCloseElapsed > closeBudget { + t.Errorf("INVARIANT regression (S9): maxCloseElapsed=%s, want <= %s after %d concurrent "+ + "disconnects with segLatency=%s. interruptCurrentReader is no longer cancelling "+ + "in-flight UsenetReaders before contending for mvf.mu.", + maxCloseElapsed, closeBudget, sessions, segLatency) + } +} diff --git a/internal/nzbfilesystem/metadata_remote_file.go b/internal/nzbfilesystem/metadata_remote_file.go index 8d2c027b..5d36ba39 100644 --- a/internal/nzbfilesystem/metadata_remote_file.go +++ b/internal/nzbfilesystem/metadata_remote_file.go @@ -11,8 +11,11 @@ import ( "path/filepath" "strings" "sync" + "sync/atomic" "time" + lru "github.com/hashicorp/golang-lru/v2" + "github.com/javi11/altmount/internal/config" "github.com/javi11/altmount/internal/database" "github.com/javi11/altmount/internal/encryption" @@ -809,7 +812,68 @@ type MetadataVirtualFile struct { segmentIndex *segmentOffsetIndex mu sync.Mutex - closeWg sync.WaitGroup // tracks background reader closes during seek + closeWg sync.WaitGroup // tracks the bounded closer-worker pool + + // closerCh is the per-file bounded closer queue. Lazy-initialized + // on first closeCurrentReader; closed in mvf.Close so the worker + // goroutines exit. See enqueueCloser / closerWorkerCount. + closerCh chan io.Closer + + // interruptHandle tracks the latest reader for cancellation from Close + // without taking mvf.mu. Read can hold mvf.mu for the full segment + // download latency, so Close must be able to fire ctx-cancel on the + // in-flight reader before contending for the lock. Stores a + // readerInterrupter; an empty value (Load returns nil) means no + // interruptible reader is set. + interruptHandle atomic.Value + + // randomReadCache is a small per-file LRU of full segment bytes used + // by the ephemeral ReadAt path to coalesce random-access scrubbing. + // Lazily initialized on first ephemeral ReadAt. Held under mvf.mu. + // See STREAMING_INVARIANTS.md I9. + randomReadCache *lru.Cache[int, []byte] +} + +// randomReadCacheSize bounds the per-file ephemeral-read cache. 8 +// segments × default segment size (~768 KB) ≈ 6 MB per open file, +// keeping the worst-case footprint bounded under library-scan loads. +const randomReadCacheSize = 8 + +// readerInterrupter is the interface implemented by readers that can +// abort their in-flight downloads by canceling their internal context. +// UsenetReader (and wrappers that own a UsenetReader) implement this so +// MetadataVirtualFile.Close can interrupt them without holding mvf.mu. +type readerInterrupter interface { + Interrupt() +} + +// interruptSlot wraps a readerInterrupter so we can store a nil-valued +// entry in atomic.Value without panicking (Value.Store rejects untyped +// nil and rejects type changes between calls). +type interruptSlot struct{ i readerInterrupter } + +// setReader assigns a new reader and refreshes the interrupt handle. +// Callers must hold mvf.mu. Pass nil to clear. +func (mvf *MetadataVirtualFile) setReader(r io.ReadCloser) { + mvf.reader = r + slot := interruptSlot{} + if i, ok := r.(readerInterrupter); ok { + slot.i = i + } + mvf.interruptHandle.Store(slot) +} + +// interruptCurrentReader fires a non-blocking cancel on the in-flight +// reader if one is set. Safe to call without holding mvf.mu. +func (mvf *MetadataVirtualFile) interruptCurrentReader() { + v := mvf.interruptHandle.Load() + if v == nil { + return + } + slot, _ := v.(interruptSlot) + if slot.i != nil { + slot.i.Interrupt() + } } // segmentOffsetIndex provides O(1) lookup for offset→segment mapping using binary search @@ -1028,6 +1092,17 @@ func (mvf *MetadataVirtualFile) ReadAtContext(readCtx context.Context, p []byte, end = mvf.meta.FileSize - 1 } + // Coalesce small random reads through a per-file LRU of full segment + // bytes. Plex/Jellyfin scrubbing produces bursts of small ReadAts + // across a handful of segments; without this every call hit the wire + // (storm S5). Only viable for plain (unencrypted, non-nested) + // segments — encrypted streams don't map cleanly to segment + // boundaries. + if n, served := mvf.tryServeFromRandomReadCache(readCtx, p, off, end); served { + mvf.readAtSharedNext = off + int64(n) + return n, nil + } + reader, err := mvf.createReaderAtOffset(off, end) if err != nil { return 0, err @@ -1048,6 +1123,96 @@ func (mvf *MetadataVirtualFile) ReadAtContext(readCtx context.Context, p []byte, return n, err } +// tryServeFromRandomReadCache attempts to satisfy a single-segment +// ephemeral ReadAt from the per-file LRU. On miss it downloads the +// full containing segment, caches it, then serves the requested +// window. Returns (bytesCopied, true) on success or (0, false) when +// the caller must fall back to the normal ephemeral path. Caller must +// hold mvf.mu. +// +// Skipped for encrypted and nested-source files because their segment +// boundaries don't align with plaintext byte ranges. +func (mvf *MetadataVirtualFile) tryServeFromRandomReadCache(readCtx context.Context, p []byte, off, end int64) (int, bool) { + if mvf.meta == nil || + mvf.meta.Encryption != metapb.Encryption_NONE || + len(mvf.meta.NestedSources) > 0 || + len(mvf.meta.SegmentData) == 0 { + return 0, false + } + mvf.segmentIndexOnce.Do(func() { + mvf.segmentIndex = buildSegmentIndex(mvf.meta.SegmentData) + }) + if mvf.segmentIndex == nil { + return 0, false + } + segIdx := mvf.segmentIndex.findSegmentForOffset(off) + if segIdx < 0 { + return 0, false + } + segStart := mvf.segmentIndex.getOffsetForSegment(segIdx) + segSize := mvf.segmentIndex.sizes[segIdx] + segEnd := segStart + segSize - 1 + // Only single-segment reads benefit from the per-segment cache. + if end > segEnd { + return 0, false + } + + if mvf.randomReadCache == nil { + c, err := lru.New[int, []byte](randomReadCacheSize) + if err != nil { + return 0, false + } + mvf.randomReadCache = c + } + + // At end-of-file the caller's buffer may be larger than the readable + // remainder of the segment — the ephemeral path clamps `end` to + // FileSize-1 but `len(p)` is not clamped. Use the clamped window + // (off..end inclusive) as the source of truth for how many bytes we're + // actually allowed to copy out of the cached segment. + want := int(end - off + 1) + if want <= 0 { + return 0, false + } + if want > len(p) { + want = len(p) + } + + if data, ok := mvf.randomReadCache.Get(segIdx); ok { + rel := off - segStart + if rel < 0 || rel >= int64(len(data)) { + return 0, false + } + n := copy(p[:want], data[rel:]) + return n, true + } + + // Miss: fetch the whole segment via an ephemeral reader so the next + // small read in the same segment is a cache hit. + reader, err := mvf.createReaderAtOffset(segStart, segEnd) + if err != nil { + return 0, false + } + defer reader.Close() + + full := make([]byte, segSize) + rn, err := readFullContext(readCtx, reader, full) + if err != nil && err != io.ErrUnexpectedEOF { + return 0, false + } + rel := off - segStart + if int64(rn) <= rel { + return 0, false + } + // Only cache when we got the whole segment; partial data is more + // trouble than it's worth. + if int64(rn) == segSize { + mvf.randomReadCache.Add(segIdx, full) + } + n := copy(p[:want], full[rel:rn]) + return n, true +} + // createReaderAtOffset creates an independent reader for reading at a specific offset. // This reader is self-contained and can be used concurrently with other readers. func (mvf *MetadataVirtualFile) createReaderAtOffset(start, end int64) (io.ReadCloser, error) { @@ -1178,23 +1343,41 @@ func (mvf *MetadataVirtualFile) Seek(offset int64, whence int) (int64, error) { // Close implements afero.File.Close func (mvf *MetadataVirtualFile) Close() error { - // Remove from stream tracker if applicable + // Fire cancel on the in-flight reader BEFORE contending for mvf.mu. + // A concurrent Read can hold mvf.mu for the full segment download + // latency (15s per-attempt timeout × 5 retries). Cancelling the + // reader's context first lets that Read unblock and release the lock + // so Close completes quickly. Safe to call without mvf.mu — uses an + // atomic handle. See STREAMING_INVARIANTS.md S9. + mvf.interruptCurrentReader() + mvf.mu.Lock() + // Remove from stream tracker under the same lock that Read / ReadAtContext + // use to read streamID. Without this, the race detector flags an + // unsynchronized read/write between Close and a concurrent Read. if mvf.streamTracker != nil && mvf.streamID != "" { mvf.streamTracker.Remove(mvf.streamID) mvf.streamID = "" } - - mvf.mu.Lock() if mvf.reader != nil { mvf.reader.Close() - mvf.reader = nil + mvf.setReader(nil) mvf.readerInitialized = false } mvf.segmentIndex = nil // Release segment offset index for GC mvf.meta = nil // Release segment/nested-source slices for GC + if mvf.randomReadCache != nil { + mvf.randomReadCache.Purge() + mvf.randomReadCache = nil + } + // Signal the bounded closer workers to drain remaining queued + // closes and exit. Workers are tracked by closeWg. + if mvf.closerCh != nil { + close(mvf.closerCh) + mvf.closerCh = nil + } mvf.mu.Unlock() - // Wait for any background reader closes from previous seeks + // Wait for the closer-worker pool to finish draining. mvf.closeWg.Wait() return nil @@ -1277,19 +1460,51 @@ func (mvf *MetadataVirtualFile) hasMoreDataToRead() bool { return false } -// closeCurrentReader detaches the current reader and closes it in the background. -// This avoids blocking Seek on UsenetReader.Close() which may wait for in-flight downloads. +// closeCurrentReader detaches the current reader and hands it to the +// bounded closer pool for asynchronous Close. This avoids blocking +// Seek on UsenetReader.Close() which may wait for in-flight downloads. +// If the pool's queue is full (more pending closes than +// closerWorkerCount can drain) the close runs inline as backpressure +// rather than spawning an unbounded fan-out. See STREAMING_INVARIANTS.md S6. func (mvf *MetadataVirtualFile) closeCurrentReader() { if mvf.reader != nil { reader := mvf.reader - mvf.reader = nil - mvf.closeWg.Go(func() { - reader.Close() - }) + mvf.setReader(nil) + mvf.enqueueCloser(reader) } mvf.readerInitialized = false } +// closerWorkerCount bounds the number of background reader-Close +// goroutines that a single MetadataVirtualFile keeps in flight at +// once. Tuned to absorb normal Seek bursts (e.g. video-scrubbing in +// 4-direction probes) without producing the storm S6 fan-out. +const closerWorkerCount = 4 + +// enqueueCloser hands a reader to the per-file bounded closer pool. +// Lazy-starts the worker goroutines on first call. Caller must hold +// mvf.mu (so the lazy init is safe). +func (mvf *MetadataVirtualFile) enqueueCloser(r io.Closer) { + if mvf.closerCh == nil { + mvf.closerCh = make(chan io.Closer, closerWorkerCount) + for i := 0; i < closerWorkerCount; i++ { + mvf.closeWg.Go(func() { + for c := range mvf.closerCh { + _ = c.Close() + } + }) + } + } + select { + case mvf.closerCh <- r: + default: + // Queue full — apply backpressure inline rather than letting + // the closer fan-out grow unbounded. This is the rare path; a + // real Seek burst stays under closerWorkerCount. + _ = r.Close() + } +} + // ensureReader ensures we have a reader initialized for the current position with range support func (mvf *MetadataVirtualFile) ensureReader() error { if mvf.readerInitialized { @@ -1328,21 +1543,21 @@ func (mvf *MetadataVirtualFile) ensureReader() error { if err != nil { return fmt.Errorf("failed to create nested reader: %w", err) } - mvf.reader = reader + mvf.setReader(reader) } else if mvf.meta.Encryption != metapb.Encryption_NONE { // Wrap the usenet reader with encryption decryptedReader, err := mvf.wrapWithEncryption(start, end) if err != nil { return fmt.Errorf(ErrMsgFailedWrapEncryption, err) } - mvf.reader = decryptedReader + mvf.setReader(decryptedReader) } else { // Create plain usenet reader ur, err := mvf.createUsenetReader(mvf.ctx, start, end) if err != nil { return err } - mvf.reader = ur + mvf.setReader(ur) } mvf.readerInitialized = true diff --git a/internal/nzbfilesystem/metadata_remote_file_storm_test.go b/internal/nzbfilesystem/metadata_remote_file_storm_test.go new file mode 100644 index 00000000..9c55bddc --- /dev/null +++ b/internal/nzbfilesystem/metadata_remote_file_storm_test.go @@ -0,0 +1,320 @@ +package nzbfilesystem + +import ( + "context" + "io" + "math/rand" + "runtime" + "sync/atomic" + "testing" + "time" + + metapb "github.com/javi11/altmount/internal/metadata/proto" + "github.com/javi11/altmount/internal/testsupport/fakepool" + "github.com/javi11/altmount/internal/testsupport/goroutines" + "github.com/javi11/altmount/internal/testsupport/segments" +) + +// slowCloseReader is a stand-in io.ReadCloser whose Close blocks for the +// configured duration. It models a UsenetReader.Close that is waiting for +// in-flight BodyPriority calls to drain. Each Close increments a +// completion counter so tests can observe how many closers actually ran. +type slowCloseReader struct { + closeDelay time.Duration + closes atomic.Int32 + closedSignal chan struct{} +} + +func newSlowCloseReader(delay time.Duration) *slowCloseReader { + return &slowCloseReader{ + closeDelay: delay, + closedSignal: make(chan struct{}), + } +} + +func (r *slowCloseReader) Read(_ []byte) (int, error) { + return 0, io.EOF +} + +func (r *slowCloseReader) Close() error { + time.Sleep(r.closeDelay) + r.closes.Add(1) + select { + case <-r.closedSignal: + default: + } + return nil +} + +// metadata_remote_file_storm_test.go reproduces the storm conditions +// caused by MetadataVirtualFile's per-request reader lifecycle. Each test +// uses the harness in streamtest_helpers_test.go to construct a minimal +// MetadataVirtualFile wired to the fake pool, then drives a ReadAt or +// Seek workload designed to expose the storm. +// +// Like the usenet-level storm tests, each test pins the CURRENT bad +// behavior with a concrete assertion. When the fix lands, the assertion +// fails and must be inverted to enforce the TARGET invariant. +// +// See ../usenet/STREAMING_INVARIANTS.md. + +// TestStorm_RandomReadAtCreatesEphemeralReaderPerCall pins the post-S5 +// invariant: ephemeral ReadAts with locality (the realistic Plex/ +// Jellyfin scrubbing pattern — many small reads within a small +// segment window) are coalesced by the per-file random-read LRU cache +// so total wire calls stay near the unique-segment count rather than +// the read count. +// +// Workload: 200 small ReadAts spread randomly across 8 distinct +// segments (fits in randomReadCacheSize). Without the cache, every +// call would fetch its containing segment → ≈200 BodyPriority calls. +// With the cache, after each segment is fetched once, subsequent reads +// in the same segment hit the cache → ≈8 BodyPriority calls. +func TestStorm_RandomReadAtCreatesEphemeralReaderPerCall(t *testing.T) { + t.Parallel() + const ( + segCount = 200 + segSize = 1024 + readCount = 200 + readSize = 64 + hotWindowSegs = 8 // working set fits in randomReadCacheSize + maxPrefetch = 4 + ) + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + fp := fakepool.New() + configurePoolForFile(fp, segCount, segSize, fakepool.SegmentBehavior{ + Latency: 1 * time.Millisecond, + }) + + mvf := newTestMVF(t, ctx, fp, segCount, segSize, maxPrefetch) + fileSize := int64(segCount * segSize) + + // Deterministic RNG so the test is reproducible. Restrict the access + // pattern to hotWindowSegs distinct segments so the working set fits + // in the per-file LRU. + rng := rand.New(rand.NewSource(42)) + buf := make([]byte, readSize) + for i := 0; i < readCount; i++ { + segIdx := rng.Intn(hotWindowSegs) + off := int64(segIdx) * int64(segSize) + if off+int64(readSize) > fileSize { + off = fileSize - int64(readSize) + } + if _, err := mvf.ReadAt(buf, off); err != nil { + t.Fatalf("ReadAt #%d at %d: %v", i, off, err) + } + } + + calls := fp.BodyPriorityCalls() + t.Logf("%d ReadAt calls within %d-segment hot window produced %d BodyPriority requests "+ + "(invariant: calls <= 2 × hotWindowSegs)", readCount, hotWindowSegs, calls) + + // PINNED INVARIANT: with the LRU cache, only the unique hot-window + // segments are fetched (plus a small slop for first-call shared-path + // overlap and any cache miss before warm-up). + const budget = 2 * hotWindowSegs + if calls > int64(budget) { + t.Errorf("INVARIANT regression (S5): BodyPriority=%d, want <= %d "+ + "(should be roughly one fetch per unique segment in the hot window). "+ + "The per-file random-read LRU is no longer coalescing repeated reads "+ + "within the same segment.", + calls, budget) + } +} + +// TestRandomReadCache_EOFReadDoesNotPanic regression-pins a bug where a +// ReadAt that straddles end-of-file panics inside +// tryServeFromRandomReadCache. The ephemeral path clamps `end` to +// FileSize-1 but `len(p)` is left at the original FUSE read size (e.g. +// 16 KiB). The cache then slices `full[rel : rel + len(p)]` past the +// segment's actual size: +// +// panic: runtime error: slice bounds out of range [:704512] with capacity 703432 +// +// reproduced on a Jellyfin library scan against an .mp4 whose last +// segment is partially-filled. The fix clamps the copy length to the +// clamped read window (end-off+1), capped by len(p). +// +// Test setup: a file whose last segment is the only one with usable +// data < SegmentSize, so the EOF straddle only ever affects the final +// segment. ReadAt is issued at a position close enough to FileSize that +// off + len(p) > FileSize but off + len(p) <= segStart + segSize. +func TestRandomReadCache_EOFReadDoesNotPanic(t *testing.T) { + t.Parallel() + const ( + fullSegs = 3 + segSize = 1024 + tailUsable = 700 // last segment has only 700 readable bytes + ) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + fp := fakepool.New() + // Full segments: every byte is readable. + for i := 0; i < fullSegs; i++ { + fp.SetBehavior(segments.MessageID(i), fakepool.SegmentBehavior{ + Bytes: segments.Payload(i, segSize), + }) + } + // Tail segment: only the first tailUsable bytes are valid file data, + // the rest is "padding" inside the segment's wire payload. The fake + // returns segSize bytes either way; FileSize is what stops the file + // short. + fp.SetBehavior(segments.MessageID(fullSegs), fakepool.SegmentBehavior{ + Bytes: segments.Payload(fullSegs, segSize), + }) + + segData := make([]*metapb.SegmentData, fullSegs+1) + for i := range segData { + segData[i] = &metapb.SegmentData{ + Id: segments.MessageID(i), + SegmentSize: int64(segSize), + StartOffset: 0, + EndOffset: int64(segSize - 1), + } + } + fileSize := int64(fullSegs*segSize + tailUsable) + + mvf := &MetadataVirtualFile{ + name: "test-eof-readat", + meta: &fileHandleMeta{ + FileSize: fileSize, + SegmentData: segData, + }, + poolManager: newFakePoolManager(fp), + ctx: ctx, + maxPrefetch: 4, + originalRangeEnd: -1, + streamTracker: noopStreamTracker{}, + streamID: "eof-stream", + } + t.Cleanup(func() { _ = mvf.Close() }) + + // Issue a 4 KiB ReadAt at an offset where off + len(p) > FileSize but + // off + len(p) <= segStart + segSize. This is the exact straddle that + // panicked in production. + buf := make([]byte, 4096) + off := int64(fullSegs * segSize) // start of the partial tail segment + n, err := mvf.ReadAt(buf, off) + if err != nil && err != io.EOF { + t.Fatalf("ReadAt across EOF returned unexpected error: %v", err) + } + if int64(n) != tailUsable { + t.Errorf("ReadAt at EOF returned n=%d, want %d (the readable tail)", n, tailUsable) + } + + // Issue the same read again so the second call lands on the cache-hit + // arm (the first call populates the LRU on success). Same straddle + // invariant must hold there. + n2, err := mvf.ReadAt(buf, off) + if err != nil && err != io.EOF { + t.Fatalf("cache-hit ReadAt across EOF returned unexpected error: %v", err) + } + if int64(n2) != tailUsable { + t.Errorf("cache-hit ReadAt at EOF returned n=%d, want %d", n2, tailUsable) + } +} + +// TestStorm_SeekSpamAccumulatesCloserGoroutines pins the post-S6 +// invariant: rapid Seek calls between unaligned positions are absorbed +// by a bounded closer-worker pool (closerWorkerCount=4 workers per +// file), so peak goroutine growth from seek-spam stays at a small +// constant regardless of seek rate. Without the bound, every Seek +// spawned its own closer goroutine via closeWg.Go (unbounded). +// +// We bypass the real UsenetReader and install a slowCloseReader directly +// into mvf.reader between each Seek. slowCloseReader.Close sleeps for +// closeDelay, modeling a real UsenetReader.Close that is waiting for +// in-flight BodyPriority calls to drain. With 50 seeks each installing a +// reader whose Close takes 1s, the unbounded closeWg.Go fan-out produces +// ~50 concurrently-pinned goroutines. +// +// Driving Seek through the real Read path (which would download segments) +// would be slow and racy; the install-directly approach exercises exactly +// the storm-relevant code path (closeCurrentReader → closeWg.Go) while +// giving us deterministic timing. +// +// CURRENT BEHAVIOR: NumGoroutine grows by ~one goroutine per seek (one +// closer waiting inside slowCloseReader.Close). +// +// TARGET INVARIANT (after fix): a bounded closer pool caps the number of +// pending closes at a small constant regardless of seek rate. +func TestStorm_SeekSpamAccumulatesCloserGoroutines(t *testing.T) { + t.Parallel() + const ( + segCount = 100 + segSize = 1024 + seekCount = 50 + maxPrefetch = 2 + // Close takes ~1s so all closer goroutines stay pinned for the + // duration of the seek burst. + closeDelay = 1 * time.Second + ) + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + fp := fakepool.New() + configurePoolForFile(fp, segCount, segSize, fakepool.SegmentBehavior{ + Latency: 1 * time.Millisecond, + }) + + mvf := newTestMVF(t, ctx, fp, segCount, segSize, maxPrefetch) + fileSize := int64(segCount * segSize) + + // Track total slow-close readers created so the test can sanity-check + // they were all properly handed to closeWg. + var totalReaders int32 + + // Install the first reader so the first Seek has something to close. + primeReader := newSlowCloseReader(closeDelay) + totalReaders++ + mvf.mu.Lock() + mvf.reader = primeReader + mvf.readerInitialized = true + mvf.position = 0 + mvf.mu.Unlock() + + snap := goroutines.Take(t) + + for i := 0; i < seekCount; i++ { + // Seek to a different position each time so closeCurrentReader fires. + var off int64 + if i%2 == 0 { + off = int64(i+1) * int64(segSize) + } else { + off = fileSize - int64(i+1)*int64(segSize) + } + if _, err := mvf.Seek(off, io.SeekStart); err != nil { + t.Fatalf("Seek #%d: %v", i, err) + } + // closeCurrentReader has nil-ed mvf.reader; install a fresh + // slow-close reader so the NEXT Seek has something to close. + next := newSlowCloseReader(closeDelay) + totalReaders++ + mvf.mu.Lock() + mvf.reader = next + mvf.readerInitialized = true + mvf.mu.Unlock() + } + + // Sample peak goroutine count immediately after the seek burst, + // while the closer workers are still draining queued readers. + peak := runtime.NumGoroutine() + delta := peak - snap.Baseline() + t.Logf("%d seeks installed %d readers; peak goroutines=%d (delta=%d from baseline %d) "+ + "(invariant: delta <= 2 × closer-pool size)", + seekCount, totalReaders, peak, delta, snap.Baseline()) + + // PINNED INVARIANT: goroutine delta stays bounded by the closer + // pool size (currently 4) regardless of seek count. Generous 2x + // budget to absorb the runtime's helper goroutines that may also + // be scheduled during the burst. + const budget = 2 * 4 // 2 × closerWorkerCount + if delta > budget { + t.Errorf("INVARIANT regression (S6): goroutine delta=%d, want <= %d after %d seeks. "+ + "closeCurrentReader is no longer routing into the bounded closer-worker pool.", + delta, budget, seekCount) + } +} diff --git a/internal/nzbfilesystem/metadata_remote_file_test.go b/internal/nzbfilesystem/metadata_remote_file_test.go index 7d2da6ac..e6d409b0 100644 --- a/internal/nzbfilesystem/metadata_remote_file_test.go +++ b/internal/nzbfilesystem/metadata_remote_file_test.go @@ -342,7 +342,7 @@ var _ pool.Manager = (*mockPoolManager)(nil) // mockPoolManager implements pool.Manager for testing type mockPoolManager struct{} -func (m *mockPoolManager) GetPool() (*nntppool.Client, error) { +func (m *mockPoolManager) GetPool() (pool.NntpClient, error) { return nil, nil } @@ -390,7 +390,6 @@ func (m *mockPoolManager) ResetProviderQuota(_ context.Context, _ string) error func (m *mockPoolManager) SetProviderIDs(_ map[string]string) {} - // TestSeekResetsOriginalRangeEnd tests that Seek properly resets originalRangeEnd // This is critical for video playback - without this fix, seeking causes stale range // information to be reused, breaking subsequent reads diff --git a/internal/nzbfilesystem/streamtest_helpers_test.go b/internal/nzbfilesystem/streamtest_helpers_test.go new file mode 100644 index 00000000..9950b5bc --- /dev/null +++ b/internal/nzbfilesystem/streamtest_helpers_test.go @@ -0,0 +1,133 @@ +package nzbfilesystem + +import ( + "context" + "testing" + + metapb "github.com/javi11/altmount/internal/metadata/proto" + "github.com/javi11/altmount/internal/pool" + "github.com/javi11/altmount/internal/testsupport/fakepool" + "github.com/javi11/altmount/internal/testsupport/segments" + "github.com/javi11/nntppool/v4" +) + +// streamtest_helpers_test.go provides the construction kit shared by the +// MetadataVirtualFile-level storm tests in this package. The harness is +// deliberately minimal: only the fields actually touched by the read / +// ReadAt / Seek / Close paths are populated. The full struct has many +// optional collaborators (health repo, ARRs, rclone client, repair +// coalescer, ciphers) that the streaming-storm tests do not exercise, +// and leaving them nil exercises the same nil-guards that production +// code relies on. +// +// See ../usenet/STREAMING_INVARIANTS.md for the contract these tests pin. + +// fakePoolManager is a pool.Manager that always returns the supplied +// *fakepool.Client. Constructed via newFakePoolManager so tests pass the +// fake explicitly and any future changes to pool.Manager surface compile +// errors here rather than in random test files. +type fakePoolManager struct { + client pool.NntpClient +} + +var _ pool.Manager = (*fakePoolManager)(nil) + +func newFakePoolManager(c pool.NntpClient) *fakePoolManager { + return &fakePoolManager{client: c} +} + +func (m *fakePoolManager) GetPool() (pool.NntpClient, error) { return m.client, nil } +func (m *fakePoolManager) SetProviders(_ []nntppool.Provider) error { return nil } +func (m *fakePoolManager) ClearPool() error { return nil } +func (m *fakePoolManager) HasPool() bool { return true } +func (m *fakePoolManager) GetMetrics() (pool.MetricsSnapshot, error) { return pool.MetricsSnapshot{}, nil } +func (m *fakePoolManager) ResetMetrics(_ context.Context, _, _ bool) error { return nil } +func (m *fakePoolManager) ResetProviderErrors(_ context.Context) error { return nil } +func (m *fakePoolManager) IncArticlesDownloaded() {} +func (m *fakePoolManager) UpdateDownloadProgress(_ string, _ int64) {} +func (m *fakePoolManager) IncArticlesPosted() {} +func (m *fakePoolManager) AddProvider(_ nntppool.Provider) error { return nil } +func (m *fakePoolManager) RemoveProvider(_ string) error { return nil } +func (m *fakePoolManager) ResetProviderQuota(_ context.Context, _ string) error { + return nil +} +func (m *fakePoolManager) SetProviderIDs(_ map[string]string) {} + +// noopStreamTracker is a zero-state StreamTracker. The streaming-storm +// tests don't care about stream metrics; they only need a non-nil +// implementation so UsenetReader doesn't nil-deref on +// IncArticlesDownloaded / UpdateDownloadProgress. +type noopStreamTracker struct{} + +func (noopStreamTracker) Add(_, _, _, _, _ string, _ int64) string { return "test-stream" } +func (noopStreamTracker) UpdateProgress(_ string, _ int64) {} +func (noopStreamTracker) UpdateDownloadProgress(_ string, _ int64) {} +func (noopStreamTracker) UpdateCurrentOffset(_ string, _ int64) {} +func (noopStreamTracker) UpdateBufferedOffset(_ string, _ int64) {} +func (noopStreamTracker) Remove(_ string) {} +func (noopStreamTracker) IncArticlesDownloaded() {} +func (noopStreamTracker) IncArticlesPosted() {} + +// buildSegmentData generates N segments of size segSize, each with the +// canonical fake message-ID and offsets that match what production code +// would see for a contiguous file of size N*segSize. The result can be +// assigned directly to fileHandleMeta.SegmentData. +func buildSegmentData(t testing.TB, n, segSize int) []*metapb.SegmentData { + t.Helper() + out := make([]*metapb.SegmentData, n) + for i := 0; i < n; i++ { + out[i] = &metapb.SegmentData{ + Id: segments.MessageID(i), + SegmentSize: int64(segSize), + StartOffset: 0, + EndOffset: int64(segSize - 1), + } + } + return out +} + +// configurePoolForFile teaches the fakepool how to satisfy every segment +// of a synthetic file built via buildSegmentData(n, segSize). Each +// message-ID gets the deterministic payload and any latency the caller +// supplied via the SegmentBehavior template (Bytes is overwritten). +func configurePoolForFile(fp *fakepool.Client, n, segSize int, behavior fakepool.SegmentBehavior) { + for i := 0; i < n; i++ { + b := behavior + b.Bytes = segments.Payload(i, segSize) + fp.SetBehavior(segments.MessageID(i), b) + } +} + +// newTestMVF builds a MetadataVirtualFile suitable for the streaming- +// storm tests. The file is plain (no encryption, no nested sources), of +// total size n*segSize, wired to the supplied fake pool. The maxPrefetch +// parameter mirrors production's per-reader concurrency cap. +// +// The file's metadataService, healthRepository, arrsService, ciphers, and +// rcloneClient are left nil; the read paths exercised here never touch +// them on the happy path, and leaving them nil verifies the nil-guards +// stay in place. +func newTestMVF( + t testing.TB, + ctx context.Context, + fp *fakepool.Client, + n, segSize, maxPrefetch int, +) *MetadataVirtualFile { + t.Helper() + segData := buildSegmentData(t, n, segSize) + mvf := &MetadataVirtualFile{ + name: "test-virtual-file", + meta: &fileHandleMeta{ + FileSize: int64(n * segSize), + SegmentData: segData, + }, + poolManager: newFakePoolManager(fp), + ctx: ctx, + maxPrefetch: maxPrefetch, + originalRangeEnd: -1, // unbounded — equivalent to "no Range header" + streamTracker: noopStreamTracker{}, + streamID: "test-stream", + } + t.Cleanup(func() { _ = mvf.Close() }) + return mvf +} diff --git a/internal/pool/manager.go b/internal/pool/manager.go index 5137fc97..cfc45dc5 100644 --- a/internal/pool/manager.go +++ b/internal/pool/manager.go @@ -11,10 +11,13 @@ import ( "github.com/javi11/nntppool/v4" ) -// Manager provides centralized NNTP connection pool management +// Manager provides centralized NNTP connection pool management. type Manager interface { - // GetPool returns the current connection pool or error if not available - GetPool() (*nntppool.Client, error) + // GetPool returns the current connection pool or error if not available. + // The returned client exposes the narrow NntpClient surface so tests can + // substitute a fake (see internal/testsupport/fakepool). In production it + // is backed by *nntppool.Client. + GetPool() (NntpClient, error) // SetProviders creates/recreates the pool with new providers SetProviders(providers []nntppool.Provider) error @@ -144,8 +147,9 @@ func (m *manager) injectQuotaState(providers []nntppool.Provider) { } } -// GetPool returns the current connection pool or error if not available -func (m *manager) GetPool() (*nntppool.Client, error) { +// GetPool returns the current connection pool or error if not available. +// The concrete return type is *nntppool.Client which satisfies NntpClient. +func (m *manager) GetPool() (NntpClient, error) { m.mu.RLock() defer m.mu.RUnlock() diff --git a/internal/pool/nntpclient.go b/internal/pool/nntpclient.go new file mode 100644 index 00000000..9366fec6 --- /dev/null +++ b/internal/pool/nntpclient.go @@ -0,0 +1,48 @@ +package pool + +import ( + "context" + "io" + + "github.com/javi11/nntppool/v4" +) + +// NntpClient is the narrow surface of the underlying nntppool.Client that the +// rest of AltMount calls through Manager.GetPool. Defining it here lets tests +// inject a deterministic fake (see internal/testsupport/fakepool) without +// standing up real NNTP connections, and pins exactly which operations the +// streaming, import, validation, and metrics paths depend on. +// +// Implementations must be safe for concurrent use. The production +// implementation is *nntppool.Client; the contract below intentionally mirrors +// its signatures so the existing client satisfies the interface without an +// adapter. +// +// Keep this interface small. Anything that needs a behavior not listed here +// should add the method explicitly so callers stay observable. +type NntpClient interface { + // Body fetches an article body via the default (non-priority) lane. + // Used by the importer to download NZB segments during scanning. + Body(ctx context.Context, messageID string, onMeta ...func(nntppool.YEncMeta)) (*nntppool.ArticleBody, error) + + // BodyAsync fetches an article body asynchronously, streaming the decoded + // payload to w. The returned channel yields exactly one BodyResult. + BodyAsync(ctx context.Context, messageID string, w io.Writer, onMeta ...func(nntppool.YEncMeta)) <-chan nntppool.BodyResult + + // BodyPriority fetches an article body via the priority lane. Streaming + // reads use this so live playback isn't queued behind a background import. + BodyPriority(ctx context.Context, messageID string, onMeta ...func(nntppool.YEncMeta)) (*nntppool.ArticleBody, error) + + // Stat checks whether an article exists on at least one provider without + // downloading the body. Used by health checks and validation. + Stat(ctx context.Context, messageID string) (*nntppool.StatResult, error) + + // Stats returns a snapshot of pool/provider statistics used by the metrics + // tracker and the system handlers. + Stats() nntppool.ClientStats +} + +// Compile-time assertion: the real client must satisfy the narrow interface. +// If nntppool changes a signature, this line will fail to build and the +// interface above must be updated to match. +var _ NntpClient = (*nntppool.Client)(nil) diff --git a/internal/testsupport/fakepool/fakepool.go b/internal/testsupport/fakepool/fakepool.go new file mode 100644 index 00000000..3af03d79 --- /dev/null +++ b/internal/testsupport/fakepool/fakepool.go @@ -0,0 +1,362 @@ +// Package fakepool provides a deterministic in-process replacement for +// nntppool.Client used by tests that exercise AltMount's streaming and +// connection-management invariants. +// +// The real nntppool.Client opens TCP sockets, dials providers, and runs +// background reconnect / quota / pipelining goroutines. None of that is +// useful when the question under test is "does the streaming pipeline cap +// in-flight downloads", "does a slow provider cause a retry storm", or +// "do ephemeral readers leak goroutines". For those questions we need a +// client we can drive moment-by-moment: control latency, inject specific +// errors, count concurrent calls, and observe whether the caller honors +// the backpressure we apply. +// +// Client satisfies pool.NntpClient (see internal/pool/nntpclient.go). The +// production code never sees a difference; tests inject *Client directly via +// a pool getter closure that returns it as pool.NntpClient. +// +// # Observability primitives +// +// Every Body / BodyPriority / BodyAsync / Stat call increments InFlight on +// entry and decrements on exit. MaxInFlight records the high-water mark. +// Tests assert against these counters to pin invariants like +// "no more than N segment downloads are ever in flight". +// +// # Failure injection +// +// SegmentBehavior controls per-message-ID latency, byte payload, and error. +// Set via SetBehavior or SetDefaultBehavior. Behaviors are evaluated at the +// start of each call; later changes apply only to subsequent calls. +// +// # Backpressure simulation +// +// BlockUntil pins every in-flight call at the entry semaphore until the +// caller closes the returned channel. Use it to hold connections "open" +// while observing how the rest of the pipeline reacts. +// +// All public methods are safe for concurrent use. +package fakepool + +import ( + "context" + "errors" + "io" + "sync" + "sync/atomic" + "time" + + "github.com/javi11/altmount/internal/pool" + "github.com/javi11/nntppool/v4" +) + +// compile-time assertion: Client must satisfy the narrow interface. +var _ pool.NntpClient = (*Client)(nil) + +// SegmentBehavior describes how the fake should respond to a single +// message-ID. The zero value returns an empty body with no delay. +type SegmentBehavior struct { + // Latency is the wall-clock delay added before the call returns. + // Honors ctx cancellation: if the context fires first, the call + // returns ctx.Err() and does not pretend to have produced data. + Latency time.Duration + + // Bytes is the payload returned in ArticleBody.Bytes. Length is also + // used for BytesDecoded. + Bytes []byte + + // Err, if non-nil, is returned instead of a body. Use nntppool sentinel + // errors (e.g. nntppool.ErrArticleNotFound) to exercise specific paths + // in the retry/dispatch logic. + Err error +} + +// Client is a fake nntppool.Client suitable for unit and concurrency tests. +type Client struct { + mu sync.RWMutex + defaultBehavior SegmentBehavior + perSegment map[string]SegmentBehavior + releaseGate <-chan struct{} // nil = no gate; closed = always permit + stats nntppool.ClientStats + hasFixedStats bool + + // Atomic counters for observability. + inFlight atomic.Int32 + maxInFlight atomic.Int32 + totalCalls atomic.Int64 + bodyCalls atomic.Int64 + bodyPriCalls atomic.Int64 + bodyAsyncCalls atomic.Int64 + statCalls atomic.Int64 + + // Per-message-ID call counts (string → *atomic.Int64). Tests that need + // to assert how often a specific segment was requested (e.g. to detect + // retry storms) read this map via PerMessageCalls. + perIDCalls sync.Map +} + +// New returns a fake client. Without further configuration it returns an +// empty (zero-byte) ArticleBody immediately for every message-ID. +func New() *Client { + return &Client{ + perSegment: make(map[string]SegmentBehavior), + } +} + +// SetDefaultBehavior sets the behavior used for any message-ID that has no +// per-segment override. +func (c *Client) SetDefaultBehavior(b SegmentBehavior) { + c.mu.Lock() + c.defaultBehavior = b + c.mu.Unlock() +} + +// SetBehavior sets a per-message-ID behavior. Overrides SetDefaultBehavior. +func (c *Client) SetBehavior(messageID string, b SegmentBehavior) { + c.mu.Lock() + c.perSegment[messageID] = b + c.mu.Unlock() +} + +// BlockUntil installs a gate that pins every subsequent call inside the +// fake (after counter increment, before doing any work) until release is +// closed. Useful for asserting "exactly N calls are concurrently in flight +// while the gate is closed". +// +// Pass nil to remove the gate. Setting a new gate replaces any prior one; +// previously gated calls observing the old gate continue to wait on it. +func (c *Client) BlockUntil(release <-chan struct{}) { + c.mu.Lock() + c.releaseGate = release + c.mu.Unlock() +} + +// SetStats overrides what Stats() returns. Tests that exercise the metrics +// tracker can use this to feed deterministic provider data. +func (c *Client) SetStats(s nntppool.ClientStats) { + c.mu.Lock() + c.stats = s + c.hasFixedStats = true + c.mu.Unlock() +} + +// InFlight returns the number of calls currently inside the fake. +func (c *Client) InFlight() int32 { return c.inFlight.Load() } + +// MaxInFlight returns the high-water mark of InFlight observed since the +// client was created (or since the last ResetCounters). +func (c *Client) MaxInFlight() int32 { return c.maxInFlight.Load() } + +// TotalCalls returns the total number of method invocations served. +func (c *Client) TotalCalls() int64 { return c.totalCalls.Load() } + +// BodyCalls returns the count of Body invocations. +func (c *Client) BodyCalls() int64 { return c.bodyCalls.Load() } + +// BodyPriorityCalls returns the count of BodyPriority invocations. This is +// the most useful counter for streaming-path tests since UsenetReader uses +// BodyPriority exclusively. +func (c *Client) BodyPriorityCalls() int64 { return c.bodyPriCalls.Load() } + +// BodyAsyncCalls returns the count of BodyAsync invocations. +func (c *Client) BodyAsyncCalls() int64 { return c.bodyAsyncCalls.Load() } + +// StatCalls returns the count of Stat invocations. +func (c *Client) StatCalls() int64 { return c.statCalls.Load() } + +// PerMessageCalls returns how many times the given message-ID was requested +// across all method types (Body / BodyPriority / BodyAsync / Stat). +// +// This is the primary signal for retry-storm tests: if a retry policy is +// re-issuing failed requests, the per-ID count climbs faster than the +// number of distinct segments and the assertion fails. +func (c *Client) PerMessageCalls(messageID string) int64 { + v, ok := c.perIDCalls.Load(messageID) + if !ok { + return 0 + } + return v.(*atomic.Int64).Load() +} + +// ResetCounters zeroes all observability counters. Behaviors and gates are +// preserved. Useful between phases of a single test. +func (c *Client) ResetCounters() { + c.inFlight.Store(0) + c.maxInFlight.Store(0) + c.totalCalls.Store(0) + c.bodyCalls.Store(0) + c.bodyPriCalls.Store(0) + c.bodyAsyncCalls.Store(0) + c.statCalls.Store(0) + c.perIDCalls.Range(func(k, _ any) bool { + c.perIDCalls.Delete(k) + return true + }) +} + +// countMessage increments the per-ID counter atomically, lazily creating +// the counter on first contact. +func (c *Client) countMessage(messageID string) { + if v, ok := c.perIDCalls.Load(messageID); ok { + v.(*atomic.Int64).Add(1) + return + } + var fresh atomic.Int64 + fresh.Add(1) + actual, loaded := c.perIDCalls.LoadOrStore(messageID, &fresh) + if loaded { + actual.(*atomic.Int64).Add(1) + } +} + +// enter increments in-flight counters and waits at the gate (if any). +// Returns a function to call on exit. +func (c *Client) enter() func() { + cur := c.inFlight.Add(1) + c.totalCalls.Add(1) + for { + hwm := c.maxInFlight.Load() + if cur <= hwm || c.maxInFlight.CompareAndSwap(hwm, cur) { + break + } + } + c.mu.RLock() + gate := c.releaseGate + c.mu.RUnlock() + if gate != nil { + <-gate + } + return func() { c.inFlight.Add(-1) } +} + +// behaviorFor resolves the SegmentBehavior for a given message-ID, falling +// back to the default. +func (c *Client) behaviorFor(messageID string) SegmentBehavior { + c.mu.RLock() + defer c.mu.RUnlock() + if b, ok := c.perSegment[messageID]; ok { + return b + } + return c.defaultBehavior +} + +// waitOrCancel waits for d to elapse or ctx to fire, whichever first. +// Returns ctx.Err() on cancellation, nil otherwise. +func waitOrCancel(ctx context.Context, d time.Duration) error { + if d <= 0 { + if err := ctx.Err(); err != nil { + return err + } + return nil + } + t := time.NewTimer(d) + defer t.Stop() + select { + case <-ctx.Done(): + return ctx.Err() + case <-t.C: + return nil + } +} + +// Body satisfies pool.NntpClient. Returns either the configured error or an +// ArticleBody filled with the configured Bytes after the configured latency. +func (c *Client) Body(ctx context.Context, messageID string, onMeta ...func(nntppool.YEncMeta)) (*nntppool.ArticleBody, error) { + c.bodyCalls.Add(1) + c.countMessage(messageID) + defer c.enter()() + return c.serveBody(ctx, messageID, nil) +} + +// BodyPriority is identical to Body but counted separately so tests can +// distinguish streaming from importer traffic. +func (c *Client) BodyPriority(ctx context.Context, messageID string, onMeta ...func(nntppool.YEncMeta)) (*nntppool.ArticleBody, error) { + c.bodyPriCalls.Add(1) + c.countMessage(messageID) + defer c.enter()() + return c.serveBody(ctx, messageID, nil) +} + +// BodyAsync streams the configured Bytes (or error) to w and yields a +// BodyResult on the returned channel. +func (c *Client) BodyAsync(ctx context.Context, messageID string, w io.Writer, onMeta ...func(nntppool.YEncMeta)) <-chan nntppool.BodyResult { + c.bodyAsyncCalls.Add(1) + c.countMessage(messageID) + ch := make(chan nntppool.BodyResult, 1) + go func() { + defer c.enter()() + body, err := c.serveBody(ctx, messageID, w) + ch <- nntppool.BodyResult{Body: body, Err: err} + close(ch) + }() + return ch +} + +// Stat returns a StatResult with the message-ID echoed, after the +// configured latency. If the behavior has Err set, it is returned. +func (c *Client) Stat(ctx context.Context, messageID string) (*nntppool.StatResult, error) { + c.statCalls.Add(1) + c.countMessage(messageID) + defer c.enter()() + b := c.behaviorFor(messageID) + if err := waitOrCancel(ctx, b.Latency); err != nil { + return nil, err + } + if b.Err != nil { + return nil, b.Err + } + return &nntppool.StatResult{MessageID: messageID}, nil +} + +// Stats returns the configured stats or a zero value. +func (c *Client) Stats() nntppool.ClientStats { + c.mu.RLock() + defer c.mu.RUnlock() + if c.hasFixedStats { + return c.stats + } + return nntppool.ClientStats{} +} + +func (c *Client) serveBody(ctx context.Context, messageID string, w io.Writer) (*nntppool.ArticleBody, error) { + b := c.behaviorFor(messageID) + if err := waitOrCancel(ctx, b.Latency); err != nil { + return nil, err + } + if b.Err != nil { + // Mirror nntppool: on error the result may still carry partial + // metadata. Tests that need that can extend this; the common case + // returns nil. + return nil, b.Err + } + payload := b.Bytes + if w != nil && len(payload) > 0 { + if _, err := w.Write(payload); err != nil { + return nil, err + } + } + body := &nntppool.ArticleBody{ + MessageID: messageID, + BytesDecoded: len(payload), + } + if w == nil { + body.Bytes = payload + } + return body, nil +} + +// AssertMaxInFlightLE fails the test if the high-water mark exceeds n. +// Use this as the standard assertion at the end of any test that pins a +// concurrency cap. +func AssertMaxInFlightLE(tb interface { + Helper() + Errorf(format string, args ...any) +}, c *Client, n int32) { + tb.Helper() + if got := c.MaxInFlight(); got > n { + tb.Errorf("fakepool: MaxInFlight=%d, want <= %d", got, n) + } +} + +// ErrSimulated502 is a generic transient error suitable for retry-storm +// scenarios. It is wrapped so errors.Is(err, ErrSimulated502) works. +var ErrSimulated502 = errors.New("fakepool: simulated 502 service unavailable") diff --git a/internal/testsupport/fakepool/fakepool_test.go b/internal/testsupport/fakepool/fakepool_test.go new file mode 100644 index 00000000..d0594115 --- /dev/null +++ b/internal/testsupport/fakepool/fakepool_test.go @@ -0,0 +1,161 @@ +package fakepool + +import ( + "context" + "errors" + "io" + "sync" + "testing" + "time" + + "github.com/javi11/nntppool/v4" +) + +// TestFake_BasicBodyReturnsPayload pins the simplest contract: a configured +// payload comes back as ArticleBody.Bytes with BytesDecoded set. Without this +// the fake can't substitute for nntppool.Client in any downstream test. +func TestFake_BasicBodyReturnsPayload(t *testing.T) { + t.Parallel() + c := New() + c.SetBehavior("seg-1", SegmentBehavior{Bytes: []byte("hello")}) + + body, err := c.BodyPriority(context.Background(), "seg-1") + if err != nil { + t.Fatalf("BodyPriority error: %v", err) + } + if string(body.Bytes) != "hello" { + t.Errorf("Bytes = %q, want %q", body.Bytes, "hello") + } + if body.BytesDecoded != 5 { + t.Errorf("BytesDecoded = %d, want 5", body.BytesDecoded) + } + if c.BodyPriorityCalls() != 1 { + t.Errorf("BodyPriorityCalls = %d, want 1", c.BodyPriorityCalls()) + } +} + +// TestFake_DefaultBehaviorAppliesWhenNoOverride confirms message-IDs without +// an explicit behavior fall through to the default. +func TestFake_DefaultBehaviorAppliesWhenNoOverride(t *testing.T) { + t.Parallel() + c := New() + c.SetDefaultBehavior(SegmentBehavior{Bytes: []byte("default")}) + body, err := c.Body(context.Background(), "any-id") + if err != nil { + t.Fatalf("Body error: %v", err) + } + if string(body.Bytes) != "default" { + t.Errorf("Bytes = %q, want %q", body.Bytes, "default") + } +} + +// TestFake_ErrorPropagates verifies that injected errors are returned +// verbatim — the downstream retry tests depend on this for things like +// nntppool.ErrArticleNotFound. +func TestFake_ErrorPropagates(t *testing.T) { + t.Parallel() + c := New() + c.SetBehavior("missing", SegmentBehavior{Err: nntppool.ErrArticleNotFound}) + _, err := c.BodyPriority(context.Background(), "missing") + if !errors.Is(err, nntppool.ErrArticleNotFound) { + t.Errorf("err = %v, want ErrArticleNotFound", err) + } +} + +// TestFake_ContextCancellationDuringLatencyShortCircuits proves the fake +// honors ctx — without this, slow-latency tests could not assert that +// cancelled requests stop spending wall time. +func TestFake_ContextCancellationDuringLatencyShortCircuits(t *testing.T) { + t.Parallel() + c := New() + c.SetDefaultBehavior(SegmentBehavior{Latency: 5 * time.Second}) + + ctx, cancel := context.WithCancel(context.Background()) + go func() { + time.Sleep(20 * time.Millisecond) + cancel() + }() + + start := time.Now() + _, err := c.BodyPriority(ctx, "x") + elapsed := time.Since(start) + if !errors.Is(err, context.Canceled) { + t.Fatalf("err = %v, want context.Canceled", err) + } + if elapsed > time.Second { + t.Errorf("call took %v, expected to short-circuit", elapsed) + } +} + +// TestFake_InFlightCounterTracksConcurrency is the core observability test: +// if we fire N parallel calls and pin them behind a gate, MaxInFlight should +// equal N exactly. Every storm-detection test in the project relies on this +// counter being accurate. +func TestFake_InFlightCounterTracksConcurrency(t *testing.T) { + t.Parallel() + c := New() + c.SetDefaultBehavior(SegmentBehavior{Bytes: []byte("x")}) + + release := make(chan struct{}) + c.BlockUntil(release) + + const N = 10 + var wg sync.WaitGroup + wg.Add(N) + for i := 0; i < N; i++ { + go func() { + defer wg.Done() + _, _ = c.BodyPriority(context.Background(), "seg") + }() + } + + // Wait for all goroutines to reach the gate. Polling is fine; the + // timeout protects against deadlock. + deadline := time.Now().Add(2 * time.Second) + for c.InFlight() < N && time.Now().Before(deadline) { + time.Sleep(time.Millisecond) + } + if got := c.InFlight(); got != N { + t.Fatalf("InFlight at gate = %d, want %d", got, N) + } + + close(release) + wg.Wait() + + if got := c.MaxInFlight(); got != N { + t.Errorf("MaxInFlight = %d, want %d", got, N) + } + if got := c.InFlight(); got != 0 { + t.Errorf("InFlight after drain = %d, want 0", got) + } +} + +// TestFake_BodyAsyncWritesToWriter mirrors the importer's BodyAsync use: +// the decoded payload must arrive on the writer, and the channel must yield +// exactly one BodyResult. +func TestFake_BodyAsyncWritesToWriter(t *testing.T) { + t.Parallel() + c := New() + c.SetBehavior("a", SegmentBehavior{Bytes: []byte("payload")}) + pr, pw := io.Pipe() + defer pr.Close() + defer pw.Close() + + ch := c.BodyAsync(context.Background(), "a", pw) + + buf := make([]byte, 7) + if _, err := io.ReadFull(pr, buf); err != nil { + t.Fatalf("pipe read: %v", err) + } + if string(buf) != "payload" { + t.Errorf("read %q, want %q", buf, "payload") + } + select { + case res := <-ch: + if res.Err != nil { + t.Errorf("BodyAsync result err = %v", res.Err) + } + case <-time.After(time.Second): + t.Fatal("BodyAsync did not yield a result") + } +} diff --git a/internal/testsupport/goroutines/goroutines.go b/internal/testsupport/goroutines/goroutines.go new file mode 100644 index 00000000..28c9c8f3 --- /dev/null +++ b/internal/testsupport/goroutines/goroutines.go @@ -0,0 +1,99 @@ +// Package goroutines provides a small helper for tests that need to assert +// goroutine leaks — specifically, the closeWg accumulation scenarios in the +// streaming pipeline where rapid seek/close cycles can spawn background +// goroutines that outlive the request. +// +// Goroutine counting is inherently noisy: the Go runtime spins up internal +// goroutines (GC, sysmon, http2 pingers, finalizer goroutines) that come +// and go independently of test code. The Snapshot / AssertReturnedToBaseline +// pattern below tolerates that noise with a small slack window and a polling +// timeout, while still catching the "N spawned, none reaped" leaks this +// codebase has historically been vulnerable to. +// +// Usage: +// +// snap := goroutines.Take(t) +// doWorkThatMustNotLeak() +// snap.AssertReturnedToBaseline(t, 5*time.Second) +// +// The assertion polls runtime.NumGoroutine() until it lands within +// DefaultSlack of the original count or the timeout fires. +package goroutines + +import ( + "fmt" + "runtime" + "testing" + "time" +) + +// DefaultSlack is the tolerance allowed when comparing goroutine counts +// before and after a test phase. Set high enough to absorb runtime jitter +// (GC workers, finalizers) but low enough that a real leak of >5 goroutines +// is caught. +const DefaultSlack = 5 + +// Snapshot is a point-in-time goroutine count. +type Snapshot struct { + baseline int +} + +// Take records the current goroutine count. Call once before the code under +// test, then call AssertReturnedToBaseline after it should have settled. +func Take(t testing.TB) Snapshot { + t.Helper() + // Two GCs and a short pause give the runtime a chance to reap goroutines + // that finished but haven't been removed from runtime.NumGoroutine yet. + // This keeps the baseline tight without making the helper itself a + // source of flakes. + runtime.GC() + runtime.GC() + time.Sleep(10 * time.Millisecond) + return Snapshot{baseline: runtime.NumGoroutine()} +} + +// Baseline returns the recorded count. +func (s Snapshot) Baseline() int { return s.baseline } + +// AssertReturnedToBaseline polls runtime.NumGoroutine until it reaches +// baseline + DefaultSlack or until timeout elapses. On timeout the test +// fails with a diagnostic that includes the current and target counts and +// a partial dump of the goroutine stacks — usually enough to locate the +// leak source without rerunning under a profiler. +func (s Snapshot) AssertReturnedToBaseline(t testing.TB, timeout time.Duration) { + s.AssertReturnedToBaselineWithSlack(t, timeout, DefaultSlack) +} + +// AssertReturnedToBaselineWithSlack is the explicit-tolerance variant. Use +// when the code under test is known to spawn a fixed number of background +// goroutines that are part of its design (e.g. a worker pool). +func (s Snapshot) AssertReturnedToBaselineWithSlack(t testing.TB, timeout time.Duration, slack int) { + t.Helper() + deadline := time.Now().Add(timeout) + target := s.baseline + slack + for { + runtime.Gosched() + cur := runtime.NumGoroutine() + if cur <= target { + return + } + if time.Now().After(deadline) { + t.Errorf( + "goroutine leak: count=%d, baseline=%d, allowed=%d, timeout=%s\n%s", + cur, s.baseline, target, timeout, partialStackDump(), + ) + return + } + time.Sleep(25 * time.Millisecond) + } +} + +// partialStackDump returns the first ~8KB of runtime.Stack(_, true) — enough +// to identify the leaking goroutine family without dumping megabytes of +// stack data into the test log. +func partialStackDump() string { + buf := make([]byte, 8*1024) + n := runtime.Stack(buf, true) + return fmt.Sprintf("--- partial goroutine dump (%d bytes truncated to %d) ---\n%s", + n, len(buf), buf[:n]) +} diff --git a/internal/testsupport/goroutines/goroutines_test.go b/internal/testsupport/goroutines/goroutines_test.go new file mode 100644 index 00000000..6780e9cd --- /dev/null +++ b/internal/testsupport/goroutines/goroutines_test.go @@ -0,0 +1,65 @@ +package goroutines + +import ( + "sync" + "testing" + "time" +) + +// TestSnapshot_ReturnsToBaselineWhenNoLeak proves the happy path: spawn N +// goroutines, wait for them to finish, and assert the snapshot succeeds. +// Without this the leak-detection tests can't trust a passing run means +// "no leak" — it could just mean the helper is broken. +func TestSnapshot_ReturnsToBaselineWhenNoLeak(t *testing.T) { + t.Parallel() + snap := Take(t) + + var wg sync.WaitGroup + for i := 0; i < 20; i++ { + wg.Add(1) + go func() { + defer wg.Done() + time.Sleep(5 * time.Millisecond) + }() + } + wg.Wait() + + snap.AssertReturnedToBaseline(t, 2*time.Second) +} + +// TestSnapshot_DetectsLeak proves the failure path: a leaked goroutine +// must cause the assertion to fail. We use a sub-test with a custom +// testing.TB shim so we can observe the failure without failing the +// parent test. +func TestSnapshot_DetectsLeak(t *testing.T) { + t.Parallel() + snap := Take(t) + + // Spawn a leak intentionally — these will never exit during the test. + stop := make(chan struct{}) + defer close(stop) + for i := 0; i < 50; i++ { + go func() { + <-stop + }() + } + + shim := &recordingTB{realTB: t} + snap.AssertReturnedToBaselineWithSlack(shim, 300*time.Millisecond, DefaultSlack) + if !shim.failed { + t.Errorf("expected leak detection to call Errorf, but it did not") + } +} + +// recordingTB captures Errorf calls so we can assert the helper detected +// the leak without polluting the parent test's pass/fail state. +type recordingTB struct { + testing.TB + realTB testing.TB + failed bool +} + +func (r *recordingTB) Helper() {} +func (r *recordingTB) Errorf(format string, args ...any) { r.failed = true } +func (r *recordingTB) Fatalf(format string, args ...any) { r.failed = true; r.realTB.FailNow() } +func (r *recordingTB) Logf(format string, args ...any) { r.realTB.Logf(format, args...) } diff --git a/internal/testsupport/segments/segments.go b/internal/testsupport/segments/segments.go new file mode 100644 index 00000000..92ea775d --- /dev/null +++ b/internal/testsupport/segments/segments.go @@ -0,0 +1,87 @@ +// Package segments builds deterministic segment payloads and identifiers for +// use by streaming/connection tests. +// +// The fakepool client (internal/testsupport/fakepool) returns whatever bytes +// you configure for a given message-ID; this package supplies a consistent +// naming scheme and a recoverable payload format so that a test which +// downloads segments through the streaming pipeline can verify that the +// reassembled bytes match what was injected. +// +// # Naming +// +// MessageID(i) returns "altmount-test-seg-NNNNNN@fake" — six-digit zero-padded +// so lexicographic sort matches segment order. Tests pin this format so the +// fakepool behaviors and the segment-range builder agree without sharing a +// channel. +// +// # Payload shape +// +// Payload(i, size) returns a deterministic byte slice of the requested size. +// The first 32 bytes encode the segment index in ASCII (left-justified, NUL +// padded), so a hexdump of any cached buffer instantly shows which segment +// it came from. The remaining bytes are a repeating pattern derived from i, +// cheap to generate and stable across runs. +// +// # File reassembly +// +// FileBytes(n, size) returns the concatenation of Payload(0..n-1, size). +// Tests that read a virtual file end-to-end through the streaming pipeline +// can compare the bytes they receive to this slice to catch ordering or +// boundary bugs. +package segments + +import ( + "fmt" +) + +// MessageIDPrefix is the constant prefix used for all generated message-IDs. +// Exposed so fakepool setup loops can sanity-check inputs. +const MessageIDPrefix = "altmount-test-seg-" + +// MessageID returns the canonical fake message-ID for the i-th segment. +func MessageID(i int) string { + return fmt.Sprintf("%s%06d@fake", MessageIDPrefix, i) +} + +// MessageIDs returns the first n message-IDs as a convenience for callers +// that want to seed fakepool behaviors in bulk. +func MessageIDs(n int) []string { + out := make([]string, n) + for i := range out { + out[i] = MessageID(i) + } + return out +} + +// Payload returns a deterministic byte slice for segment i. Bytes 0..31 +// encode the index in ASCII; the remainder follow a stable cheap pattern. +// Length is always exactly size. A size <= 0 returns nil. +func Payload(i, size int) []byte { + if size <= 0 { + return nil + } + out := make([]byte, size) + header := fmt.Sprintf("seg-%06d", i) + copy(out, header) + // Fill remainder with a repeating byte derived from i. Using i directly + // (mod 251 — a prime under 256) gives every segment a unique fill byte + // for small i, which makes hexdumps trivially distinguishable. + fill := byte((i % 251) + 1) + for j := len(header); j < size; j++ { + out[j] = fill + } + return out +} + +// FileBytes returns the concatenation of Payload(0..n-1, size). The result +// is the expected output of a sequential read across n segments. +func FileBytes(n, size int) []byte { + if n <= 0 || size <= 0 { + return nil + } + out := make([]byte, 0, n*size) + for i := 0; i < n; i++ { + out = append(out, Payload(i, size)...) + } + return out +} diff --git a/internal/testsupport/segments/segments_test.go b/internal/testsupport/segments/segments_test.go new file mode 100644 index 00000000..da581325 --- /dev/null +++ b/internal/testsupport/segments/segments_test.go @@ -0,0 +1,67 @@ +package segments + +import ( + "bytes" + "strings" + "testing" +) + +// TestMessageID_FormatStableAndSortable pins the canonical form so other +// test packages can construct IDs without importing this helper if they +// prefer raw strings — the format is part of the public contract. +func TestMessageID_FormatStableAndSortable(t *testing.T) { + t.Parallel() + a := MessageID(7) + b := MessageID(42) + wantA := "altmount-test-seg-000007@fake" + wantB := "altmount-test-seg-000042@fake" + if a != wantA { + t.Errorf("MessageID(7) = %q, want %q", a, wantA) + } + if b != wantB { + t.Errorf("MessageID(42) = %q, want %q", b, wantB) + } + if a >= b { + t.Errorf("lexicographic order broken: %q >= %q", a, b) + } + if !strings.HasPrefix(a, MessageIDPrefix) { + t.Errorf("MessageID missing prefix %q", MessageIDPrefix) + } +} + +// TestPayload_HeaderEncodesIndex makes the diagnostic property explicit: +// the first bytes of every payload must identify its segment so test +// failures can be inspected by hexdump alone. +func TestPayload_HeaderEncodesIndex(t *testing.T) { + t.Parallel() + p := Payload(123, 100) + if len(p) != 100 { + t.Fatalf("len = %d, want 100", len(p)) + } + if !bytes.HasPrefix(p, []byte("seg-000123")) { + t.Errorf("payload header = %q, want prefix %q", p[:10], "seg-000123") + } +} + +// TestPayload_DeterministicAcrossCalls pins reproducibility — the storming +// tests rely on this so that re-running them with a different seed isn't +// possible by accident. +func TestPayload_DeterministicAcrossCalls(t *testing.T) { + t.Parallel() + p1 := Payload(5, 200) + p2 := Payload(5, 200) + if !bytes.Equal(p1, p2) { + t.Errorf("Payload(5, 200) is non-deterministic") + } +} + +// TestFileBytes_ConcatenatesInOrder catches the obvious off-by-one in the +// concatenator and serves as the reassembly oracle for end-to-end tests. +func TestFileBytes_ConcatenatesInOrder(t *testing.T) { + t.Parallel() + got := FileBytes(3, 50) + want := append(append(Payload(0, 50), Payload(1, 50)...), Payload(2, 50)...) + if !bytes.Equal(got, want) { + t.Errorf("FileBytes(3, 50) mismatch") + } +} diff --git a/internal/usenet/race_test.go b/internal/usenet/race_test.go index e2e49666..53900668 100644 --- a/internal/usenet/race_test.go +++ b/internal/usenet/race_test.go @@ -6,7 +6,7 @@ import ( "testing" "time" - "github.com/javi11/nntppool/v4" + "github.com/javi11/altmount/internal/pool" ) func TestUsenetReader_Race_Close_GetBufferedOffset(t *testing.T) { @@ -25,7 +25,7 @@ func TestUsenetReader_Race_Close_GetBufferedOffset(t *testing.T) { } // Mock pool getter that returns error (so we don't need real pool) - poolGetter := func() (*nntppool.Client, error) { + poolGetter := func() (pool.NntpClient, error) { return nil, fmt.Errorf("mock error") } diff --git a/internal/usenet/streamtest_helpers_test.go b/internal/usenet/streamtest_helpers_test.go new file mode 100644 index 00000000..a6a311fb --- /dev/null +++ b/internal/usenet/streamtest_helpers_test.go @@ -0,0 +1,73 @@ +package usenet + +import ( + "context" + "testing" + + "github.com/javi11/altmount/internal/pool" + "github.com/javi11/altmount/internal/testsupport/fakepool" + "github.com/javi11/altmount/internal/testsupport/segments" +) + +// streamtest_helpers_test.go is the test-only construction kit shared by all +// streaming/connection invariant tests in this package. Keeping it inside +// internal/usenet (rather than internal/testsupport) is deliberate: the +// segment and segmentRange types are unexported, and exposing them would +// invite production code to depend on the test shape. +// +// See STREAMING_INVARIANTS.md (in this directory) for the contract these tests pin. + +// noopMetrics is a MetricsTracker that satisfies the interface without any +// state. Used by tests that don't care about metrics observation — most do +// not, since the in-flight counter on the fake pool is the primary signal. +type noopMetrics struct{} + +func (noopMetrics) IncArticlesDownloaded() {} +func (noopMetrics) IncArticlesPosted() {} +func (noopMetrics) UpdateDownloadProgress(_ string, _ int64) {} + +// buildEagerRange creates a segmentRange backed by an in-memory slice of +// segments. Each segment has Id=segments.MessageID(i), spans [0, size-1] +// within itself, and reports SegmentSize=size. The range covers the +// concatenated byte stream of all segments. +// +// "Eager" means no SegmentLoader is attached, so the segments slice must +// already be populated — which matches the simplest path through +// downloadManager and avoids dragging metadata-layer concerns into a pure +// streaming test. +func buildEagerRange(ctx context.Context, t testing.TB, n, segSize int) *segmentRange { + t.Helper() + segs := make([]*segment, n) + for i := range segs { + segs[i] = newSegment(segments.MessageID(i), 0, int64(segSize-1), int64(segSize), nil) + } + return &segmentRange{ + segments: segs, + start: 0, + end: int64(n*segSize - 1), + ctx: ctx, + } +} + +// newReaderForTest constructs a UsenetReader wired to the supplied fake +// pool. The maxPrefetch parameter mirrors production semantics: it bounds +// how many segments the downloadManager may schedule ahead of the current +// read position. +func newReaderForTest(t testing.TB, ctx context.Context, fp *fakepool.Client, rg *segmentRange, maxPrefetch int) *UsenetReader { + t.Helper() + return newReaderForTestWithClient(t, ctx, fp, rg, maxPrefetch) +} + +// newReaderForTestWithClient is the lower-level constructor used when the +// test needs to inject a wrapper around the fake (e.g. a recording client +// that timestamps calls). The supplied client must satisfy pool.NntpClient. +func newReaderForTestWithClient(t testing.TB, ctx context.Context, cp pool.NntpClient, rg *segmentRange, maxPrefetch int) *UsenetReader { + t.Helper() + getter := func() (pool.NntpClient, error) { return cp, nil } + ur, err := NewUsenetReader(ctx, getter, rg, maxPrefetch, noopMetrics{}, "test-stream", nil) + if err != nil { + t.Fatalf("NewUsenetReader: %v", err) + } + t.Cleanup(func() { _ = ur.Close() }) + return ur +} diff --git a/internal/usenet/usenet_reader.go b/internal/usenet/usenet_reader.go index bbeeb22f..fd389cdd 100644 --- a/internal/usenet/usenet_reader.go +++ b/internal/usenet/usenet_reader.go @@ -12,6 +12,7 @@ import ( "time" "github.com/avast/retry-go/v4" + "github.com/javi11/altmount/internal/pool" "github.com/javi11/altmount/internal/slogutil" "github.com/javi11/nntppool/v4" ) @@ -62,7 +63,7 @@ type UsenetReader struct { initDownload sync.Once closeOnce sync.Once totalBytesRead int64 - poolGetter func() (*nntppool.Client, error) // Dynamic pool getter + poolGetter func() (pool.NntpClient, error) // Dynamic pool getter metricsTracker MetricsTracker streamID string segmentStore SegmentStore // optional, nil = no caching @@ -79,7 +80,7 @@ type UsenetReader struct { func NewUsenetReader( ctx context.Context, - poolGetter func() (*nntppool.Client, error), + poolGetter func() (pool.NntpClient, error), rg *segmentRange, maxPrefetch int, metricsTracker MetricsTracker, @@ -126,6 +127,23 @@ func (b *UsenetReader) Start() { }) } +// Interrupt cancels the reader's context and signals any blocked Read +// to return. Non-blocking and idempotent; safe to call concurrently +// with Read or Close. The caller is still responsible for invoking +// Close to release goroutines and resources. Used by callers (like +// MetadataVirtualFile.Close) that need to abort an in-flight download +// without taking the file's own lock. +func (b *UsenetReader) Interrupt() { + b.cancel() + b.cond.Broadcast() + b.mu.Lock() + rg := b.rg + b.mu.Unlock() + if rg != nil { + rg.CloseSegments() + } +} + func (b *UsenetReader) Close() error { b.closeOnce.Do(func() { b.cancel() @@ -381,18 +399,22 @@ func (b *UsenetReader) downloadSegmentWithRetry(ctx context.Context, seg *segmen return nil }, - // Fix A: differentiated retry strategy - // - ErrArticleNotFound: never retry (article is permanently gone) - // - DeadlineExceeded (timeout): retry immediately, no backoff — a fresh - // nntppool connection is available immediately via round-robin - // - Other errors: reduced attempts with fixed short delay - retry.Attempts(5), - retry.Delay(20*time.Millisecond), + // Retry strategy (post-S1/S3 fix): + // - ErrArticleNotFound: never retry (article is permanently gone). + // - DeadlineExceeded: retry immediately, no backoff — a fresh + // nntppool connection is available via round-robin. + // - Other errors: at most one retry (Attempts=2 total wire calls + // per failure), with exponential backoff + jitter to break + // thundering-herd synchronization across readers. Base=50ms, + // max jitter=100ms → first retry delay drawn from [50, 150]ms. + retry.Attempts(2), + retry.Delay(50*time.Millisecond), + retry.MaxJitter(100*time.Millisecond), retry.DelayType(func(n uint, err error, config *retry.Config) time.Duration { if errors.Is(err, context.DeadlineExceeded) { - return 0 // retry timeouts immediately + return 0 } - return retry.FixedDelay(n, err, config) + return retry.CombineDelay(retry.BackOffDelay, retry.RandomDelay)(n, err, config) }), retry.RetryIf(func(err error) bool { if errors.Is(err, nntppool.ErrArticleNotFound) { diff --git a/internal/usenet/usenet_reader_prefetch_test.go b/internal/usenet/usenet_reader_prefetch_test.go new file mode 100644 index 00000000..37aed953 --- /dev/null +++ b/internal/usenet/usenet_reader_prefetch_test.go @@ -0,0 +1,109 @@ +package usenet + +import ( + "context" + "io" + "testing" + "time" + + "github.com/javi11/altmount/internal/testsupport/fakepool" + "github.com/javi11/altmount/internal/testsupport/segments" +) + +// Package: invariant pinning for UsenetReader prefetch and retry behavior. +// +// Each test in this file documents a property that the streaming pipeline +// MUST honor for any combination of provider speed, segment count, and +// reader pace. The tests use a fake nntppool client whose in-flight counter +// records the high-water mark of concurrent BodyPriority calls. That single +// metric is the contract: production code that breaks any of these tests +// is regressing one of the connection-storm guarantees described in +// STREAMING_INVARIANTS.md (in this directory). +// +// These tests live in `package usenet` rather than an external _test +// package because segmentRange and segment are unexported; constructing +// them directly is the only way to exercise UsenetReader without dragging +// the metadata layer in. + +// TestPrefetch_RespectsMaxPrefetchUnderSteadyRead pins the per-reader +// invariant: with maxPrefetch=N, the downloadManager must never schedule +// more than N segments concurrently ahead of the current read position. +// +// Method: 50 segments, maxPrefetch=4, a slow provider (30ms per segment) +// and a fast reader. We let the reader drain everything, then assert the +// fake pool's MaxInFlight high-water mark stays <= maxPrefetch. +// +// This test should pass on current code — it documents existing correct +// behavior so future refactors of downloadManager don't regress it. +func TestPrefetch_RespectsMaxPrefetchUnderSteadyRead(t *testing.T) { + t.Parallel() + const ( + segCount = 50 + segSize = 32 + maxPrefetch = 4 + segLatency = 30 * time.Millisecond + ) + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + fp := fakepool.New() + for i := 0; i < segCount; i++ { + fp.SetBehavior(segments.MessageID(i), fakepool.SegmentBehavior{ + Latency: segLatency, + Bytes: segments.Payload(i, segSize), + }) + } + + rg := buildEagerRange(ctx, t, segCount, segSize) + ur := newReaderForTest(t, ctx, fp, rg, maxPrefetch) + ur.Start() + + if _, err := io.ReadAll(ur); err != nil { + t.Fatalf("ReadAll: %v", err) + } + + fakepool.AssertMaxInFlightLE(t, fp, int32(maxPrefetch)) + if got := fp.BodyPriorityCalls(); got != int64(segCount) { + t.Errorf("BodyPriorityCalls = %d, want %d (one per segment, no retries)", + got, segCount) + } +} + +// TestPrefetch_DoesNotExceedMaxPrefetchOnSlowPool pins the same invariant +// in the worst-case shape: a very slow provider (100ms) and a reader that +// blocks for nothing. If downloadManager were to schedule eagerly without +// honoring maxPrefetch when the pool itself is the bottleneck, MaxInFlight +// would rise above the cap — that is the storm condition. +// +// Should pass on current code; protects against a regression that +// "schedules more aggressively when downloads look slow". +func TestPrefetch_DoesNotExceedMaxPrefetchOnSlowPool(t *testing.T) { + t.Parallel() + const ( + segCount = 20 + segSize = 16 + maxPrefetch = 3 + segLatency = 100 * time.Millisecond + ) + + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + + fp := fakepool.New() + for i := 0; i < segCount; i++ { + fp.SetBehavior(segments.MessageID(i), fakepool.SegmentBehavior{ + Latency: segLatency, + Bytes: segments.Payload(i, segSize), + }) + } + + rg := buildEagerRange(ctx, t, segCount, segSize) + ur := newReaderForTest(t, ctx, fp, rg, maxPrefetch) + ur.Start() + + if _, err := io.ReadAll(ur); err != nil { + t.Fatalf("ReadAll: %v", err) + } + fakepool.AssertMaxInFlightLE(t, fp, int32(maxPrefetch)) +} diff --git a/internal/usenet/usenet_reader_retry_test.go b/internal/usenet/usenet_reader_retry_test.go new file mode 100644 index 00000000..ec6376ea --- /dev/null +++ b/internal/usenet/usenet_reader_retry_test.go @@ -0,0 +1,128 @@ +package usenet + +import ( + "context" + "errors" + "io" + "testing" + "time" + + "github.com/javi11/altmount/internal/testsupport/fakepool" + "github.com/javi11/altmount/internal/testsupport/segments" + "github.com/javi11/nntppool/v4" +) + +// usenet_reader_retry_test.go pins the retry-policy invariants for the +// segment download path. See STREAMING_INVARIANTS.md (in this directory). + +// TestRetry_ArticleNotFound_NoRetry pins the existing fast-fail policy: +// nntppool.ErrArticleNotFound is a permanent failure and MUST NOT trigger +// a retry. Retrying a missing article wastes provider connections for an +// answer that will never change, and is a measurable contributor to +// connection-storm conditions when whole batches of articles have expired. +// +// The downloadSegmentWithRetry path uses retry.RetryIf to short-circuit +// this error class; this test pins that exactly one BodyPriority call is +// made even though retry.Attempts is 5. +// +// Should pass on current code. +func TestRetry_ArticleNotFound_NoRetry(t *testing.T) { + t.Parallel() + const ( + segCount = 4 + segSize = 16 + maxPrefetch = 4 + ) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + fp := fakepool.New() + // First segment is permanently missing; subsequent segments succeed — + // but we expect the reader to short-circuit on the first failure with + // DataCorruptionError, so they may not be requested at all. + fp.SetBehavior(segments.MessageID(0), fakepool.SegmentBehavior{ + Err: nntppool.ErrArticleNotFound, + }) + for i := 1; i < segCount; i++ { + fp.SetBehavior(segments.MessageID(i), fakepool.SegmentBehavior{ + Bytes: segments.Payload(i, segSize), + }) + } + + rg := buildEagerRange(ctx, t, segCount, segSize) + ur := newReaderForTest(t, ctx, fp, rg, maxPrefetch) + ur.Start() + + _, err := io.ReadAll(ur) + // We don't care about the exact error class here — the invariant is + // purely on call count. + _ = err + + // Wait briefly for any straggling prefetch goroutines (started for + // segments 1..N before segment 0's error short-circuited the reader). + time.Sleep(100 * time.Millisecond) + + // segment 0 must have been requested exactly once: the retry policy + // must NOT have re-issued the BodyPriority call. + if got := fp.PerMessageCalls(segments.MessageID(0)); got != 1 { + t.Errorf("segment 0 issued %d BodyPriority calls, want exactly 1 (no retry on ErrArticleNotFound)", got) + } +} + +// TestRetry_ContextCancellation_StopsImmediately pins another half of the +// retry contract: when the reader's context is cancelled mid-flight, any +// pending retry loop MUST honor cancellation and stop issuing new +// BodyPriority calls. +// +// Without this guarantee, closing a stream during a flaky-provider window +// would let the retry loop keep firing requests even though the consumer +// is gone — another way connection counts spike. +// +// Should pass on current code (retry-go honors ctx via retry.Context). +func TestRetry_ContextCancellation_StopsImmediately(t *testing.T) { + t.Parallel() + const ( + segCount = 3 + segSize = 16 + maxPrefetch = 3 + ) + parent, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + fp := fakepool.New() + // Every call fails with a retryable error, with 50ms latency. The + // retry loop should keep trying until we cancel. + fp.SetDefaultBehavior(fakepool.SegmentBehavior{ + Latency: 50 * time.Millisecond, + Err: errors.New("synthetic transient error"), + }) + + rg := buildEagerRange(parent, t, segCount, segSize) + ur := newReaderForTest(t, parent, fp, rg, maxPrefetch) + ur.Start() + + // Let the retry loop spin a few times. + time.Sleep(150 * time.Millisecond) + + beforeClose := fp.BodyPriorityCalls() + if beforeClose == 0 { + t.Fatalf("expected at least one BodyPriority call before close; got 0") + } + + if err := ur.Close(); err != nil { + t.Fatalf("Close: %v", err) + } + + // Give any in-flight call up to 200ms to finish naturally, then snapshot. + time.Sleep(200 * time.Millisecond) + settled := fp.BodyPriorityCalls() + + // After settling, no further calls should be issued. + time.Sleep(200 * time.Millisecond) + after := fp.BodyPriorityCalls() + + if after != settled { + t.Errorf("BodyPriority calls increased after Close: settled=%d, after=%d", + settled, after) + } +} diff --git a/internal/usenet/usenet_reader_sequential_test.go b/internal/usenet/usenet_reader_sequential_test.go new file mode 100644 index 00000000..41526dfc --- /dev/null +++ b/internal/usenet/usenet_reader_sequential_test.go @@ -0,0 +1,65 @@ +package usenet + +import ( + "bytes" + "context" + "io" + "testing" + "time" + + "github.com/javi11/altmount/internal/testsupport/fakepool" + "github.com/javi11/altmount/internal/testsupport/segments" +) + +// TestSequentialRead_OneRequestPerSegment pins the simplest correctness +// contract for the streaming pipeline: reading a file end-to-end issues +// exactly one BodyPriority per segment, and the reassembled bytes match +// what the fake pool returned. +// +// This is the moral equivalent of "sequential ReadAt reuses the shared +// reader" at the MetadataVirtualFile layer: any code path that double- +// fetches a segment, or that creates two readers for overlapping regions +// of the same stream, will trip both the per-message-count assertion and +// the byte-equality assertion below. Pinning this here keeps the +// invariant testable without dragging the metadata layer into a unit +// test. +// +// Should pass on current code. +func TestSequentialRead_OneRequestPerSegment(t *testing.T) { + t.Parallel() + const ( + segCount = 8 + segSize = 128 + maxPrefetch = 4 + ) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + fp := fakepool.New() + for i := 0; i < segCount; i++ { + fp.SetBehavior(segments.MessageID(i), fakepool.SegmentBehavior{ + Bytes: segments.Payload(i, segSize), + }) + } + + rg := buildEagerRange(ctx, t, segCount, segSize) + ur := newReaderForTest(t, ctx, fp, rg, maxPrefetch) + ur.Start() + + got, err := io.ReadAll(ur) + if err != nil { + t.Fatalf("ReadAll: %v", err) + } + + want := segments.FileBytes(segCount, segSize) + if !bytes.Equal(got, want) { + t.Errorf("reassembled bytes do not match expected payload (len got=%d, want=%d)", + len(got), len(want)) + } + + for i := 0; i < segCount; i++ { + if c := fp.PerMessageCalls(segments.MessageID(i)); c != 1 { + t.Errorf("segment %d: %d BodyPriority calls, want exactly 1", i, c) + } + } +} diff --git a/internal/usenet/usenet_reader_storm_test.go b/internal/usenet/usenet_reader_storm_test.go new file mode 100644 index 00000000..aa40880e --- /dev/null +++ b/internal/usenet/usenet_reader_storm_test.go @@ -0,0 +1,181 @@ +package usenet + +import ( + "context" + "errors" + "io" + "math" + "sync" + "testing" + "time" + + "github.com/javi11/altmount/internal/testsupport/fakepool" + "github.com/javi11/altmount/internal/testsupport/segments" + "github.com/javi11/nntppool/v4" +) + +// usenet_reader_storm_test.go documents connection-storm conditions the +// production code currently exhibits. Each test reproduces the storm with +// concrete assertions, so: +// +// 1. CI runs them today and proves the storm exists. +// 2. When a fix lands, the assertion at the bottom of each test fails +// (because the bad behavior no longer happens). The fix author then +// INVERTS the assertion (changes "current bad" → "new invariant") in +// the same PR. The test then guards the fix. +// 3. The CURRENT-BEHAVIOR and TARGET-INVARIANT bands in each test +// constitute the contract: a future contributor sees both numbers +// side-by-side and knows what's allowed. +// +// See STREAMING_INVARIANTS.md (in this directory) for the high-level +// contract these tests pin. + +// TestStorm_RetryAmplifiesPerMessageCallCount pins the post-S1 +// invariant: a permanently failing (non-ArticleNotFound) segment must +// produce at most 2 wire calls — one initial attempt plus one bounded +// retry. With the previous retry.Attempts(5) policy each failing +// segment caused 5x wire amplification under a flaky provider. +func TestStorm_RetryAmplifiesPerMessageCallCount(t *testing.T) { + t.Parallel() + const segSize = 16 + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + fp := fakepool.New() + failingID := segments.MessageID(0) + fp.SetBehavior(failingID, fakepool.SegmentBehavior{ + Latency: 5 * time.Millisecond, + Err: errors.New("synthetic transient error"), + }) + + rg := buildEagerRange(ctx, t, 1, segSize) + ur := newReaderForTest(t, ctx, fp, rg, 1) + ur.Start() + _, _ = io.ReadAll(ur) + + calls := fp.PerMessageCalls(failingID) + + // PINNED INVARIANT: at most 2 wire calls per failing segment. + const maxCallsPerFailure = 2 + if calls > maxCallsPerFailure { + t.Errorf("INVARIANT regression (S1): PerMessageCalls=%d, want <= %d. "+ + "retry.Attempts must stay at 2 for non-ArticleNotFound errors.", + calls, maxCallsPerFailure) + } +} + +// TestStorm_RetryUsesFixedDelayInsteadOfExponentialBackoff pins the +// post-S3 invariant: inter-attempt delays carry jitter, so multiple +// readers retrying simultaneously desynchronize. With Attempts=2 each +// failing segment contributes exactly one inter-attempt delta; we run +// many failing segments (each in its own reader, independent random +// draws) and assert the coefficient of variation across those deltas +// is meaningfully non-zero. +func TestStorm_RetryUsesFixedDelayInsteadOfExponentialBackoff(t *testing.T) { + t.Parallel() + const ( + segSize = 16 + samples = 30 // independent failing segments + ) + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + var ( + mu sync.Mutex + arrivals = make(map[string][]time.Time, samples) + ) + fp := fakepool.New() + rec := &multiRecordingClient{Client: fp, arrivals: arrivals, mu: &mu} + + for i := 0; i < samples; i++ { + fp.SetBehavior(segments.MessageID(i), fakepool.SegmentBehavior{ + Latency: 1 * time.Millisecond, + Err: errors.New("synthetic transient error"), + }) + } + + var wg sync.WaitGroup + for i := 0; i < samples; i++ { + wg.Add(1) + go func(idx int) { + defer wg.Done() + // One segment per reader, distinct message-ID per reader, + // so each contributes one independent inter-attempt delta. + segs := []*segment{newSegment(segments.MessageID(idx), 0, int64(segSize-1), int64(segSize), nil)} + rg := &segmentRange{segments: segs, start: 0, end: int64(segSize - 1), ctx: ctx} + ur := newReaderForTestWithClient(t, ctx, rec, rg, 1) + ur.Start() + _, _ = io.ReadAll(ur) + }(i) + } + wg.Wait() + + mu.Lock() + defer mu.Unlock() + deltas := make([]float64, 0, samples) + for _, times := range arrivals { + if len(times) < 2 { + continue + } + // One delta per segment (Attempts=2 → exactly 2 timestamps). + deltas = append(deltas, float64(times[1].Sub(times[0])/time.Millisecond)) + } + if len(deltas) < samples/2 { + t.Fatalf("recorded only %d usable deltas across %d segments", len(deltas), samples) + } + + mean, stdev := meanStdev(deltas) + cv := 0.0 + if mean > 0 { + cv = stdev / mean + } + t.Logf("retry deltas across %d segments: mean=%.2fms stdev=%.2fms cv=%.3f", + len(deltas), mean, stdev, cv) + + // PINNED INVARIANT: cv > 0.1 means deltas vary meaningfully across + // independent retriers — the jitter is doing its job. The lower + // bound on the floor is loose to absorb scheduler noise on shared CI. + const minCV = 0.1 + if cv < minCV { + t.Errorf("INVARIANT regression (S3): retry-delay cv=%.3f, want > %.3f. "+ + "retry.MaxJitter and retry.RandomDelay must remain in the DelayType.", + cv, minCV) + } +} + +// --- helpers --- + +// multiRecordingClient wraps a *fakepool.Client and timestamps every +// BodyPriority call, bucketed by message-ID. Lets the jitter test +// observe inter-attempt timing per segment without extending the +// fakepool public API. +type multiRecordingClient struct { + *fakepool.Client + arrivals map[string][]time.Time + mu *sync.Mutex +} + +func (r *multiRecordingClient) BodyPriority(ctx context.Context, messageID string, onMeta ...func(nntppool.YEncMeta)) (*nntppool.ArticleBody, error) { + r.mu.Lock() + r.arrivals[messageID] = append(r.arrivals[messageID], time.Now()) + r.mu.Unlock() + return r.Client.BodyPriority(ctx, messageID, onMeta...) +} + +func meanStdev(xs []float64) (float64, float64) { + if len(xs) == 0 { + return 0, 0 + } + var sum float64 + for _, x := range xs { + sum += x + } + mean := sum / float64(len(xs)) + var sq float64 + for _, x := range xs { + d := x - mean + sq += d * d + } + stdev := math.Sqrt(sq / float64(len(xs))) + return mean, stdev +}