Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion eng/versioning/external_dependencies.txt
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,6 @@ cosmos_org.apache.kafka:connect-runtime;3.6.0
cosmos_org.testcontainers:testcontainers;1.21.4
cosmos_org.testcontainers:kafka;1.21.4
cosmos_org.sourcelab:kafka-connect-client;4.0.4
cosmos_io.confluent:kafka-avro-serializer;7.6.0
cosmos_org.apache.avro:avro;1.11.4
cosmos_org.apache.logging.log4j:log4j-api;2.25.3
cosmos_org.apache.logging.log4j:log4j-core;2.25.3
Expand Down
16 changes: 0 additions & 16 deletions sdk/cosmos/azure-cosmos-kafka-connect/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,6 @@ Licensed under the MIT License.
</site>
</distributionManagement>

<repositories>
<repository>
<id>confluent</id>
<name>Confluent</name>
<url>https://packages.confluent.io/maven/</url>
</repository>
</repositories>

<scm>
<url>scm:git:https://github.com/Azure/azure-sdk-for-java</url>
<connection>scm:git:git@github.com:Azure/azure-sdk-for-java.git</connection>
Expand Down Expand Up @@ -237,13 +229,6 @@ Licensed under the MIT License.
<version>4.0.4</version> <!-- {x-version-update;cosmos_org.sourcelab:kafka-connect-client;external_dependency} -->
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/io.confluent/kafka-avro-serializer -->
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>7.6.0</version><!-- {x-version-update;cosmos_io.confluent:kafka-avro-serializer;external_dependency} -->
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
Expand Down Expand Up @@ -417,7 +402,6 @@ Licensed under the MIT License.
<artifactSet>
<excludes>
<exclude>org.slf4j</exclude>
<exclude>io.confluent:*</exclude>
<exclude>org.apache.kafka:*</exclude>
</excludes>
</artifactSet>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import com.azure.cosmos.kafka.connect.implementation.sink.IdStrategyType;
import com.azure.cosmos.models.PartitionKey;
import com.fasterxml.jackson.databind.JsonNode;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
Expand Down Expand Up @@ -286,8 +285,8 @@ public void postAvroMessage() throws InterruptedException {
kafkaCosmosConnectContainer.registerConnector(connectorName, sinkConnectorConfig);

Properties producerProperties = kafkaCosmosConnectContainer.getProducerProperties();
producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, TestAvroSerializer.class.getName());
producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, TestAvroSerializer.class.getName());
KafkaProducer<GenericRecord, GenericRecord> kafkaProducer = new KafkaProducer<>(producerProperties);

// first create few records in the topic
Expand Down Expand Up @@ -364,8 +363,8 @@ public void postAvroMessageWithTemplateIdStrategy() throws InterruptedException
kafkaCosmosConnectContainer.registerConnector(connectorName, sinkConnectorConfig);

Properties producerProperties = kafkaCosmosConnectContainer.getProducerProperties();
producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, TestAvroSerializer.class.getName());
producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, TestAvroSerializer.class.getName());
KafkaProducer<GenericRecord, GenericRecord> kafkaProducer = new KafkaProducer<>(producerProperties);

logger.info("Creating sink record...");
Expand Down Expand Up @@ -432,8 +431,8 @@ public void postAvroMessageWithJsonPathInProvidedInKeyStrategy() throws Interrup
kafkaCosmosConnectContainer.registerConnector(connectorName, sinkConnectorConfig);

Properties producerProperties = kafkaCosmosConnectContainer.getProducerProperties();
producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, TestAvroSerializer.class.getName());
producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, TestAvroSerializer.class.getName());
KafkaProducer<GenericRecord, GenericRecord> kafkaProducer = new KafkaProducer<>(producerProperties);

logger.info("Creating sink record...");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos.kafka.connect;

import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.kafka.common.serialization.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Map;
import java.util.Scanner;
import java.util.concurrent.ConcurrentHashMap;

