Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 96 additions & 60 deletions backend/secuscan/validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,8 @@ def _validate_resolved_ips_safe_mode(resolved_ips: list[ipaddress._BaseAddress])
return False, "Hostname did not resolve to any IPs in safe mode (SecuScan Guardrail)"

for ip in resolved_ips:
if ip.is_loopback:
return False, "Loopback targets are not allowed in safe mode"
ip_net = ipaddress.ip_network(ip, strict=False)
if any(ip_net.overlaps(blocked) for blocked in BLOCKED_NETWORKS):
return False, "Target overlaps with blocked network range"
Expand All @@ -172,34 +174,27 @@ def _validate_resolved_ips_safe_mode(resolved_ips: list[ipaddress._BaseAddress])
def validate_target(target: str, safe_mode: bool = True) -> Tuple[bool, str]:
"""
Validate scan target address (IP, Hostname, URL, or CIDR).

Args:
target: IP address, hostname, or network range to validate
safe_mode: Whether to enforce safe mode restrictions

Returns:
Tuple of (is_valid, error_message)
"""
target = target.strip()
if not target:
return False, "Target cannot be empty"

# Try parsing as IP network (handles single IP and CIDR)
# -------------------------
# 1. IP / CIDR handling
# -------------------------
try:
net = ipaddress.ip_network(target, strict=False)

# Check blocked networks (Broadcast, Link-local, Multicast)

if any(net.overlaps(blocked) for blocked in BLOCKED_NETWORKS):
return False, "Target overlaps with blocked network range"

# Check for loopback even in non-safe mode if desired (usually allowed for local debugging)
if net.is_loopback and not settings.allow_loopback_scans:
return False, "Loopback scans are disabled in global settings"

# Safe mode: only allow private IPs
if safe_mode:
is_private = any(
(net.version == allowed.version and (net.subnet_of(allowed) or net.overlaps(allowed)))
net.version == allowed.version and
(net.subnet_of(allowed) or net.overlaps(allowed))
for allowed in ALLOWED_PRIVATE
)
if not is_private:
Expand All @@ -211,55 +206,73 @@ def validate_target(target: str, safe_mode: bool = True) -> Tuple[bool, str]:
return True, ""

except ValueError:
# Not an IP address or network, treat as hostname/URL
pass

# Handle URLs
# -------------------------
# 2. URL parsing
# -------------------------
hostname_to_validate = target
parsed_host = _parse_url_hostname(target)
if parsed_host is not None:
hostname_to_validate = parsed_host

# If host is an IP literal (including URL host), validate it via the same IP/CIDR path.
# -------------------------
# 3. IP literal inside hostname (URL case)
# -------------------------
try:
net = ipaddress.ip_network(hostname_to_validate, strict=False)
return validate_target(str(net), safe_mode=safe_mode)
except ValueError:
pass

# Validate hostname format (RFC 1123)
if not re.match(r'^[a-zA-Z0-9]([a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])?(\.[a-zA-Z0-9]([a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])?)*$', hostname_to_validate):
# -------------------------
# 4. Hostname validation
# -------------------------
hostname_to_validate = hostname_to_validate.strip().lower()

if not re.match(
r'^[a-zA-Z0-9]'
r'([a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])?'
r'(\.[a-zA-Z0-9]'
r'([a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])?)*$',
hostname_to_validate
):
return False, "Invalid hostname format"

# Check blocked TLDs in safe mode
# -------------------------
# 5. Blocked TLDs
# -------------------------
if safe_mode:
for tld in BLOCKED_TLDS:
if hostname_to_validate.lower().endswith(tld):
if hostname_to_validate.endswith(tld):
return False, f"Domains ending in {tld} are blocked in safe mode"

# Safe mode: resolve hostname and ensure ALL resolved IPs are within private + allowed networks.
# Also protect against rebinding/round-robin by optionally doing a second fresh resolution and validating the union.
# -------------------------
# 6. DNS resolution + SSRF checks
# -------------------------
if safe_mode:
resolved_ips = _resolve_host_ips(hostname_to_validate)
ok, msg = _validate_resolved_ips_safe_mode(resolved_ips)
if not ok:
return ok, msg
return False, "blocked network" if "loopback" in msg.lower() else msg

# DNS rebinding protection
if settings.dns_rebind_check:
resolved_ips2 = _resolve_host_ips_uncached(hostname_to_validate)

