From dd87707df92cc14ef52f68f0ce62a94a1e52fcbf Mon Sep 17 00:00:00 2001 From: Kuan-Po Tseng Date: Thu, 2 Jul 2026 00:13:09 +0800 Subject: [PATCH] KAFKA-20746: NPE in MetadataCache#toCluster --- .../apache/kafka/metadata/MetadataCache.java | 9 ++- .../kafka/metadata/MetadataCacheTest.java | 56 +++++++++++++++++++ 2 files changed, 60 insertions(+), 5 deletions(-) diff --git a/metadata/src/main/java/org/apache/kafka/metadata/MetadataCache.java b/metadata/src/main/java/org/apache/kafka/metadata/MetadataCache.java index 555f1866bd83a..85d888f45f119 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/MetadataCache.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/MetadataCache.java @@ -36,11 +36,11 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; -import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.ThreadLocalRandom; @@ -147,10 +147,8 @@ DescribeTopicPartitionsResponseData describeTopicResponse( boolean ignoreTopicsWithExceptions); static Cluster toCluster(String clusterId, MetadataImage image) { - Map> brokerToNodes = new HashMap<>(); - image.cluster().brokers().values().stream() - .filter(broker -> !broker.fenced()) - .forEach(broker -> brokerToNodes.put(broker.id(), broker.nodes())); + Map> brokerToNodes = image.cluster().brokers().values().stream() + .collect(Collectors.toMap(BrokerRegistration::id, BrokerRegistration::nodes)); List partitionInfos = new ArrayList<>(); Set internalTopics = new HashSet<>(); @@ -168,6 +166,7 @@ static Cluster toCluster(String clusterId, MetadataImage image) { toArray(partition.isr, brokerToNodes), getOfflineReplicas(image, partition).stream() .map(brokerToNodes::get) + .filter(Objects::nonNull) .flatMap(Collection::stream) .toArray(Node[]::new) )); diff --git a/metadata/src/test/java/org/apache/kafka/metadata/MetadataCacheTest.java b/metadata/src/test/java/org/apache/kafka/metadata/MetadataCacheTest.java index 7df67f236705f..c193e48b58a1c 100644 --- a/metadata/src/test/java/org/apache/kafka/metadata/MetadataCacheTest.java +++ b/metadata/src/test/java/org/apache/kafka/metadata/MetadataCacheTest.java @@ -16,8 +16,10 @@ */ package org.apache.kafka.metadata; +import org.apache.kafka.common.Cluster; import org.apache.kafka.common.DirectoryId; import org.apache.kafka.common.Node; +import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData; @@ -42,6 +44,7 @@ import org.junit.jupiter.api.Test; import java.util.ArrayList; +import java.util.Arrays; import java.util.Comparator; import java.util.HashSet; import java.util.List; @@ -50,6 +53,7 @@ import java.util.Set; import java.util.stream.Collectors; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -1008,6 +1012,58 @@ public void testGetOfflineReplicasConsidersDirAssignment() { private record Broker(int id, List dirs) { } + @Test + public void testToClusterIncludesFencedReplicasAsOffline() { + MetadataDelta delta = new MetadataDelta.Builder().build(); + RegisterBrokerRecord broker0RegisterRecord = new RegisterBrokerRecord() + .setBrokerId(0) + .setFenced(false) + .setEndPoints(new BrokerEndpointCollection(List.of( + new BrokerEndpoint() + .setSecurityProtocol(SecurityProtocol.PLAINTEXT.id) + .setPort((short) 9092) + .setName("PLAINTEXT") + .setHost("broker-0") + ))); + // Register broker 1 as fenced + RegisterBrokerRecord broker1RegisterRecord = new RegisterBrokerRecord() + .setBrokerId(1) + .setFenced(true) + .setEndPoints(new BrokerEndpointCollection(List.of( + new BrokerEndpoint() + .setSecurityProtocol(SecurityProtocol.PLAINTEXT.id) + .setPort((short) 9092) + .setName("PLAINTEXT") + .setHost("broker-1") + ))); + delta.replay(broker0RegisterRecord); + delta.replay(broker1RegisterRecord); + + String topicName = "foo"; + Uuid topicId = Uuid.randomUuid(); + int partitionId = 0; + int broker0 = broker0RegisterRecord.brokerId(); + int broker1 = broker1RegisterRecord.brokerId(); + delta.replay(new TopicRecord().setTopicId(topicId).setName(topicName)); + // Broker 1 is not in the ISR, but it remains in the replica set. + // This used to trigger a NullPointerException when fenced brokers were + // omitted from the broker map used to build replica metadata. + delta.replay(new PartitionRecord() + .setTopicId(topicId) + .setPartitionId(partitionId) + .setReplicas(List.of(broker0, broker1)) + .setLeader(broker0) + .setIsr(List.of(broker0))); + + Cluster cluster = assertDoesNotThrow(() -> + MetadataCache.toCluster("cluster-id", delta.apply(MetadataProvenance.EMPTY))); + PartitionInfo partition = cluster.partition(new TopicPartition(topicName, partitionId)); + + assertEquals(List.of(broker0, broker1), Arrays.stream(partition.replicas()).map(Node::id).toList()); + assertEquals(List.of(broker1), Arrays.stream(partition.offlineReplicas()).map(Node::id).toList()); + assertTrue(cluster.nodeById(broker1).isFenced()); + } + private record Partition(int id, List replicas, List dirs) { }