diff --git a/lcu_driver/connector.py b/lcu_driver/connector.py index 694b85d..b313552 100644 --- a/lcu_driver/connector.py +++ b/lcu_driver/connector.py @@ -2,18 +2,70 @@ import logging import time from abc import ABC, abstractmethod +from psutil import Process # Type annotation only from .connection import Connection from .events.managers import ConnectorEventManager, WebsocketEventManager from .utils import _return_ux_process +from .exceptions import NoLeagueClientDetected # Handles one case of the number of League Clients logger = logging.getLogger('lcu-driver') +def chooseClient(processList: list[Process]): # Allows users to select one running League Client + if isinstance(processList, list) and all(map(lambda x: isinstance(x, Process), processList)): + if len(processList) == 0: + raise NoLeagueClientDetected() + elif len(processList) == 1: + process = processList[0] + else: + print('Multiple League Clients are detected. Please select one process to continue: (Submit "0" to exit.)') + # Optimize the prompt layout by formatting each column's width + indexWidth: int = max(map(lambda x: len(str(x + 1)), range(len(processList)))) + 2 + pidWidth: int = max(map(lambda x: len(str(x.pid)), processList)) + 2 + statusWidth: int = max(map(lambda x: len(str(x.status())), processList)) + 2 + createTimeWidth: int = max(map(lambda x: len(str(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(x.create_time())))), processList)) + 2 + filePathWidth: int = max(map(lambda x: len(str(x.exe())), processList)) + 2 + print("{0:^{5}}{1:^{6}}{2:^{7}}{3:^{8}}{4:^{9}}".format("No.", "pid", "status", "createTime", "filePath", indexWidth, pidWidth, statusWidth, createTimeWidth, filePathWidth)) + latest_index: int = 0 # The index of the process with the latest creation time + max_procCreateTime: str = "" # An intermediate variable to store the latest creation time. It obeys one-to-one correspondence with the integer timestamp, so comparisons can be made on it + for i in range(len(processList)): + process = processList[i] + procId: int = process.pid + procStatus: str = process.status() # Based on the status, this table may only display the running processes in a future commit + procCreateTime: str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(process.create_time())) + procFilePath: str = process.exe() + print("{0:^{5}}{1:^{6}}{2:^{7}}{3:^{8}}{4:^{9}}".format(i + 1, procId, procStatus, procCreateTime, procFilePath, indexWidth, pidWidth, statusWidth, createTimeWidth, filePathWidth)) + if max_procCreateTime < procCreateTime: + max_procCreateTime = procCreateTime + latest_index = i + while True: + processIndex_str: str = input() + if processIndex_str == "": # Enter nothing to select the default option + processIndex: int = latest_index + break + if processIndex_str == "0": + exit(0) + elif processIndex_str in set(map(str, range(1, len(processList) + 1))): + processIndex = int(processIndex_str) - 1 + break + else: + print("Please input an integer between 1 and %d." %(len(processList))) + process = processList[processIndex] + return process + else: + raise TypeError('invalid type of parameter "processList". Pass a list of Process objects instead') class BaseConnector(ConnectorEventManager, ABC): def __init__(self, loop=None): super().__init__() - self.loop = loop or asyncio.get_event_loop() + if loop is not None: + self.loop = loop + else: + try: + self.loop = asyncio.get_event_loop() + except RuntimeError: + self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.loop) self.ws = WebsocketEventManager() @abstractmethod @@ -54,11 +106,8 @@ def start(self) -> None: """ try: def wrapper(): - process = next(_return_ux_process(), None) - while not process: - process = next(_return_ux_process(), None) - time.sleep(0.5) - + processList: list[Process] = _return_ux_process() + process = chooseClient(processList) connection = Connection(self, process) self.register_connection(connection) self.loop.run_until_complete(connection.init()) @@ -109,15 +158,11 @@ async def _astart(self): tasks = [] try: while True: - process_iter = _return_ux_process() - - process = next(process_iter, None) - while process: + processList: list[Process] = _return_ux_process() + for process in processList: connection = Connection(self, process) if not self._process_was_initialized(connection): tasks.append(asyncio.create_task(connection.init())) - - process = next(process_iter, None) await asyncio.sleep(0.5) except KeyboardInterrupt: diff --git a/lcu_driver/exceptions.py b/lcu_driver/exceptions.py index bb37fdb..50e62c0 100644 --- a/lcu_driver/exceptions.py +++ b/lcu_driver/exceptions.py @@ -19,3 +19,7 @@ class InvalidURI(BaseException): def __init__(self, error_type, used_uri=None): if error_type == 'backslash': super().__init__(f'every endpoint must start with a backslash, replace {used_uri} by /{used_uri}') + +class NoLeagueClientDetected(BaseException): + def __init__(self): + super().__init__("The program didn't detect a running League Client.") diff --git a/lcu_driver/utils.py b/lcu_driver/utils.py index c1cf626..4ac6934 100644 --- a/lcu_driver/utils.py +++ b/lcu_driver/utils.py @@ -1,10 +1,9 @@ -from typing import Dict, Generator, List +import platform +from psutil import ZombieProcess, AccessDenied, Process, process_iter -from psutil import STATUS_ZOMBIE, Process, process_iter - -def parse_cmdline_args(cmdline_args) -> Dict[str, str]: - cmdline_args_parsed = {} +def parse_cmdline_args(cmdline_args: list[str]) -> dict[str, str]: + cmdline_args_parsed: dict[str, str] = {} for cmdline_arg in cmdline_args: if len(cmdline_arg) > 0 and "=" in cmdline_arg: key, value = cmdline_arg[2:].split("=", 1) @@ -12,17 +11,26 @@ def parse_cmdline_args(cmdline_args) -> Dict[str, str]: return cmdline_args_parsed -def _return_ux_process() -> Generator[Process, None, None]: - for process in process_iter(attrs=["cmdline"]): - if process.status() == STATUS_ZOMBIE: +def _return_ux_process() -> list[Process]: + processList: list[Process] = [] + osPlatform: str = platform.system() # Distinguish the operating system preemptively + seen_pid: set[int] = set() # Ensure processList's uniqueness + for process in process_iter(): #attrs=["cmdline"] greatly increases time cost on Windows + try: + name: str = process.name() + if osPlatform in {"Linux", "Darwin"}: + cmdline: list[str] = process.cmdline() + except (ZombieProcess, AccessDenied): # Accessing the status method significantly increases time expense. This try-except statement should optimize this issue continue - - cmdline: List[str] = process.info.get("cmdline", []) - - if process.name() in ["LeagueClientUx.exe", "LeagueClientUx"]: - yield process - - # Check cmdline for the executable, especially useful in Linux environments - # where process names might differ due to compatibility layers like wine. - if cmdline and cmdline[0].endswith("LeagueClientUx.exe"): - yield process + else: + if name in {"LeagueClientUx.exe", "LeagueClientUx"}: + processList.append(process) + seen_pid.add(process.pid) + + if osPlatform in {"Linux", "Darwin"}: # In case the same process would be added multiple times on Windows + # Check cmdline for the executable, especially useful in Linux environments + # where process names might differ due to compatibility layers like wine. + if cmdline and cmdline[0].endswith("LeagueClientUx.exe") and not process.pid in seen_pid: + processList.append(process) + seen_pid.add(process.pid) + return processList