From e95e483b14764dbe49db198ec1f1d22e82aaff40 Mon Sep 17 00:00:00 2001 From: ndjama Date: Sun, 7 Jun 2026 23:42:00 +0200 Subject: [PATCH] feat: test-to-code mapping tools + cgh impact CI command Add two MCP tools and a CI-oriented CLI command, all computed on the fly from existing IMPORTS / CALLS edges plus File.role. No new edge type, no schema change. FEAT-3 (server/tools_tests.py): - tests_for(symbol_or_file): test files that import the target (or, for a symbol, whose functions call it). Returns the inferred mapping with a heuristic note, federated across parent + subrepos. - untested(role, layer): non-test source files no test imports, filtered by role/layer, capped at 200 with a truncation note. FEAT-10 (cli/commands_impact.py): `cgh impact --since ` for PR bots. Diffs the working tree against a git ref, reads the graph read-only (no MCP owner needed), and emits the changed symbols, the IMPORTS blast radius grouped by role/layer, endpoints touched, and tests to run. JSON on clean stdout (--json / --format json) or a markdown PR-comment summary (--format md). Degrades gracefully when the index is missing or the graph is locked. Banner and notes go to stderr. Shared logic lives in analysis/impact.py (pure GraphDB-protocol helpers: tests_for, untested_files, reverse_import_bfs, symbols_in_file, endpoints_in_files) so the MCP tools and the CLI stay in lockstep. --- codegraph/__main__.py | 25 ++ codegraph/analysis/impact.py | 295 ++++++++++++++++++++++++ codegraph/cli/commands_impact.py | 278 ++++++++++++++++++++++ codegraph/server/__init__.py | 2 + codegraph/server/tools_tests.py | 165 +++++++++++++ tests/test_cli/test_impact.py | 107 +++++++++ tests/test_server/test_tests_mapping.py | 121 ++++++++++ 7 files changed, 993 insertions(+) create mode 100644 codegraph/analysis/impact.py create mode 100644 codegraph/cli/commands_impact.py create mode 100644 codegraph/server/tools_tests.py create mode 100644 tests/test_cli/test_impact.py create mode 100644 tests/test_server/test_tests_mapping.py diff --git a/codegraph/__main__.py b/codegraph/__main__.py index 3b6ee19..a14fd46 100644 --- a/codegraph/__main__.py +++ b/codegraph/__main__.py @@ -24,6 +24,7 @@ from codegraph.cli.commands_graph import cmd_add_dir, cmd_graph, register_graph_parser from codegraph.cli.commands_ensurepath import cmd_ensurepath from codegraph.cli.commands_githooks import cmd_githooks +from codegraph.cli.commands_impact import cmd_impact from codegraph.cli.commands_index import ( cmd_force_index, cmd_index, @@ -112,6 +113,7 @@ def _print_help(): ("logs", "View MCP tool call history"), ("history", "Recent indexing activity grouped by day"), ("diff", "Files changed since last index"), + ("impact", "CI: blast radius + tests for a PR diff (JSON/md)"), ("parsers", "List registered language parsers"), ], ), @@ -470,6 +472,28 @@ def _add_root(p) -> None: "--since", default="HEAD", help="Git ref to diff against (default: HEAD)" ) + # --- impact (CI mode: blast radius + tests for a PR diff) --- + p = sub.add_parser( + "impact", + help="CI: blast radius + tests for files changed since a git ref", + ) + _add_root(p) + p.add_argument( + "--since", + default="HEAD~1", + help="Git ref to diff the working tree against (default: HEAD~1)", + ) + p.add_argument( + "--json", action="store_true", help="Emit JSON (shorthand for --format json)" + ) + p.add_argument( + "--format", + choices=["md", "json"], + default="md", + help="Output format: md (PR comment) or json (default: md). " + "The graph index should be fresh: run `cgh index` first in CI.", + ) + # --- history --- p = sub.add_parser("history", help="Show recent indexing activity by day") _add_root(p) @@ -590,6 +614,7 @@ def _add_root(p) -> None: "outline": cmd_outline, "doctor": cmd_doctor, "diff": cmd_diff, + "impact": cmd_impact, "history": cmd_history, "compact": cmd_compact, "graph": cmd_graph, diff --git a/codegraph/analysis/impact.py b/codegraph/analysis/impact.py new file mode 100644 index 0000000..463f1ea --- /dev/null +++ b/codegraph/analysis/impact.py @@ -0,0 +1,295 @@ +# -#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-# +# __creation__ = 2026-06-07 +# __author__ = "jndjama (Joy Ndjama)" +# __copyright__ = "Copyright 2026 ALTIKVA." +# __licence__ = "MIT & CC BY-NC-SA (http://www.altikva.com/licenses/LICENSE-1.0)" +# -#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-# +# Description: Pure GraphDB-protocol helpers shared by the test-mapping MCP +# tools (tests_for / untested) and the `cgh impact` CI command. +# Computes test-to-code mapping on the fly from IMPORTS + CALLS +# edges plus File.role, with no new edge type, plus a bounded +# reverse-BFS over IMPORTS for blast radius. Backend-neutral: +# every call goes through the GraphDB protocol, no raw SQL. + +from __future__ import annotations + +from typing import Any + +# Hard caps so a pathological graph never produces an unbounded result. +TEST_ROLE = "test" +_FANOUT_CAP = 500 +_REVERSE_CAP = 300 + + +def _is_test_role(role: str | None) -> bool: + """A File node is a test when roles.classify tagged it `test`.""" + return (role or "") == TEST_ROLE + + +def file_role(conn: Any, file_path: str) -> tuple[str, str]: + """Return (role, layer) for a File node, or ("", "") when absent.""" + rows = conn.find_nodes( + "File", + where={"path": file_path}, + return_fields=["role", "layer"], + limit=1, + ) + if not rows: + return "", "" + return rows[0].get("role") or "", rows[0].get("layer") or "" + + +def resolve_target_file(conn: Any, target: str) -> str | None: + """Resolve a symbol-or-file argument to a defining File path. + + - If ``target`` is itself a File node path, return it. + - Else treat it as a Function / Class name and return the path of the + first defining file found. + + Returns None when nothing matches. + """ + hits = conn.find_nodes("File", where={"path": target}, limit=1) + if hits: + return target + for label in ("Function", "Class"): + rows = conn.find_nodes( + label, + where={"name": target}, + return_fields=["file_path"], + limit=1, + ) + if rows and rows[0].get("file_path"): + return rows[0]["file_path"] + return None + + +def tests_for_file(conn: Any, target_file: str) -> list[dict[str, str]]: + """Test files that import ``target_file`` directly. + + Inferred heuristic: a test file is a File node whose role is `test`, and + that has an IMPORTS edge into the target file. Returns + ``[{"file", "role"}]`` (de-duplicated, order-preserving). + """ + seen: set[str] = set() + out: list[dict[str, str]] = [] + for row in conn.find_neighbors( + "IMPORTS", + dst_key=target_file, + return_src=["path", "role"], + limit=_FANOUT_CAP, + ): + path = row.get("src_path") + role = row.get("src_role") or "" + if not path or path in seen: + continue + if not _is_test_role(role): + continue + seen.add(path) + out.append({"file": path, "role": role}) + return out + + +def tests_calling_symbol(conn: Any, symbol: str) -> list[dict[str, str]]: + """Test files that hold a function whose CALLS reach ``symbol``. + + Inferred heuristic: find Function nodes named ``symbol``, walk CALLS + backward one hop, and keep callers that live in a `test`-role file. + CALLS edges are same-file-scoped (per BUG-2) and name-matched, so this is + a candidate set, not ground truth. Returns ``[{"file", "role"}]``. + """ + target_ids = [ + r["id"] + for r in conn.find_nodes( + "Function", where={"name": symbol}, return_fields=["id"] + ) + ] + seen: set[str] = set() + out: list[dict[str, str]] = [] + for tid in target_ids: + for row in conn.find_neighbors( + "CALLS", + dst_key=tid, + return_src=["id"], + limit=_FANOUT_CAP, + ): + caller_id = row.get("src_id") or "" + caller_file = caller_id.rsplit("::", 1)[0] if "::" in caller_id else "" + if not caller_file or caller_file in seen: + continue + role, _ = file_role(conn, caller_file) + if not _is_test_role(role): + continue + seen.add(caller_file) + out.append({"file": caller_file, "role": role}) + return out + + +def tests_for(conn: Any, target: str) -> dict[str, Any]: + """Full test-to-code mapping for one scope. + + Resolves ``target`` to a defining file, collects importing test files, + and (when ``target`` is a symbol) test files whose calls reach it. + Returns ``{target, target_file, tests: [{file, role}], count}``. When the + target cannot be resolved, ``target_file`` is None and ``tests`` is empty. + """ + target_file = resolve_target_file(conn, target) + if target_file is None: + return {"target": target, "target_file": None, "tests": [], "count": 0} + + seen: set[str] = set() + tests: list[dict[str, str]] = [] + for t in tests_for_file(conn, target_file): + if t["file"] in seen: + continue + seen.add(t["file"]) + tests.append(t) + + # When the argument named a symbol (not the file itself), also follow + # the call graph from that symbol into test functions. + if target != target_file: + for t in tests_calling_symbol(conn, target): + if t["file"] in seen: + continue + seen.add(t["file"]) + tests.append(t) + + return { + "target": target, + "target_file": target_file, + "tests": tests, + "count": len(tests), + } + + +def untested_files( + conn: Any, + role: str = "", + layer: str = "", + cap: int = 200, +) -> tuple[list[dict[str, str]], bool]: + """Non-test source files that no test file imports. + + Walks every File node, skips test / doc files, applies the optional + role / layer filter, and keeps those with no `test`-role importer. + Returns ``(rows, truncated)`` where each row is ``{file, role, layer}``. + """ + where: dict[str, Any] = {} + if role: + where["role"] = role + if layer: + where["layer"] = layer + + files = conn.find_nodes( + "File", + where=where or None, + return_fields=["path", "role", "layer"], + order_by=["path"], + ) + + out: list[dict[str, str]] = [] + truncated = False + for f in files: + path = f.get("path") + frole = f.get("role") or "" + flayer = f.get("layer") or "" + if not path: + continue + # Never report test or doc files as "untested". + if frole in (TEST_ROLE, "doc"): + continue + if tests_for_file(conn, path): + continue + out.append({"file": path, "role": frole, "layer": flayer}) + if len(out) >= cap: + truncated = True + break + return out, truncated + + +def reverse_import_bfs( + conn: Any, + start_files: list[str], + max_depth: int = 3, +) -> tuple[list[str], bool]: + """Bounded reverse BFS over IMPORTS: every file that transitively imports + any of ``start_files`` within ``max_depth`` hops. + + Returns ``(ordered_file_paths, truncated)``. ``start_files`` themselves are + not included in the result. Caps both the per-node fan-out and the total + result size so a hub file cannot blow up the walk. + """ + seen: set[str] = set(start_files) + frontier = list(start_files) + ordered: list[str] = [] + truncated = False + depth = 0 + while frontier and depth < max(1, int(max_depth)): + depth += 1 + nxt: list[str] = [] + for key in frontier: + rows = conn.find_neighbors( + "IMPORTS", + dst_key=key, + return_src=["path"], + limit=_FANOUT_CAP, + ) + if len(rows) >= _FANOUT_CAP: + truncated = True + for r in rows: + src = r.get("src_path") + if not src or src in seen: + continue + seen.add(src) + ordered.append(src) + nxt.append(src) + if len(ordered) >= _REVERSE_CAP: + return ordered, True + frontier = nxt + return ordered, truncated + + +def symbols_in_file(conn: Any, file_path: str) -> list[dict[str, str]]: + """Functions and classes defined in ``file_path``. + + Returns ``[{name, kind, lines}]`` ordered by start line. Used by the + impact command to report which symbols actually changed in a diff. + """ + out: list[dict[str, str]] = [] + for label, kind in (("Function", "function"), ("Class", "class")): + for s in conn.find_nodes( + label, + where={"file_path": file_path}, + return_fields=["name", "start_line", "end_line"], + order_by=["start_line"], + ): + out.append( + { + "name": s.get("name", ""), + "kind": kind, + "lines": f"{s.get('start_line', '')}-{s.get('end_line', '')}", + } + ) + return out + + +def endpoints_in_files(conn: Any, files: list[str]) -> list[dict[str, str]]: + """Endpoints declared (DEFINES_ENDPOINT) in any of ``files``. + + Returns ``[{file, method, path}]``, de-duplicated. + """ + seen: set[tuple[str, str, str]] = set() + out: list[dict[str, str]] = [] + for fp in files: + for e in conn.find_neighbors( + "DEFINES_ENDPOINT", + src_key=fp, + return_dst=["method", "path"], + ): + method = e.get("dst_method", "") or "" + path = e.get("dst_path", "") or "" + key = (fp, method, path) + if key in seen: + continue + seen.add(key) + out.append({"file": fp, "method": method, "path": path}) + return out diff --git a/codegraph/cli/commands_impact.py b/codegraph/cli/commands_impact.py new file mode 100644 index 0000000..538385f --- /dev/null +++ b/codegraph/cli/commands_impact.py @@ -0,0 +1,278 @@ +# -#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-# +# __creation__ = 2026-06-07 +# __author__ = "jndjama (Joy Ndjama)" +# __copyright__ = "Copyright 2026 ALTIKVA." +# __licence__ = "MIT & CC BY-NC-SA (http://www.altikva.com/licenses/LICENSE-1.0)" +# -#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-# +# Description: `cgh impact --since ` CI command for PR bots. Diffs the +# working tree against a git ref, then reads the graph read-only +# to report changed symbols, the IMPORTS blast radius grouped by +# role / layer, endpoints touched, and tests to run. Emits JSON +# (machine-parseable on stdout) or a markdown PR-comment summary. +# Runs without an MCP owner: opens the graph DB read-only and +# degrades gracefully when the index is missing or stale. + +from __future__ import annotations + +import argparse +import json +import os +import subprocess +import sys +from pathlib import Path + +from rich.console import Console + +from codegraph.cli import LOGO + +# Banner + notes go to stderr so stdout stays a clean JSON / markdown stream +# that a PR bot can pipe and parse. +_err = Console(stderr=True) + + +def _git_changed_files(root: str, since: str) -> tuple[list[str], str | None]: + """Return (changed_files, error). Diffs the working tree against ``since``. + + Mirrors the validation in tools_index.index_changed_files: a leading dash + is rejected so a value like "--output=/x" cannot be read as a git flag, + and the trailing "--" keeps the ref from being parsed as a pathspec. + """ + if since.startswith("-"): + return [], f"invalid git ref: {since!r}" + cmd = [ + "git", + "diff", + "--name-only", + "--diff-filter=ACMR", + f"{since}...", + "--", + ] + try: + result = subprocess.run( + cmd, + capture_output=True, + text=True, + encoding="utf-8", + errors="replace", + cwd=root, + timeout=30, + ) + except Exception as exc: + return [], f"git diff failed: {exc}" + if result.returncode != 0: + msg = (result.stderr or "").strip() or f"git diff exited {result.returncode}" + return [], f"git diff failed: {msg}" + files = [ + f.strip() + for f in result.stdout.strip().splitlines() + if f.strip() and not f.strip().startswith(".codegraph/") + ] + return files, None + + +def _build_report(conn, root: str, changed_files: list[str]) -> dict: + """Assemble the impact report from the graph for the changed files. + + Uses the shared analysis helpers so the CLI and the MCP tools stay in + lockstep. All paths returned to the caller are repo-relative. + """ + from codegraph.analysis import impact as _impact + + root_path = Path(root).resolve() + + def _rel(p: str) -> str: + try: + return str(Path(p).resolve().relative_to(root_path)) + except (ValueError, OSError): + return p + + # Changed files resolve to absolute File-node keys for graph lookups. + abs_changed = [str((root_path / f)) for f in changed_files] + + changed_symbols: list[dict] = [] + for abs_f, rel_f in zip(abs_changed, changed_files): + for sym in _impact.symbols_in_file(conn, abs_f): + changed_symbols.append({"file": rel_f, **sym}) + + # Blast radius: files that transitively import any changed file. + radius, radius_trunc = _impact.reverse_import_bfs(conn, abs_changed, max_depth=3) + + impacted: list[dict] = [] + by_role: dict[str, int] = {} + by_layer: dict[str, int] = {} + for abs_p in radius: + role, layer = _impact.file_role(conn, abs_p) + impacted.append({"file": _rel(abs_p), "role": role, "layer": layer}) + if role: + by_role[role] = by_role.get(role, 0) + 1 + if layer: + by_layer[layer] = by_layer.get(layer, 0) + 1 + + # Endpoints declared in the changed files OR any impacted file. + endpoint_scope = abs_changed + radius + endpoints = [ + {"file": _rel(e["file"]), "method": e["method"], "path": e["path"]} + for e in _impact.endpoints_in_files(conn, endpoint_scope) + ] + + # Tests to run: for each changed file, the test files that exercise it. + test_seen: set[str] = set() + tests: list[dict] = [] + for abs_f in abs_changed: + for t in _impact.tests_for_file(conn, abs_f): + rel_t = _rel(t["file"]) + if rel_t in test_seen: + continue + test_seen.add(rel_t) + tests.append({"file": rel_t, "role": t["role"]}) + + return { + "since_changed": changed_files, + "changed_symbols": changed_symbols, + "impacted": impacted, + "impacted_count": len(impacted), + "impacted_by_role": by_role, + "impacted_by_layer": by_layer, + "endpoints": endpoints, + "tests_to_run": tests, + "truncated": radius_trunc, + "note": ( + "Blast radius and tests are inferred from IMPORTS / CALLS edges, " + "not a coverage run. Keep the index fresh with `cgh index` in CI." + ), + } + + +def _render_markdown(report: dict, since: str) -> str: + """Render the report as a PR-comment-friendly markdown summary.""" + lines: list[str] = [] + lines.append(f"## cgh impact (since `{since}`)") + lines.append("") + + changed = report["since_changed"] + lines.append(f"**Changed files ({len(changed)})**") + if changed: + for f in changed: + lines.append(f"- `{f}`") + else: + lines.append("- _none_") + lines.append("") + + impacted = report["impacted"] + lines.append(f"**Impacted files ({report['impacted_count']})**") + if impacted: + # Group by layer for a compact read. + by_layer: dict[str, list[dict]] = {} + for row in impacted: + by_layer.setdefault(row.get("layer") or "other", []).append(row) + for layer in sorted(by_layer): + rows = by_layer[layer] + lines.append(f"- _{layer}_ ({len(rows)})") + for row in rows[:25]: + role = row.get("role") or "" + suffix = f" `{role}`" if role else "" + lines.append(f" - `{row['file']}`{suffix}") + if len(rows) > 25: + lines.append(f" - _... {len(rows) - 25} more_") + else: + lines.append("- _none_") + lines.append("") + + endpoints = report["endpoints"] + lines.append(f"**Endpoints touched ({len(endpoints)})**") + if endpoints: + for e in endpoints: + method = e.get("method") or "?" + lines.append(f"- `{method} {e.get('path', '')}` ({e['file']})") + else: + lines.append("- _none_") + lines.append("") + + tests = report["tests_to_run"] + lines.append(f"**Tests to run ({len(tests)})**") + if tests: + for t in tests: + lines.append(f"- `{t['file']}`") + else: + lines.append("- _no importing tests found_") + lines.append("") + + if report.get("truncated"): + lines.append("> Note: blast radius was truncated (large graph).") + lines.append("") + lines.append(f"> {report['note']}") + return "\n".join(lines) + + +def cmd_impact(args: argparse.Namespace) -> None: + """Handler for `cgh impact`. Non-MCP, CI-oriented: diffs against a ref, + reads the graph read-only, and emits JSON or markdown.""" + root = os.path.abspath(args.root) + since = getattr(args, "since", "HEAD~1") or "HEAD~1" + + # --json is shorthand for --format json; default format is markdown. + fmt = getattr(args, "format", "md") or "md" + if getattr(args, "json", False): + fmt = "json" + want_json = fmt == "json" + + # Banner to stderr only, never pollute the JSON / markdown on stdout. + _err.print(LOGO) + _err.print( + "[dim]impact: diffing against " + f"[/dim][cyan]{since}[/cyan][dim], reading graph read-only. " + "Keep the index fresh with [/dim][cyan]cgh index[/cyan][dim] in CI.[/dim]\n" + ) + + if not (Path(root) / ".codegraph").is_dir(): + _fail( + want_json, + "repo is not indexed by cgh (.codegraph/ missing). " + "Run `cgh init` then `cgh index`.", + ) + return + + changed, err = _git_changed_files(root, since) + if err is not None: + _fail(want_json, err) + return + + # Open the graph read-only directly, no MCP owner required. When an owner + # holds the write lock, get_readonly_connection returns None; tell the + # caller clearly rather than emitting a misleading empty report. + from codegraph.core.db import get_readonly_connection + + conn = None + try: + conn = get_readonly_connection(root) + except Exception as exc: + _fail(want_json, f"could not open graph read-only: {exc}") + return + + if conn is None: + _fail( + want_json, + "graph DB is locked (an MCP owner is running) or missing. " + "Stop the owner with `cgh serve --stop`, or run this in CI where " + "no owner is alive.", + ) + return + + report = _build_report(conn, root, changed) + report["since"] = since + + if want_json: + # Clean machine-parseable stdout. + print(json.dumps(report, indent=2)) + else: + print(_render_markdown(report, since)) + + +def _fail(want_json: bool, message: str) -> None: + """Emit a graceful error. JSON mode keeps stdout parseable with an + {"error": ...} object; markdown mode writes the note to stderr.""" + if want_json: + print(json.dumps({"error": message}, indent=2)) + else: + _err.print(f"[yellow]{message}[/yellow]") + sys.exit(1) diff --git a/codegraph/server/__init__.py b/codegraph/server/__init__.py index 372990f..73cacf9 100644 --- a/codegraph/server/__init__.py +++ b/codegraph/server/__init__.py @@ -194,11 +194,13 @@ def _short_path(path: str) -> str: from codegraph.server.tools_meta import register as _register_meta # noqa: E402 from codegraph.server.tools_plans import register as _register_plans # noqa: E402 from codegraph.server.tools_query import register as _register_query # noqa: E402 +from codegraph.server.tools_tests import register as _register_tests # noqa: E402 from codegraph.server.tools_viz import register as _register_viz # noqa: E402 _register_arch(mcp) # architecture_overview, domain_map, endpoints, use FIRST _register_query(mcp) _register_insight(mcp) # file_summary, impact_of, path_between, import_cycles +_register_tests(mcp) # tests_for, untested _register_docs(mcp) _register_index(mcp) _register_viz(mcp) diff --git a/codegraph/server/tools_tests.py b/codegraph/server/tools_tests.py new file mode 100644 index 0000000..57af84d --- /dev/null +++ b/codegraph/server/tools_tests.py @@ -0,0 +1,165 @@ +# -#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-# +# __creation__ = 2026-06-07 +# __author__ = "jndjama (Joy Ndjama)" +# __copyright__ = "Copyright 2026 ALTIKVA." +# __licence__ = "MIT & CC BY-NC-SA (http://www.altikva.com/licenses/LICENSE-1.0)" +# -#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-# +# Description: Test-to-code mapping MCP tools, computed on the fly from the +# existing IMPORTS / CALLS edges plus File.role. No TESTS edge +# type and no schema change. tests_for(symbol_or_file) surfaces +# the test files that exercise a target; untested(role, layer) +# lists source files no test imports. Both federate across +# parent + subrepos and return JSON strings. + +from __future__ import annotations + +import json +import os + +from codegraph.analysis import impact as _impact + +# Cap untested output so a large repo cannot produce an unbounded list. +_UNTESTED_CAP = 200 + +# Shared caveat: this mapping is inferred from import / call edges, not from +# running a coverage tool. Keep the wording in one place. +_INFER_NOTE = ( + "Inferred from IMPORTS / CALLS edges plus File.role, not from a coverage " + "run. A test counts if it imports the target file (or, for a symbol, calls " + "it). Treat as a heuristic, not ground truth." +) + + +def register(mcp) -> None: + """Register the test-mapping tools on the given FastMCP instance.""" + import codegraph.server as _srv + from codegraph.analysis.federation import federate_flat + from codegraph.server import _get_conn, _logged_tool + + def _federate(query_fn): + """Parent + federated children fan-out, flattened.""" + return federate_flat(_get_conn, _srv._root, query_fn) + + def _abs(path: str) -> str: + """Resolve a repo-relative path against the parent root.""" + if not os.path.isabs(path) and _srv._root: + return str(_srv._root / path) + return path + + @mcp.tool() + @_logged_tool + def tests_for(symbol_or_file: str) -> str: + """ + Find the test files that exercise a target symbol or file. Resolves + the argument to a defining File node, then reports test files (role + `test`) that IMPORTS-> that file, plus, when the target is a symbol, + test files whose functions CALLS-reach it. + + Args: + symbol_or_file: a function / class name, or a repo-relative / + absolute file path. + + Returns JSON `{target, tests: [{file, role, scope}], count, note}`. + This is an inferred import/call heuristic, NOT a coverage tool: see + `note`. Federated across parent + subrepos; each test row carries a + `scope` tag. + """ + # Path-like args resolve against the parent root for the File lookup. + looks_path = ( + "/" in symbol_or_file + or "\\" in symbol_or_file + or (os.path.splitext(symbol_or_file)[1] != "") + ) + arg = _abs(symbol_or_file) if looks_path else symbol_or_file + + def query(conn): + res = _impact.tests_for(conn, arg) + return list(res["tests"]) + + results, warnings = _federate(query) + + seen: set[tuple[str, str]] = set() + tests: list[dict] = [] + for row in results: + scope = row.get("scope", "parent") + key = (scope, row.get("file", "")) + if key in seen: + continue + seen.add(key) + tests.append( + { + "file": row.get("file", ""), + "role": row.get("role", ""), + "scope": scope, + } + ) + + payload: dict = { + "target": symbol_or_file, + "tests": tests, + "count": len(tests), + "note": _INFER_NOTE, + } + if warnings: + payload["partial"] = True + payload["warnings"] = warnings + return json.dumps(payload, indent=2) + + @mcp.tool() + @_logged_tool + def untested(role: str = "", layer: str = "") -> str: + """ + List non-test source files that NO test file imports. Optionally + filter by File.role (e.g. "service", "router") or File.layer (e.g. + "application", "domain"). Test and doc files are never reported. + + Args: + role: optional File.role filter (exact match). + layer: optional File.layer filter (exact match). + + Returns JSON `{untested: [{file, role, layer, scope}], count, note}`, + capped at 200 with a truncation note. Inferred from import edges, not + coverage (see `note`). Federated across parent + subrepos. + """ + + def query(conn): + rows, _trunc = _impact.untested_files( + conn, role=role, layer=layer, cap=_UNTESTED_CAP + ) + if _trunc and rows: + rows[-1] = {**rows[-1], "_trunc": True} + return rows + + results, warnings = _federate(query) + + truncated = False + untested_rows: list[dict] = [] + for row in results: + if row.pop("_trunc", False): + truncated = True + scope = row.get("scope", "parent") + untested_rows.append( + { + "file": row.get("file", ""), + "role": row.get("role", ""), + "layer": row.get("layer", ""), + "scope": scope, + } + ) + + if len(untested_rows) > _UNTESTED_CAP: + untested_rows = untested_rows[:_UNTESTED_CAP] + truncated = True + + payload: dict = { + "untested": untested_rows, + "count": len(untested_rows), + "note": _INFER_NOTE, + } + if truncated: + payload["truncated"] = True + payload["truncation_note"] = f"capped at {_UNTESTED_CAP} files per scope" + if warnings: + payload["partial"] = True + payload["warnings"] = warnings + return json.dumps(payload, indent=2) diff --git a/tests/test_cli/test_impact.py b/tests/test_cli/test_impact.py new file mode 100644 index 0000000..22c6bd1 --- /dev/null +++ b/tests/test_cli/test_impact.py @@ -0,0 +1,107 @@ +# -#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-# +# __creation__ = 2026-06-07 +# __author__ = "jndjama (Joy Ndjama)" +# __copyright__ = "Copyright 2026 ALTIKVA." +# __licence__ = "MIT & CC BY-NC-SA (http://www.altikva.com/licenses/LICENSE-1.0)" +# -#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-# +# Description: CLI tests for `cgh impact`. Builds a tmp git repo, indexes it, +# changes a file in a second commit, then runs cmd_impact (JSON +# mode) and asserts stdout is valid JSON listing the changed +# file. Git identity is pinned with -c so the test is +# deterministic and does not depend on the host config. + +from __future__ import annotations + +import argparse +import json +import subprocess + +import pytest + +from codegraph.cli.commands_impact import cmd_impact +from codegraph.core.db import reset_connection +from codegraph.indexer import index_repo + + +def _git(root, *args): + subprocess.run( + [ + "git", + "-c", + "user.name=Test", + "-c", + "user.email=test@example.com", + "-c", + "commit.gpgsign=false", + *args, + ], + cwd=str(root), + check=True, + capture_output=True, + text=True, + ) + + +@pytest.fixture +def impact_repo(tmp_path): + """A git repo with two commits; the second changes app.py.""" + root = tmp_path + _git(root, "init") + (root / "lib.py").write_text("def helper():\n return 1\n", encoding="utf-8") + (root / "app.py").write_text( + "import lib\n\n\ndef run():\n return lib.helper()\n", encoding="utf-8" + ) + _git(root, "add", "-A") + _git(root, "commit", "-m", "initial") + + # Second commit changes app.py so HEAD~1 diff yields it. + (root / "app.py").write_text( + "import lib\n\n\ndef run():\n return lib.helper() + 1\n", encoding="utf-8" + ) + _git(root, "add", "-A") + _git(root, "commit", "-m", "tweak app") + + reset_connection() + index_repo(str(root)) + reset_connection() + + yield root + + reset_connection() + + +def test_impact_json_lists_changed_file(impact_repo, capsys): + root = impact_repo + args = argparse.Namespace(root=str(root), since="HEAD~1", json=True, format="md") + cmd_impact(args) + + captured = capsys.readouterr() + # stdout must be valid JSON (stderr carries the banner / notes). + report = json.loads(captured.out) + + assert report["since"] == "HEAD~1" + assert any(f.endswith("app.py") for f in report["since_changed"]) + # Report keys a PR bot relies on are present. + for key in ("impacted", "endpoints", "tests_to_run", "changed_symbols"): + assert key in report + + +def test_impact_missing_index_fails_cleanly(tmp_path, capsys): + # No .codegraph/ -> graceful JSON error, exit 1. + _git(tmp_path, "init") + (tmp_path / "x.py").write_text("y = 1\n", encoding="utf-8") + _git(tmp_path, "add", "-A") + _git(tmp_path, "commit", "-m", "c1") + (tmp_path / "x.py").write_text("y = 2\n", encoding="utf-8") + _git(tmp_path, "add", "-A") + _git(tmp_path, "commit", "-m", "c2") + + args = argparse.Namespace( + root=str(tmp_path), since="HEAD~1", json=True, format="md" + ) + with pytest.raises(SystemExit) as exc: + cmd_impact(args) + assert exc.value.code == 1 + + report = json.loads(capsys.readouterr().out) + assert "error" in report diff --git a/tests/test_server/test_tests_mapping.py b/tests/test_server/test_tests_mapping.py new file mode 100644 index 0000000..03ed881 --- /dev/null +++ b/tests/test_server/test_tests_mapping.py @@ -0,0 +1,121 @@ +# -#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-# +# __creation__ = 2026-06-07 +# __author__ = "jndjama (Joy Ndjama)" +# __copyright__ = "Copyright 2026 ALTIKVA." +# __licence__ = "MIT & CC BY-NC-SA (http://www.altikva.com/licenses/LICENSE-1.0)" +# -#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-#-# +# Description: Tests for the test-to-code mapping MCP tools (tests_for / +# untested). Builds a tiny indexed repo with a source file and a +# test_*.py that imports it (so roles.classify tags the test +# file `test`), then asserts tests_for finds the test and +# untested lists an un-imported source file. + +from __future__ import annotations + +import json + +import pytest + +import codegraph.server as _srv +from codegraph.core.db import reset_connection +from codegraph.indexer import index_file +from codegraph.server.tools_tests import register as register_tests + + +class _FakeMcp: + """Minimal stand-in for FastMCP: .tool() records the decorated function + unchanged so tests can call the tool bodies directly.""" + + def __init__(self) -> None: + self.tools: dict = {} + + def tool(self): + def deco(fn): + self.tools[fn.__name__] = fn + return fn + + return deco + + +@pytest.fixture +def mapping_root(tmp_path): + reset_connection() + _srv._root = tmp_path.resolve() + _srv._conn = None + + yield tmp_path.resolve() + + reset_connection() + _srv._root = None + _srv._conn = None + + +def _register() -> dict: + m = _FakeMcp() + register_tests(m) + return m.tools + + +def _build_repo(root): + """A tested module (mymod), its importing test, and an untested module.""" + (root / "mymod.py").write_text("def widget():\n return 1\n", encoding="utf-8") + (root / "test_mymod.py").write_text( + "import mymod\n\n\ndef test_widget():\n assert mymod.widget() == 1\n", + encoding="utf-8", + ) + (root / "lonely.py").write_text("def orphan():\n return 2\n", encoding="utf-8") + index_file(root / "mymod.py", root) + index_file(root / "test_mymod.py", root) + index_file(root / "lonely.py", root) + + +def test_tests_for_finds_importing_test(mapping_root): + root = mapping_root + _build_repo(root) + + tools = _register() + out = json.loads(tools["tests_for"](str(root / "mymod.py"))) + + assert out["count"] >= 1 + files = {t["file"] for t in out["tests"]} + assert any(f.endswith("test_mymod.py") for f in files) + # Every reported test carries the `test` role. + assert all(t["role"] == "test" for t in out["tests"]) + assert "note" in out + + +def test_tests_for_accepts_relative_path(mapping_root): + root = mapping_root + _build_repo(root) + + tools = _register() + out = json.loads(tools["tests_for"]("mymod.py")) + files = {t["file"] for t in out["tests"]} + assert any(f.endswith("test_mymod.py") for f in files) + + +def test_tests_for_by_symbol_name(mapping_root): + root = mapping_root + _build_repo(root) + + tools = _register() + out = json.loads(tools["tests_for"]("widget")) + files = {t["file"] for t in out["tests"]} + assert any(f.endswith("test_mymod.py") for f in files) + + +def test_untested_lists_unimported_source(mapping_root): + root = mapping_root + _build_repo(root) + + tools = _register() + out = json.loads(tools["untested"]()) + + files = {u["file"] for u in out["untested"]} + # lonely.py has no importing test -> untested. + assert any(f.endswith("lonely.py") for f in files) + # mymod.py IS imported by a test -> not untested. + assert not any(f.endswith("mymod.py") for f in files) + # Test files are never reported as untested. + assert not any(f.endswith("test_mymod.py") for f in files) + assert "note" in out