Skip to content
Open
26 changes: 26 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/agentic-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ uuid = { version = "1", features = ["v7", "serde"] }
axum.workspace = true
criterion = { workspace = true }
serde_yaml = "0.9"
serde_yml = "0.0.12"
tokio = { workspace = true, features = ["full"] }

[[bench]]
Expand Down
5 changes: 5 additions & 0 deletions crates/agentic-core/src/events/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pub mod normalize;
pub mod types;

pub use normalize::normalize_sse_line;
pub use types::{EventFrame, EventPayload, SSEEventType};
183 changes: 183 additions & 0 deletions crates/agentic-core/src/events/normalize.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
use serde_json::Value;

use super::types::{EventFrame, EventPayload, SSEEventType};
use crate::utils::common::deserialize_from_str_opt;

/// Normalize a raw SSE data line into a typed [`EventFrame`].
///
/// Expects input in the form `data: {...}` (the `data: ` prefix is required).
/// Returns `None` for non-data lines, empty lines, and the `data: [DONE]`
/// sentinel.
#[must_use]
pub fn normalize_sse_line(line: &str) -> Option<EventFrame> {
let data_str = line.strip_prefix("data: ")?;
if data_str == "[DONE]" {
return None;
}

let json: Value = deserialize_from_str_opt(data_str)?;

let event_type = json
.get("type")
.and_then(Value::as_str)
.map_or(SSEEventType::Other, classify_event_type);

let sequence_number = json.get("sequence_number").and_then(Value::as_u64);

let payload = extract_payload(event_type, &json);

Some(EventFrame {
event_type,
payload,
sequence_number,
})
}

/// Map a wire-format event type string to our enum.
fn classify_event_type(type_str: &str) -> SSEEventType {
match type_str {
"response.created" => SSEEventType::ResponseCreated,
"response.in_progress" => SSEEventType::ResponseInProgress,
"response.completed" | "response.done" => SSEEventType::ResponseCompleted,
"response.failed" => SSEEventType::ResponseFailed,
"response.incomplete" => SSEEventType::ResponseIncomplete,
"response.output_item.added" => SSEEventType::OutputItemAdded,
"response.output_item.done" => SSEEventType::OutputItemDone,
"response.output_text.delta" => SSEEventType::OutputTextDelta,
"response.output_text.done" => SSEEventType::OutputTextDone,
"response.content_part.added" => SSEEventType::ContentPartAdded,
"response.content_part.done" => SSEEventType::ContentPartDone,
"response.function_call_arguments.delta" => SSEEventType::FunctionCallArgumentsDelta,
"response.function_call_arguments.done" => SSEEventType::FunctionCallArgumentsDone,
"response.reasoning_summary_text.delta" => SSEEventType::ReasoningSummaryTextDelta,
"response.reasoning_summary_text.done" => SSEEventType::ReasoningSummaryTextDone,
"response.file_search_call.searching" => SSEEventType::FileSearchCallSearching,
"response.file_search_call.completed" => SSEEventType::FileSearchCallCompleted,
"response.web_search_call.searching" => SSEEventType::WebSearchCallSearching,
"response.web_search_call.completed" => SSEEventType::WebSearchCallCompleted,
_ => SSEEventType::Other,
}
}

/// Extract a typed payload from the JSON body based on the classified event type.
fn extract_payload(event_type: SSEEventType, json: &Value) -> EventPayload {
match event_type {
SSEEventType::ResponseCreated
| SSEEventType::ResponseInProgress
| SSEEventType::ResponseCompleted
| SSEEventType::ResponseFailed
| SSEEventType::ResponseIncomplete => extract_response_payload(json),

SSEEventType::OutputItemAdded => extract_output_item_added(json),
SSEEventType::OutputItemDone => extract_output_item_done(json),

SSEEventType::OutputTextDelta => extract_text_delta(json),
SSEEventType::OutputTextDone => extract_text_done(json),

SSEEventType::FunctionCallArgumentsDelta => extract_fn_call_args_delta(json),
SSEEventType::FunctionCallArgumentsDone => extract_fn_call_args_done(json),

SSEEventType::ReasoningSummaryTextDelta => extract_reasoning_delta(json),
SSEEventType::ReasoningSummaryTextDone => extract_reasoning_done(json),

SSEEventType::ContentPartAdded
| SSEEventType::ContentPartDone
| SSEEventType::FileSearchCallSearching
| SSEEventType::FileSearchCallCompleted
| SSEEventType::WebSearchCallSearching
| SSEEventType::WebSearchCallCompleted
| SSEEventType::Other => EventPayload::Raw(json.clone()),
}
}

fn json_str(json: &Value, key: &str) -> String {
json[key].as_str().unwrap_or_default().to_string()
}

fn json_str_opt(json: &Value, key: &str) -> Option<String> {
json[key].as_str().map(ToString::to_string)
}

fn json_u32(json: &Value, key: &str) -> u32 {
u32::try_from(json[key].as_u64().unwrap_or(0)).unwrap_or(u32::MAX)
}

fn extract_response_payload(json: &Value) -> EventPayload {
let response = &json["response"];
EventPayload::Response {
id: json_str(response, "id"),
status: json_str(response, "status"),
usage: response.get("usage").filter(|v| !v.is_null()).cloned(),
}
}

