Skip to content
46 changes: 45 additions & 1 deletion docs/agent-payment-guard-service.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ python3 examples/agent-payment-guard/payment_guard_service.py \
python3 examples/agent-payment-guard/payment_guard_service.py --host 127.0.0.1 --port 8787
```

CLI flags (`--host`, `--port`, `--policy`, `--audit-path`) override the corresponding config file values when both are provided.
CLI flags (`--host`, `--port`, `--policy`, `--audit-path`, `--replay-store-path`) override the corresponding config file values when both are provided.

## Config file reference

Expand All @@ -29,6 +29,7 @@ CLI flags (`--host`, `--port`, `--policy`, `--audit-path`) override the correspo
"require_signed_intent": true,
"policy_path": "examples/agent-payment-guard/payment_policy.json",
"audit_path": ".proofpath/audit.jsonl",
"replay_store_path": ".proofpath/replay-store.json",
"service": {
"host": "127.0.0.1",
"port": 8787
Expand All @@ -47,6 +48,7 @@ CLI flags (`--host`, `--port`, `--policy`, `--audit-path`) override the correspo
| `require_signed_intent` | If `true`, requests without an `intent_envelope` are treated as strict and return `BLOCK / MISSING_INTENT_ENVELOPE`. Merged with per-request strictness; strictest wins. |
| `policy_path` | Path to the payment policy JSON file. |
| `audit_path` | Path to the JSONL audit log. |
| `replay_store_path` | Path to the local JSON replay store used for signed intent nonce replay protection. |
| `service.host` | Bind address. |
| `service.port` | Bind port. |
| `audit.hash_chain` | Enables hash-chained audit records (always on in current implementation). |
Expand Down Expand Up @@ -106,6 +108,48 @@ Shadow semantics:

Shadow mode always writes an audit record with actual decision and reason.

## Replay protection

Accepted signed intent envelopes persist their nonce into the local replay store:

```text
.proofpath/replay-store.json
```

A second attempt to use the same nonce returns:

```text
BLOCK / INTENT_REPLAYED
```

This replay state survives service restart because it is stored separately from process memory. The audit log still records both the original `ACCEPT` decision and the later replay `BLOCK` decision.

## Read replay store diagnostics

```bash
curl -sS http://127.0.0.1:8787/v1/replay-store | python3 -m json.tool
```

Example response:

```json
{
"nonces": 1,
"entries": {
"nonce_market_research_001": {
"nonce": "nonce_market_research_001",
"human_intent_id": "intent_market_research_001",
"agent_id": "agent_researcher_01",
"used_at": "2026-05-26T00:00:00Z",
"decision_hash": "sha256:...",
"status": "used"
}
}
}
```

This endpoint is local demo diagnostics only. It is not an authorization API.

## Read recent audit records

```bash
Expand Down
100 changes: 94 additions & 6 deletions examples/agent-payment-guard/payment_guard.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from typing import Any, Dict, Optional, Tuple

DEMO_SIGNATURE_SECRET = "proofpath-demo-secret-v0"
DEFAULT_REPLAY_STORE_PATH = Path(".proofpath/replay-store.json")


def load_json(path: Path) -> Dict[str, Any]:
Expand All @@ -28,6 +29,48 @@ def parse_ts(value: str) -> Optional[datetime]:
return None


def utc_now() -> str:
return datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z")


def load_replay_store(path: Path) -> Dict[str, Dict[str, Any]]:
if not path.exists():
return {}
payload = json.loads(path.read_text(encoding="utf-8"))
if not isinstance(payload, dict):
return {}
return {str(nonce): record for nonce, record in payload.items() if isinstance(record, dict)}


def save_replay_store(path: Path, store: Dict[str, Dict[str, Any]]) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
tmp_path = path.with_suffix(path.suffix + ".tmp")
tmp_path.write_text(json.dumps(store, sort_keys=True, indent=2) + "\n", encoding="utf-8")
tmp_path.replace(path)


def record_nonce(
path: Path,
nonce: str,
*,
human_intent_id: Optional[str],
agent_id: Optional[str],
decision_hash: Optional[str],
) -> None:
if not nonce:
return
store = load_replay_store(path)
store[nonce] = {
"nonce": nonce,
"human_intent_id": human_intent_id,
"agent_id": agent_id,
"used_at": utc_now(),
"decision_hash": decision_hash,
"status": "used",
}
save_replay_store(path, store)


def verify_intent_envelope(proposal: Dict[str, Any], envelope: Dict[str, Any], now: datetime) -> Tuple[bool, str]:
if envelope.get("signature_alg") != "demo-sha256-v0":
return False, "INVALID_INTENT_SIGNATURE"
Expand Down Expand Up @@ -72,8 +115,14 @@ def verify_intent_envelope(proposal: Dict[str, Any], envelope: Dict[str, Any], n
return True, "PAYMENT_WITHIN_SIGNED_INTENT_ENVELOPE"


def nonce_replayed(audit_path: Path, nonce: str) -> bool:
if not nonce or not audit_path.exists():
def nonce_replayed(audit_path: Path, nonce: str, replay_store_path: Optional[Path] = None) -> bool:
if not nonce:
return False

if replay_store_path is not None and replay_store_path.exists():
return nonce in load_replay_store(replay_store_path)

if not audit_path.exists():
return False
for line in audit_path.read_text(encoding="utf-8").splitlines():
if not line.strip():
Expand All @@ -95,7 +144,14 @@ def intent_load_error_meta(load_error: str) -> Dict[str, Any]:
}


