From db3f84be7f96bddb618730cb1ed873012b34f37c Mon Sep 17 00:00:00 2001 From: Guilherme Costa Date: Mon, 13 Apr 2026 17:51:37 +0100 Subject: [PATCH 1/2] bugfix: fix threading races and concurrency in moonrakerComm/RepeatedTimer --- BlocksScreen/lib/moonrakerComm.py | 204 ++++++++++++++++-------- BlocksScreen/lib/utils/RepeatedTimer.py | 46 +++--- 2 files changed, 161 insertions(+), 89 deletions(-) diff --git a/BlocksScreen/lib/moonrakerComm.py b/BlocksScreen/lib/moonrakerComm.py index 5f889d9f..74286e4e 100644 --- a/BlocksScreen/lib/moonrakerComm.py +++ b/BlocksScreen/lib/moonrakerComm.py @@ -21,7 +21,7 @@ class OneShotTokenError(Exception): """Raised when unable to get oneshot token to connect to a websocket""" def __init__(self, message="Unable to get oneshot token", errors=None) -> None: - super(OneShotTokenError).__init__(message, errors) + super().__init__(message, errors) self.errors = errors self.message = message @@ -30,10 +30,6 @@ class MoonWebSocket(QtCore.QObject, threading.Thread): """MoonWebSocket class object for creating a websocket connection to Moonraker.""" QUERY_KLIPPY_TIMEOUT: int = 2 - connected = False - connecting = False - callback_table = {} - _reconnect_count = 0 max_retries = 3 timeout = 3 @@ -45,9 +41,19 @@ class MoonWebSocket(QtCore.QObject, threading.Thread): query_server_info_signal = QtCore.pyqtSignal(name="query_server_information") def __init__(self, parent: QtCore.QObject) -> None: + """Initialize the websocket thread, timers, and Moonraker API helper.""" super().__init__(parent) self.daemon = True + self.connected = False + self.connecting = False + self.disconnected = False + self._reconnect_count = 0 + self.callback_table: dict = {} + + self._state_lock = threading.RLock() + self._request_lock = threading.Lock() + self._host = parent.config.get("host", parser=str, default="localhost") self._port = parent.config.get("port", parser=int, default=7125) @@ -55,10 +61,11 @@ def __init__(self, parent: QtCore.QObject) -> None: self._callback = None self._wst = None self._request_id = 0 - self.request_table = {} + self.request_table: dict = {} + self._klippy_retry_count = 0 self._moonRest = MoonRest(host=self._host, port=self._port) self.api: MoonAPI = MoonAPI(self) - self._retry_timer: RepeatedTimer + self._retry_timer: RepeatedTimer | None = None websocket.setdefaulttimeout(self.timeout) self.query_server_info_signal.connect(self.api.api_query_server_info) @@ -69,41 +76,56 @@ def __init__(self, parent: QtCore.QObject) -> None: self.klippy_state_signal.connect(self.api.request_printer_info) logger.info("Websocket object initialized") + @property + def moonRest(self) -> MoonRest: + """Returns the current moonrestAPI object""" + return self._moonRest + @QtCore.pyqtSlot(name="retry_wb_conn") def retry_wb_conn(self): """Retry websocket connection""" - if self.connecting is True and self.connected is False: - return False - self._reconnect_count = 0 + with self._state_lock: + if self.connecting is True and self.connected is False: + return False + self._reconnect_count = 0 self.try_connection() def try_connection(self): """Try connecting to websocket""" - self.connecting = True + with self._state_lock: + self.connecting = True + if self._retry_timer is not None: + self._retry_timer.stopTimer() self._retry_timer = RepeatedTimer(self.timeout, self.reconnect) return self.connect() def reconnect(self): """Reconnect to websocket""" - if self.connected: - return True - - if self._reconnect_count >= self.max_retries: - self._retry_timer.stopTimer() + with self._state_lock: + if self.connected: + return True + over_limit = self._reconnect_count >= self.max_retries + + if over_limit: + if self._retry_timer is not None: + self._retry_timer.stopTimer() unable_to_connect_event = WebSocketError( data="Unable to establish connection to Websocket" ) self.connecting_signal[int].emit(0) - self.connecting = False + with self._state_lock: + self.connecting = False try: instance = QtWidgets.QApplication.instance() if instance is not None: - instance.sendEvent(self.parent(), unable_to_connect_event) + instance.postEvent(self.parent(), unable_to_connect_event) else: raise TypeError("QApplication.instance expected ad non-None value") except Exception as e: logger.error( - f"Error on sending Event {unable_to_connect_event.__class__.__name__} | Error message: {e}" + "Error on sending Event %s | Error message: %s", + unable_to_connect_event.__class__.__name__, + e, ) logger.info( "Maximum number of connection retries reached, Unable to establish connection with Moonraker" @@ -113,17 +135,19 @@ def reconnect(self): def connect(self) -> bool: """Connect to websocket""" - if self.connected: - logger.info("Connection established") - return True - self._reconnect_count += 1 - self.connecting_signal[int].emit(int(self._reconnect_count)) + with self._state_lock: + if self.connected: + logger.info("Connection established") + return True + self._reconnect_count += 1 + _count_snapshot = self._reconnect_count + self.connecting_signal[int].emit(int(_count_snapshot)) logger.debug( - f"Establishing connection to Moonraker...\n Try number {self._reconnect_count}" + "Establishing connection to Moonraker...\n Try number %d", + _count_snapshot, ) - # TODO Handle if i cannot connect to moonraker, request server.info and see if i get a result try: - _oneshot_token = self._moonRest.get_oneshot_token() + _oneshot_token = self.moonRest.get_oneshot_token() if _oneshot_token is None: raise OneShotTokenError("Unable to retrieve oneshot token") except Exception as e: @@ -152,7 +176,7 @@ def connect(self) -> bool: logger.debug(self.ws.url) self._wst.start() except Exception as e: - logger.info(f"Unexpected while starting websocket {self._wst.name}: {e}") + logger.info("Unexpected while starting websocket %s: %s", self._wst.name, e) return False return True @@ -161,17 +185,17 @@ def wb_disconnect(self) -> None: if self._wst is not None and self.ws is not None: self.ws.close() if self._wst.is_alive(): - self._wst.join() + self._wst.join(timeout=self.timeout + 1) logger.info("Websocket closed") def on_error(self, *args) -> None: """Websocket error callback""" # First argument is ws second is error message - # TODO: Handle error messages _error = args[1] if len(args) == 2 else args[0] - logger.info(f"Websocket error, disconnected: {_error}") - self.connected = False - self.disconnected = True + logger.error("Websocket error, disconnected: %s", _error) + with self._state_lock: + self.connected = False + self.disconnected = True def on_close(self, *args) -> None: """Websocket on close callback @@ -184,7 +208,8 @@ def on_close(self, *args) -> None: return _close_status_code = args[1] if len(args) == 3 else None _close_message = args[2] if len(args) == 3 else None - self.connected = False + with self._state_lock: + self.connected = False self.ws.keep_running = False self.connection_lost[str].emit( f"code: {_close_status_code} | message {_close_message}" @@ -220,8 +245,10 @@ def on_open(self, *args) -> None: TypeError: When QApplication.instance `is` None """ _ws = args[0] if len(args) == 1 else None - self.connecting = False - self.connected = True + with self._state_lock: + self.connecting = False + self.connected = True + self._klippy_retry_count = 0 self.evaluate_klippy_status() open_event = WebSocketOpen(data="Connected") try: @@ -231,11 +258,12 @@ def on_open(self, *args) -> None: else: raise TypeError("QApplication.instance expected non None value") except Exception as e: - logger.info(f"Unexpected error opening websocket: {e}") + logger.info("Unexpected error opening websocket: %s", e) self.connected_signal.emit() - self._retry_timer.stopTimer() - logger.info(f"Connection to websocket achieved on {_ws}") + if self._retry_timer is not None: + self._retry_timer.stopTimer() + logger.info("Connection to websocket achieved on %s", _ws) def on_message(self, *args) -> None: """Websocket on message callback @@ -247,24 +275,44 @@ def on_message(self, *args) -> None: args[1] if len(args) == 2 else args[0] ) # First argument is ws second is message - response: dict = json.loads(_message) - if "id" in response and response["id"] in self.request_table: - _entry = self.request_table.pop(response["id"]) + try: + response: dict = json.loads(_message) + except json.JSONDecodeError as e: + logger.error("Failed to decode websocket message: %s", e) + return + if "id" in response: + with self._request_lock: + _entry = self.request_table.pop(response["id"], None) + else: + _entry = None + + if _entry is not None: if "server.info" in _entry[0]: - if response["result"]["klippy_state"] == "ready": + if "error" in response: + return + _result = response.get("result", {}) + _klippy_state = _result.get("klippy_state") + if not _klippy_state: + return + if _klippy_state == "ready": self.query_klippy_status_timer.stopTimer() + self._klippy_retry_count = 0 self.api.update_status() # Request update status immediately after klippy ready DEVDEBT - elif response["result"]["klippy_state"] == "startup": - # request server.info in 2 seconds - if not self.query_klippy_status_timer.running: - self.query_klippy_status_timer.startTimer() - elif response["result"]["klippy_state"] == "disconnected": - if not self.query_klippy_status_timer.running: + elif _klippy_state in ("startup", "disconnected"): + self._klippy_retry_count += 1 + if self._klippy_retry_count >= 30: + self.query_klippy_status_timer.stopTimer() + logger.error( + "Klippy startup sequence timed out after %d retries (state=%s)", + self._klippy_retry_count, + _klippy_state, + ) + elif not self.query_klippy_status_timer.running: self.query_klippy_status_timer.startTimer() self.klippy_connected_signal.emit( - response["result"]["klippy_connected"] + _result.get("klippy_connected", False) ) - self.klippy_state_signal.emit(response["result"]["klippy_state"]) + self.klippy_state_signal.emit(_klippy_state) return else: if "error" in response: @@ -276,13 +324,15 @@ def on_message(self, *args) -> None: else: message_event = WebSocketMessageReceived( method=str(_entry[0]), - data=response["result"], + data=response.get("result", {}), metadata=_entry, ) elif "method" in response: if ( str(response["method"]).lower() == "notify_klippy_disconnected" ): # Checkout for notify_klippy_disconnect + self.klippy_state_signal.emit("disconnected") + self._klippy_retry_count = 0 self.evaluate_klippy_status() message_event = ( @@ -292,6 +342,8 @@ def on_message(self, *args) -> None: metadata=None, ) ) + else: + return try: instance = QtWidgets.QApplication.instance() @@ -300,36 +352,46 @@ def on_message(self, *args) -> None: else: raise TypeError("QApplication.instance expected non None value") except Exception as e: - logger.info(f"Unexpected error while creating websocket message event: {e}") + logger.info( + "Unexpected error while creating websocket message event: %s", e + ) - def send_request(self, method: str, params: dict = {}) -> bool: + def send_request(self, method: str, params: dict | None = None) -> bool: """Send a request over the websocket Args: method (str): Websocket method name - params (dict, optional): parameters for the websocket method. Defaults to {}. + params (dict, optional): parameters for the websocket method. Defaults to None. Returns: bool: Whether the method finished and a request was sent """ - if not self.connected or self.ws is None: + if params is None: + params = {} + with self._state_lock: + _connected = self.connected + _ws = self.ws + if not _connected or _ws is None: return False - self._request_id += 1 - self.request_table[self._request_id] = [method, params] + with self._request_lock: + self._request_id += 1 + _rid = self._request_id + self.request_table[_rid] = [method, params] + packet = { "jsonrpc": "2.0", "method": method, "params": params, - "id": self._request_id, + "id": _rid, } - self.ws.send(json.dumps(packet)) + _ws.send(json.dumps(packet)) return True class MoonAPI(QtCore.QObject): def __init__(self, ws: MoonWebSocket): - super(MoonAPI, self).__init__(ws) + super().__init__(ws) self._ws: MoonWebSocket = ws @QtCore.pyqtSlot(name="api_query_server_info") @@ -446,11 +508,11 @@ def restart_service(self, service): @QtCore.pyqtSlot(name="firmware_restart") def firmware_restart(self): - """Request Klipper firmware restart + """`POST MoonrakerAPI` /printer/firmware_restart + Firmware restart to Klipper - HTTP_REQUEST: POST /printer/firmware_restart - - JSON_RPC_REQUEST: printer.firmware_restart + Returns: + str: Returns an 'ok' from Moonraker """ return self._ws.send_request(method="printer.firmware_restart") @@ -605,7 +667,7 @@ def move_file(self, source_dir: str, dest_dir: str): isinstance(source_dir, str) is False or isinstance(dest_dir, str) is False or source_dir is None - or dest_dir is False + or dest_dir is None ): return False return self._ws.send_request( @@ -619,7 +681,7 @@ def copy_file(self, source_dir: str, dest_dir: str): isinstance(source_dir, str) is False or isinstance(dest_dir, str) is False or source_dir is None - or dest_dir is False + or dest_dir is None ): return False return self._ws.send_request( @@ -732,7 +794,7 @@ def update_status(self, refresh: bool = False) -> bool: @QtCore.pyqtSlot(name="update-refresh") @QtCore.pyqtSlot(str, name="update-refresh") - def refresh_update_status(self, name: str = None) -> bool: + def refresh_update_status(self, name: str | None = None) -> bool: """Refresh packages state""" if isinstance(name, str): return self._ws.send_request( @@ -763,7 +825,9 @@ def update_client(self, client_name: str = "") -> bool: """Issue client update""" if not isinstance(client_name, str) or not client_name: return False - return self._ws.send_request(method="machine.update.client") + return self._ws.send_request( + method="machine.update.client", params={"name": client_name} + ) @QtCore.pyqtSlot(name="update-system") def update_system(self): @@ -787,7 +851,7 @@ def rollback_update(self, name: str): if not isinstance(name, str) or not name: return False return self._ws.send_request( - method="machine,update.rollback", params={"name": name} + method="machine.update.rollback", params={"name": name} ) def history_list(self, limit, start, since, before, order): diff --git a/BlocksScreen/lib/utils/RepeatedTimer.py b/BlocksScreen/lib/utils/RepeatedTimer.py index 42b42aa0..698c2ce8 100644 --- a/BlocksScreen/lib/utils/RepeatedTimer.py +++ b/BlocksScreen/lib/utils/RepeatedTimer.py @@ -10,6 +10,7 @@ def __init__( *args, **kwargs, ): + """Initialize a repeating timer that invokes callback every timeout seconds.""" super().__init__(daemon=True) self.name = name self._timeout = timeout @@ -17,6 +18,7 @@ def __init__( self._args = args self._kwargs = kwargs + self._lock = threading.Lock() self.running = False self.timeoutEvent = threading.Event() self.stopEvent = threading.Event() @@ -24,36 +26,42 @@ def __init__( self.startTimer() def _run(self): - self.running = False - self.startTimer() - self.stopEvent.wait() + """Invoke the callback and restart the timer loop, unless stopped.""" + with self._lock: + self.running = False + if self.stopEvent.is_set(): + return if callable(self._function): self._function(*self._args, **self._kwargs) + self.startTimer() def startTimer(self): """Start timer""" - if self.running is False: + with self._lock: + if self.running: + return + self.stopEvent.clear() try: - self._timer = threading.Timer(self._timeout, self._run) - self._timer.daemon = True - self._timer.start() - if not self.stopEvent.is_set(): - self.stopEvent.set() + timer = threading.Timer(self._timeout, self._run) + timer.daemon = True + self._timer = timer + self.running = True except Exception as e: + self.running = False raise Exception( f"RepeatedTimer {self.name} error while starting timer, error: {e}" - ) - finally: - self.running = False - self.running = True + ) from e + # Start outside the lock to avoid holding it during thread creation + timer.start() def stopTimer(self): """Stop timer""" - if self._timer is None: - return - if self.running: - self._timer.cancel() - self._timer.join() + with self._lock: + if self._timer is None or not self.running: + return + timer = self._timer self._timer = None - self.stopEvent.clear() self.running = False + self.stopEvent.set() + timer.cancel() + timer.join() From 4573e8535aa75a5392da174df339a3117e59f2ae Mon Sep 17 00:00:00 2001 From: Guilherme Costa Date: Mon, 13 Apr 2026 18:44:24 +0100 Subject: [PATCH 2/2] fix: eliminate RepeatedTimer stop race by checking stopEvent under lock --- BlocksScreen/lib/utils/RepeatedTimer.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/BlocksScreen/lib/utils/RepeatedTimer.py b/BlocksScreen/lib/utils/RepeatedTimer.py index 698c2ce8..6e059a74 100644 --- a/BlocksScreen/lib/utils/RepeatedTimer.py +++ b/BlocksScreen/lib/utils/RepeatedTimer.py @@ -29,8 +29,8 @@ def _run(self): """Invoke the callback and restart the timer loop, unless stopped.""" with self._lock: self.running = False - if self.stopEvent.is_set(): - return + if self.stopEvent.is_set(): + return if callable(self._function): self._function(*self._args, **self._kwargs) self.startTimer() @@ -62,6 +62,6 @@ def stopTimer(self): timer = self._timer self._timer = None self.running = False - self.stopEvent.set() + self.stopEvent.set() timer.cancel() timer.join()