Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions browser/agent-harness/cli_anything/browser/tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@
python -m pytest cli_anything/browser/tests/test_core.py -v
"""

import asyncio
import pytest
from unittest.mock import AsyncMock, MagicMock, patch

from cli_anything.browser.core.session import Session
from cli_anything.browser.core import page, fs
from cli_anything.browser.utils import domshell_backend as backend


# ── Session Tests ────────────────────────────────────────────────
Expand Down Expand Up @@ -387,3 +389,45 @@ def test_normal_mode_does_not_use_daemon(self):
result = fs.list_elements(sess)

mock_ls.assert_called_once_with("/", use_daemon=False)


class TestBackendResultNormalization:
"""测试后端返回结构标准化。"""

def test_parse_ls_text_to_entries(self):
"""应把 ls 文本结果解析为 entries 列表。"""
text = " windows/ (1 windows)\n tabs/ (44 tabs)"
Comment on lines +394 to +399
Copy link

Copilot AI Apr 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These new test docstrings are in Chinese while the surrounding test file uses English docstrings/comments. For consistency and easier maintenance by the rest of the team, please translate these docstrings to English (or adopt a consistent bilingual style across the file).

Copilot uses AI. Check for mistakes.
result = backend._parse_ls_text("/", text)
assert result["path"] == "/"
assert len(result["entries"]) == 2
assert result["entries"][0]["name"] == "windows"
assert result["entries"][0]["path"] == "/windows"

def test_normalize_ls_call_tool_result(self):
"""应把 CallToolResult 转为 fs ls 可直接消费的字典。"""
mock_result = type(
"MockCallToolResult",
(),
{
"isError": False,
"content": [
type("Text", (), {"text": " windows/ (1 windows)"})()
],
},
)()
normalized = backend._normalize_tool_result(
"domshell_ls",
mock_result,
{"options": "/"},
)
assert "entries" in normalized
assert normalized["entries"][0]["path"] == "/windows"

def test_safe_aexit_timeout(self):
"""关闭超时应被吞掉,避免主流程卡死。"""
class SlowContext:
async def __aexit__(self, exc_type, exc, tb):
await asyncio.sleep(0.05)

with patch.object(backend, "MCP_CLOSE_TIMEOUT_SECONDS", 0.01):
asyncio.run(backend._safe_aexit(SlowContext()))
180 changes: 157 additions & 23 deletions browser/agent-harness/cli_anything/browser/utils/domshell_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
# DOMSHELL_TOKEN — auth token (required, must match the running server)
# DOMSHELL_PORT — MCP HTTP port of the running server (default: 3001)
DEFAULT_SERVER_CMD = "npx"
MCP_CALL_TIMEOUT_SECONDS = float(os.environ.get("DOMSHELL_MCP_CALL_TIMEOUT", "15"))
MCP_CLOSE_TIMEOUT_SECONDS = float(os.environ.get("DOMSHELL_MCP_CLOSE_TIMEOUT", "2"))


Comment on lines +29 to 32
Copy link

Copilot AI Apr 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These timeout constants are parsed with float() at import time; an invalid env value (e.g., "15s") will raise ValueError and prevent the harness/CLI from importing. Consider parsing defensively (try/except with fallback to defaults) and/or validating that the value is > 0.

Suggested change
MCP_CALL_TIMEOUT_SECONDS = float(os.environ.get("DOMSHELL_MCP_CALL_TIMEOUT", "15"))
MCP_CLOSE_TIMEOUT_SECONDS = float(os.environ.get("DOMSHELL_MCP_CLOSE_TIMEOUT", "2"))
def _get_timeout_seconds(env_var_name: str, default: float) -> float:
"""Parse a positive timeout value from the environment, with fallback."""
raw_value = os.environ.get(env_var_name)
if raw_value is None:
return default
try:
timeout = float(raw_value)
except (TypeError, ValueError):
return default
if timeout <= 0:
return default
return timeout
MCP_CALL_TIMEOUT_SECONDS = _get_timeout_seconds("DOMSHELL_MCP_CALL_TIMEOUT", 15.0)
MCP_CLOSE_TIMEOUT_SECONDS = _get_timeout_seconds("DOMSHELL_MCP_CLOSE_TIMEOUT", 2.0)

Copilot uses AI. Check for mistakes.
def _build_server_args() -> list[str]:
Expand All @@ -45,6 +47,47 @@ def _build_server_args() -> list[str]:
"--token", token,
]


def _should_ignore_loop_error(context: dict[str, Any]) -> bool:
"""判断是否忽略已知的 anyio 关闭阶段噪声异常。"""
exc = context.get("exception")
if not exc:
return False
text = str(exc)
return "Attempted to exit cancel scope in a different task" in text


def _run_sync(coro):
"""以可控事件循环执行协程,避免 asyncio.run 关闭阶段噪声。"""
loop = asyncio.new_event_loop()
Comment on lines +51 to +62
Copy link

Copilot AI Apr 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new docstrings/comments here are in Chinese while the rest of this module’s docstrings are in English. To keep the harness maintainable for the broader contributor base, please translate these docstrings/comments to English (or keep bilingual text consistently across the file).

Copilot uses AI. Check for mistakes.
old_loop = None
try:
old_loop = asyncio.get_event_loop_policy().get_event_loop()
Copy link

Copilot AI Apr 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_run_sync() calls get_event_loop_policy().get_event_loop() to capture the previous loop. In Python 3.11+, this can implicitly create a new event loop when none exists, and then you restore it at the end without closing it, leaking a loop for the rest of the process. Consider restoring to None unless there was an explicitly set loop, or using get_running_loop() to detect an active loop and otherwise leaving the policy’s current loop unset.

Suggested change
old_loop = asyncio.get_event_loop_policy().get_event_loop()
old_loop = asyncio.get_running_loop()

Copilot uses AI. Check for mistakes.
except RuntimeError:
old_loop = None
try:
asyncio.set_event_loop(loop)
default_handler = loop.get_exception_handler()

def _exception_handler(current_loop, context):
if _should_ignore_loop_error(context):
return
if default_handler is not None:
default_handler(current_loop, context)
else:
current_loop.default_exception_handler(context)

loop.set_exception_handler(_exception_handler)
return loop.run_until_complete(coro)
finally:
try:
loop.run_until_complete(loop.shutdown_asyncgens())
except Exception:
# 关闭阶段异常不应影响工具结果
pass
loop.close()
asyncio.set_event_loop(old_loop)

# Daemon mode: persistent MCP connection
_daemon_session: Optional[ClientSession] = None
_daemon_read: Optional[Any] = None
Expand Down Expand Up @@ -134,7 +177,7 @@ async def _call_tool(
# Use persistent daemon connection
try:
result = await _daemon_session.call_tool(tool_name, arguments)
return result
return _normalize_tool_result(tool_name, result, arguments)
except Exception as e:
Comment on lines 179 to 181
Copy link

Copilot AI Apr 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_call_tool() is now returning normalized dicts (via _normalize_tool_result) rather than the raw MCP CallToolResult, but its signature/docstring still imply it returns the MCP server result unchanged (-> Any). Please update the type hint/docstring to reflect the new return contract so callers and tests don’t have to guess.

Copilot uses AI. Check for mistakes.
# Daemon died, fall back to spawning new server
await _stop_daemon()
Expand All @@ -144,19 +187,111 @@ async def _call_tool(
command=DEFAULT_SERVER_CMD,
args=_build_server_args()
)

try:
async with stdio_client(server_params) as (read, write):
async with ClientSession(read, write) as session:
await session.initialize()
result = await session.call_tool(tool_name, arguments)
return result
# 说明:DOMShell 在某些环境下会在 __aexit__ 阶段阻塞,导致命令无输出卡住。
# 这里改为一次性连接后直接执行并返回,避免被退出清理阻塞。
client_ctx = stdio_client(server_params)
read, write = await client_ctx.__aenter__()
session_ctx = ClientSession(read, write)
session = await session_ctx.__aenter__()
await session.initialize()
result = await asyncio.wait_for(
session.call_tool(tool_name, arguments),
timeout=MCP_CALL_TIMEOUT_SECONDS
)
return _normalize_tool_result(tool_name, result, arguments)
Comment on lines +193 to +202
Copy link

Copilot AI Apr 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one-shot code path manually calls aenter on stdio_client / ClientSession but never calls aexit for either context. That can leak subprocesses/pipes (and potentially keep an orphaned npx/domshell-proxy running after the CLI prints output). Consider adding a finally block that closes both contexts using the new _safe_aexit() (with timeout) so you still avoid the observed hang while ensuring resources are released.

Copilot uses AI. Check for mistakes.
except Exception as e:
raise RuntimeError(
f"DOMShell MCP call failed: {e}\n"
f"Ensure Chrome is running with DOMShell extension installed.\n"
f"Chrome Web Store: https://chromewebstore.google.com/detail/domshell"
) from e
async def _safe_aexit(ctx: Any) -> None:
"""带超时保护地关闭异步上下文,避免退出阶段卡死。"""
if ctx is None:
return
try:
await asyncio.wait_for(
ctx.__aexit__(None, None, None),
timeout=MCP_CLOSE_TIMEOUT_SECONDS
)
except Exception:
# 退出阶段失败不应影响主流程
pass


def _extract_text_content(result: Any) -> str:
"""从 MCP 结果中提取 text 内容。"""
content_items = getattr(result, "content", None)
if not content_items:
return ""
texts: list[str] = []
for item in content_items:
text = getattr(item, "text", None)
if text:
texts.append(str(text))
return "\n".join(texts).strip()


def _parse_ls_text(path: str, text: str) -> dict:
"""将 DOMShell ls 的文本输出解析为结构化 entries。"""
entries: list[dict[str, str]] = []
for line in text.splitlines():
stripped = line.strip()
if not stripped:
continue
# 兼容: windows/ (1 windows)
name_part = stripped.split("(")[0].strip()
if not name_part:
continue
clean_name = name_part.rstrip("/")
if path in ("", "/"):
entry_path = f"/{clean_name}"
else:
entry_path = f"{path.rstrip('/')}/{clean_name}"
entries.append(
{
"name": clean_name,
"role": "container",
"path": entry_path,
}
)
return {"path": path or "/", "entries": entries, "raw_text": text}


def _parse_grep_text(text: str) -> dict:
"""将 DOMShell grep 的文本输出解析为 matches 列表。"""
matches = []
for line in text.splitlines():
stripped = line.strip()
if stripped:
matches.append(stripped)
return {"matches": matches, "raw_text": text}


def _normalize_tool_result(tool_name: str, result: Any, arguments: dict) -> dict:
"""将 mcp.types.CallToolResult 统一转换为 CLI 可消费字典。"""
if isinstance(result, dict):
return result

is_error = getattr(result, "isError", False)
text = _extract_text_content(result)

if tool_name == "domshell_ls":
parsed = _parse_ls_text(arguments.get("options", "/"), text)
elif tool_name == "domshell_grep":
parsed = _parse_grep_text(text)
elif tool_name == "domshell_cd":
parsed = {
"path": arguments.get("path", "/"),
"raw_text": text,
}
else:
parsed = {"raw_text": text}

if is_error:
parsed["error"] = text or f"{tool_name} failed"
return parsed

# NOTE: Known limitation - Daemon mode uses asyncio.run() per tool call (in sync wrappers).
# Each asyncio.run() creates a new event loop. Async IO objects created in one loop
Expand Down Expand Up @@ -206,11 +341,10 @@ async def _stop_daemon() -> None:
return

try:
await _daemon_session.__aexit__(None, None, None)
if _daemon_client_context:
await _daemon_client_context.__aexit__(None, None, None)
await _safe_aexit(_daemon_session)
await _safe_aexit(_daemon_client_context)
except Exception:
pass # Ignore cleanup errors
pass
finally:
_daemon_session = None
_daemon_read = None
Expand Down Expand Up @@ -239,7 +373,7 @@ def ls(path: str = "/", use_daemon: bool = False) -> dict:
>>> ls("/")
{"path": "/", "entries": [{"name": "main", "role": "landmark", ...}]}
"""
result = asyncio.run(_call_tool("domshell_ls", {"options": path}, use_daemon))
result = _run_sync(_call_tool("domshell_ls", {"options": path}, use_daemon))
return result


