Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
184 changes: 184 additions & 0 deletions backend/app/delta_engine.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
import ast
import json
import logging
import os
import re
import shutil
import tarfile
import tempfile
import urllib.parse
import urllib.request
from typing import Optional

logger = logging.getLogger(__name__)

class DeltaAnalysisEngine:
    """
    Engine to handle Delta Analysis for auditing dependency version bumps (Issue #106).

    The pipeline has three steps: parse a manifest diff to learn which package
    changed and between which versions, download and unpack both source
    distributions from PyPI, then compare the two trees to list newly added
    files and flag ``eval()`` calls found in the new version's Python sources.
    """

    # "<name>: <old> -> <new>".  The name allows inner ".", "_" and "-" so
    # dotted distributions (e.g. "zope.interface") are captured whole.  Versions
    # must start with a digit and end with an alphanumeric, so suffixes such as
    # "rc1" are kept while trailing sentence punctuation is not.
    _MANIFEST_CHANGE_RE = re.compile(
        r"([A-Za-z0-9](?:[A-Za-z0-9._\-]*[A-Za-z0-9])?):\s*"
        r"(\d(?:[\w.+!]*\w)?)\s*->\s*"
        r"(\d(?:[\w.+!]*\w)?)"
    )

    # Path fragments identifying packaging metadata that should not be
    # reported as "added files".
    _METADATA_MARKERS = ("egg-info", "PKG-INFO", "setup.cfg")

    _METADATA_TIMEOUT = 5    # seconds, PyPI JSON API request
    _DOWNLOAD_TIMEOUT = 30   # seconds, sdist download

    def __init__(
        self,
        dependency_name: Optional[str] = None,
        old_version: Optional[str] = None,
        new_version: Optional[str] = None,
    ):
        self.dependency_name = dependency_name
        self.old_version = old_version
        self.new_version = new_version

        # Results collected by audit_code_changes().
        self.added_files = []
        self.suspicious_patterns = []

    def parse_manifest_diff(self, diff_text: str) -> bool:
        """
        Step 1: Manifest Diffing.

        Searches ``diff_text`` for the first ``name: old -> new`` entry and
        stores the package name and both versions on the instance.

        Returns:
            True when an entry was found (instance fields updated), False
            otherwise (instance fields left untouched).
        """
        if not isinstance(diff_text, str):
            logger.error(
                "Error parsing manifest diff: expected str, got %s",
                type(diff_text).__name__,
            )
            return False

        match = self._MANIFEST_CHANGE_RE.search(diff_text)
        if match is None:
            return False

        self.dependency_name, self.old_version, self.new_version = match.groups()
        return True

    def fetch_upstream_package(self, version: str) -> str:
        """
        Step 2: Upstream Registry Fetching & Downloading.

        Looks up ``version`` of ``self.dependency_name`` through the PyPI JSON
        API, downloads the first listed sdist and unpacks it into a fresh
        temporary directory.

        Returns:
            The path of the extraction directory, or "" on any failure.  The
            caller owns the returned directory and must delete it when done.
        """
        if not self.dependency_name or not version:
            return ""

        # Quote both components so unexpected characters cannot alter the
        # request path.
        quoted_name = urllib.parse.quote(str(self.dependency_name), safe="")
        quoted_version = urllib.parse.quote(str(version), safe="")
        metadata_url = f"https://pypi.org/pypi/{quoted_name}/{quoted_version}/json"

        temp_extract_dir = ""
        # Broad catch is deliberate: this is the boundary that turns any
        # network, JSON or archive failure into the "" sentinel.
        try:
            with urllib.request.urlopen(metadata_url, timeout=self._METADATA_TIMEOUT) as response:
                if response.status != 200:
                    return ""
                data = json.loads(response.read().decode())

            tarball_url = next(
                (
                    url_info.get("url")
                    for url_info in data.get("urls", [])
                    if url_info.get("packagetype") == "sdist"
                ),
                None,
            )
            if not tarball_url:
                return ""
            if not str(tarball_url).startswith("https://"):
                # urlopen also understands file:// and ftp://; never follow those.
                logger.error("Refusing non-HTTPS sdist URL for v%s: %s", version, tarball_url)
                return ""

            # Keep the archive outside the extraction directory so an archive
            # member can never collide with (or overwrite) the download itself.
            with tempfile.TemporaryDirectory(prefix="patchpilot_dl_") as download_dir:
                archive_path = os.path.join(download_dir, "sdist.tar")
                with urllib.request.urlopen(tarball_url, timeout=self._DOWNLOAD_TIMEOUT) as download, \
                        open(archive_path, "wb") as archive:
                    shutil.copyfileobj(download, archive)

                # Path separators are not allowed in a mkdtemp prefix.
                safe_label = re.sub(r"[^\w.\-]+", "_", f"{self.dependency_name}_{version}")
                temp_extract_dir = tempfile.mkdtemp(prefix=f"patchpilot_{safe_label}_")

                # "r:*" auto-detects the compression (gz, bz2, xz or none).
                with tarfile.open(archive_path, "r:*") as tar:
                    self._safe_extract(tar, temp_extract_dir)

            return temp_extract_dir

        except Exception as e:
            logger.error("Failed handling upstream fetching/extraction for v%s: %s", version, e)
            if temp_extract_dir:
                # Do not leave a half-extracted tree behind.
                shutil.rmtree(temp_extract_dir, ignore_errors=True)
            return ""

    @staticmethod
    def _safe_extract(tar: tarfile.TarFile, dest_dir: str) -> None:
        """
        Extract ``tar`` into ``dest_dir`` without letting members escape it.

        Uses the standard library "data" extraction filter when the running
        interpreter provides it.  Otherwise every member path is resolved
        against ``dest_dir`` first; links and special files are skipped.

        Raises:
            tarfile.TarError: if a member would be written outside ``dest_dir``.
        """
        if hasattr(tarfile, "data_filter"):
            tar.extractall(path=dest_dir, filter="data")
            return

        dest_root = os.path.realpath(dest_dir)
        safe_members = []
        for member in tar.getmembers():
            target = os.path.realpath(os.path.join(dest_root, member.name))
            if os.path.commonpath([dest_root, target]) != dest_root:
                raise tarfile.TarError(f"Blocked path traversal attempt: {member.name!r}")
            if member.isfile() or member.isdir():
                safe_members.append(member)
            else:
                logger.warning("Skipping non-regular archive member: %s", member.name)
        tar.extractall(path=dest_dir, members=safe_members)

    @staticmethod
    def _iter_package_files(base_dir: str):
        """
        Yield ``(full_path, clean_rel_path)`` for every file under ``base_dir``.

        ``clean_rel_path`` is the "/"-joined path relative to ``base_dir`` with
        its first component removed (the tarball's top-level folder); it is ""
        for files that sit directly in ``base_dir``.  Traversal order is sorted
        so results are deterministic.
        """
        for root, dirs, files in os.walk(base_dir):
            dirs.sort()
            for file_name in sorted(files):
                full_path = os.path.join(root, file_name)
                rel_parts = os.path.relpath(full_path, base_dir).split(os.sep)
                yield full_path, "/".join(rel_parts[1:])

    def audit_code_changes(self, old_dir: str, new_dir: str) -> None:
        """
        Step 3: Code Changes Audit.

        Compares the two extracted trees, records files that exist only in the
        new version in ``self.added_files`` (packaging metadata excluded), and
        runs the AST scan on Python files of the new version.  Results of any
        previous audit on this instance are discarded first.
        """
        self.added_files = []
        self.suspicious_patterns = []

        old_files = {clean for _, clean in self._iter_package_files(old_dir)}

        for full_path, clean_rel_path in self._iter_package_files(new_dir):
            is_added = bool(clean_rel_path) and clean_rel_path not in old_files
            if is_added and not any(marker in clean_rel_path for marker in self._METADATA_MARKERS):
                self.added_files.append(clean_rel_path)

            # NOTE(review): every Python file of the new version is scanned,
            # including unchanged ones - confirm this is the intended scope.
            if full_path.endswith(".py"):
                self._scan_file_ast(full_path, clean_rel_path)

    def _scan_file_ast(self, real_file_path: str, clean_display_path: str) -> None:
        """
        Runs an Abstract Syntax Tree (AST) scan for direct ``eval(...)`` calls.

        Each hit is appended to ``self.suspicious_patterns`` as
        ``"eval() usage in <clean_display_path>:<line>"``.  Files that cannot
        be read or parsed are skipped.
        """
        try:
            with open(real_file_path, "r", encoding="utf-8", errors="ignore") as f:
                content = f.read()
            tree = ast.parse(content, filename=real_file_path)
        except Exception as e:
            # Best effort: one unparsable file must not abort the whole audit.
            logger.debug("Could not parse AST for %s: %s", clean_display_path, e)
            return

        for node in ast.walk(tree):
            if (
                isinstance(node, ast.Call)
                and isinstance(node.func, ast.Name)
                and node.func.id == "eval"
            ):
                self.suspicious_patterns.append(
                    f"eval() usage in {clean_display_path}:{node.lineno}"
                )

    def run_audit(self) -> dict:
        """
        Main execution flow for the delta audit pipeline.

        Returns:
            The payload built by generate_output_payload() on success, or a
            dict with a single "error" key when details are missing, a download
            fails, or the analysis raises.  Both extraction directories are
            removed before returning.
        """
        if not all([self.dependency_name, self.old_version, self.new_version]):
            return {"error": "Missing dependency details. Ensure manifest diffing runs successfully first."}

        old_dir = new_dir = ""
        try:
            logger.info(
                "Starting delta analysis for %s: %s -> %s",
                self.dependency_name, self.old_version, self.new_version,
            )

            old_dir = self.fetch_upstream_package(self.old_version)
            # No point downloading the new version when the baseline is missing.
            new_dir = self.fetch_upstream_package(self.new_version) if old_dir else ""

            if not old_dir or not new_dir:
                return {"error": "Failed to download and extract package versions from upstream registry."}

            self.audit_code_changes(old_dir, new_dir)
            return self.generate_output_payload()

        except Exception as e:
            logger.error("Error during delta analysis execution: %s", e)
            return {"error": f"Analysis failed: {str(e)}"}
        finally:
            # The payload only carries relative paths, so the trees can go.
            for extracted in (old_dir, new_dir):
                if extracted:
                    shutil.rmtree(extracted, ignore_errors=True)

    def generate_output_payload(self) -> dict:
        """
        Formats the final audited results into the expected output schema.
        """
        return {
            "supply_chain_diff": {
                "dependency": self.dependency_name,
                "upgrade_path": f"{self.old_version} -> {self.new_version}",
                "risk_assessment": {
                    "added_files": self.added_files,
                    "suspicious_patterns_detected": self.suspicious_patterns,
                    "overall_risk_score": self._calculate_risk_score(),
                },
            }
        }

    def _calculate_risk_score(self) -> str:
        """
        Internal helper to evaluate overall risk level: "elevated" when any
        suspicious pattern was recorded, otherwise "low".
        """
        return "elevated" if self.suspicious_patterns else "low"
