From e4ccb001ee87f1457f760e2f2a8014b2ebc764d0 Mon Sep 17 00:00:00 2001 From: Eric Zhang Date: Fri, 22 May 2026 21:09:56 +0000 Subject: [PATCH 1/4] feat(gateway): Add Messages API to HTTP router Some backends like SGLang recently implement the `/v1/messages` API - https://github.com/sgl-project/sglang/pull/18630 This PR adds support for routing to the messages API over HTTP, similar to how `/v1/chat/completions` is handled currently via a `GenerationRequest` trait impl. Signed-off-by: Eric Zhang --- crates/protocols/src/messages.rs | 54 +++++++++++++++++++++++- model_gateway/src/routers/http/router.rs | 12 ++++++ 2 files changed, 65 insertions(+), 1 deletion(-) diff --git a/crates/protocols/src/messages.rs b/crates/protocols/src/messages.rs index 76adcf77e..4d132b5b1 100644 --- a/crates/protocols/src/messages.rs +++ b/crates/protocols/src/messages.rs @@ -9,7 +9,7 @@ use serde::{Deserialize, Serialize}; use serde_json::Value; use validator::Validate; -use crate::{skills::MessagesSkillRef, validated::Normalizable}; +use crate::{common::GenerationRequest, skills::MessagesSkillRef, validated::Normalizable}; // ============================================================================ // Request Types @@ -105,6 +105,58 @@ impl CreateMessageRequest { } } +impl GenerationRequest for CreateMessageRequest { + fn is_stream(&self) -> bool { + self.stream.unwrap_or(false) + } + + fn get_model(&self) -> Option<&str> { + Some(&self.model) + } + + fn extract_text_for_routing(&self) -> String { + let mut buffer = String::new(); + let mut has_content = false; + + let push = |s: &str, has_content: &mut bool, buffer: &mut String| { + if s.is_empty() { + return; + } + if *has_content { + buffer.push(' '); + } + buffer.push_str(s); + *has_content = true; + }; + + if let Some(system) = &self.system { + match system { + SystemContent::String(s) => push(s, &mut has_content, &mut buffer), + SystemContent::Blocks(blocks) => { + for block in blocks { + push(&block.text, &mut has_content, &mut buffer); + } + } + } + } + + for msg in &self.messages { + match &msg.content { + InputContent::String(s) => push(s, &mut has_content, &mut buffer), + InputContent::Blocks(blocks) => { + for block in blocks { + if let InputContentBlock::Text(text_block) = block { + push(&text_block.text, &mut has_content, &mut buffer); + } + } + } + } + } + + buffer + } +} + impl Tool { fn matches_tool_choice_name(&self, name: &str) -> bool { match self { diff --git a/model_gateway/src/routers/http/router.rs b/model_gateway/src/routers/http/router.rs index 85f76e2c1..7ac9dbe3c 100644 --- a/model_gateway/src/routers/http/router.rs +++ b/model_gateway/src/routers/http/router.rs @@ -15,6 +15,7 @@ use openai_protocol::{ completion::CompletionRequest, embedding::EmbeddingRequest, generate::GenerateRequest, + messages::CreateMessageRequest, rerank::{RerankRequest, RerankResponse, RerankResult}, responses::ResponsesRequest, transcription::TranscriptionRequest, @@ -1125,6 +1126,17 @@ impl RouterTrait for Router { .await } + async fn route_messages( + &self, + headers: Option<&HeaderMap>, + _tenant_meta: &TenantRequestMeta, + body: &CreateMessageRequest, + model_id: &str, + ) -> Response { + self.route_typed_request(headers, body, "/v1/messages", model_id) + .await + } + async fn route_completion( &self, headers: Option<&HeaderMap>, From e18412ea685be38e366e2d935308de5362bb74a6 Mon Sep 17 00:00:00 2001 From: Eric Zhang Date: Fri, 22 May 2026 21:19:59 +0000 Subject: [PATCH 2/4] test(gateway): cover Messages API HTTP proxy Adds integration tests for the new HTTP `/v1/messages` proxy: - `/v1/messages` mock handler in the shared MockWorker (non-streaming + SSE streaming, Anthropic event sequence). - End-to-end tests via `AppTestContext` for success, streaming, and upstream-error propagation. - Unit tests for the new `GenerationRequest` impl on `CreateMessageRequest` (is_stream, routing-text extraction for string content, text blocks, and image-only / no-text edge case). Co-Authored-By: Claude Opus 4.7 (1M context) Signed-off-by: Eric Zhang --- model_gateway/tests/api/messages_api_test.rs | 155 +++++++++++++++++++ model_gateway/tests/api/mod.rs | 1 + model_gateway/tests/common/mock_worker.rs | 116 ++++++++++++++ model_gateway/tests/messages_test.rs | 105 ++++++++++++- 4 files changed, 370 insertions(+), 7 deletions(-) create mode 100644 model_gateway/tests/api/messages_api_test.rs diff --git a/model_gateway/tests/api/messages_api_test.rs b/model_gateway/tests/api/messages_api_test.rs new file mode 100644 index 000000000..0cef07d12 --- /dev/null +++ b/model_gateway/tests/api/messages_api_test.rs @@ -0,0 +1,155 @@ +//! Integration tests for the Anthropic Messages API (`/v1/messages`) +//! against the HTTP backend, which proxies to sglang's native +//! `/v1/messages` endpoint. + +use axum::{ + body::Body, + extract::Request, + http::{header::CONTENT_TYPE, StatusCode}, +}; +use serde_json::json; +use tower::ServiceExt; + +use crate::common::{ + mock_worker::{HealthStatus, MockWorkerConfig, WorkerType}, + AppTestContext, +}; + +#[tokio::test] +async fn test_v1_messages_proxy_success() { + let ctx = AppTestContext::new(vec![MockWorkerConfig { + port: 18301, + worker_type: WorkerType::Regular, + health_status: HealthStatus::Healthy, + response_delay_ms: 0, + fail_rate: 0.0, + }]) + .await; + + let app = ctx.create_app(); + + let payload = json!({ + "model": "mock-model", + "max_tokens": 64, + "messages": [ + {"role": "user", "content": "Hello, Claude!"} + ] + }); + + let req = Request::builder() + .method("POST") + .uri("/v1/messages") + .header(CONTENT_TYPE, "application/json") + .body(Body::from(serde_json::to_string(&payload).unwrap())) + .unwrap(); + + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + + let body = axum::body::to_bytes(resp.into_body(), usize::MAX) + .await + .unwrap(); + let body_json: serde_json::Value = serde_json::from_slice(&body).unwrap(); + + assert_eq!(body_json["type"], "message"); + assert_eq!(body_json["role"], "assistant"); + assert_eq!(body_json["model"], "mock-model"); + assert_eq!(body_json["stop_reason"], "end_turn"); + let content = body_json["content"].as_array().expect("content array"); + assert_eq!(content.len(), 1); + assert_eq!(content[0]["type"], "text"); + assert!(body_json["usage"]["input_tokens"].is_number()); + + ctx.shutdown().await; +} + +#[tokio::test] +async fn test_v1_messages_proxy_streaming() { + let ctx = AppTestContext::new(vec![MockWorkerConfig { + port: 18302, + worker_type: WorkerType::Regular, + health_status: HealthStatus::Healthy, + response_delay_ms: 0, + fail_rate: 0.0, + }]) + .await; + + let app = ctx.create_app(); + + let payload = json!({ + "model": "mock-model", + "max_tokens": 64, + "stream": true, + "messages": [ + {"role": "user", "content": "Stream me a haiku"} + ] + }); + + let req = Request::builder() + .method("POST") + .uri("/v1/messages") + .header(CONTENT_TYPE, "application/json") + .body(Body::from(serde_json::to_string(&payload).unwrap())) + .unwrap(); + + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + let content_type = resp + .headers() + .get(CONTENT_TYPE) + .and_then(|v| v.to_str().ok()) + .unwrap_or(""); + assert!( + content_type.contains("text/event-stream"), + "expected SSE content-type, got {content_type:?}" + ); + + let body = axum::body::to_bytes(resp.into_body(), usize::MAX) + .await + .unwrap(); + let text = std::str::from_utf8(&body).expect("utf8"); + + // Wire format: `event: \ndata: \n\n` + let event_types: Vec<&str> = text + .lines() + .filter_map(|l| l.strip_prefix("event: ")) + .collect(); + + assert_eq!(event_types.first().copied(), Some("message_start")); + assert_eq!(event_types.last().copied(), Some("message_stop")); + assert!(event_types.contains(&"content_block_delta")); + + ctx.shutdown().await; +} + +#[tokio::test] +async fn test_v1_messages_proxy_propagates_upstream_error() { + let ctx = AppTestContext::new(vec![MockWorkerConfig { + port: 18303, + worker_type: WorkerType::Regular, + health_status: HealthStatus::Healthy, + response_delay_ms: 0, + fail_rate: 1.0, // always fail + }]) + .await; + + let app = ctx.create_app(); + + let payload = json!({ + "model": "mock-model", + "max_tokens": 16, + "messages": [{"role": "user", "content": "fail please"}] + }); + + let req = Request::builder() + .method("POST") + .uri("/v1/messages") + .header(CONTENT_TYPE, "application/json") + .body(Body::from(serde_json::to_string(&payload).unwrap())) + .unwrap(); + + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::INTERNAL_SERVER_ERROR); + + ctx.shutdown().await; +} diff --git a/model_gateway/tests/api/mod.rs b/model_gateway/tests/api/mod.rs index 48df6d72a..71997e12d 100644 --- a/model_gateway/tests/api/mod.rs +++ b/model_gateway/tests/api/mod.rs @@ -1,6 +1,7 @@ //! API endpoint integration tests mod api_endpoints_test; +mod messages_api_test; mod parser_endpoints_test; mod request_formats_test; mod responses_api_test; diff --git a/model_gateway/tests/common/mock_worker.rs b/model_gateway/tests/common/mock_worker.rs index 44456ffc8..d130ead53 100755 --- a/model_gateway/tests/common/mock_worker.rs +++ b/model_gateway/tests/common/mock_worker.rs @@ -93,6 +93,7 @@ impl MockWorker { .route("/get_model_info", get(model_info_handler)) .route("/generate", post(generate_handler)) .route("/v1/chat/completions", post(chat_completions_handler)) + .route("/v1/messages", post(messages_handler)) .route("/v1/completions", post(completions_handler)) .route("/v1/rerank", post(rerank_handler)) .route("/v1/responses", post(responses_handler)) @@ -501,6 +502,121 @@ async fn chat_completions_handler( } } +#[expect( + clippy::unwrap_used, + reason = "test helper - panicking on failure is intentional" +)] +async fn messages_handler( + State(config): State>>, + Json(payload): Json, +) -> Response { + let config = config.read().await; + + if should_fail(&config) { + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({ + "type": "error", + "error": { + "type": "api_error", + "message": "Random failure for testing" + } + })), + ) + .into_response(); + } + + if config.response_delay_ms > 0 { + tokio::time::sleep(tokio::time::Duration::from_millis(config.response_delay_ms)).await; + } + + let is_stream = payload + .get("stream") + .and_then(|v| v.as_bool()) + .unwrap_or(false); + + let model = payload + .get("model") + .and_then(|v| v.as_str()) + .unwrap_or("mock-model") + .to_string(); + let message_id = format!("msg_{}", Uuid::now_v7()); + + if is_stream { + let message_id_for_stream = message_id.clone(); + let model_for_stream = model.clone(); + let events = vec![ + ( + "message_start", + json!({ + "type": "message_start", + "message": { + "id": message_id_for_stream, + "type": "message", + "role": "assistant", + "content": [], + "model": model_for_stream, + "stop_reason": null, + "stop_sequence": null, + "usage": {"input_tokens": 10, "output_tokens": 0} + } + }), + ), + ( + "content_block_start", + json!({ + "type": "content_block_start", + "index": 0, + "content_block": {"type": "text", "text": ""} + }), + ), + ( + "content_block_delta", + json!({ + "type": "content_block_delta", + "index": 0, + "delta": {"type": "text_delta", "text": "Mock streamed response."} + }), + ), + ( + "content_block_stop", + json!({"type": "content_block_stop", "index": 0}), + ), + ( + "message_delta", + json!({ + "type": "message_delta", + "delta": {"stop_reason": "end_turn", "stop_sequence": null}, + "usage": {"output_tokens": 5} + }), + ), + ("message_stop", json!({"type": "message_stop"})), + ]; + + let stream = stream::iter(events.into_iter().map(|(event, data)| { + Ok::<_, Infallible>(Event::default().event(event).data(data.to_string())) + })); + + Sse::new(stream) + .keep_alive(KeepAlive::default()) + .into_response() + } else { + Json(json!({ + "id": message_id, + "type": "message", + "role": "assistant", + "content": [ + {"type": "text", "text": "This is a mock messages response."} + ], + "model": model, + "stop_reason": "end_turn", + "stop_sequence": null, + "usage": {"input_tokens": 10, "output_tokens": 5} + })) + .into_response() + } +} + #[expect( clippy::unwrap_used, reason = "test helper - panicking on failure is intentional" diff --git a/model_gateway/tests/messages_test.rs b/model_gateway/tests/messages_test.rs index 05eb56832..65921dfef 100644 --- a/model_gateway/tests/messages_test.rs +++ b/model_gateway/tests/messages_test.rs @@ -1,12 +1,14 @@ //! Basic tests for Messages API //! -//! These tests verify: -//! - /v1/messages endpoint exists -//! - Returns 501 Not Implemented (expected for PR #1) -//! - Request deserialization works -//! - No breaking changes to existing functionality - -use openai_protocol::messages::{CreateMessageRequest, InputContent, Role, SystemContent}; +//! These tests verify request deserialization and the `GenerationRequest` +//! impl (used for routing/streaming on the HTTP backend). End-to-end +//! routing through the HTTP proxy is exercised by +//! `tests/api/messages_api_test.rs`. + +use openai_protocol::{ + common::GenerationRequest, + messages::{CreateMessageRequest, InputContent, Role, SystemContent}, +}; use serde_json::json; #[test] @@ -137,6 +139,95 @@ fn test_create_message_request_with_temperature() { assert_eq!(request.temperature, Some(0.7)); } +#[test] +fn test_generation_request_impl_is_stream_and_model() { + let req: CreateMessageRequest = serde_json::from_value(json!({ + "model": "claude-sonnet-4-5-20250929", + "max_tokens": 16, + "stream": true, + "messages": [{"role": "user", "content": "hi"}] + })) + .unwrap(); + + assert!(req.is_stream()); + assert_eq!(req.get_model(), Some("claude-sonnet-4-5-20250929")); + + let no_stream: CreateMessageRequest = serde_json::from_value(json!({ + "model": "claude-sonnet-4-5-20250929", + "max_tokens": 16, + "messages": [{"role": "user", "content": "hi"}] + })) + .unwrap(); + assert!(!no_stream.is_stream()); +} + +#[test] +fn test_extract_text_for_routing_string_content() { + let req: CreateMessageRequest = serde_json::from_value(json!({ + "model": "claude-sonnet-4-5-20250929", + "max_tokens": 16, + "system": "You are helpful.", + "messages": [ + {"role": "user", "content": "Hello"}, + {"role": "assistant", "content": "Hi there"}, + {"role": "user", "content": "How are you?"} + ] + })) + .unwrap(); + + let text = req.extract_text_for_routing(); + assert_eq!(text, "You are helpful. Hello Hi there How are you?"); +} + +#[test] +fn test_extract_text_for_routing_text_blocks() { + let req: CreateMessageRequest = serde_json::from_value(json!({ + "model": "claude-sonnet-4-5-20250929", + "max_tokens": 16, + "system": [ + {"type": "text", "text": "Block system 1"}, + {"type": "text", "text": "Block system 2"} + ], + "messages": [ + { + "role": "user", + "content": [ + {"type": "text", "text": "first chunk"}, + {"type": "image", "source": {"type": "url", "url": "https://example.com/x.png"}}, + {"type": "text", "text": "second chunk"} + ] + } + ] + })) + .unwrap(); + + let text = req.extract_text_for_routing(); + // Image block is skipped; text blocks are concatenated with single-space separators. + assert_eq!( + text, + "Block system 1 Block system 2 first chunk second chunk" + ); +} + +#[test] +fn test_extract_text_for_routing_empty_messages_with_no_text() { + let req: CreateMessageRequest = serde_json::from_value(json!({ + "model": "claude-sonnet-4-5-20250929", + "max_tokens": 16, + "messages": [ + { + "role": "user", + "content": [ + {"type": "image", "source": {"type": "url", "url": "https://example.com/x.png"}} + ] + } + ] + })) + .unwrap(); + + assert_eq!(req.extract_text_for_routing(), ""); +} + #[test] fn test_create_message_request_with_thinking() { let json = json!({ From 72d525ce93f06d15f81eca3240f06a5f47174553 Mon Sep 17 00:00:00 2001 From: Eric Zhang Date: Fri, 22 May 2026 20:42:11 -0400 Subject: [PATCH 3/4] Update mock_worker.rs --- model_gateway/tests/common/mock_worker.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/model_gateway/tests/common/mock_worker.rs b/model_gateway/tests/common/mock_worker.rs index d130ead53..da3adeba2 100755 --- a/model_gateway/tests/common/mock_worker.rs +++ b/model_gateway/tests/common/mock_worker.rs @@ -1231,10 +1231,6 @@ async fn flush_cache_handler(State(config): State>> .into_response() } -#[expect( - clippy::unwrap_used, - reason = "test helper - panicking on failure is intentional" -)] async fn v1_models_handler(State(config): State>>) -> Response { let config = config.read().await; From cf1f2c3cd288d464666f088e9d56875325989465 Mon Sep 17 00:00:00 2001 From: zhyncs <46627482+zhyncs@users.noreply.github.com> Date: Sat, 23 May 2026 23:24:02 +0000 Subject: [PATCH 4/4] fix messages api ci failures Signed-off-by: zhyncs <46627482+zhyncs@users.noreply.github.com> --- model_gateway/tests/common/mock_worker.rs | 8 ++++---- model_gateway/tests/messages_test.rs | 5 ++++- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/model_gateway/tests/common/mock_worker.rs b/model_gateway/tests/common/mock_worker.rs index da3adeba2..2c9c21efb 100755 --- a/model_gateway/tests/common/mock_worker.rs +++ b/model_gateway/tests/common/mock_worker.rs @@ -502,10 +502,6 @@ async fn chat_completions_handler( } } -#[expect( - clippy::unwrap_used, - reason = "test helper - panicking on failure is intentional" -)] async fn messages_handler( State(config): State>>, Json(payload): Json, @@ -1231,6 +1227,10 @@ async fn flush_cache_handler(State(config): State>> .into_response() } +#[expect( + clippy::unwrap_used, + reason = "test helper - panicking on failure is intentional" +)] async fn v1_models_handler(State(config): State>>) -> Response { let config = config.read().await; diff --git a/model_gateway/tests/messages_test.rs b/model_gateway/tests/messages_test.rs index 65921dfef..2ddef4df5 100644 --- a/model_gateway/tests/messages_test.rs +++ b/model_gateway/tests/messages_test.rs @@ -150,7 +150,10 @@ fn test_generation_request_impl_is_stream_and_model() { .unwrap(); assert!(req.is_stream()); - assert_eq!(req.get_model(), Some("claude-sonnet-4-5-20250929")); + assert_eq!( + GenerationRequest::get_model(&req), + Some("claude-sonnet-4-5-20250929") + ); let no_stream: CreateMessageRequest = serde_json::from_value(json!({ "model": "claude-sonnet-4-5-20250929",