Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* SPDX-License-Identifier: Apache-2.0
*/

package org.ethereum.beacon.discovery.pipeline;

/**
* Base class for {@link EnvelopeHandler}s that should be skipped once an envelope has been marked
* with {@link Field#BAD_PACKET}. Subclasses implement {@link #handlePacket(Envelope)} and get the
* skip behaviour for free; the terminal bad-packet handler implements {@link EnvelopeHandler}
* directly.
*/
public abstract class AbstractSkippingEnvelopeHandler implements EnvelopeHandler {

@Override
public final void handle(final Envelope envelope) {
if (envelope.contains(Field.BAD_PACKET)) {
return;
}
handlePacket(envelope);
}

protected abstract void handlePacket(Envelope envelope);
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
import org.ethereum.beacon.discovery.AddressAccessPolicy;
import org.ethereum.beacon.discovery.message.V5Message;
import org.ethereum.beacon.discovery.packet.HandshakeMessagePacket;
import org.ethereum.beacon.discovery.pipeline.AbstractSkippingEnvelopeHandler;
import org.ethereum.beacon.discovery.pipeline.Envelope;
import org.ethereum.beacon.discovery.pipeline.EnvelopeHandler;
import org.ethereum.beacon.discovery.pipeline.Field;
import org.ethereum.beacon.discovery.pipeline.HandlerUtil;
import org.ethereum.beacon.discovery.pipeline.Pipeline;
Expand All @@ -27,7 +27,7 @@
import org.ethereum.beacon.discovery.util.Functions;

/** Handles {@link HandshakeMessagePacket} in {@link Field#PACKET_HANDSHAKE} field */
public class HandshakeMessagePacketHandler implements EnvelopeHandler {
public class HandshakeMessagePacketHandler extends AbstractSkippingEnvelopeHandler {
private static final Logger LOG = LogManager.getLogger(HandshakeMessagePacketHandler.class);
private final Pipeline outgoingPipeline;
private final Scheduler scheduler;
Expand All @@ -49,7 +49,7 @@ public HandshakeMessagePacketHandler(
}

@Override
public void handle(Envelope envelope) {
protected void handlePacket(Envelope envelope) {
if (!HandlerUtil.requireField(Field.PACKET_HANDSHAKE, envelope)) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@
import org.apache.tuweni.bytes.Bytes;
import org.ethereum.beacon.discovery.packet.Packet;
import org.ethereum.beacon.discovery.packet.RawPacket;
import org.ethereum.beacon.discovery.pipeline.AbstractSkippingEnvelopeHandler;
import org.ethereum.beacon.discovery.pipeline.Envelope;
import org.ethereum.beacon.discovery.pipeline.EnvelopeHandler;
import org.ethereum.beacon.discovery.pipeline.Field;
import org.ethereum.beacon.discovery.pipeline.HandlerUtil;
import org.ethereum.beacon.discovery.type.Bytes16;
import org.ethereum.beacon.discovery.util.DecodeException;

/** Handles raw BytesValue incoming data in {@link Field#INCOMING} */
public class IncomingDataPacker implements EnvelopeHandler {
public class IncomingDataPacker extends AbstractSkippingEnvelopeHandler {
private static final Logger LOG = LogManager.getLogger(IncomingDataPacker.class);
public static final int MAX_PACKET_SIZE = 1280;
public static final int MIN_PACKET_SIZE = 63;
Expand All @@ -28,7 +28,7 @@ public IncomingDataPacker(Bytes homeNodeId) {
}

@Override
public void handle(Envelope envelope) {
protected void handlePacket(Envelope envelope) {
if (!HandlerUtil.requireField(Field.INCOMING, envelope)) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,16 @@
import org.ethereum.beacon.discovery.message.V5Message;
import org.ethereum.beacon.discovery.message.handler.EnrUpdateTracker.EnrUpdater;
import org.ethereum.beacon.discovery.message.handler.ExternalAddressSelector;
import org.ethereum.beacon.discovery.pipeline.AbstractSkippingEnvelopeHandler;
import org.ethereum.beacon.discovery.pipeline.Envelope;
import org.ethereum.beacon.discovery.pipeline.EnvelopeHandler;
import org.ethereum.beacon.discovery.pipeline.Field;
import org.ethereum.beacon.discovery.pipeline.HandlerUtil;
import org.ethereum.beacon.discovery.processor.DiscoveryV5MessageProcessor;
import org.ethereum.beacon.discovery.processor.MessageProcessor;
import org.ethereum.beacon.discovery.schema.NodeSession;
import org.ethereum.beacon.discovery.storage.LocalNodeRecordStore;

public class MessageHandler implements EnvelopeHandler {
public class MessageHandler extends AbstractSkippingEnvelopeHandler {
private static final Logger LOG = LogManager.getLogger(MessageHandler.class);
private final MessageProcessor messageProcessor;

Expand All @@ -36,7 +36,7 @@ public MessageHandler(
}

@Override
public void handle(Envelope envelope) {
protected void handlePacket(Envelope envelope) {
if (!HandlerUtil.requireField(Field.MESSAGE, envelope)) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
import org.ethereum.beacon.discovery.message.V5Message;
import org.ethereum.beacon.discovery.packet.MessagePacket;
import org.ethereum.beacon.discovery.packet.OrdinaryMessagePacket;
import org.ethereum.beacon.discovery.pipeline.AbstractSkippingEnvelopeHandler;
import org.ethereum.beacon.discovery.pipeline.Envelope;
import org.ethereum.beacon.discovery.pipeline.EnvelopeHandler;
import org.ethereum.beacon.discovery.pipeline.Field;
import org.ethereum.beacon.discovery.pipeline.HandlerUtil;
import org.ethereum.beacon.discovery.schema.NodeRecordFactory;
Expand All @@ -19,7 +19,7 @@
import org.ethereum.beacon.discovery.util.DecryptException;

/** Handles {@link MessagePacket} in {@link Field#PACKET_MESSAGE} field */
public class MessagePacketHandler implements EnvelopeHandler {
public class MessagePacketHandler extends AbstractSkippingEnvelopeHandler {
private static final Logger LOG = LogManager.getLogger(MessagePacketHandler.class);
private final NodeRecordFactory nodeRecordFactory;

Expand All @@ -28,7 +28,7 @@ public MessagePacketHandler(NodeRecordFactory nodeRecordFactory) {
}

@Override
public void handle(Envelope envelope) {
protected void handlePacket(Envelope envelope) {
if (!HandlerUtil.requireField(Field.PACKET_MESSAGE, envelope)) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,20 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.ethereum.beacon.discovery.pipeline.AbstractSkippingEnvelopeHandler;
import org.ethereum.beacon.discovery.pipeline.Envelope;
import org.ethereum.beacon.discovery.pipeline.EnvelopeHandler;
import org.ethereum.beacon.discovery.pipeline.Field;
import org.ethereum.beacon.discovery.pipeline.HandlerUtil;
import org.ethereum.beacon.discovery.pipeline.info.Request;
import org.ethereum.beacon.discovery.schema.NodeSession;

/** Enqueues task in session for any task found in {@link Field#REQUEST} */
public class NewTaskHandler implements EnvelopeHandler {
public class NewTaskHandler extends AbstractSkippingEnvelopeHandler {
private static final Logger LOG = LogManager.getLogger(NewTaskHandler.class);

@Override
@SuppressWarnings("rawtypes")
public void handle(Envelope envelope) {
protected void handlePacket(Envelope envelope) {
if (!HandlerUtil.requireField(Field.REQUEST, envelope)) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
import org.apache.logging.log4j.Logger;
import org.apache.tuweni.bytes.Bytes;
import org.ethereum.beacon.discovery.message.V5Message;
import org.ethereum.beacon.discovery.pipeline.AbstractSkippingEnvelopeHandler;
import org.ethereum.beacon.discovery.pipeline.Envelope;
import org.ethereum.beacon.discovery.pipeline.EnvelopeHandler;
import org.ethereum.beacon.discovery.pipeline.Field;
import org.ethereum.beacon.discovery.pipeline.HandlerUtil;
import org.ethereum.beacon.discovery.pipeline.Pipeline;
Expand All @@ -23,7 +23,7 @@
import org.ethereum.beacon.discovery.task.TaskStatus;

/** Gets next request task in session and processes it */
public class NextTaskHandler implements EnvelopeHandler {
public class NextTaskHandler extends AbstractSkippingEnvelopeHandler {
private static final Logger LOG = LogManager.getLogger(NextTaskHandler.class);
private static final int DEFAULT_DELAY_MS = 1000;
private static final int RANDOM_MESSAGE_SIZE = 128;
Expand All @@ -46,7 +46,7 @@ public static void tryToSendAwaitTaskIfAny(
}

@Override
public void handle(Envelope envelope) {
protected void handlePacket(Envelope envelope) {
if (!HandlerUtil.requireSessionWithNodeRecord(envelope)) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
import org.apache.logging.log4j.Logger;
import org.apache.tuweni.bytes.Bytes;
import org.ethereum.beacon.discovery.crypto.Signer;
import org.ethereum.beacon.discovery.pipeline.AbstractSkippingEnvelopeHandler;
import org.ethereum.beacon.discovery.pipeline.Envelope;
import org.ethereum.beacon.discovery.pipeline.EnvelopeHandler;
import org.ethereum.beacon.discovery.pipeline.Field;
import org.ethereum.beacon.discovery.pipeline.HandlerUtil;
import org.ethereum.beacon.discovery.pipeline.Pipeline;
Expand All @@ -37,7 +37,7 @@
* Performs {@link Field#SESSION_LOOKUP} request. Looks up for Node session based on NodeId, which
* should be in request field and stores it in {@link Field#SESSION} field.
*/
public class NodeSessionManager implements EnvelopeHandler {
public class NodeSessionManager extends AbstractSkippingEnvelopeHandler {
private static final int SESSION_CLEANUP_DELAY_SECONDS = 180;
private static final int REQUEST_CLEANUP_DELAY_SECONDS = 60;
private static final Logger LOG = LogManager.getLogger(NodeSessionManager.class);
Expand Down Expand Up @@ -67,7 +67,7 @@ public NodeSessionManager(
}

@Override
public void handle(final Envelope envelope) {
protected void handlePacket(final Envelope envelope) {
if (!HandlerUtil.requireField(Field.SESSION_LOOKUP, envelope)) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,20 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.ethereum.beacon.discovery.pipeline.AbstractSkippingEnvelopeHandler;
import org.ethereum.beacon.discovery.pipeline.Envelope;
import org.ethereum.beacon.discovery.pipeline.EnvelopeHandler;
import org.ethereum.beacon.discovery.pipeline.Field;
import org.ethereum.beacon.discovery.pipeline.HandlerUtil;

/**
* Searches for node in {@link Field#NODE} and requests session resolving using {@link
* Field#SESSION_LOOKUP}
*/
public class NodeSessionRequestHandler implements EnvelopeHandler {
public class NodeSessionRequestHandler extends AbstractSkippingEnvelopeHandler {
private static final Logger LOG = LogManager.getLogger(NodeSessionRequestHandler.class);

@Override
public void handle(Envelope envelope) {
protected void handlePacket(Envelope envelope) {
if (!HandlerUtil.requireField(Field.NODE, envelope)) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
import org.apache.logging.log4j.Logger;
import org.ethereum.beacon.discovery.AddressAccessPolicy;
import org.ethereum.beacon.discovery.network.NetworkParcel;
import org.ethereum.beacon.discovery.pipeline.AbstractSkippingEnvelopeHandler;
import org.ethereum.beacon.discovery.pipeline.Envelope;
import org.ethereum.beacon.discovery.pipeline.EnvelopeHandler;
import org.ethereum.beacon.discovery.pipeline.Field;
import org.ethereum.beacon.discovery.pipeline.HandlerUtil;
import reactor.core.publisher.FluxSink;
Expand All @@ -19,7 +19,7 @@
* we have outgoing parcel at the very first stage. Handler pushes it to `outgoingSink` stream which
* is linked with discovery client.
*/
public class OutgoingParcelHandler implements EnvelopeHandler {
public class OutgoingParcelHandler extends AbstractSkippingEnvelopeHandler {
private static final Logger LOG = LogManager.getLogger(OutgoingParcelHandler.class);

private final FluxSink<NetworkParcel> outgoingSink;
Expand All @@ -32,7 +32,7 @@ public OutgoingParcelHandler(
}

@Override
public void handle(Envelope envelope) {
protected void handlePacket(Envelope envelope) {
if (!HandlerUtil.requireField(Field.INCOMING, envelope)) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,20 @@
import org.ethereum.beacon.discovery.packet.OrdinaryMessagePacket;
import org.ethereum.beacon.discovery.packet.Packet;
import org.ethereum.beacon.discovery.packet.WhoAreYouPacket;
import org.ethereum.beacon.discovery.pipeline.AbstractSkippingEnvelopeHandler;
import org.ethereum.beacon.discovery.pipeline.Envelope;
import org.ethereum.beacon.discovery.pipeline.EnvelopeHandler;
import org.ethereum.beacon.discovery.pipeline.Field;
import org.ethereum.beacon.discovery.pipeline.HandlerUtil;
import org.ethereum.beacon.discovery.schema.NodeSession;
import org.ethereum.beacon.discovery.util.Utils;

/** Matches the current session state and inbound packet */
public class PacketDispatcherHandler implements EnvelopeHandler {
public class PacketDispatcherHandler extends AbstractSkippingEnvelopeHandler {

private static final Logger LOG = LogManager.getLogger(PacketDispatcherHandler.class);

@Override
public void handle(Envelope envelope) {
protected void handlePacket(Envelope envelope) {
if (!HandlerUtil.requireField(Field.SESSION, envelope)) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.ethereum.beacon.discovery.AddressAccessPolicy;
import org.ethereum.beacon.discovery.pipeline.AbstractSkippingEnvelopeHandler;
import org.ethereum.beacon.discovery.pipeline.Envelope;
import org.ethereum.beacon.discovery.pipeline.EnvelopeHandler;
import org.ethereum.beacon.discovery.pipeline.Field;
import org.ethereum.beacon.discovery.pipeline.HandlerUtil;

public class PacketSourceFilter implements EnvelopeHandler {
public class PacketSourceFilter extends AbstractSkippingEnvelopeHandler {
private static final Logger LOG = LogManager.getLogger(PacketSourceFilter.class);

private final AddressAccessPolicy addressAccessPolicy;
Expand All @@ -23,7 +23,7 @@ public PacketSourceFilter(final AddressAccessPolicy addressAccessPolicy) {
}

@Override
public void handle(final Envelope envelope) {
protected void handlePacket(final Envelope envelope) {
if (!HandlerUtil.requireField(Field.REMOTE_SENDER, envelope)) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
import org.ethereum.beacon.discovery.packet.OrdinaryMessagePacket;
import org.ethereum.beacon.discovery.packet.WhoAreYouPacket;
import org.ethereum.beacon.discovery.packet.WhoAreYouPacket.WhoAreYouAuthData;
import org.ethereum.beacon.discovery.pipeline.AbstractSkippingEnvelopeHandler;
import org.ethereum.beacon.discovery.pipeline.Envelope;
import org.ethereum.beacon.discovery.pipeline.EnvelopeHandler;
import org.ethereum.beacon.discovery.pipeline.Field;
import org.ethereum.beacon.discovery.pipeline.HandlerUtil;
import org.ethereum.beacon.discovery.schema.NodeRecord;
Expand All @@ -22,12 +22,12 @@
import org.ethereum.beacon.discovery.type.Bytes16;
import org.ethereum.beacon.discovery.util.Functions;

public class UnauthorizedMessagePacketHandler implements EnvelopeHandler {
public class UnauthorizedMessagePacketHandler extends AbstractSkippingEnvelopeHandler {

private static final Logger LOG = LogManager.getLogger(UnauthorizedMessagePacketHandler.class);

@Override
public void handle(Envelope envelope) {
protected void handlePacket(Envelope envelope) {
if (!HandlerUtil.requireField(Field.UNAUTHORIZED_PACKET_MESSAGE, envelope)) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@
import org.ethereum.beacon.discovery.packet.HandshakeMessagePacket;
import org.ethereum.beacon.discovery.packet.OrdinaryMessagePacket;
import org.ethereum.beacon.discovery.packet.Packet;
import org.ethereum.beacon.discovery.pipeline.AbstractSkippingEnvelopeHandler;
import org.ethereum.beacon.discovery.pipeline.Envelope;
import org.ethereum.beacon.discovery.pipeline.EnvelopeHandler;
import org.ethereum.beacon.discovery.pipeline.Field;
import org.ethereum.beacon.discovery.pipeline.HandlerUtil;

Expand All @@ -20,11 +20,11 @@
* field of the packet. Next, puts it to the {@link Field#SESSION_LOOKUP} so sender session could be
* resolved by another handler.
*/
public class UnknownPacketTagToSender implements EnvelopeHandler {
public class UnknownPacketTagToSender extends AbstractSkippingEnvelopeHandler {
private static final Logger LOG = LogManager.getLogger(UnknownPacketTagToSender.class);

@Override
public void handle(Envelope envelope) {
protected void handlePacket(Envelope envelope) {
if (!HandlerUtil.requireField(Field.PACKET, envelope)) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
import org.ethereum.beacon.discovery.packet.HandshakeMessagePacket.HandshakeAuthData;
import org.ethereum.beacon.discovery.packet.Header;
import org.ethereum.beacon.discovery.packet.WhoAreYouPacket;
import org.ethereum.beacon.discovery.pipeline.AbstractSkippingEnvelopeHandler;
import org.ethereum.beacon.discovery.pipeline.Envelope;
import org.ethereum.beacon.discovery.pipeline.EnvelopeHandler;
import org.ethereum.beacon.discovery.pipeline.Field;
import org.ethereum.beacon.discovery.pipeline.HandlerUtil;
import org.ethereum.beacon.discovery.pipeline.Pipeline;
Expand All @@ -31,7 +31,7 @@
import org.ethereum.beacon.discovery.util.Functions;

/** Handles {@link WhoAreYouPacket} in {@link Field#PACKET_WHOAREYOU} field */
public class WhoAreYouPacketHandler implements EnvelopeHandler {
public class WhoAreYouPacketHandler extends AbstractSkippingEnvelopeHandler {
private static final Logger LOG = LogManager.getLogger(WhoAreYouPacketHandler.class);

private final Pipeline outgoingPipeline;
Expand All @@ -43,7 +43,7 @@ public WhoAreYouPacketHandler(final Pipeline outgoingPipeline, final Scheduler s
}

@Override
public void handle(final Envelope envelope) {
protected void handlePacket(final Envelope envelope) {
if (!HandlerUtil.requireSessionWithNodeRecord(envelope)) {
return;
}
Expand Down
Loading
Loading