diff --git a/backend/app/reports/evidence_pack.py b/backend/app/reports/evidence_pack.py
index f83b699..1bfe2f8 100644
--- a/backend/app/reports/evidence_pack.py
+++ b/backend/app/reports/evidence_pack.py
@@ -1,8 +1,11 @@
 from __future__ import annotations
 
+import json
+import sqlite3
 import zipfile
 from datetime import datetime, timezone
 from pathlib import Path
+from typing import Any, Dict, List, Tuple
 
 from ..utils.exec import run_cmd
 
@@ -38,7 +41,27 @@ def build_evidence_pack(
         gitleaks.get("stdout", ""), encoding="utf-8"
     )
 
-    report_md = _render_report(project_name=project_name, job_id=job_id)
+    raw_outputs = {
+        "semgrep": semgrep.get("stdout", ""),
+        "osv": osv.get("stdout", ""),
+        "gitleaks": gitleaks.get("stdout", ""),
+    }
+
+    deduped_findings, raw_total, actual_count, source = _load_deduped_findings(
+        job_id=job_id, raw_outputs=raw_outputs
+    )
+
+    (pack_root / "deduplicated-findings.json").write_text(
+        json.dumps(deduped_findings, indent=2), encoding="utf-8"
+    )
+
+    report_md = _render_report(
+        project_name=project_name,
+        job_id=job_id,
+        raw_total=raw_total,
+        deduped_count=actual_count,
+        source=source,
+    )
     (pack_root / "REPORT.md").write_text(report_md, encoding="utf-8")
 
     zip_path = out_dir / f"{pack_root.name}.zip"
