diff --git a/backend/app/delta_engine.py b/backend/app/delta_engine.py
new file mode 100644
index 0000000..ba91783
--- /dev/null
+++ b/backend/app/delta_engine.py
@@ -0,0 +1,217 @@
+import os
+import re
+import json
+import logging
+import shutil
+import urllib.request
+import tarfile
+import tempfile
+import ast
+
+logger = logging.getLogger(__name__)
+
+class DeltaAnalysisEngine:
+    """
+    Engine to handle Delta Analysis for auditing dependency version bumps (Issue #106).
+    """
+    def __init__(self, dependency_name: str = None, old_version: str = None, new_version: str = None):
+        self.dependency_name = dependency_name
+        self.old_version = old_version
+        self.new_version = new_version
+
+        # Placeholders for collected audit data
+        self.added_files = []
+        self.suspicious_patterns = []
+
+    def parse_manifest_diff(self, diff_text: str) -> bool:
+        """
+        Step 1: Manifest Diffing.
+        Parses a raw diff string line to extract package version changes.
+        """
+        try:
+            pattern = r"([\w\-]+):\s*([\d\.]+)\s*->\s*([\d\.]+)"
+            match = re.search(pattern, diff_text)
+
+            if match:
+                self.dependency_name = match.group(1)
+                self.old_version = match.group(2)
+                self.new_version = match.group(3)
+                return True
+            return False
+        except Exception as e:
+            logger.error(f"Error parsing manifest diff: {str(e)}")
+            return False
+
+    def fetch_upstream_package(self, version: str) -> str:
+        """
+        Step 2: Upstream Registry Fetching & Downloading.
+        Downloads the sdist of the given version from PyPI and extracts it into a new
+        temporary directory. Returns that directory path, or "" on any failure.
+        """
+        if not self.dependency_name:
+            return ""
+
+        url = f"https://pypi.org/pypi/{self.dependency_name}/{version}/json"
+        temp_extract_dir = None
+        try:
+            with urllib.request.urlopen(url, timeout=5) as response:
+                if response.status != 200:
+                    return ""
+                data = json.loads(response.read().decode())
+
+            tarball_url = None
+            for url_info in data.get("urls", []):
+                if url_info.get("packagetype") == "sdist":
+                    tarball_url = url_info.get("url")
+                    break
+
+            if not tarball_url:
+                return ""
+
+            temp_extract_dir = tempfile.mkdtemp(prefix=f"patchpilot_{self.dependency_name}_{version}_")
+            archive_path = os.path.join(temp_extract_dir, f"package_{version}.tar.gz")
+
+            urllib.request.urlretrieve(tarball_url, archive_path)
+
+            with tarfile.open(archive_path, "r:gz") as tar:
+                self._safe_extract(tar, temp_extract_dir)
+
+            os.remove(archive_path)
+            return temp_extract_dir
+
+        except Exception as e:
+            logger.error(f"Failed handling upstream fetching/extraction for v{version}: {str(e)}")
+            # Do not leave a half-populated extraction directory behind on failure
+            if temp_extract_dir:
+                shutil.rmtree(temp_extract_dir, ignore_errors=True)
+            return ""
+
+    @staticmethod
+    def _safe_extract(tar: tarfile.TarFile, dest_dir: str):
+        """
+        Extracts only regular files and directories, and only when they resolve inside dest_dir.
+        The archive is untrusted input, so links/devices are skipped and any member whose
+        path escapes dest_dir (absolute path or "..") aborts the extraction with ValueError.
+        """
+        dest_root = os.path.realpath(dest_dir)
+        safe_members = []
+        for member in tar.getmembers():
+            if not (member.isfile() or member.isdir()):
+                continue
+            target = os.path.realpath(os.path.join(dest_root, member.name))
+            if os.path.commonpath([dest_root, target]) != dest_root:
+                raise ValueError(f"Unsafe path in archive: {member.name}")
+            safe_members.append(member)
+        tar.extractall(path=dest_dir, members=safe_members)
+
+    def audit_code_changes(self, old_dir: str, new_dir: str):
+        """
+        Step 3: Code Changes Audit.
+        Compares directory structures to identify new files, and scans code signatures.
+        """
+        old_files = set()
+
+        # Walk through the old version files to establish a baseline
+        for root, _, files in os.walk(old_dir):
+            for file in files:
+                rel_path = os.path.relpath(os.path.join(root, file), old_dir)
+                # Strip the top-level folder name inside the tarball to make paths clean
+                clean_rel_path = "/".join(rel_path.split(os.sep)[1:])
+                old_files.add(clean_rel_path)
+
+        # Walk through the new version files to identify added files and inspect contents
+        for root, _, files in os.walk(new_dir):
+            for file in files:
+                full_path = os.path.join(root, file)
+                rel_path = os.path.relpath(full_path, new_dir)
+                clean_rel_path = "/".join(rel_path.split(os.sep)[1:])
+
+                # Check if this file didn't exist in the old version
+                if clean_rel_path and clean_rel_path not in old_files:
+                    # Ignore metadata directories like egg-info or PKG-INFO
+                    if not any(x in clean_rel_path for x in ["egg-info", "PKG-INFO", "setup.cfg"]):
+                        self.added_files.append(clean_rel_path)
+
+                        # If it's a Python file, scan it for critical risks like eval()
+                        if file.endswith(".py"):
+                            self._scan_file_ast(full_path, clean_rel_path)
+
+    def _scan_file_ast(self, real_file_path: str, clean_display_path: str):
+        """
+        Runs an Abstract Syntax Tree (AST) scan looking for high-risk language primitives.
+        """
+        try:
+            with open(real_file_path, "r", encoding="utf-8", errors="ignore") as f:
+                content = f.read()
+
+            tree = ast.parse(content, filename=real_file_path)
+
+            # Node walker to look for function calls named 'eval'
+            for node in ast.walk(tree):
+                if isinstance(node, ast.Call):
+                    if isinstance(node.func, ast.Name) and node.func.id == "eval":
+                        line_no = node.lineno
+                        self.suspicious_patterns.append(
+                            f"eval() usage in {clean_display_path}:{line_no}"
+                        )
+        except Exception as e:
+            # If a single file fails parsing, log it but keep scanning other files safely
+            logger.debug(f"Could not parse AST for {clean_display_path}: {str(e)}")
+
+    def run_audit(self) -> dict:
+        """
+        Main execution flow for the delta audit pipeline.
+        """
+        if not all([self.dependency_name, self.old_version, self.new_version]):
+            return {"error": "Missing dependency details. Ensure manifest diffing runs successfully first."}
+
+        old_dir = ""
+        new_dir = ""
+        try:
+            logger.info(f"Starting delta analysis for {self.dependency_name}: {self.old_version} -> {self.new_version}")
+
+            old_dir = self.fetch_upstream_package(self.old_version)
+            new_dir = self.fetch_upstream_package(self.new_version)
+
+            if not old_dir or not new_dir:
+                return {"error": "Failed to download and extract package versions from upstream registry."}
+
+            # Execute step 3 audit logic
+            self.audit_code_changes(old_dir, new_dir)
+
+            return self.generate_output_payload()
+
+        except Exception as e:
+            logger.error(f"Error during delta analysis execution: {str(e)}")
+            return {"error": f"Analysis failed: {str(e)}"}
+
+        finally:
+            # The extracted trees are only needed while auditing; remove them so repeated
+            # audits do not accumulate directories under the system temp location
+            for extracted_dir in (old_dir, new_dir):
+                if extracted_dir:
+                    shutil.rmtree(extracted_dir, ignore_errors=True)
+
+    def generate_output_payload(self) -> dict:
+        """
+        Formats the final audited results into the expected output schema.
+        """
+        return {
+            "supply_chain_diff": {
+                "dependency": self.dependency_name,
+                "upgrade_path": f"{self.old_version} -> {self.new_version}",
+                "risk_assessment": {
+                    "added_files": self.added_files,
+                    "suspicious_patterns_detected": self.suspicious_patterns,
+                    "overall_risk_score": self._calculate_risk_score()
+                }
+            }
+        }
+
+    def _calculate_risk_score(self) -> str:
+        """
+        Internal helper to evaluate overall risk level.
+        """
+        if self.suspicious_patterns:
+            return "elevated"
+        return "low"
\ No newline at end of file
diff --git a/backend/app/main.py b/backend/app/main.py
index 81e8359..ca11ce4 100644
--- a/backend/app/main.py
+++ b/backend/app/main.py
@@ -1410,3 +1410,57 @@ async def get_blast_radius(org_job_id: str):
             links.append({"source": repo, "target": pkg})
 
     return {"nodes": list(nodes_dict.values()), "links": links}
