From 6f64838d61d29bac3ca8a4c98b75267d2355c069 Mon Sep 17 00:00:00 2001 From: Antonio Jimenez Date: Wed, 1 Apr 2026 17:42:38 +0200 Subject: [PATCH 1/7] gRPC OTel Receiver support --- Dockerfile | 4 +- README.md | 11 +- charts/agentevals/Chart.yaml | 2 +- charts/agentevals/templates/NOTES.txt | 5 +- charts/agentevals/templates/deployment.yaml | 3 + charts/agentevals/templates/service.yaml | 4 + charts/agentevals/values.yaml | 4 + docs/otel-compatibility.md | 11 +- examples/README.md | 11 +- examples/kubernetes/README.md | 48 ++----- src/agentevals/api/dependencies.py | 17 ++- src/agentevals/api/otlp_grpc.py | 75 +++++++++++ src/agentevals/cli.py | 52 ++++++-- tests/integration/conftest.py | 18 +-- tests/integration/test_live_agents.py | 68 +++++----- tests/integration/test_otlp_grpc_receiver.py | 128 +++++++++++++++++++ tests/test_otlp_receiver.py | 83 ++++++++++++ 17 files changed, 438 insertions(+), 106 deletions(-) create mode 100644 src/agentevals/api/otlp_grpc.py create mode 100644 tests/integration/test_otlp_grpc_receiver.py diff --git a/Dockerfile b/Dockerfile index cb4166f..f1eb0d8 100644 --- a/Dockerfile +++ b/Dockerfile @@ -33,6 +33,6 @@ USER app ENV PATH="/app/.venv/bin:$PATH" ENV AGENTEVALS_SERVER_URL=http://127.0.0.1:8001 -EXPOSE 8001 4318 8080 +EXPOSE 8001 4318 4317 8080 -CMD ["agentevals", "serve", "--host", "0.0.0.0", "--port", "8001", "--otlp-port", "4318", "--mcp-port", "8080"] +CMD ["agentevals", "serve", "--host", "0.0.0.0", "--port", "8001", "--otlp-http-port", "4318", "--otlp-grpc-port", "4317", "--mcp-port", "8080"] diff --git a/README.md b/README.md index ccd1f99..3e05904 100644 --- a/README.md +++ b/README.md @@ -139,7 +139,14 @@ export OTEL_RESOURCE_ATTRIBUTES="agentevals.session_name=my-agent" python your_agent.py ``` -Traces stream to the UI in real-time. Works with LangChain, Strands, Google ADK, or any framework that emits OTel spans (`http/protobuf` and `http/json` supported). 
Sessions are auto-created and grouped by `agentevals.session_name`. Set `agentevals.eval_set_id` to associate traces with an eval set. +For OTLP/gRPC exporters, use: + +```bash +export OTEL_EXPORTER_OTLP_ENDPOINT=localhost:4317 +export OTEL_EXPORTER_OTLP_PROTOCOL=grpc +``` + +Traces stream to the UI in real-time. Works with LangChain, Strands, Google ADK, or any framework that emits OTel spans (`http/protobuf`, `http/json`, and OTLP/gRPC supported). Sessions are auto-created and grouped by `agentevals.session_name`. Set `agentevals.eval_set_id` to associate traces with an eval set. See [examples/zero-code-examples/](examples/zero-code-examples/) for working examples. @@ -245,7 +252,7 @@ While the server is running, interactive API documentation is available at: | [`/redoc`](http://localhost:8001/redoc) | ReDoc reference documentation | | [`/openapi.json`](http://localhost:8001/openapi.json) | Raw OpenAPI 3.x schema (for code generation or CI) | -The OTLP receiver (port 4318) serves its own docs at `http://localhost:4318/docs`. +The OTLP HTTP receiver (port 4318) serves its own docs at `http://localhost:4318/docs`. ## MCP Server diff --git a/charts/agentevals/Chart.yaml b/charts/agentevals/Chart.yaml index 8c3cec5..3b2b9f5 100644 --- a/charts/agentevals/Chart.yaml +++ b/charts/agentevals/Chart.yaml @@ -1,6 +1,6 @@ apiVersion: v2 name: agentevals -description: agentevals web UI, OTLP HTTP receiver, and MCP (Streamable HTTP) +description: agentevals web UI, OTLP HTTP+gRPC receivers, and MCP (Streamable HTTP) type: application version: 0.1.0 appVersion: "0.5.2" diff --git a/charts/agentevals/templates/NOTES.txt b/charts/agentevals/templates/NOTES.txt index 8a7cfb1..2f95688 100644 --- a/charts/agentevals/templates/NOTES.txt +++ b/charts/agentevals/templates/NOTES.txt @@ -1,8 +1,9 @@ 1. UI and API are available at port {{ .Values.service.http.port }} (Service port name: http). 2. 
OTLP HTTP receiver: port {{ .Values.service.otlpHttp.port }} (OTEL_EXPORTER_OTLP_ENDPOINT=http://:{{ .Values.service.otlpHttp.port }}). -3. MCP (Streamable HTTP): port {{ .Values.service.mcp.port }}, path /mcp (e.g. http://:{{ .Values.service.mcp.port }}/mcp). +3. OTLP gRPC receiver: port {{ .Values.service.otlpGrpc.port }} (OTEL_EXPORTER_OTLP_ENDPOINT=:{{ .Values.service.otlpGrpc.port }}, OTEL_EXPORTER_OTLP_PROTOCOL=grpc). +4. MCP (Streamable HTTP): port {{ .Values.service.mcp.port }}, path /mcp (e.g. http://:{{ .Values.service.mcp.port }}/mcp). {{- if .Values.ephemeralVolume.enabled }} -4. An emptyDir is mounted at /tmp with HOME=/tmp/agentevals-home (ephemeral; lost on pod restart). Set ephemeralVolume.enabled=false and readOnlyRootFilesystem=false if you need a writable root without this mount. +5. An emptyDir is mounted at /tmp with HOME=/tmp/agentevals-home (ephemeral; lost on pod restart). Set ephemeralVolume.enabled=false and readOnlyRootFilesystem=false if you need a writable root without this mount. 
{{- end }} Get the Service URL: diff --git a/charts/agentevals/templates/deployment.yaml b/charts/agentevals/templates/deployment.yaml index 4593095..3a56b25 100644 --- a/charts/agentevals/templates/deployment.yaml +++ b/charts/agentevals/templates/deployment.yaml @@ -79,6 +79,9 @@ spec: - name: otlp-http containerPort: {{ .Values.service.otlpHttp.containerPort }} protocol: TCP + - name: otlp-grpc + containerPort: {{ .Values.service.otlpGrpc.containerPort }} + protocol: TCP - name: mcp containerPort: {{ .Values.service.mcp.containerPort }} protocol: TCP diff --git a/charts/agentevals/templates/service.yaml b/charts/agentevals/templates/service.yaml index 3d46ed8..090ff3c 100644 --- a/charts/agentevals/templates/service.yaml +++ b/charts/agentevals/templates/service.yaml @@ -16,6 +16,10 @@ spec: port: {{ .Values.service.otlpHttp.port }} targetPort: otlp-http protocol: TCP + - name: otlp-grpc + port: {{ .Values.service.otlpGrpc.port }} + targetPort: otlp-grpc + protocol: TCP - name: mcp port: {{ .Values.service.mcp.port }} targetPort: mcp diff --git a/charts/agentevals/values.yaml b/charts/agentevals/values.yaml index 56bc4eb..f455af3 100644 --- a/charts/agentevals/values.yaml +++ b/charts/agentevals/values.yaml @@ -111,6 +111,10 @@ service: otlpHttp: port: 4318 containerPort: 4318 + # -- OTLP gRPC receiver port + otlpGrpc: + port: 4317 + containerPort: 4317 # -- MCP (Streamable HTTP) port mcp: port: 8080 diff --git a/docs/otel-compatibility.md b/docs/otel-compatibility.md index 582ffc1..46b2373 100644 --- a/docs/otel-compatibility.md +++ b/docs/otel-compatibility.md @@ -80,11 +80,18 @@ If you maintain an OTel-instrumented agent framework and want to align with the ## OTLP Receiver -agentevals runs an OTLP HTTP receiver on port 4318 (the standard OTLP HTTP port) that accepts: +agentevals runs: + +- OTLP HTTP receiver on port 4318 (standard OTLP HTTP port) +- OTLP gRPC receiver on port 4317 (standard OTLP gRPC port). 
+ +OTLP HTTP accepts: | Endpoint | Content Types | |----------|--------------| | `/v1/traces` | `application/json`, `application/x-protobuf` | | `/v1/logs` | `application/json`, `application/x-protobuf` | -Point your standard OTel exporters at `http://localhost:4318` and traces will stream into agentevals automatically. See [examples/README.md](../examples/README.md) for zero-code setup instructions. +Point OTLP/HTTP exporters at `http://localhost:4318`. +Point OTLP/gRPC exporters at `localhost:4317` with `OTEL_EXPORTER_OTLP_PROTOCOL=grpc`. +Traces and logs stream into agentevals automatically. See [examples/README.md](../examples/README.md) for zero-code setup instructions. diff --git a/examples/README.md b/examples/README.md index 5676175..d1c2b73 100644 --- a/examples/README.md +++ b/examples/README.md @@ -14,7 +14,14 @@ export OTEL_RESOURCE_ATTRIBUTES="agentevals.session_name=my-agent,agentevals.eva python your_agent.py ``` -The OTLP receiver runs on port 4318 (standard OTLP HTTP port) and accepts both `http/protobuf` and `http/json`. Sessions are auto-created from incoming traces and grouped by `agentevals.session_name`. +For OTLP/gRPC exporters, use: + +```bash +export OTEL_EXPORTER_OTLP_ENDPOINT=localhost:4317 +export OTEL_EXPORTER_OTLP_PROTOCOL=grpc +``` + +agentevals accepts OTLP/HTTP on port 4318 (`http/protobuf` and `http/json`) and OTLP/gRPC on port 4317. Sessions are auto-created from incoming traces and grouped by `agentevals.session_name`. | Example | Framework | LLM Provider | |---------|-----------|-------------| @@ -106,7 +113,7 @@ The zero-code and SDK examples implement the same toy agent (dice rolling + prim | Example | Description | |---------|-------------| -| [kubernetes/](./kubernetes/) | Deploy agentevals with kagent on Kubernetes, using an OTel Collector as a gRPC to HTTP bridge. Includes a walkthrough for comparing two kagent agents (different models) and evaluating them with tool trajectory and response match scores. 
| +| [kubernetes/](./kubernetes/) | Deploy agentevals with kagent on Kubernetes using native OTLP gRPC ingestion (or optionally an OTel Collector). Includes a walkthrough for comparing two kagent agents (different models) and evaluating them with tool trajectory and response match scores. | ## Advanced: GenAI Semantic Convention Patterns diff --git a/examples/kubernetes/README.md b/examples/kubernetes/README.md index 2f9057e..781993f 100644 --- a/examples/kubernetes/README.md +++ b/examples/kubernetes/README.md @@ -1,15 +1,14 @@ # Kubernetes: Evaluating kagent with agentevals -Run agentevals alongside [kagent](https://github.com/kagent-dev/kagent) on Kubernetes to evaluate AI agent conversations in real time. This example deploys three components: +Run agentevals alongside [kagent](https://github.com/kagent-dev/kagent) on Kubernetes to evaluate AI agent conversations in real time. This example deploys: -1. **agentevals** receives OTLP traces over HTTP and serves the evaluation UI -2. **OTel Collector** bridges the protocol gap: kagent exports traces via gRPC, but agentevals only supports OTLP/HTTP today, so the Collector converts gRPC to HTTP -3. **kagent** provides Kubernetes-native AI agents with built-in OTel instrumentation (gRPC export only) +1. **agentevals** receives OTLP traces over HTTP (`:4318`) and gRPC (`:4317`) and serves the evaluation UI +2. **kagent** provides Kubernetes-native AI agents with built-in OTel instrumentation (gRPC export) ``` -kagent (gRPC :4317) --> OTel Collector --> agentevals (HTTP :4318) - | - UI on :8001 +kagent (gRPC :4317) -----------------> agentevals (gRPC :4317 / HTTP :4318) + | + UI on :8001 ``` ## Prerequisites @@ -33,35 +32,11 @@ This creates a single pod exposing: | Port | Purpose | |------|---------| | 8001 | Web UI and API | +| 4317 | OTLP gRPC receiver (traces and logs) | | 4318 | OTLP HTTP receiver (traces and logs) | | 8080 | MCP (Streamable HTTP) | -### 2. 
OTel Collector (gRPC to HTTP bridge) - -kagent exports traces over gRPC (port 4317), but agentevals accepts OTLP over HTTP (port 4318). The OTel Collector bridges the two protocols. - -```bash -helm repo add open-telemetry https://open-telemetry.github.io/opentelemetry-helm-charts -helm repo update - -helm upgrade --install otel-collector open-telemetry/opentelemetry-collector \ - --namespace kagent --create-namespace \ - --set mode=deployment \ - --set replicaCount=1 \ - --set image.repository=otel/opentelemetry-collector \ - --set ports.otlp.enabled=true \ - --set ports.otlp-http.enabled=false \ - --set config.exporters.otlphttp.endpoint="http://agentevals.default.svc.cluster.local:4318" \ - --set config.exporters.otlphttp.compression="none" \ - --set config.service.pipelines.traces.receivers[0]=otlp \ - --set config.service.pipelines.traces.exporters[0]=otlphttp \ - --set config.service.pipelines.logs.receivers[0]=otlp \ - --set config.service.pipelines.logs.exporters[0]=otlphttp -``` - -> **Note:** If you deployed agentevals in a namespace other than `default`, update the `endpoint` value accordingly: `http://agentevals..svc.cluster.local:4318`. - -### 3. kagent +### 2. kagent Install the CRDs first, then the kagent operator with OTel tracing enabled: @@ -83,16 +58,16 @@ helm upgrade --install kagent oci://ghcr.io/kagent-dev/kagent/helm/kagent \ --set agents.cilium-manager-agent.enabled=false \ --set agents.cilium-debug-agent.enabled=false \ --set otel.tracing.enabled=true \ - --set otel.tracing.exporter.otlp.endpoint="otel-collector-opentelemetry-collector.kagent.svc.cluster.local:4317" \ + --set otel.tracing.exporter.otlp.endpoint="agentevals.default.svc.cluster.local:4317" \ --set otel.tracing.exporter.otlp.insecure=true ``` -This installs kagent with only the default Helm agent (`helm-agent`) and the K8s troubleshooter enabled, and points its OTel exporter at the Collector. 
+This installs kagent with only the default Helm agent (`helm-agent`) and the K8s troubleshooter enabled, and points its OTel exporter directly at agentevals gRPC. ### Verify the deployment ```bash -kubectl get pods -A -l 'app.kubernetes.io/name in (agentevals, kagent, opentelemetry-collector)' +kubectl get pods -A -l 'app.kubernetes.io/name in (agentevals, kagent)' ``` All pods should be `Running` before continuing. @@ -240,7 +215,6 @@ You can also click an individual conversation and see a breakdown of each evalua ```bash helm uninstall kagent -n kagent helm uninstall kagent-crds -n kagent -helm uninstall otel-collector -n kagent helm uninstall agentevals kubectl delete namespace kagent ``` diff --git a/src/agentevals/api/dependencies.py b/src/agentevals/api/dependencies.py index 452b676..c6b08a1 100644 --- a/src/agentevals/api/dependencies.py +++ b/src/agentevals/api/dependencies.py @@ -2,7 +2,7 @@ from __future__ import annotations -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any from fastapi import HTTPException, Request @@ -15,9 +15,22 @@ def get_trace_manager(request: Request) -> StreamingTraceManager | None: return getattr(request.app.state, "trace_manager", None) +def get_trace_manager_from_app(app: Any) -> StreamingTraceManager | None: + """Return the StreamingTraceManager from an app object or None.""" + return getattr(app.state, "trace_manager", None) + + def require_trace_manager(request: Request) -> StreamingTraceManager: """Return the StreamingTraceManager, raising 503 if live mode is off.""" - mgr = getattr(request.app.state, "trace_manager", None) + mgr = get_trace_manager_from_app(request.app) if mgr is None: raise HTTPException(status_code=503, detail="Live mode not enabled") return mgr + + +def require_trace_manager_from_app(app: Any) -> StreamingTraceManager: + """Return the StreamingTraceManager from app, raising RuntimeError if missing.""" + mgr = get_trace_manager_from_app(app) + if mgr is None: + raise 
RuntimeError("Live mode not enabled") + return mgr diff --git a/src/agentevals/api/otlp_grpc.py b/src/agentevals/api/otlp_grpc.py new file mode 100644 index 0000000..60430bf --- /dev/null +++ b/src/agentevals/api/otlp_grpc.py @@ -0,0 +1,75 @@ +"""OTLP gRPC receiver services for traces and logs. + +Receives standard OTLP/gRPC Export requests on port 4317 and forwards them +into the same StreamingTraceManager pipeline used by OTLP/HTTP routes. +""" + +from __future__ import annotations + +import logging +from typing import TYPE_CHECKING + +from google.protobuf.json_format import MessageToDict +from opentelemetry.proto.collector.logs.v1 import logs_service_pb2, logs_service_pb2_grpc +from opentelemetry.proto.collector.trace.v1 import trace_service_pb2, trace_service_pb2_grpc + +from .otlp_routes import _fix_protobuf_id_fields, _process_logs, _process_traces + +if TYPE_CHECKING: + from grpc import aio + + from ..streaming.ws_server import StreamingTraceManager + +logger = logging.getLogger(__name__) + + +class OtlpTraceService(trace_service_pb2_grpc.TraceServiceServicer): + """OTLP TraceService gRPC implementation.""" + + def __init__(self, manager: StreamingTraceManager): + self._manager = manager + + async def Export(self, request, context): # noqa: N802 (gRPC method name) + body = MessageToDict(request, preserving_proto_field_name=False) + _fix_protobuf_id_fields(body) + await _process_traces(body, self._manager) + return trace_service_pb2.ExportTraceServiceResponse() + + +class OtlpLogsService(logs_service_pb2_grpc.LogsServiceServicer): + """OTLP LogsService gRPC implementation.""" + + def __init__(self, manager: StreamingTraceManager): + self._manager = manager + + async def Export(self, request, context): # noqa: N802 (gRPC method name) + body = MessageToDict(request, preserving_proto_field_name=False) + _fix_protobuf_id_fields(body) + await _process_logs(body, self._manager) + return logs_service_pb2.ExportLogsServiceResponse() + + +def create_otlp_grpc_server( 
+ host: str, + port: int, + manager: StreamingTraceManager, +) -> aio.Server: + """Create an OTLP gRPC server bound to host:port.""" + try: + import grpc + except ImportError as exc: # pragma: no cover - environment-dependent + raise RuntimeError( + "OTLP gRPC receiver requires grpcio. Install with: pip install grpcio" + ) from exc + + server = grpc.aio.server() + trace_service_pb2_grpc.add_TraceServiceServicer_to_server(OtlpTraceService(manager), server) + logs_service_pb2_grpc.add_LogsServiceServicer_to_server(OtlpLogsService(manager), server) + + listen_addr = f"{host}:{port}" + bound_port = server.add_insecure_port(listen_addr) + if bound_port == 0: + raise RuntimeError(f"Failed to bind OTLP gRPC receiver to {listen_addr}") + + logger.info("OTLP gRPC receiver configured at %s", listen_addr) + return server diff --git a/src/agentevals/cli.py b/src/agentevals/cli.py index bb36352..8aaf358 100644 --- a/src/agentevals/cli.py +++ b/src/agentevals/cli.py @@ -503,14 +503,15 @@ def _shared_exit(sig, frame): async def _run_servers( host: str, port: int, - otlp_port: int, + otlp_http_port: int, + otlp_grpc_port: int, *, mcp_port: int | None = None, reload: bool = False, reload_dirs: list[str] | None = None, log_level: str = "warning", ) -> None: - """Start the main API, OTLP HTTP server, and optionally MCP (Streamable HTTP).""" + """Start API, OTLP HTTP+gRPC receivers, and optional MCP (Streamable HTTP).""" import uvicorn shared_kwargs: dict = { @@ -522,8 +523,10 @@ async def _run_servers( shared_kwargs["reload_dirs"] = reload_dirs main_server = uvicorn.Server(uvicorn.Config("agentevals.api.app:app", port=port, **shared_kwargs)) - otlp_server = uvicorn.Server(uvicorn.Config("agentevals.api.otlp_app:otlp_app", port=otlp_port, **shared_kwargs)) - servers: list = [main_server, otlp_server] + otlp_http_server = uvicorn.Server( + uvicorn.Config("agentevals.api.otlp_app:otlp_app", port=otlp_http_port, **shared_kwargs) + ) + servers: list = [main_server, otlp_http_server] if 
mcp_port is not None: from .mcp_server import create_server as create_mcp_server @@ -540,7 +543,18 @@ async def _run_servers( servers.append(mcp_uvicorn) _link_server_shutdown(*servers) - await asyncio.gather(*(s.serve() for s in servers)) + from .api.app import app as main_app + from .api.dependencies import require_trace_manager_from_app + from .api.otlp_grpc import create_otlp_grpc_server + + mgr = require_trace_manager_from_app(main_app) + otlp_grpc_server = create_otlp_grpc_server(host=host, port=otlp_grpc_port, manager=mgr) + await otlp_grpc_server.start() + + try: + await asyncio.gather(*(s.serve() for s in servers)) + finally: + await otlp_grpc_server.stop(grace=1) @main.command("serve") @@ -561,10 +575,17 @@ async def _run_servers( help="Port to bind the server to.", ) @click.option( - "--otlp-port", + "--otlp-http-port", + type=click.IntRange(min=1), default=4318, help="Port for OTLP HTTP receiver (default: 4318, standard OTLP HTTP port).", ) +@click.option( + "--otlp-grpc-port", + type=click.IntRange(min=1), + default=4317, + help="Port for OTLP gRPC receiver (default: 4317, standard OTLP gRPC port).", +) @click.option( "--mcp-port", default=None, @@ -592,7 +613,8 @@ def serve( dev: bool, host: str, port: int, - otlp_port: int, + otlp_http_port: int, + otlp_grpc_port: int, mcp_port: int | None, eval_sets: str | None, headless: bool, @@ -631,7 +653,8 @@ def serve( if dev: click.echo("agentevals dev server starting...") - click.echo(f" OTLP HTTP: http://{host}:{otlp_port} (OTEL_EXPORTER_OTLP_ENDPOINT default)") + click.echo(f" OTLP HTTP: http://{host}:{otlp_http_port} (OTEL_EXPORTER_OTLP_ENDPOINT default)") + click.echo(f" OTLP gRPC: {host}:{otlp_grpc_port} (OTEL_EXPORTER_OTLP_PROTOCOL=grpc)") click.echo(f" WebSocket: ws://{host}:{port}/ws/traces") click.echo(f" API: http://{host}:{port}/api") if mcp_port is not None: @@ -652,7 +675,8 @@ def serve( _run_servers( host, port, - otlp_port, + otlp_http_port, + otlp_grpc_port, mcp_port=mcp_port, reload=True, 
reload_dirs=reload_dirs, @@ -661,20 +685,22 @@ def serve( ) elif has_ui and not headless: click.echo(f"agentevals: http://{host}:{port}") - click.echo(f" OTLP HTTP: http://{host}:{otlp_port}") + click.echo(f" OTLP HTTP: http://{host}:{otlp_http_port}") + click.echo(f" OTLP gRPC: {host}:{otlp_grpc_port}") if mcp_port is not None: click.echo(f" MCP (Streamable HTTP): http://{host}:{mcp_port}/mcp") click.echo() - asyncio.run(_run_servers(host, port, otlp_port, mcp_port=mcp_port)) + asyncio.run(_run_servers(host, port, otlp_http_port, otlp_grpc_port, mcp_port=mcp_port)) else: click.echo(f"agentevals API: http://{host}:{port}/api") - click.echo(f" OTLP HTTP: http://{host}:{otlp_port}") + click.echo(f" OTLP HTTP: http://{host}:{otlp_http_port}") + click.echo(f" OTLP gRPC: {host}:{otlp_grpc_port}") if mcp_port is not None: click.echo(f" MCP (Streamable HTTP): http://{host}:{mcp_port}/mcp") click.echo() - asyncio.run(_run_servers(host, port, otlp_port, mcp_port=mcp_port)) + asyncio.run(_run_servers(host, port, otlp_http_port, otlp_grpc_port, mcp_port=mcp_port)) @main.command("mcp") diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index c63a317..922841d 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -39,10 +39,10 @@ async def trace_manager(): @pytest.fixture async def otlp_client(trace_manager): """httpx client → OTLP app via ASGI transport (no real server).""" - from agentevals.api.otlp_routes import otlp_router - from fastapi import FastAPI + from agentevals.api.otlp_routes import otlp_router + test_app = FastAPI() test_app.state.trace_manager = trace_manager test_app.include_router(otlp_router) @@ -55,10 +55,10 @@ async def otlp_client(trace_manager): @pytest.fixture async def api_client(trace_manager): """httpx client → main app streaming routes via ASGI transport.""" - from agentevals.api.streaming_routes import streaming_router - from fastapi import FastAPI + from agentevals.api.streaming_routes import 
streaming_router + test_app = FastAPI() test_app.state.trace_manager = trace_manager test_app.include_router(streaming_router, prefix="/api/streaming") @@ -83,7 +83,7 @@ def _find_free_port() -> int: def live_servers(): """Start real uvicorn on ephemeral ports in a background thread. - Returns (main_port, otlp_port, trace_manager). + Returns (main_port, otlp_http_port, trace_manager). Servers run in their own event loop on a daemon thread so they can process HTTP requests independently of the test's event loop. @@ -92,7 +92,7 @@ def live_servers(): import time main_port = _find_free_port() - otlp_port = _find_free_port() + otlp_http_port = _find_free_port() saved_env = { "AGENTEVALS_LIVE": os.environ.get("AGENTEVALS_LIVE"), @@ -115,7 +115,7 @@ def live_servers(): otlp_app.state.trace_manager = mgr main_config = uvicorn.Config(app, host="127.0.0.1", port=main_port, log_level="warning") - otlp_config = uvicorn.Config(otlp_app, host="127.0.0.1", port=otlp_port, log_level="warning") + otlp_config = uvicorn.Config(otlp_app, host="127.0.0.1", port=otlp_http_port, log_level="warning") main_server = uvicorn.Server(main_config) otlp_server = uvicorn.Server(otlp_config) @@ -133,7 +133,7 @@ def _run(): import httpx as _httpx - for port in (main_port, otlp_port): + for port in (main_port, otlp_http_port): reachable = False for _ in range(50): try: @@ -145,7 +145,7 @@ def _run(): if not reachable: raise RuntimeError(f"Server on port {port} did not become reachable") - yield main_port, otlp_port, mgr + yield main_port, otlp_http_port, mgr main_server.should_exit = True otlp_server.should_exit = True diff --git a/tests/integration/test_live_agents.py b/tests/integration/test_live_agents.py index e647a22..41bbae2 100644 --- a/tests/integration/test_live_agents.py +++ b/tests/integration/test_live_agents.py @@ -41,7 +41,7 @@ def _run_agent( script: str, - otlp_port: int, + otlp_http_port: int, session_name: str, eval_set_id: str = "e2e-test", extra_env: dict | None = None, @@ -50,7 
+50,7 @@ def _run_agent( """Run an example agent script as a subprocess.""" env = { **os.environ, - "OTEL_EXPORTER_OTLP_ENDPOINT": f"http://127.0.0.1:{otlp_port}", + "OTEL_EXPORTER_OTLP_ENDPOINT": f"http://127.0.0.1:{otlp_http_port}", "OTEL_RESOURCE_ATTRIBUTES": (f"agentevals.eval_set_id={eval_set_id},agentevals.session_name={session_name}"), **(extra_env or {}), } @@ -69,12 +69,12 @@ class TestLangchainZeroCode: """Run the LangChain zero-code OTLP example and verify session grouping.""" def test_session_created_with_spans_and_logs(self, live_servers): - main_port, otlp_port, mgr = live_servers + main_port, otlp_http_port, mgr = live_servers session_name = "e2e-langchain" result = _run_agent( "examples/zero-code-examples/langchain/run.py", - otlp_port, + otlp_http_port, session_name, ) assert result.returncode == 0, f"Agent failed:\nstdout: {result.stdout}\nstderr: {result.stderr}" @@ -88,12 +88,12 @@ def test_session_created_with_spans_and_logs(self, live_servers): assert len(session.logs) > 0, "LangChain uses logs for message content" def test_invocations_extracted_with_content(self, live_servers): - main_port, otlp_port, mgr = live_servers + main_port, otlp_http_port, mgr = live_servers session_name = "e2e-langchain-inv" result = _run_agent( "examples/zero-code-examples/langchain/run.py", - otlp_port, + otlp_http_port, session_name, ) assert result.returncode == 0, f"Agent failed:\nstdout: {result.stdout}\nstderr: {result.stderr}" @@ -107,12 +107,12 @@ def test_invocations_extracted_with_content(self, live_servers): assert has_content, f"Invocation {inv.get('invocationId', '?')} has no content" def test_session_visible_via_api(self, live_servers): - main_port, otlp_port, mgr = live_servers + main_port, otlp_http_port, mgr = live_servers session_name = "e2e-langchain-api" result = _run_agent( "examples/zero-code-examples/langchain/run.py", - otlp_port, + otlp_http_port, session_name, ) assert result.returncode == 0 @@ -130,12 +130,12 @@ class TestStrandsZeroCode: 
"""Run the Strands zero-code OTLP example and verify session grouping.""" def test_session_created_spans_only(self, live_servers): - main_port, otlp_port, mgr = live_servers + main_port, otlp_http_port, mgr = live_servers session_name = "e2e-strands" result = _run_agent( "examples/zero-code-examples/strands/run.py", - otlp_port, + otlp_http_port, session_name, extra_env={ "OTEL_SEMCONV_STABILITY_OPT_IN": "gen_ai_latest_experimental", @@ -151,12 +151,12 @@ def test_session_created_spans_only(self, live_servers): assert len(session.spans) > 0, "Expected spans from LLM calls" def test_invocations_extracted(self, live_servers): - main_port, otlp_port, mgr = live_servers + main_port, otlp_http_port, mgr = live_servers session_name = "e2e-strands-inv" result = _run_agent( "examples/zero-code-examples/strands/run.py", - otlp_port, + otlp_http_port, session_name, extra_env={ "OTEL_SEMCONV_STABILITY_OPT_IN": "gen_ai_latest_experimental", @@ -170,12 +170,12 @@ def test_invocations_extracted(self, live_servers): assert len(session.invocations) > 0, "Expected extracted invocations" def test_session_visible_via_api(self, live_servers): - main_port, otlp_port, mgr = live_servers + main_port, otlp_http_port, mgr = live_servers session_name = "e2e-strands-api" result = _run_agent( "examples/zero-code-examples/strands/run.py", - otlp_port, + otlp_http_port, session_name, extra_env={ "OTEL_SEMCONV_STABILITY_OPT_IN": "gen_ai_latest_experimental", @@ -196,12 +196,12 @@ class TestAdkZeroCode: """Run the ADK zero-code OTLP example and verify session grouping.""" def test_session_created_spans_only(self, live_servers): - main_port, otlp_port, mgr = live_servers + main_port, otlp_http_port, mgr = live_servers session_name = "e2e-adk" result = _run_agent( "examples/zero-code-examples/adk/run.py", - otlp_port, + otlp_http_port, session_name, ) assert result.returncode == 0, f"Agent failed:\nstdout: {result.stdout}\nstderr: {result.stderr}" @@ -214,12 +214,12 @@ def 
test_session_created_spans_only(self, live_servers): assert len(session.spans) > 0, "Expected spans from ADK agent" def test_invocations_extracted(self, live_servers): - main_port, otlp_port, mgr = live_servers + main_port, otlp_http_port, mgr = live_servers session_name = "e2e-adk-inv" result = _run_agent( "examples/zero-code-examples/adk/run.py", - otlp_port, + otlp_http_port, session_name, ) assert result.returncode == 0, f"Agent failed:\nstdout: {result.stdout}\nstderr: {result.stderr}" @@ -230,12 +230,12 @@ def test_invocations_extracted(self, live_servers): assert len(session.invocations) > 0, "Expected extracted invocations" def test_session_visible_via_api(self, live_servers): - main_port, otlp_port, mgr = live_servers + main_port, otlp_http_port, mgr = live_servers session_name = "e2e-adk-api" result = _run_agent( "examples/zero-code-examples/adk/run.py", - otlp_port, + otlp_http_port, session_name, ) assert result.returncode == 0 @@ -253,12 +253,12 @@ class TestOpenAIAgentsZeroCode: """Run the OpenAI Agents SDK zero-code OTLP example and verify session grouping.""" def test_session_created_with_spans(self, live_servers): - main_port, otlp_port, mgr = live_servers + main_port, otlp_http_port, mgr = live_servers session_name = "e2e-openai-agents" result = _run_agent( "examples/zero-code-examples/openai-agents/run.py", - otlp_port, + otlp_http_port, session_name, ) assert result.returncode == 0, f"Agent failed:\nstdout: {result.stdout}\nstderr: {result.stderr}" @@ -271,12 +271,12 @@ def test_session_created_with_spans(self, live_servers): assert len(session.spans) > 0, "Expected spans from LLM calls" def test_invocations_extracted(self, live_servers): - main_port, otlp_port, mgr = live_servers + main_port, otlp_http_port, mgr = live_servers session_name = "e2e-openai-agents-inv" result = _run_agent( "examples/zero-code-examples/openai-agents/run.py", - otlp_port, + otlp_http_port, session_name, ) assert result.returncode == 0, f"Agent failed:\nstdout: 
{result.stdout}\nstderr: {result.stderr}" @@ -287,12 +287,12 @@ def test_invocations_extracted(self, live_servers): assert len(session.invocations) > 0, "Expected extracted invocations" def test_session_visible_via_api(self, live_servers): - main_port, otlp_port, mgr = live_servers + main_port, otlp_http_port, mgr = live_servers session_name = "e2e-openai-agents-api" result = _run_agent( "examples/zero-code-examples/openai-agents/run.py", - otlp_port, + otlp_http_port, session_name, ) assert result.returncode == 0 @@ -317,7 +317,7 @@ class TestAgentRerun: def test_strands_rerun_creates_separate_sessions(self, live_servers): """Run the Strands agent twice with the same session_name. Each run must produce its own session.""" - main_port, otlp_port, mgr = live_servers + main_port, otlp_http_port, mgr = live_servers session_name = "e2e-strands-rerun" strands_env = { "OTEL_SEMCONV_STABILITY_OPT_IN": "gen_ai_latest_experimental", @@ -325,7 +325,7 @@ def test_strands_rerun_creates_separate_sessions(self, live_servers): result1 = _run_agent( "examples/zero-code-examples/strands/run.py", - otlp_port, + otlp_http_port, session_name, extra_env=strands_env, ) @@ -334,7 +334,7 @@ def test_strands_rerun_creates_separate_sessions(self, live_servers): result2 = _run_agent( "examples/zero-code-examples/strands/run.py", - otlp_port, + otlp_http_port, session_name, extra_env=strands_env, ) @@ -352,12 +352,12 @@ def test_strands_rerun_creates_separate_sessions(self, live_servers): def test_langchain_rerun_creates_separate_sessions(self, live_servers): """Run the LangChain agent twice with the same session_name. 
Each run must produce its own session.""" - main_port, otlp_port, mgr = live_servers + main_port, otlp_http_port, mgr = live_servers session_name = "e2e-langchain-rerun" result1 = _run_agent( "examples/zero-code-examples/langchain/run.py", - otlp_port, + otlp_http_port, session_name, ) assert result1.returncode == 0, f"Run 1 failed:\n{result1.stderr}" @@ -365,7 +365,7 @@ def test_langchain_rerun_creates_separate_sessions(self, live_servers): result2 = _run_agent( "examples/zero-code-examples/langchain/run.py", - otlp_port, + otlp_http_port, session_name, ) assert result2.returncode == 0, f"Run 2 failed:\n{result2.stderr}" @@ -381,7 +381,7 @@ def test_langchain_rerun_creates_separate_sessions(self, live_servers): def test_rerun_sessions_visible_via_api(self, live_servers): """Both rerun sessions are visible in the API response.""" - main_port, otlp_port, mgr = live_servers + main_port, otlp_http_port, mgr = live_servers session_name = "e2e-strands-rerun-api" strands_env = { "OTEL_SEMCONV_STABILITY_OPT_IN": "gen_ai_latest_experimental", @@ -390,7 +390,7 @@ def test_rerun_sessions_visible_via_api(self, live_servers): for run_idx in range(2): result = _run_agent( "examples/zero-code-examples/strands/run.py", - otlp_port, + otlp_http_port, session_name, extra_env=strands_env, ) diff --git a/tests/integration/test_otlp_grpc_receiver.py b/tests/integration/test_otlp_grpc_receiver.py new file mode 100644 index 0000000..43bbb98 --- /dev/null +++ b/tests/integration/test_otlp_grpc_receiver.py @@ -0,0 +1,128 @@ +"""Integration tests for OTLP gRPC receiver.""" + +from __future__ import annotations + +import socket + +import pytest +from opentelemetry.proto.collector.logs.v1 import logs_service_pb2_grpc +from opentelemetry.proto.collector.logs.v1.logs_service_pb2 import ExportLogsServiceRequest +from opentelemetry.proto.collector.trace.v1 import trace_service_pb2_grpc +from opentelemetry.proto.collector.trace.v1.trace_service_pb2 import ExportTraceServiceRequest +from 
opentelemetry.proto.common.v1.common_pb2 import AnyValue, InstrumentationScope, KeyValue +from opentelemetry.proto.logs.v1.logs_pb2 import LogRecord, ResourceLogs, ScopeLogs +from opentelemetry.proto.resource.v1.resource_pb2 import Resource +from opentelemetry.proto.trace.v1.trace_pb2 import ResourceSpans, ScopeSpans, Span + +from agentevals.api.otlp_grpc import create_otlp_grpc_server + +from .conftest import wait_for_session_complete + +pytestmark = pytest.mark.integration + + +def _find_free_port() -> int: + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.bind(("127.0.0.1", 0)) + return s.getsockname()[1] + + +def _create_server_with_retry(trace_manager, attempts: int = 20): + last_error: RuntimeError | None = None + for _ in range(attempts): + port = _find_free_port() + try: + server = create_otlp_grpc_server( + host="localhost", + port=port, + manager=trace_manager, + ) + return port, server + except RuntimeError as exc: + last_error = exc + assert last_error is not None + raise last_error + + +class TestOtlpGrpcReceiver: + async def test_grpc_trace_and_logs_create_session(self, trace_manager): + grpc = pytest.importorskip("grpc") + + port, server = _create_server_with_retry(trace_manager) + session_name = "grpc-integration" + + trace_id_hex = "0102030405060708090a0b0c0d0e0f10" + span_id_hex = "1112131415161718" + trace_id = bytes.fromhex(trace_id_hex) + span_id = bytes.fromhex(span_id_hex) + + await server.start() + + try: + async with grpc.aio.insecure_channel(f"localhost:{port}") as channel: + trace_stub = trace_service_pb2_grpc.TraceServiceStub(channel) + logs_stub = logs_service_pb2_grpc.LogsServiceStub(channel) + + trace_req = ExportTraceServiceRequest( + resource_spans=[ + ResourceSpans( + resource=Resource( + attributes=[ + KeyValue( + key="agentevals.session_name", + value=AnyValue(string_value=session_name), + ), + ] + ), + scope_spans=[ + ScopeSpans( + scope=InstrumentationScope(name="grpc-test", version="0.1"), + spans=[ + Span( + 
trace_id=trace_id, + span_id=span_id, + name="grpc-root", + kind=Span.SPAN_KIND_CLIENT, + start_time_unix_nano=1_000_000_000, + end_time_unix_nano=2_000_000_000, + ) + ], + ) + ], + ) + ] + ) + await trace_stub.Export(trace_req) + + logs_req = ExportLogsServiceRequest( + resource_logs=[ + ResourceLogs( + scope_logs=[ + ScopeLogs( + log_records=[ + LogRecord( + event_name="gen_ai.user.message", + observed_time_unix_nano=1_500_000_000, + trace_id=trace_id, + span_id=span_id, + body=AnyValue(string_value='{"content":"hello over grpc"}'), + ) + ] + ) + ] + ) + ] + ) + await logs_stub.Export(logs_req) + + await wait_for_session_complete(trace_manager, session_name, timeout=2.0) + session = trace_manager.sessions[session_name] + + assert session.is_complete + assert session.source == "otlp" + assert session.trace_ids == {trace_id_hex} + assert len(session.spans) == 1 + assert len(session.logs) >= 1 + assert session.logs[0]["event_name"] == "gen_ai.user.message" + finally: + await server.stop(grace=1) diff --git a/tests/test_otlp_receiver.py b/tests/test_otlp_receiver.py index 67ac00c..44068ce 100644 --- a/tests/test_otlp_receiver.py +++ b/tests/test_otlp_receiver.py @@ -1,9 +1,13 @@ """Tests for the OTLP HTTP receiver endpoints and session auto-management.""" import asyncio +import sys from datetime import UTC, datetime, timedelta from unittest.mock import AsyncMock, MagicMock +import pytest + +from agentevals.api.otlp_grpc import OtlpLogsService, OtlpTraceService, create_otlp_grpc_server from agentevals.api.otlp_routes import ( _convert_otlp_log_record, _decode_protobuf_logs, @@ -1337,9 +1341,11 @@ async def go(): import base64 +from opentelemetry.proto.collector.logs.v1 import logs_service_pb2 from opentelemetry.proto.collector.logs.v1.logs_service_pb2 import ( ExportLogsServiceRequest as LogsServiceRequestPB, ) +from opentelemetry.proto.collector.trace.v1 import trace_service_pb2 from opentelemetry.proto.collector.trace.v1.trace_service_pb2 import ( 
ExportTraceServiceRequest as TraceServiceRequestPB, ) @@ -1568,6 +1574,83 @@ def test_empty_logs_request(self): assert body.get("resourceLogs") is None or body.get("resourceLogs") == [] +class TestCreateOtlpGrpcServer: + def test_raises_when_bind_fails(self, monkeypatch): + fake_server = MagicMock() + fake_server.add_insecure_port.return_value = 0 + + class _FakeAio: + @staticmethod + def server(): + return fake_server + + fake_grpc = MagicMock() + fake_grpc.aio = _FakeAio() + monkeypatch.setitem(sys.modules, "grpc", fake_grpc) + + with pytest.raises(RuntimeError, match="Failed to bind OTLP gRPC receiver"): + create_otlp_grpc_server( + host="127.0.0.1", + port=4317, + manager=MagicMock(), + ) + + +class TestGrpcServices: + def test_trace_service_export_creates_session(self): + async def go(): + mgr = _make_mgr() + service = OtlpTraceService(mgr) + + resource_attrs = [ + KeyValue(key="agentevals.session_name", value=AnyValue(string_value="grpc-session")), + KeyValue(key="agentevals.eval_set_id", value=AnyValue(string_value="grpc-eval")), + ] + span = _make_pb_span(parent_span_id_hex=PARENT_SPAN_ID_HEX) + request = _make_pb_export_request([span], resource_attrs=resource_attrs) + + response = await service.Export(request, None) + + assert isinstance(response, trace_service_pb2.ExportTraceServiceResponse) + assert "grpc-session" in mgr.sessions + session = mgr.sessions["grpc-session"] + assert session.eval_set_id == "grpc-eval" + assert session.trace_id == TRACE_ID_HEX + assert len(session.spans) == 1 + _cancel_timers(mgr) + + _run(go()) + + def test_logs_service_export_attaches_logs_to_existing_session(self): + async def go(): + mgr = _make_mgr() + service = OtlpLogsService(mgr) + meta = {"eval_set_id": None, "session_name": "grpc-logs", "resource_attrs": {}} + session = await mgr.get_or_create_otlp_session(TRACE_ID_HEX, meta) + + log_record = LogRecordPB( + time_unix_nano=1000000000, + trace_id=_hex_to_bytes(TRACE_ID_HEX), + span_id=_hex_to_bytes(SPAN_ID_HEX), + 
body=AnyValue(string_value='{"content": "hello from grpc"}'), + ) + log_record.attributes.append( + KeyValue(key="event.name", value=AnyValue(string_value="gen_ai.user.message")) + ) + scope_logs = ScopeLogs(log_records=[log_record]) + resource_logs = ResourceLogs(scope_logs=[scope_logs]) + request = LogsServiceRequestPB(resource_logs=[resource_logs]) + + response = await service.Export(request, None) + + assert isinstance(response, logs_service_pb2.ExportLogsServiceResponse) + assert len(session.logs) == 1 + assert session.logs[0]["event_name"] == "gen_ai.user.message" + _cancel_timers(mgr) + + _run(go()) + + class TestProtobufJsonParity: """Verify that protobuf-decoded traces produce the same session/span behavior as JSON.""" From a846dafe73d50eaa17af27594a939060afb9ccec Mon Sep 17 00:00:00 2001 From: Antonio Jimenez Date: Wed, 1 Apr 2026 18:05:03 +0200 Subject: [PATCH 2/7] improvements --- src/agentevals/api/otlp_grpc.py | 10 +- src/agentevals/api/otlp_processing.py | 349 ++++++++++++++++++++++++ src/agentevals/api/otlp_routes.py | 368 +------------------------- tests/test_otlp_receiver.py | 100 +++---- 4 files changed, 418 insertions(+), 409 deletions(-) create mode 100644 src/agentevals/api/otlp_processing.py diff --git a/src/agentevals/api/otlp_grpc.py b/src/agentevals/api/otlp_grpc.py index 60430bf..0e1a87a 100644 --- a/src/agentevals/api/otlp_grpc.py +++ b/src/agentevals/api/otlp_grpc.py @@ -13,7 +13,7 @@ from opentelemetry.proto.collector.logs.v1 import logs_service_pb2, logs_service_pb2_grpc from opentelemetry.proto.collector.trace.v1 import trace_service_pb2, trace_service_pb2_grpc -from .otlp_routes import _fix_protobuf_id_fields, _process_logs, _process_traces +from .otlp_processing import fix_protobuf_id_fields, process_logs, process_traces if TYPE_CHECKING: from grpc import aio @@ -31,8 +31,8 @@ def __init__(self, manager: StreamingTraceManager): async def Export(self, request, context): # noqa: N802 (gRPC method name) body = MessageToDict(request, 
preserving_proto_field_name=False) - _fix_protobuf_id_fields(body) - await _process_traces(body, self._manager) + fix_protobuf_id_fields(body) + await process_traces(body, self._manager) return trace_service_pb2.ExportTraceServiceResponse() @@ -44,8 +44,8 @@ def __init__(self, manager: StreamingTraceManager): async def Export(self, request, context): # noqa: N802 (gRPC method name) body = MessageToDict(request, preserving_proto_field_name=False) - _fix_protobuf_id_fields(body) - await _process_logs(body, self._manager) + fix_protobuf_id_fields(body) + await process_logs(body, self._manager) return logs_service_pb2.ExportLogsServiceResponse() diff --git a/src/agentevals/api/otlp_processing.py b/src/agentevals/api/otlp_processing.py new file mode 100644 index 0000000..548a2ce --- /dev/null +++ b/src/agentevals/api/otlp_processing.py @@ -0,0 +1,349 @@ +"""OTLP payload decoding and processing for traces/logs.""" + +from __future__ import annotations + +import base64 +import logging +from typing import TYPE_CHECKING + +from google.protobuf.json_format import MessageToDict +from opentelemetry.proto.collector.logs.v1.logs_service_pb2 import ( + ExportLogsServiceRequest as LogsServiceRequestPB, +) +from opentelemetry.proto.collector.trace.v1.trace_service_pb2 import ( + ExportTraceServiceRequest as TraceServiceRequestPB, +) + +from ..extraction import flatten_otlp_attributes +from ..trace_attrs import ( + OTEL_GENAI_CONVERSATION_ID, + OTEL_GENAI_INPUT_MESSAGES, + OTEL_GENAI_OUTPUT_MESSAGES, + OTEL_SCOPE, + OTEL_SCOPE_VERSION, +) +from .models import WSSpanReceivedEvent + +if TYPE_CHECKING: + from ..streaming.ws_server import StreamingTraceManager + +logger = logging.getLogger(__name__) + +AGENTEVALS_EVAL_SET_ID = "agentevals.eval_set_id" +AGENTEVALS_SESSION_NAME = "agentevals.session_name" + + +async def process_traces(body: dict, manager: StreamingTraceManager) -> None: + """Parse ExportTraceServiceRequest and feed spans to the pipeline.""" + for resource_span in 
body.get("resourceSpans", []): + resource_attrs = resource_span.get("resource", {}).get("attributes", []) + metadata = _extract_agentevals_metadata(resource_attrs) + + if not metadata.get("conversation_id"): + metadata["conversation_id"] = _prescan_conversation_id(resource_span) + + for scope_span in resource_span.get("scopeSpans", []): + scope = scope_span.get("scope", {}) + scope_name = scope.get("name", "") + scope_version = scope.get("version", "") + + for span_data in scope_span.get("spans", []): + span = _normalize_span(span_data, scope_name, scope_version) + trace_id = span.get("traceId", "") + + if not trace_id: + continue + + if not metadata.get("conversation_id"): + conversation_id = _extract_conversation_id(span.get("attributes", [])) + if conversation_id: + metadata["conversation_id"] = conversation_id + + session = await manager.get_or_create_otlp_session(trace_id, metadata) + + if not session.can_accept_span(): + logger.warning("Session %s at span limit", session.session_id) + continue + + session.spans.append(span) + + extractor = manager.incremental_extractors.get(session.session_id) + if extractor: + updates = extractor.process_span(span) + for update in updates: + update["sessionId"] = session.session_id + await manager.broadcast_to_ui(update) + + await manager.broadcast_to_ui( + WSSpanReceivedEvent( + session_id=session.session_id, + span=span, + ).model_dump(by_alias=True) + ) + + manager.reset_idle_timer(session.session_id) + + if not span.get("parentSpanId"): + session.has_root_span = True + manager.schedule_session_completion(session.session_id) + + +async def process_logs(body: dict, manager: StreamingTraceManager) -> None: + """Parse ExportLogsServiceRequest and feed logs to sessions. + + Logs and spans arrive via separate OTLP exporters (BatchLogRecordProcessor + and BatchSpanProcessor) and may arrive in any order. 
When a log's traceId + isn't yet registered in a session's trace_ids set, we fall back to matching + by session_name from resource attributes. + + Logs may arrive after span-triggered session completion (the + BatchLogRecordProcessor and BatchSpanProcessor flush independently). + Late-arriving logs are accepted and trigger re-extraction of invocations. + """ + sessions_needing_reextraction: set[str] = set() + + for resource_log in body.get("resourceLogs", []): + resource_attrs = resource_log.get("resource", {}).get("attributes", []) + metadata = _extract_agentevals_metadata(resource_attrs) + session_name = metadata.get("session_name") + + for scope_log in resource_log.get("scopeLogs", []): + for log_record in scope_log.get("logRecords", []): + log_event = _convert_otlp_log_record(log_record) + if not log_event: + continue + + trace_id = log_record.get("traceId", "") + if not trace_id: + continue + + session = manager.find_session_by_trace_id(trace_id) + + if not session and session_name: + active_id = manager._active_session_for_name.get(session_name) + candidate = manager.sessions.get(active_id) if active_id else None + if candidate and not candidate.is_complete: + candidate.trace_ids.add(trace_id) + session = candidate + + if not session: + manager.buffer_orphan_log(trace_id, session_name, log_event) + logger.debug( + "Buffered orphan log trace_id=%s session_name=%s", + trace_id[:12], + session_name, + ) + continue + + if not session.can_accept_log(): + continue + + session.logs.append(log_event) + + if session.is_complete: + sessions_needing_reextraction.add(session.session_id) + else: + manager.reset_idle_timer(session.session_id) + + extractor = manager.incremental_extractors.get(session.session_id) + if extractor: + updates = extractor.process_log(log_event) + for update in updates: + update["sessionId"] = session.session_id + await manager.broadcast_to_ui(update) + + for session_id in sessions_needing_reextraction: + 
manager.schedule_log_reextraction(session_id) + + +def decode_protobuf_traces(raw: bytes) -> dict: + """Decode ExportTraceServiceRequest protobuf to OTLP JSON dict.""" + msg = TraceServiceRequestPB() + msg.ParseFromString(raw) + data = MessageToDict(msg, preserving_proto_field_name=False) + fix_protobuf_id_fields(data) + return data + + +def decode_protobuf_logs(raw: bytes) -> dict: + """Decode ExportLogsServiceRequest protobuf to OTLP JSON dict.""" + msg = LogsServiceRequestPB() + msg.ParseFromString(raw) + data = MessageToDict(msg, preserving_proto_field_name=False) + fix_protobuf_id_fields(data) + return data + + +def fix_protobuf_id_fields(data) -> None: + """Convert base64-encoded bytes fields to hex strings in-place. + + MessageToDict base64-encodes protobuf bytes fields, but OTLP JSON + uses hex-encoded strings for traceId, spanId, and parentSpanId. + """ + if isinstance(data, dict): + for key in ("traceId", "spanId", "parentSpanId"): + if key in data and isinstance(data[key], str): + try: + raw = base64.b64decode(data[key]) + data[key] = raw.hex() + except Exception: + pass + for value in data.values(): + if isinstance(value, (dict, list)): + fix_protobuf_id_fields(value) + elif isinstance(data, list): + for item in data: + if isinstance(item, (dict, list)): + fix_protobuf_id_fields(item) + + +_GENAI_EVENT_KEYS = {OTEL_GENAI_INPUT_MESSAGES, OTEL_GENAI_OUTPUT_MESSAGES} + + +def _normalize_span(span_data: dict, scope_name: str, scope_version: str) -> dict: + """Normalize an OTLP span for the downstream pipeline. + + Performs two transformations: + 1. Injects otel.scope.name/version from the scopeSpans level into span + attributes (the pipeline expects them there). + 2. Promotes gen_ai.input.messages and gen_ai.output.messages from span + events to span attributes. Some SDKs (e.g. Strands with + OTEL_SEMCONV_STABILITY_OPT_IN=gen_ai_latest_experimental) store + message content in span events, but the converter reads attributes. 
+ """ + span = dict(span_data) + attrs = list(span.get("attributes", [])) + + existing_keys = {a.get("key") for a in attrs} + + if scope_name and OTEL_SCOPE not in existing_keys: + attrs.append({"key": OTEL_SCOPE, "value": {"stringValue": scope_name}}) + existing_keys.add(OTEL_SCOPE) + if scope_version and OTEL_SCOPE_VERSION not in existing_keys: + attrs.append({"key": OTEL_SCOPE_VERSION, "value": {"stringValue": scope_version}}) + existing_keys.add(OTEL_SCOPE_VERSION) + + for event in span.get("events", []): + for attr in event.get("attributes", []): + key = attr.get("key", "") + if key in _GENAI_EVENT_KEYS and key not in existing_keys: + attrs.append(attr) + existing_keys.add(key) + + span["attributes"] = attrs + return span + + +def _extract_agentevals_metadata(resource_attrs: list[dict]) -> dict: + """Extract agentevals-specific metadata from OTLP resource attributes.""" + flat = flatten_otlp_attributes(resource_attrs) + return { + "eval_set_id": flat.get(AGENTEVALS_EVAL_SET_ID), + "session_name": flat.get(AGENTEVALS_SESSION_NAME), + "service_name": flat.get("service.name"), + "resource_attrs": flat, + } + + +def _prescan_conversation_id(resource_span: dict) -> str | None: + """Pre-scan all spans in a resourceSpan batch for gen_ai.conversation.id. + + Within a single OTLP batch, some scopes (e.g. A2A server instrumentation) + may lack conversation_id while others (agent instrumentation) have it. + Scanning upfront ensures ALL spans in the batch route to the same session. 
+ """ + for scope_span in resource_span.get("scopeSpans", []): + for span_data in scope_span.get("spans", []): + conv_id = _extract_conversation_id(span_data.get("attributes", [])) + if conv_id: + return conv_id + return None + + +def _extract_conversation_id(attrs_list: list[dict]) -> str | None: + """Extract gen_ai.conversation.id from OTLP span attributes.""" + for attr in attrs_list: + if attr.get("key") == OTEL_GENAI_CONVERSATION_ID: + return attr.get("value", {}).get("stringValue") + return None + + +def _convert_otlp_log_record(log_record: dict) -> dict | None: + """Convert OTLP log record to internal log event format. + + Internal format (used by IncrementalInvocationExtractor.process_log()): + {"event_name": "gen_ai.user.message", "timestamp": ..., "body": {...}, "attributes": {...}} + + Handles two event-name conventions: + - Newer OTel SDKs: top-level ``eventName`` field (LogRecord.event_name proto) + - Older convention: ``event.name`` stored as a regular attribute + """ + attrs = flatten_otlp_attributes(log_record.get("attributes", [])) + event_name = log_record.get("eventName") or attrs.get("event.name", "") + + if not event_name or not event_name.startswith("gen_ai."): + return None + + body_raw = log_record.get("body", {}) + body = _parse_otlp_body(body_raw) + + timestamp = log_record.get("timeUnixNano") or log_record.get("observedTimeUnixNano") + + result = { + "event_name": event_name, + "timestamp": timestamp, + "body": body, + "attributes": attrs, + } + + span_id = log_record.get("spanId", "") + if span_id: + result["span_id"] = span_id + + return result + + +def _parse_otlp_any_value(value_obj: dict): + """Recursively parse an OTLP AnyValue to native Python types. + + Handles the full AnyValue union: stringValue, intValue, doubleValue, + boolValue, kvlistValue (→ dict), arrayValue (→ list), bytesValue. 
+ """ + if "stringValue" in value_obj: + return value_obj["stringValue"] + if "intValue" in value_obj: + return int(value_obj["intValue"]) + if "doubleValue" in value_obj: + return float(value_obj["doubleValue"]) + if "boolValue" in value_obj: + return value_obj["boolValue"] + if "kvlistValue" in value_obj: + kv = value_obj["kvlistValue"] + return {item.get("key", ""): _parse_otlp_any_value(item.get("value", {})) for item in kv.get("values", [])} + if "arrayValue" in value_obj: + arr = value_obj["arrayValue"] + return [_parse_otlp_any_value(v) for v in arr.get("values", [])] + if "bytesValue" in value_obj: + return value_obj["bytesValue"] + return value_obj + + +def _parse_otlp_body(body_raw: dict) -> dict | str: + """Parse OTLP log record body value. + + Top-level stringValue bodies are JSON-decoded (Strands-style logs store + message content as JSON strings). All other AnyValue types are parsed + recursively via ``_parse_otlp_any_value`` (handles the nested kvlistValue / + arrayValue structures used by the OpenAI instrumentor). + """ + if "stringValue" in body_raw: + import json + + raw = body_raw["stringValue"] + try: + return json.loads(raw) + except (json.JSONDecodeError, TypeError): + return raw + return _parse_otlp_any_value(body_raw) + + diff --git a/src/agentevals/api/otlp_routes.py b/src/agentevals/api/otlp_routes.py index f4dba27..9b379dd 100644 --- a/src/agentevals/api/otlp_routes.py +++ b/src/agentevals/api/otlp_routes.py @@ -1,50 +1,29 @@ -"""OTLP HTTP receiver endpoints for /v1/traces and /v1/logs. +"""OTLP HTTP routes for /v1/traces and /v1/logs. -Accepts standard OTLP/HTTP payloads (ExportTraceServiceRequest, -ExportLogsServiceRequest) in both JSON and protobuf wire formats, -and feeds them into the existing streaming pipeline via -StreamingTraceManager. - -Runs on port 4318 (standard OTLP HTTP port). 
Agents send traces by setting: - OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4318 +Route handlers are intentionally thin and delegate decode/process logic to +`otlp_processing.py` so protocol handling can be reused by gRPC receivers and +tested independently from HTTP routing. """ from __future__ import annotations -import base64 -import logging from typing import TYPE_CHECKING from fastapi import APIRouter, Depends, Request, Response -from google.protobuf.json_format import MessageToDict -from opentelemetry.proto.collector.logs.v1.logs_service_pb2 import ( - ExportLogsServiceRequest as LogsServiceRequestPB, -) -from opentelemetry.proto.collector.trace.v1.trace_service_pb2 import ( - ExportTraceServiceRequest as TraceServiceRequestPB, -) -from ..extraction import flatten_otlp_attributes -from ..trace_attrs import ( - OTEL_GENAI_CONVERSATION_ID, - OTEL_GENAI_INPUT_MESSAGES, - OTEL_GENAI_OUTPUT_MESSAGES, - OTEL_SCOPE, - OTEL_SCOPE_VERSION, -) from .dependencies import require_trace_manager -from .models import WSSpanReceivedEvent +from .otlp_processing import ( + decode_protobuf_logs, + decode_protobuf_traces, + process_logs, + process_traces, +) if TYPE_CHECKING: from ..streaming.ws_server import StreamingTraceManager -logger = logging.getLogger(__name__) - otlp_router = APIRouter() -AGENTEVALS_EVAL_SET_ID = "agentevals.eval_set_id" -AGENTEVALS_SESSION_NAME = "agentevals.session_name" - @otlp_router.post("/v1/traces") async def receive_traces( @@ -56,11 +35,11 @@ async def receive_traces( if "application/x-protobuf" in content_type: raw = await request.body() - body = _decode_protobuf_traces(raw) + body = decode_protobuf_traces(raw) else: body = await request.json() - await _process_traces(body, manager) + await process_traces(body, manager) return Response( status_code=200, content='{"partialSuccess":{}}', @@ -78,332 +57,13 @@ async def receive_logs( if "application/x-protobuf" in content_type: raw = await request.body() - body = _decode_protobuf_logs(raw) + body 
= decode_protobuf_logs(raw) else: body = await request.json() - await _process_logs(body, manager) + await process_logs(body, manager) return Response( status_code=200, content='{"partialSuccess":{}}', media_type="application/json", ) - - -async def _process_traces(body: dict, manager: StreamingTraceManager) -> None: - """Parse ExportTraceServiceRequest and feed spans to the pipeline.""" - for resource_span in body.get("resourceSpans", []): - resource_attrs = resource_span.get("resource", {}).get("attributes", []) - metadata = _extract_agentevals_metadata(resource_attrs) - - if not metadata.get("conversation_id"): - metadata["conversation_id"] = _prescan_conversation_id(resource_span) - - for scope_span in resource_span.get("scopeSpans", []): - scope = scope_span.get("scope", {}) - scope_name = scope.get("name", "") - scope_version = scope.get("version", "") - - for span_data in scope_span.get("spans", []): - span = _normalize_span(span_data, scope_name, scope_version) - trace_id = span.get("traceId", "") - - if not trace_id: - continue - - if not metadata.get("conversation_id"): - conversation_id = _extract_conversation_id(span.get("attributes", [])) - if conversation_id: - metadata["conversation_id"] = conversation_id - - session = await manager.get_or_create_otlp_session(trace_id, metadata) - - if not session.can_accept_span(): - logger.warning("Session %s at span limit", session.session_id) - continue - - session.spans.append(span) - - extractor = manager.incremental_extractors.get(session.session_id) - if extractor: - updates = extractor.process_span(span) - for update in updates: - update["sessionId"] = session.session_id - await manager.broadcast_to_ui(update) - - await manager.broadcast_to_ui( - WSSpanReceivedEvent( - session_id=session.session_id, - span=span, - ).model_dump(by_alias=True) - ) - - manager.reset_idle_timer(session.session_id) - - if not span.get("parentSpanId"): - session.has_root_span = True - 
manager.schedule_session_completion(session.session_id) - - -async def _process_logs(body: dict, manager: StreamingTraceManager) -> None: - """Parse ExportLogsServiceRequest and feed logs to sessions. - - Logs and spans arrive via separate OTLP exporters (BatchLogRecordProcessor - and BatchSpanProcessor) and may arrive in any order. When a log's traceId - isn't yet registered in a session's trace_ids set, we fall back to matching - by session_name from resource attributes. - - Logs may arrive after span-triggered session completion (the - BatchLogRecordProcessor and BatchSpanProcessor flush independently). - Late-arriving logs are accepted and trigger re-extraction of invocations. - """ - sessions_needing_reextraction: set[str] = set() - - for resource_log in body.get("resourceLogs", []): - resource_attrs = resource_log.get("resource", {}).get("attributes", []) - metadata = _extract_agentevals_metadata(resource_attrs) - session_name = metadata.get("session_name") - - for scope_log in resource_log.get("scopeLogs", []): - for log_record in scope_log.get("logRecords", []): - log_event = _convert_otlp_log_record(log_record) - if not log_event: - continue - - trace_id = log_record.get("traceId", "") - if not trace_id: - continue - - session = manager.find_session_by_trace_id(trace_id) - - if not session and session_name: - active_id = manager._active_session_for_name.get(session_name) - candidate = manager.sessions.get(active_id) if active_id else None - if candidate and not candidate.is_complete: - candidate.trace_ids.add(trace_id) - session = candidate - - if not session: - manager.buffer_orphan_log(trace_id, session_name, log_event) - logger.debug( - "Buffered orphan log trace_id=%s session_name=%s", - trace_id[:12], - session_name, - ) - continue - - if not session.can_accept_log(): - continue - - session.logs.append(log_event) - - if session.is_complete: - sessions_needing_reextraction.add(session.session_id) - else: - manager.reset_idle_timer(session.session_id) - 
- extractor = manager.incremental_extractors.get(session.session_id) - if extractor: - updates = extractor.process_log(log_event) - for update in updates: - update["sessionId"] = session.session_id - await manager.broadcast_to_ui(update) - - for session_id in sessions_needing_reextraction: - manager.schedule_log_reextraction(session_id) - - -_GENAI_EVENT_KEYS = {OTEL_GENAI_INPUT_MESSAGES, OTEL_GENAI_OUTPUT_MESSAGES} - - -def _normalize_span(span_data: dict, scope_name: str, scope_version: str) -> dict: - """Normalize an OTLP span for the downstream pipeline. - - Performs two transformations: - 1. Injects otel.scope.name/version from the scopeSpans level into span - attributes (the pipeline expects them there). - 2. Promotes gen_ai.input.messages and gen_ai.output.messages from span - events to span attributes. Some SDKs (e.g. Strands with - OTEL_SEMCONV_STABILITY_OPT_IN=gen_ai_latest_experimental) store - message content in span events, but the converter reads attributes. - """ - span = dict(span_data) - attrs = list(span.get("attributes", [])) - - existing_keys = {a.get("key") for a in attrs} - - if scope_name and OTEL_SCOPE not in existing_keys: - attrs.append({"key": OTEL_SCOPE, "value": {"stringValue": scope_name}}) - existing_keys.add(OTEL_SCOPE) - if scope_version and OTEL_SCOPE_VERSION not in existing_keys: - attrs.append({"key": OTEL_SCOPE_VERSION, "value": {"stringValue": scope_version}}) - existing_keys.add(OTEL_SCOPE_VERSION) - - for event in span.get("events", []): - for attr in event.get("attributes", []): - key = attr.get("key", "") - if key in _GENAI_EVENT_KEYS and key not in existing_keys: - attrs.append(attr) - existing_keys.add(key) - - span["attributes"] = attrs - return span - - -def _extract_agentevals_metadata(resource_attrs: list[dict]) -> dict: - """Extract agentevals-specific metadata from OTLP resource attributes.""" - flat = flatten_otlp_attributes(resource_attrs) - return { - "eval_set_id": flat.get(AGENTEVALS_EVAL_SET_ID), - 
"session_name": flat.get(AGENTEVALS_SESSION_NAME), - "service_name": flat.get("service.name"), - "resource_attrs": flat, - } - - -def _prescan_conversation_id(resource_span: dict) -> str | None: - """Pre-scan all spans in a resourceSpan batch for gen_ai.conversation.id. - - Within a single OTLP batch, some scopes (e.g. A2A server instrumentation) - may lack conversation_id while others (agent instrumentation) have it. - Scanning upfront ensures ALL spans in the batch route to the same session. - """ - for scope_span in resource_span.get("scopeSpans", []): - for span_data in scope_span.get("spans", []): - conv_id = _extract_conversation_id(span_data.get("attributes", [])) - if conv_id: - return conv_id - return None - - -def _extract_conversation_id(attrs_list: list[dict]) -> str | None: - """Extract gen_ai.conversation.id from OTLP span attributes.""" - for attr in attrs_list: - if attr.get("key") == OTEL_GENAI_CONVERSATION_ID: - return attr.get("value", {}).get("stringValue") - return None - - -def _convert_otlp_log_record(log_record: dict) -> dict | None: - """Convert OTLP log record to internal log event format. 
- - Internal format (used by IncrementalInvocationExtractor.process_log()): - {"event_name": "gen_ai.user.message", "timestamp": ..., "body": {...}, "attributes": {...}} - - Handles two event-name conventions: - - Newer OTel SDKs: top-level ``eventName`` field (LogRecord.event_name proto) - - Older convention: ``event.name`` stored as a regular attribute - """ - attrs = flatten_otlp_attributes(log_record.get("attributes", [])) - event_name = log_record.get("eventName") or attrs.get("event.name", "") - - if not event_name or not event_name.startswith("gen_ai."): - return None - - body_raw = log_record.get("body", {}) - body = _parse_otlp_body(body_raw) - - timestamp = log_record.get("timeUnixNano") or log_record.get("observedTimeUnixNano") - - result = { - "event_name": event_name, - "timestamp": timestamp, - "body": body, - "attributes": attrs, - } - - span_id = log_record.get("spanId", "") - if span_id: - result["span_id"] = span_id - - return result - - -def _parse_otlp_any_value(value_obj: dict): - """Recursively parse an OTLP AnyValue to native Python types. - - Handles the full AnyValue union: stringValue, intValue, doubleValue, - boolValue, kvlistValue (→ dict), arrayValue (→ list), bytesValue. - """ - if "stringValue" in value_obj: - return value_obj["stringValue"] - if "intValue" in value_obj: - return int(value_obj["intValue"]) - if "doubleValue" in value_obj: - return float(value_obj["doubleValue"]) - if "boolValue" in value_obj: - return value_obj["boolValue"] - if "kvlistValue" in value_obj: - kv = value_obj["kvlistValue"] - return {item.get("key", ""): _parse_otlp_any_value(item.get("value", {})) for item in kv.get("values", [])} - if "arrayValue" in value_obj: - arr = value_obj["arrayValue"] - return [_parse_otlp_any_value(v) for v in arr.get("values", [])] - if "bytesValue" in value_obj: - return value_obj["bytesValue"] - return value_obj - - -def _parse_otlp_body(body_raw: dict) -> dict | str: - """Parse OTLP log record body value. 
- - Top-level stringValue bodies are JSON-decoded (Strands-style logs store - message content as JSON strings). All other AnyValue types are parsed - recursively via ``_parse_otlp_any_value`` (handles the nested kvlistValue / - arrayValue structures used by the OpenAI instrumentor). - """ - if "stringValue" in body_raw: - import json - - raw = body_raw["stringValue"] - try: - return json.loads(raw) - except (json.JSONDecodeError, TypeError): - return raw - return _parse_otlp_any_value(body_raw) - - -# --------------------------------------------------------------------------- -# Protobuf decoding -# --------------------------------------------------------------------------- - - -def _decode_protobuf_traces(raw: bytes) -> dict: - """Decode ExportTraceServiceRequest protobuf to OTLP JSON dict.""" - msg = TraceServiceRequestPB() - msg.ParseFromString(raw) - data = MessageToDict(msg, preserving_proto_field_name=False) - _fix_protobuf_id_fields(data) - return data - - -def _decode_protobuf_logs(raw: bytes) -> dict: - """Decode ExportLogsServiceRequest protobuf to OTLP JSON dict.""" - msg = LogsServiceRequestPB() - msg.ParseFromString(raw) - data = MessageToDict(msg, preserving_proto_field_name=False) - _fix_protobuf_id_fields(data) - return data - - -def _fix_protobuf_id_fields(data) -> None: - """Convert base64-encoded bytes fields to hex strings in-place. - - MessageToDict base64-encodes protobuf bytes fields, but OTLP JSON - uses hex-encoded strings for traceId, spanId, and parentSpanId. 
- """ - if isinstance(data, dict): - for key in ("traceId", "spanId", "parentSpanId"): - if key in data and isinstance(data[key], str): - try: - raw = base64.b64decode(data[key]) - data[key] = raw.hex() - except Exception: - pass - for value in data.values(): - if isinstance(value, (dict, list)): - _fix_protobuf_id_fields(value) - elif isinstance(data, list): - for item in data: - if isinstance(item, (dict, list)): - _fix_protobuf_id_fields(item) diff --git a/tests/test_otlp_receiver.py b/tests/test_otlp_receiver.py index 44068ce..323107b 100644 --- a/tests/test_otlp_receiver.py +++ b/tests/test_otlp_receiver.py @@ -8,16 +8,16 @@ import pytest from agentevals.api.otlp_grpc import OtlpLogsService, OtlpTraceService, create_otlp_grpc_server -from agentevals.api.otlp_routes import ( +from agentevals.api.otlp_processing import ( _convert_otlp_log_record, - _decode_protobuf_logs, - _decode_protobuf_traces, _extract_agentevals_metadata, - _fix_protobuf_id_fields, _normalize_span, _parse_otlp_body, - _process_logs, - _process_traces, + decode_protobuf_logs, + decode_protobuf_traces, + fix_protobuf_id_fields, + process_logs, + process_traces, ) from agentevals.streaming.session import TraceSession from agentevals.streaming.ws_server import StreamingTraceManager @@ -509,7 +509,7 @@ async def go(): } ] } - await _process_logs(body, mgr) + await process_logs(body, mgr) assert len(session.logs) == 1 mgr.schedule_log_reextraction.assert_called_once_with("s1") @@ -551,7 +551,7 @@ async def go(): } ] } - await _process_logs(body, mgr) + await process_logs(body, mgr) assert len(session.logs) == 0 assert "new-trace-id" not in session.trace_ids @@ -752,7 +752,7 @@ async def go(): # --------------------------------------------------------------------------- -# Full pipeline: _process_traces +# Full pipeline: process_traces # --------------------------------------------------------------------------- @@ -766,7 +766,7 @@ async def go(): _make_otlp_attr("agentevals.session_name", 
"test-session"), ], ) - await _process_traces(body, mgr) + await process_traces(body, mgr) assert "test-session" in mgr.sessions session = mgr.sessions["test-session"] assert len(session.spans) == 1 @@ -784,7 +784,7 @@ async def go(): _make_span(trace_id="t1", span_id="s2", parent_span_id="p1"), ], ) - await _process_traces(body, mgr) + await process_traces(body, mgr) sessions = [s for s in mgr.sessions.values() if s.trace_id == "t1"] assert len(sessions) == 1 assert len(sessions[0].spans) == 2 @@ -821,7 +821,7 @@ async def go(): }, ] } - await _process_traces(body, mgr) + await process_traces(body, mgr) trace_ids = {s.trace_id for s in mgr.sessions.values()} assert "t1" in trace_ids assert "t2" in trace_ids @@ -837,7 +837,7 @@ async def go(): scope_name="gcp.vertex.agent", scope_version="1.2.3", ) - await _process_traces(body, mgr) + await process_traces(body, mgr) session = list(mgr.sessions.values())[0] span = session.spans[0] attr_map = {a["key"]: a["value"] for a in span["attributes"]} @@ -856,7 +856,7 @@ async def go(): body = _make_export_request( spans=[_make_span(trace_id="t1", parent_span_id=None)], ) - await _process_traces(body, mgr) + await process_traces(body, mgr) session = list(mgr.sessions.values())[0] assert session.has_root_span is True mgr.schedule_session_completion.assert_called_once_with(session.session_id) @@ -874,7 +874,7 @@ async def go(): _make_span(trace_id="t1", span_id="s2"), ], ) - await _process_traces(body, mgr) + await process_traces(body, mgr) assert mgr.reset_idle_timer.call_count == 2 _cancel_timers(mgr) @@ -895,8 +895,8 @@ async def go(): spans=[_make_span(trace_id="trace-b", span_id="s2")], resource_attrs=meta, ) - await _process_traces(body1, mgr) - await _process_traces(body2, mgr) + await process_traces(body1, mgr) + await process_traces(body2, mgr) assert len(mgr.sessions) == 1 session = mgr.sessions["my-session"] @@ -921,8 +921,8 @@ async def go(): spans=[_make_span(trace_id="trace-b")], resource_attrs=meta, ) - await 
_process_traces(body1, mgr) - await _process_traces(body2, mgr) + await process_traces(body1, mgr) + await process_traces(body2, mgr) log_body = { "resourceLogs": [ @@ -953,7 +953,7 @@ async def go(): } ] } - await _process_logs(log_body, mgr) + await process_logs(log_body, mgr) session = mgr.sessions["my-session"] assert len(session.logs) == 2 @@ -964,7 +964,7 @@ async def go(): def test_empty_request(self): async def go(): mgr = _make_mgr() - await _process_traces({"resourceSpans": []}, mgr) + await process_traces({"resourceSpans": []}, mgr) assert len(mgr.sessions) == 0 _run(go()) @@ -975,7 +975,7 @@ async def go(): body = _make_export_request( spans=[_make_span(trace_id="t1")], ) - await _process_traces(body, mgr) + await process_traces(body, mgr) span_received_calls = [c for c in mgr.broadcast_to_ui.call_args_list if c[0][0]["type"] == "span_received"] assert len(span_received_calls) == 1 _cancel_timers(mgr) @@ -984,7 +984,7 @@ async def go(): # --------------------------------------------------------------------------- -# Full pipeline: _process_logs +# Full pipeline: process_logs # --------------------------------------------------------------------------- @@ -1027,7 +1027,7 @@ async def go(): } ] } - await _process_logs(body, mgr) + await process_logs(body, mgr) assert len(mgr._orphan_logs) == 1 assert mgr._orphan_logs[0]["trace_id"] == "trace-1" assert mgr._orphan_logs[0]["session_name"] == "my-agent" @@ -1071,7 +1071,7 @@ async def go(): } ] } - await _process_logs(log_body, mgr) + await process_logs(log_body, mgr) assert len(mgr._orphan_logs) == 1 meta = {"eval_set_id": None, "session_name": "my-agent", "resource_attrs": {}} @@ -1120,7 +1120,7 @@ async def go(): } ] } - await _process_logs(log_body, mgr) + await process_logs(log_body, mgr) meta = {"eval_set_id": None, "session_name": "my-agent", "resource_attrs": {}} session = await mgr.get_or_create_otlp_session("trace-1", meta) @@ -1195,7 +1195,7 @@ async def go(): } ] } - await _process_logs(log_body, 
mgr) + await process_logs(log_body, mgr) assert len(mgr._orphan_logs) == 3 @@ -1235,7 +1235,7 @@ async def go(): } ] } - await _process_logs(body, mgr) + await process_logs(body, mgr) session = mgr.sessions["s1"] assert len(session.logs) == 1 assert session.logs[0]["event_name"] == "gen_ai.user.message" @@ -1266,7 +1266,7 @@ async def go(): } ] } - await _process_logs(body, mgr) + await process_logs(body, mgr) assert len(mgr.sessions) == 0 assert len(mgr._orphan_logs) == 1 @@ -1298,7 +1298,7 @@ async def go(): } ] } - await _process_logs(body, mgr) + await process_logs(body, mgr) session = mgr.sessions["s1"] assert len(session.logs) == 0 @@ -1425,14 +1425,14 @@ def test_converts_base64_trace_id_to_hex(self): raw_bytes = _hex_to_bytes(TRACE_ID_HEX) b64 = base64.b64encode(raw_bytes).decode() data = {"traceId": b64, "spanId": base64.b64encode(_hex_to_bytes(SPAN_ID_HEX)).decode()} - _fix_protobuf_id_fields(data) + fix_protobuf_id_fields(data) assert data["traceId"] == TRACE_ID_HEX assert data["spanId"] == SPAN_ID_HEX def test_converts_parent_span_id(self): b64 = base64.b64encode(_hex_to_bytes(PARENT_SPAN_ID_HEX)).decode() data = {"parentSpanId": b64} - _fix_protobuf_id_fields(data) + fix_protobuf_id_fields(data) assert data["parentSpanId"] == PARENT_SPAN_ID_HEX def test_recurses_into_nested_structures(self): @@ -1440,20 +1440,20 @@ def test_recurses_into_nested_structures(self): b64_trace = base64.b64encode(raw_bytes).decode() b64_span = base64.b64encode(_hex_to_bytes(SPAN_ID_HEX)).decode() data = {"resourceSpans": [{"scopeSpans": [{"spans": [{"traceId": b64_trace, "spanId": b64_span}]}]}]} - _fix_protobuf_id_fields(data) + fix_protobuf_id_fields(data) span = data["resourceSpans"][0]["scopeSpans"][0]["spans"][0] assert span["traceId"] == TRACE_ID_HEX assert span["spanId"] == SPAN_ID_HEX def test_leaves_non_id_fields_alone(self): data = {"name": "test", "kind": 1, "traceId": base64.b64encode(b"\x01\x02").decode()} - _fix_protobuf_id_fields(data) + 
fix_protobuf_id_fields(data) assert data["name"] == "test" assert data["kind"] == 1 def test_handles_already_hex_strings(self): data = {"traceId": TRACE_ID_HEX} - _fix_protobuf_id_fields(data) + fix_protobuf_id_fields(data) assert len(data["traceId"]) > 0 @@ -1463,7 +1463,7 @@ def test_single_span_roundtrip(self): request = _make_pb_export_request([span]) raw = request.SerializeToString() - body = _decode_protobuf_traces(raw) + body = decode_protobuf_traces(raw) assert "resourceSpans" in body spans = body["resourceSpans"][0]["scopeSpans"][0]["spans"] @@ -1478,7 +1478,7 @@ def test_root_span_no_parent(self): request = _make_pb_export_request([span]) raw = request.SerializeToString() - body = _decode_protobuf_traces(raw) + body = decode_protobuf_traces(raw) decoded_span = body["resourceSpans"][0]["scopeSpans"][0]["spans"][0] assert "parentSpanId" not in decoded_span @@ -1493,7 +1493,7 @@ def test_preserves_attributes(self): request = _make_pb_export_request([span]) raw = request.SerializeToString() - body = _decode_protobuf_traces(raw) + body = decode_protobuf_traces(raw) decoded_attrs = body["resourceSpans"][0]["scopeSpans"][0]["spans"][0]["attributes"] attr_map = {a["key"]: a["value"] for a in decoded_attrs} assert attr_map["gen_ai.request.model"]["stringValue"] == "gpt-4" @@ -1508,7 +1508,7 @@ def test_preserves_resource_attributes(self): request = _make_pb_export_request([span], resource_attrs=resource_attrs) raw = request.SerializeToString() - body = _decode_protobuf_traces(raw) + body = decode_protobuf_traces(raw) res_attrs = body["resourceSpans"][0]["resource"]["attributes"] attr_map = {a["key"]: a["value"] for a in res_attrs} assert attr_map["service.name"]["stringValue"] == "test-agent" @@ -1519,7 +1519,7 @@ def test_preserves_scope(self): request = _make_pb_export_request([span], scope_name="gcp.vertex.agent", scope_version="1.2.3") raw = request.SerializeToString() - body = _decode_protobuf_traces(raw) + body = decode_protobuf_traces(raw) scope = 
body["resourceSpans"][0]["scopeSpans"][0]["scope"] assert scope["name"] == "gcp.vertex.agent" assert scope["version"] == "1.2.3" @@ -1530,7 +1530,7 @@ def test_multiple_spans(self): request = _make_pb_export_request([span1, span2]) raw = request.SerializeToString() - body = _decode_protobuf_traces(raw) + body = decode_protobuf_traces(raw) spans = body["resourceSpans"][0]["scopeSpans"][0]["spans"] assert len(spans) == 2 names = {s["name"] for s in spans} @@ -1539,7 +1539,7 @@ def test_multiple_spans(self): def test_empty_request(self): request = TraceServiceRequestPB() raw = request.SerializeToString() - body = _decode_protobuf_traces(raw) + body = decode_protobuf_traces(raw) assert body.get("resourceSpans") is None or body.get("resourceSpans") == [] @@ -1558,7 +1558,7 @@ def test_genai_log_roundtrip(self): request = LogsServiceRequestPB(resource_logs=[resource_logs]) raw = request.SerializeToString() - body = _decode_protobuf_logs(raw) + body = decode_protobuf_logs(raw) assert "resourceLogs" in body lr = body["resourceLogs"][0]["scopeLogs"][0]["logRecords"][0] @@ -1570,7 +1570,7 @@ def test_genai_log_roundtrip(self): def test_empty_logs_request(self): request = LogsServiceRequestPB() raw = request.SerializeToString() - body = _decode_protobuf_logs(raw) + body = decode_protobuf_logs(raw) assert body.get("resourceLogs") is None or body.get("resourceLogs") == [] @@ -1666,8 +1666,8 @@ async def go(): request = _make_pb_export_request([span], resource_attrs=resource_attrs) raw = request.SerializeToString() - body = _decode_protobuf_traces(raw) - await _process_traces(body, mgr) + body = decode_protobuf_traces(raw) + await process_traces(body, mgr) assert "pb-session" in mgr.sessions session = mgr.sessions["pb-session"] @@ -1687,8 +1687,8 @@ async def go(): request = _make_pb_export_request([span]) raw = request.SerializeToString() - body = _decode_protobuf_traces(raw) - await _process_traces(body, mgr) + body = decode_protobuf_traces(raw) + await process_traces(body, 
mgr) session = list(mgr.sessions.values())[0] assert session.has_root_span is True @@ -1705,8 +1705,8 @@ async def go(): request = _make_pb_export_request([span], scope_name="strands.agent", scope_version="2.0.0") raw = request.SerializeToString() - body = _decode_protobuf_traces(raw) - await _process_traces(body, mgr) + body = decode_protobuf_traces(raw) + await process_traces(body, mgr) session = list(mgr.sessions.values())[0] stored_span = session.spans[0] From 7a1231dbf58b59ac4e3af5732e8301a1275e4fa4 Mon Sep 17 00:00:00 2001 From: Antonio Jimenez Date: Thu, 2 Apr 2026 11:54:12 +0200 Subject: [PATCH 3/7] Apply feedback --- examples/kubernetes/README.md | 128 +++++++++++++++++++++++++++++--- src/agentevals/api/otlp_grpc.py | 22 +++++- src/agentevals/cli.py | 53 ++++++++++--- tests/test_otlp_receiver.py | 53 ++++++++++++- 4 files changed, 231 insertions(+), 25 deletions(-) diff --git a/examples/kubernetes/README.md b/examples/kubernetes/README.md index 781993f..f89df56 100644 --- a/examples/kubernetes/README.md +++ b/examples/kubernetes/README.md @@ -1,14 +1,16 @@ # Kubernetes: Evaluating kagent with agentevals -Run agentevals alongside [kagent](https://github.com/kagent-dev/kagent) on Kubernetes to evaluate AI agent conversations in real time. This example deploys: +Run agentevals alongside [kagent](https://github.com/kagent-dev/kagent) on Kubernetes to evaluate AI agent conversations in real time. This example deploys three components: -1. **agentevals** receives OTLP traces over HTTP (`:4318`) and gRPC (`:4317`) and serves the evaluation UI -2. **kagent** provides Kubernetes-native AI agents with built-in OTel instrumentation (gRPC export) +1. **agentevals** receives OTLP traces over HTTP and serves the evaluation UI +2. **OTel Collector** Optional, useful when you want centralized telemetry +controls. +3. 
**kagent** provides Kubernetes-native AI agents with built-in OTel instrumentation (gRPC export only)
 
 ```
-kagent (gRPC :4317) -----------------> agentevals (gRPC :4317 / HTTP :4318)
-                                            |
-                                       UI on :8001
+kagent (gRPC :4317) --> OTel Collector (optional) --> agentevals (gRPC :4317 / HTTP :4318)
+                                                           |
+                                                      UI on :8001
 ```
 
 ## Prerequisites
@@ -36,7 +38,39 @@ This creates a single pod exposing:
 | 4318 | OTLP HTTP receiver (traces and logs) |
 | 8080 | MCP (Streamable HTTP) |
 
-### 2. kagent
+### 2. OTel Collector (optional)
+
+Native gRPC ingestion in agentevals is sufficient for most setups, but an
+intermediate collector is still useful when you want centralized telemetry
+controls:
+
+- traffic shaping (batching, retries, backpressure)
+- filtering or redaction before data reaches agentevals
+- routing/fan-out to additional backends
+- protocol translation for mixed clients
+
+```bash
+helm repo add open-telemetry https://open-telemetry.github.io/opentelemetry-helm-charts
+helm repo update
+
+helm upgrade --install otel-collector open-telemetry/opentelemetry-collector \
+  --namespace kagent --create-namespace \
+  --set mode=deployment \
+  --set replicaCount=1 \
+  --set image.repository=otel/opentelemetry-collector \
+  --set ports.otlp.enabled=true \
+  --set ports.otlp-http.enabled=false \
+  --set config.exporters.otlp.endpoint="agentevals.default.svc.cluster.local:4317" \
+  --set config.exporters.otlp.compression="gzip" \
+  --set config.service.pipelines.traces.receivers[0]=otlp \
+  --set config.service.pipelines.traces.exporters[0]=otlp \
+  --set config.service.pipelines.logs.receivers[0]=otlp \
+  --set config.service.pipelines.logs.exporters[0]=otlp
+```
+
+> **Note:** If you deployed agentevals in a namespace other than `default`, update the `endpoint` value accordingly: `agentevals.<your-namespace>.svc.cluster.local:4317` (a gRPC exporter endpoint, so no `http://` scheme and port 4317, not the HTTP receiver's 4318).
+
+### 3. 
kagent Install the CRDs first, then the kagent operator with OTel tracing enabled: @@ -58,16 +92,16 @@ helm upgrade --install kagent oci://ghcr.io/kagent-dev/kagent/helm/kagent \ --set agents.cilium-manager-agent.enabled=false \ --set agents.cilium-debug-agent.enabled=false \ --set otel.tracing.enabled=true \ - --set otel.tracing.exporter.otlp.endpoint="agentevals.default.svc.cluster.local:4317" \ + --set otel.tracing.exporter.otlp.endpoint="otel-collector-opentelemetry-collector.kagent.svc.cluster.local:4317" \ --set otel.tracing.exporter.otlp.insecure=true ``` -This installs kagent with only the default Helm agent (`helm-agent`) and the K8s troubleshooter enabled, and points its OTel exporter directly at agentevals gRPC. +This installs kagent with only the default Helm agent (`helm-agent`) and the K8s troubleshooter enabled, and points its OTel exporter at the Collector. ### Verify the deployment ```bash -kubectl get pods -A -l 'app.kubernetes.io/name in (agentevals, kagent)' +kubectl get pods -A -l 'app.kubernetes.io/name in (agentevals, kagent, opentelemetry-collector)' ``` All pods should be `Running` before continuing. @@ -212,6 +246,80 @@ You can also click an individual conversation and see a breakdown of each evalua ## Cleanup +```bash +helm uninstall kagent -n kagent +helm uninstall kagent-crds -n kagent +helm uninstall otel-collector -n kagent +helm uninstall agentevals +kubectl delete namespace kagent +``` + +**With `helm-agent` (gpt-4.1-mini):** + +1. Select `helm-agent` from the agent list +2. Start a new conversation +3. Ask: *"List all Helm releases across all namespaces and tell me which ones have pending upgrades"* +4. Follow up: *"Show me the values for the agentevals release"* + +**With `helm-agent-gpt5` (gpt-5):** + +1. Select `helm-agent-gpt5` from the agent list +2. Start a new conversation +3. Ask the same questions in the same order + +### Step 5. 
Watch traces in agentevals + +Switch back to the agentevals Live view at http://localhost:8001. You will see two sessions appear, one for each conversation. Each session shows: + +- **Status** transitioning from ACTIVE to COMPLETED as the conversation ends +- **Span count** incrementing in real time as the agent makes LLM calls and tool invocations +- **Model name** visible in the session metadata + +### Step 6. Select the GPT-5 session as the eval set + +Once both sessions are complete: + +1. Click on the `helm-agent-gpt5` session card to open its trace details +2. Review the conversation: check that it called the right tools and produced correct responses +3. Click **Use as Eval Set** to mark this session as the evaluation baseline +4. Give it a name like `helm-agent-comparison` + +This captures the GPT-5 session's tool trajectory and final responses as the golden reference. + +image + +### Step 7. Evaluate both sessions + +1. Go back to the sessions list +2. Select both sessions (the `gpt-4.1-mini` session and the `gpt-5` session) +3. Click **Evaluate** +4. Select the `helm-agent-comparison` eval set +5. Choose the metrics: + - **tool_trajectory_avg_score**: Did the agent call the correct tools in the correct order? + - **response_match_score**: Did the agent produce responses consistent with the golden reference? +6. Run the evaluation + +### What to look for + +| Metric | What it tells you | +|--------|------------------| +| `tool_trajectory_avg_score` | Whether the agent followed the expected sequence of Helm tool calls (`helm-list`, then `helm-get-values`). A score of 1.0 means it matched exactly. | +| `response_match_score` | How closely the agent's final answers matched the GPT-5 baseline. Useful for catching regressions when switching to a cheaper model. | + +Compare the two sessions in the results table: + +- **Token usage**: The session metadata includes total token counts. 
If `gpt-5` consumed fewer tokens while achieving the same trajectory score, it may be the better choice for this use case. +- **Tool trajectory**: If one agent called extra tools or skipped expected ones, the trajectory score reflects that. +- **Response quality**: A lower response match score on the `gpt-4.1-mini` session highlights where the cheaper model diverged from the GPT-5 baseline. + +image + +You can also click an individual conversation and see a breakdown of each evaluators. + +image + +## Cleanup + ```bash helm uninstall kagent -n kagent helm uninstall kagent-crds -n kagent diff --git a/src/agentevals/api/otlp_grpc.py b/src/agentevals/api/otlp_grpc.py index 0e1a87a..332bb95 100644 --- a/src/agentevals/api/otlp_grpc.py +++ b/src/agentevals/api/otlp_grpc.py @@ -22,6 +22,9 @@ logger = logging.getLogger(__name__) +DEFAULT_GRPC_MAX_CONCURRENT_RPCS = 32 +DEFAULT_GRPC_MAX_MESSAGE_BYTES = 8 * 1024 * 1024 + class OtlpTraceService(trace_service_pb2_grpc.TraceServiceServicer): """OTLP TraceService gRPC implementation.""" @@ -53,6 +56,9 @@ def create_otlp_grpc_server( host: str, port: int, manager: StreamingTraceManager, + *, + max_concurrent_rpcs: int = DEFAULT_GRPC_MAX_CONCURRENT_RPCS, + max_message_bytes: int = DEFAULT_GRPC_MAX_MESSAGE_BYTES, ) -> aio.Server: """Create an OTLP gRPC server bound to host:port.""" try: @@ -62,7 +68,14 @@ def create_otlp_grpc_server( "OTLP gRPC receiver requires grpcio. 
Install with: pip install grpcio" ) from exc - server = grpc.aio.server() + server = grpc.aio.server( + compression=grpc.Compression.Gzip, + maximum_concurrent_rpcs=max_concurrent_rpcs, + options=[ + ("grpc.max_receive_message_length", max_message_bytes), + ("grpc.max_send_message_length", max_message_bytes), + ], + ) trace_service_pb2_grpc.add_TraceServiceServicer_to_server(OtlpTraceService(manager), server) logs_service_pb2_grpc.add_LogsServiceServicer_to_server(OtlpLogsService(manager), server) @@ -71,5 +84,10 @@ def create_otlp_grpc_server( if bound_port == 0: raise RuntimeError(f"Failed to bind OTLP gRPC receiver to {listen_addr}") - logger.info("OTLP gRPC receiver configured at %s", listen_addr) + logger.info( + "OTLP gRPC receiver configured at %s (gzip enabled, max_concurrent_rpcs=%d, max_msg=%d)", + listen_addr, + max_concurrent_rpcs, + max_message_bytes, + ) return server diff --git a/src/agentevals/cli.py b/src/agentevals/cli.py index 8aaf358..69066f0 100644 --- a/src/agentevals/cli.py +++ b/src/agentevals/cli.py @@ -479,27 +479,48 @@ def evaluator_config(name: str, evaluator_path: str | None, threshold: float | N click.echo(rendered) -def _link_server_shutdown(*servers) -> None: - """Link multiple uvicorn servers so a single SIGINT shuts down all of them. +def _install_shared_exit_handler( + *uvicorn_servers, + grpc_server, +) -> None: + """Install one exit handler that coordinates uvicorn and gRPC shutdown. Uvicorn installs per-server signal handlers; the last server's handler overwrites earlier ones. This replaces handle_exit on every server with - a shared callback that sets should_exit / force_exit on all of them. + a shared callback that sets should_exit / force_exit on all of them and + requests gRPC shutdown alongside the uvicorn servers. 
""" import signal as _signal + loop = asyncio.get_running_loop() + shutdown_requested = False + + def _request_grpc_shutdown(force: bool) -> None: + if loop.is_closed(): + return + grace = None if force else GRPC_SHUTDOWN_GRACE_SECONDS + loop.create_task(grpc_server.stop(grace=grace)) + def _shared_exit(sig, frame): - force = all(s.should_exit for s in servers) - for s in servers: - if force and sig == _signal.SIGINT: + nonlocal shutdown_requested + force = shutdown_requested and sig == _signal.SIGINT + shutdown_requested = True + + for s in uvicorn_servers: + if force: s.force_exit = True else: s.should_exit = True - for s in servers: + _request_grpc_shutdown(force) + + for s in uvicorn_servers: s.handle_exit = _shared_exit +GRPC_SHUTDOWN_GRACE_SECONDS = 5 + + async def _run_servers( host: str, port: int, @@ -522,11 +543,13 @@ async def _run_servers( if reload_dirs: shared_kwargs["reload_dirs"] = reload_dirs + # TODO #99 Create the manager and pass it into the Server constructors instead of injecting it into the app state. 
+ main_server = uvicorn.Server(uvicorn.Config("agentevals.api.app:app", port=port, **shared_kwargs)) otlp_http_server = uvicorn.Server( uvicorn.Config("agentevals.api.otlp_app:otlp_app", port=otlp_http_port, **shared_kwargs) ) - servers: list = [main_server, otlp_http_server] + uvicorn_servers: list = [main_server, otlp_http_server] if mcp_port is not None: from .mcp_server import create_server as create_mcp_server @@ -540,9 +563,8 @@ async def _run_servers( mcp_app = mcp_instance.streamable_http_app() mcp_kwargs = {**shared_kwargs, "reload": False, "port": mcp_port} mcp_uvicorn = uvicorn.Server(uvicorn.Config(mcp_app, **mcp_kwargs)) - servers.append(mcp_uvicorn) + uvicorn_servers.append(mcp_uvicorn) - _link_server_shutdown(*servers) from .api.app import app as main_app from .api.dependencies import require_trace_manager_from_app from .api.otlp_grpc import create_otlp_grpc_server @@ -551,10 +573,17 @@ async def _run_servers( otlp_grpc_server = create_otlp_grpc_server(host=host, port=otlp_grpc_port, manager=mgr) await otlp_grpc_server.start() + _install_shared_exit_handler( + *uvicorn_servers, + otlp_grpc_server, + ) + try: - await asyncio.gather(*(s.serve() for s in servers)) + await asyncio.gather(*(s.serve() for s in uvicorn_servers)) finally: - await otlp_grpc_server.stop(grace=1) + # The shared exit handler only requests gRPC shutdown on SIGINT/SIGTERM. + # We still await a final stop here so non-signal exits also clean up. 
+ await otlp_grpc_server.stop(grace=GRPC_SHUTDOWN_GRACE_SECONDS) @main.command("serve") diff --git a/tests/test_otlp_receiver.py b/tests/test_otlp_receiver.py index 323107b..a47a390 100644 --- a/tests/test_otlp_receiver.py +++ b/tests/test_otlp_receiver.py @@ -1,8 +1,10 @@ """Tests for the OTLP HTTP receiver endpoints and session auto-management.""" import asyncio +import signal import sys from datetime import UTC, datetime, timedelta +from types import SimpleNamespace from unittest.mock import AsyncMock, MagicMock import pytest @@ -19,6 +21,7 @@ process_logs, process_traces, ) +from agentevals.cli import GRPC_SHUTDOWN_GRACE_SECONDS, _install_shared_exit_handler from agentevals.streaming.session import TraceSession from agentevals.streaming.ws_server import StreamingTraceManager @@ -1581,7 +1584,7 @@ def test_raises_when_bind_fails(self, monkeypatch): class _FakeAio: @staticmethod - def server(): + def server(**kwargs): return fake_server fake_grpc = MagicMock() @@ -1596,6 +1599,54 @@ def server(): ) +class _FakeGrpcServer: + def __init__(self): + self.grace_values: list[float | None] = [] + + async def stop(self, grace: float | None) -> None: + self.grace_values.append(grace) + + +class TestInstallSharedExitHandler: + def test_first_sigint_gracefully_stops_grpc(self): + async def go(): + server_a = SimpleNamespace(should_exit=False, force_exit=False, handle_exit=None) + server_b = SimpleNamespace(should_exit=False, force_exit=False, handle_exit=None) + grpc_server = _FakeGrpcServer() + + _install_shared_exit_handler(server_a, server_b, grpc_server=grpc_server) + + server_a.handle_exit(signal.SIGINT, None) + await asyncio.sleep(0) + + assert server_a.should_exit is True + assert server_b.should_exit is True + assert server_a.force_exit is False + assert server_b.force_exit is False + assert grpc_server.grace_values == [GRPC_SHUTDOWN_GRACE_SECONDS] + + _run(go()) + + def test_second_sigint_force_stops_grpc(self): + async def go(): + server_a = 
SimpleNamespace(should_exit=False, force_exit=False, handle_exit=None) + server_b = SimpleNamespace(should_exit=False, force_exit=False, handle_exit=None) + grpc_server = _FakeGrpcServer() + + _install_shared_exit_handler(server_a, server_b, grpc_server=grpc_server) + + server_a.handle_exit(signal.SIGINT, None) + await asyncio.sleep(0) + server_a.handle_exit(signal.SIGINT, None) + await asyncio.sleep(0) + + assert server_a.force_exit is True + assert server_b.force_exit is True + assert grpc_server.grace_values == [GRPC_SHUTDOWN_GRACE_SECONDS, None] + + _run(go()) + + class TestGrpcServices: def test_trace_service_export_creates_session(self): async def go(): From f12618f2025184d9c809f81ad7cd1944d97a842a Mon Sep 17 00:00:00 2001 From: Antonio Jimenez Date: Thu, 2 Apr 2026 12:06:38 +0200 Subject: [PATCH 4/7] Apply feedback --- examples/kubernetes/README.md | 75 +-------------------------------- src/agentevals/api/otlp_grpc.py | 7 +++ src/agentevals/cli.py | 13 ++---- tests/test_otlp_receiver.py | 11 +++-- 4 files changed, 20 insertions(+), 86 deletions(-) diff --git a/examples/kubernetes/README.md b/examples/kubernetes/README.md index f89df56..71844af 100644 --- a/examples/kubernetes/README.md +++ b/examples/kubernetes/README.md @@ -252,77 +252,4 @@ helm uninstall kagent-crds -n kagent helm uninstall otel-collector -n kagent helm uninstall agentevals kubectl delete namespace kagent -``` - -**With `helm-agent` (gpt-4.1-mini):** - -1. Select `helm-agent` from the agent list -2. Start a new conversation -3. Ask: *"List all Helm releases across all namespaces and tell me which ones have pending upgrades"* -4. Follow up: *"Show me the values for the agentevals release"* - -**With `helm-agent-gpt5` (gpt-5):** - -1. Select `helm-agent-gpt5` from the agent list -2. Start a new conversation -3. Ask the same questions in the same order - -### Step 5. Watch traces in agentevals - -Switch back to the agentevals Live view at http://localhost:8001. 
You will see two sessions appear, one for each conversation. Each session shows: - -- **Status** transitioning from ACTIVE to COMPLETED as the conversation ends -- **Span count** incrementing in real time as the agent makes LLM calls and tool invocations -- **Model name** visible in the session metadata - -### Step 6. Select the GPT-5 session as the eval set - -Once both sessions are complete: - -1. Click on the `helm-agent-gpt5` session card to open its trace details -2. Review the conversation: check that it called the right tools and produced correct responses -3. Click **Use as Eval Set** to mark this session as the evaluation baseline -4. Give it a name like `helm-agent-comparison` - -This captures the GPT-5 session's tool trajectory and final responses as the golden reference. - -image - -### Step 7. Evaluate both sessions - -1. Go back to the sessions list -2. Select both sessions (the `gpt-4.1-mini` session and the `gpt-5` session) -3. Click **Evaluate** -4. Select the `helm-agent-comparison` eval set -5. Choose the metrics: - - **tool_trajectory_avg_score**: Did the agent call the correct tools in the correct order? - - **response_match_score**: Did the agent produce responses consistent with the golden reference? -6. Run the evaluation - -### What to look for - -| Metric | What it tells you | -|--------|------------------| -| `tool_trajectory_avg_score` | Whether the agent followed the expected sequence of Helm tool calls (`helm-list`, then `helm-get-values`). A score of 1.0 means it matched exactly. | -| `response_match_score` | How closely the agent's final answers matched the GPT-5 baseline. Useful for catching regressions when switching to a cheaper model. | - -Compare the two sessions in the results table: - -- **Token usage**: The session metadata includes total token counts. If `gpt-5` consumed fewer tokens while achieving the same trajectory score, it may be the better choice for this use case. 
-- **Tool trajectory**: If one agent called extra tools or skipped expected ones, the trajectory score reflects that. -- **Response quality**: A lower response match score on the `gpt-4.1-mini` session highlights where the cheaper model diverged from the GPT-5 baseline. - -image - -You can also click an individual conversation and see a breakdown of each evaluators. - -image - -## Cleanup - -```bash -helm uninstall kagent -n kagent -helm uninstall kagent-crds -n kagent -helm uninstall agentevals -kubectl delete namespace kagent -``` +``` \ No newline at end of file diff --git a/src/agentevals/api/otlp_grpc.py b/src/agentevals/api/otlp_grpc.py index 332bb95..0c8c29d 100644 --- a/src/agentevals/api/otlp_grpc.py +++ b/src/agentevals/api/otlp_grpc.py @@ -22,6 +22,7 @@ logger = logging.getLogger(__name__) +GRPC_SHUTDOWN_GRACE_SECONDS = 5 DEFAULT_GRPC_MAX_CONCURRENT_RPCS = 32 DEFAULT_GRPC_MAX_MESSAGE_BYTES = 8 * 1024 * 1024 @@ -91,3 +92,9 @@ def create_otlp_grpc_server( max_message_bytes, ) return server + + +async def stop_otlp_grpc_server(server: aio.Server, *, force: bool = False) -> None: + """Stop the OTLP gRPC server with graceful or forced semantics.""" + grace = 0 if force else GRPC_SHUTDOWN_GRACE_SECONDS + await server.stop(grace=grace) diff --git a/src/agentevals/cli.py b/src/agentevals/cli.py index 69066f0..e2d693e 100644 --- a/src/agentevals/cli.py +++ b/src/agentevals/cli.py @@ -19,6 +19,7 @@ import click from . import __version__ +from .api.otlp_grpc import create_otlp_grpc_server, stop_otlp_grpc_server def _relative_time(iso_str: str | None) -> str: @@ -491,15 +492,14 @@ def _install_shared_exit_handler( requests gRPC shutdown alongside the uvicorn servers. 
""" import signal as _signal - + loop = asyncio.get_running_loop() shutdown_requested = False def _request_grpc_shutdown(force: bool) -> None: if loop.is_closed(): return - grace = None if force else GRPC_SHUTDOWN_GRACE_SECONDS - loop.create_task(grpc_server.stop(grace=grace)) + loop.create_task(stop_otlp_grpc_server(grpc_server, force=force)) def _shared_exit(sig, frame): nonlocal shutdown_requested @@ -517,10 +517,6 @@ def _shared_exit(sig, frame): for s in uvicorn_servers: s.handle_exit = _shared_exit - -GRPC_SHUTDOWN_GRACE_SECONDS = 5 - - async def _run_servers( host: str, port: int, @@ -567,7 +563,6 @@ async def _run_servers( from .api.app import app as main_app from .api.dependencies import require_trace_manager_from_app - from .api.otlp_grpc import create_otlp_grpc_server mgr = require_trace_manager_from_app(main_app) otlp_grpc_server = create_otlp_grpc_server(host=host, port=otlp_grpc_port, manager=mgr) @@ -583,7 +578,7 @@ async def _run_servers( finally: # The shared exit handler only requests gRPC shutdown on SIGINT/SIGTERM. # We still await a final stop here so non-signal exits also clean up. 
- await otlp_grpc_server.stop(grace=GRPC_SHUTDOWN_GRACE_SECONDS) + await stop_otlp_grpc_server(otlp_grpc_server) @main.command("serve") diff --git a/tests/test_otlp_receiver.py b/tests/test_otlp_receiver.py index a47a390..f153f3d 100644 --- a/tests/test_otlp_receiver.py +++ b/tests/test_otlp_receiver.py @@ -9,7 +9,12 @@ import pytest -from agentevals.api.otlp_grpc import OtlpLogsService, OtlpTraceService, create_otlp_grpc_server +from agentevals.api.otlp_grpc import ( + GRPC_SHUTDOWN_GRACE_SECONDS, + OtlpLogsService, + OtlpTraceService, + create_otlp_grpc_server, +) from agentevals.api.otlp_processing import ( _convert_otlp_log_record, _extract_agentevals_metadata, @@ -21,7 +26,7 @@ process_logs, process_traces, ) -from agentevals.cli import GRPC_SHUTDOWN_GRACE_SECONDS, _install_shared_exit_handler +from agentevals.cli import _install_shared_exit_handler from agentevals.streaming.session import TraceSession from agentevals.streaming.ws_server import StreamingTraceManager @@ -1642,7 +1647,7 @@ async def go(): assert server_a.force_exit is True assert server_b.force_exit is True - assert grpc_server.grace_values == [GRPC_SHUTDOWN_GRACE_SECONDS, None] + assert grpc_server.grace_values == [GRPC_SHUTDOWN_GRACE_SECONDS, 0] _run(go()) From 4349e002458c06a5923acdeb007093d156e3925a Mon Sep 17 00:00:00 2001 From: Antonio Jimenez Date: Thu, 2 Apr 2026 14:34:41 +0200 Subject: [PATCH 5/7] Fix linter --- src/agentevals/api/otlp_grpc.py | 4 +--- src/agentevals/api/otlp_processing.py | 2 -- src/agentevals/cli.py | 3 ++- tests/test_otlp_receiver.py | 4 +--- 4 files changed, 4 insertions(+), 9 deletions(-) diff --git a/src/agentevals/api/otlp_grpc.py b/src/agentevals/api/otlp_grpc.py index 0c8c29d..06034d3 100644 --- a/src/agentevals/api/otlp_grpc.py +++ b/src/agentevals/api/otlp_grpc.py @@ -65,9 +65,7 @@ def create_otlp_grpc_server( try: import grpc except ImportError as exc: # pragma: no cover - environment-dependent - raise RuntimeError( - "OTLP gRPC receiver requires grpcio. 
Install with: pip install grpcio" - ) from exc + raise RuntimeError("OTLP gRPC receiver requires grpcio. Install with: pip install grpcio") from exc server = grpc.aio.server( compression=grpc.Compression.Gzip, diff --git a/src/agentevals/api/otlp_processing.py b/src/agentevals/api/otlp_processing.py index 548a2ce..5dc6038 100644 --- a/src/agentevals/api/otlp_processing.py +++ b/src/agentevals/api/otlp_processing.py @@ -345,5 +345,3 @@ def _parse_otlp_body(body_raw: dict) -> dict | str: except (json.JSONDecodeError, TypeError): return raw return _parse_otlp_any_value(body_raw) - - diff --git a/src/agentevals/cli.py b/src/agentevals/cli.py index e2d693e..db68753 100644 --- a/src/agentevals/cli.py +++ b/src/agentevals/cli.py @@ -492,7 +492,7 @@ def _install_shared_exit_handler( requests gRPC shutdown alongside the uvicorn servers. """ import signal as _signal - + loop = asyncio.get_running_loop() shutdown_requested = False @@ -517,6 +517,7 @@ def _shared_exit(sig, frame): for s in uvicorn_servers: s.handle_exit = _shared_exit + async def _run_servers( host: str, port: int, diff --git a/tests/test_otlp_receiver.py b/tests/test_otlp_receiver.py index f153f3d..629e9dc 100644 --- a/tests/test_otlp_receiver.py +++ b/tests/test_otlp_receiver.py @@ -1690,9 +1690,7 @@ async def go(): span_id=_hex_to_bytes(SPAN_ID_HEX), body=AnyValue(string_value='{"content": "hello from grpc"}'), ) - log_record.attributes.append( - KeyValue(key="event.name", value=AnyValue(string_value="gen_ai.user.message")) - ) + log_record.attributes.append(KeyValue(key="event.name", value=AnyValue(string_value="gen_ai.user.message"))) scope_logs = ScopeLogs(log_records=[log_record]) resource_logs = ResourceLogs(scope_logs=[scope_logs]) request = LogsServiceRequestPB(resource_logs=[resource_logs]) From 0018be21477a5f02770e59828b630c6af01c0802 Mon Sep 17 00:00:00 2001 From: Antonio Jimenez Date: Thu, 2 Apr 2026 18:28:59 +0200 Subject: [PATCH 6/7] Apply feedback --- examples/kubernetes/README.md | 6 ++++-- 
src/agentevals/cli.py | 2 +- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/examples/kubernetes/README.md b/examples/kubernetes/README.md index 71844af..331efa8 100644 --- a/examples/kubernetes/README.md +++ b/examples/kubernetes/README.md @@ -68,7 +68,7 @@ helm upgrade --install otel-collector open-telemetry/opentelemetry-collector \ --set config.service.pipelines.logs.exporters[0]=otlp ``` -> **Note:** If you deployed agentevals in a namespace other than `default`, update the `endpoint` value accordingly: `http://agentevals.<namespace>.svc.cluster.local:4318`. +> **Note:** If you deployed agentevals in a namespace other than `default`, update the `endpoint` value accordingly: `http://agentevals.<namespace>.svc.cluster.local:4317`. ### 3. kagent @@ -98,6 +98,8 @@ helm upgrade --install kagent oci://ghcr.io/kagent-dev/kagent/helm/kagent \ This installs kagent with only the default Helm agent (`helm-agent`) and the K8s troubleshooter enabled, and points its OTel exporter at the Collector. +> **Note:** In case you are not using the OTel Collector, you can set the `otel.tracing.exporter.otlp.endpoint` to `agentevals.default.svc.cluster.local:4317`.
+ ### Verify the deployment ```bash @@ -252,4 +254,4 @@ helm uninstall kagent-crds -n kagent helm uninstall otel-collector -n kagent helm uninstall agentevals kubectl delete namespace kagent -``` \ No newline at end of file +``` diff --git a/src/agentevals/cli.py b/src/agentevals/cli.py index 5dff35c..3fd7dbd 100644 --- a/src/agentevals/cli.py +++ b/src/agentevals/cli.py @@ -580,7 +580,7 @@ async def _run_servers( _install_shared_exit_handler( *uvicorn_servers, - otlp_grpc_server, + grpc_server=otlp_grpc_server, ) try: From 606d42ca4078f4258e3700d52f6f129f30b11259 Mon Sep 17 00:00:00 2001 From: Antonio Jimenez Date: Thu, 2 Apr 2026 18:35:05 +0200 Subject: [PATCH 7/7] Apply feedback --- examples/kubernetes/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/kubernetes/README.md b/examples/kubernetes/README.md index 331efa8..3a269f3 100644 --- a/examples/kubernetes/README.md +++ b/examples/kubernetes/README.md @@ -98,7 +98,7 @@ helm upgrade --install kagent oci://ghcr.io/kagent-dev/kagent/helm/kagent \ This installs kagent with only the default Helm agent (`helm-agent`) and the K8s troubleshooter enabled, and points its OTel exporter at the Collector. -> **Note:** In case you are not using the OTel Collector, you can set the `otel.tracing.exporter.otlp.endpoint` to `agentevals.default.svc.cluster.local:4317`. +> **Note:** If you are not running an OTel Collector, point `otel.tracing.exporter.otlp.endpoint` directly to the agentevals OTLP gRPC endpoint instead: `agentevals.default.svc.cluster.local:4317`. ### Verify the deployment