Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion backend/Clients/Usenet/INntpClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ NzbFileStream GetFileStream(
NzbFile nzbFile, long fileSize, int articleBufferSize);

NzbFileStream GetFileStream(
string[] segmentIds, long fileSize, int articleBufferSize);
string[] segmentIds, long fileSize, int articleBufferSize, long? requestedEndByte = null);

Task CheckAllSegmentsAsync(
IEnumerable<string> segmentIds, int concurrency, IProgress<int>? progress, CancellationToken cancellationToken);
Expand Down
5 changes: 3 additions & 2 deletions backend/Clients/Usenet/NntpClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,10 @@ public virtual NzbFileStream GetFileStream(NzbFile nzbFile, long fileSize, int a
return new NzbFileStream(nzbFile.GetSegmentIds(), fileSize, this, articleBufferSize);
}

public virtual NzbFileStream GetFileStream(string[] segmentIds, long fileSize, int articleBufferSize)
public virtual NzbFileStream GetFileStream(
string[] segmentIds, long fileSize, int articleBufferSize, long? requestedEndByte = null)
{
return new NzbFileStream(segmentIds, fileSize, this, articleBufferSize);
return new NzbFileStream(segmentIds, fileSize, this, articleBufferSize, requestedEndByte);
}

public virtual async Task CheckAllSegmentsAsync
Expand Down
15 changes: 10 additions & 5 deletions backend/Clients/Usenet/UsenetStreamingClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,12 @@ WebsocketManager websocketManager
{
var providerConfig = configManager.GetUsenetProviderConfig();
var connectionPoolStats = new ConnectionPoolStats(providerConfig, websocketManager);
var idleTimeout = TimeSpan.FromSeconds(configManager.GetConnectionIdleTimeoutSeconds());
var providerClients = providerConfig.Providers
.Select((provider, index) => CreateProviderClient(
provider,
connectionPoolStats.GetOnConnectionPoolChanged(index)
connectionPoolStats.GetOnConnectionPoolChanged(index),
idleTimeout
))
.ToList();
return new MultiProviderNntpClient(providerClients);
Expand All @@ -51,13 +53,15 @@ WebsocketManager websocketManager
private static MultiConnectionNntpClient CreateProviderClient
(
UsenetProviderConfig.ConnectionDetails connectionDetails,
EventHandler<ConnectionPoolStats.ConnectionPoolChangedEventArgs> onConnectionPoolChanged
EventHandler<ConnectionPoolStats.ConnectionPoolChangedEventArgs> onConnectionPoolChanged,
TimeSpan idleTimeout
)
{
var connectionPool = CreateNewConnectionPool(
maxConnections: connectionDetails.MaxConnections,
connectionFactory: ct => CreateNewConnection(connectionDetails, ct),
onConnectionPoolChanged
onConnectionPoolChanged,
idleTimeout
);
var circuitBreaker = new ProviderCircuitBreaker(connectionDetails.Host);
return new MultiConnectionNntpClient(connectionPool, connectionDetails.Type, circuitBreaker);
Expand All @@ -67,10 +71,11 @@ private static ConnectionPool<INntpClient> CreateNewConnectionPool
(
int maxConnections,
Func<CancellationToken, ValueTask<INntpClient>> connectionFactory,
EventHandler<ConnectionPoolStats.ConnectionPoolChangedEventArgs> onConnectionPoolChanged
EventHandler<ConnectionPoolStats.ConnectionPoolChangedEventArgs> onConnectionPoolChanged,
TimeSpan idleTimeout
)
{
var connectionPool = new ConnectionPool<INntpClient>(maxConnections, connectionFactory);
var connectionPool = new ConnectionPool<INntpClient>(maxConnections, connectionFactory, idleTimeout);
connectionPool.OnConnectionPoolChanged += onConnectionPoolChanged;
var args = new ConnectionPoolStats.ConnectionPoolChangedEventArgs(0, 0, maxConnections);
onConnectionPoolChanged(connectionPool, args);
Expand Down
8 changes: 8 additions & 0 deletions backend/Config/ConfigManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,14 @@ public SemaphorePriorityOdds GetStreamingPriority()
return new SemaphorePriorityOdds() { HighPriorityOdds = numericalValue };
}

public int GetConnectionIdleTimeoutSeconds()
{
return int.Parse(
StringUtil.EmptyToNull(GetConfigValue("usenet.connection-idle-timeout-seconds"))
?? "300"
);
}

public bool IsEnforceReadonlyWebdavEnabled()
{
var defaultValue = true;
Expand Down
19 changes: 13 additions & 6 deletions backend/Streams/MultiSegmentStream.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using System.Threading.Channels;
using System.Threading.Channels;
using NzbWebDAV.Clients.Usenet;
using NzbWebDAV.Clients.Usenet.Concurrency;
using NzbWebDAV.Clients.Usenet.Contexts;
Expand All @@ -13,6 +13,7 @@ public class MultiSegmentStream : FastReadOnlyNonSeekableStream
private readonly INntpClient _usenetClient;
private readonly Channel<Task<Stream>> _streamTasks;
private readonly ContextualCancellationTokenSource _cts;
private readonly int? _endSegmentCount;
private Stream? _stream;
private bool _disposed;

Expand All @@ -21,34 +22,40 @@ public static Stream Create
Memory<string> segmentIds,
INntpClient usenetClient,
int articleBufferSize,
CancellationToken cancellationToken
CancellationToken cancellationToken,
int? endSegmentCount = null
)
{
return articleBufferSize == 0
? new UnbufferedMultiSegmentStream(segmentIds, usenetClient)
: new MultiSegmentStream(segmentIds, usenetClient, articleBufferSize, cancellationToken);
: new MultiSegmentStream(segmentIds, usenetClient, articleBufferSize, cancellationToken, endSegmentCount);
}

private MultiSegmentStream
(
Memory<string> segmentIds,
INntpClient usenetClient,
int articleBufferSize,
CancellationToken cancellationToken
CancellationToken cancellationToken,
int? endSegmentCount = null
)
{
_segmentIds = segmentIds;
_usenetClient = usenetClient;
_streamTasks = Channel.CreateBounded<Task<Stream>>(articleBufferSize);
_cts = ContextualCancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
_endSegmentCount = endSegmentCount;
_ = DownloadSegments(_cts.Token);
}

private async Task DownloadSegments(CancellationToken cancellationToken)
{
var effectiveCount = _endSegmentCount.HasValue
? Math.Min(_segmentIds.Length, _endSegmentCount.Value)
: _segmentIds.Length;
try
{
for (var i = 0; i < _segmentIds.Length; i++)
for (var i = 0; i < effectiveCount; i++)
{
var segmentId = _segmentIds.Span[i];

Expand Down Expand Up @@ -131,4 +138,4 @@ protected override void Dispose(bool disposing)

base.Dispose();
}
}
}
41 changes: 37 additions & 4 deletions backend/Streams/NzbFileStream.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
using NzbWebDAV.Clients.Usenet;
using NzbWebDAV.Clients.Usenet;
using NzbWebDAV.Extensions;
using NzbWebDAV.Models;
using NzbWebDAV.Utils;
using Serilog;
using UsenetSharp.Streams;

namespace NzbWebDAV.Streams;
Expand All @@ -10,9 +11,13 @@ public class NzbFileStream(
string[] fileSegmentIds,
long fileSize,
INntpClient usenetClient,
int articleBufferSize
int articleBufferSize,
long? requestedEndByte = null
) : FastReadOnlyStream
{
// Extra segments fetched past the requested end to cover seek imprecision.
private const int RangePrefetchOvershootSegments = 4;

private long _position;
private bool _disposed;
private Stream? _innerStream;
Expand Down Expand Up @@ -80,7 +85,35 @@ await stream.DiscardBytesAsync(rangeStart - foundSegment.FoundByteRange.StartInc
private Stream GetMultiSegmentStream(int firstSegmentIndex, CancellationToken cancellationToken)
{
var segmentIds = fileSegmentIds.AsMemory()[firstSegmentIndex..];
return MultiSegmentStream.Create(segmentIds, usenetClient, articleBufferSize, cancellationToken);
var endSegmentCount = ComputeEndSegmentCount(firstSegmentIndex, segmentIds.Length);
if (endSegmentCount.HasValue)
{
Log.Debug(
"Range-bounded prefetch: requestedEndByte={EndByte}, firstSegmentIndex={First}, "
+ "endSegmentCount={Count} (of {Remaining} remaining segments)",
requestedEndByte, firstSegmentIndex, endSegmentCount.Value, segmentIds.Length);
}
return MultiSegmentStream.Create(
segmentIds, usenetClient, articleBufferSize, cancellationToken, endSegmentCount);
}

// Returns segment count covering requestedEndByte, or null when no cap is needed.
private int? ComputeEndSegmentCount(int firstSegmentIndex, int remainingSegmentCount)
{
if (!requestedEndByte.HasValue) return null;
if (fileSegmentIds.Length == 0 || remainingSegmentCount <= 0) return null;

var endByte = Math.Clamp(requestedEndByte.Value, 0, fileSize - 1);
var avgSegmentSize = (double)fileSize / fileSegmentIds.Length;
if (avgSegmentSize <= 0) return null;

var absoluteEndIndex = Math.Clamp(
(int)(endByte / avgSegmentSize), 0, fileSegmentIds.Length - 1);
var withOvershoot = absoluteEndIndex + RangePrefetchOvershootSegments;
var relativeCount = withOvershoot - firstSegmentIndex + 1;
if (relativeCount <= 0) return 0;
if (relativeCount >= remainingSegmentCount) return null;
return relativeCount;
}

protected override void Dispose(bool disposing)
Expand All @@ -97,4 +130,4 @@ public override async ValueTask DisposeAsync()
_disposed = true;
GC.SuppressFinalize(this);
}
}
}
6 changes: 6 additions & 0 deletions backend/WebDav/Base/GetAndHeadHandlerPatch.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ public async Task<bool> HandleRequestAsync(HttpContext httpContext)
// Determine the requested range
var range = request.GetRange();

// Forward closed-range end byte to downstream prefetch capping.
if (range?.End is long requestedRangeEnd)
{
httpContext.Items["RequestedRangeEnd"] = requestedRangeEnd;
}

// Obtain the WebDAV collection
var entry = await _store.GetItemAsync(request.GetUri(), httpContext.RequestAborted).ConfigureAwait(false);
if (entry == null)
Expand Down
5 changes: 4 additions & 1 deletion backend/WebDav/DatabaseStoreNzbFile.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ protected override async Task<Stream> GetStreamAsync(CancellationToken cancellat

private NzbFileStream GetStream(DavNzbFile nzbFile)
{
return usenetClient.GetFileStream(nzbFile.SegmentIds, FileSize, configManager.GetArticleBufferSize());
// Closed-range end byte from GetAndHeadHandlerPatch, used to cap prefetch.
var requestedEndByte = httpContext.Items["RequestedRangeEnd"] as long?;
return usenetClient.GetFileStream(
nzbFile.SegmentIds, FileSize, configManager.GetArticleBufferSize(), requestedEndByte);
}
}