diff --git a/AGENTS.md b/AGENTS.md new file mode 100644 index 00000000..eacea596 --- /dev/null +++ b/AGENTS.md @@ -0,0 +1,27 @@ +# Agent Instructions + +## Repository Structure + +- `kstreamplify-core`: core Kafka Streams functionality. +- `kstreamplify-core-test`: test utilities and base classes for testing topologies. +- `kstreamplify-spring-boot`: Spring Boot autoconfiguration and web services. + +## Commands + +- Build with `mvn clean package`. +- Run the tests with `mvn test`. +- Run `mvn spotless:apply` before committing to apply Palantir Java Format. + +## Coding Standards + +- Target Java 17. +- Code follows Palantir Java Format. +- Add minimal Javadoc to every method in production code (`src/main`), including `@Override` methods, with descriptions for parameters and return values. Start each description with an uppercase letter. + +## Testing Standards + +- Name test classes `Test`. +- Name unit tests with the "should..." convention (e.g. `shouldNotRegisterPropertiesWhenNull`). +- Use JUnit Jupiter assertions (`org.junit.jupiter.api.Assertions`). +- Use Mockito with `@ExtendWith(MockitoExtension.class)`. +- Use Testcontainers for Kafka integration tests. \ No newline at end of file diff --git a/kstreamplify-core-test/src/main/java/com/michelin/kstreamplify/KafkaStreamsStarterTest.java b/kstreamplify-core-test/src/main/java/com/michelin/kstreamplify/KafkaStreamsStarterTest.java index f288c3e6..0f25bce2 100644 --- a/kstreamplify-core-test/src/main/java/com/michelin/kstreamplify/KafkaStreamsStarterTest.java +++ b/kstreamplify-core-test/src/main/java/com/michelin/kstreamplify/KafkaStreamsStarterTest.java @@ -136,7 +136,7 @@ protected Map getSpecificProperties() { /** * Close everything after each test. * - * @throws IOException if an I/O error occurs while deleting the state directory + * @throws IOException If an I/O error occurs while deleting the state directory */ @AfterEach protected void generalTearDown() throws IOException { diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/converter/AvroToJsonConverter.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/converter/AvroToJsonConverter.java index 90cceec9..6aaadc7d 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/converter/AvroToJsonConverter.java +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/converter/AvroToJsonConverter.java @@ -51,6 +51,7 @@ public class AvroToJsonConverter { .setPrettyPrinting() .create(); + /** Private constructor. */ private AvroToJsonConverter() {} /** @@ -82,20 +83,20 @@ public static String convertObject(List values) { } /** - * Convert the record from avro format to json format. + * Convert the record from avro format to JSON format. * - * @param inputRecord the record in avro format - * @return the record in json format + * @param inputRecord The record in avro format + * @return The record in JSON format */ public static String convertRecord(GenericRecord inputRecord) { return gson.toJson(recordAsMap(inputRecord)); } /** - * Convert avro to a map for json format. + * Convert avro to a map for JSON format. * - * @param inputRecord record in avro - * @return map for json format + * @param inputRecord Record in avro + * @return Map for JSON format */ private static Map recordAsMap(GenericRecord inputRecord) { Map recordMapping = new HashMap<>(); @@ -145,12 +146,29 @@ private static Map recordAsMap(GenericRecord inputRecord) { private static class LocalDateTypeAdapter implements JsonSerializer, JsonDeserializer { private final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd"); + /** + * Serialize a {@link LocalDate} to its JSON representation. + * + * @param date The date to serialize + * @param typeOfSrc The type of the source object + * @param context The serialization context + * @return The serialized JSON element + */ @Override public JsonElement serialize( final LocalDate date, final Type typeOfSrc, final JsonSerializationContext context) { return new JsonPrimitive(date.format(formatter)); } + /** + * Deserialize a {@link LocalDate} from its JSON representation. + * + * @param json The JSON element to deserialize + * @param typeOfT The type of the target object + * @param context The deserialization context + * @return The deserialized date + * @throws JsonParseException If the JSON element cannot be parsed + */ @Override public LocalDate deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context) throws JsonParseException { @@ -165,6 +183,14 @@ private static class LocalDateTimeTypeAdapter private static final DateTimeFormatter formatterNano = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSSSSSSS"); + /** + * Serialize a {@link LocalDateTime} to its JSON representation. + * + * @param localDateTime The date-time to serialize + * @param srcType The type of the source object + * @param context The serialization context + * @return The serialized JSON element + */ @Override public JsonElement serialize(LocalDateTime localDateTime, Type srcType, JsonSerializationContext context) { if (localDateTime.toString().length() == 29) { @@ -173,6 +199,15 @@ public JsonElement serialize(LocalDateTime localDateTime, Type srcType, JsonSeri return new JsonPrimitive(formatter.format(localDateTime)); } + /** + * Deserialize a {@link LocalDateTime} from its JSON representation. + * + * @param json The JSON element to deserialize + * @param typeOfT The type of the target object + * @param context The deserialization context + * @return The deserialized date-time + * @throws JsonParseException If the JSON element cannot be parsed + */ @Override public LocalDateTime deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context) throws JsonParseException { @@ -184,6 +219,14 @@ private static class LocalTimeTypeAdapter implements JsonSerializer, private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSS"); private static final DateTimeFormatter formatterNano = DateTimeFormatter.ofPattern("HH:mm:ss.SSSSSS"); + /** + * Serialize a {@link LocalTime} to its JSON representation. + * + * @param localTime The time to serialize + * @param srcType The type of the source object + * @param context The serialization context + * @return The serialized JSON element + */ @Override public JsonElement serialize(LocalTime localTime, Type srcType, JsonSerializationContext context) { if (localTime.toString().length() == 15) { @@ -192,6 +235,15 @@ public JsonElement serialize(LocalTime localTime, Type srcType, JsonSerializatio return new JsonPrimitive(formatter.format(localTime)); } + /** + * Deserialize a {@link LocalTime} from its JSON representation. + * + * @param json The JSON element to deserialize + * @param typeOfT The type of the target object + * @param context The deserialization context + * @return The deserialized time + * @throws JsonParseException If the JSON element cannot be parsed + */ @Override public LocalTime deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context) throws JsonParseException { diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/converter/JsonToAvroConverter.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/converter/JsonToAvroConverter.java index 16a163a3..16536741 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/converter/JsonToAvroConverter.java +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/converter/JsonToAvroConverter.java @@ -52,13 +52,14 @@ public class JsonToAvroConverter { .setPrettyPrinting() .create(); + /** Private constructor. */ private JsonToAvroConverter() {} /** - * Convert a json string to an object. + * Convert a JSON string to an object. * - * @param json the json string - * @return the object + * @param json The JSON string + * @return The object */ public static Object jsonToObject(String json) { if (json == null) { @@ -69,22 +70,22 @@ public static Object jsonToObject(String json) { } /** - * Convert a file in json to avro. + * Convert a file in JSON to avro. * - * @param file the file in json - * @param schema the avro schema to use - * @return the record in avro + * @param file The file in json + * @param schema The avro schema to use + * @return The record in avro */ public static SpecificRecordBase jsonToAvro(String file, Schema schema) { return jsonToAvro(JsonParser.parseString(file).getAsJsonObject(), schema); } /** - * Convert json to avro. + * Convert JSON to Avro. * - * @param jsonEvent the json record - * @param schema the avro schema to use - * @return the record in avro + * @param jsonEvent The JSON record + * @param schema The avro schema to use + * @return The record in avro */ public static SpecificRecordBase jsonToAvro(JsonObject jsonEvent, Schema schema) { try { @@ -101,8 +102,8 @@ public static SpecificRecordBase jsonToAvro(JsonObject jsonEvent, Schema schema) /** * Populate avro records from json. * - * @param jsonObject json data to provide to the avro record - * @param message the avro record to populate + * @param jsonObject Json data to provide to the avro record + * @param message The avro record to populate */ private static void populateGenericRecordFromJson(JsonObject jsonObject, SpecificRecordBase message) { // Iterate over object attributes @@ -211,9 +212,9 @@ private static void populateGenericRecordFromJson(JsonObject jsonObject, Specifi /** * Populate field with corresponding type. * - * @param jsonElement the json element to convert - * @param type the type of the element - * @return the element converted with the corresponding type + * @param jsonElement The JSON element to convert + * @param type The type of the element + * @return The element converted with the corresponding type */ private static Object populateFieldWithCorrespondingType(JsonElement jsonElement, Schema.Type type) { return switch (type) { @@ -229,9 +230,9 @@ private static Object populateFieldWithCorrespondingType(JsonElement jsonElement /** * Populate field in record with corresponding type. * - * @param jsonObject data to provide to the avro record - * @param fieldName the name to populate - * @param result the avro record populated + * @param jsonObject Data to provide to the avro record + * @param fieldName The name to populate + * @param result The avro record populated */ @SuppressWarnings("unchecked") private static void populateFieldInRecordWithCorrespondingType( @@ -379,9 +380,9 @@ private static void populateFieldInRecordWithCorrespondingType( /** * Get base class. * - * @param baseNamespace the namespace of the class - * @param typeName the class type - * @return the base class + * @param baseNamespace The namespace of the class + * @param typeName The class type + * @return The base class */ @SuppressWarnings("unchecked") private static Class baseClass(String baseNamespace, String typeName) { diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupHeadersProcessor.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupHeadersProcessor.java index c06e72da..06116807 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupHeadersProcessor.java +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupHeadersProcessor.java @@ -63,8 +63,8 @@ public DedupHeadersProcessor( /** * Get the header value for a given key * - * @param headers headers of the record - * @param key the key to look for in the headers + * @param headers Headers of the record + * @param key The key to look for in the headers * @return The header value for the given key, or an empty string if the header is not present or has no value. */ private static String getHeader(Headers headers, String key) { @@ -79,7 +79,7 @@ private static String getHeader(Headers headers, String key) { /** * Initialize the processor. * - * @param context the processor context + * @param context The processor context */ @Override public void init(ProcessorContext context) { @@ -90,7 +90,7 @@ public void init(ProcessorContext context) { /** * Process a record. * - * @param message the record to process + * @param message The record to process */ @Override public void process(Record message) { diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupHeadersProcessorWithErrors.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupHeadersProcessorWithErrors.java index afa9049b..9fa69a81 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupHeadersProcessorWithErrors.java +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupHeadersProcessorWithErrors.java @@ -65,7 +65,7 @@ public DedupHeadersProcessorWithErrors( /** * Initialize the processor. * - * @param context the processor context + * @param context The processor context */ @Override public void init(ProcessorContext> context) { @@ -76,7 +76,7 @@ public void init(ProcessorContext> context) { /** * Process a record. * - * @param message the record to process + * @param message The record to process */ @Override public void process(Record message) { @@ -121,8 +121,8 @@ private String buildIdentifier(Headers headers) { /** * Get the header value for a given key * - * @param headers headers of the record - * @param key the key to look for in the headers + * @param headers Headers of the record + * @param key The key to look for in the headers * @return The header value for the given key, or an empty string if the header is not present or has no value. */ private static String getHeader(Headers headers, String key) { diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupKeyProcessor.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupKeyProcessor.java index 567c643e..9f09b60b 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupKeyProcessor.java +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupKeyProcessor.java @@ -53,7 +53,7 @@ public DedupKeyProcessor(String windowStoreName, Duration retentionWindowDuratio /** * Initialize the processor. * - * @param context the processor context + * @param context The processor context */ @Override public void init(ProcessorContext context) { @@ -64,7 +64,7 @@ public void init(ProcessorContext context) { /** * Process a record. * - * @param message the record to process + * @param message The record to process */ @Override public void process(Record message) { diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupKeyProcessorWithErrors.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupKeyProcessorWithErrors.java index 3c2427e6..edb70e4e 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupKeyProcessorWithErrors.java +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupKeyProcessorWithErrors.java @@ -55,7 +55,7 @@ public DedupKeyProcessorWithErrors(String windowStoreName, Duration retentionWin /** * Initialize the processor. * - * @param context the processor context + * @param context The processor context */ @Override public void init(ProcessorContext> context) { @@ -66,7 +66,7 @@ public void init(ProcessorContext> context) { /** * Process a record. * - * @param message the record to process + * @param message The record to process */ @Override public void process(Record message) { diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupKeyValueProcessor.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupKeyValueProcessor.java index 61272a70..d07b6b6a 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupKeyValueProcessor.java +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupKeyValueProcessor.java @@ -53,7 +53,7 @@ public DedupKeyValueProcessor(String windowStoreName, Duration retentionWindowHo /** * Initialize the processor. * - * @param context the processor context + * @param context The processor context */ @Override public void init(ProcessorContext context) { @@ -64,7 +64,7 @@ public void init(ProcessorContext context) { /** * Process a record. * - * @param message the record to process + * @param message The record to process */ @Override public void process(Record message) { diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupKeyValueProcessorWithErrors.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupKeyValueProcessorWithErrors.java index 59d5c222..7ac0836d 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupKeyValueProcessorWithErrors.java +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupKeyValueProcessorWithErrors.java @@ -55,7 +55,7 @@ public DedupKeyValueProcessorWithErrors(String windowStoreName, Duration retenti /** * Initialize the processor. * - * @param context the processor context + * @param context The processor context */ @Override public void init(ProcessorContext> context) { @@ -66,7 +66,7 @@ public void init(ProcessorContext> context) { /** * Process a record. * - * @param message the record to process + * @param message The record to process */ @Override public void process(Record message) { diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupWithPredicateProcessor.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupWithPredicateProcessor.java index 7f297c36..91af3658 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupWithPredicateProcessor.java +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupWithPredicateProcessor.java @@ -59,7 +59,7 @@ public DedupWithPredicateProcessor( /** * Initialize the processor. * - * @param context the processor context + * @param context The processor context */ @Override public void init(ProcessorContext context) { @@ -70,7 +70,7 @@ public void init(ProcessorContext context) { /** * Process a record. * - * @param message the record to process + * @param message The record to process */ @Override public void process(Record message) { diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupWithPredicateProcessorWithErrors.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupWithPredicateProcessorWithErrors.java index 6c5ae625..63888525 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupWithPredicateProcessorWithErrors.java +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupWithPredicateProcessorWithErrors.java @@ -61,7 +61,7 @@ public DedupWithPredicateProcessorWithErrors( /** * Initialize the processor. * - * @param context the processor context + * @param context The processor context */ @Override public void init(ProcessorContext> context) { @@ -72,7 +72,7 @@ public void init(ProcessorContext> context) { /** * Process a record. * - * @param message the record to process + * @param message The record to process */ @Override public void process(Record message) { diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DeduplicationUtils.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DeduplicationUtils.java index 7ff7124e..90f1cfa5 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DeduplicationUtils.java +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DeduplicationUtils.java @@ -38,6 +38,7 @@ public final class DeduplicationUtils { private static final String DEFAULT_WINDOWSTORE = "WindowStore"; private static final String DEFAULT_REPARTITION = "Repartition"; + /** Private constructor. */ private DeduplicationUtils() {} /** @deprecated Since 1.8.0, use {@link #deduplicateByKeyWithErrors(StreamsBuilder, KStream, Duration)} instead. */ @@ -87,13 +88,13 @@ public static KStream> * *

