diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index bad4fd58a7f45..7573f8a0f1e19 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -161,7 +161,9 @@
* props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
* props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
* Topology topology = ...
- * TopologyTestDriver driver = new TopologyTestDriver(topology, props);
+ * TopologyTestDriver driver = new TopologyTestDriverBuilder(topology)
+ * .withConfig(props)
+ * .build();
* }
*
*
Note that the {@code TopologyTestDriver} processes input records synchronously.
@@ -265,45 +267,53 @@ public void onRestoreEnd(final TopicPartition topicPartition, final String store
};
/**
- * Create a new test diver instance.
+ * Create a new test driver instance.
* Default test properties are used to initialize the driver instance
*
* @param topology the topology to be tested
+ * @deprecated Since 4.4. Use {@link TopologyTestDriverBuilder} instead.
*/
+ @Deprecated(since = "4.4")
public TopologyTestDriver(final Topology topology) {
this(topology, new Properties());
}
/**
- * Create a new test diver instance.
+ * Create a new test driver instance.
* Initialized the internally mocked wall-clock time with {@link System#currentTimeMillis() current system time}.
*
* @param topology the topology to be tested
* @param config the configuration for the topology
+ * @deprecated Since 4.4. Use {@link TopologyTestDriverBuilder} instead.
*/
+ @Deprecated(since = "4.4")
public TopologyTestDriver(final Topology topology,
final Properties config) {
this(topology, config, null);
}
/**
- * Create a new test diver instance.
+ * Create a new test driver instance.
*
* @param topology the topology to be tested
* @param initialWallClockTimeMs the initial value of internally mocked wall-clock time
+ * @deprecated Since 4.4. Use {@link TopologyTestDriverBuilder} instead.
*/
+ @Deprecated(since = "4.4")
public TopologyTestDriver(final Topology topology,
final Instant initialWallClockTimeMs) {
this(topology, new Properties(), initialWallClockTimeMs);
}
/**
- * Create a new test diver instance.
+ * Create a new test driver instance.
*
* @param topology the topology to be tested
* @param config the configuration for the topology
* @param initialWallClockTime the initial value of internally mocked wall-clock time
+ * @deprecated Since 4.4. Use {@link TopologyTestDriverBuilder} instead.
*/
+ @Deprecated(since = "4.4")
public TopologyTestDriver(final Topology topology,
final Properties config,
final Instant initialWallClockTime) {
@@ -314,15 +324,16 @@ public TopologyTestDriver(final Topology topology,
}
/**
- * Create a new test diver instance.
+ * Create a new test driver instance. Package-private core constructor shared by the (deprecated)
+ * public constructors and by {@link TopologyTestDriverBuilder}, which is the blessed entry point.
*
* @param builder builder for the topology to be tested
* @param config the configuration for the topology
* @param initialWallClockTimeMs the initial value of internally mocked wall-clock time
*/
- private TopologyTestDriver(final InternalTopologyBuilder builder,
- final Properties config,
- final long initialWallClockTimeMs) {
+ TopologyTestDriver(final InternalTopologyBuilder builder,
+ final Properties config,
+ final long initialWallClockTimeMs) {
final Properties configCopy = new Properties();
configCopy.putAll(config);
configCopy.putIfAbsent(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy-bootstrap-host:0");
@@ -450,7 +461,7 @@ private void setupGlobalTask(final Time mockWallClockTime,
@SuppressWarnings("deprecation")
final boolean globalEnabled = streamsConfig.getBoolean(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_GLOBAL_ENABLED_CONFIG);
- final ProcessingExceptionHandler processingExceptionHandler =
+ final ProcessingExceptionHandler processingExceptionHandler =
globalEnabled ? streamsConfig.processingExceptionHandler() : null;
globalStateTask = new GlobalStateUpdateTask(
@@ -837,7 +848,8 @@ TestRecord readRecord(final String topic,
}
final K key = keyDeserializer.deserialize(record.topic(), record.headers(), record.key());
final V value = valueDeserializer.deserialize(record.topic(), record.headers(), record.value());
- return new TestRecord<>(key, value, record.headers(), record.timestamp());
+ final int outputPartition = -1;
+ return new TestRecord<>(key, value, record.headers(), Instant.ofEpochMilli(record.timestamp()), outputPartition);
}
void pipeRecord(final String topic,
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriverBuilder.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriverBuilder.java
new file mode 100644
index 0000000000000..bd7aa527bdfc0
--- /dev/null
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriverBuilder.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import java.time.Instant;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * Fluent builder for a {@link TopologyTestDriver}.
+ *
+ * This is the entry point for constructing a {@link TopologyTestDriver}.
+ * Configure the builder and call {@link #build()}.
+ * The {@link TopologyTestDriver} constructors remain functional but are deprecated in favor of
+ * this builder.
+ *
+ * {@code
+ * TopologyTestDriver driver = new TopologyTestDriverBuilder(topology)
+ * .withConfig(props)
+ * .withInitialWallClockTime(Instant.ofEpochMilli(0))
+ * .build();
+ * }
+ */
+public class TopologyTestDriverBuilder {
+
+ private final Topology topology;
+ private Properties config = new Properties();
+ private Optional initialWallClockTime = Optional.empty();
+
+ /**
+ * Start building a driver for the given topology.
+ *
+ * @param topology the topology to be tested
+ */
+ public TopologyTestDriverBuilder(final Topology topology) {
+ this.topology = Objects.requireNonNull(topology, "topology cannot be null");
+ }
+
+ /**
+ * Set the configuration passed to the driver. Optional; defaults to empty {@link Properties}.
+ *
+ * @param config the configuration for the topology
+ * @return this builder
+ */
+ public TopologyTestDriverBuilder withConfig(final Properties config) {
+ this.config = Objects.requireNonNull(config, "config cannot be null");
+ return this;
+ }
+
+ /**
+ * Set the initial value of the driver's internally mocked wall-clock time. Optional; defaults to
+ * the current system time.
+ *
+ * @param initialWallClockTime the initial mocked wall-clock time
+ * @return this builder
+ */
+ public TopologyTestDriverBuilder withInitialWallClockTime(final Instant initialWallClockTime) {
+ this.initialWallClockTime = Optional.ofNullable(initialWallClockTime);
+ return this;
+ }
+
+ /**
+ * Build the driver: construct it and apply all declared topic partition counts.
+ *
+ * @return a ready-to-use {@link TopologyTestDriver}
+ */
+ public TopologyTestDriver build() {
+ return new TopologyTestDriver(
+ topology.internalTopologyBuilder,
+ config,
+ initialWallClockTime.map(Instant::toEpochMilli).orElseGet(System::currentTimeMillis));
+ }
+}
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
index c2fe5a5e94c2e..44542d388467b 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
@@ -424,18 +424,33 @@ private Topology setupTopologyWithInternalTopic(final String firstTableName,
@Test
public void shouldNotRequireParameters() {
- new TopologyTestDriver(setupSingleProcessorTopology(), config);
+ new TopologyTestDriverBuilder(setupSingleProcessorTopology())
+ .withConfig(config)
+ .build();
+ }
+
+ @Test
+ public void shouldThrowWhenConfigIsNull() {
+ final NullPointerException exception = assertThrows(
+ NullPointerException.class,
+ () -> new TopologyTestDriverBuilder(setupSingleProcessorTopology())
+ .withConfig(null));
+ assertEquals("config cannot be null", exception.getMessage());
}
@Test
public void shouldInitProcessor() {
- testDriver = new TopologyTestDriver(setupSingleProcessorTopology(), config);
+ testDriver = new TopologyTestDriverBuilder(setupSingleProcessorTopology())
+ .withConfig(config)
+ .build();
assertTrue(mockProcessors.get(0).initialized);
}
@Test
public void shouldCloseProcessor() {
- testDriver = new TopologyTestDriver(setupSingleProcessorTopology(), config);
+ testDriver = new TopologyTestDriverBuilder(setupSingleProcessorTopology())
+ .withConfig(config)
+ .build();
testDriver.close();
assertTrue(mockProcessors.get(0).closed);
// As testDriver is already closed, bypassing @AfterEach tearDown testDriver.close().
@@ -444,7 +459,7 @@ public void shouldCloseProcessor() {
@Test
public void shouldThrowForUnknownTopic() {
- testDriver = new TopologyTestDriver(new Topology());
+ testDriver = new TopologyTestDriverBuilder(new Topology()).build();
assertThrows(
IllegalArgumentException.class,
() -> testDriver.pipeRecord(
@@ -458,7 +473,7 @@ public void shouldThrowForUnknownTopic() {
@Test
public void shouldThrowForMissingTime() {
- testDriver = new TopologyTestDriver(new Topology());
+ testDriver = new TopologyTestDriverBuilder(new Topology()).build();
assertThrows(
IllegalStateException.class,
() -> testDriver.pipeRecord(
@@ -471,7 +486,9 @@ public void shouldThrowForMissingTime() {
@Test
public void shouldThrowNoSuchElementExceptionForUnusedOutputTopicWithDynamicRouting() {
- testDriver = new TopologyTestDriver(setupSourceSinkTopology(), config);
+ testDriver = new TopologyTestDriverBuilder(setupSourceSinkTopology())
+ .withConfig(config)
+ .build();
final TestOutputTopic outputTopic = new TestOutputTopic<>(
testDriver,
"unused-topic",
@@ -485,7 +502,9 @@ public void shouldThrowNoSuchElementExceptionForUnusedOutputTopicWithDynamicRout
@Test
public void shouldCaptureSinkTopicNamesIfWrittenInto() {
- testDriver = new TopologyTestDriver(setupSourceSinkTopology(), config);
+ testDriver = new TopologyTestDriverBuilder(setupSourceSinkTopology())
+ .withConfig(config)
+ .build();
assertThat(testDriver.producedTopicNames(), is(Collections.emptySet()));
@@ -495,10 +514,11 @@ public void shouldCaptureSinkTopicNamesIfWrittenInto() {
@Test
public void shouldCaptureInternalTopicNamesIfWrittenInto() {
- testDriver = new TopologyTestDriver(
- setupTopologyWithInternalTopic("table1", "table2", "join"),
- config
- );
+ testDriver = new TopologyTestDriverBuilder(
+ setupTopologyWithInternalTopic("table1", "table2", "join"))
+ .withConfig(config)
+ .build();
+
assertThat(testDriver.producedTopicNames(), is(Collections.emptySet()));
@@ -531,7 +551,9 @@ public void shouldCaptureGlobalTopicNameIfWrittenInto() {
builder.globalTable(SOURCE_TOPIC_1, Materialized.as("globalTable"));
builder.stream(SOURCE_TOPIC_2).to(SOURCE_TOPIC_1);
- testDriver = new TopologyTestDriver(builder.build(), config);
+ testDriver = new TopologyTestDriverBuilder(builder.build())
+ .withConfig(config)
+ .build();
assertThat(testDriver.producedTopicNames(), is(Collections.emptySet()));
@@ -544,7 +566,9 @@ public void shouldCaptureGlobalTopicNameIfWrittenInto() {
@Test
public void shouldProcessRecordForTopic() {
- testDriver = new TopologyTestDriver(setupSourceSinkTopology(), config);
+ testDriver = new TopologyTestDriverBuilder(setupSourceSinkTopology())
+ .withConfig(config)
+ .build();
pipeRecord(SOURCE_TOPIC_1, testRecord1);
final ProducerRecord outputRecord = testDriver.readRecord(SINK_TOPIC_1);
@@ -556,7 +580,9 @@ public void shouldProcessRecordForTopic() {
@Test
public void shouldSetRecordMetadata() {
- testDriver = new TopologyTestDriver(setupSingleProcessorTopology(), config);
+ testDriver = new TopologyTestDriverBuilder(setupSingleProcessorTopology())
+ .withConfig(config)
+ .build();
pipeRecord(SOURCE_TOPIC_1, testRecord1);
@@ -576,7 +602,9 @@ private void pipeRecord(final String topic, final TestRecord rec
@Test
public void shouldSendRecordViaCorrectSourceTopic() {
- testDriver = new TopologyTestDriver(setupMultipleSourceTopology(SOURCE_TOPIC_1, SOURCE_TOPIC_2), config);
+ testDriver = new TopologyTestDriverBuilder(setupMultipleSourceTopology(SOURCE_TOPIC_1, SOURCE_TOPIC_2))
+ .withConfig(config)
+ .build();
final List processedRecords1 = mockProcessors.get(0).processedRecords;
final List processedRecords2 = mockProcessors.get(1).processedRecords;
@@ -634,7 +662,7 @@ public void shouldUseSourceSpecificDeserializers() {
},
processor);
- testDriver = new TopologyTestDriver(topology);
+ testDriver = new TopologyTestDriverBuilder(topology).build();
final Long source1Key = 42L;
final String source1Value = "anyString";
@@ -667,7 +695,9 @@ public void shouldUseSourceSpecificDeserializers() {
@Test
public void shouldPassRecordHeadersIntoSerializersAndDeserializers() {
- testDriver = new TopologyTestDriver(setupSourceSinkTopology(), config);
+ testDriver = new TopologyTestDriverBuilder(setupSourceSinkTopology())
+ .withConfig(config)
+ .build();
final AtomicBoolean passedHeadersToKeySerializer = new AtomicBoolean(false);
final AtomicBoolean passedHeadersToValueSerializer = new AtomicBoolean(false);
@@ -727,7 +757,7 @@ public void shouldUseSinkSpecificSerializers() {
topology.addSink("sink-1", SINK_TOPIC_1, new LongSerializer(), new StringSerializer(), sourceName1);
topology.addSink("sink-2", SINK_TOPIC_2, new IntegerSerializer(), new DoubleSerializer(), sourceName2);
- testDriver = new TopologyTestDriver(topology);
+ testDriver = new TopologyTestDriverBuilder(topology).build();
final Long source1Key = 42L;
final String source1Value = "anyString";
@@ -760,7 +790,9 @@ public void shouldUseSinkSpecificSerializers() {
@Test
public void shouldForwardRecordsFromSubtopologyToSubtopology() {
- testDriver = new TopologyTestDriver(setupTopologyWithTwoSubtopologies(), config);
+ testDriver = new TopologyTestDriverBuilder(setupTopologyWithTwoSubtopologies())
+ .withConfig(config)
+ .build();
pipeRecord(SOURCE_TOPIC_1, testRecord1);
@@ -777,7 +809,7 @@ public void shouldForwardRecordsFromSubtopologyToSubtopology() {
@Test
public void shouldPopulateGlobalStore() {
- testDriver = new TopologyTestDriver(setupGlobalStoreTopology(SOURCE_TOPIC_1), config);
+ testDriver = new TopologyTestDriverBuilder(setupGlobalStoreTopology(SOURCE_TOPIC_1)).withConfig(config).build();
final KeyValueStore globalStore = testDriver.getKeyValueStore(SOURCE_TOPIC_1 + "-globalStore");
assertNotNull(globalStore);
@@ -791,10 +823,10 @@ public void shouldPopulateGlobalStore() {
@Test
public void shouldPunctuateOnStreamsTime() {
final MockPunctuator mockPunctuator = new MockPunctuator();
- testDriver = new TopologyTestDriver(
- setupSingleProcessorTopology(10L, PunctuationType.STREAM_TIME, mockPunctuator),
- config
- );
+ testDriver = new TopologyTestDriverBuilder(
+ setupSingleProcessorTopology(10L, PunctuationType.STREAM_TIME, mockPunctuator))
+ .withConfig(config)
+ .build();
final List expectedPunctuations = new LinkedList<>();
@@ -841,9 +873,11 @@ public void shouldPunctuateOnStreamsTime() {
@Test
public void shouldPunctuateOnWallClockTime() {
final MockPunctuator mockPunctuator = new MockPunctuator();
- testDriver = new TopologyTestDriver(
- setupSingleProcessorTopology(10L, PunctuationType.WALL_CLOCK_TIME, mockPunctuator),
- config, Instant.ofEpochMilli(0L));
+ testDriver = new TopologyTestDriverBuilder(
+ setupSingleProcessorTopology(10L, PunctuationType.WALL_CLOCK_TIME, mockPunctuator))
+ .withConfig(config)
+ .withInitialWallClockTime(Instant.ofEpochMilli(0L))
+ .build();
final List expectedPunctuations = new LinkedList<>();
@@ -890,7 +924,7 @@ public void shouldReturnAllStores() {
"globalProcessorName",
voidProcessorSupplier);
- testDriver = new TopologyTestDriver(topology, config);
+ testDriver = new TopologyTestDriverBuilder(topology).withConfig(config).build();
final Set expectedStoreNames = new HashSet<>();
expectedStoreNames.add("store");
@@ -946,7 +980,7 @@ private void shouldReturnCorrectStoreTypeOnly(final boolean persistent) {
globalVersionedKeyValueStoreName);
- testDriver = new TopologyTestDriver(topology, config);
+ testDriver = new TopologyTestDriverBuilder(topology).withConfig(config).build();
// verify state stores
assertNotNull(testDriver.getKeyValueStore(keyValueStoreName));
@@ -1129,7 +1163,7 @@ private void shouldThrowIfBuiltInStoreIsAccessedWithUntypedMethod(final boolean
globalVersionedKeyValueStoreName);
- testDriver = new TopologyTestDriver(topology, config);
+ testDriver = new TopologyTestDriverBuilder(topology).withConfig(config).build();
{
final IllegalArgumentException e = assertThrows(
@@ -1400,7 +1434,7 @@ public void shouldReturnAllStoresNames() {
"globalProcessorName",
voidProcessorSupplier);
- testDriver = new TopologyTestDriver(topology, config);
+ testDriver = new TopologyTestDriverBuilder(topology).withConfig(config).build();
final Set expectedStoreNames = new HashSet<>();
expectedStoreNames.add("store");
@@ -1425,7 +1459,7 @@ private void setup(final KeyValueBytesStoreSupplier storeSupplier) {
config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.LongSerde.class.getName());
- testDriver = new TopologyTestDriver(topology, config);
+ testDriver = new TopologyTestDriverBuilder(topology).withConfig(config).build();
store = testDriver.getKeyValueStore("aggStore");
store.put("a", 21L);
@@ -1511,7 +1545,7 @@ public void shouldNotResetRecordContextWhenAccessingStateStore() {
config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.LongSerde.class.getName());
- testDriver = new TopologyTestDriver(topology, config);
+ testDriver = new TopologyTestDriverBuilder(topology).withConfig(config).build();
final TestInputTopic input =
testDriver.createInputTopic("input-topic", new StringSerializer(), new LongSerializer());
@@ -1607,7 +1641,7 @@ public void shouldAllowPrePopulatingStatesStoresWithCachingEnabled() {
Serdes.Long()).withCachingEnabled(), // intentionally turn on caching to achieve better test coverage
"aggregator");
- testDriver = new TopologyTestDriver(topology, config);
+ testDriver = new TopologyTestDriverBuilder(topology).withConfig(config).build();
store = testDriver.getKeyValueStore("aggStore");
store.put("a", 21L);
@@ -1649,7 +1683,7 @@ public void process(final Record record) {
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.LongSerde.class.getName());
- try (final TopologyTestDriver testDriver = new TopologyTestDriver(topology, config)) {
+ try (final TopologyTestDriver testDriver = new TopologyTestDriverBuilder(topology).withConfig(config).build()) {
assertNull(testDriver.getKeyValueStore("storeProcessorStore").get("a"));
testDriver.pipeRecord("input-topic", new TestRecord<>("a", 1L),
new StringSerializer(), new LongSerializer(), Instant.now());
@@ -1657,7 +1691,7 @@ public void process(final Record record) {
}
- try (final TopologyTestDriver testDriver = new TopologyTestDriver(topology, config)) {
+ try (final TopologyTestDriver testDriver = new TopologyTestDriverBuilder(topology).withConfig(config).build()) {
assertNull(testDriver.getKeyValueStore("storeProcessorStore").get("a"),
"Closing the prior test driver should have cleaned up this store and value.");
}
@@ -1670,7 +1704,7 @@ public void shouldFeedStoreFromGlobalKTable() {
builder.globalTable("topic",
Consumed.with(Serdes.String(), Serdes.String()),
Materialized.as("globalStore"));
- try (final TopologyTestDriver testDriver = new TopologyTestDriver(builder.build(), config)) {
+ try (final TopologyTestDriver testDriver = new TopologyTestDriverBuilder(builder.build()).withConfig(config).build()) {
final KeyValueStore globalStore = testDriver.getKeyValueStore("globalStore");
assertNotNull(globalStore);
assertNotNull(testDriver.getAllStateStores().get("globalStore"));
@@ -1710,7 +1744,7 @@ public void shouldProcessFromSourcesThatMatchMultiplePattern() {
final TestRecord consumerRecord2 = new TestRecord<>(key2, value2, null, timestamp2);
- testDriver = new TopologyTestDriver(setupMultipleSourcesPatternTopology(pattern2Source1, pattern2Source2), config);
+ testDriver = new TopologyTestDriverBuilder(setupMultipleSourcesPatternTopology(pattern2Source1, pattern2Source2)).withConfig(config).build();
final List processedRecords1 = mockProcessors.get(0).processedRecords;
final List processedRecords2 = mockProcessors.get(1).processedRecords;
@@ -1744,7 +1778,7 @@ public void shouldProcessFromSourceThatMatchPattern() {
topology.addSource(sourceName, pattern2Source1);
topology.addSink("sink", SINK_TOPIC_1, sourceName);
- testDriver = new TopologyTestDriver(topology, config);
+ testDriver = new TopologyTestDriverBuilder(topology).withConfig(config).build();
pipeRecord(SOURCE_TOPIC_1, testRecord1);
final ProducerRecord outputRecord = testDriver.readRecord(SINK_TOPIC_1);
@@ -1763,7 +1797,7 @@ public void shouldThrowPatternNotValidForTopicNameException() {
topology.addSource(sourceName, pattern2Source1);
topology.addSink("sink", SINK_TOPIC_1, sourceName);
- testDriver = new TopologyTestDriver(topology, config);
+ testDriver = new TopologyTestDriverBuilder(topology).withConfig(config).build();
try {
pipeRecord(SOURCE_TOPIC_1, testRecord1);
} catch (final TopologyException exception) {
@@ -1826,7 +1860,7 @@ public void process(final Record record) {
topology.addSink("recursiveSink", "input", new StringSerializer(), new StringSerializer(), "recursiveProcessor");
topology.addSink("sink", "output", new StringSerializer(), new StringSerializer(), "recursiveProcessor");
- try (final TopologyTestDriver topologyTestDriver = new TopologyTestDriver(topology)) {
+ try (final TopologyTestDriver topologyTestDriver = new TopologyTestDriverBuilder(topology).build()) {
final TestInputTopic in = topologyTestDriver.createInputTopic("input", new StringSerializer(), new StringSerializer());
final TestOutputTopic out = topologyTestDriver.createOutputTopic("output", new StringDeserializer(), new StringDeserializer());
@@ -1897,7 +1931,7 @@ public void process(final Record record) {
topology.addSink("sink", "output", new StringSerializer(), new StringSerializer(), "recursiveProcessor");
topology.addSink("globalSink", "global-topic", new StringSerializer(), new StringSerializer(), "recursiveProcessor");
- try (final TopologyTestDriver topologyTestDriver = new TopologyTestDriver(topology)) {
+ try (final TopologyTestDriver topologyTestDriver = new TopologyTestDriverBuilder(topology).build()) {
final TestInputTopic in = topologyTestDriver.createInputTopic("input", new StringSerializer(), new StringSerializer());
final TestOutputTopic globalTopic = topologyTestDriver.createOutputTopic("global-topic", new StringDeserializer(), new StringDeserializer());
@@ -1932,7 +1966,7 @@ public void shouldRespectTaskIdling() {
topology.addSource("source2", new StringDeserializer(), new StringDeserializer(), "input2");
topology.addSink("sink", "output", new StringSerializer(), new StringSerializer(), "source1", "source2");
- try (final TopologyTestDriver topologyTestDriver = new TopologyTestDriver(topology, properties)) {
+ try (final TopologyTestDriver topologyTestDriver = new TopologyTestDriverBuilder(topology).withConfig(properties).build()) {
final TestInputTopic in1 = topologyTestDriver.createInputTopic("input1", new StringSerializer(), new StringSerializer());
final TestInputTopic in2 = topologyTestDriver.createInputTopic("input2", new StringSerializer(), new StringSerializer());
final TestOutputTopic out = topologyTestDriver.createOutputTopic("output", new StringDeserializer(), new StringDeserializer());