diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/ConsistencyVectorIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/ConsistencyVectorIntegrationTest.java index 7b1c02e5d22ce..c3c2045fb960b 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/ConsistencyVectorIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/ConsistencyVectorIntegrationTest.java @@ -43,9 +43,10 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; -import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import java.io.IOException; import java.time.Duration; @@ -93,8 +94,10 @@ public void after() { cluster.stop(); } - @Test - public void shouldHaveSamePositionBoundActiveAndStandBy(final TestInfo testInfo) throws Exception { + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void shouldHaveSamePositionBoundActiveAndStandBy(final boolean transactionalStateStores, + final TestInfo testInfo) throws Exception { final Semaphore semaphore = new Semaphore(0); final StreamsBuilder builder = new StreamsBuilder(); @@ -107,8 +110,8 @@ public void shouldHaveSamePositionBoundActiveAndStandBy(final TestInfo testInfo) .peek((k, v) -> semaphore.release()); final String safeTestName = safeUniqueTestName(testInfo); - final KafkaStreams kafkaStreams1 = createKafkaStreams(builder, streamsConfiguration(safeTestName)); - final KafkaStreams kafkaStreams2 = createKafkaStreams(builder, streamsConfiguration(safeTestName)); + final KafkaStreams kafkaStreams1 = createKafkaStreams(builder, streamsConfiguration(safeTestName, transactionalStateStores)); + final KafkaStreams kafkaStreams2 = createKafkaStreams(builder, streamsConfiguration(safeTestName, transactionalStateStores)); final List kafkaStreamsList = Arrays.asList(kafkaStreams1, kafkaStreams2); try { @@ -200,7 +203,7 @@ private void produceValueRange() { ); } - private Properties streamsConfiguration(final String safeTestName) { + private Properties streamsConfiguration(final String safeTestName, final boolean transactionalStateStores) { final Properties config = new Properties(); config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName); config.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:" + (++port)); @@ -214,6 +217,10 @@ private Properties streamsConfiguration(final String safeTestName) { config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000); config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L); config.put(InternalConfig.IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED, true); + if (transactionalStateStores) { + config.put(StreamsConfig.TRANSACTIONAL_STATE_STORES_CONFIG, true); + config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2); + } return config; } } diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java index a6cb68f5283f6..f39a937bd8e68 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java @@ -179,6 +179,21 @@ private static java.util.stream.Stream groupProtocolAndProcessingThre ); } + // Adds a sparse "transactional state stores" (KIP-892) dimension on top of the group-protocol/ + // processing-threads matrix. To avoid exploding the full matrix by 2, the transactional=true case + // is only exercised for a single, representative combination (classic protocol, processing threads + // disabled) while preserving all existing (transactional=false) coverage. Transactional state stores + // are an exactly-once-only feature; this test already runs under EXACTLY_ONCE_V2. + private static java.util.stream.Stream groupProtocolProcessingThreadsAndTransactionalParameters() { + return java.util.stream.Stream.of( + Arguments.of("classic", true, false), + Arguments.of("classic", false, false), + Arguments.of("streams", true, false), + Arguments.of("streams", false, false), + Arguments.of("classic", false, true) + ); + } + @BeforeEach public void createTopics() throws Exception { applicationId = "appId-" + TEST_NUMBER.getAndIncrement(); @@ -498,8 +513,10 @@ public void shouldNotViolateEosIfOneTaskFails(final String groupProtocol, final } @ParameterizedTest - @MethodSource("groupProtocolAndProcessingThreadsParameters") - public void shouldNotViolateEosIfOneTaskFailsWithState(final String groupProtocol, final boolean processingThreadsEnabled) throws Exception { + @MethodSource("groupProtocolProcessingThreadsAndTransactionalParameters") + public void shouldNotViolateEosIfOneTaskFailsWithState(final String groupProtocol, + final boolean processingThreadsEnabled, + final boolean transactionalStateStores) throws Exception { // this test updates a store with 10 + 5 + 5 records per partition (running with 2 partitions) // the app is supposed to emit all 40 update records into the output topic @@ -515,7 +532,7 @@ public void shouldNotViolateEosIfOneTaskFailsWithState(final String groupProtoco // We need more processing time under "with state" situation, so increasing the max.poll.interval.ms // to avoid unexpected rebalance during test, which will cause unexpected fail over triggered - try (final KafkaStreams streams = getKafkaStreams("dummy", true, "appDir", 2, groupProtocol, processingThreadsEnabled)) { + try (final KafkaStreams streams = getKafkaStreams("dummy", true, "appDir", 2, groupProtocol, processingThreadsEnabled, transactionalStateStores)) { startApplicationAndWaitUntilRunning(streams); final List> committedDataBeforeFailure = prepareData(0L, 10L, 0L, 1L); @@ -772,12 +789,14 @@ public void shouldNotViolateEosIfOneTaskGetsFencedUsingIsolatedAppInstances(fina } @ParameterizedTest - @MethodSource("groupProtocolAndProcessingThreadsParameters") - public void shouldWriteLatestOffsetsToCheckpointOnShutdown(final String groupProtocol, final boolean processingThreadsEnabled) throws Exception { + @MethodSource("groupProtocolProcessingThreadsAndTransactionalParameters") + public void shouldWriteLatestOffsetsToCheckpointOnShutdown(final String groupProtocol, + final boolean processingThreadsEnabled, + final boolean transactionalStateStores) throws Exception { final List> writtenData = prepareData(0L, 10, 0L, 1L); final List> expectedResult = computeExpectedResult(writtenData); - try (final KafkaStreams streams = getKafkaStreams("streams", true, "appDir", 1, groupProtocol, processingThreadsEnabled)) { + try (final KafkaStreams streams = getKafkaStreams("streams", true, "appDir", 1, groupProtocol, processingThreadsEnabled, transactionalStateStores)) { writeInputData(writtenData); startApplicationAndWaitUntilRunning(streams); @@ -1117,13 +1136,23 @@ private List> prepareData(final long fromInclusive, return data; } - // the threads should no longer fail one thread one at a time private KafkaStreams getKafkaStreams(final String dummyHostName, final boolean withState, final String appDir, final int numberOfStreamsThreads, final String groupProtocol, final boolean processingThreadsEnabled) { + return getKafkaStreams(dummyHostName, withState, appDir, numberOfStreamsThreads, groupProtocol, processingThreadsEnabled, false); + } + + // the threads should no longer fail one thread one at a time + private KafkaStreams getKafkaStreams(final String dummyHostName, + final boolean withState, + final String appDir, + final int numberOfStreamsThreads, + final String groupProtocol, + final boolean processingThreadsEnabled, + final boolean transactionalStateStores) { commitRequested = new AtomicInteger(0); errorInjected = new AtomicBoolean(false); stallInjected = new AtomicBoolean(false); @@ -1231,6 +1260,11 @@ public void process(final Record record) { properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, dummyHostName + ":2142"); properties.put(InternalConfig.PROCESSING_THREADS_ENABLED, processingThreadsEnabled); properties.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, groupProtocol); + if (transactionalStateStores) { + // This test already runs under EXACTLY_ONCE_V2 (see PROCESSING_GUARANTEE_CONFIG above); + // transactional state stores are an exactly-once-only feature. + properties.put(StreamsConfig.TRANSACTIONAL_STATE_STORES_CONFIG, true); + } final Properties config = StreamsTestUtils.getStreamsConfig( applicationId, diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java index 48e9ce88b9af8..6b05e23364dfb 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java @@ -74,6 +74,8 @@ import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.api.Timeout; import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; import java.io.ByteArrayOutputStream; @@ -92,6 +94,7 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; import static java.time.Duration.ofMillis; import static java.time.Duration.ofMinutes; @@ -170,9 +173,37 @@ public void whenShuttingDown() throws Exception { IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration); } + /** + * Provides (withHeaders, transactionalStateStores) pairs so representative aggregation tests keep their + * existing withHeaders={false,true} coverage (with non-transactional stores) and additionally run once + * with transactional state stores enabled. Transactional state stores (KIP-892) are an exactly-once-only + * feature, so the transactional=true case always runs under EXACTLY_ONCE_V2 (see + * {@link #maybeEnableTransactionalStateStores(boolean)}). + */ + private static Stream headersAndTransactional() { + return Stream.of( + Arguments.of(false, false), + Arguments.of(true, false), + Arguments.of(false, true) + ); + } + + /** + * When {@code transactionalStateStores} is true, enable transactional state stores under EXACTLY_ONCE_V2, + * since transactional state stores (KIP-892) are only supported with exactly-once processing. + */ + private void maybeEnableTransactionalStateStores(final boolean transactionalStateStores) { + if (transactionalStateStores) { + streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2); + streamsConfiguration.put(StreamsConfig.TRANSACTIONAL_STATE_STORES_CONFIG, true); + } + } + @ParameterizedTest - @ValueSource(booleans = {false, true}) - public void shouldReduce(final boolean withHeaders, final TestInfo testInfo) throws Exception { + @MethodSource("headersAndTransactional") + public void shouldReduce(final boolean withHeaders, + final boolean transactionalStateStores, + final TestInfo testInfo) throws Exception { produceMessages(mockTime.milliseconds()); groupedStream .reduce(reducer, Materialized.as("reduce-by-key")) @@ -180,6 +211,7 @@ public void shouldReduce(final boolean withHeaders, final TestInfo testInfo) thr .to(outputTopic, Produced.with(Serdes.String(), Serdes.String())); StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, withHeaders); + maybeEnableTransactionalStateStores(transactionalStateStores); startStreams(); @@ -224,8 +256,10 @@ private static , V extends Comparable> int compare(fi } @ParameterizedTest - @ValueSource(booleans = {false, true}) - public void shouldReduceWindowed(final boolean withHeaders, final TestInfo testInfo) throws Exception { + @MethodSource("headersAndTransactional") + public void shouldReduceWindowed(final boolean withHeaders, + final boolean transactionalStateStores, + final TestInfo testInfo) throws Exception { final long firstBatchTimestamp = mockTime.milliseconds(); mockTime.sleep(1000); produceMessages(firstBatchTimestamp); @@ -241,6 +275,7 @@ public void shouldReduceWindowed(final boolean withHeaders, final TestInfo testI .to(outputTopic, Produced.with(windowedSerde, Serdes.String())); StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, withHeaders); + maybeEnableTransactionalStateStores(transactionalStateStores); startStreams(); @@ -856,8 +891,9 @@ public void shouldCountSessionWindows(final boolean withHeaders) throws Exceptio } @ParameterizedTest - @ValueSource(booleans = {false, true}) - public void shouldReduceSessionWindows(final boolean withHeaders) throws Exception { + @MethodSource("headersAndTransactional") + public void shouldReduceSessionWindows(final boolean withHeaders, + final boolean transactionalStateStores) throws Exception { final long sessionGap = 1000L; // something to do with time final Properties producerConfig = TestUtils.producerConfig( @@ -890,6 +926,8 @@ public void shouldReduceSessionWindows(final boolean withHeaders) throws Excepti latch.countDown(); }); + maybeEnableTransactionalStateStores(transactionalStateStores); + startStreams(); latch.await(30, TimeUnit.SECONDS); diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java index c6daa0420c178..f29292892d799 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java @@ -90,10 +90,20 @@ public void before() { } private static Properties getStreamsProperties(final String optimization, final boolean withHeaders) { + return getStreamsProperties(optimization, withHeaders, false); + } + + private static Properties getStreamsProperties(final String optimization, final boolean withHeaders, final boolean transactional) { final Properties props = mkProperties(mkMap( mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()), mkEntry(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, optimization) )); + // Transactional state stores (KIP-892) are only supported under exactly-once-v2, so whenever the + // transactional dimension is enabled we also switch the processing guarantee to exactly-once-v2. + if (transactional) { + props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2); + props.put(StreamsConfig.TRANSACTIONAL_STATE_STORES_CONFIG, true); + } StreamsTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders); return props; } @@ -145,6 +155,51 @@ private static Stream versionedDataTestCases() { return versionedData().stream().map(Arguments::of); } + // Extends the standard testCases() with a transactional dimension (last argument). All existing cases keep + // transactional=false (preserving current coverage), and we add a small, representative set of + // transactional=true cases (KIP-892 transactional state stores, which imply exactly-once-v2). To avoid + // doubling the whole matrix, transactional=true is only added for a single materialization/config + // combination: materialized (so the queryable store is exercised), non-optimized, non-rejoin, non-versioned, + // and without DSL store-format headers, for both inner and left joins. + private static Stream transactionalTestCases() { + final Stream nonTransactional = testCases() + .map(arguments -> extend(arguments.get(), false)); + final Stream transactional = Stream.of(true, false) + .map(leftJoin -> Arguments.of( + leftJoin, // leftJoin + StreamsConfig.NO_OPTIMIZATION, // optimization + true, // materialized + false, // rejoin + false, // leftVersioned + false, // rightVersioned + false, // withHeaders + true // transactional + )); + return Stream.concat(nonTransactional, transactional); + } + + // Same as transactionalTestCases() but without the leftJoin argument (mirrors testCasesWithoutLeftJoinArg()). + private static Stream transactionalTestCasesWithoutLeftJoinArg() { + final Stream nonTransactional = testCasesWithoutLeftJoinArg() + .map(arguments -> extend(arguments.get(), false)); + final Stream transactional = Stream.of(Arguments.of( + StreamsConfig.NO_OPTIMIZATION, // optimization + true, // materialized + false, // rejoin + false, // leftVersioned + false, // rightVersioned + false, // withHeaders + true // transactional + )); + return Stream.concat(nonTransactional, transactional); + } + + private static Arguments extend(final Object[] args, final Object extra) { + final Object[] extended = Arrays.copyOf(args, args.length + 1); + extended[args.length] = extra; + return Arguments.of(extended); + } + protected static Collection buildParameters(final List... argOptions) { List result = new LinkedList<>(); result.add(new Object[0]); @@ -170,15 +225,16 @@ private static List times(final List left, final List rig } @ParameterizedTest - @MethodSource("testCases") + @MethodSource("transactionalTestCases") public void doJoinFromLeftThenDeleteLeftEntity(final boolean leftJoin, final String optimization, final boolean materialized, final boolean rejoin, final boolean leftVersioned, final boolean rightVersioned, - final boolean withHeaders) { - final Properties streamsConfig = getStreamsProperties(optimization, withHeaders); + final boolean withHeaders, + final boolean transactional) { + final Properties streamsConfig = getStreamsProperties(optimization, withHeaders, transactional); final Topology topology = getTopology(streamsConfig, materialized ? "store" : null, leftJoin, rejoin, leftVersioned, rightVersioned); try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) { final TestInputTopic right = driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new StringSerializer()); @@ -370,15 +426,16 @@ public void doJoinFromLeftThenUpdateFkThenRevertBack(final boolean leftJoin, } @ParameterizedTest - @MethodSource("testCases") + @MethodSource("transactionalTestCases") public void doJoinFromRightThenDeleteRightEntity(final boolean leftJoin, final String optimization, final boolean materialized, final boolean rejoin, final boolean leftVersioned, final boolean rightVersioned, - final boolean withHeaders) { - final Properties streamsConfig = getStreamsProperties(optimization, withHeaders); + final boolean withHeaders, + final boolean transactional) { + final Properties streamsConfig = getStreamsProperties(optimization, withHeaders, transactional); final Topology topology = getTopology(streamsConfig, materialized ? "store" : null, leftJoin, rejoin, leftVersioned, rightVersioned); try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) { final TestInputTopic right = driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new StringSerializer()); @@ -814,13 +871,14 @@ public void shouldEmitRecordOnNullForeignKeyForLeftJoins(final String optimizati } @ParameterizedTest - @MethodSource("testCasesWithoutLeftJoinArg") + @MethodSource("transactionalTestCasesWithoutLeftJoinArg") public void shouldEmitRecordWhenOldAndNewFkDiffer(final String optimization, final boolean materialized, final boolean rejoin, final boolean leftVersioned, final boolean rightVersioned, - final boolean withHeaders) { + final boolean withHeaders, + final boolean transactional) { final Function foreignKeyExtractor = value -> { final String split = value.split("\\|")[1]; if (split.equals("returnNull")) { @@ -831,7 +889,7 @@ public void shouldEmitRecordWhenOldAndNewFkDiffer(final String optimization, return split; } }; - final Properties streamsConfig = getStreamsProperties(optimization, withHeaders); + final Properties streamsConfig = getStreamsProperties(optimization, withHeaders, transactional); final Topology topology = getTopology(streamsConfig, materialized ? "store" : null, true, rejoin, leftVersioned, rightVersioned, foreignKeyExtractor); try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) { final TestInputTopic left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer()); diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java index 01caca6467641..9c9bdda9f7b00 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java @@ -187,6 +187,15 @@ private Properties props(final Properties extraProperties) { return streamsConfiguration; } + // Enables transactional state stores (KIP-892) when requested. Transactional stores are only supported + // under exactly-once, so this also sets the processing guarantee to EXACTLY_ONCE_V2. + private static void maybeSetTransactionalStateStores(final Properties props, final boolean transactional) { + if (transactional) { + props.put(StreamsConfig.TRANSACTIONAL_STATE_STORES_CONFIG, true); + props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2); + } + } + @AfterEach public void shutdown() throws Exception { if (kafkaStreams != null) { @@ -397,9 +406,21 @@ public void shouldRestoreStateFromSourceTopicForGlobalTable(final boolean useNew assertThat(numReceived.get(), equalTo(offsetLimitDelta * 2)); } + // Adds a transactional dimension on top of (useNewProtocol, withHeaders). When transactional is true the + // state stores are transactional (KIP-892), which requires EXACTLY_ONCE_V2 processing guarantee. This + // exercises the transactional-store lifecycle over the changelog restore path. @ParameterizedTest - @CsvSource({"false, false", "false, true", "true, false", "true, true"}) - public void shouldRestoreStateFromChangelogTopic(final boolean useNewProtocol, final boolean withHeaders) throws Exception { + @CsvSource({ + "false, false, false", + "false, true, false", + "true, false, false", + "true, true, false", + "false, false, true", + "true, false, true" + }) + public void shouldRestoreStateFromChangelogTopic(final boolean useNewProtocol, + final boolean withHeaders, + final boolean transactional) throws Exception { final String changelog = appId + "-store-changelog"; CLUSTER.createTopic(changelog, 2, 1); @@ -412,6 +433,7 @@ public void shouldRestoreStateFromChangelogTopic(final boolean useNewProtocol, f props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name()); } StreamsTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders); + maybeSetTransactionalStateStores(props, transactional); // restoring from 1000 to 5000, and then process from 5000 to 10000 on each of the two partitions final int offsetCheckpointed = 1000; diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SlidingWindowedKStreamIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SlidingWindowedKStreamIntegrationTest.java index 562a80d0f3747..f91be43c217db 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SlidingWindowedKStreamIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SlidingWindowedKStreamIntegrationTest.java @@ -135,20 +135,29 @@ public void whenShuttingDown() throws IOException { public static Stream data() { return Stream.of( - Arguments.of(StrategyType.ON_WINDOW_UPDATE, true, false), - Arguments.of(StrategyType.ON_WINDOW_UPDATE, true, true), - Arguments.of(StrategyType.ON_WINDOW_UPDATE, false, false), - Arguments.of(StrategyType.ON_WINDOW_UPDATE, false, true), - Arguments.of(StrategyType.ON_WINDOW_CLOSE, true, false), - Arguments.of(StrategyType.ON_WINDOW_CLOSE, true, true), - Arguments.of(StrategyType.ON_WINDOW_CLOSE, false, false), - Arguments.of(StrategyType.ON_WINDOW_CLOSE, false, true) + // (strategyType, withCache, withHeaders, transactional) + // Existing non-transactional coverage (at-least-once) is preserved unchanged. + Arguments.of(StrategyType.ON_WINDOW_UPDATE, true, false, false), + Arguments.of(StrategyType.ON_WINDOW_UPDATE, true, true, false), + Arguments.of(StrategyType.ON_WINDOW_UPDATE, false, false, false), + Arguments.of(StrategyType.ON_WINDOW_UPDATE, false, true, false), + Arguments.of(StrategyType.ON_WINDOW_CLOSE, true, false, false), + Arguments.of(StrategyType.ON_WINDOW_CLOSE, true, true, false), + Arguments.of(StrategyType.ON_WINDOW_CLOSE, false, false, false), + Arguments.of(StrategyType.ON_WINDOW_CLOSE, false, true, false), + // Sparse transactional (KIP-892) coverage: enable.transactional.statestores=true always + // implies exactly_once_v2. One representative case per emit strategy, cache disabled and no + // store-format headers, exercising the transactional sliding-window aggregation store path. + Arguments.of(StrategyType.ON_WINDOW_UPDATE, false, false, true), + Arguments.of(StrategyType.ON_WINDOW_CLOSE, false, false, true) ); } @ParameterizedTest @MethodSource("data") - public void shouldAggregateWindowedWithNoGrace(final StrategyType strategyType, final boolean withCache, final boolean withHeaders) throws Exception { + public void shouldAggregateWindowedWithNoGrace(final StrategyType strategyType, final boolean withCache, final boolean withHeaders, final boolean transactional) throws Exception { + maybeSetTransactionalStateStores(transactional); + produceMessages( streamOneInput, new KeyValueTimestamp<>("A", "1", 0), // Create [0, 10](0+1) @@ -216,7 +225,9 @@ public void shouldAggregateWindowedWithNoGrace(final StrategyType strategyType, @ParameterizedTest @MethodSource("data") - public void shouldAggregateWindowedWithGrace(final StrategyType strategyType, final boolean withCache, final boolean withHeaders) throws Exception { + public void shouldAggregateWindowedWithGrace(final StrategyType strategyType, final boolean withCache, final boolean withHeaders, final boolean transactional) throws Exception { + maybeSetTransactionalStateStores(transactional); + produceMessages( streamOneInput, new KeyValueTimestamp<>("A", "1", 0), // Create [0, 10](0+1) @@ -293,7 +304,9 @@ public void shouldAggregateWindowedWithGrace(final StrategyType strategyType, fi @ParameterizedTest @MethodSource("data") - public void shouldRestoreAfterJoinRestart(final StrategyType strategyType, final boolean withCache, final boolean withHeaders) throws Exception { + public void shouldRestoreAfterJoinRestart(final StrategyType strategyType, final boolean withCache, final boolean withHeaders, final boolean transactional) throws Exception { + maybeSetTransactionalStateStores(transactional); + produceMessages( streamOneInput, new KeyValueTimestamp<>("A", "L1", 0), @@ -434,6 +447,15 @@ private void produceMessages(final String topic, final KeyValueTimestamp record) { ) .to(outputTopic); - return new KafkaStreams(builder.build(), props(stateDirPath)); + return new KafkaStreams(builder.build(), props(stateDirPath, transactionalStateStores)); } - private Properties props(final String stateDirPath) { + private Properties props(final String stateDirPath, final boolean transactionalStateStores) { final Properties streamsConfiguration = new Properties(); streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, appId); streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); @@ -384,6 +389,10 @@ private Properties props(final String stateDirPath) { streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, stateDirPath); streamsConfiguration.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1); streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2); + if (transactionalStateStores) { + // Transactional state stores are only supported under exactly-once, which is already enabled above. + streamsConfiguration.put(StreamsConfig.TRANSACTIONAL_STATE_STORES_CONFIG, true); + } streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class); streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class); streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L); diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java index f6fbe1c75f85f..adbd0f8b27814 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java @@ -91,13 +91,14 @@ public void testSelfJoin(final boolean cacheEnabled, final boolean withHeaders) } @ParameterizedTest - @CsvSource({"true, false", "true, true", "false, false", "false, true"}) - public void testInner(final boolean cacheEnabled, final boolean withHeaders) { + @CsvSource({"true, false, false", "true, true, false", "false, false, false", "false, true, false", "false, false, true"}) + public void testInner(final boolean cacheEnabled, final boolean withHeaders, final boolean transactional) { final StreamsBuilder builder = new StreamsBuilder(); final KStream leftStream = builder.stream(INPUT_TOPIC_LEFT); final KStream rightStream = builder.stream(INPUT_TOPIC_RIGHT); final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled); streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + "-inner"); + maybeEnableTransactionalStateStores(streamsConfig, transactional); StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, withHeaders); @@ -201,13 +202,14 @@ public void testInnerRepartitioned(final boolean cacheEnabled, final boolean wit } @ParameterizedTest - @CsvSource({"true, false", "true, true", "false, false", "false, true"}) - public void testLeft(final boolean cacheEnabled, final boolean withHeaders) { + @CsvSource({"true, false, false", "true, true, false", "false, false, false", "false, true, false", "false, false, true"}) + public void testLeft(final boolean cacheEnabled, final boolean withHeaders, final boolean transactional) { final StreamsBuilder builder = new StreamsBuilder(); final KStream leftStream = builder.stream(INPUT_TOPIC_LEFT); final KStream rightStream = builder.stream(INPUT_TOPIC_RIGHT); final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled); streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + "-left"); + maybeEnableTransactionalStateStores(streamsConfig, transactional); StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, withHeaders); @@ -313,13 +315,14 @@ public void testLeftRepartitioned(final boolean cacheEnabled, final boolean with } @ParameterizedTest - @CsvSource({"true, false", "true, true", "false, false", "false, true"}) - public void testOuter(final boolean cacheEnabled, final boolean withHeaders) { + @CsvSource({"true, false, false", "true, true, false", "false, false, false", "false, true, false", "false, false, true"}) + public void testOuter(final boolean cacheEnabled, final boolean withHeaders, final boolean transactional) { final StreamsBuilder builder = new StreamsBuilder(); final KStream leftStream = builder.stream(INPUT_TOPIC_LEFT); final KStream rightStream = builder.stream(INPUT_TOPIC_RIGHT); final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled); streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + "-outer"); + maybeEnableTransactionalStateStores(streamsConfig, transactional); StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, withHeaders); @@ -531,4 +534,13 @@ public void testMultiInner(final boolean cacheEnabled, final boolean withHeaders runTestWithDriver(inputWithoutOutOfOrderData, expectedResult, streamsConfig, builder.build(streamsConfig)); } + + // Transactional state stores (KIP-892) are only supported under exactly-once, so whenever + // enable.transactional.statestores=true we must also set processing.guarantee=exactly_once_v2. + private static void maybeEnableTransactionalStateStores(final Properties streamsConfig, final boolean transactional) { + if (transactional) { + streamsConfig.put(StreamsConfig.TRANSACTIONAL_STATE_STORES_CONFIG, true); + streamsConfig.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2); + } + } } diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java index 21acb226fc33c..1c6b275f74713 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java @@ -51,7 +51,7 @@ import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.api.Timeout; import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; +import org.junit.jupiter.params.provider.CsvSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -103,9 +103,17 @@ public static void closeCluster() { private static final LongDeserializer LONG_DESERIALIZER = new LongDeserializer(); private static final long COMMIT_INTERVAL = 100L; - @ParameterizedTest - @ValueSource(booleans = {false, true}) - public void shouldRecoverBufferAfterShutdown(final boolean withHeaders, final TestInfo testInfo) { + @ParameterizedTest(name = "{displayName} withHeaders={0}, transactional={1}") + @CsvSource({ + // withHeaders, transactional + // transactional=false keeps the existing (at-least-once) coverage over the DSL store-format header dimension. + "false, false", + "true, false", + // transactional=true always implies exactly-once-v2; a single sparse invocation exercises the + // suppress-buffer restore path over transactional (KIP-892) state stores. + "false, true" + }) + public void shouldRecoverBufferAfterShutdown(final boolean withHeaders, final boolean transactional, final TestInfo testInfo) { final String testId = safeUniqueTestName(testInfo); final String appId = "appId_" + testId; final String input = "input" + testId; @@ -154,6 +162,12 @@ public void shouldRecoverBufferAfterShutdown(final boolean withHeaders, final Te streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL); StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, withHeaders); + if (transactional) { + // Transactional state stores (KIP-892) require exactly-once-v2. + streamsConfig.put(StreamsConfig.TRANSACTIONAL_STATE_STORES_CONFIG, true); + streamsConfig.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2); + } + KafkaStreams driver = getStartedStreams(streamsConfig, builder, true); try { // start by putting some stuff in the buffer diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest.java index 740b5b1af5c31..0da0fcb8077e6 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest.java @@ -63,6 +63,8 @@ import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import java.io.IOException; import java.time.Duration; @@ -125,8 +127,9 @@ public void afterTest() { } } - @Test - public void shouldPutGetAndDelete() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void shouldPutGetAndDelete(final boolean transactionalStateStores) throws Exception { // build topology and start app final StreamsBuilder streamsBuilder = new StreamsBuilder(); @@ -142,7 +145,7 @@ public void shouldPutGetAndDelete() throws Exception { .process(() -> new VersionedStoreContentCheckerProcessor(true), STORE_NAME) .to(outputStream, Produced.with(Serdes.Integer(), Serdes.Integer())); - final Properties props = props(); + final Properties props = props(transactionalStateStores); kafkaStreams = new KafkaStreams(streamsBuilder.build(), props); kafkaStreams.start(); @@ -208,8 +211,9 @@ public void shouldSetChangelogTopicProperties() throws Exception { assertThat(changelogTopicConfig.getProperty("min.compaction.lag.ms"), equalTo(Long.toString(HISTORY_RETENTION + 24 * 60 * 60 * 1000L))); } - @Test - public void shouldRestore() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void shouldRestore(final boolean transactionalStateStores) throws Exception { // build topology and start app StreamsBuilder streamsBuilder = new StreamsBuilder(); @@ -225,7 +229,7 @@ public void shouldRestore() throws Exception { .process(() -> new VersionedStoreContentCheckerProcessor(true), STORE_NAME) .to(outputStream, Produced.with(Serdes.Integer(), Serdes.Integer())); - final Properties props = props(); + final Properties props = props(transactionalStateStores); kafkaStreams = new KafkaStreams(streamsBuilder.build(), props); kafkaStreams.start(); @@ -428,6 +432,10 @@ private void shouldManualUpgradeFromNonVersionedToVersioned(final Topology origi } private Properties props() { + return props(false); + } + + private Properties props(final boolean transactionalStateStores) { final String safeTestName = safeUniqueTestName(testInfo); final Properties streamsConfiguration = new Properties(); streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName); @@ -435,6 +443,11 @@ private Properties props() { streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L); streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + if (transactionalStateStores) { + // Transactional state stores are only supported under exactly-once. + streamsConfiguration.put(StreamsConfig.TRANSACTIONAL_STATE_STORES_CONFIG, true); + streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2); + } return streamsConfiguration; }