diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index e205b83c..fd557468 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -22,6 +22,8 @@ guardrails-remote = [ ] object-store = [ "dep:object_store", + "dep:reqwest", + "dep:rustls", "tokio/net", "tokio/time", ] diff --git a/crates/core/src/observability/plugin_component.rs b/crates/core/src/observability/plugin_component.rs index eecc3c77..1cf48eeb 100644 --- a/crates/core/src/observability/plugin_component.rs +++ b/crates/core/src/observability/plugin_component.rs @@ -21,7 +21,7 @@ use std::future::Future; use std::path::PathBuf; use std::pin::Pin; use std::sync::{Arc, Mutex}; -#[cfg(any(feature = "otel", feature = "openinference"))] +#[cfg(any(feature = "otel", feature = "openinference", feature = "object-store"))] use std::time::Duration; use serde::{Deserialize, Serialize}; @@ -249,6 +249,8 @@ impl Default for AtifSectionConfig { #[cfg_attr(feature = "schema", derive(schemars::JsonSchema))] #[serde(tag = "type", rename_all = "snake_case")] pub enum AtifStorageConfig { + /// HTTP endpoint storage. + Http(HttpStorageConfig), /// S3-compatible object storage. /// /// Non-secret connection settings (`region`, `endpoint_url`, `allow_http`) @@ -305,6 +307,28 @@ pub struct S3StorageConfig { pub allow_http: Option, } +/// HTTP endpoint settings for ATIF trajectory upload. +/// +/// Completed trajectories are uploaded with `POST` and an +/// `application/json` body. Inline `headers` are merged with values resolved +/// from `header_env`; `header_env` values are environment variable names, not +/// secret values. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))] +pub struct HttpStorageConfig { + /// Destination endpoint URL. Must use `http://` or `https://`. + pub endpoint: String, + /// Static request headers. + #[serde(default, skip_serializing_if = "HashMap::is_empty")] + pub headers: HashMap, + /// Request headers whose values are read from environment variables. + #[serde(default, skip_serializing_if = "HashMap::is_empty")] + pub header_env: HashMap, + /// Request timeout in milliseconds. + #[serde(default = "default_timeout_millis")] + pub timeout_millis: u64, +} + /// Shared OTLP exporter config for OpenTelemetry and OpenInference. /// /// The `opentelemetry` and `openinference` sections share the same shape but @@ -739,8 +763,7 @@ struct AtifDispatcher { /// observing further events. fatal_error: Option, /// Per-sink last error. A sink that recorded an error is skipped on - /// subsequent trajectories; other sinks continue to receive writes. Errors - /// here are surfaced together by [`last_error_result`] on teardown. + /// subsequent trajectories; other sinks continue to receive writes. sink_errors: HashMap, } @@ -755,6 +778,11 @@ struct ManagedAtifExporter { struct PendingAtifWrite { agent_uuid: Uuid, + #[cfg_attr( + not(all(feature = "object-store", not(target_arch = "wasm32"))), + allow(dead_code) + )] + session_id: String, // `filename` is consumed by the remote upload path, which is gated on the // object-store feature; without it, only the local sink reads `local_path`. #[cfg_attr( @@ -780,22 +808,6 @@ enum SinkLabel { Remote(usize), } -impl SinkLabel { - fn display(&self) -> String { - match self { - SinkLabel::Local => "local".to_string(), - SinkLabel::Remote(index) => format!("storage[{index}]"), - } - } - - fn sort_key(&self) -> isize { - match self { - SinkLabel::Local => -1, - SinkLabel::Remote(index) => *index as isize, - } - } -} - impl AtifDispatcher { fn new(config: AtifSectionConfig) -> Self { Self { @@ -963,20 +975,10 @@ impl AtifDispatcher { } fn last_error_result(&self) -> std::io::Result<()> { - let mut parts: Vec = Vec::new(); if let Some(message) = &self.fatal_error { - parts.push(message.clone()); - } - let mut sink_entries: Vec<_> = self.sink_errors.iter().collect(); - sink_entries.sort_by_key(|(label, _)| label.sort_key()); - for (label, message) in sink_entries { - parts.push(format!("{}: {message}", label.display())); - } - if parts.is_empty() { - Ok(()) - } else { - Err(std::io::Error::other(parts.join("; "))) + return Err(std::io::Error::other(message.clone())); } + Ok(()) } fn agent_info(&self) -> AtifAgentInfo { @@ -1104,6 +1106,7 @@ fn prepare_atif_file( agent.written = true; Ok(PendingAtifWrite { agent_uuid, + session_id: agent_uuid.to_string(), filename: agent.filename.clone(), local_path: agent.local_path.clone(), payload, @@ -1148,7 +1151,7 @@ fn write_atif_remote( let sink = storage .get(index) .ok_or_else(|| std::io::Error::other(format!("ATIF storage[{index}] is not registered")))?; - sink.put(&write.filename, &write.payload) + sink.put(&write.filename, &write.session_id, &write.payload) } #[cfg(not(all(feature = "object-store", not(target_arch = "wasm32"))))] @@ -1523,6 +1526,47 @@ fn validate_atif_storage_values( storage: &AtifStorageConfig, ) { match storage { + AtifStorageConfig::Http(http) => { + validate_atif_http_endpoint( + diagnostics, + policy, + &format!("storage[{index}].endpoint"), + &http.endpoint, + ); + if http.timeout_millis == 0 { + push_policy_diag( + diagnostics, + policy.unsupported_value, + "observability.unsupported_value", + Some("atif".to_string()), + Some(format!("storage[{index}].timeout_millis")), + format!("ATIF storage[{index}].timeout_millis must be positive"), + ); + } + for (header, value) in &http.headers { + validate_atif_http_header( + diagnostics, + policy, + &format!("storage[{index}].headers.{header}"), + header, + value, + ); + } + for (header, var_name) in &http.header_env { + validate_atif_http_header_name( + diagnostics, + policy, + &format!("storage[{index}].header_env.{header}"), + header, + ); + validate_atif_storage_env_var( + diagnostics, + policy, + &format!("storage[{index}].header_env.{header}"), + Some(var_name.as_str()), + ); + } + } AtifStorageConfig::S3(s3) => { if s3.bucket.trim().is_empty() { push_policy_diag( @@ -1550,6 +1594,85 @@ fn validate_atif_storage_values( } } +fn validate_atif_http_header( + diagnostics: &mut Vec, + policy: &ConfigPolicy, + field: &str, + header: &str, + _value: &str, +) { + validate_atif_http_header_name(diagnostics, policy, field, header); + #[cfg(all(feature = "object-store", not(target_arch = "wasm32")))] + if let Err(err) = reqwest::header::HeaderValue::from_str(_value) { + push_policy_diag( + diagnostics, + policy.unsupported_value, + "observability.unsupported_value", + Some("atif".to_string()), + Some(field.to_string()), + format!("ATIF {field} value is invalid: {err}"), + ); + } +} + +fn validate_atif_http_header_name( + diagnostics: &mut Vec, + policy: &ConfigPolicy, + field: &str, + header: &str, +) { + #[cfg(all(feature = "object-store", not(target_arch = "wasm32")))] + let is_valid = reqwest::header::HeaderName::from_bytes(header.as_bytes()).is_ok(); + #[cfg(not(all(feature = "object-store", not(target_arch = "wasm32"))))] + let is_valid = !header.trim().is_empty() && header.trim() == header; + if !is_valid { + push_policy_diag( + diagnostics, + policy.unsupported_value, + "observability.unsupported_value", + Some("atif".to_string()), + Some(field.to_string()), + format!("ATIF {field} header name '{header}' is invalid"), + ); + } +} + +fn validate_atif_http_endpoint( + diagnostics: &mut Vec, + policy: &ConfigPolicy, + field: &str, + endpoint: &str, +) { + let trimmed = endpoint.trim(); + let mut is_valid = !trimmed.is_empty() && trimmed == endpoint; + #[cfg(all(feature = "object-store", not(target_arch = "wasm32")))] + { + is_valid = is_valid + && reqwest::Url::parse(endpoint) + .map(|url| matches!(url.scheme(), "http" | "https") && url.host_str().is_some()) + .unwrap_or(false); + } + #[cfg(not(all(feature = "object-store", not(target_arch = "wasm32"))))] + { + let valid_scheme = trimmed.starts_with("http://") || trimmed.starts_with("https://"); + let has_host = trimmed + .split_once("://") + .map(|(_, rest)| !rest.is_empty() && !rest.starts_with('/')) + .unwrap_or(false); + is_valid = is_valid && valid_scheme && has_host; + } + if !is_valid { + push_policy_diag( + diagnostics, + policy.unsupported_value, + "observability.unsupported_value", + Some("atif".to_string()), + Some(field.to_string()), + format!("ATIF {field} must be a valid http:// or https:// URL"), + ); + } +} + fn validate_atif_storage_env_var( diagnostics: &mut Vec, policy: &ConfigPolicy, @@ -1739,10 +1862,20 @@ struct AtifRemoteStorage { #[cfg(all(feature = "object-store", not(target_arch = "wasm32")))] struct AtifUploadRequest { key: String, + filename: String, + session_id: String, payload: Vec, reply: std::sync::mpsc::Sender>, } +#[cfg(all(feature = "object-store", not(target_arch = "wasm32")))] +#[derive(Clone)] +struct HttpUploadConfig { + endpoint: String, + headers: HashMap, + timeout: Duration, +} + #[cfg(all(feature = "object-store", not(target_arch = "wasm32")))] #[derive(Default)] struct S3BuilderOverrides { @@ -1824,10 +1957,77 @@ fn resolve_env_var_field(field: &str, var_name: Option<&str>) -> std::io::Result impl AtifRemoteStorage { fn from_config(index: usize, config: &AtifStorageConfig) -> std::io::Result { match config { + AtifStorageConfig::Http(http) => Self::build_http(index, http), AtifStorageConfig::S3(s3) => Self::build_s3(index, s3), } } + fn build_http(index: usize, http: &HttpStorageConfig) -> std::io::Result { + let upload_config = HttpUploadConfig::resolve(index, http)?; + let (req_tx, req_rx) = std::sync::mpsc::channel::(); + let (ready_tx, ready_rx) = std::sync::mpsc::channel::>(); + + std::thread::Builder::new() + .name("nemo-relay-atif-storage".to_string()) + .spawn(move || { + install_rustls_crypto_provider(); + let runtime = match tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + { + Ok(rt) => rt, + Err(err) => { + let _ = ready_tx.send(Err(std::io::Error::other(format!( + "failed to build ATIF storage runtime: {err}" + )))); + return; + } + }; + let client = match reqwest::Client::builder() + .timeout(upload_config.timeout) + .build() + { + Ok(client) => client, + Err(err) => { + let _ = ready_tx.send(Err(std::io::Error::other(format!( + "failed to build HTTP client for ATIF storage[{}]: {err}", + index + )))); + return; + } + }; + if ready_tx.send(Ok(())).is_err() { + return; + } + drop(ready_tx); + + while let Ok(request) = req_rx.recv() { + let result = runtime.block_on(post_atif_http( + &client, + &upload_config, + request.filename, + request.session_id, + request.payload, + )); + let _ = request.reply.send(result); + } + }) + .map_err(|err| { + std::io::Error::other(format!("failed to spawn ATIF storage thread: {err}")) + })?; + + match ready_rx.recv() { + Ok(Ok(())) => Ok(Self { + sender: req_tx, + key_prefix: String::new(), + }), + Ok(Err(err)) => Err(err), + Err(_) => Err(std::io::Error::other( + "ATIF storage thread exited before signalling readiness", + )), + } + } + fn build_s3(index: usize, s3: &S3StorageConfig) -> std::io::Result { let bucket = s3.bucket.clone(); let key_prefix = normalize_storage_key_prefix(s3.key_prefix.as_deref()); @@ -1839,6 +2039,7 @@ impl AtifRemoteStorage { std::thread::Builder::new() .name("nemo-relay-atif-storage".to_string()) .spawn(move || { + install_rustls_crypto_provider(); let runtime = match tokio::runtime::Builder::new_current_thread() .enable_all() .build() @@ -1905,12 +2106,14 @@ impl AtifRemoteStorage { } } - fn put(&self, filename: &str, payload: &[u8]) -> std::io::Result<()> { + fn put(&self, filename: &str, session_id: &str, payload: &[u8]) -> std::io::Result<()> { let key = format!("{}{}", self.key_prefix, filename); let (reply_tx, reply_rx) = std::sync::mpsc::channel(); self.sender .send(AtifUploadRequest { key, + filename: filename.to_string(), + session_id: session_id.to_string(), payload: payload.to_vec(), reply: reply_tx, }) @@ -1921,6 +2124,109 @@ impl AtifRemoteStorage { } } +#[cfg(all(feature = "object-store", not(target_arch = "wasm32")))] +impl HttpUploadConfig { + fn resolve(index: usize, http: &HttpStorageConfig) -> std::io::Result { + let endpoint = http.endpoint.trim(); + if endpoint.is_empty() || endpoint != http.endpoint { + return Err(std::io::Error::other(format!( + "ATIF storage[{index}].endpoint must be non-empty and must not have surrounding whitespace" + ))); + } + let parsed = reqwest::Url::parse(endpoint).map_err(|err| { + std::io::Error::other(format!( + "ATIF storage[{index}].endpoint must be a valid URL: {err}" + )) + })?; + if !matches!(parsed.scheme(), "http" | "https") || parsed.host_str().is_none() { + return Err(std::io::Error::other(format!( + "ATIF storage[{index}].endpoint must be a valid http:// or https:// URL" + ))); + } + if http.timeout_millis == 0 { + return Err(std::io::Error::other(format!( + "ATIF storage[{index}].timeout_millis must be positive" + ))); + } + + let mut headers = http.headers.clone(); + for (header, var_name) in &http.header_env { + let value = resolve_env_var_field( + &format!("storage[{index}].header_env.{header}"), + Some(var_name.as_str()), + )? + .expect("resolve_env_var_field returns Some when var_name is Some"); + headers.insert(header.clone(), value); + } + validate_http_headers(index, &headers)?; + + Ok(Self { + endpoint: parsed.to_string(), + headers, + timeout: Duration::from_millis(http.timeout_millis), + }) + } +} + +#[cfg(all(feature = "object-store", not(target_arch = "wasm32")))] +fn validate_http_headers(index: usize, headers: &HashMap) -> std::io::Result<()> { + for (header, value) in headers { + reqwest::header::HeaderName::from_bytes(header.as_bytes()).map_err(|err| { + std::io::Error::other(format!( + "ATIF storage[{index}] header name '{header}' is invalid: {err}" + )) + })?; + reqwest::header::HeaderValue::from_str(value).map_err(|err| { + std::io::Error::other(format!( + "ATIF storage[{index}] value for header '{header}' is invalid: {err}" + )) + })?; + } + Ok(()) +} + +#[cfg(all(feature = "object-store", not(target_arch = "wasm32")))] +async fn post_atif_http( + client: &reqwest::Client, + config: &HttpUploadConfig, + filename: String, + session_id: String, + payload: Vec, +) -> std::io::Result<()> { + let mut request = client.post(&config.endpoint); + for (header, value) in &config.headers { + request = request.header(header.as_str(), value.as_str()); + } + let response = request + .header(reqwest::header::CONTENT_TYPE, "application/json") + .header("x-nemo-relay-atif-filename", filename.clone()) + .header("x-nemo-relay-atif-session-id", session_id) + .body(payload) + .send() + .await + .map_err(|err| { + std::io::Error::other(format!( + "HTTP ATIF upload to '{}' failed: {err}", + config.endpoint + )) + })?; + if response.status().is_success() { + Ok(()) + } else { + Err(std::io::Error::other(format!( + "HTTP ATIF upload to '{}' for '{}' failed with status {}", + config.endpoint, + filename, + response.status() + ))) + } +} + +#[cfg(all(feature = "object-store", not(target_arch = "wasm32")))] +fn install_rustls_crypto_provider() { + let _ = rustls::crypto::ring::default_provider().install_default(); +} + #[cfg(all(feature = "object-store", not(target_arch = "wasm32")))] fn normalize_storage_key_prefix(raw: Option<&str>) -> String { let trimmed = raw.unwrap_or("").trim(); diff --git a/crates/core/tests/integration/atif_storage_tests.rs b/crates/core/tests/integration/atif_storage_tests.rs index eecb68b3..68a471a3 100644 --- a/crates/core/tests/integration/atif_storage_tests.rs +++ b/crates/core/tests/integration/atif_storage_tests.rs @@ -20,6 +20,7 @@ use nemo_relay::api::runtime::{ NemoRelayContextState, create_scope_stack, global_context, set_thread_scope_stack, }; use nemo_relay::api::scope::{PopScopeParams, PushScopeParams, ScopeType, pop_scope, push_scope}; +use nemo_relay::api::subscriber::flush_subscribers; use nemo_relay::observability::plugin_component::OBSERVABILITY_PLUGIN_KIND; use nemo_relay::plugin::{ PluginComponentSpec, PluginConfig, clear_plugin_configuration, initialize_plugins, @@ -28,9 +29,24 @@ use object_store::{ObjectStore, ObjectStoreExt as _}; use serde_json::{Value as Json, json}; use uuid::Uuid; +#[derive(Debug)] +struct CapturedHttpRequest { + method: String, + path: String, + headers: std::collections::HashMap, + body: Vec, +} + +struct TestHttpServer { + base_url: String, + received: std::sync::Arc>>, + handle: std::thread::JoinHandle<()>, +} + const RUN_ENV: &str = "NEMO_RELAY_RUN_S3_TESTS"; const BUCKET_ENV: &str = "NEMO_RELAY_S3_TEST_BUCKET"; const KEY_PREFIX_ENV: &str = "NEMO_RELAY_S3_TEST_KEY_PREFIX"; +static PLUGIN_TEST_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(()); fn env_value_is_truthy(value: Option<&str>) -> bool { matches!( @@ -99,6 +115,164 @@ fn build_observability_config(bucket: &str, key_prefix: &str) -> PluginConfig { } } +fn build_http_observability_config(endpoints: &[String]) -> PluginConfig { + let storage = endpoints + .iter() + .map(|endpoint| { + json!({ + "type": "http", + "endpoint": endpoint, + "headers": {"x-static": "static-value"}, + "header_env": {"authorization": "NEMO_RELAY_ATIF_HTTP_TEST_TOKEN"}, + "timeout_millis": 3000 + }) + }) + .collect::>(); + let Json::Object(component_config) = json!({ + "atif": { + "enabled": true, + "filename_template": "trajectory-{session_id}.json", + "storage": storage + } + }) else { + unreachable!("config builder produced non-object root") + }; + PluginConfig { + version: 1, + components: vec![PluginComponentSpec { + kind: OBSERVABILITY_PLUGIN_KIND.to_string(), + enabled: true, + config: component_config, + }], + policy: Default::default(), + } +} + +fn read_http_request(stream: &mut std::net::TcpStream) -> CapturedHttpRequest { + use std::io::Read; + + stream + .set_read_timeout(Some(Duration::from_secs(2))) + .expect("set read timeout"); + let mut buffer = Vec::new(); + let mut chunk = [0_u8; 1024]; + let mut expected_len = None; + loop { + let read = stream.read(&mut chunk).expect("read HTTP request"); + if read == 0 { + break; + } + buffer.extend_from_slice(&chunk[..read]); + if expected_len.is_none() + && let Some(header_end) = buffer.windows(4).position(|window| window == b"\r\n\r\n") + { + let header_text = String::from_utf8_lossy(&buffer[..header_end]); + let content_length = header_text + .lines() + .find_map(|line| { + let (name, value) = line.split_once(':')?; + name.eq_ignore_ascii_case("content-length") + .then(|| value.trim().parse::().ok()) + .flatten() + }) + .unwrap_or(0); + expected_len = Some(header_end + 4 + content_length); + } + if let Some(len) = expected_len + && buffer.len() >= len + { + break; + } + } + + let header_end = buffer + .windows(4) + .position(|window| window == b"\r\n\r\n") + .expect("request should contain headers"); + let header_text = String::from_utf8_lossy(&buffer[..header_end]); + let mut lines = header_text.lines(); + let request_line = lines.next().expect("request line"); + let method = request_line + .split_whitespace() + .next() + .expect("request method") + .to_string(); + let path = request_line + .split_whitespace() + .nth(1) + .expect("request path") + .to_string(); + let mut headers = std::collections::HashMap::new(); + for line in lines { + if let Some((name, value)) = line.split_once(':') { + headers.insert(name.to_ascii_lowercase(), value.trim().to_string()); + } + } + CapturedHttpRequest { + method, + path, + headers, + body: buffer[header_end + 4..].to_vec(), + } +} + +fn write_http_response(stream: &mut std::net::TcpStream, status: u16) { + use std::io::Write; + + let reason = match status { + 200 => "OK", + 204 => "No Content", + 500 => "Internal Server Error", + _ => "OK", + }; + write!( + stream, + "HTTP/1.1 {status} {reason}\r\nContent-Length: 0\r\nConnection: close\r\n\r\n" + ) + .expect("write HTTP response"); +} + +fn start_http_server( + expected_requests: usize, + statuses: Vec<(&'static str, u16)>, +) -> TestHttpServer { + let listener = std::net::TcpListener::bind("127.0.0.1:0").expect("bind test HTTP server"); + listener.set_nonblocking(true).expect("set nonblocking"); + let base_url = format!("http://{}", listener.local_addr().expect("local addr")); + let statuses = statuses + .into_iter() + .map(|(path, status)| (path.to_string(), status)) + .collect::>(); + let received = std::sync::Arc::new(std::sync::Mutex::new(Vec::new())); + let thread_received = std::sync::Arc::clone(&received); + let handle = std::thread::spawn(move || { + let deadline = std::time::Instant::now() + Duration::from_secs(2); + while thread_received.lock().unwrap().len() < expected_requests + && std::time::Instant::now() < deadline + { + match listener.accept() { + Ok((mut stream, _)) => { + stream.set_nonblocking(false).expect("set stream blocking"); + let request = read_http_request(&mut stream); + let status = statuses.get(&request.path).copied().unwrap_or(204); + write_http_response(&mut stream, status); + thread_received.lock().unwrap().push(request); + } + Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => { + std::thread::sleep(Duration::from_millis(10)); + } + Err(err) => panic!("test HTTP server accept failed: {err}"), + } + } + }); + + TestHttpServer { + base_url, + received, + handle, + } +} + fn read_object_with_retries( runtime: &tokio::runtime::Runtime, store: &dyn ObjectStore, @@ -163,6 +337,7 @@ fn cleanup_prefix(runtime: &tokio::runtime::Runtime, store: &dyn ObjectStore, ke #[test] fn atif_storage_uploads_trajectory_to_s3() { + let _guard = PLUGIN_TEST_LOCK.lock().unwrap(); if !run_tests_enabled() { eprintln!( "SKIP: set {RUN_ENV} to a truthy value (for example, {RUN_ENV}=1) to run ATIF S3 storage tests" @@ -212,6 +387,145 @@ fn atif_storage_uploads_trajectory_to_s3() { cleanup_prefix(&runtime, store.as_ref(), &key_prefix); } +#[test] +fn atif_storage_posts_trajectory_to_http_endpoints() { + let _guard = PLUGIN_TEST_LOCK.lock().unwrap(); + reset_runtime(); + let server = start_http_server(2, vec![("/primary", 204), ("/secondary", 204)]); + // SAFETY: this uniquely named env var is only touched by this test. + unsafe { + std::env::set_var("NEMO_RELAY_ATIF_HTTP_TEST_TOKEN", "Bearer test-token"); + } + + let config = build_http_observability_config(&[ + format!("{}/primary", server.base_url), + format!("{}/secondary", server.base_url), + ]); + futures::executor::block_on(initialize_plugins(config)) + .expect("observability plugin should initialize with HTTP storage"); + + let handle = push_scope( + PushScopeParams::builder() + .name("atif-http-storage-integration") + .scope_type(ScopeType::Agent) + .build(), + ) + .expect("push agent scope"); + let session_id = handle.uuid; + pop_scope(PopScopeParams::builder().handle_uuid(&handle.uuid).build()) + .expect("pop agent scope"); + flush_subscribers().expect("HTTP upload subscriber should flush"); + + clear_plugin_configuration().expect("plugin teardown should succeed after HTTP uploads"); + server + .handle + .join() + .expect("HTTP server thread should finish"); + // SAFETY: cleanup of test-only env var. + unsafe { + std::env::remove_var("NEMO_RELAY_ATIF_HTTP_TEST_TOKEN"); + } + + let requests = server.received.lock().unwrap(); + assert_eq!(requests.len(), 2); + let mut paths = requests + .iter() + .map(|request| request.path.as_str()) + .collect::>(); + paths.sort_unstable(); + assert_eq!(paths, vec!["/primary", "/secondary"]); + for request in requests.iter() { + assert_eq!(request.method, "POST"); + assert_eq!( + request.headers.get("content-type").map(String::as_str), + Some("application/json") + ); + assert_eq!( + request + .headers + .get("x-nemo-relay-atif-filename") + .map(String::as_str), + Some(format!("trajectory-{session_id}.json").as_str()) + ); + assert_eq!( + request + .headers + .get("x-nemo-relay-atif-session-id") + .map(String::as_str), + Some(session_id.to_string().as_str()) + ); + assert_eq!( + request.headers.get("authorization").map(String::as_str), + Some("Bearer test-token") + ); + assert_eq!( + request.headers.get("x-static").map(String::as_str), + Some("static-value") + ); + let value: Json = serde_json::from_slice(&request.body).expect("HTTP body should be JSON"); + assert_eq!(value["schema_version"].as_str(), Some("ATIF-v1.7")); + assert_eq!( + value["session_id"].as_str(), + Some(session_id.to_string().as_str()) + ); + } +} + +#[test] +fn atif_storage_http_non_2xx_marks_sink_unhealthy() { + let _guard = PLUGIN_TEST_LOCK.lock().unwrap(); + reset_runtime(); + let server = start_http_server(2, vec![("/fail", 500)]); + // SAFETY: this uniquely named env var is only touched by this test. + unsafe { + std::env::set_var("NEMO_RELAY_ATIF_HTTP_TEST_TOKEN", "Bearer test-token"); + } + + let config = build_http_observability_config(&[format!("{}/fail", server.base_url)]); + futures::executor::block_on(initialize_plugins(config)) + .expect("observability plugin should initialize with HTTP storage"); + + let handle = push_scope( + PushScopeParams::builder() + .name("atif-http-storage-failure") + .scope_type(ScopeType::Agent) + .build(), + ) + .expect("push agent scope"); + pop_scope(PopScopeParams::builder().handle_uuid(&handle.uuid).build()) + .expect("pop agent scope"); + flush_subscribers().expect("HTTP upload subscriber should flush"); + + let second = push_scope( + PushScopeParams::builder() + .name("atif-http-storage-after-failure") + .scope_type(ScopeType::Agent) + .build(), + ) + .expect("push second agent scope"); + pop_scope(PopScopeParams::builder().handle_uuid(&second.uuid).build()) + .expect("pop second agent scope"); + flush_subscribers().expect("HTTP upload subscriber should flush after failure"); + + server + .handle + .join() + .expect("HTTP server thread should finish"); + { + let requests = server.received.lock().unwrap(); + assert_eq!(requests.len(), 1); + assert_eq!(requests[0].method, "POST"); + assert_eq!(requests[0].path, "/fail"); + } + clear_plugin_configuration().expect("plugin teardown should ignore unhealthy sink errors"); + // SAFETY: cleanup of test-only env var. + unsafe { + std::env::remove_var("NEMO_RELAY_ATIF_HTTP_TEST_TOKEN"); + } + + reset_runtime(); +} + #[test] fn s3_test_env_truthy_parsing() { assert!(!env_value_is_truthy(None)); diff --git a/crates/core/tests/unit/observability/plugin_component_tests.rs b/crates/core/tests/unit/observability/plugin_component_tests.rs index abb41416..946c2cf3 100644 --- a/crates/core/tests/unit/observability/plugin_component_tests.rs +++ b/crates/core/tests/unit/observability/plugin_component_tests.rs @@ -1010,7 +1010,7 @@ fn atif_dispatcher_records_failed_agent_writes() { .map(String::as_str), Some("disk full") ); - assert!(dispatcher.last_error_result().is_err()); + assert!(dispatcher.last_error_result().is_ok()); drop(dispatcher); pop(&agent); } @@ -1166,6 +1166,7 @@ fn atif_storage_section_parses_s3_variant() { .expect("valid storage section should parse"); assert_eq!(parsed.storage.len(), 1); match &parsed.storage[0] { + AtifStorageConfig::Http(_) => panic!("expected s3 storage"), AtifStorageConfig::S3(s3) => { assert_eq!(s3.bucket, "my-bucket"); assert_eq!(s3.key_prefix.as_deref(), Some("openshell/")); @@ -1173,6 +1174,38 @@ fn atif_storage_section_parses_s3_variant() { } } +#[test] +fn atif_storage_section_parses_http_variant() { + let parsed: AtifSectionConfig = serde_json::from_value(json!({ + "enabled": true, + "filename_template": "trajectory-{session_id}.json", + "storage": [{ + "type": "http", + "endpoint": "https://example.com/atif", + "timeout_millis": 1500, + "headers": {"x-static": "value"}, + "header_env": {"authorization": "NEMO_RELAY_ATIF_HTTP_TOKEN"} + }] + })) + .expect("valid HTTP storage section should parse"); + assert_eq!(parsed.storage.len(), 1); + match &parsed.storage[0] { + AtifStorageConfig::Http(http) => { + assert_eq!(http.endpoint, "https://example.com/atif"); + assert_eq!(http.timeout_millis, 1500); + assert_eq!( + http.headers.get("x-static").map(String::as_str), + Some("value") + ); + assert_eq!( + http.header_env.get("authorization").map(String::as_str), + Some("NEMO_RELAY_ATIF_HTTP_TOKEN") + ); + } + AtifStorageConfig::S3(_) => panic!("expected HTTP storage"), + } +} + #[test] fn atif_storage_section_rejects_single_table() { let err = serde_json::from_value::(json!({ @@ -1197,19 +1230,20 @@ fn atif_storage_section_parses_array_of_tables() { "filename_template": "trajectory-{session_id}.json", "storage": [ {"type": "s3", "bucket": "primary", "key_prefix": "p/"}, - {"type": "s3", "bucket": "archive", "endpoint_url": "http://minio:9000"} + {"type": "http", "endpoint": "http://127.0.0.1:3000/atif"} ] })) .expect("array-of-tables form should parse"); assert_eq!(parsed.storage.len(), 2); match &parsed.storage[0] { + AtifStorageConfig::Http(_) => panic!("expected s3 storage"), AtifStorageConfig::S3(s3) => assert_eq!(s3.bucket, "primary"), } match &parsed.storage[1] { - AtifStorageConfig::S3(s3) => { - assert_eq!(s3.bucket, "archive"); - assert_eq!(s3.endpoint_url.as_deref(), Some("http://minio:9000")); + AtifStorageConfig::Http(http) => { + assert_eq!(http.endpoint, "http://127.0.0.1:3000/atif"); } + AtifStorageConfig::S3(_) => panic!("expected HTTP storage"), } } @@ -1283,6 +1317,210 @@ fn atif_storage_diagnostics_carry_sink_index() { ); } +#[test] +fn atif_storage_empty_http_endpoint_is_rejected() { + let report = validate_plugin_config(&plugin_config(json!({ + "atif": { + "enabled": true, + "filename_template": "trajectory-{session_id}.json", + "storage": [{"type": "http", "endpoint": " "}] + } + }))); + assert!(report.has_errors()); + assert!( + report + .diagnostics + .iter() + .any(|diag| diag.field.as_deref() == Some("storage[0].endpoint")), + "expected diagnostic for empty endpoint: {:?}", + report.diagnostics + ); +} + +#[test] +fn atif_storage_malformed_http_endpoint_is_rejected() { + let report = validate_plugin_config(&plugin_config(json!({ + "atif": { + "enabled": true, + "filename_template": "trajectory-{session_id}.json", + "storage": [{"type": "http", "endpoint": "ftp://example.com/atif"}] + } + }))); + assert!(report.has_errors()); + assert!( + report + .diagnostics + .iter() + .any(|diag| diag.field.as_deref() == Some("storage[0].endpoint")), + "expected diagnostic for malformed endpoint: {:?}", + report.diagnostics + ); +} + +#[test] +fn atif_storage_http_timeout_must_be_positive() { + let report = validate_plugin_config(&plugin_config(json!({ + "atif": { + "enabled": true, + "filename_template": "trajectory-{session_id}.json", + "storage": [{ + "type": "http", + "endpoint": "https://example.com/atif", + "timeout_millis": 0 + }] + } + }))); + assert!(report.has_errors()); + assert!( + report + .diagnostics + .iter() + .any(|diag| diag.field.as_deref() == Some("storage[0].timeout_millis")), + "expected diagnostic for non-positive timeout: {:?}", + report.diagnostics + ); +} + +#[test] +fn atif_storage_http_invalid_literal_header_name_is_rejected() { + let report = validate_plugin_config(&plugin_config(json!({ + "atif": { + "enabled": true, + "filename_template": "trajectory-{session_id}.json", + "storage": [{ + "type": "http", + "endpoint": "https://example.com/atif", + "headers": {"bad header": "value"} + }] + } + }))); + assert!(report.has_errors()); + assert!( + report + .diagnostics + .iter() + .any(|diag| diag.field.as_deref() == Some("storage[0].headers.bad header")), + "expected diagnostic for invalid header name: {:?}", + report.diagnostics + ); +} + +#[test] +fn atif_storage_http_invalid_literal_header_value_is_rejected() { + let report = validate_plugin_config(&plugin_config(json!({ + "atif": { + "enabled": true, + "filename_template": "trajectory-{session_id}.json", + "storage": [{ + "type": "http", + "endpoint": "https://example.com/atif", + "headers": {"x-bad": "bad\nvalue"} + }] + } + }))); + assert!(report.has_errors()); + assert!( + report + .diagnostics + .iter() + .any(|diag| diag.field.as_deref() == Some("storage[0].headers.x-bad")), + "expected diagnostic for invalid header value: {:?}", + report.diagnostics + ); +} + +#[test] +fn atif_storage_http_header_env_missing_env_is_rejected() { + let var_name = "NEMO_RELAY_TEST_ATIF_HTTP_HEADER_MISSING_ZZZZ"; + // SAFETY: tests in this binary do not concurrently observe this uniquely + // named env var, so removing it is safe. + unsafe { + std::env::remove_var(var_name); + } + let report = validate_plugin_config(&plugin_config(json!({ + "atif": { + "enabled": true, + "filename_template": "trajectory-{session_id}.json", + "storage": [{ + "type": "http", + "endpoint": "https://example.com/atif", + "header_env": {"authorization": var_name} + }] + } + }))); + assert!(report.has_errors()); + assert!( + report + .diagnostics + .iter() + .any(|diag| diag.field.as_deref() == Some("storage[0].header_env.authorization")), + "expected diagnostic for missing header env var: {:?}", + report.diagnostics + ); +} + +#[test] +fn atif_storage_http_header_env_empty_env_is_rejected() { + let var_name = "NEMO_RELAY_TEST_ATIF_HTTP_HEADER_EMPTY_ZZZZ"; + // SAFETY: this uniquely named env var is only touched by this test. + unsafe { + std::env::set_var(var_name, ""); + } + let report = validate_plugin_config(&plugin_config(json!({ + "atif": { + "enabled": true, + "filename_template": "trajectory-{session_id}.json", + "storage": [{ + "type": "http", + "endpoint": "https://example.com/atif", + "header_env": {"authorization": var_name} + }] + } + }))); + // SAFETY: cleanup of test-only env var. + unsafe { + std::env::remove_var(var_name); + } + assert!(report.has_errors()); + assert!( + report + .diagnostics + .iter() + .any(|diag| diag.field.as_deref() == Some("storage[0].header_env.authorization")), + "expected diagnostic for empty header env var: {:?}", + report.diagnostics + ); +} + +#[test] +fn atif_storage_http_header_env_present_env_is_accepted() { + let var_name = "NEMO_RELAY_TEST_ATIF_HTTP_HEADER_OK_ZZZZ"; + // SAFETY: this uniquely named env var is only touched by this test. + unsafe { + std::env::set_var(var_name, "Bearer test-token"); + } + let report = validate_plugin_config(&plugin_config(json!({ + "atif": { + "enabled": true, + "filename_template": "trajectory-{session_id}.json", + "storage": [{ + "type": "http", + "endpoint": "https://example.com/atif", + "header_env": {"authorization": var_name} + }] + } + }))); + // SAFETY: cleanup of test-only env var. + unsafe { + std::env::remove_var(var_name); + } + assert!( + !report.has_errors(), + "validation should pass when header env var is set: {:?}", + report.diagnostics + ); +} + #[test] fn atif_storage_editor_field_is_optional_json() { let schema = AtifSectionConfig::editor_schema(); @@ -1311,6 +1549,7 @@ fn atif_storage_s3_parses_full_credential_block() { .expect("full credential block should parse"); assert_eq!(parsed.storage.len(), 1); match &parsed.storage[0] { + AtifStorageConfig::Http(_) => panic!("expected s3 storage"), AtifStorageConfig::S3(s3) => { assert_eq!(s3.bucket, "my-bucket"); assert_eq!(s3.key_prefix.as_deref(), Some("openshell/")); diff --git a/crates/node/observability.d.ts b/crates/node/observability.d.ts index cae4555e..18c20bca 100644 --- a/crates/node/observability.d.ts +++ b/crates/node/observability.d.ts @@ -25,6 +25,14 @@ export interface S3StorageConfig { allow_http?: boolean; } +export interface HttpStorageConfig { + type: 'http'; + endpoint: string; + headers?: Record; + header_env?: Record; + timeout_millis?: number; +} + export interface AtifConfig { enabled?: boolean; agent_name?: string; @@ -34,7 +42,7 @@ export interface AtifConfig { extra?: Record; output_directory?: string; filename_template?: string; - storage?: S3StorageConfig | S3StorageConfig[]; + storage?: S3StorageConfig | HttpStorageConfig | Array; } export interface OtlpConfig { diff --git a/crates/node/tests/observability_plugin_tests.mjs b/crates/node/tests/observability_plugin_tests.mjs index c4e21fc1..c73d4ead 100644 --- a/crates/node/tests/observability_plugin_tests.mjs +++ b/crates/node/tests/observability_plugin_tests.mjs @@ -56,6 +56,26 @@ describe('observability plugin helpers', () => { assert.deepEqual(report.diagnostics.map((diagnostic) => diagnostic.field).sort(), ['filename_template', 'mode']); }); + it('passes through mixed ATIF remote storage config', () => { + const s3 = { + type: 's3', + bucket: 'archive', + key_prefix: 'runs/', + }; + const http = { + type: 'http', + endpoint: 'https://example.com/atif', + headers: { 'x-static': 'value' }, + header_env: { authorization: 'NEMO_RELAY_ATIF_HTTP_AUTH' }, + timeout_millis: 1500, + }; + const config = observability.atifConfig({ + enabled: true, + storage: [s3, http], + }); + assert.deepEqual(config.storage, [s3, http]); + }); + it('activates ATOF and ATIF file sinks', async () => { const outputDirectory = tempDir('node-observability-plugin'); const config = { diff --git a/docs/observability-plugin/atif.mdx b/docs/observability-plugin/atif.mdx index a8d6d8a0..4d816178 100644 --- a/docs/observability-plugin/atif.mdx +++ b/docs/observability-plugin/atif.mdx @@ -62,8 +62,8 @@ the local file write is replaced by uploads to every configured backend; `output_directory` is ignored. Each storage entry is tagged with a `type` discriminator so additional -backends can be added without breaking existing configs. Today, S3-compatible -object storage is supported. +backends can be added without breaking existing configs. S3-compatible object +storage and HTTP endpoints are supported. ### S3-compatible storage @@ -78,11 +78,47 @@ bucket = "nemo-relay-traces" key_prefix = "openshell/" ``` +### HTTP endpoint storage + +```toml +[components.config.atif] +enabled = true +filename_template = "trajectory-{session_id}.json" + +[[components.config.atif.storage]] +type = "http" +endpoint = "https://observability.example.com/atif" +timeout_millis = 3000 + +[components.config.atif.storage.headers] +x-team = "agent-platform" + +[components.config.atif.storage.header_env] +authorization = "NEMO_RELAY_ATIF_HTTP_AUTH" +``` + +The HTTP backend sends one `POST` per completed trajectory. The request body is +the rendered ATIF JSON file with `content-type: application/json`. NeMo Relay +also sets `x-nemo-relay-atif-filename` and +`x-nemo-relay-atif-session-id` headers so receivers can keep the same object +identity that local files and S3 uploads use. + +HTTP `2xx` responses are treated as success. Any non-`2xx` response or +transport error records the endpoint as unhealthy and skips it for later +trajectories. Other configured destinations continue to receive writes. + +| Field | Default | Notes | +|---|---|---| +| `endpoint` | required | Destination `http://` or `https://` URL. | +| `headers` | `{}` | Static request headers. | +| `header_env` | `{}` | Header names mapped to environment variable names containing secret values. | +| `timeout_millis` | `3000` | Per-request timeout in milliseconds. Must be positive. | + ### Multiple destinations Add additional `[[components.config.atif.storage]]` tables to fan out the same trajectory to every destination — for example, an in-cluster MinIO target and a -remote AWS target: +remote HTTP endpoint: ```toml [components.config.atif] @@ -95,11 +131,8 @@ bucket = "nemo-relay-traces" key_prefix = "openshell/" [[components.config.atif.storage]] -type = "s3" -bucket = "team-archive" -key_prefix = "openshell/" -endpoint_url = "http://localhost:9000" -allow_http = true +type = "http" +endpoint = "https://observability.example.com/atif" ``` #### Connection fields diff --git a/python/nemo_relay/observability.py b/python/nemo_relay/observability.py index 9917f1f2..be36b9e7 100644 --- a/python/nemo_relay/observability.py +++ b/python/nemo_relay/observability.py @@ -110,6 +110,28 @@ def to_dict(self) -> JsonObject: ) +@dataclass(slots=True) +class HttpStorageConfig: + """HTTP endpoint settings for ATIF trajectory upload.""" + + endpoint: str = "" + headers: dict[str, str] = field(default_factory=dict) + header_env: dict[str, str] = field(default_factory=dict) + timeout_millis: int = 3000 + + def to_dict(self) -> JsonObject: + """Serialize this HTTP storage config to the canonical JSON object shape.""" + return _normalize_object( + { + "type": "http", + "endpoint": self.endpoint, + "headers": self.headers, + "header_env": self.header_env, + "timeout_millis": self.timeout_millis, + } + ) + + @dataclass(slots=True) class AtifConfig: """Per-top-level-agent ATIF file export settings.""" @@ -122,7 +144,7 @@ class AtifConfig: extra: JsonObject | None = None output_directory: str | None = None filename_template: str = "nemo-relay-atif-{session_id}.json" - storage: list[S3StorageConfig] | None = None + storage: list[S3StorageConfig | HttpStorageConfig] | None = None def to_dict(self) -> JsonObject: """Serialize this ATIF config to the canonical JSON object shape.""" @@ -223,6 +245,7 @@ def to_dict(self) -> JsonObject: "ConfigPolicy", "AtofConfig", "AtifConfig", + "HttpStorageConfig", "S3StorageConfig", "OtlpConfig", "ObservabilityConfig", diff --git a/python/nemo_relay/observability.pyi b/python/nemo_relay/observability.pyi index 405cb39b..9685ffc0 100644 --- a/python/nemo_relay/observability.pyi +++ b/python/nemo_relay/observability.pyi @@ -37,6 +37,14 @@ class S3StorageConfig: allow_http: bool | None = ... def to_dict(self) -> JsonObject: ... +@dataclass(slots=True) +class HttpStorageConfig: + endpoint: str = ... + headers: dict[str, str] = field(default_factory=dict) + header_env: dict[str, str] = field(default_factory=dict) + timeout_millis: int = ... + def to_dict(self) -> JsonObject: ... + @dataclass(slots=True) class AtifConfig: enabled: bool = ... @@ -47,7 +55,7 @@ class AtifConfig: extra: JsonObject | None = ... output_directory: str | None = ... filename_template: str = ... - storage: list[S3StorageConfig] | None = ... + storage: list[S3StorageConfig | HttpStorageConfig] | None = ... def to_dict(self) -> JsonObject: ... @dataclass(slots=True) diff --git a/python/tests/test_observability_plugin.py b/python/tests/test_observability_plugin.py index dba92d04..5ad4fb95 100644 --- a/python/tests/test_observability_plugin.py +++ b/python/tests/test_observability_plugin.py @@ -16,6 +16,7 @@ AtifConfig, AtofConfig, ComponentSpec, + HttpStorageConfig, ObservabilityConfig, OtlpConfig, S3StorageConfig, @@ -95,6 +96,24 @@ def test_s3_storage_config_serializes_credential_fields(self): atif = AtifConfig(enabled=True, storage=[storage]) assert atif.to_dict()["storage"] == [storage.to_dict()] + def test_http_storage_config_serializes_headers(self): + s3 = S3StorageConfig(bucket="archive") + http = HttpStorageConfig( + endpoint="https://example.com/atif", + headers={"x-static": "value"}, + header_env={"authorization": "NEMO_RELAY_ATIF_HTTP_AUTH"}, + timeout_millis=1500, + ) + assert http.to_dict() == { + "type": "http", + "endpoint": "https://example.com/atif", + "headers": {"x-static": "value"}, + "header_env": {"authorization": "NEMO_RELAY_ATIF_HTTP_AUTH"}, + "timeout_millis": 1500, + } + atif = AtifConfig(enabled=True, storage=[s3, http]) + assert atif.to_dict()["storage"] == [s3.to_dict(), http.to_dict()] + @pytest.mark.parametrize("use_context_manager", [True, False]) async def test_atof_and_atif_file_outputs(self, tmp_path: Path, use_context_manager: bool): config = ObservabilityConfig(