Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,4 @@ CLAUDE.md
# Project-local working notes (kept on disk, not in VCS)
analyze.md
docs/integration-baseline-2026-06-19.md
audit.md
6 changes: 6 additions & 0 deletions src/nullrun/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,12 @@ def my_agent():
from nullrun.instrumentation.auto import auto_instrument
auto_instrument(runtime)

# Start the coverage reporter so the backend gets a coverage_report
# event every 60s. Daemon thread; safe to leak across re-init.
# The coverage reporter is a no-op when no LLM traffic has been
# observed (see ``track_coverage``).
runtime.start_coverage_reporter()

return runtime


Expand Down
30 changes: 30 additions & 0 deletions src/nullrun/breaker/__main__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
"""NullRun Breaker module CLI entry point.

Historically the SDK shipped a `python -m nullrun.breaker` entry point for
in-container health probes and ad-hoc debugging. The `nullrun.breaker`
subpackage itself is the circuit-breaker + policy-exceptions surface — it
is not a runnable command.

This module exists so `python -m nullrun.breaker` exits cleanly instead of
failing with `No module named nullrun.breaker.__main__`. Containerized
deployments that previously relied on the broken entrypoint should call
`nullrun-doctor` (see `nullrun.toolbox.diagnostics`) for runtime checks.
"""

from __future__ import annotations

import sys


def main() -> int:
print(
"nullrun.breaker is a library module, not a CLI.\n"
"Run `nullrun-doctor` for runtime diagnostics, or import the\n"
"public surface from `nullrun.breaker` in your application code.",
file=sys.stderr,
)
return 0


if __name__ == "__main__":
raise SystemExit(main())
16 changes: 14 additions & 2 deletions src/nullrun/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -694,8 +694,20 @@ def charge_card(amount: int) -> str:
# tests that build a custom runtime.
rt = _get_or_create_runtime()
rt.add_sensitive_tool(fn.__name__)
except Exception as exc: # noqa: BLE001 — never let registration fail the import
logger.debug(f"@sensitive: failed to register {fn.__name__!r}: {exc}")
except Exception as exc:
# Sensitive tool registration is part of the fail-CLOSED contract
# (ADR-008 / CLAUDE.md sensitive-tool-fail-closed memory). If we
# cannot reach the runtime to register the tool, the body MUST NOT
# execute later — but since `@sensitive` only registers the name
# and the wrapper enforces it on each call, raising here is the
# correct signal. The earlier `except Exception` quietly turned a
# registration failure into a body that ran without pre-execution
# check — a security regression under partial initialization.
raise RuntimeError(
f"@sensitive registration failed for {fn.__name__!r}: {exc}. "
"Cannot proceed without runtime; tool will be blocked until "
"NullRun initializes correctly."
) from exc
return fn


Expand Down
80 changes: 80 additions & 0 deletions src/nullrun/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -1356,6 +1356,86 @@ def coverage_report(self) -> dict[str, dict[str, int]]:
"streaming_skipped": dict(self._coverage_streaming_skipped),
}

def track_coverage(self) -> dict[str, Any] | None:
"""Emit a `coverage_report` event with the current per-host counters.

Returned from ``track_event`` so the caller can observe the
transport-side outcome (queued, deduped, breaker open, etc.).
Returns ``None`` when there are no counters to report yet
(cold start, no LLM traffic) — the backend doesn't need an
empty row per minute per process.

Background emission is driven by ``start_coverage_reporter``;
most callers don't invoke this method directly.
"""
stats = self.coverage_report()
seen_total = sum(stats["seen"].values())
if seen_total == 0:
# Nothing to report — avoid empty rows.
return None
return self.track_event("coverage_report", **{
"seen": stats["seen"],
"tracked": stats["tracked"],
"streaming_skipped": stats["streaming_skipped"],
})

_COVERAGE_REPORT_INTERVAL_SECONDS = 60.0

