Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ protected override void Load(ContainerBuilder builder)
new TrustedNodesManager(initConfig.TrustedNodesPath.GetApplicationResourcePath(initConfig.DataDir), logManager))

.Bind<INodeSource, IStaticNodesManager>()
.Bind<INodeSource, ITrustedNodesManager>()

// Used by NodesLoader, and ProtocolsManager which add entry on sync peer connected
.AddNetworkStorage(DbNames.PeersDb, DbNames.PeersDb)
Expand Down
2 changes: 1 addition & 1 deletion src/Nethermind/Nethermind.Network.Test/PeerManagerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -652,7 +652,7 @@ public async Task Will_disconnect_on_remove_static_node()
void DisconnectHandler(object o, DisconnectEventArgs e) => disconnections++;
ctx.Sessions.ForEach(s => s.Disconnected += DisconnectHandler);

ctx.StaticNodesManager.NodeRemoved += Raise.EventWith(new NodeEventArgs(
ctx.StaticNodesManager.NodeRemoved += Raise.EventWith<NodeEventArgs>(new ExplicitNodeRemovalEventArgs(
new Node(staticNodes.First())));

Assert.That(ctx.PeerManager.ActivePeers.Count(p => p.Node.IsStatic), Is.EqualTo(nodesCount - 1));
Expand Down
17 changes: 17 additions & 0 deletions src/Nethermind/Nethermind.Network.Test/PeerPoolTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,23 @@ public async Task PeerPool_ShouldThrottleSource_WhenActivePeerPoolIsFull()
}
}

[TestCase(true, true)] // active session → peer kept
[TestCase(false, false)] // no session → peer evicted
public void PeerPool_DiscoveryEviction(bool hasSession, bool expectPresent)
{
ITrustedNodesManager trustedNodesManager = Substitute.For<ITrustedNodesManager>();
TestNodeSource nodeSource = new();
PeerPool pool = CreatePeerPool(nodeSource, trustedNodesManager, maxActivePeers: 10, maxCandidatePeerCount: 10);

Node node = new(TestItem.PublicKeyA, "1.2.3.4", 1234);
Peer peer = pool.GetOrAdd(node);
if (hasSession) peer.InSession = Substitute.For<ISession>();

nodeSource.RemoveNode(node);

Assert.That(pool.TryGet(node.Id, out _), Is.EqualTo(expectPresent));
}

[Test]
public void PeerPool_Replace_DoesNotInheritStaticFlag()
{
Expand Down
6 changes: 6 additions & 0 deletions src/Nethermind/Nethermind.Network/NodeEventArgs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,10 @@ public class NodeEventArgs(Node node) : EventArgs
{
public Node Node { get; } = node;
}

/// <summary>
/// Raised when a node is explicitly removed by the operator (e.g. via admin_removePeer).
/// Receivers must disconnect any active P2P session unconditionally.
/// </summary>
public class ExplicitNodeRemovalEventArgs(Node node) : NodeEventArgs(node);
}
20 changes: 20 additions & 0 deletions src/Nethermind/Nethermind.Network/NodesManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
using Nethermind.Core.Crypto;
using Nethermind.Logging;
using Nethermind.Serialization.Json;
using Nethermind.Stats.Model;

namespace Nethermind.Network;

Expand Down Expand Up @@ -132,6 +133,25 @@ protected virtual async Task<ConcurrentDictionary<PublicKey, NetworkNode>> Parse
return nodes;
}

/// <summary>
/// Raised when a node is explicitly removed from this source.
/// </summary>
public event EventHandler<NodeEventArgs>? NodeRemoved;

/// <summary>
/// Removes a node from the in-memory store and fires <see cref="NodeRemoved"/> as an
/// <see cref="ExplicitNodeRemovalEventArgs"/> so downstream listeners (e.g. <c>PeerPool</c>)
/// know to disconnect the peer unconditionally.
/// </summary>
protected bool TryRemoveNode(PublicKey nodeId)
{
if (!_nodes.TryRemove(nodeId, out NetworkNode? removed))
return false;

NodeRemoved?.Invoke(this, new ExplicitNodeRemovalEventArgs(new Node(removed)));
return true;
}