fn extract_output_item_added(json: &Value) -> EventPayload {
let item = &json["item"];
EventPayload::OutputItemAdded {
item_id: json_str(item, "id"),
item_type: json_str(item, "type"),
output_index: json_u32(json, "output_index"),
name: json_str_opt(item, "name"),
call_id: json_str_opt(item, "call_id"),
}
}

fn extract_output_item_done(json: &Value) -> EventPayload {
let item = &json["item"];
EventPayload::OutputItemDone {
item_id: json_str(item, "id"),
item_type: json_str(item, "type"),
output_index: json_u32(json, "output_index"),
item: item.clone(),
}
}

fn extract_text_delta(json: &Value) -> EventPayload {
EventPayload::TextDelta {
delta: json_str(json, "delta"),
item_id: json_str(json, "item_id"),
output_index: json_u32(json, "output_index"),
content_index: json_u32(json, "content_index"),
}
}

fn extract_text_done(json: &Value) -> EventPayload {
EventPayload::TextDone {
text: json_str(json, "text"),
item_id: json_str(json, "item_id"),
output_index: json_u32(json, "output_index"),
}
}

fn extract_fn_call_args_delta(json: &Value) -> EventPayload {
EventPayload::FunctionCallArgsDelta {
delta: json_str(json, "delta"),
call_id: json_str_opt(json, "call_id"),
item_id: json_str(json, "item_id"),
output_index: json_u32(json, "output_index"),
}
}

fn extract_fn_call_args_done(json: &Value) -> EventPayload {
EventPayload::FunctionCallArgsDone {
arguments: json_str(json, "arguments"),
call_id: json_str_opt(json, "call_id"),
item_id: json_str(json, "item_id"),
name: json_str(json, "name"),
output_index: json_u32(json, "output_index"),
}
}

fn extract_reasoning_delta(json: &Value) -> EventPayload {
EventPayload::ReasoningDelta {
delta: json_str(json, "delta"),
item_id: json_str(json, "item_id"),
}
}

fn extract_reasoning_done(json: &Value) -> EventPayload {
EventPayload::ReasoningDone {
text: json_str(json, "text"),
item_id: json_str(json, "item_id"),
}
}
127 changes: 127 additions & 0 deletions crates/agentic-core/src/events/types.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
use serde_json::Value;

/// Classification of SSE event types from the Responses API.
///
/// Covers both the `OpenAI` and vLLM wire formats (e.g. `response.done` vs
/// `response.completed`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum SSEEventType {
// Response lifecycle
ResponseCreated,
ResponseInProgress,
ResponseCompleted,
ResponseFailed,
ResponseIncomplete,

// Output item lifecycle
OutputItemAdded,
OutputItemDone,

// Text content
OutputTextDelta,
OutputTextDone,
ContentPartAdded,
ContentPartDone,

// Function calls
FunctionCallArgumentsDelta,
FunctionCallArgumentsDone,

// Reasoning
ReasoningSummaryTextDelta,
ReasoningSummaryTextDone,

// Built-in tool calls
FileSearchCallSearching,
FileSearchCallCompleted,
WebSearchCallSearching,
WebSearchCallCompleted,

// Catch-all for unrecognized events
Other,
}

/// Typed payload extracted from an SSE event's JSON data.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum EventPayload {
/// `response.created` / `response.completed` / `response.failed` /
/// `response.incomplete` / `response.in_progress`
Response {
id: String,
status: String,
usage: Option<Value>,
},

/// `response.output_item.added`
OutputItemAdded {
item_id: String,
item_type: String,
output_index: u32,
name: Option<String>,
call_id: Option<String>,
},

/// `response.output_item.done`
OutputItemDone {
item_id: String,
item_type: String,
output_index: u32,
item: Value,
},

/// `response.output_text.delta`
TextDelta {
delta: String,
item_id: String,
output_index: u32,
content_index: u32,
},

/// `response.output_text.done`
TextDone {
text: String,
item_id: String,
output_index: u32,
},

/// `response.function_call_arguments.delta`
FunctionCallArgsDelta {
delta: String,
call_id: Option<String>,
item_id: String,
output_index: u32,
},

/// `response.function_call_arguments.done`
FunctionCallArgsDone {
arguments: String,
call_id: Option<String>,
item_id: String,
name: String,
output_index: u32,
},

/// `response.reasoning_summary_text.delta`
ReasoningDelta { delta: String, item_id: String },

/// `response.reasoning_summary_text.done`
ReasoningDone { text: String, item_id: String },

/// Events we classify but don't deeply parse yet.
Raw(Value),

/// No meaningful payload (e.g. unparseable content).
None,
}

/// A normalized SSE event frame — the output of [`normalize_sse_line`].
///
/// [`normalize_sse_line`]: crate::events::normalize::normalize_sse_line
#[derive(Debug, Clone)]
pub struct EventFrame {
pub event_type: SSEEventType,
pub payload: EventPayload,
pub sequence_number: Option<u64>,
}
1 change: 1 addition & 0 deletions crates/agentic-core/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub mod config;
pub mod error;
pub mod events;
pub mod executor;
pub mod proxy;
pub mod readiness;
Expand Down
Loading