diff --git a/eureka-core/src/main/java/com/netflix/eureka/cluster/PeerEurekaNodes.java b/eureka-core/src/main/java/com/netflix/eureka/cluster/PeerEurekaNodes.java index 0f221ec747..577b086abc 100644 --- a/eureka-core/src/main/java/com/netflix/eureka/cluster/PeerEurekaNodes.java +++ b/eureka-core/src/main/java/com/netflix/eureka/cluster/PeerEurekaNodes.java @@ -1,19 +1,5 @@ package com.netflix.eureka.cluster; -import javax.inject.Inject; -import javax.inject.Singleton; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; - import com.netflix.appinfo.ApplicationInfoManager; import com.netflix.appinfo.InstanceInfo; import com.netflix.discovery.EurekaClientConfig; @@ -25,6 +11,16 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.inject.Inject; +import javax.inject.Singleton; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.*; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + /** * Helper class to manage lifecycle of a collection of {@link PeerEurekaNode}s. * @@ -67,34 +63,28 @@ public List getPeerNodesView() { public List getPeerEurekaNodes() { return peerEurekaNodes; } - + public int getMinNumberOfAvailablePeers() { return serverConfig.getHealthStatusMinNumberOfAvailablePeers(); } public void start() { taskExecutor = Executors.newSingleThreadScheduledExecutor( - new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - Thread thread = new Thread(r, "Eureka-PeerNodesUpdater"); - thread.setDaemon(true); - return thread; - } + r -> { + Thread thread = new Thread(r, "Eureka-PeerNodesUpdater"); + thread.setDaemon(true); + return thread; } ); try { updatePeerEurekaNodes(resolvePeerUrls()); - Runnable peersUpdateTask = new Runnable() { - @Override - public void run() { - try { - updatePeerEurekaNodes(resolvePeerUrls()); - } catch (Throwable e) { - logger.error("Cannot update the replica Nodes", e); - } - + Runnable peersUpdateTask = () -> { + try { + updatePeerEurekaNodes(resolvePeerUrls()); + } catch (Throwable e) { + logger.error("Cannot update the replica Nodes", e); } + }; taskExecutor.scheduleWithFixedDelay( peersUpdateTask, @@ -117,9 +107,7 @@ public void shutdown() { this.peerEurekaNodes = Collections.emptyList(); this.peerEurekaNodeUrls = Collections.emptySet(); - for (PeerEurekaNode node : toRemove) { - node.shutDown(); - } + toRemove.forEach(PeerEurekaNode::shutDown); } /** @@ -157,9 +145,9 @@ protected void updatePeerEurekaNodes(List newPeerUrls) { } Set toShutdown = new HashSet<>(peerEurekaNodeUrls); - toShutdown.removeAll(newPeerUrls); + newPeerUrls.forEach(toShutdown::remove); Set toAdd = new HashSet<>(newPeerUrls); - toAdd.removeAll(peerEurekaNodeUrls); + peerEurekaNodeUrls.forEach(toAdd::remove); if (toShutdown.isEmpty() && toAdd.isEmpty()) { // No change return; @@ -169,7 +157,10 @@ protected void updatePeerEurekaNodes(List newPeerUrls) { List newNodeList = new ArrayList<>(peerEurekaNodes); if (!toShutdown.isEmpty()) { - logger.info("Removing no longer available peer nodes {}", toShutdown); + logger.info( + "Removing no longer available peer nodes {}", + toShutdown.stream().map(this::removePasswordFromPeerUrl).collect(Collectors.toSet()) + ); int i = 0; while (i < newNodeList.size()) { PeerEurekaNode eurekaNode = newNodeList.get(i); @@ -184,16 +175,40 @@ protected void updatePeerEurekaNodes(List newPeerUrls) { // Add new peers if (!toAdd.isEmpty()) { - logger.info("Adding new peer nodes {}", toAdd); - for (String peerUrl : toAdd) { - newNodeList.add(createPeerEurekaNode(peerUrl)); - } + logger.info( + "Adding new peer nodes {}", + toAdd.stream().map(this::removePasswordFromPeerUrl).collect(Collectors.toSet()) + ); + toAdd.stream().map(this::createPeerEurekaNode).forEach(newNodeList::add); } this.peerEurekaNodes = newNodeList; this.peerEurekaNodeUrls = new HashSet<>(newPeerUrls); } + /** + * If basic http authorization is used in the url, replace the password with 'PASSWORD', making it safe to log. + */ + private String removePasswordFromPeerUrl(String url) { + URI uri; + try { + uri = new URI(url); + } catch (URISyntaxException e) { + logger.warn("Cannot parse peer URI {}", url, e); + return null; + } + + String userInfo = uri.getUserInfo(); + if (userInfo != null && userInfo.contains(":")) { + String[] userInfoParts = userInfo.split(":"); + if (userInfoParts.length == 2) { + String sanitizedUserInfo = userInfoParts[0] + ":PASSWORD"; + return url.replace(userInfo, sanitizedUserInfo); + } + } + return url; + } + protected PeerEurekaNode createPeerEurekaNode(String peerEurekaNodeUrl) { HttpReplicationClient replicationClient = JerseyReplicationClient.createReplicationClient(serverConfig, serverCodecs, peerEurekaNodeUrl); String targetHost = hostFromUrl(peerEurekaNodeUrl); @@ -204,16 +219,15 @@ protected PeerEurekaNode createPeerEurekaNode(String peerEurekaNodeUrl) { } /** + * @param url the service url of the replica node that the check is made. + * @return true, if the url represents the current node which is trying to + * replicate, false otherwise. * @deprecated 2016-06-27 use instance version of {@link #isThisMyUrl(String)} - * + *

* Checks if the given service url contains the current host which is trying * to replicate. Only after the EIP binding is done the host has a chance to * identify itself in the list of replica nodes and needs to take itself out * of replication traffic. - * - * @param url the service url of the replica node that the check is made. - * @return true, if the url represents the current node which is trying to - * replicate, false otherwise. */ public static boolean isThisMe(String url) { InstanceInfo myInfo = ApplicationInfoManager.getInstance().getInfo(); @@ -229,7 +243,7 @@ public static boolean isThisMe(String url) { * * @param url the service url of the replica node that the check is made. * @return true, if the url represents the current node which is trying to - * replicate, false otherwise. + * replicate, false otherwise. */ public boolean isThisMyUrl(String url) { final String myUrlConfigured = serverConfig.getMyUrl(); @@ -238,11 +252,11 @@ public boolean isThisMyUrl(String url) { } return isInstanceURL(url, applicationInfoManager.getInfo()); } - + /** * Checks if the given service url matches the supplied instance * - * @param url the service url of the replica node that the check is made. + * @param url the service url of the replica node that the check is made. * @param instance the instance to check the service url against * @return true, if the url represents the supplied instance, false otherwise. */