From 56279d64564e55bffea9fe6a411a5c339abe847b Mon Sep 17 00:00:00 2001 From: dParikesit Date: Sat, 14 Mar 2026 10:08:24 -0400 Subject: [PATCH 1/3] add test --- .../federation/fairness/TestRouterHandlersFairness.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterHandlersFairness.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterHandlersFairness.java index 34d33ac5cc632..706796d8c753a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterHandlersFairness.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterHandlersFairness.java @@ -215,19 +215,20 @@ public void testReleasedWhenExceptionOccurs( // Use renewLease test invokeConcurrent. Collection locations = new ArrayList<>(); locations.add(new RemoteLocation("ns0", "/", "/")); + locations.add(new RemoteLocation("ns1", "/", "/")); RemoteMethod renewLease = new RemoteMethod( "renewLease", new Class[]{java.lang.String.class, java.util.List.class}, new Object[]{null, null}); availablePermits = - rpcClient.getRouterRpcFairnessPolicyController().getAvailablePermits("ns0"); + rpcClient.getRouterRpcFairnessPolicyController().getAvailablePermits(CONCURRENT_NS); LambdaTestUtils.intercept(IOException.class, () -> { LOG.info("Use renewLease test invokeConcurrent."); rpcClient.invokeConcurrent(locations, renewLease); }); - // Ensure that the semaphore is not acquired. + // Ensure that the concurrent semaphore is released. assertEquals(availablePermits, - rpcClient.getRouterRpcFairnessPolicyController().getAvailablePermits("ns0")); + rpcClient.getRouterRpcFairnessPolicyController().getAvailablePermits(CONCURRENT_NS)); } /** From 8d5f9192299f522321a92665f01414d42a36d41e Mon Sep 17 00:00:00 2001 From: dParikesit Date: Sat, 14 Mar 2026 10:24:00 -0400 Subject: [PATCH 2/3] fix invokeConcurrent --- .../federation/router/RouterRpcClient.java | 74 ++++++++++--------- 1 file changed, 39 insertions(+), 35 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java index a0c67958b053e..3caced2ae6cd9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java @@ -1599,51 +1599,55 @@ protected static Map postProcessResul // transfer originCall & callerContext to worker threads of executor. final Call originCall = Server.getCurCall().get(); final CallerContext originContext = CallerContext.getCurrent(); - for (final T location : locations) { - String nsId = location.getNameserviceId(); - boolean isObserverRead = isObserverReadEligible(nsId, m); - final List namenodes = - getOrderedNamenodes(nsId, isObserverRead); - final Class proto = method.getProtocol(); - final Object[] paramList = method.getParams(location); - if (standby) { - // Call the objectGetter to all NNs (including standby) - for (final FederationNamenodeContext nn : namenodes) { - String nnId = nn.getNamenodeId(); - final List nnList = - Collections.singletonList(nn); - T nnLocation = location; - if (location instanceof RemoteLocation) { - nnLocation = (T)new RemoteLocation(nsId, nnId, location.getDest()); + try{ + for (final T location : locations) { + String nsId = location.getNameserviceId(); + boolean isObserverRead = isObserverReadEligible(nsId, m); + final List namenodes = + getOrderedNamenodes(nsId, isObserverRead); + final Class proto = method.getProtocol(); + final Object[] paramList = method.getParams(location); + if (standby) { + // Call the objectGetter to all NNs (including standby) + for (final FederationNamenodeContext nn : namenodes) { + String nnId = nn.getNamenodeId(); + final List nnList = + Collections.singletonList(nn); + T nnLocation = location; + if (location instanceof RemoteLocation) { + nnLocation = (T)new RemoteLocation(nsId, nnId, location.getDest()); + } + orderedLocations.add(nnLocation); + callables.add( + () -> { + transferThreadLocalContext(originCall, originContext); + return invokeMethod( + ugi, nnList, isObserverRead, proto, m, paramList); + }); } - orderedLocations.add(nnLocation); + } else { + // Call the objectGetter in order of nameservices in the NS list + orderedLocations.add(location); callables.add( () -> { transferThreadLocalContext(originCall, originContext); return invokeMethod( - ugi, nnList, isObserverRead, proto, m, paramList); + ugi, namenodes, isObserverRead, proto, m, paramList); }); } - } else { - // Call the objectGetter in order of nameservices in the NS list - orderedLocations.add(location); - callables.add( - () -> { - transferThreadLocalContext(originCall, originContext); - return invokeMethod( - ugi, namenodes, isObserverRead, proto, m, paramList); - }); } - } - if (rpcMonitor != null) { - rpcMonitor.proxyOp(); - } - if (this.router.getRouterClientMetrics() != null) { - this.router.getRouterClientMetrics().incInvokedConcurrent(m); - } + if (rpcMonitor != null) { + rpcMonitor.proxyOp(); + } + if (this.router.getRouterClientMetrics() != null) { + this.router.getRouterClientMetrics().incInvokedConcurrent(m); + } - return getRemoteResults(method, timeOutMs, controller, orderedLocations, callables); + return getRemoteResults(method, timeOutMs, controller, orderedLocations, callables); + } finally { + releasePermit(CONCURRENT_NS, ugi, method, controller); + } } /** From 912178b7e372ff2726e5d30818f1d4f5ed83ec28 Mon Sep 17 00:00:00 2001 From: dParikesit Date: Thu, 19 Mar 2026 11:00:55 -0400 Subject: [PATCH 3/3] move getRemoteResults outside the try-finally block --- .../hdfs/server/federation/router/RouterRpcClient.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java index 3caced2ae6cd9..674226c300455 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java @@ -1644,10 +1644,12 @@ protected static Map postProcessResul this.router.getRouterClientMetrics().incInvokedConcurrent(m); } - return getRemoteResults(method, timeOutMs, controller, orderedLocations, callables); - } finally { + } catch (IOException e) { releasePermit(CONCURRENT_NS, ugi, method, controller); + throw e; } + + return getRemoteResults(method, timeOutMs, controller, orderedLocations, callables); } /**