feat: SOVD entity model alignment and multi-instance peer aggregation #327
Pull request overview
This PR updates ros2_medkit_gateway to align runtime discovery with the SOVD entity model (namespaces → Functions, host-derived default Component, synthetic Areas deprecated) and introduces multi-instance peer aggregation with optional mDNS discovery, entity merging, and transparent request forwarding.
Changes:
- Runtime discovery model shift: Functions from namespaces, single host-derived default Component, Areas empty by default in `runtime_only`.
- Peer aggregation: peer clients + entity merger + routing/forwarding via `HandlerContext::validate_entity_for_route()`, plus `/faults` fan-out and `/health` peer status.
- Broad test + documentation updates (new unit/integration tests, new aggregation docs/tutorials, updated config defaults).
Reviewed changes
Copilot reviewed 68 out of 68 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| src/ros2_medkit_integration_tests/test/scenarios/test_scenario_thermal_protection.test.py | Updates component selection logic for host-derived default component. |
| src/ros2_medkit_integration_tests/test/scenarios/test_scenario_fault_lifecycle.test.py | Switches discovery waiting from Areas to Functions; uses dynamic host component ID. |
| src/ros2_medkit_integration_tests/test/scenarios/test_scenario_action_lifecycle.test.py | Dynamically discovers component ID instead of hardcoding powertrain. |
| src/ros2_medkit_integration_tests/test/features/test_triggers_faults.test.py | Uses host-derived default component for component-level triggers. |
| src/ros2_medkit_integration_tests/test/features/test_peer_aggregation.test.py | New two-gateway integration test for merging + forwarding + peer health. |
| src/ros2_medkit_integration_tests/test/features/test_operations_api.test.py | Uses cached host component ID for component operations endpoints. |
| src/ros2_medkit_integration_tests/test/features/test_logging_api.test.py | Migrates area-based log tests to function-based log tests. |
| src/ros2_medkit_integration_tests/test/features/test_hateoas.test.py | Updates Area test IDs to manifest-defined powertrain instead of root. |
| src/ros2_medkit_integration_tests/test/features/test_entity_model_runtime.test.py | New runtime-only entity model integration test (Functions + single Component + empty Areas). |
| src/ros2_medkit_integration_tests/test/features/test_entity_listing.test.py | Updates listing expectations for empty Areas + single Component + Functions. |
| src/ros2_medkit_integration_tests/test/features/test_discovery_legacy_mode.test.py | Disables default component for legacy-mode coverage. |
| src/ros2_medkit_integration_tests/test/features/test_discovery_heuristic.test.py | Updates heuristic discovery tests for Functions and default empty Areas. |
| src/ros2_medkit_integration_tests/test/features/test_data_write.test.py | Uses cached host component ID for component data write endpoints. |
| src/ros2_medkit_integration_tests/test/features/test_data_read.test.py | Uses cached host component ID; migrates area data tests to function data tests. |
| src/ros2_medkit_integration_tests/test/features/test_bulk_data_api.test.py | Switches component bulk-data tests to host-derived default component ID. |
| src/ros2_medkit_integration_tests/ros2_medkit_test_utils/gateway_test_case.py | Adds REQUIRED_FUNCTIONS and waits for /functions in discovery polling. |
| src/ros2_medkit_gateway/test/test_stream_proxy.cpp | New unit tests for SSE stream proxy parsing + mock server integration. |
| src/ros2_medkit_gateway/test/test_runtime_discovery.cpp | New runtime discovery unit tests for Functions + deprecated synthetic Areas behavior. |
| src/ros2_medkit_gateway/test/test_mdns_discovery.cpp | New unit tests for mDNS discovery lifecycle/config behavior. |
| src/ros2_medkit_gateway/test/test_host_info_provider.cpp | New unit tests for HostInfoProvider + entity-id sanitization. |
| src/ros2_medkit_gateway/test/test_discovery_manager.cpp | Updates discovery manager tests for new defaults and synthetic areas off-by-default. |
| src/ros2_medkit_gateway/src/models/aggregation_service.cpp | Adds helpers to compute aggregation sources and standardized x-medkit aggregation metadata. |
| src/ros2_medkit_gateway/src/http/rest_server.cpp | Wires AggregationManager into HandlerContext. |
| src/ros2_medkit_gateway/src/http/handlers/log_handlers.cpp | Adds Function/Area log aggregation (x-medkit extension metadata). |
| src/ros2_medkit_gateway/src/http/handlers/health_handlers.cpp | Adds peer status to /health when aggregation enabled. |
| src/ros2_medkit_gateway/src/http/handlers/handler_context.cpp | Adds routing lookup + transparent forwarding for remote entities. |
| src/ros2_medkit_gateway/src/http/handlers/fault_handlers.cpp | Adds /faults fan-out merge and Area aggregation behavior. |
| src/ros2_medkit_gateway/src/http/handlers/discovery_handlers.cpp | Reads Apps from cache first to reflect host-component linking override. |
| src/ros2_medkit_gateway/src/http/handlers/data_handlers.cpp | Renames aggregation metadata to aggregated + aggregation_sources. |
| src/ros2_medkit_gateway/src/gateway_node.cpp | Declares/reads new discovery + aggregation params; merges peer entities into cache; links Apps to host Component. |
| src/ros2_medkit_gateway/src/discovery/runtime_discovery.cpp | Implements runtime namespace → Function discovery; deprecates synthetic Areas. |
| src/ros2_medkit_gateway/src/discovery/host_info_provider.cpp | New HostInfoProvider implementation (hostname/OS/arch → default Component). |
| src/ros2_medkit_gateway/src/discovery/discovery_manager.cpp | Creates host info provider; returns default component in runtime_only; exposes hosts for runtime Functions. |
| src/ros2_medkit_gateway/src/aggregation/sse_stream_proxy.cpp | SSE implementation for StreamProxy with buffer limits and parsing. |
| src/ros2_medkit_gateway/src/aggregation/entity_merger.cpp | Implements type-aware entity merge + routing table creation. |
| src/ros2_medkit_gateway/src/aggregation/aggregation_manager.cpp | Adds peer management, health checks, entity fetch/merge, forwarding, and fan-out. |
| src/ros2_medkit_gateway/include/ros2_medkit_gateway/models/aggregation_service.hpp | Declares new aggregation helpers for sources + x-medkit metadata. |
| src/ros2_medkit_gateway/include/ros2_medkit_gateway/http/rest_server.hpp | Adds set_aggregation_manager() API. |
| src/ros2_medkit_gateway/include/ros2_medkit_gateway/http/handlers/handler_context.hpp | Adds aggregation manager pointer + EntityInfo remote metadata fields. |
| src/ros2_medkit_gateway/include/ros2_medkit_gateway/gateway_node.hpp | Stores aggregation infrastructure and exposes AggregationManager accessor. |
| src/ros2_medkit_gateway/include/ros2_medkit_gateway/discovery/runtime_discovery.hpp | Updates runtime config defaults and documents Functions-from-namespaces behavior. |
| src/ros2_medkit_gateway/include/ros2_medkit_gateway/discovery/models/function.hpp | Updates docs: Functions can be runtime-defined (namespace grouping). |
| src/ros2_medkit_gateway/include/ros2_medkit_gateway/discovery/models/component.hpp | Adds optional host metadata in x-medkit. |
| src/ros2_medkit_gateway/include/ros2_medkit_gateway/discovery/host_info_provider.hpp | New HostInfoProvider public interface. |
| src/ros2_medkit_gateway/include/ros2_medkit_gateway/discovery/discovery_manager.hpp | Adds new runtime options (default component, functions-from-namespaces) + accessors. |
| src/ros2_medkit_gateway/include/ros2_medkit_gateway/aggregation/stream_proxy.hpp | New StreamProxy interface + SSEStreamProxy declaration. |
| src/ros2_medkit_gateway/include/ros2_medkit_gateway/aggregation/peer_client.hpp | New PeerClient interface for forwarding + entity fetching. |
| src/ros2_medkit_gateway/include/ros2_medkit_gateway/aggregation/mdns_discovery.hpp | New mDNS discovery interface. |
| src/ros2_medkit_gateway/include/ros2_medkit_gateway/aggregation/entity_merger.hpp | New entity merge interface (IDs, collisions, routing). |
| src/ros2_medkit_gateway/include/ros2_medkit_gateway/aggregation/aggregation_manager.hpp | New aggregation coordinator interface. |
| src/ros2_medkit_gateway/design/index.rst | Adds aggregation design doc to design toctree. |
| src/ros2_medkit_gateway/config/gateway_params.yaml | Updates discovery defaults and adds aggregation config block. |
| src/ros2_medkit_gateway/README.md | Updates README for new entity model and aggregation docs (but needs correction re: Areas). |
| src/ros2_medkit_gateway/CMakeLists.txt | Adds aggregation + host-info sources and new test targets; excludes third_party from formatting. |
| docs/tutorials/multi-instance.rst | New tutorial for multi-instance aggregation. |
| docs/tutorials/index.rst | Adds multi-instance tutorial to tutorials index. |
| docs/config/index.rst | Adds aggregation config reference to config index. |
| docs/config/discovery-options.rst | Updates discovery documentation for new defaults and Function mapping. |
| docs/config/aggregation.rst | New aggregation configuration reference. |
| .pre-commit-config.yaml | Excludes third_party/ from formatting/copyright hooks. |
| App parse_app(const nlohmann::json & j) { | ||
| App app; | ||
| app.id = j.value("id", ""); | ||
| app.name = j.value("name", ""); | ||
| if (j.contains("x-medkit") && j["x-medkit"].is_object()) { | ||
| const auto & xm = j["x-medkit"]; | ||
| app.component_id = xm.value("componentId", ""); | ||
| app.source = xm.value("source", ""); | ||
| app.description = xm.value("description", ""); | ||
| } | ||
| if (j.contains("translationId")) { | ||
| app.translation_id = j["translationId"].get<std::string>(); | ||
| } | ||
| if (j.contains("tags") && j["tags"].is_array()) { | ||
| app.tags = j["tags"].get<std::vector<std::string>>(); | ||
| } | ||
| return app; | ||
| } | ||
|
|
||
| /** | ||
| * @brief Parse a Function from JSON | ||
| */ | ||
| Function parse_function(const nlohmann::json & j) { | ||
| Function func; | ||
| func.id = j.value("id", ""); | ||
| func.name = j.value("name", ""); | ||
| if (j.contains("x-medkit") && j["x-medkit"].is_object()) { | ||
| const auto & xm = j["x-medkit"]; | ||
| func.source = xm.value("source", ""); | ||
| func.description = xm.value("description", ""); | ||
| } | ||
| if (j.contains("translationId")) { | ||
| func.translation_id = j["translationId"].get<std::string>(); | ||
| } | ||
| if (j.contains("tags") && j["tags"].is_array()) { | ||
| func.tags = j["tags"].get<std::vector<std::string>>(); | ||
| } | ||
| return func; | ||
| } |
Peer entity parsing drops Function hosts (x-medkit.hosts) and App bound FQN/ROS2 node info. This means merged Functions cannot include remote App IDs, and any cross-entity aggregation that relies on App::effective_fqn() will silently skip remote apps. Parse and populate Function::hosts (and optionally depends_on) from the peer response, and populate App::bound_fqn (from x-medkit.boundFqn or x-medkit.ros2.node depending on endpoint) so downstream aggregation works as intended.
| std::vector<Component> EntityMerger::merge_components(const std::vector<Component> & local, | ||
| const std::vector<Component> & remote) { | ||
| // Start with copies of all local components | ||
| std::vector<Component> result = local; | ||
|
|
||
| // Build set of local component IDs | ||
| std::unordered_set<std::string> local_ids; | ||
| for (const auto & comp : local) { | ||
| local_ids.insert(comp.id); | ||
| } | ||
|
|
||
| for (const auto & remote_comp : remote) { | ||
| Component added = remote_comp; | ||
| added.source = peer_source(); | ||
|
|
||
| if (local_ids.count(remote_comp.id) > 0) { | ||
| // Collision: prefix the remote entity ID | ||
| added.id = prefix_id(remote_comp.id); | ||
| added.name = peer_name_ + SEPARATOR + remote_comp.name; | ||
| } | ||
|
|
||
| routing_table_[added.id] = peer_name_; | ||
| result.push_back(added); | ||
| } | ||
|
|
||
| return result; | ||
| } | ||
|
|
||
| std::vector<App> EntityMerger::merge_apps(const std::vector<App> & local, const std::vector<App> & remote) { | ||
| // Start with copies of all local apps | ||
| std::vector<App> result = local; | ||
|
|
||
| // Build set of local app IDs | ||
| std::unordered_set<std::string> local_ids; | ||
| for (const auto & app : local) { | ||
| local_ids.insert(app.id); | ||
| } | ||
|
|
||
| for (const auto & remote_app : remote) { | ||
| App added = remote_app; | ||
| added.source = peer_source(); | ||
|
|
||
| if (local_ids.count(remote_app.id) > 0) { | ||
| // Collision: prefix the remote entity ID | ||
| added.id = prefix_id(remote_app.id); | ||
| added.name = peer_name_ + SEPARATOR + remote_app.name; | ||
| } | ||
|
|
||
| routing_table_[added.id] = peer_name_; | ||
| result.push_back(added); |
Collision prefixing for remote Components/Apps updates the entity IDs but does not rewrite references that point to those IDs (e.g., App.component_id, Function.hosts, App.depends_on). When a remote Component/App is renamed to "peer__id", links like /apps/{id}/is-located-on and merged Function host lists can become inconsistent or point at the wrong local entity. Consider building an old->new ID map during merge and applying it to all relationship fields in the merged entities (and when union-merging Function.hosts).
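A minimal sketch of the old->new ID map suggested above, limited to App-to-App references. The `App` struct, `remap_id()`, and `merge_apps_with_remap()` are hypothetical stand-ins, not the gateway's actual types; the `__` separator follows the `peer_b__camera_driver` example in this PR. `App.component_id` would need the same treatment with a separate map built while merging Components.

```cpp
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>

// Hypothetical, trimmed-down stand-in for the gateway's App model.
struct App {
  std::string id;
  std::vector<std::string> depends_on;  // IDs of other Apps
};

using IdMap = std::unordered_map<std::string, std::string>;

// Return the renamed ID if `id` was renamed during the merge, else `id` itself.
inline std::string remap_id(const IdMap & renames, const std::string & id) {
  const auto it = renames.find(id);
  return it == renames.end() ? id : it->second;
}

// Merge remote Apps into the local list. Colliding remote IDs get a
// "<peer>__" prefix, and every rename is recorded in `renames` so that
// references can be rewritten consistently.
inline std::vector<App> merge_apps_with_remap(const std::vector<App> & local,
                                              std::vector<App> remote,
                                              const std::string & peer_name,
                                              IdMap & renames) {
  std::unordered_set<std::string> local_ids;
  for (const auto & app : local) {
    local_ids.insert(app.id);
  }

  // Pass 1: decide all renames before touching any reference.
  for (const auto & app : remote) {
    if (local_ids.count(app.id) > 0) {
      renames[app.id] = peer_name + "__" + app.id;
    }
  }

  // Pass 2: apply the renames to the IDs and to the references between
  // remote Apps, so links keep pointing at the remote entity.
  std::vector<App> result = local;
  for (auto & app : remote) {
    app.id = remap_id(renames, app.id);
    for (auto & dep : app.depends_on) {
      dep = remap_id(renames, dep);
    }
    result.push_back(std::move(app));
  }
  return result;
}
```

The two passes matter: a remote App may reference another remote App that appears later in the list, so the full rename map has to exist before any reference is rewritten.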
| // Strip peer prefix from entity ID in the path if present. | ||
| // When entity ID collision causes renaming (e.g., camera_driver -> peer_b__camera_driver), | ||
| // the peer only knows the entity by its original ID (camera_driver), so we must strip | ||
| // the prefix before forwarding. | ||
| std::string forwarded_path = req.path; | ||
| std::string prefix = peer_name + EntityMerger::SEPARATOR; | ||
| auto prefix_pos = forwarded_path.find(prefix); | ||
| if (prefix_pos != std::string::npos) { | ||
| forwarded_path.erase(prefix_pos, prefix.size()); | ||
| } |
Prefix stripping in forwarded paths uses a substring find/erase, which can remove "peer__" in the wrong part of the URL (query params, other segments) or strip multiple occurrences unintentionally. This can forward an incorrect path to the peer or alter non-entity parts of the route. Prefer parsing the path into segments and stripping the prefix only from the entity-id segment (e.g., /apps/{id}/..., /components/{id}/...).
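One way to restrict the stripping to the entity-ID segment, as a sketch: walk the path segment by segment and only touch the segment that directly follows a collection name. `strip_peer_prefix()` and the collection list are assumptions for illustration; the gateway's real route layout may differ.

```cpp
#include <algorithm>
#include <array>
#include <cstddef>
#include <string>

// Strip "<peer>__" only from the segment that directly follows the first
// entity collection segment (/apps/<id>, /components/<id>, ...). Any other
// occurrence of the prefix in the path is left untouched.
inline std::string strip_peer_prefix(const std::string & path, const std::string & peer_name) {
  static const std::array<std::string, 4> kCollections = {"apps", "components", "areas", "functions"};
  const std::string prefix = peer_name + "__";

  std::size_t seg_start = 0;
  bool prev_was_collection = false;
  while (seg_start <= path.size()) {
    std::size_t seg_end = path.find('/', seg_start);
    if (seg_end == std::string::npos) {
      seg_end = path.size();
    }
    const std::string segment = path.substr(seg_start, seg_end - seg_start);

    if (prev_was_collection) {
      // This is the entity-ID segment: strip the prefix here or nowhere.
      if (segment.size() > prefix.size() && segment.compare(0, prefix.size(), prefix) == 0) {
        std::string out = path;
        out.erase(seg_start, prefix.size());
        return out;
      }
      return path;
    }

    prev_was_collection =
        std::find(kCollections.begin(), kCollections.end(), segment) != kCollections.end();
    seg_start = seg_end + 1;
  }
  return path;
}
```

With this shape, `/api/v1/apps/peer_b__camera_driver/data` is forwarded as `/api/v1/apps/camera_driver/data`, while a later segment that happens to contain `peer_b__` is passed through unchanged.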
| class TestPeerAggregation(unittest.TestCase): | ||
| """Verify peer aggregation between two gateway instances.""" | ||
|
|
||
| @classmethod | ||
| def setUpClass(cls): | ||
| """Wait for both gateways to be healthy and discover their nodes.""" | ||
| cls._wait_for_health(PRIMARY_URL, 'Primary') | ||
| cls._wait_for_health(PEER_URL, 'Peer') | ||
|
|
||
| # Wait for peer gateway to discover its nodes | ||
| cls._wait_for_apps(PEER_URL, {'pressure_sensor', 'actuator'}, 'Peer') | ||
|
|
||
| # Wait for primary gateway to discover its own nodes AND merge peer's | ||
| # This requires multiple refresh cycles for aggregation to kick in | ||
| cls._wait_for_apps( | ||
| PRIMARY_URL, | ||
| {'temp_sensor', 'rpm_sensor', 'pressure_sensor', 'actuator'}, | ||
| 'Primary (merged)', | ||
| ) | ||
|
|
||
| @classmethod | ||
| def _wait_for_health(cls, base_url, label): | ||
| """Poll /health until a gateway responds with 200.""" | ||
| deadline = time.monotonic() + GATEWAY_STARTUP_TIMEOUT | ||
| while time.monotonic() < deadline: | ||
| try: | ||
| response = requests.get(f'{base_url}/health', timeout=2) | ||
| if response.status_code == 200: | ||
| print(f'{label} gateway is healthy') | ||
| return | ||
| except requests.exceptions.RequestException: | ||
| pass | ||
| time.sleep(GATEWAY_STARTUP_INTERVAL) | ||
| raise AssertionError( | ||
| f'{label} gateway not responding after {GATEWAY_STARTUP_TIMEOUT}s' | ||
| ) | ||
|
|
||
| @classmethod | ||
| def _wait_for_apps(cls, base_url, required_apps, label): | ||
| """Poll /apps until all required app IDs are present.""" | ||
| deadline = time.monotonic() + DISCOVERY_TIMEOUT | ||
| while time.monotonic() < deadline: | ||
| try: | ||
| response = requests.get(f'{base_url}/apps', timeout=5) | ||
| if response.status_code == 200: | ||
| items = response.json().get('items', []) | ||
| found_ids = {a.get('id', '') for a in items} | ||
| missing = required_apps - found_ids | ||
| if not missing: | ||
| print( | ||
| f'{label}: all apps discovered ({len(found_ids)} total)' | ||
| ) | ||
| return | ||
| print( | ||
| f' {label}: waiting for {missing} ' | ||
| f'(have {found_ids})' | ||
| ) | ||
| except requests.exceptions.RequestException: | ||
| pass | ||
| time.sleep(1.0) | ||
| raise AssertionError( | ||
| f'{label}: apps not discovered after {DISCOVERY_TIMEOUT}s. ' | ||
| f'Missing: {required_apps}' | ||
| ) |
This integration test re-implements discovery/health waiting logic instead of using the existing GatewayTestCase utilities used throughout the integration test suite. That increases duplication and can make timing/flakiness behavior inconsistent across tests. Consider refactoring this test to extend GatewayTestCase (or reuse its polling helpers) and let the shared base handle readiness/discovery waiting.
| ### Entity Organization | ||
|
|
||
| The gateway organizes nodes into "areas" based on their namespace: | ||
| In runtime discovery mode, the gateway maps the ROS 2 graph to the SOVD entity model: | ||
|
|
||
| - **Component**: A single host-derived Component is created from `HostInfoProvider` (hostname, OS, architecture). All Apps belong to this Component. | ||
| - **App**: Each discovered ROS 2 node becomes an App entity. | ||
| - **Function**: Namespace prefixes create Function entities that group Apps sharing a namespace (e.g., `/powertrain/engine/temp_sensor` and `/powertrain/engine/rpm_sensor` both belong to Function `engine`). | ||
| - **Area**: Top-level namespace segments create Area entities that organize Functions (e.g., `/powertrain/engine/*` nodes create Area `powertrain` containing Function `engine`). | ||
|
|
||
| ``` | ||
| /powertrain/engine/temp_sensor → Area: powertrain, Component: temp_sensor | ||
| /chassis/brakes/pressure_sensor → Area: chassis, Component: pressure_sensor | ||
| /body/lights/controller → Area: body, Component: controller | ||
| /standalone_node → Area: root, Component: standalone_node | ||
| /powertrain/engine/temp_sensor -> Area: powertrain, Function: engine, App: temp_sensor | ||
| /chassis/brakes/pressure_sensor -> Area: chassis, Function: brakes, App: pressure_sensor | ||
| /body/lights/controller -> Area: body, Function: lights, App: controller | ||
| /standalone_node -> Area: root, Function: root, App: standalone_node |
README "Entity Organization" section describes runtime discovery as creating Areas from top-level namespace segments and nesting Functions under Areas. With the new defaults (create_synthetic_areas=false; Areas only from manifest), runtime_only mode returns an empty /areas list, so this description is misleading. Update this section to reflect that Areas require a manifest (or explicitly mention the deprecated create_synthetic_areas=true legacy option).
… system Reads hostname (gethostname), OS (/etc/os-release PRETTY_NAME), and architecture (uname) to produce a single default Component entity for runtime_only discovery mode. Adds sanitize_entity_id() for converting hostnames to valid SOVD entity IDs. Extends Component struct with optional host_metadata field serialized as x-medkit.host in JSON. 16 unit tests covering component creation, JSON metadata, entity ID sanitization (dots, spaces, case, invalid chars, truncation), and host info accessors.
Change RuntimeDiscoveryStrategy to create Function entities from namespace grouping (SOVD-correct mapping) instead of synthetic Areas.
- RuntimeConfig defaults: create_synthetic_areas=false, create_synthetic_components=false, create_functions_from_namespaces=true
- discover_functions() creates Function per namespace with App hosts
- discover_areas() returns empty by default; logs deprecation when enabled
- DiscoveryManager.get_hosts_for_function() now checks strategy output
- Update DiscoveryConfig, gateway_params.yaml, and GatewayNode params
- Add test_runtime_discovery.cpp with 11 tests covering new behavior
- Update test_discovery_manager.cpp for new defaults
Update all integration tests to match the new entity model where:
- Areas are empty in runtime_only mode (create_synthetic_areas=false)
- Components returns a single host-derived default Component
- Functions are created from namespace grouping
- Apps point to the host component via component_id

Key changes:
- GatewayTestCase: add REQUIRED_FUNCTIONS support to _wait_for_discovery
- Replace REQUIRED_AREAS with REQUIRED_FUNCTIONS in runtime_only tests
- Replace hardcoded component IDs (powertrain, chassis, perception) with dynamically-discovered host component ID
- Update test_entity_listing to verify new entity model (empty areas, single host component, namespace-derived functions)
- Update test_discovery_heuristic to disable default_component when testing legacy synthetic components, test functions from namespaces
- Fix gateway_node.cpp: only override app.component_id to host component in RUNTIME_ONLY mode (preserve manifest is_located_on)
- Fix gateway_node.cpp: force ALL apps to host component (not just those with empty component_id, since runtime discovery always sets it)
… (x-medkit extension)
- Add get_child_app_ids() and build_collection_x_medkit() to AggregationService for standardized child entity resolution and x-medkit metadata generation
- Standardize aggregation_sources field in fault, log, and data handler responses for Function, Area, and Component entities
- Add aggregated: true marker consistently across all aggregated collection responses
- Add 38 unit tests covering EntityCapabilities, AggregationService child ID resolution, x-medkit metadata, data/operations/configurations aggregation, and edge cases
Type-aware merge for combining local and peer gateway entities:
- Area/Function: merge by ID (combine tags, hosts lists, descriptions)
- Component/App: prefix remote ID with peername__ on collision
- Routing table tracks remote entity IDs for request forwarding
Implement mDNS-based peer gateway discovery using the mjansson/mdns header-only C library (Public Domain). MdnsDiscovery runs background threads for announcing this gateway's presence and browsing for peers on the local network via _medkit._tcp.local service type. Feeds discovered peers to AggregationManager via callbacks.
…ealth Add peer status to /health endpoint when aggregation is active, showing each peer's name, URL, and online/offline status. Add fan-out to the global GET /faults endpoint to merge fault lists from all healthy peers. Entity list endpoints (/areas, /components, /apps, /functions) do not need fan-out because peer entities are already merged in the cache by refresh_cache. Faults are managed by FaultManager (not cached), so they require explicit fan-out. No global /logs endpoint exists, so no log fan-out is needed.
Add two new integration test files:
- test_entity_model_runtime: verifies Phase 1 SOVD entity model in runtime_only mode (namespaces create Functions, empty Areas, single host Component, Apps linked via is-located-on)
- test_peer_aggregation: launches two gateway instances with static peer configuration to verify entity merging, request forwarding, and health peer status reporting

Also includes supporting changes:
- Add static peer parameters (aggregation.peer_urls/peer_names) to gateway_node so integration tests can configure peers declaratively
- Fix handle_get_app to read from entity cache instead of discovery manager directly, ensuring host component override is reflected in app detail responses
- Hold PeerClient mutex for entire HTTP operations instead of only during lazy init, preventing concurrent use of httplib::Client
- Replace AggregationManager::get_routing_table() (returns dangling const ref) with find_peer_for_entity() (thread-safe lookup)
- Keep AggregationManager shared lock during forward_request and fan_out_get to prevent use-after-free from concurrent peer removal
- Add response body size limit (10MB) for peer JSON responses
- Add SSE stream buffer size limit (1MB) to prevent unbounded growth
- Filter forwarded response headers to an allowlist instead of blindly copying all headers from peers
- Fix timeout unit conversion from milliseconds to seconds+microseconds
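The timeout fix in the last item comes down to splitting a millisecond value into whole seconds plus a microsecond remainder, which is the form cpp-httplib's timeout setters (for example `set_connection_timeout(sec, usec)`) accept. A sketch of that arithmetic, where `split_timeout()` is a hypothetical helper name and not taken from the PR:

```cpp
#include <chrono>
#include <utility>

// Split a millisecond timeout into (seconds, microseconds).
// Example: 2500 ms is 2 s plus 500000 us.
inline std::pair<long, long> split_timeout(std::chrono::milliseconds timeout) {
  const auto secs = std::chrono::duration_cast<std::chrono::seconds>(timeout);
  const auto usecs = std::chrono::duration_cast<std::chrono::microseconds>(timeout - secs);
  return {static_cast<long>(secs.count()), static_cast<long>(usecs.count())};
}
```

Passing the raw millisecond count as the seconds argument would turn a 2.5 s timeout into roughly 41 minutes, which is the kind of error this conversion guards against.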
- Add fetch_and_merge_peer_entities() to AggregationManager to eliminate dangling pointer risk from healthy_peers() returning raw PeerClient* that could be destroyed by concurrent remove_discovered_peer() calls
- Add body size limit checks in forward_request() and fetch_entities() to reject oversized responses from peers before processing
- Use local httplib::Client in fetch_entities() instead of shared client to avoid holding client_mutex_ for 4 sequential HTTP requests
- Block loopback (127.x, localhost, [::1]) and link-local (169.254.x.x) addresses in is_valid_peer_url() for mDNS-discovered peers
- Add warnings for mismatched peer_urls/peer_names array sizes and for aggregation enabled with no static peers and mDNS disabled
- Document check_all_health() lock contention tradeoff
- Update README Area Organization section to describe current entity model (Functions from namespaces, host-derived Component)
- Add tests for loopback and link-local URL rejection
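The loopback and link-local blocking can be pictured roughly as follows. This is an illustrative sketch only: `extract_host()` and `is_valid_peer_url_sketch()` are hypothetical names, the checks are plain string-prefix tests, and the real `is_valid_peer_url()` may parse URLs differently.

```cpp
#include <string>

// Extract the host part of "<scheme>://<host>[:port][/path]".
// IPv6 literals are returned with their brackets, e.g. "[::1]".
// Userinfo ("user@host") is not handled in this sketch.
inline std::string extract_host(const std::string & url) {
  const auto scheme_end = url.find("://");
  if (scheme_end == std::string::npos) {
    return std::string();
  }
  const auto host_start = scheme_end + 3;
  if (host_start < url.size() && url[host_start] == '[') {
    const auto close = url.find(']', host_start);
    return close == std::string::npos ? std::string() : url.substr(host_start, close - host_start + 1);
  }
  const auto host_end = url.find_first_of(":/?#", host_start);
  return host_end == std::string::npos ? url.substr(host_start)
                                       : url.substr(host_start, host_end - host_start);
}

// Accept only http/https URLs whose host is neither loopback
// (127.x, localhost, [::1]) nor link-local (169.254.x.x).
// Note: the prefix tests also reject hostnames that merely start with
// "127." or "169.254.", which is acceptable for a conservative filter.
inline bool is_valid_peer_url_sketch(const std::string & url) {
  const bool scheme_ok = url.rfind("http://", 0) == 0 || url.rfind("https://", 0) == 0;
  if (!scheme_ok) {
    return false;
  }
  const std::string host = extract_host(url);
  if (host.empty()) {
    return false;
  }
  const bool loopback = host == "localhost" || host == "[::1]" || host.rfind("127.", 0) == 0;
  const bool link_local = host.rfind("169.254.", 0) == 0;
  return !loopback && !link_local;
}
```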
…URLs When entity ID collision causes renaming (e.g., camera_driver -> peer_b__camera_driver), forward_request() now strips the peer prefix before forwarding so the peer receives the original entity ID. Also adds scheme validation for statically configured peers (http/https only), collision warning logs during entity merge, and an auth forwarding note to the aggregation docs.
…gram, packageStyle
…r, SRV additional)
ROS node name (ros2_medkit_gateway) is the same on all instances, causing the self-discovery filter to reject actual peers. Use hostname by default (unique per container/host). Add aggregation.mdns_name parameter for explicit control in multi-gateway-per-host setups.
…stance_name tests
… synthetic components
…325 Areas come from manifest only - runtime discovery never creates Areas. Components come from HostInfoProvider or manifest - runtime discovery never creates Components. Namespaces create Function entities only. Removed: create_synthetic_areas, create_synthetic_components, grouping_strategy, synthetic_component_name_pattern, topic_only_policy, min_topics_for_component, allow_heuristic_areas, allow_heuristic_components, ComponentGroupingStrategy enum, TopicOnlyPolicy enum, discover_synthetic_components(), discover_topic_components(). Kept: discover_apps(), discover_functions(), HostInfoProvider, allow_heuristic_apps, allow_heuristic_functions.
…ponent lookup

handle_get_component used DiscoveryManager which only knows local entities, causing 404 for remote components present in the merged cache. Changed to cache-first lookup (matching handle_get_app/handle_get_function).

handle_function_hosts called discovery->get_hosts_for_function() which only returns local hosts. Changed to read the Function's hosts list from cache (which has merged hosts from all peers) and resolve apps from cache.

merge_components prefixed remote component IDs on collision. Since synthetic components were removed and all Components now come from manifest, they should merge by ID like Areas/Functions. Apps still use prefix on collision since they represent individual ROS 2 nodes.
| httplib::Result result{nullptr, httplib::Error::Unknown}; | ||
| const std::string & path = req.path; | ||
| const std::string content_type = req.get_header_value("Content-Type"); | ||
|
|
||
| if (req.method == "GET") { |
forward_request() forwards using req.path, which does not include the query string in cpp-httplib. This drops query parameters (e.g., ?status=..., ?severity=...) on forwarded requests and changes behavior vs local handling. Consider forwarding the full request target (path + query) or reconstructing it from req.params/original target before calling the peer client methods.
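A sketch of the "reconstruct it from req.params" option, assuming the decoded parameters are available as a `std::multimap<std::string, std::string>` (the shape cpp-httplib uses for `Request::params`). `percent_encode()` and `build_forward_target()` are hypothetical helpers, not part of the PR or of cpp-httplib.

```cpp
#include <cctype>
#include <map>
#include <string>

// Percent-encode everything except RFC 3986 unreserved characters.
inline std::string percent_encode(const std::string & in) {
  static const char kHex[] = "0123456789ABCDEF";
  std::string out;
  for (unsigned char c : in) {
    if (std::isalnum(c) || c == '-' || c == '_' || c == '.' || c == '~') {
      out.push_back(static_cast<char>(c));
    } else {
      out.push_back('%');
      out.push_back(kHex[c >> 4]);
      out.push_back(kHex[c & 0x0F]);
    }
  }
  return out;
}

// Rebuild "path?key=value&..." from a path and its decoded query parameters.
// Parameters come out in the multimap's key order, not the original order.
inline std::string build_forward_target(const std::string & path,
                                        const std::multimap<std::string, std::string> & params) {
  std::string target = path;
  char sep = '?';
  for (const auto & [key, value] : params) {
    target += sep;
    target += percent_encode(key);
    target += '=';
    target += percent_encode(value);
    sep = '&';
  }
  return target;
}
```

Any prefix stripping should be applied to the path before the query string is appended, so that parameter values are never rewritten.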
| // Fetch functions | ||
| { | ||
| auto result = cli.Get(std::string(API_PREFIX) + "/functions"); | ||
| if (!result) { | ||
| return tl::unexpected<std::string>("Failed to connect to peer '" + name_ + "' at " + url_); |
Peer function merging relies on Function::hosts, but fetch_entities() only calls GET /api/v1/functions, whose response does not include the hosted app IDs (hosts are exposed via /functions/{id}/hosts). As a result, peer-discovered Function objects will have empty hosts, and same-ID Function merges won't actually aggregate across peers. If functions are intended to merge by ID, consider also fetching each function's hosts (and populating Function::hosts) during peer entity fetch.
| EntityMerger merger(peer->name()); | ||
| merged.areas = merger.merge_areas(merged.areas, result->areas); | ||
| merged.functions = merger.merge_functions(merged.functions, result->functions); | ||
| merged.components = merger.merge_components(merged.components, result->components); | ||
| merged.apps = merger.merge_apps(merged.apps, result->apps); |
The merge order merges Functions before Apps, but App ID collisions are resolved by prefixing in merge_apps(). This can leave merged Function::hosts containing stale/unprefixed app IDs (or pointing at the wrong local app on collision). Consider merging Apps first (or maintaining an old->new app ID mapping during merge) and rewriting Function::hosts accordingly before committing merged Functions into the cache.
| "test/*.cpp" "test/*.h" "test/*.hpp" | ||
| ) | ||
| list(FILTER _format_files EXCLUDE REGEX ".*/vendored/.*") | ||
| list(FILTER _format_files EXCLUDE REGEX ".*/vendored/.*") |
There’s a duplicated list(FILTER _format_files EXCLUDE REGEX ".*/vendored/.*") line. This is harmless but redundant and makes it easier for these filters to drift over time. Consider removing the duplicate and keeping a single exclude filter for vendored paths.
| list(FILTER _format_files EXCLUDE REGEX ".*/vendored/.*") |
| // Strip peer prefix from entity ID in the path if present. | ||
| // When entity ID collision causes renaming (e.g., camera_driver -> peer_b__camera_driver), | ||
| // the peer only knows the entity by its original ID (camera_driver), so we must strip | ||
| // the prefix before forwarding. | ||
| std::string forwarded_path = req.path; |
AggregationManager::forward_request() forwards based on req.path only. In cpp-httplib this omits the query string, so filters like ?status=... / ?severity=... won’t be preserved for remote entities. Consider preserving the full target (path + query) when stripping the peer prefix and forwarding.
| // Collision: merge by combining hosts lists | ||
| auto & merged = result[it->second]; | ||
|
|
||
| std::unordered_set<std::string> host_set(merged.hosts.begin(), merged.hosts.end()); | ||
| for (const auto & host : remote_func.hosts) { |
merge_functions() merges remote_func.hosts into the local function without accounting for App ID collision renaming done in merge_apps() (prefixing with peername__). If a peer app ID collides and is renamed, the merged function will still reference the original (now wrong/non-existent) app ID. Consider applying the same renaming/mapping to Function::hosts when merging functions.
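A sketch of a rename-aware union of host lists. It assumes Apps are merged first and that the merge produces a map from original remote App ID to renamed ID; `merge_hosts()` and `app_renames` are hypothetical names used only for illustration.

```cpp
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <vector>

// Union remote hosts into `merged_hosts`, translating each remote App ID
// through the rename map first so that a collided remote App is referenced
// by its prefixed ID instead of the local App with the same original ID.
inline void merge_hosts(std::vector<std::string> & merged_hosts,
                        const std::vector<std::string> & remote_hosts,
                        const std::unordered_map<std::string, std::string> & app_renames) {
  std::unordered_set<std::string> seen(merged_hosts.begin(), merged_hosts.end());
  for (const auto & host : remote_hosts) {
    const auto it = app_renames.find(host);
    const std::string & effective = (it == app_renames.end()) ? host : it->second;
    if (seen.insert(effective).second) {
      merged_hosts.push_back(effective);
    }
  }
}
```

Without the translation step, a remote `camera_driver` would be deduplicated against the local `camera_driver`, and the remote App would silently drop out of the Function's hosts.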
| size_t pos = 0; | ||
| while (true) { | ||
| auto boundary = buffer.find("\n\n", pos); | ||
| if (boundary == std::string::npos) { | ||
| break; |
The SSE chunk processing looks for event boundaries using "\n\n" only. Many SSE servers use CRLF line endings ("\r\n\r\n"), which won’t match and can cause the proxy to buffer until it hits the 1MB limit and disconnects. Consider normalizing line endings or supporting both delimiters.
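A sketch of boundary detection that accepts all three line-ending styles SSE permits (LF, CRLF, CR). `drain_sse_events()` is a hypothetical helper, not the proxy's actual parser; it returns the raw text of each complete event and leaves any trailing partial event in the buffer.

```cpp
#include <cstddef>
#include <string>
#include <vector>

// Remove all complete events from the front of `buffer` and return them.
// An event ends at the first blank line, which may be encoded as
// "\n\n", "\r\n\r\n", or "\r\r".
inline std::vector<std::string> drain_sse_events(std::string & buffer) {
  static const std::string kDelims[] = {"\r\n\r\n", "\n\n", "\r\r"};
  std::vector<std::string> events;
  std::size_t pos = 0;
  while (true) {
    std::size_t boundary = std::string::npos;
    std::size_t delim_len = 0;
    for (const auto & delim : kDelims) {
      const auto found = buffer.find(delim, pos);
      if (found < boundary) {  // npos is the largest value, so any hit wins
        boundary = found;
        delim_len = delim.size();
      }
    }
    if (boundary == std::string::npos) {
      break;  // no complete event left; keep the remainder buffered
    }
    events.push_back(buffer.substr(pos, boundary - pos));
    pos = boundary + delim_len;
  }
  buffer.erase(0, pos);
  return events;
}
```

Field parsing inside each event would still need to split on the same three line endings. Mixed endings within a single boundary (for example `"\n\r\n"`) are not handled here.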
| std::vector<PeerClient *> AggregationManager::healthy_peers() { | ||
| std::shared_lock<std::shared_mutex> lock(mutex_); | ||
|
|
||
| std::vector<PeerClient *> result; | ||
| for (auto & peer : peers_) { |
healthy_peers() returns raw PeerClient* values, but the shared_lock is released when the function returns. If a peer is removed concurrently, callers can end up with dangling pointers. Consider returning value snapshots (name/url/health), returning shared_ptr, or constraining usage so callers can’t outlive the lock scope.
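The `shared_ptr` option could look roughly like this. `PeerRegistry` and the two-field `PeerClient` are hypothetical stand-ins for `AggregationManager` and its peer type; only the ownership pattern is the point.

```cpp
#include <algorithm>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for the gateway's PeerClient.
struct PeerClient {
  std::string name;
  bool healthy;
};

class PeerRegistry {
 public:
  void add(std::shared_ptr<PeerClient> peer) {
    std::unique_lock<std::shared_mutex> lock(mutex_);
    peers_.push_back(std::move(peer));
  }

  void remove(const std::string & name) {
    std::unique_lock<std::shared_mutex> lock(mutex_);
    peers_.erase(std::remove_if(peers_.begin(), peers_.end(),
                                [&name](const std::shared_ptr<PeerClient> & p) { return p->name == name; }),
                 peers_.end());
  }

  // Returns owning pointers: each snapshot entry keeps its peer alive even
  // if remove() runs after the shared lock has been released.
  std::vector<std::shared_ptr<PeerClient>> healthy_peers() const {
    std::shared_lock<std::shared_mutex> lock(mutex_);
    std::vector<std::shared_ptr<PeerClient>> result;
    for (const auto & peer : peers_) {
      if (peer->healthy) {
        result.push_back(peer);
      }
    }
    return result;
  }

 private:
  mutable std::shared_mutex mutex_;
  std::vector<std::shared_ptr<PeerClient>> peers_;
};
```

This only fixes lifetime. A caller may still talk to a peer that has just been removed, so the peer object itself must tolerate use after removal, and any mutable state inside it (such as a health flag updated by a background check) needs its own synchronization.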
| - **Component**: A single host-derived Component is created from `HostInfoProvider` (hostname, OS, architecture). All Apps belong to this Component. | ||
| - **App**: Each discovered ROS 2 node becomes an App entity. | ||
| - **Function**: Namespace prefixes create Function entities that group Apps sharing a namespace (e.g., `/powertrain/engine/temp_sensor` and `/powertrain/engine/rpm_sensor` both belong to Function `engine`). | ||
| - **Area**: Top-level namespace segments create Area entities that organize Functions (e.g., `/powertrain/engine/*` nodes create Area `powertrain` containing Function `engine`). | ||
|
|
The README says runtime discovery creates Areas from top-level namespace segments, but the updated runtime-only entity model expects Areas to be empty (Areas are manifest-only). This section should be updated to match current behavior to avoid misleading users.
Convert handle_get_subareas, handle_get_contains, handle_get_subcomponents, handle_get_hosts, handle_component_depends_on, handle_app_depends_on, and handle_app_is_located_on to read from ThreadSafeEntityCache first and fall back to DiscoveryManager only on cache miss. This ensures remote/aggregated entities from peers are visible through all relationship endpoints. For handle_get_contains, recursive subarea walking is implemented to match ManifestManager::get_components_for_area behavior (collect components from descendant areas, not just direct children). Test expectations updated to reflect that cache holds the authoritative is_online state (set in SetUp), matching the existing FunctionHosts pattern.
Pull Request
Summary
Aligns the runtime discovery entity model with the SOVD spec (ISO 17978-3) and adds multi-instance peer aggregation with mDNS auto-discovery.
Entity model changes:
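Multi-instance aggregation:
- Static peers (`peer_urls`/`peer_names`) and mDNS auto-discovery (opt-in)
- Remote entity IDs prefixed with the peer name and `__` on collision
- Transparent request forwarding via `validate_entity_for_route()` - zero handler changes
- Fan-out on `/faults` with partial results on peer failure
- Peer status in the `/health` endpoint

Security hardening: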
Issue
Type
Testing
Unit tests: 2296 tests, 0 failures
- `test_host_info_provider` (16), `test_runtime_discovery` (11), `test_function_resource_collections` (38)
- `test_peer_client` (23 - mock server), `test_entity_merger` (21), `test_aggregation_manager` (26)
- `test_mdns_discovery` (15), `test_stream_proxy` (18 - mock server)

Integration tests:
- `test_entity_model_runtime.test.py` - namespace -> Function, default Component, App linking
- `test_peer_aggregation.test.py` - two-gateway setup, merged entities, forwarding, peer health

Breaking changes:
- `/areas` returns empty in `runtime_only` mode (was namespace-based synthetic areas)
- `/components` returns single host component (was per-namespace synthetic components)
- `/functions` now populated from namespaces (was empty)

`docs/config/aggregation.rst`

Checklist