From 1be3015a9d5444f075d5fef4eaae72be95d00883 Mon Sep 17 00:00:00 2001 From: Simon Waight Date: Fri, 17 May 2024 11:06:57 +1000 Subject: [PATCH 1/2] Add support for Azure Cosmos DB as a sink --- .../json/generator/log/CosmosDBLogger.java | 56 +++++++++++++++++++ 1 file changed, 56 insertions(+) create mode 100644 src/main/java/net/acesinc/data/json/generator/log/CosmosDBLogger.java diff --git a/src/main/java/net/acesinc/data/json/generator/log/CosmosDBLogger.java b/src/main/java/net/acesinc/data/json/generator/log/CosmosDBLogger.java new file mode 100644 index 0000000..eaa1026 --- /dev/null +++ b/src/main/java/net/acesinc/data/json/generator/log/CosmosDBLogger.java @@ -0,0 +1,56 @@ +package net.acesinc.data.json.generator.log; + +import com.azure.cosmos.CosmosClient; +import com.azure.cosmos.CosmosClientBuilder; +import com.azure.cosmos.CosmosContainer; +import com.azure.cosmos.CosmosDatabase; +import com.azure.cosmos.models.CosmosItemRequestOptions; +import com.azure.cosmos.models.CosmosItemResponse; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.util.Map; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class CosmosDBLogger implements EventLogger { + + private static final Logger log = LogManager.getLogger(CosmosDBLogger.class); + private CosmosClient cosmosClient; + private CosmosDatabase cosmosDatabase; + private CosmosContainer cosmosContainer; + private final ObjectMapper mapper = new ObjectMapper(); + + public CosmosDBLogger(Map props) { + String uri = (String) props.get("uri"); + String key = (String) props.get("key"); + String databaseName = (String) props.get("databaseName"); + String containerName = (String) props.get("containerName"); + + cosmosClient = new CosmosClientBuilder() + .endpoint(uri) + .key(key) + .buildClient(); + + cosmosDatabase = cosmosClient.getDatabase(databaseName); + cosmosContainer = cosmosDatabase.getContainer(containerName); + } + + @Override + public void logEvent(String event, Map producerConfig) { + try { + JsonNode jsonNode = mapper.readTree(event); + CosmosItemResponse item = cosmosContainer.createItem(jsonNode); + log.info("Document added to Cosmos DB with request charge of " + item.getRequestCharge() + " within session " + item.getSessionToken()); + } catch (Exception e) { + log.error("Error inserting JSON data into Cosmos DB", e); + } + } + + @Override + public void shutdown() { + if (cosmosClient != null) { + cosmosClient.close(); + log.info("Cosmos DB client closed successfully"); + } + } +} From 642f2cf8a3f2f751409f5860489aae8e3ef33ade Mon Sep 17 00:00:00 2001 From: Emad Alashi Date: Mon, 20 May 2024 00:50:26 +0000 Subject: [PATCH 2/2] Add Cosmos Db packages Add Cosmos Db Logger --- pom.xml | 24 ++++++++++++++++--- .../json/generator/JsonDataGenerator.java | 6 +++++ .../json/generator/log/CosmosDBLogger.java | 2 +- 3 files changed, 28 insertions(+), 4 deletions(-) diff --git a/pom.xml b/pom.xml index d587854..5bcff5c 100644 --- a/pom.xml +++ b/pom.xml @@ -35,6 +35,11 @@ 1.7 + + com.azure + azure-core-http-netty + 1.14.2 + org.apache.logging.log4j log4j-api @@ -53,7 +58,7 @@ com.fasterxml.jackson.core jackson-databind - 2.9.10.8 + 2.13.5 org.apache.commons @@ -133,7 +138,20 @@ iot-device-client 1.3.30 - + + + com.azure + azure-sdk-bom + 1.2.23 + pom + import + + + azure-cosmos + com.azure + 4.59.0 + + com.amazonaws @@ -149,7 +167,7 @@ com.fasterxml.jackson.core jackson-core - 2.9.7 + 2.13.5 org.apache.pulsar diff --git a/src/main/java/net/acesinc/data/json/generator/JsonDataGenerator.java b/src/main/java/net/acesinc/data/json/generator/JsonDataGenerator.java index 2f35b8c..973535a 100644 --- a/src/main/java/net/acesinc/data/json/generator/JsonDataGenerator.java +++ b/src/main/java/net/acesinc/data/json/generator/JsonDataGenerator.java @@ -109,6 +109,12 @@ public JsonDataGenerator(String simConfigString) { } break; } + // add a case for ComsoDB + case "cosmosdb": { + log.info("Adding CosmosDB Logger with properties: " + elProps); + loggers.add(new CosmosDBLogger(elProps)); + break; + } } } if (loggers.isEmpty()) { diff --git a/src/main/java/net/acesinc/data/json/generator/log/CosmosDBLogger.java b/src/main/java/net/acesinc/data/json/generator/log/CosmosDBLogger.java index eaa1026..067de80 100644 --- a/src/main/java/net/acesinc/data/json/generator/log/CosmosDBLogger.java +++ b/src/main/java/net/acesinc/data/json/generator/log/CosmosDBLogger.java @@ -39,7 +39,7 @@ public CosmosDBLogger(Map props) { public void logEvent(String event, Map producerConfig) { try { JsonNode jsonNode = mapper.readTree(event); - CosmosItemResponse item = cosmosContainer.createItem(jsonNode); + CosmosItemResponse item = cosmosContainer.createItem(jsonNode); log.info("Document added to Cosmos DB with request charge of " + item.getRequestCharge() + " within session " + item.getSessionToken()); } catch (Exception e) { log.error("Error inserting JSON data into Cosmos DB", e);