/**
* A lightweight Avro serializer for integration tests that produces the Confluent wire format
* (magic byte 0x0 + 4-byte schema ID + Avro binary data) without requiring the
* io.confluent:kafka-avro-serializer dependency.
*
* This serializer registers schemas with a Schema Registry via its REST API and caches
* the resulting schema IDs.
*/
public class TestAvroSerializer implements Serializer<GenericRecord> {
private static final Logger LOGGER = LoggerFactory.getLogger(TestAvroSerializer.class);
private static final byte MAGIC_BYTE = 0x0;

private String schemaRegistryUrl;
private String basicAuthHeader;
private boolean isKey;
private final Map<String, Integer> schemaIdCache = new ConcurrentHashMap<>();

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
this.isKey = isKey;
this.schemaRegistryUrl = (String) configs.get("schema.registry.url");
if (this.schemaRegistryUrl != null && this.schemaRegistryUrl.endsWith("/")) {
this.schemaRegistryUrl = this.schemaRegistryUrl.substring(0, this.schemaRegistryUrl.length() - 1);
}

String authSource = (String) configs.get("basic.auth.credentials.source");
if ("USER_INFO".equals(authSource)) {
String userInfo = (String) configs.get("basic.auth.user.info");
if (userInfo != null) {
this.basicAuthHeader =
"Basic " + Base64.getEncoder().encodeToString(userInfo.getBytes(StandardCharsets.UTF_8));
}
}
}

@Override
public byte[] serialize(String topic, GenericRecord record) {
if (record == null) {
return null;
}

try {
String subject = topic + (isKey ? "-key" : "-value");
int schemaId = getOrRegisterSchemaId(subject, record.getSchema().toString());

ByteArrayOutputStream out = new ByteArrayOutputStream();
out.write(MAGIC_BYTE);
out.write(ByteBuffer.allocate(4).putInt(schemaId).array());

GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<>(record.getSchema());
BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
writer.write(record, encoder);
encoder.flush();

return out.toByteArray();
} catch (IOException e) {
throw new RuntimeException("Failed to serialize Avro record", e);
}
}

@Override
public void close() {
// No resources to close
}

private int getOrRegisterSchemaId(String subject, String schemaJson) {
String cacheKey = subject + ":" + schemaJson;
Integer cachedId = schemaIdCache.get(cacheKey);
if (cachedId != null) {
return cachedId;
}

try {
int id = registerSchema(subject, schemaJson);
schemaIdCache.put(cacheKey, id);
return id;
} catch (IOException e) {
throw new RuntimeException("Failed to register schema for subject " + subject, e);
}
}

private int registerSchema(String subject, String schemaJson) throws IOException {
String url = schemaRegistryUrl + "/subjects/" + subject + "/versions";
LOGGER.info("Registering schema for subject {} at {}", subject, url);

String escapedSchema = schemaJson.replace("\\", "\\\\").replace("\"", "\\\"");
String requestBody = "{\"schema\": \"" + escapedSchema + "\"}";

HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
try {
conn.setRequestMethod("POST");
conn.setRequestProperty("Content-Type", "application/vnd.schemaregistry.v1+json");
conn.setRequestProperty("Accept", "application/vnd.schemaregistry.v1+json");
if (basicAuthHeader != null) {
conn.setRequestProperty("Authorization", basicAuthHeader);
}
conn.setDoOutput(true);

try (OutputStream os = conn.getOutputStream()) {
os.write(requestBody.getBytes(StandardCharsets.UTF_8));
}

int responseCode = conn.getResponseCode();
if (responseCode != 200) {
String errorBody = readStream(conn.getErrorStream());
throw new IOException(
"Schema registration failed with HTTP " + responseCode + ": " + errorBody);
}

String response = readStream(conn.getInputStream());
return parseSchemaId(response);
} finally {
conn.disconnect();
}
}

private static String readStream(InputStream stream) {
if (stream == null) {
return "";
}
try (Scanner scanner = new Scanner(stream, "UTF-8")) {
return scanner.useDelimiter("\\A").hasNext() ? scanner.next() : "";
}
}

private static int parseSchemaId(String response) throws IOException {
// Response format: {"id": N} - parse the id value
int idIndex = response.indexOf("\"id\"");
if (idIndex == -1) {
throw new IOException("No schema ID found in response: " + response);
}
int colonPos = response.indexOf(':', idIndex);
int commaPos = response.indexOf(',', colonPos);
int bracePos = response.indexOf('}', colonPos);
int endPos;
if (commaPos == -1) {
endPos = bracePos;
} else {
endPos = Math.min(commaPos, bracePos);
}
return Integer.parseInt(response.substring(colonPos + 1, endPos).trim());
}
}
Loading