Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 68 additions & 0 deletions source/PlainBytes.System.Extensions/Collections/Iterators.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
using System.Collections;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

namespace PlainBytes.System.Extensions.Collections
{
Expand Down Expand Up @@ -120,5 +122,71 @@ public static IEnumerable<T> Append<T>(this IEnumerable<T> collection, IEnumerab
yield return item;
}
}

/// <summary>
/// Asynchronously enumerates the elements of the source sequence and invokes the specified asynchronous action
/// for each element.
/// </summary>
/// <typeparam name="T">The type of the elements in the source sequence.</typeparam>
/// <param name="source">The asynchronous sequence whose elements are to be processed.</param>
/// <param name="action">An asynchronous delegate to invoke for each element in the source sequence.</param>
/// <param name="token">A cancellation token that can be used to cancel the operation.</param>
/// <returns>A task that represents the asynchronous operation. The task completes when all elements have been processed
/// or the operation is canceled.</returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static async Task ForEachAsync<T>(this IAsyncEnumerable<T> source, Func<T, CancellationToken, ValueTask> action, CancellationToken token = default)
{
await foreach (var item in source.WithCancellation(token))
{
token.ThrowIfCancellationRequested();

await action(item, token).ConfigureAwait(false);
}
}


/// <summary>
/// Iterates through the asynchronous sequence while processing the elements.
/// </summary>
/// <param name="source">The source asynchronous sequence to process.</param>
/// <param name="selector">Transformer that processes the iterated items.</param>
/// <param name="token">A cancellation token that can be used to cancel the asynchronous iteration.</param>
/// <typeparam name="T">The type of the elements in the source sequence.</typeparam>
/// <typeparam name="TR">Type of result item.</typeparam>
/// <returns></returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static async IAsyncEnumerable<TR> SelectAsync<T, TR>(this IAsyncEnumerable<T> source, Func<T, TR> selector, [EnumeratorCancellation] CancellationToken token = default)
{
await foreach (var item in source.WithCancellation(token))
{
token.ThrowIfCancellationRequested();

yield return selector(item);
}
}

/// <summary>
/// Filters the elements of an asynchronous sequence based on a specified predicate.
/// </summary>
/// <typeparam name="T">The type of the elements in the source sequence.</typeparam>
/// <param name="source">The source asynchronous sequence to filter.</param>
/// <param name="predicate">A function to test each element for a condition. The element is included in the result if the function
/// returns <see langword="true"/>.</param>
/// <param name="token">A cancellation token that can be used to cancel the asynchronous iteration.</param>
/// <returns>An asynchronous sequence that contains elements from the source sequence that satisfy the condition
/// specified by the predicate.</returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static async IAsyncEnumerable<T> WhereAsync<T>(this IAsyncEnumerable<T> source, Func<T, bool> predicate, [EnumeratorCancellation] CancellationToken token = default)
{
await foreach (var item in source.WithCancellation(token))
{
token.ThrowIfCancellationRequested();

if (predicate(item))
{
yield return item;
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using PlainBytes.System.Extensions.Collections;
using Xunit;

namespace PlainBytes.System.Extensions.Tests.Collections
{
public class AsyncIteratorsTests
{
private static async IAsyncEnumerable<T> ToAsync<T>(IEnumerable<T> source)
{
foreach (var item in source)
{
yield return item;
await Task.Yield(); // ensure asynchronous execution path
}
}

[Fact]
public async Task ForEachAsync_GivenNullSource_ShouldThrow()
{
// Arrange
IAsyncEnumerable<int> source = null;

// Act / Assert
await Assert.ThrowsAsync<NullReferenceException>(async () => await source.ForEachAsync((i, ct) => ValueTask.CompletedTask));
}

[Fact]
public async Task ForEachAsync_GivenSource_ActionIsExecutedForEachElement()
{
// Arrange
var source = Enumerable.Range(0, 10).ToArray();
var results = new List<int>();

// Act
await ToAsync(source).ForEachAsync((item, ct) =>
{
results.Add(item);
return ValueTask.CompletedTask;
});

// Assert
Assert.Equal(source, results);
}

[Fact]
public async Task ForEachAsync_GivenCancellation_ShouldThrowOperationCanceled()
{
// Arrange
var cts = new CancellationTokenSource();
var source = Enumerable.Range(0, 20);

ValueTask Action(int item, CancellationToken token)
{
if (item == 3)
{
cts.Cancel();
}
return ValueTask.CompletedTask;
}

// Act / Assert
await Assert.ThrowsAsync<OperationCanceledException>(async () => await ToAsync(source).ForEachAsync(Action, cts.Token));
}

[Fact]
public async Task SelectAsync_GivenNullSource_ShouldThrow()
{
// Arrange
IAsyncEnumerable<int> source = null;

// Act / Assert
await Assert.ThrowsAsync<NullReferenceException>(async () =>
{
await foreach (var _ in source.SelectAsync(i => i)) { }
});
}

[Fact]
public async Task SelectAsync_GivenSelector_ProjectsAllElements()
{
// Arrange
var source = Enumerable.Range(0, 15).ToArray();
var results = new List<string>();

// Act
await foreach (var item in ToAsync(source).SelectAsync(i => $"#{i}"))
{
results.Add(item);
}

// Assert
Assert.Equal(source.Length, results.Count);
for (int i = 0; i < source.Length; i++)
{
Assert.Equal($"#{source[i]}", results[i]);
}
}

[Fact]
public async Task WhereAsync_GivenNullSource_ShouldThrow()
{
// Arrange
IAsyncEnumerable<int> source = null;

// Act / Assert
await Assert.ThrowsAsync<NullReferenceException>(async () =>
{
await foreach (var _ in source.WhereAsync(i => true)) { }
});
}

[Fact]
public async Task WhereAsync_GivenPredicate_FiltersElements()
{
// Arrange
var source = Enumerable.Range(0, 30).ToArray();
var result = new List<int>();

// Act
await foreach (var item in ToAsync(source).WhereAsync(i => i % 2 == 0))
{
result.Add(item);
}

// Assert
Assert.All(result, x => Assert.True(x % 2 == 0));
Assert.Equal(source.Where(i => i % 2 == 0), result);
}
}
}