diff --git a/backend/secuscan/executor.py b/backend/secuscan/executor.py index 0f43bff71..76b1e90e6 100644 --- a/backend/secuscan/executor.py +++ b/backend/secuscan/executor.py @@ -1191,6 +1191,33 @@ async def _build_result_contract( target=target, findings=[item for item in result.get("findings", []) if isinstance(item, dict)], ) + + try: + from .remediation import build_dependency_graph, validate_remediation + graph = build_dependency_graph(target) + validations = {} + for f in normalized_findings: + remediation_str = f.get("remediation", "") + if remediation_str: + val_res = validate_remediation(remediation_str, graph) + validations[id(f)] = val_res + + for f in normalized_findings: + if id(f) in validations: + val_res = validations[id(f)] + f_metadata = f.setdefault("metadata", {}) + f_metadata["safe_to_apply"] = val_res["safe_to_apply"] + f_metadata["compatible_range"] = val_res["compatible_range"] + f_metadata["alternatives"] = val_res["alternatives"] + except Exception as e: + logger.warning( + "Remediation safety validation failed for task %s (plugin %s): %s. Skipping safety metadata enrichment.", + task_id, + plugin_id, + str(e), + exc_info=True, + ) + previous_findings = await self._load_previous_task_findings( db, owner_id=owner_id, diff --git a/backend/secuscan/models.py b/backend/secuscan/models.py index b6cb61c03..b7501ff47 100644 --- a/backend/secuscan/models.py +++ b/backend/secuscan/models.py @@ -235,6 +235,9 @@ class Finding(BaseModel): evidence_count: int = 0 analyst_status: AnalystStatus = AnalystStatus.NEW retest_status: RetestStatus = RetestStatus.NOT_REQUESTED + safe_to_apply: Optional[bool] = None + compatible_range: Optional[str] = None + alternatives: Optional[List[str]] = None class TaskResult(BaseModel): diff --git a/backend/secuscan/remediation.py b/backend/secuscan/remediation.py new file mode 100644 index 000000000..0cc76e1c8 --- /dev/null +++ b/backend/secuscan/remediation.py @@ -0,0 +1,384 @@ +""" +Dependency graph resolution and remediation conflict validation. +""" + +import json +import re +import importlib.metadata +from pathlib import Path +from typing import Dict, List, Any, Tuple +from packaging.version import Version +from packaging.specifiers import SpecifierSet + + +def normalize_package_name(name: str) -> str: + """Normalize a package name to lowercase with PEP 503 compatibility.""" + return re.sub(r"[-_.]+", "-", name).strip().lower() + + +def clean_version_string(ver_str: str) -> str: + """Extract numeric prefix from version strings for comparison.""" + ver_str = ver_str.strip().lower() + if ver_str.startswith("v"): + ver_str = ver_str[1:] + # Match the first sequence of digits and dots (e.g., "1.2.3" in "1.2.3-ubuntu") + match = re.match(r"^([0-9]+(?:\.[0-9]+)*)", ver_str) + if match: + return match.group(1) + return ver_str + + +def parse_remediation_suggestion(remediation_str: str) -> Tuple[str, str] | None: + """Parse recommendation string to extract package name and target upgrade version. + + Example: "Update framer-motion to version 11.0.0" -> ("framer-motion", "11.0.0") + """ + pattern = r"(?:update|upgrade)\s+([a-zA-Z0-9_\-\.]+)\s+(?:to\s+)?(?:version\s+)?([a-zA-Z0-9_\-\.\+\~]+)" + match = re.search(pattern, remediation_str, re.IGNORECASE) + if match: + pkg_name = normalize_package_name(match.group(1)) + version = match.group(2) + return pkg_name, version + return None + + +def handle_caret(ver_str: str) -> List[str]: + """Convert NPM caret specification to PEP 440 constraints. + + ^1.2.3 -> >=1.2.3, <2.0.0 + ^0.2.3 -> >=0.2.3, <0.3.0 + ^0.0.3 -> >=0.0.3, <0.0.4 + """ + parts = ver_str.split(".") + while len(parts) < 3: + parts.append("0") + + major = "".join(filter(str.isdigit, parts[0])) or "0" + minor = "".join(filter(str.isdigit, parts[1])) or "0" + patch = "".join(filter(str.isdigit, parts[2])) or "0" + + if major != "0": + next_major = int(major) + 1 + return [f">={ver_str}", f"<{next_major}.0.0"] + elif minor != "0": + next_minor = int(minor) + 1 + return [f">={ver_str}", f"<0.{next_minor}.0"] + else: + next_patch = int(patch) + 1 + return [f">={ver_str}", f"<0.0.{next_patch}"] + + +def handle_tilde(ver_str: str) -> List[str]: + """Convert NPM tilde specification to PEP 440 constraints. + + ~1.2.3 -> >=1.2.3, <1.3.0 + ~1.2 -> >=1.2.0, <1.3.0 + """ + parts = ver_str.split(".") + while len(parts) < 2: + parts.append("0") + major = "".join(filter(str.isdigit, parts[0])) or "0" + minor = "".join(filter(str.isdigit, parts[1])) or "0" + next_minor = int(minor) + 1 + return [f">={ver_str}", f"<{major}.{next_minor}.0"] + + +def handle_wildcard(part: str) -> List[str]: + """Convert wildcard version strings (e.g. 1.x or 1.*) to PEP 440 constraints.""" + part = part.replace("*", "x") + parts = part.split(".") + if len(parts) == 1 or parts[0] == "x": + return [] + if len(parts) == 2 or parts[1] == "x": + major = "".join(filter(str.isdigit, parts[0])) or "0" + next_major = int(major) + 1 + return [f">={major}.0.0", f"<{next_major}.0.0"] + if parts[2] == "x": + major = "".join(filter(str.isdigit, parts[0])) or "0" + minor = "".join(filter(str.isdigit, parts[1])) or "0" + next_minor = int(minor) + 1 + return [f">={major}.{minor}.0", f"<{major}.{next_minor}.0"] + return [] + + +def semver_to_pep440(semver_str: str) -> SpecifierSet: + """Convert NPM/semver package version specifier into PEP 440 SpecifierSet.""" + semver_str = semver_str.strip() + if not semver_str or semver_str in ("*", "x", "any"): + return SpecifierSet() + + parts = semver_str.split() + pep440_parts = [] + + for part in parts: + part = part.strip() + if not part: + continue + + if part.startswith("^"): + pep440_parts.extend(handle_caret(part[1:])) + elif part.startswith("~"): + pep440_parts.extend(handle_tilde(part[1:])) + elif "x" in part or "*" in part: + pep440_parts.extend(handle_wildcard(part)) + elif part.startswith((">=", "<=", ">", "<", "==")): + match = re.match(r"^(>=|<=|>|<|==)\s*([0-9a-zA-Z\.\-\+]+)$", part) + if match: + op, ver = match.groups() + pep440_parts.append(f"{op}{ver}") + else: + if re.match(r"^[0-9]+(?:\.[0-9]+)?$", part): + pep440_parts.extend(handle_wildcard(part + ".x")) + elif re.match(r"^[0-9a-zA-Z\.\-\+]+$", part): + pep440_parts.append(f"=={part}") + + try: + return SpecifierSet(",".join(pep440_parts)) + except Exception: + return SpecifierSet() + + +def parse_package_lock(filepath: str) -> Dict[str, List[Tuple[str, str]]]: + """Parse a package-lock.json and extract direct and transitive package dependency requirements.""" + try: + with open(filepath, "r", encoding="utf-8") as f: + data = json.load(f) + except Exception: + return {} + + relations = {} + + # Check for packages key (modern NPM lockfile v2/v3) + packages = data.get("packages", {}) + for path, info in packages.items(): + if not path: + parent = "root" + else: + parent = path.replace("node_modules/", "") + + deps = info.get("dependencies", {}) + peer_deps = info.get("peerDependencies", {}) + all_deps = {**deps, **peer_deps} + + if all_deps: + relations[parent] = [(normalize_package_name(k), v) for k, v in all_deps.items()] + + # Fallback to dependencies key (NPM lockfile v1) + dependencies = data.get("dependencies", {}) + def parse_v1_deps(deps_dict): + for name, info in deps_dict.items(): + requires = info.get("requires", {}) + if requires: + relations[name] = [(normalize_package_name(k), v) for k, v in requires.items()] + child_deps = info.get("dependencies", {}) + if child_deps: + parse_v1_deps(child_deps) + + if not packages and dependencies: + parse_v1_deps(dependencies) + + return relations + + +def parse_package_json(filepath: str) -> Dict[str, List[Tuple[str, str]]]: + """Parse a package.json for direct project dependencies.""" + try: + with open(filepath, "r", encoding="utf-8") as f: + data = json.load(f) + deps = data.get("dependencies", {}) + dev_deps = data.get("devDependencies", {}) + peer_deps = data.get("peerDependencies", {}) + all_deps = {**deps, **dev_deps, **peer_deps} + return { + "root": [(normalize_package_name(k), v) for k, v in all_deps.items()] + } + except Exception: + return {} + + +def parse_requirement_line(line: str) -> Tuple[str, SpecifierSet] | None: + """Parse a single requirements.txt line into a normalized package name and SpecifierSet.""" + line = line.strip() + if not line or line.startswith(('#', '-')): + return None + # Strip environment markers (e.g. "pydantic; python_version >= '3.8'") + line = line.split(";")[0].strip() + match = re.match(r"^([a-zA-Z0-9_\-\.]+)\s*(.*)$", line) + if not match: + return None + name, spec_str = match.groups() + # Normalize comparison operators if present + spec_str = spec_str.strip() + name = normalize_package_name(name) + try: + spec = SpecifierSet(spec_str) + except Exception: + spec = SpecifierSet() + return name, spec + +def get_python_transitive_dependencies(package_name: str) -> List[Tuple[str, SpecifierSet]]: + """Retrieve python transitive dependencies from installed metadata.""" + try: + reqs = importlib.metadata.requires(package_name) + if not reqs: + return [] + dependencies = [] + for req in reqs: + req_clean = req.split(";")[0].strip() + match = re.match(r"^([a-zA-Z0-9_\-\.]+)\s*\((.*)\)$", req_clean) + if match: + dep_name, dep_spec = match.groups() + else: + match2 = re.match(r"^([a-zA-Z0-9_\-\.]+)\s*(.*)$", req_clean) + if match2: + dep_name, dep_spec = match2.groups() + else: + continue + dep_name = normalize_package_name(dep_name) + try: + spec = SpecifierSet(dep_spec) + except Exception: + spec = SpecifierSet() + dependencies.append((dep_name, spec)) + return dependencies + except importlib.metadata.PackageNotFoundError: + return [] + + +def build_dependency_graph(target_dir: str) -> Dict[str, List[Dict[str, Any]]]: + """Scan the target directory for Python/Node manifests and construct a transitive dependency constraint graph.""" + graph: Dict[str, List[Dict[str, Any]]] = {} + + if not target_dir: + return graph + + target_path = Path(target_dir) + if not target_path.exists(): + return graph + + if target_path.is_file(): + target_path = target_path.parent + + # 1. Search for python requirements + req_files = ["requirements.txt", "requirements-dev.txt"] + for req_name in req_files: + p = target_path / req_name + if p.exists(): + try: + with open(p, "r", encoding="utf-8") as f: + for line in f: + parsed = parse_requirement_line(line) + if parsed: + name, spec = parsed + graph.setdefault(name, []).append({ + "parent": "root", + "specifier": spec + }) + + # Transitive resolution + for dep_name, dep_spec in get_python_transitive_dependencies(name): + graph.setdefault(dep_name, []).append({ + "parent": name, + "specifier": dep_spec + }) + except Exception: + pass + + # 2. Search for Node.js package-lock.json / package.json + lock_path = target_path / "package-lock.json" + pkg_path = target_path / "package.json" + + if lock_path.exists(): + try: + relations = parse_package_lock(str(lock_path)) + for parent, children in relations.items(): + for child_name, semver_str in children: + spec = semver_to_pep440(semver_str) + graph.setdefault(child_name, []).append({ + "parent": parent, + "specifier": spec + }) + except Exception: + pass + elif pkg_path.exists(): + try: + relations = parse_package_json(str(pkg_path)) + for parent, children in relations.items(): + for child_name, semver_str in children: + spec = semver_to_pep440(semver_str) + graph.setdefault(child_name, []).append({ + "parent": parent, + "specifier": spec + }) + except Exception: + pass + + return graph + + +def validate_remediation(remediation_str: str, graph: Dict[str, List[Dict[str, Any]]]) -> Dict[str, Any]: + """Validate a remediation string against a dependency graph, yielding safety status and alternative actions.""" + res = { + "safe_to_apply": True, + "compatible_range": None, + "alternatives": [] + } + + parsed = parse_remediation_suggestion(remediation_str) + if not parsed: + return res + + pkg_name, target_version = parsed + if pkg_name not in graph: + return res + + constraints = graph[pkg_name] + specifiers = [c["specifier"] for c in constraints] + + clean_target = clean_version_string(target_version) + + is_safe = True + try: + ver = Version(clean_target) + for c in constraints: + spec = c["specifier"] + if ver not in spec: + is_safe = False + break + except Exception: + # Fall back to safe if parsing error happens to prevent blocking valid tools + pass + + if not is_safe: + res["safe_to_apply"] = False + + # Combine all constraints to show the allowed range + combined_parts = [] + for c in constraints: + for spec in c["specifier"]: + combined_parts.append(str(spec)) + res["compatible_range"] = ", ".join(combined_parts) if combined_parts else "N/A" + + # Determine which packages impose conflicting requirements + try: + ver = Version(clean_target) + conflicting_parents = sorted(list({ + c["parent"] for c in constraints if ver not in c["specifier"] + })) + except Exception: + conflicting_parents = sorted(list({c["parent"] for c in constraints})) + + for parent in conflicting_parents: + if parent == "root": + res["alternatives"].append( + f"Update root project constraints for '{pkg_name}' to allow version {target_version}." + ) + else: + res["alternatives"].append( + f"Upgrade parent package '{parent}' to a version that supports '{pkg_name}' version {target_version}." + ) + res["alternatives"].append( + f"Downgrade or keep '{pkg_name}' within compatible range: {res['compatible_range']}." + ) + + return res diff --git a/backend/secuscan/routes.py b/backend/secuscan/routes.py index a070419d8..34cf97993 100644 --- a/backend/secuscan/routes.py +++ b/backend/secuscan/routes.py @@ -54,6 +54,12 @@ def deserialize_finding_rows(rows: List[Dict]) -> List[Dict[str, Any]]: finding["references"] = finding.pop("references_json") if "corroborating_sources_json" in finding: finding["corroborating_sources"] = finding.pop("corroborating_sources_json") + + # Expose remediation safety fields at the top level + metadata = finding.get("metadata", {}) or {} + finding["safe_to_apply"] = metadata.get("safe_to_apply") + finding["compatible_range"] = metadata.get("compatible_range") + finding["alternatives"] = metadata.get("alternatives") return findings @@ -2146,6 +2152,9 @@ async def get_finding_details(finding_id: str, owner: str = Depends(get_current_ "asset_exposure": finding_row.get("asset_exposure"), "risk_score": finding_row.get("risk_score"), "risk_factors": risk_factors, + "safe_to_apply": metadata.get("safe_to_apply"), + "compatible_range": metadata.get("compatible_range"), + "alternatives": metadata.get("alternatives"), } diff --git a/testing/backend/integration/test_routes_remediation_safety.py b/testing/backend/integration/test_routes_remediation_safety.py new file mode 100644 index 000000000..2954c09ea --- /dev/null +++ b/testing/backend/integration/test_routes_remediation_safety.py @@ -0,0 +1,89 @@ +import sqlite3 +import json +import pytest +from backend.secuscan.config import settings + +ALICE = {"X-User-Id": "alice"} +ALICE_OWNER = "user:alice" + +def _seed_task(owner_id: str, task_id: str) -> None: + """Insert a task row directly with an explicit owner_id.""" + conn = sqlite3.connect(settings.database_path) + try: + conn.execute( + """ + INSERT INTO tasks (id, owner_id, plugin_id, tool_name, target, + status, inputs_json, structured_json, consent_granted) + VALUES (?, ?, 'nmap', 'nmap', '127.0.0.1', 'completed', '{}', '{"findings": []}', 1) + """, + (task_id, owner_id), + ) + conn.commit() + finally: + conn.close() + +def _seed_finding(owner_id: str, finding_id: str, task_id: str, metadata: dict | None = None) -> None: + conn = sqlite3.connect(settings.database_path) + metadata_json = json.dumps(metadata) if metadata is not None else None + try: + conn.execute( + """ + INSERT INTO findings (id, owner_id, task_id, plugin_id, title, category, + severity, target, description, remediation, metadata_json) + VALUES (?, ?, ?, 'nmap', 'Open port', 'network', 'low', '127.0.0.1', 'desc', 'fix', ?) + """, + (finding_id, owner_id, task_id, metadata_json), + ) + conn.commit() + finally: + conn.close() + +def test_routes_expose_remediation_safety_fields(test_client): + """Test that safe_to_apply, compatible_range, and alternatives fields are exposed in API responses when present in metadata, and default to None otherwise.""" + _seed_task(ALICE_OWNER, "task-1") + + # 1. Seed finding with validated remediation metadata + metadata_validated = { + "safe_to_apply": False, + "compatible_range": "<2.0", + "alternatives": ["Upgrade package-y"], + "other_key": "some_value" + } + _seed_finding(ALICE_OWNER, "finding-validated", "task-1", metadata=metadata_validated) + + # 2. Seed finding without validated remediation metadata + metadata_unvalidated = { + "other_key": "some_value" + } + _seed_finding(ALICE_OWNER, "finding-unvalidated", "task-1", metadata=metadata_unvalidated) + + # 3. Test `/findings` list endpoint + response_list = test_client.get("/api/v1/findings", headers=ALICE) + assert response_list.status_code == 200 + findings_list = response_list.json()["findings"] + + finding_val = next(f for f in findings_list if f["id"] == "finding-validated") + assert finding_val["safe_to_apply"] is False + assert finding_val["compatible_range"] == "<2.0" + assert finding_val["alternatives"] == ["Upgrade package-y"] + + finding_unval = next(f for f in findings_list if f["id"] == "finding-unvalidated") + assert finding_unval["safe_to_apply"] is None + assert finding_unval["compatible_range"] is None + assert finding_unval["alternatives"] is None + + # 4. Test `/finding/{finding_id}` detail endpoint - Validated Case + response_detail_val = test_client.get("/api/v1/finding/finding-validated", headers=ALICE) + assert response_detail_val.status_code == 200 + detail_val = response_detail_val.json() + assert detail_val["safe_to_apply"] is False + assert detail_val["compatible_range"] == "<2.0" + assert detail_val["alternatives"] == ["Upgrade package-y"] + + # 5. Test `/finding/{finding_id}` detail endpoint - Unvalidated Case + response_detail_unval = test_client.get("/api/v1/finding/finding-unvalidated", headers=ALICE) + assert response_detail_unval.status_code == 200 + detail_unval = response_detail_unval.json() + assert detail_unval["safe_to_apply"] is None + assert detail_unval["compatible_range"] is None + assert detail_unval["alternatives"] is None diff --git a/testing/backend/unit/test_remediation_safety.py b/testing/backend/unit/test_remediation_safety.py new file mode 100644 index 000000000..710e0800b --- /dev/null +++ b/testing/backend/unit/test_remediation_safety.py @@ -0,0 +1,204 @@ +import json +import tempfile +from pathlib import Path +import pytest +from packaging.specifiers import SpecifierSet +from backend.secuscan.remediation import ( + normalize_package_name, + clean_version_string, + parse_remediation_suggestion, + semver_to_pep440, + parse_package_lock, + parse_package_json, + parse_requirement_line, + build_dependency_graph, + validate_remediation +) +from backend.secuscan.models import Finding + + +def test_normalize_package_name(): + assert normalize_package_name("pydantic_settings") == "pydantic-settings" + assert normalize_package_name("Flask-RESTful") == "flask-restful" + assert normalize_package_name(" PyJWT ") == "pyjwt" + assert normalize_package_name("libssl.1.1") == "libssl-1-1" + + +def test_clean_version_string(): + assert clean_version_string("v1.2.3") == "1.2.3" + assert clean_version_string("1.1.1f-1ubuntu2.23") == "1.1.1" + assert clean_version_string("3.0.0-rc1") == "3.0.0" + assert clean_version_string("invalid") == "invalid" + + +def test_parse_remediation_suggestion(): + res1 = parse_remediation_suggestion("Update framer-motion to version 11.0.0") + assert res1 == ("framer-motion", "11.0.0") + + res2 = parse_remediation_suggestion("upgrade library-x to 2.4.1") + assert res2 == ("library-x", "2.4.1") + + res3 = parse_remediation_suggestion("Apply secure controls") + assert res3 is None + + +def test_semver_to_pep440(): + # Carets + assert semver_to_pep440("^1.2.3") == SpecifierSet(">=1.2.3,<2.0.0") + assert semver_to_pep440("^0.2.3") == SpecifierSet(">=0.2.3,<0.3.0") + assert semver_to_pep440("^0.0.3") == SpecifierSet(">=0.0.3,<0.0.4") + + # Tildes + assert semver_to_pep440("~1.2.3") == SpecifierSet(">=1.2.3,<1.3.0") + assert semver_to_pep440("~1.2") == SpecifierSet(">=1.2.0,<1.3.0") + + # Wildcards + assert semver_to_pep440("1.x") == SpecifierSet(">=1.0.0,<2.0.0") + assert semver_to_pep440("1.*") == SpecifierSet(">=1.0.0,<2.0.0") + assert semver_to_pep440("1.2.x") == SpecifierSet(">=1.2.0,<1.3.0") + + # Partial without wildcards + assert semver_to_pep440("1.2") == SpecifierSet(">=1.2.0,<1.3.0") + assert semver_to_pep440("1") == SpecifierSet(">=1.0.0,<2.0.0") + + # Operators & ranges + assert semver_to_pep440(">=1.0.0 <2.0.0") == SpecifierSet(">=1.0.0,<2.0.0") + assert semver_to_pep440("<=2.0.0") == SpecifierSet("<=2.0.0") + + # Exact and fallbacks + assert semver_to_pep440("1.2.3") == SpecifierSet("==1.2.3") + assert semver_to_pep440("*") == SpecifierSet("") + + +def test_parse_requirement_line(): + assert parse_requirement_line("fastapi>=0.115.0") == ("fastapi", SpecifierSet(">=0.115.0")) + assert parse_requirement_line("cryptography>=42.0.0 ; extra == 'ssl'") == ("cryptography", SpecifierSet(">=42.0.0")) + assert parse_requirement_line(" # commented line") is None + assert parse_requirement_line("") is None + + +def test_parse_package_lock(): + lock_data = { + "packages": { + "": { + "dependencies": { + "framer-motion": "^10.0.0" + } + }, + "node_modules/framer-motion": { + "version": "10.16.4", + "dependencies": { + "react": "^18.0.0" + } + } + } + } + with tempfile.TemporaryDirectory() as tmpdir: + lock_file = Path(tmpdir) / "package-lock.json" + with open(lock_file, "w") as f: + json.dump(lock_data, f) + + relations = parse_package_lock(str(lock_file)) + assert "root" in relations + assert relations["root"] == [("framer-motion", "^10.0.0")] + assert "framer-motion" in relations + assert relations["framer-motion"] == [("react", "^18.0.0")] + + +def test_parse_package_json(): + pkg_data = { + "dependencies": { + "express": "^4.17.1" + }, + "devDependencies": { + "jest": "^26.0.0" + } + } + with tempfile.TemporaryDirectory() as tmpdir: + pkg_file = Path(tmpdir) / "package.json" + with open(pkg_file, "w") as f: + json.dump(pkg_data, f) + + relations = parse_package_json(str(pkg_file)) + assert "root" in relations + assert ("express", "^4.17.1") in relations["root"] + assert ("jest", "^26.0.0") in relations["root"] + + +def test_validate_remediation_no_conflict(): + # If package not in graph, defaults to safe + graph = {} + res = validate_remediation("Update framer-motion to version 11.0.0", graph) + assert res["safe_to_apply"] is True + assert res["compatible_range"] is None + assert len(res["alternatives"]) == 0 + + +def test_validate_remediation_with_conflict(): + # Setup graph where root requires library-y, which transitively requires library-x <2.0 + graph = { + "library-x": [ + {"parent": "library-y", "specifier": SpecifierSet("<2.0")} + ] + } + + # Suggest upgrade of library-x to 1.5.0 (compatible with <2.0) + res_safe = validate_remediation("Update library-x to version 1.5.0", graph) + assert res_safe["safe_to_apply"] is True + + # Suggest upgrade of library-x to 2.1.0 (conflicts with <2.0) + res_unsafe = validate_remediation("Update library-x to version 2.1.0", graph) + assert res_unsafe["safe_to_apply"] is False + assert res_unsafe["compatible_range"] == "<2.0" + assert len(res_unsafe["alternatives"]) > 0 + assert any("library-y" in alt for alt in res_unsafe["alternatives"]) + + +def test_finding_model_safety_fields(): + finding = Finding( + title="Outdated dependency", + category="Dependency Vulnerability", + severity="high", + target="package.json", + description="Vulnerability in library-x", + safe_to_apply=False, + compatible_range="<2.0", + alternatives=["Upgrade library-y"] + ) + assert finding.safe_to_apply is False + assert finding.compatible_range == "<2.0" + assert finding.alternatives == ["Upgrade library-y"] + + +def test_build_dependency_graph_fallback_disabled(): + """Verify that build_dependency_graph does not fall back to local manifests when target is invalid/nonexistent.""" + # 1. Non-existent directory + graph_nonexistent = build_dependency_graph("nonexistent_directory_123") + assert graph_nonexistent == {} + + # 2. URL/IP target + graph_url = build_dependency_graph("http://example.com/api") + assert graph_url == {} + + +def test_build_dependency_graph_python_transitive_mocked(): + """Test building dependency graph for Python requirements with mocked transitive dependencies.""" + from unittest.mock import patch + + req_content = "library-y>=1.0.0\n" + + with tempfile.TemporaryDirectory() as tmpdir: + req_file = Path(tmpdir) / "requirements.txt" + with open(req_file, "w", encoding="utf-8") as f: + f.write(req_content) + + # Mock get_python_transitive_dependencies to return a transitive dependency + mock_transitive = [("library-x", SpecifierSet("<2.0"))] + with patch("backend.secuscan.remediation.get_python_transitive_dependencies", return_value=mock_transitive): + graph = build_dependency_graph(tmpdir) + + assert "library-y" in graph + assert graph["library-y"] == [{"parent": "root", "specifier": SpecifierSet(">=1.0.0")}] + + assert "library-x" in graph + assert graph["library-x"] == [{"parent": "library-y", "specifier": SpecifierSet("<2.0")}]