Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1211,7 +1211,7 @@ private static String logPartitionChangeInfo(PartitionRegistration oldRegistrati
}

/**
* Validates that a batch of topics will create less than {@value MAX_PARTITIONS_PER_BATCH}. Exceeding this number of topics per batch
* Validates that a batch of topics will create at most {@value MAX_PARTITIONS_PER_BATCH}. Exceeding this number of topics per batch
* has led to out-of-memory exceptions. We use this validation to fail earlier to avoid allocating the memory.
* Validates an upper bound number of partitions. The actual number may be smaller if some topics are misconfigured.
*
Expand All @@ -1220,7 +1220,7 @@ private static String logPartitionChangeInfo(PartitionRegistration oldRegistrati
* @throws PolicyViolationException if total number of partitions exceeds {@value MAX_PARTITIONS_PER_BATCH}.
*/
static void validateTotalNumberOfPartitions(CreateTopicsRequestData request, int defaultNumPartitions) {
int totalPartitions = 0;
long totalPartitions = 0;
for (CreatableTopic topic: request.topics()) {
if (topic.assignments().isEmpty()) {
if (topic.numPartitions() == -1) {
Expand All @@ -1231,10 +1231,9 @@ static void validateTotalNumberOfPartitions(CreateTopicsRequestData request, int
} else {
totalPartitions += topic.assignments().size();
}

}
if (totalPartitions > MAX_PARTITIONS_PER_BATCH) {
throw new PolicyViolationException("Excessively large number of partitions per request.");
if (totalPartitions > MAX_PARTITIONS_PER_BATCH) {
throw new PolicyViolationException("Excessively large number of partitions per request.");
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -621,6 +621,21 @@ public void testExcessiveNumberOfTopicsCannotBeCreatedWithAssignments() {
assertEquals(error.getMessage(), "Excessively large number of partitions per request.");
}

@Test
public void testTotalNumberOfPartitionsValidationDoesNotOverflow() {
CreateTopicsRequestData request = new CreateTopicsRequestData();
request.topics().add(new CreatableTopic().setName("foo").
setNumPartitions(Integer.MAX_VALUE).setReplicationFactor((short) 1));
request.topics().add(new CreatableTopic().setName("bar").
setNumPartitions(Integer.MAX_VALUE).setReplicationFactor((short) 1));

PolicyViolationException error = assertThrows(
PolicyViolationException.class,
() -> ReplicationControlManager.validateTotalNumberOfPartitions(request, 1)
);
assertEquals("Excessively large number of partitions per request.", error.getMessage());
}

@Test
public void testCreateTopics() {
ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
Expand Down