Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 37 additions & 4 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2681,8 +2681,8 @@ project(':tools') {
testImplementation project(':storage:storage-api').sourceSets.main.output
testImplementation testFixtures(project(':storage'))
testImplementation project(':streams')
testImplementation project(':streams').sourceSets.test.output
testImplementation project(':streams:integration-tests').sourceSets.test.output
testImplementation testFixtures(project(':streams'))
testImplementation testFixtures(project(':streams:integration-tests'))
testImplementation libs.junitJupiter
testImplementation libs.mockitoCore
testImplementation libs.mockitoJunitJupiter // supports MockitoExtension
Expand Down Expand Up @@ -2832,6 +2832,8 @@ project(':streams') {
archivesName = "kafka-streams"
}

apply plugin: 'java-test-fixtures'

ext.buildStreamsVersionFileName = "kafka-streams-version.properties"

configurations {
Expand All @@ -2847,6 +2849,12 @@ project(':streams') {
implementation libs.jacksonDatabind
implementation libs.slf4jApi

testFixturesImplementation testFixtures(project(':clients'))
testFixturesImplementation libs.hamcrest
testFixturesImplementation libs.junitJupiter
testFixturesImplementation libs.mockitoCore
testFixturesImplementation libs.slf4jApi

// testCompileOnly prevents streams from exporting a dependency on test-utils, which would cause a dependency cycle
testCompileOnly project(':streams:test-utils')
testCompileOnly libs.bndlib
Expand Down Expand Up @@ -2996,7 +3004,7 @@ project(':streams:streams-scala') {
api project(':streams')

api libs.scalaLibrary
testImplementation project(':streams').sourceSets.test.output
testImplementation testFixtures(project(':streams'))
testImplementation testFixtures(project(':clients'))
testImplementation project(':streams:test-utils')

Expand Down Expand Up @@ -3043,16 +3051,34 @@ project(':streams:integration-tests') {
archivesName = "kafka-streams-integration-tests"
}

apply plugin: 'java-test-fixtures'

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess the purpose of adding testfixtures to streams:integration-tests was to let the tools module use EmbeddedKafkaCluster. However, I grepped the usages and it seems they can be replaced by ClusterInstance, which would prevent introducing unnecessary module complexity

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


dependencies {
implementation libs.slf4jApi

testFixturesImplementation testFixtures(project(':clients'))
testFixturesImplementation testFixtures(project(':server-common'))
testFixturesImplementation testFixtures(project(':streams'))
testFixturesImplementation project(':group-coordinator')
testFixturesImplementation project(':server')
testFixturesImplementation project(':server-common')
testFixturesImplementation project(':storage')
testFixturesImplementation project(':streams')
testFixturesImplementation project(':streams:test-utils')
testFixturesImplementation project(':test-common:test-common-runtime')
testFixturesImplementation project(':transaction-coordinator')
testFixturesImplementation libs.hamcrest
testFixturesImplementation libs.junitJupiter
testFixturesImplementation libs.slf4jApi

testImplementation testFixtures(project(':clients'))
testImplementation testFixtures(project(':streams'))
testImplementation project(':group-coordinator')
testImplementation project(':server')
testImplementation project(':server-common')
testImplementation testFixtures(project(':server-common'))
testImplementation project(':storage')
testImplementation project(':streams').sourceSets.test.output
testImplementation project(':streams')
testImplementation project(':test-common:test-common-runtime')
testImplementation project(':tools')
testImplementation project(':transaction-coordinator')
Expand All @@ -3065,6 +3091,11 @@ project(':streams:integration-tests') {
testImplementation project(':streams:test-utils')

testRuntimeOnly runtimeTestLibs
// opentelemetryProto is an implementation dep of ':clients' that gets shaded into the clients
// shadow JAR. When testFixtures(project(':clients')) puts raw (unshaded) client class files on
// the classpath, ClientTelemetryProvider references the unshaded io.opentelemetry.proto.* classes
// directly, so the unshaded JAR must be present at runtime as well.
testRuntimeOnly libs.opentelemetryProto
}
}

Expand Down Expand Up @@ -4177,6 +4208,8 @@ gradle.projectsEvaluated {
'metadata': ':metadata',
'raft': ':raft',
'connect/runtime': ':connect:runtime',
'streams': ':streams',
'streams/integration-tests': ':streams:integration-tests',
]
allprojects { proj ->
proj.configurations.all { config ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,34 +16,25 @@
*/
package org.apache.kafka.streams.integration;

import org.apache.kafka.streams.kstream.internals.KTableKTableForeignKeyJoinScenarioTest;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.CombinedKeySchemaTest;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ResponseJoinProcessorSupplierTest;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionResponseWrapperSerdeTest;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapperSerdeTest;

import org.junit.platform.suite.api.SelectClasses;
import org.junit.platform.suite.api.Suite;

/**
* This suite runs all the tests related to the KTable-KTable foreign key join feature.
* This suite runs the integration tests related to the KTable-KTable foreign key join feature.
*
* It can be used from an IDE to selectively just run these tests when developing code related to KTable-KTable
* foreign key join.
* It can be used from an IDE to selectively just run these integration tests when developing code
* related to KTable-KTable foreign key join. The unit tests for this feature live in the
* {@code streams} module; see
* {@code org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignKeyJoinSuite}.
*
* If desired, it can also be added to a Gradle build task, although this isn't strictly necessary, since all
* these tests are already included in the `:streams:test` task.
* If desired, it can also be added to a Gradle build task, although this isn't strictly necessary,
* since all these tests are already included in the {@code :streams:integration-tests:test} task.
*/
@Suite
@SelectClasses({
KTableKTableForeignKeyInnerJoinMultiIntegrationTest.class,
KTableKTableForeignKeyJoinIntegrationTest.class,
KTableKTableForeignKeyJoinMaterializationIntegrationTest.class,
KTableKTableForeignKeyJoinScenarioTest.class,
CombinedKeySchemaTest.class,
SubscriptionWrapperSerdeTest.class,
SubscriptionResponseWrapperSerdeTest.class,
ResponseJoinProcessorSupplierTest.class
})
public class ForeignKeyJoinSuite {
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@
import org.apache.kafka.server.util.MockTime;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KafkaStreams.State;
import org.apache.kafka.streams.KafkaStreamsTest;
import org.apache.kafka.streams.KeyQueryMetadata;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.LagInfo;
import org.apache.kafka.streams.StateListenerStub;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
Expand Down Expand Up @@ -294,7 +294,7 @@ private void verifyOffsetLagFetch(final List<KafkaStreams> streamsList,

private void verifyAllKVKeys(final List<KafkaStreams> streamsList,
final KafkaStreams streams,
final KafkaStreamsTest.StateListenerStub stateListener,
final StateListenerStub stateListener,
final Set<String> keys,
final String storeName,
final long timeout,
Expand Down Expand Up @@ -346,7 +346,7 @@ private void verifyAllKVKeys(final List<KafkaStreams> streamsList,

private void verifyAllWindowedKeys(final List<KafkaStreams> streamsList,
final KafkaStreams streams,
final KafkaStreamsTest.StateListenerStub stateListenerStub,
final StateListenerStub stateListenerStub,
final Set<String> keys,
final String storeName,
final Long from,
Expand Down Expand Up @@ -531,7 +531,7 @@ public void shouldRejectWronglyTypedStore(final TestInfo testInfo) {
public void shouldBeAbleToQueryDuringRebalance() throws Exception {
final int numThreads = STREAM_TWO_PARTITIONS;
final List<KafkaStreams> streamsList = new ArrayList<>(numThreads);
final List<KafkaStreamsTest.StateListenerStub> listeners = new ArrayList<>(numThreads);
final List<StateListenerStub> listeners = new ArrayList<>(numThreads);

final ProducerRunnable producerRunnable = new ProducerRunnable(streamThree, inputValues, 1);
producerRunnable.run();
Expand All @@ -546,7 +546,7 @@ public void shouldBeAbleToQueryDuringRebalance() throws Exception {
props.put(StreamsConfig.CLIENT_ID_CONFIG, "instance-" + i);
final KafkaStreams streams =
createCountStream(streamThree, outputTopicThree, outputTopicConcurrentWindowed, storeName, windowStoreName, props);
final KafkaStreamsTest.StateListenerStub listener = new KafkaStreamsTest.StateListenerStub();
final StateListenerStub listener = new StateListenerStub();
streams.setStateListener(listener);
listeners.add(listener);
streamsList.add(streams);
Expand Down Expand Up @@ -632,7 +632,7 @@ public void shouldBeAbleToQueryDuringRebalance() throws Exception {
public void shouldBeAbleQueryStandbyStateDuringRebalance() throws Exception {
final int numThreads = STREAM_TWO_PARTITIONS;
final List<KafkaStreams> streamsList = new ArrayList<>(numThreads);
final List<KafkaStreamsTest.StateListenerStub> listeners = new ArrayList<>(numThreads);
final List<StateListenerStub> listeners = new ArrayList<>(numThreads);

final ProducerRunnable producerRunnable = new ProducerRunnable(streamThree, inputValues, 1);
producerRunnable.run();
Expand All @@ -649,7 +649,7 @@ public void shouldBeAbleQueryStandbyStateDuringRebalance() throws Exception {
props.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory("shouldBeAbleQueryStandbyStateDuringRebalance-" + i).getPath());
final KafkaStreams streams =
createCountStream(streamThree, outputTopicThree, outputTopicConcurrentWindowed, storeName, windowStoreName, props);
final KafkaStreamsTest.StateListenerStub listener = new KafkaStreamsTest.StateListenerStub();
final StateListenerStub listener = new StateListenerStub();
streams.setStateListener(listener);
listeners.add(listener);
streamsList.add(streams);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,32 +16,21 @@
*/
package org.apache.kafka.streams.integration;

import org.apache.kafka.streams.state.internals.CompositeReadOnlyKeyValueStoreTest;
import org.apache.kafka.streams.state.internals.CompositeReadOnlySessionStoreTest;
import org.apache.kafka.streams.state.internals.CompositeReadOnlyWindowStoreTest;
import org.apache.kafka.streams.state.internals.GlobalStateStoreProviderTest;
import org.apache.kafka.streams.state.internals.StreamThreadStateStoreProviderTest;
import org.apache.kafka.streams.state.internals.WrappingStoreProviderTest;

import org.junit.platform.suite.api.SelectClasses;
import org.junit.platform.suite.api.Suite;

/**
* This suite runs all the tests related to querying StateStores (IQ).
* This suite runs the integration tests related to querying StateStores (IQ).
*
* It can be used from an IDE to selectively just run these tests.
* It can be used from an IDE to selectively just run these integration tests. The unit tests for
* StateStore querying live in the {@code streams} module; see
* {@code org.apache.kafka.streams.state.internals.StoreQuerySuite}.
*
* Tests ending in the word "Suite" are excluded from the gradle build because it
* already runs the component tests individually.
*/
@Suite
@SelectClasses({
CompositeReadOnlyKeyValueStoreTest.class,
CompositeReadOnlyWindowStoreTest.class,
CompositeReadOnlySessionStoreTest.class,
GlobalStateStoreProviderTest.class,
StreamThreadStateStoreProviderTest.class,
WrappingStoreProviderTest.class,
QueryableStateIntegrationTest.class,
})
public class StoreQuerySuite {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -164,23 +163,6 @@ public class KafkaStreamsTest {
private MockedConstruction<GlobalStreamThread> globalStreamThreadMockedConstruction;
private MockedConstruction<Metrics> metricsMockedConstruction;

public static class StateListenerStub implements KafkaStreams.StateListener {
int numChanges = 0;
KafkaStreams.State oldState;
KafkaStreams.State newState;
public Map<KafkaStreams.State, Long> mapStates = new HashMap<>();

@Override
public void onChange(final KafkaStreams.State newState,
final KafkaStreams.State oldState) {
final long prevCount = mapStates.containsKey(newState) ? mapStates.get(newState) : 0;
numChanges++;
this.oldState = oldState;
this.newState = newState;
mapStates.put(newState, prevCount + 1);
}
}

@BeforeEach
public void before(final TestInfo testInfo) throws Exception {
time = new MockTime();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;

import org.apache.kafka.streams.kstream.internals.KTableKTableForeignKeyJoinScenarioTest;

import org.junit.platform.suite.api.SelectClasses;
import org.junit.platform.suite.api.Suite;

/**
* This suite runs all the unit tests related to the KTable-KTable foreign key join feature.
*
* It can be used from an IDE to selectively just run these tests when developing code related to
* KTable-KTable foreign key join. The integration tests for this feature live in the
* {@code streams:integration-tests} module; see
* {@code org.apache.kafka.streams.integration.ForeignKeyJoinSuite}.
*
* Tests ending in the word "Suite" are excluded from the gradle build because it
* already runs the component tests individually.
*/
@Suite
@SelectClasses({
KTableKTableForeignKeyJoinScenarioTest.class,
CombinedKeySchemaTest.class,
SubscriptionWrapperSerdeTest.class,
SubscriptionResponseWrapperSerdeTest.class,
ResponseJoinProcessorSupplierTest.class
})
public class ForeignKeyJoinSuite {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state.internals;

import org.junit.platform.suite.api.SelectClasses;
import org.junit.platform.suite.api.Suite;

/**
* This suite runs all the unit tests related to querying StateStores (IQ).
*
* It can be used from an IDE to selectively just run these tests. The integration tests for
* StateStore querying live in the {@code streams:integration-tests} module; see
* {@code org.apache.kafka.streams.integration.StoreQuerySuite}.
*
* Tests ending in the word "Suite" are excluded from the gradle build because it
* already runs the component tests individually.
*/
@Suite
@SelectClasses({
CompositeReadOnlyKeyValueStoreTest.class,
CompositeReadOnlyWindowStoreTest.class,
CompositeReadOnlySessionStoreTest.class,
GlobalStateStoreProviderTest.class,
StreamThreadStateStoreProviderTest.class,
WrappingStoreProviderTest.class,
})
public class StoreQuerySuite {
}
Loading
Loading