I dug into the Camus code and found that Camus catches the exception thrown from decoder.decode(payload) in the getWrappedRecord() method and, if skipSchemaErrors is false, re-throws it as an IOException. However, the nextKeyValue() method catches this exception again and handles it by writing the exception and key to the mapper context and then continuing with normal execution:
  if (exceptionCount < getMaximumDecoderExceptionsToPrint(context)) {
    mapperContext.write(key, new ExceptionWritable(e));
    exceptionCount++;
  } else if (exceptionCount == getMaximumDecoderExceptionsToPrint(context)) {
    exceptionCount = Integer.MAX_VALUE; //Any random value
    log.info("The same exception has occured for more than " + getMaximumDecoderExceptionsToPrint(context)
        + " records. All further exceptions will not be printed");
  }
}
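For reference, the getWrappedRecord() step that produces the IOException boils down to roughly the following (my paraphrase from reading the code, not a verbatim quote; the local variable name is mine):

  CamusWrapper wrapper = null;
  try {
    wrapper = decoder.decode(payload);
  } catch (Exception e) {
    if (!skipSchemaErrors) {
      // the decoder exception is re-thrown as an IOException,
      // which is exactly what nextKeyValue() catches above
      throw new IOException(e);
    }
  }
  return wrapper;

So the re-throw only happens when etl.ignore.schema.errors is false, which is the case I am concerned about.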
This is where the data loss happens. In the nextKeyValue() method, the Kafka consumer reads a message from Kafka and advances the current offset; if decoding that message throws an exception, the current code base silently swallows it and moves on to the next message from Kafka.
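Schematically, the flow I am describing in nextKeyValue() is something like this (my own simplification; the helper names are placeholders, not actual Camus methods):

  while (fetchNextMessageAndAdvanceOffset()) {   // placeholder: reading the message already moves the current offset forward
    try {
      CamusWrapper value = decoder.decode(payload);   // decoding the message that was just consumed
      // ... emit the decoded record ...
    } catch (Exception e) {
      // the failure is only reported as an ExceptionWritable; the offset has already moved past
      // this message, so it is silently skipped and never read again
      mapperContext.write(key, new ExceptionWritable(e));
    }
  }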
The Camus job therefore finishes in the succeeded state and writes its updated offsets to the offset file in HDFS, so the messages that failed to decode are never read again.
I think that when skipSchemaErrors is false, the Camus job should fail if there are any decoder exceptions; otherwise it violates the definition of the etl.ignore.schema.errors property and also causes data loss.
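A minimal sketch of the catch-block behaviour I am proposing (a hypothetical patch, not tested code):

  catch (Exception e) {
    if (!skipSchemaErrors) {
      // etl.ignore.schema.errors is false: fail the task instead of swallowing the error,
      // so the job cannot succeed and commit offsets past the undecodable message
      throw new IOException("Decoder exception for " + key, e);
    }
    // etl.ignore.schema.errors is true: keep today's skip-and-report behaviour
    mapperContext.write(key, new ExceptionWritable(e));
  }

Failing the task would leave the offsets in HDFS untouched for that pull, so the next run could retry those messages instead of losing them.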