Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -1619,7 +1618,29 @@ private MetadataSnapshot(Optional<String> clientRack, SubscriptionState subscrip
}

boolean matches(MetadataSnapshot other) {
return version == other.version || partitionsPerTopic.equals(other.partitionsPerTopic);
if (version == other.version)
return true;

// A rebalance is only required if the metadata changed in a way that could change the
// assignment: a subscribed topic was added/removed, a topic's partition count changed,
// or a partition's replica racks changed. For the rack comparison we ignore differences
// that can be explained by a replica whose broker is temporarily unavailable (its rack
// is unknown), so that a broker briefly going offline during a roll does not trigger an
// unnecessary rebalance. See PartitionRackInfo#equivalentTo.
if (!partitionsPerTopic.keySet().equals(other.partitionsPerTopic.keySet()))
return false;

for (Map.Entry<String, List<PartitionRackInfo>> entry : partitionsPerTopic.entrySet()) {
List<PartitionRackInfo> assignmentRacks = entry.getValue();
List<PartitionRackInfo> currentRacks = other.partitionsPerTopic.get(entry.getKey());
if (currentRacks.size() != assignmentRacks.size())
return false;
for (int i = 0; i < assignmentRacks.size(); i++) {
if (!assignmentRacks.get(i).equivalentTo(currentRacks.get(i)))
return false;
}
}
return true;
}

@Override
Expand Down Expand Up @@ -1656,36 +1677,44 @@ private ConsumerCoordinatorMetrics(MetricsLedger metrics, String metricGrpPrefix
}

private static class PartitionRackInfo {
private final Set<String> racks;
private final Set<String> knownRacks;
private final boolean hasUnknownReplica;

PartitionRackInfo(Optional<String> clientRack, PartitionInfo partition) {
Set<String> racks = new HashSet<>();
boolean unknown = false;
if (clientRack.isPresent() && partition.replicas() != null) {
racks = Arrays.stream(partition.replicas()).map(Node::rack).collect(Collectors.toSet());
} else {
racks = Collections.emptySet();
}
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof PartitionRackInfo)) {
return false;
for (Node replica : partition.replicas()) {
// A replica whose broker is not in the current live-broker list resolves to an
// empty Node with an unknown rack. Track that separately rather than letting it
// count as a rack change
if (replica.isEmpty())
unknown = true;
else if (replica.rack() != null)
racks.add(replica.rack());
}
}
PartitionRackInfo rackInfo = (PartitionRackInfo) o;
return Objects.equals(racks, rackInfo.racks);
this.knownRacks = racks;
this.hasUnknownReplica = unknown;
}

@Override
public int hashCode() {
return Objects.hash(racks);
/**
* Returns true if this (assignment-time) snapshot and {@code current} are equivalent for
* assignment purposes. A rack present in one snapshot but missing from the other is ignored
* when the other snapshot has an unavailable (unknown-rack) replica that could account for
* it; only a rack that is actually added or removed with no unavailable replica to
* explain it is treated as a topology change that warrants a rebalance.
*/
boolean equivalentTo(PartitionRackInfo current) {
boolean noUnexplainedAddition = knownRacks.containsAll(current.knownRacks) || hasUnknownReplica;
boolean noUnexplainedRemoval = current.knownRacks.containsAll(knownRacks) || current.hasUnknownReplica;
return noUnexplainedAddition && noUnexplainedRemoval;
}

@Override
public String toString() {
return racks.isEmpty() ? "NO_RACKS" : "racks=" + racks;
String racks = knownRacks.isEmpty() ? "NO_RACKS" : "racks=" + knownRacks;
return hasUnknownReplica ? racks + "+UNKNOWN" : racks;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1513,6 +1513,16 @@ public void testRackAwareConsumerRebalanceWithLessRacks() {
true, true);
}

@Test
public void testRackAwareConsumerRebalanceWithUnavailableRack() {
// tests the case where a broker in a rack-aware cluster goes down, causing the client's known racks
// to shrink, ensuring that there is not a rebalance in this case
verifyRackAwareConsumerRebalance(
Arrays.asList(Arrays.asList(0, 1), Arrays.asList(1, 2), Arrays.asList(2, 0)),
Arrays.asList(Arrays.asList(1, 6), Arrays.asList(1, 2), Arrays.asList(2, 0)),
true, false);
}

@Test
public void testRackAwareConsumerRebalanceWithNewPartitions() {
verifyRackAwareConsumerRebalance(
Expand Down