diff --git a/CHANGELOG.md b/CHANGELOG.md index e4cafdf..01146fc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,21 @@ +## v2.3.0 (2025-12-23) + +### Feat + +- **node**: enhance trigger handling with asyncio task management +- **tests**: add yappi profiling context manager and integrate into test cases +- **plugins**: add JSON representation method to InstalledModule + +### Fix + +- **files**: ensure temporary file cleanup on error in write_json_secure + +### Refactor + +- **node, plugins**: improve event handling and entry point validation +- **plugins**: enhance module handling and entry point loading +- **setup**: improve error logging and simplify entry point handling + ## v2.2.0 (2025-12-12) ### Feat diff --git a/pyproject.toml b/pyproject.toml index a3b6d9a..a56d0cc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,7 +1,7 @@ [project] name = "funcnodes-core" -version = "2.2.0" +version = "2.3.0" description = "core package for funcnodes" authors = [{name = "Julian Kimmig", email = "julian.kimmig@linkdlab.de"}] diff --git a/src/funcnodes_core/_setup.py b/src/funcnodes_core/_setup.py index 9b06ada..d113522 100644 --- a/src/funcnodes_core/_setup.py +++ b/src/funcnodes_core/_setup.py @@ -1,7 +1,6 @@ from typing import Dict, Optional import gc from .config import update_render_options -from .lib import check_shelf from ._logging import FUNCNODES_LOGGER from .utils.plugins import get_installed_modules, InstalledModule, PLUGIN_FUNCTIONS @@ -10,8 +9,8 @@ def setup_module(mod_data: InstalledModule) -> Optional[InstalledModule]: gc.collect() entry_points = mod_data.entry_points mod = mod_data.module - if not mod: # funcnodes modules must have an module entry point - return None + # if not mod: # funcnodes modules must have an module entry point + # return None # first we try to register the plugin setup function as this might register other functions try: @@ -21,7 +20,7 @@ def setup_module(mod_data: InstalledModule) -> Optional[InstalledModule]: mod.FUNCNODES_PLUGIN_SETUP() entry_points["render_options"] = mod.FUNCNODES_PLUGIN_SETUP except Exception as e: - FUNCNODES_LOGGER.error("Error in plugin setup %s: %s" % (mod.__name__, e)) + FUNCNODES_LOGGER.error("Error in plugin setup %s: %s" % (mod_data.name, e)) # Then we call the plugin functions for pluginf in PLUGIN_FUNCTIONS.values(): @@ -29,7 +28,7 @@ def setup_module(mod_data: InstalledModule) -> Optional[InstalledModule]: pluginf(mod_data) except Exception as e: FUNCNODES_LOGGER.error( - "Error in setup_module plugin function %s: %s" % (mod.__name__, e) + "Error in setup_module plugin function %s: %s" % (mod_data.name, e) ) if "render_options" in entry_points: @@ -48,15 +47,15 @@ def setup_module(mod_data: InstalledModule) -> Optional[InstalledModule]: if hasattr(mod, sn): entry_points["shelf"] = getattr(mod, sn) break - if "shelf" in entry_points: - try: - entry_points["shelf"] = check_shelf( - entry_points["shelf"], parent_id=mod_data.name - ) - except ValueError as e: - FUNCNODES_LOGGER.error("Error in module %s: %s" % (mod.__name__, e)) - del entry_points["shelf"] - mod_data._is_setup = True + # if "shelf" in entry_points: + # try: + # entry_points["shelf"] = check_shelf( + # entry_points["shelf"], parent_id=mod_data.name + # ) + # except ValueError as e: + # FUNCNODES_LOGGER.error("Error in module %s: %s" % (mod_data.name, e)) + # del entry_points["shelf"] + mod_data._is_setup = mod is not None return mod_data diff --git a/src/funcnodes_core/node.py b/src/funcnodes_core/node.py index db8f962..315d8de 100644 --- a/src/funcnodes_core/node.py +++ b/src/funcnodes_core/node.py @@ -400,6 +400,7 @@ def __init__( self._outputs: List[NodeOutput] = [] self._outputs_dict: Optional[Dict[str, NodeOutput]] = None self._triggerstack: Optional[TriggerStack] = None + self._trigger_task: Optional[asyncio.Task] = None # flag whether the trigger has started but still not read the ios self._trigger_open = False self._requests_trigger = False @@ -724,6 +725,8 @@ def in_trigger(self): checked if the triggerstack is not None and if it is done or if the _trigger_open flag is set """ + if self._trigger_task is not None and self._trigger_task.done(): + self._trigger_task = None if self._triggerstack is not None: if self._triggerstack.done(): self._triggerstack = None @@ -1100,8 +1103,26 @@ async def await_trigger(self): @savemethod async def wait_for_trigger_finish(self): - if self.in_trigger: - await self.asynceventmanager.wait("triggerdone") + while self.in_trigger: + task = self._trigger_task + if task is not None: + try: + # Wait on the trigger task directly to avoid missing event pulses. + await asyncio.wait_for(asyncio.shield(task), timeout=0.5) + except asyncio.TimeoutError: + continue + else: + try: + # Fallback when no task reference is available. + await asyncio.wait_for( + self.asynceventmanager.wait("triggerdone"), timeout=0.5 + ) + except asyncio.TimeoutError: + continue + + def _clear_trigger_task(self, task: asyncio.Task) -> None: + if self._trigger_task is task: + self._trigger_task = None @savemethod async def await_until_complete(self): @@ -1145,7 +1166,10 @@ def trigger(self, triggerstack: Optional[TriggerStack] = None) -> TriggerStack: triggerlogger.debug("triggering %s", self) self._trigger_open = True self._triggerstack = triggerstack - self._triggerstack.append(asyncio.create_task(self())) + task = asyncio.create_task(self()) + self._trigger_task = task + task.add_done_callback(self._clear_trigger_task) + self._triggerstack.append(task) self._requests_trigger = False return self._triggerstack diff --git a/src/funcnodes_core/utils/files.py b/src/funcnodes_core/utils/files.py index 108c728..5fcd014 100644 --- a/src/funcnodes_core/utils/files.py +++ b/src/funcnodes_core/utils/files.py @@ -22,6 +22,7 @@ def write_json_secure(data, filepath: Union[Path, str], cls=None, **kwargs): cls = cls or JSONEncoder # Create a temporary file in the same directory + temp_file_path = None try: with tempfile.NamedTemporaryFile( "w+", dir=directory, delete=False, encoding="utf-8" @@ -33,8 +34,9 @@ def write_json_secure(data, filepath: Union[Path, str], cls=None, **kwargs): os.fsync(temp_file.fileno()) # Force writing to disk for durability except Exception as e: # Clean up the temporary file in case of an error - if os.path.exists(temp_file_path): - os.remove(temp_file_path) + if temp_file_path: + if os.path.exists(temp_file_path): + os.remove(temp_file_path) raise e # Atomically replace the target file with the temporary file diff --git a/src/funcnodes_core/utils/plugins.py b/src/funcnodes_core/utils/plugins.py index bf7b5a4..015aaf6 100644 --- a/src/funcnodes_core/utils/plugins.py +++ b/src/funcnodes_core/utils/plugins.py @@ -1,6 +1,11 @@ from typing import Dict, Optional from collections.abc import Callable -from importlib.metadata import entry_points, Distribution, PackageNotFoundError +from importlib.metadata import ( + EntryPoint, + entry_points, + Distribution, + PackageNotFoundError, +) from importlib import reload import sys from .._logging import FUNCNODES_LOGGER @@ -44,13 +49,15 @@ def reload_plugin_module(module_name: str): def assert_entry_points_loaded(modulde_data: InstalledModule): for ep in entry_points(group="funcnodes.module", module=modulde_data.name): - if ep.name in modulde_data.entry_points: + if ep.name in modulde_data.entry_points and not isinstance( + modulde_data.entry_points[ep.name], EntryPoint + ): continue try: loaded = ep.load() modulde_data.entry_points[ep.name] = loaded if ep.name == "module": - modulde_data.module = loaded + modulde_data.set_module(loaded) except Exception as exc: FUNCNODES_LOGGER.exception(f"Failed to load entry point {ep.name}: {exc}") @@ -106,24 +113,45 @@ def _lazydist() -> Distribution: return modulde_data +def setup_plugin_module(module_name: str) -> Optional[InstalledModule]: + modulde_data = InstalledModule( + name=module_name, + entry_points={}, + module=None, # module not directly added since only modules with a module entry point are relevant + ) + + for ep in entry_points(group="funcnodes.module", module=modulde_data.name): + if ep.name in modulde_data.entry_points: + continue + modulde_data.entry_points[ep.name] = ep + + return modulde_data + + def get_installed_modules( named_objects: Optional[Dict[str, InstalledModule]] = None, ) -> Dict[str, InstalledModule]: if named_objects is None: named_objects: Dict[str, InstalledModule] = {} - modules = set() - for ep in entry_points(group="funcnodes.module"): module_name = ep.module - modules.add(module_name) - for module_name in modules: - if module_name not in named_objects: + if module_name in named_objects: + continue + # insmod = setup_plugin_module(module_name) + # if not insmod: + # continue + # named_objects[module_name] = insmod + + # old code + if module_name in sys.modules: named_objects[module_name] = reload_plugin_module(module_name) + else: + named_objects[module_name] = setup_plugin_module(module_name) modulde_data = named_objects[module_name] - modulde_data = assert_entry_points_loaded(modulde_data) - modulde_data = assert_module_metadata(modulde_data) + for module_name, modulde_data in named_objects.items(): + modulde_data = assert_module_metadata(modulde_data) return named_objects diff --git a/src/funcnodes_core/utils/plugins_types.py b/src/funcnodes_core/utils/plugins_types.py index eef270c..d8dc653 100644 --- a/src/funcnodes_core/utils/plugins_types.py +++ b/src/funcnodes_core/utils/plugins_types.py @@ -1,5 +1,6 @@ from typing import Dict, Any, Optional, TypedDict, List from dataclasses import dataclass, field +from importlib.metadata import EntryPoint class RenderOptions(TypedDict, total=False): @@ -27,6 +28,29 @@ class BasePlugin(TypedDict): module: str +class _LazyEntryDict(dict): + def __getitem__(self, name: str) -> Any: + value = super().__getitem__(name) + if isinstance(value, EntryPoint): + value = value.load() + self[name] = value + if hasattr(self, "installed_module"): + installed_module = getattr(self, "installed_module") + if installed_module.module is None: + if name == "module": + installed_module.set_module(value) + else: + module = self.get("module", None) + if module is not None: + installed_module.set_module(module) + return value + + def get(self, name: str, default: Any = None) -> Any: + if name not in self: + return default + return self[name] + + @dataclass class InstalledModule: """ @@ -40,12 +64,18 @@ class InstalledModule: name: str module: Any description: Optional[str] = None - entry_points: Dict[str, Any] = field(default_factory=dict) + entry_points: Dict[str, Any] = field(default_factory=_LazyEntryDict) plugins: List[BasePlugin] = field(default_factory=list) render_options: Optional[RenderOptions] = None version: Optional[str] = None _is_setup = False + # make sure that entrz points is a _LazyEntryDict + def __post_init__(self): + if not isinstance(self.entry_points, _LazyEntryDict): + self.entry_points = _LazyEntryDict(self.entry_points) + self.entry_points.installed_module = self + @property def rep_dict(self) -> dict[str, Any]: return { @@ -62,3 +92,18 @@ def __repr__(self) -> str: def __str__(self) -> str: return self.__repr__() + + def set_module(self, module: Any): + if self.module is not None: + if self.module != module: + raise ValueError( + f"Module {self.name} already has a module {self.module} and cannot be set to {module}" + ) + self.module = module + if not self._is_setup: + from .._setup import setup_module + + setup_module(self) + + def _repr_json_(self) -> dict[str, Any]: + return self.rep_dict diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..1a8fe75 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,39 @@ +import pathlib +import os +import pytest + +try: + import yappi +except ImportError: + yappi = None + + +class _yappicontext: + def __init__(self, file): + base_dir = pathlib.Path( + os.environ.get("TEST_OUTPUT_DIR", "testouts") + ).absolute() + if not base_dir.exists(): + base_dir.mkdir(parents=True, exist_ok=True) + self.file = str(base_dir / file) + + def __enter__(self): + if yappi is not None: + yappi.set_clock_type("WALL") + yappi.start() + + def __exit__(self, exc_type, exc_val, exc_tb): + if yappi is not None: + yappi.stop() + yappi.get_func_stats().save(self.file, "pstat") + yappi.clear_stats() + + +@pytest.fixture +def yappicontext_class(): + return _yappicontext + + +@pytest.fixture +def yappicontext(yappicontext_class, request): + return yappicontext_class(request.node.name + ".pstat") diff --git a/tests/test_nodeutils.py b/tests/test_nodeutils.py index 2bf5e63..f137051 100644 --- a/tests/test_nodeutils.py +++ b/tests/test_nodeutils.py @@ -8,6 +8,7 @@ run_until_complete, ) from funcnodes_core.nodemaker import NodeDecorator +from funcnodes_core.eventmanager import AsyncEventManager import funcnodes_core as fn @@ -37,6 +38,26 @@ async def func(self, ip1, ip2): self.outputs["op1"].value += 1 +class SlowClearEventManager(AsyncEventManager): + def __init__(self, obj): + super().__init__(obj) + self.triggerdone_cleared = asyncio.Event() + + async def set_and_clear(self, event: str, delta: float = 0) -> None: + await super().set_and_clear(event, delta=delta) + if event == "triggerdone": + self.triggerdone_cleared.set() + await asyncio.sleep(0.05) + + +class SlowEventTKNode(TKNode): + node_id = "slow_event_tknode" + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.asynceventmanager = SlowClearEventManager(self) + + @dataclass class NodeChain: node1: fn.Node @@ -181,3 +202,22 @@ async def test_trigger_fast(): tw1 = te1 - ts1 assert tw1 < 0.5 assert node.outputs["op1"].value == 101 + + +@funcnodes_test +async def test_wait_for_trigger_finish_avoids_triggerdone_race(): + node = SlowEventTKNode() + node.pretrigger_delay = 0.0 + node.inputs["ip1"].value = 1 + node.inputs["ip2"].value = 2 + node.request_trigger() + + await node.asynceventmanager.triggerdone_cleared.wait() + assert node.in_trigger + + ts = time.perf_counter() + await node.wait_for_trigger_finish() + tw = time.perf_counter() - ts + + assert tw < 0.2 + assert node.outputs["op1"].value == 1 diff --git a/tests/test_triggering.py b/tests/test_triggering.py index 7d28278..cfab1dd 100644 --- a/tests/test_triggering.py +++ b/tests/test_triggering.py @@ -1,39 +1,11 @@ import time -import os -import pathlib import funcnodes_core as fn from pytest_funcnodes import funcnodes_test -try: - import yappi -except ImportError: - yappi = None - - -class yappicontext: - def __init__(self, file): - base_dir = pathlib.Path( - os.environ.get("TEST_OUTPUT_DIR", "testouts") - ).absolute() - if not base_dir.exists(): - base_dir.mkdir(parents=True, exist_ok=True) - self.file = str(base_dir / file) - - def __enter__(self): - if yappi is not None: - yappi.set_clock_type("WALL") - yappi.start() - - def __exit__(self, exc_type, exc_val, exc_tb): - if yappi is not None: - yappi.stop() - yappi.get_func_stats().save(self.file, "pstat") - yappi.clear_stats() - @funcnodes_test -async def test_triggerspeeds(): +async def test_triggerspeeds(yappicontext_class): @fn.NodeDecorator("TestTriggerSpeed test_triggerspeeds") async def _add_one(input: int) -> int: return input + 1 # a very simple and fast operation @@ -43,13 +15,13 @@ async def _a_add_one(input: int) -> int: node = _add_one(pretrigger_delay=0.0) - with yappicontext("test_triggerspeeds_directfunc.pstat"): + with yappicontext_class("test_triggerspeeds_directfunc.pstat"): t = time.perf_counter() cound_directfunc = 0 while time.perf_counter() - t < 1: cound_directfunc = await node.func(cound_directfunc) - with yappicontext("test_triggerspeeds_simplefunc.pstat"): + with yappicontext_class("test_triggerspeeds_simplefunc.pstat"): t = time.perf_counter() count_simplefunc = 0 while time.perf_counter() - t < 1: @@ -78,7 +50,7 @@ def increase_called_triggerfast(*args, **kwargs): node.on("triggerstart", increase_called_trigger) node.on("triggerfast", increase_called_triggerfast) - with yappicontext("test_triggerspeeds_direct_called.pstat"): + with yappicontext_class("test_triggerspeeds_direct_called.pstat"): while time.perf_counter() - t < 1: await node() node.inputs["input"].value = node.outputs["out"].value @@ -93,7 +65,7 @@ def increase_called_triggerfast(*args, **kwargs): trigger_direct_called > cound_directfunc / 5 ) # overhead due to all the trigger set and clear - with yappicontext("test_triggerspeeds_called_await.pstat"): + with yappicontext_class("test_triggerspeeds_called_await.pstat"): node.inputs["input"].value = 1 t = time.perf_counter() diff --git a/tests/test_utils/test_plugins.py b/tests/test_utils/test_plugins.py index c71f62c..c320d0c 100644 --- a/tests/test_utils/test_plugins.py +++ b/tests/test_utils/test_plugins.py @@ -245,17 +245,24 @@ def test_get_installed_modules_deduplicates_and_updates(monkeypatch): """get_installed_modules should reload missing entries once and enrich data.""" class GroupEntryPoint: - def __init__(self, module): + def __init__(self, module, name): self.module = module + self.name = name group_eps = [ - GroupEntryPoint("alpha"), - GroupEntryPoint("beta"), - GroupEntryPoint("alpha"), # duplicate to ensure deduplication + GroupEntryPoint("alpha", "alpha"), + GroupEntryPoint("beta", "beta"), + GroupEntryPoint("alpha", "alpha2"), # duplicate to ensure deduplication ] def fake_entry_points(**kwargs): - assert kwargs == {"group": "funcnodes.module"} + assert kwargs["group"] == "funcnodes.module" + if "module" in kwargs: + if kwargs["module"] == "alpha": + return [group_eps[0]] + if kwargs["module"] == "beta": + return [group_eps[1]] + raise ValueError(f"Unknown module: {kwargs['module']}") return group_eps reloaded = [] @@ -276,19 +283,14 @@ def fake_assert_module_metadata(data): monkeypatch.setattr(plugins_module, "entry_points", fake_entry_points) monkeypatch.setattr(plugins_module, "reload_plugin_module", fake_reload) - monkeypatch.setattr( - plugins_module, "assert_entry_points_loaded", fake_assert_entry_points_loaded - ) + monkeypatch.setattr( plugins_module, "assert_module_metadata", fake_assert_module_metadata ) result = plugins_module.get_installed_modules(named_objects={"beta": existing}) - assert reloaded == ["alpha"] assert set(result.keys()) == {"alpha", "beta"} - assert result["alpha"].entry_points["loaded"] is True - assert result["beta"].entry_points["loaded"] is True assert result["beta"].description == "desc:beta" @@ -348,6 +350,7 @@ def fake_from_name(cls, name): assert module_name in result assert reload_calls == [module_name] + result = plugins_module.get_installed_modules() installed = result[module_name] assert installed.module is module_obj assert installed.entry_points["other"] is not None diff --git a/uv.lock b/uv.lock index 48a18dd..2f13528 100644 --- a/uv.lock +++ b/uv.lock @@ -457,7 +457,7 @@ wheels = [ [[package]] name = "funcnodes-core" -version = "2.2.0" +version = "2.3.0" source = { editable = "." } dependencies = [ { name = "dill" },