From c8e83bbbb6b9d64a7c914fecd4286ed3cf407d62 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Thu, 4 Jun 2026 08:46:18 -0700 Subject: [PATCH 01/29] WIP: Untested Signed-off-by: David Gardner --- crates/core/src/api/scope.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/crates/core/src/api/scope.rs b/crates/core/src/api/scope.rs index 51700d35..31d60d4c 100644 --- a/crates/core/src/api/scope.rs +++ b/crates/core/src/api/scope.rs @@ -180,6 +180,9 @@ pub struct EndScopeHandleParams<'a> { /// Optional JSON payload exported as the semantic scope output. #[builder(default)] pub data: Option, + /// Optional metadata to be appended to the metadata set when the scope was created. + #[builder(default)] + pub metadata: Option, /// Optional timestamp recorded on the emitted end event. When omitted, the /// runtime records the current UTC time, or one microsecond after the /// handle start time if the current time is not later. @@ -196,6 +199,9 @@ pub struct PopScopeParams<'a> { /// Optional JSON payload exported as the semantic scope output. #[builder(default)] pub output: Option, + /// Optional JSON payload metadata to be appended to the metadata set when the scope was created. + #[builder(default)] + pub metadata: Option, /// Optional timestamp recorded on the emitted end event. When omitted, the /// runtime records the current UTC time, or one microsecond after the /// handle start time if the current time is not later. @@ -347,6 +353,7 @@ pub fn pop_scope(params: PopScopeParams<'_>) -> Result<()> { .handle(&scope) .data_opt(params.output) .timestamp_opt(params.timestamp) + .metadata_opt(params.metadata) .build(), ); (scope, event, subscribers) From f9f87d80f2b3a47921021a1b1f12d6d9b9785ab5 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Thu, 4 Jun 2026 08:54:59 -0700 Subject: [PATCH 02/29] WIP: Untested Signed-off-by: David Gardner --- crates/core/src/api/runtime/state.rs | 14 +++++++++-- crates/core/tests/unit/context_tests.rs | 31 ++++++++++++++++++++++++- 2 files changed, 42 insertions(+), 3 deletions(-) diff --git a/crates/core/src/api/runtime/state.rs b/crates/core/src/api/runtime/state.rs index cab813d8..ee69b6f6 100644 --- a/crates/core/src/api/runtime/state.rs +++ b/crates/core/src/api/runtime/state.rs @@ -247,20 +247,30 @@ impl NemoRelayContextState { /// # Parameters /// - `handle`: Scope handle to serialize into an event. /// - `data`: Optional data payload returned from the scope. + /// - `metadata`: Optional metadata payload merged over `handle.metadata`. /// /// # Returns /// A scope-end [`Event`] derived from the provided handle. - pub fn end_scope_handle(&self, handle: &ScopeHandle, data: Option) -> Event { + pub fn end_scope_handle( + &self, + handle: &ScopeHandle, + data: Option, + metadata: Option, + ) -> Event { self.build_scope_end_event( EndScopeHandleParams::builder() .handle(handle) .data_opt(data) + .metadata_opt(metadata) .build(), ) } /// Build a scope-end event from builder parameters. /// + /// The `metadata` payload is merged over the metadata already stored on + /// the handle. + /// /// # Parameters /// - `params`: Scope end-event builder parameters. /// @@ -279,7 +289,7 @@ impl NemoRelayContextState { ) .name(handle.name.as_str()) .data_opt(params.data) - .metadata_opt(handle.metadata.clone()) + .metadata_opt(merge_json(handle.metadata.clone(), params.metadata)) .build(), ScopeCategory::End, scope_attributes_to_strings(handle.attributes), diff --git a/crates/core/tests/unit/context_tests.rs b/crates/core/tests/unit/context_tests.rs index ac4ca5dc..f402de4f 100644 --- a/crates/core/tests/unit/context_tests.rs +++ b/crates/core/tests/unit/context_tests.rs @@ -15,7 +15,7 @@ use crate::api::runtime::EventSubscriberFn; use crate::api::runtime::ScopeStack; use crate::api::runtime::global_context; use crate::api::runtime::{NemoRelayContextState, flush_subscribers}; -use crate::api::scope::{ScopeAttributes, ScopeHandle, ScopeType}; +use crate::api::scope::{EndScopeHandleParams, ScopeAttributes, ScopeHandle, ScopeType}; use crate::api::tool::CreateToolHandleParams; use crate::context::registries::{ merge_execution_intercept_callables, merge_guardrail_entries, merge_intercept_entries, @@ -305,6 +305,35 @@ fn context_state_supports_extensions_events_and_builders() { assert_eq!(events.lock().unwrap().as_slice(), ["mark"]); } +#[test] +fn scope_end_metadata_merges_with_handle_metadata() { + let state = NemoRelayContextState::new(); + let scope = state.create_scope_handle( + crate::api::scope::CreateScopeHandleParams::builder() + .name("agent") + .scope_type(ScopeType::Agent) + .metadata(json!({"a": 1, "b": 2, "c": 3})) + .build(), + ); + + let scope_end = state.build_scope_end_event( + EndScopeHandleParams::builder() + .handle(&scope) + .metadata(json!({"c": 3.5, "d": 4})) + .build(), + ); + assert_eq!( + scope_end.metadata(), + Some(&json!({"a": 1, "b": 2, "c": 3.5, "d": 4})) + ); + + let scope_end = state.end_scope_handle(&scope, None, Some(json!({"c": 5, "e": 6}))); + assert_eq!( + scope_end.metadata(), + Some(&json!({"a": 1, "b": 2, "c": 5, "e": 6})) + ); +} + #[test] fn global_context_is_a_singleton_handle() { let first = global_context(); From b55c82ed2079e45010e6974c4f85878c3aea6d10 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Thu, 4 Jun 2026 09:21:22 -0700 Subject: [PATCH 03/29] Add setting metadata to the pop scope method in Python bindings, add otel specific attributes to metadata Signed-off-by: David Gardner --- crates/python/src/py_api/mod.rs | 6 +++++- python/nemo_relay/_native.pyi | 2 ++ python/nemo_relay/scope.py | 19 ++++++++++++++++--- 3 files changed, 23 insertions(+), 4 deletions(-) diff --git a/crates/python/src/py_api/mod.rs b/crates/python/src/py_api/mod.rs index 43b53d47..e36caf7c 100644 --- a/crates/python/src/py_api/mod.rs +++ b/crates/python/src/py_api/mod.rs @@ -252,6 +252,7 @@ fn push_scope( /// Args: /// handle: The current top-of-stack scope handle returned by ``push``. /// output: Optional JSON-serializable semantic output payload for the scope end event. +/// metadata: Optional JSON-serializable metadata to append to the metadata set when the scope was created. /// timestamp: Optional timezone-aware ``datetime.datetime`` for the emitted end event. /// When omitted, the runtime default end timestamp is used. /// @@ -261,18 +262,21 @@ fn push_scope( /// TypeError: If ``timestamp`` is not a ``datetime.datetime``. /// ValueError: If ``timestamp`` is a naive datetime. #[pyfunction] -#[pyo3(signature = (handle: "ScopeHandle", output: "object | None"=None, timestamp: "datetime.datetime | None"=None) -> "None", text_signature = "(handle: ScopeHandle, output: object | None = None, timestamp: datetime.datetime | None = None) -> None")] +#[pyo3(signature = (handle: "ScopeHandle", output: "object | None"=None, metadata: "object | None"=None, timestamp: "datetime.datetime | None"=None) -> "None", text_signature = "(handle: ScopeHandle, output: object | None = None, metadata: object | None = None, timestamp: datetime.datetime | None = None) -> None")] fn pop_scope( handle: &PyScopeHandle, output: Option<&Bound<'_, PyAny>>, + metadata: Option<&Bound<'_, PyAny>>, timestamp: Option<&Bound<'_, PyAny>>, ) -> PyResult<()> { let output = opt_py_to_json(output)?; + let metadata = opt_py_to_json(metadata)?; let timestamp = opt_py_to_timestamp(timestamp)?; core_scope_api::pop_scope( core_scope_api::PopScopeParams::builder() .handle_uuid(&handle.inner.uuid) .output_opt(output) + .metadata_opt(metadata) .timestamp_opt(timestamp) .build(), ) diff --git a/python/nemo_relay/_native.pyi b/python/nemo_relay/_native.pyi index 642ca3db..4c40e180 100644 --- a/python/nemo_relay/_native.pyi +++ b/python/nemo_relay/_native.pyi @@ -1161,6 +1161,7 @@ def push_scope( def pop_scope( handle: ScopeHandle, output: Optional[_Json] = None, + metadata: Optional[_Json] = None, timestamp: datetime | None = None, ) -> None: """Pop a scope and emit its end event. @@ -1168,6 +1169,7 @@ def pop_scope( Args: handle: Handle returned by ``push_scope``. output: Optional semantic output payload recorded on the end event. + metadata: Optional JSON metadata recorded on the end event. timestamp: Optional timezone-aware datetime recorded on the end event. When omitted, the runtime default end timestamp is used. diff --git a/python/nemo_relay/scope.py b/python/nemo_relay/scope.py index cc1cf2fd..d14c790d 100644 --- a/python/nemo_relay/scope.py +++ b/python/nemo_relay/scope.py @@ -147,12 +147,13 @@ def push( ) -def pop(handle: ScopeHandle, *, output: Json | None = None, timestamp: datetime | None = None) -> None: +def pop(handle: ScopeHandle, *, output: Json | None = None, metadata: Json | None = None, timestamp: datetime | None = None) -> None: """Pop a scope previously returned by ``push()`` or ``scope()``. Args: handle: Scope handle to close. output: Optional JSON payload exported as the semantic scope output. + metadata: Optional JSON metadata to append to the metadata set when the scope was created. timestamp: Optional timezone-aware ``datetime`` recorded on the scope end event. When omitted, the runtime default end timestamp is used. @@ -166,7 +167,7 @@ def pop(handle: ScopeHandle, *, output: Json | None = None, timestamp: datetime strings and naive datetimes are rejected. """ _ensure_scope_stack() - _native_pop_scope(handle, output=output, timestamp=timestamp) + _native_pop_scope(handle, output=output, metadata=metadata, timestamp=timestamp) def event( @@ -215,6 +216,8 @@ def scope( ) -> Iterator[ScopeHandle]: """Create a scope for the duration of a ``with`` block. + OTEL status codes will be automatically recorded in the scope's metadata. + Args: name: Human-readable name for the new scope. scope_type: Semantic scope type, such as ``ScopeType.Agent`` or @@ -256,6 +259,8 @@ def scope( """ _ensure_scope_stack() pushed_handle = None + statusCode = 'UNSET' + statusMessage = None try: pushed_handle = _native_push_scope( name, @@ -268,9 +273,17 @@ def scope( timestamp=timestamp, ) yield pushed_handle + statusCode = 'OK' + except Exception as e: + statusCode = 'ERROR' + statusMessage = str(e) + raise e finally: if pushed_handle is not None: - _native_pop_scope(pushed_handle, timestamp=end_timestamp) + metadata = {"otel.status_code": statusCode} + if statusMessage is not None: + metadata["otel.status_message"] = statusMessage + _native_pop_scope(pushed_handle, metadata=metadata, timestamp=end_timestamp) __all__ = ["event", "get_handle", "pop", "push", "scope"] From 6bca3055493dcd847023423b913d999e871d1e89 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Thu, 4 Jun 2026 09:26:52 -0700 Subject: [PATCH 04/29] Set scope otel metadata in callbacks Signed-off-by: David Gardner --- python/nemo_relay/integrations/langchain/callbacks.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/python/nemo_relay/integrations/langchain/callbacks.py b/python/nemo_relay/integrations/langchain/callbacks.py index dd8f3d64..3340462c 100644 --- a/python/nemo_relay/integrations/langchain/callbacks.py +++ b/python/nemo_relay/integrations/langchain/callbacks.py @@ -81,7 +81,7 @@ def on_chain_end( **kwargs: typing.Any, ) -> typing.Any: """Pop the NeMo Relay scope associated with a LangChain chain run.""" - self._pop_scope(run_id, output=outputs) + self._pop_scope(run_id, output=outputs, metadata={"otel.status_code": "OK"}) def on_chain_error( self, @@ -92,15 +92,15 @@ def on_chain_error( **kwargs: typing.Any, ) -> typing.Any: """Pop the NeMo Relay scope associated with a failed LangChain chain run.""" - self._pop_scope(run_id, output={"error": repr(error)}) + self._pop_scope(run_id, output={"error": repr(error)}, metadata={"otel.status_code": "ERROR", "otel.status_message": str(error)}) - def _pop_scope(self, run_id: UUID, *, output: dict[str, typing.Any] | None = None) -> None: + def _pop_scope(self, run_id: UUID, *, output: dict[str, typing.Any] | None = None, metadata: nemo_relay.Json | None = None) -> None: handle = self._scope_handles.pop(run_id, None) if handle is None: return try: prepared_outputs = _prepare_lc_payloads(output) if output is not None else None - nemo_relay.scope.pop(handle, output=prepared_outputs) + nemo_relay.scope.pop(handle, output=prepared_outputs, metadata=metadata) except Exception: _logger.error("NeMo Relay: scope.pop failed", exc_info=True) From 3c384fcbf126be0b389679eed0049923ed7d13b9 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Thu, 4 Jun 2026 09:29:25 -0700 Subject: [PATCH 05/29] Drop _json_safe in favor of _prepare_lc_payloads Signed-off-by: David Gardner --- .../integrations/langchain/_serialization.py | 2 +- .../integrations/langgraph/callbacks.py | 23 +++---------------- 2 files changed, 4 insertions(+), 21 deletions(-) diff --git a/python/nemo_relay/integrations/langchain/_serialization.py b/python/nemo_relay/integrations/langchain/_serialization.py index 9ab3501d..3e66d83f 100644 --- a/python/nemo_relay/integrations/langchain/_serialization.py +++ b/python/nemo_relay/integrations/langchain/_serialization.py @@ -315,7 +315,7 @@ def _prepare_lc_payloads(payload: Any) -> Any: prepared = {} for key, value in payload.items(): prepared[key] = _prepare_lc_payloads(value) - elif isinstance(payload, list | tuple): + elif isinstance(payload, list | tuple | set): prepared = [] for value in payload: prepared.append(_prepare_lc_payloads(value)) diff --git a/python/nemo_relay/integrations/langgraph/callbacks.py b/python/nemo_relay/integrations/langgraph/callbacks.py index dfc320d6..387ef835 100644 --- a/python/nemo_relay/integrations/langgraph/callbacks.py +++ b/python/nemo_relay/integrations/langgraph/callbacks.py @@ -16,27 +16,10 @@ _logger = logging.getLogger(__name__) - -def _json_safe(value: Any) -> nemo_relay.Json: - """Return a conservative JSON-compatible representation for mark payloads.""" - try: - value = _prepare_lc_payloads(value) - except Exception: - pass - - if value is None or isinstance(value, str | int | float | bool): - return value - if isinstance(value, dict): - return {str(key): _json_safe(item) for key, item in value.items()} - if isinstance(value, list | tuple | set): - return [_json_safe(item) for item in value] - return repr(value) - - def _interrupt_to_payload(interrupt: Any) -> dict[str, nemo_relay.Json]: return { - "id": _json_safe(getattr(interrupt, "id", None)), - "value": _json_safe(getattr(interrupt, "value", interrupt)), + "id": _prepare_lc_payloads(getattr(interrupt, "id", None)), + "value": _prepare_lc_payloads(getattr(interrupt, "value", interrupt)), } @@ -81,7 +64,7 @@ def _emit_graph_mark(self, name: str, data: dict[str, Any]) -> None: try: nemo_relay.scope.event( name, - data=_json_safe(data), + data=_prepare_lc_payloads(data), metadata={"integration": "langgraph"}, ) except Exception: From 562922d6d317c755b8fe8a3b00619bb202f7c251 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Thu, 4 Jun 2026 10:01:15 -0700 Subject: [PATCH 06/29] First pass at converting event matadata into span status, WIP: Untested Signed-off-by: David Gardner --- crates/core/src/observability/otel.rs | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/crates/core/src/observability/otel.rs b/crates/core/src/observability/otel.rs index f7b42c6c..3b30433c 100644 --- a/crates/core/src/observability/otel.rs +++ b/crates/core/src/observability/otel.rs @@ -657,16 +657,35 @@ fn start_attributes(event: &Event) -> Vec { fn end_attributes(event: &Event) -> Vec { let mut attributes = Vec::new(); push_serialized(&mut attributes, "nemo_relay.end.data_json", event.data()); + + let metadata = event.metadata(); push_serialized( &mut attributes, "nemo_relay.end.metadata_json", - event.metadata(), + metadata, ); push_serialized( &mut attributes, "nemo_relay.end.output_json", event.output(), ); + + if metadata.is_some() && metadata.unwrap()["otel.status_code"].is_string() { + let status_code = metadata.unwrap()["otel.status_code"].as_str(); + push_serialized( + &mut attributes, + "StatusCode", + status_code, + ); + + if metadata.unwrap()["otel.status_message"].is_string() && status_code == Some("ERROR") { + push_serialized( + &mut attributes, + "Description", + metadata.unwrap()["otel.status_message"].as_str(), + ); + } + } attributes } From 52ed93bf443fb6dacefc5b5db81dc09f4772a019 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Thu, 4 Jun 2026 10:35:40 -0700 Subject: [PATCH 07/29] WIP Signed-off-by: David Gardner --- crates/core/src/observability/otel.rs | 43 +++++++++++++-------------- 1 file changed, 21 insertions(+), 22 deletions(-) diff --git a/crates/core/src/observability/otel.rs b/crates/core/src/observability/otel.rs index 3b30433c..f6a02bb8 100644 --- a/crates/core/src/observability/otel.rs +++ b/crates/core/src/observability/otel.rs @@ -28,8 +28,10 @@ use crate::api::subscriber::{deregister_subscriber, flush_subscribers, register_ use crate::error::FlowError; use chrono::{DateTime, Utc}; use opentelemetry::trace::{ - Span as _, SpanContext, SpanKind, TraceContextExt, Tracer, TracerProvider as _, + Span as _, SpanContext, SpanKind, Status, TraceContextExt, Tracer, TracerProvider as _, }; + +use crate::json::Json; use opentelemetry::{Context, KeyValue}; use opentelemetry_otlp::{Protocol, SpanExporter, WithExportConfig, WithHttpConfig}; use opentelemetry_sdk::Resource; @@ -543,6 +545,23 @@ impl OtelEventProcessor { let Some(mut active_span) = self.active_spans.remove(&event.uuid()) else { return; }; + + if let Some(metadata) = event.metadata() { + if let Some(status_code) = metadata.get("otel.status_code").and_then(Json::as_str) { + let status = match status_code { + "OK" => Status::Ok, + "ERROR" => Status::error( + metadata.get("otel.status_message").and_then(Json::as_str).unwrap_or_default().to_string(), + ), + "UNSET" => Status::Unset, + other => { + eprintln!("Unrecognized OTEL status code in event metadata: {other}"); + Status::Unset + } + }; + active_span.span.set_status(status); + } + } active_span.span.set_attributes(end_attributes(event)); active_span .span @@ -659,33 +678,13 @@ fn end_attributes(event: &Event) -> Vec { push_serialized(&mut attributes, "nemo_relay.end.data_json", event.data()); let metadata = event.metadata(); - push_serialized( - &mut attributes, - "nemo_relay.end.metadata_json", - metadata, - ); + push_serialized(&mut attributes, "nemo_relay.end.metadata_json", metadata); push_serialized( &mut attributes, "nemo_relay.end.output_json", event.output(), ); - if metadata.is_some() && metadata.unwrap()["otel.status_code"].is_string() { - let status_code = metadata.unwrap()["otel.status_code"].as_str(); - push_serialized( - &mut attributes, - "StatusCode", - status_code, - ); - - if metadata.unwrap()["otel.status_message"].is_string() && status_code == Some("ERROR") { - push_serialized( - &mut attributes, - "Description", - metadata.unwrap()["otel.status_message"].as_str(), - ); - } - } attributes } From 91d996810142ce652f1c4682559c15405fc0567d Mon Sep 17 00:00:00 2001 From: David Gardner Date: Thu, 4 Jun 2026 10:46:35 -0700 Subject: [PATCH 08/29] Update openinference to set span status Signed-off-by: David Gardner --- .../core/src/observability/openinference.rs | 22 ++++++++- crates/core/src/observability/otel.rs | 6 ++- .../unit/observability/openinference_tests.rs | 46 +++++++++++++++++++ 3 files changed, 72 insertions(+), 2 deletions(-) diff --git a/crates/core/src/observability/openinference.rs b/crates/core/src/observability/openinference.rs index deb1b55c..42519791 100644 --- a/crates/core/src/observability/openinference.rs +++ b/crates/core/src/observability/openinference.rs @@ -34,7 +34,7 @@ use chrono::{DateTime, Utc}; use openinference_semantic_conventions::SpanKind as OpenInferenceSpanKind; use openinference_semantic_conventions::attributes as oi; use opentelemetry::trace::{ - Span as _, SpanContext, SpanKind, TraceContextExt, Tracer, TracerProvider as _, + Span as _, SpanContext, SpanKind, Status, TraceContextExt, Tracer, TracerProvider as _, }; use opentelemetry::{Context, KeyValue}; use opentelemetry_otlp::{Protocol, SpanExporter, WithExportConfig, WithHttpConfig}; @@ -549,6 +549,26 @@ impl OpenInferenceEventProcessor { let Some(mut active_span) = self.active_spans.remove(&event.uuid()) else { return; }; + if let Some(metadata) = event.metadata() { + if let Some(status_code) = metadata.get("otel.status_code").and_then(Json::as_str) { + let status = match status_code { + "OK" => Status::Ok, + "ERROR" => Status::error( + metadata + .get("otel.status_message") + .and_then(Json::as_str) + .unwrap_or_default() + .to_string(), + ), + "UNSET" => Status::Unset, + other => { + eprintln!("Unrecognized OTEL status code in event metadata: {other}"); + Status::Unset + } + }; + active_span.span.set_status(status); + } + } active_span.span.set_attributes(end_attributes(event)); active_span .span diff --git a/crates/core/src/observability/otel.rs b/crates/core/src/observability/otel.rs index f6a02bb8..cdd3f105 100644 --- a/crates/core/src/observability/otel.rs +++ b/crates/core/src/observability/otel.rs @@ -551,7 +551,11 @@ impl OtelEventProcessor { let status = match status_code { "OK" => Status::Ok, "ERROR" => Status::error( - metadata.get("otel.status_message").and_then(Json::as_str).unwrap_or_default().to_string(), + metadata + .get("otel.status_message") + .and_then(Json::as_str) + .unwrap_or_default() + .to_string(), ), "UNSET" => Status::Unset, other => { diff --git a/crates/core/tests/unit/observability/openinference_tests.rs b/crates/core/tests/unit/observability/openinference_tests.rs index 603910dc..13bacbc3 100644 --- a/crates/core/tests/unit/observability/openinference_tests.rs +++ b/crates/core/tests/unit/observability/openinference_tests.rs @@ -20,6 +20,7 @@ use crate::codec::request::{ use crate::codec::response::{AnnotatedLlmResponse, FinishReason, ResponseToolCall, Usage}; use crate::json::Json; use crate::observability::atif::{AtifAgentInfo, AtifExporter, AtifStepExtra}; +use opentelemetry::trace::Status; use opentelemetry_sdk::trace::InMemorySpanExporterBuilder; use serde_json::json; use std::collections::HashMap; @@ -1357,6 +1358,51 @@ fn scope_end_output_payload_is_exported_to_openinference_attributes() { ); } +#[test] +fn scope_end_metadata_sets_openinference_span_status() { + let cases = [ + ( + json!({"otel.status_code": "ERROR", "otel.status_message": "failed"}), + Status::error("failed".to_string()), + ), + (json!({"otel.status_code": "OK"}), Status::Ok), + (json!({}), Status::Unset), + ]; + + for (metadata, expected_status) in cases { + let (provider, exporter) = make_provider(); + let mut processor = + OpenInferenceEventProcessor::new(provider.clone(), "test-scope".to_string()); + let scope_uuid = Uuid::now_v7(); + + processor.process(&make_start_event( + scope_uuid, + None, + "agent", + ScopeType::Agent, + Some(json!({"task": "summarize"})), + )); + processor.process(&Event::Scope(ScopeEvent::new( + BaseEvent::builder() + .uuid(scope_uuid) + .name("agent") + .metadata(metadata) + .data(json!({"status": "done"})) + .build(), + ScopeCategory::End, + Vec::new(), + EventCategory::agent(), + None, + ))); + + processor.force_flush().unwrap(); + + let spans = exporter.get_finished_spans().unwrap(); + assert_eq!(spans.len(), 1); + assert_eq!(spans[0].status, expected_status); + } +} + #[test] fn pre_epoch_timestamps_round_trip_through_system_time() { let timestamp = DateTime::parse_from_rfc3339("1969-12-31T23:59:58.500000000Z") From 83917ca08742ccc0ebf0b6d7de7d11033d23dfc5 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Thu, 4 Jun 2026 12:21:40 -0700 Subject: [PATCH 09/29] Helper method for setting status metadata Signed-off-by: David Gardner --- crates/core/src/api/shared.rs | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/crates/core/src/api/shared.rs b/crates/core/src/api/shared.rs index 7793808d..5a57de2a 100644 --- a/crates/core/src/api/shared.rs +++ b/crates/core/src/api/shared.rs @@ -13,6 +13,7 @@ use crate::api::scope::ScopeHandle; use crate::codec::request::AnnotatedLlmRequest; use crate::codec::traits::LlmCodec; use crate::error::{FlowError, Result}; +use crate::json::{Json, merge_json}; use crate::shared_runtime::ensure_process_runtime_owner; pub(crate) fn resolve_parent_uuid(parent: Option<&ScopeHandle>) -> Option { @@ -37,6 +38,25 @@ pub(crate) fn ensure_runtime_owner() -> Result<()> { ensure_process_runtime_owner() } +pub(crate) fn metadata_with_otel_status( + metadata: Option, + status_code: &'static str, + status_message: Option, +) -> Option { + let mut status = serde_json::Map::new(); + status.insert( + "otel.status_code".to_string(), + Json::String(status_code.to_string()), + ); + if let Some(status_message) = status_message { + status.insert( + "otel.status_message".to_string(), + Json::String(status_message), + ); + } + merge_json(metadata, Some(Json::Object(status))) +} + pub(crate) fn run_request_intercepts_with_codec( name: &str, request: LlmRequest, From fbd9f44eac30b81198cb6c961d6950a554178cd3 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Thu, 4 Jun 2026 12:24:32 -0700 Subject: [PATCH 10/29] Set otel status on llm and tool calls Signed-off-by: David Gardner --- crates/core/src/api/llm.rs | 15 +++++++++++---- crates/core/src/api/tool.rs | 16 +++++++++++++--- 2 files changed, 24 insertions(+), 7 deletions(-) diff --git a/crates/core/src/api/llm.rs b/crates/core/src/api/llm.rs index 91c149e6..f9482024 100644 --- a/crates/core/src/api/llm.rs +++ b/crates/core/src/api/llm.rs @@ -19,8 +19,8 @@ use crate::api::runtime::{ use crate::api::scope::event; use crate::api::scope::{EmitMarkEventParams, ScopeHandle}; use crate::api::shared::{ - ensure_runtime_owner, resolve_parent_uuid, run_request_intercepts_with_codec, - snapshot_event_subscribers, + ensure_runtime_owner, metadata_with_otel_status, resolve_parent_uuid, + run_request_intercepts_with_codec, snapshot_event_subscribers, }; use crate::codec::request::AnnotatedLlmRequest; use crate::codec::response::AnnotatedLlmResponse; @@ -587,19 +587,22 @@ pub async fn llm_call_execute(params: LlmCallExecuteParams) -> Result { .as_ref() .and_then(|codec| codec.decode_response(&response).ok()) .map(Arc::new); + let end_metadata = metadata_with_otel_status(metadata, "OK", None); llm_call_end( LlmCallEndParams::builder() .handle(&handle) .response(response.clone()) .data_opt(data) - .metadata_opt(metadata) + .metadata_opt(end_metadata) .annotated_response_opt(annotated_response) .build(), )?; Ok(response) } Err(error) => { - let _ = emit_llm_end_without_output(&handle, metadata); + let end_metadata = + metadata_with_otel_status(metadata, "ERROR", Some(error.to_string())); + let _ = emit_llm_end_without_output(&handle, end_metadata); Err(error) } } @@ -831,3 +834,7 @@ pub fn llm_conditional_execution(request: &LlmRequest) -> Result<()> { } Ok(()) } + +#[cfg(test)] +#[path = "../../tests/unit/llm_api_tests.rs"] +mod tests; diff --git a/crates/core/src/api/tool.rs b/crates/core/src/api/tool.rs index fd38658b..c01e4be9 100644 --- a/crates/core/src/api/tool.rs +++ b/crates/core/src/api/tool.rs @@ -9,7 +9,10 @@ use crate::api::runtime::current_scope_stack; use crate::api::runtime::global_context; use crate::api::scope::event; use crate::api::scope::{EmitMarkEventParams, ScopeHandle}; -use crate::api::shared::{ensure_runtime_owner, resolve_parent_uuid, snapshot_event_subscribers}; +use crate::api::shared::{ + ensure_runtime_owner, metadata_with_otel_status, resolve_parent_uuid, + snapshot_event_subscribers, +}; use crate::error::{FlowError, Result}; use crate::json::Json; use bitflags::bitflags; @@ -445,18 +448,21 @@ pub async fn tool_call_execute(params: ToolCallExecuteParams) -> Result { match execution(intercepted_args).await { Ok(result) => { + let end_metadata = metadata_with_otel_status(metadata, "OK", None); tool_call_end( ToolCallEndParams::builder() .handle(&handle) .result(result.clone()) .data_opt(data) - .metadata_opt(metadata) + .metadata_opt(end_metadata) .build(), )?; Ok(result) } Err(error) => { - let _ = emit_tool_end_without_output(&handle, metadata); + let end_metadata = + metadata_with_otel_status(metadata, "ERROR", Some(error.to_string())); + let _ = emit_tool_end_without_output(&handle, end_metadata); Err(error) } } @@ -542,3 +548,7 @@ pub fn tool_conditional_execution(name: &str, args: &Json) -> Result<()> { } Ok(()) } + +#[cfg(test)] +#[path = "../../tests/unit/tool_api_tests.rs"] +mod tests; From 4056818e6372dd492c318aa70af1224174395b83 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Thu, 4 Jun 2026 12:27:21 -0700 Subject: [PATCH 11/29] WIP: tests and docstrings Signed-off-by: David Gardner --- crates/core/tests/unit/llm_api_tests.rs | 106 ++++++++++++++++++ crates/core/tests/unit/tool_api_tests.rs | 99 ++++++++++++++++ crates/python/src/py_api/mod.rs | 4 + .../tests/coverage/py_api_coverage_tests.rs | 4 +- 4 files changed, 211 insertions(+), 2 deletions(-) create mode 100644 crates/core/tests/unit/llm_api_tests.rs create mode 100644 crates/core/tests/unit/tool_api_tests.rs diff --git a/crates/core/tests/unit/llm_api_tests.rs b/crates/core/tests/unit/llm_api_tests.rs new file mode 100644 index 00000000..6ee3c199 --- /dev/null +++ b/crates/core/tests/unit/llm_api_tests.rs @@ -0,0 +1,106 @@ +// SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +//! Unit tests for LLM API lifecycle behavior. + +use std::sync::{Arc, Mutex}; + +use serde_json::json; + +use super::{LlmCallExecuteParams, LlmRequest, llm_call_execute}; +use crate::api::event::ScopeCategory; +use crate::api::runtime::{NemoRelayContextState, global_context}; +use crate::api::subscriber::{deregister_subscriber, flush_subscribers, register_subscriber}; +use crate::error::FlowError; +use crate::json::Json; + +fn reset_global() { + crate::shared_runtime::reset_runtime_owner_for_tests(); + let context = global_context(); + *context.write().unwrap() = NemoRelayContextState::new(); +} + +fn request() -> LlmRequest { + LlmRequest { + headers: serde_json::Map::new(), + content: json!({"messages": [], "model": "demo"}), + } +} + +#[test] +fn llm_call_execute_adds_otel_status_metadata_to_end_events() { + reset_global(); + + let captured_events = Arc::new(Mutex::new(Vec::<(String, Option)>::new())); + let subscriber_events = captured_events.clone(); + register_subscriber( + "llm-status-metadata", + Arc::new(move |event| { + if event.scope_category() == Some(ScopeCategory::End) { + subscriber_events + .lock() + .unwrap() + .push((event.name().to_string(), event.metadata().cloned())); + } + }), + ) + .unwrap(); + + let runtime = tokio::runtime::Runtime::new().unwrap(); + runtime.block_on(async { + let response = llm_call_execute( + LlmCallExecuteParams::builder() + .name("llm-ok") + .request(request()) + .func(Arc::new(|_request| { + Box::pin(async { Ok(json!({"ok": true})) }) + })) + .metadata(json!({"caller": "llm-ok", "otel.status_code": "USER"})) + .build(), + ) + .await + .unwrap(); + assert_eq!(response, json!({"ok": true})); + + let error = llm_call_execute( + LlmCallExecuteParams::builder() + .name("llm-error") + .request(request()) + .func(Arc::new(|_request| { + Box::pin(async { Err(FlowError::Internal("llm boom".to_string())) }) + })) + .metadata(json!({"caller": "llm-error"})) + .build(), + ) + .await + .unwrap_err(); + assert!(error.to_string().contains("llm boom")); + }); + + flush_subscribers().unwrap(); + assert!(deregister_subscriber("llm-status-metadata").unwrap()); + + let events = captured_events.lock().unwrap(); + let metadata_for = |name: &str| { + events + .iter() + .find(|event| event.0 == name) + .and_then(|event| event.1.as_ref()) + .unwrap_or_else(|| panic!("missing end event metadata for {name}")) + }; + + let success_metadata = metadata_for("llm-ok"); + assert_eq!(success_metadata["caller"], json!("llm-ok")); + assert_eq!(success_metadata["otel.status_code"], json!("OK")); + assert!(success_metadata.get("otel.status_message").is_none()); + + let error_metadata = metadata_for("llm-error"); + assert_eq!(error_metadata["caller"], json!("llm-error")); + assert_eq!(error_metadata["otel.status_code"], json!("ERROR")); + assert!( + error_metadata["otel.status_message"] + .as_str() + .unwrap() + .contains("llm boom") + ); +} diff --git a/crates/core/tests/unit/tool_api_tests.rs b/crates/core/tests/unit/tool_api_tests.rs new file mode 100644 index 00000000..aa3d6670 --- /dev/null +++ b/crates/core/tests/unit/tool_api_tests.rs @@ -0,0 +1,99 @@ +// SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +//! Unit tests for tool API lifecycle behavior. + +use std::sync::{Arc, Mutex}; + +use serde_json::json; + +use super::{ToolCallExecuteParams, tool_call_execute}; +use crate::api::event::ScopeCategory; +use crate::api::runtime::{NemoRelayContextState, global_context}; +use crate::api::subscriber::{deregister_subscriber, flush_subscribers, register_subscriber}; +use crate::error::FlowError; +use crate::json::Json; + +fn reset_global() { + crate::shared_runtime::reset_runtime_owner_for_tests(); + let context = global_context(); + *context.write().unwrap() = NemoRelayContextState::new(); +} + +#[test] +fn tool_call_execute_adds_otel_status_metadata_to_end_events() { + reset_global(); + + let captured_events = Arc::new(Mutex::new(Vec::<(String, Option)>::new())); + let subscriber_events = captured_events.clone(); + register_subscriber( + "tool-status-metadata", + Arc::new(move |event| { + if event.scope_category() == Some(ScopeCategory::End) { + subscriber_events + .lock() + .unwrap() + .push((event.name().to_string(), event.metadata().cloned())); + } + }), + ) + .unwrap(); + + let runtime = tokio::runtime::Runtime::new().unwrap(); + runtime.block_on(async { + let result = tool_call_execute( + ToolCallExecuteParams::builder() + .name("tool-ok") + .args(json!({"value": 1})) + .func(Arc::new(|_args| { + Box::pin(async { Ok(json!({"ok": true})) }) + })) + .metadata(json!({"caller": "tool-ok", "otel.status_code": "USER"})) + .build(), + ) + .await + .unwrap(); + assert_eq!(result, json!({"ok": true})); + + let error = tool_call_execute( + ToolCallExecuteParams::builder() + .name("tool-error") + .args(json!({"value": 2})) + .func(Arc::new(|_args| { + Box::pin(async { Err(FlowError::Internal("tool boom".to_string())) }) + })) + .metadata(json!({"caller": "tool-error"})) + .build(), + ) + .await + .unwrap_err(); + assert!(error.to_string().contains("tool boom")); + }); + + flush_subscribers().unwrap(); + assert!(deregister_subscriber("tool-status-metadata").unwrap()); + + let events = captured_events.lock().unwrap(); + let metadata_for = |name: &str| { + events + .iter() + .find(|event| event.0 == name) + .and_then(|event| event.1.as_ref()) + .unwrap_or_else(|| panic!("missing end event metadata for {name}")) + }; + + let success_metadata = metadata_for("tool-ok"); + assert_eq!(success_metadata["caller"], json!("tool-ok")); + assert_eq!(success_metadata["otel.status_code"], json!("OK")); + assert!(success_metadata.get("otel.status_message").is_none()); + + let error_metadata = metadata_for("tool-error"); + assert_eq!(error_metadata["caller"], json!("tool-error")); + assert_eq!(error_metadata["otel.status_code"], json!("ERROR")); + assert!( + error_metadata["otel.status_message"] + .as_str() + .unwrap() + .contains("tool boom") + ); +} diff --git a/crates/python/src/py_api/mod.rs b/crates/python/src/py_api/mod.rs index e36caf7c..4f91be8f 100644 --- a/crates/python/src/py_api/mod.rs +++ b/crates/python/src/py_api/mod.rs @@ -469,6 +469,8 @@ fn tool_call_end( /// attributes: Optional ``ToolAttributes`` bitflags. /// data: Optional JSON-serializable application data. /// metadata: Optional JSON-serializable metadata. +/// End events receive ``otel.status_code = "OK"`` on success, or +/// ``otel.status_code = "ERROR"`` and ``otel.status_message`` on error. /// Returns: /// An awaitable that resolves to the tool result after execution /// intercepts. Sanitize guardrails do not rewrite the value returned to @@ -679,6 +681,8 @@ fn llm_call_end( /// attributes: Optional ``LlmAttributes`` bitflags. /// data: Optional JSON-serializable application data. /// metadata: Optional JSON-serializable metadata. +/// End events receive ``otel.status_code = "OK"`` on success, or +/// ``otel.status_code = "ERROR"`` and ``otel.status_message`` on error. /// model_name: Optional normalized model name recorded in emitted LLM events. /// codec: Optional request codec used for annotated-aware request intercepts. /// response_codec: Optional response codec used to attach annotated response data diff --git a/crates/python/tests/coverage/py_api_coverage_tests.rs b/crates/python/tests/coverage/py_api_coverage_tests.rs index c3d86e06..03542b8e 100644 --- a/crates/python/tests/coverage/py_api_coverage_tests.rs +++ b/crates/python/tests/coverage/py_api_coverage_tests.rs @@ -134,7 +134,7 @@ fn py_api_helpers_and_scope_lifecycle_round_trip() { ) .unwrap(); - pop_scope(&child, None, None).unwrap(); + pop_scope(&child, None, None, None).unwrap(); assert_eq!(get_handle().unwrap().inner.name, "root"); }); } @@ -780,7 +780,7 @@ async def run_stream(api, request, func, collector, finalizer, handle, attribute assert!(deregister_subscriber(&global_subscriber).unwrap()); assert!(!deregister_subscriber(&global_subscriber).unwrap()); - pop_scope(&child, None, None).unwrap(); + pop_scope(&child, None, None, None).unwrap(); }); } From 449b7389f1d69672c86de0fb5e11bef94f59f56b Mon Sep 17 00:00:00 2001 From: David Gardner Date: Thu, 4 Jun 2026 12:39:11 -0700 Subject: [PATCH 12/29] Update tests to expect otel status codes in metadata Signed-off-by: David Gardner --- .../integrations/langchain_tests/test_callbacks.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/python/tests/integrations/langchain_tests/test_callbacks.py b/python/tests/integrations/langchain_tests/test_callbacks.py index bc72fe29..2b617901 100644 --- a/python/tests/integrations/langchain_tests/test_callbacks.py +++ b/python/tests/integrations/langchain_tests/test_callbacks.py @@ -109,7 +109,11 @@ def test_on_chain_end_pops_scope(self, handler: NemoRelayCallbackHandler, mock_n run_id=run_id, ) - mock_nemo_relay.scope.pop.assert_called_once_with(handle, output={"output": "result"}) + mock_nemo_relay.scope.pop.assert_called_once_with( + handle, + output={"output": "result"}, + metadata={"otel.status_code": "OK"}, + ) assert run_id not in handler._scope_handles def test_on_chain_error_pops_scope(self, handler: NemoRelayCallbackHandler, mock_nemo_relay: MagicMock): @@ -126,7 +130,11 @@ def test_on_chain_error_pops_scope(self, handler: NemoRelayCallbackHandler, mock run_id=run_id, ) - mock_nemo_relay.scope.pop.assert_called_once_with(handle, output={"error": "RuntimeError('boom')"}) + mock_nemo_relay.scope.pop.assert_called_once_with( + handle, + output={"error": "RuntimeError('boom')"}, + metadata={"otel.status_code": "ERROR", "otel.status_message": "boom"}, + ) assert run_id not in handler._scope_handles def test_on_chain_end_prepares_command_outputs(self, handler: NemoRelayCallbackHandler, mock_nemo_relay: MagicMock): @@ -183,6 +191,7 @@ def test_on_chain_end_prepares_command_outputs(self, handler: NemoRelayCallbackH }, } }, + metadata={"otel.status_code": "OK"}, ) def test_parent_scope_passed_to_push(self, handler: NemoRelayCallbackHandler, mock_nemo_relay: MagicMock): From 018394fbbb5a2ead523ef0a9e9affc768f889588 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Thu, 4 Jun 2026 12:40:49 -0700 Subject: [PATCH 13/29] Formatting [skip ci] Signed-off-by: David Gardner --- .../nemo_relay/integrations/langchain/callbacks.py | 12 +++++++++--- .../nemo_relay/integrations/langgraph/callbacks.py | 1 + python/nemo_relay/scope.py | 10 ++++++---- 3 files changed, 16 insertions(+), 7 deletions(-) diff --git a/python/nemo_relay/integrations/langchain/callbacks.py b/python/nemo_relay/integrations/langchain/callbacks.py index 3340462c..b5aa2515 100644 --- a/python/nemo_relay/integrations/langchain/callbacks.py +++ b/python/nemo_relay/integrations/langchain/callbacks.py @@ -92,9 +92,15 @@ def on_chain_error( **kwargs: typing.Any, ) -> typing.Any: """Pop the NeMo Relay scope associated with a failed LangChain chain run.""" - self._pop_scope(run_id, output={"error": repr(error)}, metadata={"otel.status_code": "ERROR", "otel.status_message": str(error)}) - - def _pop_scope(self, run_id: UUID, *, output: dict[str, typing.Any] | None = None, metadata: nemo_relay.Json | None = None) -> None: + self._pop_scope( + run_id, + output={"error": repr(error)}, + metadata={"otel.status_code": "ERROR", "otel.status_message": str(error)}, + ) + + def _pop_scope( + self, run_id: UUID, *, output: dict[str, typing.Any] | None = None, metadata: nemo_relay.Json | None = None + ) -> None: handle = self._scope_handles.pop(run_id, None) if handle is None: return diff --git a/python/nemo_relay/integrations/langgraph/callbacks.py b/python/nemo_relay/integrations/langgraph/callbacks.py index 387ef835..24122f81 100644 --- a/python/nemo_relay/integrations/langgraph/callbacks.py +++ b/python/nemo_relay/integrations/langgraph/callbacks.py @@ -16,6 +16,7 @@ _logger = logging.getLogger(__name__) + def _interrupt_to_payload(interrupt: Any) -> dict[str, nemo_relay.Json]: return { "id": _prepare_lc_payloads(getattr(interrupt, "id", None)), diff --git a/python/nemo_relay/scope.py b/python/nemo_relay/scope.py index d14c790d..32e42e75 100644 --- a/python/nemo_relay/scope.py +++ b/python/nemo_relay/scope.py @@ -147,7 +147,9 @@ def push( ) -def pop(handle: ScopeHandle, *, output: Json | None = None, metadata: Json | None = None, timestamp: datetime | None = None) -> None: +def pop( + handle: ScopeHandle, *, output: Json | None = None, metadata: Json | None = None, timestamp: datetime | None = None +) -> None: """Pop a scope previously returned by ``push()`` or ``scope()``. Args: @@ -259,7 +261,7 @@ def scope( """ _ensure_scope_stack() pushed_handle = None - statusCode = 'UNSET' + statusCode = "UNSET" statusMessage = None try: pushed_handle = _native_push_scope( @@ -273,9 +275,9 @@ def scope( timestamp=timestamp, ) yield pushed_handle - statusCode = 'OK' + statusCode = "OK" except Exception as e: - statusCode = 'ERROR' + statusCode = "ERROR" statusMessage = str(e) raise e finally: From 703f13b6faac75913a4d48e3845d0aeb692b4474 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Thu, 4 Jun 2026 12:46:48 -0700 Subject: [PATCH 14/29] Collapse if statement Signed-off-by: David Gardner --- .../core/src/observability/openinference.rs | 37 +++++++++---------- 1 file changed, 18 insertions(+), 19 deletions(-) diff --git a/crates/core/src/observability/openinference.rs b/crates/core/src/observability/openinference.rs index 42519791..2191ed8b 100644 --- a/crates/core/src/observability/openinference.rs +++ b/crates/core/src/observability/openinference.rs @@ -549,25 +549,24 @@ impl OpenInferenceEventProcessor { let Some(mut active_span) = self.active_spans.remove(&event.uuid()) else { return; }; - if let Some(metadata) = event.metadata() { - if let Some(status_code) = metadata.get("otel.status_code").and_then(Json::as_str) { - let status = match status_code { - "OK" => Status::Ok, - "ERROR" => Status::error( - metadata - .get("otel.status_message") - .and_then(Json::as_str) - .unwrap_or_default() - .to_string(), - ), - "UNSET" => Status::Unset, - other => { - eprintln!("Unrecognized OTEL status code in event metadata: {other}"); - Status::Unset - } - }; - active_span.span.set_status(status); - } + if let Some(metadata) = event.metadata() && let Some(status_code) = metadata.get("otel.status_code").and_then(Json::as_str) { + let status = match status_code { + "OK" => Status::Ok, + "ERROR" => Status::error( + metadata + .get("otel.status_message") + .and_then(Json::as_str) + .unwrap_or_default() + .to_string(), + ), + "UNSET" => Status::Unset, + other => { + eprintln!("Unrecognized OTEL status code in event metadata: {other}"); + Status::Unset + } + }; + active_span.span.set_status(status); + } active_span.span.set_attributes(end_attributes(event)); active_span From c7d231556b72be8f47817fd233ca69a06ccb4261 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Thu, 4 Jun 2026 12:47:51 -0700 Subject: [PATCH 15/29] Collapse if statement Signed-off-by: David Gardner --- .../core/src/observability/openinference.rs | 5 +-- crates/core/src/observability/otel.rs | 36 +++++++++---------- 2 files changed, 20 insertions(+), 21 deletions(-) diff --git a/crates/core/src/observability/openinference.rs b/crates/core/src/observability/openinference.rs index 2191ed8b..7fbc7dc2 100644 --- a/crates/core/src/observability/openinference.rs +++ b/crates/core/src/observability/openinference.rs @@ -549,7 +549,9 @@ impl OpenInferenceEventProcessor { let Some(mut active_span) = self.active_spans.remove(&event.uuid()) else { return; }; - if let Some(metadata) = event.metadata() && let Some(status_code) = metadata.get("otel.status_code").and_then(Json::as_str) { + if let Some(metadata) = event.metadata() + && let Some(status_code) = metadata.get("otel.status_code").and_then(Json::as_str) + { let status = match status_code { "OK" => Status::Ok, "ERROR" => Status::error( @@ -566,7 +568,6 @@ impl OpenInferenceEventProcessor { } }; active_span.span.set_status(status); - } active_span.span.set_attributes(end_attributes(event)); active_span diff --git a/crates/core/src/observability/otel.rs b/crates/core/src/observability/otel.rs index cdd3f105..5e96c546 100644 --- a/crates/core/src/observability/otel.rs +++ b/crates/core/src/observability/otel.rs @@ -546,25 +546,23 @@ impl OtelEventProcessor { return; }; - if let Some(metadata) = event.metadata() { - if let Some(status_code) = metadata.get("otel.status_code").and_then(Json::as_str) { - let status = match status_code { - "OK" => Status::Ok, - "ERROR" => Status::error( - metadata - .get("otel.status_message") - .and_then(Json::as_str) - .unwrap_or_default() - .to_string(), - ), - "UNSET" => Status::Unset, - other => { - eprintln!("Unrecognized OTEL status code in event metadata: {other}"); - Status::Unset - } - }; - active_span.span.set_status(status); - } + if let Some(metadata) = event.metadata() && let Some(status_code) = metadata.get("otel.status_code").and_then(Json::as_str) { + let status = match status_code { + "OK" => Status::Ok, + "ERROR" => Status::error( + metadata + .get("otel.status_message") + .and_then(Json::as_str) + .unwrap_or_default() + .to_string(), + ), + "UNSET" => Status::Unset, + other => { + eprintln!("Unrecognized OTEL status code in event metadata: {other}"); + Status::Unset + } + }; + active_span.span.set_status(status); } active_span.span.set_attributes(end_attributes(event)); active_span From 089ae249230a81ed2e4d2a6836264dd80877cc27 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Thu, 4 Jun 2026 12:51:33 -0700 Subject: [PATCH 16/29] Formatting [skip ci] Signed-off-by: David Gardner --- crates/core/src/observability/otel.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/crates/core/src/observability/otel.rs b/crates/core/src/observability/otel.rs index 5e96c546..7d3321b4 100644 --- a/crates/core/src/observability/otel.rs +++ b/crates/core/src/observability/otel.rs @@ -546,7 +546,9 @@ impl OtelEventProcessor { return; }; - if let Some(metadata) = event.metadata() && let Some(status_code) = metadata.get("otel.status_code").and_then(Json::as_str) { + if let Some(metadata) = event.metadata() + && let Some(status_code) = metadata.get("otel.status_code").and_then(Json::as_str) + { let status = match status_code { "OK" => Status::Ok, "ERROR" => Status::error( From 0121120185215fb04f39836cf13062eba7eb55ac Mon Sep 17 00:00:00 2001 From: David Gardner Date: Thu, 4 Jun 2026 13:00:47 -0700 Subject: [PATCH 17/29] Move repeated status code to a method [skip ci] Signed-off-by: David Gardner --- crates/core/src/observability/mod.rs | 33 +++++++++++++++++++ .../core/src/observability/openinference.rs | 23 ++----------- crates/core/src/observability/otel.rs | 25 ++------------ 3 files changed, 37 insertions(+), 44 deletions(-) diff --git a/crates/core/src/observability/mod.rs b/crates/core/src/observability/mod.rs index 9e0f46dc..851d4b90 100644 --- a/crates/core/src/observability/mod.rs +++ b/crates/core/src/observability/mod.rs @@ -18,3 +18,36 @@ pub mod openinference; #[cfg(feature = "otel")] pub mod otel; pub mod plugin_component; + +#[cfg(any(feature = "otel", feature = "openinference"))] +pub(crate) fn set_span_status_from_event_metadata(span: &mut S, event: &crate::api::event::Event) +where + S: opentelemetry::trace::Span, +{ + let Some(metadata) = event.metadata() else { + return; + }; + let Some(status_code) = metadata + .get("otel.status_code") + .and_then(crate::json::Json::as_str) + else { + return; + }; + + let status = match status_code { + "OK" => opentelemetry::trace::Status::Ok, + "ERROR" => opentelemetry::trace::Status::error( + metadata + .get("otel.status_message") + .and_then(crate::json::Json::as_str) + .unwrap_or_default() + .to_string(), + ), + "UNSET" => opentelemetry::trace::Status::Unset, + other => { + eprintln!("Unrecognized OTEL status code in event metadata: {other}"); + opentelemetry::trace::Status::Unset + } + }; + span.set_status(status); +} diff --git a/crates/core/src/observability/openinference.rs b/crates/core/src/observability/openinference.rs index 7fbc7dc2..b3223c15 100644 --- a/crates/core/src/observability/openinference.rs +++ b/crates/core/src/observability/openinference.rs @@ -34,7 +34,7 @@ use chrono::{DateTime, Utc}; use openinference_semantic_conventions::SpanKind as OpenInferenceSpanKind; use openinference_semantic_conventions::attributes as oi; use opentelemetry::trace::{ - Span as _, SpanContext, SpanKind, Status, TraceContextExt, Tracer, TracerProvider as _, + Span as _, SpanContext, SpanKind, TraceContextExt, Tracer, TracerProvider as _, }; use opentelemetry::{Context, KeyValue}; use opentelemetry_otlp::{Protocol, SpanExporter, WithExportConfig, WithHttpConfig}; @@ -549,26 +549,7 @@ impl OpenInferenceEventProcessor { let Some(mut active_span) = self.active_spans.remove(&event.uuid()) else { return; }; - if let Some(metadata) = event.metadata() - && let Some(status_code) = metadata.get("otel.status_code").and_then(Json::as_str) - { - let status = match status_code { - "OK" => Status::Ok, - "ERROR" => Status::error( - metadata - .get("otel.status_message") - .and_then(Json::as_str) - .unwrap_or_default() - .to_string(), - ), - "UNSET" => Status::Unset, - other => { - eprintln!("Unrecognized OTEL status code in event metadata: {other}"); - Status::Unset - } - }; - active_span.span.set_status(status); - } + super::set_span_status_from_event_metadata(&mut active_span.span, event); active_span.span.set_attributes(end_attributes(event)); active_span .span diff --git a/crates/core/src/observability/otel.rs b/crates/core/src/observability/otel.rs index 7d3321b4..b3b7f0b3 100644 --- a/crates/core/src/observability/otel.rs +++ b/crates/core/src/observability/otel.rs @@ -28,10 +28,8 @@ use crate::api::subscriber::{deregister_subscriber, flush_subscribers, register_ use crate::error::FlowError; use chrono::{DateTime, Utc}; use opentelemetry::trace::{ - Span as _, SpanContext, SpanKind, Status, TraceContextExt, Tracer, TracerProvider as _, + Span as _, SpanContext, SpanKind, TraceContextExt, Tracer, TracerProvider as _, }; - -use crate::json::Json; use opentelemetry::{Context, KeyValue}; use opentelemetry_otlp::{Protocol, SpanExporter, WithExportConfig, WithHttpConfig}; use opentelemetry_sdk::Resource; @@ -546,26 +544,7 @@ impl OtelEventProcessor { return; }; - if let Some(metadata) = event.metadata() - && let Some(status_code) = metadata.get("otel.status_code").and_then(Json::as_str) - { - let status = match status_code { - "OK" => Status::Ok, - "ERROR" => Status::error( - metadata - .get("otel.status_message") - .and_then(Json::as_str) - .unwrap_or_default() - .to_string(), - ), - "UNSET" => Status::Unset, - other => { - eprintln!("Unrecognized OTEL status code in event metadata: {other}"); - Status::Unset - } - }; - active_span.span.set_status(status); - } + super::set_span_status_from_event_metadata(&mut active_span.span, event); active_span.span.set_attributes(end_attributes(event)); active_span .span From b0fac251d60852a1d1c9e2032c7ca7bbf0e39e4a Mon Sep 17 00:00:00 2001 From: David Gardner Date: Thu, 4 Jun 2026 13:32:11 -0700 Subject: [PATCH 18/29] Formatting [skip ci] Signed-off-by: David Gardner --- python/nemo_relay/scope.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/python/nemo_relay/scope.py b/python/nemo_relay/scope.py index 32e42e75..9c6ec388 100644 --- a/python/nemo_relay/scope.py +++ b/python/nemo_relay/scope.py @@ -261,8 +261,8 @@ def scope( """ _ensure_scope_stack() pushed_handle = None - statusCode = "UNSET" - statusMessage = None + status_code = "UNSET" + status_message = None try: pushed_handle = _native_push_scope( name, @@ -275,16 +275,16 @@ def scope( timestamp=timestamp, ) yield pushed_handle - statusCode = "OK" + status_code = "OK" except Exception as e: - statusCode = "ERROR" - statusMessage = str(e) - raise e + status_code = "ERROR" + status_message = str(e) + raise finally: if pushed_handle is not None: - metadata = {"otel.status_code": statusCode} - if statusMessage is not None: - metadata["otel.status_message"] = statusMessage + metadata = {"otel.status_code": status_code} + if status_message is not None: + metadata["otel.status_message"] = status_message _native_pop_scope(pushed_handle, metadata=metadata, timestamp=end_timestamp) From 4f09a5a2d417acd6b301bc33e60375c136ca4466 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Thu, 4 Jun 2026 14:26:20 -0700 Subject: [PATCH 19/29] Add setting metadata to node js [skip ci] Signed-off-by: David Gardner --- crates/node/src/api/mod.rs | 37 ++++++++++++++-- crates/node/tests/llm_tests.mjs | 73 +++++++++++++++++++++++++++++++ crates/node/tests/scope_tests.mjs | 63 ++++++++++++++++++++++++++ crates/node/tests/tools_tests.mjs | 73 +++++++++++++++++++++++++++++++ 4 files changed, 243 insertions(+), 3 deletions(-) diff --git a/crates/node/src/api/mod.rs b/crates/node/src/api/mod.rs index b3930001..7297878f 100644 --- a/crates/node/src/api/mod.rs +++ b/crates/node/src/api/mod.rs @@ -92,6 +92,21 @@ fn parse_string_map( Ok(out) } +fn otel_status_metadata(status_code: &'static str, status_message: Option) -> Json { + let mut metadata = serde_json::Map::new(); + metadata.insert( + "otel.status_code".to_string(), + Json::String(status_code.to_string()), + ); + if let Some(status_message) = status_message { + metadata.insert( + "otel.status_message".to_string(), + Json::String(status_message), + ); + } + Json::Object(metadata) +} + fn build_otel_config( options: Option, ) -> napi::Result { @@ -1251,15 +1266,22 @@ pub fn push_scope( /// Optional `output` is a semantic JSON payload exported on the scope end event. /// Optional `timestamp` is a Unix timestamp in microseconds recorded on the end event. /// It must be a safe integer number; omit it to use the runtime default end timestamp. +/// Optional `metadata` is a JSON metadata payload recorded on the scope end event. /// Throws if the handle does not match the current top scope. #[napi] -pub fn pop_scope(handle: &ScopeHandle, output: Option, timestamp: Option) -> Result<()> { +pub fn pop_scope( + handle: &ScopeHandle, + output: Option, + timestamp: Option, + metadata: Option, +) -> Result<()> { let timestamp = parse_timestamp_micros(timestamp)?; core_scope_api::pop_scope( core_scope_api::PopScopeParams::builder() .handle_uuid(&handle.inner.uuid) .output_opt(opt_json(output)) .timestamp_opt(timestamp) + .metadata_opt(opt_json(metadata)) .build(), ) .map_err(to_napi_err)?; @@ -1320,13 +1342,17 @@ pub fn with_scope( // Create a promise-aware wrapper so we handle both sync and async callbacks. let pa_fn = std::sync::Arc::new( crate::promise_call::PromiseAwareFn::new(&env, &callback).map_err(|e| { - // Pop scope before propagating error + let status_message = format!("failed to create PromiseAwareFn: {e}"); let _ = core_scope_api::pop_scope( core_scope_api::PopScopeParams::builder() .handle_uuid(&scope_uuid) + .metadata_opt(Some(otel_status_metadata( + "ERROR", + Some(status_message.clone()), + ))) .build(), ); - napi::Error::from_reason(format!("failed to create PromiseAwareFn: {e}")) + napi::Error::from_reason(status_message) })?, ); @@ -1346,10 +1372,15 @@ pub fn with_scope( }); let result = pa_fn.call_with_arg0(build_handle).await; + let metadata = match &result { + Ok(_) => otel_status_metadata("OK", None), + Err(error) => otel_status_metadata("ERROR", Some(error.to_string())), + }; // Always pop the scope, even on error. if core_scope_api::pop_scope( core_scope_api::PopScopeParams::builder() .handle_uuid(&scope_uuid) + .metadata_opt(Some(metadata)) .build(), ) .is_ok() diff --git a/crates/node/tests/llm_tests.mjs b/crates/node/tests/llm_tests.mjs index 16a3c4ff..654b4aa7 100644 --- a/crates/node/tests/llm_tests.mjs +++ b/crates/node/tests/llm_tests.mjs @@ -32,6 +32,7 @@ const { deregisterLlmStreamExecutionIntercept, registerSubscriber, deregisterSubscriber, + flushSubscribers, ScopeType, } = lib; @@ -42,6 +43,13 @@ function rejectWith(value) { return Promise.reject(value); } +async function flushSubscriberCallbacks() { + flushSubscribers(); + for (let i = 0; i < 10; i += 1) { + await new Promise((resolve) => setImmediate(resolve)); + } +} + function makeNative() { return { headers: {}, @@ -173,6 +181,71 @@ describe('LLM execute', () => { assert.equal(result, null); }); + it('execute records OTEL status metadata on end events', async () => { + const events = []; + registerSubscriber('node_llm_status_metadata_sub', (e) => events.push(e)); + try { + const result = await llmCallExecute( + 'exec_status_ok_llm', + makeNative(), + () => ({ + response: 'ok', + }), + null, + null, + null, + { + caller: 'node-llm', + }, + null, + ); + assert.deepEqual(result, { + response: 'ok', + }); + + await assert.rejects( + () => + llmCallExecuteAsync( + 'exec_status_error_llm', + makeNative(), + async () => { + throw new Error('llm status failure'); + }, + null, + null, + null, + { + caller: 'node-llm-error', + }, + null, + ), + /llm status failure/, + ); + + await flushSubscriberCallbacks(); + const okEnd = events.find( + (e) => + e.name === 'exec_status_ok_llm' && e.kind === 'scope' && e.category === 'llm' && e.scope_category === 'end', + ); + const errorEnd = events.find( + (e) => + e.name === 'exec_status_error_llm' && + e.kind === 'scope' && + e.category === 'llm' && + e.scope_category === 'end', + ); + assert.ok(okEnd, 'expected successful llm end event'); + assert.equal(okEnd.metadata.caller, 'node-llm'); + assert.equal(okEnd.metadata['otel.status_code'], 'OK'); + assert.ok(errorEnd, 'expected failed llm end event'); + assert.equal(errorEnd.metadata.caller, 'node-llm-error'); + assert.equal(errorEnd.metadata['otel.status_code'], 'ERROR'); + assert.match(errorEnd.metadata['otel.status_message'], /llm status failure/); + } finally { + deregisterSubscriber('node_llm_status_metadata_sub'); + } + }); + it('async execute awaits Promise-returning callbacks', async () => { const result = await llmCallExecuteAsync( 'exec_async_llm', diff --git a/crates/node/tests/scope_tests.mjs b/crates/node/tests/scope_tests.mjs index 743204a3..c14f2473 100644 --- a/crates/node/tests/scope_tests.mjs +++ b/crates/node/tests/scope_tests.mjs @@ -97,6 +97,24 @@ describe('Scope operations', () => { popScope(scope); } }); + + it('popScope merges end metadata over scope metadata', async () => { + const events = []; + registerSubscriber('node_scope_pop_metadata_sub', (e) => events.push(e)); + try { + const scope = pushScope('pop_metadata_scope', ScopeType.Agent, null, null, null, { a: 1, b: 2, c: 3 }); + popScope(scope, null, null, { c: 3.5, d: 4 }); + await flushSubscriberCallbacks(); + + const end = events.find( + (e) => e.name === 'pop_metadata_scope' && e.kind === 'scope' && e.scope_category === 'end', + ); + assert.ok(end, 'expected scope end event'); + assert.deepEqual(end.metadata, { a: 1, b: 2, c: 3.5, d: 4 }); + } finally { + deregisterSubscriber('node_scope_pop_metadata_sub'); + } + }); }); // =========================================================================== @@ -183,6 +201,27 @@ describe('withScope', () => { }); }); + it('records OK status metadata on successful auto-pop', async () => { + const events = []; + registerSubscriber('node_with_scope_ok_status_sub', (e) => events.push(e)); + try { + await withScope('with_scope_ok_status', ScopeType.Function, () => ({ ok: true }), null, null, null, { + caller: 'node', + }); + await flushSubscriberCallbacks(); + + const end = events.find( + (e) => e.name === 'with_scope_ok_status' && e.kind === 'scope' && e.scope_category === 'end', + ); + assert.ok(end, 'expected scope end event'); + assert.equal(end.metadata.caller, 'node'); + assert.equal(end.metadata['otel.status_code'], 'OK'); + assert.equal(Object.hasOwn(end.metadata, 'otel.status_message'), false); + } finally { + deregisterSubscriber('node_with_scope_ok_status_sub'); + } + }); + it('pops scope on synchronous throw', async () => { const before = getHandle(); await assert.rejects( @@ -210,6 +249,30 @@ describe('withScope', () => { assert.equal(after.uuid, before.uuid, 'scope should be popped after rejection'); }); + it('records ERROR status metadata on failed auto-pop', async () => { + const events = []; + registerSubscriber('node_with_scope_error_status_sub', (e) => events.push(e)); + try { + await assert.rejects( + () => + withScope('with_scope_error_status', ScopeType.Tool, async () => { + throw new Error('node status failure'); + }), + /node status failure/, + ); + await flushSubscriberCallbacks(); + + const end = events.find( + (e) => e.name === 'with_scope_error_status' && e.kind === 'scope' && e.scope_category === 'end', + ); + assert.ok(end, 'expected scope end event'); + assert.equal(end.metadata['otel.status_code'], 'ERROR'); + assert.match(end.metadata['otel.status_message'], /node status failure/); + } finally { + deregisterSubscriber('node_with_scope_error_status_sub'); + } + }); + it('surfaces primitive rejection values as unknown error and still pops the scope', async () => { const before = getHandle(); await assert.rejects( diff --git a/crates/node/tests/tools_tests.mjs b/crates/node/tests/tools_tests.mjs index 6566a2b8..2bb9354f 100644 --- a/crates/node/tests/tools_tests.mjs +++ b/crates/node/tests/tools_tests.mjs @@ -29,6 +29,7 @@ const { deregisterToolExecutionIntercept, registerSubscriber, deregisterSubscriber, + flushSubscribers, ScopeType, } = lib; @@ -38,6 +39,13 @@ function rejectWithPrimitive(value) { return Promise.reject(value); } +async function flushSubscriberCallbacks() { + flushSubscribers(); + for (let i = 0; i < 10; i += 1) { + await new Promise((resolve) => setImmediate(resolve)); + } +} + // =========================================================================== // Tool lifecycle // =========================================================================== @@ -237,6 +245,71 @@ describe('Tool execute', () => { }); }); + it('execute records OTEL status metadata on end events', async () => { + const events = []; + registerSubscriber('node_tool_status_metadata_sub', (e) => events.push(e)); + try { + const result = await toolCallExecute( + 'exec_status_ok_tool', + { + x: 1, + }, + (args) => ({ + result: args.x + 1, + }), + null, + null, + null, + { + caller: 'node-tool', + }, + ); + assert.deepEqual(result, { + result: 2, + }); + + await assert.rejects( + () => + toolCallExecuteAsync( + 'exec_status_error_tool', + {}, + async () => { + throw new Error('tool status failure'); + }, + null, + null, + null, + { + caller: 'node-tool-error', + }, + ), + /tool status failure/, + ); + + await flushSubscriberCallbacks(); + const okEnd = events.find( + (e) => + e.name === 'exec_status_ok_tool' && e.kind === 'scope' && e.category === 'tool' && e.scope_category === 'end', + ); + const errorEnd = events.find( + (e) => + e.name === 'exec_status_error_tool' && + e.kind === 'scope' && + e.category === 'tool' && + e.scope_category === 'end', + ); + assert.ok(okEnd, 'expected successful tool end event'); + assert.equal(okEnd.metadata.caller, 'node-tool'); + assert.equal(okEnd.metadata['otel.status_code'], 'OK'); + assert.ok(errorEnd, 'expected failed tool end event'); + assert.equal(errorEnd.metadata.caller, 'node-tool-error'); + assert.equal(errorEnd.metadata['otel.status_code'], 'ERROR'); + assert.match(errorEnd.metadata['otel.status_message'], /tool status failure/); + } finally { + deregisterSubscriber('node_tool_status_metadata_sub'); + } + }); + it('async execute awaits Promise-returning callbacks', async () => { const result = await toolCallExecuteAsync( 'exec_async_tool', From 5288755f0ff9ac4016dda812230920e47bb0f4e2 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Thu, 4 Jun 2026 15:03:36 -0700 Subject: [PATCH 20/29] WIP: Add FFI & Go, TODO: determine if breaking API compat in nemo_relay.h is allowed/preferred, if it is we should drop nemo_relay_pop_scope_with_metadata and just add the new arg to nemo_relay_pop_scope [skip ci] Signed-off-by: David Gardner --- crates/ffi/nemo_relay.h | 31 +++++++++ crates/ffi/src/api/scope.rs | 48 ++++++++++++++ crates/ffi/tests/unit/api/core_tests.rs | 70 +++++++++++++++++++++ go/nemo_relay/coverage_gap_test.go | 11 ++++ go/nemo_relay/llm_test.go | 59 ++++++++++++++++++ go/nemo_relay/nemo_relay.go | 26 ++++++-- go/nemo_relay/scope/scope.go | 38 ++++++++--- go/nemo_relay/scope/scope_test.go | 83 +++++++++++++++++++++++++ go/nemo_relay/scope_test.go | 35 +++++++++++ go/nemo_relay/tools_test.go | 59 ++++++++++++++++++ 10 files changed, 448 insertions(+), 12 deletions(-) diff --git a/crates/ffi/nemo_relay.h b/crates/ffi/nemo_relay.h index a5d5fa3d..c0106bdd 100644 --- a/crates/ffi/nemo_relay.h +++ b/crates/ffi/nemo_relay.h @@ -1421,6 +1421,37 @@ NemoRelayStatus nemo_relay_pop_scope(const struct FfiScopeHandle *handle, const char *output_json, const int64_t *timestamp_unix_micros); +/** + * Pop a scope from the scope stack by its handle with end metadata. + * + * This emits a scope End event and removes scope-local registrations owned by + * the popped scope. Incoming metadata is merged over metadata stored on the + * scope handle. + * + * # Parameters + * - `handle`: The current top-of-stack scope handle to pop. + * - `output_json`: Optional null-terminated JSON string exported as semantic + * scope output on the end event, or null. + * - `metadata_json`: Optional null-terminated JSON metadata string recorded + * on the end event, or null. + * - `timestamp_unix_micros`: Optional Unix microseconds timestamp for the end + * event, or null to use the runtime default end timestamp. + * + * # Errors + * Returns `InvalidJson` for invalid output or metadata JSON, `InvalidArg` when + * `timestamp_unix_micros` is outside the supported timestamp range, or an + * error status when `handle` is not the current top scope. + * + * # Safety + * `handle` must be a valid, non-null `FfiScopeHandle` pointer. Optional + * pointer arguments may be null; when non-null, they must be valid for reads + * for the duration of the call. + */ +NemoRelayStatus nemo_relay_pop_scope_with_metadata(const struct FfiScopeHandle *handle, + const char *output_json, + const char *metadata_json, + const int64_t *timestamp_unix_micros); + /** * Emit a named lifecycle event. * diff --git a/crates/ffi/src/api/scope.rs b/crates/ffi/src/api/scope.rs index 7b3cfdec..1f9bb647 100644 --- a/crates/ffi/src/api/scope.rs +++ b/crates/ffi/src/api/scope.rs @@ -156,6 +156,49 @@ pub unsafe extern "C" fn nemo_relay_pop_scope( handle: *const FfiScopeHandle, output_json: *const c_char, timestamp_unix_micros: *const i64, +) -> NemoRelayStatus { + unsafe { pop_scope_impl(handle, output_json, std::ptr::null(), timestamp_unix_micros) } +} + +/// Pop a scope from the scope stack by its handle with end metadata. +/// +/// This emits a scope End event and removes scope-local registrations owned by +/// the popped scope. Incoming metadata is merged over metadata stored on the +/// scope handle. +/// +/// # Parameters +/// - `handle`: The current top-of-stack scope handle to pop. +/// - `output_json`: Optional null-terminated JSON string exported as semantic +/// scope output on the end event, or null. +/// - `metadata_json`: Optional null-terminated JSON metadata string recorded +/// on the end event, or null. +/// - `timestamp_unix_micros`: Optional Unix microseconds timestamp for the end +/// event, or null to use the runtime default end timestamp. +/// +/// # Errors +/// Returns `InvalidJson` for invalid output or metadata JSON, `InvalidArg` when +/// `timestamp_unix_micros` is outside the supported timestamp range, or an +/// error status when `handle` is not the current top scope. +/// +/// # Safety +/// `handle` must be a valid, non-null `FfiScopeHandle` pointer. Optional +/// pointer arguments may be null; when non-null, they must be valid for reads +/// for the duration of the call. +#[unsafe(no_mangle)] +pub unsafe extern "C" fn nemo_relay_pop_scope_with_metadata( + handle: *const FfiScopeHandle, + output_json: *const c_char, + metadata_json: *const c_char, + timestamp_unix_micros: *const i64, +) -> NemoRelayStatus { + unsafe { pop_scope_impl(handle, output_json, metadata_json, timestamp_unix_micros) } +} + +unsafe fn pop_scope_impl( + handle: *const FfiScopeHandle, + output_json: *const c_char, + metadata_json: *const c_char, + timestamp_unix_micros: *const i64, ) -> NemoRelayStatus { clear_last_error(); if handle.is_null() { @@ -166,6 +209,10 @@ pub unsafe extern "C" fn nemo_relay_pop_scope( Some(v) => v, None => return NemoRelayStatus::InvalidJson, }; + let metadata = match c_str_to_opt_json(metadata_json) { + Some(v) => v, + None => return NemoRelayStatus::InvalidJson, + }; let timestamp = match unix_micros_to_opt_timestamp(timestamp_unix_micros) { Some(v) => v, None => return NemoRelayStatus::InvalidArg, @@ -174,6 +221,7 @@ pub unsafe extern "C" fn nemo_relay_pop_scope( core_scope_api::PopScopeParams::builder() .handle_uuid(&unsafe { &*handle }.0.uuid) .output_opt(output) + .metadata_opt(metadata) .timestamp_opt(timestamp) .build(), ) { diff --git a/crates/ffi/tests/unit/api/core_tests.rs b/crates/ffi/tests/unit/api/core_tests.rs index d7506060..bf44bf46 100644 --- a/crates/ffi/tests/unit/api/core_tests.rs +++ b/crates/ffi/tests/unit/api/core_tests.rs @@ -557,6 +557,76 @@ fn test_ffi_error_paths_and_scope_stack() { } } +#[test] +fn test_ffi_pop_scope_with_metadata_merges_scope_metadata() { + let _lock = TEST_MUTEX.lock().unwrap_or_else(|e| e.into_inner()); + reset_globals(); + + unsafe { + let stack = fresh_scope_stack(); + let subscriber_name = unique_name("ffi_scope_end_metadata_subscriber"); + let subscriber_name_c = cstring(&subscriber_name); + assert_eq!( + nemo_relay_register_subscriber( + subscriber_name_c.as_ptr(), + subscriber_cb, + ptr::null_mut(), + None, + ), + NemoRelayStatus::Ok + ); + + let scope_name = cstring("ffi_scope_end_metadata"); + let scope_metadata = cstring(r#"{"a":1,"b":2,"c":3}"#); + let end_metadata = cstring(r#"{"c":3.5,"d":4}"#); + let mut scope = ptr::null_mut(); + assert_eq!( + nemo_relay_push_scope( + scope_name.as_ptr(), + NemoRelayScopeType::Function, + ptr::null(), + 0, + ptr::null(), + scope_metadata.as_ptr(), + ptr::null(), + &mut scope, + ), + NemoRelayStatus::Ok + ); + assert_eq!( + api::nemo_relay_pop_scope_with_metadata( + scope, + ptr::null(), + end_metadata.as_ptr(), + ptr::null(), + ), + NemoRelayStatus::Ok + ); + assert_eq!(nemo_relay_flush_subscribers(), NemoRelayStatus::Ok); + + let events = lock_unpoisoned(event_log()).clone(); + let end_event = events + .iter() + .find(|event| { + event["json"]["kind"] == json!("scope") + && event["json"]["name"] == json!("ffi_scope_end_metadata") + && event["json"]["scope_category"] == json!("end") + }) + .unwrap(); + assert_eq!( + end_event["metadata"], + json!({"a": 1, "b": 2, "c": 3.5, "d": 4}) + ); + + assert_eq!( + nemo_relay_deregister_subscriber(subscriber_name_c.as_ptr()), + NemoRelayStatus::Ok + ); + nemo_relay_scope_handle_free(scope); + nemo_relay_scope_stack_free(stack); + } +} + #[test] fn test_ffi_event_json_null_pointer_returns_null() { unsafe { diff --git a/go/nemo_relay/coverage_gap_test.go b/go/nemo_relay/coverage_gap_test.go index ca515444..b3b09c37 100644 --- a/go/nemo_relay/coverage_gap_test.go +++ b/go/nemo_relay/coverage_gap_test.go @@ -74,6 +74,17 @@ func TestPublicAPIErrorAndDefaultCoverage(t *testing.T) { } } + handle, err := PushScope("invalid_scope_end_metadata", ScopeTypeAgent) + if err != nil { + t.Fatalf("PushScope failed: %v", err) + } + if err := PopScope(handle, WithScopeEndMetadata(json.RawMessage("{"))); err == nil { + t.Fatal("expected PopScope to fail on invalid end metadata JSON") + } + if err := PopScope(handle); err != nil { + t.Fatalf("cleanup PopScope failed: %v", err) + } + if _, err := ToolCall("invalid_tool_json", json.RawMessage("{")); err == nil { t.Fatal("expected ToolCall to fail on invalid JSON args") } diff --git a/go/nemo_relay/llm_test.go b/go/nemo_relay/llm_test.go index ce1ca3f4..bb9c5d30 100644 --- a/go/nemo_relay/llm_test.go +++ b/go/nemo_relay/llm_test.go @@ -153,6 +153,65 @@ func TestLlmCallExecuteBasic(t *testing.T) { } } +func TestLlmCallExecuteAddsOTELStatusMetadataToEndEvents(t *testing.T) { + metadataByName := map[string]json.RawMessage{} + var mu sync.Mutex + + _ = DeregisterSubscriber("go_llm_status_metadata_sub") + if err := RegisterSubscriber("go_llm_status_metadata_sub", func(event Event) { + if event.Kind() == "scope" && event.Category() == "llm" && event.ScopeCategory() == "end" { + mu.Lock() + metadataByName[event.Name()] = append(json.RawMessage(nil), event.Metadata()...) + mu.Unlock() + } + }); err != nil { + t.Fatalf(llmRegisterFailed, err) + } + defer DeregisterSubscriber("go_llm_status_metadata_sub") + + _, err := LlmCallExecute("go_llm_status_ok", makeRequest(), + func(nativeJSON json.RawMessage) (json.RawMessage, error) { + return json.RawMessage(`{"ok":true}`), nil + }, + WithLLMMetadata(json.RawMessage(`{"caller":"go-llm","otel.status_code":"USER"}`)), + ) + if err != nil { + t.Fatalf(llmCallExecuteFailed, err) + } + + _, err = LlmCallExecute("go_llm_status_error", makeRequest(), + func(nativeJSON json.RawMessage) (json.RawMessage, error) { + return nil, errors.New("go llm status failure") + }, + WithLLMMetadata(json.RawMessage(`{"caller":"go-llm-error"}`)), + ) + if err == nil { + t.Fatal("expected LLM execution error") + } + if err := FlushSubscribers(); err != nil { + t.Fatalf(llmFlushSubscribersFailed, err) + } + + mu.Lock() + okMetadata := metadataByName["go_llm_status_ok"] + errorMetadata := metadataByName["go_llm_status_error"] + mu.Unlock() + + assertJSONFieldString(t, okMetadata, "caller", "go-llm") + assertJSONFieldString(t, okMetadata, "otel.status_code", "OK") + assertJSONFieldString(t, errorMetadata, "caller", "go-llm-error") + assertJSONFieldString(t, errorMetadata, "otel.status_code", "ERROR") + + var decoded map[string]interface{} + if err := json.Unmarshal(errorMetadata, &decoded); err != nil { + t.Fatalf("unmarshal error metadata failed: %v; raw=%s", err, errorMetadata) + } + statusMessage, _ := decoded["otel.status_message"].(string) + if !strings.Contains(statusMessage, "go llm status failure") { + t.Fatalf("expected status message to mention callback error, got %v", decoded["otel.status_message"]) + } +} + func TestCodecHandleConstructors(t *testing.T) { if NewOpenAIChatCodec() == nil { t.Fatal("expected OpenAI chat codec handle") diff --git a/go/nemo_relay/nemo_relay.go b/go/nemo_relay/nemo_relay.go index d8677b4d..fd1012c0 100644 --- a/go/nemo_relay/nemo_relay.go +++ b/go/nemo_relay/nemo_relay.go @@ -45,6 +45,7 @@ typedef void (*NemoRelayFreeFn)(void* user_data); extern int32_t nemo_relay_get_handle(FfiScopeHandle** out); extern int32_t nemo_relay_push_scope(const char* name, int32_t scope_type, const FfiScopeHandle* parent, uint32_t attributes, const char* data_json, const char* metadata_json, const char* input_json, const int64_t* timestamp_unix_micros, FfiScopeHandle** out); extern int32_t nemo_relay_pop_scope(const FfiScopeHandle* handle, const char* output_json, const int64_t* timestamp_unix_micros); +extern int32_t nemo_relay_pop_scope_with_metadata(const FfiScopeHandle* handle, const char* output_json, const char* metadata_json, const int64_t* timestamp_unix_micros); extern int32_t nemo_relay_event(const char* name, const FfiScopeHandle* parent, const char* data_json, const char* metadata_json, const int64_t* timestamp_unix_micros); // Tool lifecycle @@ -446,12 +447,13 @@ func WithScopeTimestamp(timestamp time.Time) ScopeOption { type scopeEndOptions struct { output *C.char + metadata *C.char timestamp *C.int64_t } // ScopeEndOption is a functional option that configures optional parameters for -// [PopScope]. Available options include [WithOutput] and -// [WithScopeEndTimestamp]. +// [PopScope]. Available options include [WithOutput], +// [WithScopeEndMetadata], and [WithScopeEndTimestamp]. type ScopeEndOption func(*scopeEndOptions) // WithOutput attaches an arbitrary JSON semantic output payload to the scope end event. @@ -461,6 +463,16 @@ func WithOutput(output json.RawMessage) ScopeEndOption { } } +// WithScopeEndMetadata attaches an arbitrary JSON metadata payload to the +// scope End event. When the scope handle already has metadata, object keys in +// this payload overwrite matching existing keys and preserve non-conflicting +// keys. +func WithScopeEndMetadata(metadata json.RawMessage) ScopeEndOption { + return func(o *scopeEndOptions) { + o.metadata = C.CString(string(metadata)) + } +} + // WithScopeEndTimestamp records an explicit time.Time on the scope End event. // The value is converted to UTC Unix microseconds at the FFI boundary; // sub-microsecond precision is truncated. Omit this option to use the runtime @@ -526,8 +538,9 @@ func PushScope(name string, scopeType ScopeType, opts ...ScopeOption) (*ScopeHan // PopScope removes the given scope from the scope stack and emits an End event // to all registered subscribers. The handle must have been returned by a // previous call to [PushScope]. Popping scopes out of stack order returns an -// error. Optional end payloads can be attached via [WithOutput], and an -// explicit event timestamp can be supplied with [WithScopeEndTimestamp]. +// error. Optional end payloads can be attached via [WithOutput] and +// [WithScopeEndMetadata], and an explicit event timestamp can be supplied with +// [WithScopeEndTimestamp]. func PopScope(handle *ScopeHandle, opts ...ScopeEndOption) error { o := &scopeEndOptions{} for _, opt := range opts { @@ -536,10 +549,13 @@ func PopScope(handle *ScopeHandle, opts ...ScopeEndOption) error { if o.output != nil { defer C.free(unsafe.Pointer(o.output)) } + if o.metadata != nil { + defer C.free(unsafe.Pointer(o.metadata)) + } if o.timestamp != nil { defer C.free(unsafe.Pointer(o.timestamp)) } - return checkStatus(C.nemo_relay_pop_scope(handle.ptr, o.output, o.timestamp)) + return checkStatus(C.nemo_relay_pop_scope_with_metadata(handle.ptr, o.output, o.metadata, o.timestamp)) } // --------------------------------------------------------------------------- diff --git a/go/nemo_relay/scope/scope.go b/go/nemo_relay/scope/scope.go index 3b5074c1..628784da 100644 --- a/go/nemo_relay/scope/scope.go +++ b/go/nemo_relay/scope/scope.go @@ -22,9 +22,36 @@ package scope import ( + "encoding/json" + "fmt" + "github.com/NVIDIA/NeMo-Relay/go/nemo_relay" ) +func statusMetadata(statusCode, statusMessage string) nemo_relay.ScopeEndOption { + metadata := map[string]string{"otel.status_code": statusCode} + if statusMessage != "" { + metadata["otel.status_message"] = statusMessage + } + raw, _ := json.Marshal(metadata) + return nemo_relay.WithScopeEndMetadata(raw) +} + +func cleanupScope(handle *nemo_relay.ScopeHandle) func() { + cleaned := false + return func() { + if cleaned { + return + } + cleaned = true + if recovered := recover(); recovered != nil { + _ = Pop(handle, statusMetadata("ERROR", fmt.Sprint(recovered))) + panic(recovered) + } + _ = Pop(handle, statusMetadata("OK", "")) + } +} + // GetHandle returns the handle for the scope currently at the top of the scope // stack. Returns an error if the scope stack is empty. This is a shorthand for // [nemo_relay.GetHandle]. @@ -42,7 +69,8 @@ func Push(name string, scopeType nemo_relay.ScopeType, opts ...nemo_relay.ScopeO // Pop removes the given scope from the scope stack and emits an End event to // all registered subscribers. Optional arguments, including -// [nemo_relay.WithScopeEndTimestamp], are forwarded to [nemo_relay.PopScope]. +// [nemo_relay.WithScopeEndMetadata] and [nemo_relay.WithScopeEndTimestamp], +// are forwarded to [nemo_relay.PopScope]. func Pop(handle *nemo_relay.ScopeHandle, opts ...nemo_relay.ScopeEndOption) error { return nemo_relay.PopScope(handle, opts...) } @@ -71,9 +99,7 @@ func WithScope(name string, scopeType nemo_relay.ScopeType, opts ...nemo_relay.S // Push failed, so cleanup is intentionally a no-op. } } - return func() { - Pop(handle) - } + return cleanupScope(handle) } // WithScopeHandle pushes a new scope and returns both the scope handle and a @@ -93,7 +119,5 @@ func WithScopeHandle(name string, scopeType nemo_relay.ScopeType, opts ...nemo_r // Push failed, so cleanup is intentionally a no-op. } } - return handle, func() { - Pop(handle) - } + return handle, cleanupScope(handle) } diff --git a/go/nemo_relay/scope/scope_test.go b/go/nemo_relay/scope/scope_test.go index b7074af0..891b927d 100644 --- a/go/nemo_relay/scope/scope_test.go +++ b/go/nemo_relay/scope/scope_test.go @@ -4,6 +4,9 @@ package scope_test import ( + "encoding/json" + "strings" + "sync" "testing" "github.com/NVIDIA/NeMo-Relay/go/nemo_relay" @@ -16,6 +19,54 @@ const ( expectedNonNilHandle = "expected non-nil handle" ) +func metadataStringField(t *testing.T, raw json.RawMessage, field string) string { + t.Helper() + + var decoded map[string]interface{} + if err := json.Unmarshal(raw, &decoded); err != nil { + t.Fatalf("unmarshal metadata failed: %v; raw=%s", err, raw) + } + value, ok := decoded[field] + if !ok { + t.Fatalf("expected metadata field %q, got %v", field, decoded) + } + text, ok := value.(string) + if !ok { + t.Fatalf("expected metadata field %q to be string, got %T", field, value) + } + return text +} + +func captureScopeEndMetadata(t *testing.T, subscriberName, scopeName string, fn func()) json.RawMessage { + t.Helper() + + var captured json.RawMessage + var mu sync.Mutex + _ = nemo_relay.DeregisterSubscriber(subscriberName) + if err := nemo_relay.RegisterSubscriber(subscriberName, func(event nemo_relay.Event) { + if event.Kind() == "scope" && event.ScopeCategory() == "end" && event.Name() == scopeName { + mu.Lock() + captured = append(json.RawMessage(nil), event.Metadata()...) + mu.Unlock() + } + }); err != nil { + t.Fatalf("RegisterSubscriber failed: %v", err) + } + defer nemo_relay.DeregisterSubscriber(subscriberName) + + fn() + + if err := nemo_relay.FlushSubscribers(); err != nil { + t.Fatalf("FlushSubscribers failed: %v", err) + } + mu.Lock() + defer mu.Unlock() + if captured == nil { + t.Fatalf("expected end metadata for scope %q", scopeName) + } + return captured +} + // ============================================================================ // WithScope // ============================================================================ @@ -80,6 +131,17 @@ func TestWithScopeDeferCleanup(t *testing.T) { } } +func TestWithScopeRecordsOKStatusMetadata(t *testing.T) { + metadata := captureScopeEndMetadata(t, "go_scope_with_status_ok_sub", "with_scope_ok_status", func() { + cleanup := scope.WithScope("with_scope_ok_status", nemo_relay.ScopeTypeFunction) + cleanup() + }) + + if got := metadataStringField(t, metadata, "otel.status_code"); got != "OK" { + t.Fatalf("expected otel.status_code=OK, got %q", got) + } +} + func TestWithScopeCleanupOnPanic(t *testing.T) { before, err := nemo_relay.GetHandle() if err != nil { @@ -113,6 +175,27 @@ func TestWithScopeCleanupOnPanic(t *testing.T) { } } +func TestWithScopeRecordsErrorStatusMetadataOnPanic(t *testing.T) { + metadata := captureScopeEndMetadata(t, "go_scope_with_status_error_sub", "with_scope_error_status", func() { + func() { + defer func() { + if recover() == nil { + t.Fatal("expected panic") + } + }() + defer scope.WithScope("with_scope_error_status", nemo_relay.ScopeTypeTool)() + panic("scope status failure") + }() + }) + + if got := metadataStringField(t, metadata, "otel.status_code"); got != "ERROR" { + t.Fatalf("expected otel.status_code=ERROR, got %q", got) + } + if got := metadataStringField(t, metadata, "otel.status_message"); !strings.Contains(got, "scope status failure") { + t.Fatalf("expected status message to mention panic, got %q", got) + } +} + // ============================================================================ // WithScopeHandle // ============================================================================ diff --git a/go/nemo_relay/scope_test.go b/go/nemo_relay/scope_test.go index de78a5bf..9283f383 100644 --- a/go/nemo_relay/scope_test.go +++ b/go/nemo_relay/scope_test.go @@ -662,6 +662,41 @@ func TestScopeWithDataAndMetadata(t *testing.T) { } } +func TestPopScopeWithEndMetadataMergesWithScopeMetadata(t *testing.T) { + var capturedMeta json.RawMessage + var mu sync.Mutex + + _ = DeregisterSubscriber("go_scope_end_metadata_sub") + if err := RegisterSubscriber("go_scope_end_metadata_sub", func(event Event) { + if event.Kind() == "scope" && event.ScopeCategory() == "end" && event.Name() == "end_metadata_scope" { + mu.Lock() + capturedMeta = append(json.RawMessage(nil), event.Metadata()...) + mu.Unlock() + } + }); err != nil { + t.Fatalf(registerSubscriberFailed, err) + } + defer DeregisterSubscriber("go_scope_end_metadata_sub") + + handle, err := PushScope("end_metadata_scope", ScopeTypeAgent, + WithMetadata(json.RawMessage(`{"a":1,"b":2,"c":3}`)), + ) + if err != nil { + t.Fatalf(pushScopeFailed, err) + } + if err := PopScope(handle, WithScopeEndMetadata(json.RawMessage(`{"c":3.5,"d":4}`))); err != nil { + t.Fatalf("PopScope failed: %v", err) + } + if err := FlushSubscribers(); err != nil { + t.Fatalf(flushSubscribersFailed, err) + } + + assertJSONFieldNumber(t, capturedMeta, "a", 1) + assertJSONFieldNumber(t, capturedMeta, "b", 2) + assertJSONFieldNumber(t, capturedMeta, "c", 3.5) + assertJSONFieldNumber(t, capturedMeta, "d", 4) +} + func TestScopeEventWithDataAndMetadata(t *testing.T) { var capturedData, capturedMeta json.RawMessage var mu sync.Mutex diff --git a/go/nemo_relay/tools_test.go b/go/nemo_relay/tools_test.go index d242f71f..cd9bac7f 100644 --- a/go/nemo_relay/tools_test.go +++ b/go/nemo_relay/tools_test.go @@ -159,6 +159,65 @@ func TestToolCallExecuteWithAttributes(t *testing.T) { } } +func TestToolCallExecuteAddsOTELStatusMetadataToEndEvents(t *testing.T) { + metadataByName := map[string]json.RawMessage{} + var mu sync.Mutex + + _ = DeregisterSubscriber("go_tool_status_metadata_sub") + if err := RegisterSubscriber("go_tool_status_metadata_sub", func(event Event) { + if event.Kind() == "scope" && event.Category() == "tool" && event.ScopeCategory() == "end" { + mu.Lock() + metadataByName[event.Name()] = append(json.RawMessage(nil), event.Metadata()...) + mu.Unlock() + } + }); err != nil { + t.Fatalf(registerFailed, err) + } + defer DeregisterSubscriber("go_tool_status_metadata_sub") + + _, err := ToolCallExecute("go_tool_status_ok", json.RawMessage(`{"x":1}`), + func(args json.RawMessage) (json.RawMessage, error) { + return json.RawMessage(`{"ok":true}`), nil + }, + WithToolMetadata(json.RawMessage(`{"caller":"go-tool","otel.status_code":"USER"}`)), + ) + if err != nil { + t.Fatalf(toolCallExecuteFailed, err) + } + + _, err = ToolCallExecute("go_tool_status_error", json.RawMessage(`{"x":2}`), + func(args json.RawMessage) (json.RawMessage, error) { + return nil, errors.New("go tool status failure") + }, + WithToolMetadata(json.RawMessage(`{"caller":"go-tool-error"}`)), + ) + if err == nil { + t.Fatal("expected tool execution error") + } + if err := FlushSubscribers(); err != nil { + t.Fatalf(toolFlushSubscribersFailed, err) + } + + mu.Lock() + okMetadata := metadataByName["go_tool_status_ok"] + errorMetadata := metadataByName["go_tool_status_error"] + mu.Unlock() + + assertJSONFieldString(t, okMetadata, "caller", "go-tool") + assertJSONFieldString(t, okMetadata, "otel.status_code", "OK") + assertJSONFieldString(t, errorMetadata, "caller", "go-tool-error") + assertJSONFieldString(t, errorMetadata, "otel.status_code", "ERROR") + + var decoded map[string]interface{} + if err := json.Unmarshal(errorMetadata, &decoded); err != nil { + t.Fatalf("unmarshal error metadata failed: %v; raw=%s", err, errorMetadata) + } + statusMessage, _ := decoded["otel.status_message"].(string) + if !strings.Contains(statusMessage, "go tool status failure") { + t.Fatalf("expected status message to mention callback error, got %v", decoded["otel.status_message"]) + } +} + // ============================================================================ // Tool guardrails // ============================================================================ From 2211f18c36367daca52cda84d26e89e0afc4cb6b Mon Sep 17 00:00:00 2001 From: David Gardner Date: Fri, 5 Jun 2026 08:29:51 -0700 Subject: [PATCH 21/29] Expose updating metadata when popping a scope, and setting span status code for otel [skip ci] Signed-off-by: David Gardner --- crates/wasm/src/api/mod.rs | 45 +++++++ crates/wasm/tests-js/llm_tests.mjs | 67 +++++++++++ crates/wasm/tests-js/scope_tests.mjs | 111 +++++++++++++++++- crates/wasm/tests-js/tools_tests.mjs | 67 +++++++++++ .../wasm/tests/integration/context_tests.rs | 2 +- crates/wasm/tests/integration/llm_tests.rs | 2 +- .../tests/integration/scope_local_tests.rs | 2 +- crates/wasm/tests/integration/scope_tests.rs | 47 +++++++- crates/wasm/tests/integration/tools_tests.rs | 2 +- crates/wasm/tests/unit/api_tests.rs | 2 +- 10 files changed, 339 insertions(+), 8 deletions(-) diff --git a/crates/wasm/src/api/mod.rs b/crates/wasm/src/api/mod.rs index 7b2e04ac..43903216 100644 --- a/crates/wasm/src/api/mod.rs +++ b/crates/wasm/src/api/mod.rs @@ -30,6 +30,7 @@ use std::sync::{Arc, Mutex}; use js_sys::{Function, Reflect}; use serde::{Deserialize, Serialize}; +use serde_json::Value as Json; use uuid::Uuid; use wasm_bindgen::JsCast; use wasm_bindgen::closure::Closure; @@ -70,6 +71,37 @@ use crate::stream::LlmStream; pub use crate::types::{LLM_STATEFUL, LLM_STREAMING, SCOPE_PARALLEL, TOOL_REMOTE}; use crate::types::{LlmHandle, ScopeHandle, ScopeStack, ScopeType, ToolHandle}; +fn otel_status_metadata(status_code: &'static str, status_message: Option) -> Json { + let mut metadata = serde_json::Map::new(); + metadata.insert( + "otel.status_code".to_string(), + Json::String(status_code.to_string()), + ); + if let Some(status_message) = status_message { + metadata.insert( + "otel.status_message".to_string(), + Json::String(status_message), + ); + } + Json::Object(metadata) +} + +fn js_error_message(error: &JsValue) -> String { + if let Some(message) = error.as_string() { + return message; + } + if let Ok(message) = Reflect::get(error, &JsValue::from_str("message")) + && let Some(message) = message.as_string() + { + return message; + } + js_sys::JSON::stringify(error) + .ok() + .and_then(|value| value.as_string()) + .filter(|value| value != "{}") + .unwrap_or_else(|| "JavaScript callback failed".to_string()) +} + #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] struct WasmOpenTelemetryConfig { @@ -394,6 +426,7 @@ pub fn push_scope( /// Optional `output` is a semantic JSON payload exported on the scope end event. /// Optional `timestamp` is a Unix microseconds timestamp recorded on the scope end event. /// It must be a safe integer number; omitted values use the runtime default end timestamp. +/// Optional `metadata` is a JSON metadata payload recorded on the scope end event. /// /// Throws if the handle does not match the current top of the stack. #[wasm_bindgen(js_name = "popScope")] @@ -401,6 +434,7 @@ pub fn pop_scope( handle: &ScopeHandle, #[wasm_bindgen(unchecked_param_type = "Json | null | undefined")] output: JsValue, #[wasm_bindgen(unchecked_param_type = "number | null | undefined")] timestamp: Option, + #[wasm_bindgen(unchecked_param_type = "Json | null | undefined")] metadata: JsValue, ) -> Result<(), JsValue> { let timestamp = opt_js_to_timestamp_micros(timestamp)?; relay_scope_api::pop_scope( @@ -408,6 +442,7 @@ pub fn pop_scope( .handle_uuid(&handle.inner.uuid) .output_opt(opt_js_to_json(&output)?) .timestamp_opt(timestamp) + .metadata_opt(opt_js_to_json(&metadata)?) .build(), ) .map_err(to_js_err) @@ -493,6 +528,7 @@ pub fn with_scope( let _ = relay_scope_api::pop_scope( relay_scope_api::PopScopeParams::builder() .handle_uuid(&then_uuid) + .metadata_opt(Some(otel_status_metadata("OK", None))) .build(), ); resolved @@ -503,6 +539,10 @@ pub fn with_scope( let _ = relay_scope_api::pop_scope( relay_scope_api::PopScopeParams::builder() .handle_uuid(&catch_uuid) + .metadata_opt(Some(otel_status_metadata( + "ERROR", + Some(js_error_message(&rejected)), + ))) .build(), ); // Re-throw by returning a rejected promise @@ -522,6 +562,7 @@ pub fn with_scope( let _ = relay_scope_api::pop_scope( relay_scope_api::PopScopeParams::builder() .handle_uuid(&scope_uuid) + .metadata_opt(Some(otel_status_metadata("OK", None))) .build(), ); Ok(js_sys::Promise::resolve(&val)) @@ -531,6 +572,10 @@ pub fn with_scope( let _ = relay_scope_api::pop_scope( relay_scope_api::PopScopeParams::builder() .handle_uuid(&scope_uuid) + .metadata_opt(Some(otel_status_metadata( + "ERROR", + Some(js_error_message(&err)), + ))) .build(), ); Err(err) diff --git a/crates/wasm/tests-js/llm_tests.mjs b/crates/wasm/tests-js/llm_tests.mjs index e0fa47da..3b869a39 100644 --- a/crates/wasm/tests-js/llm_tests.mjs +++ b/crates/wasm/tests-js/llm_tests.mjs @@ -318,6 +318,73 @@ test('WebAssembly llm execute flow works from the generated Node package', async } }); +test('WebAssembly llmCallExecute adds OTEL status metadata to end events', async () => { + const stack = resetScopeStack(); + const events = []; + const subscriberName = unique('wasm_llm_status_sub'); + + wasm.registerSubscriber(subscriberName, (event) => events.push(event)); + + try { + await wasm.llmCallExecute( + 'wasm_llm_status_ok', + makeLlmRequest(), + async () => ({ + role: 'assistant', + content: 'ok', + tool_calls: [], + }), + null, + 0, + null, + { + caller: 'wasm-llm', + 'otel.status_code': 'USER', + }, + 'demo-model', + ); + + await assert.rejects( + () => + wasm.llmCallExecute( + 'wasm_llm_status_error', + makeLlmRequest(), + async () => { + throw new Error('wasm llm failure'); + }, + null, + 0, + null, + { + caller: 'wasm-llm-error', + }, + 'demo-model', + ), + /wasm llm failure/, + ); + + const okEvent = await waitFor(() => + events.find( + (event) => event.kind === 'scope' && event.scope_category === 'end' && event.name === 'wasm_llm_status_ok', + ), + ); + const errorEvent = await waitFor(() => + events.find( + (event) => event.kind === 'scope' && event.scope_category === 'end' && event.name === 'wasm_llm_status_error', + ), + ); + + assert.equal(okEvent.metadata.caller, 'wasm-llm'); + assert.equal(okEvent.metadata['otel.status_code'], 'OK'); + assert.equal(errorEvent.metadata.caller, 'wasm-llm-error'); + assert.equal(errorEvent.metadata['otel.status_code'], 'ERROR'); + assert.match(errorEvent.metadata['otel.status_message'], /wasm llm failure/); + } finally { + wasm.deregisterSubscriber(subscriberName); + stack.free(); + } +}); + test('WebAssembly llm stream flow works from the generated Node package', async () => { const request = { headers: { diff --git a/crates/wasm/tests-js/scope_tests.mjs b/crates/wasm/tests-js/scope_tests.mjs index c6b263a5..36dca863 100644 --- a/crates/wasm/tests-js/scope_tests.mjs +++ b/crates/wasm/tests-js/scope_tests.mjs @@ -4,7 +4,14 @@ import assert from 'node:assert/strict'; import { test } from 'node:test'; -import { currentScope, resetScopeStack, SCOPE_ATTR_PARALLEL, SCOPE_ATTR_RELOCATABLE, wasm } from './test_support.mjs'; +import { + currentScope, + resetScopeStack, + SCOPE_ATTR_PARALLEL, + SCOPE_ATTR_RELOCATABLE, + waitFor, + wasm, +} from './test_support.mjs'; test('WebAssembly scope stack exposes the generated root scope handle', () => { const stack = resetScopeStack(); @@ -81,6 +88,51 @@ test('WebAssembly pushScope supports nullable inputs and root parent handles', ( } }); +test('WebAssembly popScope accepts end metadata and merges with scope metadata', async () => { + const stack = resetScopeStack(); + const events = []; + const subscriberName = 'wasm_scope_end_metadata_sub'; + let scope; + let popped = false; + + wasm.deregisterSubscriber(subscriberName); + wasm.registerSubscriber(subscriberName, (event) => events.push(event)); + + try { + scope = wasm.pushScope('wasm_scope_end_metadata', wasm.ScopeType.Function, null, 0, null, { + a: 1, + b: 2, + c: 3, + }); + wasm.popScope(scope, null, undefined, { + c: 3.5, + d: 4, + }); + popped = true; + + const endEvent = await waitFor(() => + events.find( + (event) => event.kind === 'scope' && event.scope_category === 'end' && event.name === 'wasm_scope_end_metadata', + ), + ); + assert.deepEqual(endEvent.metadata, { + a: 1, + b: 2, + c: 3.5, + d: 4, + }); + } finally { + wasm.deregisterSubscriber(subscriberName); + if (scope) { + if (!popped) { + wasm.popScope(scope); + } + scope.free(); + } + stack.free(); + } +}); + test('WebAssembly withScope returns callback data for synchronous callbacks', async () => { const stack = resetScopeStack(); @@ -111,6 +163,63 @@ test('WebAssembly withScope returns callback data for synchronous callbacks', as } }); +test('WebAssembly withScope records OK status metadata', async () => { + const stack = resetScopeStack(); + const events = []; + const subscriberName = 'wasm_with_scope_status_ok_sub'; + + wasm.deregisterSubscriber(subscriberName); + wasm.registerSubscriber(subscriberName, (event) => events.push(event)); + + try { + const result = await wasm.withScope('wasm_with_scope_status_ok', wasm.ScopeType.Function, () => 'done'); + assert.equal(result, 'done'); + + const endEvent = await waitFor(() => + events.find( + (event) => + event.kind === 'scope' && event.scope_category === 'end' && event.name === 'wasm_with_scope_status_ok', + ), + ); + assert.equal(endEvent.metadata['otel.status_code'], 'OK'); + assert.equal(Object.hasOwn(endEvent.metadata, 'otel.status_message'), false); + } finally { + wasm.deregisterSubscriber(subscriberName); + stack.free(); + } +}); + +test('WebAssembly withScope records ERROR status metadata on rejection', async () => { + const stack = resetScopeStack(); + const events = []; + const subscriberName = 'wasm_with_scope_status_error_sub'; + + wasm.deregisterSubscriber(subscriberName); + wasm.registerSubscriber(subscriberName, (event) => events.push(event)); + + try { + await assert.rejects( + () => + wasm.withScope('wasm_with_scope_status_error', wasm.ScopeType.Tool, async () => { + throw new Error('wasm scope failure'); + }), + /wasm scope failure/, + ); + + const endEvent = await waitFor(() => + events.find( + (event) => + event.kind === 'scope' && event.scope_category === 'end' && event.name === 'wasm_with_scope_status_error', + ), + ); + assert.equal(endEvent.metadata['otel.status_code'], 'ERROR'); + assert.match(endEvent.metadata['otel.status_message'], /wasm scope failure/); + } finally { + wasm.deregisterSubscriber(subscriberName); + stack.free(); + } +}); + test('WebAssembly withScope supports async callbacks', async () => { const stack = resetScopeStack(); diff --git a/crates/wasm/tests-js/tools_tests.mjs b/crates/wasm/tests-js/tools_tests.mjs index d0756520..2ddf2f1f 100644 --- a/crates/wasm/tests-js/tools_tests.mjs +++ b/crates/wasm/tests-js/tools_tests.mjs @@ -161,3 +161,70 @@ test('WebAssembly tool execute runs through the generated Node package flow', as wasm.deregisterToolRequestIntercept(toolInterceptName); } }); + +test('WebAssembly toolCallExecute adds OTEL status metadata to end events', async () => { + const stack = resetScopeStack(); + const events = []; + const subscriberName = unique('wasm_tool_status_sub'); + + wasm.registerSubscriber(subscriberName, (event) => events.push(event)); + + try { + await wasm.toolCallExecute( + 'wasm_tool_status_ok', + { + value: 1, + }, + async () => ({ + ok: true, + }), + null, + 0, + null, + { + caller: 'wasm-tool', + 'otel.status_code': 'USER', + }, + ); + + await assert.rejects( + () => + wasm.toolCallExecute( + 'wasm_tool_status_error', + { + value: 2, + }, + async () => { + throw new Error('wasm tool failure'); + }, + null, + 0, + null, + { + caller: 'wasm-tool-error', + }, + ), + /wasm tool failure/, + ); + + const okEvent = await waitFor(() => + events.find( + (event) => event.kind === 'scope' && event.scope_category === 'end' && event.name === 'wasm_tool_status_ok', + ), + ); + const errorEvent = await waitFor(() => + events.find( + (event) => event.kind === 'scope' && event.scope_category === 'end' && event.name === 'wasm_tool_status_error', + ), + ); + + assert.equal(okEvent.metadata.caller, 'wasm-tool'); + assert.equal(okEvent.metadata['otel.status_code'], 'OK'); + assert.equal(errorEvent.metadata.caller, 'wasm-tool-error'); + assert.equal(errorEvent.metadata['otel.status_code'], 'ERROR'); + assert.match(errorEvent.metadata['otel.status_message'], /wasm tool failure/); + } finally { + wasm.deregisterSubscriber(subscriberName); + stack.free(); + } +}); diff --git a/crates/wasm/tests/integration/context_tests.rs b/crates/wasm/tests/integration/context_tests.rs index 363d2705..05a505fc 100644 --- a/crates/wasm/tests/integration/context_tests.rs +++ b/crates/wasm/tests/integration/context_tests.rs @@ -34,7 +34,7 @@ fn push_scope( } fn pop_scope(handle: &ScopeHandle) -> Result<(), JsValue> { - nemo_relay_wasm::api::pop_scope(handle, JsValue::NULL, None) + nemo_relay_wasm::api::pop_scope(handle, JsValue::NULL, None, JsValue::NULL) } // =========================================================================== diff --git a/crates/wasm/tests/integration/llm_tests.rs b/crates/wasm/tests/integration/llm_tests.rs index ffbc9b17..fb333736 100644 --- a/crates/wasm/tests/integration/llm_tests.rs +++ b/crates/wasm/tests/integration/llm_tests.rs @@ -49,7 +49,7 @@ fn push_scope( } fn pop_scope(handle: &ScopeHandle) -> Result<(), JsValue> { - nemo_relay_wasm::api::pop_scope(handle, JsValue::NULL, None) + nemo_relay_wasm::api::pop_scope(handle, JsValue::NULL, None, JsValue::NULL) } fn llm_call( diff --git a/crates/wasm/tests/integration/scope_local_tests.rs b/crates/wasm/tests/integration/scope_local_tests.rs index 3d1d5d2c..59383b84 100644 --- a/crates/wasm/tests/integration/scope_local_tests.rs +++ b/crates/wasm/tests/integration/scope_local_tests.rs @@ -49,7 +49,7 @@ fn push_scope( } fn pop_scope(handle: &ScopeHandle) -> Result<(), JsValue> { - nemo_relay_wasm::api::pop_scope(handle, JsValue::NULL, None) + nemo_relay_wasm::api::pop_scope(handle, JsValue::NULL, None, JsValue::NULL) } async fn tool_call_execute( diff --git a/crates/wasm/tests/integration/scope_tests.rs b/crates/wasm/tests/integration/scope_tests.rs index 802b2ff0..c0d75330 100644 --- a/crates/wasm/tests/integration/scope_tests.rs +++ b/crates/wasm/tests/integration/scope_tests.rs @@ -67,11 +67,11 @@ fn with_scope( } fn pop_scope(handle: &ScopeHandle) -> Result<(), JsValue> { - nemo_relay_wasm::api::pop_scope(handle, JsValue::NULL, None) + nemo_relay_wasm::api::pop_scope(handle, JsValue::NULL, None, JsValue::NULL) } fn pop_scope_with_output(handle: &ScopeHandle, output: JsValue) -> Result<(), JsValue> { - nemo_relay_wasm::api::pop_scope(handle, output, None) + nemo_relay_wasm::api::pop_scope(handle, output, None, JsValue::NULL) } fn event( @@ -557,6 +557,49 @@ fn test_subscriber_receives_scope_output_payload() { js_sys::eval("delete globalThis.__wasm_scope_end_events").unwrap(); } +#[wasm_bindgen_test] +fn test_pop_scope_merges_end_metadata() { + js_sys::eval("globalThis.__wasm_scope_end_metadata_events = []; true").unwrap(); + let cb = js_fn1( + "event", + "if (event.kind === 'scope' && event.scope_category === 'end') globalThis.__wasm_scope_end_metadata_events.push(event)", + ); + register_subscriber("wasm_scope_end_metadata_collector", cb).unwrap(); + + let scope = push_scope( + "metadata_scope", + ScopeType::Function, + None, + None, + JsValue::NULL, + parse_json(r#"{"a":1,"b":2,"c":3}"#), + ) + .unwrap(); + nemo_relay_wasm::api::pop_scope( + &scope, + JsValue::NULL, + None, + parse_json(r#"{"c":3.5,"d":4}"#), + ) + .unwrap(); + + let events = js_sys::eval("globalThis.__wasm_scope_end_metadata_events").unwrap(); + let arr = js_sys::Array::from(&events); + assert_eq!(arr.length(), 1, "Expected one scope end event"); + + let event = arr.get(0); + let metadata = js_sys::Reflect::get(&event, &"metadata".into()).unwrap(); + let metadata_json = serde_wasm_bindgen::from_value::(metadata) + .expect("metadata should be JSON"); + assert_eq!( + metadata_json, + serde_json::json!({"a": 1, "b": 2, "c": 3.5, "d": 4}) + ); + + deregister_subscriber("wasm_scope_end_metadata_collector").unwrap(); + js_sys::eval("delete globalThis.__wasm_scope_end_metadata_events").unwrap(); +} + #[wasm_bindgen_test] fn test_event_mark() { js_sys::eval("globalThis.__wasm_mark_events = []; true").unwrap(); diff --git a/crates/wasm/tests/integration/tools_tests.rs b/crates/wasm/tests/integration/tools_tests.rs index 9befbd22..355a5327 100644 --- a/crates/wasm/tests/integration/tools_tests.rs +++ b/crates/wasm/tests/integration/tools_tests.rs @@ -49,7 +49,7 @@ fn push_scope( } fn pop_scope(handle: &ScopeHandle) -> Result<(), JsValue> { - nemo_relay_wasm::api::pop_scope(handle, JsValue::NULL, None) + nemo_relay_wasm::api::pop_scope(handle, JsValue::NULL, None, JsValue::NULL) } fn tool_call( diff --git a/crates/wasm/tests/unit/api_tests.rs b/crates/wasm/tests/unit/api_tests.rs index 23639fc9..5e75da53 100644 --- a/crates/wasm/tests/unit/api_tests.rs +++ b/crates/wasm/tests/unit/api_tests.rs @@ -277,6 +277,6 @@ fn scope_stack_and_lifecycle_wrappers_round_trip_natively() { ) .unwrap(); - pop_scope(&child, JsValue::NULL, None).unwrap(); + pop_scope(&child, JsValue::NULL, None, JsValue::NULL).unwrap(); assert_eq!(get_handle().unwrap().name(), "root"); } From a3c2688d0fde95fb5cf16cf48d6c7c2290e58a0f Mon Sep 17 00:00:00 2001 From: David Gardner Date: Fri, 5 Jun 2026 09:22:43 -0700 Subject: [PATCH 22/29] Simplify FFI and Go bindings at the cost of API compatibility Signed-off-by: David Gardner --- crates/ffi/nemo_relay.h | 39 +++----------------- crates/ffi/src/api/scope.rs | 45 ++--------------------- crates/ffi/tests/integration/api_tests.rs | 2 +- crates/ffi/tests/unit/api/core_tests.rs | 14 +++---- crates/ffi/tests/unit/api_tests.rs | 2 +- go/nemo_relay/nemo_relay.go | 5 +-- 6 files changed, 18 insertions(+), 89 deletions(-) diff --git a/crates/ffi/nemo_relay.h b/crates/ffi/nemo_relay.h index c0106bdd..4da5189c 100644 --- a/crates/ffi/nemo_relay.h +++ b/crates/ffi/nemo_relay.h @@ -1404,36 +1404,9 @@ NemoRelayStatus nemo_relay_push_scope(const char *name, * - `handle`: The current top-of-stack scope handle to pop. * - `output_json`: Optional null-terminated JSON string exported as semantic * scope output on the end event, or null. - * - `timestamp_unix_micros`: Optional Unix microseconds timestamp for the end - * event, or null to use the runtime default end timestamp. - * - * # Errors - * Returns `InvalidJson` for invalid output JSON, `InvalidArg` when - * `timestamp_unix_micros` is outside the supported timestamp range, or an - * error status when `handle` is not the current top scope. - * - * # Safety - * `handle` must be a valid, non-null `FfiScopeHandle` pointer. `output_json` and - * `timestamp_unix_micros` may be null; when non-null, optional pointers must - * be valid for reads for the duration of the call. - */ -NemoRelayStatus nemo_relay_pop_scope(const struct FfiScopeHandle *handle, - const char *output_json, - const int64_t *timestamp_unix_micros); - -/** - * Pop a scope from the scope stack by its handle with end metadata. - * - * This emits a scope End event and removes scope-local registrations owned by - * the popped scope. Incoming metadata is merged over metadata stored on the - * scope handle. - * - * # Parameters - * - `handle`: The current top-of-stack scope handle to pop. - * - `output_json`: Optional null-terminated JSON string exported as semantic - * scope output on the end event, or null. * - `metadata_json`: Optional null-terminated JSON metadata string recorded - * on the end event, or null. + * on the end event, or null. Incoming metadata is merged over metadata + * stored on the scope handle. * - `timestamp_unix_micros`: Optional Unix microseconds timestamp for the end * event, or null to use the runtime default end timestamp. * @@ -1447,10 +1420,10 @@ NemoRelayStatus nemo_relay_pop_scope(const struct FfiScopeHandle *handle, * pointer arguments may be null; when non-null, they must be valid for reads * for the duration of the call. */ -NemoRelayStatus nemo_relay_pop_scope_with_metadata(const struct FfiScopeHandle *handle, - const char *output_json, - const char *metadata_json, - const int64_t *timestamp_unix_micros); +NemoRelayStatus nemo_relay_pop_scope(const struct FfiScopeHandle *handle, + const char *output_json, + const char *metadata_json, + const int64_t *timestamp_unix_micros); /** * Emit a named lifecycle event. diff --git a/crates/ffi/src/api/scope.rs b/crates/ffi/src/api/scope.rs index 1f9bb647..6e956102 100644 --- a/crates/ffi/src/api/scope.rs +++ b/crates/ffi/src/api/scope.rs @@ -139,39 +139,9 @@ pub unsafe extern "C" fn nemo_relay_push_scope( /// - `handle`: The current top-of-stack scope handle to pop. /// - `output_json`: Optional null-terminated JSON string exported as semantic /// scope output on the end event, or null. -/// - `timestamp_unix_micros`: Optional Unix microseconds timestamp for the end -/// event, or null to use the runtime default end timestamp. -/// -/// # Errors -/// Returns `InvalidJson` for invalid output JSON, `InvalidArg` when -/// `timestamp_unix_micros` is outside the supported timestamp range, or an -/// error status when `handle` is not the current top scope. -/// -/// # Safety -/// `handle` must be a valid, non-null `FfiScopeHandle` pointer. `output_json` and -/// `timestamp_unix_micros` may be null; when non-null, optional pointers must -/// be valid for reads for the duration of the call. -#[unsafe(no_mangle)] -pub unsafe extern "C" fn nemo_relay_pop_scope( - handle: *const FfiScopeHandle, - output_json: *const c_char, - timestamp_unix_micros: *const i64, -) -> NemoRelayStatus { - unsafe { pop_scope_impl(handle, output_json, std::ptr::null(), timestamp_unix_micros) } -} - -/// Pop a scope from the scope stack by its handle with end metadata. -/// -/// This emits a scope End event and removes scope-local registrations owned by -/// the popped scope. Incoming metadata is merged over metadata stored on the -/// scope handle. -/// -/// # Parameters -/// - `handle`: The current top-of-stack scope handle to pop. -/// - `output_json`: Optional null-terminated JSON string exported as semantic -/// scope output on the end event, or null. /// - `metadata_json`: Optional null-terminated JSON metadata string recorded -/// on the end event, or null. +/// on the end event, or null. Incoming metadata is merged over metadata +/// stored on the scope handle. /// - `timestamp_unix_micros`: Optional Unix microseconds timestamp for the end /// event, or null to use the runtime default end timestamp. /// @@ -185,16 +155,7 @@ pub unsafe extern "C" fn nemo_relay_pop_scope( /// pointer arguments may be null; when non-null, they must be valid for reads /// for the duration of the call. #[unsafe(no_mangle)] -pub unsafe extern "C" fn nemo_relay_pop_scope_with_metadata( - handle: *const FfiScopeHandle, - output_json: *const c_char, - metadata_json: *const c_char, - timestamp_unix_micros: *const i64, -) -> NemoRelayStatus { - unsafe { pop_scope_impl(handle, output_json, metadata_json, timestamp_unix_micros) } -} - -unsafe fn pop_scope_impl( +pub unsafe extern "C" fn nemo_relay_pop_scope( handle: *const FfiScopeHandle, output_json: *const c_char, metadata_json: *const c_char, diff --git a/crates/ffi/tests/integration/api_tests.rs b/crates/ffi/tests/integration/api_tests.rs index 1eb04f77..4dc165e0 100644 --- a/crates/ffi/tests/integration/api_tests.rs +++ b/crates/ffi/tests/integration/api_tests.rs @@ -110,7 +110,7 @@ unsafe fn nemo_relay_pop_scope( handle: *const FfiScopeHandle, output_json: *const c_char, ) -> NemoRelayStatus { - unsafe { api::nemo_relay_pop_scope(handle, output_json, ptr::null()) } + unsafe { api::nemo_relay_pop_scope(handle, output_json, ptr::null(), ptr::null()) } } unsafe fn nemo_relay_event( diff --git a/crates/ffi/tests/unit/api/core_tests.rs b/crates/ffi/tests/unit/api/core_tests.rs index bf44bf46..82e3ab18 100644 --- a/crates/ffi/tests/unit/api/core_tests.rs +++ b/crates/ffi/tests/unit/api/core_tests.rs @@ -558,7 +558,7 @@ fn test_ffi_error_paths_and_scope_stack() { } #[test] -fn test_ffi_pop_scope_with_metadata_merges_scope_metadata() { +fn test_ffi_pop_scope_merges_scope_metadata() { let _lock = TEST_MUTEX.lock().unwrap_or_else(|e| e.into_inner()); reset_globals(); @@ -594,12 +594,7 @@ fn test_ffi_pop_scope_with_metadata_merges_scope_metadata() { NemoRelayStatus::Ok ); assert_eq!( - api::nemo_relay_pop_scope_with_metadata( - scope, - ptr::null(), - end_metadata.as_ptr(), - ptr::null(), - ), + api::nemo_relay_pop_scope(scope, ptr::null(), end_metadata.as_ptr(), ptr::null(),), NemoRelayStatus::Ok ); assert_eq!(nemo_relay_flush_subscribers(), NemoRelayStatus::Ok); @@ -938,7 +933,7 @@ fn test_ffi_manual_lifecycle_timestamps_accept_unix_micros() { ); assert_eq!( - api::nemo_relay_pop_scope(scope, ptr::null(), ×tamps[6]), + api::nemo_relay_pop_scope(scope, ptr::null(), ptr::null(), ×tamps[6]), NemoRelayStatus::Ok ); @@ -1147,10 +1142,11 @@ fn test_ffi_manual_lifecycle_timestamps_reject_out_of_range_unix_micros() { assert_invalid_timestamp(api::nemo_relay_pop_scope( scope, ptr::null(), + ptr::null(), &invalid_timestamp, )); assert_eq!( - api::nemo_relay_pop_scope(scope, ptr::null(), ptr::null()), + api::nemo_relay_pop_scope(scope, ptr::null(), ptr::null(), ptr::null()), NemoRelayStatus::Ok ); diff --git a/crates/ffi/tests/unit/api_tests.rs b/crates/ffi/tests/unit/api_tests.rs index 2f3af3b2..8f4e728a 100644 --- a/crates/ffi/tests/unit/api_tests.rs +++ b/crates/ffi/tests/unit/api_tests.rs @@ -98,7 +98,7 @@ unsafe fn nemo_relay_pop_scope( handle: *const FfiScopeHandle, output_json: *const c_char, ) -> NemoRelayStatus { - unsafe { api::nemo_relay_pop_scope(handle, output_json, ptr::null()) } + unsafe { api::nemo_relay_pop_scope(handle, output_json, ptr::null(), ptr::null()) } } unsafe fn nemo_relay_event( diff --git a/go/nemo_relay/nemo_relay.go b/go/nemo_relay/nemo_relay.go index fd1012c0..474fe14b 100644 --- a/go/nemo_relay/nemo_relay.go +++ b/go/nemo_relay/nemo_relay.go @@ -44,8 +44,7 @@ typedef void (*NemoRelayFreeFn)(void* user_data); // Core API extern int32_t nemo_relay_get_handle(FfiScopeHandle** out); extern int32_t nemo_relay_push_scope(const char* name, int32_t scope_type, const FfiScopeHandle* parent, uint32_t attributes, const char* data_json, const char* metadata_json, const char* input_json, const int64_t* timestamp_unix_micros, FfiScopeHandle** out); -extern int32_t nemo_relay_pop_scope(const FfiScopeHandle* handle, const char* output_json, const int64_t* timestamp_unix_micros); -extern int32_t nemo_relay_pop_scope_with_metadata(const FfiScopeHandle* handle, const char* output_json, const char* metadata_json, const int64_t* timestamp_unix_micros); +extern int32_t nemo_relay_pop_scope(const FfiScopeHandle* handle, const char* output_json, const char* metadata_json, const int64_t* timestamp_unix_micros); extern int32_t nemo_relay_event(const char* name, const FfiScopeHandle* parent, const char* data_json, const char* metadata_json, const int64_t* timestamp_unix_micros); // Tool lifecycle @@ -555,7 +554,7 @@ func PopScope(handle *ScopeHandle, opts ...ScopeEndOption) error { if o.timestamp != nil { defer C.free(unsafe.Pointer(o.timestamp)) } - return checkStatus(C.nemo_relay_pop_scope_with_metadata(handle.ptr, o.output, o.metadata, o.timestamp)) + return checkStatus(C.nemo_relay_pop_scope(handle.ptr, o.output, o.metadata, o.timestamp)) } // --------------------------------------------------------------------------- From a155d79d155c6ff2d2cad342d1c0f0e4b0a07fda Mon Sep 17 00:00:00 2001 From: David Gardner Date: Fri, 5 Jun 2026 10:51:59 -0700 Subject: [PATCH 23/29] Test for invalid JSON Signed-off-by: David Gardner --- crates/ffi/tests/unit/api/core_tests.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/crates/ffi/tests/unit/api/core_tests.rs b/crates/ffi/tests/unit/api/core_tests.rs index 82e3ab18..267afe01 100644 --- a/crates/ffi/tests/unit/api/core_tests.rs +++ b/crates/ffi/tests/unit/api/core_tests.rs @@ -579,6 +579,7 @@ fn test_ffi_pop_scope_merges_scope_metadata() { let scope_name = cstring("ffi_scope_end_metadata"); let scope_metadata = cstring(r#"{"a":1,"b":2,"c":3}"#); let end_metadata = cstring(r#"{"c":3.5,"d":4}"#); + let invalid_end_metadata = cstring("{"); let mut scope = ptr::null_mut(); assert_eq!( nemo_relay_push_scope( @@ -597,6 +598,10 @@ fn test_ffi_pop_scope_merges_scope_metadata() { api::nemo_relay_pop_scope(scope, ptr::null(), end_metadata.as_ptr(), ptr::null(),), NemoRelayStatus::Ok ); + assert_eq!( + api::nemo_relay_pop_scope(scope, ptr::null(), invalid_end_metadata.as_ptr(), ptr::null()), + NemoRelayStatus::InvalidJson + ); assert_eq!(nemo_relay_flush_subscribers(), NemoRelayStatus::Ok); let events = lock_unpoisoned(event_log()).clone(); From 3ff4d163187816a53a1f8748318ccd0051b8dfb2 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Fri, 5 Jun 2026 11:03:06 -0700 Subject: [PATCH 24/29] Ensure we set the otel error status on the span for errors encountered running a streaming llm call Signed-off-by: David Gardner --- crates/core/src/api/llm.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/crates/core/src/api/llm.rs b/crates/core/src/api/llm.rs index f9482024..22fe830a 100644 --- a/crates/core/src/api/llm.rs +++ b/crates/core/src/api/llm.rs @@ -747,7 +747,9 @@ pub async fn llm_stream_call_execute(params: LlmStreamCallExecuteParams) -> Resu Ok(Box::pin(wrapper) as LlmJsonStream) } Err(error) => { - let _ = emit_llm_end_without_output(&handle, metadata); + let end_metadata = + metadata_with_otel_status(metadata, "ERROR", Some(error.to_string())); + let _ = emit_llm_end_without_output(&handle, end_metadata); Err(error) } } From b3e176bc5a49ff94ca033570ccb16a65f5ad49a8 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Fri, 5 Jun 2026 11:03:21 -0700 Subject: [PATCH 25/29] Update docstrings Signed-off-by: David Gardner --- crates/python/src/py_api/mod.rs | 1 + python/nemo_relay/_native.pyi | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/crates/python/src/py_api/mod.rs b/crates/python/src/py_api/mod.rs index 4f91be8f..9abb996d 100644 --- a/crates/python/src/py_api/mod.rs +++ b/crates/python/src/py_api/mod.rs @@ -259,6 +259,7 @@ fn push_scope( /// Raises: /// RuntimeError: If the scope is not the current top scope or is not found /// on the stack. +/// ValueError: If ``output`` or ``metadata`` cannot be converted to JSON-compatible data. /// TypeError: If ``timestamp`` is not a ``datetime.datetime``. /// ValueError: If ``timestamp`` is a naive datetime. #[pyfunction] diff --git a/python/nemo_relay/_native.pyi b/python/nemo_relay/_native.pyi index 4c40e180..6c4e2628 100644 --- a/python/nemo_relay/_native.pyi +++ b/python/nemo_relay/_native.pyi @@ -1178,8 +1178,8 @@ def pop_scope( Exceptional flow: Raises native runtime errors if ``handle`` is not the current scope or - if ``output`` cannot be converted to JSON-compatible data. Raises for - invalid timestamp types or naive datetimes. + if ``output`` or ``metadata`` cannot be converted to JSON-compatible data. + Raises for invalid timestamp types or naive datetimes. """ ... From 0af167405dc38202e7b8f03db6c0b86d36b781a0 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Fri, 5 Jun 2026 11:19:09 -0700 Subject: [PATCH 26/29] Formatting Signed-off-by: David Gardner --- crates/ffi/tests/unit/api/core_tests.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/crates/ffi/tests/unit/api/core_tests.rs b/crates/ffi/tests/unit/api/core_tests.rs index 267afe01..82b65089 100644 --- a/crates/ffi/tests/unit/api/core_tests.rs +++ b/crates/ffi/tests/unit/api/core_tests.rs @@ -599,7 +599,12 @@ fn test_ffi_pop_scope_merges_scope_metadata() { NemoRelayStatus::Ok ); assert_eq!( - api::nemo_relay_pop_scope(scope, ptr::null(), invalid_end_metadata.as_ptr(), ptr::null()), + api::nemo_relay_pop_scope( + scope, + ptr::null(), + invalid_end_metadata.as_ptr(), + ptr::null() + ), NemoRelayStatus::InvalidJson ); assert_eq!(nemo_relay_flush_subscribers(), NemoRelayStatus::Ok); From c4f00fad840484dac1490e6844c3cbe511529bee Mon Sep 17 00:00:00 2001 From: David Gardner Date: Fri, 5 Jun 2026 12:31:00 -0700 Subject: [PATCH 27/29] Ensure otel span metadata is set in streaming Signed-off-by: David Gardner --- crates/core/src/api/llm.rs | 3 +- crates/core/tests/unit/llm_api_tests.rs | 67 ++++++++++++++++++++++++- 2 files changed, 68 insertions(+), 2 deletions(-) diff --git a/crates/core/src/api/llm.rs b/crates/core/src/api/llm.rs index 22fe830a..1dbad3a1 100644 --- a/crates/core/src/api/llm.rs +++ b/crates/core/src/api/llm.rs @@ -735,13 +735,14 @@ pub async fn llm_stream_call_execute(params: LlmStreamCallExecuteParams) -> Resu match execution(intercepted_request).await { Ok(raw_stream) => { + let end_metadata = metadata_with_otel_status(metadata, "OK", None); let wrapper = LlmStreamWrapper::new( raw_stream, handle, collector, finalizer, data, - metadata, + end_metadata, response_codec, ); Ok(Box::pin(wrapper) as LlmJsonStream) diff --git a/crates/core/tests/unit/llm_api_tests.rs b/crates/core/tests/unit/llm_api_tests.rs index 6ee3c199..ffd63d49 100644 --- a/crates/core/tests/unit/llm_api_tests.rs +++ b/crates/core/tests/unit/llm_api_tests.rs @@ -6,9 +6,14 @@ use std::sync::{Arc, Mutex}; use serde_json::json; +use tokio_stream::StreamExt; -use super::{LlmCallExecuteParams, LlmRequest, llm_call_execute}; +use super::{ + LlmCallExecuteParams, LlmRequest, LlmStreamCallExecuteParams, llm_call_execute, + llm_stream_call_execute, +}; use crate::api::event::ScopeCategory; +use crate::api::runtime::LlmJsonStream; use crate::api::runtime::{NemoRelayContextState, global_context}; use crate::api::subscriber::{deregister_subscriber, flush_subscribers, register_subscriber}; use crate::error::FlowError; @@ -104,3 +109,63 @@ fn llm_call_execute_adds_otel_status_metadata_to_end_events() { .contains("llm boom") ); } + +#[test] +fn llm_stream_call_execute_adds_otel_status_metadata_to_end_events() { + reset_global(); + + let captured_events = Arc::new(Mutex::new(Vec::<(String, Option)>::new())); + let subscriber_events = captured_events.clone(); + register_subscriber( + "llm-stream-status-metadata", + Arc::new(move |event| { + if event.scope_category() == Some(ScopeCategory::End) { + subscriber_events + .lock() + .unwrap() + .push((event.name().to_string(), event.metadata().cloned())); + } + }), + ) + .unwrap(); + + let runtime = tokio::runtime::Runtime::new().unwrap(); + runtime.block_on(async { + let mut stream = llm_stream_call_execute( + LlmStreamCallExecuteParams::builder() + .name("llm-stream-ok") + .request(request()) + .func(Arc::new(|_request| { + Box::pin(async { + Ok( + Box::pin(tokio_stream::iter(vec![Ok(json!({"chunk": true}))])) + as LlmJsonStream, + ) + }) + })) + .collector(Box::new(|_chunk| Ok(()))) + .finalizer(Box::new(|| json!({"ok": true}))) + .metadata(json!({"caller": "llm-stream-ok", "otel.status_code": "USER"})) + .build(), + ) + .await + .unwrap(); + + while let Some(chunk) = stream.next().await { + chunk.unwrap(); + } + }); + + flush_subscribers().unwrap(); + assert!(deregister_subscriber("llm-stream-status-metadata").unwrap()); + + let events = captured_events.lock().unwrap(); + let success_metadata = events + .iter() + .find(|event| event.0 == "llm-stream-ok") + .and_then(|event| event.1.as_ref()) + .unwrap_or_else(|| panic!("missing stream end event metadata")); + assert_eq!(success_metadata["caller"], json!("llm-stream-ok")); + assert_eq!(success_metadata["otel.status_code"], json!("OK")); + assert!(success_metadata.get("otel.status_message").is_none()); +} From b1d326b8096b24edaa4bbd545e59935aa04371e8 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Fri, 5 Jun 2026 13:18:53 -0700 Subject: [PATCH 28/29] Fix end event for streaming Signed-off-by: David Gardner --- crates/core/src/api/llm.rs | 3 +- crates/core/src/api/mod.rs | 2 +- crates/core/src/stream.rs | 31 ++++--- crates/core/tests/unit/llm_api_tests.rs | 110 ++++++++++++++++++++++++ 4 files changed, 132 insertions(+), 14 deletions(-) diff --git a/crates/core/src/api/llm.rs b/crates/core/src/api/llm.rs index 1dbad3a1..22fe830a 100644 --- a/crates/core/src/api/llm.rs +++ b/crates/core/src/api/llm.rs @@ -735,14 +735,13 @@ pub async fn llm_stream_call_execute(params: LlmStreamCallExecuteParams) -> Resu match execution(intercepted_request).await { Ok(raw_stream) => { - let end_metadata = metadata_with_otel_status(metadata, "OK", None); let wrapper = LlmStreamWrapper::new( raw_stream, handle, collector, finalizer, data, - end_metadata, + metadata, response_codec, ); Ok(Box::pin(wrapper) as LlmJsonStream) diff --git a/crates/core/src/api/mod.rs b/crates/core/src/api/mod.rs index c0f9cc09..1932063c 100644 --- a/crates/core/src/api/mod.rs +++ b/crates/core/src/api/mod.rs @@ -18,4 +18,4 @@ pub mod subscriber; /// Tool lifecycle helpers and managed execution entry points. pub mod tool; -mod shared; +pub(crate) mod shared; diff --git a/crates/core/src/stream.rs b/crates/core/src/stream.rs index 18eeb326..b3c74f68 100644 --- a/crates/core/src/stream.rs +++ b/crates/core/src/stream.rs @@ -36,6 +36,7 @@ use crate::api::llm::LlmHandle; use crate::api::runtime::NemoRelayContextState; use crate::api::runtime::global_context; use crate::api::runtime::{ScopeStackHandle, current_scope_stack}; +use crate::api::shared::metadata_with_otel_status; use crate::codec::response::AnnotatedLlmResponse; use crate::codec::traits::LlmResponseCodec; use crate::error::Result; @@ -127,14 +128,24 @@ impl LlmStreamWrapper { return; } self.ended = true; - self.emit_end_event(); + self.emit_end_event(self.metadata.clone()); + } + + fn finish_with_status(&mut self, status_code: &'static str, status_message: Option) { + if self.ended { + return; + } + self.ended = true; + let metadata = + metadata_with_otel_status(self.metadata.clone(), status_code, status_message); + self.emit_end_event(metadata); } /// Emit the LLM END event with aggregated response data. /// /// Calls the finalizer to produce the aggregated response, runs sanitize /// response guardrails, and emits the END event. - fn emit_end_event(&mut self) { + fn emit_end_event(&mut self, metadata: Option) { let aggregated = match self.finalizer.take() { Some(finalizer) => finalizer(), None => Json::Null, @@ -163,12 +174,8 @@ impl LlmStreamWrapper { } else { Some(sanitized) }; - let event = state.end_llm_handle( - &self.handle, - data, - self.metadata.clone(), - annotated_response, - ); + let event = + state.end_llm_handle(&self.handle, data, metadata, annotated_response); Some((event, subscribers)) } Err(_) => None, @@ -232,17 +239,19 @@ impl Stream for LlmStreamWrapper { match (this.collector)(raw_chunk.clone()) { Ok(()) => Poll::Ready(Some(Ok(raw_chunk))), Err(e) => { - this.finish(); + let message = e.to_string(); + this.finish_with_status("ERROR", Some(message)); Poll::Ready(Some(Err(e))) } } } Poll::Ready(Some(Err(e))) => { - this.finish(); + let message = e.to_string(); + this.finish_with_status("ERROR", Some(message)); Poll::Ready(Some(Err(e))) } Poll::Ready(None) => { - this.finish(); + this.finish_with_status("OK", None); Poll::Ready(None) } Poll::Pending => Poll::Pending, diff --git a/crates/core/tests/unit/llm_api_tests.rs b/crates/core/tests/unit/llm_api_tests.rs index ffd63d49..24e00236 100644 --- a/crates/core/tests/unit/llm_api_tests.rs +++ b/crates/core/tests/unit/llm_api_tests.rs @@ -169,3 +169,113 @@ fn llm_stream_call_execute_adds_otel_status_metadata_to_end_events() { assert_eq!(success_metadata["otel.status_code"], json!("OK")); assert!(success_metadata.get("otel.status_message").is_none()); } + +#[test] +fn llm_stream_call_execute_adds_otel_error_metadata_to_failed_end_events() { + reset_global(); + + let captured_events = Arc::new(Mutex::new(Vec::<(String, Option)>::new())); + let subscriber_events = captured_events.clone(); + register_subscriber( + "llm-stream-error-status-metadata", + Arc::new(move |event| { + if event.scope_category() == Some(ScopeCategory::End) { + subscriber_events + .lock() + .unwrap() + .push((event.name().to_string(), event.metadata().cloned())); + } + }), + ) + .unwrap(); + + let runtime = tokio::runtime::Runtime::new().unwrap(); + runtime.block_on(async { + let mut upstream_error_stream = llm_stream_call_execute( + LlmStreamCallExecuteParams::builder() + .name("llm-stream-upstream-error") + .request(request()) + .func(Arc::new(|_request| { + Box::pin(async { + Ok(Box::pin(tokio_stream::iter(vec![Err(FlowError::Internal( + "stream boom".to_string(), + ))])) as LlmJsonStream) + }) + })) + .collector(Box::new(|_chunk| Ok(()))) + .finalizer(Box::new(|| json!({"partial": true}))) + .metadata( + json!({"caller": "llm-stream-upstream-error", "otel.status_code": "USER"}), + ) + .build(), + ) + .await + .unwrap(); + let upstream_error = upstream_error_stream.next().await.unwrap().unwrap_err(); + assert!(upstream_error.to_string().contains("stream boom")); + + let mut collector_error_stream = llm_stream_call_execute( + LlmStreamCallExecuteParams::builder() + .name("llm-stream-collector-error") + .request(request()) + .func(Arc::new(|_request| { + Box::pin(async { + Ok( + Box::pin(tokio_stream::iter(vec![Ok(json!({"chunk": true}))])) + as LlmJsonStream, + ) + }) + })) + .collector(Box::new(|_chunk| { + Err(FlowError::Internal("collector boom".to_string())) + })) + .finalizer(Box::new(|| json!({"partial": true}))) + .metadata( + json!({"caller": "llm-stream-collector-error", "otel.status_code": "USER"}), + ) + .build(), + ) + .await + .unwrap(); + let collector_error = collector_error_stream.next().await.unwrap().unwrap_err(); + assert!(collector_error.to_string().contains("collector boom")); + }); + + flush_subscribers().unwrap(); + assert!(deregister_subscriber("llm-stream-error-status-metadata").unwrap()); + + let events = captured_events.lock().unwrap(); + let metadata_for = |name: &str| { + events + .iter() + .find(|event| event.0 == name) + .and_then(|event| event.1.as_ref()) + .unwrap_or_else(|| panic!("missing stream end event metadata for {name}")) + }; + + let upstream_error_metadata = metadata_for("llm-stream-upstream-error"); + assert_eq!( + upstream_error_metadata["caller"], + json!("llm-stream-upstream-error") + ); + assert_eq!(upstream_error_metadata["otel.status_code"], json!("ERROR")); + assert!( + upstream_error_metadata["otel.status_message"] + .as_str() + .unwrap() + .contains("stream boom") + ); + + let collector_error_metadata = metadata_for("llm-stream-collector-error"); + assert_eq!( + collector_error_metadata["caller"], + json!("llm-stream-collector-error") + ); + assert_eq!(collector_error_metadata["otel.status_code"], json!("ERROR")); + assert!( + collector_error_metadata["otel.status_message"] + .as_str() + .unwrap() + .contains("collector boom") + ); +} From 75accdc8f43a1e45b85d71b14296494e62e181c9 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Fri, 5 Jun 2026 13:48:50 -0700 Subject: [PATCH 29/29] Fix test ordering Signed-off-by: David Gardner --- crates/ffi/tests/unit/api/core_tests.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/crates/ffi/tests/unit/api/core_tests.rs b/crates/ffi/tests/unit/api/core_tests.rs index 82b65089..08ce98c2 100644 --- a/crates/ffi/tests/unit/api/core_tests.rs +++ b/crates/ffi/tests/unit/api/core_tests.rs @@ -594,10 +594,6 @@ fn test_ffi_pop_scope_merges_scope_metadata() { ), NemoRelayStatus::Ok ); - assert_eq!( - api::nemo_relay_pop_scope(scope, ptr::null(), end_metadata.as_ptr(), ptr::null(),), - NemoRelayStatus::Ok - ); assert_eq!( api::nemo_relay_pop_scope( scope, @@ -607,6 +603,10 @@ fn test_ffi_pop_scope_merges_scope_metadata() { ), NemoRelayStatus::InvalidJson ); + assert_eq!( + api::nemo_relay_pop_scope(scope, ptr::null(), end_metadata.as_ptr(), ptr::null(),), + NemoRelayStatus::Ok + ); assert_eq!(nemo_relay_flush_subscribers(), NemoRelayStatus::Ok); let events = lock_unpoisoned(event_log()).clone();