diff --git a/crates/core/src/api/llm.rs b/crates/core/src/api/llm.rs index 91c149e6..22fe830a 100644 --- a/crates/core/src/api/llm.rs +++ b/crates/core/src/api/llm.rs @@ -19,8 +19,8 @@ use crate::api::runtime::{ use crate::api::scope::event; use crate::api::scope::{EmitMarkEventParams, ScopeHandle}; use crate::api::shared::{ - ensure_runtime_owner, resolve_parent_uuid, run_request_intercepts_with_codec, - snapshot_event_subscribers, + ensure_runtime_owner, metadata_with_otel_status, resolve_parent_uuid, + run_request_intercepts_with_codec, snapshot_event_subscribers, }; use crate::codec::request::AnnotatedLlmRequest; use crate::codec::response::AnnotatedLlmResponse; @@ -587,19 +587,22 @@ pub async fn llm_call_execute(params: LlmCallExecuteParams) -> Result { .as_ref() .and_then(|codec| codec.decode_response(&response).ok()) .map(Arc::new); + let end_metadata = metadata_with_otel_status(metadata, "OK", None); llm_call_end( LlmCallEndParams::builder() .handle(&handle) .response(response.clone()) .data_opt(data) - .metadata_opt(metadata) + .metadata_opt(end_metadata) .annotated_response_opt(annotated_response) .build(), )?; Ok(response) } Err(error) => { - let _ = emit_llm_end_without_output(&handle, metadata); + let end_metadata = + metadata_with_otel_status(metadata, "ERROR", Some(error.to_string())); + let _ = emit_llm_end_without_output(&handle, end_metadata); Err(error) } } @@ -744,7 +747,9 @@ pub async fn llm_stream_call_execute(params: LlmStreamCallExecuteParams) -> Resu Ok(Box::pin(wrapper) as LlmJsonStream) } Err(error) => { - let _ = emit_llm_end_without_output(&handle, metadata); + let end_metadata = + metadata_with_otel_status(metadata, "ERROR", Some(error.to_string())); + let _ = emit_llm_end_without_output(&handle, end_metadata); Err(error) } } @@ -831,3 +836,7 @@ pub fn llm_conditional_execution(request: &LlmRequest) -> Result<()> { } Ok(()) } + +#[cfg(test)] +#[path = "../../tests/unit/llm_api_tests.rs"] +mod tests; diff --git 
a/crates/core/src/api/mod.rs b/crates/core/src/api/mod.rs index c0f9cc09..1932063c 100644 --- a/crates/core/src/api/mod.rs +++ b/crates/core/src/api/mod.rs @@ -18,4 +18,4 @@ pub mod subscriber; /// Tool lifecycle helpers and managed execution entry points. pub mod tool; -mod shared; +pub(crate) mod shared; diff --git a/crates/core/src/api/runtime/state.rs b/crates/core/src/api/runtime/state.rs index cab813d8..ee69b6f6 100644 --- a/crates/core/src/api/runtime/state.rs +++ b/crates/core/src/api/runtime/state.rs @@ -247,20 +247,30 @@ impl NemoRelayContextState { /// # Parameters /// - `handle`: Scope handle to serialize into an event. /// - `data`: Optional data payload returned from the scope. + /// - `metadata`: Optional metadata payload merged over `handle.metadata`. /// /// # Returns /// A scope-end [`Event`] derived from the provided handle. - pub fn end_scope_handle(&self, handle: &ScopeHandle, data: Option) -> Event { + pub fn end_scope_handle( + &self, + handle: &ScopeHandle, + data: Option, + metadata: Option, + ) -> Event { self.build_scope_end_event( EndScopeHandleParams::builder() .handle(handle) .data_opt(data) + .metadata_opt(metadata) .build(), ) } /// Build a scope-end event from builder parameters. /// + /// The `metadata` payload is merged over the metadata already stored on + /// the handle. + /// /// # Parameters /// - `params`: Scope end-event builder parameters. /// @@ -279,7 +289,7 @@ impl NemoRelayContextState { ) .name(handle.name.as_str()) .data_opt(params.data) - .metadata_opt(handle.metadata.clone()) + .metadata_opt(merge_json(handle.metadata.clone(), params.metadata)) .build(), ScopeCategory::End, scope_attributes_to_strings(handle.attributes), diff --git a/crates/core/src/api/scope.rs b/crates/core/src/api/scope.rs index 51700d35..31d60d4c 100644 --- a/crates/core/src/api/scope.rs +++ b/crates/core/src/api/scope.rs @@ -180,6 +180,9 @@ pub struct EndScopeHandleParams<'a> { /// Optional JSON payload exported as the semantic scope output. 
#[builder(default)] pub data: Option, + /// Optional metadata merged over the metadata set when the scope was created; end-time keys win. + #[builder(default)] + pub metadata: Option, /// Optional timestamp recorded on the emitted end event. When omitted, the /// runtime records the current UTC time, or one microsecond after the /// handle start time if the current time is not later. @@ -196,6 +199,9 @@ pub struct PopScopeParams<'a> { /// Optional JSON payload exported as the semantic scope output. #[builder(default)] pub output: Option, + /// Optional JSON metadata merged over the metadata set when the scope was created; end-time keys win. + #[builder(default)] + pub metadata: Option, /// Optional timestamp recorded on the emitted end event. When omitted, the /// runtime records the current UTC time, or one microsecond after the /// handle start time if the current time is not later. @@ -347,6 +353,7 @@ pub fn pop_scope(params: PopScopeParams<'_>) -> Result<()> { .handle(&scope) .data_opt(params.output) .timestamp_opt(params.timestamp) + .metadata_opt(params.metadata) .build(), ); (scope, event, subscribers) diff --git a/crates/core/src/api/shared.rs b/crates/core/src/api/shared.rs index 7793808d..5a57de2a 100644 --- a/crates/core/src/api/shared.rs +++ b/crates/core/src/api/shared.rs @@ -13,6 +13,7 @@ use crate::api::scope::ScopeHandle; use crate::codec::request::AnnotatedLlmRequest; use crate::codec::traits::LlmCodec; use crate::error::{FlowError, Result}; +use crate::json::{Json, merge_json}; use crate::shared_runtime::ensure_process_runtime_owner; pub(crate) fn resolve_parent_uuid(parent: Option<&ScopeHandle>) -> Option { @@ -37,6 +38,25 @@ pub(crate) fn ensure_runtime_owner() -> Result<()> { ensure_process_runtime_owner() } +pub(crate) fn metadata_with_otel_status( + metadata: Option, + status_code: &'static str, + status_message: Option, +) -> Option { + let mut status = serde_json::Map::new(); + status.insert( + "otel.status_code".to_string(), 
Json::String(status_code.to_string()), + ); + if let Some(status_message) = status_message { + status.insert( + "otel.status_message".to_string(), + Json::String(status_message), + ); + } + merge_json(metadata, Some(Json::Object(status))) +} + pub(crate) fn run_request_intercepts_with_codec( name: &str, request: LlmRequest, diff --git a/crates/core/src/api/tool.rs b/crates/core/src/api/tool.rs index fd38658b..c01e4be9 100644 --- a/crates/core/src/api/tool.rs +++ b/crates/core/src/api/tool.rs @@ -9,7 +9,10 @@ use crate::api::runtime::current_scope_stack; use crate::api::runtime::global_context; use crate::api::scope::event; use crate::api::scope::{EmitMarkEventParams, ScopeHandle}; -use crate::api::shared::{ensure_runtime_owner, resolve_parent_uuid, snapshot_event_subscribers}; +use crate::api::shared::{ + ensure_runtime_owner, metadata_with_otel_status, resolve_parent_uuid, + snapshot_event_subscribers, +}; use crate::error::{FlowError, Result}; use crate::json::Json; use bitflags::bitflags; @@ -445,18 +448,21 @@ pub async fn tool_call_execute(params: ToolCallExecuteParams) -> Result { match execution(intercepted_args).await { Ok(result) => { + let end_metadata = metadata_with_otel_status(metadata, "OK", None); tool_call_end( ToolCallEndParams::builder() .handle(&handle) .result(result.clone()) .data_opt(data) - .metadata_opt(metadata) + .metadata_opt(end_metadata) .build(), )?; Ok(result) } Err(error) => { - let _ = emit_tool_end_without_output(&handle, metadata); + let end_metadata = + metadata_with_otel_status(metadata, "ERROR", Some(error.to_string())); + let _ = emit_tool_end_without_output(&handle, end_metadata); Err(error) } } @@ -542,3 +548,7 @@ pub fn tool_conditional_execution(name: &str, args: &Json) -> Result<()> { } Ok(()) } + +#[cfg(test)] +#[path = "../../tests/unit/tool_api_tests.rs"] +mod tests; diff --git a/crates/core/src/observability/mod.rs b/crates/core/src/observability/mod.rs index 9e0f46dc..851d4b90 100644 --- 
a/crates/core/src/observability/mod.rs +++ b/crates/core/src/observability/mod.rs @@ -18,3 +18,36 @@ pub mod openinference; #[cfg(feature = "otel")] pub mod otel; pub mod plugin_component; + +#[cfg(any(feature = "otel", feature = "openinference"))] +pub(crate) fn set_span_status_from_event_metadata(span: &mut S, event: &crate::api::event::Event) +where + S: opentelemetry::trace::Span, +{ + let Some(metadata) = event.metadata() else { + return; + }; + let Some(status_code) = metadata + .get("otel.status_code") + .and_then(crate::json::Json::as_str) + else { + return; + }; + + let status = match status_code { + "OK" => opentelemetry::trace::Status::Ok, + "ERROR" => opentelemetry::trace::Status::error( + metadata + .get("otel.status_message") + .and_then(crate::json::Json::as_str) + .unwrap_or_default() + .to_string(), + ), + "UNSET" => opentelemetry::trace::Status::Unset, + other => { + eprintln!("Unrecognized OTEL status code in event metadata: {other}"); + opentelemetry::trace::Status::Unset + } + }; + span.set_status(status); +} diff --git a/crates/core/src/observability/openinference.rs b/crates/core/src/observability/openinference.rs index 092ed669..a9d560d0 100644 --- a/crates/core/src/observability/openinference.rs +++ b/crates/core/src/observability/openinference.rs @@ -549,6 +549,7 @@ impl OpenInferenceEventProcessor { let Some(mut active_span) = self.active_spans.remove(&event.uuid()) else { return; }; + super::set_span_status_from_event_metadata(&mut active_span.span, event); active_span.span.set_attributes(end_attributes(event)); active_span .span diff --git a/crates/core/src/observability/otel.rs b/crates/core/src/observability/otel.rs index f7b42c6c..b3b7f0b3 100644 --- a/crates/core/src/observability/otel.rs +++ b/crates/core/src/observability/otel.rs @@ -543,6 +543,8 @@ impl OtelEventProcessor { let Some(mut active_span) = self.active_spans.remove(&event.uuid()) else { return; }; + + super::set_span_status_from_event_metadata(&mut active_span.span, 
event); active_span.span.set_attributes(end_attributes(event)); active_span .span @@ -657,16 +659,15 @@ fn start_attributes(event: &Event) -> Vec { fn end_attributes(event: &Event) -> Vec { let mut attributes = Vec::new(); push_serialized(&mut attributes, "nemo_relay.end.data_json", event.data()); - push_serialized( - &mut attributes, - "nemo_relay.end.metadata_json", - event.metadata(), - ); + + let metadata = event.metadata(); + push_serialized(&mut attributes, "nemo_relay.end.metadata_json", metadata); push_serialized( &mut attributes, "nemo_relay.end.output_json", event.output(), ); + attributes } diff --git a/crates/core/src/stream.rs b/crates/core/src/stream.rs index 18eeb326..b3c74f68 100644 --- a/crates/core/src/stream.rs +++ b/crates/core/src/stream.rs @@ -36,6 +36,7 @@ use crate::api::llm::LlmHandle; use crate::api::runtime::NemoRelayContextState; use crate::api::runtime::global_context; use crate::api::runtime::{ScopeStackHandle, current_scope_stack}; +use crate::api::shared::metadata_with_otel_status; use crate::codec::response::AnnotatedLlmResponse; use crate::codec::traits::LlmResponseCodec; use crate::error::Result; @@ -127,14 +128,24 @@ impl LlmStreamWrapper { return; } self.ended = true; - self.emit_end_event(); + self.emit_end_event(self.metadata.clone()); + } + + fn finish_with_status(&mut self, status_code: &'static str, status_message: Option) { + if self.ended { + return; + } + self.ended = true; + let metadata = + metadata_with_otel_status(self.metadata.clone(), status_code, status_message); + self.emit_end_event(metadata); } /// Emit the LLM END event with aggregated response data. /// /// Calls the finalizer to produce the aggregated response, runs sanitize /// response guardrails, and emits the END event. 
- fn emit_end_event(&mut self) { + fn emit_end_event(&mut self, metadata: Option) { let aggregated = match self.finalizer.take() { Some(finalizer) => finalizer(), None => Json::Null, @@ -163,12 +174,8 @@ impl LlmStreamWrapper { } else { Some(sanitized) }; - let event = state.end_llm_handle( - &self.handle, - data, - self.metadata.clone(), - annotated_response, - ); + let event = + state.end_llm_handle(&self.handle, data, metadata, annotated_response); Some((event, subscribers)) } Err(_) => None, @@ -232,17 +239,19 @@ impl Stream for LlmStreamWrapper { match (this.collector)(raw_chunk.clone()) { Ok(()) => Poll::Ready(Some(Ok(raw_chunk))), Err(e) => { - this.finish(); + let message = e.to_string(); + this.finish_with_status("ERROR", Some(message)); Poll::Ready(Some(Err(e))) } } } Poll::Ready(Some(Err(e))) => { - this.finish(); + let message = e.to_string(); + this.finish_with_status("ERROR", Some(message)); Poll::Ready(Some(Err(e))) } Poll::Ready(None) => { - this.finish(); + this.finish_with_status("OK", None); Poll::Ready(None) } Poll::Pending => Poll::Pending, diff --git a/crates/core/tests/unit/context_tests.rs b/crates/core/tests/unit/context_tests.rs index ac4ca5dc..f402de4f 100644 --- a/crates/core/tests/unit/context_tests.rs +++ b/crates/core/tests/unit/context_tests.rs @@ -15,7 +15,7 @@ use crate::api::runtime::EventSubscriberFn; use crate::api::runtime::ScopeStack; use crate::api::runtime::global_context; use crate::api::runtime::{NemoRelayContextState, flush_subscribers}; -use crate::api::scope::{ScopeAttributes, ScopeHandle, ScopeType}; +use crate::api::scope::{EndScopeHandleParams, ScopeAttributes, ScopeHandle, ScopeType}; use crate::api::tool::CreateToolHandleParams; use crate::context::registries::{ merge_execution_intercept_callables, merge_guardrail_entries, merge_intercept_entries, @@ -305,6 +305,35 @@ fn context_state_supports_extensions_events_and_builders() { assert_eq!(events.lock().unwrap().as_slice(), ["mark"]); } +#[test] +fn 
scope_end_metadata_merges_with_handle_metadata() { + let state = NemoRelayContextState::new(); + let scope = state.create_scope_handle( + crate::api::scope::CreateScopeHandleParams::builder() + .name("agent") + .scope_type(ScopeType::Agent) + .metadata(json!({"a": 1, "b": 2, "c": 3})) + .build(), + ); + + let scope_end = state.build_scope_end_event( + EndScopeHandleParams::builder() + .handle(&scope) + .metadata(json!({"c": 3.5, "d": 4})) + .build(), + ); + assert_eq!( + scope_end.metadata(), + Some(&json!({"a": 1, "b": 2, "c": 3.5, "d": 4})) + ); + + let scope_end = state.end_scope_handle(&scope, None, Some(json!({"c": 5, "e": 6}))); + assert_eq!( + scope_end.metadata(), + Some(&json!({"a": 1, "b": 2, "c": 5, "e": 6})) + ); +} + #[test] fn global_context_is_a_singleton_handle() { let first = global_context(); diff --git a/crates/core/tests/unit/llm_api_tests.rs b/crates/core/tests/unit/llm_api_tests.rs new file mode 100644 index 00000000..24e00236 --- /dev/null +++ b/crates/core/tests/unit/llm_api_tests.rs @@ -0,0 +1,281 @@ +// SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +//! Unit tests for LLM API lifecycle behavior. 
+ +use std::sync::{Arc, Mutex}; + +use serde_json::json; +use tokio_stream::StreamExt; + +use super::{ + LlmCallExecuteParams, LlmRequest, LlmStreamCallExecuteParams, llm_call_execute, + llm_stream_call_execute, +}; +use crate::api::event::ScopeCategory; +use crate::api::runtime::LlmJsonStream; +use crate::api::runtime::{NemoRelayContextState, global_context}; +use crate::api::subscriber::{deregister_subscriber, flush_subscribers, register_subscriber}; +use crate::error::FlowError; +use crate::json::Json; + +fn reset_global() { + crate::shared_runtime::reset_runtime_owner_for_tests(); + let context = global_context(); + *context.write().unwrap() = NemoRelayContextState::new(); +} + +fn request() -> LlmRequest { + LlmRequest { + headers: serde_json::Map::new(), + content: json!({"messages": [], "model": "demo"}), + } +} + +#[test] +fn llm_call_execute_adds_otel_status_metadata_to_end_events() { + reset_global(); + + let captured_events = Arc::new(Mutex::new(Vec::<(String, Option)>::new())); + let subscriber_events = captured_events.clone(); + register_subscriber( + "llm-status-metadata", + Arc::new(move |event| { + if event.scope_category() == Some(ScopeCategory::End) { + subscriber_events + .lock() + .unwrap() + .push((event.name().to_string(), event.metadata().cloned())); + } + }), + ) + .unwrap(); + + let runtime = tokio::runtime::Runtime::new().unwrap(); + runtime.block_on(async { + let response = llm_call_execute( + LlmCallExecuteParams::builder() + .name("llm-ok") + .request(request()) + .func(Arc::new(|_request| { + Box::pin(async { Ok(json!({"ok": true})) }) + })) + .metadata(json!({"caller": "llm-ok", "otel.status_code": "USER"})) + .build(), + ) + .await + .unwrap(); + assert_eq!(response, json!({"ok": true})); + + let error = llm_call_execute( + LlmCallExecuteParams::builder() + .name("llm-error") + .request(request()) + .func(Arc::new(|_request| { + Box::pin(async { Err(FlowError::Internal("llm boom".to_string())) }) + })) + .metadata(json!({"caller": 
"llm-error"})) + .build(), + ) + .await + .unwrap_err(); + assert!(error.to_string().contains("llm boom")); + }); + + flush_subscribers().unwrap(); + assert!(deregister_subscriber("llm-status-metadata").unwrap()); + + let events = captured_events.lock().unwrap(); + let metadata_for = |name: &str| { + events + .iter() + .find(|event| event.0 == name) + .and_then(|event| event.1.as_ref()) + .unwrap_or_else(|| panic!("missing end event metadata for {name}")) + }; + + let success_metadata = metadata_for("llm-ok"); + assert_eq!(success_metadata["caller"], json!("llm-ok")); + assert_eq!(success_metadata["otel.status_code"], json!("OK")); + assert!(success_metadata.get("otel.status_message").is_none()); + + let error_metadata = metadata_for("llm-error"); + assert_eq!(error_metadata["caller"], json!("llm-error")); + assert_eq!(error_metadata["otel.status_code"], json!("ERROR")); + assert!( + error_metadata["otel.status_message"] + .as_str() + .unwrap() + .contains("llm boom") + ); +} + +#[test] +fn llm_stream_call_execute_adds_otel_status_metadata_to_end_events() { + reset_global(); + + let captured_events = Arc::new(Mutex::new(Vec::<(String, Option)>::new())); + let subscriber_events = captured_events.clone(); + register_subscriber( + "llm-stream-status-metadata", + Arc::new(move |event| { + if event.scope_category() == Some(ScopeCategory::End) { + subscriber_events + .lock() + .unwrap() + .push((event.name().to_string(), event.metadata().cloned())); + } + }), + ) + .unwrap(); + + let runtime = tokio::runtime::Runtime::new().unwrap(); + runtime.block_on(async { + let mut stream = llm_stream_call_execute( + LlmStreamCallExecuteParams::builder() + .name("llm-stream-ok") + .request(request()) + .func(Arc::new(|_request| { + Box::pin(async { + Ok( + Box::pin(tokio_stream::iter(vec![Ok(json!({"chunk": true}))])) + as LlmJsonStream, + ) + }) + })) + .collector(Box::new(|_chunk| Ok(()))) + .finalizer(Box::new(|| json!({"ok": true}))) + .metadata(json!({"caller": "llm-stream-ok", 
"otel.status_code": "USER"})) + .build(), + ) + .await + .unwrap(); + + while let Some(chunk) = stream.next().await { + chunk.unwrap(); + } + }); + + flush_subscribers().unwrap(); + assert!(deregister_subscriber("llm-stream-status-metadata").unwrap()); + + let events = captured_events.lock().unwrap(); + let success_metadata = events + .iter() + .find(|event| event.0 == "llm-stream-ok") + .and_then(|event| event.1.as_ref()) + .unwrap_or_else(|| panic!("missing stream end event metadata")); + assert_eq!(success_metadata["caller"], json!("llm-stream-ok")); + assert_eq!(success_metadata["otel.status_code"], json!("OK")); + assert!(success_metadata.get("otel.status_message").is_none()); +} + +#[test] +fn llm_stream_call_execute_adds_otel_error_metadata_to_failed_end_events() { + reset_global(); + + let captured_events = Arc::new(Mutex::new(Vec::<(String, Option)>::new())); + let subscriber_events = captured_events.clone(); + register_subscriber( + "llm-stream-error-status-metadata", + Arc::new(move |event| { + if event.scope_category() == Some(ScopeCategory::End) { + subscriber_events + .lock() + .unwrap() + .push((event.name().to_string(), event.metadata().cloned())); + } + }), + ) + .unwrap(); + + let runtime = tokio::runtime::Runtime::new().unwrap(); + runtime.block_on(async { + let mut upstream_error_stream = llm_stream_call_execute( + LlmStreamCallExecuteParams::builder() + .name("llm-stream-upstream-error") + .request(request()) + .func(Arc::new(|_request| { + Box::pin(async { + Ok(Box::pin(tokio_stream::iter(vec![Err(FlowError::Internal( + "stream boom".to_string(), + ))])) as LlmJsonStream) + }) + })) + .collector(Box::new(|_chunk| Ok(()))) + .finalizer(Box::new(|| json!({"partial": true}))) + .metadata( + json!({"caller": "llm-stream-upstream-error", "otel.status_code": "USER"}), + ) + .build(), + ) + .await + .unwrap(); + let upstream_error = upstream_error_stream.next().await.unwrap().unwrap_err(); + assert!(upstream_error.to_string().contains("stream 
boom")); + + let mut collector_error_stream = llm_stream_call_execute( + LlmStreamCallExecuteParams::builder() + .name("llm-stream-collector-error") + .request(request()) + .func(Arc::new(|_request| { + Box::pin(async { + Ok( + Box::pin(tokio_stream::iter(vec![Ok(json!({"chunk": true}))])) + as LlmJsonStream, + ) + }) + })) + .collector(Box::new(|_chunk| { + Err(FlowError::Internal("collector boom".to_string())) + })) + .finalizer(Box::new(|| json!({"partial": true}))) + .metadata( + json!({"caller": "llm-stream-collector-error", "otel.status_code": "USER"}), + ) + .build(), + ) + .await + .unwrap(); + let collector_error = collector_error_stream.next().await.unwrap().unwrap_err(); + assert!(collector_error.to_string().contains("collector boom")); + }); + + flush_subscribers().unwrap(); + assert!(deregister_subscriber("llm-stream-error-status-metadata").unwrap()); + + let events = captured_events.lock().unwrap(); + let metadata_for = |name: &str| { + events + .iter() + .find(|event| event.0 == name) + .and_then(|event| event.1.as_ref()) + .unwrap_or_else(|| panic!("missing stream end event metadata for {name}")) + }; + + let upstream_error_metadata = metadata_for("llm-stream-upstream-error"); + assert_eq!( + upstream_error_metadata["caller"], + json!("llm-stream-upstream-error") + ); + assert_eq!(upstream_error_metadata["otel.status_code"], json!("ERROR")); + assert!( + upstream_error_metadata["otel.status_message"] + .as_str() + .unwrap() + .contains("stream boom") + ); + + let collector_error_metadata = metadata_for("llm-stream-collector-error"); + assert_eq!( + collector_error_metadata["caller"], + json!("llm-stream-collector-error") + ); + assert_eq!(collector_error_metadata["otel.status_code"], json!("ERROR")); + assert!( + collector_error_metadata["otel.status_message"] + .as_str() + .unwrap() + .contains("collector boom") + ); +} diff --git a/crates/core/tests/unit/observability/openinference_tests.rs 
b/crates/core/tests/unit/observability/openinference_tests.rs index 4488b4b9..db3eb02b 100644 --- a/crates/core/tests/unit/observability/openinference_tests.rs +++ b/crates/core/tests/unit/observability/openinference_tests.rs @@ -20,6 +20,7 @@ use crate::codec::request::{ use crate::codec::response::{AnnotatedLlmResponse, FinishReason, ResponseToolCall, Usage}; use crate::json::Json; use crate::observability::atif::{AtifAgentInfo, AtifExporter, AtifStepExtra}; +use opentelemetry::trace::Status; use opentelemetry_sdk::trace::InMemorySpanExporterBuilder; use serde_json::json; use std::collections::HashMap; @@ -1805,6 +1806,51 @@ fn scope_end_output_payload_is_exported_to_openinference_attributes() { ); } +#[test] +fn scope_end_metadata_sets_openinference_span_status() { + let cases = [ + ( + json!({"otel.status_code": "ERROR", "otel.status_message": "failed"}), + Status::error("failed".to_string()), + ), + (json!({"otel.status_code": "OK"}), Status::Ok), + (json!({}), Status::Unset), + ]; + + for (metadata, expected_status) in cases { + let (provider, exporter) = make_provider(); + let mut processor = + OpenInferenceEventProcessor::new(provider.clone(), "test-scope".to_string()); + let scope_uuid = Uuid::now_v7(); + + processor.process(&make_start_event( + scope_uuid, + None, + "agent", + ScopeType::Agent, + Some(json!({"task": "summarize"})), + )); + processor.process(&Event::Scope(ScopeEvent::new( + BaseEvent::builder() + .uuid(scope_uuid) + .name("agent") + .metadata(metadata) + .data(json!({"status": "done"})) + .build(), + ScopeCategory::End, + Vec::new(), + EventCategory::agent(), + None, + ))); + + processor.force_flush().unwrap(); + + let spans = exporter.get_finished_spans().unwrap(); + assert_eq!(spans.len(), 1); + assert_eq!(spans[0].status, expected_status); + } +} + #[test] fn pre_epoch_timestamps_round_trip_through_system_time() { let timestamp = DateTime::parse_from_rfc3339("1969-12-31T23:59:58.500000000Z") diff --git 
a/crates/core/tests/unit/tool_api_tests.rs b/crates/core/tests/unit/tool_api_tests.rs new file mode 100644 index 00000000..aa3d6670 --- /dev/null +++ b/crates/core/tests/unit/tool_api_tests.rs @@ -0,0 +1,99 @@ +// SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +//! Unit tests for tool API lifecycle behavior. + +use std::sync::{Arc, Mutex}; + +use serde_json::json; + +use super::{ToolCallExecuteParams, tool_call_execute}; +use crate::api::event::ScopeCategory; +use crate::api::runtime::{NemoRelayContextState, global_context}; +use crate::api::subscriber::{deregister_subscriber, flush_subscribers, register_subscriber}; +use crate::error::FlowError; +use crate::json::Json; + +fn reset_global() { + crate::shared_runtime::reset_runtime_owner_for_tests(); + let context = global_context(); + *context.write().unwrap() = NemoRelayContextState::new(); +} + +#[test] +fn tool_call_execute_adds_otel_status_metadata_to_end_events() { + reset_global(); + + let captured_events = Arc::new(Mutex::new(Vec::<(String, Option)>::new())); + let subscriber_events = captured_events.clone(); + register_subscriber( + "tool-status-metadata", + Arc::new(move |event| { + if event.scope_category() == Some(ScopeCategory::End) { + subscriber_events + .lock() + .unwrap() + .push((event.name().to_string(), event.metadata().cloned())); + } + }), + ) + .unwrap(); + + let runtime = tokio::runtime::Runtime::new().unwrap(); + runtime.block_on(async { + let result = tool_call_execute( + ToolCallExecuteParams::builder() + .name("tool-ok") + .args(json!({"value": 1})) + .func(Arc::new(|_args| { + Box::pin(async { Ok(json!({"ok": true})) }) + })) + .metadata(json!({"caller": "tool-ok", "otel.status_code": "USER"})) + .build(), + ) + .await + .unwrap(); + assert_eq!(result, json!({"ok": true})); + + let error = tool_call_execute( + ToolCallExecuteParams::builder() + .name("tool-error") + .args(json!({"value": 2})) + 
.func(Arc::new(|_args| { + Box::pin(async { Err(FlowError::Internal("tool boom".to_string())) }) + })) + .metadata(json!({"caller": "tool-error"})) + .build(), + ) + .await + .unwrap_err(); + assert!(error.to_string().contains("tool boom")); + }); + + flush_subscribers().unwrap(); + assert!(deregister_subscriber("tool-status-metadata").unwrap()); + + let events = captured_events.lock().unwrap(); + let metadata_for = |name: &str| { + events + .iter() + .find(|event| event.0 == name) + .and_then(|event| event.1.as_ref()) + .unwrap_or_else(|| panic!("missing end event metadata for {name}")) + }; + + let success_metadata = metadata_for("tool-ok"); + assert_eq!(success_metadata["caller"], json!("tool-ok")); + assert_eq!(success_metadata["otel.status_code"], json!("OK")); + assert!(success_metadata.get("otel.status_message").is_none()); + + let error_metadata = metadata_for("tool-error"); + assert_eq!(error_metadata["caller"], json!("tool-error")); + assert_eq!(error_metadata["otel.status_code"], json!("ERROR")); + assert!( + error_metadata["otel.status_message"] + .as_str() + .unwrap() + .contains("tool boom") + ); +} diff --git a/crates/ffi/nemo_relay.h b/crates/ffi/nemo_relay.h index d7fdb94d..a1a1d27e 100644 --- a/crates/ffi/nemo_relay.h +++ b/crates/ffi/nemo_relay.h @@ -1417,21 +1417,25 @@ NemoRelayStatus nemo_relay_push_scope(const char *name, * - `handle`: The current top-of-stack scope handle to pop. * - `output_json`: Optional null-terminated JSON string exported as semantic * scope output on the end event, or null. + * - `metadata_json`: Optional null-terminated JSON metadata string recorded + * on the end event, or null. Incoming metadata is merged over metadata + * stored on the scope handle. * - `timestamp_unix_micros`: Optional Unix microseconds timestamp for the end * event, or null to use the runtime default end timestamp. 
* * # Errors - * Returns `InvalidJson` for invalid output JSON, `InvalidArg` when + * Returns `InvalidJson` for invalid output or metadata JSON, `InvalidArg` when * `timestamp_unix_micros` is outside the supported timestamp range, or an * error status when `handle` is not the current top scope. * * # Safety - * `handle` must be a valid, non-null `FfiScopeHandle` pointer. `output_json` and - * `timestamp_unix_micros` may be null; when non-null, optional pointers must - * be valid for reads for the duration of the call. + * `handle` must be a valid, non-null `FfiScopeHandle` pointer. Optional + * pointer arguments may be null; when non-null, they must be valid for reads + * for the duration of the call. */ NemoRelayStatus nemo_relay_pop_scope(const struct FfiScopeHandle *handle, const char *output_json, + const char *metadata_json, const int64_t *timestamp_unix_micros); /** diff --git a/crates/ffi/src/api/scope.rs b/crates/ffi/src/api/scope.rs index 7b3cfdec..6e956102 100644 --- a/crates/ffi/src/api/scope.rs +++ b/crates/ffi/src/api/scope.rs @@ -139,22 +139,26 @@ pub unsafe extern "C" fn nemo_relay_push_scope( /// - `handle`: The current top-of-stack scope handle to pop. /// - `output_json`: Optional null-terminated JSON string exported as semantic /// scope output on the end event, or null. +/// - `metadata_json`: Optional null-terminated JSON metadata string recorded +/// on the end event, or null. Incoming metadata is merged over metadata +/// stored on the scope handle. /// - `timestamp_unix_micros`: Optional Unix microseconds timestamp for the end /// event, or null to use the runtime default end timestamp. /// /// # Errors -/// Returns `InvalidJson` for invalid output JSON, `InvalidArg` when +/// Returns `InvalidJson` for invalid output or metadata JSON, `InvalidArg` when /// `timestamp_unix_micros` is outside the supported timestamp range, or an /// error status when `handle` is not the current top scope. 
/// /// # Safety -/// `handle` must be a valid, non-null `FfiScopeHandle` pointer. `output_json` and -/// `timestamp_unix_micros` may be null; when non-null, optional pointers must -/// be valid for reads for the duration of the call. +/// `handle` must be a valid, non-null `FfiScopeHandle` pointer. Optional +/// pointer arguments may be null; when non-null, they must be valid for reads +/// for the duration of the call. #[unsafe(no_mangle)] pub unsafe extern "C" fn nemo_relay_pop_scope( handle: *const FfiScopeHandle, output_json: *const c_char, + metadata_json: *const c_char, timestamp_unix_micros: *const i64, ) -> NemoRelayStatus { clear_last_error(); @@ -166,6 +170,10 @@ pub unsafe extern "C" fn nemo_relay_pop_scope( Some(v) => v, None => return NemoRelayStatus::InvalidJson, }; + let metadata = match c_str_to_opt_json(metadata_json) { + Some(v) => v, + None => return NemoRelayStatus::InvalidJson, + }; let timestamp = match unix_micros_to_opt_timestamp(timestamp_unix_micros) { Some(v) => v, None => return NemoRelayStatus::InvalidArg, @@ -174,6 +182,7 @@ pub unsafe extern "C" fn nemo_relay_pop_scope( core_scope_api::PopScopeParams::builder() .handle_uuid(&unsafe { &*handle }.0.uuid) .output_opt(output) + .metadata_opt(metadata) .timestamp_opt(timestamp) .build(), ) { diff --git a/crates/ffi/tests/integration/api_tests.rs b/crates/ffi/tests/integration/api_tests.rs index 5a0af1dd..5f38fbcb 100644 --- a/crates/ffi/tests/integration/api_tests.rs +++ b/crates/ffi/tests/integration/api_tests.rs @@ -110,7 +110,7 @@ unsafe fn nemo_relay_pop_scope( handle: *const FfiScopeHandle, output_json: *const c_char, ) -> NemoRelayStatus { - unsafe { api::nemo_relay_pop_scope(handle, output_json, ptr::null()) } + unsafe { api::nemo_relay_pop_scope(handle, output_json, ptr::null(), ptr::null()) } } unsafe fn nemo_relay_event( diff --git a/crates/ffi/tests/unit/api/core_tests.rs b/crates/ffi/tests/unit/api/core_tests.rs index d7506060..08ce98c2 100644 --- 
a/crates/ffi/tests/unit/api/core_tests.rs +++ b/crates/ffi/tests/unit/api/core_tests.rs @@ -557,6 +557,81 @@ fn test_ffi_error_paths_and_scope_stack() { } } +#[test] +fn test_ffi_pop_scope_merges_scope_metadata() { + let _lock = TEST_MUTEX.lock().unwrap_or_else(|e| e.into_inner()); + reset_globals(); + + unsafe { + let stack = fresh_scope_stack(); + let subscriber_name = unique_name("ffi_scope_end_metadata_subscriber"); + let subscriber_name_c = cstring(&subscriber_name); + assert_eq!( + nemo_relay_register_subscriber( + subscriber_name_c.as_ptr(), + subscriber_cb, + ptr::null_mut(), + None, + ), + NemoRelayStatus::Ok + ); + + let scope_name = cstring("ffi_scope_end_metadata"); + let scope_metadata = cstring(r#"{"a":1,"b":2,"c":3}"#); + let end_metadata = cstring(r#"{"c":3.5,"d":4}"#); + let invalid_end_metadata = cstring("{"); + let mut scope = ptr::null_mut(); + assert_eq!( + nemo_relay_push_scope( + scope_name.as_ptr(), + NemoRelayScopeType::Function, + ptr::null(), + 0, + ptr::null(), + scope_metadata.as_ptr(), + ptr::null(), + &mut scope, + ), + NemoRelayStatus::Ok + ); + assert_eq!( + api::nemo_relay_pop_scope( + scope, + ptr::null(), + invalid_end_metadata.as_ptr(), + ptr::null() + ), + NemoRelayStatus::InvalidJson + ); + assert_eq!( + api::nemo_relay_pop_scope(scope, ptr::null(), end_metadata.as_ptr(), ptr::null(),), + NemoRelayStatus::Ok + ); + assert_eq!(nemo_relay_flush_subscribers(), NemoRelayStatus::Ok); + + let events = lock_unpoisoned(event_log()).clone(); + let end_event = events + .iter() + .find(|event| { + event["json"]["kind"] == json!("scope") + && event["json"]["name"] == json!("ffi_scope_end_metadata") + && event["json"]["scope_category"] == json!("end") + }) + .unwrap(); + assert_eq!( + end_event["metadata"], + json!({"a": 1, "b": 2, "c": 3.5, "d": 4}) + ); + + assert_eq!( + nemo_relay_deregister_subscriber(subscriber_name_c.as_ptr()), + NemoRelayStatus::Ok + ); + nemo_relay_scope_handle_free(scope); + nemo_relay_scope_stack_free(stack); + } 
+} + #[test] fn test_ffi_event_json_null_pointer_returns_null() { unsafe { @@ -868,7 +943,7 @@ fn test_ffi_manual_lifecycle_timestamps_accept_unix_micros() { ); assert_eq!( - api::nemo_relay_pop_scope(scope, ptr::null(), &timestamps[6]), + api::nemo_relay_pop_scope(scope, ptr::null(), ptr::null(), &timestamps[6]), NemoRelayStatus::Ok ); @@ -1077,10 +1152,11 @@ fn test_ffi_manual_lifecycle_timestamps_reject_out_of_range_unix_micros() { assert_invalid_timestamp(api::nemo_relay_pop_scope( scope, ptr::null(), + ptr::null(), &invalid_timestamp, )); assert_eq!( - api::nemo_relay_pop_scope(scope, ptr::null(), ptr::null()), + api::nemo_relay_pop_scope(scope, ptr::null(), ptr::null(), ptr::null()), NemoRelayStatus::Ok ); diff --git a/crates/ffi/tests/unit/api_tests.rs b/crates/ffi/tests/unit/api_tests.rs index 2f3af3b2..8f4e728a 100644 --- a/crates/ffi/tests/unit/api_tests.rs +++ b/crates/ffi/tests/unit/api_tests.rs @@ -98,7 +98,7 @@ unsafe fn nemo_relay_pop_scope( handle: *const FfiScopeHandle, output_json: *const c_char, ) -> NemoRelayStatus { - unsafe { api::nemo_relay_pop_scope(handle, output_json, ptr::null()) } + unsafe { api::nemo_relay_pop_scope(handle, output_json, ptr::null(), ptr::null()) } } unsafe fn nemo_relay_event( diff --git a/crates/node/src/api/mod.rs b/crates/node/src/api/mod.rs index 6194490a..855190a9 100644 --- a/crates/node/src/api/mod.rs +++ b/crates/node/src/api/mod.rs @@ -92,6 +92,21 @@ fn parse_string_map( Ok(out) } +fn otel_status_metadata(status_code: &'static str, status_message: Option<String>) -> Json { + let mut metadata = serde_json::Map::new(); + metadata.insert( + "otel.status_code".to_string(), + Json::String(status_code.to_string()), + ); + if let Some(status_message) = status_message { + metadata.insert( + "otel.status_message".to_string(), + Json::String(status_message), + ); + } + Json::Object(metadata) +} + fn build_otel_config( options: Option, ) -> napi::Result { @@ -1274,15 +1289,22 @@ pub fn push_scope( /// Optional `output` is a semantic JSON
payload exported on the scope end event. /// Optional `timestamp` is a Unix timestamp in microseconds recorded on the end event. /// It must be a safe integer number; omit it to use the runtime default end timestamp. +/// Optional `metadata` is a JSON metadata payload recorded on the scope end event. /// Throws if the handle does not match the current top scope. #[napi] -pub fn pop_scope(handle: &ScopeHandle, output: Option, timestamp: Option) -> Result<()> { +pub fn pop_scope( + handle: &ScopeHandle, + output: Option, + timestamp: Option, + metadata: Option, +) -> Result<()> { let timestamp = parse_timestamp_micros(timestamp)?; core_scope_api::pop_scope( core_scope_api::PopScopeParams::builder() .handle_uuid(&handle.inner.uuid) .output_opt(opt_json(output)) .timestamp_opt(timestamp) + .metadata_opt(opt_json(metadata)) .build(), ) .map_err(to_napi_err)?; @@ -1343,13 +1365,17 @@ pub fn with_scope( // Create a promise-aware wrapper so we handle both sync and async callbacks. let pa_fn = std::sync::Arc::new( crate::promise_call::PromiseAwareFn::new(&env, &callback).map_err(|e| { - // Pop scope before propagating error + let status_message = format!("failed to create PromiseAwareFn: {e}"); let _ = core_scope_api::pop_scope( core_scope_api::PopScopeParams::builder() .handle_uuid(&scope_uuid) + .metadata_opt(Some(otel_status_metadata( + "ERROR", + Some(status_message.clone()), + ))) .build(), ); - napi::Error::from_reason(format!("failed to create PromiseAwareFn: {e}")) + napi::Error::from_reason(status_message) })?, ); @@ -1369,10 +1395,15 @@ pub fn with_scope( }); let result = pa_fn.call_with_arg0(build_handle).await; + let metadata = match &result { + Ok(_) => otel_status_metadata("OK", None), + Err(error) => otel_status_metadata("ERROR", Some(error.to_string())), + }; // Always pop the scope, even on error. 
if core_scope_api::pop_scope( core_scope_api::PopScopeParams::builder() .handle_uuid(&scope_uuid) + .metadata_opt(Some(metadata)) .build(), ) .is_ok() diff --git a/crates/node/tests/llm_tests.mjs b/crates/node/tests/llm_tests.mjs index 16a3c4ff..654b4aa7 100644 --- a/crates/node/tests/llm_tests.mjs +++ b/crates/node/tests/llm_tests.mjs @@ -32,6 +32,7 @@ const { deregisterLlmStreamExecutionIntercept, registerSubscriber, deregisterSubscriber, + flushSubscribers, ScopeType, } = lib; @@ -42,6 +43,13 @@ function rejectWith(value) { return Promise.reject(value); } +async function flushSubscriberCallbacks() { + flushSubscribers(); + for (let i = 0; i < 10; i += 1) { + await new Promise((resolve) => setImmediate(resolve)); + } +} + function makeNative() { return { headers: {}, @@ -173,6 +181,71 @@ describe('LLM execute', () => { assert.equal(result, null); }); + it('execute records OTEL status metadata on end events', async () => { + const events = []; + registerSubscriber('node_llm_status_metadata_sub', (e) => events.push(e)); + try { + const result = await llmCallExecute( + 'exec_status_ok_llm', + makeNative(), + () => ({ + response: 'ok', + }), + null, + null, + null, + { + caller: 'node-llm', + }, + null, + ); + assert.deepEqual(result, { + response: 'ok', + }); + + await assert.rejects( + () => + llmCallExecuteAsync( + 'exec_status_error_llm', + makeNative(), + async () => { + throw new Error('llm status failure'); + }, + null, + null, + null, + { + caller: 'node-llm-error', + }, + null, + ), + /llm status failure/, + ); + + await flushSubscriberCallbacks(); + const okEnd = events.find( + (e) => + e.name === 'exec_status_ok_llm' && e.kind === 'scope' && e.category === 'llm' && e.scope_category === 'end', + ); + const errorEnd = events.find( + (e) => + e.name === 'exec_status_error_llm' && + e.kind === 'scope' && + e.category === 'llm' && + e.scope_category === 'end', + ); + assert.ok(okEnd, 'expected successful llm end event'); + assert.equal(okEnd.metadata.caller, 
'node-llm'); + assert.equal(okEnd.metadata['otel.status_code'], 'OK'); + assert.ok(errorEnd, 'expected failed llm end event'); + assert.equal(errorEnd.metadata.caller, 'node-llm-error'); + assert.equal(errorEnd.metadata['otel.status_code'], 'ERROR'); + assert.match(errorEnd.metadata['otel.status_message'], /llm status failure/); + } finally { + deregisterSubscriber('node_llm_status_metadata_sub'); + } + }); + it('async execute awaits Promise-returning callbacks', async () => { const result = await llmCallExecuteAsync( 'exec_async_llm', diff --git a/crates/node/tests/scope_tests.mjs b/crates/node/tests/scope_tests.mjs index 743204a3..c14f2473 100644 --- a/crates/node/tests/scope_tests.mjs +++ b/crates/node/tests/scope_tests.mjs @@ -97,6 +97,24 @@ describe('Scope operations', () => { popScope(scope); } }); + + it('popScope merges end metadata over scope metadata', async () => { + const events = []; + registerSubscriber('node_scope_pop_metadata_sub', (e) => events.push(e)); + try { + const scope = pushScope('pop_metadata_scope', ScopeType.Agent, null, null, null, { a: 1, b: 2, c: 3 }); + popScope(scope, null, null, { c: 3.5, d: 4 }); + await flushSubscriberCallbacks(); + + const end = events.find( + (e) => e.name === 'pop_metadata_scope' && e.kind === 'scope' && e.scope_category === 'end', + ); + assert.ok(end, 'expected scope end event'); + assert.deepEqual(end.metadata, { a: 1, b: 2, c: 3.5, d: 4 }); + } finally { + deregisterSubscriber('node_scope_pop_metadata_sub'); + } + }); }); // =========================================================================== @@ -183,6 +201,27 @@ describe('withScope', () => { }); }); + it('records OK status metadata on successful auto-pop', async () => { + const events = []; + registerSubscriber('node_with_scope_ok_status_sub', (e) => events.push(e)); + try { + await withScope('with_scope_ok_status', ScopeType.Function, () => ({ ok: true }), null, null, null, { + caller: 'node', + }); + await flushSubscriberCallbacks(); + + const 
end = events.find( + (e) => e.name === 'with_scope_ok_status' && e.kind === 'scope' && e.scope_category === 'end', + ); + assert.ok(end, 'expected scope end event'); + assert.equal(end.metadata.caller, 'node'); + assert.equal(end.metadata['otel.status_code'], 'OK'); + assert.equal(Object.hasOwn(end.metadata, 'otel.status_message'), false); + } finally { + deregisterSubscriber('node_with_scope_ok_status_sub'); + } + }); + it('pops scope on synchronous throw', async () => { const before = getHandle(); await assert.rejects( @@ -210,6 +249,30 @@ describe('withScope', () => { assert.equal(after.uuid, before.uuid, 'scope should be popped after rejection'); }); + it('records ERROR status metadata on failed auto-pop', async () => { + const events = []; + registerSubscriber('node_with_scope_error_status_sub', (e) => events.push(e)); + try { + await assert.rejects( + () => + withScope('with_scope_error_status', ScopeType.Tool, async () => { + throw new Error('node status failure'); + }), + /node status failure/, + ); + await flushSubscriberCallbacks(); + + const end = events.find( + (e) => e.name === 'with_scope_error_status' && e.kind === 'scope' && e.scope_category === 'end', + ); + assert.ok(end, 'expected scope end event'); + assert.equal(end.metadata['otel.status_code'], 'ERROR'); + assert.match(end.metadata['otel.status_message'], /node status failure/); + } finally { + deregisterSubscriber('node_with_scope_error_status_sub'); + } + }); + it('surfaces primitive rejection values as unknown error and still pops the scope', async () => { const before = getHandle(); await assert.rejects( diff --git a/crates/node/tests/tools_tests.mjs b/crates/node/tests/tools_tests.mjs index 6566a2b8..2bb9354f 100644 --- a/crates/node/tests/tools_tests.mjs +++ b/crates/node/tests/tools_tests.mjs @@ -29,6 +29,7 @@ const { deregisterToolExecutionIntercept, registerSubscriber, deregisterSubscriber, + flushSubscribers, ScopeType, } = lib; @@ -38,6 +39,13 @@ function rejectWithPrimitive(value) 
{ return Promise.reject(value); } +async function flushSubscriberCallbacks() { + flushSubscribers(); + for (let i = 0; i < 10; i += 1) { + await new Promise((resolve) => setImmediate(resolve)); + } +} + // =========================================================================== // Tool lifecycle // =========================================================================== @@ -237,6 +245,71 @@ describe('Tool execute', () => { }); }); + it('execute records OTEL status metadata on end events', async () => { + const events = []; + registerSubscriber('node_tool_status_metadata_sub', (e) => events.push(e)); + try { + const result = await toolCallExecute( + 'exec_status_ok_tool', + { + x: 1, + }, + (args) => ({ + result: args.x + 1, + }), + null, + null, + null, + { + caller: 'node-tool', + }, + ); + assert.deepEqual(result, { + result: 2, + }); + + await assert.rejects( + () => + toolCallExecuteAsync( + 'exec_status_error_tool', + {}, + async () => { + throw new Error('tool status failure'); + }, + null, + null, + null, + { + caller: 'node-tool-error', + }, + ), + /tool status failure/, + ); + + await flushSubscriberCallbacks(); + const okEnd = events.find( + (e) => + e.name === 'exec_status_ok_tool' && e.kind === 'scope' && e.category === 'tool' && e.scope_category === 'end', + ); + const errorEnd = events.find( + (e) => + e.name === 'exec_status_error_tool' && + e.kind === 'scope' && + e.category === 'tool' && + e.scope_category === 'end', + ); + assert.ok(okEnd, 'expected successful tool end event'); + assert.equal(okEnd.metadata.caller, 'node-tool'); + assert.equal(okEnd.metadata['otel.status_code'], 'OK'); + assert.ok(errorEnd, 'expected failed tool end event'); + assert.equal(errorEnd.metadata.caller, 'node-tool-error'); + assert.equal(errorEnd.metadata['otel.status_code'], 'ERROR'); + assert.match(errorEnd.metadata['otel.status_message'], /tool status failure/); + } finally { + deregisterSubscriber('node_tool_status_metadata_sub'); + } + }); + it('async 
execute awaits Promise-returning callbacks', async () => { const result = await toolCallExecuteAsync( 'exec_async_tool', diff --git a/crates/python/src/py_api/mod.rs b/crates/python/src/py_api/mod.rs index 43b53d47..9abb996d 100644 --- a/crates/python/src/py_api/mod.rs +++ b/crates/python/src/py_api/mod.rs @@ -252,27 +252,32 @@ fn push_scope( /// Args: /// handle: The current top-of-stack scope handle returned by ``push``. /// output: Optional JSON-serializable semantic output payload for the scope end event. +/// metadata: Optional JSON-serializable metadata to append to the metadata set when the scope was created. /// timestamp: Optional timezone-aware ``datetime.datetime`` for the emitted end event. /// When omitted, the runtime default end timestamp is used. /// /// Raises: /// RuntimeError: If the scope is not the current top scope or is not found /// on the stack. +/// ValueError: If ``output`` or ``metadata`` cannot be converted to JSON-compatible data. /// TypeError: If ``timestamp`` is not a ``datetime.datetime``. /// ValueError: If ``timestamp`` is a naive datetime. 
#[pyfunction] -#[pyo3(signature = (handle: "ScopeHandle", output: "object | None"=None, timestamp: "datetime.datetime | None"=None) -> "None", text_signature = "(handle: ScopeHandle, output: object | None = None, timestamp: datetime.datetime | None = None) -> None")] +#[pyo3(signature = (handle: "ScopeHandle", output: "object | None"=None, metadata: "object | None"=None, timestamp: "datetime.datetime | None"=None) -> "None", text_signature = "(handle: ScopeHandle, output: object | None = None, metadata: object | None = None, timestamp: datetime.datetime | None = None) -> None")] fn pop_scope( handle: &PyScopeHandle, output: Option<&Bound<'_, PyAny>>, + metadata: Option<&Bound<'_, PyAny>>, timestamp: Option<&Bound<'_, PyAny>>, ) -> PyResult<()> { let output = opt_py_to_json(output)?; + let metadata = opt_py_to_json(metadata)?; let timestamp = opt_py_to_timestamp(timestamp)?; core_scope_api::pop_scope( core_scope_api::PopScopeParams::builder() .handle_uuid(&handle.inner.uuid) .output_opt(output) + .metadata_opt(metadata) .timestamp_opt(timestamp) .build(), ) @@ -465,6 +470,8 @@ fn tool_call_end( /// attributes: Optional ``ToolAttributes`` bitflags. /// data: Optional JSON-serializable application data. /// metadata: Optional JSON-serializable metadata. +/// End events receive ``otel.status_code = "OK"`` on success, or +/// ``otel.status_code = "ERROR"`` and ``otel.status_message`` on error. /// Returns: /// An awaitable that resolves to the tool result after execution /// intercepts. Sanitize guardrails do not rewrite the value returned to @@ -675,6 +682,8 @@ fn llm_call_end( /// attributes: Optional ``LlmAttributes`` bitflags. /// data: Optional JSON-serializable application data. /// metadata: Optional JSON-serializable metadata. +/// End events receive ``otel.status_code = "OK"`` on success, or +/// ``otel.status_code = "ERROR"`` and ``otel.status_message`` on error. /// model_name: Optional normalized model name recorded in emitted LLM events. 
/// codec: Optional request codec used for annotated-aware request intercepts. /// response_codec: Optional response codec used to attach annotated response data diff --git a/crates/python/tests/coverage/py_api_coverage_tests.rs b/crates/python/tests/coverage/py_api_coverage_tests.rs index c3d86e06..03542b8e 100644 --- a/crates/python/tests/coverage/py_api_coverage_tests.rs +++ b/crates/python/tests/coverage/py_api_coverage_tests.rs @@ -134,7 +134,7 @@ fn py_api_helpers_and_scope_lifecycle_round_trip() { ) .unwrap(); - pop_scope(&child, None, None).unwrap(); + pop_scope(&child, None, None, None).unwrap(); assert_eq!(get_handle().unwrap().inner.name, "root"); }); } @@ -780,7 +780,7 @@ async def run_stream(api, request, func, collector, finalizer, handle, attribute assert!(deregister_subscriber(&global_subscriber).unwrap()); assert!(!deregister_subscriber(&global_subscriber).unwrap()); - pop_scope(&child, None, None).unwrap(); + pop_scope(&child, None, None, None).unwrap(); }); } diff --git a/crates/wasm/src/api/mod.rs b/crates/wasm/src/api/mod.rs index 7b2e04ac..43903216 100644 --- a/crates/wasm/src/api/mod.rs +++ b/crates/wasm/src/api/mod.rs @@ -30,6 +30,7 @@ use std::sync::{Arc, Mutex}; use js_sys::{Function, Reflect}; use serde::{Deserialize, Serialize}; +use serde_json::Value as Json; use uuid::Uuid; use wasm_bindgen::JsCast; use wasm_bindgen::closure::Closure; @@ -70,6 +71,37 @@ use crate::stream::LlmStream; pub use crate::types::{LLM_STATEFUL, LLM_STREAMING, SCOPE_PARALLEL, TOOL_REMOTE}; use crate::types::{LlmHandle, ScopeHandle, ScopeStack, ScopeType, ToolHandle}; +fn otel_status_metadata(status_code: &'static str, status_message: Option<String>) -> Json { + let mut metadata = serde_json::Map::new(); + metadata.insert( + "otel.status_code".to_string(), + Json::String(status_code.to_string()), + ); + if let Some(status_message) = status_message { + metadata.insert( + "otel.status_message".to_string(), + Json::String(status_message), + ); + } + Json::Object(metadata)
+} + +fn js_error_message(error: &JsValue) -> String { + if let Some(message) = error.as_string() { + return message; + } + if let Ok(message) = Reflect::get(error, &JsValue::from_str("message")) + && let Some(message) = message.as_string() + { + return message; + } + js_sys::JSON::stringify(error) + .ok() + .and_then(|value| value.as_string()) + .filter(|value| value != "{}") + .unwrap_or_else(|| "JavaScript callback failed".to_string()) +} + #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] struct WasmOpenTelemetryConfig { @@ -394,6 +426,7 @@ pub fn push_scope( /// Optional `output` is a semantic JSON payload exported on the scope end event. /// Optional `timestamp` is a Unix microseconds timestamp recorded on the scope end event. /// It must be a safe integer number; omitted values use the runtime default end timestamp. +/// Optional `metadata` is a JSON metadata payload recorded on the scope end event. /// /// Throws if the handle does not match the current top of the stack. #[wasm_bindgen(js_name = "popScope")] @@ -401,6 +434,7 @@ pub fn pop_scope( handle: &ScopeHandle, #[wasm_bindgen(unchecked_param_type = "Json | null | undefined")] output: JsValue, #[wasm_bindgen(unchecked_param_type = "number | null | undefined")] timestamp: Option, + #[wasm_bindgen(unchecked_param_type = "Json | null | undefined")] metadata: JsValue, ) -> Result<(), JsValue> { let timestamp = opt_js_to_timestamp_micros(timestamp)?; relay_scope_api::pop_scope( @@ -408,6 +442,7 @@ pub fn pop_scope( .handle_uuid(&handle.inner.uuid) .output_opt(opt_js_to_json(&output)?) .timestamp_opt(timestamp) + .metadata_opt(opt_js_to_json(&metadata)?) 
.build(), ) .map_err(to_js_err) @@ -493,6 +528,7 @@ pub fn with_scope( let _ = relay_scope_api::pop_scope( relay_scope_api::PopScopeParams::builder() .handle_uuid(&then_uuid) + .metadata_opt(Some(otel_status_metadata("OK", None))) .build(), ); resolved @@ -503,6 +539,10 @@ pub fn with_scope( let _ = relay_scope_api::pop_scope( relay_scope_api::PopScopeParams::builder() .handle_uuid(&catch_uuid) + .metadata_opt(Some(otel_status_metadata( + "ERROR", + Some(js_error_message(&rejected)), + ))) .build(), ); // Re-throw by returning a rejected promise @@ -522,6 +562,7 @@ pub fn with_scope( let _ = relay_scope_api::pop_scope( relay_scope_api::PopScopeParams::builder() .handle_uuid(&scope_uuid) + .metadata_opt(Some(otel_status_metadata("OK", None))) .build(), ); Ok(js_sys::Promise::resolve(&val)) @@ -531,6 +572,10 @@ pub fn with_scope( let _ = relay_scope_api::pop_scope( relay_scope_api::PopScopeParams::builder() .handle_uuid(&scope_uuid) + .metadata_opt(Some(otel_status_metadata( + "ERROR", + Some(js_error_message(&err)), + ))) .build(), ); Err(err) diff --git a/crates/wasm/tests-js/llm_tests.mjs b/crates/wasm/tests-js/llm_tests.mjs index e0fa47da..3b869a39 100644 --- a/crates/wasm/tests-js/llm_tests.mjs +++ b/crates/wasm/tests-js/llm_tests.mjs @@ -318,6 +318,73 @@ test('WebAssembly llm execute flow works from the generated Node package', async } }); +test('WebAssembly llmCallExecute adds OTEL status metadata to end events', async () => { + const stack = resetScopeStack(); + const events = []; + const subscriberName = unique('wasm_llm_status_sub'); + + wasm.registerSubscriber(subscriberName, (event) => events.push(event)); + + try { + await wasm.llmCallExecute( + 'wasm_llm_status_ok', + makeLlmRequest(), + async () => ({ + role: 'assistant', + content: 'ok', + tool_calls: [], + }), + null, + 0, + null, + { + caller: 'wasm-llm', + 'otel.status_code': 'USER', + }, + 'demo-model', + ); + + await assert.rejects( + () => + wasm.llmCallExecute( + 'wasm_llm_status_error', + 
makeLlmRequest(), + async () => { + throw new Error('wasm llm failure'); + }, + null, + 0, + null, + { + caller: 'wasm-llm-error', + }, + 'demo-model', + ), + /wasm llm failure/, + ); + + const okEvent = await waitFor(() => + events.find( + (event) => event.kind === 'scope' && event.scope_category === 'end' && event.name === 'wasm_llm_status_ok', + ), + ); + const errorEvent = await waitFor(() => + events.find( + (event) => event.kind === 'scope' && event.scope_category === 'end' && event.name === 'wasm_llm_status_error', + ), + ); + + assert.equal(okEvent.metadata.caller, 'wasm-llm'); + assert.equal(okEvent.metadata['otel.status_code'], 'OK'); + assert.equal(errorEvent.metadata.caller, 'wasm-llm-error'); + assert.equal(errorEvent.metadata['otel.status_code'], 'ERROR'); + assert.match(errorEvent.metadata['otel.status_message'], /wasm llm failure/); + } finally { + wasm.deregisterSubscriber(subscriberName); + stack.free(); + } +}); + test('WebAssembly llm stream flow works from the generated Node package', async () => { const request = { headers: { diff --git a/crates/wasm/tests-js/scope_tests.mjs b/crates/wasm/tests-js/scope_tests.mjs index c6b263a5..36dca863 100644 --- a/crates/wasm/tests-js/scope_tests.mjs +++ b/crates/wasm/tests-js/scope_tests.mjs @@ -4,7 +4,14 @@ import assert from 'node:assert/strict'; import { test } from 'node:test'; -import { currentScope, resetScopeStack, SCOPE_ATTR_PARALLEL, SCOPE_ATTR_RELOCATABLE, wasm } from './test_support.mjs'; +import { + currentScope, + resetScopeStack, + SCOPE_ATTR_PARALLEL, + SCOPE_ATTR_RELOCATABLE, + waitFor, + wasm, +} from './test_support.mjs'; test('WebAssembly scope stack exposes the generated root scope handle', () => { const stack = resetScopeStack(); @@ -81,6 +88,51 @@ test('WebAssembly pushScope supports nullable inputs and root parent handles', ( } }); +test('WebAssembly popScope accepts end metadata and merges with scope metadata', async () => { + const stack = resetScopeStack(); + const events = []; + 
const subscriberName = 'wasm_scope_end_metadata_sub'; + let scope; + let popped = false; + + wasm.deregisterSubscriber(subscriberName); + wasm.registerSubscriber(subscriberName, (event) => events.push(event)); + + try { + scope = wasm.pushScope('wasm_scope_end_metadata', wasm.ScopeType.Function, null, 0, null, { + a: 1, + b: 2, + c: 3, + }); + wasm.popScope(scope, null, undefined, { + c: 3.5, + d: 4, + }); + popped = true; + + const endEvent = await waitFor(() => + events.find( + (event) => event.kind === 'scope' && event.scope_category === 'end' && event.name === 'wasm_scope_end_metadata', + ), + ); + assert.deepEqual(endEvent.metadata, { + a: 1, + b: 2, + c: 3.5, + d: 4, + }); + } finally { + wasm.deregisterSubscriber(subscriberName); + if (scope) { + if (!popped) { + wasm.popScope(scope); + } + scope.free(); + } + stack.free(); + } +}); + test('WebAssembly withScope returns callback data for synchronous callbacks', async () => { const stack = resetScopeStack(); @@ -111,6 +163,63 @@ test('WebAssembly withScope returns callback data for synchronous callbacks', as } }); +test('WebAssembly withScope records OK status metadata', async () => { + const stack = resetScopeStack(); + const events = []; + const subscriberName = 'wasm_with_scope_status_ok_sub'; + + wasm.deregisterSubscriber(subscriberName); + wasm.registerSubscriber(subscriberName, (event) => events.push(event)); + + try { + const result = await wasm.withScope('wasm_with_scope_status_ok', wasm.ScopeType.Function, () => 'done'); + assert.equal(result, 'done'); + + const endEvent = await waitFor(() => + events.find( + (event) => + event.kind === 'scope' && event.scope_category === 'end' && event.name === 'wasm_with_scope_status_ok', + ), + ); + assert.equal(endEvent.metadata['otel.status_code'], 'OK'); + assert.equal(Object.hasOwn(endEvent.metadata, 'otel.status_message'), false); + } finally { + wasm.deregisterSubscriber(subscriberName); + stack.free(); + } +}); + +test('WebAssembly withScope records ERROR 
status metadata on rejection', async () => { + const stack = resetScopeStack(); + const events = []; + const subscriberName = 'wasm_with_scope_status_error_sub'; + + wasm.deregisterSubscriber(subscriberName); + wasm.registerSubscriber(subscriberName, (event) => events.push(event)); + + try { + await assert.rejects( + () => + wasm.withScope('wasm_with_scope_status_error', wasm.ScopeType.Tool, async () => { + throw new Error('wasm scope failure'); + }), + /wasm scope failure/, + ); + + const endEvent = await waitFor(() => + events.find( + (event) => + event.kind === 'scope' && event.scope_category === 'end' && event.name === 'wasm_with_scope_status_error', + ), + ); + assert.equal(endEvent.metadata['otel.status_code'], 'ERROR'); + assert.match(endEvent.metadata['otel.status_message'], /wasm scope failure/); + } finally { + wasm.deregisterSubscriber(subscriberName); + stack.free(); + } +}); + test('WebAssembly withScope supports async callbacks', async () => { const stack = resetScopeStack(); diff --git a/crates/wasm/tests-js/tools_tests.mjs b/crates/wasm/tests-js/tools_tests.mjs index d0756520..2ddf2f1f 100644 --- a/crates/wasm/tests-js/tools_tests.mjs +++ b/crates/wasm/tests-js/tools_tests.mjs @@ -161,3 +161,70 @@ test('WebAssembly tool execute runs through the generated Node package flow', as wasm.deregisterToolRequestIntercept(toolInterceptName); } }); + +test('WebAssembly toolCallExecute adds OTEL status metadata to end events', async () => { + const stack = resetScopeStack(); + const events = []; + const subscriberName = unique('wasm_tool_status_sub'); + + wasm.registerSubscriber(subscriberName, (event) => events.push(event)); + + try { + await wasm.toolCallExecute( + 'wasm_tool_status_ok', + { + value: 1, + }, + async () => ({ + ok: true, + }), + null, + 0, + null, + { + caller: 'wasm-tool', + 'otel.status_code': 'USER', + }, + ); + + await assert.rejects( + () => + wasm.toolCallExecute( + 'wasm_tool_status_error', + { + value: 2, + }, + async () => { + throw 
new Error('wasm tool failure'); + }, + null, + 0, + null, + { + caller: 'wasm-tool-error', + }, + ), + /wasm tool failure/, + ); + + const okEvent = await waitFor(() => + events.find( + (event) => event.kind === 'scope' && event.scope_category === 'end' && event.name === 'wasm_tool_status_ok', + ), + ); + const errorEvent = await waitFor(() => + events.find( + (event) => event.kind === 'scope' && event.scope_category === 'end' && event.name === 'wasm_tool_status_error', + ), + ); + + assert.equal(okEvent.metadata.caller, 'wasm-tool'); + assert.equal(okEvent.metadata['otel.status_code'], 'OK'); + assert.equal(errorEvent.metadata.caller, 'wasm-tool-error'); + assert.equal(errorEvent.metadata['otel.status_code'], 'ERROR'); + assert.match(errorEvent.metadata['otel.status_message'], /wasm tool failure/); + } finally { + wasm.deregisterSubscriber(subscriberName); + stack.free(); + } +}); diff --git a/crates/wasm/tests/integration/context_tests.rs b/crates/wasm/tests/integration/context_tests.rs index 363d2705..05a505fc 100644 --- a/crates/wasm/tests/integration/context_tests.rs +++ b/crates/wasm/tests/integration/context_tests.rs @@ -34,7 +34,7 @@ fn push_scope( } fn pop_scope(handle: &ScopeHandle) -> Result<(), JsValue> { - nemo_relay_wasm::api::pop_scope(handle, JsValue::NULL, None) + nemo_relay_wasm::api::pop_scope(handle, JsValue::NULL, None, JsValue::NULL) } // =========================================================================== diff --git a/crates/wasm/tests/integration/llm_tests.rs b/crates/wasm/tests/integration/llm_tests.rs index ffbc9b17..fb333736 100644 --- a/crates/wasm/tests/integration/llm_tests.rs +++ b/crates/wasm/tests/integration/llm_tests.rs @@ -49,7 +49,7 @@ fn push_scope( } fn pop_scope(handle: &ScopeHandle) -> Result<(), JsValue> { - nemo_relay_wasm::api::pop_scope(handle, JsValue::NULL, None) + nemo_relay_wasm::api::pop_scope(handle, JsValue::NULL, None, JsValue::NULL) } fn llm_call( diff --git 
a/crates/wasm/tests/integration/scope_local_tests.rs b/crates/wasm/tests/integration/scope_local_tests.rs index 3d1d5d2c..59383b84 100644 --- a/crates/wasm/tests/integration/scope_local_tests.rs +++ b/crates/wasm/tests/integration/scope_local_tests.rs @@ -49,7 +49,7 @@ fn push_scope( } fn pop_scope(handle: &ScopeHandle) -> Result<(), JsValue> { - nemo_relay_wasm::api::pop_scope(handle, JsValue::NULL, None) + nemo_relay_wasm::api::pop_scope(handle, JsValue::NULL, None, JsValue::NULL) } async fn tool_call_execute( diff --git a/crates/wasm/tests/integration/scope_tests.rs b/crates/wasm/tests/integration/scope_tests.rs index 802b2ff0..c0d75330 100644 --- a/crates/wasm/tests/integration/scope_tests.rs +++ b/crates/wasm/tests/integration/scope_tests.rs @@ -67,11 +67,11 @@ fn with_scope( } fn pop_scope(handle: &ScopeHandle) -> Result<(), JsValue> { - nemo_relay_wasm::api::pop_scope(handle, JsValue::NULL, None) + nemo_relay_wasm::api::pop_scope(handle, JsValue::NULL, None, JsValue::NULL) } fn pop_scope_with_output(handle: &ScopeHandle, output: JsValue) -> Result<(), JsValue> { - nemo_relay_wasm::api::pop_scope(handle, output, None) + nemo_relay_wasm::api::pop_scope(handle, output, None, JsValue::NULL) } fn event( @@ -557,6 +557,49 @@ fn test_subscriber_receives_scope_output_payload() { js_sys::eval("delete globalThis.__wasm_scope_end_events").unwrap(); } +#[wasm_bindgen_test] +fn test_pop_scope_merges_end_metadata() { + js_sys::eval("globalThis.__wasm_scope_end_metadata_events = []; true").unwrap(); + let cb = js_fn1( + "event", + "if (event.kind === 'scope' && event.scope_category === 'end') globalThis.__wasm_scope_end_metadata_events.push(event)", + ); + register_subscriber("wasm_scope_end_metadata_collector", cb).unwrap(); + + let scope = push_scope( + "metadata_scope", + ScopeType::Function, + None, + None, + JsValue::NULL, + parse_json(r#"{"a":1,"b":2,"c":3}"#), + ) + .unwrap(); + nemo_relay_wasm::api::pop_scope( + &scope, + JsValue::NULL, + None, + 
parse_json(r#"{"c":3.5,"d":4}"#), + ) + .unwrap(); + + let events = js_sys::eval("globalThis.__wasm_scope_end_metadata_events").unwrap(); + let arr = js_sys::Array::from(&events); + assert_eq!(arr.length(), 1, "Expected one scope end event"); + + let event = arr.get(0); + let metadata = js_sys::Reflect::get(&event, &"metadata".into()).unwrap(); + let metadata_json = serde_wasm_bindgen::from_value::(metadata) + .expect("metadata should be JSON"); + assert_eq!( + metadata_json, + serde_json::json!({"a": 1, "b": 2, "c": 3.5, "d": 4}) + ); + + deregister_subscriber("wasm_scope_end_metadata_collector").unwrap(); + js_sys::eval("delete globalThis.__wasm_scope_end_metadata_events").unwrap(); +} + #[wasm_bindgen_test] fn test_event_mark() { js_sys::eval("globalThis.__wasm_mark_events = []; true").unwrap(); diff --git a/crates/wasm/tests/integration/tools_tests.rs b/crates/wasm/tests/integration/tools_tests.rs index 9befbd22..355a5327 100644 --- a/crates/wasm/tests/integration/tools_tests.rs +++ b/crates/wasm/tests/integration/tools_tests.rs @@ -49,7 +49,7 @@ fn push_scope( } fn pop_scope(handle: &ScopeHandle) -> Result<(), JsValue> { - nemo_relay_wasm::api::pop_scope(handle, JsValue::NULL, None) + nemo_relay_wasm::api::pop_scope(handle, JsValue::NULL, None, JsValue::NULL) } fn tool_call( diff --git a/crates/wasm/tests/unit/api_tests.rs b/crates/wasm/tests/unit/api_tests.rs index 23639fc9..5e75da53 100644 --- a/crates/wasm/tests/unit/api_tests.rs +++ b/crates/wasm/tests/unit/api_tests.rs @@ -277,6 +277,6 @@ fn scope_stack_and_lifecycle_wrappers_round_trip_natively() { ) .unwrap(); - pop_scope(&child, JsValue::NULL, None).unwrap(); + pop_scope(&child, JsValue::NULL, None, JsValue::NULL).unwrap(); assert_eq!(get_handle().unwrap().name(), "root"); } diff --git a/go/nemo_relay/coverage_gap_test.go b/go/nemo_relay/coverage_gap_test.go index ca515444..b3b09c37 100644 --- a/go/nemo_relay/coverage_gap_test.go +++ b/go/nemo_relay/coverage_gap_test.go @@ -74,6 +74,17 @@ func 
TestPublicAPIErrorAndDefaultCoverage(t *testing.T) { } } + handle, err := PushScope("invalid_scope_end_metadata", ScopeTypeAgent) + if err != nil { + t.Fatalf("PushScope failed: %v", err) + } + if err := PopScope(handle, WithScopeEndMetadata(json.RawMessage("{"))); err == nil { + t.Fatal("expected PopScope to fail on invalid end metadata JSON") + } + if err := PopScope(handle); err != nil { + t.Fatalf("cleanup PopScope failed: %v", err) + } + if _, err := ToolCall("invalid_tool_json", json.RawMessage("{")); err == nil { t.Fatal("expected ToolCall to fail on invalid JSON args") } diff --git a/go/nemo_relay/llm_test.go b/go/nemo_relay/llm_test.go index ce1ca3f4..bb9c5d30 100644 --- a/go/nemo_relay/llm_test.go +++ b/go/nemo_relay/llm_test.go @@ -153,6 +153,65 @@ func TestLlmCallExecuteBasic(t *testing.T) { } } +func TestLlmCallExecuteAddsOTELStatusMetadataToEndEvents(t *testing.T) { + metadataByName := map[string]json.RawMessage{} + var mu sync.Mutex + + _ = DeregisterSubscriber("go_llm_status_metadata_sub") + if err := RegisterSubscriber("go_llm_status_metadata_sub", func(event Event) { + if event.Kind() == "scope" && event.Category() == "llm" && event.ScopeCategory() == "end" { + mu.Lock() + metadataByName[event.Name()] = append(json.RawMessage(nil), event.Metadata()...) 
+ mu.Unlock() + } + }); err != nil { + t.Fatalf(llmRegisterFailed, err) + } + defer DeregisterSubscriber("go_llm_status_metadata_sub") + + _, err := LlmCallExecute("go_llm_status_ok", makeRequest(), + func(nativeJSON json.RawMessage) (json.RawMessage, error) { + return json.RawMessage(`{"ok":true}`), nil + }, + WithLLMMetadata(json.RawMessage(`{"caller":"go-llm","otel.status_code":"USER"}`)), + ) + if err != nil { + t.Fatalf(llmCallExecuteFailed, err) + } + + _, err = LlmCallExecute("go_llm_status_error", makeRequest(), + func(nativeJSON json.RawMessage) (json.RawMessage, error) { + return nil, errors.New("go llm status failure") + }, + WithLLMMetadata(json.RawMessage(`{"caller":"go-llm-error"}`)), + ) + if err == nil { + t.Fatal("expected LLM execution error") + } + if err := FlushSubscribers(); err != nil { + t.Fatalf(llmFlushSubscribersFailed, err) + } + + mu.Lock() + okMetadata := metadataByName["go_llm_status_ok"] + errorMetadata := metadataByName["go_llm_status_error"] + mu.Unlock() + + assertJSONFieldString(t, okMetadata, "caller", "go-llm") + assertJSONFieldString(t, okMetadata, "otel.status_code", "OK") + assertJSONFieldString(t, errorMetadata, "caller", "go-llm-error") + assertJSONFieldString(t, errorMetadata, "otel.status_code", "ERROR") + + var decoded map[string]interface{} + if err := json.Unmarshal(errorMetadata, &decoded); err != nil { + t.Fatalf("unmarshal error metadata failed: %v; raw=%s", err, errorMetadata) + } + statusMessage, _ := decoded["otel.status_message"].(string) + if !strings.Contains(statusMessage, "go llm status failure") { + t.Fatalf("expected status message to mention callback error, got %v", decoded["otel.status_message"]) + } +} + func TestCodecHandleConstructors(t *testing.T) { if NewOpenAIChatCodec() == nil { t.Fatal("expected OpenAI chat codec handle") diff --git a/go/nemo_relay/nemo_relay.go b/go/nemo_relay/nemo_relay.go index 430ea6ba..869535ae 100644 --- a/go/nemo_relay/nemo_relay.go +++ b/go/nemo_relay/nemo_relay.go @@ 
-44,7 +44,7 @@ typedef void (*NemoRelayFreeFn)(void* user_data); // Core API extern int32_t nemo_relay_get_handle(FfiScopeHandle** out); extern int32_t nemo_relay_push_scope(const char* name, int32_t scope_type, const FfiScopeHandle* parent, uint32_t attributes, const char* data_json, const char* metadata_json, const char* input_json, const int64_t* timestamp_unix_micros, FfiScopeHandle** out); -extern int32_t nemo_relay_pop_scope(const FfiScopeHandle* handle, const char* output_json, const int64_t* timestamp_unix_micros); +extern int32_t nemo_relay_pop_scope(const FfiScopeHandle* handle, const char* output_json, const char* metadata_json, const int64_t* timestamp_unix_micros); extern int32_t nemo_relay_event(const char* name, const FfiScopeHandle* parent, const char* data_json, const char* metadata_json, const int64_t* timestamp_unix_micros); // Tool lifecycle @@ -458,12 +458,13 @@ func WithScopeTimestamp(timestamp time.Time) ScopeOption { type scopeEndOptions struct { output *C.char + metadata *C.char timestamp *C.int64_t } // ScopeEndOption is a functional option that configures optional parameters for -// [PopScope]. Available options include [WithOutput] and -// [WithScopeEndTimestamp]. +// [PopScope]. Available options include [WithOutput], +// [WithScopeEndMetadata], and [WithScopeEndTimestamp]. type ScopeEndOption func(*scopeEndOptions) // WithOutput attaches an arbitrary JSON semantic output payload to the scope end event. @@ -473,6 +474,16 @@ func WithOutput(output json.RawMessage) ScopeEndOption { } } +// WithScopeEndMetadata attaches an arbitrary JSON metadata payload to the +// scope End event. When the scope handle already has metadata, object keys in +// this payload overwrite matching existing keys and preserve non-conflicting +// keys. 
+func WithScopeEndMetadata(metadata json.RawMessage) ScopeEndOption { + return func(o *scopeEndOptions) { + o.metadata = C.CString(string(metadata)) + } +} + // WithScopeEndTimestamp records an explicit time.Time on the scope End event. // The value is converted to UTC Unix microseconds at the FFI boundary; // sub-microsecond precision is truncated. Omit this option to use the runtime @@ -538,8 +549,9 @@ func PushScope(name string, scopeType ScopeType, opts ...ScopeOption) (*ScopeHan // PopScope removes the given scope from the scope stack and emits an End event // to all registered subscribers. The handle must have been returned by a // previous call to [PushScope]. Popping scopes out of stack order returns an -// error. Optional end payloads can be attached via [WithOutput], and an -// explicit event timestamp can be supplied with [WithScopeEndTimestamp]. +// error. Optional end payloads can be attached via [WithOutput] and +// [WithScopeEndMetadata], and an explicit event timestamp can be supplied with +// [WithScopeEndTimestamp]. 
func PopScope(handle *ScopeHandle, opts ...ScopeEndOption) error { o := &scopeEndOptions{} for _, opt := range opts { @@ -548,10 +560,13 @@ func PopScope(handle *ScopeHandle, opts ...ScopeEndOption) error { if o.output != nil { defer C.free(unsafe.Pointer(o.output)) } + if o.metadata != nil { + defer C.free(unsafe.Pointer(o.metadata)) + } if o.timestamp != nil { defer C.free(unsafe.Pointer(o.timestamp)) } - return checkStatus(C.nemo_relay_pop_scope(handle.ptr, o.output, o.timestamp)) + return checkStatus(C.nemo_relay_pop_scope(handle.ptr, o.output, o.metadata, o.timestamp)) } // --------------------------------------------------------------------------- diff --git a/go/nemo_relay/scope/scope.go b/go/nemo_relay/scope/scope.go index 3b5074c1..628784da 100644 --- a/go/nemo_relay/scope/scope.go +++ b/go/nemo_relay/scope/scope.go @@ -22,9 +22,36 @@ package scope import ( + "encoding/json" + "fmt" + "github.com/NVIDIA/NeMo-Relay/go/nemo_relay" ) +func statusMetadata(statusCode, statusMessage string) nemo_relay.ScopeEndOption { + metadata := map[string]string{"otel.status_code": statusCode} + if statusMessage != "" { + metadata["otel.status_message"] = statusMessage + } + raw, _ := json.Marshal(metadata) + return nemo_relay.WithScopeEndMetadata(raw) +} + +func cleanupScope(handle *nemo_relay.ScopeHandle) func() { + cleaned := false + return func() { + if cleaned { + return + } + cleaned = true + if recovered := recover(); recovered != nil { + _ = Pop(handle, statusMetadata("ERROR", fmt.Sprint(recovered))) + panic(recovered) + } + _ = Pop(handle, statusMetadata("OK", "")) + } +} + // GetHandle returns the handle for the scope currently at the top of the scope // stack. Returns an error if the scope stack is empty. This is a shorthand for // [nemo_relay.GetHandle]. @@ -42,7 +69,8 @@ func Push(name string, scopeType nemo_relay.ScopeType, opts ...nemo_relay.ScopeO // Pop removes the given scope from the scope stack and emits an End event to // all registered subscribers. 
Optional arguments, including -// [nemo_relay.WithScopeEndTimestamp], are forwarded to [nemo_relay.PopScope]. +// [nemo_relay.WithScopeEndMetadata] and [nemo_relay.WithScopeEndTimestamp], +// are forwarded to [nemo_relay.PopScope]. func Pop(handle *nemo_relay.ScopeHandle, opts ...nemo_relay.ScopeEndOption) error { return nemo_relay.PopScope(handle, opts...) } @@ -71,9 +99,7 @@ func WithScope(name string, scopeType nemo_relay.ScopeType, opts ...nemo_relay.S // Push failed, so cleanup is intentionally a no-op. } } - return func() { - Pop(handle) - } + return cleanupScope(handle) } // WithScopeHandle pushes a new scope and returns both the scope handle and a @@ -93,7 +119,5 @@ func WithScopeHandle(name string, scopeType nemo_relay.ScopeType, opts ...nemo_r // Push failed, so cleanup is intentionally a no-op. } } - return handle, func() { - Pop(handle) - } + return handle, cleanupScope(handle) } diff --git a/go/nemo_relay/scope/scope_test.go b/go/nemo_relay/scope/scope_test.go index b7074af0..891b927d 100644 --- a/go/nemo_relay/scope/scope_test.go +++ b/go/nemo_relay/scope/scope_test.go @@ -4,6 +4,9 @@ package scope_test import ( + "encoding/json" + "strings" + "sync" "testing" "github.com/NVIDIA/NeMo-Relay/go/nemo_relay" @@ -16,6 +19,54 @@ const ( expectedNonNilHandle = "expected non-nil handle" ) +func metadataStringField(t *testing.T, raw json.RawMessage, field string) string { + t.Helper() + + var decoded map[string]interface{} + if err := json.Unmarshal(raw, &decoded); err != nil { + t.Fatalf("unmarshal metadata failed: %v; raw=%s", err, raw) + } + value, ok := decoded[field] + if !ok { + t.Fatalf("expected metadata field %q, got %v", field, decoded) + } + text, ok := value.(string) + if !ok { + t.Fatalf("expected metadata field %q to be string, got %T", field, value) + } + return text +} + +func captureScopeEndMetadata(t *testing.T, subscriberName, scopeName string, fn func()) json.RawMessage { + t.Helper() + + var captured json.RawMessage + var mu sync.Mutex + _ 
= nemo_relay.DeregisterSubscriber(subscriberName) + if err := nemo_relay.RegisterSubscriber(subscriberName, func(event nemo_relay.Event) { + if event.Kind() == "scope" && event.ScopeCategory() == "end" && event.Name() == scopeName { + mu.Lock() + captured = append(json.RawMessage(nil), event.Metadata()...) + mu.Unlock() + } + }); err != nil { + t.Fatalf("RegisterSubscriber failed: %v", err) + } + defer nemo_relay.DeregisterSubscriber(subscriberName) + + fn() + + if err := nemo_relay.FlushSubscribers(); err != nil { + t.Fatalf("FlushSubscribers failed: %v", err) + } + mu.Lock() + defer mu.Unlock() + if captured == nil { + t.Fatalf("expected end metadata for scope %q", scopeName) + } + return captured +} + // ============================================================================ // WithScope // ============================================================================ @@ -80,6 +131,17 @@ func TestWithScopeDeferCleanup(t *testing.T) { } } +func TestWithScopeRecordsOKStatusMetadata(t *testing.T) { + metadata := captureScopeEndMetadata(t, "go_scope_with_status_ok_sub", "with_scope_ok_status", func() { + cleanup := scope.WithScope("with_scope_ok_status", nemo_relay.ScopeTypeFunction) + cleanup() + }) + + if got := metadataStringField(t, metadata, "otel.status_code"); got != "OK" { + t.Fatalf("expected otel.status_code=OK, got %q", got) + } +} + func TestWithScopeCleanupOnPanic(t *testing.T) { before, err := nemo_relay.GetHandle() if err != nil { @@ -113,6 +175,27 @@ func TestWithScopeCleanupOnPanic(t *testing.T) { } } +func TestWithScopeRecordsErrorStatusMetadataOnPanic(t *testing.T) { + metadata := captureScopeEndMetadata(t, "go_scope_with_status_error_sub", "with_scope_error_status", func() { + func() { + defer func() { + if recover() == nil { + t.Fatal("expected panic") + } + }() + defer scope.WithScope("with_scope_error_status", nemo_relay.ScopeTypeTool)() + panic("scope status failure") + }() + }) + + if got := metadataStringField(t, metadata, 
"otel.status_code"); got != "ERROR" { + t.Fatalf("expected otel.status_code=ERROR, got %q", got) + } + if got := metadataStringField(t, metadata, "otel.status_message"); !strings.Contains(got, "scope status failure") { + t.Fatalf("expected status message to mention panic, got %q", got) + } +} + // ============================================================================ // WithScopeHandle // ============================================================================ diff --git a/go/nemo_relay/scope_test.go b/go/nemo_relay/scope_test.go index de78a5bf..9283f383 100644 --- a/go/nemo_relay/scope_test.go +++ b/go/nemo_relay/scope_test.go @@ -662,6 +662,41 @@ func TestScopeWithDataAndMetadata(t *testing.T) { } } +func TestPopScopeWithEndMetadataMergesWithScopeMetadata(t *testing.T) { + var capturedMeta json.RawMessage + var mu sync.Mutex + + _ = DeregisterSubscriber("go_scope_end_metadata_sub") + if err := RegisterSubscriber("go_scope_end_metadata_sub", func(event Event) { + if event.Kind() == "scope" && event.ScopeCategory() == "end" && event.Name() == "end_metadata_scope" { + mu.Lock() + capturedMeta = append(json.RawMessage(nil), event.Metadata()...) 
+ mu.Unlock() + } + }); err != nil { + t.Fatalf(registerSubscriberFailed, err) + } + defer DeregisterSubscriber("go_scope_end_metadata_sub") + + handle, err := PushScope("end_metadata_scope", ScopeTypeAgent, + WithMetadata(json.RawMessage(`{"a":1,"b":2,"c":3}`)), + ) + if err != nil { + t.Fatalf(pushScopeFailed, err) + } + if err := PopScope(handle, WithScopeEndMetadata(json.RawMessage(`{"c":3.5,"d":4}`))); err != nil { + t.Fatalf("PopScope failed: %v", err) + } + if err := FlushSubscribers(); err != nil { + t.Fatalf(flushSubscribersFailed, err) + } + + assertJSONFieldNumber(t, capturedMeta, "a", 1) + assertJSONFieldNumber(t, capturedMeta, "b", 2) + assertJSONFieldNumber(t, capturedMeta, "c", 3.5) + assertJSONFieldNumber(t, capturedMeta, "d", 4) +} + func TestScopeEventWithDataAndMetadata(t *testing.T) { var capturedData, capturedMeta json.RawMessage var mu sync.Mutex diff --git a/go/nemo_relay/tools_test.go b/go/nemo_relay/tools_test.go index d242f71f..cd9bac7f 100644 --- a/go/nemo_relay/tools_test.go +++ b/go/nemo_relay/tools_test.go @@ -159,6 +159,65 @@ func TestToolCallExecuteWithAttributes(t *testing.T) { } } +func TestToolCallExecuteAddsOTELStatusMetadataToEndEvents(t *testing.T) { + metadataByName := map[string]json.RawMessage{} + var mu sync.Mutex + + _ = DeregisterSubscriber("go_tool_status_metadata_sub") + if err := RegisterSubscriber("go_tool_status_metadata_sub", func(event Event) { + if event.Kind() == "scope" && event.Category() == "tool" && event.ScopeCategory() == "end" { + mu.Lock() + metadataByName[event.Name()] = append(json.RawMessage(nil), event.Metadata()...) 
+ mu.Unlock() + } + }); err != nil { + t.Fatalf(registerFailed, err) + } + defer DeregisterSubscriber("go_tool_status_metadata_sub") + + _, err := ToolCallExecute("go_tool_status_ok", json.RawMessage(`{"x":1}`), + func(args json.RawMessage) (json.RawMessage, error) { + return json.RawMessage(`{"ok":true}`), nil + }, + WithToolMetadata(json.RawMessage(`{"caller":"go-tool","otel.status_code":"USER"}`)), + ) + if err != nil { + t.Fatalf(toolCallExecuteFailed, err) + } + + _, err = ToolCallExecute("go_tool_status_error", json.RawMessage(`{"x":2}`), + func(args json.RawMessage) (json.RawMessage, error) { + return nil, errors.New("go tool status failure") + }, + WithToolMetadata(json.RawMessage(`{"caller":"go-tool-error"}`)), + ) + if err == nil { + t.Fatal("expected tool execution error") + } + if err := FlushSubscribers(); err != nil { + t.Fatalf(toolFlushSubscribersFailed, err) + } + + mu.Lock() + okMetadata := metadataByName["go_tool_status_ok"] + errorMetadata := metadataByName["go_tool_status_error"] + mu.Unlock() + + assertJSONFieldString(t, okMetadata, "caller", "go-tool") + assertJSONFieldString(t, okMetadata, "otel.status_code", "OK") + assertJSONFieldString(t, errorMetadata, "caller", "go-tool-error") + assertJSONFieldString(t, errorMetadata, "otel.status_code", "ERROR") + + var decoded map[string]interface{} + if err := json.Unmarshal(errorMetadata, &decoded); err != nil { + t.Fatalf("unmarshal error metadata failed: %v; raw=%s", err, errorMetadata) + } + statusMessage, _ := decoded["otel.status_message"].(string) + if !strings.Contains(statusMessage, "go tool status failure") { + t.Fatalf("expected status message to mention callback error, got %v", decoded["otel.status_message"]) + } +} + // ============================================================================ // Tool guardrails // ============================================================================ diff --git a/python/nemo_relay/_native.pyi b/python/nemo_relay/_native.pyi index 
8a0ea194..e3b66865 100644 --- a/python/nemo_relay/_native.pyi +++ b/python/nemo_relay/_native.pyi @@ -1184,6 +1184,7 @@ def push_scope( def pop_scope( handle: ScopeHandle, output: Optional[_Json] = None, + metadata: Optional[_Json] = None, timestamp: datetime | None = None, ) -> None: """Pop a scope and emit its end event. @@ -1191,6 +1192,7 @@ def pop_scope( Args: handle: Handle returned by ``push_scope``. output: Optional semantic output payload recorded on the end event. + metadata: Optional JSON metadata recorded on the end event. timestamp: Optional timezone-aware datetime recorded on the end event. When omitted, the runtime default end timestamp is used. @@ -1199,8 +1201,8 @@ def pop_scope( Exceptional flow: Raises native runtime errors if ``handle`` is not the current scope or - if ``output`` cannot be converted to JSON-compatible data. Raises for - invalid timestamp types or naive datetimes. + if ``output`` or ``metadata`` cannot be converted to JSON-compatible data. + Raises for invalid timestamp types or naive datetimes. """ ... 
diff --git a/python/nemo_relay/integrations/langchain/_serialization.py b/python/nemo_relay/integrations/langchain/_serialization.py index 9ab3501d..3e66d83f 100644 --- a/python/nemo_relay/integrations/langchain/_serialization.py +++ b/python/nemo_relay/integrations/langchain/_serialization.py @@ -315,7 +315,7 @@ def _prepare_lc_payloads(payload: Any) -> Any: prepared = {} for key, value in payload.items(): prepared[key] = _prepare_lc_payloads(value) - elif isinstance(payload, list | tuple): + elif isinstance(payload, list | tuple | set): prepared = [] for value in payload: prepared.append(_prepare_lc_payloads(value)) diff --git a/python/nemo_relay/integrations/langchain/callbacks.py b/python/nemo_relay/integrations/langchain/callbacks.py index dd8f3d64..b5aa2515 100644 --- a/python/nemo_relay/integrations/langchain/callbacks.py +++ b/python/nemo_relay/integrations/langchain/callbacks.py @@ -81,7 +81,7 @@ def on_chain_end( **kwargs: typing.Any, ) -> typing.Any: """Pop the NeMo Relay scope associated with a LangChain chain run.""" - self._pop_scope(run_id, output=outputs) + self._pop_scope(run_id, output=outputs, metadata={"otel.status_code": "OK"}) def on_chain_error( self, @@ -92,15 +92,21 @@ def on_chain_error( **kwargs: typing.Any, ) -> typing.Any: """Pop the NeMo Relay scope associated with a failed LangChain chain run.""" - self._pop_scope(run_id, output={"error": repr(error)}) - - def _pop_scope(self, run_id: UUID, *, output: dict[str, typing.Any] | None = None) -> None: + self._pop_scope( + run_id, + output={"error": repr(error)}, + metadata={"otel.status_code": "ERROR", "otel.status_message": str(error)}, + ) + + def _pop_scope( + self, run_id: UUID, *, output: dict[str, typing.Any] | None = None, metadata: nemo_relay.Json | None = None + ) -> None: handle = self._scope_handles.pop(run_id, None) if handle is None: return try: prepared_outputs = _prepare_lc_payloads(output) if output is not None else None - nemo_relay.scope.pop(handle, 
output=prepared_outputs) + nemo_relay.scope.pop(handle, output=prepared_outputs, metadata=metadata) except Exception: _logger.error("NeMo Relay: scope.pop failed", exc_info=True) diff --git a/python/nemo_relay/integrations/langgraph/callbacks.py b/python/nemo_relay/integrations/langgraph/callbacks.py index dfc320d6..24122f81 100644 --- a/python/nemo_relay/integrations/langgraph/callbacks.py +++ b/python/nemo_relay/integrations/langgraph/callbacks.py @@ -17,26 +17,10 @@ _logger = logging.getLogger(__name__) -def _json_safe(value: Any) -> nemo_relay.Json: - """Return a conservative JSON-compatible representation for mark payloads.""" - try: - value = _prepare_lc_payloads(value) - except Exception: - pass - - if value is None or isinstance(value, str | int | float | bool): - return value - if isinstance(value, dict): - return {str(key): _json_safe(item) for key, item in value.items()} - if isinstance(value, list | tuple | set): - return [_json_safe(item) for item in value] - return repr(value) - - def _interrupt_to_payload(interrupt: Any) -> dict[str, nemo_relay.Json]: return { - "id": _json_safe(getattr(interrupt, "id", None)), - "value": _json_safe(getattr(interrupt, "value", interrupt)), + "id": _prepare_lc_payloads(getattr(interrupt, "id", None)), + "value": _prepare_lc_payloads(getattr(interrupt, "value", interrupt)), } @@ -81,7 +65,7 @@ def _emit_graph_mark(self, name: str, data: dict[str, Any]) -> None: try: nemo_relay.scope.event( name, - data=_json_safe(data), + data=_prepare_lc_payloads(data), metadata={"integration": "langgraph"}, ) except Exception: diff --git a/python/nemo_relay/scope.py b/python/nemo_relay/scope.py index cc1cf2fd..9c6ec388 100644 --- a/python/nemo_relay/scope.py +++ b/python/nemo_relay/scope.py @@ -147,12 +147,15 @@ def push( ) -def pop(handle: ScopeHandle, *, output: Json | None = None, timestamp: datetime | None = None) -> None: +def pop( + handle: ScopeHandle, *, output: Json | None = None, metadata: Json | None = None, timestamp: 
datetime | None = None +) -> None: """Pop a scope previously returned by ``push()`` or ``scope()``. Args: handle: Scope handle to close. output: Optional JSON payload exported as the semantic scope output. + metadata: Optional JSON metadata to append to the metadata set when the scope was created. timestamp: Optional timezone-aware ``datetime`` recorded on the scope end event. When omitted, the runtime default end timestamp is used. @@ -166,7 +169,7 @@ def pop(handle: ScopeHandle, *, output: Json | None = None, timestamp: datetime strings and naive datetimes are rejected. """ _ensure_scope_stack() - _native_pop_scope(handle, output=output, timestamp=timestamp) + _native_pop_scope(handle, output=output, metadata=metadata, timestamp=timestamp) def event( @@ -215,6 +218,8 @@ def scope( ) -> Iterator[ScopeHandle]: """Create a scope for the duration of a ``with`` block. + OTEL status codes will be automatically recorded in the scope's metadata. + Args: name: Human-readable name for the new scope. 
scope_type: Semantic scope type, such as ``ScopeType.Agent`` or @@ -256,6 +261,8 @@ def scope( """ _ensure_scope_stack() pushed_handle = None + status_code = "UNSET" + status_message = None try: pushed_handle = _native_push_scope( name, @@ -268,9 +275,17 @@ def scope( timestamp=timestamp, ) yield pushed_handle + status_code = "OK" + except Exception as e: + status_code = "ERROR" + status_message = str(e) + raise finally: if pushed_handle is not None: - _native_pop_scope(pushed_handle, timestamp=end_timestamp) + metadata = {"otel.status_code": status_code} + if status_message is not None: + metadata["otel.status_message"] = status_message + _native_pop_scope(pushed_handle, metadata=metadata, timestamp=end_timestamp) __all__ = ["event", "get_handle", "pop", "push", "scope"] diff --git a/python/tests/integrations/langchain_tests/test_callbacks.py b/python/tests/integrations/langchain_tests/test_callbacks.py index bc72fe29..2b617901 100644 --- a/python/tests/integrations/langchain_tests/test_callbacks.py +++ b/python/tests/integrations/langchain_tests/test_callbacks.py @@ -109,7 +109,11 @@ def test_on_chain_end_pops_scope(self, handler: NemoRelayCallbackHandler, mock_n run_id=run_id, ) - mock_nemo_relay.scope.pop.assert_called_once_with(handle, output={"output": "result"}) + mock_nemo_relay.scope.pop.assert_called_once_with( + handle, + output={"output": "result"}, + metadata={"otel.status_code": "OK"}, + ) assert run_id not in handler._scope_handles def test_on_chain_error_pops_scope(self, handler: NemoRelayCallbackHandler, mock_nemo_relay: MagicMock): @@ -126,7 +130,11 @@ def test_on_chain_error_pops_scope(self, handler: NemoRelayCallbackHandler, mock run_id=run_id, ) - mock_nemo_relay.scope.pop.assert_called_once_with(handle, output={"error": "RuntimeError('boom')"}) + mock_nemo_relay.scope.pop.assert_called_once_with( + handle, + output={"error": "RuntimeError('boom')"}, + metadata={"otel.status_code": "ERROR", "otel.status_message": "boom"}, + ) assert run_id not 
in handler._scope_handles def test_on_chain_end_prepares_command_outputs(self, handler: NemoRelayCallbackHandler, mock_nemo_relay: MagicMock): @@ -183,6 +191,7 @@ def test_on_chain_end_prepares_command_outputs(self, handler: NemoRelayCallbackH }, } }, + metadata={"otel.status_code": "OK"}, ) def test_parent_scope_passed_to_push(self, handler: NemoRelayCallbackHandler, mock_nemo_relay: MagicMock):