diff --git a/docs/streams/developer-guide/testing.md b/docs/streams/developer-guide/testing.md index 27b7df4d10521..3264b8195065c 100644 --- a/docs/streams/developer-guide/testing.md +++ b/docs/streams/developer-guide/testing.md @@ -72,7 +72,7 @@ To verify the output, you can use `TestOutputTopic` where you configure the topi TestOutputTopic outputTopic = testDriver.createOutputTopic("output-topic", stringSerde.deserializer(), longSerde.deserializer()); - assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("key", 42L))); + assertEquals(KeyValue.pair("a", 42L), outputTopic.readKeyValue()); `TopologyTestDriver` supports punctuations, too. Event-time punctuations are triggered automatically based on the processed records' timestamps. Wall-clock-time punctuations can also be triggered by advancing the test driver's wall-clock-time (the driver mocks wall-clock-time internally to give users control over it). @@ -98,39 +98,41 @@ The following example demonstrates how to use the test driver and helper classes private TestInputTopic inputTopic; private TestOutputTopic outputTopic; private KeyValueStore store; - - private Serde stringSerde = new Serdes.StringSerde(); - private Serde longSerde = new Serdes.LongSerde(); - - @Before + + private final Serde stringSerde = Serdes.String(); + private final Serde longSerde = Serdes.Long(); + + @BeforeEach public void setup() { - Topology topology = new Topology(); - topology.addSource("sourceProcessor", "input-topic"); - topology.addProcessor("aggregator", new CustomMaxAggregatorSupplier(), "sourceProcessor"); - topology.addStateStore( - Stores.keyValueStoreBuilder( - Stores.inMemoryKeyValueStore("aggStore"), - Serdes.String(), - Serdes.Long()).withLoggingDisabled(), // need to disable logging to allow store pre-populating - "aggregator"); - topology.addSink("sinkProcessor", "result-topic", "aggregator"); - + + var topology = new Topology() + .addSource("sourceProcessor", "input-topic") + .addProcessor("aggregator", new CustomMaxAggregatorSupplier(), "sourceProcessor") + .addStateStore( + Stores.keyValueStoreBuilder( + Stores.persistentKeyValueStore("aggStore"), + stringSerde, + longSerde + ), + "aggregator") + .addSink("sinkProcessor", "result-topic", "aggregator"); + // setup test driver - Properties props = new Properties(); - props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); - props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName()); + var props = new Properties(); + props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, stringSerde.getClass().getName()); + props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, longSerde.getClass().getName()); testDriver = new TopologyTestDriver(topology, props); - + // setup test topics inputTopic = testDriver.createInputTopic("input-topic", stringSerde.serializer(), longSerde.serializer()); outputTopic = testDriver.createOutputTopic("result-topic", stringSerde.deserializer(), longSerde.deserializer()); - + // pre-populate store store = testDriver.getKeyValueStore("aggStore"); store.put("a", 21L); } - - @After + + @AfterEach public void tearDown() { testDriver.close(); } @@ -138,94 +140,92 @@ The following example demonstrates how to use the test driver and helper classes @Test public void shouldFlushStoreForFirstInput() { inputTopic.pipeInput("a", 1L); - assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("a", 21L))); - assertThat(outputTopic.isEmpty(), is(true)); + assertEquals(KeyValue.pair("a", 21L), outputTopic.readKeyValue()); + assertTrue(outputTopic.isEmpty()); } - + @Test public void shouldNotUpdateStoreForSmallerValue() { inputTopic.pipeInput("a", 1L); - assertThat(store.get("a"), equalTo(21L)); - assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("a", 21L))); - assertThat(outputTopic.isEmpty(), is(true)); + assertEquals(21L, store.get("a")); + assertEquals(KeyValue.pair("a", 21L), outputTopic.readKeyValue()); + assertTrue(outputTopic.isEmpty()); } - + @Test - public void shouldNotUpdateStoreForLargerValue() { + public void shouldUpdateStoreForLargerValue() { inputTopic.pipeInput("a", 42L); - assertThat(store.get("a"), equalTo(42L)); - assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("a", 42L))); - assertThat(outputTopic.isEmpty(), is(true)); + assertEquals(42L, store.get("a")); + assertEquals(KeyValue.pair("a", 42L), outputTopic.readKeyValue()); + assertTrue(outputTopic.isEmpty()); } - + @Test public void shouldUpdateStoreForNewKey() { inputTopic.pipeInput("b", 21L); - assertThat(store.get("b"), equalTo(21L)); - assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("a", 21L))); - assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("b", 21L))); - assertThat(outputTopic.isEmpty(), is(true)); + assertEquals(21L, store.get("b")); + assertEquals(KeyValue.pair("a", 21L), outputTopic.readKeyValue()); + assertEquals(KeyValue.pair("b", 21L), outputTopic.readKeyValue()); + assertTrue(outputTopic.isEmpty()); } - + @Test - public void shouldPunctuateIfEvenTimeAdvances() { - final Instant recordTime = Instant.now(); - inputTopic.pipeInput("a", 1L, recordTime); - assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("a", 21L))); - - inputTopic.pipeInput("a", 1L, recordTime); - assertThat(outputTopic.isEmpty(), is(true)); - - inputTopic.pipeInput("a", 1L, recordTime.plusSeconds(10L)); - assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("a", 21L))); - assertThat(outputTopic.isEmpty(), is(true)); + public void shouldPunctuateIfStreamTimeAdvances() { + var recordTime = Instant.now(); + inputTopic.pipeInput("a", 1L, recordTime); + assertEquals(KeyValue.pair("a", 21L), outputTopic.readKeyValue()); + + inputTopic.pipeInput("a", 1L, recordTime); + assertTrue(outputTopic.isEmpty()); + + inputTopic.pipeInput("a", 1L, recordTime.plusSeconds(10)); + assertEquals(KeyValue.pair("a", 21L), outputTopic.readKeyValue()); + assertTrue(outputTopic.isEmpty()); } - + @Test public void shouldPunctuateIfWallClockTimeAdvances() { testDriver.advanceWallClockTime(Duration.ofSeconds(60)); - assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("a", 21L))); - assertThat(outputTopic.isEmpty(), is(true)); + assertEquals(KeyValue.pair("a", 21L), outputTopic.readKeyValue()); + assertTrue(outputTopic.isEmpty()); } - public class CustomMaxAggregatorSupplier implements ProcessorSupplier { + static class CustomMaxAggregatorSupplier implements ProcessorSupplier { @Override - public Processor get() { + public Processor get() { return new CustomMaxAggregator(); } } - - public class CustomMaxAggregator implements Processor { - ProcessorContext context; + + static class CustomMaxAggregator extends ContextualProcessor { + private KeyValueStore store; - - @SuppressWarnings("unchecked") + @Override - public void init(ProcessorContext context) { - this.context = context; - context.schedule(Duration.ofSeconds(60), PunctuationType.WALL_CLOCK_TIME, time -> flushStore()); - context.schedule(Duration.ofSeconds(10), PunctuationType.STREAM_TIME, time -> flushStore()); - store = (KeyValueStore) context.getStateStore("aggStore"); + public void init(ProcessorContext context) { + super.init(context); + context.schedule(Duration.ofSeconds(60), WALL_CLOCK_TIME, this::flushStore); + context.schedule(Duration.ofSeconds(10), STREAM_TIME, this::flushStore); + store = context.getStateStore("aggStore"); } - + @Override - public void process(String key, Long value) { - Long oldValue = store.get(key); - if (oldValue == null || value > oldValue) { - store.put(key, value); + public void process(Record record) { + var oldValue = store.get(record.key()); + if (oldValue == null || record.value() > oldValue) { + store.put(record.key(), record.value()); } } - - private void flushStore() { - KeyValueIterator it = store.all(); - while (it.hasNext()) { - KeyValue next = it.next(); - context.forward(next.key, next.value); + + private void flushStore(long timestamp) { + try (var it = store.all()) { + while (it.hasNext()) { + var next = it.next(); + context().forward(new Record<>(next.key, next.value, timestamp)); + } } } - - @Override - public void close() {} + } # Unit Testing Processors @@ -260,13 +260,13 @@ The mock will capture any values that your processor forwards. You can make asse processorUnderTest.process("key", "value"); final Iterator> forwarded = context.forwarded().iterator(); - assertEquals(forwarded.next().record(), new Record<>(..., ...)); + assertEquals(new Record<>(..., ...), forwarded.next().record()); assertFalse(forwarded.hasNext()); // you can reset forwards to clear the captured data. This may be helpful in constructing longer scenarios. context.resetForwards(); - assertEquals(context.forwarded().size(), 0); + assertEquals(0, context.forwarded().size()); If your processor forwards to specific child processors, you can query the context for captured data by child name: @@ -296,17 +296,17 @@ Once these are set, the context will continue returning the same values, until y In case your punctuator is stateful, the mock context allows you to register state stores. You're encouraged to use a simple in-memory store of the appropriate type (KeyValue, Windowed, or Session), since the mock context does _not_ manage changelogs, state directories, etc. - - final KeyValueStore store = - Stores.keyValueStoreBuilder( - Stores.inMemoryKeyValueStore("myStore"), - Serdes.String(), - Serdes.Integer() - ) + final KeyValueStore store = Stores + .keyValueStoreBuilder( + Stores.inMemoryKeyValueStore("myStore"), + Serdes.String(), + Serdes.Integer()) .withLoggingDisabled() // Changelog is not supported by MockProcessorContext. .build(); - store.init(context, store); - context.register(store, /*deprecated parameter*/ false, /*parameter unused in mock*/ null); + + context = new MockProcessorContext<>(); + store.init(context.getStateStoreContext(), store); + context.addStateStore(store); **Verifying punctuators** @@ -314,10 +314,11 @@ Processors can schedule punctuators to handle periodic tasks. The mock context d final MockProcessorContext.CapturedPunctuator capturedPunctuator = context.scheduledPunctuators().get(0); - final long interval = capturedPunctuator.getIntervalMs(); + final Duration interval = capturedPunctuator.getInterval(); final PunctuationType type = capturedPunctuator.getType(); final boolean cancelled = capturedPunctuator.cancelled(); final Punctuator punctuator = capturedPunctuator.getPunctuator(); + punctuator.punctuate(/*timestamp*/ 0L); If you need to write tests involving automatic firing of scheduled punctuators, we recommend creating a simple topology with your processor and using the [`TopologyTestDriver`](testing.html#testing-topologytestdriver).