diff --git a/backend/Clients/Usenet/INntpClient.cs b/backend/Clients/Usenet/INntpClient.cs index ece2cdf9..e049c428 100644 --- a/backend/Clients/Usenet/INntpClient.cs +++ b/backend/Clients/Usenet/INntpClient.cs @@ -59,7 +59,7 @@ NzbFileStream GetFileStream( NzbFile nzbFile, long fileSize, int articleBufferSize); NzbFileStream GetFileStream( - string[] segmentIds, long fileSize, int articleBufferSize); + string[] segmentIds, long fileSize, int articleBufferSize, long? requestedEndByte = null); Task CheckAllSegmentsAsync( IEnumerable segmentIds, int concurrency, IProgress? progress, CancellationToken cancellationToken); diff --git a/backend/Clients/Usenet/NntpClient.cs b/backend/Clients/Usenet/NntpClient.cs index 356ea284..80a64b55 100644 --- a/backend/Clients/Usenet/NntpClient.cs +++ b/backend/Clients/Usenet/NntpClient.cs @@ -100,9 +100,10 @@ public virtual NzbFileStream GetFileStream(NzbFile nzbFile, long fileSize, int a return new NzbFileStream(nzbFile.GetSegmentIds(), fileSize, this, articleBufferSize); } - public virtual NzbFileStream GetFileStream(string[] segmentIds, long fileSize, int articleBufferSize) + public virtual NzbFileStream GetFileStream( + string[] segmentIds, long fileSize, int articleBufferSize, long? requestedEndByte = null) { - return new NzbFileStream(segmentIds, fileSize, this, articleBufferSize); + return new NzbFileStream(segmentIds, fileSize, this, articleBufferSize, requestedEndByte); } public virtual async Task CheckAllSegmentsAsync diff --git a/backend/Clients/Usenet/UsenetStreamingClient.cs b/backend/Clients/Usenet/UsenetStreamingClient.cs index dc9752f3..b0b62af6 100644 --- a/backend/Clients/Usenet/UsenetStreamingClient.cs +++ b/backend/Clients/Usenet/UsenetStreamingClient.cs @@ -39,10 +39,12 @@ WebsocketManager websocketManager { var providerConfig = configManager.GetUsenetProviderConfig(); var connectionPoolStats = new ConnectionPoolStats(providerConfig, websocketManager); + var idleTimeout = TimeSpan.FromSeconds(configManager.GetConnectionIdleTimeoutSeconds()); var providerClients = providerConfig.Providers .Select((provider, index) => CreateProviderClient( provider, - connectionPoolStats.GetOnConnectionPoolChanged(index) + connectionPoolStats.GetOnConnectionPoolChanged(index), + idleTimeout )) .ToList(); return new MultiProviderNntpClient(providerClients); @@ -51,13 +53,15 @@ WebsocketManager websocketManager private static MultiConnectionNntpClient CreateProviderClient ( UsenetProviderConfig.ConnectionDetails connectionDetails, - EventHandler onConnectionPoolChanged + EventHandler onConnectionPoolChanged, + TimeSpan idleTimeout ) { var connectionPool = CreateNewConnectionPool( maxConnections: connectionDetails.MaxConnections, connectionFactory: ct => CreateNewConnection(connectionDetails, ct), - onConnectionPoolChanged + onConnectionPoolChanged, + idleTimeout ); var circuitBreaker = new ProviderCircuitBreaker(connectionDetails.Host); return new MultiConnectionNntpClient(connectionPool, connectionDetails.Type, circuitBreaker); @@ -67,10 +71,11 @@ private static ConnectionPool CreateNewConnectionPool ( int maxConnections, Func> connectionFactory, - EventHandler onConnectionPoolChanged + EventHandler onConnectionPoolChanged, + TimeSpan idleTimeout ) { - var connectionPool = new ConnectionPool(maxConnections, connectionFactory); + var connectionPool = new ConnectionPool(maxConnections, connectionFactory, idleTimeout); connectionPool.OnConnectionPoolChanged += onConnectionPoolChanged; var args = new ConnectionPoolStats.ConnectionPoolChangedEventArgs(0, 0, maxConnections); onConnectionPoolChanged(connectionPool, args); diff --git a/backend/Config/ConfigManager.cs b/backend/Config/ConfigManager.cs index 7790da7b..f0292fa4 100644 --- a/backend/Config/ConfigManager.cs +++ b/backend/Config/ConfigManager.cs @@ -154,6 +154,14 @@ public SemaphorePriorityOdds GetStreamingPriority() return new SemaphorePriorityOdds() { HighPriorityOdds = numericalValue }; } + public int GetConnectionIdleTimeoutSeconds() + { + return int.Parse( + StringUtil.EmptyToNull(GetConfigValue("usenet.connection-idle-timeout-seconds")) + ?? "300" + ); + } + public bool IsEnforceReadonlyWebdavEnabled() { var defaultValue = true; diff --git a/backend/Streams/MultiSegmentStream.cs b/backend/Streams/MultiSegmentStream.cs index 1d7b56dc..62cf6cc6 100644 --- a/backend/Streams/MultiSegmentStream.cs +++ b/backend/Streams/MultiSegmentStream.cs @@ -1,4 +1,4 @@ -using System.Threading.Channels; +using System.Threading.Channels; using NzbWebDAV.Clients.Usenet; using NzbWebDAV.Clients.Usenet.Concurrency; using NzbWebDAV.Clients.Usenet.Contexts; @@ -13,6 +13,7 @@ public class MultiSegmentStream : FastReadOnlyNonSeekableStream private readonly INntpClient _usenetClient; private readonly Channel> _streamTasks; private readonly ContextualCancellationTokenSource _cts; + private readonly int? _endSegmentCount; private Stream? _stream; private bool _disposed; @@ -21,12 +22,13 @@ public static Stream Create Memory segmentIds, INntpClient usenetClient, int articleBufferSize, - CancellationToken cancellationToken + CancellationToken cancellationToken, + int? endSegmentCount = null ) { return articleBufferSize == 0 ? new UnbufferedMultiSegmentStream(segmentIds, usenetClient) - : new MultiSegmentStream(segmentIds, usenetClient, articleBufferSize, cancellationToken); + : new MultiSegmentStream(segmentIds, usenetClient, articleBufferSize, cancellationToken, endSegmentCount); } private MultiSegmentStream @@ -34,21 +36,26 @@ private MultiSegmentStream Memory segmentIds, INntpClient usenetClient, int articleBufferSize, - CancellationToken cancellationToken + CancellationToken cancellationToken, + int? endSegmentCount = null ) { _segmentIds = segmentIds; _usenetClient = usenetClient; _streamTasks = Channel.CreateBounded>(articleBufferSize); _cts = ContextualCancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + _endSegmentCount = endSegmentCount; _ = DownloadSegments(_cts.Token); } private async Task DownloadSegments(CancellationToken cancellationToken) { + var effectiveCount = _endSegmentCount.HasValue + ? Math.Min(_segmentIds.Length, _endSegmentCount.Value) + : _segmentIds.Length; try { - for (var i = 0; i < _segmentIds.Length; i++) + for (var i = 0; i < effectiveCount; i++) { var segmentId = _segmentIds.Span[i]; @@ -131,4 +138,4 @@ protected override void Dispose(bool disposing) base.Dispose(); } -} \ No newline at end of file +} diff --git a/backend/Streams/NzbFileStream.cs b/backend/Streams/NzbFileStream.cs index 91f016ef..3414208f 100644 --- a/backend/Streams/NzbFileStream.cs +++ b/backend/Streams/NzbFileStream.cs @@ -1,7 +1,8 @@ -using NzbWebDAV.Clients.Usenet; +using NzbWebDAV.Clients.Usenet; using NzbWebDAV.Extensions; using NzbWebDAV.Models; using NzbWebDAV.Utils; +using Serilog; using UsenetSharp.Streams; namespace NzbWebDAV.Streams; @@ -10,9 +11,13 @@ public class NzbFileStream( string[] fileSegmentIds, long fileSize, INntpClient usenetClient, - int articleBufferSize + int articleBufferSize, + long? requestedEndByte = null ) : FastReadOnlyStream { + // Extra segments fetched past the requested end to cover seek imprecision. + private const int RangePrefetchOvershootSegments = 4; + private long _position; private bool _disposed; private Stream? _innerStream; @@ -80,7 +85,35 @@ await stream.DiscardBytesAsync(rangeStart - foundSegment.FoundByteRange.StartInc private Stream GetMultiSegmentStream(int firstSegmentIndex, CancellationToken cancellationToken) { var segmentIds = fileSegmentIds.AsMemory()[firstSegmentIndex..]; - return MultiSegmentStream.Create(segmentIds, usenetClient, articleBufferSize, cancellationToken); + var endSegmentCount = ComputeEndSegmentCount(firstSegmentIndex, segmentIds.Length); + if (endSegmentCount.HasValue) + { + Log.Debug( + "Range-bounded prefetch: requestedEndByte={EndByte}, firstSegmentIndex={First}, " + + "endSegmentCount={Count} (of {Remaining} remaining segments)", + requestedEndByte, firstSegmentIndex, endSegmentCount.Value, segmentIds.Length); + } + return MultiSegmentStream.Create( + segmentIds, usenetClient, articleBufferSize, cancellationToken, endSegmentCount); + } + + // Returns segment count covering requestedEndByte, or null when no cap is needed. + private int? ComputeEndSegmentCount(int firstSegmentIndex, int remainingSegmentCount) + { + if (!requestedEndByte.HasValue) return null; + if (fileSegmentIds.Length == 0 || remainingSegmentCount <= 0) return null; + + var endByte = Math.Clamp(requestedEndByte.Value, 0, fileSize - 1); + var avgSegmentSize = (double)fileSize / fileSegmentIds.Length; + if (avgSegmentSize <= 0) return null; + + var absoluteEndIndex = Math.Clamp( + (int)(endByte / avgSegmentSize), 0, fileSegmentIds.Length - 1); + var withOvershoot = absoluteEndIndex + RangePrefetchOvershootSegments; + var relativeCount = withOvershoot - firstSegmentIndex + 1; + if (relativeCount <= 0) return 0; + if (relativeCount >= remainingSegmentCount) return null; + return relativeCount; } protected override void Dispose(bool disposing) @@ -97,4 +130,4 @@ public override async ValueTask DisposeAsync() _disposed = true; GC.SuppressFinalize(this); } -} \ No newline at end of file +} diff --git a/backend/WebDav/Base/GetAndHeadHandlerPatch.cs b/backend/WebDav/Base/GetAndHeadHandlerPatch.cs index d847cc77..0560798d 100644 --- a/backend/WebDav/Base/GetAndHeadHandlerPatch.cs +++ b/backend/WebDav/Base/GetAndHeadHandlerPatch.cs @@ -48,6 +48,12 @@ public async Task HandleRequestAsync(HttpContext httpContext) // Determine the requested range var range = request.GetRange(); + // Forward closed-range end byte to downstream prefetch capping. + if (range?.End is long requestedRangeEnd) + { + httpContext.Items["RequestedRangeEnd"] = requestedRangeEnd; + } + // Obtain the WebDAV collection var entry = await _store.GetItemAsync(request.GetUri(), httpContext.RequestAborted).ConfigureAwait(false); if (entry == null) diff --git a/backend/WebDav/DatabaseStoreNzbFile.cs b/backend/WebDav/DatabaseStoreNzbFile.cs index 6f97bf78..d7a3a531 100644 --- a/backend/WebDav/DatabaseStoreNzbFile.cs +++ b/backend/WebDav/DatabaseStoreNzbFile.cs @@ -36,6 +36,9 @@ protected override async Task GetStreamAsync(CancellationToken cancellat private NzbFileStream GetStream(DavNzbFile nzbFile) { - return usenetClient.GetFileStream(nzbFile.SegmentIds, FileSize, configManager.GetArticleBufferSize()); + // Closed-range end byte from GetAndHeadHandlerPatch, used to cap prefetch. + var requestedEndByte = httpContext.Items["RequestedRangeEnd"] as long?; + return usenetClient.GetFileStream( + nzbFile.SegmentIds, FileSize, configManager.GetArticleBufferSize(), requestedEndByte); } } \ No newline at end of file