From 4c79114dc6d5b15e63e883ced64a7236f8d3dc2a Mon Sep 17 00:00:00 2001 From: Rubens Panfili Date: Wed, 25 Mar 2026 11:53:36 +0100 Subject: [PATCH 1/6] fix(engine): recover from BrokenProcessPool by recreating the morph-kgc pool --- wordlift_sdk/structured_data/engine.py | 89 ++++++++++++++++---------- 1 file changed, 55 insertions(+), 34 deletions(-) diff --git a/wordlift_sdk/structured_data/engine.py b/wordlift_sdk/structured_data/engine.py index a1077f3..24fb314 100644 --- a/wordlift_sdk/structured_data/engine.py +++ b/wordlift_sdk/structured_data/engine.py @@ -10,6 +10,7 @@ import os import re from concurrent.futures import ProcessPoolExecutor +from concurrent.futures.process import BrokenProcessPool from dataclasses import dataclass from importlib import resources from pathlib import Path @@ -54,34 +55,39 @@ def _morph_kgc_worker(config: str, submit_time: float) -> tuple[str, int]: # layer receive the timing as a regular return value. _morph_kgc_tls = threading.local() +logger = logging.getLogger(__name__) + # Lazy process pool — created on first use in the main process only. # Worker subprocesses import this module but never call _get_morph_kgc_pool(), # so they do NOT create their own pools (no recursive process explosion). _morph_kgc_pool: ProcessPoolExecutor | None = None +_morph_kgc_pool_max_workers: int = 0 +_morph_kgc_pool_lock = threading.Lock() def init_morph_kgc_pool(max_workers: int) -> None: """Pre-create the morph_kgc process pool with a specific worker count. Call once from the protocol __init__ before any mapping work starts. - Subsequent calls are no-ops (pool is only created once). + Subsequent calls are no-ops (pool is only created once per worker count). """ - global _morph_kgc_pool - if _morph_kgc_pool is not None: - return - ctx = multiprocessing.get_context("spawn") - _morph_kgc_pool = ProcessPoolExecutor(max_workers=max_workers, mp_context=ctx) + global _morph_kgc_pool, _morph_kgc_pool_max_workers + with _morph_kgc_pool_lock: + _morph_kgc_pool_max_workers = max_workers + if _morph_kgc_pool is not None: + return + ctx = multiprocessing.get_context("spawn") + _morph_kgc_pool = ProcessPoolExecutor(max_workers=max_workers, mp_context=ctx) def _get_morph_kgc_pool() -> ProcessPoolExecutor: - global _morph_kgc_pool - if _morph_kgc_pool is None: - # Fallback if init_morph_kgc_pool was never called. - ctx = multiprocessing.get_context("spawn") - _morph_kgc_pool = ProcessPoolExecutor( - max_workers=os.cpu_count() or 4, - mp_context=ctx, - ) - return _morph_kgc_pool + global _morph_kgc_pool, _morph_kgc_pool_max_workers + with _morph_kgc_pool_lock: + if _morph_kgc_pool is None: + # Fallback if init_morph_kgc_pool was never called, or after a reset. + workers = _morph_kgc_pool_max_workers or (os.cpu_count() or 4) + ctx = multiprocessing.get_context("spawn") + _morph_kgc_pool = ProcessPoolExecutor(max_workers=workers, mp_context=ctx) + return _morph_kgc_pool _SCHEMA_BASE = "https://schema.org" @@ -1422,25 +1428,40 @@ def _materialize_graph(mapping_path: Path) -> Graph: "[DataSource1]\n" f"mappings = {mapping_path}\n" ) - try: - # Submit to subprocess pool — each worker has isolated pyparsing state, - # so calls are genuinely parallel across CPU cores with no lock needed. - # .result() blocks the calling thread (not the asyncio event loop). - ntriples, queue_wait_ms = ( - _get_morph_kgc_pool() - .submit(_morph_kgc_worker, config, _time.time()) - .result() - ) - # Store wait time in thread-local so protocol.py can read it without - # changing the return type of this function. - _morph_kgc_tls.mapping_wait_ms = queue_wait_ms - graph = Graph() - graph.parse(data=ntriples, format="nt") - return graph - except RuntimeError: - raise - except Exception as exc: - raise _normalize_materialization_error(exc) from exc + for attempt in range(2): + pool = _get_morph_kgc_pool() + try: + # Submit to subprocess pool — each worker has isolated pyparsing state, + # so calls are genuinely parallel across CPU cores with no lock needed. + # .result() blocks the calling thread (not the asyncio event loop). + ntriples, queue_wait_ms = pool.submit( + _morph_kgc_worker, config, _time.time() + ).result() + # Store wait time in thread-local so protocol.py can read it without + # changing the return type of this function. + _morph_kgc_tls.mapping_wait_ms = queue_wait_ms + graph = Graph() + graph.parse(data=ntriples, format="nt") + return graph + except BrokenProcessPool: + logger.warning( + "morph-kgc process pool broken (attempt %d/2), recreating.", + attempt + 1, + ) + global _morph_kgc_pool + with _morph_kgc_pool_lock: + # Only reset if this is still the same broken pool instance, + # preventing a race where multiple threads all try to reset it. + if _morph_kgc_pool is pool: + _morph_kgc_pool = None + if attempt >= 1: + raise _normalize_materialization_error( + RuntimeError("morph-kgc process pool broke twice in a row") + ) from None + except RuntimeError: + raise + except Exception as exc: + raise _normalize_materialization_error(exc) from exc def materialize_yarrrml( From 0e07458193041989930dc8187d2f0ed7557dd5a6 Mon Sep 17 00:00:00 2001 From: Rubens Panfili Date: Wed, 25 Mar 2026 16:40:22 +0100 Subject: [PATCH 2/6] test(engine): pool recovery strategy --- ...structured_data_materialization_generic.py | 152 ++++++++++++++++++ 1 file changed, 152 insertions(+) diff --git a/tests/test_structured_data_materialization_generic.py b/tests/test_structured_data_materialization_generic.py index ee8269d..4104ffb 100644 --- a/tests/test_structured_data_materialization_generic.py +++ b/tests/test_structured_data_materialization_generic.py @@ -854,3 +854,155 @@ class _Response: payload = str(jsonld) assert "__ID__" not in payload assert runtime_id in payload + + +# --------------------------------------------------------------------------- +# morph-kgc process pool: BrokenProcessPool recovery and pool sizing +# --------------------------------------------------------------------------- + + +def _make_ntriples_pool(ntriples: str): + """Return a fake pool whose submit() returns (ntriples, 0).""" + + class _Future: + def result(self): + return ntriples, 0 + + class _Pool: + def submit(self, fn, *args, **kwargs): + return _Future() + + return _Pool() + + +def _make_broken_pool(): + """Return a fake pool whose submit() raises BrokenProcessPool.""" + from concurrent.futures.process import BrokenProcessPool + + class _Future: + def result(self): + raise BrokenProcessPool("simulated crash") + + class _Pool: + def submit(self, fn, *args, **kwargs): + return _Future() + + return _Pool() + + +def test_broken_process_pool_retries_and_recovers( + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, +) -> None: + """First call raises BrokenProcessPool; second succeeds with a fresh pool.""" + import wordlift_sdk.structured_data.engine as _engine + from rdflib import Graph + + xhtml_path = tmp_path / "page.xhtml" + xhtml_path.write_text("") + + good_graph = Graph() + ntriples = good_graph.serialize(format="nt") + + broken = _make_broken_pool() + good = _make_ntriples_pool(ntriples) + pools = iter([broken, good]) + + monkeypatch.setattr(_engine, "_morph_kgc_pool", None) + monkeypatch.setattr(_engine, "_morph_kgc_pool_max_workers", 0) + monkeypatch.setattr(_engine, "_get_morph_kgc_pool", lambda: next(pools)) + + mapping = """ +prefixes: + schema: 'https://schema.org/' +mappings: + page: + sources: + - [__XHTML__~xpath, '/'] + s: https://example.com/page~iri + po: + - [a, 'schema:WebPage'] +""" + from wordlift_sdk.structured_data.engine import materialize_yarrrml_jsonld + + result = materialize_yarrrml_jsonld( + mapping, + xhtml_path=xhtml_path, + workdir=tmp_path / "work", + ) + assert isinstance(result, (dict, list)) + + +def test_broken_process_pool_twice_raises( + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Two consecutive BrokenProcessPool errors must raise RuntimeError.""" + import wordlift_sdk.structured_data.engine as _engine + + xhtml_path = tmp_path / "page.xhtml" + xhtml_path.write_text("") + + pools = iter([_make_broken_pool(), _make_broken_pool()]) + + monkeypatch.setattr(_engine, "_morph_kgc_pool", None) + monkeypatch.setattr(_engine, "_morph_kgc_pool_max_workers", 0) + monkeypatch.setattr(_engine, "_get_morph_kgc_pool", lambda: next(pools)) + + mapping = """ +prefixes: + schema: 'https://schema.org/' +mappings: + page: + sources: + - [__XHTML__~xpath, '/'] + s: https://example.com/page~iri +""" + from wordlift_sdk.structured_data.engine import materialize_yarrrml_jsonld + + with pytest.raises(RuntimeError, match="broke twice"): + materialize_yarrrml_jsonld( + mapping, + xhtml_path=xhtml_path, + workdir=tmp_path / "work", + ) + + +def test_init_morph_kgc_pool_is_noop_when_pool_exists( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """init_morph_kgc_pool must not replace an already-existing pool.""" + import wordlift_sdk.structured_data.engine as _engine + + sentinel = object() + monkeypatch.setattr(_engine, "_morph_kgc_pool", sentinel) + monkeypatch.setattr(_engine, "_morph_kgc_pool_max_workers", 2) + + _engine.init_morph_kgc_pool(8) + + assert _engine._morph_kgc_pool is sentinel + + +def test_get_morph_kgc_pool_reuses_max_workers_after_reset( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """After a pool reset, _get_morph_kgc_pool recreates with the stored worker count.""" + import wordlift_sdk.structured_data.engine as _engine + + created_with: list[int] = [] + + class _FakePool: + def __init__(self, max_workers: int, **_kwargs): + created_with.append(max_workers) + + monkeypatch.setattr(_engine, "_morph_kgc_pool", None) + monkeypatch.setattr(_engine, "_morph_kgc_pool_max_workers", 6) + monkeypatch.setattr( + _engine, + "ProcessPoolExecutor", + lambda max_workers, **kw: _FakePool(max_workers, **kw), + ) + + pool = _engine._get_morph_kgc_pool() + assert isinstance(pool, _FakePool) + assert created_with == [6] From 87209ffd36cf1df82a9325514e65aa9bbe4bf2fa Mon Sep 17 00:00:00 2001 From: Rubens Panfili Date: Wed, 25 Mar 2026 12:43:35 +0100 Subject: [PATCH 3/6] fix(protocol): use correct `mapping_pool_size` variable for ThreadPoolExecutor --- wordlift_sdk/kg_build/protocol.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/wordlift_sdk/kg_build/protocol.py b/wordlift_sdk/kg_build/protocol.py index d05b3e2..acdb301 100644 --- a/wordlift_sdk/kg_build/protocol.py +++ b/wordlift_sdk/kg_build/protocol.py @@ -277,7 +277,7 @@ def _init_mapping_service( # the asyncio event loop. The thread itself blocks on the morph_kgc # ProcessPoolExecutor slot, leaving the event loop free for I/O. self._mapping_executor = ThreadPoolExecutor( - max_workers=pool_size, thread_name_prefix="worai_ml" + max_workers=mapping_pool_size, thread_name_prefix="worai_ml" ) def _init_shacl_validator(self, settings: dict, pool_size: int) -> None: From bfba01f881a83f1c9d26b15d06307bdbf0a87022 Mon Sep 17 00:00:00 2001 From: Rubens Panfili Date: Wed, 25 Mar 2026 16:27:08 +0100 Subject: [PATCH 4/6] fix(protocol): cap postprocessor pool at 75% of pool_size (min 2) --- wordlift_sdk/kg_build/protocol.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/wordlift_sdk/kg_build/protocol.py b/wordlift_sdk/kg_build/protocol.py index acdb301..c8edeb2 100644 --- a/wordlift_sdk/kg_build/protocol.py +++ b/wordlift_sdk/kg_build/protocol.py @@ -203,7 +203,7 @@ def _init_postprocessor_service( settings, "postprocessor_pool_size", "POSTPROCESSOR_POOL_SIZE", - pool_size, + max(2, int(pool_size * 3 / 4)), ) ) logger.info( From e5eb8d11d74e2fecca26897965a080233ffd1833 Mon Sep 17 00:00:00 2001 From: Rubens Panfili Date: Thu, 26 Mar 2026 08:50:39 +0100 Subject: [PATCH 5/6] chore(logging): log url-fetch and postprocessor concurrency --- wordlift_sdk/container/application_container.py | 6 ++++++ wordlift_sdk/kg_build/container.py | 3 +++ wordlift_sdk/kg_build/protocol.py | 5 +++++ 3 files changed, 14 insertions(+) diff --git a/wordlift_sdk/container/application_container.py b/wordlift_sdk/container/application_container.py index 21f9b9e..53282a9 100644 --- a/wordlift_sdk/container/application_container.py +++ b/wordlift_sdk/container/application_container.py @@ -1,3 +1,4 @@ +import logging from os import cpu_count from wordlift_client import Configuration, AccountInfo, WebPageImportFetchOptions @@ -22,6 +23,8 @@ from ..workflow.url_handler.search_console_url_handler import SearchConsoleUrlHandler from ..workflow.url_handler.url_handler import UrlHandler +logger = logging.getLogger(__name__) + class ApplicationContainer: _api_url: str @@ -130,6 +133,9 @@ async def create_kg_import_workflow(self) -> KgImportWorkflow: concurrency = self._configuration_provider.get_value( "CONCURRENCY", min(cpu_count(), 4) ) + logger.info( + "URL fetch concurrency: %d", concurrency + ) return KgImportWorkflow( context=await self.get_context(), url_source=await self.create_new_or_changed_source(), diff --git a/wordlift_sdk/kg_build/container.py b/wordlift_sdk/kg_build/container.py index d88e055..701646f 100644 --- a/wordlift_sdk/kg_build/container.py +++ b/wordlift_sdk/kg_build/container.py @@ -54,6 +54,9 @@ async def create_web_page_scrape_url_handler( async def create_kg_import_workflow(self) -> KgImportWorkflow: concurrency = self._configuration_provider.get_value("CONCURRENCY", 2) + logger.info( + "URL fetch concurrency: %d", concurrency + ) url_source = await self.create_new_or_changed_source() return KgImportWorkflow( diff --git a/wordlift_sdk/kg_build/protocol.py b/wordlift_sdk/kg_build/protocol.py index c8edeb2..6fc9607 100644 --- a/wordlift_sdk/kg_build/protocol.py +++ b/wordlift_sdk/kg_build/protocol.py @@ -160,6 +160,11 @@ def __init__( settings = dict(self.profile.settings) _pool_size = int(_setting(settings, "concurrency", "CONCURRENCY", 4)) + logger.info( + "Concurrency for profile '%s': %d", + self.profile.name, + _pool_size + ) self._init_postprocessor_service(settings, context, _pool_size) self._init_mapping_service(settings, context, _pool_size) self._init_shacl_validator(settings, _pool_size) From 4cae7cae9df836d603c2cf9c03d8dc88d3aedf40 Mon Sep 17 00:00:00 2001 From: Rubens Panfili Date: Thu, 26 Mar 2026 09:18:20 +0100 Subject: [PATCH 6/6] chore: bump to v8.0.3 --- CHANGELOG.md | 12 ++++++++++++ pyproject.toml | 2 +- 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7a83011..5a5469d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,17 @@ # Changelog +## 8.0.3 - 2026-03-26 + +### Fixed + +- Engine: recover from `BrokenProcessPool` by recreating the morph-kgc pool instead of crashing. +- Protocol: use the correct `mapping_pool_size` variable when sizing the `ThreadPoolExecutor`. +- Protocol: cap postprocessor pool size at 75 % of `pool_size` (minimum 2) to avoid starving the main pool. + +### Changed + +- Logging: url-fetch and postprocessor concurrency levels are now logged at startup. + ## 8.0.0 - 2026-03-20 ### Breaking diff --git a/pyproject.toml b/pyproject.toml index 4b19da5..880e261 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "wordlift-sdk" -version = "8.0.2" +version = "8.0.3" description = "Python toolkit for orchestrating WordLift imports and structured data workflows." authors = ["David Riccitelli "] readme = "README.md"