Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.tests;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.common.utils.internals.Exit;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

import java.io.IOException;
import java.time.Duration;
import java.util.Properties;

public class TopologyDescriptionPluginSystemTest {

private static final String APPLICATION_ID = "kafka-streams-system-test-topology-description-plugin";
private static final String SOURCE_TOPIC = "topologyDescriptionPluginSource";
private static final String SINK_TOPIC = "topologyDescriptionPluginSink";

public static void main(final String[] args) throws IOException {
if (args.length != 1) {
System.err.println("TopologyDescriptionPluginSystemTest expects one parameter: propFile");
Exit.exit(1);
}

System.out.println("TopologyDescriptionPluginSystemTest starting");

final String propFileName = args[0];
final Properties streamsProperties = Utils.loadProps(propFileName);
final String bootstrap = streamsProperties.getProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);

if (bootstrap == null) {
System.err.println("No bootstrap kafka servers specified in " + StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
Exit.exit(1);
}

streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
streamsProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

final StreamsBuilder builder = new StreamsBuilder();
builder.<String, String>stream(SOURCE_TOPIC, Consumed.with(Serdes.String(), Serdes.String()))
.mapValues(value -> value.toLowerCase())
.groupByKey()
.count()
.toStream()
.mapValues(count -> Long.toString(count))
.to(SINK_TOPIC, Produced.with(Serdes.String(), Serdes.String()));

final KafkaStreams streams = new KafkaStreams(builder.build(), streamsProperties);
streams.setUncaughtExceptionHandler(e -> {
System.err.println("FATAL: An unexpected exception " + e);
System.err.flush();
return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
});

System.out.println("Start Kafka Streams");
streams.start();
System.out.println("STREAMS-STARTED");
System.out.flush();

Exit.addShutdownHook("streams-shutdown-hook", () -> {
streams.close(Duration.ofSeconds(30));
System.out.println("TopologyDescriptionPluginSystemTest closed");
System.out.flush();
});
}
}
2 changes: 2 additions & 0 deletions tests/kafkatest/services/kafka/config_property.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@

STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS = "group.streams.assignment.interval.ms"

STREAMS_GROUP_TOPOLOGY_DESCRIPTION_PLUGIN_CLASS = "group.streams.topology.description.plugin.class"

UNSTABLE_API_VERSIONS_ENABLE = "unstable.api.versions.enable"
UNSTABLE_FEATURE_VERSIONS_ENABLE = "unstable.feature.versions.enable"

Expand Down
6 changes: 6 additions & 0 deletions tests/kafkatest/services/kafka/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAI
dynamicRaftQuorum=False,
use_transactions_v2=False,
use_streams_groups=False,
streams_group_topology_description_plugin_class=None,
enable_assignment_batching=None
):
"""
Expand Down Expand Up @@ -272,6 +273,7 @@ def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAI
:param dynamicRaftQuorum: When true, controller_quorum_bootstrap_servers, and bootstraps the first controller using the standalone flag
:param use_transactions_v2: When true, uses transaction.version=2 which utilizes the new transaction protocol introduced in KIP-890
:param use_streams_groups: When true, enables the use of streams groups introduced in KIP-1071
:param streams_group_topology_description_plugin_class: When set, configures group.streams.topology.description.plugin.class on the broker (KIP-1331). Requires use_streams_groups=True.
:param enable_assignment_batching: When true, enables assignment batching introduced in KIP-1263. If not specified, defaults to True.
"""

Expand All @@ -288,6 +290,7 @@ def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAI

self.use_transactions_v2 = use_transactions_v2
self.use_streams_groups = use_streams_groups
self.streams_group_topology_description_plugin_class = streams_group_topology_description_plugin_class

