From 772c4895df08982a5d2f123e5abc3ffbb62d961c Mon Sep 17 00:00:00 2001 From: luohaiyang Date: Thu, 2 Jul 2026 00:21:58 +0800 Subject: [PATCH 1/2] MINOR: Prevent overflow in CreateTopics partition validation --- .../kafka/controller/ReplicationControlManager.java | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java index ae1854f3cedbb..4c8a24f0faa57 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java @@ -1211,7 +1211,7 @@ private static String logPartitionChangeInfo(PartitionRegistration oldRegistrati } /** - * Validates that a batch of topics will create less than {@value MAX_PARTITIONS_PER_BATCH}. Exceeding this number of topics per batch + * Validates that a batch of topics will create at most {@value MAX_PARTITIONS_PER_BATCH}. Exceeding this number of topics per batch * has led to out-of-memory exceptions. We use this validation to fail earlier to avoid allocating the memory. * Validates an upper bound number of partitions. The actual number may be smaller if some topics are misconfigured. * @@ -1220,7 +1220,7 @@ private static String logPartitionChangeInfo(PartitionRegistration oldRegistrati * @throws PolicyViolationException if total number of partitions exceeds {@value MAX_PARTITIONS_PER_BATCH}. */ static void validateTotalNumberOfPartitions(CreateTopicsRequestData request, int defaultNumPartitions) { - int totalPartitions = 0; + long totalPartitions = 0; for (CreatableTopic topic: request.topics()) { if (topic.assignments().isEmpty()) { if (topic.numPartitions() == -1) { @@ -1231,10 +1231,9 @@ static void validateTotalNumberOfPartitions(CreateTopicsRequestData request, int } else { totalPartitions += topic.assignments().size(); } - - } - if (totalPartitions > MAX_PARTITIONS_PER_BATCH) { - throw new PolicyViolationException("Excessively large number of partitions per request."); + if (totalPartitions > MAX_PARTITIONS_PER_BATCH) { + throw new PolicyViolationException("Excessively large number of partitions per request."); + } } } From 818a6ae9b001190b3d9e43bd32e353960443ebb1 Mon Sep 17 00:00:00 2001 From: luohaiyang Date: Thu, 2 Jul 2026 00:22:35 +0800 Subject: [PATCH 2/2] MINOR: Prevent overflow in CreateTopics partition validation --- .../controller/ReplicationControlManagerTest.java | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java index cb7a74b334c2e..b6b66a906e800 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java @@ -621,6 +621,21 @@ public void testExcessiveNumberOfTopicsCannotBeCreatedWithAssignments() { assertEquals(error.getMessage(), "Excessively large number of partitions per request."); } + @Test + public void testTotalNumberOfPartitionsValidationDoesNotOverflow() { + CreateTopicsRequestData request = new CreateTopicsRequestData(); + request.topics().add(new CreatableTopic().setName("foo"). + setNumPartitions(Integer.MAX_VALUE).setReplicationFactor((short) 1)); + request.topics().add(new CreatableTopic().setName("bar"). + setNumPartitions(Integer.MAX_VALUE).setReplicationFactor((short) 1)); + + PolicyViolationException error = assertThrows( + PolicyViolationException.class, + () -> ReplicationControlManager.validateTotalNumberOfPartitions(request, 1) + ); + assertEquals("Excessively large number of partitions per request.", error.getMessage()); + } + @Test public void testCreateTopics() { ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();