diff --git a/README.md b/README.md index d97f3ff..2f5d8ac 100644 --- a/README.md +++ b/README.md @@ -1,259 +1,66 @@ -

- Evilwaf Logo -

- +# EvilWAF -# Evilwaf 2.4.1 +EvilWAF is a transparent MITM proxy for authorized WAF testing and response analysis. +It supports HTTP/1.1 and HTTP/2, TLS interception, request mutation, and optional origin-IP reconnaissance. +## Authorized Use Only +Use this project only on systems where you have explicit, written permission to test. -# EvilWAF - Web Application Firewall Testing and Bypass toolkit - -**EvilWAF** is an advanced transparent MITM proxy designed for WAF bypass and detect common Web Application Firewalls (WAF). It supports multiple techniques for comprehensive security assessment. - - - -## Features - -- **Transparent MITM Proxy** — Works with any tool that supports `--proxy`. Zero configuration on tool side. -- **TCP Fingerprint Rotation** — Rotates TCP stack options per request to avoid behavioral detection. -- **TLS Fingerprint Rotation** — Rotates TLS fingerprint (JA3/JA4 style) paired with TCP profiles. -- **Tor IP Rotation** — Routes traffic through Tor and rotates exit IP every request automatically. - -- **Proxy pool IP Rotation** - rotates IP every request automatically through external proxy's - -- **Origin IP Hunter** — Discovers the real server IP behind the WAF using 10 parallel scanners: - - DNS history, SSL certificate analysis, subdomain enumeration - - DNS misconfiguration, cloud leak detection, GitHub leak search - - HTTP header leak, favicon hash, ASN range scan, Censys -- **Auto WAF Detection** — Detects WAF vendor automatically before bypass starts. -- **Direct Origin Bypass** — Once real IP is found, routes all traffic directly to the server, skipping the WAF entirely. -- **Full HTTPS MITM** — Intercepts and inspects HTTPS traffic with dynamic certificate generation per host. -- **HTTP/2 & HTTP/1.1 Support** — Negotiates ALPN automatically and handles both protocols. -- **TUI Dashboard** — Real-time terminal UI showing traffic, techniques, Tor IPs, and bypass results. -- **Headless Mode** — `--no-tui` flag for scripting and CI/CD pipelines. -- **Response Advisor** — Automatically retries on WAF blocks (403, 429, 503) with different techniques. - - -

- Screenshot -

- - - - -

- Screenshot -

