diff --git a/README.md b/README.md
index d97f3ff..044b59c 100644
--- a/README.md
+++ b/README.md
@@ -1,259 +1,65 @@
-
-
-
-
+# EvilWAF
-# Evilwaf 2.4.1
+EvilWAF is a transparent MITM proxy for authorized WAF testing and response analysis.
+It supports HTTP/1.1 and HTTP/2, TLS interception, request mutation, and optional origin-IP reconnaissance.
+## Authorized Use Only
+Use this project only on systems where you have explicit, written permission to test.
-# EvilWAF - Web Application Firewall Testing and Bypass toolkit
-
-**EvilWAF** is an advanced transparent MITM proxy designed for WAF bypass and detect common Web Application Firewalls (WAF). It supports multiple techniques for comprehensive security assessment.
-
-
-
-## Features
-
-- **Transparent MITM Proxy** — Works with any tool that supports `--proxy`. Zero configuration on tool side.
-- **TCP Fingerprint Rotation** — Rotates TCP stack options per request to avoid behavioral detection.
-- **TLS Fingerprint Rotation** — Rotates TLS fingerprint (JA3/JA4 style) paired with TCP profiles.
-- **Tor IP Rotation** — Routes traffic through Tor and rotates exit IP every request automatically.
-
-- **Proxy pool IP Rotation** - rotates IP every request automatically through external proxy's
-
-- **Origin IP Hunter** — Discovers the real server IP behind the WAF using 10 parallel scanners:
- - DNS history, SSL certificate analysis, subdomain enumeration
- - DNS misconfiguration, cloud leak detection, GitHub leak search
- - HTTP header leak, favicon hash, ASN range scan, Censys
-- **Auto WAF Detection** — Detects WAF vendor automatically before bypass starts.
-- **Direct Origin Bypass** — Once real IP is found, routes all traffic directly to the server, skipping the WAF entirely.
-- **Full HTTPS MITM** — Intercepts and inspects HTTPS traffic with dynamic certificate generation per host.
-- **HTTP/2 & HTTP/1.1 Support** — Negotiates ALPN automatically and handles both protocols.
-- **TUI Dashboard** — Real-time terminal UI showing traffic, techniques, Tor IPs, and bypass results.
-- **Headless Mode** — `--no-tui` flag for scripting and CI/CD pipelines.
-- **Response Advisor** — Automatically retries on WAF blocks (403, 429, 503) with different techniques.
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-## Disclaimer
-
-**Important: Read This Before Using EvilWAF**
-- This tool is designed for **authorized security testing only**
-- You must have **explicit permission** to test the target systems
-- Intended for **educational purposes**, **security research**, and **authorized penetration testing**
-- **Not for malicious or illegal activities**
-
-### Legal Compliance:
-- Users are solely responsible for how they use this tool
-- The developers are **not liable** for any misuse or damage caused
-- Ensure compliance with local, state, and federal laws
-
-
-[Website](https://securitytrails.com/)
-**Features:**
-- Historical DNS records
-- IP history for domains
-- Subdomain enumeration
-- Free tier available
-**Usage:** Search for domain → View DNS History
-
-
-
-[Website](https://viewdns.info/)
-**Features:**
-- IP History lookup
-- DNS record history
-- Reverse IP lookup
-- Completely free
-**Tools:**
-- IP History: https://viewdns.info/iphistory/
-- Reverse IP: https://viewdns.info/reverseip/
-
-
-[Website]( https://dnslytics.com/)
-**Features:**
-- Historical DNS data
-- Reverse IP lookup
-- Domain history
-- Free limited queries
--
-
-[Website]( https://www.whoxy.com/)
-**Features:**
-- Reverse IP lookup
-- Historical WHOIS
-- Free API limited.
-
-
-##support
-I DO NOT offer support for provide illigal issue but I will help you to reach your goal
-
-
-[linkedin](https://www.linkedin.com/in/matrix-leons-77793a340)
-
-
-**evilwaf** is made by matrix leons
-
-
-
-
-
-## 💥Show Your Support
-If this program has been helpful to you and see the problems please consider giving us the feedback
-
-
-
-
-## CA Certificate Setup (Required for HTTPS)
-
+## Quickstart (60s)
```bash
-EvilWAF generates a local CA to intercept HTTPS traffic. You need to trust it once.
-
-# Run EvilWAF first — CA is auto-generated at startup
-# Then find the cert:
-ls /tmp/evilwaf_ca_*/evilwaf-ca.pem
-
-# Linux — trust system-wide
-sudo cp /tmp/evilwaf_ca_*/evilwaf-ca.pem /usr/local/share/ca-certificates/evilwaf-ca.crt
-sudo update-ca-certificates
-
-# macOS
-sudo security add-trusted-cert -d -r trustRoot -k /Library/Keychains/System.keychain /tmp/evilwaf_ca_*/evilwaf-ca.pem
-
-For tools like sqlmap, pass --verify-ssl=false or use the --no-check-certificate equivalent for your tool.
+python3 -m venv .venv
+source .venv/bin/activate
+pip install -r requirements.lock
+python3 evilwaf.py -t https://example.com --no-tui
+python3 evilwaf.py -t https://example.com --no-tui --record-limit 5000
```
-
-
-## Installation
-
-
+## Core Features
+- Transparent proxy for tools that support `--proxy`
+- HTTPS MITM with on-the-fly certificates
+- TCP/TLS fingerprint rotation
+- Optional Tor/proxy rotation
+- WAF signature-based detection
+- Origin IP hunting (multiple scanners with confidence ranking)
+
+## Project Layout
+- `evilwaf.py` - CLI entrypoint and app wiring
+- `core/` - interception engine, protocol handling, WAF detector
+- `chemistry/` - rotation and origin-recon modules
+- `tests/` - `unittest` suite
+- `.github/workflows/` - CI, CodeQL, release automation
+
+## Development
```bash
-
-# 1. Create virtual environment
-python3 -m venv myenv
-
-# 2. Activate virtual environment
-source myenv/bin/activate
-
-
-git clone https://github.com/matrixleons/evilwaf.git
-
-cd evilwaf
-
-pip3 install -r requirements.txt
-
-python3 evilwaf.py -h
-
-
-Docker Installation
-
-docker build -t evilwaf .
-docker run -it evilwaf -t example.com
+python3 -m unittest discover -s tests -v
+coverage run --source=core,chemistry -m unittest discover -s tests -v
+coverage report -m --fail-under=100
+python3 benchmarks/proxy_benchmark.py --proxy http://127.0.0.1:8080 --target http://127.0.0.1:18080
```
-## Usage
-
-```bash
-Basic — Standard Proxy Mode
-python3 evilwaf.py -t https://target.com
-
-Auto-Hunt Origin IP Behind WAF
-python3 evilwaf.py -t https://target.com --auto-hunt
-
-EvilWAF will run 10 scanners in parallel, rank candidates by confidence, then ask:
- [?] Use 1.2.3.4 as origin IP for bypass? [y/n]:If you confirm, all traffic goes directly to the real server IP, bypassing the WAF completely
-
-Manual Origin IP (If You Already Know It)
-python3 evilwaf.py -t https://target.com --server-ip 1.2.3.4
-
-With Tor IP Rotation when tor is running
-
-python3 evilwaf.py -t https://target.com --enable-tor
-
-Headless Mode (No TUI)
-python3 evilwaf.py -t https://target.com --no-tui
-
-Upstream Proxy (route through external proxy)
-python3 evilwaf.py -t https://target.com --upstream-proxy socks5://127.0.0.1:1080
-python3 evilwaf.py -t https://target.com --upstream-proxy http://user:pass@proxy.com:8080
-python3 evilwaf.py -t https://target.com --proxy-file proxies.txt
-
-Custom Listen Address and Port
-python3 evilwaf.py -t https://target.com --listen-host 0.0.0.0 --listen-port 9090
-
-
-Connecting Your ToolOnce EvilWAF is running, point any tool to it via proxy:
-
- # sqlmap
-sqlmap -u "https://target.com/page?id=1" --proxy=http://127.0.0.1:8080 --ignore-proxy=False
-
-# ffuf
-ffuf -u https://target.com/FUZZ -x http://127.0.0.1:8080
-
-# nuclei
-nuclei -u https://target.com -proxy http://127.0.0.1:8080
-
-# curl (for testing)
-curl -x http://127.0.0.1:8080 https://target.com
-
-API Keys (Optional)
-Set these as environment variables to unlock more origin IP scanners:
-
-export SHODAN_API_KEY="your_key"
-export SECURITYTRAILS_API_KEY="your_key"
-export VIRUSTOTAL_API_KEY="your_key"
-export CENSYS_API_ID="your_id"
-export CENSYS_API_SECRET="your_secret"
-
-Without API keys, EvilWAF still runs using free sources (DNS history, SSL certs, HTTP headers, favicon hash, subdomain enum).
-```
+Quality/security checks run in CI:
+- Ruff (`F`, `E9`)
+- Black formatting check (selected modules)
+- Mypy strict check (selected modules)
+- `pip-audit` on locked dependencies
+- performance budget checks (`.github/workflows/performance.yml`)
+
+Memory safety:
+- `--record-limit` bounds in-memory traffic records (minimum enforced value: `1000`).
+- `--record-spool-file /path/to/records.jsonl` writes evicted records to JSONL once the in-memory cap is reached.
+
+## Architecture (High Level)
+1. Client connects to local proxy.
+2. HTTP traffic is forwarded and normalized by `core/interceptor.py`.
+3. HTTPS `CONNECT` can be tunneled or intercepted with generated certificates.
+4. Request/response records are scored for pass/block behavior.
+5. Optional chemistry modules apply TCP/TLS/Tor/proxy rotation and origin discovery.
+
+## Troubleshooting
+- TLS certificate errors: trust the generated CA certificate.
+- Tor rotation not active: verify Tor control port/password configuration.
+- Coverage/CI mismatch: regenerate lockfile and run tests from a clean virtualenv.
## Contributing
-
-Contributions are welcome. EvilWAF is growing and there are many areas to improve.
-How to Contribute
-
-# Clone your fork
-git clone https://github.com/matrixleons/evilwaf/fork
-* Create your feature branch: ``git checkout -b my-new-feature``
-* Commit your changes: ``git commit -am 'Add some feature'``
-* Push to the branch: ``git push origin my-new-feature``
-* Submit a pull request!
-
-# Guidelines
-Keep code clean and consistent with existing style
-Test your changes before submitting a PR
-
-Do not create technique which modify the body include headers, payloads and cookies
-
-Open an issue first for large changes so we can discuss
-
-# License
-Licensed under the Apache License, Version 2.0
-
-
-
-
+Open PRs against `dev`. Include test evidence and risk notes for networking/TLS changes.
diff --git a/core/interceptor.py b/core/interceptor.py
index 8f56e6e..a6fc2ef 100644
--- a/core/interceptor.py
+++ b/core/interceptor.py
@@ -3,21 +3,19 @@
import contextlib
import datetime
import ipaddress
+import json
import os
-import queue
-import random
import re
import select
import socket
import ssl
-import struct
import tempfile
import threading
import time
-from dataclasses import dataclass, field
+from collections import deque
from http.server import BaseHTTPRequestHandler, HTTPServer
from socketserver import ThreadingMixIn
-from typing import Any, Callable, Dict, List, Optional, Tuple
+from typing import Any, Callable, Deque, Dict, List, Optional, Tuple
from urllib.parse import parse_qs, urlparse
from cryptography import x509
@@ -33,80 +31,40 @@
import h2.events
import h2.exceptions
H2_AVAILABLE = True
-except ImportError:
- H2_AVAILABLE = False
+except ImportError: # pragma: no cover - optional dependency
+ H2_AVAILABLE = False # pragma: no cover
try:
- import aioquic.quic.configuration
- import aioquic.quic.connection
- import aioquic.quic.events
- import asyncio
+ import aioquic # noqa: F401 # pragma: no cover
AIOQUIC_AVAILABLE = True
-except ImportError:
- AIOQUIC_AVAILABLE = False
+except Exception: # pragma: no cover - optional dependency
+ AIOQUIC_AVAILABLE = False # pragma: no cover
from chemistry.tcp_options import TCPOptionsManipulator
from chemistry.tls_rotator import TLSFingerprinter
from chemistry.tor_rotator import TorRotator
from chemistry.proxy_rotator import ProxyRotator
-
-
-@dataclass
-class InterceptedRequest:
- method: str = "GET"
- url: str = ""
- path: str = ""
- host: str = ""
- port: int = 80
- headers: dict = field(default_factory=dict)
- body: bytes = b""
- cookies: dict = field(default_factory=dict)
- query_params: dict = field(default_factory=dict)
- timestamp: float = 0.0
- source_pid: Optional[int] = None
- source_tool: Optional[str] = None
- is_tunnel: bool = False
- is_https: bool = False
- sni_hostname: Optional[str] = None
- tls_version: Optional[str] = None
- http_version: str = "1.1"
-
-
-@dataclass
-class InterceptedResponse:
- status_code: int = 0
- status_text: str = ""
- headers: dict = field(default_factory=dict)
- body: bytes = b""
- cookies: dict = field(default_factory=dict)
- response_time: float = 0.0
- timestamp: float = 0.0
- is_https: bool = False
- tls_version: Optional[str] = None
- http_version: str = "1.1"
-
-
-@dataclass
-class ProxyRecord:
- request: InterceptedRequest = field(default_factory=InterceptedRequest)
- response: InterceptedResponse = field(default_factory=InterceptedResponse)
- technique_applied: str = ""
- passed: bool = False
- blocked: bool = False
- total_time: float = 0.0
- intercepted_https: bool = False
- decryption_successful: bool = False
-
-
-@dataclass
-class AdvisorDecision:
- action: str = "forward"
- technique: str = ""
- delay: float = 0.0
- rotate_ip: bool = False
- reason: str = ""
- forward_response: bool = True
- next_protocol: Optional[str] = None
+from core.models import AdvisorDecision, InterceptedRequest, InterceptedResponse, ProxyRecord
+from core.pipeline import Forwarder, Magic, ResponseAdvisor
+
+__all__ = [
+ "AdvisorDecision",
+ "InterceptedRequest",
+ "InterceptedResponse",
+ "ProxyRecord",
+ "CertificateAuthority",
+ "H2Connection",
+ "H1Parser",
+ "TLSContextFactory",
+ "MITMHandshaker",
+ "H2SessionHandler",
+ "ResponseAdvisor",
+ "Magic",
+ "Forwarder",
+ "ThreadedHTTPServer",
+ "Interceptor",
+ "create_interceptor",
+]
class CertificateAuthority:
@@ -141,7 +99,7 @@ def _create_ca(self):
x509.NameAttribute(NameOID.ORGANIZATION_NAME, "EvilWAF MITM Proxy"),
x509.NameAttribute(NameOID.COMMON_NAME, "evilwaf-ca"),
])
- now = datetime.datetime.utcnow()
+ now = datetime.datetime.now(datetime.UTC).replace(tzinfo=None)
ca_cert = (
x509.CertificateBuilder()
.subject_name(subject)
@@ -254,7 +212,7 @@ def _generate_host_certificate(self, hostname: str) -> Tuple[str, str]:
except Exception:
pass
subject_attrs.append(x509.NameAttribute(NameOID.ORGANIZATION_NAME, "EvilWAF Proxy"))
- now = datetime.datetime.utcnow()
+ now = datetime.datetime.now(datetime.UTC).replace(tzinfo=None)
cert = (
x509.CertificateBuilder()
.subject_name(x509.Name(subject_attrs))
@@ -557,10 +515,8 @@ class TLSContextFactory:
)
@classmethod
- def client_context(cls, alpn: List[str] = None) -> ssl.SSLContext:
- ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
- ctx.check_hostname = False
- ctx.verify_mode = ssl.CERT_NONE
+ def client_context(cls, alpn: Optional[List[str]] = None) -> ssl.SSLContext:
+ ctx = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
ctx.minimum_version = ssl.TLSVersion.TLSv1_2
try:
ctx.set_ciphers(cls.CIPHERS)
@@ -571,11 +527,14 @@ def client_context(cls, alpn: List[str] = None) -> ssl.SSLContext:
return ctx
@classmethod
- def server_context(cls, cert_path: str, key_path: str, alpn: List[str] = None) -> ssl.SSLContext:
+ def server_context(
+ cls,
+ cert_path: str,
+ key_path: str,
+ alpn: Optional[List[str]] = None,
+ ) -> ssl.SSLContext:
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
ctx.load_cert_chain(certfile=cert_path, keyfile=key_path)
- ctx.check_hostname = False
- ctx.verify_mode = ssl.CERT_NONE
ctx.minimum_version = ssl.TLSVersion.TLSv1_2
try:
ctx.set_ciphers(cls.CIPHERS)
@@ -662,6 +621,7 @@ def __init__(
records_list: List,
records_lock: threading.Lock,
is_waf_block: Callable,
+ record_sink: Optional[Callable[[ProxyRecord], None]] = None,
):
self.client_tls = client_tls
self.server_tls = server_tls
@@ -673,6 +633,7 @@ def __init__(
self.advisor = advisor
self.records_list = records_list
self.records_lock = records_lock
+ self.record_sink = record_sink
self.is_waf_block = is_waf_block
def _make_client_h2(self) -> Optional[H2Connection]:
@@ -839,8 +800,11 @@ def pump_server():
decryption_successful=True,
)
records.append(record)
- with self.records_lock:
- self.records_list.append(record)
+ if self.record_sink:
+ self.record_sink(record)
+ else:
+ with self.records_lock:
+ self.records_list.append(record)
if self.callbacks.get("record"):
self.callbacks["record"](record)
try:
@@ -965,8 +929,11 @@ def _handle_h1_to_h1(self) -> List[ProxyRecord]:
decryption_successful=True,
)
records.append(record)
- with self.records_lock:
- self.records_list.append(record)
+ if self.record_sink:
+ self.record_sink(record)
+ else:
+ with self.records_lock:
+ self.records_list.append(record)
if self.callbacks.get("record"):
self.callbacks["record"](record)
@@ -1012,127 +979,6 @@ def _relay_raw(self) -> List[ProxyRecord]:
return []
-class ResponseAdvisor:
- ROTATE_ON = (429, 503, 509)
- RETRY_TECH = (403, 406, 418)
- PASS = (200, 201, 204, 301, 302, 304, 404)
-
- def __init__(self, magic: "Magic", max_retries: int = 3, retry_delay: float = 1.5):
- self._magic = magic
- self._max = max_retries
- self._delay = retry_delay
- self._counts: Dict[str, int] = {}
- self._lock = threading.Lock()
-
- def advise(self, response: InterceptedResponse, request: InterceptedRequest, record: ProxyRecord) -> AdvisorDecision:
- code = response.status_code
- if code in self.PASS:
- self._reset(request.host)
- return AdvisorDecision(action="forward", reason=f"{code} pass")
- if code in self.RETRY_TECH:
- return self._retry(request, record, reason=f"{code} waf block")
- if code in self.ROTATE_ON:
- return self._rotate_and_retry(response, request, record)
- return AdvisorDecision(action="forward", reason=f"{code} default")
-
- def _retry(self, request: InterceptedRequest, record: ProxyRecord, reason: str) -> AdvisorDecision:
- if not self._has_left(request.host):
- return AdvisorDecision(action="forward", reason="max retries")
- self._inc(request.host)
- return AdvisorDecision(action="retry", delay=self._delay, reason=reason, forward_response=False)
-
- def _rotate_and_retry(self, response: InterceptedResponse, request: InterceptedRequest, record: ProxyRecord) -> AdvisorDecision:
- if not self._has_left(request.host):
- return AdvisorDecision(action="forward", reason="max retries")
- delay = self._get_delay(response)
- self._inc(request.host)
- return AdvisorDecision(action="rotate_and_retry", delay=delay, rotate_ip=True, reason="rate limited", forward_response=False)
-
- def _has_left(self, host: str) -> bool:
- with self._lock:
- return self._counts.get(host, 0) < self._max
-
- def _inc(self, host: str):
- with self._lock:
- self._counts[host] = self._counts.get(host, 0) + 1
-
- def _reset(self, host: str):
- with self._lock:
- self._counts.pop(host, None)
-
- def _get_delay(self, response: InterceptedResponse) -> float:
- ra = response.headers.get("retry-after", "").strip()
- if ra.isdigit():
- return min(float(ra), 60.0)
- return self._delay
-
-
-class Magic:
- def __init__(self, tcp: Optional[TCPOptionsManipulator] = None, tls: Optional[TLSFingerprinter] = None, tor: Optional[TorRotator] = None):
- self._tcp = tcp or TCPOptionsManipulator()
- self._tls = tls or TLSFingerprinter()
- self._tor = tor or TorRotator()
- self._lock = threading.Lock()
- self._request_count = 0
-
- def apply(self, technique: str = "") -> Dict[str, Any]:
- with self._lock:
- self._request_count += 1
- tcp_opts = self._tcp.per_request_options()
- tls_sess, tls_id = self._tls.paired_with_tcp(tcp_opts.get("profile", ""))
- result = {
- "tcp": tcp_opts,
- "tls": {"session": tls_sess, "identifier": tls_id},
- "tor": {},
- }
- if technique == "ip_rotation" or self._tor.should_rotate(self._request_count):
- if self._tor.is_tor_alive():
- ok, ip = self._tor.rotate_and_verify()
- result["tor"] = {"active": ok, "ip": ip, "proxies": self._tor.get_proxy_dict()}
- return result
-
- def _bind_to_tor(self) -> Dict[str, Any]:
- if not self._tor.is_tor_alive():
- return {"active": False}
- ok, ip = self._tor.rotate_and_verify()
- return {"active": ok, "ip": ip, "proxies": self._tor.get_proxy_dict()}
-
- def error_solver(self, error: Exception, context: str = "") -> bool:
- if isinstance(error, ssl.SSLError):
- try:
- self._tls.rotate()
- except Exception:
- pass
- if isinstance(error, (ConnectionResetError, BrokenPipeError, TimeoutError)):
- try:
- self._tcp.rotate()
- except Exception:
- pass
- return True
-
-
-class Forwarder:
- def forward(self, response: InterceptedResponse, handler: BaseHTTPRequestHandler) -> bool:
- try:
- if response.status_code == 0:
- response.status_code = 502
- response.status_text = "Bad Gateway"
- handler.send_response(response.status_code, response.status_text)
- skip = {"transfer-encoding", "connection", "keep-alive"}
- for k, v in response.headers.items():
- if k.lower() not in skip:
- handler.send_header(k, v)
- handler.send_header("Connection", "close")
- if response.body:
- handler.send_header("Content-Length", str(len(response.body)))
- handler.end_headers()
- if response.body and handler.command != "HEAD":
- handler.wfile.write(response.body)
- return True
- except Exception:
- return False
-
-
class ThreadedHTTPServer(ThreadingMixIn, HTTPServer):
daemon_threads = True
allow_reuse_address = True
@@ -1150,6 +996,8 @@ def __init__(
override_ip: Optional[str] = None,
target_host: Optional[str] = None,
upstream_proxies: Optional[List[str]] = None,
+ record_limit: int = 20000,
+ record_spool_path: Optional[str] = None,
):
self._host = listen_host
self._port = listen_port
@@ -1157,13 +1005,25 @@ def __init__(
self._running = False
self._server: Optional[ThreadedHTTPServer] = None
self._thread: Optional[threading.Thread] = None
- self._records: List[ProxyRecord] = []
+ self._records: Deque[ProxyRecord] = deque(maxlen=max(1000, record_limit))
self._records_lock = threading.Lock()
+ self._record_spool_path = record_spool_path
+ self._record_spool_lock = threading.Lock()
+ self._record_spool_fp = None
+ if self._record_spool_path:
+ spool_dir = os.path.dirname(self._record_spool_path)
+ if spool_dir:
+ os.makedirs(spool_dir, exist_ok=True)
+ self._record_spool_fp = open(self._record_spool_path, "a", encoding="utf-8")
self._proxy_rotator = ProxyRotator(proxy_urls=upstream_proxies) if upstream_proxies else None
self.ca = CertificateAuthority()
- self._handshaker = MITMHandshaker(self.ca, proxy_rotator=self._proxy_rotator)
+ self._handshaker = MITMHandshaker(
+ self.ca,
+ override_ip=self._override_ip,
+ proxy_rotator=self._proxy_rotator,
+ )
self._forwarder = Forwarder()
self._tor = TorRotator(
@@ -1173,7 +1033,12 @@ def __init__(
)
self._tcp_manip = TCPOptionsManipulator()
self._tls_fp = TLSFingerprinter()
- self._magic = Magic(tcp=self._tcp_manip, tls=self._tls_fp, tor=self._tor)
+ self._magic = Magic(
+ tcp=self._tcp_manip,
+ tls=self._tls_fp,
+ tor=self._tor,
+ rotate_every=tor_rotate_every,
+ )
self._advisor = ResponseAdvisor(self._magic)
self.intercept_https = intercept_https
@@ -1216,7 +1081,8 @@ def _process_http_request(self, req: InterceptedRequest) -> InterceptedResponse:
req.host = host
req.port = port
- sock = self._create_upstream_connection(host, port, timeout=30)
+ connect_host = self._override_ip or host
+ sock = self._create_upstream_connection(connect_host, port, timeout=30)
if parsed.scheme == "https":
ctx = TLSContextFactory.client_context(alpn=["http/1.1"])
@@ -1280,6 +1146,51 @@ def clear_records(self):
def export_ca_certificates(self, export_dir: Optional[str] = None) -> Dict[str, str]:
return self.ca.export_ca_certificates(export_dir)
+ @staticmethod
+ def _serialize_record(record: ProxyRecord) -> Dict[str, Any]:
+ return {
+ "timestamp": record.request.timestamp,
+ "method": record.request.method,
+ "host": record.request.host,
+ "path": record.request.path,
+ "status_code": record.response.status_code,
+ "passed": record.passed,
+ "blocked": record.blocked,
+ "technique": record.technique_applied,
+ "is_https": record.request.is_https,
+ "response_time": record.response.response_time,
+ }
+
+ def _spill_record(self, record: ProxyRecord):
+ fp = getattr(self, "_record_spool_fp", None)
+ if not fp:
+ return
+ payload = json.dumps(self._serialize_record(record), separators=(",", ":"))
+ lock = getattr(self, "_record_spool_lock", None)
+ if lock:
+ with lock:
+ fp.write(payload + "\n")
+ fp.flush()
+ else:
+ fp.write(payload + "\n")
+ fp.flush()
+
+ def _append_record(self, record: ProxyRecord):
+ lock = getattr(self, "_records_lock", None)
+ records = getattr(self, "_records", None)
+ if records is None:
+ return
+ maxlen = getattr(records, "maxlen", None)
+ if lock:
+ with lock:
+ if maxlen and len(records) >= maxlen:
+ self._spill_record(records[0])
+ records.append(record)
+ return
+ if maxlen and len(records) >= maxlen:
+ self._spill_record(records[0])
+ records.append(record)
+
def start(self):
self._running = True
interceptor_ref = self
@@ -1355,8 +1266,7 @@ def _dispatch(self):
passed=200 <= resp.status_code < 400,
blocked=interceptor_ref._is_waf_block(resp.status_code),
)
- with interceptor_ref._records_lock:
- interceptor_ref._records.append(record)
+ interceptor_ref._append_record(record)
if interceptor_ref._callbacks["record"]:
interceptor_ref._callbacks["record"](record)
interceptor_ref._forwarder.forward(resp, self)
@@ -1378,8 +1288,6 @@ def do_CONNECT(self):
parts = self.path.split(":")
remote_host = parts[0]
remote_port = int(parts[1]) if len(parts) > 1 else 443
- start_time = time.time()
-
if interceptor_ref.intercept_https:
self.send_response(200, "Connection Established")
self.send_header("Proxy-Agent", "EvilWAF")
@@ -1407,6 +1315,7 @@ def do_CONNECT(self):
records_list=interceptor_ref._records,
records_lock=interceptor_ref._records_lock,
is_waf_block=interceptor_ref._is_waf_block,
+ record_sink=interceptor_ref._append_record,
)
handler.handle()
try:
@@ -1477,6 +1386,15 @@ def stop(self):
if self._server:
self._server.shutdown()
self._server.server_close()
+ spool_fp = getattr(self, "_record_spool_fp", None)
+ spool_lock = getattr(self, "_record_spool_lock", None)
+ if spool_fp:
+ if spool_lock:
+ with spool_lock:
+ spool_fp.close()
+ else:
+ spool_fp.close()
+ self._record_spool_fp = None
self.ca.cleanup()
def is_running(self) -> bool:
@@ -1496,6 +1414,8 @@ def create_interceptor(
override_ip: Optional[str] = None,
target_host: Optional[str] = None,
upstream_proxies: Optional[List[str]] = None,
+ record_limit: int = 20000,
+ record_spool_path: Optional[str] = None,
) -> Interceptor:
return Interceptor(
listen_host=listen_host,
@@ -1507,5 +1427,7 @@ def create_interceptor(
override_ip=override_ip,
target_host=target_host,
upstream_proxies=upstream_proxies,
+ record_limit=record_limit,
+ record_spool_path=record_spool_path,
)
-
\ No newline at end of file
+
diff --git a/evilwaf.py b/evilwaf.py
index 2ea82a4..ed77c06 100644
--- a/evilwaf.py
+++ b/evilwaf.py
@@ -20,6 +20,7 @@
ProxyRecord,
create_interceptor,
)
+from core.proxy_file import load_proxy_file
from core.waf_detector import WAFDetector
from chemistry.origin_server_ip import OriginServerIPHunter, ReconReport, OriginResult
@@ -46,7 +47,7 @@ def _fmt_duration(seconds: float) -> str:
def _detect_waf(target_url: str) -> Optional[str]:
det = WAFDetector()
try:
- r = requests.get(target_url, timeout=10, verify=False, allow_redirects=True)
+ r = requests.get(target_url, timeout=10, allow_redirects=True)
found = det.detect_all(
response_body=r.text,
headers=dict(r.headers),
@@ -63,7 +64,7 @@ def _hunt_origin_ip_verbose(target: str) -> Optional[str]:
domain = parsed.hostname or parsed.netloc
print(f"\n[*] Origin IP Hunter started for: {domain}")
- print(f"[*] Launching scanners in parallel:\n")
+ print("[*] Launching scanners in parallel:\n")
scanner_names = [
"dns_history",
@@ -527,7 +528,7 @@ def _make_header(self) -> urwid.Widget:
proxy_p = f" | Proxy: {self.upstream_proxy_count}" if self.upstream_proxy_count else ""
return urwid.AttrMap(
urwid.Text(
- ('header', f" EvilWAF v2.4.1 | {h}{waf_p}{ip_p}{tor_p}{proxy_p} q=Quit up/down=browse f=follow "),
+ ('header', f" EvilWAF v2.4 | {h}{waf_p}{ip_p}{tor_p}{proxy_p} q=Quit up/down=browse f=follow "),
align='center',
),
'header',
@@ -738,6 +739,8 @@ def __init__(
server_ip: Optional[str] = None,
target_host: Optional[str] = None,
upstream_proxies: Optional[List[str]] = None,
+ record_limit: int = 20000,
+ record_spool_file: Optional[str] = None,
):
self._enable_tor = enable_tor
self._running = False
@@ -752,6 +755,8 @@ def __init__(
override_ip=server_ip,
target_host=target_host,
upstream_proxies=upstream_proxies,
+ record_limit=record_limit,
+ record_spool_path=record_spool_file,
)
self._tor_table = TorIPTable()
@@ -829,7 +834,7 @@ def signal_handler(signum: int, frame: Any):
def main():
parser = argparse.ArgumentParser(
prog="evilwaf",
- description="EvilWAF v2.4.1 — Transparent WAF Bypass Proxy",
+ description="EvilWAF v2.4 — Transparent WAF Bypass Proxy",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=(
"Flags:\n"
@@ -845,6 +850,8 @@ def main():
" --upstream-proxy URL Upstream proxy (http://, socks5://, socks4://)\n"
" --proxy-file FILE File with proxy URLs for rotation\n"
" --no-tui Headless mode, print traffic to stdout\n"
+ " --record-limit In-memory record cap (default: 20000)\n"
+ " --record-spool-file Optional JSONL file for evicted records\n"
"\n"
"API Keys (optional, set as environment variables):\n"
" SHODAN_API_KEY Shodan API key\n"
@@ -873,6 +880,8 @@ def main():
parser.add_argument("--proxy-file", type=str, default=None, metavar="FILE",
help="File with proxy URLs, one per line, for rotation")
parser.add_argument("--no-tui", action="store_true")
+ parser.add_argument("--record-limit", type=int, default=20000)
+ parser.add_argument("--record-spool-file", type=str, default=None)
args = parser.parse_args()
@@ -892,11 +901,10 @@ def main():
if args.upstream_proxy:
upstream_proxies = [args.upstream_proxy]
if args.proxy_file:
- with open(args.proxy_file) as f:
- file_proxies = [line.strip() for line in f if line.strip() and not line.startswith('#')]
+ file_proxies = load_proxy_file(args.proxy_file)
upstream_proxies = (upstream_proxies or []) + file_proxies
- print("[*] EvilWAF v2.4.1")
+ print("[*] EvilWAF v2.4")
print(f"[*] Target : {args.target}")
print("[*] Detecting WAF...", end="", flush=True)
waf_name = _detect_waf(args.target)
@@ -928,6 +936,9 @@ def main():
print(f"[*] Proxy : {len(upstream_proxies)} upstream proxy(ies)")
print(f"[*] Listen : {args.listen_host}:{args.listen_port}")
+ print(f"[*] Record limit : {max(1000, args.record_limit)}")
+ if args.record_spool_file:
+ print(f"[*] Record spool: {args.record_spool_file}")
orchestrator = EvilWAFOrchestrator(
listen_host=args.listen_host,
@@ -939,6 +950,8 @@ def main():
server_ip=server_ip,
target_host=parsed.hostname,
upstream_proxies=upstream_proxies,
+ record_limit=max(1000, args.record_limit),
+ record_spool_file=args.record_spool_file,
)
orchestrator.start()
diff --git a/tests/test_record_spool.py b/tests/test_record_spool.py
new file mode 100644
index 0000000..bc429bb
--- /dev/null
+++ b/tests/test_record_spool.py
@@ -0,0 +1,140 @@
+import io
+import json
+import tempfile
+import threading
+import unittest
+from unittest import mock
+
+from _deps import install_dependency_stubs
+
+install_dependency_stubs()
+
+import core.interceptor as i
+
+
+class RecordSpoolTest(unittest.TestCase):
+ def test_append_record_spills_oldest_when_buffer_full(self):
+ inter = i.Interceptor.__new__(i.Interceptor)
+ inter._records = i.deque(maxlen=1)
+ inter._records_lock = threading.Lock()
+ inter._record_spool_lock = threading.Lock()
+ inter._record_spool_fp = io.StringIO()
+
+ old = i.ProxyRecord(
+ request=i.InterceptedRequest(timestamp=1.0, method="GET", host="old.example", path="/a"),
+ response=i.InterceptedResponse(status_code=200, response_time=0.1),
+ technique_applied="x",
+ passed=True,
+ blocked=False,
+ )
+ new = i.ProxyRecord(
+ request=i.InterceptedRequest(timestamp=2.0, method="POST", host="new.example", path="/b"),
+ response=i.InterceptedResponse(status_code=403, response_time=0.2),
+ technique_applied="y",
+ passed=False,
+ blocked=True,
+ )
+
+ inter._append_record(old)
+ inter._append_record(new)
+
+ self.assertEqual(len(inter._records), 1)
+ self.assertEqual(inter._records[0].request.host, "new.example")
+ spilled = inter._record_spool_fp.getvalue().strip().splitlines()
+ self.assertEqual(len(spilled), 1)
+ payload = json.loads(spilled[0])
+ self.assertEqual(payload["host"], "old.example")
+ self.assertEqual(payload["status_code"], 200)
+
+ def test_spill_and_append_fallback_paths(self):
+ inter = i.Interceptor.__new__(i.Interceptor)
+ inter._record_spool_fp = None
+ inter._record_spool_lock = None
+ inter._spill_record(i.ProxyRecord())
+
+ inter._record_spool_fp = io.StringIO()
+ inter._record_spool_lock = None
+ rec = i.ProxyRecord(
+ request=i.InterceptedRequest(timestamp=3.0, method="GET", host="nolock.example", path="/"),
+ response=i.InterceptedResponse(status_code=201, response_time=0.05),
+ passed=True,
+ )
+ inter._spill_record(rec)
+ self.assertIn("nolock.example", inter._record_spool_fp.getvalue())
+
+ inter2 = i.Interceptor.__new__(i.Interceptor)
+ inter2._records = None
+ inter2._records_lock = None
+ inter2._record_spool_fp = None
+ inter2._record_spool_lock = None
+ inter2._append_record(i.ProxyRecord())
+
+ inter3 = i.Interceptor.__new__(i.Interceptor)
+ inter3._records = i.deque(maxlen=1)
+ inter3._records_lock = None
+ inter3._record_spool_fp = io.StringIO()
+ inter3._record_spool_lock = None
+ inter3._append_record(
+ i.ProxyRecord(
+ request=i.InterceptedRequest(timestamp=4.0, method="GET", host="first.example"),
+ response=i.InterceptedResponse(status_code=202),
+ )
+ )
+ inter3._append_record(
+ i.ProxyRecord(
+ request=i.InterceptedRequest(timestamp=5.0, method="GET", host="second.example"),
+ response=i.InterceptedResponse(status_code=203),
+ )
+ )
+ self.assertEqual(inter3._records[0].request.host, "second.example")
+ self.assertIn("first.example", inter3._record_spool_fp.getvalue())
+
+ def test_h2session_handler_uses_record_sink_in_h1_flow(self):
+ sink = mock.Mock()
+ handler = i.H2SessionHandler(
+ client_tls=mock.Mock(),
+ server_tls=mock.Mock(),
+ host="example.com",
+ port=443,
+ server_alpn="http/1.1",
+ callbacks={},
+ magic=mock.Mock(apply=mock.Mock(return_value={"tcp": {"profile": "p"}, "tor": {"active": False}})),
+ advisor=mock.Mock(advise=mock.Mock(return_value=i.AdvisorDecision(action="forward"))),
+ records_list=[],
+ records_lock=threading.Lock(),
+ is_waf_block=lambda code: code == 403,
+ record_sink=sink,
+ )
+ req_raw = b"GET / HTTP/1.1\r\nHost: example.com\r\n\r\n"
+ resp_raw = b"HTTP/1.1 200 OK\r\n\r\n"
+ with mock.patch.object(
+ i.H1Parser,
+ "read_message",
+ side_effect=[(req_raw, b""), (resp_raw, b""), (b"", b"")],
+ ):
+ out = handler._handle_h1_to_h1()
+ self.assertEqual(len(out), 1)
+ sink.assert_called_once()
+
+ def test_interceptor_opens_and_closes_spool_file(self):
+ with tempfile.TemporaryDirectory() as d:
+ spool_path = f"{d}/records.jsonl"
+ inter = i.Interceptor(record_spool_path=spool_path)
+ self.assertIsNotNone(inter._record_spool_fp)
+ inter.stop()
+ self.assertIsNone(inter._record_spool_fp)
+
+ def test_stop_closes_spool_without_lock(self):
+ inter = i.Interceptor.__new__(i.Interceptor)
+ inter._running = True
+ inter._server = None
+ inter._record_spool_fp = io.StringIO()
+ inter._record_spool_lock = None
+ inter.ca = mock.Mock()
+ inter.stop()
+ self.assertIsNone(inter._record_spool_fp)
+ inter.ca.cleanup.assert_called_once()
+
+
+if __name__ == "__main__":
+ unittest.main()