diff --git a/.gitignore b/.gitignore index 8d161e5..c9042c2 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,5 @@ **/.env -**/target \ No newline at end of file +**/target + +.vscode/ \ No newline at end of file diff --git a/docker-compose-kafka.yml b/docker-compose-kafka.yml index 4427092..07daa38 100644 --- a/docker-compose-kafka.yml +++ b/docker-compose-kafka.yml @@ -71,6 +71,7 @@ services: KAFKA_TOPIC_CREATE_DEDUPLICATOR: ${KAFKA_TOPIC_CREATE_DEDUPLICATOR:-false} KAFKA_TOPIC_CREATE_MECDEPOSIT: ${KAFKA_TOPIC_CREATE_MECDEPOSIT:-false} KAFKA_TOPIC_CREATE_INTERSECTION_API: ${KAFKA_TOPIC_CREATE_INTERSECTION_API:-true} + KAFKA_TOPIC_CREATE_RSUSTATUSMONITOR: ${KAFKA_TOPIC_CREATE_RSUSTATUSMONITOR:-false} KAFKA_BOOTSTRAP_SERVERS: ${KAFKA_BOOTSTRAP_SERVERS:-kafka:9092} KAFKA_TOPIC_PARTITIONS: ${KAFKA_TOPIC_PARTITIONS:-1} diff --git a/docs/Release_notes.md b/docs/Release_notes.md index f46c393..61fa718 100644 --- a/docs/Release_notes.md +++ b/docs/Release_notes.md @@ -1,5 +1,15 @@ # JPO-UTILS Release Notes +## Version 2.3.0 +---------------------------------------- +### **Summary** +In this release, support for the JPO ODE pipeline was expanded for RTCM messages to create new Kafka topics and MongoDB connector and collections. A modification was made for the MongoDB script to fix a bug with running the script through Docker. + +Enhancements in this release: +- [Neaera PR 6](https://github.com/neaeraconsulting/jpo-utils/pull/6): RTCM topics for cimms #2 +- [Neaera PR 5](https://github.com/neaeraconsulting/jpo-utils/pull/5): RTCM topics for cimms #1 +- [CDOT PR 47](https://github.com/CDOT-CV/jpo-utils/pull/47): Setting restore_mongo.sh Execute Bit + ## Version 2.2.0 ---------------------------------------- ### **Summary** diff --git a/jikkou/kafka-connectors-template.jinja b/jikkou/kafka-connectors-template.jinja index cfb4d08..8c5ea4c 100644 --- a/jikkou/kafka-connectors-template.jinja +++ b/jikkou/kafka-connectors-template.jinja @@ -75,6 +75,10 @@ spec: {{ create_connector(values.apps.mecdeposit) }} {% endif %} +{% if system.env.CONNECT_CREATE_RSUSTATUSMONITOR %} +{{ create_connector(values.apps.rsustatusmonitor) }} +{% endif %} + {% if system.env.CONNECT_CREATE_OTHER %} {{ create_connector(values.apps.other) }} {% endif %} diff --git a/jikkou/kafka-connectors-values.yaml b/jikkou/kafka-connectors-values.yaml index 04b3357..8f2d2a1 100644 --- a/jikkou/kafka-connectors-values.yaml +++ b/jikkou/kafka-connectors-values.yaml @@ -109,6 +109,9 @@ apps: - topicName: topic.ProcessedSsm collectionName: ProcessedSsm generateTimestamp: true + - topicName: topic.ProcessedRtcm + collectionName: ProcessedRtcm + generateTimestamp: true intersection_api: name: intersection_api connectors: @@ -281,6 +284,10 @@ apps: collectionName: CmPriorityPreemptionRequestEvent useTimestamp: true timestampField: eventGeneratedAt + - topicName: topic.CmVehicleMisbehaviorEvents + collectionName: CmVehicleMisbehaviorEvents + useTimestamp: true + timestampField: eventGeneratedAt # Record BSM events: @@ -394,6 +401,12 @@ apps: collectionName: CmRevocableEnabledLaneAlignmentNotificationAggregation generateTimestamp: true timestampField: notificationGeneratedAt + + # CIMMS Metrics Events + - topicName: topic.CmPriorityRequestMetrics + collectionName: CmPriorityRequestMetrics + generateTimestamp: true + timestampField: metricGeneratedAt mecdeposit: name: mecdeposit @@ -401,6 +414,18 @@ apps: - topicName: topic.MecDepositMetrics collectionName: MecDepositMetrics generateTimestamp: true + + rsustatusmonitor: + name: rsustatusmonitor + connectors: + - topicName: topic.RmIntersectionStatusRecords + collectionName: RmIntersectionStatusRecords + useTimestamp: true + timestampField: timestamp + - topicName: topic.RmMonitoringStatusRecords + collectionName: RmMonitoringStatusRecords + useTimestamp: true + timestampField: timestamp # Allow for custom connectors to be added - users can override this file and add other kafka connectors here other: name: other diff --git a/jikkou/kafka-topics-template.jinja b/jikkou/kafka-topics-template.jinja index dd411f4..bb1b736 100644 --- a/jikkou/kafka-topics-template.jinja +++ b/jikkou/kafka-topics-template.jinja @@ -84,6 +84,10 @@ spec: {{ create_topics(values.apps.mecdeposit) }} {% endif %} +{% if system.env.KAFKA_TOPIC_CREATE_RSUSTATUSMONITOR %} +{{ create_topics(values.apps.rsustatusmonitor) }} +{% endif %} + {% if system.env.KAFKA_TOPIC_CREATE_OTHER %} {{ create_topics(values.apps.other) }} {% endif %} diff --git a/jikkou/kafka-topics-values.yaml b/jikkou/kafka-topics-values.yaml index 55e78a0..7d500cb 100644 --- a/jikkou/kafka-topics-values.yaml +++ b/jikkou/kafka-topics-values.yaml @@ -117,10 +117,12 @@ apps: - topic.CmBsmMessageCountProgressionEvents - topic.CmMapMessageCountProgressionEvents - topic.CmSpatMessageCountProgressionEvents + - topic.CmRtcmMessageCountProgressionEventAggregation - topic.CmEventStateProgressionEvent - topic.CmRevocableEnabledLaneAlignmentEvent - topic.CmSpatMinimumDataEventAggregation - topic.CmMapMinimumDataEventAggregation + - topic.CmRtcmMinimumDataEventAggregation - topic.CmIntersectionReferenceAlignmentEventAggregation - topic.CmSignalGroupAlignmentEventAggregation - topic.CmSignalStateConflictEventAggregation @@ -131,6 +133,11 @@ apps: - topic.CmSpatMessageCountProgressionEventAggregation - topic.CmRevocableEnabledLaneAlignmentEventAggregation - topic.CmPriorityPreemptionRequestEvent + - topic.CmVehicleMisbehaviorEvents + - topic.CmPriorityRequestMetrics + - topic.CmRtcmMinimumDataEvents + - topic.CmRtcmBroadcastRateEvents + - topic.CmRtcmMessageCountProgressionEvents tableTopics: - topic.CmLaneDirectionOfTravelNotification - topic.CmConnectionOfTravelNotification @@ -211,6 +218,15 @@ apps: - topic.MecDepositMetrics tableTopics: {} customTopics: {} + rsustatusmonitor: + name: rsu-status-monitor + streamTopics: + - topic.RmNearestNeighborUnresponsiveEvent + - topic.RmNearestNeighborUnresponsiveNotification + tableTopics: + - topic.RmIntersectionStatusRecords + - topic.RmMonitoringStatusRecords + customTopics: {} other: name: other-topics streamTopics: {} diff --git a/jikkou/kafka_connector_init.sh b/jikkou/kafka_connector_init.sh index c9e13eb..ece319c 100755 --- a/jikkou/kafka_connector_init.sh +++ b/jikkou/kafka_connector_init.sh @@ -14,6 +14,10 @@ until ./jikkou health get kafkaconnect | yq -e '.status.name == "UP"' > /dev/nul echo "Waiting 10 sec for Kafka Connect to be ready (Attempt: $((RETRY_COUNT+1))/$MAX_RETRIES)" RETRY_COUNT=$((RETRY_COUNT+1)) sleep 10 + if [ "$RETRY_COUNT" -ge "$MAX_RETRIES" ]; then + echo "Quitting because retry count is >= max retries" + exit 1 + fi done ./jikkou validate \ diff --git a/mongo/create_indexes.js b/mongo/create_indexes.js index c0a945e..02b1fbc 100644 --- a/mongo/create_indexes.js +++ b/mongo/create_indexes.js @@ -39,6 +39,7 @@ const CONNECT_CREATE_GEOJSONCONVERTER = process.env['CONNECT_CREATE_GEOJSONCONVE const CONNECT_CREATE_CONFLICTMONITOR = process.env['CONNECT_CREATE_CONFLICTMONITOR'] || true; const CONNECT_CREATE_DEDUPLICATOR = process.env['CONNECT_CREATE_DEDUPLICATOR'] || true; const CONNECT_INDEX_CREATE_INTERSECTION_API = process.env['CONNECT_CREATE_INTERSECTION_API'] || true; +const CONNECT_INDEX_CREATE_RSUSTATUSMONITOR = process.env['CONNECT_CREATE_RSUSTATUSMONITOR'] || true; const users = [ @@ -107,6 +108,7 @@ const conflictMonitorCollections = [ { name: "CmSpatTransitionEvent", ttlField: "eventGeneratedAt", timeField: "eventGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, { name: "CmEventStateProgressionEvent", ttlField: "eventGeneratedAt", timeField: "eventGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, { name: "CmPriorityPreemptionRequestEvent", ttlField: "eventGeneratedAt", timeField: "eventGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, + { name: "CmVehicleMisbehaviorEvents", ttlField: "eventGeneratedAt", timeField: "eventGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, @@ -163,12 +165,20 @@ const conflictMonitorCollections = [ // Reports { name: "CmReport", timeField: "reportGeneratedAt", intersectionField: "intersectionID"}, + // CIMMS Metrics + { name: "CmPriorityRequestMetrics", ttlField: "metricGeneratedAt", timeField: "metricGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, + ]; let intersectionAPICollections = [ { name: "IntersectionApiRsuStatus", timeField: "timestamp", intersectionField: "intersectionID", rsuIP:"rsuIP"}, ]; +let rsuStatusMonitorCollections = [ + { name: "RmIntersectionStatusRecords", timeField: "timestamp", intersectionField: "intersectionID", rsuIP:"listenerIp"}, + { name: "RmMonitoringStatusRecords", timeField: "timestamp", intersectionField: "intersectionID", rsuIP:"rsuIP"}, +]; + let collections = []; if(CONNECT_CREATE_ODE){ @@ -187,6 +197,9 @@ if(CONNECT_INDEX_CREATE_INTERSECTION_API){ collections = collections.concat(intersectionAPICollections); } +if(CONNECT_INDEX_CREATE_RSUSTATUSMONITOR){ + collections = collections.concat(rsuStatusMonitorCollections); +} try{ diff --git a/mongo/restore_mongo.sh b/mongo/restore_mongo.sh old mode 100644 new mode 100755 diff --git a/sample.env b/sample.env index 43f79c7..e6b0074 100644 --- a/sample.env +++ b/sample.env @@ -58,6 +58,7 @@ KAFKA_TOPIC_CREATE_CONFLICTMONITOR=true # Create topics for Conflict Monitor KAFKA_TOPIC_CREATE_INTERSECTION_API=true # Create topics for Intersection API KAFKA_TOPIC_CREATE_DEDUPLICATOR=false # Create topics for Deduplicator KAFKA_TOPIC_CREATE_MECDEPOSIT=false # Create topics for MecDeposit +KAFKA_TOPIC_CREATE_RSUSTATUSMONITOR=false # Create topics for RsuStatusMonitor KAFKA_TOPIC_CREATE_OTHER=false # Create topics for other applications # Relative path to the Kafka topics values file, upper level directories are supported @@ -139,6 +140,7 @@ CONNECT_CREATE_CONFLICTMONITOR=true # Create kafka connectors to MongoDB for CONNECT_CREATE_INTERSECTION_API=true # Create kafka Connectors to MongoDB for Intersection API CONNECT_CREATE_DEDUPLICATOR=false # Create kafka connectors to MongoDB for Deduplicator CONNECT_CREATE_MECDEPOSIT=false # Create kafka connectors to MongoDB for MecDeposit +CONNECT_CREATE_RSUSTATUSMONITOR=false # Create kafka connectors to MongoDB for RsuStatusMonitor CONNECT_CREATE_OTHER=false # Create kafka connectors to MongoDB for other applications # Relative path to the Kafka connect connectors values file, upper level directories are supported