48 changes: 48 additions & 0 deletions backend/app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -1410,3 +1410,51 @@ async def get_blast_radius(org_job_id: str):
links.append({"source": repo, "target": pkg})

return {"nodes": list(nodes_dict.values()), "links": links}

# ==========================================
# SARIF Telemetry Export Endpoint
# ==========================================
from app.sarif_translator import SarifTranslator

@app.get("/api/export/sarif")
async def export_sarif_telemetry():
    """
    Export the findings stored in ``semgrep_out.json`` as a downloadable SARIF file.

    The source file is looked up two directory levels above this module.  Its
    findings are taken from the top-level ``"results"`` array (or from the
    document itself when it is a bare list) and passed through
    ``SarifTranslator``.

    Raises:
        HTTPException 404: the source file does not exist.
        HTTPException 500: the file is not valid JSON, or translation failed.
    """
    try:
        # Locate the triage file relative to this module, not the working directory.
        base_dir = Path(__file__).resolve().parent.parent
        semgrep_file_path = base_dir / "semgrep_out.json"

        if not semgrep_file_path.exists():
            raise HTTPException(
                status_code=404,
                detail="Source vulnerability triage metrics file (semgrep_out.json) not found."
            )

        # Read the raw metrics data
        with open(semgrep_file_path, "r", encoding="utf-8") as f:
            raw_data = json.load(f)

        # Extract the elements array (Semgrep usually nests findings under "results")
        if isinstance(raw_data, dict) and "results" in raw_data:
            triage_elements = raw_data["results"]
        elif isinstance(raw_data, list):
            triage_elements = raw_data
        else:
            triage_elements = []

        # Run the translator pipeline
        translator = SarifTranslator(triage_elements)
        sarif_payload = translator.generate_payload()

        # Return compliance-ready payload with standard download headers
        return Response(
            content=json.dumps(sarif_payload, indent=2),
            media_type="application/sarif+json",
            headers={
                "Content-Disposition": "attachment; filename=patchpilot-report.sarif"
            }
        )

    except HTTPException:
        # Let deliberate HTTP errors (the 404 above) through unchanged; without
        # this clause the generic handler below would turn them into a 500.
        raise
    except json.JSONDecodeError as e:
        raise HTTPException(status_code=500, detail="Invalid JSON format in source triage metrics.") from e
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to compile SARIF telemetry: {str(e)}") from e
79 changes: 79 additions & 0 deletions backend/app/sarif_translator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import json
from typing import List, Dict, Any

