Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,10 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import java.io.IOException;
import java.time.Duration;
Expand Down Expand Up @@ -93,8 +94,10 @@ public void after() {
cluster.stop();
}

@Test
public void shouldHaveSamePositionBoundActiveAndStandBy(final TestInfo testInfo) throws Exception {
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void shouldHaveSamePositionBoundActiveAndStandBy(final boolean transactionalStateStores,
final TestInfo testInfo) throws Exception {
final Semaphore semaphore = new Semaphore(0);

final StreamsBuilder builder = new StreamsBuilder();
Expand All @@ -107,8 +110,8 @@ public void shouldHaveSamePositionBoundActiveAndStandBy(final TestInfo testInfo)
.peek((k, v) -> semaphore.release());

final String safeTestName = safeUniqueTestName(testInfo);
final KafkaStreams kafkaStreams1 = createKafkaStreams(builder, streamsConfiguration(safeTestName));
final KafkaStreams kafkaStreams2 = createKafkaStreams(builder, streamsConfiguration(safeTestName));
final KafkaStreams kafkaStreams1 = createKafkaStreams(builder, streamsConfiguration(safeTestName, transactionalStateStores));
final KafkaStreams kafkaStreams2 = createKafkaStreams(builder, streamsConfiguration(safeTestName, transactionalStateStores));
final List<KafkaStreams> kafkaStreamsList = Arrays.asList(kafkaStreams1, kafkaStreams2);

try {
Expand Down Expand Up @@ -200,7 +203,7 @@ private void produceValueRange() {
);
}

private Properties streamsConfiguration(final String safeTestName) {
private Properties streamsConfiguration(final String safeTestName, final boolean transactionalStateStores) {
final Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
config.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:" + (++port));
Expand All @@ -214,6 +217,10 @@ private Properties streamsConfiguration(final String safeTestName) {
config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000);
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
config.put(InternalConfig.IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED, true);
if (transactionalStateStores) {
config.put(StreamsConfig.TRANSACTIONAL_STATE_STORES_CONFIG, true);
config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
}
return config;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,21 @@ private static java.util.stream.Stream<Arguments> groupProtocolAndProcessingThre
);
}

// Adds a sparse "transactional state stores" (KIP-892) dimension on top of the group-protocol/
// processing-threads matrix. To avoid exploding the full matrix by 2, the transactional=true case
// is only exercised for a single, representative combination (classic protocol, processing threads
// disabled) while preserving all existing (transactional=false) coverage. Transactional state stores
// are an exactly-once-only feature; this test already runs under EXACTLY_ONCE_V2.
private static java.util.stream.Stream<Arguments> groupProtocolProcessingThreadsAndTransactionalParameters() {
return java.util.stream.Stream.of(
Arguments.of("classic", true, false),
Arguments.of("classic", false, false),
Arguments.of("streams", true, false),
Arguments.of("streams", false, false),
Arguments.of("classic", false, true)
);
}

@BeforeEach
public void createTopics() throws Exception {
applicationId = "appId-" + TEST_NUMBER.getAndIncrement();
Expand Down Expand Up @@ -498,8 +513,10 @@ public void shouldNotViolateEosIfOneTaskFails(final String groupProtocol, final
}

@ParameterizedTest
@MethodSource("groupProtocolAndProcessingThreadsParameters")
public void shouldNotViolateEosIfOneTaskFailsWithState(final String groupProtocol, final boolean processingThreadsEnabled) throws Exception {
@MethodSource("groupProtocolProcessingThreadsAndTransactionalParameters")
public void shouldNotViolateEosIfOneTaskFailsWithState(final String groupProtocol,
final boolean processingThreadsEnabled,
final boolean transactionalStateStores) throws Exception {

// this test updates a store with 10 + 5 + 5 records per partition (running with 2 partitions)
// the app is supposed to emit all 40 update records into the output topic
Expand All @@ -515,7 +532,7 @@ public void shouldNotViolateEosIfOneTaskFailsWithState(final String groupProtoco

// We need more processing time under "with state" situation, so increasing the max.poll.interval.ms
// to avoid unexpected rebalance during test, which will cause unexpected fail over triggered
try (final KafkaStreams streams = getKafkaStreams("dummy", true, "appDir", 2, groupProtocol, processingThreadsEnabled)) {
try (final KafkaStreams streams = getKafkaStreams("dummy", true, "appDir", 2, groupProtocol, processingThreadsEnabled, transactionalStateStores)) {
startApplicationAndWaitUntilRunning(streams);

final List<KeyValue<Long, Long>> committedDataBeforeFailure = prepareData(0L, 10L, 0L, 1L);
Expand Down Expand Up @@ -772,12 +789,14 @@ public void shouldNotViolateEosIfOneTaskGetsFencedUsingIsolatedAppInstances(fina
}

@ParameterizedTest
@MethodSource("groupProtocolAndProcessingThreadsParameters")
public void shouldWriteLatestOffsetsToCheckpointOnShutdown(final String groupProtocol, final boolean processingThreadsEnabled) throws Exception {
@MethodSource("groupProtocolProcessingThreadsAndTransactionalParameters")
public void shouldWriteLatestOffsetsToCheckpointOnShutdown(final String groupProtocol,
final boolean processingThreadsEnabled,
final boolean transactionalStateStores) throws Exception {
final List<KeyValue<Long, Long>> writtenData = prepareData(0L, 10, 0L, 1L);
final List<KeyValue<Long, Long>> expectedResult = computeExpectedResult(writtenData);

try (final KafkaStreams streams = getKafkaStreams("streams", true, "appDir", 1, groupProtocol, processingThreadsEnabled)) {
try (final KafkaStreams streams = getKafkaStreams("streams", true, "appDir", 1, groupProtocol, processingThreadsEnabled, transactionalStateStores)) {
writeInputData(writtenData);

startApplicationAndWaitUntilRunning(streams);
Expand Down Expand Up @@ -1117,13 +1136,23 @@ private List<KeyValue<Long, Long>> prepareData(final long fromInclusive,
return data;
}

// the threads should no longer fail one thread one at a time
private KafkaStreams getKafkaStreams(final String dummyHostName,
final boolean withState,
final String appDir,
final int numberOfStreamsThreads,
final String groupProtocol,
final boolean processingThreadsEnabled) {
return getKafkaStreams(dummyHostName, withState, appDir, numberOfStreamsThreads, groupProtocol, processingThreadsEnabled, false);
}

// the threads should no longer fail one thread one at a time
private KafkaStreams getKafkaStreams(final String dummyHostName,
final boolean withState,
final String appDir,
final int numberOfStreamsThreads,
final String groupProtocol,
final boolean processingThreadsEnabled,
final boolean transactionalStateStores) {
commitRequested = new AtomicInteger(0);
errorInjected = new AtomicBoolean(false);
stallInjected = new AtomicBoolean(false);
Expand Down Expand Up @@ -1231,6 +1260,11 @@ public void process(final Record<Long, Long> record) {
properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, dummyHostName + ":2142");
properties.put(InternalConfig.PROCESSING_THREADS_ENABLED, processingThreadsEnabled);
properties.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, groupProtocol);
if (transactionalStateStores) {
// This test already runs under EXACTLY_ONCE_V2 (see PROCESSING_GUARANTEE_CONFIG above);
// transactional state stores are an exactly-once-only feature.
properties.put(StreamsConfig.TRANSACTIONAL_STATE_STORES_CONFIG, true);
}

final Properties config = StreamsTestUtils.getStreamsConfig(
applicationId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;

import java.io.ByteArrayOutputStream;
Expand All @@ -92,6 +94,7 @@
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

import static java.time.Duration.ofMillis;
import static java.time.Duration.ofMinutes;
Expand Down Expand Up @@ -170,16 +173,45 @@ public void whenShuttingDown() throws Exception {
IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
}

/**
* Provides (withHeaders, transactionalStateStores) pairs so representative aggregation tests keep their
* existing withHeaders={false,true} coverage (with non-transactional stores) and additionally run once
* with transactional state stores enabled. Transactional state stores (KIP-892) are an exactly-once-only
* feature, so the transactional=true case always runs under EXACTLY_ONCE_V2 (see
* {@link #maybeEnableTransactionalStateStores(boolean)}).
*/
private static Stream<Arguments> headersAndTransactional() {
return Stream.of(
Arguments.of(false, false),
Arguments.of(true, false),
Arguments.of(false, true)
);
}

/**
* When {@code transactionalStateStores} is true, enable transactional state stores under EXACTLY_ONCE_V2,
* since transactional state stores (KIP-892) are only supported with exactly-once processing.
*/
private void maybeEnableTransactionalStateStores(final boolean transactionalStateStores) {
if (transactionalStateStores) {
streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
streamsConfiguration.put(StreamsConfig.TRANSACTIONAL_STATE_STORES_CONFIG, true);
}
}

@ParameterizedTest
@ValueSource(booleans = {false, true})
public void shouldReduce(final boolean withHeaders, final TestInfo testInfo) throws Exception {
@MethodSource("headersAndTransactional")
public void shouldReduce(final boolean withHeaders,
final boolean transactionalStateStores,
final TestInfo testInfo) throws Exception {
produceMessages(mockTime.milliseconds());
groupedStream
.reduce(reducer, Materialized.as("reduce-by-key"))
.toStream()
.to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));

StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, withHeaders);
maybeEnableTransactionalStateStores(transactionalStateStores);

startStreams();

Expand Down Expand Up @@ -224,8 +256,10 @@ private static <K extends Comparable<K>, V extends Comparable<V>> int compare(fi
}

@ParameterizedTest
@ValueSource(booleans = {false, true})
public void shouldReduceWindowed(final boolean withHeaders, final TestInfo testInfo) throws Exception {
@MethodSource("headersAndTransactional")
public void shouldReduceWindowed(final boolean withHeaders,
final boolean transactionalStateStores,
final TestInfo testInfo) throws Exception {
final long firstBatchTimestamp = mockTime.milliseconds();
mockTime.sleep(1000);
produceMessages(firstBatchTimestamp);
Expand All @@ -241,6 +275,7 @@ public void shouldReduceWindowed(final boolean withHeaders, final TestInfo testI
.to(outputTopic, Produced.with(windowedSerde, Serdes.String()));

StreamsTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, withHeaders);
maybeEnableTransactionalStateStores(transactionalStateStores);

startStreams();

Expand Down Expand Up @@ -856,8 +891,9 @@ public void shouldCountSessionWindows(final boolean withHeaders) throws Exceptio
}

@ParameterizedTest
@ValueSource(booleans = {false, true})
public void shouldReduceSessionWindows(final boolean withHeaders) throws Exception {
@MethodSource("headersAndTransactional")
public void shouldReduceSessionWindows(final boolean withHeaders,
final boolean transactionalStateStores) throws Exception {
final long sessionGap = 1000L; // something to do with time

final Properties producerConfig = TestUtils.producerConfig(
Expand Down Expand Up @@ -890,6 +926,8 @@ public void shouldReduceSessionWindows(final boolean withHeaders) throws Excepti
latch.countDown();
});

maybeEnableTransactionalStateStores(transactionalStateStores);

startStreams();
latch.await(30, TimeUnit.SECONDS);

Expand Down
Loading
Loading