# Set consumer_group_migration_policy based on context and arguments.
if consumer_group_migration_policy is None:
Expand Down Expand Up @@ -361,6 +364,7 @@ def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAI
server_prop_overrides=server_prop_overrides, dynamicRaftQuorum=self.dynamicRaftQuorum,
use_transactions_v2=self.use_transactions_v2,
use_streams_groups=self.use_streams_groups,
streams_group_topology_description_plugin_class=self.streams_group_topology_description_plugin_class,
enable_assignment_batching=self.enable_assignment_batching
)
self.controller_quorum = self.isolated_controller_quorum
Expand Down Expand Up @@ -785,6 +789,8 @@ def prop_file(self, node):
if self.use_streams_groups is True:
override_configs[config_property.UNSTABLE_API_VERSIONS_ENABLE] = str(True)
override_configs[config_property.UNSTABLE_FEATURE_VERSIONS_ENABLE] = str(True)
if self.streams_group_topology_description_plugin_class is not None:
override_configs[config_property.STREAMS_GROUP_TOPOLOGY_DESCRIPTION_PLUGIN_CLASS] = self.streams_group_topology_description_plugin_class

if self.enable_assignment_batching:
# Assignment batching is enabled by default in Kafka
Expand Down
45 changes: 45 additions & 0 deletions tests/kafkatest/services/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from .kafka.util import get_log4j_config_param, get_log4j_config_for_tools

STATE_DIR = "state.dir"
INMEMORY_TOPOLOGY_DESCRIPTION_PLUGIN_CLASS = "org.apache.kafka.server.streams.InMemoryTopologyDescriptionPlugin"

class StreamsTestBaseService(KafkaPathResolverMixin, JmxMixin, Service):
"""Base class for Streams Test services providing some common settings and functionality"""
Expand Down Expand Up @@ -763,3 +764,47 @@ def prop_file(self):

cfg = KafkaConfig(**properties)
return cfg.render()


class StreamsTopologyDescriptionPluginService(StreamsTestBaseService):
def __init__(self, test_context, kafka, topology_description_push_enabled=True):
super(StreamsTopologyDescriptionPluginService, self).__init__(
test_context,
kafka,
"org.apache.kafka.streams.tests.TopologyDescriptionPluginSystemTest",
"")
self.topology_description_push_enabled = topology_description_push_enabled

@property
def expectedMessage(self):
return "STREAMS-STARTED"

def prop_file(self):
properties = {
streams_property.STATE_DIR: self.state_dir,
streams_property.KAFKA_SERVERS: self.kafka.bootstrap_servers(),
streams_property.GROUP_PROTOCOL: "streams",
streams_property.TOPOLOGY_DESCRIPTION_PUSH_ENABLED: str(self.topology_description_push_enabled).lower(),
"replication.factor": 1,
"session.timeout.ms": "10000"
}
cfg = KafkaConfig(**properties)
return cfg.render()

def start_cmd(self, node):
args = self.args.copy()
args['config_file'] = self.CONFIG_FILE
args['stdout'] = self.STDOUT_FILE
args['stderr'] = self.STDERR_FILE
args['pidfile'] = self.PID_FILE
args['log4j_param'] = get_log4j_config_param(node)
args['log4j'] = get_log4j_config_for_tools(node)
args['kafka_run_class'] = self.path.script("kafka-run-class.sh", node)

cmd = "( export KAFKA_LOG4J_OPTS=\"%(log4j_param)s%(log4j)s\"; " \
"INCLUDE_TEST_JARS=true %(kafka_run_class)s %(streams_class_name)s " \
" %(config_file)s & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args

self.logger.info("Executing: " + cmd)

return cmd
1 change: 1 addition & 0 deletions tests/kafkatest/services/streams_property.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,4 @@
NUM_THREADS = "num.stream.threads"
PROCESSING_GUARANTEE = "processing.guarantee"
GROUP_PROTOCOL = "group.protocol"
TOPOLOGY_DESCRIPTION_PUSH_ENABLED = "topology.description.push.enabled"
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import time

from ducktape.mark import matrix
from ducktape.mark.resource import cluster
from ducktape.tests.test import Test
from kafkatest.services.kafka import KafkaService, quorum
from kafkatest.services.streams import (
INMEMORY_TOPOLOGY_DESCRIPTION_PLUGIN_CLASS,
StreamsTopologyDescriptionPluginService,
)