class SarifTranslator:
    """
    Translates Semgrep-style finding dictionaries into a SARIF 2.1.0 log.

    Each finding is expected to look like
    ``{"check_id", "path", "start": {"line", "col"}, "end": {"line", "col"},
    "extra": {"message", "severity"}}``; every field is optional and falls
    back to a default when missing or null.
    """

    # Internal severity (upper-cased) -> SARIF result level.
    _LEVEL_BY_SEVERITY = {
        "ERROR": "error",
        "CRITICAL": "error",
        "HIGH": "error",
        "WARNING": "warning",
        "MEDIUM": "warning",
        "INFO": "note",
        "NOTE": "note",
        "LOW": "note",
    }

    def __init__(self, triage_elements: List[Dict[str, Any]]):
        """
        Initializes the translator with PatchPilot's internal triage JSON array elements.
        """
        self.triage_elements = triage_elements

    def _map_severity(self, internal_severity: str) -> str:
        """
        Maps internal tool severities to official SARIF levels:
        'error', 'warning', 'note', or 'none' (for anything unrecognised).
        """
        severity_key = str(internal_severity).strip().upper()
        return self._LEVEL_BY_SEVERITY.get(severity_key, "none")

    @staticmethod
    def _coordinate(position: Any, key: str, default: int = 1) -> int:
        """
        Return ``position[key]`` when it is a positive integer, else ``default``.

        SARIF line and column numbers are 1-based, so zero, negative,
        non-integer or missing values are replaced.
        """
        value = position.get(key) if isinstance(position, dict) else None
        if isinstance(value, int) and not isinstance(value, bool) and value >= 1:
            return value
        return default

    def _translate_result(self, element: Dict[str, Any]) -> Dict[str, Any]:
        """
        Translates a single PatchPilot internal element into a SARIF result item.
        """
        # "or {}" also covers keys that are present but explicitly null.
        extra_data = element.get("extra") or {}
        if not isinstance(extra_data, dict):
            extra_data = {}

        start = element.get("start")
        end = element.get("end")

        start_line = self._coordinate(start, "line")
        start_column = self._coordinate(start, "col")
        # A missing end falls back to the start so the region can never end
        # before it begins.
        end_line = self._coordinate(end, "line", default=start_line)
        end_column = self._coordinate(end, "col", default=start_column)

        return {
            "ruleId": element.get("check_id") or "patchpilot-generic-rule",
            "message": {
                "text": extra_data.get("message") or "No message provided by tool."
            },
            "level": self._map_severity(extra_data.get("severity", "WARNING")),
            "locations": [
                {
                    "physicalLocation": {
                        "artifactLocation": {
                            "uri": element.get("path") or "unknown_file"
                        },
                        "region": {
                            "startLine": start_line,
                            "startColumn": start_column,
                            "endLine": end_line,
                            "endColumn": end_column,
                        },
                    }
                }
            ],
        }

    def generate_payload(self) -> Dict[str, Any]:
        """
        Executes the pipeline and returns the full SARIF payload.

        A ``None`` element list is treated as empty, and elements that are not
        dictionaries are skipped because they cannot be translated.
        """
        sarif_results = [
            self._translate_result(item)
            for item in (self.triage_elements or [])
            if isinstance(item, dict)
        ]

        return {
            "$schema": "https://json.schemastore.org/sarif-2.1.0.json",
            "version": "2.1.0",
            "runs": [
                {
                    "tool": {
                        "driver": {
                            "name": "PatchPilot Engine"
                        }
                    },
                    "results": sarif_results,
                }
            ],
        }
