Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 51 additions & 35 deletions internal/api/provider_speedtest_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package api

import (
"context"
"crypto/tls"
"fmt"
"log/slog"
"time"

Expand Down Expand Up @@ -53,45 +51,16 @@ func (s *Server) handleTestProviderSpeed(c *fiber.Ctx) error {
return RespondNotFound(c, "Provider", "")
}

host := fmt.Sprintf("%s:%d", targetProvider.Host, targetProvider.Port)
var tlsCfg *tls.Config
if targetProvider.TLS {
tlsCfg = &tls.Config{
InsecureSkipVerify: targetProvider.InsecureTLS,
ServerName: targetProvider.Host,
}
}

pool, err := nntppool.NewClient(c.Context(), []nntppool.Provider{
{
Host: host,
TLSConfig: tlsCfg,
Auth: nntppool.Auth{Username: targetProvider.Username, Password: targetProvider.Password},
Connections: targetProvider.MaxConnections,
IdleTimeout: 60 * time.Second,
},
})
if err != nil {
return RespondInternalError(c, "Failed to create connection pool", err.Error())
}
defer pool.Close()

// Resolve provider name matching nntppool's resolveProviderName logic
providerName := host
if targetProvider.Username != "" {
providerName = host + "+" + targetProvider.Username
}

testCtx, cancel := context.WithTimeout(c.Context(), 5*time.Minute)
defer cancel()

result, err := pool.SpeedTest(testCtx, nntppool.SpeedTestOptions{
ProviderName: providerName,
})
// Prefer the production pool when this provider is already part of
// it; otherwise fall back to the singleton coordinator so we never
// create a fresh nntppool.Client per request.
result, err := s.runProviderSpeedTest(testCtx, targetProvider)
if err != nil {
return RespondInternalError(c, "Speed test failed", err.Error())
}

speed := result.WireSpeedBps / 1024 / 1024 // bytes/sec → MB/s

// Update provider config with speed test result
Expand Down Expand Up @@ -126,3 +95,50 @@ func (s *Server) handleTestProviderSpeed(c *fiber.Ctx) error {
Duration: result.Elapsed.Seconds(),
})
}

// runProviderSpeedTest executes the speed test against the given
// provider, preferring the production pool when the provider is part
// of it. Falls back to the per-request speedtest coordinator (cached
// nntppool client + singleflight dedupe) when the provider isn't in
// the active pool.
func (s *Server) runProviderSpeedTest(ctx context.Context, p *config.ProviderConfig) (*nntppool.SpeedTestResult, error) {
// Always consult the pool manager so a provider already in the
// running pool reuses its connections rather than dialing fresh.
// pool.Manager is required wiring; in tests it may return nil/err.
if s.poolManager != nil {
if cp, err := s.poolManager.GetPool(); err == nil && cp != nil {
if real, ok := cp.(*nntppool.Client); ok {
providerName := p.Host
if p.Username != "" {
providerName = p.Host + "+" + p.Username
}
// Try the production pool first. If the provider isn't
// in it, nntppool returns an error and we fall through
// to the coordinator.
if result, sterr := real.SpeedTest(ctx, nntppool.SpeedTestOptions{
ProviderName: providerName,
}); sterr == nil {
return result, nil
}
}
}
}

// Fall back to the dedicated speed-test client (cached + dedupe).
// Lazy-construct the coordinator if Server was assembled outside
// NewServer (e.g. in tests). speedtestOnce makes this safe under
// concurrent calls.
s.speedtestOnce.Do(func() {
if s.speedtest == nil {
s.speedtest = newSpeedtestCoordinator()
}
})
v, err := s.speedtest.run(ctx, p, func(client *nntppool.Client, providerName string) (any, error) {
return client.SpeedTest(ctx, nntppool.SpeedTestOptions{ProviderName: providerName})
})
if err != nil {
return nil, err
}
result, _ := v.(*nntppool.SpeedTestResult)
return result, nil
}
134 changes: 134 additions & 0 deletions internal/api/provider_speedtest_storm_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package api

import (
"context"
"net/http/httptest"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/gofiber/fiber/v2"
"github.com/javi11/altmount/internal/config"
"github.com/javi11/altmount/internal/pool"
"github.com/javi11/nntppool/v4"
)

// provider_speedtest_storm_test.go reproduces the speed-test handler
// storm: every invocation calls nntppool.NewClient directly, bypassing
// pool.Manager entirely. The storm shape is structural — the handler
// doesn't even reach through the seam tests can intercept.

// countingPoolManager is a pool.Manager that records every call to its
// connection-acquiring methods. The S8 test asserts these counters
// remain zero despite N HTTP requests, proving the handler bypasses
// pool.Manager.
type countingPoolManager struct {
getPoolCalls atomic.Int64
hasPoolCalls atomic.Int64
}

var _ pool.Manager = (*countingPoolManager)(nil)

