Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1599,48 +1599,54 @@ protected static <T extends RemoteLocationContext, R> Map<T, R> postProcessResul
// transfer originCall & callerContext to worker threads of executor.
final Call originCall = Server.getCurCall().get();
final CallerContext originContext = CallerContext.getCurrent();
for (final T location : locations) {
String nsId = location.getNameserviceId();
boolean isObserverRead = isObserverReadEligible(nsId, m);
final List<? extends FederationNamenodeContext> namenodes =
getOrderedNamenodes(nsId, isObserverRead);
final Class<?> proto = method.getProtocol();
final Object[] paramList = method.getParams(location);
if (standby) {
// Call the objectGetter to all NNs (including standby)
for (final FederationNamenodeContext nn : namenodes) {
String nnId = nn.getNamenodeId();
final List<FederationNamenodeContext> nnList =
Collections.singletonList(nn);
T nnLocation = location;
if (location instanceof RemoteLocation) {
nnLocation = (T)new RemoteLocation(nsId, nnId, location.getDest());
try{
for (final T location : locations) {
String nsId = location.getNameserviceId();
boolean isObserverRead = isObserverReadEligible(nsId, m);
final List<? extends FederationNamenodeContext> namenodes =
getOrderedNamenodes(nsId, isObserverRead);
final Class<?> proto = method.getProtocol();
final Object[] paramList = method.getParams(location);
if (standby) {
// Call the objectGetter to all NNs (including standby)
for (final FederationNamenodeContext nn : namenodes) {
String nnId = nn.getNamenodeId();
final List<FederationNamenodeContext> nnList =
Collections.singletonList(nn);
T nnLocation = location;
if (location instanceof RemoteLocation) {
nnLocation = (T)new RemoteLocation(nsId, nnId, location.getDest());
}
orderedLocations.add(nnLocation);
callables.add(
() -> {
transferThreadLocalContext(originCall, originContext);
return invokeMethod(
ugi, nnList, isObserverRead, proto, m, paramList);
});
}
orderedLocations.add(nnLocation);
} else {
// Call the objectGetter in order of nameservices in the NS list
orderedLocations.add(location);
callables.add(
() -> {
transferThreadLocalContext(originCall, originContext);
return invokeMethod(
ugi, nnList, isObserverRead, proto, m, paramList);
ugi, namenodes, isObserverRead, proto, m, paramList);
});
}
} else {
// Call the objectGetter in order of nameservices in the NS list
orderedLocations.add(location);
callables.add(
() -> {
transferThreadLocalContext(originCall, originContext);
return invokeMethod(
ugi, namenodes, isObserverRead, proto, m, paramList);
});
}
}

if (rpcMonitor != null) {
rpcMonitor.proxyOp();
}
if (this.router.getRouterClientMetrics() != null) {
this.router.getRouterClientMetrics().incInvokedConcurrent(m);
if (rpcMonitor != null) {
rpcMonitor.proxyOp();
}
if (this.router.getRouterClientMetrics() != null) {
this.router.getRouterClientMetrics().incInvokedConcurrent(m);
}

} catch (IOException e) {
releasePermit(CONCURRENT_NS, ugi, method, controller);
throw e;
}

return getRemoteResults(method, timeOutMs, controller, orderedLocations, callables);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,19 +215,20 @@ public void testReleasedWhenExceptionOccurs(
// Use renewLease test invokeConcurrent.
Collection<RemoteLocation> locations = new ArrayList<>();
locations.add(new RemoteLocation("ns0", "/", "/"));
locations.add(new RemoteLocation("ns1", "/", "/"));
RemoteMethod renewLease = new RemoteMethod(
"renewLease",
new Class[]{java.lang.String.class, java.util.List.class},
new Object[]{null, null});
availablePermits =
rpcClient.getRouterRpcFairnessPolicyController().getAvailablePermits("ns0");
rpcClient.getRouterRpcFairnessPolicyController().getAvailablePermits(CONCURRENT_NS);
LambdaTestUtils.intercept(IOException.class, () -> {
LOG.info("Use renewLease test invokeConcurrent.");
rpcClient.invokeConcurrent(locations, renewLease);
});
// Ensure that the semaphore is not acquired.
// Ensure that the concurrent semaphore is released.
assertEquals(availablePermits,
rpcClient.getRouterRpcFairnessPolicyController().getAvailablePermits("ns0"));
rpcClient.getRouterRpcFairnessPolicyController().getAvailablePermits(CONCURRENT_NS));
}

/**
Expand Down
Loading