feat: agentic-core conversation/responses hydration (ADR-03)#46
Conversation
Signed-off-by: maral <maralbahari.98@gmail.com>
Signed-off-by: maral <maralbahari.98@gmail.com>
Signed-off-by: maral <maralbahari.98@gmail.com>
Signed-off-by: maral <maralbahari.98@gmail.com>
Signed-off-by: maral <maralbahari.98@gmail.com>
Signed-off-by: maral <maralbahari.98@gmail.com>
Signed-off-by: maral <maralbahari.98@gmail.com>
Signed-off-by: maral <maralbahari.98@gmail.com>
Signed-off-by: maral <maralbahari.98@gmail.com>
Signed-off-by: maral <maralbahari.98@gmail.com>
Signed-off-by: maral <maralbahari.98@gmail.com>
Add executor module: rehydration, LLM inference, SSE accumulation, and persistence for both conversation and response stateful flows. Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com> Signed-off-by: maral <maralbahari.98@gmail.com>
Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com> Signed-off-by: maral <maralbahari.98@gmail.com>
Signed-off-by: maral <maralbahari.98@gmail.com>
Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com> Signed-off-by: maral <maralbahari.98@gmail.com>
Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com> Signed-off-by: maral <maralbahari.98@gmail.com>
Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com> Signed-off-by: maral <maralbahari.98@gmail.com>
ashwing
left a comment
There was a problem hiding this comment.
Read through the core source. Structure makes sense: RequestContext for per-turn stuff, ExecutionContext for runtime deps, handlers wrapping store ops. The spawn_blocking trick for JSON parsing on the streaming path is a good call.
Few things I noticed thinking about how function_calls will plug in on top of this:
The streaming path in run_stream doesn't actually stream to the client — it consumes the full SSE response via from_stream(), then yields one big payload.as_responses_chunk() at the end. Works fine for text-only, but clients setting stream=true expect incremental response.output_text.delta events as they arrive. ADR-01 §3 also calls this out explicitly ("SSE stream to the client is interleaved with the tool loop — events go out in real time, not buffered until done"). I'll need to tee the stream for the tool dispatch layer anyway — forward to client while accumulating for tool-call detection. Not blocking this PR on that, just flagging it.
execute() is one pass right now (rehydrate → infer → persist). For function_calls we'll need it to loop: detect tool calls → dispatch → re-enter inference. I'm thinking a LoopDecision enum (Continue/Done/Incomplete) driving re-entry. Would you rather that wrap execute() from the outside, or should we refactor execute() itself to become the loop?
| } | ||
|
|
||
| fn run_stream(ctx: RequestContext, exec_ctx: Arc<ExecutionContext>) -> BoxStream { | ||
| let url = exec_ctx.responses_url(); |
There was a problem hiding this comment.
This accumulates everything before yielding to the caller, so "streaming" here means streaming from upstream but not to the client. Intentional for now? Asking because for tool dispatch I'll need to tap the stream mid-flight — forward deltas to the client while watching for function_call items completing.
There was a problem hiding this comment.
@ashwing yes we need to tap the stream and add in event normalizer to catch reasoning delta and the rest of the SSE types. so initially before we move to rust we were planning to use PydanticAI to handle all the event normalization for us in https://github.com/vllm-project/agentic-api/pull/21/changes#diff-38d6d6323f9401ad47e9230c6d3fc779e2530c09a502cc55d32acde0277f7d89R7
now we need to implement them on rust we can draw inspiration from PydanticAI design and handle many of those objects natively in rust. as you mentioned in the other comment the SSE event types would grow.
we need to design the events and handling them during streaming smartly so that it's easy to maintain while the streaming loop wouldn't regress. so performance is key important point. we need to include this to your proposal implementation to consider the SSE streaming line and normalizing the events.
this is one of the important part I think it can be in a separate module in agentic-core then import it into executor. so the SSE enum in Types could be removed and the SSE handling normalizing the events into separate core module to avoid the accumulator from getting bloated.
what do you think?
There was a problem hiding this comment.
+1 on the separate module approach. Here's the shape I'm thinking:
crates/agentic-core/src/
events/
mod.rs // pub mod normalize; pub mod types;
types.rs // SSEEventType (expanded to 28+ variants) + typed EventPayload enum
normalize.rs // normalize_sse_line(&str) -> EventFrame { event_type, payload }
The normalizer takes a raw data: {...} line and produces a typed EventFrame. The accumulator then pattern-matches on EventFrame instead of doing inline JSON parsing — keeps the streaming loop tight and makes adding new event types a one-line match arm.
I'll look at PydanticAI's StreamedResponse._process_event() for the dispatch table shape — that's basically what we're porting to Rust with zero-copy where possible.
This module has no dependency on the executor, so I can open a PR against main this week while #46 is still in review. Then once #46 lands, the accumulator switches to consuming EventFrame as a follow-up.
I'll start with the function_call event types (response.function_call_arguments.delta, response.output_item.done, response.function_call_arguments.done) since those are the minimum for tool dispatch to work, plus reasoning deltas. Sound good?
| /// | ||
| /// Used by [`run_blocking`] so it can pass the result to [`ResponseAccumulator::from_json`]. | ||
| async fn fetch_response_json( | ||
| upstream_json: String, |
There was a problem hiding this comment.
These two (fetch_response_json + send_inference_request) do the same error mapping (timeout→504, connect fail→502, non-2xx body read). Could share a helper that builds+sends+maps errors, with the callers just differing on .text().await vs .bytes_stream(). Minor — just noticed the duplication.
There was a problem hiding this comment.
thanks. I pushed a commit to refactor.
| /// | ||
| /// Non-`data:` lines, `[DONE]`, and malformed JSON are silently skipped. | ||
| fn process_sse_line(&mut self, line: &str) { | ||
| let Some(data_str) = line.strip_prefix("data: ") else { |
There was a problem hiding this comment.
Right now Other silently drops everything that isn't text message events. For function_calls, response.output_item.done is where we detect a completed tool call in the output. I'll extend this when building dispatch — just noting where the hook goes.
| /// Owns the storage handlers, HTTP client, and LLM endpoint configuration. | ||
| #[derive(Debug)] | ||
| pub struct ExecutionContext { | ||
| pub conv_handler: ConversationHandler, |
There was a problem hiding this comment.
When tool dispatch lands we'll need MCP clients, web search providers, etc. accessible from context. Would you rather grow this struct with optional fields, or pass a separate ToolContext into dispatch_tools()? I'd lean toward the latter to keep this focused on the inference flow.
There was a problem hiding this comment.
I think we should keep context as separate as possible so the tools would have their own ToolContext and then in agent loop we resolve the context. I dont have the full picture in mind now. but to keep modules in core small with their own context or config then later in agentic loop we can handle orchestration of each component.
| /// response generation process. | ||
| #[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] | ||
| pub enum SSEEventType { | ||
| /// Response object created; contains initial response metadata. |
There was a problem hiding this comment.
The OpenAI Responses API has 28+ event types. Once function_calls land we'll need at minimum response.function_call_arguments.delta, response.output_item.done, and a few more. Current setup with Other as fallback is fine — just expect this enum to grow.
…dation Reframes the design doc as a hybrid reference: - Acknowledges PR vllm-project#46 (maralbahari) as the base executor loop - Defines clear ownership boundaries (base loop vs tool dispatch) - Organizes into 4 implementation phases, each = one PR - Phase 1 (SSE events) independent of PR vllm-project#46 - Phases 2-4 build on top of PR vllm-project#46 Removes speculative API surface (AgenticState, AgenticConfig, full trait definitions) in favor of concrete code snippets matching actual implementation targets. Keeps just enough detail to execute follow-up PRs without over-specifying. Signed-off-by: Ashwin Giridharan <girida@amazon.com>
- Phase 1 correctly depends on PR vllm-project#46 (accumulator.rs lives there) - call_inference is sync fn returning lazy stream, not async - persist_response takes explicit handler params (noted) - Native async traits instead of #[async_trait] (Rust 1.85) - Removed undefined ContextSize type, use &str - Phase 2 explicitly non-streaming (streaming gated on Phase 3) - Removed max_iterations redundancy from dispatch_tools params - ADR-01 reference reworded as paraphrase not quote Signed-off-by: Ashwin Giridharan <girida@amazon.com>
…ame entry Signed-off-by: maral <maralbahari.98@gmail.com>
Signed-off-by: maral <maralbahari.98@gmail.com>
yes for streaming we need to add the interleave streaming now only works on text-only. the |
Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com> Signed-off-by: maral <maralbahari.98@gmail.com>
agentic-core implement agentic loop executor (ADR-03)agentic-core conversation/responses hydration (ADR-03)
Phase 1 now targets a separate `events/` module in agentic-core per @maralbahari's feedback on PR vllm-project#46 — avoids bloating the accumulator and has no PR vllm-project#46 dependency (lands on main directly). Also adds OGX compatibility note to Phase 4 traits and updates the Praxis filter mapping with the event normalizer step. Signed-off-by: Ashwin Giridharan <girida@amazon.com>
agentic-core conversation/responses hydration (ADR-03)agentic-core conversation/responses hydration (ADR-03)
Signed-off-by: maral <maralbahari.98@gmail.com>
|
Note on
Per ADR-03, each of these public functions will be homed in its proper module once the in-flight features are integrated:
These are plain Rust public functions with no gateway-specific API. Once moved, they will be re-exported through |
## Summary Design reference for the `agentic-core` public API extensions. This doc defines what we're building on top of PR #46's base executor loop. **Ownership split:** - @maralbahari — base loop: `execute()`, `rehydrate_conversation()`, `call_inference()`, `persist_response()` (PR #46) - @ashwing — tool dispatch, loop control, streaming tee, executor traits (this doc + follow-up PRs) **Implementation phases (each = one PR):** | Phase | Scope | Depends On | |-------|-------|------------| | 1 | SSEEventType expansion + accumulator FunctionCall detection | nothing (lands on main) | | 2 | `LoopDecision` + `dispatch_tools()` + `execute_loop()` | PR #46 | | 3 | Streaming tee (forward to client + accumulate) | PR #46 | | 4 | Tool executor traits + mock impls (MCP, web_search, vector_store) | Phase 2 | **What changed in this update:** - Removed all code stubs (per @maralbahari's feedback — impl goes in follow-up PRs) - Reframed around PR #46 as the foundation - Reduced from 626 lines to ~230 — just enough to execute, not over-specify - Added Praxis filter mapping table with ownership per step ## Test Plan Design doc only — no code changes. Verification via follow-up implementation PRs with integration tests. --------- Signed-off-by: Ashwin Giridharan <girida@amazon.com>
franciscojavierarceo
left a comment
There was a problem hiding this comment.
hey this looks pretty good, some small adjustments and then i think we're good to go.
| new_items.extend(ctx.new_input_items.into_iter().map(InOutItem::Input)); | ||
| new_items.extend(output_items.into_iter().map(InOutItem::Output)); | ||
|
|
||
| self.store |
There was a problem hiding this comment.
Can we add one more ADR-02 checkpoint test around this path? For non-conversation Responses continuations, each stored response should be a continuation checkpoint: history_item_ids should represent the full ordered model-visible history at that point, not just this turn's new input/output items. Otherwise a 3-turn previous_response_id chain can rehydrate only turn 2 when turn 3 is sent. A good regression test would run three response turns and assert the third upstream request includes turn 1 + turn 2 + the new input.
| conversation_id: None, | ||
| }; | ||
|
|
||
| if !ctx.original_request.store { |
There was a problem hiding this comment.
Can we add coverage for store=false with existing state? store should control whether this new response is persisted, not whether existing previous_response_id / conversation_id context is hydrated for inference. With this early return, store=false + previous_response_id only validates the prior response exists and then forwards just the new input upstream. I'd like tests for store=false + previous_response_id and store=false + conversation_id that assert the upstream request is hydrated but no new state is persisted.
| return Ok(ctx); | ||
| } | ||
|
|
||
| if ctx.original_request.conversation_id.is_some() { |
There was a problem hiding this comment.
Please add an explicit validation test for requests that provide both conversation_id and previous_response_id. Right now conversation hydration silently wins, but the response later still carries the original previous_response_id, which makes it look like that branch was used. We should reject the ambiguous combination before inference.
| let handle = tokio::spawn(async move { | ||
| let app = Router::new() | ||
| .route( | ||
| "/v1/responses", |
There was a problem hiding this comment.
The mock should probably capture/assert the incoming /v1/responses request body for the stateful tests. Right now it ignores _body and just dequeues the next cassette response, so tests can pass even if rehydration sends the wrong history upstream. The new tests for ADR-02 checkpoints, store=false hydration, and ambiguous ID validation will need this to be meaningful.
Introduces `events/` module in agentic-core with: - SSEEventType enum (20 variants covering all Responses API events) - EventPayload enum (typed extraction per event type) - EventFrame struct as the normalized output - normalize_sse_line() pure function: raw SSE data line → typed frame Handles both vLLM (response.done) and OpenAI (response.completed) wire formats. No dependency on the executor module — lands on main independently of PR vllm-project#46. Signed-off-by: Ashwin Giridharan <girida@amazon.com>
Move inline unit tests from normalize.rs to tests/event_normalizer_test.rs. All tests use the public API only (normalize_sse_line) so they don't need module-private access. Matches the repo convention established by PR vllm-project#46's cassette tests. Signed-off-by: Ashwin Giridharan <girida@amazon.com>
Summary
Implements
agentic-coreexecutor module as specified in ADR-03 — Layered Crate Architecture.Each step of the agentic loop is exposed as a composable public function. usable standalone via
execute()for now only handles text-only messages.Public step functions:
create_conversation(ctx)rehydrate_conversation(request, ctx)RequestContext— load history, resolve toolscall_inference(json, url, client, auth)persist_response(payload, ctx, ch, rh)execute(request, ctx)Either<ResponsePayload, BoxStream>Note on
engine.rs: temporary consolidationrehydrate_conversation,call_inference, andpersist_responseare currently co-located inengine.rsas a deliberate short-term decision to avoid merge conflicts while parallel work lands.Per ADR-03, each of these public functions will be homed in its proper module once the in-flight features are integrated:
rehydrate_conversationconversation.rscall_inferenceinference.rspersist_responsestore.rsThese are plain Rust public functions with no gateway-specific API. Once moved, they will be re-exported through
lib.rsat the same paths (agentic_core::rehydrate_conversation, etc.) so callers remain unaffected.The tidy-up will follow once this PR and its dependents are merged.
Key design decisions matching ADR-03:
execute()is the convenience entry point; each sub-function is independently callable (D1, D2)run_blocking(from_json) andrun_stream(from_streamvia SSE accumulator withspawn_blockingfor CPU-bound JSON parsing)ConversationHandlerandResponseHandlerown all store operations including rehydration.ExecutionContextholds handlers + HTTP client + LLM base URL;conversations_url()andresponses_url()as convenience methodsSSE accumulator (
ResponseAccumulator):from_jsonfor non-streaming path;from_stream(channel +spawn_blocking) for streaming pathresponse.done(vLLM) andresponse.completed(OpenAI) terminal eventsfinalize_current_message()deduplicates text-delta assemblyTest Plan
Unit tests (85 passing):
ResponseAccumulator: delta accumulation, text assignment, usage extraction, status transitionsExecutorError: display formatting, source chaining viathiserrorConversationHandler/ResponseHandler: all methods error correctly on disabled storeIntegration tests (10 passing) — cassette-based, no live model:
stateful_responses_integration(5 tests):previous_response_idchaining, non-streaming and streamingstore=falseresponse rejected asprevious_response_idstateful_conversation_integration(5 tests):conversation_idnon-streaming and streamingprevious_response_id(mixed conversation + response chain)All 113 tests pass. Zero clippy warnings.
Running Tests
Running Benchmarks
Benchmark groups:
execute/blocking/turns Nexecute/streaming/turns Nrehydrate_only/prev_response_depth NDB is cleared between groups to prevent cross-contamination.
Benchmark Results
execute/blockingandexecute/streaming- per-turn cost at each chain depthrehydrate_only- DB read step, no LLM callAnalysis
Per-turn cost is O(1) with respect to chain depth. After the first turn, every subsequent turn costs a constant ~2.8 ms regardless of how many prior turns exist. The prior benchmark showing linear growth to 12 ms at depth 10 was a measurement bug: the seed time was included inside the timed routine. That is now fixed.
Blocking and streaming are within 10% of each other at every depth. SSE accumulation via
spawn_blockingadds no meaningful overhead.Rehydration is flat (~250 us, isolated).
rehydrate_from_responsefetches only the immediate prior response item list via a single indexed query. The ~1.2 ms overhead visible in the full execute benchmarks (vs ~250 us isolated) reflects the DB write (persist) that also occurs each turn.