diff --git a/Framework/Built_In_Automation/Web/Playwright/BuiltInFunctions.py b/Framework/Built_In_Automation/Web/Playwright/BuiltInFunctions.py index bfca34b7..c59268ac 100644 --- a/Framework/Built_In_Automation/Web/Playwright/BuiltInFunctions.py +++ b/Framework/Built_In_Automation/Web/Playwright/BuiltInFunctions.py @@ -77,7 +77,7 @@ def connect_selenium_to_playwright(port=9222): sr.Set_Shared_Variables("selenium_driver", driver) CommonUtil.ExecLog("connect_selenium_to_playwright", "Connected Selenium to Playwright", 1) - return "passed" + return driver except Exception as e: CommonUtil.ExecLog("connect_selenium_to_playwright", f"Failed to connect Selenium to Playwright: {e}", 3) @@ -112,6 +112,69 @@ def connect_selenium_to_playwright(port=9222): # # ######################### +def _handle_playwright_session(step_data): + """ + Helper function to handle session parameter for Playwright actions. + + Args: + step_data: The step data containing potential session parameter + + Returns: + tuple: (session_name, current_page, current_page_id, context, browser) + - session_name: The session name found or None + - current_page: The appropriate page instance + - current_page_id: The current page ID + - context: The browser context + - browser: The browser instance + """ + global current_page, current_page_id, context, browser + + # Parse session parameter + session_name = None + for left, mid, right in step_data: + left_l = left.strip().lower() + mid_l = mid.strip().lower() + right_v = right.strip() + + if mid_l == "optional parameter" and left_l == "session": + session_name = right_v + break + + # If session parameter is provided, switch to that session + if session_name: + from Framework.Built_In_Automation.Web.utils import get_browser_session + existing_session = get_browser_session(session_name) + + if existing_session and existing_session.get("playwright_page"): + # Session exists, use existing browser + current_page = existing_session["playwright_page"] + context = existing_session["playwright_context"] + browser = existing_session["playwright_browser"] + current_page_id = session_name + + # Update globals + globals().update({ + 'current_page': current_page, + 'context': context, + 'browser': browser, + 'current_page_id': current_page_id + }) + + # Update shared variables + sr.Set_Shared_Variables("playwright_page", current_page) + sr.Set_Shared_Variables("playwright_context", context) + sr.Set_Shared_Variables("playwright_browser", browser) + + sModuleInfo = inspect.currentframe().f_code.co_name + " : " + MODULE_NAME + CommonUtil.ExecLog(sModuleInfo, f"Using existing browser session: {session_name}", 1) + else: + # Session doesn't exist + sModuleInfo = inspect.currentframe().f_code.co_name + " : " + MODULE_NAME + CommonUtil.ExecLog(sModuleInfo, f"Session '{session_name}' not found. Using current browser.", 2) + + return session_name, current_page, current_page_id, context, browser + + @logger async def Open_Browser(step_data): """ @@ -168,8 +231,6 @@ async def Open_Browser(step_data): url = right_v elif left_l in ("browser", "browser name"): browser_name = right_v.lower() - elif left_l in ("driver id", "page id", "driver tag"): - page_id = right_v elif mid_l == "optional parameter": if left_l == "headless": @@ -199,6 +260,8 @@ async def Open_Browser(step_data): color_scheme = right_v elif left_l == "permission": permissions.append(right_v) + elif left_l in ("driver id", "page id", "driver tag", "session"): + page_id = right_v elif mid_l == "shared capability": # Handle Selenium-style capabilities where possible @@ -220,8 +283,13 @@ async def Open_Browser(step_data): "devtools": devtools, } - # Add remote debugging port for CDP connection - all_args = args + ["--remote-debugging-port=9222"] + # Add remote debugging port for CDP connection with unique port per session + import hashlib + # Generate unique port based on page_id (range 9222-9322 to avoid conflicts) + port_hash = int(hashlib.md5(page_id.encode()).hexdigest(), 16) + unique_port = 9222 + (port_hash % 100) + all_args = args + [f"--remote-debugging-port={unique_port}"] + CommonUtil.ExecLog(sModuleInfo, f"Using remote debugging port {unique_port} for session '{page_id}'", 1) if all_args: launch_options["args"] = all_args if downloads_path: @@ -293,7 +361,20 @@ async def Open_Browser(step_data): CommonUtil.set_screenshot_vars(sr.Shared_Variable_Export()) # Connect Selenium to Playwright via CDP - connect_selenium_to_playwright(port=9222) + selenium_driver = connect_selenium_to_playwright(port=unique_port) + + # Create browser session + from Framework.Built_In_Automation.Web.utils import create_browser_session + + create_browser_session( + session_name=page_id, + selenium_driver=selenium_driver, + playwright_page=current_page, + playwright_browser=browser, + playwright_context=context, + playwright_frame=None + ) + CommonUtil.ExecLog(sModuleInfo, f"Created browser session: {page_id}", 5) CommonUtil.ExecLog(sModuleInfo, f"Browser opened successfully (page_id: {page_id})", 1) return "passed" @@ -319,7 +400,59 @@ async def Go_To_Link(step_data): global current_page try: - if current_page is None: + # Parse session parameter first + session_name = None + for left, mid, right in step_data: + left_l = left.strip().lower() + mid_l = mid.strip().lower() + right_v = right.strip() + + if mid_l == "optional parameter" and left_l == "session": + session_name = right_v + break + + # Check if session exists and use it + if session_name: + from Framework.Built_In_Automation.Web.utils import get_browser_session + existing_session = get_browser_session(session_name) + + if existing_session and existing_session.get("playwright_page"): + # Session exists, use existing browser + current_page = existing_session["playwright_page"] + context = existing_session["playwright_context"] + browser = existing_session["playwright_browser"] + current_page_id = session_name + + # Update globals + globals().update({ + 'current_page': current_page, + 'context': context, + 'browser': browser, + 'current_page_id': current_page_id + }) + + # Update shared variables + sr.Set_Shared_Variables("playwright_page", current_page) + sr.Set_Shared_Variables("playwright_context", context) + sr.Set_Shared_Variables("playwright_browser", browser) + + CommonUtil.ExecLog(sModuleInfo, f"Using existing browser session: {session_name}", 1) + else: + # Session doesn't exist, open new browser with session name + CommonUtil.ExecLog(sModuleInfo, f"Session '{session_name}' not found. Opening new browser.", 2) + + # Add session parameter to step_data for Open_Browser + step_data_with_session = step_data.copy() + if not any(left.strip().lower() == "session" and mid.strip().lower() == "optional parameter" for left, mid, right in step_data_with_session): + step_data_with_session.append(("session", "optional parameter", session_name)) + + result = await Open_Browser(step_data_with_session) + if result == "zeuz_failed": + CommonUtil.ExecLog(sModuleInfo, "Failed to open browser for new session", 3) + return "zeuz_failed" + + elif current_page is None: + # No session specified and no browser open CommonUtil.ExecLog(sModuleInfo, "No browser open. Opening browser with default settings.", 2) result = await Open_Browser(step_data) if result == "zeuz_failed": @@ -338,7 +471,10 @@ async def Go_To_Link(step_data): if left_l in ("go to link", "url", "link"): url = right_v elif mid_l == "optional parameter": - if left_l in ("wait until", "wait_until", "waituntil", "wait time"): + if left_l == "session": + # Skip session parameter - already processed above + continue + elif left_l in ("wait until", "wait_until", "waituntil", "wait time"): wait_until = right_v.lower() elif left_l == "timeout": timeout = int(float(right_v) * 1000) @@ -371,57 +507,136 @@ async def Tear_Down_Playwright(step_data=None): Example: Field Sub Field Value tear down playwright action tear down + + Example with session: + Field Sub Field Value + session optional parameter my_session + tear down playwright action tear down """ sModuleInfo = inspect.currentframe().f_code.co_name + " : " + MODULE_NAME global playwright_instance, browser, context, current_page global playwright_details, current_page_id try: - # Close all tracked pages/contexts - for page_id, details in playwright_details.items(): + # Parse session parameter + session_name = None + if step_data: + for left, mid, right in step_data: + left_l = left.strip().lower() + mid_l = mid.strip().lower() + right_v = right.strip() + + if mid_l == "optional parameter" and left_l == "session": + session_name = right_v + break + + # Handle session-specific teardown + if session_name: + from Framework.Built_In_Automation.Web.utils import get_browser_session + existing_session = get_browser_session(session_name) + + if existing_session and existing_session.get("playwright_page"): + try: + # Close the specific session's page and context + session_page = existing_session["playwright_page"] + session_context = existing_session["playwright_context"] + session_browser = existing_session["playwright_browser"] + + if session_page: + await session_page.close() + if session_context: + await session_context.close() + + CommonUtil.ExecLog(sModuleInfo, f"Teared down session '{session_name}'", 1) + except Exception as e: + errMsg = f"Unable to tear down session '{session_name}'. may already been killed" + CommonUtil.ExecLog(sModuleInfo, errMsg, 2) + + # Remove session from browser_sessions + browser_sessions = sr.Get_Shared_Variables("browser_sessions", {}) + if session_name in browser_sessions: + del browser_sessions[session_name] + sr.Set_Shared_Variables("browser_sessions", browser_sessions) + + # Remove from playwright_details if present + if session_name in playwright_details: + del playwright_details[session_name] + + # If this was the current session, clear globals + if current_page_id == session_name: + current_page = None + context = None + browser = None + current_page_id = None + + # Try to switch to another available session + if playwright_details: + for page_id, details in playwright_details.items(): + current_page = details["page"] + context = details["context"] + browser = details["browser"] + current_page_id = page_id + + # Update shared variables + sr.Set_Shared_Variables("playwright_page", current_page) + sr.Set_Shared_Variables("playwright_context", context) + sr.Set_Shared_Variables("playwright_browser", browser) + + CommonUtil.ExecLog(sModuleInfo, f"Switched to session '{page_id}'", 1) + break + else: + CommonUtil.ExecLog(sModuleInfo, f"Session '{session_name}' not found. Nothing to tear down.", 2) + + # Handle full teardown (backwards compatibility) + else: + # Close all tracked pages/contexts + for page_id, details in playwright_details.items(): + try: + if details.get("page"): + await details["page"].close() + if details.get("context"): + await details["context"].close() + except Exception: + pass + + # Close main instances try: - if details.get("page"): - await details["page"].close() - if details.get("context"): - await details["context"].close() + if current_page and current_page not in [d.get("page") for d in playwright_details.values()]: + await current_page.close() except Exception: pass - # Close main instances - try: - if current_page and current_page not in [d.get("page") for d in playwright_details.values()]: - await current_page.close() - except Exception: - pass - - try: - if context: - await context.close() - except Exception: - pass - - try: - if browser: - await browser.close() - except Exception: - pass + try: + if context: + await context.close() + except Exception: + pass - try: - if playwright_instance: - await playwright_instance.stop() - except Exception: - pass + try: + if browser: + await browser.close() + except Exception: + pass - # Reset all globals - current_page = None - context = None - browser = None - playwright_instance = None - playwright_details = {} - current_page_id = None + try: + if playwright_instance: + await playwright_instance.stop() + except Exception: + pass - CommonUtil.ExecLog(sModuleInfo, "Browser closed successfully", 1) - return "passed" + # Reset all globals + current_page = None + context = None + browser = None + playwright_instance = None + playwright_details = {} + current_page_id = None + + # Clear all browser sessions + sr.Set_Shared_Variables("browser_sessions", {}) + + CommonUtil.ExecLog(sModuleInfo, "Browser closed successfully", 1) + return "passed" except Exception: return CommonUtil.Exception_Handler(sys.exc_info()) @@ -517,6 +732,9 @@ async def Click_Element(step_data): global current_page try: + # Handle session parameter + session_name, current_page, current_page_id, context, browser = _handle_playwright_session(step_data) + if current_page is None: CommonUtil.ExecLog(sModuleInfo, "No browser open", 3) return "zeuz_failed" @@ -536,6 +754,10 @@ async def Click_Element(step_data): mid_l = mid.strip().lower() right_v = right.strip() + # Skip session parameter - already handled above + if mid_l == "optional parameter" and left_l == "session": + continue + if mid_l == "optional parameter": if left_l == "use js": use_js = right_v.lower() in ("true", "yes", "1") @@ -723,6 +945,9 @@ async def Enter_Text_In_Text_Box(step_data): global current_page try: + # Handle session parameter + session_name, current_page, current_page_id, context, browser = _handle_playwright_session(step_data) + if current_page is None: CommonUtil.ExecLog(sModuleInfo, "No browser open", 3) return "zeuz_failed" @@ -737,6 +962,10 @@ async def Enter_Text_In_Text_Box(step_data): left_l = left.strip().lower() mid_l = mid.strip().lower() + # Skip session parameter - already handled above + if mid_l == "optional parameter" and left_l == "session": + continue + if mid_l == "action": text_value = right # Don't strip - preserve whitespace elif mid_l == "optional parameter": diff --git a/Framework/Built_In_Automation/Web/Selenium/BuiltInFunctions.py b/Framework/Built_In_Automation/Web/Selenium/BuiltInFunctions.py index 49b578b8..2772f25a 100644 --- a/Framework/Built_In_Automation/Web/Selenium/BuiltInFunctions.py +++ b/Framework/Built_In_Automation/Web/Selenium/BuiltInFunctions.py @@ -72,6 +72,7 @@ from .utils import ChromeForTesting, ChromeExtensionDownloader from playwright.async_api import async_playwright +from Framework.Built_In_Automation.Web.utils import get_browser_session, create_browser_session ######################### # # @@ -149,6 +150,50 @@ ReturnType = Literal["passed", "zeuz_failed"] +def _handle_selenium_session(step_data): + """ + Helper function to handle session parameter for Selenium actions. + + Args: + step_data: The step data containing potential session parameter + + Returns: + tuple: (session_name, selenium_driver, current_driver_id) + - session_name: The session name found or None + - selenium_driver: The appropriate selenium driver instance + - current_driver_id: The current driver ID + """ + global selenium_driver, current_driver_id + + # Parse session parameter + session_name = None + for left, mid, right in step_data: + left = left.replace(" ", "").replace("_", "").replace("-", "").lower() + if left == "session" and mid.strip().lower() == "optional parameter": + session_name = right.strip() + break + + # If session parameter is provided, switch to that session + if session_name: + from Framework.Built_In_Automation.Web.utils import get_browser_session + existing_session = get_browser_session(session_name) + + if existing_session and existing_session.get("selenium_driver"): + # Session exists, use existing driver + selenium_driver = existing_session["selenium_driver"] + current_driver_id = session_name + Shared_Resources.Set_Shared_Variables("selenium_driver", selenium_driver) + + sModuleInfo = inspect.currentframe().f_code.co_name + " : " + MODULE_NAME + CommonUtil.ExecLog(sModuleInfo, f"Using existing browser session: {session_name}", 1) + else: + # Session doesn't exist + sModuleInfo = inspect.currentframe().f_code.co_name + " : " + MODULE_NAME + CommonUtil.ExecLog(sModuleInfo, f"Session '{session_name}' not found. Using current browser.", 2) + + return session_name, selenium_driver, current_driver_id + + class DefaultChromiumArguments(TypedDict): add_argument: list[str] add_experimental_option: dict[str, Any] @@ -466,7 +511,12 @@ def Open_Electron_App(data_set): opts = Options() opts.binary_location = desktop_app_path - opts.add_argument("--remote-debugging-port=9222") + # Generate unique port for Electron app based on driver_id + import hashlib + port_hash = int(hashlib.md5((driver_id or "electron").encode()).hexdigest(), 16) + electron_port = 9230 + (port_hash % 90) # Range 9230-9320 to avoid conflicts with browser sessions + opts.add_argument(f"--remote-debugging-port={electron_port}") + CommonUtil.ExecLog(sModuleInfo, f"Using remote debugging port {electron_port} for Electron app", 1) # service = Service(executable_path=electron_chrome_path) arch = platform.machine().lower() if platform.system() == "Darwin" and arch == "arm64": @@ -671,11 +721,11 @@ async def connect_playwright_to_selenium(port=9222): Shared_Resources.Set_Shared_Variables("playwright_browser", browser) Shared_Resources.Set_Shared_Variables("playwright_page", page) - return "passed" + return browser, context, page @logger -async def Open_Browser(browser, browser_options: BrowserOptions): +async def Open_Browser(browser, browser_options: BrowserOptions, session_name: str = "default"): """Launch browser from options and service object""" try: global selenium_driver @@ -705,8 +755,13 @@ async def Open_Browser(browser, browser_options: BrowserOptions): options = generate_options(browser, browser_options) - # Enable remote debugging / CDP - options.add_argument("--remote-debugging-port=9222") + # Enable remote debugging / CDP with unique port per session + import hashlib + # Generate unique port based on session name (range 9222-9322 to avoid conflicts) + port_hash = int(hashlib.md5(session_name.encode()).hexdigest(), 16) + unique_port = 9222 + (port_hash % 100) + options.add_argument(f"--remote-debugging-port={unique_port}") + CommonUtil.ExecLog(sModuleInfo, f"Using remote debugging port {unique_port} for session '{session_name}'", 1) if browser in ("android", "chrome", "chromeheadless"): from selenium.webdriver.chrome.service import Service @@ -777,12 +832,29 @@ async def Open_Browser(browser, browser_options: BrowserOptions): ) return "zeuz_failed" - # Connect Playwright to Selenium via CDP - try: - await connect_playwright_to_selenium(port=9222) - CommonUtil.ExecLog(sModuleInfo, "Connected Playwright to Selenium", 1) - except Exception as e: - CommonUtil.ExecLog(sModuleInfo, f"Failed to connect Playwright to Selenium: {e}", 3) + # If selenium_driver is of type Webdriver + from selenium.webdriver import Chrome, Firefox, Edge, Safari + if isinstance(selenium_driver, (Chrome, Firefox, Edge, Safari)): + # Connect Playwright to Selenium via CDP + playwright_browser = None + playwright_context = None + playwright_page = None + try: + playwright_browser, playwright_context, playwright_page = await connect_playwright_to_selenium(port=unique_port) + CommonUtil.ExecLog(sModuleInfo, "Connected Playwright to Selenium", 1) + except Exception as e: + CommonUtil.ExecLog(sModuleInfo, f"Failed to connect Playwright to Selenium: {e}", 3) + + # Create browser session + create_browser_session( + session_name=session_name, + selenium_driver=selenium_driver, + playwright_page=playwright_page, + playwright_browser=playwright_browser, + playwright_context=playwright_context, + playwright_frame=None + ) + CommonUtil.ExecLog(sModuleInfo, f"Created browser session: {session_name=}", 5) CommonUtil.ExecLog(sModuleInfo, f"Started {browser} browser", 1) Shared_Resources.Set_Shared_Variables("selenium_driver", selenium_driver) @@ -1018,13 +1090,14 @@ async def Go_To_Link(dataset: Dataset) -> ReturnType: chrome_version = None chrome_channel = None debug_port = False + session_name = "default" for left, mid, right in dataset: left = left.replace(" ", "").replace("_", "").replace("-", "").lower() if left == "gotolink": web_link = right.strip() elif left == "driverid": - driver_id = right.strip() + session_name = driver_id = right.strip() elif left in ("waittimetoappearelement", "waitforelement"): Shared_Resources.Set_Shared_Variables( "element_wait", float(right.strip()) @@ -1037,6 +1110,8 @@ async def Go_To_Link(dataset: Dataset) -> ReturnType: window_size_Y = int(resolution[1]) elif left == "chrome:version": chrome_version = right.strip() + elif left == "session": + session_name = driver_id = right.strip() # Capabilities are WebDriver attribute common across different browser elif mid.strip().lower() == "shared capability": @@ -1145,9 +1220,11 @@ async def Go_To_Link(dataset: Dataset) -> ReturnType: sModuleInfo, "Browser not previously opened, doing so now", 1 ) - if await Open_Browser(dependency["Browser"], browser_options) == "zeuz_failed": + if await Open_Browser(dependency["Browser"], browser_options, session_name) == "zeuz_failed": return "zeuz_failed" + selenium_driver = get_browser_session(session_name)["selenium_driver"] + if ConfigModule.get_config_value( "RunDefinition", "window_size_x" ) and ConfigModule.get_config_value("RunDefinition", "window_size_y"): @@ -1172,7 +1249,7 @@ async def Go_To_Link(dataset: Dataset) -> ReturnType: "remote-debugging-port": debug_port } else: - selenium_driver = selenium_details[driver_id]["driver"] + selenium_driver = get_browser_session(session_name)["selenium_driver"] Shared_Resources.Set_Shared_Variables("selenium_driver", selenium_driver) current_driver_id = driver_id except Exception: @@ -1219,7 +1296,7 @@ async def Go_To_Link(dataset: Dataset) -> ReturnType: # If the browser is closed but selenium instance is on, relaunch selenium_driver if Shared_Resources.Test_Shared_Variables("dependency"): dependency = Shared_Resources.Get_Shared_Variables("dependency") - result = await Open_Browser(dependency["Browser"], browser_options) + result = await Open_Browser(dependency["Browser"], browser_options, session_name) else: result = "zeuz_failed" @@ -1612,6 +1689,10 @@ def Enter_Text_In_Text_Box(step_data): use_js = False clear = True global selenium_driver + + # Handle session parameter + session_name, selenium_driver, current_driver_id = _handle_selenium_session(step_data) + Element = LocateElement.Get_Element(step_data, selenium_driver) if Element == "zeuz_failed": CommonUtil.ExecLog( @@ -1621,6 +1702,9 @@ def Enter_Text_In_Text_Box(step_data): for left, mid, right in step_data: mid = mid.strip().lower() left = left.strip().lower() + # Skip session parameter - already handled above + if left == "session" and mid == "optional parameter": + continue if mid == "action": text_value = right elif left == "delay": @@ -2055,8 +2139,14 @@ def Click_Element(data_set, retry=0): global selenium_driver use_js = False # Use js to click on element? try: + # Handle session parameter + session_name, selenium_driver, current_driver_id = _handle_selenium_session(data_set) + location = "" for row in data_set: + # Skip session parameter - already handled above + if row[0].lower().replace(" ", "").replace("_", "").replace("-", "") == "session" and row[1].strip().lower() == "optional parameter": + continue if row[0] == "offset" and row[1] == "optional parameter": location = row[ 2 @@ -3002,6 +3092,9 @@ def Validate_Text(step_data): sModuleInfo = inspect.currentframe().f_code.co_name + " : " + MODULE_NAME global selenium_driver try: + # Handle session parameter + session_name, selenium_driver, current_driver_id = _handle_selenium_session(step_data) + Element = LocateElement.Get_Element(step_data, selenium_driver) ignore_case = False zeuz_ai = None @@ -3011,6 +3104,9 @@ def Validate_Text(step_data): ) return "zeuz_failed" for each_step_data_item in step_data: + # Skip session parameter - already handled above + if each_step_data_item[0].lower().replace(" ", "").replace("_", "").replace("-", "") == "session" and each_step_data_item[1].strip().lower() == "optional parameter": + continue if each_step_data_item[1] == "action": expected_text_data = each_step_data_item[2] validation_type = each_step_data_item[0] @@ -3116,6 +3212,9 @@ def Scroll(step_data): sModuleInfo = inspect.currentframe().f_code.co_name + " : " + MODULE_NAME global selenium_driver try: + # Handle session parameter + session_name, selenium_driver, current_driver_id = _handle_selenium_session(step_data) + Element = None get_element = False scroll_direction = "" @@ -3123,6 +3222,10 @@ def Scroll(step_data): pixel = 750 for left, mid, right in step_data: mid = mid.strip().lower() + left_clean = left.replace(" ", "").replace("_", "").replace("-", "").lower() + # Skip session parameter - already handled above + if left_clean == "session" and mid == "optional parameter": + continue if "action" in mid: scroll_direction = right.strip().lower() elif mid == "element parameter": @@ -3399,12 +3502,62 @@ def Tear_Down_Selenium(step_data=[]): global current_driver_id try: driver_id = "" + session_name = None + + # Parse both driverid (legacy) and session (new) parameters for left, mid, right in step_data: left = left.replace(" ", "").replace("_", "").replace("-", "").lower() if left == "driverid": driver_id = right.strip() - - if not driver_id: + elif left == "session" and mid.strip().lower() == "optional parameter": + session_name = right.strip() + # For backward compatibility, treat session_name as driver_id + driver_id = session_name + + # Handle session-specific teardown + if session_name: + from Framework.Built_In_Automation.Web.utils import get_browser_session + existing_session = get_browser_session(session_name) + + if existing_session and existing_session.get("selenium_driver"): + try: + # Close the specific session's browser + session_driver = existing_session["selenium_driver"] + session_driver.quit() + CommonUtil.ExecLog(sModuleInfo, f"Teared down session '{session_name}'", 1) + except Exception as e: + errMsg = f"Unable to tear down session '{session_name}'. may already been killed" + CommonUtil.ExecLog(sModuleInfo, errMsg, 2) + CommonUtil.Exception_Handler(sys.exc_info(), None, errMsg) + + # Remove session from browser_sessions + browser_sessions = Shared_Resources.Get_Shared_Variables("browser_sessions", {}) + if session_name in browser_sessions: + del browser_sessions[session_name] + Shared_Resources.Set_Shared_Variables("browser_sessions", browser_sessions) + + # Remove from selenium_details if present + if session_name in selenium_details: + del selenium_details[session_name] + + # If this was the current driver, switch to another or clear + if current_driver_id == session_name: + if selenium_details: + for driver in selenium_details: + selenium_driver = selenium_details[driver]["driver"] + Shared_Resources.Set_Shared_Variables("selenium_driver", selenium_driver) + CommonUtil.ExecLog(sModuleInfo, f"Current driver switched to driver_id='{driver}'", 1) + current_driver_id = driver + break + else: + Shared_Resources.Remove_From_Shared_Variables("selenium_driver") + selenium_driver = None + current_driver_id = None + else: + CommonUtil.ExecLog(sModuleInfo, f"Session '{session_name}' not found. Nothing to tear down.", 2) + + # Handle existing driver_id logic (backwards compatibility) + elif not driver_id: CommonUtil.Join_Thread_and_Return_Result( "screenshot" ) # Let the capturing screenshot end in thread @@ -3422,6 +3575,8 @@ def Tear_Down_Selenium(step_data=[]): CommonUtil.ExecLog(sModuleInfo, errMsg, 2) CommonUtil.Exception_Handler(sys.exc_info(), None, errMsg) Shared_Resources.Remove_From_Shared_Variables("selenium_driver") + # Clear all browser sessions + Shared_Resources.Set_Shared_Variables("browser_sessions", {}) selenium_details = {} selenium_driver = None @@ -4601,16 +4756,23 @@ def drag_and_drop(dataset): def playwright(dataset): sModuleInfo = inspect.currentframe().f_code.co_name + " : " + MODULE_NAME global selenium_driver + global selenium_details + global current_driver_id try: from playwright.sync_api import sync_playwright + # Get the correct remote debugging port for current driver + debug_port = 9222 # fallback + if current_driver_id and current_driver_id in selenium_details: + debug_port = selenium_details[current_driver_id].get("remote-debugging-port", 9222) + devtools_url = ( selenium_driver.command_executor._url.replace("http://", "ws://") + "/devtools/browser" ) with sync_playwright() as p: # browser = p.chromium.connect(browserURL=devtools_url) - browser = p.chromium.connect_over_cdp("http://localhost:9222") + browser = p.chromium.connect_over_cdp(f"http://localhost:{debug_port}") page = browser.contexts[0].pages[0] # source = page.locator("//div[contains(text(), 'abcd')]") diff --git a/Framework/Built_In_Automation/Web/utils.py b/Framework/Built_In_Automation/Web/utils.py new file mode 100644 index 00000000..8329b25c --- /dev/null +++ b/Framework/Built_In_Automation/Web/utils.py @@ -0,0 +1,71 @@ +from Framework.Built_In_Automation.Shared_Resources import ( + BuiltInFunctionSharedResources as sr, +) +from playwright.async_api import Browser, BrowserContext, Frame, Page +from selenium.webdriver import Chrome, Firefox, Edge, Safari + + + +def initialize_browser_sessions(): + """ + Checks if `browser_sessions` shared variable is already initialized. + If not, initializes it as an empty dictionary. + """ + + if sr.Test_Shared_Variables("browser_sessions") == False: + sr.Set_Shared_Variables("browser_sessions", {}) + + +def create_browser_session( + session_name: str = "default", + selenium_driver: Chrome | Firefox | Edge | Safari | None = None, + playwright_page: Page | None = None, + playwright_browser: Browser | None = None, + playwright_context: BrowserContext | None = None, + playwright_frame: Frame | None = None +) -> dict: + """ + Creates a new browser session with the given parameters. + Replaces the session if it already exists with the given name. + + Args: + session_name (str): The name of the session. + selenium_driver (Chrome | Firefox | Edge | Safari): The Selenium WebDriver instance. + playwright_page (Page): The Playwright Page instance. + playwright_browser (Browser): The Playwright Browser instance. + playwright_context (BrowserContext): The Playwright BrowserContext instance. + playwright_frame (Frame): The Playwright Frame instance. + """ + + if sr.Test_Shared_Variables("browser_sessions") == False: + initialize_browser_sessions() + + browser_sessions = sr.Get_Shared_Variables("browser_sessions") + browser_sessions[session_name] = { + "selenium_driver": selenium_driver, + "playwright_page": playwright_page, + "playwright_browser": playwright_browser, + "playwright_context": playwright_context, + "playwright_frame": playwright_frame + } + sr.Set_Shared_Variables("browser_sessions", browser_sessions) + + return browser_sessions[session_name] + + +def get_browser_session(session_name: str) -> dict: + """ + Returns the browser session with the given name. + + Args: + session_name (str): The name of the session. + + Returns: + dict: The browser session with the given name. + """ + + if sr.Test_Shared_Variables("browser_sessions") == False: + initialize_browser_sessions() + + browser_sessions = sr.Get_Shared_Variables("browser_sessions") + return browser_sessions.get(session_name, {})