From f8b44ef9031aeb69587d103c6f5d769472351287 Mon Sep 17 00:00:00 2001 From: PoAn Yang Date: Sun, 28 Jun 2026 12:54:40 +0900 Subject: [PATCH] KAFKA-20553: Remove Scala dependency from streams:integration-tests Signed-off-by: PoAn Yang --- build.gradle | 41 +++++++++++++++-- .../integration/ForeignKeyJoinSuite.java | 23 +++------- .../QueryableStateIntegrationTest.java | 14 +++--- .../streams/integration/StoreQuerySuite.java | 19 ++------ .../utils/CompositeStateListener.java | 0 .../utils/EmbeddedKafkaCluster.java | 0 .../utils/IntegrationTestUtils.java | 0 .../kafka/streams/KafkaStreamsTest.java | 18 -------- .../foreignkeyjoin/ForeignKeyJoinSuite.java | 44 +++++++++++++++++++ .../state/internals/StoreQuerySuite.java | 42 ++++++++++++++++++ .../kafka/streams/KeyValueTimestamp.java | 0 .../kafka/streams/StateListenerStub.java | 37 ++++++++++++++++ .../apache/kafka/streams/TopologyWrapper.java | 0 .../assignment/AssignmentTestUtils.java | 0 .../LegacySubscriptionInfoSerde.java | 0 .../internals/RocksDBStoreTestingUtils.java | 0 .../kafka/streams/tests/SmokeTestClient.java | 0 .../kafka/streams/tests/SmokeTestDriver.java | 0 .../kafka/streams/tests/SmokeTestUtil.java | 0 .../streams/tests/StreamsUpgradeTest.java | 0 .../apache/kafka/streams/utils/TestUtils.java | 0 .../streams/utils/UniqueTopicSerdeScope.java | 0 .../org/apache/kafka/test/MockAggregator.java | 0 .../apache/kafka/test/MockApiProcessor.java | 0 .../kafka/test/MockApiProcessorSupplier.java | 0 .../apache/kafka/test/MockClientSupplier.java | 0 .../apache/kafka/test/MockInitializer.java | 0 .../kafka/test/MockInternalTopicManager.java | 0 .../apache/kafka/test/MockKeyValueStore.java | 0 .../kafka/test/MockKeyValueStoreBuilder.java | 0 .../org/apache/kafka/test/MockMapper.java | 0 .../org/apache/kafka/test/MockProcessor.java | 0 .../kafka/test/MockProcessorSupplier.java | 0 .../apache/kafka/test/MockValueJoiner.java | 0 .../apache/kafka/test/StreamsTestUtils.java | 0 35 files changed, 178 insertions(+), 60 deletions(-) rename streams/integration-tests/src/{test => testFixtures}/java/org/apache/kafka/streams/integration/utils/CompositeStateListener.java (100%) rename streams/integration-tests/src/{test => testFixtures}/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java (100%) rename streams/integration-tests/src/{test => testFixtures}/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java (100%) create mode 100644 streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignKeyJoinSuite.java create mode 100644 streams/src/test/java/org/apache/kafka/streams/state/internals/StoreQuerySuite.java rename streams/src/{test => testFixtures}/java/org/apache/kafka/streams/KeyValueTimestamp.java (100%) create mode 100644 streams/src/testFixtures/java/org/apache/kafka/streams/StateListenerStub.java rename streams/src/{test => testFixtures}/java/org/apache/kafka/streams/TopologyWrapper.java (100%) rename streams/src/{test => testFixtures}/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java (100%) rename streams/src/{test => testFixtures}/java/org/apache/kafka/streams/processor/internals/assignment/LegacySubscriptionInfoSerde.java (100%) rename streams/src/{test => testFixtures}/java/org/apache/kafka/streams/state/internals/RocksDBStoreTestingUtils.java (100%) rename streams/src/{test => testFixtures}/java/org/apache/kafka/streams/tests/SmokeTestClient.java (100%) rename streams/src/{test => testFixtures}/java/org/apache/kafka/streams/tests/SmokeTestDriver.java (100%) rename streams/src/{test => testFixtures}/java/org/apache/kafka/streams/tests/SmokeTestUtil.java (100%) rename streams/src/{test => testFixtures}/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java (100%) rename streams/src/{test => testFixtures}/java/org/apache/kafka/streams/utils/TestUtils.java (100%) rename streams/src/{test => testFixtures}/java/org/apache/kafka/streams/utils/UniqueTopicSerdeScope.java (100%) rename streams/src/{test => testFixtures}/java/org/apache/kafka/test/MockAggregator.java (100%) rename streams/src/{test => testFixtures}/java/org/apache/kafka/test/MockApiProcessor.java (100%) rename streams/src/{test => testFixtures}/java/org/apache/kafka/test/MockApiProcessorSupplier.java (100%) rename streams/src/{test => testFixtures}/java/org/apache/kafka/test/MockClientSupplier.java (100%) rename streams/src/{test => testFixtures}/java/org/apache/kafka/test/MockInitializer.java (100%) rename streams/src/{test => testFixtures}/java/org/apache/kafka/test/MockInternalTopicManager.java (100%) rename streams/src/{test => testFixtures}/java/org/apache/kafka/test/MockKeyValueStore.java (100%) rename streams/src/{test => testFixtures}/java/org/apache/kafka/test/MockKeyValueStoreBuilder.java (100%) rename streams/src/{test => testFixtures}/java/org/apache/kafka/test/MockMapper.java (100%) rename streams/src/{test => testFixtures}/java/org/apache/kafka/test/MockProcessor.java (100%) rename streams/src/{test => testFixtures}/java/org/apache/kafka/test/MockProcessorSupplier.java (100%) rename streams/src/{test => testFixtures}/java/org/apache/kafka/test/MockValueJoiner.java (100%) rename streams/src/{test => testFixtures}/java/org/apache/kafka/test/StreamsTestUtils.java (100%) diff --git a/build.gradle b/build.gradle index 4d44740d9e631..ee22d9adc3844 100644 --- a/build.gradle +++ b/build.gradle @@ -2681,8 +2681,8 @@ project(':tools') { testImplementation project(':storage:storage-api').sourceSets.main.output testImplementation testFixtures(project(':storage')) testImplementation project(':streams') - testImplementation project(':streams').sourceSets.test.output - testImplementation project(':streams:integration-tests').sourceSets.test.output + testImplementation testFixtures(project(':streams')) + testImplementation testFixtures(project(':streams:integration-tests')) testImplementation libs.junitJupiter testImplementation libs.mockitoCore testImplementation libs.mockitoJunitJupiter // supports MockitoExtension @@ -2832,6 +2832,8 @@ project(':streams') { archivesName = "kafka-streams" } + apply plugin: 'java-test-fixtures' + ext.buildStreamsVersionFileName = "kafka-streams-version.properties" configurations { @@ -2847,6 +2849,12 @@ project(':streams') { implementation libs.jacksonDatabind implementation libs.slf4jApi + testFixturesImplementation testFixtures(project(':clients')) + testFixturesImplementation libs.hamcrest + testFixturesImplementation libs.junitJupiter + testFixturesImplementation libs.mockitoCore + testFixturesImplementation libs.slf4jApi + // testCompileOnly prevents streams from exporting a dependency on test-utils, which would cause a dependency cycle testCompileOnly project(':streams:test-utils') testCompileOnly libs.bndlib @@ -2996,7 +3004,7 @@ project(':streams:streams-scala') { api project(':streams') api libs.scalaLibrary - testImplementation project(':streams').sourceSets.test.output + testImplementation testFixtures(project(':streams')) testImplementation testFixtures(project(':clients')) testImplementation project(':streams:test-utils') @@ -3043,16 +3051,34 @@ project(':streams:integration-tests') { archivesName = "kafka-streams-integration-tests" } + apply plugin: 'java-test-fixtures' + dependencies { implementation libs.slf4jApi + testFixturesImplementation testFixtures(project(':clients')) + testFixturesImplementation testFixtures(project(':server-common')) + testFixturesImplementation testFixtures(project(':streams')) + testFixturesImplementation project(':group-coordinator') + testFixturesImplementation project(':server') + testFixturesImplementation project(':server-common') + testFixturesImplementation project(':storage') + testFixturesImplementation project(':streams') + testFixturesImplementation project(':streams:test-utils') + testFixturesImplementation project(':test-common:test-common-runtime') + testFixturesImplementation project(':transaction-coordinator') + testFixturesImplementation libs.hamcrest + testFixturesImplementation libs.junitJupiter + testFixturesImplementation libs.slf4jApi + testImplementation testFixtures(project(':clients')) + testImplementation testFixtures(project(':streams')) testImplementation project(':group-coordinator') testImplementation project(':server') testImplementation project(':server-common') testImplementation testFixtures(project(':server-common')) testImplementation project(':storage') - testImplementation project(':streams').sourceSets.test.output + testImplementation project(':streams') testImplementation project(':test-common:test-common-runtime') testImplementation project(':tools') testImplementation project(':transaction-coordinator') @@ -3065,6 +3091,11 @@ project(':streams:integration-tests') { testImplementation project(':streams:test-utils') testRuntimeOnly runtimeTestLibs + // opentelemetryProto is an implementation dep of ':clients' that gets shaded into the clients + // shadow JAR. When testFixtures(project(':clients')) puts raw (unshaded) client class files on + // the classpath, ClientTelemetryProvider references the unshaded io.opentelemetry.proto.* classes + // directly, so the unshaded JAR must be present at runtime as well. + testRuntimeOnly libs.opentelemetryProto } } @@ -4177,6 +4208,8 @@ gradle.projectsEvaluated { 'metadata': ':metadata', 'raft': ':raft', 'connect/runtime': ':connect:runtime', + 'streams': ':streams', + 'streams/integration-tests': ':streams:integration-tests', ] allprojects { proj -> proj.configurations.all { config -> diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/ForeignKeyJoinSuite.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/ForeignKeyJoinSuite.java index ca64869a3afba..75cb63e35f92d 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/ForeignKeyJoinSuite.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/ForeignKeyJoinSuite.java @@ -16,34 +16,25 @@ */ package org.apache.kafka.streams.integration; -import org.apache.kafka.streams.kstream.internals.KTableKTableForeignKeyJoinScenarioTest; -import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.CombinedKeySchemaTest; -import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ResponseJoinProcessorSupplierTest; -import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionResponseWrapperSerdeTest; -import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapperSerdeTest; - import org.junit.platform.suite.api.SelectClasses; import org.junit.platform.suite.api.Suite; /** - * This suite runs all the tests related to the KTable-KTable foreign key join feature. + * This suite runs the integration tests related to the KTable-KTable foreign key join feature. * - * It can be used from an IDE to selectively just run these tests when developing code related to KTable-KTable - * foreign key join. + * It can be used from an IDE to selectively just run these integration tests when developing code + * related to KTable-KTable foreign key join. The unit tests for this feature live in the + * {@code streams} module; see + * {@code org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignKeyJoinSuite}. * - * If desired, it can also be added to a Gradle build task, although this isn't strictly necessary, since all - * these tests are already included in the `:streams:test` task. + * If desired, it can also be added to a Gradle build task, although this isn't strictly necessary, + * since all these tests are already included in the {@code :streams:integration-tests:test} task. */ @Suite @SelectClasses({ KTableKTableForeignKeyInnerJoinMultiIntegrationTest.class, KTableKTableForeignKeyJoinIntegrationTest.class, KTableKTableForeignKeyJoinMaterializationIntegrationTest.class, - KTableKTableForeignKeyJoinScenarioTest.class, - CombinedKeySchemaTest.class, - SubscriptionWrapperSerdeTest.class, - SubscriptionResponseWrapperSerdeTest.class, - ResponseJoinProcessorSupplierTest.class }) public class ForeignKeyJoinSuite { } diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java index ae0e6226ba299..42973bfe8a77c 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java @@ -30,10 +30,10 @@ import org.apache.kafka.server.util.MockTime; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KafkaStreams.State; -import org.apache.kafka.streams.KafkaStreamsTest; import org.apache.kafka.streams.KeyQueryMetadata; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.LagInfo; +import org.apache.kafka.streams.StateListenerStub; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.InvalidStateStoreException; @@ -294,7 +294,7 @@ private void verifyOffsetLagFetch(final List streamsList, private void verifyAllKVKeys(final List streamsList, final KafkaStreams streams, - final KafkaStreamsTest.StateListenerStub stateListener, + final StateListenerStub stateListener, final Set keys, final String storeName, final long timeout, @@ -346,7 +346,7 @@ private void verifyAllKVKeys(final List streamsList, private void verifyAllWindowedKeys(final List streamsList, final KafkaStreams streams, - final KafkaStreamsTest.StateListenerStub stateListenerStub, + final StateListenerStub stateListenerStub, final Set keys, final String storeName, final Long from, @@ -531,7 +531,7 @@ public void shouldRejectWronglyTypedStore(final TestInfo testInfo) { public void shouldBeAbleToQueryDuringRebalance() throws Exception { final int numThreads = STREAM_TWO_PARTITIONS; final List streamsList = new ArrayList<>(numThreads); - final List listeners = new ArrayList<>(numThreads); + final List listeners = new ArrayList<>(numThreads); final ProducerRunnable producerRunnable = new ProducerRunnable(streamThree, inputValues, 1); producerRunnable.run(); @@ -546,7 +546,7 @@ public void shouldBeAbleToQueryDuringRebalance() throws Exception { props.put(StreamsConfig.CLIENT_ID_CONFIG, "instance-" + i); final KafkaStreams streams = createCountStream(streamThree, outputTopicThree, outputTopicConcurrentWindowed, storeName, windowStoreName, props); - final KafkaStreamsTest.StateListenerStub listener = new KafkaStreamsTest.StateListenerStub(); + final StateListenerStub listener = new StateListenerStub(); streams.setStateListener(listener); listeners.add(listener); streamsList.add(streams); @@ -632,7 +632,7 @@ public void shouldBeAbleToQueryDuringRebalance() throws Exception { public void shouldBeAbleQueryStandbyStateDuringRebalance() throws Exception { final int numThreads = STREAM_TWO_PARTITIONS; final List streamsList = new ArrayList<>(numThreads); - final List listeners = new ArrayList<>(numThreads); + final List listeners = new ArrayList<>(numThreads); final ProducerRunnable producerRunnable = new ProducerRunnable(streamThree, inputValues, 1); producerRunnable.run(); @@ -649,7 +649,7 @@ public void shouldBeAbleQueryStandbyStateDuringRebalance() throws Exception { props.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory("shouldBeAbleQueryStandbyStateDuringRebalance-" + i).getPath()); final KafkaStreams streams = createCountStream(streamThree, outputTopicThree, outputTopicConcurrentWindowed, storeName, windowStoreName, props); - final KafkaStreamsTest.StateListenerStub listener = new KafkaStreamsTest.StateListenerStub(); + final StateListenerStub listener = new StateListenerStub(); streams.setStateListener(listener); listeners.add(listener); streamsList.add(streams); diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StoreQuerySuite.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StoreQuerySuite.java index e6b212e7d1144..e7bdffdb33922 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StoreQuerySuite.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StoreQuerySuite.java @@ -16,32 +16,21 @@ */ package org.apache.kafka.streams.integration; -import org.apache.kafka.streams.state.internals.CompositeReadOnlyKeyValueStoreTest; -import org.apache.kafka.streams.state.internals.CompositeReadOnlySessionStoreTest; -import org.apache.kafka.streams.state.internals.CompositeReadOnlyWindowStoreTest; -import org.apache.kafka.streams.state.internals.GlobalStateStoreProviderTest; -import org.apache.kafka.streams.state.internals.StreamThreadStateStoreProviderTest; -import org.apache.kafka.streams.state.internals.WrappingStoreProviderTest; - import org.junit.platform.suite.api.SelectClasses; import org.junit.platform.suite.api.Suite; /** - * This suite runs all the tests related to querying StateStores (IQ). + * This suite runs the integration tests related to querying StateStores (IQ). * - * It can be used from an IDE to selectively just run these tests. + * It can be used from an IDE to selectively just run these integration tests. The unit tests for + * StateStore querying live in the {@code streams} module; see + * {@code org.apache.kafka.streams.state.internals.StoreQuerySuite}. * * Tests ending in the word "Suite" are excluded from the gradle build because it * already runs the component tests individually. */ @Suite @SelectClasses({ - CompositeReadOnlyKeyValueStoreTest.class, - CompositeReadOnlyWindowStoreTest.class, - CompositeReadOnlySessionStoreTest.class, - GlobalStateStoreProviderTest.class, - StreamThreadStateStoreProviderTest.class, - WrappingStoreProviderTest.class, QueryableStateIntegrationTest.class, }) public class StoreQuerySuite { diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/CompositeStateListener.java b/streams/integration-tests/src/testFixtures/java/org/apache/kafka/streams/integration/utils/CompositeStateListener.java similarity index 100% rename from streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/CompositeStateListener.java rename to streams/integration-tests/src/testFixtures/java/org/apache/kafka/streams/integration/utils/CompositeStateListener.java diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java b/streams/integration-tests/src/testFixtures/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java similarity index 100% rename from streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java rename to streams/integration-tests/src/testFixtures/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/integration-tests/src/testFixtures/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java similarity index 100% rename from streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java rename to streams/integration-tests/src/testFixtures/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java index e1d8af5d31890..f5fe1c585ba37 100644 --- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java @@ -83,7 +83,6 @@ import java.net.InetSocketAddress; import java.time.Duration; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -164,23 +163,6 @@ public class KafkaStreamsTest { private MockedConstruction globalStreamThreadMockedConstruction; private MockedConstruction metricsMockedConstruction; - public static class StateListenerStub implements KafkaStreams.StateListener { - int numChanges = 0; - KafkaStreams.State oldState; - KafkaStreams.State newState; - public Map mapStates = new HashMap<>(); - - @Override - public void onChange(final KafkaStreams.State newState, - final KafkaStreams.State oldState) { - final long prevCount = mapStates.containsKey(newState) ? mapStates.get(newState) : 0; - numChanges++; - this.oldState = oldState; - this.newState = newState; - mapStates.put(newState, prevCount + 1); - } - } - @BeforeEach public void before(final TestInfo testInfo) throws Exception { time = new MockTime(); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignKeyJoinSuite.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignKeyJoinSuite.java new file mode 100644 index 0000000000000..a2061a18c6d05 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignKeyJoinSuite.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals.foreignkeyjoin; + +import org.apache.kafka.streams.kstream.internals.KTableKTableForeignKeyJoinScenarioTest; + +import org.junit.platform.suite.api.SelectClasses; +import org.junit.platform.suite.api.Suite; + +/** + * This suite runs all the unit tests related to the KTable-KTable foreign key join feature. + * + * It can be used from an IDE to selectively just run these tests when developing code related to + * KTable-KTable foreign key join. The integration tests for this feature live in the + * {@code streams:integration-tests} module; see + * {@code org.apache.kafka.streams.integration.ForeignKeyJoinSuite}. + * + * Tests ending in the word "Suite" are excluded from the gradle build because it + * already runs the component tests individually. + */ +@Suite +@SelectClasses({ + KTableKTableForeignKeyJoinScenarioTest.class, + CombinedKeySchemaTest.class, + SubscriptionWrapperSerdeTest.class, + SubscriptionResponseWrapperSerdeTest.class, + ResponseJoinProcessorSupplierTest.class +}) +public class ForeignKeyJoinSuite { +} diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreQuerySuite.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreQuerySuite.java new file mode 100644 index 0000000000000..6e64512e5d30a --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreQuerySuite.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.junit.platform.suite.api.SelectClasses; +import org.junit.platform.suite.api.Suite; + +/** + * This suite runs all the unit tests related to querying StateStores (IQ). + * + * It can be used from an IDE to selectively just run these tests. The integration tests for + * StateStore querying live in the {@code streams:integration-tests} module; see + * {@code org.apache.kafka.streams.integration.StoreQuerySuite}. + * + * Tests ending in the word "Suite" are excluded from the gradle build because it + * already runs the component tests individually. + */ +@Suite +@SelectClasses({ + CompositeReadOnlyKeyValueStoreTest.class, + CompositeReadOnlyWindowStoreTest.class, + CompositeReadOnlySessionStoreTest.class, + GlobalStateStoreProviderTest.class, + StreamThreadStateStoreProviderTest.class, + WrappingStoreProviderTest.class, +}) +public class StoreQuerySuite { +} diff --git a/streams/src/test/java/org/apache/kafka/streams/KeyValueTimestamp.java b/streams/src/testFixtures/java/org/apache/kafka/streams/KeyValueTimestamp.java similarity index 100% rename from streams/src/test/java/org/apache/kafka/streams/KeyValueTimestamp.java rename to streams/src/testFixtures/java/org/apache/kafka/streams/KeyValueTimestamp.java diff --git a/streams/src/testFixtures/java/org/apache/kafka/streams/StateListenerStub.java b/streams/src/testFixtures/java/org/apache/kafka/streams/StateListenerStub.java new file mode 100644 index 0000000000000..6e5bd24fb7354 --- /dev/null +++ b/streams/src/testFixtures/java/org/apache/kafka/streams/StateListenerStub.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams; + +import java.util.HashMap; +import java.util.Map; + +public class StateListenerStub implements KafkaStreams.StateListener { + public int numChanges = 0; + public KafkaStreams.State oldState; + public KafkaStreams.State newState; + public Map mapStates = new HashMap<>(); + + @Override + public void onChange(final KafkaStreams.State newState, + final KafkaStreams.State oldState) { + final long prevCount = mapStates.containsKey(newState) ? mapStates.get(newState) : 0; + numChanges++; + this.oldState = oldState; + this.newState = newState; + mapStates.put(newState, prevCount + 1); + } +} diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyWrapper.java b/streams/src/testFixtures/java/org/apache/kafka/streams/TopologyWrapper.java similarity index 100% rename from streams/src/test/java/org/apache/kafka/streams/TopologyWrapper.java rename to streams/src/testFixtures/java/org/apache/kafka/streams/TopologyWrapper.java diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java b/streams/src/testFixtures/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java similarity index 100% rename from streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java rename to streams/src/testFixtures/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/LegacySubscriptionInfoSerde.java b/streams/src/testFixtures/java/org/apache/kafka/streams/processor/internals/assignment/LegacySubscriptionInfoSerde.java similarity index 100% rename from streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/LegacySubscriptionInfoSerde.java rename to streams/src/testFixtures/java/org/apache/kafka/streams/processor/internals/assignment/LegacySubscriptionInfoSerde.java diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTestingUtils.java b/streams/src/testFixtures/java/org/apache/kafka/streams/state/internals/RocksDBStoreTestingUtils.java similarity index 100% rename from streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTestingUtils.java rename to streams/src/testFixtures/java/org/apache/kafka/streams/state/internals/RocksDBStoreTestingUtils.java diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java b/streams/src/testFixtures/java/org/apache/kafka/streams/tests/SmokeTestClient.java similarity index 100% rename from streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java rename to streams/src/testFixtures/java/org/apache/kafka/streams/tests/SmokeTestClient.java diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/src/testFixtures/java/org/apache/kafka/streams/tests/SmokeTestDriver.java similarity index 100% rename from streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java rename to streams/src/testFixtures/java/org/apache/kafka/streams/tests/SmokeTestDriver.java diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java b/streams/src/testFixtures/java/org/apache/kafka/streams/tests/SmokeTestUtil.java similarity index 100% rename from streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java rename to streams/src/testFixtures/java/org/apache/kafka/streams/tests/SmokeTestUtil.java diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/src/testFixtures/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java similarity index 100% rename from streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java rename to streams/src/testFixtures/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java diff --git a/streams/src/test/java/org/apache/kafka/streams/utils/TestUtils.java b/streams/src/testFixtures/java/org/apache/kafka/streams/utils/TestUtils.java similarity index 100% rename from streams/src/test/java/org/apache/kafka/streams/utils/TestUtils.java rename to streams/src/testFixtures/java/org/apache/kafka/streams/utils/TestUtils.java diff --git a/streams/src/test/java/org/apache/kafka/streams/utils/UniqueTopicSerdeScope.java b/streams/src/testFixtures/java/org/apache/kafka/streams/utils/UniqueTopicSerdeScope.java similarity index 100% rename from streams/src/test/java/org/apache/kafka/streams/utils/UniqueTopicSerdeScope.java rename to streams/src/testFixtures/java/org/apache/kafka/streams/utils/UniqueTopicSerdeScope.java diff --git a/streams/src/test/java/org/apache/kafka/test/MockAggregator.java b/streams/src/testFixtures/java/org/apache/kafka/test/MockAggregator.java similarity index 100% rename from streams/src/test/java/org/apache/kafka/test/MockAggregator.java rename to streams/src/testFixtures/java/org/apache/kafka/test/MockAggregator.java diff --git a/streams/src/test/java/org/apache/kafka/test/MockApiProcessor.java b/streams/src/testFixtures/java/org/apache/kafka/test/MockApiProcessor.java similarity index 100% rename from streams/src/test/java/org/apache/kafka/test/MockApiProcessor.java rename to streams/src/testFixtures/java/org/apache/kafka/test/MockApiProcessor.java diff --git a/streams/src/test/java/org/apache/kafka/test/MockApiProcessorSupplier.java b/streams/src/testFixtures/java/org/apache/kafka/test/MockApiProcessorSupplier.java similarity index 100% rename from streams/src/test/java/org/apache/kafka/test/MockApiProcessorSupplier.java rename to streams/src/testFixtures/java/org/apache/kafka/test/MockApiProcessorSupplier.java diff --git a/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java b/streams/src/testFixtures/java/org/apache/kafka/test/MockClientSupplier.java similarity index 100% rename from streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java rename to streams/src/testFixtures/java/org/apache/kafka/test/MockClientSupplier.java diff --git a/streams/src/test/java/org/apache/kafka/test/MockInitializer.java b/streams/src/testFixtures/java/org/apache/kafka/test/MockInitializer.java similarity index 100% rename from streams/src/test/java/org/apache/kafka/test/MockInitializer.java rename to streams/src/testFixtures/java/org/apache/kafka/test/MockInitializer.java diff --git a/streams/src/test/java/org/apache/kafka/test/MockInternalTopicManager.java b/streams/src/testFixtures/java/org/apache/kafka/test/MockInternalTopicManager.java similarity index 100% rename from streams/src/test/java/org/apache/kafka/test/MockInternalTopicManager.java rename to streams/src/testFixtures/java/org/apache/kafka/test/MockInternalTopicManager.java diff --git a/streams/src/test/java/org/apache/kafka/test/MockKeyValueStore.java b/streams/src/testFixtures/java/org/apache/kafka/test/MockKeyValueStore.java similarity index 100% rename from streams/src/test/java/org/apache/kafka/test/MockKeyValueStore.java rename to streams/src/testFixtures/java/org/apache/kafka/test/MockKeyValueStore.java diff --git a/streams/src/test/java/org/apache/kafka/test/MockKeyValueStoreBuilder.java b/streams/src/testFixtures/java/org/apache/kafka/test/MockKeyValueStoreBuilder.java similarity index 100% rename from streams/src/test/java/org/apache/kafka/test/MockKeyValueStoreBuilder.java rename to streams/src/testFixtures/java/org/apache/kafka/test/MockKeyValueStoreBuilder.java diff --git a/streams/src/test/java/org/apache/kafka/test/MockMapper.java b/streams/src/testFixtures/java/org/apache/kafka/test/MockMapper.java similarity index 100% rename from streams/src/test/java/org/apache/kafka/test/MockMapper.java rename to streams/src/testFixtures/java/org/apache/kafka/test/MockMapper.java diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessor.java b/streams/src/testFixtures/java/org/apache/kafka/test/MockProcessor.java similarity index 100% rename from streams/src/test/java/org/apache/kafka/test/MockProcessor.java rename to streams/src/testFixtures/java/org/apache/kafka/test/MockProcessor.java diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java b/streams/src/testFixtures/java/org/apache/kafka/test/MockProcessorSupplier.java similarity index 100% rename from streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java rename to streams/src/testFixtures/java/org/apache/kafka/test/MockProcessorSupplier.java diff --git a/streams/src/test/java/org/apache/kafka/test/MockValueJoiner.java b/streams/src/testFixtures/java/org/apache/kafka/test/MockValueJoiner.java similarity index 100% rename from streams/src/test/java/org/apache/kafka/test/MockValueJoiner.java rename to streams/src/testFixtures/java/org/apache/kafka/test/MockValueJoiner.java diff --git a/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java b/streams/src/testFixtures/java/org/apache/kafka/test/StreamsTestUtils.java similarity index 100% rename from streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java rename to streams/src/testFixtures/java/org/apache/kafka/test/StreamsTestUtils.java