From fd7f88537c091700ecc5e53ddc79268524078faa Mon Sep 17 00:00:00 2001
From: Krittin Phornsiricharoenphant
Date: Tue, 7 Apr 2026 02:01:28 +0200
Subject: [PATCH 1/4] Deprecate legacy scrapers and remove their old structure
---
.../scrapers/integrations/__init__.py | 0
.../scrapers/integrations/git_scraper.py | 353 -------------
.../scrapers/integrations/sso_scraper.py | 466 ------------------
.../collectors/scrapers/scraped_resource.py | 11 -
.../collectors/scrapers/scraper.py | 314 ------------
.../collectors/scrapers/scraper_manager.py | 366 --------------
6 files changed, 1510 deletions(-)
delete mode 100644 src/data_manager/collectors/scrapers/integrations/__init__.py
delete mode 100644 src/data_manager/collectors/scrapers/integrations/git_scraper.py
delete mode 100644 src/data_manager/collectors/scrapers/integrations/sso_scraper.py
delete mode 100644 src/data_manager/collectors/scrapers/scraper.py
delete mode 100644 src/data_manager/collectors/scrapers/scraper_manager.py
diff --git a/src/data_manager/collectors/scrapers/integrations/__init__.py b/src/data_manager/collectors/scrapers/integrations/__init__.py
deleted file mode 100644
index e69de29bb..000000000
diff --git a/src/data_manager/collectors/scrapers/integrations/git_scraper.py b/src/data_manager/collectors/scrapers/integrations/git_scraper.py
deleted file mode 100644
index 7d73fd37a..000000000
--- a/src/data_manager/collectors/scrapers/integrations/git_scraper.py
+++ /dev/null
@@ -1,353 +0,0 @@
-import os
-import re
-import shutil
-from pathlib import Path
-from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple
-
-from git import Repo
-from mkdocs.utils.yaml import yaml_load
-
-from src.utils.config_access import get_global_config
-from src.data_manager.collectors.scrapers.scraped_resource import ScrapedResource
-from src.utils.env import read_secret
-from src.utils.logging import get_logger
-
-logger = get_logger(__name__)
-
-if TYPE_CHECKING:
- from src.data_manager.collectors.scrapers.scraper_manager import \
- ScraperManager
-
-global_config = get_global_config()
-
-class GitScraper:
- """Scraper integration that clones Git repositories and indexes MkDocs sites and code files."""
-
- def __init__(self, manager: "ScraperManager", git_config: Optional[Dict[str, Any]] = None) -> None:
- self.manager = manager
- self.config = git_config or {}
-
- # where we clone our repos to
- self.data_path = global_config["DATA_PATH"]
- self.git_dir = Path(self.data_path) / "raw_git_repos"
- self.git_dir.mkdir(parents=True, exist_ok=True)
-
- self.code_suffixes = {
- suffix.lower()
- for suffix in (
- self.config.get(
- "code_suffixes",
- [
- ".py",
- ".js",
- ".ts",
- ".tsx",
- ".jsx",
- ".java",
- ".go",
- ".rs",
- ".c",
- ".cpp",
- ".h",
- ".hpp",
- ".sh",
- ".sql",
- ".json",
- ".yaml",
- ".yml",
- ".toml",
- ".md",
- ".txt",
- ],
- )
- or []
- )
- }
- self.exclude_dirs = {
- dir_name
- for dir_name in (
- self.config.get(
- "exclude_dirs",
- [
- ".git",
- "node_modules",
- ".venv",
- "venv",
- "__pycache__",
- ".idea",
- ".vscode",
- "dist",
- "build",
- ],
- )
- or []
- )
- }
- self.max_file_size_bytes = int(self.config.get("max_file_size_bytes", 1_000_000))
-
- self.git_username = read_secret("GIT_USERNAME")
- self.git_token = read_secret("GIT_TOKEN")
- self._credentials_available = bool(self.git_username and self.git_token)
- if not self._credentials_available:
- logger.info("No git credentials supplied; will attempt public repo cloning.")
-
- def collect(self, git_urls: List[str]) -> List[ScrapedResource]:
- if not git_urls:
- logger.warning("No git URLs provided for scraping; skipping git scraper.")
- return []
-
- harvested: List[ScrapedResource] = []
-
- for url in git_urls:
- try:
- repo_info = self._prepare_repository(url)
- except ValueError as exc:
- logger.info(f"{exc}")
- continue
- except Exception as exc:
- logger.error(f"Failed to clone {url}: {exc}")
- continue
-
- try:
- harvested.extend(self._harvest_repository(repo_info))
- finally:
- shutil.rmtree(repo_info["repo_path"], ignore_errors=True)
-
- if harvested:
- logger.info("Git scraping was completed successfully")
-
- return harvested
-
- def _prepare_repository(self, url: str) -> Dict[str, Any]:
- url_dict = self._parse_url(url)
- repo_path = self._clone_repo(url_dict)
- mkdocs_site_url = self._read_mkdocs_site_url(repo_path)
- ref = self._determine_ref(repo_path, url_dict["branch"])
- web_base_url = self._compute_web_base_url(url_dict["original_url"])
-
- return {
- "repo_path": repo_path,
- "repo_name": url_dict["repo_name"],
- "mkdocs_site_url": mkdocs_site_url,
- "ref": ref,
- "web_base_url": web_base_url,
- }
-
- def _harvest_repository(self, repo_info: Dict[str, Any]) -> List[ScrapedResource]:
- resources: List[ScrapedResource] = []
- resources.extend(self._harvest_mkdocs(repo_info))
- resources.extend(self._harvest_code(repo_info))
- return resources
-
- def _harvest_mkdocs(self, repo_info: Dict[str, Any]) -> List[ScrapedResource]:
- repo_path = repo_info["repo_path"]
- mkdocs_site_url = repo_info["mkdocs_site_url"]
- base_url = repo_info["web_base_url"]
- ref = repo_info["ref"]
- docs_dir = repo_path / "docs"
- if not docs_dir.exists():
- logger.info(f"Skipping MkDocs harvesting for {repo_path}; missing docs directory")
- return []
-
- resources: List[ScrapedResource] = []
- parent_repo = repo_info["repo_name"]
- used_blob_links = False
- for markdown_path in docs_dir.rglob("*.md"):
- if mkdocs_site_url:
- current_url = mkdocs_site_url + markdown_path.relative_to(docs_dir).with_suffix("").as_posix()
- else:
- current_url = self._build_blob_url(base_url, ref, markdown_path.relative_to(repo_path))
- used_blob_links = True
- logger.info(f"Indexing Git doc: {current_url}")
- text_content = markdown_path.read_text(encoding="utf-8")
- relative_path = Path(parent_repo) / markdown_path.relative_to(repo_path)
- resource = ScrapedResource(
- url=current_url,
- content=text_content,
- suffix=markdown_path.suffix.lstrip(".") or "txt",
- source_type="git",
- metadata={
- "repo_path": str(markdown_path.relative_to(repo_path)),
- "title": markdown_path.stem.replace("_", " ").replace("-", " ").title(),
- "parent": parent_repo,
- },
- file_name=markdown_path.name,
- relative_path=str(relative_path),
- )
- if resource.content:
- resources.append(resource)
- else:
- logger.info(f"Resource {current_url} is empty. Skipping...")
-
- if used_blob_links and not mkdocs_site_url:
- logger.info(f"Used repository blob URLs for MkDocs content in {repo_path} (site_url missing)")
-
- return resources
-
- def _harvest_code(self, repo_info: Dict[str, Any]) -> List[ScrapedResource]:
- repo_path = repo_info["repo_path"]
- ref = repo_info["ref"]
- base_url = repo_info["web_base_url"]
- repo_name = repo_info["repo_name"]
-
- resources: List[ScrapedResource] = []
- for file_path in self._iter_code_files(repo_path):
- logger.debug(file_path)
- rel_path = file_path.relative_to(repo_path)
-
-            # avoid overlap with _harvest_mkdocs
- if rel_path.parts and rel_path.parts[0] == "docs" and file_path.suffix.lower() == ".md":
- continue
-
- try:
- if file_path.stat().st_size > self.max_file_size_bytes:
- logger.warning(f"Skipping {file_path} due to file size")
- continue
- except OSError:
- continue
-
- if not self._is_allowed_suffix(file_path):
- logger.warning(f"Skipping {file_path} due to disallowed suffix")
- continue
-
- if self._looks_binary(file_path):
- logger.warning(f"Skipping {file_path} due to likely binary content")
- continue
-
- try:
- text_content = file_path.read_text(encoding="utf-8", errors="ignore")
- except Exception:
- continue
-
- if not text_content.strip():
- continue
-
- resource_url = self._build_blob_url(base_url, ref, rel_path)
- relative_path = Path(repo_name) / rel_path
- resource = ScrapedResource(
- url=resource_url,
- content=text_content,
- suffix=file_path.suffix.lstrip("."),
- source_type="git",
- metadata={
- "repo_path": str(rel_path),
- "parent": repo_name,
- "ref": ref,
- },
- file_name=file_path.name,
- relative_path=str(relative_path),
- )
- resources.append(resource)
-
- return resources
-
- def _parse_url(self, url: str) -> dict:
- branch_name = None
-
- regex_repo_name = r"(?:github|gitlab)\.[\w.]+\/[^\/]+\/([\w.-]+)(?:\.git|\/|$)"
- match = re.search(regex_repo_name, url, re.IGNORECASE)
- if not match:
- raise ValueError(f"The git url {url} does not match the expected format.")
-
- repo_name = match.group(1)
-
- # Only inject credentials if available (for private repos)
- if self._credentials_available:
- if "gitlab" in url:
- clone_from_url = url.replace("gitlab", f"{self.git_username}:{self.git_token}@gitlab")
- elif "github" in url:
- clone_from_url = url.replace("github", f"{self.git_username}:{self.git_token}@github")
- else:
- # For other hosts, try without credentials
- clone_from_url = url
- else:
- # No credentials - use URL as-is (for public repos)
- clone_from_url = url
-
- branch_split = re.split(r"/(?:-/)?tree/", clone_from_url, maxsplit=1)
- if len(branch_split) > 1:
- branch_name = branch_split[1].strip("/") or None
- clone_from_url = branch_split[0].rstrip("/")
-
- return {
- "original_url": url,
- "clone_url": clone_from_url,
- "repo_name": repo_name,
- "branch": branch_name,
- }
-
- def _clone_repo(self, url_dict: dict) -> Path:
- clone_url = url_dict["clone_url"]
- branch = url_dict["branch"]
- repo_name = url_dict["repo_name"]
-
- logger.info(f"Cloning repository {repo_name}...")
-
- repo_path = self.git_dir / repo_name
- if branch is None:
- Repo.clone_from(clone_url, repo_path)
- else:
- Repo.clone_from(clone_url, repo_path, branch=branch)
-
- return repo_path
-
- def _read_mkdocs_site_url(self, repo_path: Path) -> Optional[str]:
- mkdocs_file = repo_path / "mkdocs.yml"
- if not mkdocs_file.exists():
- return None
- try:
- with mkdocs_file.open("r") as file:
- data = yaml_load(file)
- site_url = data.get("site_url")
- if not site_url:
- return None
- return site_url if site_url.endswith("/") else site_url + "/"
- except Exception:
- logger.info(f"Could not read mkdocs.yml in {repo_path}")
- return None
-
- def _compute_web_base_url(self, original_url: str) -> str:
- sanitized = re.sub(r"//[^@/]+@", "//", original_url)
- sanitized = re.split(r"/(?:-/)?tree/", sanitized, maxsplit=1)[0]
- if sanitized.endswith(".git"):
- sanitized = sanitized[:-4]
- return sanitized.rstrip("/")
-
- def _determine_ref(self, repo_path: Path, requested_branch: Optional[str]) -> str:
- if requested_branch:
- return requested_branch
- repo: Optional[Repo] = None
- try:
- repo = Repo(repo_path)
- return repo.active_branch.name
- except Exception:
- try:
- repo = repo or Repo(repo_path)
- return repo.head.commit.hexsha[:7]
- except Exception:
- return "main"
-
- def _iter_code_files(self, repo_path: Path):
- for root, dirs, files in os.walk(repo_path):
- dirs[:] = [d for d in dirs if d not in self.exclude_dirs]
- for filename in files:
- file_path = Path(root) / filename
- yield file_path
-
- def _is_allowed_suffix(self, file_path: Path) -> bool:
- return file_path.suffix.lower() in self.code_suffixes
-
- def _looks_binary(self, file_path: Path) -> bool:
- try:
- with file_path.open("rb") as file:
- sample = file.read(8000)
- return b"\0" in sample
- except Exception:
- return True
-
- def _build_blob_url(self, base_url: str, ref: str, rel_path: Path) -> str:
- base = base_url.rstrip("/")
- rel = rel_path.as_posix()
- if "gitlab" in base:
- return f"{base}/-/blob/{ref}/{rel}"
- return f"{base}/blob/{ref}/{rel}"
diff --git a/src/data_manager/collectors/scrapers/integrations/sso_scraper.py b/src/data_manager/collectors/scrapers/integrations/sso_scraper.py
deleted file mode 100644
index d03877bfb..000000000
--- a/src/data_manager/collectors/scrapers/integrations/sso_scraper.py
+++ /dev/null
@@ -1,466 +0,0 @@
-import hashlib
-import importlib
-import json
-import os
-import re
-import time
-import urllib.parse
-from abc import ABC, abstractmethod
-from typing import Dict, List, Tuple
-
-from selenium import webdriver
-from selenium.webdriver.common.by import By
-from selenium.webdriver.firefox.options import Options as FirefoxOptions
-from selenium.webdriver.support import expected_conditions as EC
-from selenium.webdriver.support.ui import WebDriverWait
-from selenium.common.exceptions import TimeoutException
-
-from src.data_manager.collectors.scrapers.scraped_resource import \
- ScrapedResource, BrowserIntermediaryResult
-from src.utils.env import read_secret
-from src.utils.logging import get_logger
-
-logger = get_logger(__name__)
-
-class SSOScraper(ABC):
- """Generic base class for SSO-authenticated web scrapers."""
-
- def __init__(self, username=None, password=None, headless=True, site_type="generic", max_depth=2, selenium_url=None):
- """Initialize the SSO scraper with credentials and browser settings.
-
- Args:
- username (str, optional): SSO username. If None, will try to get from env vars.
- password (str, optional): SSO password. If None, will try to get from env vars.
- headless (bool): Whether to run the browser in headless mode.
- site_type (str): Type of site to scrape ('generic' or 'mkdocs')
-            max_depth (int): Maximum number of link levels to crawl from the start URL.
- """
- self.username = username or self.get_username_from_env()
- self.password = password or self.get_password_from_env()
- self.headless = headless
- self.max_depth = max_depth
- self.site_type = site_type
- self.driver = None
- self.visited_urls = set()
- self.selenium_url = selenium_url
-
- if self.username:
- logger.info(f"Using username: {self.username}")
-
- def _is_image_url(self, url: str) -> bool:
- """Check if URL points to an image file."""
- image_extensions = ('.png', '.jpg', '.jpeg', '.gif', '.bmp', '.svg', '.ico', '.webp')
- parsed_url = urllib.parse.urlparse(url)
- path = parsed_url.path.lower()
- return any(path.endswith(ext) for ext in image_extensions)
-
- @abstractmethod
- def get_username_from_env(self):
- """Get username from environment variables. Override in subclasses."""
- pass
-
- @abstractmethod
- def get_password_from_env(self):
- """Get password from environment variables. Override in subclasses."""
- pass
-
- @abstractmethod
- def login(self):
- """Login to SSO with the provided credentials. Override in subclasses."""
- pass
-
- def setup_driver(self):
- """Configure and initialize the Firefox WebDriver."""
- firefox_options = FirefoxOptions()
- if self.headless:
- firefox_options.add_argument("--headless")
-
- # Additional options for better performance in containers
- firefox_options.add_argument("--no-sandbox")
- firefox_options.add_argument("--disable-dev-shm-usage")
- firefox_options.add_argument("--disable-gpu")
- firefox_options.add_argument("--window-size=1920,1080")
-
- # Create Firefox profile with preferences
- firefox_profile = webdriver.FirefoxProfile()
- firefox_profile.set_preference("dom.disable_open_during_load", False)
- firefox_profile.set_preference("browser.download.folderList", 2)
- firefox_profile.set_preference("browser.download.manager.showWhenStarting", False)
- firefox_profile.set_preference("browser.helperApps.neverAsk.saveToDisk", "application/pdf")
-
- # Initialize the driver with options
- if self.selenium_url:
-            self.driver = webdriver.Remote(command_executor=self.selenium_url, options=firefox_options)
- else:
- self.driver = webdriver.Firefox(options=firefox_options)
- self.driver.set_page_load_timeout(30)
- logger.info(f"Starting Firefox browser in {'headless' if self.headless else 'visible'} mode...")
- return self.driver
-
- def navigate_to(self, url, wait_time=1):
- """Navigate to specified URL and wait for page to load."""
- if not self.driver:
- raise RuntimeError("WebDriver not initialized. Call setup_driver() first.")
-
- self.driver.get(url)
- time.sleep(wait_time) # Enable wait time for page loading
- logger.info(f"Navigated to {url}")
- logger.info(f"Page title: {self.driver.title}")
- return self.driver.title
-
- def get_links_with_same_hostname(self, base_url):
- """Extract all links from the current page that have the same hostname as base_url."""
- base_hostname = urllib.parse.urlparse(base_url).netloc
- links = []
-
- # Find all anchor tags
- if self.site_type == "mkdocs":
- # For MkDocs, prioritize navigation links
- anchors = self.driver.find_elements(By.CSS_SELECTOR, ".md-nav__link, .md-content a")
- else:
- anchors = self.driver.find_elements(By.TAG_NAME, "a")
-
- for anchor in anchors:
- try:
- href = anchor.get_attribute("href")
- if href and href.strip():
- parsed_url = urllib.parse.urlparse(href)
- # Check if the link has the same hostname and is not a fragment
- if parsed_url.netloc == base_hostname and parsed_url.scheme in ('http', 'https'):
- # Normalize the URL to prevent duplicates
- normalized_url = f"{parsed_url.scheme}://{parsed_url.netloc}{parsed_url.path}"
- if parsed_url.query:
- normalized_url += f"?{parsed_url.query}"
-
- # this works for CMS twiki but should be generalized
- normalized_url = normalized_url.split("?")[0]
- if 'bin/rdiff' in normalized_url or 'bin/edit' in normalized_url or 'bin/oops' in normalized_url or 'bin/attach' in normalized_url or 'bin/genpdf' in normalized_url or '/WebIndex' in normalized_url:
- continue
-
- if not self._clear_url(normalized_url):
- continue
-
- # Skip image files
- if self._is_image_url(normalized_url):
- logger.debug(f"Skipping image URL: {normalized_url}")
- continue
-
- links.append(normalized_url)
-
- except Exception as e:
- logger.error(f"Error extracting link: {e}")
-
- return list(set(links)) # Remove duplicates
-
- def extract_page_data(self, current_url):
- """Return the raw HTML payload for the current page."""
- if not self.driver:
- raise RuntimeError("WebDriver not initialized. Call setup_driver() first.")
-
- title = self.driver.title or ""
- content = self.driver.page_source or ""
-
- return {
- "url": current_url,
- "title": title,
- "content": content,
- "suffix": "html",
- }
-
- def crawl(self, start_url):
- """Crawl pages starting from the given URL, storing title and content of each page.
-
- Args:
- start_url (str): The URL to start crawling from
-
- Returns:
- List[Dict]: A list of dictionaries describing each visited page.
- """
- max_depth = self.max_depth
- depth = 0
-
- if not self.driver:
- self.setup_driver()
-
- # Reset crawling state
- self.visited_urls = set()
- self.page_data = []
- to_visit = [start_url]
- level_links = []
-
- # First authenticate through the start URL
- self.authenticate_and_navigate(start_url)
-
- base_hostname = urllib.parse.urlparse(start_url).netloc
- logger.info(f"Base hostname for crawling: {base_hostname}")
- logger.info(f"Site type: {self.site_type}")
-
- # History record
- pages_visited = 0
- self.visited_urls = set()
-
- while to_visit and depth < max_depth:
- current_url = to_visit.pop(0)
-
- # Skip if we've already visited this URL
- if current_url in self.visited_urls:
- continue
-
- # Skip image files
- if self._is_image_url(current_url):
- logger.debug(f"Skipping image URL: {current_url}")
- self.visited_urls.add(current_url)
- continue
-
- logger.info(f"Crawling page {depth + 1}/{max_depth}: {current_url}")
-
- try:
- # Navigate to the page
- self.navigate_to(current_url, wait_time=2)
-
- # Mark as visited
- self.visited_urls.add(current_url)
- pages_visited += 1
-
- # Extract and store page data
- page_data = self.extract_page_data(current_url)
- self.page_data.append(page_data)
- logger.info(f"Extracted data from {current_url} ({len(page_data['content'])} chars)")
-
- # Get links to follow
- new_links = self.get_links_with_same_hostname(current_url)
- logger.info(f"Found {len(new_links)} links on the page (nv: {pages_visited})")
-
- # Add new links to visit
- for link in new_links:
- if link not in self.visited_urls and link not in to_visit and link not in level_links:
- logger.info(f"Found new link: {link} (nv: {pages_visited})")
- level_links.append(link)
-
- # Scan next level if to_visit is empty
- if not to_visit:
- to_visit.extend(level_links)
- level_links = []
- depth += 1
-
- except Exception as e:
- logger.info(f"Error crawling {current_url}: {e}", exc_info=True)
- self.visited_urls.add(current_url) # Mark as visited to avoid retrying
-
- logger.info(f"Crawling complete. Visited {pages_visited} pages.")
- return list(self.page_data)
-
- def _clear_url(self, url: str) -> bool:
- """Basic filtering for duplicate or fragment-only URLs."""
- if not url:
- return False
-
- # Ignore pure fragments or JavaScript links
- if url.startswith("javascript:"):
- return False
-
- return True
-
- def close(self):
- """Close the browser and clean up resources."""
- if self.driver:
- logger.info("Closing browser...")
- self.driver.quit()
- self.driver = None
-
- def authenticate_and_navigate(self, url):
- """Complete authentication flow and navigate to target URL."""
-
- if not self.driver:
- self.setup_driver()
-
- try:
- # First navigate to trigger SSO
- self.driver.get(url)
-
- # Login
- if self.login():
- # Navigate back to target page
- title = self.navigate_to(url)
- return title
- else:
- return None
- except Exception as e:
- logger.warning(f"Error during authentication: {e}", exc_info=True)
- return None
-
- def authenticate(self, url):
- """Complete authentication flow and navigate to target URL."""
- try:
- if not self.driver:
- self.setup_driver()
-
- # First navigate to trigger SSO
- self.driver.get(url)
-
- # Login
- if self.login():
- # Navigate back to target page
- return self.driver.get_cookies()
- else:
- return None
- except Exception as e:
- logger.warning(f"Error during authentication: {e}", exc_info=True)
- return None
-
- def __enter__(self):
- """Context manager entry point."""
- self.setup_driver()
- return self
-
- def __exit__(self, exc_type, exc_val, exc_tb):
- """Context manager exit point."""
- self.close()
-
-
-class CERNSSOScraper(SSOScraper):
- """A scraper to handle CERN SSO authentication and page navigation."""
-
- def get_username_from_env(self):
- """Get CERN SSO username from environment variables."""
- return read_secret("SSO_USERNAME")
-
- def get_password_from_env(self):
- """Get CERN SSO password from environment variables."""
- return read_secret("SSO_PASSWORD")
-
- def login(self):
- """Login to CERN SSO with the provided credentials."""
- if not self.username or not self.password:
- raise ValueError("Missing credentials for CERN SSO")
-
- try:
- wait = WebDriverWait(self.driver, 20)
-
- # Wait for login form to appear
- username_input = wait.until(
- EC.presence_of_element_located((By.ID, "username"))
- )
- username_input.send_keys(self.username)
- # time.sleep(1) # Optional sleep to ensure the input is registered
-
- password_input = wait.until(EC.presence_of_element_located((By.ID, "password")))
- password_input.send_keys(self.password)
- # time.sleep(1) # Optional sleep to ensure the input is registered
-
- sign_in = wait.until(EC.presence_of_element_located((By.ID, "kc-login")))
- sign_in.click()
-
- logger.info("Login credentials submitted")
- return True
- except TimeoutException as e:
- logger.error(f"Could not find username or password fields in due time: {e}", exc_info=True)
- except Exception as e:
- logger.error(f"Error during login: {e}",exc_info=True)
- return False
-
-
-class SSOCollector:
- """Collects resources behind SSO-protected URLs using configured scrapers."""
-
- def __init__(self, selenium_config: Dict[str, Dict]) -> None:
- self._config = selenium_config or {}
- self._enabled = self._config.get("enabled", False)
- self._class_name = self._config.get("selenium_class", "")
- self._class_map = self._config.get("selenium_class_map", {})
-
- def collect(self, url: str) -> List[ScrapedResource]:
- if not self._enabled:
- logger.error("SSO is disabled or not configured")
- return []
-
- scraper_class, scraper_kwargs = self._resolve_scraper()
- if scraper_class is None:
- return []
-
- try:
- with scraper_class(**scraper_kwargs) as scraper:
- payload = scraper.crawl(url)
- resources = self._extract_resources(scraper, payload)
- if not resources:
- logger.warning(f"No content extracted from SSO crawl for {url}")
- return resources
- except Exception as exc: # pragma: no cover - defensive catch
- logger.error(f"SSO scraping failed for {url}: {exc}")
- return []
-
- def _resolve_scraper(self):
- entry = self._class_map.get(self._class_name)
- if not entry:
- logger.error(f"SSO class {self._class_name} not configured")
- return None, {}
-
- scraper_class = entry.get("class")
- if isinstance(scraper_class, str):
- module_name = entry.get(
- "module",
- "src.data_manager.collectors.scrapers.integrations.sso_scraper",
- )
- module = importlib.import_module(module_name)
- scraper_class = getattr(module, scraper_class)
-
- scraper_kwargs = entry.get("kwargs", {})
- return scraper_class, scraper_kwargs
-
- def _extract_resources(self, scraper, payload) -> List[ScrapedResource]:
- resources: List[ScrapedResource] = []
-
- page_data = getattr(scraper, "page_data", None)
- if isinstance(page_data, list):
- for page in page_data:
- if not isinstance(page, dict):
- continue
- page_url = page.get("url")
- content = page.get("content")
- if not page_url or content is None:
- continue
-
- resources.append(
- ScrapedResource(
- url=page_url,
- content=content,
- suffix=page.get("suffix", "html"),
- source_type="sso",
- metadata={
- "title": page.get("title"),
- },
- )
- )
-
- elif isinstance(payload, list):
- for item in payload:
- if not isinstance(item, dict):
- continue
- page_url = item.get("url")
- content = item.get("content")
- if not page_url or content is None:
- continue
- resources.append(
- ScrapedResource(
- url=page_url,
- content=content,
- suffix=item.get("suffix", "html"),
- source_type="sso",
- metadata={
- "visible": str(self._visible).lower(),
- },
- )
- )
-
- elif isinstance(payload, dict):
- for page_url in payload.values():
- logger.warning(
- f"SSO scraper returned mapping without page content; skipping {page_url}"
- )
-
- elif payload is not None:
- logger.warning(
- f"Unsupported SSO payload type {type(payload).__name__}"
- )
-
- return resources
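For reference, a minimal sketch of the configuration shape that the SSOCollector removed above resolved its scraper from; all values are illustrative and the target URL is made up:

    from src.data_manager.collectors.scrapers.integrations.sso_scraper import SSOCollector

    selenium_config = {
        "enabled": True,
        "selenium_class": "CERNSSOScraper",
        "selenium_class_map": {
            "CERNSSOScraper": {
                "class": "CERNSSOScraper",  # dotted-name lookup; may also be the class object itself
                "module": "src.data_manager.collectors.scrapers.integrations.sso_scraper",
                "kwargs": {"headless": True, "max_depth": 2},
            },
        },
    }

    collector = SSOCollector(selenium_config)
    resources = collector.collect("https://twiki.cern.ch/SomeProtectedPage")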
diff --git a/src/data_manager/collectors/scrapers/scraped_resource.py b/src/data_manager/collectors/scrapers/scraped_resource.py
index 357eaaf41..080e4cbb7 100644
--- a/src/data_manager/collectors/scrapers/scraped_resource.py
+++ b/src/data_manager/collectors/scrapers/scraped_resource.py
@@ -74,14 +74,3 @@ def _safe_relative_path(self) -> Optional[Path]:
if rel_path.is_absolute() or ".." in rel_path.parts:
return None
return rel_path
-
-@dataclass
-class BrowserIntermediaryResult:
- """
-    This class provides a layer of abstraction for browser-based scrapers (e.g. Selenium).
-    It formats everything into a single class so that more complicated scraping results, which may hit
-    multiple tabs or pages at once, can be handled in a uniform way by the LinkScraper class.
- """
-
-    artifacts: List[Dict] # list of scraper results for each page produced by a Selenium navigation
- links: List[str] # links reached
diff --git a/src/data_manager/collectors/scrapers/scraper.py b/src/data_manager/collectors/scrapers/scraper.py
deleted file mode 100644
index 7fe1ef0e3..000000000
--- a/src/data_manager/collectors/scrapers/scraper.py
+++ /dev/null
@@ -1,314 +0,0 @@
-import requests
-import re
-
-from typing import Dict, Iterator, List, Optional
-from bs4 import BeautifulSoup
-from urllib.parse import urlparse, urljoin, urldefrag
-
-from src.data_manager.collectors.scrapers.scraped_resource import \
- ScrapedResource
-from src.utils.logging import get_logger
-
-logger = get_logger(__name__)
-
-class LinkScraper:
- """
-    Single scraper for all our link needs that handles both Selenium and plain requests.
-    This class handles HTTP requests directly, but if Selenium scraping is enabled for a link,
-    everything is passed through to the driver, including how the page data is collected and
-    how the next level of links is found. This class does NOT own the Selenium driver; that is
-    owned by the ScraperManager class.
- """
-
- def __init__(self, verify_urls: bool = True, enable_warnings: bool = True) -> None:
- self.verify_urls = verify_urls
- self.enable_warnings = enable_warnings
- # seen_urls tracks anything queued/visited; visited_urls tracks pages actually crawled.
- self.visited_urls = set()
- self.seen_urls = set()
-
- def _is_image_url(self, url: str) -> bool:
- """Check if URL points to an image file."""
- image_extensions = ('.png', '.jpg', '.jpeg', '.gif', '.bmp', '.svg', '.ico', '.webp')
- parsed_url = urlparse(url)
- path = parsed_url.path.lower()
- return any(path.endswith(ext) for ext in image_extensions)
-
- def reap(self, response, current_url: str, selenium_scrape: bool = False, authenticator = None):
- """
-        Handle whatever result was collected for current_url; this is probably the most complicated method here and the most likely to need a rewrite later.
-
-        For a Selenium scrape it expects the dict produced by extract_page_data; otherwise it handles
-        a normal HTTP response. It takes care of extracting the next set of links and building the
-        ScrapedResource objects for the page.
-
-        Args:
-            response (dict | requests.Response): whatever has been collected for current_url by the scraper
-            selenium_scrape (bool): whether or not Selenium was used to scrape this content
-            authenticator (SSOScraper | None): client being used to crawl websites or just for auth
-
-        Returns (tuple[list[str], list[ScrapedResource]]): next links to crawl and resources collected
- """
-
- # mark as visited
- self._mark_visited(current_url)
-
- source_type = "web" if (authenticator is None) else "sso"
-
- resources = []
-
-        if selenium_scrape: # deals with a selenium response (should work for both non-authenticated and authenticated sites in principle)
-            assert(authenticator is not None) ## this shouldn't be tripped
-
- # For selenium scraping, we expect a simple dict from extract_page_data
- # containing url, title, content, suffix
- content = response.get("content", "")
- title = response.get("title", "")
- suffix = response.get("suffix", "html")
-
- resource = ScrapedResource(
- url=current_url,
- content=content,
- suffix=suffix,
- source_type=source_type,
- metadata={
- "title": title,
- "content_type": "rendered_html",
- "renderer": "selenium",
- },
- )
- res = authenticator.get_links_with_same_hostname(current_url)
- resources.append(resource)
-
- else: # deals with http response
- content_type = response.headers.get("Content-type")
-
- if current_url.lower().endswith(".pdf"):
- resource = ScrapedResource(
- url=current_url,
- content=response.content,
- suffix="pdf",
- source_type=source_type,
- metadata={"content_type": content_type},
- )
- else:
- resource = ScrapedResource(
- url=current_url,
- content=response.text,
- suffix="html",
- source_type=source_type,
- metadata={
- "content_type": content_type,
- "encoding": response.encoding,
- },
- )
- res = self.get_links_with_same_hostname(current_url, resource)
- resources.append(resource)
-
- return res, resources # either collected via http or via authenticators method
-
-
- def crawl(
- self,
- start_url: str,
- browserclient = None,
- max_depth: int = 1,
- selenium_scrape: bool = False,
- max_pages: Optional[int] = None,
- ):
- """
-        Crawl pages from a given starting URL up to a given depth, using either plain HTTP or a provided browser client.
-
-        Args:
-            start_url (str): URL to start crawling from
-            browserclient (SSOScraper | None): client used to handle authentication for web resources
-            max_depth (int): max depth of links to descend from the start URL
-            selenium_scrape (bool): whether or not the page should be scraped through Selenium
- max_pages (int | None): cap on total pages to visit before stopping
-
- Returns: List[ScrapedResource]
-
- """
- # Consume the iterator so page_data is populated for callers of crawl().
- for _ in self.crawl_iter(
- start_url,
- browserclient=browserclient,
- max_depth=max_depth,
- selenium_scrape=selenium_scrape,
- max_pages=max_pages,
- collect_page_data=True,
- ):
- pass
- return list(self.page_data)
-
- def crawl_iter(
- self,
- start_url: str,
- browserclient = None,
- max_depth: int = 1,
- selenium_scrape: bool = False,
- max_pages: Optional[int] = None,
- collect_page_data: bool = False,
- ) -> Iterator[ScrapedResource]:
- """
-        Crawl pages from a given starting URL up to a given depth, using either plain HTTP or a provided browser client.
-
-        Args:
-            start_url (str): URL to start crawling from
-            browserclient (SSOScraper | None): client used to handle authentication for web resources
-            max_depth (int): max depth of links to descend from the start URL
-            selenium_scrape (bool): whether or not the page should be scraped through Selenium
- max_pages (int | None): cap on total pages to visit before stopping
- collect_page_data (bool): whether to store resources on the scraper instance
-
- Returns: Iterator[ScrapedResource]
-
- """
-
- if not self.enable_warnings:
- import urllib3
- urllib3.disable_warnings()
-
- depth = 0
- self.visited_urls = set()
- self.seen_urls = set()
- self.page_data = []
- normalized_start_url = self._normalize_url(start_url)
- if not normalized_start_url:
- logger.error(f"Failed to crawl: {start_url}, could not normalize URL")
- return
- to_visit = [normalized_start_url]
- self.seen_urls.add(normalized_start_url)
- level_links = []
- pages_visited = 0
-
- base_hostname = urlparse(normalized_start_url).netloc
- logger.info(f"Base hostname for crawling: {base_hostname}")
-
- # session either stays none or becomes a requests.Session object if not selenium scraping
- session = None
-
- if selenium_scrape: # scrape page with pure selenium
- if browserclient is None:
- logger.error(f"Failed to crawl: {start_url}, auth is needed but no browser clilent was passed through")
- return []
- browserclient.authenticate_and_navigate(normalized_start_url)
-
- elif not selenium_scrape and browserclient is not None: # use browser client for auth but scrape with http request
- session = requests.Session()
- cookies = browserclient.authenticate(normalized_start_url)
- if cookies is not None:
- for cookie_args in cookies:
- cookie = requests.cookies.create_cookie(name=cookie_args['name'],
- value=cookie_args['value'],
- domain=cookie_args.get('domain'),
- path=cookie_args.get('path', '/'),
- expires=cookie_args.get('expires'),
- secure=cookie_args.get('secure', False))
- session.cookies.set_cookie(cookie)
-
- else: # pure html no browser client needed
- session = requests.Session()
-
- while to_visit and depth < max_depth:
- if max_pages is not None and pages_visited >= max_pages:
- logger.info(f"Reached max_pages={max_pages}; stopping crawl early.")
- break
- current_url = to_visit.pop(0)
-
- # Skip if we've already visited this URL
- if current_url in self.visited_urls:
- continue
-
- # Skip image files
- if self._is_image_url(current_url):
- logger.debug(f"Skipping image URL: {current_url}")
- self._mark_visited(current_url)
- continue
-
- logger.info(f"Crawling depth {depth + 1}/{max_depth}: {current_url}")
-
- try:
-
- # grab the page content
- if not selenium_scrape:
- assert (session is not None) # REMOVELATER
- response = session.get(current_url, verify = self.verify_urls)
- response.raise_for_status()
- else:
- assert (browserclient is not None) # REMOVELATER
- browserclient.navigate_to(current_url, wait_time = 2)
- response = browserclient.extract_page_data(current_url) # see the BrowserIntermediaryResult class to see what comes back here
-
-
- # Mark as visited and store content
- pages_visited += 1
- new_links, resources = self.reap(response, current_url, selenium_scrape, browserclient)
- for resource in resources:
- if collect_page_data:
- self.page_data.append(resource)
- yield resource
-
- for link in new_links:
- normalized_link = self._normalize_url(link)
- if not normalized_link:
- continue
- if normalized_link in self.seen_urls:
- continue
- logger.info(f"Found new link: {normalized_link} (nv: {pages_visited})")
- self.seen_urls.add(normalized_link)
- level_links.append(normalized_link)
-
- except Exception as e:
- logger.info(f"Error crawling {current_url}: {e}")
- self._mark_visited(current_url) # Mark as visited to avoid retrying
-
- if not to_visit:
- to_visit.extend(level_links)
- level_links = []
- depth += 1
-
- logger.info(f"Crawling complete. Visited {pages_visited} pages.")
- return
-
- def _normalize_url(self, url: str) -> Optional[str]:
- if not url:
- return None
-
- normalized, _ = urldefrag(url)
- parsed = urlparse(normalized)
- if not parsed.scheme:
- return normalized
- return parsed._replace(
- scheme=parsed.scheme.lower(),
- netloc=parsed.netloc.lower(),
- ).geturl()
-
- def _mark_visited(self, url: str) -> None:
- normalized = self._normalize_url(url)
- if not normalized:
- return
- self.visited_urls.add(normalized)
- self.seen_urls.add(normalized)
-
- def get_links_with_same_hostname(self, url: str, page_data: ScrapedResource):
- """Return all links on the page that share the same hostname as `url`. For now does not support PDFs"""
-
- base_url = self._normalize_url(url) or url
- base_hostname = urlparse(base_url).netloc
- links = set()
- a_tags = []
-
- if (page_data.suffix == "html"):
- soup = BeautifulSoup(page_data.content, "html.parser")
- a_tags = soup.find_all("a", href=True)
-
- # how many links found on the first level
- for tag in a_tags:
- full = urljoin(base_url, tag["href"])
- normalized = self._normalize_url(full)
- if not normalized:
- continue
- if urlparse(normalized).netloc == base_hostname:
- links.add(normalized)
- return list(links)
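For reference, a minimal usage sketch of the LinkScraper removed above, assuming a plain-HTTP crawl with no browser client; the start URL and limits are illustrative:

    from src.data_manager.collectors.scrapers.scraper import LinkScraper

    scraper = LinkScraper(verify_urls=True, enable_warnings=False)

    # crawl_iter() streams resources as they are scraped instead of buffering them on the instance
    for resource in scraper.crawl_iter(
        "https://example.org/docs/",
        max_depth=2,
        max_pages=50,
    ):
        print(resource.url, resource.suffix)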
diff --git a/src/data_manager/collectors/scrapers/scraper_manager.py b/src/data_manager/collectors/scrapers/scraper_manager.py
deleted file mode 100644
index 1904f7f11..000000000
--- a/src/data_manager/collectors/scrapers/scraper_manager.py
+++ /dev/null
@@ -1,366 +0,0 @@
-import os
-import importlib
-from pathlib import Path
-from typing import TYPE_CHECKING, Any, Dict, List, Optional
-
-from src.data_manager.collectors.persistence import PersistenceService
-from src.data_manager.collectors.scrapers.scraped_resource import \
- ScrapedResource
-from src.data_manager.collectors.scrapers.scraper import LinkScraper
-from src.utils.config_access import get_global_config
-from src.utils.env import read_secret
-from src.utils.logging import get_logger
-
-logger = get_logger(__name__)
-
-if TYPE_CHECKING:
- from src.data_manager.collectors.scrapers.integrations.git_scraper import \
- GitScraper
-
-
-class ScraperManager:
- """Coordinates scraper integrations and centralises persistence logic."""
-
- def __init__(self, dm_config: Optional[Dict[str, Any]] = None) -> None:
- global_config = get_global_config()
-
- sources_config = (dm_config or {}).get("sources", {}) or {}
- links_config = sources_config.get("links", {}) if isinstance(sources_config, dict) else {}
- selenium_config = links_config.get("selenium_scraper", {}) if isinstance(sources_config, dict) else {}
-
- git_config = sources_config.get("git", {}) if isinstance(sources_config, dict) else {}
- sso_config = sources_config.get("sso", {}) if isinstance(sources_config, dict) else {}
- self.base_depth = links_config.get('base_source_depth', 5)
- logger.debug(f"Using base depth of {self.base_depth} for weblist URLs")
-
- scraper_config = {}
- if isinstance(links_config, dict):
- scraper_config = links_config.get("html_scraper", {}) or {}
- self.config = scraper_config
- raw_max_pages = links_config.get("max_pages")
- self.max_pages = None
- if raw_max_pages not in (None, ""):
- try:
- self.max_pages = int(raw_max_pages)
- except (TypeError, ValueError):
- logger.warning(f"Invalid max_pages value {raw_max_pages}; ignoring.")
-
- self.links_enabled = True
- self.git_enabled = git_config.get("enabled", False) if isinstance(git_config, dict) else True
- self.git_config = git_config if isinstance(git_config, dict) else {}
- self.selenium_config = selenium_config or {}
- self.selenium_enabled = self.selenium_config.get("enabled", False)
- self.scrape_with_selenium = self.selenium_config.get("use_for_scraping", False)
-
- self.sso_enabled = bool(sso_config.get("enabled", False))
-
- self.data_path = Path(global_config["DATA_PATH"])
- self.input_lists = links_config.get("input_lists", [])
- self.git_dir = self.data_path / "git"
-
- self.data_path.mkdir(parents=True, exist_ok=True)
-
- self.web_scraper = LinkScraper(
- verify_urls=self.config.get("verify_urls", False), # Default to False for broader compatibility
- enable_warnings=self.config.get("enable_warnings", False),
- )
- self._git_scraper: Optional["GitScraper"] = None
-
- def collect_all_from_config(
- self, persistence: PersistenceService
- ) -> None:
- """Run the configured scrapers and persist their output."""
- link_urls, git_urls, sso_urls = self._collect_urls_from_lists_by_type(self.input_lists)
-
- if git_urls:
- self.git_enabled = True
- if sso_urls:
- self.sso_enabled = True
- self._ensure_sso_defaults()
-
- self.collect_links(persistence, link_urls=link_urls)
- self.collect_sso(persistence, sso_urls=sso_urls)
- self.collect_git(persistence, git_urls=git_urls)
-
- logger.info("Web scraping was completed successfully")
-
- def collect_links(
- self,
- persistence: PersistenceService,
- link_urls: List[str] = [],
- max_depth: Optional[int] = None,
- ) -> int:
- """Collect only standard link sources. Returns count of resources scraped."""
- if not self.links_enabled:
- logger.info("Links disabled, skipping link scraping")
- return 0
- if not link_urls:
- return 0
- websites_dir = persistence.data_path / "websites"
- if not os.path.exists(websites_dir):
- os.makedirs(websites_dir, exist_ok=True)
- return self._collect_links_from_urls(link_urls, persistence, websites_dir, max_depth=max_depth)
-
- def collect_git(
- self,
- persistence: PersistenceService,
- git_urls: Optional[List[str]] = None,
- ) -> None:
- """Collect only git sources."""
- if not self.git_enabled:
- logger.info("Git disabled, skipping git scraping")
- return
- if not git_urls:
- return
- git_dir = persistence.data_path / "git"
- if not os.path.exists(git_dir):
- os.makedirs(git_dir, exist_ok=True)
- self._collect_git_resources(git_urls, persistence, git_dir)
-
- def collect_sso(
- self,
- persistence: PersistenceService,
- sso_urls: Optional[List[str]] = None,
- ) -> None:
- """Collect only SSO sources."""
- if not self.sso_enabled:
- logger.info("SSO disabled, skipping SSO scraping")
- return
- self._ensure_sso_defaults()
- if not sso_urls:
- return
- sso_dir = persistence.data_path / "sso"
- if not os.path.exists(sso_dir):
- os.makedirs(sso_dir, exist_ok=True)
- self._collect_sso_from_urls(sso_urls, persistence, sso_dir)
-
- def schedule_collect_links(self, persistence: PersistenceService, last_run: Optional[str] = None) -> None:
- """
- Scheduled collection of link sources.
-        For now this behaves the same as a full collection; the last_run timestamp is not yet used to filter sources.
- """
- metadata = persistence.catalog.get_metadata_by_filter("source_type", source_type="web", metadata_keys=["url"])
- catalog_urls = [m[1].get("url", "").strip() for m in metadata]
- catalog_urls = [u for u in catalog_urls if u]
- logger.info("Scheduled links collection found %d URL(s) in catalog", len(catalog_urls))
- self.collect_links(persistence, link_urls=catalog_urls)
-
- def schedule_collect_git(self, persistence: PersistenceService, last_run: Optional[str] = None) -> None:
- metadata = persistence.catalog.get_metadata_by_filter("source_type", source_type="git", metadata_keys=["url"])
- catalog_urls = [m[1].get("url", "") for m in metadata]
- self.collect_git(persistence, git_urls=catalog_urls)
-
- def schedule_collect_sso(self, persistence: PersistenceService, last_run: Optional[str] = None) -> None:
- metadata = persistence.catalog.get_metadata_by_filter("source_type", source_type="sso", metadata_keys=["url"])
- catalog_urls = [m[1].get("url", "") for m in metadata]
- self.collect_sso(persistence, sso_urls=catalog_urls)
-
- def _collect_links_from_urls(
- self,
- urls: List[str],
- persistence: PersistenceService,
- output_dir: Path,
- max_depth: Optional[int] = None,
- ) -> int:
- """Collect links from URLs and return total count of resources scraped."""
- # Initialize authenticator if selenium is enabled
- authenticator = None
- if self.selenium_enabled:
- authenticator_class, kwargs = self._resolve_scraper()
- if authenticator_class is not None:
- authenticator = authenticator_class(**kwargs)
-
- total_count = 0
- try:
- for url in urls:
- # For standard link collection, don't use selenium for scraping
- # (SSO urls are handled separately via collect_sso)
- count = self._handle_standard_url(
- url,
- persistence,
- output_dir,
- max_depth=max_depth if max_depth is not None else self.base_depth,
- client=None,
- use_client_for_scraping=False
- )
- total_count += count
- finally:
- if authenticator is not None:
- authenticator.close() # Close the authenticator properly and free the resources
- return total_count
-
- def _collect_sso_from_urls(
- self,
- urls: List[str],
- persistence: PersistenceService,
- output_dir: Path,
- ) -> None:
- """Collect SSO-protected URLs using selenium for authentication."""
- if not self.selenium_enabled:
- logger.error("SSO scraping requires data_manager.sources.links.selenium_scraper.enabled")
- return
- if not read_secret("SSO_USERNAME") or not read_secret("SSO_PASSWORD"):
- logger.error("SSO scraping requires SSO_USERNAME and SSO_PASSWORD secrets")
- return
- authenticator = None
- if self.selenium_enabled:
- authenticator_class, kwargs = self._resolve_scraper()
- if authenticator_class is not None:
- authenticator = authenticator_class(**kwargs)
-
- if authenticator is None:
- logger.error("SSO collection requires a valid selenium scraper configuration")
- return
-
- try:
- for url in urls:
- # For SSO URLs, use selenium client for authentication
- # scrape_with_selenium determines if we use selenium for scraping too
- self._handle_standard_url(
- url,
- persistence,
- output_dir,
- max_depth=self.base_depth,
- client=authenticator,
- use_client_for_scraping=self.scrape_with_selenium
- )
- finally:
- if authenticator is not None:
- authenticator.close()
-
- def _ensure_sso_defaults(self) -> None:
- if not self.selenium_config:
- self.selenium_config = {}
-
- if not self.selenium_enabled:
- self.selenium_config["enabled"] = True
- self.selenium_enabled = True
-
- if not self.selenium_config.get("selenium_class"):
- self.selenium_config["selenium_class"] = "CERNSSOScraper"
-
- class_map = self.selenium_config.setdefault("selenium_class_map", {})
- if "CERNSSOScraper" not in class_map:
- class_map["CERNSSOScraper"] = {
- "class": "CERNSSOScraper",
- "kwargs": {
- "headless": True,
- "max_depth": 2,
- },
- }
-
- def _collect_urls_from_lists(self, input_lists) -> List[str]:
- """Collect URLs from the configured weblists."""
- # Handle case where input_lists might be None
- urls: List[str] = []
- if not input_lists:
- return urls
- for list_name in input_lists:
- list_path = Path("weblists") / Path(list_name).name
- if not list_path.exists():
- logger.warning(f"Input list {list_path} not found.")
- continue
-
- urls.extend(self._extract_urls_from_file(list_path))
-
- return urls
-
- def _collect_urls_from_lists_by_type(self, input_lists: List[str]) -> tuple[List[str], List[str], List[str]]:
- """All types of URLs are in the same input lists, separate them via prefixes"""
- link_urls: List[str] = []
- git_urls: List[str] = []
- sso_urls: List[str] = []
- for raw_url in self._collect_urls_from_lists(input_lists):
- if raw_url.startswith("git-"):
- git_urls.append(raw_url.split("git-", 1)[1])
- continue
- if raw_url.startswith("sso-"):
- sso_urls.append(raw_url.split("sso-", 1)[1])
- continue
- link_urls.append(raw_url)
- return link_urls, git_urls, sso_urls
- def _resolve_scraper(self):
- class_name = self.selenium_config.get("selenium_class")
- class_map = self.selenium_config.get("selenium_class_map", {})
- selenium_url = self.selenium_config.get("selenium_url",None)
-
- entry = class_map.get(class_name)
-
- if not entry:
- logger.error(f"Selenium class {class_name} is not defined in the configuration")
- return None, {}
-
- scraper_class = entry.get("class")
- if isinstance(scraper_class, str):
- module_name = entry.get(
- "module",
- "src.data_manager.collectors.scrapers.integrations.sso_scraper",
- )
- module = importlib.import_module(module_name)
- scraper_class = getattr(module, scraper_class)
- scraper_kwargs = entry.get("kwargs", {})
- scraper_kwargs["selenium_url"] = selenium_url
- return scraper_class, scraper_kwargs
-
-
- def _handle_standard_url(
- self,
- url: str,
- persistence: PersistenceService,
- output_dir: Path,
- max_depth: int,
- client=None,
- use_client_for_scraping: bool = False,
- ) -> int:
- """Scrape a URL and persist resources. Returns count of resources scraped."""
- count = 0
- try:
- for resource in self.web_scraper.crawl_iter(
- url,
- browserclient=client,
- max_depth=max_depth,
- selenium_scrape=use_client_for_scraping,
- max_pages=self.max_pages,
- ):
- persistence.persist_resource(
- resource, output_dir
- )
- count += 1
- logger.info(f"Scraped {count} resources from {url}")
- except Exception as exc:
- logger.error(f"Failed to scrape {url}: {exc}", exc_info=exc)
- return count
-
- def _extract_urls_from_file(self, path: Path) -> List[str]:
- """Extract URLs from file, ignoring depth specifications for now."""
- urls: List[str] = []
- with path.open("r") as file:
- for line in file:
- stripped = line.strip()
- if not stripped or stripped.startswith("#"):
- continue
- # Extract just the URL part, ignoring depth specification if present
- url_depth = stripped.split(",")
- url = url_depth[0].strip()
- urls.append(url)
- return urls
-
- def _collect_git_resources(
- self,
- git_urls: List[str],
- persistence: PersistenceService,
- git_dir: Path,
- ) -> List[ScrapedResource]:
- git_scraper = self._get_git_scraper()
- resources = git_scraper.collect(git_urls)
- for resource in resources:
- persistence.persist_resource(resource, git_dir)
- return resources
-
- def _get_git_scraper(self) -> "GitScraper":
- if self._git_scraper is None:
- from src.data_manager.collectors.scrapers.integrations.git_scraper import \
- GitScraper
-
- self._git_scraper = GitScraper(manager=self, git_config=self.git_config)
- return self._git_scraper
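For reference, a minimal sketch of how the ScraperManager removed above routed weblist entries by prefix (see _extract_urls_from_file and _collect_urls_from_lists_by_type); the URLs are made up:

    raw_lines = [
        "https://example.org/docs/,3",                  # plain link; the depth after the comma is currently ignored
        "git-https://gitlab.cern.ch/group/my-repo",     # routed to the git scraper
        "sso-https://twiki.cern.ch/SomeProtectedPage",  # routed to the SSO scraper
    ]

    link_urls, git_urls, sso_urls = [], [], []
    for line in raw_lines:
        url = line.split(",")[0].strip()
        if url.startswith("git-"):
            git_urls.append(url.split("git-", 1)[1])
        elif url.startswith("sso-"):
            sso_urls.append(url.split("sso-", 1)[1])
        else:
            link_urls.append(url)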
From 1778dd4c7423a7d903ed55f45209c279b3446a5f Mon Sep 17 00:00:00 2001
From: Krittin Phornsiricharoenphant
Date: Tue, 7 Apr 2026 03:17:34 +0200
Subject: [PATCH 2/4] Add a standalone, workable Scrapy project structure with safe
 default settings; scrapy check/crawl link now works.
---
pyproject.toml | 4 +-
scrapy.cfg | 2 +
.../collectors/scrapers/adapters.py | 64 ++++++++++
.../collectors/scrapers/auth/__init__.py | 0
src/data_manager/collectors/scrapers/items.py | 62 ++++++++++
.../scrapers/middlewares/__init__.py | 0
.../collectors/scrapers/parsers/__init__.py | 0
.../collectors/scrapers/parsers/link.py | 74 ++++++++++++
.../collectors/scrapers/pipelines/__init__.py | 0
.../collectors/scrapers/settings.py | 95 +++++++++++++++
.../collectors/scrapers/spiders/__init__.py | 0
.../collectors/scrapers/spiders/link.py | 109 ++++++++++++++++++
src/data_manager/collectors/scrapers/utils.py | 19 +++
13 files changed, 428 insertions(+), 1 deletion(-)
create mode 100644 scrapy.cfg
create mode 100644 src/data_manager/collectors/scrapers/adapters.py
create mode 100644 src/data_manager/collectors/scrapers/auth/__init__.py
create mode 100644 src/data_manager/collectors/scrapers/items.py
create mode 100644 src/data_manager/collectors/scrapers/middlewares/__init__.py
create mode 100644 src/data_manager/collectors/scrapers/parsers/__init__.py
create mode 100644 src/data_manager/collectors/scrapers/parsers/link.py
create mode 100644 src/data_manager/collectors/scrapers/pipelines/__init__.py
create mode 100644 src/data_manager/collectors/scrapers/settings.py
create mode 100644 src/data_manager/collectors/scrapers/spiders/__init__.py
create mode 100644 src/data_manager/collectors/scrapers/spiders/link.py
create mode 100644 src/data_manager/collectors/scrapers/utils.py
diff --git a/pyproject.toml b/pyproject.toml
index f5136f334..bc4465843 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -26,7 +26,9 @@ dependencies = [
"pandas==2.3.2",
"isort==6.0.1",
"pre-commit>=4",
- "psycopg2-binary==2.9.10"
+ "psycopg2-binary==2.9.10",
+ "Scrapy>=2.14.2",
+ "playwright>=1.49.0,<2"
]
[project.scripts]
diff --git a/scrapy.cfg b/scrapy.cfg
new file mode 100644
index 000000000..124bc2c4b
--- /dev/null
+++ b/scrapy.cfg
@@ -0,0 +1,2 @@
+[settings]
+default = src.data_manager.collectors.scrapers.settings
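With scrapy.cfg pointing at the new settings module, spiders can also be driven programmatically from the repository root. A minimal sketch, assuming the spider added later in this series is named "link" (per the commit subject) and accepts a start_url argument; both are assumptions, since the spider module is not shown in this hunk:

    from scrapy.crawler import CrawlerProcess
    from scrapy.utils.project import get_project_settings

    # get_project_settings() discovers src.data_manager.collectors.scrapers.settings via scrapy.cfg
    process = CrawlerProcess(get_project_settings())
    process.crawl("link", start_url="https://example.org/docs/")  # spider name and kwarg are assumptions
    process.start()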
diff --git a/src/data_manager/collectors/scrapers/adapters.py b/src/data_manager/collectors/scrapers/adapters.py
new file mode 100644
index 000000000..bbdc79e9b
--- /dev/null
+++ b/src/data_manager/collectors/scrapers/adapters.py
@@ -0,0 +1,64 @@
+"""
+Single-dispatch adapter: converts Scrapy Items into ScrapedResource.
+
+Design principles:
+- Items are dumb data bags. They know nothing about ScrapedResource.
+- This is the ONLY place that knows about both schemas.
+- New sources: add a @to_scraped_resource.register block here. Touch nothing else.
+- Do NOT reconstruct ResourceMetadata — ScrapedResource.get_metadata() already
+ derives display_name, url, suffix, source_type from raw fields. Pass raw values only.
+
+Constraint: ~50 LOC of logic.
+
+Adding a new source (e.g. TwikiPageItem):
+ @to_scraped_resource.register(TwikiPageItem)
+ def _twiki(item) -> ScrapedResource:
+ ...
+
+If two sources share identical mapping logic, stack decorators:
+ @to_scraped_resource.register(WebPageItem)
+ @to_scraped_resource.register(TwikiPageItem)
+ def _html_page(item) -> ScrapedResource:
+ ...
+ Note: do NOT use union type hints (WebPageItem | TwikiPageItem) —
+ singledispatch ignores annotations, it dispatches on runtime type only.
+"""
+from __future__ import annotations
+
+from functools import singledispatch
+
+from src.data_manager.collectors.scrapers.scraped_resource import ScrapedResource
+from src.data_manager.collectors.scrapers.items import WebPageItem
+
+
+@singledispatch
+def to_scraped_resource(item) -> ScrapedResource:
+ """Raises for unregistered types — fail loudly, never silently skip."""
+ raise TypeError(
+ f"No adapter registered for item type {type(item).__name__!r}. "
+ "Add @to_scraped_resource.register(YourItemClass) in this module."
+ )
+
+
+@to_scraped_resource.register(WebPageItem)
+def _html_page(item) -> ScrapedResource:
+ """
+ Handles all HTML-family pages regardless of auth method.
+
+ PDFs scraped from the web also route here — the parser sets
+ suffix="pdf" and content=bytes in the item, so no branch needed.
+ The adapter passes suffix and source_type through without inspection.
+ """
+ return ScrapedResource(
+ url=item["url"],
+ content=item["content"],
+ suffix=item.get("suffix", "html"),
+ source_type=item["source_type"],
+ metadata={
+ "content_type": item.get("content_type"),
+ "encoding": item.get("encoding"),
+ "title": item.get("title"),
+ },
+ )
+
+
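A minimal usage sketch of the adapter above; the field values are illustrative:

    from src.data_manager.collectors.scrapers.adapters import to_scraped_resource
    from src.data_manager.collectors.scrapers.items import WebPageItem

    item = WebPageItem(
        url="https://example.org/docs/intro",
        content="<main><h1>Intro</h1></main>",
        suffix="html",
        source_type="web",
        title="Intro",
        content_type="text/html; charset=utf-8",
        encoding="utf-8",
    )

    resource = to_scraped_resource(item)   # ScrapedResource ready for persistence
    # to_scraped_resource(object()) would raise TypeError: no adapter registered for 'object'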
diff --git a/src/data_manager/collectors/scrapers/auth/__init__.py b/src/data_manager/collectors/scrapers/auth/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/src/data_manager/collectors/scrapers/items.py b/src/data_manager/collectors/scrapers/items.py
new file mode 100644
index 000000000..f72d91b5a
--- /dev/null
+++ b/src/data_manager/collectors/scrapers/items.py
@@ -0,0 +1,62 @@
+"""
+Scrapy intuition — Items as the data contract (FR-7a):
+
+ Items sit between Parser and Adapter.
+ Their field schema must be driven by what the Adapter needs
+ to construct a ScrapedResource — not by what's convenient
+ to inspect during development.
+
+ Wrong mental model: "what fields help me debug?"
+ Right mental model: "what fields does ScrapedResource.__init__ need?"
+
+ ScrapedResource fields (from scraped_resource.py):
+ url — required
+ content — required (str or bytes)
+ suffix — required
+ source_type — required ("web", "sso", "git")
+ metadata — dict, optional (title, content_type, encoding, etc.)
+ file_name — optional
+ relative_path — optional
+
+ So items carry exactly those fields.
+ Debug fields (body_preview, body_length) belong in logger calls,
+ not in the item schema — otherwise the adapter becomes a translation
+ layer for data that should never have been structured in the first place.
+
+SOLID note — Open/Closed:
+ Add new Item subclasses for new source types.
+ Do not add source-specific fields to the base class.
+ The adapter is the extension point, not the Item.
+"""
+
+import scrapy
+
+
+class BasePageItem(scrapy.Item):
+ """
+ Common fields shared across all scraped source types.
+ Maps directly to ScrapedResource constructor arguments.
+ """
+ url = scrapy.Field()
+ content = scrapy.Field() # Full text or bytes — NOT a preview
+ suffix = scrapy.Field() # "html", "pdf", "md" etc.
+ source_type = scrapy.Field() # "web" | "twiki" | "indico" | "discourse"
+
+ # Metadata fields — become ScrapedResource.metadata dict
+ title = scrapy.Field()
+ content_type = scrapy.Field() # HTTP Content-Type header value
+ encoding = scrapy.Field() # HTTP response encoding
+
+ # Optional — used by git/SSO scrapers for filesystem layout
+ file_name = scrapy.Field()
+ relative_path = scrapy.Field()
+
+
+class WebPageItem(BasePageItem):
+ """
+ Generic page item; works for SSO-protected and ordinary web pages.
+ No extra fields needed beyond BasePageItem.
+ Subclassing is the extension point (OCP) — Twiki quirks
+ belong in parse_twiki_page(), not in a bloated base class.
+ """
+ pass
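+
+# Construction sketch (illustrative values): a parser yields an item carrying
+# exactly the fields ScrapedResource needs; debug data stays in logger calls.
+#
+#   item = WebPageItem(
+#       url="https://example.org/docs/intro",
+#       content="<main>...</main>",
+#       suffix="html",
+#       source_type="web",
+#       title="Intro",
+#   )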
diff --git a/src/data_manager/collectors/scrapers/middlewares/__init__.py b/src/data_manager/collectors/scrapers/middlewares/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/src/data_manager/collectors/scrapers/parsers/__init__.py b/src/data_manager/collectors/scrapers/parsers/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/src/data_manager/collectors/scrapers/parsers/link.py b/src/data_manager/collectors/scrapers/parsers/link.py
new file mode 100644
index 000000000..c61e46d4e
--- /dev/null
+++ b/src/data_manager/collectors/scrapers/parsers/link.py
@@ -0,0 +1,74 @@
+from typing import Iterator, List
+from scrapy.http import Response, TextResponse
+from urllib.parse import urlparse
+from src.data_manager.collectors.scrapers.items import WebPageItem
+from src.data_manager.collectors.scrapers.utils import get_content_type
+# Tried in order — first non-empty match wins.
+# Covers: HTML5 semantic, ARIA landmark, common CMS patterns, final fallback.
+_CONTENT_SELECTORS = [
+ "main",
+ "article",
+ '[role="main"]',
+ "#content",
+ "#main",
+ "#main-content",
+ ".main-content", # MIT.edu Drupal wrapper
+ ".region-content", # Drupal generic region
+ ".content",
+ ".post-content",
+ ".entry-content",
+ "body",
+]
+
+def _first_outer_html(response: Response, selectors: List[str]) -> str:
+ for selector in selectors:
+ nodes = response.css(selector)
+ if not nodes:
+ continue
+ html = nodes[0].get()
+ if html and html.strip():
+ return html.strip()
+ return ""
+
+def parse_link_page(response: Response) -> Iterator[WebPageItem]:
+ """
+ Generic page parser — works for any HTML page with no site-specific selectors.
+ Strategy:
+ - PDFs: return raw bytes, suffix="pdf".
+ - HTML: keep the outer HTML of the first matching content container,
+ falling back through _CONTENT_SELECTORS down to <body>.
+ The full page is never stored; only the selected container reaches the item.
+ Suitable as the default parse_item for LinkSpider subclasses that have
+ no meaningful site-specific structure to exploit.
+ """
+ ct = get_content_type(response)
+ # ── PDF ──────────────────────────────────────────────────────────────────
+ if response.url.lower().endswith(".pdf") or "application/pdf" in ct:
+ yield WebPageItem(
+ url=response.url,
+ content=response.body,
+ suffix="pdf",
+ source_type="web",
+ title=urlparse(response.url).path.split("/")[-1].replace(".pdf", "").strip(),
+ content_type=ct,
+ )
+ return
+ # ── HTML ─────────────────────────────────────────────────────────────────
+ title = (
+ response.css("h1::text").get()
+ or response.css("title::text").get()
+ or ""
+ ).strip()
+ body_text = _first_outer_html(response, _CONTENT_SELECTORS)
+ encoding = response.encoding if isinstance(response, TextResponse) else "utf-8"
+ if not body_text:
+ return # empty page — don't yield a blank item
+ yield WebPageItem(
+ url=response.url,
+ content=body_text,
+ suffix="html",
+ source_type="web",
+ title=title,
+ content_type=ct,
+ encoding=encoding,
+ )
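+
+# Quick-check sketch (illustrative; uses Scrapy's in-memory HtmlResponse):
+#
+#   from scrapy.http import HtmlResponse
+#   resp = HtmlResponse(
+#       url="https://example.org/",
+#       body=b"<html><body><main><h1>Hello</h1></main></body></html>",
+#       headers={"Content-Type": "text/html; charset=utf-8"},
+#   )
+#   item = next(parse_link_page(resp))
+#   item["suffix"], item["title"]   # ("html", "Hello")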
diff --git a/src/data_manager/collectors/scrapers/pipelines/__init__.py b/src/data_manager/collectors/scrapers/pipelines/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/src/data_manager/collectors/scrapers/settings.py b/src/data_manager/collectors/scrapers/settings.py
new file mode 100644
index 000000000..9db783924
--- /dev/null
+++ b/src/data_manager/collectors/scrapers/settings.py
@@ -0,0 +1,95 @@
+BOT_NAME = "archi_scrapers"
+
+SPIDER_MODULES = ["src.data_manager.collectors.scrapers.spiders"]
+
+NEWSPIDER_MODULE = "src.data_manager.collectors.scrapers.spiders"
+
+# Browser-like UA to avoid bot-blocking (e.g. Twiki ConnectionLost issue)
+USER_AGENT = (
+ "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
+ "AppleWebKit/537.36 (KHTML, like Gecko) "
+ "Chrome/120.0.0.0 Safari/537.36 "
+ "archi_scrapers/1.0 (+https://github.com/archi-physics/archi)"
+)
+
+# Default RETRY_TIMES is 2. We bump to 3 for transient failures.
+# ConnectionLost is not an HTTP code; it is one of the connection errors that
+# RetryMiddleware retries automatically, independent of RETRY_HTTP_CODES.
+RETRY_ENABLED = True
+RETRY_TIMES = 3 # max retries per request (transport + server errors only)
+RETRY_HTTP_CODES = [
+ 500, # Internal Server Error — transient server fault
+ 502, # Bad Gateway — upstream not reachable
+ 503, # Service Unavailable — server overloaded
+ 504, # Gateway Timeout
+ 408, # Request Timeout — network-level timeout
+ # 429 (Too Many Requests) omitted: AutoThrottle should prevent hitting it.
+]
+
+# Conservative floor delay for all sources.
+# AutoThrottle will increase this dynamically but never go below it.
+# Indico's robots.txt mandates Crawl-delay: 10 — Indico spiders must override
+# this to 10 via custom_settings = {"DOWNLOAD_DELAY": 10}.
+DOWNLOAD_DELAY = 2 # seconds
+# Per-request timeout — prevents indefinite hangs
+DOWNLOAD_TIMEOUT = 30 # seconds
+
+# Keep a single concurrent request per domain.
+# AutoThrottle adjusts throughput dynamically; starting at 1 is safe.
+CONCURRENT_REQUESTS = 1
+CONCURRENT_REQUESTS_PER_DOMAIN = 1
+
+# Robots.txt: obey by default.
+# Override per-spider if needed: custom_settings = {"ROBOTSTXT_OBEY": False}
+# Never disable globally — it would affect all spiders.
+ROBOTSTXT_OBEY = True
+
+# AutoThrottle
+# Enabled as a second politeness layer on top of DOWNLOAD_DELAY.
+# AutoThrottle treats DOWNLOAD_DELAY as a minimum — it will never go lower.
+# Target concurrency of 1.0 keeps us single-threaded per domain by default.
+AUTOTHROTTLE_ENABLED = True
+AUTOTHROTTLE_START_DELAY = DOWNLOAD_DELAY # initial delay before AT calibrates
+AUTOTHROTTLE_TARGET_CONCURRENCY = 1.0
+AUTOTHROTTLE_MAX_DELAY = 60 # cap: never wait more than 60s
+# Set True to log every AutoThrottle adjustment; useful during development,
+# kept False here to limit log volume.
+AUTOTHROTTLE_DEBUG = False
+
+# ------------------------------------------------------------------ #
+# Depth limiting — safety cap; spiders can narrow via custom_settings.
+# ------------------------------------------------------------------ #
+DEPTH_LIMIT = 2 # hard cap so a misconfigured crawl can't run forever
+
+# ---------------------------------------------------------------------------
+# Safety: fail loudly on spider import errors
+# ---------------------------------------------------------------------------
+SPIDER_LOADER_WARN_ONLY = False
+
+# Maximum error count before the spider is closed automatically.
+# 25 gives enough room to diagnose intermittent failures without letting
+# a completely broken crawl run for hours.
+CLOSESPIDER_ERRORCOUNT = 25
+
+LOG_LEVEL = "INFO"
+
+# The class used to detect and filter duplicate requests
+DUPEFILTER_CLASS = "scrapy.dupefilters.RFPDupeFilter"
+
+# ---------------------------------------------------------------------------
+# Middlewares, Pipelines and Extensions Priorities
+# ---------------------------------------------------------------------------
+DOWNLOADER_MIDDLEWARES = {
+ "scrapy.downloadermiddlewares.retry.RetryMiddleware": 550,
+ # RedirectMiddleware stays at its default 600 — no entry needed
+}
+
+SPIDER_AUTH_PROVIDERS = {
+}
+
+ITEM_PIPELINES = {
+}
+
+EXTENSIONS = {
+ "scrapy.extensions.closespider.CloseSpider": 500,
+}
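+
+# Override sketch (illustrative): any value above can be narrowed per spider
+# or per run without editing this module.
+#
+#   # in a spider:
+#   custom_settings = {"DOWNLOAD_DELAY": 10, "DEPTH_LIMIT": 1}
+#
+#   # from the CLI:
+#   scrapy crawl link -s DEPTH_LIMIT=1 -s CLOSESPIDER_PAGECOUNT=50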
diff --git a/src/data_manager/collectors/scrapers/spiders/__init__.py b/src/data_manager/collectors/scrapers/spiders/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/src/data_manager/collectors/scrapers/spiders/link.py b/src/data_manager/collectors/scrapers/spiders/link.py
new file mode 100644
index 000000000..c51c0826d
--- /dev/null
+++ b/src/data_manager/collectors/scrapers/spiders/link.py
@@ -0,0 +1,109 @@
+from typing import Iterator, Callable
+from urllib.parse import urlparse
+from scrapy import Spider
+from scrapy.http import Response, Request
+from scrapy.linkextractors import LinkExtractor
+from scrapy.link import Link
+from src.data_manager.collectors.scrapers.utils import IMAGE_EXTENSIONS, IGNORED_DOCUMENT_EXTENSIONS
+from src.data_manager.collectors.scrapers.items import WebPageItem
+from src.data_manager.collectors.scrapers.parsers.link import parse_link_page
+
+class LinkSpider(Spider):
+ """
+ Generic link-following spider for unauthenticated pages.
+ Stays within the hostnames of all start_urls, up to max_depth.
+ """
+
+ name = "link"
+
+ _DEFAULT_START_URLS = ["https://quotes.toscrape.com/"]
+
+ custom_settings = {
+ "DEPTH_LIMIT": 1, # Default max depth
+ "DOWNLOAD_DELAY": 2, # Default (download) delay
+ "CLOSESPIDER_PAGECOUNT": 500 # Default max pages
+ }
+
+ @classmethod
+ def from_crawler(cls, crawler, *args, **kwargs):
+ max_depth = kwargs.get("max_depth")
+ max_pages = kwargs.get("max_pages")
+ delay = kwargs.get("delay")
+ if max_depth:
+ crawler.settings.set("DEPTH_LIMIT", max_depth, priority="spider")
+ if max_pages:
+ crawler.settings.set("CLOSESPIDER_PAGECOUNT", max_pages, priority="spider")
+ if delay:
+ crawler.settings.set("DOWNLOAD_DELAY", delay, priority="spider")
+ return super().from_crawler(crawler, *args, **kwargs)
+
+ def __init__(self, start_urls: list[str] = None, max_depth: int = None, max_pages: int = None, allow: list[str] = None, deny: list[str] = None, delay: int = None, canonicalize: bool = False, process_value: Callable[[str], str] = None, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ self._start_urls = start_urls or getattr(self, "_DEFAULT_START_URLS", [])
+ self._allowed_domains: set[str] = {
+ urlparse(u).netloc
+ for u in self._start_urls
+ if urlparse(u).netloc
+ }
+ default_deny = getattr(self, "_DEFAULT_DENY", [])
+ default_process_value = getattr(self, "_DEFAULT_PROCESS_VALUE", None)
+ self._le = LinkExtractor(
+ allow=allow or [],
+ deny=(deny or []) + default_deny,
+ allow_domains=list(self._allowed_domains),
+ deny_extensions=(IMAGE_EXTENSIONS + IGNORED_DOCUMENT_EXTENSIONS),
+ canonicalize=canonicalize,
+ process_value=process_value or default_process_value,
+ unique=True,
+ )
+
+ async def start(self):
+ """
+ Seed requests — validates start_urls at crawl time, not import time.
+ Building the habit: always attach errback here, never rely on
+ start_urls shortcut in production spiders.
+ """
+ if not self._start_urls:
+ raise ValueError("LinkSpider requires start_urls to be set")
+ for url in self._start_urls:
+ yield Request(url=url, callback=self.parse, errback=self.errback, meta={"depth": 0})
+
+ def parse(self, response: Response) -> Iterator[WebPageItem | Request]:
+ """
+ Extract one item per response, then yield follow Requests up to max_depth.
+ @url https://quotes.toscrape.com/
+ @returns items 1
+ @returns requests 1
+ @scrapes url title
+ """
+ yield from self.parse_item(response) # Yield Item
+ yield from self.follow_links(response) # Yield Requests
+
+
+ def follow_links(self, response: Response) -> Iterator[Request]:
+ current_depth = response.meta.get("depth", 0)
+ if current_depth >= self.settings.getint("DEPTH_LIMIT"):
+ self.logger.info("Reached max depth %d", self.settings.getint("DEPTH_LIMIT"))
+ return
+ for link in self.parse_follow_links(response):
+ self.logger.info("Following %s at depth %d", link.url, current_depth)
+ yield Request(link.url, callback=self.parse, errback=self.errback, meta={"depth": current_depth + 1})
+
+ def errback(self, failure):
+ self.logger.error(
+ "Request failed: %s — %s",
+ failure.request.url,
+ repr(failure.value),
+ )
+
+ # ------------------------------------------------------------------ #
+ # Extension points — pure, unit-testable/checkable without a reactor
+ # ------------------------------------------------------------------ #
+
+ def parse_item(self, response: Response) -> Iterator[WebPageItem]:
+ yield from parse_link_page(response)
+
+ def parse_follow_links(self, response: Response) -> Iterator[Link]:
+ links = self._le.extract_links(response)
+ self.logger.info("Extracted %d links from %s", len(links), response.url)
+ yield from links
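+
+# Subclassing sketch (illustrative; DocsSpider and its URL are made up):
+# site spiders keep parse()/follow_links() and override only the defaults
+# and the pure extension points.
+#
+#   class DocsSpider(LinkSpider):
+#       name = "docs"
+#       _DEFAULT_START_URLS = ["https://docs.example.org/"]
+#       custom_settings = {**LinkSpider.custom_settings, "DEPTH_LIMIT": 2}
+#
+#       def parse_item(self, response):
+#           yield from parse_link_page(response)  # or a site-specific parser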
diff --git a/src/data_manager/collectors/scrapers/utils.py b/src/data_manager/collectors/scrapers/utils.py
new file mode 100644
index 000000000..003003d77
--- /dev/null
+++ b/src/data_manager/collectors/scrapers/utils.py
@@ -0,0 +1,19 @@
+from scrapy.http import Response
+
+IMAGE_EXTENSIONS = [
+ "png", "jpg", "jpeg", "gif", "bmp", "svg", "ico", "webp"
+]
+
+# pdf, docx, xlsx, pptx are first-class supported by MarkItDown, so they are
+# not ignored; only legacy Office formats and archives are skipped here.
+IGNORED_DOCUMENT_EXTENSIONS = [
+ "doc",
+ "xls",
+ "ppt",
+ "zip",
+ "rar",
+]
+
+def get_content_type(response: Response) -> str:
+ """Decode the Content-Type header bytes to str."""
+ raw: bytes = response.headers.get("Content-Type", b"") or b""
+ return raw.decode("utf-8", errors="replace")
From 1130333c16cbe0b221cc038d07c6c96582523946 Mon Sep 17 00:00:00 2001
From: Krittin Phornsiricharoenphant
Date: Tue, 7 Apr 2026 03:48:21 +0200
Subject: [PATCH 3/4] [anonymizer] Scrapy AnonymizationPipeline and patching
 global anonymizer to support generic markup, twiki, discourse patterns.
---
.../scrapers/pipelines/anonymization.py | 55 +++++++
.../collectors/scrapers/settings.py | 1 +
.../collectors/utils/anonymizer.py | 153 +++++++++++++++---
3 files changed, 184 insertions(+), 25 deletions(-)
create mode 100644 src/data_manager/collectors/scrapers/pipelines/anonymization.py
diff --git a/src/data_manager/collectors/scrapers/pipelines/anonymization.py b/src/data_manager/collectors/scrapers/pipelines/anonymization.py
new file mode 100644
index 000000000..4f07af398
--- /dev/null
+++ b/src/data_manager/collectors/scrapers/pipelines/anonymization.py
@@ -0,0 +1,55 @@
+from scrapy.exceptions import NotConfigured
+
+from src.data_manager.collectors.utils.anonymizer import Anonymizer
+from src.data_manager.collectors.scrapers.items import BasePageItem
+
+from scrapy import Spider
+from src.utils.logging import get_logger
+
+logger = get_logger(__name__)
+
+class AnonymizationPipeline:
+ """Runs at priority 250, before PersistencePipeline (300)."""
+
+ _DEFAULT_ANONYMIZER_CONFIG = {
+ "utils": {
+ "anonymizer": {
+ "nlp_model": "en_core_web_sm",
+ "excluded_words": ["John", "Jane", "Doe"],
+ "greeting_patterns": [
+ r"^(hi|hello|hey|greetings|dear)\b",
+ r"^\w+,\s*",
+ ],
+ "signoff_patterns": [
+ r"\b(regards|sincerely|best regards|cheers|thank you)\b",
+ r"^\s*[-~]+\s*$",
+ ],
+ "email_pattern": r"[\w\.-]+@[\w\.-]+\.\w+",
+ "username_pattern": r"\[~[^\]]+\]",
+ }
+ }
+ }
+
+ def __init__(self, anonymizer: Anonymizer) -> None:
+ self._anonymizer = anonymizer
+
+ @classmethod
+ def from_crawler(cls, crawler):
+ enabled = crawler.settings.getbool("ANONYMIZE_DATA", True)
+ anonymizer = crawler.settings.get("ANONYMIZER_SERVICE")
+ if not enabled:
+ raise NotConfigured("Anonymization is disabled")
+ if anonymizer is None:
+ # when run via the scrapy CLI, no anonymizer service is injected; use defaults
+ dm_config = cls._DEFAULT_ANONYMIZER_CONFIG
+ return cls(anonymizer=Anonymizer(dm_config))
+ return cls(anonymizer=anonymizer)
+
+ def process_item(self, item: BasePageItem, spider: Spider) -> BasePageItem:
+ if isinstance(item.get("content"), str):
+ logger.debug(f"Anonymizing content: {item['content']}")
+ item["content"] = self._anonymizer.anonymize_markup(item["content"])
+ logger.debug(f"Anonymized content: {item['content']}")
+ if isinstance(item.get("title"), str):
+ item["title"] = self._anonymizer.anonymize(item["title"])
+ return item
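+
+# Injection sketch (illustrative; assumes a programmatic crawl entry point
+# that owns the Anonymizer instance and its config dict dm_config):
+#
+#   from scrapy.crawler import CrawlerProcess
+#   from scrapy.utils.project import get_project_settings
+#
+#   settings = get_project_settings()
+#   settings.set("ANONYMIZER_SERVICE", Anonymizer(dm_config), priority="cmdline")
+#   process = CrawlerProcess(settings)
+#   process.crawl("link", start_urls=["https://example.org/"])
+#   process.start()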
diff --git a/src/data_manager/collectors/scrapers/settings.py b/src/data_manager/collectors/scrapers/settings.py
index 9db783924..c89e1acb3 100644
--- a/src/data_manager/collectors/scrapers/settings.py
+++ b/src/data_manager/collectors/scrapers/settings.py
@@ -88,6 +88,7 @@
}
ITEM_PIPELINES = {
+ "src.data_manager.collectors.scrapers.pipelines.anonymization.AnonymizationPipeline": 250,
}
EXTENSIONS = {
diff --git a/src/data_manager/collectors/utils/anonymizer.py b/src/data_manager/collectors/utils/anonymizer.py
index 72ac00456..e6ffcc353 100644
--- a/src/data_manager/collectors/utils/anonymizer.py
+++ b/src/data_manager/collectors/utils/anonymizer.py
@@ -3,20 +3,71 @@
"""
import re
-from typing import List, Set
+from typing import List, Set, Dict, Any, Optional
import spacy
from src.utils.config_access import get_data_manager_config
+from html import unescape
+
+# Generic markup patterns
+_TAG_RE = re.compile(r"<[^>]+>")
+_CDATA_RE = re.compile(r"<!\[CDATA\[|\]\]>")
+_DC_CREATOR_RE = re.compile(
+ r'(<dc:creator[^>]*>).*?(</dc:creator>)',
+ re.IGNORECASE,
+)
+_ATTR_TEXT_RE = re.compile(r'(?:title|alt|creator|author)=["\']([^"\']+)["\']', re.IGNORECASE)
+_CONTENT_TAG_RE = re.compile(
+ r'<(?:p|li|td|description|title|dc:creator)[^>]*>(.*?)</(?:p|li|td|description|title|dc:creator)>',
+ re.DOTALL | re.IGNORECASE,
+)
+# <a href=".../Main/AlbertEinstein">Albert-Einstein</a> → (removed)
+_DEFAULT_GENERIC_MARKUP_USER_LINK_RE = re.compile(
+ r'<a[^>]*href="[^"]*?/(?:Main|author|user|profile|members)/[^"]*"[^>]*>[^<]*</a>',
+ re.IGNORECASE,
+)
+# Generic author link, like <a rel="author" href="/users/albert">Albert-Einstein</a>
+# <span itemprop="author">Stephenie Meyer</span>
+# <span class="author">John Doe</span>
+# <div class="post-author">Jane Smith</div>
+# <b class="author">Bob</b>
+_DEFAULT_GENERIC_MARKUP_AUTHOR_ELEMENT_RE = re.compile(
+ r'<[^>]*(?:itemprop=["\']author["\']|class=["\'][^"\']*\bauthor\b[^"\']*["\']|rel=["\']author["\'])[^>]*>[^<]*</[^>]+>',
+ re.IGNORECASE,
+)
+# <a href=".../twiki/bin/view/Main/JohnDoe">JohnDoe</a> → (removed)
+_DEFAULT_MARKUP_TWIKI_USER_LINK_RE = re.compile(
+ r'<a[^>]*href="[^"]*?/twiki/bin/\w+/Main/\w+"[^>]*>\w+</a>',
+ re.IGNORECASE,
+)
+# <p>John</p> → (removed)
+# <p><br/>John Doe</p> → (removed)
+_DEFAULT_MARKUP_SIGNOFF_TAG_RE = re.compile(
+ r'<p>\s*(?:<br\s*/?>)?\s*[A-Z][\w.]*(?:\s+[A-Z][\w.]*){0,2}\s*</p>',
+ re.IGNORECASE,
+)
+# ..atm<br/>\nJohn</p> → ..atm</p>
+# Thanks,\nJohn</b> → </b>
+# Yours sincerely,\nJ.D.Doe]]> → ]]>
+_DEFAULT_MARKUP_TRAILING_SIGNOFF_TAG_RE = re.compile(
+ r'(?:'
+ r'<br\s*/?>\s*\n?\s*'
+ r'|(?:Thanks|Cheers|Best|Regards|HTH|Yours\s+sincerely)\s*,?\s*[\n\s]*'
+ r')'
+ r'[A-Z][\w.]*(?:\s+[A-Z][\w.]*){0,2}'
+ r'\s*(?=</p>|</b>|\]\]>)',
+ re.IGNORECASE,
+)
class Anonymizer:
- def __init__(self):
+ def __init__(self, dm_config: Optional[Dict[str, Any]] = None):
"""
Initialize the Anonymizer.
"""
- dm_config = get_data_manager_config()
+ dm_config = dm_config or get_data_manager_config()
data_manager_utils = dm_config.get("utils", {}) if isinstance(dm_config, dict) else {}
anonymizer_config = data_manager_utils.get("anonymizer", {}) if isinstance(data_manager_utils, dict) else {}
@@ -45,39 +96,91 @@ def __init__(self):
self.SIGNOFF_PATTERNS = [re.compile(pattern, re.IGNORECASE) for pattern in signoff_patterns]
self.EMAIL_PATTERN = re.compile(email_pattern)
self.USERNAME_PATTERN = re.compile(username_pattern)
+
+ def _discover_names(self, text: str) -> set:
+ """NER to discover names in the text."""
+ doc = self.nlp(text)
+ return {
+ ent.text for ent in doc.ents
+ if ent.label_ == "PERSON" and ent.text not in self.EXCLUDED_WORDS
+ }
+
+ def _discover_names_markup(self, markup: str) -> set:
+ # Full document: names with surrounding context (catches CDATA)
+ full_text = self._extract_text(markup)
+ names = self._discover_names(full_text)
+ # Per-chunk: focused paragraphs (catches standalone names in <dc:creator>, <title>, etc.)
+ for chunk in self._extract_text_chunks(markup):
+ names |= self._discover_names(chunk)
+ return names
def anonymize(self, text: str) -> str:
"""
Anonymize names, emails, usernames, greetings, and sign-offs from the text.
"""
- doc = self.nlp(text)
- names_to_replace = {
- ent.text for ent in doc.ents
- if ent.label_ == "PERSON" and ent.text not in self.EXCLUDED_WORDS
- }
+ names_to_replace = self._discover_names(text)
# Remove email addresses and usernames
text = self.EMAIL_PATTERN.sub("", text)
text = self.USERNAME_PATTERN.sub("", text)
- # Remove greetings and sign-offs
+ text = self._strip_greetings_signoffs(text)
+ return self._replace_names(text, names_to_replace)
+
+ def anonymize_markup(self, markup: str) -> str:
+ """
+ Anonymize names, emails, usernames, greetings, and sign-offs from markup,
+ including HTML, RSS, and similar formats (especially Twiki and Discourse markup).
+ """
+ names_to_replace = self._discover_names_markup(markup)
+ # Remove email addresses and usernames
+ markup = self.EMAIL_PATTERN.sub("", markup)
+ markup = self.USERNAME_PATTERN.sub("", markup)
+ markup = _DC_CREATOR_RE.sub(r'\1\2', markup)
+ markup = _DEFAULT_GENERIC_MARKUP_AUTHOR_ELEMENT_RE.sub("", markup)
+ markup = _DEFAULT_GENERIC_MARKUP_USER_LINK_RE.sub("", markup)
+ markup = _DEFAULT_MARKUP_SIGNOFF_TAG_RE.sub("", markup)
+ markup = _DEFAULT_MARKUP_TRAILING_SIGNOFF_TAG_RE.sub("", markup)
+ markup = _DEFAULT_MARKUP_TWIKI_USER_LINK_RE.sub("", markup)
+ markup = self._strip_greetings_signoffs(markup)
+ return self._replace_names(markup, names_to_replace)
+
+ def _strip_greetings_signoffs(self, text: str) -> str:
lines = text.splitlines()
- filtered_lines: List[str] = []
+ filtered = []
for line in lines:
- stripped_line = line.strip()
- if any(p.match(stripped_line) for p in self.GREETING_PATTERNS):
+ stripped = line.strip()
+ if any(p.match(stripped) for p in self.GREETING_PATTERNS):
continue
- if any(p.match(stripped_line) for p in self.SIGNOFF_PATTERNS):
+ if any(p.match(stripped) for p in self.SIGNOFF_PATTERNS):
continue
- filtered_lines.append(line)
- text = "\n".join(filtered_lines)
-
- # Remove names (case-insensitive)
- for name in sorted(names_to_replace, key=len, reverse=True):
- pattern = re.compile(r'\b' + re.escape(name) + r'\b', re.IGNORECASE)
- text = pattern.sub("", text)
-
- # Remove extra whitespace
- text = "\n".join(line for line in text.splitlines() if line.strip())
-
- return text
+ filtered.append(line)
+ return "\n".join(filtered)
+
+ def _replace_names(self, text: str, names: set) -> str:
+ for name in sorted(names, key=len, reverse=True):
+ text = re.compile(r'\b' + re.escape(name) + r'\b', re.IGNORECASE).sub("", text)
+ return "\n".join(line for line in text.splitlines() if line.strip())
+
+ def _extract_text(self, markup: str) -> str:
+ """Strip markup to plain text for NER. Format-agnostic."""
+ attrs = " ".join(_ATTR_TEXT_RE.findall(markup))
+ clean = _CDATA_RE.sub(" ", markup)
+ clean = _TAG_RE.sub(" ", clean)
+ clean = unescape(clean)
+ return re.sub(r"\s+", " ", f"{clean} {attrs}").strip()
+
+ def _extract_text_chunks(self, markup: str) -> list:
+ chunks = []
+ # Text content from content-bearing tags (see _CONTENT_TAG_RE)
+ for match in _CONTENT_TAG_RE.finditer(markup):
+ inner = _CDATA_RE.sub(" ", match.group(1))
+ clean = _TAG_RE.sub(" ", inner)
+ clean = unescape(clean).strip()
+ if clean:
+ chunks.append(clean)
+ # Text from attributes
+ attr_text = " ".join(_ATTR_TEXT_RE.findall(markup))
+ if attr_text.strip():
+ chunks.append(attr_text.strip())
+ return chunks
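+
+# Behaviour sketch (illustrative; exact removals depend on the configured
+# patterns and on what the spaCy model tags as PERSON; cfg is any
+# data-manager-style config dict):
+#
+#   a = Anonymizer(cfg)
+#   a.anonymize("Hi team,\nPlease review the draft.\nCheers,\nJane Smith")
+#   # -> "Please review the draft." (greeting/sign-off lines stripped,
+#   #    "Jane Smith" removed when detected as PERSON)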
From 5dede518a6670e1092bafa59eef573736941d482 Mon Sep 17 00:00:00 2001
From: Krittin Phornsiricharoenphant
Date: Tue, 7 Apr 2026 04:25:04 +0200
Subject: [PATCH 4/4] [markitdown] Scrapy MarkitdownPipeline and standalone
 generic MarkitdownConvertor.
---
pyproject.toml | 3 +-
.../scrapers/pipelines/markitdown.py | 41 +++++++++++++++++++
.../collectors/scrapers/settings.py | 1 +
.../collectors/utils/markitdown_convertor.py | 38 +++++++++++++++++
4 files changed, 82 insertions(+), 1 deletion(-)
create mode 100644 src/data_manager/collectors/scrapers/pipelines/markitdown.py
create mode 100644 src/data_manager/collectors/utils/markitdown_convertor.py
diff --git a/pyproject.toml b/pyproject.toml
index bc4465843..970c0666a 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -28,7 +28,8 @@ dependencies = [
"pre-commit>=4",
"psycopg2-binary==2.9.10",
"Scrapy>=2.14.2",
- "playwright>=1.49.0,<2"
+ "playwright>=1.49.0,<2",
+ "markitdown>=0.1.0"
]
[project.scripts]
diff --git a/src/data_manager/collectors/scrapers/pipelines/markitdown.py b/src/data_manager/collectors/scrapers/pipelines/markitdown.py
new file mode 100644
index 000000000..b8defdfc0
--- /dev/null
+++ b/src/data_manager/collectors/scrapers/pipelines/markitdown.py
@@ -0,0 +1,41 @@
+from scrapy import Spider
+from src.utils.logging import get_logger
+from src.data_manager.collectors.utils.markitdown_convertor import MarkitdownConvertor
+from src.data_manager.collectors.utils.anonymizer import Anonymizer
+from src.data_manager.collectors.scrapers.pipelines.anonymization import AnonymizationPipeline
+from src.data_manager.collectors.scrapers.items import BasePageItem
+from scrapy.exceptions import NotConfigured
+
+logger = get_logger(__name__)
+
+class MarkitdownPipeline:
+ """Runs at priority 260, after AnonymizationPipeline (250) and before PersistencePipeline (300)."""
+
+ def __init__(self, markitdown: MarkitdownConvertor, anonymizer: Anonymizer, anonymize_data: bool):
+ self._markitdown = markitdown
+ self._anonymizer = anonymizer
+ self._anonymize_data = anonymize_data
+
+ @classmethod
+ def from_crawler(cls, crawler):
+ enabled = crawler.settings.getbool("MARKITDOWN_ENABLED", True)
+ markitdown_convertor = crawler.settings.get("MARKITDOWN_SERVICE")
+ anonymizer = crawler.settings.get("ANONYMIZER_SERVICE")
+ anonymize_data = crawler.settings.getbool("ANONYMIZE_DATA", True)
+ if not enabled:
+ raise NotConfigured("Markitdown is disabled")
+ if markitdown_convertor is None:
+ # when run via the scrapy CLI, no markitdown service is injected; build a default
+ markitdown_convertor = MarkitdownConvertor()
+ if anonymizer is None:
+ # when run via the scrapy CLI, no anonymizer service is injected; reuse AnonymizationPipeline's default
+ anonymizer = AnonymizationPipeline.from_crawler(crawler)._anonymizer
+ return cls(markitdown=markitdown_convertor, anonymizer=anonymizer, anonymize_data=anonymize_data)
+
+ def process_item(self, item: BasePageItem, spider: Spider) -> BasePageItem:
+ if isinstance(item.get("content"), str):
+ item["content"] = self._markitdown.convert(item["content"], file_extension=item["suffix"])
+ if self._anonymize_data:
+ item["content"] = self._anonymizer.anonymize(item["content"])
+ logger.debug(f"Markitdown result ({'anonymized' if self._anonymize_data else 'not re-anonymized'}): {item['content']}")
+ return item
diff --git a/src/data_manager/collectors/scrapers/settings.py b/src/data_manager/collectors/scrapers/settings.py
index c89e1acb3..2a447c2e4 100644
--- a/src/data_manager/collectors/scrapers/settings.py
+++ b/src/data_manager/collectors/scrapers/settings.py
@@ -89,6 +89,7 @@
ITEM_PIPELINES = {
"src.data_manager.collectors.scrapers.pipelines.anonymization.AnonymizationPipeline": 250,
+ "src.data_manager.collectors.scrapers.pipelines.markitdown.MarkitdownPipeline": 260,
}
EXTENSIONS = {
diff --git a/src/data_manager/collectors/utils/markitdown_convertor.py b/src/data_manager/collectors/utils/markitdown_convertor.py
new file mode 100644
index 000000000..5cbadd603
--- /dev/null
+++ b/src/data_manager/collectors/utils/markitdown_convertor.py
@@ -0,0 +1,38 @@
+import io
+from markitdown import MarkItDown
+from src.utils.logging import get_logger
+# from src.interfaces.llm.llm_client import LLMClient
+
+logger = get_logger(__name__)
+
+def to_valid_file_extension(file_extension: str) -> str:
+ """
+ Convert the file extension to a valid MarkItDown file extension.
+ """
+ return "." + file_extension.lstrip(".")
+
+class MarkitdownConvertor:
+
+ def __init__(self):
+ self.markitdown = MarkItDown(
+ enable_plugins=True,
+ # llm_client=llm_client,
+ # llm_model=llm_model,
+ )
+
+ def convert(self, content: str, file_extension: str = ".html") -> str:
+ """
+ Convert the content to markdown using MarkItDown.
+ Args:
+ content: The content to convert.
+ file_extension: The file extension of the content.
+ Returns:
+ The converted content.
+ """
+ logger.debug(f"Converting content to markdown: {content}")
+ result = self.markitdown.convert_stream(
+ io.BytesIO(content.encode("utf-8")),
+ file_extension=to_valid_file_extension(file_extension),
+ )
+ logger.debug(f"Markitdown result: {result.text_content if hasattr(result, 'text_content') else str(result)}")
+ return result.text_content if hasattr(result, 'text_content') else str(result)
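+
+# Usage sketch (illustrative input/output; the exact markdown depends on
+# MarkItDown's converters):
+#
+#   convertor = MarkitdownConvertor()
+#   convertor.convert("<h1>Title</h1><p>Body</p>", file_extension="html")
+#   # -> "# Title\n\nBody"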