diff --git a/README.md b/README.md index 72b83b3..a8f9e9b 100644 --- a/README.md +++ b/README.md @@ -143,6 +143,57 @@ def login_example() -> None: raise RuntimeError("login failed") ``` +## Pre-action authority hook (production pattern) + +If you want every action proposal to be authorized before execution, pass a +`pre_action_authorizer` into `RuntimeAgent`. + +This hook receives a shared `predicate-contracts` `ActionRequest` generated from +runtime state (`snapshot` + assertion evidence) and must return either: + +- `True` / `False`, or +- an object with an `allowed: bool` field (for richer decision payloads). + +```python +from predicate.agent_runtime import AgentRuntime +from predicate.runtime_agent import RuntimeAgent, RuntimeStep + +# Optional: your authority client can be local guard, sidecar client, or remote API client. +def pre_action_authorizer(action_request): + # Example: call your authority service + # resp = authority_client.authorize(action_request) + # return resp + return True + + +runtime = AgentRuntime(backend=backend, tracer=tracer) +agent = RuntimeAgent( + runtime=runtime, + executor=executor, + pre_action_authorizer=pre_action_authorizer, + authority_principal_id="agent:web-checkout", + authority_tenant_id="tenant-a", + authority_session_id="session-123", + authority_fail_closed=True, # deny/authorizer errors block action execution +) + +ok = await agent.run_step( + task_goal="Complete checkout", + step=RuntimeStep(goal="Click submit order"), +) +``` + +Fail-open option (not recommended for sensitive actions): + +```python +agent = RuntimeAgent( + runtime=runtime, + executor=executor, + pre_action_authorizer=pre_action_authorizer, + authority_fail_closed=False, # authorizer errors allow action to proceed +) +``` + ## Capabilities (lifecycle guarantees) ### Controlled perception diff --git a/predicate/agent_runtime.py b/predicate/agent_runtime.py index 47194ca..d4ad2d4 100644 --- a/predicate/agent_runtime.py +++ b/predicate/agent_runtime.py @@ -98,6 +98,7 @@ from .backends.protocol import BrowserBackend from .browser import AsyncSentienceBrowser from .tracing import Tracer + from predicate_contracts import ActionRequest class AgentRuntime: @@ -980,6 +981,39 @@ def _compute_snapshot_digest(self, snap: Snapshot | None) -> str | None: except Exception: return None + def build_authority_action_request( + self, + *, + principal_id: str, + action: str, + resource: str, + intent: str, + tenant_id: str | None = None, + session_id: str | None = None, + state_source: str = "sdk-python", + ) -> ActionRequest: + """ + Build a predicate-contracts ActionRequest from current runtime state. + + This boundary helper keeps sdk-python internals decoupled from authority + enforcement internals by exporting only shared contract types. + """ + from .integrations.authority import ( + AuthorityActionInput, + build_action_request_from_runtime, + ) + + action_input = AuthorityActionInput( + principal_id=principal_id, + action=action, + resource=resource, + intent=intent, + tenant_id=tenant_id, + session_id=session_id, + state_source=state_source, + ) + return build_action_request_from_runtime(runtime=self, action_input=action_input) + async def emit_step_end( self, *, diff --git a/predicate/integrations/authority/__init__.py b/predicate/integrations/authority/__init__.py new file mode 100644 index 0000000..dce8b64 --- /dev/null +++ b/predicate/integrations/authority/__init__.py @@ -0,0 +1,13 @@ +from predicate.integrations.authority.contracts_adapter import ( + AuthorityActionInput, + build_action_request_from_runtime, + state_evidence_from_runtime, + to_verification_evidence, +) + +__all__ = [ + "AuthorityActionInput", + "build_action_request_from_runtime", + "state_evidence_from_runtime", + "to_verification_evidence", +] diff --git a/predicate/integrations/authority/contracts_adapter.py b/predicate/integrations/authority/contracts_adapter.py new file mode 100644 index 0000000..be7c63d --- /dev/null +++ b/predicate/integrations/authority/contracts_adapter.py @@ -0,0 +1,88 @@ +from __future__ import annotations + +import hashlib +from dataclasses import dataclass +from typing import Any, Mapping, Sequence + +# pylint: disable=import-error + +from predicate_contracts import ( + ActionRequest, + ActionSpec, + PrincipalRef, + StateEvidence, + VerificationEvidence, + VerificationSignal, + VerificationStatus, +) + + +@dataclass(frozen=True) +class AuthorityActionInput: + principal_id: str + action: str + resource: str + intent: str + tenant_id: str | None = None + session_id: str | None = None + state_source: str = "sdk-python" + + +def to_verification_evidence(assertions: Sequence[Mapping[str, Any]]) -> VerificationEvidence: + signals: list[VerificationSignal] = [] + for assertion in assertions: + label = str(assertion.get("label", "")).strip() + if label == "": + continue + passed = bool(assertion.get("passed", False)) + required = bool(assertion.get("required", False)) + reason_raw = assertion.get("reason") + reason = str(reason_raw) if isinstance(reason_raw, str) and reason_raw != "" else None + signals.append( + VerificationSignal( + label=label, + status=VerificationStatus.PASSED if passed else VerificationStatus.FAILED, + required=required, + reason=reason, + ) + ) + return VerificationEvidence(signals=tuple(signals)) + + +def state_evidence_from_runtime(runtime: Any, source: str = "sdk-python") -> StateEvidence: + snapshot = getattr(runtime, "last_snapshot", None) + step_id = getattr(runtime, "step_id", None) + state_hash = _snapshot_state_hash(snapshot=snapshot, step_id=step_id) + return StateEvidence(source=source, state_hash=state_hash) + + +def build_action_request_from_runtime(runtime: Any, action_input: AuthorityActionInput) -> ActionRequest: + assertions_payload = runtime.get_assertions_for_step_end() + assertions = assertions_payload.get("assertions", []) + verification_evidence = to_verification_evidence(assertions) + state_evidence = state_evidence_from_runtime(runtime=runtime, source=action_input.state_source) + return ActionRequest( + principal=PrincipalRef( + principal_id=action_input.principal_id, + tenant_id=action_input.tenant_id, + session_id=action_input.session_id, + ), + action_spec=ActionSpec( + action=action_input.action, + resource=action_input.resource, + intent=action_input.intent, + ), + state_evidence=state_evidence, + verification_evidence=verification_evidence, + ) + + +def _snapshot_state_hash(snapshot: Any, step_id: str | None) -> str: + url = str(getattr(snapshot, "url", "") or "") + timestamp = str(getattr(snapshot, "timestamp", "") or "") + if url != "" or timestamp != "": + digest = hashlib.sha256(f"{url}{timestamp}".encode("utf-8")).hexdigest() + return "sha256:" + digest + fallback_material = step_id or "missing_snapshot" + fallback_digest = hashlib.sha256(fallback_material.encode("utf-8")).hexdigest() + return "sha256:" + fallback_digest diff --git a/predicate/runtime_agent.py b/predicate/runtime_agent.py index f919a67..ddfa46c 100644 --- a/predicate/runtime_agent.py +++ b/predicate/runtime_agent.py @@ -62,6 +62,12 @@ class ActOnceResult: used_vision: bool +@dataclass(frozen=True) +class PreActionAuthorityDecision: + allowed: bool + reason: str | None = None + + class RuntimeAgent: """ A thin orchestration layer over AgentRuntime: @@ -79,12 +85,22 @@ def __init__( vision_executor: LLMProvider | None = None, vision_verifier: LLMProvider | None = None, short_circuit_canvas: bool = True, + pre_action_authorizer: Callable[[Any], Any] | None = None, + authority_principal_id: str | None = None, + authority_tenant_id: str | None = None, + authority_session_id: str | None = None, + authority_fail_closed: bool = True, ) -> None: self.runtime = runtime self.executor = executor self.vision_executor = vision_executor self.vision_verifier = vision_verifier self.short_circuit_canvas = short_circuit_canvas + self.pre_action_authorizer = pre_action_authorizer + self.authority_principal_id = authority_principal_id + self.authority_tenant_id = authority_tenant_id + self.authority_session_id = authority_session_id + self.authority_fail_closed = authority_fail_closed self._structured_llm = LLMInteractionHandler(executor) @@ -120,7 +136,7 @@ async def run_step( # 1) Structured executor attempt. action = self._propose_structured_action(task_goal=task_goal, step=step, snap=snap) - await self._execute_action(action=action, snap=snap) + await self._execute_action(action=action, snap=snap, step_goal=step.goal) ok = await self._apply_verifications(step=step) if ok: outcome = "ok" @@ -268,7 +284,7 @@ async def act_once_result( temperature=0.0, ) action = self._extract_action_from_text(resp.content) - await self._execute_action(action=action, snap=snap) + await self._execute_action(action=action, snap=snap, step_goal=step.goal) return ActOnceResult(action=action, snap=snap, used_vision=True) # Structured snapshot-first proposal. @@ -290,7 +306,7 @@ async def act_once_result( resp = self._structured_llm.query_llm(dom_context, combined_goal) action = self._structured_llm.extract_action(resp.content) - await self._execute_action(action=action, snap=snap) + await self._execute_action(action=action, snap=snap, step_goal=step.goal) return ActOnceResult(action=action, snap=snap, used_vision=False) async def _run_hook( @@ -367,7 +383,7 @@ async def _vision_executor_attempt( ) action = self._extract_action_from_text(resp.content) - await self._execute_action(action=action, snap=snap) + await self._execute_action(action=action, snap=snap, step_goal=step.goal) # Important: vision executor fallback is a *retry* of the same step. # Clear prior step assertions so required_assertions_passed reflects the final attempt. self.runtime.flush_assertions() @@ -397,21 +413,28 @@ async def _apply_verifications(self, *, step: RuntimeStep) -> bool: # Respect required verifications semantics. return self.runtime.required_assertions_passed() and all_ok - async def _execute_action(self, *, action: str, snap: Snapshot | None) -> None: + async def _execute_action(self, *, action: str, snap: Snapshot | None, step_goal: str | None) -> None: url = None try: url = await self.runtime.get_url() except Exception: url = getattr(snap, "url", None) - await self.runtime.record_action(action, url=url) - # Coordinate-backed execution (by snapshot id or explicit coordinates). kind, payload = self._parse_action(action) if kind == "finish": + await self.runtime.record_action(action, url=url) return + await self._authorize_pre_action_or_raise( + action=action, + kind=kind, + url=url, + step_goal=step_goal, + ) + await self.runtime.record_action(action, url=url) + if kind == "press": await self._press_key_best_effort(payload["key"]) await self._stabilize_best_effort() @@ -449,6 +472,65 @@ async def _execute_action(self, *, action: str, snap: Snapshot | None) -> None: raise ValueError(f"Unknown action kind: {kind}") + async def _authorize_pre_action_or_raise( + self, + *, + action: str, + kind: str, + url: str | None, + step_goal: str | None, + ) -> None: + if self.pre_action_authorizer is None: + return + principal_id = self.authority_principal_id or "agent:sdk-python" + action_name = self._authority_action_name(kind) + resource = url or "about:blank" + intent = step_goal or action + + try: + request = self.runtime.build_authority_action_request( + principal_id=principal_id, + action=action_name, + resource=resource, + intent=intent, + tenant_id=self.authority_tenant_id, + session_id=self.authority_session_id, + ) + decision_raw = self.pre_action_authorizer(request) + if inspect.isawaitable(decision_raw): + decision_raw = await decision_raw + decision = self._normalize_authority_decision(decision_raw) + if decision.allowed: + return + raise RuntimeError( + f"pre_action_authority_denied: {decision.reason or 'denied_by_authority'}" + ) + except Exception: + if self.authority_fail_closed: + raise + return + + def _normalize_authority_decision(self, value: Any) -> PreActionAuthorityDecision: + if isinstance(value, PreActionAuthorityDecision): + return value + allowed_attr = getattr(value, "allowed", None) + if isinstance(allowed_attr, bool): + reason_attr = getattr(value, "reason", None) + reason = str(reason_attr) if isinstance(reason_attr, str) and reason_attr else None + return PreActionAuthorityDecision(allowed=allowed_attr, reason=reason) + if isinstance(value, bool): + return PreActionAuthorityDecision(allowed=value) + raise RuntimeError("invalid_pre_action_authority_decision") + + def _authority_action_name(self, kind: str) -> str: + if kind in {"click", "click_xy", "click_rect"}: + return "browser.click" + if kind == "type": + return "browser.type" + if kind == "press": + return "browser.press" + return "browser.unknown" + async def _stabilize_best_effort(self) -> None: try: await self.runtime.backend.wait_ready_state(state="interactive", timeout_ms=15000) diff --git a/pyproject.toml b/pyproject.toml index 08679f3..ca737d8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -29,6 +29,7 @@ dependencies = [ "httpx>=0.25.0", # For async API calls "playwright-stealth>=1.0.6", # Bot evasion and stealth mode "markdownify>=0.11.6", # Enhanced HTML to Markdown conversion + "predicate-contracts", ] [project.urls] diff --git a/tests/test_authority_contracts_adapter.py b/tests/test_authority_contracts_adapter.py new file mode 100644 index 0000000..14c62e3 --- /dev/null +++ b/tests/test_authority_contracts_adapter.py @@ -0,0 +1,98 @@ +from __future__ import annotations + +from predicate.agent_runtime import AgentRuntime +from predicate.integrations.authority import ( + AuthorityActionInput, + build_action_request_from_runtime, + to_verification_evidence, +) +from predicate.models import Snapshot + + +class _MockBackend: + async def get_url(self) -> str: + return "https://example.com" + + +class _MockTracer: + # pylint: disable=unused-argument + def emit(self, event_type: str, data: dict, step_id: str | None = None) -> None: + return + + +class _RuntimeStub: + def __init__(self) -> None: + self.step_id = "step-7" + self.last_snapshot = Snapshot( + status="success", + url="https://example.com/checkout", + timestamp="2026-02-17T00:00:00Z", + elements=[], + ) + + def get_assertions_for_step_end(self) -> dict[str, object]: + return { + "assertions": [ + {"label": "on_checkout", "passed": True, "required": True, "reason": ""}, + {"label": "has_total", "passed": False, "required": True, "reason": "selector_missing"}, + ] + } + + +def test_to_verification_evidence_maps_assertions() -> None: + evidence = to_verification_evidence( + [ + {"label": "a", "passed": True, "required": True, "reason": ""}, + {"label": "b", "passed": False, "required": False, "reason": "missing"}, + ] + ) + assert len(evidence.signals) == 2 + assert evidence.signals[0].label == "a" + assert evidence.signals[0].status.value == "passed" + assert evidence.signals[1].label == "b" + assert evidence.signals[1].status.value == "failed" + assert evidence.signals[1].reason == "missing" + + +def test_build_action_request_from_runtime_exports_contracts() -> None: + runtime = _RuntimeStub() + request = build_action_request_from_runtime( + runtime=runtime, + action_input=AuthorityActionInput( + principal_id="agent:checkout", + action="http.post", + resource="https://api.vendor.com/orders", + intent="submit order", + tenant_id="tenant-a", + session_id="session-1", + ), + ) + assert request.principal.principal_id == "agent:checkout" + assert request.action_spec.intent == "submit order" + assert request.state_evidence.source == "sdk-python" + assert request.state_evidence.state_hash.startswith("sha256:") + assert len(request.verification_evidence.signals) == 2 + + +def test_agent_runtime_build_authority_action_request() -> None: + runtime = AgentRuntime(backend=_MockBackend(), tracer=_MockTracer()) + runtime.step_id = "step-1" + runtime.last_snapshot = Snapshot( + status="success", + url="https://example.com", + timestamp="2026-02-17T00:00:00Z", + elements=[], + ) + runtime._assertions_this_step = [ # pylint: disable=protected-access + {"label": "has_heading", "passed": True, "required": True, "reason": ""} + ] + request = runtime.build_authority_action_request( + principal_id="agent:web", + action="browser.click", + resource="https://example.com", + intent="click checkout", + ) + assert request.principal.principal_id == "agent:web" + assert request.action_spec.action == "browser.click" + assert request.state_evidence.state_hash.startswith("sha256:") + assert request.verification_evidence.signals[0].label == "has_heading" diff --git a/tests/unit/test_runtime_agent.py b/tests/unit/test_runtime_agent.py index 81dca3e..50eec36 100644 --- a/tests/unit/test_runtime_agent.py +++ b/tests/unit/test_runtime_agent.py @@ -278,6 +278,62 @@ async def fake_snapshot(**_kwargs): assert ended[0].error is None +@pytest.mark.asyncio +async def test_runtime_agent_pre_action_authority_denied_fail_closed() -> None: + backend = MockBackend() + tracer = MockTracer() + runtime = AgentRuntime(backend=backend, tracer=tracer) + snapshot = make_snapshot(url="https://example.com/start", elements=[make_clickable_element(1)]) + + async def fake_snapshot(**_kwargs): + runtime.last_snapshot = snapshot + return snapshot + + runtime.snapshot = AsyncMock(side_effect=fake_snapshot) # type: ignore[method-assign] + executor = ProviderStub(responses=["CLICK(1)"]) + agent = RuntimeAgent( + runtime=runtime, + executor=executor, + pre_action_authorizer=lambda _request: False, + authority_fail_closed=True, + ) + step = RuntimeStep(goal="Click denied", verifications=[], max_snapshot_attempts=1) + + with pytest.raises(RuntimeError, match="pre_action_authority_denied"): + await agent.run_step(task_goal="test", step=step) + assert len(backend.mouse_clicks) == 0 + + +@pytest.mark.asyncio +async def test_runtime_agent_pre_action_authority_error_fail_open() -> None: + backend = MockBackend() + tracer = MockTracer() + runtime = AgentRuntime(backend=backend, tracer=tracer) + snapshot = make_snapshot(url="https://example.com/start", elements=[make_clickable_element(1)]) + + async def fake_snapshot(**_kwargs): + runtime.last_snapshot = snapshot + return snapshot + + runtime.snapshot = AsyncMock(side_effect=fake_snapshot) # type: ignore[method-assign] + executor = ProviderStub(responses=["CLICK(1)"]) + + def _broken_authorizer(_request): + raise RuntimeError("authority_backend_unavailable") + + agent = RuntimeAgent( + runtime=runtime, + executor=executor, + pre_action_authorizer=_broken_authorizer, + authority_fail_closed=False, + ) + step = RuntimeStep(goal="Click fail-open", verifications=[], max_snapshot_attempts=1) + + ok = await agent.run_step(task_goal="test", step=step) + assert ok is True + assert len(backend.mouse_clicks) == 1 + + @pytest.mark.asyncio async def test_snapshot_limit_ramp_increases_limit_on_low_confidence() -> None: backend = MockBackend()