Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.shenyu.common.utils.Singleton;
import org.apache.shenyu.loadbalancer.entity.Upstream;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -146,53 +147,110 @@ public void removeByKey(final String key) {
*/
public void submit(final String selectorId, final List<Upstream> upstreamList) {
List<Upstream> actualUpstreamList = Objects.isNull(upstreamList) ? Lists.newArrayList() : upstreamList;
actualUpstreamList.forEach(upstream -> {
if (!upstream.isHealthCheckEnabled()) {
upstream.setStatus(true);
upstream.setHealthy(true);
}
});

// Check if the list is empty first to avoid unnecessary processing
if (actualUpstreamList.isEmpty()) {
List<Upstream> existUpstreamList = MapUtils.computeIfAbsent(UPSTREAM_MAP, selectorId, k -> Lists.newArrayList());
removeAllUpstreams(selectorId, existUpstreamList);
return;
}

initializeUpstreamHealthStatus(actualUpstreamList);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The initializeUpstreamHealthStatus method is called before partitioning the upstreams. This might lead to unnecessary processing if the list is empty.
Consider moving this call after checking if the list is empty to avoid unnecessary processing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


Map<Boolean, List<Upstream>> partitionedUpstreams = actualUpstreamList.stream()
.collect(Collectors.partitioningBy(Upstream::isStatus));
List<Upstream> validUpstreamList = partitionedUpstreams.get(true);
List<Upstream> offlineUpstreamList = partitionedUpstreams.get(false);
List<Upstream> existUpstreamList = MapUtils.computeIfAbsent(UPSTREAM_MAP, selectorId, k -> Lists.newArrayList());

if (actualUpstreamList.isEmpty()) {
existUpstreamList.forEach(up -> task.triggerRemoveOne(selectorId, up));
}
processOfflineUpstreams(selectorId, offlineUpstreamList, existUpstreamList);
processValidUpstreams(selectorId, validUpstreamList, existUpstreamList);

List<Upstream> healthyUpstreamList = task.getHealthyUpstreamListBySelectorId(selectorId);
UPSTREAM_MAP.put(selectorId, Objects.isNull(healthyUpstreamList) ? Lists.newArrayList() : healthyUpstreamList);
}

private void initializeUpstreamHealthStatus(final List<Upstream> upstreamList) {
upstreamList.forEach(upstream -> {
if (!upstream.isHealthCheckEnabled()) {
upstream.setStatus(true);
upstream.setHealthy(true);
}
});
}

private void removeAllUpstreams(final String selectorId, final List<Upstream> existUpstreamList) {
List<Upstream> toRemove = new ArrayList<>(existUpstreamList);
toRemove.forEach(up -> task.triggerRemoveOne(selectorId, up));
}

// Use a Set for O(1) lookups instead of nested loops
private void processOfflineUpstreams(final String selectorId, final List<Upstream> offlineUpstreamList,
final List<Upstream> existUpstreamList) {
Map<String, Upstream> currentUnhealthyMap = getCurrentUnhealthyMap(selectorId);
Set<Upstream> existUpstreamSet = new HashSet<>(existUpstreamList);

offlineUpstreamList.forEach(offlineUp -> {
String key = upstreamMapKey(offlineUp);
if (existUpstreamSet.contains(offlineUp)) {
task.triggerRemoveOne(selectorId, offlineUp);
if (currentUnhealthyMap.containsKey(key) && offlineUp.isHealthCheckEnabled()) {
task.putToMap(task.getUnhealthyUpstream(), selectorId, offlineUp);
task.removeFromMap(task.getHealthyUpstream(), selectorId, offlineUp);
} else {
task.triggerRemoveOne(selectorId, offlineUp);
}
} else if (offlineUp.isHealthCheckEnabled()) {
task.putToMap(task.getUnhealthyUpstream(), selectorId, offlineUp);
}
});
}

private void processValidUpstreams(final String selectorId, final List<Upstream> validUpstreamList,
final List<Upstream> existUpstreamList) {
if (validUpstreamList.isEmpty()) {
return;
}

updateExistingUpstreams(validUpstreamList, existUpstreamList);
addNewUpstreams(selectorId, validUpstreamList, existUpstreamList);
}

private void updateExistingUpstreams(final List<Upstream> validUpstreamList, final List<Upstream> existUpstreamList) {
Map<String, Upstream> existUpstreamMap = existUpstreamList.stream()
.collect(Collectors.toMap(this::upstreamMapKey, existUp -> existUp, (existing, replacement) -> existing));

validUpstreamList.forEach(validUp -> {
Upstream matchedExistUp = existUpstreamMap.get(upstreamMapKey(validUp));
if (Objects.nonNull(matchedExistUp)) {
matchedExistUp.setWeight(validUp.getWeight());
matchedExistUp.setHealthCheckEnabled(validUp.isHealthCheckEnabled());
if (!matchedExistUp.isHealthCheckEnabled()) {
matchedExistUp.setHealthy(true);
}
}
});
}

private void addNewUpstreams(final String selectorId, final List<Upstream> validUpstreamList,
final List<Upstream> existUpstreamList) {
Map<String, Upstream> currentUnhealthyMap = getCurrentUnhealthyMap(selectorId);

if (!validUpstreamList.isEmpty()) {
// update upstream weight
Map<String, Upstream> existUpstreamMap = existUpstreamList.stream()
.collect(Collectors.toMap(this::upstreamMapKey, existUp -> existUp, (existing, replacement) -> existing));
validUpstreamList.forEach(validUp -> {
String key = upstreamMapKey(validUp);
Upstream matchedExistUp = existUpstreamMap.get(key);
if (Objects.nonNull(matchedExistUp)) {
matchedExistUp.setWeight(validUp.getWeight());
matchedExistUp.setHealthCheckEnabled(validUp.isHealthCheckEnabled());
if (!matchedExistUp.isHealthCheckEnabled()) {
matchedExistUp.setHealthy(true);
}
validUpstreamList.stream()
.filter(validUp -> !existUpstreamList.contains(validUp))
.forEach(up -> {
Upstream prevUnhealthy = currentUnhealthyMap.get(upstreamMapKey(up));
if (Objects.nonNull(prevUnhealthy)) {
task.putToMap(task.getUnhealthyUpstream(), selectorId, up);
} else {
task.triggerAddOne(selectorId, up);
}
});
}

validUpstreamList.stream()
.filter(validUp -> !existUpstreamList.contains(validUp))
.forEach(up -> task.triggerAddOne(selectorId, up));
}

List<Upstream> healthyUpstreamList = task.getHealthyUpstreamListBySelectorId(selectorId);
UPSTREAM_MAP.put(selectorId, Objects.isNull(healthyUpstreamList) ? Lists.newArrayList() : healthyUpstreamList);
private Map<String, Upstream> getCurrentUnhealthyMap(final String selectorId) {
List<Upstream> currentUnhealthy = task.getUnhealthyUpstream().get(selectorId);
return Objects.isNull(currentUnhealthy)
? Maps.newConcurrentMap()
: currentUnhealthy.stream().collect(Collectors.toMap(this::upstreamMapKey, u -> u, (a, b) -> a));
}

private String upstreamMapKey(final Upstream upstream) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ private void finishHealthCheck() {
public void triggerAddOne(final String selectorId, final Upstream upstream) {
putToMap(healthyUpstream, selectorId, upstream);
}

/**
* Remove a specific upstream via selectorId.
*
Expand All @@ -277,7 +277,14 @@ public void triggerRemoveOne(final String selectorId, final Upstream upstream) {
removeFromMap(unhealthyUpstream, selectorId, upstream);
}

private void putToMap(final Map<String, List<Upstream>> map, final String selectorId, final Upstream upstream) {
/**
* Put upstream to specified map (for preserving health status).
*
* @param map the map to put upstream
* @param selectorId the selector id
* @param upstream the upstream
*/
public void putToMap(final Map<String, List<Upstream>> map, final String selectorId, final Upstream upstream) {
synchronized (lock) {
List<Upstream> list = MapUtils.computeIfAbsent(map, selectorId, k -> Lists.newArrayList());
if (!list.contains(upstream)) {
Expand All @@ -286,7 +293,14 @@ private void putToMap(final Map<String, List<Upstream>> map, final String select
}
}

private void removeFromMap(final Map<String, List<Upstream>> map, final String selectorId, final Upstream upstream) {
/**
* Remove upstream from specified map.
*
* @param map the map to remove upstream from
* @param selectorId the selector id
* @param upstream the upstream
*/
public void removeFromMap(final Map<String, List<Upstream>> map, final String selectorId, final Upstream upstream) {
synchronized (lock) {
List<Upstream> list = map.get(selectorId);
if (CollectionUtils.isNotEmpty(list)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

/**
* The type UpstreamCacheManager check task test.
Expand Down Expand Up @@ -106,4 +107,188 @@ public void testSubmitSyncsHealthCheckEnabled() {
// Clean up
upstreamCacheManager.removeByKey(testSelectorId);
}

@Test
@Order(6)
public void testSubmitWithStatusFalsePreservesUnhealthyState() {
final UpstreamCacheManager upstreamCacheManager = UpstreamCacheManager.getInstance();
final String testSelectorId = "PRESERVE_UNHEALTHY_TEST";

// First, submit healthy upstreams to establish baseline
List<Upstream> initialList = new ArrayList<>(2);
initialList.add(Upstream.builder()
.protocol("http://")
.url("upstream1:8080")
.status(true)
.healthCheckEnabled(true)
.build());
initialList.add(Upstream.builder()
.protocol("http://")
.url("upstream2:8080")
.status(true)
.healthCheckEnabled(true)
.build());
upstreamCacheManager.submit(testSelectorId, initialList);

// Simulate health check marking one as unhealthy
UpstreamCheckTask task = getUpstreamCheckTask(upstreamCacheManager);
if (Objects.nonNull(task)) {
Upstream unhealthyUpstream = initialList.get(0);
unhealthyUpstream.setHealthy(false);
task.putToMap(task.getUnhealthyUpstream(), testSelectorId, unhealthyUpstream);
task.removeFromMap(task.getHealthyUpstream(), testSelectorId, unhealthyUpstream);

// Verify it's in unhealthy map
Assertions.assertNotNull(task.getUnhealthyUpstream().get(testSelectorId));
Assertions.assertTrue(task.getUnhealthyUpstream().get(testSelectorId).stream()
.anyMatch(u -> u.getUrl().equals("upstream1:8080")));
}

// Now admin sends update with status=false for that upstream
List<Upstream> updateList = new ArrayList<>(2);
updateList.add(Upstream.builder()
.protocol("http://")
.url("upstream1:8080")
.status(false)
.healthCheckEnabled(true)
.build());
updateList.add(Upstream.builder()
.protocol("http://")
.url("upstream2:8080")
.status(true)
.healthCheckEnabled(true)
.build());
upstreamCacheManager.submit(testSelectorId, updateList);

// Verify: upstream1 should still be in unhealthy map (preserved state)
if (Objects.nonNull(task)) {
List<Upstream> unhealthyList = task.getUnhealthyUpstream().get(testSelectorId);
Assertions.assertNotNull(unhealthyList);
Assertions.assertTrue(unhealthyList.stream()
.anyMatch(u -> u.getUrl().equals("upstream1:8080")),
"upstream1 should be preserved in unhealthy map");
}

// Clean up
upstreamCacheManager.removeByKey(testSelectorId);
}

@Test
@Order(7)
public void testSubmitWithNewOfflineUpstreamAddedToUnhealthy() {
final UpstreamCacheManager upstreamCacheManager = UpstreamCacheManager.getInstance();
final String testSelectorId = "NEW_OFFLINE_UNHEALTHY_TEST";

// Submit a list with a new upstream having status=false
List<Upstream> upstreamList = new ArrayList<>(1);
upstreamList.add(Upstream.builder()
.protocol("http://")
.url("new-upstream:8080")
.status(false)
.healthCheckEnabled(true)
.build());
upstreamCacheManager.submit(testSelectorId, upstreamList);

// Verify: new upstream with status=false should be in unhealthy map for monitoring
UpstreamCheckTask task = getUpstreamCheckTask(upstreamCacheManager);
if (Objects.nonNull(task)) {
List<Upstream> unhealthyList = task.getUnhealthyUpstream().get(testSelectorId);
Assertions.assertNotNull(unhealthyList);
Assertions.assertTrue(unhealthyList.stream()
.anyMatch(u -> u.getUrl().equals("new-upstream:8080")),
"New upstream with status=false should be in unhealthy map");
}

// Clean up
upstreamCacheManager.removeByKey(testSelectorId);
}

@Test
@Order(8)
public void testSubmitPreservesUnhealthyForValidUpstream() {
final UpstreamCacheManager upstreamCacheManager = UpstreamCacheManager.getInstance();
final String testSelectorId = "PRESERVE_UNHEALTHY_VALID_TEST";

// First submit and mark an upstream as unhealthy
List<Upstream> initialList = new ArrayList<>(1);
initialList.add(Upstream.builder()
.protocol("http://")
.url("recovering-upstream:8080")
.status(true)
.healthCheckEnabled(true)
.build());
upstreamCacheManager.submit(testSelectorId, initialList);

UpstreamCheckTask task = getUpstreamCheckTask(upstreamCacheManager);
if (Objects.nonNull(task)) {
// Manually mark as unhealthy
Upstream unhealthyUpstream = initialList.get(0);
unhealthyUpstream.setHealthy(false);
task.putToMap(task.getUnhealthyUpstream(), testSelectorId, unhealthyUpstream);
task.removeFromMap(task.getHealthyUpstream(), testSelectorId, unhealthyUpstream);

// Now admin sends update with status=true (valid) for the same upstream
List<Upstream> updateList = new ArrayList<>(1);
updateList.add(Upstream.builder()
.protocol("http://")
.url("recovering-upstream:8080")
.status(true)
.healthCheckEnabled(true)
.build());
upstreamCacheManager.submit(testSelectorId, updateList);

// Verify: should preserve unhealthy state since it was previously unhealthy
List<Upstream> unhealthyList = task.getUnhealthyUpstream().get(testSelectorId);
Assertions.assertNotNull(unhealthyList);
Assertions.assertTrue(unhealthyList.stream()
.anyMatch(u -> u.getUrl().equals("recovering-upstream:8080")),
"Previously unhealthy upstream should remain in unhealthy map");
}

// Clean up
upstreamCacheManager.removeByKey(testSelectorId);
}

@Test
@Order(9)
public void testSubmitWithHealthCheckDisabledAndStatusFalse() {
final UpstreamCacheManager upstreamCacheManager = UpstreamCacheManager.getInstance();
final String testSelectorId = "HEALTH_CHECK_DISABLED_STATUS_FALSE_TEST";

// Submit upstream with healthCheckEnabled=false and status=false
// This upstream should be removed, not added to unhealthy map
List<Upstream> upstreamList = new ArrayList<>(1);
upstreamList.add(Upstream.builder()
.protocol("http://")
.url("no-check-upstream:8080")
.status(false)
.healthCheckEnabled(false)
.build());
upstreamCacheManager.submit(testSelectorId, upstreamList);

UpstreamCheckTask task = getUpstreamCheckTask(upstreamCacheManager);
if (Objects.nonNull(task)) {
// Verify: should NOT be in unhealthy map since health check is disabled
List<Upstream> unhealthyList = task.getUnhealthyUpstream().get(testSelectorId);
Assertions.assertTrue(Objects.isNull(unhealthyList) || unhealthyList.isEmpty(),
"Upstream with healthCheckEnabled=false should not be in unhealthy map");
}

// Clean up
upstreamCacheManager.removeByKey(testSelectorId);
}

/**
* Helper method to get the UpstreamCheckTask using reflection.
*/
private UpstreamCheckTask getUpstreamCheckTask(final UpstreamCacheManager manager) {
try {
java.lang.reflect.Field field = UpstreamCacheManager.class.getDeclaredField("task");
field.setAccessible(true);
return (UpstreamCheckTask) field.get(manager);
} catch (Exception e) {
// If reflection fails, return null
return null;
}
}
}
Loading
Loading