Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 57 additions & 12 deletions lcu_driver/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,70 @@
import logging
import time
from abc import ABC, abstractmethod
from psutil import Process # Type annotation only

from .connection import Connection
from .events.managers import ConnectorEventManager, WebsocketEventManager
from .utils import _return_ux_process
from .exceptions import NoLeagueClientDetected # Handles one case of the number of League Clients

logger = logging.getLogger('lcu-driver')

def chooseClient(processList: list[Process]): # Allows users to select one running League Client
if isinstance(processList, list) and all(map(lambda x: isinstance(x, Process), processList)):
if len(processList) == 0:
raise NoLeagueClientDetected()
elif len(processList) == 1:
process = processList[0]
else:
print('Multiple League Clients are detected. Please select one process to continue: (Submit "0" to exit.)')
# Optimize the prompt layout by formatting each column's width
indexWidth: int = max(map(lambda x: len(str(x + 1)), range(len(processList)))) + 2
pidWidth: int = max(map(lambda x: len(str(x.pid)), processList)) + 2
statusWidth: int = max(map(lambda x: len(str(x.status())), processList)) + 2
createTimeWidth: int = max(map(lambda x: len(str(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(x.create_time())))), processList)) + 2
filePathWidth: int = max(map(lambda x: len(str(x.exe())), processList)) + 2
print("{0:^{5}}{1:^{6}}{2:^{7}}{3:^{8}}{4:^{9}}".format("No.", "pid", "status", "createTime", "filePath", indexWidth, pidWidth, statusWidth, createTimeWidth, filePathWidth))
latest_index: int = 0 # The index of the process with the latest creation time
max_procCreateTime: str = "" # An intermediate variable to store the latest creation time. It obeys one-to-one correspondence with the integer timestamp, so comparisons can be made on it
for i in range(len(processList)):
process = processList[i]
procId: int = process.pid
procStatus: str = process.status() # Based on the status, this table may only display the running processes in a future commit
procCreateTime: str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(process.create_time()))
procFilePath: str = process.exe()
print("{0:^{5}}{1:^{6}}{2:^{7}}{3:^{8}}{4:^{9}}".format(i + 1, procId, procStatus, procCreateTime, procFilePath, indexWidth, pidWidth, statusWidth, createTimeWidth, filePathWidth))
if max_procCreateTime < procCreateTime:
max_procCreateTime = procCreateTime
latest_index = i
while True:
processIndex_str: str = input()
if processIndex_str == "": # Enter nothing to select the default option
processIndex: int = latest_index
break
if processIndex_str == "0":
exit(0)
elif processIndex_str in set(map(str, range(1, len(processList) + 1))):
processIndex = int(processIndex_str) - 1
break
else:
print("Please input an integer between 1 and %d." %(len(processList)))
process = processList[processIndex]
return process
else:
raise TypeError('invalid type of parameter "processList". Pass a list of Process objects instead')

class BaseConnector(ConnectorEventManager, ABC):
def __init__(self, loop=None):
super().__init__()
self.loop = loop or asyncio.get_event_loop()
if loop is not None:
self.loop = loop
else:
try:
self.loop = asyncio.get_event_loop()
except RuntimeError:
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
self.ws = WebsocketEventManager()

@abstractmethod
Expand Down Expand Up @@ -54,11 +106,8 @@ def start(self) -> None:
"""
try:
def wrapper():
process = next(_return_ux_process(), None)
while not process:
process = next(_return_ux_process(), None)
time.sleep(0.5)

processList: list[Process] = _return_ux_process()
process = chooseClient(processList)
connection = Connection(self, process)
self.register_connection(connection)
self.loop.run_until_complete(connection.init())
Expand Down Expand Up @@ -109,15 +158,11 @@ async def _astart(self):
tasks = []
try:
while True:
process_iter = _return_ux_process()

process = next(process_iter, None)
while process:
processList: list[Process] = _return_ux_process()
for process in processList:
connection = Connection(self, process)
if not self._process_was_initialized(connection):
tasks.append(asyncio.create_task(connection.init()))

process = next(process_iter, None)
await asyncio.sleep(0.5)

except KeyboardInterrupt:
Expand Down
4 changes: 4 additions & 0 deletions lcu_driver/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,7 @@ class InvalidURI(BaseException):
def __init__(self, error_type, used_uri=None):
if error_type == 'backslash':
super().__init__(f'every endpoint must start with a backslash, replace {used_uri} by /{used_uri}')

class NoLeagueClientDetected(BaseException):
def __init__(self):
super().__init__("The program didn't detect a running League Client.")
44 changes: 26 additions & 18 deletions lcu_driver/utils.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,36 @@
from typing import Dict, Generator, List
import platform
from psutil import ZombieProcess, AccessDenied, Process, process_iter

from psutil import STATUS_ZOMBIE, Process, process_iter


def parse_cmdline_args(cmdline_args) -> Dict[str, str]:
cmdline_args_parsed = {}
def parse_cmdline_args(cmdline_args: list[str]) -> dict[str, str]:
cmdline_args_parsed: dict[str, str] = {}
for cmdline_arg in cmdline_args:
if len(cmdline_arg) > 0 and "=" in cmdline_arg:
key, value = cmdline_arg[2:].split("=", 1)
cmdline_args_parsed[key] = value
return cmdline_args_parsed


def _return_ux_process() -> Generator[Process, None, None]:
for process in process_iter(attrs=["cmdline"]):
if process.status() == STATUS_ZOMBIE:
def _return_ux_process() -> list[Process]:
processList: list[Process] = []
osPlatform: str = platform.system() # Distinguish the operating system preemptively
seen_pid: set[int] = set() # Ensure processList's uniqueness
for process in process_iter(): #attrs=["cmdline"] greatly increases time cost on Windows
try:
name: str = process.name()
if osPlatform in {"Linux", "Darwin"}:
cmdline: list[str] = process.cmdline()
except (ZombieProcess, AccessDenied): # Accessing the status method significantly increases time expense. This try-except statement should optimize this issue
continue

cmdline: List[str] = process.info.get("cmdline", [])

if process.name() in ["LeagueClientUx.exe", "LeagueClientUx"]:
yield process

# Check cmdline for the executable, especially useful in Linux environments
# where process names might differ due to compatibility layers like wine.
if cmdline and cmdline[0].endswith("LeagueClientUx.exe"):
yield process
else:
if name in {"LeagueClientUx.exe", "LeagueClientUx"}:
processList.append(process)
seen_pid.add(process.pid)

if osPlatform in {"Linux", "Darwin"}: # In case the same process would be added multiple times on Windows
# Check cmdline for the executable, especially useful in Linux environments
# where process names might differ due to compatibility layers like wine.
if cmdline and cmdline[0].endswith("LeagueClientUx.exe") and not process.pid in seen_pid:
processList.append(process)
seen_pid.add(process.pid)
return processList