From 840b90f32cbfbb00c6007d5b10f7f32406b1bb3b Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Tue, 9 Jun 2026 16:46:58 -0700 Subject: [PATCH 1/9] feat: add SSE event normalizer module MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Introduces `events/` module in agentic-core with: - SSEEventType enum (20 variants covering all Responses API events) - EventPayload enum (typed extraction per event type) - EventFrame struct as the normalized output - normalize_sse_line() pure function: raw SSE data line → typed frame Handles both vLLM (response.done) and OpenAI (response.completed) wire formats. No dependency on the executor module — lands on main independently of PR #46. Signed-off-by: Ashwin Giridharan --- crates/agentic-core/src/events/mod.rs | 5 + crates/agentic-core/src/events/normalize.rs | 376 ++++++++++++++++++ crates/agentic-core/src/events/types.rs | 121 ++++++ crates/agentic-core/src/lib.rs | 1 + .../tests/event_normalizer_test.rs | 103 +++++ 5 files changed, 606 insertions(+) create mode 100644 crates/agentic-core/src/events/mod.rs create mode 100644 crates/agentic-core/src/events/normalize.rs create mode 100644 crates/agentic-core/src/events/types.rs create mode 100644 crates/agentic-core/tests/event_normalizer_test.rs diff --git a/crates/agentic-core/src/events/mod.rs b/crates/agentic-core/src/events/mod.rs new file mode 100644 index 0000000..04befb8 --- /dev/null +++ b/crates/agentic-core/src/events/mod.rs @@ -0,0 +1,5 @@ +pub mod normalize; +pub mod types; + +pub use normalize::normalize_sse_line; +pub use types::{EventFrame, EventPayload, SSEEventType}; diff --git a/crates/agentic-core/src/events/normalize.rs b/crates/agentic-core/src/events/normalize.rs new file mode 100644 index 0000000..649b10a --- /dev/null +++ b/crates/agentic-core/src/events/normalize.rs @@ -0,0 +1,376 @@ +use serde_json::Value; + +use super::types::{EventFrame, EventPayload, SSEEventType}; + +/// Normalize a raw SSE data line into a typed [`EventFrame`]. +/// +/// Expects input in the form `data: {...}` (the `data: ` prefix is required). +/// Returns `None` for non-data lines, empty lines, and the `data: [DONE]` +/// sentinel. +#[must_use] +pub fn normalize_sse_line(line: &str) -> Option { + let data_str = line.strip_prefix("data: ")?; + if data_str == "[DONE]" { + return None; + } + + let json: Value = serde_json::from_str(data_str).ok()?; + + let event_type = json + .get("type") + .and_then(Value::as_str) + .map_or(SSEEventType::Other, classify_event_type); + + let sequence_number = json.get("sequence_number").and_then(Value::as_u64); + + let payload = extract_payload(event_type, &json); + + Some(EventFrame { + event_type, + payload, + sequence_number, + }) +} + +/// Map a wire-format event type string to our enum. +fn classify_event_type(type_str: &str) -> SSEEventType { + match type_str { + "response.created" => SSEEventType::ResponseCreated, + "response.in_progress" => SSEEventType::ResponseInProgress, + "response.completed" | "response.done" => SSEEventType::ResponseCompleted, + "response.failed" => SSEEventType::ResponseFailed, + "response.incomplete" => SSEEventType::ResponseIncomplete, + "response.output_item.added" => SSEEventType::OutputItemAdded, + "response.output_item.done" => SSEEventType::OutputItemDone, + "response.output_text.delta" => SSEEventType::OutputTextDelta, + "response.output_text.done" => SSEEventType::OutputTextDone, + "response.content_part.added" => SSEEventType::ContentPartAdded, + "response.content_part.done" => SSEEventType::ContentPartDone, + "response.function_call_arguments.delta" => SSEEventType::FunctionCallArgumentsDelta, + "response.function_call_arguments.done" => SSEEventType::FunctionCallArgumentsDone, + "response.reasoning_summary_text.delta" => SSEEventType::ReasoningSummaryTextDelta, + "response.reasoning_summary_text.done" => SSEEventType::ReasoningSummaryTextDone, + "response.file_search_call.searching" => SSEEventType::FileSearchCallSearching, + "response.file_search_call.completed" => SSEEventType::FileSearchCallCompleted, + "response.web_search_call.searching" => SSEEventType::WebSearchCallSearching, + "response.web_search_call.completed" => SSEEventType::WebSearchCallCompleted, + _ => SSEEventType::Other, + } +} + +/// Extract a typed payload from the JSON body based on the classified event type. +fn extract_payload(event_type: SSEEventType, json: &Value) -> EventPayload { + match event_type { + SSEEventType::ResponseCreated + | SSEEventType::ResponseInProgress + | SSEEventType::ResponseCompleted + | SSEEventType::ResponseFailed + | SSEEventType::ResponseIncomplete => extract_response_payload(json), + + SSEEventType::OutputItemAdded => extract_output_item_added(json), + SSEEventType::OutputItemDone => extract_output_item_done(json), + + SSEEventType::OutputTextDelta => extract_text_delta(json), + SSEEventType::OutputTextDone => extract_text_done(json), + + SSEEventType::FunctionCallArgumentsDelta => extract_fn_call_args_delta(json), + SSEEventType::FunctionCallArgumentsDone => extract_fn_call_args_done(json), + + SSEEventType::ReasoningSummaryTextDelta | SSEEventType::ReasoningSummaryTextDone => { + extract_reasoning_delta(json) + } + + SSEEventType::ContentPartAdded + | SSEEventType::ContentPartDone + | SSEEventType::FileSearchCallSearching + | SSEEventType::FileSearchCallCompleted + | SSEEventType::WebSearchCallSearching + | SSEEventType::WebSearchCallCompleted + | SSEEventType::Other => EventPayload::Raw(json.clone()), + } +} + +fn index_u32(json: &Value, key: &str) -> u32 { + u32::try_from(json[key].as_u64().unwrap_or(0)).unwrap_or(u32::MAX) +} + +fn extract_response_payload(json: &Value) -> EventPayload { + let response = &json["response"]; + EventPayload::Response { + id: response["id"].as_str().unwrap_or_default().to_string(), + status: response["status"].as_str().unwrap_or_default().to_string(), + usage: response.get("usage").filter(|v| !v.is_null()).cloned(), + } +} + +fn extract_output_item_added(json: &Value) -> EventPayload { + let item = &json["item"]; + EventPayload::OutputItemAdded { + item_id: item["id"].as_str().unwrap_or_default().to_string(), + item_type: item["type"].as_str().unwrap_or_default().to_string(), + output_index: index_u32(json, "output_index"), + } +} + +fn extract_output_item_done(json: &Value) -> EventPayload { + let item = &json["item"]; + EventPayload::OutputItemDone { + item_id: item["id"].as_str().unwrap_or_default().to_string(), + item_type: item["type"].as_str().unwrap_or_default().to_string(), + output_index: index_u32(json, "output_index"), + item: item.clone(), + } +} + +fn extract_text_delta(json: &Value) -> EventPayload { + EventPayload::TextDelta { + delta: json["delta"].as_str().unwrap_or_default().to_string(), + item_id: json["item_id"].as_str().unwrap_or_default().to_string(), + output_index: index_u32(json, "output_index"), + content_index: index_u32(json, "content_index"), + } +} + +fn extract_text_done(json: &Value) -> EventPayload { + EventPayload::TextDone { + text: json["text"].as_str().unwrap_or_default().to_string(), + item_id: json["item_id"].as_str().unwrap_or_default().to_string(), + output_index: index_u32(json, "output_index"), + } +} + +fn extract_fn_call_args_delta(json: &Value) -> EventPayload { + EventPayload::FunctionCallArgsDelta { + delta: json["delta"].as_str().unwrap_or_default().to_string(), + call_id: json["call_id"].as_str().unwrap_or_default().to_string(), + item_id: json["item_id"].as_str().unwrap_or_default().to_string(), + output_index: index_u32(json, "output_index"), + } +} + +fn extract_fn_call_args_done(json: &Value) -> EventPayload { + EventPayload::FunctionCallArgsDone { + arguments: json["arguments"].as_str().unwrap_or_default().to_string(), + call_id: json["call_id"].as_str().unwrap_or_default().to_string(), + item_id: json["item_id"].as_str().unwrap_or_default().to_string(), + name: json["name"].as_str().unwrap_or_default().to_string(), + output_index: index_u32(json, "output_index"), + } +} + +fn extract_reasoning_delta(json: &Value) -> EventPayload { + EventPayload::ReasoningDelta { + delta: json["delta"].as_str().unwrap_or_default().to_string(), + item_id: json["item_id"].as_str().unwrap_or_default().to_string(), + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_text_delta() { + let line = r#"data: {"type":"response.output_text.delta","delta":"hello","item_id":"msg_1","output_index":0,"content_index":0,"sequence_number":4}"#; + let frame = normalize_sse_line(line).unwrap(); + assert_eq!(frame.event_type, SSEEventType::OutputTextDelta); + assert_eq!(frame.sequence_number, Some(4)); + if let EventPayload::TextDelta { + delta, + item_id, + output_index, + content_index, + } = &frame.payload + { + assert_eq!(delta, "hello"); + assert_eq!(item_id, "msg_1"); + assert_eq!(*output_index, 0); + assert_eq!(*content_index, 0); + } else { + panic!("expected TextDelta payload"); + } + } + + #[test] + fn test_function_call_args_delta() { + let line = r#"data: {"type":"response.function_call_arguments.delta","delta":"{\"city\":","call_id":"call_abc","item_id":"fc_1","output_index":0,"sequence_number":7}"#; + let frame = normalize_sse_line(line).unwrap(); + assert_eq!(frame.event_type, SSEEventType::FunctionCallArgumentsDelta); + assert_eq!(frame.sequence_number, Some(7)); + if let EventPayload::FunctionCallArgsDelta { + delta, + call_id, + item_id, + output_index, + } = &frame.payload + { + assert_eq!(delta, r#"{"city":"#); + assert_eq!(call_id, "call_abc"); + assert_eq!(item_id, "fc_1"); + assert_eq!(*output_index, 0); + } else { + panic!("expected FunctionCallArgsDelta payload"); + } + } + + #[test] + fn test_function_call_args_done() { + let line = r#"data: {"type":"response.function_call_arguments.done","arguments":"{\"city\":\"SF\"}","call_id":"call_abc","item_id":"fc_1","name":"get_weather","output_index":0,"sequence_number":8}"#; + let frame = normalize_sse_line(line).unwrap(); + assert_eq!(frame.event_type, SSEEventType::FunctionCallArgumentsDone); + if let EventPayload::FunctionCallArgsDone { + arguments, + call_id, + name, + .. + } = &frame.payload + { + assert_eq!(arguments, r#"{"city":"SF"}"#); + assert_eq!(call_id, "call_abc"); + assert_eq!(name, "get_weather"); + } else { + panic!("expected FunctionCallArgsDone payload"); + } + } + + #[test] + fn test_output_item_done() { + let line = r#"data: {"type":"response.output_item.done","item":{"id":"msg_1","type":"message","status":"completed","content":[{"type":"output_text","text":"hi"}]},"output_index":0,"sequence_number":9}"#; + let frame = normalize_sse_line(line).unwrap(); + assert_eq!(frame.event_type, SSEEventType::OutputItemDone); + if let EventPayload::OutputItemDone { + item_id, + item_type, + item, + .. + } = &frame.payload + { + assert_eq!(item_id, "msg_1"); + assert_eq!(item_type, "message"); + assert_eq!(item["content"][0]["text"].as_str(), Some("hi")); + } else { + panic!("expected OutputItemDone payload"); + } + } + + #[test] + fn test_vllm_response_done_maps_to_completed() { + let line = r#"data: {"type":"response.done","response":{"id":"resp_1","status":"completed","usage":{"total_tokens":10}},"sequence_number":9}"#; + let frame = normalize_sse_line(line).unwrap(); + assert_eq!(frame.event_type, SSEEventType::ResponseCompleted); + if let EventPayload::Response { id, status, usage } = &frame.payload { + assert_eq!(id, "resp_1"); + assert_eq!(status, "completed"); + assert!(usage.is_some()); + } else { + panic!("expected Response payload"); + } + } + + #[test] + fn test_openai_response_completed() { + let line = r#"data: {"type":"response.completed","response":{"id":"resp_2","status":"completed","usage":null},"sequence_number":10}"#; + let frame = normalize_sse_line(line).unwrap(); + assert_eq!(frame.event_type, SSEEventType::ResponseCompleted); + if let EventPayload::Response { id, usage, .. } = &frame.payload { + assert_eq!(id, "resp_2"); + assert!(usage.is_none()); + } else { + panic!("expected Response payload"); + } + } + + #[test] + fn test_done_marker_returns_none() { + assert!(normalize_sse_line("data: [DONE]").is_none()); + } + + #[test] + fn test_non_data_lines_return_none() { + assert!(normalize_sse_line("event: response.created").is_none()); + assert!(normalize_sse_line("").is_none()); + assert!(normalize_sse_line(": comment").is_none()); + assert!(normalize_sse_line("id: 123").is_none()); + } + + #[test] + fn test_unknown_event_type() { + let line = r#"data: {"type":"response.unknown_future_event","foo":"bar"}"#; + let frame = normalize_sse_line(line).unwrap(); + assert_eq!(frame.event_type, SSEEventType::Other); + assert!(matches!(frame.payload, EventPayload::Raw(_))); + } + + #[test] + fn test_malformed_json_returns_none() { + assert!(normalize_sse_line("data: {not valid json}").is_none()); + assert!(normalize_sse_line("data: ").is_none()); + } + + #[test] + fn test_response_created() { + let line = r#"data: {"type":"response.created","response":{"id":"resp_abc","status":"in_progress","usage":null},"sequence_number":0}"#; + let frame = normalize_sse_line(line).unwrap(); + assert_eq!(frame.event_type, SSEEventType::ResponseCreated); + assert_eq!(frame.sequence_number, Some(0)); + if let EventPayload::Response { id, status, .. } = &frame.payload { + assert_eq!(id, "resp_abc"); + assert_eq!(status, "in_progress"); + } else { + panic!("expected Response payload"); + } + } + + #[test] + fn test_output_item_added_message() { + let line = r#"data: {"type":"response.output_item.added","item":{"id":"msg_1","type":"message","status":"in_progress","content":[]},"output_index":0,"sequence_number":2}"#; + let frame = normalize_sse_line(line).unwrap(); + assert_eq!(frame.event_type, SSEEventType::OutputItemAdded); + if let EventPayload::OutputItemAdded { + item_id, + item_type, + output_index, + } = &frame.payload + { + assert_eq!(item_id, "msg_1"); + assert_eq!(item_type, "message"); + assert_eq!(*output_index, 0); + } else { + panic!("expected OutputItemAdded payload"); + } + } + + #[test] + fn test_output_item_added_function_call() { + let line = r#"data: {"type":"response.output_item.added","item":{"id":"fc_1","type":"function_call","status":"in_progress","name":"get_weather","call_id":"call_1","arguments":""},"output_index":1,"sequence_number":5}"#; + let frame = normalize_sse_line(line).unwrap(); + assert_eq!(frame.event_type, SSEEventType::OutputItemAdded); + if let EventPayload::OutputItemAdded { + item_id, + item_type, + output_index, + } = &frame.payload + { + assert_eq!(item_id, "fc_1"); + assert_eq!(item_type, "function_call"); + assert_eq!(*output_index, 1); + } else { + panic!("expected OutputItemAdded payload"); + } + } + + #[test] + fn test_content_part_added_is_raw() { + let line = r#"data: {"type":"response.content_part.added","content_index":0,"item_id":"msg_1","output_index":0,"part":{"type":"output_text","text":""},"sequence_number":3}"#; + let frame = normalize_sse_line(line).unwrap(); + assert_eq!(frame.event_type, SSEEventType::ContentPartAdded); + assert!(matches!(frame.payload, EventPayload::Raw(_))); + } + + #[test] + fn test_no_sequence_number() { + let line = r#"data: {"type":"response.output_text.delta","delta":"x","item_id":"m","output_index":0,"content_index":0}"#; + let frame = normalize_sse_line(line).unwrap(); + assert_eq!(frame.sequence_number, None); + } +} diff --git a/crates/agentic-core/src/events/types.rs b/crates/agentic-core/src/events/types.rs new file mode 100644 index 0000000..ea0aa25 --- /dev/null +++ b/crates/agentic-core/src/events/types.rs @@ -0,0 +1,121 @@ +use serde_json::Value; + +/// Classification of SSE event types from the Responses API. +/// +/// Covers both the `OpenAI` and vLLM wire formats (e.g. `response.done` vs +/// `response.completed`). +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum SSEEventType { + // Response lifecycle + ResponseCreated, + ResponseInProgress, + ResponseCompleted, + ResponseFailed, + ResponseIncomplete, + + // Output item lifecycle + OutputItemAdded, + OutputItemDone, + + // Text content + OutputTextDelta, + OutputTextDone, + ContentPartAdded, + ContentPartDone, + + // Function calls + FunctionCallArgumentsDelta, + FunctionCallArgumentsDone, + + // Reasoning + ReasoningSummaryTextDelta, + ReasoningSummaryTextDone, + + // Built-in tool calls + FileSearchCallSearching, + FileSearchCallCompleted, + WebSearchCallSearching, + WebSearchCallCompleted, + + // Catch-all for unrecognized events + Other, +} + +/// Typed payload extracted from an SSE event's JSON data. +#[derive(Debug, Clone)] +pub enum EventPayload { + /// `response.created` / `response.completed` / `response.failed` / + /// `response.incomplete` / `response.in_progress` + Response { + id: String, + status: String, + usage: Option, + }, + + /// `response.output_item.added` + OutputItemAdded { + item_id: String, + item_type: String, + output_index: u32, + }, + + /// `response.output_item.done` + OutputItemDone { + item_id: String, + item_type: String, + output_index: u32, + item: Value, + }, + + /// `response.output_text.delta` + TextDelta { + delta: String, + item_id: String, + output_index: u32, + content_index: u32, + }, + + /// `response.output_text.done` + TextDone { + text: String, + item_id: String, + output_index: u32, + }, + + /// `response.function_call_arguments.delta` + FunctionCallArgsDelta { + delta: String, + call_id: String, + item_id: String, + output_index: u32, + }, + + /// `response.function_call_arguments.done` + FunctionCallArgsDone { + arguments: String, + call_id: String, + item_id: String, + name: String, + output_index: u32, + }, + + /// `response.reasoning_summary_text.delta` / + /// `response.reasoning_summary_text.done` + ReasoningDelta { delta: String, item_id: String }, + + /// Events we classify but don't deeply parse yet. + Raw(Value), + + /// No meaningful payload (e.g. unparseable content). + None, +} + +/// A normalized SSE event frame — the output of [`normalize_sse_line`]. +/// +/// [`normalize_sse_line`]: crate::events::normalize::normalize_sse_line +#[derive(Debug, Clone)] +pub struct EventFrame { + pub event_type: SSEEventType, + pub payload: EventPayload, + pub sequence_number: Option, +} diff --git a/crates/agentic-core/src/lib.rs b/crates/agentic-core/src/lib.rs index 700bafb..ef09472 100644 --- a/crates/agentic-core/src/lib.rs +++ b/crates/agentic-core/src/lib.rs @@ -1,6 +1,7 @@ pub mod config; pub mod error; pub mod executor; +pub mod events; pub mod proxy; pub mod readiness; pub mod storage; diff --git a/crates/agentic-core/tests/event_normalizer_test.rs b/crates/agentic-core/tests/event_normalizer_test.rs new file mode 100644 index 0000000..e6943c2 --- /dev/null +++ b/crates/agentic-core/tests/event_normalizer_test.rs @@ -0,0 +1,103 @@ +use agentic_core::events::{EventPayload, SSEEventType, normalize_sse_line}; + +/// Simulated streaming cassette matching the format of +/// `resp-single-gpt-4o-streaming.yaml` — single turn, text "GLOBE" split +/// across 3 deltas. +const SIMULATED_SSE: &[&str] = &[ + r#"data: {"type":"response.created","response":{"id":"resp_abc","status":"in_progress","usage":null},"sequence_number":0}"#, + r#"data: {"type":"response.in_progress","response":{"id":"resp_abc","status":"in_progress","usage":null},"sequence_number":1}"#, + r#"data: {"type":"response.output_item.added","item":{"id":"msg_1","type":"message","status":"in_progress","content":[]},"output_index":0,"sequence_number":2}"#, + r#"data: {"type":"response.content_part.added","content_index":0,"item_id":"msg_1","output_index":0,"part":{"type":"output_text","text":""},"sequence_number":3}"#, + r#"data: {"type":"response.output_text.delta","content_index":0,"delta":"G","item_id":"msg_1","output_index":0,"sequence_number":4}"#, + r#"data: {"type":"response.output_text.delta","content_index":0,"delta":"LO","item_id":"msg_1","output_index":0,"sequence_number":5}"#, + r#"data: {"type":"response.output_text.delta","content_index":0,"delta":"BE","item_id":"msg_1","output_index":0,"sequence_number":6}"#, + r#"data: {"type":"response.output_text.done","content_index":0,"item_id":"msg_1","output_index":0,"text":"GLOBE","sequence_number":7}"#, + r#"data: {"type":"response.content_part.done","content_index":0,"item_id":"msg_1","output_index":0,"part":{"type":"output_text","text":"GLOBE"},"sequence_number":8}"#, + r#"data: {"type":"response.output_item.done","item":{"id":"msg_1","type":"message","status":"completed","content":[{"type":"output_text","text":"GLOBE"}],"role":"assistant"},"output_index":0,"sequence_number":9}"#, + r#"data: {"type":"response.completed","response":{"id":"resp_abc","status":"completed","usage":{"input_tokens":14,"output_tokens":4,"total_tokens":18}},"sequence_number":10}"#, +]; + +#[test] +fn test_event_distribution() { + let mut counts = std::collections::HashMap::new(); + for line in SIMULATED_SSE { + if let Some(frame) = normalize_sse_line(line) { + *counts.entry(frame.event_type).or_insert(0u32) += 1; + } + } + + assert_eq!(counts.get(&SSEEventType::ResponseCreated), Some(&1)); + assert_eq!(counts.get(&SSEEventType::ResponseInProgress), Some(&1)); + assert_eq!(counts.get(&SSEEventType::OutputItemAdded), Some(&1)); + assert_eq!(counts.get(&SSEEventType::OutputTextDelta), Some(&3)); + assert_eq!(counts.get(&SSEEventType::OutputTextDone), Some(&1)); + assert_eq!(counts.get(&SSEEventType::ContentPartAdded), Some(&1)); + assert_eq!(counts.get(&SSEEventType::ContentPartDone), Some(&1)); + assert_eq!(counts.get(&SSEEventType::OutputItemDone), Some(&1)); + assert_eq!(counts.get(&SSEEventType::ResponseCompleted), Some(&1)); +} + +#[test] +fn test_text_accumulation() { + let mut text = String::new(); + for line in SIMULATED_SSE { + if let Some(frame) = normalize_sse_line(line) { + if let EventPayload::TextDelta { delta, .. } = &frame.payload { + text.push_str(delta); + } + } + } + assert_eq!(text, "GLOBE"); +} + +#[test] +fn test_sequence_numbers_increasing() { + let mut last_seq: Option = None; + for line in SIMULATED_SSE { + if let Some(frame) = normalize_sse_line(line) { + if let Some(seq) = frame.sequence_number { + if let Some(prev) = last_seq { + assert!(seq > prev, "sequence {seq} should be > {prev}"); + } + last_seq = Some(seq); + } + } + } + assert!(last_seq.is_some()); +} + +/// Simulate a function-call streaming session. +#[test] +fn test_function_call_flow() { + let lines = &[ + r#"data: {"type":"response.created","response":{"id":"resp_fc","status":"in_progress","usage":null},"sequence_number":0}"#, + r#"data: {"type":"response.output_item.added","item":{"id":"fc_1","type":"function_call","status":"in_progress","name":"get_weather","call_id":"call_1","arguments":""},"output_index":0,"sequence_number":1}"#, + r#"data: {"type":"response.function_call_arguments.delta","delta":"{\"ci","call_id":"call_1","item_id":"fc_1","output_index":0,"sequence_number":2}"#, + r#"data: {"type":"response.function_call_arguments.delta","delta":"ty\":\"SF\"}","call_id":"call_1","item_id":"fc_1","output_index":0,"sequence_number":3}"#, + r#"data: {"type":"response.function_call_arguments.done","arguments":"{\"city\":\"SF\"}","call_id":"call_1","item_id":"fc_1","name":"get_weather","output_index":0,"sequence_number":4}"#, + r#"data: {"type":"response.output_item.done","item":{"id":"fc_1","type":"function_call","status":"completed","name":"get_weather","call_id":"call_1","arguments":"{\"city\":\"SF\"}"},"output_index":0,"sequence_number":5}"#, + r#"data: {"type":"response.completed","response":{"id":"resp_fc","status":"completed","usage":{"input_tokens":20,"output_tokens":8,"total_tokens":28}},"sequence_number":6}"#, + ]; + + let mut args_accumulated = String::new(); + let mut final_args = String::new(); + let mut final_name = String::new(); + + for line in lines { + let frame = normalize_sse_line(line).unwrap(); + match &frame.payload { + EventPayload::FunctionCallArgsDelta { delta, .. } => { + args_accumulated.push_str(delta); + } + EventPayload::FunctionCallArgsDone { arguments, name, .. } => { + final_args = arguments.clone(); + final_name = name.clone(); + } + _ => {} + } + } + + assert_eq!(args_accumulated, r#"{"city":"SF"}"#); + assert_eq!(final_args, r#"{"city":"SF"}"#); + assert_eq!(final_name, "get_weather"); +} From 1c9c9d5df3acfc7571eb6711cdd6a0045914462f Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Tue, 9 Jun 2026 16:50:39 -0700 Subject: [PATCH 2/9] test: add real vLLM cassette test for function_call events Recorded from google/gemma-4-26B-A4B-it (vLLM v0.21.0). Validates the normalizer handles vLLM's function_call streaming format where delta events omit call_id (only present in output_item.done). Signed-off-by: Ashwin Giridharan --- .../tests/event_normalizer_test.rs | 57 +++++++++++++++++++ 1 file changed, 57 insertions(+) diff --git a/crates/agentic-core/tests/event_normalizer_test.rs b/crates/agentic-core/tests/event_normalizer_test.rs index e6943c2..804006b 100644 --- a/crates/agentic-core/tests/event_normalizer_test.rs +++ b/crates/agentic-core/tests/event_normalizer_test.rs @@ -101,3 +101,60 @@ fn test_function_call_flow() { assert_eq!(final_args, r#"{"city":"SF"}"#); assert_eq!(final_name, "get_weather"); } + +/// Real vLLM output captured from `google/gemma-4-26B-A4B-it` on 2026-06-09. +/// Key differences from `OpenAI`: no `call_id` in delta events, different id format. +#[test] +fn test_real_vllm_function_call_stream() { + let lines = &[ + r#"data: {"response":{"id":"resp_938d583bbec02940","created_at":1781048957,"status":"in_progress","output":[],"model":"google/gemma-4-26B-A4B-it","object":"response"},"sequence_number":0,"type":"response.created"}"#, + r#"data: {"response":{"id":"resp_938d583bbec02940","status":"in_progress"},"sequence_number":1,"type":"response.in_progress"}"#, + r#"data: {"item":{"arguments":"","call_id":"call_92fd766dcc21a19c","name":"get_weather","type":"function_call","id":"8c5375b5b08d666c","status":"in_progress"},"output_index":0,"sequence_number":2,"type":"response.output_item.added"}"#, + r#"data: {"delta":"{\"","item_id":"8c5375b5b08d666c","output_index":0,"sequence_number":3,"type":"response.function_call_arguments.delta"}"#, + r#"data: {"delta":"city","item_id":"8c5375b5b08d666c","output_index":0,"sequence_number":4,"type":"response.function_call_arguments.delta"}"#, + r#"data: {"delta":"\":","item_id":"8c5375b5b08d666c","output_index":0,"sequence_number":5,"type":"response.function_call_arguments.delta"}"#, + r#"data: {"delta":" \"","item_id":"8c5375b5b08d666c","output_index":0,"sequence_number":6,"type":"response.function_call_arguments.delta"}"#, + r#"data: {"delta":"San","item_id":"8c5375b5b08d666c","output_index":0,"sequence_number":7,"type":"response.function_call_arguments.delta"}"#, + r#"data: {"delta":" Francisco","item_id":"8c5375b5b08d666c","output_index":0,"sequence_number":8,"type":"response.function_call_arguments.delta"}"#, + r#"data: {"delta":"\"","item_id":"8c5375b5b08d666c","output_index":0,"sequence_number":9,"type":"response.function_call_arguments.delta"}"#, + r#"data: {"delta":"}","item_id":"8c5375b5b08d666c","output_index":0,"sequence_number":10,"type":"response.function_call_arguments.delta"}"#, + r#"data: {"arguments":"{\"city\": \"San Francisco\"}","item_id":"8c5375b5b08d666c","name":"get_weather","output_index":0,"sequence_number":11,"type":"response.function_call_arguments.done"}"#, + r#"data: {"item":{"arguments":"{\"city\": \"San Francisco\"}","call_id":"call_92fd766dcc21a19c","name":"get_weather","type":"function_call","id":"8c5375b5b08d666c","status":"completed"},"output_index":0,"sequence_number":12,"type":"response.output_item.done"}"#, + r#"data: {"response":{"id":"resp_938d583bbec02940","status":"completed","usage":{"input_tokens":66,"output_tokens":21,"total_tokens":87}},"sequence_number":13,"type":"response.completed"}"#, + ]; + + let mut args = String::new(); + let mut final_name = String::new(); + let mut event_types = Vec::new(); + + for line in lines { + let frame = normalize_sse_line(line).expect("all lines should parse"); + event_types.push(frame.event_type); + match &frame.payload { + EventPayload::FunctionCallArgsDelta { delta, .. } => args.push_str(delta), + EventPayload::FunctionCallArgsDone { name, .. } => final_name = name.clone(), + _ => {} + } + } + + assert_eq!(args, r#"{"city": "San Francisco"}"#); + assert_eq!(final_name, "get_weather"); + + assert_eq!(event_types[0], SSEEventType::ResponseCreated); + assert_eq!(event_types[1], SSEEventType::ResponseInProgress); + assert_eq!(event_types[2], SSEEventType::OutputItemAdded); + assert_eq!(event_types[3], SSEEventType::FunctionCallArgumentsDelta); + assert_eq!(event_types[11], SSEEventType::FunctionCallArgumentsDone); + assert_eq!(event_types[12], SSEEventType::OutputItemDone); + assert_eq!(event_types[13], SSEEventType::ResponseCompleted); + + // Verify the output_item.done carries the full function_call item + let done_frame = normalize_sse_line(lines[12]).unwrap(); + if let EventPayload::OutputItemDone { item_type, item, .. } = &done_frame.payload { + assert_eq!(item_type, "function_call"); + assert_eq!(item["name"].as_str(), Some("get_weather")); + assert_eq!(item["call_id"].as_str(), Some("call_92fd766dcc21a19c")); + } else { + panic!("expected OutputItemDone"); + } +} From 0a71c943bad9dea7777131e33dad8922b8565044 Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Tue, 9 Jun 2026 16:56:13 -0700 Subject: [PATCH 3/9] test: add YAML cassette-driven tests for event normalizer Adds cassette files recorded from live vLLM (gemma-4-26B-A4B-it): - text-only-vllm-gemma4.yaml: single text turn - function-call-vllm-gemma4.yaml: tool use with streaming args Tests load cassettes via serde_yaml (dev-dependency) and validate normalized output matches expected_text / expected_function_call. Signed-off-by: Ashwin Giridharan --- .../events/function-call-vllm-gemma4.yaml | 37 ++++++++++ .../events/text-only-vllm-gemma4.yaml | 19 ++++++ .../tests/event_normalizer_test.rs | 68 +++++++++++++++++++ 3 files changed, 124 insertions(+) create mode 100644 crates/agentic-core/tests/cassettes/events/function-call-vllm-gemma4.yaml create mode 100644 crates/agentic-core/tests/cassettes/events/text-only-vllm-gemma4.yaml diff --git a/crates/agentic-core/tests/cassettes/events/function-call-vllm-gemma4.yaml b/crates/agentic-core/tests/cassettes/events/function-call-vllm-gemma4.yaml new file mode 100644 index 0000000..470d158 --- /dev/null +++ b/crates/agentic-core/tests/cassettes/events/function-call-vllm-gemma4.yaml @@ -0,0 +1,37 @@ +# Recorded from google/gemma-4-26B-A4B-it via vLLM v0.21.0 +# Request: {"model":"google/gemma-4-26B-A4B-it","input":"What is the weather in San Francisco?","tools":[...],"tool_choice":"required","stream":true} +# Date: 2026-06-09 + +model: google/gemma-4-26B-A4B-it +request: + input: "What is the weather in San Francisco?" + tools: + - type: function + name: get_weather + description: "Get current weather" + parameters: + type: object + properties: + city: + type: string + required: ["city"] + tool_choice: required + stream: true +expected_function_call: + name: get_weather + arguments: '{"city": "San Francisco"}' +sse: + - 'data: {"response":{"id":"resp_938d583bbec02940","created_at":1781048957,"status":"in_progress","output":[],"model":"google/gemma-4-26B-A4B-it","object":"response"},"sequence_number":0,"type":"response.created"}' + - 'data: {"response":{"id":"resp_938d583bbec02940","status":"in_progress"},"sequence_number":1,"type":"response.in_progress"}' + - 'data: {"item":{"arguments":"","call_id":"call_92fd766dcc21a19c","name":"get_weather","type":"function_call","id":"8c5375b5b08d666c","status":"in_progress"},"output_index":0,"sequence_number":2,"type":"response.output_item.added"}' + - 'data: {"delta":"{\"","item_id":"8c5375b5b08d666c","output_index":0,"sequence_number":3,"type":"response.function_call_arguments.delta"}' + - 'data: {"delta":"city","item_id":"8c5375b5b08d666c","output_index":0,"sequence_number":4,"type":"response.function_call_arguments.delta"}' + - 'data: {"delta":"\":","item_id":"8c5375b5b08d666c","output_index":0,"sequence_number":5,"type":"response.function_call_arguments.delta"}' + - 'data: {"delta":" \"","item_id":"8c5375b5b08d666c","output_index":0,"sequence_number":6,"type":"response.function_call_arguments.delta"}' + - 'data: {"delta":"San","item_id":"8c5375b5b08d666c","output_index":0,"sequence_number":7,"type":"response.function_call_arguments.delta"}' + - 'data: {"delta":" Francisco","item_id":"8c5375b5b08d666c","output_index":0,"sequence_number":8,"type":"response.function_call_arguments.delta"}' + - 'data: {"delta":"\"","item_id":"8c5375b5b08d666c","output_index":0,"sequence_number":9,"type":"response.function_call_arguments.delta"}' + - 'data: {"delta":"}","item_id":"8c5375b5b08d666c","output_index":0,"sequence_number":10,"type":"response.function_call_arguments.delta"}' + - 'data: {"arguments":"{\"city\": \"San Francisco\"}","item_id":"8c5375b5b08d666c","name":"get_weather","output_index":0,"sequence_number":11,"type":"response.function_call_arguments.done"}' + - 'data: {"item":{"arguments":"{\"city\": \"San Francisco\"}","call_id":"call_92fd766dcc21a19c","name":"get_weather","type":"function_call","id":"8c5375b5b08d666c","status":"completed"},"output_index":0,"sequence_number":12,"type":"response.output_item.done"}' + - 'data: {"response":{"id":"resp_938d583bbec02940","status":"completed","usage":{"input_tokens":66,"output_tokens":21,"total_tokens":87}},"sequence_number":13,"type":"response.completed"}' diff --git a/crates/agentic-core/tests/cassettes/events/text-only-vllm-gemma4.yaml b/crates/agentic-core/tests/cassettes/events/text-only-vllm-gemma4.yaml new file mode 100644 index 0000000..d34731c --- /dev/null +++ b/crates/agentic-core/tests/cassettes/events/text-only-vllm-gemma4.yaml @@ -0,0 +1,19 @@ +# Recorded from google/gemma-4-26B-A4B-it via vLLM v0.21.0 +# Request: {"model":"google/gemma-4-26B-A4B-it","input":"Reply with exactly one word: HELLO","stream":true} +# Date: 2026-06-09 + +model: google/gemma-4-26B-A4B-it +request: + input: "Reply with exactly one word: HELLO" + stream: true +expected_text: "HELLO" +sse: + - 'data: {"response":{"id":"resp_b52e78cb2fb5fef9","created_at":1781048941,"status":"in_progress","output":[],"model":"google/gemma-4-26B-A4B-it","object":"response"},"sequence_number":0,"type":"response.created"}' + - 'data: {"response":{"id":"resp_b52e78cb2fb5fef9","status":"in_progress"},"sequence_number":1,"type":"response.in_progress"}' + - 'data: {"item":{"id":"b659a474ace39990","content":[],"role":"assistant","status":"in_progress","type":"message"},"output_index":0,"sequence_number":2,"type":"response.output_item.added"}' + - 'data: {"content_index":0,"item_id":"b659a474ace39990","output_index":0,"part":{"annotations":[],"text":"","type":"output_text"},"sequence_number":3,"type":"response.content_part.added"}' + - 'data: {"content_index":0,"delta":"HELLO","item_id":"b659a474ace39990","output_index":0,"sequence_number":4,"type":"response.output_text.delta"}' + - 'data: {"content_index":0,"item_id":"b659a474ace39990","output_index":0,"sequence_number":5,"text":"HELLO","type":"response.output_text.done"}' + - 'data: {"content_index":0,"item_id":"b659a474ace39990","output_index":0,"part":{"annotations":[],"text":"HELLO","type":"output_text"},"sequence_number":6,"type":"response.content_part.done"}' + - 'data: {"item":{"id":"b659a474ace39990","content":[{"annotations":[],"text":"HELLO","type":"output_text"}],"role":"assistant","status":"completed","type":"message"},"output_index":0,"sequence_number":7,"type":"response.output_item.done"}' + - 'data: {"response":{"id":"resp_b52e78cb2fb5fef9","status":"completed","usage":{"input_tokens":21,"output_tokens":2,"total_tokens":23}},"sequence_number":8,"type":"response.completed"}' diff --git a/crates/agentic-core/tests/event_normalizer_test.rs b/crates/agentic-core/tests/event_normalizer_test.rs index 804006b..810e25b 100644 --- a/crates/agentic-core/tests/event_normalizer_test.rs +++ b/crates/agentic-core/tests/event_normalizer_test.rs @@ -1,4 +1,26 @@ use agentic_core::events::{EventPayload, SSEEventType, normalize_sse_line}; +use serde::Deserialize; + +const CASSETTE_DIR: &str = concat!(env!("CARGO_MANIFEST_DIR"), "/tests/cassettes/events"); + +#[derive(Deserialize)] +struct EventCassette { + sse: Vec, + expected_text: Option, + expected_function_call: Option, +} + +#[derive(Deserialize)] +struct ExpectedFunctionCall { + name: String, + arguments: String, +} + +fn load_event_cassette(filename: &str) -> EventCassette { + let path = format!("{CASSETTE_DIR}/{filename}"); + let text = std::fs::read_to_string(&path).unwrap_or_else(|e| panic!("read {path}: {e}")); + serde_yaml::from_str(&text).unwrap_or_else(|e| panic!("parse {path}: {e}")) +} /// Simulated streaming cassette matching the format of /// `resp-single-gpt-4o-streaming.yaml` — single turn, text "GLOBE" split @@ -158,3 +180,49 @@ fn test_real_vllm_function_call_stream() { panic!("expected OutputItemDone"); } } + +// --- Cassette-driven tests --- + +#[test] +fn test_cassette_text_only_vllm() { + let cassette = load_event_cassette("text-only-vllm-gemma4.yaml"); + let mut text = String::new(); + let mut parsed_count = 0; + + for line in &cassette.sse { + if let Some(frame) = normalize_sse_line(line) { + parsed_count += 1; + if let EventPayload::TextDelta { delta, .. } = &frame.payload { + text.push_str(delta); + } + } + } + + assert_eq!(text, cassette.expected_text.unwrap()); + assert_eq!(parsed_count, cassette.sse.len(), "all lines should parse"); +} + +#[test] +fn test_cassette_function_call_vllm() { + let cassette = load_event_cassette("function-call-vllm-gemma4.yaml"); + let expected = cassette.expected_function_call.unwrap(); + + let mut args = String::new(); + let mut final_name = String::new(); + let mut parsed_count = 0; + + for line in &cassette.sse { + if let Some(frame) = normalize_sse_line(line) { + parsed_count += 1; + match &frame.payload { + EventPayload::FunctionCallArgsDelta { delta, .. } => args.push_str(delta), + EventPayload::FunctionCallArgsDone { name, .. } => final_name = name.clone(), + _ => {} + } + } + } + + assert_eq!(args, expected.arguments); + assert_eq!(final_name, expected.name); + assert_eq!(parsed_count, cassette.sse.len(), "all lines should parse"); +} From eead6d4c0494ed01b1093481fd5ab460c0c4b112 Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Tue, 9 Jun 2026 17:02:25 -0700 Subject: [PATCH 4/9] =?UTF-8?q?fix:=20address=20review=20findings=20?= =?UTF-8?q?=E2=80=94=20data=20loss=20and=20API=20ergonomics?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Split ReasoningDelta/ReasoningDone (done event reads "text" not "delta") - Add name + call_id to OutputItemAdded (vLLM only provides call_id here) - Make call_id Option in FunctionCallArgsDelta/Done (absent in vLLM) - Add #[non_exhaustive] to SSEEventType and EventPayload - Replace deprecated serde_yaml with serde_yml Signed-off-by: Ashwin Giridharan --- Cargo.lock | 26 ++++++++++++++++++ crates/agentic-core/Cargo.toml | 1 + crates/agentic-core/src/events/normalize.rs | 27 ++++++++++++++----- crates/agentic-core/src/events/types.rs | 14 +++++++--- .../tests/event_normalizer_test.rs | 2 +- 5 files changed, 58 insertions(+), 12 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index dbcdcf4..463a8e9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -18,6 +18,7 @@ dependencies = [ "serde", "serde_json", "serde_yaml", + "serde_yml", "sqlx", "thiserror", "tokio", @@ -1275,6 +1276,16 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "libyml" +version = "0.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3302702afa434ffa30847a83305f0a69d6abd74293b6554c18ec85c7ef30c980" +dependencies = [ + "anyhow", + "version_check", +] + [[package]] name = "linux-raw-sys" version = "0.12.1" @@ -2115,6 +2126,21 @@ dependencies = [ "unsafe-libyaml", ] +[[package]] +name = "serde_yml" +version = "0.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59e2dd588bf1597a252c3b920e0143eb99b0f76e4e082f4c92ce34fbc9e71ddd" +dependencies = [ + "indexmap", + "itoa", + "libyml", + "memchr", + "ryu", + "serde", + "version_check", +] + [[package]] name = "sha1" version = "0.10.6" diff --git a/crates/agentic-core/Cargo.toml b/crates/agentic-core/Cargo.toml index 612e0fe..e8650c2 100644 --- a/crates/agentic-core/Cargo.toml +++ b/crates/agentic-core/Cargo.toml @@ -27,6 +27,7 @@ uuid = { version = "1", features = ["v7", "serde"] } axum.workspace = true criterion = { workspace = true } serde_yaml = "0.9" +serde_yml = "0.0.12" tokio = { workspace = true, features = ["full"] } [[bench]] diff --git a/crates/agentic-core/src/events/normalize.rs b/crates/agentic-core/src/events/normalize.rs index 649b10a..69d8701 100644 --- a/crates/agentic-core/src/events/normalize.rs +++ b/crates/agentic-core/src/events/normalize.rs @@ -76,9 +76,8 @@ fn extract_payload(event_type: SSEEventType, json: &Value) -> EventPayload { SSEEventType::FunctionCallArgumentsDelta => extract_fn_call_args_delta(json), SSEEventType::FunctionCallArgumentsDone => extract_fn_call_args_done(json), - SSEEventType::ReasoningSummaryTextDelta | SSEEventType::ReasoningSummaryTextDone => { - extract_reasoning_delta(json) - } + SSEEventType::ReasoningSummaryTextDelta => extract_reasoning_delta(json), + SSEEventType::ReasoningSummaryTextDone => extract_reasoning_done(json), SSEEventType::ContentPartAdded | SSEEventType::ContentPartDone @@ -109,6 +108,8 @@ fn extract_output_item_added(json: &Value) -> EventPayload { item_id: item["id"].as_str().unwrap_or_default().to_string(), item_type: item["type"].as_str().unwrap_or_default().to_string(), output_index: index_u32(json, "output_index"), + name: item["name"].as_str().map(ToString::to_string), + call_id: item["call_id"].as_str().map(ToString::to_string), } } @@ -142,7 +143,7 @@ fn extract_text_done(json: &Value) -> EventPayload { fn extract_fn_call_args_delta(json: &Value) -> EventPayload { EventPayload::FunctionCallArgsDelta { delta: json["delta"].as_str().unwrap_or_default().to_string(), - call_id: json["call_id"].as_str().unwrap_or_default().to_string(), + call_id: json["call_id"].as_str().map(ToString::to_string), item_id: json["item_id"].as_str().unwrap_or_default().to_string(), output_index: index_u32(json, "output_index"), } @@ -151,7 +152,7 @@ fn extract_fn_call_args_delta(json: &Value) -> EventPayload { fn extract_fn_call_args_done(json: &Value) -> EventPayload { EventPayload::FunctionCallArgsDone { arguments: json["arguments"].as_str().unwrap_or_default().to_string(), - call_id: json["call_id"].as_str().unwrap_or_default().to_string(), + call_id: json["call_id"].as_str().map(ToString::to_string), item_id: json["item_id"].as_str().unwrap_or_default().to_string(), name: json["name"].as_str().unwrap_or_default().to_string(), output_index: index_u32(json, "output_index"), @@ -165,6 +166,13 @@ fn extract_reasoning_delta(json: &Value) -> EventPayload { } } +fn extract_reasoning_done(json: &Value) -> EventPayload { + EventPayload::ReasoningDone { + text: json["text"].as_str().unwrap_or_default().to_string(), + item_id: json["item_id"].as_str().unwrap_or_default().to_string(), + } +} + #[cfg(test)] mod tests { use super::*; @@ -205,7 +213,7 @@ mod tests { } = &frame.payload { assert_eq!(delta, r#"{"city":"#); - assert_eq!(call_id, "call_abc"); + assert_eq!(call_id.as_deref(), Some("call_abc")); assert_eq!(item_id, "fc_1"); assert_eq!(*output_index, 0); } else { @@ -226,7 +234,7 @@ mod tests { } = &frame.payload { assert_eq!(arguments, r#"{"city":"SF"}"#); - assert_eq!(call_id, "call_abc"); + assert_eq!(call_id.as_deref(), Some("call_abc")); assert_eq!(name, "get_weather"); } else { panic!("expected FunctionCallArgsDone payload"); @@ -330,6 +338,7 @@ mod tests { item_id, item_type, output_index, + .. } = &frame.payload { assert_eq!(item_id, "msg_1"); @@ -349,11 +358,15 @@ mod tests { item_id, item_type, output_index, + name, + call_id, } = &frame.payload { assert_eq!(item_id, "fc_1"); assert_eq!(item_type, "function_call"); assert_eq!(*output_index, 1); + assert_eq!(name.as_deref(), Some("get_weather")); + assert_eq!(call_id.as_deref(), Some("call_1")); } else { panic!("expected OutputItemAdded payload"); } diff --git a/crates/agentic-core/src/events/types.rs b/crates/agentic-core/src/events/types.rs index ea0aa25..0ef0e8b 100644 --- a/crates/agentic-core/src/events/types.rs +++ b/crates/agentic-core/src/events/types.rs @@ -5,6 +5,7 @@ use serde_json::Value; /// Covers both the `OpenAI` and vLLM wire formats (e.g. `response.done` vs /// `response.completed`). #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +#[non_exhaustive] pub enum SSEEventType { // Response lifecycle ResponseCreated, @@ -43,6 +44,7 @@ pub enum SSEEventType { /// Typed payload extracted from an SSE event's JSON data. #[derive(Debug, Clone)] +#[non_exhaustive] pub enum EventPayload { /// `response.created` / `response.completed` / `response.failed` / /// `response.incomplete` / `response.in_progress` @@ -57,6 +59,8 @@ pub enum EventPayload { item_id: String, item_type: String, output_index: u32, + name: Option, + call_id: Option, }, /// `response.output_item.done` @@ -85,7 +89,7 @@ pub enum EventPayload { /// `response.function_call_arguments.delta` FunctionCallArgsDelta { delta: String, - call_id: String, + call_id: Option, item_id: String, output_index: u32, }, @@ -93,16 +97,18 @@ pub enum EventPayload { /// `response.function_call_arguments.done` FunctionCallArgsDone { arguments: String, - call_id: String, + call_id: Option, item_id: String, name: String, output_index: u32, }, - /// `response.reasoning_summary_text.delta` / - /// `response.reasoning_summary_text.done` + /// `response.reasoning_summary_text.delta` ReasoningDelta { delta: String, item_id: String }, + /// `response.reasoning_summary_text.done` + ReasoningDone { text: String, item_id: String }, + /// Events we classify but don't deeply parse yet. Raw(Value), diff --git a/crates/agentic-core/tests/event_normalizer_test.rs b/crates/agentic-core/tests/event_normalizer_test.rs index 810e25b..4aff6f0 100644 --- a/crates/agentic-core/tests/event_normalizer_test.rs +++ b/crates/agentic-core/tests/event_normalizer_test.rs @@ -19,7 +19,7 @@ struct ExpectedFunctionCall { fn load_event_cassette(filename: &str) -> EventCassette { let path = format!("{CASSETTE_DIR}/{filename}"); let text = std::fs::read_to_string(&path).unwrap_or_else(|e| panic!("read {path}: {e}")); - serde_yaml::from_str(&text).unwrap_or_else(|e| panic!("parse {path}: {e}")) + serde_yml::from_str(&text).unwrap_or_else(|e| panic!("parse {path}: {e}")) } /// Simulated streaming cassette matching the format of From 3c3249879b8db5cc8868f54a8a5f8b7610e53293 Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Tue, 9 Jun 2026 17:18:51 -0700 Subject: [PATCH 5/9] test: add reasoning, parallel calls, and mixed output tests Covers previously untested paths: - ReasoningSummaryTextDelta + ReasoningSummaryTextDone (validates ISSUE-1 fix) - Parallel function calls (multiple output_index in one response) - Mixed text + function_call in same response - call_id recovery from OutputItemAdded - response.failed and response.incomplete - Empty delta, unicode in deltas - File search and web search event classification Signed-off-by: Ashwin Giridharan --- crates/agentic-core/src/events/normalize.rs | 91 +++++++++++++++++++ .../tests/event_normalizer_test.rs | 91 +++++++++++++++++++ 2 files changed, 182 insertions(+) diff --git a/crates/agentic-core/src/events/normalize.rs b/crates/agentic-core/src/events/normalize.rs index 69d8701..f2f9f0d 100644 --- a/crates/agentic-core/src/events/normalize.rs +++ b/crates/agentic-core/src/events/normalize.rs @@ -386,4 +386,95 @@ mod tests { let frame = normalize_sse_line(line).unwrap(); assert_eq!(frame.sequence_number, None); } + + #[test] + fn test_reasoning_delta() { + let line = r#"data: {"type":"response.reasoning_summary_text.delta","delta":"Let me think","item_id":"rs_1","sequence_number":3}"#; + let frame = normalize_sse_line(line).unwrap(); + assert_eq!(frame.event_type, SSEEventType::ReasoningSummaryTextDelta); + if let EventPayload::ReasoningDelta { delta, item_id } = &frame.payload { + assert_eq!(delta, "Let me think"); + assert_eq!(item_id, "rs_1"); + } else { + panic!("expected ReasoningDelta payload"); + } + } + + #[test] + fn test_reasoning_done_reads_text_not_delta() { + let line = r#"data: {"type":"response.reasoning_summary_text.done","text":"Full reasoning summary here","item_id":"rs_1","sequence_number":5}"#; + let frame = normalize_sse_line(line).unwrap(); + assert_eq!(frame.event_type, SSEEventType::ReasoningSummaryTextDone); + if let EventPayload::ReasoningDone { text, item_id } = &frame.payload { + assert_eq!(text, "Full reasoning summary here"); + assert_eq!(item_id, "rs_1"); + } else { + panic!("expected ReasoningDone payload"); + } + } + + #[test] + fn test_response_failed() { + let line = r#"data: {"type":"response.failed","response":{"id":"resp_err","status":"failed","usage":null},"sequence_number":2}"#; + let frame = normalize_sse_line(line).unwrap(); + assert_eq!(frame.event_type, SSEEventType::ResponseFailed); + if let EventPayload::Response { id, status, .. } = &frame.payload { + assert_eq!(id, "resp_err"); + assert_eq!(status, "failed"); + } else { + panic!("expected Response payload"); + } + } + + #[test] + fn test_response_incomplete() { + let line = r#"data: {"type":"response.incomplete","response":{"id":"resp_inc","status":"incomplete","usage":{"input_tokens":100,"output_tokens":4096,"total_tokens":4196}},"sequence_number":99}"#; + let frame = normalize_sse_line(line).unwrap(); + assert_eq!(frame.event_type, SSEEventType::ResponseIncomplete); + if let EventPayload::Response { status, usage, .. } = &frame.payload { + assert_eq!(status, "incomplete"); + assert!(usage.is_some()); + } else { + panic!("expected Response payload"); + } + } + + #[test] + fn test_empty_delta() { + let line = r#"data: {"type":"response.output_text.delta","delta":"","item_id":"msg_1","output_index":0,"content_index":0,"sequence_number":4}"#; + let frame = normalize_sse_line(line).unwrap(); + assert_eq!(frame.event_type, SSEEventType::OutputTextDelta); + if let EventPayload::TextDelta { delta, .. } = &frame.payload { + assert_eq!(delta, ""); + } else { + panic!("expected TextDelta payload"); + } + } + + #[test] + fn test_unicode_in_delta() { + let line = r#"data: {"type":"response.output_text.delta","delta":"こんにちは 🌍","item_id":"msg_1","output_index":0,"content_index":0,"sequence_number":4}"#; + let frame = normalize_sse_line(line).unwrap(); + if let EventPayload::TextDelta { delta, .. } = &frame.payload { + assert_eq!(delta, "こんにちは 🌍"); + } else { + panic!("expected TextDelta payload"); + } + } + + #[test] + fn test_file_search_classification() { + let line = r#"data: {"type":"response.file_search_call.searching","item_id":"fs_1","output_index":0,"sequence_number":3}"#; + let frame = normalize_sse_line(line).unwrap(); + assert_eq!(frame.event_type, SSEEventType::FileSearchCallSearching); + assert!(matches!(frame.payload, EventPayload::Raw(_))); + } + + #[test] + fn test_web_search_classification() { + let line = r#"data: {"type":"response.web_search_call.completed","item_id":"ws_1","output_index":0,"sequence_number":6}"#; + let frame = normalize_sse_line(line).unwrap(); + assert_eq!(frame.event_type, SSEEventType::WebSearchCallCompleted); + assert!(matches!(frame.payload, EventPayload::Raw(_))); + } } diff --git a/crates/agentic-core/tests/event_normalizer_test.rs b/crates/agentic-core/tests/event_normalizer_test.rs index 4aff6f0..4aac1b3 100644 --- a/crates/agentic-core/tests/event_normalizer_test.rs +++ b/crates/agentic-core/tests/event_normalizer_test.rs @@ -226,3 +226,94 @@ fn test_cassette_function_call_vllm() { assert_eq!(final_name, expected.name); assert_eq!(parsed_count, cassette.sse.len(), "all lines should parse"); } + +/// Parallel function calls — two tools called in the same response (different `output_index`). +#[test] +fn test_parallel_function_calls() { + let lines = &[ + r#"data: {"type":"response.created","response":{"id":"resp_par","status":"in_progress","usage":null},"sequence_number":0}"#, + r#"data: {"type":"response.output_item.added","item":{"id":"fc_1","type":"function_call","name":"get_weather","call_id":"call_1","arguments":"","status":"in_progress"},"output_index":0,"sequence_number":1}"#, + r#"data: {"type":"response.output_item.added","item":{"id":"fc_2","type":"function_call","name":"get_time","call_id":"call_2","arguments":"","status":"in_progress"},"output_index":1,"sequence_number":2}"#, + r#"data: {"type":"response.function_call_arguments.delta","delta":"{\"city\":\"SF\"}","item_id":"fc_1","output_index":0,"sequence_number":3}"#, + r#"data: {"type":"response.function_call_arguments.delta","delta":"{\"tz\":\"PST\"}","item_id":"fc_2","output_index":1,"sequence_number":4}"#, + r#"data: {"type":"response.function_call_arguments.done","arguments":"{\"city\":\"SF\"}","item_id":"fc_1","name":"get_weather","output_index":0,"sequence_number":5}"#, + r#"data: {"type":"response.function_call_arguments.done","arguments":"{\"tz\":\"PST\"}","item_id":"fc_2","name":"get_time","output_index":1,"sequence_number":6}"#, + r#"data: {"type":"response.output_item.done","item":{"id":"fc_1","type":"function_call","name":"get_weather","call_id":"call_1","arguments":"{\"city\":\"SF\"}","status":"completed"},"output_index":0,"sequence_number":7}"#, + r#"data: {"type":"response.output_item.done","item":{"id":"fc_2","type":"function_call","name":"get_time","call_id":"call_2","arguments":"{\"tz\":\"PST\"}","status":"completed"},"output_index":1,"sequence_number":8}"#, + r#"data: {"type":"response.completed","response":{"id":"resp_par","status":"completed","usage":{"input_tokens":30,"output_tokens":15,"total_tokens":45}},"sequence_number":9}"#, + ]; + + let mut calls: std::collections::HashMap = std::collections::HashMap::new(); + + for line in lines { + let frame = normalize_sse_line(line).unwrap(); + if let EventPayload::FunctionCallArgsDone { + item_id, + name, + arguments, + .. + } = &frame.payload + { + calls.insert(item_id.clone(), (name.clone(), arguments.clone())); + } + } + + assert_eq!(calls.len(), 2); + assert_eq!(calls["fc_1"], ("get_weather".into(), r#"{"city":"SF"}"#.into())); + assert_eq!(calls["fc_2"], ("get_time".into(), r#"{"tz":"PST"}"#.into())); +} + +/// Mixed response: text message (`output_index`=0) + function call (`output_index`=1). +#[test] +fn test_mixed_text_and_function_call() { + let lines = &[ + r#"data: {"type":"response.created","response":{"id":"resp_mix","status":"in_progress","usage":null},"sequence_number":0}"#, + r#"data: {"type":"response.output_item.added","item":{"id":"msg_1","type":"message","status":"in_progress","content":[]},"output_index":0,"sequence_number":1}"#, + r#"data: {"type":"response.output_text.delta","delta":"Let me check ","item_id":"msg_1","output_index":0,"content_index":0,"sequence_number":2}"#, + r#"data: {"type":"response.output_text.delta","delta":"the weather.","item_id":"msg_1","output_index":0,"content_index":0,"sequence_number":3}"#, + r#"data: {"type":"response.output_text.done","text":"Let me check the weather.","item_id":"msg_1","output_index":0,"sequence_number":4}"#, + r#"data: {"type":"response.output_item.done","item":{"id":"msg_1","type":"message","status":"completed","content":[{"type":"output_text","text":"Let me check the weather."}]},"output_index":0,"sequence_number":5}"#, + r#"data: {"type":"response.output_item.added","item":{"id":"fc_1","type":"function_call","name":"get_weather","call_id":"call_x","arguments":"","status":"in_progress"},"output_index":1,"sequence_number":6}"#, + r#"data: {"type":"response.function_call_arguments.delta","delta":"{\"city\":\"NYC\"}","item_id":"fc_1","output_index":1,"sequence_number":7}"#, + r#"data: {"type":"response.function_call_arguments.done","arguments":"{\"city\":\"NYC\"}","item_id":"fc_1","name":"get_weather","output_index":1,"sequence_number":8}"#, + r#"data: {"type":"response.output_item.done","item":{"id":"fc_1","type":"function_call","name":"get_weather","call_id":"call_x","arguments":"{\"city\":\"NYC\"}","status":"completed"},"output_index":1,"sequence_number":9}"#, + r#"data: {"type":"response.completed","response":{"id":"resp_mix","status":"completed","usage":{"input_tokens":25,"output_tokens":20,"total_tokens":45}},"sequence_number":10}"#, + ]; + + let mut text = String::new(); + let mut fn_name = String::new(); + let mut fn_args = String::new(); + + for line in lines { + let frame = normalize_sse_line(line).unwrap(); + match &frame.payload { + EventPayload::TextDelta { delta, .. } => text.push_str(delta), + EventPayload::FunctionCallArgsDone { name, arguments, .. } => { + fn_name = name.clone(); + fn_args = arguments.clone(); + } + _ => {} + } + } + + assert_eq!(text, "Let me check the weather."); + assert_eq!(fn_name, "get_weather"); + assert_eq!(fn_args, r#"{"city":"NYC"}"#); +} + +/// Verify `call_id` is recoverable from `OutputItemAdded` for vLLM streams. +#[test] +fn test_call_id_from_output_item_added() { + let line = r#"data: {"type":"response.output_item.added","item":{"arguments":"","call_id":"call_abc123","name":"search","type":"function_call","id":"fc_99","status":"in_progress"},"output_index":0,"sequence_number":2}"#; + let frame = normalize_sse_line(line).unwrap(); + if let EventPayload::OutputItemAdded { + call_id, name, item_id, .. + } = &frame.payload + { + assert_eq!(call_id.as_deref(), Some("call_abc123")); + assert_eq!(name.as_deref(), Some("search")); + assert_eq!(item_id, "fc_99"); + } else { + panic!("expected OutputItemAdded"); + } +} From 9c5c81740144a484243705c4aeda17e93ec443fd Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Tue, 9 Jun 2026 17:24:53 -0700 Subject: [PATCH 6/9] refactor: consolidate all tests into integration test file Move inline unit tests from normalize.rs to tests/event_normalizer_test.rs. All tests use the public API only (normalize_sse_line) so they don't need module-private access. Matches the repo convention established by PR #46's cassette tests. Signed-off-by: Ashwin Giridharan --- crates/agentic-core/src/events/normalize.rs | 306 ----------------- .../tests/event_normalizer_test.rs | 308 ++++++++++++++++++ 2 files changed, 308 insertions(+), 306 deletions(-) diff --git a/crates/agentic-core/src/events/normalize.rs b/crates/agentic-core/src/events/normalize.rs index f2f9f0d..e85b495 100644 --- a/crates/agentic-core/src/events/normalize.rs +++ b/crates/agentic-core/src/events/normalize.rs @@ -172,309 +172,3 @@ fn extract_reasoning_done(json: &Value) -> EventPayload { item_id: json["item_id"].as_str().unwrap_or_default().to_string(), } } - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_text_delta() { - let line = r#"data: {"type":"response.output_text.delta","delta":"hello","item_id":"msg_1","output_index":0,"content_index":0,"sequence_number":4}"#; - let frame = normalize_sse_line(line).unwrap(); - assert_eq!(frame.event_type, SSEEventType::OutputTextDelta); - assert_eq!(frame.sequence_number, Some(4)); - if let EventPayload::TextDelta { - delta, - item_id, - output_index, - content_index, - } = &frame.payload - { - assert_eq!(delta, "hello"); - assert_eq!(item_id, "msg_1"); - assert_eq!(*output_index, 0); - assert_eq!(*content_index, 0); - } else { - panic!("expected TextDelta payload"); - } - } - - #[test] - fn test_function_call_args_delta() { - let line = r#"data: {"type":"response.function_call_arguments.delta","delta":"{\"city\":","call_id":"call_abc","item_id":"fc_1","output_index":0,"sequence_number":7}"#; - let frame = normalize_sse_line(line).unwrap(); - assert_eq!(frame.event_type, SSEEventType::FunctionCallArgumentsDelta); - assert_eq!(frame.sequence_number, Some(7)); - if let EventPayload::FunctionCallArgsDelta { - delta, - call_id, - item_id, - output_index, - } = &frame.payload - { - assert_eq!(delta, r#"{"city":"#); - assert_eq!(call_id.as_deref(), Some("call_abc")); - assert_eq!(item_id, "fc_1"); - assert_eq!(*output_index, 0); - } else { - panic!("expected FunctionCallArgsDelta payload"); - } - } - - #[test] - fn test_function_call_args_done() { - let line = r#"data: {"type":"response.function_call_arguments.done","arguments":"{\"city\":\"SF\"}","call_id":"call_abc","item_id":"fc_1","name":"get_weather","output_index":0,"sequence_number":8}"#; - let frame = normalize_sse_line(line).unwrap(); - assert_eq!(frame.event_type, SSEEventType::FunctionCallArgumentsDone); - if let EventPayload::FunctionCallArgsDone { - arguments, - call_id, - name, - .. - } = &frame.payload - { - assert_eq!(arguments, r#"{"city":"SF"}"#); - assert_eq!(call_id.as_deref(), Some("call_abc")); - assert_eq!(name, "get_weather"); - } else { - panic!("expected FunctionCallArgsDone payload"); - } - } - - #[test] - fn test_output_item_done() { - let line = r#"data: {"type":"response.output_item.done","item":{"id":"msg_1","type":"message","status":"completed","content":[{"type":"output_text","text":"hi"}]},"output_index":0,"sequence_number":9}"#; - let frame = normalize_sse_line(line).unwrap(); - assert_eq!(frame.event_type, SSEEventType::OutputItemDone); - if let EventPayload::OutputItemDone { - item_id, - item_type, - item, - .. - } = &frame.payload - { - assert_eq!(item_id, "msg_1"); - assert_eq!(item_type, "message"); - assert_eq!(item["content"][0]["text"].as_str(), Some("hi")); - } else { - panic!("expected OutputItemDone payload"); - } - } - - #[test] - fn test_vllm_response_done_maps_to_completed() { - let line = r#"data: {"type":"response.done","response":{"id":"resp_1","status":"completed","usage":{"total_tokens":10}},"sequence_number":9}"#; - let frame = normalize_sse_line(line).unwrap(); - assert_eq!(frame.event_type, SSEEventType::ResponseCompleted); - if let EventPayload::Response { id, status, usage } = &frame.payload { - assert_eq!(id, "resp_1"); - assert_eq!(status, "completed"); - assert!(usage.is_some()); - } else { - panic!("expected Response payload"); - } - } - - #[test] - fn test_openai_response_completed() { - let line = r#"data: {"type":"response.completed","response":{"id":"resp_2","status":"completed","usage":null},"sequence_number":10}"#; - let frame = normalize_sse_line(line).unwrap(); - assert_eq!(frame.event_type, SSEEventType::ResponseCompleted); - if let EventPayload::Response { id, usage, .. } = &frame.payload { - assert_eq!(id, "resp_2"); - assert!(usage.is_none()); - } else { - panic!("expected Response payload"); - } - } - - #[test] - fn test_done_marker_returns_none() { - assert!(normalize_sse_line("data: [DONE]").is_none()); - } - - #[test] - fn test_non_data_lines_return_none() { - assert!(normalize_sse_line("event: response.created").is_none()); - assert!(normalize_sse_line("").is_none()); - assert!(normalize_sse_line(": comment").is_none()); - assert!(normalize_sse_line("id: 123").is_none()); - } - - #[test] - fn test_unknown_event_type() { - let line = r#"data: {"type":"response.unknown_future_event","foo":"bar"}"#; - let frame = normalize_sse_line(line).unwrap(); - assert_eq!(frame.event_type, SSEEventType::Other); - assert!(matches!(frame.payload, EventPayload::Raw(_))); - } - - #[test] - fn test_malformed_json_returns_none() { - assert!(normalize_sse_line("data: {not valid json}").is_none()); - assert!(normalize_sse_line("data: ").is_none()); - } - - #[test] - fn test_response_created() { - let line = r#"data: {"type":"response.created","response":{"id":"resp_abc","status":"in_progress","usage":null},"sequence_number":0}"#; - let frame = normalize_sse_line(line).unwrap(); - assert_eq!(frame.event_type, SSEEventType::ResponseCreated); - assert_eq!(frame.sequence_number, Some(0)); - if let EventPayload::Response { id, status, .. } = &frame.payload { - assert_eq!(id, "resp_abc"); - assert_eq!(status, "in_progress"); - } else { - panic!("expected Response payload"); - } - } - - #[test] - fn test_output_item_added_message() { - let line = r#"data: {"type":"response.output_item.added","item":{"id":"msg_1","type":"message","status":"in_progress","content":[]},"output_index":0,"sequence_number":2}"#; - let frame = normalize_sse_line(line).unwrap(); - assert_eq!(frame.event_type, SSEEventType::OutputItemAdded); - if let EventPayload::OutputItemAdded { - item_id, - item_type, - output_index, - .. - } = &frame.payload - { - assert_eq!(item_id, "msg_1"); - assert_eq!(item_type, "message"); - assert_eq!(*output_index, 0); - } else { - panic!("expected OutputItemAdded payload"); - } - } - - #[test] - fn test_output_item_added_function_call() { - let line = r#"data: {"type":"response.output_item.added","item":{"id":"fc_1","type":"function_call","status":"in_progress","name":"get_weather","call_id":"call_1","arguments":""},"output_index":1,"sequence_number":5}"#; - let frame = normalize_sse_line(line).unwrap(); - assert_eq!(frame.event_type, SSEEventType::OutputItemAdded); - if let EventPayload::OutputItemAdded { - item_id, - item_type, - output_index, - name, - call_id, - } = &frame.payload - { - assert_eq!(item_id, "fc_1"); - assert_eq!(item_type, "function_call"); - assert_eq!(*output_index, 1); - assert_eq!(name.as_deref(), Some("get_weather")); - assert_eq!(call_id.as_deref(), Some("call_1")); - } else { - panic!("expected OutputItemAdded payload"); - } - } - - #[test] - fn test_content_part_added_is_raw() { - let line = r#"data: {"type":"response.content_part.added","content_index":0,"item_id":"msg_1","output_index":0,"part":{"type":"output_text","text":""},"sequence_number":3}"#; - let frame = normalize_sse_line(line).unwrap(); - assert_eq!(frame.event_type, SSEEventType::ContentPartAdded); - assert!(matches!(frame.payload, EventPayload::Raw(_))); - } - - #[test] - fn test_no_sequence_number() { - let line = r#"data: {"type":"response.output_text.delta","delta":"x","item_id":"m","output_index":0,"content_index":0}"#; - let frame = normalize_sse_line(line).unwrap(); - assert_eq!(frame.sequence_number, None); - } - - #[test] - fn test_reasoning_delta() { - let line = r#"data: {"type":"response.reasoning_summary_text.delta","delta":"Let me think","item_id":"rs_1","sequence_number":3}"#; - let frame = normalize_sse_line(line).unwrap(); - assert_eq!(frame.event_type, SSEEventType::ReasoningSummaryTextDelta); - if let EventPayload::ReasoningDelta { delta, item_id } = &frame.payload { - assert_eq!(delta, "Let me think"); - assert_eq!(item_id, "rs_1"); - } else { - panic!("expected ReasoningDelta payload"); - } - } - - #[test] - fn test_reasoning_done_reads_text_not_delta() { - let line = r#"data: {"type":"response.reasoning_summary_text.done","text":"Full reasoning summary here","item_id":"rs_1","sequence_number":5}"#; - let frame = normalize_sse_line(line).unwrap(); - assert_eq!(frame.event_type, SSEEventType::ReasoningSummaryTextDone); - if let EventPayload::ReasoningDone { text, item_id } = &frame.payload { - assert_eq!(text, "Full reasoning summary here"); - assert_eq!(item_id, "rs_1"); - } else { - panic!("expected ReasoningDone payload"); - } - } - - #[test] - fn test_response_failed() { - let line = r#"data: {"type":"response.failed","response":{"id":"resp_err","status":"failed","usage":null},"sequence_number":2}"#; - let frame = normalize_sse_line(line).unwrap(); - assert_eq!(frame.event_type, SSEEventType::ResponseFailed); - if let EventPayload::Response { id, status, .. } = &frame.payload { - assert_eq!(id, "resp_err"); - assert_eq!(status, "failed"); - } else { - panic!("expected Response payload"); - } - } - - #[test] - fn test_response_incomplete() { - let line = r#"data: {"type":"response.incomplete","response":{"id":"resp_inc","status":"incomplete","usage":{"input_tokens":100,"output_tokens":4096,"total_tokens":4196}},"sequence_number":99}"#; - let frame = normalize_sse_line(line).unwrap(); - assert_eq!(frame.event_type, SSEEventType::ResponseIncomplete); - if let EventPayload::Response { status, usage, .. } = &frame.payload { - assert_eq!(status, "incomplete"); - assert!(usage.is_some()); - } else { - panic!("expected Response payload"); - } - } - - #[test] - fn test_empty_delta() { - let line = r#"data: {"type":"response.output_text.delta","delta":"","item_id":"msg_1","output_index":0,"content_index":0,"sequence_number":4}"#; - let frame = normalize_sse_line(line).unwrap(); - assert_eq!(frame.event_type, SSEEventType::OutputTextDelta); - if let EventPayload::TextDelta { delta, .. } = &frame.payload { - assert_eq!(delta, ""); - } else { - panic!("expected TextDelta payload"); - } - } - - #[test] - fn test_unicode_in_delta() { - let line = r#"data: {"type":"response.output_text.delta","delta":"こんにちは 🌍","item_id":"msg_1","output_index":0,"content_index":0,"sequence_number":4}"#; - let frame = normalize_sse_line(line).unwrap(); - if let EventPayload::TextDelta { delta, .. } = &frame.payload { - assert_eq!(delta, "こんにちは 🌍"); - } else { - panic!("expected TextDelta payload"); - } - } - - #[test] - fn test_file_search_classification() { - let line = r#"data: {"type":"response.file_search_call.searching","item_id":"fs_1","output_index":0,"sequence_number":3}"#; - let frame = normalize_sse_line(line).unwrap(); - assert_eq!(frame.event_type, SSEEventType::FileSearchCallSearching); - assert!(matches!(frame.payload, EventPayload::Raw(_))); - } - - #[test] - fn test_web_search_classification() { - let line = r#"data: {"type":"response.web_search_call.completed","item_id":"ws_1","output_index":0,"sequence_number":6}"#; - let frame = normalize_sse_line(line).unwrap(); - assert_eq!(frame.event_type, SSEEventType::WebSearchCallCompleted); - assert!(matches!(frame.payload, EventPayload::Raw(_))); - } -} diff --git a/crates/agentic-core/tests/event_normalizer_test.rs b/crates/agentic-core/tests/event_normalizer_test.rs index 4aac1b3..8a4a128 100644 --- a/crates/agentic-core/tests/event_normalizer_test.rs +++ b/crates/agentic-core/tests/event_normalizer_test.rs @@ -1,6 +1,314 @@ use agentic_core::events::{EventPayload, SSEEventType, normalize_sse_line}; use serde::Deserialize; +// --- Unit tests (per-event-type parsing) --- + +#[test] +fn test_text_delta() { + let line = r#"data: {"type":"response.output_text.delta","delta":"hello","item_id":"msg_1","output_index":0,"content_index":0,"sequence_number":4}"#; + let frame = normalize_sse_line(line).unwrap(); + assert_eq!(frame.event_type, SSEEventType::OutputTextDelta); + assert_eq!(frame.sequence_number, Some(4)); + if let EventPayload::TextDelta { + delta, + item_id, + output_index, + content_index, + } = &frame.payload + { + assert_eq!(delta, "hello"); + assert_eq!(item_id, "msg_1"); + assert_eq!(*output_index, 0); + assert_eq!(*content_index, 0); + } else { + panic!("expected TextDelta payload"); + } +} + +#[test] +fn test_function_call_args_delta() { + let line = r#"data: {"type":"response.function_call_arguments.delta","delta":"{\"city\":","call_id":"call_abc","item_id":"fc_1","output_index":0,"sequence_number":7}"#; + let frame = normalize_sse_line(line).unwrap(); + assert_eq!(frame.event_type, SSEEventType::FunctionCallArgumentsDelta); + assert_eq!(frame.sequence_number, Some(7)); + if let EventPayload::FunctionCallArgsDelta { + delta, + call_id, + item_id, + output_index, + } = &frame.payload + { + assert_eq!(delta, r#"{"city":"#); + assert_eq!(call_id.as_deref(), Some("call_abc")); + assert_eq!(item_id, "fc_1"); + assert_eq!(*output_index, 0); + } else { + panic!("expected FunctionCallArgsDelta payload"); + } +} + +#[test] +fn test_function_call_args_done() { + let line = r#"data: {"type":"response.function_call_arguments.done","arguments":"{\"city\":\"SF\"}","call_id":"call_abc","item_id":"fc_1","name":"get_weather","output_index":0,"sequence_number":8}"#; + let frame = normalize_sse_line(line).unwrap(); + assert_eq!(frame.event_type, SSEEventType::FunctionCallArgumentsDone); + if let EventPayload::FunctionCallArgsDone { + arguments, + call_id, + name, + .. + } = &frame.payload + { + assert_eq!(arguments, r#"{"city":"SF"}"#); + assert_eq!(call_id.as_deref(), Some("call_abc")); + assert_eq!(name, "get_weather"); + } else { + panic!("expected FunctionCallArgsDone payload"); + } +} + +#[test] +fn test_output_item_done() { + let line = r#"data: {"type":"response.output_item.done","item":{"id":"msg_1","type":"message","status":"completed","content":[{"type":"output_text","text":"hi"}]},"output_index":0,"sequence_number":9}"#; + let frame = normalize_sse_line(line).unwrap(); + assert_eq!(frame.event_type, SSEEventType::OutputItemDone); + if let EventPayload::OutputItemDone { + item_id, + item_type, + item, + .. + } = &frame.payload + { + assert_eq!(item_id, "msg_1"); + assert_eq!(item_type, "message"); + assert_eq!(item["content"][0]["text"].as_str(), Some("hi")); + } else { + panic!("expected OutputItemDone payload"); + } +} + +#[test] +fn test_vllm_response_done_maps_to_completed() { + let line = r#"data: {"type":"response.done","response":{"id":"resp_1","status":"completed","usage":{"total_tokens":10}},"sequence_number":9}"#; + let frame = normalize_sse_line(line).unwrap(); + assert_eq!(frame.event_type, SSEEventType::ResponseCompleted); + if let EventPayload::Response { id, status, usage } = &frame.payload { + assert_eq!(id, "resp_1"); + assert_eq!(status, "completed"); + assert!(usage.is_some()); + } else { + panic!("expected Response payload"); + } +} + +#[test] +fn test_openai_response_completed() { + let line = r#"data: {"type":"response.completed","response":{"id":"resp_2","status":"completed","usage":null},"sequence_number":10}"#; + let frame = normalize_sse_line(line).unwrap(); + assert_eq!(frame.event_type, SSEEventType::ResponseCompleted); + if let EventPayload::Response { id, usage, .. } = &frame.payload { + assert_eq!(id, "resp_2"); + assert!(usage.is_none()); + } else { + panic!("expected Response payload"); + } +} + +#[test] +fn test_done_marker_returns_none() { + assert!(normalize_sse_line("data: [DONE]").is_none()); +} + +#[test] +fn test_non_data_lines_return_none() { + assert!(normalize_sse_line("event: response.created").is_none()); + assert!(normalize_sse_line("").is_none()); + assert!(normalize_sse_line(": comment").is_none()); + assert!(normalize_sse_line("id: 123").is_none()); +} + +#[test] +fn test_unknown_event_type() { + let line = r#"data: {"type":"response.unknown_future_event","foo":"bar"}"#; + let frame = normalize_sse_line(line).unwrap(); + assert_eq!(frame.event_type, SSEEventType::Other); + assert!(matches!(frame.payload, EventPayload::Raw(_))); +} + +#[test] +fn test_malformed_json_returns_none() { + assert!(normalize_sse_line("data: {not valid json}").is_none()); + assert!(normalize_sse_line("data: ").is_none()); +} + +#[test] +fn test_response_created() { + let line = r#"data: {"type":"response.created","response":{"id":"resp_abc","status":"in_progress","usage":null},"sequence_number":0}"#; + let frame = normalize_sse_line(line).unwrap(); + assert_eq!(frame.event_type, SSEEventType::ResponseCreated); + assert_eq!(frame.sequence_number, Some(0)); + if let EventPayload::Response { id, status, .. } = &frame.payload { + assert_eq!(id, "resp_abc"); + assert_eq!(status, "in_progress"); + } else { + panic!("expected Response payload"); + } +} + +#[test] +fn test_output_item_added_message() { + let line = r#"data: {"type":"response.output_item.added","item":{"id":"msg_1","type":"message","status":"in_progress","content":[]},"output_index":0,"sequence_number":2}"#; + let frame = normalize_sse_line(line).unwrap(); + assert_eq!(frame.event_type, SSEEventType::OutputItemAdded); + if let EventPayload::OutputItemAdded { + item_id, + item_type, + output_index, + .. + } = &frame.payload + { + assert_eq!(item_id, "msg_1"); + assert_eq!(item_type, "message"); + assert_eq!(*output_index, 0); + } else { + panic!("expected OutputItemAdded payload"); + } +} + +#[test] +fn test_output_item_added_function_call() { + let line = r#"data: {"type":"response.output_item.added","item":{"id":"fc_1","type":"function_call","status":"in_progress","name":"get_weather","call_id":"call_1","arguments":""},"output_index":1,"sequence_number":5}"#; + let frame = normalize_sse_line(line).unwrap(); + assert_eq!(frame.event_type, SSEEventType::OutputItemAdded); + if let EventPayload::OutputItemAdded { + item_id, + item_type, + output_index, + name, + call_id, + } = &frame.payload + { + assert_eq!(item_id, "fc_1"); + assert_eq!(item_type, "function_call"); + assert_eq!(*output_index, 1); + assert_eq!(name.as_deref(), Some("get_weather")); + assert_eq!(call_id.as_deref(), Some("call_1")); + } else { + panic!("expected OutputItemAdded payload"); + } +} + +#[test] +fn test_content_part_added_is_raw() { + let line = r#"data: {"type":"response.content_part.added","content_index":0,"item_id":"msg_1","output_index":0,"part":{"type":"output_text","text":""},"sequence_number":3}"#; + let frame = normalize_sse_line(line).unwrap(); + assert_eq!(frame.event_type, SSEEventType::ContentPartAdded); + assert!(matches!(frame.payload, EventPayload::Raw(_))); +} + +#[test] +fn test_no_sequence_number() { + let line = + r#"data: {"type":"response.output_text.delta","delta":"x","item_id":"m","output_index":0,"content_index":0}"#; + let frame = normalize_sse_line(line).unwrap(); + assert_eq!(frame.sequence_number, None); +} + +#[test] +fn test_reasoning_delta() { + let line = r#"data: {"type":"response.reasoning_summary_text.delta","delta":"Let me think","item_id":"rs_1","sequence_number":3}"#; + let frame = normalize_sse_line(line).unwrap(); + assert_eq!(frame.event_type, SSEEventType::ReasoningSummaryTextDelta); + if let EventPayload::ReasoningDelta { delta, item_id } = &frame.payload { + assert_eq!(delta, "Let me think"); + assert_eq!(item_id, "rs_1"); + } else { + panic!("expected ReasoningDelta payload"); + } +} + +#[test] +fn test_reasoning_done_reads_text_not_delta() { + let line = r#"data: {"type":"response.reasoning_summary_text.done","text":"Full reasoning summary here","item_id":"rs_1","sequence_number":5}"#; + let frame = normalize_sse_line(line).unwrap(); + assert_eq!(frame.event_type, SSEEventType::ReasoningSummaryTextDone); + if let EventPayload::ReasoningDone { text, item_id } = &frame.payload { + assert_eq!(text, "Full reasoning summary here"); + assert_eq!(item_id, "rs_1"); + } else { + panic!("expected ReasoningDone payload"); + } +} + +#[test] +fn test_response_failed() { + let line = r#"data: {"type":"response.failed","response":{"id":"resp_err","status":"failed","usage":null},"sequence_number":2}"#; + let frame = normalize_sse_line(line).unwrap(); + assert_eq!(frame.event_type, SSEEventType::ResponseFailed); + if let EventPayload::Response { id, status, .. } = &frame.payload { + assert_eq!(id, "resp_err"); + assert_eq!(status, "failed"); + } else { + panic!("expected Response payload"); + } +} + +#[test] +fn test_response_incomplete() { + let line = r#"data: {"type":"response.incomplete","response":{"id":"resp_inc","status":"incomplete","usage":{"input_tokens":100,"output_tokens":4096,"total_tokens":4196}},"sequence_number":99}"#; + let frame = normalize_sse_line(line).unwrap(); + assert_eq!(frame.event_type, SSEEventType::ResponseIncomplete); + if let EventPayload::Response { status, usage, .. } = &frame.payload { + assert_eq!(status, "incomplete"); + assert!(usage.is_some()); + } else { + panic!("expected Response payload"); + } +} + +#[test] +fn test_empty_delta() { + let line = r#"data: {"type":"response.output_text.delta","delta":"","item_id":"msg_1","output_index":0,"content_index":0,"sequence_number":4}"#; + let frame = normalize_sse_line(line).unwrap(); + assert_eq!(frame.event_type, SSEEventType::OutputTextDelta); + if let EventPayload::TextDelta { delta, .. } = &frame.payload { + assert_eq!(delta, ""); + } else { + panic!("expected TextDelta payload"); + } +} + +#[test] +fn test_unicode_in_delta() { + let line = r#"data: {"type":"response.output_text.delta","delta":"こんにちは 🌍","item_id":"msg_1","output_index":0,"content_index":0,"sequence_number":4}"#; + let frame = normalize_sse_line(line).unwrap(); + if let EventPayload::TextDelta { delta, .. } = &frame.payload { + assert_eq!(delta, "こんにちは 🌍"); + } else { + panic!("expected TextDelta payload"); + } +} + +#[test] +fn test_file_search_classification() { + let line = + r#"data: {"type":"response.file_search_call.searching","item_id":"fs_1","output_index":0,"sequence_number":3}"#; + let frame = normalize_sse_line(line).unwrap(); + assert_eq!(frame.event_type, SSEEventType::FileSearchCallSearching); + assert!(matches!(frame.payload, EventPayload::Raw(_))); +} + +#[test] +fn test_web_search_classification() { + let line = + r#"data: {"type":"response.web_search_call.completed","item_id":"ws_1","output_index":0,"sequence_number":6}"#; + let frame = normalize_sse_line(line).unwrap(); + assert_eq!(frame.event_type, SSEEventType::WebSearchCallCompleted); + assert!(matches!(frame.payload, EventPayload::Raw(_))); +} + +// --- Helpers and constants for integration tests --- + const CASSETTE_DIR: &str = concat!(env!("CARGO_MANIFEST_DIR"), "/tests/cassettes/events"); #[derive(Deserialize)] From e62b7c69a6980ab5f2f67afc4c7758f6d17070fa Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Tue, 9 Jun 2026 22:21:45 -0700 Subject: [PATCH 7/9] fix: use deserialize_from_str_opt from utils/common MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Per review feedback from @maralbahari — use the crate's own deserialization helper instead of calling serde_json::from_str directly. Signed-off-by: Ashwin Giridharan --- crates/agentic-core/src/events/normalize.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/agentic-core/src/events/normalize.rs b/crates/agentic-core/src/events/normalize.rs index e85b495..1216386 100644 --- a/crates/agentic-core/src/events/normalize.rs +++ b/crates/agentic-core/src/events/normalize.rs @@ -1,6 +1,7 @@ use serde_json::Value; use super::types::{EventFrame, EventPayload, SSEEventType}; +use crate::utils::common::deserialize_from_str_opt; /// Normalize a raw SSE data line into a typed [`EventFrame`]. /// @@ -14,7 +15,7 @@ pub fn normalize_sse_line(line: &str) -> Option { return None; } - let json: Value = serde_json::from_str(data_str).ok()?; + let json: Value = deserialize_from_str_opt(data_str)?; let event_type = json .get("type") From a016c6f849ab26f6ed07b5b1ad7d4d4d3279fd5a Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Tue, 9 Jun 2026 22:42:04 -0700 Subject: [PATCH 8/9] refactor: extract json_str/json_str_opt/json_u32 helpers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Reduces repetition in extraction functions — consistent pattern for accessing JSON fields with default or optional semantics. Signed-off-by: Ashwin Giridharan --- crates/agentic-core/src/events/normalize.rs | 70 ++++++++++++--------- 1 file changed, 39 insertions(+), 31 deletions(-) diff --git a/crates/agentic-core/src/events/normalize.rs b/crates/agentic-core/src/events/normalize.rs index 1216386..159a00f 100644 --- a/crates/agentic-core/src/events/normalize.rs +++ b/crates/agentic-core/src/events/normalize.rs @@ -90,15 +90,23 @@ fn extract_payload(event_type: SSEEventType, json: &Value) -> EventPayload { } } -fn index_u32(json: &Value, key: &str) -> u32 { +fn json_str(json: &Value, key: &str) -> String { + json[key].as_str().unwrap_or_default().to_string() +} + +fn json_str_opt(json: &Value, key: &str) -> Option { + json[key].as_str().map(ToString::to_string) +} + +fn json_u32(json: &Value, key: &str) -> u32 { u32::try_from(json[key].as_u64().unwrap_or(0)).unwrap_or(u32::MAX) } fn extract_response_payload(json: &Value) -> EventPayload { let response = &json["response"]; EventPayload::Response { - id: response["id"].as_str().unwrap_or_default().to_string(), - status: response["status"].as_str().unwrap_or_default().to_string(), + id: json_str(response, "id"), + status: json_str(response, "status"), usage: response.get("usage").filter(|v| !v.is_null()).cloned(), } } @@ -106,70 +114,70 @@ fn extract_response_payload(json: &Value) -> EventPayload { fn extract_output_item_added(json: &Value) -> EventPayload { let item = &json["item"]; EventPayload::OutputItemAdded { - item_id: item["id"].as_str().unwrap_or_default().to_string(), - item_type: item["type"].as_str().unwrap_or_default().to_string(), - output_index: index_u32(json, "output_index"), - name: item["name"].as_str().map(ToString::to_string), - call_id: item["call_id"].as_str().map(ToString::to_string), + item_id: json_str(item, "id"), + item_type: json_str(item, "type"), + output_index: json_u32(json, "output_index"), + name: json_str_opt(item, "name"), + call_id: json_str_opt(item, "call_id"), } } fn extract_output_item_done(json: &Value) -> EventPayload { let item = &json["item"]; EventPayload::OutputItemDone { - item_id: item["id"].as_str().unwrap_or_default().to_string(), - item_type: item["type"].as_str().unwrap_or_default().to_string(), - output_index: index_u32(json, "output_index"), + item_id: json_str(item, "id"), + item_type: json_str(item, "type"), + output_index: json_u32(json, "output_index"), item: item.clone(), } } fn extract_text_delta(json: &Value) -> EventPayload { EventPayload::TextDelta { - delta: json["delta"].as_str().unwrap_or_default().to_string(), - item_id: json["item_id"].as_str().unwrap_or_default().to_string(), - output_index: index_u32(json, "output_index"), - content_index: index_u32(json, "content_index"), + delta: json_str(json, "delta"), + item_id: json_str(json, "item_id"), + output_index: json_u32(json, "output_index"), + content_index: json_u32(json, "content_index"), } } fn extract_text_done(json: &Value) -> EventPayload { EventPayload::TextDone { - text: json["text"].as_str().unwrap_or_default().to_string(), - item_id: json["item_id"].as_str().unwrap_or_default().to_string(), - output_index: index_u32(json, "output_index"), + text: json_str(json, "text"), + item_id: json_str(json, "item_id"), + output_index: json_u32(json, "output_index"), } } fn extract_fn_call_args_delta(json: &Value) -> EventPayload { EventPayload::FunctionCallArgsDelta { - delta: json["delta"].as_str().unwrap_or_default().to_string(), - call_id: json["call_id"].as_str().map(ToString::to_string), - item_id: json["item_id"].as_str().unwrap_or_default().to_string(), - output_index: index_u32(json, "output_index"), + delta: json_str(json, "delta"), + call_id: json_str_opt(json, "call_id"), + item_id: json_str(json, "item_id"), + output_index: json_u32(json, "output_index"), } } fn extract_fn_call_args_done(json: &Value) -> EventPayload { EventPayload::FunctionCallArgsDone { - arguments: json["arguments"].as_str().unwrap_or_default().to_string(), - call_id: json["call_id"].as_str().map(ToString::to_string), - item_id: json["item_id"].as_str().unwrap_or_default().to_string(), - name: json["name"].as_str().unwrap_or_default().to_string(), - output_index: index_u32(json, "output_index"), + arguments: json_str(json, "arguments"), + call_id: json_str_opt(json, "call_id"), + item_id: json_str(json, "item_id"), + name: json_str(json, "name"), + output_index: json_u32(json, "output_index"), } } fn extract_reasoning_delta(json: &Value) -> EventPayload { EventPayload::ReasoningDelta { - delta: json["delta"].as_str().unwrap_or_default().to_string(), - item_id: json["item_id"].as_str().unwrap_or_default().to_string(), + delta: json_str(json, "delta"), + item_id: json_str(json, "item_id"), } } fn extract_reasoning_done(json: &Value) -> EventPayload { EventPayload::ReasoningDone { - text: json["text"].as_str().unwrap_or_default().to_string(), - item_id: json["item_id"].as_str().unwrap_or_default().to_string(), + text: json_str(json, "text"), + item_id: json_str(json, "item_id"), } } From 08b6c3f22171d24c01278701ff7b3deef2b743bc Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Wed, 10 Jun 2026 12:08:19 -0700 Subject: [PATCH 9/9] fix: alphabetize module declarations in lib.rs Signed-off-by: Ashwin Giridharan --- crates/agentic-core/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/agentic-core/src/lib.rs b/crates/agentic-core/src/lib.rs index ef09472..b89988e 100644 --- a/crates/agentic-core/src/lib.rs +++ b/crates/agentic-core/src/lib.rs @@ -1,7 +1,7 @@ pub mod config; pub mod error; -pub mod executor; pub mod events; +pub mod executor; pub mod proxy; pub mod readiness; pub mod storage;