From 06359d0bef5467dbfc60c057084db714028fbd2f Mon Sep 17 00:00:00 2001 From: mnajafian-nv Date: Fri, 5 Jun 2026 15:43:06 -0700 Subject: [PATCH 1/2] test: validate Hermes routed provider OpenInference spans Signed-off-by: mnajafian-nv --- Cargo.lock | 2 + crates/cli/Cargo.toml | 2 + crates/cli/tests/coverage/session_tests.rs | 517 ++++++++++++++------- 3 files changed, 350 insertions(+), 171 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0246c835..7bca9a1f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1378,6 +1378,8 @@ dependencies = [ "http-body-util", "nemo-relay", "nemo-relay-adaptive", + "opentelemetry", + "opentelemetry_sdk", "reqwest", "rustls", "serde", diff --git a/crates/cli/Cargo.toml b/crates/cli/Cargo.toml index ca1d49d2..1ae7b6ae 100644 --- a/crates/cli/Cargo.toml +++ b/crates/cli/Cargo.toml @@ -45,5 +45,7 @@ toml_edit = "0.23" uuid = { workspace = true, features = ["serde", "v7"] } [dev-dependencies] +opentelemetry = { version = "0.31", default-features = false, features = ["trace"] } +opentelemetry_sdk = { version = "0.31", default-features = false, features = ["trace", "testing"] } tempfile = "3" tower = { version = "0.5", features = ["util"] } diff --git a/crates/cli/tests/coverage/session_tests.rs b/crates/cli/tests/coverage/session_tests.rs index bb3bb6cb..f0c27695 100644 --- a/crates/cli/tests/coverage/session_tests.rs +++ b/crates/cli/tests/coverage/session_tests.rs @@ -4,8 +4,12 @@ use axum::http::HeaderMap; use nemo_relay::api::event::ScopeCategory; use nemo_relay::api::subscriber::{deregister_subscriber, flush_subscribers, register_subscriber}; +use nemo_relay::observability::openinference::OpenInferenceSubscriber; use nemo_relay::plugin::{PluginConfig, clear_plugin_configuration, initialize_plugins}; +use opentelemetry::KeyValue; +use opentelemetry_sdk::trace::InMemorySpanExporterBuilder; use serde_json::json; +use std::collections::HashMap; use std::path::Path; use std::sync::{Arc, Mutex as StdMutex}; @@ -13,6 +17,7 @@ use super::*; 
use crate::model::{LlmEvent, LlmHintEvent, SessionEvent, ToolEvent}; static OBSERVABILITY_PLUGIN_TEST_LOCK: tokio::sync::Mutex<()> = tokio::sync::Mutex::const_new(()); +const HERMES_ROUTED_TEST_SESSION_KEY: &str = "hermes_routed_test_session_id"; async fn install_test_atif_plugin(output_directory: &Path) { let _ = clear_plugin_configuration(); @@ -38,6 +43,40 @@ async fn install_test_atif_plugin(output_directory: &Path) { initialize_plugins(config).await.unwrap(); } +fn make_openinference_test_subscriber( + scope: &str, +) -> ( + OpenInferenceSubscriber, + opentelemetry_sdk::trace::InMemorySpanExporter, +) { + let exporter = InMemorySpanExporterBuilder::new().build(); + let provider = opentelemetry_sdk::trace::SdkTracerProvider::builder() + .with_simple_exporter(exporter.clone()) + .build(); + let subscriber = OpenInferenceSubscriber::from_tracer_provider(provider, scope.to_string()); + (subscriber, exporter) +} + +fn attr_map(attributes: &[KeyValue]) -> HashMap<String, String> { + attributes + .iter() + .map(|attribute| { + ( + attribute.key.as_str().to_string(), + attribute.value.to_string(), + ) + }) + .collect() +} + +fn hermes_routed_gateway_metadata(gateway_path: &str, test_session_marker: Option<&str>) -> Value { + let mut metadata = json!({ "gateway_path": gateway_path }); + if let Some(marker) = test_session_marker { + metadata[HERMES_ROUTED_TEST_SESSION_KEY] = json!(marker); + } + metadata +} + fn read_atif_for_session(output_directory: &Path, session_id: &str) -> Value { flush_subscribers().unwrap(); std::fs::read_dir(output_directory) @@ -94,6 +133,187 @@ async fn has_pending_alignment(manager: &SessionManager, session_id: &str) -> bo .has_pending_session(session_id) } +async fn drive_hermes_routed_provider_session( + manager: &SessionManager, + headers: &HeaderMap, + session_id: &str, + test_session_marker: Option<&str>, +) { + manager + .apply_events( + headers, + vec![NormalizedEvent::AgentStarted(SessionEvent { + session_id: session_id.into(), + agent_kind: 
AgentKind::Hermes, + event_name: "on_session_start".into(), + payload: json!({}), + metadata: json!({}), + })], + ) + .await + .unwrap(); + + let anthropic = manager + .start_llm( + headers, + LlmGatewayStart { + session_id: Some(session_id.into()), + provider: "anthropic.messages".into(), + model_name: Some("claude-sonnet-4".into()), + subagent_id: None, + conversation_id: None, + generation_id: None, + request_id: Some("msg-request".into()), + request: LlmRequest { + headers: Map::new(), + content: json!({ + "model": "claude-sonnet-4", + "messages": [{"role": "user", "content": "Find the file."}], + "tools": [{"name": "search", "input_schema": {"type": "object"}}] + }), + }, + streaming: false, + metadata: hermes_routed_gateway_metadata("/v1/messages", test_session_marker), + }, + ) + .await + .unwrap(); + manager + .end_llm( + anthropic, + json!({ + "id": "msg_01", + "type": "message", + "content": [ + {"type": "text", "text": "I will search."}, + {"type": "tool_use", "id": "toolu_01", "name": "search", "input": {"query": "file"}} + ], + "usage": { + "input_tokens": 11, + "output_tokens": 7, + "cache_read_input_tokens": 3, + "cost": {"total": 0.0042} + } + }), + json!({}), + ) + .await + .unwrap(); + + let responses = manager + .start_llm( + headers, + LlmGatewayStart { + session_id: Some(session_id.into()), + provider: "openai.responses".into(), + model_name: Some("gpt-4o".into()), + subagent_id: None, + conversation_id: None, + generation_id: None, + request_id: Some("resp-request".into()), + request: LlmRequest { + headers: Map::new(), + content: json!({ + "model": "gpt-4o", + "input": "Find the weather.", + "tools": [{"type": "function", "name": "get_weather"}] + }), + }, + streaming: false, + metadata: hermes_routed_gateway_metadata("/v1/responses", test_session_marker), + }, + ) + .await + .unwrap(); + manager + .end_llm( + responses, + json!({ + "id": "resp_1", + "output": [ + {"type": "message", "content": [{"type": "output_text", "text": "I will check 
the weather."}]}, + {"type": "function_call", "call_id": "call_weather_1", "name": "get_weather", "arguments": "{\"city\":\"SF\"}"} + ], + "usage": { + "input_tokens": 75, + "output_tokens": 20, + "total_tokens": 95, + "input_tokens_details": {"cached_tokens": 10}, + "cost_usd": 0.005 + } + }), + json!({}), + ) + .await + .unwrap(); + + let chat = manager + .start_llm( + headers, + LlmGatewayStart { + session_id: Some(session_id.into()), + provider: "openai.chat_completions".into(), + model_name: Some("gpt-4o".into()), + subagent_id: None, + conversation_id: None, + generation_id: None, + request_id: Some("chat-request".into()), + request: LlmRequest { + headers: Map::new(), + content: json!({ + "model": "gpt-4o", + "messages": [{"role": "user", "content": "Inspect the files."}], + "tools": [{"type": "function", "function": {"name": "read"}}] + }), + }, + streaming: false, + metadata: hermes_routed_gateway_metadata( + "/v1/chat/completions", + test_session_marker, + ), + }, + ) + .await + .unwrap(); + manager + .end_llm( + chat, + json!({ + "choices": [{ + "message": { + "role": "assistant", + "content": "I will inspect.", + "tool_calls": [{"id": "call_read_1", "function": {"name": "read", "arguments": "{\"path\":\"api.py\"}"}}] + } + }], + "usage": { + "prompt_tokens": 3, + "completion_tokens": 4, + "total_tokens": 7, + "prompt_tokens_details": {"cached_tokens": 2}, + "cost_usd": 0.001 + } + }), + json!({}), + ) + .await + .unwrap(); + + manager + .apply_events( + headers, + vec![NormalizedEvent::AgentEnded(SessionEvent { + session_id: session_id.into(), + agent_kind: AgentKind::Hermes, + event_name: "on_session_finalize".into(), + payload: json!({}), + metadata: json!({}), + })], + ) + .await + .unwrap(); +} + #[tokio::test] async fn nests_agent_subagent_and_tool_lifecycle() { let config = GatewayConfig { @@ -2316,177 +2536,7 @@ async fn hermes_routed_provider_payloads_write_exact_atif_trajectory() { install_test_atif_plugin(&atif_dir).await; let manager = 
SessionManager::new(session_test_config()); let headers = HeaderMap::new(); - - manager - .apply_events( - &headers, - vec![NormalizedEvent::AgentStarted(SessionEvent { - session_id: "hermes-routed".into(), - agent_kind: AgentKind::Hermes, - event_name: "on_session_start".into(), - payload: json!({}), - metadata: json!({}), - })], - ) - .await - .unwrap(); - - let anthropic = manager - .start_llm( - &headers, - LlmGatewayStart { - session_id: Some("hermes-routed".into()), - provider: "anthropic.messages".into(), - model_name: Some("claude-sonnet-4".into()), - subagent_id: None, - conversation_id: None, - generation_id: None, - request_id: Some("msg-request".into()), - request: LlmRequest { - headers: Map::new(), - content: json!({ - "model": "claude-sonnet-4", - "messages": [{"role": "user", "content": "Find the file."}], - "tools": [{"name": "search", "input_schema": {"type": "object"}}] - }), - }, - streaming: false, - metadata: json!({ "gateway_path": "/v1/messages" }), - }, - ) - .await - .unwrap(); - manager - .end_llm( - anthropic, - json!({ - "id": "msg_01", - "type": "message", - "content": [ - {"type": "text", "text": "I will search."}, - {"type": "tool_use", "id": "toolu_01", "name": "search", "input": {"query": "file"}} - ], - "usage": { - "input_tokens": 11, - "output_tokens": 7, - "cache_read_input_tokens": 3, - "cost": {"total": 0.0042} - } - }), - json!({}), - ) - .await - .unwrap(); - - let responses = manager - .start_llm( - &headers, - LlmGatewayStart { - session_id: Some("hermes-routed".into()), - provider: "openai.responses".into(), - model_name: Some("gpt-4o".into()), - subagent_id: None, - conversation_id: None, - generation_id: None, - request_id: Some("resp-request".into()), - request: LlmRequest { - headers: Map::new(), - content: json!({ - "model": "gpt-4o", - "input": "Find the weather.", - "tools": [{"type": "function", "name": "get_weather"}] - }), - }, - streaming: false, - metadata: json!({ "gateway_path": "/v1/responses" }), - }, - ) 
- .await - .unwrap(); - manager - .end_llm( - responses, - json!({ - "id": "resp_1", - "output": [ - {"type": "message", "content": [{"type": "output_text", "text": "I will check the weather."}]}, - {"type": "function_call", "call_id": "call_weather_1", "name": "get_weather", "arguments": "{\"city\":\"SF\"}"} - ], - "usage": { - "input_tokens": 75, - "output_tokens": 20, - "total_tokens": 95, - "input_tokens_details": {"cached_tokens": 10}, - "cost_usd": 0.005 - } - }), - json!({}), - ) - .await - .unwrap(); - - let chat = manager - .start_llm( - &headers, - LlmGatewayStart { - session_id: Some("hermes-routed".into()), - provider: "openai.chat_completions".into(), - model_name: Some("gpt-4o".into()), - subagent_id: None, - conversation_id: None, - generation_id: None, - request_id: Some("chat-request".into()), - request: LlmRequest { - headers: Map::new(), - content: json!({ - "model": "gpt-4o", - "messages": [{"role": "user", "content": "Inspect the files."}], - "tools": [{"type": "function", "function": {"name": "read"}}] - }), - }, - streaming: false, - metadata: json!({ "gateway_path": "/v1/chat/completions" }), - }, - ) - .await - .unwrap(); - manager - .end_llm( - chat, - json!({ - "choices": [{ - "message": { - "role": "assistant", - "content": "I will inspect.", - "tool_calls": [{"id": "call_read_1", "function": {"name": "read", "arguments": "{\"path\":\"api.py\"}"}}] - } - }], - "usage": { - "prompt_tokens": 3, - "completion_tokens": 4, - "total_tokens": 7, - "prompt_tokens_details": {"cached_tokens": 2}, - "cost_usd": 0.001 - } - }), - json!({}), - ) - .await - .unwrap(); - - manager - .apply_events( - &headers, - vec![NormalizedEvent::AgentEnded(SessionEvent { - session_id: "hermes-routed".into(), - agent_kind: AgentKind::Hermes, - event_name: "on_session_finalize".into(), - payload: json!({}), - metadata: json!({}), - })], - ) - .await - .unwrap(); + drive_hermes_routed_provider_session(&manager, &headers, "hermes-routed", None).await; 
clear_plugin_configuration().unwrap(); let atif = read_atif_for_session(&atif_dir, "hermes-routed"); @@ -2526,6 +2576,131 @@ async fn hermes_routed_provider_payloads_write_exact_atif_trajectory() { assert_eq!(atif["final_metrics"]["total_cost_usd"], json!(0.0102)); } +#[tokio::test] +async fn hermes_routed_provider_payloads_emit_openinference_text_usage_and_cost() { + let _guard = OBSERVABILITY_PLUGIN_TEST_LOCK.lock().await; + let subscriber_name = "cli-hermes-routed-openinference-test"; + let session_id = "hermes-routed-openinference"; + let _ = deregister_subscriber(subscriber_name); + let (subscriber, exporter) = make_openinference_test_subscriber("session-test-scope"); + let openinference_subscriber = subscriber.subscriber(); + register_subscriber( + subscriber_name, + Arc::new(move |event| { + // Manual test-path LLM events do not carry the owning session id in metadata, + // so the routed helper tags them with a stable test marker for subscriber isolation. + if event + .metadata() + .and_then(|metadata| metadata.get(HERMES_ROUTED_TEST_SESSION_KEY)) + .and_then(Value::as_str) + == Some(session_id) + { + openinference_subscriber(event); + } + }), + ) + .unwrap(); + + let manager = SessionManager::new(session_test_config()); + let headers = HeaderMap::new(); + drive_hermes_routed_provider_session(&manager, &headers, session_id, Some(session_id)).await; + + subscriber.force_flush().unwrap(); + assert!(deregister_subscriber(subscriber_name).unwrap()); + + let spans = exporter.get_finished_spans().unwrap(); + let llm_spans: Vec<HashMap<String, String>> = spans + .iter() + .map(|span| attr_map(&span.attributes)) + .filter(|attributes| { + attributes + .get("openinference.span.kind") + .map(String::as_str) + == Some("LLM") + }) + .collect(); + assert_eq!(llm_spans.len(), 3); + + let anthropic = llm_spans + .iter() + .find(|attributes| { + attributes.get("output.value") + == Some(&"I will search.\nRequested tools: search".to_string()) + }) + .expect("expected Hermes-routed Anthropic 
OpenInference span"); + assert_eq!( + anthropic.get("llm.model_name"), + Some(&"claude-sonnet-4".to_string()) + ); + assert_eq!( + anthropic.get("input.value"), + Some(&"user: Find the file.".to_string()) + ); + assert_eq!( + anthropic.get("llm.token_count.prompt"), + Some(&"11".to_string()) + ); + assert_eq!( + anthropic.get("llm.token_count.completion"), + Some(&"7".to_string()) + ); + assert_eq!( + anthropic.get("llm.token_count.prompt_details.cache_read"), + Some(&"3".to_string()) + ); + assert_eq!(anthropic.get("llm.cost.total"), Some(&"0.0042".to_string())); + + let responses = llm_spans + .iter() + .find(|attributes| { + attributes.get("output.value") + == Some(&"I will check the weather.\nRequested tools: get_weather".to_string()) + }) + .expect("expected Hermes-routed Responses OpenInference span"); + assert_eq!(responses.get("llm.model_name"), Some(&"gpt-4o".to_string())); + assert_eq!( + responses.get("llm.token_count.prompt"), + Some(&"75".to_string()) + ); + assert_eq!( + responses.get("llm.token_count.completion"), + Some(&"20".to_string()) + ); + assert_eq!( + responses.get("llm.token_count.total"), + Some(&"95".to_string()) + ); + assert_eq!( + responses.get("llm.token_count.prompt_details.cache_read"), + Some(&"10".to_string()) + ); + assert_eq!(responses.get("llm.cost.total"), Some(&"0.005".to_string())); + + let chat = llm_spans + .iter() + .find(|attributes| { + attributes.get("output.value") + == Some(&"I will inspect.\nRequested tools: read".to_string()) + }) + .expect("expected Hermes-routed chat completions OpenInference span"); + assert_eq!(chat.get("llm.model_name"), Some(&"gpt-4o".to_string())); + assert_eq!( + chat.get("input.value"), + Some(&"user: Inspect the files.".to_string()) + ); + assert_eq!(chat.get("llm.token_count.prompt"), Some(&"3".to_string())); + assert_eq!( + chat.get("llm.token_count.completion"), + Some(&"4".to_string()) + ); + assert_eq!(chat.get("llm.token_count.total"), Some(&"7".to_string())); + assert_eq!( + 
chat.get("llm.token_count.prompt_details.cache_read"), + Some(&"2".to_string()) + ); + assert_eq!(chat.get("llm.cost.total"), Some(&"0.001".to_string())); +} + #[tokio::test] async fn empty_hook_marks_do_not_create_empty_atif_steps() { let _guard = OBSERVABILITY_PLUGIN_TEST_LOCK.lock().await; From 77e4ba2128ab8bc6db1c94071482e59a8fd5a8d6 Mon Sep 17 00:00:00 2001 From: mnajafian-nv Date: Fri, 5 Jun 2026 16:45:54 -0700 Subject: [PATCH 2/2] build: centralize OpenTelemetry workspace dependency versions Signed-off-by: mnajafian-nv --- Cargo.toml | 2 ++ crates/cli/Cargo.toml | 4 ++-- crates/core/Cargo.toml | 6 +++--- 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index bf54af16..8c3cdfb5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,6 +26,8 @@ nemo-relay = { version = "0.4.0", path = "crates/core", default-features = false nemo-relay-adaptive = { version = "0.4.0", path = "crates/adaptive" } nemo-relay-ffi = { version = "0.4.0", path = "crates/ffi" } nemo-relay-cli = { version = "0.4.0", path = "crates/cli" } +opentelemetry = { version = "0.31", default-features = false } +opentelemetry_sdk = { version = "0.31", default-features = false } uuid = "=1.18.1" [workspace.lints.rust] diff --git a/crates/cli/Cargo.toml b/crates/cli/Cargo.toml index 34d64701..16e13b15 100644 --- a/crates/cli/Cargo.toml +++ b/crates/cli/Cargo.toml @@ -50,7 +50,7 @@ toml_edit = "0.23" uuid = { workspace = true, features = ["serde", "v7"] } [dev-dependencies] -opentelemetry = { version = "0.31", default-features = false, features = ["trace"] } -opentelemetry_sdk = { version = "0.31", default-features = false, features = ["trace", "testing"] } +opentelemetry = { workspace = true, features = ["trace"] } +opentelemetry_sdk = { workspace = true, features = ["trace", "testing"] } tempfile = "3" tower = { version = "0.5", features = ["util"] } diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 72139869..fb56e9f7 100644 --- a/crates/core/Cargo.toml 
+++ b/crates/core/Cargo.toml @@ -86,8 +86,8 @@ tokio = { version = "1", default-features = false, features = ["rt", "macros", " tokio-stream = { version = "0.1", default-features = false, features = ["sync"] } typed-builder = "0.23.2" futures-util = { version = "0.3", optional = true } -opentelemetry = { version = "0.31", default-features = false, features = ["trace"], optional = true } -opentelemetry_sdk = { version = "0.31", default-features = false, features = ["trace"], optional = true } +opentelemetry = { workspace = true, features = ["trace"], optional = true } +opentelemetry_sdk = { workspace = true, features = ["trace"], optional = true } openinference-semantic-conventions = { version = "0.1.1", optional = true } async-trait = { version = "0.1", optional = true } getrandom = { version = "0.3.4", features = ["wasm_js"], optional = true } @@ -100,7 +100,7 @@ web-sys = { version = "0.3", features = ["Headers", "Request", "RequestInit", "R [dev-dependencies] tokio = { version = "1", features = ["rt", "macros", "sync", "test-util", "rt-multi-thread", "time"] } futures = "0.3" -opentelemetry_sdk = { version = "0.31", default-features = false, features = ["trace", "testing"] } +opentelemetry_sdk = { workspace = true, features = ["trace", "testing"] } serde_json = "1" object_store = { version = "0.13", default-features = false, features = ["aws"] }