Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,17 +63,21 @@ public void Forwards_branch_processor_events_to_the_inner_processor()
Block? processed = null;
Block? processing = null;
IReadOnlyList<Block>? batch = null;
IReadOnlyList<Block>? processedBatch = null;
sut.BlockProcessed += (_, e) => processed = e.Block;
sut.BlocksProcessing += (_, e) => batch = e.Blocks;
sut.BlocksProcessed += (_, e) => processedBatch = e.Blocks;
sut.BlockProcessing += (_, e) => processing = e.Block;

inner.RaiseBlockProcessed(block);
inner.RaiseBlocksProcessing([block]);
inner.RaiseBlocksProcessed([block]);
inner.RaiseBlockProcessing(block);

Assert.That(processed, Is.SameAs(block));
Assert.That(processing, Is.SameAs(block));
Assert.That(batch, Is.EqualTo(new[] { block }));
Assert.That(processedBatch, Is.EqualTo(new[] { block }));
}

private sealed class StubBranchProcessor : IBranchProcessor
Expand All @@ -88,10 +92,12 @@ public Block[] Process(BlockHeader? baseBlock, IReadOnlyList<Block> suggestedBlo

public event EventHandler<BlockProcessedEventArgs>? BlockProcessed;
public event EventHandler<BlocksProcessingEventArgs>? BlocksProcessing;
public event EventHandler<BlocksProcessingEventArgs>? BlocksProcessed;
public event EventHandler<BlockEventArgs>? BlockProcessing;

public void RaiseBlockProcessed(Block block) => BlockProcessed?.Invoke(this, new BlockProcessedEventArgs(block, []));
public void RaiseBlocksProcessing(IReadOnlyList<Block> blocks) => BlocksProcessing?.Invoke(this, new BlocksProcessingEventArgs(blocks));
public void RaiseBlocksProcessed(IReadOnlyList<Block> blocks) => BlocksProcessed?.Invoke(this, new BlocksProcessingEventArgs(blocks));
public void RaiseBlockProcessing(Block block) => BlockProcessing?.Invoke(this, new BlockEventArgs(block));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,12 @@ public event EventHandler<BlocksProcessingEventArgs>? BlocksProcessing
remove => inner.BlocksProcessing -= value;
}

public event EventHandler<BlocksProcessingEventArgs>? BlocksProcessed
{
add => inner.BlocksProcessed += value;
remove => inner.BlocksProcessed -= value;
}

