diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..f411971 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,128 @@ +name: CI/CD Pipeline + +on: + push: + branches: [ main, master, develop ] + pull_request: + branches: [ main, master ] + +jobs: + test: + name: Test on Python ${{ matrix.python-version }} + runs-on: ${{ matrix.os }} + + strategy: + fail-fast: false + matrix: + os: [ubuntu-latest, windows-latest, macos-latest] + python-version: ['3.8', '3.9', '3.10', '3.11', '3.12'] + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -r requirements.txt + pip install pytest pytest-cov flake8 black mypy + + - name: Lint with flake8 + run: | + # Stop the build if there are Python syntax errors or undefined names + flake8 takeover.py --count --select=E9,F63,F7,F82 --show-source --statistics + # Exit-zero treats all errors as warnings + flake8 takeover.py --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics + continue-on-error: true + + - name: Check code formatting with black + run: | + black --check takeover.py + continue-on-error: true + + - name: Type checking with mypy + run: | + mypy takeover.py --ignore-missing-imports + continue-on-error: true + + - name: Run tests with pytest + run: | + pytest test_takeover.py -v --cov=takeover --cov-report=xml --cov-report=term + + - name: Upload coverage to Codecov + uses: codecov/codecov-action@v3 + with: + file: ./coverage.xml + flags: unittests + name: codecov-umbrella + fail_ci_if_error: false + + - name: Test CLI help + run: | + python takeover.py || true + + - name: Verify services.json + run: | + python -c "import json; json.load(open('services.json'))" + + security: + name: Security Scan + runs-on: ubuntu-latest + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: '3.11' + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install bandit safety + + - name: Run Bandit security scan + run: | + bandit -r takeover.py -f json -o bandit-report.json + continue-on-error: true + + - name: Check dependencies for vulnerabilities + run: | + safety check --json + continue-on-error: true + + build: + name: Build and Package + runs-on: ubuntu-latest + needs: test + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: '3.11' + + - name: Install build dependencies + run: | + python -m pip install --upgrade pip + pip install build wheel setuptools + + - name: Build package + run: | + python -m build + + - name: Upload artifacts + uses: actions/upload-artifact@v3 + with: + name: dist-packages + path: dist/ diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..7cdb9a1 --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,186 @@ +# Changelog + +All notable changes to this project will be documented in this file. + +## [Unreleased] - 2024 + +### Added - Functionality Enhancements + +#### Service Signatures +- **70+ Service Signatures** - Added modern cloud services: + - Vercel, Netlify, Railway, Render + - Azure, DigitalOcean, Cloudflare Pages + - GitLab Pages, Firebase + - Squarespace, Wix, Supabase +- **JSON Configuration** - `services.json` file for easy service signature updates +- **Custom Services Support** - Load custom service definitions via `-S` flag +- **Dynamic Service Loading** - Services loaded from JSON with fallback to defaults + +#### DNS Resolution & Validation +- **DNS Resolution Checking** - Validates domains resolve before scanning +- **CNAME Validation** - Checks CNAME records match expected service patterns +- **False Positive Reduction** - DNS checks significantly reduce false positives +- **Optional DNS Checking** - Can be disabled with `-n` flag for faster scanning + +#### Output Formats +- **CSV Export** - Structured data export for spreadsheet analysis +- **HTML Reports** - Beautiful, styled HTML reports with summary statistics +- **Enhanced JSON** - Improved JSON structure with metadata +- **Text Format** - Maintained original text output format + +#### Rate Limiting +- **Configurable Rate Limiting** - Control requests per second with `-r` flag +- **Thread-Safe Implementation** - Works correctly with multi-threaded scanning +- **Prevents Blocking** - Avoid overwhelming targets or getting IP blocked + +#### Testing & Quality +- **Unit Tests** - Comprehensive pytest test suite (test_takeover.py) +- **Test Coverage** - Tests for all major functions and edge cases +- **CI/CD Pipeline** - GitHub Actions workflow for automated testing +- **Multi-Platform Testing** - Tests run on Ubuntu, Windows, and macOS +- **Python Version Support** - Tested on Python 3.8, 3.9, 3.10, 3.11, 3.12 + +#### Documentation +- **requirements.txt** - Standard Python dependencies file +- **requirements-dev.txt** - Development dependencies for testing/linting +- **Enhanced README** - Comprehensive documentation with examples +- **Code Documentation** - Docstrings for all functions with type hints +- **Contributing Guide** - Instructions for contributors + +### Changed - Code Quality & Maintainability + +#### Architecture +- **Removed Global Variables** - Replaced with `ScanConfig` dataclass +- **Class-Based Design** - `TakeoverScanner` class for better organization +- **Type Hints** - Full type annotations throughout codebase +- **Dataclasses** - Modern Python dataclass for configuration + +#### Error Handling +- **Specific Exception Handling** - Catches specific request exceptions +- **Informative Error Messages** - Clear error messages with context +- **Graceful Degradation** - Continues scanning on individual failures +- **Verbose Error Reporting** - Optional detailed error output + +#### Code Standards +- **Named Constants** - HTTP status codes as named constants +- **Removed Magic Numbers** - All hardcoded values replaced with constants +- **Removed Redundant Code** - Cleaned up unnecessary assignments +- **Better Function Names** - More descriptive function names + +#### Input Validation +- **Domain Validation** - Regex-based domain format validation +- **File Path Validation** - Validates file paths before reading +- **Invalid Domain Filtering** - Automatically filters invalid domains from lists +- **Early Validation** - Validates inputs before processing + +### Security Improvements + +#### SSL/TLS +- **Configurable SSL Verification** - Enable with `-s` flag +- **SSL Warnings** - Warns when SSL verification is disabled +- **Secure by Default** - Encourages SSL verification usage + +#### User Agent +- **Updated User Agent** - Modern Chrome 120 user agent string +- **Configurable User Agent** - Custom user agents via `-u` flag + +#### Dependencies +- **Security Scanning** - Bandit and Safety checks in CI/CD +- **Updated Dependencies** - Latest versions of requests and urllib3 +- **Vulnerability Monitoring** - Automated dependency vulnerability checks + +### Performance + +#### Optimization +- **Connection Reuse** - Better connection handling +- **Efficient Threading** - Improved thread pool management +- **Rate Limiting** - Prevents resource exhaustion +- **DNS Caching** - DNS resolver with timeout configuration + +### Developer Experience + +#### CLI Improvements +- **Better Help Text** - Organized help with sections and examples +- **More Options** - Additional flags for fine-tuning behavior +- **Clear Examples** - Usage examples in help output + +#### Code Quality Tools +- **Linting** - flake8 configuration +- **Formatting** - black code formatter +- **Type Checking** - mypy static type checking +- **Pre-commit Hooks** - Ready for pre-commit integration + +### Files Added +- `services.json` - Service signature definitions +- `requirements.txt` - Python dependencies +- `requirements-dev.txt` - Development dependencies +- `test_takeover.py` - Unit test suite +- `.github/workflows/ci.yml` - CI/CD pipeline +- `CHANGELOG.md` - This file + +### Files Modified +- `takeover.py` - Major refactoring with new features +- `README.md` - Comprehensive documentation update +- `setup.py` - Updated dependencies + +## [0.2] - Previous Version + +### Features +- Basic subdomain takeover detection +- Multiple service signatures +- Multi-threaded scanning +- Proxy support +- JSON and TXT output +- Verbose mode + +--- + +## Migration Guide + +### For Users + +**Old Command:** +```bash +python takeover.py -d example.com -o output.json -v +``` + +**New Command (same functionality):** +```bash +python takeover.py -d example.com -o output.json -v +``` + +**New Features:** +```bash +# With DNS checking and rate limiting +python takeover.py -d example.com -o output.html -v -r 5 + +# With SSL verification +python takeover.py -d example.com -v -s + +# With custom services +python takeover.py -d example.com -S my_services.json -v +``` + +### For Developers + +**Breaking Changes:** +- None - All existing functionality maintained + +**New APIs:** +- `load_services(services_file)` - Load services from JSON +- `check_dns_resolution(domain, verbose)` - Check DNS resolution +- `check_cname_match(cnames, service_cnames)` - Validate CNAME records +- `validate_domain(domain)` - Validate domain format +- `savecsv()`, `savehtml()` - New output formats + +**Deprecated:** +- Global `_output` variable (replaced with parameter passing) +- Global `k_` dictionary (replaced with `ScanConfig` dataclass) + +--- + +## Acknowledgments + +- Original author: M'hamed (@m4ll0k) Outaadi +- Fork maintainer: [@edoardottt](https://github.com/edoardottt) +- Contributors: All who have submitted issues and pull requests diff --git a/README.md b/README.md index 2216602..be22aa3 100644 --- a/README.md +++ b/README.md @@ -9,6 +9,19 @@ This allows an attacker to set up a page on the service that was being used and For example, if **subdomain.example.com** was pointing to a GitHub page and the user decided to delete their GitHub page, an attacker can now create a GitHub page, add a **CNAME** file containing **subdomain.example.com**, and claim **subdomain.example.com**. For more information read +## Features + +- ✅ **70+ Service Signatures** - Detects takeover vulnerabilities across major platforms +- ✅ **DNS Resolution Checking** - Validates CNAME records to reduce false positives +- ✅ **Multiple Output Formats** - Export results as TXT, JSON, CSV, or HTML +- ✅ **Rate Limiting** - Configurable request rate to avoid overwhelming targets +- ✅ **Custom Service Definitions** - Load service signatures from JSON files +- ✅ **SSL Verification** - Optional SSL certificate validation +- ✅ **Multi-threaded Scanning** - Concurrent domain checking for faster results +- ✅ **Input Validation** - Validates domain formats before processing +- ✅ **Comprehensive Testing** - Unit tests with pytest +- ✅ **CI/CD Pipeline** - Automated testing with GitHub Actions + ## Supported Services - Acquia @@ -64,6 +77,18 @@ For more information read or -v -python3 takeover.py -l uber-sub-domains.txt -o output.txt -p http://xxx.xxx.xxx.xxx:8080 -v -python3 takeover.py -d uber-sub-domains.txt -o output.txt -T 3 -v +# Scan a single domain +python3 takeover.py -d www.domain.com -v + +# Scan multiple domains from file +python3 takeover.py -l domains.txt -v + +# Save results to file (supports .txt, .json, .csv, .html) +python3 takeover.py -d www.domain.com -o results.html -v ``` +### Advanced Options + +```console +# Use custom threads and timeout +python3 takeover.py -d www.domain.com -v -t 10 -T 30 + +# Use proxy +python3 takeover.py -d www.domain.com -p http://127.0.0.1:8080 -v + +# Enable SSL verification +python3 takeover.py -d www.domain.com -v -s + +# Rate limiting (5 requests per second) +python3 takeover.py -l domains.txt -r 5 -v + +# Use custom services file +python3 takeover.py -d www.domain.com -S custom_services.json -v + +# Disable DNS checking +python3 takeover.py -d www.domain.com -n -v + +# Full example with all options +python3 takeover.py -l domains.txt -o output.csv -t 10 -T 30 -r 5 -s -v +``` + +### Command Line Options + +**Basic Options:** +- `-d` : Set domain URL (e.g: www.test.com) +- `-l` : Scan multiple targets in a text file +- `-o` : Output file (supports .txt, .json, .csv, .html) +- `-v` : Verbose, print more info + +**Advanced Options:** +- `-t` : Set threads (default: 1) +- `-T` : Set request timeout in seconds (default: 20) +- `-p` : Use a proxy to connect the target URL +- `-u` : Set custom user agent +- `-r` : Rate limit in requests/second (default: 0 = no limit) +- `-S` : Custom services JSON file path + +**Security & Validation:** +- `-s` : Verify SSL certificates (disabled by default) +- `-n` : Disable DNS resolution checking +- `-k` : Process 200 HTTP code (may cause false positives) + ## Docker support Build the image: @@ -104,6 +179,58 @@ Run the container: docker run -it --rm takeover -d www.domain.com -v ``` +## Testing + +Run the test suite: + +```console +# Install development dependencies +pip install -r requirements-dev.txt + +# Run tests +pytest test_takeover.py -v + +# Run tests with coverage +pytest test_takeover.py -v --cov=takeover --cov-report=html + +# Run linting +flake8 takeover.py +black --check takeover.py +mypy takeover.py --ignore-missing-imports +``` + +## Custom Services File + +You can define your own service signatures in a JSON file: + +```json +{ + "services": { + "MyService": { + "error": "error pattern regex", + "cname": ["expected-cname.example.com"] + } + } +} +``` + +Then use it with the `-S` flag: + +```console +python3 takeover.py -d example.com -S my_services.json -v +``` + +## Contributing + +Contributions are welcome! Please feel free to submit a Pull Request. + +1. Fork the repository +2. Create your feature branch (`git checkout -b feature/AmazingFeature`) +3. Run tests (`pytest test_takeover.py -v`) +4. Commit your changes (`git commit -m 'Add some AmazingFeature'`) +5. Push to the branch (`git push origin feature/AmazingFeature`) +6. Open a Pull Request + --------- This repository is under [MIT License](https://github.com/edoardottt/takeover/blob/master/LICENSE). diff --git a/requirements-dev.txt b/requirements-dev.txt new file mode 100644 index 0000000..07d1b61 --- /dev/null +++ b/requirements-dev.txt @@ -0,0 +1,14 @@ +-r requirements.txt + +# Testing +pytest>=7.4.0 +pytest-cov>=4.1.0 + +# Linting and formatting +flake8>=6.1.0 +black>=23.7.0 +mypy>=1.5.0 + +# Security scanning +bandit>=1.7.5 +safety>=2.3.5 diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..a9221b9 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,8 @@ +requests>=2.31.0 +urllib3>=2.0.0 +dnspython>=2.4.0 + +# Development dependencies (optional) +# Install with: pip install -r requirements.txt +# For testing: pip install pytest pytest-cov +# For linting: pip install flake8 black mypy diff --git a/services.json b/services.json new file mode 100644 index 0000000..b605f83 --- /dev/null +++ b/services.json @@ -0,0 +1,288 @@ +{ + "services": { + "AWS/S3": { + "error": "The specified bucket does not exist", + "cname": ["s3.amazonaws.com"] + }, + "BitBucket": { + "error": "Repository not found", + "cname": ["bitbucket.io"] + }, + "Github": { + "error": "There isn\\'t a Github Pages site here\\.|a Github Pages site here", + "cname": ["github.io", "github.com"] + }, + "Shopify": { + "error": "Sorry\\, this (shop|store) is currently unavailable\\.", + "cname": ["myshopify.com"] + }, + "Fastly": { + "error": "Fastly error\\: unknown domain\\:", + "cname": ["fastly.net"] + }, + "Ghost": { + "error": "The thing you were looking for is no longer here\\, or never was", + "cname": ["ghost.io"] + }, + "Heroku": { + "error": "no-such-app.html|no such app|herokucdn.com/error-pages/no-such-app.html|No such app", + "cname": ["herokuapp.com", "herokussl.com"] + }, + "Pantheon": { + "error": "The gods are wise, but do not know of the site which you seek|404 error unknown site", + "cname": ["pantheonsite.io"] + }, + "Tumblr": { + "error": "Whatever you were looking for doesn\\'t currently exist at this address", + "cname": ["tumblr.com"] + }, + "Wordpress": { + "error": "Do you want to register", + "cname": ["wordpress.com"] + }, + "TeamWork": { + "error": "Oops - We didn\\'t find your site.", + "cname": ["teamwork.com"] + }, + "Helpjuice": { + "error": "We could not find what you\\'re looking for.", + "cname": ["helpjuice.com"] + }, + "Helpscout": { + "error": "No settings were found for this company:", + "cname": ["helpscoutdocs.com"] + }, + "Cargo": { + "error": "404 — File not found", + "cname": ["cargocollective.com"] + }, + "Uservoice": { + "error": "This UserVoice subdomain is currently available", + "cname": ["uservoice.com"] + }, + "Surge.sh": { + "error": "project not found", + "cname": ["surge.sh"] + }, + "Intercom": { + "error": "This page is reserved for artistic dogs\\.|Uh oh\\. That page doesn\\'t exist", + "cname": ["intercom.io"] + }, + "Webflow": { + "error": "

