diff --git a/src/Nethermind/Nethermind.BalRecorder.Test/BalRecordingBranchProcessorTests.cs b/src/Nethermind/Nethermind.BalRecorder.Test/BalRecordingBranchProcessorTests.cs index 9b8c120a15da..66ca3756bd0a 100644 --- a/src/Nethermind/Nethermind.BalRecorder.Test/BalRecordingBranchProcessorTests.cs +++ b/src/Nethermind/Nethermind.BalRecorder.Test/BalRecordingBranchProcessorTests.cs @@ -63,17 +63,21 @@ public void Forwards_branch_processor_events_to_the_inner_processor() Block? processed = null; Block? processing = null; IReadOnlyList? batch = null; + IReadOnlyList? processedBatch = null; sut.BlockProcessed += (_, e) => processed = e.Block; sut.BlocksProcessing += (_, e) => batch = e.Blocks; + sut.BlocksProcessed += (_, e) => processedBatch = e.Blocks; sut.BlockProcessing += (_, e) => processing = e.Block; inner.RaiseBlockProcessed(block); inner.RaiseBlocksProcessing([block]); + inner.RaiseBlocksProcessed([block]); inner.RaiseBlockProcessing(block); Assert.That(processed, Is.SameAs(block)); Assert.That(processing, Is.SameAs(block)); Assert.That(batch, Is.EqualTo(new[] { block })); + Assert.That(processedBatch, Is.EqualTo(new[] { block })); } private sealed class StubBranchProcessor : IBranchProcessor @@ -88,10 +92,12 @@ public Block[] Process(BlockHeader? baseBlock, IReadOnlyList suggestedBlo public event EventHandler? BlockProcessed; public event EventHandler? BlocksProcessing; + public event EventHandler? BlocksProcessed; public event EventHandler? BlockProcessing; public void RaiseBlockProcessed(Block block) => BlockProcessed?.Invoke(this, new BlockProcessedEventArgs(block, [])); public void RaiseBlocksProcessing(IReadOnlyList blocks) => BlocksProcessing?.Invoke(this, new BlocksProcessingEventArgs(blocks)); + public void RaiseBlocksProcessed(IReadOnlyList blocks) => BlocksProcessed?.Invoke(this, new BlocksProcessingEventArgs(blocks)); public void RaiseBlockProcessing(Block block) => BlockProcessing?.Invoke(this, new BlockEventArgs(block)); } } diff --git a/src/Nethermind/Nethermind.BalRecorder/BalRecordingBranchProcessor.cs b/src/Nethermind/Nethermind.BalRecorder/BalRecordingBranchProcessor.cs index b5a2f83099f7..34ae4780628a 100644 --- a/src/Nethermind/Nethermind.BalRecorder/BalRecordingBranchProcessor.cs +++ b/src/Nethermind/Nethermind.BalRecorder/BalRecordingBranchProcessor.cs @@ -77,6 +77,12 @@ public event EventHandler? BlocksProcessing remove => inner.BlocksProcessing -= value; } + public event EventHandler? BlocksProcessed + { + add => inner.BlocksProcessed += value; + remove => inner.BlocksProcessed -= value; + } + public event EventHandler? BlockProcessing { add => inner.BlockProcessing += value; diff --git a/src/Nethermind/Nethermind.Benchmark/Scheduler/BackgroundTaskSchedulerBenchmarks.cs b/src/Nethermind/Nethermind.Benchmark/Scheduler/BackgroundTaskSchedulerBenchmarks.cs index 5ab4d42ecf03..73a52c4cdecb 100644 --- a/src/Nethermind/Nethermind.Benchmark/Scheduler/BackgroundTaskSchedulerBenchmarks.cs +++ b/src/Nethermind/Nethermind.Benchmark/Scheduler/BackgroundTaskSchedulerBenchmarks.cs @@ -91,7 +91,7 @@ public async Task ScheduleAndDrainDuringBlockProcessing() await Task.Delay(BlockProcessingDurationMs); // Block done — resume normal task execution - _branchProcessor.RaiseBlockProcessed(); + _branchProcessor.RaiseBlocksProcessed(); // Wait for all scheduled tasks to drain before next cycle SpinWait spin = default; @@ -143,9 +143,10 @@ public async Task ScheduleAndDrainWithoutBlockProcessing() /// private sealed class StubBranchProcessor : IBranchProcessor { - public event EventHandler? BlockProcessed; public event EventHandler? BlocksProcessing; + public event EventHandler? BlocksProcessed; #pragma warning disable CS0067 // Event is never used + public event EventHandler? BlockProcessed; public event EventHandler? BlockProcessing; #pragma warning restore CS0067 @@ -156,8 +157,8 @@ public Block[] Process(BlockHeader? baseBlock, IReadOnlyList suggestedBlo public void RaiseBlocksProcessing() => BlocksProcessing?.Invoke(this, new BlocksProcessingEventArgs([])); - public void RaiseBlockProcessed() => - BlockProcessed?.Invoke(this, new BlockProcessedEventArgs(null!, null!)); + public void RaiseBlocksProcessed() => + BlocksProcessed?.Invoke(this, new BlocksProcessingEventArgs([])); } /// diff --git a/src/Nethermind/Nethermind.Blockchain.Test/BlockchainProcessorTests.cs b/src/Nethermind/Nethermind.Blockchain.Test/BlockchainProcessorTests.cs index 93ce20314d87..e93e694ca8ac 100644 --- a/src/Nethermind/Nethermind.Blockchain.Test/BlockchainProcessorTests.cs +++ b/src/Nethermind/Nethermind.Blockchain.Test/BlockchainProcessorTests.cs @@ -102,48 +102,57 @@ public Block[] Process(BlockHeader? baseBlock, IReadOnlyList suggestedBlo _logger.Info($"Processing {suggestedBlocks.Last().ToString(Block.Format.Short)}"); int nextBlock = 0; - while (true) + BlocksProcessing?.Invoke(this, new BlocksProcessingEventArgs(suggestedBlocks)); + try { - lock (_gate) + while (true) { - bool notYet = false; - for (int i = nextBlock; i < suggestedBlocks.Count; i++) + lock (_gate) { - BlocksProcessing?.Invoke(this, new BlocksProcessingEventArgs(suggestedBlocks)); - Block suggestedBlock = suggestedBlocks[i]; - BlockProcessing?.Invoke(this, new BlockEventArgs(suggestedBlock)); - Hash256 hash = suggestedBlock.Hash!; - if (!_allowed.Contains(hash)) + bool notYet = false; + for (int i = nextBlock; i < suggestedBlocks.Count; i++) { - if (_allowedToFail.TryRemove(hash)) + Block suggestedBlock = suggestedBlocks[i]; + BlockProcessing?.Invoke(this, new BlockEventArgs(suggestedBlock)); + Hash256 hash = suggestedBlock.Hash!; + if (!_allowed.Contains(hash)) { - BlockProcessed?.Invoke(this, new BlockProcessedEventArgs(suggestedBlock, [])); - throw new InvalidBlockException(suggestedBlock, "allowed to fail"); + if (_allowedToFail.TryRemove(hash)) + { + BlockProcessed?.Invoke(this, new BlockProcessedEventArgs(suggestedBlock, [])); + throw new InvalidBlockException(suggestedBlock, "allowed to fail"); + } + + notYet = true; + break; } - notYet = true; - break; + BlockProcessed?.Invoke(this, new BlockProcessedEventArgs(suggestedBlock, [])); + nextBlock = i + 1; } - BlockProcessed?.Invoke(this, new BlockProcessedEventArgs(suggestedBlock, [])); - nextBlock = i + 1; - } - - if (notYet) - { - Monitor.Wait(_gate, MockRecheckInterval); - } - else - { - _rootProcessed.Add(suggestedBlocks.Last().StateRoot!); - return suggestedBlocks.ToArray(); + if (notYet) + { + Monitor.Wait(_gate, MockRecheckInterval); + } + else + { + _rootProcessed.Add(suggestedBlocks.Last().StateRoot!); + return suggestedBlocks.ToArray(); + } } } } + finally + { + BlocksProcessed?.Invoke(this, new BlocksProcessingEventArgs(suggestedBlocks)); + } } public event EventHandler? BlocksProcessing; + public event EventHandler? BlocksProcessed; + public event EventHandler? BlockProcessing; public event EventHandler? BlockProcessed; diff --git a/src/Nethermind/Nethermind.Blockchain.Test/Filters/TestProcessingContext.cs b/src/Nethermind/Nethermind.Blockchain.Test/Filters/TestProcessingContext.cs index 5b089c3f11c3..9beded676126 100644 --- a/src/Nethermind/Nethermind.Blockchain.Test/Filters/TestProcessingContext.cs +++ b/src/Nethermind/Nethermind.Blockchain.Test/Filters/TestProcessingContext.cs @@ -21,6 +21,7 @@ internal class TestBranchProcessor : IBranchProcessor { public event EventHandler? BlockProcessed; public event EventHandler? BlocksProcessing { add { } remove { } } + public event EventHandler? BlocksProcessed { add { } remove { } } public event EventHandler? BlockProcessing { add { } remove { } } public Block[] Process(BlockHeader? baseBlock, IReadOnlyList suggestedBlocks, diff --git a/src/Nethermind/Nethermind.Consensus.Test/Scheduler/BackgroundTaskSchedulerBurstTests.cs b/src/Nethermind/Nethermind.Consensus.Test/Scheduler/BackgroundTaskSchedulerBurstTests.cs index 219ee7b59899..5e2da4914a65 100644 --- a/src/Nethermind/Nethermind.Consensus.Test/Scheduler/BackgroundTaskSchedulerBurstTests.cs +++ b/src/Nethermind/Nethermind.Consensus.Test/Scheduler/BackgroundTaskSchedulerBurstTests.cs @@ -46,7 +46,7 @@ public async Task Queue_should_not_drop_tasks_under_sustained_load( for (int cycle = 0; cycle < cycles; cycle++) { // Start block processing — tasks should be paused, not executed - _branchProcessor.BlocksProcessing += Raise.EventWith(new BlocksProcessingEventArgs(null)); + BlocksProcessingEventArgs branchProcessing = RaiseBlocksProcessing(); for (int i = 0; i < tasksPerCycle; i++) { @@ -63,7 +63,7 @@ public async Task Queue_should_not_drop_tasks_under_sustained_load( } // End block processing — paused tasks should now resume and execute - _branchProcessor.BlockProcessed += Raise.EventWith(new BlockProcessedEventArgs(null, null)); + RaiseBlocksProcessed(branchProcessing); await Task.Delay(200); } @@ -87,4 +87,14 @@ public async Task Queue_should_not_drop_tasks_under_sustained_load( Is.EqualTo(postCycleCount).After(5000, 10), "all post-cycle tasks should execute after block processing ends"); } + + private BlocksProcessingEventArgs RaiseBlocksProcessing() + { + BlocksProcessingEventArgs args = new(null); + _branchProcessor.BlocksProcessing += Raise.EventWith(args); + return args; + } + + private void RaiseBlocksProcessed(BlocksProcessingEventArgs args) => + _branchProcessor.BlocksProcessed += Raise.EventWith(args); } diff --git a/src/Nethermind/Nethermind.Consensus.Test/Scheduler/BackgroundTaskSchedulerTests.cs b/src/Nethermind/Nethermind.Consensus.Test/Scheduler/BackgroundTaskSchedulerTests.cs index 5f115e0ffbed..5f57a5777421 100644 --- a/src/Nethermind/Nethermind.Consensus.Test/Scheduler/BackgroundTaskSchedulerTests.cs +++ b/src/Nethermind/Nethermind.Consensus.Test/Scheduler/BackgroundTaskSchedulerTests.cs @@ -100,9 +100,9 @@ public async Task Test_task_will_cancel_on_block_processing() }); await waitSignal.WaitOneAsync(CancellationToken.None); - _branchProcessor.BlocksProcessing += Raise.EventWith(new BlocksProcessingEventArgs(null)); + BlocksProcessingEventArgs branchProcessing = RaiseBlocksProcessing(); await Task.Delay(10); - _branchProcessor.BlockProcessed += Raise.EventWith(new BlockProcessedEventArgs(null, null)); + RaiseBlocksProcessed(branchProcessing); Assert.That(() => wasCancelled, Is.EqualTo(true).After(10, 1)); } @@ -110,7 +110,7 @@ public async Task Test_task_will_cancel_on_block_processing() public async Task Test_task_scheduled_during_block_processing_gets_cancelled_token() { await using BackgroundTaskScheduler scheduler = new(_branchProcessor, _chainHeadInfo, 2, 65536, LimboLogs.Instance); - _branchProcessor.BlocksProcessing += Raise.EventWith(new BlocksProcessingEventArgs(null)); + BlocksProcessingEventArgs branchProcessing = RaiseBlocksProcessing(); int cancelledCount = 0; CountdownEvent expiredRan = new(5); @@ -128,7 +128,7 @@ public async Task Test_task_scheduled_during_block_processing_gets_cancelled_tok Assert.That(expiredRan.Wait(TimeSpan.FromSeconds(30)), Is.True, "Expired tasks did not all run within 30s"); Assert.That(cancelledCount, Is.EqualTo(5)); - _branchProcessor.BlockProcessed += Raise.EventWith(new BlockProcessedEventArgs(null, null)); + RaiseBlocksProcessed(branchProcessing); int postBlockCount = 0; CountdownEvent postBlockRan = new(3); @@ -147,11 +147,73 @@ public async Task Test_task_scheduled_during_block_processing_gets_cancelled_tok Assert.That(postBlockCount, Is.EqualTo(3)); } + [Test] + public async Task Test_task_waits_until_branch_processing_finishes() + { + await using BackgroundTaskScheduler scheduler = new(_branchProcessor, _chainHeadInfo, 1, 65536, LimboLogs.Instance); + BlocksProcessingEventArgs branchProcessing = RaiseBlocksProcessing(); + + ManualResetEvent waitSignal = new(false); + Assert.That(scheduler.TryScheduleTask(1, (_, _) => + { + waitSignal.Set(); + return Task.CompletedTask; + }, TimeSpan.FromSeconds(5)), Is.True); + + _branchProcessor.BlockProcessed += Raise.EventWith(new BlockProcessedEventArgs(null, null)); + Assert.That(waitSignal.WaitOne(TimeSpan.FromMilliseconds(500)), Is.False, "task should remain paused until the whole branch finishes"); + + RaiseBlocksProcessed(branchProcessing); + Assert.That(waitSignal.WaitOne(TimeSpan.FromSeconds(5)), Is.True); + } + + [Test] + public async Task Test_branch_completion_without_block_processed_restores_uncancelled_token() + { + await using BackgroundTaskScheduler scheduler = new(_branchProcessor, _chainHeadInfo, 1, 65536, LimboLogs.Instance); + BlocksProcessingEventArgs branchProcessing = RaiseBlocksProcessing(); + + RaiseBlocksProcessed(branchProcessing); + + bool wasCancelled = true; + ManualResetEvent waitSignal = new(false); + Assert.That(scheduler.TryScheduleTask(1, (_, token) => + { + wasCancelled = token.IsCancellationRequested; + waitSignal.Set(); + return Task.CompletedTask; + }), Is.True); + + Assert.That(waitSignal.WaitOne(TimeSpan.FromSeconds(5)), Is.True); + Assert.That(wasCancelled, Is.False); + } + + [Test] + public async Task Test_branch_completion_with_fresh_args_restores_uncancelled_token() + { + await using BackgroundTaskScheduler scheduler = new(_branchProcessor, _chainHeadInfo, 1, 65536, LimboLogs.Instance); + RaiseBlocksProcessing(); + + RaiseBlocksProcessed(new BlocksProcessingEventArgs(null)); + + bool wasCancelled = true; + ManualResetEvent waitSignal = new(false); + Assert.That(scheduler.TryScheduleTask(1, (_, token) => + { + wasCancelled = token.IsCancellationRequested; + waitSignal.Set(); + return Task.CompletedTask; + }), Is.True); + + Assert.That(waitSignal.WaitOne(TimeSpan.FromSeconds(5)), Is.True); + Assert.That(wasCancelled, Is.False); + } + [Test] public async Task Test_expired_task_during_block_processing_gets_cancelled_token_and_exits() { await using BackgroundTaskScheduler scheduler = new(_branchProcessor, _chainHeadInfo, 2, 65536, LimboLogs.Instance); - _branchProcessor.BlocksProcessing += Raise.EventWith(new BlocksProcessingEventArgs(null)); + BlocksProcessingEventArgs branchProcessing = RaiseBlocksProcessing(); bool wasCancelled = false; ManualResetEvent waitSignal = new(false); @@ -166,7 +228,7 @@ public async Task Test_expired_task_during_block_processing_gets_cancelled_token Assert.That(wasCancelled, Is.True, "expired task should receive a cancelled token during block processing"); // After block processing, new tasks execute normally - _branchProcessor.BlockProcessed += Raise.EventWith(new BlockProcessedEventArgs(null, null)); + RaiseBlocksProcessed(branchProcessing); ManualResetEvent postBlockSignal = new(false); scheduler.TryScheduleTask(1, (_, token) => @@ -184,7 +246,7 @@ public async Task Test_expired_tasks_drain_during_block_processing_freeing_queue await using BackgroundTaskScheduler scheduler = new(_branchProcessor, _chainHeadInfo, 1, capacity, LimboLogs.Instance); // Start block processing — token cancelled - _branchProcessor.BlocksProcessing += Raise.EventWith(new BlocksProcessingEventArgs(null)); + BlocksProcessingEventArgs branchProcessing = RaiseBlocksProcessing(); // Fill the queue with tasks that expire in 1ms for (int i = 0; i < capacity; i++) @@ -202,7 +264,7 @@ public async Task Test_expired_tasks_drain_during_block_processing_freeing_queue Assert.That(accepted, Is.True, $"Task {i} should be accepted after expired tasks freed queue space"); } - _branchProcessor.BlockProcessed += Raise.EventWith(new BlockProcessedEventArgs(null, null)); + RaiseBlocksProcessed(branchProcessing); } [Test] @@ -212,7 +274,7 @@ public async Task Test_queue_accepts_new_tasks_after_expired_tasks_drain_during_ await using BackgroundTaskScheduler scheduler = new(_branchProcessor, _chainHeadInfo, 1, capacity, LimboLogs.Instance); // Start block processing — signal is reset, token cancelled - _branchProcessor.BlocksProcessing += Raise.EventWith(new BlocksProcessingEventArgs(null)); + BlocksProcessingEventArgs branchProcessing = RaiseBlocksProcessing(); // Fill the queue with short-lived tasks for (int i = 0; i < capacity; i++) @@ -230,7 +292,7 @@ public async Task Test_queue_accepts_new_tasks_after_expired_tasks_drain_during_ Assert.That(accepted, Is.True, $"Task {i} should be accepted after expired tasks were drained"); } - _branchProcessor.BlockProcessed += Raise.EventWith(new BlockProcessedEventArgs(null, null)); + RaiseBlocksProcessed(branchProcessing); } [Test] @@ -243,7 +305,7 @@ public async Task Test_high_capacity_queue_survives_repeated_block_processing_cy int executedCount = 0; // --- Phase 1: Fill the queue during block processing — expired tasks drain with cancelled tokens --- - _branchProcessor.BlocksProcessing += Raise.EventWith(new BlocksProcessingEventArgs(null)); + BlocksProcessingEventArgs phase1BranchProcessing = RaiseBlocksProcessing(); for (int i = 0; i < capacity; i++) { @@ -255,7 +317,7 @@ public async Task Test_high_capacity_queue_survives_repeated_block_processing_cy await Task.Delay(2000); // --- Phase 2: End block processing, verify queue accepts tasks and runs them normally --- - _branchProcessor.BlockProcessed += Raise.EventWith(new BlockProcessedEventArgs(null, null)); + RaiseBlocksProcessed(phase1BranchProcessing); int phase2Count = capacity / 2; for (int i = 0; i < phase2Count; i++) @@ -274,7 +336,7 @@ public async Task Test_high_capacity_queue_survives_repeated_block_processing_cy "all phase 2 tasks should execute normally after block processing ends"); // --- Phase 3: Another block processing cycle --- - _branchProcessor.BlocksProcessing += Raise.EventWith(new BlocksProcessingEventArgs(null)); + BlocksProcessingEventArgs phase3BranchProcessing = RaiseBlocksProcessing(); int totalPhase3 = capacity / 2 + capacity / 4; for (int i = 0; i < totalPhase3; i++) @@ -286,7 +348,7 @@ public async Task Test_high_capacity_queue_survives_repeated_block_processing_cy await Task.Delay(2000); // End block processing — verify normal operation with new tasks - _branchProcessor.BlockProcessed += Raise.EventWith(new BlockProcessedEventArgs(null, null)); + RaiseBlocksProcessed(phase3BranchProcessing); int phase3ExecutedCount = 0; int longLivedCount = capacity / 4; @@ -322,4 +384,14 @@ public async Task Test_high_capacity_queue_survives_repeated_block_processing_cy Is.EqualTo(capacity).After(5000, 10), "all tasks in the final phase should execute successfully"); } + + private BlocksProcessingEventArgs RaiseBlocksProcessing() + { + BlocksProcessingEventArgs args = new(null); + _branchProcessor.BlocksProcessing += Raise.EventWith(args); + return args; + } + + private void RaiseBlocksProcessed(BlocksProcessingEventArgs args) => + _branchProcessor.BlocksProcessed += Raise.EventWith(args); } diff --git a/src/Nethermind/Nethermind.Consensus/Processing/BranchProcessor.cs b/src/Nethermind/Nethermind.Consensus/Processing/BranchProcessor.cs index a36b74d4d404..7d1554f4def4 100644 --- a/src/Nethermind/Nethermind.Consensus/Processing/BranchProcessor.cs +++ b/src/Nethermind/Nethermind.Consensus/Processing/BranchProcessor.cs @@ -36,6 +36,8 @@ public class BranchProcessor( public event EventHandler? BlocksProcessing; + public event EventHandler? BlocksProcessed; + public event EventHandler? BlockProcessing; private void PreCommitBlock(BlockHeader block) @@ -72,6 +74,7 @@ public Block[] Process(BlockHeader? baseBlock, IReadOnlyList suggestedBlo CancellationTokenSource? backgroundCancellation = new(); Task? preWarmTask = null; + BlocksProcessingEventArgs? blocksProcessingEventArgs = null; // Subscribe to cancel background work (prewarmer, prefetch) once transactions finish, // freeing the thread pool for parallel post-tx work (blooms, receipts root, state root). @@ -87,7 +90,8 @@ public Block[] Process(BlockHeader? baseBlock, IReadOnlyList suggestedBlo preWarmTask = PreWarmTransactions(suggestedBlock, baseBlock!, spec, backgroundCancellation.Token); Task? prefetchBlockhash = blockhashProvider.Prefetch(suggestedBlock.Header, backgroundCancellation.Token); - BlocksProcessing?.Invoke(this, new BlocksProcessingEventArgs(suggestedBlocks)); + blocksProcessingEventArgs = new BlocksProcessingEventArgs(suggestedBlocks); + BlocksProcessing?.Invoke(this, blocksProcessingEventArgs); BlockHeader? preBlockBaseBlock = baseBlock; @@ -185,8 +189,18 @@ public Block[] Process(BlockHeader? baseBlock, IReadOnlyList suggestedBlo } finally { - blockProcessor.TransactionsExecuted -= CancelBackgroundWork; - worldStateCloser?.Dispose(); + try + { + blockProcessor.TransactionsExecuted -= CancelBackgroundWork; + worldStateCloser?.Dispose(); + } + finally + { + if (blocksProcessingEventArgs is not null) + { + BlocksProcessed?.Invoke(this, blocksProcessingEventArgs); + } + } } static void WaitAndClear(ref Task? task) diff --git a/src/Nethermind/Nethermind.Consensus/Processing/IBranchProcessor.cs b/src/Nethermind/Nethermind.Consensus/Processing/IBranchProcessor.cs index 4ab2db8685c8..63760ef37037 100644 --- a/src/Nethermind/Nethermind.Consensus/Processing/IBranchProcessor.cs +++ b/src/Nethermind/Nethermind.Consensus/Processing/IBranchProcessor.cs @@ -37,6 +37,11 @@ Block[] Process( /// event EventHandler BlocksProcessing; + /// + /// Fired after branch processing finishes. + /// + event EventHandler BlocksProcessed; + /// /// Fired when a block is being processed. /// diff --git a/src/Nethermind/Nethermind.Consensus/Scheduler/BackgroundTaskScheduler.cs b/src/Nethermind/Nethermind.Consensus/Scheduler/BackgroundTaskScheduler.cs index c5523106189c..f9eecc9b15b4 100644 --- a/src/Nethermind/Nethermind.Consensus/Scheduler/BackgroundTaskScheduler.cs +++ b/src/Nethermind/Nethermind.Consensus/Scheduler/BackgroundTaskScheduler.cs @@ -34,6 +34,8 @@ public class BackgroundTaskScheduler : IBackgroundTaskScheduler, IAsyncDisposabl private readonly IBranchProcessor _branchProcessor; private readonly IChainHeadInfoProvider _headInfo; private readonly int _capacity; + private readonly object _blockProcessingLock = new(); + private int _activeBlockProcessingBranches; private long _queueCount; private CancellationTokenSource _blockProcessorCancellationTokenSource; @@ -64,7 +66,7 @@ public BackgroundTaskScheduler(IBranchProcessor branchProcessor, IChainHeadInfoP _capacity = capacity; _branchProcessor.BlocksProcessing += BranchProcessorOnBranchesProcessing; - _branchProcessor.BlockProcessed += BranchProcessorOnBranchProcessed; + _branchProcessor.BlocksProcessed += BranchProcessorOnBranchesProcessed; // TaskScheduler to run tasks at BelowNormal priority _scheduler = new BelowNormalPriorityTaskScheduler( @@ -79,26 +81,65 @@ private void BranchProcessorOnBranchesProcessing(object? sender, BlocksProcessin { // If we are syncing, we don't block background task processing // as there are potentially no gaps between blocks - if (!_headInfo.IsSyncing) + if (_headInfo.IsSyncing) { - long depth = Volatile.Read(ref _queueCount); - if (_logger.IsDebug) _logger.Debug($"Block processing starting, background queue depth: {depth}"); - // Signal that block processing is in progress so StartChannel can async-wait - _blockProcessingDoneSignal = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - // On block processing, cancel the block process CTS so running tasks can exit quickly - _blockProcessorCancellationTokenSource.Cancel(); + return; + } + + lock (_blockProcessingLock) + { + if (_disposed) + { + return; + } + + if (_activeBlockProcessingBranches++ == 0) + { + long depth = Volatile.Read(ref _queueCount); + if (_logger.IsDebug) _logger.Debug($"Block processing starting, background queue depth: {depth}"); + // Signal that block processing is in progress so StartChannel can async-wait + _blockProcessingDoneSignal = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + // On block processing, cancel the block process CTS so running tasks can exit quickly + _blockProcessorCancellationTokenSource.Cancel(); + } } } - private void BranchProcessorOnBranchProcessed(object? sender, BlockProcessedEventArgs e) + private void BranchProcessorOnBranchesProcessed(object? sender, BlocksProcessingEventArgs e) { - // Once the block is processed, we replace the cancellation token with a fresh uncanceled one - using CancellationTokenSource oldTokenSource = Interlocked.Exchange( - ref _blockProcessorCancellationTokenSource, - new CancellationTokenSource()); + CancellationTokenSource? oldTokenSource = null; + TaskCompletionSource? signal = null; + + lock (_blockProcessingLock) + { + if (_disposed || _activeBlockProcessingBranches == 0) + { + return; + } - // Signal that block processing is done so the paused consumers can resume - Interlocked.Exchange(ref _blockProcessingDoneSignal, null)?.TrySetResult(); + if (--_activeBlockProcessingBranches != 0) + { + return; + } + + oldTokenSource = _blockProcessorCancellationTokenSource; + _blockProcessorCancellationTokenSource = new CancellationTokenSource(); + signal = _blockProcessingDoneSignal; + _blockProcessingDoneSignal = null; + } + + oldTokenSource.Dispose(); + signal?.TrySetResult(); + } + + private CancellationTokenSource CreateTaskCancellationTokenSource() + { + lock (_blockProcessingLock) + { + return CancellationTokenSource.CreateLinkedTokenSource( + _blockProcessorCancellationTokenSource.Token, + _mainCancellationTokenSource.Token); + } } private async Task StartChannel() @@ -108,9 +149,7 @@ private async Task StartChannel() while (await _taskQueue.Reader.WaitToReadAsync(_mainCancellationTokenSource.Token)) { // Create fresh CancellationTokenSource for current block processing - CancellationTokenSource cts = CancellationTokenSource.CreateLinkedTokenSource( - Volatile.Read(ref _blockProcessorCancellationTokenSource).Token, - _mainCancellationTokenSource.Token); + CancellationTokenSource cts = CreateTaskCancellationTokenSource(); try { CancellationToken token = cts.Token; @@ -213,7 +252,7 @@ public async ValueTask DisposeAsync() if (Interlocked.CompareExchange(ref _disposed, true, false)) return; _branchProcessor.BlocksProcessing -= BranchProcessorOnBranchesProcessing; - _branchProcessor.BlockProcessed -= BranchProcessorOnBranchProcessed; + _branchProcessor.BlocksProcessed -= BranchProcessorOnBranchesProcessed; _taskQueue.Writer.Complete(); await _mainCancellationTokenSource.CancelAsync(); @@ -221,7 +260,12 @@ public async ValueTask DisposeAsync() // until they observe cancellation and complete. await Task.WhenAll(_tasksExecutors); _mainCancellationTokenSource.Dispose(); - _blockProcessorCancellationTokenSource.Dispose(); + lock (_blockProcessingLock) + { + _activeBlockProcessingBranches = 0; + _blockProcessingDoneSignal = null; + _blockProcessorCancellationTokenSource.Dispose(); + } _scheduler.Dispose(); } diff --git a/src/Nethermind/Nethermind.Merge.Plugin.Test/TestBranchProcessorInterceptor.cs b/src/Nethermind/Nethermind.Merge.Plugin.Test/TestBranchProcessorInterceptor.cs index 5f6716e42aef..7fb5b46f4df2 100644 --- a/src/Nethermind/Nethermind.Merge.Plugin.Test/TestBranchProcessorInterceptor.cs +++ b/src/Nethermind/Nethermind.Merge.Plugin.Test/TestBranchProcessorInterceptor.cs @@ -39,6 +39,12 @@ public event EventHandler? BlocksProcessing remove => baseBlockProcessor.BlocksProcessing -= value; } + public event EventHandler? BlocksProcessed + { + add => baseBlockProcessor.BlocksProcessed += value; + remove => baseBlockProcessor.BlocksProcessed -= value; + } + public event EventHandler? BlockProcessing { add => baseBlockProcessor.BlockProcessing += value;