protected virtual Task SaveFileAsync(CancellationToken cancellationToken = default)
{
ArgumentException.ThrowIfNullOrWhiteSpace(path);
Expand Down
18 changes: 14 additions & 4 deletions src/Nethermind/Nethermind.Network/Peer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ namespace Nethermind.Network
/// </summary>
public sealed class Peer(Node node, INodeStats? stats = null) : IEquatable<Peer>
{
public bool IsAwaitingConnection { get; set; }

/// <summary>
/// A physical network node with a network address combined with information about the client version
/// and any extra attributes that we assign to a network node (static / trusted / bootnode).
Expand All @@ -30,15 +28,27 @@ public sealed class Peer(Node node, INodeStats? stats = null) : IEquatable<Peer>

internal INodeStats? Stats { get; } = stats;

/// <remarks>
/// Guards <see cref="IsAwaitingConnection"/>, <see cref="InSession"/>, and <see cref="OutSession"/>
/// as a unit. Using a dedicated object avoids locking on a publicly visible instance.
/// </remarks>
internal readonly object SessionLock = new();

private volatile bool _isAwaitingConnection;
private volatile ISession? _inSession;
private volatile ISession? _outSession;

public bool IsAwaitingConnection { get => _isAwaitingConnection; set => _isAwaitingConnection = value; }

/// <summary>
/// An incoming session to the Node which can be in one of many states.
/// </summary>
public ISession? InSession { get; set; }
public ISession? InSession { get => _inSession; set => _inSession = value; }

/// <summary>
/// An outgoing session to the Node which can be in one of many states.
/// </summary>
public ISession? OutSession { get; set; }
public ISession? OutSession { get => _outSession; set => _outSession = value; }

public override string ToString() => $"[Peer|{Node:s}|{InSession}|{OutSession}]";

Expand Down
17 changes: 10 additions & 7 deletions src/Nethermind/Nethermind.Network/PeerManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1075,14 +1075,17 @@ private bool CanAttachSessionDirectly(ISession session, Peer peer)

private void AttachSession(Peer peer, ISession session, ConnectionDirection sessionDirection, bool disconnectOpposite)
{
if (sessionDirection == ConnectionDirection.In)
lock (peer.SessionLock)
{
peer.Stats.AddNodeStatsHandshakeEvent(ConnectionDirection.In);
peer.InSession = session;
}
else
{
peer.OutSession = session;
if (sessionDirection == ConnectionDirection.In)
{
peer.Stats.AddNodeStatsHandshakeEvent(ConnectionDirection.In);
peer.InSession = session;
}
else
{
peer.OutSession = session;
}
}

if (disconnectOpposite)
Expand Down
38 changes: 31 additions & 7 deletions src/Nethermind/Nethermind.Network/PeerPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,30 @@ public PeerPool(
_nodeSource.NodeRemoved += NodeSourceOnNodeRemoved;
}

private void NodeSourceOnNodeRemoved(object? sender, NodeEventArgs e) => TryRemove(e.Node.Id, out _);
private void NodeSourceOnNodeRemoved(object? sender, NodeEventArgs e)
Comment thread
DarkLord017 marked this conversation as resolved.
{
if (!Peers.TryGetValue(e.Node.Id, out Peer? peer))
return;

if (e is not ExplicitNodeRemovalEventArgs)
{
// Only remove the peer if no P2P session is active.
// The dictionary removals are done inside SessionLock so the session check and
// removal are atomic against AttachSession. PeerRemoved is fired outside the lock
// to avoid holding it across arbitrary event handler code.
bool removed;
lock (peer.SessionLock)
{
removed = peer.InSession is null && peer.OutSession is null && !peer.IsAwaitingConnection
&& Peers.TryRemove(e.Node.Id, out _);
if (removed) _staticPeers.TryRemove(e.Node.Id, out _);
}
if (removed) PeerRemoved?.Invoke(this, new PeerEventArgs(peer));
return;
}

TryRemove(e.Node.Id, out _);
Comment thread
DarkLord017 marked this conversation as resolved.
}

public Peer GetOrAdd(Node node) => Peers.GetOrAdd(node.Id, valueFactory: _createNewNodePeer, (node, _staticPeers));

Expand Down Expand Up @@ -114,18 +137,19 @@ private Peer CreateNew(PublicKeyAsKey key, (NetworkNode Node, ConcurrentDictiona

public bool TryRemove(PublicKey id, out Peer peer)
{
if (Peers.TryRemove(id, out peer))
if (!Peers.TryRemove(id, out peer))
return false;

_staticPeers.TryRemove(id, out _);
lock (peer.SessionLock)
{
_staticPeers.TryRemove(id, out _);
peer.InSession?.MarkDisconnected(DisconnectReason.PeerRemoved, DisconnectType.Local, "admin_removePeer");
peer.OutSession?.MarkDisconnected(DisconnectReason.PeerRemoved, DisconnectType.Local, "admin_removePeer");
peer.InSession = null;
peer.OutSession = null;
PeerRemoved?.Invoke(this, new PeerEventArgs(peer));
return true;
}

return false;
PeerRemoved?.Invoke(this, new PeerEventArgs(peer));
return true;
}

public Peer Replace(ISession session)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,20 +52,14 @@ public async Task<bool> AddAsync(NetworkNode networkNode, bool updateFile = true

public async Task<bool> RemoveAsync(NetworkNode networkNode, bool updateFile = true, CancellationToken cancellationToken = default)
{
if (!_nodes.TryRemove(networkNode.NodeId, out _))
if (!TryRemoveNode(networkNode.NodeId))
{
if (_logger.IsInfo) _logger.Info($"Static node was not found: {networkNode}");
return false;
}

if (_logger.IsInfo) _logger.Info($"Static node was removed: {networkNode}");
Node node = new(networkNode);
NodeRemoved?.Invoke(this, new NodeEventArgs(node));
if (updateFile)
{
await SaveFileAsync(cancellationToken);
}

if (updateFile) await SaveFileAsync(cancellationToken);
return true;
}

Expand Down Expand Up @@ -101,6 +95,4 @@ public async IAsyncEnumerable<Node> DiscoverNodes([EnumeratorCancellation] Cance
}

private event EventHandler<NodeEventArgs>? NodeAdded;

public event EventHandler<NodeEventArgs>? NodeRemoved;
}
35 changes: 6 additions & 29 deletions src/Nethermind/Nethermind.Network/TrustedNodesManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
using Nethermind.Core.Crypto;
using Nethermind.Logging;
using Nethermind.Stats.Model;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
Expand Down Expand Up @@ -84,40 +83,18 @@ public async Task<bool> AddAsync(Enode enode, bool updateFile = true, Cancellati
public async Task<bool> RemoveAsync(Enode enode, bool updateFile = true, CancellationToken cancellationToken = default)
{
NetworkNode networkNode = new(enode);
if (!_nodes.TryRemove(networkNode.NodeId, out _))
// Fire NodeRemoved BEFORE the file write: a cancelled SaveFileAsync must not leave
// the peer disconnected in-memory but still persisted as trusted.
if (!TryRemoveNode(networkNode.NodeId))
{
if (_logger.IsInfo)
{
_logger.Info($"Trusted node was not found: {enode}");
}
if (_logger.IsInfo) _logger.Info($"Trusted node was not found: {enode}");
return false;
}

if (_logger.IsInfo)
{
_logger.Info($"Trusted node was removed: {enode}");
}

// Fire NodeRemoved (drives PeerPool disconnect via the event chain) BEFORE the file write,
// so a cancelled SaveFileAsync cannot leave the peer untrusted-in-memory yet still connected.
// Mirrors StaticNodesManager.RemoveAsync ordering.
OnNodeRemoved(networkNode);

if (updateFile)
{
await SaveFileAsync(cancellationToken);
}

if (_logger.IsInfo) _logger.Info($"Trusted node was removed: {enode}");
if (updateFile) await SaveFileAsync(cancellationToken);
return true;
}

public bool IsTrusted(Enode enode) => _nodes.ContainsKey(enode.PublicKey);

public event EventHandler<NodeEventArgs>? NodeRemoved;

private void OnNodeRemoved(NetworkNode node)
{
Node nodeForEvent = new(node);
NodeRemoved?.Invoke(this, new NodeEventArgs(nodeForEvent));
}
}
Loading