Expand All @@ -257,7 +391,7 @@ def cd(path: str, use_daemon: bool = False) -> dict:
>>> cd("/main/div[0]")
{"path": "/main/div[0]", "element": {...}}
"""
result = asyncio.run(_call_tool("domshell_cd", {"path": path}, use_daemon))
result = _run_sync(_call_tool("domshell_cd", {"path": path}, use_daemon))
return result


Expand All @@ -275,7 +409,7 @@ def cat(path: str, use_daemon: bool = False) -> dict:
>>> cat("/main/button[0]")
{"name": "Submit", "role": "button", "text": "Submit", ...}
"""
result = asyncio.run(_call_tool("domshell_cat", {"name": path}, use_daemon))
result = _run_sync(_call_tool("domshell_cat", {"name": path}, use_daemon))
return result


Expand All @@ -295,7 +429,7 @@ def grep(pattern: str, use_daemon: bool = False) -> dict:
>>> grep("Login")
{"matches": ["/main/button[0]", "/main/link[1]"]}
"""
result = asyncio.run(_call_tool(
result = _run_sync(_call_tool(
"domshell_grep",
{"pattern": pattern},
use_daemon
Expand All @@ -317,7 +451,7 @@ def click(path: str, use_daemon: bool = False) -> dict:
>>> click("/main/button[0]")
{"action": "click", "path": "/main/button[0]", "status": "success"}
"""
result = asyncio.run(_call_tool("domshell_click", {"name": path}, use_daemon))
result = _run_sync(_call_tool("domshell_click", {"name": path}, use_daemon))
return result


