-
Notifications
You must be signed in to change notification settings - Fork 31
Have MergedLinesEnumerable implement IAsyncEnumerable<string> #109
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: release-1.7
Are you sure you want to change the base?
Changes from all commits
3fddb89
15815ce
ca60f2a
da45635
e45f1f6
87316df
7976a96
d3720b9
990605e
0ab96b0
fdd9137
750b1e0
caa9428
3b99204
6a2baae
983a28f
7104cea
f6a555a
9030874
b2b7bbe
f9f0813
8d18b9d
5151a25
97198a3
1420db9
4c68836
263d7dd
16b38df
7830e3e
ff9af41
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,39 @@ | ||
| using System.Collections.Generic; | ||
| using System.Threading; | ||
| using System.Threading.Tasks; | ||
|
|
||
| namespace Medallion.Shell.Tests.Streams; | ||
|
|
||
| public class AsyncEnumerableAdapter : IAsyncEnumerable<string> | ||
| { | ||
| private readonly IEnumerable<string> strings; | ||
|
|
||
| public AsyncEnumerableAdapter(IEnumerable<string> strings) | ||
| { | ||
| this.strings = strings; | ||
| } | ||
|
|
||
| public IAsyncEnumerator<string> GetAsyncEnumerator(CancellationToken cancellationToken = default) => | ||
| // this does not allow consuming the same IEnumerable twice | ||
|
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @madelson
For posterity, I added this comment. |
||
| new AsyncEnumeratorAdapter(this.strings.GetEnumerator()); | ||
|
|
||
| private class AsyncEnumeratorAdapter : IAsyncEnumerator<string> | ||
| { | ||
| private readonly IEnumerator<string> enumerator; | ||
|
|
||
| public AsyncEnumeratorAdapter(IEnumerator<string> enumerator) | ||
| { | ||
| this.enumerator = enumerator; | ||
| } | ||
|
|
||
| public string Current => this.enumerator.Current; | ||
|
|
||
| public ValueTask DisposeAsync() | ||
| { | ||
| this.enumerator.Dispose(); | ||
| return default; | ||
| } | ||
|
|
||
| public ValueTask<bool> MoveNextAsync() => new(this.enumerator.MoveNext()); | ||
| } | ||
| } | ||
This file was deleted.
Bartleby2718 marked this conversation as resolved.
Show resolved
Hide resolved
Bartleby2718 marked this conversation as resolved.
Show resolved
Hide resolved
|
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,13 @@ | ||
| #if NETCOREAPP3_0_OR_GREATER || NETSTANDARD2_0_OR_GREATER | ||
| using System.Collections.Generic; | ||
| using System.IO; | ||
| using Medallion.Shell.Streams; | ||
|
|
||
| namespace Medallion.Shell.Tests.Streams; | ||
|
|
||
| public class MergedLinesEnumerableTestAsync : MergedLinesEnumerableTestBase | ||
| { | ||
| protected override IAsyncEnumerable<string> Create(TextReader reader1, TextReader reader2) => | ||
| new MergedLinesEnumerable(reader1, reader2); | ||
| } | ||
| #endif |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,145 @@ | ||
| using System; | ||
| using System.Collections.Generic; | ||
| using System.IO; | ||
| using System.Linq; | ||
| using System.Threading; | ||
| using System.Threading.Tasks; | ||
| using Medallion.Shell.Streams; | ||
| using Moq; | ||
| using NUnit.Framework; | ||
|
|
||
| namespace Medallion.Shell.Tests.Streams; | ||
|
|
||
| public abstract class MergedLinesEnumerableTestBase | ||
| { | ||
| protected abstract IAsyncEnumerable<string> Create(TextReader reader1, TextReader reader2); | ||
|
|
||
| [Test] | ||
| public async Task TestOneIsEmpty() | ||
| { | ||
| var empty1 = new StringReader(string.Empty); | ||
| var nonEmpty1 = new StringReader("abc\r\ndef\r\nghi\r\njkl"); | ||
|
|
||
| var enumerable1 = this.Create(empty1, nonEmpty1); | ||
| var list1 = await enumerable1.ToListAsync(); | ||
| list1.SequenceEqual(["abc", "def", "ghi", "jkl"]) | ||
| .ShouldEqual(true, string.Join(", ", list1)); | ||
|
|
||
| var empty2 = new StringReader(string.Empty); | ||
| var nonEmpty2 = new StringReader("a\nbb\nccc\n"); | ||
| var enumerable2 = this.Create(nonEmpty2, empty2); | ||
| var list2 = await enumerable2.ToListAsync(); | ||
| list2.SequenceEqual(["a", "bb", "ccc"]) | ||
| .ShouldEqual(true, string.Join(", ", list2)); | ||
| } | ||
|
|
||
| [Test] | ||
| public async Task TestBothAreEmpty() | ||
| { | ||
| var list = await this.Create(new StringReader(string.Empty), new StringReader(string.Empty)).ToListAsync(); | ||
| list.Count.ShouldEqual(0, string.Join(", ", list)); | ||
| } | ||
|
|
||
| [Test] | ||
| public async Task TestBothArePopulatedEqualSizes() | ||
| { | ||
| var list = await this.Create( | ||
| new StringReader("a\nbb\nccc"), | ||
| new StringReader("1\r\n22\r\n333") | ||
| ) | ||
| .ToListAsync(); | ||
| string.Join(", ", list).ShouldEqual("a, 1, bb, 22, ccc, 333"); | ||
| } | ||
|
|
||
| [Test] | ||
| public async Task TestBothArePopulatedDifferenceSizes() | ||
| { | ||
| var lines1 = string.Join("\n", ["x", "y", "z"]); | ||
| var lines2 = string.Join("\n", ["1", "2", "3", "4", "5"]); | ||
|
|
||
| var list1 = await this.Create(new StringReader(lines1), new StringReader(lines2)) | ||
| .ToListAsync(); | ||
| string.Join(", ", list1).ShouldEqual("x, 1, y, 2, z, 3, 4, 5"); | ||
|
|
||
| var list2 = await this.Create(new StringReader(lines2), new StringReader(lines1)) | ||
| .ToListAsync(); | ||
| string.Join(", ", list2).ShouldEqual("1, x, 2, y, 3, z, 4, 5"); | ||
| } | ||
|
|
||
| [Test] | ||
| public void TestConsumeTwice() | ||
| { | ||
| var asyncEnumerable = this.Create(new StringReader("a"), new StringReader("b")); | ||
| asyncEnumerable.GetAsyncEnumerator(); | ||
| Assert.Throws<InvalidOperationException>(() => asyncEnumerable.GetAsyncEnumerator()); | ||
| } | ||
|
|
||
| [Test] | ||
| public void TestOneThrows() | ||
| { | ||
| void TestOneThrows(bool reverse) | ||
| { | ||
| var reader1 = new StringReader("a\nb\nc"); | ||
| var count = 0; | ||
| var mockReader = new Mock<TextReader>(MockBehavior.Strict); | ||
| mockReader.Setup(r => r.ReadLineAsync()) | ||
| .ReturnsAsync(() => ++count < 3 ? "LINE" : throw new TimeZoneNotFoundException()); | ||
|
|
||
| Assert.ThrowsAsync<TimeZoneNotFoundException>( | ||
| async () => await this.Create( | ||
| reverse ? mockReader.Object : reader1, | ||
| reverse ? reader1 : mockReader.Object | ||
| ).ToListAsync() | ||
| ); | ||
| } | ||
|
|
||
| TestOneThrows(reverse: false); | ||
| TestOneThrows(reverse: true); | ||
| } | ||
|
|
||
| [Test, Timeout(10_000)] // something's wrong if it's taking more than 10 seconds | ||
| public async Task FuzzTest() | ||
| { | ||
| Pipe pipe1 = new(), pipe2 = new(); | ||
|
|
||
| var asyncEnumerable = this.Create(new StreamReader(pipe1.OutputStream), new StreamReader(pipe2.OutputStream)); | ||
|
|
||
| var strings1 = Enumerable.Range(0, 2000).Select(_ => Guid.NewGuid().ToString()).ToArray(); | ||
| var strings2 = Enumerable.Range(0, 2300).Select(_ => Guid.NewGuid().ToString()).ToArray(); | ||
|
|
||
| static async Task WriteStringsAsync(IReadOnlyList<string> strings, Pipe pipe) | ||
| { | ||
| await Task.CompletedTask; // to make compiler happy | ||
|
|
||
| SpinWait spinWait = default; | ||
| Random random = new(Guid.NewGuid().GetHashCode()); | ||
| using StreamWriter writer = new(pipe.InputStream); | ||
| foreach (var line in strings) | ||
| { | ||
| if (random.Next(10) == 1) | ||
| { | ||
| spinWait.SpinOnce(); | ||
| } | ||
|
|
||
| writer.WriteLine(line); | ||
| } | ||
| } | ||
|
|
||
| var task1 = WriteStringsAsync(strings1, pipe1); | ||
| var task2 = WriteStringsAsync(strings2, pipe2); | ||
| var consumeTask = asyncEnumerable.ToListAsync(); | ||
| await Task.WhenAll(task1, task2, consumeTask); // need to dispose the writer to end the stream | ||
|
|
||
|
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think I had to swap the order for tests to pass. Is this a red flag?
Owner
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah let's revert this change and make sure it still passes. Also, does this pass or fail on main? You didn't make any changes to Pipe I think so it may be an issue with the release branch.
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @madelson I looked more into this and gathered some numbers, but I'm lost as to how I should debug this. Note:
However, I noticed that a small change makes a difference.
var consumeTask = Task.Run(enumerable.ToListAsync);
Task.WaitAll(task1, task2);
Task.WaitAll(task1, task2);
var consumeTask = Task.Run(enumerable.ToListAsync);
// originally 2000
var strings1 = Enumerable.Range(0, 20).Select(_ => Guid.NewGuid().ToString()).ToArray();
// originally 2300
var strings2 = Enumerable.Range(0, 23).Select(_ => Guid.NewGuid().ToString()).ToArray();
...
// same as master or release-1.7
var consumeTask = Task.Run(enumerable.ToListAsync);
Task.WaitAll(task1, task2, consumeTask);
CollectionAssert.AreEquivalent(strings1.Concat(strings2).ToList(), consumeTask.Result);Therefore, I believe that my Any idea how I should debug this? For one thing, I think replacing
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I also found that
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Seems like ChatGPT came to the rescue again. (It wasn't helping a few hours ago.) I lost its message, but it said something along the lines of " Now the test passes, but I can't run
Owner
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Makes sense; we have do dispose the writer to end the stream. Can you point me to the relevant code change?
I'm not sure I follow here. As often as what? Does it fail when it runs more often? In what way? How does the overall time for this test case compare before and after the changes (I would expect it to be the same). Who would be overriding the frequency if it were protected virtual?
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @madelson Let me know if the above makes sense!
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @madelson Bumping this thread!
Owner
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
This makes me feel like there is a bug somewhere. It could be in the MergedLinesEnumerable changes, it could be in the test code, or it could be in the What I would suggest is to (temporarily) add some logging statements to the code like this: My assumption is that at some point we should stop seeing log statements as the code will enter a hung state. We can then add additional logs to try to get closer and closer to the point where each thread stops. From there, hopefully we can deduce why it is hanging. |
||
| CollectionAssert.AreEquivalent(strings1.Concat(strings2), consumeTask.Result); | ||
| } | ||
Bartleby2718 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } | ||
|
|
||
| public static class AsyncEnumerableExtensions | ||
| { | ||
| public static async Task<List<string>> ToListAsync(this IAsyncEnumerable<string> strings) | ||
| { | ||
| List<string> result = []; | ||
| await foreach (var item in strings) { result.Add(item); } | ||
| return result; | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,11 @@ | ||
| using System.Collections.Generic; | ||
| using System.IO; | ||
| using Medallion.Shell.Streams; | ||
|
|
||
| namespace Medallion.Shell.Tests.Streams; | ||
|
|
||
| public class MergedLinesEnumerableTestSync : MergedLinesEnumerableTestBase | ||
| { | ||
| protected override IAsyncEnumerable<string> Create(TextReader reader1, TextReader reader2) => | ||
| new AsyncEnumerableAdapter(new MergedLinesEnumerable(reader1, reader2)); | ||
| } |






Uh oh!
There was an error while loading. Please reload this page.