diff --git a/crates/cli/tests/coverage/server_tests.rs b/crates/cli/tests/coverage/server_tests.rs index 67df47b1..49452dbb 100644 --- a/crates/cli/tests/coverage/server_tests.rs +++ b/crates/cli/tests/coverage/server_tests.rs @@ -43,6 +43,14 @@ impl Drop for ToolGuardrailCleanup { } } +struct SubscriberCleanup(&'static str); + +impl Drop for SubscriberCleanup { + fn drop(&mut self) { + let _ = deregister_subscriber(self.0); + } +} + fn test_http_client() -> reqwest::Client { crate::tls::install_rustls_crypto_provider(); reqwest::Client::new() @@ -107,6 +115,28 @@ fn test_config() -> GatewayConfig { } } +fn find_scope_event<'a>( + events: &'a [Value], + name: &str, + category: &str, + scope_category: &str, +) -> &'a Value { + events + .iter() + .find(|event| { + event["kind"] == "scope" + && event["name"] == name + && event["category"] == category + && event["scope_category"] == scope_category + }) + .unwrap_or_else(|| { + panic!( + "expected {scope_category} {category} scope named {name}, got: {}", + serde_json::to_string_pretty(events).unwrap() + ) + }) +} + #[tokio::test] async fn codex_hook_keeps_codex_response_shape() { let app = router(test_config()); @@ -1075,6 +1105,139 @@ async fn serve_listener_routed_gateway_wire_formats_write_atof_category_profile_ assert_eq!(chat_end["data"]["usage"]["cost_usd"], json!(0.001)); } +#[tokio::test] +async fn serve_listener_records_codex_stop_atof_contract() { + let _guard = PLUGIN_TEST_LOCK.lock().await; + let _ = nemo_relay::plugin::clear_plugin_configuration(); + + let temp = tempfile::tempdir().unwrap(); + let atof_dir = temp.path().join("atof"); + std::fs::create_dir_all(&atof_dir).unwrap(); + let mut config = test_config(); + config.plugin_config = Some(json!({ + "version": 1, + "components": [ + { + "kind": "observability", + "enabled": true, + "config": { + "version": 1, + "atof": { + "enabled": true, + "output_directory": atof_dir, + "filename": "events.jsonl", + "mode": "overwrite" + } + } + } + ] + })); + + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let address = listener.local_addr().unwrap(); + let url = format!("http://{address}"); + let (shutdown_tx, shutdown_rx) = oneshot::channel(); + let handle = + tokio::spawn(async move { serve_listener(listener, config, Some(shutdown_rx)).await }); + + wait_for_gateway(&url).await; + let client = test_http_client(); + for payload in [ + json!({ + "session_id": "codex-atof-session", + "hook_event_name": "sessionStart", + "cwd": "/workspace", + "model": "gpt-5.1-codex" + }), + json!({ + "session_id": "codex-atof-session", + "hook_event_name": "UserPromptSubmit", + "prompt": "Inspect the repository." + }), + json!({ + "session_id": "codex-atof-session", + "hook_event_name": "PreToolUse", + "tool_call_id": "tool-call-1", + "tool_name": "Read", + "tool_input": { "file_path": "README.md" } + }), + json!({ + "session_id": "codex-atof-session", + "hook_event_name": "PostToolUse", + "tool_call_id": "tool-call-1", + "tool_name": "Read", + "tool_output": { "bytes": 42 }, + "status": "success" + }), + json!({ + "session_id": "codex-atof-session", + "hook_event_name": "Stop", + "response": "Done." + }), + ] { + let response = client + .post(format!("{url}/hooks/codex")) + .json(&payload) + .send() + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::OK); + assert_eq!(response.json::().await.unwrap(), json!({})); + } + + shutdown_tx.send(()).unwrap(); + handle.await.unwrap().unwrap(); + assert!(nemo_relay::plugin::active_plugin_report().is_none()); + + let events = std::fs::read_to_string(temp.path().join("atof/events.jsonl")).unwrap(); + let events = events + .lines() + .map(|line| serde_json::from_str::(line).unwrap()) + .collect::>(); + + assert!(events.iter().all(|event| event["atof_version"] == "0.1")); + assert!(!events.iter().any(|event| { + event["kind"] == "scope" + && event["scope_category"] == "start" + && event["category"] == "agent" + && event["name"] == "codex" + })); + + let turn_start = find_scope_event(&events, "codex-turn", "agent", "start"); + let turn_end = find_scope_event(&events, "codex-turn", "agent", "end"); + assert_eq!(turn_start["uuid"], turn_end["uuid"]); + assert_eq!( + turn_start["data"], + json!({ + "session_id": "codex-atof-session", + "hook_event_name": "UserPromptSubmit", + "prompt": "Inspect the repository." + }) + ); + assert_eq!(turn_start["metadata"]["session_id"], "codex-atof-session"); + assert_eq!(turn_start["metadata"]["agent_kind"], "codex"); + assert_eq!(turn_start["metadata"]["nemo_relay_scope_role"], "turn"); + assert_eq!(turn_start["metadata"]["turn_source"], "user_prompt"); + assert_eq!(turn_end["data"]["hook_event_name"], "Stop"); + assert_eq!(turn_end["data"]["response"], "Done."); + + let tool_start = find_scope_event(&events, "Read", "tool", "start"); + let tool_end = find_scope_event(&events, "Read", "tool", "end"); + assert_eq!(tool_start["uuid"], tool_end["uuid"]); + assert_eq!(tool_start["parent_uuid"], turn_start["uuid"]); + assert_eq!(tool_end["parent_uuid"], turn_start["uuid"]); + assert_eq!( + tool_start["category_profile"]["tool_call_id"], + "tool-call-1" + ); + assert_eq!(tool_end["category_profile"]["tool_call_id"], "tool-call-1"); + assert_eq!(tool_start["data"], json!({ "file_path": "README.md" })); + assert_eq!(tool_end["data"], json!({ "bytes": 42 })); + assert_eq!(tool_start["metadata"]["agent_kind"], "codex"); + assert_eq!(tool_end["metadata"]["agent_kind"], "codex"); + assert_eq!(tool_end["metadata"]["status"], "success"); +} + #[tokio::test] async fn serve_listener_activates_any_registered_plugin_kind() { let _guard = PLUGIN_TEST_LOCK.lock().await; @@ -1634,6 +1797,7 @@ async fn gateway_forwards_claude_startup_probe_without_llm_observability() { }), ) .unwrap(); + let _subscriber_cleanup = SubscriberCleanup(subscriber_name); let upstream = spawn_anthropic_upstream().await; let mut config = test_config(); @@ -1677,7 +1841,6 @@ async fn gateway_forwards_claude_startup_probe_without_llm_observability() { captured_llm_starts.lock().unwrap().is_empty(), "Claude startup probe must not emit a managed LLM span" ); - deregister_subscriber(subscriber_name).unwrap(); } async fn wait_for_gateway(url: &str) { diff --git a/crates/cli/tests/coverage/session_tests.rs b/crates/cli/tests/coverage/session_tests.rs index f0c27695..2f8480fc 100644 --- a/crates/cli/tests/coverage/session_tests.rs +++ b/crates/cli/tests/coverage/session_tests.rs @@ -69,6 +69,95 @@ fn attr_map(attributes: &[KeyValue]) -> HashMap { .collect() } +async fn apply_codex_payload(manager: &SessionManager, headers: &HeaderMap, payload: Value) { + let outcome = crate::adapters::codex::adapt(payload, headers); + manager.apply_events(headers, outcome.events).await.unwrap(); +} + +async fn start_codex_prompt_turn(manager: &SessionManager, headers: &HeaderMap, session_id: &str) { + for payload in [ + json!({ + "session_id": session_id, + "hook_event_name": "sessionStart", + "model": "gpt-test" + }), + json!({ + "session_id": session_id, + "hook_event_name": "UserPromptSubmit", + "prompt": "Inspect the repository." + }), + ] { + apply_codex_payload(manager, headers, payload).await; + } +} + +async fn run_codex_responses_tool_activity( + manager: &SessionManager, + headers: &HeaderMap, + session_id: &str, +) { + let llm = manager + .start_llm( + headers, + llm_start_with_responses_task(session_id, "Inspect the repository."), + ) + .await + .unwrap(); + manager + .end_llm( + llm, + json!({ + "id": "resp_1", + "status": "completed", + "output": [ + { + "type": "function_call", + "call_id": "tool-call-1", + "name": "Read", + "arguments": "{\"file_path\":\"README.md\"}", + "status": "completed" + } + ] + }), + json!({}), + ) + .await + .unwrap(); + + for payload in [ + json!({ + "session_id": session_id, + "hook_event_name": "PreToolUse", + "tool_call_id": "tool-call-1", + "tool_name": "Read", + "tool_input": { "file_path": "README.md" } + }), + json!({ + "session_id": session_id, + "hook_event_name": "PostToolUse", + "tool_call_id": "tool-call-1", + "tool_name": "Read", + "tool_output": { "content": "hello" }, + "status": "success" + }), + ] { + apply_codex_payload(manager, headers, payload).await; + } +} + +async fn stop_codex_turn(manager: &SessionManager, headers: &HeaderMap, session_id: &str) { + apply_codex_payload( + manager, + headers, + json!({ + "session_id": session_id, + "hook_event_name": "Stop", + "response": "Done." + }), + ) + .await; +} + fn hermes_routed_gateway_metadata(gateway_path: &str, test_session_marker: Option<&str>) -> Value { let mut metadata = json!({ "gateway_path": gateway_path }); if let Some(marker) = test_session_marker { @@ -1510,6 +1599,194 @@ async fn writes_atif_on_session_end_from_plugin_config() { ); } +#[tokio::test] +async fn codex_stop_snapshots_atif_without_session_end() { + let _guard = OBSERVABILITY_PLUGIN_TEST_LOCK.lock().await; + let temp = tempfile::tempdir().unwrap(); + let atif_dir = temp.path().join("atif"); + install_test_atif_plugin(&atif_dir).await; + let manager = SessionManager::new(session_test_config()); + let headers = HeaderMap::new(); + + start_codex_prompt_turn(&manager, &headers, "codex-atif-stop").await; + run_codex_responses_tool_activity(&manager, &headers, "codex-atif-stop").await; + assert!( + std::fs::read_dir(&atif_dir).unwrap().next().is_none(), + "Codex ATIF should wait for Stop before writing a per-turn snapshot" + ); + + stop_codex_turn(&manager, &headers, "codex-atif-stop").await; + + clear_plugin_configuration().unwrap(); + let atif = read_atif_for_session(&atif_dir, "codex-atif-stop"); + assert_eq!(atif["schema_version"], json!("ATIF-v1.7")); + assert_eq!(atif["trajectory_id"], atif["session_id"]); + assert!(atif["subagent_trajectories"].is_null()); + assert_eq!(atif["final_metrics"]["total_steps"], json!(2)); + + let observed = atif["extra"]["observed_events"].as_array().unwrap(); + assert!(observed.iter().all(|event| { + event["metadata"]["hook_event_name"] != json!("sessionEnd") + && event["metadata"]["hook_event_name"] != json!("session_end") + })); + let turn_start = observed + .iter() + .find(|event| { + event["name"] == "codex-turn" + && event["category"] == "agent" + && event["scope_category"] == "start" + }) + .expect("Codex turn start should be observed"); + let turn_end = observed + .iter() + .find(|event| { + event["name"] == "codex-turn" + && event["category"] == "agent" + && event["scope_category"] == "end" + }) + .expect("Codex Stop should close the turn scope"); + assert_eq!(turn_start["uuid"], atif["session_id"]); + assert_eq!(turn_end["uuid"], atif["session_id"]); + assert_eq!( + turn_end["data"]["output"][0]["call_id"], + json!("tool-call-1") + ); + + let steps = atif["steps"].as_array().unwrap(); + assert_eq!(steps.len(), 2); + assert_eq!(steps[0]["source"], json!("user")); + assert_eq!(steps[0]["message"], json!("Inspect the repository.")); + assert_eq!( + steps[0]["extra"]["ancestry"]["parent_name"], + json!("codex-turn") + ); + assert_eq!(steps[1]["source"], json!("agent")); + assert_eq!(steps[1]["model_name"], json!("gpt-test")); + assert_eq!(steps[1]["llm_call_count"], json!(1)); + assert_eq!( + steps[1]["tool_calls"][0], + json!({ + "tool_call_id": "tool-call-1", + "function_name": "Read", + "arguments": { "file_path": "README.md" }, + "extra": { "status": "completed" } + }) + ); + assert_eq!( + steps[1]["observation"]["results"][0]["source_call_id"], + json!("tool-call-1") + ); + assert_eq!( + steps[1]["observation"]["results"][0]["content"], + json!(null) + ); + assert_eq!( + steps[1]["observation"]["results"][0]["extra"]["tool_result"], + json!({ "content": "hello" }) + ); + assert_eq!( + steps[1]["extra"]["tool_ancestry"][0]["parent_name"], + json!("codex-turn") + ); + assert_eq!( + steps[1]["extra"]["tool_invocations"][0]["invocation_id"], + json!("tool-call-1") + ); +} + +#[tokio::test] +async fn codex_openinference_spans_match_shared_contract() { + let _guard = OBSERVABILITY_PLUGIN_TEST_LOCK.lock().await; + let subscriber_name = "cli-codex-openinference-test"; + let _ = deregister_subscriber(subscriber_name); + let (subscriber, exporter) = make_openinference_test_subscriber("codex-test-scope"); + subscriber.register(subscriber_name).unwrap(); + let manager = SessionManager::new(session_test_config()); + let headers = HeaderMap::new(); + + start_codex_prompt_turn(&manager, &headers, "codex-openinference").await; + run_codex_responses_tool_activity(&manager, &headers, "codex-openinference").await; + stop_codex_turn(&manager, &headers, "codex-openinference").await; + + subscriber.force_flush().unwrap(); + assert!(subscriber.deregister(subscriber_name).unwrap()); + + let spans = exporter.get_finished_spans().unwrap(); + let attributes_by_span = spans + .iter() + .map(|span| (span.name.as_ref(), attr_map(&span.attributes))) + .collect::>(); + let turn_attributes = attributes_by_span + .get("codex-turn") + .expect("Codex turn should export an OpenInference span"); + let llm_attributes = attributes_by_span + .get("openai.responses") + .expect("Codex LLM call should export an OpenInference span"); + let tool_attributes = attributes_by_span + .get("Read") + .expect("Codex tool call should export an OpenInference span"); + + assert_eq!( + turn_attributes + .get("openinference.span.kind") + .map(String::as_str), + Some("AGENT") + ); + assert_eq!( + llm_attributes + .get("openinference.span.kind") + .map(String::as_str), + Some("LLM") + ); + assert_eq!( + tool_attributes + .get("openinference.span.kind") + .map(String::as_str), + Some("TOOL") + ); + assert!(turn_attributes.contains_key("nemo_relay.uuid")); + assert!(llm_attributes.contains_key("nemo_relay.parent_uuid")); + assert!(tool_attributes.contains_key("nemo_relay.parent_uuid")); + let turn_metadata = serde_json::from_str::( + turn_attributes + .get("metadata") + .expect("turn span should include OpenInference metadata"), + ) + .unwrap(); + assert_eq!(turn_metadata["session_id"], json!("codex-openinference")); + assert_eq!( + llm_attributes.get("llm.model_name").map(String::as_str), + Some("gpt-test") + ); + assert_eq!( + tool_attributes + .get("tool_call.function.name") + .map(String::as_str), + Some("Read") + ); + assert_eq!( + tool_attributes + .get("tool_call.function.arguments") + .map(String::as_str), + Some("{\"file_path\":\"README.md\"}") + ); + assert_eq!( + tool_attributes.get("tool_call.id").map(String::as_str), + Some("tool-call-1") + ); + assert!( + llm_attributes + .values() + .any(|value| value.contains("Requested tools: Read")) + ); + assert!( + attributes_by_span + .values() + .flat_map(|attributes| attributes.values()) + .all(|value| !value.contains("sessionEnd")) + ); +} + #[tokio::test] async fn duplicate_agent_end_does_not_overwrite_atif_with_empty_session() { // Regression test: hermes-agent and other integrations can emit terminal hooks more than once