Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public void onStart(InetSocketAddress src, InetSocketAddress dst, ProtocolInfo p
return;
}

if (isLocalTransfer(src)) {
if (isExcludedTransfer(src, dst)) {
return;
}

Expand Down Expand Up @@ -126,7 +126,7 @@ public void onEnd(InetSocketAddress src, InetSocketAddress dst, MoverInfoMessage
return;
}

if (isLocalTransfer(src)) {
if (isExcludedTransfer(src, dst)) {
return;
}

Expand Down Expand Up @@ -254,7 +254,7 @@ private OptionalInt getExperimentId(ProtocolInfo protocolInfo, Subject subject)
if (protocolInfo.getTransferTag() != null && !protocolInfo.getTransferTag().isEmpty()) {
try {
int transferTag = Integer.parseInt(protocolInfo.getTransferTag());
if (transferTag <= 64 || transferTag >= 65536) {
if (transferTag < 64 || transferTag > 65535) {
LOGGER.warn("Invalid integer range for transfer tag: {}", protocolInfo.getTransferTag());
return OptionalInt.empty();
}
Expand All @@ -271,14 +271,27 @@ private OptionalInt getExperimentId(ProtocolInfo protocolInfo, Subject subject)
return OptionalInt.empty();
}

return voToExpId.containsKey(vo.getGroup().toLowerCase())
? OptionalInt.of(voToExpId.get(vo.getGroup().toLowerCase()))
// FQAN.getGroup() returns paths like "/atlas" or "/atlas/usatlas".
// Strip the leading slash and take only the first path component to
// get the plain VO name (e.g. "atlas") that matches the vo-mapping keys.
String groupPath = vo.getGroup().toLowerCase();
if (groupPath.startsWith("/")) {
groupPath = groupPath.substring(1);
}
int subgroupSlash = groupPath.indexOf('/');
String voName = subgroupSlash != -1 ? groupPath.substring(0, subgroupSlash) : groupPath;

return voToExpId.containsKey(voName)
? OptionalInt.of(voToExpId.get(voName))
: OptionalInt.empty();
}

private boolean isLocalTransfer(InetSocketAddress dst) {
InetAddress addr = dst.getAddress();
return localSubnet.test(addr);
private boolean isExcludedTransfer(InetSocketAddress src, InetSocketAddress dst) {
InetAddress srcAddress = src.getAddress();
InetAddress dstAddress = dst.getAddress();
return srcAddress != null && dstAddress != null
&& localSubnet.test(srcAddress)
&& localSubnet.test(dstAddress);
}

private int getActivity(ProtocolInfo protocolInfo) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package org.dcache.pool.movers;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import diskCacheV111.vehicles.ProtocolInfo;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.lang.reflect.Method;
import java.util.OptionalInt;
import javax.security.auth.Subject;
import org.dcache.auth.FQANPrincipal;
import org.junit.Before;
import org.junit.Test;

public class TransferLifeCycleTest {

private Method getExperimentId;
private TransferLifeCycle transferLifeCycle;

@Before
public void setup() throws Exception {
transferLifeCycle = new TransferLifeCycle();
transferLifeCycle.setVoMapping("atlas:2,cms:3");

getExperimentId = TransferLifeCycle.class.getDeclaredMethod(
"getExperimentId", ProtocolInfo.class, Subject.class);
getExperimentId.setAccessible(true);
}

@Test
public void shouldAcceptMinimumValidSciTagValue() throws Exception {
OptionalInt experimentId = resolveExperimentId("64", new Subject());

assertTrue(experimentId.isPresent());
assertEquals(1, experimentId.getAsInt());
}

@Test
public void shouldRejectSciTagValueBelowValidRange() throws Exception {
OptionalInt experimentId = resolveExperimentId("63", new Subject());

assertFalse(experimentId.isPresent());
}

@Test
public void shouldMapSlashPrefixedFqanToVoName() throws Exception {
Subject subject = new Subject();
subject.getPrincipals().add(new FQANPrincipal("/atlas/usatlas", true));

OptionalInt experimentId = resolveExperimentId("", subject);

assertTrue(experimentId.isPresent());
assertEquals(2, experimentId.getAsInt());
}

@Test
public void shouldSuppressMarkerWhenBothEndpointsAreExcluded() throws Exception {
assertFalse(sendsStartMarker("10.10.10.10", "10.20.20.20", "10.0.0.0/8"));
}

@Test
public void shouldNotSuppressMarkerWhenOnlySourceIsExcluded() throws Exception {
assertTrue(sendsStartMarker("10.10.10.10", "203.0.113.20", "10.0.0.0/8"));
}

@Test
public void shouldNotSuppressMarkerWhenOnlyDestinationIsExcluded() throws Exception {
assertTrue(sendsStartMarker("203.0.113.20", "10.20.20.20", "10.0.0.0/8"));
}

private OptionalInt resolveExperimentId(String transferTag, Subject subject) throws Exception {
return (OptionalInt) getExperimentId.invoke(transferLifeCycle,
new TestProtocolInfo("xrootd", transferTag), subject);
}

private boolean sendsStartMarker(String srcIp, String dstIp, String excludes) throws Exception {
try (DatagramSocket socket = new DatagramSocket(0, InetAddress.getByName("127.0.0.1"))) {
socket.setSoTimeout(700);

TransferLifeCycle lifecycle = new TransferLifeCycle();
lifecycle.setEnabled(true);
lifecycle.setVoMapping("atlas:2");
lifecycle.setExcludes(new String[]{excludes});
lifecycle.setFireflyDestination("127.0.0.1:" + socket.getLocalPort());

lifecycle.onStart(
new InetSocketAddress(srcIp, 40000),
new InetSocketAddress(dstIp, 20066),
new TestProtocolInfo("xrootd", "129"),
new Subject());

var packet = new DatagramPacket(new byte[4096], 4096);
try {
socket.receive(packet);
return true;
} catch (SocketTimeoutException ignored) {
return false;
}
}
}

private static class TestProtocolInfo implements ProtocolInfo {

private static final long serialVersionUID = 1L;
private final String protocol;
private final String transferTag;

private TestProtocolInfo(String protocol, String transferTag) {
this.protocol = protocol;
this.transferTag = transferTag;
}

@Override
public String getProtocol() {
return protocol;
}

@Override
public int getMinorVersion() {
return 0;
}

@Override
public int getMajorVersion() {
return 0;
}

@Override
public String getVersionString() {
return "test";
}

@Override
public String getTransferTag() {
return transferTag;
}
}
}