
Add payload-aware DeserializationExceptionHandler#372

Open
Kino1994 wants to merge 1 commit into AbsaOSS:master from Kino1994:feature/deserialization-handler-with-payload



@Kino1994 Kino1994 commented Feb 26, 2026

Motivation and Context

When Avro deserialization fails in ABRiS, the current DeserializationExceptionHandler interface provides:

trait DeserializationExceptionHandler extends Serializable {
  def handle(exception: Throwable, deserializer: AbrisAvroDeserializer, readerSchema: Schema): Any
}

This gives the handler access to the exception, the deserializer, and the reader schema — but not the raw binary payload that caused the failure.

This is a significant limitation for production Kafka/Spark streaming pipelines where teams need to:

  • Dead Letter Queue (DLQ): Route the corrupted raw message to a DLQ topic for later inspection, reprocessing, or manual intervention.
  • Audit & observability: Log or emit metrics about malformed records, including their raw content, to understand data quality issues upstream.
  • Regulatory / compliance: In regulated industries (finance, healthcare), there may be a legal requirement to retain every ingested message — even those that fail schema validation.
  • Advanced quarantine logic: Implement custom routing based on the raw bytes (e.g., inspect the Confluent wire-format header to determine the writer schema ID, or check magic bytes).

Without access to the raw Array[Byte], users are forced to resort to fragile workarounds such as thread-local storage or wrapping the entire Spark pipeline — both of which are error-prone and hard to maintain.
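As an illustration of the "advanced quarantine logic" use case above, the Confluent wire format prefixes each message with a magic byte (0x00) and a 4-byte big-endian schema ID. A payload-aware handler could inspect that header; the following helper is an illustrative sketch, not part of ABRiS (ConfluentWireFormat is a hypothetical name):

```scala
import java.nio.ByteBuffer

// Hypothetical helper: extract the writer schema ID from a Confluent
// wire-format payload (1 magic byte 0x00 + 4-byte big-endian schema ID
// + Avro binary body). Returns None if the header is absent or malformed.
object ConfluentWireFormat {
  def schemaId(payload: Array[Byte]): Option[Int] =
    if (payload != null && payload.length >= 5 && payload(0) == 0)
      Some(ByteBuffer.wrap(payload, 1, 4).getInt)
    else
      None
}
```

A handler could use this to route a corrupted record to a per-schema quarantine topic, or to detect that a message was produced without the Confluent header at all.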

Solution

This PR introduces a backward-compatible extension to the exception handler API via a new trait:

trait DeserializationExceptionHandlerWithPayload extends DeserializationExceptionHandler {
  def handleWithPayload(
    exception: Throwable,
    deserializer: AbrisAvroDeserializer,
    readerSchema: Schema,
    payload: Array[Byte]
  ): Any
}

Design decisions

  • New sub-trait instead of modifying the existing one: zero breaking changes; all existing DeserializationExceptionHandler implementations compile and work identically.
  • Runtime match dispatch instead of a default method: Scala 2.12 compatibility (no default methods in traits with binary-compatibility guarantees), and pattern matching is idiomatic Scala.
  • No shared mutable state: the binary variable is already a local val inside nullSafeEval and is passed directly to the handler, with no thread-locals, instance fields, or side channels.
  • extends DeserializationExceptionHandler: the new trait IS-A DeserializationExceptionHandler, so it works seamlessly with the existing withExceptionHandler(handler) configuration API; no config changes needed.

Changes

New file: DeserializationExceptionHandlerWithPayload.scala

Path: src/main/scala/za/co/absa/abris/avro/errors/DeserializationExceptionHandlerWithPayload.scala

A new trait in the za.co.absa.abris.avro.errors package that:

  • Extends DeserializationExceptionHandler (and therefore Serializable)
  • Adds handleWithPayload(exception, deserializer, readerSchema, payload)
  • Includes full Scaladoc explaining purpose and parameters

Modified: AvroDataToCatalyst.scala

Path: src/main/scala/za/co/absa/abris/avro/sql/AvroDataToCatalyst.scala

Two minimal changes:

  1. Import (line 27): Added DeserializationExceptionHandlerWithPayload to the existing import.

  2. Catch block in nullSafeEval (lines 87-92): Changed from:

    case NonFatal(e) => deserializationHandler.handle(e, deserializer, readerSchema)

    to:

    case NonFatal(e) => deserializationHandler match {
      case h: DeserializationExceptionHandlerWithPayload =>
        h.handleWithPayload(e, deserializer, readerSchema, binary)
      case h =>
        h.handle(e, deserializer, readerSchema)
    }

    The binary val is already in scope (line 77: val binary = input.asInstanceOf[Array[Byte]]), so no new state or allocations are needed.

New file: DeserializationExceptionHandlerWithPayloadSpec.scala

Path: src/test/scala/za/co/absa/abris/avro/errors/DeserializationExceptionHandlerWithPayloadSpec.scala

Three test cases:

  • "receive the raw payload on deserialization failure": verifies that handleWithPayload receives the exact Array[Byte] passed to it.
  • "still work as a DeserializationExceptionHandler": verifies that a DeserializationExceptionHandlerWithPayload is assignable to the base type and that handle() remains callable.
  • "dispatch to handleWithPayload when handler extends the new trait": simulates the exact dispatch logic from AvroDataToCatalyst; a payload-aware handler routes to handleWithPayload, while a legacy handler routes to handle.
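The dispatch behavior these tests cover can be sketched with simplified stand-in traits; Handler and HandlerWithPayload below are hypothetical substitutes for the real ABRiS types, which require Spark and a schema registry to construct:

```scala
// Simplified stand-ins for the ABRiS handler traits, so the pattern-match
// dispatch from AvroDataToCatalyst can be exercised in isolation.
trait Handler extends Serializable {
  def handle(e: Throwable): Any
}

trait HandlerWithPayload extends Handler {
  def handleWithPayload(e: Throwable, payload: Array[Byte]): Any
}

object Dispatch {
  // Mirrors the catch-block logic: payload-aware handlers receive the raw
  // bytes, legacy handlers fall back to the original handle method.
  def apply(h: Handler, e: Throwable, payload: Array[Byte]): Any = h match {
    case hp: HandlerWithPayload => hp.handleWithPayload(e, payload)
    case legacy                 => legacy.handle(e)
  }
}
```

Because the match on the subtype comes first, a handler implementing both methods always receives the payload, and a legacy handler is untouched.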

Usage example

Existing users — no changes required:

import za.co.absa.abris.avro.functions.from_avro

val df = kafkaDf.select(from_avro(col("value"), abrisConfig) as "data")
// FailFastExceptionHandler is still the default, behavior unchanged

New users who need the raw payload (DLQ pattern with schema metadata in headers):

import za.co.absa.abris.avro.errors.DeserializationExceptionHandlerWithPayload

class DlqExceptionHandler(dlqProducer: KafkaProducer[Array[Byte], Array[Byte]])
  extends DeserializationExceptionHandlerWithPayload with Logging {

  override def handleWithPayload(
    exception: Throwable,
    deserializer: AbrisAvroDeserializer,
    readerSchema: Schema,
    payload: Array[Byte]
  ): Any = {
    logWarning("Sending malformed record to DLQ", exception)

    // Send raw payload to DLQ with schema and error context in headers
    val record = new ProducerRecord[Array[Byte], Array[Byte]]("dlq-topic", payload)
    record.headers().add("readerSchema", readerSchema.toString.getBytes("UTF-8"))
    record.headers().add("errorMessage", exception.getMessage.getBytes("UTF-8"))
    record.headers().add("errorClass", exception.getClass.getName.getBytes("UTF-8"))
    dlqProducer.send(record)

    // Return a null-filled row so the Spark pipeline keeps running
    val emptyRecord = new GenericData.Record(readerSchema)
    deserializer.deserialize(emptyRecord)
  }

  override def handle(
    exception: Throwable,
    deserializer: AbrisAvroDeserializer,
    readerSchema: Schema
  ): Any = {
    // Fallback when payload is not available
    logWarning("Malformed record detected (no payload available)", exception)
    val emptyRecord = new GenericData.Record(readerSchema)
    deserializer.deserialize(emptyRecord)
  }
}

// Wire it via the existing config API — no new config needed
val abrisConfig = AbrisConfig
  .fromConfluentAvro
  .downloadReaderSchemaByLatestVersion
  .andTopicNameStrategy("my-topic")
  .usingSchemaRegistry("http://schema-registry:8081")
  .withExceptionHandler(new DlqExceptionHandler(producer))

Backward compatibility

  • Source compatible: No existing signatures changed. All existing handler implementations compile without modification.
  • Binary compatible: No existing class/trait bytecode altered. The new trait is purely additive.
  • Behavioral compatible: The dispatch fallback (case h => h.handle(...)) guarantees that FailFastExceptionHandler, PermissiveRecordExceptionHandler, and SpecificRecordExceptionHandler behave exactly as before.
  • Config compatible: withExceptionHandler() accepts DeserializationExceptionHandler, and the new trait extends it, so no API changes needed.

Test plan

  • New DeserializationExceptionHandlerWithPayloadSpec — 3 tests passing
  • Existing FailFastExceptionHandlerSpec — unchanged, passing
  • Existing PermissiveRecordExceptionHandlerSpec — unchanged, passing
  • Existing SpecificRecordExceptionHandlerSpec — unchanged, passing
  • Existing AvroDataToCatalystSpec integration tests — unchanged, passing
  • Full mvn test run: 65 tests passed, 0 failed

🤖 Generated with Claude Code

Introduce DeserializationExceptionHandlerWithPayload trait that extends
DeserializationExceptionHandler to provide access to the raw Array[Byte]
payload when deserialization fails. This enables DLQ patterns, failed
record auditing, and compliance use cases.

The dispatch in AvroDataToCatalyst uses pattern matching: if the handler
implements the new trait, handleWithPayload is called; otherwise the
existing handle method is used as fallback. No breaking API changes.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