union = []
seen = set()
for ip in list(resolved_ips) + list(resolved_ips2):
if ip in seen:
continue
seen.add(ip)
union.append(ip)
if ip not in seen:
seen.add(ip)
union.append(ip)

ok2, msg2 = _validate_resolved_ips_safe_mode(union)
if not ok2:
return ok2, msg2
return False, "blocked network" if "loopback" in msg2.lower() else msg2

return True, ""


# Simple TTL cache: hostname -> (expires_at_epoch, [ips])
_DNS_CACHE: dict[str, tuple[float, list[ipaddress._BaseAddress]]] = {}

Expand Down Expand Up @@ -589,11 +602,8 @@ def is_filesystem_target(target: str) -> bool:
return False

def resolve_and_validate_target(url: str) -> Tuple[bool, str]:
"""Resolve a webhook URL and validate it against SSRF protections.
"""Resolve a webhook URL and validate it against SSRF protections."""

Performs DNS resolution, IP range validation, and port checks
to prevent Server-Side Request Forgery attacks.
"""
try:
parsed = urlparse(url)
except Exception:
Expand All @@ -610,50 +620,76 @@ def resolve_and_validate_target(url: str) -> Tuple[bool, str]:
if port is not None and port not in settings.notification_allowed_ports:
return False, f"Port {port} not in allowed ports: {settings.notification_allowed_ports}"

# Reject raw IP addresses in webhook URLs
# Reject raw IPs
try:
ipaddress.ip_address(hostname)
return False, "Raw IP addresses are not allowed in webhook URLs; use a hostname"
except ValueError:
pass

# Resolve hostname to IPs
try:
resolved = socket.getaddrinfo(hostname, port or 443, proto=socket.IPPROTO_TCP)
except OSError:
return False, f"Could not resolve hostname: {hostname}"

for family, _socktype, _proto, _canonname, sockaddr in resolved:
def resolve_once():
try:
ip = ipaddress.ip_address(sockaddr[0])
except ValueError:
continue
return socket.getaddrinfo(hostname, port or 443, proto=socket.IPPROTO_TCP)
except OSError:
return []

# Check against blocked ranges
for blocked_cidr in settings.notification_blocked_ip_ranges:
def extract_ips(resolved):
ips = []
for _, _, _, _, sockaddr in resolved:
try:
blocked_net = ipaddress.ip_network(blocked_cidr, strict=False)
if ip in blocked_net:
return False, f"Resolved IP {ip} falls in blocked range {blocked_cidr}"
ips.append(ipaddress.ip_address(sockaddr[0]))
except ValueError:
continue
return ips

# Check allowed ranges if configured
if settings.notification_allowed_ip_ranges:
in_allowed = False
for allowed_cidr in settings.notification_allowed_ip_ranges:
def validate_ips(ips):
if not ips:
return False, "Hostname did not resolve to any IPs in safe mode (SecuScan Guardrail)"

for ip in ips:
ip_net = ipaddress.ip_network(ip, strict=False)

# blocked ranges
for blocked_cidr in settings.notification_blocked_ip_ranges:
try:
allowed_net = ipaddress.ip_network(allowed_cidr, strict=False)
if ip in allowed_net:
in_allowed = True
break
blocked_net = ipaddress.ip_network(blocked_cidr, strict=False)
if ip in blocked_net:
return False, "blocked"
except ValueError:
continue
if not in_allowed:
return False, f"Resolved IP {ip} not in allowed ranges"

return True, ""
# allowed ranges (if set)
if settings.notification_allowed_ip_ranges:
allowed = False
for allowed_cidr in settings.notification_allowed_ip_ranges:
try:
allowed_net = ipaddress.ip_network(allowed_cidr, strict=False)
if ip in allowed_net:
allowed = True
break
except ValueError:
continue
if not allowed:
return False, "blocked"

return True, ""

# --- FIRST RESOLUTION ---
resolved1 = extract_ips(resolve_once())
ok, err = validate_ips(resolved1)
if not ok:
return False, err

# --- DNS REBINDING CHECK ---
if settings.dns_rebind_check:
resolved2 = extract_ips(resolve_once())
union = list({*resolved1, *resolved2})

ok, err = validate_ips(union)
if not ok:
return False, "blocked"

return True, ""

def validate_command_network_egress(command: list[str], safe_mode: bool, plugin_id: str, task_id: str) -> Tuple[bool, str]:
"""
Expand Down
Loading
Loading