Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -134,21 +134,29 @@ def _command_env(self, workspace_root: Path, venv: Path) -> dict[str, str]:
env["PATH"] = bindir + os.pathsep + env.get("PATH", "")
return env

def _copy_skills_to_workspace(self, thread_id: Optional[str] = None) -> None:
def _copy_skills_to_workspace(
self,
thread_id: Optional[str] = None,
cuga_folder: Optional[str] = None,
skills_enabled: Optional[bool] = None,
) -> None:
"""Copy discovered skill folders into the per-thread /workspace/skills directory."""
from cuga.config import settings

if not getattr(settings.skills, "enabled", False):
enabled = skills_enabled if skills_enabled is not None else getattr(settings.skills, "enabled", False)
if not enabled:
return
try:
from cuga.backend.skills.loader import discover_skills
except Exception:
return

cuga_folder = (os.getenv("CUGA_FOLDER") or "").strip() or (
getattr(settings.policy, "cuga_folder", None) or ""
).strip()
skill_entries = discover_skills(cuga_folder or None)
resolved_folder = (
cuga_folder
or (os.getenv("CUGA_FOLDER") or "").strip()
or (getattr(settings.policy, "cuga_folder", None) or "").strip()
)
skill_entries = discover_skills(resolved_folder or None)

copied = 0
for skill_entry in skill_entries:
Expand Down Expand Up @@ -236,14 +244,19 @@ async def run_command(cmd: str) -> str:

return run_command

