From 1a95089ac4331bd17137b00397a742441870cec6 Mon Sep 17 00:00:00 2001 From: prtiwari Date: Mon, 22 Sep 2025 11:37:12 +0530 Subject: [PATCH 1/6] DBZ-9439- Add connection validator for Amazon Kinesis. Signed-off-by: pranavt84 --- debezium-platform-conductor/pom.xml | 12 ++++++++ .../src/main/resources/application.yml | 3 ++ .../main/resources/connection-schemas.json | 29 +++++++++++++++++++ 3 files changed, 44 insertions(+) diff --git a/debezium-platform-conductor/pom.xml b/debezium-platform-conductor/pom.xml index 19c4f8b2..2b794acf 100644 --- a/debezium-platform-conductor/pom.xml +++ b/debezium-platform-conductor/pom.xml @@ -359,6 +359,12 @@ ${kafka-clients.version} + + com.amazonaws + aws-java-sdk-kinesis + 1.12.791 + + io.quarkus @@ -459,6 +465,12 @@ quarkus-test-kafka-companion test + + software.amazon.awssdk + kinesis + 2.29.23 + compile + diff --git a/debezium-platform-conductor/src/main/resources/application.yml b/debezium-platform-conductor/src/main/resources/application.yml index 66fd877c..845b05a3 100644 --- a/debezium-platform-conductor/src/main/resources/application.yml +++ b/debezium-platform-conductor/src/main/resources/application.yml @@ -44,6 +44,9 @@ destinations: kafka: connection: timeout: 60 + kinesis: + connection: + timeout: 5 quarkus: rest-client: diff --git a/debezium-platform-conductor/src/main/resources/connection-schemas.json b/debezium-platform-conductor/src/main/resources/connection-schemas.json index 936e1801..d872a2ae 100644 --- a/debezium-platform-conductor/src/main/resources/connection-schemas.json +++ b/debezium-platform-conductor/src/main/resources/connection-schemas.json @@ -218,5 +218,34 @@ } } } + }, + { + "type": "KINESIS", + "schema": { + "title": "Amazon Kinesis stream connection properties", + "description": "Amazon Kinesis stream connection properties", + "type": "object", + "required": [ + "region", + "stream" + ], + "additionalProperties": { + "type": "string" + }, + "properties": { + "region": { + "type": "string", + "title": "AWS region where the Kinesis stream is located (e.g., us-east-1)" + }, + "stream": { + "type": "string", + "title": "The name of the Kinesis stream to connect to" + }, + "partitionKey": { + "type": "string", + "title": "The partition key used when sending a test record (optional, defaults to 'test-partition')" + } + } + } } ] \ No newline at end of file From 4207259bab0b1ded1edb2f383102048158a1ac00 Mon Sep 17 00:00:00 2001 From: prtiwari Date: Mon, 22 Sep 2025 11:42:03 +0530 Subject: [PATCH 2/6] DBZ-9439- Add connection validator for Amazon Kinesis. Signed-off-by: pranavt84 --- .../KinesisConnectionValidator.java | 128 ++++++++++++++++++ 1 file changed, 128 insertions(+) create mode 100644 debezium-platform-conductor/src/main/java/io/debezium/platform/environment/connection/destination/KinesisConnectionValidator.java diff --git a/debezium-platform-conductor/src/main/java/io/debezium/platform/environment/connection/destination/KinesisConnectionValidator.java b/debezium-platform-conductor/src/main/java/io/debezium/platform/environment/connection/destination/KinesisConnectionValidator.java new file mode 100644 index 00000000..18627835 --- /dev/null +++ b/debezium-platform-conductor/src/main/java/io/debezium/platform/environment/connection/destination/KinesisConnectionValidator.java @@ -0,0 +1,128 @@ +package io.debezium.platform.environment.connection.destination; + +import com.amazonaws.services.kinesis.model.PutRecordRequest; +import jakarta.inject.Named; +import org.eclipse.microprofile.config.inject.ConfigProperty; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.UnknownHostException; +import java.util.Map; + +import io.debezium.platform.data.dto.ConnectionValidationResult; +import io.debezium.platform.domain.views.Connection; +import io.debezium.platform.environment.connection.ConnectionValidator; +import software.amazon.awssdk.core.SdkBytes; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.kinesis.KinesisClient; +import software.amazon.awssdk.services.kinesis.model.PutRecordResponse; + + +/** + * Implementation of {@link ConnectionValidator} for Amazon Kinesis. + * Validates stream name, region, and tests by sending a simple record. + * + * Author: Pranav Tiwari + */ +@Named("KINESIS") +public class KinesisConnectionValidator implements ConnectionValidator { + + private static final Logger LOGGER = LoggerFactory.getLogger(KinesisConnectionValidator.class); + + private static final String REGION_KEY = "region"; + private static final String STREAM_NAME_KEY = "stream"; + private static final String PARTITION_KEY = "partitionKey"; // Optional + private static final String TEST_MESSAGE = "Kinesis validation test message"; + + private final int defaultTimeout; + + public KinesisConnectionValidator( + @ConfigProperty(name = "destinations.kinesis.connection.timeout") int defaultTimeout) { + this.defaultTimeout = defaultTimeout; + } + + @Override + public ConnectionValidationResult validate(Connection connectionConfig) { + if (connectionConfig == null) { + return ConnectionValidationResult.failed("Connection configuration cannot be null"); + } + + try { + LOGGER.debug("Starting Kinesis connection validation for: {}", connectionConfig.getName()); + + Map kinesisConfig = connectionConfig.getConfig(); + + ConnectionValidationResult configValidation = validateConfiguration(kinesisConfig); + if (!configValidation.valid()) { + return configValidation; + } + + return performConnectionValidation(kinesisConfig); + } catch (Exception e) { + LOGGER.error("Unexpected error during Kinesis connection validation", e); + return ConnectionValidationResult.failed("Unexpected error: " + e.getMessage()); + } + } + + private ConnectionValidationResult validateConfiguration(Map config) { + if (!config.containsKey(REGION_KEY) || config.get(REGION_KEY) == null || + config.get(REGION_KEY).toString().trim().isEmpty()) { + return ConnectionValidationResult.failed("Region must be specified"); + } + + if (!config.containsKey(STREAM_NAME_KEY) || config.get(STREAM_NAME_KEY) == null || + config.get(STREAM_NAME_KEY).toString().trim().isEmpty()) { + return ConnectionValidationResult.failed("Stream name must be specified"); + } + + return ConnectionValidationResult.successful(); + } + + private ConnectionValidationResult performConnectionValidation(Map config) { + KinesisClient kinesisClient = null; + + try { + String regionName = config.get(REGION_KEY).toString().trim(); + String streamName = config.get(STREAM_NAME_KEY).toString().trim(); + String partitionKey = config.containsKey(PARTITION_KEY) + ? config.get(PARTITION_KEY).toString() + : "test-partition"; + + LOGGER.debug("Connecting to Kinesis in region: {}, stream: {}", regionName, streamName); + + kinesisClient = KinesisClient.builder() + .region(Region.of(regionName)) + .build(); + + kinesisClient.describeStreamSummary(builder -> + builder.streamName(streamName)); + + // Describe the stream without sending data + var response = kinesisClient.describeStreamSummary(builder -> + builder.streamName(streamName)); + + LOGGER.debug("Successfully described Kinesis stream '{}'. Status: {}", + streamName, response.streamDescriptionSummary().streamStatusAsString()); + + return ConnectionValidationResult.successful(); + + } catch (software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException e) { + return ConnectionValidationResult.failed("Stream not found: Please verify the stream name and region."); + } catch (software.amazon.awssdk.services.kinesis.model.AccessDeniedException e) { + return ConnectionValidationResult.failed("Access denied: Check IAM permissions or credentials."); + } catch (software.amazon.awssdk.core.exception.SdkClientException e) { + return ConnectionValidationResult.failed("Client error: " + e.getMessage()); + } catch (Exception e) { + LOGGER.warn("Generic exception during validation", e); + return ConnectionValidationResult.failed("Failed to validate Kinesis connection: " + e.getMessage()); + } finally { + if (kinesisClient != null) { + try { + kinesisClient.close(); + } catch (Exception ex) { + LOGGER.warn("Error closing Kinesis client", ex); + } + } + } + } +} From 99bdaa8107d61fbe1ab2b69cde6b52909b64fa51 Mon Sep 17 00:00:00 2001 From: Pranav Tiwari Date: Sat, 11 Oct 2025 17:54:48 +0530 Subject: [PATCH 3/6] DBZ-9439- Ran formatter check. Signed-off-by: pranavt84 --- .../KinesisConnectionValidator.java | 52 +++++++++++-------- 1 file changed, 31 insertions(+), 21 deletions(-) diff --git a/debezium-platform-conductor/src/main/java/io/debezium/platform/environment/connection/destination/KinesisConnectionValidator.java b/debezium-platform-conductor/src/main/java/io/debezium/platform/environment/connection/destination/KinesisConnectionValidator.java index 18627835..1279562f 100644 --- a/debezium-platform-conductor/src/main/java/io/debezium/platform/environment/connection/destination/KinesisConnectionValidator.java +++ b/debezium-platform-conductor/src/main/java/io/debezium/platform/environment/connection/destination/KinesisConnectionValidator.java @@ -1,30 +1,35 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ + package io.debezium.platform.environment.connection.destination; -import com.amazonaws.services.kinesis.model.PutRecordRequest; +import java.util.Map; + +import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Named; + import org.eclipse.microprofile.config.inject.ConfigProperty; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.net.UnknownHostException; -import java.util.Map; - import io.debezium.platform.data.dto.ConnectionValidationResult; import io.debezium.platform.domain.views.Connection; import io.debezium.platform.environment.connection.ConnectionValidator; -import software.amazon.awssdk.core.SdkBytes; + import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.kinesis.KinesisClient; -import software.amazon.awssdk.services.kinesis.model.PutRecordResponse; - /** * Implementation of {@link ConnectionValidator} for Amazon Kinesis. * Validates stream name, region, and tests by sending a simple record. - * + * * Author: Pranav Tiwari */ -@Named("KINESIS") +@ApplicationScoped +@Named("AMAZON_KINESIS") public class KinesisConnectionValidator implements ConnectionValidator { private static final Logger LOGGER = LoggerFactory.getLogger(KinesisConnectionValidator.class); @@ -37,7 +42,7 @@ public class KinesisConnectionValidator implements ConnectionValidator { private final int defaultTimeout; public KinesisConnectionValidator( - @ConfigProperty(name = "destinations.kinesis.connection.timeout") int defaultTimeout) { + @ConfigProperty(name = "destinations.kinesis.connection.timeout") int defaultTimeout) { this.defaultTimeout = defaultTimeout; } @@ -58,7 +63,8 @@ public ConnectionValidationResult validate(Connection connectionConfig) { } return performConnectionValidation(kinesisConfig); - } catch (Exception e) { + } + catch (Exception e) { LOGGER.error("Unexpected error during Kinesis connection validation", e); return ConnectionValidationResult.failed("Unexpected error: " + e.getMessage()); } @@ -94,32 +100,36 @@ private ConnectionValidationResult performConnectionValidation(Map - builder.streamName(streamName)); + kinesisClient.describeStreamSummary(builder -> builder.streamName(streamName)); // Describe the stream without sending data - var response = kinesisClient.describeStreamSummary(builder -> - builder.streamName(streamName)); + var response = kinesisClient.describeStreamSummary(builder -> builder.streamName(streamName)); LOGGER.debug("Successfully described Kinesis stream '{}'. Status: {}", streamName, response.streamDescriptionSummary().streamStatusAsString()); return ConnectionValidationResult.successful(); - } catch (software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException e) { + } + catch (software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException e) { return ConnectionValidationResult.failed("Stream not found: Please verify the stream name and region."); - } catch (software.amazon.awssdk.services.kinesis.model.AccessDeniedException e) { + } + catch (software.amazon.awssdk.services.kinesis.model.AccessDeniedException e) { return ConnectionValidationResult.failed("Access denied: Check IAM permissions or credentials."); - } catch (software.amazon.awssdk.core.exception.SdkClientException e) { + } + catch (software.amazon.awssdk.core.exception.SdkClientException e) { return ConnectionValidationResult.failed("Client error: " + e.getMessage()); - } catch (Exception e) { + } + catch (Exception e) { LOGGER.warn("Generic exception during validation", e); return ConnectionValidationResult.failed("Failed to validate Kinesis connection: " + e.getMessage()); - } finally { + } + finally { if (kinesisClient != null) { try { kinesisClient.close(); - } catch (Exception ex) { + } + catch (Exception ex) { LOGGER.warn("Error closing Kinesis client", ex); } } From c9b260f367a3d32ef9ede94e5663412ef8e97525 Mon Sep 17 00:00:00 2001 From: Pranav Tiwari Date: Sat, 11 Oct 2025 17:56:14 +0530 Subject: [PATCH 4/6] DBZ-9439- Ran formatter check. Signed-off-by: pranavt84 --- .../src/main/resources/connection-schemas.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/debezium-platform-conductor/src/main/resources/connection-schemas.json b/debezium-platform-conductor/src/main/resources/connection-schemas.json index d872a2ae..3a5ff6f0 100644 --- a/debezium-platform-conductor/src/main/resources/connection-schemas.json +++ b/debezium-platform-conductor/src/main/resources/connection-schemas.json @@ -220,7 +220,7 @@ } }, { - "type": "KINESIS", + "type": "AMAZON_KINESIS", "schema": { "title": "Amazon Kinesis stream connection properties", "description": "Amazon Kinesis stream connection properties", From 699e6197eb2a5ee4aa39f754e6a5229e8280a0f5 Mon Sep 17 00:00:00 2001 From: Pranav Tiwari Date: Sun, 12 Oct 2025 11:34:25 +0530 Subject: [PATCH 5/6] DBZ-9439- Added integeration tests. Signed-off-by: pranavt84 --- debezium-platform-conductor/pom.xml | 6 + .../KinesisConnectionValidator.java | 32 +- .../KinesisConnectionValidatorIT.java | 277 ++++++++++++++++++ .../KinesisConnectionValidatorTest.java | 240 +++++++++++++++ .../database/db/KinesisTestResource.java | 160 ++++++++++ 5 files changed, 703 insertions(+), 12 deletions(-) create mode 100644 debezium-platform-conductor/src/test/java/io/debezium/platform/environment/connection/KinesisConnectionValidatorIT.java create mode 100644 debezium-platform-conductor/src/test/java/io/debezium/platform/environment/connection/KinesisConnectionValidatorTest.java create mode 100644 debezium-platform-conductor/src/test/java/io/debezium/platform/environment/database/db/KinesisTestResource.java diff --git a/debezium-platform-conductor/pom.xml b/debezium-platform-conductor/pom.xml index 2b794acf..e58da1bd 100644 --- a/debezium-platform-conductor/pom.xml +++ b/debezium-platform-conductor/pom.xml @@ -425,6 +425,12 @@ mssqlserver test + + org.testcontainers + localstack + ${test-containers.version} + test + ch.qos.logback diff --git a/debezium-platform-conductor/src/main/java/io/debezium/platform/environment/connection/destination/KinesisConnectionValidator.java b/debezium-platform-conductor/src/main/java/io/debezium/platform/environment/connection/destination/KinesisConnectionValidator.java index 1279562f..7f2e952d 100644 --- a/debezium-platform-conductor/src/main/java/io/debezium/platform/environment/connection/destination/KinesisConnectionValidator.java +++ b/debezium-platform-conductor/src/main/java/io/debezium/platform/environment/connection/destination/KinesisConnectionValidator.java @@ -6,6 +6,7 @@ package io.debezium.platform.environment.connection.destination; +import java.net.URI; import java.util.Map; import jakarta.enterprise.context.ApplicationScoped; @@ -36,6 +37,7 @@ public class KinesisConnectionValidator implements ConnectionValidator { private static final String REGION_KEY = "region"; private static final String STREAM_NAME_KEY = "stream"; + private static final String ENDPOINT_KEY = "endpoint"; private static final String PARTITION_KEY = "partitionKey"; // Optional private static final String TEST_MESSAGE = "Kinesis validation test message"; @@ -90,20 +92,20 @@ private ConnectionValidationResult performConnectionValidation(Map builder.streamName(streamName)); + // ✅ Allow custom endpoint for testing (LocalStack) + if (config.containsKey(ENDPOINT_KEY) && config.get(ENDPOINT_KEY) != null) { + builder.endpointOverride(URI.create(config.get(ENDPOINT_KEY).toString().trim())); + } + + kinesisClient = builder.build(); - // Describe the stream without sending data - var response = kinesisClient.describeStreamSummary(builder -> builder.streamName(streamName)); + var response = kinesisClient.describeStreamSummary(r -> r.streamName(streamName)); LOGGER.debug("Successfully described Kinesis stream '{}'. Status: {}", streamName, response.streamDescriptionSummary().streamStatusAsString()); @@ -112,13 +114,19 @@ private ConnectionValidationResult performConnectionValidation(Map config = new HashMap<>(); + config.put("endpoint", ConfigProvider.getConfig().getValue("kinesis.endpoint", String.class)); + config.put("stream", ConfigProvider.getConfig().getValue("kinesis.streamName", String.class)); + // region is missing + + Connection conn = new TestConnectionView(ConnectionEntity.Type.AMAZON_KINESIS, config); + ConnectionValidationResult result = validator.validate(conn); + + assertFalse(result.valid(), "Should fail when region is missing"); + assertThat(result.message()).isEqualTo("Region must be specified"); + } + + @Test + @DisplayName("Should fail validation when stream name is missing") + void shouldFailWhenStreamNameIsMissing() { + Map config = new HashMap<>(); + config.put("endpoint", ConfigProvider.getConfig().getValue("kinesis.endpoint", String.class)); + config.put("region", ConfigProvider.getConfig().getValue("kinesis.region", String.class)); + // stream is missing + + Connection conn = new TestConnectionView(ConnectionEntity.Type.AMAZON_KINESIS, config); + ConnectionValidationResult result = validator.validate(conn); + + assertFalse(result.valid(), "Should fail when stream name is missing"); + assertThat(result.message()).isEqualTo("Stream name must be specified"); + } + + @Test + @DisplayName("Should fail validation when both region and stream are missing") + void shouldFailWhenBothRegionAndStreamAreMissing() { + Map config = new HashMap<>(); + config.put("endpoint", ConfigProvider.getConfig().getValue("kinesis.endpoint", String.class)); + // both region and stream are missing + + Connection conn = new TestConnectionView(ConnectionEntity.Type.AMAZON_KINESIS, config); + ConnectionValidationResult result = validator.validate(conn); + + assertFalse(result.valid(), "Should fail when both region and stream are missing"); + // Should fail on first validation check (region) + assertThat(result.message()).isEqualTo("Region must be specified"); + } + + // ==================== ENDPOINT TESTS ==================== + + @Test + @DisplayName("Should work without endpoint (uses default AWS)") + void shouldWorkWithoutEndpoint() { + Map config = new HashMap<>(); + config.put("region", ConfigProvider.getConfig().getValue("kinesis.region", String.class)); + config.put("stream", "non-existent-stream"); + // endpoint is not provided - will try to connect to real AWS + + Connection conn = new TestConnectionView(ConnectionEntity.Type.AMAZON_KINESIS, config); + ConnectionValidationResult result = validator.validate(conn); + + // Will fail because stream doesn't exist in real AWS, but proves endpoint is optional + assertFalse(result.valid()); + // The error could be access denied or stream not found depending on credentials + assertTrue(result.message().contains("Stream not found") || + result.message().contains("Access denied") || + result.message().contains("Client error") || + result.message().contains("Failed to validate"), + "Should attempt connection to AWS without custom endpoint"); + } + + @Test + @DisplayName("Should fail with malformed endpoint URL") + void shouldFailWithMalformedEndpoint() { + Connection conn = new TestConnectionView(ConnectionEntity.Type.AMAZON_KINESIS, Map.of( + "endpoint", "not-a-valid-url", + "region", ConfigProvider.getConfig().getValue("kinesis.region", String.class), + "stream", ConfigProvider.getConfig().getValue("kinesis.streamName", String.class))); + + ConnectionValidationResult result = validator.validate(conn); + + assertFalse(result.valid(), "Should fail with malformed endpoint"); + assertTrue(result.message().contains("Failed to validate") || + result.message().contains("Client error"), + "Error message should indicate connection failure"); + } + + // ==================== OPTIONAL FIELDS TESTS ==================== + + @Test + @DisplayName("Should work without optional partitionKey field") + void shouldWorkWithoutPartitionKey() { + Map config = new HashMap<>(); + config.put("endpoint", ConfigProvider.getConfig().getValue("kinesis.endpoint", String.class)); + config.put("region", ConfigProvider.getConfig().getValue("kinesis.region", String.class)); + config.put("stream", ConfigProvider.getConfig().getValue("kinesis.streamName", String.class)); + // partitionKey is optional and not provided + + Connection conn = new TestConnectionView(ConnectionEntity.Type.AMAZON_KINESIS, config); + ConnectionValidationResult result = validator.validate(conn); + + assertTrue(result.valid(), "Should work without optional partitionKey"); + } + + @Test + @DisplayName("Should work with partitionKey provided") + void shouldWorkWithPartitionKey() { + Map config = new HashMap<>(); + config.put("endpoint", ConfigProvider.getConfig().getValue("kinesis.endpoint", String.class)); + config.put("region", ConfigProvider.getConfig().getValue("kinesis.region", String.class)); + config.put("stream", ConfigProvider.getConfig().getValue("kinesis.streamName", String.class)); + config.put("partitionKey", "custom-partition-key"); + + Connection conn = new TestConnectionView(ConnectionEntity.Type.AMAZON_KINESIS, config); + ConnectionValidationResult result = validator.validate(conn); + + assertTrue(result.valid(), "Should work with partitionKey provided"); + } + + // ==================== EDGE CASES ==================== + + @Test + @DisplayName("Should handle stream name with special characters") + void shouldHandleStreamNameWithSpecialCharacters() { + // Note: Kinesis stream names can contain alphanumeric, underscore, hyphen, and period + Connection conn = new TestConnectionView(ConnectionEntity.Type.AMAZON_KINESIS, Map.of( + "endpoint", ConfigProvider.getConfig().getValue("kinesis.endpoint", String.class), + "region", ConfigProvider.getConfig().getValue("kinesis.region", String.class), + "stream", "test-stream_123.ABC")); + + ConnectionValidationResult result = validator.validate(conn); + + // Will fail because stream doesn't exist, but validates special chars are handled + assertFalse(result.valid()); + assertThat(result.message()).contains("Stream not found"); + } + + @Test + @DisplayName("Should handle null connection config") + void shouldHandleNullConnectionConfig() { + ConnectionValidationResult result = validator.validate(null); + + assertFalse(result.valid(), "Should handle null connection gracefully"); + assertThat(result.message()).isEqualTo("Connection configuration cannot be null"); + } +} diff --git a/debezium-platform-conductor/src/test/java/io/debezium/platform/environment/connection/KinesisConnectionValidatorTest.java b/debezium-platform-conductor/src/test/java/io/debezium/platform/environment/connection/KinesisConnectionValidatorTest.java new file mode 100644 index 00000000..dd8447f0 --- /dev/null +++ b/debezium-platform-conductor/src/test/java/io/debezium/platform/environment/connection/KinesisConnectionValidatorTest.java @@ -0,0 +1,240 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ + +package io.debezium.platform.environment.connection; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.HashMap; +import java.util.Map; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +import io.debezium.platform.data.dto.ConnectionValidationResult; +import io.debezium.platform.data.model.ConnectionEntity; +import io.debezium.platform.domain.views.Connection; +import io.debezium.platform.environment.connection.destination.KinesisConnectionValidator; + +/** + * Unit tests for {@link KinesisConnectionValidator}. + * Tests configuration validation logic without actual AWS connectivity. + * + * @author Pranav Tiwari + */ +class KinesisConnectionValidatorTest { + + private static final int DEFAULT_TIMEOUT = 30; + private KinesisConnectionValidator validator; + + @BeforeEach + void setUp() { + validator = new KinesisConnectionValidator(DEFAULT_TIMEOUT); + } + + // ==================== NULL AND EMPTY CONFIGURATION TESTS ==================== + + @Test + @DisplayName("Should fail validation when connection config is null") + void shouldFailValidationWhenConnectionConfigIsNull() { + ConnectionValidationResult result = validator.validate(null); + + assertFalse(result.valid(), "Validation should fail for null connection config"); + assertEquals("Connection configuration cannot be null", result.message()); + } + + // ==================== REGION VALIDATION TESTS ==================== + + @Test + @DisplayName("Should fail validation when region is missing") + void shouldFailValidationWhenRegionIsMissing() { + Map config = new HashMap<>(); + config.put("stream", "test-stream"); + + Connection connection = new TestConnectionView(ConnectionEntity.Type.AMAZON_KINESIS, config); + ConnectionValidationResult result = validator.validate(connection); + + assertFalse(result.valid(), "Validation should fail when region is missing"); + assertEquals("Region must be specified", result.message()); + } + + @Test + @DisplayName("Should fail validation when region is null") + void shouldFailValidationWhenRegionIsNull() { + Map config = new HashMap<>(); + config.put("region", null); + config.put("stream", "test-stream"); + + Connection connection = new TestConnectionView(ConnectionEntity.Type.AMAZON_KINESIS, config); + ConnectionValidationResult result = validator.validate(connection); + + assertFalse(result.valid(), "Validation should fail when region is null"); + assertEquals("Region must be specified", result.message()); + } + + @Test + @DisplayName("Should fail validation when region is empty string") + void shouldFailValidationWhenRegionIsEmpty() { + Map config = new HashMap<>(); + config.put("region", ""); + config.put("stream", "test-stream"); + + Connection connection = new TestConnectionView(ConnectionEntity.Type.AMAZON_KINESIS, config); + ConnectionValidationResult result = validator.validate(connection); + + assertFalse(result.valid(), "Validation should fail when region is empty"); + assertEquals("Region must be specified", result.message()); + } + + @Test + @DisplayName("Should fail validation when region is only whitespace") + void shouldFailValidationWhenRegionIsWhitespace() { + Map config = new HashMap<>(); + config.put("region", " "); + config.put("stream", "test-stream"); + + Connection connection = new TestConnectionView(ConnectionEntity.Type.AMAZON_KINESIS, config); + ConnectionValidationResult result = validator.validate(connection); + + assertFalse(result.valid(), "Validation should fail when region is only whitespace"); + assertEquals("Region must be specified", result.message()); + } + + // ==================== STREAM NAME VALIDATION TESTS ==================== + + @Test + @DisplayName("Should fail validation when stream name is missing") + void shouldFailValidationWhenStreamNameIsMissing() { + Map config = new HashMap<>(); + config.put("region", "us-east-1"); + + Connection connection = new TestConnectionView(ConnectionEntity.Type.AMAZON_KINESIS, config); + ConnectionValidationResult result = validator.validate(connection); + + assertFalse(result.valid(), "Validation should fail when stream name is missing"); + assertEquals("Stream name must be specified", result.message()); + } + + @Test + @DisplayName("Should fail validation when stream name is null") + void shouldFailValidationWhenStreamNameIsNull() { + Map config = new HashMap<>(); + config.put("region", "us-east-1"); + config.put("stream", null); + + Connection connection = new TestConnectionView(ConnectionEntity.Type.AMAZON_KINESIS, config); + ConnectionValidationResult result = validator.validate(connection); + + assertFalse(result.valid(), "Validation should fail when stream name is null"); + assertEquals("Stream name must be specified", result.message()); + } + + @Test + @DisplayName("Should fail validation when stream name is empty string") + void shouldFailValidationWhenStreamNameIsEmpty() { + Map config = new HashMap<>(); + config.put("region", "us-east-1"); + config.put("stream", ""); + + Connection connection = new TestConnectionView(ConnectionEntity.Type.AMAZON_KINESIS, config); + ConnectionValidationResult result = validator.validate(connection); + + assertFalse(result.valid(), "Validation should fail when stream name is empty"); + assertEquals("Stream name must be specified", result.message()); + } + + @Test + @DisplayName("Should fail validation when stream name is only whitespace") + void shouldFailValidationWhenStreamNameIsWhitespace() { + Map config = new HashMap<>(); + config.put("region", "us-east-1"); + config.put("stream", " "); + + Connection connection = new TestConnectionView(ConnectionEntity.Type.AMAZON_KINESIS, config); + ConnectionValidationResult result = validator.validate(connection); + + assertFalse(result.valid(), "Validation should fail when stream name is only whitespace"); + assertEquals("Stream name must be specified", result.message()); + } + + // ==================== BOTH MISSING TESTS ==================== + + @Test + @DisplayName("Should fail validation when both region and stream are missing") + void shouldFailValidationWhenBothRegionAndStreamAreMissing() { + Map config = new HashMap<>(); + + Connection connection = new TestConnectionView(ConnectionEntity.Type.AMAZON_KINESIS, config); + ConnectionValidationResult result = validator.validate(connection); + + assertFalse(result.valid(), "Validation should fail when both region and stream are missing"); + // Should fail on first check (region) + assertEquals("Region must be specified", result.message()); + } + + // ==================== OPTIONAL FIELDS TESTS ==================== + + @Test + @DisplayName("Should handle optional endpoint field when not provided") + void shouldHandleMissingEndpoint() { + Map config = new HashMap<>(); + config.put("region", "us-east-1"); + config.put("stream", "non-existent-stream"); + // endpoint is optional, not included + + Connection connection = new TestConnectionView(ConnectionEntity.Type.AMAZON_KINESIS, config); + ConnectionValidationResult result = validator.validate(connection); + + // Should use default AWS endpoint + assertFalse(result.valid()); + // Will fail trying to connect to real AWS (no credentials in unit test) + assertTrue(result.message().contains("Failed to validate") || + result.message().contains("Client error") || + result.message().contains("Access denied") || + result.message().contains("Stream not found"), + "Should attempt connection to AWS without custom endpoint"); + } + + // ==================== EDGE CASES ==================== + + @Test + @DisplayName("Should handle very long stream name") + void shouldHandleVeryLongStreamName() { + Map config = new HashMap<>(); + config.put("region", "us-east-1"); + config.put("stream", "a".repeat(1000)); // Very long string + + Connection connection = new TestConnectionView(ConnectionEntity.Type.AMAZON_KINESIS, config); + ConnectionValidationResult result = validator.validate(connection); + + assertFalse(result.valid(), "Should handle very long stream name gracefully"); + assertTrue(result.message().contains("Failed to validate") || + result.message().contains("Stream not found") || + result.message().contains("Client error"), + "Should fail with appropriate error"); + } + + @Test + @DisplayName("Should handle special characters in stream name") + void shouldHandleSpecialCharactersInStreamName() { + Map config = new HashMap<>(); + config.put("region", "us-east-1"); + config.put("stream", "test-stream-123_ABC.xyz"); + + Connection connection = new TestConnectionView(ConnectionEntity.Type.AMAZON_KINESIS, config); + ConnectionValidationResult result = validator.validate(connection); + + assertFalse(result.valid(), "Should process special characters in stream name"); + // Will fail trying to connect (no such stream), but validates the name format is handled + assertTrue(result.message().contains("Failed to validate") || + result.message().contains("Stream not found") || + result.message().contains("Client error"), + "Should attempt validation with special characters"); + } +} diff --git a/debezium-platform-conductor/src/test/java/io/debezium/platform/environment/database/db/KinesisTestResource.java b/debezium-platform-conductor/src/test/java/io/debezium/platform/environment/database/db/KinesisTestResource.java new file mode 100644 index 00000000..f80655e9 --- /dev/null +++ b/debezium-platform-conductor/src/test/java/io/debezium/platform/environment/database/db/KinesisTestResource.java @@ -0,0 +1,160 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.platform.environment.database.db; + +import java.net.URI; +import java.util.Map; + +import org.testcontainers.containers.localstack.LocalStackContainer; +import org.testcontainers.utility.DockerImageName; + +import io.quarkus.test.common.QuarkusTestResourceLifecycleManager; + +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.kinesis.KinesisClient; +import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest; +import software.amazon.awssdk.services.kinesis.model.ResourceInUseException; + +/** + * Testcontainers resource for LocalStack (AWS local emulator) with Kinesis service. + *

+ * This resource manages the lifecycle of a LocalStack container that emulates AWS Kinesis + * for integration testing purposes. It provides a local Kinesis instance without requiring + * real AWS credentials or incurring AWS costs. + *

+ * + *

+ * The resource performs the following setup: + *

    + *
  • Starts a LocalStack Docker container with Kinesis service enabled
  • + *
  • Creates a test Kinesis stream named "test-stream" with 1 shard
  • + *
  • Waits for the stream to become ACTIVE before tests run (up to 30 seconds)
  • + *
  • Provides configuration properties for connecting to the local endpoint
  • + *
+ *

+ * + *

+ * Configuration properties exposed to tests: + *

    + *
  • kinesis.endpoint - LocalStack Kinesis endpoint URL
  • + *
  • kinesis.region - AWS region (e.g., us-east-1)
  • + *
  • kinesis.streamName - The pre-created test stream name
  • + *
+ *

+ * + *

+ * Usage in integration tests: + *

+ * {@code
+ * @QuarkusTest
+ * @QuarkusTestResource(KinesisTestResource.class)
+ * public class KinesisConnectionValidatorIT {
+ *     @Test
+ *     void testKinesisConnection() {
+ *         String endpoint = ConfigProvider.getConfig().getValue("kinesis.endpoint", String.class);
+ *         String region = ConfigProvider.getConfig().getValue("kinesis.region", String.class);
+ *         String stream = ConfigProvider.getConfig().getValue("kinesis.streamName", String.class);
+ *         // Use these properties to test Kinesis connection
+ *     }
+ * }
+ * }
+ * 
+ *

+ * + * @author Pranav Kumar Tiwari + */ +public class KinesisTestResource implements QuarkusTestResourceLifecycleManager { + + private LocalStackContainer localStack; + private final String streamName = "test-stream"; + + /** + * Starts the LocalStack container and creates a test Kinesis stream. + * This method is called before any tests run. + * + * @return Map of configuration properties to inject into test context + */ + @Override + public Map start() { + // Start LocalStack container with Kinesis service + localStack = new LocalStackContainer(DockerImageName.parse("localstack/localstack:latest")) + .withServices(LocalStackContainer.Service.KINESIS); + localStack.start(); + + // Get LocalStack endpoint and region + String endpoint = localStack.getEndpointOverride(LocalStackContainer.Service.KINESIS).toString(); + String region = localStack.getRegion(); + + // Create Kinesis client pointing to LocalStack + KinesisClient kinesisClient = KinesisClient.builder() + .endpointOverride(URI.create(endpoint)) + .region(Region.of(region)) + .credentialsProvider( + StaticCredentialsProvider.create( + AwsBasicCredentials.create("accessKey", "secretKey"))) // LocalStack dummy credentials + .build(); + + // Create a test stream for integration tests + try { + kinesisClient.createStream(CreateStreamRequest.builder() + .streamName(streamName) + .shardCount(1) + .build()); + } + catch (ResourceInUseException e) { + // Stream already exists, ignore + } + + // Wait for stream to become ACTIVE before tests run + waitForStreamToBecomeActive(kinesisClient, streamName); + + // Return configuration properties for tests to use + return Map.of( + "kinesis.endpoint", endpoint, + "kinesis.region", region, + "kinesis.streamName", "test-stream"); + } + + /** + * Stops the LocalStack container. + * This method is called after all tests have completed. + */ + @Override + public void stop() { + if (localStack != null) { + localStack.stop(); + } + } + + /** + * Waits for the Kinesis stream to reach ACTIVE status. + * This is necessary because stream creation is asynchronous and tests + * will fail if they try to use a stream that isn't ready yet. + * + * @param client the Kinesis client to use for status checks + * @param streamName the name of the stream to wait for + * @throws RuntimeException if the stream doesn't become ACTIVE within 30 seconds + */ + private void waitForStreamToBecomeActive(KinesisClient client, String streamName) { + // Poll stream status every second for up to 30 seconds + for (int i = 0; i < 30; i++) { + var desc = client.describeStream(b -> b.streamName(streamName)); + if ("ACTIVE".equals(desc.streamDescription().streamStatusAsString())) { + return; // Stream is ready + } + try { + Thread.sleep(1000); // Wait 1 second before next check + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return; + } + } + throw new RuntimeException("Stream did not become ACTIVE within timeout"); + } +} From 02f731aa223c5e2d6c8297ab65879d701fcbec28 Mon Sep 17 00:00:00 2001 From: Pranav Tiwari Date: Sun, 12 Oct 2025 11:54:54 +0530 Subject: [PATCH 6/6] DBZ-9439- Added integeration tests. Signed-off-by: pranavt84 --- ...or.java => AmazonKinesisConnectionValidator.java} | 8 ++++---- ....java => AmazonKinesisConnectionValidatorIT.java} | 12 ++++++------ ...ava => AmazonKinesisConnectionValidatorTest.java} | 10 +++++----- ...tResource.java => AmazonKinesisTestResource.java} | 2 +- 4 files changed, 16 insertions(+), 16 deletions(-) rename debezium-platform-conductor/src/main/java/io/debezium/platform/environment/connection/destination/{KinesisConnectionValidator.java => AmazonKinesisConnectionValidator.java} (94%) rename debezium-platform-conductor/src/test/java/io/debezium/platform/environment/connection/{KinesisConnectionValidatorIT.java => AmazonKinesisConnectionValidatorIT.java} (97%) rename debezium-platform-conductor/src/test/java/io/debezium/platform/environment/connection/{KinesisConnectionValidatorTest.java => AmazonKinesisConnectionValidatorTest.java} (96%) rename debezium-platform-conductor/src/test/java/io/debezium/platform/environment/database/db/{KinesisTestResource.java => AmazonKinesisTestResource.java} (98%) diff --git a/debezium-platform-conductor/src/main/java/io/debezium/platform/environment/connection/destination/KinesisConnectionValidator.java b/debezium-platform-conductor/src/main/java/io/debezium/platform/environment/connection/destination/AmazonKinesisConnectionValidator.java similarity index 94% rename from debezium-platform-conductor/src/main/java/io/debezium/platform/environment/connection/destination/KinesisConnectionValidator.java rename to debezium-platform-conductor/src/main/java/io/debezium/platform/environment/connection/destination/AmazonKinesisConnectionValidator.java index 7f2e952d..a9718aa8 100644 --- a/debezium-platform-conductor/src/main/java/io/debezium/platform/environment/connection/destination/KinesisConnectionValidator.java +++ b/debezium-platform-conductor/src/main/java/io/debezium/platform/environment/connection/destination/AmazonKinesisConnectionValidator.java @@ -31,9 +31,9 @@ */ @ApplicationScoped @Named("AMAZON_KINESIS") -public class KinesisConnectionValidator implements ConnectionValidator { +public class AmazonKinesisConnectionValidator implements ConnectionValidator { - private static final Logger LOGGER = LoggerFactory.getLogger(KinesisConnectionValidator.class); + private static final Logger LOGGER = LoggerFactory.getLogger(AmazonKinesisConnectionValidator.class); private static final String REGION_KEY = "region"; private static final String STREAM_NAME_KEY = "stream"; @@ -43,8 +43,8 @@ public class KinesisConnectionValidator implements ConnectionValidator { private final int defaultTimeout; - public KinesisConnectionValidator( - @ConfigProperty(name = "destinations.kinesis.connection.timeout") int defaultTimeout) { + public AmazonKinesisConnectionValidator( + @ConfigProperty(name = "destinations.kinesis.connection.timeout") int defaultTimeout) { this.defaultTimeout = defaultTimeout; } diff --git a/debezium-platform-conductor/src/test/java/io/debezium/platform/environment/connection/KinesisConnectionValidatorIT.java b/debezium-platform-conductor/src/test/java/io/debezium/platform/environment/connection/AmazonKinesisConnectionValidatorIT.java similarity index 97% rename from debezium-platform-conductor/src/test/java/io/debezium/platform/environment/connection/KinesisConnectionValidatorIT.java rename to debezium-platform-conductor/src/test/java/io/debezium/platform/environment/connection/AmazonKinesisConnectionValidatorIT.java index fcbcb2df..7bc57dd7 100644 --- a/debezium-platform-conductor/src/test/java/io/debezium/platform/environment/connection/KinesisConnectionValidatorIT.java +++ b/debezium-platform-conductor/src/test/java/io/debezium/platform/environment/connection/AmazonKinesisConnectionValidatorIT.java @@ -21,23 +21,23 @@ import io.debezium.platform.data.dto.ConnectionValidationResult; import io.debezium.platform.data.model.ConnectionEntity; import io.debezium.platform.domain.views.Connection; -import io.debezium.platform.environment.connection.destination.KinesisConnectionValidator; -import io.debezium.platform.environment.database.db.KinesisTestResource; +import io.debezium.platform.environment.connection.destination.AmazonKinesisConnectionValidator; +import io.debezium.platform.environment.database.db.AmazonKinesisTestResource; import io.quarkus.test.common.QuarkusTestResource; import io.quarkus.test.junit.QuarkusTest; /** - * Integration tests for {@link KinesisConnectionValidator}. + * Integration tests for {@link AmazonKinesisConnectionValidator}. * Tests the validator against a real LocalStack container running Kinesis. * * @author Pranav Tiwari */ @QuarkusTest -@QuarkusTestResource(KinesisTestResource.class) -public class KinesisConnectionValidatorIT { +@QuarkusTestResource(AmazonKinesisTestResource.class) +public class AmazonKinesisConnectionValidatorIT { @Inject - KinesisConnectionValidator validator; + AmazonKinesisConnectionValidator validator; // ==================== SUCCESSFUL CONNECTION TESTS ==================== diff --git a/debezium-platform-conductor/src/test/java/io/debezium/platform/environment/connection/KinesisConnectionValidatorTest.java b/debezium-platform-conductor/src/test/java/io/debezium/platform/environment/connection/AmazonKinesisConnectionValidatorTest.java similarity index 96% rename from debezium-platform-conductor/src/test/java/io/debezium/platform/environment/connection/KinesisConnectionValidatorTest.java rename to debezium-platform-conductor/src/test/java/io/debezium/platform/environment/connection/AmazonKinesisConnectionValidatorTest.java index dd8447f0..25305912 100644 --- a/debezium-platform-conductor/src/test/java/io/debezium/platform/environment/connection/KinesisConnectionValidatorTest.java +++ b/debezium-platform-conductor/src/test/java/io/debezium/platform/environment/connection/AmazonKinesisConnectionValidatorTest.java @@ -20,22 +20,22 @@ import io.debezium.platform.data.dto.ConnectionValidationResult; import io.debezium.platform.data.model.ConnectionEntity; import io.debezium.platform.domain.views.Connection; -import io.debezium.platform.environment.connection.destination.KinesisConnectionValidator; +import io.debezium.platform.environment.connection.destination.AmazonKinesisConnectionValidator; /** - * Unit tests for {@link KinesisConnectionValidator}. + * Unit tests for {@link AmazonKinesisConnectionValidator}. * Tests configuration validation logic without actual AWS connectivity. * * @author Pranav Tiwari */ -class KinesisConnectionValidatorTest { +class AmazonKinesisConnectionValidatorTest { private static final int DEFAULT_TIMEOUT = 30; - private KinesisConnectionValidator validator; + private AmazonKinesisConnectionValidator validator; @BeforeEach void setUp() { - validator = new KinesisConnectionValidator(DEFAULT_TIMEOUT); + validator = new AmazonKinesisConnectionValidator(DEFAULT_TIMEOUT); } // ==================== NULL AND EMPTY CONFIGURATION TESTS ==================== diff --git a/debezium-platform-conductor/src/test/java/io/debezium/platform/environment/database/db/KinesisTestResource.java b/debezium-platform-conductor/src/test/java/io/debezium/platform/environment/database/db/AmazonKinesisTestResource.java similarity index 98% rename from debezium-platform-conductor/src/test/java/io/debezium/platform/environment/database/db/KinesisTestResource.java rename to debezium-platform-conductor/src/test/java/io/debezium/platform/environment/database/db/AmazonKinesisTestResource.java index f80655e9..5cd4bdae 100644 --- a/debezium-platform-conductor/src/test/java/io/debezium/platform/environment/database/db/KinesisTestResource.java +++ b/debezium-platform-conductor/src/test/java/io/debezium/platform/environment/database/db/AmazonKinesisTestResource.java @@ -68,7 +68,7 @@ * * @author Pranav Kumar Tiwari */ -public class KinesisTestResource implements QuarkusTestResourceLifecycleManager { +public class AmazonKinesisTestResource implements QuarkusTestResourceLifecycleManager { private LocalStackContainer localStack; private final String streamName = "test-stream";