The page you are looking for doesn\\'t exist or has been moved.

|The page you are looking for doesn\\'t exist or has been moved", + "cname": ["webflow.io"] + }, + "Kajabi": { + "error": "

The page you were looking for doesn\\'t exist.

", + "cname": ["kajabi.com"] + }, + "Thinkific": { + "error": "You may have mistyped the address or the page may have moved.", + "cname": ["thinkific.com"] + }, + "Tave": { + "error": "

Error 404: Page Not Found

", + "cname": ["tave.com"] + }, + "Wishpond": { + "error": "

https://www.wishpond.com/404?campaign=true", + "cname": ["wishpond.com"] + }, + "Aftership": { + "error": "Oops.

The page you\\'re looking for doesn\\'t exist.", + "cname": ["aftership.com"] + }, + "Aha": { + "error": "There is no portal here \\.\\.\\. sending you back to Aha!", + "cname": ["aha.io"] + }, + "Tictail": { + "error": "to target URL: Error Code: 404

", + "cname": ["brightcove.com", "bcvp0rtal.com"] + }, + "Bigcartel": { + "error": "

Oops! We couldn’t find that page.

", + "cname": ["bigcartel.com"] + }, + "ActiveCampaign": { + "error": "alt=\\\"LIGHTTPD - fly light.\\\"", + "cname": ["activehosted.com"] + }, + "Campaignmonitor": { + "error": "Double check the URL or
|Unrecognized domain", + "cname": ["mashery.com"] + }, + "Divio": { + "error": "Application not responding", + "cname": ["divio-media.com"] + }, + "Feedpress": { + "error": "The feed has not been found.", + "cname": ["redirect.feedpress.me"] + }, + "Readme.io": { + "error": "Project doesnt exist... yet!", + "cname": ["readme.io"] + }, + "Statuspage": { + "error": "You are being ", + "cname": ["statuspage.io"] + }, + "Zendesk": { + "error": "Help Center Closed", + "cname": ["zendesk.com"] + }, + "Worksites.net": { + "error": "Hello! Sorry, but the webs>", + "cname": ["worksites.net"] + }, + "Agile CRM": { + "error": "this page is no longer available", + "cname": ["agilecrm.com"] + }, + "Anima": { + "error": "try refreshing in a minute|this is your website and you've just created it", + "cname": ["animaapp.io"] + }, + "Fly.io": { + "error": "404 Not Found", + "cname": ["fly.dev"] + }, + "Gemfury": { + "error": "This page could not be found", + "cname": ["fury.io"] + }, + "HatenaBlog": { + "error": "404 Blog is not found", + "cname": ["hatenablog.com"] + }, + "Kinsta": { + "error": "No Site For Domain", + "cname": ["kinsta.com", "kinsta.cloud"] + }, + "LaunchRock": { + "error": "It looks like you may have taken a wrong turn somewhere|worry...it happens to all of us", + "cname": ["launchrock.com"] + }, + "Ngrok": { + "error": "ngrok.io not found", + "cname": ["ngrok.io"] + }, + "SmartJobBoard": { + "error": "This job board website is either expired or its domain name is invalid", + "cname": ["smartjobboard.com"] + }, + "Strikingly": { + "error": "page not found", + "cname": ["strikingly.com"] + }, + "Uberflip": { + "error": "hub domain\\, The URL you\\'ve accessed does not provide a hub", + "cname": ["uberflip.com"] + }, + "Unbounce": { + "error": "The requested URL was not found on this server", + "cname": ["unbounce.com"] + }, + "Uptimerobot": { + "error": "page not found", + "cname": ["stats.uptimerobot.com"] + }, + "Vercel": { + "error": "The deployment could not be found|404: NOT_FOUND", + "cname": ["vercel.app", "vercel-dns.com", "now.sh"] + }, + "Netlify": { + "error": "Not Found - Request ID:|Page Not Found|404 - Page not found", + "cname": ["netlify.app", "netlify.com"] + }, + "Railway": { + "error": "404 - Application Not Found|Railway - 404", + "cname": ["railway.app", "up.railway.app"] + }, + "Render": { + "error": "404 - Not Found|The page you are looking for does not exist", + "cname": ["onrender.com"] + }, + "Azure": { + "error": "404 Web Site not found|The resource you are looking for has been removed", + "cname": ["azurewebsites.net", "cloudapp.azure.com", "azure-api.net"] + }, + "DigitalOcean": { + "error": "Domain not found|404 - Page not found", + "cname": ["ondigitalocean.app"] + }, + "Cloudflare Pages": { + "error": "404 - Not found|The page you are looking for does not exist", + "cname": ["pages.dev"] + }, + "GitLab Pages": { + "error": "The page you're looking for could not be found|404 - Page Not Found", + "cname": ["gitlab.io"] + }, + "Firebase": { + "error": "The requested URL was not found on this server|404 Not Found", + "cname": ["firebaseapp.com", "web.app"] + }, + "Squarespace": { + "error": "No Such Account|This domain is not connected to a website", + "cname": ["squarespace.com"] + }, + "Wix": { + "error": "Error ConnectYourDomain occurred|This site is not published yet", + "cname": ["wixsite.com", "wix.com"] + }, + "Supabase": { + "error": "404 - Not Found|Project not found", + "cname": ["supabase.co"] + } + } +} diff --git a/takeover.py b/takeover.py index 35e815d..d1c67a7 100644 --- a/takeover.py +++ b/takeover.py @@ -13,8 +13,21 @@ import getopt import sys import re +import warnings +import time +import csv +from typing import Optional, Tuple, List, Dict, Any +from dataclasses import dataclass, field +from pathlib import Path +try: + import dns.resolver + DNS_AVAILABLE = True +except ImportError: + DNS_AVAILABLE = False + +# ANSI Color codes r = "\033[1;31m" g = "\033[1;32m" y = "\033[1;33m" @@ -25,29 +38,174 @@ b_ = "\033[0;34m" e = "\033[0m" -global _output -_output = [] -global k_ -k_ = { - "domain": None, - "threads": 1, - "d_list": None, - "proxy": None, - "output": None, - "timeout": None, - "process": False, - "user_agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_5) AppleWebKit/537.36 \ -(KHTML, like Gecko) Chrome/76.0.3809.36 Safari/537.36", - "verbose": False, - "dict_len": 0, -} - - -# index/lenght * 100 -def PERCENT(x, y): +# HTTP Status Code Constants +HTTP_OK = 200 +HTTP_CREATED = 201 +HTTP_MAX_ERROR = 599 + +# Default Configuration +DEFAULT_USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" +DEFAULT_TIMEOUT = 20 +DEFAULT_THREADS = 1 +DEFAULT_RATE_LIMIT = 0 # requests per second, 0 = no limit + + +@dataclass +class ScanConfig: + """Configuration for the subdomain takeover scanner.""" + domain: Optional[str] = None + threads: int = DEFAULT_THREADS + d_list: Optional[str] = None + proxy: Optional[str] = None + output: Optional[str] = None + timeout: Optional[int] = None + process: bool = False + user_agent: str = DEFAULT_USER_AGENT + verbose: bool = False + verify_ssl: bool = False + check_dns: bool = True + rate_limit: float = DEFAULT_RATE_LIMIT + services_file: Optional[str] = None + domains: List[str] = field(default_factory=list) + dict_len: int = 0 + + +class TakeoverScanner: + """Main scanner class for subdomain takeover detection.""" + + def __init__(self, config: ScanConfig): + self.config = config + self.output: List[Tuple[str, str, str]] = [] + + +def percent(x: int, y: int) -> float: + """Calculate percentage. + + Args: + x: Current value + y: Total value + + Returns: + Percentage as float + """ return (float(x) / float(y)) * 100 if y != 0 else 0 +def load_services(services_file: Optional[str] = None) -> Dict[str, Dict[str, Any]]: + """Load service signatures from JSON file. + + Args: + services_file: Path to custom services JSON file + + Returns: + Dictionary of service signatures + """ + # Try custom file first + if services_file and os.path.exists(services_file): + try: + with open(services_file, 'r') as f: + data = json.load(f) + return data.get('services', {}) + except Exception as e: + warn(f"Failed to load custom services file: {e}") + + # Try default services.json in same directory as script + script_dir = Path(__file__).parent + default_file = script_dir / 'services.json' + + if default_file.exists(): + try: + with open(default_file, 'r') as f: + data = json.load(f) + return data.get('services', {}) + except Exception as e: + warn(f"Failed to load default services file: {e}") + + # Fallback to hardcoded services + return get_default_services() + + +def get_default_services() -> Dict[str, Dict[str, Any]]: + """Get default hardcoded service signatures as fallback. + + Returns: + Dictionary of service signatures + """ + return { + "AWS/S3": {"error": r"The specified bucket does not exist"}, + "BitBucket": {"error": r"Repository not found"}, + "Github": {"error": r"There isn\\'t a Github Pages site here\\.|a Github Pages site here"}, + "Shopify": {"error": r"Sorry\\, this (shop|store) is currently unavailable\\."}, + "Heroku": {"error": r"no-such-app.html|no such app|No such app"}, + "Netlify": {"error": r"Not Found - Request ID:|Page Not Found|404 - Page not found"}, + "Vercel": {"error": r"The deployment could not be found|404: NOT_FOUND"}, + } + + +def check_dns_resolution(domain: str, verbose: bool = False) -> Optional[Tuple[bool, List[str]]]: + """Check if domain resolves and get CNAME records. + + Args: + domain: Domain to check + verbose: Verbose output + + Returns: + Tuple of (has_cname, cname_list) or None if DNS check fails + """ + if not DNS_AVAILABLE: + return None + + try: + # Remove protocol if present + clean_domain = domain.replace('http://', '').replace('https://', '').split('/')[0] + + resolver = dns.resolver.Resolver() + resolver.timeout = 5 + resolver.lifetime = 5 + + # Try to get CNAME records + try: + cname_answers = resolver.resolve(clean_domain, 'CNAME') + cnames = [str(rdata.target).rstrip('.') for rdata in cname_answers] + return (True, cnames) + except dns.resolver.NoAnswer: + # No CNAME, but domain might still resolve via A record + try: + resolver.resolve(clean_domain, 'A') + return (False, []) + except: + return (False, []) + except dns.resolver.NXDOMAIN: + # Domain doesn't exist + if verbose: + info(f"Domain does not exist: {clean_domain}") + return (False, []) + except Exception: + return None + except Exception as e: + if verbose: + warn(f"DNS check failed for {domain}: {str(e)}") + return None + + +def check_cname_match(cnames: List[str], service_cnames: List[str]) -> bool: + """Check if any CNAME matches service CNAME patterns. + + Args: + cnames: List of CNAME records from DNS + service_cnames: List of expected CNAME patterns for service + + Returns: + True if match found + """ + for cname in cnames: + for service_cname in service_cnames: + if service_cname in cname: + return True + return False + + +# Global services dictionary - will be loaded from JSON or defaults services = { "AWS/S3": {"error": r"The specified bucket does not exist"}, "BitBucket": {"error": r"Repository not found"}, @@ -154,70 +312,138 @@ def PERCENT(x, y): } -def plus(string): +def plus(string: str) -> None: + """Print success message.""" print("{0}[ + ]{1} {2}".format(g, e, string)) -def warn(string, exit=not 1): +def warn(string: str, exit: bool = False) -> None: + """Print warning message and optionally exit. + + Args: + string: Warning message + exit: Whether to exit after warning + """ print("{0}[ ! ]{1} {2}".format(r, e, string)) if exit: - sys.exit() + sys.exit(1) -def info(string): +def info(string: str) -> None: + """Print info message.""" print("{0}[ i ]{1} {2}".format(y, e, string)) -def _info(): +def _info() -> str: + """Return formatted info prefix.""" return "{0}[ i ]{1} ".format(y, e) -def err(string): +def err(string: str) -> None: + """Print error regex pattern.""" print(r" |= [REGEX]: {0}{1}{2}".format(y_, string, e)) -def request(domain, proxy, timeout, user_agent): +def validate_domain(domain: str) -> bool: + """Validate domain format. + + Args: + domain: Domain to validate + + Returns: + True if valid, False otherwise + """ + if not domain or not isinstance(domain, str): + return False + + # Remove protocol if present + domain_part = domain.replace('http://', '').replace('https://', '').split('/')[0] + + # Basic domain validation regex + domain_pattern = re.compile( + r'^(?:[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]{2,}$' + ) + + return bool(domain_pattern.match(domain_part)) or domain_part == 'localhost' + + +def request(domain: str, proxy: Optional[str], timeout: Optional[int], + user_agent: str, verify_ssl: bool = False) -> Optional[Tuple[int, bytes]]: + """Make HTTP request to domain. + + Args: + domain: Target domain + proxy: Proxy URL if any + timeout: Request timeout in seconds + user_agent: User agent string + verify_ssl: Whether to verify SSL certificates + + Returns: + Tuple of (status_code, content) or None on failure + """ url = checkurl(domain) - timeout = timeout - proxies = {"http": proxy, "https": proxy} - redirect = True + proxies = {"http": proxy, "https": proxy} if proxy else None headers = {"User-Agent": user_agent} + try: - req = requests.packages.urllib3.disable_warnings( - urllib3.exceptions.InsecureRequestWarning - ) + # Disable SSL warnings if verification is disabled + if not verify_ssl: + urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + req = requests.get( url=url, headers=headers, - verify=False, - allow_redirects=redirect, - timeout=int(timeout) if timeout is not None else None, + verify=verify_ssl, + allow_redirects=True, + timeout=timeout if timeout is not None else DEFAULT_TIMEOUT, proxies=proxies, ) return req.status_code, req.content - except Exception: - pass - # if k_.get("d_list"): - # print("") - # warn("Failed to establish a new connection for: %s" % (domain), 1) - # else: - # warn("Failed to establish a new connection for: %s" % (domain), 1) - - -def find(status, content, ok): + except requests.exceptions.Timeout: + if verify_ssl: + warn(f"Timeout connecting to: {domain}") + return None + except requests.exceptions.ConnectionError: + if verify_ssl: + warn(f"Connection error for: {domain}") + return None + except requests.exceptions.RequestException as e: + if verify_ssl: + warn(f"Request failed for {domain}: {str(e)}") + return None + except Exception as e: + warn(f"Unexpected error for {domain}: {str(e)}") + return None + + +def find(status: int, content: bytes, ok: bool) -> Optional[Tuple[str, str]]: + """Find matching service based on response. + + Args: + status: HTTP status code + content: Response content + ok: Whether to process 200 status codes + + Returns: + Tuple of (service_name, error_pattern) or None + """ + min_status = HTTP_OK if ok else HTTP_CREATED + content_str = str(content) + for service in services: for values in services[service].items(): if ( - re.findall(str(values[1]), str(content), re.I) - and int(status) in range(201 if ok is False else 200, 599) - and "nginx" not in str(content) - and "openresty" - not in str(content) # avoid false positives (Cargo mainly) + re.findall(str(values[1]), content_str, re.I) + and min_status <= int(status) <= HTTP_MAX_ERROR + and "nginx" not in content_str + and "openresty" not in content_str # avoid false positives (Cargo mainly) ): return str(service), str(values[1]) + return None -def banner(): +def banner() -> None: + """Print application banner.""" print("\n /~\\") print(" C oo ---------------") print(" _( ^) |T|A|K|E|O|V|E|R|") @@ -229,42 +455,93 @@ def banner(): print() -def help(_exit_=False): +def help(_exit_: bool = False) -> None: + """Print help message. + + Args: + _exit_: Whether to exit after printing help + """ banner() print("Usage: %s [OPTION]\n" % sys.argv[0]) + print("Basic Options:") print("\t-d\tSet domain URL (e.g: www.test.com)") - print("\t-t\tSet threads, default 1") print("\t-l\tScan multiple targets in a text file") + print("\t-o\tOutput file (supports .txt, .json, .csv, .html)") + print("\t-v\tVerbose, print more info\n") + print("Advanced Options:") + print("\t-t\tSet threads (default: 1)") + print("\t-T\tSet request timeout in seconds (default: 20)") print("\t-p\tUse a proxy to connect the target URL") - print("\t-o\tUse this settings for save a file, args=json or text") - print("\t-T\tSet a request timeout,default value is 20 seconds") - print("\t-k\tProcess 200 http code, cause more false positive") print("\t-u\tSet custom user agent (e.g: takeover-bot)") - print("\t-v\tVerbose, print more info\n") + print("\t-r\tRate limit in requests/second (default: 0 = no limit)") + print("\t-S\tCustom services JSON file path\n") + print("Security & Validation:") + print("\t-s\tVerify SSL certificates (disabled by default)") + print("\t-n\tDisable DNS resolution checking") + print("\t-k\tProcess 200 HTTP code (may cause false positives)\n") + print("Examples:") + print("\tpython takeover.py -d example.com -v") + print("\tpython takeover.py -l domains.txt -o results.html -v -s") + print("\tpython takeover.py -d example.com -r 5 -t 10 -o output.csv\n") if _exit_: sys.exit() -def checkpath(path): +def checkpath(path: str) -> str: + """Check if path exists and is valid. + + Args: + path: File path to check + + Returns: + The path if valid + + Raises: + SystemExit: If path is invalid + """ if os.path.exists(path): + if os.path.isdir(path): + warn('"%s" is directory!' % path, True) return path - elif os.path.isdir(path): - warn('"%s" is directory!', 1) - elif os.path.exists(path) is False: - warn('"%s" not exists!' % path, 1) else: - warn('Error in: "%s"' % path, 1) - - -def readfile(path): + warn('"%s" not exists!' % path, True) + return path + + +def readfile(path: str) -> List[str]: + """Read domains from file. + + Args: + path: Path to file containing domains + + Returns: + List of domain strings + """ info('Read wordlist... "%s"' % path) - return [x.strip() for x in open(checkpath(path), "r")] - - -def checkurl(url): + validated_path = checkpath(path) + with open(validated_path, "r") as f: + domains = [x.strip() for x in f if x.strip()] + + # Validate domains + invalid_domains = [d for d in domains if not validate_domain(d)] + if invalid_domains: + warn(f"Warning: {len(invalid_domains)} invalid domains found and will be skipped") + + return [d for d in domains if validate_domain(d)] + + +def checkurl(url: str) -> str: + """Normalize and validate URL. + + Args: + url: URL to check + + Returns: + Normalized URL with scheme + """ o = urllib.parse.urlsplit(url) if o.scheme not in ["http", "https", ""]: - warn('Scheme "%s" not supported!' % o.scheme, 1) + warn('Scheme "%s" not supported!' % o.scheme, True) if o.netloc == "": return "http://" + o.path elif o.netloc: @@ -273,172 +550,465 @@ def checkurl(url): return "http://" + o.netloc -def print_(string): +def print_(string: str) -> None: + """Print string with terminal control codes for updating same line. + + Args: + string: String to print + """ sys.stdout.write("\033[1K") sys.stdout.write("\033[0G") sys.stdout.write(string) sys.stdout.flush() -def runner(k): - threadpool = thread.ThreadPoolExecutor(max_workers=k.get("threads")) - if k.get("verbose"): - info("Set %s threads.." % k.get("threads")) - futures = ( - threadpool.submit( +def runner(config: ScanConfig, output_list: List[Tuple[str, str, str]]) -> None: + """Run the scanner with thread pool. + + Args: + config: Scanner configuration + output_list: List to store results + """ + threadpool = thread.ThreadPoolExecutor(max_workers=config.threads) + if config.verbose: + info("Set %s threads.." % config.threads) + if not config.verify_ssl: + warn("SSL verification is disabled. Use -s flag to enable.") + if config.check_dns and not DNS_AVAILABLE: + warn("DNS checking disabled: dnspython not installed") + if config.rate_limit > 0: + info(f"Rate limiting enabled: {config.rate_limit} requests/second") + + # Rate limiting setup + request_interval = 1.0 / config.rate_limit if config.rate_limit > 0 else 0 + last_request_time = [0.0] # Use list to allow modification in closure + + def rate_limited_submit(domain): + """Submit with rate limiting.""" + if request_interval > 0: + current_time = time.time() + time_since_last = current_time - last_request_time[0] + if time_since_last < request_interval: + time.sleep(request_interval - time_since_last) + last_request_time[0] = time.time() + + return threadpool.submit( requester, domain, - k.get("proxy"), - k.get("timeout"), - k.get("user_agent"), - k.get("output"), - k.get("process"), - k.get("verbose"), + config.proxy, + config.timeout, + config.user_agent, + config.output, + config.process, + config.verbose, + config.verify_ssl, + config.check_dns, + output_list, + config.d_list, ) - for domain in k.get("domains") - ) + + futures = [rate_limited_submit(domain) for domain in config.domains] + for i, _ in enumerate(thread.as_completed(futures)): - if k.get("verbose") and k.get("d_list"): + if config.verbose and config.d_list: str_ = "{i}{b:.2f}% Domain: {d}".format( i=_info(), - b=PERCENT(int(i), int(k.get("dict_len"))), - d=k.get("domains")[i], + b=percent(int(i), int(config.dict_len)), + d=config.domains[i], ) print_(str_) - # else: - # info("Domain: {}".format(k.get("domains")[i])) - pass -def requester(domain, proxy, timeout, user_agent, output, ok, v): - code, html = request(domain, proxy, timeout, user_agent) - service, error = find(code, html, ok) - if service and error: +def requester(domain: str, proxy: Optional[str], timeout: Optional[int], + user_agent: str, output: Optional[str], ok: bool, verbose: bool, + verify_ssl: bool, check_dns: bool, output_list: List[Tuple[str, str, str]], + d_list: Optional[str]) -> None: + """Make request and check for takeover vulnerability. + + Args: + domain: Target domain + proxy: Proxy URL + timeout: Request timeout + user_agent: User agent string + output: Output file path + ok: Process 200 status codes + verbose: Verbose output + verify_ssl: Verify SSL certificates + check_dns: Check DNS resolution + output_list: List to append results + d_list: Domain list file path + """ + # Validate domain before processing + if not validate_domain(domain): + if verbose: + warn(f"Invalid domain format: {domain}") + return + + # Check DNS resolution if enabled + dns_result = None + if check_dns and DNS_AVAILABLE: + dns_result = check_dns_resolution(domain, verbose) + if dns_result and not dns_result[0] and not dns_result[1]: + # Domain doesn't resolve and has no CNAME + if verbose: + info(f"Skipping {domain} - no DNS resolution") + return + + result = request(domain, proxy, timeout, user_agent, verify_ssl) + if result is None: + return + + code, html = result + match = find(code, html, ok) + + if match: + service, error = match + + # Additional CNAME validation if DNS checking is enabled + if check_dns and dns_result and dns_result[0]: + # Has CNAME, check if it matches the service + service_info = services.get(service, {}) + service_cnames = service_info.get('cname', []) + + if service_cnames and not check_cname_match(dns_result[1], service_cnames): + # CNAME doesn't match expected service, might be false positive + if verbose: + warn(f"CNAME mismatch for {domain} - expected {service_cnames}, got {dns_result[1]}") + return + if output: - _output.append((domain, service, error)) - if v and not k_.get("d_list"): + output_list.append((domain, service, error)) + if verbose and not d_list: plus( "%s service found! Potential domain takeover found! - %s" % (service, domain) ) - elif v and k_.get("d_list"): + elif verbose and d_list: print("") plus( "%s service found! Potential domain takeover found! - %s" % (service, domain) ) else: - if k_.get("d_list"): + if d_list: print("") plus( "%s service found! Potential domain takeover found! - %s" % (service, domain) ) - elif not k_.get("d_list"): + elif not d_list: plus( "%s service found! Potential domain takeover found! - %s" % (service, domain) ) - if v: + if verbose: err(error) -def savejson(path, content, v): - if v and not k_.get("d_list"): +def savejson(path: str, content: List[Tuple[str, str, str]], + verbose: bool, d_list: Optional[str]) -> None: + """Save results to JSON file. + + Args: + path: Output file path + content: List of (domain, service, error) tuples + verbose: Verbose output + d_list: Domain list file path + """ + if verbose and not d_list: info("Writing file..") - elif v and k_.get("d_list"): + elif verbose and d_list: print("") info("Writing file..") - a = {} - b = {"domains": {}} - for i in content: - a.update({i[0]: {"service": i[1], "error": i[2]}}) - b["domains"] = a + + domains_dict = {} + for domain, service, error in content: + domains_dict[domain] = {"service": service, "error": error} + + output_data = {"domains": domains_dict} + with open(path, "w+") as outjsonfile: - json.dump(b, outjsonfile, indent=4) - outjsonfile.close() + json.dump(output_data, outjsonfile, indent=4) + info("Saved at " + path + "..") -def savetxt(path, content, v): - if v and not k_.get("d_list"): +def savetxt(path: str, content: List[Tuple[str, str, str]], + verbose: bool, d_list: Optional[str]) -> None: + """Save results to text file. + + Args: + path: Output file path + content: List of (domain, service, error) tuples + verbose: Verbose output + d_list: Domain list file path + """ + if verbose and not d_list: info("Writing file..") - elif v and k_.get("d_list"): + elif verbose and d_list: print("") info("Writing file..") + br = "-" * 40 bf = "=" * 40 - out = "" + br + "\n" - for i in content: - out += "Domain\t: %s\n" % i[0] - out += "Service\t: %s\n" % i[1] - out += "Error\t: %s\n" % i[2] - out += "" + bf + "\n" - out += "" + br + "\n" + out = br + "\n" + + for domain, service, error in content: + out += "Domain\t: %s\n" % domain + out += "Service\t: %s\n" % service + out += "Error\t: %s\n" % error + out += bf + "\n" + + out += br + "\n" + with open(path, "w+") as outtxtfile: outtxtfile.write(out) - outtxtfile.close() + + info("Saved at " + path + "..") + + +def savecsv(path: str, content: List[Tuple[str, str, str]], + verbose: bool, d_list: Optional[str]) -> None: + """Save results to CSV file. + + Args: + path: Output file path + content: List of (domain, service, error) tuples + verbose: Verbose output + d_list: Domain list file path + """ + if verbose and not d_list: + info("Writing file..") + elif verbose and d_list: + print("") + info("Writing file..") + + with open(path, "w+", newline='', encoding='utf-8') as csvfile: + writer = csv.writer(csvfile) + writer.writerow(['Domain', 'Service', 'Error Pattern']) + for domain, service, error in content: + writer.writerow([domain, service, error]) + info("Saved at " + path + "..") -def main(): - # -- +def savehtml(path: str, content: List[Tuple[str, str, str]], + verbose: bool, d_list: Optional[str]) -> None: + """Save results to HTML file. + + Args: + path: Output file path + content: List of (domain, service, error) tuples + verbose: Verbose output + d_list: Domain list file path + """ + if verbose and not d_list: + info("Writing file..") + elif verbose and d_list: + print("") + info("Writing file..") + + html_template = """ + + + + + Takeover Scan Results + + + +

🔍 Subdomain Takeover Scan Results

+
+

Total Vulnerabilities Found: {count}

+

Scan Date: {timestamp}

+
+ + + + + + + + + + +{rows} + +
#DomainServiceError Pattern
+ +""" + + rows = "" + for i, (domain, service, error) in enumerate(content, 1): + rows += f""" + {i} + {domain} + {service} + {error[:100]}... + +""" + + from datetime import datetime + timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + + html_content = html_template.format( + count=len(content), + timestamp=timestamp, + rows=rows + ) + + with open(path, "w+", encoding='utf-8') as htmlfile: + htmlfile.write(html_content) + + info("Saved at " + path + "..") + + +def main() -> None: + """Main entry point for the application.""" + global services + if len(sys.argv) < 2: - help(1) + help(True) + + # Initialize configuration + config = ScanConfig() + try: opts, _ = getopt.getopt( sys.argv[1:], - "d:l:p:o:t:T::u:kv", - ["d=", "l=", "p=", "v", "o=", "t=", "T=", "u=", "k"], + "d:l:p:o:t:T::u:r:S:kvsnD", + ["d=", "l=", "p=", "v", "o=", "t=", "T=", "u=", "r=", "S=", "k", "s", "n", "D"], ) except Exception as e: - warn(e, 1) + warn(str(e), True) + for o, a in opts: if o == "-d": - k_["domain"] = a - if o == "-t": - k_["threads"] = int(a) - if o == "-l": - k_["d_list"] = a - if o == "-p": - k_["proxy"] = a - if o == "-o": - k_["output"] = a - if o == "-T": - k_["timeout"] = int(a) - if o == "-k": - k_["process"] = True - if o == "-u": - k_["user_agent"] = a - if o == "-v": - k_["verbose"] = True - - if k_.get("domain") or k_.get("d_list"): + config.domain = a + elif o == "-t": + config.threads = int(a) + elif o == "-l": + config.d_list = a + elif o == "-p": + config.proxy = a + elif o == "-o": + config.output = a + elif o == "-T": + config.timeout = int(a) + elif o == "-k": + config.process = True + elif o == "-u": + config.user_agent = a + elif o == "-v": + config.verbose = True + elif o == "-s": + config.verify_ssl = True + elif o == "-r": + config.rate_limit = float(a) + elif o == "-S": + config.services_file = a + elif o == "-n": + config.check_dns = False + elif o == "-D": + # Debug: show DNS info only + if config.verbose: + info(f"DNS library available: {DNS_AVAILABLE}") + + if config.domain or config.d_list: banner() - domains = [] - if k_.get("verbose"): + + # Load services from JSON or defaults + services = load_services(config.services_file) + if config.verbose: + info(f"Loaded {len(services)} service signatures") + + domains: List[str] = [] + + if config.verbose: info("Starting..") - if k_.get("d_list"): - domains.extend(readfile(k_.get("d_list"))) + if config.d_list: + domains.extend(readfile(config.d_list)) else: - domains.append(k_.get("domain")) - k_["domains"] = domains - k_["dict_len"] = len(domains) - runner(k_) - if k_.get("output"): - if ".txt" in k_.get("output"): - savetxt(k_.get("output"), _output, k_.get("verbose")) - elif ".json" in k_.get("output"): - savejson(k_.get("output"), _output, k_.get("verbose")) + if config.domain: + if not validate_domain(config.domain): + warn(f"Invalid domain format: {config.domain}", True) + domains.append(config.domain) + + config.domains = domains + config.dict_len = len(domains) + + # Create output list + output_list: List[Tuple[str, str, str]] = [] + + runner(config, output_list) + + if config.output: + if ".txt" in config.output: + savetxt(config.output, output_list, config.verbose, config.d_list) + elif ".json" in config.output: + savejson(config.output, output_list, config.verbose, config.d_list) + elif ".csv" in config.output: + savecsv(config.output, output_list, config.verbose, config.d_list) + elif ".html" in config.output or ".htm" in config.output: + savehtml(config.output, output_list, config.verbose, config.d_list) else: + extension = config.output.split(".")[-1] if "." in config.output else "unknown" warn( - "Output Error: %s extension not supported, only .txt or .json" - % k_.get("output").split(".")[1], - 1, + "Output Error: %s extension not supported, only .txt, .json, .csv, or .html" % extension, + True, ) - elif k_.get("domain") is None and k_.get("d_list") is None: - help(1) + else: + help(True) if __name__ == "__main__": diff --git a/test_takeover.py b/test_takeover.py new file mode 100644 index 0000000..f1f8bbd --- /dev/null +++ b/test_takeover.py @@ -0,0 +1,297 @@ +""" +Unit tests for takeover subdomain takeover scanner. +Run with: pytest test_takeover.py -v +""" + +import pytest +import os +import json +import tempfile +from pathlib import Path +from takeover import ( + validate_domain, + percent, + checkurl, + load_services, + get_default_services, + check_cname_match, + ScanConfig, +) + + +class TestDomainValidation: + """Tests for domain validation functionality.""" + + def test_valid_domain(self): + """Test valid domain formats.""" + assert validate_domain("example.com") == True + assert validate_domain("subdomain.example.com") == True + assert validate_domain("test-domain.example.co.uk") == True + assert validate_domain("localhost") == True + + def test_invalid_domain(self): + """Test invalid domain formats.""" + assert validate_domain("") == False + assert validate_domain(None) == False + assert validate_domain("invalid domain with spaces") == False + assert validate_domain("http://") == False + assert validate_domain("-.example.com") == False + + def test_domain_with_protocol(self): + """Test domains with protocol prefixes.""" + assert validate_domain("http://example.com") == True + assert validate_domain("https://example.com") == True + + +class TestURLChecking: + """Tests for URL normalization.""" + + def test_url_with_http(self): + """Test URL with http protocol.""" + assert checkurl("http://example.com") == "http://example.com" + + def test_url_with_https(self): + """Test URL with https protocol.""" + assert checkurl("https://example.com") == "https://example.com" + + def test_url_without_protocol(self): + """Test URL without protocol.""" + assert checkurl("example.com") == "http://example.com" + + def test_url_with_path(self): + """Test URL with path component.""" + result = checkurl("example.com/path") + assert result.startswith("http://") + + +class TestPercentCalculation: + """Tests for percentage calculation.""" + + def test_normal_percentage(self): + """Test normal percentage calculation.""" + assert percent(50, 100) == 50.0 + assert percent(1, 4) == 25.0 + assert percent(3, 4) == 75.0 + + def test_zero_total(self): + """Test percentage with zero total.""" + assert percent(10, 0) == 0 + + def test_zero_current(self): + """Test percentage with zero current.""" + assert percent(0, 100) == 0.0 + + +class TestServiceLoading: + """Tests for service signature loading.""" + + def test_get_default_services(self): + """Test getting default services.""" + services = get_default_services() + assert isinstance(services, dict) + assert len(services) > 0 + assert "AWS/S3" in services + assert "Github" in services + assert "Netlify" in services + + def test_load_services_from_json(self): + """Test loading services from JSON file.""" + # Create temporary JSON file + test_services = { + "services": { + "TestService": { + "error": "test error pattern", + "cname": ["test.example.com"] + } + } + } + + with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f: + json.dump(test_services, f) + temp_file = f.name + + try: + services = load_services(temp_file) + assert "TestService" in services + assert services["TestService"]["error"] == "test error pattern" + finally: + os.unlink(temp_file) + + def test_load_services_fallback(self): + """Test fallback to default services when file doesn't exist.""" + services = load_services("nonexistent_file.json") + assert isinstance(services, dict) + assert len(services) > 0 + + +class TestCNAMEMatching: + """Tests for CNAME matching functionality.""" + + def test_cname_match_found(self): + """Test when CNAME matches.""" + cnames = ["example.herokuapp.com", "other.com"] + service_cnames = ["herokuapp.com"] + assert check_cname_match(cnames, service_cnames) == True + + def test_cname_match_not_found(self): + """Test when CNAME doesn't match.""" + cnames = ["example.github.io"] + service_cnames = ["herokuapp.com", "netlify.app"] + assert check_cname_match(cnames, service_cnames) == False + + def test_cname_empty_lists(self): + """Test with empty lists.""" + assert check_cname_match([], []) == False + assert check_cname_match(["example.com"], []) == False + assert check_cname_match([], ["example.com"]) == False + + +class TestScanConfig: + """Tests for ScanConfig dataclass.""" + + def test_default_config(self): + """Test default configuration values.""" + config = ScanConfig() + assert config.threads == 1 + assert config.verbose == False + assert config.verify_ssl == False + assert config.check_dns == True + assert config.rate_limit == 0 + assert config.domains == [] + + def test_custom_config(self): + """Test custom configuration values.""" + config = ScanConfig( + domain="example.com", + threads=10, + verbose=True, + rate_limit=5.0 + ) + assert config.domain == "example.com" + assert config.threads == 10 + assert config.verbose == True + assert config.rate_limit == 5.0 + + +class TestFileOperations: + """Tests for file reading operations.""" + + def test_read_domain_list(self): + """Test reading domains from file.""" + from takeover import readfile + + # Create temporary file with domains + domains = ["example.com", "test.com", "subdomain.example.org"] + with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f: + for domain in domains: + f.write(domain + "\n") + temp_file = f.name + + try: + result = readfile(temp_file) + assert len(result) == 3 + assert "example.com" in result + assert "test.com" in result + finally: + os.unlink(temp_file) + + def test_read_domain_list_with_invalid(self): + """Test reading domains with some invalid entries.""" + from takeover import readfile + + # Create file with mix of valid and invalid domains + with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f: + f.write("example.com\n") + f.write("invalid domain with spaces\n") + f.write("test.com\n") + temp_file = f.name + + try: + result = readfile(temp_file) + # Should only include valid domains + assert "example.com" in result + assert "test.com" in result + assert "invalid domain with spaces" not in result + finally: + os.unlink(temp_file) + + +class TestOutputFormats: + """Tests for output file generation.""" + + def test_json_output(self): + """Test JSON output generation.""" + from takeover import savejson + + test_data = [ + ("example.com", "Heroku", "no-such-app"), + ("test.com", "Github", "404 not found") + ] + + with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f: + temp_file = f.name + + try: + savejson(temp_file, test_data, False, None) + + with open(temp_file, 'r') as f: + data = json.load(f) + + assert "domains" in data + assert "example.com" in data["domains"] + assert data["domains"]["example.com"]["service"] == "Heroku" + finally: + os.unlink(temp_file) + + def test_csv_output(self): + """Test CSV output generation.""" + from takeover import savecsv + import csv + + test_data = [ + ("example.com", "Heroku", "no-such-app"), + ("test.com", "Github", "404 not found") + ] + + with tempfile.NamedTemporaryFile(mode='w', suffix='.csv', delete=False) as f: + temp_file = f.name + + try: + savecsv(temp_file, test_data, False, None) + + with open(temp_file, 'r', newline='') as f: + reader = csv.reader(f) + rows = list(reader) + + assert len(rows) == 3 # Header + 2 data rows + assert rows[0] == ['Domain', 'Service', 'Error Pattern'] + assert rows[1][0] == 'example.com' + finally: + os.unlink(temp_file) + + def test_html_output(self): + """Test HTML output generation.""" + from takeover import savehtml + + test_data = [ + ("example.com", "Heroku", "no-such-app"), + ] + + with tempfile.NamedTemporaryFile(mode='w', suffix='.html', delete=False) as f: + temp_file = f.name + + try: + savehtml(temp_file, test_data, False, None) + + with open(temp_file, 'r', encoding='utf-8') as f: + content = f.read() + + assert "" in content + assert "example.com" in content + assert "Heroku" in content + finally: + os.unlink(temp_file) + + +if __name__ == "__main__": + pytest.main([__file__, "-v"])