Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,9 @@
* props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
* props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
* Topology topology = ...
* TopologyTestDriver driver = new TopologyTestDriver(topology, props);
* TopologyTestDriver driver = new TopologyTestDriverBuilder(topology)
* .withConfig(props)
* .build();
* }</pre>
*
* <p> Note that the {@code TopologyTestDriver} processes input records synchronously.
Expand Down Expand Up @@ -265,45 +267,53 @@ public void onRestoreEnd(final TopicPartition topicPartition, final String store
};

/**
* Create a new test diver instance.
* Create a new test driver instance.
* Default test properties are used to initialize the driver instance
*
* @param topology the topology to be tested
* @deprecated Since 4.4. Use {@link TopologyTestDriverBuilder} instead.
*/
@Deprecated(since = "4.4")
public TopologyTestDriver(final Topology topology) {
this(topology, new Properties());
}

/**
* Create a new test diver instance.
* Create a new test driver instance.
* Initialized the internally mocked wall-clock time with {@link System#currentTimeMillis() current system time}.
*
* @param topology the topology to be tested
* @param config the configuration for the topology
* @deprecated Since 4.4. Use {@link TopologyTestDriverBuilder} instead.
*/
@Deprecated(since = "4.4")
public TopologyTestDriver(final Topology topology,
final Properties config) {
this(topology, config, null);
}

/**
* Create a new test diver instance.
* Create a new test driver instance.
*
* @param topology the topology to be tested
* @param initialWallClockTimeMs the initial value of internally mocked wall-clock time
* @deprecated Since 4.4. Use {@link TopologyTestDriverBuilder} instead.
*/
@Deprecated(since = "4.4")
public TopologyTestDriver(final Topology topology,
final Instant initialWallClockTimeMs) {
this(topology, new Properties(), initialWallClockTimeMs);
}

