From 27cc47368ad0351482a369beb76d77eccee9b7bb Mon Sep 17 00:00:00 2001 From: N1ghthill <115030983+N1ghthill@users.noreply.github.com> Date: Tue, 24 Mar 2026 00:02:29 -0300 Subject: [PATCH] Validate MCP client flow and harden approval concurrency --- CHANGELOG.md | 6 + README.md | 3 +- docs/README.md | 1 + docs/beta-readiness-gate.md | 22 +- docs/mcp-client-validation.md | 92 ++++ docs/release-checklist.md | 5 +- docs/roadmap.md | 10 +- docs/runtime-integration-testing.md | 13 +- docs/runtime-mcp-maturation-plan.md | 10 +- docs/status.md | 16 +- docs/vps-validation-report.md | 32 +- scripts/validate_mcp_client.py | 378 +++++++++++++++ src/master_control/core/runtime.py | 29 +- src/master_control/interfaces/mcp/server.py | 502 ++++++++++++++++++-- src/master_control/store/session_store.py | 312 +++++++++--- tests/test_mcp_server.py | 183 +++++++ tests/test_mcp_stdio_integration.py | 102 ++++ tests/test_session_store.py | 105 +++- 18 files changed, 1648 insertions(+), 173 deletions(-) create mode 100644 docs/mcp-client-validation.md create mode 100644 scripts/validate_mcp_client.py diff --git a/CHANGELOG.md b/CHANGELOG.md index a079361..002069d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -33,6 +33,8 @@ - typed planning decision kinds plus final turn classification - deterministic final-message guidance keyed off turn classification - repository hardening with `ruff`, `mypy`, `pre-commit`, and GitHub templates +- official MCP Inspector CLI validation harness in `scripts/validate_mcp_client.py` +- MCP approval tools for standard clients: `approval_list`, `approval_get`, `approval_approve`, and `approval_reject` ### Changed @@ -72,6 +74,10 @@ - chat payloads and audit now distinguish planner intent from final turn outcome, including missing safe tools and confirmation waits - blocked or partial turns now return clearer operator guidance instead of relying only on provider prose - CI now enforces lint and typecheck in addition to tests and smoke validation +- `mc mcp-serve` now closes the standard JSON-RPC MCP handshake expected by real clients +- MCP stdio now exposes approval resolution through standard `tools/list` and `tools/call`, not only through custom approval methods +- tool approvals now deduplicate active action envelopes and block duplicate in-flight execution for the same pending action +- release-facing docs and gates now reflect the latest VPS rerun and the Inspector-backed MCP validation path ### Notes diff --git a/README.md b/README.md index acd2084..bec5dd4 100644 --- a/README.md +++ b/README.md @@ -19,7 +19,7 @@ MC is built around three constraints: - single-host and local-first by design - install path: source checkout plus `install.sh` - validated on the maintainer workstation and on a dedicated Debian 13 VPS lab -- main integration interface: experimental MCP stdio with approval-mediated write flow +- main integration interface: experimental but JSON-RPC-compatible MCP stdio with approval-mediated write flow - local administration interface: CLI - optional interface: chat/provider path - not positioned as a production-ready Linux administration platform, security auditor, or package manager @@ -72,6 +72,7 @@ If `install.sh` reports that `ensurepip` is unavailable on Debian or Ubuntu, ins - [Policy guide](docs/policy.md) - [Operator workflows](docs/operator-workflows.md) - [Runtime integration testing](docs/runtime-integration-testing.md) +- [MCP client validation](docs/mcp-client-validation.md) - [Provider setup](docs/providers.md) - [Host-profile validation guide](docs/host-profile-validation.md) - [Validation evidence](docs/alpha-validation-report.md) diff --git a/docs/README.md b/docs/README.md index fd7b5d8..05d8e47 100644 --- a/docs/README.md +++ b/docs/README.md @@ -27,6 +27,7 @@ Use this file to find the right working document, validation record, or planning - `docs/alpha-validation-report.md`: main validation summary for the current pre-1.0 baseline - `docs/runtime-integration-testing.md`: runtime and MCP contract validation guide +- `docs/mcp-client-validation.md`: official MCP Inspector CLI validation guide and evidence - `docs/vps-validation-report.md`: dedicated Debian VPS validation evidence - `docs/vps-validation-runbook.md`: repeatable runbook for the maintainer-controlled VPS lab - `docs/call-for-testers.md`: outreach copy for collecting more host-validation evidence diff --git a/docs/beta-readiness-gate.md b/docs/beta-readiness-gate.md index c5966cc..27364e7 100644 --- a/docs/beta-readiness-gate.md +++ b/docs/beta-readiness-gate.md @@ -1,6 +1,6 @@ # Beta Readiness Gate -Snapshot date: 2026-03-20 +Snapshot date: 2026-03-23 ## Purpose @@ -100,9 +100,10 @@ Minimum exit bar: Primary hotspots still to watch: -- `src/master_control/app.py` +- `src/master_control/core/runtime.py` +- `src/master_control/store/session_store.py` +- `src/master_control/interfaces/cli/entrypoint.py` - `src/master_control/providers/heuristic.py` -- `src/master_control/agent/session_insights.py` ## Required Release Artifacts Before Beta @@ -116,15 +117,16 @@ Before any beta tag, the repository should have: ## Current Status -Current assessment on 2026-03-20: +Current assessment on 2026-03-23: - not yet beta-ready -- the `0.1.0a2` release-candidate package is prepared locally, and Gate 1 host-count evidence is now satisfied through the maintainer workstation plus a dedicated Debian 13 VPS validation run -- local hardening has already closed bootstrap, comparative follow-up, config-diff, config-diff refinement, service-log compression, service-log pattern refinement, bootstrap evidence, comparative phrase-collection, bootstrap-to-CI decision, and community validation intake packages -- release-facing docs and final positioning still need to be synchronized before any beta claim or tag decision -- dedicated VPS evidence is recorded in `docs/vps-validation-report.md` +- Gate 1 host-count evidence is satisfied through the maintainer workstation plus the dedicated Debian 13 VPS validation lab +- the operator bootstrap path, host-profile validation, and maintainer engineering baseline were rerun successfully on the VPS on 2026-03-23 +- the MCP approval-mediated mutation flow is now also validated through the official Inspector CLI on the maintainer workstation +- the remaining blockers are now codebase maintainability, explicit schema governance, and broader operational hardening rather than missing first-pass MCP or host-validation evidence Main remaining actions: -1. update the canonical release-facing docs to reflect that workflow validation now exists on more than one real host profile -2. decide whether the current maintainability and release posture justify beta language now, or whether the project should remain in late-alpha/private-preview wording a little longer +1. reduce the central runtime/store/CLI hotspots enough that the next operational slices stay reviewable +2. define and enforce tool-schema compatibility rules before broadening the MCP surface further +3. decide whether late-alpha/private-preview wording should continue until those two bars move diff --git a/docs/mcp-client-validation.md b/docs/mcp-client-validation.md new file mode 100644 index 0000000..173e931 --- /dev/null +++ b/docs/mcp-client-validation.md @@ -0,0 +1,92 @@ +# MCP Client Validation + +Snapshot date: 2026-03-23 + +## Purpose + +This document records the current real-client validation path for `mc mcp-serve`. + +The goal is not to rely only on in-process stdio tests. +The goal is to prove that a standard MCP client can complete the approval-mediated mutation flow against the real server process. + +## Validated Client + +Current validated client: + +- official MCP Inspector CLI via `@modelcontextprotocol/inspector --cli` + +Current validated host: + +- maintainer workstation with Node `22.22.1` + +Note: + +- the dedicated Debian 13 VPS lab currently provides Node `20.19.2`, which is below the Inspector package's declared Node floor +- the VPS remains useful for operator bootstrap and host-validation evidence, but the current Inspector CLI transcript is maintained from the workstation + +## What Is Validated + +The current Inspector-backed flow validates: + +1. standard JSON-RPC `initialize` over stdio +2. `tools/list` against the real `mc mcp-serve` process +3. read-only `tools/call` for `system_info` +4. approval-mediated `tools/call` for `write_config_file` +5. approval resolution through MCP-exposed tools: + - `approval_get` + - `approval_approve` + +This means a real MCP client can: + +- inspect the host +- trigger a bounded mutating action +- receive a structured pending-approval payload +- resolve that approval through the MCP tool surface +- observe the executed result and persisted approval record + +## Repeatable Command + +Run: + +```bash +python3 scripts/validate_mcp_client.py --output-dir ./artifacts/mcp-client-validation +``` + +Optional JSON report: + +```bash +python3 scripts/validate_mcp_client.py --json +``` + +## Generated Artifacts + +Each run writes: + +- `report.json` +- per-step stdout/stderr logs +- isolated MC state under the run directory + +Each successful run is written beneath: + +- `artifacts/mcp-client-validation/` + +## Current Interpretation + +What this validation proves: + +- `mc mcp-serve` now closes the standard JSON-RPC MCP handshake required by the official Inspector client +- the server exposes host tools and approval tools in a format accepted by a standard MCP client +- the approval-mediated mutation path works through a real external client, not only through direct line tests + +What this validation does not prove: + +- every desktop MCP client UX is identical +- GUI-oriented client affordances are already documented well enough for operators +- the broader tool schema governance work is complete + +## Remaining Gap + +The remaining MCP evidence gap is narrower now: + +- a desktop-specific transcript such as Claude Desktop is still useful follow-up evidence +- but the project is no longer blocked on "no real MCP client validation at all" diff --git a/docs/release-checklist.md b/docs/release-checklist.md index a2bf928..37bd86e 100644 --- a/docs/release-checklist.md +++ b/docs/release-checklist.md @@ -86,8 +86,9 @@ Validate the interfaces that are part of the release scope. - `mc mcp-serve` starts cleanly - documented MCP behavior matches the currently supported scope -- if the release includes only read-only MCP, confirm mutating tools are still blocked there -- if the release includes write-capable MCP in the future, confirm approval-mediated mutation flow from a real MCP client +- confirm the stdio server still completes the standard JSON-RPC MCP handshake +- confirm approval-mediated mutation flow from a real MCP client through `python3 scripts/validate_mcp_client.py` +- if a release claims desktop-client friendliness beyond Inspector CLI, capture that client-specific transcript too ### Optional planner layer diff --git a/docs/roadmap.md b/docs/roadmap.md index 5732fe4..7d84c1f 100644 --- a/docs/roadmap.md +++ b/docs/roadmap.md @@ -9,6 +9,8 @@ Snapshot date: 2026-03-23 - runtime-first and MCP-first product direction - runtime already supports bounded inspection, controlled service/config actions, auditability, and validation workflows - a first operator-configurable policy slice now exists through versioned TOML +- standard-client MCP validation now exists through the official Inspector CLI +- approval concurrency now deduplicates active envelopes and blocks duplicate in-flight execution - current priority is to mature the runtime and MCP path into a trustworthy operational interface ## Phase 1: Controlled MCP Write Path @@ -83,10 +85,10 @@ Exit criteria: ## Near-Term Execution Order -1. add runtime integration coverage and harnesses -2. validate the MCP contract through real-client and stdio transcript flows -3. harden concurrency and schema governance -4. continue core/interface cleanup +1. define tool-schema compatibility rules and release policy +2. continue core/interface cleanup +3. add container-backed service/config integration harnesses +4. broaden client and host validation evidence 5. expand tool domains only after the previous items are stable ## Out Of Scope For This Track diff --git a/docs/runtime-integration-testing.md b/docs/runtime-integration-testing.md index 6041027..8383120 100644 --- a/docs/runtime-integration-testing.md +++ b/docs/runtime-integration-testing.md @@ -20,6 +20,7 @@ The goal is to validate the real runtime boundaries: 3. MCP stdio subprocess contract coverage in `tests/test_mcp_stdio_integration.py` 4. Operator bootstrap validation via `python3 scripts/validate_operator_bootstrap.py` 5. Host-profile validation via `mc validate-host-profile` +6. Real MCP client validation via `python3 scripts/validate_mcp_client.py` ## Local Commands @@ -57,15 +58,23 @@ python3 scripts/validate_operator_bootstrap.py \ --python python3 ``` +Run the real-client MCP contract check: + +```bash +python3 scripts/validate_mcp_client.py \ + --output-dir /tmp/mc-client-validation +``` + ## What These Tests Prove Today - operator policy can disable tools, require confirmation, constrain service targets, and redefine managed config targets - invalid policy fails closed and is surfaced through `mc doctor` -- `mc mcp-serve` works as a real stdio subprocess for `initialize`, `tools/list`, `tools/call`, and `approvals/*` +- `mc mcp-serve` works as a real stdio subprocess for both the legacy approval API and the standard JSON-RPC MCP handshake - approval-mediated config mutation works through the real MCP server process, not just through in-process unit helpers +- the official MCP Inspector CLI can complete `tools/list`, read-only execution, pending approval, `approval_get`, and `approval_approve` ## Known Gaps - no container-backed integration harness yet for repeatable `systemd` service scenarios -- no external desktop MCP client transcript is checked in yet +- no desktop-specific GUI transcript is checked in yet - real-host smoke validation remains necessary for host-specific paths that containers do not model well diff --git a/docs/runtime-mcp-maturation-plan.md b/docs/runtime-mcp-maturation-plan.md index 86825b9..734d08d 100644 --- a/docs/runtime-mcp-maturation-plan.md +++ b/docs/runtime-mcp-maturation-plan.md @@ -26,11 +26,11 @@ As of this snapshot, MC already has: The current codebase also still has clear limits: -- the MCP write path now exists through persisted approvals, but real-client validation and broader contract hardening are still pending +- the MCP write path now exists through persisted approvals, and official Inspector CLI validation now proves the standard-client flow - the runtime still carries chat-oriented orchestration inside `core.runtime` - a first operator-configurable policy slice now exists through versioned TOML, but broader validation and operator guidance are still pending - the tool contract does not yet have explicit schema-version governance -- concurrency and multi-call behavior are only partially addressed through SQLite WAL and bounded subprocesses, not through a complete runtime concurrency model +- concurrency and multi-call behavior now include approval-envelope deduplication and in-flight claim protection, but not a complete runtime concurrency model - real-host validation exists, but deeper integration coverage for runtime mutation paths and MCP write flows is not yet the main center of the test strategy ## Product Goal @@ -127,6 +127,8 @@ Current progress: - persisted tool approvals are now part of the runtime state model - the experimental MCP bridge now exposes controlled write requests plus `approvals/list|get|approve|reject` - approval lifecycle coverage exists at unit and runtime-contract level +- the stdio server now closes the standard JSON-RPC MCP handshake expected by real clients +- the official Inspector CLI has now completed the approval-mediated mutation flow against `mc mcp-serve` Required outcomes: @@ -149,6 +151,7 @@ Recommended approval contract: - `approvals/get` - `approvals/approve` - `approvals/reject` +- expose the same approval lifecycle through standard MCP tools for interoperable clients that cannot call custom top-level methods directly - bind each pending action to an approval id plus a normalized action envelope - include operator-facing evidence in the approval payload: - requested tool @@ -166,7 +169,8 @@ Implementation rule: Interoperability target: - `mc mcp-serve` can be used from a standard MCP client -- a client such as Claude Desktop can inspect the host and complete an approval-mediated mutation flow without unrestricted shell access +- a standard client such as the official Inspector CLI can inspect the host and complete an approval-mediated mutation flow without unrestricted shell access +- a desktop-client transcript such as Claude Desktop remains a useful follow-up, not the first blocker ### Workstream 1.3: Operator-configurable policy model diff --git a/docs/status.md b/docs/status.md index 024122c..9e3613b 100644 --- a/docs/status.md +++ b/docs/status.md @@ -78,6 +78,8 @@ Chat and planner providers remain optional layers on top of the same runtime. ### Interfaces - experimental MCP stdio bridge with approval-mediated write flow on top of the runtime +- standard JSON-RPC-compatible MCP stdio handshake for real MCP clients +- MCP approval tools exposed through the standard `tools/list` / `tools/call` surface - CLI commands for doctor, tools, audit, sessions, observations, recommendations, direct tool execution, and chat - CLI-integrated `validate-host-profile` command backed by reusable host-validation code - optional `systemd` timer installation for bounded recommendation reconciliation @@ -96,10 +98,12 @@ Chat and planner providers remain optional layers on top of the same runtime. - MC is already useful as a bounded runtime for Linux inspection and controlled actions - MCP is the main external interface direction, and the current experimental slice already supports approval-mediated write operations +- the official MCP Inspector CLI now validates that a real client can complete the approval-mediated mutation flow - CLI is still the most complete operator surface today - chat/provider paths are optional and should not define the product center - a first operator-configurable policy slice is landed through versioned TOML, but broader validation and operator evidence are still ahead -- concurrency and tool-schema governance work are still ahead of the current baseline +- approval concurrency is now hardened against duplicate active requests and duplicate in-flight execution for the same action envelope +- tool-schema governance and broader runtime ownership cleanup are still ahead of the current baseline ## Active Focus @@ -107,10 +111,10 @@ The current execution focus is defined by `docs/runtime-mcp-maturation-plan.md`. The next maturity steps are: -1. stronger runtime integration coverage for read and write flows -2. real-client MCP validation and contract hardening on top of the new approval flow -3. concurrency hardening and state-integrity guarantees -4. tool-schema compatibility rules and release policy +1. tool-schema compatibility rules and release policy +2. narrower runtime ownership seams, especially around `core.runtime` and `session_store` +3. container-backed integration harnesses for repeatable service/config scenarios +4. broader client evidence beyond the Inspector CLI transcript 5. broader tool expansion only after the runtime contract is stable ## Intentionally Out Of Scope Right Now @@ -132,6 +136,7 @@ At this snapshot, the project is validated by: - `PYTHONPATH=src python3 -m pytest -q` - explicit runtime/MCP integration coverage in `tests/test_runtime_policy_integration.py` and `tests/test_mcp_stdio_integration.py` - `python3 -m compileall src` +- real-client MCP validation through `python3 scripts/validate_mcp_client.py` - manual CLI smoke checks for chat, recommendations, recommendation-triggered actions, and `reconcile-timer render|install|remove` - manual CLI smoke checks for managed config write with validation and backup - manual CLI smoke checks for `process_to_unit` and `failed_services` @@ -152,6 +157,7 @@ At this snapshot, the project is validated by: - `docs/policy.md`: operator policy guide - `docs/operator-workflows.md`: bounded operator journeys - `docs/runtime-integration-testing.md`: runtime and MCP validation guide +- `docs/mcp-client-validation.md`: real MCP client validation guide - `docs/host-profile-validation.md`: validation harness guide ## Evidence Records diff --git a/docs/vps-validation-report.md b/docs/vps-validation-report.md index 1aa3c15..ff35f6e 100644 --- a/docs/vps-validation-report.md +++ b/docs/vps-validation-report.md @@ -1,10 +1,10 @@ # VPS Validation Report -Snapshot date: 2026-03-20 +Snapshot date: 2026-03-23 ## Purpose -This document records the first controlled VPS validation pass for Master Control as a private proving ground. +This document records the current controlled VPS validation pass for Master Control as a private proving ground. It is not a production-readiness claim. It is real-host evidence intended to reduce release guesswork. @@ -20,7 +20,7 @@ It is real-host evidence intended to reduce release guesswork. ## Operator Path Results -Validated successfully after host preparation: +Validated successfully on the Debian 13 VPS lab: - `./install.sh --provider heuristic` - `~/.local/bin/mc doctor` @@ -87,23 +87,25 @@ Observed result: Validated successfully in a dedicated remote virtual environment: -- `python -m ruff check .` -- `python -m mypy src` -- `PYTHONPATH=src python -m unittest discover -s tests` -- `PYTHONPATH=src python -m pytest -q` +- `python3 -m ruff check .` +- `python3 -m mypy src` +- `PYTHONPATH=src python3 -m unittest discover -s tests` +- `PYTHONPATH=src python3 -m pytest -q tests --ignore tests/test_runtime_policy_integration.py --ignore tests/test_mcp_stdio_integration.py` +- `PYTHONPATH=src python3 -m pytest -q tests/test_runtime_policy_integration.py tests/test_mcp_stdio_integration.py` +- `python3 -m compileall src` +- `PYTHONPATH=src python3 -m master_control --json doctor` +- `python3 -m pip wheel . --no-deps -w ` Observed result: - `ruff`: passed - `mypy`: passed -- `unittest`: `171` tests passed -- `pytest`: `171` tests passed - -Note: - -- later SSH reachability was transient on this VPS during additional reruns -- because of that transient host issue, this report does not claim a complete second rerun of every maintainer command on the VPS in the same session -- the operator-path `doctor` proof and the bootstrap harness proof were both still captured successfully +- `unittest`: passed +- `pytest` unit slice: passed +- `pytest` runtime/MCP integration slice: passed +- `compileall`: passed +- `doctor`: passed +- `wheel`: passed ## Artifact Handling diff --git a/scripts/validate_mcp_client.py b/scripts/validate_mcp_client.py new file mode 100644 index 0000000..139a8a8 --- /dev/null +++ b/scripts/validate_mcp_client.py @@ -0,0 +1,378 @@ +#!/usr/bin/env python3 +from __future__ import annotations + +import argparse +import json +import os +import socket +import subprocess +import sys +from dataclasses import dataclass +from datetime import UTC, datetime +from pathlib import Path + +REPO_ROOT = Path(__file__).resolve().parents[1] + + +@dataclass(frozen=True, slots=True) +class StepResult: + name: str + command: tuple[str, ...] + exit_code: int + stdout: str + stderr: str + parsed: dict[str, object] | None + + @property + def ok(self) -> bool: + return self.exit_code == 0 + + +def build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser( + description=( + "Validate mc mcp-serve against the official MCP Inspector CLI and write a report." + ), + ) + parser.add_argument( + "--output-dir", + default=str(Path.cwd() / "artifacts" / "mcp-client-validation"), + help="Directory where the report and command logs will be written.", + ) + parser.add_argument( + "--python", + default=sys.executable, + help="Python interpreter used to start mc mcp-serve. Default: current interpreter.", + ) + parser.add_argument( + "--npx", + default="npx", + help="NPX binary used to run the official MCP Inspector CLI. Default: npx.", + ) + parser.add_argument( + "--inspector-package", + default="@modelcontextprotocol/inspector", + help="Inspector package used for CLI validation. Default: @modelcontextprotocol/inspector.", + ) + parser.add_argument( + "--json", + action="store_true", + help="Print the generated report as JSON instead of only the report path.", + ) + return parser + + +def main(argv: list[str] | None = None) -> int: + args = build_parser().parse_args(argv) + report = run_validation( + output_dir=Path(args.output_dir), + python_bin=args.python, + npx_bin=args.npx, + inspector_package=args.inspector_package, + ) + if args.json: + print(json.dumps(report, indent=2, sort_keys=True)) + else: + print(report["report_path"]) + return 0 if report["overall_ok"] else 1 + + +def run_validation( + *, + output_dir: Path, + python_bin: str, + npx_bin: str, + inspector_package: str, +) -> dict[str, object]: + run_id = _utc_now().strftime("%Y%m%dT%H%M%SZ") + host_slug = _slugify(socket.gethostname()) + run_dir = output_dir.expanduser().resolve() / f"{run_id}-{host_slug}" + state_dir = run_dir / "state" + managed_root = state_dir / "managed-configs" + log_dir = run_dir / "logs" + log_dir.mkdir(parents=True, exist_ok=True) + managed_root.mkdir(parents=True, exist_ok=True) + + config_path = managed_root / "demo.ini" + config_path.write_text("[main]\nkey=old\n", encoding="utf-8") + + base_env = { + "MC_STATE_DIR": str(state_dir), + "MC_DB_PATH": str(state_dir / "mc.sqlite3"), + "MC_PROVIDER": "heuristic", + "PYTHONPATH": str(REPO_ROOT / "src"), + } + + inspector_base = [ + npx_bin, + "-y", + inspector_package, + "--cli", + "--transport", + "stdio", + ] + for key, value in base_env.items(): + inspector_base.extend(["-e", f"{key}={value}"]) + inspector_base.extend(["--", python_bin, "-m", "master_control", "mcp-serve"]) + + steps: list[StepResult] = [] + steps.append( + _run_step( + name="tools_list", + command=[*inspector_base, "--method", "tools/list"], + cwd=REPO_ROOT, + log_dir=log_dir, + ) + ) + steps.append( + _run_step( + name="system_info", + command=[ + *inspector_base, + "--method", + "tools/call", + "--tool-name", + "system_info", + ], + cwd=REPO_ROOT, + log_dir=log_dir, + ) + ) + pending = _run_step( + name="write_config_pending", + command=[ + *inspector_base, + "--method", + "tools/call", + "--tool-name", + "write_config_file", + "--tool-arg", + f"path={config_path}", + "--tool-arg", + 'content="[main]\\nkey=new\\n"', + ], + cwd=REPO_ROOT, + log_dir=log_dir, + ) + steps.append(pending) + + approval_id = _extract_approval_id(pending.parsed) + if approval_id is None: + report = _build_report( + run_dir=run_dir, + state_dir=state_dir, + config_path=config_path, + inspector_package=inspector_package, + python_bin=python_bin, + steps=steps, + notes=["Could not extract approval id from the pending write response."], + ) + _write_report(report, run_dir) + return report + + steps.append( + _run_step( + name="approval_get", + command=[ + *inspector_base, + "--method", + "tools/call", + "--tool-name", + "approval_get", + "--tool-arg", + f"id={approval_id}", + ], + cwd=REPO_ROOT, + log_dir=log_dir, + ) + ) + steps.append( + _run_step( + name="approval_approve", + command=[ + *inspector_base, + "--method", + "tools/call", + "--tool-name", + "approval_approve", + "--tool-arg", + f"id={approval_id}", + ], + cwd=REPO_ROOT, + log_dir=log_dir, + ) + ) + + report = _build_report( + run_dir=run_dir, + state_dir=state_dir, + config_path=config_path, + inspector_package=inspector_package, + python_bin=python_bin, + steps=steps, + notes=[], + ) + _write_report(report, run_dir) + return report + + +def _run_step( + *, + name: str, + command: list[str], + cwd: Path, + log_dir: Path, +) -> StepResult: + completed = subprocess.run( + command, + cwd=cwd, + capture_output=True, + text=True, + env=os.environ.copy(), + ) + stdout_path = log_dir / f"{name}.stdout.log" + stderr_path = log_dir / f"{name}.stderr.log" + stdout_path.write_text(completed.stdout, encoding="utf-8") + stderr_path.write_text(completed.stderr, encoding="utf-8") + + parsed = _parse_json_output(completed.stdout) + return StepResult( + name=name, + command=tuple(command), + exit_code=completed.returncode, + stdout=completed.stdout, + stderr=completed.stderr, + parsed=parsed, + ) + + +def _parse_json_output(stdout: str) -> dict[str, object] | None: + text = stdout.strip() + if not text: + return None + try: + payload = json.loads(text) + except json.JSONDecodeError: + return None + if isinstance(payload, dict): + return payload + return None + + +def _extract_approval_id(payload: dict[str, object] | None) -> int | None: + if not isinstance(payload, dict): + return None + structured = payload.get("structuredContent") + if not isinstance(structured, dict): + return None + approval = structured.get("approval") + if not isinstance(approval, dict): + return None + approval_id = approval.get("id") + if isinstance(approval_id, int) and not isinstance(approval_id, bool): + return approval_id + return None + + +def _build_report( + *, + run_dir: Path, + state_dir: Path, + config_path: Path, + inspector_package: str, + python_bin: str, + steps: list[StepResult], + notes: list[str], +) -> dict[str, object]: + step_reports = [ + { + "name": step.name, + "command": list(step.command), + "exit_code": step.exit_code, + "ok": step.ok, + "parsed": step.parsed, + "stdout_log": str(run_dir / "logs" / f"{step.name}.stdout.log"), + "stderr_log": str(run_dir / "logs" / f"{step.name}.stderr.log"), + } + for step in steps + ] + + overall_ok = _evaluate_report(steps, config_path) + return { + "generated_at": _utc_now().isoformat(), + "repo_root": str(REPO_ROOT), + "run_dir": str(run_dir), + "state_dir": str(state_dir), + "config_path": str(config_path), + "inspector_package": inspector_package, + "python_bin": python_bin, + "steps": step_reports, + "final_config_content": config_path.read_text(encoding="utf-8"), + "overall_ok": overall_ok, + "notes": notes, + "report_path": str(run_dir / "report.json"), + } + + +def _evaluate_report(steps: list[StepResult], config_path: Path) -> bool: + if not all(step.ok and step.parsed is not None for step in steps): + return False + + tools_list = steps[0].parsed + system_info = steps[1].parsed + pending = steps[2].parsed + approval_get = steps[3].parsed if len(steps) > 3 else None + approval_approve = steps[4].parsed if len(steps) > 4 else None + + if not isinstance(tools_list, dict) or not isinstance(system_info, dict): + return False + if not isinstance(pending, dict) or not isinstance(approval_get, dict): + return False + if not isinstance(approval_approve, dict): + return False + + tools = tools_list.get("tools") + if not isinstance(tools, list): + return False + tool_names = [item.get("name") for item in tools if isinstance(item, dict)] + required_tools = {"system_info", "write_config_file", "approval_get", "approval_approve"} + if not required_tools.issubset(set(name for name in tool_names if isinstance(name, str))): + return False + + pending_structured = pending.get("structuredContent") + if not isinstance(pending_structured, dict) or not pending_structured.get("pending_confirmation"): + return False + + fetched_structured = approval_get.get("structuredContent") + if not isinstance(fetched_structured, dict) or fetched_structured.get("status") != "pending": + return False + + approved_structured = approval_approve.get("structuredContent") + if not isinstance(approved_structured, dict): + return False + approval = approved_structured.get("approval") + execution = approved_structured.get("execution") + if not isinstance(approval, dict) or approval.get("status") != "completed": + return False + if not isinstance(execution, dict) or not bool(execution.get("ok")): + return False + if config_path.read_text(encoding="utf-8") != "[main]\nkey=new\n": + return False + return True + + +def _write_report(report: dict[str, object], run_dir: Path) -> None: + report_path = run_dir / "report.json" + report_path.write_text(json.dumps(report, indent=2, sort_keys=True) + "\n", encoding="utf-8") + + +def _utc_now() -> datetime: + return datetime.now(UTC) + + +def _slugify(value: str) -> str: + return "".join(char if char.isalnum() else "-" for char in value.lower()).strip("-") + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/src/master_control/core/runtime.py b/src/master_control/core/runtime.py index 319e301..acd083a 100644 --- a/src/master_control/core/runtime.py +++ b/src/master_control/core/runtime.py @@ -675,14 +675,17 @@ def run_tool( self.store.record_audit_event("tool_execution", payload) return payload + matching_approval: dict[str, object] | None = None claimed_approval: dict[str, object] | None = None if confirmed and not _has_explicit_approval_id(context_payload): - claimed_approval = self.store.claim_latest_matching_tool_approval( + claim_result = self.store.prepare_matching_tool_approval_for_execution( tool_name=tool.spec.name, arguments=argument_payload, audit_context=context_payload, ) - if claimed_approval is not None: + matching_approval = claim_result.approval + if claim_result.outcome == "claimed" and matching_approval is not None: + claimed_approval = matching_approval context_payload["approval_id"] = claimed_approval["id"] decision = self.policy.evaluate(tool.spec, argument_payload) @@ -708,6 +711,17 @@ def run_tool( self.store.record_audit_event("tool_execution", payload) return payload + if confirmed and matching_approval is not None and claimed_approval is None: + payload = { + "ok": False, + **audit_base, + "approval_in_progress": True, + "approval": self._format_tool_approval(matching_approval), + "error": "Matching approval is already executing.", + } + self.store.record_audit_event("tool_execution", payload) + return payload + if decision.needs_confirmation and not confirmed: approval = self._create_tool_approval( tool_name=tool.spec.name, @@ -715,12 +729,19 @@ def run_tool( arguments=argument_payload, context_payload=context_payload, ) + approval_status = str(approval["status"]) + approval_required = approval_status == "pending" payload = { "ok": False, **audit_base, - "pending_confirmation": True, + "pending_confirmation": approval_required, + "approval_in_progress": approval_status == "executing", "approval": approval, - "error": "Tool requires explicit confirmation before execution.", + "error": ( + "Tool requires explicit confirmation before execution." + if approval_required + else "Matching approval is already executing." + ), } self.store.record_audit_event("tool_execution", payload) return payload diff --git a/src/master_control/interfaces/mcp/server.py b/src/master_control/interfaces/mcp/server.py index 126e175..c818c67 100644 --- a/src/master_control/interfaces/mcp/server.py +++ b/src/master_control/interfaces/mcp/server.py @@ -6,11 +6,13 @@ from dataclasses import dataclass from typing import TextIO +from master_control import __version__ from master_control.config import Settings from master_control.core.runtime import MasterControlRuntime from master_control.logging_utils import configure_logging +from master_control.tools.base import RiskLevel, ToolSpec -SUPPORTED_METHODS = frozenset( +LEGACY_SUPPORTED_METHODS = frozenset( { "initialize", "ping", @@ -24,9 +26,34 @@ } ) +JSONRPC_SUPPORTED_REQUEST_METHODS = frozenset( + { + "initialize", + "ping", + "doctor", + "tools/list", + "tools/call", + "approvals/list", + "approvals/get", + "approvals/approve", + "approvals/reject", + } +) + +JSONRPC_VERSION = "2.0" +JSONRPC_INVALID_REQUEST = -32600 +JSONRPC_METHOD_NOT_FOUND = -32601 +JSONRPC_INVALID_PARAMS = -32602 +JSONRPC_INTERNAL_ERROR = -32603 + +APPROVAL_LIST_TOOL = "approval_list" +APPROVAL_GET_TOOL = "approval_get" +APPROVAL_APPROVE_TOOL = "approval_approve" +APPROVAL_REJECT_TOOL = "approval_reject" + @dataclass(frozen=True, slots=True) -class MCPError: +class LegacyMCPError: code: str message: str @@ -37,8 +64,20 @@ def as_dict(self) -> dict[str, str]: } +@dataclass(frozen=True, slots=True) +class JSONRPCError: + code: int + message: str + + def as_dict(self) -> dict[str, object]: + return { + "code": self.code, + "message": self.message, + } + + class MasterControlMCPServer: - """Experimental stdio MCP interface with approval-mediated write operations.""" + """MCP stdio interface with approval-mediated write operations.""" def __init__(self, runtime: MasterControlRuntime) -> None: self.runtime = runtime @@ -56,49 +95,65 @@ def run( if not line: continue response = self._handle_line(line) + if response is None: + continue stdout.write(json.dumps(response, sort_keys=True) + "\n") stdout.flush() - def _handle_line(self, line: str) -> dict[str, object]: + def _handle_line(self, line: str) -> dict[str, object] | None: try: payload = json.loads(line) except json.JSONDecodeError as exc: - return self._error_response( + return self._legacy_error_response( request_id=None, - error=MCPError("invalid_json", f"Invalid JSON request: {exc.msg}"), + error=LegacyMCPError("invalid_json", f"Invalid JSON request: {exc.msg}"), ) if not isinstance(payload, dict): - return self._error_response( + return self._legacy_error_response( request_id=None, - error=MCPError("invalid_request", "Requests must be JSON objects."), + error=LegacyMCPError("invalid_request", "Requests must be JSON objects."), ) + jsonrpc_version = payload.get("jsonrpc") + if jsonrpc_version is None: + return self._handle_legacy_request(payload) + if jsonrpc_version != JSONRPC_VERSION: + return self._jsonrpc_error_response( + request_id=payload.get("id"), + error=JSONRPCError( + JSONRPC_INVALID_REQUEST, + f"Unsupported JSON-RPC version: {jsonrpc_version}", + ), + ) + return self._handle_jsonrpc_request(payload) + + def _handle_legacy_request(self, payload: dict[str, object]) -> dict[str, object]: request_id = payload.get("id") method = payload.get("method") params = payload.get("params") if not isinstance(method, str) or not method: - return self._error_response( + return self._legacy_error_response( request_id=request_id, - error=MCPError("invalid_request", "Request is missing a string method."), + error=LegacyMCPError("invalid_request", "Request is missing a string method."), ) - if method not in SUPPORTED_METHODS: - return self._error_response( + if method not in LEGACY_SUPPORTED_METHODS: + return self._legacy_error_response( request_id=request_id, - error=MCPError("unsupported_method", f"Unsupported method: {method}"), + error=LegacyMCPError("unsupported_method", f"Unsupported method: {method}"), ) try: - result = self._dispatch(method, params) + result = self._dispatch_legacy(method, params) except ValueError as exc: - return self._error_response( + return self._legacy_error_response( request_id=request_id, - error=MCPError("invalid_params", str(exc)), + error=LegacyMCPError("invalid_params", str(exc)), ) except Exception as exc: # pragma: no cover - return self._error_response( + return self._legacy_error_response( request_id=request_id, - error=MCPError("runtime_error", str(exc)), + error=LegacyMCPError("runtime_error", str(exc)), ) return { @@ -107,7 +162,48 @@ def _handle_line(self, line: str) -> dict[str, object]: "result": result, } - def _dispatch(self, method: str, params: object) -> dict[str, object]: + def _handle_jsonrpc_request(self, payload: dict[str, object]) -> dict[str, object] | None: + request_id = payload.get("id") + method = payload.get("method") + params = payload.get("params") + if not isinstance(method, str) or not method: + return self._jsonrpc_error_response( + request_id=request_id, + error=JSONRPCError(JSONRPC_INVALID_REQUEST, "Request is missing a string method."), + ) + if request_id is None: + if method == "notifications/initialized": + return None + return None + if method not in JSONRPC_SUPPORTED_REQUEST_METHODS: + return self._jsonrpc_error_response( + request_id=request_id, + error=JSONRPCError( + JSONRPC_METHOD_NOT_FOUND, + f"Unsupported method: {method}", + ), + ) + + try: + result = self._dispatch_jsonrpc(method, params) + except ValueError as exc: + return self._jsonrpc_error_response( + request_id=request_id, + error=JSONRPCError(JSONRPC_INVALID_PARAMS, str(exc)), + ) + except Exception as exc: # pragma: no cover + return self._jsonrpc_error_response( + request_id=request_id, + error=JSONRPCError(JSONRPC_INTERNAL_ERROR, str(exc)), + ) + + return { + "jsonrpc": JSONRPC_VERSION, + "id": request_id, + "result": result, + } + + def _dispatch_legacy(self, method: str, params: object) -> dict[str, object]: if method == "initialize": return { "server": { @@ -118,7 +214,7 @@ def _dispatch(self, method: str, params: object) -> dict[str, object]: "capabilities": { "tools": { "mode": "approval_controlled", - "count": len(self._list_exposed_tools()), + "count": len(self._list_exposed_tools(standard_mcp=False)), }, "approvals": { "mode": "explicit", @@ -131,7 +227,7 @@ def _dispatch(self, method: str, params: object) -> dict[str, object]: if method == "doctor": return self.runtime.doctor() if method == "tools/list": - return {"tools": self._list_exposed_tools()} + return {"tools": self._list_exposed_tools(standard_mcp=False)} if method == "tools/call": arguments = params if isinstance(params, dict) else {} tool_name = arguments.get("name") @@ -140,48 +236,344 @@ def _dispatch(self, method: str, params: object) -> dict[str, object]: raise ValueError("tools/call requires params.name.") if not isinstance(tool_arguments, dict): raise ValueError("tools/call params.arguments must be an object.") - return self.runtime.run_tool( - tool_name, - tool_arguments, - audit_context={"source": "mcp_stdio"}, - ) + return self._call_legacy_tool(tool_name, tool_arguments) if method == "approvals/list": - arguments = params if isinstance(params, dict) else {} - status = arguments.get("status") - limit = arguments.get("limit", 100) - if status is not None and not isinstance(status, str): - raise ValueError("approvals/list params.status must be a string when provided.") - if not isinstance(limit, int) or isinstance(limit, bool) or limit <= 0: - raise ValueError("approvals/list params.limit must be a positive integer.") + status, limit = self._coerce_approval_list_params(params) return self.runtime.list_tool_approvals(status=status, limit=limit) if method == "approvals/get": - arguments = params if isinstance(params, dict) else {} - approval_id = arguments.get("id") - if not isinstance(approval_id, int) or isinstance(approval_id, bool): - raise ValueError("approvals/get requires params.id as an integer.") + approval_id = self._coerce_approval_id(params, method="approvals/get") return self.runtime.get_tool_approval(approval_id) if method == "approvals/approve": - arguments = params if isinstance(params, dict) else {} - approval_id = arguments.get("id") - if not isinstance(approval_id, int) or isinstance(approval_id, bool): - raise ValueError("approvals/approve requires params.id as an integer.") + approval_id = self._coerce_approval_id(params, method="approvals/approve") return self.runtime.approve_tool_approval(approval_id) if method == "approvals/reject": + approval_id = self._coerce_approval_id(params, method="approvals/reject") + return self.runtime.reject_tool_approval(approval_id) + raise ValueError(f"Unsupported method: {method}") + + def _dispatch_jsonrpc(self, method: str, params: object) -> dict[str, object]: + if method == "initialize": + arguments = params if isinstance(params, dict) else {} + protocol_version = arguments.get("protocolVersion") + if not isinstance(protocol_version, str) or not protocol_version: + raise ValueError("initialize requires params.protocolVersion.") + return { + "protocolVersion": protocol_version, + "capabilities": { + "tools": {}, + }, + "serverInfo": { + "name": "master-control", + "version": __version__, + }, + "instructions": ( + "Master Control exposes bounded Linux host tools and approval tools over MCP. " + "Mutating host tools return pending approval payloads; use approval_list, " + "approval_get, approval_approve, and approval_reject to inspect and resolve them." + ), + } + if method == "ping": + return {} + if method == "doctor": + return self.runtime.doctor() + if method == "tools/list": + return {"tools": self._list_exposed_tools(standard_mcp=True)} + if method == "tools/call": arguments = params if isinstance(params, dict) else {} - approval_id = arguments.get("id") - if not isinstance(approval_id, int) or isinstance(approval_id, bool): - raise ValueError("approvals/reject requires params.id as an integer.") + tool_name = arguments.get("name") + tool_arguments = arguments.get("arguments", {}) + if not isinstance(tool_name, str) or not tool_name: + raise ValueError("tools/call requires params.name.") + if not isinstance(tool_arguments, dict): + raise ValueError("tools/call params.arguments must be an object.") + return self._call_standard_tool(tool_name, tool_arguments) + if method == "approvals/list": + status, limit = self._coerce_approval_list_params(params) + return self.runtime.list_tool_approvals(status=status, limit=limit) + if method == "approvals/get": + approval_id = self._coerce_approval_id(params, method="approvals/get") + return self.runtime.get_tool_approval(approval_id) + if method == "approvals/approve": + approval_id = self._coerce_approval_id(params, method="approvals/approve") + return self.runtime.approve_tool_approval(approval_id) + if method == "approvals/reject": + approval_id = self._coerce_approval_id(params, method="approvals/reject") return self.runtime.reject_tool_approval(approval_id) raise ValueError(f"Unsupported method: {method}") - def _list_exposed_tools(self) -> list[dict[str, object]]: - return [spec.as_dict() for spec in self.runtime.list_tools()] + def _call_legacy_tool( + self, + tool_name: str, + arguments: dict[str, object], + ) -> dict[str, object]: + if tool_name == APPROVAL_LIST_TOOL: + status, limit = self._coerce_approval_list_tool_arguments(arguments) + return self.runtime.list_tool_approvals(status=status, limit=limit) + if tool_name == APPROVAL_GET_TOOL: + approval_id = self._coerce_approval_tool_id(arguments, tool_name=tool_name) + return self.runtime.get_tool_approval(approval_id) + if tool_name == APPROVAL_APPROVE_TOOL: + approval_id = self._coerce_approval_tool_id(arguments, tool_name=tool_name) + return self.runtime.approve_tool_approval(approval_id) + if tool_name == APPROVAL_REJECT_TOOL: + approval_id = self._coerce_approval_tool_id(arguments, tool_name=tool_name) + return self.runtime.reject_tool_approval(approval_id) + return self.runtime.run_tool( + tool_name, + arguments, + audit_context={"source": "mcp_stdio"}, + ) - def _error_response( + def _call_standard_tool( + self, + tool_name: str, + arguments: dict[str, object], + ) -> dict[str, object]: + if tool_name == APPROVAL_LIST_TOOL: + status, limit = self._coerce_approval_list_tool_arguments(arguments) + payload = self.runtime.list_tool_approvals(status=status, limit=limit) + return self._build_standard_tool_result(payload, is_error=False) + if tool_name == APPROVAL_GET_TOOL: + approval_id = self._coerce_approval_tool_id(arguments, tool_name=tool_name) + payload = self.runtime.get_tool_approval(approval_id) + return self._build_standard_tool_result(payload, is_error=False) + if tool_name == APPROVAL_APPROVE_TOOL: + approval_id = self._coerce_approval_tool_id(arguments, tool_name=tool_name) + payload = self.runtime.approve_tool_approval(approval_id) + return self._build_standard_tool_result(payload, is_error=False) + if tool_name == APPROVAL_REJECT_TOOL: + approval_id = self._coerce_approval_tool_id(arguments, tool_name=tool_name) + payload = self.runtime.reject_tool_approval(approval_id) + return self._build_standard_tool_result(payload, is_error=False) + + payload = self.runtime.run_tool( + tool_name, + arguments, + audit_context={"source": "mcp_stdio"}, + ) + is_error = bool(payload.get("ok")) is False and not bool(payload.get("pending_confirmation")) + return self._build_standard_tool_result(payload, is_error=is_error) + + def _list_exposed_tools(self, *, standard_mcp: bool) -> list[dict[str, object]]: + host_specs = self.runtime.list_tools() + if standard_mcp: + return [ + *[self._host_tool_to_standard_mcp(spec) for spec in host_specs], + *self._approval_tools_standard_mcp(), + ] + return [ + *[spec.as_dict() for spec in host_specs], + *self._approval_tools_legacy(), + ] + + def _host_tool_to_standard_mcp(self, spec: ToolSpec) -> dict[str, object]: + properties: dict[str, object] = {argument: {} for argument in spec.arguments} + annotations = { + "readOnlyHint": spec.risk == RiskLevel.READ_ONLY, + "idempotentHint": spec.risk == RiskLevel.READ_ONLY, + "openWorldHint": False, + } + return { + "name": spec.name, + "description": spec.description, + "inputSchema": { + "type": "object", + "properties": properties, + }, + "annotations": annotations, + "_meta": { + "master-control/risk": spec.risk.value, + }, + } + + def _approval_tools_legacy(self) -> list[dict[str, object]]: + return [ + { + "name": APPROVAL_LIST_TOOL, + "description": "List active or historical approvals.", + "risk": RiskLevel.READ_ONLY.value, + "arguments": ["status", "limit"], + }, + { + "name": APPROVAL_GET_TOOL, + "description": "Fetch a specific approval by id.", + "risk": RiskLevel.READ_ONLY.value, + "arguments": ["id"], + }, + { + "name": APPROVAL_APPROVE_TOOL, + "description": "Approve and execute a pending approval by id.", + "risk": RiskLevel.MUTATING_SAFE.value, + "arguments": ["id"], + }, + { + "name": APPROVAL_REJECT_TOOL, + "description": "Reject a pending approval by id.", + "risk": RiskLevel.MUTATING_SAFE.value, + "arguments": ["id"], + }, + ] + + def _approval_tools_standard_mcp(self) -> list[dict[str, object]]: + return [ + self._approval_tool_definition( + name=APPROVAL_LIST_TOOL, + description="List active or historical approvals.", + properties={ + "status": {"type": "string"}, + "limit": {"type": "integer"}, + }, + read_only=True, + ), + self._approval_tool_definition( + name=APPROVAL_GET_TOOL, + description="Fetch a specific approval by id.", + properties={ + "id": {"type": "integer"}, + }, + required=("id",), + read_only=True, + ), + self._approval_tool_definition( + name=APPROVAL_APPROVE_TOOL, + description="Approve and execute a pending approval by id.", + properties={ + "id": {"type": "integer"}, + }, + required=("id",), + read_only=False, + ), + self._approval_tool_definition( + name=APPROVAL_REJECT_TOOL, + description="Reject a pending approval by id.", + properties={ + "id": {"type": "integer"}, + }, + required=("id",), + read_only=False, + ), + ] + + def _approval_tool_definition( + self, + *, + name: str, + description: str, + properties: dict[str, object], + required: tuple[str, ...] = (), + read_only: bool, + ) -> dict[str, object]: + payload: dict[str, object] = { + "name": name, + "description": description, + "inputSchema": { + "type": "object", + "properties": properties, + }, + "annotations": { + "readOnlyHint": read_only, + "idempotentHint": read_only, + "openWorldHint": False, + }, + "_meta": { + "master-control/risk": ( + RiskLevel.READ_ONLY.value if read_only else RiskLevel.MUTATING_SAFE.value + ), + }, + } + if required: + payload["inputSchema"]["required"] = list(required) # type: ignore[index] + return payload + + def _build_standard_tool_result( + self, + payload: dict[str, object], + *, + is_error: bool, + ) -> dict[str, object]: + result: dict[str, object] = { + "content": [ + { + "type": "text", + "text": self._render_standard_tool_text(payload), + } + ], + "structuredContent": payload, + } + if is_error: + result["isError"] = True + return result + + def _render_standard_tool_text(self, payload: dict[str, object]) -> str: + approval = payload.get("approval") + if isinstance(approval, dict): + approval_id = approval.get("id") + approval_status = approval.get("status") + if payload.get("pending_confirmation"): + return f"Approval required. approval_id={approval_id}, status={approval_status}." + if payload.get("approval_in_progress"): + return f"Approval {approval_id} is already executing." + if payload.get("ok") is False: + error = payload.get("error") + if isinstance(error, str) and error: + return error + return "Tool execution failed." + result = payload.get("result") + if isinstance(result, dict): + try: + return json.dumps(result, sort_keys=True) + except TypeError: + return str(result) + try: + return json.dumps(payload, sort_keys=True) + except TypeError: + return str(payload) + + def _coerce_approval_list_params(self, params: object) -> tuple[str | None, int]: + arguments = params if isinstance(params, dict) else {} + status = arguments.get("status") + limit = arguments.get("limit", 100) + if status is not None and not isinstance(status, str): + raise ValueError("approvals/list params.status must be a string when provided.") + if not isinstance(limit, int) or isinstance(limit, bool) or limit <= 0: + raise ValueError("approvals/list params.limit must be a positive integer.") + return status, limit + + def _coerce_approval_id(self, params: object, *, method: str) -> int: + arguments = params if isinstance(params, dict) else {} + approval_id = arguments.get("id") + if not isinstance(approval_id, int) or isinstance(approval_id, bool): + raise ValueError(f"{method} requires params.id as an integer.") + return approval_id + + def _coerce_approval_list_tool_arguments( + self, + arguments: dict[str, object], + ) -> tuple[str | None, int]: + status = arguments.get("status") + limit = arguments.get("limit", 100) + if status is not None and not isinstance(status, str): + raise ValueError("approval_list requires 'status' to be a string when provided.") + if not isinstance(limit, int) or isinstance(limit, bool) or limit <= 0: + raise ValueError("approval_list requires 'limit' to be a positive integer.") + return status, limit + + def _coerce_approval_tool_id( + self, + arguments: dict[str, object], + *, + tool_name: str, + ) -> int: + approval_id = arguments.get("id") + if not isinstance(approval_id, int) or isinstance(approval_id, bool): + raise ValueError(f"{tool_name} requires 'id' as an integer.") + return approval_id + + def _legacy_error_response( self, *, request_id: object, - error: MCPError, + error: LegacyMCPError, ) -> dict[str, object]: return { "id": request_id, @@ -189,11 +581,23 @@ def _error_response( "error": error.as_dict(), } + def _jsonrpc_error_response( + self, + *, + request_id: object, + error: JSONRPCError, + ) -> dict[str, object]: + return { + "jsonrpc": JSONRPC_VERSION, + "id": request_id, + "error": error.as_dict(), + } + def build_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser( prog="mc-mcp", - description="Run the experimental Master Control MCP interface with approval flow.", + description="Run the Master Control MCP interface with approval flow.", ) return parser diff --git a/src/master_control/store/session_store.py b/src/master_control/store/session_store.py index c6778eb..92287df 100644 --- a/src/master_control/store/session_store.py +++ b/src/master_control/store/session_store.py @@ -1,8 +1,10 @@ from __future__ import annotations +import hashlib import json import sqlite3 from contextlib import closing +from dataclasses import dataclass from datetime import UTC, datetime from pathlib import Path @@ -85,6 +87,7 @@ risk TEXT NOT NULL, arguments_json TEXT NOT NULL, audit_context_json TEXT NOT NULL, + action_digest TEXT, summary TEXT NOT NULL, cli_command TEXT NOT NULL, chat_command TEXT NOT NULL, @@ -98,6 +101,12 @@ """ +@dataclass(frozen=True, slots=True) +class ToolApprovalExecutionMatch: + outcome: str + approval: dict[str, object] | None + + class SessionStore: def __init__(self, path: Path) -> None: self.path = path @@ -165,6 +174,14 @@ def _migrate_schema(self, connection: sqlite3.Connection) -> None: "session_id": "INTEGER", }, ) + self._ensure_columns( + connection, + "tool_approvals", + { + "action_digest": "TEXT", + }, + ) + self._backfill_tool_approval_digests(connection) def _ensure_indexes(self, connection: sqlite3.Connection) -> None: connection.execute( @@ -185,6 +202,12 @@ def _ensure_indexes(self, connection: sqlite3.Connection) -> None: ON tool_approvals(tool_name, status, id DESC) """ ) + connection.execute( + """ + CREATE INDEX IF NOT EXISTS idx_tool_approvals_action_digest + ON tool_approvals(action_digest, status, id DESC) + """ + ) def _ensure_columns( self, @@ -199,6 +222,38 @@ def _ensure_columns( continue connection.execute(f"ALTER TABLE {table_name} ADD COLUMN {column_name} {column_type}") + def _backfill_tool_approval_digests(self, connection: sqlite3.Connection) -> None: + cursor = connection.execute( + """ + SELECT id, tool_name, arguments_json, audit_context_json + FROM tool_approvals + WHERE action_digest IS NULL OR action_digest = '' + """ + ) + rows = cursor.fetchall() + for approval_id, tool_name, arguments_json, audit_context_json in rows: + if not isinstance(tool_name, str): + continue + if not isinstance(arguments_json, str): + arguments_json = "{}" + if not isinstance(audit_context_json, str): + audit_context_json = "{}" + connection.execute( + """ + UPDATE tool_approvals + SET action_digest = ? + WHERE id = ? + """, + ( + _build_tool_approval_digest( + tool_name=tool_name, + arguments_json=arguments_json, + audit_context_json=audit_context_json, + ), + approval_id, + ), + ) + def record_audit_event(self, event_type: str, payload: dict[str, object]) -> None: serialized_payload = json.dumps(payload, sort_keys=True) with closing(self._connect()) as connection: @@ -249,38 +304,51 @@ def create_tool_approval( cli_command: str, chat_command: str, ) -> dict[str, object]: + arguments_json, audit_context_json, action_digest = _serialize_tool_approval_identity( + tool_name=tool_name, + arguments=arguments, + audit_context=audit_context, + ) with closing(self._connect()) as connection: - cursor = connection.execute( - """ - INSERT INTO tool_approvals ( - tool_name, - risk, - arguments_json, - audit_context_json, - summary, - cli_command, - chat_command + connection.execute("BEGIN IMMEDIATE") + existing_row = self._select_active_tool_approval_row_by_digest(connection, action_digest) + if existing_row is None: + cursor = connection.execute( + """ + INSERT INTO tool_approvals ( + tool_name, + risk, + arguments_json, + audit_context_json, + action_digest, + summary, + cli_command, + chat_command + ) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + tool_name, + risk, + arguments_json, + audit_context_json, + action_digest, + summary, + cli_command, + chat_command, + ), ) - VALUES (?, ?, ?, ?, ?, ?, ?) - """, - ( - tool_name, - risk, - json.dumps(arguments, sort_keys=True), - json.dumps(audit_context, sort_keys=True), - summary, - cli_command, - chat_command, - ), - ) + approval_id = cursor.lastrowid + if approval_id is None: + connection.rollback() + raise RuntimeError("SQLite did not return a tool approval id.") + created_row = self._select_tool_approval_row(connection, int(approval_id)) + connection.commit() + if created_row is None: + raise RuntimeError(f"Tool approval {approval_id} disappeared after insert.") + return self._row_to_tool_approval(created_row) connection.commit() - approval_id = cursor.lastrowid - if approval_id is None: - raise RuntimeError("SQLite did not return a tool approval id.") - approval = self.get_tool_approval(int(approval_id)) - if approval is None: - raise RuntimeError(f"Tool approval {approval_id} disappeared after insert.") - return approval + return self._row_to_tool_approval(existing_row) def list_tool_approvals( self, @@ -419,30 +487,40 @@ def claim_latest_matching_tool_approval( arguments: dict[str, object], audit_context: dict[str, object], ) -> dict[str, object] | None: + match = self.prepare_matching_tool_approval_for_execution( + tool_name=tool_name, + arguments=arguments, + audit_context=audit_context, + ) + if match.outcome != "claimed": + return None + return match.approval + + def prepare_matching_tool_approval_for_execution( + self, + *, + tool_name: str, + arguments: dict[str, object], + audit_context: dict[str, object], + ) -> ToolApprovalExecutionMatch: + _, _, action_digest = _serialize_tool_approval_identity( + tool_name=tool_name, + arguments=arguments, + audit_context=audit_context, + ) with closing(self._connect()) as connection: connection.execute("BEGIN IMMEDIATE") - cursor = connection.execute( - """ - SELECT id - FROM tool_approvals - WHERE tool_name = ? - AND status = 'pending' - AND arguments_json = ? - AND audit_context_json = ? - ORDER BY id DESC - LIMIT 1 - """, - ( - tool_name, - json.dumps(arguments, sort_keys=True), - json.dumps(audit_context, sort_keys=True), - ), - ) - row = cursor.fetchone() + row = self._select_active_tool_approval_row_by_digest(connection, action_digest) if row is None: connection.rollback() - return None - approval_id = int(row[0]) + return ToolApprovalExecutionMatch(outcome="none", approval=None) + if str(row[8]) == "executing": + connection.commit() + return ToolApprovalExecutionMatch( + outcome="already_executing", + approval=self._row_to_tool_approval(row), + ) + approval_id = _coerce_int(row[0], "tool approval id") cursor = connection.execute( """ UPDATE tool_approvals @@ -452,35 +530,22 @@ def claim_latest_matching_tool_approval( (approval_id,), ) if cursor.rowcount != 1: - connection.rollback() - return None - cursor = connection.execute( - """ - SELECT - id, - tool_name, - risk, - arguments_json, - audit_context_json, - summary, - cli_command, - chat_command, - status, - execution_payload_json, - error_text, - created_at, - updated_at, - resolved_at - FROM tool_approvals - WHERE id = ? - """, - (approval_id,), - ) - claimed_row = cursor.fetchone() + current_row = self._select_active_tool_approval_row_by_digest(connection, action_digest) + if current_row is None: + connection.rollback() + return ToolApprovalExecutionMatch(outcome="none", approval=None) + connection.commit() + outcome = "already_executing" if str(current_row[8]) == "executing" else "none" + approval = self._row_to_tool_approval(current_row) if outcome != "none" else None + return ToolApprovalExecutionMatch(outcome=outcome, approval=approval) + claimed_row = self._select_tool_approval_row(connection, approval_id) connection.commit() if claimed_row is None: - return None - return self._row_to_tool_approval(claimed_row) + return ToolApprovalExecutionMatch(outcome="none", approval=None) + return ToolApprovalExecutionMatch( + outcome="claimed", + approval=self._row_to_tool_approval(claimed_row), + ) def finish_tool_approval( self, @@ -1191,6 +1256,72 @@ def _row_to_tool_approval(self, row: tuple[object, ...]) -> dict[str, object]: "resolved_at": row[13], } + def _select_tool_approval_row( + self, + connection: sqlite3.Connection, + approval_id: int, + ) -> tuple[object, ...] | None: + cursor = connection.execute( + """ + SELECT + id, + tool_name, + risk, + arguments_json, + audit_context_json, + summary, + cli_command, + chat_command, + status, + execution_payload_json, + error_text, + created_at, + updated_at, + resolved_at + FROM tool_approvals + WHERE id = ? + """, + (approval_id,), + ) + return cursor.fetchone() + + def _select_active_tool_approval_row_by_digest( + self, + connection: sqlite3.Connection, + action_digest: str, + ) -> tuple[object, ...] | None: + cursor = connection.execute( + """ + SELECT + id, + tool_name, + risk, + arguments_json, + audit_context_json, + summary, + cli_command, + chat_command, + status, + execution_payload_json, + error_text, + created_at, + updated_at, + resolved_at + FROM tool_approvals + WHERE action_digest = ? + AND status IN ('pending', 'executing') + ORDER BY + CASE status + WHEN 'pending' THEN 0 + ELSE 1 + END, + id DESC + LIMIT 1 + """, + (action_digest,), + ) + return cursor.fetchone() + def _coerce_int(value: object, label: str) -> int: if isinstance(value, int) and not isinstance(value, bool): @@ -1210,3 +1341,32 @@ def _deserialize_json_object(value: object) -> dict[str, object]: if isinstance(payload, dict): return payload return {} + + +def _serialize_tool_approval_identity( + *, + tool_name: str, + arguments: dict[str, object], + audit_context: dict[str, object], +) -> tuple[str, str, str]: + arguments_json = json.dumps(arguments, sort_keys=True) + audit_context_json = json.dumps(audit_context, sort_keys=True) + return ( + arguments_json, + audit_context_json, + _build_tool_approval_digest( + tool_name=tool_name, + arguments_json=arguments_json, + audit_context_json=audit_context_json, + ), + ) + + +def _build_tool_approval_digest( + *, + tool_name: str, + arguments_json: str, + audit_context_json: str, +) -> str: + digest_input = f"{tool_name}\x1f{arguments_json}\x1f{audit_context_json}" + return hashlib.sha256(digest_input.encode("utf-8")).hexdigest() diff --git a/tests/test_mcp_server.py b/tests/test_mcp_server.py index 0eaa67a..8b87e5f 100644 --- a/tests/test_mcp_server.py +++ b/tests/test_mcp_server.py @@ -25,6 +25,8 @@ def test_list_tools_exposes_read_and_write_tools(self) -> None: tool_names = [item["name"] for item in tools if isinstance(item, dict)] self.assertIn("system_info", tool_names) self.assertIn("write_config_file", tool_names) + self.assertIn("approval_list", tool_names) + self.assertIn("approval_approve", tool_names) def test_tools_call_runs_read_only_tool(self) -> None: with tempfile.TemporaryDirectory() as tmp_dir: @@ -190,6 +192,187 @@ def test_approvals_reject_closes_pending_tool(self) -> None: self.assertEqual(rejected["result"]["status"], "rejected") self.assertEqual(config_path.read_text(encoding="utf-8"), "[main]\nkey=old\n") + def test_repeated_mutating_request_reuses_same_pending_approval(self) -> None: + with tempfile.TemporaryDirectory() as tmp_dir: + root = Path(tmp_dir) + managed_root = root / "state" / "managed-configs" + managed_root.mkdir(parents=True, exist_ok=True) + config_path = managed_root / "demo.ini" + config_path.write_text("[main]\nkey=old\n", encoding="utf-8") + + server = self._build_server(root) + first = server._handle_line( + json.dumps( + { + "id": "req-9", + "method": "tools/call", + "params": { + "name": "write_config_file", + "arguments": { + "path": str(config_path), + "content": "[main]\nkey=new\n", + }, + }, + } + ) + ) + second = server._handle_line( + json.dumps( + { + "id": "req-10", + "method": "tools/call", + "params": { + "name": "write_config_file", + "arguments": { + "path": str(config_path), + "content": "[main]\nkey=new\n", + }, + }, + } + ) + ) + + first_approval_id = first["result"]["approval"]["id"] + second_approval_id = second["result"]["approval"]["id"] + self.assertEqual(first_approval_id, second_approval_id) + + listed = server._handle_line( + json.dumps( + { + "id": "req-11", + "method": "approvals/list", + "params": {"status": "pending"}, + } + ) + ) + self.assertEqual(len(listed["result"]["approvals"]), 1) + + def test_jsonrpc_initialize_and_tool_flow_is_standard_mcp_compatible(self) -> None: + with tempfile.TemporaryDirectory() as tmp_dir: + root = Path(tmp_dir) + managed_root = root / "state" / "managed-configs" + managed_root.mkdir(parents=True, exist_ok=True) + config_path = managed_root / "demo.ini" + config_path.write_text("[main]\nkey=old\n", encoding="utf-8") + + server = self._build_server(root) + + initialized = server._handle_line( + json.dumps( + { + "jsonrpc": "2.0", + "id": 1, + "method": "initialize", + "params": { + "protocolVersion": "2025-06-18", + "capabilities": {}, + "clientInfo": {"name": "test-client", "version": "1.0.0"}, + }, + } + ) + ) + assert initialized is not None + self.assertEqual(initialized["jsonrpc"], "2.0") + self.assertEqual(initialized["result"]["protocolVersion"], "2025-06-18") + self.assertIn("serverInfo", initialized["result"]) + + notification = server._handle_line( + json.dumps( + { + "jsonrpc": "2.0", + "method": "notifications/initialized", + } + ) + ) + self.assertIsNone(notification) + + listed = server._handle_line( + json.dumps( + { + "jsonrpc": "2.0", + "id": 2, + "method": "tools/list", + } + ) + ) + assert listed is not None + tool_names = [item["name"] for item in listed["result"]["tools"]] + self.assertIn("system_info", tool_names) + self.assertIn("write_config_file", tool_names) + self.assertIn("approval_list", tool_names) + self.assertIn("approval_approve", tool_names) + + read_only = server._handle_line( + json.dumps( + { + "jsonrpc": "2.0", + "id": 3, + "method": "tools/call", + "params": {"name": "system_info", "arguments": {}}, + } + ) + ) + assert read_only is not None + self.assertEqual(read_only["jsonrpc"], "2.0") + self.assertNotIn("isError", read_only["result"]) + + pending = server._handle_line( + json.dumps( + { + "jsonrpc": "2.0", + "id": 4, + "method": "tools/call", + "params": { + "name": "write_config_file", + "arguments": { + "path": str(config_path), + "content": "[main]\nkey=new\n", + }, + }, + } + ) + ) + assert pending is not None + structured_pending = pending["result"]["structuredContent"] + approval_id = structured_pending["approval"]["id"] + self.assertTrue(structured_pending["pending_confirmation"]) + + fetched = server._handle_line( + json.dumps( + { + "jsonrpc": "2.0", + "id": 5, + "method": "tools/call", + "params": { + "name": "approval_get", + "arguments": {"id": approval_id}, + }, + } + ) + ) + assert fetched is not None + structured_fetched = fetched["result"]["structuredContent"] + self.assertEqual(structured_fetched["status"], "pending") + + approved = server._handle_line( + json.dumps( + { + "jsonrpc": "2.0", + "id": 6, + "method": "tools/call", + "params": { + "name": "approval_approve", + "arguments": {"id": approval_id}, + }, + } + ) + ) + assert approved is not None + structured_approved = approved["result"]["structuredContent"] + self.assertEqual(structured_approved["approval"]["status"], "completed") + self.assertTrue(structured_approved["execution"]["ok"]) + self.assertEqual(config_path.read_text(encoding="utf-8"), "[main]\nkey=new\n") + def _build_server(self, root: Path) -> MasterControlMCPServer: state_dir = root / "state" settings = Settings( diff --git a/tests/test_mcp_stdio_integration.py b/tests/test_mcp_stdio_integration.py index e5efa1d..766f741 100644 --- a/tests/test_mcp_stdio_integration.py +++ b/tests/test_mcp_stdio_integration.py @@ -85,6 +85,88 @@ def test_stdio_server_round_trips_initialize_and_write_approval_flow(self) -> No self.assertEqual(approved["result"]["approval"]["status"], "completed") self.assertEqual(config_path.read_text(encoding="utf-8"), "[main]\nkey=new\n") + def test_stdio_server_supports_standard_jsonrpc_tool_flow(self) -> None: + with tempfile.TemporaryDirectory() as tmp_dir: + state_dir = Path(tmp_dir) / "state" + managed_root = state_dir / "managed-configs" + managed_root.mkdir(parents=True, exist_ok=True) + config_path = managed_root / "demo.ini" + config_path.write_text("[main]\nkey=old\n", encoding="utf-8") + + with self._start_server(state_dir) as process: + initialize = self._request( + process, + { + "jsonrpc": "2.0", + "id": 1, + "method": "initialize", + "params": { + "protocolVersion": "2025-06-18", + "capabilities": {}, + "clientInfo": {"name": "stdio-test", "version": "1.0.0"}, + }, + }, + ) + self.assertEqual(initialize["jsonrpc"], "2.0") + self.assertEqual(initialize["result"]["protocolVersion"], "2025-06-18") + + notification = self._request_optional( + process, + { + "jsonrpc": "2.0", + "method": "notifications/initialized", + }, + ) + self.assertIsNone(notification) + + listed = self._request( + process, + { + "jsonrpc": "2.0", + "id": 2, + "method": "tools/list", + }, + ) + tool_names = [item["name"] for item in listed["result"]["tools"] if isinstance(item, dict)] + self.assertIn("system_info", tool_names) + self.assertIn("approval_approve", tool_names) + + pending = self._request( + process, + { + "jsonrpc": "2.0", + "id": 3, + "method": "tools/call", + "params": { + "name": "write_config_file", + "arguments": { + "path": str(config_path), + "content": "[main]\nkey=new\n", + }, + }, + }, + ) + approval_id = pending["result"]["structuredContent"]["approval"]["id"] + self.assertTrue(pending["result"]["structuredContent"]["pending_confirmation"]) + + approved = self._request( + process, + { + "jsonrpc": "2.0", + "id": 4, + "method": "tools/call", + "params": { + "name": "approval_approve", + "arguments": {"id": approval_id}, + }, + }, + ) + self.assertEqual( + approved["result"]["structuredContent"]["approval"]["status"], + "completed", + ) + self.assertEqual(config_path.read_text(encoding="utf-8"), "[main]\nkey=new\n") + def _request( self, process: subprocess.Popen[str], @@ -102,6 +184,26 @@ def _request( raise AssertionError(f"MCP server closed the pipe unexpectedly. stderr={stderr!r}") return json.loads(line) + def _request_optional( + self, + process: subprocess.Popen[str], + payload: dict[str, Any], + ) -> dict[str, Any] | None: + assert process.stdin is not None + assert process.stdout is not None + process.stdin.write(json.dumps(payload) + "\n") + process.stdin.flush() + process.stdout.flush() + import select + + ready, _, _ = select.select([process.stdout], [], [], 0.25) + if not ready: + return None + line = process.stdout.readline() + if not line: + return None + return json.loads(line) + def _start_server(self, state_dir: Path): env = os.environ.copy() env["MC_STATE_DIR"] = str(state_dir) diff --git a/tests/test_session_store.py b/tests/test_session_store.py index b63ec80..daad0c8 100644 --- a/tests/test_session_store.py +++ b/tests/test_session_store.py @@ -2,7 +2,9 @@ import sqlite3 import tempfile +import threading import unittest +from concurrent.futures import ThreadPoolExecutor from contextlib import closing from pathlib import Path @@ -104,9 +106,108 @@ def test_claim_latest_matching_tool_approval_selects_pending_request(self) -> No ) assert claimed is not None - self.assertEqual(claimed["id"], second["id"]) + self.assertEqual(first["id"], second["id"]) + self.assertEqual(claimed["id"], first["id"]) self.assertEqual(claimed["status"], "executing") - self.assertEqual(store.get_tool_approval(int(first["id"]))["status"], "pending") + self.assertEqual(store.get_tool_approval(int(first["id"]))["status"], "executing") + + def test_create_tool_approval_deduplicates_concurrent_identical_requests(self) -> None: + with tempfile.TemporaryDirectory() as tmp_dir: + store = SessionStore(Path(tmp_dir) / "mc.sqlite3") + store.initialize() + + def create() -> dict[str, object]: + return store.create_tool_approval( + tool_name="write_config_file", + risk="mutating_safe", + arguments={"path": "/tmp/demo.ini", "content": "same"}, + audit_context={"source": "test"}, + summary="same", + cli_command="same", + chat_command="same", + ) + + with ThreadPoolExecutor(max_workers=4) as executor: + results = list(executor.map(lambda _: create(), range(4))) + + approval_ids = {int(item["id"]) for item in results} + self.assertEqual(len(approval_ids), 1) + pending = store.list_tool_approvals(status="pending", limit=10) + self.assertEqual(len(pending), 1) + + def test_prepare_matching_tool_approval_reports_inflight_execution(self) -> None: + with tempfile.TemporaryDirectory() as tmp_dir: + store = SessionStore(Path(tmp_dir) / "mc.sqlite3") + store.initialize() + + created = store.create_tool_approval( + tool_name="write_config_file", + risk="mutating_safe", + arguments={"path": "/tmp/demo.ini", "content": "same"}, + audit_context={"source": "test"}, + summary="same", + cli_command="same", + chat_command="same", + ) + + claimed = store.prepare_matching_tool_approval_for_execution( + tool_name="write_config_file", + arguments={"path": "/tmp/demo.ini", "content": "same"}, + audit_context={"source": "test"}, + ) + self.assertEqual(claimed.outcome, "claimed") + assert claimed.approval is not None + self.assertEqual(claimed.approval["id"], created["id"]) + self.assertEqual(claimed.approval["status"], "executing") + + already_running = store.prepare_matching_tool_approval_for_execution( + tool_name="write_config_file", + arguments={"path": "/tmp/demo.ini", "content": "same"}, + audit_context={"source": "test"}, + ) + self.assertEqual(already_running.outcome, "already_executing") + assert already_running.approval is not None + self.assertEqual(already_running.approval["id"], created["id"]) + self.assertEqual(already_running.approval["status"], "executing") + + def test_claim_and_reject_do_not_both_win_same_approval(self) -> None: + with tempfile.TemporaryDirectory() as tmp_dir: + store = SessionStore(Path(tmp_dir) / "mc.sqlite3") + store.initialize() + + created = store.create_tool_approval( + tool_name="write_config_file", + risk="mutating_safe", + arguments={"path": "/tmp/demo.ini", "content": "same"}, + audit_context={"source": "test"}, + summary="same", + cli_command="same", + chat_command="same", + ) + approval_id = int(created["id"]) + barrier = threading.Barrier(2) + + outcomes: dict[str, dict[str, object] | None] = {} + + def claim() -> None: + barrier.wait() + outcomes["claim"] = store.claim_tool_approval(approval_id) + + def reject() -> None: + barrier.wait() + outcomes["reject"] = store.reject_tool_approval(approval_id) + + claim_thread = threading.Thread(target=claim) + reject_thread = threading.Thread(target=reject) + claim_thread.start() + reject_thread.start() + claim_thread.join() + reject_thread.join() + + successes = sum(1 for value in outcomes.values() if value is not None) + self.assertEqual(successes, 1) + final_status = store.get_tool_approval(approval_id)["status"] + self.assertIn(final_status, {"executing", "rejected"}) if __name__ == "__main__":