diff --git a/.gitignore b/.gitignore index df862b5eb..111414245 100644 --- a/.gitignore +++ b/.gitignore @@ -57,6 +57,8 @@ coverage.xml .pytest_cache/ desktop-tool/export/ desktop-tool/tests/export/ +desktop-tool/tests/cards/ +desktop-tool/TODO.md cover/ @@ -147,6 +149,7 @@ dmypy.json cython_debug/ .idea +.cursor/ .DS_Store MPCAutofill/card_db.db MPCAutofill/build @@ -178,3 +181,9 @@ MPCAutofill/drives.csv frontend/public/mockServiceWorker.js /image-cdn/node_modules/* /image-cdn/.wrangler/* + +# Plan files +*.plan.md + +# Claude Code +claude.md diff --git a/.python-version b/.python-version index 24ee5b1be..2c45fe3a7 100644 --- a/.python-version +++ b/.python-version @@ -1 +1 @@ -3.13 +3.13.11 diff --git a/desktop-tool/assets/icc/Adobe-Color-Profile-EULA.pdf b/desktop-tool/assets/icc/Adobe-Color-Profile-EULA.pdf new file mode 100644 index 000000000..be14c29e7 Binary files /dev/null and b/desktop-tool/assets/icc/Adobe-Color-Profile-EULA.pdf differ diff --git a/desktop-tool/assets/icc/USWebCoatedSWOP.icc b/desktop-tool/assets/icc/USWebCoatedSWOP.icc new file mode 100644 index 000000000..078a6443a Binary files /dev/null and b/desktop-tool/assets/icc/USWebCoatedSWOP.icc differ diff --git a/desktop-tool/assets/placeholder_cover.png b/desktop-tool/assets/placeholder_cover.png new file mode 100644 index 000000000..72d708f45 Binary files /dev/null and b/desktop-tool/assets/placeholder_cover.png differ diff --git a/desktop-tool/autofill.py b/desktop-tool/autofill.py index bd82abc61..adbdfd73c 100644 --- a/desktop-tool/autofill.py +++ b/desktop-tool/autofill.py @@ -1,6 +1,8 @@ # nuitka-project: --mode=onefile # nuitka-project: --include-data-files=client_secrets.json=client_secrets.json # nuitka-project: --include-data-files=post-launch.html=post-launch.html +# nuitka-project: --include-data-files=assets/icc/USWebCoatedSWOP.icc=assets/icc/USWebCoatedSWOP.icc +# nuitka-project: --include-data-files=assets/icc/Adobe-Color-Profile-EULA.pdf=assets/icc/Adobe-Color-Profile-EULA.pdf # nuitka-project: --noinclude-pytest-mode=nofollow # nuitka-project: --windows-icon-from-ico=favicon.ico # nuitka-project-if: {OS} == "Windows": @@ -17,8 +19,11 @@ import logging import os +import shutil +import subprocess import sys from contextlib import nullcontext +from pathlib import Path from typing import Optional, Union import click @@ -31,7 +36,7 @@ from src.io import DEFAULT_WORKING_DIRECTORY, create_image_directory_if_not_exists from src.logging import configure_loggers, logger from src.order import CardOrder, aggregate_and_split_orders -from src.pdf_maker import PdfExporter +from src.pdf_maker import PdfExporter, PdfXConversionConfig, get_ghostscript_path, get_ghostscript_version from src.processing import ImagePostProcessingConfig from src.web_server import WebServer @@ -47,6 +52,94 @@ def prompt_if_no_arguments(prompt: str) -> Union[str, bool]: return f"{prompt} (Press Enter if you're not sure.)" if len(sys.argv) == 1 else False +def get_default_dtc_icc_profile() -> Optional[str]: + if "__compiled__" in globals(): + candidate = Path(sys.argv[0]).resolve().parent / "assets/icc/USWebCoatedSWOP.icc" + else: + candidate = Path(__file__).resolve().parent / "assets/icc/USWebCoatedSWOP.icc" + return str(candidate) if candidate.is_file() else None + + +def wait_for_user_to_complete_order() -> None: + input( + f"If this software has brought you joy and you'd like to throw a few bucks my way,\n" + f"you can find my tip jar here: {bold('https://www.buymeacoffee.com/chilli.axe')} Thank you!\n\n" + f"Press {bold('Enter')} to close this window - your browser window will remain open.\n" + ) + + +def ensure_ghostscript_available() -> str: + while True: + gs_path = get_ghostscript_path() + if gs_path: + version = get_ghostscript_version(gs_path) + if version: + logger.info(f"Ghostscript detected: {bold(version)} at {bold(gs_path)}") + else: + logger.info(f"Ghostscript detected at {bold(gs_path)}") + return gs_path + + logger.info( + "DriveThruCards export requires Ghostscript for PDF/X-1a compliance." + ) + + should_install = click.confirm("Install Ghostscript now?", default=True) + if should_install: + if sys.platform.startswith("darwin"): + if shutil.which("brew") is None: + logger.info("Homebrew not found. Please install Homebrew, then re-run.") + else: + logger.info("Installing Ghostscript via Homebrew...") + result = subprocess.run(["brew", "install", "ghostscript"], check=False) + if result.returncode != 0: + logger.warning("Ghostscript installation via Homebrew failed.") + elif sys.platform.startswith("win"): + if shutil.which("winget") is None: + logger.info( + "winget not found. Please install Ghostscript from " + "https://ghostscript.com/releases/gsdnld.html and ensure it's on PATH." + ) + else: + logger.info("Installing Ghostscript via winget...") + result = subprocess.run( + ["winget", "install", "--id", "ArtifexSoftware.Ghostscript", "--accept-source-agreements"], + check=False, + ) + if result.returncode != 0: + logger.warning("Ghostscript installation via winget failed.") + else: + if shutil.which("sudo") is None: + logger.info( + "sudo not found. Please install Ghostscript with your package manager manually." + ) + elif shutil.which("apt") is not None: + logger.info("Installing Ghostscript via apt...") + result = subprocess.run(["sudo", "apt", "install", "-y", "ghostscript"], check=False) + if result.returncode != 0: + logger.warning("Ghostscript installation via apt failed.") + elif shutil.which("dnf") is not None: + logger.info("Installing Ghostscript via dnf...") + result = subprocess.run(["sudo", "dnf", "install", "-y", "ghostscript"], check=False) + if result.returncode != 0: + logger.warning("Ghostscript installation via dnf failed.") + elif shutil.which("yum") is not None: + logger.info("Installing Ghostscript via yum...") + result = subprocess.run(["sudo", "yum", "install", "-y", "ghostscript"], check=False) + if result.returncode != 0: + logger.warning("Ghostscript installation via yum failed.") + else: + logger.info("No supported package manager found. Please install Ghostscript manually.") + else: + logger.info( + "Please install Ghostscript, then return here to continue.\n" + "macOS: brew install ghostscript\n" + "Windows: https://ghostscript.com/releases/gsdnld.html\n" + "Linux: use your package manager (e.g., apt install ghostscript)." + ) + + input("Press Enter to re-check for Ghostscript, or Ctrl+C to exit.") + + @click.command(context_settings={"show_default": True}) @click.option("-d", "--directory", default=None, help="The directory to search for order XML files.") @click.option( @@ -127,6 +220,11 @@ def prompt_if_no_arguments(prompt: str) -> Union[str, bool]: "\nhttps://pillow.readthedocs.io/en/latest/handbook/concepts.html#filters-comparison-table" ), ) +@click.option( + "--dtc-icc-profile", + default=None, + help="Optional ICC profile path to embed in DriveThruCards exports.", +) @click.option( "--combine-orders/--no-combine-orders", default=True, @@ -175,6 +273,7 @@ def main( image_post_processing: bool, max_dpi: int, downscale_alg: str, + dtc_icc_profile: Optional[str], combine_orders: bool, log_level: str, write_debug_logs: bool, @@ -208,17 +307,67 @@ def main( logger.info("System sleep is being prevented during this execution.") if image_post_processing: logger.info("Images are being post-processed during this execution.") + target_site = TargetSites[site] post_processing_config = ( ImagePostProcessingConfig(max_dpi=max_dpi, downscale_alg=ImageResizeMethods[downscale_alg]) if image_post_processing else None ) - if exportpdf: + if target_site == TargetSites.DriveThruCards: + ensure_ghostscript_available() + using_default_icc = dtc_icc_profile is None + resolved_icc_profile = dtc_icc_profile or get_default_dtc_icc_profile() + if resolved_icc_profile is None: + raise Exception( + "Default DriveThruCards ICC profile was not found. " + "Ensure assets/icc/USWebCoatedSWOP.icc is available or pass --dtc-icc-profile." + ) + if resolved_icc_profile and not os.path.isfile(resolved_icc_profile): + raise Exception( + f"DriveThruCards ICC profile path does not exist or is not a file: {bold(resolved_icc_profile)}" + ) + icc_source = "bundled default" if using_default_icc else "user-provided" + logger.info(f"DriveThruCards ICC profile ({icc_source}): {bold(resolved_icc_profile)}") + # Keep images as RGB JPEG during PDF generation - fpdf can embed these directly + # with DCT compression. Ghostscript will handle CMYK conversion with the ICC + # profile during PDF/X-1a conversion, which produces better results than + # pre-converting to CMYK (which fpdf re-encodes with FlateDecode, bloating file size). + dtc_post_processing_config = ImagePostProcessingConfig( + max_dpi=300, + downscale_alg=ImageResizeMethods[downscale_alg], + output_format="JPEG", + convert_to_cmyk=False, # Let Ghostscript handle CMYK conversion + ) + orders = CardOrder.from_xmls_in_folder(working_directory=working_directory) + for i, order in enumerate(orders, start=1): + exporter = PdfExporter( + order=order, + export_mode="drive_thru_cards", + pdfx_config=PdfXConversionConfig(icc_profile_path=resolved_icc_profile), + ) + pdf_paths = exporter.execute(post_processing_config=dtc_post_processing_config) + # Only use the Ghostscript PDF/X-1a output - no fallback + dtc_pdf_path = next((path for path in reversed(pdf_paths) if path.endswith("_pdfx.pdf")), None) + if dtc_pdf_path is None: + logger.error( + "Ghostscript PDF/X-1a conversion failed. Cannot proceed with DriveThruCards upload.\n" + "Please fix the Ghostscript conversion issue and try again." + ) + continue + AutofillDriver( + browser=Browsers[browser], + target_site=target_site, + binary_location=binary_location, + starting_url=target_site.value.starting_url, + ).execute_drive_thru_cards_order(order=order, pdf_path=dtc_pdf_path) + if i < len(orders): + input(f"Press {bold('Enter')} to continue with the next DriveThruCards order.\n") + wait_for_user_to_complete_order() + elif exportpdf: PdfExporter(order=CardOrder.from_xmls_in_folder(working_directory=working_directory)[0]).execute( post_processing_config=post_processing_config ) else: - target_site = TargetSites[site] card_orders = aggregate_and_split_orders( orders=CardOrder.from_xmls_in_folder(working_directory=working_directory), target_site=target_site, @@ -235,11 +384,7 @@ def main( auto_save_threshold=auto_save_threshold if auto_save else None, post_processing_config=post_processing_config, ) - input( - f"If this software has brought you joy and you'd like to throw a few bucks my way,\n" - f"you can find my tip jar here: {bold('https://www.buymeacoffee.com/chilli.axe')} Thank you!\n\n" - f"Press {bold('Enter')} to close this window - your browser window will remain open.\n" - ) + wait_for_user_to_complete_order() except ValidationException as e: input(f"There was a problem with your order file:\n\n{bold(e)}\n\nPress Enter to exit.") sys.exit(0) diff --git a/desktop-tool/requirements.txt b/desktop-tool/requirements.txt index ab6075257..a5b08a836 100644 --- a/desktop-tool/requirements.txt +++ b/desktop-tool/requirements.txt @@ -19,4 +19,6 @@ ratelimit~=2.2.1 requests~=2.32.5 sanitize-filename~=1.2.0 selenium~=4.35 +setuptools # Required for undetected-chromedriver on Python 3.13+ (distutils removed) +undetected-chromedriver~=3.5.5 wakepy==0.6.0 diff --git a/desktop-tool/src/constants.py b/desktop-tool/src/constants.py index 663212770..c50007b7b 100644 --- a/desktop-tool/src/constants.py +++ b/desktop-tool/src/constants.py @@ -152,6 +152,34 @@ def accept_settings_url(self) -> str: return self.format_url(self.accept_settings_url_route) +@attr.s +class DriveThruCardsSelectors: + product_url: str = attr.ib() + pdf_upload_input_selector: str = attr.ib(default="input[type='file']") + pdf_upload_input_index: int = attr.ib(default=0) + quantity_selector: str = attr.ib(default="") + add_to_cart_selector: str = attr.ib(default="") + continue_selector: str = attr.ib(default="") + # Login selectors - two step process: click login button, then click "Go to Log in" link + login_button_selector: str = attr.ib(default="button[data-cy='login']") + # Target the login link in the modal, not the logout link (both can have href='/en/') + go_to_login_selector: str = attr.ib(default=".modal a[href='/en/'], .modal-content a[href='/en/']") + # Publisher Tools link only appears when logged in as a publisher + logged_in_indicator_selector: str = attr.ib(default="a[href*='pub_tools.php']") + + +@attr.s +class DriveThruCardsSite: + base_url: str = attr.ib(default="https://www.drivethrucards.com") + selectors: DriveThruCardsSelectors = attr.ib( + default=attr.Factory(lambda: DriveThruCardsSelectors(product_url="https://www.drivethrucards.com")) + ) + + @property + def starting_url(self) -> str: + return self.selectors.product_url + + class TargetSites(Enum): MakePlayingCards = TargetSite( base_url="https://www.makeplayingcards.com", starting_url_route="design/custom-blank-card.html" @@ -214,6 +242,12 @@ class TargetSites(Enum): Cardstocks.P10: "Plastique (100%)", }, ) + DriveThruCards = DriveThruCardsSite( + selectors=DriveThruCardsSelectors( + product_url="https://www.drivethrucards.com", + pdf_upload_input_selector="input[type='file']", + ) + ) DPI_HEIGHT_RATIO = 300 / 1110 # TODO: share this between desktop tool and backend diff --git a/desktop-tool/src/driver.py b/desktop-tool/src/driver.py index f29e26e03..35c6d4083 100644 --- a/desktop-tool/src/driver.py +++ b/desktop-tool/src/driver.py @@ -1,4 +1,5 @@ import datetime as dt +import re import textwrap import time from concurrent.futures import ThreadPoolExecutor @@ -11,9 +12,13 @@ from InquirerPy import inquirer from selenium.common import exceptions as sl_exc from selenium.common.exceptions import NoAlertPresentException, NoSuchElementException +from selenium.webdriver.common.action_chains import ActionChains from selenium.webdriver.common.by import By +from selenium.webdriver.common.keys import Keys from selenium.webdriver.remote.webdriver import WebDriver from selenium.webdriver.support.expected_conditions import ( + element_to_be_clickable, + presence_of_element_located, invisibility_of_element, text_to_be_present_in_element, visibility_of_element_located, @@ -202,6 +207,11 @@ def wait(self) -> bool: logger.debug(e) return False + def wait_for_selector(self, selector: str, timeout_seconds: int = 30) -> None: + WebDriverWait(self.driver, timeout_seconds, poll_frequency=0.2).until( + presence_of_element_located((By.CSS_SELECTOR, selector)) + ) + def set_state(self, state: str, action: Optional[str] = None) -> None: self.state = state self.action = action @@ -323,6 +333,739 @@ def handle_alert(self) -> None: # endregion + # region DriveThruCards + + def _try_click_turnstile_checkbox(self) -> bool: + """ + Attempt to find and click the Cloudflare Turnstile checkbox. + Returns True if checkbox was found and clicked, False otherwise. + """ + try: + # Disable implicit wait for fast polling + self.driver.implicitly_wait(0) + try: + # Turnstile is rendered in an iframe - find it by common attributes + iframe_selectors = [ + "iframe[src*='challenges.cloudflare.com']", + "iframe[title*='cloudflare']", + "iframe[title*='Cloudflare']", + "iframe[id*='cf-']", + ] + + iframe = None + for selector in iframe_selectors: + iframes = self.driver.find_elements(By.CSS_SELECTOR, selector) + if iframes: + iframe = iframes[0] + break + + if not iframe: + return False + + # Switch to iframe context + self.driver.switch_to.frame(iframe) + + try: + # Look for the checkbox input or clickable verification element + checkbox_selectors = [ + "input[type='checkbox']", + ".ctp-checkbox-label", + "#challenge-stage", + "[data-testid='challenge-input']", + ] + + for selector in checkbox_selectors: + elements = self.driver.find_elements(By.CSS_SELECTOR, selector) + for element in elements: + if element.is_displayed(): + element.click() + logger.debug("Clicked Turnstile checkbox") + return True + finally: + # Always switch back to main content + self.driver.switch_to.default_content() + + return False + finally: + self.driver.implicitly_wait(5) + except Exception as e: + logger.debug(f"Error clicking Turnstile checkbox: {e}") + # Ensure we're back in main content even on error + try: + self.driver.switch_to.default_content() + except Exception: + pass + return False + + def _is_cloudflare_challenge_active(self) -> bool: + """Check if a Cloudflare challenge page is currently displayed.""" + try: + title = self.driver.title.lower() + if "just a moment" in title: + return True + # Temporarily disable implicit wait for fast polling + self.driver.implicitly_wait(0) + try: + # Also check for challenge body text + body_text = self.driver.find_element(By.TAG_NAME, "body").text.lower() + if "verifying you are human" in body_text or "checking your browser" in body_text: + return True + return False + finally: + self.driver.implicitly_wait(5) + except Exception: + return False + + def _is_site_loaded(self) -> bool: + """Check if the actual site content has loaded (past Cloudflare).""" + selectors = self.target_site.value.selectors + try: + title = self.driver.title.lower() + if "just a moment" in title: + return False + # Temporarily disable implicit wait for fast polling + self.driver.implicitly_wait(0) + try: + # Check for login button or logged-in indicator + login_btns = self.driver.find_elements(By.CSS_SELECTOR, selectors.login_button_selector) + logged_in = self.driver.find_elements(By.CSS_SELECTOR, selectors.logged_in_indicator_selector) + return bool(login_btns or logged_in) + finally: + # Restore implicit wait + self.driver.implicitly_wait(5) + except Exception: + return False + + def wait_for_cloudflare_challenge(self, timeout_seconds: int = 300) -> None: + """ + Wait for the Cloudflare challenge to be completed by waiting for site content to appear. + Uses aggressive polling and attempts to auto-click the Turnstile checkbox. + Waits for either the login button or Publisher Tools link (if already logged in). + """ + self.set_state(States.defining_order, "Waiting for site to load") + logger.info("Waiting for DriveThruCards to load...") + + poll_interval = 0.5 # Check every 500ms for responsive detection + turnstile_click_interval = 3.0 # Try clicking Turnstile every 3 seconds + last_turnstile_attempt = 0.0 + challenge_detected = False + start_time = time.time() + + while time.time() - start_time < timeout_seconds: + # Check if site has loaded successfully + if self._is_site_loaded(): + logger.info("Site loaded successfully!") + return + + # Check if we're on a Cloudflare challenge + if self._is_cloudflare_challenge_active(): + if not challenge_detected: + challenge_detected = True + logger.info( + "Cloudflare challenge detected. Attempting auto-solve...\n" + "If this doesn't work, please complete the captcha manually." + ) + + # Periodically try to click the Turnstile checkbox + current_time = time.time() + if current_time - last_turnstile_attempt >= turnstile_click_interval: + last_turnstile_attempt = current_time + if self._try_click_turnstile_checkbox(): + logger.debug("Turnstile click attempted, waiting for verification...") + + time.sleep(poll_interval) + + # Timeout reached + logger.warning( + f"Timeout after {timeout_seconds}s waiting for site to load. " + "Attempting to continue anyway." + ) + + def is_dtc_user_authenticated(self) -> bool: + """Check if the user is logged in to DriveThruCards.""" + selectors = self.target_site.value.selectors + try: + # Temporarily disable implicit wait for fast check + self.driver.implicitly_wait(0) + try: + # Look for logout button as indicator of being logged in + # This needs to be specific - only match elements that appear when logged in + logged_in_elements = self.driver.find_elements( + By.CSS_SELECTOR, selectors.logged_in_indicator_selector + ) + # Filter to only visible elements + visible_elements = [el for el in logged_in_elements if el.is_displayed()] + if visible_elements: + logger.debug(f"Found {len(visible_elements)} visible logged-in indicator(s)") + for el in visible_elements[:3]: + logger.debug(f" Element: tag={el.tag_name}, text='{el.text}', aria-label='{el.get_attribute('aria-label')}'") + return len(visible_elements) > 0 + finally: + self.driver.implicitly_wait(5) + except Exception as e: + logger.debug(f"Error checking auth status: {e}") + return False + + def click_element_with_retry(self, element: Any) -> bool: + """ + Attempt to click an element using multiple strategies. + Returns True if click succeeded, False otherwise. + """ + # Strategy 1: Scroll into view and use native click + try: + self.driver.execute_script("arguments[0].scrollIntoView({block: 'center'});", element) + element.click() + return True + except Exception as e: + logger.debug(f"Native click failed: {e}") + + # Strategy 2: JavaScript click (bypasses overlays and visibility issues) + try: + self.driver.execute_script("arguments[0].click();", element) + return True + except Exception as e: + logger.debug(f"JavaScript click failed: {e}") + + return False + + def click_element_polling(self, by: By, selector: str, timeout: int = 30) -> bool: + """ + Aggressively poll for an element and click it as soon as it's available. + No fixed waits - keeps trying until success or timeout. + """ + start = time.time() + # Disable implicit wait for true aggressive polling + self.driver.implicitly_wait(0) + try: + while time.time() - start < timeout: + try: + elements = self.driver.find_elements(by, selector) + for el in elements: + if el.is_displayed() and self.click_element_with_retry(el): + return True + except Exception: + pass + time.sleep(0.1) # Small delay to avoid CPU spinning + return False + finally: + self.driver.implicitly_wait(5) + + def _click_dtc_login_button(self) -> bool: + """Click the DriveThruCards login button to open the login modal.""" + selectors = self.target_site.value.selectors + return self.click_element_polling(By.CSS_SELECTOR, selectors.login_button_selector, timeout=15) + + def authenticate_dtc(self) -> None: + """ + Handle DriveThruCards login flow. + """ + selectors = self.target_site.value.selectors + + if self.is_dtc_user_authenticated(): + logger.info("Already logged in to DriveThruCards.") + return + + self.set_state(States.defining_order, "Awaiting DriveThruCards login") + + logger.info("Please log in to your DriveThruCards account.") + + # Now attempt actual login flow + if not self._click_dtc_login_button(): + logger.info( + "Could not find or click login button automatically.\n" + "Please click the login button manually." + ) + + # Click "Go to Log in" link - poll aggressively + go_to_login_clicked = self.click_element_polling( + By.XPATH, "//a[contains(normalize-space(), 'Go to Log in')]", timeout=15 + ) + if not go_to_login_clicked: + # Fallback to CSS selector + go_to_login_clicked = self.click_element_polling( + By.CSS_SELECTOR, selectors.go_to_login_selector, timeout=5 + ) + + if go_to_login_clicked: + logger.info("Navigated to login page.") + else: + logger.info( + "Could not find 'Go to Log in' link automatically.\n" + "Please navigate to the login page manually if needed." + ) + + logger.info( + "Please complete the login process in the browser window.\n" + "The tool will automatically continue once you're logged in." + ) + + # Wait for user to complete login (timeout after 5 minutes) + timeout_seconds = 300 + start_time = time.time() + while time.time() - start_time < timeout_seconds: + time.sleep(1) + if self.is_dtc_user_authenticated(): + logger.info("Successfully logged in to DriveThruCards!") + return + + logger.warning( + f"Login timeout after {timeout_seconds}s. " + "Please ensure you're logged in before continuing." + ) + + def navigate_to_dtc_product_setup(self) -> None: + """ + Navigate through DriveThruCards to the product setup page. + Steps: Publisher Tools -> Set up a new title + """ + self.set_state(States.defining_order, "Navigating to Publisher Tools") + selectors = self.target_site.value.selectors + + # Step 1: Click "Publisher Tools" link (use same selector as login detection) + try: + publisher_tools_link = WebDriverWait(self.driver, 5).until( + element_to_be_clickable((By.CSS_SELECTOR, selectors.logged_in_indicator_selector)) + ) + logger.info("Found 'Publisher Tools' link, clicking...") + if self.click_element_with_retry(publisher_tools_link): + logger.info("Successfully clicked 'Publisher Tools'.") + else: + logger.warning("Could not click 'Publisher Tools' automatically. Please click it manually.") + except sl_exc.TimeoutException: + logger.warning("Could not find 'Publisher Tools' link. Trying direct navigation...") + self.driver.get("https://site.drivethrucards.com/pub_tools.php") + + # Step 2: Wait for and click "Set up a new title" link + self.set_state(States.defining_order, "Navigating to product setup") + try: + setup_link = WebDriverWait(self.driver, 10).until( + element_to_be_clickable((By.XPATH, "//a[contains(@href, 'pub_enter_product.php')]")) + ) + logger.info("Found 'Set up a new title' link, clicking...") + if self.click_element_with_retry(setup_link): + logger.info("Successfully clicked 'Set up a new title'.") + else: + logger.warning("Could not click 'Set up a new title' automatically. Please click it manually.") + except sl_exc.TimeoutException: + logger.warning("Could not find 'Set up a new title' link. Trying direct navigation...") + self.driver.get("https://tools.drivethrucards.com/pub_enter_product.php") + + def fill_dtc_product_form(self, order: CardOrder) -> None: + """ + Fill out the DriveThruCards product setup form (first page). + """ + import os + + self.set_state(States.defining_order, "Filling product form") + + # Get the placeholder cover image path (bundled with assets) + assets_dir = os.path.join(os.path.dirname(os.path.dirname(__file__)), "assets") + placeholder_cover_path = os.path.join(assets_dir, "placeholder_cover.png") + + # Generate title: order name + today's date + today = dt.date.today().strftime("%Y-%m-%d") + title = f"{order.name or 'Order'} {today}" + + # Fill in the title field + try: + title_input = WebDriverWait(self.driver, 10).until( + presence_of_element_located((By.ID, "products_name")) + ) + title_input.clear() + title_input.send_keys(title) + logger.info(f"Set product title to: {title}") + except Exception as e: + logger.warning(f"Could not fill title field: {e}") + + # Fill in the special price field with "0" + try: + price_input = self.driver.find_element(By.ID, "options_values_total_price") + price_input.clear() + price_input.send_keys("0") + logger.info("Set special price to: 0") + except Exception as e: + logger.warning(f"Could not fill price field: {e}") + + # Upload the placeholder cover image + try: + image_input = self.driver.find_element(By.NAME, "products_image") + image_input.send_keys(placeholder_cover_path) + logger.info(f"Uploaded placeholder cover image: {placeholder_cover_path}") + except Exception as e: + logger.warning(f"Could not upload cover image: {e}") + + # Check the two filter checkboxes + try: + checkbox1 = self.driver.find_element(By.ID, "filter_44550") + if not checkbox1.is_selected(): + self.click_element_with_retry(checkbox1) + logger.debug("Checked filter_44550") + except Exception as e: + logger.warning(f"Could not check filter_44550: {e}") + + try: + checkbox2 = self.driver.find_element(By.ID, "filter_1000138") + if not checkbox2.is_selected(): + self.click_element_with_retry(checkbox2) + logger.debug("Checked filter_1000138") + except Exception as e: + logger.warning(f"Could not check filter_1000138: {e}") + + # Click the first submit button (Save Title Data and Continue to Preview Description) + self.set_state(States.defining_order, "Submitting product form") + try: + submit_button = WebDriverWait(self.driver, 10).until( + element_to_be_clickable((By.ID, "submit_id")) + ) + logger.info("Clicking submit button...") + self.click_element_with_retry(submit_button) + logger.info("Product form submitted successfully.") + except Exception as e: + logger.warning(f"Could not click submit button: {e}") + + def submit_dtc_description_page(self) -> None: + """ + Click 'Save and Continue' on the description preview page. + Waits for the button to be available (page loaded from previous step). + """ + self.set_state(States.defining_order, "Saving description") + try: + save_continue_button = WebDriverWait(self.driver, 15).until( + element_to_be_clickable((By.ID, "clicked_element")) + ) + logger.info("Clicking 'Save and Continue' button...") + self.click_element_with_retry(save_continue_button) + logger.info("Description page submitted.") + except Exception as e: + logger.warning(f"Could not click 'Save and Continue': {e}") + + def open_dtc_upload_page(self) -> None: + """ + Navigate to the upload page by extracting the URL from the 'Upload print-ready file' + button's onclick attribute, rather than letting it open a new tab via window.open(). + """ + self.set_state(States.defining_order, "Opening upload page") + + try: + upload_button = WebDriverWait(self.driver, 15).until( + element_to_be_clickable((By.XPATH, "//button[contains(@onclick, 'pub_upload_podcard_files.php')]")) + ) + onclick = upload_button.get_attribute("onclick") or "" + # Extract URL from onclick like: window.open('https://...pub_upload_podcard_files.php?products_id=123'); + url_match = re.search(r"window\.open\(['\"]([^'\"]+)['\"]\)", onclick) + if url_match: + upload_url = url_match.group(1) + self.driver.execute_script("window.location.href = arguments[0];", upload_url) + # Wait for the upload page to load + WebDriverWait(self.driver, 15).until( + presence_of_element_located((By.ID, "card_type_select")) + ) + logger.info(f"Navigated to upload page: {self.driver.current_url}") + else: + logger.warning(f"Could not extract URL from upload button onclick: {onclick}") + except Exception as e: + logger.warning(f"Could not open upload page: {e}") + + def select_card_type_and_upload_pdf(self, pdf_path: str) -> None: + """ + Select 'Premium Euro Poker Card(s)' from dropdown and upload the PDF. + Waits for elements to be available instead of using fixed sleeps. + """ + import os + + self.set_state(States.inserting_fronts, "Selecting card type") + + # Wait for and select the Euro Poker card option from the dropdown + try: + # Wait for dropdown to be present and interactable + WebDriverWait(self.driver, 15).until( + presence_of_element_located((By.ID, "card_type_select")) + ) + # Re-fetch the element to avoid stale reference after page/tab switch + card_type_dropdown = self.driver.find_element(By.ID, "card_type_select") + select = Select(card_type_dropdown) + + # Find option containing "Euro Poker" (case-insensitive search) + euro_poker_option_text = None + for option in select.options: + if "euro poker" in option.text.lower(): + euro_poker_option_text = option.text + break + + if euro_poker_option_text: + select.select_by_visible_text(euro_poker_option_text) + logger.info(f"Selected '{euro_poker_option_text}' from dropdown.") + else: + logger.warning("Could not find Euro Poker option in dropdown.") + except Exception as e: + logger.warning(f"Could not select card type: {e}") + + # Convert PDF path to absolute if needed + if not os.path.isabs(pdf_path): + pdf_path = os.path.abspath(pdf_path) + + # Verify the file exists + if not os.path.exists(pdf_path): + logger.error(f"PDF file not found: {pdf_path}") + return + + logger.info(f"PDF file found: {pdf_path} ({os.path.getsize(pdf_path)} bytes)") + + # Wait for the dropzone to appear after card type selection + self.set_state(States.inserting_fronts, "Uploading PDF") + try: + # Wait for the dropzone div to be present + dropzone_div = WebDriverWait(self.driver, 15).until( + presence_of_element_located((By.ID, "uploadfiles")) + ) + logger.debug("Dropzone div found.") + + # Click the dropzone to initialize Dropzone's hidden input + # This should create the .dz-hidden-input element + logger.debug("Clicking dropzone to initialize hidden input...") + self.driver.execute_script("arguments[0].click();", dropzone_div) + + # Brief wait for the file dialog to appear, then send Escape to close it + time.sleep(0.5) + ActionChains(self.driver).send_keys(Keys.ESCAPE).perform() + time.sleep(0.5) + + # Find the file input and send the file - do this in a single operation + # to avoid stale element references + logger.info(f"Uploading PDF: {pdf_path}") + + def find_and_use_file_input() -> bool: + """Find a usable file input and send the file path to it.""" + # Strategy 1: Dropzone hidden input + try: + fi = self.driver.find_element(By.CSS_SELECTOR, ".dz-hidden-input") + logger.debug("Found Dropzone hidden input, sending file...") + fi.send_keys(pdf_path) + return True + except (sl_exc.NoSuchElementException, sl_exc.StaleElementReferenceException): + pass + + # Strategy 2: Any file input that's not the fallback + try: + file_inputs = self.driver.find_elements(By.CSS_SELECTOR, "input[type='file']") + logger.debug(f"Found {len(file_inputs)} file input(s) on page.") + for fi in file_inputs: + try: + name = fi.get_attribute("name") + if name == "groups_csv": + continue + logger.debug(f"Trying file input: name={name}") + fi.send_keys(pdf_path) + return True + except sl_exc.StaleElementReferenceException: + continue + except Exception as e: + logger.debug(f"Error with file inputs: {e}") + + # Strategy 3: Use the fallback input + try: + logger.debug("Using fallback file input...") + self.driver.execute_script( + "document.getElementById('dropzoneFallback').style.display = 'block';" + ) + fi = self.driver.find_element(By.CSS_SELECTOR, "#dropzoneFallback input[type='file']") + fi.send_keys(pdf_path) + return True + except Exception as e: + logger.debug(f"Fallback input failed: {e}") + + return False + + # Try up to 3 times to handle any remaining stale element issues + file_sent = False + for attempt in range(3): + if find_and_use_file_input(): + file_sent = True + break + logger.debug(f"Attempt {attempt + 1} failed, retrying...") + time.sleep(0.5) + + if not file_sent: + logger.warning("Could not send file to any input element.") + return + + # Trigger change event on all file inputs (one of them has our file) + self.driver.execute_script(""" + document.querySelectorAll('input[type="file"]').forEach(function(input) { + input.dispatchEvent(new Event('change', { bubbles: true })); + }); + """) + logger.debug("Dispatched change event on file inputs.") + + # Wait for the upload button to become enabled (Dropzone enables it when files are queued) + try: + WebDriverWait(self.driver, 10).until( + lambda d: not d.find_element(By.ID, "dropzoneButton").get_attribute("disabled") + ) + logger.debug("Upload button is now enabled.") + except sl_exc.TimeoutException: + logger.debug("Button didn't become enabled automatically, forcing it.") + + # Click the upload button using JavaScript + upload_clicked = self.driver.execute_script(""" + var btn = document.getElementById('dropzoneButton'); + if (btn) { + btn.disabled = false; + btn.style.display = 'block'; + btn.click(); + return true; + } + return false; + """) + if upload_clicked: + logger.info("Clicked 'Begin Card File Upload' button via JavaScript.") + else: + logger.warning("Could not find upload button.") + + # Also try to trigger Dropzone's processQueue as a backup + try: + self.driver.execute_script(""" + var dz = Dropzone.forElement('#uploadfiles'); + if (dz && dz.files && dz.files.length > 0) { + dz.processQueue(); + } + """) + logger.debug("Triggered Dropzone processQueue.") + except Exception as e: + logger.debug(f"Could not trigger processQueue: {e}") + + # Wait for upload to complete - look for success message + try: + WebDriverWait(self.driver, 120).until( + lambda d: "successfully uploaded" in ( + d.find_element(By.ID, "status_messages").text.lower() + if d.find_elements(By.ID, "status_messages") else "" + ) + ) + logger.info("PDF upload completed to DriveThruCards.") + except sl_exc.TimeoutException: + logger.warning("Could not confirm upload completion. Please verify manually.") + + # Wait for the continue button to become active. + # The button starts with an onclick handler that shows an error and returns false. + # After upload validation, the page JS replaces this handler to enable navigation. + # We detect activation by checking that the onclick no longer contains "return false". + try: + def continue_button_is_active(d: Any) -> Any: + btn = d.find_element(By.ID, "continue_button") + onclick = btn.get_attribute("onclick") or "" + if "return false" in onclick: + return False + return btn + + continue_button = WebDriverWait(self.driver, 60).until(continue_button_is_active) + logger.debug(f"Continue button activated. onclick: {continue_button.get_attribute('onclick')}") + self.driver.execute_script("arguments[0].click();", continue_button) + logger.info("Clicked 'Click here after uploading your files' button.") + except sl_exc.TimeoutException: + logger.warning("Continue button did not activate. Please click it manually.") + + # Click the "Complete Setup" button on the next page + try: + complete_button = WebDriverWait(self.driver, 30).until( + element_to_be_clickable((By.ID, "submit_id")) + ) + self.driver.execute_script("arguments[0].click();", complete_button) + logger.info("Clicked 'Complete Setup' button.") + except sl_exc.TimeoutException: + logger.warning("Could not find 'Complete Setup' button. Please click it manually.") + + # Click the "buy now" link to start placing the order. + # This link has target="_blank", so navigate directly to avoid new-tab issues. + try: + buy_now_link = WebDriverWait(self.driver, 30).until( + element_to_be_clickable((By.CSS_SELECTOR, "a[href*='action=buy_now']")) + ) + buy_now_href = buy_now_link.get_attribute("href") + if buy_now_href: + self.driver.get(buy_now_href) + logger.info("Navigated to 'buy now' page to start placing the order.") + else: + self.driver.execute_script("arguments[0].click();", buy_now_link) + logger.info("Clicked 'buy now' link.") + except sl_exc.TimeoutException: + logger.warning("Could not find 'buy now' link. Please click it manually.") + except Exception as e: + logger.warning(f"Could not upload PDF: {e}") + + def execute_drive_thru_cards_order(self, order: CardOrder, pdf_path: str) -> None: + t = time.time() + selectors = self.target_site.value.selectors + self.set_state(States.defining_order, "Opening DriveThruCards") + self.driver.get(self.target_site.value.starting_url) + + # Handle Cloudflare challenge if present + self.wait_for_cloudflare_challenge() + + # Handle login + self.authenticate_dtc() + + # Navigate to product setup page + self.navigate_to_dtc_product_setup() + + # Fill out the product form (first page - title, price, cover image, checkboxes) + self.fill_dtc_product_form(order=order) + + # Submit the description preview page + self.submit_dtc_description_page() + + # Open the upload page (new tab) + self.open_dtc_upload_page() + + # Select card type and upload the PDF + self.select_card_type_and_upload_pdf(pdf_path=pdf_path) + + # DriveThruCards automation complete - user should finish checkout manually + log_hours_minutes_seconds_elapsed(t) + logger.info( + "DriveThruCards order setup complete!\n" + "You are now at the checkout page. Please review your order and complete the purchase manually." + ) + return + + if selectors.quantity_selector: + try: + self.wait_for_selector(selectors.quantity_selector) + quantity_input = self.driver.find_element(By.CSS_SELECTOR, selectors.quantity_selector) + quantity_input.clear() + quantity_input.send_keys(str(order.details.quantity)) + except sl_exc.WebDriverException as exc: + logger.warning(f"Failed to set DriveThruCards quantity automatically: {exc}") + + self.set_state(States.inserting_fronts, "Uploading PDF") + self.wait_for_selector(selectors.pdf_upload_input_selector) + upload_inputs = self.driver.find_elements(By.CSS_SELECTOR, selectors.pdf_upload_input_selector) + if len(upload_inputs) <= selectors.pdf_upload_input_index: + raise Exception( + f"DriveThruCards PDF upload input not found at index {selectors.pdf_upload_input_index} " + f"using selector {selectors.pdf_upload_input_selector}." + ) + upload_inputs[selectors.pdf_upload_input_index].send_keys(pdf_path) + logger.info("DriveThruCards PDF uploaded. Please confirm the preview and continue checkout.") + + if selectors.continue_selector: + try: + self.wait_for_selector(selectors.continue_selector) + self.driver.find_element(By.CSS_SELECTOR, selectors.continue_selector).click() + except sl_exc.WebDriverException: + logger.warning("DriveThruCards continue button could not be clicked automatically.") + + if selectors.add_to_cart_selector: + try: + self.wait_for_selector(selectors.add_to_cart_selector) + self.driver.find_element(By.CSS_SELECTOR, selectors.add_to_cart_selector).click() + except sl_exc.WebDriverException: + logger.warning("DriveThruCards add-to-cart button could not be clicked automatically.") + + log_hours_minutes_seconds_elapsed(t) + + # endregion + # region uploading @exception_retry_skip_handler diff --git a/desktop-tool/src/io.py b/desktop-tool/src/io.py index a081a0c74..35767d104 100644 --- a/desktop-tool/src/io.py +++ b/desktop-tool/src/io.py @@ -14,7 +14,12 @@ import src.constants as constants from src.logging import logger -from src.processing import ImagePostProcessingConfig, post_process_image +from src.processing import ( + ImagePostProcessingConfig, + get_post_processed_path, + post_process_image, + save_processed_image, +) thread_local = threading.local() # Should only be called once per thread @@ -194,8 +199,12 @@ def download_google_drive_file( if post_processing_config is not None: logger.debug(f"Post-processing {drive_id}...") - processed_image = post_process_image(raw_image=file_bytes, config=post_processing_config) - processed_image.save(file_path) + output_path = get_post_processed_path(file_path=file_path, config=post_processing_config) + os.makedirs(os.path.dirname(output_path), exist_ok=True) + processed_image, icc_profile_bytes = post_process_image(raw_image=file_bytes, config=post_processing_config) + save_processed_image( + processed_image, file_path=output_path, config=post_processing_config, icc_profile_bytes=icc_profile_bytes + ) else: # Save the bytes directly to disk - avoid reading in pillow in case any quality degradation occurs with open(file_path, "wb") as f: diff --git a/desktop-tool/src/order.py b/desktop-tool/src/order.py index ad03df03a..024a6a4aa 100644 --- a/desktop-tool/src/order.py +++ b/desktop-tool/src/order.py @@ -27,7 +27,12 @@ get_image_directory, ) from src.logging import logger -from src.processing import ImagePostProcessingConfig +from src.processing import ( + ImagePostProcessingConfig, + get_post_processed_path, + post_process_image, + save_processed_image, +) from src.utils import unpack_element @@ -179,16 +184,77 @@ def download_image( post_processing_config: Optional[ImagePostProcessingConfig], ) -> None: try: + def is_image_valid(file_path: str) -> bool: + try: + from PIL import Image + + with Image.open(file_path) as img: + img.verify() + return True + except Exception: + return False + + def remove_if_exists(file_path: str) -> None: + try: + if os.path.isfile(file_path): + os.remove(file_path) + except Exception: + pass + + source_path = self.file_path + if post_processing_config and self.file_path: + self.file_path = get_post_processed_path(self.file_path, post_processing_config) + os.makedirs(os.path.dirname(self.file_path), exist_ok=True) if self.source_type == SourceType.LOCAL_FILE: if self.file_exists() and not self.errored: - self.downloaded = True + if is_image_valid(cast(str, self.file_path)): + self.downloaded = True + else: + logger.info( + f"Local file '{bold(self.name)}' appears to be corrupted at path:\n" + f"{bold(self.file_path)}\n" + ) + self.errored = True + elif post_processing_config and source_path and file_exists(source_path) and not self.errored: + with open(source_path, "rb") as f: + raw_image = f.read() + processed_image, icc_profile_bytes = post_process_image( + raw_image=raw_image, config=post_processing_config + ) + save_processed_image( + processed_image, + file_path=cast(str, self.file_path), + config=post_processing_config, + icc_profile_bytes=icc_profile_bytes, + ) + if is_image_valid(cast(str, self.file_path)): + self.downloaded = True + else: + logger.info( + f"Processed local file '{bold(self.name)}' appears to be corrupted at path:\n" + f"{bold(self.file_path)}\n" + ) + self.errored = True else: logger.info(f"Local file '{bold(self.name)}' does not exist at path:\n{bold(self.drive_id)}\n") elif self.source_type == SourceType.GOOGLE_DRIVE: - if not self.file_exists() and not self.errored and self.file_path is not None: - self.errored = not download_google_drive_file( - drive_id=self.drive_id, file_path=self.file_path, post_processing_config=post_processing_config - ) + if self.file_path is not None and not self.errored: + for attempt in range(2): + if not self.file_exists(): + self.errored = not download_google_drive_file( + drive_id=self.drive_id, + file_path=self.file_path, + post_processing_config=post_processing_config, + ) + if self.file_exists() and not self.errored and is_image_valid(cast(str, self.file_path)): + break + if self.file_exists(): + remove_if_exists(cast(str, self.file_path)) + logger.info( + f"Downloaded image '{bold(self.name)}' appears corrupted. Retrying download..." + ) + if attempt == 1: + self.errored = True if self.file_exists() and not self.errored: self.downloaded = True diff --git a/desktop-tool/src/pdf_maker.py b/desktop-tool/src/pdf_maker.py index 3c722748a..c14386b44 100644 --- a/desktop-tool/src/pdf_maker.py +++ b/desktop-tool/src/pdf_maker.py @@ -1,5 +1,10 @@ +import io import os +import shutil +import subprocess +import tempfile from concurrent.futures import ThreadPoolExecutor +from dataclasses import dataclass from typing import Optional import attr @@ -11,7 +16,84 @@ from src.formatting import bold from src.logging import logger from src.order import CardOrder -from src.processing import ImagePostProcessingConfig +from src.processing import ( + DTC_CARD_HEIGHT_INCHES, + DTC_CARD_WIDTH_INCHES, + ImagePostProcessingConfig, + calculate_dtc_target_pixel_size, + post_process_image, + save_processed_image, +) + + +@dataclass +class PdfXConversionConfig: + icc_profile_path: Optional[str] = None + ghostscript_path: Optional[str] = None + + +def _resolve_ghostscript_path(explicit_path: Optional[str]) -> Optional[str]: + if explicit_path: + return explicit_path + for candidate in ["gs", "gswin64c", "gswin32c"]: + if resolved := shutil.which(candidate): + return resolved + return None + + +def get_ghostscript_path(explicit_path: Optional[str] = None) -> Optional[str]: + return _resolve_ghostscript_path(explicit_path) + + +def get_ghostscript_version(gs_path: str) -> Optional[str]: + try: + result = subprocess.run([gs_path, "-version"], capture_output=True, text=True, check=False) + except Exception: + return None + version = result.stdout.strip().splitlines() + return version[0] if version else None + + +def convert_pdf_to_pdfx( + source_path: str, + output_path: str, + config: PdfXConversionConfig, +) -> bool: + gs_path = _resolve_ghostscript_path(config.ghostscript_path) + if not gs_path: + logger.warning("Ghostscript was not found. Skipping PDF/X-1a conversion.") + return False + + cmd = [ + gs_path, + "-dBATCH", + "-dNOPAUSE", + "-dNOSAFER", # Allow file system access for ICC profile and output + "-sDEVICE=pdfwrite", + "-dCompatibilityLevel=1.3", + "-dPDFX", + "-dPDFXNoTrimBox", + "-dDownsampleColorImages=false", + "-dDownsampleGrayImages=false", + "-dDownsampleMonoImages=false", + "-sProcessColorModel=DeviceCMYK", + "-sColorConversionStrategy=CMYK", + f"-sOutputFile={output_path}", + ] + if config.icc_profile_path: + cmd.append(f"-sOutputICCProfile={config.icc_profile_path}") + cmd.append(source_path) + + logger.debug(f"Ghostscript command: {' '.join(cmd)}") + result = subprocess.run(cmd, capture_output=True, text=True) + if result.returncode != 0: + logger.warning( + "Ghostscript failed to convert PDF/X-1a.\n" + f"stdout: {result.stdout}\n" + f"stderr: {result.stderr}" + ) + return False + return True @attr.s @@ -27,10 +109,14 @@ class PdfExporter: save_path: str = attr.ib(default="") separate_faces: bool = attr.ib(default=False) current_face: str = attr.ib(default="all") + export_mode: str = attr.ib(default="standard") + pdfx_config: Optional[PdfXConversionConfig] = attr.ib(default=None) + image_post_processing_config: Optional[ImagePostProcessingConfig] = attr.ib(default=None) manager: enlighten.Manager = attr.ib(init=False, default=attr.Factory(enlighten.get_manager)) status_bar: enlighten.StatusBar = attr.ib(init=False, default=False) download_bar: enlighten.Counter = attr.ib(init=False, default=None) processed_bar: enlighten.Counter = attr.ib(init=False, default=None) + saved_files: list[str] = attr.ib(init=False, factory=list) def configure_bars(self) -> None: num_images = len(self.order.fronts.cards_by_id) + len(self.order.backs.cards_by_id) @@ -52,7 +138,14 @@ def set_state(self, state: str) -> None: self.status_bar.refresh() def __attrs_post_init__(self) -> None: - self.ask_questions() + if self.export_mode == "drive_thru_cards": + # DriveThruCards Premium Euro Poker requires 2.73" x 3.71" with bleed + self.card_width_in_inches = DTC_CARD_WIDTH_INCHES + self.card_height_in_inches = DTC_CARD_HEIGHT_INCHES + self.separate_faces = False + self.number_of_cards_per_file = max(1, self.order.details.quantity) + else: + self.ask_questions() self.configure_bars() self.generate_file_path() @@ -104,18 +197,54 @@ def generate_pdf(self) -> None: def add_image(self, image_path: str) -> None: self.pdf.add_page() - self.pdf.image(image_path, x=0, y=0, w=self.card_width_in_inches, h=self.card_height_in_inches) + if self.export_mode == "drive_thru_cards" and self.image_post_processing_config: + with open(image_path, "rb") as f: + raw_image = f.read() + # post_process_image handles resizing to target_pixel_size (set in execute()) + # which ensures the correct DPI for the DTC card dimensions + processed_image, icc_profile_bytes = post_process_image( + raw_image=raw_image, config=self.image_post_processing_config + ) + # Save to a temporary file so fpdf embeds the JPEG data directly + # (passing BytesIO causes fpdf to re-encode with FlateDecode, bloating file size) + ext = ".jpg" if self.image_post_processing_config.output_format == "JPEG" else ".png" + with tempfile.NamedTemporaryFile(suffix=ext, delete=False) as tmp: + tmp_path = tmp.name + try: + save_processed_image( + processed_image, + file_path=tmp_path, + config=self.image_post_processing_config, + icc_profile_bytes=icc_profile_bytes, + ) + self.pdf.image( + tmp_path, + x=0, + y=0, + w=self.card_width_in_inches, + h=self.card_height_in_inches, + ) + finally: + # Clean up temp file + if os.path.exists(tmp_path): + os.unlink(tmp_path) + else: + self.pdf.image(image_path, x=0, y=0, w=self.card_width_in_inches, h=self.card_height_in_inches) - def save_file(self) -> None: + def save_file(self) -> str: extra = "" if self.separate_faces: extra = f"{self.current_face}/" - self.pdf.output(f"{self.save_path}{extra}{self.file_num}.pdf") + file_path = f"{self.save_path}{extra}{self.file_num}.pdf" + self.pdf.output(file_path) + self.saved_files.append(file_path) + return file_path def download_and_collect_images(self, post_processing_config: Optional[ImagePostProcessingConfig]) -> None: + download_config = None if self.export_mode == "drive_thru_cards" else post_processing_config with ThreadPoolExecutor(max_workers=THREADS) as pool: - self.order.fronts.download_images(pool, self.download_bar, post_processing_config) - self.order.backs.download_images(pool, self.download_bar, post_processing_config) + self.order.fronts.download_images(pool, self.download_bar, download_config) + self.order.backs.download_images(pool, self.download_bar, download_config) backs_by_slots = {} for card in self.order.backs.cards_by_id.values(): @@ -132,7 +261,15 @@ def download_and_collect_images(self, post_processing_config: Optional[ImagePost paths_by_slot[slot] = (str(backs_by_slots.get(slot, backs_by_slots[0])), str(fronts_by_slots[slot])) self.paths_by_slot = paths_by_slot - def execute(self, post_processing_config: Optional[ImagePostProcessingConfig]) -> None: + def execute(self, post_processing_config: Optional[ImagePostProcessingConfig]) -> list[str]: + if self.export_mode == "drive_thru_cards" and post_processing_config is not None: + # Calculate exact pixel dimensions for the target DPI at DTC card size (2.73" x 3.71") + post_processing_config.target_pixel_size = calculate_dtc_target_pixel_size( + post_processing_config.max_dpi + ) + # Embed DPI metadata so PDF tools correctly interpret the image resolution + post_processing_config.embed_dpi_metadata = True + self.image_post_processing_config = post_processing_config self.download_and_collect_images(post_processing_config=post_processing_config) if self.separate_faces: self.number_of_cards_per_file = 1 @@ -140,7 +277,17 @@ def execute(self, post_processing_config: Optional[ImagePostProcessingConfig]) - else: self.export() + if self.pdfx_config: + for file_path in list(self.saved_files): + pdfx_path = f"{os.path.splitext(file_path)[0]}_pdfx.pdf" + if convert_pdf_to_pdfx(file_path, pdfx_path, self.pdfx_config): + self.saved_files.append(pdfx_path) + logger.info(f"PDF/X-1a conversion succeeded: {pdfx_path}") + else: + logger.info(f"PDF/X-1a conversion failed for {file_path}. Using original PDF.") + logger.info(f"Finished exporting files! They should be accessible at {self.save_path}.") + return self.saved_files def export(self) -> None: for slot in sorted(self.paths_by_slot.keys()): diff --git a/desktop-tool/src/processing.py b/desktop-tool/src/processing.py index e5439c81c..86061c4ca 100644 --- a/desktop-tool/src/processing.py +++ b/desktop-tool/src/processing.py @@ -1,30 +1,150 @@ import io +import os from dataclasses import dataclass -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Optional, Tuple, Dict, Any from src.constants import DPI_HEIGHT_RATIO, ImageResizeMethods +from src.logging import logger if TYPE_CHECKING: from PIL import Image +# DriveThruCards physical card dimensions (Premium Euro Poker with bleed) +# DriveThruCards requires 2.73" x 3.71" which includes bleed area +DTC_CARD_WIDTH_INCHES = 2.73 +DTC_CARD_HEIGHT_INCHES = 3.71 +MM_PER_INCH = 25.4 + + +def calculate_dtc_target_pixel_size(target_dpi: int) -> Tuple[int, int]: + """ + Calculate the target pixel dimensions for DriveThruCards at the specified DPI. + Card size is 2.73" x 3.71" (Premium Euro Poker with bleed). + """ + width = max(1, round(DTC_CARD_WIDTH_INCHES * target_dpi)) + height = max(1, round(DTC_CARD_HEIGHT_INCHES * target_dpi)) + return (width, height) + @dataclass class ImagePostProcessingConfig: max_dpi: int downscale_alg: ImageResizeMethods - # jpeg: bool + output_format: Optional[str] = None + output_extension: Optional[str] = None + convert_to_cmyk: bool = False + icc_profile_path: Optional[str] = None + output_directory: Optional[str] = None + jpeg_quality: int = 95 + target_pixel_size: Optional[Tuple[int, int]] = None + embed_dpi_metadata: bool = False + + +def get_post_processed_path(file_path: str, config: ImagePostProcessingConfig) -> str: + directory = config.output_directory or os.path.dirname(file_path) + base_name = os.path.splitext(os.path.basename(file_path))[0] + extension = config.output_extension or os.path.splitext(file_path)[1] + return os.path.join(directory, f"{base_name}{extension}") + + +def _apply_color_processing( + img: "Image", config: ImagePostProcessingConfig +) -> Tuple["Image", Optional[bytes]]: + icc_profile_bytes = None + if config.convert_to_cmyk: + if img.mode in ("RGBA", "LA"): + img = img.convert("RGB") + elif img.mode not in ("RGB", "CMYK"): + img = img.convert("RGB") + if config.icc_profile_path: + try: + from PIL import ImageCms + + srgb = ImageCms.createProfile("sRGB") + cmyk_profile = ImageCms.getOpenProfile(config.icc_profile_path) + img = ImageCms.profileToProfile(img, srgb, cmyk_profile, outputMode="CMYK") + icc_profile_bytes = cmyk_profile.tobytes() + except Exception as exc: + logger.warning(f"Failed to apply ICC profile ({config.icc_profile_path}): {exc}") + img = img.convert("CMYK") + else: + img = img.convert("CMYK") + elif config.output_format and config.output_format.upper() == "JPEG": + if img.mode in ("RGBA", "LA"): + img = img.convert("RGB") + return img, icc_profile_bytes -def post_process_image(raw_image: bytes, config: ImagePostProcessingConfig) -> "Image": +def post_process_image(raw_image: bytes, config: ImagePostProcessingConfig) -> Tuple["Image", Optional[bytes]]: from PIL import Image img = Image.open(io.BytesIO(raw_image)) # downscale the image to `max_dpi` - img_dpi = 10 * round(int(img.height) * DPI_HEIGHT_RATIO / 10) - if img_dpi > config.max_dpi: - new_height = round((config.max_dpi / img_dpi) * img.height) - new_width = round((config.max_dpi / img_dpi) * img.width) - img = img.resize((new_width, new_height), config.downscale_alg.value) + if config.target_pixel_size: + target_width, target_height = config.target_pixel_size + if img.width != target_width or img.height != target_height: + # For DTC, force exact pixel size to guarantee 300 DPI at 2.73" x 3.71". + img = img.resize((target_width, target_height), config.downscale_alg.value) + else: + img_dpi = 10 * round(int(img.height) * DPI_HEIGHT_RATIO / 10) + if img_dpi > config.max_dpi: + new_height = round((config.max_dpi / img_dpi) * img.height) + new_width = round((config.max_dpi / img_dpi) * img.width) + img = img.resize((new_width, new_height), config.downscale_alg.value) + + img, icc_profile_bytes = _apply_color_processing(img, config) + return img, icc_profile_bytes + + +def save_processed_image( + img: "Image", + file_path: str, + config: ImagePostProcessingConfig, + icc_profile_bytes: Optional[bytes] = None, +) -> None: + # Remove XMP data if it's present in the image info to avoid "XMP data is too long" error. + # JPEG format has a 64KB limit for XMP metadata in a single APP1 segment. + if "xmp" in img.info: + img.info.pop("xmp") + + img.save(file_path, **_build_save_kwargs(config=config, icc_profile_bytes=icc_profile_bytes)) + + +def save_processed_image_to_bytes( + img: "Image", + config: ImagePostProcessingConfig, + icc_profile_bytes: Optional[bytes] = None, +) -> bytes: + # Remove XMP data if it's present in the image info to avoid "XMP data is too long" error. + if "xmp" in img.info: + img.info.pop("xmp") + + output = io.BytesIO() + img.save(output, **_build_save_kwargs(config=config, icc_profile_bytes=icc_profile_bytes)) + output.seek(0) + return output.read() + - return img +def _build_save_kwargs( + config: ImagePostProcessingConfig, + icc_profile_bytes: Optional[bytes], +) -> Dict[str, Any]: + save_kwargs: Dict[str, Any] = {} + if config.output_format: + save_kwargs["format"] = config.output_format + if config.output_format and config.output_format.upper() == "JPEG": + save_kwargs["quality"] = config.jpeg_quality + save_kwargs["subsampling"] = 0 + save_kwargs["optimize"] = True + if icc_profile_bytes: + save_kwargs["icc_profile"] = icc_profile_bytes + # Embed DPI metadata to ensure PDF tools correctly interpret the image resolution. + # This is critical for DriveThruCards where the target DPI must be 300. + if config.embed_dpi_metadata and config.target_pixel_size: + # Calculate DPI from target pixel size and DTC card dimensions + target_width, target_height = config.target_pixel_size + dpi_x = round(target_width / DTC_CARD_WIDTH_INCHES) + dpi_y = round(target_height / DTC_CARD_HEIGHT_INCHES) + save_kwargs["dpi"] = (dpi_x, dpi_y) + return save_kwargs diff --git a/desktop-tool/src/webdrivers.py b/desktop-tool/src/webdrivers.py index 179d11a9c..842b702cf 100644 --- a/desktop-tool/src/webdrivers.py +++ b/desktop-tool/src/webdrivers.py @@ -1,39 +1,147 @@ +import re +import subprocess import sys from typing import Optional -from selenium.webdriver import Chrome, Edge, Firefox -from selenium.webdriver.chrome.options import Options as ChromeOptions +import undetected_chromedriver as uc +from selenium.webdriver import Edge, Firefox from selenium.webdriver.chromium.options import ChromiumOptions from selenium.webdriver.chromium.webdriver import ChromiumDriver from selenium.webdriver.edge.options import Options as EdgeOptions from selenium.webdriver.firefox.options import Options as FirefoxOptions -def get_chrome_driver(headless: bool = False, binary_location: Optional[str] = None) -> Chrome: - options = ChromeOptions() +def _detect_chrome_version() -> Optional[int]: + """ + Detect the installed Chrome version by querying the browser. + Returns the major version number (e.g., 144) or None if detection fails. + """ + try: + if sys.platform == "darwin": + # macOS: Use the Chrome binary to get version + result = subprocess.run( + ["/Applications/Google Chrome.app/Contents/MacOS/Google Chrome", "--version"], + capture_output=True, + text=True, + timeout=5, + ) + elif sys.platform == "win32": + # Windows: Query registry or use wmic + result = subprocess.run( + ["reg", "query", r"HKEY_CURRENT_USER\Software\Google\Chrome\BLBeacon", "/v", "version"], + capture_output=True, + text=True, + timeout=5, + ) + else: + # Linux: Use google-chrome binary + result = subprocess.run( + ["google-chrome", "--version"], + capture_output=True, + text=True, + timeout=5, + ) + + # Extract version number from output (e.g., "Google Chrome 144.0.7559.110") + match = re.search(r"(\d+)\.\d+\.\d+\.\d+", result.stdout) + if match: + return int(match.group(1)) + except Exception: + pass + return None + + +def _apply_stealth_scripts(driver: uc.Chrome) -> None: + """ + Apply additional stealth JavaScript to hide automation traces. + These patches help bypass bot detection on sites like DriveThruCards. + """ + stealth_js = """ + // Hide webdriver property + Object.defineProperty(navigator, 'webdriver', { + get: () => undefined + }); + + // Fix chrome object + window.chrome = { + runtime: {}, + loadTimes: function() {}, + csi: function() {}, + app: {} + }; + + // Fix permissions query + const originalQuery = window.navigator.permissions.query; + window.navigator.permissions.query = (parameters) => ( + parameters.name === 'notifications' ? + Promise.resolve({ state: Notification.permission }) : + originalQuery(parameters) + ); + + // Fix plugins to look more realistic + Object.defineProperty(navigator, 'plugins', { + get: () => [ + { name: 'Chrome PDF Plugin', filename: 'internal-pdf-viewer' }, + { name: 'Chrome PDF Viewer', filename: 'mhjfbmdgcfjbbpaeojofohoefgiehjai' }, + { name: 'Native Client', filename: 'internal-nacl-plugin' } + ] + }); + + // Fix languages + Object.defineProperty(navigator, 'languages', { + get: () => ['en-US', 'en'] + }); + + // Remove automation-related properties from window + delete window.cdc_adoQpoasnfa76pfcZLmcfl_Array; + delete window.cdc_adoQpoasnfa76pfcZLmcfl_Promise; + delete window.cdc_adoQpoasnfa76pfcZLmcfl_Symbol; + """ + driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {"source": stealth_js}) + + +def get_chrome_driver( + headless: bool = False, + binary_location: Optional[str] = None, + remote_debugging_port: Optional[int] = None, +) -> uc.Chrome: + """ + Create a Chrome driver using undetected-chromedriver to bypass bot detection. + This automatically handles Cloudflare and similar anti-bot systems. + """ + options = uc.ChromeOptions() options.add_argument("--no-sandbox") options.add_argument("--log-level=3") options.add_argument("--disable-dev-shm-usage") + options.add_argument("--disable-blink-features=AutomationControlled") if headless: options.add_argument("--headless=new") - options.add_experimental_option("excludeSwitches", ["enable-logging"]) - options.add_experimental_option("detach", True) + if remote_debugging_port is not None: + options.add_argument(f"--remote-debugging-port={remote_debugging_port}") if binary_location is not None: options.binary_location = binary_location - driver = Chrome(options=options) - driver.set_network_conditions(offline=False, latency=5, throughput=5 * 125000) + + # undetected-chromedriver handles stealth automatically + # Detect Chrome version since auto-detection can fail + version_main = _detect_chrome_version() + driver = uc.Chrome(options=options, version_main=version_main) + + # Apply additional stealth scripts + _apply_stealth_scripts(driver) + return driver -def get_brave_driver(headless: bool = False, binary_location: Optional[str] = None) -> Chrome: - options = ChromeOptions() +def get_brave_driver(headless: bool = False, binary_location: Optional[str] = None) -> uc.Chrome: + """ + Create a Brave driver using undetected-chromedriver. + """ + options = uc.ChromeOptions() options.add_argument("--no-sandbox") options.add_argument("--log-level=3") options.add_argument("--disable-dev-shm-usage") if headless: options.add_argument("--headless=new") - options.add_experimental_option("excludeSwitches", ["enable-logging"]) - options.add_experimental_option("detach", True) # the binary location for brave must be manually specified (otherwise chrome will open instead) if binary_location is not None: @@ -50,24 +158,36 @@ def get_brave_driver(headless: bool = False, binary_location: Optional[str] = No ) options.binary_location = default_binary_locations[sys.platform] - driver = Chrome(options=options) - driver.set_network_conditions(offline=False, latency=5, throughput=5 * 125000) + driver = uc.Chrome(options=options) return driver def get_edge_driver(headless: bool = False, binary_location: Optional[str] = None) -> ChromiumDriver: + """ + Create an Edge driver with stealth options. + Note: Edge doesn't have an undetected variant, so we use standard stealth measures. + """ options: ChromiumOptions = EdgeOptions() options.add_argument("--no-sandbox") options.add_argument("--log-level=3") options.add_argument("--disable-dev-shm-usage") + options.add_argument("--disable-blink-features=AutomationControlled") if headless: options.add_argument("--headless=new") - options.add_experimental_option("excludeSwitches", ["enable-logging"]) + options.add_experimental_option("excludeSwitches", ["enable-automation", "enable-logging"]) + options.add_experimental_option("useAutomationExtension", False) options.add_experimental_option("detach", True) if binary_location is not None: options.binary_location = binary_location driver: ChromiumDriver = Edge(options=options) # type: ignore - driver.set_network_conditions(offline=False, latency=5, throughput=5 * 125000) + # Apply CDP stealth for Edge + driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", { + "source": """ + Object.defineProperty(navigator, 'webdriver', { + get: () => undefined + }); + """ + }) return driver diff --git a/desktop-tool/tests/test_desktop_tool.py b/desktop-tool/tests/test_desktop_tool.py index 3e34f73e5..286930b5a 100644 --- a/desktop-tool/tests/test_desktop_tool.py +++ b/desktop-tool/tests/test_desktop_tool.py @@ -27,7 +27,8 @@ Details, aggregate_and_split_orders, ) -from src.pdf_maker import PdfExporter +import autofill as autofill_cli +from src.pdf_maker import PdfExporter, get_ghostscript_version from src.processing import ImagePostProcessingConfig DEFAULT_POST_PROCESSING = ImagePostProcessingConfig(max_dpi=800, downscale_alg=constants.ImageResizeMethods.LANCZOS) @@ -70,6 +71,102 @@ def assert_file_size(file_path: str, size: int) -> None: assert os.stat(file_path).st_size == size, f"File size {os.stat(file_path).st_size} does not match {size}" +# endregion + +# region Ghostscript + + +def test_get_ghostscript_version_reads_stdout(monkeypatch: pytest.MonkeyPatch) -> None: + class Result: + def __init__(self) -> None: + self.stdout = "10.02.1\n" + + def fake_run(*_args, **_kwargs): + return Result() + + monkeypatch.setattr("src.pdf_maker.subprocess.run", fake_run) + assert get_ghostscript_version("gs") == "10.02.1" + + +def test_ensure_ghostscript_available_prompts_until_found( + monkeypatch: pytest.MonkeyPatch, input_enter +) -> None: + paths = [None, "/usr/local/bin/gs"] + called = {"version": 0} + + def fake_get_path(): + return paths.pop(0) + + def fake_get_version(_path: str) -> str: + called["version"] += 1 + return "10.0.0" + + monkeypatch.setattr(autofill_cli, "get_ghostscript_path", fake_get_path) + monkeypatch.setattr(autofill_cli, "get_ghostscript_version", fake_get_version) + + assert autofill_cli.ensure_ghostscript_available() == "/usr/local/bin/gs" + assert called["version"] == 1 + + +# endregion + +# region PDF/X conversion + + +def test_pdf_exporter_appends_pdfx_on_success(monkeypatch: pytest.MonkeyPatch, card_order_valid) -> None: + def do_nothing(_): + return None + + def fake_convert_pdf_to_pdfx(source_path: str, output_path: str, _config) -> bool: + with open(output_path, "wb") as f: + f.write(b"pdfx") + return True + + monkeypatch.setattr("src.pdf_maker.PdfExporter.ask_questions", do_nothing) + monkeypatch.setattr("src.pdf_maker.convert_pdf_to_pdfx", fake_convert_pdf_to_pdfx) + + card_order_valid.name = "test_order.xml" + pdf_exporter = PdfExporter( + order=card_order_valid, + number_of_cards_per_file=1, + pdfx_config=autofill_cli.PdfXConversionConfig(icc_profile_path="dummy.icc"), + ) + generated_files = pdf_exporter.execute(post_processing_config=DEFAULT_POST_PROCESSING) + + expected_pdfx_files = [ + "export/test_order/1_pdfx.pdf", + "export/test_order/2_pdfx.pdf", + "export/test_order/3_pdfx.pdf", + ] + for file_path in expected_pdfx_files: + assert file_path in generated_files + assert os.path.exists(file_path) + + remove_files([path for path in generated_files if path.endswith(".pdf")]) + remove_directories(["export/test_order", "export"]) + + +def test_pdf_exporter_skips_pdfx_on_failure(monkeypatch: pytest.MonkeyPatch, card_order_valid) -> None: + def do_nothing(_): + return None + + monkeypatch.setattr("src.pdf_maker.PdfExporter.ask_questions", do_nothing) + monkeypatch.setattr("src.pdf_maker.convert_pdf_to_pdfx", lambda *_args, **_kwargs: False) + + card_order_valid.name = "test_order.xml" + pdf_exporter = PdfExporter( + order=card_order_valid, + number_of_cards_per_file=1, + pdfx_config=autofill_cli.PdfXConversionConfig(icc_profile_path="dummy.icc"), + ) + generated_files = pdf_exporter.execute(post_processing_config=DEFAULT_POST_PROCESSING) + + assert not any(path.endswith("_pdfx.pdf") for path in generated_files) + + remove_files([path for path in generated_files if path.endswith(".pdf")]) + remove_directories(["export/test_order", "export"]) + + # endregion # region constants