diff --git a/.gitignore b/.gitignore index 95a926b..723a583 100644 --- a/.gitignore +++ b/.gitignore @@ -31,3 +31,11 @@ utils/failed_ids.txt utils/summary.txt utils/node_modules utils/node_modules/* + +# Local development scratch files (not for upstream) +utils/*_backup.py +utils/*_backup.txt +utils/dry_run_*.py +utils/test_import.py +utils/processed_ids_*.txt +utils/failed_ids_*.txt diff --git a/utils/image_install_parallel.py b/utils/image_install_parallel.py index 003eced..c95c952 100644 --- a/utils/image_install_parallel.py +++ b/utils/image_install_parallel.py @@ -25,12 +25,15 @@ counter_lock = threading.Lock() host_error_counts = {} -HOST_ERROR_THRESHOLD = 50 +HOST_ERROR_THRESHOLD = 500 circuit_breaker_lock = threading.Lock() """ -Image install script to download images from a GBIF multimedia.txt file. -Accurate as of September Fall 2025. +Image install script to download images from a GBIF multimedia.txt file. +Updated to download ALL images per gbifID (not just the first successful one). +Each image is saved with a suffix: -00.jpg, -01.jpg, etc. +A gbifID is marked as processed only when ALL its images are successfully downloaded. +Accurate as of March 2026. 
""" CWD = os.getcwd() @@ -53,7 +56,6 @@ else: processed_ids = set() - today = dt.datetime.now().strftime("%Y-%m-%d_%H-%M-%S") os.makedirs(LOG_DIR, exist_ok=True) logging.basicConfig(filename=f'{LOG_DIR}/image_install_{today}.log', @@ -61,7 +63,6 @@ filemode='w') logger = logging.getLogger(__name__) - link_logger = logging.getLogger("link_logger") link_logger.setLevel(logging.INFO) @@ -69,22 +70,11 @@ GBIF_MULTIMEDIA_DATA = "/projectnb/herbdl/data/GBIF-F25/multimedia.txt" -existing_gbif_datasets = ["/projectnb/herbdl/data/harvard-herbaria/gbif/multimedia.txt", "/projectnb/herbdl/data/GBIF-F24/multimedia.txt"] -existing_gbif_dfs = [pd.read_csv(f, delimiter="\t", usecols=['gbifID']) for f in existing_gbif_datasets] - existing_gbif_ids = set() -for df in existing_gbif_dfs: - existing_gbif_ids.update(df['gbifID'].astype(str).tolist()) - -print(list(existing_gbif_ids)[:10]) - -print(f"Number of existing ids to check for duplicates: {len(existing_gbif_ids)}") - -n_installed = sum(len(files) for _, _, files in os.walk(INSTALL_PATH)) -print(f"Number of already installed images: {n_installed}") - +print("Duplicate check against old datasets disabled.") +n_installed = 0 user_agents = [ "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36", @@ -103,17 +93,19 @@ session.mount("http://", adapter) session.mount("https://", adapter) -min_delay = 15 -max_delay = 30 -def get_hierarchical_path(base_dir, gbif_id, ext=".jpg"): +def get_hierarchical_path(base_dir, gbif_id, suffix, ext=".jpg"): + """ + Build a hierarchical storage path to avoid too many files in one directory. + suffix: image index suffix e.g. '-00', '-01'. Pass '' for single-image compatibility. 
+ Example: gbifID=1057161997, suffix='-00' -> /105/716/-00.jpg + """ stem = str(gbif_id) prefix1 = stem[:3] if len(stem) >= 3 else stem prefix2 = stem[3:6] if len(stem) >= 6 else "000" - dest_dir = os.path.join(base_dir, prefix1, prefix2) os.makedirs(dest_dir, exist_ok=True) - return os.path.join(dest_dir, f"{stem}{ext}") + return os.path.join(dest_dir, f"{stem}{suffix}{ext}") def _host_from_url(url): @@ -133,10 +125,7 @@ def is_host_blocked(url): def is_host_circuit_broken(url): host = _host_from_url(url) with circuit_breaker_lock: - error_count = host_error_counts.get(host, 0) - if error_count >= HOST_ERROR_THRESHOLD: - return True - return False + return host_error_counts.get(host, 0) >= HOST_ERROR_THRESHOLD def increment_host_errors(url, is_rate_limit=False): if is_rate_limit: @@ -145,9 +134,8 @@ def increment_host_errors(url, is_rate_limit=False): with circuit_breaker_lock: host_error_counts[host] = host_error_counts.get(host, 0) + 1 current_count = host_error_counts[host] - if current_count == HOST_ERROR_THRESHOLD: - logger.error(f"CIRCUIT BREAKER ACTIVATED: Host '{host}' has reached {HOST_ERROR_THRESHOLD} errors. All future URLs from this host will be skipped.") + logger.error(f"CIRCUIT BREAKER ACTIVATED: Host '{host}' reached {HOST_ERROR_THRESHOLD} errors.") elif current_count < HOST_ERROR_THRESHOLD: logger.warning(f"Host '{host}' error count: {current_count}/{HOST_ERROR_THRESHOLD}") @@ -163,36 +151,34 @@ def block_host(url, retry_after=None, timeout_issue=False): from email.utils import parsedate_to_datetime dt_retry = parsedate_to_datetime(retry_after) seconds = max(0, (dt_retry - dt.datetime.now(dt.timezone.utc)).total_seconds()) - except Exception: seconds = HOST_COOLDOWN_DEFAULT with host_lock: host_block_until[host] = now + seconds reason = "timeout issues" if timeout_issue else "rate limiting" - logger.warning(f"Blocking host '{host}' due to {reason}. 
def is_duplicate(gbif_id):
    """Tell whether ``gbif_id`` is already known from an earlier dataset.

    The lookup is a string membership test against the module-level
    ``existing_gbif_ids`` set. That set is currently initialised empty (the
    duplicate check against old datasets is disabled), so this returns False
    unless something populates the set.
    """
    key = str(gbif_id)
    return key in existing_gbif_ids
- """ - - expanded_urls = [] - for url in candidate_urls: - if '/manifest' in url or url.endswith('.json'): - extracted = extract_image_from_iiif_manifest(url, gbif_id) - if extracted: - expanded_urls.extend(extracted) - else: - expanded_urls.append(url) - else: - expanded_urls.append(url) - random.shuffle(expanded_urls) - for image_url in expanded_urls: - if is_host_circuit_broken(image_url): - logger.info(f"Host circuit broken (>{HOST_ERROR_THRESHOLD} errors); skipping for {gbif_id}: {image_url}") - continue - if is_host_blocked(image_url): - continue - - try: - time.sleep(random.uniform(0.2, 0.8)) - image_response = session.get( - image_url, - stream=True, - verify=False, - headers={ - "User-Agent": random.choice(user_agents), - "Connection": "keep-alive", - "Referer": "https://scc-ondemand1.bu.edu/", - }, - timeout=180, - ) - - status = image_response.status_code - - if status == 429: - increment_host_errors(image_url, is_rate_limit=True) - block_host(image_url, image_response.headers.get("Retry-After")) - del image_response - continue - - if status >= 500: - increment_host_errors(image_url) - logger.error(f"Server error {status} for {gbif_id} from {image_url}; trying another source.") - del image_response - continue +def download_single_image(gbif_id, image_url, local_path): + """ + Attempt to download one image from image_url and save it to local_path. + Returns True on success, False on any failure. + Unlike the old download_image_from_candidates (which stopped at first success), + this function handles exactly one URL — the caller loops over all URLs. 
+ """ + if is_host_circuit_broken(image_url): + logger.info(f"Host circuit broken (>{HOST_ERROR_THRESHOLD} errors); skipping {gbif_id}: {image_url}") + return False + if is_host_blocked(image_url): + logger.info(f"Host under cooldown; skipping {gbif_id}: {image_url}") + return False - if status != 200: - increment_host_errors(image_url) - logger.error(f"HTTP {status} for {gbif_id} from {image_url}; trying another source.") - del image_response - continue + try: + time.sleep(random.uniform(0.2, 0.8)) + image_response = session.get( + image_url, + stream=True, + verify=False, + headers={ + "User-Agent": random.choice(user_agents), + "Connection": "keep-alive", + "Referer": "https://scc-ondemand1.bu.edu/", + }, + timeout=180, + ) + status = image_response.status_code - ctype = (image_response.headers.get("Content-Type") or "").lower() - if not ctype: - logger.warning(f"Missing Content-Type header for {gbif_id} from {image_url}, attempting download anyway.") - elif ctype and any(bad in ctype for bad in ["text/html", "text/plain", "application/xml"]): - increment_host_errors(image_url) - logger.error(f"Invalid content type for {gbif_id} from {image_url}: {ctype}. 
Skipping.") - del image_response - continue + if status == 429: + increment_host_errors(image_url, is_rate_limit=True) + block_host(image_url, image_response.headers.get("Retry-After")) + del image_response + return False - with open(local_path, "wb") as out_file: - shutil.copyfileobj(image_response.raw, out_file) + if status >= 500: + increment_host_errors(image_url) + logger.error(f"Server error {status} for {gbif_id} from {image_url}") + del image_response + return False - logger.info(f"Downloaded {gbif_id} to {local_path} from {image_url}") + if status != 200: + increment_host_errors(image_url) + logger.error(f"HTTP {status} for {gbif_id} from {image_url}") del image_response - return True + return False - except (ConnectTimeout, ReadTimeout, Timeout) as e: - logger.error(f"Timeout error for {gbif_id} from {image_url}: {e}") - block_host(image_url, timeout_issue=True) - continue - except Exception as e: + ctype = (image_response.headers.get("Content-Type") or "").lower() + if ctype and any(bad in ctype for bad in ["text/html", "text/plain", "application/xml"]): increment_host_errors(image_url) - logger.error(f"Error downloading {gbif_id} from {image_url}: {e}") - continue + logger.error(f"Invalid content type for {gbif_id} from {image_url}: {ctype}") + del image_response + return False - return False + with open(local_path, "wb") as out_file: + shutil.copyfileobj(image_response.raw, out_file) + + logger.info(f"Downloaded {gbif_id} suffix based on index -> {local_path}") + del image_response + return True + + except (ConnectTimeout, ReadTimeout, Timeout) as e: + logger.error(f"Timeout for {gbif_id} from {image_url}: {e}") + block_host(image_url, timeout_issue=True) + return False + except Exception as e: + increment_host_errors(image_url) + logger.error(f"Unexpected error for {gbif_id} from {image_url}: {e}") + return False def resize_image(gbif_id, local_path): @@ -311,92 +278,120 @@ def resize_image(gbif_id, local_path): if changed: logger.info(f"Resized 
def _record_gbif_id(path, id_set, gbif_id):
    """Durably append ``gbif_id`` to the file at ``path`` and add it to ``id_set``.

    Does nothing if the ID is already in ``id_set``. The caller must hold
    ``checkpoint_lock``.
    """
    if gbif_id in id_set:
        return
    with open(path, "a") as f:
        f.write(gbif_id + "\n")
        f.flush()
        os.fsync(f.fileno())
    id_set.add(gbif_id)


def _collect_image_slots(gbif_id, candidate_urls):
    """Turn the raw candidate URLs of one gbifID into per-image download slots.

    Returns ``(slots, manifests_ok)``. Each slot is a list of alternative URLs
    for ONE image, in preference order. ``manifests_ok`` is False when at
    least one IIIF manifest produced no image URL, i.e. the set of images is
    not known to be complete.
    """
    slots = {}  # slot key -> alternative URLs; dict keeps first-seen order
    manifests_ok = True
    for url in candidate_urls:
        # A missing identifier arrives from pandas as NaN (a float); skip it.
        if not isinstance(url, str) or not url:
            continue
        if '/manifest' in url or url.endswith('.json'):
            extracted = extract_image_from_iiif_manifest(url, gbif_id)
            if not extracted:
                # The manifest URL itself is never a download target, but an
                # unreadable manifest means images may be missing.
                manifests_ok = False
                continue
            for image_url in extracted:
                # IIIF Image API URLs look like <base>/<region>/<size>/...;
                # the manifest expansion emits several sizes for the same
                # <base>/full/, which are alternatives, not distinct images.
                alternatives = slots.setdefault(image_url.split("/full/", 1)[0], [])
                if image_url not in alternatives:
                    alternatives.append(image_url)
        else:
            slots.setdefault(url, [url])
    return list(slots.values()), manifests_ok


def _existing_image_ok(local_path):
    """Return True if ``local_path`` already holds a file of at least 0.01 MB."""
    if not os.path.exists(local_path):
        return False
    try:
        size = get_file_size_in_mb(local_path)
    except FileNotFoundError:
        size = 0.0
    if size >= 0.01:
        logger.info(f"Already exists and valid: {local_path}, skipping.")
        return True
    logger.warning(f"Existing file too small ({size:.4f} MB), re-downloading: {local_path}")
    return False


def _fetch_slot(gbif_id, idx, alternatives, local_path):
    """Try each alternative URL until one downloads and resizes cleanly."""
    for image_url in alternatives:
        if not download_single_image(gbif_id, image_url, local_path):
            logger.warning(f"Failed to download {gbif_id} index={idx} from {image_url}")
            continue
        try:
            resize_image(gbif_id, local_path)
        except (OSError, UnidentifiedImageError) as e:
            # The file is not a usable image; remove it so a later run does
            # not treat it as valid. Removal is best-effort.
            try:
                os.remove(local_path)
            except OSError:
                pass
            logger.error(f"Resize failed for {gbif_id} index={idx}: {e}. File removed.")
            continue
        return True
    return False


def process_id(gbif_id, candidate_urls):
    """Download every image associated with one gbifID.

    Each distinct image is stored as ``<gbifID>-00.jpg``, ``<gbifID>-01.jpg``
    and so on. URLs expanded from an IIIF manifest that differ only in
    requested size are treated as alternatives for the same image and tried
    in order until one succeeds. Files that already exist and are at least
    0.01 MB are kept and counted as done.

    The gbifID is appended to the processed checkpoint only if every image is
    present afterwards and every manifest could be expanded; otherwise it is
    appended to the failed list for a later retry. IDs already in
    ``processed_ids`` or flagged by ``is_duplicate`` are skipped.

    Args:
        gbif_id: Record identifier (any type; converted with ``str()``).
        candidate_urls: Iterable of identifier URLs for this record. These
            may be direct image URLs or IIIF manifest URLs; non-string
            entries are ignored.
    """
    global n_installed
    gbif_id = str(gbif_id)

    # Skip if already fully processed
    if gbif_id in processed_ids:
        return

    # Skip duplicates already present in earlier datasets
    if is_duplicate(gbif_id):
        logger.warning(f"Duplicate gbifID {gbif_id} found in earlier datasets; skipping.")
        return

    slots, manifests_ok = _collect_image_slots(gbif_id, candidate_urls)
    if not slots:
        logger.warning(f"No downloadable URLs found for {gbif_id}")
        with checkpoint_lock:
            _record_gbif_id(FAILED_FILE, failed_ids, gbif_id)
        return

    logger.info(f"Processing {gbif_id}: {len(slots)} unique image(s)")

    downloaded_count = 0
    for idx, alternatives in enumerate(slots):
        suffix = f"-{idx:02d}"  # e.g. -00, -01, -02
        local_path = get_hierarchical_path(INSTALL_PATH, gbif_id, suffix, ".jpg")

        if _existing_image_ok(local_path):
            downloaded_count += 1
            continue
        if not _fetch_slot(gbif_id, idx, alternatives, local_path):
            continue

        downloaded_count += 1
        with counter_lock:
            n_installed += 1
            current = n_installed
        # Notify outside the lock so a slow notification cannot stall the
        # other worker threads.
        if current % 50000 == 0:
            send_notification("Image Installation",
                              f"Installed {current} images. Remaining: {total_to_install - current}")
            logger.info(f"Installed {current} images total")

    all_success = manifests_ok and downloaded_count == len(slots)
    with checkpoint_lock:
        if all_success:
            _record_gbif_id(CHECKPOINT_FILE, processed_ids, gbif_id)
            # A retry that succeeded must no longer count as failed.
            failed_ids.discard(gbif_id)
            logger.info(f"All {downloaded_count} image(s) done for {gbif_id}. Marked as processed.")
        else:
            _record_gbif_id(FAILED_FILE, failed_ids, gbif_id)
            logger.warning(f"Partial failure for {gbif_id}: {downloaded_count}/{len(slots)} downloaded.")
Each ID will be removed + # from failed_ids only after it successfully downloads, in process_id(). + unique_ids = retry_ids total_to_install = len(unique_ids) print(f"Unique gbifIDs to process: {total_to_install}") - send_notification("Image Installation", f"Starting image installation for {total_to_install} unique images") + send_notification("Image Installation", f"Starting image installation for {total_to_install} unique gbifIDs") with ThreadPoolExecutor(max_workers=5) as executor: futures = [executor.submit(process_id, gbif_id, grouped.loc[gbif_id]) for gbif_id in unique_ids] @@ -435,10 +437,13 @@ def process_id(gbif_id, candidate_urls): logger.warning("Interrupted by user. Waiting for threads to finish...") executor.shutdown(wait=True, cancel_futures=True) finally: + with open(FAILED_FILE, "w") as f: + for fid in failed_ids: + f.write(str(fid) + "\n") + f.flush(); os.fsync(f.fileno()) logger.info(f"Final processed IDs: {len(processed_ids)}, failed: {len(failed_ids)}") logger.info(f"Circuit breaker status: {len([h for h, c in host_error_counts.items() if c >= HOST_ERROR_THRESHOLD])} hosts permanently blocked") for host, count in sorted(host_error_counts.items(), key=lambda x: x[1], reverse=True)[:10]: logger.info(f" {host}: {count} errors") - - print(f"All done. Number of installed images: {n_installed}") + print(f"All done. Number of installed images: {n_installed}") \ No newline at end of file