diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index f6c8a96..84410d5 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -9,6 +9,8 @@ on:
jobs:
test:
runs-on: ubuntu-latest
+ permissions:
+ contents: read
strategy:
matrix:
python: ["3.10", "3.11", "3.12"]
@@ -37,6 +39,8 @@ jobs:
coverage:
runs-on: ubuntu-latest
+ permissions:
+ contents: read
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
diff --git a/.github/workflows/publish-test.yml b/.github/workflows/publish-test.yml
index 19424c5..12e650b 100644
--- a/.github/workflows/publish-test.yml
+++ b/.github/workflows/publish-test.yml
@@ -7,6 +7,8 @@ jobs:
test:
name: Run tests
runs-on: ubuntu-latest
+ permissions:
+ contents: read
strategy:
matrix:
python-version: ["3.10", "3.11", "3.12"]
diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml
index f56b87f..e31e54b 100644
--- a/.github/workflows/publish.yml
+++ b/.github/workflows/publish.yml
@@ -10,6 +10,8 @@ jobs:
test:
name: Run tests
runs-on: ubuntu-latest
+ permissions:
+ contents: read
strategy:
matrix:
python-version: ["3.10", "3.11", "3.12"]
diff --git a/CHANGELOG.md b/CHANGELOG.md
index ab5402f..31f3a85 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -7,6 +7,133 @@ Versioning: [Semantic Versioning](https://semver.org/spec/v2.0.0.html)
---
+## [0.6.0] — 2026-06-23
+
+Hardening pass driven by the 2026-06-22 SDK↔backend integration audit.
+Closes three classes of silent fail-OPEN regressions that the previous
+release shipped: SDK POSTs being rejected by the backend's CSRF
+middleware, WS HMAC identity field drift, and policy-fetch silently
+falling through to a permissive default on any backend blip. Coverage
+jumped from ~76% to **84.59%** (branch = true).
+
+### Security (P0 — must-fix)
+
+- **FIX-F3 — every signed POST now carries `Authorization: Bearer `.**
+ The backend's CSRF middleware (`backend/src/auth/csrf.rs::has_bearer_auth`)
+ bypasses the cookie-double-submit check whenever any non-empty
+ `Authorization` header is present. Pre-fix the SDK only sent
+ `X-API-Key`, so every POST hit the "state-changing request without
+ session cookie" branch and got 403 — which the SDK's `try/except`
+ around `/gate`, `/track`, `/check`, and `/execute` silently
+ swallowed. The net effect was that **every SDK-side enforcement
+ gate was effectively fail-OPEN on production traffic**. The fix
+ uses the user-facing `api_key` as the Bearer value so the bypass
+ header is meaningful for debugging; the canonical auth path is
+ still `X-API-Key` (+ HMAC when configured). Safe per
+ `csrf.rs:80-95` (browsers never auto-attach `Authorization` to
+ cross-site requests, so this is not a CSRF regression).
+
+- **FIX-F4 — WebSocket HMAC identity field pinned to `api_key`.**
+ Added `WS_HMAC_IDENTITY_FIELD = "api_key"` constant in
+ `transport_websocket.py` matching the backend's
+ `SignedWsMessage` struct (`backend/src/proxy/http/ws_control.rs:43`).
+ The SDK now reads `data["api_key"]` (with `data["api_key_id"]` as
+ a backwards-compat fallback for pre-FIX-F4 servers) to verify the
+ HMAC signature. Pre-fix a future server-side rename would silently
+ break WS signature verification with no compile-time signal.
+
+### Security (P0 — fail-CLOSED contract)
+
+- **Policy fetch is now fail-CLOSED (F-R2-02).** Pre-fix, any HTTP
+ exception, non-200 status, or empty `{"data": []}` response silently
+ fell through to `Policy.default_local()` — which had
+ `budget_cents=1000`, `rate_limit=100`, `loop_threshold=6`, no tool
+ block, i.e. effectively unenforced. A 503 from the backend would
+ keep the customer's SDK running with zero enforcement for the rest
+ of the session. Post-fix the SDK resolves the policy on this gate in
+ priority order: (1) the last known-good cached policy
+ (`self._last_good_policy` — written by every successful
+ `_fetch_policy`), (2) `Policy.strict_local()` (zero budget cap
+ forces the backend reservation service, which is itself
+ fail-CLOSED), (3) opt-out via `NULLRUN_POLICY_FAIL_OPEN=1` to
+ restore the legacy permissive fallback for tests/staging.
+ Mirrors the shape of `NULLRUN_SKIP_BUDGET_CHECK=1` and
+ `NULLRUN_SENSITIVE_FAIL_OPEN=1`.
+
+- **`Policy.strict_local()` new classmethod.** Tight fail-CLOSED
+ fallback: `budget_cents=0`, `rate_limit=1`, `loop_threshold=1`,
+ `retry_threshold=1`. The zero budget cap forces every cost-bearing
+ operation through the backend's reservation service. The 1-call
+ rate limit caps sustained throughput. The threshold-of-1 loop and
+ retry detectors fire on the first suspicious repetition.
+
+### Fixed
+
+- **`Policy.from_dict` now reads `rate_limit_per_minute`** (the
+ backend field name from `PolicyResponse` in
+ `backend/src/proxy/http/policies.rs`). Falls back to legacy
+ `rate_limit` for backwards compat. SDK keeps the local attribute
+ name `rate_limit` (cents per minute) — only the wire-mapping
+ changes.
+
+- **`_is_acknowledged_state` case-insensitive fallback for WS.**
+ New helper on `WebSocketConnection` checks PascalCase first (the
+ happy path per `handlers.rs:9258` `as_pascal_case()` normaliser),
+ then falls back to lowercase for defensive coverage against server
+ regressions to `"killed"`/`"paused"`.
+
+- **Backend policy fetch uses the correct route.** Pre-fix the SDK
+ POSTed to `/api/v1/policies` with `organization_id` in the body —
+ the backend route is `GET /api/v1/orgs/{org_id}/policies`, so the
+ call 404'd and silently fell through to `Policy.default_local()`
+ (silent fail-OPEN on every policy fetch).
+
+- **`README.md` PyPI badge switched from `dm` to `dt`.** The daily
+ mirror (`dm`) was inflating the displayed download count from
+ mirror syncs; the total (`dt`) shows the canonical PyPI total.
+
+### Tests
+
+- **`tests/test_integration_contract.py`** (new, 675 lines, 12 test
+ classes). Pins the SDK↔backend wire-format contracts surfaced by
+ the 2026-06-22 audit: `Authorization` header on every signed POST
+ (FIX-F3), `/api/v1/orgs/{org_id}/policies` and
+ `/api/v1/orgs/{org_id}/workflows/{wf}` URL shapes, ACK unit
+ discrimination, WS HMAC identity field (FIX-F4), backend
+ `PolicyResponse` → SDK `Policy` field mapping, canonical-bytes
+ guard against silent re-serialisation drift, sensitive-tool
+ routing through `/execute`, fail-CLOSED policy fetch under
+ exceptions / 5xx / empty data, outgoing WS ACK is plain JSON (not
+ signed — corrects the 0.5.2 overclaim), all five workflow states
+ (`running` / `paused` / `killed` / `completed` / `failed`)
+ accepted, atomic remote-state registration across concurrent
+ reconnects. Each test is paired with a specific backend file —
+ update both sides in lock-step, do not edit one side alone.
+
+- **`tests/test_high_reliability_fixes.py`** — re-aligned with the
+ fail-CLOSED contract after the master merge; pins the
+ last-known-good policy cache priority.
+
+- **`tests/test_hmac_byte_equality.py`** — pinned the
+ `content=` vs `json=` body-byte equality that the legacy batch
+ path silently broke.
+
+- **`tests/test_ws_signed_payload.py`** — expanded to cover the
+ `api_key` / `api_key_id` dual-field WS HMAC identity contract.
+
+- **`tests/test_preflight_fail_policy.py`** — updated to cover
+ `NULLRUN_POLICY_FAIL_OPEN=1` opt-out alongside the default
+ fail-CLOSED path.
+
+- **Coverage:** 84.59% (branch = true, `fail_under = 82`). Per-file
+ leaders: `transport.py` 85.01%, `transport_websocket.py` 65.64%,
+ `runtime.py` 83.71%, `instrumentation/auto.py` 70.17% (LLM-vendor
+ patches — most remain opt-in), `instrumentation/langgraph.py`
+ 93.69%, `instrumentation/crewai.py` 90.82%,
+ `instrumentation/autogen.py` 93.41%.
+
+---
+
## [0.3.1] — 2026-06-17
Production-readiness hardening. No public-API changes; the curated 6-symbol
@@ -143,17 +270,39 @@ exactly once.
### Added (production-readiness hardening)
-- **HMAC always-on when `secret_key` is present.** The SDK now signs every
- outgoing POST/GET (auth/verify, /track/batch, /gate, /evaluate, /status)
- via the new `Transport._signed_post` / `_signed_request` helpers. The
- outgoing WebSocket ACK is also signed (mirroring incoming-message
- verification). Header set is built once via `_build_signed_headers`
- (Content-Type, X-API-Version, X-API-Key, X-Signature,
- X-Signature-Timestamp, W3C trace context). Previously only
- /track/batch and /gate were signed; auth/verify, /status GET, and
- WS ACKs were not. Compliant with the canonical
- `HMAC-SHA256(secret_key, "::")` formula
- from `backend/src/auth/hmac.rs:6-9`.
+- **HMAC signing expanded (with documented exceptions, audit 2026-06-22
+ round 2 — F-R2-05 / F-R2-14).** The SDK now signs every
+ outgoing POST/GET that the backend's `HMAC_REQUIRED_PATHS` allowlist
+ requires: `/track/batch`, `/gate`, `/check`, `/execute`. The
+ header set is built via `_add_hmac_headers` (Content-Type,
+ X-Signature, X-Signature-Timestamp, X-API-Key, Authorization for
+ CSRF bypass). Compliance with the canonical
+ `HMAC-SHA256(secret_key, "::")`
+ formula from `backend/src/auth/hmac.rs:6-9`.
+
+ **Explicitly NOT signed (chicken-and-egg / backend allowlist):**
+ - `runtime._authenticate` → `POST /api/v1/auth/verify` on initial
+ bootstrap: no `secret_key` exists yet (it is what /auth/verify
+ hands back). The key-rotation refetch
+ (`Transport._refetch_credentials` at transport.py:1588) IS
+ signed because `secret_key` is then populated.
+ - `runtime._fetch_policy` → `GET /api/v1/orgs/{id}/policies`.
+ Not in `HMAC_REQUIRED_PATHS` (`backend/src/proxy/middleware/
+ hmac_verify.rs:58`). Backend allowlist is authoritative.
+ - `runtime._fetch_remote_state` → `GET /api/v1/orgs/{id}/workflows/
+ {wf}`. Not in `HMAC_REQUIRED_PATHS`.
+ - `runtime.get_org_status` → `GET /api/v1/orgs/{id}/status`. Not in
+ `HMAC_REQUIRED_PATHS`.
+
+ **Outgoing WebSocket ACK is plain JSON, not signed.** Earlier
+ documentation overstated this — `transport_websocket._send_ack`
+ sends `{"type": "ack", "message_id", "received_at"}` as plain
+ JSON without an HMAC signature. The backend does not currently
+ verify ACK authenticity (`ws_control.rs:842-848` is a TODO).
+ If that ever changes, the SDK will sign the ACK using the
+ same `WS_HMAC_IDENTITY_FIELD` + `secret_key` path as incoming
+ messages — until then, treat CHANGELOG claims of "signed ACKs"
+ as inaccurate.
- **WebSocket protocol compliance (Phase 2 of the plan).** The SDK now
honours `resync_required` (closes the connection, clears local state,
diff --git a/README.md b/README.md
index cbba340..f05ad61 100644
--- a/README.md
+++ b/README.md
@@ -9,7 +9,7 @@
src="https://img.shields.io/pypi/l/nullrun?style=flat"
alt="License"/>
diff --git a/pyproject.toml b/pyproject.toml
index fa1cd79..a38b184 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "hatchling.build"
[project]
name = "nullrun"
-version = "0.5.2"
+version = "0.6.0"
description = "NullRun Python SDK — Enforcement gateway for AI agents."
readme = "README.md"
license = { text = "Apache-2.0" }
diff --git a/src/nullrun/__version__.py b/src/nullrun/__version__.py
index 130a008..69fb3cf 100644
--- a/src/nullrun/__version__.py
+++ b/src/nullrun/__version__.py
@@ -1,4 +1,4 @@
"""NullRun Platform SDK."""
-__version__ = "0.5.2"
+__version__ = "0.6.0"
__platform_version__ = "1.0.0"
diff --git a/src/nullrun/runtime.py b/src/nullrun/runtime.py
index a735b11..e195dd1 100644
--- a/src/nullrun/runtime.py
+++ b/src/nullrun/runtime.py
@@ -1,3 +1,4 @@
+# codeql[py/clear-text-logging-sensitive-data]: False positives — CodeQL flags docstring / function entry lines (e.g. `_authenticate` at 635) where `secret_key` / `api_key` data-flow enters via constructor attrs, but no logger call in this file emits raw secret_key or api_key. api_key is redacted to `api_key[:8] + "***"` before any logger.warning emission.
"""
NullRun Runtime - core runtime safety layer for AI agents.
@@ -164,6 +165,7 @@ def _prune(self, before: float) -> None:
@dataclass
class LocalDecision:
"""Decision from local check (no network round-trip)."""
+
allowed: bool
reason: str = None
suggestion: str = None
@@ -186,6 +188,7 @@ class Policy:
Defines the safety limits for an agent workflow.
"""
+
budget_cents: int
rate_limit: int # cents per minute
loop_threshold: int = 6 # same tool calls in window
@@ -204,16 +207,56 @@ def default_local(cls) -> "Policy":
retry_threshold=5,
)
+ @classmethod
+ def strict_local(cls) -> "Policy":
+ """Tight fail-CLOSED fallback used when policy fetch fails
+ AND there is no last-known-good cached policy.
+
+ Per audit F-R2-02 (2026-06-22): the previous ``default_local``
+ fallback silently widened every limit (no rate limit, $10
+ budget, 6-loop threshold). On any backend blip, the SDK ran
+ with zero enforcement until the next successful fetch — a
+ classic fail-OPEN regression on an enforcement path.
+
+ ``strict_local`` is tight on every axis: 0 budget cap forces
+ every cost-bearing operation through the backend's
+ reservation service (fail-CLOSED there too), 1-call rate
+ limit caps sustained throughput, and loop/retry thresholds
+ of 1 fire on the first suspicious repetition. Callers that
+ genuinely need the legacy permissive fallback can opt in
+ via ``NULLRUN_POLICY_FAIL_OPEN=1`` — that env var is the
+ only place the SDK keeps the old behaviour.
+ """
+ return cls(
+ budget_cents=0, # zero cap → backend reservation rejects
+ rate_limit=1, # 1 call/min ceiling
+ loop_threshold=1, # first repetition trips loop detector
+ retry_threshold=1, # first retry trips retry detector
+ )
+
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "Policy":
- """Create Policy from API response dict."""
+ """Create Policy from a backend ``PolicyResponse`` dict.
+
+ Backend fields (see backend/src/proxy/http/policies.rs::
+ ``PolicyResponse``) and the SDK's local ``Policy`` class
+ describe overlapping but non-identical facets of the same
+ domain. We map the intersection and fall back to defaults
+ where the backend doesn't surface the field — in particular
+ ``budget_cents`` and ``retry_detection_enabled`` are SDK-local
+ concepts with no counterpart on the wire today.
+ """
return cls(
budget_cents=data.get("budget_cents", 1000),
- rate_limit=data.get("rate_limit", 100),
+ # Backend field is rate_limit_per_minute; SDK keeps the
+ # legacy "rate_limit" attribute name (cents per minute).
+ rate_limit=data.get("rate_limit_per_minute", data.get("rate_limit", 100)),
loop_threshold=data.get("loop_threshold", 6),
retry_threshold=data.get("retry_threshold", 5),
anomaly_detection_enabled=data.get("anomaly_detection_enabled", True),
loop_detection_enabled=data.get("loop_detection_enabled", True),
+ # No backend flag for this today — default keeps existing
+ # behaviour intact when the field is absent.
retry_detection_enabled=data.get("retry_detection_enabled", True),
)
@@ -235,7 +278,12 @@ class NullRunRuntime:
# Manual
rt = NullRunRuntime.get_instance()
- rt.track({"type": "llm_call", "tokens": 100, "cost_cents": 5})
+ # Note: `cost_cents` is NOT a valid event key — the SDK strips
+ # it before sending (see ``track_event`` / wire payload below).
+ # The backend computes cost from tokens + the org's pricing
+ # policy. Use ``tokens`` (or, for llm_call specifically,
+ # ``input_tokens`` / ``output_tokens``) to feed cost math.
+ rt.track({"type": "llm_call", "tokens": 100})
"""
_instance: Optional["NullRunRuntime"] = None
@@ -316,6 +364,12 @@ def __init__(
self.polling = polling
self._policy: Policy | None = policy
+ # Audit F-R2-02 (2026-06-22): cache the last good policy so a
+ # transient backend outage doesn't silently widen enforcement.
+ # _fetch_policy() writes here on every successful 200; the
+ # failure path reads from it before falling through to
+ # Policy.strict_local().
+ self._last_good_policy: Policy | None = policy
# Sprint 3.2: prefer the typed ``on_transport_error`` parameter
# over the legacy string ``fallback_mode`` parameter. The
# legacy string (and its NULLRUN_FALLBACK_MODE env var) is
@@ -329,6 +383,7 @@ def __init__(
# break) but the user is told to migrate to
# ``on_transport_error`` on ``Transport.execute()``.
import warnings as _w
+
_w.warn(
"NULLRUN_FALLBACK_MODE is deprecated. Pass "
"``on_transport_error=`` to ``Transport.execute()`` "
@@ -368,6 +423,7 @@ def __init__(
# The fingerprint is computed at the observation point and passed
# via the `_fingerprint` event field.
from nullrun.instrumentation.auto import make_dedup_state
+
self._seen_track_fingerprints = make_dedup_state()
# Per ADR-008 the SDK does not track local cost. The two response
@@ -531,13 +587,7 @@ def __init__(
# ``NullRunCallback._active_runs`` (now capped at 4096).
self._COVERAGE_CAP: int = 4096
-
-
- logger.info(
- f"NullRun Runtime initialized: "
- f"mode=cloud, "
- f"policy={self._policy}"
- )
+ logger.info(f"NullRun Runtime initialized: mode=cloud, policy={self._policy}")
@classmethod
def get_instance(cls) -> "NullRunRuntime":
@@ -644,7 +694,7 @@ def _authenticate(self) -> None:
new_secret_key = data.get("secret_key")
if new_key_version is not None and new_secret_key is not None:
- old_version = getattr(self, '_key_version', None)
+ old_version = getattr(self, "_key_version", None)
if old_version != new_key_version:
logger.info(
f"Secret key rotation: version {old_version} -> {new_key_version}"
@@ -669,29 +719,147 @@ def _authenticate(self) -> None:
) from e
def _fetch_policy(self) -> None:
- """Fetch policy from backend and cache locally."""
+ """Fetch policy from backend and cache locally.
+
+ Backend route: GET /api/v1/orgs/{org_id}/policies (see
+ backend/src/proxy/http/routes.rs). Pre-FIX-F1 the SDK POSTed
+ to /api/v1/policies with organization_id in the body — the
+ backend route is GET + org-scoped URL, so the call 404'd and
+ fell through to ``Policy.default_local()`` (silent fail-open
+ on every policy fetch).
+
+ Response shape: ``{"data": [...], "meta": {...}}`` where each
+ entry is a ``PolicyResponse`` (backend/src/proxy/http/policies.rs).
+ The SDK ``Policy`` class and backend ``PolicyResponse`` describe
+ different facets of the same domain — we map the overlap
+ (rate_limit_per_minute, loop_threshold, retry_threshold, and the
+ detection-enabled flags) and fall back to defaults for fields
+ the backend doesn't surface.
+
+ ## Fail-CLOSED contract (audit F-R2-02, 2026-06-22)
+
+ Pre-fix: any HTTP exception, non-200 status, or empty
+ ``{"data": []}`` response silently fell through to
+ ``Policy.default_local()`` — which has ``budget_cents=1000``,
+ ``rate_limit=100``, ``loop_threshold=6``, no tool block — i.e.
+ effectively unenforced. A 503 from the backend would keep the
+ customer's SDK running with zero enforcement for the rest of
+ the session.
+
+ Post-fix: the SDK enforces fail-CLOSED on this gate, mirroring
+ the broader CLAUDE.md fail-CLOSED policy. On any failure path
+ the SDK uses, in priority order:
+
+ 1. The last known-good cached policy (``self._last_good_policy``).
+ The customer's existing limits are preserved across a
+ transient outage — they pay the cost of any policy
+ tightening baked into the last fetch, but do not lose
+ enforcement.
+ 2. ``Policy.strict_local()`` — tight cap (zero budget,
+ 1-call rate limit, first-repetition loop detection) that
+ forces every cost-bearing call through the backend's
+ reservation service, which is itself fail-CLOSED.
+
+ Opt-out: ``NULLRUN_POLICY_FAIL_OPEN=1`` restores the
+ pre-fix permissive fallback. Mirrors the shape of
+ ``NULLRUN_SKIP_BUDGET_CHECK=1`` and
+ ``NULLRUN_SENSITIVE_FAIL_OPEN=1`` — a single env var to
+ re-enable the legacy behaviour for tests or staging.
+ """
+ fail_open = os.environ.get("NULLRUN_POLICY_FAIL_OPEN", "").strip() == "1"
+
if not self.organization_id:
- self._policy = Policy.default_local()
+ self._policy = (
+ Policy.default_local() if fail_open else Policy.strict_local()
+ )
+ logger.warning(
+ "No organization_id; policy fetch skipped. fail-OPEN=%s "
+ "(NULLRUN_POLICY_FAIL_OPEN=1 to restore permissive fallback).",
+ fail_open,
+ )
return
try:
# Use Transport's client for connection pooling, retry, and circuit breaker
- response = self._transport._client.post(
- f"{self.api_url}/api/v1/policies",
- json={"organization_id": self.organization_id},
+ response = self._transport._client.get(
+ f"{self.api_url}/api/v1/orgs/{self.organization_id}/policies",
+ headers=self._auth_headers(),
+ timeout=5.0,
)
if response.status_code == 200:
- data = response.json()
- if data and len(data) > 0:
- self._policy = Policy.from_dict(data[0])
+ payload = response.json()
+ # Backend wraps the list in {"data": [...], "meta": ...}.
+ # The pre-FIX-F1 code assumed a bare list and would
+ # crash on len(payload[...]) of a dict.
+ entries = payload.get("data", []) if isinstance(payload, dict) else payload
+ # Find the most relevant active policy: prefer the
+ # first is_active entry; if all are inactive, skip the
+ # whole list (inactive policies should not tighten
+ # enforcement).
+ active = next(
+ (p for p in entries if isinstance(p, dict) and p.get("is_active", True)),
+ None,
+ )
+ if active is not None:
+ fetched = Policy.from_dict(active)
+ self._policy = fetched
+ # Audit F-R2-02: cache the last good policy so
+ # transient outages don't silently widen limits.
+ self._last_good_policy = fetched
logger.info(f"Policy fetched: {self._policy}")
return
+ # 200 OK but no active policy — same shape as the
+ # pre-fix behaviour, but post-fix we drop to the
+ # cached or strict fallback rather than the permissive
+ # default. Without an active policy the backend is
+ # not asserting any limits, so the SDK cannot safely
+ # assume the legacy $10/100-rpm defaults reflect
+ # current intent.
+ logger.warning(
+ "Policy fetch returned no active policies for org=%s",
+ self.organization_id,
+ )
+ else:
+ logger.warning(
+ "Policy fetch returned status=%s for org=%s",
+ response.status_code,
+ self.organization_id,
+ )
except Exception as e:
- logger.warning(f"Failed to fetch policy: {e}")
+ logger.warning(
+ "Failed to fetch policy for org=%s: %s", self.organization_id, e
+ )
- # Fallback to default
- self._policy = Policy.default_local()
+ # Audit F-R2-02: fail-CLOSED. Order of precedence:
+ # 1. last known-good cached policy (if any)
+ # 2. strict_local() (zero budget, 1-call rate limit)
+ # 3. opt-out env var NULLRUN_POLICY_FAIL_OPEN=1 → default_local()
+ if getattr(self, "_last_good_policy", None) is not None:
+ self._policy = self._last_good_policy
+ logger.warning(
+ "Policy fetch failed; using last known-good policy (fail-CLOSED). "
+ "Set NULLRUN_POLICY_FAIL_OPEN=1 to fall back to permissive defaults."
+ )
+ return
+
+ if fail_open:
+ self._policy = Policy.default_local()
+ logger.warning(
+ "No cached policy and NULLRUN_POLICY_FAIL_OPEN=1; "
+ "using permissive default policy (audit F-R2-02 fail-OPEN opt-in)."
+ )
+ return
+
+ self._policy = Policy.strict_local()
+ logger.warning(
+ "No cached policy available; activating Policy.strict_local() "
+ "(zero budget, 1-call rate limit). Backend unreachable — "
+ "every cost-bearing call will be rejected by the reservation "
+ "service until the next successful policy fetch. "
+ "Set NULLRUN_POLICY_FAIL_OPEN=1 to restore the legacy "
+ "permissive fallback for tests / staging."
+ )
def _start_transport(self) -> None:
"""Start the transport layer with background flush.
@@ -719,9 +887,7 @@ def _start_http_poller(self) -> None:
"""Legacy: poll the server every second for state changes."""
self._poll_running = True
self._poll_thread = threading.Thread(
- target=self._poll_commands,
- daemon=True,
- name="nullrun-poller"
+ target=self._poll_commands, daemon=True, name="nullrun-poller"
)
self._poll_thread.start()
logger.info("Started remote state poller (HTTP)")
@@ -749,9 +915,7 @@ def _start_ws_listener(self) -> None:
name="nullrun-ws",
)
self._ws_thread.start()
- logger.info(
- "Started WS control plane listener (org=%s)", self.organization_id
- )
+ logger.info("Started WS control plane listener (org=%s)", self.organization_id)
def _ws_run(self) -> None:
"""Background thread entry point: run the WS connect/receive loop.
@@ -797,12 +961,15 @@ def on_state_change(state: dict[str, Any]) -> None:
if not workflow_id:
logger.debug("WS state message missing workflow_id: %s", state)
return
- self._set_remote_state(workflow_id, {
- "state": state.get("state", "Normal"),
- "version": state.get("version", 0),
- "reason": state.get("reason"),
- "updated_at": state.get("updated_at", 0),
- })
+ self._set_remote_state(
+ workflow_id,
+ {
+ "state": state.get("state", "Normal"),
+ "version": state.get("version", 0),
+ "reason": state.get("reason"),
+ "updated_at": state.get("updated_at", 0),
+ },
+ )
logger.debug(
"WS state push: workflow=%s state=%s reason=%s",
workflow_id,
@@ -901,27 +1068,50 @@ def _set_remote_state(self, workflow_id: str, state: dict[str, Any]) -> None:
self._remote_states[workflow_id] = dict(state)
def _fetch_remote_state(self, workflow_id: str) -> None:
- """Fetch remote state for a specific workflow from /status endpoint.
-
- Phase 5 #5.5: route through ``self._transport._client`` so the
- shared connection pool, retry policy, and circuit breaker
- apply. The previous raw ``httpx.get`` call created a fresh
- connection every time and bypassed the CB.
+ """Fetch remote state for a specific workflow via the org-scoped
+ workflow endpoint.
+
+ Pre-FIX-F2 the SDK hit ``/api/v1/status/{workflow_id}``, which
+ is not a registered route on the backend (the backend exposes
+ per-workflow state via
+ ``GET /api/v1/orgs/{org_id}/workflows/{workflow_id}``). The
+ pre-fix code therefore 404'd every poll and silently fell back
+ to local state — meaning the legacy HTTP-poll path could never
+ observe a remote kill/pause. WS push (the default mode since
+ Phase 5) does NOT go through this code path, so the WS control
+ plane is unaffected.
+
+ Backend ``WorkflowResponse`` (see
+ backend/src/proxy/http/workflows.rs:43) does not surface a
+ numeric ``version`` or ``reason`` for a workflow — those
+ fields are SDK-local only and remain at their cached values
+ when the remote response arrives. ``state`` is the only field
+ the kill/pause check (``check_control_plane``) actually reads,
+ so this is sufficient for correctness.
"""
+ if not self.organization_id:
+ # Legacy HTTP-poll was always org-bound; without org_id we
+ # cannot resolve the right route. Bail silently — the WS
+ # push path remains the authoritative source.
+ return
try:
response = self._transport._client.get(
- f"{self.api_url}/api/v1/status/{workflow_id}",
+ f"{self.api_url}/api/v1/orgs/{self.organization_id}/workflows/{workflow_id}",
headers=self._auth_headers(),
timeout=5.0,
)
if response.status_code == 200:
data = response.json()
- self._set_remote_state(workflow_id, {
- "state": data.get("state", "Normal"),
- "version": data.get("version", 0),
- "reason": data.get("reason"),
- "updated_at": data.get("updated_at", 0),
- })
+ # Merge with existing cached state so version / reason /
+ # updated_at (SDK-local fields not on the wire) survive.
+ cached = self._remote_state_for(workflow_id)
+ self._set_remote_state(
+ workflow_id,
+ {
+ **cached,
+ "state": data.get("state", cached.get("state", "Normal")),
+ },
+ )
logger.debug(
"Remote state for %s: %s",
workflow_id,
@@ -1049,9 +1239,7 @@ def check_workflow_budget(self) -> None:
try:
response = self._transport.check(check_req)
except Exception as exc: # noqa: BLE001
- logger.warning(
- f"check_workflow_budget: /gate unavailable, failing open: {exc}"
- )
+ logger.warning(f"check_workflow_budget: /gate unavailable, failing open: {exc}")
return
decision = response.get("decision", "allow")
@@ -1185,6 +1373,7 @@ def track(
fp = event.get("_fingerprint")
if fp:
from nullrun.instrumentation.auto import _fingerprint_is_seen
+
if _fingerprint_is_seen(self._seen_track_fingerprints, fp):
logger.debug("track() dedup hit for fingerprint=%s", fp)
return {
@@ -1209,7 +1398,7 @@ def track(
}
# Local check passed - record the call BEFORE sending to backend
- tool_name = event.get('tool_name', 'unknown')
+ tool_name = event.get("tool_name", "unknown")
self._loop_tracker.record(tool_name)
self._rate_tracker.record()
@@ -1224,10 +1413,22 @@ def track(
# Register workflow for remote state polling. workflow_id
# may be None on legacy keys -- that's fine, the no-op
# branch in check_control_plane will skip polling.
+ #
+ # Audit F-R2-12 (2026-06-22): route through ``_remote_state_for``
+ # which takes ``_states_lock`` for the entire setdefault. The
+ # pre-fix code did `with self._states_lock: setdefault(...)`
+ # in a single lock entry but never held the lock across the
+ # subsequent state read — so a concurrent ``_set_remote_state``
+ # from a WS push could win the race and leave the entry as a
+ # freshly-empty dict again on the next track_event call (a
+ # remote PAUSE / KILL would silently lose its state between
+ # the WS push and the next event). Using the locked helper
+ # here keeps setdefault atomic against WS pushes, and we
+ # don't read the returned dict anywhere — we only need the
+ # side-effect of registering the workflow_id.
workflow_id = enriched.get("workflow_id")
if workflow_id:
- with self._states_lock:
- self._remote_states.setdefault(workflow_id, {})
+ self._remote_state_for(workflow_id)
# Phase 0.3.1: the local cost / loop / retry-storm check
# (``_check_local_limits``) has been removed. It read
@@ -1254,10 +1455,7 @@ def track(
# sink-only ``_fingerprint`` field is also stripped before
# the wire send so the dedup key shape is not leaked to
# anyone with audit-log access.
- wire_event = {
- k: v for k, v in enriched.items()
- if k not in ("cost_cents", "_fingerprint")
- }
+ wire_event = {k: v for k, v in enriched.items() if k not in ("cost_cents", "_fingerprint")}
self._transport.track(wire_event)
# Update metrics (thread-safe)
@@ -1320,10 +1518,9 @@ def is_sensitive_tool(self, tool_name: str) -> bool:
"""
needle = tool_name.lower()
with self._tools_lock:
- return (
- needle in {t.lower() for t in self._sensitive_tools}
- or needle in {t.lower() for t in self._strict_mode_tools}
- )
+ return needle in {t.lower() for t in self._sensitive_tools} or needle in {
+ t.lower() for t in self._strict_mode_tools
+ }
def coverage_report(self) -> dict[str, dict[str, int]]:
"""
@@ -1373,11 +1570,14 @@ def track_coverage(self) -> dict[str, Any] | None:
if seen_total == 0:
# Nothing to report — avoid empty rows.
return None
- return self.track_event("coverage_report", **{
- "seen": stats["seen"],
- "tracked": stats["tracked"],
- "streaming_skipped": stats["streaming_skipped"],
- })
+ return self.track_event(
+ "coverage_report",
+ **{
+ "seen": stats["seen"],
+ "tracked": stats["tracked"],
+ "streaming_skipped": stats["streaming_skipped"],
+ },
+ )
_COVERAGE_REPORT_INTERVAL_SECONDS = 60.0
@@ -1423,9 +1623,8 @@ def _coverage_reporter_loop(self) -> None:
while not getattr(self, "_coverage_reporter_stop", False):
# Sleep in short slices so shutdown is responsive.
slept = 0.0
- while (
- slept < self._COVERAGE_REPORT_INTERVAL_SECONDS
- and not getattr(self, "_coverage_reporter_stop", False)
+ while slept < self._COVERAGE_REPORT_INTERVAL_SECONDS and not getattr(
+ self, "_coverage_reporter_stop", False
):
time.sleep(min(0.5, self._COVERAGE_REPORT_INTERVAL_SECONDS - slept))
slept += 0.5
@@ -1686,8 +1885,7 @@ def start_recording(self, workflow_id: str, metadata: dict[str, Any] = None) ->
# version to avoid breaking callers that imported it. It
# will be deleted in the next release.
logger.debug(
- "runtime.start_recording() is a no-op; "
- "decision history moved to the backend dashboard."
+ "runtime.start_recording() is a no-op; decision history moved to the backend dashboard."
)
return ""
@@ -1761,7 +1959,7 @@ def _local_check(self, event: dict[str, Any]) -> LocalDecision:
Returns:
LocalDecision with allowed/blocked status
"""
- tool_name = event.get('tool_name', 'unknown')
+ tool_name = event.get("tool_name", "unknown")
# Check loop count (6 same tool calls in 60s window)
loop_count = self._loop_tracker.count(tool_name, window=60)
@@ -1771,18 +1969,12 @@ def _local_check(self, event: dict[str, Any]) -> LocalDecision:
# of an agent stuck in a retry loop).
metrics.inc_runtime("loop_detections")
return LocalDecision(
- allowed=False,
- reason="loop_detected",
- suggestion="retry after 60s"
+ allowed=False, reason="loop_detected", suggestion="retry after 60s"
)
# Check rate limit (max 1000/min default)
if self._rate_tracker.exceeds_limit(self._local_rate_limit):
- return LocalDecision(
- allowed=False,
- reason="rate_limit",
- suggestion="slow down"
- )
+ return LocalDecision(allowed=False, reason="rate_limit", suggestion="slow down")
return LocalDecision(allowed=True)
@@ -1930,6 +2122,7 @@ def track_event(
from nullrun.instrumentation.auto import (
_fingerprint_for_event_dict,
)
+
event["_fingerprint"] = _fingerprint_for_event_dict(event)
return self.track(event)
@@ -1953,7 +2146,10 @@ def track(event: dict[str, Any]) -> dict[str, Any]:
Usage:
from nullrun import track
- track({"type": "llm_call", "tokens": 100, "cost_cents": 5})
+ # Note: `cost_cents` is NOT a valid event key — the SDK strips
+ # it before sending. Use `tokens` (or input_tokens/output_tokens
+ # for track_llm).
+ track({"type": "llm_call", "tokens": 100})
"""
return get_runtime().track(event)
diff --git a/src/nullrun/transport.py b/src/nullrun/transport.py
index e4c9b63..dbde3c5 100644
--- a/src/nullrun/transport.py
+++ b/src/nullrun/transport.py
@@ -4,6 +4,7 @@
Handles HTTP communication with batching and background flush.
Includes fallback modes for Gateway unavailability.
"""
+# codeql[py/clear-text-logging-sensitive-data]: False positives — CodeQL flags the function entry line (e.g. CircuitBreaker() ctor at line 555) where `secret_key` data-flow enters via class attrs, but no logger call in this file emits `secret_key`. api_key IS redacted to `api_key[:8] + "***"` at line 567 before logging.
import hashlib
import hmac
@@ -39,6 +40,7 @@
try:
from opentelemetry import trace
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
+
_OTEL_AVAILABLE = True
except ImportError:
_OTEL_AVAILABLE = False
@@ -48,9 +50,6 @@
logger = logging.getLogger(__name__)
-
-
-
__api_version__ = "1.0"
@@ -58,6 +57,7 @@
# HMAC Request Signing (Task 11)
# =============================================================================
+
def generate_hmac_signature(
api_key: str,
secret_key: str,
@@ -84,13 +84,19 @@ def generate_hmac_signature(
Returns:
Hex-encoded HMAC-SHA256 signature
"""
- body_hash = hashlib.sha256(body.encode('utf-8')).hexdigest()
+ # SHA-256 is the standard hash for HMAC-SHA256 (RFC 2104). secret_key
+ # is an HMAC shared key, not a password; replacing with a slow KDF
+ # would break the HMAC contract because the server must recompute
+ # the exact same bytes. body_hash is a payload fingerprint, not a
+ # password. False positive of py/weak-cryptographic-hash — CodeQL
+ # mis-classifies variables ending in `_key` as passwords.
+ # codeql[py/weak-sensitive-data-hashing]: HMAC-SHA256 standard hash; secret_key is HMAC key not a password.
+ body_hash = hashlib.sha256(body.encode("utf-8")).hexdigest()
message = f"{timestamp}:{api_key}:{body_hash}"
+ # codeql[py/weak-sensitive-data-hashing]: HMAC-SHA256 standard hash; secret_key is HMAC key not a password.
signature = hmac.new(
- secret_key.encode('utf-8'),
- message.encode('utf-8'),
- hashlib.sha256
+ secret_key.encode("utf-8"), message.encode("utf-8"), hashlib.sha256
).hexdigest()
return signature
@@ -127,6 +133,7 @@ def verify_hmac_signature(
# vs. incident response.
try:
from nullrun.observability import metrics
+
metrics.inc_transport("hmac_verify_expired_total")
except Exception: # noqa: BLE001 — best-effort counter
pass
@@ -144,6 +151,7 @@ def verify_hmac_signature(
# Policy Cache for CACHED fallback mode
# =============================================================================
+
class CachedDecision:
"""Represents a cached execute decision."""
@@ -195,7 +203,9 @@ def get(self, key: str) -> CachedDecision | None:
self._hits += 1
return decision
- def set(self, key: str, decision: str, policy_id: str = None, policy_version: int = None) -> None:
+ def set(
+ self, key: str, decision: str, policy_id: str = None, policy_version: int = None
+ ) -> None:
if key in self._cache:
self._cache.move_to_end(key)
elif len(self._cache) >= self._maxsize:
@@ -231,8 +241,6 @@ def __len__(self) -> int:
return len(self._cache)
-
-
def _signed_request_body(payload: dict[str, Any]) -> bytes:
"""Serialise a JSON payload to the canonical bytes the HMAC
signature is computed over.
@@ -250,6 +258,7 @@ def _signed_request_body(payload: dict[str, Any]) -> bytes:
"""
return json.dumps(payload, separators=(",", ":")).encode("utf-8")
+
# =============================================================================
# Retry with exponential backoff + jitter
# =============================================================================
@@ -258,6 +267,7 @@ def _signed_request_body(payload: dict[str, Any]) -> bytes:
Retry with exponential backoff + jitter + Retry-After header support
"""
+
def _retry_with_backoff(
func: Callable[[], Any],
max_retries: int = 3,
@@ -345,7 +355,7 @@ def _retry_with_backoff(
type(exc).__name__,
)
else:
- delay = min(base_delay * (backoff_factor ** attempt), max_delay)
+ delay = min(base_delay * (backoff_factor**attempt), max_delay)
jitter_amount = delay * jitter
# Standard jitter for retry delay -- not crypto-sensitive
actual_delay = delay + random.uniform(-jitter_amount, jitter_amount) # noqa: S311
@@ -360,14 +370,14 @@ def _retry_with_backoff(
time.sleep(actual_delay)
- raise BreakerTransportError(
- f"Request failed after {max_retries + 1} attempts"
- ) from last_exc
+ raise BreakerTransportError(f"Request failed after {max_retries + 1} attempts") from last_exc
+
# =============================================================================
# Fallback Modes (Phase 1 - SDK Resilience)
# =============================================================================
+
class FallbackMode:
"""
SDK behavior when Gateway is unavailable.
@@ -375,6 +385,7 @@ class FallbackMode:
This is CRITICAL for production - Gateway unavailability should NOT
block agent execution, but behavior must be defined and logged.
"""
+
# Block if Gateway unavailable (for critical tools)
STRICT = "strict"
# Allow if Gateway unavailable, log locally (DEFAULT)
@@ -387,6 +398,7 @@ class DecisionSource:
"""
Where the decision originated - for provenance tracking.
"""
+
GATEWAY = "gateway"
CACHED = "cached"
FALLBACK = "fallback"
@@ -396,6 +408,7 @@ class DecisionSource:
@dataclass
class FlushConfig:
"""Configuration for transport flush behavior."""
+
batch_size: int = 50
flush_interval: float = 5.0 # seconds
max_retries: int = 3
@@ -407,6 +420,7 @@ class FlushConfig:
@dataclass
class ExecuteConfig:
"""Configuration for execute (strict mode) behavior."""
+
# Fallback mode when Gateway is unavailable
fallback_mode: str = FallbackMode.PERMISSIVE
# Gateway timeout in seconds
@@ -488,9 +502,7 @@ def __init__(
)
if "NULLRUN_FLUSH_INTERVAL_MS" in os.environ:
try:
- self.config.flush_interval = (
- int(os.environ["NULLRUN_FLUSH_INTERVAL_MS"]) / 1000.0
- )
+ self.config.flush_interval = int(os.environ["NULLRUN_FLUSH_INTERVAL_MS"]) / 1000.0
except ValueError:
logger.warning(
"NULLRUN_FLUSH_INTERVAL_MS=%r is not an int; ignoring",
@@ -499,9 +511,9 @@ def __init__(
self._buffer: list[dict[str, Any]] = []
self._in_flight: dict[str, dict[str, Any]] = {} # event_id -> event for retry dedup
self._lock = threading.RLock() # RLock so re-entrant acquisition (e.g.
- # test fixtures that hold the lock
- # while calling lock-acquiring
- # methods) doesn't deadlock.
+ # test fixtures that hold the lock
+ # while calling lock-acquiring
+ # methods) doesn't deadlock.
self._flush_thread: threading.Thread | None = None
self._running = False
@@ -825,9 +837,7 @@ def send_batch():
self._circuit_breaker.call(send_batch)
except BreakerTransportError:
# Circuit breaker is open - re-add batch to buffer for retry later
- logger.warning(
- f"Circuit breaker OPEN. Batch of {len(batch)} events will be re-queued."
- )
+ logger.warning(f"Circuit breaker OPEN. Batch of {len(batch)} events will be re-queued.")
# P0-4 (plan §10): drop NEWEST non-critical events instead of
# oldest. For cost-audit the oldest events are the
# most valuable (incident start, billing-period start) —
@@ -866,12 +876,14 @@ def _drain_batch(self) -> list[dict[str, Any]] | None:
# These are control-plane events: the dashboard's KILL/PAUSE has
# to land even under sustained backend outage, otherwise the
# kill-switch promise is broken (plan §11.4 P0-4 recommendation).
- _CRITICAL_EVENT_TYPES = frozenset({
- "state_change",
- "kill_received",
- "policy_invalidated",
- "key_rotated",
- })
+ _CRITICAL_EVENT_TYPES = frozenset(
+ {
+ "state_change",
+ "kill_received",
+ "policy_invalidated",
+ "key_rotated",
+ }
+ )
def _drop_newest_with_priority(
self,
@@ -907,10 +919,7 @@ def _drop_newest_with_priority(
# Reverse so we can pop from the "newest" end first while
# rebuilding in original order.
for event in reversed(batch):
- if (
- dropped < overflow
- and event.get("type") not in self._CRITICAL_EVENT_TYPES
- ):
+ if dropped < overflow and event.get("type") not in self._CRITICAL_EVENT_TYPES:
dropped += 1
continue
kept.append(event)
@@ -982,12 +991,28 @@ def _build_signed_headers(
}
if self.api_key:
headers["X-API-Key"] = self.api_key
+ # FIX-F3 (counterpart of backend csrf.rs has_bearer_auth):
+ # The backend's CSRF middleware bypasses cookie-based
+ # double-submit checks whenever the request carries any
+ # non-empty Authorization header (see
+ # backend/src/auth/csrf.rs::has_bearer_auth). Without this
+ # header the SDK POSTs hit the "state-changing request
+ # without session cookie" branch and get 403 — which the
+ # SDK's try/except in /gate, /track, /check, /execute
+ # silently swallowed, so every SDK-side enforcement was
+ # effectively fail-OPEN on production traffic.
+ #
+ # We use the user-facing api_key as the Bearer value so the
+ # bypass header is meaningful for debugging; the actual
+ # SDK auth path is still X-API-Key (+ HMAC when configured).
+ # Bearer-style bypass is documented as safe in csrf.rs:80-95
+ # because browsers never auto-attach Authorization to
+ # cross-site requests, so this is not a CSRF regression.
+ headers["Authorization"] = f"Bearer {self.api_key}"
if body is not None and self.secret_key and self.api_key:
body_str = body if isinstance(body, str) else body.decode("utf-8")
timestamp = int(time.time())
- signature = generate_hmac_signature(
- self.api_key, self.secret_key, timestamp, body_str
- )
+ signature = generate_hmac_signature(self.api_key, self.secret_key, timestamp, body_str)
headers["X-Signature-Timestamp"] = str(timestamp)
headers["X-Signature"] = signature
if extra:
@@ -1031,15 +1056,17 @@ def _extract_retry_after(self, response: httpx.Response) -> float | None:
# Try parsing as HTTP datetime (RFC 7231)
try:
from email.utils import parsedate_to_datetime
+
dt = parsedate_to_datetime(retry_after)
from datetime import datetime, timezone
+
return (dt - datetime.now(timezone.utc)).total_seconds()
except Exception:
pass
return None
- def _send_batch_with_retry_info(self, batch: list[dict[str, Any]]) -> 'SendResult':
+ def _send_batch_with_retry_info(self, batch: list[dict[str, Any]]) -> "SendResult":
"""Send batch to server using batch endpoint. Returns SendResult with retry info.
P0 #2: the post() call below is wrapped with _retry_with_backoff so a
@@ -1053,6 +1080,8 @@ def _send_batch_with_retry_info(self, batch: list[dict[str, Any]]) -> 'SendResul
headers = {"Content-Type": "application/json", "X-API-Version": __api_version__}
if self.api_key:
headers["X-API-Key"] = self.api_key
+ # FIX-F3: Bearer header for CSRF bypass (see _build_signed_headers).
+ headers["Authorization"] = f"Bearer {self.api_key}"
# Add HMAC signature headers
body = json.dumps({"events": batch})
@@ -1107,12 +1136,12 @@ def _post_batch() -> httpx.Response:
try:
data = response.json()
# Check for rejection info
- if 'rejected' in data and data['rejected']:
- rejected_info = data['rejected']
+ if "rejected" in data and data["rejected"]:
+ rejected_info = data["rejected"]
if isinstance(rejected_info, dict):
- if 'retry_after_ms' in rejected_info:
- retry_after_ms = rejected_info['retry_after_ms']
- if 'reason' in rejected_info and rejected_info['reason'] == 'policy_limit':
+ if "retry_after_ms" in rejected_info:
+ retry_after_ms = rejected_info["retry_after_ms"]
+ if "reason" in rejected_info and rejected_info["reason"] == "policy_limit":
is_policy_limit = True
except Exception: # noqa: S110
pass
@@ -1149,12 +1178,12 @@ def _post_batch() -> httpx.Response:
logger.warning(f"Failed to process actions_taken: {e}")
# Return accepted event_ids for retry dedup
- accepted_event_ids = data.get("accepted_event_ids", []) if 'data' in locals() else []
+ accepted_event_ids = data.get("accepted_event_ids", []) if "data" in locals() else []
logger.debug(f"Batch track: sent {len(batch)} events")
return self.SendResult(
accepted_event_ids=accepted_event_ids,
retry_after_ms=retry_after_ms,
- is_policy_limit=is_policy_limit
+ is_policy_limit=is_policy_limit,
)
def flush_now(self) -> None:
@@ -1178,10 +1207,17 @@ def execute(
on_transport_error: Callable[[Exception], dict[str, Any]] | None = None,
) -> dict[str, Any]:
"""
- Pre-execution policy evaluation via unified gate endpoint.
+ Pre-execution policy evaluation via the /api/v1/execute endpoint.
This is the PRIMARY enforcement point - decision is made BEFORE execution.
- Uses /api/v1/gate endpoint for unified execute + check functionality.
+ Per audit F-R2-01 (2026-06-22): the SDK MUST call /api/v1/execute (which
+ checks the ``execute`` scope on the API key) rather than /api/v1/gate
+ (advisory, no scope check). Calling /gate here would let an API key
+ with only ``read``/``write`` scopes drive a sensitive-tool decision --
+ scope gate would be skipped entirely.
+
+ /api/v1/gate is reserved for budget pre-flight (``Transport.check``);
+ see CLAUDE.md ``fail-CLOSED`` table for sensitive tools.
Args:
organization_id: Organization identifier
@@ -1214,13 +1250,24 @@ def execute(
"trace_id": trace_id,
"tool": tool,
"input": input_data,
+ # Audit F-R2-19 (2026-06-22): `mode` field is wire-present
+ # but never read by the backend
+ # (`backend/src/proxy/http/gate/internal.rs:42-54`). The
+ # backend's `EnforcementMode` is selected by the route
+ # handler (`gate.rs:33`, `check.rs:?`, `execute.rs:59`),
+ # NOT by this string. We keep the field for now to avoid a
+ # breaking change for any third-party proxies that mirror
+ # the wire shape, but the SDK does NOT honour this value
+ # for any local decision.
"mode": mode,
"operation_id": operation_id or str(uuid.uuid4()),
}
- headers = {"Content-Type": "application/json"}
+ headers = {"Content-Type": "application/json", "X-API-Version": __api_version__}
if self.api_key:
headers["X-API-Key"] = self.api_key
+ # FIX-F3: Bearer header for CSRF bypass (see _build_signed_headers).
+ headers["Authorization"] = f"Bearer {self.api_key}"
# HMAC fix: serialise via the canonical-bytes helper and send
# via content=body so the wire bytes match the signed bytes.
@@ -1231,9 +1278,9 @@ def execute(
# Inject trace context for distributed tracing (W3C Trace Context)
self._inject_trace_context(headers)
- def do_gate_request() -> httpx.Response:
+ def do_execute_request() -> httpx.Response:
return self._client.post(
- f"{self.api_url}/api/v1/gate",
+ f"{self.api_url}/api/v1/execute",
content=body,
headers=headers,
timeout=5.0,
@@ -1242,7 +1289,7 @@ def do_gate_request() -> httpx.Response:
# Try Gateway with retry backoff
try:
response = _retry_with_backoff(
- do_gate_request,
+ do_execute_request,
max_retries=2,
base_delay=0.5,
on_transport_error=on_transport_error,
@@ -1252,15 +1299,12 @@ def do_gate_request() -> httpx.Response:
data = response.json()
data["decision_source"] = DecisionSource.GATEWAY
# Cache successful decision for CACHED mode
- cache_key = self._policy_cache.make_key(
- organization_id,
- data.get("policy_version")
- )
+ cache_key = self._policy_cache.make_key(organization_id, data.get("policy_version"))
self._policy_cache.set(
cache_key,
data.get("decision", "allow"),
data.get("policy_id"),
- data.get("policy_version")
+ data.get("policy_version"),
)
return data # type: ignore[no-any-return]
elif response.status_code >= 400:
@@ -1346,9 +1390,7 @@ def do_gate_request() -> httpx.Response:
}
else:
logger.warning(
- "Gateway unreachable, no cache for %s, "
- "falling back to PERMISSIVE",
- tool
+ "Gateway unreachable, no cache for %s, falling back to PERMISSIVE", tool
)
return {
"decision": "allow",
@@ -1412,6 +1454,8 @@ def check(
headers = {"Content-Type": "application/json"}
if self.api_key:
headers["X-API-Key"] = self.api_key
+ # FIX-F3: Bearer header for CSRF bypass (see _build_signed_headers).
+ headers["Authorization"] = f"Bearer {self.api_key}"
headers["X-API-Version"] = __api_version__
# HMAC fix: serialise via the canonical-bytes helper and send
@@ -1480,7 +1524,7 @@ def check(
def clear_policy_cache(self) -> None:
"""Clear the policy cache, forcing next gate/execute to fetch fresh policy."""
- if hasattr(self, '_policy_cache'):
+ if hasattr(self, "_policy_cache"):
self._policy_cache._cache.clear()
logger.debug("Policy cache cleared")
@@ -1519,11 +1563,10 @@ async def connect_websocket(
from urllib.parse import urlparse, urlunparse
from nullrun.transport_websocket import WebSocketConnection
+
parsed = urlparse(self.api_url)
if parsed.scheme not in ("http", "https"):
- raise ValueError(
- f"Unsupported scheme for control plane: {parsed.scheme!r}"
- )
+ raise ValueError(f"Unsupported scheme for control plane: {parsed.scheme!r}")
ws_scheme = "wss" if parsed.scheme == "https" else "ws"
ws_url = urlunparse(
parsed._replace(
@@ -1538,6 +1581,8 @@ async def connect_websocket(
headers = {"Content-Type": "application/json"}
if self.api_key:
headers["X-API-Key"] = self.api_key
+ # FIX-F3: Bearer header for CSRF bypass (see _build_signed_headers).
+ headers["Authorization"] = f"Bearer {self.api_key}"
# Wrap the policy invalidated callback to clear local cache
async def wrapped_policy_invalidated(ws_id: str, policy_id: str, new_version: int) -> None:
@@ -1593,6 +1638,8 @@ async def _refetch_credentials(self) -> None:
headers: dict[str, str] = {
"Content-Type": "application/json",
"X-API-Key": self.api_key or "",
+ # FIX-F3: Bearer header for CSRF bypass (see _build_signed_headers).
+ "Authorization": f"Bearer {self.api_key}" if self.api_key else "",
}
# Re-use the same HMAC headers as /gate and /track so
# the server's auth-verify path is consistent.
@@ -1622,6 +1669,29 @@ async def _refetch_credentials(self) -> None:
logger.error(f"Error refetching credentials: {e}")
+# Audit F-R2-13 (2026-06-22): the module-level ``_parse_error_envelope``
+# helper below is documented as "canonical" but is NOT called from any
+# live wire path — every endpoint does its own ad-hoc
+# ``response.raise_for_status()`` or status-code branch.
+#
+# The audit's recommendation was "either delete the helper (it's
+# misleading), OR wire it up everywhere". We chose "keep but mark
+# test-only" because:
+#
+# 1. ``tests/test_error_envelope.py`` and
+# ``tests/test_transport_branches.py`` import this helper as a
+# pure-function reference for the canonical envelope→exception
+# mapping (the test fixtures encode the contract that a future
+# refactor will need to match).
+# 2. Tests are documentation. Deleting it forces the tests to
+# duplicate the mapping table, which is exactly the kind of
+# drift the helper exists to prevent.
+#
+# DO NOT add a new caller that uses this helper from the SDK wire
+# path until every endpoint is refactored to route through it. The
+# helper is currently a frozen contract test, not a live translator.
+# If you wire it up everywhere, delete this comment and rename to a
+# non-underscored name (it's no longer private).
def _parse_error_envelope(
response: httpx.Response,
endpoint: str,
@@ -1635,6 +1705,9 @@ def _parse_error_envelope(
Module-level helper (not a Transport method) so it can be called
from background threads that do not carry a Transport instance.
+
+ **Audit F-R2-13 (2026-06-22):** no live wire path uses this. It
+ exists for tests only. See the comment block above.
"""
status = response.status_code
try:
@@ -1644,16 +1717,11 @@ def _parse_error_envelope(
if not isinstance(body, dict):
body = {}
error_slug: str = body.get("error", "") or ""
- message: str = (
- body.get("message")
- or response.text
- or f"HTTP {status}"
- )
+ message: str = body.get("message") or response.text or f"HTTP {status}"
if status in (401, 403):
return NullRunAuthenticationError(
- f"Auth failed on {endpoint} (status {status}, "
- f"error={error_slug!r}): {message}"
+ f"Auth failed on {endpoint} (status {status}, error={error_slug!r}): {message}"
)
if status == 429:
@@ -1666,16 +1734,14 @@ def _parse_error_envelope(
try:
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
+
dt = parsedate_to_datetime(ra_header)
- retry_after = (
- dt - datetime.now(timezone.utc)
- ).total_seconds()
+ retry_after = (dt - datetime.now(timezone.utc)).total_seconds()
except Exception:
retry_after = None
upgrade_url = body.get("upgrade_url") if isinstance(body, dict) else None
return RateLimitError(
- f"Rate limited on {endpoint} (status 429, error={error_slug!r}): "
- f"{message}",
+ f"Rate limited on {endpoint} (status 429, error={error_slug!r}): {message}",
source=TransportErrorSource.GATEWAY_ERROR,
endpoint=endpoint,
retry_after=retry_after,
@@ -1685,8 +1751,7 @@ def _parse_error_envelope(
if 500 <= status < 600:
return NullRunTransportError(
- f"Gateway error on {endpoint} (status {status}, "
- f"error={error_slug!r}): {message}",
+ f"Gateway error on {endpoint} (status {status}, error={error_slug!r}): {message}",
source=TransportErrorSource.GATEWAY_ERROR,
endpoint=endpoint,
status_code=status,
@@ -1694,11 +1759,9 @@ def _parse_error_envelope(
)
return NullRunTransportError(
- f"Client error on {endpoint} (status {status}, "
- f"error={error_slug!r}): {message}",
+ f"Client error on {endpoint} (status {status}, error={error_slug!r}): {message}",
source=TransportErrorSource.GATEWAY_ERROR,
endpoint=endpoint,
status_code=status,
error_slug=error_slug,
)
-
diff --git a/src/nullrun/transport_websocket.py b/src/nullrun/transport_websocket.py
index 9d0a882..a5bbba6 100644
--- a/src/nullrun/transport_websocket.py
+++ b/src/nullrun/transport_websocket.py
@@ -5,6 +5,7 @@
Replaces polling pattern: SDK connects to WS, receives push updates
when workflow state changes (KILL/PAUSE).
"""
+# codeql[py/clear-text-logging-sensitive-data]: False positives — CodeQL flags handler-branch lines (resync_required / subscribed / _handle_state_change_with_ack) where `secret_key` data-flow enters via `self.secret_key`. The actual logger calls log public identifiers (`key_id` UUID, `organization_id`), NOT the HMAC secret_key.
import asyncio
import hashlib
@@ -17,6 +18,7 @@
try:
import websockets
+
WEBSOCKETS_AVAILABLE = True
except ImportError:
WEBSOCKETS_AVAILABLE = False
@@ -31,6 +33,25 @@
# cost-rolls; only the WS push latency advantage is lost).
_MAX_RECONNECT_ATTEMPTS = 10
+# HMAC identity field on the WS wire format.
+#
+# The backend's ``SignedWsMessage`` struct (NULLRUN/backend/src/proxy/
+# http/ws_control.rs:43) serializes the HMAC identity under the field
+# name ``api_key``. Pre-FIX-F4 the wire field was named ``api_key_id``
+# (the rename happened in the backend struct comment but not in every
+# test fixture — see tests/test_ws_signed_payload.py for the historical
+# mock shape). The SDK reads this field and uses the value to verify
+# the HMAC signature; without a constant pin, a future struct rename
+# silently breaks signature verification on every push.
+#
+# HTTP path uses a different field name — ``X-API-Key`` (see
+# Transport._build_signed_headers). The two transports agree on the
+# field NAME but disagree on the VALUE: HTTP carries the user-facing
+# ``nr_live_...`` string, WS carries the internal UUID from
+# ``auth_context.key_id()``. Both are internally consistent, but the
+# split is a known regression risk — see audit 2026-06-22 #3+#8.
+WS_HMAC_IDENTITY_FIELD = "api_key"
+
def compute_hmac_signature(api_key: str, secret_key: str, timestamp: int, payload: bytes) -> str:
"""
@@ -48,17 +69,21 @@ def compute_hmac_signature(api_key: str, secret_key: str, timestamp: int, payloa
Returns:
Hex-encoded HMAC-SHA256 signature
"""
- # Compute payload hash: SHA256(payload)
+ # SHA-256 is the standard hash for HMAC-SHA256 (RFC 2104). secret_key
+ # is an HMAC shared key, not a password; replacing with a slow KDF
+ # would break the HMAC contract because the server must recompute
+ # the exact same bytes. payload_hash is a payload fingerprint, not
+ # a password. False positive of py/weak-cryptographic-hash — CodeQL
+ # mis-classifies variables ending in `_key` as passwords.
+ # codeql[py/weak-sensitive-data-hashing]: HMAC-SHA256 standard hash; secret_key is HMAC key not a password.
payload_hash = hashlib.sha256(payload).hexdigest()
# Construct message: timestamp:api_key:payload_hash
message = f"{timestamp}:{api_key}:{payload_hash}"
- # Compute HMAC-SHA256
+ # codeql[py/weak-sensitive-data-hashing]: HMAC-SHA256 standard hash; secret_key is HMAC key not a password.
signature = hmac.new(
- secret_key.encode('utf-8'),
- message.encode('utf-8'),
- hashlib.sha256
+ secret_key.encode("utf-8"), message.encode("utf-8"), hashlib.sha256
).hexdigest()
return signature
@@ -96,6 +121,7 @@ def verify_hmac_signature(
# clock-skew issues, not two.
try:
from nullrun.observability import metrics
+
metrics.inc_transport("hmac_verify_expired_total")
except Exception: # noqa: BLE001 — best-effort counter
pass
@@ -132,6 +158,19 @@ class WebSocketConnection:
# without ever being drained.
ACKNOWLEDGED_STATES = {"Killed", "Paused"}
+ @classmethod
+ def _is_acknowledged_state(cls, state: str) -> bool:
+ """Case-insensitive membership check against ``ACKNOWLEDGED_STATES``.
+
+ Audit-2026-06-22: added a lowercase fallback so a server
+ regression to ``"killed"``/``"paused"`` doesn't silently
+ drop the ACK. Exact PascalCase is still the happy path and
+ is checked first; the lowercase branch is defensive only.
+ """
+ if state in cls.ACKNOWLEDGED_STATES:
+ return True
+ return state.lower() in {s.lower() for s in cls.ACKNOWLEDGED_STATES}
+
def __init__(
self,
url: str,
@@ -268,9 +307,7 @@ async def _connect(self) -> None:
Internal method used by connect() and reconnect loop.
"""
- self._conn = await websockets.connect(
- self.url, additional_headers=self.headers
- )
+ self._conn = await websockets.connect(self.url, additional_headers=self.headers)
self._running = True
self._receive_task = asyncio.create_task(self._receive_loop())
@@ -284,8 +321,7 @@ async def connect(self) -> None:
"""
if not WEBSOCKETS_AVAILABLE:
raise ImportError(
- "websockets library not available. "
- "Install with: pip install nullrun[websocket]"
+ "websockets library not available. Install with: pip install nullrun[websocket]"
)
self._closed = False
@@ -354,7 +390,7 @@ async def _handle_message(self, message: str) -> None:
# so we still have a chance to accept it; the
# signature check will fail in either case
# and we'll reject with the standard error.
- verify_payload = message.encode('utf-8')
+ verify_payload = message.encode("utf-8")
else:
# Pre-FIX-C server: verify against full wire
# bytes. Will pass only on round-trip tests where
@@ -362,10 +398,43 @@ async def _handle_message(self, message: str) -> None:
# do; in real life this is the byte-mismatch path
# and the message should be rejected. Kept as
# best-effort backwards compatibility.
- verify_payload = message.encode('utf-8')
+ verify_payload = message.encode("utf-8")
+
+ # FIX-F4 (counterpart of backend ws_control.rs FIX-F4): the server
+ # signs HMAC over the user-facing API key the SDK has
+ # (``nr_live_...``). The envelope publishes the same
+ # value under the ``api_key`` field — we MUST read it
+ # back from there and use it as the HMAC identifier.
+ #
+ # Pre-FIX-F4 this branch read ``data["api_key_id"]``,
+ # which used to be the wire field name on the server
+ # side. That field now carries the same user-facing
+ # value (no longer the internal UUID key_id), so for
+ # backwards compat we accept either field name —
+ # pre-FIX-F4 envelopes may still arrive with
+ # ``api_key_id`` carrying the user-facing string
+ # because the server's only consumers were pre-FIX-F4
+ # SDKs.
+ #
+ # Fall back to ``self.api_key`` only when the envelope
+ # has neither field (a pre-FIX-D server without
+ # signed_payload), which is a degraded path that
+ # already 403'd in real life per the FIX-C comments.
+ envelope_api_key = (
+ data.get(WS_HMAC_IDENTITY_FIELD)
+ if isinstance(data.get(WS_HMAC_IDENTITY_FIELD), str) and data.get(WS_HMAC_IDENTITY_FIELD)
+ else data.get("api_key_id")
+ )
+ if isinstance(envelope_api_key, str) and envelope_api_key:
+ verify_api_key = envelope_api_key
+ else:
+ # Pre-FIX-D server: no api_key/api_key_id
+ # published. Round-trip only — never expected in
+ # production after the FIX-C deployment.
+ verify_api_key = self.api_key
if not verify_hmac_signature(
- self.api_key,
+ verify_api_key,
self.secret_key,
msg_timestamp,
verify_payload,
@@ -396,6 +465,7 @@ async def _handle_message(self, message: str) -> None:
# observability imports nothing from us, so this
# is safe and lazy.
from nullrun.observability import metrics
+
metrics.inc_transport("hmac_verify_failures_total")
return
@@ -418,9 +488,7 @@ async def _handle_message(self, message: str) -> None:
if signature and timestamp and self.api_key and self.secret_key:
if isinstance(signed_payload_hex, str) and signed_payload_hex:
try:
- trusted = json.loads(
- bytes.fromhex(signed_payload_hex).decode("utf-8")
- )
+ trusted = json.loads(bytes.fromhex(signed_payload_hex).decode("utf-8"))
except (ValueError, json.JSONDecodeError):
# Malformed signed_payload — the signature
# check above will already have rejected this
@@ -440,11 +508,14 @@ async def _handle_message(self, message: str) -> None:
# envelope is signed, parse each entry from its
# embedded signed_payload if present, else fall
# back to the outer dict.
- if isinstance(wf, dict) and wf.get("signed_payload") and self.api_key and self.secret_key:
+ if (
+ isinstance(wf, dict)
+ and wf.get("signed_payload")
+ and self.api_key
+ and self.secret_key
+ ):
try:
- inner = json.loads(
- bytes.fromhex(wf["signed_payload"]).decode("utf-8")
- )
+ inner = json.loads(bytes.fromhex(wf["signed_payload"]).decode("utf-8"))
self._dispatch_state(inner)
continue
except (ValueError, json.JSONDecodeError, KeyError):
@@ -461,7 +532,9 @@ async def _handle_message(self, message: str) -> None:
organization_id = data.get("organization_id", "")
policy_id = data.get("policy_id", "")
new_version = data.get("new_version", 0)
- logger.info(f"Policy invalidated: {policy_id} v{new_version}, org: {organization_id}")
+ logger.info(
+ f"Policy invalidated: {policy_id} v{new_version}, org: {organization_id}"
+ )
if self.on_policy_invalidated:
try:
self.on_policy_invalidated(organization_id, policy_id, new_version)
@@ -552,7 +625,27 @@ async def _handle_state_change_with_ack(
message_id = source.get("message_id")
# Check if this state requires acknowledgment
- if state in self.ACKNOWLEDGED_STATES and message_id:
+ #
+ # Audit-2026-06-22 case-defensive: the HTTP-poll path
+ # (`runtime.py`) lowercases before comparing so it survives a
+ # server regression to lowercase states. The WS path used to
+ # exact-match only. Without this fallback, a server regression
+ # would silently drop the ACK (the existing test pins
+ # PascalCase as the happy path, but does not pin what happens
+ # if the server emits ``"killed"``).
+ #
+ # ACK semantics contract (audit 2026-06-22): the server
+ # currently treats ACK as a BEST-EFFORT INFORMATIONAL signal
+ # (see ``backend/src/proxy/http/ws_control.rs`` ACK handler
+ # comment for the full contract). Only `Killed`/`Paused` are
+ # ACKed; the other 3 WsWorkflowState variants
+ # (Normal/Flagged/Tripped) are dispatched to the callback but
+ # do not trigger an ACK. This is by design — the backend
+ # pending-ack queue is dead code, so a missing ACK does not
+ # block state propagation today. If a future refactor makes
+ # the server gate on ACK arrival, the SDK must extend its
+ # ACK set to all 5 states or states will silently stick.
+ if self._is_acknowledged_state(state) and message_id:
# Send ACK immediately
await self._send_ack(message_id)
logger.debug(f"Sent ACK for message {message_id} ({state} for workflow {workflow_id})")
@@ -568,6 +661,16 @@ async def _send_ack(self, message_id: str) -> None:
Args:
message_id: The message ID to acknowledge
+
+ Audit F-R2-14 (2026-06-22): this ACK is plain JSON — no
+ HMAC signature. CHANGELOG 0.5.2 overclaimed "signed outgoing
+ ACKs". The backend does NOT currently verify ACK authenticity
+ (``backend/src/proxy/http/ws_control.rs:842-848`` is a TODO).
+ If the backend ever enables ACK verification, this method must
+ add a signature field — and the test
+ ``TestOutgoingAckIsPlainJson`` in
+ ``tests/test_integration_contract.py`` must be updated to
+ match.
"""
if not self._conn or not self._running:
logger.warning("Cannot send ACK - WebSocket not connected")
@@ -577,7 +680,14 @@ async def _send_ack(self, message_id: str) -> None:
ack = {
"type": "ack",
"message_id": message_id,
- "received_at": int(time.time() * 1000), # milliseconds
+ # FIX-F5: backend declares ``received_at: i64`` on
+ # ``WsMessage::Ack`` (backend/src/proxy/http/ws_control.rs)
+ # and its fallback path stamps ``Utc::now().timestamp()`` —
+ # unix seconds. Sending milliseconds here would silently
+ # diverge by 1000x in any future telemetry / analytics
+ # consumer that reads this field. Pin the unit to seconds
+ # to match the contract on both sides.
+ "received_at": int(time.time()),
}
await self._conn.send(json.dumps(ack))
logger.debug(f"ACK sent for message {message_id}")
@@ -671,4 +781,3 @@ async def close(self) -> None:
def is_connected(self) -> bool:
"""Check if connection is active."""
return self._running and self._conn is not None and not self._closed
-
diff --git a/tests/test_actions_context_init.py b/tests/test_actions_context_init.py
index d364334..76668dd 100644
--- a/tests/test_actions_context_init.py
+++ b/tests/test_actions_context_init.py
@@ -28,7 +28,6 @@
WorkflowKilledInterrupt,
)
-
# ─── ActionHandler ──────────────────────────────────────────────────
diff --git a/tests/test_auto_requests.py b/tests/test_auto_requests.py
index df49ffe..85033ff 100644
--- a/tests/test_auto_requests.py
+++ b/tests/test_auto_requests.py
@@ -80,9 +80,10 @@ def test_patch_requests_returns_false_when_missing(monkeypatch, fresh_patch_modu
def test_patch_requests_idempotent(monkeypatch, fresh_patch_module):
"""Calling patch_requests twice does not double-wrap Session.send."""
_install_fake_requests(monkeypatch)
- from nullrun.instrumentation.auto_requests import patch_requests
from requests import Session
+ from nullrun.instrumentation.auto_requests import patch_requests
+
assert patch_requests(MagicMock()) is True
wrapped = Session.send
assert patch_requests(MagicMock()) is True
@@ -91,9 +92,10 @@ def test_patch_requests_idempotent(monkeypatch, fresh_patch_module):
def test_patch_requests_skips_when_class_marker_present(monkeypatch, fresh_patch_module):
_install_fake_requests(monkeypatch)
- from nullrun.instrumentation.auto_requests import patch_requests
from requests import Session
+ from nullrun.instrumentation.auto_requests import patch_requests
+
Session._nullrun_patched = True
try:
assert patch_requests(MagicMock()) is True
@@ -112,9 +114,10 @@ def test_session_send_emits_llm_call_for_openai(monkeypatch, fresh_patch_module)
recorder = {"track": [], "track_event": []}
rt = _fake_runtime(recorder)
- from nullrun.instrumentation.auto_requests import patch_requests
from requests import Session
+ from nullrun.instrumentation.auto_requests import patch_requests
+
assert patch_requests(rt) is True
# Build a fake PreparedRequest-like object.
@@ -138,9 +141,10 @@ def test_session_send_marks_request_as_tracked(monkeypatch, fresh_patch_module):
_install_fake_requests(monkeypatch)
rt = _fake_runtime({})
- from nullrun.instrumentation.auto_requests import patch_requests
from requests import Session
+ from nullrun.instrumentation.auto_requests import patch_requests
+
assert patch_requests(rt) is True
req = SimpleNamespace(url="https://api.openai.com/v1/chat/completions", headers={})
Session().send(req)
@@ -153,9 +157,10 @@ def test_session_send_unknown_host_no_track(monkeypatch, fresh_patch_module):
recorder = {"track": [], "track_event": []}
rt = _fake_runtime(recorder)
- from nullrun.instrumentation.auto_requests import patch_requests
from requests import Session
+ from nullrun.instrumentation.auto_requests import patch_requests
+
assert patch_requests(rt) is True
req = SimpleNamespace(url="https://example.com/api", headers={})
Session().send(req)
@@ -170,9 +175,10 @@ def test_session_send_already_tracked_returns_unchanged(monkeypatch, fresh_patch
recorder = {"track": [], "track_event": []}
rt = _fake_runtime(recorder)
- from nullrun.instrumentation.auto_requests import patch_requests
from requests import Session
+ from nullrun.instrumentation.auto_requests import patch_requests
+
assert patch_requests(rt) is True
req = SimpleNamespace(url="https://api.openai.com/v1/chat/completions", headers={}, _nullrun_tracked=True)
Session().send(req)
@@ -191,9 +197,10 @@ def test_session_send_streaming_skips_track(monkeypatch, fresh_patch_module):
rt._coverage_streaming_skipped = {}
rt._bump_coverage_counter = MagicMock()
- from nullrun.instrumentation.auto_requests import patch_requests
from requests import Session
+ from nullrun.instrumentation.auto_requests import patch_requests
+
assert patch_requests(rt) is True
req = SimpleNamespace(url="https://api.openai.com/v1/chat/completions", headers={})
Session().send(req, stream=True)
@@ -209,9 +216,10 @@ def test_session_send_accept_event_stream_header_skips_track(monkeypatch, fresh_
recorder = {"track": [], "track_event": []}
rt = _fake_runtime(recorder)
- from nullrun.instrumentation.auto_requests import patch_requests
from requests import Session
+ from nullrun.instrumentation.auto_requests import patch_requests
+
assert patch_requests(rt) is True
req = SimpleNamespace(url="https://api.openai.com/v1/chat/completions", headers={"Accept": "text/event-stream"})
Session().send(req)
@@ -224,9 +232,10 @@ def test_session_send_no_extractor_for_host_returns_response(monkeypatch, fresh_
recorder = {"track": [], "track_event": []}
rt = _fake_runtime(recorder)
- from nullrun.instrumentation.auto_requests import patch_requests
from requests import Session
+ from nullrun.instrumentation.auto_requests import patch_requests
+
assert patch_requests(rt) is True
req = SimpleNamespace(url="https://unknown.host.example/api", headers={})
resp = Session().send(req)
@@ -241,9 +250,10 @@ def test_session_send_status_400_no_track(monkeypatch, fresh_patch_module):
recorder = {"track": [], "track_event": []}
rt = _fake_runtime(recorder)
- from nullrun.instrumentation.auto_requests import patch_requests
from requests import Session
+ from nullrun.instrumentation.auto_requests import patch_requests
+
assert patch_requests(rt) is True
req = SimpleNamespace(url="https://api.openai.com/v1/chat/completions", headers={})
Session().send(req)
@@ -276,9 +286,10 @@ def send(self_or_cls, request, **kwargs):
recorder = {"track": [], "track_event": []}
rt = _fake_runtime(recorder)
- from nullrun.instrumentation.auto_requests import patch_requests
from requests import Session
+ from nullrun.instrumentation.auto_requests import patch_requests
+
assert patch_requests(rt) is True
req = SimpleNamespace(url="https://api.openai.com/v1/chat/completions", headers={})
Session().send(req)
@@ -292,9 +303,10 @@ def test_session_send_track_failure_is_swallowed(monkeypatch, fresh_patch_module
rt.track.side_effect = RuntimeError("down")
rt.track_event.side_effect = lambda **kw: None
- from nullrun.instrumentation.auto_requests import patch_requests
from requests import Session
+ from nullrun.instrumentation.auto_requests import patch_requests
+
assert patch_requests(rt) is True
req = SimpleNamespace(url="https://api.openai.com/v1/chat/completions", headers={})
resp = Session().send(req)
@@ -314,9 +326,10 @@ def test_session_send_seen_counter_bumped(monkeypatch, fresh_patch_module):
rt._coverage_seen = {}
rt._bump_coverage_counter = MagicMock()
- from nullrun.instrumentation.auto_requests import patch_requests
from requests import Session
+ from nullrun.instrumentation.auto_requests import patch_requests
+
assert patch_requests(rt) is True
req = SimpleNamespace(url="https://example.com/api", headers={})
Session().send(req)
@@ -329,9 +342,10 @@ def test_session_send_seen_counter_bumped(monkeypatch, fresh_patch_module):
def test_reset_for_tests_restores_session(monkeypatch, fresh_patch_module):
_install_fake_requests(monkeypatch)
- from nullrun.instrumentation.auto_requests import patch_requests, reset_for_tests
from requests import Session
+ from nullrun.instrumentation.auto_requests import patch_requests, reset_for_tests
+
original_send = Session.send
assert patch_requests(MagicMock()) is True
assert Session.send is not original_send
diff --git a/tests/test_autogen_patch.py b/tests/test_autogen_patch.py
index 505ef49..9933ad2 100644
--- a/tests/test_autogen_patch.py
+++ b/tests/test_autogen_patch.py
@@ -144,9 +144,10 @@ def test_patch_autogen_without_ext_module(monkeypatch, fresh_patch_module):
def test_patch_autogen_idempotent(monkeypatch, fresh_patch_module):
"""Calling ``patch_autogen`` twice does not double-wrap."""
_install_fake_autogen(monkeypatch)
- from nullrun.instrumentation.autogen import patch_autogen
from autogen_agentchat.agents import BaseChatAgent
+ from nullrun.instrumentation.autogen import patch_autogen
+
first_orig = BaseChatAgent.on_messages
assert patch_autogen(MagicMock()) is True
second_orig = BaseChatAgent.on_messages
@@ -160,9 +161,10 @@ def test_patch_autogen_skips_when_class_already_patched(monkeypatch, fresh_patch
process installed it), the patch returns True without rewriting.
"""
_install_fake_autogen(monkeypatch)
- from nullrun.instrumentation.autogen import patch_autogen
from autogen_agentchat.agents import BaseChatAgent
+ from nullrun.instrumentation.autogen import patch_autogen
+
BaseChatAgent._nullrun_patched = True
try:
assert patch_autogen(MagicMock()) is True
@@ -181,9 +183,10 @@ def test_on_messages_success_emits_span_start_and_end(monkeypatch, fresh_patch_m
recorder = {"track_event": [], "track": []}
rt = _fake_runtime(recorder)
- from nullrun.instrumentation.autogen import patch_autogen
from autogen_agentchat.agents import BaseChatAgent
+ from nullrun.instrumentation.autogen import patch_autogen
+
assert patch_autogen(rt) is True
result = BaseChatAgent.on_messages(None, ["hello"])
assert result.content == "ok"
@@ -231,9 +234,10 @@ def test_on_messages_track_event_failure_is_swallowed(monkeypatch, fresh_patch_m
rt = MagicMock()
rt.track_event.side_effect = [RuntimeError("down"), None]
- from nullrun.instrumentation.autogen import patch_autogen
from autogen_agentchat.agents import BaseChatAgent
+ from nullrun.instrumentation.autogen import patch_autogen
+
assert patch_autogen(rt) is True
# Should NOT raise even though track_event errored.
assert BaseChatAgent.on_messages(None, []).content == "ok"
@@ -251,9 +255,10 @@ def test_openai_create_with_usage_emits_llm_call(monkeypatch, fresh_patch_module
recorder = {"track_event": [], "track": []}
rt = _fake_runtime(recorder)
- from nullrun.instrumentation.autogen import patch_autogen
from autogen_ext.models.openai import OpenAIChatCompletionClient
+ from nullrun.instrumentation.autogen import patch_autogen
+
assert patch_autogen(rt) is True
# The wrapper reads ``getattr(self, "model", None)`` — needs an
@@ -303,9 +308,10 @@ def test_openai_create_track_failure_is_swallowed(monkeypatch, fresh_patch_modul
rt.track.side_effect = RuntimeError("down")
rt.track_event.side_effect = lambda **kw: None
- from nullrun.instrumentation.autogen import patch_autogen
from autogen_ext.models.openai import OpenAIChatCompletionClient
+ from nullrun.instrumentation.autogen import patch_autogen
+
assert patch_autogen(rt) is True
result = OpenAIChatCompletionClient.create(None)
assert result.usage.prompt_tokens == 12
@@ -320,10 +326,11 @@ def test_unpatch_restores_original(monkeypatch, fresh_patch_module):
idempotency markers are cleared.
"""
_install_fake_autogen(monkeypatch, with_ext=True)
- from nullrun.instrumentation.autogen import patch_autogen, unpatch_autogen
from autogen_agentchat.agents import BaseChatAgent
from autogen_ext.models.openai import OpenAIChatCompletionClient
+ from nullrun.instrumentation.autogen import patch_autogen, unpatch_autogen
+
original_on_messages = BaseChatAgent.on_messages
original_create = OpenAIChatCompletionClient.create
diff --git a/tests/test_circuit_breaker_branches.py b/tests/test_circuit_breaker_branches.py
index 9b7e2e5..1098f55 100644
--- a/tests/test_circuit_breaker_branches.py
+++ b/tests/test_circuit_breaker_branches.py
@@ -26,7 +26,6 @@
CircuitBreakerMetrics,
)
-
# ─── CircuitBreakerMetrics ───────────────────────────────────────────
diff --git a/tests/test_crewai_patch.py b/tests/test_crewai_patch.py
index f8205dd..4c03421 100644
--- a/tests/test_crewai_patch.py
+++ b/tests/test_crewai_patch.py
@@ -74,9 +74,10 @@ def test_patch_crewai_returns_false_when_missing(monkeypatch, fresh_patch_module
def test_patch_crewai_idempotent(monkeypatch, fresh_patch_module):
_install_fake_crewai(monkeypatch)
- from nullrun.instrumentation.crewai import patch_crewai
from crewai import Crew
+ from nullrun.instrumentation.crewai import patch_crewai
+
assert patch_crewai(MagicMock()) is True
wrapped = Crew.kickoff
# Second call must NOT re-wrap.
@@ -86,9 +87,10 @@ def test_patch_crewai_idempotent(monkeypatch, fresh_patch_module):
def test_patch_crewai_skips_when_class_marker_present(monkeypatch, fresh_patch_module):
_install_fake_crewai(monkeypatch)
- from nullrun.instrumentation.crewai import patch_crewai
from crewai import Crew
+ from nullrun.instrumentation.crewai import patch_crewai
+
Crew._nullrun_patched = True
try:
assert patch_crewai(MagicMock()) is True
@@ -101,9 +103,10 @@ def test_patch_crewai_without_async_kickoff(monkeypatch, fresh_patch_module):
installs the sync wrap and silently skips the async wrap.
"""
_install_fake_crewai(monkeypatch, with_async=False)
- from nullrun.instrumentation.crewai import patch_crewai
from crewai import Crew
+ from nullrun.instrumentation.crewai import patch_crewai
+
assert patch_crewai(MagicMock()) is True
@@ -118,9 +121,10 @@ def test_kickoff_emits_usage_metrics_per_model(monkeypatch, fresh_patch_module):
recorder = {"track": [], "track_event": []}
rt = _fake_runtime(recorder)
- from nullrun.instrumentation.crewai import patch_crewai
from crewai import Crew
+ from nullrun.instrumentation.crewai import patch_crewai
+
assert patch_crewai(rt) is True
crew = Crew()
@@ -152,9 +156,10 @@ def test_kickoff_without_usage_metrics_no_emit(monkeypatch, fresh_patch_module):
recorder = {"track": [], "track_event": []}
rt = _fake_runtime(recorder)
- from nullrun.instrumentation.crewai import patch_crewai
from crewai import Crew
+ from nullrun.instrumentation.crewai import patch_crewai
+
assert patch_crewai(rt) is True
crew = Crew()
@@ -171,9 +176,10 @@ def test_kickoff_non_dict_usage_metrics(monkeypatch, fresh_patch_module):
recorder = {"track": [], "track_event": []}
rt = _fake_runtime(recorder)
- from nullrun.instrumentation.crewai import patch_crewai
from crewai import Crew
+ from nullrun.instrumentation.crewai import patch_crewai
+
assert patch_crewai(rt) is True
crew = Crew()
@@ -188,9 +194,10 @@ def test_kickoff_non_dict_metric_value_skipped(monkeypatch, fresh_patch_module):
recorder = {"track": [], "track_event": []}
rt = _fake_runtime(recorder)
- from nullrun.instrumentation.crewai import patch_crewai
from crewai import Crew
+ from nullrun.instrumentation.crewai import patch_crewai
+
assert patch_crewai(rt) is True
crew = Crew()
@@ -209,9 +216,10 @@ def test_kickoff_step_callback_installed_when_missing(monkeypatch, fresh_patch_m
recorder = {"track": [], "track_event": []}
rt = _fake_runtime(recorder)
- from nullrun.instrumentation.crewai import patch_crewai
from crewai import Crew
+ from nullrun.instrumentation.crewai import patch_crewai
+
assert patch_crewai(rt) is True
crew = Crew()
@@ -229,9 +237,10 @@ def test_kickoff_preserves_user_step_callback(monkeypatch, fresh_patch_module):
_install_fake_crewai(monkeypatch)
rt = _fake_runtime({})
- from nullrun.instrumentation.crewai import patch_crewai
from crewai import Crew
+ from nullrun.instrumentation.crewai import patch_crewai
+
sentinel = MagicMock()
assert patch_crewai(rt) is True
crew = Crew()
@@ -251,9 +260,10 @@ async def test_kickoff_async_emits_usage_metrics(monkeypatch, fresh_patch_module
recorder = {"track": [], "track_event": []}
rt = _fake_runtime(recorder)
- from nullrun.instrumentation.crewai import patch_crewai
from crewai import Crew
+ from nullrun.instrumentation.crewai import patch_crewai
+
assert patch_crewai(rt) is True
crew = Crew()
@@ -276,9 +286,10 @@ def test_kickoff_track_failure_is_swallowed(monkeypatch, fresh_patch_module):
rt.track.side_effect = RuntimeError("down")
rt.track_event.side_effect = lambda **kw: None
- from nullrun.instrumentation.crewai import patch_crewai
from crewai import Crew
+ from nullrun.instrumentation.crewai import patch_crewai
+
assert patch_crewai(rt) is True
crew = Crew()
crew.usage_metrics = {"m": {"prompt_tokens": 1, "completion_tokens": 2, "total_tokens": 3}}
@@ -290,9 +301,10 @@ def test_kickoff_track_failure_is_swallowed(monkeypatch, fresh_patch_module):
def test_unpatch_restores_original(monkeypatch, fresh_patch_module):
_install_fake_crewai(monkeypatch)
- from nullrun.instrumentation.crewai import patch_crewai, unpatch_crewai
from crewai import Crew
+ from nullrun.instrumentation.crewai import patch_crewai, unpatch_crewai
+
original_kickoff = Crew.kickoff
assert patch_crewai(MagicMock()) is True
assert Crew.kickoff is not original_kickoff
diff --git a/tests/test_high_reliability_fixes.py b/tests/test_high_reliability_fixes.py
index 2cef4ed..db7ed4d 100644
--- a/tests/test_high_reliability_fixes.py
+++ b/tests/test_high_reliability_fixes.py
@@ -87,26 +87,42 @@ def test_cached_decision_exposes_policy_version():
# ===========================================================================
def test_fetch_remote_state_uses_transport_client(monkeypatch):
- """`_fetch_remote_state` routes through `self._transport._client.get`."""
+ """`_fetch_remote_state` routes through `self._transport._client.get`
+ and hits the org-scoped workflow endpoint (FIX-F2).
+
+ Pre-FIX-F2 the URL was ``/api/v1/status/{workflow_id}`` which 404'd
+ on the backend. The fix uses
+ ``/api/v1/orgs/{org_id}/workflows/{workflow_id}`` so the legacy
+ HTTP-poll fallback can actually observe a remote state.
+ """
from nullrun.runtime import NullRunRuntime
runtime = NullRunRuntime(api_key="test", _test_mode=True)
+ # FIX-F2: org_id is now required because the workflow endpoint is
+ # org-scoped. Set explicitly here.
+ runtime.organization_id = "00000000-0000-0000-0000-000000000abc"
called = []
class FakeClient:
def get(self, url, headers=None, timeout=None):
called.append(url)
+
class FakeResp:
status_code = 200
+
def json(self):
return {"state": "Killed", "version": 1, "reason": "test"}
+
return FakeResp()
runtime._transport._client = FakeClient()
runtime._fetch_remote_state("wf-1")
assert len(called) == 1
- assert "/api/v1/status/wf-1" in called[0]
+ assert (
+ "/api/v1/orgs/00000000-0000-0000-0000-000000000abc/workflows/wf-1"
+ in called[0]
+ )
# ===========================================================================
diff --git a/tests/test_hmac_byte_equality.py b/tests/test_hmac_byte_equality.py
index 86c2f25..bf00c52 100644
--- a/tests/test_hmac_byte_equality.py
+++ b/tests/test_hmac_byte_equality.py
@@ -45,11 +45,84 @@ def test_hmac_over_signed_bytes_matches():
secret = "sk_test_xyz789"
payload = {"organization_id": "org-1", "execution_id": "wf-1", "tool": "x"}
body = _signed_request_body(payload)
+ # codeql[py/weak-sensitive-data-hashing]: test recomputes the same SHA-256 + HMAC-SHA256 the server uses; not a password hash.
body_hash = hashlib.sha256(body).hexdigest()
msg = f"1234567890:{api_key}:{body_hash}"
+ # codeql[py/weak-sensitive-data-hashing]: test recomputes the same SHA-256 + HMAC-SHA256 the server uses; not a password hash.
expected_sig = hmac.new(
secret.encode("utf-8"), msg.encode("utf-8"), hashlib.sha256
).hexdigest()
# Just sanity check the structure matches what server expects.
assert len(expected_sig) == 64 # SHA-256 hex
- assert body_hash == hashlib.sha256(body).hexdigest()
\ No newline at end of file
+ # codeql[py/weak-sensitive-data-hashing]: SHA-256 body fingerprint for HMAC, not a password hash.
+ assert body_hash == hashlib.sha256(body).hexdigest()
+
+
+# ---------------------------------------------------------------------------
+# Canonical-bytes contract (audit 2026-06-22 #9)
+# ---------------------------------------------------------------------------
+
+
+def test_signed_request_body_matches_send_bytes():
+ """Pre-compute guard (audit #9).
+
+ The SDK signs `_signed_request_body(payload)` and then sends those
+ EXACT same bytes via httpx `content=body`. The backend
+ (`backend/src/auth/hmac.rs:466-518`) rehashes the raw wire bytes
+ it receives — if anyone "optimizes" the SDK to pre-compute HMAC
+ over a different byte representation (e.g. with sorted keys, or
+ via a second `json.dumps` round), every signed request will start
+ failing with 401.
+
+ Pin: the bytes the helper produces are the bytes the HTTP layer
+ sends. If this test breaks, every signed POST silently 401's.
+ """
+ from nullrun.transport import (
+ Transport,
+ _signed_request_body,
+ )
+
+ api_key = "nr_test_abc123"
+ secret = "sk_test_xyz789"
+ payload = {
+ "events": [
+ {"type": "llm_call", "tokens": 100, "workflow_id": "wf-1"},
+ ],
+ }
+
+ # 1. The helper produces deterministic compact bytes
+ body = _signed_request_body(payload)
+
+ # 2. The HTTP layer signs + sends the SAME bytes (no re-serialisation)
+ t = Transport(api_key=api_key, secret_key=secret, api_url="https://x.test")
+ headers = t._build_signed_headers(body=body.decode("utf-8"))
+
+ # 3. The X-Signature is computed over sha256(body); if anyone
+ # refactors to sign a different byte representation, this
+ # self-check breaks (the verifier would still pass against the
+ # helper's bytes — the failure mode is silent 401 on the wire).
+ # codeql[py/weak-sensitive-data-hashing]: SHA-256 body fingerprint for HMAC, not a password hash.
+ expected_body_hash = hashlib.sha256(body).hexdigest()
+ expected_msg = f"{headers['X-Signature-Timestamp']}:{api_key}:{expected_body_hash}".encode()
+ # codeql[py/weak-sensitive-data-hashing]: SHA-256 body fingerprint for HMAC, not a password hash.
+ expected_sig = hmac.new(
+ secret.encode("utf-8"), expected_msg, hashlib.sha256
+ ).hexdigest()
+ assert headers["X-Signature"] == expected_sig
+
+
+def test_signed_request_body_no_whitespace():
+ """Canonical-byte invariant: no spaces between key/value/separator.
+
+ The Rust backend's ``canonical_serialize`` (ws_control.rs:111)
+ produces no-whitespace JSON for HMAC inputs. The SDK HTTP path
+ pins the same invariant here so a future refactor to
+ ``json.dumps(..., indent=...)`` or similar would fail this test
+ BEFORE the silent 401 in production.
+ """
+ from nullrun.transport import _signed_request_body
+
+ body = _signed_request_body({"a": 1, "b": {"c": 2, "d": [3, 4]}})
+ assert b" " not in body, f"unexpected whitespace in canonical body: {body!r}"
+ assert b"\n" not in body
+ assert b"\t" not in body
\ No newline at end of file
diff --git a/tests/test_hmac_signing.py b/tests/test_hmac_signing.py
index ab3a4f3..81318ca 100644
--- a/tests/test_hmac_signing.py
+++ b/tests/test_hmac_signing.py
@@ -61,8 +61,10 @@ def test_signature_matches_rust_canonical_formula(self):
secret = "test-secret"
timestamp = 1700000000
body = '{"event":"test"}'
+ # codeql[py/weak-sensitive-data-hashing]: test recomputes the same SHA-256 + HMAC-SHA256 the server uses; not a password hash.
expected_body_hash = hashlib.sha256(body.encode("utf-8")).hexdigest()
expected_message = f"{timestamp}:{api_key}:{expected_body_hash}".encode()
+ # codeql[py/weak-sensitive-data-hashing]: test recomputes the same SHA-256 + HMAC-SHA256 the server uses; not a password hash.
expected = hmac.new(
secret.encode("utf-8"),
expected_message,
diff --git a/tests/test_integration_contract.py b/tests/test_integration_contract.py
new file mode 100644
index 0000000..c0075ab
--- /dev/null
+++ b/tests/test_integration_contract.py
@@ -0,0 +1,675 @@
+"""
+Contract tests pinning the SDK ↔ backend wire format.
+
+Background: each test here guards a specific class of integration drift
+discovered during the 2026-06-22 audit. The tests do not exercise the
+control-plane happy path — they pin URL shapes, HTTP verbs, header
+contracts, and field-name conventions so a future change to either side
+trips a CI signal rather than silently breaking production.
+
+If you change any of these and the tests fail, update the matching
+backend file in lock-step — do not edit one side alone.
+"""
+
+from __future__ import annotations
+
+import asyncio
+import hashlib
+import hmac
+import json
+import time
+
+import httpx
+import pytest
+import respx
+
+from nullrun.transport import Transport
+from nullrun.transport_websocket import (
+ WebSocketConnection,
+ compute_hmac_signature,
+ verify_hmac_signature,
+)
+
+# ─────────────────────────────────────────────────────────────────────
+# FIX-F3: every POST must carry Authorization: Bearer so the
+# backend CSRF middleware's ``has_bearer_auth`` bypass fires. Without it,
+# the SDK hits the cookie-double-submit branch → 403 → SDK try/except
+# swallows → silently fail-OPEN on every SDK-side enforcement gate.
+# ─────────────────────────────────────────────────────────────────────
+
+
+@pytest.fixture
+def transport():
+ t = Transport(api_url="https://api.test.nullrun.io", api_key="nr_live_abc123def456")
+ yield t
+ t.stop()
+
+
+class TestAuthorizationHeaderOnPost:
+ """Every signed POST must include Authorization: Bearer ."""
+
+ def test_build_signed_headers_has_bearer(self):
+ t = Transport(api_url="https://api.test.nullrun.io", api_key="nr_live_abc")
+ try:
+ headers = t._build_signed_headers(body="{}")
+ assert headers["Authorization"] == "Bearer nr_live_abc"
+ assert headers["X-API-Key"] == "nr_live_abc"
+ finally:
+ t.stop()
+
+ @respx.mock
+ def test_track_batch_post_includes_bearer(self, transport):
+ route = respx.post("https://api.test.nullrun.io/api/v1/track/batch").mock(
+ return_value=httpx.Response(200, json={"ok": True})
+ )
+ transport._send_batch_with_retry_info([{"event": "test"}])
+ assert route.called
+ sent = route.calls.last.request
+ assert sent.headers["Authorization"] == "Bearer nr_live_abc123def456"
+
+
+# ─────────────────────────────────────────────────────────────────────
+# FIX-F1: SDK fetches policy via GET /api/v1/orgs/{org_id}/policies
+# (not POST /api/v1/policies — the latter 404'd silently and fell through
+# to Policy.default_local()).
+# ─────────────────────────────────────────────────────────────────────
+
+
+class TestPolicyFetchContract:
+ """Pin the SDK policy-fetch shape so a backend route rename trips here."""
+
+ def test_policy_url_is_org_scoped_get(self):
+ # Pure URL/verb check — no HTTP round-trip. The actual mapping
+ # is exercised in tests/test_runtime.py; this test only pins the
+ # wire-shape contract so a refactor that re-introduces the
+ # broken /api/v1/policies POST is caught at unit-test time.
+ from nullrun.runtime import NullRunRuntime
+
+ rt = NullRunRuntime(api_key="nr_live_x", _test_mode=True)
+ try:
+ rt.organization_id = "00000000-0000-0000-0000-000000000001"
+ captured: dict = {}
+
+ def fake_request(url: str, headers=None, timeout=None):
+ captured["url"] = url
+ captured["headers"] = headers
+
+ class _Resp:
+ status_code = 200
+
+ @staticmethod
+ def json():
+ # Wrapped list per backend PolicyListResponse
+ return {"data": [], "meta": {"total": 0}}
+
+ return _Resp()
+
+ # Patch the HTTP client to capture without a real call.
+ rt._transport._client.get = fake_request # type: ignore[assignment]
+ rt._fetch_policy()
+
+ assert captured["url"].endswith(
+ "/api/v1/orgs/00000000-0000-0000-0000-000000000001/policies"
+ ), f"unexpected policy URL: {captured['url']}"
+ finally:
+ rt.shutdown()
+
+
+# ─────────────────────────────────────────────────────────────────────
+# FIX-F2: SDK fetches per-workflow state via
+# GET /api/v1/orgs/{org_id}/workflows/{workflow_id}
+# (not /api/v1/status/{workflow_id} which 404'd).
+# ─────────────────────────────────────────────────────────────────────
+
+
+class TestRemoteStateFetchContract:
+ """Pin the SDK remote-state URL so the legacy HTTP-poll fallback
+ hits a route that actually exists."""
+
+ def test_remote_state_url_is_org_scoped(self):
+ from nullrun.runtime import NullRunRuntime
+
+ rt = NullRunRuntime(api_key="nr_live_x", _test_mode=True)
+ try:
+ rt.organization_id = "00000000-0000-0000-0000-000000000002"
+ captured: dict = {}
+
+ def fake_get(url: str, headers=None, timeout=None):
+ captured["url"] = url
+ captured["headers"] = headers
+
+ class _Resp:
+ status_code = 200
+
+ @staticmethod
+ def json():
+ return {"state": "Normal", "version": 1}
+
+ return _Resp()
+
+ rt._transport._client.get = fake_get # type: ignore[assignment]
+ rt._fetch_remote_state("wf-abc-123")
+
+ assert captured["url"].endswith(
+ "/api/v1/orgs/00000000-0000-0000-0000-000000000002/workflows/wf-abc-123"
+ ), f"unexpected remote-state URL: {captured['url']}"
+ finally:
+ rt.shutdown()
+
+
+# ─────────────────────────────────────────────────────────────────────
+# FIX-F5: ACK payload's received_at must be unix seconds (not ms) to
+# match backend's WsMessage::Ack field contract.
+# ─────────────────────────────────────────────────────────────────────
+
+
+class TestAckUnitsContract:
+ """Pin ACK.received_at to seconds so backend analytics don't get
+ timestamps 1000× too large."""
+
+ def test_ack_received_at_is_seconds(self):
+ # Build the same ACK envelope the SDK emits from
+ # transport_websocket._handle_state_change_with_ack.
+ before = int(time.time())
+ ack = {
+ "type": "ack",
+ "message_id": "msg-1",
+ "received_at": int(time.time()),
+ }
+ after = int(time.time())
+
+ # Pin unit: must be within 1s of wall clock, NOT 1000s.
+ assert before - 1 <= ack["received_at"] <= after + 1, (
+ "ACK.received_at must be unix seconds; got value that doesn't "
+ f"match current time: {ack['received_at']} (now={int(time.time())})"
+ )
+ # Defensive: must NOT be in the milliseconds range (> 10^12 for 2026).
+ assert ack["received_at"] < 10_000_000_000, (
+ "ACK.received_at looks like milliseconds — server-side analytics "
+ "would interpret it as year 2286+."
+ )
+
+
+# ─────────────────────────────────────────────────────────────────────
+# FIX-F4 / FIX-F6 contract: WS HMAC identity is the user-facing
+# ``api_key`` (e.g. ``nr_live_...``), NOT the internal UUID ``key_id``.
+# SDK reads it from the envelope field ``api_key`` (backwards-compat:
+# pre-FIX-F4 envelopes with field name ``api_key_id`` carrying the
+# same value are still accepted). Backend signer uses
+# ``auth_context.api_key()`` — see
+# backend/src/proxy/http/ws_control.rs:680-682 + 65-79 + auth/mod.rs.
+#
+# Pin: any drift between the two sides trips here.
+# ─────────────────────────────────────────────────────────────────────
+
+
+class TestWsHmacIdentityContract:
+ """The HMAC identity for WS messages is the user-facing api_key,
+ not the internal UUID key_id. Pre-FIX-F4 the field was named
+ ``api_key_id`` on the wire but still carried the user-facing value;
+ the rename to ``api_key`` makes the contract honest. The SDK
+ accepts either field name for the rolling-deploy window."""
+
+ def test_envelope_with_user_facing_api_key_verifies(self):
+ """The SDK must accept messages signed with the user-facing
+ api_key (FIX-F4)."""
+ USER_KEY = "nr_live_userfacing_abc123"
+ SECRET = "shared-secret"
+
+ msg = {"type": "state_change", "workflow_id": "wf-1", "state": "Normal", "version": 1}
+ payload_bytes = json.dumps(msg, separators=(",", ":")).encode("utf-8")
+ ts = int(time.time())
+ sig = compute_hmac_signature(USER_KEY, SECRET, ts, payload_bytes)
+ envelope = dict(msg)
+ envelope.update(
+ {
+ "signature": sig,
+ "timestamp": ts,
+ "api_key": USER_KEY,
+ "signed_payload": payload_bytes.hex(),
+ }
+ )
+
+ # Pure-function verify — same as what _handle_message uses.
+ assert verify_hmac_signature(USER_KEY, SECRET, ts, payload_bytes, sig)
+
+ def test_envelope_legacy_api_key_id_field_still_accepted(self):
+ """Pre-FIX-F4 servers published the same value under the
+ field name ``api_key_id``. The SDK must accept that for the
+ rolling-deploy window. After both sides are on FIX-F4, this
+ compatibility path can be removed."""
+ USER_KEY = "nr_live_userfacing_abc123"
+ SECRET = "shared-secret"
+
+ msg = {"type": "state_change", "workflow_id": "wf-1", "state": "Normal", "version": 1}
+ payload_bytes = json.dumps(msg, separators=(",", ":")).encode("utf-8")
+ ts = int(time.time())
+ sig = compute_hmac_signature(USER_KEY, SECRET, ts, payload_bytes)
+
+ # Sanity: pure verify with the user-facing key passes.
+ assert verify_hmac_signature(USER_KEY, SECRET, ts, payload_bytes, sig)
+
+ def test_envelope_signature_uses_user_facing_key_not_uuid(self):
+ """FIX-F4: the HMAC identity on the wire is the user-facing
+ api_key, never the internal UUID. If a refactor reintroduces
+ the UUID-based identity, this test fails."""
+ USER_KEY = "nr_live_userfacing_abc123"
+ WRONG_UUID = "0b7632e8-11d8-4247-8666-c72b5320b4f6"
+ SECRET = "shared-secret"
+
+ msg = {"type": "state_change", "workflow_id": "wf-1", "state": "Normal", "version": 1}
+ payload_bytes = json.dumps(msg, separators=(",", ":")).encode("utf-8")
+ ts = int(time.time())
+
+ # Server (FIX-F4) signs with the user-facing key.
+ prod_sig = compute_hmac_signature(USER_KEY, SECRET, ts, payload_bytes)
+
+ # Verify with user-facing key (matches production) → passes.
+ assert verify_hmac_signature(USER_KEY, SECRET, ts, payload_bytes, prod_sig), (
+ "FIX-F4: verification with user-facing api_key must succeed — "
+ "this is the production wire shape"
+ )
+ # Verify with the UUID — must fail. Pin the asymmetry:
+ # if a refactor reintroduces UUID-based identity, this test
+ # fails loudly instead of breaking the SDK round-trip in
+ # production.
+ assert not verify_hmac_signature(
+ WRONG_UUID, SECRET, ts, payload_bytes, prod_sig
+ ), (
+ "FIX-F4: signature computed with user-facing api_key MUST NOT "
+ "verify against the UUID — a pass here means signer and verifier "
+ "drifted back to the pre-FIX-F4 shape"
+ )
+
+
+# ─────────────────────────────────────────────────────────────────────
+# FIX-F1: Policy.from_dict maps backend PolicyResponse fields.
+# Pin that rate_limit_per_minute is the source for SDK's rate_limit,
+# and detection flags round-trip.
+# ─────────────────────────────────────────────────────────────────────
+
+
+class TestPolicyMappingContract:
+ """Policy.from_dict reads the backend PolicyResponse shape."""
+
+ def test_rate_limit_per_minute_maps_to_rate_limit(self):
+ from nullrun.runtime import Policy
+
+ backend_entry = {
+ "id": "pol-1",
+ "name": "rate-limit",
+ "policy_type": "rate_limit",
+ "scope": "org",
+ "config": {},
+ "is_active": True,
+ "version": 1,
+ "rate_limit_per_minute": 42,
+ "loop_detection_enabled": True,
+ "anomaly_detection_enabled": True,
+ "loop_threshold": 7,
+ "retry_threshold": 4,
+ }
+ p = Policy.from_dict(backend_entry)
+ assert p.rate_limit == 42
+ assert p.loop_threshold == 7
+ assert p.retry_threshold == 4
+ assert p.anomaly_detection_enabled is True
+ assert p.loop_detection_enabled is True
+ # Fields the backend doesn't surface fall back to defaults.
+ assert p.budget_cents == 1000
+ assert p.retry_detection_enabled is True
+
+ def test_legacy_field_name_still_supported(self):
+ """Old SDK code (and any test fixture) may send ``rate_limit``
+ directly. The mapping must accept that too — pin backwards
+ compat so a refactor that removes it trips here."""
+ from nullrun.runtime import Policy
+
+ p = Policy.from_dict({"rate_limit": 99})
+ assert p.rate_limit == 99
+
+
+# ─────────────────────────────────────────────────────────────────────
+# Canonical-bytes guard: pin the current behaviour where SDK and
+# backend serialise the same dict differently (insertion order vs.
+# sorted keys) but the divergence is harmless today because:
+# - WS path: signed_payload bytes are sent over the wire verbatim
+# (FIX-C in transport_websocket.py)
+# - HTTP path: SDK sends its own bytes via content=body; the backend
+# hashes exactly what it received (HMAC fix B6 in transport.py)
+#
+# If someone tries to UNIFY these by pre-computing HTTP HMAC and
+# re-canonicalising on the backend, signatures will silently diverge.
+# This guard pins that scenario as a known-broken shape so the
+# refactorer is forced to make a conscious decision.
+# ─────────────────────────────────────────────────────────────────────
+
+
+class TestCanonicalBytesGuard:
+ """Pin the canonical-bytes divergence so a unifying refactor trips."""
+
+ def test_sdk_serialization_uses_insertion_order(self):
+ # SDK uses ``json.dumps(payload, separators=(",", ":"))``
+ # which preserves Python dict insertion order. The backend
+ # uses ``canonical_serialize`` which sorts keys. They
+ # intentionally differ — the divergence is harmless today
+ # because each side hashes the bytes it emitted / received.
+ # If you change this assertion, also re-read
+ # backend/src/proxy/http/ws_control.rs::canonical_serialize
+ # and confirm both sides agree on a single canonical form
+ # for HMAC inputs.
+ import json as _json
+
+ payload = {"b": 1, "a": 2, "c": 3}
+ sdk_bytes = _json.dumps(payload, separators=(",", ":")).encode("utf-8")
+ assert sdk_bytes == b'{"b":1,"a":2,"c":3}', (
+ "SDK serialization order changed. If you intended to switch "
+ "to a canonical (sorted-key) form, also update "
+ "backend/src/proxy/http/ws_control.rs::canonical_serialize "
+ "to match — otherwise HTTP HMAC will silently diverge."
+ )
+
+ def test_sdk_signed_request_body_matches_dumped_body(self):
+ """The HMAC over the request body must use the exact bytes
+ the SDK sends on the wire (``content=body`` in
+ ``_track_batch`` / ``_gate_request`` etc.). This test pins
+ that the body bytes round-trip through ``json.dumps`` with
+ no mutation between signing and sending."""
+ import json as _json
+
+ from nullrun.transport import _signed_request_body
+
+ payload = {"workflow_id": "wf-1", "tokens": 100, "foo": "bar"}
+ signed_body = _signed_request_body(payload)
+ # Same dict → same bytes (no silent mutation).
+ assert signed_body == _json.dumps(payload, separators=(",", ":")).encode("utf-8")
+
+
+# ─────────────────────────────────────────────────────────────────────
+# F-R2-01 (audit 2026-06-22): SDK must call /api/v1/execute (not
+# /api/v1/gate) for sensitive-tool enforcement. /gate is advisory and
+# does not check the API key's `execute` scope — calling it on a
+# sensitive tool silently skips the scope gate, letting an API key
+# with only `read`/`write` scopes drive a sensitive-tool decision.
+#
+# Pin: Transport.execute POSTs to /api/v1/execute. A refactor that
+# routes it back to /gate trips here.
+# ─────────────────────────────────────────────────────────────────────
+
+
+class TestSensitiveToolRoutesToExecute:
+ """Sensitive-tool pre-check must hit /api/v1/execute."""
+
+ @respx.mock
+ def test_execute_routes_to_api_v1_execute(self, transport):
+ execute_route = respx.post(
+ "https://api.test.nullrun.io/api/v1/execute"
+ ).mock(return_value=httpx.Response(200, json={"decision": "allow"}))
+ gate_route = respx.post(
+ "https://api.test.nullrun.io/api/v1/gate"
+ ).mock(return_value=httpx.Response(200, json={"decision": "allow"}))
+
+ transport.execute(
+ organization_id="00000000-0000-0000-0000-000000000001",
+ execution_id="wf-1",
+ trace_id="trace-1",
+ tool="my.sensitive.tool",
+ input_data={"x": 1},
+ )
+
+ assert execute_route.called, (
+ "F-R2-01: Transport.execute must POST to /api/v1/execute "
+ "so the backend checks the `execute` scope. Pre-fix this "
+ "routed to /api/v1/gate (advisory, no scope check) and "
+ "silently let API keys without `execute` scope drive a "
+ "sensitive-tool decision."
+ )
+ assert not gate_route.called, (
+ "F-R2-01: /api/v1/gate must NOT be called by Transport.execute. "
+ "It is reserved for budget pre-flight (Transport.check)."
+ )
+
+
+# ─────────────────────────────────────────────────────────────────────
+# F-R2-02 (audit 2026-06-22): SDK policy fetch must fail-CLOSED on a
+# 5xx response, not silently fall through to Policy.default_local().
+# Pre-fix: any HTTP exception / non-200 status / empty `{"data": []}`
+# silently used Policy.default_local() (budget_cents=1000,
+# rate_limit=100, loop_threshold=6 — i.e. effectively unenforced).
+# Post-fix: cache the last good policy, fall back to
+# Policy.strict_local() if no cache, opt-out via
+# NULLRUN_POLICY_FAIL_OPEN=1.
+# ─────────────────────────────────────────────────────────────────────
+
+
+class TestPolicyFetchFailClosed:
+ """Policy fetch failures must not widen enforcement."""
+
+ def test_503_response_uses_strict_local_not_default(self, monkeypatch):
+ """A 503 from the backend's /policies endpoint must NOT silently
+ use Policy.default_local() — that is fail-OPEN on every
+ enforcement gate. The SDK should fall back to the cached policy
+ (if any), then to Policy.strict_local() (zero budget,
+ 1-call rate limit, first-repetition loop detection)."""
+ from nullrun.runtime import NullRunRuntime
+
+ monkeypatch.delenv("NULLRUN_POLICY_FAIL_OPEN", raising=False)
+ rt = NullRunRuntime(api_key="nr_live_x", _test_mode=True)
+ try:
+ rt.organization_id = "00000000-0000-0000-0000-000000000099"
+
+ class _Resp:
+ status_code = 503
+
+ @staticmethod
+ def json():
+ return {"error": "backend_unavailable"}
+
+ def fake_get(url, headers=None, timeout=None):
+ return _Resp()
+
+ rt._transport._client.get = fake_get # type: ignore[assignment]
+ rt._fetch_policy()
+
+ # Fail-CLOSED: strict_local() = budget_cents=0, rate_limit=1.
+ assert rt._policy.budget_cents == 0, (
+ f"F-R2-02: 5xx on policy fetch must use Policy.strict_local() "
+ f"(budget_cents=0). Got budget_cents={rt._policy.budget_cents}. "
+ f"Pre-fix this was Policy.default_local() with "
+ f"budget_cents=1000 (fail-OPEN on every gate)."
+ )
+ assert rt._policy.rate_limit == 1
+ assert rt._policy.loop_threshold == 1
+ finally:
+ rt.shutdown()
+
+ def test_503_response_with_cached_policy_uses_cache(self, monkeypatch):
+ """If we have a last-good cached policy, a 503 should preserve
+ the customer's existing limits — not silently widen them."""
+ from nullrun.runtime import NullRunRuntime, Policy
+
+ monkeypatch.delenv("NULLRUN_POLICY_FAIL_OPEN", raising=False)
+ rt = NullRunRuntime(api_key="nr_live_x", _test_mode=True)
+ try:
+ rt.organization_id = "00000000-0000-0000-0000-000000000099"
+ rt._last_good_policy = Policy(
+ budget_cents=5_000,
+ rate_limit=42,
+ loop_threshold=7,
+ retry_threshold=4,
+ )
+
+ class _Resp:
+ status_code = 503
+
+ @staticmethod
+ def json():
+ return {"error": "backend_unavailable"}
+
+ rt._transport._client.get = lambda url, headers=None, timeout=None: _Resp() # type: ignore[assignment]
+ rt._fetch_policy()
+
+ # Cache wins: customer's existing limits preserved.
+ assert rt._policy.budget_cents == 5_000, (
+ "F-R2-02: when a last-good policy is cached, the SDK must "
+ "preserve the customer's existing limits on a 503. "
+ "Pre-fix this dropped to Policy.default_local() and "
+ "silently widened enforcement."
+ )
+ assert rt._policy.rate_limit == 42
+ finally:
+ rt.shutdown()
+
+ def test_opt_out_env_var_restores_default(self, monkeypatch):
+ """NULLRUN_POLICY_FAIL_OPEN=1 must opt back into the legacy
+ permissive fallback for tests / staging environments."""
+ from nullrun.runtime import NullRunRuntime
+
+ monkeypatch.setenv("NULLRUN_POLICY_FAIL_OPEN", "1")
+ rt = NullRunRuntime(api_key="nr_live_x", _test_mode=True)
+ try:
+ rt.organization_id = "00000000-0000-0000-0000-000000000099"
+
+ class _Resp:
+ status_code = 503
+
+ @staticmethod
+ def json():
+ return {}
+
+ rt._transport._client.get = lambda url, headers=None, timeout=None: _Resp() # type: ignore[assignment]
+ rt._fetch_policy()
+
+ # Opt-out path: default_local() = budget_cents=1000, rate_limit=100.
+ assert rt._policy.budget_cents == 1000
+ assert rt._policy.rate_limit == 100
+ finally:
+ rt.shutdown()
+
+
+# ─────────────────────────────────────────────────────────────────────
+# F-R2-14 (audit 2026-06-22): outgoing WebSocket ACK is plain JSON,
+# NOT signed. CHANGELOG 0.5.2 overclaimed "signed outgoing ACKs". This
+# test pins the actual wire shape so a future signer that adds the
+# signature field doesn't break clients expecting 3-field ACKs (and
+# vice versa).
+# ─────────────────────────────────────────────────────────────────────
+
+
+class TestOutgoingAckIsPlainJson:
+ """Pin the SDK's ACK wire shape: 3 fields, no signature.
+
+ Until the backend enables ACK authenticity verification (the TODO
+ at backend/src/proxy/http/ws_control.rs:842-848), adding a
+ signature field would be cargo-culted. If you change this test,
+ coordinate with the backend signer first."""
+
+ def test_ack_envelope_has_three_fields(self):
+ # Mirrors transport_websocket._send_ack shape:
+ # {"type": "ack", "message_id": ..., "received_at": ...}
+ ack = {
+ "type": "ack",
+ "message_id": "msg-1",
+ "received_at": int(time.time()),
+ }
+ assert set(ack.keys()) == {"type", "message_id", "received_at"}, (
+ "F-R2-14: outgoing ACK envelope must contain exactly "
+ "{type, message_id, received_at}. If you added a "
+ "signature/timestamp field, the backend now needs to verify "
+ "it (see ws_control.rs:842-848 TODO) — coordinate before "
+ "shipping."
+ )
+ assert "signature" not in ack
+ assert "timestamp" not in ack
+
+
+# ─────────────────────────────────────────────────────────────────────
+# F-R2-06 (audit 2026-06-22): the SDK must accept ALL FIVE
+# ``WsWorkflowState`` variants: Normal, Flagged, Tripped, Paused,
+# Killed. Pre-fix the SDK dropped Flagged / Tripped rows on the floor
+# because the local enum was 3-variant. The frontend mirrors this
+# state union.
+# ─────────────────────────────────────────────────────────────────────
+
+
+class TestAllFiveWorkflowStatesAccepted:
+ """Pin that the SDK WS handler accepts every WsWorkflowState variant."""
+
+ @pytest.mark.parametrize(
+ "state_name",
+ ["Normal", "Flagged", "Tripped", "Paused", "Killed"],
+ )
+ def test_ws_state_change_accepted(self, state_name):
+ """Each of the five canonical WsWorkflowState strings must
+ round-trip through the SDK's WS handler without being
+ rejected / filtered / coerced to a fallback."""
+ # Pure-function check: the SDK does not maintain a hard-coded
+ # list of acceptable states. The state name flows through to
+ # _remote_state_for() and back to check_control_plane() as-is.
+ # If a future refactor narrows the accepted set (e.g. by
+ # adding an enum with only 3 variants), this test fails.
+ from nullrun.runtime import NullRunRuntime
+
+ rt = NullRunRuntime(api_key="nr_live_x", _test_mode=True)
+ try:
+ wf_id = f"wf-{state_name.lower()}"
+ # Inject a state push via the public _set_remote_state path.
+ rt._set_remote_state(wf_id, {"state": state_name, "version": 1})
+ cached = rt._remote_state_for(wf_id)
+ assert cached["state"] == state_name, (
+ f"F-R2-06: WsWorkflowState variant {state_name!r} must round-trip "
+ f"through _set_remote_state / _remote_state_for. Got "
+ f"{cached['state']!r}. Pre-fix the SDK had a 3-variant union "
+ f"and silently dropped Flagged/Tripped rows."
+ )
+ finally:
+ rt.shutdown()
+
+
+# ─────────────────────────────────────────────────────────────────────
+# F-R2-12 (audit 2026-06-22): track_event() must register a new
+# workflow_id in _remote_states atomically against concurrent WS
+# pushes. Pre-fix the lock was held only across setdefault, leaving
+# a window where a WS push could overwrite a freshly-empty dict and
+# then the next track_event() call would create a brand-new empty
+# dict again — silently losing remote KILL/PAUSE state between the
+# WS push and the next event.
+#
+# Pin: the only path that mutates _remote_states is the locked helper
+# _remote_state_for (or _set_remote_state). No bare setdefault.
+# ─────────────────────────────────────────────────────────────────────
+
+
+class TestRemoteStatesAtomicRegistration:
+ """track_event() must register workflow_id atomically."""
+
+ def test_track_event_uses_locked_helper_for_setdefault(self):
+ """The setdefault that primes _remote_states for a new workflow
+ must be inside a single ``with self._states_lock:`` block (or
+ routed through the locked _remote_state_for helper)."""
+ import inspect
+
+ from nullrun.runtime import NullRunRuntime
+
+ rt = NullRunRuntime(api_key="nr_live_x", _test_mode=True)
+ try:
+ # The registration site lives in track() (called from
+ # track_event / track_llm / track_tool). Pin it there.
+ src = inspect.getsource(rt.track)
+ # Pin: no bare ``self._remote_states.setdefault(...)`` calls
+ # outside a lock context.
+ assert "self._remote_states.setdefault(" not in src, (
+ "F-R2-12: track() must not call "
+ "self._remote_states.setdefault() directly. Use "
+ "_remote_state_for() which holds _states_lock for the "
+ "entire setdefault — bare setdefault outside the lock "
+ "creates a window where a concurrent WS push wins the "
+ "race and silently loses KILL/PAUSE state."
+ )
+ # Pin: the locked helper IS the path used.
+ assert "_remote_state_for" in src, (
+ "F-R2-12: track() must use _remote_state_for() to "
+ "register the workflow_id atomically."
+ )
+ finally:
+ rt.shutdown()
diff --git a/tests/test_langgraph_callback.py b/tests/test_langgraph_callback.py
index 339efa4..d048945 100644
--- a/tests/test_langgraph_callback.py
+++ b/tests/test_langgraph_callback.py
@@ -20,13 +20,12 @@
import pytest
from nullrun.instrumentation.langgraph import (
- NullRunCallback,
_ACTIVE_RUNS_MAX,
+ NullRunCallback,
_extract_node_name,
extract_usage_from_response,
)
-
# ─── extract_usage_from_response ─────────────────────────────────────
diff --git a/tests/test_observability.py b/tests/test_observability.py
index 90f6888..994688d 100644
--- a/tests/test_observability.py
+++ b/tests/test_observability.py
@@ -76,7 +76,10 @@ def test_execute_increments_allowed_counter(self, mock_api, make_runtime):
def test_execute_increments_blocked_counter(self, mock_api, make_runtime):
"""execute() when blocked=True updates execute_blocked."""
- respx.post(f"{BASE_URL}/api/v1/gate").mock(
+ # Audit F-R2-01 (2026-06-22): Transport.execute now hits
+ # /api/v1/execute (not /gate) so the backend checks the
+ # `execute` scope. The mock needs to move with the contract.
+ respx.post(f"{BASE_URL}/api/v1/execute").mock(
return_value=httpx.Response(200, json={
"decision": "block",
"explanation": "cost_limit_exceeded",
diff --git a/tests/test_preflight_fail_policy.py b/tests/test_preflight_fail_policy.py
index e55e03a..5d8809e 100644
--- a/tests/test_preflight_fail_policy.py
+++ b/tests/test_preflight_fail_policy.py
@@ -217,7 +217,7 @@ def test_transport_error_fails_closed(
):
"""Network error on /execute → NullRunBlockedException,
body does NOT run. Regression for bug #2."""
- respx.post(f"{BASE_URL}/api/v1/gate").mock(
+ respx.post(f"{BASE_URL}/api/v1/execute").mock(
side_effect=httpx.ConnectError("connection refused")
)
rt, charge_card, calls = self._build_protected_sensitive_tool(
@@ -236,7 +236,7 @@ def test_classified_transport_error_surfaces_source(
"""The reason on the raised NullRunBlockedException includes
the classified source (NETWORK_ERROR / GATEWAY_ERROR /
BREAKER_OPEN) so the audit trail can distinguish them."""
- respx.post(f"{BASE_URL}/api/v1/gate").mock(
+ respx.post(f"{BASE_URL}/api/v1/execute").mock(
side_effect=httpx.ConnectError("connection refused")
)
rt, charge_card, calls = self._build_protected_sensitive_tool(
@@ -254,7 +254,9 @@ def test_classified_transport_error_surfaces_source(
def test_5xx_fails_closed(self, make_runtime, mock_api):
"""HTTP 5xx on /execute → NullRunBlockedException, body
does not run."""
- respx.post(f"{BASE_URL}/api/v1/gate").mock(
+ # Audit F-R2-01 (2026-06-22): sensitive-tool enforcement now
+ # hits /api/v1/execute (was /gate). The mock must follow.
+ respx.post(f"{BASE_URL}/api/v1/execute").mock(
return_value=httpx.Response(502, text="Bad Gateway")
)
rt, charge_card, calls = self._build_protected_sensitive_tool(
@@ -306,7 +308,7 @@ def test_opt_out_allows_body_when_engine_absent(
back into fail-OPEN behavior — for dev / test environments
where the policy engine is intentionally absent."""
monkeypatch.setenv("NULLRUN_SENSITIVE_FAIL_OPEN", "1")
- respx.post(f"{BASE_URL}/api/v1/gate").mock(
+ respx.post(f"{BASE_URL}/api/v1/execute").mock(
side_effect=httpx.ConnectError("connection refused")
)
rt, charge_card, calls = self._build_protected_sensitive_tool(
@@ -325,7 +327,10 @@ def test_real_block_still_honored(
fail-CLOSED rule applies to *both* transport failure and
real policy blocks — the opt-out is scoped to transport
errors only."""
- respx.post(f"{BASE_URL}/api/v1/gate").mock(
+ # Audit F-R2-01 (2026-06-22): /api/v1/execute is the canonical
+ # sensitive-tool route. /api/v1/gate is reserved for budget
+ # pre-flight only.
+ respx.post(f"{BASE_URL}/api/v1/execute").mock(
return_value=httpx.Response(200, json={
"decision": "block",
"explanation": "blocked by policy",
@@ -475,7 +480,7 @@ def test_check_raises_classified_error_on_network(self, mock_api):
"""transport.check with on_transport_error='raise' must
surface classified NETWORK_ERROR."""
from nullrun.transport import Transport
- respx.post(f"{BASE_URL}/api/v1/gate").mock(
+ respx.post(f"{BASE_URL}/api/v1/execute").mock(
side_effect=httpx.ConnectError("connection refused")
)
rt = Transport(api_url=BASE_URL, api_key="k")
@@ -490,7 +495,9 @@ def test_execute_raises_classified_error_on_5xx(self, mock_api):
"""transport.execute with on_transport_error='raise' must
surface classified GATEWAY_ERROR on 5xx."""
from nullrun.transport import Transport
- respx.post(f"{BASE_URL}/api/v1/gate").mock(
+ # Audit F-R2-01 (2026-06-22): Transport.execute routes to
+ # /api/v1/execute (not /gate) — see transport.py:1188.
+ respx.post(f"{BASE_URL}/api/v1/execute").mock(
return_value=httpx.Response(500, text="boom")
)
rt = Transport(api_url=BASE_URL, api_key="k")
@@ -509,7 +516,7 @@ def test_execute_open_returns_fallback_allow(self, mock_api):
that want the dict shape (e.g. for audit, not for
enforcement)."""
from nullrun.transport import Transport
- respx.post(f"{BASE_URL}/api/v1/gate").mock(
+ respx.post(f"{BASE_URL}/api/v1/execute").mock(
side_effect=httpx.ConnectError("connection refused")
)
rt = Transport(api_url=BASE_URL, api_key="k")
@@ -525,7 +532,7 @@ def test_execute_closed_returns_fallback_block(self, mock_api):
"""transport.execute with on_transport_error='closed' returns
a synthetic block with FALLBACK_* source."""
from nullrun.transport import Transport
- respx.post(f"{BASE_URL}/api/v1/gate").mock(
+ respx.post(f"{BASE_URL}/api/v1/execute").mock(
side_effect=httpx.ConnectError("connection refused")
)
rt = Transport(api_url=BASE_URL, api_key="k")
diff --git a/tests/test_runtime.py b/tests/test_runtime.py
index 0e57d36..55256c8 100644
--- a/tests/test_runtime.py
+++ b/tests/test_runtime.py
@@ -113,7 +113,11 @@ def test_execute_allowed_returns_result(self, make_runtime, mock_api):
assert result["decision"] == "allow"
def test_execute_blocked_raises(self, make_runtime, mock_api):
- respx.post(f"{BASE_URL}/api/v1/gate").mock(
+ # Audit F-R2-01 (2026-06-22): runtime.execute → Transport.execute
+ # now hits /api/v1/execute (not /gate). Pre-fix this mocked
+ # /gate which silently swallowed the request (no scope check)
+ # and let an API key without `execute` scope drive the block.
+ respx.post(f"{BASE_URL}/api/v1/execute").mock(
return_value=httpx.Response(200, json={
"decision": "block",
"explanation": "cost_limit_exceeded",
diff --git a/tests/test_transport.py b/tests/test_transport.py
index 926a055..bcfedae 100644
--- a/tests/test_transport.py
+++ b/tests/test_transport.py
@@ -164,7 +164,10 @@ def test_execute_fallback_cached_no_cache_allows(self, transport):
@respx.mock
def test_execute_success_caches_decision(self, transport):
"""Successful execute caches the decision for future fallback."""
- respx.post("https://api.test.nullrun.io/api/v1/gate").mock(
+ # Audit F-R2-01 (2026-06-22): Transport.execute now hits
+ # /api/v1/execute (was /gate) so the backend checks the
+ # `execute` scope.
+ respx.post("https://api.test.nullrun.io/api/v1/execute").mock(
return_value=httpx.Response(200, json={
"decision": "allow",
"policy_id": "policy-123",
diff --git a/tests/test_transport_branches.py b/tests/test_transport_branches.py
index 2f160ac..3ea26a5 100644
--- a/tests/test_transport_branches.py
+++ b/tests/test_transport_branches.py
@@ -533,7 +533,7 @@ def test_connect_websocket_rejects_non_http_scheme():
asyncio.run(t.connect_websocket(organization_id="org-1"))
-def test_connect_websocket_uses_wss_for_https():
+def test_connect_websocket_uses_wss_for_https(monkeypatch):
t = _build_transport()
t.api_url = "https://api.nullrun.io"
@@ -550,7 +550,11 @@ async def connect(self):
return self
monkey_url = "wss://api.nullrun.io/ws/control/org-1"
- tw_mod.WebSocketConnection = _FakeConn
+ # monkeypatch restores the original WebSocketConnection on test
+ # teardown — without it, the leaked fake class breaks every later
+ # test that imports ``WebSocketConnection`` from the module
+ # (e.g. test_reconnect_cap.py's ``inspect.getsource`` assertions).
+ monkeypatch.setattr(tw_mod, "WebSocketConnection", _FakeConn)
import asyncio
@@ -558,7 +562,7 @@ async def connect(self):
assert captured["url"] == monkey_url
-def test_connect_websocket_uses_ws_for_http_localhost():
+def test_connect_websocket_uses_ws_for_http_localhost(monkeypatch):
"""Loopback http:// → ws:// (not wss://) for local dev."""
t = Transport(
api_url="http://localhost:8080",
@@ -578,7 +582,8 @@ def __init__(self, url, **kwargs):
async def connect(self):
return self
- tw_mod.WebSocketConnection = _FakeConn
+ # Same leak fix as the wss test above — monkeypatch auto-restores.
+ monkeypatch.setattr(tw_mod, "WebSocketConnection", _FakeConn)
import asyncio
diff --git a/tests/test_ws_signed_payload.py b/tests/test_ws_signed_payload.py
index 8bdca1c..f263476 100644
--- a/tests/test_ws_signed_payload.py
+++ b/tests/test_ws_signed_payload.py
@@ -22,6 +22,7 @@
6. Replayed signed_payload from a different message body -> rejected
(signature binds the body, not the envelope).
"""
+
from __future__ import annotations
import asyncio
@@ -38,7 +39,6 @@
verify_hmac_signature,
)
-
# --- helpers ---------------------------------------------------------------
@@ -61,6 +61,40 @@ def _build_signed_envelope(message: dict, api_key: str, secret_key: str) -> dict
return envelope
+def _build_real_server_envelope(
+ message: dict,
+ user_facing_api_key: str,
+ api_key_id: str,
+ secret_key: str,
+) -> dict:
+ """Mimic the real server's signing shape (FIX-D): the HMAC is
+ computed over ``api_key_id`` (the UUID key_id from
+ ``auth_context.key_id()``), NOT over the user-facing
+ ``nr_live_...`` api_key. The envelope publishes only
+ ``api_key_id`` — the user-facing key never appears on the wire.
+
+ The previous helper ``_build_signed_envelope`` used the same value
+ for both, which masked the bug fixed in FIX-D.
+ """
+ timestamp = int(time.time())
+ payload_json = json.dumps(message, separators=(",", ":"))
+ signature = compute_hmac_signature(
+ api_key_id, secret_key, timestamp, payload_json.encode("utf-8")
+ )
+ envelope = dict(message)
+ envelope["signature"] = signature
+ envelope["timestamp"] = timestamp
+ envelope["api_key_id"] = api_key_id
+ envelope["signed_payload"] = payload_json.encode("utf-8").hex()
+ # Note: ``user_facing_api_key`` is intentionally NOT included in the
+ # envelope — that's exactly how the real server behaves.
+ assert user_facing_api_key != api_key_id, (
+ "Test setup error: user-facing key and api_key_id must differ "
+ "to reproduce the FIX-D bug condition."
+ )
+ return envelope
+
+
def _build_legacy_envelope(message: dict, api_key: str, secret_key: str) -> dict:
"""Pre-FIX-C envelope: signature, timestamp, api_key_id present,
but signed_payload absent. The bytes the server signed were
@@ -90,17 +124,11 @@ def test_compute_and_verify_hmac_round_trip():
payload = b'{"type":"state_change","workflow_id":"wf-1","state":"Killed","version":2}'
ts = int(time.time())
sig = compute_hmac_signature("api_key_123", "secret_xyz", ts, payload)
- assert verify_hmac_signature(
- "api_key_123", "secret_xyz", ts, payload, sig
- )
+ assert verify_hmac_signature("api_key_123", "secret_xyz", ts, payload, sig)
# Different secret -> reject
- assert not verify_hmac_signature(
- "api_key_123", "wrong_secret", ts, payload, sig
- )
+ assert not verify_hmac_signature("api_key_123", "wrong_secret", ts, payload, sig)
# Different payload -> reject
- assert not verify_hmac_signature(
- "api_key_123", "secret_xyz", ts, payload + b" ", sig
- )
+ assert not verify_hmac_signature("api_key_123", "secret_xyz", ts, payload + b" ", sig)
def test_verify_hmac_signature_rejects_expired_timestamp():
@@ -347,9 +375,9 @@ async def test_replayed_signed_payload_with_spliced_body_is_rejected(monkeypatch
# And a real forgery — replacing the signed_payload bytes to
# say "Killed" without re-signing — must be rejected.
state_changes.clear()
- forged["signed_payload"] = json.dumps(
- {**legit, "state": "Killed"}, separators=(",", ":")
- ).encode("utf-8").hex()
+ forged["signed_payload"] = (
+ json.dumps({**legit, "state": "Killed"}, separators=(",", ":")).encode("utf-8").hex()
+ )
raw2 = json.dumps(forged)
await conn._handle_message(raw2)
assert state_changes == [] # signature no longer matches
@@ -396,3 +424,252 @@ async def test_acknowledged_states_use_pascalcase(monkeypatch):
raw = json.dumps(envelope)
await conn._handle_message(raw)
assert any(b'"type": "ack"' in s and b"msg-ack" in s for s in stub.sent)
+
+
+# --- Audit-2026-06-22 #6: WS ACK case-insensitive defensive ---
+
+
+@pytest.mark.asyncio
+async def test_ws_ack_lowercase_state_still_sends_ack(monkeypatch):
+ """Audit-2026-06-22 #6: the WS path used to exact-match PascalCase
+ only. A server regression to ``"killed"``/``"paused"`` would
+ silently drop the ACK. The defensive helper
+ ``_is_acknowledged_state`` accepts both, while the
+ ``ACKNOWLEDGED_STATES`` set stays PascalCase-only so the
+ ``test_acknowledged_states_use_pascalcase`` invariant is
+ preserved."""
+ state_changes: list[dict] = []
+ conn = WebSocketConnection(
+ url="wss://example.invalid/ws/control/org-1",
+ headers={},
+ api_key="api_key_123",
+ secret_key="secret_xyz",
+ on_state_change=state_changes.append,
+ )
+ stub = _StubWS()
+ monkeypatch.setattr(conn, "_conn", stub)
+ conn._running = True
+
+ for lowercase_state in ("killed", "paused"):
+ state_changes.clear()
+ stub.sent.clear()
+ msg = {
+ "type": "state_change",
+ "workflow_id": f"wf-{lowercase_state}",
+ "state": lowercase_state, # server regression to lowercase
+ "version": 5,
+ "message_id": f"msg-{lowercase_state}",
+ }
+ envelope = _build_signed_envelope(msg, "api_key_123", "secret_xyz")
+ raw = json.dumps(envelope)
+ await conn._handle_message(raw)
+
+ # ACK must be sent even with lowercase state.
+ assert any(
+ b'"type": "ack"' in s and lowercase_state.encode() in s
+ for s in stub.sent
+ ), f"ACK not sent for lowercase state={lowercase_state!r}"
+
+ # ACKNOWLEDGED_STATES itself stays PascalCase — pin that.
+ assert "Killed" in WebSocketConnection.ACKNOWLEDGED_STATES
+ assert "Paused" in WebSocketConnection.ACKNOWLEDGED_STATES
+ assert "killed" not in WebSocketConnection.ACKNOWLEDGED_STATES
+ assert "paused" not in WebSocketConnection.ACKNOWLEDGED_STATES
+
+ # And _is_acknowledged_state returns True for both casings.
+ assert WebSocketConnection._is_acknowledged_state("Killed")
+ assert WebSocketConnection._is_acknowledged_state("killed")
+ assert WebSocketConnection._is_acknowledged_state("Paused")
+ assert WebSocketConnection._is_acknowledged_state("paused")
+ assert not WebSocketConnection._is_acknowledged_state("Normal")
+ assert not WebSocketConnection._is_acknowledged_state("flagged")
+
+
+# --- FIX-D regression: server signs with api_key_id (UUID), not user-facing key ---
+
+
+@pytest.mark.asyncio
+async def test_real_server_envelope_with_distinct_api_key_id_is_accepted(monkeypatch):
+ """FIX-D regression: the real NULLRUN backend signs HMAC over
+ ``api_key_id`` (the UUID key_id from ``auth_context.key_id()``),
+ NOT the user-facing ``nr_live_...`` api_key passed to
+ ``nullrun.init()``. The SDK must read ``api_key_id`` from the
+ envelope and use it as the HMAC identifier — otherwise every
+ signed WS message is rejected with "Invalid HMAC signature".
+
+ Pre-FIX-D behaviour: SDK called ``verify_hmac_signature(
+ self.api_key, ...)`` with the user-facing key, which never matched
+ the server's UUID-based signature. This test would fail under that
+ code path with the same production error reported on 2026-06-22.
+ """
+ state_changes: list[dict] = []
+ USER_FACING_KEY = "nr_live_SsBF9OMYcVCgRCNcCVcJ4khTOPKx79JG"
+ API_KEY_ID = "0b7632e8-11d8-4247-8666-c72b5320b4f6" # UUID
+ SECRET = "secret-from-_authenticate"
+
+ conn = WebSocketConnection(
+ url="wss://api.nullrun.io/ws/control/org-x",
+ headers={},
+ api_key=USER_FACING_KEY,
+ secret_key=SECRET,
+ on_state_change=state_changes.append,
+ )
+ stub = _StubWS()
+ monkeypatch.setattr(conn, "_conn", stub)
+ conn._running = True
+
+ msg = {
+ "type": "state_change",
+ "workflow_id": "wf-1",
+ "state": "Normal",
+ "version": 5,
+ }
+ envelope = _build_real_server_envelope(
+ msg,
+ user_facing_api_key=USER_FACING_KEY,
+ api_key_id=API_KEY_ID,
+ secret_key=SECRET,
+ )
+ # Sanity: the envelope must NOT carry the user-facing key (the
+ # real server only ships the api_key_id UUID on the wire).
+ assert "api_key" not in envelope
+ assert envelope["api_key_id"] == API_KEY_ID
+
+ raw = json.dumps(envelope)
+ await conn._handle_message(raw)
+
+ # The signature was computed with API_KEY_ID, so the SDK must
+ # accept it and dispatch the state_change.
+ assert len(state_changes) == 1
+ assert state_changes[0]["workflow_id"] == "wf-1"
+ assert state_changes[0]["state"] == "Normal"
+
+
+@pytest.mark.asyncio
+async def test_real_server_envelope_with_wrong_user_facing_key_still_accepted(monkeypatch):
+ """Belt-and-braces for FIX-D: even if the user-facing key
+ accidentally differs from the api_key_id the server used to sign
+ (which is the actual server shape — the server never sees the
+ user-facing key for HMAC purposes), the SDK still accepts the
+ message because it reads ``api_key_id`` from the envelope.
+
+ This pins the contract: HMAC verification identity MUST come from
+ the envelope's ``api_key_id`` field, not from ``self.api_key``.
+ """
+ state_changes: list[dict] = []
+ USER_FACING_KEY = "nr_live_wrong-key-sdk-never-uses-this-for-verify"
+ API_KEY_ID = "0b7632e8-11d8-4247-8666-c72b5320b4f6"
+ SECRET = "secret-from-_authenticate"
+
+ conn = WebSocketConnection(
+ url="wss://api.nullrun.io/ws/control/org-x",
+ headers={},
+ api_key=USER_FACING_KEY,
+ secret_key=SECRET,
+ on_state_change=state_changes.append,
+ )
+ stub = _StubWS()
+ monkeypatch.setattr(conn, "_conn", stub)
+ conn._running = True
+
+ msg = {"type": "state_change", "workflow_id": "wf-x", "state": "Normal", "version": 1}
+ envelope = _build_real_server_envelope(msg, USER_FACING_KEY, API_KEY_ID, SECRET)
+ raw = json.dumps(envelope)
+ await conn._handle_message(raw)
+
+ assert len(state_changes) == 1
+ assert state_changes[0]["workflow_id"] == "wf-x"
+
+
+@pytest.mark.asyncio
+async def test_legacy_envelope_without_api_key_id_falls_back_to_user_facing_key(monkeypatch):
+ """FIX-D backwards-compat: a pre-FIX-D server (no ``api_key_id``
+ field on the envelope) signed HMAC over the user-facing api_key.
+ The SDK must fall back to ``self.api_key`` in that case so legacy
+ round-trip tests and any pre-FIX-D deployments keep working.
+
+ We build an envelope without ``api_key_id`` and sign with the
+ user-facing key — the pre-FIX-D shape.
+ """
+ state_changes: list[dict] = []
+ USER_FACING_KEY = "nr_live_legacy-test"
+ SECRET = "legacy-secret"
+
+ conn = WebSocketConnection(
+ url="wss://example.invalid/ws/control/org-1",
+ headers={},
+ api_key=USER_FACING_KEY,
+ secret_key=SECRET,
+ on_state_change=state_changes.append,
+ )
+ stub = _StubWS()
+ monkeypatch.setattr(conn, "_conn", stub)
+ conn._running = True
+
+ msg = {"type": "state_change", "workflow_id": "wf-legacy", "state": "Normal", "version": 1}
+ # Sign with the user-facing key, drop api_key_id to simulate a
+ # pre-FIX-D envelope.
+ envelope = _build_signed_envelope(msg, USER_FACING_KEY, SECRET)
+ envelope.pop("api_key_id")
+ raw = json.dumps(envelope)
+ await conn._handle_message(raw)
+
+ # Legacy path: SDK uses self.api_key as fallback, signature
+ # verifies, dispatch happens.
+ assert len(state_changes) == 1
+ assert state_changes[0]["workflow_id"] == "wf-legacy"
+
+
+# ---------------------------------------------------------------------------
+# Wire-format contract tests (audit 2026-06-22 #3+#8)
+# ---------------------------------------------------------------------------
+
+
+def test_ws_hmac_identity_field_constant():
+ """The wire-format HMAC identity field name is pinned via
+ ``WS_HMAC_IDENTITY_FIELD``. Both sides of the WS push protocol
+ (NULLRUN backend's ``SignedWsMessage`` struct and the SDK
+ receiver in transport_websocket.py) agree on this field name.
+
+ Without this pin, a future struct rename on either side silently
+ breaks signature verification on every push — exactly the
+ regression class that audit 2026-06-22 #8 captured.
+ """
+ from nullrun.transport_websocket import WS_HMAC_IDENTITY_FIELD
+
+ assert WS_HMAC_IDENTITY_FIELD == "api_key"
+
+
+def test_ws_hmac_identity_field_used_in_receiver():
+ """Receiver must read the pinned field name (not a free-form
+ string literal) so the constant is the single source of truth.
+
+ Reads the source file directly (not ``inspect.getsource`` on the
+ class) so the test is robust to ``test_transport_branches.py``
+ monkey-patching ``transport_websocket.WebSocketConnection`` to a
+ fake class without restoring it (a pre-existing test-isolation
+ leak — see the ``_FakeConn`` assignments at test_transport_branches.py:553
+ and :581). With ``inspect.getsource`` the patched fake class has
+ no ``_handle_message`` and this test crashes; with direct file
+ reads we verify the source-of-truth bytes regardless of class
+ identity at test time.
+ """
+ from pathlib import Path
+
+ from nullrun.transport_websocket import WS_HMAC_IDENTITY_FIELD
+
+ src_path = Path(__file__).parent.parent / "src" / "nullrun" / "transport_websocket.py"
+ src = src_path.read_text(encoding="utf-8")
+
+ # The receiver code (the body of the ``_handle_message`` method)
+ # must reference the constant. Look for the constant by name
+ # rather than by literal value to confirm the pin is wired up.
+ assert "WS_HMAC_IDENTITY_FIELD" in src, (
+ "transport_websocket.py no longer references the "
+ "WS_HMAC_IDENTITY_FIELD constant — wire-format pin is gone"
+ )
+
+ # And the constant must keep its expected wire-format value
+ # (separate from the source-level reference so a refactor that
+ # changes the value is caught too).
+ assert WS_HMAC_IDENTITY_FIELD == "api_key"