def create_sandbox_tools(self, thread_id: Optional[str] = None) -> list[StructuredTool]:
def create_sandbox_tools(
self,
thread_id: Optional[str] = None,
cuga_folder: Optional[str] = None,
skills_enabled: Optional[bool] = None,
) -> list[StructuredTool]:
"""Return the run_command StructuredTool for local (unsandboxed) execution.

Filesystem tools (read/write/list/edit/...) are no longer produced
here — they come from the consolidated ``filesystem`` package via
``create_filesystem_tools`` (see ``cuga_lite_graph``).
"""
self._copy_skills_to_workspace(thread_id)
self._copy_skills_to_workspace(thread_id, cuga_folder=cuga_folder, skills_enabled=skills_enabled)
return [
StructuredTool.from_function(
coroutine=self.create_run_command_tool(thread_id),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,23 +147,29 @@ async def _ensure_venv(self) -> None:
"subsequent run_command calls will report `activate: No such file or directory`"
)

def _copy_skills_to_workspace(self, thread_id: Optional[str] = None) -> None:
def _copy_skills_to_workspace(
self,
thread_id: Optional[str] = None,
cuga_folder: Optional[str] = None,
skills_enabled: Optional[bool] = None,
) -> None:
"""Copy discovered skill folders into the hidden per-thread /workspace/skills directory."""
from cuga.config import settings

if not getattr(settings.skills, "enabled", False):
enabled = skills_enabled if skills_enabled is not None else getattr(settings.skills, "enabled", False)
if not enabled:
return
try:
from cuga.backend.skills.loader import discover_skills
except Exception:
return

import os

cuga_folder = (os.getenv("CUGA_FOLDER") or "").strip() or (
getattr(settings.policy, "cuga_folder", None) or ""
).strip()
skill_entries = discover_skills(cuga_folder or None)
resolved_folder = (
cuga_folder
or (os.getenv("CUGA_FOLDER") or "").strip()
or (getattr(settings.policy, "cuga_folder", None) or "").strip()
)
skill_entries = discover_skills(resolved_folder or None)

copied = 0
for skill_entry in skill_entries:
Expand Down Expand Up @@ -271,14 +277,19 @@ async def run_command(cmd: str) -> str:

return run_command

def create_sandbox_tools(self, thread_id: Optional[str] = None) -> list[StructuredTool]:
def create_sandbox_tools(
self,
thread_id: Optional[str] = None,
cuga_folder: Optional[str] = None,
skills_enabled: Optional[bool] = None,
) -> list[StructuredTool]:
"""Return the run_command StructuredTool for native macOS sandbox-exec.

Filesystem tools (read/write/list/edit/...) now come from the
consolidated ``filesystem`` package via ``create_filesystem_tools``
(see ``cuga_lite_graph``); they are no longer produced here.
"""
self._copy_skills_to_workspace(thread_id)
self._copy_skills_to_workspace(thread_id, cuga_folder=cuga_folder, skills_enabled=skills_enabled)
return [
StructuredTool.from_function(
coroutine=self.create_run_command_tool(thread_id),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

from __future__ import annotations

import asyncio
import os
from datetime import timedelta
from pathlib import Path
Expand Down Expand Up @@ -88,6 +89,22 @@ class OpenSandboxExecutor(RemoteExecutor):

# Interpreter cache: thread_id -> CodeInterpreter instance
_sandboxes: dict[str, Any] = {}
# Per-thread skills config requested on the current invocation
_skills_config: dict[str, tuple[Optional[str], Optional[bool]]] = {}
# Skills config that was actually applied when the sandbox was created
_active_skills_config: dict[str, tuple[Optional[str], Optional[bool]]] = {}
# Per-key asyncio locks to prevent concurrent sandbox creation for the same thread
_locks: dict[str, asyncio.Lock] = {}

def _get_key_lock(self, key: str) -> asyncio.Lock:
"""Return (creating if necessary) the asyncio.Lock for key.

Safe to call without holding any other lock: in asyncio's single-threaded
model there is no preemption between the dict check and the assignment.
"""
if key not in self._locks:
self._locks[key] = asyncio.Lock()
return self._locks[key]

# ------------------------------------------------------------------ #
# Sandbox lifecycle #
Expand All @@ -100,56 +117,75 @@ def _get_connection_config(self):
return ConnectionConfig(domain=domain)

async def _get_or_create_interpreter(self, thread_id: Optional[str] = None) -> Any:
"""Return a cached CodeInterpreter for thread_id, creating one if needed."""
"""Return a cached CodeInterpreter for thread_id, creating one if needed.

A per-key asyncio.Lock prevents concurrent coroutines from creating
duplicate sandboxes for the same thread_id.
"""
from opensandbox import Sandbox # type: ignore[import]
from code_interpreter import CodeInterpreter # type: ignore[import]

key = thread_id or "_default"
existing = self._sandboxes.get(key)
if existing is not None:
try:
await existing.sandbox.commands.run("true")
return existing
except Exception:
logger.debug(f"[OpenSandboxExecutor] Interpreter for thread={key} is dead, recreating")
self._sandboxes.pop(key, None)

cfg = _cfg()
image = getattr(cfg, "opensandbox_image", "opensandbox/code-interpreter:v1.0.2")
timeout_s = int(getattr(cfg, "opensandbox_timeout_seconds", 600))
python_version = getattr(cfg, "opensandbox_python_version", "3.11")
entrypoint = getattr(cfg, "opensandbox_entrypoint", "/opt/opensandbox/code-interpreter.sh")
conn = self._get_connection_config()

sandbox = await Sandbox.create(
image,
entrypoint=[entrypoint],
env={"PYTHON_VERSION": python_version},
timeout=timedelta(seconds=timeout_s),
connection_config=conn,
)
interpreter = await CodeInterpreter.create(sandbox)

# Ensure the agent-facing workspace and shared Python venv exist.
# Agents see /workspace; /tmp/.venv remains an internal implementation detail.
await interpreter.sandbox.commands.run(
f"mkdir -p {VIRTUAL_WORKSPACE_ROOT} && cd {VIRTUAL_WORKSPACE_ROOT} && "
"(command -v uv >/dev/null 2>&1 || python -m pip install --quiet uv) && "
f"uv venv {VENV_PATH}"
)

if getattr(_cfg(), "enabled", False):
await self._upload_skills_to_sandbox(interpreter)
async with self._get_key_lock(key):
existing = self._sandboxes.get(key)
if existing is not None:
try:
await existing.sandbox.commands.run("true")
return existing
except Exception:
logger.debug(f"[OpenSandboxExecutor] Interpreter for thread={key} is dead, recreating")
self._sandboxes.pop(key, None)

cfg = _cfg()
image = getattr(cfg, "opensandbox_image", "opensandbox/code-interpreter:v1.0.2")
timeout_s = int(getattr(cfg, "opensandbox_timeout_seconds", 600))
python_version = getattr(cfg, "opensandbox_python_version", "3.11")
entrypoint = getattr(cfg, "opensandbox_entrypoint", "/opt/opensandbox/code-interpreter.sh")
conn = self._get_connection_config()

sandbox = await Sandbox.create(
image,
entrypoint=[entrypoint],
env={"PYTHON_VERSION": python_version},
timeout=timedelta(seconds=timeout_s),
connection_config=conn,
)
interpreter = await CodeInterpreter.create(sandbox)

# Ensure the agent-facing workspace and shared Python venv exist.
# Agents see /workspace; /tmp/.venv remains an internal implementation detail.
await interpreter.sandbox.commands.run(
f"mkdir -p {VIRTUAL_WORKSPACE_ROOT} && cd {VIRTUAL_WORKSPACE_ROOT} && "
"(command -v uv >/dev/null 2>&1 || python -m pip install --quiet uv) && "
f"uv venv {VENV_PATH}"
)

self._sandboxes[key] = interpreter
logger.info(f"[OpenSandboxExecutor] Created interpreter for thread={key}")
return interpreter
skill_cuga_folder, skill_enabled = self._skills_config.get(key, (None, None))
enabled = skill_enabled if skill_enabled is not None else getattr(_cfg(), "enabled", False)
if enabled:
try:
await self._upload_skills_to_sandbox(interpreter, cuga_folder=skill_cuga_folder)
except Exception as exc:
# Log and continue — the sandbox is still usable; skills companion
# files will not be available, but the agent can still run commands.
logger.warning(
f"[OpenSandboxExecutor] Skills upload failed for thread={key}: {exc}. "
"Sandbox is cached but skill companion files may be unavailable."
)

# Cache the interpreter regardless of upload outcome to avoid leaking
# orphaned remote containers on repeated failures.
self._sandboxes[key] = interpreter
self._active_skills_config[key] = (skill_cuga_folder, skill_enabled)
logger.info(f"[OpenSandboxExecutor] Created interpreter for thread={key}")
return interpreter

async def get_interpreter_for_thread(self, thread_id: Optional[str] = None) -> Any:
    """Public accessor for the CodeInterpreter bound to ``thread_id`` (workspace API, tools).

    Delegates to ``_get_or_create_interpreter``, which creates the sandbox
    when no usable interpreter is cached for the thread.
    """
    interpreter = await self._get_or_create_interpreter(thread_id)
    return interpreter

async def _upload_skills_to_sandbox(self, interpreter: Any) -> None:
async def _upload_skills_to_sandbox(self, interpreter: Any, cuga_folder: Optional[str] = None) -> None:
"""Upload discovered skill folders into /workspace/skills/ in the sandbox.

Mirrors ``cuga.backend.skills.loader.discover_skills`` precedence:
Expand All @@ -159,8 +195,12 @@ async def _upload_skills_to_sandbox(self, interpreter: Any) -> None:
from opensandbox.models import WriteEntry # type: ignore[import]
from cuga.backend.skills.loader import discover_skills

cuga_folder = (os.getenv("CUGA_FOLDER") or "").strip() or (settings.policy.cuga_folder or "").strip()
skill_entries = discover_skills(cuga_folder or None)
resolved_folder = (
cuga_folder
or (os.getenv("CUGA_FOLDER") or "").strip()
or (settings.policy.cuga_folder or "").strip()
)
skill_entries = discover_skills(resolved_folder or None)

entries_by_path: dict[str, WriteEntry] = {}
for skill_entry in skill_entries:
Expand All @@ -185,16 +225,23 @@ async def _upload_skills_to_sandbox(self, interpreter: Any) -> None:

if entries_by_path:
entries = list(entries_by_path.values())
await interpreter.sandbox.files.write_files(entries)
logger.info(
f"[OpenSandboxExecutor] Uploaded {len(entries)} skill files to "
f"{VIRTUAL_WORKSPACE_ROOT}/skills/"
)
try:
await interpreter.sandbox.files.write_files(entries)
logger.info(
f"[OpenSandboxExecutor] Uploaded {len(entries)} skill files to "
f"{VIRTUAL_WORKSPACE_ROOT}/skills/"
)
except Exception as exc:
logger.warning(f"[OpenSandbox] write_files failed ({len(entries)} skill files): {exc}")
raise

async def release_sandbox(self, thread_id: Optional[str] = None) -> None:
"""Kill and remove the cached interpreter/sandbox for a thread."""
key = thread_id or "_default"
interpreter = self._sandboxes.pop(key, None)
self._active_skills_config.pop(key, None)
self._skills_config.pop(key, None)
self._locks.pop(key, None)
if interpreter:
try:
await interpreter.sandbox.kill()
Expand Down Expand Up @@ -232,13 +279,20 @@ async def run_command(cmd: str) -> str:

return run_command

def create_sandbox_tools(self, thread_id: Optional[str] = None) -> list[StructuredTool]:
def create_sandbox_tools(
self,
thread_id: Optional[str] = None,
cuga_folder: Optional[str] = None,
skills_enabled: Optional[bool] = None,
) -> list[StructuredTool]:
"""Return the run_command StructuredTool bound to thread_id.

Filesystem tools (read/write/list/edit/...) now come from the
consolidated ``filesystem`` package via a ``RemoteSandboxBackend``
(wired in ``cuga_lite_graph``); they are no longer produced here.
"""
key = thread_id or "_default"
self._skills_config[key] = (cuga_folder, skills_enabled)
Comment on lines +294 to +295

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Warn when a live sandbox's skills config changes.

Line 295 updates _skills_config, but _get_or_create_interpreter() returns the cached live interpreter on Lines 131-135 before any new config can be applied. That means a different cuga_folder/skills_enabled is silently ignored until release_sandbox() is called, which breaks the per-call override behavior and the stale-config warning path exercised in tests/unit/test_opensandbox_executor.py:251-278.

Suggested fix
         key = thread_id or "_default"
-        self._skills_config[key] = (cuga_folder, skills_enabled)
+        requested = (cuga_folder, skills_enabled)
+        active = self._active_skills_config.get(key)
+        if key in self._sandboxes and active is not None and requested != active:
+            logger.warning(
+                "[OpenSandboxExecutor] Skills config changed for a live sandbox; "
+                "call release_sandbox() before requesting different "
+                "cuga_folder/skills_enabled values."
+            )
+        self._skills_config[key] = requested
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In
`@src/cuga/backend/cuga_graph/nodes/cuga_lite/executors/opensandbox/opensandbox_executor.py`
around lines 294 - 295, when updating _skills_config for a given key (computed
from thread_id or "_default"), first check whether a live interpreter is cached
in self._sandboxes for that same key and compare its active config (stored in
self._active_skills_config) to the incoming (cuga_folder, skills_enabled); if
they differ, emit a warning that the new config will be ignored until
release_sandbox() is called (so callers know the per-call override won't take
effect). Implement this check alongside the existing assignment in the same
place where _skills_config is updated, referencing _get_or_create_interpreter,
_sandboxes, _skills_config, and release_sandbox in the message so the log is
clear and matches the tested behavior.

return [
StructuredTool.from_function(
coroutine=self.create_run_command_tool(thread_id),
Expand Down
5 changes: 5 additions & 0 deletions src/cuga/backend/cuga_graph/nodes/cuga_lite/prompt_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -642,6 +642,11 @@ def create_mcp_prompt(
processed_apps = format_apps_for_prompt(apps)

if not enable_shell_tool:
if skills_enabled:
logger.warning(
"Skills are enabled but enable_shell_tool=False; the skills block will be suppressed. "
"Set advanced_features.enable_shell_tool=true to activate skills."
)
skills_enabled = False
skills_prompt_section = ""

Expand Down
Loading
Loading