- - - - - - - -## Disclaimer - -**Important: Read This Before Using EvilWAF** -- This tool is designed for **authorized security testing only** -- You must have **explicit permission** to test the target systems -- Intended for **educational purposes**, **security research**, and **authorized penetration testing** -- **Not for malicious or illegal activities** - -### Legal Compliance: -- Users are solely responsible for how they use this tool -- The developers are **not liable** for any misuse or damage caused -- Ensure compliance with local, state, and federal laws - - -[Website](https://securitytrails.com/) -**Features:** -- Historical DNS records -- IP history for domains -- Subdomain enumeration -- Free tier available -**Usage:** Search for domain → View DNS History - - - -[Website](https://viewdns.info/) -**Features:** -- IP History lookup -- DNS record history -- Reverse IP lookup -- Completely free -**Tools:** -- IP History: https://viewdns.info/iphistory/ -- Reverse IP: https://viewdns.info/reverseip/ - - -[Website]( https://dnslytics.com/) -**Features:** -- Historical DNS data -- Reverse IP lookup -- Domain history -- Free limited queries -- - -[Website]( https://www.whoxy.com/) -**Features:** -- Reverse IP lookup -- Historical WHOIS -- Free API limited. - - -##support -I DO NOT offer support for provide illigal issue but I will help you to reach your goal - - -[linkedin](https://www.linkedin.com/in/matrix-leons-77793a340) - - -**evilwaf** is made by matrix leons - - - - - -## 💥Show Your Support -If this program has been helpful to you and see the problems please consider giving us the feedback - - - - -## CA Certificate Setup (Required for HTTPS) - +## Quickstart (60s) ```bash -EvilWAF generates a local CA to intercept HTTPS traffic. You need to trust it once. - -# Run EvilWAF first — CA is auto-generated at startup -# Then find the cert: -ls /tmp/evilwaf_ca_*/evilwaf-ca.pem - -# Linux — trust system-wide -sudo cp /tmp/evilwaf_ca_*/evilwaf-ca.pem /usr/local/share/ca-certificates/evilwaf-ca.crt -sudo update-ca-certificates - -# macOS -sudo security add-trusted-cert -d -r trustRoot -k /Library/Keychains/System.keychain /tmp/evilwaf_ca_*/evilwaf-ca.pem - -For tools like sqlmap, pass --verify-ssl=false or use the --no-check-certificate equivalent for your tool. +python3 -m venv .venv +source .venv/bin/activate +pip install -r requirements.lock +python3 evilwaf.py -t https://example.com --no-tui +python3 evilwaf.py -t https://example.com --no-tui --record-limit 5000 ``` - - -## Installation - - +## Core Features +- Transparent proxy for tools that support `--proxy` +- HTTPS MITM with on-the-fly certificates +- TCP/TLS fingerprint rotation +- Optional Tor/proxy rotation +- WAF signature-based detection +- Origin IP hunting (multiple scanners with confidence ranking) + +## Project Layout +- `evilwaf.py` - CLI entrypoint and app wiring +- `core/` - interception engine, protocol handling, WAF detector +- `chemistry/` - rotation and origin-recon modules +- `tests/` - `unittest` suite +- `.github/workflows/` - CI, CodeQL, release automation + +## Development ```bash - -# 1. Create virtual environment -python3 -m venv myenv - -# 2. Activate virtual environment -source myenv/bin/activate - - -git clone https://github.com/matrixleons/evilwaf.git - -cd evilwaf - -pip3 install -r requirements.txt - -python3 evilwaf.py -h - - -Docker Installation - -docker build -t evilwaf . -docker run -it evilwaf -t example.com +python3 -m unittest discover -s tests -v +coverage run --source=core,chemistry -m unittest discover -s tests -v +coverage report -m --fail-under=100 +python3 benchmarks/proxy_benchmark.py --proxy http://127.0.0.1:8080 --target http://127.0.0.1:18080 ``` -## Usage - -```bash -Basic — Standard Proxy Mode -python3 evilwaf.py -t https://target.com - -Auto-Hunt Origin IP Behind WAF -python3 evilwaf.py -t https://target.com --auto-hunt - -EvilWAF will run 10 scanners in parallel, rank candidates by confidence, then ask: - [?] Use 1.2.3.4 as origin IP for bypass? [y/n]:If you confirm, all traffic goes directly to the real server IP, bypassing the WAF completely - -Manual Origin IP (If You Already Know It) -python3 evilwaf.py -t https://target.com --server-ip 1.2.3.4 - -With Tor IP Rotation when tor is running - -python3 evilwaf.py -t https://target.com --enable-tor - -Headless Mode (No TUI) -python3 evilwaf.py -t https://target.com --no-tui - -Upstream Proxy (route through external proxy) -python3 evilwaf.py -t https://target.com --upstream-proxy socks5://127.0.0.1:1080 -python3 evilwaf.py -t https://target.com --upstream-proxy http://user:pass@proxy.com:8080 -python3 evilwaf.py -t https://target.com --proxy-file proxies.txt - -Custom Listen Address and Port -python3 evilwaf.py -t https://target.com --listen-host 0.0.0.0 --listen-port 9090 - - -Connecting Your ToolOnce EvilWAF is running, point any tool to it via proxy: - - # sqlmap -sqlmap -u "https://target.com/page?id=1" --proxy=http://127.0.0.1:8080 --ignore-proxy=False - -# ffuf -ffuf -u https://target.com/FUZZ -x http://127.0.0.1:8080 - -# nuclei -nuclei -u https://target.com -proxy http://127.0.0.1:8080 - -# curl (for testing) -curl -x http://127.0.0.1:8080 https://target.com - -API Keys (Optional) -Set these as environment variables to unlock more origin IP scanners: - -export SHODAN_API_KEY="your_key" -export SECURITYTRAILS_API_KEY="your_key" -export VIRUSTOTAL_API_KEY="your_key" -export CENSYS_API_ID="your_id" -export CENSYS_API_SECRET="your_secret" - -Without API keys, EvilWAF still runs using free sources (DNS history, SSL certs, HTTP headers, favicon hash, subdomain enum). -``` +Quality/security checks run in CI: +- Ruff (`F`, `E9`) +- Black formatting check (selected modules) +- Mypy strict check (selected modules) +- `pip-audit` on locked dependencies +- performance budget checks (`.github/workflows/performance.yml`) + +Memory safety: +- `--record-limit` bounds in-memory traffic records (minimum enforced value: `1000`). +- `--record-spool-file /path/to/records.jsonl` writes evicted records to JSONL once the in-memory cap is reached. +- `--record-spool-max-mb` rotates and compresses spool archives (`.1.gz`) after size threshold. + +## Architecture (High Level) +1. Client connects to local proxy. +2. HTTP traffic is forwarded and normalized by `core/interceptor.py`. +3. HTTPS `CONNECT` can be tunneled or intercepted with generated certificates. +4. Request/response records are scored for pass/block behavior. +5. Optional chemistry modules apply TCP/TLS/Tor/proxy rotation and origin discovery. + +## Troubleshooting +- TLS certificate errors: trust the generated CA certificate. +- Tor rotation not active: verify Tor control port/password configuration. +- Coverage/CI mismatch: regenerate lockfile and run tests from a clean virtualenv. ## Contributing - -Contributions are welcome. EvilWAF is growing and there are many areas to improve. -How to Contribute - -# Clone your fork -git clone https://github.com/matrixleons/evilwaf/fork -* Create your feature branch: ``git checkout -b my-new-feature`` -* Commit your changes: ``git commit -am 'Add some feature'`` -* Push to the branch: ``git push origin my-new-feature`` -* Submit a pull request! - -# Guidelines -Keep code clean and consistent with existing style -Test your changes before submitting a PR - -Do not create technique which modify the body include headers, payloads and cookies - -Open an issue first for large changes so we can discuss - -# License -Licensed under the Apache License, Version 2.0 - - - - +Open PRs against `dev`. Include test evidence and risk notes for networking/TLS changes. diff --git a/core/interceptor.py b/core/interceptor.py index 8f56e6e..0195859 100644 --- a/core/interceptor.py +++ b/core/interceptor.py @@ -2,22 +2,21 @@ import contextlib import datetime +import gzip import ipaddress +import json import os -import queue -import random import re import select import socket import ssl -import struct import tempfile import threading import time -from dataclasses import dataclass, field +from collections import deque from http.server import BaseHTTPRequestHandler, HTTPServer from socketserver import ThreadingMixIn -from typing import Any, Callable, Dict, List, Optional, Tuple +from typing import Any, Callable, Deque, Dict, List, Optional, Tuple from urllib.parse import parse_qs, urlparse from cryptography import x509 @@ -33,80 +32,40 @@ import h2.events import h2.exceptions H2_AVAILABLE = True -except ImportError: - H2_AVAILABLE = False +except ImportError: # pragma: no cover - optional dependency + H2_AVAILABLE = False # pragma: no cover try: - import aioquic.quic.configuration - import aioquic.quic.connection - import aioquic.quic.events - import asyncio + import aioquic # noqa: F401 # pragma: no cover AIOQUIC_AVAILABLE = True -except ImportError: - AIOQUIC_AVAILABLE = False +except Exception: # pragma: no cover - optional dependency + AIOQUIC_AVAILABLE = False # pragma: no cover from chemistry.tcp_options import TCPOptionsManipulator from chemistry.tls_rotator import TLSFingerprinter from chemistry.tor_rotator import TorRotator from chemistry.proxy_rotator import ProxyRotator - - -@dataclass -class InterceptedRequest: - method: str = "GET" - url: str = "" - path: str = "" - host: str = "" - port: int = 80 - headers: dict = field(default_factory=dict) - body: bytes = b"" - cookies: dict = field(default_factory=dict) - query_params: dict = field(default_factory=dict) - timestamp: float = 0.0 - source_pid: Optional[int] = None - source_tool: Optional[str] = None - is_tunnel: bool = False - is_https: bool = False - sni_hostname: Optional[str] = None - tls_version: Optional[str] = None - http_version: str = "1.1" - - -@dataclass -class InterceptedResponse: - status_code: int = 0 - status_text: str = "" - headers: dict = field(default_factory=dict) - body: bytes = b"" - cookies: dict = field(default_factory=dict) - response_time: float = 0.0 - timestamp: float = 0.0 - is_https: bool = False - tls_version: Optional[str] = None - http_version: str = "1.1" - - -@dataclass -class ProxyRecord: - request: InterceptedRequest = field(default_factory=InterceptedRequest) - response: InterceptedResponse = field(default_factory=InterceptedResponse) - technique_applied: str = "" - passed: bool = False - blocked: bool = False - total_time: float = 0.0 - intercepted_https: bool = False - decryption_successful: bool = False - - -@dataclass -class AdvisorDecision: - action: str = "forward" - technique: str = "" - delay: float = 0.0 - rotate_ip: bool = False - reason: str = "" - forward_response: bool = True - next_protocol: Optional[str] = None +from core.models import AdvisorDecision, InterceptedRequest, InterceptedResponse, ProxyRecord +from core.pipeline import Forwarder, Magic, ResponseAdvisor + +__all__ = [ + "AdvisorDecision", + "InterceptedRequest", + "InterceptedResponse", + "ProxyRecord", + "CertificateAuthority", + "H2Connection", + "H1Parser", + "TLSContextFactory", + "MITMHandshaker", + "H2SessionHandler", + "ResponseAdvisor", + "Magic", + "Forwarder", + "ThreadedHTTPServer", + "Interceptor", + "create_interceptor", +] class CertificateAuthority: @@ -141,7 +100,7 @@ def _create_ca(self): x509.NameAttribute(NameOID.ORGANIZATION_NAME, "EvilWAF MITM Proxy"), x509.NameAttribute(NameOID.COMMON_NAME, "evilwaf-ca"), ]) - now = datetime.datetime.utcnow() + now = datetime.datetime.now(datetime.UTC).replace(tzinfo=None) ca_cert = ( x509.CertificateBuilder() .subject_name(subject) @@ -254,7 +213,7 @@ def _generate_host_certificate(self, hostname: str) -> Tuple[str, str]: except Exception: pass subject_attrs.append(x509.NameAttribute(NameOID.ORGANIZATION_NAME, "EvilWAF Proxy")) - now = datetime.datetime.utcnow() + now = datetime.datetime.now(datetime.UTC).replace(tzinfo=None) cert = ( x509.CertificateBuilder() .subject_name(x509.Name(subject_attrs)) @@ -557,10 +516,8 @@ class TLSContextFactory: ) @classmethod - def client_context(cls, alpn: List[str] = None) -> ssl.SSLContext: - ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) - ctx.check_hostname = False - ctx.verify_mode = ssl.CERT_NONE + def client_context(cls, alpn: Optional[List[str]] = None) -> ssl.SSLContext: + ctx = ssl.create_default_context(ssl.Purpose.SERVER_AUTH) ctx.minimum_version = ssl.TLSVersion.TLSv1_2 try: ctx.set_ciphers(cls.CIPHERS) @@ -571,11 +528,14 @@ def client_context(cls, alpn: List[str] = None) -> ssl.SSLContext: return ctx @classmethod - def server_context(cls, cert_path: str, key_path: str, alpn: List[str] = None) -> ssl.SSLContext: + def server_context( + cls, + cert_path: str, + key_path: str, + alpn: Optional[List[str]] = None, + ) -> ssl.SSLContext: ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) ctx.load_cert_chain(certfile=cert_path, keyfile=key_path) - ctx.check_hostname = False - ctx.verify_mode = ssl.CERT_NONE ctx.minimum_version = ssl.TLSVersion.TLSv1_2 try: ctx.set_ciphers(cls.CIPHERS) @@ -662,6 +622,7 @@ def __init__( records_list: List, records_lock: threading.Lock, is_waf_block: Callable, + record_sink: Optional[Callable[[ProxyRecord], None]] = None, ): self.client_tls = client_tls self.server_tls = server_tls @@ -673,6 +634,7 @@ def __init__( self.advisor = advisor self.records_list = records_list self.records_lock = records_lock + self.record_sink = record_sink self.is_waf_block = is_waf_block def _make_client_h2(self) -> Optional[H2Connection]: @@ -839,8 +801,11 @@ def pump_server(): decryption_successful=True, ) records.append(record) - with self.records_lock: - self.records_list.append(record) + if self.record_sink: + self.record_sink(record) + else: + with self.records_lock: + self.records_list.append(record) if self.callbacks.get("record"): self.callbacks["record"](record) try: @@ -965,8 +930,11 @@ def _handle_h1_to_h1(self) -> List[ProxyRecord]: decryption_successful=True, ) records.append(record) - with self.records_lock: - self.records_list.append(record) + if self.record_sink: + self.record_sink(record) + else: + with self.records_lock: + self.records_list.append(record) if self.callbacks.get("record"): self.callbacks["record"](record) @@ -1012,127 +980,6 @@ def _relay_raw(self) -> List[ProxyRecord]: return [] -class ResponseAdvisor: - ROTATE_ON = (429, 503, 509) - RETRY_TECH = (403, 406, 418) - PASS = (200, 201, 204, 301, 302, 304, 404) - - def __init__(self, magic: "Magic", max_retries: int = 3, retry_delay: float = 1.5): - self._magic = magic - self._max = max_retries - self._delay = retry_delay - self._counts: Dict[str, int] = {} - self._lock = threading.Lock() - - def advise(self, response: InterceptedResponse, request: InterceptedRequest, record: ProxyRecord) -> AdvisorDecision: - code = response.status_code - if code in self.PASS: - self._reset(request.host) - return AdvisorDecision(action="forward", reason=f"{code} pass") - if code in self.RETRY_TECH: - return self._retry(request, record, reason=f"{code} waf block") - if code in self.ROTATE_ON: - return self._rotate_and_retry(response, request, record) - return AdvisorDecision(action="forward", reason=f"{code} default") - - def _retry(self, request: InterceptedRequest, record: ProxyRecord, reason: str) -> AdvisorDecision: - if not self._has_left(request.host): - return AdvisorDecision(action="forward", reason="max retries") - self._inc(request.host) - return AdvisorDecision(action="retry", delay=self._delay, reason=reason, forward_response=False) - - def _rotate_and_retry(self, response: InterceptedResponse, request: InterceptedRequest, record: ProxyRecord) -> AdvisorDecision: - if not self._has_left(request.host): - return AdvisorDecision(action="forward", reason="max retries") - delay = self._get_delay(response) - self._inc(request.host) - return AdvisorDecision(action="rotate_and_retry", delay=delay, rotate_ip=True, reason="rate limited", forward_response=False) - - def _has_left(self, host: str) -> bool: - with self._lock: - return self._counts.get(host, 0) < self._max - - def _inc(self, host: str): - with self._lock: - self._counts[host] = self._counts.get(host, 0) + 1 - - def _reset(self, host: str): - with self._lock: - self._counts.pop(host, None) - - def _get_delay(self, response: InterceptedResponse) -> float: - ra = response.headers.get("retry-after", "").strip() - if ra.isdigit(): - return min(float(ra), 60.0) - return self._delay - - -class Magic: - def __init__(self, tcp: Optional[TCPOptionsManipulator] = None, tls: Optional[TLSFingerprinter] = None, tor: Optional[TorRotator] = None): - self._tcp = tcp or TCPOptionsManipulator() - self._tls = tls or TLSFingerprinter() - self._tor = tor or TorRotator() - self._lock = threading.Lock() - self._request_count = 0 - - def apply(self, technique: str = "") -> Dict[str, Any]: - with self._lock: - self._request_count += 1 - tcp_opts = self._tcp.per_request_options() - tls_sess, tls_id = self._tls.paired_with_tcp(tcp_opts.get("profile", "")) - result = { - "tcp": tcp_opts, - "tls": {"session": tls_sess, "identifier": tls_id}, - "tor": {}, - } - if technique == "ip_rotation" or self._tor.should_rotate(self._request_count): - if self._tor.is_tor_alive(): - ok, ip = self._tor.rotate_and_verify() - result["tor"] = {"active": ok, "ip": ip, "proxies": self._tor.get_proxy_dict()} - return result - - def _bind_to_tor(self) -> Dict[str, Any]: - if not self._tor.is_tor_alive(): - return {"active": False} - ok, ip = self._tor.rotate_and_verify() - return {"active": ok, "ip": ip, "proxies": self._tor.get_proxy_dict()} - - def error_solver(self, error: Exception, context: str = "") -> bool: - if isinstance(error, ssl.SSLError): - try: - self._tls.rotate() - except Exception: - pass - if isinstance(error, (ConnectionResetError, BrokenPipeError, TimeoutError)): - try: - self._tcp.rotate() - except Exception: - pass - return True - - -class Forwarder: - def forward(self, response: InterceptedResponse, handler: BaseHTTPRequestHandler) -> bool: - try: - if response.status_code == 0: - response.status_code = 502 - response.status_text = "Bad Gateway" - handler.send_response(response.status_code, response.status_text) - skip = {"transfer-encoding", "connection", "keep-alive"} - for k, v in response.headers.items(): - if k.lower() not in skip: - handler.send_header(k, v) - handler.send_header("Connection", "close") - if response.body: - handler.send_header("Content-Length", str(len(response.body))) - handler.end_headers() - if response.body and handler.command != "HEAD": - handler.wfile.write(response.body) - return True - except Exception: - return False - - class ThreadedHTTPServer(ThreadingMixIn, HTTPServer): daemon_threads = True allow_reuse_address = True @@ -1150,6 +997,9 @@ def __init__( override_ip: Optional[str] = None, target_host: Optional[str] = None, upstream_proxies: Optional[List[str]] = None, + record_limit: int = 20000, + record_spool_path: Optional[str] = None, + record_spool_max_bytes: int = 50 * 1024 * 1024, ): self._host = listen_host self._port = listen_port @@ -1157,13 +1007,26 @@ def __init__( self._running = False self._server: Optional[ThreadedHTTPServer] = None self._thread: Optional[threading.Thread] = None - self._records: List[ProxyRecord] = [] + self._records: Deque[ProxyRecord] = deque(maxlen=max(1000, record_limit)) self._records_lock = threading.Lock() + self._record_spool_path = record_spool_path + self._record_spool_max_bytes = max(1024, record_spool_max_bytes) + self._record_spool_lock = threading.Lock() + self._record_spool_fp = None + if self._record_spool_path: + spool_dir = os.path.dirname(self._record_spool_path) + if spool_dir: + os.makedirs(spool_dir, exist_ok=True) + self._record_spool_fp = open(self._record_spool_path, "a", encoding="utf-8") self._proxy_rotator = ProxyRotator(proxy_urls=upstream_proxies) if upstream_proxies else None self.ca = CertificateAuthority() - self._handshaker = MITMHandshaker(self.ca, proxy_rotator=self._proxy_rotator) + self._handshaker = MITMHandshaker( + self.ca, + override_ip=self._override_ip, + proxy_rotator=self._proxy_rotator, + ) self._forwarder = Forwarder() self._tor = TorRotator( @@ -1173,7 +1036,12 @@ def __init__( ) self._tcp_manip = TCPOptionsManipulator() self._tls_fp = TLSFingerprinter() - self._magic = Magic(tcp=self._tcp_manip, tls=self._tls_fp, tor=self._tor) + self._magic = Magic( + tcp=self._tcp_manip, + tls=self._tls_fp, + tor=self._tor, + rotate_every=tor_rotate_every, + ) self._advisor = ResponseAdvisor(self._magic) self.intercept_https = intercept_https @@ -1216,7 +1084,8 @@ def _process_http_request(self, req: InterceptedRequest) -> InterceptedResponse: req.host = host req.port = port - sock = self._create_upstream_connection(host, port, timeout=30) + connect_host = self._override_ip or host + sock = self._create_upstream_connection(connect_host, port, timeout=30) if parsed.scheme == "https": ctx = TLSContextFactory.client_context(alpn=["http/1.1"]) @@ -1280,6 +1149,110 @@ def clear_records(self): def export_ca_certificates(self, export_dir: Optional[str] = None) -> Dict[str, str]: return self.ca.export_ca_certificates(export_dir) + @staticmethod + def _serialize_record(record: ProxyRecord) -> Dict[str, Any]: + return { + "timestamp": record.request.timestamp, + "method": record.request.method, + "host": record.request.host, + "path": record.request.path, + "status_code": record.response.status_code, + "passed": record.passed, + "blocked": record.blocked, + "technique": record.technique_applied, + "is_https": record.request.is_https, + "response_time": record.response.response_time, + } + + def _spill_record(self, record: ProxyRecord): + fp = getattr(self, "_record_spool_fp", None) + if not fp: + return + payload = json.dumps(self._serialize_record(record), separators=(",", ":")) + lock = getattr(self, "_record_spool_lock", None) + if lock: + with lock: + self._rotate_spool_if_needed_unlocked() + fp = self._record_spool_fp + fp.write(payload + "\n") + fp.flush() + else: + self._rotate_spool_if_needed_unlocked() + fp = self._record_spool_fp + fp.write(payload + "\n") + fp.flush() + + def _rotate_spool_if_needed_unlocked(self): + spool_path = getattr(self, "_record_spool_path", None) + if not spool_path: + return + fp = getattr(self, "_record_spool_fp", None) + if not fp: + return + try: + size = os.path.getsize(spool_path) + except OSError: + return + max_bytes = getattr(self, "_record_spool_max_bytes", 50 * 1024 * 1024) + if size < max_bytes: + return + + fp.close() + rotated = f"{spool_path}.1" + gz_path = f"{rotated}.gz" + with contextlib.suppress(OSError): + os.remove(gz_path) + with contextlib.suppress(OSError): + os.remove(rotated) + os.replace(spool_path, rotated) + with open(rotated, "rb") as src, gzip.open(gz_path, "wb", compresslevel=6) as dst: + dst.write(src.read()) + with contextlib.suppress(OSError): + os.remove(rotated) + self._record_spool_fp = open(spool_path, "a", encoding="utf-8") + + def get_spooled_records(self, limit: int = 200) -> List[Dict[str, Any]]: + if not self._record_spool_path: + return [] + entries: List[Dict[str, Any]] = [] + paths = [f"{self._record_spool_path}.1.gz", self._record_spool_path] + for path in paths: + if not os.path.exists(path): + continue + try: + if path.endswith(".gz"): + opener = gzip.open + else: + opener = open + with opener(path, "rt", encoding="utf-8", errors="ignore") as f: + for ln in f: + ln = ln.strip() + if not ln: + continue + try: + entries.append(json.loads(ln)) + except Exception: + continue + except Exception: + continue + return entries[-max(1, limit):] + + def _append_record(self, record: ProxyRecord): + lock = getattr(self, "_records_lock", None) + records = getattr(self, "_records", None) + if records is None: + return + maxlen = getattr(records, "maxlen", None) + if lock: + with lock: + if maxlen and len(records) >= maxlen: + self._spill_record(records[0]) + records.append(record) + return + if maxlen and len(records) >= maxlen: + self._spill_record(records[0]) + records.append(record) + def start(self): self._running = True interceptor_ref = self @@ -1355,8 +1328,7 @@ def _dispatch(self): passed=200 <= resp.status_code < 400, blocked=interceptor_ref._is_waf_block(resp.status_code), ) - with interceptor_ref._records_lock: - interceptor_ref._records.append(record) + interceptor_ref._append_record(record) if interceptor_ref._callbacks["record"]: interceptor_ref._callbacks["record"](record) interceptor_ref._forwarder.forward(resp, self) @@ -1378,8 +1350,6 @@ def do_CONNECT(self): parts = self.path.split(":") remote_host = parts[0] remote_port = int(parts[1]) if len(parts) > 1 else 443 - start_time = time.time() - if interceptor_ref.intercept_https: self.send_response(200, "Connection Established") self.send_header("Proxy-Agent", "EvilWAF") @@ -1407,6 +1377,7 @@ def do_CONNECT(self): records_list=interceptor_ref._records, records_lock=interceptor_ref._records_lock, is_waf_block=interceptor_ref._is_waf_block, + record_sink=interceptor_ref._append_record, ) handler.handle() try: @@ -1477,6 +1448,15 @@ def stop(self): if self._server: self._server.shutdown() self._server.server_close() + spool_fp = getattr(self, "_record_spool_fp", None) + spool_lock = getattr(self, "_record_spool_lock", None) + if spool_fp: + if spool_lock: + with spool_lock: + spool_fp.close() + else: + spool_fp.close() + self._record_spool_fp = None self.ca.cleanup() def is_running(self) -> bool: @@ -1496,6 +1476,9 @@ def create_interceptor( override_ip: Optional[str] = None, target_host: Optional[str] = None, upstream_proxies: Optional[List[str]] = None, + record_limit: int = 20000, + record_spool_path: Optional[str] = None, + record_spool_max_bytes: int = 50 * 1024 * 1024, ) -> Interceptor: return Interceptor( listen_host=listen_host, @@ -1507,5 +1490,8 @@ def create_interceptor( override_ip=override_ip, target_host=target_host, upstream_proxies=upstream_proxies, + record_limit=record_limit, + record_spool_path=record_spool_path, + record_spool_max_bytes=record_spool_max_bytes, ) - \ No newline at end of file + diff --git a/evilwaf.py b/evilwaf.py index 2ea82a4..7957ecd 100644 --- a/evilwaf.py +++ b/evilwaf.py @@ -20,6 +20,7 @@ ProxyRecord, create_interceptor, ) +from core.proxy_file import load_proxy_file from core.waf_detector import WAFDetector from chemistry.origin_server_ip import OriginServerIPHunter, ReconReport, OriginResult @@ -46,7 +47,7 @@ def _fmt_duration(seconds: float) -> str: def _detect_waf(target_url: str) -> Optional[str]: det = WAFDetector() try: - r = requests.get(target_url, timeout=10, verify=False, allow_redirects=True) + r = requests.get(target_url, timeout=10, allow_redirects=True) found = det.detect_all( response_body=r.text, headers=dict(r.headers), @@ -63,7 +64,7 @@ def _hunt_origin_ip_verbose(target: str) -> Optional[str]: domain = parsed.hostname or parsed.netloc print(f"\n[*] Origin IP Hunter started for: {domain}") - print(f"[*] Launching scanners in parallel:\n") + print("[*] Launching scanners in parallel:\n") scanner_names = [ "dns_history", @@ -527,7 +528,7 @@ def _make_header(self) -> urwid.Widget: proxy_p = f" | Proxy: {self.upstream_proxy_count}" if self.upstream_proxy_count else "" return urwid.AttrMap( urwid.Text( - ('header', f" EvilWAF v2.4.1 | {h}{waf_p}{ip_p}{tor_p}{proxy_p} q=Quit up/down=browse f=follow "), + ('header', f" EvilWAF v2.4 | {h}{waf_p}{ip_p}{tor_p}{proxy_p} q=Quit up/down=browse f=follow "), align='center', ), 'header', @@ -738,6 +739,9 @@ def __init__( server_ip: Optional[str] = None, target_host: Optional[str] = None, upstream_proxies: Optional[List[str]] = None, + record_limit: int = 20000, + record_spool_file: Optional[str] = None, + record_spool_max_mb: int = 50, ): self._enable_tor = enable_tor self._running = False @@ -752,6 +756,9 @@ def __init__( override_ip=server_ip, target_host=target_host, upstream_proxies=upstream_proxies, + record_limit=record_limit, + record_spool_path=record_spool_file, + record_spool_max_bytes=max(1, record_spool_max_mb) * 1024 * 1024, ) self._tor_table = TorIPTable() @@ -829,7 +836,7 @@ def signal_handler(signum: int, frame: Any): def main(): parser = argparse.ArgumentParser( prog="evilwaf", - description="EvilWAF v2.4.1 — Transparent WAF Bypass Proxy", + description="EvilWAF v2.4 — Transparent WAF Bypass Proxy", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=( "Flags:\n" @@ -845,6 +852,9 @@ def main(): " --upstream-proxy URL Upstream proxy (http://, socks5://, socks4://)\n" " --proxy-file FILE File with proxy URLs for rotation\n" " --no-tui Headless mode, print traffic to stdout\n" + " --record-limit In-memory record cap (default: 20000)\n" + " --record-spool-file Optional JSONL file for evicted records\n" + " --record-spool-max-mb Rotate/compress spool file after this size (default: 50)\n" "\n" "API Keys (optional, set as environment variables):\n" " SHODAN_API_KEY Shodan API key\n" @@ -873,6 +883,9 @@ def main(): parser.add_argument("--proxy-file", type=str, default=None, metavar="FILE", help="File with proxy URLs, one per line, for rotation") parser.add_argument("--no-tui", action="store_true") + parser.add_argument("--record-limit", type=int, default=20000) + parser.add_argument("--record-spool-file", type=str, default=None) + parser.add_argument("--record-spool-max-mb", type=int, default=50) args = parser.parse_args() @@ -892,11 +905,10 @@ def main(): if args.upstream_proxy: upstream_proxies = [args.upstream_proxy] if args.proxy_file: - with open(args.proxy_file) as f: - file_proxies = [line.strip() for line in f if line.strip() and not line.startswith('#')] + file_proxies = load_proxy_file(args.proxy_file) upstream_proxies = (upstream_proxies or []) + file_proxies - print("[*] EvilWAF v2.4.1") + print("[*] EvilWAF v2.4") print(f"[*] Target : {args.target}") print("[*] Detecting WAF...", end="", flush=True) waf_name = _detect_waf(args.target) @@ -928,6 +940,10 @@ def main(): print(f"[*] Proxy : {len(upstream_proxies)} upstream proxy(ies)") print(f"[*] Listen : {args.listen_host}:{args.listen_port}") + print(f"[*] Record limit : {max(1000, args.record_limit)}") + if args.record_spool_file: + print(f"[*] Record spool: {args.record_spool_file}") + print(f"[*] Spool rotate: {max(1, args.record_spool_max_mb)} MB") orchestrator = EvilWAFOrchestrator( listen_host=args.listen_host, @@ -939,6 +955,9 @@ def main(): server_ip=server_ip, target_host=parsed.hostname, upstream_proxies=upstream_proxies, + record_limit=max(1000, args.record_limit), + record_spool_file=args.record_spool_file, + record_spool_max_mb=max(1, args.record_spool_max_mb), ) orchestrator.start()