class StreamsTopologyDescriptionPluginTest(Test):

PUSH_REQUESTED_LOG = "Broker requested topology description push"
PUSH_SENDING_LOG = "Sending topology description for group"
PUSH_SUCCESS_LOG = "Topology description pushed successfully"

SOURCE_TOPIC = "topologyDescriptionPluginSource"
SINK_TOPIC = "topologyDescriptionPluginSink"

def __init__(self, test_context):
super(StreamsTopologyDescriptionPluginTest, self).__init__(test_context=test_context)
self.topics = {
self.SOURCE_TOPIC: {"partitions": 1, "replication-factor": 1},
self.SINK_TOPIC: {"partitions": 1, "replication-factor": 1},
}

def setup_kafka(self, plugin_enabled):
plugin_class = INMEMORY_TOPOLOGY_DESCRIPTION_PLUGIN_CLASS if plugin_enabled else None
self.kafka = KafkaService(
self.test_context,
num_nodes=1,
zk=None,
topics=self.topics,
use_streams_groups=True,
streams_group_topology_description_plugin_class=plugin_class,
server_prop_overrides=[
["group.streams.min.session.timeout.ms", "10000"],
["group.streams.session.timeout.ms", "10000"],
],
)
self.kafka.start()
self.kafka.run_features_command("upgrade", "streams.version", 1)

@cluster(num_nodes=2)
@matrix(metadata_quorum=[quorum.combined_kraft])
def test_topology_description_available_with_plugin(self, metadata_quorum):
"""
Test the situation when the broker has the topology description plugin configured
and the client pushes by default. The broker should solicit a push and the client
should complete it successfully.
"""
self.setup_kafka(plugin_enabled=True)
processor = StreamsTopologyDescriptionPluginService(self.test_context, self.kafka)
with processor.node.account.monitor_log(processor.LOG_FILE) as monitor:
processor.start()
monitor.wait_until(self.PUSH_SUCCESS_LOG,
timeout_sec=120,
err_msg="Streams client did not log a successful topology description push")
processor.stop()

@cluster(num_nodes=2)
@matrix(metadata_quorum=[quorum.combined_kraft])
def test_topology_description_not_stored_when_client_opts_out(self, metadata_quorum):
"""
Test the situation when the broker has the topology description plugin configured
but the client opts out via topology.description.push.enabled=false. The client
should never send a topology description even though the broker solicits one.
"""
self.setup_kafka(plugin_enabled=True)
processor = StreamsTopologyDescriptionPluginService(
self.test_context, self.kafka, topology_description_push_enabled=False)
processor.start()
time.sleep(30)
sent = processor.node.account.ssh_capture(
"grep -c '%s' %s || true" % (self.PUSH_SENDING_LOG, processor.LOG_FILE),
allow_fail=False)
assert int(next(sent).strip()) == 0, \
"Client sent a topology description despite topology.description.push.enabled=false"
pushed = processor.node.account.ssh_capture(
"grep -c '%s' %s || true" % (self.PUSH_SUCCESS_LOG, processor.LOG_FILE),
allow_fail=False)
assert int(next(pushed).strip()) == 0, \
"Client logged a successful push despite topology.description.push.enabled=false"
processor.stop()

@cluster(num_nodes=2)
@matrix(metadata_quorum=[quorum.combined_kraft])
def test_topology_description_not_stored_without_plugin(self, metadata_quorum):
"""
Test the situation when no topology description plugin is configured on the broker.
The broker should never solicit a topology push from the client.
"""
self.setup_kafka(plugin_enabled=False)
processor = StreamsTopologyDescriptionPluginService(self.test_context, self.kafka)
processor.start()
time.sleep(30)
# count how many lines in streams.log contain Broker requested topology description push
solicited = processor.node.account.ssh_capture(
"grep -c '%s' %s || true" % (self.PUSH_REQUESTED_LOG, processor.LOG_FILE),
allow_fail=False)
assert int(next(solicited).strip()) == 0, \
"Broker solicited a topology push even though no plugin was configured"
processor.stop()