/**
* Create a new test diver instance.
* Create a new test driver instance.
*
* @param topology the topology to be tested
* @param config the configuration for the topology
* @param initialWallClockTime the initial value of internally mocked wall-clock time
* @deprecated Since 4.4. Use {@link TopologyTestDriverBuilder} instead.
*/
@Deprecated(since = "4.4")
public TopologyTestDriver(final Topology topology,
final Properties config,
final Instant initialWallClockTime) {
Expand All @@ -314,15 +324,16 @@ public TopologyTestDriver(final Topology topology,
}

/**
* Create a new test diver instance.
* Create a new test driver instance. Package-private core constructor shared by the (deprecated)
* public constructors and by {@link TopologyTestDriverBuilder}, which is the blessed entry point.
*
* @param builder builder for the topology to be tested
* @param config the configuration for the topology
* @param initialWallClockTimeMs the initial value of internally mocked wall-clock time
*/
private TopologyTestDriver(final InternalTopologyBuilder builder,
final Properties config,
final long initialWallClockTimeMs) {
TopologyTestDriver(final InternalTopologyBuilder builder,
final Properties config,
final long initialWallClockTimeMs) {
final Properties configCopy = new Properties();
configCopy.putAll(config);
configCopy.putIfAbsent(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy-bootstrap-host:0");
Expand Down Expand Up @@ -450,7 +461,7 @@ private void setupGlobalTask(final Time mockWallClockTime,

@SuppressWarnings("deprecation")
final boolean globalEnabled = streamsConfig.getBoolean(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_GLOBAL_ENABLED_CONFIG);
final ProcessingExceptionHandler processingExceptionHandler =
final ProcessingExceptionHandler processingExceptionHandler =
globalEnabled ? streamsConfig.processingExceptionHandler() : null;

globalStateTask = new GlobalStateUpdateTask(
Expand Down Expand Up @@ -837,7 +848,8 @@ <K, V> TestRecord<K, V> readRecord(final String topic,
}
final K key = keyDeserializer.deserialize(record.topic(), record.headers(), record.key());
final V value = valueDeserializer.deserialize(record.topic(), record.headers(), record.value());
return new TestRecord<>(key, value, record.headers(), record.timestamp());
final int outputPartition = -1;

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For single-partition mode, should we set 1 ?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thoughts?

@sebastienviale sebastienviale Jun 27, 2026

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, in the KIP it is specified that single-partition mode will carry partition 0.

In that case, all existing tests that compare a TestRecord via equals()/equalTo() against an expected record built from a partition-less constructor will fail (e.g. the test below), since the expected record defaults to partition -1 while the actual output record would now carry 0.

For example:

@Test
public void testStartTimestamp() {
final Duration advance = Duration.ofSeconds(2);
final TestInputTopic<Long, String> inputTopic =
testDriver.createInputTopic(INPUT_TOPIC, longSerde.serializer(), stringSerde.serializer(), testBaseTime, Duration.ZERO);
final TestOutputTopic<Long, String> outputTopic =
testDriver.createOutputTopic(OUTPUT_TOPIC, longSerde.deserializer(), stringSerde.deserializer());
inputTopic.pipeInput(1L, "Hello");
assertThat(outputTopic.readRecord(), is(equalTo(new TestRecord<>(1L, "Hello", testBaseTime))));
inputTopic.pipeInput(2L, "World");
assertThat(outputTopic.readRecord(), is(equalTo(new TestRecord<>(2L, "World", null, testBaseTime.toEpochMilli()))));
inputTopic.advanceTime(advance);
inputTopic.pipeInput(3L, "Kafka");
assertThat(outputTopic.readRecord(), is(equalTo(new TestRecord<>(3L, "Kafka", testBaseTime.plus(advance)))));
}

If we change it to 0 now, we'll need to update all the affected tests by using equalsIgnorePartition(). We can do that now in this PR, or in a follow-up PR.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, yes, I did mean 0 of course...

But your point about breaking existing tests is crucial. In the end, it does not only affect our tests in kafka.git but all existing user tests, too, and we should not break them. So I think we better keep -1 and update the KIP accordingly, and explain why -1 is necessary for single-partition mode to preserve backward compatiblity?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I updated the KIP in the TestRecord class changes and Compatibility, Deprecation, and Migration Plan sections

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great. Thanks.

Can you send a shote note to the VOTE thread for the KIP, calling out this small change. It's best practice to call out small changes (no need to re-vote of course for such a small change).

return new TestRecord<>(key, value, record.headers(), Instant.ofEpochMilli(record.timestamp()), outputPartition);
Comment thread
mjsax marked this conversation as resolved.
}

<K, V> void pipeRecord(final String topic,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams;

import java.time.Instant;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;

/**
* Fluent builder for a {@link TopologyTestDriver}.
*
* <p>This is the entry point for constructing a {@link TopologyTestDriver}.
* Configure the builder and call {@link #build()}.
* The {@link TopologyTestDriver} constructors remain functional but are deprecated in favor of
* this builder.</p>
*
* <pre>{@code
* TopologyTestDriver driver = new TopologyTestDriverBuilder(topology)
* .withConfig(props)
* .withInitialWallClockTime(Instant.ofEpochMilli(0))
* .build();
* }</pre>
*/
public class TopologyTestDriverBuilder {
Comment thread
mjsax marked this conversation as resolved.

private final Topology topology;
private Properties config = new Properties();
Comment thread
mjsax marked this conversation as resolved.
private Optional<Instant> initialWallClockTime = Optional.empty();

/**
* Start building a driver for the given topology.
*
* @param topology the topology to be tested
*/
public TopologyTestDriverBuilder(final Topology topology) {
this.topology = Objects.requireNonNull(topology, "topology cannot be null");
}

/**
* Set the configuration passed to the driver. Optional; defaults to empty {@link Properties}.
*
* @param config the configuration for the topology
* @return this builder
*/
public TopologyTestDriverBuilder withConfig(final Properties config) {
this.config = Objects.requireNonNull(config, "config cannot be null");
return this;
}

/**
* Set the initial value of the driver's internally mocked wall-clock time. Optional; defaults to
* the current system time.
*
* @param initialWallClockTime the initial mocked wall-clock time
* @return this builder
*/
public TopologyTestDriverBuilder withInitialWallClockTime(final Instant initialWallClockTime) {
this.initialWallClockTime = Optional.ofNullable(initialWallClockTime);
return this;
}

/**
* Build the driver: construct it and apply all declared topic partition counts.
*
* @return a ready-to-use {@link TopologyTestDriver}
*/
public TopologyTestDriver build() {
return new TopologyTestDriver(
topology.internalTopologyBuilder,
config,
initialWallClockTime.map(Instant::toEpochMilli).orElseGet(System::currentTimeMillis));
}
}
Loading
Loading