Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion examples/custom_evaluators/eval_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,3 @@ evaluators:
ref: evaluators/random_evaluator/random_evaluator.py
threshold: 0.110
executor: local

49 changes: 39 additions & 10 deletions src/agentevals/custom_evaluators.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ def is_available(self) -> bool:


class PythonRuntime(Runtime):
def __init__(self, python_path: Path | None = None):
self._exe = str(python_path) if python_path else sys.executable

@property
def name(self) -> str:
return "Python"
Expand All @@ -90,13 +93,16 @@ def extensions(self) -> tuple[str, ...]:
return (".py",)

def build_command(self, path: Path) -> list[str]:
return [sys.executable, str(path)]
return [self._exe, str(path)]

def is_available(self) -> bool:
return True


class NodeRuntime(Runtime):
def __init__(self) -> None:
self._exe = shutil.which("node")

@property
def name(self) -> str:
return "Node.js"
Expand All @@ -106,10 +112,12 @@ def extensions(self) -> tuple[str, ...]:
return (".js", ".ts")

def build_command(self, path: Path) -> list[str]:
node = shutil.which("node")
if not node:
if not self._exe:
raise RuntimeError("Node.js not found on PATH (required for .js/.ts evaluators)")
return [node, str(path)]
return [self._exe, str(path)]

def is_available(self) -> bool:
return self._exe is not None