def decide(proposal: Dict[str, Any], policy: Dict[str, Any], envelope: Optional[Dict[str, Any]], strict_mode: bool, audit_path: Path) -> Tuple[str, str, Dict[str, Any]]:
def decide(
proposal: Dict[str, Any],
policy: Dict[str, Any],
envelope: Optional[Dict[str, Any]],
strict_mode: bool,
audit_path: Path,
replay_store_path: Optional[Path] = None,
) -> Tuple[str, str, Dict[str, Any]]:
intent_meta: Dict[str, Any] = {
"intent_verified": False,
"intent_envelope_id": None,
Expand Down Expand Up @@ -155,7 +211,7 @@ def decide(proposal: Dict[str, Any], policy: Dict[str, Any], envelope: Optional[
"intent_nonce": envelope.get("nonce"),
}
)
if nonce_replayed(audit_path, str(envelope.get("nonce", ""))):
if nonce_replayed(audit_path, str(envelope.get("nonce", "")), replay_store_path):
return "BLOCK", "INTENT_REPLAYED", intent_meta
ok, reason = verify_intent_envelope(proposal, envelope, datetime.now(timezone.utc))
if not ok:
Expand Down Expand Up @@ -191,7 +247,7 @@ def append_audit(path: Path, proposal: Dict[str, Any], decision: str, reason: st
path.parent.mkdir(parents=True, exist_ok=True)
previous_hash = get_previous_hash(path)
record = {
"ts": datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z"),
"ts": utc_now(),
"surface": "agent-payment-guard",
"decision": decision,
"reason": reason,
Expand All @@ -209,12 +265,34 @@ def append_audit(path: Path, proposal: Dict[str, Any], decision: str, reason: st
handle.write(json.dumps(record, sort_keys=True) + "\n")


def persist_accepted_nonce_if_needed(
replay_store_path: Path,
proposal: Dict[str, Any],
decision: str,
intent_meta: Dict[str, Any],
decision_hash: str,
) -> None:
if decision != "ACCEPT" or not intent_meta.get("intent_verified"):
return
nonce = intent_meta.get("intent_nonce")
if not isinstance(nonce, str) or not nonce:
return
record_nonce(
replay_store_path,
nonce,
human_intent_id=intent_meta.get("intent_envelope_id"),
agent_id=proposal.get("agent_id"),
decision_hash=decision_hash,
)


def main() -> int:
parser = argparse.ArgumentParser()
parser.add_argument("proposal")
parser.add_argument("--policy")
parser.add_argument("--intent-envelope")
parser.add_argument("--require-intent-envelope", action="store_true")
parser.add_argument("--replay-store-path", default=str(DEFAULT_REPLAY_STORE_PATH))
args = parser.parse_args()

proposal_path = Path(args.proposal)
Expand All @@ -223,6 +301,7 @@ def main() -> int:
proposal = load_json(proposal_path)
policy = load_json(policy_path)
audit_path = Path(".proofpath/audit.jsonl")
replay_store_path = Path(args.replay_store_path)
envelope_ref = args.intent_envelope or proposal.get("intent_envelope")
envelope = None
if envelope_ref:
Expand All @@ -239,8 +318,17 @@ def main() -> int:
print(json.dumps({"decision": decision, "reason": reason}, separators=(",", ":")))
return 2

decision, reason, intent_meta = decide(proposal, policy, envelope, args.require_intent_envelope, audit_path)
decision, reason, intent_meta = decide(
proposal,
policy,
envelope,
args.require_intent_envelope,
audit_path,
replay_store_path,
)
append_audit(audit_path, proposal, decision, reason, intent_meta)
audit_hash = get_previous_hash(audit_path)
persist_accepted_nonce_if_needed(replay_store_path, proposal, decision, intent_meta, audit_hash)

print(json.dumps({"decision": decision, "reason": reason}, separators=(",", ":")))
return 0 if decision == "ACCEPT" else 2 if decision == "BLOCK" else 3
Expand Down
33 changes: 31 additions & 2 deletions examples/agent-payment-guard/payment_guard_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,22 @@
from typing import Any, Dict, List, Optional, Tuple
from urllib.parse import parse_qs, urlparse

from payment_guard import append_audit, decide, get_previous_hash, load_json
from payment_guard import (
append_audit,
decide,
get_previous_hash,
load_json,
load_replay_store,
persist_accepted_nonce_if_needed,
)


DEFAULT_HOST = "127.0.0.1"
DEFAULT_PORT = 8787
DEFAULT_MODE = "enforce"
DEFAULT_AUDIT_DEFAULT_LIMIT = 20
DEFAULT_AUDIT_MAX_LIMIT = 100
DEFAULT_REPLAY_STORE_PATH = ".proofpath/replay-store.json"


# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -52,6 +60,10 @@ def load_config(path: Path) -> Dict[str, Any]:
if not isinstance(audit_path, str) or not audit_path:
_fail("Config 'audit_path' must be a non-empty string")

replay_store_path = raw.get("replay_store_path", DEFAULT_REPLAY_STORE_PATH)
if not isinstance(replay_store_path, str) or not replay_store_path:
_fail("Config 'replay_store_path' must be a non-empty string")

service = raw.get("service", {})
if not isinstance(service, dict):
_fail("Config 'service' must be an object")
Expand Down Expand Up @@ -87,6 +99,7 @@ def load_config(path: Path) -> Dict[str, Any]:
"require_signed_intent": require_signed_intent,
"policy_path": policy_path,
"audit_path": audit_path,
"replay_store_path": replay_store_path,
"host": host,
"port": port,
"audit_default_limit": default_limit,
Expand Down Expand Up @@ -172,6 +185,12 @@ def do_GET(self) -> None: # noqa: N802
self._send_json(HTTPStatus.OK, {"records": records, "count": len(records), "limit": limit})
return

if parsed.path == "/v1/replay-store":
replay_store_path: Path = self.server.replay_store_path # type: ignore[attr-defined]
entries = load_replay_store(replay_store_path)
self._send_json(HTTPStatus.OK, {"nonces": len(entries), "entries": entries})
return

self._send_json(HTTPStatus.NOT_FOUND, {"error": "not found"})

def do_POST(self) -> None: # noqa: N802
Expand Down Expand Up @@ -212,15 +231,18 @@ def do_POST(self) -> None: # noqa: N802
request_strict = mode == "enforce"
strict_mode = config_strict or request_strict

replay_store_path: Path = self.server.replay_store_path # type: ignore[attr-defined]
decision, reason, intent_meta = decide(
proposal=proposal,
policy=self.server.policy, # type: ignore[attr-defined]
envelope=envelope,
strict_mode=strict_mode,
audit_path=self.server.audit_path, # type: ignore[attr-defined]
replay_store_path=replay_store_path,
)
append_audit(self.server.audit_path, proposal, decision, reason, intent_meta) # type: ignore[attr-defined]
audit_hash = get_previous_hash(self.server.audit_path) # type: ignore[attr-defined]
persist_accepted_nonce_if_needed(replay_store_path, proposal, decision, intent_meta, audit_hash)
execution_allowed, would_block = execution_flags(mode, decision)

self._send_json(
Expand Down Expand Up @@ -279,13 +301,15 @@ def main() -> int:
parser.add_argument("--port", type=int, default=None)
parser.add_argument("--policy", default=None)
parser.add_argument("--audit-path", default=None)
parser.add_argument("--replay-store-path", default=None)
args = parser.parse_args()

# Precedence: hardcoded defaults < config file < explicit CLI flags
host = DEFAULT_HOST
port = DEFAULT_PORT
policy_path = "examples/agent-payment-guard/payment_policy.json"
audit_path_str = ".proofpath/audit.jsonl"
replay_store_path_str = DEFAULT_REPLAY_STORE_PATH
config_mode = DEFAULT_MODE
require_signed_intent = False
audit_default_limit = DEFAULT_AUDIT_DEFAULT_LIMIT
Expand All @@ -297,6 +321,7 @@ def main() -> int:
port = cfg["port"]
policy_path = cfg["policy_path"]
audit_path_str = cfg["audit_path"]
replay_store_path_str = cfg["replay_store_path"]
config_mode = cfg["mode"]
require_signed_intent = cfg["require_signed_intent"]
audit_default_limit = cfg["audit_default_limit"]
Expand All @@ -311,11 +336,14 @@ def main() -> int:
policy_path = args.policy
if args.audit_path is not None:
audit_path_str = args.audit_path
if args.replay_store_path is not None:
replay_store_path_str = args.replay_store_path

policy = load_json(Path(policy_path))
server = ThreadingHTTPServer((host, port), PaymentGuardServiceHandler)
server.policy = policy
server.audit_path = Path(audit_path_str)
server.replay_store_path = Path(replay_store_path_str)
server.config_mode = config_mode
server.require_signed_intent = require_signed_intent
server.audit_default_limit = audit_default_limit
Expand All @@ -325,6 +353,7 @@ def main() -> int:
if args.config:
print(f" config: {args.config}")
print(f" mode: {config_mode} | require_signed_intent: {require_signed_intent}")
print(f" replay_store_path: {replay_store_path_str}")
try:
server.serve_forever()
except KeyboardInterrupt:
Expand All @@ -335,4 +364,4 @@ def main() -> int:


if __name__ == "__main__":
raise SystemExit(main())
raise SystemExit(main())
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"require_signed_intent": true,
"policy_path": "examples/agent-payment-guard/payment_policy.json",
"audit_path": ".proofpath/audit.jsonl",
"replay_store_path": ".proofpath/replay-store.json",
"service": {
"host": "127.0.0.1",
"port": 8787
Expand Down
Loading
Loading