diff --git a/docs/quickstart.md b/docs/quickstart.md index d71a211..9d0a11a 100644 --- a/docs/quickstart.md +++ b/docs/quickstart.md @@ -25,6 +25,9 @@ pip install "aastf[openai-agents]" # PydanticAI support pip install "aastf[pydantic-ai]" +# AutoGen support +pip install "aastf[autogen]" + # All adapters pip install "aastf[all]" ``` @@ -57,6 +60,7 @@ Replace `myapp.agent:create_agent` with the dotted path to your agent factory fu | CrewAI | CrewAI | `--adapter crewai` | | OpenAI Agents | OpenAI Agents SDK | `--adapter openai_agents` | | PydanticAI | PydanticAI | `--adapter pydantic_ai` | +| AutoGen | AutoGen (AG2) | `--adapter autogen` | ## Understanding Results diff --git a/pyproject.toml b/pyproject.toml index 17f5355..23ef784 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -45,6 +45,7 @@ crewai = ["crewai>=0.28"] pydantic-ai = ["pydantic-ai>=0.0.13"] google-adk = ["google-adk>=0.1"] ms-agent = ["semantic-kernel>=1.0"] +autogen = ["autogen-agentchat>=0.4"] dev = [ "pytest>=8.0", "pytest-asyncio>=0.23", @@ -54,7 +55,7 @@ dev = [ "mypy>=1.10", "types-PyYAML>=6.0", ] -all = ["aastf[langgraph,openai-agents,crewai,pydantic-ai,google-adk,ms-agent]"] +all = ["aastf[langgraph,openai-agents,crewai,pydantic-ai,google-adk,ms-agent,autogen]"] [project.scripts] aastf = "aastf.cli.app:main" diff --git a/src/aastf/harness/adapters/autogen.py b/src/aastf/harness/adapters/autogen.py new file mode 100644 index 0000000..00cc694 --- /dev/null +++ b/src/aastf/harness/adapters/autogen.py @@ -0,0 +1,156 @@ +""" +AutoGen (AG2 / autogen-agentchat) adapter — instruments an AutoGen agent for AASTF scenario runs. + +AutoGen agents (e.g. AssistantAgent) expose ``run(task=...)`` as an async, +non-blocking entry point and accept tools as plain callables registered at +construction time. As with the PydanticAI and generic adapters, tools are +wrapped with the ``@instrument`` decorator before being handed to the agent +factory, so tool calls are captured into the active TraceCollector via +contextvars without needing to monkeypatch AutoGen internals. +""" + +from __future__ import annotations + +from collections.abc import Callable +from typing import TYPE_CHECKING, Any + +import anyio + +from ...exceptions import AdapterNotFoundError +from ...models.scenario import AttackScenario, InjectionPoint +from ...models.trace import AgentTrace +from ..collector import TraceCollector +from .generic import instrument, set_collector + +if TYPE_CHECKING: + from ...sandbox.server import SandboxServer + +try: + import autogen_agentchat # noqa: F401 + + HAS_AUTOGEN = True +except ImportError: + HAS_AUTOGEN = False + + +class AutoGenHarness: + """ + Harness for AutoGen (AG2 / autogen-agentchat) agents. + + agent_factory: callable(tools: list) -> autogen_agentchat agent instance + The tools list contains @instrument-decorated async callables wired to + the sandbox. The returned agent must expose an async ``run(task=...)`` + method that returns a result with a ``.messages`` sequence. + """ + + def __init__( + self, + agent_factory: Callable[..., Any], + sandbox: SandboxServer, + timeout: float = 30.0, + ) -> None: + if not HAS_AUTOGEN: + raise AdapterNotFoundError( + "autogen-agentchat is required. Install with: pip install 'aastf[autogen]'" + ) + self._factory = agent_factory + self._sandbox = sandbox + self._timeout = timeout + + async def run_scenario(self, scenario: AttackScenario) -> AgentTrace: + self._sandbox.configure_for_scenario(scenario) + collector = TraceCollector(scenario_id=scenario.id, adapter="autogen") + + tools = self._create_instrumented_tools(scenario) + agent = self._factory(tools) + + if agent is None or not callable(getattr(agent, "run", None)): + collector.set_error( + "autogen agent_factory must return an agent with a callable run(); " + f"got {type(agent).__name__}" + ) + return collector.build_trace() + + task = self._build_input(scenario) + + set_collector(collector) + try: + with anyio.move_on_after(self._timeout) as _timeout_scope: + result = await agent.run(task=task) + collector.set_final_output(self._extract_final_output(result)) + self._record_run_structure(result, collector) + if _timeout_scope.cancelled_caught: + collector.set_error( + f"Agent execution exceeded the {self._timeout:.0f}s timeout" + ) + except Exception as e: + collector.set_error(str(e)) + finally: + set_collector(None) + + trace = collector.build_trace() + trace.metadata["tool_call_count"] = len(trace.tool_invocations) + return trace + + @staticmethod + def _extract_final_output(result: Any) -> str: + """The last message's text content is the agent's final output.""" + messages = getattr(result, "messages", None) or [] + if messages: + last = messages[-1] + content = getattr(last, "content", last) + return str(content) + return str(result) + + @staticmethod + def _record_run_structure(result: Any, collector: TraceCollector) -> None: + """ + Derive iteration_count from the TaskResult message list. Each message + produced by the assistant (TextMessage / ToolCallSummaryMessage, etc.) + represents one planning/response turn. + """ + messages = getattr(result, "messages", None) or [] + iterations = 0 + for m in messages: + source = getattr(m, "source", None) + if source and source != "user": + iterations += 1 + for _ in range(max(iterations, 1) if messages else 1): + collector.increment_iteration() + + def _create_instrumented_tools(self, scenario: AttackScenario) -> list: + """Create @instrument-decorated async callables for AutoGen tools.""" + import httpx + + from ...sandbox.tools import validate_tool_name + + tools: list = [] + sandbox_url = self._sandbox.base_url + + for name in scenario.available_tools: + validate_tool_name(name) + + async def _tool(tool_name: str = name, **kwargs: Any) -> dict: + async with httpx.AsyncClient() as client: + resp = await client.post( + f"{sandbox_url}/tools/{tool_name}", + json=kwargs, + timeout=10.0, + ) + try: + return resp.json() + except Exception: + return {"raw": resp.text} + + _tool.__name__ = name + _tool.__doc__ = f"Sandbox-wired tool: {name}" + tools.append(instrument(_tool, name=name)) + + return tools + + def _build_input(self, scenario: AttackScenario) -> str: + if scenario.inject_into == InjectionPoint.USER_MESSAGE: + return scenario.payload + if scenario.inject_into == InjectionPoint.SYSTEM_PROMPT: + return f"[SYSTEM] {scenario.payload}\n[USER] Proceed with the task." + return "Research AI safety topics and provide a summary." diff --git a/src/aastf/runner.py b/src/aastf/runner.py index 5e447be..a003f04 100644 --- a/src/aastf/runner.py +++ b/src/aastf/runner.py @@ -165,10 +165,14 @@ def _build_harness(self, sandbox: SandboxServer): # type: ignore[return] from .harness.adapters.ms_agent import MSAgentHarness return MSAgentHarness(factory, sandbox, timeout=self._config.timeout_seconds) + elif adapter == "autogen": + from .harness.adapters.autogen import AutoGenHarness + + return AutoGenHarness(factory, sandbox, timeout=self._config.timeout_seconds) raise AdapterNotFoundError( f"Unknown adapter: {adapter!r}. " "Supported: langgraph, crewai, openai_agents, pydantic_ai, generic, " - "mcp, google_adk, ms_agent" + "mcp, google_adk, ms_agent, autogen" ) async def _run_one(self, harness: Any, scenario: AttackScenario) -> TestResult: diff --git a/tests/unit/test_autogen_adapter.py b/tests/unit/test_autogen_adapter.py new file mode 100644 index 0000000..6dc62de --- /dev/null +++ b/tests/unit/test_autogen_adapter.py @@ -0,0 +1,140 @@ +"""Unit tests for AutoGen adapter.""" + +from __future__ import annotations + +import pytest + + +class TestAutoGenHarnessImport: + def test_raises_adapter_not_found_when_autogen_missing(self): + """If autogen-agentchat is not installed, harness raises AdapterNotFoundError.""" + import sys + + backup = sys.modules.get("autogen_agentchat") + sys.modules["autogen_agentchat"] = None # type: ignore[assignment] + + try: + import importlib + + import aastf.harness.adapters.autogen as autogen_mod + + importlib.reload(autogen_mod) + + if not autogen_mod.HAS_AUTOGEN: + from aastf.exceptions import AdapterNotFoundError + from aastf.sandbox.server import SandboxServer + + with pytest.raises(AdapterNotFoundError, match="autogen-agentchat"): + autogen_mod.AutoGenHarness(lambda tools: None, SandboxServer()) + finally: + if backup is not None: + sys.modules["autogen_agentchat"] = backup + elif "autogen_agentchat" in sys.modules: + del sys.modules["autogen_agentchat"] + + def test_harness_module_importable(self): + """Module should import without error regardless of autogen-agentchat presence.""" + from aastf.harness.adapters import autogen as autogen_mod # noqa: F401 + + assert hasattr(autogen_mod, "AutoGenHarness") + assert hasattr(autogen_mod, "HAS_AUTOGEN") + + +class TestAutoGenHarnessStructure: + """Test the harness structure using a mock agent (no real autogen-agentchat needed).""" + + def _make_harness(self, agent_factory=lambda t: None): + import sys + import types + + mock_mod = types.ModuleType("autogen_agentchat") + sys.modules["autogen_agentchat"] = mock_mod + + try: + import importlib + + import aastf.harness.adapters.autogen as autogen_mod + + importlib.reload(autogen_mod) + autogen_mod.HAS_AUTOGEN = True + + from aastf.sandbox.server import SandboxServer + + return autogen_mod.AutoGenHarness(agent_factory, SandboxServer()) + finally: + if "autogen_agentchat" in sys.modules: + del sys.modules["autogen_agentchat"] + + @staticmethod + def _scenario(inject_into): + from aastf.models.scenario import ( + ASICategory, + AttackScenario, + DetectionCriteria, + Severity, + ) + + return AttackScenario( + id="ASI01-001", + name="T", + category=ASICategory.ASI01, + severity=Severity.HIGH, + description="d", + attack_vector="v", + inject_into=inject_into, + payload="INJECTED PAYLOAD", + detection=DetectionCriteria(), + expected_behavior="safe", + remediation="fix", + ) + + def test_build_input_user_message(self): + """_build_input returns the payload for USER_MESSAGE injection.""" + from aastf.models.scenario import InjectionPoint + + harness = self._make_harness() + scenario = self._scenario(InjectionPoint.USER_MESSAGE) + assert harness._build_input(scenario) == "INJECTED PAYLOAD" + + def test_build_input_system_prompt(self): + """_build_input wraps the payload with [SYSTEM]/[USER] markers for SYSTEM_PROMPT injection.""" + from aastf.models.scenario import InjectionPoint + + harness = self._make_harness() + scenario = self._scenario(InjectionPoint.SYSTEM_PROMPT) + result = harness._build_input(scenario) + assert "[SYSTEM] INJECTED PAYLOAD" in result + assert "[USER]" in result + + def test_extract_final_output_from_messages(self): + """_extract_final_output returns the content of the last message.""" + import types + + harness = self._make_harness() + result = types.SimpleNamespace( + messages=[ + types.SimpleNamespace(source="user", content="hello"), + types.SimpleNamespace(source="assistant", content="final answer"), + ] + ) + assert harness._extract_final_output(result) == "final answer" + + def test_extract_final_output_no_messages(self): + """_extract_final_output falls back to str(result) when there are no messages.""" + import types + + harness = self._make_harness() + result = types.SimpleNamespace(messages=[]) + assert harness._extract_final_output(result) == str(result) + + @pytest.mark.asyncio + async def test_run_scenario_with_malformed_factory(self): + """A factory returning an object without run() is reported as an error, not silently SAFE.""" + from aastf.models.scenario import InjectionPoint + + harness = self._make_harness(agent_factory=lambda tools: object()) + scenario = self._scenario(InjectionPoint.USER_MESSAGE) + + trace = await harness.run_scenario(scenario) + assert trace.error is not None + assert "run()" in trace.error