diff --git a/cyberai/agents/web3/__init__.py b/cyberai/agents/web3/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/cyberai/agents/web3/agent.py b/cyberai/agents/web3/agent.py new file mode 100644 index 0000000..f70c87d --- /dev/null +++ b/cyberai/agents/web3/agent.py @@ -0,0 +1,99 @@ +"""SmartContractAgent — Solidity static analysis & severity triage (day 24). + +Standalone agent (not in the recon→intel→exploit→report network pipeline): +takes a contract address or local .sol path, runs static analysis, and triages +findings against Immunefi severity. Etherscan fetch is optional; local source +is the primary path. +""" + +from __future__ import annotations + +from pathlib import Path +from typing import Any, Dict, Optional + +from rich.console import Console + +from cyberai.core.base_agent import BaseAgent, Tool + +from .etherscan import EtherscanClient +from .immunefi_severity import classify_all, highest_tier +from .slither_tool import SlitherTool + +console = Console() + + +class SmartContractAgent(BaseAgent): + """Web3 agent — static analysis of Solidity contracts.""" + + AGENT_NAME = "web3" + ROLE = "Smart Contract Auditor" + + def _register_tools(self) -> None: + self.register_tool( + Tool( + name="fetch_source", + description="Fetch verified contract source from Etherscan", + func=self._fetch_source, + parameters={"address": "str"}, + ) + ) + self.register_tool( + Tool( + name="slither_scan", + description="Static-analyze a Solidity file with slither", + func=self._slither_scan, + parameters={"path": "str"}, + ) + ) + + def _fetch_source(self, address: str) -> Dict[str, Any]: + client = EtherscanClient() + src = client.get_source(address) + if src is None: + return {"verified": False, "source_code": ""} + return { + "address": src.address, + "name": src.name, + "verified": src.verified, + "compiler_version": src.compiler_version, + "source_len": len(src.source_code), + } + + def _slither_scan(self, path: str) -> Dict[str, Any]: + tool = SlitherTool() + findings = tool.analyze(path) + classified = classify_all(findings) + return { + "available": tool.available, + "findings": classified, + "highest_severity": highest_tier(findings), + "count": len(classified), + } + + def run(self, target: str, context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: + """Analyze a contract. + + `target` is either a local .sol path or a contract address. Slither + wiring + severity arrive in later commits; this skeleton resolves the + source and records intent. + """ + self._log(f"Smart-contract analysis target: {target}") + is_local = Path(target).exists() and target.endswith(".sol") + result: Dict[str, Any] = { + "target": target, + "mode": "local" if is_local else "address", + "findings": [], + } + if is_local: + scan = self.call_tool("slither_scan", path=target) + result["findings"] = scan["findings"] + result["highest_severity"] = scan["highest_severity"] + result["slither_available"] = scan["available"] + else: + result["source_meta"] = self.call_tool("fetch_source", address=target) + self.kb.set("web3", result, agent=self.AGENT_NAME) + self._log( + "Smart-contract analysis complete", + {"mode": result["mode"], "findings": len(result["findings"])}, + ) + return result diff --git a/cyberai/agents/web3/etherscan.py b/cyberai/agents/web3/etherscan.py new file mode 100644 index 0000000..98955da --- /dev/null +++ b/cyberai/agents/web3/etherscan.py @@ -0,0 +1,86 @@ +"""Etherscan API client — fetch verified source, ABI, status (day 24). + +Degrades gracefully when no API key is set (available=False), so the agent +can still analyze local .sol files without an internet/Etherscan dependency. +""" + +from __future__ import annotations + +import logging +import os +from dataclasses import dataclass, field +from typing import Any, Dict, List, Optional + +import httpx + +logger = logging.getLogger("cyberai.web3.etherscan") + +DEFAULT_API = "https://api.etherscan.io/api" + + +@dataclass +class ContractSource: + """Verified contract metadata from Etherscan.""" + + address: str + name: str = "" + source_code: str = "" + abi: str = "" + compiler_version: str = "" + verified: bool = False + raw: Dict[str, Any] = field(default_factory=dict) + + +class EtherscanClient: + """Minimal Etherscan client for source/ABI retrieval.""" + + def __init__( + self, + api_key: Optional[str] = None, + base_url: str = DEFAULT_API, + timeout: int = 15, + ): + self.api_key = api_key or os.getenv("ETHERSCAN_API_KEY", "") + self.base_url = base_url + self.timeout = timeout + + @property + def available(self) -> bool: + return bool(self.api_key) + + def get_source(self, address: str) -> Optional[ContractSource]: + """Fetch verified source for a contract address. None if unavailable.""" + if not self.available: + logger.warning("no ETHERSCAN_API_KEY — skipping remote source fetch") + return None + try: + with httpx.Client(timeout=self.timeout) as client: + r = client.get( + self.base_url, + params={ + "module": "contract", + "action": "getsourcecode", + "address": address, + "apikey": self.api_key, + }, + ) + r.raise_for_status() + body = r.json() + except Exception as exc: # noqa: BLE001 — never hard-fail + logger.warning("etherscan fetch failed: %s", exc) + return None + + results: List[Dict[str, Any]] = body.get("result", []) or [] + if not results or not isinstance(results, list): + return None + item = results[0] + source = item.get("SourceCode", "") or "" + return ContractSource( + address=address, + name=item.get("ContractName", ""), + source_code=source, + abi=item.get("ABI", ""), + compiler_version=item.get("CompilerVersion", ""), + verified=bool(source) and item.get("ABI") != "Contract source code not verified", + raw=item, + ) diff --git a/cyberai/agents/web3/immunefi_severity.py b/cyberai/agents/web3/immunefi_severity.py new file mode 100644 index 0000000..e1694a1 --- /dev/null +++ b/cyberai/agents/web3/immunefi_severity.py @@ -0,0 +1,108 @@ +"""Immunefi severity classification for slither findings (day 24). + +Maps slither detector checks to Immunefi's severity tiers +(Critical / High / Medium / Low / Insight) following their bug-bounty +severity methodology for smart contracts. A per-check table gives precise +classification; an impact/confidence fallback covers unknown detectors. + +Immunefi smart-contract impact reference (paraphrased): + Critical — direct theft/loss/freezing of funds, contract takeover. + High — theft of unclaimed yield, temporary freezing, griefing with cost. + Medium — contract fails to deliver promised value (no fund loss). + Low — minor/contained issues, best-practice deviations. + Insight — informational, no security impact. +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, List + +if TYPE_CHECKING: + from .slither_tool import SlitherFinding + +# Immunefi tiers, ordered high→low for ranking. +IMMUNEFI_TIERS = ["Critical", "High", "Medium", "Low", "Insight"] +_TIER_RANK = {t: i for i, t in enumerate(IMMUNEFI_TIERS)} + +# Precise per-check mapping for high-signal slither detectors. +CHECK_TO_IMMUNEFI = { + # direct fund loss / takeover -> Critical + "reentrancy-eth": "Critical", + "arbitrary-send-eth": "Critical", + "arbitrary-send-erc20": "Critical", + "suicidal": "Critical", + "controlled-delegatecall": "Critical", + "unprotected-upgrade": "Critical", + "delegatecall-loop": "Critical", + # exploitable but conditional / no direct theft -> High + "reentrancy-no-eth": "High", + "tx-origin": "High", + "weak-prng": "High", + "incorrect-equality": "High", + "unchecked-transfer": "High", + "controlled-array-length": "High", + # logic/contained -> Medium + "uninitialized-state": "Medium", + "uninitialized-storage": "Medium", + "divide-before-multiply": "Medium", + "reentrancy-benign": "Medium", + "timestamp": "Medium", + "unchecked-lowlevel": "Medium", + "unchecked-send": "Medium", + # best-practice / contained -> Low + "low-level-calls": "Low", + "missing-zero-check": "Low", + "calls-loop": "Low", + "reentrancy-events": "Low", + # informational -> Insight + "solc-version": "Insight", + "pragma": "Insight", + "naming-convention": "Insight", + "dead-code": "Insight", + "assembly": "Insight", + "external-function": "Insight", +} + +# Fallback: slither impact + confidence -> Immunefi tier. +_IMPACT_FALLBACK = { + ("High", "High"): "Critical", + ("High", "Medium"): "High", + ("High", "Low"): "High", + ("Medium", "High"): "High", + ("Medium", "Medium"): "Medium", + ("Medium", "Low"): "Medium", + ("Low", "High"): "Low", + ("Low", "Medium"): "Low", + ("Low", "Low"): "Low", + ("Informational", "High"): "Insight", + ("Informational", "Medium"): "Insight", + ("Informational", "Low"): "Insight", +} + + +def classify(finding: "SlitherFinding") -> str: + """Return the Immunefi tier for a single slither finding.""" + if finding.check in CHECK_TO_IMMUNEFI: + return CHECK_TO_IMMUNEFI[finding.check] + return _IMPACT_FALLBACK.get((finding.impact, finding.confidence), "Insight") + + +def classify_all(findings: List["SlitherFinding"]) -> List[dict]: + """Classify findings, attaching an `immunefi_severity` field, sorted high→low.""" + rows = [] + for f in findings: + d = f.to_dict() + d["immunefi_severity"] = classify(f) + rows.append(d) + rows.sort(key=lambda r: _TIER_RANK.get(r["immunefi_severity"], 99)) + return rows + + +def highest_tier(findings: List["SlitherFinding"]) -> str: + """Return the most severe Immunefi tier across findings (Insight if none).""" + if not findings: + return "Insight" + return min( + (classify(f) for f in findings), + key=lambda t: _TIER_RANK.get(t, 99), + ) diff --git a/cyberai/agents/web3/slither_tool.py b/cyberai/agents/web3/slither_tool.py new file mode 100644 index 0000000..72d1559 --- /dev/null +++ b/cyberai/agents/web3/slither_tool.py @@ -0,0 +1,133 @@ +"""Slither static-analysis wrapper for Solidity contracts (day 24). + +Runs `slither --json -` and parses results.detectors into structured +findings. Degrades gracefully when the binary is absent. Slither resolves the +compiler via solc-select automatically. + +Real slither 0.11.5 JSON shape: + {"success": bool, "error": ..., "results": {"detectors": [ + {"check","impact","confidence","description","markdown","id","elements"} + ]}} +""" + +from __future__ import annotations + +import json +import logging +import os +import shutil +import subprocess +from dataclasses import dataclass, field +from typing import Any, Dict, List, Optional + +logger = logging.getLogger("cyberai.web3.slither") + +_FALLBACK_PATHS = [ + os.path.expanduser("~/.local/bin/slither"), + "/usr/local/bin/slither", + "/usr/bin/slither", +] + +DEFAULT_TIMEOUT = 180 + + +def find_slither() -> Optional[str]: + """Locate the slither binary: env, PATH, then known fallback dirs.""" + env = os.getenv("SLITHER_PATH") + if env and os.path.exists(env): + return env + found = shutil.which("slither") + if found: + return found + for p in _FALLBACK_PATHS: + if os.path.exists(p): + return p + return None + + +@dataclass +class SlitherFinding: + """One slither detector result.""" + + check: str + impact: str + confidence: str + description: str + detector_id: str = "" + raw: Dict[str, Any] = field(default_factory=dict) + + def to_dict(self) -> Dict[str, Any]: + return { + "check": self.check, + "impact": self.impact, + "confidence": self.confidence, + "description": self.description.strip(), + "detector_id": self.detector_id, + } + + +def parse_slither_json(output: str) -> List[SlitherFinding]: + """Parse `slither --json -` output into findings.""" + output = output.strip() + if not output: + return [] + try: + data = json.loads(output) + except json.JSONDecodeError: + return [] + detectors = (data.get("results") or {}).get("detectors", []) or [] + findings = [] + for d in detectors: + findings.append( + SlitherFinding( + check=d.get("check", ""), + impact=d.get("impact", "Informational"), + confidence=d.get("confidence", "Low"), + description=d.get("description", ""), + detector_id=d.get("id", ""), + raw=d, + ) + ) + return findings + + +class SlitherTool: + """Runs slither against a Solidity source file or directory.""" + + def __init__( + self, + slither_path: Optional[str] = None, + timeout: int = DEFAULT_TIMEOUT, + ): + self.slither_path = slither_path or find_slither() + self.timeout = timeout + + @property + def available(self) -> bool: + return bool(self.slither_path and os.path.exists(self.slither_path)) + + def analyze(self, target: str) -> List[SlitherFinding]: + """Run slither on a .sol file/dir. [] when unavailable or on failure. + + slither writes JSON to stdout with `--json -`; it exits non-zero when + findings exist, so the return code is not treated as failure. + """ + if not self.available: + logger.warning("slither not found — skipping static analysis") + return [] + cmd = [self.slither_path or "slither", target, "--json", "-"] + try: + proc = subprocess.run( + cmd, + capture_output=True, + text=True, + timeout=self.timeout, + ) + except subprocess.TimeoutExpired: + logger.warning("slither timed out after %ss", self.timeout) + return [] + except Exception as exc: # noqa: BLE001 — never hard-fail + logger.warning("slither execution failed: %s", exc) + return [] + # slither prints JSON to stdout even when detectors are found (rc!=0). + return parse_slither_json(proc.stdout) diff --git a/tests/fixtures/dao_reentrant.sol b/tests/fixtures/dao_reentrant.sol new file mode 100644 index 0000000..0c930d4 --- /dev/null +++ b/tests/fixtures/dao_reentrant.sol @@ -0,0 +1,24 @@ +// SPDX-License-Identifier: MIT +pragma solidity ^0.8.0; + +// TheDAO-style reentrancy: external call before state update. +contract DAOReentrant { + mapping(address => uint256) public balances; + + function deposit() external payable { + balances[msg.sender] += msg.value; + } + + // Vulnerable: sends ETH before zeroing the balance, enabling reentrancy. + function withdraw() external { + uint256 amount = balances[msg.sender]; + require(amount > 0, "no balance"); + (bool ok, ) = msg.sender.call{value: amount}(""); + require(ok, "transfer failed"); + balances[msg.sender] = 0; + } + + function balanceOf(address who) external view returns (uint256) { + return balances[who]; + } +} diff --git a/tests/integration/test_web3.py b/tests/integration/test_web3.py new file mode 100644 index 0000000..c394e89 --- /dev/null +++ b/tests/integration/test_web3.py @@ -0,0 +1,99 @@ +"""Day 24 — SmartContractAgent e2e: reentrant fixture -> slither -> Critical.""" + +from __future__ import annotations + +from pathlib import Path +from unittest.mock import MagicMock + +import pytest + +from cyberai.agents.web3.agent import SmartContractAgent +from cyberai.agents.web3.immunefi_severity import classify_all, highest_tier +from cyberai.agents.web3.slither_tool import SlitherTool, parse_slither_json + +FIXTURE = Path(__file__).parent.parent / "fixtures" / "dao_reentrant.sol" + +# Slither requires the binary + solc; skip the live runs when unavailable. +_slither_available = SlitherTool().available +requires_slither = pytest.mark.skipif(not _slither_available, reason="slither binary not installed") + + +def _agent() -> SmartContractAgent: + agent = SmartContractAgent.__new__(SmartContractAgent) + agent.AGENT_NAME = "web3" + agent._log = MagicMock() + agent.kb = MagicMock() + agent.audit = MagicMock() + agent.tools = {} + agent._register_tools() + return agent + + +def test_fixture_exists(): + assert FIXTURE.exists(), "reentrant fixture missing" + assert "withdraw" in FIXTURE.read_text() + + +# ── mocked pipeline (always runs, no binary needed) ─────────────────── + + +def test_classification_pipeline_mocked(): + """slither JSON (canned) -> classify -> Critical reentrancy.""" + canned = ( + '{"success":true,"error":null,"results":{"detectors":[' + '{"check":"reentrancy-eth","impact":"High","confidence":"Medium",' + '"description":"Reentrancy in DAOReentrant.withdraw()","id":"r1"},' + '{"check":"low-level-calls","impact":"Informational","confidence":"High",' + '"description":"Low level call","id":"l1"},' + '{"check":"solc-version","impact":"Informational","confidence":"High",' + '"description":"solc","id":"s1"}' + "]}}" + ) + findings = parse_slither_json(canned) + assert len(findings) == 3 + rows = classify_all(findings) + assert rows[0]["immunefi_severity"] == "Critical" + assert rows[0]["check"] == "reentrancy-eth" + assert highest_tier(findings) == "Critical" + + +def test_agent_run_mocked(monkeypatch): + """SmartContractAgent.run on a local .sol with slither mocked.""" + import cyberai.agents.web3.agent as ag + + fake_tool = MagicMock() + fake_tool.available = True + fake_tool.analyze.return_value = parse_slither_json( + '{"results":{"detectors":[{"check":"reentrancy-eth","impact":"High",' + '"confidence":"Medium","description":"Reentrancy","id":"r1"}]}}' + ) + monkeypatch.setattr(ag, "SlitherTool", lambda *a, **k: fake_tool) + + agent = _agent() + res = agent.run(str(FIXTURE)) + assert res["mode"] == "local" + assert res["highest_severity"] == "Critical" + assert res["findings"][0]["check"] == "reentrancy-eth" + + +# ── live slither run (skipped if binary absent) ─────────────────────── + + +@requires_slither +def test_live_slither_finds_reentrancy(): + """Real slither against the fixture must flag reentrancy as Critical.""" + tool = SlitherTool() + findings = tool.analyze(str(FIXTURE)) + checks = [f.check for f in findings] + assert any("reentrancy" in c for c in checks), f"no reentrancy in {checks}" + assert highest_tier(findings) == "Critical" + + +@requires_slither +def test_live_agent_run(): + """Full agent.run against the fixture with the real slither binary.""" + agent = _agent() + res = agent.run(str(FIXTURE)) + assert res["mode"] == "local" + assert res["slither_available"] is True + assert res["highest_severity"] == "Critical"