diff --git a/tools/src/test/java/org/apache/kafka/tools/streams/DeleteStreamsGroupOffsetTest.java b/tools/src/test/java/org/apache/kafka/tools/streams/DeleteStreamsGroupOffsetTest.java index 8b4591b99036a..32d33caf821bf 100644 --- a/tools/src/test/java/org/apache/kafka/tools/streams/DeleteStreamsGroupOffsetTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/streams/DeleteStreamsGroupOffsetTest.java @@ -16,25 +16,22 @@ */ package org.apache.kafka.tools.streams; -import org.apache.kafka.clients.admin.Admin; -import org.apache.kafka.clients.admin.GroupListing; -import org.apache.kafka.clients.admin.ListGroupsOptions; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.GroupState; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.test.ClusterInstance; +import org.apache.kafka.common.test.api.ClusterConfigProperty; +import org.apache.kafka.common.test.api.ClusterTest; +import org.apache.kafka.common.test.api.ClusterTestDefaults; +import org.apache.kafka.common.test.api.Type; import org.apache.kafka.common.utils.internals.Exit; import org.apache.kafka.streams.CloseOptions; -import org.apache.kafka.streams.GroupProtocol; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValueTimestamp; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; -import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; @@ -42,13 +39,6 @@ import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.test.TestUtils; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Tag; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.Timeout; - import java.time.Duration; import java.util.ArrayList; import java.util.List; @@ -57,92 +47,76 @@ import java.util.Objects; import java.util.Optional; import java.util.Properties; -import java.util.Set; -import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import static org.apache.kafka.common.GroupState.EMPTY; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG; +import static org.apache.kafka.coordinator.transaction.TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG; +import static org.apache.kafka.coordinator.transaction.TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG; +import static org.apache.kafka.coordinator.transaction.TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; -@Timeout(600) -@Tag("integration") +@ClusterTestDefaults( + types = {Type.CO_KRAFT}, + brokers = 2, + serverProperties = { + @ClusterConfigProperty(key = OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), + @ClusterConfigProperty(key = OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"), + @ClusterConfigProperty(key = TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, value = "1"), + @ClusterConfigProperty(key = TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"), + @ClusterConfigProperty(key = TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, value = "1"), + @ClusterConfigProperty(key = GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, value = "0"), + @ClusterConfigProperty(key = STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, value = "100"), + @ClusterConfigProperty(key = STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, value = "100"), + } +) public class DeleteStreamsGroupOffsetTest { private static final String TOPIC_PREFIX = "foo-"; private static final String APP_ID_PREFIX = "streams-group-command-test"; private static final int RECORD_TOTAL = 5; - public static EmbeddedKafkaCluster cluster; - private static String bootstrapServers; private static final String OUTPUT_TOPIC_PREFIX = "output-topic-"; - @BeforeAll - public static void startCluster() { - final Properties props = new Properties(); - cluster = new EmbeddedKafkaCluster(2, props); - cluster.start(); - - bootstrapServers = cluster.bootstrapServers(); - } - - @AfterEach - public void deleteTopicsAndGroups() { - try (final Admin adminClient = cluster.createAdminClient()) { - // delete all topics - final Set topics = adminClient.listTopics().names().get(); - adminClient.deleteTopics(topics).all().get(); - // delete all groups - List groupIds = - adminClient.listGroups(ListGroupsOptions.forStreamsGroups().timeoutMs(1000)).all().get() - .stream().map(GroupListing::groupId).toList(); - adminClient.deleteStreamsGroups(groupIds).all().get(); - } catch (final UnknownTopicOrPartitionException ignored) { - } catch (final ExecutionException | InterruptedException e) { - if (!(e.getCause() instanceof UnknownTopicOrPartitionException)) { - throw new RuntimeException(e); - } - } - } - - private Properties createStreamsConfig(String bootstrapServers, String appId) { + private static Properties createStreamsConfig(String bootstrapServers, String appId) { final Properties configs = new Properties(); configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); configs.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); configs.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); configs.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); - configs.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault())); + configs.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, "streams"); configs.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2); + configs.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); configs.put(StreamsConfig.APPLICATION_ID_CONFIG, appId); return configs; } - @AfterAll - public static void closeCluster() { - cluster.stop(); - } - - @Test - public void testDeleteOffsetsNonExistingGroup() { + @ClusterTest + public void testDeleteOffsetsNonExistingGroup(ClusterInstance cluster) { String group = "not-existing"; String topic = "foo:1"; - String[] args = new String[]{"--bootstrap-server", bootstrapServers, "--delete-offsets", "--group", group, "--input-topic", topic}; - try (StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(args)) { + String[] args = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--delete-offsets", "--group", group, "--input-topic", topic}; + try (StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(cluster, args)) { Map.Entry> res = service.deleteOffsets(); assertEquals(Errors.GROUP_ID_NOT_FOUND, res.getKey()); } } - @Test - public void testDeleteStreamsGroupOffsetsMultipleGroups() { + @ClusterTest + public void testDeleteStreamsGroupOffsetsMultipleGroups(ClusterInstance cluster) { final String group1 = generateRandomAppId(); final String group2 = generateRandomAppId(); final String topic1 = generateRandomTopic(); final String topic2 = generateRandomTopic(); - String[] args = new String[]{"--bootstrap-server", bootstrapServers, "--delete-offsets", "--group", group1, "--group", group2, "--input-topic", topic1, "--input-topic", topic2}; + String[] args = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--delete-offsets", "--group", group1, "--group", group2, "--input-topic", topic1, "--input-topic", topic2}; AtomicBoolean exited = new AtomicBoolean(false); Exit.setExitProcedure(((statusCode, message) -> { assertNotEquals(0, statusCode); @@ -150,22 +124,21 @@ public void testDeleteStreamsGroupOffsetsMultipleGroups() { message.contains(group1) && message.contains(group2)); exited.set(true); })); - try { - getStreamsGroupService(args); - } finally { + try (StreamsGroupCommand.StreamsGroupService ignored = getStreamsGroupService(cluster, args)) { assertTrue(exited.get()); + } finally { Exit.resetExitProcedure(); } } - @Test - public void testDeleteOffsetsOfStableStreamsGroupWithTopicPartition() { + @ClusterTest + public void testDeleteOffsetsOfStableStreamsGroupWithTopicPartition(ClusterInstance cluster) { final String group = generateRandomAppId(); final String topic = generateRandomTopic(); String topicPartition = topic + ":0"; String[] args; - args = new String[]{"--bootstrap-server", bootstrapServers, "--delete-offsets", "--group", group, "--input-topic", topicPartition}; - try (StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(args); KafkaStreams streams = startKSApp(group, topic, service)) { + args = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--delete-offsets", "--group", group, "--input-topic", topicPartition}; + try (StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(cluster, args); KafkaStreams streams = startKSApp(cluster, group, topic, service)) { Map.Entry> res = service.deleteOffsets(); assertError(res, topic, 0, 0, Errors.GROUP_SUBSCRIBED_TO_TOPIC); } catch (Exception e) { @@ -173,13 +146,13 @@ public void testDeleteOffsetsOfStableStreamsGroupWithTopicPartition() { } } - @Test - public void testDeleteOffsetsOfStableStreamsGroupWithTopicOnly() { + @ClusterTest + public void testDeleteOffsetsOfStableStreamsGroupWithTopicOnly(ClusterInstance cluster) { final String group = generateRandomAppId(); final String topic = generateRandomTopic(); String[] args; - args = new String[]{"--bootstrap-server", bootstrapServers, "--delete-offsets", "--group", group, "--input-topic", topic}; - try (StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(args); KafkaStreams streams = startKSApp(group, topic, service)) { + args = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--delete-offsets", "--group", group, "--input-topic", topic}; + try (StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(cluster, args); KafkaStreams streams = startKSApp(cluster, group, topic, service)) { Map.Entry> res = service.deleteOffsets(); assertError(res, topic, -1, 0, Errors.GROUP_SUBSCRIBED_TO_TOPIC); } catch (Exception e) { @@ -187,15 +160,15 @@ public void testDeleteOffsetsOfStableStreamsGroupWithTopicOnly() { } } - @Test - public void testDeleteOffsetsOfStableStreamsGroupWithUnknownTopicPartition() { + @ClusterTest + public void testDeleteOffsetsOfStableStreamsGroupWithUnknownTopicPartition(ClusterInstance cluster) { final String group = generateRandomAppId(); final String topic = generateRandomTopic(); final String unknownTopic = "unknown-topic"; final String unknownTopicPartition = unknownTopic + ":0"; String[] args; - args = new String[]{"--bootstrap-server", bootstrapServers, "--delete-offsets", "--group", group, "--input-topic", unknownTopicPartition}; - try (StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(args); KafkaStreams streams = startKSApp(group, topic, service)) { + args = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--delete-offsets", "--group", group, "--input-topic", unknownTopicPartition}; + try (StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(cluster, args); KafkaStreams streams = startKSApp(cluster, group, topic, service)) { Map.Entry> res = service.deleteOffsets(); assertError(res, unknownTopic, 0, 0, Errors.UNKNOWN_TOPIC_OR_PARTITION); } catch (Exception e) { @@ -203,14 +176,14 @@ public void testDeleteOffsetsOfStableStreamsGroupWithUnknownTopicPartition() { } } - @Test - public void testDeleteOffsetsOfStableStreamsGroupWithUnknownTopicOnly() { + @ClusterTest + public void testDeleteOffsetsOfStableStreamsGroupWithUnknownTopicOnly(ClusterInstance cluster) { final String group = generateRandomAppId(); final String topic = generateRandomTopic(); final String unknownTopic = "unknown-topic"; String[] args; - args = new String[]{"--bootstrap-server", bootstrapServers, "--delete-offsets", "--group", group, "--input-topic", unknownTopic}; - try (StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(args); KafkaStreams streams = startKSApp(group, topic, service)) { + args = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--delete-offsets", "--group", group, "--input-topic", unknownTopic}; + try (StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(cluster, args); KafkaStreams streams = startKSApp(cluster, group, topic, service)) { Map.Entry> res = service.deleteOffsets(); assertError(res, unknownTopic, -1, -1, Errors.UNKNOWN_TOPIC_OR_PARTITION); } catch (Exception e) { @@ -218,14 +191,14 @@ public void testDeleteOffsetsOfStableStreamsGroupWithUnknownTopicOnly() { } } - @Test - public void testDeleteOffsetsOfEmptyStreamsGroupWithTopicPartition() { + @ClusterTest + public void testDeleteOffsetsOfEmptyStreamsGroupWithTopicPartition(ClusterInstance cluster) { final String group = generateRandomAppId(); final String topic = generateRandomTopic(); final String topicPartition = topic + ":0"; String[] args; - args = new String[]{"--bootstrap-server", bootstrapServers, "--delete-offsets", "--group", group, "--input-topic", topicPartition}; - try (StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(args); KafkaStreams streams = startKSApp(group, topic, service)) { + args = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--delete-offsets", "--group", group, "--input-topic", topicPartition}; + try (StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(cluster, args); KafkaStreams streams = startKSApp(cluster, group, topic, service)) { stopKSApp(group, topic, streams, service); Map.Entry> res = service.deleteOffsets(); assertError(res, topic, 0, 0, Errors.NONE); @@ -234,13 +207,13 @@ public void testDeleteOffsetsOfEmptyStreamsGroupWithTopicPartition() { } } - @Test - public void testDeleteOffsetsOfEmptyStreamsGroupWithTopicOnly() { + @ClusterTest + public void testDeleteOffsetsOfEmptyStreamsGroupWithTopicOnly(ClusterInstance cluster) { final String group = generateRandomAppId(); final String topic = generateRandomTopic(); String[] args; - args = new String[]{"--bootstrap-server", bootstrapServers, "--delete-offsets", "--group", group, "--input-topic", topic}; - try (StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(args); KafkaStreams streams = startKSApp(group, topic, service)) { + args = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--delete-offsets", "--group", group, "--input-topic", topic}; + try (StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(cluster, args); KafkaStreams streams = startKSApp(cluster, group, topic, service)) { stopKSApp(group, topic, streams, service); Map.Entry> res = service.deleteOffsets(); assertError(res, topic, -1, 0, Errors.NONE); @@ -249,15 +222,15 @@ public void testDeleteOffsetsOfEmptyStreamsGroupWithTopicOnly() { } } - @Test - public void testDeleteOffsetsOfEmptyStreamsGroupWithMultipleTopics() { + @ClusterTest + public void testDeleteOffsetsOfEmptyStreamsGroupWithMultipleTopics(ClusterInstance cluster) { final String group = generateRandomAppId(); final String topic1 = generateRandomTopic(); final String unknownTopic = "unknown-topic"; String[] args; - args = new String[]{"--bootstrap-server", bootstrapServers, "--delete-offsets", "--group", group, "--input-topic", topic1, "--input-topic", unknownTopic}; - try (StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(args); KafkaStreams streams = startKSApp(group, topic1, service)) { + args = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--delete-offsets", "--group", group, "--input-topic", topic1, "--input-topic", unknownTopic}; + try (StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(cluster, args); KafkaStreams streams = startKSApp(cluster, group, topic1, service)) { stopKSApp(group, topic1, streams, service); Map.Entry> res = service.deleteOffsets(); assertError(res, topic1, -1, 0, Errors.NONE); @@ -267,15 +240,15 @@ public void testDeleteOffsetsOfEmptyStreamsGroupWithMultipleTopics() { } } - @Test - public void testDeleteOffsetsOfEmptyStreamsGroupWithUnknownTopicPartition() { + @ClusterTest + public void testDeleteOffsetsOfEmptyStreamsGroupWithUnknownTopicPartition(ClusterInstance cluster) { final String group = generateRandomAppId(); final String topic = generateRandomTopic(); final String unknownTopic = "unknown-topic"; final String unknownTopicPartition = unknownTopic + ":0"; String[] args; - args = new String[]{"--bootstrap-server", bootstrapServers, "--delete-offsets", "--group", group, "--input-topic", unknownTopicPartition}; - try (StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(args); KafkaStreams streams = startKSApp(group, topic, service)) { + args = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--delete-offsets", "--group", group, "--input-topic", unknownTopicPartition}; + try (StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(cluster, args); KafkaStreams streams = startKSApp(cluster, group, topic, service)) { stopKSApp(group, topic, streams, service); Map.Entry> res = service.deleteOffsets(); assertError(res, unknownTopic, 0, 0, Errors.UNKNOWN_TOPIC_OR_PARTITION); @@ -284,14 +257,14 @@ public void testDeleteOffsetsOfEmptyStreamsGroupWithUnknownTopicPartition() { } } - @Test - public void testDeleteOffsetsOfEmptyStreamsGroupWithUnknownTopicOnly() { + @ClusterTest + public void testDeleteOffsetsOfEmptyStreamsGroupWithUnknownTopicOnly(ClusterInstance cluster) { final String group = generateRandomAppId(); final String topic = generateRandomTopic(); final String unknownTopic = "unknown-topic"; String[] args; - args = new String[]{"--bootstrap-server", bootstrapServers, "--delete-offsets", "--group", group, "--input-topic", unknownTopic}; - try (StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(args); KafkaStreams streams = startKSApp(group, topic, service)) { + args = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--delete-offsets", "--group", group, "--input-topic", unknownTopic}; + try (StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(cluster, args); KafkaStreams streams = startKSApp(cluster, group, topic, service)) { stopKSApp(group, topic, streams, service); Map.Entry> res = service.deleteOffsets(); assertError(res, unknownTopic, -1, -1, Errors.UNKNOWN_TOPIC_OR_PARTITION); @@ -300,13 +273,13 @@ public void testDeleteOffsetsOfEmptyStreamsGroupWithUnknownTopicOnly() { } } - @Test - public void testDeleteOffsetsOfEmptyStreamsGroupWithAllTopics() { + @ClusterTest + public void testDeleteOffsetsOfEmptyStreamsGroupWithAllTopics(ClusterInstance cluster) { final String group = generateRandomAppId(); final String topic = generateRandomTopic(); String[] args; - args = new String[]{"--bootstrap-server", bootstrapServers, "--delete-offsets", "--group", group, "--all-input-topics", topic}; - try (StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(args); KafkaStreams streams = startKSApp(group, topic, service)) { + args = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--delete-offsets", "--group", group, "--all-input-topics", topic}; + try (StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(cluster, args); KafkaStreams streams = startKSApp(cluster, group, topic, service)) { stopKSApp(group, topic, streams, service); Map.Entry> res = service.deleteOffsets(); assertError(res, topic, -1, 0, Errors.NONE); @@ -359,10 +332,11 @@ private void stopKSApp(String appId, String topic, KafkaStreams streams, Streams } } - private KafkaStreams startKSApp(String appId, String inputTopic, StreamsGroupCommand.StreamsGroupService service) throws Exception { + private KafkaStreams startKSApp(ClusterInstance cluster, String appId, String inputTopic, StreamsGroupCommand.StreamsGroupService service) throws Exception { String outputTopic = generateRandomTopicId(OUTPUT_TOPIC_PREFIX); + cluster.createTopic(inputTopic, 1, (short) 1); StreamsBuilder builder = builder(inputTopic, outputTopic); - produceMessages(inputTopic); + produceMessages(cluster, inputTopic); final KStream inputStream = builder.stream(inputTopic); @@ -380,7 +354,7 @@ private KafkaStreams startKSApp(String appId, String inputTopic, StreamsGroupCom } }); - KafkaStreams streams = IntegrationTestUtils.getStartedStreams(createStreamsConfig(bootstrapServers, appId), builder, true); + KafkaStreams streams = StreamsGroupCommandTestUtils.getStartedStreams(createStreamsConfig(cluster.bootstrapServers(), appId), builder, true); TestUtils.waitForCondition( () -> !service.collectGroupMembers(appId).isEmpty(), @@ -398,10 +372,6 @@ private String generateRandomTopicId(String prefix) { return prefix + TestUtils.randomString(10); } - private String generateGroupAppId() { - return APP_ID_PREFIX + TestUtils.randomString(10); - } - private boolean checkGroupState(StreamsGroupCommand.StreamsGroupService service, String groupId, GroupState state) throws Exception { return Objects.equals(service.collectGroupState(groupId), state); } @@ -416,25 +386,17 @@ private static StreamsBuilder builder(String inputTopic, String outputTopic) { return builder; } - private static void produceMessages(final String topic) { + private static void produceMessages(final ClusterInstance cluster, final String topic) { List> data = new ArrayList<>(RECORD_TOTAL); for (long v = 0; v < RECORD_TOTAL; ++v) { - data.add(new KeyValueTimestamp<>(v + "0" + topic, v + "0", cluster.time.milliseconds())); + data.add(new KeyValueTimestamp<>(v + "0" + topic, v + "0", System.currentTimeMillis())); } - IntegrationTestUtils.produceSynchronously( - TestUtils.producerConfig(bootstrapServers, StringSerializer.class, StringSerializer.class), - false, - topic, - Optional.empty(), - data - ); + StreamsGroupCommandTestUtils.produceSynchronously(cluster.bootstrapServers(), topic, Optional.empty(), data); } - private StreamsGroupCommand.StreamsGroupService getStreamsGroupService(String[] args) { + private StreamsGroupCommand.StreamsGroupService getStreamsGroupService(ClusterInstance cluster, String[] args) { StreamsGroupCommandOptions opts = StreamsGroupCommandOptions.fromArgs(args); - return new StreamsGroupCommand.StreamsGroupService( - opts, cluster.createAdminClient()); - + return new StreamsGroupCommand.StreamsGroupService(opts, cluster.admin()); } } diff --git a/tools/src/test/java/org/apache/kafka/tools/streams/DeleteStreamsGroupTest.java b/tools/src/test/java/org/apache/kafka/tools/streams/DeleteStreamsGroupTest.java index 998b97f9582ad..5c480bbcf20f2 100644 --- a/tools/src/test/java/org/apache/kafka/tools/streams/DeleteStreamsGroupTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/streams/DeleteStreamsGroupTest.java @@ -19,25 +19,23 @@ import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.FeatureUpdate; -import org.apache.kafka.clients.admin.GroupListing; -import org.apache.kafka.clients.admin.ListGroupsOptions; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.GroupState; import org.apache.kafka.common.errors.GroupNotEmptyException; -import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.test.ClusterInstance; +import org.apache.kafka.common.test.api.ClusterConfigProperty; +import org.apache.kafka.common.test.api.ClusterTest; +import org.apache.kafka.common.test.api.ClusterTestDefaults; +import org.apache.kafka.common.test.api.Type; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.common.utils.internals.Exit; import org.apache.kafka.streams.CloseOptions; -import org.apache.kafka.streams.GroupProtocol; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValueTimestamp; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; -import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; @@ -46,13 +44,6 @@ import org.apache.kafka.test.TestUtils; import org.apache.kafka.tools.ToolsTestUtils; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Tag; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.Timeout; - import java.time.Duration; import java.util.ArrayList; import java.util.HashMap; @@ -71,6 +62,14 @@ import joptsimple.OptionException; import static org.apache.kafka.common.GroupState.EMPTY; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG; +import static org.apache.kafka.coordinator.transaction.TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG; +import static org.apache.kafka.coordinator.transaction.TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG; +import static org.apache.kafka.coordinator.transaction.TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertInstanceOf; @@ -80,96 +79,71 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -@Timeout(600) -@Tag("integration") +@ClusterTestDefaults( + types = {Type.CO_KRAFT}, + brokers = 2, + serverProperties = { + @ClusterConfigProperty(key = OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), + @ClusterConfigProperty(key = OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"), + @ClusterConfigProperty(key = TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, value = "1"), + @ClusterConfigProperty(key = TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"), + @ClusterConfigProperty(key = TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, value = "1"), + @ClusterConfigProperty(key = GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, value = "0"), + @ClusterConfigProperty(key = STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, value = "100"), + @ClusterConfigProperty(key = STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, value = "100"), + } +) public class DeleteStreamsGroupTest { private static final String INPUT_TOPIC_PREFIX = "input-topic-"; private static final String OUTPUT_TOPIC_PREFIX = "output-topic-"; private static final String APP_ID_PREFIX = "delete-group-test-"; private static final int RECORD_TOTAL = 10; - public static EmbeddedKafkaCluster cluster; - private static String bootstrapServers; - private static Admin adminClient; - - @BeforeAll - public static void startCluster() { - final Properties props = new Properties(); - cluster = new EmbeddedKafkaCluster(2, props); - cluster.start(); - - bootstrapServers = cluster.bootstrapServers(); - adminClient = cluster.createAdminClient(); - } - - @AfterEach - public void deleteTopicsAndGroups() { - try (final Admin adminClient = cluster.createAdminClient()) { - // delete all topics - final Set topics = adminClient.listTopics().names().get(); - adminClient.deleteTopics(topics).all().get(); - // delete all groups - List groupIds = - adminClient.listGroups(ListGroupsOptions.forStreamsGroups().timeoutMs(1000)).all().get() - .stream().map(GroupListing::groupId).toList(); - adminClient.deleteStreamsGroups(groupIds).all().get(); - } catch (final UnknownTopicOrPartitionException ignored) { - } catch (final ExecutionException | InterruptedException e) { - if (!(e.getCause() instanceof UnknownTopicOrPartitionException)) { - throw new RuntimeException(e); - } - } - } - - @AfterAll - public static void closeCluster() { - cluster.stop(); - } - @Test - public void testDeleteWithUnrecognizedOption() { - final String[] args = new String[]{"--unrecognized-option", "--bootstrap-server", bootstrapServers, "--delete", "--all-groups"}; + @ClusterTest + public void testDeleteWithUnrecognizedOption(ClusterInstance cluster) { + final String[] args = new String[]{"--unrecognized-option", "--bootstrap-server", cluster.bootstrapServers(), "--delete", "--all-groups"}; assertThrows(OptionException.class, () -> getStreamsGroupService(args)); } - @Test - public void testDeleteWithoutGroupOption() { - final String[] args = new String[]{"--bootstrap-server", bootstrapServers, "--delete"}; + @ClusterTest + public void testDeleteWithoutGroupOption(ClusterInstance cluster) { + final String[] args = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--delete"}; AtomicBoolean exited = new AtomicBoolean(false); Exit.setExitProcedure(((statusCode, message) -> { assertNotEquals(0, statusCode); assertTrue(message.contains("Option [delete] takes one of these options: [all-groups], [group]")); exited.set(true); })); - try { - getStreamsGroupService(args); - } finally { + try (StreamsGroupCommand.StreamsGroupService ignored = getStreamsGroupService(args)) { assertTrue(exited.get()); + } finally { + Exit.resetExitProcedure(); } } - @Test - public void testDeleteWithDeleteInternalTopicOption() { - final String[] args = new String[]{"--bootstrap-server", bootstrapServers, "--delete", "--all-groups", "--delete-internal-topic", "foo"}; + @ClusterTest + public void testDeleteWithDeleteInternalTopicOption(ClusterInstance cluster) { + final String[] args = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--delete", "--all-groups", "--delete-internal-topic", "foo"}; AtomicBoolean exited = new AtomicBoolean(false); Exit.setExitProcedure(((statusCode, message) -> { assertNotEquals(0, statusCode); assertTrue(message.contains("Option [delete-internal-topic] takes [reset-offsets] when [execute] is used.")); exited.set(true); })); - try { - getStreamsGroupService(args); - } finally { + try (StreamsGroupCommand.StreamsGroupService ignored = getStreamsGroupService(args)) { assertTrue(exited.get()); + } finally { + Exit.resetExitProcedure(); } } - @Test - public void testDeleteSingleGroupWithoutDeletingInternalTopics() throws Exception { + @ClusterTest + public void testDeleteSingleGroupWithoutDeletingInternalTopics(ClusterInstance cluster) throws Exception { final String appId = generateGroupAppId(); - String[] args = new String[]{"--bootstrap-server", bootstrapServers, "--delete", "--group", appId}; + String[] args = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--delete", "--group", appId}; - StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(args); - try (KafkaStreams streams = startKSApp(appId, service)) { + try (StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(args); + KafkaStreams streams = startKSApp(cluster, appId, service)) { /* test 1: delete NON_EMPTY streams group */ String output = ToolsTestUtils.grabConsoleOutput(service::deleteGroups); Map result = service.deleteGroups(); @@ -198,7 +172,7 @@ public void testDeleteSingleGroupWithoutDeletingInternalTopics() throws Exceptio assertEquals(1, emptyGrpRes.size()); assertTrue(emptyGrpRes.containsKey(appId)); assertNull(emptyGrpRes.get(appId), "The streams group could not be deleted as expected"); - assertEquals(3, getInternalTopics(appId).size(), + assertEquals(3, getInternalTopics(cluster, appId).size(), "The internal topics were deleted, but they shouldn't have been."); /* test 3: delete an already deleted streams group (non-existing group) */ @@ -211,13 +185,13 @@ public void testDeleteSingleGroupWithoutDeletingInternalTopics() throws Exceptio } } - @Test - public void testDeleteSingleGroupWithDeletingInternalTopics() throws Exception { + @ClusterTest + public void testDeleteSingleGroupWithDeletingInternalTopics(ClusterInstance cluster) throws Exception { final String appId = generateGroupAppId(); - String[] args = new String[]{"--bootstrap-server", bootstrapServers, "--delete", "--group", appId, "--delete-all-internal-topics"}; + String[] args = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--delete", "--group", appId, "--delete-all-internal-topics"}; - StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(args); - try (KafkaStreams streams = startKSApp(appId, service)) { + try (StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(args); + KafkaStreams streams = startKSApp(cluster, appId, service)) { stopKSApp(appId, streams, service); final Map emptyGrpRes = new HashMap<>(); String output = ToolsTestUtils.grabConsoleOutput(() -> emptyGrpRes.putAll(service.deleteGroups())); @@ -229,182 +203,182 @@ public void testDeleteSingleGroupWithDeletingInternalTopics() throws Exception { assertEquals(1, emptyGrpRes.size()); assertTrue(emptyGrpRes.containsKey(appId)); assertNull(emptyGrpRes.get(appId), "The streams group could not be deleted as expected"); - TestUtils.waitForCondition(() -> getInternalTopics(appId).isEmpty(), + TestUtils.waitForCondition(() -> getInternalTopics(cluster, appId).isEmpty(), "The internal topics of the streams group " + appId + " were not deleted as expected."); } } - @Test - public void testDeleteMultipleGroupsWithoutDeletingInternalTopics() throws Exception { + @ClusterTest + public void testDeleteMultipleGroupsWithoutDeletingInternalTopics(ClusterInstance cluster) throws Exception { final String appId1 = generateGroupAppId(); final String appId2 = generateGroupAppId(); final String appId3 = generateGroupAppId(); - String[] args = new String[]{"--bootstrap-server", bootstrapServers, "--delete", "--all-groups"}; - - StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(args); - KafkaStreams streams1 = startKSApp(appId1, service); - KafkaStreams streams2 = startKSApp(appId2, service); - KafkaStreams streams3 = startKSApp(appId3, service); - - - /* test 1: delete NON_EMPTY streams groups */ - final Map result = new HashMap<>(); - String output = ToolsTestUtils.grabConsoleOutput(() -> result.putAll(service.deleteGroups())); - - assertTrue(output.contains("Group '" + appId1 + "' could not be deleted due to:") - && output.contains("Streams group '" + appId1 + "' is not EMPTY."), - "The expected error (" + Errors.NON_EMPTY_GROUP + ") was not detected while deleting streams group. Output was: (" + output + ")"); - assertTrue(output.contains("Group '" + appId3 + "' could not be deleted due to:") - && output.contains("Streams group '" + appId3 + "' is not EMPTY."), - "The expected error (" + Errors.NON_EMPTY_GROUP + ") was not detected while deleting streams group. Output was: (" + output + ")"); - assertTrue(output.contains("Group '" + appId2 + "' could not be deleted due to:") - && output.contains("Streams group '" + appId2 + "' is not EMPTY."), - "The expected error (" + Errors.NON_EMPTY_GROUP + ") was not detected while deleting streams group. Output was: (" + output + ")"); - - - assertNotNull(result.get(appId1), - "Group was deleted successfully, but it shouldn't have been. Result was:(" + result + ")"); - assertNotNull(result.get(appId2), - "Group was deleted successfully, but it shouldn't have been. Result was:(" + result + ")"); - assertNotNull(result.get(appId3), - "Group was deleted successfully, but it shouldn't have been. Result was:(" + result + ")"); - - assertEquals(3, result.size()); - assertInstanceOf(GroupNotEmptyException.class, - result.get(appId1), - "The expected error (" + Errors.NON_EMPTY_GROUP + ") was not detected while deleting streams group. Result was:(" + result + ")"); - assertInstanceOf(GroupNotEmptyException.class, - result.get(appId2), - "The expected error (" + Errors.NON_EMPTY_GROUP + ") was not detected while deleting streams group. Result was:(" + result + ")"); - assertInstanceOf(GroupNotEmptyException.class, - result.get(appId3), - "The expected error (" + Errors.NON_EMPTY_GROUP + ") was not detected while deleting streams group. Result was:(" + result + ")"); - - /* test 2: delete mix of EMPTY and NON_EMPTY streams group */ - stopKSApp(appId1, streams1, service); - final Map mixGrpsRes = new HashMap<>(); - output = ToolsTestUtils.grabConsoleOutput(() -> mixGrpsRes.putAll(service.deleteGroups())); - - assertTrue(output.contains("Deletion of some streams groups failed:"), "The streams groups deletion did not work as expected"); - assertTrue(output.contains("Group '" + appId2 + "' could not be deleted due to:") - && output.contains("Streams group '" + appId2 + "' is not EMPTY."), "The expected error (" + Errors.NON_EMPTY_GROUP + ") was not detected while deleting streams group. Result was:(" + mixGrpsRes + ")"); - assertTrue(output.contains("Group '" + appId3 + "' could not be deleted due to:") - && output.contains("Streams group '" + appId3 + "' is not EMPTY."), "The expected error (" + Errors.NON_EMPTY_GROUP + ") was not detected while deleting streams group. Result was:(" + mixGrpsRes + ")"); - assertTrue(output.contains("These streams groups were deleted successfully: '" + appId1 + "'"), - "The streams groups deletion did not work as expected"); - assertFalse(output.contains("Deletion of associated internal topics of the streams groups ('" + appId1 + "') was successful."), - "The internal topics could not be deleted as expected"); - - assertEquals(3, mixGrpsRes.size()); - assertNull(mixGrpsRes.get(appId1)); - assertNotNull(mixGrpsRes.get(appId2)); - assertNotNull(mixGrpsRes.get(appId3)); - assertEquals(3, getInternalTopics(appId1).size(), - "The internal topics were deleted, but they shouldn't have been."); - assertEquals(3, getInternalTopics(appId2).size(), - "The internal topics were deleted, but they shouldn't have been."); - assertEquals(3, getInternalTopics(appId3).size(), - "The internal topics were deleted, but they shouldn't have been."); - - /* test 3: delete all groups */ - stopKSApp(appId2, streams2, service); - stopKSApp(appId3, streams3, service); - - final Map allGrpsRes = new HashMap<>(); - output = ToolsTestUtils.grabConsoleOutput(() -> allGrpsRes.putAll(service.deleteGroups())); - - assertTrue(output.contains("Deletion of requested streams groups ('" + appId2 + "', '" + appId3 + "') was successful.") | - output.contains("Deletion of requested streams groups ('" + appId3 + "', '" + appId2 + "') was successful."), - "The streams groups deletion did not work as expected"); - assertFalse(output.contains("Deletion of associated internal topics of the streams groups ('" + appId2 + "', '" + appId3 + "') was successful.") | - output.contains("Deletion of associated internal topics of the streams groups ('" + appId3 + "', '" + appId2 + "') was successful."), - "The internal topics could not be deleted as expected"); - - assertEquals(2, allGrpsRes.size()); - assertNull(allGrpsRes.get(appId2)); - assertNull(allGrpsRes.get(appId3)); - assertEquals(3, getInternalTopics(appId1).size(), - "The internal topics were deleted, but they shouldn't have been."); - assertEquals(3, getInternalTopics(appId2).size(), - "The internal topics were deleted, but they shouldn't have been."); - assertEquals(3, getInternalTopics(appId3).size(), - "The internal topics were deleted, but they shouldn't have been."); + String[] args = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--delete", "--all-groups"}; + + try (StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(args)) { + KafkaStreams streams1 = startKSApp(cluster, appId1, service); + KafkaStreams streams2 = startKSApp(cluster, appId2, service); + KafkaStreams streams3 = startKSApp(cluster, appId3, service); + + /* test 1: delete NON_EMPTY streams groups */ + final Map result = new HashMap<>(); + String output = ToolsTestUtils.grabConsoleOutput(() -> result.putAll(service.deleteGroups())); + + assertTrue(output.contains("Group '" + appId1 + "' could not be deleted due to:") + && output.contains("Streams group '" + appId1 + "' is not EMPTY."), + "The expected error (" + Errors.NON_EMPTY_GROUP + ") was not detected while deleting streams group. Output was: (" + output + ")"); + assertTrue(output.contains("Group '" + appId3 + "' could not be deleted due to:") + && output.contains("Streams group '" + appId3 + "' is not EMPTY."), + "The expected error (" + Errors.NON_EMPTY_GROUP + ") was not detected while deleting streams group. Output was: (" + output + ")"); + assertTrue(output.contains("Group '" + appId2 + "' could not be deleted due to:") + && output.contains("Streams group '" + appId2 + "' is not EMPTY."), + "The expected error (" + Errors.NON_EMPTY_GROUP + ") was not detected while deleting streams group. Output was: (" + output + ")"); + + + assertNotNull(result.get(appId1), + "Group was deleted successfully, but it shouldn't have been. Result was:(" + result + ")"); + assertNotNull(result.get(appId2), + "Group was deleted successfully, but it shouldn't have been. Result was:(" + result + ")"); + assertNotNull(result.get(appId3), + "Group was deleted successfully, but it shouldn't have been. Result was:(" + result + ")"); + + assertEquals(3, result.size()); + assertInstanceOf(GroupNotEmptyException.class, + result.get(appId1), + "The expected error (" + Errors.NON_EMPTY_GROUP + ") was not detected while deleting streams group. Result was:(" + result + ")"); + assertInstanceOf(GroupNotEmptyException.class, + result.get(appId2), + "The expected error (" + Errors.NON_EMPTY_GROUP + ") was not detected while deleting streams group. Result was:(" + result + ")"); + assertInstanceOf(GroupNotEmptyException.class, + result.get(appId3), + "The expected error (" + Errors.NON_EMPTY_GROUP + ") was not detected while deleting streams group. Result was:(" + result + ")"); + + /* test 2: delete mix of EMPTY and NON_EMPTY streams group */ + stopKSApp(appId1, streams1, service); + final Map mixGrpsRes = new HashMap<>(); + output = ToolsTestUtils.grabConsoleOutput(() -> mixGrpsRes.putAll(service.deleteGroups())); + + assertTrue(output.contains("Deletion of some streams groups failed:"), "The streams groups deletion did not work as expected"); + assertTrue(output.contains("Group '" + appId2 + "' could not be deleted due to:") + && output.contains("Streams group '" + appId2 + "' is not EMPTY."), "The expected error (" + Errors.NON_EMPTY_GROUP + ") was not detected while deleting streams group. Result was:(" + mixGrpsRes + ")"); + assertTrue(output.contains("Group '" + appId3 + "' could not be deleted due to:") + && output.contains("Streams group '" + appId3 + "' is not EMPTY."), "The expected error (" + Errors.NON_EMPTY_GROUP + ") was not detected while deleting streams group. Result was:(" + mixGrpsRes + ")"); + assertTrue(output.contains("These streams groups were deleted successfully: '" + appId1 + "'"), + "The streams groups deletion did not work as expected"); + assertFalse(output.contains("Deletion of associated internal topics of the streams groups ('" + appId1 + "') was successful."), + "The internal topics could not be deleted as expected"); + + assertEquals(3, mixGrpsRes.size()); + assertNull(mixGrpsRes.get(appId1)); + assertNotNull(mixGrpsRes.get(appId2)); + assertNotNull(mixGrpsRes.get(appId3)); + assertEquals(3, getInternalTopics(cluster, appId1).size(), + "The internal topics were deleted, but they shouldn't have been."); + assertEquals(3, getInternalTopics(cluster, appId2).size(), + "The internal topics were deleted, but they shouldn't have been."); + assertEquals(3, getInternalTopics(cluster, appId3).size(), + "The internal topics were deleted, but they shouldn't have been."); + + /* test 3: delete all groups */ + stopKSApp(appId2, streams2, service); + stopKSApp(appId3, streams3, service); + + final Map allGrpsRes = new HashMap<>(); + output = ToolsTestUtils.grabConsoleOutput(() -> allGrpsRes.putAll(service.deleteGroups())); + + assertTrue(output.contains("Deletion of requested streams groups ('" + appId2 + "', '" + appId3 + "') was successful.") | + output.contains("Deletion of requested streams groups ('" + appId3 + "', '" + appId2 + "') was successful."), + "The streams groups deletion did not work as expected"); + assertFalse(output.contains("Deletion of associated internal topics of the streams groups ('" + appId2 + "', '" + appId3 + "') was successful.") | + output.contains("Deletion of associated internal topics of the streams groups ('" + appId3 + "', '" + appId2 + "') was successful."), + "The internal topics could not be deleted as expected"); + + assertEquals(2, allGrpsRes.size()); + assertNull(allGrpsRes.get(appId2)); + assertNull(allGrpsRes.get(appId3)); + assertEquals(3, getInternalTopics(cluster, appId1).size(), + "The internal topics were deleted, but they shouldn't have been."); + assertEquals(3, getInternalTopics(cluster, appId2).size(), + "The internal topics were deleted, but they shouldn't have been."); + assertEquals(3, getInternalTopics(cluster, appId3).size(), + "The internal topics were deleted, but they shouldn't have been."); + } } - @Test - public void testDeleteAllGroupsWithDeletingInternalTopics() throws Exception { + @ClusterTest + public void testDeleteAllGroupsWithDeletingInternalTopics(ClusterInstance cluster) throws Exception { final String appId1 = generateGroupAppId(); final String appId2 = generateGroupAppId(); final String appId3 = generateGroupAppId(); - String[] args = new String[]{"--bootstrap-server", bootstrapServers, "--delete", "--all-groups", "--delete-all-internal-topics"}; - - StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(args); - KafkaStreams streams1 = startKSApp(appId1, service); - KafkaStreams streams2 = startKSApp(appId2, service); - KafkaStreams streams3 = startKSApp(appId3, service); - - /* test 1: delete mix of EMPTY and NON_EMPTY streams group */ - stopKSApp(appId1, streams1, service); - final Map mixGrpsRes = new HashMap<>(); - String output = ToolsTestUtils.grabConsoleOutput(() -> mixGrpsRes.putAll(service.deleteGroups())); - - assertTrue(output.contains("Deletion of some streams groups failed:"), "The streams groups deletion did not work as expected"); - assertTrue(output.contains("Group '" + appId2 + "' could not be deleted due to:") - && output.contains("Streams group '" + appId2 + "' is not EMPTY."), "The expected error (" + Errors.NON_EMPTY_GROUP + ") was not detected while deleting streams group. Result was:(" + mixGrpsRes + ")"); - assertTrue(output.contains("Group '" + appId3 + "' could not be deleted due to:") - && output.contains("Streams group '" + appId3 + "' is not EMPTY."), "The expected error (" + Errors.NON_EMPTY_GROUP + ") was not detected while deleting streams group. Result was:(" + mixGrpsRes + ")"); - assertTrue(output.contains("These streams groups were deleted successfully: '" + appId1 + "'"), - "The streams groups deletion did not work as expected"); - assertTrue(output.contains("Deletion of associated internal topics of the streams groups ('" + appId1 + "') was successful."), - "The internal topics could not be deleted as expected"); - - assertEquals(3, mixGrpsRes.size()); - assertNull(mixGrpsRes.get(appId1)); - assertNotNull(mixGrpsRes.get(appId2)); - assertNotNull(mixGrpsRes.get(appId3)); - TestUtils.waitForCondition(() -> getInternalTopics(appId1).isEmpty(), - "The internal topics of the streams group " + appId1 + " were not deleted as expected."); - assertFalse(getInternalTopics(appId2).isEmpty()); - assertFalse(getInternalTopics(appId3).isEmpty()); - - /* test 2: delete all groups */ - stopKSApp(appId2, streams2, service); - stopKSApp(appId3, streams3, service); - - final Map allGrpsRes = new HashMap<>(); - output = ToolsTestUtils.grabConsoleOutput(() -> allGrpsRes.putAll(service.deleteGroups())); - - assertTrue(output.contains("Deletion of requested streams groups ('" + appId2 + "', '" + appId3 + "') was successful.") | - output.contains("Deletion of requested streams groups ('" + appId3 + "', '" + appId2 + "') was successful."), - "The streams groups deletion did not work as expected"); - assertTrue(output.contains("Deletion of associated internal topics of the streams groups ('" + appId2 + "', '" + appId3 + "') was successful.") | - output.contains("Deletion of associated internal topics of the streams groups ('" + appId3 + "', '" + appId2 + "') was successful."), - "The internal topics could not be deleted as expected"); - - assertEquals(2, allGrpsRes.size()); - assertNull(allGrpsRes.get(appId2)); - assertNull(allGrpsRes.get(appId3)); - TestUtils.waitForCondition(() -> getInternalTopics(appId2).isEmpty(), - "The internal topics of the streams group " + appId2 + " were not deleted as expected."); - TestUtils.waitForCondition(() -> getInternalTopics(appId3).isEmpty(), - "The internal topics of the streams group " + appId3 + " were not deleted as expected."); + String[] args = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--delete", "--all-groups", "--delete-all-internal-topics"}; + + try (StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(args)) { + KafkaStreams streams1 = startKSApp(cluster, appId1, service); + KafkaStreams streams2 = startKSApp(cluster, appId2, service); + KafkaStreams streams3 = startKSApp(cluster, appId3, service); + + /* test 1: delete mix of EMPTY and NON_EMPTY streams group */ + stopKSApp(appId1, streams1, service); + final Map mixGrpsRes = new HashMap<>(); + String output = ToolsTestUtils.grabConsoleOutput(() -> mixGrpsRes.putAll(service.deleteGroups())); + + assertTrue(output.contains("Deletion of some streams groups failed:"), "The streams groups deletion did not work as expected"); + assertTrue(output.contains("Group '" + appId2 + "' could not be deleted due to:") + && output.contains("Streams group '" + appId2 + "' is not EMPTY."), "The expected error (" + Errors.NON_EMPTY_GROUP + ") was not detected while deleting streams group. Result was:(" + mixGrpsRes + ")"); + assertTrue(output.contains("Group '" + appId3 + "' could not be deleted due to:") + && output.contains("Streams group '" + appId3 + "' is not EMPTY."), "The expected error (" + Errors.NON_EMPTY_GROUP + ") was not detected while deleting streams group. Result was:(" + mixGrpsRes + ")"); + assertTrue(output.contains("These streams groups were deleted successfully: '" + appId1 + "'"), + "The streams groups deletion did not work as expected"); + assertTrue(output.contains("Deletion of associated internal topics of the streams groups ('" + appId1 + "') was successful."), + "The internal topics could not be deleted as expected"); + + assertEquals(3, mixGrpsRes.size()); + assertNull(mixGrpsRes.get(appId1)); + assertNotNull(mixGrpsRes.get(appId2)); + assertNotNull(mixGrpsRes.get(appId3)); + TestUtils.waitForCondition(() -> getInternalTopics(cluster, appId1).isEmpty(), + "The internal topics of the streams group " + appId1 + " were not deleted as expected."); + assertFalse(getInternalTopics(cluster, appId2).isEmpty()); + assertFalse(getInternalTopics(cluster, appId3).isEmpty()); + + /* test 2: delete all groups */ + stopKSApp(appId2, streams2, service); + stopKSApp(appId3, streams3, service); + + final Map allGrpsRes = new HashMap<>(); + output = ToolsTestUtils.grabConsoleOutput(() -> allGrpsRes.putAll(service.deleteGroups())); + + assertTrue(output.contains("Deletion of requested streams groups ('" + appId2 + "', '" + appId3 + "') was successful.") | + output.contains("Deletion of requested streams groups ('" + appId3 + "', '" + appId2 + "') was successful."), + "The streams groups deletion did not work as expected"); + assertTrue(output.contains("Deletion of associated internal topics of the streams groups ('" + appId2 + "', '" + appId3 + "') was successful.") | + output.contains("Deletion of associated internal topics of the streams groups ('" + appId3 + "', '" + appId2 + "') was successful."), + "The internal topics could not be deleted as expected"); + + assertEquals(2, allGrpsRes.size()); + assertNull(allGrpsRes.get(appId2)); + assertNull(allGrpsRes.get(appId3)); + TestUtils.waitForCondition(() -> getInternalTopics(cluster, appId2).isEmpty(), + "The internal topics of the streams group " + appId2 + " were not deleted as expected."); + TestUtils.waitForCondition(() -> getInternalTopics(cluster, appId3).isEmpty(), + "The internal topics of the streams group " + appId3 + " were not deleted as expected."); + } } - @Test - public void testDeleteAllGroupsAfterVersionDowngrade() throws Exception { + @ClusterTest + public void testDeleteAllGroupsAfterVersionDowngrade(ClusterInstance cluster) throws Exception { final String appId = generateGroupAppId(); - String[] args = new String[]{"--bootstrap-server", bootstrapServers, "--delete", "--all-groups", "--delete-all-internal-topics"}; + String[] args = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--delete", "--all-groups", "--delete-all-internal-topics"}; - StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(args); - try (KafkaStreams streams = startKSApp(appId, service)) { + try (StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(args); + KafkaStreams streams = startKSApp(cluster, appId, service)) { stopKSApp(appId, streams, service); // downgrade the streams.version to 0 - updateStreamsGroupProtocol((short) 0); + updateStreamsGroupProtocol(cluster, (short) 0); final Map result = new HashMap<>(); String output = ToolsTestUtils.grabConsoleOutput(() -> result.putAll(service.deleteGroups())); - System.out.println(output); assertTrue(output.contains("Deletion of requested streams groups ('" + appId + "') was successful."), "The streams group could not be deleted as expected"); @@ -412,7 +386,6 @@ public void testDeleteAllGroupsAfterVersionDowngrade() throws Exception { assertTrue(output.contains("Use 'kafka-topics.sh' to delete the group's internal topics.")); // Validate the list of internal topics in error message assertTrue(output.contains("Internal topics:")); - System.out.println(output); assertTrue( output.matches("(?s).*" + APP_ID_PREFIX + "[a-zA-Z0-9\\-]+-(aggregated_value-changelog|repartition|changelog).*"), "The internal topic name does not match the expected format. Output: " + output @@ -421,16 +394,16 @@ public void testDeleteAllGroupsAfterVersionDowngrade() throws Exception { assertEquals(1, result.size()); assertTrue(result.containsKey(appId)); assertNull(result.get(appId), "The streams group could not be deleted as expected"); - assertEquals(3, getInternalTopics(appId).size(), + assertEquals(3, getInternalTopics(cluster, appId).size(), "The internal topics were deleted, but they shouldn't have been."); } finally { // upgrade back the streams.version to 1 - updateStreamsGroupProtocol((short) 1); + updateStreamsGroupProtocol(cluster, (short) 1); } } - private Set getInternalTopics(String appId) { - try { + private Set getInternalTopics(ClusterInstance cluster, String appId) { + try (Admin adminClient = cluster.admin()) { Set topics = adminClient.listTopics().names().get(); return topics.stream() .filter(topic -> topic.startsWith(appId + "-")) @@ -441,8 +414,8 @@ private Set getInternalTopics(String appId) { } } - private void updateStreamsGroupProtocol(short version) { - try (Admin admin = cluster.createAdminClient()) { + private void updateStreamsGroupProtocol(ClusterInstance cluster, short version) { + try (Admin admin = cluster.admin()) { Map updates = Utils.mkMap( Utils.mkEntry("streams.version", new FeatureUpdate(version, version == 0 ? FeatureUpdate.UpgradeType.SAFE_DOWNGRADE : FeatureUpdate.UpgradeType.UPGRADE))); admin.updateFeatures(updates).all().get(); @@ -458,8 +431,9 @@ private static Properties createStreamsConfig(String bootstrapServers, String ap streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); - streamsConfig.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault())); + streamsConfig.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, "streams"); streamsConfig.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2); + streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appId); return streamsConfig; } @@ -472,11 +446,12 @@ private StreamsGroupCommand.StreamsGroupService getStreamsGroupService(String[] ); } - private KafkaStreams startKSApp(String appId, StreamsGroupCommand.StreamsGroupService service) throws Exception { + private KafkaStreams startKSApp(ClusterInstance cluster, String appId, StreamsGroupCommand.StreamsGroupService service) throws Exception { String inputTopic = generateRandomTopicId(INPUT_TOPIC_PREFIX); String outputTopic = generateRandomTopicId(OUTPUT_TOPIC_PREFIX); + cluster.createTopic(inputTopic, 1, (short) 1); StreamsBuilder builder = builder(inputTopic, outputTopic); - produceMessages(inputTopic); + produceMessages(cluster, inputTopic); final KStream inputStream = builder.stream(inputTopic); @@ -494,7 +469,7 @@ private KafkaStreams startKSApp(String appId, StreamsGroupCommand.StreamsGroupSe } }); - KafkaStreams streams = IntegrationTestUtils.getStartedStreams(createStreamsConfig(bootstrapServers, appId), builder, true); + KafkaStreams streams = StreamsGroupCommandTestUtils.getStartedStreams(createStreamsConfig(cluster.bootstrapServers(), appId), builder, true); TestUtils.waitForCondition( () -> !service.collectGroupMembers(appId).isEmpty(), @@ -540,19 +515,13 @@ private boolean checkGroupState(StreamsGroupCommand.StreamsGroupService service, return Objects.equals(service.collectGroupState(groupId), state); } - private static void produceMessages(final String topic) { + private static void produceMessages(final ClusterInstance cluster, final String topic) { List> data = new ArrayList<>(RECORD_TOTAL); for (long v = 0; v < RECORD_TOTAL; ++v) { - data.add(new KeyValueTimestamp<>(v + "0" + topic, v + "0", cluster.time.milliseconds())); + data.add(new KeyValueTimestamp<>(v + "0" + topic, v + "0", System.currentTimeMillis())); } - IntegrationTestUtils.produceSynchronously( - TestUtils.producerConfig(bootstrapServers, StringSerializer.class, StringSerializer.class), - false, - topic, - Optional.empty(), - data - ); + StreamsGroupCommandTestUtils.produceSynchronously(cluster.bootstrapServers(), topic, Optional.empty(), data); } private static StreamsBuilder builder(String inputTopic, String outputTopic) { diff --git a/tools/src/test/java/org/apache/kafka/tools/streams/DescribeStreamsGroupTest.java b/tools/src/test/java/org/apache/kafka/tools/streams/DescribeStreamsGroupTest.java index 0e62f2d7acb11..670d5e8717f10 100644 --- a/tools/src/test/java/org/apache/kafka/tools/streams/DescribeStreamsGroupTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/streams/DescribeStreamsGroupTest.java @@ -19,24 +19,21 @@ import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.test.ClusterInstance; +import org.apache.kafka.common.test.api.ClusterConfigProperty; +import org.apache.kafka.common.test.api.ClusterTest; +import org.apache.kafka.common.test.api.ClusterTestDefaults; +import org.apache.kafka.common.test.api.Type; import org.apache.kafka.common.utils.internals.Exit; -import org.apache.kafka.streams.GroupProtocol; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; -import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.test.TestUtils; import org.apache.kafka.tools.ToolsTestUtils; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Tag; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.Timeout; - import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -51,17 +48,27 @@ import joptsimple.OptionException; -import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -@Timeout(600) -@Tag("integration") +@ClusterTestDefaults( + types = {Type.CO_KRAFT}, + serverProperties = { + @ClusterConfigProperty(key = OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), + @ClusterConfigProperty(key = OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"), + @ClusterConfigProperty(key = GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, value = "0"), + @ClusterConfigProperty(key = STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, value = "100"), + @ClusterConfigProperty(key = STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, value = "100"), + } +) public class DescribeStreamsGroupTest { - public static EmbeddedKafkaCluster cluster = null; - static KafkaStreams streams; private static final String APP_ID = "streams-group-command-test"; private static final String APP_ID_2 = "streams-group-command-test-2"; @@ -69,56 +76,36 @@ public class DescribeStreamsGroupTest { private static final String OUTPUT_TOPIC = "customOutputTopic"; private static final String INPUT_TOPIC_2 = "customInputTopic2"; private static final String OUTPUT_TOPIC_2 = "customOutputTopic2"; - private static String bootstrapServers; - - @BeforeAll - public static void setup() throws Exception { - // start the cluster and create the input topic - final Properties props = new Properties(); - cluster = new EmbeddedKafkaCluster(1, props); - cluster.start(); - cluster.createTopic(INPUT_TOPIC, 2, 1); - bootstrapServers = cluster.bootstrapServers(); - - - // start kafka streams - Properties streamsProp = streamsProp(APP_ID); - streams = new KafkaStreams(topology(INPUT_TOPIC, OUTPUT_TOPIC), streamsProp); - startApplicationAndWaitUntilRunning(streams); - } - @AfterAll - public static void closeCluster() { - streams.close(); - cluster.deleteTopics(INPUT_TOPIC, OUTPUT_TOPIC, INPUT_TOPIC_2, OUTPUT_TOPIC_2); - cluster.stop(); - cluster = null; + private final ClusterInstance cluster; + + DescribeStreamsGroupTest(ClusterInstance cluster) { + this.cluster = cluster; } - @Test + @ClusterTest public void testDescribeWithUnrecognizedOption() { - String[] args = new String[]{"--unrecognized-option", "--bootstrap-server", bootstrapServers, "--describe", "--group", APP_ID}; + String[] args = new String[]{"--unrecognized-option", "--bootstrap-server", cluster.bootstrapServers(), "--describe", "--group", APP_ID}; assertThrows(OptionException.class, () -> getStreamsGroupService(args)); } - @Test + @ClusterTest public void testDescribeWithoutGroupOption() { - final String[] args = new String[]{"--bootstrap-server", bootstrapServers, "--describe"}; + final String[] args = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--describe"}; AtomicBoolean exited = new AtomicBoolean(false); Exit.setExitProcedure(((statusCode, message) -> { assertNotEquals(0, statusCode); assertTrue(message.contains("Option [describe] takes one of these options: [all-groups], [group]")); exited.set(true); })); - try { - getStreamsGroupService(args); - } finally { + try (StreamsGroupCommand.StreamsGroupService ignored = getStreamsGroupService(args)) { assertTrue(exited.get()); + } finally { Exit.resetExitProcedure(); } } - @Test + @ClusterTest public void testDescribeStreamsGroup() throws Exception { final List expectedHeader = List.of("GROUP", "TOPIC", "PARTITION", "OFFSET-LAG"); final Set> expectedRows = Set.of( @@ -127,14 +114,17 @@ public void testDescribeStreamsGroup() throws Exception { List.of(APP_ID, "streams-group-command-test-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition", "0", "0"), List.of(APP_ID, "streams-group-command-test-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition", "1", "0")); - validateDescribeOutput( - List.of("--bootstrap-server", bootstrapServers, "--describe", "--group", APP_ID), expectedHeader, expectedRows, List.of()); - // --describe --offsets has the same output as --describe - validateDescribeOutput( - List.of("--bootstrap-server", bootstrapServers, "--describe", "--offsets", "--group", APP_ID), expectedHeader, expectedRows, List.of()); + cluster.createTopic(INPUT_TOPIC, 2, (short) 1); + try (KafkaStreams ignored = startStreamsApp(APP_ID, INPUT_TOPIC, OUTPUT_TOPIC)) { + validateDescribeOutput( + List.of("--bootstrap-server", cluster.bootstrapServers(), "--describe", "--group", APP_ID), expectedHeader, expectedRows, List.of()); + // --describe --offsets has the same output as --describe + validateDescribeOutput( + List.of("--bootstrap-server", cluster.bootstrapServers(), "--describe", "--offsets", "--group", APP_ID), expectedHeader, expectedRows, List.of()); + } } - @Test + @ClusterTest public void testDescribeStreamsGroupWithVerboseOption() throws Exception { final List expectedHeader = List.of("GROUP", "TOPIC", "PARTITION", "CURRENT-OFFSET", "LEADER-EPOCH", "LOG-END-OFFSET", "OFFSET-LAG"); final Set> expectedRows = Set.of( @@ -143,26 +133,33 @@ public void testDescribeStreamsGroupWithVerboseOption() throws Exception { List.of(APP_ID, "streams-group-command-test-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition", "0", "-", "-", "0", "0"), List.of(APP_ID, "streams-group-command-test-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition", "1", "-", "-", "0", "0")); - validateDescribeOutput( - List.of("--bootstrap-server", bootstrapServers, "--describe", "--verbose", "--group", APP_ID), expectedHeader, expectedRows, List.of()); - // --describe --offsets has the same output as --describe - validateDescribeOutput( - List.of("--bootstrap-server", bootstrapServers, "--describe", "--offsets", "--verbose", "--group", APP_ID), expectedHeader, expectedRows, List.of()); - validateDescribeOutput( - List.of("--bootstrap-server", bootstrapServers, "--describe", "--verbose", "--offsets", "--group", APP_ID), expectedHeader, expectedRows, List.of()); + cluster.createTopic(INPUT_TOPIC, 2, (short) 1); + try (KafkaStreams ignored = startStreamsApp(APP_ID, INPUT_TOPIC, OUTPUT_TOPIC)) { + validateDescribeOutput( + List.of("--bootstrap-server", cluster.bootstrapServers(), "--describe", "--verbose", "--group", APP_ID), expectedHeader, expectedRows, List.of()); + // --describe --offsets has the same output as --describe + validateDescribeOutput( + List.of("--bootstrap-server", cluster.bootstrapServers(), "--describe", "--offsets", "--verbose", "--group", APP_ID), expectedHeader, expectedRows, List.of()); + validateDescribeOutput( + List.of("--bootstrap-server", cluster.bootstrapServers(), "--describe", "--verbose", "--offsets", "--group", APP_ID), expectedHeader, expectedRows, List.of()); + } } - @Test + @ClusterTest public void testDescribeStreamsGroupWithStateOption() throws Exception { final List expectedHeader = List.of("GROUP", "COORDINATOR", "(ID)", "STATE", "#MEMBERS"); final Set> expectedRows = Set.of(List.of(APP_ID, "", "", "Stable", "2")); // The coordinator is not deterministic, so we don't care about it. final List dontCares = List.of(1, 2); - validateDescribeOutput( - List.of("--bootstrap-server", bootstrapServers, "--describe", "--state", "--group", APP_ID), expectedHeader, expectedRows, dontCares); + + cluster.createTopic(INPUT_TOPIC, 2, (short) 1); + try (KafkaStreams ignored = startStreamsApp(APP_ID, INPUT_TOPIC, OUTPUT_TOPIC)) { + validateDescribeOutput( + List.of("--bootstrap-server", cluster.bootstrapServers(), "--describe", "--state", "--group", APP_ID), expectedHeader, expectedRows, dontCares); + } } - @Test + @ClusterTest public void testDescribeStreamsGroupWithStateAndVerboseOptions() throws Exception { final List expectedHeader = List.of("GROUP", "COORDINATOR", "(ID)", "STATE", "GROUP-EPOCH", "TARGET-ASSIGNMENT-EPOCH", "#MEMBERS"); final Set> expectedRows = Set.of(List.of(APP_ID, "", "", "Stable", "", "", "2")); @@ -170,13 +167,16 @@ public void testDescribeStreamsGroupWithStateAndVerboseOptions() throws Exceptio // The GROUP-EPOCH and TARGET-ASSIGNMENT-EPOCH can vary due to rebalance timing, so we don't care about them either. final List dontCares = List.of(1, 2, 4, 5); - validateDescribeOutput( - List.of("--bootstrap-server", bootstrapServers, "--describe", "--state", "--verbose", "--group", APP_ID), expectedHeader, expectedRows, dontCares); - validateDescribeOutput( - List.of("--bootstrap-server", bootstrapServers, "--describe", "--verbose", "--state", "--group", APP_ID), expectedHeader, expectedRows, dontCares); + cluster.createTopic(INPUT_TOPIC, 2, (short) 1); + try (KafkaStreams ignored = startStreamsApp(APP_ID, INPUT_TOPIC, OUTPUT_TOPIC)) { + validateDescribeOutput( + List.of("--bootstrap-server", cluster.bootstrapServers(), "--describe", "--state", "--verbose", "--group", APP_ID), expectedHeader, expectedRows, dontCares); + validateDescribeOutput( + List.of("--bootstrap-server", cluster.bootstrapServers(), "--describe", "--verbose", "--state", "--group", APP_ID), expectedHeader, expectedRows, dontCares); + } } - @Test + @ClusterTest public void testDescribeStreamsGroupWithMembersOption() throws Exception { final List expectedHeader = List.of("GROUP", "MEMBER", "PROCESS", "CLIENT-ID", "ASSIGNMENTS"); final Set> expectedRows = Set.of( @@ -185,11 +185,14 @@ public void testDescribeStreamsGroupWithMembersOption() throws Exception { // The member and process names as well as client-id are not deterministic, so we don't care about them. final List dontCares = List.of(1, 2, 3); - validateDescribeOutput( - List.of("--bootstrap-server", bootstrapServers, "--describe", "--members", "--group", APP_ID), expectedHeader, expectedRows, dontCares); + cluster.createTopic(INPUT_TOPIC, 2, (short) 1); + try (KafkaStreams ignored = startStreamsApp(APP_ID, INPUT_TOPIC, OUTPUT_TOPIC)) { + validateDescribeOutput( + List.of("--bootstrap-server", cluster.bootstrapServers(), "--describe", "--members", "--group", APP_ID), expectedHeader, expectedRows, dontCares); + } } - @Test + @ClusterTest public void testDescribeStreamsGroupWithMembersAndVerboseOptions() throws Exception { final List expectedHeader = List.of("GROUP", "TARGET-ASSIGNMENT-EPOCH", "TOPOLOGY-EPOCH", "MEMBER", "MEMBER-PROTOCOL", "MEMBER-EPOCH", "PROCESS", "CLIENT-ID", "ASSIGNMENTS"); final Set> expectedRows = Set.of( @@ -199,24 +202,21 @@ public void testDescribeStreamsGroupWithMembersAndVerboseOptions() throws Except // The TARGET-ASSIGNMENT-EPOCH and MEMBER-EPOCH can vary due to rebalance timing, so we don't care about them either. final List dontCares = List.of(1, 3, 5, 6, 7); - validateDescribeOutput( - List.of("--bootstrap-server", bootstrapServers, "--describe", "--members", "--verbose", "--group", APP_ID), expectedHeader, expectedRows, dontCares); - validateDescribeOutput( - List.of("--bootstrap-server", bootstrapServers, "--describe", "--verbose", "--members", "--group", APP_ID), expectedHeader, expectedRows, dontCares); + cluster.createTopic(INPUT_TOPIC, 2, (short) 1); + try (KafkaStreams ignored = startStreamsApp(APP_ID, INPUT_TOPIC, OUTPUT_TOPIC)) { + validateDescribeOutput( + List.of("--bootstrap-server", cluster.bootstrapServers(), "--describe", "--members", "--verbose", "--group", APP_ID), expectedHeader, expectedRows, dontCares); + validateDescribeOutput( + List.of("--bootstrap-server", cluster.bootstrapServers(), "--describe", "--verbose", "--members", "--group", APP_ID), expectedHeader, expectedRows, dontCares); + } } - @Test + @ClusterTest public void testDescribeMultipleStreamsGroupWithMembersAndVerboseOptions() throws Exception { - cluster.createTopic(INPUT_TOPIC_2, 1, 1); - TestUtils.waitForCondition( - () -> cluster.getAllTopicsInCluster().contains(INPUT_TOPIC_2), - 30000, - "Topic " + INPUT_TOPIC_2 + " not created" - ); - KafkaStreams streams2 = new KafkaStreams(topology(INPUT_TOPIC_2, OUTPUT_TOPIC_2), streamsProp(APP_ID_2)); - try { - startApplicationAndWaitUntilRunning(streams2); - + cluster.createTopic(INPUT_TOPIC, 2, (short) 1); + cluster.createTopic(INPUT_TOPIC_2, 1, (short) 1); + try (KafkaStreams ignored = startStreamsApp(APP_ID, INPUT_TOPIC, OUTPUT_TOPIC); + KafkaStreams ignored2 = startStreamsApp(APP_ID_2, INPUT_TOPIC_2, OUTPUT_TOPIC_2)) { final List expectedHeader = List.of("GROUP", "TARGET-ASSIGNMENT-EPOCH", "TOPOLOGY-EPOCH", "MEMBER", "MEMBER-PROTOCOL", "MEMBER-EPOCH", "PROCESS", "CLIENT-ID", "ASSIGNMENTS"); final Set> expectedRows1 = Set.of( List.of(APP_ID, "", "0", "", "streams", "", "", "", "ACTIVE:", "0:[1];", "1:[1];", "TARGET-ACTIVE:", "0:[1];", "1:[1];"), @@ -233,21 +233,18 @@ public void testDescribeMultipleStreamsGroupWithMembersAndVerboseOptions() throw final List dontCares = List.of(1, 3, 5, 6, 7); validateDescribeOutput( - List.of("--bootstrap-server", bootstrapServers, "--describe", "--members", "--verbose", "--group", APP_ID, "--group", APP_ID_2), + List.of("--bootstrap-server", cluster.bootstrapServers(), "--describe", "--members", "--verbose", "--group", APP_ID, "--group", APP_ID_2), expectedHeader, expectedRowsMap, dontCares); validateDescribeOutput( - List.of("--bootstrap-server", bootstrapServers, "--describe", "--verbose", "--members", "--group", APP_ID, "--group", APP_ID_2), + List.of("--bootstrap-server", cluster.bootstrapServers(), "--describe", "--verbose", "--members", "--group", APP_ID, "--group", APP_ID_2), expectedHeader, expectedRowsMap, dontCares); validateDescribeOutput( - List.of("--bootstrap-server", bootstrapServers, "--describe", "--verbose", "--members", "--all-groups"), + List.of("--bootstrap-server", cluster.bootstrapServers(), "--describe", "--verbose", "--members", "--all-groups"), expectedHeader, expectedRowsMap, dontCares); - } finally { - streams2.close(); - streams2.cleanUp(); } } - @Test + @ClusterTest public void testDescribeNonExistingStreamsGroup() { final String nonExistingGroup = "non-existing-group"; final String errorMessage = String.format( @@ -255,9 +252,15 @@ public void testDescribeNonExistingStreamsGroup() { nonExistingGroup); validateDescribeOutput( - List.of("--bootstrap-server", bootstrapServers, "--describe", "--members", "--verbose", "--group", nonExistingGroup), errorMessage); + List.of("--bootstrap-server", cluster.bootstrapServers(), "--describe", "--members", "--verbose", "--group", nonExistingGroup), errorMessage); validateDescribeOutput( - List.of("--bootstrap-server", bootstrapServers, "--describe", "--verbose", "--members", "--group", nonExistingGroup), errorMessage); + List.of("--bootstrap-server", cluster.bootstrapServers(), "--describe", "--verbose", "--members", "--group", nonExistingGroup), errorMessage); + } + + private KafkaStreams startStreamsApp(String appId, String inputTopic, String outputTopic) throws InterruptedException { + KafkaStreams streams = new KafkaStreams(topology(inputTopic, outputTopic), streamsProp(appId)); + StreamsGroupCommandTestUtils.startApplicationAndWaitUntilRunning(streams); + return streams; } private static Topology topology(String inputTopic, String outputTopic) { @@ -270,16 +273,16 @@ private static Topology topology(String inputTopic, String outputTopic) { return builder.build(); } - private static Properties streamsProp(String appId) { + private Properties streamsProp(String appId) { Properties streamsProp = new Properties(); streamsProp.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - streamsProp.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + streamsProp.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()); streamsProp.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); streamsProp.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); streamsProp.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); streamsProp.put(StreamsConfig.APPLICATION_ID_CONFIG, appId); streamsProp.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2); - streamsProp.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault())); + streamsProp.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, "streams"); return streamsProp; } diff --git a/tools/src/test/java/org/apache/kafka/tools/streams/ListStreamsGroupTest.java b/tools/src/test/java/org/apache/kafka/tools/streams/ListStreamsGroupTest.java index 151cb5706940e..575efd090ccf2 100644 --- a/tools/src/test/java/org/apache/kafka/tools/streams/ListStreamsGroupTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/streams/ListStreamsGroupTest.java @@ -22,23 +22,21 @@ import org.apache.kafka.common.GroupState; import org.apache.kafka.common.GroupType; import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.streams.GroupProtocol; +import org.apache.kafka.common.test.ClusterInstance; +import org.apache.kafka.common.test.api.ClusterConfigProperty; +import org.apache.kafka.common.test.api.ClusterTest; +import org.apache.kafka.common.test.api.ClusterTestDefaults; +import org.apache.kafka.common.test.api.Type; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; -import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.test.TestUtils; import org.apache.kafka.tools.ToolsTestUtils; -import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Tag; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.Timeout; import java.util.Arrays; import java.util.HashSet; @@ -54,53 +52,39 @@ import joptsimple.OptionException; -import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG; import static org.junit.jupiter.api.Assertions.assertEquals; -@Timeout(600) -@Tag("integration") +@ClusterTestDefaults( + types = {Type.CO_KRAFT}, + serverProperties = { + @ClusterConfigProperty(key = OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), + @ClusterConfigProperty(key = OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"), + @ClusterConfigProperty(key = GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, value = "0"), + @ClusterConfigProperty(key = STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, value = "100"), + @ClusterConfigProperty(key = STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, value = "100"), + } +) public class ListStreamsGroupTest { - - public static EmbeddedKafkaCluster cluster = null; - static KafkaStreams streams; private static final String APP_ID = "streams-group-command-test"; private static final String INPUT_TOPIC = "customInputTopic"; private static final String OUTPUT_TOPIC = "customOutputTopic"; - @BeforeAll - public static void setup() throws Exception { - // start the cluster and create the input topic - final Properties props = new Properties(); - cluster = new EmbeddedKafkaCluster(1, props); - cluster.start(); - cluster.createTopic(INPUT_TOPIC, 2, 1); + private final ClusterInstance cluster; - - // start kafka streams - Properties streamsProp = new Properties(); - streamsProp.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - streamsProp.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()); - streamsProp.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); - streamsProp.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); - streamsProp.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); - streamsProp.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID); - streamsProp.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2); - streamsProp.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault())); - - streams = new KafkaStreams(topology(), streamsProp); - startApplicationAndWaitUntilRunning(streams); + ListStreamsGroupTest(ClusterInstance cluster) { + this.cluster = cluster; } - @AfterAll - public static void closeCluster() { - streams.close(); - cluster.stop(); - cluster = null; - } - - @Test + @ClusterTest public void testListStreamsGroupWithoutFilters() throws Exception { - try (StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--list"})) { + cluster.createTopic(INPUT_TOPIC, 2, (short) 1); + try (KafkaStreams ignored = startStreamsApp(); + StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--list"})) { Set expectedGroups = Set.of(APP_ID); final AtomicReference foundGroups = new AtomicReference<>(); @@ -108,19 +92,20 @@ public void testListStreamsGroupWithoutFilters() throws Exception { foundGroups.set(new HashSet<>(service.listStreamsGroups())); return Objects.equals(expectedGroups, foundGroups.get()); }, () -> "Expected --list to show streams groups " + expectedGroups + ", but found " + foundGroups.get() + "."); - } } - @Test + @ClusterTest public void testListWithUnrecognizedNewOption() { String[] cgcArgs = new String[]{"--new-option", "--bootstrap-server", cluster.bootstrapServers(), "--list"}; Assertions.assertThrows(OptionException.class, () -> getStreamsGroupService(cgcArgs)); } - @Test + @ClusterTest public void testListStreamsGroupWithStates() throws Exception { - try (StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--list", "--state"})) { + cluster.createTopic(INPUT_TOPIC, 2, (short) 1); + try (KafkaStreams ignored = startStreamsApp(); + StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--list", "--state"})) { Set expectedListing = Set.of( new GroupListing( APP_ID, @@ -138,63 +123,75 @@ public void testListStreamsGroupWithStates() throws Exception { } } - @Test + @ClusterTest public void testListStreamsGroupWithSpecifiedStates() throws Exception { - try (StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--list", "--state", "stable"})) { - Set expectedListing = Set.of( - new GroupListing( - APP_ID, - Optional.of(GroupType.STREAMS), - "streams", - Optional.of(GroupState.STABLE)) - ); - - final AtomicReference> foundListing = new AtomicReference<>(); - - TestUtils.waitForCondition(() -> { - foundListing.set(new HashSet<>(service.listStreamsGroupsInStates(Set.of()))); - return Objects.equals(expectedListing, foundListing.get()); - }, () -> "Expected --list to show streams groups " + expectedListing + ", but found " + foundListing.get() + "."); - } + cluster.createTopic(INPUT_TOPIC, 2, (short) 1); + try (KafkaStreams ignored = startStreamsApp()) { + try (StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--list", "--state", "stable"})) { + Set expectedListing = Set.of( + new GroupListing( + APP_ID, + Optional.of(GroupType.STREAMS), + "streams", + Optional.of(GroupState.STABLE)) + ); + + final AtomicReference> foundListing = new AtomicReference<>(); + + TestUtils.waitForCondition(() -> { + foundListing.set(new HashSet<>(service.listStreamsGroupsInStates(Set.of()))); + return Objects.equals(expectedListing, foundListing.get()); + }, () -> "Expected --list to show streams groups " + expectedListing + ", but found " + foundListing.get() + "."); + } - try (StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--list", "--state", "PreparingRebalance"})) { - Set expectedListing = Set.of(); + try (StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--list", "--state", "PreparingRebalance"})) { + Set expectedListing = Set.of(); - final AtomicReference> foundListing = new AtomicReference<>(); + final AtomicReference> foundListing = new AtomicReference<>(); - TestUtils.waitForCondition(() -> { - foundListing.set(new HashSet<>(service.listStreamsGroupsInStates(Set.of(GroupState.PREPARING_REBALANCE)))); - return Objects.equals(expectedListing, foundListing.get()); - }, () -> "Expected --list to show streams groups " + expectedListing + ", but found " + foundListing.get() + "."); + TestUtils.waitForCondition(() -> { + foundListing.set(new HashSet<>(service.listStreamsGroupsInStates(Set.of(GroupState.PREPARING_REBALANCE)))); + return Objects.equals(expectedListing, foundListing.get()); + }, () -> "Expected --list to show streams groups " + expectedListing + ", but found " + foundListing.get() + "."); + } } } - @Test + @ClusterTest public void testListStreamsGroupOutput() throws Exception { - validateListOutput( - List.of("--bootstrap-server", cluster.bootstrapServers(), "--list"), - List.of(), - Set.of(List.of(APP_ID)) - ); + cluster.createTopic(INPUT_TOPIC, 2, (short) 1); + try (KafkaStreams ignored = startStreamsApp()) { + validateListOutput( + List.of("--bootstrap-server", cluster.bootstrapServers(), "--list"), + List.of(), + Set.of(List.of(APP_ID)) + ); - validateListOutput( - List.of("--bootstrap-server", cluster.bootstrapServers(), "--list", "--state"), - List.of("GROUP", "STATE"), - Set.of(List.of(APP_ID, "Stable")) - ); + validateListOutput( + List.of("--bootstrap-server", cluster.bootstrapServers(), "--list", "--state"), + List.of("GROUP", "STATE"), + Set.of(List.of(APP_ID, "Stable")) + ); - validateListOutput( - List.of("--bootstrap-server", cluster.bootstrapServers(), "--list", "--state", "Stable"), - List.of("GROUP", "STATE"), - Set.of(List.of(APP_ID, "Stable")) - ); + validateListOutput( + List.of("--bootstrap-server", cluster.bootstrapServers(), "--list", "--state", "Stable"), + List.of("GROUP", "STATE"), + Set.of(List.of(APP_ID, "Stable")) + ); - // Check case-insensitivity in state filter. - validateListOutput( - List.of("--bootstrap-server", cluster.bootstrapServers(), "--list", "--state", "stable"), - List.of("GROUP", "STATE"), - Set.of(List.of(APP_ID, "Stable")) - ); + // Check case-insensitivity in state filter. + validateListOutput( + List.of("--bootstrap-server", cluster.bootstrapServers(), "--list", "--state", "stable"), + List.of("GROUP", "STATE"), + Set.of(List.of(APP_ID, "Stable")) + ); + } + } + + private KafkaStreams startStreamsApp() throws InterruptedException { + KafkaStreams streams = new KafkaStreams(topology(), streamsProp()); + StreamsGroupCommandTestUtils.startApplicationAndWaitUntilRunning(streams); + return streams; } private static Topology topology() { @@ -207,6 +204,19 @@ private static Topology topology() { return builder.build(); } + private Properties streamsProp() { + Properties streamsProp = new Properties(); + streamsProp.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + streamsProp.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()); + streamsProp.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + streamsProp.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + streamsProp.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); + streamsProp.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID); + streamsProp.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2); + streamsProp.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, "streams"); + return streamsProp; + } + private StreamsGroupCommand.StreamsGroupService getStreamsGroupService(String[] args) { StreamsGroupCommandOptions opts = StreamsGroupCommandOptions.fromArgs(args); return new StreamsGroupCommand.StreamsGroupService( @@ -239,4 +249,4 @@ private static void validateListOutput( return expectedRows.equals(groups); }, () -> String.format("Expected header=%s and groups=%s, but found:%n%s", expectedHeader, expectedRows, out.get())); } -} \ No newline at end of file +} diff --git a/tools/src/test/java/org/apache/kafka/tools/streams/ResetStreamsGroupOffsetTest.java b/tools/src/test/java/org/apache/kafka/tools/streams/ResetStreamsGroupOffsetTest.java index fcbba67e52d11..88a902179b489 100644 --- a/tools/src/test/java/org/apache/kafka/tools/streams/ResetStreamsGroupOffsetTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/streams/ResetStreamsGroupOffsetTest.java @@ -18,35 +18,26 @@ import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.AdminClientConfig; -import org.apache.kafka.clients.admin.GroupListing; -import org.apache.kafka.clients.admin.ListGroupsOptions; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.test.ClusterInstance; +import org.apache.kafka.common.test.api.ClusterConfigProperty; +import org.apache.kafka.common.test.api.ClusterTest; +import org.apache.kafka.common.test.api.ClusterTestDefaults; +import org.apache.kafka.common.test.api.Type; import org.apache.kafka.common.utils.internals.Exit; -import org.apache.kafka.streams.GroupProtocol; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValueTimestamp; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; -import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; import org.apache.kafka.streams.kstream.Grouped; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.test.TestUtils; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Tag; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.Timeout; - import java.io.BufferedWriter; import java.io.File; import java.io.FileWriter; @@ -57,7 +48,6 @@ import java.util.Arrays; import java.util.HashMap; import java.util.List; -import java.util.Locale; import java.util.Map; import java.util.Optional; import java.util.Properties; @@ -71,343 +61,332 @@ import static java.time.LocalDateTime.now; import static java.util.stream.Collectors.toMap; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG; +import static org.apache.kafka.coordinator.transaction.TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG; +import static org.apache.kafka.coordinator.transaction.TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG; +import static org.apache.kafka.coordinator.transaction.TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -@Timeout(600) -@Tag("integration") +@ClusterTestDefaults( + types = {Type.CO_KRAFT}, + brokers = 2, + serverProperties = { + @ClusterConfigProperty(key = OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), + @ClusterConfigProperty(key = OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"), + @ClusterConfigProperty(key = TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, value = "1"), + @ClusterConfigProperty(key = TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"), + @ClusterConfigProperty(key = TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, value = "1"), + @ClusterConfigProperty(key = GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, value = "0"), + @ClusterConfigProperty(key = STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, value = "100"), + @ClusterConfigProperty(key = STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, value = "100"), + } +) public class ResetStreamsGroupOffsetTest { private static final String TOPIC_PREFIX = "foo-"; private static final String APP_ID_PREFIX = "streams-group-command-test"; - private static final Properties STREAMS_CONFIG = new Properties(); private static final int RECORD_TOTAL = 10; - public static EmbeddedKafkaCluster cluster; - private static String bootstrapServers; - private static Admin adminClient; - - @BeforeAll - public static void startCluster() { - final Properties props = new Properties(); - cluster = new EmbeddedKafkaCluster(2, props); - cluster.start(); - - bootstrapServers = cluster.bootstrapServers(); - adminClient = cluster.createAdminClient(); - - createStreamsConfig(bootstrapServers); - } - - private static void createStreamsConfig(String bootstrapServers) { - STREAMS_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); - STREAMS_CONFIG.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); - STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); - STREAMS_CONFIG.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault())); - STREAMS_CONFIG.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2); - } - - @AfterEach - public void deleteTopicsAndGroups() { - try (final Admin adminClient = cluster.createAdminClient()) { - // delete all topics - final Set topics = adminClient.listTopics().names().get(); - adminClient.deleteTopics(topics).all().get(); - // delete all groups - List groupIds = - adminClient.listGroups(ListGroupsOptions.forStreamsGroups().timeoutMs(1000)).all().get() - .stream().map(GroupListing::groupId).toList(); - adminClient.deleteStreamsGroups(groupIds).all().get(); - } catch (final UnknownTopicOrPartitionException ignored) { - } catch (final ExecutionException | InterruptedException e) { - if (!(e.getCause() instanceof UnknownTopicOrPartitionException)) { - throw new RuntimeException(e); - } - } - } - @AfterAll - public static void closeCluster() { - cluster.stop(); + private static Properties createStreamsConfig(String bootstrapServers, String appId) { + final Properties streamsConfig = new Properties(); + streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); + streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); + streamsConfig.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, "streams"); + streamsConfig.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2); + streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); + streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appId); + return streamsConfig; } - @Test - public void testResetWithUnrecognizedOption() { - String[] args = new String[]{"--unrecognized-option", "--bootstrap-server", bootstrapServers, "--reset-offsets", "--all-groups", "--all-input-topics", "--to-offset", "5"}; + @ClusterTest + public void testResetWithUnrecognizedOption(ClusterInstance cluster) { + String[] args = new String[]{"--unrecognized-option", "--bootstrap-server", cluster.bootstrapServers(), "--reset-offsets", "--all-groups", "--all-input-topics", "--to-offset", "5"}; assertThrows(OptionException.class, () -> getStreamsGroupService(args)); } - @Test - public void testResetOffsetsWithoutGroupOption() { - final String[] args = new String[]{"--bootstrap-server", bootstrapServers, "--reset-offsets", "--dry-run", "--to-offset", "5"}; + @ClusterTest + public void testResetOffsetsWithoutGroupOption(ClusterInstance cluster) { + final String[] args = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--reset-offsets", "--dry-run", "--to-offset", "5"}; AtomicBoolean exited = new AtomicBoolean(false); Exit.setExitProcedure(((statusCode, message) -> { assertNotEquals(0, statusCode); assertTrue(message.contains("Option [reset-offsets] takes one of these options: [all-groups], [group]")); exited.set(true); })); - try { - getStreamsGroupService(args); - } finally { + try (StreamsGroupCommand.StreamsGroupService ignored = getStreamsGroupService(args)) { assertTrue(exited.get()); + } finally { Exit.resetExitProcedure(); } } - @Test - public void testResetOffsetsWithoutDryRunOrExecuteOption() { - final String[] args = new String[]{"--bootstrap-server", bootstrapServers, "--reset-offsets", "--all-groups", "--all-input-topics", "--to-offset", "5"}; + @ClusterTest + public void testResetOffsetsWithoutDryRunOrExecuteOption(ClusterInstance cluster) { + final String[] args = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--reset-offsets", "--all-groups", "--all-input-topics", "--to-offset", "5"}; AtomicBoolean exited = new AtomicBoolean(false); Exit.setExitProcedure(((statusCode, message) -> { assertNotEquals(0, statusCode); assertTrue(message.contains("Option [reset-offsets] takes the option: [execute] or [dry-run]")); exited.set(true); })); - try { - getStreamsGroupService(args); - } finally { + try (StreamsGroupCommand.StreamsGroupService ignored = getStreamsGroupService(args)) { assertTrue(exited.get()); + } finally { Exit.resetExitProcedure(); } } - @Test - public void testResetOffsetsWithDeleteInternalTopicsOption() { - final String[] args = new String[]{"--bootstrap-server", bootstrapServers, "--reset-offsets", "--dry-run", "--all-groups", "--all-input-topics", "--to-offset", "5", "--delete-all-internal-topics"}; + @ClusterTest + public void testResetOffsetsWithDeleteInternalTopicsOption(ClusterInstance cluster) { + final String[] args = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--reset-offsets", "--dry-run", "--all-groups", "--all-input-topics", "--to-offset", "5", "--delete-all-internal-topics"}; AtomicBoolean exited = new AtomicBoolean(false); Exit.setExitProcedure(((statusCode, message) -> { assertNotEquals(0, statusCode); assertTrue(message.contains("Option [delete-all-internal-topics] takes [execute] when [reset-offsets] is used")); exited.set(true); })); - try { - getStreamsGroupService(args); - } finally { + try (StreamsGroupCommand.StreamsGroupService ignored = getStreamsGroupService(args)) { assertTrue(exited.get()); + } finally { Exit.resetExitProcedure(); } } - @Test - public void testResetOffset() throws Exception { + @ClusterTest + public void testResetOffset(ClusterInstance cluster) throws Exception { final String appId = generateRandomAppId(); final String topic1 = generateRandomTopic(); final String topic2 = generateRandomTopic(); final int numOfPartitions = 2; String[] args; - produceConsumeShutdown(appId, topic1, topic2, RECORD_TOTAL * numOfPartitions * 2); - produceMessagesOnTwoPartitions(RECORD_TOTAL, topic1); - produceMessagesOnTwoPartitions(RECORD_TOTAL, topic2); - /////////////////////////////////////////////// Specific topic (--topic topic1) //////////////////////////////////////////////// - // reset to specific offset, offset already on 10 - args = new String[]{"--bootstrap-server", bootstrapServers, "--reset-offsets", "--group", appId, "--input-topic", topic1, "--to-offset", "5"}; - resetOffsetsAndAssertForDryRunAndExecute(args, appId, topic1, 5L, 10L, 0, 1); + try (Admin adminClient = cluster.admin()) { + produceConsumeShutdown(cluster, appId, topic1, topic2, RECORD_TOTAL * numOfPartitions * 2); + produceMessagesOnTwoPartitions(cluster, RECORD_TOTAL, topic1); + produceMessagesOnTwoPartitions(cluster, RECORD_TOTAL, topic2); + /////////////////////////////////////////////// Specific topic (--topic topic1) //////////////////////////////////////////////// + // reset to specific offset, offset already on 10 + args = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--reset-offsets", "--group", appId, "--input-topic", topic1, "--to-offset", "5"}; + resetOffsetsAndAssertForDryRunAndExecute(adminClient, args, appId, topic1, 5L, 10L, 0, 1); - resetForNextTest(appId, 10L, topic1); + resetForNextTest(adminClient, appId, 10L, topic1); - // reset to specific offset when after end offset, offset already on 10 - args = new String[]{"--bootstrap-server", bootstrapServers, "--reset-offsets", "--group", appId, "--input-topic", topic1, "--to-offset", "30"}; - resetOffsetsAndAssertForDryRunAndExecute(args, appId, topic1, 20L, 10L, 0, 1); + // reset to specific offset when after end offset, offset already on 10 + args = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--reset-offsets", "--group", appId, "--input-topic", topic1, "--to-offset", "30"}; + resetOffsetsAndAssertForDryRunAndExecute(adminClient, args, appId, topic1, 20L, 10L, 0, 1); - // reset to specific offset when before begin offset, offset already on 20 - args = new String[]{"--bootstrap-server", bootstrapServers, "--reset-offsets", "--group", appId, "--input-topic", topic1, "--to-offset", "-30"}; - resetOffsetsAndAssertForDryRunAndExecute(args, appId, topic1, 0L, 20L, 0, 1); + // reset to specific offset when before begin offset, offset already on 20 + args = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--reset-offsets", "--group", appId, "--input-topic", topic1, "--to-offset", "-30"}; + resetOffsetsAndAssertForDryRunAndExecute(adminClient, args, appId, topic1, 0L, 20L, 0, 1); - resetForNextTest(appId, 10L, topic1); + resetForNextTest(adminClient, appId, 10L, topic1); - // reset to specific date time - DateTimeFormatter format = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS"); - LocalDateTime dateTime = now().minusDays(1); - args = new String[]{"--bootstrap-server", bootstrapServers, "--reset-offsets", "--group", appId, "--input-topic", topic1, "--to-datetime", format.format(dateTime)}; - resetOffsetsAndAssertForDryRunAndExecute(args, appId, topic1, 0L, 10L, 0, 1); + // reset to specific date time + DateTimeFormatter format = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS"); + LocalDateTime dateTime = now().minusDays(1); + args = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--reset-offsets", "--group", appId, "--input-topic", topic1, "--to-datetime", format.format(dateTime)}; + resetOffsetsAndAssertForDryRunAndExecute(adminClient, args, appId, topic1, 0L, 10L, 0, 1); - resetForNextTest(appId, 10L, topic1); + resetForNextTest(adminClient, appId, 10L, topic1); - // reset by duration to earliest - args = new String[]{"--bootstrap-server", bootstrapServers, "--reset-offsets", "--group", appId, "--input-topic", topic1, "--by-duration", "PT5M"}; - resetOffsetsAndAssertForDryRunAndExecute(args, appId, topic1, 0L, 10L, 0, 1); + // reset by duration to earliest + args = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--reset-offsets", "--group", appId, "--input-topic", topic1, "--by-duration", "PT5M"}; + resetOffsetsAndAssertForDryRunAndExecute(adminClient, args, appId, topic1, 0L, 10L, 0, 1); - resetForNextTest(appId, 10L, topic1); + resetForNextTest(adminClient, appId, 10L, topic1); - // reset to earliest - args = new String[]{"--bootstrap-server", bootstrapServers, "--reset-offsets", "--group", appId, "--input-topic", topic1, "--to-earliest"}; - resetOffsetsAndAssertForDryRunAndExecute(args, appId, topic1, 0L, 10L, 0, 1); + // reset to earliest + args = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--reset-offsets", "--group", appId, "--input-topic", topic1, "--to-earliest"}; + resetOffsetsAndAssertForDryRunAndExecute(adminClient, args, appId, topic1, 0L, 10L, 0, 1); - resetForNextTest(appId, 10L, topic1); + resetForNextTest(adminClient, appId, 10L, topic1); - // reset to latest - args = new String[]{"--bootstrap-server", bootstrapServers, "--reset-offsets", "--group", appId, "--input-topic", topic1, "--to-latest"}; - resetOffsetsAndAssertForDryRunAndExecute(args, appId, topic1, 20L, 10L, 0, 1); + // reset to latest + args = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--reset-offsets", "--group", appId, "--input-topic", topic1, "--to-latest"}; + resetOffsetsAndAssertForDryRunAndExecute(adminClient, args, appId, topic1, 20L, 10L, 0, 1); - resetForNextTest(appId, 5L, topic1); + resetForNextTest(adminClient, appId, 5L, topic1); - // reset to current - args = new String[]{"--bootstrap-server", bootstrapServers, "--reset-offsets", "--group", appId, "--input-topic", topic1, "--to-current"}; - resetOffsetsAndAssertForDryRunAndExecute(args, appId, topic1, 5L, 5L, 0, 1); + // reset to current + args = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--reset-offsets", "--group", appId, "--input-topic", topic1, "--to-current"}; + resetOffsetsAndAssertForDryRunAndExecute(adminClient, args, appId, topic1, 5L, 5L, 0, 1); - // reset offset shift+. The current offset is 5, as of the prev test is executed (by --execute) - args = new String[]{"--bootstrap-server", bootstrapServers, "--reset-offsets", "--group", appId, "--input-topic", topic1, "--shift-by", "3"}; - resetOffsetsAndAssertForDryRunAndExecute(args, appId, topic1, 8L, 5L, 0, 1); + // reset offset shift+. The current offset is 5, as of the prev test is executed (by --execute) + args = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--reset-offsets", "--group", appId, "--input-topic", topic1, "--shift-by", "3"}; + resetOffsetsAndAssertForDryRunAndExecute(adminClient, args, appId, topic1, 8L, 5L, 0, 1); - // reset offset shift-. The current offset is 8, as of the prev test is executed (by --execute) - args = new String[]{"--bootstrap-server", bootstrapServers, "--reset-offsets", "--group", appId, "--input-topic", topic1, "--shift-by", "-3"}; - resetOffsetsAndAssertForDryRunAndExecute(args, appId, topic1, 5L, 8L, 0, 1); + // reset offset shift-. The current offset is 8, as of the prev test is executed (by --execute) + args = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--reset-offsets", "--group", appId, "--input-topic", topic1, "--shift-by", "-3"}; + resetOffsetsAndAssertForDryRunAndExecute(adminClient, args, appId, topic1, 5L, 8L, 0, 1); - // reset offset shift by lower than earliest. The current offset is 5, as of the prev test is executed (by --execute) - args = new String[]{"--bootstrap-server", bootstrapServers, "--reset-offsets", "--group", appId, "--input-topic", topic1, "--shift-by", "-150"}; - resetOffsetsAndAssertForDryRunAndExecute(args, appId, topic1, 0L, 5L, 0, 1); + // reset offset shift by lower than earliest. The current offset is 5, as of the prev test is executed (by --execute) + args = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--reset-offsets", "--group", appId, "--input-topic", topic1, "--shift-by", "-150"}; + resetOffsetsAndAssertForDryRunAndExecute(adminClient, args, appId, topic1, 0L, 5L, 0, 1); - // reset offset shift by higher than latest. The current offset is 0, as of the prev test is executed (by --execute) - args = new String[]{"--bootstrap-server", bootstrapServers, "--reset-offsets", "--group", appId, "--input-topic", topic1, "--shift-by", "150"}; - resetOffsetsAndAssertForDryRunAndExecute(args, appId, topic1, 20L, 0L, 0, 1); + // reset offset shift by higher than latest. The current offset is 0, as of the prev test is executed (by --execute) + args = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--reset-offsets", "--group", appId, "--input-topic", topic1, "--shift-by", "150"}; + resetOffsetsAndAssertForDryRunAndExecute(adminClient, args, appId, topic1, 20L, 0L, 0, 1); - // export to file - args = new String[]{"--bootstrap-server", bootstrapServers, "--reset-offsets", "--dry-run", "--group", appId, "--input-topic", topic1, "--to-offset", "5", "--export"}; - File file = TestUtils.tempFile("reset", ".csv"); - Map exp = Map.of(new TopicPartition(topic1, 0), 5L, new TopicPartition(topic1, 1), 5L); - try (StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(args)) { - Map> exportedOffsets = service.resetOffsets(); - writeContentToFile(file, service.exportOffsetsToCsv(exportedOffsets)); + // export to file + args = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--reset-offsets", "--dry-run", "--group", appId, "--input-topic", topic1, "--to-offset", "5", "--export"}; + File file = TestUtils.tempFile("reset", ".csv"); + Map exp = Map.of(new TopicPartition(topic1, 0), 5L, new TopicPartition(topic1, 1), 5L); + try (StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(args)) { + Map> exportedOffsets = service.resetOffsets(); + writeContentToFile(file, service.exportOffsetsToCsv(exportedOffsets)); - assertEquals(exp, toOffsetMap(exportedOffsets.get(appId))); - } - args = new String[]{"--bootstrap-server", bootstrapServers, "--reset-offsets", "--dry-run", "--group", appId, "--input-topic", topic1, "--from-file", file.getCanonicalPath()}; - try (StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(args)) { - Map> importedOffsets = service.resetOffsets(); - assertEquals(exp, toOffsetMap(importedOffsets.get(appId))); - } - - ///////////////////////////////////////// Specific topic and partition (--topic topic1, --topic topic2) ///////////////////////////////////////// - resetForNextTest(appId, 10L, topic1); - - // reset to specific offset - args = new String[]{"--bootstrap-server", bootstrapServers, "--reset-offsets", "--group", appId, "--input-topic", topic1 + ":1", "--to-offset", "5"}; - resetOffsetsAndAssertForDryRunAndExecute(args, appId, topic1, 5L, 10L, 1); - - resetForNextTest(appId, 10L, topic1); - - // reset both partitions of topic1 and topic2:1 to specific offset - args = new String[]{"--bootstrap-server", bootstrapServers, "--reset-offsets", "--group", appId, - "--input-topic", topic1, "--input-topic", topic2 + ":1", "--to-offset", "5"}; - final Map expectedOffsets = Map.of( - new TopicPartition(topic1, 0), 5L, - new TopicPartition(topic1, 1), 5L, - new TopicPartition(topic2, 1), 5L); - - resetOffsetsAndAssert(addTo(args, "--dry-run"), appId, List.of(topic1, topic2), expectedOffsets, - Map.of( - new TopicPartition(topic1, 0), 10L, - new TopicPartition(topic1, 1), 10L, - new TopicPartition(topic2, 0), 10L, - new TopicPartition(topic2, 1), 10L)); - resetOffsetsAndAssert(addTo(args, "--execute"), appId, List.of(topic1, topic2), expectedOffsets, - Map.of(new TopicPartition(topic1, 0), 5L, - new TopicPartition(topic1, 1), 5L, - new TopicPartition(topic2, 0), 10L, - new TopicPartition(topic2, 1), 5L)); - - ///////////////////////////////////////// All topics (--all-input-topics) ///////////////////////////////////////// - resetForNextTest(appId, 10L, topic1, topic2); + assertEquals(exp, toOffsetMap(exportedOffsets.get(appId))); + } + args = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--reset-offsets", "--dry-run", "--group", appId, "--input-topic", topic1, "--from-file", file.getCanonicalPath()}; + try (StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(args)) { + Map> importedOffsets = service.resetOffsets(); + assertEquals(exp, toOffsetMap(importedOffsets.get(appId))); + } - // reset to specific offset - args = new String[]{"--bootstrap-server", bootstrapServers, "--reset-offsets", "--group", appId, "--all-input-topics", "--to-offset", "5"}; - resetOffsetsAndAssertForDryRunAndExecute(args, appId, topic1, topic2, 5L, 10L); + ///////////////////////////////////////// Specific topic and partition (--topic topic1, --topic topic2) ///////////////////////////////////////// + resetForNextTest(adminClient, appId, 10L, topic1); - resetForNextTest(appId, 10L, topic1, topic2); + // reset to specific offset + args = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--reset-offsets", "--group", appId, "--input-topic", topic1 + ":1", "--to-offset", "5"}; + resetOffsetsAndAssertForDryRunAndExecute(adminClient, args, appId, topic1, 5L, 10L, 1); - // reset to specific offset with two --topic options - args = new String[]{"--bootstrap-server", bootstrapServers, "--reset-offsets", "--group", appId, "--input-topic", topic1, "--input-topic", topic2, "--to-offset", "5"}; - resetOffsetsAndAssertForDryRunAndExecute(args, appId, topic1, topic2, 5L, 10L); + resetForNextTest(adminClient, appId, 10L, topic1); - resetForNextTest(appId, 10L, topic1, topic2); + // reset both partitions of topic1 and topic2:1 to specific offset + args = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--reset-offsets", "--group", appId, + "--input-topic", topic1, "--input-topic", topic2 + ":1", "--to-offset", "5"}; + final Map expectedOffsets = Map.of( + new TopicPartition(topic1, 0), 5L, + new TopicPartition(topic1, 1), 5L, + new TopicPartition(topic2, 1), 5L); + + resetOffsetsAndAssert(adminClient, addTo(args, "--dry-run"), appId, List.of(topic1, topic2), expectedOffsets, + Map.of( + new TopicPartition(topic1, 0), 10L, + new TopicPartition(topic1, 1), 10L, + new TopicPartition(topic2, 0), 10L, + new TopicPartition(topic2, 1), 10L)); + resetOffsetsAndAssert(adminClient, addTo(args, "--execute"), appId, List.of(topic1, topic2), expectedOffsets, + Map.of(new TopicPartition(topic1, 0), 5L, + new TopicPartition(topic1, 1), 5L, + new TopicPartition(topic2, 0), 10L, + new TopicPartition(topic2, 1), 5L)); + + ///////////////////////////////////////// All topics (--all-input-topics) ///////////////////////////////////////// + resetForNextTest(adminClient, appId, 10L, topic1, topic2); + + // reset to specific offset + args = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--reset-offsets", "--group", appId, "--all-input-topics", "--to-offset", "5"}; + resetOffsetsAndAssertForDryRunAndExecute(adminClient, args, appId, topic1, topic2, 5L, 10L); + + resetForNextTest(adminClient, appId, 10L, topic1, topic2); + + // reset to specific offset with two --topic options + args = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--reset-offsets", "--group", appId, "--input-topic", topic1, "--input-topic", topic2, "--to-offset", "5"}; + resetOffsetsAndAssertForDryRunAndExecute(adminClient, args, appId, topic1, topic2, 5L, 10L); + + resetForNextTest(adminClient, appId, 10L, topic1, topic2); + + // export to file + args = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--reset-offsets", "--dry-run", "--group", appId, "--all-input-topics", "--to-offset", "5", "--export"}; + file = TestUtils.tempFile("reset-all", ".csv"); + exp = Map.of(new TopicPartition(topic1, 0), 5L, + new TopicPartition(topic1, 1), 5L, + new TopicPartition(topic2, 0), 5L, + new TopicPartition(topic2, 1), 5L); + try (StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(args)) { + Map> exportedOffsets = service.resetOffsets(); + writeContentToFile(file, service.exportOffsetsToCsv(exportedOffsets)); - // export to file - args = new String[]{"--bootstrap-server", bootstrapServers, "--reset-offsets", "--dry-run", "--group", appId, "--all-input-topics", "--to-offset", "5", "--export"}; - file = TestUtils.tempFile("reset-all", ".csv"); - exp = Map.of(new TopicPartition(topic1, 0), 5L, - new TopicPartition(topic1, 1), 5L, - new TopicPartition(topic2, 0), 5L, - new TopicPartition(topic2, 1), 5L); - try (StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(args)) { - Map> exportedOffsets = service.resetOffsets(); - writeContentToFile(file, service.exportOffsetsToCsv(exportedOffsets)); + assertEquals(exp, toOffsetMap(exportedOffsets.get(appId))); + } + args = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--reset-offsets", "--dry-run", "--group", appId, "--input-topic", topic1, "--from-file", file.getCanonicalPath()}; + try (StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(args)) { + Map> importedOffsets = service.resetOffsets(); - assertEquals(exp, toOffsetMap(exportedOffsets.get(appId))); - } - args = new String[]{"--bootstrap-server", bootstrapServers, "--reset-offsets", "--dry-run", "--group", appId, "--input-topic", topic1, "--from-file", file.getCanonicalPath()}; - try (StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(args)) { - Map> importedOffsets = service.resetOffsets(); + assertEquals(exp, toOffsetMap(importedOffsets.get(appId))); + } - assertEquals(exp, toOffsetMap(importedOffsets.get(appId))); + // assert that the internal topics are not deleted + assertEquals(2, getInternalTopics(adminClient, appId).size()); } - - // assert that the internal topics are not deleted - assertEquals(2, getInternalTopics(appId).size()); } - @Test - public void testResetOffsetsWithDeleteSpecifiedInternalTopics() throws Exception { + @ClusterTest + public void testResetOffsetsWithDeleteSpecifiedInternalTopics(ClusterInstance cluster) throws Exception { final String appId = generateRandomAppId(); final String internalTopic = appId + "-aggregated_value-changelog"; final String topic1 = generateRandomTopic(); final String topic2 = generateRandomTopic(); final int numOfPartitions = 2; String[] args; - produceConsumeShutdown(appId, topic1, topic2, RECORD_TOTAL * numOfPartitions * 2); - produceMessagesOnTwoPartitions(RECORD_TOTAL, topic1); - produceMessagesOnTwoPartitions(RECORD_TOTAL, topic2); + try (Admin adminClient = cluster.admin()) { + produceConsumeShutdown(cluster, appId, topic1, topic2, RECORD_TOTAL * numOfPartitions * 2); + produceMessagesOnTwoPartitions(cluster, RECORD_TOTAL, topic1); + produceMessagesOnTwoPartitions(cluster, RECORD_TOTAL, topic2); - args = new String[]{"--bootstrap-server", bootstrapServers, "--reset-offsets", "--group", appId, "--all-input-topics", "--execute", "--to-offset", "5", - "--delete-internal-topic", internalTopic - }; + args = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--reset-offsets", "--group", appId, "--all-input-topics", "--execute", "--to-offset", "5", + "--delete-internal-topic", internalTopic + }; - resetOffsetsAndAssertInternalTopicDeletion(args, appId, internalTopic); + resetOffsetsAndAssertInternalTopicDeletion(adminClient, args, appId, internalTopic); + } } - @Test - public void testResetOffsetsWithDeleteAllInternalTopics() throws Exception { + @ClusterTest + public void testResetOffsetsWithDeleteAllInternalTopics(ClusterInstance cluster) throws Exception { final String appId = generateRandomAppId(); final String topic1 = generateRandomTopic(); final String topic2 = generateRandomTopic(); final int numOfPartitions = 2; String[] args; - produceConsumeShutdown(appId, topic1, topic2, RECORD_TOTAL * numOfPartitions * 2); - produceMessagesOnTwoPartitions(RECORD_TOTAL, topic1); - produceMessagesOnTwoPartitions(RECORD_TOTAL, topic2); + try (Admin adminClient = cluster.admin()) { + produceConsumeShutdown(cluster, appId, topic1, topic2, RECORD_TOTAL * numOfPartitions * 2); + produceMessagesOnTwoPartitions(cluster, RECORD_TOTAL, topic1); + produceMessagesOnTwoPartitions(cluster, RECORD_TOTAL, topic2); - args = new String[]{"--bootstrap-server", bootstrapServers, "--reset-offsets", "--group", appId, "--all-input-topics", "--delete-all-internal-topics", "--execute", "--to-offset", "5"}; - resetOffsetsAndAssertInternalTopicDeletion(args, appId); + args = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--reset-offsets", "--group", appId, "--all-input-topics", "--delete-all-internal-topics", "--execute", "--to-offset", "5"}; + resetOffsetsAndAssertInternalTopicDeletion(adminClient, args, appId); + } } - private void resetForNextTest(String appId, long desiredOffset, String... topics) throws ExecutionException, InterruptedException { + private void resetForNextTest(Admin adminClient, String appId, long desiredOffset, String... topics) throws ExecutionException, InterruptedException { Map offsets = new HashMap<>(); for (String topic : topics) { offsets.put(new TopicPartition(topic, 0), new OffsetAndMetadata(desiredOffset)); offsets.put(new TopicPartition(topic, 1), new OffsetAndMetadata(desiredOffset)); } adminClient.alterStreamsGroupOffsets(appId, offsets).all().get(); - Map committedOffsets = committedOffsets(List.of(topics), appId); + Map committedOffsets = committedOffsets(adminClient, List.of(topics), appId); for (TopicPartition tp: offsets.keySet()) { assertEquals(desiredOffset, committedOffsets.get(tp)); } } - private void AssertCommittedOffsets(String appId, + private void assertCommittedOffsets(Admin adminClient, + String appId, String topic, long expectedCommittedOffset, int... partitions) throws ExecutionException, InterruptedException { List affectedTPs = Arrays.stream(partitions) .mapToObj(partition -> new TopicPartition(topic, partition)) .toList(); - Map committedOffsets = committedOffsets(List.of(topic), appId); + Map committedOffsets = committedOffsets(adminClient, List.of(topic), appId); for (TopicPartition tp: affectedTPs) { assertEquals(expectedCommittedOffset, committedOffsets.get(tp)); } } - private void AssertCommittedOffsets(String appId, + private void assertCommittedOffsets(Admin adminClient, + String appId, String topic1, String topic2, long expectedCommittedOffset) throws ExecutionException, InterruptedException { @@ -415,7 +394,7 @@ private void AssertCommittedOffsets(String appId, TopicPartition tp11 = new TopicPartition(topic2, 0); TopicPartition tp20 = new TopicPartition(topic1, 1); TopicPartition tp21 = new TopicPartition(topic2, 1); - Map committedOffsets = committedOffsets(List.of(topic1, topic2), appId); + Map committedOffsets = committedOffsets(adminClient, List.of(topic1, topic2), appId); assertEquals(Map.of( tp10, expectedCommittedOffset, tp20, expectedCommittedOffset, @@ -426,23 +405,16 @@ private void AssertCommittedOffsets(String appId, /** * Resets offsets for a specific topic and partition(s) and verifies the results. * - *

This method performs the following steps:

- *
    - *
  • Resets offsets for the specified topic and partitions using the provided arguments.
  • - *
  • Asserts that the reset offsets match the expected offsets.
  • - *
  • Asserts that the committed offsets match the expected committed offsets.
  • - *
- * + * @param adminClient The admin client used to read committed offsets. * @param args The command-line arguments for resetting offsets. * @param appId The application ID for the Kafka Streams application. * @param topic The topic for which offsets will be reset. * @param expectedOffset The expected offset value after the reset. * @param expectedCommittedOffset The expected committed offset value after the reset. * @param partitions The partitions of the topic to reset offsets for. - * @throws ExecutionException If an error occurs during the execution of the reset operation. - * @throws InterruptedException If the thread is interrupted during the reset operation. */ - private void resetOffsetsAndAssert(String[] args, + private void resetOffsetsAndAssert(Admin adminClient, + String[] args, String appId, String topic, long expectedOffset, @@ -463,12 +435,12 @@ private void resetOffsetsAndAssert(String[] args, assertEquals(expectedResetResults, resetOffsetsResultByGroup); assertEquals(expectedResetResults.size(), resetOffsetsResultByGroup.size()); // assert that the committed offsets are as expected - AssertCommittedOffsets(appId, topic, expectedCommittedOffset, partitions); + assertCommittedOffsets(adminClient, appId, topic, expectedCommittedOffset, partitions); } - private void resetOffsetsAndAssertInternalTopicDeletion(String[] args, String appId, String... specifiedInternalTopics) throws InterruptedException { + private void resetOffsetsAndAssertInternalTopicDeletion(Admin adminClient, String[] args, String appId, String... specifiedInternalTopics) throws InterruptedException { List specifiedInternalTopicsList = List.of(specifiedInternalTopics); - Set allInternalTopics = getInternalTopics(appId); + Set allInternalTopics = getInternalTopics(adminClient, appId); specifiedInternalTopicsList.forEach(allInternalTopics::remove); try (StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(args)) { @@ -478,11 +450,11 @@ private void resetOffsetsAndAssertInternalTopicDeletion(String[] args, String ap // assert that the internal topics are deleted if (specifiedInternalTopics.length > 0) { TestUtils.waitForCondition( - () -> getInternalTopics(appId).size() == allInternalTopics.size(), + () -> getInternalTopics(adminClient, appId).size() == allInternalTopics.size(), 30_000, "Internal topics were not deleted as expected after reset" ); // verify that the specified internal topics were deleted - Set internalTopicsAfterReset = getInternalTopics(appId); + Set internalTopicsAfterReset = getInternalTopics(adminClient, appId); specifiedInternalTopicsList.forEach(topic -> assertFalse(internalTopicsAfterReset.contains(topic), "Internal topic '" + topic + "' was not deleted as expected after reset") @@ -490,13 +462,13 @@ private void resetOffsetsAndAssertInternalTopicDeletion(String[] args, String ap } else { TestUtils.waitForCondition(() -> { - Set internalTopicsAfterReset = getInternalTopics(appId); + Set internalTopicsAfterReset = getInternalTopics(adminClient, appId); return internalTopicsAfterReset.isEmpty(); }, 30_000, "Internal topics were not deleted after reset"); } } - private Set getInternalTopics(String appId) { + private Set getInternalTopics(Admin adminClient, String appId) { try { Set topics = adminClient.listTopics().names().get(); return topics.stream() @@ -511,23 +483,16 @@ private Set getInternalTopics(String appId) { /** * Resets offsets for two topics and verifies the results. * - *

This method performs the following steps:

- *
    - *
  • Resets offsets for the specified topics using the provided arguments.
  • - *
  • Asserts that the reset offsets match the expected offsets.
  • - *
  • Asserts that the committed offsets match the expected committed offsets.
  • - *
- * + * @param adminClient The admin client used to read committed offsets. * @param args The command-line arguments for resetting offsets. * @param appId The application ID for the Kafka Streams application. * @param topic1 The first topic for which offsets will be reset. * @param topic2 The second topic for which offsets will be reset. * @param expectedOffset The expected offset value after the reset. * @param expectedCommittedOffset The expected committed offset value after the reset. - * @throws ExecutionException If an error occurs during the execution of the reset operation. - * @throws InterruptedException If the thread is interrupted during the reset operation. */ - private void resetOffsetsAndAssert(String[] args, + private void resetOffsetsAndAssert(Admin adminClient, + String[] args, String appId, String topic1, String topic2, @@ -550,28 +515,21 @@ private void resetOffsetsAndAssert(String[] args, assertEquals(expectedResetResults, resetOffsetsResultByGroup); assertEquals(expectedResetResults.size(), resetOffsetsResultByGroup.size()); // assert that the committed offsets are as expected - AssertCommittedOffsets(appId, topic1, topic2, expectedCommittedOffset); + assertCommittedOffsets(adminClient, appId, topic1, topic2, expectedCommittedOffset); } /** * Resets offsets for the specified topics and verifies the results. * - *

This method performs the following steps:

- *
    - *
  • Resets offsets for the given topics using the provided arguments.
  • - *
  • Asserts that the reset offsets match the expected offsets.
  • - *
  • Asserts that the committed offsets match the expected committed offsets.
  • - *
- * + * @param adminClient The admin client used to read committed offsets. * @param args The command-line arguments for resetting offsets. * @param appId The application ID for the Kafka Streams application. * @param topics The list of topics for which offsets will be reset. * @param expectedOffsets A map of expected offsets for each topic partition after the reset. * @param expectedCommittedOffsets A map of expected committed offsets for each topic partition after the reset. - * @throws ExecutionException If an error occurs during the execution of the reset operation. - * @throws InterruptedException If the thread is interrupted during the reset operation. */ - private void resetOffsetsAndAssert(String[] args, + private void resetOffsetsAndAssert(Admin adminClient, + String[] args, String appId, List topics, Map expectedOffsets, @@ -585,30 +543,33 @@ private void resetOffsetsAndAssert(String[] args, assertEquals(expectedOffsets, resetOffsetsResult); assertEquals(expectedOffsets.size(), resetOffsetsResult.size()); // assert that the committed offsets are as expected - assertEquals(expectedCommittedOffsets, committedOffsets(topics, appId)); + assertEquals(expectedCommittedOffsets, committedOffsets(adminClient, topics, appId)); } - private void resetOffsetsAndAssertForDryRunAndExecute(String[] args, + private void resetOffsetsAndAssertForDryRunAndExecute(Admin adminClient, + String[] args, String appId, String topic, long expectedOffset, long expectedCommittedOffset, int... partitions) throws ExecutionException, InterruptedException { - resetOffsetsAndAssert(addTo(args, "--dry-run"), appId, topic, expectedOffset, expectedCommittedOffset, partitions); - resetOffsetsAndAssert(addTo(args, "--execute"), appId, topic, expectedOffset, expectedOffset, partitions); + resetOffsetsAndAssert(adminClient, addTo(args, "--dry-run"), appId, topic, expectedOffset, expectedCommittedOffset, partitions); + resetOffsetsAndAssert(adminClient, addTo(args, "--execute"), appId, topic, expectedOffset, expectedOffset, partitions); } - private void resetOffsetsAndAssertForDryRunAndExecute(String[] args, + private void resetOffsetsAndAssertForDryRunAndExecute(Admin adminClient, + String[] args, String appId, String topic1, String topic2, long expectedOffset, long expectedCommittedOffset) throws ExecutionException, InterruptedException { - resetOffsetsAndAssert(addTo(args, "--dry-run"), appId, topic1, topic2, expectedOffset, expectedCommittedOffset); - resetOffsetsAndAssert(addTo(args, "--execute"), appId, topic1, topic2, expectedOffset, expectedOffset); + resetOffsetsAndAssert(adminClient, addTo(args, "--dry-run"), appId, topic1, topic2, expectedOffset, expectedCommittedOffset); + resetOffsetsAndAssert(adminClient, addTo(args, "--execute"), appId, topic1, topic2, expectedOffset, expectedOffset); } - private Map committedOffsets(List topics, + private Map committedOffsets(Admin adminClient, + List topics, String group) throws ExecutionException, InterruptedException { return adminClient.listConsumerGroupOffsets(group) .all().get() @@ -661,18 +622,17 @@ private String generateRandomAppId() { } /** - * Produces messages to two partitions of the specified topic and consumes them. + * Produces messages to two partitions of the specified topics and consumes them. * + * @param cluster The cluster to run against. * @param appId The application ID for the Kafka Streams application. * @param topic1 The first topic to produce and consume messages from. * @param topic2 The second topic to produce and consume messages from. * @param numOfCommittedMessages The number of committed messages to process before shutting down. */ - private void produceConsumeShutdown(String appId, String topic1, String topic2, long numOfCommittedMessages) throws Exception { - STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appId); - - cluster.createTopic(topic1, 2); - cluster.createTopic(topic2, 2); + private void produceConsumeShutdown(ClusterInstance cluster, String appId, String topic1, String topic2, long numOfCommittedMessages) throws Exception { + cluster.createTopic(topic1, 2, (short) 1); + cluster.createTopic(topic2, 2, (short) 1); final StreamsBuilder builder = new StreamsBuilder(); @@ -696,12 +656,10 @@ private void produceConsumeShutdown(String appId, String topic1, String topic2, }); - final KafkaStreams streams = new KafkaStreams(builder.build(), STREAMS_CONFIG); - streams.cleanUp(); - streams.start(); + final KafkaStreams streams = StreamsGroupCommandTestUtils.getStartedStreams(createStreamsConfig(cluster.bootstrapServers(), appId), builder, true); - produceMessagesOnTwoPartitions(RECORD_TOTAL, topic1); - produceMessagesOnTwoPartitions(RECORD_TOTAL, topic2); + produceMessagesOnTwoPartitions(cluster, RECORD_TOTAL, topic1); + produceMessagesOnTwoPartitions(cluster, RECORD_TOTAL, topic2); TestUtils.waitForCondition(() -> streams.state().equals(KafkaStreams.State.RUNNING), @@ -722,37 +680,25 @@ private void produceConsumeShutdown(String appId, String topic1, String topic2, /** * Produces messages to two partitions of the specified topic. * + * @param cluster The cluster to run against. * @param numOfMessages The number of messages to produce for each partition. * @param topic The topic to which the messages will be produced. */ - private static void produceMessagesOnTwoPartitions(final int numOfMessages, final String topic) { - + private static void produceMessagesOnTwoPartitions(final ClusterInstance cluster, final int numOfMessages, final String topic) { // partition 0 List> data = new ArrayList<>(numOfMessages); for (long v = 0; v < numOfMessages; ++v) { - data.add(new KeyValueTimestamp<>(v + "0" + topic, v + "0", cluster.time.milliseconds())); + data.add(new KeyValueTimestamp<>(v + "0" + topic, v + "0", System.currentTimeMillis())); } - IntegrationTestUtils.produceSynchronously( - TestUtils.producerConfig(bootstrapServers, StringSerializer.class, StringSerializer.class), - false, - topic, - Optional.of(0), - data - ); + StreamsGroupCommandTestUtils.produceSynchronously(cluster.bootstrapServers(), topic, Optional.of(0), data); // partition 1 data = new ArrayList<>(numOfMessages); for (long v = 0; v < 10; ++v) { - data.add(new KeyValueTimestamp<>(v + "1" + topic, v + "1", cluster.time.milliseconds())); + data.add(new KeyValueTimestamp<>(v + "1" + topic, v + "1", System.currentTimeMillis())); } - IntegrationTestUtils.produceSynchronously( - TestUtils.producerConfig(bootstrapServers, StringSerializer.class, StringSerializer.class), - false, - topic, - Optional.of(1), - data - ); + StreamsGroupCommandTestUtils.produceSynchronously(cluster.bootstrapServers(), topic, Optional.of(1), data); } } diff --git a/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandTestUtils.java b/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandTestUtils.java new file mode 100644 index 0000000000000..1674bafbe1c45 --- /dev/null +++ b/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandTestUtils.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools.streams; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValueTimestamp; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.test.TestUtils; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +public final class StreamsGroupCommandTestUtils { + + private StreamsGroupCommandTestUtils() { + } + + public static KafkaStreams getStartedStreams(final Properties streamsConfig, final StreamsBuilder builder, final boolean clean) { + final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfig); + if (clean) { + streams.cleanUp(); + } + streams.start(); + return streams; + } + + public static void startApplicationAndWaitUntilRunning(final KafkaStreams streams) throws InterruptedException { + streams.start(); + TestUtils.waitForCondition( + () -> streams.state() == KafkaStreams.State.RUNNING, + "Streams application did not reach the RUNNING state."); + } + + public static void produceSynchronously(final String bootstrapServers, + final String topic, + final Optional partition, + final List> toProduce) { + Properties producerConfig = TestUtils.producerConfig(bootstrapServers, StringSerializer.class, StringSerializer.class); + try (Producer producer = new KafkaProducer<>(producerConfig)) { + List> futures = new ArrayList<>(); + for (KeyValueTimestamp record : toProduce) { + futures.add(producer.send( + new ProducerRecord<>(topic, partition.orElse(null), record.timestamp(), record.key(), record.value(), null))); + } + producer.flush(); + for (Future future : futures) { + future.get(); + } + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } +}