Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
187 changes: 94 additions & 93 deletions docs/streams/developer-guide/testing.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ To verify the output, you can use `TestOutputTopic` where you configure the topi


TestOutputTopic<String, Long> outputTopic = testDriver.createOutputTopic("output-topic", stringSerde.deserializer(), longSerde.deserializer());
assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("key", 42L)));
assertEquals(KeyValue.pair("a", 42L), outputTopic.readKeyValue());

`TopologyTestDriver` supports punctuations, too. Event-time punctuations are triggered automatically based on the processed records' timestamps. Wall-clock-time punctuations can also be triggered by advancing the test driver's wall-clock-time (the driver mocks wall-clock-time internally to give users control over it).

Expand All @@ -98,134 +98,134 @@ The following example demonstrates how to use the test driver and helper classes
private TestInputTopic<String, Long> inputTopic;
private TestOutputTopic<String, Long> outputTopic;
private KeyValueStore<String, Long> store;
private Serde<String> stringSerde = new Serdes.StringSerde();
private Serde<Long> longSerde = new Serdes.LongSerde();
@Before

private final Serde<String> stringSerde = Serdes.String();
private final Serde<Long> longSerde = Serdes.Long();

@BeforeEach
public void setup() {
Topology topology = new Topology();
topology.addSource("sourceProcessor", "input-topic");
topology.addProcessor("aggregator", new CustomMaxAggregatorSupplier(), "sourceProcessor");
topology.addStateStore(
Stores.keyValueStoreBuilder(
Stores.inMemoryKeyValueStore("aggStore"),
Serdes.String(),
Serdes.Long()).withLoggingDisabled(), // need to disable logging to allow store pre-populating
"aggregator");
topology.addSink("sinkProcessor", "result-topic", "aggregator");


var topology = new Topology()
.addSource("sourceProcessor", "input-topic")
.addProcessor("aggregator", new CustomMaxAggregatorSupplier(), "sourceProcessor")
.addStateStore(
Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("aggStore"),
stringSerde,
longSerde
),
"aggregator")
.addSink("sinkProcessor", "result-topic", "aggregator");

// setup test driver
Properties props = new Properties();
props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName());
var props = new Properties();
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, stringSerde.getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, longSerde.getClass().getName());
testDriver = new TopologyTestDriver(topology, props);

// setup test topics
inputTopic = testDriver.createInputTopic("input-topic", stringSerde.serializer(), longSerde.serializer());
outputTopic = testDriver.createOutputTopic("result-topic", stringSerde.deserializer(), longSerde.deserializer());

// pre-populate store
store = testDriver.getKeyValueStore("aggStore");
store.put("a", 21L);
}
@After

@AfterEach
public void tearDown() {
testDriver.close();
}

@Test
public void shouldFlushStoreForFirstInput() {
inputTopic.pipeInput("a", 1L);
assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("a", 21L)));
assertThat(outputTopic.isEmpty(), is(true));
assertEquals(KeyValue.pair("a", 21L), outputTopic.readKeyValue());
assertTrue(outputTopic.isEmpty());
}

@Test
public void shouldNotUpdateStoreForSmallerValue() {
inputTopic.pipeInput("a", 1L);
assertThat(store.get("a"), equalTo(21L));
assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("a", 21L)));
assertThat(outputTopic.isEmpty(), is(true));
assertEquals(21L, store.get("a"));
assertEquals(KeyValue.pair("a", 21L), outputTopic.readKeyValue());
assertTrue(outputTopic.isEmpty());
}

@Test
public void shouldNotUpdateStoreForLargerValue() {
public void shouldUpdateStoreForLargerValue() {
inputTopic.pipeInput("a", 42L);
assertThat(store.get("a"), equalTo(42L));
assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("a", 42L)));
assertThat(outputTopic.isEmpty(), is(true));
assertEquals(42L, store.get("a"));
assertEquals(KeyValue.pair("a", 42L), outputTopic.readKeyValue());
assertTrue(outputTopic.isEmpty());
}

@Test
public void shouldUpdateStoreForNewKey() {
inputTopic.pipeInput("b", 21L);
assertThat(store.get("b"), equalTo(21L));
assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("a", 21L)));
assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("b", 21L)));
assertThat(outputTopic.isEmpty(), is(true));
assertEquals(21L, store.get("b"));
assertEquals(KeyValue.pair("a", 21L), outputTopic.readKeyValue());
assertEquals(KeyValue.pair("b", 21L), outputTopic.readKeyValue());
assertTrue(outputTopic.isEmpty());
}

@Test
public void shouldPunctuateIfEvenTimeAdvances() {
final Instant recordTime = Instant.now();
inputTopic.pipeInput("a", 1L, recordTime);
assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("a", 21L)));
inputTopic.pipeInput("a", 1L, recordTime);
assertThat(outputTopic.isEmpty(), is(true));
inputTopic.pipeInput("a", 1L, recordTime.plusSeconds(10L));
assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("a", 21L)));
assertThat(outputTopic.isEmpty(), is(true));
public void shouldPunctuateIfStreamTimeAdvances() {
var recordTime = Instant.now();
inputTopic.pipeInput("a", 1L, recordTime);
assertEquals(KeyValue.pair("a", 21L), outputTopic.readKeyValue());

inputTopic.pipeInput("a", 1L, recordTime);
assertTrue(outputTopic.isEmpty());

inputTopic.pipeInput("a", 1L, recordTime.plusSeconds(10));
assertEquals(KeyValue.pair("a", 21L), outputTopic.readKeyValue());
assertTrue(outputTopic.isEmpty());
}

