Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,21 @@
import org.apache.kafka.common.GroupState;
import org.apache.kafka.common.GroupType;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.GroupProtocol;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTestDefaults;
import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.test.TestUtils;
import org.apache.kafka.tools.ToolsTestUtils;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

import java.util.Arrays;
import java.util.HashSet;
Expand All @@ -54,73 +52,60 @@

import joptsimple.OptionException;

import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG;
import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG;
import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG;
import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG;
import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG;
import static org.junit.jupiter.api.Assertions.assertEquals;

@Timeout(600)
@Tag("integration")
@ClusterTestDefaults(
types = {Type.CO_KRAFT},
serverProperties = {
@ClusterConfigProperty(key = OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
@ClusterConfigProperty(key = OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
@ClusterConfigProperty(key = GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, value = "0"),
@ClusterConfigProperty(key = STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, value = "100"),
@ClusterConfigProperty(key = STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, value = "100"),
}
)
public class ListStreamsGroupTest {

public static EmbeddedKafkaCluster cluster = null;
static KafkaStreams streams;
private static final String APP_ID = "streams-group-command-test";
private static final String INPUT_TOPIC = "customInputTopic";
private static final String OUTPUT_TOPIC = "customOutputTopic";

@BeforeAll
public static void setup() throws Exception {
// start the cluster and create the input topic
final Properties props = new Properties();
cluster = new EmbeddedKafkaCluster(1, props);
cluster.start();
cluster.createTopic(INPUT_TOPIC, 2, 1);
private final ClusterInstance cluster;


// start kafka streams
Properties streamsProp = new Properties();
streamsProp.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsProp.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
streamsProp.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsProp.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsProp.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
streamsProp.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID);
streamsProp.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
streamsProp.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault()));

streams = new KafkaStreams(topology(), streamsProp);
startApplicationAndWaitUntilRunning(streams);
ListStreamsGroupTest(ClusterInstance cluster) {
this.cluster = cluster;
}

@AfterAll
public static void closeCluster() {
streams.close();
cluster.stop();
cluster = null;
}

@Test
@ClusterTest
public void testListStreamsGroupWithoutFilters() throws Exception {
try (StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--list"})) {
cluster.createTopic(INPUT_TOPIC, 2, (short) 1);
try (KafkaStreams ignored = startStreamsApp();
StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--list"})) {
Set<String> expectedGroups = Set.of(APP_ID);

final AtomicReference<Set> foundGroups = new AtomicReference<>();
TestUtils.waitForCondition(() -> {
foundGroups.set(new HashSet<>(service.listStreamsGroups()));
return Objects.equals(expectedGroups, foundGroups.get());
}, () -> "Expected --list to show streams groups " + expectedGroups + ", but found " + foundGroups.get() + ".");

}
}

@Test
@ClusterTest

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It does not need the ClusterInstance, right?

public void testListWithUnrecognizedNewOption() {
String[] cgcArgs = new String[]{"--new-option", "--bootstrap-server", cluster.bootstrapServers(), "--list"};
Assertions.assertThrows(OptionException.class, () -> getStreamsGroupService(cgcArgs));
}

@Test
@ClusterTest
public void testListStreamsGroupWithStates() throws Exception {
try (StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--list", "--state"})) {
cluster.createTopic(INPUT_TOPIC, 2, (short) 1);
try (KafkaStreams ignored = startStreamsApp();
StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--list", "--state"})) {
Set<GroupListing> expectedListing = Set.of(
new GroupListing(
APP_ID,
Expand All @@ -138,63 +123,75 @@ public void testListStreamsGroupWithStates() throws Exception {
}
}

@Test
@ClusterTest
public void testListStreamsGroupWithSpecifiedStates() throws Exception {
try (StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--list", "--state", "stable"})) {
Set<GroupListing> expectedListing = Set.of(
new GroupListing(
APP_ID,
Optional.of(GroupType.STREAMS),
"streams",
Optional.of(GroupState.STABLE))
);

final AtomicReference<Set<GroupListing>> foundListing = new AtomicReference<>();

TestUtils.waitForCondition(() -> {
foundListing.set(new HashSet<>(service.listStreamsGroupsInStates(Set.of())));
return Objects.equals(expectedListing, foundListing.get());
}, () -> "Expected --list to show streams groups " + expectedListing + ", but found " + foundListing.get() + ".");
}
cluster.createTopic(INPUT_TOPIC, 2, (short) 1);
try (KafkaStreams ignored = startStreamsApp()) {
try (StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--list", "--state", "stable"})) {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we consolidate these try-catch blocks into a single one?

Set<GroupListing> expectedListing = Set.of(
new GroupListing(
APP_ID,
Optional.of(GroupType.STREAMS),
"streams",
Optional.of(GroupState.STABLE))
);

final AtomicReference<Set<GroupListing>> foundListing = new AtomicReference<>();

TestUtils.waitForCondition(() -> {
foundListing.set(new HashSet<>(service.listStreamsGroupsInStates(Set.of())));
return Objects.equals(expectedListing, foundListing.get());
}, () -> "Expected --list to show streams groups " + expectedListing + ", but found " + foundListing.get() + ".");
}

try (StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--list", "--state", "PreparingRebalance"})) {
Set<GroupListing> expectedListing = Set.of();
try (StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--list", "--state", "PreparingRebalance"})) {
Set<GroupListing> expectedListing = Set.of();

final AtomicReference<Set<GroupListing>> foundListing = new AtomicReference<>();
final AtomicReference<Set<GroupListing>> foundListing = new AtomicReference<>();

TestUtils.waitForCondition(() -> {
foundListing.set(new HashSet<>(service.listStreamsGroupsInStates(Set.of(GroupState.PREPARING_REBALANCE))));
return Objects.equals(expectedListing, foundListing.get());
}, () -> "Expected --list to show streams groups " + expectedListing + ", but found " + foundListing.get() + ".");
TestUtils.waitForCondition(() -> {
foundListing.set(new HashSet<>(service.listStreamsGroupsInStates(Set.of(GroupState.PREPARING_REBALANCE))));
return Objects.equals(expectedListing, foundListing.get());
}, () -> "Expected --list to show streams groups " + expectedListing + ", but found " + foundListing.get() + ".");
}
}
}

@Test
@ClusterTest
public void testListStreamsGroupOutput() throws Exception {
validateListOutput(
List.of("--bootstrap-server", cluster.bootstrapServers(), "--list"),
List.of(),
Set.of(List.of(APP_ID))
);
cluster.createTopic(INPUT_TOPIC, 2, (short) 1);
try (KafkaStreams ignored = startStreamsApp()) {
validateListOutput(
List.of("--bootstrap-server", cluster.bootstrapServers(), "--list"),
List.of(),
Set.of(List.of(APP_ID))
);

validateListOutput(
List.of("--bootstrap-server", cluster.bootstrapServers(), "--list", "--state"),
List.of("GROUP", "STATE"),
Set.of(List.of(APP_ID, "Stable"))
);
validateListOutput(
List.of("--bootstrap-server", cluster.bootstrapServers(), "--list", "--state"),
List.of("GROUP", "STATE"),
Set.of(List.of(APP_ID, "Stable"))
);

validateListOutput(
List.of("--bootstrap-server", cluster.bootstrapServers(), "--list", "--state", "Stable"),
List.of("GROUP", "STATE"),
Set.of(List.of(APP_ID, "Stable"))
);
validateListOutput(
List.of("--bootstrap-server", cluster.bootstrapServers(), "--list", "--state", "Stable"),
List.of("GROUP", "STATE"),
Set.of(List.of(APP_ID, "Stable"))
);

// Check case-insensitivity in state filter.
validateListOutput(
List.of("--bootstrap-server", cluster.bootstrapServers(), "--list", "--state", "stable"),
List.of("GROUP", "STATE"),
Set.of(List.of(APP_ID, "Stable"))
);
// Check case-insensitivity in state filter.
validateListOutput(
List.of("--bootstrap-server", cluster.bootstrapServers(), "--list", "--state", "stable"),
List.of("GROUP", "STATE"),
Set.of(List.of(APP_ID, "Stable"))
);
}
}

private KafkaStreams startStreamsApp() throws InterruptedException {
KafkaStreams streams = new KafkaStreams(topology(), streamsProp());
StreamsGroupCommandTestUtils.startApplicationAndWaitUntilRunning(streams);
return streams;
}

private static Topology topology() {
Expand All @@ -207,6 +204,19 @@ private static Topology topology() {
return builder.build();
}

private Properties streamsProp() {
Properties streamsProp = new Properties();
streamsProp.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsProp.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
streamsProp.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsProp.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsProp.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
streamsProp.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID);
streamsProp.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
streamsProp.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, "streams");
return streamsProp;
}

private StreamsGroupCommand.StreamsGroupService getStreamsGroupService(String[] args) {
StreamsGroupCommandOptions opts = StreamsGroupCommandOptions.fromArgs(args);
return new StreamsGroupCommand.StreamsGroupService(
Expand Down Expand Up @@ -239,4 +249,4 @@ private static void validateListOutput(
return expectedRows.equals(groups);
}, () -> String.format("Expected header=%s and groups=%s, but found:%n%s", expectedHeader, expectedRows, out.get()));
}
}
}
Loading
Loading