36 changes: 36 additions & 0 deletions backend/semgrep_out.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
{
"results": [
{
"check_id": "rules.python.security.injection.sql-injection",
"path": "backend/app/main.py",
"start": {
"line": 125,
"col": 14
},
"end": {
"line": 125,
"col": 45
},
"extra": {
"message": "Remote user input directly concatenated into SQL string. Potential SQL Injection.",
"severity": "ERROR"
}
},
{
"check_id": "rules.python.maintainability.unused-import",
"path": "backend/app/db.py",
"start": {
"line": 4,
"col": 1
},
"end": {
"line": 4,
"col": 13
},
"extra": {
"message": "Imported module 'sys' is never used.",
"severity": "INFO"
}
}
]
}
29 changes: 29 additions & 0 deletions backend/test_sarif.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import json
from app.sarif_translator import SarifTranslator

def run_test():
    """
    Manual smoke test for the SARIF pipeline.

    Loads ``semgrep_out.json`` from the current working directory, feeds its
    ``"results"`` array through ``SarifTranslator`` and prints the first 600
    characters of the generated payload.  Failures are reported on stdout
    rather than raised.
    """
    print("🔄 Reading semgrep_out.json...")
    try:
        with open("semgrep_out.json", "r", encoding="utf-8") as source:
            # Findings live under the top-level "results" key.
            findings = json.load(source).get("results", [])
        print(f"📋 Found {len(findings)} raw vulnerability elements.")

        print("⚙️ Running data through SarifTranslator pipeline...")
        rendered = json.dumps(SarifTranslator(findings).generate_payload(), indent=2)

        # Only a short prefix is shown so the structure can be eyeballed.
        print("\n✅ Success! Here is a preview of your generated SARIF payload:")
        print(rendered[:600] + "\n\n... [truncated preview] ...")

    except FileNotFoundError:
        print("❌ Error: Could not find 'semgrep_out.json' in the backend root directory.")
    except Exception as err:
        print(f"❌ Test failed with error: {err}")


if __name__ == "__main__":
    run_test()
Loading