From b47d8c2794f47be43c9bf6379c77e2a1f46516be Mon Sep 17 00:00:00 2001 From: Yuchen Zhang Date: Fri, 5 Jun 2026 15:37:15 -0700 Subject: [PATCH 1/4] test(cli): ensure Codex ATOF output consistency RELAY-206: Ensure Codex ATOF output consistency Add Codex fixtures and assertions so raw ATOF output matches the shared contract. Covers Codex Stop turn closure, tool scope pairing, UUID/parent UUID linkage, and ATOF 0.1 event metadata. Signed-off-by: Yuchen Zhang --- crates/cli/tests/coverage/server_tests.rs | 156 ++++++++++++++++++++++ 1 file changed, 156 insertions(+) diff --git a/crates/cli/tests/coverage/server_tests.rs b/crates/cli/tests/coverage/server_tests.rs index 12caff1f..c65fa376 100644 --- a/crates/cli/tests/coverage/server_tests.rs +++ b/crates/cli/tests/coverage/server_tests.rs @@ -105,6 +105,28 @@ fn test_config() -> GatewayConfig { } } +fn scope_event<'a>( + events: &'a [Value], + name: &str, + category: &str, + scope_category: &str, +) -> &'a Value { + events + .iter() + .find(|event| { + event["kind"] == "scope" + && event["name"] == name + && event["category"] == category + && event["scope_category"] == scope_category + }) + .unwrap_or_else(|| { + panic!( + "expected {scope_category} {category} scope named {name}, got: {}", + serde_json::to_string_pretty(events).unwrap() + ) + }) +} + #[tokio::test] async fn codex_hook_keeps_codex_response_shape() { let app = router(test_config()); @@ -319,6 +341,140 @@ async fn serve_listener_observability_plugin_records_non_hermes_hooks() { assert!(!agent_starts.contains(&"claude-code".to_string())); } +#[tokio::test] +async fn serve_listener_records_codex_stop_atof_contract() { + let _guard = PLUGIN_TEST_LOCK.lock().await; + let _ = nemo_relay::plugin::clear_plugin_configuration(); + + let temp = tempfile::tempdir().unwrap(); + let atof_dir = temp.path().join("atof"); + std::fs::create_dir_all(&atof_dir).unwrap(); + let mut config = test_config(); + config.plugin_config = Some(json!({ + "version": 1, + "components": [ + { + "kind": "observability", + "enabled": true, + "config": { + "version": 1, + "atof": { + "enabled": true, + "output_directory": atof_dir, + "filename": "events.jsonl", + "mode": "overwrite" + } + } + } + ] + })); + + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let address = listener.local_addr().unwrap(); + let url = format!("http://{address}"); + let (shutdown_tx, shutdown_rx) = oneshot::channel(); + let handle = + tokio::spawn(async move { serve_listener(listener, config, Some(shutdown_rx)).await }); + + wait_for_gateway(&url).await; + let client = test_http_client(); + for payload in [ + json!({ + "session_id": "codex-atof-session", + "hook_event_name": "sessionStart", + "cwd": "/workspace", + "model": "gpt-5.1-codex" + }), + json!({ + "session_id": "codex-atof-session", + "hook_event_name": "UserPromptSubmit", + "prompt": "Inspect the repository." + }), + json!({ + "session_id": "codex-atof-session", + "hook_event_name": "PreToolUse", + "tool_call_id": "tool-call-1", + "tool_name": "Read", + "tool_input": { "file_path": "README.md" } + }), + json!({ + "session_id": "codex-atof-session", + "hook_event_name": "PostToolUse", + "tool_call_id": "tool-call-1", + "tool_name": "Read", + "tool_output": { "bytes": 42 }, + "status": "success" + }), + json!({ + "session_id": "codex-atof-session", + "hook_event_name": "Stop", + "response": "Done." + }), + ] { + let response = client + .post(format!("{url}/hooks/codex")) + .json(&payload) + .send() + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::OK); + assert_eq!(response.json::().await.unwrap(), json!({})); + } + + shutdown_tx.send(()).unwrap(); + handle.await.unwrap().unwrap(); + assert!(nemo_relay::plugin::active_plugin_report().is_none()); + + let events = std::fs::read_to_string(temp.path().join("atof/events.jsonl")).unwrap(); + let events = events + .lines() + .map(|line| serde_json::from_str::(line).unwrap()) + .collect::>(); + + assert_eq!(events.len(), 4, "unexpected ATOF events: {events:?}"); + assert!(events.iter().all(|event| event["atof_version"] == "0.1")); + assert!(!events.iter().any(|event| { + event["kind"] == "scope" + && event["scope_category"] == "start" + && event["category"] == "agent" + && event["name"] == "codex" + })); + + let turn_start = scope_event(&events, "codex-turn", "agent", "start"); + let turn_end = scope_event(&events, "codex-turn", "agent", "end"); + assert_eq!(turn_start["uuid"], turn_end["uuid"]); + assert_eq!( + turn_start["data"], + json!({ + "session_id": "codex-atof-session", + "hook_event_name": "UserPromptSubmit", + "prompt": "Inspect the repository." + }) + ); + assert_eq!(turn_start["metadata"]["session_id"], "codex-atof-session"); + assert_eq!(turn_start["metadata"]["agent_kind"], "codex"); + assert_eq!(turn_start["metadata"]["nemo_relay_scope_role"], "turn"); + assert_eq!(turn_start["metadata"]["turn_source"], "user_prompt"); + assert_eq!(turn_end["data"]["hook_event_name"], "Stop"); + assert_eq!(turn_end["data"]["response"], "Done."); + + let tool_start = scope_event(&events, "Read", "tool", "start"); + let tool_end = scope_event(&events, "Read", "tool", "end"); + assert_eq!(tool_start["uuid"], tool_end["uuid"]); + assert_eq!(tool_start["parent_uuid"], turn_start["uuid"]); + assert_eq!(tool_end["parent_uuid"], turn_start["uuid"]); + assert_eq!( + tool_start["category_profile"]["tool_call_id"], + "tool-call-1" + ); + assert_eq!(tool_end["category_profile"]["tool_call_id"], "tool-call-1"); + assert_eq!(tool_start["data"], json!({ "file_path": "README.md" })); + assert_eq!(tool_end["data"], json!({ "bytes": 42 })); + assert_eq!(tool_start["metadata"]["agent_kind"], "codex"); + assert_eq!(tool_end["metadata"]["agent_kind"], "codex"); + assert_eq!(tool_end["metadata"]["status"], "success"); +} + #[tokio::test] async fn serve_listener_activates_any_registered_plugin_kind() { let _guard = PLUGIN_TEST_LOCK.lock().await; From 9f21eaae2d0a34a6bf7dbc939f92cc682da98341 Mon Sep 17 00:00:00 2001 From: Yuchen Zhang Date: Fri, 5 Jun 2026 15:38:14 -0700 Subject: [PATCH 2/4] test(cli): ensure Codex ATIF output consistency RELAY-207: Ensure Codex ATIF output consistency Add Codex fixtures and assertions so ATIF trajectory output matches the shared contract, including per-turn snapshot behavior. Covers Stop-triggered Codex turn snapshots without requiring a sessionEnd hook. Signed-off-by: Yuchen Zhang --- crates/cli/tests/coverage/session_tests.rs | 181 +++++++++++++++++++++ 1 file changed, 181 insertions(+) diff --git a/crates/cli/tests/coverage/session_tests.rs b/crates/cli/tests/coverage/session_tests.rs index c848dd96..13664c10 100644 --- a/crates/cli/tests/coverage/session_tests.rs +++ b/crates/cli/tests/coverage/session_tests.rs @@ -38,6 +38,95 @@ async fn install_test_atif_plugin(output_directory: &Path) { initialize_plugins(config).await.unwrap(); } +async fn apply_codex_payload(manager: &SessionManager, headers: &HeaderMap, payload: Value) { + let outcome = crate::adapters::codex::adapt(payload, headers); + manager.apply_events(headers, outcome.events).await.unwrap(); +} + +async fn start_codex_prompt_turn(manager: &SessionManager, headers: &HeaderMap, session_id: &str) { + for payload in [ + json!({ + "session_id": session_id, + "hook_event_name": "sessionStart", + "model": "gpt-test" + }), + json!({ + "session_id": session_id, + "hook_event_name": "UserPromptSubmit", + "prompt": "Inspect the repository." + }), + ] { + apply_codex_payload(manager, headers, payload).await; + } +} + +async fn run_codex_responses_tool_activity( + manager: &SessionManager, + headers: &HeaderMap, + session_id: &str, +) { + let llm = manager + .start_llm( + headers, + llm_start_with_responses_task(session_id, "Inspect the repository."), + ) + .await + .unwrap(); + manager + .end_llm( + llm, + json!({ + "id": "resp_1", + "status": "completed", + "output": [ + { + "type": "function_call", + "call_id": "tool-call-1", + "name": "Read", + "arguments": "{\"file_path\":\"README.md\"}", + "status": "completed" + } + ] + }), + json!({}), + ) + .await + .unwrap(); + + for payload in [ + json!({ + "session_id": session_id, + "hook_event_name": "PreToolUse", + "tool_call_id": "tool-call-1", + "tool_name": "Read", + "tool_input": { "file_path": "README.md" } + }), + json!({ + "session_id": session_id, + "hook_event_name": "PostToolUse", + "tool_call_id": "tool-call-1", + "tool_name": "Read", + "tool_output": { "content": "hello" }, + "status": "success" + }), + ] { + apply_codex_payload(manager, headers, payload).await; + } +} + +async fn stop_codex_turn(manager: &SessionManager, headers: &HeaderMap, session_id: &str) { + apply_codex_payload( + manager, + headers, + json!({ + "session_id": session_id, + "hook_event_name": "Stop", + "response": "Done." + }), + ) + .await; +} + fn read_atif_for_session(output_directory: &Path, session_id: &str) -> Value { flush_subscribers().unwrap(); std::fs::read_dir(output_directory) @@ -1290,6 +1379,98 @@ async fn writes_atif_on_session_end_from_plugin_config() { ); } +#[tokio::test] +async fn codex_stop_snapshots_atif_without_session_end() { + let _guard = OBSERVABILITY_PLUGIN_TEST_LOCK.lock().await; + let temp = tempfile::tempdir().unwrap(); + let atif_dir = temp.path().join("atif"); + install_test_atif_plugin(&atif_dir).await; + let manager = SessionManager::new(session_test_config()); + let headers = HeaderMap::new(); + + start_codex_prompt_turn(&manager, &headers, "codex-atif-stop").await; + run_codex_responses_tool_activity(&manager, &headers, "codex-atif-stop").await; + assert!( + std::fs::read_dir(&atif_dir).unwrap().next().is_none(), + "Codex ATIF should wait for Stop before writing a per-turn snapshot" + ); + + stop_codex_turn(&manager, &headers, "codex-atif-stop").await; + + clear_plugin_configuration().unwrap(); + let atif = read_atif_for_session(&atif_dir, "codex-atif-stop"); + assert_eq!(atif["schema_version"], json!("ATIF-v1.7")); + assert_eq!(atif["trajectory_id"], atif["session_id"]); + assert!(atif["subagent_trajectories"].is_null()); + assert_eq!(atif["final_metrics"]["total_steps"], json!(2)); + + let observed = atif["extra"]["observed_events"].as_array().unwrap(); + assert_eq!(observed.len(), 6); + assert!(observed.iter().all(|event| { + event["metadata"]["hook_event_name"] != json!("sessionEnd") + && event["metadata"]["hook_event_name"] != json!("session_end") + })); + let turn_start = observed + .iter() + .find(|event| { + event["name"] == "codex-turn" + && event["category"] == "agent" + && event["scope_category"] == "start" + }) + .expect("Codex turn start should be observed"); + let turn_end = observed + .iter() + .find(|event| { + event["name"] == "codex-turn" + && event["category"] == "agent" + && event["scope_category"] == "end" + }) + .expect("Codex Stop should close the turn scope"); + assert_eq!(turn_start["uuid"], atif["session_id"]); + assert_eq!(turn_end["uuid"], atif["session_id"]); + assert_eq!( + turn_end["data"]["output"][0]["call_id"], + json!("tool-call-1") + ); + + let steps = atif["steps"].as_array().unwrap(); + assert_eq!(steps.len(), 2); + assert_eq!(steps[0]["source"], json!("user")); + assert_eq!(steps[0]["message"], json!("Inspect the repository.")); + assert_eq!( + steps[0]["extra"]["ancestry"]["parent_name"], + json!("codex-turn") + ); + assert_eq!(steps[1]["source"], json!("agent")); + assert_eq!(steps[1]["model_name"], json!("gpt-test")); + assert_eq!(steps[1]["llm_call_count"], json!(1)); + assert_eq!( + steps[1]["tool_calls"][0], + json!({ + "tool_call_id": "tool-call-1", + "function_name": "Read", + "arguments": { "file_path": "README.md" }, + "extra": { "status": "completed" } + }) + ); + assert_eq!( + steps[1]["observation"]["results"][0]["source_call_id"], + json!("tool-call-1") + ); + assert_eq!( + steps[1]["observation"]["results"][0]["content"], + json!({ "content": "hello" }) + ); + assert_eq!( + steps[1]["extra"]["tool_ancestry"][0]["parent_name"], + json!("codex-turn") + ); + assert_eq!( + steps[1]["extra"]["tool_invocations"][0]["invocation_id"], + json!("tool-call-1") + ); +} + #[tokio::test] async fn duplicate_agent_end_does_not_overwrite_atif_with_empty_session() { // Regression test: hermes-agent and other integrations can emit terminal hooks more than once From bc366940700992d42ddd46b3d353a7880cbf971a Mon Sep 17 00:00:00 2001 From: Yuchen Zhang Date: Fri, 5 Jun 2026 15:39:04 -0700 Subject: [PATCH 3/4] test(cli): ensure Codex OpenInference output consistency RELAY-205: Ensure Codex OpenInference output consistency Add Codex fixtures and assertions so OpenInference spans match the shared contract. Covers AGENT, LLM, and TOOL span export fields, UUID/parent UUID linkage, tool-call attributes, and Stop-based Codex turn closure. Signed-off-by: Yuchen Zhang --- crates/cli/tests/coverage/session_tests.rs | 136 +++++++++++++++++++++ 1 file changed, 136 insertions(+) diff --git a/crates/cli/tests/coverage/session_tests.rs b/crates/cli/tests/coverage/session_tests.rs index 13664c10..5a030732 100644 --- a/crates/cli/tests/coverage/session_tests.rs +++ b/crates/cli/tests/coverage/session_tests.rs @@ -6,8 +6,12 @@ use nemo_relay::api::event::ScopeCategory; use nemo_relay::api::subscriber::{deregister_subscriber, flush_subscribers, register_subscriber}; use nemo_relay::plugin::{PluginConfig, clear_plugin_configuration, initialize_plugins}; use serde_json::json; +use std::io::{Read, Write}; +use std::net::TcpListener as StdTcpListener; use std::path::Path; use std::sync::{Arc, Mutex as StdMutex}; +use std::thread; +use std::time::{Duration, Instant}; use super::*; use crate::model::{LlmEvent, LlmHintEvent, SessionEvent, ToolEvent}; @@ -38,6 +42,96 @@ async fn install_test_atif_plugin(output_directory: &Path) { initialize_plugins(config).await.unwrap(); } +async fn install_test_openinference_plugin(endpoint: &str) { + let _ = clear_plugin_configuration(); + let config: PluginConfig = serde_json::from_value(json!({ + "version": 1, + "components": [ + { + "kind": "observability", + "enabled": true, + "config": { + "version": 1, + "openinference": { + "enabled": true, + "transport": "http_binary", + "endpoint": endpoint, + "service_name": "codex-openinference-test", + "timeout_millis": 1000 + } + } + } + ] + })) + .unwrap(); + initialize_plugins(config).await.unwrap(); +} + +fn start_otlp_collector() -> (String, thread::JoinHandle<(String, Vec)>) { + let listener = StdTcpListener::bind("127.0.0.1:0").unwrap(); + let endpoint = format!("http://{}/v1/traces", listener.local_addr().unwrap()); + let handle = thread::spawn(move || { + listener.set_nonblocking(true).unwrap(); + let deadline = Instant::now() + Duration::from_secs(5); + let (mut stream, _) = loop { + match listener.accept() { + Ok(connection) => break connection, + Err(error) + if error.kind() == std::io::ErrorKind::WouldBlock + && Instant::now() < deadline => + { + thread::sleep(Duration::from_millis(10)); + } + Err(error) if error.kind() == std::io::ErrorKind::WouldBlock => { + panic!("collector should receive an OTLP request") + } + Err(error) => panic!("collector failed to accept OTLP request: {error}"), + } + }; + stream + .set_read_timeout(Some(Duration::from_secs(5))) + .unwrap(); + let mut buffer = Vec::new(); + let mut content_length = None; + let mut header_end = None; + loop { + let mut chunk = [0_u8; 4096]; + let read = stream.read(&mut chunk).unwrap(); + if read == 0 { + break; + } + buffer.extend_from_slice(&chunk[..read]); + if header_end.is_none() + && let Some(position) = buffer.windows(4).position(|window| window == b"\r\n\r\n") + { + header_end = Some(position + 4); + let headers = String::from_utf8_lossy(&buffer[..position]); + content_length = headers.lines().find_map(|line| { + let (name, value) = line.split_once(':')?; + name.eq_ignore_ascii_case("content-length") + .then(|| value.trim().parse::().ok()) + .flatten() + }); + } + if let (Some(header_end), Some(content_length)) = (header_end, content_length) + && buffer.len() >= header_end + content_length + { + break; + } + } + + let header_end = header_end.expect("collector should receive HTTP headers"); + let content_length = content_length.expect("collector should receive content length"); + let request = String::from_utf8_lossy(&buffer[..header_end]).to_string(); + let body = buffer[header_end..header_end + content_length].to_vec(); + stream + .write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n") + .unwrap(); + (request, body) + }); + (endpoint, handle) +} + async fn apply_codex_payload(manager: &SessionManager, headers: &HeaderMap, payload: Value) { let outcome = crate::adapters::codex::adapt(payload, headers); manager.apply_events(headers, outcome.events).await.unwrap(); @@ -1471,6 +1565,48 @@ async fn codex_stop_snapshots_atif_without_session_end() { ); } +#[tokio::test] +async fn codex_openinference_spans_match_shared_contract() { + let _guard = OBSERVABILITY_PLUGIN_TEST_LOCK.lock().await; + let (endpoint, collector) = start_otlp_collector(); + install_test_openinference_plugin(&endpoint).await; + let manager = SessionManager::new(session_test_config()); + let headers = HeaderMap::new(); + + start_codex_prompt_turn(&manager, &headers, "codex-openinference").await; + run_codex_responses_tool_activity(&manager, &headers, "codex-openinference").await; + stop_codex_turn(&manager, &headers, "codex-openinference").await; + + clear_plugin_configuration().unwrap(); + let (request, body) = collector.join().unwrap(); + assert!(request.starts_with("POST /v1/traces HTTP/1.1")); + assert!(request.contains("content-type: application/x-protobuf")); + + let body = String::from_utf8_lossy(&body); + for expected in [ + "openinference.span.kind", + "AGENT", + "LLM", + "TOOL", + "nemo_relay.uuid", + "nemo_relay.parent_uuid", + "codex-turn", + "openai.responses", + "Read", + "gpt-test", + "codex-openinference", + "tool-call-1", + "tool_call.function.arguments", + "Requested tools: Read", + ] { + assert!( + body.contains(expected), + "expected OpenInference export body to contain {expected:?}; body: {body}" + ); + } + assert!(!body.contains("sessionEnd")); +} + #[tokio::test] async fn duplicate_agent_end_does_not_overwrite_atif_with_empty_session() { // Regression test: hermes-agent and other integrations can emit terminal hooks more than once From 605ad755f8fda490edb9e626020ac6fdb81c3360 Mon Sep 17 00:00:00 2001 From: Yuchen Zhang Date: Fri, 5 Jun 2026 17:16:22 -0700 Subject: [PATCH 4/4] test(cli): address observability review feedback Signed-off-by: Yuchen Zhang --- Cargo.lock | 2 + crates/cli/Cargo.toml | 2 + crates/cli/tests/coverage/server_tests.rs | 21 ++-- crates/cli/tests/coverage/session_tests.rs | 137 +++++++++++++++++---- 4 files changed, 131 insertions(+), 31 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 14a9f718..0b4baafa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1395,6 +1395,8 @@ dependencies = [ "http-body-util", "nemo-relay", "nemo-relay-adaptive", + "opentelemetry-proto", + "prost", "reqwest", "rustls", "serde", diff --git a/crates/cli/Cargo.toml b/crates/cli/Cargo.toml index f500c03a..e5519960 100644 --- a/crates/cli/Cargo.toml +++ b/crates/cli/Cargo.toml @@ -50,5 +50,7 @@ toml_edit = "0.23" uuid = { workspace = true, features = ["serde", "v7"] } [dev-dependencies] +opentelemetry-proto = { version = "0.31", default-features = false, features = ["gen-tonic-messages", "trace"] } +prost = "0.14" tempfile = "3" tower = { version = "0.5", features = ["util"] } diff --git a/crates/cli/tests/coverage/server_tests.rs b/crates/cli/tests/coverage/server_tests.rs index c51644e4..49452dbb 100644 --- a/crates/cli/tests/coverage/server_tests.rs +++ b/crates/cli/tests/coverage/server_tests.rs @@ -43,6 +43,14 @@ impl Drop for ToolGuardrailCleanup { } } +struct SubscriberCleanup(&'static str); + +impl Drop for SubscriberCleanup { + fn drop(&mut self) { + let _ = deregister_subscriber(self.0); + } +} + fn test_http_client() -> reqwest::Client { crate::tls::install_rustls_crypto_provider(); reqwest::Client::new() @@ -107,7 +115,7 @@ fn test_config() -> GatewayConfig { } } -fn scope_event<'a>( +fn find_scope_event<'a>( events: &'a [Value], name: &str, category: &str, @@ -1187,7 +1195,6 @@ async fn serve_listener_records_codex_stop_atof_contract() { .map(|line| serde_json::from_str::(line).unwrap()) .collect::>(); - assert_eq!(events.len(), 4, "unexpected ATOF events: {events:?}"); assert!(events.iter().all(|event| event["atof_version"] == "0.1")); assert!(!events.iter().any(|event| { event["kind"] == "scope" @@ -1196,8 +1203,8 @@ async fn serve_listener_records_codex_stop_atof_contract() { && event["name"] == "codex" })); - let turn_start = scope_event(&events, "codex-turn", "agent", "start"); - let turn_end = scope_event(&events, "codex-turn", "agent", "end"); + let turn_start = find_scope_event(&events, "codex-turn", "agent", "start"); + let turn_end = find_scope_event(&events, "codex-turn", "agent", "end"); assert_eq!(turn_start["uuid"], turn_end["uuid"]); assert_eq!( turn_start["data"], @@ -1214,8 +1221,8 @@ async fn serve_listener_records_codex_stop_atof_contract() { assert_eq!(turn_end["data"]["hook_event_name"], "Stop"); assert_eq!(turn_end["data"]["response"], "Done."); - let tool_start = scope_event(&events, "Read", "tool", "start"); - let tool_end = scope_event(&events, "Read", "tool", "end"); + let tool_start = find_scope_event(&events, "Read", "tool", "start"); + let tool_end = find_scope_event(&events, "Read", "tool", "end"); assert_eq!(tool_start["uuid"], tool_end["uuid"]); assert_eq!(tool_start["parent_uuid"], turn_start["uuid"]); assert_eq!(tool_end["parent_uuid"], turn_start["uuid"]); @@ -1790,6 +1797,7 @@ async fn gateway_forwards_claude_startup_probe_without_llm_observability() { }), ) .unwrap(); + let _subscriber_cleanup = SubscriberCleanup(subscriber_name); let upstream = spawn_anthropic_upstream().await; let mut config = test_config(); @@ -1833,7 +1841,6 @@ async fn gateway_forwards_claude_startup_probe_without_llm_observability() { captured_llm_starts.lock().unwrap().is_empty(), "Claude startup probe must not emit a managed LLM span" ); - deregister_subscriber(subscriber_name).unwrap(); } async fn wait_for_gateway(url: &str) { diff --git a/crates/cli/tests/coverage/session_tests.rs b/crates/cli/tests/coverage/session_tests.rs index a898a87c..a49faf58 100644 --- a/crates/cli/tests/coverage/session_tests.rs +++ b/crates/cli/tests/coverage/session_tests.rs @@ -5,7 +5,12 @@ use axum::http::HeaderMap; use nemo_relay::api::event::ScopeCategory; use nemo_relay::api::subscriber::{deregister_subscriber, flush_subscribers, register_subscriber}; use nemo_relay::plugin::{PluginConfig, clear_plugin_configuration, initialize_plugins}; +use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest; +use opentelemetry_proto::tonic::common::v1::{AnyValue, KeyValue, any_value}; +use opentelemetry_proto::tonic::trace::v1::Span; +use prost::Message; use serde_json::json; +use std::collections::HashMap; use std::io::{Read, Write}; use std::net::TcpListener as StdTcpListener; use std::path::Path; @@ -132,6 +137,40 @@ fn start_otlp_collector() -> (String, thread::JoinHandle<(String, Vec)>) { (endpoint, handle) } +fn decode_otlp_spans(body: &[u8]) -> Vec { + let request = ExportTraceServiceRequest::decode(body) + .unwrap_or_else(|error| panic!("expected a valid OTLP trace request: {error}")); + request + .resource_spans + .into_iter() + .flat_map(|resource_span| resource_span.scope_spans) + .flat_map(|scope_span| scope_span.spans) + .collect() +} + +fn any_value_to_string(value: &AnyValue) -> Option { + match value.value.as_ref()? { + any_value::Value::StringValue(value) => Some(value.clone()), + any_value::Value::BoolValue(value) => Some(value.to_string()), + any_value::Value::IntValue(value) => Some(value.to_string()), + any_value::Value::DoubleValue(value) => Some(value.to_string()), + any_value::Value::BytesValue(value) => Some(String::from_utf8_lossy(value).to_string()), + any_value::Value::ArrayValue(_) | any_value::Value::KvlistValue(_) => None, + } +} + +fn otlp_attr_map(attributes: &[KeyValue]) -> HashMap<&str, String> { + attributes + .iter() + .filter_map(|attribute| { + Some(( + attribute.key.as_str(), + any_value_to_string(attribute.value.as_ref()?)?, + )) + }) + .collect() +} + async fn apply_codex_payload(manager: &SessionManager, headers: &HeaderMap, payload: Value) { let outcome = crate::adapters::codex::adapt(payload, headers); manager.apply_events(headers, outcome.events).await.unwrap(); @@ -1499,7 +1538,6 @@ async fn codex_stop_snapshots_atif_without_session_end() { assert_eq!(atif["final_metrics"]["total_steps"], json!(2)); let observed = atif["extra"]["observed_events"].as_array().unwrap(); - assert_eq!(observed.len(), 6); assert!(observed.iter().all(|event| { event["metadata"]["hook_event_name"] != json!("sessionEnd") && event["metadata"]["hook_event_name"] != json!("session_end") @@ -1586,29 +1624,80 @@ async fn codex_openinference_spans_match_shared_contract() { assert!(request.starts_with("POST /v1/traces HTTP/1.1")); assert!(request.contains("content-type: application/x-protobuf")); - let body = String::from_utf8_lossy(&body); - for expected in [ - "openinference.span.kind", - "AGENT", - "LLM", - "TOOL", - "nemo_relay.uuid", - "nemo_relay.parent_uuid", - "codex-turn", - "openai.responses", - "Read", - "gpt-test", - "codex-openinference", - "tool-call-1", - "tool_call.function.arguments", - "Requested tools: Read", - ] { - assert!( - body.contains(expected), - "expected OpenInference export body to contain {expected:?}; body: {body}" - ); - } - assert!(!body.contains("sessionEnd")); + let spans = decode_otlp_spans(&body); + let attributes_by_span = spans + .iter() + .map(|span| (span.name.as_str(), otlp_attr_map(&span.attributes))) + .collect::>(); + let turn_attributes = attributes_by_span + .get("codex-turn") + .expect("Codex turn should export an OpenInference span"); + let llm_attributes = attributes_by_span + .get("openai.responses") + .expect("Codex LLM call should export an OpenInference span"); + let tool_attributes = attributes_by_span + .get("Read") + .expect("Codex tool call should export an OpenInference span"); + + assert_eq!( + turn_attributes + .get("openinference.span.kind") + .map(String::as_str), + Some("AGENT") + ); + assert_eq!( + llm_attributes + .get("openinference.span.kind") + .map(String::as_str), + Some("LLM") + ); + assert_eq!( + tool_attributes + .get("openinference.span.kind") + .map(String::as_str), + Some("TOOL") + ); + assert!(turn_attributes.contains_key("nemo_relay.uuid")); + assert!(llm_attributes.contains_key("nemo_relay.parent_uuid")); + assert!(tool_attributes.contains_key("nemo_relay.parent_uuid")); + let turn_metadata = serde_json::from_str::( + turn_attributes + .get("metadata") + .expect("turn span should include OpenInference metadata"), + ) + .unwrap(); + assert_eq!(turn_metadata["session_id"], json!("codex-openinference")); + assert_eq!( + llm_attributes.get("llm.model_name").map(String::as_str), + Some("gpt-test") + ); + assert_eq!( + tool_attributes + .get("tool_call.function.name") + .map(String::as_str), + Some("Read") + ); + assert_eq!( + tool_attributes + .get("tool_call.function.arguments") + .map(String::as_str), + Some("{\"file_path\":\"README.md\"}") + ); + assert_eq!( + tool_attributes.get("tool_call.id").map(String::as_str), + Some("tool-call-1") + ); + assert!( + llm_attributes + .values() + .any(|value| value.contains("Requested tools: Read")) + ); + assert!( + attributes_by_span + .values() + .flat_map(|attributes| attributes.values()) + .all(|value| !value.contains("sessionEnd")) + ); } #[tokio::test]