27 changes: 24 additions & 3 deletions src/web_crawler/web_scraper.py
@@ -1,9 +1,10 @@
import asyncio
import logging
import time
from urllib.parse import urlparse

import validators
from selenium.common.exceptions import StaleElementReferenceException
from selenium.common.exceptions import StaleElementReferenceException, TimeoutException
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
@@ -19,9 +20,13 @@ class WebScraper:
def __init__(self) -> None:
self.options = Options()
self.options.add_argument("--headless")
self.options.add_argument("--mute-audio")
self.options.add_argument("--no-sandbox")
self.options.add_argument("--disable-dev-shm-usage")
prefs = {"download.default_directory": "/tmp/"}
self.options.add_experimental_option("prefs", prefs)
self.driver = webdriver.Chrome(options=self.options)
self.driver.set_page_load_timeout(15)
self.target = None

# def _refresh_driver(self) -> None:
@@ -37,7 +42,10 @@ def navigate_sync(self, target: str) -> float:
# self._refresh_driver()
start_time = time.perf_counter()
self.target = target
self.driver.get(target)
try:
self.driver.get(target)
except TimeoutException:
logger.warning(f"Timeout fetching {target}")
end_time = time.perf_counter()
return end_time - start_time
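The effect of the new TimeoutException handling in navigate_sync can be sketched without a real browser. A minimal sketch, assuming only that Selenium is installed; timed_fetch and fake_fetch are hypothetical stand-ins for the method and driver.get, not part of this PR:

```python
import logging
import time

from selenium.common.exceptions import TimeoutException

logger = logging.getLogger(__name__)


def timed_fetch(fetch, target: str) -> float:
    # Same shape as navigate_sync after this change: a page that exceeds the
    # 15s page-load timeout no longer aborts the crawl; the timeout is logged
    # and the elapsed time is still returned.
    start_time = time.perf_counter()
    try:
        fetch(target)
    except TimeoutException:
        logger.warning(f"Timeout fetching {target}")
    return time.perf_counter() - start_time


def fake_fetch(url: str) -> None:
    # Hypothetical stand-in for driver.get() that always times out.
    raise TimeoutException(f"timed out loading {url}")


print(timed_fetch(fake_fetch, "https://example.com/slow-page"))
```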

@@ -66,4 +74,17 @@ def find_all_links(self) -> set[str]:
links.append(href)
except StaleElementReferenceException:
logger.info(f"Stale reference exception: {self.target}")
return set(link for link in links if validators.url(link))
return set(
link
for link in links
if validators.url(link) and not self._is_local_url(link)
)

def _is_local_url(self, url: str) -> bool:
host = urlparse(url).hostname
return host in {"localhost", "0.0.0.0"} or (
host is not None and host.startswith("127.")
)

def close(self) -> None:
self.driver.quit()
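The new local-URL filter in find_all_links can be exercised on its own. A minimal sketch that mirrors the _is_local_url logic as a standalone function (the example URLs are illustrative, not from the PR):

```python
from urllib.parse import urlparse


def is_local_url(url: str) -> bool:
    # Mirrors WebScraper._is_local_url: drop loopback / unspecified hosts so
    # the crawler never follows links back into the machine it runs on.
    host = urlparse(url).hostname
    return host in {"localhost", "0.0.0.0"} or (
        host is not None and host.startswith("127.")
    )


assert is_local_url("http://localhost:8080/admin")
assert is_local_url("http://127.0.0.1/metrics")
assert is_local_url("http://0.0.0.0:9000/")
assert not is_local_url("https://example.com/")
```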
95 changes: 49 additions & 46 deletions src/web_crawler_cron.py
@@ -3,7 +3,7 @@
import sys
import time

from selenium.common.exceptions import WebDriverException
from selenium.common.exceptions import WebDriverException, TimeoutException
from urllib3.exceptions import MaxRetryError
from web_crawler.web_scraper import WebScraper
from web_crawler.node import Node
@@ -37,51 +37,54 @@ async def worker(

logger.info("Started worker.")

while True:
try:
node = await queue.get()

fetch_time = await scraper.navigate(node.url)
netloc_last_visited_at[node.netloc] = seconds_since_program_start()

node.text = scraper.extract_rendered_text()
node.title = scraper.driver.title

inverted_index.insert(node)

links = scraper.find_all_links()

if (node.depth + 1) <= max_depth:
for link in links:
link_node = Node(link, node.depth + 1)
# we prefer net locations we haven't seen before to avoid rate limiting
if netloc_last_visited_at.get(link_node.netloc) is not None:
# prioritize longer time since last visit; mult by -1 since PriorityQueue prioritizes lower numbers
link_node.priority = -1 * (
time.perf_counter()
- netloc_last_visited_at[link_node.netloc]
)
if link_node.url in visited:
continue
visited.add(link_node.url)
await queue.put(link_node)

logger.info(
f"index_bytes={sys.getsizeof(inverted_index._inverted_index)} "
f"queue_size={queue.qsize()} "
f"depth={node.depth} "
f"priority={node.priority} "
f"fetch_time={round(fetch_time, 4)}s "
f"url={node.url}"
)
queue.task_done()
if queue.empty():
break
except (WebDriverException, MaxRetryError) as e:
logger.error(f"Failed to fetch error {e} URL: {node.url}")
queue.task_done()
if queue.empty():
break
try:
while True:
try:
node = await queue.get()

fetch_time = await scraper.navigate(node.url)
netloc_last_visited_at[node.netloc] = seconds_since_program_start()

node.text = scraper.extract_rendered_text()
node.title = scraper.driver.title

inverted_index.insert(node)

links = scraper.find_all_links()

if (node.depth + 1) <= max_depth:
for link in links:
link_node = Node(link, node.depth + 1)
# we prefer net locations we haven't seen before to avoid rate limiting
if netloc_last_visited_at.get(link_node.netloc) is not None:
# prioritize longer time since last visit; mult by -1 since PriorityQueue prioritizes lower numbers
link_node.priority = -1 * (
time.perf_counter()
- netloc_last_visited_at[link_node.netloc]
)
if link_node.url in visited:
continue
visited.add(link_node.url)
await queue.put(link_node)

logger.info(
f"index_bytes={sys.getsizeof(inverted_index._inverted_index)} "
f"queue_size={queue.qsize()} "
f"depth={node.depth} "
f"priority={node.priority} "
f"fetch_time={round(fetch_time, 4)}s "
f"url={node.url}"
)
queue.task_done()
if queue.empty():
break
except (WebDriverException, MaxRetryError, TimeoutException) as e:
logger.error(f"Failed to fetch error {e} URL: {node.url}")
queue.task_done()
if queue.empty():
break
finally:
scraper.close()


async def main():
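For reference, a minimal sketch of the re-queuing priority rule used in the worker, using plain tuples with asyncio.PriorityQueue instead of the project's Node objects (their comparison behavior is assumed, not shown in this diff):

```python
import asyncio


async def demo() -> None:
    # asyncio.PriorityQueue serves the lowest value first, so the worker
    # stores -1 * (seconds since the netloc was last visited): the longer a
    # host has been idle, the more negative -- and therefore earlier -- its
    # entry is served.
    netloc_last_visited_at = {"a.example": 2.0, "b.example": 9.0}
    now = 10.0  # stand-in for time.perf_counter() at enqueue time

    queue: asyncio.PriorityQueue = asyncio.PriorityQueue()
    for netloc, visited_at in netloc_last_visited_at.items():
        await queue.put((-1 * (now - visited_at), netloc))

    while not queue.empty():
        print(await queue.get())
    # (-8.0, 'a.example')  <- idle longest, fetched first
    # (-1.0, 'b.example')


asyncio.run(demo())
```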