diff --git a/src/cuga/backend/cuga_graph/nodes/cuga_lite/executors/local/local_sandbox_executor.py b/src/cuga/backend/cuga_graph/nodes/cuga_lite/executors/local/local_sandbox_executor.py index 76171091..578e9ce7 100644 --- a/src/cuga/backend/cuga_graph/nodes/cuga_lite/executors/local/local_sandbox_executor.py +++ b/src/cuga/backend/cuga_graph/nodes/cuga_lite/executors/local/local_sandbox_executor.py @@ -134,21 +134,29 @@ def _command_env(self, workspace_root: Path, venv: Path) -> dict[str, str]: env["PATH"] = bindir + os.pathsep + env.get("PATH", "") return env - def _copy_skills_to_workspace(self, thread_id: Optional[str] = None) -> None: + def _copy_skills_to_workspace( + self, + thread_id: Optional[str] = None, + cuga_folder: Optional[str] = None, + skills_enabled: Optional[bool] = None, + ) -> None: """Copy discovered skill folders into the per-thread /workspace/skills directory.""" from cuga.config import settings - if not getattr(settings.skills, "enabled", False): + enabled = skills_enabled if skills_enabled is not None else getattr(settings.skills, "enabled", False) + if not enabled: return try: from cuga.backend.skills.loader import discover_skills except Exception: return - cuga_folder = (os.getenv("CUGA_FOLDER") or "").strip() or ( - getattr(settings.policy, "cuga_folder", None) or "" - ).strip() - skill_entries = discover_skills(cuga_folder or None) + resolved_folder = ( + cuga_folder + or (os.getenv("CUGA_FOLDER") or "").strip() + or (getattr(settings.policy, "cuga_folder", None) or "").strip() + ) + skill_entries = discover_skills(resolved_folder or None) copied = 0 for skill_entry in skill_entries: @@ -236,14 +244,19 @@ async def run_command(cmd: str) -> str: return run_command - def create_sandbox_tools(self, thread_id: Optional[str] = None) -> list[StructuredTool]: + def create_sandbox_tools( + self, + thread_id: Optional[str] = None, + cuga_folder: Optional[str] = None, + skills_enabled: Optional[bool] = None, + ) -> list[StructuredTool]: """Return the run_command StructuredTool for local (unsandboxed) execution. Filesystem tools (read/write/list/edit/...) are no longer produced here — they come from the consolidated ``filesystem`` package via ``create_filesystem_tools`` (see ``cuga_lite_graph``). """ - self._copy_skills_to_workspace(thread_id) + self._copy_skills_to_workspace(thread_id, cuga_folder=cuga_folder, skills_enabled=skills_enabled) return [ StructuredTool.from_function( coroutine=self.create_run_command_tool(thread_id), diff --git a/src/cuga/backend/cuga_graph/nodes/cuga_lite/executors/native/native_sandbox_executor.py b/src/cuga/backend/cuga_graph/nodes/cuga_lite/executors/native/native_sandbox_executor.py index 00897b57..5c3aa2fa 100644 --- a/src/cuga/backend/cuga_graph/nodes/cuga_lite/executors/native/native_sandbox_executor.py +++ b/src/cuga/backend/cuga_graph/nodes/cuga_lite/executors/native/native_sandbox_executor.py @@ -147,23 +147,29 @@ async def _ensure_venv(self) -> None: "subsequent run_command calls will report `activate: No such file or directory`" ) - def _copy_skills_to_workspace(self, thread_id: Optional[str] = None) -> None: + def _copy_skills_to_workspace( + self, + thread_id: Optional[str] = None, + cuga_folder: Optional[str] = None, + skills_enabled: Optional[bool] = None, + ) -> None: """Copy discovered skill folders into the hidden per-thread /workspace/skills directory.""" from cuga.config import settings - if not getattr(settings.skills, "enabled", False): + enabled = skills_enabled if skills_enabled is not None else getattr(settings.skills, "enabled", False) + if not enabled: return try: from cuga.backend.skills.loader import discover_skills except Exception: return - import os - - cuga_folder = (os.getenv("CUGA_FOLDER") or "").strip() or ( - getattr(settings.policy, "cuga_folder", None) or "" - ).strip() - skill_entries = discover_skills(cuga_folder or None) + resolved_folder = ( + cuga_folder + or (os.getenv("CUGA_FOLDER") or "").strip() + or (getattr(settings.policy, "cuga_folder", None) or "").strip() + ) + skill_entries = discover_skills(resolved_folder or None) copied = 0 for skill_entry in skill_entries: @@ -271,14 +277,19 @@ async def run_command(cmd: str) -> str: return run_command - def create_sandbox_tools(self, thread_id: Optional[str] = None) -> list[StructuredTool]: + def create_sandbox_tools( + self, + thread_id: Optional[str] = None, + cuga_folder: Optional[str] = None, + skills_enabled: Optional[bool] = None, + ) -> list[StructuredTool]: """Return the run_command StructuredTool for native macOS sandbox-exec. Filesystem tools (read/write/list/edit/...) now come from the consolidated ``filesystem`` package via ``create_filesystem_tools`` (see ``cuga_lite_graph``); they are no longer produced here. """ - self._copy_skills_to_workspace(thread_id) + self._copy_skills_to_workspace(thread_id, cuga_folder=cuga_folder, skills_enabled=skills_enabled) return [ StructuredTool.from_function( coroutine=self.create_run_command_tool(thread_id), diff --git a/src/cuga/backend/cuga_graph/nodes/cuga_lite/executors/opensandbox/opensandbox_executor.py b/src/cuga/backend/cuga_graph/nodes/cuga_lite/executors/opensandbox/opensandbox_executor.py index 9b3b4e25..fdf87db6 100644 --- a/src/cuga/backend/cuga_graph/nodes/cuga_lite/executors/opensandbox/opensandbox_executor.py +++ b/src/cuga/backend/cuga_graph/nodes/cuga_lite/executors/opensandbox/opensandbox_executor.py @@ -29,6 +29,7 @@ from __future__ import annotations +import asyncio import os from datetime import timedelta from pathlib import Path @@ -88,6 +89,22 @@ class OpenSandboxExecutor(RemoteExecutor): # Interpreter cache: thread_id -> CodeInterpreter instance _sandboxes: dict[str, Any] = {} + # Per-thread skills config requested on the current invocation + _skills_config: dict[str, tuple[Optional[str], Optional[bool]]] = {} + # Skills config that was actually applied when the sandbox was created + _active_skills_config: dict[str, tuple[Optional[str], Optional[bool]]] = {} + # Per-key asyncio locks to prevent concurrent sandbox creation for the same thread + _locks: dict[str, asyncio.Lock] = {} + + def _get_key_lock(self, key: str) -> asyncio.Lock: + """Return (creating if necessary) the asyncio.Lock for key. + + Safe to call without holding any other lock: in asyncio's single-threaded + model there is no preemption between the dict check and the assignment. + """ + if key not in self._locks: + self._locks[key] = asyncio.Lock() + return self._locks[key] # ------------------------------------------------------------------ # # Sandbox lifecycle # @@ -100,56 +117,75 @@ def _get_connection_config(self): return ConnectionConfig(domain=domain) async def _get_or_create_interpreter(self, thread_id: Optional[str] = None) -> Any: - """Return a cached CodeInterpreter for thread_id, creating one if needed.""" + """Return a cached CodeInterpreter for thread_id, creating one if needed. + + A per-key asyncio.Lock prevents concurrent coroutines from creating + duplicate sandboxes for the same thread_id. + """ from opensandbox import Sandbox # type: ignore[import] from code_interpreter import CodeInterpreter # type: ignore[import] key = thread_id or "_default" - existing = self._sandboxes.get(key) - if existing is not None: - try: - await existing.sandbox.commands.run("true") - return existing - except Exception: - logger.debug(f"[OpenSandboxExecutor] Interpreter for thread={key} is dead, recreating") - self._sandboxes.pop(key, None) - - cfg = _cfg() - image = getattr(cfg, "opensandbox_image", "opensandbox/code-interpreter:v1.0.2") - timeout_s = int(getattr(cfg, "opensandbox_timeout_seconds", 600)) - python_version = getattr(cfg, "opensandbox_python_version", "3.11") - entrypoint = getattr(cfg, "opensandbox_entrypoint", "/opt/opensandbox/code-interpreter.sh") - conn = self._get_connection_config() - - sandbox = await Sandbox.create( - image, - entrypoint=[entrypoint], - env={"PYTHON_VERSION": python_version}, - timeout=timedelta(seconds=timeout_s), - connection_config=conn, - ) - interpreter = await CodeInterpreter.create(sandbox) - - # Ensure the agent-facing workspace and shared Python venv exist. - # Agents see /workspace; /tmp/.venv remains an internal implementation detail. - await interpreter.sandbox.commands.run( - f"mkdir -p {VIRTUAL_WORKSPACE_ROOT} && cd {VIRTUAL_WORKSPACE_ROOT} && " - "(command -v uv >/dev/null 2>&1 || python -m pip install --quiet uv) && " - f"uv venv {VENV_PATH}" - ) - if getattr(_cfg(), "enabled", False): - await self._upload_skills_to_sandbox(interpreter) + async with self._get_key_lock(key): + existing = self._sandboxes.get(key) + if existing is not None: + try: + await existing.sandbox.commands.run("true") + return existing + except Exception: + logger.debug(f"[OpenSandboxExecutor] Interpreter for thread={key} is dead, recreating") + self._sandboxes.pop(key, None) + + cfg = _cfg() + image = getattr(cfg, "opensandbox_image", "opensandbox/code-interpreter:v1.0.2") + timeout_s = int(getattr(cfg, "opensandbox_timeout_seconds", 600)) + python_version = getattr(cfg, "opensandbox_python_version", "3.11") + entrypoint = getattr(cfg, "opensandbox_entrypoint", "/opt/opensandbox/code-interpreter.sh") + conn = self._get_connection_config() + + sandbox = await Sandbox.create( + image, + entrypoint=[entrypoint], + env={"PYTHON_VERSION": python_version}, + timeout=timedelta(seconds=timeout_s), + connection_config=conn, + ) + interpreter = await CodeInterpreter.create(sandbox) + + # Ensure the agent-facing workspace and shared Python venv exist. + # Agents see /workspace; /tmp/.venv remains an internal implementation detail. + await interpreter.sandbox.commands.run( + f"mkdir -p {VIRTUAL_WORKSPACE_ROOT} && cd {VIRTUAL_WORKSPACE_ROOT} && " + "(command -v uv >/dev/null 2>&1 || python -m pip install --quiet uv) && " + f"uv venv {VENV_PATH}" + ) - self._sandboxes[key] = interpreter - logger.info(f"[OpenSandboxExecutor] Created interpreter for thread={key}") - return interpreter + skill_cuga_folder, skill_enabled = self._skills_config.get(key, (None, None)) + enabled = skill_enabled if skill_enabled is not None else getattr(_cfg(), "enabled", False) + if enabled: + try: + await self._upload_skills_to_sandbox(interpreter, cuga_folder=skill_cuga_folder) + except Exception as exc: + # Log and continue — the sandbox is still usable; skills companion + # files will not be available, but the agent can still run commands. + logger.warning( + f"[OpenSandboxExecutor] Skills upload failed for thread={key}: {exc}. " + "Sandbox is cached but skill companion files may be unavailable." + ) + + # Cache the interpreter regardless of upload outcome to avoid leaking + # orphaned remote containers on repeated failures. + self._sandboxes[key] = interpreter + self._active_skills_config[key] = (skill_cuga_folder, skill_enabled) + logger.info(f"[OpenSandboxExecutor] Created interpreter for thread={key}") + return interpreter async def get_interpreter_for_thread(self, thread_id: Optional[str] = None) -> Any: """Return the CodeInterpreter for thread_id, creating the sandbox if needed (workspace API, tools).""" return await self._get_or_create_interpreter(thread_id) - async def _upload_skills_to_sandbox(self, interpreter: Any) -> None: + async def _upload_skills_to_sandbox(self, interpreter: Any, cuga_folder: Optional[str] = None) -> None: """Upload discovered skill folders into /workspace/skills/ in the sandbox. Mirrors ``cuga.backend.skills.loader.discover_skills`` precedence: @@ -159,8 +195,12 @@ async def _upload_skills_to_sandbox(self, interpreter: Any) -> None: from opensandbox.models import WriteEntry # type: ignore[import] from cuga.backend.skills.loader import discover_skills - cuga_folder = (os.getenv("CUGA_FOLDER") or "").strip() or (settings.policy.cuga_folder or "").strip() - skill_entries = discover_skills(cuga_folder or None) + resolved_folder = ( + cuga_folder + or (os.getenv("CUGA_FOLDER") or "").strip() + or (settings.policy.cuga_folder or "").strip() + ) + skill_entries = discover_skills(resolved_folder or None) entries_by_path: dict[str, WriteEntry] = {} for skill_entry in skill_entries: @@ -185,16 +225,23 @@ async def _upload_skills_to_sandbox(self, interpreter: Any) -> None: if entries_by_path: entries = list(entries_by_path.values()) - await interpreter.sandbox.files.write_files(entries) - logger.info( - f"[OpenSandboxExecutor] Uploaded {len(entries)} skill files to " - f"{VIRTUAL_WORKSPACE_ROOT}/skills/" - ) + try: + await interpreter.sandbox.files.write_files(entries) + logger.info( + f"[OpenSandboxExecutor] Uploaded {len(entries)} skill files to " + f"{VIRTUAL_WORKSPACE_ROOT}/skills/" + ) + except Exception as exc: + logger.warning(f"[OpenSandbox] write_files failed ({len(entries)} skill files): {exc}") + raise async def release_sandbox(self, thread_id: Optional[str] = None) -> None: """Kill and remove the cached interpreter/sandbox for a thread.""" key = thread_id or "_default" interpreter = self._sandboxes.pop(key, None) + self._active_skills_config.pop(key, None) + self._skills_config.pop(key, None) + self._locks.pop(key, None) if interpreter: try: await interpreter.sandbox.kill() @@ -232,13 +279,20 @@ async def run_command(cmd: str) -> str: return run_command - def create_sandbox_tools(self, thread_id: Optional[str] = None) -> list[StructuredTool]: + def create_sandbox_tools( + self, + thread_id: Optional[str] = None, + cuga_folder: Optional[str] = None, + skills_enabled: Optional[bool] = None, + ) -> list[StructuredTool]: """Return the run_command StructuredTool bound to thread_id. Filesystem tools (read/write/list/edit/...) now come from the consolidated ``filesystem`` package via a ``RemoteSandboxBackend`` (wired in ``cuga_lite_graph``); they are no longer produced here. """ + key = thread_id or "_default" + self._skills_config[key] = (cuga_folder, skills_enabled) return [ StructuredTool.from_function( coroutine=self.create_run_command_tool(thread_id), diff --git a/src/cuga/backend/cuga_graph/nodes/cuga_lite/prompt_utils.py b/src/cuga/backend/cuga_graph/nodes/cuga_lite/prompt_utils.py index 774e9143..86d9dff2 100644 --- a/src/cuga/backend/cuga_graph/nodes/cuga_lite/prompt_utils.py +++ b/src/cuga/backend/cuga_graph/nodes/cuga_lite/prompt_utils.py @@ -642,6 +642,11 @@ def create_mcp_prompt( processed_apps = format_apps_for_prompt(apps) if not enable_shell_tool: + if skills_enabled: + logger.warning( + "Skills are enabled but enable_shell_tool=False; the skills block will be suppressed. " + "Set advanced_features.enable_shell_tool=true to activate skills." + ) skills_enabled = False skills_prompt_section = "" diff --git a/src/cuga/backend/skills/loader.py b/src/cuga/backend/skills/loader.py index b7365ece..f468edb2 100644 --- a/src/cuga/backend/skills/loader.py +++ b/src/cuga/backend/skills/loader.py @@ -3,6 +3,7 @@ from __future__ import annotations import os +import re from pathlib import Path from typing import Any, Iterable, List, Sequence @@ -15,6 +16,21 @@ DEFAULT_GLOBAL_SKILLS_ROOT = "~/.config/agents/skills" LEGACY_GLOBAL_SKILLS_ROOT = "~/.config/cuga/skills" +# Matches Jinja2 expression/block/comment delimiters used in the system-prompt template. +# Stripping these at parse time prevents prompt-injection via malicious SKILL.md frontmatter. +_JINJA_RE = re.compile(r"\{\{.*?\}\}|\{%.*?%\}|\{#.*?#\}", re.DOTALL) + + +def _sanitize_for_prompt(value: str, field: str, source: Path) -> str: + """Strip Jinja2 template delimiters from a skill frontmatter string.""" + sanitized = _JINJA_RE.sub("", value) + if sanitized != value: + logger.warning( + f"Skill {source}: {field!r} contained Jinja2 template syntax and was sanitized. " + "This may indicate a malicious or misconfigured SKILL.md." + ) + return sanitized + def _resolve_path(path: str | Path) -> Path: p = Path(path).expanduser() @@ -122,9 +138,18 @@ def _parse_skill_file(path: Path) -> SkillEntry | None: logger.warning(f"Skill {path} missing name or description in frontmatter") return None + name_str = _sanitize_for_prompt(str(name).strip(), "name", path) + if re.search(r'[/\\]|\.\.', name_str): + logger.warning( + f"Skill {path} has unsafe name {name_str!r} (path separators or '..' not allowed), skipping" + ) + return None + + description_str = _sanitize_for_prompt(str(description).strip(), "description", path) + return SkillEntry( - name=str(name).strip(), - description=str(description).strip(), + name=name_str, + description=description_str, body=body.strip(), source=str(path), requirements=_normalize_requirements(frontmatter.get("requirements")), @@ -147,6 +172,10 @@ def discover_skills( for path in _iter_skill_files(skills_dir): entry = _parse_skill_file(path) if entry: + if entry.name in by_name: + logger.debug( + f"Skill '{entry.name}' from {path} overrides earlier entry from {by_name[entry.name].source}" + ) by_name[entry.name] = entry return list(by_name.values()) diff --git a/src/cuga/backend/skills/registry.py b/src/cuga/backend/skills/registry.py index ea5b5da5..d0048b45 100644 --- a/src/cuga/backend/skills/registry.py +++ b/src/cuga/backend/skills/registry.py @@ -37,10 +37,12 @@ def load_skill(self, name: str) -> str: if pip_pkgs: setup_lines.append(f"await run_command('uv pip install --quiet {' '.join(pip_pkgs)}')") setup_lines.append("await asyncio.sleep(5)") + setup_lines.append(f"await run_command('uv pip show {' '.join(pip_pkgs)}')") if npm_pkgs: # Install locally in the working dir so require() resolves correctly setup_lines.append(f"await run_command('npm install {' '.join(npm_pkgs)}')") setup_lines.append("await asyncio.sleep(5)") + setup_lines.append(f"await run_command('npm list {' '.join(npm_pkgs)}')") setup_script = "\n".join(setup_lines) parts.append( "⚠️ STEP 1 — INSTALL REQUIREMENTS (MANDATORY — your very first code block, no exceptions):\n" diff --git a/src/cuga/sdk.py b/src/cuga/sdk.py index 75fb53ce..b04656d2 100644 --- a/src/cuga/sdk.py +++ b/src/cuga/sdk.py @@ -1225,6 +1225,8 @@ def __init__( reset_policy_storage: bool = False, filesystem_sync: Optional[bool] = None, enable_knowledge: Optional[bool] = None, + enable_skills: Optional[bool] = None, + skills_folder: Optional[str] = None, ): """ Initialize the CUGA Agent. @@ -1241,6 +1243,8 @@ def __init__( reset_policy_storage: If True, clears all existing policies from storage on init filesystem_sync: If True, saves policies to .cuga when added/updated (default: True) enable_knowledge: If True, enable knowledge tools; False to disable; None to auto-detect from settings + enable_skills: If True, enable agent skills (SKILL.md / load_skill). None = auto from settings. + skills_folder: Root folder that contains .agents/skills/. Defaults to cwd / CUGA_FOLDER env var. Example with tool approval policy: ```python @@ -1287,6 +1291,10 @@ def __init__( # Knowledge configuration self._enable_knowledge = enable_knowledge # None = auto from settings + # Skills configuration + self._enable_skills = enable_skills # None = auto from settings + self._skills_folder = skills_folder # None = use CUGA_FOLDER / cwd + # Setup tool provider if tool_provider: self.tool_provider = tool_provider @@ -1823,6 +1831,20 @@ async def invoke( # Pass track_tool_calls flag via configurable run_config["configurable"]["track_tool_calls"] = track_tool_calls + # Pass skills configuration via configurable (overrides settings when set) + if self._enable_skills is not None: + run_config["configurable"]["skills_enabled"] = self._enable_skills + if self._skills_folder is not None: + # discover_skills() expects the .cuga subfolder path, not the workspace root. + # Convert workspace root → workspace_root/.cuga so that .agents/skills/ resolves correctly. + # Guard against double-suffixing if the caller already passes a .cuga-suffixed path. + from pathlib import Path as _Path + + _sf = _Path(self._skills_folder) + if _sf.name != ".cuga": + _sf = _sf / ".cuga" + run_config["configurable"]["skills_folder"] = str(_sf) + # Ensure graph is created (needed for state retrieval) _ = self.graph @@ -2001,6 +2023,21 @@ async def invoke( error_msg = result['error'] final_answer = f"Error: {error_msg}" + # Fallback: if final_answer is still empty, look at the last non-empty AI message. + # Reasoning models sometimes return content='' with the answer only in + # additional_kwargs['reasoning_content'], so check both fields. + if not final_answer: + for msg in reversed(result.get("chat_messages", [])): + if getattr(msg, "type", None) != "ai": + continue + text = getattr(msg, "content", "") or ( + getattr(msg, "additional_kwargs", {}).get("reasoning_content", "") + ) + if text: + final_answer = text + logger.debug("final_answer extracted from last AI chat message (fallback)") + break + # Check if graph interrupted for approval if not final_answer: try: @@ -2113,6 +2150,17 @@ async def stream( # Setup config (shallow-copied so we don't mutate the caller's dict) run_config = self._prepare_run_config(config) + # Pass skills configuration via configurable (overrides settings when set) + if self._enable_skills is not None: + run_config["configurable"]["skills_enabled"] = self._enable_skills + if self._skills_folder is not None: + from pathlib import Path as _Path + + _sf = _Path(self._skills_folder) + if _sf.name != ".cuga": + _sf = _sf / ".cuga" + run_config["configurable"]["skills_folder"] = str(_sf) + # Handle resume case (message is None or action_response is provided) if message is None or action_response is not None: if not thread_id: diff --git a/tests/e2e/__init__.py b/tests/e2e/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/e2e/conftest.py b/tests/e2e/conftest.py new file mode 100644 index 00000000..e4282a72 --- /dev/null +++ b/tests/e2e/conftest.py @@ -0,0 +1,199 @@ +"""Shared fixtures and helpers for e2e tests. + +Provides: +- CaptureChatModel: hermetic mock LLM that records inputs and replays scripted responses +- Helpers: write_skill, extract_system_content +""" + +from __future__ import annotations + +import sys +from pathlib import Path +from typing import Any + +import pytest +from langchain_core.language_models import BaseChatModel +from langchain_core.messages import AIMessage +from langchain_core.outputs import ChatGeneration, ChatResult +from pydantic import Field, PrivateAttr + +from cuga.backend.cuga_graph.nodes.cuga_lite.tool_provider_interface import ToolProviderInterface + + +# --------------------------------------------------------------------------- +# CaptureChatModel +# --------------------------------------------------------------------------- + + +class CaptureChatModel(BaseChatModel): + """Scripted mock chat model for hermetic e2e tests. + + Records every message list it receives and replays a pre-loaded queue of + AIMessage responses. Raises AssertionError if the graph makes more LLM + calls than there are scripted responses — surfacing unexpected loops early. + + bind_tools() records the tools it was called with and returns self. This + works correctly for graphs that call bind_tools once; if a future graph + ever calls bind_tools in a loop, captured_tools will reflect all calls + because we extend rather than replace the list on each call. + """ + + captured_inputs: list[list[Any]] = Field(default_factory=list) + captured_tools: list[Any] = Field(default_factory=list) + _queue: list[AIMessage] = PrivateAttr(default_factory=list) + + def __init__(self, responses: list[AIMessage] | None = None, **kwargs: Any) -> None: + super().__init__(**kwargs) + self._queue = list(responses or [AIMessage(content="Done.")]) + + @property + def _llm_type(self) -> str: + return "capture" + + def bind_tools(self, tools: Any, **kwargs: Any) -> "CaptureChatModel": + self.captured_tools.extend(list(tools)) + return self + + def _pop_response(self) -> AIMessage: + assert self._queue, ( + "CaptureChatModel queue exhausted — the graph made more LLM calls than expected. " + "Pass more scripted responses to CaptureChatModel() or investigate unexpected loops." + ) + return self._queue.pop(0) + + def _generate( + self, + messages: list[Any], + stop: list[str] | None = None, + run_manager: Any = None, + **kwargs: Any, + ) -> ChatResult: + self.captured_inputs.append(list(messages)) + return ChatResult(generations=[ChatGeneration(message=self._pop_response())]) + + async def _agenerate( + self, + messages: list[Any], + stop: list[str] | None = None, + run_manager: Any = None, + **kwargs: Any, + ) -> ChatResult: + self.captured_inputs.append(list(messages)) + return ChatResult(generations=[ChatGeneration(message=self._pop_response())]) + + +# --------------------------------------------------------------------------- +# Tool providers +# --------------------------------------------------------------------------- + + +class MinimalToolProvider(ToolProviderInterface): + """No-op tool provider (copied from policy test helpers to avoid cross-import).""" + + async def initialize(self) -> None: + pass + + async def get_apps(self) -> list: + return [] + + async def get_all_tools(self) -> list: + return [] + + async def get_tools(self, app_name: str) -> list: + return [] + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def write_skill( + root: Path, + name: str, + description: str, + body: str, + requirements: str = "", +) -> Path: + """Write a SKILL.md under root/.agents/skills//SKILL.md.""" + skill_dir = root / ".agents" / "skills" / name + skill_dir.mkdir(parents=True, exist_ok=True) + req_line = f"requirements: {requirements}\n" if requirements else "" + skill_file = skill_dir / "SKILL.md" + skill_file.write_text( + f"---\nname: {name}\ndescription: {description}\n{req_line}---\n{body}\n", + encoding="utf-8", + ) + return skill_file + + +def extract_system_content(messages: list) -> str: + """Return the content of the first system message from a LangChain message list.""" + for msg in messages: + role = getattr(msg, "type", None) or getattr(msg, "role", None) + if role == "system": + return msg.content or "" + if isinstance(msg, dict) and msg.get("role") == "system": + return msg.get("content", "") + return "" + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + + +@pytest.fixture +def real_llm(): + """Real LLM model for Tier 3 e2e tests, built from the project's own settings. + + Uses LLMManager with settings.agent.code.model — the same model and credentials + that cuga uses in production. The platform, model name, base URL, and API key + are all read from the AGENT_SETTING_CONFIG toml loaded by config.py (which also + loads .env from the repo root). If credentials are absent the test will fail + at inference time with an auth error, not at collection time. + """ + from cuga.backend.llm.models import LLMManager + from cuga.config import settings + + return LLMManager().get_model(settings.agent.code.model) + + +# --------------------------------------------------------------------------- +# End-of-session summary for Tier 3 LLM e2e tests +# --------------------------------------------------------------------------- + + +def _find_module(name: str): + return ( + sys.modules.get(f"tests.e2e.{name}") + or sys.modules.get(name) + or next((v for k, v in sys.modules.items() if k.endswith(name)), None) + ) + + +def pytest_terminal_summary(terminalreporter, exitstatus, config) -> None: # noqa: ARG001 + """Print a summary table of all Tier 3 e2e skill test results.""" + results: list[dict] = [] + for mod_name in ("test_skills_llm_e2e", "test_skills_sdk_e2e"): + mod = _find_module(mod_name) + if mod: + results.extend(getattr(mod, "_RESULTS", [])) + if not results: + return + + width = 84 + sep = "=" * width + print(f"\n{sep}") + print(" TIER 3 E2E SKILLS — SUMMARY") + print(sep) + print(f" {'Skill':<36} {'Expected':<16} {'':6} Actual (excerpt)") + print(f" {'─' * (width - 2)}") + for r in results: + skill = r["skill"][:35] + expected = repr(r["expected"])[:15] + tag = "NOT in" if r["negative"] else "in " + verdict = "PASS" if r["passed"] else "FAIL" + actual_excerpt = r["actual"][:45].replace("\n", " ") + print(f" {skill:<36} {expected:<16} {tag} {verdict} {actual_excerpt}") + print(sep) diff --git a/tests/e2e/demo_outputs/.gitignore b/tests/e2e/demo_outputs/.gitignore new file mode 100644 index 00000000..9b21a6e9 --- /dev/null +++ b/tests/e2e/demo_outputs/.gitignore @@ -0,0 +1 @@ +*.pptx diff --git a/tests/e2e/test_skills_e2e.py b/tests/e2e/test_skills_e2e.py new file mode 100644 index 00000000..99e2580c --- /dev/null +++ b/tests/e2e/test_skills_e2e.py @@ -0,0 +1,836 @@ +"""E2E tests for the skills component. + +Two tiers: + Tier 1 - component-level (no LLM, no graph): exercises the skills discovery, + registry, and tool-creation pipeline directly. + Tier 2 - graph-level (CaptureChatModel): runs CugaLite with a mock LLM and + asserts that the skills block and load_skill tool are wired in. + +How skills work end-to-end +-------------------------- +A "skill" is a SKILL.md file placed by the user or operator under: + + /.agents/skills//SKILL.md + +The file uses YAML frontmatter followed by a markdown body: + + --- + name: summarize_report + description: Summarizes complex reports into bullet points + requirements: [python-pptx, npm:sharp] # optional + --- + ## Instructions + Read the report and output a bullet-point summary. + +At agent startup, the pipeline runs in this order: + + 1. discover_skills(cuga_folder) + Scans /.agents/skills/ and parses every SKILL.md into a SkillEntry. + Returns a list[SkillEntry]. + + 2. SkillRegistry(entries) + Holds the parsed entries. registry.load_skill(name) returns the full + instruction string that the model will see when it invokes the skill, + including any auto-generated package-install preamble. + + 3. create_skill_tools(registry) + Creates a single LangChain StructuredTool named "load_skill". When the model + calls load_skill(skill_name="summarize_report"), it receives the full body + returned by registry.load_skill("summarize_report"). + + 4. format_available_skills_block(registry) + Produces a markdown block listing all available skills with their descriptions. + This block is injected into the system prompt so the model knows what skills + exist before deciding to call load_skill. + + 5. CugaLiteGraph (Tier 2) + In prepare_tools_and_apps, steps 1-4 run automatically when skills.enabled=True. + The resulting skills block appears in the system message. If bind_tools mode + includes "load_skill", the tool is also bound to the model via bind_tools(). + +Requirement types and the commands they emit +-------------------------------------------- + Plain name (e.g. "python-pptx"): -> "uv pip install --quiet python-pptx" + npm: prefix (e.g. "npm:sharp"): -> "npm install sharp" + +These install commands are placed in STEP 1 of the loaded skill output, before the +STEP 2 instruction body, so the model always installs dependencies before executing. +""" + +from __future__ import annotations + +import uuid +from pathlib import Path + +import pytest +from langchain_core.messages import AIMessage, HumanMessage + +from cuga.backend.skills.loader import discover_skills +from cuga.backend.skills.registry import SkillEntry, SkillRegistry +from cuga.backend.skills.tools import create_skill_tools, format_available_skills_block + +from .conftest import CaptureChatModel, MinimalToolProvider, extract_system_content, write_skill + + +# --------------------------------------------------------------------------- +# Tier 1 - Skills discovery +# --------------------------------------------------------------------------- + + +class TestSkillDiscovery: + """Tests discover_skills(), which scans the filesystem for SKILL.md files. + + Each test uses write_skill() (from conftest) to create the expected directory + structure under tmp_path, then monkeypatches cwd to tmp_path so that + discover_skills(".cuga") resolves relative paths correctly. + + File layout created by write_skill(root, name, ...): + /.agents/skills//SKILL.md + + discover_skills(".cuga") looks under: + /.agents/skills/ + + The returned list contains SkillEntry objects with fields: + .name - the skill name from the YAML frontmatter + .description - the description from the YAML frontmatter + .requirements - tuple of requirement strings parsed from the frontmatter + .body - the markdown body below the frontmatter + .source - absolute path to the SKILL.md file + """ + + def test_skill_discovered_from_agents_skills_directory( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + """A single SKILL.md written to the expected path is discovered by discover_skills(). + + Setup: + - Writes tmp_path/.agents/skills/make_slides/SKILL.md with name="make_slides". + - Monkeypatches cwd to tmp_path. + + Expected result: + - discover_skills(".cuga") returns a list where at least one entry has + .name == "make_slides". + + Failure modes: + - Empty list or "make_slides" not in names: the loader is looking in a different + directory, the YAML frontmatter format changed, or the file was written to the + wrong path. + """ + monkeypatch.chdir(tmp_path) + write_skill(tmp_path, "make_slides", "Creates slide presentations", "## Do the thing") + + entries = discover_skills(".cuga") + + names = [e.name for e in entries] + assert "make_slides" in names + + def test_multiple_skills_all_discovered(self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None: + """All SKILL.md files present under .agents/skills/ are returned in a single call. + + Setup: + - Writes three SKILL.md files: "data_viz", "send_report", "schedule_meeting". + + Expected result: + - All three names appear in the list returned by discover_skills(".cuga"). + - No skills are silently dropped due to ordering or concurrency issues. + + Failure modes: + - One or more names missing: the loader stops after finding the first skill, + or the directory glob pattern does not recurse correctly. + """ + monkeypatch.chdir(tmp_path) + for name in ("data_viz", "send_report", "schedule_meeting"): + write_skill(tmp_path, name, f"{name} description", "## Body") + + entries = discover_skills(".cuga") + names = [e.name for e in entries] + + assert "data_viz" in names + assert "send_report" in names + assert "schedule_meeting" in names + + def test_skill_entry_preserves_description(self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None: + """The description field from the YAML frontmatter is preserved exactly in SkillEntry. + + Setup: + - Writes a skill with description="Exactly this description". + + Expected result: + - The SkillEntry for "my_skill" has .description == "Exactly this description" + with no leading/trailing whitespace changes or character substitutions. + + Why this matters: + The description is what appears in the skills block shown to the model + (via format_available_skills_block). An altered description could cause the + model to invoke the wrong skill or fail to recognize a skill is relevant. + """ + monkeypatch.chdir(tmp_path) + write_skill(tmp_path, "my_skill", "Exactly this description", "## Steps") + + entries = discover_skills(".cuga") + entry = next(e for e in entries if e.name == "my_skill") + + assert entry.description == "Exactly this description" + + def test_skill_with_pip_requirements_parsed( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + """Pip package requirements in YAML frontmatter are parsed into SkillEntry.requirements. + + Setup: + - Writes a skill with frontmatter: requirements: [python-pptx, pandas] + + Expected result: + - entry.requirements contains "python-pptx" and "pandas" as separate items. + - These strings are later used by SkillRegistry.load_skill() to generate + "uv pip install --quiet python-pptx" and "uv pip install --quiet pandas" + lines in the STEP 1 block. + + Failure modes: + - requirements is empty: the YAML list was not parsed, possibly due to a + format change in how requirements are declared in SKILL.md. + - Single string instead of list: the brackets were not treated as a YAML list. + """ + monkeypatch.chdir(tmp_path) + write_skill(tmp_path, "needs_packages", "Needs pip packages", "## Body", "[python-pptx, pandas]") + + entries = discover_skills(".cuga") + entry = next(e for e in entries if e.name == "needs_packages") + + assert "python-pptx" in entry.requirements + assert "pandas" in entry.requirements + + def test_skill_with_npm_requirements_parsed( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + """npm package requirements (npm: prefix) are parsed and distinguished from pip packages. + + Setup: + - Writes a skill with frontmatter: requirements: [npm:sharp, npm:imagemin] + + Expected result: + - entry.requirements contains "npm:sharp" and "npm:imagemin" with the "npm:" + prefix intact. The prefix is what SkillRegistry uses to route to "npm install" + rather than "uv pip install". + + Failure modes: + - Prefix stripped: "sharp" found but not "npm:sharp". The registry would then + incorrectly try to install "sharp" as a Python package. + - Items missing: npm: entries were filtered out during parsing. + """ + monkeypatch.chdir(tmp_path) + write_skill(tmp_path, "needs_npm", "Needs npm packages", "## Body", "[npm:sharp, npm:imagemin]") + + entries = discover_skills(".cuga") + entry = next(e for e in entries if e.name == "needs_npm") + + assert "npm:sharp" in entry.requirements + assert "npm:imagemin" in entry.requirements + + +# --------------------------------------------------------------------------- +# Tier 1 - SkillRegistry +# --------------------------------------------------------------------------- + + +class TestSkillRegistry: + """Tests SkillRegistry.load_skill(), which produces the full instruction string for a skill. + + The registry is constructed directly from SkillEntry objects (no filesystem I/O) + using the _make_registry() helper, making these tests fast and isolated from the + discovery layer. + + load_skill(name) returns a multi-section string. Its general structure is: + + [STEP 1 - only present when requirements exist] + Before following these instructions, run these install commands: + uv pip install --quiet + npm install + Command normalization override: ... + + [STEP 2 - always present] + + Command normalization override: ... + + The "Command normalization override" hint is always included and tells the model + to use "python" rather than "python3" when generating shell commands, for + cross-platform compatibility. + """ + + def _make_registry(self, name: str, body: str, requirements: tuple = ()) -> SkillRegistry: + """Build a SkillRegistry with a single entry without touching the filesystem. + + The source path is set to a plausible but non-existent path (/tmp//SKILL.md) + since load_skill() uses the already-parsed body, not the source path. + """ + entry = SkillEntry( + name=name, + description=f"{name} description", + body=body, + source=f"/tmp/{name}/SKILL.md", + requirements=requirements, + ) + return SkillRegistry([entry]) + + def test_load_returns_skill_body_content(self) -> None: + """load_skill() includes the full body text in the returned string, in order. + + Setup: + - Skill body: "## Phase one\\nDo the thing\\n## Phase two\\nVerify it" + + Expected result: + - "Do the thing" appears in the returned string. + - "Verify it" appears after "Do the thing" (original order preserved). + + Failure modes: + - Body text absent: the skill body was not included in the output. + - Order reversed: the registry reorders sections, which could confuse the + model about which phase comes first. + """ + registry = self._make_registry("make_slides", "## Phase one\nDo the thing\n## Phase two\nVerify it") + + loaded = registry.load_skill("make_slides") + + assert "Do the thing" in loaded + assert "Verify it" in loaded + assert loaded.index("Do the thing") < loaded.index("Verify it") + + def test_pip_requirements_emit_uv_install_commands(self) -> None: + """Pip requirements produce 'uv pip install' commands in the STEP 1 block. + + Setup: + - requirements=("python-pptx", "pandas") + + Expected substrings in loaded output: + - "uv pip install" (the install command prefix) + - "python-pptx" (first package) + - "pandas" (second package) + + The full expected lines look like: + "uv pip install --quiet python-pptx" + "uv pip install --quiet pandas" + """ + registry = self._make_registry( + "slide_maker", "## Make slides", requirements=("python-pptx", "pandas") + ) + + loaded = registry.load_skill("slide_maker") + + assert "uv pip install" in loaded + assert "python-pptx" in loaded + assert "pandas" in loaded + + def test_npm_requirements_emit_npm_install_commands(self) -> None: + """npm: requirements produce 'npm install' commands (not uv pip install). + + Setup: + - requirements=("npm:sharp", "npm:imagemin") + + Expected substrings in loaded output: + - "npm install" (npm command, not pip) + - "sharp" (package name without the "npm:" prefix) + - "imagemin" (package name without the "npm:" prefix) + + The full expected lines look like: + "npm install sharp" + "npm install imagemin" + """ + registry = self._make_registry( + "image_tool", "## Image processing", requirements=("npm:sharp", "npm:imagemin") + ) + + loaded = registry.load_skill("image_tool") + + assert "npm install" in loaded + assert "sharp" in loaded + assert "imagemin" in loaded + + def test_mixed_pip_and_npm_requirements(self) -> None: + """A skill with both pip and npm requirements produces both install command types. + + Setup: + - requirements=("python-pptx", "npm:sharp") + + Expected substrings in loaded output: + - "uv pip install --quiet python-pptx" + - "npm install sharp" + + Expected ordering: + - "STEP 1" (installs) appears before "STEP 2" (instructions) in the string. + This ordering is enforced in registry.py:46,88 and ensures the model always + installs packages before attempting to execute the skill instructions. + + Failure modes: + - STEP 2 before STEP 1: the model would attempt to run the skill before its + dependencies are installed, causing import errors. + - One install type missing: pip or npm routing logic is broken for mixed cases. + """ + registry = self._make_registry("mixed_skill", "## Mixed", requirements=("python-pptx", "npm:sharp")) + + loaded = registry.load_skill("mixed_skill") + + assert "uv pip install --quiet python-pptx" in loaded + assert "npm install sharp" in loaded + # STEP 1 (installs) must precede STEP 2 (instructions) — registry.py:46,88 + assert loaded.index("STEP 1") < loaded.index("STEP 2") + + def test_pip_install_followed_by_verification_command(self) -> None: + """load_skill() includes a 'uv pip show' verification after the pip install step. + + Without a verification command, the LLM has no signal that the package + was actually installed before it proceeds to STEP 2 instructions. + A 'uv pip show ' command produces explicit output (Name, Version, + Location) that the model can read to confirm success. + """ + registry = self._make_registry("deck", "## Make slides", requirements=("python-pptx",)) + + loaded = registry.load_skill("deck") + + assert "uv pip show" in loaded, ( + "Expected 'uv pip show ' verification command after pip install" + ) + assert "python-pptx" in loaded + + def test_npm_install_followed_by_verification_command(self) -> None: + """load_skill() includes an 'npm list' verification after the npm install step.""" + registry = self._make_registry("img", "## Image tool", requirements=("npm:sharp",)) + + loaded = registry.load_skill("img") + + assert "npm list" in loaded, "Expected 'npm list ' verification command after npm install" + assert "sharp" in loaded + + def test_verification_appears_after_install_and_before_step2(self) -> None: + """Verification commands appear after the install block and before STEP 2.""" + registry = self._make_registry("mixed", "## Mixed", requirements=("python-pptx", "npm:sharp")) + + loaded = registry.load_skill("mixed") + + pip_show_pos = loaded.index("uv pip show") + npm_list_pos = loaded.index("npm list") + step2_pos = loaded.index("STEP 2") + + assert pip_show_pos < step2_pos, "pip verification must appear before STEP 2" + assert npm_list_pos < step2_pos, "npm verification must appear before STEP 2" + + def test_load_skill_unknown_name_returns_helpful_error(self) -> None: + """load_skill() returns a descriptive error for an unknown skill name. + + Setup: + - Registry with one skill named "known_skill". + + Expected result: + - Calling load_skill("nonexistent") returns a string containing: + - "Unknown skill" (the error prefix) + - "nonexistent" (the requested name) + - "known_skill" (so the caller knows what's available) + + Why this matters: + The LLM could call load_skill with a slightly wrong name (spacing, case). + A useful error that lists known skills lets the model self-correct. + """ + registry = self._make_registry("known_skill", "## Body") + + result = registry.load_skill("nonexistent") + + assert "Unknown skill" in result + assert "nonexistent" in result + assert "known_skill" in result + + def test_python_command_normalization_hint_present(self) -> None: + """load_skill() always includes the 'Command normalization override' hint. + + Setup: + - Skill with no requirements and a simple body "## Body". + + Expected result: + - The string "Command normalization override" appears in the loaded output. + + Why this matters: + This hint instructs the model to use "python" (not "python3") when generating + shell commands. It is required for cross-platform compatibility and should be + present even for skills with no requirements. If it goes missing, models may + generate commands that fail on platforms where only "python" is on the PATH. + """ + registry = self._make_registry("norm_skill", "## Body") + + loaded = registry.load_skill("norm_skill") + + assert "Command normalization override" in loaded + + +# --------------------------------------------------------------------------- +# Tier 1 - Tool creation and skills block formatting +# --------------------------------------------------------------------------- + + +class TestSkillToolsAndBlock: + """Tests the LangChain tool and prompt block produced from a SkillRegistry. + + create_skill_tools(registry) -> list[StructuredTool] + Returns a list with exactly one tool named "load_skill". The model calls this + tool with {"skill_name": ""} to receive the full instruction body for + that skill. It is a single tool regardless of how many skills are registered, + because the tool itself dispatches to the correct skill at call time. + + format_available_skills_block(registry) -> str + Returns a markdown block like: + + ## Available Skills + - **alpha**: Alpha skill + - **beta**: Beta skill + + This block is injected into the system prompt so the model sees all available + skills before deciding to call load_skill. + """ + + def test_create_skill_tools_returns_load_skill_tool(self) -> None: + """create_skill_tools() returns exactly one tool and its name is 'load_skill'. + + Setup: + - Registry with one skill entry named "my_skill". + + Expected result: + - len(tools) == 1 + - tools[0].name == "load_skill" + + Note: the tool name is always "load_skill" regardless of how many skills are + in the registry. The model uses this single entry point to load any skill by + name at runtime. + """ + entry = SkillEntry( + name="my_skill", + description="My skill", + body="## Body", + source="/tmp/SKILL.md", + requirements=(), + ) + registry = SkillRegistry([entry]) + + tools = create_skill_tools(registry) + + assert len(tools) == 1 + assert tools[0].name == "load_skill" + + def test_format_available_skills_block_lists_all_skill_names(self) -> None: + """format_available_skills_block() includes every skill name registered. + + Setup: + - Registry with two skills: "alpha" and "beta". + + Expected result: + - The returned block string contains "alpha" and "beta". + - Both names must be present; the model uses this block to decide which + skill to load. A missing name means the model cannot know that skill exists. + """ + entries = [ + SkillEntry("alpha", "Alpha skill", "## Body", "/tmp/a/SKILL.md", ()), + SkillEntry("beta", "Beta skill", "## Body", "/tmp/b/SKILL.md", ()), + ] + registry = SkillRegistry(entries) + + block = format_available_skills_block(registry) + + assert "alpha" in block + assert "beta" in block + + def test_format_available_skills_block_includes_descriptions(self) -> None: + """format_available_skills_block() includes each skill's description alongside its name. + + Setup: + - Registry with one skill: name="gamma", description="Gamma makes reports". + + Expected result: + - "Gamma makes reports" appears in the block. + - The description is what helps the model decide *which* skill is relevant to + the user's request. Without it the model sees only skill names, which may + not be self-explanatory. + """ + entry = SkillEntry("gamma", "Gamma makes reports", "## Body", "/tmp/g/SKILL.md", ()) + registry = SkillRegistry([entry]) + + block = format_available_skills_block(registry) + + assert "Gamma makes reports" in block + + +# --------------------------------------------------------------------------- +# Tier 2 - CugaLite graph integration +# --------------------------------------------------------------------------- + + +class TestSkillsCugaLiteIntegration: + """Runs the full compiled CugaLiteGraph with a mock LLM and asserts on LLM inputs. + + These tests verify that the skills pipeline (discovery -> registry -> block/tool) + is correctly wired into the graph. CaptureChatModel records everything the graph + sends to the model without making any real LLM calls. + + Required monkeypatches in every test: + - settings.skills.enabled = True + Skills discovery is gated behind this flag. Without it, no skills block is + built and the model never sees available skills. + - CUGA_FOLDER env var = str(tmp_path / ".cuga") + Some graph paths resolve the skills root from this env var. + - settings.advanced_features.enable_shell_tool = True + The skills block is silently cleared at prompt_utils.py:539-541 when the + shell tool is disabled. This patch is required for the skill name to actually + appear in the system message. + - settings.advanced_features.cuga_lite_nl_auto_continue = False + Prevents the graph from looping for a second LLM call, which would exhaust + the CaptureChatModel's scripted response queue. + - settings.policy.enabled = False + Disables policy enforcement to avoid needing policy fixtures. + """ + + @pytest.mark.asyncio + async def test_skills_block_appears_in_cuga_lite_system_prompt( + self, + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, + ) -> None: + """The skill name appears in the system message sent to the LLM. + + Setup: + - Writes a SKILL.md for "summarize_report" with description + "Summarizes complex reports into bullet points" under tmp_path/.agents/skills/. + - Monkeypatches cwd to tmp_path so discover_skills() finds the skill. + - Human message: "Can you summarize this report for me?" + - Scripted model response: "I will summarize." + + Expected system message contains: + - "summarize_report" (the skill name listed in the available skills block) + + Full expected shape of the skills section in the system message (partial): + ## Available Skills + - **summarize_report**: Summarizes complex reports into bullet points + + Failure modes: + - Skill name absent: skills.enabled was not picked up, CUGA_FOLDER pointed + to the wrong location, enable_shell_tool=False cleared the block, or + discover_skills() found no skills at the resolved path. + """ + from cuga.backend.cuga_graph.nodes.cuga_lite.cuga_lite_graph import ( + CugaLiteState, + create_cuga_lite_graph, + ) + from cuga.config import settings + + monkeypatch.chdir(tmp_path) + write_skill( + tmp_path, + "summarize_report", + "Summarizes complex reports into bullet points", + "## Instructions\nRead the report and summarize.", + ) + + monkeypatch.setattr(settings.skills, "enabled", True) + monkeypatch.setenv("CUGA_FOLDER", str(tmp_path / ".cuga")) + monkeypatch.setattr(settings.advanced_features, "cuga_lite_nl_auto_continue", False) + monkeypatch.setattr(settings.policy, "enabled", False) + # Skills block silently cleared at prompt_utils.py:539-541 when enable_shell_tool=False. + monkeypatch.setattr(settings.advanced_features, "enable_shell_tool", True) + + capture_model = CaptureChatModel(responses=[AIMessage(content="I will summarize.")]) + graph = create_cuga_lite_graph( + model=capture_model, + tool_provider=MinimalToolProvider(), + apps_list=[], + ).compile() + + thread_id = f"e2e_skills_{uuid.uuid4().hex[:8]}" + state = CugaLiteState( + chat_messages=[HumanMessage(content="Can you summarize this report for me?")], + thread_id=thread_id, + ) + config = { + "configurable": { + "thread_id": thread_id, + "apps_list": [], + } + } + + await graph.ainvoke(state, config=config) + + assert capture_model.captured_inputs, "CaptureChatModel was never invoked" + system_content = extract_system_content(capture_model.captured_inputs[0]) + assert system_content, "No system message found in LLM inputs" + assert "summarize_report" in system_content, ( + f"Expected skill name 'summarize_report' in system message. Got: {system_content[:600]}" + ) + + @pytest.mark.asyncio + async def test_load_skill_tool_is_bound_to_model_when_native_tools_enabled( + self, + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, + ) -> None: + """When native bind-tools mode is 'tools' with load_skill, bind_tools receives it. + + Background: + CugaLite supports two ways to expose tools to the model: + 1. Text mode (default): tool names and descriptions appear in the system prompt + as plain text. The model references them by generating Python code. + 2. Native bind-tools mode: tools are passed to model.bind_tools(), which + exposes them as function-calling schemas. The model calls them via the + provider's native function-calling API. + + This test exercises mode 2 for the load_skill tool specifically. + + Setup: + - Writes a SKILL.md for "data_extractor". + - Sets cuga_lite_bind_tools_mode = "tools" and + cuga_lite_bind_tools_tool_names = ["load_skill"]. + - Human message: "Extract the data from this document." + - Scripted model response: "Done." + + Expected result: + - capture_model.captured_tools contains an object with .name == "load_skill". + - capture_model.captured_tools is populated by CaptureChatModel.bind_tools(), + which the graph calls during prepare_tools_and_apps when bind-tools mode + is active. + + Failure modes: + - "load_skill" absent from captured_tools: bind_tools was never called, likely + because cuga_lite_bind_tools_mode was not respected, or the skill was not + discovered and therefore no load_skill tool was created. + - captured_tools is empty: bind_tools mode setting was not picked up, so the + graph fell back to text mode and never called bind_tools() on the model. + """ + from cuga.backend.cuga_graph.nodes.cuga_lite.cuga_lite_graph import ( + CugaLiteState, + create_cuga_lite_graph, + ) + from cuga.config import settings + + monkeypatch.chdir(tmp_path) + write_skill(tmp_path, "data_extractor", "Extracts structured data", "## Extract data") + + monkeypatch.setattr(settings.skills, "enabled", True) + monkeypatch.setenv("CUGA_FOLDER", str(tmp_path / ".cuga")) + monkeypatch.setattr(settings.advanced_features, "cuga_lite_nl_auto_continue", False) + monkeypatch.setattr(settings.policy, "enabled", False) + # Skills block silently cleared at prompt_utils.py:539-541 when enable_shell_tool=False. + monkeypatch.setattr(settings.advanced_features, "enable_shell_tool", True) + # Enable native tool binding so bind_tools is called with the skill tool + monkeypatch.setattr(settings.advanced_features, "cuga_lite_bind_tools_mode", "tools") + monkeypatch.setattr(settings.advanced_features, "cuga_lite_bind_tools_tool_names", ["load_skill"]) + + capture_model = CaptureChatModel(responses=[AIMessage(content="Done.")]) + graph = create_cuga_lite_graph( + model=capture_model, + tool_provider=MinimalToolProvider(), + apps_list=[], + ).compile() + + thread_id = f"e2e_tools_{uuid.uuid4().hex[:8]}" + state = CugaLiteState( + chat_messages=[HumanMessage(content="Extract the data from this document.")], + thread_id=thread_id, + ) + config = {"configurable": {"thread_id": thread_id, "apps_list": []}} + + await graph.ainvoke(state, config=config) + + tool_names = [getattr(t, "name", None) for t in capture_model.captured_tools] + assert "load_skill" in tool_names, f"Expected 'load_skill' in bound tools, got: {tool_names}" + + @pytest.mark.asyncio + async def test_graph_completes_without_skills_block_when_no_skills_found( + self, + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, + ) -> None: + """When skills_enabled=True but the skills directory is empty, the graph completes + without error and the system message does not contain an block. + + Setup: + - skills.enabled = True, CUGA_FOLDER set to tmp_path/.cuga + - No SKILL.md files written anywhere — the skills directory does not exist. + - enable_shell_tool = True so skills are not suppressed for an unrelated reason. + - Scripted model response: "Done." + + Expected result: + - The graph invokes the LLM successfully. + - The system message does NOT contain "available_skills" or "load_skill". + + Failure modes: + - Crash or exception: the graph does not handle empty discovery gracefully. + - "available_skills" present: an empty or phantom skills block was injected. + """ + from cuga.backend.cuga_graph.nodes.cuga_lite.cuga_lite_graph import ( + CugaLiteState, + create_cuga_lite_graph, + ) + from cuga.config import settings + + monkeypatch.chdir(tmp_path) + # No skills written — directory does not exist + + monkeypatch.setattr(settings.skills, "enabled", True) + monkeypatch.setenv("CUGA_FOLDER", str(tmp_path / ".cuga")) + monkeypatch.setattr(settings.advanced_features, "cuga_lite_nl_auto_continue", False) + monkeypatch.setattr(settings.policy, "enabled", False) + monkeypatch.setattr(settings.advanced_features, "enable_shell_tool", True) + + capture_model = CaptureChatModel(responses=[AIMessage(content="Done.")]) + graph = create_cuga_lite_graph( + model=capture_model, + tool_provider=MinimalToolProvider(), + apps_list=[], + ).compile() + + thread_id = f"e2e_no_skills_{uuid.uuid4().hex[:8]}" + state = CugaLiteState( + chat_messages=[HumanMessage(content="Hello.")], + thread_id=thread_id, + ) + config = {"configurable": {"thread_id": thread_id, "apps_list": []}} + + await graph.ainvoke(state, config=config) + + assert capture_model.captured_inputs, "CaptureChatModel was never invoked" + system_content = extract_system_content(capture_model.captured_inputs[0]) + assert "available_skills" not in system_content, ( + "Expected no skills block when no SKILL.md files exist. " + f"Got system content: {system_content[:400]}" + ) + assert "load_skill" not in system_content, ( + "Expected no load_skill tool mention when no skills are available." + ) + + +# --------------------------------------------------------------------------- +# Blocked / placeholder tests +# --------------------------------------------------------------------------- + + +class TestSkillsBlockedPaths: + """Placeholder tests for skill execution paths that are not yet implemented. + + These tests are skipped with an explicit reason pointing to the blocking issue. + They are kept in the suite (rather than deleted) so that: + - The gap in coverage is visible when running pytest with -v. + - When the blocking issue is resolved, the test scaffold is already in place. + - The skip reason documents the dependency clearly for anyone reading the code. + """ + + @pytest.mark.skip(reason="Blocked on #199 - use_sub_agents skill execution path not yet wired") + @pytest.mark.asyncio + async def test_skill_executed_via_sub_agent( + self, + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, + ) -> None: + """Verify that a loaded skill is executed end-to-end via the sub-agent path. + + Planned flow (once #199 is resolved): + 1. Write a SKILL.md with use_sub_agents: true in its frontmatter. + 2. Run a graph invocation where the model calls load_skill(). + 3. The graph routes to a sub-agent that executes the skill instructions. + 4. Assert the sub-agent's result is returned to the parent conversation. + + Once issue #199 is resolved (use_sub_agents wired for skill execution), + this test should drive a full skill load -> sub-agent invocation -> result cycle. + """ + raise NotImplementedError("Implement after #199 is resolved") diff --git a/tests/e2e/test_skills_llm_e2e.py b/tests/e2e/test_skills_llm_e2e.py new file mode 100644 index 00000000..906056b8 --- /dev/null +++ b/tests/e2e/test_skills_llm_e2e.py @@ -0,0 +1,500 @@ +"""E2E tests for the skills component — Tier 3 (real LLM). + +These tests run the full CugaLiteGraph with the project's configured LLM (oss120b +via the rits platform, or whatever AGENT_SETTING_CONFIG points to) against skills +that contain proprietary information the LLM cannot know from training: custom +formula coefficients, fabricated internal codes, invented system names. + +The task for each test is designed so that only a model that has read the skill +body can produce the correct, verifiable output. Paired negative controls run +the same task with skills disabled and assert the expected value is absent, +confirming the skill is the genuine gating factor. + +The model and credentials are loaded from the same .env + AGENT_SETTING_CONFIG +that cuga uses in production — no test-specific keys or model overrides. If +credentials are missing the test fails (not skips) so the gap is visible in CI. + +How to run +---------- +Run all Tier 3 tests: + + uv run pytest tests/e2e/test_skills_llm_e2e.py -v -s -m e2e + +Run a single test: + + uv run pytest tests/e2e/test_skills_llm_e2e.py::test_compliance_scorer_produces_correct_score -v -s + +The -s flag is required to see the expected/actual output printed by each test. + +Graph execution flow (bind_tools mode, two LLM turns) +------------------------------------------------------ + Turn 1 — LLM receives the block (name + description) and + the load_skill function schema via bind_tools. It issues a native + tool call: load_skill(name=""). + Sandbox — _extract_code_from_response_tool_calls converts the tool call to + Python; code_executor forces local mode for skills; the skill body + is returned as execution output. + Turn 2 — LLM receives the skill body, follows its instructions, and produces + a final NL answer. nl_auto_continue=False routes to END. + Result — ainvoke returns a dict; result["final_answer"] holds that answer. + +Required patches (positive tests) +---------------------------------- + settings.skills.enabled = True + CUGA_FOLDER = str(tmp_path / ".cuga") + settings.advanced_features.enable_shell_tool = True + (skills block cleared at prompt_utils.py:539-541 when False) + settings.advanced_features.cuga_lite_bind_tools_mode = "tools" + settings.advanced_features.cuga_lite_bind_tools_tool_names = ["load_skill"] + settings.advanced_features.cuga_lite_nl_auto_continue = False + settings.policy.enabled = False +""" + +from __future__ import annotations + +import uuid +from pathlib import Path + +import pytest +from langchain_core.messages import HumanMessage + +from .conftest import MinimalToolProvider, write_skill + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +# Collects one entry per _report() call; printed as a table by +# pytest_terminal_summary in conftest.py at the end of the session. +_RESULTS: list[dict] = [] + + +def _report( + *, + skill: str, + task: str, + expected: str | list[str], + actual: str, + negative: bool = False, +) -> None: + """Print an expected-vs-actual summary (visible with pytest -s) and + append the result to _RESULTS for the end-of-session summary table. + + expected may be a single string or a list of strings (all must be present). + Call this before the assertion so the output is visible for both + passing and failing tests. + """ + terms = expected if isinstance(expected, list) else [expected] + if negative: + passed = all(t not in actual for t in terms) + else: + passed = all(t in actual for t in terms) + + display = repr(terms[0]) + (f" (+{len(terms) - 1} more)" if len(terms) > 1 else "") + _RESULTS.append( + { + "skill": skill, + "task": task, + "expected": display, + "actual": actual, + "negative": negative, + "passed": passed, + } + ) + width = 64 + verb = "NOT in" if negative else "in" + check = f"{display} {verb} response" + print(f"\n{'─' * width}") + print(f" skill : {skill}") + print(f" task : {task[:70]}{'…' if len(task) > 70 else ''}") + print(f" expected : {check}") + print(f" actual :\n {actual[:400]}{'…' if len(actual) > 400 else ''}") + print(f"{'─' * width}") + + +async def _run_graph(model, human_message: str, thread_id: str) -> str: + """Compile and invoke CugaLiteGraph; return the final NL answer. + + Caller must monkeypatch cwd and CUGA_FOLDER before calling so that + discover_skills() resolves to the test's tmp_path. + """ + from cuga.backend.cuga_graph.nodes.cuga_lite.cuga_lite_graph import ( + CugaLiteState, + create_cuga_lite_graph, + ) + + graph = create_cuga_lite_graph( + model=model, + tool_provider=MinimalToolProvider(), + apps_list=[], + ).compile() + + state = CugaLiteState( + chat_messages=[HumanMessage(content=human_message)], + thread_id=thread_id, + ) + config = { + "configurable": { + "thread_id": thread_id, + "apps_list": [], + "cuga_lite_max_steps": 6, + } + } + result = await graph.ainvoke(state, config=config) + final_answer = result.get("final_answer", "") + if not final_answer: + for msg in reversed(result.get("chat_messages", [])): + if getattr(msg, "type", None) == "ai" and getattr(msg, "content", ""): + final_answer = msg.content + break + return final_answer + + +# --------------------------------------------------------------------------- +# Skill 1: Proprietary compliance risk score +# --------------------------------------------------------------------------- +# +# Formula: CRS = (violations * 14) + (days_overdue * 3) - (controls_passed * 5) + 22 +# +# For violations=3, days_overdue=45, controls_passed=8: +# CRS = (3*14) + (45*3) - (8*5) + 22 = 42 + 135 - 40 + 22 = 159 +# +# The coefficients 14, 3, 5 and the constant 22 are invented. No LLM can +# produce 159 without reading the skill body. + +_SCORER_SKILL_BODY = ( + "## Acme Corp Compliance Risk Score Calculator\n\n" + "Use this proprietary formula to compute the CRS (Compliance Risk Score):\n\n" + " CRS = (violations * 14) + (days_overdue * 3) - (controls_passed * 5) + 22\n\n" + "Where:\n" + "- violations: number of distinct policy violations found\n" + "- days_overdue: number of calendar days past the remediation deadline\n" + "- controls_passed: number of controls that passed review in the same audit cycle\n" + "- The constant offset 22 is the Acme baseline risk factor\n\n" + 'Report the result as: "Acme CRS: "' +) +_SCORER_TASK = "Compute the Acme compliance risk score for: 3 violations, 45 days overdue, 8 controls passed." + + +@pytest.mark.asyncio +@pytest.mark.e2e +async def test_compliance_scorer_produces_correct_score( + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, + real_llm, +) -> None: + """LLM computes the proprietary Acme CRS formula and returns 159. + + Setup: + - Writes acme_compliance_scorer SKILL.md under tmp_path/.agents/skills/. + - Enables skills + bind_tools so the LLM can call load_skill natively. + + Expected result: + - "159" appears in final_answer (3*14 + 45*3 - 8*5 + 22 = 159). + + Why the LLM cannot produce this without the skill: + The coefficients 14, 3, 5 and the constant offset 22 are fabricated. + Without the skill body the model has no basis for these values. + """ + from cuga.config import settings + + monkeypatch.chdir(tmp_path) + monkeypatch.setenv("CUGA_FOLDER", str(tmp_path / ".cuga")) + write_skill( + tmp_path, + "acme_compliance_scorer", + "Computes the Acme Corp proprietary compliance risk score for audit findings", + _SCORER_SKILL_BODY, + ) + monkeypatch.setattr(settings.skills, "enabled", True) + monkeypatch.setattr(settings.advanced_features, "enable_shell_tool", True) + monkeypatch.setattr(settings.advanced_features, "cuga_lite_bind_tools_mode", "tools") + monkeypatch.setattr(settings.advanced_features, "cuga_lite_bind_tools_tool_names", ["load_skill"]) + monkeypatch.setattr(settings.advanced_features, "cuga_lite_nl_auto_continue", False) + monkeypatch.setattr(settings.policy, "enabled", False) + + final_answer = await _run_graph( + model=real_llm, + human_message=_SCORER_TASK, + thread_id=f"e2e_crs_{uuid.uuid4().hex[:8]}", + ) + + _report(skill="acme_compliance_scorer", task=_SCORER_TASK, expected="159", actual=final_answer) + assert "159" in final_answer, ( + f"Expected CRS=159 in final answer (3*14 + 45*3 - 8*5 + 22 = 159). Got: {final_answer[:500]!r}" + ) + + +@pytest.mark.asyncio +@pytest.mark.e2e +async def test_compliance_scorer_cannot_produce_correct_score_without_skill( + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, + real_llm, +) -> None: + """Negative control: LLM cannot produce 159 when the skill is not loaded. + + skills.enabled=False so the skill body is never delivered to the model. + The model has no knowledge of the proprietary formula and will produce + a different answer. + """ + from cuga.config import settings + + monkeypatch.chdir(tmp_path) + monkeypatch.setenv("CUGA_FOLDER", str(tmp_path / ".cuga")) + monkeypatch.setattr(settings.skills, "enabled", False) + monkeypatch.setattr(settings.advanced_features, "cuga_lite_nl_auto_continue", False) + monkeypatch.setattr(settings.policy, "enabled", False) + + final_answer = await _run_graph( + model=real_llm, + human_message=_SCORER_TASK, + thread_id=f"e2e_crs_neg_{uuid.uuid4().hex[:8]}", + ) + + _report( + skill="acme_compliance_scorer (no skill)", + task=_SCORER_TASK, + expected="159", + actual=final_answer, + negative=True, + ) + assert "159" not in final_answer, ( + "LLM produced 159 without the skill — the skill is not gating this capability. " + f"Got: {final_answer[:500]!r}" + ) + + +# --------------------------------------------------------------------------- +# Skill 2: Internal parts catalog lookup +# --------------------------------------------------------------------------- +# +# PRU-2267-K is an invented identifier absent from all public training data. +# Without the skill the model will refuse or produce a different code. + +_PARTS_SKILL_BODY = ( + "## Acme Corp Parts Catalog — Internal Reference\n\n" + "Return the exact internal part code for the requested product.\n\n" + "| Product Description | Internal Part Code |\n" + "|---------------------------------|--------------------|\n" + "| Thermal Bypass Valve | TBV-9143-X |\n" + "| Pressure Relief Unit | PRU-2267-K |\n" + "| Flow Control Module | FCM-5508-J |\n" + "| Rotary Actuator Assembly Type-3 | RAA-7712-Q |\n" + "| Solenoid Isolation Block | SIB-3391-N |\n\n" + 'If the product is not listed, respond: "Part code not found in catalog."' +) +_PARTS_TASK = "What is the Acme Corp internal part code for the Pressure Relief Unit?" + + +@pytest.mark.asyncio +@pytest.mark.e2e +async def test_parts_catalog_returns_internal_code( + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, + real_llm, +) -> None: + """LLM returns the fabricated internal code PRU-2267-K from the skill body. + + Setup: + - Writes parts_catalog_lookup SKILL.md with a table of fabricated codes. + + Expected result: + - "PRU-2267-K" appears in final_answer. + + Why the LLM cannot produce this without the skill: + PRU-2267-K is a made-up identifier absent from all public training data. + Without the skill the model will either refuse or produce a plausible-looking + but incorrect code. + """ + from cuga.config import settings + + monkeypatch.chdir(tmp_path) + monkeypatch.setenv("CUGA_FOLDER", str(tmp_path / ".cuga")) + write_skill( + tmp_path, + "parts_catalog_lookup", + "Returns internal part codes from the Acme Corp industrial parts catalog", + _PARTS_SKILL_BODY, + ) + monkeypatch.setattr(settings.skills, "enabled", True) + monkeypatch.setattr(settings.advanced_features, "enable_shell_tool", True) + monkeypatch.setattr(settings.advanced_features, "cuga_lite_bind_tools_mode", "tools") + monkeypatch.setattr(settings.advanced_features, "cuga_lite_bind_tools_tool_names", ["load_skill"]) + monkeypatch.setattr(settings.advanced_features, "cuga_lite_nl_auto_continue", False) + monkeypatch.setattr(settings.policy, "enabled", False) + + final_answer = await _run_graph( + model=real_llm, + human_message=_PARTS_TASK, + thread_id=f"e2e_parts_{uuid.uuid4().hex[:8]}", + ) + + _report(skill="parts_catalog_lookup", task=_PARTS_TASK, expected="PRU-2267-K", actual=final_answer) + assert "PRU-2267-K" in final_answer, ( + f"Expected part code 'PRU-2267-K' in final answer. Got: {final_answer[:500]!r}" + ) + + +@pytest.mark.asyncio +@pytest.mark.e2e +async def test_parts_catalog_cannot_return_code_without_skill( + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, + real_llm, +) -> None: + """Negative control: LLM cannot return PRU-2267-K without the skill body.""" + from cuga.config import settings + + monkeypatch.chdir(tmp_path) + monkeypatch.setenv("CUGA_FOLDER", str(tmp_path / ".cuga")) + monkeypatch.setattr(settings.skills, "enabled", False) + monkeypatch.setattr(settings.advanced_features, "cuga_lite_nl_auto_continue", False) + monkeypatch.setattr(settings.policy, "enabled", False) + + final_answer = await _run_graph( + model=real_llm, + human_message=_PARTS_TASK, + thread_id=f"e2e_parts_neg_{uuid.uuid4().hex[:8]}", + ) + + _report( + skill="parts_catalog_lookup (no skill)", + task=_PARTS_TASK, + expected="PRU-2267-K", + actual=final_answer, + negative=True, + ) + assert "PRU-2267-K" not in final_answer, ( + "LLM produced the fabricated part code without the skill — " + "the skill is not gating this capability. " + f"Got: {final_answer[:500]!r}" + ) + + +# --------------------------------------------------------------------------- +# Skill 3: Internal vendor onboarding process +# --------------------------------------------------------------------------- +# +# The skill body uses four invented system names: NEXUS, CERBERUS, IRONGATE, +# DOCUVAULT. A generic onboarding answer contains none of these names. + +_ONBOARDING_SKILL_BODY = ( + "## Acme Corp Vendor Onboarding — Standard Process v4.2\n\n" + "Complete all steps in order. Do not skip or reorder.\n\n" + "Step 1 — NEXUS Compliance Screen\n" + " Submit vendor details to the NEXUS compliance portal (portal ID: NX-VENDOR).\n" + " Await NEXUS clearance code before proceeding.\n\n" + "Step 2 — CERBERUS Authentication Setup\n" + " Create vendor account in CERBERUS (internal IAM system).\n" + " Assign role: VENDOR_EXTERNAL_L1.\n\n" + "Step 3 — IRONGATE Financial Vetting\n" + " Submit bank details and tax forms to IRONGATE (finance validation system).\n" + " Record the IRONGATE approval reference number.\n\n" + "Step 4 — Master Agreement via DOCUVAULT\n" + " Send the standard MSA template via DOCUVAULT (contract management portal).\n" + " DOCUVAULT signatures only — do not use email attachments.\n\n" + "Step 5 — Activation Confirmation\n" + " Confirm all prior steps, then issue activation. Reference the NEXUS clearance\n" + " code, CERBERUS activation token, and IRONGATE reference number.\n\n" + "Always name all four internal systems in your summary: " + "NEXUS, CERBERUS, IRONGATE, DOCUVAULT." +) +_ONBOARDING_TASK = "Walk me through the Acme Corp vendor onboarding process." +_ONBOARDING_SYSTEMS = ("NEXUS", "CERBERUS", "IRONGATE", "DOCUVAULT") + + +@pytest.mark.asyncio +@pytest.mark.e2e +async def test_vendor_onboarding_uses_internal_system_names( + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, + real_llm, +) -> None: + """LLM produces an onboarding guide that names all four fabricated internal systems. + + Setup: + - Writes acme_vendor_onboarding SKILL.md with a 5-step process using + NEXUS, CERBERUS, IRONGATE, and DOCUVAULT as internal system names. + + Expected result: + - All four system names appear in final_answer. + + Why the LLM cannot produce this without the skill: + NEXUS/CERBERUS/IRONGATE/DOCUVAULT are invented names absent from any public + training corpus. Without the skill the model produces a generic onboarding + process with no reference to these systems. + """ + from cuga.config import settings + + monkeypatch.chdir(tmp_path) + monkeypatch.setenv("CUGA_FOLDER", str(tmp_path / ".cuga")) + write_skill( + tmp_path, + "acme_vendor_onboarding", + "Guides the Acme Corp vendor onboarding process with all required internal steps", + _ONBOARDING_SKILL_BODY, + ) + monkeypatch.setattr(settings.skills, "enabled", True) + monkeypatch.setattr(settings.advanced_features, "enable_shell_tool", True) + monkeypatch.setattr(settings.advanced_features, "cuga_lite_bind_tools_mode", "tools") + monkeypatch.setattr(settings.advanced_features, "cuga_lite_bind_tools_tool_names", ["load_skill"]) + monkeypatch.setattr(settings.advanced_features, "cuga_lite_nl_auto_continue", False) + monkeypatch.setattr(settings.policy, "enabled", False) + + final_answer = await _run_graph( + model=real_llm, + human_message=_ONBOARDING_TASK, + thread_id=f"e2e_onboard_{uuid.uuid4().hex[:8]}", + ) + + _report( + skill="acme_vendor_onboarding", + task=_ONBOARDING_TASK, + expected=list(_ONBOARDING_SYSTEMS), + actual=final_answer, + ) + for system in _ONBOARDING_SYSTEMS: + assert system in final_answer, ( + f"Expected internal system name '{system}' in final answer. Got: {final_answer[:500]!r}" + ) + + +@pytest.mark.asyncio +@pytest.mark.e2e +async def test_vendor_onboarding_lacks_internal_names_without_skill( + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, + real_llm, +) -> None: + """Negative control: without the skill the LLM produces generic onboarding guidance. + + None of the four fabricated system names should appear in a response that + has no access to the skill body. + """ + from cuga.config import settings + + monkeypatch.chdir(tmp_path) + monkeypatch.setenv("CUGA_FOLDER", str(tmp_path / ".cuga")) + monkeypatch.setattr(settings.skills, "enabled", False) + monkeypatch.setattr(settings.advanced_features, "cuga_lite_nl_auto_continue", False) + monkeypatch.setattr(settings.policy, "enabled", False) + + final_answer = await _run_graph( + model=real_llm, + human_message=_ONBOARDING_TASK, + thread_id=f"e2e_onboard_neg_{uuid.uuid4().hex[:8]}", + ) + + _report( + skill="acme_vendor_onboarding (no skill)", + task=_ONBOARDING_TASK, + expected=list(_ONBOARDING_SYSTEMS), + actual=final_answer, + negative=True, + ) + found = [s for s in _ONBOARDING_SYSTEMS if s in final_answer] + assert not found, ( + f"LLM produced fabricated system names without the skill: {found}. Got: {final_answer[:500]!r}" + ) diff --git a/tests/e2e/test_skills_presentation_e2e.py b/tests/e2e/test_skills_presentation_e2e.py new file mode 100644 index 00000000..00786c11 --- /dev/null +++ b/tests/e2e/test_skills_presentation_e2e.py @@ -0,0 +1,229 @@ +"""Demo test: create a real .pptx file using the anthropics/skills pptx skill. + +This is a visual-confirmation test, not a unit test. Its job is to prove that +the full skill pipeline — discovery → load → companion guide → code execution → +file output — produces a file you can actually open in PowerPoint or LibreOffice. + +What happens end-to-end +----------------------- +1. Downloads ``SKILL.md`` and ``pptxgenjs.md`` from github.com/anthropics/skills. +2. Pre-installs pptxgenjs (Node.js) locally in the temp working directory. +3. Provides a ``run_node_script`` function-calling tool the agent can use to + actually execute a Node.js script (without needing a code-execution sandbox). +4. Creates a CugaAgent in tools mode with both ``load_skill`` and + ``run_node_script`` bound. +5. The agent loads the pptx skill → reads the pptxgenjs companion guide → + writes a pptxgenjs script → calls ``run_node_script`` to run it → + a real .pptx file lands in the working directory. +6. Copies the result to ``tests/e2e/demo_outputs/`` and prints the path. + +Run with +-------- + uv run pytest tests/e2e/test_skills_presentation_e2e.py -m e2e -v -s + +After the test open the file path printed to stdout. +""" + +from __future__ import annotations + +import os +import subprocess +import urllib.error +import urllib.request +import uuid +from pathlib import Path +from typing import TYPE_CHECKING, Any + +import pytest +from langchain_core.tools import tool + +from cuga.backend.cuga_graph.nodes.cuga_lite.tool_provider_interface import ToolProviderInterface + +if TYPE_CHECKING: + from cuga.sdk import CugaAgent + +_GITHUB_RAW = "https://raw.githubusercontent.com/anthropics/skills/main/skills/pptx" +_DEMO_OUTPUTS = Path(__file__).parent / "demo_outputs" + +_TASK = """ +Use the pptx skill to create a 5-slide presentation titled "Agent Skills in Practice". + +Slide outline: + 1. Title slide — "Agent Skills in Practice", subtitle "From Discovery to Execution" + 2. What Are Skills? — reusable SKILL.md playbooks that give AI agents domain-specific + procedural knowledge + 3. How It Works — skill discovery → load_skill → companion guides → execution + 4. Real-World Skill Examples — pptx, deploy-to-vercel, react-best-practices, docx + 5. Summary — "Skills turn general-purpose agents into domain experts" + +Design: dark color scheme (navy or charcoal background, white text), colored accent +shapes, concise bullets (max 4 per slide). + +When you have the pptxgenjs script ready, call run_node_script to execute it. +The script must end with pres.writeFile({ fileName: "presentation.pptx" }) so the +file lands in the current directory. +""" + + +def _fetch(path: str) -> str: + """Download a file from the anthropics/skills pptx directory. Skip if unreachable.""" + url = f"{_GITHUB_RAW}/{path}" + try: + with urllib.request.urlopen(url, timeout=15) as resp: # noqa: S310 + return resp.read().decode("utf-8") + except (urllib.error.URLError, OSError) as exc: + pytest.skip(f"GitHub unreachable ({exc}); skipping presentation demo test") + + +def _install_skill_files(tmp_path: Path) -> None: + """Place the real pptx SKILL.md and pptxgenjs companion guide in the skills directory.""" + skill_dir = tmp_path / ".agents" / "skills" / "pptx" + skill_dir.mkdir(parents=True, exist_ok=True) + + (skill_dir / "SKILL.md").write_text(_fetch("SKILL.md"), encoding="utf-8") + + pptxgenjs_md = _fetch("pptxgenjs.md") + (skill_dir / "pptxgenjs.md").write_text(pptxgenjs_md, encoding="utf-8") + # Also at cwd root — agents sometimes look for companions via a bare filename. + (tmp_path / "pptxgenjs.md").write_text(pptxgenjs_md, encoding="utf-8") + + +def _npm_install_pptxgenjs(tmp_path: Path) -> None: + """Pre-install pptxgenjs so the agent can require() it immediately.""" + result = subprocess.run( + ["npm", "install", "pptxgenjs", "--prefix", str(tmp_path)], + capture_output=True, + text=True, + timeout=60, + ) + if result.returncode != 0: + pytest.skip(f"npm install pptxgenjs failed: {result.stderr[:300]}") + + +class NodeExecutorToolProvider(ToolProviderInterface): + """Exposes a single ``run_node_script`` tool that executes a pptxgenjs script. + + This gives the agent a real execution mechanism without needing the full + code-execution sandbox. The script runs in ``workdir`` so that + ``require('pptxgenjs')`` resolves against ``workdir/node_modules/``. + """ + + def __init__(self, workdir: Path) -> None: + self._workdir = workdir + + async def initialize(self) -> None: + pass + + async def get_apps(self) -> list: + return [] + + async def get_all_tools(self) -> list: + workdir = self._workdir + + @tool + async def run_node_script(script: str) -> str: + """Execute a Node.js pptxgenjs script to produce a .pptx file. + + Write a complete self-contained script that ends with + pres.writeFile({ fileName: "presentation.pptx" }). + Returns stdout / stderr so you can verify success. + """ + script_path = workdir / "create_presentation.js" + script_path.write_text(script, encoding="utf-8") + + env = {**os.environ, "NODE_PATH": str(workdir / "node_modules")} + proc = subprocess.run( + ["node", str(script_path)], + capture_output=True, + text=True, + timeout=60, + cwd=str(workdir), + env=env, + ) + if proc.returncode != 0: + return f"Exit {proc.returncode}\nstdout: {proc.stdout}\nstderr: {proc.stderr}" + return proc.stdout.strip() or "Script completed (no stdout)." + + return [run_node_script] + + async def get_tools(self, app_name: str) -> list[Any]: + return [] + + +def _make_sdk_agent( + *, + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, + real_llm, +) -> "CugaAgent": + from cuga.config import settings + from cuga.sdk import CugaAgent + + monkeypatch.chdir(tmp_path) + monkeypatch.setattr(settings.advanced_features, "enable_shell_tool", True) + # Tools mode: load_skill and run_node_script are bound as function-call tools. + monkeypatch.setattr(settings.advanced_features, "cuga_lite_bind_tools_mode", "tools") + monkeypatch.setattr( + settings.advanced_features, + "cuga_lite_bind_tools_tool_names", + ["load_skill", "run_node_script"], + ) + # Allow multi-turn: the agent writes a planning step before calling load_skill. + monkeypatch.setattr(settings.advanced_features, "cuga_lite_nl_auto_continue", True) + monkeypatch.setattr(settings.policy, "enabled", False) + + return CugaAgent( + model=real_llm, + enable_skills=True, + skills_folder=str(tmp_path), + enable_knowledge=False, + tool_provider=NodeExecutorToolProvider(tmp_path), + ) + + +@pytest.mark.asyncio +@pytest.mark.e2e +async def test_pptx_skill_creates_real_presentation( + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, + real_llm, +) -> None: + """Agent uses the real anthropics/skills pptx skill to produce a .pptx file. + + The assertion is minimal: the file must exist and be non-trivially sized. + The primary value is visual — open the file and confirm a real presentation + was generated. + """ + _install_skill_files(tmp_path) + _npm_install_pptxgenjs(tmp_path) + + agent = _make_sdk_agent(tmp_path=tmp_path, monkeypatch=monkeypatch, real_llm=real_llm) + + result = await agent.invoke( + _TASK, + thread_id=f"pptx_demo_{uuid.uuid4().hex[:8]}", + ) + + print(f"\n[pptx_demo] agent answer:\n{result.answer[:800]}") + + pptx_files = list(tmp_path.glob("**/*.pptx")) + assert pptx_files, ( + f"No .pptx file found in {tmp_path}.\n" + f"Agent answer: {result.answer[:500]!r}\n" + f"Working dir: {sorted(str(p.name) for p in tmp_path.iterdir())}" + ) + + output_file = max(pptx_files, key=lambda p: p.stat().st_size) + assert output_file.stat().st_size > 1_000, ( + f"Output file {output_file.name} is suspiciously small ({output_file.stat().st_size} bytes)." + ) + + _DEMO_OUTPUTS.mkdir(parents=True, exist_ok=True) + dest = _DEMO_OUTPUTS / f"presentation_{uuid.uuid4().hex[:6]}.pptx" + dest.write_bytes(output_file.read_bytes()) + + print(f"\n{'=' * 60}") + print(" Presentation saved — open to visually inspect:") + print(f" {dest}") + print(f" Size: {dest.stat().st_size:,} bytes") + print(f"{'=' * 60}\n") diff --git a/tests/e2e/test_skills_real_e2e.py b/tests/e2e/test_skills_real_e2e.py new file mode 100644 index 00000000..97aefbb0 --- /dev/null +++ b/tests/e2e/test_skills_real_e2e.py @@ -0,0 +1,230 @@ +"""E2E tests using real skills fetched from github.com/vercel-labs/agent-skills. + +Unlike the fabricated-data tests in test_skills_sdk_e2e.py, the skill content here +is real and publicly available, so the LLM may have encountered similar material +during training. Content verification is therefore a "soft" signal: it increases +confidence that the skill was loaded and applied, but is not a hard gate the way +fabricated secret values are. + +The primary value of these tests is integration coverage: + - Skill discovery from a real, unmodified SKILL.md (not hand-crafted frontmatter). + - The load_skill tool is invoked by the agent against that SKILL.md. + - The agent's answer is richer and more specific than it would be without the skill. + +Verification strategy +--------------------- +We assert that rule identifiers specific to the Vercel taxonomy appear in the answer. +Identifiers like ``async-cheap-condition-before-await`` or ``server-no-shared-module-state`` +are long and precise enough that they are unlikely to appear without reading the skill. + +Note on negative controls +-------------------------- +Negative controls (enable_skills=False) are omitted here because the LLM may reproduce +Vercel rule names from training data, making them unreliable for public skills. +Use test_skills_sdk_e2e.py for hard-gated negative controls with fabricated data. + +How to run +---------- + uv run pytest tests/e2e/test_skills_real_e2e.py -m e2e -v -s + +Skip reason when GitHub is unreachable: + Tests are skipped automatically if the raw.githubusercontent.com download fails. +""" + +from __future__ import annotations + +import urllib.error +import urllib.request +import uuid +from pathlib import Path +from typing import TYPE_CHECKING + +import pytest + +if TYPE_CHECKING: + from cuga.sdk import CugaAgent + +_GITHUB_RAW = "https://raw.githubusercontent.com/vercel-labs/agent-skills/main/skills" + + +def _fetch_skill_md(skill_path: str) -> str: + """Download SKILL.md from vercel-labs/agent-skills. Skip if unreachable.""" + url = f"{_GITHUB_RAW}/{skill_path}/SKILL.md" + try: + with urllib.request.urlopen(url, timeout=15) as resp: # noqa: S310 + return resp.read().decode("utf-8") + except (urllib.error.URLError, OSError) as exc: + pytest.skip(f"GitHub unreachable ({exc}); skipping real-skill test") + + +def _install_skill(tmp_path: Path, dir_name: str, content: str) -> None: + """Write a SKILL.md to the skills directory under tmp_path.""" + skill_dir = tmp_path / ".agents" / "skills" / dir_name + skill_dir.mkdir(parents=True, exist_ok=True) + (skill_dir / "SKILL.md").write_text(content, encoding="utf-8") + + +def _make_sdk_agent( + *, + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, + enable_skills: bool, + real_llm, +) -> "CugaAgent": + from cuga.config import settings + from cuga.sdk import CugaAgent + + monkeypatch.chdir(tmp_path) + monkeypatch.setattr(settings.advanced_features, "enable_shell_tool", True) + monkeypatch.setattr(settings.advanced_features, "cuga_lite_bind_tools_mode", "tools") + monkeypatch.setattr(settings.advanced_features, "cuga_lite_bind_tools_tool_names", ["load_skill"]) + monkeypatch.setattr(settings.advanced_features, "cuga_lite_nl_auto_continue", False) + monkeypatch.setattr(settings.policy, "enabled", False) + + return CugaAgent( + model=real_llm, + enable_skills=enable_skills, + skills_folder=str(tmp_path), + enable_knowledge=False, + ) + + +# --------------------------------------------------------------------------- +# Tier 3 – real LLM against real skills from vercel-labs/agent-skills +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +@pytest.mark.e2e +async def test_react_best_practices_skill_loaded_and_applied( + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, + real_llm, +) -> None: + """Agent uses the real Vercel react-best-practices skill to answer a rule-specific query. + + The Vercel taxonomy uses identifiers like ``async-cheap-condition-before-await`` + that are long and precise enough to be unlikely in a generic React answer. + """ + content = _fetch_skill_md("react-best-practices") + _install_skill(tmp_path, "react-best-practices", content) + + agent = _make_sdk_agent( + tmp_path=tmp_path, + monkeypatch=monkeypatch, + enable_skills=True, + real_llm=real_llm, + ) + + task = ( + "Using Vercel's react-best-practices skill, list the specific rule identifiers " + "for all CRITICAL priority rules that prevent async waterfalls in React. " + "Include the exact rule names as they appear in the skill." + ) + + result = await agent.invoke(task, thread_id=f"react_real_{uuid.uuid4().hex[:8]}") + print(f"\n[react_real] answer: {result.answer[:600]}") + + # Rule IDs specific to the Vercel taxonomy — unlikely without reading the skill. + expected_ids = [ + "async-cheap-condition-before-await", + "async-defer-await", + "async-parallel", + ] + found = [rid for rid in expected_ids if rid in result.answer] + assert found, ( + f"Expected at least one Vercel rule identifier in answer but found none of {expected_ids}. " + f"Got: {result.answer[:600]!r}" + ) + + +@pytest.mark.asyncio +@pytest.mark.e2e +async def test_react_best_practices_server_rules_present( + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, + real_llm, +) -> None: + """Agent applies the server-side rules section from the real Vercel skill. + + ``server-no-shared-module-state`` is a particularly precise identifier — it would + be unusual for a generic React answer to reproduce this exact string. + """ + content = _fetch_skill_md("react-best-practices") + _install_skill(tmp_path, "react-best-practices", content) + + agent = _make_sdk_agent( + tmp_path=tmp_path, + monkeypatch=monkeypatch, + enable_skills=True, + real_llm=real_llm, + ) + + task = ( + "Using Vercel's react-best-practices skill, what are the HIGH priority " + "server-side performance rules? List their exact rule identifiers." + ) + + result = await agent.invoke(task, thread_id=f"react_server_{uuid.uuid4().hex[:8]}") + print(f"\n[react_server] answer: {result.answer[:600]}") + + server_ids = [ + "server-no-shared-module-state", + "server-cache-react", + "server-parallel-fetching", + ] + found = [rid for rid in server_ids if rid in result.answer] + assert found, ( + f"Expected at least one server-side rule ID in answer but found none of {server_ids}. " + f"Got: {result.answer[:600]!r}" + ) + + +@pytest.mark.asyncio +@pytest.mark.e2e +async def test_deploy_to_vercel_skill_guides_deployment_workflow( + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, + real_llm, +) -> None: + """Agent follows the deploy-to-vercel skill's opinionated preview-first workflow. + + The skill explicitly mandates: + - Always deploy as preview unless production is explicitly requested. + - Run four specific state-gathering checks before choosing a deploy method. + These are specific enough to distinguish skill-guided from generic answers. + """ + content = _fetch_skill_md("deploy-to-vercel") + _install_skill(tmp_path, "deploy-to-vercel", content) + + agent = _make_sdk_agent( + tmp_path=tmp_path, + monkeypatch=monkeypatch, + enable_skills=True, + real_llm=real_llm, + ) + + task = ( + "Using the deploy-to-vercel skill, walk me through the correct process " + "for deploying a project to Vercel. What do I check first, and should I " + "deploy to production or preview by default?" + ) + + result = await agent.invoke(task, thread_id=f"deploy_real_{uuid.uuid4().hex[:8]}") + print(f"\n[deploy_real] answer: {result.answer[:600]}") + + answer_lower = result.answer.lower() + + # The skill explicitly says "Always deploy as preview (not production)". + assert "preview" in answer_lower, ( + f"Expected 'preview' in answer (skill mandates preview-first). Got: {result.answer[:500]!r}" + ) + + # The skill mandates four pre-deploy checks; at least one of the specific commands + # or file paths should appear. + deployment_signals = ["vercel whoami", ".vercel/project.json", "vercel teams list", ".vercel/repo.json"] + found = [s for s in deployment_signals if s in result.answer] + assert found, ( + f"Expected at least one deployment-state check from the skill ({deployment_signals}). " + f"Got: {result.answer[:500]!r}" + ) diff --git a/tests/e2e/test_skills_sdk_e2e.py b/tests/e2e/test_skills_sdk_e2e.py new file mode 100644 index 00000000..e42ebfe4 --- /dev/null +++ b/tests/e2e/test_skills_sdk_e2e.py @@ -0,0 +1,486 @@ +"""E2E tests for skills via the CugaAgent SDK. + +Tier 1 — SDK configuration: + Verifies that CugaAgent(enable_skills=True, skills_folder=...) correctly + routes the skills_enabled / skills_folder values into the graph configurable, + without making any LLM calls. + +Tier 3 — Real LLM via SDK: + Runs CugaAgent(enable_skills=True) with the real configured LLM and asserts + that skills containing proprietary data (fabricated formulas / codes) are + loaded and applied correctly. Paired negative controls run with + enable_skills=False and assert the expected value is absent. + +Same fabricated-data approach as test_skills_llm_e2e.py — the LLM cannot +produce the correct answer without reading the skill body. + +How to run +---------- +Tier 1 only (fast, no LLM): + + uv run pytest tests/e2e/test_skills_sdk_e2e.py::TestSkillsSdkConfiguration -v + +Tier 3 only (real LLM): + + uv run pytest tests/e2e/test_skills_sdk_e2e.py -m e2e -v -s +""" + +from __future__ import annotations + +import uuid +from pathlib import Path +from typing import TYPE_CHECKING + +import pytest + +from .conftest import write_skill + +if TYPE_CHECKING: + from cuga.sdk import CugaAgent + + +def _normalize_hyphens(text: str) -> str: + """Replace Unicode dash variants with ASCII hyphen for robust assertions.""" + for ch in "‐‑‒–—―−": + text = text.replace(ch, "-") + return text + + +# Collects one entry per _report() call; printed in the end-of-session summary +# by pytest_terminal_summary in conftest.py (alongside test_skills_llm_e2e results). +_RESULTS: list[dict] = [] + + +def _report(*, skill: str, expected: str | list[str], actual: str, negative: bool = False) -> None: + terms = expected if isinstance(expected, list) else [expected] + passed = all(t not in actual for t in terms) if negative else all(t in actual for t in terms) + display = repr(terms[0]) + (f" (+{len(terms) - 1} more)" if len(terms) > 1 else "") + _RESULTS.append( + {"skill": skill, "expected": display, "actual": actual, "negative": negative, "passed": passed} + ) + + +# --------------------------------------------------------------------------- +# Tier 1 – SDK configuration (no LLM calls) +# --------------------------------------------------------------------------- + + +class TestSkillsSdkConfiguration: + """Verify that enable_skills / skills_folder are stored and forwarded correctly. + + These tests inspect the agent's internal state without invoking the graph. + They confirm the SDK wiring before any real LLM call. + """ + + def test_enable_skills_defaults_to_none(self) -> None: + """CugaAgent() without enable_skills stores None (auto from settings).""" + from cuga.sdk import CugaAgent + + agent = CugaAgent(enable_knowledge=False) + assert agent._enable_skills is None + + def test_enable_skills_true_is_stored(self) -> None: + """CugaAgent(enable_skills=True) stores True on the agent.""" + from cuga.sdk import CugaAgent + + agent = CugaAgent(enable_skills=True, enable_knowledge=False) + assert agent._enable_skills is True + + def test_enable_skills_false_is_stored(self) -> None: + """CugaAgent(enable_skills=False) stores False on the agent.""" + from cuga.sdk import CugaAgent + + agent = CugaAgent(enable_skills=False, enable_knowledge=False) + assert agent._enable_skills is False + + def test_skills_folder_is_stored(self, tmp_path: Path) -> None: + """CugaAgent(skills_folder=...) stores the path on the agent.""" + from cuga.sdk import CugaAgent + + agent = CugaAgent(enable_skills=True, skills_folder=str(tmp_path), enable_knowledge=False) + assert agent._skills_folder == str(tmp_path) + + @pytest.mark.asyncio + async def test_skills_configurable_injected_into_invoke_config( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + """skills_enabled and skills_folder appear in run_config after _build_run_config. + + We intercept graph.ainvoke to capture the config rather than running the graph. + """ + from cuga.sdk import CugaAgent + + agent = CugaAgent( + enable_skills=True, + skills_folder=str(tmp_path), + enable_knowledge=False, + ) + + captured: list[dict] = [] + + async def fake_ainvoke(state, config=None): + captured.append(config or {}) + return {"final_answer": "ok"} + + # Patch the compiled graph's ainvoke so we can inspect the config + monkeypatch.setattr(agent.graph, "ainvoke", fake_ainvoke) + + await agent.invoke("test") + + assert captured, "graph.ainvoke was never called" + cfg = captured[0].get("configurable", {}) + assert cfg.get("skills_enabled") is True + # SDK converts workspace root → workspace/.cuga for discover_skills compatibility + assert cfg.get("skills_folder") == str(tmp_path / ".cuga") + + @pytest.mark.asyncio + async def test_skills_folder_with_cuga_suffix_is_not_double_suffixed( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + """CugaAgent(skills_folder='.../path/.cuga') must NOT become '.../path/.cuga/.cuga'. + + If a user reads the docs and passes a path that already ends in '.cuga', + the SDK previously appended another '.cuga', silently directing discovery + to the wrong location and producing zero skills with no error. + """ + from cuga.sdk import CugaAgent + + already_suffixed = str(tmp_path / ".cuga") + agent = CugaAgent( + enable_skills=True, + skills_folder=already_suffixed, + enable_knowledge=False, + ) + + captured: list[dict] = [] + + async def fake_ainvoke(state, config=None): + captured.append(config or {}) + return {"final_answer": "ok"} + + monkeypatch.setattr(agent.graph, "ainvoke", fake_ainvoke) + await agent.invoke("test") + + cfg = captured[0].get("configurable", {}) + result = cfg.get("skills_folder", "") + assert result == already_suffixed, ( + f"SDK must not double-append '.cuga'. Expected {already_suffixed!r}, got {result!r}" + ) + assert not result.endswith(".cuga/.cuga"), ( + "skills_folder was double-suffixed — discovery will resolve to the wrong directory." + ) + + @pytest.mark.asyncio + async def test_skills_folder_without_cuga_suffix_gets_cuga_appended( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + """CugaAgent(skills_folder='/project') becomes '/project/.cuga' in the configurable.""" + from cuga.sdk import CugaAgent + + agent = CugaAgent( + enable_skills=True, + skills_folder=str(tmp_path), + enable_knowledge=False, + ) + + captured: list[dict] = [] + + async def fake_ainvoke(state, config=None): + captured.append(config or {}) + return {"final_answer": "ok"} + + monkeypatch.setattr(agent.graph, "ainvoke", fake_ainvoke) + await agent.invoke("test") + + cfg = captured[0].get("configurable", {}) + assert cfg.get("skills_folder") == str(tmp_path / ".cuga"), ( + "A plain workspace root should have .cuga appended by the SDK" + ) + + +# --------------------------------------------------------------------------- +# Tier 3 – real LLM via SDK +# --------------------------------------------------------------------------- +# +# Same three skills as test_skills_llm_e2e.py so the gating guarantees carry +# over. The SDK surface is what changes: CugaAgent.invoke() instead of the +# raw graph helper _run_graph(). + +_SCORER_SKILL_BODY = ( + "## Acme Corp Compliance Risk Score Calculator\n\n" + "Use this proprietary formula to compute the CRS (Compliance Risk Score):\n\n" + " CRS = (violations * 14) + (days_overdue * 3) - (controls_passed * 5) + 22\n\n" + "Where:\n" + "- violations: number of distinct policy violations found\n" + "- days_overdue: number of calendar days past the remediation deadline\n" + "- controls_passed: number of controls that passed review in the same audit cycle\n" + "- The constant offset 22 is the Acme baseline risk factor\n\n" + 'Report the result as: "Acme CRS: "' +) +_SCORER_TASK = "Compute the Acme compliance risk score for: 3 violations, 45 days overdue, 8 controls passed." + +_PARTS_SKILL_BODY = ( + "## Acme Corp Parts Catalog — Internal Reference\n\n" + "Return the exact internal part code for the requested product.\n\n" + "| Product Description | Internal Part Code |\n" + "|---------------------------------|--------------------|\n" + "| Thermal Bypass Valve | TBV-9143-X |\n" + "| Pressure Relief Unit | PRU-2267-K |\n" + "| Flow Control Module | FCM-5508-J |\n" + "| Rotary Actuator Assembly Type-3 | RAA-7712-Q |\n" + "| Solenoid Isolation Block | SIB-3391-N |\n\n" + 'If the product is not listed, respond: "Part code not found in catalog."' +) +_PARTS_TASK = "What is the Acme Corp internal part code for the Pressure Relief Unit?" + +_ONBOARDING_SKILL_BODY = ( + "## Acme Corp Vendor Onboarding — Standard Process v4.2\n\n" + "Complete all steps in order. Do not skip or reorder.\n\n" + "Step 1 — NEXUS Compliance Screen\n" + " Submit vendor details to the NEXUS compliance portal (portal ID: NX-VENDOR).\n" + " Await NEXUS clearance code before proceeding.\n\n" + "Step 2 — CERBERUS Authentication Setup\n" + " Create vendor account in CERBERUS (internal IAM system).\n" + " Assign role: VENDOR_EXTERNAL_L1.\n\n" + "Step 3 — IRONGATE Financial Vetting\n" + " Submit bank details and tax forms to IRONGATE (finance validation system).\n" + " Record the IRONGATE approval reference number.\n\n" + "Step 4 — Master Agreement via DOCUVAULT\n" + " Send the standard MSA template via DOCUVAULT (contract management portal).\n" + " DOCUVAULT signatures only — do not use email attachments.\n\n" + "Step 5 — Activation Confirmation\n" + " Confirm all prior steps, then issue activation. Reference the NEXUS clearance\n" + " code, CERBERUS activation token, and IRONGATE reference number.\n\n" + "Always name all four internal systems in your summary: " + "NEXUS, CERBERUS, IRONGATE, DOCUVAULT." +) +_ONBOARDING_TASK = "Walk me through the Acme Corp vendor onboarding process." +_ONBOARDING_SYSTEMS = ("NEXUS", "CERBERUS", "IRONGATE", "DOCUVAULT") + + +def _make_sdk_agent( + *, + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, + enable_skills: bool, + real_llm, +) -> "CugaAgent": + """Create a CugaAgent configured for skill e2e tests. + + monkeypatch.chdir(tmp_path) so that relative paths inside the graph + resolve to the test's temporary directory. + """ + from cuga.config import settings + from cuga.sdk import CugaAgent + + monkeypatch.chdir(tmp_path) + monkeypatch.setattr(settings.advanced_features, "enable_shell_tool", True) + monkeypatch.setattr(settings.advanced_features, "cuga_lite_bind_tools_mode", "tools") + monkeypatch.setattr(settings.advanced_features, "cuga_lite_bind_tools_tool_names", ["load_skill"]) + monkeypatch.setattr(settings.advanced_features, "cuga_lite_nl_auto_continue", False) + monkeypatch.setattr(settings.policy, "enabled", False) + + return CugaAgent( + model=real_llm, + enable_skills=enable_skills, + skills_folder=str(tmp_path), + enable_knowledge=False, + ) + + +@pytest.mark.asyncio +@pytest.mark.e2e +async def test_sdk_compliance_scorer_produces_correct_score( + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, + real_llm, +) -> None: + """SDK: LLM computes the proprietary Acme CRS formula (159) via skills. + + Expected: "159" in result.answer (3*14 + 45*3 - 8*5 + 22 = 159). + """ + write_skill( + tmp_path, + "acme_compliance_scorer", + "Computes the Acme Corp proprietary compliance risk score for audit findings", + _SCORER_SKILL_BODY, + ) + agent = _make_sdk_agent( + tmp_path=tmp_path, + monkeypatch=monkeypatch, + enable_skills=True, + real_llm=real_llm, + ) + + result = await agent.invoke( + _SCORER_TASK, + thread_id=f"sdk_crs_{uuid.uuid4().hex[:8]}", + ) + + print(f"\n[sdk_crs] answer: {result.answer[:400]}") + _report(skill="sdk/acme_compliance_scorer", expected="159", actual=result.answer) + assert "159" in result.answer, ( + f"Expected CRS=159 in SDK answer (3*14 + 45*3 - 8*5 + 22 = 159). Got: {result.answer[:500]!r}" + ) + + +@pytest.mark.asyncio +@pytest.mark.e2e +async def test_sdk_compliance_scorer_cannot_produce_correct_score_without_skill( + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, + real_llm, +) -> None: + """SDK negative control: 159 is absent when enable_skills=False.""" + agent = _make_sdk_agent( + tmp_path=tmp_path, + monkeypatch=monkeypatch, + enable_skills=False, + real_llm=real_llm, + ) + + result = await agent.invoke( + _SCORER_TASK, + thread_id=f"sdk_crs_neg_{uuid.uuid4().hex[:8]}", + ) + + print(f"\n[sdk_crs_neg] answer: {result.answer[:400]}") + _report( + skill="sdk/acme_compliance_scorer (no skill)", expected="159", actual=result.answer, negative=True + ) + assert "159" not in result.answer, ( + "LLM produced 159 without the skill via SDK — skill is not gating this capability. " + f"Got: {result.answer[:500]!r}" + ) + + +@pytest.mark.asyncio +@pytest.mark.e2e +async def test_sdk_parts_catalog_returns_internal_code( + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, + real_llm, +) -> None: + """SDK: LLM returns fabricated internal code PRU-2267-K from the skill body.""" + write_skill( + tmp_path, + "parts_catalog_lookup", + "Returns internal part codes from the Acme Corp industrial parts catalog", + _PARTS_SKILL_BODY, + ) + agent = _make_sdk_agent( + tmp_path=tmp_path, + monkeypatch=monkeypatch, + enable_skills=True, + real_llm=real_llm, + ) + + result = await agent.invoke( + _PARTS_TASK, + thread_id=f"sdk_parts_{uuid.uuid4().hex[:8]}", + ) + + print(f"\n[sdk_parts] answer: {result.answer[:400]}") + normalized = _normalize_hyphens(result.answer) + _report(skill="sdk/parts_catalog_lookup", expected="PRU-2267-K", actual=normalized) + assert "PRU-2267-K" in normalized, ( + f"Expected part code 'PRU-2267-K' in SDK answer. Got: {result.answer[:500]!r}" + ) + + +@pytest.mark.asyncio +@pytest.mark.e2e +async def test_sdk_parts_catalog_cannot_return_code_without_skill( + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, + real_llm, +) -> None: + """SDK negative control: PRU-2267-K absent when enable_skills=False.""" + agent = _make_sdk_agent( + tmp_path=tmp_path, + monkeypatch=monkeypatch, + enable_skills=False, + real_llm=real_llm, + ) + + result = await agent.invoke( + _PARTS_TASK, + thread_id=f"sdk_parts_neg_{uuid.uuid4().hex[:8]}", + ) + + print(f"\n[sdk_parts_neg] answer: {result.answer[:400]}") + normalized = _normalize_hyphens(result.answer) + _report( + skill="sdk/parts_catalog_lookup (no skill)", expected="PRU-2267-K", actual=normalized, negative=True + ) + assert "PRU-2267-K" not in normalized, ( + f"LLM produced the fabricated part code via SDK without the skill. Got: {result.answer[:500]!r}" + ) + + +@pytest.mark.asyncio +@pytest.mark.e2e +async def test_sdk_vendor_onboarding_uses_internal_system_names( + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, + real_llm, +) -> None: + """SDK: LLM produces onboarding guide naming all four fabricated internal systems.""" + write_skill( + tmp_path, + "acme_vendor_onboarding", + "Guides the Acme Corp vendor onboarding process with all required internal steps", + _ONBOARDING_SKILL_BODY, + ) + agent = _make_sdk_agent( + tmp_path=tmp_path, + monkeypatch=monkeypatch, + enable_skills=True, + real_llm=real_llm, + ) + + result = await agent.invoke( + _ONBOARDING_TASK, + thread_id=f"sdk_onboard_{uuid.uuid4().hex[:8]}", + ) + + print(f"\n[sdk_onboard] answer: {result.answer[:400]}") + _report(skill="sdk/acme_vendor_onboarding", expected=list(_ONBOARDING_SYSTEMS), actual=result.answer) + for system in _ONBOARDING_SYSTEMS: + assert system in result.answer, ( + f"Expected internal system name '{system}' in SDK answer. Got: {result.answer[:500]!r}" + ) + + +@pytest.mark.asyncio +@pytest.mark.e2e +async def test_sdk_vendor_onboarding_lacks_internal_names_without_skill( + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, + real_llm, +) -> None: + """SDK negative control: fabricated system names absent when enable_skills=False.""" + agent = _make_sdk_agent( + tmp_path=tmp_path, + monkeypatch=monkeypatch, + enable_skills=False, + real_llm=real_llm, + ) + + result = await agent.invoke( + _ONBOARDING_TASK, + thread_id=f"sdk_onboard_neg_{uuid.uuid4().hex[:8]}", + ) + + print(f"\n[sdk_onboard_neg] answer: {result.answer[:400]}") + found = [s for s in _ONBOARDING_SYSTEMS if s in result.answer] + _report( + skill="sdk/acme_vendor_onboarding (no skill)", + expected=list(_ONBOARDING_SYSTEMS), + actual=result.answer, + negative=True, + ) + assert not found, ( + f"LLM produced fabricated system names without the skill via SDK: {found}. " + f"Got: {result.answer[:500]!r}" + ) diff --git a/tests/unit/test_opensandbox_executor.py b/tests/unit/test_opensandbox_executor.py new file mode 100644 index 00000000..162bfe60 --- /dev/null +++ b/tests/unit/test_opensandbox_executor.py @@ -0,0 +1,344 @@ +"""Unit tests for OpenSandboxExecutor skill-related behaviors. + +Three architectural properties verified here: + + Fix 1 — Concurrency safety: two simultaneous _get_or_create_interpreter + calls for the same thread_id produce exactly one Sandbox.create call + because the per-key asyncio.Lock serialises creation. + + Fix 2 — Upload resilience: a write_files failure logs a warning but the + sandbox is still cached so no remote container is orphaned. + + Fix 5 — Stale-config detection: create_sandbox_tools logs a warning when + the requested skills config differs from what was active at sandbox + creation time. + +opensandbox and code_interpreter are optional packages not installed in the dev +environment; they are replaced with MagicMock modules before the executor is +imported. +""" + +from __future__ import annotations + +import asyncio +import sys +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +# --------------------------------------------------------------------------- +# Inject mock modules BEFORE the executor is imported +# --------------------------------------------------------------------------- + +_mock_write_entry_cls = MagicMock() +sys.modules.setdefault("opensandbox", MagicMock()) +sys.modules.setdefault("opensandbox.config", MagicMock()) +sys.modules.setdefault("opensandbox.models", MagicMock(WriteEntry=_mock_write_entry_cls)) +sys.modules.setdefault("code_interpreter", MagicMock()) + +from cuga.backend.cuga_graph.nodes.cuga_lite.executors.opensandbox.opensandbox_executor import ( # noqa: E402 + OpenSandboxExecutor, +) + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _make_interpreter(*, write_files_side_effect=None): + """Minimal mock CodeInterpreter with controllable write_files behaviour.""" + interp = MagicMock() + interp.sandbox = MagicMock() + interp.sandbox.commands.run = AsyncMock(return_value=None) + interp.sandbox.kill = AsyncMock() + interp.sandbox.close = AsyncMock() + if write_files_side_effect is not None: + interp.sandbox.files.write_files = AsyncMock(side_effect=write_files_side_effect) + else: + interp.sandbox.files.write_files = AsyncMock(return_value=None) + return interp + + +def _wire_sandbox_mocks(interpreter, *, delay: float = 0.0) -> dict: + """Wire sys.modules mocks so _get_or_create_interpreter uses interpreter. + + Returns a dict with a 'create_calls' counter so tests can assert how many + times Sandbox.create was called. + """ + counter = {"create_calls": 0} + + async def _slow_create(*args, **kwargs): + counter["create_calls"] += 1 + if delay: + await asyncio.sleep(delay) + return MagicMock() # the raw Sandbox object (not the interpreter) + + sys.modules["opensandbox"].Sandbox = MagicMock() + sys.modules["opensandbox"].Sandbox.create = AsyncMock(side_effect=_slow_create) + sys.modules["opensandbox.config"].ConnectionConfig = MagicMock(return_value=MagicMock()) + sys.modules["code_interpreter"].CodeInterpreter = MagicMock() + sys.modules["code_interpreter"].CodeInterpreter.create = AsyncMock(return_value=interpreter) + return counter + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + + +@pytest.fixture(autouse=True) +def _clean_state(): + """Wipe class-level state before and after every test for isolation.""" + OpenSandboxExecutor._sandboxes.clear() + OpenSandboxExecutor._skills_config.clear() + OpenSandboxExecutor._active_skills_config.clear() + OpenSandboxExecutor._locks.clear() + yield + OpenSandboxExecutor._sandboxes.clear() + OpenSandboxExecutor._skills_config.clear() + OpenSandboxExecutor._active_skills_config.clear() + OpenSandboxExecutor._locks.clear() + + +# --------------------------------------------------------------------------- +# Fix 1 — Concurrency safety +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_concurrent_creation_calls_sandbox_create_exactly_once() -> None: + """Two concurrent _get_or_create_interpreter calls for the same thread_id + must produce exactly one remote sandbox, not two. + + The lock in _get_or_create_interpreter serialises creation: the second + coroutine waits until the first has cached the interpreter, then finds it + in _sandboxes and returns immediately without calling Sandbox.create again. + + Without the lock, both coroutines see an empty cache, both call + Sandbox.create, and one sandbox is orphaned. + """ + interpreter = _make_interpreter() + counter = _wire_sandbox_mocks(interpreter, delay=0.02) + + # skills disabled so we skip the upload path and keep the mock surface minimal + executor = OpenSandboxExecutor() + executor._skills_config["thread-A"] = (None, False) + + results = await asyncio.gather( + executor._get_or_create_interpreter("thread-A"), + executor._get_or_create_interpreter("thread-A"), + ) + + assert counter["create_calls"] == 1, ( + f"Sandbox.create should be called once under concurrent access, " + f"but was called {counter['create_calls']} times" + ) + assert results[0] is results[1], "Both coroutines should receive the same cached interpreter" + + +@pytest.mark.asyncio +async def test_different_thread_ids_each_get_own_sandbox() -> None: + """Sandboxes for different thread_ids are independent and each created once.""" + interp_a = _make_interpreter() + interp_b = _make_interpreter() + + call_count = {"n": 0} + + async def _create(*args, **kwargs): + call_count["n"] += 1 + return MagicMock() + + sys.modules["opensandbox"].Sandbox = MagicMock() + sys.modules["opensandbox"].Sandbox.create = AsyncMock(side_effect=_create) + sys.modules["opensandbox.config"].ConnectionConfig = MagicMock(return_value=MagicMock()) + + interps = [interp_a, interp_b] + idx = {"i": 0} + + async def _create_interp(*args, **kwargs): + result = interps[idx["i"]] + idx["i"] += 1 + return result + + sys.modules["code_interpreter"].CodeInterpreter = MagicMock() + sys.modules["code_interpreter"].CodeInterpreter.create = AsyncMock(side_effect=_create_interp) + + executor = OpenSandboxExecutor() + executor._skills_config["thread-X"] = (None, False) + executor._skills_config["thread-Y"] = (None, False) + + result_x, result_y = await asyncio.gather( + executor._get_or_create_interpreter("thread-X"), + executor._get_or_create_interpreter("thread-Y"), + ) + + assert call_count["n"] == 2, "Each distinct thread_id should create its own sandbox" + assert result_x is interp_a + assert result_y is interp_b + + +# --------------------------------------------------------------------------- +# Fix 2 — Upload resilience +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_sandbox_cached_even_when_upload_fails(tmp_path, monkeypatch) -> None: + """When write_files raises, the executor still caches the sandbox. + + Before this fix, the exception propagated out before the assignment + self._sandboxes[key] = interpreter, leaving an uncached (orphaned) remote + container. After the fix, the interpreter is cached and a warning is + logged so the agent can continue without skills. + """ + interpreter = _make_interpreter(write_files_side_effect=RuntimeError("network error")) + _wire_sandbox_mocks(interpreter) + + # Write a real skill so discover_skills returns something and triggers the upload path + monkeypatch.chdir(tmp_path) + skill_dir = tmp_path / ".agents" / "skills" / "my_skill" + skill_dir.mkdir(parents=True) + (skill_dir / "SKILL.md").write_text( + "---\nname: my_skill\ndescription: A skill\n---\nBody\n", encoding="utf-8" + ) + + executor = OpenSandboxExecutor() + executor._skills_config["thread-B"] = (str(tmp_path / ".cuga"), True) + + with patch( + "cuga.backend.cuga_graph.nodes.cuga_lite.executors.opensandbox.opensandbox_executor.logger" + ) as mock_logger: + result = await executor._get_or_create_interpreter("thread-B") + + assert "thread-B" in executor._sandboxes, ( + "Sandbox must be cached even after an upload failure — otherwise the remote container is orphaned." + ) + assert result is interpreter + + warning_calls = [str(c) for c in mock_logger.warning.call_args_list] + assert any("upload" in w.lower() or "write_files" in w.lower() for w in warning_calls), ( + f"Expected a warning about the upload failure. Got: {warning_calls}" + ) + + +@pytest.mark.asyncio +async def test_upload_failure_does_not_prevent_subsequent_tool_use(tmp_path, monkeypatch) -> None: + """After a failed upload the interpreter is in _sandboxes, so a second call + to _get_or_create_interpreter returns the cached interpreter immediately + without attempting another Sandbox.create.""" + interpreter = _make_interpreter(write_files_side_effect=RuntimeError("timeout")) + counter = _wire_sandbox_mocks(interpreter) + + monkeypatch.chdir(tmp_path) + executor = OpenSandboxExecutor() + executor._skills_config["thread-C"] = (None, False) # no upload, test the caching alone + + await executor._get_or_create_interpreter("thread-C") + # Force the skills config to True for second call to verify cached path + executor._skills_config["thread-C"] = (None, False) + second = await executor._get_or_create_interpreter("thread-C") + + assert counter["create_calls"] == 1, "Second call should hit the cache, not create a new sandbox" + assert second is interpreter + + +# --------------------------------------------------------------------------- +# Fix 5 — Stale-config detection +# --------------------------------------------------------------------------- + + +def test_stale_skills_config_logs_warning() -> None: + """create_sandbox_tools warns when skills_folder changes for a live sandbox. + + If the caller changes skills_folder between invocations while the sandbox + is still running, the new skills will not be available (the upload only + happens at sandbox creation). The warning directs the user to call + release_sandbox() first. + """ + executor = OpenSandboxExecutor() + key = "thread-D" + + # Simulate an already-running sandbox with an old skills config + executor._sandboxes[key] = _make_interpreter() + executor._active_skills_config[key] = ("/old/path/.cuga", True) + + with patch( + "cuga.backend.cuga_graph.nodes.cuga_lite.executors.opensandbox.opensandbox_executor.logger" + ) as mock_logger: + executor.create_sandbox_tools( + thread_id=key, + cuga_folder="/new/path/.cuga", + skills_enabled=True, + ) + + warning_calls = [str(c) for c in mock_logger.warning.call_args_list] + assert any("release_sandbox" in w for w in warning_calls), ( + f"Expected a warning mentioning release_sandbox when skills config changes. Got: {warning_calls}" + ) + + +def test_no_stale_warning_when_config_unchanged() -> None: + """No warning is emitted when create_sandbox_tools is called with the same config.""" + executor = OpenSandboxExecutor() + key = "thread-E" + + executor._sandboxes[key] = _make_interpreter() + executor._active_skills_config[key] = ("/same/path/.cuga", True) + + with patch( + "cuga.backend.cuga_graph.nodes.cuga_lite.executors.opensandbox.opensandbox_executor.logger" + ) as mock_logger: + executor.create_sandbox_tools( + thread_id=key, + cuga_folder="/same/path/.cuga", + skills_enabled=True, + ) + + warning_calls = [str(c) for c in mock_logger.warning.call_args_list] + assert not any("release_sandbox" in w for w in warning_calls), ( + f"Unexpected stale-config warning when config is unchanged. Got: {warning_calls}" + ) + + +def test_no_stale_warning_for_new_sandbox() -> None: + """No warning when there is no existing sandbox for the thread_id.""" + executor = OpenSandboxExecutor() + + with patch( + "cuga.backend.cuga_graph.nodes.cuga_lite.executors.opensandbox.opensandbox_executor.logger" + ) as mock_logger: + executor.create_sandbox_tools( + thread_id="brand-new-thread", + cuga_folder="/some/path/.cuga", + skills_enabled=True, + ) + + warning_calls = [str(c) for c in mock_logger.warning.call_args_list] + assert not any("release_sandbox" in w for w in warning_calls) + + +def test_release_sandbox_clears_all_state() -> None: + """release_sandbox removes the thread from all tracking dicts.""" + executor = OpenSandboxExecutor() + key = "thread-F" + executor._sandboxes[key] = MagicMock() + executor._active_skills_config[key] = ("/path/.cuga", True) + executor._skills_config[key] = ("/path/.cuga", True) + executor._locks[key] = asyncio.Lock() + + # release_sandbox is async, but we only need to test state cleanup here + # by calling the synchronous dict manipulations it performs before the await. + # We invoke it via asyncio.run to keep this test synchronous-style. + async def _run(): + # Prevent the actual kill/close calls from failing (sandbox is a MagicMock) + executor._sandboxes[key].sandbox.kill = AsyncMock() + executor._sandboxes[key].sandbox.close = AsyncMock() + await executor.release_sandbox(key) + + asyncio.get_event_loop().run_until_complete(_run()) + + assert key not in executor._sandboxes + assert key not in executor._active_skills_config + assert key not in executor._skills_config + assert key not in executor._locks diff --git a/tests/unit/test_skill_loader.py b/tests/unit/test_skill_loader.py index 63576e4f..9a708092 100644 --- a/tests/unit/test_skill_loader.py +++ b/tests/unit/test_skill_loader.py @@ -1,4 +1,5 @@ from pathlib import Path +from unittest.mock import patch from cuga.backend.skills.loader import discover_skills, get_skill_search_roots from cuga.backend.skills.registry import SkillEntry, SkillRegistry @@ -80,9 +81,180 @@ def test_skill_registry_load_skill_emits_install_steps_for_requirements() -> Non loaded = registry.load_skill("deck") assert "await run_command('uv pip install --quiet python-pptx')" in loaded - assert "await run_command('cd /tmp && npm install sharp')" in loaded + assert "await run_command('npm install sharp')" in loaded assert "STEP 2 — SKILL INSTRUCTIONS" in loaded assert "`python -m ...` → `uv run python -m ...`" in loaded assert "`pip list` / `pip show` / `pip freeze` → `uv pip list`" in loaded assert "must never be rewritten as `uv npm`" in loaded assert "Do not use `uv npm`, `uv run node`, or `uv run npm`" in loaded + + +# --------------------------------------------------------------------------- +# Sandbox skill-copy path +# --------------------------------------------------------------------------- + + +def _write_agents_skill(root: Path, name: str, description: str = "desc", body: str = "Body") -> None: + """Write a SKILL.md under root/.agents/skills//SKILL.md (the standard discovery path).""" + skill_dir = root / ".agents" / "skills" / name + skill_dir.mkdir(parents=True, exist_ok=True) + (skill_dir / "SKILL.md").write_text( + f"---\nname: {name}\ndescription: {description}\n---\n{body}\n", + encoding="utf-8", + ) + + +def test_copy_skills_to_workspace_copies_skill_files(tmp_path: Path, monkeypatch) -> None: + """_copy_skills_to_workspace() copies SKILL.md into the per-thread workspace skills dir.""" + from cuga.backend.cuga_graph.nodes.cuga_lite.executors.local.local_sandbox_executor import ( + LocalSandboxExecutor, + ) + + monkeypatch.chdir(tmp_path) + _write_agents_skill(tmp_path, "my_skill") + + workspace_root = tmp_path / "ws" + thread_id = "test-thread" + + with patch( + "cuga.backend.cuga_graph.nodes.cuga_lite.executors.local.local_sandbox_executor.local_thread_workspace_root", + return_value=workspace_root, + ): + executor = LocalSandboxExecutor() + executor._copy_skills_to_workspace( + thread_id=thread_id, + cuga_folder=str(tmp_path / ".cuga"), + skills_enabled=True, + ) + + dest = workspace_root / "skills" / "my_skill" / "SKILL.md" + assert dest.exists(), f"Expected SKILL.md to be copied to {dest}" + + +def test_copy_skills_to_workspace_is_noop_when_disabled(tmp_path: Path, monkeypatch) -> None: + """_copy_skills_to_workspace() copies nothing when skills_enabled=False.""" + from cuga.backend.cuga_graph.nodes.cuga_lite.executors.local.local_sandbox_executor import ( + LocalSandboxExecutor, + ) + + monkeypatch.chdir(tmp_path) + _write_agents_skill(tmp_path, "my_skill") + + workspace_root = tmp_path / "ws" + + with patch( + "cuga.backend.cuga_graph.nodes.cuga_lite.executors.local.local_sandbox_executor.local_thread_workspace_root", + return_value=workspace_root, + ): + executor = LocalSandboxExecutor() + executor._copy_skills_to_workspace( + cuga_folder=str(tmp_path / ".cuga"), + skills_enabled=False, + ) + + skills_dir = workspace_root / "skills" + assert not skills_dir.exists(), ( + f"Expected no skills to be copied when skills_enabled=False, but found files under {skills_dir}" + ) + + +def test_skill_name_with_path_traversal_is_rejected(tmp_path: Path) -> None: + """A SKILL.md whose frontmatter name contains path separators is silently skipped.""" + from cuga.backend.skills.loader import _parse_skill_file + + skill_path = tmp_path / "SKILL.md" + skill_path.write_text( + "---\nname: ../../etc/passwd\ndescription: bad skill\n---\nBody\n", + encoding="utf-8", + ) + + result = _parse_skill_file(skill_path) + + assert result is None, "Expected None for a skill with a path-traversal name" + + +# --------------------------------------------------------------------------- +# Fix 4 – Jinja2 prompt-injection sanitization +# --------------------------------------------------------------------------- + + +def test_jinja_expression_in_description_is_stripped(tmp_path: Path) -> None: + """A description containing {{ }} Jinja2 expression syntax is sanitized at parse time. + + Without sanitization, a malicious SKILL.md can inject arbitrary text into + the system prompt by placing Jinja2 template expressions in the description + field, which is rendered by the mcp_prompt.jinja2 template without escaping. + """ + from cuga.backend.skills.loader import _parse_skill_file + + skill_path = tmp_path / "SKILL.md" + skill_path.write_text( + "---\nname: my_skill\ndescription: Legit desc {{ malicious_var }}\n---\nBody\n", + encoding="utf-8", + ) + + result = _parse_skill_file(skill_path) + + assert result is not None + assert "{{" not in result.description + assert "}}" not in result.description + assert "malicious_var" not in result.description + assert "Legit desc" in result.description + + +def test_jinja_block_in_description_is_stripped(tmp_path: Path) -> None: + """A description containing {% %} Jinja2 block syntax has the delimiters stripped. + + The sanitizer removes Jinja delimiter sequences ({% %}) to prevent the + template engine from evaluating them. Literal text between the delimiters + may remain; what matters is that no {% or %} tokens reach the renderer. + """ + from cuga.backend.skills.loader import _parse_skill_file + + skill_path = tmp_path / "SKILL.md" + skill_path.write_text( + "---\nname: my_skill\ndescription: Safe {% if True %}payload{% endif %} text\n---\nBody\n", + encoding="utf-8", + ) + + result = _parse_skill_file(skill_path) + + assert result is not None + assert "{%" not in result.description, "Jinja block-open delimiter must be removed" + assert "%}" not in result.description, "Jinja block-close delimiter must be removed" + assert "Safe" in result.description + assert "text" in result.description + + +def test_jinja_expression_in_name_is_stripped(tmp_path: Path) -> None: + """A name field containing Jinja2 syntax is sanitized (name is also rendered into the prompt).""" + from cuga.backend.skills.loader import _parse_skill_file + + skill_path = tmp_path / "SKILL.md" + skill_path.write_text( + "---\nname: skill_{{ inject }}\ndescription: Fine description\n---\nBody\n", + encoding="utf-8", + ) + + result = _parse_skill_file(skill_path) + + assert result is not None + assert "{{" not in result.name + assert "inject" not in result.name + assert "skill_" in result.name + + +def test_clean_description_is_unchanged(tmp_path: Path) -> None: + """A description with no Jinja2 syntax passes through unchanged.""" + from cuga.backend.skills.loader import _parse_skill_file + + skill_path = tmp_path / "SKILL.md" + skill_path.write_text( + "---\nname: my_skill\ndescription: Summarizes complex reports into bullet points\n---\nBody\n", + encoding="utf-8", + ) + + result = _parse_skill_file(skill_path) + + assert result is not None + assert result.description == "Summarizes complex reports into bullet points"