public event EventHandler<BlockEventArgs>? BlockProcessing
{
add => inner.BlockProcessing += value;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public async Task ScheduleAndDrainDuringBlockProcessing()
await Task.Delay(BlockProcessingDurationMs);

// Block done — resume normal task execution
_branchProcessor.RaiseBlockProcessed();
_branchProcessor.RaiseBlocksProcessed();

// Wait for all scheduled tasks to drain before next cycle
SpinWait spin = default;
Expand Down Expand Up @@ -143,9 +143,10 @@ public async Task ScheduleAndDrainWithoutBlockProcessing()
/// </summary>
private sealed class StubBranchProcessor : IBranchProcessor
{
public event EventHandler<BlockProcessedEventArgs>? BlockProcessed;
public event EventHandler<BlocksProcessingEventArgs>? BlocksProcessing;
public event EventHandler<BlocksProcessingEventArgs>? BlocksProcessed;
#pragma warning disable CS0067 // Event is never used
public event EventHandler<BlockProcessedEventArgs>? BlockProcessed;
public event EventHandler<BlockEventArgs>? BlockProcessing;
#pragma warning restore CS0067

Expand All @@ -156,8 +157,8 @@ public Block[] Process(BlockHeader? baseBlock, IReadOnlyList<Block> suggestedBlo
public void RaiseBlocksProcessing() =>
BlocksProcessing?.Invoke(this, new BlocksProcessingEventArgs([]));

public void RaiseBlockProcessed() =>
BlockProcessed?.Invoke(this, new BlockProcessedEventArgs(null!, null!));
public void RaiseBlocksProcessed() =>
BlocksProcessed?.Invoke(this, new BlocksProcessingEventArgs([]));
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,48 +102,57 @@ public Block[] Process(BlockHeader? baseBlock, IReadOnlyList<Block> suggestedBlo

_logger.Info($"Processing {suggestedBlocks.Last().ToString(Block.Format.Short)}");
int nextBlock = 0;
while (true)
BlocksProcessing?.Invoke(this, new BlocksProcessingEventArgs(suggestedBlocks));
try
{
lock (_gate)
while (true)
{
bool notYet = false;
for (int i = nextBlock; i < suggestedBlocks.Count; i++)
lock (_gate)
{
BlocksProcessing?.Invoke(this, new BlocksProcessingEventArgs(suggestedBlocks));
Block suggestedBlock = suggestedBlocks[i];
BlockProcessing?.Invoke(this, new BlockEventArgs(suggestedBlock));
Hash256 hash = suggestedBlock.Hash!;
if (!_allowed.Contains(hash))
bool notYet = false;
for (int i = nextBlock; i < suggestedBlocks.Count; i++)
{
if (_allowedToFail.TryRemove(hash))
Block suggestedBlock = suggestedBlocks[i];
BlockProcessing?.Invoke(this, new BlockEventArgs(suggestedBlock));
Hash256 hash = suggestedBlock.Hash!;
if (!_allowed.Contains(hash))
{
BlockProcessed?.Invoke(this, new BlockProcessedEventArgs(suggestedBlock, []));
throw new InvalidBlockException(suggestedBlock, "allowed to fail");
if (_allowedToFail.TryRemove(hash))
{
BlockProcessed?.Invoke(this, new BlockProcessedEventArgs(suggestedBlock, []));
throw new InvalidBlockException(suggestedBlock, "allowed to fail");
}

notYet = true;
break;
}

notYet = true;
break;
BlockProcessed?.Invoke(this, new BlockProcessedEventArgs(suggestedBlock, []));
nextBlock = i + 1;
}

BlockProcessed?.Invoke(this, new BlockProcessedEventArgs(suggestedBlock, []));
nextBlock = i + 1;
}

if (notYet)
{
Monitor.Wait(_gate, MockRecheckInterval);
}
else
{
_rootProcessed.Add(suggestedBlocks.Last().StateRoot!);
return suggestedBlocks.ToArray();
if (notYet)
{
Monitor.Wait(_gate, MockRecheckInterval);
}
else
{
_rootProcessed.Add(suggestedBlocks.Last().StateRoot!);
return suggestedBlocks.ToArray();
}
}
}
}
finally
{
BlocksProcessed?.Invoke(this, new BlocksProcessingEventArgs(suggestedBlocks));
}
}

public event EventHandler<BlocksProcessingEventArgs>? BlocksProcessing;

public event EventHandler<BlocksProcessingEventArgs>? BlocksProcessed;

public event EventHandler<BlockEventArgs>? BlockProcessing;

public event EventHandler<BlockProcessedEventArgs>? BlockProcessed;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ internal class TestBranchProcessor : IBranchProcessor
{
public event EventHandler<BlockProcessedEventArgs>? BlockProcessed;
public event EventHandler<BlocksProcessingEventArgs>? BlocksProcessing { add { } remove { } }
public event EventHandler<BlocksProcessingEventArgs>? BlocksProcessed { add { } remove { } }
public event EventHandler<BlockEventArgs>? BlockProcessing { add { } remove { } }

public Block[] Process(BlockHeader? baseBlock, IReadOnlyList<Block> suggestedBlocks,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public async Task Queue_should_not_drop_tasks_under_sustained_load(
for (int cycle = 0; cycle < cycles; cycle++)
{
// Start block processing — tasks should be paused, not executed
_branchProcessor.BlocksProcessing += Raise.EventWith(new BlocksProcessingEventArgs(null));
BlocksProcessingEventArgs branchProcessing = RaiseBlocksProcessing();

for (int i = 0; i < tasksPerCycle; i++)
{
Expand All @@ -63,7 +63,7 @@ public async Task Queue_should_not_drop_tasks_under_sustained_load(
}

// End block processing — paused tasks should now resume and execute
_branchProcessor.BlockProcessed += Raise.EventWith(new BlockProcessedEventArgs(null, null));
RaiseBlocksProcessed(branchProcessing);
await Task.Delay(200);
}

Expand All @@ -87,4 +87,14 @@ public async Task Queue_should_not_drop_tasks_under_sustained_load(
Is.EqualTo(postCycleCount).After(5000, 10),
"all post-cycle tasks should execute after block processing ends");
}

private BlocksProcessingEventArgs RaiseBlocksProcessing()
{
BlocksProcessingEventArgs args = new(null);
_branchProcessor.BlocksProcessing += Raise.EventWith(args);
return args;
}

private void RaiseBlocksProcessed(BlocksProcessingEventArgs args) =>
_branchProcessor.BlocksProcessed += Raise.EventWith(args);
}
Loading