Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 113 additions & 9 deletions scripts/download_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,28 @@
import math
import os
import sys
import threading
import time
import urllib.error
import urllib.request
from concurrent.futures import Future, ThreadPoolExecutor, as_completed, wait
from dataclasses import dataclass
from email.utils import parsedate_to_datetime
from pathlib import Path

MODELS_DIR = Path("models")
CHUNKS_PER_FILE = 2  # Number of parallel connections per file download.
MAX_CONCURRENT_FILES = 1  # Max files downloading simultaneously.
READ_CHUNK_SIZE = 1 << 23  # 8 MB — buffer size for network reads / disk writes.
MAX_HTTP_RETRIES = 3  # Retries allowed after the first attempt on HTTP 429.
RATE_LIMIT_BASE_SLEEP = 30.0  # Seconds; backoff base, doubled on each attempt.
REQUEST_START_DELAY = 1.0  # Seconds slept before each request; also the minimum retry delay.


@dataclass(frozen=True)
class RetryDelay:
    """How long to wait before retrying a request, and why that wait was chosen."""

    # Number of seconds to sleep before the next attempt.
    seconds: float
    # Human-readable explanation of where the delay came from (used in log output).
    detail: str


# (filename, url, sha256)
MODELS = [
Expand Down Expand Up @@ -81,19 +95,92 @@ def sha256_file(path: Path) -> str:
return h.hexdigest()


def _http_date_timestamp(text: str) -> float | None:
    """Return the POSIX timestamp of an HTTP date string, or None if it does not parse."""
    try:
        return parsedate_to_datetime(text).timestamp()
    except (TypeError, ValueError):
        return None


def _retry_after_delay(err: urllib.error.HTTPError, attempt: int) -> RetryDelay:
    """Return the server-requested or exponential sleep time for HTTP 429.

    The ``Retry-After`` header is read first as a number of seconds and, if
    that fails, as an HTTP date. A date is measured against the response's own
    ``Date`` header when that parses, otherwise against the local clock. A
    server-requested delay is never shorter than ``REQUEST_START_DELAY``.

    When the header is missing, unparseable, or not a finite number, the delay
    is ``RATE_LIMIT_BASE_SLEEP * 2**attempt``.

    Args:
        err: The HTTP error whose response headers are inspected.
        attempt: Zero-based index of the attempt that just failed.

    Returns:
        The delay in seconds together with a note describing how it was chosen.
    """
    value = err.headers.get("Retry-After")
    if value:
        try:
            requested = float(value)
        except ValueError:
            requested = None  # Not a number; it may still be an HTTP date.

        if requested is not None:
            # float() also accepts "nan" and "inf". Neither the caller's int()
            # conversion nor time.sleep() can handle those, so treat them like
            # any other unusable header and fall through to the backoff.
            if math.isfinite(requested):
                seconds = max(requested, REQUEST_START_DELAY)
                return RetryDelay(seconds, f"Retry-After={value!r} ({seconds:.0f}s)")
        else:
            retry_at = _http_date_timestamp(value)
            if retry_at is not None:
                response_date = err.headers.get("Date")
                server_now = (
                    _http_date_timestamp(response_date) if response_date else None
                )
                if server_now is not None:
                    seconds = max(retry_at - server_now, REQUEST_START_DELAY)
                    detail = (
                        f"Retry-After date={value!r}, response Date={response_date!r} "
                        f"({seconds:.0f}s)"
                    )
                else:
                    # No usable server clock: measure against the local one.
                    seconds = max(retry_at - time.time(), REQUEST_START_DELAY)
                    if response_date:
                        detail = (
                            f"Retry-After date={value!r}, unparseable response "
                            f"Date={response_date!r}; using local clock "
                            f"({seconds:.0f}s)"
                        )
                    else:
                        detail = (
                            f"Retry-After date={value!r}, no response Date "
                            f"({seconds:.0f}s)"
                        )
                return RetryDelay(seconds, detail)

    seconds = RATE_LIMIT_BASE_SLEEP * (2**attempt)
    if value:
        detail = f"unparseable Retry-After={value!r}; using exponential backoff"
    else:
        detail = "no Retry-After header; using exponential backoff"
    return RetryDelay(seconds, detail)


def _sleep_before_request() -> None:
    """Pause for REQUEST_START_DELAY seconds, unless that delay is not positive."""
    if not REQUEST_START_DELAY > 0:
        return
    time.sleep(REQUEST_START_DELAY)


def open_url_with_retries(req: urllib.request.Request, timeout: int, description: str):
    """Open a URL, retrying HTTP 429 with backoff.

    Args:
        req: The prepared request to send.
        timeout: Socket timeout in seconds for the first attempt; later
            attempts use at least this value.
        description: Short label for the request, used in progress output.

    Returns:
        The response object returned by ``urllib.request.urlopen``.

    Raises:
        urllib.error.HTTPError: For any status other than 429, or for 429
            once ``MAX_HTTP_RETRIES`` retries have been used up.
    """
    current_timeout = timeout
    attempt = 0
    while attempt <= MAX_HTTP_RETRIES:
        try:
            _sleep_before_request()
            return urllib.request.urlopen(req, timeout=current_timeout)
        except urllib.error.HTTPError as err:
            retries_exhausted = attempt == MAX_HTTP_RETRIES
            if retries_exhausted or err.code != 429:
                raise

            delay = _retry_after_delay(err, attempt)
            pause = delay.seconds
            # The next attempt's timeout grows with the wait just requested.
            current_timeout = max(timeout, int(pause) + 1)
            err.close()

            notice = (
                f"HTTP 429 while requesting {description}; "
                f"{delay.detail}; "
                f"next timeout {current_timeout}s; "
                f"sleeping {pause:.0f}s before retry {attempt + 1}/"
                f"{MAX_HTTP_RETRIES}..."
            )
            print(notice, flush=True)
            time.sleep(pause)
        attempt += 1


def get_file_info(url: str) -> tuple[str, int] | None:
    """Follow redirects and get (final_url, file_size) via HEAD request.

    Returns None if Range is not supported, Content-Length is unknown, or the
    request fails for any reason other than HTTP 429.

    Raises:
        urllib.error.HTTPError: With code 429, when the server is still rate
            limiting after open_url_with_retries has given up.
    """
    try:
        req = urllib.request.Request(url, method="HEAD")
        with open_url_with_retries(req, timeout=30, description="file info") as resp:
            final_url = resp.url  # After redirects.
            accept_ranges = resp.headers.get("Accept-Ranges", "")
            length = resp.headers.get("Content-Length")
            if accept_ranges.lower() == "bytes" and length:
                return final_url, int(length)
    except urllib.error.HTTPError as err:
        # Rate limiting must reach the caller; other HTTP errors fall through
        # to the None return below.
        if err.code == 429:
            raise
    except Exception:
        # Deliberately best-effort: any other failure just means "no file info".
        pass
    return None
Expand All @@ -119,13 +206,30 @@ def download_chunk(
"""Download a byte range and write to fd at the correct offset."""
req = urllib.request.Request(url)
req.add_header("Range", f"bytes={start}-{end}")
with urllib.request.urlopen(req, timeout=300) as resp:
with open_url_with_retries(
req, timeout=300, description=f"bytes {start}-{end}"
) as resp:
offset = start
while data := resp.read(READ_CHUNK_SIZE):
_write_at(fd, data, offset, lock)
offset += len(data)


def download_single_connection(url: str, dest: Path) -> None:
    """Download a file using one HTTP connection with 429 retries.

    The response is streamed to ``dest`` in READ_CHUNK_SIZE pieces. If anything
    fails, ``dest`` is removed (when present) and the exception is re-raised.
    """
    try:
        request = urllib.request.Request(url)
        with open_url_with_retries(
            request, timeout=300, description=dest.name
        ) as resp:
            with open(dest, "wb") as out:
                while True:
                    data = resp.read(READ_CHUNK_SIZE)
                    if not data:
                        break
                    out.write(data)
    except Exception:
        # Do not leave a partial file behind.
        dest.unlink(missing_ok=True)
        raise


def download_file(
url: str, dest: Path, num_chunks: int, chunk_pool: ThreadPoolExecutor
) -> None:
Expand All @@ -138,7 +242,7 @@ def download_file(

# Fallback: single connection (server doesn't support Range, or tiny file).
if file_info is None or file_info[1] < READ_CHUNK_SIZE * num_chunks:
urllib.request.urlretrieve(url, dest)
download_single_connection(url, dest)
return

final_url, file_size = file_info
Expand Down Expand Up @@ -214,7 +318,9 @@ def main() -> int:
# Two pools, no deadlock:
# file_pool — runs download_one (submits to chunk_pool, waits)
# chunk_pool — runs download_chunk (leaf tasks, never submits sub-tasks)
with ThreadPoolExecutor(max_workers=MAX_CONCURRENT_FILES * CHUNKS_PER_FILE) as chunk_pool:
with ThreadPoolExecutor(
max_workers=MAX_CONCURRENT_FILES * CHUNKS_PER_FILE
) as chunk_pool:
with ThreadPoolExecutor(max_workers=MAX_CONCURRENT_FILES) as file_pool:
futures = {
file_pool.submit(download_one, name, url, sha, chunk_pool): name
Expand All @@ -230,9 +336,7 @@ def main() -> int:
for f in futures:
f.cancel()
print(
f"\n{'=' * 50}\n"
f"ERROR: {e}\n"
f"{'=' * 50}",
f"\n{'=' * 50}\n" f"ERROR: {e}\n" f"{'=' * 50}",
file=sys.stderr,
)
chunk_pool.shutdown(wait=False, cancel_futures=True)
Expand Down
Loading