+
+# ==========================================
+# SARIF Telemetry Export Endpoint
+# ==========================================
+from app.sarif_translator import SarifTranslator
+
+@app.get("/api/export/sarif")
+async def export_sarif_telemetry():
+    """
+    Exports the findings in semgrep_out.json as a downloadable SARIF 2.1.0 report.
+    """
+    try:
+        # Define path to the target triage file seen in your project root
+        base_dir = Path(__file__).resolve().parent.parent
+        semgrep_file_path = base_dir / "semgrep_out.json"
+
+        if not semgrep_file_path.exists():
+            raise HTTPException(
+                status_code=404,
+                detail="Source vulnerability triage metrics file (semgrep_out.json) not found."
+            )
+
+        # Read the raw metrics data
+        with open(semgrep_file_path, "r", encoding="utf-8") as f:
+            raw_data = json.load(f)
+
+        # Extract the elements array (Semgrep usually nests findings under "results")
+        if isinstance(raw_data, dict) and "results" in raw_data:
+            triage_elements = raw_data["results"]
+        elif isinstance(raw_data, list):
+            triage_elements = raw_data
+        else:
+            triage_elements = []
+
+        # Run the translator pipeline
+        translator = SarifTranslator(triage_elements)
+        sarif_payload = translator.generate_payload()
+
+        # Return compliance-ready payload with standard download headers
+        return Response(
+            content=json.dumps(sarif_payload, indent=2),
+            media_type="application/sarif+json",
+            headers={
+                "Content-Disposition": "attachment; filename=patchpilot-report.sarif"
+            }
+        )
+
+    except HTTPException:
+        # Deliberate HTTP errors (such as the 404 above) must reach the client unchanged
+        raise
+    except json.JSONDecodeError:
+        raise HTTPException(status_code=500, detail="Invalid JSON format in source triage metrics.")
+    except Exception as e:
+        raise HTTPException(status_code=500, detail=f"Failed to compile SARIF telemetry: {str(e)}")
\ No newline at end of file
diff --git a/backend/app/sarif_translator.py b/backend/app/sarif_translator.py
new file mode 100644
index 0000000..e23b4aa
--- /dev/null
+++ b/backend/app/sarif_translator.py
@@ -0,0 +1,79 @@
+import json
+from typing import List, Dict, Any
+
+class SarifTranslator:
+    def __init__(self, triage_elements: List[Dict[str, Any]]):
+        """
+        Initializes the translator with PatchPilot's internal triage JSON array elements.
+        """
+        self.triage_elements = triage_elements
+
+    def _map_severity(self, internal_severity: str) -> str:
+        """
+        Maps internal tool severities to official SARIF levels:
+        'error', 'warning', 'note', or 'none'.
+        """
+        severity_upper = str(internal_severity).upper()
+        if severity_upper in ["ERROR", "CRITICAL", "HIGH"]:
+            return "error"
+        elif severity_upper in ["WARNING", "MEDIUM"]:
+            return "warning"
+        elif severity_upper in ["INFO", "NOTE", "LOW"]:
+            return "note"
+        else:
+            return "none"
+
+    def _translate_result(self, element: Dict[str, Any]) -> Dict[str, Any]:
+        """
+        Translates a single PatchPilot internal element into a SARIF result item.
+        """
+        rule_id = element.get("check_id", "patchpilot-generic-rule")
+        extra_data = element.get("extra", {})
+        message_text = extra_data.get("message", "No message provided by tool.")
+        raw_severity = extra_data.get("severity", "WARNING")
+
+        sarif_result = {
+            "ruleId": rule_id,
+            "message": {
+                "text": message_text
+            },
+            "level": self._map_severity(raw_severity),
+            "locations": [
+                {
+                    "physicalLocation": {
+                        "artifactLocation": {
+                            "uri": element.get("path", "unknown_file")
+                        },
+                        "region": {
+                            "startLine": element.get("start", {}).get("line", 1),
+                            "startColumn": element.get("start", {}).get("col", 1),
+                            "endLine": element.get("end", {}).get("line", 1),
+                            "endColumn": element.get("end", {}).get("col", 1)
+                        }
+                    }
+                }
+            ]
+        }
+        return sarif_result
+
+    def generate_payload(self) -> Dict[str, Any]:
+        """
+        Executes the pipeline and returns the full schema-compliant SARIF payload.
+        """
+        sarif_results = [self._translate_result(item) for item in self.triage_elements]
+
+        payload = {
+            "$schema": "https://json.schemastore.org/sarif-2.1.0.json",
+            "version": "2.1.0",
+            "runs": [
+                {
+                    "tool": {
+                        "driver": {
+                            "name": "PatchPilot Engine"
+                        }
+                    },
+                    "results": sarif_results
+                }
+            ]
+        }
+        return payload
\ No newline at end of file
diff --git a/backend/semgrep_out.json b/backend/semgrep_out.json
index e69de29..a912dc4 100644
--- a/backend/semgrep_out.json
+++ b/backend/semgrep_out.json
@@ -0,0 +1,36 @@
+{
+  "results": [
+    {
+      "check_id": "rules.python.security.injection.sql-injection",
+      "path": "backend/app/main.py",
+      "start": {
+        "line": 125,
+        "col": 14
+      },
+      "end": {
+        "line": 125,
+        "col": 45
+      },
+      "extra": {
+        "message": "Remote user input directly concatenated into SQL string. Potential SQL Injection.",
+        "severity": "ERROR"
+      }
+    },
+    {
+      "check_id": "rules.python.maintainability.unused-import",
+      "path": "backend/app/db.py",
+      "start": {
+        "line": 4,
+        "col": 1
+      },
+      "end": {
+        "line": 4,
+        "col": 13
+      },
+      "extra": {
+        "message": "Imported module 'sys' is never used.",
+        "severity": "INFO"
+      }
+    }
+  ]
+}
\ No newline at end of file
diff --git a/backend/test_sarif.py b/backend/test_sarif.py
new file mode 100644
index 0000000..d150cd7
--- /dev/null
+++ b/backend/test_sarif.py
@@ -0,0 +1,29 @@
+import json
+from app.sarif_translator import SarifTranslator
+
+def run_test():
+    print("🔄 Reading semgrep_out.json...")
+    try:
+        with open("semgrep_out.json", "r", encoding="utf-8") as f:
+            raw_data = json.load(f)
+
+        # Get the findings array
+        triage_elements = raw_data.get("results", [])
+        print(f"📋 Found {len(triage_elements)} raw vulnerability elements.")
+
+        # Translate
+        print("⚙️ Running data through SarifTranslator pipeline...")
+        translator = SarifTranslator(triage_elements)
+        sarif_payload = translator.generate_payload()
+
+        # Print a snippet of the result to verify structure
+        print("\n✅ Success! Here is a preview of your generated SARIF payload:")
+        print(json.dumps(sarif_payload, indent=2)[:600] + "\n\n... [truncated preview] ...")
+
+    except FileNotFoundError:
+        print("❌ Error: Could not find 'semgrep_out.json' in the backend root directory.")
+    except Exception as e:
+        print(f"❌ Test failed with error: {str(e)}")
+
+if __name__ == "__main__":
+    run_test()
\ No newline at end of file