From bc593a9dde7baf5cee74f631aaa39214ae4959ac Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Fri, 8 Aug 2025 03:49:42 +0000 Subject: [PATCH] Add X automation scripts for login and interaction with X platform Co-authored-by: cuteyw --- x_automation/like_and_reply_elon.py | 82 +++++++++++++++ x_automation/requirements.txt | 2 + x_automation/x_login_save_state.py | 158 ++++++++++++++++++++++++++++ 3 files changed, 242 insertions(+) create mode 100644 x_automation/like_and_reply_elon.py create mode 100644 x_automation/requirements.txt create mode 100644 x_automation/x_login_save_state.py diff --git a/x_automation/like_and_reply_elon.py b/x_automation/like_and_reply_elon.py new file mode 100644 index 0000000..a4954df --- /dev/null +++ b/x_automation/like_and_reply_elon.py @@ -0,0 +1,82 @@ +import os +from playwright.sync_api import sync_playwright + +STORAGE_STATE_PATH = os.environ.get( + "X_STORAGE_STATE", + os.path.join(os.path.dirname(__file__), "x_storage_state.json"), +) +TARGET_PROFILE = os.environ.get("X_TARGET_PROFILE", "elonmusk") +REPLY_TEXT = os.environ.get("X_REPLY_TEXT", "nice") + + +def main() -> int: + if not os.path.exists(STORAGE_STATE_PATH): + print( + f"Storage state '{STORAGE_STATE_PATH}' not found. Run 'python x_login_save_state.py' first to log in." + ) + return 2 + + with sync_playwright() as p: + browser = p.chromium.launch(headless=True) + context = browser.new_context( + storage_state=STORAGE_STATE_PATH, viewport={"width": 1280, "height": 900} + ) + page = context.new_page() + + # Go to profile + page.goto(f"https://x.com/{TARGET_PROFILE}", wait_until="networkidle", timeout=90000) + + # Try dismiss cookie/consent prompts if they appear (best-effort) + try: + consent = page.locator('div[role="button"]:has-text("Accept")').first + if consent.is_visible(): + consent.click() + except Exception: + pass + + # Wait for first tweet/article + first_tweet = page.locator('article[data-testid="tweet"]').first + first_tweet.wait_for(state="visible", timeout=90000) + + # Like if not already liked + try: + like_button = first_tweet.locator('div[data-testid="like"]').first + if like_button.count() > 0: + like_button.click() + print("Liked the first post.") + else: + print("Like button not found (possibly already liked). Skipping like.") + except Exception as e: + print(f"Failed to like: {e}") + + # Reply 'nice' + try: + reply_button = first_tweet.locator('div[data-testid="reply"]').first + reply_button.click() + + composer = page.locator('div[role="dialog"] div[role="textbox"]').first + composer.wait_for(state="visible", timeout=30000) + composer.fill(REPLY_TEXT) + + tweet_button = page.locator('div[role="dialog"] div[data-testid="tweetButton"]').first + if tweet_button.count() == 0: + tweet_button = page.locator('div[role="dialog"] div[data-testid="tweetButtonInline"]').first + tweet_button.click() + + # Wait for dialog to close as a success heuristic + page.locator('div[role="dialog"]').first.wait_for(state="hidden", timeout=30000) + print(f"Replied '{REPLY_TEXT}' to the first post.") + except Exception as e: + print(f"Failed to reply: {e}") + + # Persist updated auth/session + context.storage_state(path=STORAGE_STATE_PATH) + context.close() + browser.close() + return 0 + + +if __name__ == "__main__": + import sys + + sys.exit(main()) \ No newline at end of file diff --git a/x_automation/requirements.txt b/x_automation/requirements.txt new file mode 100644 index 0000000..8c93fbf --- /dev/null +++ b/x_automation/requirements.txt @@ -0,0 +1,2 @@ +playwright>=1.43,<2 +pyotp>=2.8 \ No newline at end of file diff --git a/x_automation/x_login_save_state.py b/x_automation/x_login_save_state.py new file mode 100644 index 0000000..af1fc29 --- /dev/null +++ b/x_automation/x_login_save_state.py @@ -0,0 +1,158 @@ +import os +import time +from typing import Optional +from playwright.sync_api import sync_playwright, Page + +try: + import pyotp # type: ignore +except Exception: + pyotp = None + +STORAGE_STATE_PATH = os.environ.get( + "X_STORAGE_STATE", + os.path.join(os.path.dirname(__file__), "x_storage_state.json"), +) +LOGIN_TIMEOUT_SEC = int(os.environ.get("X_LOGIN_TIMEOUT_SEC", "180")) +HEADLESS = os.environ.get("X_HEADLESS", "1") not in ["0", "false", "False"] +X_USERNAME = os.environ.get("X_USERNAME", "").strip() +X_PASSWORD = os.environ.get("X_PASSWORD", "").strip() +X_TOTP_SECRET = os.environ.get("X_TOTP_SECRET", "").strip() + + +def is_logged_in(page: Page) -> bool: + try: + locator = page.locator('div[data-testid="SideNav_AccountSwitcher_Button"]').first + return locator.is_visible() + except Exception: + return False + + +def fill_if_visible(page: Page, selector: str, value: str) -> bool: + try: + locator = page.locator(selector).first + if locator.is_visible(): + locator.click() + locator.fill(value) + return True + except Exception: + pass + return False + + +def click_if_visible(page: Page, selector: str) -> bool: + try: + locator = page.locator(selector).first + if locator.is_visible(): + locator.click() + return True + except Exception: + pass + return False + + +def attempt_automated_login(page: Page) -> bool: + # Go directly to the login flow + page.goto("https://x.com/i/flow/login", wait_until="domcontentloaded") + + # Step 1: username/email/phone + page.wait_for_timeout(800) + filled = fill_if_visible(page, 'input[name="text"]', X_USERNAME) + if not filled: + # Sometimes input is within a dialog + filled = fill_if_visible(page, 'div[role="dialog"] input[name="text"]', X_USERNAME) + if not filled: + return False + + # Click Next + clicked = ( + click_if_visible(page, 'div[role="button"]:has-text("Next")') + or click_if_visible(page, 'div[data-testid="LoginForm_Login_Button"]') + ) + if not clicked: + return False + + page.wait_for_timeout(800) + + # Some accounts are asked for username again + if page.locator('input[name="text"]').first.is_visible(): + fill_if_visible(page, 'input[name="text"]', X_USERNAME) + click_if_visible(page, 'div[role="button"]:has-text("Next")') + page.wait_for_timeout(800) + + # Step 2: password + page.wait_for_timeout(800) + filled = fill_if_visible(page, 'input[name="password"]', X_PASSWORD) + if not filled: + return False + + # Click Log in + clicked = ( + click_if_visible(page, 'div[data-testid="LoginForm_Login_Button"]') + or click_if_visible(page, 'div[role="button"]:has-text("Log in")') + ) + if not clicked: + return False + + # Optional: TOTP 2FA + if X_TOTP_SECRET and pyotp is not None: + # Give time for 2FA prompt to appear + page.wait_for_timeout(1500) + try: + code_input = page.locator('input[name="text"]').first + if code_input.is_visible(): + otp = pyotp.TOTP(X_TOTP_SECRET).now() + code_input.fill(otp) + click_if_visible(page, 'div[role="button"]:has-text("Next")') + except Exception: + pass + + # Wait for home/feed to be visible as a success criterion + deadline = time.time() + 30 + while time.time() < deadline: + page.wait_for_timeout(1000) + if is_logged_in(page): + return True + return False + + +def main() -> int: + with sync_playwright() as p: + browser = p.chromium.launch(headless=HEADLESS) + context = browser.new_context(viewport={"width": 1280, "height": 900}) + page = context.new_page() + + if X_USERNAME and X_PASSWORD: + print("Attempting automated login with provided credentials (headless=%s)..." % HEADLESS) + ok = attempt_automated_login(page) + if not ok: + print("Automated login did not complete. Exiting with failure.") + context.close() + browser.close() + return 1 + else: + print( + "Environment variables X_USERNAME/X_PASSWORD not provided. This script requires them to run headless here." + ) + print("Set X_USERNAME, X_PASSWORD, and optional X_TOTP_SECRET, then rerun.") + context.close() + browser.close() + return 2 + + if not is_logged_in(page): + print("Login not detected after flow. Exiting.") + context.close() + browser.close() + return 1 + + context.storage_state(path=STORAGE_STATE_PATH) + print(f"Saved session to {STORAGE_STATE_PATH}") + + context.close() + browser.close() + return 0 + + +if __name__ == "__main__": + import sys + + sys.exit(main()) \ No newline at end of file