diff --git a/src/main/java/org/ethereum/beacon/discovery/pipeline/AbstractSkippingEnvelopeHandler.java b/src/main/java/org/ethereum/beacon/discovery/pipeline/AbstractSkippingEnvelopeHandler.java new file mode 100644 index 000000000..8dffe0301 --- /dev/null +++ b/src/main/java/org/ethereum/beacon/discovery/pipeline/AbstractSkippingEnvelopeHandler.java @@ -0,0 +1,24 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.ethereum.beacon.discovery.pipeline; + +/** + * Base class for {@link EnvelopeHandler}s that should be skipped once an envelope has been marked + * with {@link Field#BAD_PACKET}. Subclasses implement {@link #handlePacket(Envelope)} and get the + * skip behaviour for free; the terminal bad-packet handler implements {@link EnvelopeHandler} + * directly. + */ +public abstract class AbstractSkippingEnvelopeHandler implements EnvelopeHandler { + + @Override + public final void handle(final Envelope envelope) { + if (envelope.contains(Field.BAD_PACKET)) { + return; + } + handlePacket(envelope); + } + + protected abstract void handlePacket(Envelope envelope); +} diff --git a/src/main/java/org/ethereum/beacon/discovery/pipeline/handler/HandshakeMessagePacketHandler.java b/src/main/java/org/ethereum/beacon/discovery/pipeline/handler/HandshakeMessagePacketHandler.java index 4669d464b..311ff7c16 100644 --- a/src/main/java/org/ethereum/beacon/discovery/pipeline/handler/HandshakeMessagePacketHandler.java +++ b/src/main/java/org/ethereum/beacon/discovery/pipeline/handler/HandshakeMessagePacketHandler.java @@ -13,8 +13,8 @@ import org.ethereum.beacon.discovery.AddressAccessPolicy; import org.ethereum.beacon.discovery.message.V5Message; import org.ethereum.beacon.discovery.packet.HandshakeMessagePacket; +import org.ethereum.beacon.discovery.pipeline.AbstractSkippingEnvelopeHandler; import org.ethereum.beacon.discovery.pipeline.Envelope; -import org.ethereum.beacon.discovery.pipeline.EnvelopeHandler; import org.ethereum.beacon.discovery.pipeline.Field; import org.ethereum.beacon.discovery.pipeline.HandlerUtil; import org.ethereum.beacon.discovery.pipeline.Pipeline; @@ -27,7 +27,7 @@ import org.ethereum.beacon.discovery.util.Functions; /** Handles {@link HandshakeMessagePacket} in {@link Field#PACKET_HANDSHAKE} field */ -public class HandshakeMessagePacketHandler implements EnvelopeHandler { +public class HandshakeMessagePacketHandler extends AbstractSkippingEnvelopeHandler { private static final Logger LOG = LogManager.getLogger(HandshakeMessagePacketHandler.class); private final Pipeline outgoingPipeline; private final Scheduler scheduler; @@ -49,7 +49,7 @@ public HandshakeMessagePacketHandler( } @Override - public void handle(Envelope envelope) { + protected void handlePacket(Envelope envelope) { if (!HandlerUtil.requireField(Field.PACKET_HANDSHAKE, envelope)) { return; } diff --git a/src/main/java/org/ethereum/beacon/discovery/pipeline/handler/IncomingDataPacker.java b/src/main/java/org/ethereum/beacon/discovery/pipeline/handler/IncomingDataPacker.java index f28ea8746..2d93f32f2 100644 --- a/src/main/java/org/ethereum/beacon/discovery/pipeline/handler/IncomingDataPacker.java +++ b/src/main/java/org/ethereum/beacon/discovery/pipeline/handler/IncomingDataPacker.java @@ -9,15 +9,15 @@ import org.apache.tuweni.bytes.Bytes; import org.ethereum.beacon.discovery.packet.Packet; import org.ethereum.beacon.discovery.packet.RawPacket; +import org.ethereum.beacon.discovery.pipeline.AbstractSkippingEnvelopeHandler; import org.ethereum.beacon.discovery.pipeline.Envelope; -import org.ethereum.beacon.discovery.pipeline.EnvelopeHandler; import org.ethereum.beacon.discovery.pipeline.Field; import org.ethereum.beacon.discovery.pipeline.HandlerUtil; import org.ethereum.beacon.discovery.type.Bytes16; import org.ethereum.beacon.discovery.util.DecodeException; /** Handles raw BytesValue incoming data in {@link Field#INCOMING} */ -public class IncomingDataPacker implements EnvelopeHandler { +public class IncomingDataPacker extends AbstractSkippingEnvelopeHandler { private static final Logger LOG = LogManager.getLogger(IncomingDataPacker.class); public static final int MAX_PACKET_SIZE = 1280; public static final int MIN_PACKET_SIZE = 63; @@ -28,7 +28,7 @@ public IncomingDataPacker(Bytes homeNodeId) { } @Override - public void handle(Envelope envelope) { + protected void handlePacket(Envelope envelope) { if (!HandlerUtil.requireField(Field.INCOMING, envelope)) { return; } diff --git a/src/main/java/org/ethereum/beacon/discovery/pipeline/handler/MessageHandler.java b/src/main/java/org/ethereum/beacon/discovery/pipeline/handler/MessageHandler.java index 9c2074d78..b5b1da94b 100644 --- a/src/main/java/org/ethereum/beacon/discovery/pipeline/handler/MessageHandler.java +++ b/src/main/java/org/ethereum/beacon/discovery/pipeline/handler/MessageHandler.java @@ -10,8 +10,8 @@ import org.ethereum.beacon.discovery.message.V5Message; import org.ethereum.beacon.discovery.message.handler.EnrUpdateTracker.EnrUpdater; import org.ethereum.beacon.discovery.message.handler.ExternalAddressSelector; +import org.ethereum.beacon.discovery.pipeline.AbstractSkippingEnvelopeHandler; import org.ethereum.beacon.discovery.pipeline.Envelope; -import org.ethereum.beacon.discovery.pipeline.EnvelopeHandler; import org.ethereum.beacon.discovery.pipeline.Field; import org.ethereum.beacon.discovery.pipeline.HandlerUtil; import org.ethereum.beacon.discovery.processor.DiscoveryV5MessageProcessor; @@ -19,7 +19,7 @@ import org.ethereum.beacon.discovery.schema.NodeSession; import org.ethereum.beacon.discovery.storage.LocalNodeRecordStore; -public class MessageHandler implements EnvelopeHandler { +public class MessageHandler extends AbstractSkippingEnvelopeHandler { private static final Logger LOG = LogManager.getLogger(MessageHandler.class); private final MessageProcessor messageProcessor; @@ -36,7 +36,7 @@ public MessageHandler( } @Override - public void handle(Envelope envelope) { + protected void handlePacket(Envelope envelope) { if (!HandlerUtil.requireField(Field.MESSAGE, envelope)) { return; } diff --git a/src/main/java/org/ethereum/beacon/discovery/pipeline/handler/MessagePacketHandler.java b/src/main/java/org/ethereum/beacon/discovery/pipeline/handler/MessagePacketHandler.java index dbcf0739c..3d7d7a823 100644 --- a/src/main/java/org/ethereum/beacon/discovery/pipeline/handler/MessagePacketHandler.java +++ b/src/main/java/org/ethereum/beacon/discovery/pipeline/handler/MessagePacketHandler.java @@ -9,8 +9,8 @@ import org.ethereum.beacon.discovery.message.V5Message; import org.ethereum.beacon.discovery.packet.MessagePacket; import org.ethereum.beacon.discovery.packet.OrdinaryMessagePacket; +import org.ethereum.beacon.discovery.pipeline.AbstractSkippingEnvelopeHandler; import org.ethereum.beacon.discovery.pipeline.Envelope; -import org.ethereum.beacon.discovery.pipeline.EnvelopeHandler; import org.ethereum.beacon.discovery.pipeline.Field; import org.ethereum.beacon.discovery.pipeline.HandlerUtil; import org.ethereum.beacon.discovery.schema.NodeRecordFactory; @@ -19,7 +19,7 @@ import org.ethereum.beacon.discovery.util.DecryptException; /** Handles {@link MessagePacket} in {@link Field#PACKET_MESSAGE} field */ -public class MessagePacketHandler implements EnvelopeHandler { +public class MessagePacketHandler extends AbstractSkippingEnvelopeHandler { private static final Logger LOG = LogManager.getLogger(MessagePacketHandler.class); private final NodeRecordFactory nodeRecordFactory; @@ -28,7 +28,7 @@ public MessagePacketHandler(NodeRecordFactory nodeRecordFactory) { } @Override - public void handle(Envelope envelope) { + protected void handlePacket(Envelope envelope) { if (!HandlerUtil.requireField(Field.PACKET_MESSAGE, envelope)) { return; } diff --git a/src/main/java/org/ethereum/beacon/discovery/pipeline/handler/NewTaskHandler.java b/src/main/java/org/ethereum/beacon/discovery/pipeline/handler/NewTaskHandler.java index 91357e8d4..8d8d9f19f 100644 --- a/src/main/java/org/ethereum/beacon/discovery/pipeline/handler/NewTaskHandler.java +++ b/src/main/java/org/ethereum/beacon/discovery/pipeline/handler/NewTaskHandler.java @@ -6,20 +6,20 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.ethereum.beacon.discovery.pipeline.AbstractSkippingEnvelopeHandler; import org.ethereum.beacon.discovery.pipeline.Envelope; -import org.ethereum.beacon.discovery.pipeline.EnvelopeHandler; import org.ethereum.beacon.discovery.pipeline.Field; import org.ethereum.beacon.discovery.pipeline.HandlerUtil; import org.ethereum.beacon.discovery.pipeline.info.Request; import org.ethereum.beacon.discovery.schema.NodeSession; /** Enqueues task in session for any task found in {@link Field#REQUEST} */ -public class NewTaskHandler implements EnvelopeHandler { +public class NewTaskHandler extends AbstractSkippingEnvelopeHandler { private static final Logger LOG = LogManager.getLogger(NewTaskHandler.class); @Override @SuppressWarnings("rawtypes") - public void handle(Envelope envelope) { + protected void handlePacket(Envelope envelope) { if (!HandlerUtil.requireField(Field.REQUEST, envelope)) { return; } diff --git a/src/main/java/org/ethereum/beacon/discovery/pipeline/handler/NextTaskHandler.java b/src/main/java/org/ethereum/beacon/discovery/pipeline/handler/NextTaskHandler.java index 041599713..1559cebe0 100644 --- a/src/main/java/org/ethereum/beacon/discovery/pipeline/handler/NextTaskHandler.java +++ b/src/main/java/org/ethereum/beacon/discovery/pipeline/handler/NextTaskHandler.java @@ -11,8 +11,8 @@ import org.apache.logging.log4j.Logger; import org.apache.tuweni.bytes.Bytes; import org.ethereum.beacon.discovery.message.V5Message; +import org.ethereum.beacon.discovery.pipeline.AbstractSkippingEnvelopeHandler; import org.ethereum.beacon.discovery.pipeline.Envelope; -import org.ethereum.beacon.discovery.pipeline.EnvelopeHandler; import org.ethereum.beacon.discovery.pipeline.Field; import org.ethereum.beacon.discovery.pipeline.HandlerUtil; import org.ethereum.beacon.discovery.pipeline.Pipeline; @@ -23,7 +23,7 @@ import org.ethereum.beacon.discovery.task.TaskStatus; /** Gets next request task in session and processes it */ -public class NextTaskHandler implements EnvelopeHandler { +public class NextTaskHandler extends AbstractSkippingEnvelopeHandler { private static final Logger LOG = LogManager.getLogger(NextTaskHandler.class); private static final int DEFAULT_DELAY_MS = 1000; private static final int RANDOM_MESSAGE_SIZE = 128; @@ -46,7 +46,7 @@ public static void tryToSendAwaitTaskIfAny( } @Override - public void handle(Envelope envelope) { + protected void handlePacket(Envelope envelope) { if (!HandlerUtil.requireSessionWithNodeRecord(envelope)) { return; } diff --git a/src/main/java/org/ethereum/beacon/discovery/pipeline/handler/NodeSessionManager.java b/src/main/java/org/ethereum/beacon/discovery/pipeline/handler/NodeSessionManager.java index 10668d4c5..3bab0a90b 100644 --- a/src/main/java/org/ethereum/beacon/discovery/pipeline/handler/NodeSessionManager.java +++ b/src/main/java/org/ethereum/beacon/discovery/pipeline/handler/NodeSessionManager.java @@ -19,8 +19,8 @@ import org.apache.logging.log4j.Logger; import org.apache.tuweni.bytes.Bytes; import org.ethereum.beacon.discovery.crypto.Signer; +import org.ethereum.beacon.discovery.pipeline.AbstractSkippingEnvelopeHandler; import org.ethereum.beacon.discovery.pipeline.Envelope; -import org.ethereum.beacon.discovery.pipeline.EnvelopeHandler; import org.ethereum.beacon.discovery.pipeline.Field; import org.ethereum.beacon.discovery.pipeline.HandlerUtil; import org.ethereum.beacon.discovery.pipeline.Pipeline; @@ -37,7 +37,7 @@ * Performs {@link Field#SESSION_LOOKUP} request. Looks up for Node session based on NodeId, which * should be in request field and stores it in {@link Field#SESSION} field. */ -public class NodeSessionManager implements EnvelopeHandler { +public class NodeSessionManager extends AbstractSkippingEnvelopeHandler { private static final int SESSION_CLEANUP_DELAY_SECONDS = 180; private static final int REQUEST_CLEANUP_DELAY_SECONDS = 60; private static final Logger LOG = LogManager.getLogger(NodeSessionManager.class); @@ -67,7 +67,7 @@ public NodeSessionManager( } @Override - public void handle(final Envelope envelope) { + protected void handlePacket(final Envelope envelope) { if (!HandlerUtil.requireField(Field.SESSION_LOOKUP, envelope)) { return; } diff --git a/src/main/java/org/ethereum/beacon/discovery/pipeline/handler/NodeSessionRequestHandler.java b/src/main/java/org/ethereum/beacon/discovery/pipeline/handler/NodeSessionRequestHandler.java index 6effc0064..1b9bcc595 100644 --- a/src/main/java/org/ethereum/beacon/discovery/pipeline/handler/NodeSessionRequestHandler.java +++ b/src/main/java/org/ethereum/beacon/discovery/pipeline/handler/NodeSessionRequestHandler.java @@ -6,8 +6,8 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.ethereum.beacon.discovery.pipeline.AbstractSkippingEnvelopeHandler; import org.ethereum.beacon.discovery.pipeline.Envelope; -import org.ethereum.beacon.discovery.pipeline.EnvelopeHandler; import org.ethereum.beacon.discovery.pipeline.Field; import org.ethereum.beacon.discovery.pipeline.HandlerUtil; @@ -15,11 +15,11 @@ * Searches for node in {@link Field#NODE} and requests session resolving using {@link * Field#SESSION_LOOKUP} */ -public class NodeSessionRequestHandler implements EnvelopeHandler { +public class NodeSessionRequestHandler extends AbstractSkippingEnvelopeHandler { private static final Logger LOG = LogManager.getLogger(NodeSessionRequestHandler.class); @Override - public void handle(Envelope envelope) { + protected void handlePacket(Envelope envelope) { if (!HandlerUtil.requireField(Field.NODE, envelope)) { return; } diff --git a/src/main/java/org/ethereum/beacon/discovery/pipeline/handler/OutgoingParcelHandler.java b/src/main/java/org/ethereum/beacon/discovery/pipeline/handler/OutgoingParcelHandler.java index 649e7e505..dbaaa1f6e 100644 --- a/src/main/java/org/ethereum/beacon/discovery/pipeline/handler/OutgoingParcelHandler.java +++ b/src/main/java/org/ethereum/beacon/discovery/pipeline/handler/OutgoingParcelHandler.java @@ -8,8 +8,8 @@ import org.apache.logging.log4j.Logger; import org.ethereum.beacon.discovery.AddressAccessPolicy; import org.ethereum.beacon.discovery.network.NetworkParcel; +import org.ethereum.beacon.discovery.pipeline.AbstractSkippingEnvelopeHandler; import org.ethereum.beacon.discovery.pipeline.Envelope; -import org.ethereum.beacon.discovery.pipeline.EnvelopeHandler; import org.ethereum.beacon.discovery.pipeline.Field; import org.ethereum.beacon.discovery.pipeline.HandlerUtil; import reactor.core.publisher.FluxSink; @@ -19,7 +19,7 @@ * we have outgoing parcel at the very first stage. Handler pushes it to `outgoingSink` stream which * is linked with discovery client. */ -public class OutgoingParcelHandler implements EnvelopeHandler { +public class OutgoingParcelHandler extends AbstractSkippingEnvelopeHandler { private static final Logger LOG = LogManager.getLogger(OutgoingParcelHandler.class); private final FluxSink outgoingSink; @@ -32,7 +32,7 @@ public OutgoingParcelHandler( } @Override - public void handle(Envelope envelope) { + protected void handlePacket(Envelope envelope) { if (!HandlerUtil.requireField(Field.INCOMING, envelope)) { return; } diff --git a/src/main/java/org/ethereum/beacon/discovery/pipeline/handler/PacketDispatcherHandler.java b/src/main/java/org/ethereum/beacon/discovery/pipeline/handler/PacketDispatcherHandler.java index 576b4c691..4ecd0f4a0 100644 --- a/src/main/java/org/ethereum/beacon/discovery/pipeline/handler/PacketDispatcherHandler.java +++ b/src/main/java/org/ethereum/beacon/discovery/pipeline/handler/PacketDispatcherHandler.java @@ -11,20 +11,20 @@ import org.ethereum.beacon.discovery.packet.OrdinaryMessagePacket; import org.ethereum.beacon.discovery.packet.Packet; import org.ethereum.beacon.discovery.packet.WhoAreYouPacket; +import org.ethereum.beacon.discovery.pipeline.AbstractSkippingEnvelopeHandler; import org.ethereum.beacon.discovery.pipeline.Envelope; -import org.ethereum.beacon.discovery.pipeline.EnvelopeHandler; import org.ethereum.beacon.discovery.pipeline.Field; import org.ethereum.beacon.discovery.pipeline.HandlerUtil; import org.ethereum.beacon.discovery.schema.NodeSession; import org.ethereum.beacon.discovery.util.Utils; /** Matches the current session state and inbound packet */ -public class PacketDispatcherHandler implements EnvelopeHandler { +public class PacketDispatcherHandler extends AbstractSkippingEnvelopeHandler { private static final Logger LOG = LogManager.getLogger(PacketDispatcherHandler.class); @Override - public void handle(Envelope envelope) { + protected void handlePacket(Envelope envelope) { if (!HandlerUtil.requireField(Field.SESSION, envelope)) { return; } diff --git a/src/main/java/org/ethereum/beacon/discovery/pipeline/handler/PacketSourceFilter.java b/src/main/java/org/ethereum/beacon/discovery/pipeline/handler/PacketSourceFilter.java index 5493bb5b2..039d4d95c 100644 --- a/src/main/java/org/ethereum/beacon/discovery/pipeline/handler/PacketSourceFilter.java +++ b/src/main/java/org/ethereum/beacon/discovery/pipeline/handler/PacketSourceFilter.java @@ -8,12 +8,12 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.ethereum.beacon.discovery.AddressAccessPolicy; +import org.ethereum.beacon.discovery.pipeline.AbstractSkippingEnvelopeHandler; import org.ethereum.beacon.discovery.pipeline.Envelope; -import org.ethereum.beacon.discovery.pipeline.EnvelopeHandler; import org.ethereum.beacon.discovery.pipeline.Field; import org.ethereum.beacon.discovery.pipeline.HandlerUtil; -public class PacketSourceFilter implements EnvelopeHandler { +public class PacketSourceFilter extends AbstractSkippingEnvelopeHandler { private static final Logger LOG = LogManager.getLogger(PacketSourceFilter.class); private final AddressAccessPolicy addressAccessPolicy; @@ -23,7 +23,7 @@ public PacketSourceFilter(final AddressAccessPolicy addressAccessPolicy) { } @Override - public void handle(final Envelope envelope) { + protected void handlePacket(final Envelope envelope) { if (!HandlerUtil.requireField(Field.REMOTE_SENDER, envelope)) { return; } diff --git a/src/main/java/org/ethereum/beacon/discovery/pipeline/handler/UnauthorizedMessagePacketHandler.java b/src/main/java/org/ethereum/beacon/discovery/pipeline/handler/UnauthorizedMessagePacketHandler.java index b314639f1..3ed35eb4b 100644 --- a/src/main/java/org/ethereum/beacon/discovery/pipeline/handler/UnauthorizedMessagePacketHandler.java +++ b/src/main/java/org/ethereum/beacon/discovery/pipeline/handler/UnauthorizedMessagePacketHandler.java @@ -11,8 +11,8 @@ import org.ethereum.beacon.discovery.packet.OrdinaryMessagePacket; import org.ethereum.beacon.discovery.packet.WhoAreYouPacket; import org.ethereum.beacon.discovery.packet.WhoAreYouPacket.WhoAreYouAuthData; +import org.ethereum.beacon.discovery.pipeline.AbstractSkippingEnvelopeHandler; import org.ethereum.beacon.discovery.pipeline.Envelope; -import org.ethereum.beacon.discovery.pipeline.EnvelopeHandler; import org.ethereum.beacon.discovery.pipeline.Field; import org.ethereum.beacon.discovery.pipeline.HandlerUtil; import org.ethereum.beacon.discovery.schema.NodeRecord; @@ -22,12 +22,12 @@ import org.ethereum.beacon.discovery.type.Bytes16; import org.ethereum.beacon.discovery.util.Functions; -public class UnauthorizedMessagePacketHandler implements EnvelopeHandler { +public class UnauthorizedMessagePacketHandler extends AbstractSkippingEnvelopeHandler { private static final Logger LOG = LogManager.getLogger(UnauthorizedMessagePacketHandler.class); @Override - public void handle(Envelope envelope) { + protected void handlePacket(Envelope envelope) { if (!HandlerUtil.requireField(Field.UNAUTHORIZED_PACKET_MESSAGE, envelope)) { return; } diff --git a/src/main/java/org/ethereum/beacon/discovery/pipeline/handler/UnknownPacketTagToSender.java b/src/main/java/org/ethereum/beacon/discovery/pipeline/handler/UnknownPacketTagToSender.java index c82ac9a8c..5ad9b5944 100644 --- a/src/main/java/org/ethereum/beacon/discovery/pipeline/handler/UnknownPacketTagToSender.java +++ b/src/main/java/org/ethereum/beacon/discovery/pipeline/handler/UnknownPacketTagToSender.java @@ -10,8 +10,8 @@ import org.ethereum.beacon.discovery.packet.HandshakeMessagePacket; import org.ethereum.beacon.discovery.packet.OrdinaryMessagePacket; import org.ethereum.beacon.discovery.packet.Packet; +import org.ethereum.beacon.discovery.pipeline.AbstractSkippingEnvelopeHandler; import org.ethereum.beacon.discovery.pipeline.Envelope; -import org.ethereum.beacon.discovery.pipeline.EnvelopeHandler; import org.ethereum.beacon.discovery.pipeline.Field; import org.ethereum.beacon.discovery.pipeline.HandlerUtil; @@ -20,11 +20,11 @@ * field of the packet. Next, puts it to the {@link Field#SESSION_LOOKUP} so sender session could be * resolved by another handler. */ -public class UnknownPacketTagToSender implements EnvelopeHandler { +public class UnknownPacketTagToSender extends AbstractSkippingEnvelopeHandler { private static final Logger LOG = LogManager.getLogger(UnknownPacketTagToSender.class); @Override - public void handle(Envelope envelope) { + protected void handlePacket(Envelope envelope) { if (!HandlerUtil.requireField(Field.PACKET, envelope)) { return; } diff --git a/src/main/java/org/ethereum/beacon/discovery/pipeline/handler/WhoAreYouPacketHandler.java b/src/main/java/org/ethereum/beacon/discovery/pipeline/handler/WhoAreYouPacketHandler.java index 919af14fb..d8ec26a2e 100644 --- a/src/main/java/org/ethereum/beacon/discovery/pipeline/handler/WhoAreYouPacketHandler.java +++ b/src/main/java/org/ethereum/beacon/discovery/pipeline/handler/WhoAreYouPacketHandler.java @@ -15,8 +15,8 @@ import org.ethereum.beacon.discovery.packet.HandshakeMessagePacket.HandshakeAuthData; import org.ethereum.beacon.discovery.packet.Header; import org.ethereum.beacon.discovery.packet.WhoAreYouPacket; +import org.ethereum.beacon.discovery.pipeline.AbstractSkippingEnvelopeHandler; import org.ethereum.beacon.discovery.pipeline.Envelope; -import org.ethereum.beacon.discovery.pipeline.EnvelopeHandler; import org.ethereum.beacon.discovery.pipeline.Field; import org.ethereum.beacon.discovery.pipeline.HandlerUtil; import org.ethereum.beacon.discovery.pipeline.Pipeline; @@ -31,7 +31,7 @@ import org.ethereum.beacon.discovery.util.Functions; /** Handles {@link WhoAreYouPacket} in {@link Field#PACKET_WHOAREYOU} field */ -public class WhoAreYouPacketHandler implements EnvelopeHandler { +public class WhoAreYouPacketHandler extends AbstractSkippingEnvelopeHandler { private static final Logger LOG = LogManager.getLogger(WhoAreYouPacketHandler.class); private final Pipeline outgoingPipeline; @@ -43,7 +43,7 @@ public WhoAreYouPacketHandler(final Pipeline outgoingPipeline, final Scheduler s } @Override - public void handle(final Envelope envelope) { + protected void handlePacket(final Envelope envelope) { if (!HandlerUtil.requireSessionWithNodeRecord(envelope)) { return; } diff --git a/src/main/java/org/ethereum/beacon/discovery/pipeline/handler/WhoAreYouSessionResolver.java b/src/main/java/org/ethereum/beacon/discovery/pipeline/handler/WhoAreYouSessionResolver.java index 64424b497..b6efa731a 100644 --- a/src/main/java/org/ethereum/beacon/discovery/pipeline/handler/WhoAreYouSessionResolver.java +++ b/src/main/java/org/ethereum/beacon/discovery/pipeline/handler/WhoAreYouSessionResolver.java @@ -9,8 +9,8 @@ import org.apache.logging.log4j.Logger; import org.ethereum.beacon.discovery.packet.Packet; import org.ethereum.beacon.discovery.packet.WhoAreYouPacket; +import org.ethereum.beacon.discovery.pipeline.AbstractSkippingEnvelopeHandler; import org.ethereum.beacon.discovery.pipeline.Envelope; -import org.ethereum.beacon.discovery.pipeline.EnvelopeHandler; import org.ethereum.beacon.discovery.pipeline.Field; import org.ethereum.beacon.discovery.pipeline.HandlerUtil; import org.ethereum.beacon.discovery.schema.NodeSession; @@ -19,7 +19,7 @@ * Resolves session using `nonceRepository` for `WHOAREYOU` packets which should be placed in {@link * Field#PACKET_WHOAREYOU} */ -public class WhoAreYouSessionResolver implements EnvelopeHandler { +public class WhoAreYouSessionResolver extends AbstractSkippingEnvelopeHandler { private static final Logger LOG = LogManager.getLogger(WhoAreYouSessionResolver.class); private final NodeSessionManager nodeSessionManager; @@ -28,7 +28,7 @@ public WhoAreYouSessionResolver(NodeSessionManager nodeSessionManager) { } @Override - public void handle(Envelope envelope) { + protected void handlePacket(Envelope envelope) { if (!HandlerUtil.requireField(Field.PACKET, envelope)) { return; } diff --git a/src/test/java/org/ethereum/beacon/discovery/pipeline/AbstractSkippingEnvelopeHandlerTest.java b/src/test/java/org/ethereum/beacon/discovery/pipeline/AbstractSkippingEnvelopeHandlerTest.java new file mode 100644 index 000000000..bcd57866e --- /dev/null +++ b/src/test/java/org/ethereum/beacon/discovery/pipeline/AbstractSkippingEnvelopeHandlerTest.java @@ -0,0 +1,80 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.ethereum.beacon.discovery.pipeline; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.jupiter.api.Test; + +class AbstractSkippingEnvelopeHandlerTest { + + private static final Field MARKER = new Field<>("MARKER"); + + private static class CountingHandler extends AbstractSkippingEnvelopeHandler { + final AtomicInteger invocations = new AtomicInteger(); + + @Override + protected void handlePacket(final Envelope envelope) { + invocations.incrementAndGet(); + } + } + + @Test + void downstreamHandlersSkipEnvelopesMarkedAsBadPacket() { + final EnvelopeHandler markBad = + envelope -> { + envelope.put(Field.BAD_PACKET, new Object()); + envelope.put(Field.BAD_EXCEPTION, new RuntimeException("bad")); + }; + final CountingHandler downstream = new CountingHandler(); + final AtomicInteger terminalInvocations = new AtomicInteger(); + // Terminal handler implements EnvelopeHandler directly, so it still sees bad packets. + final EnvelopeHandler terminal = envelope -> terminalInvocations.incrementAndGet(); + + final Pipeline pipeline = + new PipelineImpl().addHandler(markBad).addHandler(downstream).addHandler(terminal).build(); + + pipeline.push(new Envelope()); + + assertThat(downstream.invocations).hasValue(0); + assertThat(terminalInvocations).hasValue(1); + } + + @Test + void healthyEnvelopesReachDownstreamSkippingHandlers() { + final EnvelopeHandler markClean = envelope -> envelope.put(MARKER, "ok"); + final CountingHandler downstream = + new CountingHandler() { + @Override + protected void handlePacket(final Envelope envelope) { + assertThat(envelope.get(MARKER)).isEqualTo("ok"); + super.handlePacket(envelope); + } + }; + + final Pipeline pipeline = + new PipelineImpl().addHandler(markClean).addHandler(downstream).build(); + + pipeline.push(new Envelope()); + + assertThat(downstream.invocations).hasValue(1); + } + + @Test + void handlerChainStopsAtFirstBadMarking() { + final CountingHandler first = new CountingHandler(); + final EnvelopeHandler markBadMidway = envelope -> envelope.put(Field.BAD_PACKET, new Object()); + final CountingHandler last = new CountingHandler(); + + final Pipeline pipeline = + new PipelineImpl().addHandler(first).addHandler(markBadMidway).addHandler(last).build(); + + pipeline.push(new Envelope()); + + assertThat(first.invocations).hasValue(1); + assertThat(last.invocations).hasValue(0); + } +}