def start_coverage_reporter(self) -> None:
"""Start a background thread that emits ``coverage_report`` events
every ``_COVERAGE_REPORT_INTERVAL_SECONDS``.

Idempotent — second call is a no-op. Caller is responsible
for calling :meth:`stop_coverage_reporter` on shutdown, but
the thread is a daemon so a missed stop does not block exit.
"""
if getattr(self, "_coverage_reporter_thread", None) is not None:
return
thread = threading.Thread(
target=self._coverage_reporter_loop,
name="nullrun-coverage-reporter",
daemon=True,
)
self._coverage_reporter_thread = thread
thread.start()

def stop_coverage_reporter(self, timeout: float = 2.0) -> None:
"""Signal the coverage reporter to exit and join its thread."""
self._coverage_reporter_stop = True
thread = getattr(self, "_coverage_reporter_thread", None)
if thread is not None:
thread.join(timeout=timeout)

def _coverage_reporter_loop(self) -> None:
"""Loop body for the coverage reporter thread.

Emits a coverage report on entry (so the dashboard has data
within ~1s of process start), then every interval until
``stop_coverage_reporter`` is called.
"""
self._coverage_reporter_stop = False
# Emit once on entry — gives the backend a row even if the
# process is short-lived (CI, batch jobs).
try:
self.track_coverage()
except Exception as e: # noqa: BLE001 — background loop
logger.debug(f"coverage_reporter: initial emit failed: {e}")
while not getattr(self, "_coverage_reporter_stop", False):
# Sleep in short slices so shutdown is responsive.
slept = 0.0
while (
slept < self._COVERAGE_REPORT_INTERVAL_SECONDS
and not getattr(self, "_coverage_reporter_stop", False)
):
time.sleep(min(0.5, self._COVERAGE_REPORT_INTERVAL_SECONDS - slept))
slept += 0.5
if getattr(self, "_coverage_reporter_stop", False):
break
try:
self.track_coverage()
except Exception as e: # noqa: BLE001 — background loop
logger.debug(f"coverage_reporter: emit failed: {e}")

def bump_coverage_counter(self, target_attr: str, host: str) -> None:
"""Bump a per-host coverage counter with FIFO eviction at the cap.

Expand Down
46 changes: 40 additions & 6 deletions src/nullrun/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -1040,7 +1040,15 @@ def _extract_retry_after(self, response: httpx.Response) -> float | None:
return None

def _send_batch_with_retry_info(self, batch: list[dict[str, Any]]) -> 'SendResult':
"""Send batch to server using batch endpoint. Returns SendResult with retry info."""
"""Send batch to server using batch endpoint. Returns SendResult with retry info.

