From 672a8fe8f04b63d1fc335663dc968ca478a9058b Mon Sep 17 00:00:00 2001 From: Annie Liang Date: Tue, 26 May 2026 14:29:25 -0700 Subject: [PATCH 1/5] fix: isolate per-record ID-generation failures in Kafka sink transformer (#49268) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a single record's ID strategy fails (e.g., ProvidedInStrategy JsonPath parse error), only that record should be routed to DLQ — not the entire batch. Previously, SinkRecordTransformer.transform() had no per-record error handling, so one malformed record would abort transformation of all records in the batch. Changes: - SinkRecordTransformer: Add per-record try-catch in transform(). Accept ErrantRecordReporter and ToleranceOnErrorLevel. Report failing records to DLQ when available, skip when tolerance is ALL, rethrow when tolerance is NONE. - CosmosSinkTask: Pass reporter and tolerance to SinkRecordTransformer. Fix written-record bookkeeping to count only successfully transformed records. Fixes Azure/azure-sdk-for-java#49268 Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .gitignore | 1 + .../implementation/sink/CosmosSinkTask.java | 7 +- .../sink/SinkRecordTransformer.java | 85 +++++++++++++------ 3 files changed, 67 insertions(+), 26 deletions(-) diff --git a/.gitignore b/.gitignore index 787fe08c9a86..ca7abdd4bb4d 100644 --- a/.gitignore +++ b/.gitignore @@ -129,4 +129,5 @@ TempTypeSpecFiles/ # Azure Artifacts Credential Provider runtime .azure-artifacts/ +.coding-harness/ diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/CosmosSinkTask.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/CosmosSinkTask.java index 5c5d0bd0150b..2db4aede492f 100644 --- a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/CosmosSinkTask.java +++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/CosmosSinkTask.java @@ -50,7 +50,10 @@ public void start(Map props) { this.sinkTaskConfig.getClientMetadataCachesSnapshot()); LOGGER.info("The taskId is " + this.sinkTaskConfig.getTaskId()); this.throughputControlClientItem = this.getThroughputControlCosmosClient(); - this.sinkRecordTransformer = new SinkRecordTransformer(this.sinkTaskConfig); + this.sinkRecordTransformer = new SinkRecordTransformer( + this.sinkTaskConfig, + this.context.errantRecordReporter(), + this.sinkTaskConfig.getWriteConfig().getToleranceOnErrorLevel()); if (this.sinkTaskConfig.getWriteConfig().isBulkEnabled()) { this.cosmosWriter = @@ -129,7 +132,7 @@ record -> this.sinkTaskConfig List transformedRecords = sinkRecordTransformer.transform(containerName, entry.getValue()); this.cosmosWriter.write(container, transformedRecords); - totalWrittenRecordsPerContainer.merge(containerName, (long) entry.getValue().size(), Long::sum); + totalWrittenRecordsPerContainer.merge(containerName, (long) transformedRecords.size(), Long::sum); } logWrittenRecordCount(); diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/SinkRecordTransformer.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/SinkRecordTransformer.java index e600b95041b8..970215c04a71 100644 --- a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/SinkRecordTransformer.java +++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/SinkRecordTransformer.java @@ -11,6 +11,7 @@ import com.azure.cosmos.kafka.connect.implementation.sink.idstrategy.ProvidedInValueStrategy; import com.azure.cosmos.kafka.connect.implementation.sink.idstrategy.TemplateStrategy; import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.sink.ErrantRecordReporter; import org.apache.kafka.connect.sink.SinkRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -24,9 +25,16 @@ public class SinkRecordTransformer { private static final Logger LOGGER = LoggerFactory.getLogger(SinkRecordTransformer.class); private final IdStrategy idStrategy; + private final ErrantRecordReporter errantRecordReporter; + private final ToleranceOnErrorLevel toleranceOnErrorLevel; - public SinkRecordTransformer(CosmosSinkTaskConfig sinkTaskConfig) { + public SinkRecordTransformer( + CosmosSinkTaskConfig sinkTaskConfig, + ErrantRecordReporter errantRecordReporter, + ToleranceOnErrorLevel toleranceOnErrorLevel) { this.idStrategy = this.createIdStrategy(sinkTaskConfig); + this.errantRecordReporter = errantRecordReporter; + this.toleranceOnErrorLevel = toleranceOnErrorLevel; } @SuppressWarnings("unchecked") @@ -44,30 +52,59 @@ public List transform(String containerName, List sinkRec record.value() == null ? null : record.value().getClass().getName(), record.value() == null ? null : record.valueSchema()); - Object recordValue; - if (record.value() instanceof Struct) { - recordValue = StructToJsonMap.toJsonMap((Struct) record.value()); - } else if (record.value() instanceof Map) { - recordValue = StructToJsonMap.handleMap((Map) record.value()); - } else { - recordValue = record.value(); + try { + Object recordValue; + if (record.value() instanceof Struct) { + recordValue = StructToJsonMap.toJsonMap((Struct) record.value()); + } else if (record.value() instanceof Map) { + recordValue = StructToJsonMap.handleMap((Map) record.value()); + } else { + recordValue = record.value(); + } + + maybeInsertId(recordValue, record); + + final SinkRecord updatedRecord = new SinkRecord(record.topic(), + record.kafkaPartition(), + record.keySchema(), + record.key(), + record.valueSchema(), + recordValue, + record.kafkaOffset(), + record.timestamp(), + record.timestampType(), + record.headers()); + + toBeWrittenRecordList.add(updatedRecord); + } catch (Exception e) { + LOGGER.warn( + "Failed to transform record from topic {}, partition {}, offset {}, container {}.", + record.topic(), + record.kafkaPartition(), + record.kafkaOffset(), + containerName, + e); + if (this.errantRecordReporter != null) { + this.errantRecordReporter.report(record, e); + } else if (this.toleranceOnErrorLevel == ToleranceOnErrorLevel.ALL) { + LOGGER.warn( + "Skipping record from topic {}, partition {}, offset {}, container {} due to transform error " + + "and ToleranceOnErrorLevel is ALL.", + record.topic(), + record.kafkaPartition(), + record.kafkaOffset(), + containerName); + } else { + LOGGER.error( + "Failing record from topic {}, partition {}, offset {}, container {} because no DLQ reporter " + + "is configured and ToleranceOnErrorLevel is NONE.", + record.topic(), + record.kafkaPartition(), + record.kafkaOffset(), + containerName); + throw e; + } } - - maybeInsertId(recordValue, record); - - // Create an updated record with from the current record and the updated record value - final SinkRecord updatedRecord = new SinkRecord(record.topic(), - record.kafkaPartition(), - record.keySchema(), - record.key(), - record.valueSchema(), - recordValue, - record.kafkaOffset(), - record.timestamp(), - record.timestampType(), - record.headers()); - - toBeWrittenRecordList.add(updatedRecord); } return toBeWrittenRecordList; From cc8e58a51fb5c6c9ef4cfb243e2b84b8019089b7 Mon Sep 17 00:00:00 2001 From: Annie Liang Date: Tue, 26 May 2026 14:34:34 -0700 Subject: [PATCH 2/5] test: add SinkRecordTransformer per-record error isolation tests Covers: DLQ reporting, tolerance ALL skip, tolerance NONE rethrow, all-valid regression, all-bad with reporter, reporter precedence over tolerance. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../sink/SinkRecordTransformerTest.java | 287 ++++++++++++++++++ 1 file changed, 287 insertions(+) create mode 100644 sdk/cosmos/azure-cosmos-kafka-connect/src/test/java/com/azure/cosmos/kafka/connect/implementation/sink/SinkRecordTransformerTest.java diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/test/java/com/azure/cosmos/kafka/connect/implementation/sink/SinkRecordTransformerTest.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/test/java/com/azure/cosmos/kafka/connect/implementation/sink/SinkRecordTransformerTest.java new file mode 100644 index 000000000000..52d8e04aba79 --- /dev/null +++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/test/java/com/azure/cosmos/kafka/connect/implementation/sink/SinkRecordTransformerTest.java @@ -0,0 +1,287 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos.kafka.connect.implementation.sink; + +import com.azure.cosmos.kafka.connect.implementation.sink.idstrategy.IdStrategy; +import org.apache.commons.lang3.reflect.FieldUtils; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.sink.ErrantRecordReporter; +import org.apache.kafka.connect.sink.SinkRecord; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; +import org.testng.annotations.Test; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Future; + +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; +import static org.assertj.core.api.AssertionsForClassTypes.catchThrowable; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.CALLS_REAL_METHODS; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.withSettings; + +public class SinkRecordTransformerTest { + private static final int TIMEOUT = 60000; + + /** + * Creates a SinkRecordTransformer bypassing the constructor that needs CosmosSinkTaskConfig. + * Uses Mockito with CALLS_REAL_METHODS so the transform() method executes real code, + * then injects the fields via reflection. + */ + private SinkRecordTransformer createTransformer( + IdStrategy idStrategy, + ErrantRecordReporter reporter, + ToleranceOnErrorLevel toleranceLevel) throws Exception { + + SinkRecordTransformer transformer = Mockito.mock( + SinkRecordTransformer.class, + withSettings().defaultAnswer(CALLS_REAL_METHODS)); + FieldUtils.writeField(transformer, "idStrategy", idStrategy, true); + FieldUtils.writeField(transformer, "errantRecordReporter", reporter, true); + FieldUtils.writeField(transformer, "toleranceOnErrorLevel", toleranceLevel, true); + return transformer; + } + + /** + * Creates a SinkRecord with a Map value containing the given fields. + */ + private SinkRecord createMapRecord(String topic, int partition, long offset, Map value) { + return new SinkRecord(topic, partition, null, "key-" + offset, null, value, offset); + } + + /** + * Creates an IdStrategy that fails (throws ConnectException) when generating an ID + * for records whose value map contains a field "fail" set to true, + * and returns a valid ID otherwise. + */ + private IdStrategy createSelectivelyFailingIdStrategy() { + IdStrategy idStrategy = Mockito.mock(IdStrategy.class); + when(idStrategy.generateId(any(SinkRecord.class))).thenAnswer(invocation -> { + SinkRecord record = invocation.getArgument(0); + Object value = record.value(); + if (value instanceof Map) { + @SuppressWarnings("unchecked") + Map map = (Map) value; + if (Boolean.TRUE.equals(map.get("fail"))) { + throw new ConnectException("Cannot generate ID: missing required field"); + } + } + return "generated-id-" + record.kafkaOffset(); + }); + return idStrategy; + } + + // ============================================================ + // T1: Mixed batch with reporter — bad record goes to DLQ, valid records in output + // ============================================================ + @Test(groups = {"unit"}, timeOut = TIMEOUT) + @SuppressWarnings("unchecked") + public void mixedBatchWithReporter_badRecordReportedValidRecordsInOutput() throws Exception { + // Arrange + IdStrategy idStrategy = createSelectivelyFailingIdStrategy(); + ErrantRecordReporter reporter = Mockito.mock(ErrantRecordReporter.class); + Future mockFuture = Mockito.mock(Future.class); + when(reporter.report(any(SinkRecord.class), any(Throwable.class))).thenReturn(mockFuture); + + SinkRecordTransformer transformer = createTransformer(idStrategy, reporter, ToleranceOnErrorLevel.NONE); + + Map goodValue1 = new HashMap<>(); + goodValue1.put("data", "hello"); + + Map badValue = new HashMap<>(); + badValue.put("fail", true); + + Map goodValue2 = new HashMap<>(); + goodValue2.put("data", "world"); + + List batch = Arrays.asList( + createMapRecord("topic1", 0, 0L, goodValue1), + createMapRecord("topic1", 0, 1L, badValue), + createMapRecord("topic1", 0, 2L, goodValue2) + ); + + // Act + List result = transformer.transform("container1", batch); + + // Assert — only 2 valid records in output + assertThat(result.size()).isEqualTo(2); + assertThat(((Map) result.get(0).value()).get("id")).isEqualTo("generated-id-0"); + assertThat(((Map) result.get(1).value()).get("id")).isEqualTo("generated-id-2"); + + // Assert — reporter called exactly once with the bad record + ArgumentCaptor recordCaptor = ArgumentCaptor.forClass(SinkRecord.class); + ArgumentCaptor errorCaptor = ArgumentCaptor.forClass(Throwable.class); + verify(reporter, times(1)).report(recordCaptor.capture(), errorCaptor.capture()); + assertThat(recordCaptor.getValue().kafkaOffset()).isEqualTo(1L); + assertThat(errorCaptor.getValue()).isInstanceOf(ConnectException.class); + } + + // ============================================================ + // T2: Mixed batch with tolerance ALL, no reporter — bad record skipped + // ============================================================ + @Test(groups = {"unit"}, timeOut = TIMEOUT) + @SuppressWarnings("unchecked") + public void mixedBatchToleranceAll_noReporter_badRecordSkipped() throws Exception { + // Arrange + IdStrategy idStrategy = createSelectivelyFailingIdStrategy(); + SinkRecordTransformer transformer = createTransformer(idStrategy, null, ToleranceOnErrorLevel.ALL); + + Map goodValue = new HashMap<>(); + goodValue.put("data", "hello"); + + Map badValue = new HashMap<>(); + badValue.put("fail", true); + + List batch = Arrays.asList( + createMapRecord("topicA", 1, 10L, goodValue), + createMapRecord("topicA", 1, 11L, badValue) + ); + + // Act — should NOT throw + List result = transformer.transform("container2", batch); + + // Assert — only 1 valid record + assertThat(result.size()).isEqualTo(1); + assertThat(((Map) result.get(0).value()).get("id")).isEqualTo("generated-id-10"); + } + + // ============================================================ + // T3: Mixed batch with tolerance NONE, no reporter — exception thrown + // ============================================================ + @Test(groups = {"unit"}, timeOut = TIMEOUT) + public void mixedBatchToleranceNone_noReporter_exceptionThrown() throws Exception { + // Arrange + IdStrategy idStrategy = createSelectivelyFailingIdStrategy(); + SinkRecordTransformer transformer = createTransformer(idStrategy, null, ToleranceOnErrorLevel.NONE); + + Map goodValue = new HashMap<>(); + goodValue.put("data", "hello"); + + Map badValue = new HashMap<>(); + badValue.put("fail", true); + + List batch = Arrays.asList( + createMapRecord("topicB", 2, 20L, goodValue), + createMapRecord("topicB", 2, 21L, badValue) + ); + + // Act + Throwable thrown = catchThrowable(() -> transformer.transform("container3", batch)); + + // Assert — exception is thrown (fail-fast preserved) + assertThat(thrown).isInstanceOf(ConnectException.class); + assertThat(thrown.getMessage()).contains("Cannot generate ID"); + } + + // ============================================================ + // T4: All records valid — no errors, all records in output (regression) + // ============================================================ + @Test(groups = {"unit"}, timeOut = TIMEOUT) + @SuppressWarnings("unchecked") + public void allValidRecords_allInOutput() throws Exception { + // Arrange + IdStrategy idStrategy = createSelectivelyFailingIdStrategy(); + ErrantRecordReporter reporter = Mockito.mock(ErrantRecordReporter.class); + SinkRecordTransformer transformer = createTransformer(idStrategy, reporter, ToleranceOnErrorLevel.NONE); + + Map value1 = new HashMap<>(); + value1.put("data", "a"); + Map value2 = new HashMap<>(); + value2.put("data", "b"); + Map value3 = new HashMap<>(); + value3.put("data", "c"); + + List batch = Arrays.asList( + createMapRecord("topicC", 0, 100L, value1), + createMapRecord("topicC", 0, 101L, value2), + createMapRecord("topicC", 0, 102L, value3) + ); + + // Act + List result = transformer.transform("container4", batch); + + // Assert — all 3 records in output + assertThat(result.size()).isEqualTo(3); + for (int i = 0; i < 3; i++) { + assertThat(((Map) result.get(i).value()).get("id")) + .isEqualTo("generated-id-" + (100 + i)); + } + + // Assert — reporter never called + verify(reporter, never()).report(any(), any()); + } + + // ============================================================ + // T5: All records bad with reporter — all reported to DLQ, empty output + // ============================================================ + @Test(groups = {"unit"}, timeOut = TIMEOUT) + public void allBadRecordsWithReporter_allReportedEmptyOutput() throws Exception { + // Arrange + IdStrategy idStrategy = createSelectivelyFailingIdStrategy(); + ErrantRecordReporter reporter = Mockito.mock(ErrantRecordReporter.class); + Future mockFuture = Mockito.mock(Future.class); + when(reporter.report(any(SinkRecord.class), any(Throwable.class))).thenReturn(mockFuture); + + SinkRecordTransformer transformer = createTransformer(idStrategy, reporter, ToleranceOnErrorLevel.NONE); + + Map bad1 = new HashMap<>(); + bad1.put("fail", true); + Map bad2 = new HashMap<>(); + bad2.put("fail", true); + Map bad3 = new HashMap<>(); + bad3.put("fail", true); + + List batch = Arrays.asList( + createMapRecord("topicD", 0, 50L, bad1), + createMapRecord("topicD", 0, 51L, bad2), + createMapRecord("topicD", 0, 52L, bad3) + ); + + // Act + List result = transformer.transform("container5", batch); + + // Assert — empty output + assertThat(result.size()).isEqualTo(0); + + // Assert — reporter called 3 times + verify(reporter, times(3)).report(any(SinkRecord.class), any(ConnectException.class)); + } + + // ============================================================ + // T6: Reporter takes precedence over tolerance NONE — when reporter is present, + // report instead of throwing even when tolerance is NONE + // ============================================================ + @Test(groups = {"unit"}, timeOut = TIMEOUT) + public void reporterTakesPrecedenceOverToleranceNone() throws Exception { + // Arrange + IdStrategy idStrategy = createSelectivelyFailingIdStrategy(); + ErrantRecordReporter reporter = Mockito.mock(ErrantRecordReporter.class); + Future mockFuture = Mockito.mock(Future.class); + when(reporter.report(any(SinkRecord.class), any(Throwable.class))).thenReturn(mockFuture); + + // Tolerance is NONE but reporter is available + SinkRecordTransformer transformer = createTransformer(idStrategy, reporter, ToleranceOnErrorLevel.NONE); + + Map badValue = new HashMap<>(); + badValue.put("fail", true); + + List batch = Arrays.asList( + createMapRecord("topicE", 0, 0L, badValue) + ); + + // Act — should NOT throw because reporter handles it + Throwable thrown = catchThrowable(() -> transformer.transform("container6", batch)); + + // Assert + assertThat(thrown).isNull(); + verify(reporter, times(1)).report(any(SinkRecord.class), any(ConnectException.class)); + } +} From d49548cda4409ec52125eb2cce202a03b451b500 Mon Sep 17 00:00:00 2001 From: Annie Liang Date: Tue, 26 May 2026 14:47:19 -0700 Subject: [PATCH 3/5] Guard ErrantRecordReporter.report() against exceptions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Wrap errantRecordReporter.report() in its own try/catch to prevent DLQ reporter failures from collapsing the entire batch. When the reporter throws: - ToleranceOnErrorLevel.ALL: log and continue (skip the bad record) - ToleranceOnErrorLevel.NONE: rethrow the original transform exception Add T6/T7 tests covering both scenarios. Renumber T6→T8. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../sink/SinkRecordTransformer.java | 15 ++++- .../sink/SinkRecordTransformerTest.java | 65 ++++++++++++++++++- 2 files changed, 78 insertions(+), 2 deletions(-) diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/SinkRecordTransformer.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/SinkRecordTransformer.java index 970215c04a71..13d5d6dcffff 100644 --- a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/SinkRecordTransformer.java +++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/SinkRecordTransformer.java @@ -85,7 +85,20 @@ public List transform(String containerName, List sinkRec containerName, e); if (this.errantRecordReporter != null) { - this.errantRecordReporter.report(record, e); + try { + this.errantRecordReporter.report(record, e); + } catch (Exception reportException) { + LOGGER.error( + "Failed to report errant record to DLQ for topic {}, partition {}, offset {}, container {}.", + record.topic(), + record.kafkaPartition(), + record.kafkaOffset(), + containerName, + reportException); + if (this.toleranceOnErrorLevel != ToleranceOnErrorLevel.ALL) { + throw e; + } + } } else if (this.toleranceOnErrorLevel == ToleranceOnErrorLevel.ALL) { LOGGER.warn( "Skipping record from topic {}, partition {}, offset {}, container {} due to transform error " diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/test/java/com/azure/cosmos/kafka/connect/implementation/sink/SinkRecordTransformerTest.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/test/java/com/azure/cosmos/kafka/connect/implementation/sink/SinkRecordTransformerTest.java index 52d8e04aba79..54dcd50ced42 100644 --- a/sdk/cosmos/azure-cosmos-kafka-connect/src/test/java/com/azure/cosmos/kafka/connect/implementation/sink/SinkRecordTransformerTest.java +++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/test/java/com/azure/cosmos/kafka/connect/implementation/sink/SinkRecordTransformerTest.java @@ -256,7 +256,70 @@ public void allBadRecordsWithReporter_allReportedEmptyOutput() throws Exception } // ============================================================ - // T6: Reporter takes precedence over tolerance NONE — when reporter is present, + // T6: Reporter itself throws — with tolerance NONE, original exception rethrown + // ============================================================ + @Test(groups = {"unit"}, timeOut = TIMEOUT) + public void reporterThrows_toleranceNone_originalExceptionRethrown() throws Exception { + // Arrange + IdStrategy idStrategy = createSelectivelyFailingIdStrategy(); + ErrantRecordReporter reporter = Mockito.mock(ErrantRecordReporter.class); + when(reporter.report(any(SinkRecord.class), any(Throwable.class))) + .thenThrow(new ConnectException("DLQ write failed")); + + SinkRecordTransformer transformer = createTransformer(idStrategy, reporter, ToleranceOnErrorLevel.NONE); + + Map badValue = new HashMap<>(); + badValue.put("fail", true); + Map goodValue = new HashMap<>(); + goodValue.put("data", "after-bad"); + + List batch = Arrays.asList( + createMapRecord("topicF", 0, 0L, badValue), + createMapRecord("topicF", 0, 1L, goodValue) + ); + + // Act + Throwable thrown = catchThrowable(() -> transformer.transform("container7", batch)); + + // Assert — original transform exception, NOT the DLQ exception + assertThat(thrown).isInstanceOf(ConnectException.class); + assertThat(thrown.getMessage()).contains("Cannot generate ID"); + } + + // ============================================================ + // T7: Reporter itself throws — with tolerance ALL, record skipped and processing continues + // ============================================================ + @Test(groups = {"unit"}, timeOut = TIMEOUT) + @SuppressWarnings("unchecked") + public void reporterThrows_toleranceAll_recordSkippedProcessingContinues() throws Exception { + // Arrange + IdStrategy idStrategy = createSelectivelyFailingIdStrategy(); + ErrantRecordReporter reporter = Mockito.mock(ErrantRecordReporter.class); + when(reporter.report(any(SinkRecord.class), any(Throwable.class))) + .thenThrow(new ConnectException("DLQ write failed")); + + SinkRecordTransformer transformer = createTransformer(idStrategy, reporter, ToleranceOnErrorLevel.ALL); + + Map badValue = new HashMap<>(); + badValue.put("fail", true); + Map goodValue = new HashMap<>(); + goodValue.put("data", "survives"); + + List batch = Arrays.asList( + createMapRecord("topicG", 0, 0L, badValue), + createMapRecord("topicG", 0, 1L, goodValue) + ); + + // Act — should NOT throw + List result = transformer.transform("container8", batch); + + // Assert — only the good record survives + assertThat(result.size()).isEqualTo(1); + assertThat(((Map) result.get(0).value()).get("id")).isEqualTo("generated-id-1"); + } + + // ============================================================ + // T8: Reporter takes precedence over tolerance NONE — when reporter is present, // report instead of throwing even when tolerance is NONE // ============================================================ @Test(groups = {"unit"}, timeOut = TIMEOUT) From 82946c14385c66f22624404a1c03143ae92311a9 Mon Sep 17 00:00:00 2001 From: Annie Liang Date: Tue, 26 May 2026 14:51:49 -0700 Subject: [PATCH 4/5] Align transformer error handling with writer pattern; harden MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Address review findings: - Align DLQ/tolerance precedence with writer pattern: DLQ report is fire-and-forget side-effect, tolerance level controls continue-vs-throw. With tolerance=NONE + reporter, record is reported AND task fails. - Guard context.errantRecordReporter() against older Kafka Connect runtimes that lack the API (catch NoClassDefFoundError/NoSuchMethodError). - Add package-private constructor for testability (eliminates reflection). - Consolidate double-logging: one log entry per failed record. - Rewrite tests to use package-private constructor and align with new semantics. T8 now tests tolerance=NONE+reporter → DLQ+throw. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../implementation/sink/CosmosSinkTask.java | 12 +++- .../sink/SinkRecordTransformer.java | 42 ++++++----- .../sink/SinkRecordTransformerTest.java | 69 +++++++------------ 3 files changed, 59 insertions(+), 64 deletions(-) diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/CosmosSinkTask.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/CosmosSinkTask.java index 2db4aede492f..7a47678ba3a7 100644 --- a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/CosmosSinkTask.java +++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/CosmosSinkTask.java @@ -50,9 +50,19 @@ public void start(Map props) { this.sinkTaskConfig.getClientMetadataCachesSnapshot()); LOGGER.info("The taskId is " + this.sinkTaskConfig.getTaskId()); this.throughputControlClientItem = this.getThroughputControlCosmosClient(); + + ErrantRecordReporter errantRecordReporter = null; + try { + errantRecordReporter = this.context.errantRecordReporter(); + } catch (NoClassDefFoundError | NoSuchMethodError e) { + LOGGER.info( + "ErrantRecordReporter not available in this Kafka Connect runtime; " + + "DLQ will not be used for transform errors."); + } + this.sinkRecordTransformer = new SinkRecordTransformer( this.sinkTaskConfig, - this.context.errantRecordReporter(), + errantRecordReporter, this.sinkTaskConfig.getWriteConfig().getToleranceOnErrorLevel()); if (this.sinkTaskConfig.getWriteConfig().isBulkEnabled()) { diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/SinkRecordTransformer.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/SinkRecordTransformer.java index 13d5d6dcffff..7cc13b2cf445 100644 --- a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/SinkRecordTransformer.java +++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/SinkRecordTransformer.java @@ -32,7 +32,15 @@ public SinkRecordTransformer( CosmosSinkTaskConfig sinkTaskConfig, ErrantRecordReporter errantRecordReporter, ToleranceOnErrorLevel toleranceOnErrorLevel) { - this.idStrategy = this.createIdStrategy(sinkTaskConfig); + this(createIdStrategy(sinkTaskConfig), errantRecordReporter, toleranceOnErrorLevel); + } + + // Package-private constructor for unit testing without requiring CosmosSinkTaskConfig. + SinkRecordTransformer( + IdStrategy idStrategy, + ErrantRecordReporter errantRecordReporter, + ToleranceOnErrorLevel toleranceOnErrorLevel) { + this.idStrategy = idStrategy; this.errantRecordReporter = errantRecordReporter; this.toleranceOnErrorLevel = toleranceOnErrorLevel; } @@ -77,13 +85,8 @@ public List transform(String containerName, List sinkRec toBeWrittenRecordList.add(updatedRecord); } catch (Exception e) { - LOGGER.warn( - "Failed to transform record from topic {}, partition {}, offset {}, container {}.", - record.topic(), - record.kafkaPartition(), - record.kafkaOffset(), - containerName, - e); + // Report to DLQ if configured (fire-and-forget, guarded against reporter failures). + // This is consistent with the writer-level pattern in CosmosWriterBase.sendToDlqIfConfigured(). if (this.errantRecordReporter != null) { try { this.errantRecordReporter.report(record, e); @@ -95,26 +98,27 @@ public List transform(String containerName, List sinkRec record.kafkaOffset(), containerName, reportException); - if (this.toleranceOnErrorLevel != ToleranceOnErrorLevel.ALL) { - throw e; - } } - } else if (this.toleranceOnErrorLevel == ToleranceOnErrorLevel.ALL) { + } + + // Use tolerance level to decide continue-vs-throw (consistent with writer pattern). + if (this.toleranceOnErrorLevel == ToleranceOnErrorLevel.ALL) { LOGGER.warn( - "Skipping record from topic {}, partition {}, offset {}, container {} due to transform error " - + "and ToleranceOnErrorLevel is ALL.", + "Skipping record from topic {}, partition {}, offset {}, container {} due to transform error.", record.topic(), record.kafkaPartition(), record.kafkaOffset(), - containerName); + containerName, + e); } else { LOGGER.error( - "Failing record from topic {}, partition {}, offset {}, container {} because no DLQ reporter " - + "is configured and ToleranceOnErrorLevel is NONE.", + "Failing task due to transform error for record from topic {}, partition {}, offset {}, " + + "container {}.", record.topic(), record.kafkaPartition(), record.kafkaOffset(), - containerName); + containerName, + e); throw e; } } @@ -132,7 +136,7 @@ private void maybeInsertId(Object recordValue, SinkRecord sinkRecord) { recordMap.put("id", this.idStrategy.generateId(sinkRecord)); } - private IdStrategy createIdStrategy(CosmosSinkTaskConfig sinkTaskConfig) { + private static IdStrategy createIdStrategy(CosmosSinkTaskConfig sinkTaskConfig) { IdStrategy idStrategyClass; switch (sinkTaskConfig.getIdStrategy()) { case FULL_KEY_STRATEGY: diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/test/java/com/azure/cosmos/kafka/connect/implementation/sink/SinkRecordTransformerTest.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/test/java/com/azure/cosmos/kafka/connect/implementation/sink/SinkRecordTransformerTest.java index 54dcd50ced42..e2fe731972c4 100644 --- a/sdk/cosmos/azure-cosmos-kafka-connect/src/test/java/com/azure/cosmos/kafka/connect/implementation/sink/SinkRecordTransformerTest.java +++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/test/java/com/azure/cosmos/kafka/connect/implementation/sink/SinkRecordTransformerTest.java @@ -4,7 +4,6 @@ package com.azure.cosmos.kafka.connect.implementation.sink; import com.azure.cosmos.kafka.connect.implementation.sink.idstrategy.IdStrategy; -import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.sink.ErrantRecordReporter; import org.apache.kafka.connect.sink.SinkRecord; @@ -21,35 +20,14 @@ import static org.assertj.core.api.AssertionsForClassTypes.assertThat; import static org.assertj.core.api.AssertionsForClassTypes.catchThrowable; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.CALLS_REAL_METHODS; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import static org.mockito.Mockito.withSettings; public class SinkRecordTransformerTest { private static final int TIMEOUT = 60000; - /** - * Creates a SinkRecordTransformer bypassing the constructor that needs CosmosSinkTaskConfig. - * Uses Mockito with CALLS_REAL_METHODS so the transform() method executes real code, - * then injects the fields via reflection. - */ - private SinkRecordTransformer createTransformer( - IdStrategy idStrategy, - ErrantRecordReporter reporter, - ToleranceOnErrorLevel toleranceLevel) throws Exception { - - SinkRecordTransformer transformer = Mockito.mock( - SinkRecordTransformer.class, - withSettings().defaultAnswer(CALLS_REAL_METHODS)); - FieldUtils.writeField(transformer, "idStrategy", idStrategy, true); - FieldUtils.writeField(transformer, "errantRecordReporter", reporter, true); - FieldUtils.writeField(transformer, "toleranceOnErrorLevel", toleranceLevel, true); - return transformer; - } - /** * Creates a SinkRecord with a Map value containing the given fields. */ @@ -80,18 +58,18 @@ private IdStrategy createSelectivelyFailingIdStrategy() { } // ============================================================ - // T1: Mixed batch with reporter — bad record goes to DLQ, valid records in output + // T1: Mixed batch with reporter + tolerance ALL — bad record goes to DLQ, valid records in output // ============================================================ @Test(groups = {"unit"}, timeOut = TIMEOUT) @SuppressWarnings("unchecked") - public void mixedBatchWithReporter_badRecordReportedValidRecordsInOutput() throws Exception { + public void mixedBatchWithReporterToleranceAll_badRecordReportedValidRecordsInOutput() throws Exception { // Arrange IdStrategy idStrategy = createSelectivelyFailingIdStrategy(); ErrantRecordReporter reporter = Mockito.mock(ErrantRecordReporter.class); Future mockFuture = Mockito.mock(Future.class); when(reporter.report(any(SinkRecord.class), any(Throwable.class))).thenReturn(mockFuture); - SinkRecordTransformer transformer = createTransformer(idStrategy, reporter, ToleranceOnErrorLevel.NONE); + SinkRecordTransformer transformer = new SinkRecordTransformer(idStrategy, reporter, ToleranceOnErrorLevel.ALL); Map goodValue1 = new HashMap<>(); goodValue1.put("data", "hello"); @@ -132,7 +110,7 @@ public void mixedBatchWithReporter_badRecordReportedValidRecordsInOutput() throw public void mixedBatchToleranceAll_noReporter_badRecordSkipped() throws Exception { // Arrange IdStrategy idStrategy = createSelectivelyFailingIdStrategy(); - SinkRecordTransformer transformer = createTransformer(idStrategy, null, ToleranceOnErrorLevel.ALL); + SinkRecordTransformer transformer = new SinkRecordTransformer(idStrategy, null, ToleranceOnErrorLevel.ALL); Map goodValue = new HashMap<>(); goodValue.put("data", "hello"); @@ -154,13 +132,13 @@ public void mixedBatchToleranceAll_noReporter_badRecordSkipped() throws Exceptio } // ============================================================ - // T3: Mixed batch with tolerance NONE, no reporter — exception thrown + // T3: Mixed batch with tolerance NONE, no reporter — exception thrown (fail-fast) // ============================================================ @Test(groups = {"unit"}, timeOut = TIMEOUT) public void mixedBatchToleranceNone_noReporter_exceptionThrown() throws Exception { // Arrange IdStrategy idStrategy = createSelectivelyFailingIdStrategy(); - SinkRecordTransformer transformer = createTransformer(idStrategy, null, ToleranceOnErrorLevel.NONE); + SinkRecordTransformer transformer = new SinkRecordTransformer(idStrategy, null, ToleranceOnErrorLevel.NONE); Map goodValue = new HashMap<>(); goodValue.put("data", "hello"); @@ -190,7 +168,7 @@ public void allValidRecords_allInOutput() throws Exception { // Arrange IdStrategy idStrategy = createSelectivelyFailingIdStrategy(); ErrantRecordReporter reporter = Mockito.mock(ErrantRecordReporter.class); - SinkRecordTransformer transformer = createTransformer(idStrategy, reporter, ToleranceOnErrorLevel.NONE); + SinkRecordTransformer transformer = new SinkRecordTransformer(idStrategy, reporter, ToleranceOnErrorLevel.NONE); Map value1 = new HashMap<>(); value1.put("data", "a"); @@ -220,17 +198,17 @@ public void allValidRecords_allInOutput() throws Exception { } // ============================================================ - // T5: All records bad with reporter — all reported to DLQ, empty output + // T5: All records bad with reporter + tolerance ALL — all reported to DLQ, empty output // ============================================================ @Test(groups = {"unit"}, timeOut = TIMEOUT) - public void allBadRecordsWithReporter_allReportedEmptyOutput() throws Exception { + public void allBadRecordsWithReporterToleranceAll_allReportedEmptyOutput() throws Exception { // Arrange IdStrategy idStrategy = createSelectivelyFailingIdStrategy(); ErrantRecordReporter reporter = Mockito.mock(ErrantRecordReporter.class); Future mockFuture = Mockito.mock(Future.class); when(reporter.report(any(SinkRecord.class), any(Throwable.class))).thenReturn(mockFuture); - SinkRecordTransformer transformer = createTransformer(idStrategy, reporter, ToleranceOnErrorLevel.NONE); + SinkRecordTransformer transformer = new SinkRecordTransformer(idStrategy, reporter, ToleranceOnErrorLevel.ALL); Map bad1 = new HashMap<>(); bad1.put("fail", true); @@ -266,7 +244,7 @@ public void reporterThrows_toleranceNone_originalExceptionRethrown() throws Exce when(reporter.report(any(SinkRecord.class), any(Throwable.class))) .thenThrow(new ConnectException("DLQ write failed")); - SinkRecordTransformer transformer = createTransformer(idStrategy, reporter, ToleranceOnErrorLevel.NONE); + SinkRecordTransformer transformer = new SinkRecordTransformer(idStrategy, reporter, ToleranceOnErrorLevel.NONE); Map badValue = new HashMap<>(); badValue.put("fail", true); @@ -298,7 +276,7 @@ public void reporterThrows_toleranceAll_recordSkippedProcessingContinues() throw when(reporter.report(any(SinkRecord.class), any(Throwable.class))) .thenThrow(new ConnectException("DLQ write failed")); - SinkRecordTransformer transformer = createTransformer(idStrategy, reporter, ToleranceOnErrorLevel.ALL); + SinkRecordTransformer transformer = new SinkRecordTransformer(idStrategy, reporter, ToleranceOnErrorLevel.ALL); Map badValue = new HashMap<>(); badValue.put("fail", true); @@ -319,32 +297,35 @@ public void reporterThrows_toleranceAll_recordSkippedProcessingContinues() throw } // ============================================================ - // T8: Reporter takes precedence over tolerance NONE — when reporter is present, - // report instead of throwing even when tolerance is NONE + // T8: Tolerance NONE with reporter — record reported to DLQ AND exception thrown + // (consistent with writer-level pattern: DLQ is side-effect, tolerance controls flow) // ============================================================ @Test(groups = {"unit"}, timeOut = TIMEOUT) - public void reporterTakesPrecedenceOverToleranceNone() throws Exception { + public void toleranceNoneWithReporter_reportedToDlqAndExceptionThrown() throws Exception { // Arrange IdStrategy idStrategy = createSelectivelyFailingIdStrategy(); ErrantRecordReporter reporter = Mockito.mock(ErrantRecordReporter.class); Future mockFuture = Mockito.mock(Future.class); when(reporter.report(any(SinkRecord.class), any(Throwable.class))).thenReturn(mockFuture); - // Tolerance is NONE but reporter is available - SinkRecordTransformer transformer = createTransformer(idStrategy, reporter, ToleranceOnErrorLevel.NONE); + // Tolerance is NONE — task should fail even though reporter is available + SinkRecordTransformer transformer = new SinkRecordTransformer(idStrategy, reporter, ToleranceOnErrorLevel.NONE); Map badValue = new HashMap<>(); badValue.put("fail", true); List batch = Arrays.asList( - createMapRecord("topicE", 0, 0L, badValue) + createMapRecord("topicH", 0, 0L, badValue) ); - // Act — should NOT throw because reporter handles it - Throwable thrown = catchThrowable(() -> transformer.transform("container6", batch)); + // Act + Throwable thrown = catchThrowable(() -> transformer.transform("container9", batch)); + + // Assert — exception IS thrown (tolerance NONE means fail) + assertThat(thrown).isInstanceOf(ConnectException.class); + assertThat(thrown.getMessage()).contains("Cannot generate ID"); - // Assert - assertThat(thrown).isNull(); + // Assert — reporter WAS called (DLQ is side-effect for observability) verify(reporter, times(1)).report(any(SinkRecord.class), any(ConnectException.class)); } } From 4ac387bf70f4e482224f03e6afd08b26bf834961 Mon Sep 17 00:00:00 2001 From: Annie Liang Date: Tue, 26 May 2026 15:41:58 -0700 Subject: [PATCH 5/5] fix: catch RuntimeException instead of Exception to fix compilation Change catch clause from Exception (checked) to RuntimeException (unchecked) since transform() doesn't declare throws Exception. ConnectException and all other exceptions from ID strategies are RuntimeException subclasses. This fixes the CI build failure. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../connect/implementation/sink/SinkRecordTransformer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/SinkRecordTransformer.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/SinkRecordTransformer.java index 7cc13b2cf445..159c8364ff78 100644 --- a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/SinkRecordTransformer.java +++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/SinkRecordTransformer.java @@ -84,7 +84,7 @@ public List transform(String containerName, List sinkRec record.headers()); toBeWrittenRecordList.add(updatedRecord); - } catch (Exception e) { + } catch (RuntimeException e) { // Report to DLQ if configured (fire-and-forget, guarded against reporter failures). // This is consistent with the writer-level pattern in CosmosWriterBase.sendToDlqIfConfigured(). if (this.errantRecordReporter != null) {