diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index 1c9c231e361b8..febe853eef497 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -75,7 +75,6 @@ import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -1619,7 +1618,29 @@ private MetadataSnapshot(Optional clientRack, SubscriptionState subscrip } boolean matches(MetadataSnapshot other) { - return version == other.version || partitionsPerTopic.equals(other.partitionsPerTopic); + if (version == other.version) + return true; + + // A rebalance is only required if the metadata changed in a way that could change the + // assignment: a subscribed topic was added/removed, a topic's partition count changed, + // or a partition's replica racks changed. For the rack comparison we ignore differences + // that can be explained by a replica whose broker is temporarily unavailable (its rack + // is unknown), so that a broker briefly going offline during a roll does not trigger an + // unnecessary rebalance. See PartitionRackInfo#equivalentTo. + if (!partitionsPerTopic.keySet().equals(other.partitionsPerTopic.keySet())) + return false; + + for (Map.Entry> entry : partitionsPerTopic.entrySet()) { + List assignmentRacks = entry.getValue(); + List currentRacks = other.partitionsPerTopic.get(entry.getKey()); + if (currentRacks.size() != assignmentRacks.size()) + return false; + for (int i = 0; i < assignmentRacks.size(); i++) { + if (!assignmentRacks.get(i).equivalentTo(currentRacks.get(i))) + return false; + } + } + return true; } @Override @@ -1656,36 +1677,44 @@ private ConsumerCoordinatorMetrics(MetricsLedger metrics, String metricGrpPrefix } private static class PartitionRackInfo { - private final Set racks; + private final Set knownRacks; + private final boolean hasUnknownReplica; PartitionRackInfo(Optional clientRack, PartitionInfo partition) { + Set racks = new HashSet<>(); + boolean unknown = false; if (clientRack.isPresent() && partition.replicas() != null) { - racks = Arrays.stream(partition.replicas()).map(Node::rack).collect(Collectors.toSet()); - } else { - racks = Collections.emptySet(); - } - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (!(o instanceof PartitionRackInfo)) { - return false; + for (Node replica : partition.replicas()) { + // A replica whose broker is not in the current live-broker list resolves to an + // empty Node with an unknown rack. Track that separately rather than letting it + // count as a rack change + if (replica.isEmpty()) + unknown = true; + else if (replica.rack() != null) + racks.add(replica.rack()); + } } - PartitionRackInfo rackInfo = (PartitionRackInfo) o; - return Objects.equals(racks, rackInfo.racks); + this.knownRacks = racks; + this.hasUnknownReplica = unknown; } - @Override - public int hashCode() { - return Objects.hash(racks); + /** + * Returns true if this (assignment-time) snapshot and {@code current} are equivalent for + * assignment purposes. A rack present in one snapshot but missing from the other is ignored + * when the other snapshot has an unavailable (unknown-rack) replica that could account for + * it; only a rack that is actually added or removed with no unavailable replica to + * explain it is treated as a topology change that warrants a rebalance. + */ + boolean equivalentTo(PartitionRackInfo current) { + boolean noUnexplainedAddition = knownRacks.containsAll(current.knownRacks) || hasUnknownReplica; + boolean noUnexplainedRemoval = current.knownRacks.containsAll(knownRacks) || current.hasUnknownReplica; + return noUnexplainedAddition && noUnexplainedRemoval; } @Override public String toString() { - return racks.isEmpty() ? "NO_RACKS" : "racks=" + racks; + String racks = knownRacks.isEmpty() ? "NO_RACKS" : "racks=" + knownRacks; + return hasUnknownReplica ? racks + "+UNKNOWN" : racks; } } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index e897683bb987a..60864ce2b0669 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -1513,6 +1513,16 @@ public void testRackAwareConsumerRebalanceWithLessRacks() { true, true); } + @Test + public void testRackAwareConsumerRebalanceWithUnavailableRack() { + // tests the case where a broker in a rack-aware cluster goes down, causing the client's known racks + // to shrink, ensuring that there is not a rebalance in this case + verifyRackAwareConsumerRebalance( + Arrays.asList(Arrays.asList(0, 1), Arrays.asList(1, 2), Arrays.asList(2, 0)), + Arrays.asList(Arrays.asList(1, 6), Arrays.asList(1, 2), Arrays.asList(2, 0)), + true, false); + } + @Test public void testRackAwareConsumerRebalanceWithNewPartitions() { verifyRackAwareConsumerRebalance(