diff --git a/source/PlainBytes.System.Extensions/Collections/Iterators.cs b/source/PlainBytes.System.Extensions/Collections/Iterators.cs index 87caf08..924922b 100644 --- a/source/PlainBytes.System.Extensions/Collections/Iterators.cs +++ b/source/PlainBytes.System.Extensions/Collections/Iterators.cs @@ -2,6 +2,8 @@ using System.Collections; using System.Collections.Generic; using System.Runtime.CompilerServices; +using System.Threading; +using System.Threading.Tasks; namespace PlainBytes.System.Extensions.Collections { @@ -120,5 +122,71 @@ public static IEnumerable Append(this IEnumerable collection, IEnumerab yield return item; } } + + /// + /// Asynchronously enumerates the elements of the source sequence and invokes the specified asynchronous action + /// for each element. + /// + /// The type of the elements in the source sequence. + /// The asynchronous sequence whose elements are to be processed. + /// An asynchronous delegate to invoke for each element in the source sequence. + /// A cancellation token that can be used to cancel the operation. + /// A task that represents the asynchronous operation. The task completes when all elements have been processed + /// or the operation is canceled. + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public static async Task ForEachAsync(this IAsyncEnumerable source, Func action, CancellationToken token = default) + { + await foreach (var item in source.WithCancellation(token)) + { + token.ThrowIfCancellationRequested(); + + await action(item, token).ConfigureAwait(false); + } + } + + + /// + /// Iterates through the asynchronous sequence while processing the elements. + /// + /// The source asynchronous sequence to process. + /// Transformer that processes the iterated items. + /// A cancellation token that can be used to cancel the asynchronous iteration. + /// The type of the elements in the source sequence. + /// Type of result item. + /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public static async IAsyncEnumerable SelectAsync(this IAsyncEnumerable source, Func selector, [EnumeratorCancellation] CancellationToken token = default) + { + await foreach (var item in source.WithCancellation(token)) + { + token.ThrowIfCancellationRequested(); + + yield return selector(item); + } + } + + /// + /// Filters the elements of an asynchronous sequence based on a specified predicate. + /// + /// The type of the elements in the source sequence. + /// The source asynchronous sequence to filter. + /// A function to test each element for a condition. The element is included in the result if the function + /// returns . + /// A cancellation token that can be used to cancel the asynchronous iteration. + /// An asynchronous sequence that contains elements from the source sequence that satisfy the condition + /// specified by the predicate. + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public static async IAsyncEnumerable WhereAsync(this IAsyncEnumerable source, Func predicate, [EnumeratorCancellation] CancellationToken token = default) + { + await foreach (var item in source.WithCancellation(token)) + { + token.ThrowIfCancellationRequested(); + + if (predicate(item)) + { + yield return item; + } + } + } } } diff --git a/tests/PlainBytes.System.Extensions.Tests/Collections/AsyncIteratorsTests.cs b/tests/PlainBytes.System.Extensions.Tests/Collections/AsyncIteratorsTests.cs new file mode 100644 index 0000000..a7ccd72 --- /dev/null +++ b/tests/PlainBytes.System.Extensions.Tests/Collections/AsyncIteratorsTests.cs @@ -0,0 +1,135 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using PlainBytes.System.Extensions.Collections; +using Xunit; + +namespace PlainBytes.System.Extensions.Tests.Collections +{ + public class AsyncIteratorsTests + { + private static async IAsyncEnumerable ToAsync(IEnumerable source) + { + foreach (var item in source) + { + yield return item; + await Task.Yield(); // ensure asynchronous execution path + } + } + + [Fact] + public async Task ForEachAsync_GivenNullSource_ShouldThrow() + { + // Arrange + IAsyncEnumerable source = null; + + // Act / Assert + await Assert.ThrowsAsync(async () => await source.ForEachAsync((i, ct) => ValueTask.CompletedTask)); + } + + [Fact] + public async Task ForEachAsync_GivenSource_ActionIsExecutedForEachElement() + { + // Arrange + var source = Enumerable.Range(0, 10).ToArray(); + var results = new List(); + + // Act + await ToAsync(source).ForEachAsync((item, ct) => + { + results.Add(item); + return ValueTask.CompletedTask; + }); + + // Assert + Assert.Equal(source, results); + } + + [Fact] + public async Task ForEachAsync_GivenCancellation_ShouldThrowOperationCanceled() + { + // Arrange + var cts = new CancellationTokenSource(); + var source = Enumerable.Range(0, 20); + + ValueTask Action(int item, CancellationToken token) + { + if (item == 3) + { + cts.Cancel(); + } + return ValueTask.CompletedTask; + } + + // Act / Assert + await Assert.ThrowsAsync(async () => await ToAsync(source).ForEachAsync(Action, cts.Token)); + } + + [Fact] + public async Task SelectAsync_GivenNullSource_ShouldThrow() + { + // Arrange + IAsyncEnumerable source = null; + + // Act / Assert + await Assert.ThrowsAsync(async () => + { + await foreach (var _ in source.SelectAsync(i => i)) { } + }); + } + + [Fact] + public async Task SelectAsync_GivenSelector_ProjectsAllElements() + { + // Arrange + var source = Enumerable.Range(0, 15).ToArray(); + var results = new List(); + + // Act + await foreach (var item in ToAsync(source).SelectAsync(i => $"#{i}")) + { + results.Add(item); + } + + // Assert + Assert.Equal(source.Length, results.Count); + for (int i = 0; i < source.Length; i++) + { + Assert.Equal($"#{source[i]}", results[i]); + } + } + + [Fact] + public async Task WhereAsync_GivenNullSource_ShouldThrow() + { + // Arrange + IAsyncEnumerable source = null; + + // Act / Assert + await Assert.ThrowsAsync(async () => + { + await foreach (var _ in source.WhereAsync(i => true)) { } + }); + } + + [Fact] + public async Task WhereAsync_GivenPredicate_FiltersElements() + { + // Arrange + var source = Enumerable.Range(0, 30).ToArray(); + var result = new List(); + + // Act + await foreach (var item in ToAsync(source).WhereAsync(i => i % 2 == 0)) + { + result.Add(item); + } + + // Assert + Assert.All(result, x => Assert.True(x % 2 == 0)); + Assert.Equal(source.Where(i => i % 2 == 0), result); + } + } +}