Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 83 additions & 1 deletion agent_sudo/doctor.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
default_delegations_path,
is_broad_delegation,
)
from agent_sudo.inventory import InventoryReport, _version_key, build_inventory
from agent_sudo.self_identity import SelfIdentity, describe_running_install


@dataclass(frozen=True)
Expand All @@ -21,7 +23,12 @@ class DoctorCheck:
detail: str


def run_doctor(*, repo_root: Path | None = None) -> list[DoctorCheck]:
def run_doctor(
*,
repo_root: Path | None = None,
identity: SelfIdentity | None = None,
inventory_report: InventoryReport | None = None,
) -> list[DoctorCheck]:
# doctor reports user readiness only. Repository/contributor hygiene
# (e.g. the personal-data scanner) is a CI concern run via
# scripts/check_no_personal_data.py, not surfaced to evaluators here.
Expand All @@ -40,9 +47,84 @@ def run_doctor(*, repo_root: Path | None = None) -> list[DoctorCheck]:
_writable_file_check("audit log writable", state_root / "doctor-audit.jsonl"),
_writable_file_check("delegation store writable", default_delegations_path()),
_broad_delegations_check(),
*_install_health_checks(identity, inventory_report),
]


def _install_health_checks(
identity: SelfIdentity | None,
inventory_report: InventoryReport | None,
) -> list[DoctorCheck]:
"""Detect a stale running install or an editable that drifted from its source.

WARN-only (neither name is in the required set, so the exit code is never
affected — issue #110 explicitly does not auto-fix). Reuses the
:class:`SelfIdentity` primitive (#108) and the inventory classification
(#101) rather than re-deriving versions.
"""
if identity is None:
identity = describe_running_install()
if inventory_report is None:
inventory_report = build_inventory()
return [
_staleness_check(identity, inventory_report),
_runtime_source_check(identity),
]


def _staleness_check(identity: SelfIdentity, report: InventoryReport) -> DoctorCheck:
name = "install up to date"
running = identity.version
newest = report.newest_version
if not running or running == "unknown" or not newest:
return DoctorCheck(name, True, "no newer install detected")
if _version_key(running) >= _version_key(newest):
return DoctorCheck(name, True, f"running the newest install found ({running})")
# A newer copy exists elsewhere — name where, so the user can re-point/reinstall.
newer = [i for i in report.installs if i.version == newest]
location = _tilde(newer[0].root) if newer else "another install"
return DoctorCheck(
name,
False,
f"running {running} but {newest} is installed at {location}; "
"your shell is resolving an older copy — run `agent-sudo inventory` "
"and reinstall or re-point to the newest",
)


def _runtime_source_check(identity: SelfIdentity) -> DoctorCheck:
name = "runtime matches install source"
if identity.install_type != "editable":
return DoctorCheck(name, True, f"{identity.install_type} install")
source = identity.source_path
package = identity.package_path
if source and _path_within(package, source):
return DoctorCheck(name, True, f"editable source {_tilde(source)}")
return DoctorCheck(
name,
False,
f"editable install registered for {_tilde(source or '?')} but "
f"code is running from {_tilde(package)}; the editable source "
"moved or a stale copy is shadowing it — reinstall with `pip install -e`",
)


def _tilde(path: str) -> str:
"""Collapse $HOME to ~ for install/source locations (never cwd-relative)."""
home = str(Path.home())
return path.replace(home, "~", 1) if path and path.startswith(home) else path


def _path_within(child: str, parent: str) -> bool:
"""True if ``child`` is ``parent`` or lives under it (string-safe, no I/O)."""
try:
child_path = Path(child).resolve()
parent_path = Path(parent).resolve()
except OSError:
return False
return child_path == parent_path or parent_path in child_path.parents


def doctor_exit_code(checks: list[DoctorCheck]) -> int:
required = {
"Python version OK",
Expand Down
8 changes: 7 additions & 1 deletion docs/command_reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,17 @@ and troubleshooting); they are listed under their primary use.

### `doctor`
- **Purpose:** report local readiness (Python version, policy, writable audit/
delegation stores, approval config).
delegation stores, approval config). Also WARNs when the **running install is
stale** (an older copy is resolving ahead of a newer one on the machine) or
when an **editable install has drifted** from its registered source — so a
shell silently running an out-of-date copy is caught here, not in production.
- **Example:** `agent-sudo doctor`
- **When to use:** right after install, or when something isn't working.
- **Common mistakes:** expecting it to validate your MCP client config — it checks the
local Agent_Sudo install, not the client wiring (use `verify-routing` for that).
- **Note:** the staleness/drift checks are **WARN-only** — they never fail the
exit code or change anything; run `agent-sudo inventory` for the full
install map and `--version` to see which copy is running.