Expand All @@ -335,7 +469,7 @@ def open_url(url: str, use_daemon: bool = False) -> dict:
>>> open_url("https://example.com")
{"url": "https://example.com", "status": "loaded"}
"""
result = asyncio.run(_call_tool("domshell_open", {"url": url}, use_daemon))
result = _run_sync(_call_tool("domshell_open", {"url": url}, use_daemon))
return result


Expand All @@ -348,7 +482,7 @@ def reload(use_daemon: bool = False) -> dict:
Returns:
Dict with reload result
"""
result = asyncio.run(_call_tool("domshell_reload", {}, use_daemon))
result = _run_sync(_call_tool("domshell_reload", {}, use_daemon))
return result


Expand All @@ -361,7 +495,7 @@ def back(use_daemon: bool = False) -> dict:
Returns:
Dict with navigation result
"""
result = asyncio.run(_call_tool("domshell_back", {}, use_daemon))
result = _run_sync(_call_tool("domshell_back", {}, use_daemon))
return result


Expand All @@ -374,7 +508,7 @@ def forward(use_daemon: bool = False) -> dict:
Returns:
Dict with navigation result
"""
result = asyncio.run(_call_tool("domshell_forward", {}, use_daemon))
result = _run_sync(_call_tool("domshell_forward", {}, use_daemon))
return result


Expand Down Expand Up @@ -408,7 +542,7 @@ async def _focus_and_type():
await session.call_tool("domshell_focus", {"name": path})
return await session.call_tool("domshell_type", {"text": text})

return asyncio.run(_focus_and_type())
return _run_sync(_focus_and_type())


# ── Daemon control functions ───────────────────────────────────────────
Expand All @@ -422,9 +556,9 @@ def start_daemon() -> bool:
Raises:
RuntimeError: If daemon fails to start
"""
return asyncio.run(_start_daemon())
return _run_sync(_start_daemon())


def stop_daemon() -> None:
"""Stop persistent daemon mode (sync wrapper)."""
asyncio.run(_stop_daemon())
_run_sync(_stop_daemon())