diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/DlqDeserializationExceptionHandler.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/DlqDeserializationExceptionHandler.java index c5cd580d..835c6cc2 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/DlqDeserializationExceptionHandler.java +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/DlqDeserializationExceptionHandler.java @@ -40,22 +40,23 @@ /** The class managing deserialization exceptions. */ @Slf4j public class DlqDeserializationExceptionHandler extends DlqExceptionHandler implements DeserializationExceptionHandler { - private boolean handleSchemaRegistryRestException; - private boolean continueOnUnhandledErrors; /** Constructor. */ public DlqDeserializationExceptionHandler() {} /** - * Handles deserialization errors by routing records to the DLQ and deciding whether to continue or fail processing - * based on configured rules. + * Handles deserialization exceptions by routing the record to the DLQ topic. + * + * @param context The error handler context + * @param consumerRecord The record that failed deserialization + * @param exception The exception that occurred + * @return A {@link Response} indicating how to proceed */ @Override public Response handleError( ErrorHandlerContext context, ConsumerRecord consumerRecord, Exception exception) { - log.warn( - "Exception during Deserialization, processor node: {}, taskId: {}, topic: {}, partition: {}, offset: {}", + "Exception during deserialization, processor node: {}, taskId: {}, topic: {}, partition: {}, offset: {}", context.processorNodeId(), context.taskId(), context.topic(), @@ -64,34 +65,59 @@ public Response handleError( exception); if (isDlqNotDefined()) { - log.warn("Failed to route deserialization error to DLQ."); + log.warn("Failed to route deserialization error to DLQ. Define a DLQ topic in configuration."); + return Response.fail(); + } + + if (shouldNotResume(exception)) { return Response.fail(); } try { KafkaError error = buildKafkaError(context, consumerRecord, exception); - byte[] value = serializeError(error); - return shouldResume(exception) ? resumeWithDlqRecord(consumerRecord, value) : Response.fail(); + Serde serde = SerdesUtils.getValueSerdes(); + byte[] value = serde.serializer().serialize(KafkaStreamsExecutionContext.getDlqTopicName(), error); + return Response.resume(List.of( + new ProducerRecord<>(KafkaStreamsExecutionContext.getDlqTopicName(), consumerRecord.key(), value))); } catch (Exception e) { - log.error("Cannot send deserialization exception to DLQ topic {}", deadLetterQueueTopic, e); + log.error( + "Cannot send deserialization exception to DLQ topic {}", + KafkaStreamsExecutionContext.getDlqTopicName(), + e); return Response.fail(); } } - /** Determines if the exception should be handled by continuing processing based on known handled scenarios. */ - private boolean shouldResume(Exception exception) { + /** + * Determines whether processing should not resume after the exception. + * + * @param exception The exception that occurred + * @return {@code true} if processing should not resume, {@code false} otherwise + */ + private boolean shouldNotResume(Exception exception) { Throwable cause = exception.getCause() != null ? exception.getCause() : exception; boolean isCausedByKafka = cause instanceof KafkaException; boolean isRestClientSchemaRegistryException = cause instanceof RestClientException; + boolean handleSchemaRegistryRestException = KafkaStreamsExecutionContext.isDlqFeatureEnabled( + DLQ_DESERIALIZATION_HANDLER_FORWARD_REST_CLIENT_EXCEPTION); + boolean continueOnUnhandledErrors = KafkaStreamsExecutionContext.isDlqFeatureEnabled( + KstreamplifyConfig.DLQ_DESERIALIZATION_HANDLER_CONTINUE_ON_UNHANDLED_ERRORS); - return isCausedByKafka + return !(isCausedByKafka || cause == null || (isRestClientSchemaRegistryException && handleSchemaRegistryRestException) - || continueOnUnhandledErrors; + || continueOnUnhandledErrors); } - /** Builds a KafkaError enriched with record metadata and exception details for DLQ publishing. */ + /** + * Builds a {@link KafkaError} from the record metadata and exception details. + * + * @param context The error handler context + * @param consumerRecord The record that failed deserialization + * @param exception The exception that occurred + * @return The built {@link KafkaError} + */ private KafkaError buildKafkaError( ErrorHandlerContext context, ConsumerRecord consumerRecord, Exception exception) { @@ -111,24 +137,13 @@ private KafkaError buildKafkaError( .build(); } - /** Serializes the KafkaError into a byte array for DLQ topic. */ - private byte[] serializeError(KafkaError error) { - Serde serde = SerdesUtils.getValueSerdes(); - return serde.serializer().serialize(deadLetterQueueTopic, error); - } - - /** Creates a DLQ record and returns a resume response to continue processing. */ - private Response resumeWithDlqRecord(ConsumerRecord consumerRecord, byte[] value) { - return Response.resume(List.of(new ProducerRecord<>(deadLetterQueueTopic, consumerRecord.key(), value))); - } - - /** {@inheritDoc} */ + /** + * Configures the handler. + * + * @param configs The configuration map + */ @Override public void configure(Map configs) { - deadLetterQueueTopic = KafkaStreamsExecutionContext.getDlqTopicName(); - handleSchemaRegistryRestException = KafkaStreamsExecutionContext.isDlqFeatureEnabled( - DLQ_DESERIALIZATION_HANDLER_FORWARD_REST_CLIENT_EXCEPTION); - continueOnUnhandledErrors = KafkaStreamsExecutionContext.isDlqFeatureEnabled( - KstreamplifyConfig.DLQ_DESERIALIZATION_HANDLER_CONTINUE_ON_UNHANDLED_ERRORS); + // Do nothing } } diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/DlqExceptionHandler.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/DlqExceptionHandler.java index 6aee746f..db4a0daf 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/DlqExceptionHandler.java +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/DlqExceptionHandler.java @@ -19,6 +19,7 @@ package com.michelin.kstreamplify.error; import com.michelin.kstreamplify.avro.KafkaError; +import com.michelin.kstreamplify.context.KafkaStreamsExecutionContext; import java.io.PrintWriter; import java.io.StringWriter; import java.nio.ByteBuffer; @@ -29,8 +30,6 @@ /** The class to manage DLQ exception. */ @Slf4j public abstract class DlqExceptionHandler { - /** The DLQ topic */ - protected String deadLetterQueueTopic; /** Constructor. */ protected DlqExceptionHandler() {} @@ -46,7 +45,6 @@ protected DlqExceptionHandler() {} */ protected KafkaError.Builder enrichWithException( KafkaError.Builder builder, Exception exception, byte[] key, byte[] value) { - StringWriter sw = new StringWriter(); PrintWriter pw = new PrintWriter(sw); exception.printStackTrace(pw); @@ -70,6 +68,6 @@ protected KafkaError.Builder enrichWithException( * @return {@code true} if the dead letter queue topic is not defined, {@code false} otherwise */ protected boolean isDlqNotDefined() { - return StringUtils.isBlank(deadLetterQueueTopic); + return StringUtils.isBlank(KafkaStreamsExecutionContext.getDlqTopicName()); } } diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/DlqProcessingExceptionHandler.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/DlqProcessingExceptionHandler.java index 21320d44..8f034794 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/DlqProcessingExceptionHandler.java +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/DlqProcessingExceptionHandler.java @@ -40,10 +40,18 @@ public class DlqProcessingExceptionHandler extends DlqExceptionHandler implement /** Constructor. */ public DlqProcessingExceptionHandler() {} + /** + * Handles processing exceptions by routing the record to the DLQ topic. + * + * @param context The error handler context + * @param processingRecord The record that failed processing + * @param exception The exception that occurred + * @return A {@link Response} indicating how to proceed + */ @Override public Response handleError(ErrorHandlerContext context, Record processingRecord, Exception exception) { log.warn( - "Exception during message Processing, processor node: {}, taskId: {}, topic: {}, partition: {}, offset: {}", + "Exception during processing, processor node: {}, taskId: {}, topic: {}, partition: {}, offset: {}", context.processorNodeId(), context.taskId(), context.topic(), @@ -52,56 +60,76 @@ public Response handleError(ErrorHandlerContext context, Record processing exception); if (isDlqNotDefined()) { - log.warn("Failed to route processing error to DLQ."); + log.warn("Failed to route processing error to DLQ. Define a DLQ topic in configuration."); return Response.fail(); } try { - KafkaError.Builder builder = KafkaError.newBuilder() - .setContextMessage( - "An exception occurred during the stream processing of a record. Please find more details about the exception in the cause and stack fields.") - .setOffset(context.offset()) - .setPartition(context.partition()) - .setTopic(context.topic()) - .setApplicationId( - KafkaStreamsExecutionContext.getProperties().getProperty(APPLICATION_ID_CONFIG)) - .setProcessorNodeId(context.processorNodeId()) - .setTaskId(context.taskId().toString()) - .setSourceRawKey(ByteBuffer.wrap(context.sourceRawKey())) - .setSourceRawValue(ByteBuffer.wrap(context.sourceRawValue())) - .setValue( - processingRecord.value() == null - ? null - : processingRecord.value().toString()); - - KafkaError error = enrichWithException( - builder, - exception, - processingRecord.key() != null - ? processingRecord.key().toString().getBytes() - : null, - processingRecord.value() != null - ? processingRecord.value().toString().getBytes() - : null) - .build(); - + KafkaError error = buildKafkaError(context, processingRecord, exception); Serde serde = SerdesUtils.getValueSerdes(); - byte[] value = serde.serializer().serialize(deadLetterQueueTopic, error); + byte[] value = serde.serializer().serialize(KafkaStreamsExecutionContext.getDlqTopicName(), error); byte[] key = processingRecord.key() != null ? processingRecord.key().toString().getBytes() : null; - return Response.resume(List.of(new ProducerRecord<>(deadLetterQueueTopic, key, value))); + return Response.resume( + List.of(new ProducerRecord<>(KafkaStreamsExecutionContext.getDlqTopicName(), key, value))); } catch (Exception e) { - log.error("Cannot send processing exception to DLQ topic {}", deadLetterQueueTopic, e); + log.error( + "Cannot send processing exception to DLQ topic {}", + KafkaStreamsExecutionContext.getDlqTopicName(), + e); return Response.fail(); } } - /** {@inheritDoc} */ + /** + * Builds a {@link KafkaError} from the record metadata and exception details. + * + * @param context The error handler context + * @param processingRecord The record that failed processing + * @param exception The exception that occurred + * @return The built {@link KafkaError} + */ + private KafkaError buildKafkaError( + ErrorHandlerContext context, Record processingRecord, Exception exception) { + + KafkaError.Builder builder = KafkaError.newBuilder() + .setContextMessage( + "An exception occurred during the stream processing of a record. Please find more details about the exception in the cause and stack fields.") + .setOffset(context.offset()) + .setPartition(context.partition()) + .setTopic(context.topic()) + .setApplicationId(KafkaStreamsExecutionContext.getProperties().getProperty(APPLICATION_ID_CONFIG)) + .setProcessorNodeId(context.processorNodeId()) + .setTaskId(context.taskId().toString()) + .setSourceRawKey(ByteBuffer.wrap(context.sourceRawKey())) + .setSourceRawValue(ByteBuffer.wrap(context.sourceRawValue())) + .setValue( + processingRecord.value() == null + ? null + : processingRecord.value().toString()); + + return enrichWithException( + builder, + exception, + processingRecord.key() != null + ? processingRecord.key().toString().getBytes() + : null, + processingRecord.value() != null + ? processingRecord.value().toString().getBytes() + : null) + .build(); + } + + /** + * Configures the handler. + * + * @param configs The configuration map + */ @Override public void configure(Map configs) { - deadLetterQueueTopic = KafkaStreamsExecutionContext.getDlqTopicName(); + // Do nothing } } diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/DlqProductionExceptionHandler.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/DlqProductionExceptionHandler.java index b24d7856..155bf925 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/DlqProductionExceptionHandler.java +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/DlqProductionExceptionHandler.java @@ -33,32 +33,26 @@ import org.apache.kafka.streams.errors.ErrorHandlerContext; import org.apache.kafka.streams.errors.ProductionExceptionHandler; -/** - * Production exception handler that routes failed records to a Dead Letter Queue (DLQ) topic. Handles both general - * production errors and serialization errors by wrapping failed records in a {@link KafkaError} and sending them to the - * configured DLQ topic. - */ +/** The class managing production exceptions. */ @Slf4j public class DlqProductionExceptionHandler extends DlqExceptionHandler implements ProductionExceptionHandler { - private boolean continueOnSerializationException; /** Constructor. */ public DlqProductionExceptionHandler() {} /** - * Handles production exceptions by routing failed records to the DLQ topic. Returns FAIL if no DLQ is configured, - * RETRY for retriable exceptions, or RESUME after sending to DLQ. + * Handles production exceptions by routing the record to the DLQ topic. * - * @param context the error handler context with metadata about the error - * @param producerRecord the producer record that failed to be produced - * @param exception the exception that occurred during production - * @return a {@link Response} indicating how to proceed (FAIL, RETRY, or RESUME) + * @param context The error handler context + * @param producerRecord The record that failed production + * @param exception The exception that occurred + * @return A {@link Response} indicating how to proceed */ @Override public Response handleError( ErrorHandlerContext context, ProducerRecord producerRecord, Exception exception) { log.warn( - "Exception during message Production, processor node: {}, taskId: {}, source topic: {}, source partition: {}, source offset: {}", + "Exception during production, processor node: {}, taskId: {}, topic: {}, partition: {}, offset: {}", context.processorNodeId(), context.taskId(), context.topic(), @@ -66,34 +60,24 @@ public Response handleError( context.offset(), exception); + if (exception instanceof RetriableException) { + return Response.retry(); + } + if (isDlqNotDefined()) { log.warn("Failed to route production error to DLQ. Define a DLQ topic in configuration."); return Response.fail(); } - if (exception instanceof RetriableException) { - return Response.retry(); - } - try { - KafkaError.Builder builder = KafkaError.newBuilder() - .setContextMessage( - "An exception occurred during the stream internal production. Please find more details about the exception in the cause and stack fields.") - .setOffset(context.offset()) - .setPartition(context.partition()) - .setTopic(producerRecord.topic()) - .setApplicationId( - KafkaStreamsExecutionContext.getProperties().getProperty(APPLICATION_ID_CONFIG)) - .setProcessorNodeId(context.processorNodeId()) - .setTaskId(context.taskId().toString()); - - KafkaError error = enrichWithException(builder, exception, producerRecord.key(), producerRecord.value()) - .build(); + KafkaError error = buildKafkaError( + context, producerRecord.topic(), producerRecord.key(), producerRecord.value(), exception, null); Serde serde = SerdesUtils.getValueSerdes(); - byte[] value = serde.serializer().serialize(deadLetterQueueTopic, error); + byte[] value = serde.serializer().serialize(KafkaStreamsExecutionContext.getDlqTopicName(), error); - return Response.resume(List.of(new ProducerRecord<>(deadLetterQueueTopic, producerRecord.key(), value))); + return Response.resume(List.of( + new ProducerRecord<>(KafkaStreamsExecutionContext.getDlqTopicName(), producerRecord.key(), value))); } catch (Exception e) { log.error( "Cannot send production exception to DLQ topic {}", @@ -104,15 +88,13 @@ public Response handleError( } /** - * Handles serialization exceptions by routing failed records with raw bytes to the DLQ topic. Behavior is - * controlled by the {@code continueOnSerializationException} flag. Returns FAIL if the flag is disabled or no DLQ - * is configured, otherwise RESUME after sending to DLQ. + * Handles serialization exceptions by routing the record to the DLQ topic. * - * @param context the error handler context with access to source raw bytes - * @param record the producer record that failed serialization - * @param exception the serialization exception that occurred - * @param origin the origin of the serialization exception (KEY or VALUE) - * @return a {@link Response} indicating how to proceed (FAIL or RESUME) + * @param context The error handler context + * @param record The record that failed serialization + * @param exception The exception that occurred + * @param origin The origin of the serialization exception + * @return A {@link Response} indicating how to proceed */ @Override public Response handleSerializationError( @@ -121,7 +103,7 @@ public Response handleSerializationError( Exception exception, SerializationExceptionOrigin origin) { log.warn( - "Serialization exception during message Production, origin: {}, processor node: {}, taskId: {}, source topic: {}, source partition: {}, source offset: {}", + "Exception during serialization, origin: {}, processor node: {}, taskId: {}, topic: {}, partition: {}, offset: {}", origin, context.processorNodeId(), context.taskId(), @@ -130,53 +112,78 @@ public Response handleSerializationError( context.offset(), exception); - if (!continueOnSerializationException) { + if (isDlqNotDefined()) { + log.warn("Failed to route serialization error to DLQ. Define a DLQ topic in configuration."); return Response.fail(); } - if (isDlqNotDefined()) { - log.warn("Failed to route serialization error to DLQ. Define a DLQ topic in configuration."); + boolean continueOnSerializationException = KafkaStreamsExecutionContext.isDlqFeatureEnabled( + DLQ_PRODUCTION_HANDLER_CONTINUE_ON_SERIALIZATION_EXCEPTION); + if (!continueOnSerializationException) { return Response.fail(); } try { - KafkaError.Builder builder = KafkaError.newBuilder() - .setContextMessage("A serialization exception occurred during the stream internal production. " - + "Origin: " + origin + ". " - + "Please find more details about the exception in the cause and stack fields.") - .setOffset(context.offset()) - .setPartition(context.partition()) - .setTopic(record.topic()) - .setApplicationId( - KafkaStreamsExecutionContext.getProperties().getProperty(APPLICATION_ID_CONFIG)) - .setProcessorNodeId(context.processorNodeId()) - .setTaskId(context.taskId().toString()); - - KafkaError error = enrichWithException(builder, exception, context.sourceRawKey(), context.sourceRawValue()) - .build(); - + KafkaError error = buildKafkaError( + context, record.topic(), context.sourceRawKey(), context.sourceRawValue(), exception, origin); Serde serde = SerdesUtils.getValueSerdes(); - byte[] value = serde.serializer().serialize(deadLetterQueueTopic, error); - - return Response.resume(List.of(new ProducerRecord<>(deadLetterQueueTopic, context.sourceRawKey(), value))); + byte[] value = serde.serializer().serialize(KafkaStreamsExecutionContext.getDlqTopicName(), error); + return Response.resume(List.of(new ProducerRecord<>( + KafkaStreamsExecutionContext.getDlqTopicName(), context.sourceRawKey(), value))); } catch (Exception e) { log.error( "Cannot send serialization exception to DLQ topic {}", KafkaStreamsExecutionContext.getDlqTopicName(), e); - return Response.resume(); + return Response.fail(); } } /** - * Configures the handler with DLQ topic name and serialization exception handling flag. + * Builds a {@link KafkaError} from the record metadata and exception details. * - * @param configs the configuration map provided by Kafka Streams + * @param context The error handler context + * @param topic The topic of the record that failed + * @param key The raw key of the record that failed + * @param value The raw value of the record that failed + * @param exception The exception that occurred + * @param origin The origin of the serialization exception, or {@code null} for a production exception + * @return The built {@link KafkaError} + */ + private KafkaError buildKafkaError( + ErrorHandlerContext context, + String topic, + byte[] key, + byte[] value, + Exception exception, + SerializationExceptionOrigin origin) { + + String contextMessage = origin == null + ? "An exception occurred during the stream internal production. " + + "Please find more details about the exception in the cause and stack fields." + : "A serialization exception occurred during the stream internal production. " + + "Origin: " + origin + ". " + + "Please find more details about the exception in the cause and stack fields."; + + KafkaError.Builder builder = KafkaError.newBuilder() + .setContextMessage(contextMessage) + .setOffset(context.offset()) + .setPartition(context.partition()) + .setTopic(topic) + .setApplicationId(KafkaStreamsExecutionContext.getProperties().getProperty(APPLICATION_ID_CONFIG)) + .setProcessorNodeId(context.processorNodeId()) + .setTaskId(context.taskId().toString()); + + return enrichWithException(builder, exception, key, value).build(); + } + + /** + * Configures the handler. + * + * @param configs The configuration map */ @Override public void configure(Map configs) { - deadLetterQueueTopic = KafkaStreamsExecutionContext.getDlqTopicName(); - continueOnSerializationException = KafkaStreamsExecutionContext.isDlqFeatureEnabled( - DLQ_PRODUCTION_HANDLER_CONTINUE_ON_SERIALIZATION_EXCEPTION); + // Do nothing } } diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/GenericErrorProcessor.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/GenericErrorProcessor.java index eaa1ed0a..727e5af9 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/GenericErrorProcessor.java +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/GenericErrorProcessor.java @@ -34,7 +34,7 @@ class GenericErrorProcessor extends ContextualFixedKeyProcessor> fixedKeyRecord) { diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/error/DlqDeserializationExceptionHandlerTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/error/DlqDeserializationExceptionHandlerTest.java index bde661fa..a0ba1bad 100644 --- a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/error/DlqDeserializationExceptionHandlerTest.java +++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/error/DlqDeserializationExceptionHandlerTest.java @@ -134,7 +134,6 @@ void shouldReturnContinueOnKafkaException() { void shouldContinueOnRestClientExceptionWhenFeatureFlagEnabled() { DlqDeserializationExceptionHandler handler = new DlqDeserializationExceptionHandler(); - // Enable the feature flag Properties props = new Properties(); props.setProperty(APPLICATION_ID_CONFIG, "test-app"); props.setProperty(DLQ_DESERIALIZATION_HANDLER_FORWARD_REST_CLIENT_EXCEPTION, "true"); @@ -164,7 +163,6 @@ void shouldContinueOnRestClientExceptionWhenFeatureFlagEnabled() { void shouldFailOnRestClientExceptionWhenFeatureFlagDisabled() { DlqDeserializationExceptionHandler handler = new DlqDeserializationExceptionHandler(); - // Disable the feature flag Properties props = new Properties(); props.setProperty(APPLICATION_ID_CONFIG, "test-app"); props.setProperty(DLQ_DESERIALIZATION_HANDLER_FORWARD_REST_CLIENT_EXCEPTION, "false"); @@ -173,14 +171,6 @@ void shouldFailOnRestClientExceptionWhenFeatureFlagDisabled() { handler.configure(Map.of()); - when(consumerRecord.key()).thenReturn("key".getBytes(StandardCharsets.UTF_8)); - when(consumerRecord.value()).thenReturn("value".getBytes(StandardCharsets.UTF_8)); - when(consumerRecord.topic()).thenReturn("topic"); - when(errorHandlerContext.taskId()).thenReturn(new TaskId(0, 0)); - when(errorHandlerContext.partition()).thenReturn(0); - when(errorHandlerContext.sourceRawKey()).thenReturn("sourceKey".getBytes(StandardCharsets.UTF_8)); - when(errorHandlerContext.sourceRawValue()).thenReturn("sourceValue".getBytes(StandardCharsets.UTF_8)); - Exception wrapped = new Exception("Wrapper", new RestClientException("schema error", 500, 500)); DeserializationExceptionHandler.Response response = @@ -202,14 +192,6 @@ void shouldFailOnRestClientExceptionWhenFeatureFlagNotProvided() { handler.configure(Map.of()); - when(consumerRecord.key()).thenReturn("key".getBytes(StandardCharsets.UTF_8)); - when(consumerRecord.value()).thenReturn("value".getBytes(StandardCharsets.UTF_8)); - when(consumerRecord.topic()).thenReturn("topic"); - when(errorHandlerContext.taskId()).thenReturn(new TaskId(0, 0)); - when(errorHandlerContext.partition()).thenReturn(0); - when(errorHandlerContext.sourceRawKey()).thenReturn("sourceKey".getBytes(StandardCharsets.UTF_8)); - when(errorHandlerContext.sourceRawValue()).thenReturn("sourceValue".getBytes(StandardCharsets.UTF_8)); - Exception wrapped = new Exception("Wrapper", new RestClientException("schema error", 500, 500)); DeserializationExceptionHandler.Response response = @@ -255,7 +237,6 @@ void shouldContinueOnUnhandledExceptionWhenFeatureFlagEnabled() { void shouldFailOnUnhandledExceptionWhenFeatureFlagDisabled() { DlqDeserializationExceptionHandler handler = new DlqDeserializationExceptionHandler(); - // Do NOT enable flag Properties props = new Properties(); props.setProperty(APPLICATION_ID_CONFIG, "test-app"); @@ -264,14 +245,6 @@ void shouldFailOnUnhandledExceptionWhenFeatureFlagDisabled() { handler.configure(Map.of()); - when(consumerRecord.key()).thenReturn("key".getBytes(StandardCharsets.UTF_8)); - when(consumerRecord.value()).thenReturn("value".getBytes(StandardCharsets.UTF_8)); - when(consumerRecord.topic()).thenReturn("topic"); - when(errorHandlerContext.taskId()).thenReturn(new TaskId(0, 0)); - when(errorHandlerContext.partition()).thenReturn(0); - when(errorHandlerContext.sourceRawKey()).thenReturn("sourceKey".getBytes(StandardCharsets.UTF_8)); - when(errorHandlerContext.sourceRawValue()).thenReturn("sourceValue".getBytes(StandardCharsets.UTF_8)); - Exception wrapped = new Exception("Wrapper", new RuntimeException("random error")); DeserializationExceptionHandler.Response response = diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/error/DlqProductionExceptionHandlerTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/error/DlqProductionExceptionHandlerTest.java index f54aa923..e5996aef 100644 --- a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/error/DlqProductionExceptionHandlerTest.java +++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/error/DlqProductionExceptionHandlerTest.java @@ -231,7 +231,7 @@ void shouldRouteSerializationErrorToDlqWhenFlagEnabled() { } @Test - void shouldReturnResumeOnExceptionDuringSerializationErrorHandling() { + void shouldFailOnExceptionDuringSerializationErrorHandling() { Properties properties = new Properties(); properties.setProperty(APPLICATION_ID_CONFIG, "test-app"); properties.setProperty(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "mock://"); @@ -251,7 +251,7 @@ void shouldReturnResumeOnExceptionDuringSerializationErrorHandling() { new KafkaException("Serialization failed"), SerializationExceptionOrigin.KEY); - assertEquals(ProductionExceptionHandler.Result.RESUME, response.result()); + assertEquals(ProductionExceptionHandler.Result.FAIL, response.result()); assertTrue(response.deadLetterQueueRecords().isEmpty()); } }