diff --git a/CHANGELOG.md b/CHANGELOG.md index 234cc1c..d0206a4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - cA2A-compatible conformance suite: `tests/conformance/` with a normative README (stable MUST/SHOULD test IDs across delegation, scope-policy, attestation, sealed channel, provenance, and the inbound pipeline) and runnable checks that exercise every MUST-level requirement. Wired into CI and documented at `docs/spec/conformance.md`; ties to the CHARTER trademark language. - TPM 2.0 attestation backend: `ca2a_runtime.tee.tpm` (TPMS_ATTEST parsing, `TpmProvider`) and `ca2a_verify.tpm.verify_tpm_quote` (AK chain to a caller-supplied vendor root, AK signature over the attest blob (ECDSA or RSA), magic/type checks, and qualifying-data/PCR-digest binding), all fail-closed. Synthetic-vector validated; TPM AK roots are per-vendor so the caller supplies its trusted roots. Quote generation requires a real TPM. - Intel TDX attestation backend: `ca2a_runtime.tee.tdx` (DCAP Quote v4 parsing, `TdxProvider`) and `ca2a_verify.tdx.verify_tdx_quote` (PCK chain to a trusted Intel root, QE report signature, attestation-key binding, quote signature, and MRTD/report-data binding), all fail-closed. Chain path validated against the genuine Intel SGX Root CA; multi-level signature path validated with a synthetic self-consistent quote. Quote generation requires a real TDX guest. +- Real Cedar policy engine binding: `ca2a_runtime.cedar.CedarPolicy` (backed by `cedarpy`, the engine cMCP runs) evaluates each capability as a Cedar authorization request. A new `ca2a_runtime.policy.Policy` protocol makes `LocalPolicy` (allow set) and `CedarPolicy` interchangeable in the peer path. Adds the `cedarpy` dependency. - Transport-agnostic inbound peer request handler: `ca2a_runtime.peer.handle_peer_request` with `PeerRequest` / `PeerResult`. Composes the full pipeline (verify chain, intersect scope and enforce, open a sealed payload with the enclave key, emit a linked provenance record) fail-closed. A transport parses its wire format into a `PeerRequest`; cA2A does not define the transport (profile, not protocol). - RFC 8785 (JSON Canonicalization Scheme) canonicalization: `ca2a_runtime.canonical.canonicalize`. Credential and provenance bodies are now signed over the JCS encoding (UTF-16 key ordering, JCS string escaping, literal non-ASCII, shortest-decimal integers), so cA2A signatures are cross-verifiable with agent-manifest. ASCII credentials are byte-identical to the previous encoding, so existing signatures still verify. - Repository scaffold: governance, CI/CD, docs framework, and packaging at parity with the agentrust-io house standard diff --git a/ROADMAP.md b/ROADMAP.md index 26578df..af79966 100644 --- a/ROADMAP.md +++ b/ROADMAP.md @@ -22,7 +22,7 @@ Already implemented and tested elsewhere; cA2A depends on it rather than reimple ## v0.2: Runtime enforcement and sealed channel -- Runtime peer-delegation enforcement: **decision core landed** (`ca2a_runtime.peer.enforce_peer_call`: verify chain, intersect delegated scope with local policy, enforce, emit provenance record; claim C3 validated). Remaining: bind a Cedar policy engine as the local policy, and wire the decision core to a live A2A transport (Tier 2) +- Runtime peer-delegation enforcement: **decision core landed** (`ca2a_runtime.peer.enforce_peer_call`: verify chain, intersect delegated scope with local policy, enforce, emit provenance record; claim C3 validated), now with a **real Cedar policy engine** option (`ca2a_runtime.cedar.CedarPolicy`) alongside the allow-set `LocalPolicy`. Remaining: wire the decision core to a live A2A transport (Tier 2) - Sealed peer channel: **landed** (`ca2a_runtime.channel`: HPKE-style X25519 -> HKDF-SHA256 -> ChaCha20-Poly1305 sealing to the peer's attested key; claim C4 validated). Remaining: bind the seal to a verified attestation report on a live call, and rely on the enclave to hold the private key (hardware property) - Linked runtime evidence: each hop's TRACE record references the parent record hash and delegation credential id, producing a verifiable delegation DAG (Tier 2) diff --git a/docs/spec/cedar-policy.md b/docs/spec/cedar-policy.md index b47560a..c77dfd9 100644 --- a/docs/spec/cedar-policy.md +++ b/docs/spec/cedar-policy.md @@ -6,7 +6,19 @@ When a peer accepts a delegated task, two independent trust decisions meet. The effective = delegated_scope ∩ local_policy_allow ``` -**Status.** The intersection semantics are **implemented** as an enforcement decision core in `ca2a_runtime.peer` (`effective_scope`, `enforce_peer_call`) against a `ca2a_runtime.policy.LocalPolicy` capability allow set, and validated by experiment C3. Two pieces remain: binding a full **Cedar policy engine** as the local policy (this page's title; the semantics are policy-language-agnostic and the allow-set model stands in today, tracked as #10), and wiring the decision core to a **live A2A transport** rather than a direct call. See [call-graph.md](call-graph.md), [ROADMAP.md](../../ROADMAP.md), and [LIMITATIONS.md](../../LIMITATIONS.md). +**Status.** The intersection semantics are **implemented** as an enforcement decision core in `ca2a_runtime.peer` (`effective_scope`, `enforce_peer_call`), and the local policy can be either a capability allow set (`ca2a_runtime.policy.LocalPolicy`) or a **real Cedar policy engine** (`ca2a_runtime.cedar.CedarPolicy`, backed by `cedarpy`, the same engine cMCP runs). Both satisfy the `ca2a_runtime.policy.Policy` protocol, so they are interchangeable in the peer path. Validated by experiment C3 and the Cedar unit tests. What remains is wiring the decision core to a **live A2A transport** rather than a direct call. See [call-graph.md](call-graph.md) and [ROADMAP.md](../../ROADMAP.md). + +## Cedar policy + +`CedarPolicy` evaluates each capability as a Cedar authorization request whose action id is the capability name; a capability is permitted iff Cedar returns `Allow`. The effective scope is the delegated leaf scope intersected with the capabilities Cedar permits. + +```python +from ca2a_runtime.cedar import CedarPolicy +from ca2a_runtime.peer import effective_scope + +policy = CedarPolicy('permit(principal, action == Action::"read", resource);') +effective_scope(chain, policy) # delegated leaf scope AND what Cedar allows +``` ## Why an intersection diff --git a/pyproject.toml b/pyproject.toml index aba9592..be975c2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -26,6 +26,7 @@ requires-python = ">=3.11" dependencies = [ "cryptography>=42.0", "pyyaml>=6.0", + "cedarpy>=4.8", ] [project.optional-dependencies] @@ -91,5 +92,5 @@ warn_return_any = false files = ["src/ca2a_runtime", "src/ca2a_verify"] [[tool.mypy.overrides]] -module = ["ca2a_runtime.tee.sev_snp", "ca2a_runtime.tee.tdx", "ca2a_runtime.tee.tpm", "ca2a_verify.tdx", "ca2a_verify.tpm", "ca2a_runtime.channel.sealed"] +module = ["ca2a_runtime.tee.sev_snp", "ca2a_runtime.tee.tdx", "ca2a_runtime.tee.tpm", "ca2a_runtime.cedar", "ca2a_verify.tdx", "ca2a_verify.tpm", "ca2a_runtime.channel.sealed"] warn_unused_ignores = false diff --git a/src/ca2a_runtime/cedar.py b/src/ca2a_runtime/cedar.py new file mode 100644 index 0000000..e932524 --- /dev/null +++ b/src/ca2a_runtime/cedar.py @@ -0,0 +1,46 @@ +"""A local policy backed by a real Cedar policy engine. + +`CedarPolicy` evaluates the callee's Cedar policy to decide which capabilities a +peer may exercise. It satisfies the `ca2a_runtime.policy.Policy` protocol, so it +is a drop-in for `LocalPolicy` in the peer path: the effective scope on an +inbound call is the delegated leaf scope intersected with what Cedar permits. + +Each capability is evaluated as a Cedar authorization request whose action id is +the capability name; a capability is permitted iff Cedar returns Allow. This +reuses the same policy engine cMCP runs (see docs/spec/cedar-policy.md). +""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import Any + +from cedarpy import Decision, is_authorized + + +@dataclass(frozen=True) +class CedarPolicy: + """A local policy backed by a Cedar policy set.""" + + policies: str + principal_type: str = "Agent" + principal_id: str = "peer" + resource_type: str = "Task" + resource_id: str = "task" + + def _request(self, capability: str) -> dict[str, Any]: + return { + "principal": {"type": self.principal_type, "id": self.principal_id}, + "action": {"type": "Action", "id": capability}, + "resource": {"type": self.resource_type, "id": self.resource_id}, + "context": {}, + } + + def permits(self, capability: str) -> bool: + """Return True iff Cedar authorizes an action of this capability's name.""" + result = is_authorized(self._request(capability), self.policies, []) + return bool(result.decision == Decision.Allow) + + def intersect(self, delegated: frozenset[str]) -> frozenset[str]: + """Return the effective scope: delegated capabilities Cedar permits.""" + return frozenset(cap for cap in delegated if self.permits(cap)) diff --git a/src/ca2a_runtime/peer.py b/src/ca2a_runtime/peer.py index 8892d72..addda54 100644 --- a/src/ca2a_runtime/peer.py +++ b/src/ca2a_runtime/peer.py @@ -1,142 +1,142 @@ -"""Inbound peer-call enforcement: the decision the callee makes before it acts. - -When a peer presents a delegation chain and requests a capability, the callee: - -1. verifies the chain (signature, continuity, attenuation, depth, replay); -2. computes the effective scope as the leaf's delegated scope intersected with - the callee's local policy; -3. enforces: the requested capability must be in the effective scope; -4. emits a provenance record for the accepted hop, linked to its parent. - -`enforce_peer_call` is the enforcement decision core. `handle_peer_request` -composes it into the full transport-agnostic inbound pipeline: verify, enforce, -open any sealed payload with the enclave key, and emit a provenance record. A -transport (an A2A server) parses its wire format into a `PeerRequest` and calls -this; cA2A does not define the transport itself, only what the peer does with a -parsed request. -""" - -from __future__ import annotations - -from dataclasses import dataclass - -from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey - -from ca2a_runtime.channel import open_sealed -from ca2a_runtime.delegation.credential import DelegationCredential, verify_chain -from ca2a_runtime.errors import ScopeNotPermitted, SealedChannelError -from ca2a_runtime.policy import LocalPolicy -from ca2a_runtime.provenance import DelegationRecord, record_for - - -def effective_scope( - chain: list[DelegationCredential], policy: LocalPolicy, *, max_depth: int = 8 -) -> frozenset[str]: - """Verify the chain and return the effective scope (delegated ∩ local policy). - - Raises the relevant CA2AError if the chain does not verify. - """ - verify_chain(chain, max_depth=max_depth) - return policy.intersect(chain[-1].scope) - - -@dataclass(frozen=True) -class PeerDecision: - """The result of an accepted peer call.""" - - effective_scope: frozenset[str] - granted_capability: str - record: DelegationRecord - - -def enforce_peer_call( - chain: list[DelegationCredential], - requested_capability: str, - *, - policy: LocalPolicy, - record_id: str, - parent_record_hash: str | None = None, - max_depth: int = 8, -) -> PeerDecision: - """Verify, intersect with local policy, enforce, and emit a provenance record. - - Raises ScopeNotPermitted if the requested capability is not in the effective - scope, and the underlying CA2AError if the chain does not verify. On accept, - returns a PeerDecision carrying the linked provenance record. - """ - effective = effective_scope(chain, policy, max_depth=max_depth) - if requested_capability not in effective: - raise ScopeNotPermitted( - f"capability {requested_capability!r} is not in the effective scope", - detail=f"effective={sorted(effective)}", - ) - record = record_for(chain[-1], record_id=record_id, parent_record_hash=parent_record_hash) - return PeerDecision( - effective_scope=effective, - granted_capability=requested_capability, - record=record, - ) - - -@dataclass(frozen=True) -class PeerRequest: - """A transport-agnostic inbound peer request. - - A transport (an A2A server) parses its wire format into this shape and hands - it to ``handle_peer_request``. cA2A does not define the transport; it defines - what a peer does with the request once parsed. - """ - - chain: list[DelegationCredential] - requested_capability: str - record_id: str - sealed_payload: bytes | None = None - parent_record_hash: str | None = None - - -@dataclass(frozen=True) -class PeerResult: - """The outcome of handling an accepted peer request.""" - - effective_scope: frozenset[str] - granted_capability: str - record: DelegationRecord - payload: bytes | None - - -def handle_peer_request( - request: PeerRequest, - *, - policy: LocalPolicy, - enclave_private_key: X25519PrivateKey | None = None, - max_depth: int = 8, -) -> PeerResult: - """Run the full inbound pipeline for a parsed peer request. - - Verifies the delegation chain, intersects the delegated scope with the local - policy and enforces the requested capability, opens any sealed payload with - the enclave-bound key, and emits a linked provenance record. Fails closed: - any verification or authorization failure raises the relevant CA2AError and - no payload is returned. - """ - decision = enforce_peer_call( - request.chain, - request.requested_capability, - policy=policy, - record_id=request.record_id, - parent_record_hash=request.parent_record_hash, - max_depth=max_depth, - ) - - payload: bytes | None = None - if request.sealed_payload is not None: - if enclave_private_key is None: - raise SealedChannelError("a sealed payload was sent but no enclave key is available") - payload = open_sealed(request.sealed_payload, enclave_private_key) - - return PeerResult( - effective_scope=decision.effective_scope, - granted_capability=decision.granted_capability, - record=decision.record, - payload=payload, - ) +"""Inbound peer-call enforcement: the decision the callee makes before it acts. + +When a peer presents a delegation chain and requests a capability, the callee: + +1. verifies the chain (signature, continuity, attenuation, depth, replay); +2. computes the effective scope as the leaf's delegated scope intersected with + the callee's local policy; +3. enforces: the requested capability must be in the effective scope; +4. emits a provenance record for the accepted hop, linked to its parent. + +`enforce_peer_call` is the enforcement decision core. `handle_peer_request` +composes it into the full transport-agnostic inbound pipeline: verify, enforce, +open any sealed payload with the enclave key, and emit a provenance record. A +transport (an A2A server) parses its wire format into a `PeerRequest` and calls +this; cA2A does not define the transport itself, only what the peer does with a +parsed request. +""" + +from __future__ import annotations + +from dataclasses import dataclass + +from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey + +from ca2a_runtime.channel import open_sealed +from ca2a_runtime.delegation.credential import DelegationCredential, verify_chain +from ca2a_runtime.errors import ScopeNotPermitted, SealedChannelError +from ca2a_runtime.policy import Policy +from ca2a_runtime.provenance import DelegationRecord, record_for + + +def effective_scope( + chain: list[DelegationCredential], policy: Policy, *, max_depth: int = 8 +) -> frozenset[str]: + """Verify the chain and return the effective scope (delegated ∩ local policy). + + Raises the relevant CA2AError if the chain does not verify. + """ + verify_chain(chain, max_depth=max_depth) + return policy.intersect(chain[-1].scope) + + +@dataclass(frozen=True) +class PeerDecision: + """The result of an accepted peer call.""" + + effective_scope: frozenset[str] + granted_capability: str + record: DelegationRecord + + +def enforce_peer_call( + chain: list[DelegationCredential], + requested_capability: str, + *, + policy: Policy, + record_id: str, + parent_record_hash: str | None = None, + max_depth: int = 8, +) -> PeerDecision: + """Verify, intersect with local policy, enforce, and emit a provenance record. + + Raises ScopeNotPermitted if the requested capability is not in the effective + scope, and the underlying CA2AError if the chain does not verify. On accept, + returns a PeerDecision carrying the linked provenance record. + """ + effective = effective_scope(chain, policy, max_depth=max_depth) + if requested_capability not in effective: + raise ScopeNotPermitted( + f"capability {requested_capability!r} is not in the effective scope", + detail=f"effective={sorted(effective)}", + ) + record = record_for(chain[-1], record_id=record_id, parent_record_hash=parent_record_hash) + return PeerDecision( + effective_scope=effective, + granted_capability=requested_capability, + record=record, + ) + + +@dataclass(frozen=True) +class PeerRequest: + """A transport-agnostic inbound peer request. + + A transport (an A2A server) parses its wire format into this shape and hands + it to ``handle_peer_request``. cA2A does not define the transport; it defines + what a peer does with the request once parsed. + """ + + chain: list[DelegationCredential] + requested_capability: str + record_id: str + sealed_payload: bytes | None = None + parent_record_hash: str | None = None + + +@dataclass(frozen=True) +class PeerResult: + """The outcome of handling an accepted peer request.""" + + effective_scope: frozenset[str] + granted_capability: str + record: DelegationRecord + payload: bytes | None + + +def handle_peer_request( + request: PeerRequest, + *, + policy: Policy, + enclave_private_key: X25519PrivateKey | None = None, + max_depth: int = 8, +) -> PeerResult: + """Run the full inbound pipeline for a parsed peer request. + + Verifies the delegation chain, intersects the delegated scope with the local + policy and enforces the requested capability, opens any sealed payload with + the enclave-bound key, and emits a linked provenance record. Fails closed: + any verification or authorization failure raises the relevant CA2AError and + no payload is returned. + """ + decision = enforce_peer_call( + request.chain, + request.requested_capability, + policy=policy, + record_id=request.record_id, + parent_record_hash=request.parent_record_hash, + max_depth=max_depth, + ) + + payload: bytes | None = None + if request.sealed_payload is not None: + if enclave_private_key is None: + raise SealedChannelError("a sealed payload was sent but no enclave key is available") + payload = open_sealed(request.sealed_payload, enclave_private_key) + + return PeerResult( + effective_scope=decision.effective_scope, + granted_capability=decision.granted_capability, + record=decision.record, + payload=payload, + ) diff --git a/src/ca2a_runtime/policy.py b/src/ca2a_runtime/policy.py index 3f04268..7026d45 100644 --- a/src/ca2a_runtime/policy.py +++ b/src/ca2a_runtime/policy.py @@ -1,34 +1,48 @@ -"""Local peer policy for scope intersection. - -A callee constrains what an inbound peer may do with a set of allowed -capabilities. The effective permission on a peer call is the delegated scope -intersected with this allow set, so a peer can never exercise more than both -its grant and the callee's local policy permit. - -The intersection semantics here are policy-language-agnostic. Binding a full -Cedar policy engine (as cMCP does) is tracked separately; this allow-set model -is the enforcement primitive the peer path uses today. -""" - -from __future__ import annotations - -from collections.abc import Iterable -from dataclasses import dataclass - - -@dataclass(frozen=True) -class LocalPolicy: - """A callee's local capability allow set.""" - - allow: frozenset[str] - - @classmethod - def of(cls, caps: Iterable[str]) -> LocalPolicy: - return cls(allow=frozenset(caps)) - - def intersect(self, delegated: frozenset[str]) -> frozenset[str]: - """Return the effective scope: delegated capabilities this policy allows.""" - return delegated & self.allow - - def permits(self, capability: str) -> bool: - return capability in self.allow +"""Local peer policy for scope intersection. + +A callee constrains what an inbound peer may do with a set of allowed +capabilities. The effective permission on a peer call is the delegated scope +intersected with this allow set, so a peer can never exercise more than both +its grant and the callee's local policy permit. + +The intersection semantics here are policy-language-agnostic. Binding a full +Cedar policy engine (as cMCP does) is tracked separately; this allow-set model +is the enforcement primitive the peer path uses today. +""" + +from __future__ import annotations + +from collections.abc import Iterable +from dataclasses import dataclass +from typing import Protocol, runtime_checkable + + +@runtime_checkable +class Policy(Protocol): + """A local policy the peer path intersects a delegated scope against. + + Implemented by ``LocalPolicy`` (a capability allow set) and by + ``ca2a_runtime.cedar.CedarPolicy`` (a real Cedar policy engine). + """ + + def intersect(self, delegated: frozenset[str]) -> frozenset[str]: + """Return the effective scope: the delegated capabilities this policy allows.""" + ... + + +@dataclass(frozen=True) +class LocalPolicy: + """A callee's local capability allow set.""" + + allow: frozenset[str] + + @classmethod + def of(cls, caps: Iterable[str]) -> LocalPolicy: + return cls(allow=frozenset(caps)) + + def intersect(self, delegated: frozenset[str]) -> frozenset[str]: + """Return the effective scope: delegated capabilities this policy allows.""" + return delegated & self.allow + + def permits(self, capability: str) -> bool: + return capability in self.allow diff --git a/tests/unit/test_cedar.py b/tests/unit/test_cedar.py new file mode 100644 index 0000000..afbe71a --- /dev/null +++ b/tests/unit/test_cedar.py @@ -0,0 +1,57 @@ +"""Tests for the Cedar-backed local policy.""" + +from __future__ import annotations + +import pytest + +from ca2a_runtime.cedar import CedarPolicy +from ca2a_runtime.errors import ScopeNotPermitted +from ca2a_runtime.peer import effective_scope, enforce_peer_call +from ca2a_runtime.policy import Policy +from tests.unit.conftest import build_chain + +POLICIES = ( + 'permit(principal, action == Action::"read", resource);\n' + 'permit(principal, action == Action::"write", resource);\n' +) + + +def _chain(): + return build_chain([frozenset({"read", "write", "admin"}), frozenset({"read", "write"})]) + + +def test_cedar_policy_satisfies_protocol() -> None: + assert isinstance(CedarPolicy(POLICIES), Policy) + + +def test_permits_reflects_cedar_decision() -> None: + p = CedarPolicy(POLICIES) + assert p.permits("read") and p.permits("write") + assert not p.permits("admin") + + +def test_effective_scope_uses_cedar() -> None: + # leaf delegated {read, write}; Cedar allows {read, write}; effective {read, write}. + assert effective_scope(_chain(), CedarPolicy(POLICIES)) == frozenset({"read", "write"}) + + +def test_effective_scope_narrows_to_cedar_allow() -> None: + read_only = CedarPolicy('permit(principal, action == Action::"read", resource);') + assert effective_scope(_chain(), read_only) == frozenset({"read"}) + + +def test_enforce_denies_capability_cedar_forbids() -> None: + read_only = CedarPolicy('permit(principal, action == Action::"read", resource);') + with pytest.raises(ScopeNotPermitted): + enforce_peer_call(_chain(), "write", policy=read_only, record_id="r0") + + +def test_enforce_grants_capability_cedar_allows() -> None: + decision = enforce_peer_call(_chain(), "read", policy=CedarPolicy(POLICIES), record_id="r0") + assert decision.granted_capability == "read" + + +def test_capability_with_colon() -> None: + p = CedarPolicy('permit(principal, action == Action::"tool:read", resource);') + assert p.permits("tool:read") + assert not p.permits("tool:write")