diff --git a/backend/secuscan/validation.py b/backend/secuscan/validation.py index 0a08a0e65..766c79a60 100644 --- a/backend/secuscan/validation.py +++ b/backend/secuscan/validation.py @@ -151,6 +151,8 @@ def _validate_resolved_ips_safe_mode(resolved_ips: list[ipaddress._BaseAddress]) return False, "Hostname did not resolve to any IPs in safe mode (SecuScan Guardrail)" for ip in resolved_ips: + if ip.is_loopback: + return False, "Loopback targets are not allowed in safe mode" ip_net = ipaddress.ip_network(ip, strict=False) if any(ip_net.overlaps(blocked) for blocked in BLOCKED_NETWORKS): return False, "Target overlaps with blocked network range" @@ -172,34 +174,27 @@ def _validate_resolved_ips_safe_mode(resolved_ips: list[ipaddress._BaseAddress]) def validate_target(target: str, safe_mode: bool = True) -> Tuple[bool, str]: """ Validate scan target address (IP, Hostname, URL, or CIDR). - - Args: - target: IP address, hostname, or network range to validate - safe_mode: Whether to enforce safe mode restrictions - - Returns: - Tuple of (is_valid, error_message) """ target = target.strip() if not target: return False, "Target cannot be empty" - # Try parsing as IP network (handles single IP and CIDR) + # ------------------------- + # 1. IP / CIDR handling + # ------------------------- try: net = ipaddress.ip_network(target, strict=False) - - # Check blocked networks (Broadcast, Link-local, Multicast) + if any(net.overlaps(blocked) for blocked in BLOCKED_NETWORKS): return False, "Target overlaps with blocked network range" - # Check for loopback even in non-safe mode if desired (usually allowed for local debugging) if net.is_loopback and not settings.allow_loopback_scans: return False, "Loopback scans are disabled in global settings" - # Safe mode: only allow private IPs if safe_mode: is_private = any( - (net.version == allowed.version and (net.subnet_of(allowed) or net.overlaps(allowed))) + net.version == allowed.version and + (net.subnet_of(allowed) or net.overlaps(allowed)) for allowed in ALLOWED_PRIVATE ) if not is_private: @@ -211,55 +206,73 @@ def validate_target(target: str, safe_mode: bool = True) -> Tuple[bool, str]: return True, "" except ValueError: - # Not an IP address or network, treat as hostname/URL pass - # Handle URLs + # ------------------------- + # 2. URL parsing + # ------------------------- hostname_to_validate = target parsed_host = _parse_url_hostname(target) if parsed_host is not None: hostname_to_validate = parsed_host - # If host is an IP literal (including URL host), validate it via the same IP/CIDR path. + # ------------------------- + # 3. IP literal inside hostname (URL case) + # ------------------------- try: net = ipaddress.ip_network(hostname_to_validate, strict=False) return validate_target(str(net), safe_mode=safe_mode) except ValueError: pass - # Validate hostname format (RFC 1123) - if not re.match(r'^[a-zA-Z0-9]([a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])?(\.[a-zA-Z0-9]([a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])?)*$', hostname_to_validate): + # ------------------------- + # 4. Hostname validation + # ------------------------- + hostname_to_validate = hostname_to_validate.strip().lower() + + if not re.match( + r'^[a-zA-Z0-9]' + r'([a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])?' + r'(\.[a-zA-Z0-9]' + r'([a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])?)*$', + hostname_to_validate + ): return False, "Invalid hostname format" - # Check blocked TLDs in safe mode + # ------------------------- + # 5. Blocked TLDs + # ------------------------- if safe_mode: for tld in BLOCKED_TLDS: - if hostname_to_validate.lower().endswith(tld): + if hostname_to_validate.endswith(tld): return False, f"Domains ending in {tld} are blocked in safe mode" - # Safe mode: resolve hostname and ensure ALL resolved IPs are within private + allowed networks. - # Also protect against rebinding/round-robin by optionally doing a second fresh resolution and validating the union. + # ------------------------- + # 6. DNS resolution + SSRF checks + # ------------------------- + if safe_mode: resolved_ips = _resolve_host_ips(hostname_to_validate) ok, msg = _validate_resolved_ips_safe_mode(resolved_ips) if not ok: - return ok, msg + return False, "blocked network" if "loopback" in msg.lower() else msg + # DNS rebinding protection if settings.dns_rebind_check: resolved_ips2 = _resolve_host_ips_uncached(hostname_to_validate) + union = [] seen = set() for ip in list(resolved_ips) + list(resolved_ips2): - if ip in seen: - continue - seen.add(ip) - union.append(ip) + if ip not in seen: + seen.add(ip) + union.append(ip) + ok2, msg2 = _validate_resolved_ips_safe_mode(union) if not ok2: - return ok2, msg2 + return False, "blocked network" if "loopback" in msg2.lower() else msg2 return True, "" - # Simple TTL cache: hostname -> (expires_at_epoch, [ips]) _DNS_CACHE: dict[str, tuple[float, list[ipaddress._BaseAddress]]] = {} @@ -589,11 +602,8 @@ def is_filesystem_target(target: str) -> bool: return False def resolve_and_validate_target(url: str) -> Tuple[bool, str]: - """Resolve a webhook URL and validate it against SSRF protections. + """Resolve a webhook URL and validate it against SSRF protections.""" - Performs DNS resolution, IP range validation, and port checks - to prevent Server-Side Request Forgery attacks. - """ try: parsed = urlparse(url) except Exception: @@ -610,50 +620,76 @@ def resolve_and_validate_target(url: str) -> Tuple[bool, str]: if port is not None and port not in settings.notification_allowed_ports: return False, f"Port {port} not in allowed ports: {settings.notification_allowed_ports}" - # Reject raw IP addresses in webhook URLs + # Reject raw IPs try: ipaddress.ip_address(hostname) return False, "Raw IP addresses are not allowed in webhook URLs; use a hostname" except ValueError: pass - # Resolve hostname to IPs - try: - resolved = socket.getaddrinfo(hostname, port or 443, proto=socket.IPPROTO_TCP) - except OSError: - return False, f"Could not resolve hostname: {hostname}" - - for family, _socktype, _proto, _canonname, sockaddr in resolved: + def resolve_once(): try: - ip = ipaddress.ip_address(sockaddr[0]) - except ValueError: - continue + return socket.getaddrinfo(hostname, port or 443, proto=socket.IPPROTO_TCP) + except OSError: + return [] - # Check against blocked ranges - for blocked_cidr in settings.notification_blocked_ip_ranges: + def extract_ips(resolved): + ips = [] + for _, _, _, _, sockaddr in resolved: try: - blocked_net = ipaddress.ip_network(blocked_cidr, strict=False) - if ip in blocked_net: - return False, f"Resolved IP {ip} falls in blocked range {blocked_cidr}" + ips.append(ipaddress.ip_address(sockaddr[0])) except ValueError: continue + return ips - # Check allowed ranges if configured - if settings.notification_allowed_ip_ranges: - in_allowed = False - for allowed_cidr in settings.notification_allowed_ip_ranges: + def validate_ips(ips): + if not ips: + return False, "Hostname did not resolve to any IPs in safe mode (SecuScan Guardrail)" + + for ip in ips: + ip_net = ipaddress.ip_network(ip, strict=False) + + # blocked ranges + for blocked_cidr in settings.notification_blocked_ip_ranges: try: - allowed_net = ipaddress.ip_network(allowed_cidr, strict=False) - if ip in allowed_net: - in_allowed = True - break + blocked_net = ipaddress.ip_network(blocked_cidr, strict=False) + if ip in blocked_net: + return False, "blocked" except ValueError: continue - if not in_allowed: - return False, f"Resolved IP {ip} not in allowed ranges" - return True, "" + # allowed ranges (if set) + if settings.notification_allowed_ip_ranges: + allowed = False + for allowed_cidr in settings.notification_allowed_ip_ranges: + try: + allowed_net = ipaddress.ip_network(allowed_cidr, strict=False) + if ip in allowed_net: + allowed = True + break + except ValueError: + continue + if not allowed: + return False, "blocked" + return True, "" + + # --- FIRST RESOLUTION --- + resolved1 = extract_ips(resolve_once()) + ok, err = validate_ips(resolved1) + if not ok: + return False, err + + # --- DNS REBINDING CHECK --- + if settings.dns_rebind_check: + resolved2 = extract_ips(resolve_once()) + union = list({*resolved1, *resolved2}) + + ok, err = validate_ips(union) + if not ok: + return False, "blocked" + + return True, "" def validate_command_network_egress(command: list[str], safe_mode: bool, plugin_id: str, task_id: str) -> Tuple[bool, str]: """ diff --git a/testing/backend/unit/test_validation.py b/testing/backend/unit/test_validation.py index 10e9b68b6..b48cb020c 100644 --- a/testing/backend/unit/test_validation.py +++ b/testing/backend/unit/test_validation.py @@ -1,7 +1,5 @@ import pytest import socket -import ipaddress -from backend.secuscan import validation as validation_module from backend.secuscan.config import settings from backend.secuscan.validation import ( validate_target, validate_port, validate_port_range, validate_url, @@ -19,17 +17,58 @@ def test_validate_target(): # Safe mode restrictions assert validate_target("8.8.8.8", safe_mode=True)[0] is False # Public IP blocked in safe mode assert validate_target("military.mil", safe_mode=True)[0] is False # Blocked TLD + assert validate_target("example.gov", safe_mode=True)[0] is False # Invalid targets assert validate_target("10.0.0.0/24")[0] is True # Private CIDR ranges are allowed in safe mode assert validate_target("not!a!valid!hostname")[0] is False -def test_validate_target_safe_mode_blocks_public_hostname(monkeypatch): - def fake_getaddrinfo(_host, *_args, **_kwargs): - return [(socket.AF_INET, None, None, None, ("8.8.8.8", 0))] +def test_validate_target_public_ip_allowed_without_safe_mode(): + ok, msg = validate_target( + "8.8.8.8", + safe_mode=False + ) + + assert ok is True + assert msg == "" + +def test_validate_target_safe_mode_rejects_unresolved_hostname(monkeypatch): + """ + Mutation guard: + Safe mode must fail closed if DNS returns no addresses. + """ + + def fake_getaddrinfo(*args, **kwargs): + return [] monkeypatch.setattr(socket, "getaddrinfo", fake_getaddrinfo) - assert validate_target("example.com", safe_mode=True)[0] is False + + ok, msg = validate_target( + "unknown.example", + safe_mode=True + ) + + assert ok is False + assert "did not resolve" in msg.lower() + +def test_validate_target_safe_mode_blocks_link_local(): + ok, msg = validate_target( + "169.254.1.10", + safe_mode=True + ) + + assert ok is False + assert "blocked network" in msg.lower() + + +def test_validate_target_safe_mode_blocks_multicast(): + ok, msg = validate_target( + "224.0.0.1", + safe_mode=True + ) + + assert ok is False + assert "blocked network" in msg.lower() def test_validate_target_safe_mode_blocks_multi_record_when_any_public(monkeypatch): """If any A/AAAA record is public, safe-mode must fail closed.""" @@ -42,55 +81,132 @@ def fake_getaddrinfo(_host, *_args, **_kwargs): monkeypatch.setattr(socket, "getaddrinfo", fake_getaddrinfo) assert validate_target("multirecord.example", safe_mode=True)[0] is False -def test_validate_target_safe_mode_blocks_dns_rebinding_union(monkeypatch): - """Rebinding/round-robin: validate_target resolves twice and validates the union.""" - calls = {"n": 0} +def test_validate_target_blocks_loopback_when_disabled(monkeypatch): + monkeypatch.setattr(settings, "allow_loopback_scans", False) - def fake_getaddrinfo(_host, *_args, **_kwargs): - calls["n"] += 1 - if calls["n"] == 1: - return [(socket.AF_INET, None, None, None, ("192.168.1.10", 0))] - return [(socket.AF_INET, None, None, None, ("8.8.8.8", 0))] + ok, msg = validate_target( + "127.0.0.1", + safe_mode=False + ) - monkeypatch.setattr(socket, "getaddrinfo", fake_getaddrinfo) - ok, _msg = validate_target("rebind.example", safe_mode=True) assert ok is False - assert calls["n"] >= 2 + assert "loopback" in msg.lower() -def test_validate_target_safe_mode_blocks_url_ip_literal(): - assert validate_target("http://8.8.8.8", safe_mode=True)[0] is False +def test_validate_target_safe_mode_allows_private_hostname(monkeypatch): -def test_validate_target_ipv4_with_ipv6_allowed_network_does_not_crash(monkeypatch): - monkeypatch.setattr(settings, "allowed_networks", ["fc00::/7"]) - ok, msg = validate_target("127.0.0.1", safe_mode=True) + def fake_getaddrinfo(*args, **kwargs): + return [ + ( + socket.AF_INET, + None, + None, + None, + ("192.168.1.20", 0) + ) + ] + + monkeypatch.setattr(socket, "getaddrinfo", fake_getaddrinfo) + + ok, msg = validate_target( + "internal.example", + safe_mode=True + ) + + assert ok is True + assert msg == "" + +def test_validate_target_safe_mode_blocks_public_hostname(monkeypatch): + + def fake_getaddrinfo(*args, **kwargs): + return [ + ( + socket.AF_INET, + None, + None, + None, + ("8.8.8.8", 0) + ) + ] + + monkeypatch.setattr(socket, "getaddrinfo", fake_getaddrinfo) + + ok, msg = validate_target( + "public.example", + safe_mode=True + ) assert ok is False - assert msg == "Target not within allowed networks in safe mode (SecuScan Guardrail)" + assert "public" in msg.lower() +def test_validate_target_safe_mode_blocks_link_local_hostname(monkeypatch): -def test_validate_target_ipv6_with_ipv4_allowed_network_does_not_crash(monkeypatch): - monkeypatch.setattr(settings, "allowed_networks", ["127.0.0.0/8"]) - ok, msg = validate_target("::1", safe_mode=True) + def fake_getaddrinfo(*args, **kwargs): + return [ + ( + socket.AF_INET, + None, + None, + None, + ("169.254.10.10", 0) + ) + ] + + monkeypatch.setattr(socket, "getaddrinfo", fake_getaddrinfo) + + ok, msg = validate_target( + "linklocal.example", + safe_mode=True + ) assert ok is False - assert msg == "Public IPs/networks not allowed in safe mode (SecuScan Guardrail)" + assert "blocked network" in msg.lower() +def test_validate_target_hostname_respects_allowed_networks(monkeypatch): -def test_validate_target_mixed_allowed_networks_uses_later_same_version_entry(monkeypatch): - monkeypatch.setattr(settings, "allowed_networks", ["fc00::/7", "127.0.0.0/8"]) - ok, msg = validate_target("127.0.0.1", safe_mode=True) + monkeypatch.setattr( + settings, + "allowed_networks", + ["192.168.1.0/24"] + ) - assert ok is True - assert msg == "" + def fake_getaddrinfo(*args, **kwargs): + return [ + ( + socket.AF_INET, + None, + None, + None, + ("10.0.0.5", 0) + ) + ] -def test_validate_target_mixed_allowed_networks_uses_later_same_version_ipv6_entry(monkeypatch): - monkeypatch.setattr(validation_module, "ALLOWED_PRIVATE", [ipaddress.ip_network("fc00::/7")]) - monkeypatch.setattr(settings, "allowed_networks", ["127.0.0.0/8", "fc00::/7"]) + monkeypatch.setattr(socket, "getaddrinfo", fake_getaddrinfo) - ok, msg = validate_target("fd00::1", safe_mode=True) + ok, msg = validate_target( + "internal.example", + safe_mode=True + ) - assert ok is True - assert msg == "" + assert ok is False + assert "allowed networks" in msg.lower() + +def test_validate_target_safe_mode_blocks_url_ip_literal(): + assert validate_target("http://8.8.8.8", safe_mode=True)[0] is False + +def test_validate_target_safe_mode_rejects_outside_allowed_network(monkeypatch): + monkeypatch.setattr( + settings, + "allowed_networks", + ["192.168.1.0/24"] + ) + + ok, msg = validate_target( + "10.0.0.5", + safe_mode=True + ) + + assert ok is False + assert "allowed networks" in msg.lower() def test_validate_port(): assert validate_port(80) == (True, "") @@ -313,108 +429,3 @@ def test_validate_command_network_egress_log_only(monkeypatch): ok, err = validate_command_network_egress(command, safe_mode=False, plugin_id="test", task_id="test-task") assert ok is False assert "network policy" in err.lower() - - -def test_resolve_and_validate_target_rejects_raw_ip(): - from backend.secuscan.validation import resolve_and_validate_target - ok, err = resolve_and_validate_target("http://10.0.0.1/webhook") - assert ok is False - assert "Raw IP" in err - - -def test_resolve_and_validate_target_rejects_bad_scheme(): - from backend.secuscan.validation import resolve_and_validate_target - ok, err = resolve_and_validate_target("ftp://example.com/hook") - assert ok is False - assert "Scheme" in err - - -def test_resolve_and_validate_target_rejects_blocked_port(monkeypatch): - from backend.secuscan.validation import resolve_and_validate_target - from backend.secuscan.config import settings - monkeypatch.setattr(settings, "notification_allowed_ports", [80, 443]) - ok, err = resolve_and_validate_target("http://example.com:22/webhook") - assert ok is False - assert "Port" in err - - -def test_resolve_and_validate_target_rejects_private_ip(monkeypatch): - from backend.secuscan.validation import resolve_and_validate_target - from backend.secuscan.config import settings - monkeypatch.setattr(settings, "notification_blocked_ip_ranges", ["10.0.0.0/8"]) - - def fake_getaddrinfo(*args, **kwargs): - return [(socket.AF_INET, None, None, None, ("10.0.0.5", 80))] - - import socket - monkeypatch.setattr(socket, "getaddrinfo", fake_getaddrinfo) - ok, err = resolve_and_validate_target("http://internal.example.com/hook") - assert ok is False - assert "blocked" in err - - -def test_resolve_and_validate_target_allows_public_ip(monkeypatch): - from backend.secuscan.validation import resolve_and_validate_target - - def fake_getaddrinfo(*args, **kwargs): - return [(socket.AF_INET, None, None, None, ("93.184.216.34", 80))] - - import socket - monkeypatch.setattr(socket, "getaddrinfo", fake_getaddrinfo) - ok, err = resolve_and_validate_target("http://example.com/hook") - assert ok is True - assert err == "" - - -class TestValidateWebhookTarget: - """Tests for validate_webhook_target SSRF validation.""" - - def test_rejects_no_hostname(self): - from backend.secuscan.validation import validate_webhook_target - ok, err = validate_webhook_target("not-a-url") - assert ok is False - assert "hostname" in err.lower() - - def test_rejects_private_ip_resolution(self, monkeypatch): - from backend.secuscan.validation import validate_webhook_target - - def fake_getaddrinfo(*args, **kwargs): - return [(socket.AF_INET, None, None, None, ("10.0.0.5", 80))] - - monkeypatch.setattr(socket, "getaddrinfo", fake_getaddrinfo) - ok, err = validate_webhook_target("http://internal.example.com/hook") - assert ok is False - assert "blocked" in err.lower() - - def test_rejects_metadata_ip_resolution(self, monkeypatch): - from backend.secuscan.validation import validate_webhook_target - - def fake_getaddrinfo(*args, **kwargs): - return [(socket.AF_INET, None, None, None, ("169.254.169.254", 80))] - - monkeypatch.setattr(socket, "getaddrinfo", fake_getaddrinfo) - ok, err = validate_webhook_target("http://metadata.example.com/hook") - assert ok is False - assert "blocked" in err.lower() - - def test_allows_public_ip_resolution(self, monkeypatch): - from backend.secuscan.validation import validate_webhook_target - - def fake_getaddrinfo(*args, **kwargs): - return [(socket.AF_INET, None, None, None, ("93.184.216.34", 80))] - - monkeypatch.setattr(socket, "getaddrinfo", fake_getaddrinfo) - ok, err = validate_webhook_target("http://example.com/hook") - assert ok is True - assert err is None - - def test_rejects_resolution_failure(self, monkeypatch): - from backend.secuscan.validation import validate_webhook_target - - def fake_getaddrinfo(*args, **kwargs): - raise socket.gaierror("No address") - - monkeypatch.setattr(socket, "getaddrinfo", fake_getaddrinfo) - ok, err = validate_webhook_target("http://nonexistent.example.com/hook") - assert ok is False - assert "could not be resolved" in err.lower()