diff --git a/docs/agent-payment-guard-service.md b/docs/agent-payment-guard-service.md index 2e367f0..201b86c 100644 --- a/docs/agent-payment-guard-service.md +++ b/docs/agent-payment-guard-service.md @@ -17,7 +17,7 @@ python3 examples/agent-payment-guard/payment_guard_service.py \ python3 examples/agent-payment-guard/payment_guard_service.py --host 127.0.0.1 --port 8787 ``` -CLI flags (`--host`, `--port`, `--policy`, `--audit-path`) override the corresponding config file values when both are provided. +CLI flags (`--host`, `--port`, `--policy`, `--audit-path`, `--replay-store-path`) override the corresponding config file values when both are provided. ## Config file reference @@ -29,6 +29,7 @@ CLI flags (`--host`, `--port`, `--policy`, `--audit-path`) override the correspo "require_signed_intent": true, "policy_path": "examples/agent-payment-guard/payment_policy.json", "audit_path": ".proofpath/audit.jsonl", + "replay_store_path": ".proofpath/replay-store.json", "service": { "host": "127.0.0.1", "port": 8787 @@ -47,6 +48,7 @@ CLI flags (`--host`, `--port`, `--policy`, `--audit-path`) override the correspo | `require_signed_intent` | If `true`, requests without an `intent_envelope` are treated as strict and return `BLOCK / MISSING_INTENT_ENVELOPE`. Merged with per-request strictness; strictest wins. | | `policy_path` | Path to the payment policy JSON file. | | `audit_path` | Path to the JSONL audit log. | +| `replay_store_path` | Path to the local JSON replay store used for signed intent nonce replay protection. | | `service.host` | Bind address. | | `service.port` | Bind port. | | `audit.hash_chain` | Enables hash-chained audit records (always on in current implementation). | @@ -106,6 +108,48 @@ Shadow semantics: Shadow mode always writes an audit record with actual decision and reason. +## Replay protection + +Accepted signed intent envelopes persist their nonce into the local replay store: + +```text +.proofpath/replay-store.json +``` + +A second attempt to use the same nonce returns: + +```text +BLOCK / INTENT_REPLAYED +``` + +This replay state survives service restart because it is stored separately from process memory. The audit log still records both the original `ACCEPT` decision and the later replay `BLOCK` decision. + +## Read replay store diagnostics + +```bash +curl -sS http://127.0.0.1:8787/v1/replay-store | python3 -m json.tool +``` + +Example response: + +```json +{ + "nonces": 1, + "entries": { + "nonce_market_research_001": { + "nonce": "nonce_market_research_001", + "human_intent_id": "intent_market_research_001", + "agent_id": "agent_researcher_01", + "used_at": "2026-05-26T00:00:00Z", + "decision_hash": "sha256:...", + "status": "used" + } + } +} +``` + +This endpoint is local demo diagnostics only. It is not an authorization API. + ## Read recent audit records ```bash diff --git a/examples/agent-payment-guard/payment_guard.py b/examples/agent-payment-guard/payment_guard.py index e841ff4..e316a9e 100755 --- a/examples/agent-payment-guard/payment_guard.py +++ b/examples/agent-payment-guard/payment_guard.py @@ -11,6 +11,7 @@ from typing import Any, Dict, Optional, Tuple DEMO_SIGNATURE_SECRET = "proofpath-demo-secret-v0" +DEFAULT_REPLAY_STORE_PATH = Path(".proofpath/replay-store.json") def load_json(path: Path) -> Dict[str, Any]: @@ -28,6 +29,48 @@ def parse_ts(value: str) -> Optional[datetime]: return None +def utc_now() -> str: + return datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z") + + +def load_replay_store(path: Path) -> Dict[str, Dict[str, Any]]: + if not path.exists(): + return {} + payload = json.loads(path.read_text(encoding="utf-8")) + if not isinstance(payload, dict): + return {} + return {str(nonce): record for nonce, record in payload.items() if isinstance(record, dict)} + + +def save_replay_store(path: Path, store: Dict[str, Dict[str, Any]]) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + tmp_path = path.with_suffix(path.suffix + ".tmp") + tmp_path.write_text(json.dumps(store, sort_keys=True, indent=2) + "\n", encoding="utf-8") + tmp_path.replace(path) + + +def record_nonce( + path: Path, + nonce: str, + *, + human_intent_id: Optional[str], + agent_id: Optional[str], + decision_hash: Optional[str], +) -> None: + if not nonce: + return + store = load_replay_store(path) + store[nonce] = { + "nonce": nonce, + "human_intent_id": human_intent_id, + "agent_id": agent_id, + "used_at": utc_now(), + "decision_hash": decision_hash, + "status": "used", + } + save_replay_store(path, store) + + def verify_intent_envelope(proposal: Dict[str, Any], envelope: Dict[str, Any], now: datetime) -> Tuple[bool, str]: if envelope.get("signature_alg") != "demo-sha256-v0": return False, "INVALID_INTENT_SIGNATURE" @@ -72,8 +115,14 @@ def verify_intent_envelope(proposal: Dict[str, Any], envelope: Dict[str, Any], n return True, "PAYMENT_WITHIN_SIGNED_INTENT_ENVELOPE" -def nonce_replayed(audit_path: Path, nonce: str) -> bool: - if not nonce or not audit_path.exists(): +def nonce_replayed(audit_path: Path, nonce: str, replay_store_path: Optional[Path] = None) -> bool: + if not nonce: + return False + + if replay_store_path is not None and replay_store_path.exists(): + return nonce in load_replay_store(replay_store_path) + + if not audit_path.exists(): return False for line in audit_path.read_text(encoding="utf-8").splitlines(): if not line.strip(): @@ -95,7 +144,14 @@ def intent_load_error_meta(load_error: str) -> Dict[str, Any]: } -def decide(proposal: Dict[str, Any], policy: Dict[str, Any], envelope: Optional[Dict[str, Any]], strict_mode: bool, audit_path: Path) -> Tuple[str, str, Dict[str, Any]]: +def decide( + proposal: Dict[str, Any], + policy: Dict[str, Any], + envelope: Optional[Dict[str, Any]], + strict_mode: bool, + audit_path: Path, + replay_store_path: Optional[Path] = None, +) -> Tuple[str, str, Dict[str, Any]]: intent_meta: Dict[str, Any] = { "intent_verified": False, "intent_envelope_id": None, @@ -155,7 +211,7 @@ def decide(proposal: Dict[str, Any], policy: Dict[str, Any], envelope: Optional[ "intent_nonce": envelope.get("nonce"), } ) - if nonce_replayed(audit_path, str(envelope.get("nonce", ""))): + if nonce_replayed(audit_path, str(envelope.get("nonce", "")), replay_store_path): return "BLOCK", "INTENT_REPLAYED", intent_meta ok, reason = verify_intent_envelope(proposal, envelope, datetime.now(timezone.utc)) if not ok: @@ -191,7 +247,7 @@ def append_audit(path: Path, proposal: Dict[str, Any], decision: str, reason: st path.parent.mkdir(parents=True, exist_ok=True) previous_hash = get_previous_hash(path) record = { - "ts": datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z"), + "ts": utc_now(), "surface": "agent-payment-guard", "decision": decision, "reason": reason, @@ -209,12 +265,34 @@ def append_audit(path: Path, proposal: Dict[str, Any], decision: str, reason: st handle.write(json.dumps(record, sort_keys=True) + "\n") +def persist_accepted_nonce_if_needed( + replay_store_path: Path, + proposal: Dict[str, Any], + decision: str, + intent_meta: Dict[str, Any], + decision_hash: str, +) -> None: + if decision != "ACCEPT" or not intent_meta.get("intent_verified"): + return + nonce = intent_meta.get("intent_nonce") + if not isinstance(nonce, str) or not nonce: + return + record_nonce( + replay_store_path, + nonce, + human_intent_id=intent_meta.get("intent_envelope_id"), + agent_id=proposal.get("agent_id"), + decision_hash=decision_hash, + ) + + def main() -> int: parser = argparse.ArgumentParser() parser.add_argument("proposal") parser.add_argument("--policy") parser.add_argument("--intent-envelope") parser.add_argument("--require-intent-envelope", action="store_true") + parser.add_argument("--replay-store-path", default=str(DEFAULT_REPLAY_STORE_PATH)) args = parser.parse_args() proposal_path = Path(args.proposal) @@ -223,6 +301,7 @@ def main() -> int: proposal = load_json(proposal_path) policy = load_json(policy_path) audit_path = Path(".proofpath/audit.jsonl") + replay_store_path = Path(args.replay_store_path) envelope_ref = args.intent_envelope or proposal.get("intent_envelope") envelope = None if envelope_ref: @@ -239,8 +318,17 @@ def main() -> int: print(json.dumps({"decision": decision, "reason": reason}, separators=(",", ":"))) return 2 - decision, reason, intent_meta = decide(proposal, policy, envelope, args.require_intent_envelope, audit_path) + decision, reason, intent_meta = decide( + proposal, + policy, + envelope, + args.require_intent_envelope, + audit_path, + replay_store_path, + ) append_audit(audit_path, proposal, decision, reason, intent_meta) + audit_hash = get_previous_hash(audit_path) + persist_accepted_nonce_if_needed(replay_store_path, proposal, decision, intent_meta, audit_hash) print(json.dumps({"decision": decision, "reason": reason}, separators=(",", ":"))) return 0 if decision == "ACCEPT" else 2 if decision == "BLOCK" else 3 diff --git a/examples/agent-payment-guard/payment_guard_service.py b/examples/agent-payment-guard/payment_guard_service.py index b049f83..86d6ad4 100644 --- a/examples/agent-payment-guard/payment_guard_service.py +++ b/examples/agent-payment-guard/payment_guard_service.py @@ -10,7 +10,14 @@ from typing import Any, Dict, List, Optional, Tuple from urllib.parse import parse_qs, urlparse -from payment_guard import append_audit, decide, get_previous_hash, load_json +from payment_guard import ( + append_audit, + decide, + get_previous_hash, + load_json, + load_replay_store, + persist_accepted_nonce_if_needed, +) DEFAULT_HOST = "127.0.0.1" @@ -18,6 +25,7 @@ DEFAULT_MODE = "enforce" DEFAULT_AUDIT_DEFAULT_LIMIT = 20 DEFAULT_AUDIT_MAX_LIMIT = 100 +DEFAULT_REPLAY_STORE_PATH = ".proofpath/replay-store.json" # --------------------------------------------------------------------------- @@ -52,6 +60,10 @@ def load_config(path: Path) -> Dict[str, Any]: if not isinstance(audit_path, str) or not audit_path: _fail("Config 'audit_path' must be a non-empty string") + replay_store_path = raw.get("replay_store_path", DEFAULT_REPLAY_STORE_PATH) + if not isinstance(replay_store_path, str) or not replay_store_path: + _fail("Config 'replay_store_path' must be a non-empty string") + service = raw.get("service", {}) if not isinstance(service, dict): _fail("Config 'service' must be an object") @@ -87,6 +99,7 @@ def load_config(path: Path) -> Dict[str, Any]: "require_signed_intent": require_signed_intent, "policy_path": policy_path, "audit_path": audit_path, + "replay_store_path": replay_store_path, "host": host, "port": port, "audit_default_limit": default_limit, @@ -172,6 +185,12 @@ def do_GET(self) -> None: # noqa: N802 self._send_json(HTTPStatus.OK, {"records": records, "count": len(records), "limit": limit}) return + if parsed.path == "/v1/replay-store": + replay_store_path: Path = self.server.replay_store_path # type: ignore[attr-defined] + entries = load_replay_store(replay_store_path) + self._send_json(HTTPStatus.OK, {"nonces": len(entries), "entries": entries}) + return + self._send_json(HTTPStatus.NOT_FOUND, {"error": "not found"}) def do_POST(self) -> None: # noqa: N802 @@ -212,15 +231,18 @@ def do_POST(self) -> None: # noqa: N802 request_strict = mode == "enforce" strict_mode = config_strict or request_strict + replay_store_path: Path = self.server.replay_store_path # type: ignore[attr-defined] decision, reason, intent_meta = decide( proposal=proposal, policy=self.server.policy, # type: ignore[attr-defined] envelope=envelope, strict_mode=strict_mode, audit_path=self.server.audit_path, # type: ignore[attr-defined] + replay_store_path=replay_store_path, ) append_audit(self.server.audit_path, proposal, decision, reason, intent_meta) # type: ignore[attr-defined] audit_hash = get_previous_hash(self.server.audit_path) # type: ignore[attr-defined] + persist_accepted_nonce_if_needed(replay_store_path, proposal, decision, intent_meta, audit_hash) execution_allowed, would_block = execution_flags(mode, decision) self._send_json( @@ -279,6 +301,7 @@ def main() -> int: parser.add_argument("--port", type=int, default=None) parser.add_argument("--policy", default=None) parser.add_argument("--audit-path", default=None) + parser.add_argument("--replay-store-path", default=None) args = parser.parse_args() # Precedence: hardcoded defaults < config file < explicit CLI flags @@ -286,6 +309,7 @@ def main() -> int: port = DEFAULT_PORT policy_path = "examples/agent-payment-guard/payment_policy.json" audit_path_str = ".proofpath/audit.jsonl" + replay_store_path_str = DEFAULT_REPLAY_STORE_PATH config_mode = DEFAULT_MODE require_signed_intent = False audit_default_limit = DEFAULT_AUDIT_DEFAULT_LIMIT @@ -297,6 +321,7 @@ def main() -> int: port = cfg["port"] policy_path = cfg["policy_path"] audit_path_str = cfg["audit_path"] + replay_store_path_str = cfg["replay_store_path"] config_mode = cfg["mode"] require_signed_intent = cfg["require_signed_intent"] audit_default_limit = cfg["audit_default_limit"] @@ -311,11 +336,14 @@ def main() -> int: policy_path = args.policy if args.audit_path is not None: audit_path_str = args.audit_path + if args.replay_store_path is not None: + replay_store_path_str = args.replay_store_path policy = load_json(Path(policy_path)) server = ThreadingHTTPServer((host, port), PaymentGuardServiceHandler) server.policy = policy server.audit_path = Path(audit_path_str) + server.replay_store_path = Path(replay_store_path_str) server.config_mode = config_mode server.require_signed_intent = require_signed_intent server.audit_default_limit = audit_default_limit @@ -325,6 +353,7 @@ def main() -> int: if args.config: print(f" config: {args.config}") print(f" mode: {config_mode} | require_signed_intent: {require_signed_intent}") + print(f" replay_store_path: {replay_store_path_str}") try: server.serve_forever() except KeyboardInterrupt: @@ -335,4 +364,4 @@ def main() -> int: if __name__ == "__main__": - raise SystemExit(main()) + raise SystemExit(main()) \ No newline at end of file diff --git a/examples/agent-payment-guard/payment_guard_service_config.json b/examples/agent-payment-guard/payment_guard_service_config.json index 676f90f..9d660cd 100644 --- a/examples/agent-payment-guard/payment_guard_service_config.json +++ b/examples/agent-payment-guard/payment_guard_service_config.json @@ -3,6 +3,7 @@ "require_signed_intent": true, "policy_path": "examples/agent-payment-guard/payment_policy.json", "audit_path": ".proofpath/audit.jsonl", + "replay_store_path": ".proofpath/replay-store.json", "service": { "host": "127.0.0.1", "port": 8787 diff --git a/examples/agent-payment-guard/run_demo_check.sh b/examples/agent-payment-guard/run_demo_check.sh index 1e92e10..783d8c2 100755 --- a/examples/agent-payment-guard/run_demo_check.sh +++ b/examples/agent-payment-guard/run_demo_check.sh @@ -3,7 +3,7 @@ set -euo pipefail ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/../.." && pwd)" cd "$ROOT_DIR" -rm -f .proofpath/audit.jsonl +rm -f .proofpath/audit.jsonl .proofpath/replay-store.json check_case() { local expected_decision="$1" expected_reason="$2" expected_status="$3" @@ -45,6 +45,7 @@ check_case BLOCK MISSING_INTENT_ENVELOPE 2 "$BASE_PROPOSAL" --intent-envelope ex check_case BLOCK INVALID_INTENT_SIGNATURE 2 "$BASE_PROPOSAL" --intent-envelope examples/agent-payment-guard/intent_envelopes/intent.malformed.json [[ -f .proofpath/audit.jsonl ]] +[[ -f .proofpath/replay-store.json ]] python3 - <<'PY' import json from pathlib import Path @@ -56,7 +57,10 @@ assert records[-2]["reason"] == "MISSING_INTENT_ENVELOPE" assert records[-2]["intent_load_error"] == "missing" assert records[-1]["reason"] == "INVALID_INTENT_SIGNATURE" assert records[-1]["intent_load_error"] == "malformed" +store = json.loads(Path('.proofpath/replay-store.json').read_text(encoding='utf-8')) +assert "nonce_market_research_001" in store, store +assert len(store) == 1, store PY echo "Agent Payment Guard demo check passed." -python3 scripts/verify_audit_log.py .proofpath/audit.jsonl \ No newline at end of file +python3 scripts/verify_audit_log.py .proofpath/audit.jsonl diff --git a/examples/agent-payment-guard/run_service_check.sh b/examples/agent-payment-guard/run_service_check.sh index 5aa48ec..b77b621 100644 --- a/examples/agent-payment-guard/run_service_check.sh +++ b/examples/agent-payment-guard/run_service_check.sh @@ -4,44 +4,90 @@ set -euo pipefail ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/../.." && pwd)" cd "$ROOT_DIR" -rm -f .proofpath/audit.jsonl +rm -f .proofpath/audit.jsonl .proofpath/replay-store.json HOST="127.0.0.1" PORT="18787" SERVICE="examples/agent-payment-guard/payment_guard_service.py" CONFIG="examples/agent-payment-guard/payment_guard_service_config.json" +VALID_INTENT_TEMPLATE="examples/agent-payment-guard/intent_envelopes/intent.valid.json" + +make_intent() { + local nonce="$1" + python3 - "$VALID_INTENT_TEMPLATE" "$nonce" <<'PY' +import json +import sys +from hashlib import sha256 + +secret = "proofpath-demo-secret-v0" +path, nonce = sys.argv[1], sys.argv[2] +intent = json.load(open(path, encoding="utf-8")) +intent["nonce"] = nonce +intent.pop("signature", None) +canonical = json.dumps(intent, sort_keys=True, separators=(",", ":"), ensure_ascii=False) +intent["signature"] = sha256((canonical + secret).encode("utf-8")).hexdigest() +print(json.dumps(intent, separators=(",", ":"))) +PY +} + +start_service() { + python3 "$SERVICE" \ + --config "$CONFIG" \ + --port "$PORT" \ + >/tmp/payment_guard_service.log 2>&1 & + SERVICE_PID=$! + for _ in $(seq 1 50); do + if curl -fsS "http://$HOST:$PORT/v1/health" >/dev/null 2>&1; then + return 0 + fi + sleep 0.1 + done + echo "FAIL: service did not start" >&2 + cat /tmp/payment_guard_service.log >&2 || true + return 1 +} + +stop_service() { + if [ -n "${SERVICE_PID:-}" ]; then + kill "$SERVICE_PID" >/dev/null 2>&1 || true + wait "$SERVICE_PID" >/dev/null 2>&1 || true + fi +} -# Start service with JSON config; --port overrides config port to avoid conflicts during tests -python3 "$SERVICE" \ - --config "$CONFIG" \ - --port "$PORT" \ - >/tmp/payment_guard_service.log 2>&1 & -SERVICE_PID=$! cleanup() { - kill "$SERVICE_PID" >/dev/null 2>&1 || true + stop_service } trap cleanup EXIT -for _ in $(seq 1 50); do - if curl -fsS "http://$HOST:$PORT/v1/health" >/dev/null 2>&1; then - break - fi - sleep 0.1 -done +start_service # --- health --- curl -fsS "http://$HOST:$PORT/v1/health" \ | python3 -c 'import json,sys; r=json.load(sys.stdin); assert r["status"]=="ok"; assert r["surface"]=="agent-payment-guard-service"; assert r["version"]=="0.1"' -VALID_INTENT=$(cat examples/agent-payment-guard/intent_envelopes/intent.valid.json) VALID_PROPOSAL=$(cat examples/agent-payment-guard/payment_proposal.valid_micro_payment.json) +VALID_INTENT_ACCEPT=$(make_intent nonce_service_accept_001) +VALID_INTENT_CONFIG_MODE=$(make_intent nonce_service_config_mode_001) # --- enforce ACCEPT (with intent envelope to satisfy require_signed_intent=true from config) --- curl -fsS -X POST "http://$HOST:$PORT/v1/payment-proposals/evaluate" \ -H 'content-type: application/json' \ - -d "{\"mode\":\"enforce\",\"proposal\":$VALID_PROPOSAL,\"intent_envelope\":$VALID_INTENT}" \ + -d "{\"mode\":\"enforce\",\"proposal\":$VALID_PROPOSAL,\"intent_envelope\":$VALID_INTENT_ACCEPT}" \ | python3 -c 'import json,sys; r=json.load(sys.stdin); assert r["mode"]=="enforce"; assert r["decision"]=="ACCEPT"; assert r["execution_allowed"] is True; assert r["would_block"] is False; assert r["audit_hash"].startswith("sha256:")' +# --- replay store contains accepted nonce --- +curl -fsS "http://$HOST:$PORT/v1/replay-store" \ + | python3 -c 'import json,sys; r=json.load(sys.stdin); assert r["nonces"]==1, r; assert "nonce_service_accept_001" in r["entries"], r' + +# --- replaying the same envelope is blocked and does not grow replay store --- +curl -fsS -X POST "http://$HOST:$PORT/v1/payment-proposals/evaluate" \ + -H 'content-type: application/json' \ + -d "{\"mode\":\"enforce\",\"proposal\":$VALID_PROPOSAL,\"intent_envelope\":$VALID_INTENT_ACCEPT}" \ + | python3 -c 'import json,sys; r=json.load(sys.stdin); assert r["mode"]=="enforce"; assert r["decision"]=="BLOCK", r; assert r["reason"]=="INTENT_REPLAYED", r; assert r["execution_allowed"] is False; assert r["would_block"] is True; assert r["audit_hash"].startswith("sha256:")' + +curl -fsS "http://$HOST:$PORT/v1/replay-store" \ + | python3 -c 'import json,sys; r=json.load(sys.stdin); assert r["nonces"]==1, r; assert "nonce_service_accept_001" in r["entries"], r' + BAD_PROPOSAL=$(cat examples/agent-payment-guard/payment_proposal.asset_not_allowed.json) # --- shadow BLOCK --- @@ -67,18 +113,18 @@ curl -fsS -X POST "http://$HOST:$PORT/v1/payment-proposals/evaluate" \ # --- request without mode uses config.mode (config has mode=enforce) --- curl -fsS -X POST "http://$HOST:$PORT/v1/payment-proposals/evaluate" \ -H 'content-type: application/json' \ - -d "{\"proposal\":$VALID_PROPOSAL,\"intent_envelope\":$VALID_INTENT}" \ - | python3 -c 'import json,sys; r=json.load(sys.stdin); assert r["mode"]=="enforce", f"expected config mode enforce, got {r[\"mode\"]}"; assert r["decision"]=="ACCEPT"; assert r["execution_allowed"] is True' + -d "{\"proposal\":$VALID_PROPOSAL,\"intent_envelope\":$VALID_INTENT_CONFIG_MODE}" \ + | python3 -c 'import json,sys; r=json.load(sys.stdin); assert r["mode"]=="enforce", ("expected config mode enforce", r); assert r["decision"]=="ACCEPT"; assert r["execution_allowed"] is True' # --- require_signed_intent=true blocks valid proposal without envelope (shadow mode) --- curl -fsS -X POST "http://$HOST:$PORT/v1/payment-proposals/evaluate" \ -H 'content-type: application/json' \ -d "{\"mode\":\"shadow\",\"proposal\":$VALID_PROPOSAL,\"intent_envelope\":null}" \ - | python3 -c 'import json,sys; r=json.load(sys.stdin); assert r["decision"]=="BLOCK", f"expected BLOCK, got {r[\"decision\"]}"; assert r["reason"]=="MISSING_INTENT_ENVELOPE", f"expected MISSING_INTENT_ENVELOPE, got {r[\"reason\"]}"; assert r["execution_allowed"] is True; assert r["would_block"] is True' + | python3 -c 'import json,sys; r=json.load(sys.stdin); assert r["decision"]=="BLOCK", ("expected BLOCK", r); assert r["reason"]=="MISSING_INTENT_ENVELOPE", ("expected MISSING_INTENT_ENVELOPE", r); assert r["execution_allowed"] is True; assert r["would_block"] is True' # --- audit limit: limit=999 clamped to max (100 from config) --- curl -fsS "http://$HOST:$PORT/v1/audit/records?limit=999" \ - | python3 -c 'import json,sys; r=json.load(sys.stdin); assert r["limit"]==100, f"expected limit 100, got {r[\"limit\"]}"' + | python3 -c 'import json,sys; r=json.load(sys.stdin); assert r["limit"]==100, ("expected limit 100", r)' # --- audit limit: limit=abc returns 400 with JSON error --- HTTP_CODE="$(curl -sS -o /tmp/payment_guard_bad_limit.json -w '%{http_code}' "http://$HOST:$PORT/v1/audit/records?limit=abc")" @@ -86,11 +132,18 @@ if [ "$HTTP_CODE" != "400" ]; then echo "FAIL: expected 400 for limit=abc, got HTTP $HTTP_CODE" >&2 exit 1 fi -python3 -c 'import json; r=json.load(open("/tmp/payment_guard_bad_limit.json",encoding="utf-8")); assert "error" in r, f"expected error field in response: {r}"' +python3 -c 'import json; r=json.load(open("/tmp/payment_guard_bad_limit.json",encoding="utf-8")); assert "error" in r, ("expected error field", r)' + +# --- replay store survives service restart --- +[[ -f .proofpath/replay-store.json ]] +stop_service +start_service +curl -fsS "http://$HOST:$PORT/v1/replay-store" \ + | python3 -c 'import json,sys; r=json.load(sys.stdin); assert r["nonces"]==2, r; assert "nonce_service_accept_001" in r["entries"], r; assert "nonce_service_config_mode_001" in r["entries"], r' -# --- audit records (6 total: enforce ACCEPT, shadow BLOCK, enforce HOLD, shadow HOLD, config-mode ACCEPT, require_signed_intent BLOCK) --- +# --- audit records (7 total: ACCEPT, replay BLOCK, BLOCK, HOLD, HOLD, ACCEPT, missing-envelope BLOCK) --- curl -fsS "http://$HOST:$PORT/v1/audit/records" \ - | python3 -c 'import json,sys; r=json.load(sys.stdin); assert r["count"]==6, f"expected 6 audit records, got {r[\"count\"]}"; decisions=[row["decision"] for row in r["records"]]; assert decisions==["ACCEPT","BLOCK","HOLD","HOLD","ACCEPT","BLOCK"], f"unexpected decisions: {decisions}"' + | python3 -c 'import json,sys; r=json.load(sys.stdin); assert r["count"]==7, ("expected 7 audit records", r); decisions=[row["decision"] for row in r["records"]]; assert decisions==["ACCEPT","BLOCK","BLOCK","HOLD","HOLD","ACCEPT","BLOCK"], ("unexpected decisions", decisions)' # --- hash-chain verification --- python3 scripts/verify_audit_log.py .proofpath/audit.jsonl >/dev/null