Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/quickstart.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ pip install "aastf[openai-agents]"
# PydanticAI support
pip install "aastf[pydantic-ai]"

# AutoGen support
pip install "aastf[autogen]"

# All adapters
pip install "aastf[all]"
```
Expand Down Expand Up @@ -57,6 +60,7 @@ Replace `myapp.agent:create_agent` with the dotted path to your agent factory fu
| CrewAI | CrewAI | `--adapter crewai` |
| OpenAI Agents | OpenAI Agents SDK | `--adapter openai_agents` |
| PydanticAI | PydanticAI | `--adapter pydantic_ai` |
| AutoGen | AutoGen (AG2) | `--adapter autogen` |

## Understanding Results

Expand Down
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ crewai = ["crewai>=0.28"]
pydantic-ai = ["pydantic-ai>=0.0.13"]
google-adk = ["google-adk>=0.1"]
ms-agent = ["semantic-kernel>=1.0"]
autogen = ["autogen-agentchat>=0.4"]
dev = [
"pytest>=8.0",
"pytest-asyncio>=0.23",
Expand All @@ -54,7 +55,7 @@ dev = [
"mypy>=1.10",
"types-PyYAML>=6.0",
]
all = ["aastf[langgraph,openai-agents,crewai,pydantic-ai,google-adk,ms-agent]"]
all = ["aastf[langgraph,openai-agents,crewai,pydantic-ai,google-adk,ms-agent,autogen]"]

[project.scripts]
aastf = "aastf.cli.app:main"
Expand Down
156 changes: 156 additions & 0 deletions src/aastf/harness/adapters/autogen.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
"""
AutoGen (AG2 / autogen-agentchat) adapter — instruments an AutoGen agent for AASTF scenario runs.

AutoGen agents (e.g. AssistantAgent) expose ``run(task=...)`` as an async,
non-blocking entry point and accept tools as plain callables registered at
construction time. As with the PydanticAI and generic adapters, tools are
wrapped with the ``@instrument`` decorator before being handed to the agent
factory, so tool calls are captured into the active TraceCollector via
contextvars without needing to monkeypatch AutoGen internals.
"""

from __future__ import annotations

from collections.abc import Callable
from typing import TYPE_CHECKING, Any

import anyio

from ...exceptions import AdapterNotFoundError
from ...models.scenario import AttackScenario, InjectionPoint
from ...models.trace import AgentTrace
from ..collector import TraceCollector
from .generic import instrument, set_collector

if TYPE_CHECKING:
from ...sandbox.server import SandboxServer

try:
import autogen_agentchat # noqa: F401

HAS_AUTOGEN = True
except ImportError:
HAS_AUTOGEN = False


class AutoGenHarness:
"""
Harness for AutoGen (AG2 / autogen-agentchat) agents.

agent_factory: callable(tools: list) -> autogen_agentchat agent instance
The tools list contains @instrument-decorated async callables wired to
the sandbox. The returned agent must expose an async ``run(task=...)``
method that returns a result with a ``.messages`` sequence.
"""

def __init__(
self,
agent_factory: Callable[..., Any],
sandbox: SandboxServer,
timeout: float = 30.0,
) -> None:
if not HAS_AUTOGEN:
raise AdapterNotFoundError(
"autogen-agentchat is required. Install with: pip install 'aastf[autogen]'"
)
self._factory = agent_factory
self._sandbox = sandbox
self._timeout = timeout

async def run_scenario(self, scenario: AttackScenario) -> AgentTrace:
self._sandbox.configure_for_scenario(scenario)
collector = TraceCollector(scenario_id=scenario.id, adapter="autogen")

tools = self._create_instrumented_tools(scenario)
agent = self._factory(tools)

if agent is None or not callable(getattr(agent, "run", None)):
collector.set_error(
"autogen agent_factory must return an agent with a callable run(); "
f"got {type(agent).__name__}"
)
return collector.build_trace()

task = self._build_input(scenario)

set_collector(collector)
try:
with anyio.move_on_after(self._timeout) as _timeout_scope:
result = await agent.run(task=task)
collector.set_final_output(self._extract_final_output(result))
self._record_run_structure(result, collector)
if _timeout_scope.cancelled_caught:
collector.set_error(
f"Agent execution exceeded the {self._timeout:.0f}s timeout"
)
except Exception as e:
collector.set_error(str(e))
finally:
set_collector(None)

trace = collector.build_trace()
trace.metadata["tool_call_count"] = len(trace.tool_invocations)
return trace

@staticmethod
def _extract_final_output(result: Any) -> str:
"""The last message's text content is the agent's final output."""
messages = getattr(result, "messages", None) or []
if messages:
last = messages[-1]
content = getattr(last, "content", last)
return str(content)
return str(result)

@staticmethod
def _record_run_structure(result: Any, collector: TraceCollector) -> None:
"""
Derive iteration_count from the TaskResult message list. Each message
produced by the assistant (TextMessage / ToolCallSummaryMessage, etc.)
represents one planning/response turn.
"""
messages = getattr(result, "messages", None) or []
iterations = 0
for m in messages:
source = getattr(m, "source", None)
if source and source != "user":
iterations += 1
for _ in range(max(iterations, 1) if messages else 1):
collector.increment_iteration()

def _create_instrumented_tools(self, scenario: AttackScenario) -> list:
"""Create @instrument-decorated async callables for AutoGen tools."""
import httpx

from ...sandbox.tools import validate_tool_name

tools: list = []
sandbox_url = self._sandbox.base_url

for name in scenario.available_tools:
validate_tool_name(name)

async def _tool(tool_name: str = name, **kwargs: Any) -> dict:
async with httpx.AsyncClient() as client:
resp = await client.post(
f"{sandbox_url}/tools/{tool_name}",
json=kwargs,
timeout=10.0,
)
try:
return resp.json()
except Exception:
return {"raw": resp.text}

_tool.__name__ = name
_tool.__doc__ = f"Sandbox-wired tool: {name}"
tools.append(instrument(_tool, name=name))

return tools

def _build_input(self, scenario: AttackScenario) -> str:
if scenario.inject_into == InjectionPoint.USER_MESSAGE:
return scenario.payload
if scenario.inject_into == InjectionPoint.SYSTEM_PROMPT:
return f"[SYSTEM] {scenario.payload}\n[USER] Proceed with the task."
return "Research AI safety topics and provide a summary."
6 changes: 5 additions & 1 deletion src/aastf/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,10 +165,14 @@ def _build_harness(self, sandbox: SandboxServer): # type: ignore[return]
from .harness.adapters.ms_agent import MSAgentHarness

return MSAgentHarness(factory, sandbox, timeout=self._config.timeout_seconds)
elif adapter == "autogen":
from .harness.adapters.autogen import AutoGenHarness

return AutoGenHarness(factory, sandbox, timeout=self._config.timeout_seconds)
raise AdapterNotFoundError(
f"Unknown adapter: {adapter!r}. "
"Supported: langgraph, crewai, openai_agents, pydantic_ai, generic, "
"mcp, google_adk, ms_agent"
"mcp, google_adk, ms_agent, autogen"
)

async def _run_one(self, harness: Any, scenario: AttackScenario) -> TestResult:
Expand Down
140 changes: 140 additions & 0 deletions tests/unit/test_autogen_adapter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
"""Unit tests for AutoGen adapter."""

from __future__ import annotations

import pytest


class TestAutoGenHarnessImport:
def test_raises_adapter_not_found_when_autogen_missing(self):
"""If autogen-agentchat is not installed, harness raises AdapterNotFoundError."""
import sys

backup = sys.modules.get("autogen_agentchat")
sys.modules["autogen_agentchat"] = None # type: ignore[assignment]

try:
import importlib

import aastf.harness.adapters.autogen as autogen_mod

importlib.reload(autogen_mod)

if not autogen_mod.HAS_AUTOGEN:
from aastf.exceptions import AdapterNotFoundError
from aastf.sandbox.server import SandboxServer

with pytest.raises(AdapterNotFoundError, match="autogen-agentchat"):
autogen_mod.AutoGenHarness(lambda tools: None, SandboxServer())
finally:
if backup is not None:
sys.modules["autogen_agentchat"] = backup
elif "autogen_agentchat" in sys.modules:
del sys.modules["autogen_agentchat"]

def test_harness_module_importable(self):
"""Module should import without error regardless of autogen-agentchat presence."""
from aastf.harness.adapters import autogen as autogen_mod # noqa: F401

assert hasattr(autogen_mod, "AutoGenHarness")
assert hasattr(autogen_mod, "HAS_AUTOGEN")


class TestAutoGenHarnessStructure:
"""Test the harness structure using a mock agent (no real autogen-agentchat needed)."""

def _make_harness(self, agent_factory=lambda t: None):
import sys
import types

mock_mod = types.ModuleType("autogen_agentchat")
sys.modules["autogen_agentchat"] = mock_mod

try:
import importlib

import aastf.harness.adapters.autogen as autogen_mod

importlib.reload(autogen_mod)
autogen_mod.HAS_AUTOGEN = True

from aastf.sandbox.server import SandboxServer

return autogen_mod.AutoGenHarness(agent_factory, SandboxServer())
finally:
if "autogen_agentchat" in sys.modules:
del sys.modules["autogen_agentchat"]

@staticmethod
def _scenario(inject_into):
from aastf.models.scenario import (
ASICategory,
AttackScenario,
DetectionCriteria,
Severity,
)

return AttackScenario(
id="ASI01-001",
name="T",
category=ASICategory.ASI01,
severity=Severity.HIGH,
description="d",
attack_vector="v",
inject_into=inject_into,
payload="INJECTED PAYLOAD",
detection=DetectionCriteria(),
expected_behavior="safe",
remediation="fix",
)

def test_build_input_user_message(self):
"""_build_input returns the payload for USER_MESSAGE injection."""
from aastf.models.scenario import InjectionPoint

harness = self._make_harness()
scenario = self._scenario(InjectionPoint.USER_MESSAGE)
assert harness._build_input(scenario) == "INJECTED PAYLOAD"

def test_build_input_system_prompt(self):
"""_build_input wraps the payload with [SYSTEM]/[USER] markers for SYSTEM_PROMPT injection."""
from aastf.models.scenario import InjectionPoint

harness = self._make_harness()
scenario = self._scenario(InjectionPoint.SYSTEM_PROMPT)
result = harness._build_input(scenario)
assert "[SYSTEM] INJECTED PAYLOAD" in result
assert "[USER]" in result

def test_extract_final_output_from_messages(self):
"""_extract_final_output returns the content of the last message."""
import types

harness = self._make_harness()
result = types.SimpleNamespace(
messages=[
types.SimpleNamespace(source="user", content="hello"),
types.SimpleNamespace(source="assistant", content="final answer"),
]
)
assert harness._extract_final_output(result) == "final answer"

def test_extract_final_output_no_messages(self):
"""_extract_final_output falls back to str(result) when there are no messages."""
import types

harness = self._make_harness()
result = types.SimpleNamespace(messages=[])
assert harness._extract_final_output(result) == str(result)

@pytest.mark.asyncio
async def test_run_scenario_with_malformed_factory(self):
"""A factory returning an object without run() is reported as an error, not silently SAFE."""
from aastf.models.scenario import InjectionPoint

harness = self._make_harness(agent_factory=lambda tools: object())
scenario = self._scenario(InjectionPoint.USER_MESSAGE)

trace = await harness.run_scenario(scenario)
assert trace.error is not None
assert "run()" in trace.error
Loading