diff --git a/README.md b/README.md index 4dc26fc9..89e3eede 100644 --- a/README.md +++ b/README.md @@ -18,6 +18,7 @@ MCPMark provides a reproducible, extensible benchmark for researchers and engine ## News +- 🔥 **13 Dec** — Added auto-compaction support (`--compaction-token`) to summarize long conversations and avoid context overflow during evaluation ([#236](https://github.com/eval-sys/mcpmark/pull/236)). - 🏅 **02 Dec** — Evaluated `gemini-3-pro-preview` (thinking: low): **Pass@1 50.6%** ± 2.3% — so close to `gpt-5-high` (51.6%)! Also `deepseek-v3.2-thinking` 36.8% and `deepseek-v3.2-chat` 29.7% - 🔥 **02 Dec** — Obfuscate GitHub @mentions to prevent notification spam during evaluation ([#229](https://github.com/eval-sys/mcpmark/pull/229)) - 🏅 **01 Dec** — DeepSeek v3.2 uses MCPMark! Kudos on securing the best open-source model. [X Post](https://x.com/deepseek_ai/status/1995452650557763728) | [Technical Report](https://huggingface.co/deepseek-ai/DeepSeek-V3.2/resolve/main/assets/paper.pdf) @@ -150,6 +151,8 @@ Please visit `docs/introduction.md` for choices of *MODEL*. Tip: MCPMark supports **auto-resume**. When re-running, only unfinished tasks will execute. Failures matching our retryable patterns (see [RETRYABLE_PATTERNS](src/errors.py)) are retried automatically. Models may emit different error strings—if you encounter a new resumable error, please open a PR or issue. +Tip: MCPMark supports **auto-compaction**; pass `--compaction-token N` to enable automatic context summarization when prompt tokens reach `N` (use `999999999` to disable).
+ --- ## Service setup and authentication diff --git a/pipeline.py b/pipeline.py index ff292427..16e17894 100644 --- a/pipeline.py +++ b/pipeline.py @@ -76,6 +76,15 @@ def main(): parser.add_argument( "--timeout", type=int, default=3600, help="Timeout in seconds for agent execution" ) + parser.add_argument( + "--compaction-token", + type=int, + default=999_999_999, + help=( + "Auto-compact conversation when prompt tokens (from API usage) reach this limit. " + "Use 999999999 to disable compaction." + ), + ) parser.add_argument( "--reasoning-effort", default="default", @@ -155,6 +164,7 @@ def main(): reasoning_effort=args.reasoning_effort, agent_name=args.agent, task_suite=args.task_suite, + compaction_token=args.compaction_token, ) pipeline.run_evaluation(args.tasks) diff --git a/src/agents/base_agent.py b/src/agents/base_agent.py index faf0420b..a007b570 100644 --- a/src/agents/base_agent.py +++ b/src/agents/base_agent.py @@ -22,6 +22,7 @@ class BaseMCPAgent(ABC): STDIO_SERVICES = ["notion", "filesystem", "playwright", "playwright_webarena", "postgres", "insforge"] HTTP_SERVICES = ["github", "supabase"] DEFAULT_TIMEOUT = 600 + COMPACTION_DISABLED_TOKEN = 999_999_999 CLAUDE_THINKING_BUDGETS = { "low": 1024, @@ -39,6 +40,7 @@ def __init__( service_config: Optional[Dict[str, Any]] = None, service_config_provider: Optional[Callable[[], Dict[str, Any]]] = None, reasoning_effort: Optional[str] = "default", + compaction_token: int = COMPACTION_DISABLED_TOKEN, ): self.litellm_input_model_name = litellm_input_model_name self.api_key = api_key @@ -48,6 +50,7 @@ def __init__( self.service_config = service_config or {} self._service_config_provider = service_config_provider self.reasoning_effort = reasoning_effort or "default" + self.compaction_token = int(compaction_token) self.is_claude = self._is_anthropic_model(litellm_input_model_name) self.use_claude_thinking = self.is_claude and self.reasoning_effort != "default" @@ -249,6 +252,17 @@ def _create_http_server(self) -> 
MCPHttpServer: # Message/Tool formatting helpers # ------------------------------------------------------------------ + def _compaction_enabled(self) -> bool: + return 0 < self.compaction_token < self.COMPACTION_DISABLED_TOKEN + + def _count_prompt_tokens_litellm(self, messages: List[Dict[str, Any]]) -> int: + try: + from litellm import token_counter + + return int(token_counter(model=self.litellm_input_model_name, messages=messages) or 0) + except Exception: # pragma: no cover - best effort + return 0 + def _convert_to_sdk_format(self, messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]: sdk_format: List[Dict[str, Any]] = [] function_call_map: Dict[str, str] = {} @@ -489,4 +503,3 @@ def _convert_to_openai_format(self, tools: List[Dict[str, Any]]) -> List[Dict[st logger.info("Converted %d tools for Gemini compatibility", len(functions)) return functions - diff --git a/src/agents/mcpmark_agent.py b/src/agents/mcpmark_agent.py index 976ddb05..859af53f 100644 --- a/src/agents/mcpmark_agent.py +++ b/src/agents/mcpmark_agent.py @@ -50,6 +50,16 @@ class MCPMarkAgent(BaseMCPAgent): "You are a helpful agent that uses tools iteratively to complete the user's task, " "and when finished, provides the final answer or simply states \"Task completed\" without further tool calls." ) + COMPACTION_PROMPT = ( + "You are performing a CONTEXT CHECKPOINT COMPACTION.\n" + "Summarize the conversation so far for another model to continue.\n\n" + "Include:\n" + "- Current progress and key decisions made\n" + "- Important context, constraints, or user preferences\n" + "- What remains to be done (clear next steps)\n" + "- Any critical data, examples, or references needed to continue\n\n" + "Be concise and structured. Do NOT call tools." 
+ ) DEFAULT_TIMEOUT = BaseMCPAgent.DEFAULT_TIMEOUT def __init__( @@ -62,6 +72,7 @@ def __init__( service_config: Optional[Dict[str, Any]] = None, service_config_provider: Optional[Callable[[], Dict[str, Any]]] = None, reasoning_effort: Optional[str] = "default", + compaction_token: int = BaseMCPAgent.COMPACTION_DISABLED_TOKEN, ): super().__init__( litellm_input_model_name=litellm_input_model_name, @@ -72,6 +83,7 @@ def __init__( service_config=service_config, service_config_provider=service_config_provider, reasoning_effort=reasoning_effort, + compaction_token=compaction_token, ) logger.debug( "Initialized MCPMarkAgent for '%s' with model '%s' (Claude: %s, Thinking: %s, Reasoning: %s)", @@ -303,6 +315,177 @@ async def _call_claude_native_api( return None, e.response.text except Exception as e: return None, e + + async def _count_claude_input_tokens( + self, + messages: List[Dict[str, Any]], + tools: Optional[List[Dict]] = None, + system: Optional[str] = None, + ) -> int: + import os + + api_base = os.getenv("ANTHROPIC_API_BASE", "https://api.anthropic.com") + headers = { + "x-api-key": self.api_key, + "anthropic-version": "2023-06-01", + "content-type": "application/json", + } + payload: Dict[str, Any] = { + "model": self.litellm_input_model_name.replace("anthropic/", ""), + "messages": messages, + } + if tools: + payload["tools"] = tools + if system: + payload["system"] = system + + async with httpx.AsyncClient() as client: + response = await client.post( + f"{api_base}/v1/messages/count_tokens", + headers=headers, + json=payload, + timeout=self.timeout, + ) + response.raise_for_status() + data = response.json() or {} + return int(data.get("input_tokens", 0) or 0) + + def _extract_litellm_text(self, response: Any) -> str: + try: + choices = getattr(response, "choices", None) or [] + if not choices: + return "" + msg = getattr(choices[0], "message", None) + if msg is not None: + return str(getattr(msg, "content", "") or "") + return str(getattr(choices[0], "text", 
"") or "") + except Exception: # pragma: no cover - best effort + return "" + + def _extract_anthropic_text(self, response_json: Dict[str, Any]) -> str: + pieces: List[str] = [] + for block in response_json.get("content", []) or []: + if isinstance(block, dict) and block.get("type") == "text": + text = block.get("text") + if text: + pieces.append(str(text)) + return "\n".join(pieces).strip() + + def _merge_usage(self, total_tokens: Dict[str, int], usage: Dict[str, Any]) -> None: + try: + input_tokens = int(usage.get("input_tokens", 0) or 0) + output_tokens = int(usage.get("output_tokens", 0) or 0) + total_tokens_count = int(usage.get("total_tokens", 0) or (input_tokens + output_tokens)) + total_tokens["input_tokens"] += input_tokens + total_tokens["output_tokens"] += output_tokens + total_tokens["total_tokens"] += total_tokens_count + except Exception: # pragma: no cover - best effort + return + + async def _maybe_compact_litellm_messages( + self, + messages: List[Dict[str, Any]], + total_tokens: Dict[str, int], + tool_call_log_file: Optional[str], + current_prompt_tokens: int, + ) -> List[Dict[str, Any]]: + if not self._compaction_enabled(): + return messages + if current_prompt_tokens < self.compaction_token: + return messages + + logger.info(f"| [compaction] Triggered at prompt tokens: {current_prompt_tokens:,}") + if tool_call_log_file: + try: + with open(tool_call_log_file, "a", encoding="utf-8") as f: + f.write(f"| [compaction] Triggered at prompt tokens: {current_prompt_tokens:,}\n") + except Exception: + pass + + compact_messages = [ + {"role": "system", "content": self.COMPACTION_PROMPT}, + {"role": "user", "content": json.dumps(messages, ensure_ascii=False)}, + ] + completion_kwargs = { + "model": self.litellm_input_model_name, + "messages": compact_messages, + "api_key": self.api_key, + } + if self.base_url: + completion_kwargs["base_url"] = self.base_url + response = await litellm.acompletion(**completion_kwargs) + + usage = getattr(response, "usage", 
None) + if usage: + input_tokens = getattr(usage, "prompt_tokens", None) or getattr(usage, "input_tokens", None) or 0 + output_tokens = getattr(usage, "completion_tokens", None) or getattr(usage, "output_tokens", None) or 0 + total_tokens_count = getattr(usage, "total_tokens", None) + if total_tokens_count is None: + total_tokens_count = input_tokens + output_tokens + total_tokens["input_tokens"] += int(input_tokens or 0) + total_tokens["output_tokens"] += int(output_tokens or 0) + total_tokens["total_tokens"] += int(total_tokens_count or 0) + + summary = self._extract_litellm_text(response).strip() or "(no summary)" + system_msg = messages[0] if messages else {"role": "system", "content": self.SYSTEM_PROMPT} + first_user = messages[1] if len(messages) > 1 else {"role": "user", "content": ""} + return [ + system_msg, + first_user, + { + "role": "user", + "content": f"Context summary (auto-compacted due to token limit):\n{summary}", + }, + ] + + async def _maybe_compact_anthropic_messages( + self, + messages: List[Dict[str, Any]], + total_tokens: Dict[str, int], + thinking_budget: int, + tool_call_log_file: Optional[str], + current_input_tokens: int, + ) -> List[Dict[str, Any]]: + if not self._compaction_enabled(): + return messages + if current_input_tokens < self.compaction_token: + return messages + + logger.info(f"| [compaction] Triggered at input tokens: {current_input_tokens:,}") + if tool_call_log_file: + try: + with open(tool_call_log_file, "a", encoding="utf-8") as f: + f.write(f"| [compaction] Triggered at input tokens: {current_input_tokens:,}\n") + except Exception: + pass + + compact_messages = [ + {"role": "user", "content": self.COMPACTION_PROMPT}, + {"role": "user", "content": json.dumps(messages, ensure_ascii=False)}, + ] + response, error_msg = await self._call_claude_native_api( + messages=compact_messages, + thinking_budget=thinking_budget, + tools=None, + system=None, + ) + if error_msg or not response: + logger.warning(f"| [compaction] Failed: 
{error_msg}") + return messages + + usage = response.get("usage", {}) or {} + input_tokens = usage.get("input_tokens", 0) or 0 + output_tokens = usage.get("output_tokens", 0) or 0 + total_tokens["input_tokens"] += int(input_tokens) + total_tokens["output_tokens"] += int(output_tokens) + total_tokens["total_tokens"] += int(input_tokens + output_tokens) + + summary = self._extract_anthropic_text(response) or "(no summary)" + first_user = messages[0] if messages else {"role": "user", "content": ""} + return [ + first_user, + {"role": "user", "content": f"Context summary (auto-compacted due to token limit):\n{summary}"}, + ] async def _execute_anthropic_native_tool_loop( @@ -327,9 +510,29 @@ async def _execute_anthropic_native_tool_loop( system_text = self.SYSTEM_PROMPT # Record initial state self._update_progress(messages, total_tokens, turn_count) - + for _ in range(max_turns): turn_count += 1 + + current_input_tokens = 0 + if self._compaction_enabled(): + try: + current_input_tokens = await self._count_claude_input_tokens( + messages=messages, + tools=tools, + system=system_text, + ) + except Exception as exc: # noqa: BLE001 + logger.debug("Claude token counting failed: %s", exc) + + messages = await self._maybe_compact_anthropic_messages( + messages=messages, + total_tokens=total_tokens, + thinking_budget=thinking_budget, + tool_call_log_file=tool_call_log_file, + current_input_tokens=current_input_tokens, + ) + self._update_progress(messages, total_tokens, turn_count) # Call Claude native API response, error_msg = await self._call_claude_native_api( @@ -584,9 +787,20 @@ async def _execute_litellm_tool_loop( # Record initial state self._update_progress(messages, total_tokens, turn_count) - + try: while turn_count < max_turns: + current_prompt_tokens = 0 + if self._compaction_enabled(): + current_prompt_tokens = self._count_prompt_tokens_litellm(messages) + + messages = await self._maybe_compact_litellm_messages( + messages=messages, + total_tokens=total_tokens, + 
tool_call_log_file=tool_call_log_file, + current_prompt_tokens=current_prompt_tokens, + ) + self._update_progress(messages, total_tokens, turn_count) # Build completion kwargs completion_kwargs = { @@ -626,7 +840,15 @@ async def _execute_litellm_tool_loop( if consecutive_failures >= max_consecutive_failures: raise if "ContextWindowExceededError" in str(e): - raise + # Best-effort fallback: compact and retry once. + messages = await self._maybe_compact_litellm_messages( + messages=messages, + total_tokens=total_tokens, + tool_call_log_file=tool_call_log_file, + current_prompt_tokens=self.compaction_token, + ) + self._update_progress(messages, total_tokens, turn_count) + continue elif "RateLimitError" in str(e): await asyncio.sleep(12 ** consecutive_failures) else: @@ -794,12 +1016,6 @@ async def _execute_litellm_tool_loop( } - - # ==================== Format Conversion Methods ==================== - - - - # ==================== MCP Server Management ==================== async def _create_mcp_server(self) -> Any: diff --git a/src/agents/react_agent.py b/src/agents/react_agent.py index 53312653..a8b85940 100644 --- a/src/agents/react_agent.py +++ b/src/agents/react_agent.py @@ -25,6 +25,16 @@ class ReActAgent(BaseMCPAgent): "or the phrase \"Task completed.\" if no further detail is required. " "Every reply must be valid JSON without code fences." ) + COMPACTION_PROMPT = ( + "You are performing a CONTEXT CHECKPOINT COMPACTION.\n" + "Summarize the conversation so far for another model to continue.\n\n" + "Include:\n" + "- Current progress and key decisions made\n" + "- Important context, constraints, or user preferences\n" + "- What remains to be done (clear next steps)\n" + "- Any critical data, examples, or references needed to continue\n\n" + "Be concise and structured. Do NOT call tools." 
+ ) def __init__( self, @@ -38,6 +48,7 @@ def __init__( reasoning_effort: Optional[str] = "default", max_iterations: int = 100, system_prompt: Optional[str] = None, + compaction_token: int = BaseMCPAgent.COMPACTION_DISABLED_TOKEN, ): super().__init__( litellm_input_model_name=litellm_input_model_name, @@ -48,6 +59,7 @@ def __init__( service_config=service_config, service_config_provider=service_config_provider, reasoning_effort=reasoning_effort, + compaction_token=compaction_token, ) self.max_iterations = max_iterations self.react_system_prompt = system_prompt or self.DEFAULT_SYSTEM_PROMPT @@ -141,6 +153,76 @@ async def _execute_react_loop( self._update_progress(messages, total_tokens, turn_count) for step in range(1, self.max_iterations + 1): + current_prompt_tokens = 0 + if self._compaction_enabled(): + current_prompt_tokens = self._count_prompt_tokens_litellm(messages) + + if self._compaction_enabled() and current_prompt_tokens >= self.compaction_token: + logger.info( + f"| [compaction] Triggered at prompt tokens: {current_prompt_tokens:,}" + ) + if tool_call_log_file: + try: + with open(tool_call_log_file, "a", encoding="utf-8") as log_file: + log_file.write( + f"| [compaction] Triggered at prompt tokens: {current_prompt_tokens:,}\n" + ) + except Exception: # noqa: BLE001 + pass + + compact_messages = [ + {"role": "system", "content": self.COMPACTION_PROMPT}, + {"role": "user", "content": json.dumps(messages, ensure_ascii=False)}, + ] + compact_kwargs = { + "model": self.litellm_input_model_name, + "messages": compact_messages, + "api_key": self.api_key, + } + if self.base_url: + compact_kwargs["base_url"] = self.base_url + + compact_response = await litellm.acompletion(**compact_kwargs) + usage = getattr(compact_response, "usage", None) + if usage: + prompt_tokens = ( + getattr(usage, "prompt_tokens", None) + or getattr(usage, "input_tokens", None) + or 0 + ) + completion_tokens = ( + getattr(usage, "completion_tokens", None) + or getattr(usage, 
"output_tokens", None) + or 0 + ) + total_tokens_count = getattr(usage, "total_tokens", None) + if total_tokens_count is None: + total_tokens_count = prompt_tokens + completion_tokens + + total_tokens["input_tokens"] += int(prompt_tokens or 0) + total_tokens["output_tokens"] += int(completion_tokens or 0) + total_tokens["total_tokens"] += int(total_tokens_count or 0) + + summary = "" + try: + summary = compact_response.choices[0].message.content or "" + except Exception: # noqa: BLE001 + summary = "" + summary = summary.strip() or "(no summary)" + + messages = [ + system_message, + task_message, + { + "role": "user", + "content": ( + "Context summary (auto-compacted due to token limit):\n" + f"{summary}" + ), + }, + ] + self._update_progress(messages, total_tokens, turn_count) + completion_kwargs = { "model": self.litellm_input_model_name, "messages": messages, @@ -163,6 +245,8 @@ async def _execute_react_loop( except Exception as exc: # noqa: BLE001 final_error = f"LLM call failed on step {step}: {exc}" logger.error(final_error) + if "ContextWindowExceededError" in str(exc): + continue break if turn_count == 0 and getattr(response, "model", None): diff --git a/src/evaluator.py b/src/evaluator.py index 01518408..25be3fcc 100644 --- a/src/evaluator.py +++ b/src/evaluator.py @@ -28,6 +28,7 @@ def __init__( reasoning_effort: str = "default", agent_name: str = "mcpmark", task_suite: str = "standard", + compaction_token: int = 0, ): # Main configuration self.mcp_service = mcp_service @@ -72,6 +73,7 @@ def __init__( service_config=self.service_config, service_config_provider=self.state_manager.get_service_config_for_agent, reasoning_effort=self.reasoning_effort, + compaction_token=compaction_token, ) # Initialize results reporter