diff --git a/examples/front_ends/responses_api_endpoint/README.md b/examples/front_ends/responses_api_endpoint/README.md
new file mode 100644
index 0000000000..25f3d60dae
--- /dev/null
+++ b/examples/front_ends/responses_api_endpoint/README.md
@@ -0,0 +1,208 @@
+
+
+# OpenAI Responses API Endpoint
+
+**Complexity:** 🟢 Beginner
+
+This example demonstrates how to configure a NeMo Agent toolkit FastAPI frontend to accept requests in the [OpenAI Responses API format](https://platform.openai.com/docs/api-reference/responses).
+
+## Overview
+
+The OpenAI Responses API uses a different request format than the Chat Completions API:
+
+| Feature | Chat Completions API | Responses API |
+|---------|---------------------|---------------|
+| Input field | `messages` (array) | `input` (string or array) |
+| System prompt | In messages array | `instructions` field |
+| Response object | `chat.completion` | `response` |
+| Streaming events | `chat.completion.chunk` | `response.created`, `response.output_text.delta`, etc. |
+
+This example configures the `/v1/responses` endpoint to accept the Responses API format, while the standard `/generate` and `/chat` endpoints continue using the Chat Completions format.
+
+> **⚠️ Important**: The Responses API format is provided for pass-through compatibility with managed services that support stateful backends (such as OpenAI and Azure OpenAI). NeMo Agent toolkit workflows do not inherently support stateful backends. Features like `previous_response_id` will be accepted but ignored.
+
+## Prerequisites
+
+1. **Install LangChain integration** (required for the `tool_calling_agent` workflow):
+
+```bash
+uv pip install -e '.[langchain]'
+```
+
+2. **Set up the NVIDIA API key**:
+
+```bash
+export NVIDIA_API_KEY=
+```
+
+## Start the Server
+
+```bash
+nat serve --config_file examples/front_ends/responses_api_endpoint/configs/config.yml
+```
+
+The server will start on port 8088 with the following endpoints:
+
+| Endpoint | Format | Description |
+|----------|--------|-------------|
+| `/generate` | NAT default | Standard workflow endpoint |
+| `/chat` | Chat Completions | OpenAI Chat Completions format |
+| `/chat/stream` | Chat Completions | Streaming Chat Completions |
+| `/v1/responses` | Responses API | OpenAI Responses API format |
+
+## Test with curl
+
+### Responses API Format (Non-Streaming)
+
+```bash
+curl -X POST http://localhost:8088/v1/responses \
+  -H 'Content-Type: application/json' \
+  -d '{
+    "model": "gpt-4o-mini",
+    "input": "What time is it?"
+  }'
+```
+
+**Expected Response:**
+
+```json
+{
+  "id": "resp_abc123...",
+  "object": "response",
+  "status": "completed",
+  "model": "gpt-4o-mini",
+  "output": [
+    {
+      "type": "message",
+      "role": "assistant",
+      "content": [
+        {
+          "type": "output_text",
+          "text": "The current time is..."
+        }
+      ]
+    }
+  ]
+}
+```
+
+### Responses API Format (Streaming)
+
+```bash
+curl -X POST http://localhost:8088/v1/responses \
+  -H 'Content-Type: application/json' \
+  -d '{
+    "model": "gpt-4o-mini",
+    "input": "What time is it?",
+    "stream": true
+  }'
+```
+
+**Expected SSE Events:**
+
+```
+event: response.created
+data: {"type": "response.created", "response": {"id": "resp_...", "status": "in_progress"}}
+
+event: response.output_item.added
+data: {"type": "response.output_item.added", ...}
+
+event: response.output_text.delta
+data: {"type": "response.output_text.delta", "delta": "The current"}
+
+event: response.output_text.delta
+data: {"type": "response.output_text.delta", "delta": " time is..."}
+
+event: response.done
+data: {"type": "response.done", "response": {"status": "completed", ...}}
+```
+
+### With System Instructions
+
+```bash
+curl -X POST http://localhost:8088/v1/responses \
+  -H 'Content-Type: application/json' \
+  -d '{
+    "model": "gpt-4o-mini",
+    "input": "What time is it?",
+    "instructions": "You are a helpful assistant. Always be concise."
+  }'
+```
+
+### With Tools
+
+```bash
+curl -X POST http://localhost:8088/v1/responses \
+  -H 'Content-Type: application/json' \
+  -d '{
+    "model": "gpt-4o-mini",
+    "input": "What time is it?",
+    "tools": [
+      {
+        "type": "function",
+        "name": "current_datetime",
+        "description": "Get the current date and time"
+      }
+    ]
+  }'
+```
+
+### Chat Completions Format (Still Works)
+
+The `/chat` endpoint continues to use the Chat Completions format:
+
+```bash
+curl -X POST http://localhost:8088/chat \
+  -H 'Content-Type: application/json' \
+  -d '{
+    "messages": [{"role": "user", "content": "What time is it?"}]
+  }'
+```
+
+## Configuration Options
+
+### Using Explicit Format Override
+
+If you want to use the Responses API format on a custom path (not containing "responses"), use the explicit `openai_api_v1_format` setting:
+
+```yaml
+general:
+  front_end:
+    _type: fastapi
+    workflow:
+      openai_api_v1_path: /v1/custom/endpoint
+      openai_api_v1_format: responses  # Force Responses API format
+```
+
+Available format options:
+- `auto` (default): Detects based on path pattern
+- `chat_completions`: Force Chat Completions API format
+- `responses`: Force Responses API format
+
+## Limitations
+
+- **No Stateful Backend**: `previous_response_id` is accepted but ignored
+- **No Built-in Tools**: OpenAI built-in tools like `code_interpreter` are not executed by NAT; use the `responses_api_agent` workflow type for that functionality
+- **Tool Format Conversion**: Responses API tool definitions are converted to Chat Completions format internally
+
+## Related Examples
+
+- [Tool Calling Agent with Responses API](../../agents/tool_calling/README.md#using-tool-calling-with-the-openai-responses-api) - For using OpenAI's Responses API directly with built-in tools
+- [Simple Auth](../simple_auth/README.md) - Authentication example
+- [Custom Routes](../simple_calculator_custom_routes/README.md) - Custom endpoint routes
+
diff --git a/examples/front_ends/responses_api_endpoint/configs/config.yml b/examples/front_ends/responses_api_endpoint/configs/config.yml
new file mode 100644
index 0000000000..074e20b84f
--- /dev/null
+++ b/examples/front_ends/responses_api_endpoint/configs/config.yml
@@ -0,0 +1,77 @@
+# SPDX-FileCopyrightText: Copyright (c) 2024-2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Example configuration for testing the OpenAI Responses API endpoint format.
+#
+# This config demonstrates using the FastAPI frontend's /v1/responses endpoint
+# which accepts the OpenAI Responses API format (with 'input' field) and routes
+# requests through a standard tool_calling_agent workflow.
+#
+# NOTE: The Responses API format is provided for pass-through compatibility.
+# NAT agents do not inherently support stateful backends; features like
+# 'previous_response_id' will be ignored.
+#
+# Prerequisites:
+#   uv pip install -e '.[langchain]'   # Required for tool_calling_agent
+#   export NVIDIA_API_KEY=
+#
+# Usage:
+#   nat serve --config_file examples/front_ends/responses_api_endpoint/configs/config.yml
+#
+# Test with curl (Responses API format):
+#   curl -X POST http://localhost:8088/v1/responses \
+#     -H 'Content-Type: application/json' \
+#     -d '{"model": "gpt-4o-mini", "input": "What time is it?"}'
+#
+# Test with curl (Streaming):
+#   curl -X POST http://localhost:8088/v1/responses \
+#     -H 'Content-Type: application/json' \
+#     -d '{"model": "gpt-4o-mini", "input": "What time is it?", "stream": true}'
+#
+# The /generate and /chat endpoints still use the Chat Completions format:
+#   curl -X POST http://localhost:8088/generate \
+#     -H 'Content-Type: application/json' \
+#     -d '{"messages": [{"role": "user", "content": "What time is it?"}]}'
+
+general:
+  use_uvloop: true
+  front_end:
+    _type: fastapi
+    port: 8088
+    workflow:
+      method: POST
+      path: /generate
+      openai_api_path: /chat
+      openai_api_v1_path: /v1/responses
+      description: "Test OpenAI Responses API endpoint format"
+
+llms:
+  nim_llm:
+    _type: nim
+    model_name: meta/llama-3.1-70b-instruct
+    temperature: 0.0
+    max_tokens: 250
+
+functions:
+  current_datetime:
+    _type: current_datetime
+
+workflow:
+  _type: tool_calling_agent
+  llm_name: nim_llm
+  tool_names: [current_datetime]
+  verbose: true
+  handle_tool_errors: true
+
diff --git a/src/nat/data_models/api_server.py b/src/nat/data_models/api_server.py
index ad543b93c2..e159ea7b65 100644
--- a/src/nat/data_models/api_server.py
+++ b/src/nat/data_models/api_server.py
@@ -15,6 +15,7 @@
 
 import abc
 import datetime
+import secrets
 import typing
 import uuid
 from abc import abstractmethod
@@ -74,6 +75,8 @@ class ChatContentType(str, Enum):
     TEXT = "text"
     IMAGE_URL = "image_url"
     INPUT_AUDIO = "input_audio"
+    # Azure AI Foundry / Responses API use "input_text" for user text content
+    INPUT_TEXT = "input_text"
 
 
 class InputAudio(BaseModel):
@@ -107,6 +110,16 @@ class TextContent(BaseModel):
     text: str = "default"
 
 
+class InputTextContent(BaseModel):
+    """
+    Same as TextContent but with type "input_text" for Azure AI Foundry / Responses API compatibility.
+    """
+    model_config = ConfigDict(extra="forbid")
+
+    type: typing.Literal[ChatContentType.INPUT_TEXT] = ChatContentType.INPUT_TEXT
+    text: str = "default"
+
+
 class Security(BaseModel):
     model_config = ConfigDict(extra="forbid")
 
@@ -114,13 +127,25 @@ class Security(BaseModel):
     token: SerializableSecretStr = Field(default="default")
 
 
-UserContent = typing.Annotated[TextContent | ImageContent | AudioContent, Discriminator("type")]
+UserContent = typing.Annotated[
+    TextContent | InputTextContent | ImageContent | AudioContent, Discriminator("type")
+]
 
 
 class Message(BaseModel):
 
     content: str | list[UserContent]
     role: UserMessageContentRoleType
 
+    @field_serializer("content")
+    def _serialize_content_for_openai(self, value: str | list[UserContent]) -> str | list[UserContent]:
+        """Serialize content so OpenAI-compatible APIs receive type 'text' instead of 'input_text'."""
+        if isinstance(value, str):
+            return value
+        return [
+            TextContent(text=item.text) if isinstance(item, InputTextContent) else item
+            for item in value
+        ]
+
 
 class ChatRequest(BaseModel):
     """
@@ -865,6 +890,240 @@ def _string_to_nat_chat_response_chunk(data: str) -> ChatResponseChunk:
 
 
 GlobalTypeConverter.register_converter(_string_to_nat_chat_response_chunk)
 
+
+# ======== OpenAI Responses API Models ========
+
+class ResponsesInputItem(BaseModel):
+    """Input item for Responses API (message format)."""
+
+    type: typing.Literal["message"] = "message"
+    role: UserMessageContentRoleType = UserMessageContentRoleType.USER
+    content: str | list[dict[str, typing.Any]] = Field(description="Content of the message")
+
+
+class ResponsesRequest(BaseModel):
+    """
+    ResponsesRequest represents a request to the OpenAI Responses API.
+    https://platform.openai.com/docs/api-reference/responses
+
+    WARNING: This model is provided for pass-through compatibility with managed services
+    that support stateful backends (e.g., OpenAI, Azure OpenAI). NAT agents do not
+    inherently support stateful backends. Stateful features such as 'previous_response_id'
+    will be ignored when processed by NAT workflows. The request will be converted to
+    the internal ChatRequest format for execution.
+    """
+
+    model_config = ConfigDict(extra="allow",
+                              json_schema_extra={
+                                  "example": {
+                                      "model": "gpt-4o-mini",
+                                      "input": "What is the weather today?",
+                                      "tools": [{
+                                          "type": "function",
+                                          "name": "get_weather",
+                                          "description": "Get current weather"
+                                      }]
+                                  }
+                              })
+
+    # Optional model field
+    model: str | None = Field(default=None, description="ID of the model to use")
+
+    # Input can be a string or list of input items
+    input: str | list[dict[str, typing.Any]] = Field(
+        description="Text input to the model, or a list of input items (messages)")
+
+    # Optional fields
+    instructions: str | None = Field(default=None, description="System instructions for the model")
+    tools: list[dict[str, typing.Any]] | None = Field(default=None, description="Tools available to the model")
+    tool_choice: str | dict[str, typing.Any] | None = Field(default=None, description="Tool choice control")
+    stream: bool | None = Field(default=False, description="Whether to stream the response")
+    temperature: float | None = Field(default=None, description="Sampling temperature between 0 and 2")
+    max_output_tokens: int | None = Field(default=None, description="Maximum tokens in the response")
+    top_p: float | None = Field(default=None, description="Nucleus sampling parameter")
+    previous_response_id: str | None = Field(default=None,
+                                             description="ID of previous response for multi-turn conversations")
+    store: bool | None = Field(default=None, description="Whether to store the response")
+    metadata: dict[str, str] | None = Field(default=None,
+                                            description="Metadata for the request")
+
+    def to_chat_request(self) -> "ChatRequest":
+        """Convert Responses API request to internal ChatRequest format for processing."""
+        messages: list[Message] = []
+
+        # Add system instruction if provided
+        if self.instructions:
+            messages.append(Message(role=UserMessageContentRoleType.SYSTEM, content=self.instructions))
+
+        # Convert input to messages format
+        if isinstance(self.input, str):
+            messages.append(Message(role=UserMessageContentRoleType.USER, content=self.input))
+        else:
+            # input is a list of input items (message-like dicts)
+            for item in self.input:
+                if isinstance(item, dict):
+                    role_str = item.get("role", "user")
+                    try:
+                        role = UserMessageContentRoleType(role_str)
+                    except ValueError:
+                        role = UserMessageContentRoleType.USER
+                    content = item.get("content", "")
+                    messages.append(Message(role=role, content=content))
+
+        # Convert tools if needed (Responses API tools have slightly different structure)
+        converted_tools = self._convert_tools() if self.tools else None
+
+        return ChatRequest(
+            messages=messages,
+            model=self.model,
+            tools=converted_tools,
+            tool_choice=self.tool_choice,
+            stream=self.stream,
+            temperature=self.temperature,
+            max_tokens=self.max_output_tokens,
+            top_p=self.top_p,
+        )
+
+    def _convert_tools(self) -> list[dict[str, typing.Any]] | None:
+        """
+        Convert Responses API tools format to Chat Completions format.
+
+        Responses API format (flat):
+            {"type": "function", "name": "get_weather", "description": "...", "parameters": {...}}
+
+        Chat Completions format (nested):
+            {"type": "function", "function": {"name": "get_weather", "description": "...", "parameters": {...}}}
+
+        Unsupported tool types (code_interpreter, file_search, web_search_preview,
+        computer_use_preview) are logged with warnings and excluded from conversion
+        since NAT workflows do not support them.
+        """
+        import logging as _logging
+
+        _logger = _logging.getLogger(__name__)
+
+        if not self.tools:
+            return None
+
+        # Tool types that are built-in to OpenAI Responses API but not supported by NAT
+        unsupported_tool_types = {"code_interpreter", "file_search", "web_search_preview", "computer_use_preview"}
+
+        converted = []
+        for tool in self.tools:
+            if not isinstance(tool, dict):
+                converted.append(tool)
+                continue
+
+            tool_type = tool.get("type", "function")
+
+            if tool_type in unsupported_tool_types:
+                _logger.warning(
+                    "Tool type '%s' is not supported by NAT workflows and will be ignored. "
+                    "Only 'function' type tools are supported.", tool_type)
+                continue
+
+            if tool_type == "function":
+                # Check if already in Chat Completions nested format
+                if "function" in tool and isinstance(tool["function"], dict):
+                    # Already in Chat Completions format - pass through
+                    converted.append(tool)
+                else:
+                    # Responses API flat format -> Chat Completions nested format
+                    function_def = {
+                        "name": tool.get("name", ""),
+                        "description": tool.get("description", ""),
+                        "parameters": tool.get("parameters", {}),
+                    }
+                    # Preserve 'strict' mode if present
+                    if "strict" in tool:
+                        function_def["strict"] = tool["strict"]
+
+                    converted.append({
+                        "type": "function",
+                        "function": function_def,
+                    })
+            else:
+                # Unknown type - pass through with warning
+                _logger.warning("Unknown tool type '%s' encountered, passing through as-is.", tool_type)
+                converted.append(tool)
+
+        return converted if converted else None
+
+
+class ResponsesOutputContent(BaseModel):
+    """Content item in Responses API output."""
+
+    type: typing.Literal["output_text"] = "output_text"
+    text: str = ""
+    annotations: list[dict[str, typing.Any]] = Field(default_factory=list)
+
+
+class ResponsesOutputItem(BaseModel):
+    """Output item in Responses API format."""
+
+    type: typing.Literal["message"] = "message"
+    id: str = Field(default_factory=lambda: f"msg_{secrets.token_hex(30)}"[:64])
+    status: typing.Literal["in_progress", "completed", "incomplete"] = "completed"
+    role: typing.Literal["assistant"] = "assistant"
+    content: list[ResponsesOutputContent] = Field(default_factory=list)
+
+
+class ResponsesUsage(BaseModel):
+    """Usage information for Responses API."""
+
+    input_tokens: int = 0
+    output_tokens: int = 0
+    total_tokens: int = 0
+
+
+class ResponsesAPIResponse(ResponseBaseModelOutput):
+    """
+    Response format for OpenAI Responses API.
+    https://platform.openai.com/docs/api-reference/responses
+    """
+
+    id: str = Field(default_factory=lambda: f"resp_{secrets.token_hex(30)}"[:64])
+    object: typing.Literal["response"] = "response"
+    created_at: int = Field(
+        default_factory=lambda: int(datetime.datetime.now(tz=datetime.timezone.utc).timestamp()))
+    status: typing.Literal["in_progress", "completed", "incomplete", "failed"] = "completed"
+    model: str = "unknown-model"
+    output: list[ResponsesOutputItem] = Field(default_factory=list)
+    usage: ResponsesUsage = Field(default_factory=ResponsesUsage)
+    metadata: dict[str, str] = Field(default_factory=dict)
+
+    @staticmethod
+    def from_chat_response(chat_response: "ChatResponse", model: str | None = None) -> "ResponsesAPIResponse":
+        """Convert ChatResponse to Responses API format."""
+        content_items: list[ResponsesOutputContent] = []
+
+        if chat_response.choices:
+            for choice in chat_response.choices:
+                if choice.message and choice.message.content:
+                    content_items.append(ResponsesOutputContent(type="output_text", text=choice.message.content))
+
+        output_items = [ResponsesOutputItem(content=content_items)] if content_items else []
+
+        # Convert usage if available
+        usage = ResponsesUsage(
+            input_tokens=(chat_response.usage.prompt_tokens or 0) if chat_response.usage else 0,
+            output_tokens=(chat_response.usage.completion_tokens or 0) if chat_response.usage else 0,
+            total_tokens=(chat_response.usage.total_tokens or 0) if chat_response.usage else 0,
+        )
+
+        return ResponsesAPIResponse(
+            model=model or chat_response.model,
+            output=output_items,
+            usage=usage,
+        )
+
+    @staticmethod
+    def from_string(data: str, *, model: str = "unknown-model") -> "ResponsesAPIResponse":
+        """Create a ResponsesAPIResponse from a simple string output."""
+        return ResponsesAPIResponse(
+            model=model,
+            output=[ResponsesOutputItem(content=[ResponsesOutputContent(type="output_text", text=data)])],
+        )
+
+
 # Compatibility aliases with previous releases
 AIQChatRequest = ChatRequest
 AIQChoiceMessage = ChoiceMessage
diff --git a/src/nat/front_ends/fastapi/fastapi_front_end_config.py b/src/nat/front_ends/fastapi/fastapi_front_end_config.py
index 686c5cb084..98323baf82 100644
--- a/src/nat/front_ends/fastapi/fastapi_front_end_config.py
+++ b/src/nat/front_ends/fastapi/fastapi_front_end_config.py
@@ -172,10 +172,24 @@ class EndpointBase(BaseModel):
     )
     openai_api_v1_path: str | None = Field(
         default=None,
-        description=("Path for the OpenAI v1 Chat Completions API compatible endpoint. "
-                     "If provided, creates a single endpoint that handles both streaming and "
-                     "non-streaming requests based on the 'stream' parameter, following the "
-                     "OpenAI Chat Completions API specification exactly."),
+        description=("Path for the OpenAI v1 API compatible endpoint. "
+                     "Creates a single endpoint that handles both streaming and non-streaming "
+                     "requests based on the 'stream' parameter. "
+                     "Use 'openai_api_v1_format' to specify the API format, or leave it as 'auto' "
+                     "for automatic detection based on path patterns."),
+    )
+    openai_api_v1_format: typing.Literal["auto", "chat_completions", "responses"] = Field(
+        default="auto",
+        description=(
+            "API format for the openai_api_v1_path endpoint. "
+            "'auto': Detects based on path pattern - paths ending with '/responses' use Responses API, "
+            "otherwise Chat Completions API is used. "
+            "'chat_completions': Uses the Chat Completions API format (with 'messages' field). "
+            "'responses': Uses the Responses API format (with 'input' field). "
+            "WARNING: The Responses API format is provided for pass-through compatibility "
+            "with managed services that support stateful backends. NAT agents do not "
+            "inherently support stateful backends; features like 'previous_response_id' "
+            "will be ignored."),
     )
 
 
 class Endpoint(EndpointBase):
@@ -259,6 +273,7 @@ class CrossOriginResourceSharing(BaseModel):
         websocket_path="/websocket",
         openai_api_path="/chat",
         openai_api_v1_path="/v1/chat/completions",
+        openai_api_v1_format="auto",
         description="Executes the default NAT workflow from the loaded configuration ",
     )
diff --git a/src/nat/front_ends/fastapi/fastapi_front_end_plugin_worker.py b/src/nat/front_ends/fastapi/fastapi_front_end_plugin_worker.py
index 43c0d55753..a328c86f14 100644
--- a/src/nat/front_ends/fastapi/fastapi_front_end_plugin_worker.py
+++ b/src/nat/front_ends/fastapi/fastapi_front_end_plugin_worker.py
@@ -49,6 +49,8 @@
 from nat.data_models.api_server import ChatResponse
 from nat.data_models.api_server import ChatResponseChunk
 from nat.data_models.api_server import ResponseIntermediateStep
+from nat.data_models.api_server import ResponsesAPIResponse
+from nat.data_models.api_server import ResponsesRequest
 from nat.data_models.config import Config
 from nat.data_models.object_store import KeyAlreadyExistsError
 from nat.data_models.object_store import NoSuchKeyError
@@ -69,6 +71,7 @@
 from nat.front_ends.fastapi.fastapi_front_end_config import EvaluateStatusResponse
 from nat.front_ends.fastapi.fastapi_front_end_config import FastApiFrontEndConfig
 from nat.front_ends.fastapi.message_handler import WebSocketMessageHandler
+from nat.front_ends.fastapi.response_helpers import generate_responses_api_streaming
 from nat.front_ends.fastapi.response_helpers import generate_single_response
 from nat.front_ends.fastapi.response_helpers import generate_streaming_response_as_str
 from nat.front_ends.fastapi.response_helpers import generate_streaming_response_full_as_str
@@ -315,6 +318,45 @@ def get_step_adaptor(self) -> StepAdaptor:
 
         return StepAdaptor(self.front_end_config.step_adaptor)
 
+    def _determine_api_format(
+        self,
+        endpoint: FastApiFrontEndConfig.EndpointBase,
+        openai_v1_path: str,
+    ) -> str:
+        """
+        Determine the API format to use based on configuration and path pattern.
+
+        Args:
+            endpoint: The endpoint configuration
+            openai_v1_path: The path for the OpenAI v1 API endpoint
+
+        Returns:
+            "chat_completions" or "responses"
+        """
+        import re
+
+        # Check for explicit format configuration
+        explicit_format = getattr(endpoint, 'openai_api_v1_format', 'auto')
+
+        if explicit_format != "auto":
+            logger.debug("Using explicit API format '%s' for path '%s'", explicit_format, openai_v1_path)
+            return explicit_format
+
+        # Auto-detect based on path patterns using strict matching
+        path_lower = openai_v1_path.lower().rstrip('/')
+
+        # Strict patterns that indicate Responses API:
+        #   - Exact match: /responses, /v1/responses, /api/v1/responses
+        #   - Does NOT match: /v1/user_responses, /v1/responses/history
+        responses_pattern = re.compile(r'^(/api)?(/v\d+)?/responses$')
+
+        if responses_pattern.match(path_lower):
+            logger.debug("Auto-detected Responses API format for path '%s'", openai_v1_path)
+            return "responses"
+
+        logger.debug("Auto-detected Chat Completions API format for path '%s'", openai_v1_path)
+        return "chat_completions"
+
     async def configure(self, app: FastAPI, builder: WorkflowBuilder):
 
         # Do things like setting the base URL and global configuration options
@@ -861,7 +903,7 @@ async def post_stream(payload: request_type, filter_steps: str | None = None):
 
         def post_openai_api_compatible_endpoint(request_type: type):
             """
-            OpenAI-compatible endpoint that handles both streaming and non-streaming
+            OpenAI Chat Completions API compatible endpoint that handles both streaming and non-streaming
             based on the 'stream' parameter in the request.
             """
@@ -890,6 +932,55 @@ async def post_openai_api_compatible(response: Response, request: Request, paylo
 
             return post_openai_api_compatible
 
+        def post_responses_api_compatible_endpoint(request_type: type):
+            """
+            OpenAI Responses API compatible endpoint that handles both streaming and non-streaming
+            based on the 'stream' parameter in the request.
+
+            This endpoint accepts the Responses API format (with 'input' field) and converts it
+            to the internal ChatRequest format for processing, then returns the response in
+            Responses API format.
+
+            WARNING: This endpoint is provided for pass-through compatibility with managed services
+            that support stateful backends (e.g., OpenAI, Azure OpenAI). NAT agents do not
+            inherently support stateful backends. Stateful features such as 'previous_response_id'
+            will be ignored. The request is converted to the internal ChatRequest format, executed
+            by the workflow, and then the response is converted back to Responses API format.
+            """
+            from nat.data_models.api_server import ResponsesAPIResponse
+
+            async def post_responses_api_compatible(response: Response, request: Request, payload: request_type):
+                stream_requested = getattr(payload, 'stream', False)
+
+                # Convert Responses API request to internal ChatRequest format
+                chat_request = payload.to_chat_request()
+                model_name = getattr(payload, 'model', 'unknown-model')
+
+                async with session_manager.session(http_connection=request) as session:
+                    if stream_requested:
+                        # Return streaming response in Responses API SSE format
+                        return StreamingResponse(
+                            headers={
+                                "Content-Type": "text/event-stream; charset=utf-8",
+                                "Cache-Control": "no-cache",
+                                "Connection": "keep-alive",
+                            },
+                            content=generate_responses_api_streaming(
+                                chat_request,
+                                session=session,
+                                model=model_name,
+                                step_adaptor=self.get_step_adaptor(),
+                            ))
+
+                    # Non-streaming: get result and convert to Responses API format
+                    response.headers["Content-Type"] = "application/json"
+                    chat_result = await generate_single_response(chat_request, session, result_type=ChatResponse)
+                    responses_result = ResponsesAPIResponse.from_chat_response(chat_result, model=model_name)
+                    add_context_headers_to_response(response)
+                    return responses_result
+
+            return post_responses_api_compatible
+
         def _job_status_to_response(job: "JobInfo") -> AsyncGenerationStatusResponse:
 
             job_output = job.output
             if job_output is not None:
@@ -1165,15 +1256,34 @@ async def websocket_endpoint(websocket: WebSocket):
 
                 # Create OpenAI v1 compatible endpoint if configured
                 if openai_v1_path:
-                    # OpenAI v1 Compatible Mode: Create single endpoint that handles both streaming and non-streaming
-                    app.add_api_route(
-                        path=openai_v1_path,
-                        endpoint=post_openai_api_compatible_endpoint(request_type=ChatRequest),
-                        methods=[endpoint.method],
-                        response_model=ChatResponse | ChatResponseChunk,
-                        description=f"{endpoint.description} (OpenAI Chat Completions API compatible)",
-                        responses={500: response_500},
-                    )
+                    # Determine API format based on explicit config or path pattern detection
+                    api_format = self._determine_api_format(endpoint, openai_v1_path)
+
+                    if api_format == "responses":
+                        # OpenAI Responses API Mode: Uses 'input' field instead of 'messages'
+                        logger.warning(
+                            "Creating Responses API endpoint at '%s'. NOTE: The Responses API format "
+                            "is provided for pass-through compatibility with managed services. NAT agents "
+                            "do not support stateful backends; features like 'previous_response_id' will "
+                            "be ignored.", openai_v1_path)
+                        app.add_api_route(
+                            path=openai_v1_path,
+                            endpoint=post_responses_api_compatible_endpoint(request_type=ResponsesRequest),
+                            methods=[endpoint.method],
+                            response_model=ResponsesAPIResponse,
+                            description=f"{endpoint.description} (OpenAI Responses API compatible)",
+                            responses={500: response_500},
+                        )
+                    else:
+                        # OpenAI Chat Completions API Mode (default): Uses 'messages' field
+                        app.add_api_route(
+                            path=openai_v1_path,
+                            endpoint=post_openai_api_compatible_endpoint(request_type=ChatRequest),
+                            methods=[endpoint.method],
+                            response_model=ChatResponse | ChatResponseChunk,
+                            description=f"{endpoint.description} (OpenAI Chat Completions API compatible)",
+                            responses={500: response_500},
+                        )
             else:
                 raise ValueError(f"Unsupported method {endpoint.method}")
diff --git a/src/nat/front_ends/fastapi/message_handler.py b/src/nat/front_ends/fastapi/message_handler.py
index 607f2b1dbc..59809dbd66 100644
--- a/src/nat/front_ends/fastapi/message_handler.py
+++ b/src/nat/front_ends/fastapi/message_handler.py
@@ -30,6 +30,7 @@
 from nat.data_models.api_server import ChatResponseChunk
 from nat.data_models.api_server import Error
 from nat.data_models.api_server import ErrorTypes
+from nat.data_models.api_server import InputTextContent
 from nat.data_models.api_server import ResponseObservabilityTrace
 from nat.data_models.api_server import ResponsePayloadOutput
 from nat.data_models.api_server import ResponseSerializable
@@ -145,6 +146,8 @@ def _extract_last_user_message_content(self, messages: list[UserMessages]) -> Te
             for attachment in user_message.content:
                 if isinstance(attachment, TextContent):
                     return attachment
+                if isinstance(attachment, InputTextContent):
+                    return TextContent(text=attachment.text)
         raise ValueError("No user text content found in messages.")
 
     async def _process_websocket_user_interaction_response_message(
diff --git a/src/nat/front_ends/fastapi/response_helpers.py b/src/nat/front_ends/fastapi/response_helpers.py
index 51a4ab9dba..e0be17eae8 100644
--- a/src/nat/front_ends/fastapi/response_helpers.py
+++ b/src/nat/front_ends/fastapi/response_helpers.py
@@ -194,3 +194,203 @@ async def generate_streaming_response_full_as_str(payload: typing.Any,
     else:
         raise ValueError("Unexpected item type in stream. Expected ChatResponseSerializable, got: " + str(type(item)))
+
+
+async def generate_responses_api_streaming(
+    payload: typing.Any,
+    *,
+    session: Session,
+    model: str,
+    step_adaptor: StepAdaptor = StepAdaptor(StepAdaptorConfig()),
+) -> AsyncGenerator[str, None]:
+    """
+    Generate streaming response in OpenAI Responses API format.
+    Converts internal streaming chunks to Responses API event stream format.
+
+    The Responses API uses Server-Sent Events with specific event types:
+      - response.created: Initial response creation
+      - response.output_item.added: New output item started
+      - response.content_part.added: New content part started
+      - response.output_text.delta: Text content delta
+      - response.output_text.done: Text content completed
+      - response.content_part.done: Content part completed
+      - response.output_item.done: Output item completed
+      - response.done: Full response completed
+      - response.failed: Emitted when an error occurs during processing
+
+    WARNING: This streaming format is provided for pass-through compatibility with managed
+    services that support stateful backends. NAT agents do not inherently support stateful
+    backends. The payload is processed through the NAT workflow and output is formatted
+    to match the Responses API streaming specification.
+    """
+    import json
+    import logging
+    import secrets
+
+    from nat.data_models.api_server import ChatResponse
+    from nat.data_models.api_server import ChatResponseChunk
+
+    _logger = logging.getLogger(__name__)
+
+    response_id = f"resp_{secrets.token_hex(30)}"[:64]
+    message_id = f"msg_{secrets.token_hex(30)}"[:64]
+    output_index = 0
+    content_index = 0
+    accumulated_text = ""
+    error_occurred = False
+
+    def _sse_event(event_type: str, data: dict) -> str:
+        """Format a Server-Sent Event."""
+        return f"event: {event_type}\ndata: {json.dumps(data)}\n\n"
+
+    def _error_event(error_message: str, error_code: str = "server_error") -> str:
+        """Generate a response.failed SSE event."""
+        return _sse_event("response.failed", {
+            "type": "response.failed",
+            "response": {
+                "id": response_id,
+                "object": "response",
+                "status": "failed",
+                "model": model,
+                "error": {
+                    "type": error_code,
+                    "message": error_message,
+                },
+                "output": [],
+            }
+        })
+
+    try:
+        # Event: response.created
+        yield _sse_event("response.created", {
+            "type": "response.created",
+            "response": {
+                "id": response_id,
+                "object": "response",
+                "status": "in_progress",
+                "model": model,
+                "output": [],
+            }
+        })
+
+        # Event: response.output_item.added
+        yield _sse_event("response.output_item.added", {
+            "type": "response.output_item.added",
+            "output_index": output_index,
+            "item": {
+                "type": "message",
+                "id": message_id,
+                "status": "in_progress",
+                "role": "assistant",
+                "content": [],
+            }
+        })
+
+        # Event: response.content_part.added
+        yield _sse_event("response.content_part.added", {
+            "type": "response.content_part.added",
+            "output_index": output_index,
+            "content_index": content_index,
+            "part": {
+                "type": "output_text",
+                "text": "",
+            }
+        })
+
+        # Process the workflow and stream text deltas
+        try:
+            async with session.run(payload) as runner:
+                if session.workflow.has_streaming_output:
+                    async for chunk in runner.result_stream(to_type=ChatResponseChunk):
+                        # Extract text from ChatResponseChunk
+                        if chunk.choices and chunk.choices[0].delta and chunk.choices[0].delta.content:
+                            delta_text = chunk.choices[0].delta.content
+                            accumulated_text += delta_text
+
+                            # Event: response.output_text.delta
+                            yield _sse_event("response.output_text.delta", {
+                                "type": "response.output_text.delta",
+                                "output_index": output_index,
+                                "content_index": content_index,
+                                "delta": delta_text,
+                            })
+                else:
+                    # Non-streaming workflow - get full result and emit as single delta
+                    result = await runner.result(to_type=ChatResponse)
+                    if result.choices and result.choices[0].message:
+                        accumulated_text = result.choices[0].message.content or ""
+                        if accumulated_text:
+                            yield _sse_event("response.output_text.delta", {
+                                "type": "response.output_text.delta",
+                                "output_index": output_index,
+                                "content_index": content_index,
+                                "delta": accumulated_text,
+                            })
+        except Exception as workflow_error:
+            error_occurred = True
+            _logger.exception("Error during Responses API streaming workflow execution")
+            yield _error_event(str(workflow_error), "workflow_error")
+            return  # Stop streaming after error
+
+        # Only emit completion events if no error occurred
+        if not error_occurred:
+            # Event: response.output_text.done
+            yield _sse_event("response.output_text.done", {
+                "type": "response.output_text.done",
+                "output_index": output_index,
+                "content_index": content_index,
+                "text": accumulated_text,
+            })
+
+            # Event: response.content_part.done
+            yield _sse_event("response.content_part.done", {
+                "type": "response.content_part.done",
+                "output_index": output_index,
+                "content_index": content_index,
+                "part": {
+                    "type": "output_text",
+                    "text": accumulated_text,
+                }
+            })
+
+            # Event: response.output_item.done
+            yield _sse_event("response.output_item.done", {
+                "type": "response.output_item.done",
+                "output_index": output_index,
+                "item": {
+                    "type": "message",
+                    "id": message_id,
+                    "status": "completed",
+                    "role": "assistant",
+                    "content": [{
+                        "type": "output_text",
+                        "text": accumulated_text,
+                    }],
+                }
+            })
+
+            # Event: response.done
+            yield _sse_event("response.done", {
+                "type": "response.done",
+                "response": {
+                    "id": response_id,
+                    "object": "response",
+                    "status": "completed",
+                    "model": model,
+                    "output": [{
+                        "type": "message",
+                        "id": message_id,
+                        "status": "completed",
+                        "role": "assistant",
+                        "content": [{
+                            "type": "output_text",
+                            "text": accumulated_text,
+                        }],
+                    }],
+                }
+            })
+
+    except Exception as setup_error:
+        # Catch any errors during initial event emission (before workflow starts)
+        _logger.exception("Error during Responses API streaming setup")
+        yield _error_event(str(setup_error), "server_error")
diff --git a/tests/nat/front_ends/fastapi/test_openai_compatibility.py b/tests/nat/front_ends/fastapi/test_openai_compatibility.py
index f4eaa9bb5f..3fc4a56ac5 100644
--- a/tests/nat/front_ends/fastapi/test_openai_compatibility.py
+++ b/tests/nat/front_ends/fastapi/test_openai_compatibility.py
@@ -568,3 +568,577 @@ async def test_openai_compatible_streaming_response_format():
 
     # At least one non-null finish_reason should appear across the stream (finalization)
     assert valid_final_reason_seen, "Expected a final chunk with non-null finish_reason"
+
+
+# ==================== OpenAI Responses API Tests ====================
+#
+# NOTE: The Responses API format is provided for pass-through compatibility with managed
+# services that support stateful backends (e.g., OpenAI, Azure OpenAI). NAT agents do not
+# inherently support stateful backends. Features like 'previous_response_id' are accepted
+# but ignored. Requests are converted to the internal ChatRequest format for execution.
+# + + +def test_responses_request_model(): + """Test that ResponsesRequest model correctly parses Responses API format.""" + from nat.data_models.api_server import ResponsesRequest + + # Test with simple string input + request = ResponsesRequest(model="gpt-4o-mini", input="What is the weather?") + assert request.model == "gpt-4o-mini" + assert request.input == "What is the weather?" + assert request.stream is False + + # Test with message-style input + request = ResponsesRequest( + model="gpt-4o-mini", + input=[{ + "role": "user", "content": "Hello" + }], + instructions="You are a helpful assistant.", + tools=[{ + "type": "function", "name": "get_weather", "description": "Get weather" + }], + ) + assert isinstance(request.input, list) + assert request.instructions == "You are a helpful assistant." + assert request.tools is not None + + +def test_responses_request_to_chat_request_conversion(): + """Test that ResponsesRequest correctly converts to ChatRequest.""" + from nat.data_models.api_server import ResponsesRequest + + # Test simple string input conversion + request = ResponsesRequest(model="gpt-4o-mini", input="What is the weather?") + chat_request = request.to_chat_request() + + assert len(chat_request.messages) == 1 + assert chat_request.messages[0].role.value == "user" + assert chat_request.messages[0].content == "What is the weather?" 
+ assert chat_request.model == "gpt-4o-mini" + + # Test with instructions (should be added as system message) + request = ResponsesRequest(model="gpt-4o-mini", input="Hello", instructions="Be helpful") + chat_request = request.to_chat_request() + + assert len(chat_request.messages) == 2 + assert chat_request.messages[0].role.value == "system" + assert chat_request.messages[0].content == "Be helpful" + assert chat_request.messages[1].role.value == "user" + assert chat_request.messages[1].content == "Hello" + + # Test tool conversion (flat to nested format) + request = ResponsesRequest( + model="gpt-4o-mini", + input="What time is it?", + tools=[{ + "type": "function", "name": "get_time", "description": "Get current time", "parameters": {} + }], + ) + chat_request = request.to_chat_request() + + assert chat_request.tools is not None + assert len(chat_request.tools) == 1 + assert chat_request.tools[0]["type"] == "function" + assert "function" in chat_request.tools[0] + assert chat_request.tools[0]["function"]["name"] == "get_time" + + +def test_responses_api_response_model(): + """Test that ResponsesAPIResponse model has correct structure.""" + from nat.data_models.api_server import ResponsesAPIResponse + from nat.data_models.api_server import ResponsesOutputContent + from nat.data_models.api_server import ResponsesOutputItem + + response = ResponsesAPIResponse( + model="gpt-4o-mini", + output=[ResponsesOutputItem(content=[ResponsesOutputContent(text="Hello, world!")])], + ) + + assert response.object == "response" + assert response.status == "completed" + assert response.model == "gpt-4o-mini" + assert len(response.output) == 1 + assert response.output[0].type == "message" + assert response.output[0].role == "assistant" + assert len(response.output[0].content) == 1 + assert response.output[0].content[0].text == "Hello, world!" 
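
The conversion semantics exercised by the tests above (string input to a user message, `instructions` to a leading system message, flat tool definitions to the nested Chat Completions shape) can be summarized in a standalone sketch. This is a simplified illustration only, not the library's actual `to_chat_request()` implementation; `responses_to_chat_messages` and `flat_tool_to_nested` are hypothetical helper names:

```python
def responses_to_chat_messages(input_value, instructions=None) -> list[dict]:
    """Build a Chat Completions 'messages' array from Responses API fields."""
    messages = []
    if instructions:
        # 'instructions' becomes a system message prepended to the conversation
        messages.append({"role": "system", "content": instructions})
    if isinstance(input_value, str):
        # A plain string input becomes a single user message
        messages.append({"role": "user", "content": input_value})
    else:
        # Message-style input is already a list of role/content dicts
        messages.extend(input_value)
    return messages


def flat_tool_to_nested(tool: dict) -> dict:
    """Convert a flat Responses API function tool to the nested Chat Completions shape."""
    if "function" in tool:
        # Already nested (Chat Completions format) -> pass through unchanged
        return tool
    # Move everything except 'type' under a 'function' key
    return {"type": "function", "function": {k: v for k, v in tool.items() if k != "type"}}


print(responses_to_chat_messages("Hello", instructions="Be helpful"))
# [{'role': 'system', 'content': 'Be helpful'}, {'role': 'user', 'content': 'Hello'}]
print(flat_tool_to_nested({"type": "function", "name": "get_time", "parameters": {}}))
# {'type': 'function', 'function': {'name': 'get_time', 'parameters': {}}}
```

Note that the real conversion also filters unsupported tool types (`code_interpreter`, `file_search`, etc.) with warnings, as covered by the filtering tests later in this diff.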
+ + +def test_responses_api_response_from_chat_response(): + """Test conversion from ChatResponse to ResponsesAPIResponse.""" + from nat.data_models.api_server import ResponsesAPIResponse + + chat_response = ChatResponse.from_string("Test response", usage=Usage(prompt_tokens=10, completion_tokens=5)) + responses_api_response = ResponsesAPIResponse.from_chat_response(chat_response, model="test-model") + + assert responses_api_response.object == "response" + assert responses_api_response.model == "test-model" + assert len(responses_api_response.output) == 1 + assert responses_api_response.output[0].content[0].text == "Test response" + + +async def test_responses_api_endpoint_non_streaming(): + """Test that /v1/responses endpoint accepts Responses API format and returns correct format.""" + from nat.data_models.api_server import ResponsesAPIResponse + + front_end_config = FastApiFrontEndConfig() + front_end_config.workflow.openai_api_v1_path = "/v1/responses" + front_end_config.workflow.openai_api_path = "/chat" + + config = Config( + general=GeneralConfig(front_end=front_end_config), + workflow=EchoFunctionConfig(use_openai_api=True), + ) + + async with build_nat_client(config) as client: + # Test with Responses API format (using 'input' not 'messages') + response = await client.post( + "/v1/responses", + json={ + "model": "gpt-4o-mini", + "input": "Hello, world!", + "stream": False, + }, + ) + + assert response.status_code == 200 + data = response.json() + + # Validate Responses API response structure + assert data["object"] == "response" + assert data["status"] == "completed" + assert "output" in data + assert len(data["output"]) > 0 + assert data["output"][0]["type"] == "message" + assert data["output"][0]["role"] == "assistant" + assert len(data["output"][0]["content"]) > 0 + assert data["output"][0]["content"][0]["type"] == "output_text" + # Echo function should return the input + assert "Hello, world!" 
in data["output"][0]["content"][0]["text"] + + +async def test_responses_api_endpoint_with_instructions(): + """Test Responses API endpoint with system instructions.""" + front_end_config = FastApiFrontEndConfig() + front_end_config.workflow.openai_api_v1_path = "/v1/responses" + + config = Config( + general=GeneralConfig(front_end=front_end_config), + workflow=EchoFunctionConfig(use_openai_api=True), + ) + + async with build_nat_client(config) as client: + response = await client.post( + "/v1/responses", + json={ + "model": "gpt-4o-mini", + "input": "Test message", + "instructions": "You are a helpful assistant.", + }, + ) + + assert response.status_code == 200 + data = response.json() + assert data["object"] == "response" + + +async def test_responses_api_endpoint_streaming(): + """Test that /v1/responses endpoint supports streaming in Responses API SSE format.""" + import json + + front_end_config = FastApiFrontEndConfig() + front_end_config.workflow.openai_api_v1_path = "/v1/responses" + + config = Config( + general=GeneralConfig(front_end=front_end_config), + workflow=StreamingEchoFunctionConfig(use_openai_api=True), + ) + + async with build_nat_client(config) as client: + events = [] + async with aconnect_sse( + client, + "POST", + "/v1/responses", + json={ + "model": "gpt-4o-mini", + "input": "Hello streaming!", + "stream": True, + }, + ) as event_source: + async for sse in event_source.aiter_sse(): + events.append({"event": sse.event, "data": json.loads(sse.data)}) + + assert event_source.response.status_code == 200 + assert len(events) > 0 + + # Verify Responses API SSE event types + event_types = [e["event"] for e in events] + + # Should have response lifecycle events + assert "response.created" in event_types + assert "response.output_item.added" in event_types + assert "response.content_part.added" in event_types + assert "response.output_text.done" in event_types + assert "response.done" in event_types + + # Check response.created event + created_event 
= next(e for e in events if e["event"] == "response.created") + assert created_event["data"]["type"] == "response.created" + assert created_event["data"]["response"]["status"] == "in_progress" + + # Check response.done event has completed response + done_event = next(e for e in events if e["event"] == "response.done") + assert done_event["data"]["type"] == "response.done" + assert done_event["data"]["response"]["status"] == "completed" + assert len(done_event["data"]["response"]["output"]) > 0 + + +async def test_responses_api_vs_chat_completions_path_detection(): + """Test that path detection correctly routes to appropriate API handler.""" + front_end_config = FastApiFrontEndConfig() + + # Test Chat Completions path + front_end_config.workflow.openai_api_v1_path = "/v1/chat/completions" + + config = Config( + general=GeneralConfig(front_end=front_end_config), + workflow=EchoFunctionConfig(use_openai_api=True), + ) + + async with build_nat_client(config) as client: + # Should accept Chat Completions format (with 'messages') + response = await client.post( + "/v1/chat/completions", + json={ + "messages": [{ + "role": "user", "content": "Hello" + }], + "stream": False, + }, + ) + assert response.status_code == 200 + data = response.json() + assert data["object"] == "chat.completion" # Chat Completions response format + + # Should reject Responses API format (with 'input' instead of 'messages') + response = await client.post( + "/v1/chat/completions", + json={ + "model": "gpt-4o-mini", + "input": "Hello", + }, + ) + assert response.status_code == 422 # Validation error - missing 'messages' + + +async def test_responses_api_endpoint_rejects_chat_completions_format(): + """Test that /v1/responses endpoint rejects Chat Completions format.""" + front_end_config = FastApiFrontEndConfig() + front_end_config.workflow.openai_api_v1_path = "/v1/responses" + + config = Config( + general=GeneralConfig(front_end=front_end_config), + 
workflow=EchoFunctionConfig(use_openai_api=True), + ) + + async with build_nat_client(config) as client: + # Should reject Chat Completions format (with 'messages' instead of 'input') + response = await client.post( + "/v1/responses", + json={ + "messages": [{ + "role": "user", "content": "Hello" + }], + }, + ) + assert response.status_code == 422 # Validation error - missing 'input' and 'model' + + +# ============================================================================= +# Enhanced Tool Handling Tests +# ============================================================================= + + +def test_responses_request_tool_conversion_flat_to_nested(): + """Test that flat Responses API tool format converts to Chat Completions nested format.""" + from nat.data_models.api_server import ResponsesRequest + + request = ResponsesRequest( + model="gpt-4", + input="Hello", + tools=[{ + "type": "function", + "name": "get_weather", + "description": "Get weather for a city", + "parameters": { + "type": "object", + "properties": { + "city": {"type": "string"} + } + } + }]) + + chat_request = request.to_chat_request() + + assert chat_request.tools is not None + assert len(chat_request.tools) == 1 + assert chat_request.tools[0]["type"] == "function" + assert "function" in chat_request.tools[0] + assert chat_request.tools[0]["function"]["name"] == "get_weather" + assert chat_request.tools[0]["function"]["description"] == "Get weather for a city" + assert chat_request.tools[0]["function"]["parameters"]["properties"]["city"]["type"] == "string" + + +def test_responses_request_tool_conversion_preserves_strict_mode(): + """Test that 'strict' mode is preserved during tool conversion.""" + from nat.data_models.api_server import ResponsesRequest + + request = ResponsesRequest( + model="gpt-4", + input="Hello", + tools=[{ + "type": "function", + "name": "strict_func", + "description": "A strict function", + "parameters": {}, + "strict": True, + }]) + + chat_request = 
request.to_chat_request() + + assert chat_request.tools is not None + assert chat_request.tools[0]["function"]["strict"] is True + + +def test_responses_request_tool_conversion_already_nested(): + """Test that already-nested tools (Chat Completions format) pass through unchanged.""" + from nat.data_models.api_server import ResponsesRequest + + # Tool already in Chat Completions format (nested) + request = ResponsesRequest( + model="gpt-4", + input="Hello", + tools=[{ + "type": "function", + "function": { + "name": "nested_func", + "description": "Already nested", + "parameters": {} + } + }]) + + chat_request = request.to_chat_request() + + assert chat_request.tools is not None + assert len(chat_request.tools) == 1 + # Should pass through unchanged + assert chat_request.tools[0]["function"]["name"] == "nested_func" + + +def test_responses_request_filters_unsupported_tool_types(caplog): + """Test that unsupported tool types are filtered out with warnings.""" + import logging + from nat.data_models.api_server import ResponsesRequest + + request = ResponsesRequest( + model="gpt-4", + input="Hello", + tools=[ + {"type": "code_interpreter"}, + {"type": "file_search"}, + {"type": "web_search_preview"}, + { + "type": "function", + "name": "my_func", + "description": "test" + }, + ]) + + # Capture log warnings + with caplog.at_level(logging.WARNING): + chat_request = request.to_chat_request() + + # Verify warnings were logged for unsupported tool types + assert "code_interpreter" in caplog.text + assert "file_search" in caplog.text + assert "web_search_preview" in caplog.text + assert "not supported by NAT workflows" in caplog.text + + # Only the function tool should remain + assert chat_request.tools is not None + assert len(chat_request.tools) == 1 + assert chat_request.tools[0]["function"]["name"] == "my_func" + + +def test_responses_request_no_tools_returns_none(): + """Test that None tools returns None.""" + from nat.data_models.api_server import ResponsesRequest + + 
request = ResponsesRequest(model="gpt-4", input="Hello", tools=None) + + chat_request = request.to_chat_request() + + assert chat_request.tools is None + + +def test_responses_request_empty_tools_returns_none(): + """Test that empty tools list returns None after filtering.""" + from nat.data_models.api_server import ResponsesRequest + + # Only unsupported tools - should result in None after filtering + request = ResponsesRequest(model="gpt-4", input="Hello", tools=[{"type": "code_interpreter"}]) + + chat_request = request.to_chat_request() + + assert chat_request.tools is None + + +# ============================================================================= +# Path Detection Tests with Explicit Format Config +# ============================================================================= + + +def test_openai_api_v1_format_config_field(): + """Test that openai_api_v1_format field is properly added to config.""" + from nat.front_ends.fastapi.fastapi_front_end_config import FastApiFrontEndConfig + + # Test default value (auto) + config = FastApiFrontEndConfig.EndpointBase(method="POST", description="test") + assert hasattr(config, 'openai_api_v1_format') + assert config.openai_api_v1_format == "auto" + + # Test explicit chat_completions + config = FastApiFrontEndConfig.EndpointBase(method="POST", + description="test", + openai_api_v1_format="chat_completions") + assert config.openai_api_v1_format == "chat_completions" + + # Test explicit responses + config = FastApiFrontEndConfig.EndpointBase(method="POST", description="test", openai_api_v1_format="responses") + assert config.openai_api_v1_format == "responses" + + +@pytest.mark.parametrize( + "path,expected_format", + [ + # Strict matches for Responses API + ("/v1/responses", "responses"), + ("/responses", "responses"), + ("/api/v1/responses", "responses"), + ("/api/v2/responses", "responses"), + # Trailing slash should also work + ("/v1/responses/", "responses"), + # Chat Completions paths + 
("/v1/chat/completions", "chat_completions"), + ("/chat/completions", "chat_completions"), + ("/v1/completions", "chat_completions"), + # Paths that should NOT match Responses API (even if they contain "responses") + ("/v1/user_responses", "chat_completions"), + ("/v1/responses/history", "chat_completions"), + ("/v1/chat/responses", "chat_completions"), + ("/api/responses/list", "chat_completions"), + ], +) +def test_path_detection_strict_matching(path: str, expected_format: str): + """Test that path detection uses strict pattern matching.""" + from nat.front_ends.fastapi.fastapi_front_end_config import FastApiFrontEndConfig + from nat.front_ends.fastapi.fastapi_front_end_plugin_worker import FastApiFrontEndPluginWorker + from nat.data_models.config import Config, GeneralConfig + + # Create a minimal config + front_end_config = FastApiFrontEndConfig() + config = Config( + general=GeneralConfig(front_end=front_end_config), + workflow=EchoFunctionConfig(use_openai_api=True), + ) + + worker = FastApiFrontEndPluginWorker(config) + + # Create endpoint with auto format + endpoint = FastApiFrontEndConfig.EndpointBase(method="POST", description="test", openai_api_v1_format="auto") + + result = worker._determine_api_format(endpoint, path) + assert result == expected_format, f"Path '{path}' should detect as '{expected_format}', got '{result}'" + + +def test_explicit_format_overrides_auto_detection(): + """Test that explicit format config overrides auto-detection.""" + from nat.front_ends.fastapi.fastapi_front_end_config import FastApiFrontEndConfig + from nat.front_ends.fastapi.fastapi_front_end_plugin_worker import FastApiFrontEndPluginWorker + from nat.data_models.config import Config, GeneralConfig + + front_end_config = FastApiFrontEndConfig() + config = Config( + general=GeneralConfig(front_end=front_end_config), + workflow=EchoFunctionConfig(use_openai_api=True), + ) + + worker = FastApiFrontEndPluginWorker(config) + + # Path looks like responses, but explicit config 
says chat_completions + endpoint = FastApiFrontEndConfig.EndpointBase(method="POST", + description="test", + openai_api_v1_format="chat_completions") + + result = worker._determine_api_format(endpoint, "/v1/responses") + assert result == "chat_completions" + + # Path looks like chat completions, but explicit config says responses + endpoint = FastApiFrontEndConfig.EndpointBase(method="POST", + description="test", + openai_api_v1_format="responses") + + result = worker._determine_api_format(endpoint, "/v1/chat/completions") + assert result == "responses" + + +async def test_explicit_responses_format_on_custom_path(): + """Test that explicit 'responses' format works on a non-standard path.""" + front_end_config = FastApiFrontEndConfig() + front_end_config.workflow.openai_api_v1_path = "/custom/agent/endpoint" + front_end_config.workflow.openai_api_v1_format = "responses" + + config = Config( + general=GeneralConfig(front_end=front_end_config), + workflow=EchoFunctionConfig(use_openai_api=True), + ) + + async with build_nat_client(config) as client: + # Should accept Responses API format + response = await client.post( + "/custom/agent/endpoint", + json={ + "model": "gpt-4", + "input": "Hello from custom path", + }, + ) + assert response.status_code == 200 + data = response.json() + assert data["object"] == "response" + + +async def test_explicit_chat_completions_format_on_responses_like_path(): + """Test that explicit 'chat_completions' format works even on a path containing 'responses'.""" + front_end_config = FastApiFrontEndConfig() + front_end_config.workflow.openai_api_v1_path = "/v1/my_responses_endpoint" + front_end_config.workflow.openai_api_v1_format = "chat_completions" + + config = Config( + general=GeneralConfig(front_end=front_end_config), + workflow=EchoFunctionConfig(use_openai_api=True), + ) + + async with build_nat_client(config) as client: + # Should accept Chat Completions format + response = await client.post( + "/v1/my_responses_endpoint", + 
json={ + "messages": [{"role": "user", "content": "Hello"}], + }, + ) + assert response.status_code == 200 + data = response.json() + assert data["object"] == "chat.completion"
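
The streaming handler earlier in this diff frames each Responses API event as a Server-Sent Event (`event: <type>` plus a `data: <json>` line), and the streaming tests consume them via `aconnect_sse`. As a sanity check of that framing, a minimal standard-library parser for such a stream might look like the sketch below; `raw` is a hypothetical hand-written capture, not output from a real server:

```python
import json

# Hypothetical captured SSE stream in the shape _sse_event() produces.
raw = (
    'event: response.created\n'
    'data: {"type": "response.created", "response": {"status": "in_progress"}}\n\n'
    'event: response.output_text.delta\n'
    'data: {"type": "response.output_text.delta", "delta": "Hello"}\n\n'
    'event: response.output_text.delta\n'
    'data: {"type": "response.output_text.delta", "delta": ", world!"}\n\n'
    'event: response.done\n'
    'data: {"type": "response.done", "response": {"status": "completed"}}\n\n'
)


def parse_sse(stream: str):
    """Yield (event_type, payload) pairs from a Server-Sent Events string."""
    # Events are separated by a blank line; each event carries one
    # 'event:' field and one JSON 'data:' field in this stream.
    for block in stream.strip().split("\n\n"):
        event_type, payload = None, None
        for line in block.splitlines():
            if line.startswith("event: "):
                event_type = line[len("event: "):]
            elif line.startswith("data: "):
                payload = json.loads(line[len("data: "):])
        if event_type and payload is not None:
            yield event_type, payload


# Reassemble the text exactly as a Responses API client would:
# accumulate the 'delta' fields of response.output_text.delta events.
text = "".join(d["delta"] for ev, d in parse_sse(raw) if ev == "response.output_text.delta")
print(text)  # Hello, world!
```

A production client would use a proper SSE library (as the tests do with `httpx-sse`), since real streams may split fields across chunks; this sketch only illustrates the event framing the handler emits.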