Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Agent Instructions

## Repository Structure

- `kstreamplify-core`: core Kafka Streams functionality.
- `kstreamplify-core-test`: test utilities and base classes for testing topologies.
- `kstreamplify-spring-boot`: Spring Boot autoconfiguration and web services.

## Commands

- Build with `mvn clean package`.
- Run the tests with `mvn test`.
- Run `mvn spotless:apply` before committing to apply Palantir Java Format.

## Coding Standards

- Target Java 17.
- Code follows Palantir Java Format.
- Add minimal Javadoc to every method in production code (`src/main`), including `@Override` methods, with descriptions for parameters and return values. Start each description with an uppercase letter.

## Testing Standards

- Name test classes `<ClassName>Test`.
- Name unit tests with the "should..." convention (e.g. `shouldNotRegisterPropertiesWhenNull`).
- Use JUnit Jupiter assertions (`org.junit.jupiter.api.Assertions`).
- Use Mockito with `@ExtendWith(MockitoExtension.class)`.
- Use Testcontainers for Kafka integration tests.
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ protected Map<String, String> getSpecificProperties() {
/**
* Close everything after each test.
*
* @throws IOException if an I/O error occurs while deleting the state directory
* @throws IOException If an I/O error occurs while deleting the state directory
*/
@AfterEach
protected void generalTearDown() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public class AvroToJsonConverter {
.setPrettyPrinting()
.create();

/** Private constructor. */
private AvroToJsonConverter() {}

/**
Expand Down Expand Up @@ -82,20 +83,20 @@ public static String convertObject(List<Object> values) {
}

/**
* Convert the record from avro format to json format.
* Convert the record from avro format to JSON format.
*
* @param inputRecord the record in avro format
* @return the record in json format
* @param inputRecord The record in avro format
* @return The record in JSON format
*/
public static String convertRecord(GenericRecord inputRecord) {
return gson.toJson(recordAsMap(inputRecord));
}

/**
* Convert avro to a map for json format.
* Convert avro to a map for JSON format.
*
* @param inputRecord record in avro
* @return map for json format
* @param inputRecord Record in avro
* @return Map for JSON format
*/
private static Map<String, Object> recordAsMap(GenericRecord inputRecord) {
Map<String, Object> recordMapping = new HashMap<>();
Expand Down Expand Up @@ -145,12 +146,29 @@ private static Map<String, Object> recordAsMap(GenericRecord inputRecord) {
private static class LocalDateTypeAdapter implements JsonSerializer<LocalDate>, JsonDeserializer<LocalDate> {
private final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");

/**
* Serialize a {@link LocalDate} to its JSON representation.
*
* @param date The date to serialize
* @param typeOfSrc The type of the source object
* @param context The serialization context
* @return The serialized JSON element
*/
@Override
public JsonElement serialize(
final LocalDate date, final Type typeOfSrc, final JsonSerializationContext context) {
return new JsonPrimitive(date.format(formatter));
}

/**
* Deserialize a {@link LocalDate} from its JSON representation.
*
* @param json The JSON element to deserialize
* @param typeOfT The type of the target object
* @param context The deserialization context
* @return The deserialized date
* @throws JsonParseException If the JSON element cannot be parsed
*/
@Override
public LocalDate deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context)
throws JsonParseException {
Expand All @@ -165,6 +183,14 @@ private static class LocalDateTimeTypeAdapter
private static final DateTimeFormatter formatterNano =
DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSSSSSSS");

/**
* Serialize a {@link LocalDateTime} to its JSON representation.
*
* @param localDateTime The date-time to serialize
* @param srcType The type of the source object
* @param context The serialization context
* @return The serialized JSON element
*/
@Override
public JsonElement serialize(LocalDateTime localDateTime, Type srcType, JsonSerializationContext context) {
if (localDateTime.toString().length() == 29) {
Expand All @@ -173,6 +199,15 @@ public JsonElement serialize(LocalDateTime localDateTime, Type srcType, JsonSeri
return new JsonPrimitive(formatter.format(localDateTime));
}

/**
* Deserialize a {@link LocalDateTime} from its JSON representation.
*
* @param json The JSON element to deserialize
* @param typeOfT The type of the target object
* @param context The deserialization context
* @return The deserialized date-time
* @throws JsonParseException If the JSON element cannot be parsed
*/
@Override
public LocalDateTime deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context)
throws JsonParseException {
Expand All @@ -184,6 +219,14 @@ private static class LocalTimeTypeAdapter implements JsonSerializer<LocalTime>,
private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSS");
private static final DateTimeFormatter formatterNano = DateTimeFormatter.ofPattern("HH:mm:ss.SSSSSS");

/**
* Serialize a {@link LocalTime} to its JSON representation.
*
* @param localTime The time to serialize
* @param srcType The type of the source object
* @param context The serialization context
* @return The serialized JSON element
*/
@Override
public JsonElement serialize(LocalTime localTime, Type srcType, JsonSerializationContext context) {
if (localTime.toString().length() == 15) {
Expand All @@ -192,6 +235,15 @@ public JsonElement serialize(LocalTime localTime, Type srcType, JsonSerializatio
return new JsonPrimitive(formatter.format(localTime));
}

/**
* Deserialize a {@link LocalTime} from its JSON representation.
*
* @param json The JSON element to deserialize
* @param typeOfT The type of the target object
* @param context The deserialization context
* @return The deserialized time
* @throws JsonParseException If the JSON element cannot be parsed
*/
@Override
public LocalTime deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context)
throws JsonParseException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,14 @@ public class JsonToAvroConverter {
.setPrettyPrinting()
.create();

/** Private constructor. */
private JsonToAvroConverter() {}

/**
* Convert a json string to an object.
* Convert a JSON string to an object.
*
* @param json the json string
* @return the object
* @param json The JSON string
* @return The object
*/
public static Object jsonToObject(String json) {
if (json == null) {
Expand All @@ -69,22 +70,22 @@ public static Object jsonToObject(String json) {
}

/**
* Convert a file in json to avro.
* Convert a file in JSON to avro.
*
* @param file the file in json
* @param schema the avro schema to use
* @return the record in avro
* @param file The file in json
* @param schema The avro schema to use
* @return The record in avro
*/
public static SpecificRecordBase jsonToAvro(String file, Schema schema) {
return jsonToAvro(JsonParser.parseString(file).getAsJsonObject(), schema);
}

/**
* Convert json to avro.
* Convert JSON to Avro.
*
* @param jsonEvent the json record
* @param schema the avro schema to use
* @return the record in avro
* @param jsonEvent The JSON record
* @param schema The avro schema to use
* @return The record in avro
*/
public static SpecificRecordBase jsonToAvro(JsonObject jsonEvent, Schema schema) {
try {
Expand All @@ -101,8 +102,8 @@ public static SpecificRecordBase jsonToAvro(JsonObject jsonEvent, Schema schema)
/**
* Populate avro records from json.
*
* @param jsonObject json data to provide to the avro record
* @param message the avro record to populate
* @param jsonObject Json data to provide to the avro record
* @param message The avro record to populate
*/
private static void populateGenericRecordFromJson(JsonObject jsonObject, SpecificRecordBase message) {
// Iterate over object attributes
Expand Down Expand Up @@ -211,9 +212,9 @@ private static void populateGenericRecordFromJson(JsonObject jsonObject, Specifi
/**
* Populate field with corresponding type.
*
* @param jsonElement the json element to convert
* @param type the type of the element
* @return the element converted with the corresponding type
* @param jsonElement The JSON element to convert
* @param type The type of the element
* @return The element converted with the corresponding type
*/
private static Object populateFieldWithCorrespondingType(JsonElement jsonElement, Schema.Type type) {
return switch (type) {
Expand All @@ -229,9 +230,9 @@ private static Object populateFieldWithCorrespondingType(JsonElement jsonElement
/**
* Populate field in record with corresponding type.
*
* @param jsonObject data to provide to the avro record
* @param fieldName the name to populate
* @param result the avro record populated
* @param jsonObject Data to provide to the avro record
* @param fieldName The name to populate
* @param result The avro record populated
*/
@SuppressWarnings("unchecked")
private static void populateFieldInRecordWithCorrespondingType(
Expand Down Expand Up @@ -379,9 +380,9 @@ private static void populateFieldInRecordWithCorrespondingType(
/**
* Get base class.
*
* @param baseNamespace the namespace of the class
* @param typeName the class type
* @return the base class
* @param baseNamespace The namespace of the class
* @param typeName The class type
* @return The base class
*/
@SuppressWarnings("unchecked")
private static Class<SpecificRecordBase> baseClass(String baseNamespace, String typeName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ public DedupHeadersProcessor(
/**
* Get the header value for a given key
*
* @param headers headers of the record
* @param key the key to look for in the headers
* @param headers Headers of the record
* @param key The key to look for in the headers
* @return The header value for the given key, or an empty string if the header is not present or has no value.
*/
private static String getHeader(Headers headers, String key) {
Expand All @@ -79,7 +79,7 @@ private static String getHeader(Headers headers, String key) {
/**
* Initialize the processor.
*
* @param context the processor context
* @param context The processor context
*/
@Override
public void init(ProcessorContext<String, V> context) {
Expand All @@ -90,7 +90,7 @@ public void init(ProcessorContext<String, V> context) {
/**
* Process a record.
*
* @param message the record to process
* @param message The record to process
*/
@Override
public void process(Record<String, V> message) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public DedupHeadersProcessorWithErrors(
/**
* Initialize the processor.
*
* @param context the processor context
* @param context The processor context
*/
@Override
public void init(ProcessorContext<String, ProcessingResult<V, V>> context) {
Expand All @@ -76,7 +76,7 @@ public void init(ProcessorContext<String, ProcessingResult<V, V>> context) {
/**
* Process a record.
*
* @param message the record to process
* @param message The record to process
*/
@Override
public void process(Record<String, V> message) {
Expand Down Expand Up @@ -121,8 +121,8 @@ private String buildIdentifier(Headers headers) {
/**
* Get the header value for a given key
*
* @param headers headers of the record
* @param key the key to look for in the headers
* @param headers Headers of the record
* @param key The key to look for in the headers
* @return The header value for the given key, or an empty string if the header is not present or has no value.
*/
private static String getHeader(Headers headers, String key) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public DedupKeyProcessor(String windowStoreName, Duration retentionWindowDuratio
/**
* Initialize the processor.
*
* @param context the processor context
* @param context The processor context
*/
@Override
public void init(ProcessorContext<String, V> context) {
Expand All @@ -64,7 +64,7 @@ public void init(ProcessorContext<String, V> context) {
/**
* Process a record.
*
* @param message the record to process
* @param message The record to process
*/
@Override
public void process(Record<String, V> message) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public DedupKeyProcessorWithErrors(String windowStoreName, Duration retentionWin
/**
* Initialize the processor.
*
* @param context the processor context
* @param context The processor context
*/
@Override
public void init(ProcessorContext<String, ProcessingResult<V, V>> context) {
Expand All @@ -66,7 +66,7 @@ public void init(ProcessorContext<String, ProcessingResult<V, V>> context) {
/**
* Process a record.
*
* @param message the record to process
* @param message The record to process
*/
@Override
public void process(Record<String, V> message) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public DedupKeyValueProcessor(String windowStoreName, Duration retentionWindowHo
/**
* Initialize the processor.
*
* @param context the processor context
* @param context The processor context
*/
@Override
public void init(ProcessorContext<String, V> context) {
Expand All @@ -64,7 +64,7 @@ public void init(ProcessorContext<String, V> context) {
/**
* Process a record.
*
* @param message the record to process
* @param message The record to process
*/
@Override
public void process(Record<String, V> message) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public DedupKeyValueProcessorWithErrors(String windowStoreName, Duration retenti
/**
* Initialize the processor.
*
* @param context the processor context
* @param context The processor context
*/
@Override
public void init(ProcessorContext<String, ProcessingResult<V, V>> context) {
Expand All @@ -66,7 +66,7 @@ public void init(ProcessorContext<String, ProcessingResult<V, V>> context) {
/**
* Process a record.
*
* @param message the record to process
* @param message The record to process
*/
@Override
public void process(Record<String, V> message) {
Expand Down
Loading