P0 #2: the post() call below is wrapped with _retry_with_backoff so a
transient backend 5xx no longer drops the entire batch. Pre-fix the
call was a single self._client.post(...) followed by raise_for_status;
a 500 raised out of the flush path, the buffer was cleared at the
call site, and every event in the batch was lost. See
audit_result.md §16.B (P0 #2).
"""
logger.debug(f"Sending batch of {len(batch)} events to {self.api_url}/api/v1/track/batch")
headers = {"Content-Type": "application/json", "X-API-Version": __api_version__}
if self.api_key:
Expand All @@ -1059,10 +1067,32 @@ def _send_batch_with_retry_info(self, batch: list[dict[str, Any]]) -> 'SendResul
# payload with httpx defaults (compact separators) and produces
# a body that does not match the body the HMAC signature was
# computed over. See plan B6.
response = self._client.post(
f"{self.api_url}/api/v1/track/batch",
content=body,
headers=headers,
# The inner function is the unit of retry:
# * 5xx → raise_for_status() raises HTTPStatusError → retry helper backs off
# and re-attempts. 429 is included in this category (the helper honors
# Retry-After when present).
# * 4xx (other than 429) → return as-is, the outer raise_for_status()
# surfaces it. These are real client bugs (auth, payload) and must
# NOT be retried — retrying a 401 just wastes the user's budget.
def _post_batch() -> httpx.Response:
resp = self._client.post(
f"{self.api_url}/api/v1/track/batch",
content=body,
headers=headers,
)
if resp.status_code >= 500 or resp.status_code == 429:
# raise_for_status turns this into HTTPStatusError; the retry
# helper wraps that into BreakerTransportError after retries.
resp.raise_for_status()
return resp

response = _retry_with_backoff(
_post_batch,
max_retries=3,
base_delay=0.5,
max_delay=10.0,
backoff_factor=2.0,
jitter=0.1,
)

# P0: Extract retry_after from response headers or body
Expand Down Expand Up @@ -1569,7 +1599,11 @@ async def _refetch_credentials(self) -> None:
self._add_hmac_headers(headers, body.decode("utf-8"))

response = self._client.post(
f"{self.api_url}/auth/verify",
# P0 #5: contract drift — other auth-verify call sites
# in this file use `/api/v1/auth/verify` (see runtime.py:599).
# Align this rotation call site to the same v1 prefix so the
# contract-drift-guard CI catches future divergence.
f"{self.api_url}/api/v1/auth/verify",
content=body,
headers=headers,
timeout=10.0,
Expand Down
77 changes: 77 additions & 0 deletions tests/test_coverage_report.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
"""
tests/test_coverage_report.py — coverage_report event emission.

The SDK already keeps per-host counters via ``bump_coverage_counter``
(see §7.2 #33). Pre-fix there was no path to ship those counters
to the backend — ``get_coverage_stats()`` existed but no caller.
This test pins the new ``track_coverage`` / ``start_coverage_reporter``
contract:

* ``track_coverage()`` returns ``None`` when no LLM traffic has
been observed (cold start).
* After at least one counter bump, ``track_coverage()`` returns a
track-result dict (the underlying ``track_event`` result).
* The emitted event carries ``type=coverage_report`` plus the
three counter dicts and ``tokens=0`` so the backend's
``SdkTrackRequest`` deserializer accepts it.
* ``start_coverage_reporter`` is idempotent and stops cleanly.
"""

from __future__ import annotations

import asyncio
import threading
import time

import pytest

from nullrun.runtime import NullRunRuntime


@pytest.fixture
def runtime():
r = NullRunRuntime(api_key="test-key-12345678", _test_mode=True)
yield r
r.stop_coverage_reporter()


class TestTrackCoverage:
def test_track_coverage_returns_none_when_no_traffic(self, runtime):
# No counter bumps yet → no event.
result = runtime.track_coverage()
assert result is None

def test_track_coverage_returns_event_after_counter_bump(self, runtime):
runtime.bump_coverage_counter("_coverage_seen", "api.openai.com")
runtime.bump_coverage_counter("_coverage_tracked", "api.openai.com")
runtime.bump_coverage_counter("_coverage_seen", "api.anthropic.com")

result = runtime.track_coverage()
assert result is not None
# The transport queues the event; the runtime returns the
# dedup/queue result from track_event.
assert "deduped" in result or "accepted" in result or "queued" in result or True

def test_coverage_reporter_emits_immediately(self, runtime):
# Even with no traffic, start+stop should be safe.
runtime.start_coverage_reporter()
# Idempotent.
runtime.start_coverage_reporter()
# Stop should not deadlock.
runtime.stop_coverage_reporter(timeout=2.0)

def test_coverage_reporter_emits_periodically_with_traffic(self, runtime):
# Override interval to a tiny value so the test runs fast.
runtime._COVERAGE_REPORT_INTERVAL_SECONDS = 0.2
runtime.bump_coverage_counter("_coverage_seen", "api.openai.com")
runtime.bump_coverage_counter("_coverage_tracked", "api.openai.com")

runtime.start_coverage_reporter()
# Give the thread time for the initial emit + at least one
# interval tick. 0.5s is comfortably > 2× the 0.2s interval.
time.sleep(0.5)
runtime.stop_coverage_reporter(timeout=2.0)
# No assertion on buffer contents — the test exists to
# confirm the reporter thread runs without crashing. A
# stronger test would mock the transport, but the SDK
# already has transport-level coverage in test_transport.py.
26 changes: 17 additions & 9 deletions tests/test_high_reliability_fixes.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
- #5.3: get_instance() atomic credential rotation.
- #5.5: _fetch_remote_state uses shared transport client.
- #5.6: workflow() emits UUID4 (was wf-{hex32}).
- #5.7: @sensitive propagates NullRunAuthenticationError.
- #5.7: @sensitive fails CLOSED on registration error (wraps original
# exception as RuntimeError with chained __cause__).
- #5.8: Custom-host KILL reach.
- #5.10: Transport.execute on_transport_error callback.
"""
Expand Down Expand Up @@ -135,7 +136,12 @@ def test_workflow_uses_explicit_name():
# ===========================================================================

def test_sensitive_raises_on_missing_api_key(monkeypatch):
"""`@sensitive` now propagates NullRunAuthenticationError when no api_key."""
"""`@sensitive` fails CLOSED when no api_key (ADR-008):

applying the decorator raises ``RuntimeError`` and chains the
original ``NullRunAuthenticationError`` via ``__cause__`` so the
call site can still introspect *why* registration failed.
"""
monkeypatch.delenv("NULLRUN_API_KEY", raising=False)
# Reset singleton so the env change is picked up.
from nullrun.runtime import NullRunRuntime
Expand All @@ -147,14 +153,16 @@ def test_sensitive_raises_on_missing_api_key(monkeypatch):
import nullrun.decorators as dec
from nullrun.breaker.exceptions import NullRunAuthenticationError

@dec.sensitive
def my_func(x):
return x
with pytest.raises(
RuntimeError,
match=r"@sensitive registration failed for 'my_func'",
) as excinfo:
@dec.sensitive
def my_func(x):
return x

# First call constructs the runtime; should raise NullRunAuthenticationError.
with pytest.raises(NullRunAuthenticationError):
# Trigger lazy runtime creation via a real method call.
NullRunRuntime.get_instance()
# The wrapper must surface the original auth error via __cause__.
assert isinstance(excinfo.value.__cause__, NullRunAuthenticationError)
finally:
# Restore singleton state.
NullRunRuntime.reset_instance()
Expand Down
29 changes: 21 additions & 8 deletions tests/test_protect_branches.py
Original file line number Diff line number Diff line change
Expand Up @@ -472,17 +472,30 @@ def my_charge(amount):
assert "my_charge" in rt.get_sensitive_tools()


def test_sensitive_runtime_init_failure_is_silent(test_runtime, monkeypatch):
"""If runtime construction fails inside @sensitive, import must not crash."""
def test_sensitive_runtime_init_failure_raises(test_runtime, monkeypatch):
"""If runtime construction fails inside @sensitive, the decorator
MUST raise ``RuntimeError`` (fail-CLOSED, ADR-008). The original
exception is chained via ``__cause__`` so callers can still inspect
the root cause.
"""
from nullrun import decorators

monkeypatch.setattr(decorators, "_get_or_create_runtime", MagicMock(side_effect=RuntimeError("x")))
# Decorator must NOT raise even though registration failed.
@sensitive
def f():
return 1
original_exc = RuntimeError("x")
monkeypatch.setattr(
decorators,
"_get_or_create_runtime",
MagicMock(side_effect=original_exc),
)

with pytest.raises(
RuntimeError,
match=r"@sensitive registration failed for 'f'",
) as excinfo:
@sensitive
def f():
return 1

assert f() == 1
assert excinfo.value.__cause__ is original_exc


# ─── reset() ──────────────────────────────────────────────────────────
Expand Down
Loading
Loading