Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
c8e83bb
WIP: Untested
dagardner-nv Jun 4, 2026
f9f87d8
WIP: Untested
dagardner-nv Jun 4, 2026
b55c82e
Add setting metadata to the pop scope method in Python bindings, add …
dagardner-nv Jun 4, 2026
6bca305
Set scope otel metadata in callbacks
dagardner-nv Jun 4, 2026
3c384fc
Drop _json_safe in favor of _prepare_lc_payloads
dagardner-nv Jun 4, 2026
562922d
First pass at converting event matadata into span status, WIP: Untested
dagardner-nv Jun 4, 2026
52ed93b
WIP
dagardner-nv Jun 4, 2026
91d9968
Update openinference to set span status
dagardner-nv Jun 4, 2026
83917ca
Helper method for setting status metadata
dagardner-nv Jun 4, 2026
fbd9f44
Set otel status on llm and tool calls
dagardner-nv Jun 4, 2026
4056818
WIP: tests and docstrings
dagardner-nv Jun 4, 2026
449b738
Update tests to expect otel status codes in metadata
dagardner-nv Jun 4, 2026
018394f
Formatting [skip ci]
dagardner-nv Jun 4, 2026
703f13b
Collapse if statement
dagardner-nv Jun 4, 2026
c7d2315
Collapse if statement
dagardner-nv Jun 4, 2026
089ae24
Formatting [skip ci]
dagardner-nv Jun 4, 2026
0121120
Move repeated status code to a method [skip ci]
dagardner-nv Jun 4, 2026
b0fac25
Formatting [skip ci]
dagardner-nv Jun 4, 2026
4f09a5a
Add setting metadata to node js [skip ci]
dagardner-nv Jun 4, 2026
5288755
WIP: Add FFI & Go, TODO: determine if breaking API compat in nemo_rel…
dagardner-nv Jun 4, 2026
2211f18
Expose updating metadata when popping a scope, and setting span statu…
dagardner-nv Jun 5, 2026
a3c2688
Simplify FFI and Go bindings at the cost of API compatibility
dagardner-nv Jun 5, 2026
6d9b0e3
Merge branch 'main' of github.com:NVIDIA/NeMo-Relay into david-lc-04-…
dagardner-nv Jun 5, 2026
a155d79
Test for invalid JSON
dagardner-nv Jun 5, 2026
3ff4d16
Ensure we set the otel error status on the span for errors encountere…
dagardner-nv Jun 5, 2026
b3e176b
Update docstrings
dagardner-nv Jun 5, 2026
0af1674
Formatting
dagardner-nv Jun 5, 2026
c4f00fa
Ensure otel span metadata is set in streaming
dagardner-nv Jun 5, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 16 additions & 6 deletions crates/core/src/api/llm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ use crate::api::runtime::{
use crate::api::scope::event;
use crate::api::scope::{EmitMarkEventParams, ScopeHandle};
use crate::api::shared::{
ensure_runtime_owner, resolve_parent_uuid, run_request_intercepts_with_codec,
snapshot_event_subscribers,
ensure_runtime_owner, metadata_with_otel_status, resolve_parent_uuid,
run_request_intercepts_with_codec, snapshot_event_subscribers,
};
use crate::codec::request::AnnotatedLlmRequest;
use crate::codec::response::AnnotatedLlmResponse;
Expand Down Expand Up @@ -587,19 +587,22 @@ pub async fn llm_call_execute(params: LlmCallExecuteParams) -> Result<Json> {
.as_ref()
.and_then(|codec| codec.decode_response(&response).ok())
.map(Arc::new);
let end_metadata = metadata_with_otel_status(metadata, "OK", None);
llm_call_end(
LlmCallEndParams::builder()
.handle(&handle)
.response(response.clone())
.data_opt(data)
.metadata_opt(metadata)
.metadata_opt(end_metadata)
.annotated_response_opt(annotated_response)
.build(),
)?;
Ok(response)
}
Err(error) => {
let _ = emit_llm_end_without_output(&handle, metadata);
let end_metadata =
metadata_with_otel_status(metadata, "ERROR", Some(error.to_string()));
let _ = emit_llm_end_without_output(&handle, end_metadata);
Err(error)
}
}
Expand Down Expand Up @@ -732,19 +735,22 @@ pub async fn llm_stream_call_execute(params: LlmStreamCallExecuteParams) -> Resu

