From 02ad10fb724327afdd208b5661b843ae84680236 Mon Sep 17 00:00:00 2001 From: lucliu1108 Date: Mon, 29 Jun 2026 09:16:56 -0500 Subject: [PATCH 1/5] KAFKA-20629: Add ducktape system tests for topology description plugin (1/2) Introduces the system-test infrastructure for KIP-1331 (Streams Group Topology Description Plugin): - New Java driver TopologyDescriptionPluginSystemTest that starts a small streams app and exits on signal. - New ducktape suite streams_topology_description_plugin_test with three scenarios: * push round-trip succeeds when the plugin is configured and the client default push is on; * client opt-out via topology.description.push.enabled=false prevents the client from ever sending a description; * no plugin on the broker means no solicitation is issued. - KafkaService constructor parameter streams_group_topology_description_plugin_class and matching config_property constant to propagate group.streams.topology.description.plugin.class to brokers. - StreamsTopologyDescriptionPluginService and the supporting TOPOLOGY_DESCRIPTION_PUSH_ENABLED property constant for the new driver. Co-Authored-By: Claude Opus 4.7 --- .../TopologyDescriptionPluginSystemTest.java | 87 +++++++++++++ .../services/kafka/config_property.py | 2 + tests/kafkatest/services/kafka/kafka.py | 6 + tests/kafkatest/services/streams.py | 47 +++++++ tests/kafkatest/services/streams_property.py | 1 + ...treams_topology_description_plugin_test.py | 120 ++++++++++++++++++ 6 files changed, 263 insertions(+) create mode 100644 streams/src/test/java/org/apache/kafka/streams/tests/TopologyDescriptionPluginSystemTest.java create mode 100644 tests/kafkatest/tests/streams/streams_topology_description_plugin_test.py diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/TopologyDescriptionPluginSystemTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/TopologyDescriptionPluginSystemTest.java new file mode 100644 index 0000000000000..6afd5a0871d46 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/tests/TopologyDescriptionPluginSystemTest.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.tests; + +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.common.utils.internals.Exit; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.Produced; + +import java.io.IOException; +import java.time.Duration; +import java.util.Properties; + +public class TopologyDescriptionPluginSystemTest { + + private static final String APPLICATION_ID = "kafka-streams-system-test-topology-description-plugin"; + private static final String SOURCE_TOPIC = "topologyDescriptionPluginSource"; + private static final String SINK_TOPIC = "topologyDescriptionPluginSink"; + + public static void main(final String[] args) throws IOException { + if (args.length != 1) { + System.err.println("TopologyDescriptionPluginSystemTest expects one parameter: propFile"); + Exit.exit(1); + } + + System.out.println("TopologyDescriptionPluginSystemTest starting"); + + final String propFileName = args[0]; + final Properties streamsProperties = Utils.loadProps(propFileName); + final String bootstrap = streamsProperties.getProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG); + + if (bootstrap == null) { + System.err.println("No bootstrap kafka servers specified in " + StreamsConfig.BOOTSTRAP_SERVERS_CONFIG); + Exit.exit(1); + } + + streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID); + streamsProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + streamsProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + + final StreamsBuilder builder = new StreamsBuilder(); + builder.stream(SOURCE_TOPIC, Consumed.with(Serdes.String(), Serdes.String())) + .mapValues(value -> value.toLowerCase()) + .groupByKey() + .count() + .toStream() + .mapValues(count -> Long.toString(count)) + .to(SINK_TOPIC, Produced.with(Serdes.String(), Serdes.String())); + + final KafkaStreams streams = new KafkaStreams(builder.build(), streamsProperties); + streams.setUncaughtExceptionHandler(e -> { + System.err.println("FATAL: An unexpected exception " + e); + System.err.flush(); + return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT; + }); + + System.out.println("Start Kafka Streams"); + streams.start(); + System.out.println("STREAMS-STARTED"); + System.out.flush(); + + Exit.addShutdownHook("streams-shutdown-hook", () -> { + streams.close(Duration.ofSeconds(30)); + System.out.println("TopologyDescriptionPluginSystemTest closed"); + System.out.flush(); + }); + } +} diff --git a/tests/kafkatest/services/kafka/config_property.py b/tests/kafkatest/services/kafka/config_property.py index 2148b3bf3e98b..647e9871e8905 100644 --- a/tests/kafkatest/services/kafka/config_property.py +++ b/tests/kafkatest/services/kafka/config_property.py @@ -83,6 +83,8 @@ STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS = "group.streams.assignment.interval.ms" +STREAMS_GROUP_TOPOLOGY_DESCRIPTION_PLUGIN_CLASS = "group.streams.topology.description.plugin.class" + UNSTABLE_API_VERSIONS_ENABLE = "unstable.api.versions.enable" UNSTABLE_FEATURE_VERSIONS_ENABLE = "unstable.feature.versions.enable" diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py index 8d98b96e2830b..d64682cbf93b1 100644 --- a/tests/kafkatest/services/kafka/kafka.py +++ b/tests/kafkatest/services/kafka/kafka.py @@ -207,6 +207,7 @@ def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAI dynamicRaftQuorum=False, use_transactions_v2=False, use_streams_groups=False, + streams_group_topology_description_plugin_class=None, enable_assignment_batching=None ): """ @@ -272,6 +273,7 @@ def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAI :param dynamicRaftQuorum: When true, controller_quorum_bootstrap_servers, and bootstraps the first controller using the standalone flag :param use_transactions_v2: When true, uses transaction.version=2 which utilizes the new transaction protocol introduced in KIP-890 :param use_streams_groups: When true, enables the use of streams groups introduced in KIP-1071 + :param streams_group_topology_description_plugin_class: When set, configures group.streams.topology.description.plugin.class on the broker (KIP-1331). Requires use_streams_groups=True. :param enable_assignment_batching: When true, enables assignment batching introduced in KIP-1263. If not specified, defaults to True. """ @@ -288,6 +290,7 @@ def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAI self.use_transactions_v2 = use_transactions_v2 self.use_streams_groups = use_streams_groups + self.streams_group_topology_description_plugin_class = streams_group_topology_description_plugin_class # Set consumer_group_migration_policy based on context and arguments. if consumer_group_migration_policy is None: @@ -361,6 +364,7 @@ def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAI server_prop_overrides=server_prop_overrides, dynamicRaftQuorum=self.dynamicRaftQuorum, use_transactions_v2=self.use_transactions_v2, use_streams_groups=self.use_streams_groups, + streams_group_topology_description_plugin_class=self.streams_group_topology_description_plugin_class, enable_assignment_batching=self.enable_assignment_batching ) self.controller_quorum = self.isolated_controller_quorum @@ -785,6 +789,8 @@ def prop_file(self, node): if self.use_streams_groups is True: override_configs[config_property.UNSTABLE_API_VERSIONS_ENABLE] = str(True) override_configs[config_property.UNSTABLE_FEATURE_VERSIONS_ENABLE] = str(True) + if self.streams_group_topology_description_plugin_class is not None: + override_configs[config_property.STREAMS_GROUP_TOPOLOGY_DESCRIPTION_PLUGIN_CLASS] = self.streams_group_topology_description_plugin_class if self.enable_assignment_batching: # Assignment batching is enabled by default in Kafka diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py index 998dcb76c2f04..3f39e40d3f862 100644 --- a/tests/kafkatest/services/streams.py +++ b/tests/kafkatest/services/streams.py @@ -763,3 +763,50 @@ def prop_file(self): cfg = KafkaConfig(**properties) return cfg.render() + + +INMEMORY_TOPOLOGY_DESCRIPTION_PLUGIN_CLASS = "org.apache.kafka.server.streams.InMemoryTopologyDescriptionPlugin" + + +class StreamsTopologyDescriptionPluginService(StreamsTestBaseService): + def __init__(self, test_context, kafka, topology_description_push_enabled=True): + super(StreamsTopologyDescriptionPluginService, self).__init__( + test_context, + kafka, + "org.apache.kafka.streams.tests.TopologyDescriptionPluginSystemTest", + "") + self.topology_description_push_enabled = topology_description_push_enabled + + @property + def expectedMessage(self): + return "STREAMS-STARTED" + + def prop_file(self): + properties = { + streams_property.STATE_DIR: self.state_dir, + streams_property.KAFKA_SERVERS: self.kafka.bootstrap_servers(), + streams_property.GROUP_PROTOCOL: "streams", + streams_property.TOPOLOGY_DESCRIPTION_PUSH_ENABLED: str(self.topology_description_push_enabled).lower(), + "replication.factor": 1, + "session.timeout.ms": "10000" + } + cfg = KafkaConfig(**properties) + return cfg.render() + + def start_cmd(self, node): + args = self.args.copy() + args['config_file'] = self.CONFIG_FILE + args['stdout'] = self.STDOUT_FILE + args['stderr'] = self.STDERR_FILE + args['pidfile'] = self.PID_FILE + args['log4j_param'] = get_log4j_config_param(node) + args['log4j'] = get_log4j_config_for_tools(node) + args['kafka_run_class'] = self.path.script("kafka-run-class.sh", node) + + cmd = "( export KAFKA_LOG4J_OPTS=\"%(log4j_param)s%(log4j)s\"; " \ + "INCLUDE_TEST_JARS=true %(kafka_run_class)s %(streams_class_name)s " \ + " %(config_file)s & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args + + self.logger.info("Executing: " + cmd) + + return cmd diff --git a/tests/kafkatest/services/streams_property.py b/tests/kafkatest/services/streams_property.py index c0a5902da22cb..2f213afd8f861 100644 --- a/tests/kafkatest/services/streams_property.py +++ b/tests/kafkatest/services/streams_property.py @@ -22,3 +22,4 @@ NUM_THREADS = "num.stream.threads" PROCESSING_GUARANTEE = "processing.guarantee" GROUP_PROTOCOL = "group.protocol" +TOPOLOGY_DESCRIPTION_PUSH_ENABLED = "topology.description.push.enabled" diff --git a/tests/kafkatest/tests/streams/streams_topology_description_plugin_test.py b/tests/kafkatest/tests/streams/streams_topology_description_plugin_test.py new file mode 100644 index 0000000000000..6fe88320da5d4 --- /dev/null +++ b/tests/kafkatest/tests/streams/streams_topology_description_plugin_test.py @@ -0,0 +1,120 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import time + +from ducktape.mark import matrix +from ducktape.mark.resource import cluster +from ducktape.tests.test import Test +from kafkatest.services.kafka import KafkaService, quorum +from kafkatest.services.streams import ( + INMEMORY_TOPOLOGY_DESCRIPTION_PLUGIN_CLASS, + StreamsTopologyDescriptionPluginService, +) + + +class StreamsTopologyDescriptionPluginTest(Test): + + PUSH_REQUESTED_LOG = "Broker requested topology description push" + PUSH_SENDING_LOG = "Sending topology description for group" + PUSH_SUCCESS_LOG = "Topology description pushed successfully" + + SOURCE_TOPIC = "topologyDescriptionPluginSource" + SINK_TOPIC = "topologyDescriptionPluginSink" + + def __init__(self, test_context): + super(StreamsTopologyDescriptionPluginTest, self).__init__(test_context=test_context) + self.topics = { + self.SOURCE_TOPIC: {"partitions": 1, "replication-factor": 1}, + self.SINK_TOPIC: {"partitions": 1, "replication-factor": 1}, + } + + def setup_kafka(self, plugin_enabled): + plugin_class = INMEMORY_TOPOLOGY_DESCRIPTION_PLUGIN_CLASS if plugin_enabled else None + self.kafka = KafkaService( + self.test_context, + num_nodes=1, + zk=None, + topics=self.topics, + use_streams_groups=True, + streams_group_topology_description_plugin_class=plugin_class, + server_prop_overrides=[ + ["group.streams.min.session.timeout.ms", "10000"], + ["group.streams.session.timeout.ms", "10000"], + ], + ) + self.kafka.start() + self.kafka.run_features_command("upgrade", "streams.version", 1) + + @cluster(num_nodes=2) + @matrix(metadata_quorum=[quorum.combined_kraft]) + def test_topology_description_available_with_plugin(self, metadata_quorum): + """ + Test the situation when the broker has the topology description plugin configured + and the client pushes by default. The broker should solicit a push and the client + should complete it successfully. + """ + self.setup_kafka(plugin_enabled=True) + processor = StreamsTopologyDescriptionPluginService(self.test_context, self.kafka) + with processor.node.account.monitor_log(processor.LOG_FILE) as monitor: + processor.start() + monitor.wait_until(self.PUSH_SUCCESS_LOG, + timeout_sec=120, + err_msg="Streams client did not log a successful topology description push") + processor.stop() + + @cluster(num_nodes=2) + @matrix(metadata_quorum=[quorum.combined_kraft]) + def test_topology_description_not_stored_when_client_opts_out(self, metadata_quorum): + """ + Test the situation when the broker has the topology description plugin configured + but the client opts out via topology.description.push.enabled=false. The client + should never send a topology description even though the broker solicits one. + """ + self.setup_kafka(plugin_enabled=True) + processor = StreamsTopologyDescriptionPluginService( + self.test_context, self.kafka, topology_description_push_enabled=False) + processor.start() + time.sleep(30) + sent = processor.node.account.ssh_capture( + "grep -c '%s' %s || true" % (self.PUSH_SENDING_LOG, processor.LOG_FILE), + allow_fail=False) + assert int(next(sent).strip()) == 0, \ + "Client sent a topology description despite topology.description.push.enabled=false" + pushed = processor.node.account.ssh_capture( + "grep -c '%s' %s || true" % (self.PUSH_SUCCESS_LOG, processor.LOG_FILE), + allow_fail=False) + assert int(next(pushed).strip()) == 0, \ + "Client logged a successful push despite topology.description.push.enabled=false" + processor.stop() + + @cluster(num_nodes=2) + @matrix(metadata_quorum=[quorum.combined_kraft]) + def test_topology_description_not_stored_without_plugin(self, metadata_quorum): + """ + Test the situation when no topology description plugin is configured on the broker. + The broker should never solicit a topology push from the client. + """ + self.setup_kafka(plugin_enabled=False) + processor = StreamsTopologyDescriptionPluginService(self.test_context, self.kafka) + processor.start() + time.sleep(30) + # count how many lines in streams.log contain Broker requested topology description push + solicited = processor.node.account.ssh_capture( + "grep -c '%s' %s || true" % (self.PUSH_REQUESTED_LOG, processor.LOG_FILE), + allow_fail=False) + assert int(next(solicited).strip()) == 0, \ + "Broker solicited a topology push even though no plugin was configured" + processor.stop() From be35f9e4d7981dec595e32b379cf2f119f2e8c84 Mon Sep 17 00:00:00 2001 From: lucliu1108 Date: Mon, 29 Jun 2026 09:31:39 -0500 Subject: [PATCH 2/5] add hb unstable api version --- .../kafka/common/requests/StreamsGroupHeartbeatRequest.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupHeartbeatRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupHeartbeatRequest.java index 74d23a0b70415..33d1fbeab5275 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupHeartbeatRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupHeartbeatRequest.java @@ -39,7 +39,10 @@ public static class Builder extends AbstractRequest.Builder Date: Tue, 30 Jun 2026 00:57:39 -0500 Subject: [PATCH 3/5] revise comment --- .../kafka/common/requests/StreamsGroupHeartbeatRequest.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupHeartbeatRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupHeartbeatRequest.java index 33d1fbeab5275..6f02c91f24753 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupHeartbeatRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupHeartbeatRequest.java @@ -39,9 +39,8 @@ public static class Builder extends AbstractRequest.Builder Date: Tue, 30 Jun 2026 17:58:00 -0500 Subject: [PATCH 4/5] remove unnecessary enable unstable version --- .../kafka/common/requests/StreamsGroupHeartbeatRequest.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupHeartbeatRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupHeartbeatRequest.java index 6f02c91f24753..74d23a0b70415 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupHeartbeatRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupHeartbeatRequest.java @@ -39,9 +39,7 @@ public static class Builder extends AbstractRequest.Builder Date: Wed, 1 Jul 2026 13:19:01 -0500 Subject: [PATCH 5/5] modify import orders --- tests/kafkatest/services/streams.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py index 3f39e40d3f862..c62b1488f973c 100644 --- a/tests/kafkatest/services/streams.py +++ b/tests/kafkatest/services/streams.py @@ -26,6 +26,7 @@ from .kafka.util import get_log4j_config_param, get_log4j_config_for_tools STATE_DIR = "state.dir" +INMEMORY_TOPOLOGY_DESCRIPTION_PLUGIN_CLASS = "org.apache.kafka.server.streams.InMemoryTopologyDescriptionPlugin" class StreamsTestBaseService(KafkaPathResolverMixin, JmxMixin, Service): """Base class for Streams Test services providing some common settings and functionality""" @@ -765,9 +766,6 @@ def prop_file(self): return cfg.render() -INMEMORY_TOPOLOGY_DESCRIPTION_PLUGIN_CLASS = "org.apache.kafka.server.streams.InMemoryTopologyDescriptionPlugin" - - class StreamsTopologyDescriptionPluginService(StreamsTestBaseService): def __init__(self, test_context, kafka, topology_description_push_enabled=True): super(StreamsTopologyDescriptionPluginService, self).__init__(