@@ -50,17 +73,29 @@ def build_evidence_pack(
     return zip_path
 
 
-def _render_report(project_name: str, job_id: str) -> str:
+def _render_report(
+    project_name: str,
+    job_id: str,
+    raw_total: int,
+    deduped_count: int,
+    source: str,
+) -> str:
     return f"""# PatchPilot Evidence Pack
 
 **Project:** {project_name}
 **Job ID:** {job_id}
 **Generated:** {datetime.now(timezone.utc).isoformat()}
 
+## Findings summary
+- **Findings after deduplication:** {deduped_count} (from {raw_total} raw)
+- **Deduplication source:** {source}
+- `deduplicated-findings.json` — normalized findings with `related_files`
+
 ## What this pack contains
 - `raw/semgrep.json` — SAST scan results (Semgrep)
 - `raw/osv.json` — Dependency vulnerability results (OSV-Scanner)
 - `raw/gitleaks.json` — Secret detection results (Gitleaks)
+- `deduplicated-findings.json` — deduplicated findings grouped for audit
 - This `REPORT.md` summary
 
 ## Methodology (high-level)
@@ -73,3 +108,305 @@ def _render_report(project_name: str, job_id: str) -> str:
 - This MVP focuses on **verifiable evidence** and a clean audit trail.
 - For production, integrate CI gating (GitHub Actions) and curated fix templates per language/framework.
 """
+
+
+def _load_deduped_findings(
+    job_id: str, raw_outputs: Dict[str, str]
+) -> Tuple[List[Dict[str, Any]], int, int, str]:
+    """Deduplicate the findings for *job_id*.
+
+    Rows stored in the database are preferred; when there are none, the raw
+    scanner output is parsed instead.
+
+    Returns ``(findings, raw_total, deduped_count, source)``, where
+    ``raw_total`` is the number of rows that were fed into deduplication.
+    """
+    db_rows = _load_findings_from_db(job_id)
+    if db_rows:
+        dedup_source = "database"
+        rows = db_rows
+    else:
+        dedup_source = "in-memory raw scan results"
+        rows = _parse_raw_findings(raw_outputs)
+
+    # Count the rows that are actually deduplicated so the report's
+    # "N (from M raw)" line stays consistent with the reported source.
+    raw_total = len(rows)
+    deduped_findings = _deduplicate_findings(rows)
+    return deduped_findings, raw_total, len(deduped_findings), dedup_source
+
+
+def _load_findings_from_db(job_id: str) -> List[Dict[str, Any]]:
+    """Return the stored finding rows for *job_id*, or ``[]`` if unavailable.
+
+    Best-effort: a missing database file or any SQLite error yields an empty
+    list so the caller can fall back to the raw scanner output.
+    """
+    db_path = Path(__file__).resolve().parents[1] / "patchpilot.db"
+    if not db_path.exists():
+        return []
+
+    try:
+        conn = sqlite3.connect(str(db_path))
+    except sqlite3.Error:
+        return []
+
+    # ``with sqlite3.connect(...)`` only scopes a transaction and never closes
+    # the connection, so it is closed explicitly here.
+    try:
+        conn.row_factory = sqlite3.Row
+        cursor = conn.execute(
+            """
+            SELECT id, rule_id, severity, category, file_path, line_number,
+                   scanner, message, package_name, package_version
+            FROM findings
+            WHERE job_id = ?
+            ORDER BY created_at
+            """,
+            (job_id,),
+        )
+        return [dict(row) for row in cursor.fetchall()]
+    except sqlite3.Error:
+        return []
+    finally:
+        conn.close()
+
+
+def _parse_raw_findings(raw_outputs: Dict[str, str]) -> List[Dict[str, Any]]:
+    """Parse every scanner's raw stdout into normalized finding rows."""
+    findings: List[Dict[str, Any]] = []
+    findings.extend(_parse_semgrep(raw_outputs.get("semgrep", "")))
+    findings.extend(_parse_osv(raw_outputs.get("osv", "")))
+    findings.extend(_parse_gitleaks(raw_outputs.get("gitleaks", "")))
+    return findings
+
+
+def _as_dict(value: Any) -> Dict[str, Any]:
+    """Return *value* if it is a dict, otherwise an empty dict."""
+    return value if isinstance(value, dict) else {}
+
+
+def _load_result_items(stdout: str) -> List[Dict[str, Any]]:
+    """Decode scanner stdout and return its result objects.
+
+    Accepts a top-level JSON list or an object with a ``results`` list.
+    Empty or invalid JSON, unexpected shapes and non-object items yield no
+    items instead of raising.
+    """
+    if not stdout:
+        return []
+
+    try:
+        data = json.loads(stdout)
+    except json.JSONDecodeError:
+        return []
+
+    results = data.get("results") if isinstance(data, dict) else data
+    if not isinstance(results, list):
+        return []
+    return [item for item in results if isinstance(item, dict)]
+
+
+def _parse_semgrep(stdout: str) -> List[Dict[str, Any]]:
+    """Normalize Semgrep JSON output into finding rows."""
+    parsed: List[Dict[str, Any]] = []
+    for item in _load_result_items(stdout):
+        # _as_dict also absorbs JSON nulls, which ``.get(key, {})`` would hand
+        # back as ``None`` and crash the chained lookups.
+        extra = _as_dict(item.get("extra"))
+        metadata = _as_dict(extra.get("metadata"))
+        start = _as_dict(item.get("start"))
+
+        rule_id = item.get("check_id") or item.get("id") or item.get("rule_id")
+        path = item.get("path") or metadata.get("filepath")
+        line_number = start.get("line") or start.get("line_number")
+
+        parsed.append(
+            {
+                "id": item.get("id") or f"semgrep:{rule_id}:{path}:{line_number}",
+                "rule_id": rule_id,
+                # Semgrep reports severity at ``extra.severity``; rule metadata
+                # stays the first choice for backward compatibility.
+                "severity": metadata.get("severity")
+                or extra.get("severity")
+                or "UNKNOWN",
+                "category": "sast",
+                "file_path": path,
+                "line_number": line_number,
+                "scanner": "semgrep",
+                "message": extra.get("message") or item.get("message") or "",
+                "package_name": None,
+                "package_version": None,
+            }
+        )
+    return parsed
+
+
+def _parse_osv(stdout: str) -> List[Dict[str, Any]]:
+    """Normalize OSV-Scanner JSON output into finding rows.
+
+    OSV-Scanner nests vulnerabilities under
+    ``results[].packages[].vulnerabilities[]``; each vulnerability becomes one
+    row. Flat result entries (``id``/``package`` directly on the result) are
+    still accepted.
+    """
+    parsed: List[Dict[str, Any]] = []
+    for result in _load_result_items(stdout):
+        packages = result.get("packages")
+        if not isinstance(packages, list):
+            package = _as_dict(result.get("package"))
+            parsed.append(_osv_row(result, package, None))
+            continue
+
+        lockfile = _as_dict(result.get("source")).get("path")
+        for raw_entry in packages:
+            entry = _as_dict(raw_entry)
+            package = _as_dict(entry.get("package"))
+            for vuln in entry.get("vulnerabilities") or []:
+                if isinstance(vuln, dict):
+                    parsed.append(_osv_row(vuln, package, lockfile))
+    return parsed
+
+
+def _osv_row(
+    vuln: Dict[str, Any], package: Dict[str, Any], file_path: Any
+) -> Dict[str, Any]:
+    """Build one normalized dependency finding from an OSV vulnerability."""
+    severity = vuln.get("severity")
+    if not isinstance(severity, str):
+        # OSV ``severity`` is a list of scored vectors; advisories may carry a
+        # plain label under ``database_specific.severity`` instead.
+        severity = _as_dict(vuln.get("database_specific")).get("severity")
+    if not isinstance(severity, str) or not severity:
+        severity = "UNKNOWN"
+
+    return {
+        "id": vuln.get("id") or f"osv:{package.get('name')}",
+        "rule_id": vuln.get("id"),
+        "severity": severity,
+        "category": "dependency",
+        "file_path": file_path,
+        "line_number": None,
+        "scanner": "osv",
+        "message": vuln.get("details") or vuln.get("summary") or "",
+        "package_name": package.get("name"),
+        "package_version": package.get("version"),
+    }
+
+
+def _parse_gitleaks(stdout: str) -> List[Dict[str, Any]]:
+    """Normalize a Gitleaks JSON report (a top-level list) into finding rows."""
+    if not stdout:
+        return []
+
+    try:
+        results = json.loads(stdout)
+    except json.JSONDecodeError:
+        return []
+
+    if not isinstance(results, list):
+        return []
+
+    parsed: List[Dict[str, Any]] = []
+    for item in results:
+        if not isinstance(item, dict):
+            continue
+        # Gitleaks v8 names the rule ``RuleID``; the other spellings are kept
+        # for reports that use them.
+        rule_id = item.get("Rule") or item.get("RuleID") or item.get("rule")
+        path = item.get("File") or item.get("path") or item.get("Path")
+        line_number = item.get("StartLine") or item.get("start_line") or item.get("line")
+        parsed.append(
+            {
+                "id": rule_id or f"gitleaks:{path}:{line_number}",
+                "rule_id": rule_id,
+                "severity": item.get("Severity", "UNKNOWN"),
+                "category": "secret",
+                "file_path": path,
+                "line_number": line_number,
+                "scanner": "gitleaks",
+                "message": item.get("Description") or item.get("Matches") or "",
+                "package_name": None,
+                "package_version": None,
+            }
+        )
+    return parsed
+
+
+def _deduplicate_findings(rows: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
+    """Collapse rows sharing a deduplication key into one finding each.
+
+    The first row of a group supplies the descriptive fields; later rows
+    contribute their file path to ``related_files``, bump ``row_count`` and
+    fill in ``location`` if the group does not have one yet.
+    """
+    groups: Dict[Tuple[Any, ...], Dict[str, Any]] = {}
+
+    for row in rows:
+        key = _dedup_key(row)
+        group = groups.setdefault(
+            key,
+            {
+                "id": row.get("id") or "",
+                "rule_id": row.get("rule_id"),
+                "severity": row.get("severity") or "UNKNOWN",
+                "category": row.get("category") or "unknown",
+                "scanner": row.get("scanner") or "unknown",
+                "title": row.get("rule_id") or row.get("id") or "Unknown",
+                "description": row.get("message") or "",
+                "location": _location_dict(row),
+                "package_name": row.get("package_name"),
+                "package_version": row.get("package_version"),
+                "related_files": [],
+                "row_count": 0,
+            },
+        )
+
+        file_path = row.get("file_path")
+        if file_path and file_path not in group["related_files"]:
+            group["related_files"].append(file_path)
+        group["row_count"] += 1
+
+        if not group["location"] and row.get("file_path"):
+            group["location"] = _location_dict(row)
+
+    deduped: List[Dict[str, Any]] = []
+    for group in groups.values():
+        group["related_files"] = sorted(group["related_files"])
+        deduped.append(group)
+
+    return deduped
+
+
+def _dedup_key(row: Dict[str, Any]) -> Tuple[Any, ...]:
+    """Return the grouping key for *row*.
+
+    Dependency findings are keyed by scanner, rule and package name/version;
+    all other findings are keyed by scanner, category and rule.
+    """
+    scanner = (row.get("scanner") or "").lower()
+    rule_id = row.get("rule_id") or row.get("id") or ""
+    category = (row.get("category") or "").lower()
+
+    if category == "dependency" or scanner == "osv" or row.get("package_name"):
+        return (
+            "dependency",
+            scanner,
+            rule_id,
+            row.get("package_name"),
+            row.get("package_version"),
+        )
+
+    return ("issue", scanner, category, rule_id)
+
+
+def _location_dict(row: Dict[str, Any]) -> Dict[str, Any] | None:
+    """Return the path (and start line, if set) of *row*, or ``None``."""
+    path = row.get("file_path")
+    if not path:
+        return None
+
+    location: Dict[str, Any] = {"path": path}
+    if row.get("line_number") is not None:
+        location["start_line"] = row.get("line_number")
+    return location
diff --git a/backend/tests/test_evidence_pack.py b/backend/tests/test_evidence_pack.py
new file mode 100644
index 0000000..11fe16c
--- /dev/null
+++ b/backend/tests/test_evidence_pack.py
@@ -0,0 +1,55 @@
+import json
+
+from app.reports import evidence_pack
+
+
+def test_deduplicate_findings_from_raw_outputs():
+    semgrep_stdout = json.dumps(
+        {
+            "results": [
+                {
+                    "check_id": "python.django.security.audit.xss",
+                    "path": "app/views.py",
+                    "start": {"line": 10},
+                    "extra": {"message": "Possible XSS", "metadata": {"severity": "HIGH"}},
+                },
+                {
+                    "check_id": "python.django.security.audit.xss",
+                    "path": "app/helpers.py",
+                    "start": {"line": 5},
+                    "extra": {"message": "Possible XSS", "metadata": {"severity": "HIGH"}},
+                },
+                {
+                    "check_id": "python.django.security.audit.sql_injection",
+                    "path": "app/query.py",
+                    "start": {"line": 42},
+                    "extra": {"message": "SQL injection risk", "metadata": {"severity": "CRITICAL"}},
+                },
+            ]
+        }
+    )
+    osv_stdout = json.dumps({"results": []})
+    gitleaks_stdout = json.dumps([])
+
+    deduped_findings, raw_total, deduped_count, source = evidence_pack._load_deduped_findings(
+        job_id="missing-job", raw_outputs={
+            "semgrep": semgrep_stdout,
+            "osv": osv_stdout,
+            "gitleaks": gitleaks_stdout,
+        },
+    )
+
+    assert source == "in-memory raw scan results"
+    assert raw_total == 3
+    assert deduped_count == 2
+
+    xss_finding = next(
+        f for f in deduped_findings if f["rule_id"] == "python.django.security.audit.xss"
+    )
+    assert sorted(xss_finding["related_files"]) == ["app/helpers.py", "app/views.py"]
+    assert xss_finding["severity"] == "HIGH"
+
+    sql_finding = next(
+        f for f in deduped_findings if f["rule_id"] == "python.django.security.audit.sql_injection"
+    )
+    assert sql_finding["related_files"] == ["app/query.py"]