From e14dc708db0a1a284106f3069e3fb5eb63e5cad9 Mon Sep 17 00:00:00 2001 From: Guilherme Costa Date: Mon, 13 Apr 2026 17:52:51 +0100 Subject: [PATCH 1/3] bugfix: fix 30+ Pi runtime crashes across UI panels and widgets --- BlocksScreen/lib/panels/filamentTab.py | 63 +++--- BlocksScreen/lib/panels/mainWindow.py | 118 ++++++++---- BlocksScreen/lib/panels/networkWindow.py | 43 +++-- BlocksScreen/lib/panels/utilitiesTab.py | 36 ++-- BlocksScreen/lib/panels/widgets/basePopup.py | 34 ++-- BlocksScreen/lib/panels/widgets/cancelPage.py | 61 +++--- .../lib/panels/widgets/confirmPage.py | 84 +++++--- .../lib/panels/widgets/connectionPage.py | 14 +- .../lib/panels/widgets/inputshaperPage.py | 21 +- .../lib/panels/widgets/notificationPage.py | 27 ++- .../lib/panels/widgets/probeHelperPage.py | 87 +++++---- .../lib/panels/widgets/sensorsPanel.py | 40 ++-- .../lib/panels/widgets/troubleshootPage.py | 9 +- BlocksScreen/lib/printer.py | 180 ++++++++++-------- BlocksScreen/screensaver.py | 26 ++- 15 files changed, 482 insertions(+), 361 deletions(-) diff --git a/BlocksScreen/lib/panels/filamentTab.py b/BlocksScreen/lib/panels/filamentTab.py index 04fc5ef4..f5849f7b 100644 --- a/BlocksScreen/lib/panels/filamentTab.py +++ b/BlocksScreen/lib/panels/filamentTab.py @@ -1,12 +1,11 @@ import enum +import typing from functools import partial - -from lib.printer import Printer from lib.filament import Filament -from lib.ui.filamentStackedWidget_ui import Ui_filamentStackedWidget - from lib.panels.widgets.popupDialogWidget import Popup +from lib.printer import Printer +from lib.ui.filamentStackedWidget_ui import Ui_filamentStackedWidget from PyQt6 import QtCore, QtGui, QtWidgets @@ -18,6 +17,9 @@ class FilamentTab(QtWidgets.QStackedWidget): request_toolhead_count = QtCore.pyqtSignal(int, name="toolhead_number_received") run_gcode = QtCore.pyqtSignal(str, name="run_gcode") call_load_panel = QtCore.pyqtSignal(bool, str, name="call-load-panel") + filament_type_changed: typing.ClassVar[QtCore.pyqtSignal] = QtCore.pyqtSignal( + str, name="filament-type-changed" + ) class FilamentTypes(enum.Enum): PLA = Filament(name="PLA", temperature=220) @@ -44,6 +46,7 @@ def __init__(self, parent: QtWidgets.QWidget, printer: Printer, ws, /) -> None: self.has_load_unload_objects = None self._filament_state = self.FilamentStates.UNKNOWN self._sensor_states = {} + self._loaded_filament_name: str = "" self.filament_type: Filament | None = None self.panel.filament_page_load_btn.clicked.connect( partial(self.change_page, self.indexOf(self.panel.load_page)) @@ -52,22 +55,22 @@ def __init__(self, parent: QtWidgets.QWidget, printer: Printer, ws, /) -> None: self.panel.load_custom_btn.hide() self.panel.load_header_back_button.clicked.connect(self.back_button) self.panel.load_pla_btn.clicked.connect( - partial(self.load_filament, toolhead=0, temp=220) + partial(self.load_filament, toolhead=0, temp=220, name="PLA") ) self.panel.load_petg_btn.clicked.connect( - partial(self.load_filament, toolhead=0, temp=240) + partial(self.load_filament, toolhead=0, temp=240, name="PETG") ) self.panel.load_abs_btn.clicked.connect( - partial(self.load_filament, toolhead=0, temp=250) + partial(self.load_filament, toolhead=0, temp=250, name="ABS") ) self.panel.load_hips_btn.clicked.connect( - partial(self.load_filament, toolhead=0, temp=250) + partial(self.load_filament, toolhead=0, temp=250, name="HIPS") ) self.panel.load_nylon_btn.clicked.connect( - partial(self.load_filament, toolhead=0, temp=270) + partial(self.load_filament, toolhead=0, temp=270, name="Nylon") ) self.panel.load_tpu_btn.clicked.connect( - partial(self.load_filament, toolhead=0, temp=230) + partial(self.load_filament, toolhead=0, temp=230, name="TPU") ) self.panel.filament_page_unload_btn.clicked.connect( lambda: self.unload_filament(toolhead=0, temp=250) @@ -122,32 +125,32 @@ def on_extruder_update( self, extruder_name: str, field: str, new_value: float ) -> None: """Handle extruder update""" - if not self.isVisible: + if not self.isVisible(): return if not self.loadignore or not self.unloadignore: if self.target_temp != 0: if self.current_temp == self.target_temp: - if self.isVisible: + if self.isVisible(): self.call_load_panel.emit( True, "Extruder heated up \n Please wait" ) return if field == "temperature": self.current_temp = round(new_value, 0) - if self.isVisible: + if self.isVisible(): self.call_load_panel.emit( True, f"Heating up ({new_value}/{self.target_temp}) \n Please wait", ) if field == "target": self.target_temp = round(new_value, 0) - if self.isVisible: + if self.isVisible(): self.call_load_panel.emit(True, "Heating up \n Please wait") @QtCore.pyqtSlot(bool, name="on_load_filament") def on_load_filament(self, status: bool): """Handle load filament object updated""" - if not self.isVisible: + if not self.isVisible(): return if self.loadignore: return @@ -158,12 +161,14 @@ def on_load_filament(self, status: bool): self.target_temp = 0 self.call_load_panel.emit(False, "") self._filament_state = self.FilamentStates.LOADED + if self._loaded_filament_name: + self.filament_type_changed.emit(self._loaded_filament_name) self.handle_filament_state() @QtCore.pyqtSlot(bool, name="on_unload_filament") def on_unload_filament(self, status: bool): """Handle unload filament object updated""" - if not self.isVisible: + if not self.isVisible(): return if self.unloadignore: return @@ -174,12 +179,19 @@ def on_unload_filament(self, status: bool): self.call_load_panel.emit(False, "") self.target_temp = 0 self._filament_state = self.FilamentStates.UNLOADED + self._loaded_filament_name = "" self.handle_filament_state() - @QtCore.pyqtSlot(int, int, name="load_filament") - def load_filament(self, toolhead: int = 0, temp: int = 220) -> None: - """Handle load filament buttons clicked""" - if not self.isVisible: + @QtCore.pyqtSlot(int, int, str, name="load_filament") + def load_filament(self, toolhead: int = 0, temp: int = 220, name: str = "") -> None: + """Handle load filament buttons clicked. + + Args: + toolhead: Toolhead index. + temp: Target temperature for loading. + name: Filament type name (e.g. "PLA", "PETG"). + """ + if not self.isVisible(): return if self._filament_state == self.FilamentStates.UNKNOWN: @@ -194,6 +206,7 @@ def load_filament(self, toolhead: int = 0, temp: int = 220) -> None: message="Filament is already loaded.", ) return + self._loaded_filament_name = name self.loadignore = False self.call_load_panel.emit(True, "Loading Filament") self.run_gcode.emit(f"LOAD_FILAMENT TOOLHEAD=load_toolhead TEMPERATURE={temp}") @@ -201,7 +214,7 @@ def load_filament(self, toolhead: int = 0, temp: int = 220) -> None: @QtCore.pyqtSlot(str, int, name="unload_filament") def unload_filament(self, toolhead: int = 0, temp: int = 220) -> None: """Handle unload filament button clicked""" - if not self.isVisible: + if not self.isVisible(): return if self._filament_state == self.FilamentStates.UNKNOWN: @@ -260,15 +273,15 @@ def find_routine_objects(self): _available_objects = self.printer.available_objects.copy() - if "load_filament" in _available_objects.keys(): + if "load_filament" in _available_objects: self.has_load_unload_objects = True return True - if "unload_filament" in _available_objects.keys(): + if "unload_filament" in _available_objects: self.has_load_unload_objects = True return True - if "gcode_macro LOAD_FILAMENT" in _available_objects.keys(): + if "gcode_macro LOAD_FILAMENT" in _available_objects: return True - if "gcode_macro UNLOAD_FILAMENT" in _available_objects.keys(): + if "gcode_macro UNLOAD_FILAMENT" in _available_objects: return True return False diff --git a/BlocksScreen/lib/panels/mainWindow.py b/BlocksScreen/lib/panels/mainWindow.py index 12fbc628..baca9b52 100644 --- a/BlocksScreen/lib/panels/mainWindow.py +++ b/BlocksScreen/lib/panels/mainWindow.py @@ -106,7 +106,7 @@ class MainWindow(QtWidgets.QMainWindow): def __init__(self): """Set up UI, instantiate subsystems, and wire all inter-component signals.""" - super(MainWindow, self).__init__() + super().__init__() self.config: BlocksScreenConfig = get_configparser() self.ui = Ui_MainWindow() self.ui.setupUi(self) @@ -125,6 +125,7 @@ def __init__(self): self.mc = MachineControl(self) self.file_data = Files(self, self.ws) self.index_stack = deque(maxlen=4) + self._printing_active = False self.printer = Printer(self, self.ws) self.conn_window = ConnectionPage(self, self.ws) self.update_page = UpdatePage(self) @@ -154,6 +155,7 @@ def __init__(self): self.printPanel.request_change_page.connect(slot=self.global_change_page) self.filamentPanel.request_back.connect(slot=self.global_back) self.filamentPanel.request_change_page.connect(slot=self.global_change_page) + self.filamentPanel.filament_type_changed.connect(self.set_header_filament_type) self.controlPanel.request_back_button.connect(slot=self.global_back) self.controlPanel.request_change_page.connect(slot=self.global_change_page) self.utilitiesPanel.request_back.connect(slot=self.global_back) @@ -253,8 +255,9 @@ def __init__(self): ) self.loadscreen.add_widget(self.loadwidget) self.controlPanel.toggle_conn_page.connect(self.conn_window.set_toggle) - self.cancelpage = CancelPage(self, ws=self.ws) + self.cancelpage = CancelPage(self) self.cancelpage.request_file_info.connect(self.file_data.on_request_fileinfo) + self.cancelpage.reprint_start.connect(self.ws.api.start_print) self.cancelpage.run_gcode.connect(self.ws.api.run_gcode) self.printer.print_stats_update[str, str].connect( self.cancelpage.on_print_stats_update @@ -333,9 +336,13 @@ def show_update_page(self, fullscreen: bool): @QtCore.pyqtSlot(name="on-cancel-print") def on_cancel_print(self): """Slot for cancel print signal""" + self._printing_active = False self.enable_tab_bar() - self.ui.extruder_temp_display.clicked.disconnect() - self.ui.bed_temp_display.clicked.disconnect() + try: + self.ui.extruder_temp_display.clicked.disconnect() + self.ui.bed_temp_display.clicked.disconnect() + except TypeError: + pass self.ui.filament_type_icon.setDisabled(False) self.ui.nozzle_size_icon.setDisabled(False) self.ui.extruder_temp_display.clicked.connect( @@ -378,16 +385,16 @@ def enable_tab_bar(self) -> bool: self.ui.header_main_layout.setEnabled(True) return all( [ - not self.ui.main_content_widget.isTabEnabled( + self.ui.main_content_widget.isTabEnabled( self.ui.main_content_widget.indexOf(self.ui.filamentTab) ), - not self.ui.main_content_widget.isTabEnabled( + self.ui.main_content_widget.isTabEnabled( self.ui.main_content_widget.indexOf(self.ui.controlTab) ), - not self.ui.main_content_widget.isTabEnabled( + self.ui.main_content_widget.isTabEnabled( self.ui.main_content_widget.indexOf(self.ui.utilitiesTab) ), - not self.ui.header_main_layout.isEnabled(), + self.ui.header_main_layout.isEnabled(), ] ) @@ -513,8 +520,18 @@ def global_change_page(self, tab_index: int, panel_index: int) -> None: _logger.debug("User is already on the requested page") return self.index_stack.append(current_page) + # Temporarily enable the target tab so setCurrentIndex works, + # then re-disable the tab bar if a print is active. + was_enabled = self.ui.main_content_widget.isTabEnabled(tab_index) + if not was_enabled: + self.ui.main_content_widget.setTabEnabled(tab_index, True) self.ui.main_content_widget.setCurrentIndex(tab_index) self.set_current_panel_index(panel_index) + if self._printing_active: + self.disable_tab_bar() + # Keep the target tab enabled — Qt auto-switches away from + # a disabled current tab, which undoes the navigation. + self.ui.main_content_widget.setTabEnabled(tab_index, True) _logger.debug( f"Requested page change -> Tab index : {requested_page[0]} | panel index : {requested_page[1]}", ) @@ -525,9 +542,16 @@ def global_back(self) -> None: if not bool(self.index_stack): _logger.debug("Index stack is empty, cannot go back any further") return - self.ui.main_content_widget.setCurrentIndex(self.index_stack[-1][0]) - self.set_current_panel_index(self.index_stack[-1][1]) - self.index_stack.pop() # Remove the last position. + tab_index, panel_index = self.index_stack[-1] + was_enabled = self.ui.main_content_widget.isTabEnabled(tab_index) + if not was_enabled: + self.ui.main_content_widget.setTabEnabled(tab_index, True) + self.ui.main_content_widget.setCurrentIndex(tab_index) + self.set_current_panel_index(panel_index) + if self._printing_active: + self.disable_tab_bar() + self.ui.main_content_widget.setTabEnabled(tab_index, True) + self.index_stack.pop() _logger.debug("Successfully went back a page.") @QtCore.pyqtSlot(name="bo-start-websocket-connection") @@ -560,7 +584,7 @@ def messageReceivedEvent(self, event: events.WebSocketMessageReceived) -> None: return api_reference = _method.split(".") if "klippy" in _method: - api_reference = "notify_klippy" + api_reference = ["notify_klippy"] method_handle = f"_handle_{api_reference[0]}_message" if hasattr(self, method_handle): obj = getattr(self, method_handle) @@ -576,10 +600,8 @@ def _handle_server_message(self, method, data, metadata) -> None: QtWidgets.QApplication.postEvent(self.file_data, file_data_event) except Exception as e: _logger.error( - ( - "Error posting event for file related information", - "received from websocket | error message received: %s", - ), + "Error posting event for file related information " + "received from websocket | error message received: %s", str(e), ) @@ -589,20 +611,18 @@ def _handle_machine_message(self, method, data, metadata) -> None: if "ok" in data: return if "update" in method: - if ("status" or "refresh") in method: + if "status" in method or "refresh" in method: self.on_update_message.emit(dict(data)) @api_handler def _handle_notify_update_response_message(self, method, data, metadata) -> None: """Handle update response messages""" - self.on_update_message.emit( - dict(dict(data.get("params", {})[0])) - ) # Also necessary, notify klippy can also signal update complete + self.on_update_message.emit(dict(dict(data.get("params", [{}])[0]))) @api_handler def _handle_notify_update_refreshed_message(self, method, data, metadata) -> None: """Handle update refreshed messages""" - self.on_update_message.emit(dict(data.get("params", {})[0])) + self.on_update_message.emit(dict(data.get("params", [{}])[0])) @api_handler def _handle_printer_message(self, method, data, metadata) -> None: @@ -621,15 +641,19 @@ def _handle_printer_message(self, method, data, metadata) -> None: self.printer_state_signal.emit("canceled") if "objects" in method: if "list" in method: - _object_list: list = data["objects"] + _object_list: list = data.get("objects", []) self.query_object_list[list].emit(_object_list) if "subscribe" in method: - _objects_response_list = [data["status"], data["eventtime"]] + _objects_response_list = [ + data.get("status", {}), + data.get("eventtime", 0), + ] self.printer_object_report_signal[list].emit(_objects_response_list) if "query" in method: - if isinstance(data["status"], dict): - _object_report = [data["status"]] - _object_report_keys = data["status"].items() + _query_status = data.get("status") + if isinstance(_query_status, dict): + _object_report = [_query_status] + _object_report_keys = _query_status.items() _object_report_list_dict: list = [] for _, key in enumerate(_object_report_keys): _helper_dict: dict = {key[0]: key[1]} @@ -690,7 +714,9 @@ def _handle_notify_service_state_changed_message( if self._popup_toggle: return service_entry: dict = entry[0] - service_name, service_info = service_entry.popitem() + if not service_entry: + return + service_name, service_info = next(iter(service_entry.items())) self.show_notifications.emit( "mainwindow", str( @@ -703,12 +729,15 @@ def _handle_notify_service_state_changed_message( @api_handler def _handle_notify_gcode_response_message(self, method, data, metadata) -> None: """Handle websocket gcode responses messages""" - _gcode_response = data.get("params") + _gcode_response = data.get("params", []) self.gcode_response[list].emit(_gcode_response) if _gcode_response: if self._popup_toggle: return - _gcode_msg_type, _message = str(_gcode_response[0]).split(" ", maxsplit=1) + _parts = str(_gcode_response[0]).split(" ", maxsplit=1) + if len(_parts) < 2: + return + _gcode_msg_type, _message = _parts popupWhitelist = ["filament runout", "no filament"] if _message.lower() not in popupWhitelist or _gcode_msg_type != "!!": return @@ -753,22 +782,27 @@ def _handle_notify_cpu_throttled_message(self, method, data, metadata) -> None: "Currently Throttled": 1 << 2, "Temperature Limit Active": 1 << 3, } - _bits = data.get("bits", None) - if not _bits: + _params = data.get("params", [{}]) + _bits = _params[0].get("bits") if _params else None + if _bits is None: self.show_notifications.emit( "mainWindow", "Cpu throttled unknown reason", 2, False ) return + if _bits == 0: + return _active_flags = [name for name, mask in flags.items() if _bits & mask] self.show_notifications.emit("mainwindow", str(_active_flags), 2, False) except Exception: - logging.debug("Error emitting notification for cpu throttled notification.") + _logger.debug("Error emitting notification for cpu throttled notification.") return @api_handler def _handle_notify_status_update_message(self, method, data, metadata) -> None: """Handle websocket printer objects status update messages""" - _object_report = data["params"] + _object_report = data.get("params") + if not _object_report: + return self.printer_object_report_signal[list].emit(_object_report) @QtCore.pyqtSlot(str, str, float, name="on-extruder-update") @@ -825,9 +859,13 @@ def event(self, event: QtCore.QEvent) -> bool: return True return False if event.type() == events.PrintStart.type(): + self._printing_active = True self.disable_tab_bar() - self.ui.extruder_temp_display.clicked.disconnect() - self.ui.bed_temp_display.clicked.disconnect() + try: + self.ui.extruder_temp_display.clicked.disconnect() + self.ui.bed_temp_display.clicked.disconnect() + except TypeError: + pass self.ui.filament_type_icon.setDisabled(True) self.ui.nozzle_size_icon.setDisabled(True) self.ui.extruder_temp_display.clicked.connect( @@ -851,9 +889,13 @@ def event(self, event: QtCore.QEvent) -> bool: ): if event.type() == events.PrintCancelled.type(): self.handle_cancel_print() + self._printing_active = False self.enable_tab_bar() - self.ui.extruder_temp_display.clicked.disconnect() - self.ui.bed_temp_display.clicked.disconnect() + try: + self.ui.extruder_temp_display.clicked.disconnect() + self.ui.bed_temp_display.clicked.disconnect() + except TypeError: + pass self.ui.filament_type_icon.setDisabled(False) self.ui.nozzle_size_icon.setDisabled(False) self.ui.extruder_temp_display.clicked.connect( @@ -874,4 +916,4 @@ def event(self, event: QtCore.QEvent) -> bool: def sizeHint(self) -> QtCore.QSize: """Sets default size for the widget""" self.adjustSize() - return super().sizeHint(QtCore.QSize(800, 480)) + return QtCore.QSize(800, 480) diff --git a/BlocksScreen/lib/panels/networkWindow.py b/BlocksScreen/lib/panels/networkWindow.py index f9b17a52..b39a1bd5 100644 --- a/BlocksScreen/lib/panels/networkWindow.py +++ b/BlocksScreen/lib/panels/networkWindow.py @@ -35,7 +35,6 @@ from lib.utils.icon_button import IconButton from lib.utils.list_model import EntryDelegate, EntryListModel, ListItem from PyQt6 import QtCore, QtGui, QtWidgets -from PyQt6.QtCore import QTimer, pyqtSlot logger = logging.getLogger(__name__) @@ -255,7 +254,7 @@ def _prefill_ip_from_os(self) -> None: except OSError: continue - @pyqtSlot() + @QtCore.pyqtSlot() def _on_reconnect_complete(self) -> None: """Navigate back to the main panel after a static-IP or DHCP-reset operation.""" logger.debug("reconnect_complete received — navigating to main_network_page") @@ -263,7 +262,7 @@ def _on_reconnect_complete(self) -> None: def _init_timers(self) -> None: """Initialize timers.""" - self._load_timer = QTimer(self) + self._load_timer = QtCore.QTimer(self) self._load_timer.setSingleShot(True) self._load_timer.timeout.connect(self._handle_load_timeout) @@ -277,7 +276,7 @@ def _init_model_view(self) -> None: self._entry_delegate.item_selected.connect(self._on_ssid_item_clicked) self._configure_list_view_palette() - @pyqtSlot(NetworkState) + @QtCore.pyqtSlot(NetworkState) def _on_network_state_changed(self, state: NetworkState) -> None: """React to a NetworkState update: sync toggles, populate header and connection info.""" logger.debug( @@ -438,7 +437,7 @@ def _on_network_state_changed(self, state: NetworkState) -> None: self._emit_status_icon(state) self._sync_active_network_list_icon(state) - @pyqtSlot(list) + @QtCore.pyqtSlot(list) def _on_scan_complete(self, networks: list[NetworkInfo]) -> None: """Receive scan results, filter/sort them, and rebuild the SSID list view. @@ -457,9 +456,11 @@ def _on_scan_complete(self, networks: list[NetworkInfo]) -> None: # Stamp the connected AP as ACTIVE so the list is correct on first # render even when the scan ran before the connection fully settled. filtered = [ - replace(net, network_status=NetworkStatus.ACTIVE) - if net.ssid == current_ssid - else net + ( + replace(net, network_status=NetworkStatus.ACTIVE) + if net.ssid == current_ssid + else net + ) for net in filtered ] active = next((n for n in filtered if n.ssid == current_ssid), None) @@ -478,12 +479,12 @@ def _on_scan_complete(self, networks: list[NetworkInfo]) -> None: state = self._nm.current_state self._emit_status_icon(state) - @pyqtSlot(list) + @QtCore.pyqtSlot(list) def _on_saved_networks_loaded(self, networks: list[SavedNetwork]) -> None: """Receive saved-network data and update the priority spinbox for the active SSID.""" logger.debug("Loaded %d saved networks", len(networks)) - @pyqtSlot(ConnectionResult) + @QtCore.pyqtSlot(ConnectionResult) def _on_operation_complete(self, result: ConnectionResult) -> None: """Handle network operation completion.""" logger.debug("Operation: success=%s, msg=%s", result.success, result.message) @@ -570,7 +571,7 @@ def _on_operation_complete(self, result: ConnectionResult) -> None: result.message, ) ssid = self._target_ssid - QTimer.singleShot( + QtCore.QTimer.singleShot( 2000, lambda _ssid=ssid: self._nm.connect_network(_ssid) ) return # Keep loading visible; state machine handles completion @@ -578,7 +579,7 @@ def _on_operation_complete(self, result: ConnectionResult) -> None: self._clear_loading() self._show_error_popup(result.message) - @pyqtSlot(str, str) + @QtCore.pyqtSlot(str, str) def _on_network_error(self, operation: str, message: str) -> None: """Log network errors and surface critical failures in the info box.""" logger.error("Network error [%s]: %s", operation, message) @@ -658,13 +659,15 @@ def _sync_active_network_list_icon(self, state: NetworkState) -> None: # Update the cached entry with the authoritative signal and status updated = [ - replace( - net, - signal_strength=self._active_signal, - network_status=NetworkStatus.ACTIVE, + ( + replace( + net, + signal_strength=self._active_signal, + network_status=NetworkStatus.ACTIVE, + ) + if net.ssid == state.current_ssid + else net ) - if net.ssid == state.current_ssid - else net for net in self._cached_scan_networks ] @@ -1063,7 +1066,9 @@ def _handle_wifi_toggle(self, is_on: bool) -> None: # Non-blocking: disable hotspot then connect self._nm.toggle_hotspot(False) _ssid_to_connect = self._target_ssid - QTimer.singleShot(500, lambda: self._nm.connect_network(_ssid_to_connect)) + QtCore.QTimer.singleShot( + 500, lambda: self._nm.connect_network(_ssid_to_connect) + ) def _handle_hotspot_toggle(self, is_on: bool) -> None: """Enable or disable the hotspot, enforcing the ethernet/Wi-Fi mutual-exclusion rule.""" diff --git a/BlocksScreen/lib/panels/utilitiesTab.py b/BlocksScreen/lib/panels/utilitiesTab.py index 6cff5f27..d2dc1679 100644 --- a/BlocksScreen/lib/panels/utilitiesTab.py +++ b/BlocksScreen/lib/panels/utilitiesTab.py @@ -1,9 +1,14 @@ +import logging +import re import typing from dataclasses import dataclass from enum import Enum, auto from functools import partial from lib.moonrakerComm import MoonWebSocket +from lib.panels.widgets.basePopup import BasePopup +from lib.panels.widgets.inputshaperPage import InputShaperPage +from lib.panels.widgets.optionCardWidget import OptionCard from lib.panels.widgets.troubleshootPage import TroubleshootPage from lib.printer import Printer from lib.ui.utilitiesStackedWidget_ui import Ui_utilitiesStackedWidget @@ -11,11 +16,7 @@ from lib.utils.toggleAnimatedButton import ToggleAnimatedButton from PyQt6 import QtCore, QtGui, QtWidgets -from lib.panels.widgets.optionCardWidget import OptionCard -from lib.panels.widgets.inputshaperPage import InputShaperPage -from lib.panels.widgets.basePopup import BasePopup - -import re +logger = logging.getLogger(__name__) @dataclass @@ -113,8 +114,8 @@ def __init__( self.x_inputshaper: dict = {} self.stepper_limits: dict = {} - self.current_object: typing.Optional[str] = None - self.current_process: typing.Optional[Process] = None + self.current_object: str | None = None + self.current_process: Process | None = None self.axis_in: str = "x" self.amount: int = 1 self.tb: bool = False @@ -250,8 +251,9 @@ def handle_gcode_response(self, data: list[str]) -> None: """ if not isinstance(data, list) or len(data) != 1 or not isinstance(data[0], str): - print( - f"WARNING: Invalid input format. Expected a list with one string. Received: {data}" + logger.warning( + "handle_gcode_response: invalid input format. Expected list[str], received: %r", + data, ) return @@ -317,7 +319,7 @@ def handle_gcode_response(self, data: list[str]) -> None: self.is_page.set_type_dictionary(self.is_types) first_key = next(iter(reordered.keys()), None) - for key in reordered.keys(): + for key in reordered: if key == first_key: self.is_page.add_type_entry(key, "Recommended type") else: @@ -365,7 +367,7 @@ def on_object_list(self, object_list: list) -> None: @QtCore.pyqtSlot(dict, name="on_object_config") @QtCore.pyqtSlot(list, name="on_object_config") - def on_object_config(self, config: typing.Union[dict, list]) -> None: + def on_object_config(self, config: dict | list) -> None: """Handle receiving printer object configurations""" if not config: return @@ -381,12 +383,12 @@ def on_object_config(self, config: typing.Union[dict, list]) -> None: pos_max = value.get("position_max") if pos_min is not None or pos_max is not None: self.stepper_limits[key] = { - "min": float(pos_min) - if pos_min is not None - else -float("inf"), - "max": float(pos_max) - if pos_max is not None - else float("inf"), + "min": ( + float(pos_min) if pos_min is not None else -float("inf") + ), + "max": ( + float(pos_max) if pos_max is not None else float("inf") + ), } def on_printer_config_received(self, config: dict) -> None: diff --git a/BlocksScreen/lib/panels/widgets/basePopup.py b/BlocksScreen/lib/panels/widgets/basePopup.py index a9a4d188..16c42c85 100644 --- a/BlocksScreen/lib/panels/widgets/basePopup.py +++ b/BlocksScreen/lib/panels/widgets/basePopup.py @@ -1,5 +1,3 @@ -import typing - from PyQt6 import QtCore, QtGui, QtWidgets @@ -47,13 +45,11 @@ def __init__( self.setAttribute(QtCore.Qt.WidgetAttribute.WA_TranslucentBackground, True) self.setWindowModality(QtCore.Qt.WindowModality.ApplicationModal) else: - self.setStyleSheet( - """ + self.setStyleSheet(""" #MyParent { background-image: url(:/background/media/1st_background.png); } - """ - ) + """) def _update_button_style(self) -> None: """Applies the current color variables and adds the central border to the stylesheets.""" @@ -61,26 +57,21 @@ def _update_button_style(self) -> None: return if not self.floating: - self.confirm_button.setStyleSheet( - f""" + self.confirm_button.setStyleSheet(f""" background-color: {self.confirm_bk_color}; color: {self.confirm_ft_color}; border: none; padding: 10px; - """ - ) + """) - self.cancel_button.setStyleSheet( - f""" + self.cancel_button.setStyleSheet(f""" background-color: {self.cancel_bk_color}; color: {self.cancel_ft_color}; border: none; padding: 10px; - """ - ) + """) else: - self.confirm_button.setStyleSheet( - f""" + self.confirm_button.setStyleSheet(f""" background-color: {self.confirm_bk_color}; color: {self.confirm_ft_color}; border-top: none; @@ -89,11 +80,9 @@ def _update_button_style(self) -> None: border-right: 1px solid #80807e; border-bottom-left-radius: 16px; padding: 10px; - """ - ) + """) - self.cancel_button.setStyleSheet( - f""" + self.cancel_button.setStyleSheet(f""" background-color: {self.cancel_bk_color}; color: {self.cancel_ft_color}; border-left: 1px solid #80807e;; @@ -101,8 +90,7 @@ def _update_button_style(self) -> None: border-right: 2px solid #80807e; border-bottom-right-radius: 16px; padding: 10px; - """ - ) + """) def set_message(self, message: str) -> None: self.label.setText(message) @@ -151,7 +139,7 @@ def add_widget(self, widget: QtWidgets.QWidget) -> None: layout.insertWidget(index, widget) widget.show() - def _get_mainWindow_widget(self) -> typing.Optional[QtWidgets.QMainWindow]: + def _get_mainWindow_widget(self) -> QtWidgets.QMainWindow | None: """Get the main application window""" app_instance = QtWidgets.QApplication.instance() if not app_instance: diff --git a/BlocksScreen/lib/panels/widgets/cancelPage.py b/BlocksScreen/lib/panels/widgets/cancelPage.py index e16fb2e6..832c2f29 100644 --- a/BlocksScreen/lib/panels/widgets/cancelPage.py +++ b/BlocksScreen/lib/panels/widgets/cancelPage.py @@ -1,17 +1,16 @@ +import logging +import typing + from lib.utils.blocks_button import BlocksCustomButton from lib.utils.blocks_frame import BlocksCustomFrame from lib.utils.blocks_label import BlocksLabel from PyQt6 import QtCore, QtGui, QtWidgets -import typing -from lib.moonrakerComm import MoonWebSocket +logger = logging.getLogger(__name__) class CancelPage(QtWidgets.QWidget): - """Update GUI Page, - retrieves from moonraker available clients and adds functionality - for updating or recovering them - """ + """Displayed when a print is cancelled; offers reprint or ignore.""" request_file_info: typing.ClassVar[QtCore.pyqtSignal] = QtCore.pyqtSignal( str, name="request_file_info" @@ -23,16 +22,13 @@ class CancelPage(QtWidgets.QWidget): str, name="run_gcode" ) - def __init__(self, parent: QtWidgets.QWidget, ws: MoonWebSocket) -> None: + def __init__(self, parent: QtWidgets.QWidget) -> None: super().__init__(parent) - self.ws: MoonWebSocket = ws self._setupUI() self.filename = "" - - self.reprint_start.connect(self.ws.api.start_print) + self._thumbnail_scan_done: bool = False self.confirm_button.clicked.connect(lambda: self._handle_accept()) - self.refuse_button.clicked.connect(lambda: self._handle_refuse()) self.setAttribute(QtCore.Qt.WidgetAttribute.WA_StyledBackground, True) @@ -52,8 +48,10 @@ def _handle_refuse(self): def on_print_stats_update(self, field: str, value: dict | float | str) -> None: if isinstance(value, str): if "filename" in field: + if value != self.filename: + self._thumbnail_scan_done = False self.filename = value - if self.isVisible: + if self.isVisible(): self.set_file_name(value) def show(self): @@ -91,22 +89,25 @@ def set_pixmap(self, pixmap: QtGui.QPixmap) -> None: def set_file_name(self, file_name: str) -> None: self.cf_file_name.setText(file_name) - def _show_screen_thumbnail(self, dict): - try: - thumbnails = dict["thumbnail_images"] + def _show_screen_thumbnail(self, metadata: dict | None) -> None: + """Display the largest thumbnail from file metadata. - last_thumb = QtGui.QPixmap.fromImage(thumbnails[-1]) - - if last_thumb.isNull(): - last_thumb = QtGui.QPixmap( - "BlocksScreen/lib/ui/resources/media/logoblocks400x300.png" - ) - except Exception as e: - print(e) - last_thumb = QtGui.QPixmap( - "BlocksScreen/lib/ui/resources/media/logoblocks400x300.png" - ) - self.set_pixmap(last_thumb) + ``thumbnail_images`` values are pre-loaded ``QImage`` + objects produced by ``Files._process_metadata``. + """ + fallback = QtGui.QPixmap( + "BlocksScreen/lib/ui/resources/media/logoblocks400x300.png" + ) + thumbnails = metadata.get("thumbnail_images", []) if metadata else [] + if not thumbnails: + self.set_pixmap(fallback) + return + + last_thumb = thumbnails[-1] + if isinstance(last_thumb, QtGui.QImage) and not last_thumb.isNull(): + self.set_pixmap(QtGui.QPixmap.fromImage(last_thumb)) + else: + self.set_pixmap(fallback) def _setupUI(self) -> None: """Setup widget ui""" @@ -119,11 +120,9 @@ def _setupUI(self) -> None: sizePolicy.setHeightForWidth(self.sizePolicy().hasHeightForWidth()) self.setSizePolicy(sizePolicy) self.setObjectName("cancelPage") - self.setStyleSheet( - """#cancelPage { + self.setStyleSheet("""#cancelPage { background-image: url(:/background/media/1st_background.png); - }""" - ) + }""") self.setMinimumSize(QtCore.QSize(800, 480)) self.setMaximumSize(QtCore.QSize(800, 480)) self.setLayoutDirection(QtCore.Qt.LayoutDirection.LeftToRight) diff --git a/BlocksScreen/lib/panels/widgets/confirmPage.py b/BlocksScreen/lib/panels/widgets/confirmPage.py index 0f35ba39..d759f0ea 100644 --- a/BlocksScreen/lib/panels/widgets/confirmPage.py +++ b/BlocksScreen/lib/panels/widgets/confirmPage.py @@ -1,3 +1,4 @@ +import logging import os import typing @@ -8,8 +9,12 @@ from lib.utils.icon_button import IconButton from PyQt6 import QtCore, QtGui, QtWidgets +logger = logging.getLogger(__name__) + class ConfirmWidget(QtWidgets.QWidget): + """Widget displayed when a user selects a file to print.""" + on_accept: typing.ClassVar[QtCore.pyqtSignal] = QtCore.pyqtSignal( str, name="on_accept" ) @@ -26,7 +31,6 @@ def __init__(self, parent) -> None: self.setMouseTracking(True) self.setAttribute(QtCore.Qt.WidgetAttribute.WA_AcceptTouchEvents, True) self.thumbnail: QtGui.QImage = self._blocksthumbnail - self._thumbnails: typing.List = [] self.directory = "gcodes" self.filename = "" self.confirm_button.clicked.connect( @@ -39,32 +43,34 @@ def __init__(self, parent) -> None: lambda: self.on_delete.emit(self.filename, self.directory) ) - @QtCore.pyqtSlot(str, dict, name="on_show_widget") - def on_show_widget(self, text: str, filedata: dict | None = None) -> None: - """Handle widget show""" - if not filedata: - return + @QtCore.pyqtSlot(str, object, name="on_show_widget") + def on_show_widget(self, text: str, metadata: dict | None = None) -> None: + """Handle widget show.""" directory = os.path.dirname(text) filename = os.path.basename(text) self.directory = directory self.filename = filename self.cf_file_name.setText(self.filename) - self._thumbnails = filedata.get("thumbnail_images", []) - if self._thumbnails: - _biggest_thumbnail = self._thumbnails[-1] # Show last which is biggest - self.thumbnail = QtGui.QImage(_biggest_thumbnail) - else: + if metadata is None: self.thumbnail = self._blocksthumbnail - _total_filament = filedata.get("filament_weight_total") - _estimated_time = filedata.get("estimated_time") - if isinstance(_estimated_time, str): - seconds = 0 - else: - seconds = _estimated_time + self.cf_info_tf.setText("Total Filament: loading...") + self.cf_info_tr.setText("Slicer time: loading...") + self.update() + return + self._update_metadata_labels(metadata) + self.update() + + def _update_metadata_labels(self, metadata: dict) -> None: + """Update thumbnail and text labels from metadata.""" + self._apply_thumbnail(metadata) + raw_weight = metadata.get("filament_weight_total", 0) + _total_filament: float | str = raw_weight if raw_weight > 0 else 0 + seconds = metadata.get("estimated_time", 0) + seconds = seconds if seconds > 0 else 0 days, hours, minutes, _ = helper_methods.estimate_print_time(seconds) if seconds <= 0: - time_str = "??" + time_str = "Unknown" elif seconds < 60: time_str = "less than 1 minute" else: @@ -83,9 +89,39 @@ def on_show_widget(self, text: str, filedata: dict | None = None) -> None: _total_filament = str("%.2f" % _total_filament) + "g" filament_label = f"Total Filament: {_total_filament}" time_label = f"Slicer time: {time_str}" - self.cf_info_tf.setText(f"{filament_label}") - self.cf_info_tr.setText(f"{time_label}") - self.repaint() + self.cf_info_tf.setText(filament_label) + self.cf_info_tr.setText(time_label) + + def _apply_thumbnail(self, metadata: dict) -> None: + """Set self.thumbnail from metadata, falling back to the logo.""" + thumbnails = metadata.get("thumbnail_images", []) + if thumbnails: + last = thumbnails[-1] + if isinstance(last, QtGui.QImage) and not last.isNull(): + self.thumbnail = last + return + self.thumbnail = self._blocksthumbnail + + @QtCore.pyqtSlot(dict, name="on_fileinfo") + def on_fileinfo(self, metadata: dict) -> None: + """Update thumbnail and metadata labels when new data arrives.""" + if not metadata or not self.filename: + return + incoming = metadata.get("filename", "") + current = ( + f"{self.directory}/{self.filename}" if self.directory else self.filename + ) + # Also accept bare-filename match for USB files: Moonraker may strip the + # USB directory prefix from the returned filename. + is_usb_bare_match = ( + incoming == self.filename + and self.directory.startswith("USB-") + and incoming == os.path.basename(incoming) + ) + if incoming != current and not is_usb_bare_match: + return + self._update_metadata_labels(metadata) + self.update() def estimate_print_time(self, seconds: int) -> list: """Convert time in seconds format to days, hours, minutes, seconds. @@ -142,8 +178,8 @@ def paintEvent(self, event: QtGui.QPaintEvent) -> None: def showEvent(self, a0: QtGui.QShowEvent) -> None: """Re-implemented method, Handle widget show event""" - if not self.thumbnail: - self.cf_thumbnail.close() + if self.thumbnail.isNull(): + self.cf_thumbnail.hide() return super().showEvent(a0) def _setupUI(self) -> None: @@ -252,7 +288,6 @@ def _setupUI(self) -> None: "icon_pixmap", QtGui.QPixmap(":/dialog/media/btn_icons/yes.svg") ) self.confirm_button.setText("Print") - # 2. Align buttons to the right self.cf_confirm_layout.addWidget( self.confirm_button, 0, QtCore.Qt.AlignmentFlag.AlignCenter ) @@ -266,7 +301,6 @@ def _setupUI(self) -> None: "icon_pixmap", QtGui.QPixmap(":/ui/media/btn_icons/garbage-icon.svg") ) self.delete_file_button.setText("Delete") - # 2. Align buttons to the right self.cf_confirm_layout.addWidget( self.delete_file_button, 0, QtCore.Qt.AlignmentFlag.AlignCenter ) diff --git a/BlocksScreen/lib/panels/widgets/connectionPage.py b/BlocksScreen/lib/panels/widgets/connectionPage.py index 3cd1fc21..745b8195 100644 --- a/BlocksScreen/lib/panels/widgets/connectionPage.py +++ b/BlocksScreen/lib/panels/widgets/connectionPage.py @@ -57,7 +57,7 @@ def __init__(self, parent: QtWidgets.QWidget, ws: MoonWebSocket, /): self.restart_klipper_clicked.emit ) self.ws.connection_lost.connect(slot=self.show) - self.ws.klippy_connected_signal.connect(self.on_klippy_connected) + self.ws.klippy_connected_signal.connect(self.on_klippy_connection) self.ws.klippy_state_signal.connect(self.on_klippy_state) @QtCore.pyqtSlot(bool, name="toggle_connection_page") @@ -82,7 +82,7 @@ def showEvent(self, a0: QtCore.QEvent | None): self.call_cancel_panel.emit(False) return super().showEvent(a0) - @QtCore.pyqtSlot(bool, name="on_klippy_connected") + @QtCore.pyqtSlot(bool, name="on_klippy_connection") def on_klippy_connection(self, connected: bool): """Handle klippy connection state""" self.dot_timer.stop() @@ -143,15 +143,13 @@ def text_update(self, text: int | str | None = None): if self.state == "shutdown" and self.message is not None: return False self.dot_timer.stop() - logger.debug(f"[ConnectionWindowPanel] text_update: {text}") + logger.debug("[ConnectionWindowPanel] text_update: %r", text) if text == "wb lost": self.panel.connectionTextBox.setText("Moonraker connection lost") if text is None: - self.panel.connectionTextBox.setText( - """ + self.panel.connectionTextBox.setText(""" Not connected to Moonraker Websocket - """ - ) + """) return True if isinstance(text, str): self.panel.connectionTextBox.setText( @@ -208,7 +206,7 @@ def eventFilter(self, object: QtCore.QObject, event: QtCore.QEvent) -> bool: elif event.type() == KlippyShutdown.type(): self.dot_timer.stop() if not self.isVisible(): - self.panel.connectionTextBox.setText(f"{self.message}") + self.panel.connectionTextBox.setText(self.message or "") self.show() return True diff --git a/BlocksScreen/lib/panels/widgets/inputshaperPage.py b/BlocksScreen/lib/panels/widgets/inputshaperPage.py index ead82cfb..dd890fb5 100644 --- a/BlocksScreen/lib/panels/widgets/inputshaperPage.py +++ b/BlocksScreen/lib/panels/widgets/inputshaperPage.py @@ -1,11 +1,11 @@ +import typing + from lib.utils.blocks_button import BlocksCustomButton from lib.utils.blocks_frame import BlocksCustomFrame from lib.utils.icon_button import IconButton from lib.utils.list_model import EntryDelegate, EntryListModel, ListItem from PyQt6 import QtCore, QtGui, QtWidgets -import typing - class InputShaperPage(QtWidgets.QWidget): """Update GUI Page, @@ -24,6 +24,7 @@ def __init__(self, parent=None) -> None: else: super().__init__() self._setupUI() + self.currentItem: ListItem | None = None self.selected_item: ListItem | None = None self.ongoing_update: bool = False self.type_dict: dict = {} @@ -96,21 +97,26 @@ def on_item_clicked(self, item: ListItem) -> None: if not current_info: return - self.vib_label.setText(str("%.0f" % current_info.get("vibration", "N/A")) + "%") + _vib = current_info.get("vibration") + self.vib_label.setText(f"{float(_vib):.0f}%" if _vib is not None else "N/A%") + _accel = current_info.get("max_accel") self.sug_accel_label.setText( - str("%.0f" % current_info.get("max_accel", "N/A")) + "mm/s²" + f"{float(_accel):.0f}mm/s²" if _accel is not None else "N/Amm/s²" ) self.action_btn.show() def handle_ism_confirm(self) -> None: + """Apply the selected input shaper type to the printer and save the config.""" + if self.currentItem is None: + return current_info = self.type_dict.get(self.currentItem.text, {}) frequency = current_info.get("frequency", "N/A") - if self.type_dict["Axis"] == "x": + if self.type_dict.get("Axis") == "x": self.run_gcode_signal.emit( f"SET_INPUT_SHAPER SHAPER_TYPE_X={self.currentItem.text} SHAPER_FREQ_X={frequency}" ) - elif self.type_dict["Axis"] == "y": + elif self.type_dict.get("Axis") == "y": self.run_gcode_signal.emit( f"SET_INPUT_SHAPER SHAPER_TYPE_Y={self.currentItem.text} SHAPER_FREQ_Y={frequency}" ) @@ -138,7 +144,8 @@ def _setupUI(self) -> None: font_id = QtGui.QFontDatabase.addApplicationFont( ":/font/media/fonts for text/Momcake-Bold.ttf" ) - font_family = QtGui.QFontDatabase.applicationFontFamilies(font_id)[0] + _families = QtGui.QFontDatabase.applicationFontFamilies(font_id) + font_family = _families[0] if _families else "" sizePolicy = QtWidgets.QSizePolicy( QtWidgets.QSizePolicy.Policy.MinimumExpanding, QtWidgets.QSizePolicy.Policy.MinimumExpanding, diff --git a/BlocksScreen/lib/panels/widgets/notificationPage.py b/BlocksScreen/lib/panels/widgets/notificationPage.py index a14b6cbc..6bf1fd00 100644 --- a/BlocksScreen/lib/panels/widgets/notificationPage.py +++ b/BlocksScreen/lib/panels/widgets/notificationPage.py @@ -1,15 +1,12 @@ -from lib.utils.blocks_frame import BlocksCustomFrame -from lib.utils.blocks_button import BlocksCustomButton -from lib.utils.icon_button import IconButton -from lib.utils.list_model import EntryDelegate, EntryListModel, ListItem -from PyQt6 import QtCore, QtGui, QtWidgets import typing - from collections import deque -from typing import Deque - from lib.panels.widgets.popupDialogWidget import Popup +from lib.utils.blocks_button import BlocksCustomButton +from lib.utils.blocks_frame import BlocksCustomFrame +from lib.utils.icon_button import IconButton +from lib.utils.list_model import EntryDelegate, EntryListModel, ListItem +from PyQt6 import QtCore, QtGui, QtWidgets class NotificationPage(QtWidgets.QWidget): @@ -28,7 +25,7 @@ def __init__(self, parent=None) -> None: else: super().__init__() self._setupUI() - self.cli_tracking: Deque = deque() + self.cli_tracking: deque = deque() self.selected_item: ListItem | None = None self.ongoing_update: bool = False self.popup = Popup(self) @@ -78,6 +75,8 @@ def reset_view_model(self) -> None: def build_model_list(self) -> None: """Builds the model list (`self.model`) containing updatable clients""" + if not self.cli_tracking: + return self.update_buttons_list_widget.blockSignals(True) message, origin, priority = self.cli_tracking.popleft() match priority: @@ -194,11 +193,9 @@ def _setupUI(self) -> None: font.setPointSize(20) self.setSizePolicy(sizePolicy) self.setObjectName("updatePage") - self.setStyleSheet( - """#updatePage { + self.setStyleSheet("""#updatePage { background-image: url(:/background/media/1st_background.png); - }""" - ) + }""") self.setLayoutDirection(QtCore.Qt.LayoutDirection.LeftToRight) self.update_page_content_layout = QtWidgets.QVBoxLayout() self.setMinimumSize(800, 480) @@ -347,10 +344,10 @@ def _setupUI(self) -> None: self.time_title.setFont(font) self.time_title.setStyleSheet("color:#FFFFFF") - self.time_title.setFont(font) + self.type_label.setFont(font) self.type_label.setStyleSheet("color:#FFFFFF") - self.time_title.setFont(font) + self.time_label.setFont(font) self.time_label.setStyleSheet("color:#FFFFFF") self.info_frame.setLayout(self.info_box_layout) diff --git a/BlocksScreen/lib/panels/widgets/probeHelperPage.py b/BlocksScreen/lib/panels/widgets/probeHelperPage.py index 219adb6a..902f9a4c 100644 --- a/BlocksScreen/lib/panels/widgets/probeHelperPage.py +++ b/BlocksScreen/lib/panels/widgets/probeHelperPage.py @@ -42,7 +42,6 @@ class ProbeHelper(QtWidgets.QWidget): ) distances = ["0.01", ".025", "0.1", "0.5", "1"] - _calibration_commands: list = [] helper_start: bool = False helper_initialize: bool = False _zhop_height: float = float(distances[0]) @@ -52,6 +51,7 @@ class ProbeHelper(QtWidgets.QWidget): z_offset_calibration_speed: int = 100 def __init__(self, parent: QtWidgets.QWidget) -> None: + """Initialize the probe helper widget and connect internal signals.""" super().__init__(parent) self.setObjectName("probe_offset_page") @@ -92,6 +92,7 @@ def __init__(self, parent: QtWidgets.QWidget) -> None: self.target_temp = 0 self.current_temp = 0 self._eddy_calibration_state = False + self._calibration_commands: list = [] @QtCore.pyqtSlot(str, dict, name="on_print_stats_update") @QtCore.pyqtSlot(str, float, name="on_print_stats_update") @@ -281,8 +282,6 @@ def on_object_config(self, config: dict | list) -> None: if not _config: return if _config.get("home_xy_position"): - if not _config.get("home_xy_position"): - return self.z_offset_safe_xy = tuple( map( lambda value: float(value), @@ -333,6 +332,7 @@ def on_printer_config(self, config: dict) -> None: @QtCore.pyqtSlot(dict, name="on_available_gcode_cmds") def on_available_gcode_cmds(self, gcode_cmds: dict) -> None: """Setup available probe calibration commands""" + self._calibration_commands.clear() _available_commands = gcode_cmds.keys() if "PROBE_CALIBRATE" in _available_commands: self._calibration_commands.append("PROBE_CALIBRATE") @@ -367,16 +367,11 @@ def _build_calibration_command(self, tool: str) -> str: return "Z_ENDSTOP_CALIBRATE" elif "eddy" in tool: if self._verify_gcode("PROBE_EDDY_CURRENT_CALIBRATE"): - _name = tool.split(" ")[1] - # if not _name: - # return "" - # return ( - # f"PROBE_EDDY_CURRENT_CALIBRATE CHIP={tool.split(' ')[1]}" - # ) - return ( - f"PROBE_EDDY_CURRENT_CALIBRATE CHIP={tool.split(' ')[1]}" - * bool(_name) - ) + ("" * ~bool(_name)) + _parts = tool.split(" ", 1) + if len(_parts) < 2: + return "" + _name = _parts[1] + return f"PROBE_EDDY_CURRENT_CALIBRATE CHIP={_name}" if _name else "" elif "probe" in tool or "bltouch" in tool: if self._verify_gcode("PROBE_CALIBRATE"): @@ -403,7 +398,7 @@ def handle_zhopHeight_change(self, new_value: float) -> None: self._zhop_height = new_value @QtCore.pyqtSlot("PyQt_PyObject", name="handle_start_tool") - def handle_start_tool(self, sender: typing.Type[OptionCard]) -> None: + def handle_start_tool(self, sender: OptionCard) -> None: """Handle probe tool helper start by sending the correct gcode command according to the clicked option card. This is achieved by @@ -414,7 +409,7 @@ def handle_start_tool(self, sender: typing.Type[OptionCard]) -> None: sender. Args: - sender (typing.Type[OptionCard]): The clicked OptionCard object + sender (OptionCard): The clicked OptionCard instance """ if not sender: return @@ -429,19 +424,21 @@ def handle_start_tool(self, sender: typing.Type[OptionCard]) -> None: lambda: self.query_printer_object.emit({"manual_probe": None}) ) _timer.start(int(300)) - # self.query_printer_object.emit({"manual_probe": None}) - _cmd = self._build_calibration_command(sender.name) # type:ignore + _cmd = self._build_calibration_command(sender.name) # type: ignore if not _cmd: return self.disable_popups.emit(True) self.run_gcode_signal.emit("G28\nM400") - if "eddy" in sender.name: # type:ignore + if "eddy" in sender.name: # type: ignore self.call_load_panel.emit(True, "Preparing Eddy Current Calibration...") self.toggle_conn_page.emit(False) self._move_to_pos(self.z_offset_safe_xy[0], self.z_offset_safe_xy[1], 100) + _name_parts = sender.name.split(" ", 1) # type: ignore + if len(_name_parts) < 2: + return self.run_gcode_signal.emit( - f"LDC_CALIBRATE_DRIVE_CURRENT CHIP={sender.name.split(' ')[1]}" # type:ignore + f"LDC_CALIBRATE_DRIVE_CURRENT CHIP={_name_parts[1]}" ) self.run_gcode_signal.emit("M400\nSAVE_CONFIG") @@ -467,19 +464,19 @@ def on_extruder_update( return if self.target_temp != 0: if self.current_temp == self.target_temp: - if self.isVisible: + if self.isVisible(): self.call_load_panel.emit(True, "Extruder heated up \n Please wait") return if field == "temperature": self.current_temp = round(new_value, 0) - if self.isVisible: + if self.isVisible(): self.call_load_panel.emit( True, f"Heating up ({new_value}/{self.target_temp}) \n Please wait", ) if field == "target": self.target_temp = round(new_value, 0) - if self.isVisible: + if self.isVisible(): self.call_load_panel.emit(True, "Cleaning the nozzle \n Please wait") @QtCore.pyqtSlot(name="handle_accept") @@ -523,32 +520,32 @@ def on_gcode_move_update(self, name: str, value: list) -> None: @QtCore.pyqtSlot(dict, name="on_manual_probe_update") def on_manual_probe_update(self, update: dict) -> None: - """Handle manual probe update""" + """Handle manual probe update. + + Only process ``is_active`` state transitions when the key is + actually present in the update dict. Klipper sends partial + updates (e.g. only position data after TESTZ) and defaulting + ``is_active`` to False on those would reset the entire UI. + """ if not update: return - # if update.get("z_position_lower"): - # f"{update.get('z_position_lower'):.4f} mm" - - is_active = update.get("is_active", None) - if update.get("z_position_upper"): - self.old_offset_info.setText(f"{update.get('z_position_upper'):.4f} mm") - if update.get("z_position"): - self.current_offset_info.setText(f"{update.get('z_position'):.4f} mm") + if "is_active" in update: + is_active = update["is_active"] + if is_active and not self.isVisible(): + self.request_page_view.emit() + self.helper_initialize = False + self.helper_start = is_active + self._toggle_tool_buttons(is_active) + if is_active: + self._hide_option_cards() + else: + self._show_option_cards() - if not is_active: - return - if not self.isVisible(): - self.request_page_view.emit() - # Shared state updates - self.helper_initialize = False - self.helper_start = is_active - # UI updates - self._toggle_tool_buttons(is_active) - if is_active: - self._hide_option_cards() - else: - self._show_option_cards() + if update.get("z_position_upper") is not None: + self.old_offset_info.setText(f"{update['z_position_upper']:.4f} mm") + if update.get("z_position") is not None: + self.current_offset_info.setText(f"{update['z_position']:.4f} mm") @QtCore.pyqtSlot(list, name="handle_gcode_response") def handle_gcode_response(self, data: list) -> None: @@ -558,6 +555,8 @@ def handle_gcode_response(self, data: list) -> None: data (list): A list containing the gcode that originated the response and the response """ + if not data: + return if self.isVisible(): if data[0].startswith("!!"): # An error occurred if "already in a manual z probe" in data[0].strip("!! ").lower(): diff --git a/BlocksScreen/lib/panels/widgets/sensorsPanel.py b/BlocksScreen/lib/panels/widgets/sensorsPanel.py index df63cfb5..77fe5865 100644 --- a/BlocksScreen/lib/panels/widgets/sensorsPanel.py +++ b/BlocksScreen/lib/panels/widgets/sensorsPanel.py @@ -19,7 +19,7 @@ class SensorsWindow(QtWidgets.QWidget): ) def __init__(self, parent): - super(SensorsWindow, self).__init__(parent) + super().__init__(parent) self.model = EntryListModel() self.entry_delegate = EntryDelegate() self.sensor_tracking_widget = {} @@ -32,11 +32,15 @@ def __init__(self, parent): self.fs_back_button.clicked.connect(self.request_back) def reset_view_model(self) -> None: - """Clears items from ListView - (Resets `QAbstractListModel` by clearing entries) - """ + """Clears items from ListView and removes existing sensor widgets.""" self.model.clear() self.entry_delegate.clear() + for widget in self.sensor_tracking_widget.values(): + self.info_box_layout.removeWidget(widget) + widget.deleteLater() + self.sensor_tracking_widget.clear() + self.sensor_list.clear() + self.current_widget = None @QtCore.pyqtSlot(dict, name="handle_available_fil_sensors") def handle_available_fil_sensors(self, sensors: dict) -> None: @@ -46,7 +50,7 @@ def handle_available_fil_sensors(self, sensors: dict) -> None: self.reset_view_model() filtered_sensors = [ sensor - for sensor in sensors.keys() + for sensor in sensors if sensor.startswith( ("filament_switch_sensor", "filament_motion_sensor", "cutter_sensor") ) @@ -63,14 +67,20 @@ def handle_available_fil_sensors(self, sensors: dict) -> None: def handle_fil_state_change( self, sensor_name: str, parameter: str, value: bool ) -> None: - """Handle Klipper signals for filament sensor changes""" + """Handle Klipper signals for filament sensor changes.""" _item = self.sensor_tracking_widget.get(sensor_name) - if _item: - if parameter == "filament_detected": - state = SensorWidget.FilamentState(not value) - _item.change_fil_sensor_state(state) - elif parameter == "enabled": - _item.toggle_button_state(SensorWidget.SensorState(value)) + if not _item: + return + if parameter == "filament_detected": + # filament_detected=True means filament IS present + state = ( + SensorWidget.FilamentState.PRESENT + if value + else SensorWidget.FilamentState.MISSING + ) + _item.set_filament_state(state) + elif parameter == "enabled": + _item.toggle_button_state(SensorWidget.SensorState(value)) def showEvent(self, event: QtGui.QShowEvent | None) -> None: """Re-add clients to update list""" @@ -108,7 +118,8 @@ def create_sensor_widget(self, name: str) -> SensorWidget: else: _item_widget.show() self.current_widget = _item_widget - name_id = str(name).split(" ")[1] + _parts = str(name).split(" ", 1) + name_id = _parts[1] if len(_parts) > 1 else _parts[0] item = ListItem( text=name_id, right_text="", @@ -133,7 +144,8 @@ def _setupUi(self) -> None: font_id = QtGui.QFontDatabase.addApplicationFont( ":/font/media/fonts for text/Momcake-Bold.ttf" ) - font_family = QtGui.QFontDatabase.applicationFontFamilies(font_id)[0] + _families = QtGui.QFontDatabase.applicationFontFamilies(font_id) + font_family = _families[0] if _families else "" sizePolicy = QtWidgets.QSizePolicy( QtWidgets.QSizePolicy.Policy.MinimumExpanding, QtWidgets.QSizePolicy.Policy.MinimumExpanding, diff --git a/BlocksScreen/lib/panels/widgets/troubleshootPage.py b/BlocksScreen/lib/panels/widgets/troubleshootPage.py index 0c327ac7..8673f045 100644 --- a/BlocksScreen/lib/panels/widgets/troubleshootPage.py +++ b/BlocksScreen/lib/panels/widgets/troubleshootPage.py @@ -1,6 +1,5 @@ -from PyQt6 import QtCore, QtGui, QtWidgets - from lib.utils.icon_button import IconButton +from PyQt6 import QtCore, QtGui, QtWidgets class TroubleshootPage(QtWidgets.QDialog): @@ -9,14 +8,12 @@ def __init__( parent: QtWidgets.QWidget, ) -> None: super().__init__(parent) - self.setStyleSheet( - """ + self.setStyleSheet(""" #troubleshoot_page { background-image: url(:/background/media/1st_background.png); border: none; } - """ - ) + """) self.setWindowFlags( QtCore.Qt.WindowType.Popup | QtCore.Qt.WindowType.FramelessWindowHint ) diff --git a/BlocksScreen/lib/printer.py b/BlocksScreen/lib/printer.py index c6c76fbc..fc820730 100644 --- a/BlocksScreen/lib/printer.py +++ b/BlocksScreen/lib/printer.py @@ -105,7 +105,7 @@ class Printer(QtCore.QObject): current_loaded_file_metadata: str = "" def __init__(self, parent: QtCore.QObject, ws: MoonWebSocket, /) -> None: - super(Printer, self).__init__(parent) + super().__init__(parent) self.ws = ws self.active_extruder_name: str = "" @@ -138,6 +138,18 @@ def clear_printer_objs(self) -> None: self.printer_busy = False self.current_loaded_file = "" self.current_loaded_file_metadata = "" + _heater_attributes: dict = { + "current_temperature": 0.0, + "target_temperature": 0.0, + "can_extrude": False, + } + self.heaters_object = { + "extruder": _heater_attributes.copy(), + "bed": _heater_attributes.copy(), + } + self.active_extruder_name = "" + self.available_filament_sensors = {} + self.has_chamber = False @QtCore.pyqtSlot(str, name="on_klippy_status") def on_klippy_status(self, state: str): @@ -224,7 +236,7 @@ def get_config(self, section_name: str) -> dict: return _config[0].get(section_name, {}) def search_config_list( - self, search_list: list[str], _objects: typing.Optional[list] = None + self, search_list: list[str], _objects: list | None = None ) -> list: """ Search a list of printer objects recursively @@ -295,7 +307,7 @@ def _gcode_response(self, report: list) -> None: self.gcode_response.emit(report) def _webhook_printcore_updated(self, value: dict): - self.on_printcore_update[dict].emit(value) + self.on_printcore_update.emit(value) def _webhooks_object_updated(self, value: dict, name: str = "webhooks") -> None: """Sends an event type according to the received state @@ -305,7 +317,7 @@ def _webhooks_object_updated(self, value: dict, name: str = "webhooks") -> None: value (dict): _description_ name (str, optional): _description_. Defaults to "". """ - if "state" in value.keys() and "state_message" in value.keys(): + if "state" in value and "state_message" in value: self.webhooks_update.emit(value["state"], value["state_message"]) logger.debug("Webhooks message received") _state: str = value["state"] @@ -318,71 +330,71 @@ def _webhooks_object_updated(self, value: dict, name: str = "webhooks") -> None: event = _event_callback(value["state"], value["state_message"]) instance = QtWidgets.QApplication.instance() if instance is not None and isinstance(event, QtCore.QEvent): - instance.sendEvent(self.parent(), event) + instance.postEvent(self.parent(), event) else: raise TypeError("QApplication.instance is None type.") except Exception as e: logger.debug( - "Unable to send internal Klippy %s notification : %e", + "Unable to send internal Klippy %s notification : %s", _state_call, e, ) def _gcode_move_object_updated(self, value: dict, name: str = "gcode_move") -> None: - if "speed_factor" in value.keys(): + if "speed_factor" in value: self.gcode_move_update[str, float].emit( "speed_factor", value["speed_factor"] ) - if "speed" in value.keys(): + if "speed" in value: self.gcode_move_update[str, float].emit("speed", value["speed"]) - if "extrude_factor" in value.keys(): + if "extrude_factor" in value: self.gcode_move_update[str, float].emit( "extruder_factor", value["extrude_factor"] ) - if "absolute_coordinates" in value.keys(): + if "absolute_coordinates" in value: self.gcode_move_update[str, bool].emit( "absolute_coordinates", value["absolute_coordinates"] ) - if "absolute_extrude" in value.keys(): + if "absolute_extrude" in value: self.gcode_move_update[str, bool].emit( "absolute_extrude", value["absolute_extrude"] ) - if "homing_origin" in value.keys(): + if "homing_origin" in value: self.gcode_move_update[str, list].emit( "homing_origin", value["homing_origin"] ) - if "position" in value.keys(): + if "position" in value: self.gcode_move_update[str, list].emit("position", value["position"]) - if "gcode_position" in value.keys(): + if "gcode_position" in value: self.gcode_move_update[str, list].emit( "gcode_position", value["gcode_position"] ) def _toolhead_object_updated(self, values: dict, name: str = "toolhead") -> None: - if "homed_axes" in values.keys(): + if "homed_axes" in values: self.toolhead_update[str, str].emit("homed_axes", values["homed_axes"]) - if "print_time" in values.keys(): + if "print_time" in values: self.toolhead_update[str, float].emit("print_time", values["print_time"]) - if "estimated_print_time" in values.keys(): + if "estimated_print_time" in values: self.toolhead_update[str, float].emit( "estimated_print_time", values["estimated_print_time"] ) - if "extruder" in values.keys(): + if "extruder" in values: self.toolhead_update[str, str].emit("extruder", values["extruder"]) self.active_extruder_name = values["extruder"] - if "position" in values.keys(): + if "position" in values: self.toolhead_update[str, list].emit("position", values["position"]) - if "max_velocity" in values.keys(): + if "max_velocity" in values: self.toolhead_update[str, float].emit( "max_velocity", values["max_velocity"] ) - if "max_accel" in values.keys(): + if "max_accel" in values: self.toolhead_update[str, float].emit("max_accel", values["max_accel"]) - if "max_accel_to_decel" in values.keys(): + if "max_accel_to_decel" in values: self.toolhead_update[str, float].emit( "max_accel_to_decel", values["max_accel_to_decel"] ) - if "square_corner_velocity" in values.keys(): + if "square_corner_velocity" in values: self.toolhead_update[str, float].emit( "square_corner_velocity", values["square_corner_velocity"] ) @@ -390,76 +402,79 @@ def _toolhead_object_updated(self, values: dict, name: str = "toolhead") -> None def _extruder_object_updated( self, value: dict, extruder_name: str = "extruder" ) -> None: - if "temperature" in value.keys(): + """Handle extruder object updates and emit corresponding signals.""" + if extruder_name not in self.heaters_object: + self.heaters_object[extruder_name] = {} + if "temperature" in value: self.extruder_update.emit( extruder_name, "temperature", value["temperature"] ) self.heaters_object[f"{extruder_name}"]["actual_temperature"] = value[ "temperature" ] - if "target" in value.keys(): + if "target" in value: self.extruder_update.emit(extruder_name, "target", value["target"]) self.heaters_object[f"{extruder_name}"]["target_temperature"] = value[ "target" ] - if "can_extrude" in value.keys(): + if "can_extrude" in value: self.heaters_object[f"{extruder_name}"]["can_extrude"] = value[ "can_extrude" ] - if "power" in value.keys(): + if "power" in value: self.extruder_update.emit(extruder_name, "power", value["power"]) - if "pressure_advance" in value.keys(): + if "pressure_advance" in value: self.extruder_update.emit( extruder_name, "pressure_advance", value["pressure_advance"] ) - if "smooth_time" in value.keys(): + if "smooth_time" in value: self.extruder_update.emit( extruder_name, "smooth_time", value["smooth_time"] ) - if "can_extrude" in value.keys(): + if "can_extrude" in value: pass def _heater_bed_object_updated( self, value: dict, heater_name: str = "heater_bed" ) -> None: - if "temperature" in value.keys(): + if "temperature" in value: self.heater_bed_update.emit( heater_name, "temperature", value["temperature"] ) self.heaters_object["bed"]["actual_temperature"] = value["temperature"] - if "target" in value.keys(): + if "target" in value: self.heater_bed_update.emit(heater_name, "target", value["target"]) self.heaters_object["bed"]["target_temperature"] = value["target"] - if "power" in value.keys(): + if "power" in value: self.heater_bed_update.emit(heater_name, "power", value["power"]) def _chamber_object_updated(self, value: dict, heater_name: str = "chamber"): self.has_chamber = True def _fan_object_updated(self, value: dict, fan_name: str = "fan") -> None: - if "speed" in value.keys(): + if "speed" in value: self.fan_update[str, str, float].emit("fan", "speed", value["speed"]) - if "rpm" in value.keys(): + if "rpm" in value: self.fan_update[str, str, int].emit("fan", "rpm", value["rpm"]) def _fan_generic_object_updated(self, value: dict, fan_name: str = "") -> None: _names = ["fan_generic", fan_name] object_name = " ".join(_names) - if "speed" in value.keys(): + if "speed" in value: self.fan_update[str, str, float].emit( object_name, "speed", value.get("speed") ) - if "rpm" in value.keys(): + if "rpm" in value: self.fan_update[str, str, int].emit(object_name, "rpm", value.get("rpm")) def _controller_fan_object_updated(self, value: dict, fan_name: str = "") -> None: _names = ["controller_fan", fan_name] object_name = " ".join(_names) - if "speed" in value.keys(): + if "speed" in value: self.fan_update[str, str, float].emit( object_name, "speed", value.get("speed") ) - elif "rpm" in value.keys(): + elif "rpm" in value: self.fan_update[str, str, int].emit(object_name, "rpm", value.get("rpm")) def _heater_fan_object_updated(self, value: dict, fan_name: str = "") -> None: @@ -469,20 +484,20 @@ def _heater_fan_object_updated(self, value: dict, fan_name: str = "") -> None: # object_name = " ".join(_names) def _z_tilt_object_updated(self, value: dict, name: str = "") -> None: - if value["applied"]: + if value.get("applied"): self.z_tilt_update[str, bool].emit("applied", value["applied"]) def _idle_timeout_object_updated( self, value: dict, name: str = "idle_timeout" ) -> None: - if "state" in value.keys(): + if "state" in value: self.idle_timeout_update[str, str].emit("state", value["state"]) if "printing" in value["state"]: self.printer_busy = True elif self.printing_state != "printing" and value["state"] != "printing": # It's also busy if the printer is printing or paused self.printer_busy = False - if "printing_time" in value.keys(): + if "printing_time" in value: self.idle_timeout_update[str, float].emit( "printing_time", value["printing_time"] ) @@ -490,11 +505,11 @@ def _idle_timeout_object_updated( def _virtual_sdcard_object_updated( self, values: dict, name: str = "virtual_sdcard" ) -> None: - if "progress" in values.keys(): + if "progress" in values: self.virtual_sdcard_update[str, float].emit("progress", values["progress"]) - if "is_active" in values.keys(): + if "is_active" in values: self.virtual_sdcard_update[str, bool].emit("is_active", values["is_active"]) - if "file_position" in values.keys(): + if "file_position" in values: self.virtual_sdcard_update[str, float].emit( "file_position", float(values["file_position"]) ) @@ -508,6 +523,8 @@ def send_print_event(self, event: str): Raises: TypeError: Thrown when QApplication is None """ + if not event: + return _print_state_upper = event[0].upper() _print_state_call = f"{_print_state_upper}{event[1:]}" if hasattr(events, f"Print{_print_state_call}"): @@ -516,15 +533,16 @@ def send_print_event(self, event: str): _print_state_call, f"Print{_print_state_call}", ) - _event_callback: QtCore.QEvent = getattr( - events, f"Print{_print_state_call}" - ) + _event_callback = getattr(events, f"Print{_print_state_call}") if callable(_event_callback): try: instance = QtWidgets.QApplication.instance() - if instance: - instance.postEvent(self.window(), _event_callback) - else: + # Printer is a QObject, not QWidget — use parent() + # to reach the MainWindow (which has the event handler). + target = self.parent() + if instance and target: + instance.postEvent(target, _event_callback()) + elif not instance: raise TypeError("QApplication.instance expected non None value") except Exception as e: logger.info( @@ -534,24 +552,24 @@ def send_print_event(self, event: str): def _print_stats_object_updated( self, values: dict, name: str = "print_stats" ) -> None: - if "filename" in values.keys(): + if "filename" in values: self.print_stats_update[str, str].emit("filename", values["filename"]) self.print_file_loaded = True - if "total_duration" in values.keys(): + if "total_duration" in values: self.print_stats_update[str, float].emit( "total_duration", values["total_duration"] ) - if "print_duration" in values.keys(): + if "print_duration" in values: self.print_stats_update[str, float].emit( "print_duration", values["print_duration"] ) - if "filament_used" in values.keys(): + if "filament_used" in values: self.print_stats_update[str, float].emit( "filament_used", values["filament_used"] ) - if "state" in values.keys(): + if "state" in values: self.print_stats_update[str, str].emit("state", values["state"]) - self.printing_state = values.get("state", None) + self.printing_state = values.get("state") or "" if not self.printing_state: return self.send_print_event(self.printing_state) @@ -562,33 +580,33 @@ def _print_stats_object_updated( self.print_file_loaded = True if values["state"] == "printing" or values["state"] == "pause": self.printing = True - if "message" in values.keys(): + if "message" in values: self.print_stats_update[str, str].emit("message", values["message"]) - if "info" in values.keys(): + if "info" in values: self.print_stats_update[str, dict].emit("info", values["info"]) def _display_status_object_updated( self, values: dict, name: str = "display_status" ) -> None: - if "message" in values.keys(): + if "message" in values: self.display_update[str, str].emit("message", values["message"]) - if "progress" in values.keys(): + if "progress" in values: self.display_update[str, float].emit("progress", values["progress"]) def _temperature_sensor_object_updated( self, values: dict, temperature_sensor_name: str ) -> None: - if "temperature" in values.keys(): + if "temperature" in values: self.temperature_sensor_update.emit( temperature_sensor_name, "temperature", values["temperature"] ) - if "measured_min_temp" in values.keys(): + if "measured_min_temp" in values: self.temperature_sensor_update.emit( temperature_sensor_name, "measured_min_temp", values["measured_min_temp"], ) - if "measured_max_temp" in values.keys(): + if "measured_max_temp" in values: self.temperature_sensor_update.emit( temperature_sensor_name, "measured_max_temp", @@ -600,19 +618,19 @@ def _temperature_fan_object_updated( ) -> None: _names = ["temperature_fan", temperature_fan_name] object_name = " ".join(_names) - if "speed" in values.keys(): + if "speed" in values: self.temperature_fan_update.emit( object_name, "speed", values["speed"], ) - if "temperature" in values.keys(): + if "temperature" in values: self.temperature_fan_update.emit( object_name, "temperature", values["temperature"], ) - if "target" in values.keys(): + if "target" in values: self.temperature_fan_update.emit( object_name, "target", @@ -622,14 +640,14 @@ def _temperature_fan_object_updated( def _filament_switch_sensor_object_updated( self, values: dict, filament_switch_name: str ) -> None: - if "filament_detected" in values.keys(): + if "filament_detected" in values: self.filament_switch_sensor_update.emit( filament_switch_name, "filament_detected", values["filament_detected"], ) self.available_filament_sensors.update({f"{filament_switch_name}": values}) - if "enabled" in values.keys(): + if "enabled" in values: self.filament_switch_sensor_update.emit( filament_switch_name, "enabled", values["enabled"] ) @@ -638,7 +656,7 @@ def _filament_switch_sensor_object_updated( def _filament_motion_sensor_object_updated( self, values: dict, filament_motion_name: str ) -> None: - if "filament_detected" in values.keys(): + if "filament_detected" in values: self.filament_motion_sensor_update.emit( filament_motion_name, "filament_detected", @@ -648,18 +666,18 @@ def _filament_motion_sensor_object_updated( {f"{filament_motion_name}": values["filament_detected"]} ) - if "enabled" in values.keys(): + if "enabled" in values: self.filament_motion_sensor_update.emit( filament_motion_name, "enabled", values["enabled"] ) self.available_filament_sensors.update({f"{filament_motion_name}": values}) def _cutter_sensor_object_updated(self, values: dict, cutter_name: str) -> None: - if "filament_detected" in values.keys(): + if "filament_detected" in values: self.filament_switch_sensor_update.emit( cutter_name, "filament_detected", values["filament_detected"] ) - if "enabled" in values.keys(): + if "enabled" in values: self.filament_switch_sensor_update.emit( cutter_name, "enabled", values["enabled"] ) @@ -667,7 +685,7 @@ def _cutter_sensor_object_updated(self, values: dict, cutter_name: str) -> None: self.available_filament_sensors.update({f"{cutter_name}": values}) def _output_pin_object_updated(self, values: dict, output_pin_name: str) -> None: - if "value" in values.keys(): + if "value" in values: self.output_pin_update.emit(output_pin_name, "value", values["value"]) def _bed_mesh_object_updated(self, values: dict, name: str = "bed_mesh") -> None: @@ -684,17 +702,17 @@ def _configfile_object_updated( self, values: dict, name: str = "configfile" ) -> None: self.configfile.update(values) - if "config" in values.keys(): + if "config" in values: self.printer_config.emit(values["config"]) - if "settings" in values.keys(): + if "settings" in values: # TODO ... - if "save_config_pending" in values.keys(): + if "save_config_pending" in values: self.save_config_pending.emit() - if "save_config_pending_items" in values.keys(): + if "save_config_pending_items" in values: # TODO ... - if "warnings" in values.keys(): + if "warnings" in values: # TODO ... @@ -734,9 +752,9 @@ def _temperature_probe_object_updated(self, values: dict, name: str) -> None: # TODO: testing needed here idk if does work def _unload_filament_object_updated(self, values: dict, name: str) -> None: - if "state" in values.keys(): + if "state" in values: self.unload_filament_update[bool].emit(values["state"]) def _load_filament_object_updated(self, values: dict, name: str) -> None: - if "state" in values.keys(): + if "state" in values: self.load_filament_update[bool].emit(values["state"]) diff --git a/BlocksScreen/screensaver.py b/BlocksScreen/screensaver.py index de02ba02..20cda0cf 100644 --- a/BlocksScreen/screensaver.py +++ b/BlocksScreen/screensaver.py @@ -3,15 +3,22 @@ class ScreenSaver(QtCore.QObject): + """Screensaver that uses X11 DPMS to blank the display after inactivity.""" + timer = QtCore.QTimer() - dpms_off_timeout = helper_methods.get_dpms_timeouts().get("off_timeout") - dpms_suspend_timeout = helper_methods.get_dpms_timeouts().get("suspend_timeout") - dpms_standby_timeout = helper_methods.get_dpms_timeouts().get("standby_timeout") touch_blocked: bool = False + _dpms_available: bool = hasattr(helper_methods, "get_dpms_timeouts") def __init__(self, parent) -> None: super().__init__() + dpms_timeouts = ( + helper_methods.get_dpms_timeouts() if self._dpms_available else {} + ) + self.dpms_off_timeout = dpms_timeouts.get("off_timeout") + self.dpms_suspend_timeout = dpms_timeouts.get("suspend_timeout") + self.dpms_standby_timeout = dpms_timeouts.get("standby_timeout") + self.screensaver_config = parent.config.get_section( "screensaver", fallback=None ) @@ -29,9 +36,11 @@ def __init__(self, parent) -> None: self.timer.start() def eventFilter(self, object, event) -> bool: - """Filter touch events considering DPMS Screen state""" + """Filter touch events considering DPMS screen state.""" + if not self._dpms_available: + return False - if event.type() in ( # Block Touch Filter and Wake Touch Filter + if event.type() in ( QtCore.QEvent.Type.TouchBegin, QtCore.QEvent.Type.TouchUpdate, QtCore.QEvent.Type.TouchEnd, @@ -52,14 +61,15 @@ def eventFilter(self, object, event) -> bool: self.touch_blocked = False helper_methods.set_dpms_mode(helper_methods.DPMSState.ON) self.timer.start() - return True # filter out the event, block touch events on the application + return True else: self.timer.stop() self.timer.start() return False def check_dpms(self) -> None: - """Checks the X11 extension dpms for the status of the screen""" + """Blank the display via DPMS standby.""" self.touch_blocked = True - helper_methods.set_dpms_mode(helper_methods.DPMSState.STANDBY) + if self._dpms_available: + helper_methods.set_dpms_mode(helper_methods.DPMSState.STANDBY) self.timer.stop() From bc9f17cad44f825e88265c6e981e9f4ec09a1a77 Mon Sep 17 00:00:00 2001 From: Guilherme Costa Date: Mon, 13 Apr 2026 18:19:51 +0100 Subject: [PATCH 2/3] test: update networkWindow QTimer patch path to QtCore.QTimer --- tests/network/test_network_ui.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/network/test_network_ui.py b/tests/network/test_network_ui.py index 466ec14a..a60a330a 100644 --- a/tests/network/test_network_ui.py +++ b/tests/network/test_network_ui.py @@ -693,7 +693,7 @@ def test_transient_mismatch_retries(self, win, qapp): message="not compatible with device", error_code="nm_error", ) - with patch("BlocksScreen.lib.panels.networkWindow.QTimer") as mock_timer: + with patch("BlocksScreen.lib.panels.networkWindow.QtCore.QTimer") as mock_timer: w._on_operation_complete(result) mock_timer.singleShot.assert_called_once() # Loading should still be visible — retry is pending @@ -740,7 +740,7 @@ def test_wifi_on_with_saved_networks_starts_connect(self, win): ) ] nm.saved_networks = saved - with patch("BlocksScreen.lib.panels.networkWindow.QTimer") as mock_timer: + with patch("BlocksScreen.lib.panels.networkWindow.QtCore.QTimer") as mock_timer: w._handle_wifi_toggle(True) mock_timer.singleShot.assert_called() assert w._pending_operation == PendingOperation.WIFI_ON From 965193ba0722b4c99b886386219a3d7b6f504b26 Mon Sep 17 00:00:00 2001 From: Guilherme Costa Date: Mon, 13 Apr 2026 18:48:16 +0100 Subject: [PATCH 3/3] fix: add set_filament_state to SensorWidget; remove dead change_fil_sensor_state signal --- BlocksScreen/lib/panels/widgets/sensorWidget.py | 7 +++++++ BlocksScreen/lib/panels/widgets/sensorsPanel.py | 3 --- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/BlocksScreen/lib/panels/widgets/sensorWidget.py b/BlocksScreen/lib/panels/widgets/sensorWidget.py index e0ed9955..ea2e4959 100644 --- a/BlocksScreen/lib/panels/widgets/sensorWidget.py +++ b/BlocksScreen/lib/panels/widgets/sensorWidget.py @@ -104,6 +104,13 @@ def change_fil_sensor_state(self, state: FilamentState): self.filament_state = SensorWidget.FilamentState(not state.value) self.update() + def set_filament_state(self, state: FilamentState) -> None: + """Set filament state directly without inversion.""" + if not isinstance(state, SensorWidget.FilamentState): + return + self.filament_state = state + self.update() + def toggle_button_state(self, state: ToggleAnimatedButton.State) -> None: """Called when the Klipper firmware reports an update to the filament sensor state""" self.toggle_button.setDisabled(False) diff --git a/BlocksScreen/lib/panels/widgets/sensorsPanel.py b/BlocksScreen/lib/panels/widgets/sensorsPanel.py index 77fe5865..6ef78dba 100644 --- a/BlocksScreen/lib/panels/widgets/sensorsPanel.py +++ b/BlocksScreen/lib/panels/widgets/sensorsPanel.py @@ -11,9 +11,6 @@ class SensorsWindow(QtWidgets.QWidget): run_gcode_signal: typing.ClassVar[QtCore.pyqtSignal] = QtCore.pyqtSignal( str, name="run_gcode" ) - change_fil_sensor_state: typing.ClassVar[QtCore.pyqtSignal] = QtCore.pyqtSignal( - SensorWidget.FilamentState, name="change_fil_sensor_state" - ) request_back: typing.ClassVar[QtCore.pyqtSignal] = QtCore.pyqtSignal( name="request_back" )