func (m *countingPoolManager) GetPool() (pool.NntpClient, error) {
m.getPoolCalls.Add(1)
return nil, nil
}
func (m *countingPoolManager) HasPool() bool { m.hasPoolCalls.Add(1); return false }
func (m *countingPoolManager) SetProviders(_ []nntppool.Provider) error { return nil }
func (m *countingPoolManager) ClearPool() error { return nil }
func (m *countingPoolManager) GetMetrics() (pool.MetricsSnapshot, error) { return pool.MetricsSnapshot{}, nil }
func (m *countingPoolManager) ResetMetrics(_ context.Context, _, _ bool) error {
return nil
}
func (m *countingPoolManager) ResetProviderErrors(_ context.Context) error { return nil }
func (m *countingPoolManager) IncArticlesDownloaded() {}
func (m *countingPoolManager) UpdateDownloadProgress(_ string, _ int64) {}
func (m *countingPoolManager) IncArticlesPosted() {}
func (m *countingPoolManager) AddProvider(_ nntppool.Provider) error { return nil }
func (m *countingPoolManager) RemoveProvider(_ string) error { return nil }
func (m *countingPoolManager) ResetProviderQuota(_ context.Context, _ string) error {
return nil
}
func (m *countingPoolManager) SetProviderIDs(_ map[string]string) {}
func (m *countingPoolManager) AcquireImportSlot(_ context.Context) (func(), error) {
return func() {}, nil
}
func (m *countingPoolManager) SetAdmissionCaps(_ int, _ int) {}
func (m *countingPoolManager) SetStreamSource(_ pool.StreamActivitySource) {}
func (m *countingPoolManager) NotifyStreamChange() {}

// TestStorm_SpeedTestBypassesPoolManager pins the post-S8 invariant:
// the /providers/:id/speedtest handler routes through pool.Manager
// (and the singleton speedtestCoordinator for non-pool providers)
// rather than creating a fresh nntppool.Client per request. The test
// asserts the structural property: poolManager.GetPool MUST be called
// at least once per request (the prerequisite for reusing the
// production pool), so the application has a chance to dedupe and
// gate concurrent traffic.
//
// Driving the handler against an unreachable host (127.0.0.1 port 1)
// makes the underlying speed test fail fast; the structural assertion
// runs regardless of the outcome.
func TestStorm_SpeedTestBypassesPoolManager(t *testing.T) {
t.Parallel()
const concurrentRequests = 5

enabled := true
cfg := config.DefaultConfig()
cfg.Providers = []config.ProviderConfig{
{
ID: "storm-test-provider",
Host: "127.0.0.1",
Port: 1, // closed port: nntppool's initial ping fails immediately
Username: "user",
Password: "pass",
MaxConnections: 1, // keep resource use minimal
Enabled: &enabled,
},
}

cm := &mockConfigManager{cfg: cfg}
cpm := &countingPoolManager{}
server := &Server{configManager: cm, poolManager: cpm}

app := fiber.New()
app.Post("/api/config/providers/:id/speedtest", server.handleTestProviderSpeed)

// Fire N concurrent requests. Each will fail (closed port) but the
// structural bypass is what we're measuring, not the outcome.
var wg sync.WaitGroup
start := time.Now()
for i := 0; i < concurrentRequests; i++ {
wg.Add(1)
go func() {
defer wg.Done()
req := httptest.NewRequest("POST", "/api/config/providers/storm-test-provider/speedtest", nil)
// Tight timeout — we don't care about the response, only the
// fact that the handler took the bypass path.
resp, err := app.Test(req, 3000)
if err == nil && resp.Body != nil {
_ = resp.Body.Close()
}
}()
}
wg.Wait()
elapsed := time.Since(start)

getPoolCalls := cpm.getPoolCalls.Load()
t.Logf("%d concurrent speedtest requests completed in %s; "+
"poolManager.GetPool calls=%d (invariant: > 0 per request)",
concurrentRequests, elapsed, getPoolCalls)

// PINNED INVARIANT: handler MUST consult pool.Manager (so it can
// reuse the production pool for already-running providers and the
// singleton speedtestCoordinator for everything else). Without
// this, every request creates a fresh nntppool.Client and dials
// independently.
if getPoolCalls < 1 {
t.Errorf("INVARIANT regression (S8): poolManager.GetPool calls=%d, want >= 1 "+
"after %d concurrent speedtest requests. handleTestProviderSpeed must "+
"route through s.poolManager rather than calling nntppool.NewClient inline.",
getPoolCalls, concurrentRequests)
}
}
8 changes: 8 additions & 0 deletions internal/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"os"
"runtime"
"strings"
"sync"
"sync/atomic"
"time"

Expand Down Expand Up @@ -67,6 +68,9 @@ type Server struct {
migrationRepo *database.ImportMigrationRepository
updater updater.Updater
ready atomic.Bool

speedtest *speedtestCoordinator
speedtestOnce sync.Once
}

// NewServer creates a new API server that can optionally register routes on the provided mux (for backwards compatibility)
Expand Down Expand Up @@ -110,6 +114,7 @@ func NewServer(
progressBroadcaster: progressBroadcaster,
streamTracker: streamTracker,
cacheSource: cacheSource,
speedtest: newSpeedtestCoordinator(),
fuseManager: NewFuseManager(newMountFactory(nzbFilesystem, configManager, streamTracker)),
updater: updater.Default(),
}
Expand Down Expand Up @@ -397,6 +402,9 @@ func (s *Server) Shutdown(ctx context.Context) {
if s.fuseManager != nil {
s.fuseManager.Stop()
}
if s.speedtest != nil {
s.speedtest.shutdown()
}
}

// handleGetActiveStreams handles GET /api/files/active-streams
Expand Down
Loading
Loading