diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerFailureHandlingTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerFailureHandlingTest.java index 4df1d0bb3b683..d25cc4db14f12 100644 --- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerFailureHandlingTest.java +++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerFailureHandlingTest.java @@ -46,6 +46,7 @@ import static org.apache.kafka.clients.producer.ProducerConfig.RETRIES_CONFIG; import static org.apache.kafka.common.config.TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG; import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG; import static org.apache.kafka.server.config.ReplicationConfigs.REPLICA_FETCH_MAX_BYTES_CONFIG; import static org.apache.kafka.server.config.ReplicationConfigs.REPLICA_FETCH_RESPONSE_MAX_BYTES_DOC; import static org.apache.kafka.server.config.ServerConfigs.MESSAGE_MAX_BYTES_CONFIG; @@ -215,22 +216,110 @@ public void testSendAfterClosed(ClusterInstance clusterInstance) throws Interrup assertThrows(IllegalStateException.class, () -> producer3.send(record)); } + /** + * Test that sending to an internal topic throws InvalidTopicException + * when auto.create.topics.enable=true and the internal topic already exists. + */ + @ClusterTest(serverProperties = { + @ClusterConfigProperty(key = AUTO_CREATE_TOPICS_ENABLE_CONFIG, value = "true") + }) + public void testCannotSendToInternalTopicWhenAutoCreateTrueAndTopicExists(ClusterInstance clusterInstance) throws Exception { + // explicitly create __consumer_offsets to satisfy the "topic exists" precondition + try (Admin admin = clusterInstance.admin()) { + Map topicConfig = clusterInstance.brokers().get(0) + .groupCoordinator() + .groupMetadataTopicConfigs(); + admin.createTopics(List.of(new NewTopic(Topic.GROUP_METADATA_TOPIC_NAME, 1, (short) 1).configs(topicConfig))); + clusterInstance.waitTopicCreation(Topic.GROUP_METADATA_TOPIC_NAME, 1); + } + + try (Producer producer = clusterInstance.producer(producerConfig(1))) { + Exception thrown = assertThrows(ExecutionException.class, + () -> producer.send(new ProducerRecord<>( + Topic.GROUP_METADATA_TOPIC_NAME, + "test".getBytes(), + "test".getBytes())).get()); + assertInstanceOf(InvalidTopicException.class, thrown.getCause(), + () -> "Expected InvalidTopicException but got " + thrown.getCause()); + } + } + + /** + * Test that sending to an internal topic throws InvalidTopicException + * when auto.create.topics.enable=false and the internal topic already exists. + */ @ClusterTest - public void testCannotSendToInternalTopic(ClusterInstance clusterInstance) throws InterruptedException { + public void testCannotSendToInternalTopicWhenAutoCreateFalseAndTopicExists(ClusterInstance clusterInstance) throws Exception { + // explicitly create __consumer_offsets to satisfy the "topic exists" precondition try (Admin admin = clusterInstance.admin()) { Map topicConfig = clusterInstance.brokers().get(0) - .groupCoordinator() - .groupMetadataTopicConfigs(); + .groupCoordinator() + .groupMetadataTopicConfigs(); admin.createTopics(List.of(new NewTopic(Topic.GROUP_METADATA_TOPIC_NAME, 1, (short) 1).configs(topicConfig))); - clusterInstance.waitTopicDeletion(Topic.GROUP_METADATA_TOPIC_NAME); + clusterInstance.waitTopicCreation(Topic.GROUP_METADATA_TOPIC_NAME, 1); } try (Producer producer = clusterInstance.producer(producerConfig(1))) { Exception thrown = assertThrows(ExecutionException.class, - () -> producer.send(new ProducerRecord<>(Topic.GROUP_METADATA_TOPIC_NAME, "test".getBytes(), + () -> producer.send(new ProducerRecord<>( + Topic.GROUP_METADATA_TOPIC_NAME, + "test".getBytes(), "test".getBytes())).get()); assertInstanceOf(InvalidTopicException.class, thrown.getCause(), - () -> "Unexpected exception while sending to an invalid topic " + thrown.getCause()); + () -> "Expected InvalidTopicException but got " + thrown.getCause()); + } + } + + /** + * Test that sending to an internal topic throws InvalidTopicException + * when auto.create.topics.enable=true and the internal topic does not exist. + * The broker should auto-create the internal topic even though the send is rejected. + */ + @ClusterTest(serverProperties = { + @ClusterConfigProperty(key = AUTO_CREATE_TOPICS_ENABLE_CONFIG, value = "true"), + @ClusterConfigProperty(key = OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1") + }) + public void testCannotSendToInternalTopicWhenAutoCreateTrueAndTopicNotExists(ClusterInstance clusterInstance) throws Exception { + clusterInstance.deleteTopic(Topic.GROUP_METADATA_TOPIC_NAME); + + try (Producer producer = clusterInstance.producer(producerConfig(1))) { + Exception thrown = assertThrows(ExecutionException.class, + () -> producer.send(new ProducerRecord<>( + Topic.GROUP_METADATA_TOPIC_NAME, + "test".getBytes(), + "test".getBytes())).get()); + assertInstanceOf(InvalidTopicException.class, thrown.getCause(), + () -> "Expected InvalidTopicException but got " + thrown.getCause()); + } + + // verify the broker auto-created the internal topic + clusterInstance.waitTopicCreation(Topic.GROUP_METADATA_TOPIC_NAME, 1); + } + + /** + * Test that sending to an internal topic throws TimeoutException + * when auto.create.topics.enable=false and the internal topic does not exist. + * The broker returns UNKNOWN_TOPIC_OR_PARTITION as a recoverable error causing the producer to time out. + */ + @ClusterTest(serverProperties = { + @ClusterConfigProperty(key = OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1") + }) + public void testCannotSendToInternalTopicWhenAutoCreateFalseAndTopicNotExists(ClusterInstance clusterInstance) throws Exception { + clusterInstance.deleteTopic(Topic.GROUP_METADATA_TOPIC_NAME); + + try (Producer producer = clusterInstance.producer(producerConfig(1))) { + Exception thrown = assertThrows(ExecutionException.class, + () -> producer.send(new ProducerRecord<>( + Topic.GROUP_METADATA_TOPIC_NAME, + "test".getBytes(), + "test".getBytes())).get()); + assertInstanceOf(TimeoutException.class, thrown.getCause(), + () -> "Expected TimeoutException but got " + thrown.getCause()); + } + + // verify the internal topic was not auto-created + try (Admin admin = clusterInstance.admin()) { + assertFalse(admin.listTopics().names().get().contains(Topic.GROUP_METADATA_TOPIC_NAME)); } }