### `--version`
- **Purpose:** print the running version **and which copy is running it** —
Expand Down
101 changes: 101 additions & 0 deletions tests/test_doctor.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
run_doctor,
)
from agent_sudo.gateway import main
from agent_sudo.inventory import InstallRecord, InventoryReport
from agent_sudo.self_identity import SelfIdentity


class DoctorTests(unittest.TestCase):
Expand Down Expand Up @@ -88,6 +90,105 @@ def test_doctor_exit_code_fails_required_check(self) -> None:

self.assertEqual(doctor_exit_code(checks), 1)

def test_install_health_checks_present_by_default(self) -> None:
names = {check.name for check in run_doctor()}
self.assertIn("install up to date", names)
self.assertIn("runtime matches install source", names)


class InstallHealthCheckTests(unittest.TestCase):
"""Stale-install and editable-drift detection (issue #110), WARN-only."""

def _identity(self, **over) -> SelfIdentity:
base = dict(
version="0.5.6",
install_type="editable",
source_path="/repo/Agent_Sudo",
package_path="/repo/Agent_Sudo/agent_sudo",
python_executable="/py/bin/python",
python_prefix="/py",
python_version="3.11.14",
origin="console-script",
)
base.update(over)
return SelfIdentity(**base)

def _report(self, *, newest: str, installs) -> InventoryReport:
records = [
InstallRecord(root=root, executable="", version=version)
for root, version in installs
]
return InventoryReport(
installs=records, configs=[], warnings=[], newest_version=newest
)

def _by_name(self, checks):
return {check.name: check for check in checks}

def test_stale_pinned_install_warns(self) -> None:
identity = self._identity(
install_type="pinned-wheel",
version="0.5.5",
source_path="/venv/site-packages/agent_sudo",
package_path="/venv/site-packages/agent_sudo",
)
report = self._report(
newest="0.5.6",
installs=[
("/venv", "0.5.5"),
("/Users/dev/Developer/Agent_Sudo", "0.5.6"),
],
)
checks = self._by_name(run_doctor(identity=identity, inventory_report=report))
stale = checks["install up to date"]
self.assertFalse(stale.ok)
self.assertIn("0.5.5", stale.detail)
self.assertIn("0.5.6", stale.detail)
self.assertIn("Developer/Agent_Sudo", stale.detail)
# a pinned install has no editable source to drift, so this stays OK
self.assertTrue(checks["runtime matches install source"].ok)
# WARN-only: a stale install must not fail the doctor exit code
self.assertEqual(
doctor_exit_code(run_doctor(identity=identity, inventory_report=report)), 0
)

def test_editable_source_mismatch_warns(self) -> None:
identity = self._identity(
install_type="editable",
version="0.5.6",
source_path="/old/checkout/Agent_Sudo",
package_path="/elsewhere/Agent_Sudo/agent_sudo",
)
report = self._report(
newest="0.5.6", installs=[("/old/checkout/Agent_Sudo", "0.5.6")]
)
checks = self._by_name(run_doctor(identity=identity, inventory_report=report))
drift = checks["runtime matches install source"]
self.assertFalse(drift.ok)
self.assertIn("/old/checkout/Agent_Sudo", drift.detail)
self.assertIn("/elsewhere/Agent_Sudo", drift.detail)
# version is newest, so staleness stays OK
self.assertTrue(checks["install up to date"].ok)
self.assertEqual(
doctor_exit_code(run_doctor(identity=identity, inventory_report=report)), 0
)

def test_clean_current_install_is_ok(self) -> None:
identity = self._identity() # editable 0.5.6, package under source
report = self._report(newest="0.5.6", installs=[("/repo/Agent_Sudo", "0.5.6")])
checks = self._by_name(run_doctor(identity=identity, inventory_report=report))
self.assertTrue(checks["install up to date"].ok)
self.assertTrue(checks["runtime matches install source"].ok)
self.assertEqual(
doctor_exit_code(run_doctor(identity=identity, inventory_report=report)), 0
)

def test_unknown_versions_do_not_warn(self) -> None:
identity = self._identity(version="unknown")
report = self._report(newest="", installs=[])
checks = self._by_name(run_doctor(identity=identity, inventory_report=report))
self.assertTrue(checks["install up to date"].ok)

def test_doctor_format_contains_status(self) -> None:
text = format_doctor_checks(
[DoctorCheck("approval config exists", False, "not initialized")]
Expand Down