match execution(intercepted_request).await {
Ok(raw_stream) => {
let end_metadata = metadata_with_otel_status(metadata, "OK", None);
let wrapper = LlmStreamWrapper::new(
raw_stream,
handle,
collector,
finalizer,
data,
metadata,
end_metadata,
response_codec,
);
Ok(Box::pin(wrapper) as LlmJsonStream)
}
Err(error) => {
let _ = emit_llm_end_without_output(&handle, metadata);
let end_metadata =
metadata_with_otel_status(metadata, "ERROR", Some(error.to_string()));
let _ = emit_llm_end_without_output(&handle, end_metadata);
Err(error)
}
}
Expand Down Expand Up @@ -831,3 +837,7 @@ pub fn llm_conditional_execution(request: &LlmRequest) -> Result<()> {
}
Ok(())
}

#[cfg(test)]
#[path = "../../tests/unit/llm_api_tests.rs"]
mod tests;
14 changes: 12 additions & 2 deletions crates/core/src/api/runtime/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,20 +247,30 @@ impl NemoRelayContextState {
/// # Parameters
/// - `handle`: Scope handle to serialize into an event.
/// - `data`: Optional data payload returned from the scope.
/// - `metadata`: Optional metadata payload merged over `handle.metadata`.
///
/// # Returns
/// A scope-end [`Event`] derived from the provided handle.
pub fn end_scope_handle(&self, handle: &ScopeHandle, data: Option<Json>) -> Event {
pub fn end_scope_handle(
&self,
handle: &ScopeHandle,
data: Option<Json>,
metadata: Option<Json>,
) -> Event {
self.build_scope_end_event(
EndScopeHandleParams::builder()
.handle(handle)
.data_opt(data)
.metadata_opt(metadata)
.build(),
)
}

/// Build a scope-end event from builder parameters.
///
/// The `metadata` payload is merged over the metadata already stored on
/// the handle.
///
/// # Parameters
/// - `params`: Scope end-event builder parameters.
///
Expand All @@ -279,7 +289,7 @@ impl NemoRelayContextState {
)
.name(handle.name.as_str())
.data_opt(params.data)
.metadata_opt(handle.metadata.clone())
.metadata_opt(merge_json(handle.metadata.clone(), params.metadata))
.build(),
ScopeCategory::End,
scope_attributes_to_strings(handle.attributes),
Expand Down
7 changes: 7 additions & 0 deletions crates/core/src/api/scope.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,9 @@ pub struct EndScopeHandleParams<'a> {
/// Optional JSON payload exported as the semantic scope output.
#[builder(default)]
pub data: Option<Json>,
/// Optional metadata to be appended to the metadata set when the scope was created.
#[builder(default)]
pub metadata: Option<Json>,
/// Optional timestamp recorded on the emitted end event. When omitted, the
/// runtime records the current UTC time, or one microsecond after the
/// handle start time if the current time is not later.
Expand All @@ -196,6 +199,9 @@ pub struct PopScopeParams<'a> {
/// Optional JSON payload exported as the semantic scope output.
#[builder(default)]
pub output: Option<Json>,
/// Optional JSON payload metadata to be appended to the metadata set when the scope was created.
#[builder(default)]
pub metadata: Option<Json>,
/// Optional timestamp recorded on the emitted end event. When omitted, the
/// runtime records the current UTC time, or one microsecond after the
/// handle start time if the current time is not later.
Expand Down Expand Up @@ -347,6 +353,7 @@ pub fn pop_scope(params: PopScopeParams<'_>) -> Result<()> {
.handle(&scope)
.data_opt(params.output)
.timestamp_opt(params.timestamp)
.metadata_opt(params.metadata)
.build(),
);
(scope, event, subscribers)
Expand Down
20 changes: 20 additions & 0 deletions crates/core/src/api/shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use crate::api::scope::ScopeHandle;
use crate::codec::request::AnnotatedLlmRequest;
use crate::codec::traits::LlmCodec;
use crate::error::{FlowError, Result};
use crate::json::{Json, merge_json};
use crate::shared_runtime::ensure_process_runtime_owner;

pub(crate) fn resolve_parent_uuid(parent: Option<&ScopeHandle>) -> Option<Uuid> {
Expand All @@ -37,6 +38,25 @@ pub(crate) fn ensure_runtime_owner() -> Result<()> {
ensure_process_runtime_owner()
}

pub(crate) fn metadata_with_otel_status(
metadata: Option<Json>,
status_code: &'static str,
status_message: Option<String>,
) -> Option<Json> {
let mut status = serde_json::Map::new();
status.insert(
"otel.status_code".to_string(),
Json::String(status_code.to_string()),
);
if let Some(status_message) = status_message {
status.insert(
"otel.status_message".to_string(),
Json::String(status_message),
);
}
merge_json(metadata, Some(Json::Object(status)))
}

pub(crate) fn run_request_intercepts_with_codec(
name: &str,
request: LlmRequest,
Expand Down
16 changes: 13 additions & 3 deletions crates/core/src/api/tool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ use crate::api::runtime::current_scope_stack;
use crate::api::runtime::global_context;
use crate::api::scope::event;
use crate::api::scope::{EmitMarkEventParams, ScopeHandle};
use crate::api::shared::{ensure_runtime_owner, resolve_parent_uuid, snapshot_event_subscribers};
use crate::api::shared::{
ensure_runtime_owner, metadata_with_otel_status, resolve_parent_uuid,
snapshot_event_subscribers,
};
use crate::error::{FlowError, Result};
use crate::json::Json;
use bitflags::bitflags;
Expand Down Expand Up @@ -445,18 +448,21 @@ pub async fn tool_call_execute(params: ToolCallExecuteParams) -> Result<Json> {

match execution(intercepted_args).await {
Ok(result) => {
let end_metadata = metadata_with_otel_status(metadata, "OK", None);
tool_call_end(
ToolCallEndParams::builder()
.handle(&handle)
.result(result.clone())
.data_opt(data)
.metadata_opt(metadata)
.metadata_opt(end_metadata)
.build(),
)?;
Ok(result)
}
Err(error) => {
let _ = emit_tool_end_without_output(&handle, metadata);
let end_metadata =
metadata_with_otel_status(metadata, "ERROR", Some(error.to_string()));
let _ = emit_tool_end_without_output(&handle, end_metadata);
Err(error)
}
}
Expand Down Expand Up @@ -542,3 +548,7 @@ pub fn tool_conditional_execution(name: &str, args: &Json) -> Result<()> {
}
Ok(())
}

#[cfg(test)]
#[path = "../../tests/unit/tool_api_tests.rs"]
mod tests;
33 changes: 33 additions & 0 deletions crates/core/src/observability/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,36 @@ pub mod openinference;
#[cfg(feature = "otel")]
pub mod otel;
pub mod plugin_component;

#[cfg(any(feature = "otel", feature = "openinference"))]
pub(crate) fn set_span_status_from_event_metadata<S>(span: &mut S, event: &crate::api::event::Event)
where
S: opentelemetry::trace::Span,
{
let Some(metadata) = event.metadata() else {
return;
};
let Some(status_code) = metadata
.get("otel.status_code")
.and_then(crate::json::Json::as_str)
else {
return;
};

let status = match status_code {
"OK" => opentelemetry::trace::Status::Ok,
"ERROR" => opentelemetry::trace::Status::error(
metadata
.get("otel.status_message")
.and_then(crate::json::Json::as_str)
.unwrap_or_default()
.to_string(),
),
"UNSET" => opentelemetry::trace::Status::Unset,
other => {
eprintln!("Unrecognized OTEL status code in event metadata: {other}");
opentelemetry::trace::Status::Unset
Comment thread
dagardner-nv marked this conversation as resolved.
}
};
span.set_status(status);
}
1 change: 1 addition & 0 deletions crates/core/src/observability/openinference.rs
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,7 @@ impl OpenInferenceEventProcessor {
let Some(mut active_span) = self.active_spans.remove(&event.uuid()) else {
return;
};
super::set_span_status_from_event_metadata(&mut active_span.span, event);
active_span.span.set_attributes(end_attributes(event));
active_span
.span
Expand Down
11 changes: 6 additions & 5 deletions crates/core/src/observability/otel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,8 @@ impl OtelEventProcessor {
let Some(mut active_span) = self.active_spans.remove(&event.uuid()) else {
return;
};

super::set_span_status_from_event_metadata(&mut active_span.span, event);
active_span.span.set_attributes(end_attributes(event));
active_span
.span
Expand Down Expand Up @@ -657,16 +659,15 @@ fn start_attributes(event: &Event) -> Vec<KeyValue> {
fn end_attributes(event: &Event) -> Vec<KeyValue> {
let mut attributes = Vec::new();
push_serialized(&mut attributes, "nemo_relay.end.data_json", event.data());
push_serialized(
&mut attributes,
"nemo_relay.end.metadata_json",
event.metadata(),
);

let metadata = event.metadata();
push_serialized(&mut attributes, "nemo_relay.end.metadata_json", metadata);
push_serialized(
&mut attributes,
"nemo_relay.end.output_json",
event.output(),
);

attributes
}

Expand Down
31 changes: 30 additions & 1 deletion crates/core/tests/unit/context_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::api::runtime::EventSubscriberFn;
use crate::api::runtime::ScopeStack;
use crate::api::runtime::global_context;
use crate::api::runtime::{NemoRelayContextState, flush_subscribers};
use crate::api::scope::{ScopeAttributes, ScopeHandle, ScopeType};
use crate::api::scope::{EndScopeHandleParams, ScopeAttributes, ScopeHandle, ScopeType};
use crate::api::tool::CreateToolHandleParams;
use crate::context::registries::{
merge_execution_intercept_callables, merge_guardrail_entries, merge_intercept_entries,
Expand Down Expand Up @@ -305,6 +305,35 @@ fn context_state_supports_extensions_events_and_builders() {
assert_eq!(events.lock().unwrap().as_slice(), ["mark"]);
}

#[test]
fn scope_end_metadata_merges_with_handle_metadata() {
let state = NemoRelayContextState::new();
let scope = state.create_scope_handle(
crate::api::scope::CreateScopeHandleParams::builder()
.name("agent")
.scope_type(ScopeType::Agent)
.metadata(json!({"a": 1, "b": 2, "c": 3}))
.build(),
);

let scope_end = state.build_scope_end_event(
EndScopeHandleParams::builder()
.handle(&scope)
.metadata(json!({"c": 3.5, "d": 4}))
.build(),
);
assert_eq!(
scope_end.metadata(),
Some(&json!({"a": 1, "b": 2, "c": 3.5, "d": 4}))
);

let scope_end = state.end_scope_handle(&scope, None, Some(json!({"c": 5, "e": 6})));
assert_eq!(
scope_end.metadata(),
Some(&json!({"a": 1, "b": 2, "c": 5, "e": 6}))
);
}

#[test]
fn global_context_is_a_singleton_handle() {
let first = global_context();
Expand Down
Loading