_RUNTIMES: list[Runtime] = [
Expand Down Expand Up @@ -203,12 +211,13 @@ class SubprocessBackend(EvaluatorBackend):
"""Runs a local code file (.py, .js, .ts, …) as a subprocess.

The correct interpreter is resolved from the file extension via the
:data:`_RUNTIMES` registry.
:data:`_RUNTIMES` registry. Pass a pre-configured *runtime* to override
the default (e.g. a :class:`PythonRuntime` with a venv interpreter).
"""

def __init__(self, path: Path, timeout: int = 30):
def __init__(self, path: Path, timeout: int = 30, runtime: Runtime | None = None):
self._path = path.resolve()
self._runtime = _resolve_runtime(self._path)
self._runtime = runtime or _resolve_runtime(self._path)
self._timeout = timeout

if not self._path.exists():
Expand All @@ -223,7 +232,7 @@ async def run(self, eval_input: EvalInput, metric_name: str) -> EvalResult:
# Executor factory
# ---------------------------------------------------------------------------

_EXECUTOR_FACTORIES: dict[str, Callable[[Path, int], EvaluatorBackend]] = {
_EXECUTOR_FACTORIES: dict[str, Callable[..., EvaluatorBackend]] = {
"local": lambda path, timeout: SubprocessBackend(path, timeout),
}

Expand All @@ -236,7 +245,7 @@ def create_executor(executor_name: str, path: Path, timeout: int = 30) -> Evalua
return factory(path, timeout)


def register_executor(name: str, factory: Callable[[Path, int], EvaluatorBackend]) -> None:
def register_executor(name: str, factory: Callable[..., EvaluatorBackend]) -> None:
"""Register a new executor factory (e.g. for Docker support)."""
_EXECUTOR_FACTORIES[name] = factory

Expand Down Expand Up @@ -425,7 +434,27 @@ async def evaluate_custom_evaluator(
evaluator_def = await get_default_resolver().resolve(evaluator_def)

if isinstance(evaluator_def, CodeEvaluatorDef):
backend = create_executor(evaluator_def.executor, Path(evaluator_def.path), evaluator_def.timeout)
evaluator_path = Path(evaluator_def.path)

runtime: Runtime | None = None
if evaluator_path.suffix == ".py":
from .evaluator.venv import ensure_venv_async

try:
venv_python = await ensure_venv_async(evaluator_path)
except Exception as exc:
logger.error("Failed to set up venv for '%s': %s", evaluator_def.name, exc)
return MetricResult(
metric_name=evaluator_def.name,
error=f"Dependency installation failed: {exc}",
)
if venv_python:
runtime = PythonRuntime(python_path=venv_python)

if runtime is not None:
backend = SubprocessBackend(evaluator_path, evaluator_def.timeout, runtime=runtime)
else:
backend = create_executor(evaluator_def.executor, evaluator_path, evaluator_def.timeout)
else:
raise ValueError(f"Unsupported custom evaluator type: {type(evaluator_def).__name__}")

Expand Down
26 changes: 23 additions & 3 deletions src/agentevals/evaluator/sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import os
import time
from dataclasses import asdict, dataclass, field
from pathlib import Path
from pathlib import Path, PurePosixPath

import yaml

Expand Down Expand Up @@ -216,8 +216,22 @@ async def fetch_evaluator(self, ref: str, dest: Path) -> Path:
resp = await client.get(url, headers=self._headers(), timeout=30)
resp.raise_for_status()

dest.parent.mkdir(parents=True, exist_ok=True)
dest.write_text(resp.text, encoding="utf-8") # noqa: ASYNC240
dest.parent.mkdir(parents=True, exist_ok=True)
dest.write_text(resp.text, encoding="utf-8") # noqa: ASYNC240

# Also try to fetch requirements.txt from the same directory.
ref_dir = str(PurePosixPath(ref).parent)
req_ref = f"{ref_dir}/requirements.txt"
req_url = self._raw_url(req_ref)
try:
req_resp = await client.get(req_url, headers=self._headers(), timeout=15)
if req_resp.status_code == 200:
req_dest = dest.parent / "requirements.txt"
req_dest.write_text(req_resp.text, encoding="utf-8") # noqa: ASYNC240
logger.info("Downloaded requirements.txt for evaluator")
except httpx.HTTPError:
logger.debug("No requirements.txt found for evaluator (or download failed)")

return dest


Expand Down Expand Up @@ -267,6 +281,12 @@ async def fetch_evaluator(self, ref: str, dest: Path) -> Path:

dest.parent.mkdir(parents=True, exist_ok=True)
shutil.copy2(src, dest)

# Also copy requirements.txt if it exists alongside the source file.
req_src = src.parent / "requirements.txt"
if req_src.exists():
shutil.copy2(req_src, dest.parent / "requirements.txt")

return dest


Expand Down
119 changes: 119 additions & 0 deletions src/agentevals/evaluator/venv.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
"""Virtual environment management for evaluators with dependencies.

When an evaluator ships a ``requirements.txt`` alongside its entrypoint, we
create a cached venv, install the dependencies (plus the evaluator SDK), and
return the path to that venv's Python interpreter so the evaluator subprocess
runs in isolation.
"""

from __future__ import annotations

import asyncio
import hashlib
import logging
import os
import shutil
import subprocess
import sys
from pathlib import Path

logger = logging.getLogger(__name__)

# Root directory for cached evaluator venvs: ``<cache home>/agentevals/venvs``.
# The XDG Base Directory spec says an unset *or empty* ``XDG_CACHE_HOME`` means
# ``$HOME/.cache``; using ``or`` (rather than a ``.get`` default) keeps an empty
# value from silently resolving to a path relative to the working directory.
_VENV_CACHE_DIR = Path(os.environ.get("XDG_CACHE_HOME") or Path.home() / ".cache") / "agentevals" / "venvs"
# Name of the marker file, stored inside each venv, that records the hash of the
# requirements file the venv was built from.
_HASH_FILE = ".requirements_hash"

# Per-evaluator locks to prevent concurrent venv creation for the same evaluator.
_venv_locks: dict[str, asyncio.Lock] = {}


def _venv_python(venv_dir: Path) -> Path:
    """Return the path of the Python interpreter inside *venv_dir*.

    Windows venvs place the executable under ``Scripts/python.exe``; every
    other platform uses ``bin/python``.
    """
    if sys.platform == "win32":
        subdir, executable = "Scripts", "python.exe"
    else:
        subdir, executable = "bin", "python"
    return venv_dir / subdir / executable


def _venv_key(evaluator_path: Path) -> str:
    """Build the cache directory name for the evaluator at *evaluator_path*.

    The key is ``<parent dir name>-<hash>``, where the hash is the first eight
    hex digits of the SHA-256 of the resolved parent directory path. Only the
    parent directory contributes, so evaluators living in the same directory
    map to the same key.
    """
    parent_dir = evaluator_path.resolve().parent
    digest = hashlib.sha256(str(parent_dir).encode()).hexdigest()
    return "-".join((parent_dir.name, digest[:8]))


def _is_venv_valid(venv_dir: Path, req_hash: str) -> bool:
    """Report whether the venv at *venv_dir* can be reused for *req_hash*.

    A venv is reusable when its interpreter exists, its hash marker file
    exists, and the marker's stripped content equals *req_hash*.
    """
    marker = venv_dir / _HASH_FILE
    if not _venv_python(venv_dir).exists():
        return False
    if not marker.exists():
        return False
    recorded_hash = marker.read_text().strip()
    return recorded_hash == req_hash


def _create_venv(venv_dir: Path, uv: str | None) -> None:
    """Create a fresh virtual environment at *venv_dir*.

    Any existing directory at *venv_dir* is removed first. When *uv* (the path
    to a ``uv`` executable) is given, ``uv venv`` is used and pinned to the
    current interpreter; otherwise the stdlib ``venv`` module is run with the
    current interpreter.

    Raises:
        subprocess.CalledProcessError: if the creation command exits non-zero.
    """
    if venv_dir.exists():
        shutil.rmtree(venv_dir)

    target = str(venv_dir)
    if uv:
        command = [uv, "venv", target, "--python", sys.executable]
    else:
        command = [sys.executable, "-m", "venv", target]
    subprocess.run(command, check=True, capture_output=True)


def _install_deps(venv_dir: Path, requirements: Path, uv: str | None) -> None:
    """Install the evaluator SDK and then *requirements* into the venv.

    Uses ``uv pip install --python <venv python>`` when *uv* is given, and the
    venv's own ``python -m pip install`` otherwise. The SDK is installed in a
    separate step before the requirements file.

    Raises:
        subprocess.CalledProcessError: if either install command exits non-zero.
    """
    interpreter = str(_venv_python(venv_dir))
    if uv:
        installer = [uv, "pip", "install", "--python", interpreter]
    else:
        installer = [interpreter, "-m", "pip", "install"]

    subprocess.run([*installer, "agentevals-evaluator-sdk"], check=True, capture_output=True)
    logger.info("Installing dependencies from %s ...", requirements.name)
    subprocess.run([*installer, "-r", str(requirements)], check=True, capture_output=True)


# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------


def ensure_venv(evaluator_path: Path) -> Path | None:
    """Ensure a cached venv exists for *evaluator_path* if it has ``requirements.txt``.

    The venv is reused when its recorded requirements hash matches the current
    ``requirements.txt``; otherwise it is rebuilt from scratch and the new hash
    is recorded only after a successful install.

    Args:
        evaluator_path: Path to the evaluator entrypoint. ``requirements.txt``
            is looked up in the same (resolved) directory.

    Returns:
        The venv Python path, or ``None`` if no venv is needed.

    Raises:
        RuntimeError: if creating the venv or installing dependencies exits
            with a non-zero status; the message carries the command's stderr.
    """
    requirements = evaluator_path.resolve().parent / "requirements.txt"
    if not requirements.exists():
        return None

    req_hash = hashlib.sha256(requirements.read_bytes()).hexdigest()
    venv_dir = _VENV_CACHE_DIR / _venv_key(evaluator_path)

    if _is_venv_valid(venv_dir, req_hash):
        logger.debug("Using cached venv for %s at %s", evaluator_path.name, venv_dir)
        return _venv_python(venv_dir)

    uv = shutil.which("uv")
    logger.info(
        "Setting up environment for evaluator '%s' (using %s). This may take a while on first run...",
        evaluator_path.stem,
        "uv" if uv else "venv+pip",
    )

    try:
        venv_dir.parent.mkdir(parents=True, exist_ok=True)
        _create_venv(venv_dir, uv)
        _install_deps(venv_dir, requirements, uv)
    except subprocess.CalledProcessError as exc:
        raw_stderr = exc.stderr
        # Tool output is not guaranteed to be valid UTF-8; decode leniently so a
        # UnicodeDecodeError raised here cannot mask the real installation failure.
        if isinstance(raw_stderr, bytes):
            stderr = raw_stderr.decode(errors="replace")
        else:
            stderr = raw_stderr or ""
        raise RuntimeError(f"Failed to set up environment for evaluator '{evaluator_path.stem}': {stderr}") from exc

    # Written last: a venv without this marker is treated as invalid and rebuilt.
    (venv_dir / _HASH_FILE).write_text(req_hash)
    logger.info("Environment ready for '%s'", evaluator_path.stem)
    return _venv_python(venv_dir)


async def ensure_venv_async(evaluator_path: Path) -> Path | None:
    """Run :func:`ensure_venv` in a worker thread, serialized per evaluator key.

    One lock is kept per venv key, so concurrent calls for the same key wait
    for each other instead of building the same venv at once.
    """
    key = _venv_key(evaluator_path)
    lock = _venv_locks.get(key)
    if lock is None:
        lock = _venv_locks[key] = asyncio.Lock()

    async with lock:
        return await asyncio.to_thread(ensure_venv, evaluator_path)