diff --git a/shenyu-loadbalancer/src/main/java/org/apache/shenyu/loadbalancer/cache/UpstreamCacheManager.java b/shenyu-loadbalancer/src/main/java/org/apache/shenyu/loadbalancer/cache/UpstreamCacheManager.java index eeff3c8195e3..2c13303970a6 100644 --- a/shenyu-loadbalancer/src/main/java/org/apache/shenyu/loadbalancer/cache/UpstreamCacheManager.java +++ b/shenyu-loadbalancer/src/main/java/org/apache/shenyu/loadbalancer/cache/UpstreamCacheManager.java @@ -26,6 +26,7 @@ import org.apache.shenyu.common.utils.Singleton; import org.apache.shenyu.loadbalancer.entity.Upstream; +import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -146,53 +147,110 @@ public void removeByKey(final String key) { */ public void submit(final String selectorId, final List upstreamList) { List actualUpstreamList = Objects.isNull(upstreamList) ? Lists.newArrayList() : upstreamList; - actualUpstreamList.forEach(upstream -> { - if (!upstream.isHealthCheckEnabled()) { - upstream.setStatus(true); - upstream.setHealthy(true); - } - }); + + // Check if the list is empty first to avoid unnecessary processing + if (actualUpstreamList.isEmpty()) { + List existUpstreamList = MapUtils.computeIfAbsent(UPSTREAM_MAP, selectorId, k -> Lists.newArrayList()); + removeAllUpstreams(selectorId, existUpstreamList); + return; + } + + initializeUpstreamHealthStatus(actualUpstreamList); + Map> partitionedUpstreams = actualUpstreamList.stream() .collect(Collectors.partitioningBy(Upstream::isStatus)); List validUpstreamList = partitionedUpstreams.get(true); List offlineUpstreamList = partitionedUpstreams.get(false); List existUpstreamList = MapUtils.computeIfAbsent(UPSTREAM_MAP, selectorId, k -> Lists.newArrayList()); - if (actualUpstreamList.isEmpty()) { - existUpstreamList.forEach(up -> task.triggerRemoveOne(selectorId, up)); - } + processOfflineUpstreams(selectorId, offlineUpstreamList, existUpstreamList); + processValidUpstreams(selectorId, validUpstreamList, existUpstreamList); + + List healthyUpstreamList = task.getHealthyUpstreamListBySelectorId(selectorId); + UPSTREAM_MAP.put(selectorId, Objects.isNull(healthyUpstreamList) ? Lists.newArrayList() : healthyUpstreamList); + } + + private void initializeUpstreamHealthStatus(final List upstreamList) { + upstreamList.forEach(upstream -> { + if (!upstream.isHealthCheckEnabled()) { + upstream.setStatus(true); + upstream.setHealthy(true); + } + }); + } + + private void removeAllUpstreams(final String selectorId, final List existUpstreamList) { + List toRemove = new ArrayList<>(existUpstreamList); + toRemove.forEach(up -> task.triggerRemoveOne(selectorId, up)); + } - // Use a Set for O(1) lookups instead of nested loops + private void processOfflineUpstreams(final String selectorId, final List offlineUpstreamList, + final List existUpstreamList) { + Map currentUnhealthyMap = getCurrentUnhealthyMap(selectorId); Set existUpstreamSet = new HashSet<>(existUpstreamList); + offlineUpstreamList.forEach(offlineUp -> { + String key = upstreamMapKey(offlineUp); if (existUpstreamSet.contains(offlineUp)) { - task.triggerRemoveOne(selectorId, offlineUp); + if (currentUnhealthyMap.containsKey(key) && offlineUp.isHealthCheckEnabled()) { + task.putToMap(task.getUnhealthyUpstream(), selectorId, offlineUp); + task.removeFromMap(task.getHealthyUpstream(), selectorId, offlineUp); + } else { + task.triggerRemoveOne(selectorId, offlineUp); + } + } else if (offlineUp.isHealthCheckEnabled()) { + task.putToMap(task.getUnhealthyUpstream(), selectorId, offlineUp); + } + }); + } + + private void processValidUpstreams(final String selectorId, final List validUpstreamList, + final List existUpstreamList) { + if (validUpstreamList.isEmpty()) { + return; + } + + updateExistingUpstreams(validUpstreamList, existUpstreamList); + addNewUpstreams(selectorId, validUpstreamList, existUpstreamList); + } + + private void updateExistingUpstreams(final List validUpstreamList, final List existUpstreamList) { + Map existUpstreamMap = existUpstreamList.stream() + .collect(Collectors.toMap(this::upstreamMapKey, existUp -> existUp, (existing, replacement) -> existing)); + + validUpstreamList.forEach(validUp -> { + Upstream matchedExistUp = existUpstreamMap.get(upstreamMapKey(validUp)); + if (Objects.nonNull(matchedExistUp)) { + matchedExistUp.setWeight(validUp.getWeight()); + matchedExistUp.setHealthCheckEnabled(validUp.isHealthCheckEnabled()); + if (!matchedExistUp.isHealthCheckEnabled()) { + matchedExistUp.setHealthy(true); + } } }); + } + + private void addNewUpstreams(final String selectorId, final List validUpstreamList, + final List existUpstreamList) { + Map currentUnhealthyMap = getCurrentUnhealthyMap(selectorId); - if (!validUpstreamList.isEmpty()) { - // update upstream weight - Map existUpstreamMap = existUpstreamList.stream() - .collect(Collectors.toMap(this::upstreamMapKey, existUp -> existUp, (existing, replacement) -> existing)); - validUpstreamList.forEach(validUp -> { - String key = upstreamMapKey(validUp); - Upstream matchedExistUp = existUpstreamMap.get(key); - if (Objects.nonNull(matchedExistUp)) { - matchedExistUp.setWeight(validUp.getWeight()); - matchedExistUp.setHealthCheckEnabled(validUp.isHealthCheckEnabled()); - if (!matchedExistUp.isHealthCheckEnabled()) { - matchedExistUp.setHealthy(true); - } + validUpstreamList.stream() + .filter(validUp -> !existUpstreamList.contains(validUp)) + .forEach(up -> { + Upstream prevUnhealthy = currentUnhealthyMap.get(upstreamMapKey(up)); + if (Objects.nonNull(prevUnhealthy)) { + task.putToMap(task.getUnhealthyUpstream(), selectorId, up); + } else { + task.triggerAddOne(selectorId, up); } }); + } - validUpstreamList.stream() - .filter(validUp -> !existUpstreamList.contains(validUp)) - .forEach(up -> task.triggerAddOne(selectorId, up)); - } - - List healthyUpstreamList = task.getHealthyUpstreamListBySelectorId(selectorId); - UPSTREAM_MAP.put(selectorId, Objects.isNull(healthyUpstreamList) ? Lists.newArrayList() : healthyUpstreamList); + private Map getCurrentUnhealthyMap(final String selectorId) { + List currentUnhealthy = task.getUnhealthyUpstream().get(selectorId); + return Objects.isNull(currentUnhealthy) + ? Maps.newConcurrentMap() + : currentUnhealthy.stream().collect(Collectors.toMap(this::upstreamMapKey, u -> u, (a, b) -> a)); } private String upstreamMapKey(final Upstream upstream) { diff --git a/shenyu-loadbalancer/src/main/java/org/apache/shenyu/loadbalancer/cache/UpstreamCheckTask.java b/shenyu-loadbalancer/src/main/java/org/apache/shenyu/loadbalancer/cache/UpstreamCheckTask.java index eef5cc47c55b..4a5d0ea8fb85 100644 --- a/shenyu-loadbalancer/src/main/java/org/apache/shenyu/loadbalancer/cache/UpstreamCheckTask.java +++ b/shenyu-loadbalancer/src/main/java/org/apache/shenyu/loadbalancer/cache/UpstreamCheckTask.java @@ -265,7 +265,7 @@ private void finishHealthCheck() { public void triggerAddOne(final String selectorId, final Upstream upstream) { putToMap(healthyUpstream, selectorId, upstream); } - + /** * Remove a specific upstream via selectorId. * @@ -277,7 +277,14 @@ public void triggerRemoveOne(final String selectorId, final Upstream upstream) { removeFromMap(unhealthyUpstream, selectorId, upstream); } - private void putToMap(final Map> map, final String selectorId, final Upstream upstream) { + /** + * Put upstream to specified map (for preserving health status). + * + * @param map the map to put upstream + * @param selectorId the selector id + * @param upstream the upstream + */ + public void putToMap(final Map> map, final String selectorId, final Upstream upstream) { synchronized (lock) { List list = MapUtils.computeIfAbsent(map, selectorId, k -> Lists.newArrayList()); if (!list.contains(upstream)) { @@ -286,7 +293,14 @@ private void putToMap(final Map> map, final String select } } - private void removeFromMap(final Map> map, final String selectorId, final Upstream upstream) { + /** + * Remove upstream from specified map. + * + * @param map the map to remove upstream from + * @param selectorId the selector id + * @param upstream the upstream + */ + public void removeFromMap(final Map> map, final String selectorId, final Upstream upstream) { synchronized (lock) { List list = map.get(selectorId); if (CollectionUtils.isNotEmpty(list)) { diff --git a/shenyu-loadbalancer/src/test/java/org/apache/shenyu/loadbalancer/cache/UpstreamCacheManagerTest.java b/shenyu-loadbalancer/src/test/java/org/apache/shenyu/loadbalancer/cache/UpstreamCacheManagerTest.java index 323535f2300d..17b449df3c5a 100644 --- a/shenyu-loadbalancer/src/test/java/org/apache/shenyu/loadbalancer/cache/UpstreamCacheManagerTest.java +++ b/shenyu-loadbalancer/src/test/java/org/apache/shenyu/loadbalancer/cache/UpstreamCacheManagerTest.java @@ -28,6 +28,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.Objects; /** * The type UpstreamCacheManager check task test. @@ -106,4 +107,188 @@ public void testSubmitSyncsHealthCheckEnabled() { // Clean up upstreamCacheManager.removeByKey(testSelectorId); } + + @Test + @Order(6) + public void testSubmitWithStatusFalsePreservesUnhealthyState() { + final UpstreamCacheManager upstreamCacheManager = UpstreamCacheManager.getInstance(); + final String testSelectorId = "PRESERVE_UNHEALTHY_TEST"; + + // First, submit healthy upstreams to establish baseline + List initialList = new ArrayList<>(2); + initialList.add(Upstream.builder() + .protocol("http://") + .url("upstream1:8080") + .status(true) + .healthCheckEnabled(true) + .build()); + initialList.add(Upstream.builder() + .protocol("http://") + .url("upstream2:8080") + .status(true) + .healthCheckEnabled(true) + .build()); + upstreamCacheManager.submit(testSelectorId, initialList); + + // Simulate health check marking one as unhealthy + UpstreamCheckTask task = getUpstreamCheckTask(upstreamCacheManager); + if (Objects.nonNull(task)) { + Upstream unhealthyUpstream = initialList.get(0); + unhealthyUpstream.setHealthy(false); + task.putToMap(task.getUnhealthyUpstream(), testSelectorId, unhealthyUpstream); + task.removeFromMap(task.getHealthyUpstream(), testSelectorId, unhealthyUpstream); + + // Verify it's in unhealthy map + Assertions.assertNotNull(task.getUnhealthyUpstream().get(testSelectorId)); + Assertions.assertTrue(task.getUnhealthyUpstream().get(testSelectorId).stream() + .anyMatch(u -> u.getUrl().equals("upstream1:8080"))); + } + + // Now admin sends update with status=false for that upstream + List updateList = new ArrayList<>(2); + updateList.add(Upstream.builder() + .protocol("http://") + .url("upstream1:8080") + .status(false) + .healthCheckEnabled(true) + .build()); + updateList.add(Upstream.builder() + .protocol("http://") + .url("upstream2:8080") + .status(true) + .healthCheckEnabled(true) + .build()); + upstreamCacheManager.submit(testSelectorId, updateList); + + // Verify: upstream1 should still be in unhealthy map (preserved state) + if (Objects.nonNull(task)) { + List unhealthyList = task.getUnhealthyUpstream().get(testSelectorId); + Assertions.assertNotNull(unhealthyList); + Assertions.assertTrue(unhealthyList.stream() + .anyMatch(u -> u.getUrl().equals("upstream1:8080")), + "upstream1 should be preserved in unhealthy map"); + } + + // Clean up + upstreamCacheManager.removeByKey(testSelectorId); + } + + @Test + @Order(7) + public void testSubmitWithNewOfflineUpstreamAddedToUnhealthy() { + final UpstreamCacheManager upstreamCacheManager = UpstreamCacheManager.getInstance(); + final String testSelectorId = "NEW_OFFLINE_UNHEALTHY_TEST"; + + // Submit a list with a new upstream having status=false + List upstreamList = new ArrayList<>(1); + upstreamList.add(Upstream.builder() + .protocol("http://") + .url("new-upstream:8080") + .status(false) + .healthCheckEnabled(true) + .build()); + upstreamCacheManager.submit(testSelectorId, upstreamList); + + // Verify: new upstream with status=false should be in unhealthy map for monitoring + UpstreamCheckTask task = getUpstreamCheckTask(upstreamCacheManager); + if (Objects.nonNull(task)) { + List unhealthyList = task.getUnhealthyUpstream().get(testSelectorId); + Assertions.assertNotNull(unhealthyList); + Assertions.assertTrue(unhealthyList.stream() + .anyMatch(u -> u.getUrl().equals("new-upstream:8080")), + "New upstream with status=false should be in unhealthy map"); + } + + // Clean up + upstreamCacheManager.removeByKey(testSelectorId); + } + + @Test + @Order(8) + public void testSubmitPreservesUnhealthyForValidUpstream() { + final UpstreamCacheManager upstreamCacheManager = UpstreamCacheManager.getInstance(); + final String testSelectorId = "PRESERVE_UNHEALTHY_VALID_TEST"; + + // First submit and mark an upstream as unhealthy + List initialList = new ArrayList<>(1); + initialList.add(Upstream.builder() + .protocol("http://") + .url("recovering-upstream:8080") + .status(true) + .healthCheckEnabled(true) + .build()); + upstreamCacheManager.submit(testSelectorId, initialList); + + UpstreamCheckTask task = getUpstreamCheckTask(upstreamCacheManager); + if (Objects.nonNull(task)) { + // Manually mark as unhealthy + Upstream unhealthyUpstream = initialList.get(0); + unhealthyUpstream.setHealthy(false); + task.putToMap(task.getUnhealthyUpstream(), testSelectorId, unhealthyUpstream); + task.removeFromMap(task.getHealthyUpstream(), testSelectorId, unhealthyUpstream); + + // Now admin sends update with status=true (valid) for the same upstream + List updateList = new ArrayList<>(1); + updateList.add(Upstream.builder() + .protocol("http://") + .url("recovering-upstream:8080") + .status(true) + .healthCheckEnabled(true) + .build()); + upstreamCacheManager.submit(testSelectorId, updateList); + + // Verify: should preserve unhealthy state since it was previously unhealthy + List unhealthyList = task.getUnhealthyUpstream().get(testSelectorId); + Assertions.assertNotNull(unhealthyList); + Assertions.assertTrue(unhealthyList.stream() + .anyMatch(u -> u.getUrl().equals("recovering-upstream:8080")), + "Previously unhealthy upstream should remain in unhealthy map"); + } + + // Clean up + upstreamCacheManager.removeByKey(testSelectorId); + } + + @Test + @Order(9) + public void testSubmitWithHealthCheckDisabledAndStatusFalse() { + final UpstreamCacheManager upstreamCacheManager = UpstreamCacheManager.getInstance(); + final String testSelectorId = "HEALTH_CHECK_DISABLED_STATUS_FALSE_TEST"; + + // Submit upstream with healthCheckEnabled=false and status=false + // This upstream should be removed, not added to unhealthy map + List upstreamList = new ArrayList<>(1); + upstreamList.add(Upstream.builder() + .protocol("http://") + .url("no-check-upstream:8080") + .status(false) + .healthCheckEnabled(false) + .build()); + upstreamCacheManager.submit(testSelectorId, upstreamList); + + UpstreamCheckTask task = getUpstreamCheckTask(upstreamCacheManager); + if (Objects.nonNull(task)) { + // Verify: should NOT be in unhealthy map since health check is disabled + List unhealthyList = task.getUnhealthyUpstream().get(testSelectorId); + Assertions.assertTrue(Objects.isNull(unhealthyList) || unhealthyList.isEmpty(), + "Upstream with healthCheckEnabled=false should not be in unhealthy map"); + } + + // Clean up + upstreamCacheManager.removeByKey(testSelectorId); + } + + /** + * Helper method to get the UpstreamCheckTask using reflection. + */ + private UpstreamCheckTask getUpstreamCheckTask(final UpstreamCacheManager manager) { + try { + java.lang.reflect.Field field = UpstreamCacheManager.class.getDeclaredField("task"); + field.setAccessible(true); + return (UpstreamCheckTask) field.get(manager); + } catch (Exception e) { + // If reflection fails, return null + return null; + } + } } diff --git a/shenyu-loadbalancer/src/test/java/org/apache/shenyu/loadbalancer/cache/UpstreamCheckTaskTest.java b/shenyu-loadbalancer/src/test/java/org/apache/shenyu/loadbalancer/cache/UpstreamCheckTaskTest.java index 4d4017143d09..d103e8844fd7 100644 --- a/shenyu-loadbalancer/src/test/java/org/apache/shenyu/loadbalancer/cache/UpstreamCheckTaskTest.java +++ b/shenyu-loadbalancer/src/test/java/org/apache/shenyu/loadbalancer/cache/UpstreamCheckTaskTest.java @@ -164,4 +164,140 @@ public void testHealthCheckEnabledDefaultsToTrue() { assertTrue(upstream.isHealthCheckEnabled()); } + + /** + * Test public putToMap method. + */ + @Test + public void testPutToMap() { + final String selectorId = "putToMapTest"; + Upstream upstream1 = Upstream.builder() + .url("upstream1:8080") + .build(); + Upstream upstream2 = Upstream.builder() + .url("upstream2:8080") + .build(); + + // Test adding to healthy map + healthCheckTask.putToMap(healthCheckTask.getHealthyUpstream(), selectorId, upstream1); + assertThat(healthCheckTask.getHealthyUpstream().get(selectorId).size(), is(1)); + + // Test adding another upstream + healthCheckTask.putToMap(healthCheckTask.getHealthyUpstream(), selectorId, upstream2); + assertThat(healthCheckTask.getHealthyUpstream().get(selectorId).size(), is(2)); + + // Test adding duplicate (should not add again) + healthCheckTask.putToMap(healthCheckTask.getHealthyUpstream(), selectorId, upstream1); + assertThat(healthCheckTask.getHealthyUpstream().get(selectorId).size(), is(2)); + + // Clean up + healthCheckTask.triggerRemoveAll(selectorId); + } + + /** + * Test public putToMap method with unhealthy map. + */ + @Test + public void testPutToMapUnhealthy() { + final String selectorId = "putToMapUnhealthyTest"; + Upstream upstream = Upstream.builder() + .url("unhealthy-upstream:8080") + .build(); + + // Test adding to unhealthy map + healthCheckTask.putToMap(healthCheckTask.getUnhealthyUpstream(), selectorId, upstream); + assertThat(healthCheckTask.getUnhealthyUpstream().get(selectorId).size(), is(1)); + + // Verify it's not in healthy map + assertTrue(CollectionUtils.isEmpty(healthCheckTask.getHealthyUpstream().get(selectorId))); + + // Clean up + healthCheckTask.triggerRemoveAll(selectorId); + } + + /** + * Test public removeFromMap method. + */ + @Test + public void testRemoveFromMap() { + final String selectorId = "removeFromMapTest"; + Upstream upstream1 = Upstream.builder() + .url("remove1:8080") + .build(); + Upstream upstream2 = Upstream.builder() + .url("remove2:8080") + .build(); + + // Add upstreams to healthy map + healthCheckTask.putToMap(healthCheckTask.getHealthyUpstream(), selectorId, upstream1); + healthCheckTask.putToMap(healthCheckTask.getHealthyUpstream(), selectorId, upstream2); + assertThat(healthCheckTask.getHealthyUpstream().get(selectorId).size(), is(2)); + + // Remove one upstream + healthCheckTask.removeFromMap(healthCheckTask.getHealthyUpstream(), selectorId, upstream1); + assertThat(healthCheckTask.getHealthyUpstream().get(selectorId).size(), is(1)); + + // Verify correct upstream remains + assertThat(healthCheckTask.getHealthyUpstream().get(selectorId).get(0).getUrl(), is("remove2:8080")); + + // Clean up + healthCheckTask.triggerRemoveAll(selectorId); + } + + /** + * Test public removeFromMap method with unhealthy map. + */ + @Test + public void testRemoveFromMapUnhealthy() { + final String selectorId = "removeFromMapUnhealthyTest"; + Upstream upstream = Upstream.builder() + .url("unhealthy-to-remove:8080") + .build(); + + // Add to unhealthy map + healthCheckTask.putToMap(healthCheckTask.getUnhealthyUpstream(), selectorId, upstream); + assertThat(healthCheckTask.getUnhealthyUpstream().get(selectorId).size(), is(1)); + + // Remove from unhealthy map + healthCheckTask.removeFromMap(healthCheckTask.getUnhealthyUpstream(), selectorId, upstream); + assertTrue(!healthCheckTask.getUnhealthyUpstream().containsKey(selectorId) + || healthCheckTask.getUnhealthyUpstream().get(selectorId).isEmpty()); + } + + /** + * Test moving upstream between healthy and unhealthy maps using public methods. + */ + @Test + public void testMoveUpstreamBetweenMaps() { + final String selectorId = "moveBetweenMapsTest"; + Upstream upstream = Upstream.builder() + .url("moving-upstream:8080") + .build(); + upstream.setHealthy(true); + + // Start in healthy map + healthCheckTask.putToMap(healthCheckTask.getHealthyUpstream(), selectorId, upstream); + assertThat(healthCheckTask.getHealthyUpstream().get(selectorId).size(), is(1)); + + // Move to unhealthy map + healthCheckTask.removeFromMap(healthCheckTask.getHealthyUpstream(), selectorId, upstream); + healthCheckTask.putToMap(healthCheckTask.getUnhealthyUpstream(), selectorId, upstream); + + // Verify moved + assertTrue(!healthCheckTask.getHealthyUpstream().containsKey(selectorId) + || healthCheckTask.getHealthyUpstream().get(selectorId).isEmpty()); + assertThat(healthCheckTask.getUnhealthyUpstream().get(selectorId).size(), is(1)); + + // Move back to healthy + healthCheckTask.removeFromMap(healthCheckTask.getUnhealthyUpstream(), selectorId, upstream); + healthCheckTask.putToMap(healthCheckTask.getHealthyUpstream(), selectorId, upstream); + + // Verify moved back + assertThat(healthCheckTask.getHealthyUpstream().get(selectorId).size(), is(1)); + assertTrue(!healthCheckTask.getUnhealthyUpstream().containsKey(selectorId) + || healthCheckTask.getUnhealthyUpstream().get(selectorId).isEmpty()); + + // Clean up + healthCheckTask.triggerRemoveAll(selectorId); + } }