org.springframework.boot
spring-boot-resttestclient
diff --git a/spring-cloud-contract-verifier/src/main/java/org/springframework/cloud/contract/verifier/messaging/avro/AvroMetadata.java b/spring-cloud-contract-verifier/src/main/java/org/springframework/cloud/contract/verifier/messaging/avro/AvroMetadata.java
new file mode 100644
index 0000000000..ff8e3e6e16
--- /dev/null
+++ b/spring-cloud-contract-verifier/src/main/java/org/springframework/cloud/contract/verifier/messaging/avro/AvroMetadata.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2013-present the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.cloud.contract.verifier.messaging.avro;
+
+/**
+ * Avro serialization metadata for a Kafka contract message.
+ *
+ *
+ * Example contract YAML:
+ * metadata:
+ * kafka:
+ * avro:
+ * schema: classpath:avro/Book.avsc
+ *
+ *
+ *
+ * The Schema Registry URL is configured globally via
+ * {@code spring.kafka.properties.schema.registry.url}.
+ *
+ * @author Emanuel Trandafir
+ * @since 4.2.0
+ */
+public final class AvroMetadata {
+
+ /**
+ * Classpath or filesystem path to the Avro schema file
+ * ({@code .avsc}), e.g. {@code classpath:avro/Book.avsc}.
+ * May also be an inline JSON schema string.
+ */
+ private String schema;
+
+ /**
+ * Returns the Avro schema path or inline schema JSON.
+ *
+ * @return the schema
+ */
+ public String getSchema() {
+ return this.schema;
+ }
+
+ /**
+ * Sets the Avro schema path or inline schema JSON.
+ *
+ * @param value the schema
+ */
+ public void setSchema(final String value) {
+ this.schema = value;
+ }
+
+}
diff --git a/spring-cloud-contract-verifier/src/main/java/org/springframework/cloud/contract/verifier/messaging/avro/KafkaAvroContractVerifierConfiguration.java b/spring-cloud-contract-verifier/src/main/java/org/springframework/cloud/contract/verifier/messaging/avro/KafkaAvroContractVerifierConfiguration.java
new file mode 100644
index 0000000000..8fa794c59f
--- /dev/null
+++ b/spring-cloud-contract-verifier/src/main/java/org/springframework/cloud/contract/verifier/messaging/avro/KafkaAvroContractVerifierConfiguration.java
@@ -0,0 +1,110 @@
+/*
+ * Copyright 2013-present the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.cloud.contract.verifier.messaging.avro;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import io.confluent.kafka.serializers.KafkaAvroSerializer;
+import org.apache.avro.specific.SpecificRecordBase;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringSerializer;
+import tools.jackson.databind.json.JsonMapper;
+
+import org.springframework.beans.factory.ObjectProvider;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
+import org.springframework.cloud.contract.verifier.messaging.internal.ContractVerifierObjectMapper;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.core.DefaultKafkaProducerFactory;
+import org.springframework.kafka.core.KafkaTemplate;
+
+/**
+ * Auto-configuration for Avro support in Spring Cloud Contract.
+ * Activates when {@code org.apache.avro.specific.SpecificRecordBase}
+ * is on the classpath.
+ *
+ * @author Emanuel Trandafir
+ * @since 4.2.0
+ */
+@Configuration(proxyBeanMethods = false)
+@ConditionalOnClass(name = "org.apache.avro.specific.SpecificRecordBase")
+public final class KafkaAvroContractVerifierConfiguration {
+
+ @Bean
+ @ConditionalOnMissingBean
+ ContractVerifierObjectMapper avroContractVerifierObjectMapper(
+ final ObjectProvider jsonMapper) {
+ JsonMapper mapper = jsonMapper.getIfAvailable(JsonMapper::new)
+ .rebuild()
+ .addMixIn(SpecificRecordBase.class, IgnoreAvroMixin.class)
+ .build();
+ return new ContractVerifierObjectMapper(mapper);
+ }
+
+ @Bean
+ @ConditionalOnMissingBean(name = "avroKafkaTemplate")
+ KafkaTemplate avroKafkaTemplate(
+ @Value("${spring.kafka.bootstrap-servers}")
+ final String bootstrapServers,
+ @Value("${spring.kafka.properties.schema.registry.url:}")
+ final String schemaRegistryUrl) {
+ Map props = new HashMap<>();
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+ StringSerializer.class);
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+ KafkaAvroSerializer.class);
+ props.put("schema.registry.url", schemaRegistryUrl);
+ return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props));
+ }
+
+ @Bean
+ @ConditionalOnMissingBean
+ KafkaAvroMessageVerifierSender kafkaAvroMessageVerifierSender(
+ @Qualifier("avroKafkaTemplate")
+ final KafkaTemplate avroKafkaTemplate) {
+ return new KafkaAvroMessageVerifierSender(avroKafkaTemplate);
+ }
+
+ /**
+ * Jackson mixin applied to
+ * {@link org.apache.avro.specific.SpecificRecordBase} to suppress
+ * Avro-internal fields ({@code schema}, {@code specificData},
+ * {@code classSchema}, {@code conversion}) during contract body
+ * verification.
+ *
+ *
+ * {@link ContractVerifierObjectMapper} serializes received Avro
+ * records to JSON so SCC can compare them against the contract's
+ * expected body. {@code SpecificRecordBase} exposes those fields as
+ * getters, but they are Avro runtime metadata — not part of the
+ * message payload. Without this mixin, Jackson either fails on
+ * non-serializable types or includes Avro internals in the output,
+ * causing false contract mismatches.
+ */
+ @JsonIgnoreProperties({ "schema", "specificData", "classSchema",
+ "conversion" })
+ interface IgnoreAvroMixin {
+
+ }
+
+}
diff --git a/spring-cloud-contract-verifier/src/main/java/org/springframework/cloud/contract/verifier/messaging/avro/KafkaAvroMessageVerifierSender.java b/spring-cloud-contract-verifier/src/main/java/org/springframework/cloud/contract/verifier/messaging/avro/KafkaAvroMessageVerifierSender.java
new file mode 100644
index 0000000000..1467341462
--- /dev/null
+++ b/spring-cloud-contract-verifier/src/main/java/org/springframework/cloud/contract/verifier/messaging/avro/KafkaAvroMessageVerifierSender.java
@@ -0,0 +1,147 @@
+/*
+ * Copyright 2013-present the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.cloud.contract.verifier.messaging.avro;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.jspecify.annotations.Nullable;
+
+import org.springframework.cloud.contract.verifier.converter.YamlContract;
+import org.springframework.cloud.contract.verifier.messaging.MessageVerifierSender;
+import org.springframework.cloud.contract.verifier.messaging.kafka.KafkaMetadata;
+import org.springframework.core.io.ClassPathResource;
+import org.springframework.core.io.FileSystemResource;
+import org.springframework.kafka.core.KafkaTemplate;
+
+/**
+ * A {@link MessageVerifierSender} that Avro-serializes the contract
+ * payload before sending it to a Kafka topic. The schema is read from
+ * {@link AvroMetadata} stored under the {@code "avro"} key in the
+ * contract metadata. Missing or invalid configuration throws an
+ * exception rather than silently skipping the send.
+ *
+ *
+ * The {@link KafkaTemplate} provided at construction time must be
+ * configured with {@code KafkaAvroSerializer} as its value serializer,
+ * pointing to the Schema Registry URL declared via
+ * {@code spring.kafka.properties.schema.registry.url}. When using
+ * Spring Boot auto-configuration this is handled automatically.
+ *
+ * @author Emanuel Trandafir
+ * @since 4.2.0
+ */
+public final class KafkaAvroMessageVerifierSender
+ implements MessageVerifierSender {
+
+ /**
+ * The Kafka template used to send Avro-serialized records.
+ */
+ private final KafkaTemplate kafkaTemplate;
+
+ /**
+ * Creates a new sender backed by the given Kafka template.
+ *
+ * @param template the Kafka template configured with Avro serialization
+ */
+ public KafkaAvroMessageVerifierSender(
+ final KafkaTemplate template) {
+ this.kafkaTemplate = template;
+ }
+
+ @Override
+ public void send(final Object message, final String destination,
+ @Nullable final YamlContract contract) {
+ send(message, Map.of(), destination, contract);
+ }
+
+ @Override
+ public void send(final T payload, final Map headers,
+ final String destination, @Nullable final YamlContract contract) {
+ if (contract == null || contract.metadata == null) {
+ throw new IllegalArgumentException(
+ "Contract or its metadata is null — cannot perform"
+ + " Avro serialization for destination ["
+ + destination + "]");
+ }
+ AvroMetadata avroMetadata = KafkaMetadata
+ .fromMetadata(contract.metadata).getAvro();
+ if (avroMetadata.getSchema() == null) {
+ throw new IllegalArgumentException(
+ "No Avro schema configured in contract metadata —"
+ + " cannot perform Avro serialization"
+ + " for destination [" + destination + "]");
+ }
+ try {
+ Schema schema = parseSchema(avroMetadata.getSchema());
+ GenericRecord record = buildRecord(schema, payload);
+ ProducerRecord producerRecord =
+ new ProducerRecord<>(destination, record);
+ if (headers != null) {
+ headers.forEach((key, value) -> producerRecord.headers()
+ .add(key, value.toString()
+ .getBytes(StandardCharsets.UTF_8)));
+ }
+ this.kafkaTemplate.send(producerRecord);
+ } catch (IOException ex) {
+ throw new IllegalStateException(
+ "Failed to load Avro schema ["
+ + avroMetadata.getSchema() + "]",
+ ex);
+ }
+ }
+
+ private Schema parseSchema(final String schemaValue) throws IOException {
+ if (schemaValue.trim().startsWith("{")) {
+ return new Schema.Parser().parse(schemaValue);
+ }
+ InputStream inputStream;
+ if (schemaValue.startsWith("classpath:")) {
+ String path = schemaValue.substring("classpath:".length());
+ inputStream = new ClassPathResource(path).getInputStream();
+ } else {
+ inputStream = new FileSystemResource(schemaValue).getInputStream();
+ }
+ try (InputStream is = inputStream) {
+ return new Schema.Parser().parse(is);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private GenericRecord buildRecord(final Schema schema,
+ final Object payload) {
+ if (!(payload instanceof Map)) {
+ throw new IllegalArgumentException(
+ "Payload must be a Map to build a GenericRecord, got: "
+ + payload.getClass());
+ }
+ Map payloadMap = (Map) payload;
+ GenericRecordBuilder builder = new GenericRecordBuilder(schema);
+ schema.getFields()
+ .stream()
+ .filter(field -> payloadMap.containsKey(field.name()))
+ .forEach(field -> builder.set(field, payloadMap.get(field.name())));
+ return builder.build();
+ }
+
+}
diff --git a/spring-cloud-contract-verifier/src/main/java/org/springframework/cloud/contract/verifier/messaging/avro/package-info.java b/spring-cloud-contract-verifier/src/main/java/org/springframework/cloud/contract/verifier/messaging/avro/package-info.java
new file mode 100644
index 0000000000..505a15510c
--- /dev/null
+++ b/spring-cloud-contract-verifier/src/main/java/org/springframework/cloud/contract/verifier/messaging/avro/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2013-present the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Avro serialization support for Spring Cloud Contract messaging.
+ */
+package org.springframework.cloud.contract.verifier.messaging.avro;
diff --git a/spring-cloud-contract-verifier/src/main/java/org/springframework/cloud/contract/verifier/messaging/kafka/KafkaMetadata.java b/spring-cloud-contract-verifier/src/main/java/org/springframework/cloud/contract/verifier/messaging/kafka/KafkaMetadata.java
index c1c1251433..6c79009d1c 100644
--- a/spring-cloud-contract-verifier/src/main/java/org/springframework/cloud/contract/verifier/messaging/kafka/KafkaMetadata.java
+++ b/spring-cloud-contract-verifier/src/main/java/org/springframework/cloud/contract/verifier/messaging/kafka/KafkaMetadata.java
@@ -18,6 +18,7 @@
import java.util.Map;
+import org.springframework.cloud.contract.verifier.messaging.avro.AvroMetadata;
import org.springframework.cloud.contract.verifier.util.MetadataUtil;
import org.springframework.cloud.contract.verifier.util.SpringCloudContractMetadata;
@@ -27,92 +28,164 @@
* @author Marcin Grzejszczak
* @since 3.0.0
*/
-public class KafkaMetadata implements SpringCloudContractMetadata {
-
- /**
- * Key under which this metadata entry can be found in contract's metadata.
- */
- public static final String METADATA_KEY = "kafka";
-
- /**
- * Metadata for the input message.
- */
- private MessageKafkaMetadata input = new MessageKafkaMetadata();
-
- /**
- * Metadata for the output message.
- */
- private MessageKafkaMetadata outputMessage = new MessageKafkaMetadata();
-
- public MessageKafkaMetadata getInput() {
- return this.input;
- }
-
- public void setInput(MessageKafkaMetadata input) {
- this.input = input;
- }
-
- public MessageKafkaMetadata getOutputMessage() {
- return this.outputMessage;
- }
-
- public void setOutputMessage(MessageKafkaMetadata outputMessage) {
- this.outputMessage = outputMessage;
- }
-
- public static KafkaMetadata fromMetadata(Map metadata) {
- return MetadataUtil.fromMetadata(metadata, KafkaMetadata.METADATA_KEY, new KafkaMetadata());
- }
-
- @Override
- public String key() {
- return METADATA_KEY;
- }
-
- @Override
- public String description() {
- return "Metadata for Kafka based communication";
- }
-
- /**
- * Kafka message metadata.
- */
- public static class MessageKafkaMetadata {
-
- /**
- * Properties related to connecting to a real broker.
- */
- private ConnectToBroker connectToBroker = new ConnectToBroker();
-
- public ConnectToBroker getConnectToBroker() {
- return this.connectToBroker;
- }
-
- public void setConnectToBroker(ConnectToBroker connectToBroker) {
- this.connectToBroker = connectToBroker;
- }
-
- }
-
- /**
- * Options related to connecting to the real broker.
- */
- public static class ConnectToBroker {
-
- /**
- * If set, will append any options to the existing ones that define connection to
- * the broker.
- */
- private String additionalOptions;
-
- public String getAdditionalOptions() {
- return this.additionalOptions;
- }
-
- public void setAdditionalOptions(String additionalOptions) {
- this.additionalOptions = additionalOptions;
- }
-
- }
+public final class KafkaMetadata implements SpringCloudContractMetadata {
+
+ /**
+ * Key under which this metadata entry can be found in contract's metadata.
+ */
+ public static final String METADATA_KEY = "kafka";
+
+ /**
+ * Metadata for the input message.
+ */
+ private MessageKafkaMetadata input = new MessageKafkaMetadata();
+
+ /**
+ * Metadata for the output message.
+ */
+ private MessageKafkaMetadata outputMessage = new MessageKafkaMetadata();
+
+ /**
+ * Avro serialization metadata. Configures the schema used to
+ * serialize/deserialize messages on this Kafka topic.
+ */
+ private AvroMetadata avro = new AvroMetadata();
+
+ /**
+ * Returns the input message metadata.
+ *
+ * @return the input metadata
+ */
+ public MessageKafkaMetadata getInput() {
+ return this.input;
+ }
+
+ /**
+ * Sets the input message metadata.
+ *
+ * @param value the input metadata
+ */
+ public void setInput(final MessageKafkaMetadata value) {
+ this.input = value;
+ }
+
+ /**
+ * Returns the output message metadata.
+ *
+ * @return the output message metadata
+ */
+ public MessageKafkaMetadata getOutputMessage() {
+ return this.outputMessage;
+ }
+
+ /**
+ * Sets the output message metadata.
+ *
+ * @param value the output message metadata
+ */
+ public void setOutputMessage(final MessageKafkaMetadata value) {
+ this.outputMessage = value;
+ }
+
+ /**
+ * Returns the Avro serialization metadata.
+ *
+ * @return the Avro metadata
+ */
+ public AvroMetadata getAvro() {
+ return this.avro;
+ }
+
+ /**
+ * Sets the Avro serialization metadata.
+ *
+ * @param value the Avro metadata
+ */
+ public void setAvro(final AvroMetadata value) {
+ this.avro = value;
+ }
+
+ /**
+ * Creates a {@link KafkaMetadata} instance from the given metadata map.
+ *
+ * @param metadata the contract metadata map
+ * @return the parsed KafkaMetadata
+ */
+ public static KafkaMetadata fromMetadata(
+ final Map metadata) {
+ return MetadataUtil.fromMetadata(metadata,
+ KafkaMetadata.METADATA_KEY, new KafkaMetadata());
+ }
+
+ @Override
+ public String key() {
+ return METADATA_KEY;
+ }
+
+ @Override
+ public String description() {
+ return "Metadata for Kafka based communication";
+ }
+
+ /**
+ * Kafka message metadata.
+ */
+ public static final class MessageKafkaMetadata {
+
+ /**
+ * Properties related to connecting to a real broker.
+ */
+ private ConnectToBroker connectToBroker = new ConnectToBroker();
+
+ /**
+ * Returns the broker connection properties.
+ *
+ * @return the connect-to-broker config
+ */
+ public ConnectToBroker getConnectToBroker() {
+ return this.connectToBroker;
+ }
+
+ /**
+ * Sets the broker connection properties.
+ *
+ * @param value the connect-to-broker config
+ */
+ public void setConnectToBroker(final ConnectToBroker value) {
+ this.connectToBroker = value;
+ }
+
+ }
+
+ /**
+ * Options related to connecting to the real broker.
+ */
+ public static final class ConnectToBroker {
+
+ /**
+ * If set, will append any options to the existing ones that
+ * define connection to the broker.
+ */
+ private String additionalOptions;
+
+ /**
+ * Returns the additional broker connection options.
+ *
+ * @return the additional options
+ */
+ public String getAdditionalOptions() {
+ return this.additionalOptions;
+ }
+
+ /**
+ * Sets the additional broker connection options.
+ *
+ * @param value the additional options
+ */
+ public void setAdditionalOptions(final String value) {
+ this.additionalOptions = value;
+ }
+
+ }
}
diff --git a/spring-cloud-contract-verifier/src/main/java/org/springframework/cloud/contract/verifier/messaging/kafka/package-info.java b/spring-cloud-contract-verifier/src/main/java/org/springframework/cloud/contract/verifier/messaging/kafka/package-info.java
new file mode 100644
index 0000000000..7f95fa5b6b
--- /dev/null
+++ b/spring-cloud-contract-verifier/src/main/java/org/springframework/cloud/contract/verifier/messaging/kafka/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2013-present the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Kafka messaging support for Spring Cloud Contract.
+ */
+package org.springframework.cloud.contract.verifier.messaging.kafka;
diff --git a/spring-cloud-contract-verifier/src/main/java/org/springframework/cloud/contract/verifier/messaging/noop/NoOpContractVerifierAutoConfiguration.java b/spring-cloud-contract-verifier/src/main/java/org/springframework/cloud/contract/verifier/messaging/noop/NoOpContractVerifierAutoConfiguration.java
index 1a562bb36d..6c3b2259bc 100644
--- a/spring-cloud-contract-verifier/src/main/java/org/springframework/cloud/contract/verifier/messaging/noop/NoOpContractVerifierAutoConfiguration.java
+++ b/spring-cloud-contract-verifier/src/main/java/org/springframework/cloud/contract/verifier/messaging/noop/NoOpContractVerifierAutoConfiguration.java
@@ -90,11 +90,8 @@ public ContractVerifierMessaging contractVerifierMessaging() {
@Bean
@ConditionalOnMissingBean
public ContractVerifierObjectMapper contractVerifierObjectMapper(ObjectProvider jsonMapper) {
- JsonMapper mapper = jsonMapper.getIfAvailable();
- if (mapper != null) {
- return new ContractVerifierObjectMapper(mapper);
- }
- return new ContractVerifierObjectMapper();
+ JsonMapper mapper = jsonMapper.getIfAvailable(JsonMapper::new);
+ return new ContractVerifierObjectMapper(mapper);
}
}
diff --git a/spring-cloud-contract-verifier/src/main/resources/META-INF/spring/org.springframework.cloud.contract.verifier.messaging.boot.AutoConfigureMessageVerifier.imports b/spring-cloud-contract-verifier/src/main/resources/META-INF/spring/org.springframework.cloud.contract.verifier.messaging.boot.AutoConfigureMessageVerifier.imports
index d095ad09bd..de8035cefa 100644
--- a/spring-cloud-contract-verifier/src/main/resources/META-INF/spring/org.springframework.cloud.contract.verifier.messaging.boot.AutoConfigureMessageVerifier.imports
+++ b/spring-cloud-contract-verifier/src/main/resources/META-INF/spring/org.springframework.cloud.contract.verifier.messaging.boot.AutoConfigureMessageVerifier.imports
@@ -3,3 +3,4 @@ org.springframework.cloud.contract.verifier.messaging.integration.ContractVerifi
org.springframework.cloud.contract.verifier.messaging.camel.ContractVerifierCamelConfiguration
org.springframework.cloud.contract.verifier.messaging.jms.ContractVerifierJmsConfiguration
org.springframework.cloud.contract.verifier.messaging.noop.NoOpContractVerifierAutoConfiguration
+org.springframework.cloud.contract.verifier.messaging.avro.KafkaAvroContractVerifierConfiguration
diff --git a/spring-cloud-contract-verifier/src/test/groovy/org/springframework/cloud/contract/verifier/messaging/avro/AvroMetadataSpec.groovy b/spring-cloud-contract-verifier/src/test/groovy/org/springframework/cloud/contract/verifier/messaging/avro/AvroMetadataSpec.groovy
new file mode 100644
index 0000000000..a7b6e7345a
--- /dev/null
+++ b/spring-cloud-contract-verifier/src/test/groovy/org/springframework/cloud/contract/verifier/messaging/avro/AvroMetadataSpec.groovy
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2013-present the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.cloud.contract.verifier.messaging.avro
+
+import spock.lang.Specification
+import tools.jackson.dataformat.yaml.YAMLMapper
+
+import org.springframework.cloud.contract.verifier.messaging.kafka.KafkaMetadata
+
+class AvroMetadataSpec extends Specification {
+
+ YAMLMapper mapper = new YAMLMapper()
+
+ def "should parse avro metadata nested under kafka"() {
+ given:
+ def yamlEntry = """
+kafka:
+ avro:
+ schema: classpath:avro/Book.avsc
+"""
+ when:
+ def parsed = mapper.readerForMapOf(Object).readValue(yamlEntry)
+ KafkaMetadata kafkaMetadata = KafkaMetadata.fromMetadata(parsed)
+ then:
+ kafkaMetadata.avro.schema == "classpath:avro/Book.avsc"
+ }
+
+ def "should return empty avro metadata when avro key is absent"() {
+ given:
+ def yamlEntry = """
+kafka:
+ outputMessage:
+ connectToBroker:
+ additionalOptions: foo
+"""
+ when:
+ def parsed = mapper.readerForMapOf(Object).readValue(yamlEntry)
+ KafkaMetadata kafkaMetadata = KafkaMetadata.fromMetadata(parsed)
+ then:
+ kafkaMetadata.avro.schema == null
+ }
+
+}
diff --git a/spring-cloud-contract-verifier/src/test/groovy/org/springframework/cloud/contract/verifier/messaging/avro/KafkaAvroMessageVerifierSenderSpec.groovy b/spring-cloud-contract-verifier/src/test/groovy/org/springframework/cloud/contract/verifier/messaging/avro/KafkaAvroMessageVerifierSenderSpec.groovy
new file mode 100644
index 0000000000..6c04fc911e
--- /dev/null
+++ b/spring-cloud-contract-verifier/src/test/groovy/org/springframework/cloud/contract/verifier/messaging/avro/KafkaAvroMessageVerifierSenderSpec.groovy
@@ -0,0 +1,159 @@
+/*
+ * Copyright 2013-present the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.cloud.contract.verifier.messaging.avro
+
+
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.springframework.cloud.contract.verifier.converter.YamlContract
+import org.springframework.kafka.core.KafkaTemplate
+import spock.lang.Specification
+import tools.jackson.dataformat.yaml.YAMLMapper
+
+class KafkaAvroMessageVerifierSenderSpec extends Specification {
+
+ static final String DUMMY_ISBN = "978-1234567890"
+ static final String DUMMY_TITLE = "Contract Testing for Dummies"
+
+ KafkaTemplate kafkaTemplate = Mock()
+ KafkaAvroMessageVerifierSender sender = new KafkaAvroMessageVerifierSender(kafkaTemplate)
+ YAMLMapper yamlMapper = new YAMLMapper()
+
+ def "should parse yml contract with inline schema and send avro message to kafka"() {
+ given:
+ def contractYaml = """
+label: book_returned
+input:
+ triggeredBy: publishBookReturned()
+outputMessage:
+ sentTo: book.returned
+ body:
+ isbn: "$DUMMY_ISBN"
+ title: "$DUMMY_TITLE"
+metadata:
+ kafka:
+ avro:
+ schema: >
+ {
+ "type": "record",
+ "name": "Book",
+ "fields": [
+ {"name": "isbn", "type": "string"},
+ {"name": "title", "type": "string"}
+ ]
+ }
+"""
+ YamlContract contract = yamlMapper.readerFor(YamlContract).readValue(contractYaml)
+ Map payload = [isbn: DUMMY_ISBN, title: DUMMY_TITLE]
+
+ when:
+ sender.send(payload, [:], "book.returned", contract)
+
+ then:
+ 1 * kafkaTemplate.send({
+ it.topic() == "book.returned" &&
+ it.value()["isbn"] == DUMMY_ISBN &&
+ it.value()["title"] == DUMMY_TITLE
+ })
+ }
+
+ def "should parse yml contract with classpath schema and send avro message to kafka"() {
+ given:
+ def contractYaml = """
+label: book_returned
+input:
+ triggeredBy: publishBookReturned()
+outputMessage:
+ sentTo: book.returned
+ body:
+ isbn: "$DUMMY_ISBN"
+ title: "$DUMMY_TITLE"
+metadata:
+ kafka:
+ avro:
+ schema: classpath:avro/Book.avsc
+"""
+ YamlContract contract = yamlMapper.readerFor(YamlContract).readValue(contractYaml)
+ Map payload = [isbn: DUMMY_ISBN, title: DUMMY_TITLE]
+
+ when:
+ sender.send(payload, [:], "book.returned", contract)
+
+ then:
+ 1 * kafkaTemplate.send({ ProducerRecord record ->
+ record.topic() == "book.returned" &&
+ record.value()["isbn"] == DUMMY_ISBN &&
+ record.value()["title"] == DUMMY_TITLE
+ })
+ }
+
+ def "should propagate headers to the kafka ProducerRecord"() {
+ given:
+ def contractYaml = """
+label: book_returned
+input:
+ triggeredBy: publishBookReturned()
+outputMessage:
+ sentTo: book.returned
+ body:
+ isbn: "$DUMMY_ISBN"
+ title: "$DUMMY_TITLE"
+metadata:
+ kafka:
+ avro:
+ schema: classpath:avro/Book.avsc
+"""
+ YamlContract contract = yamlMapper.readerFor(YamlContract).readValue(contractYaml)
+ Map payload = [isbn: DUMMY_ISBN, title: DUMMY_TITLE]
+ Map headers = ["X-Correlation-Id": "abc-123", "Content-Type": "avro/binary"]
+ when:
+ sender.send(payload, headers, "book.returned", contract)
+ then:
+ 1 * kafkaTemplate.send({
+ it.topic() == "book.returned" &&
+ header(it, "X-Correlation-Id") == "abc-123" &&
+ header(it, "Content-Type") == "avro/binary"
+ })
+ }
+
+ def "should fail when StubRunnerExecutor passes a JSON string payload instead of a map (bug #2404)"() {
+ given:
+ def contractYaml = """
+label: book_returned
+input:
+ triggeredBy: publishBookReturned()
+outputMessage:
+ sentTo: book.returned
+ body:
+ isbn: "$DUMMY_ISBN"
+ title: "$DUMMY_TITLE"
+metadata:
+ kafka:
+ avro:
+ schema: classpath:avro/Book.avsc
+"""
+ YamlContract contract = yamlMapper.readerFor(YamlContract).readValue(contractYaml)
+ String jsonPayload = """{"isbn":"$DUMMY_ISBN","title":"$DUMMY_TITLE"}"""
+ when:
+ sender.send(jsonPayload, [:], "book.returned", contract)
+ then:
+ thrown(IllegalArgumentException)
+ }
+
+ String header(ProducerRecord record, String key) {
+ new String(record.headers().lastHeader(key).value())
+ }
+}
diff --git a/spring-cloud-contract-verifier/src/test/groovy/org/springframework/cloud/contract/verifier/messaging/internal/ContractVerifierObjectMapperAvroSpec.groovy b/spring-cloud-contract-verifier/src/test/groovy/org/springframework/cloud/contract/verifier/messaging/internal/ContractVerifierObjectMapperAvroSpec.groovy
new file mode 100644
index 0000000000..e2b223a3f6
--- /dev/null
+++ b/spring-cloud-contract-verifier/src/test/groovy/org/springframework/cloud/contract/verifier/messaging/internal/ContractVerifierObjectMapperAvroSpec.groovy
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2013-present the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.cloud.contract.verifier.messaging.internal
+
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.cloud.contract.verifier.messaging.noop.NoOpContractVerifierAutoConfiguration
+import org.springframework.test.context.ContextConfiguration
+import spock.lang.Specification
+
+@ContextConfiguration(classes = [NoOpContractVerifierAutoConfiguration])
+class ContractVerifierObjectMapperAvroSpec extends Specification {
+
+ @Autowired
+ ContractVerifierObjectMapper mapper
+
+ def "should convert an Avro-generated object into a json representation"() {
+ given:
+ FooAvro input = FooAvro.newBuilder().setFooAvro("barAvro").build()
+ when:
+ String result = mapper.writeValueAsString(input)
+ then:
+ result == '{"fooAvro":"barAvro"}'
+ }
+}
diff --git a/spring-cloud-contract-verifier/src/test/groovy/org/springframework/cloud/contract/verifier/messaging/internal/FooAvro.java b/spring-cloud-contract-verifier/src/test/groovy/org/springframework/cloud/contract/verifier/messaging/internal/FooAvro.java
new file mode 100644
index 0000000000..5e732778a8
--- /dev/null
+++ b/spring-cloud-contract-verifier/src/test/groovy/org/springframework/cloud/contract/verifier/messaging/internal/FooAvro.java
@@ -0,0 +1,333 @@
+/**
+ * Autogenerated by Avro
+ *
+ * DO NOT EDIT DIRECTLY
+ */
+package org.springframework.cloud.contract.verifier.messaging.internal;
+
+import org.apache.avro.message.BinaryMessageDecoder;
+import org.apache.avro.message.BinaryMessageEncoder;
+import org.apache.avro.message.SchemaStore;
+import org.apache.avro.specific.SpecificData;
+
+/** Dummy Avro object for testing purposes */
+@org.apache.avro.specific.AvroGenerated
+public class FooAvro extends org.apache.avro.specific.SpecificRecordBase
+ implements org.apache.avro.specific.SpecificRecord {
+
+ private static final long serialVersionUID = -2221379489582530192L;
+
+ public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse(
+ "{\"type\":\"record\",\"name\":\"FooAvro\",\"namespace\":\"org.springframework.cloud.contract.verifier.messaging.internal\",\"doc\":\"Dummy Avro object for testing purposes\",\"fields\":[{\"name\":\"fooAvro\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"},\"doc\":\"foo field\"}]}");
+
+ public static org.apache.avro.Schema getClassSchema() {
+ return SCHEMA$;
+ }
+
+ private static final SpecificData MODEL$ = new SpecificData();
+
+ private static final BinaryMessageEncoder ENCODER = new BinaryMessageEncoder<>(MODEL$, SCHEMA$);
+
+ private static final BinaryMessageDecoder DECODER = new BinaryMessageDecoder<>(MODEL$, SCHEMA$);
+
+ /**
+ * Return the BinaryMessageEncoder instance used by this class.
+ * @return the message encoder used by this class
+ */
+ public static BinaryMessageEncoder getEncoder() {
+ return ENCODER;
+ }
+
+ /**
+ * Return the BinaryMessageDecoder instance used by this class.
+ * @return the message decoder used by this class
+ */
+ public static BinaryMessageDecoder getDecoder() {
+ return DECODER;
+ }
+
+ /**
+ * Create a new BinaryMessageDecoder instance for this class that uses the specified
+ * {@link SchemaStore}.
+ * @param resolver a {@link SchemaStore} used to find schemas by fingerprint
+ * @return a BinaryMessageDecoder instance for this class backed by the given
+ * SchemaStore
+ */
+ public static BinaryMessageDecoder createDecoder(SchemaStore resolver) {
+ return new BinaryMessageDecoder<>(MODEL$, SCHEMA$, resolver);
+ }
+
+ /**
+ * Serializes this FooAvro to a ByteBuffer.
+ * @return a buffer holding the serialized data for this instance
+ * @throws java.io.IOException if this instance could not be serialized
+ */
+ public java.nio.ByteBuffer toByteBuffer() throws java.io.IOException {
+ return ENCODER.encode(this);
+ }
+
+ /**
+ * Deserializes a FooAvro from a ByteBuffer.
+ * @param b a byte buffer holding serialized data for an instance of this class
+ * @return a FooAvro instance decoded from the given buffer
+ * @throws java.io.IOException if the given bytes could not be deserialized into an
+ * instance of this class
+ */
+ public static FooAvro fromByteBuffer(java.nio.ByteBuffer b) throws java.io.IOException {
+ return DECODER.decode(b);
+ }
+
+ /** foo field */
+ private java.lang.String fooAvro;
+
+ /**
+ * Default constructor. Note that this does not initialize fields to their default
+ * values from the schema. If that is desired then one should use
+ * newBuilder().
+ */
+ public FooAvro() {
+ }
+
+ /**
+ * All-args constructor.
+ * @param fooAvro foo field
+ */
+ public FooAvro(java.lang.String fooAvro) {
+ this.fooAvro = fooAvro;
+ }
+
+ @Override
+ public org.apache.avro.specific.SpecificData getSpecificData() {
+ return MODEL$;
+ }
+
+ @Override
+ public org.apache.avro.Schema getSchema() {
+ return SCHEMA$;
+ }
+
+ // Used by DatumWriter. Applications should not call.
+ @Override
+ public java.lang.Object get(int field$) {
+ switch (field$) {
+ case 0:
+ return fooAvro;
+ default:
+ throw new IndexOutOfBoundsException("Invalid index: " + field$);
+ }
+ }
+
+ // Used by DatumReader. Applications should not call.
+ @Override
+ @SuppressWarnings(value = "unchecked")
+ public void put(int field$, java.lang.Object value$) {
+ switch (field$) {
+ case 0:
+ fooAvro = value$ != null ? value$.toString() : null;
+ break;
+ default:
+ throw new IndexOutOfBoundsException("Invalid index: " + field$);
+ }
+ }
+
+ /**
+ * Gets the value of the 'fooAvro' field.
+ * @return foo field
+ */
+ public java.lang.String getFooAvro() {
+ return fooAvro;
+ }
+
+ /**
+ * Sets the value of the 'fooAvro' field. foo field
+ * @param value the value to set.
+ */
+ public void setFooAvro(java.lang.String value) {
+ this.fooAvro = value;
+ }
+
+ /**
+ * Creates a new FooAvro RecordBuilder.
+ * @return A new FooAvro RecordBuilder
+ */
+ public static org.springframework.cloud.contract.verifier.messaging.internal.FooAvro.Builder newBuilder() {
+ return new org.springframework.cloud.contract.verifier.messaging.internal.FooAvro.Builder();
+ }
+
+ /**
+ * Creates a new FooAvro RecordBuilder by copying an existing Builder.
+ * @param other The existing builder to copy.
+ * @return A new FooAvro RecordBuilder
+ */
+ public static org.springframework.cloud.contract.verifier.messaging.internal.FooAvro.Builder newBuilder(
+ org.springframework.cloud.contract.verifier.messaging.internal.FooAvro.Builder other) {
+ if (other == null) {
+ return new org.springframework.cloud.contract.verifier.messaging.internal.FooAvro.Builder();
+ }
+ else {
+ return new org.springframework.cloud.contract.verifier.messaging.internal.FooAvro.Builder(other);
+ }
+ }
+
+ /**
+ * Creates a new FooAvro RecordBuilder by copying an existing FooAvro instance.
+ * @param other The existing instance to copy.
+ * @return A new FooAvro RecordBuilder
+ */
+ public static org.springframework.cloud.contract.verifier.messaging.internal.FooAvro.Builder newBuilder(
+ org.springframework.cloud.contract.verifier.messaging.internal.FooAvro other) {
+ if (other == null) {
+ return new org.springframework.cloud.contract.verifier.messaging.internal.FooAvro.Builder();
+ }
+ else {
+ return new org.springframework.cloud.contract.verifier.messaging.internal.FooAvro.Builder(other);
+ }
+ }
+
+ /**
+ * RecordBuilder for FooAvro instances.
+ */
+ @org.apache.avro.specific.AvroGenerated
+ public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase
+ implements org.apache.avro.data.RecordBuilder {
+
+ /** foo field */
+ private java.lang.String fooAvro;
+
+ /** Creates a new Builder */
+ private Builder() {
+ super(SCHEMA$, MODEL$);
+ }
+
+ /**
+ * Creates a Builder by copying an existing Builder.
+ * @param other The existing Builder to copy.
+ */
+ private Builder(org.springframework.cloud.contract.verifier.messaging.internal.FooAvro.Builder other) {
+ super(other);
+ if (isValidValue(fields()[0], other.fooAvro)) {
+ this.fooAvro = data().deepCopy(fields()[0].schema(), other.fooAvro);
+ fieldSetFlags()[0] = other.fieldSetFlags()[0];
+ }
+ }
+
+ /**
+ * Creates a Builder by copying an existing FooAvro instance
+ * @param other The existing instance to copy.
+ */
+ private Builder(org.springframework.cloud.contract.verifier.messaging.internal.FooAvro other) {
+ super(SCHEMA$, MODEL$);
+ if (isValidValue(fields()[0], other.fooAvro)) {
+ this.fooAvro = data().deepCopy(fields()[0].schema(), other.fooAvro);
+ fieldSetFlags()[0] = true;
+ }
+ }
+
+ /**
+ * Gets the value of the 'fooAvro' field. foo field
+ * @return The value.
+ */
+ public java.lang.String getFooAvro() {
+ return fooAvro;
+ }
+
+ /**
+ * Sets the value of the 'fooAvro' field. foo field
+ * @param value The value of 'fooAvro'.
+ * @return This builder.
+ */
+ public org.springframework.cloud.contract.verifier.messaging.internal.FooAvro.Builder setFooAvro(
+ java.lang.String value) {
+ validate(fields()[0], value);
+ this.fooAvro = value;
+ fieldSetFlags()[0] = true;
+ return this;
+ }
+
+ /**
+ * Checks whether the 'fooAvro' field has been set. foo field
+ * @return True if the 'fooAvro' field has been set, false otherwise.
+ */
+ public boolean hasFooAvro() {
+ return fieldSetFlags()[0];
+ }
+
+ /**
+ * Clears the value of the 'fooAvro' field. foo field
+ * @return This builder.
+ */
+ public org.springframework.cloud.contract.verifier.messaging.internal.FooAvro.Builder clearFooAvro() {
+ fooAvro = null;
+ fieldSetFlags()[0] = false;
+ return this;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public FooAvro build() {
+ try {
+ FooAvro record = new FooAvro();
+ record.fooAvro = fieldSetFlags()[0] ? this.fooAvro : (java.lang.String) defaultValue(fields()[0]);
+ return record;
+ }
+ catch (org.apache.avro.AvroMissingFieldException e) {
+ throw e;
+ }
+ catch (java.lang.Exception e) {
+ throw new org.apache.avro.AvroRuntimeException(e);
+ }
+ }
+
+ }
+
+ @SuppressWarnings("unchecked")
+ private static final org.apache.avro.io.DatumWriter WRITER$ = (org.apache.avro.io.DatumWriter) MODEL$
+ .createDatumWriter(SCHEMA$);
+
+ @Override
+ public void writeExternal(java.io.ObjectOutput out) throws java.io.IOException {
+ WRITER$.write(this, SpecificData.getEncoder(out));
+ }
+
+ @SuppressWarnings("unchecked")
+ private static final org.apache.avro.io.DatumReader READER$ = (org.apache.avro.io.DatumReader) MODEL$
+ .createDatumReader(SCHEMA$);
+
+ @Override
+ public void readExternal(java.io.ObjectInput in) throws java.io.IOException {
+ READER$.read(this, SpecificData.getDecoder(in));
+ }
+
+ @Override
+ protected boolean hasCustomCoders() {
+ return true;
+ }
+
+ @Override
+ public void customEncode(org.apache.avro.io.Encoder out) throws java.io.IOException {
+ out.writeString(this.fooAvro);
+
+ }
+
+ @Override
+ public void customDecode(org.apache.avro.io.ResolvingDecoder in) throws java.io.IOException {
+ org.apache.avro.Schema.Field[] fieldOrder = in.readFieldOrderIfDiff();
+ if (fieldOrder == null) {
+ this.fooAvro = in.readString();
+
+ }
+ else {
+ for (int i = 0; i < 1; i++) {
+ switch (fieldOrder[i].pos()) {
+ case 0:
+ this.fooAvro = in.readString();
+ break;
+
+ default:
+ throw new java.io.IOException("Corrupt ResolvingDecoder.");
+ }
+ }
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/spring-cloud-contract-verifier/src/test/resources/avro/Book.avsc b/spring-cloud-contract-verifier/src/test/resources/avro/Book.avsc
new file mode 100644
index 0000000000..e943ebda01
--- /dev/null
+++ b/spring-cloud-contract-verifier/src/test/resources/avro/Book.avsc
@@ -0,0 +1,8 @@
+{
+ "type": "record",
+ "name": "Book",
+ "fields": [
+ {"name": "isbn", "type": "string"},
+ {"name": "title", "type": "string"}
+ ]
+}
diff --git a/spring-cloud-contract-verifier/src/test/resources/yml/contract_message_avro.yml b/spring-cloud-contract-verifier/src/test/resources/yml/contract_message_avro.yml
new file mode 100644
index 0000000000..bb2081af54
--- /dev/null
+++ b/spring-cloud-contract-verifier/src/test/resources/yml/contract_message_avro.yml
@@ -0,0 +1,14 @@
+label: book_returned
+input:
+ triggeredBy: publishBookReturned()
+outputMessage:
+ sentTo: book.returned
+ headers:
+ X-Correlation-Id: abc-123-def
+ body:
+ isbn: "978-1234567890"
+ title: "Contract Testing for Dummies"
+metadata:
+ kafka:
+ avro:
+ schema: classpath:avro/Book.avsc