A window store is used to track seen keys during the specified {@code windowDuration}. * - * @param streamsBuilder the {@link StreamsBuilder} used to build the topology - * @param initialStream the input stream to deduplicate (must have String keys) - * @param storeName the name of the state store used for deduplication - * @param repartitionName the name of the repartition topic - * @param windowDuration the time window during which duplicates are filtered - * @param the value type of the stream - * @return a deduplicated stream containing {@link ProcessingResult} + * @param streamsBuilder The {@link StreamsBuilder} used to build the topology + * @param initialStream The input stream to deduplicate (must have String keys) + * @param storeName The name of the state store used for deduplication + * @param repartitionName The name of the repartition topic + * @param windowDuration The time window during which duplicates are filtered + * @param The value type of the stream + * @return A deduplicated stream containing {@link ProcessingResult} */ public static KStream> deduplicateByKeyWithErrors( StreamsBuilder streamsBuilder, @@ -134,13 +135,13 @@ public static KStream deduplicateByKey( * *

A window store is used to track seen keys during the specified {@code windowDuration}. * - * @param streamsBuilder the {@link StreamsBuilder} used to build the topology - * @param initialStream the input stream to deduplicate (must have String keys) - * @param storeName the name of the state store used for deduplication - * @param repartitionName the name of the repartition topic - * @param windowDuration the time window during which duplicates are filtered - * @param the value type of the stream - * @return a deduplicated stream containing + * @param streamsBuilder The {@link StreamsBuilder} used to build the topology + * @param initialStream The input stream to deduplicate (must have String keys) + * @param storeName The name of the state store used for deduplication + * @param repartitionName The name of the repartition topic + * @param windowDuration The time window during which duplicates are filtered + * @param The value type of the stream + * @return A deduplicated stream containing */ public static KStream deduplicateByKey( StreamsBuilder streamsBuilder, @@ -211,13 +212,13 @@ public static KStream> *

Records with identical key-value pairs within the configured time window are considered duplicates and are * filtered out. * - * @param streamsBuilder the {@link StreamsBuilder} used to build the topology - * @param initialStream the input stream to deduplicate - * @param storeName the name of the state store used for deduplication - * @param repartitionName the name of the repartition topic - * @param windowDuration the time window during which duplicates are filtered - * @param the value type of the stream - * @return a deduplicated stream containing {@link ProcessingResult} + * @param streamsBuilder The {@link StreamsBuilder} used to build the topology + * @param initialStream The input stream to deduplicate + * @param storeName The name of the state store used for deduplication + * @param repartitionName The name of the repartition topic + * @param windowDuration The time window during which duplicates are filtered + * @param The value type of the stream + * @return A deduplicated stream containing {@link ProcessingResult} */ public static KStream> deduplicateByKeyValueWithErrors( StreamsBuilder streamsBuilder, @@ -258,13 +259,13 @@ public static KStream deduplicateByKeyValu *

Records with identical key-value pairs within the configured time window are considered duplicates and are * filtered out. * - * @param streamsBuilder the {@link StreamsBuilder} used to build the topology - * @param initialStream the input stream to deduplicate - * @param storeName the name of the state store used for deduplication - * @param repartitionName the name of the repartition topic - * @param windowDuration the time window during which duplicates are filtered - * @param the value type of the stream - * @return a deduplicated stream containing + * @param streamsBuilder The {@link StreamsBuilder} used to build the topology + * @param initialStream The input stream to deduplicate + * @param storeName The name of the state store used for deduplication + * @param repartitionName The name of the repartition topic + * @param windowDuration The time window during which duplicates are filtered + * @param The value type of the stream + * @return A deduplicated stream containing */ public static KStream deduplicateByKeyValue( StreamsBuilder streamsBuilder, @@ -345,14 +346,14 @@ public static KStream> *

The provided extractor builds a deduplication key for each record. Records with identical keys within the * configured time window are considered duplicates and are filtered out. * - * @param streamsBuilder the {@link StreamsBuilder} - * @param initialStream the input stream - * @param storeName state store name - * @param repartitionName repartition topic name - * @param windowDuration deduplication window - * @param extractor function building the deduplication key - * @param value type - * @return a deduplicated stream containing {@link ProcessingResult} + * @param streamsBuilder The {@link StreamsBuilder} + * @param initialStream The input stream + * @param storeName State store name + * @param repartitionName Repartition topic name + * @param windowDuration Deduplication window + * @param extractor Function building the deduplication key + * @param Value type + * @return A deduplicated stream containing {@link ProcessingResult} */ public static KStream> deduplicateByPredicateWithErrors( StreamsBuilder streamsBuilder, @@ -398,14 +399,14 @@ public static KStream> *

The provided extractor builds a deduplication key for each record. Records with identical keys within the * configured time window are considered duplicates and are filtered out. * - * @param streamsBuilder the {@link StreamsBuilder} - * @param initialStream the input stream - * @param storeName state store name - * @param repartitionName repartition topic name - * @param windowDuration deduplication window - * @param extractor function building the deduplication key - * @param value type - * @return a deduplicated stream containing + * @param streamsBuilder The {@link StreamsBuilder} + * @param initialStream The input stream + * @param storeName State store name + * @param repartitionName Repartition topic name + * @param windowDuration Deduplication window + * @param extractor Function building the deduplication key + * @param Value type + * @return A deduplicated stream containing */ public static KStream deduplicateByPredicate( StreamsBuilder streamsBuilder, @@ -490,14 +491,14 @@ public static KStream> * *

A window store is used to track seen keys during the specified {@code windowDuration}. * - * @param streamsBuilder the {@link StreamsBuilder} used to build the topology - * @param initialStream the input stream to deduplicate (must have String keys) - * @param storeName the name of the state store used for deduplication - * @param repartitionName the name of the repartition topic - * @param windowDuration the time window during which duplicates are filtered - * @param deduplicationHeaders list of header names used to build the deduplication key - * @param the value type of the stream - * @return a deduplicated stream containing {@link ProcessingResult} + * @param streamsBuilder The {@link StreamsBuilder} used to build the topology + * @param initialStream The input stream to deduplicate (must have String keys) + * @param storeName The name of the state store used for deduplication + * @param repartitionName The name of the repartition topic + * @param windowDuration The time window during which duplicates are filtered + * @param deduplicationHeaders List of header names used to build the deduplication key + * @param The value type of the stream + * @return A deduplicated stream containing {@link ProcessingResult} */ public static KStream> deduplicateByHeadersWithErrors( StreamsBuilder streamsBuilder, @@ -545,14 +546,14 @@ public static KStream deduplicateByHeaders * *

A window store is used to track seen keys during the specified {@code windowDuration}. * - * @param streamsBuilder the {@link StreamsBuilder} used to build the topology - * @param initialStream the input stream to deduplicate (must have String keys) - * @param storeName the name of the state store used for deduplication - * @param repartitionName the name of the repartition topic - * @param windowDuration the time window during which duplicates are filtered - * @param deduplicationHeaders list of header names used to build the deduplication key - * @param the value type of the stream - * @return a deduplicated stream + * @param streamsBuilder The {@link StreamsBuilder} used to build the topology + * @param initialStream The input stream to deduplicate (must have String keys) + * @param storeName The name of the state store used for deduplication + * @param repartitionName The name of the repartition topic + * @param windowDuration The time window during which duplicates are filtered + * @param deduplicationHeaders List of header names used to build the deduplication key + * @param The value type of the stream + * @return A deduplicated stream */ public static KStream deduplicateByHeaders( StreamsBuilder streamsBuilder, diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/DlqExceptionHandler.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/DlqExceptionHandler.java index db4a0daf..823654fb 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/DlqExceptionHandler.java +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/DlqExceptionHandler.java @@ -37,11 +37,11 @@ protected DlqExceptionHandler() {} /** * Enrich a KafkaError with exception details * - * @param builder the error builder - * @param exception the exception to add - * @param key record key bytes - * @param value record value bytes - * @return enriched builder + * @param builder The error builder + * @param exception The exception to add + * @param key Record key bytes + * @param value Record value bytes + * @return Enriched builder */ protected KafkaError.Builder enrichWithException( KafkaError.Builder builder, Exception exception, byte[] key, byte[] value) { diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/ProcessingResult.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/ProcessingResult.java index 4a1bee9b..26b19bfd 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/ProcessingResult.java +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/ProcessingResult.java @@ -48,7 +48,7 @@ private ProcessingResult(V value) { /** * Private constructor that sets the error value. * - * @param error the ProcessingError containing the + * @param error The ProcessingError containing the */ private ProcessingResult(ProcessingError error) { this.error = error; @@ -222,7 +222,7 @@ public static Record> wrapRecordFailure( /** * Is the processing result valid. Is it valid either if it contains a successful value or an error * - * @return true if valid, false otherwise + * @return True if valid, false otherwise */ public boolean isValid() { return value != null && error == null; diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/TopologyErrorHandler.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/TopologyErrorHandler.java index e1a52ca2..80048816 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/TopologyErrorHandler.java +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/TopologyErrorHandler.java @@ -35,6 +35,7 @@ public class TopologyErrorHandler { private static final String BRANCHING_NAME_NOMINAL = "branch-nominal"; + /** Private constructor. */ private TopologyErrorHandler() {} /** diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/properties/RocksDbConfig.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/properties/RocksDbConfig.java index cf52622d..a2814ffb 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/properties/RocksDbConfig.java +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/properties/RocksDbConfig.java @@ -135,6 +135,12 @@ public void setConfig(final String storeName, final Options options, final Map map, String prefix, Propertie * @param properties The properties * @param key The property key * @param defaultValue The default value if the property is not set - * @return true if the feature is enabled, false otherwise + * @return True if the feature is enabled, false otherwise */ public static boolean isFeatureEnabled(Properties properties, String key, boolean defaultValue) { return Boolean.parseBoolean(properties.getProperty(key, Boolean.toString(defaultValue))); diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/serde/SerdesUtils.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/serde/SerdesUtils.java index b8edc91d..f4d889e3 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/serde/SerdesUtils.java +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/serde/SerdesUtils.java @@ -24,13 +24,14 @@ /** The Serde utils class. */ public final class SerdesUtils { + /** Private constructor. */ private SerdesUtils() {} /** * Return a key serde for a requested class. * * @param The class of requested serdes - * @return a serdes for requested class + * @return A serdes for requested class */ public static SpecificAvroSerde getKeySerdes() { return getSerdes(true); @@ -40,7 +41,7 @@ public static SpecificAvroSerde getKeySerdes() { * Return a value serdes for a requested class. * * @param The class of requested serdes - * @return a serdes for requested class + * @return A serdes for requested class */ public static SpecificAvroSerde getValueSerdes() { return getSerdes(false); @@ -51,7 +52,7 @@ public static SpecificAvroSerde getValueSerdes() { * * @param isSerdeForKey Is the serdes for a key or a value * @param The class of requested serdes - * @return a serdes for requested class + * @return A serdes for requested class */ private static SpecificAvroSerde getSerdes(boolean isSerdeForKey) { SpecificAvroSerde serde = new SpecificAvroSerde<>(); diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/server/KafkaStreamsHttpServer.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/server/KafkaStreamsHttpServer.java index c06ef401..706e3a56 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/server/KafkaStreamsHttpServer.java +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/server/KafkaStreamsHttpServer.java @@ -114,6 +114,12 @@ public void start() { } } + /** + * Create a Kubernetes probe endpoint. + * + * @param path The endpoint path + * @param kubernetesSupplier The supplier providing the HTTP status code + */ private void createKubernetesEndpoint(String path, IntSupplier kubernetesSupplier) { server.createContext("/" + path, (exchange -> { int code = kubernetesSupplier.getAsInt(); @@ -122,6 +128,7 @@ private void createKubernetesEndpoint(String path, IntSupplier kubernetesSupplie })); } + /** Create the topology endpoint exposing the Kafka Streams topology. */ private void createTopologyEndpoint() { String topologyEndpointPath = (String) kafkaStreamsInitializer .getProperties() @@ -139,6 +146,7 @@ private void createTopologyEndpoint() { })); } + /** Create the interactive queries endpoints exposing the state stores. */ private void createStoreEndpoints() { server.createContext("/" + DEFAULT_STORE_PATH, (exchange -> { try { @@ -171,6 +179,12 @@ private void createStoreEndpoints() { })); } + /** + * Build the response for a store endpoint based on the request URI. + * + * @param exchange The HTTP exchange + * @return The response body to serialize + */ private Object getResponseForStoreEndpoints(HttpExchange exchange) { if (exchange.getRequestURI().toString().equals("/" + DEFAULT_STORE_PATH)) { return keyValueService.getStateStores(); @@ -332,10 +346,24 @@ private Object getResponseForStoreEndpoints(HttpExchange exchange) { return null; } + /** + * Parse a path parameter from the request URI. + * + * @param exchange The HTTP exchange + * @param index The index of the path segment to extract + * @return The path parameter value + */ private String parsePathParam(HttpExchange exchange, int index) { return exchange.getRequestURI().toString().split("\\?")[0].split("/")[index]; } + /** + * Parse a query request parameter from the request URI. + * + * @param exchange The HTTP exchange + * @param key The request parameter name + * @return The request parameter value, or empty if absent + */ private Optional parseRequestParam(HttpExchange exchange, String key) { String[] uriAndParams = exchange.getRequestURI().toString().split("\\?"); diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/store/RocksDbConfig.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/store/RocksDbConfig.java index be6b0a8d..1fae2850 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/store/RocksDbConfig.java +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/store/RocksDbConfig.java @@ -129,6 +129,12 @@ public void setConfig(final String storeName, final Options options, final Map the key type - * @param the value type + * @param stateStore The target state store + * @param key The record key + * @param value The record value + * @param The key type + * @param The value type */ public static void put(WindowStore stateStore, K key, V value) { put(stateStore, key, value, Instant.now().toEpochMilli()); @@ -42,12 +43,12 @@ public static void put(WindowStore stateStore, K key, V value) { /** * Puts a key/value pair into the {@link WindowStore} using the provided timestamp. * - * @param stateStore the target state store - * @param key the record key - * @param value the record value - * @param timestamp the timestamp associated with the record (epoch milliseconds) - * @param the key type - * @param the value type + * @param stateStore The target state store + * @param key The record key + * @param value The record value + * @param timestamp The timestamp associated with the record (epoch milliseconds) + * @param The key type + * @param The value type */ public static void put(WindowStore stateStore, K key, V value, long timestamp) { stateStore.put(key, value, timestamp); @@ -57,12 +58,12 @@ public static void put(WindowStore stateStore, K key, V value, long * Gets the latest value associated with the given key from the {@link WindowStore} within the specified retention * period. * - * @param stateStore the source state store - * @param key the record key - * @param retentionDays the retention period in days to look back from the current time - * @param the key type - * @param the value type - * @return the most recent value for the key within the retention window, or {@code null} if none exists + * @param stateStore The source state store + * @param key The record key + * @param retentionDays The retention period in days to look back from the current time + * @param The key type + * @param The value type + * @return The most recent value for the key within the retention window, or {@code null} if none exists */ public static V get(WindowStore stateStore, K key, int retentionDays) { Instant now = Instant.now(); @@ -72,13 +73,13 @@ public static V get(WindowStore stateStore, K key, int retentionDay /** * Gets the latest value associated with the given key from the {@link WindowStore} within the provided time range. * - * @param stateStore the source state store - * @param key the record key - * @param from the start timestamp (inclusive) - * @param to the end timestamp (inclusive) - * @param the key type - * @param the value type - * @return the most recent value for the key within the given time range, or {@code null} if none exists + * @param stateStore The source state store + * @param key The record key + * @param from The start timestamp (inclusive) + * @param to The end timestamp (inclusive) + * @param The key type + * @param The value type + * @return The most recent value for the key within the given time range, or {@code null} if none exists */ public static V get(WindowStore stateStore, K key, Instant from, Instant to) { var it = stateStore.backwardFetch(key, from, to); diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/topic/TopicUtils.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/topic/TopicUtils.java index 4ec40f2c..d27c7177 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/topic/TopicUtils.java +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/topic/TopicUtils.java @@ -32,6 +32,7 @@ public final class TopicUtils { /** The remap property name. */ public static final String REMAP_PROPERTY_NAME = "remap"; + /** Private constructor. */ private TopicUtils() {} /** diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/utils/SerdesUtils.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/utils/SerdesUtils.java index 2205036f..4d0daac1 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/utils/SerdesUtils.java +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/utils/SerdesUtils.java @@ -28,13 +28,14 @@ */ @Deprecated(since = "1.1.0") public final class SerdesUtils { + /** Private constructor. */ private SerdesUtils() {} /** * Return a key serdes for a requested class. * * @param The class of requested serdes - * @return a serdes for requested class + * @return A serdes for requested class */ public static SpecificAvroSerde getSerdesForKey() { return com.michelin.kstreamplify.serde.SerdesUtils.getKeySerdes(); @@ -44,7 +45,7 @@ public static SpecificAvroSerde getSerdesForKey() * Return a value serdes for a requested class. * * @param The class of requested serdes - * @return a serdes for requested class + * @return A serdes for requested class */ public static SpecificAvroSerde getSerdesForValue() { return com.michelin.kstreamplify.serde.SerdesUtils.getValueSerdes(); diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/utils/WindowStateStoreUtils.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/utils/WindowStateStoreUtils.java index 1fb0794b..3c254f1d 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/utils/WindowStateStoreUtils.java +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/utils/WindowStateStoreUtils.java @@ -29,6 +29,7 @@ */ @Deprecated(since = "1.1.0") public class WindowStateStoreUtils { + /** Private constructor. */ private WindowStateStoreUtils() {} /**