From 2649f4f3140bad98b80c8da2b41aca03753570d2 Mon Sep 17 00:00:00 2001 From: angel Date: Mon, 15 Jun 2026 19:52:15 -0700 Subject: [PATCH 1/2] security: remove API key from proxy URL, use SecretStr for credentials (#27) - Change proxy_api_key and ai_api_key to pydantic SecretStr in AppSettings so model_dump()/repr() never expose raw key values. - Refactor _build_proxy_url() to return (base_url, auth_tuple) instead of embedding credentials in the URL string. - Use httpx.Proxy(url, auth=...) so credentials are never in exception messages, debug logs, or tracebacks. - Update all consumers (agent_engine, ai_analyst, CLI, doctor) to use .get_secret_value() where the raw string is needed. - Add 5 new SecretStr redaction tests verifying model_dump, repr, and URL string safety. Closes #27 --- src/adapters/ai_analyst.py | 2 +- src/adapters/http_client.py | 34 ++++++---- src/cli/doctor.py | 3 +- src/cli/main.py | 6 +- src/core/config.py | 6 +- src/core/services/agent_engine.py | 2 +- tests/test_http_client.py | 108 ++++++++++++++++++++++-------- 7 files changed, 111 insertions(+), 50 deletions(-) diff --git a/src/adapters/ai_analyst.py b/src/adapters/ai_analyst.py index 121f241..cbc5507 100644 --- a/src/adapters/ai_analyst.py +++ b/src/adapters/ai_analyst.py @@ -710,7 +710,7 @@ async def analyze_person( clean_person.profiles = [p for p in clean_person.profiles if p.exists] - api_key = (settings.ai_api_key or "").strip() + api_key = (settings.ai_api_key.get_secret_value() if settings.ai_api_key else "").strip() if not api_key: # Sin API key: si es un provider local OpenAI-compatible, usamos dummy. # En providers hosted (DeepSeek/Groq/etc.) caemos a heurístico. diff --git a/src/adapters/http_client.py b/src/adapters/http_client.py index b8a01ca..5485fe3 100644 --- a/src/adapters/http_client.py +++ b/src/adapters/http_client.py @@ -34,27 +34,29 @@ _API_PROXY_ENDPOINT = "proxy.scrapingant.com:8080" -def _build_proxy_url(settings: AppSettings) -> str | None: +def _build_proxy_url( + settings: AppSettings, +) -> tuple[str, tuple[str, str]] | tuple[None, None]: """Build a ScrapingAnt proxy URL from settings. Supports two modes: 1. **Standalone proxies** (when ``proxy_username`` is set): - ``http://customer-USER-country-CC:KEY@residential.scrapingant.com:8080`` + ``http://residential.scrapingant.com:8080`` + auth tuple 2. **API Proxy Mode** (fallback, when only ``proxy_api_key`` is set): - ``http://scrapingant&browser=false&proxy_type=MODE:KEY@proxy.scrapingant.com:8080`` + ``http://proxy.scrapingant.com:8080`` + auth tuple - Returns an ``http://user:pass@host:port`` string suitable for - ``httpx.AsyncClient(proxy=...)``, or ``None`` when proxy is disabled. + Returns ``(base_url, (username, password))`` — credentials are **never** + embedded in the URL string — or ``(None, None)`` when proxy is disabled. """ mode = settings.effective_proxy_mode if not mode or not settings.proxy_api_key: - return None + return None, None if mode not in ("residential", "datacenter"): - return None + return None, None - password = settings.proxy_api_key + password = settings.proxy_api_key.get_secret_value() if settings.proxy_username: # ── Standalone residential/datacenter proxy product ── @@ -70,7 +72,7 @@ def _build_proxy_url(settings: AppSettings) -> str | None: username += f"&proxy_country={settings.proxy_country}" endpoint = _API_PROXY_ENDPOINT - return f"http://{username}:{password}@{endpoint}" + return f"http://{endpoint}", (username, password) # --------------------------------------------------------------------------- @@ -87,6 +89,10 @@ def build_async_client( Por qué un builder: - Centraliza timeouts/headers para que todas las fuentes se comporten igual. - Inyecta proxy ScrapingAnt de forma transparente si está configurado. + + Security: proxy credentials are passed via ``httpx.Proxy(auth=...)`` + instead of being embedded in the URL, so they never leak into exception + messages, debug logs, or tracebacks. """ settings = settings or AppSettings() @@ -97,14 +103,18 @@ def build_async_client( if extra_headers: headers.update(extra_headers) - proxy_url = _build_proxy_url(settings) + proxy_base, proxy_auth = _build_proxy_url(settings) + + proxy = None + if proxy_base and proxy_auth: + proxy = httpx.Proxy(proxy_base, auth=proxy_auth) return httpx.AsyncClient( timeout=httpx.Timeout(settings.http_timeout_seconds), follow_redirects=True, headers=headers, - proxy=proxy_url, - verify=proxy_url is None, # Proxy handles TLS termination. + proxy=proxy, + verify=proxy is None, # Proxy handles TLS termination. ) diff --git a/src/cli/doctor.py b/src/cli/doctor.py index e856a8d..7149f17 100644 --- a/src/cli/doctor.py +++ b/src/cli/doctor.py @@ -79,7 +79,8 @@ def run() -> None: # Proxy (ScrapingAnt) proxy_mode = settings.effective_proxy_mode if proxy_mode: - key_masked = "****" + (settings.proxy_api_key or "")[-4:] if settings.proxy_api_key else "(none)" + raw_key = settings.proxy_api_key.get_secret_value() if settings.proxy_api_key else "" + key_masked = "****" + raw_key[-4:] if raw_key else "(none)" country = settings.proxy_country.upper() if settings.proxy_country else "auto" table.add_row("Proxy mode", "OK", f"{proxy_mode} (ScrapingAnt)") table.add_row("Proxy API key", "OK", key_masked) diff --git a/src/cli/main.py b/src/cli/main.py index 1630232..5e76874 100644 --- a/src/cli/main.py +++ b/src/cli/main.py @@ -149,7 +149,7 @@ def _configure_ai_for_run( base_url = preset["base_url"] model = preset["model"] - key = (ai_key or "").strip() or (settings.ai_api_key or "").strip() + key = (ai_key or "").strip() or (settings.ai_api_key.get_secret_value() if settings.ai_api_key else "").strip() # Para proveedores locales (Ollama), la key es opcional/dummy. if provider == "ollama" and not key: @@ -1526,7 +1526,7 @@ def wizard() -> None: # Ensure AI is configured. settings_now = AppSettings() - if not (settings_now.ai_api_key or "").strip(): + if not (settings_now.ai_api_key.get_secret_value() if settings_now.ai_api_key else "").strip(): console.print( "[yellow]Agent mode requires an AI provider.[/yellow]" ) @@ -1660,7 +1660,7 @@ def wizard() -> None: deep_analyze = Confirm.ask("Run AI analysis?", default=True) if deep_analyze: settings_now = AppSettings() - if not (settings_now.ai_api_key or "").strip() and settings_now.ai_base_url.startswith("https://api.deepseek"): + if not (settings_now.ai_api_key.get_secret_value() if settings_now.ai_api_key else "").strip() and settings_now.ai_base_url.startswith("https://api.deepseek"): if Confirm.ask("No AI key configured. Configure a free-tier provider now (recommended)?", default=True): provider = Prompt.ask( "Provider", diff --git a/src/core/config.py b/src/core/config.py index 840f364..5d72f00 100644 --- a/src/core/config.py +++ b/src/core/config.py @@ -13,7 +13,7 @@ import sys from pathlib import Path -from pydantic import Field +from pydantic import Field, SecretStr from pydantic_settings import BaseSettings, SettingsConfigDict from core.domain.language import Language @@ -107,7 +107,7 @@ class AppSettings(BaseSettings): description="User-Agent para peticiones OSINT.", ) - ai_api_key: str | None = Field( + ai_api_key: SecretStr | None = Field( default=None, description="API key para el proveedor IA (DeepSeek compatible OpenAI).", ) @@ -196,7 +196,7 @@ class AppSettings(BaseSettings): "Auto-detected from proxy_api_key if not set explicitly." ), ) - proxy_api_key: str | None = Field( + proxy_api_key: SecretStr | None = Field( default=None, description="ScrapingAnt API/proxy key (password for proxy auth).", ) diff --git a/src/core/services/agent_engine.py b/src/core/services/agent_engine.py index 5ae0c78..92e4237 100644 --- a/src/core/services/agent_engine.py +++ b/src/core/services/agent_engine.py @@ -131,7 +131,7 @@ async def run( ) client = AsyncOpenAI( - api_key=self._settings.ai_api_key, + api_key=self._settings.ai_api_key.get_secret_value(), base_url=self._settings.ai_base_url, ) diff --git a/tests/test_http_client.py b/tests/test_http_client.py index f720fd4..4c9888c 100644 --- a/tests/test_http_client.py +++ b/tests/test_http_client.py @@ -17,11 +17,11 @@ def test_residential_with_username(self): proxy_username="angeldOzt2u", proxy_mode="residential", ) - url = _build_proxy_url(settings) - assert url == ( - "http://customer-angeldOzt2u:my-password" - "@residential.scrapingant.com:8080" - ) + base_url, auth = _build_proxy_url(settings) + assert base_url == "http://residential.scrapingant.com:8080" + assert auth == ("customer-angeldOzt2u", "my-password") + # Key must NOT appear in the URL + assert "my-password" not in base_url def test_datacenter_with_username(self): settings = AppSettings( @@ -29,11 +29,10 @@ def test_datacenter_with_username(self): proxy_username="angeldOzt2u", proxy_mode="datacenter", ) - url = _build_proxy_url(settings) - assert url == ( - "http://customer-angeldOzt2u:my-password" - "@datacenter.scrapingant.com:8080" - ) + base_url, auth = _build_proxy_url(settings) + assert base_url == "http://datacenter.scrapingant.com:8080" + assert auth == ("customer-angeldOzt2u", "my-password") + assert "my-password" not in base_url def test_residential_with_country(self): settings = AppSettings( @@ -42,11 +41,10 @@ def test_residential_with_country(self): proxy_mode="residential", proxy_country="us", ) - url = _build_proxy_url(settings) - assert url == ( - "http://customer-myuser-country-us:key123" - "@residential.scrapingant.com:8080" - ) + base_url, auth = _build_proxy_url(settings) + assert base_url == "http://residential.scrapingant.com:8080" + assert auth == ("customer-myuser-country-us", "key123") + assert "key123" not in base_url # --------------------------------------------------------------------------- @@ -60,11 +58,15 @@ def test_api_mode_residential(self): proxy_username=None, proxy_mode="residential", ) - url = _build_proxy_url(settings) - assert url == ( - "http://scrapingant&browser=false&proxy_type=residential" - ":my-api-key@proxy.scrapingant.com:8080" - ) + base_url, auth = _build_proxy_url(settings) + assert base_url == "http://proxy.scrapingant.com:8080" + assert auth is not None + username, password = auth + assert "scrapingant" in username + assert "proxy_type=residential" in username + assert password == "my-api-key" + # Key must NOT appear in the URL + assert "my-api-key" not in base_url def test_api_mode_with_country(self): settings = AppSettings( @@ -73,9 +75,11 @@ def test_api_mode_with_country(self): proxy_mode="residential", proxy_country="de", ) - url = _build_proxy_url(settings) - assert "proxy_country=de" in url - assert "proxy.scrapingant.com" in url + base_url, auth = _build_proxy_url(settings) + assert "proxy.scrapingant.com" in base_url + username, _ = auth + assert "proxy_country=de" in username + assert "key" not in base_url # --------------------------------------------------------------------------- @@ -85,24 +89,31 @@ def test_api_mode_with_country(self): class TestBuildProxyUrlEdgeCases: def test_no_proxy_when_no_key(self): settings = AppSettings(proxy_api_key=None) - assert _build_proxy_url(settings) is None + base_url, auth = _build_proxy_url(settings) + assert base_url is None + assert auth is None def test_auto_detect_mode_from_key(self): settings = AppSettings(proxy_api_key="test-key", proxy_mode=None) - url = _build_proxy_url(settings) - assert url is not None - assert "residential" in url + base_url, auth = _build_proxy_url(settings) + assert base_url is not None + assert "residential" in base_url + assert auth is not None def test_unknown_mode_returns_none(self): settings = AppSettings( proxy_api_key="key", proxy_mode="unknown_mode", ) - assert _build_proxy_url(settings) is None + base_url, auth = _build_proxy_url(settings) + assert base_url is None + assert auth is None def test_empty_key_returns_none(self): settings = AppSettings(proxy_api_key="", proxy_mode="residential") - assert _build_proxy_url(settings) is None + base_url, auth = _build_proxy_url(settings) + assert base_url is None + assert auth is None # --------------------------------------------------------------------------- @@ -142,3 +153,42 @@ def test_explicit_mode_overrides(self): def test_no_key_no_mode(self): settings = AppSettings(proxy_api_key=None, proxy_mode=None) assert settings.effective_proxy_mode is None + + +# --------------------------------------------------------------------------- +# SecretStr redaction (issue #27) +# --------------------------------------------------------------------------- + +class TestSecretStrRedaction: + def test_model_dump_does_not_expose_proxy_key(self): + settings = AppSettings(proxy_api_key="super-secret-key-12345") + dumped = settings.model_dump() + # SecretStr serializes as '**********' in model_dump + assert dumped["proxy_api_key"] != "super-secret-key-12345" + assert "super-secret" not in str(dumped["proxy_api_key"]) + + def test_model_dump_does_not_expose_ai_key(self): + settings = AppSettings(ai_api_key="sk-my-secret-ai-key") + dumped = settings.model_dump() + assert dumped["ai_api_key"] != "sk-my-secret-ai-key" + assert "sk-my-secret" not in str(dumped["ai_api_key"]) + + def test_proxy_url_does_not_contain_key(self): + settings = AppSettings( + proxy_api_key="LIVE_SECRET_KEY", + proxy_username="user1", + proxy_mode="residential", + ) + base_url, auth = _build_proxy_url(settings) + assert "LIVE_SECRET_KEY" not in base_url + assert auth[1] == "LIVE_SECRET_KEY" + + def test_secret_value_accessible_via_getter(self): + settings = AppSettings(proxy_api_key="my-key-value") + assert settings.proxy_api_key.get_secret_value() == "my-key-value" + + def test_repr_does_not_expose_key(self): + settings = AppSettings(proxy_api_key="secret123", ai_api_key="sk-secret") + repr_str = repr(settings) + assert "secret123" not in repr_str + assert "sk-secret" not in repr_str From fa1600fc26d74c799f5e793d2ca466a134d871cc Mon Sep 17 00:00:00 2001 From: angel Date: Mon, 15 Jun 2026 19:55:22 -0700 Subject: [PATCH 2/2] fix: correct auto-detect test assertion for API proxy endpoint Without proxy_username, the endpoint is proxy.scrapingant.com (not residential.scrapingant.com). Check 'residential' in auth username instead of URL. --- tests/test_http_client.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/test_http_client.py b/tests/test_http_client.py index 4c9888c..6f78f1e 100644 --- a/tests/test_http_client.py +++ b/tests/test_http_client.py @@ -97,8 +97,9 @@ def test_auto_detect_mode_from_key(self): settings = AppSettings(proxy_api_key="test-key", proxy_mode=None) base_url, auth = _build_proxy_url(settings) assert base_url is not None - assert "residential" in base_url assert auth is not None + # Auto-detected mode is "residential" — visible in the auth username + assert "residential" in auth[0] def test_unknown_mode_returns_none(self): settings = AppSettings(