Skip to content

KAFKA-20553: Use ClusterInstance for streams group command tests#22701

Open
FrankYang0529 wants to merge 1 commit into
apache:trunkfrom
FrankYang0529:KAFKA-20553-ClusterInstance
Open

KAFKA-20553: Use ClusterInstance for streams group command tests#22701
FrankYang0529 wants to merge 1 commit into
apache:trunkfrom
FrankYang0529:KAFKA-20553-ClusterInstance

Conversation

@FrankYang0529

@FrankYang0529 FrankYang0529 commented Jun 29, 2026

Copy link
Copy Markdown
Member

Followup from:
#22521 (comment)

Migrate the streams group command integration tests from
EmbeddedKafkaCluster to the ClusterInstance test framework:

  • DescribeStreamsGroupTest
  • ListStreamsGroupTest
  • DeleteStreamsGroupTest
  • DeleteStreamsGroupOffsetTest
  • ResetStreamsGroupOffsetTest

Reviewers: Chia-Ping Tsai chia7712@gmail.com

Signed-off-by: PoAn Yang <payang@apache.org>
configs.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
configs.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
configs.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault()));
configs.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, "streams");

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need this change?

List<Future<RecordMetadata>> futures = new ArrayList<>();
for (KeyValueTimestamp<String, String> record : toProduce) {
futures.add(producer.send(
new ProducerRecord<>(topic, partition.orElse(null), record.timestamp(), record.key(), record.value(), null)));

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

    public static void produce(final ClusterInstance instance,
                                final String topic,
                                final Optional<Integer> partition,
                                final List<KeyValueTimestamp<String, String>> toProduce) {
        try (Producer<String, String> producer = instance.producer(Map.of(
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(),
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()
        ))) {
            var futures = toProduce.stream()
                    .map(record -> producer.send(new ProducerRecord<>(topic, partition.orElse(null),
                            record.timestamp(), record.key(), record.value())))
                    .toList();
            producer.flush();
            futures.forEach(f -> Assertions.assertDoesNotThrow(() -> f.get()));
        }
    }

}

@Test
@ClusterTest

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It does not need the ClusterInstance, right?

}
cluster.createTopic(INPUT_TOPIC, 2, (short) 1);
try (KafkaStreams ignored = startStreamsApp()) {
try (StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--list", "--state", "stable"})) {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we consolidate these try-catch blocks into a single one?

}

@Test
@ClusterTest

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto. Does it need to have a cluster instance?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

tests Test fixes (including flaky tests) tools

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants