From 7fd34af39a9a8b0e2910968bc2af9d08a7453fb0 Mon Sep 17 00:00:00 2001 From: Antu7 Date: Tue, 24 Feb 2026 01:38:52 +0600 Subject: [PATCH 1/2] feat: Enhance Nmap scanning with faster options, improved vulnerability parsing, a comprehensive risk scoring system, and a new dedicated report generation module. --- .../Built_In_Automation/Security/nmap_scan.py | 818 ++++++++---------- server/main.py | 2 + server/reports.py | 57 ++ 3 files changed, 431 insertions(+), 446 deletions(-) create mode 100644 server/reports.py diff --git a/Framework/Built_In_Automation/Security/nmap_scan.py b/Framework/Built_In_Automation/Security/nmap_scan.py index 0c046ac24..fd60861ca 100644 --- a/Framework/Built_In_Automation/Security/nmap_scan.py +++ b/Framework/Built_In_Automation/Security/nmap_scan.py @@ -14,7 +14,7 @@ def run_nmap(ip, output_dir=None): xml_output_file = os.path.join(output_dir, f"nmap_scan_{ip}.xml") normal_output_file = os.path.join(output_dir, f"nmap_scan_{ip}.txt") - process = subprocess.Popen(["nmap", "-sV", "--script", "vuln", "-oX", xml_output_file, "-oN", normal_output_file, ip], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + process = subprocess.Popen(["nmap", "-sV", "-T4", "--open", "--min-rate", "1000", "--script", "vuln", "-oX", xml_output_file, "-oN", normal_output_file, ip], stdout=subprocess.PIPE, stderr=subprocess.PIPE) spinner = ['|', '/', '-', '\\'] start_time = datetime.now() @@ -111,13 +111,38 @@ def parse_nmap_output(xml_file): script_id = script.get("id") script_output = script.get("output") - if "CVE" in script_output or "EXPLOIT" in script_output: + # Check for vulnerability indicators + is_vulnerable = "VULNERABLE" in script_output or "CVE" in script_output or "EXPLOIT" in script_output or "vulnerable" in script_output.lower() + + if is_vulnerable: cve_matches = re.findall(r'(CVE-\d{4}-\d+)', script_output) severity_matches = re.findall(r'(\d\.\d)', script_output) + + # Extract description description_match = re.search(r'VULNERABLE:\s*(.*?)(?=\n\n|\n\s*\|\s*|\Z)', script_output, re.DOTALL) - description = description_match.group(1).strip() if description_match else "No description available" + if not description_match: + # Fallback to just the output if regex fails + description = script_output[:200] + "..." if len(script_output) > 200 else script_output + else: + description = description_match.group(1).strip() - for i in range(len(cve_matches)): + # Logic to handle cases with CVEs and without + if cve_matches: + for i in range(len(cve_matches)): + vulnerabilities.append({ + "ip": ip_address, + "hostname": hostname, + "port": port_id, + "protocol": protocol, + "state": state, + "service": service_info, + "script": script_id, + "cve": cve_matches[i], + "severity": float(severity_matches[i]) if i < len(severity_matches) else 5.0, # Default to medium if severity not found + "description": description + }) + else: + # No CVE found but it is a vulnerability vulnerabilities.append({ "ip": ip_address, "hostname": hostname, @@ -126,8 +151,8 @@ def parse_nmap_output(xml_file): "state": state, "service": service_info, "script": script_id, - "cve": cve_matches[i], - "severity": float(severity_matches[i]) if i < len(severity_matches) else 0, + "cve": "N/A", + "severity": 5.0, # Default risk for unknown vulnerabilities "description": description }) @@ -146,34 +171,43 @@ def generate_html(vulnerabilities, scan_info, target_ip, output_dir=None): "None (0.0)": 0 } + # Calculate Severity Counts for v in vulnerabilities: - if v["severity"] >= 8.0: + # Ensure severity is a float + try: + sev = float(v.get("severity", 0)) + except (ValueError, TypeError): + sev = 0.0 + + if sev >= 8.0: severity_counts["Critical (8.0-10.0)"] += 1 - elif v["severity"] >= 6.0: + elif sev >= 6.0: severity_counts["High (6.0-7.9)"] += 1 - elif v["severity"] >= 4.0: + elif sev >= 4.0: severity_counts["Medium (4.0-5.9)"] += 1 - elif v["severity"] > 0: + elif sev > 0: severity_counts["Low (0.1-3.9)"] += 1 else: severity_counts["None (0.0)"] += 1 + # JSON Data for Charts severity_data = [{"level": k, "count": v} for k, v in severity_counts.items()] json_severity_data = json.dumps(severity_data) - chart_data = [{"cve": v["cve"], "severity": v["severity"]} for v in vulnerabilities] + chart_data = [{"cve": v.get("cve", "N/A"), "severity": v.get("severity", 0)} for v in vulnerabilities] json_data = json.dumps(chart_data) service_counts = {} for v in vulnerabilities: - service_name = v["service"].split(" ")[0] + service_name = v.get("service", "Unknown").split(" ")[0] service_counts[service_name] = service_counts.get(service_name, 0) + 1 service_data = [{"service": k, "count": v} for k, v in service_counts.items()] json_service_data = json.dumps(service_data) port_counts = {} for v in vulnerabilities: - port_counts[v["port"]] = port_counts.get(v["port"], 0) + 1 + port_val = v.get("port", "Unknown") + port_counts[port_val] = port_counts.get(port_val, 0) + 1 port_data = [{"port": k, "count": v} for k, v in port_counts.items()] json_port_data = json.dumps(port_data) @@ -183,81 +217,126 @@ def generate_html(vulnerabilities, scan_info, target_ip, output_dir=None): # Calculate scan duration if available scan_duration = "N/A" if "start_time" in scan_info and "end_time" in scan_info: - start_time = datetime.strptime(scan_info["start_time"], "%Y-%m-%d %H:%M:%S") - end_time = datetime.strptime(scan_info["end_time"], "%Y-%m-%d %H:%M:%S") - duration = end_time - start_time - scan_duration = f"{duration.seconds // 60} minutes, {duration.seconds % 60} seconds" + try: + start_time = datetime.strptime(scan_info["start_time"], "%Y-%m-%d %H:%M:%S") + end_time = datetime.strptime(scan_info["end_time"], "%Y-%m-%d %H:%M:%S") + duration = end_time - start_time + scan_duration = f"{duration.seconds // 60}m {duration.seconds % 60}s" + except Exception: + pass - # Calculate risk score (fixed to be between 0-100) - total_severity = sum(v["severity"] for v in vulnerabilities) + # Calculate risk score (0-100) vuln_count = len(vulnerabilities) + is_clean = vuln_count == 0 + + risk_score = 0 + risk_level = "Secure" + risk_color = "#10b981" # Green - if vuln_count > 0: - # Base score on average severity and number of vulnerabilities + if not is_clean: + total_severity = sum(float(v.get("severity", 0)) for v in vulnerabilities) avg_severity = total_severity / vuln_count - # Scale from 0-10 to 0-70 (severity component) severity_component = (avg_severity / 10) * 70 - # Scale count component (max out at 20 vulnerabilities) count_component = min(vuln_count / 20, 1) * 30 risk_score = round(severity_component + count_component) - # Ensure score is capped at 100 risk_score = min(risk_score, 100) - else: - risk_score = 0 - + + if risk_score >= 80: + risk_level = "Critical" + risk_color = "#dc3545" + elif risk_score >= 60: + risk_level = "High" + risk_color = "#f59e0b" + elif risk_score >= 40: + risk_level = "Medium" + risk_color = "#3b82f6" + else: + risk_level = "Low" + risk_color = "#10b981" + + # Network Info Rows network_info_rows = "" for info in scan_info.get("network_info", []): network_info_rows += f""" - {info['type']} - {info['address']} - {info['vendor']} + {info.get('type', 'Unknown')} + {info.get('address', 'N/A')} + {info.get('vendor', '')} """ + + # Vulnerability Table Rows table_rows = "" - for v in sorted(vulnerabilities, key=lambda x: x['severity'], reverse=True): - if v['severity'] >= 8.0: - severity_class = "critical" - elif v['severity'] >= 6.0: - severity_class = "high" - elif v['severity'] >= 4.0: - severity_class = "medium" - elif v['severity'] > 0: - severity_class = "low" + sorted_vulns = sorted(vulnerabilities, key=lambda x: float(x.get('severity', 0)), reverse=True) + + for v in sorted_vulns: + sev = float(v.get('severity', 0)) + if sev >= 8.0: + severity_class = "severity-critical" + sev_label = "CRITICAL" + elif sev >= 6.0: + severity_class = "severity-high" + sev_label = "HIGH" + elif sev >= 4.0: + severity_class = "severity-medium" + sev_label = "MEDIUM" + elif sev > 0: + severity_class = "severity-low" + sev_label = "LOW" else: - severity_class = "none" + severity_class = "severity-none" + sev_label = "INFO" + + cve_display = v.get('cve', 'N/A') + cve_link = f'{cve_display}' if cve_display != "N/A" else "N/A" table_rows += f""" - {v['ip']} - {v['hostname']} - {v['port']}/{v['protocol']} - {v['service']} - {v['cve']} + {sev} + +
{v.get('service', 'Unknown')}
+
{v.get('port', 'N/A')}/{v.get('protocol', 'tcp')}
+ + {cve_link} - - {v['severity']} - +
{v.get('description', 'No description')}
+
Script: {v.get('script', 'Unknown')}
- {v['description']} """ - + + # HTML Template Construction html_template = f""" - Security Vulnerability Report + Security Scan Report - {target_ip} + + + + +
+
+ +
+
{scan_date}
+
Duration: {scan_duration}
+
+
+
+
-
-

Security Vulnerability Report

-

Target: {target_ip}

+ +
+ {'
' if is_clean else f'
!
'} +
+ {'No Security Risks Found' if is_clean else f'{vuln_count} Security Risks Detected'} +
+
+ {'Great job! No known vulnerabilities were detected on the target system.' if is_clean else f'The scan identified potential vulnerabilities with a Risk Score of {risk_score}/100. Immediate attention is recommended for Critical and High severity issues.'} +
-
- Scan Date: {scan_date} | - Scan Duration: {scan_duration} | - Total Hosts Scanned: {scan_info.get("total_hosts", "N/A")} | - Hosts Up: {scan_info.get("up_hosts", "N/A")} | - Report Generated: {current_datetime} -
- -
-

Network Information

- - - - - - - {network_info_rows} -
TypeAddressVendor/Info
-
- -
-
-

Total Vulnerabilities

-

{len(vulnerabilities)}

+ {'' if not is_clean else ''} + {f''' +
+
+
{severity_counts["Critical (8.0-10.0)"]}
+
Critical
-
-

Risk Score

-

{risk_score}/100

-
-
-
+
+
{severity_counts["High (6.0-7.9)"]}
+
High
+
+
+
{severity_counts["Medium (4.0-5.9)"]}
+
Medium
-
-

Critical Vulnerabilities

-

{severity_counts["Critical (8.0-10.0)"]}

+
+
{severity_counts["Low (0.1-3.9)"]}
+
Low
-

Vulnerability Overview

-
-
- +
Vulnerability Analysis
+
+
+
-
+
-
- -
+ ''' if not is_clean else ''} +
-

Top Vulnerabilities by Severity

-
- -
+
Network Information
+ + + + + + + + + + {network_info_rows} + +
TypeAddressVendor / Info
+ {'' if not is_clean else ''} + {f'''
-

Vulnerability Details

-
- +
Detailed Findings
+
+ - - - - - - + + + + + {table_rows} -
IP AddressHostnamePort/ProtocolServiceCVESeveritySevServiceCVE Description
-
+ +
+ ''' if not is_clean else ''}
-

This report was automatically generated and provides a summary of potential security vulnerabilities. - All findings should be verified by a security professional.

+

Generated by Zeuz Security Automation Framework • {current_datetime}

- + @@ -742,7 +668,7 @@ def generate_html(vulnerabilities, scan_info, target_ip, output_dir=None): output_file = os.path.join(output_dir, f"security_report_{target_ip}.html") with open(output_file, "w") as f: f.write(html_template) - + print(f"Enhanced security report saved: {output_file}") return output_file diff --git a/server/main.py b/server/main.py index 8098c5ae2..d5fd12b10 100644 --- a/server/main.py +++ b/server/main.py @@ -10,6 +10,7 @@ from server.node_operator import router as operator_router from server.mobile import router as mobile_router, upload_android_ui_dump from server.mac import router as mac_router +from server.reports import router as security_reports_router from server.linux import router as linux_router from server.installers import router as installers_router import asyncio @@ -42,6 +43,7 @@ def main() -> FastAPI: v1router.include_router(evaluator_router) v1router.include_router(mobile_router) v1router.include_router(mac_router) + v1router.include_router(security_reports_router) v1router.include_router(linux_router) v1router.include_router(installers_router) app = FastAPI() diff --git a/server/reports.py b/server/reports.py new file mode 100644 index 000000000..2bafe5766 --- /dev/null +++ b/server/reports.py @@ -0,0 +1,57 @@ +import os +import glob +from pathlib import Path +from fastapi import APIRouter +from fastapi.responses import FileResponse, JSONResponse +from typing import Optional + +router = APIRouter(prefix="/debug/reports/security", tags=["security-reports"]) + +# Resolve the Node's root directory relative to THIS file's location +# This file lives at: /server/reports.py +NODE_ROOT = Path(__file__).resolve().parent.parent +AUTO_LOG_DIR = NODE_ROOT / "AutomationLog" + + +@router.get("") +def get_security_report(file_path: Optional[str] = None): + """ + Security report downloader for the ZeuZ Node. + + - file_path (optional): exact path of the report file (absolute OR relative to a run dir). + If not given, the newest HTML file in the security_report folder is returned. + """ + try: + # ── Case 1: caller supplied an explicit file path ────────────────────── + if file_path: + p = Path(file_path) + + # Absolute path provided + if p.is_absolute() and p.is_file(): + return FileResponse(str(p)) + + # Relative path – search across every debug run folder (newest first) + run_dirs = sorted( + AUTO_LOG_DIR.glob("debug_*/session_*/*"), + key=lambda d: d.stat().st_mtime, + reverse=True, + ) + for run_dir in run_dirs: + candidate = run_dir / file_path + if candidate.is_file(): + return FileResponse(str(candidate)) + + # ── Case 2: auto-find the newest HTML in any security_report folder ── + candidates = list(AUTO_LOG_DIR.glob("debug_*/session_*/*/security_report/*.html")) + + if not candidates: + return JSONResponse( + {"error": f"No security report found. Searched in: {AUTO_LOG_DIR}"}, + status_code=404, + ) + + latest_report = max(candidates, key=lambda f: f.stat().st_mtime) + return FileResponse(str(latest_report)) + + except Exception as e: + return JSONResponse({"error": str(e)}, status_code=500) From b158cf78791b240f7a74ad5c61790463c0cf8065 Mon Sep 17 00:00:00 2001 From: Antu7 Date: Thu, 26 Feb 2026 00:35:45 +0600 Subject: [PATCH 2/2] Cryptography Security Testing added --- .../Built_In_Automation/Security/nmap_scan.py | 342 +++++++++++++++++- 1 file changed, 339 insertions(+), 3 deletions(-) diff --git a/Framework/Built_In_Automation/Security/nmap_scan.py b/Framework/Built_In_Automation/Security/nmap_scan.py index fd60861ca..a0b190123 100644 --- a/Framework/Built_In_Automation/Security/nmap_scan.py +++ b/Framework/Built_In_Automation/Security/nmap_scan.py @@ -8,6 +8,10 @@ from pathlib import Path import shutil from datetime import datetime, timedelta +import socket +import ssl +import urllib.request +import urllib.parse def run_nmap(ip, output_dir=None): os.makedirs(output_dir, exist_ok=True) @@ -160,7 +164,7 @@ def parse_nmap_output(xml_file): return vulnerabilities, scan_info -def generate_html(vulnerabilities, scan_info, target_ip, output_dir=None): +def generate_html(vulnerabilities, scan_info, target_ip, output_dir=None, crypto_data=None): """Generate HTML report for security vulnerabilities.""" severity_counts = { @@ -595,6 +599,8 @@ def generate_html(vulnerabilities, scan_info, target_ip, output_dir=None):
''' if not is_clean else ''} + {get_crypto_html_snippets(crypto_data) if crypto_data else ''} +

Generated by Zeuz Security Automation Framework • {current_datetime}

@@ -675,7 +681,11 @@ def generate_html(vulnerabilities, scan_info, target_ip, output_dir=None): def nmap_scan_run(url, security_report_dir=None): ip_address = url - security_report_dir.mkdir(parents=True, exist_ok=True) + if hasattr(security_report_dir, 'mkdir'): + security_report_dir.mkdir(parents=True, exist_ok=True) + elif security_report_dir: + os.makedirs(security_report_dir, exist_ok=True) + print(f"Saving all reports directly to: {security_report_dir}") print("Running Nmap scan. It may take a while...") xml_result, text_result = run_nmap(ip_address, security_report_dir) @@ -686,11 +696,337 @@ def nmap_scan_run(url, security_report_dir=None): print("Parsing results...") vuln_data, scan_info = parse_nmap_output(xml_result) + print("Running Cryptography & Surface Level scan (Background)...") + crypto_data = get_cryptography_data(ip_address) + print("Generating HTML report...") - html_result = generate_html(vuln_data, scan_info, ip_address, security_report_dir) + html_result = generate_html(vuln_data, scan_info, ip_address, security_report_dir, crypto_data) return { "xml": xml_result, "txt": text_result, + "html": html_result, + "crypto": crypto_data + } + +def get_crypto_html_snippets(crypto_data): + ssl_info = crypto_data.get('ssl', {}) + headers_info = crypto_data.get('headers', {}) + banner_info = crypto_data.get('banner', {}) + whois_info = crypto_data.get('whois', {}) + geo_info = crypto_data.get('geo', {}) + osint_info = crypto_data.get('osint', {}) + + def badge(value, expected=None, good='Good', is_tls=False): + if is_tls: + if 'Good' in value: return f'{value}' + else: return f'{value}' + + if isinstance(value, str) and value.lower() in ['missing', 'invalid', 'hidden', 'unknown', 'true', 'false']: + if value.lower() == 'true' and expected is False: + return f'Yes' + elif value.lower() == 'false' and expected is False: + return f'No' + elif value.lower() == 'missing' or (value.lower() == 'true'): + return f'{value}' + return f'{value}' + + if expected is not None: + if value == expected: + return f'{good}' + else: + if ssl_info.get('blocked'): + return f'Domain Blocked' + return f'High Risk' + + return f'{value}' + + tls_versions_html = "" + for tls_ver, strength in ssl_info.get("tls_versions", []): + tls_versions_html += f"
{tls_ver}: {badge(strength, is_tls=True)}
" + if not tls_versions_html: + tls_versions_html = "
Unknown - Detection failed
" + + ssl_error_html = f" ({ssl_info.get('error', 'Validation Failed')})" if not ssl_info.get('valid') and ssl_info.get('error') else "" + + osint_html = "" + if osint_info.get('subdomains'): + osint_html = f""" +
+
OSINT & Web Footprint
+ + + + +
Public Subdomains (Top 15 via crt.sh & HackerTarget){'
'.join(f"{s}" for s in osint_info['subdomains'])}
+
""" + + return f""" +
+
SSL / TLS Configuration
+ + + + + + + + + + +
Valid Certificate{badge(ssl_info.get('valid', 'Unknown'), expected=True, good='Yes')}{ssl_error_html}
Expired{badge(str(ssl_info.get('expired', 'Unknown')), expected=False, good='No')}
Self-Signed{badge(str(ssl_info.get('self_signed', 'Unknown')), expected=False, good='No')}
Wrong Hostname{badge(str(ssl_info.get('wrong_hostname', 'Unknown')), expected=False, good='No')}
Issuer{ssl_info.get('issuer', 'Unknown')}
Expiry Date{ssl_info.get('not_after', 'Unknown')}
Supported TLS Versions{tls_versions_html}
+
+ +
+
Security Headers
+ + + + + + +
Strict-Transport-Security{badge(headers_info.get('Strict-Transport-Security', 'Missing'))}
Content-Security-Policy{badge(headers_info.get('Content-Security-Policy', 'Missing'))}
X-Content-Type-Options{badge(headers_info.get('X-Content-Type-Options', 'Missing'))}
+
+ +
+
Server Banner Information
+ + + + + +
Server{banner_info.get('Server', 'Hidden')}
X-Powered-By{banner_info.get('X-Powered-By', 'Hidden')}
+
+ +
+
WHOIS Information
+ + + + + + +
Registrar{whois_info.get('Registrar', 'Unknown')}
Creation Date{whois_info.get('Creation Date', 'Unknown')}
Expiry Date{whois_info.get('Expiry Date', 'Unknown')}
+
+ +
+
IP Geolocation
+ + + + + + +
IP Address{geo_info.get('IP', 'Unknown')}
Country{geo_info.get('Country', 'Unknown')}
Hosting Provider / ISP{geo_info.get('Hosting Provider', 'Unknown')}
+
+ {osint_html} +""" + +def generate_crypto_html(report_data, target, output_dir=None): + current_datetime = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + + domain = report_data.get('domain', target) + + html_template = f""" + + + + + + Cryptography Scan Report - {domain} + + + + +
+
+ +
+
{current_datetime}
+
+
+
+
+ {get_crypto_html_snippets(report_data)} + +
+ + + """ + + if output_dir: + os.makedirs(output_dir, exist_ok=True) + else: + output_dir = '.' + out_path = os.path.join(output_dir, f"crypto_report_{domain.replace('.', '_')}.html") + with open(out_path, "w", encoding="utf-8") as f: + f.write(html_template) + print(f"Enhanced cryptography report saved: {out_path}") + return out_path + + +def cryptography_scan_run(url, security_report_dir=None): + if hasattr(security_report_dir, 'mkdir'): + security_report_dir.mkdir(parents=True, exist_ok=True) + elif security_report_dir: + os.makedirs(security_report_dir, exist_ok=True) + + print(f"Running Cryptography scan for {url}...") + +def get_cryptography_data(url): + if not url.startswith("http"): + url = "https://" + url + + parsed = urllib.parse.urlparse(url) + domain = parsed.netloc.split(':')[0] + + report_data = { + "domain": domain, + "url": url, + "ssl": {"valid": False, "expired": False, "self_signed": False, "wrong_hostname": False, "tls_versions": []}, + "headers": {}, + "banner": {}, + "whois": {}, + "geo": {} + } + # 1. SSL Certificate Info + print(f"[{domain}] Fetching SSL/TLS Certificate information...", flush=True) + try: + context = ssl.create_default_context() + try: + with socket.create_connection((domain, 443), timeout=2) as sock: + with context.wrap_socket(sock, server_hostname=domain) as ssock: + cert = ssock.getpeercert() + report_data['ssl']['expired'] = ssl.cert_time_to_seconds(cert['notAfter']) < time.time() + + issuer = dict(x[0] for x in cert.get('issuer', [])) + subject = dict(x[0] for x in cert.get('subject', [])) + report_data['ssl']['self_signed'] = issuer == subject + + report_data['ssl']['valid'] = True + report_data['ssl']['not_after'] = cert['notAfter'] + report_data['ssl']['issuer'] = issuer.get('organizationName', issuer.get('commonName', 'Unknown')) + + # Check TLS Version while socket is open + ver = ssock.version() + if ver: + strength = 'Good' if ver in ['TLSv1.2', 'TLSv1.3'] else 'Weak' + report_data['ssl']['tls_versions'] = [(ver, strength)] + + except ssl.CertificateError as e: + report_data['ssl']['wrong_hostname'] = True + report_data['ssl']['valid'] = False + report_data['ssl']['error'] = str(e) + + # Reconnect without verification to get cert info anyway + ctx_no_verify = ssl._create_unverified_context() + with socket.create_connection((domain, 443), timeout=2) as sock: + with ctx_no_verify.wrap_socket(sock, server_hostname=domain) as ssock: + cert = ssock.getpeercert() + if cert: + report_data['ssl']['expired'] = ssl.cert_time_to_seconds(cert['notAfter']) < time.time() + issuer = dict(x[0] for x in cert.get('issuer', [])) + report_data['ssl']['not_after'] = cert['notAfter'] + report_data['ssl']['issuer'] = issuer.get('organizationName', issuer.get('commonName', 'Unknown')) + ver = ssock.version() + if ver: + strength = 'Good' if ver in ['TLSv1.2', 'TLSv1.3'] else 'Weak' + report_data['ssl']['tls_versions'] = [(ver, strength)] + except Exception as e: + report_data['ssl']['valid'] = 'Error' + report_data['ssl']['error'] = str(e) + if '[Errno 54]' in str(e) or 'timed out' in str(e).lower() or 'Connection refused' in str(e): + report_data['ssl']['blocked'] = True + print(f"[{domain}] WARNING: Access Blocked/Reset by Domain Firewall/WAF.", flush=True) + + # 2. HTTP Headers & Server Banner + print(f"[{domain}] Fetching HTTP Security Headers...", flush=True) + try: + ctx = ssl._create_unverified_context() + req = urllib.request.Request(url, headers={'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'}) + with urllib.request.urlopen(req, context=ctx, timeout=2) as resp: + headers = resp.headers + except urllib.error.HTTPError as e: + headers = e.headers + except Exception as e: + headers = {} + report_data['headers']['error'] = str(e) + + if headers: + report_data['headers'] = { + "Strict-Transport-Security": headers.get("Strict-Transport-Security", "Missing"), + "Content-Security-Policy": headers.get("Content-Security-Policy", "Missing"), + "X-Content-Type-Options": headers.get("X-Content-Type-Options", "Missing") + } + report_data['banner'] = { + "Server": headers.get("Server", "Hidden"), + "X-Powered-By": headers.get("X-Powered-By", headers.get("X-Powered-By-Plesk", "Hidden")) + } + + # 3. IP Geolocation ONLY + print(f"[{domain}] Fetching IP Geolocation...", flush=True) + try: + ip = socket.gethostbyname(domain) + geo_req = urllib.request.Request(f"http://ip-api.com/json/{ip}") + with urllib.request.urlopen(geo_req, timeout=2) as geo_resp: + geo_data = json.loads(geo_resp.read().decode()) + report_data['geo'] = { + "IP": ip, + "Country": geo_data.get("country", "Unknown"), + "Hosting Provider": geo_data.get("isp", geo_data.get("org", "Unknown")) + } + except Exception as e: + report_data['geo'] = {"IP": socket.gethostbyname(domain) if 'domain' in locals() else 'Unknown'} + + return report_data + + +def cryptography_scan_run(url, security_report_dir=None): + if hasattr(security_report_dir, 'mkdir'): + security_report_dir.mkdir(parents=True, exist_ok=True) + elif security_report_dir: + os.makedirs(security_report_dir, exist_ok=True) + + print(f"Running Cryptography scan for {url}...") + + report_data = get_cryptography_data(url) + domain = report_data.get("domain", url) + + html_result = generate_crypto_html(report_data, domain, security_report_dir) + print("Cryptography scan complete!") + return { + "report_data": report_data, "html": html_result } \ No newline at end of file