From ee17c7ba0e26cdc01176b75b7a00491ef8e2d9b1 Mon Sep 17 00:00:00 2001 From: mkilijanek Date: Mon, 9 Mar 2026 09:21:11 +0100 Subject: [PATCH] feat(metrics): add runtime proxy metrics and accounting (cherry picked from commit 8861e3f36d26c2c9fa31e549a23aeefbd0b9cd39) --- core/interceptor.py | 456 +++++++++++++++++++++++--------------------- 1 file changed, 242 insertions(+), 214 deletions(-) diff --git a/core/interceptor.py b/core/interceptor.py index 8f56e6e..6564741 100644 --- a/core/interceptor.py +++ b/core/interceptor.py @@ -2,22 +2,21 @@ import contextlib import datetime +import gzip import ipaddress +import json import os -import queue -import random import re import select import socket import ssl -import struct import tempfile import threading import time -from dataclasses import dataclass, field +from collections import deque from http.server import BaseHTTPRequestHandler, HTTPServer from socketserver import ThreadingMixIn -from typing import Any, Callable, Dict, List, Optional, Tuple +from typing import Any, Callable, Deque, Dict, List, Optional, Tuple from urllib.parse import parse_qs, urlparse from cryptography import x509 @@ -33,80 +32,40 @@ import h2.events import h2.exceptions H2_AVAILABLE = True -except ImportError: - H2_AVAILABLE = False +except ImportError: # pragma: no cover - optional dependency + H2_AVAILABLE = False # pragma: no cover try: - import aioquic.quic.configuration - import aioquic.quic.connection - import aioquic.quic.events - import asyncio + import aioquic # noqa: F401 # pragma: no cover AIOQUIC_AVAILABLE = True -except ImportError: - AIOQUIC_AVAILABLE = False +except Exception: # pragma: no cover - optional dependency + AIOQUIC_AVAILABLE = False # pragma: no cover from chemistry.tcp_options import TCPOptionsManipulator from chemistry.tls_rotator import TLSFingerprinter from chemistry.tor_rotator import TorRotator from chemistry.proxy_rotator import ProxyRotator - - -@dataclass -class InterceptedRequest: - method: str = "GET" - url: str = "" - path: str = "" - host: str = "" - port: int = 80 - headers: dict = field(default_factory=dict) - body: bytes = b"" - cookies: dict = field(default_factory=dict) - query_params: dict = field(default_factory=dict) - timestamp: float = 0.0 - source_pid: Optional[int] = None - source_tool: Optional[str] = None - is_tunnel: bool = False - is_https: bool = False - sni_hostname: Optional[str] = None - tls_version: Optional[str] = None - http_version: str = "1.1" - - -@dataclass -class InterceptedResponse: - status_code: int = 0 - status_text: str = "" - headers: dict = field(default_factory=dict) - body: bytes = b"" - cookies: dict = field(default_factory=dict) - response_time: float = 0.0 - timestamp: float = 0.0 - is_https: bool = False - tls_version: Optional[str] = None - http_version: str = "1.1" - - -@dataclass -class ProxyRecord: - request: InterceptedRequest = field(default_factory=InterceptedRequest) - response: InterceptedResponse = field(default_factory=InterceptedResponse) - technique_applied: str = "" - passed: bool = False - blocked: bool = False - total_time: float = 0.0 - intercepted_https: bool = False - decryption_successful: bool = False - - -@dataclass -class AdvisorDecision: - action: str = "forward" - technique: str = "" - delay: float = 0.0 - rotate_ip: bool = False - reason: str = "" - forward_response: bool = True - next_protocol: Optional[str] = None +from core.models import AdvisorDecision, InterceptedRequest, InterceptedResponse, ProxyRecord +from core.pipeline import Forwarder, Magic, ResponseAdvisor + +__all__ = [ + "AdvisorDecision", + "InterceptedRequest", + "InterceptedResponse", + "ProxyRecord", + "CertificateAuthority", + "H2Connection", + "H1Parser", + "TLSContextFactory", + "MITMHandshaker", + "H2SessionHandler", + "ResponseAdvisor", + "Magic", + "Forwarder", + "ThreadedHTTPServer", + "Interceptor", + "create_interceptor", +] class CertificateAuthority: @@ -141,7 +100,7 @@ def _create_ca(self): x509.NameAttribute(NameOID.ORGANIZATION_NAME, "EvilWAF MITM Proxy"), x509.NameAttribute(NameOID.COMMON_NAME, "evilwaf-ca"), ]) - now = datetime.datetime.utcnow() + now = datetime.datetime.now(datetime.UTC).replace(tzinfo=None) ca_cert = ( x509.CertificateBuilder() .subject_name(subject) @@ -254,7 +213,7 @@ def _generate_host_certificate(self, hostname: str) -> Tuple[str, str]: except Exception: pass subject_attrs.append(x509.NameAttribute(NameOID.ORGANIZATION_NAME, "EvilWAF Proxy")) - now = datetime.datetime.utcnow() + now = datetime.datetime.now(datetime.UTC).replace(tzinfo=None) cert = ( x509.CertificateBuilder() .subject_name(x509.Name(subject_attrs)) @@ -557,10 +516,8 @@ class TLSContextFactory: ) @classmethod - def client_context(cls, alpn: List[str] = None) -> ssl.SSLContext: - ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) - ctx.check_hostname = False - ctx.verify_mode = ssl.CERT_NONE + def client_context(cls, alpn: Optional[List[str]] = None) -> ssl.SSLContext: + ctx = ssl.create_default_context(ssl.Purpose.SERVER_AUTH) ctx.minimum_version = ssl.TLSVersion.TLSv1_2 try: ctx.set_ciphers(cls.CIPHERS) @@ -571,11 +528,14 @@ def client_context(cls, alpn: List[str] = None) -> ssl.SSLContext: return ctx @classmethod - def server_context(cls, cert_path: str, key_path: str, alpn: List[str] = None) -> ssl.SSLContext: + def server_context( + cls, + cert_path: str, + key_path: str, + alpn: Optional[List[str]] = None, + ) -> ssl.SSLContext: ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) ctx.load_cert_chain(certfile=cert_path, keyfile=key_path) - ctx.check_hostname = False - ctx.verify_mode = ssl.CERT_NONE ctx.minimum_version = ssl.TLSVersion.TLSv1_2 try: ctx.set_ciphers(cls.CIPHERS) @@ -662,6 +622,7 @@ def __init__( records_list: List, records_lock: threading.Lock, is_waf_block: Callable, + record_sink: Optional[Callable[[ProxyRecord], None]] = None, ): self.client_tls = client_tls self.server_tls = server_tls @@ -673,6 +634,7 @@ def __init__( self.advisor = advisor self.records_list = records_list self.records_lock = records_lock + self.record_sink = record_sink self.is_waf_block = is_waf_block def _make_client_h2(self) -> Optional[H2Connection]: @@ -839,8 +801,11 @@ def pump_server(): decryption_successful=True, ) records.append(record) - with self.records_lock: - self.records_list.append(record) + if self.record_sink: + self.record_sink(record) + else: + with self.records_lock: + self.records_list.append(record) if self.callbacks.get("record"): self.callbacks["record"](record) try: @@ -965,8 +930,11 @@ def _handle_h1_to_h1(self) -> List[ProxyRecord]: decryption_successful=True, ) records.append(record) - with self.records_lock: - self.records_list.append(record) + if self.record_sink: + self.record_sink(record) + else: + with self.records_lock: + self.records_list.append(record) if self.callbacks.get("record"): self.callbacks["record"](record) @@ -1012,127 +980,6 @@ def _relay_raw(self) -> List[ProxyRecord]: return [] -class ResponseAdvisor: - ROTATE_ON = (429, 503, 509) - RETRY_TECH = (403, 406, 418) - PASS = (200, 201, 204, 301, 302, 304, 404) - - def __init__(self, magic: "Magic", max_retries: int = 3, retry_delay: float = 1.5): - self._magic = magic - self._max = max_retries - self._delay = retry_delay - self._counts: Dict[str, int] = {} - self._lock = threading.Lock() - - def advise(self, response: InterceptedResponse, request: InterceptedRequest, record: ProxyRecord) -> AdvisorDecision: - code = response.status_code - if code in self.PASS: - self._reset(request.host) - return AdvisorDecision(action="forward", reason=f"{code} pass") - if code in self.RETRY_TECH: - return self._retry(request, record, reason=f"{code} waf block") - if code in self.ROTATE_ON: - return self._rotate_and_retry(response, request, record) - return AdvisorDecision(action="forward", reason=f"{code} default") - - def _retry(self, request: InterceptedRequest, record: ProxyRecord, reason: str) -> AdvisorDecision: - if not self._has_left(request.host): - return AdvisorDecision(action="forward", reason="max retries") - self._inc(request.host) - return AdvisorDecision(action="retry", delay=self._delay, reason=reason, forward_response=False) - - def _rotate_and_retry(self, response: InterceptedResponse, request: InterceptedRequest, record: ProxyRecord) -> AdvisorDecision: - if not self._has_left(request.host): - return AdvisorDecision(action="forward", reason="max retries") - delay = self._get_delay(response) - self._inc(request.host) - return AdvisorDecision(action="rotate_and_retry", delay=delay, rotate_ip=True, reason="rate limited", forward_response=False) - - def _has_left(self, host: str) -> bool: - with self._lock: - return self._counts.get(host, 0) < self._max - - def _inc(self, host: str): - with self._lock: - self._counts[host] = self._counts.get(host, 0) + 1 - - def _reset(self, host: str): - with self._lock: - self._counts.pop(host, None) - - def _get_delay(self, response: InterceptedResponse) -> float: - ra = response.headers.get("retry-after", "").strip() - if ra.isdigit(): - return min(float(ra), 60.0) - return self._delay - - -class Magic: - def __init__(self, tcp: Optional[TCPOptionsManipulator] = None, tls: Optional[TLSFingerprinter] = None, tor: Optional[TorRotator] = None): - self._tcp = tcp or TCPOptionsManipulator() - self._tls = tls or TLSFingerprinter() - self._tor = tor or TorRotator() - self._lock = threading.Lock() - self._request_count = 0 - - def apply(self, technique: str = "") -> Dict[str, Any]: - with self._lock: - self._request_count += 1 - tcp_opts = self._tcp.per_request_options() - tls_sess, tls_id = self._tls.paired_with_tcp(tcp_opts.get("profile", "")) - result = { - "tcp": tcp_opts, - "tls": {"session": tls_sess, "identifier": tls_id}, - "tor": {}, - } - if technique == "ip_rotation" or self._tor.should_rotate(self._request_count): - if self._tor.is_tor_alive(): - ok, ip = self._tor.rotate_and_verify() - result["tor"] = {"active": ok, "ip": ip, "proxies": self._tor.get_proxy_dict()} - return result - - def _bind_to_tor(self) -> Dict[str, Any]: - if not self._tor.is_tor_alive(): - return {"active": False} - ok, ip = self._tor.rotate_and_verify() - return {"active": ok, "ip": ip, "proxies": self._tor.get_proxy_dict()} - - def error_solver(self, error: Exception, context: str = "") -> bool: - if isinstance(error, ssl.SSLError): - try: - self._tls.rotate() - except Exception: - pass - if isinstance(error, (ConnectionResetError, BrokenPipeError, TimeoutError)): - try: - self._tcp.rotate() - except Exception: - pass - return True - - -class Forwarder: - def forward(self, response: InterceptedResponse, handler: BaseHTTPRequestHandler) -> bool: - try: - if response.status_code == 0: - response.status_code = 502 - response.status_text = "Bad Gateway" - handler.send_response(response.status_code, response.status_text) - skip = {"transfer-encoding", "connection", "keep-alive"} - for k, v in response.headers.items(): - if k.lower() not in skip: - handler.send_header(k, v) - handler.send_header("Connection", "close") - if response.body: - handler.send_header("Content-Length", str(len(response.body))) - handler.end_headers() - if response.body and handler.command != "HEAD": - handler.wfile.write(response.body) - return True - except Exception: - return False - - class ThreadedHTTPServer(ThreadingMixIn, HTTPServer): daemon_threads = True allow_reuse_address = True @@ -1150,6 +997,9 @@ def __init__( override_ip: Optional[str] = None, target_host: Optional[str] = None, upstream_proxies: Optional[List[str]] = None, + record_limit: int = 20000, + record_spool_path: Optional[str] = None, + record_spool_max_bytes: int = 50 * 1024 * 1024, ): self._host = listen_host self._port = listen_port @@ -1157,13 +1007,31 @@ def __init__( self._running = False self._server: Optional[ThreadedHTTPServer] = None self._thread: Optional[threading.Thread] = None - self._records: List[ProxyRecord] = [] + self._records: Deque[ProxyRecord] = deque(maxlen=max(1000, record_limit)) self._records_lock = threading.Lock() + self._metrics_lock = threading.Lock() + self._started_at = time.time() + self._total_records = 0 + self._total_passed = 0 + self._total_blocked = 0 + self._record_spool_path = record_spool_path + self._record_spool_max_bytes = max(1024, record_spool_max_bytes) + self._record_spool_lock = threading.Lock() + self._record_spool_fp = None + if self._record_spool_path: + spool_dir = os.path.dirname(self._record_spool_path) + if spool_dir: + os.makedirs(spool_dir, exist_ok=True) + self._record_spool_fp = open(self._record_spool_path, "a", encoding="utf-8") self._proxy_rotator = ProxyRotator(proxy_urls=upstream_proxies) if upstream_proxies else None self.ca = CertificateAuthority() - self._handshaker = MITMHandshaker(self.ca, proxy_rotator=self._proxy_rotator) + self._handshaker = MITMHandshaker( + self.ca, + override_ip=self._override_ip, + proxy_rotator=self._proxy_rotator, + ) self._forwarder = Forwarder() self._tor = TorRotator( @@ -1173,7 +1041,12 @@ def __init__( ) self._tcp_manip = TCPOptionsManipulator() self._tls_fp = TLSFingerprinter() - self._magic = Magic(tcp=self._tcp_manip, tls=self._tls_fp, tor=self._tor) + self._magic = Magic( + tcp=self._tcp_manip, + tls=self._tls_fp, + tor=self._tor, + rotate_every=tor_rotate_every, + ) self._advisor = ResponseAdvisor(self._magic) self.intercept_https = intercept_https @@ -1216,7 +1089,8 @@ def _process_http_request(self, req: InterceptedRequest) -> InterceptedResponse: req.host = host req.port = port - sock = self._create_upstream_connection(host, port, timeout=30) + connect_host = self._override_ip or host + sock = self._create_upstream_connection(connect_host, port, timeout=30) if parsed.scheme == "https": ctx = TLSContextFactory.client_context(alpn=["http/1.1"]) @@ -1280,6 +1154,147 @@ def clear_records(self): def export_ca_certificates(self, export_dir: Optional[str] = None) -> Dict[str, str]: return self.ca.export_ca_certificates(export_dir) + @staticmethod + def _serialize_record(record: ProxyRecord) -> Dict[str, Any]: + return { + "timestamp": record.request.timestamp, + "method": record.request.method, + "host": record.request.host, + "path": record.request.path, + "status_code": record.response.status_code, + "passed": record.passed, + "blocked": record.blocked, + "technique": record.technique_applied, + "is_https": record.request.is_https, + "response_time": record.response.response_time, + } + + def _spill_record(self, record: ProxyRecord): + fp = getattr(self, "_record_spool_fp", None) + if not fp: + return + payload = json.dumps(self._serialize_record(record), separators=(",", ":")) + lock = getattr(self, "_record_spool_lock", None) + if lock: + with lock: + self._rotate_spool_if_needed_unlocked() + fp = self._record_spool_fp + fp.write(payload + "\n") + fp.flush() + else: + self._rotate_spool_if_needed_unlocked() + fp = self._record_spool_fp + fp.write(payload + "\n") + fp.flush() + + def _rotate_spool_if_needed_unlocked(self): + spool_path = getattr(self, "_record_spool_path", None) + if not spool_path: + return + fp = getattr(self, "_record_spool_fp", None) + if not fp: + return + try: + size = os.path.getsize(spool_path) + except OSError: + return + max_bytes = getattr(self, "_record_spool_max_bytes", 50 * 1024 * 1024) + if size < max_bytes: + return + + fp.close() + rotated = f"{spool_path}.1" + gz_path = f"{rotated}.gz" + with contextlib.suppress(OSError): + os.remove(gz_path) + with contextlib.suppress(OSError): + os.remove(rotated) + os.replace(spool_path, rotated) + with open(rotated, "rb") as src, gzip.open(gz_path, "wb", compresslevel=6) as dst: + dst.write(src.read()) + with contextlib.suppress(OSError): + os.remove(rotated) + self._record_spool_fp = open(spool_path, "a", encoding="utf-8") + + def get_spooled_records(self, limit: int = 200) -> List[Dict[str, Any]]: + if not self._record_spool_path: + return [] + entries: List[Dict[str, Any]] = [] + paths = [f"{self._record_spool_path}.1.gz", self._record_spool_path] + for path in paths: + if not os.path.exists(path): + continue + try: + if path.endswith(".gz"): + opener = gzip.open + else: + opener = open + with opener(path, "rt", encoding="utf-8", errors="ignore") as f: + for ln in f: + ln = ln.strip() + if not ln: + continue + try: + entries.append(json.loads(ln)) + except Exception: + continue + except Exception: + continue + return entries[-max(1, limit):] + + def get_metrics(self) -> Dict[str, Any]: + now = time.time() + uptime = max(0.001, now - self._started_at) + with self._metrics_lock: + total = self._total_records + passed = self._total_passed + blocked = self._total_blocked + with self._records_lock: + in_memory = len(self._records) + memory_cap = self._records.maxlen or 0 + spool_size = 0 + if self._record_spool_path and os.path.exists(self._record_spool_path): + with contextlib.suppress(OSError): + spool_size = os.path.getsize(self._record_spool_path) + return { + "uptime_seconds": uptime, + "total_records": total, + "passed_records": passed, + "blocked_records": blocked, + "pass_rate": (passed / total) if total else 0.0, + "block_rate": (blocked / total) if total else 0.0, + "records_per_second": (total / uptime), + "in_memory_records": in_memory, + "in_memory_capacity": memory_cap, + "spool_file": self._record_spool_path, + "spool_size_bytes": spool_size, + } + + def _append_record(self, record: ProxyRecord): + metrics_lock = getattr(self, "_metrics_lock", None) + if metrics_lock: + with metrics_lock: + self._total_records += 1 + if record.passed: + self._total_passed += 1 + if record.blocked: + self._total_blocked += 1 + + lock = getattr(self, "_records_lock", None) + records = getattr(self, "_records", None) + if records is None: + return + maxlen = getattr(records, "maxlen", None) + if lock: + with lock: + if maxlen and len(records) >= maxlen: + self._spill_record(records[0]) + records.append(record) + return + if maxlen and len(records) >= maxlen: + self._spill_record(records[0]) + records.append(record) + def start(self): self._running = True interceptor_ref = self @@ -1355,8 +1370,7 @@ def _dispatch(self): passed=200 <= resp.status_code < 400, blocked=interceptor_ref._is_waf_block(resp.status_code), ) - with interceptor_ref._records_lock: - interceptor_ref._records.append(record) + interceptor_ref._append_record(record) if interceptor_ref._callbacks["record"]: interceptor_ref._callbacks["record"](record) interceptor_ref._forwarder.forward(resp, self) @@ -1378,8 +1392,6 @@ def do_CONNECT(self): parts = self.path.split(":") remote_host = parts[0] remote_port = int(parts[1]) if len(parts) > 1 else 443 - start_time = time.time() - if interceptor_ref.intercept_https: self.send_response(200, "Connection Established") self.send_header("Proxy-Agent", "EvilWAF") @@ -1407,6 +1419,7 @@ def do_CONNECT(self): records_list=interceptor_ref._records, records_lock=interceptor_ref._records_lock, is_waf_block=interceptor_ref._is_waf_block, + record_sink=interceptor_ref._append_record, ) handler.handle() try: @@ -1477,6 +1490,15 @@ def stop(self): if self._server: self._server.shutdown() self._server.server_close() + spool_fp = getattr(self, "_record_spool_fp", None) + spool_lock = getattr(self, "_record_spool_lock", None) + if spool_fp: + if spool_lock: + with spool_lock: + spool_fp.close() + else: + spool_fp.close() + self._record_spool_fp = None self.ca.cleanup() def is_running(self) -> bool: @@ -1496,6 +1518,9 @@ def create_interceptor( override_ip: Optional[str] = None, target_host: Optional[str] = None, upstream_proxies: Optional[List[str]] = None, + record_limit: int = 20000, + record_spool_path: Optional[str] = None, + record_spool_max_bytes: int = 50 * 1024 * 1024, ) -> Interceptor: return Interceptor( listen_host=listen_host, @@ -1507,5 +1532,8 @@ def create_interceptor( override_ip=override_ip, target_host=target_host, upstream_proxies=upstream_proxies, + record_limit=record_limit, + record_spool_path=record_spool_path, + record_spool_max_bytes=record_spool_max_bytes, ) - \ No newline at end of file +