@Test
public void shouldPunctuateIfWallClockTimeAdvances() {
testDriver.advanceWallClockTime(Duration.ofSeconds(60));
assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("a", 21L)));
assertThat(outputTopic.isEmpty(), is(true));
assertEquals(KeyValue.pair("a", 21L), outputTopic.readKeyValue());
assertTrue(outputTopic.isEmpty());
}

public class CustomMaxAggregatorSupplier implements ProcessorSupplier<String, Long> {
static class CustomMaxAggregatorSupplier implements ProcessorSupplier<String, Long, String, Long> {
@Override
public Processor<String, Long> get() {
public Processor<String, Long, String, Long> get() {
return new CustomMaxAggregator();
}
}
public class CustomMaxAggregator implements Processor<String, Long> {
ProcessorContext context;

static class CustomMaxAggregator extends ContextualProcessor<String, Long, String, Long> {

private KeyValueStore<String, Long> store;

@SuppressWarnings("unchecked")

@Override
public void init(ProcessorContext context) {
this.context = context;
context.schedule(Duration.ofSeconds(60), PunctuationType.WALL_CLOCK_TIME, time -> flushStore());
context.schedule(Duration.ofSeconds(10), PunctuationType.STREAM_TIME, time -> flushStore());
store = (KeyValueStore<String, Long>) context.getStateStore("aggStore");
public void init(ProcessorContext<String, Long> context) {
super.init(context);
context.schedule(Duration.ofSeconds(60), WALL_CLOCK_TIME, this::flushStore);
context.schedule(Duration.ofSeconds(10), STREAM_TIME, this::flushStore);
store = context.getStateStore("aggStore");
}

@Override
public void process(String key, Long value) {
Long oldValue = store.get(key);
if (oldValue == null || value > oldValue) {
store.put(key, value);
public void process(Record<String, Long> record) {
var oldValue = store.get(record.key());
if (oldValue == null || record.value() > oldValue) {
store.put(record.key(), record.value());
}
}

private void flushStore() {
KeyValueIterator<String, Long> it = store.all();
while (it.hasNext()) {
KeyValue<String, Long> next = it.next();
context.forward(next.key, next.value);

private void flushStore(long timestamp) {
try (var it = store.all()) {
while (it.hasNext()) {
var next = it.next();
context().forward(new Record<>(next.key, next.value, timestamp));
}
}
}

@Override
public void close() {}

}

# Unit Testing Processors
Expand Down Expand Up @@ -260,13 +260,13 @@ The mock will capture any values that your processor forwards. You can make asse
processorUnderTest.process("key", "value");

final Iterator<CapturedForward<? extends String, ? extends Long>> forwarded = context.forwarded().iterator();
assertEquals(forwarded.next().record(), new Record<>(..., ...));
assertEquals(new Record<>(..., ...), forwarded.next().record());
assertFalse(forwarded.hasNext());

// you can reset forwards to clear the captured data. This may be helpful in constructing longer scenarios.
context.resetForwards();

assertEquals(context.forwarded().size(), 0);
assertEquals(0, context.forwarded().size());

If your processor forwards to specific child processors, you can query the context for captured data by child name:

Expand Down Expand Up @@ -296,28 +296,29 @@ Once these are set, the context will continue returning the same values, until y

In case your punctuator is stateful, the mock context allows you to register state stores. You're encouraged to use a simple in-memory store of the appropriate type (KeyValue, Windowed, or Session), since the mock context does _not_ manage changelogs, state directories, etc.


final KeyValueStore<String, Integer> store =
Stores.keyValueStoreBuilder(
Stores.inMemoryKeyValueStore("myStore"),
Serdes.String(),
Serdes.Integer()
)
final KeyValueStore<String, Integer> store = Stores
.keyValueStoreBuilder(
Stores.inMemoryKeyValueStore("myStore"),
Serdes.String(),
Serdes.Integer())
.withLoggingDisabled() // Changelog is not supported by MockProcessorContext.
.build();
store.init(context, store);
context.register(store, /*deprecated parameter*/ false, /*parameter unused in mock*/ null);

context = new MockProcessorContext<>();
store.init(context.getStateStoreContext(), store);
context.addStateStore(store);

**Verifying punctuators**

Processors can schedule punctuators to handle periodic tasks. The mock context does _not_ automatically execute punctuators, but it does capture them to allow you to unit test them as well:


final MockProcessorContext.CapturedPunctuator capturedPunctuator = context.scheduledPunctuators().get(0);
final long interval = capturedPunctuator.getIntervalMs();
final Duration interval = capturedPunctuator.getInterval();
final PunctuationType type = capturedPunctuator.getType();
final boolean cancelled = capturedPunctuator.cancelled();
final Punctuator punctuator = capturedPunctuator.getPunctuator();

punctuator.punctuate(/*timestamp*/ 0L);

If you need to write tests involving automatic firing of scheduled punctuators, we recommend creating a simple topology with your processor and using the [`TopologyTestDriver`](testing.html#testing-topologytestdriver).
Expand Down