Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
213 changes: 117 additions & 96 deletions src/agentevals/api/app.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
"""FastAPI application for agentevals REST API."""

from __future__ import annotations

import asyncio
import json
import logging
import os
from contextlib import asynccontextmanager
from pathlib import Path
from typing import TYPE_CHECKING

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
Expand All @@ -17,6 +20,9 @@
from .debug_routes import debug_router
from .routes import router

if TYPE_CHECKING:
from ..streaming.ws_server import StreamingTraceManager

try:
from dotenv import load_dotenv

Expand All @@ -27,107 +33,122 @@
pass


@asynccontextmanager
async def lifespan(app: FastAPI):
log_level_str = os.getenv("AGENTEVALS_LOG_LEVEL", "INFO").upper()
log_level = getattr(logging, log_level_str, logging.INFO)
logging.basicConfig(
level=log_level,
format="%(levelname)s:%(name)s:%(message)s",
force=True,
def _build_lifespan():
@asynccontextmanager
async def lifespan(app: FastAPI):
log_level_str = os.getenv("AGENTEVALS_LOG_LEVEL", "INFO").upper()
log_level = getattr(logging, log_level_str, logging.INFO)
logging.basicConfig(
level=log_level,
format="%(levelname)s:%(name)s:%(message)s",
force=True,
)
ae_logger = logging.getLogger("agentevals")
ae_logger.setLevel(log_level)
if log_buffer not in ae_logger.handlers:
log_buffer.setFormatter(logging.Formatter("%(levelname)s:%(name)s:%(message)s"))
ae_logger.addHandler(log_buffer)
mgr = getattr(app.state, "trace_manager", None)
if mgr:
mgr.start_cleanup_task()
yield
if mgr:
await mgr.shutdown()
ae_logger.removeHandler(log_buffer)

return lifespan


def create_app(
*,
trace_manager: StreamingTraceManager | None = None,
enable_streaming: bool = False,
) -> FastAPI:
"""Create the main agentevals API app."""
app = FastAPI(
title="agentevals API",
version=__version__,
description="REST API for evaluating agent traces using ADK's scoring framework",
lifespan=_build_lifespan(),
)
ae_logger = logging.getLogger("agentevals")
ae_logger.setLevel(log_level)
if log_buffer not in ae_logger.handlers:
log_buffer.setFormatter(logging.Formatter("%(levelname)s:%(name)s:%(message)s"))
ae_logger.addHandler(log_buffer)
mgr = getattr(app.state, "trace_manager", None)
if mgr:
mgr.start_cleanup_task()
yield
if mgr:
await mgr.shutdown()
ae_logger.removeHandler(log_buffer)


app = FastAPI(
title="agentevals API",
version=__version__,
description="REST API for evaluating agent traces using ADK's scoring framework",
lifespan=lifespan,
)

app.add_middleware(
CORSMiddleware,
allow_origins=["http://localhost:5173", "http://localhost:5174"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
expose_headers=["*"],
)

app.include_router(router, prefix="/api")
app.include_router(debug_router, prefix="/api/debug")

_live_mode = os.getenv("AGENTEVALS_LIVE") == "1"

if _live_mode:
from fastapi import Request as _Request
from fastapi import WebSocket

from ..streaming.ws_server import StreamingTraceManager
from .streaming_routes import streaming_router

app.include_router(streaming_router, prefix="/api/streaming")
app.state.trace_manager = StreamingTraceManager()

@app.websocket("/ws/traces")
async def websocket_endpoint(websocket: WebSocket):
await websocket.app.state.trace_manager.handle_connection(websocket)

@app.get("/stream/ui-updates")
async def ui_updates_stream(request: _Request):
mgr = request.app.state.trace_manager

async def event_generator():
queue = mgr.register_sse_client()
try:
while True:
event = await queue.get()
if event is None:
break
yield f"data: {json.dumps(event)}\n\n"
except asyncio.CancelledError:
pass
finally:
mgr.unregister_sse_client(queue)

return StreamingResponse(
event_generator(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
},
)
app.add_middleware(
CORSMiddleware,
allow_origins=["http://localhost:5173", "http://localhost:5174"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
expose_headers=["*"],
)

app.include_router(router, prefix="/api")
app.include_router(debug_router, prefix="/api/debug")

if trace_manager is not None:
app.state.trace_manager = trace_manager

if enable_streaming:
if trace_manager is None:
raise ValueError("enable_streaming requires a trace_manager")

from fastapi import Request as _Request
from fastapi import WebSocket

from .streaming_routes import streaming_router

app.include_router(streaming_router, prefix="/api/streaming")

@app.websocket("/ws/traces")
async def websocket_endpoint(websocket: WebSocket):
await websocket.app.state.trace_manager.handle_connection(websocket)

@app.get("/stream/ui-updates")
async def ui_updates_stream(request: _Request):
mgr = request.app.state.trace_manager

async def event_generator():
queue = mgr.register_sse_client()
try:
while True:
event = await queue.get()
if event is None:
break
yield f"data: {json.dumps(event)}\n\n"
except asyncio.CancelledError:
pass
finally:
mgr.unregister_sse_client(queue)

return StreamingResponse(
event_generator(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
},
)

static_dir = Path(__file__).parent.parent / "_static"
has_ui = static_dir.is_dir() and (static_dir / "index.html").exists()

if has_ui and not os.getenv("AGENTEVALS_HEADLESS"):
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles

app.mount("/assets", StaticFiles(directory=static_dir / "assets"), name="ui-assets")

_static_dir = Path(__file__).parent.parent / "_static"
_has_ui = _static_dir.is_dir() and (_static_dir / "index.html").exists()
@app.get("/")
async def root():
return FileResponse(static_dir / "index.html")

if _has_ui and not os.getenv("AGENTEVALS_HEADLESS"):
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
@app.get("/{path:path}")
async def spa_fallback(path: str):
file_path = static_dir / path
if file_path.is_file():
return FileResponse(file_path)
return FileResponse(static_dir / "index.html")

app.mount("/assets", StaticFiles(directory=_static_dir / "assets"), name="ui-assets")
return app

@app.get("/")
async def root():
return FileResponse(_static_dir / "index.html")

@app.get("/{path:path}")
async def spa_fallback(path: str):
file_path = _static_dir / path
if file_path.is_file():
return FileResponse(file_path)
return FileResponse(_static_dir / "index.html")
app = create_app()
8 changes: 0 additions & 8 deletions src/agentevals/api/dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,3 @@ def require_trace_manager(request: Request) -> StreamingTraceManager:
if mgr is None:
raise HTTPException(status_code=503, detail="Live mode not enabled")
return mgr


def require_trace_manager_from_app(app: Any) -> StreamingTraceManager:
"""Return the StreamingTraceManager from app, raising RuntimeError if missing."""
mgr = get_trace_manager_from_app(app)
if mgr is None:
raise RuntimeError("Live mode not enabled")
return mgr
24 changes: 13 additions & 11 deletions src/agentevals/api/otlp_app.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,27 @@
"""Minimal FastAPI app for the OTLP HTTP receiver on port 4318.

Shares the StreamingTraceManager with the main app (port 8001).
Mounts only the /v1/traces and /v1/logs endpoints.
"""

from contextlib import asynccontextmanager
from __future__ import annotations

from typing import TYPE_CHECKING

from fastapi import FastAPI

from .otlp_routes import otlp_router

if TYPE_CHECKING:
from ..streaming.ws_server import StreamingTraceManager

@asynccontextmanager
async def lifespan(app: FastAPI):
from .app import app as main_app

mgr = getattr(main_app.state, "trace_manager", None)
if mgr:
app.state.trace_manager = mgr
yield
def create_otlp_app(*, trace_manager: StreamingTraceManager | None = None) -> FastAPI:
"""Create the OTLP HTTP receiver app."""
app = FastAPI(title="agentevals OTLP receiver")
if trace_manager is not None:
app.state.trace_manager = trace_manager
app.include_router(otlp_router)
return app


otlp_app = FastAPI(title="agentevals OTLP receiver", lifespan=lifespan)
otlp_app.include_router(otlp_router)
otlp_app = create_otlp_app()
27 changes: 9 additions & 18 deletions src/agentevals/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -534,27 +534,26 @@ async def _run_servers(
otlp_grpc_port: int,
*,
mcp_port: int | None = None,
reload: bool = False,
reload_dirs: list[str] | None = None,
log_level: str = "warning",
) -> None:
"""Start API, OTLP HTTP+gRPC receivers, and optional MCP (Streamable HTTP)."""
import uvicorn

from .api.app import create_app
from .api.otlp_app import create_otlp_app
from .streaming.ws_server import StreamingTraceManager

shared_kwargs: dict = {
"host": host,
"reload": reload,
"log_level": log_level,
}
if reload_dirs:
shared_kwargs["reload_dirs"] = reload_dirs

# TODO #99 Create the manager and pass it into the Server constructors instead of injecting it into the app state.
mgr = StreamingTraceManager()
main_app = create_app(trace_manager=mgr, enable_streaming=True)
otlp_app = create_otlp_app(trace_manager=mgr)

main_server = uvicorn.Server(uvicorn.Config("agentevals.api.app:app", port=port, **shared_kwargs))
otlp_http_server = uvicorn.Server(
uvicorn.Config("agentevals.api.otlp_app:otlp_app", port=otlp_http_port, **shared_kwargs)
)
main_server = uvicorn.Server(uvicorn.Config(main_app, port=port, **shared_kwargs))
otlp_http_server = uvicorn.Server(uvicorn.Config(otlp_app, port=otlp_http_port, **shared_kwargs))
Comment on lines +555 to +556
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will this break hot reload functionality of --dev?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch.

I checked this more closely and the combined serve --dev path wasn't actually getting uvicorn hot reload before this change either: it runs uvicorn.Server(...).serve() directly, and the reload=True / reload_dirs=... config there doesn't start uvicorn's reload supervisor. So this PR doesn't introduce a new regression, but it did make the mismatch more visible. Could you confirm?

I've cleaned that up in this branch by removing the dead reload plumbing and keeping the startup wiring explicit around the shared StreamingTraceManager. If we want real hot reload for the multi-surface dev server, I'd prefer to handle that as a follow-up since it likely needs a different startup shape. Make sense?

uvicorn_servers: list = [main_server, otlp_http_server]

if mcp_port is not None:
Expand All @@ -571,10 +570,6 @@ async def _run_servers(
mcp_uvicorn = uvicorn.Server(uvicorn.Config(mcp_app, **mcp_kwargs))
uvicorn_servers.append(mcp_uvicorn)

from .api.app import app as main_app
from .api.dependencies import require_trace_manager_from_app

mgr = require_trace_manager_from_app(main_app)
otlp_grpc_server = create_otlp_grpc_server(host=host, port=otlp_grpc_port, manager=mgr)
await otlp_grpc_server.start()

Expand Down Expand Up @@ -703,17 +698,13 @@ def serve(
click.echo("Waiting for agent connections...")
click.echo()

src_path = Path(__file__).parent.parent
reload_dirs = [str(src_path)]
asyncio.run(
_run_servers(
host,
port,
otlp_http_port,
otlp_grpc_port,
mcp_port=mcp_port,
reload=True,
reload_dirs=reload_dirs,
log_level="info",
)
)
Expand Down
16 changes: 5 additions & 11 deletions tests/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,18 +101,12 @@ def live_servers():
os.environ["AGENTEVALS_LIVE"] = "1"
os.environ["AGENTEVALS_HEADLESS"] = "1"

import importlib
from agentevals.api.app import create_app
from agentevals.api.otlp_app import create_otlp_app

from agentevals.api import app as app_module

importlib.reload(app_module)

from agentevals.api.app import app
from agentevals.api.otlp_app import otlp_app

mgr = getattr(app.state, "trace_manager", None)
if mgr:
otlp_app.state.trace_manager = mgr
mgr = StreamingTraceManager()
app = create_app(trace_manager=mgr, enable_streaming=True)
otlp_app = create_otlp_app(trace_manager=mgr)

main_config = uvicorn.Config(app, host="127.0.0.1", port=main_port, log_level="warning")
otlp_config = uvicorn.Config(otlp_app, host="127.0.0.1", port=otlp_http_port, log_level="warning")
Expand Down
Loading