[NO REVIEW YET]fix: isolate per-record ID-generation failures in Kafka sink transformer (#49268)#49269
[NO REVIEW YET]fix: isolate per-record ID-generation failures in Kafka sink transformer (#49268)#49269xinlian12 wants to merge 5 commits into
Conversation
…mer (#49268) When a single record's ID strategy fails (e.g., ProvidedInStrategy JsonPath parse error), only that record should be routed to DLQ — not the entire batch. Previously, SinkRecordTransformer.transform() had no per-record error handling, so one malformed record would abort transformation of all records in the batch. Changes: - SinkRecordTransformer: Add per-record try-catch in transform(). Accept ErrantRecordReporter and ToleranceOnErrorLevel. Report failing records to DLQ when available, skip when tolerance is ALL, rethrow when tolerance is NONE. - CosmosSinkTask: Pass reporter and tolerance to SinkRecordTransformer. Fix written-record bookkeeping to count only successfully transformed records. Fixes #49268 Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Covers: DLQ reporting, tolerance ALL skip, tolerance NONE rethrow, all-valid regression, all-bad with reporter, reporter precedence over tolerance. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Wrap errantRecordReporter.report() in its own try/catch to prevent DLQ reporter failures from collapsing the entire batch. When the reporter throws: - ToleranceOnErrorLevel.ALL: log and continue (skip the bad record) - ToleranceOnErrorLevel.NONE: rethrow the original transform exception Add T6/T7 tests covering both scenarios. Renumber T6→T8. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Address review findings: - Align DLQ/tolerance precedence with writer pattern: DLQ report is fire-and-forget side-effect, tolerance level controls continue-vs-throw. With tolerance=NONE + reporter, record is reported AND task fails. - Guard context.errantRecordReporter() against older Kafka Connect runtimes that lack the API (catch NoClassDefFoundError/NoSuchMethodError). - Add package-private constructor for testability (eliminates reflection). - Consolidate double-logging: one log entry per failed record. - Rewrite tests to use package-private constructor and align with new semantics. T8 now tests tolerance=NONE+reporter → DLQ+throw. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
There was a problem hiding this comment.
Pull request overview
This PR improves the Cosmos DB Kafka sink’s robustness by isolating per-record failures during transformation (notably ID generation) so that a single malformed SinkRecord doesn’t fail the entire batch and unnecessarily route all records to the DLQ.
Changes:
- Added per-record error handling in
SinkRecordTransformer.transform()with optional DLQ reporting and tolerance-based behavior. - Updated
CosmosSinkTaskto passErrantRecordReporter+ToleranceOnErrorLevelinto the transformer and to count only successfully transformed records. - Added a dedicated unit test suite validating mixed/invalid batches, DLQ reporter behavior, and tolerance modes.
Show a summary per file
| File | Description |
|---|---|
sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/SinkRecordTransformer.java |
Adds per-record try/catch around transformation and routes failures to DLQ or tolerance logic. |
sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/CosmosSinkTask.java |
Wires DLQ reporter/tolerance into transformer and fixes per-container written-record counting post-filtering. |
sdk/cosmos/azure-cosmos-kafka-connect/src/test/java/com/azure/cosmos/kafka/connect/implementation/sink/SinkRecordTransformerTest.java |
Introduces unit tests covering tolerant vs fail-fast behavior and DLQ reporter success/failure scenarios. |
.gitignore |
Ignores .coding-harness/ directory. |
Copilot's findings
- Files reviewed: 3/4 changed files
- Comments generated: 2
| record.topic(), | ||
| record.kafkaPartition(), | ||
| record.kafkaOffset(), | ||
| containerName, | ||
| reportException); |
There was a problem hiding this comment.
Fixed in 4ac387b — changed the outer catch to catch(RuntimeException e) since all exceptions from ID strategies (ConnectException, etc.) are RuntimeException subclasses.
| record.kafkaOffset(), | ||
| containerName, | ||
| e); | ||
| throw e; |
There was a problem hiding this comment.
Fixed in 4ac387b — same fix as above, outer catch changed to catch(RuntimeException e).
Change catch clause from Exception (checked) to RuntimeException (unchecked) since transform() doesn't declare throws Exception. ConnectException and all other exceptions from ID strategies are RuntimeException subclasses. This fixes the CI build failure. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
|
|
||
| ErrantRecordReporter errantRecordReporter = null; | ||
| try { | ||
| errantRecordReporter = this.context.errantRecordReporter(); |
There was a problem hiding this comment.
do we really need this check? from public API -> this.context.errantRecordReporter() returned ErrantRecordReporter. I think it should be safe to just pass in other SinkRecordTransformer, similar as the writer pattern
Fix: Single record failing ID parsing should not fail entire batch (#49268)
Problem
When a single
SinkRecordin a batch fails during ID generation (e.g., due to an invalid JsonPath inProvidedInStrategy.generateId()), the entire batch fails and all records are routed to the DLQ — not just the malformed record.Root cause:
SinkRecordTransformer.transform()lacked per-record error isolation. An exception fromidStrategy.generateId()would abort the entiretransform()call before records reached the writer-level DLQ handling inCosmosWriterBase.sendToDlqIfConfigured().Solution
Added per-record try-catch in
SinkRecordTransformer.transform()that is consistent with the writer-level pattern inCosmosWriterBase:ErrantRecordReporterif available (guarded against reporter failures)ALLskips and continues,NONEthrows (regardless of whether DLQ reporter is present)ErrantRecordReporterToleranceOnErrorLevelALLNONEnullALLnullNONEALLNONEChanges
SinkRecordTransformer.javaErrantRecordReporterandToleranceOnErrorLevelfieldsErrantRecordReporter.report()against secondary failurescreateIdStrategy()static for constructor chainCosmosSinkTask.javaerrantRecordReporterandtoleranceOnErrorLeveltoSinkRecordTransformercontext.errantRecordReporter()against older Kafka Connect runtimestransformedRecords.size()(post-filter) instead ofentry.getValue().size()(pre-filter)SinkRecordTransformerTest.java— 8 new unit tests:Fixes #49268