diff --git a/scripts/download_models.py b/scripts/download_models.py index a92c043c..5c38ddad 100644 --- a/scripts/download_models.py +++ b/scripts/download_models.py @@ -16,14 +16,28 @@ import os import sys import threading +import time +import urllib.error import urllib.request from concurrent.futures import Future, ThreadPoolExecutor, as_completed, wait +from dataclasses import dataclass +from email.utils import parsedate_to_datetime from pathlib import Path MODELS_DIR = Path("models") -CHUNKS_PER_FILE = 4 # Number of parallel connections per file download. -MAX_CONCURRENT_FILES = 2 # Max files downloading simultaneously. +CHUNKS_PER_FILE = 2 # Number of parallel connections per file download. +MAX_CONCURRENT_FILES = 1 # Max files downloading simultaneously. READ_CHUNK_SIZE = 1 << 23 # 8 MB — buffer size for network reads / disk writes. +MAX_HTTP_RETRIES = 3 +RATE_LIMIT_BASE_SLEEP = 30.0 +REQUEST_START_DELAY = 1.0 + + +@dataclass(frozen=True) +class RetryDelay: + seconds: float + detail: str + # (filename, url, sha256) MODELS = [ @@ -81,6 +95,76 @@ def sha256_file(path: Path) -> str: return h.hexdigest() +def _retry_after_delay(err: urllib.error.HTTPError, attempt: int) -> RetryDelay: + """Return the server-requested or exponential sleep time for HTTP 429.""" + value = err.headers.get("Retry-After") + if value: + try: + seconds = max(float(value), REQUEST_START_DELAY) + return RetryDelay(seconds, f"Retry-After={value!r} ({seconds:.0f}s)") + except ValueError: + try: + retry_at = parsedate_to_datetime(value) + server_now = time.time() + response_date = err.headers.get("Date") + if response_date: + try: + server_now = parsedate_to_datetime(response_date).timestamp() + except (TypeError, ValueError): + pass + seconds = max(retry_at.timestamp() - server_now, REQUEST_START_DELAY) + if response_date: + detail = ( + f"Retry-After date={value!r}, response Date={response_date!r} " + f"({seconds:.0f}s)" + ) + else: + detail = ( + f"Retry-After date={value!r}, no response Date ({seconds:.0f}s)" + ) + return RetryDelay(seconds, detail) + except (TypeError, ValueError): + pass + + seconds = RATE_LIMIT_BASE_SLEEP * (2**attempt) + if value: + detail = f"unparseable Retry-After={value!r}; using exponential backoff" + else: + detail = "no Retry-After header; using exponential backoff" + return RetryDelay(seconds, detail) + + +def _sleep_before_request() -> None: + if REQUEST_START_DELAY > 0: + time.sleep(REQUEST_START_DELAY) + + +def open_url_with_retries(req: urllib.request.Request, timeout: int, description: str): + """Open a URL, retrying HTTP 429 with backoff.""" + request_timeout = timeout + for attempt in range(MAX_HTTP_RETRIES + 1): + try: + _sleep_before_request() + return urllib.request.urlopen(req, timeout=request_timeout) + except urllib.error.HTTPError as err: + if err.code != 429 or attempt == MAX_HTTP_RETRIES: + raise + + retry_delay = _retry_after_delay(err, attempt) + sleep_for = retry_delay.seconds + request_timeout = max(timeout, int(sleep_for) + 1) + err.close() + print( + f"HTTP 429 while requesting {description}; " + f"{retry_delay.detail}; " + f"next timeout {request_timeout}s; " + f"sleeping {sleep_for:.0f}s before retry {attempt + 1}/" + f"{MAX_HTTP_RETRIES}...", + flush=True, + ) + time.sleep(sleep_for) + + def get_file_info(url: str) -> tuple[str, int] | None: """Follow redirects and get (final_url, file_size) via HEAD request. @@ -88,12 +172,15 @@ def get_file_info(url: str) -> tuple[str, int] | None: """ try: req = urllib.request.Request(url, method="HEAD") - with urllib.request.urlopen(req, timeout=30) as resp: + with open_url_with_retries(req, timeout=30, description="file info") as resp: final_url = resp.url # After redirects. accept_ranges = resp.headers.get("Accept-Ranges", "") length = resp.headers.get("Content-Length") if accept_ranges.lower() == "bytes" and length: return final_url, int(length) + except urllib.error.HTTPError as err: + if err.code == 429: + raise except Exception: pass return None @@ -119,13 +206,30 @@ def download_chunk( """Download a byte range and write to fd at the correct offset.""" req = urllib.request.Request(url) req.add_header("Range", f"bytes={start}-{end}") - with urllib.request.urlopen(req, timeout=300) as resp: + with open_url_with_retries( + req, timeout=300, description=f"bytes {start}-{end}" + ) as resp: offset = start while data := resp.read(READ_CHUNK_SIZE): _write_at(fd, data, offset, lock) offset += len(data) +def download_single_connection(url: str, dest: Path) -> None: + """Download a file using one HTTP connection with 429 retries.""" + try: + req = urllib.request.Request(url) + with ( + open_url_with_retries(req, timeout=300, description=dest.name) as resp, + open(dest, "wb") as out, + ): + while data := resp.read(READ_CHUNK_SIZE): + out.write(data) + except Exception: + dest.unlink(missing_ok=True) + raise + + def download_file( url: str, dest: Path, num_chunks: int, chunk_pool: ThreadPoolExecutor ) -> None: @@ -138,7 +242,7 @@ def download_file( # Fallback: single connection (server doesn't support Range, or tiny file). if file_info is None or file_info[1] < READ_CHUNK_SIZE * num_chunks: - urllib.request.urlretrieve(url, dest) + download_single_connection(url, dest) return final_url, file_size = file_info @@ -214,7 +318,9 @@ def main() -> int: # Two pools, no deadlock: # file_pool — runs download_one (submits to chunk_pool, waits) # chunk_pool — runs download_chunk (leaf tasks, never submits sub-tasks) - with ThreadPoolExecutor(max_workers=MAX_CONCURRENT_FILES * CHUNKS_PER_FILE) as chunk_pool: + with ThreadPoolExecutor( + max_workers=MAX_CONCURRENT_FILES * CHUNKS_PER_FILE + ) as chunk_pool: with ThreadPoolExecutor(max_workers=MAX_CONCURRENT_FILES) as file_pool: futures = { file_pool.submit(download_one, name, url, sha, chunk_pool): name @@ -230,9 +336,7 @@ def main() -> int: for f in futures: f.cancel() print( - f"\n{'=' * 50}\n" - f"ERROR: {e}\n" - f"{'=' * 50}", + f"\n{'=' * 50}\n" f"ERROR: {e}\n" f"{'=' * 50}", file=sys.stderr, ) chunk_pool.shutdown(wait=False, cancel_futures=True)