From 71a03e1740f72878f67aa159face7e3fcc26b173 Mon Sep 17 00:00:00 2001 From: Ivan Yourshaw <39739503+iyourshaw@users.noreply.github.com> Date: Tue, 4 Nov 2025 21:37:08 -0700 Subject: [PATCH 01/11] cimms metrics topic --- jikkou/kafka-connectors-values.yaml | 6 ++++++ jikkou/kafka-topics-values.yaml | 1 + mongo/create_indexes.js | 4 +++- 3 files changed, 10 insertions(+), 1 deletion(-) diff --git a/jikkou/kafka-connectors-values.yaml b/jikkou/kafka-connectors-values.yaml index 04b3357..85c8b9e 100644 --- a/jikkou/kafka-connectors-values.yaml +++ b/jikkou/kafka-connectors-values.yaml @@ -394,6 +394,12 @@ apps: collectionName: CmRevocableEnabledLaneAlignmentNotificationAggregation generateTimestamp: true timestampField: notificationGeneratedAt + + # CIMMS Metrics Events + - topicName: topic.CmPriorityRequestMetrics + collectionName: CmPriorityRequestMetrics + generateTimestamp: true + timestampField: metricGeneratedAt mecdeposit: name: mecdeposit diff --git a/jikkou/kafka-topics-values.yaml b/jikkou/kafka-topics-values.yaml index 55e78a0..3b8065e 100644 --- a/jikkou/kafka-topics-values.yaml +++ b/jikkou/kafka-topics-values.yaml @@ -131,6 +131,7 @@ apps: - topic.CmSpatMessageCountProgressionEventAggregation - topic.CmRevocableEnabledLaneAlignmentEventAggregation - topic.CmPriorityPreemptionRequestEvent + - topic.CmPriorityRequestMetrics tableTopics: - topic.CmLaneDirectionOfTravelNotification - topic.CmConnectionOfTravelNotification diff --git a/mongo/create_indexes.js b/mongo/create_indexes.js index c0a945e..566211a 100644 --- a/mongo/create_indexes.js +++ b/mongo/create_indexes.js @@ -107,7 +107,6 @@ const conflictMonitorCollections = [ { name: "CmSpatTransitionEvent", ttlField: "eventGeneratedAt", timeField: "eventGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, { name: "CmEventStateProgressionEvent", ttlField: "eventGeneratedAt", timeField: "eventGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, { name: "CmPriorityPreemptionRequestEvent", ttlField: "eventGeneratedAt", timeField: "eventGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, - { name: "CmSpatMessageCountProgressionEvents", ttlField: "eventGeneratedAt", timeField: "eventGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, @@ -163,6 +162,9 @@ const conflictMonitorCollections = [ // Reports { name: "CmReport", timeField: "reportGeneratedAt", intersectionField: "intersectionID"}, + // CIMMS Metrics + { name: "CmPriorityRequestMetrics", ttlField: "metricGeneratedAt", timeField: "metricGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, + ]; let intersectionAPICollections = [ From e2dd41d8810dfc81f2623abca3a1503fd5a119b0 Mon Sep 17 00:00:00 2001 From: john-wiens Date: Thu, 6 Nov 2025 16:06:52 -0700 Subject: [PATCH 02/11] Adding required topic for Vehicle Misbehavior Events --- jikkou/kafka-connectors-values.yaml | 4 ++++ jikkou/kafka-topics-values.yaml | 1 + mongo/create_indexes.js | 1 + 3 files changed, 6 insertions(+) diff --git a/jikkou/kafka-connectors-values.yaml b/jikkou/kafka-connectors-values.yaml index 04b3357..0a82db1 100644 --- a/jikkou/kafka-connectors-values.yaml +++ b/jikkou/kafka-connectors-values.yaml @@ -281,6 +281,10 @@ apps: collectionName: CmPriorityPreemptionRequestEvent useTimestamp: true timestampField: eventGeneratedAt + - topicName: topic.CmVehicleMisbehaviorEvents + collectionName: CmVehicleMisbehaviorEvents + useTimestamp: true + timestampField: eventGeneratedAt # Record BSM events: diff --git a/jikkou/kafka-topics-values.yaml b/jikkou/kafka-topics-values.yaml index 55e78a0..cf4ca3d 100644 --- a/jikkou/kafka-topics-values.yaml +++ b/jikkou/kafka-topics-values.yaml @@ -131,6 +131,7 @@ apps: - topic.CmSpatMessageCountProgressionEventAggregation - topic.CmRevocableEnabledLaneAlignmentEventAggregation - topic.CmPriorityPreemptionRequestEvent + - topic.CmVehicleMisbehaviorEvents tableTopics: - topic.CmLaneDirectionOfTravelNotification - topic.CmConnectionOfTravelNotification diff --git a/mongo/create_indexes.js b/mongo/create_indexes.js index c0a945e..e2f767c 100644 --- a/mongo/create_indexes.js +++ b/mongo/create_indexes.js @@ -107,6 +107,7 @@ const conflictMonitorCollections = [ { name: "CmSpatTransitionEvent", ttlField: "eventGeneratedAt", timeField: "eventGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, { name: "CmEventStateProgressionEvent", ttlField: "eventGeneratedAt", timeField: "eventGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, { name: "CmPriorityPreemptionRequestEvent", ttlField: "eventGeneratedAt", timeField: "eventGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, + { name: "CmVehicleMisbehaviorEvents", ttlField: "eventGeneratedAt", timeField: "eventGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, From 51fc7a4d4496c342ae94deca09405f17d77ee456 Mon Sep 17 00:00:00 2001 From: Drew Johnston Date: Tue, 18 Nov 2025 01:52:56 -0700 Subject: [PATCH 03/11] Add the rsu status monitor topics --- docker-compose-kafka.yml | 1 + jikkou/kafka-topics-template.jinja | 4 ++++ jikkou/kafka-topics-values.yaml | 7 +++++++ sample.env | 1 + 4 files changed, 13 insertions(+) diff --git a/docker-compose-kafka.yml b/docker-compose-kafka.yml index 4427092..07daa38 100644 --- a/docker-compose-kafka.yml +++ b/docker-compose-kafka.yml @@ -71,6 +71,7 @@ services: KAFKA_TOPIC_CREATE_DEDUPLICATOR: ${KAFKA_TOPIC_CREATE_DEDUPLICATOR:-false} KAFKA_TOPIC_CREATE_MECDEPOSIT: ${KAFKA_TOPIC_CREATE_MECDEPOSIT:-false} KAFKA_TOPIC_CREATE_INTERSECTION_API: ${KAFKA_TOPIC_CREATE_INTERSECTION_API:-true} + KAFKA_TOPIC_CREATE_RSUSTATUSMONITOR: ${KAFKA_TOPIC_CREATE_RSUSTATUSMONITOR:-false} KAFKA_BOOTSTRAP_SERVERS: ${KAFKA_BOOTSTRAP_SERVERS:-kafka:9092} KAFKA_TOPIC_PARTITIONS: ${KAFKA_TOPIC_PARTITIONS:-1} diff --git a/jikkou/kafka-topics-template.jinja b/jikkou/kafka-topics-template.jinja index dd411f4..bb1b736 100644 --- a/jikkou/kafka-topics-template.jinja +++ b/jikkou/kafka-topics-template.jinja @@ -84,6 +84,10 @@ spec: {{ create_topics(values.apps.mecdeposit) }} {% endif %} +{% if system.env.KAFKA_TOPIC_CREATE_RSUSTATUSMONITOR %} +{{ create_topics(values.apps.rsustatusmonitor) }} +{% endif %} + {% if system.env.KAFKA_TOPIC_CREATE_OTHER %} {{ create_topics(values.apps.other) }} {% endif %} diff --git a/jikkou/kafka-topics-values.yaml b/jikkou/kafka-topics-values.yaml index cf4ca3d..f7bf142 100644 --- a/jikkou/kafka-topics-values.yaml +++ b/jikkou/kafka-topics-values.yaml @@ -212,6 +212,13 @@ apps: - topic.MecDepositMetrics tableTopics: {} customTopics: {} + rsustatusmonitor: + name: rsu-status-monitor + streamTopics: + - topic.RmNearestNeighborUnresponsiveEvent + tableTopics: + - topic.RmIntersectionStatusRecords + customTopics: {} other: name: other-topics streamTopics: {} diff --git a/sample.env b/sample.env index 43f79c7..eaafb11 100644 --- a/sample.env +++ b/sample.env @@ -58,6 +58,7 @@ KAFKA_TOPIC_CREATE_CONFLICTMONITOR=true # Create topics for Conflict Monitor KAFKA_TOPIC_CREATE_INTERSECTION_API=true # Create topics for Intersection API KAFKA_TOPIC_CREATE_DEDUPLICATOR=false # Create topics for Deduplicator KAFKA_TOPIC_CREATE_MECDEPOSIT=false # Create topics for MecDeposit +KAFKA_TOPIC_CREATE_RSUSTATUSMONITOR=false # Create topics for RsuStatusMonitor KAFKA_TOPIC_CREATE_OTHER=false # Create topics for other applications # Relative path to the Kafka topics values file, upper level directories are supported From fe93e4a4691c59fa351e533dd27eddc4ab369954 Mon Sep 17 00:00:00 2001 From: Drew Johnston Date: Tue, 18 Nov 2025 03:11:28 -0700 Subject: [PATCH 04/11] Add RmNearestNeighborUnresponsiveNotification to the topic values --- jikkou/kafka-topics-values.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/jikkou/kafka-topics-values.yaml b/jikkou/kafka-topics-values.yaml index f7bf142..b69889c 100644 --- a/jikkou/kafka-topics-values.yaml +++ b/jikkou/kafka-topics-values.yaml @@ -216,6 +216,7 @@ apps: name: rsu-status-monitor streamTopics: - topic.RmNearestNeighborUnresponsiveEvent + - topic.RmNearestNeighborUnresponsiveNotification tableTopics: - topic.RmIntersectionStatusRecords customTopics: {} From 3d12fc7514b8ac33e8c79155aa527fcde43f37d2 Mon Sep 17 00:00:00 2001 From: Drew Johnston Date: Fri, 21 Nov 2025 04:40:40 -0700 Subject: [PATCH 05/11] Add monitoring status topic --- jikkou/kafka-topics-values.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/jikkou/kafka-topics-values.yaml b/jikkou/kafka-topics-values.yaml index b69889c..68f48fa 100644 --- a/jikkou/kafka-topics-values.yaml +++ b/jikkou/kafka-topics-values.yaml @@ -219,6 +219,7 @@ apps: - topic.RmNearestNeighborUnresponsiveNotification tableTopics: - topic.RmIntersectionStatusRecords + - topic.RmMonitoringStatusRecords customTopics: {} other: name: other-topics From 1742d077316e0d5ef18742394c251e4bde161ff8 Mon Sep 17 00:00:00 2001 From: Drew Johnston Date: Mon, 24 Nov 2025 09:57:20 -0700 Subject: [PATCH 06/11] Add RSU Status Monitor MongoDB collections, indexes and connectors --- jikkou/kafka-connectors-template.jinja | 4 ++++ jikkou/kafka-connectors-values.yaml | 12 ++++++++++++ mongo/create_indexes.js | 9 +++++++++ sample.env | 1 + 4 files changed, 26 insertions(+) diff --git a/jikkou/kafka-connectors-template.jinja b/jikkou/kafka-connectors-template.jinja index cfb4d08..8c5ea4c 100644 --- a/jikkou/kafka-connectors-template.jinja +++ b/jikkou/kafka-connectors-template.jinja @@ -75,6 +75,10 @@ spec: {{ create_connector(values.apps.mecdeposit) }} {% endif %} +{% if system.env.CONNECT_CREATE_RSUSTATUSMONITOR %} +{{ create_connector(values.apps.rsustatusmonitor) }} +{% endif %} + {% if system.env.CONNECT_CREATE_OTHER %} {{ create_connector(values.apps.other) }} {% endif %} diff --git a/jikkou/kafka-connectors-values.yaml b/jikkou/kafka-connectors-values.yaml index 0a82db1..c6b3271 100644 --- a/jikkou/kafka-connectors-values.yaml +++ b/jikkou/kafka-connectors-values.yaml @@ -405,6 +405,18 @@ apps: - topicName: topic.MecDepositMetrics collectionName: MecDepositMetrics generateTimestamp: true + + rsustatusmonitor: + name: rsustatusmonitor + connectors: + - topicName: topic.RmIntersectionStatusRecords + collectionName: RmIntersectionStatusRecords + useTimestamp: true + timestampField: timestamp + - topicName: topic.RmMonitoringStatusRecords + collectionName: RmMonitoringStatusRecords + useTimestamp: true + timestampField: timestamp # Allow for custom connectors to be added - users can override this file and add other kafka connectors here other: name: other diff --git a/mongo/create_indexes.js b/mongo/create_indexes.js index e2f767c..4d7aed3 100644 --- a/mongo/create_indexes.js +++ b/mongo/create_indexes.js @@ -39,6 +39,7 @@ const CONNECT_CREATE_GEOJSONCONVERTER = process.env['CONNECT_CREATE_GEOJSONCONVE const CONNECT_CREATE_CONFLICTMONITOR = process.env['CONNECT_CREATE_CONFLICTMONITOR'] || true; const CONNECT_CREATE_DEDUPLICATOR = process.env['CONNECT_CREATE_DEDUPLICATOR'] || true; const CONNECT_INDEX_CREATE_INTERSECTION_API = process.env['CONNECT_CREATE_INTERSECTION_API'] || true; +const CONNECT_INDEX_CREATE_RSUSTATUSMONITOR = process.env['CONNECT_CREATE_RSUSTATUSMONITOR'] || true; const users = [ @@ -170,6 +171,11 @@ let intersectionAPICollections = [ { name: "IntersectionApiRsuStatus", timeField: "timestamp", intersectionField: "intersectionID", rsuIP:"rsuIP"}, ]; +let rsuStatusMonitorCollections = [ + { name: "RmIntersectionStatusRecords", timeField: "timestamp", intersectionField: "intersectionID", rsuIP:"listenerIp"}, + { name: "RmMonitoringStatusRecords", timeField: "timestamp", intersectionField: "intersectionID", rsuIP:"rsuIP"}, +]; + let collections = []; if(CONNECT_CREATE_ODE){ @@ -188,6 +194,9 @@ if(CONNECT_INDEX_CREATE_INTERSECTION_API){ collections = collections.concat(intersectionAPICollections); } +if(CONNECT_INDEX_CREATE_RSUSTATUSMONITOR){ + collections = collections.concat(rsuStatusMonitorCollections); +} try{ diff --git a/sample.env b/sample.env index eaafb11..e6b0074 100644 --- a/sample.env +++ b/sample.env @@ -140,6 +140,7 @@ CONNECT_CREATE_CONFLICTMONITOR=true # Create kafka connectors to MongoDB for CONNECT_CREATE_INTERSECTION_API=true # Create kafka Connectors to MongoDB for Intersection API CONNECT_CREATE_DEDUPLICATOR=false # Create kafka connectors to MongoDB for Deduplicator CONNECT_CREATE_MECDEPOSIT=false # Create kafka connectors to MongoDB for MecDeposit +CONNECT_CREATE_RSUSTATUSMONITOR=false # Create kafka connectors to MongoDB for RsuStatusMonitor CONNECT_CREATE_OTHER=false # Create kafka connectors to MongoDB for other applications # Relative path to the Kafka connect connectors values file, upper level directories are supported From 71625a75254edba7f2af70a7267d7fda70cc4b01 Mon Sep 17 00:00:00 2001 From: Jacob Frye <51723791+jacob6838@users.noreply.github.com> Date: Tue, 23 Dec 2025 04:37:12 +1300 Subject: [PATCH 07/11] Adding chmox +x on restore_mongo.sh (#47) Adding chmox +x on restore_mongo.sh --- mongo/restore_mongo.sh | 0 1 file changed, 0 insertions(+), 0 deletions(-) mode change 100644 => 100755 mongo/restore_mongo.sh diff --git a/mongo/restore_mongo.sh b/mongo/restore_mongo.sh old mode 100644 new mode 100755 From 65ec181f53437b588674e1f3d604614531c5acb5 Mon Sep 17 00:00:00 2001 From: Ivan Yourshaw <39739503+iyourshaw@users.noreply.github.com> Date: Fri, 13 Mar 2026 15:14:10 -0600 Subject: [PATCH 08/11] Rtcm topics for cimms (#5) * rtcm event topics * rtcm aggregation topics --- jikkou/kafka-topics-values.yaml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/jikkou/kafka-topics-values.yaml b/jikkou/kafka-topics-values.yaml index 3207a8b..7d500cb 100644 --- a/jikkou/kafka-topics-values.yaml +++ b/jikkou/kafka-topics-values.yaml @@ -117,10 +117,12 @@ apps: - topic.CmBsmMessageCountProgressionEvents - topic.CmMapMessageCountProgressionEvents - topic.CmSpatMessageCountProgressionEvents + - topic.CmRtcmMessageCountProgressionEventAggregation - topic.CmEventStateProgressionEvent - topic.CmRevocableEnabledLaneAlignmentEvent - topic.CmSpatMinimumDataEventAggregation - topic.CmMapMinimumDataEventAggregation + - topic.CmRtcmMinimumDataEventAggregation - topic.CmIntersectionReferenceAlignmentEventAggregation - topic.CmSignalGroupAlignmentEventAggregation - topic.CmSignalStateConflictEventAggregation @@ -133,6 +135,9 @@ apps: - topic.CmPriorityPreemptionRequestEvent - topic.CmVehicleMisbehaviorEvents - topic.CmPriorityRequestMetrics + - topic.CmRtcmMinimumDataEvents + - topic.CmRtcmBroadcastRateEvents + - topic.CmRtcmMessageCountProgressionEvents tableTopics: - topic.CmLaneDirectionOfTravelNotification - topic.CmConnectionOfTravelNotification From 2581695cd2bdbc5bbe06ba6e5394bea99f41696f Mon Sep 17 00:00:00 2001 From: Ivan Yourshaw <39739503+iyourshaw@users.noreply.github.com> Date: Thu, 9 Apr 2026 13:58:55 -0600 Subject: [PATCH 09/11] Rtcm topics for cimms (#6) * rtcm event topics * rtcm aggregation topics * processed rtcm connectors --- .gitignore | 4 +++- jikkou/kafka-connectors-values.yaml | 3 +++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index 8d161e5..c9042c2 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,5 @@ **/.env -**/target \ No newline at end of file +**/target + +.vscode/ \ No newline at end of file diff --git a/jikkou/kafka-connectors-values.yaml b/jikkou/kafka-connectors-values.yaml index 1c47b1b..8f2d2a1 100644 --- a/jikkou/kafka-connectors-values.yaml +++ b/jikkou/kafka-connectors-values.yaml @@ -109,6 +109,9 @@ apps: - topicName: topic.ProcessedSsm collectionName: ProcessedSsm generateTimestamp: true + - topicName: topic.ProcessedRtcm + collectionName: ProcessedRtcm + generateTimestamp: true intersection_api: name: intersection_api connectors: From e308679216f15c61c7e384d667befd70a26ec96a Mon Sep 17 00:00:00 2001 From: Ivan Yourshaw <39739503+iyourshaw@users.noreply.github.com> Date: Mon, 13 Apr 2026 13:36:46 -0600 Subject: [PATCH 10/11] fix infinite loop in connect setup (#4) * break out of loop at max retries * exit on exceeding max retries --- jikkou/kafka_connector_init.sh | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/jikkou/kafka_connector_init.sh b/jikkou/kafka_connector_init.sh index c9e13eb..ece319c 100755 --- a/jikkou/kafka_connector_init.sh +++ b/jikkou/kafka_connector_init.sh @@ -14,6 +14,10 @@ until ./jikkou health get kafkaconnect | yq -e '.status.name == "UP"' > /dev/nul echo "Waiting 10 sec for Kafka Connect to be ready (Attempt: $((RETRY_COUNT+1))/$MAX_RETRIES)" RETRY_COUNT=$((RETRY_COUNT+1)) sleep 10 + if [ "$RETRY_COUNT" -ge "$MAX_RETRIES" ]; then + echo "Quitting because retry count is >= max retries" + exit 1 + fi done ./jikkou validate \ From 6ac677b0560e37ce53801fecf009fd812812d9f2 Mon Sep 17 00:00:00 2001 From: Drew Johnston Date: Tue, 14 Apr 2026 10:47:13 -0600 Subject: [PATCH 11/11] Add release notes --- docs/Release_notes.md | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/docs/Release_notes.md b/docs/Release_notes.md index f46c393..61fa718 100644 --- a/docs/Release_notes.md +++ b/docs/Release_notes.md @@ -1,5 +1,15 @@ # JPO-UTILS Release Notes +## Version 2.3.0 +---------------------------------------- +### **Summary** +In this release, support for the JPO ODE pipeline was expanded for RTCM messages to create new Kafka topics and MongoDB connector and collections. A modification was made for the MongoDB script to fix a bug with running the script through Docker. + +Enhancements in this release: +- [Neaera PR 6](https://github.com/neaeraconsulting/jpo-utils/pull/6): RTCM topics for cimms #2 +- [Neaera PR 5](https://github.com/neaeraconsulting/jpo-utils/pull/5): RTCM topics for cimms #1 +- [CDOT PR 47](https://github.com/CDOT-CV/jpo-utils/pull/47): Setting restore_mongo.sh Execute Bit + ## Version 2.2.0 ---------------------------------------- ### **Summary**