Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,22 +40,23 @@
/** The class managing deserialization exceptions. */
@Slf4j
public class DlqDeserializationExceptionHandler extends DlqExceptionHandler implements DeserializationExceptionHandler {
private boolean handleSchemaRegistryRestException;
private boolean continueOnUnhandledErrors;

/** Constructor. */
public DlqDeserializationExceptionHandler() {}

/**
* Handles deserialization errors by routing records to the DLQ and deciding whether to continue or fail processing
* based on configured rules.
* Handles deserialization exceptions by routing the record to the DLQ topic.
*
* @param context The error handler context
* @param consumerRecord The record that failed deserialization
* @param exception The exception that occurred
* @return A {@link Response} indicating how to proceed
*/
@Override
public Response handleError(
ErrorHandlerContext context, ConsumerRecord<byte[], byte[]> consumerRecord, Exception exception) {

log.warn(
"Exception during Deserialization, processor node: {}, taskId: {}, topic: {}, partition: {}, offset: {}",
"Exception during deserialization, processor node: {}, taskId: {}, topic: {}, partition: {}, offset: {}",
context.processorNodeId(),
context.taskId(),
context.topic(),
Expand All @@ -64,34 +65,59 @@ public Response handleError(
exception);

if (isDlqNotDefined()) {
log.warn("Failed to route deserialization error to DLQ.");
log.warn("Failed to route deserialization error to DLQ. Define a DLQ topic in configuration.");
return Response.fail();
}

if (shouldNotResume(exception)) {
return Response.fail();
}

try {
KafkaError error = buildKafkaError(context, consumerRecord, exception);
byte[] value = serializeError(error);
return shouldResume(exception) ? resumeWithDlqRecord(consumerRecord, value) : Response.fail();
Serde<KafkaError> serde = SerdesUtils.getValueSerdes();
byte[] value = serde.serializer().serialize(KafkaStreamsExecutionContext.getDlqTopicName(), error);
return Response.resume(List.of(
new ProducerRecord<>(KafkaStreamsExecutionContext.getDlqTopicName(), consumerRecord.key(), value)));
} catch (Exception e) {
log.error("Cannot send deserialization exception to DLQ topic {}", deadLetterQueueTopic, e);
log.error(
"Cannot send deserialization exception to DLQ topic {}",
KafkaStreamsExecutionContext.getDlqTopicName(),
e);
return Response.fail();
}
}

/** Determines if the exception should be handled by continuing processing based on known handled scenarios. */
private boolean shouldResume(Exception exception) {
/**
* Determines whether processing should not resume after the exception.
*
* @param exception The exception that occurred
* @return {@code true} if processing should not resume, {@code false} otherwise
*/
private boolean shouldNotResume(Exception exception) {
Throwable cause = exception.getCause() != null ? exception.getCause() : exception;

boolean isCausedByKafka = cause instanceof KafkaException;
boolean isRestClientSchemaRegistryException = cause instanceof RestClientException;
boolean handleSchemaRegistryRestException = KafkaStreamsExecutionContext.isDlqFeatureEnabled(
DLQ_DESERIALIZATION_HANDLER_FORWARD_REST_CLIENT_EXCEPTION);
boolean continueOnUnhandledErrors = KafkaStreamsExecutionContext.isDlqFeatureEnabled(
KstreamplifyConfig.DLQ_DESERIALIZATION_HANDLER_CONTINUE_ON_UNHANDLED_ERRORS);

return isCausedByKafka
return !(isCausedByKafka
|| cause == null
|| (isRestClientSchemaRegistryException && handleSchemaRegistryRestException)
|| continueOnUnhandledErrors;
|| continueOnUnhandledErrors);
}

/** Builds a KafkaError enriched with record metadata and exception details for DLQ publishing. */
/**
* Builds a {@link KafkaError} from the record metadata and exception details.
*
* @param context The error handler context
* @param consumerRecord The record that failed deserialization
* @param exception The exception that occurred
* @return The built {@link KafkaError}
*/
private KafkaError buildKafkaError(
ErrorHandlerContext context, ConsumerRecord<byte[], byte[]> consumerRecord, Exception exception) {

Expand All @@ -111,24 +137,13 @@ private KafkaError buildKafkaError(
.build();
}

/** Serializes the KafkaError into a byte array for DLQ topic. */
private byte[] serializeError(KafkaError error) {
Serde<KafkaError> serde = SerdesUtils.getValueSerdes();
return serde.serializer().serialize(deadLetterQueueTopic, error);
}

/** Creates a DLQ record and returns a resume response to continue processing. */
private Response resumeWithDlqRecord(ConsumerRecord<byte[], byte[]> consumerRecord, byte[] value) {
return Response.resume(List.of(new ProducerRecord<>(deadLetterQueueTopic, consumerRecord.key(), value)));
}

/** {@inheritDoc} */
/**
* Configures the handler.
*
* @param configs The configuration map
*/
@Override
public void configure(Map<String, ?> configs) {
deadLetterQueueTopic = KafkaStreamsExecutionContext.getDlqTopicName();
handleSchemaRegistryRestException = KafkaStreamsExecutionContext.isDlqFeatureEnabled(
DLQ_DESERIALIZATION_HANDLER_FORWARD_REST_CLIENT_EXCEPTION);
continueOnUnhandledErrors = KafkaStreamsExecutionContext.isDlqFeatureEnabled(
KstreamplifyConfig.DLQ_DESERIALIZATION_HANDLER_CONTINUE_ON_UNHANDLED_ERRORS);
// Do nothing
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package com.michelin.kstreamplify.error;

import com.michelin.kstreamplify.avro.KafkaError;
import com.michelin.kstreamplify.context.KafkaStreamsExecutionContext;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.nio.ByteBuffer;
Expand All @@ -29,8 +30,6 @@
/** The class to manage DLQ exception. */
@Slf4j
public abstract class DlqExceptionHandler {
/** The DLQ topic */
protected String deadLetterQueueTopic;

/** Constructor. */
protected DlqExceptionHandler() {}
Expand All @@ -46,7 +45,6 @@ protected DlqExceptionHandler() {}
*/
protected KafkaError.Builder enrichWithException(
KafkaError.Builder builder, Exception exception, byte[] key, byte[] value) {

StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw);
exception.printStackTrace(pw);
Expand All @@ -70,6 +68,6 @@ protected KafkaError.Builder enrichWithException(
* @return {@code true} if the dead letter queue topic is not defined, {@code false} otherwise
*/
protected boolean isDlqNotDefined() {
return StringUtils.isBlank(deadLetterQueueTopic);
return StringUtils.isBlank(KafkaStreamsExecutionContext.getDlqTopicName());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,18 @@ public class DlqProcessingExceptionHandler extends DlqExceptionHandler implement
/** Constructor. */
public DlqProcessingExceptionHandler() {}

/**
* Handles processing exceptions by routing the record to the DLQ topic.
*
* @param context The error handler context
* @param processingRecord The record that failed processing
* @param exception The exception that occurred
* @return A {@link Response} indicating how to proceed
*/
@Override
public Response handleError(ErrorHandlerContext context, Record<?, ?> processingRecord, Exception exception) {
log.warn(
"Exception during message Processing, processor node: {}, taskId: {}, topic: {}, partition: {}, offset: {}",
"Exception during processing, processor node: {}, taskId: {}, topic: {}, partition: {}, offset: {}",
context.processorNodeId(),
context.taskId(),
context.topic(),
Expand All @@ -52,56 +60,76 @@ public Response handleError(ErrorHandlerContext context, Record<?, ?> processing
exception);

if (isDlqNotDefined()) {
log.warn("Failed to route processing error to DLQ.");
log.warn("Failed to route processing error to DLQ. Define a DLQ topic in configuration.");
return Response.fail();
}

try {
KafkaError.Builder builder = KafkaError.newBuilder()
.setContextMessage(
"An exception occurred during the stream processing of a record. Please find more details about the exception in the cause and stack fields.")
.setOffset(context.offset())
.setPartition(context.partition())
.setTopic(context.topic())
.setApplicationId(
KafkaStreamsExecutionContext.getProperties().getProperty(APPLICATION_ID_CONFIG))
.setProcessorNodeId(context.processorNodeId())
.setTaskId(context.taskId().toString())
.setSourceRawKey(ByteBuffer.wrap(context.sourceRawKey()))
.setSourceRawValue(ByteBuffer.wrap(context.sourceRawValue()))
.setValue(
processingRecord.value() == null
? null
: processingRecord.value().toString());

KafkaError error = enrichWithException(
builder,
exception,
processingRecord.key() != null
? processingRecord.key().toString().getBytes()
: null,
processingRecord.value() != null
? processingRecord.value().toString().getBytes()
: null)
.build();

KafkaError error = buildKafkaError(context, processingRecord, exception);
Serde<KafkaError> serde = SerdesUtils.getValueSerdes();
byte[] value = serde.serializer().serialize(deadLetterQueueTopic, error);
byte[] value = serde.serializer().serialize(KafkaStreamsExecutionContext.getDlqTopicName(), error);

byte[] key = processingRecord.key() != null
? processingRecord.key().toString().getBytes()
: null;

return Response.resume(List.of(new ProducerRecord<>(deadLetterQueueTopic, key, value)));
return Response.resume(
List.of(new ProducerRecord<>(KafkaStreamsExecutionContext.getDlqTopicName(), key, value)));
} catch (Exception e) {
log.error("Cannot send processing exception to DLQ topic {}", deadLetterQueueTopic, e);
log.error(
"Cannot send processing exception to DLQ topic {}",
KafkaStreamsExecutionContext.getDlqTopicName(),
e);
return Response.fail();
}
}

/** {@inheritDoc} */
/**
* Builds a {@link KafkaError} from the record metadata and exception details.
*
* @param context The error handler context
* @param processingRecord The record that failed processing
* @param exception The exception that occurred
* @return The built {@link KafkaError}
*/
private KafkaError buildKafkaError(
ErrorHandlerContext context, Record<?, ?> processingRecord, Exception exception) {

KafkaError.Builder builder = KafkaError.newBuilder()
.setContextMessage(
"An exception occurred during the stream processing of a record. Please find more details about the exception in the cause and stack fields.")
.setOffset(context.offset())
.setPartition(context.partition())
.setTopic(context.topic())
.setApplicationId(KafkaStreamsExecutionContext.getProperties().getProperty(APPLICATION_ID_CONFIG))
.setProcessorNodeId(context.processorNodeId())
.setTaskId(context.taskId().toString())
.setSourceRawKey(ByteBuffer.wrap(context.sourceRawKey()))
.setSourceRawValue(ByteBuffer.wrap(context.sourceRawValue()))
.setValue(
processingRecord.value() == null
? null
: processingRecord.value().toString());

return enrichWithException(
builder,
exception,
processingRecord.key() != null
? processingRecord.key().toString().getBytes()
: null,
processingRecord.value() != null
? processingRecord.value().toString().getBytes()
: null)
.build();
}

/**
* Configures the handler.
*
* @param configs The configuration map
*/
@Override
public void configure(Map<String, ?> configs) {
deadLetterQueueTopic = KafkaStreamsExecutionContext.getDlqTopicName();
// Do nothing
}
}
Loading