From ee256878a73c55170235aeef957064255a349bea Mon Sep 17 00:00:00 2001 From: Imran Siddique Date: Wed, 1 Jul 2026 11:12:40 -0700 Subject: [PATCH] feat(peer): transport-agnostic inbound peer request handler Add ca2a_runtime.peer.handle_peer_request with PeerRequest / PeerResult. It composes the full inbound pipeline off a parsed request: verify the delegation chain, intersect the delegated scope with the local policy and enforce the requested capability, open any sealed payload with the enclave-bound key, and emit a linked provenance record. Fails closed on any verification or authorization failure, and never returns a payload it could not open. A transport (an A2A server) parses its wire format into a PeerRequest and calls this. cA2A does not define the transport itself, only what the peer does with a parsed request (profile, not protocol), so raw A2A wire parsing is left to implementers. Suite: 104 passed, 99% coverage. Closes #8 Co-Authored-By: Claude Opus 4.8 (1M context) --- CHANGELOG.md | 1 + docs/spec/call-graph.md | 5 +- src/ca2a_runtime/peer.py | 215 +++++++++++++++++++++----------- tests/unit/test_peer_request.py | 66 ++++++++++ 4 files changed, 212 insertions(+), 75 deletions(-) create mode 100644 tests/unit/test_peer_request.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 3069ab7..2ce1e27 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Peer-call enforcement decision core (Tier 2): `ca2a_runtime.policy.LocalPolicy` and `ca2a_runtime.peer` (`effective_scope`, `enforce_peer_call`). Effective permission is the delegated leaf scope intersected with the callee's local policy; a granted call emits a linked provenance record. New error `SCOPE_NOT_PERMITTED`. Claim C3 (scope-policy intersection) is now a validated experiment. Cedar-engine binding of the local policy and live A2A transport wiring remain open. - Sealed peer channel (Tier 2): `ca2a_runtime.channel` (`SealedChannel`, `generate_channel_keypair`, `open_sealed`). HPKE-style X25519 -> HKDF-SHA256 -> ChaCha20-Poly1305 sealing a payload to the peer's attested key; only the peer's private key opens it, and a wrong key or tampered ciphertext fails closed. Claim C4 (sealed-payload confidentiality) is now a validated experiment at the cryptographic layer. The enclave-binding of the private key (a hardware property) and live-path wiring remain open. - Cross-operator attestation (Claim C6) validated in software: a two-operator harness composing the SEV-SNP verifier, measurement pinning, and the sealed channel demonstrates independent keys, mutual attestation, confidential cross-operator delegation, and binary-swap detection. Synthetic report vectors (a genuine report needs SEV-SNP hardware); real hardware end to end remains open. **All six claims (C1-C6) are now validated experiments.** +- Transport-agnostic inbound peer request handler: `ca2a_runtime.peer.handle_peer_request` with `PeerRequest` / `PeerResult`. Composes the full pipeline (verify chain, intersect scope and enforce, open a sealed payload with the enclave key, emit a linked provenance record) fail-closed. A transport parses its wire format into a `PeerRequest`; cA2A does not define the transport (profile, not protocol). - RFC 8785 (JSON Canonicalization Scheme) canonicalization: `ca2a_runtime.canonical.canonicalize`. Credential and provenance bodies are now signed over the JCS encoding (UTF-16 key ordering, JCS string escaping, literal non-ASCII, shortest-decimal integers), so cA2A signatures are cross-verifiable with agent-manifest. ASCII credentials are byte-identical to the previous encoding, so existing signatures still verify. - Repository scaffold: governance, CI/CD, docs framework, and packaging at parity with the agentrust-io house standard diff --git a/docs/spec/call-graph.md b/docs/spec/call-graph.md index f60a0e0..084cb68 100644 --- a/docs/spec/call-graph.md +++ b/docs/spec/call-graph.md @@ -2,7 +2,7 @@ When a cA2A peer receives an inbound A2A task, it runs a fixed sequence of checks before it acts on the task and after it acts. The order is not arbitrary: cheap, offline, deterministic checks run first, and each step fails closed so a later step never runs against unverified input. This page states the full intended enforcement order and marks, for each step, what the code does today versus what is design. -Steps 1 (chain verification), 3 (scope intersection), and 5 (provenance emission) are implemented as a decision core in `ca2a_runtime.peer.enforce_peer_call`. Step 2 has a SEV-SNP verifier (`ca2a_verify.sev_snp`) but is not yet wired into the call path and needs real hardware to produce a report; step 4 (sealing) is not implemented. What remains for a live deployment is wiring the decision core to an actual inbound A2A transport, plus the attestation and sealing steps. See [LIMITATIONS.md](../../LIMITATIONS.md) and [ROADMAP.md](../../ROADMAP.md). +Steps 1 (chain verification), 3 (scope intersection), 4 (opening a sealed payload), and 5 (provenance emission) are composed into one transport-agnostic handler, `ca2a_runtime.peer.handle_peer_request`, which takes a parsed `PeerRequest` and runs the pipeline fail-closed. Step 2 has a SEV-SNP verifier (`ca2a_verify.sev_snp`), used counterparty-side to seal to a peer before sending (see the cross-operator flow), but is not part of the callee handler and needs real hardware to produce a report. What remains for a live deployment is a transport that parses actual A2A wire messages into a `PeerRequest`; cA2A leaves that to implementers by design (profile, not protocol). See [LIMITATIONS.md](../../LIMITATIONS.md) and [ROADMAP.md](../../ROADMAP.md). ## Decision flow @@ -39,7 +39,8 @@ If any step raises, the call is denied. Absence of evidence is denial, not a war | 3. Scope intersection | Delegated scope intersected with local policy | `ca2a_runtime.peer.effective_scope`, `enforce_peer_call` | Implemented (decision core); Cedar engine binding pending (#10) | | 4. Payload sealing | Payload sealed to the peer's attested key | `SealedChannel.seal`, `open_sealed` | Implemented (crypto); binding to a verified report on the live path pending | | 5. Provenance record | A `DelegationRecord` emitted and linked to its parent | `enforce_peer_call`, `record_for`, `verify_dag` | Implemented (emitted by the decision core) | -| Live A2A transport wiring | The decision core runs off an actual inbound A2A request | (design) | Pending, Tier 2 | +| Inbound pipeline handler | Verify, enforce, open sealed payload, emit record off a parsed request | `handle_peer_request`, `PeerRequest` | Implemented (transport-agnostic) | +| A2A wire parsing into a `PeerRequest` | Parse actual A2A extension fields into the handler's input | (implementer/transport) | Left to implementers by design | ## Step 1: verify the delegation chain (implemented) diff --git a/src/ca2a_runtime/peer.py b/src/ca2a_runtime/peer.py index cf9a9e2..8892d72 100644 --- a/src/ca2a_runtime/peer.py +++ b/src/ca2a_runtime/peer.py @@ -1,73 +1,142 @@ -"""Inbound peer-call enforcement: the decision the callee makes before it acts. - -When a peer presents a delegation chain and requests a capability, the callee: - -1. verifies the chain (signature, continuity, attenuation, depth, replay); -2. computes the effective scope as the leaf's delegated scope intersected with - the callee's local policy; -3. enforces: the requested capability must be in the effective scope; -4. emits a provenance record for the accepted hop, linked to its parent. - -This module is the enforcement decision core. Wiring it to a live A2A transport -(accepting the credential off an actual inbound request) is tracked separately; -attestation of the peer and sealing of the payload are Tier 2/3 and are not part -of this decision. -""" - -from __future__ import annotations - -from dataclasses import dataclass - -from ca2a_runtime.delegation.credential import DelegationCredential, verify_chain -from ca2a_runtime.errors import ScopeNotPermitted -from ca2a_runtime.policy import LocalPolicy -from ca2a_runtime.provenance import DelegationRecord, record_for - - -def effective_scope( - chain: list[DelegationCredential], policy: LocalPolicy, *, max_depth: int = 8 -) -> frozenset[str]: - """Verify the chain and return the effective scope (delegated ∩ local policy). - - Raises the relevant CA2AError if the chain does not verify. - """ - verify_chain(chain, max_depth=max_depth) - return policy.intersect(chain[-1].scope) - - -@dataclass(frozen=True) -class PeerDecision: - """The result of an accepted peer call.""" - - effective_scope: frozenset[str] - granted_capability: str - record: DelegationRecord - - -def enforce_peer_call( - chain: list[DelegationCredential], - requested_capability: str, - *, - policy: LocalPolicy, - record_id: str, - parent_record_hash: str | None = None, - max_depth: int = 8, -) -> PeerDecision: - """Verify, intersect with local policy, enforce, and emit a provenance record. - - Raises ScopeNotPermitted if the requested capability is not in the effective - scope, and the underlying CA2AError if the chain does not verify. On accept, - returns a PeerDecision carrying the linked provenance record. - """ - effective = effective_scope(chain, policy, max_depth=max_depth) - if requested_capability not in effective: - raise ScopeNotPermitted( - f"capability {requested_capability!r} is not in the effective scope", - detail=f"effective={sorted(effective)}", - ) - record = record_for(chain[-1], record_id=record_id, parent_record_hash=parent_record_hash) - return PeerDecision( - effective_scope=effective, - granted_capability=requested_capability, - record=record, - ) +"""Inbound peer-call enforcement: the decision the callee makes before it acts. + +When a peer presents a delegation chain and requests a capability, the callee: + +1. verifies the chain (signature, continuity, attenuation, depth, replay); +2. computes the effective scope as the leaf's delegated scope intersected with + the callee's local policy; +3. enforces: the requested capability must be in the effective scope; +4. emits a provenance record for the accepted hop, linked to its parent. + +`enforce_peer_call` is the enforcement decision core. `handle_peer_request` +composes it into the full transport-agnostic inbound pipeline: verify, enforce, +open any sealed payload with the enclave key, and emit a provenance record. A +transport (an A2A server) parses its wire format into a `PeerRequest` and calls +this; cA2A does not define the transport itself, only what the peer does with a +parsed request. +""" + +from __future__ import annotations + +from dataclasses import dataclass + +from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey + +from ca2a_runtime.channel import open_sealed +from ca2a_runtime.delegation.credential import DelegationCredential, verify_chain +from ca2a_runtime.errors import ScopeNotPermitted, SealedChannelError +from ca2a_runtime.policy import LocalPolicy +from ca2a_runtime.provenance import DelegationRecord, record_for + + +def effective_scope( + chain: list[DelegationCredential], policy: LocalPolicy, *, max_depth: int = 8 +) -> frozenset[str]: + """Verify the chain and return the effective scope (delegated ∩ local policy). + + Raises the relevant CA2AError if the chain does not verify. + """ + verify_chain(chain, max_depth=max_depth) + return policy.intersect(chain[-1].scope) + + +@dataclass(frozen=True) +class PeerDecision: + """The result of an accepted peer call.""" + + effective_scope: frozenset[str] + granted_capability: str + record: DelegationRecord + + +def enforce_peer_call( + chain: list[DelegationCredential], + requested_capability: str, + *, + policy: LocalPolicy, + record_id: str, + parent_record_hash: str | None = None, + max_depth: int = 8, +) -> PeerDecision: + """Verify, intersect with local policy, enforce, and emit a provenance record. + + Raises ScopeNotPermitted if the requested capability is not in the effective + scope, and the underlying CA2AError if the chain does not verify. On accept, + returns a PeerDecision carrying the linked provenance record. + """ + effective = effective_scope(chain, policy, max_depth=max_depth) + if requested_capability not in effective: + raise ScopeNotPermitted( + f"capability {requested_capability!r} is not in the effective scope", + detail=f"effective={sorted(effective)}", + ) + record = record_for(chain[-1], record_id=record_id, parent_record_hash=parent_record_hash) + return PeerDecision( + effective_scope=effective, + granted_capability=requested_capability, + record=record, + ) + + +@dataclass(frozen=True) +class PeerRequest: + """A transport-agnostic inbound peer request. + + A transport (an A2A server) parses its wire format into this shape and hands + it to ``handle_peer_request``. cA2A does not define the transport; it defines + what a peer does with the request once parsed. + """ + + chain: list[DelegationCredential] + requested_capability: str + record_id: str + sealed_payload: bytes | None = None + parent_record_hash: str | None = None + + +@dataclass(frozen=True) +class PeerResult: + """The outcome of handling an accepted peer request.""" + + effective_scope: frozenset[str] + granted_capability: str + record: DelegationRecord + payload: bytes | None + + +def handle_peer_request( + request: PeerRequest, + *, + policy: LocalPolicy, + enclave_private_key: X25519PrivateKey | None = None, + max_depth: int = 8, +) -> PeerResult: + """Run the full inbound pipeline for a parsed peer request. + + Verifies the delegation chain, intersects the delegated scope with the local + policy and enforces the requested capability, opens any sealed payload with + the enclave-bound key, and emits a linked provenance record. Fails closed: + any verification or authorization failure raises the relevant CA2AError and + no payload is returned. + """ + decision = enforce_peer_call( + request.chain, + request.requested_capability, + policy=policy, + record_id=request.record_id, + parent_record_hash=request.parent_record_hash, + max_depth=max_depth, + ) + + payload: bytes | None = None + if request.sealed_payload is not None: + if enclave_private_key is None: + raise SealedChannelError("a sealed payload was sent but no enclave key is available") + payload = open_sealed(request.sealed_payload, enclave_private_key) + + return PeerResult( + effective_scope=decision.effective_scope, + granted_capability=decision.granted_capability, + record=decision.record, + payload=payload, + ) diff --git a/tests/unit/test_peer_request.py b/tests/unit/test_peer_request.py new file mode 100644 index 0000000..213855d --- /dev/null +++ b/tests/unit/test_peer_request.py @@ -0,0 +1,66 @@ +"""Tests for the transport-agnostic inbound peer request handler.""" + +from __future__ import annotations + +import pytest + +from ca2a_runtime.channel import SealedChannel, generate_channel_keypair +from ca2a_runtime.errors import ScopeEscalation, ScopeNotPermitted, SealedChannelError +from ca2a_runtime.peer import PeerRequest, PeerResult, handle_peer_request +from ca2a_runtime.policy import LocalPolicy +from ca2a_runtime.provenance import verify_dag +from tests.unit.conftest import build_chain + + +def _chain(): + return build_chain([frozenset({"read", "write", "admin"}), frozenset({"read", "write"})]) + + +def test_handles_request_without_payload() -> None: + req = PeerRequest(chain=_chain(), requested_capability="read", record_id="rec-0") + result = handle_peer_request(req, policy=LocalPolicy.of(["read", "audit"])) + assert isinstance(result, PeerResult) + assert result.granted_capability == "read" + assert result.effective_scope == frozenset({"read"}) + assert result.payload is None + assert verify_dag([result.record]) == [result.record] + + +def test_handles_request_with_sealed_payload() -> None: + priv, pub = generate_channel_keypair() + payload = b"do the thing" + req = PeerRequest( + chain=_chain(), requested_capability="read", record_id="rec-0", + sealed_payload=SealedChannel(pub).seal(payload), + ) + result = handle_peer_request( + req, policy=LocalPolicy.of(["read"]), enclave_private_key=priv + ) + assert result.payload == payload + + +def test_denied_capability_raises_before_payload() -> None: + priv, pub = generate_channel_keypair() + req = PeerRequest( + chain=_chain(), requested_capability="admin", record_id="rec-0", + sealed_payload=SealedChannel(pub).seal(b"secret"), + ) + with pytest.raises(ScopeNotPermitted): + handle_peer_request(req, policy=LocalPolicy.of(["read"]), enclave_private_key=priv) + + +def test_sealed_payload_without_key_fails_closed() -> None: + _, pub = generate_channel_keypair() + req = PeerRequest( + chain=_chain(), requested_capability="read", record_id="rec-0", + sealed_payload=SealedChannel(pub).seal(b"secret"), + ) + with pytest.raises(SealedChannelError): + handle_peer_request(req, policy=LocalPolicy.of(["read"])) # no enclave key + + +def test_invalid_chain_rejected() -> None: + bad = build_chain([frozenset({"read"}), frozenset({"read", "write"})]) # escalation + req = PeerRequest(chain=bad, requested_capability="read", record_id="rec-0") + with pytest.raises(ScopeEscalation): + handle_peer_request(req, policy=LocalPolicy.of(["read", "write"]))