Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,17 @@
# Changelog

## 8.0.3 - 2026-03-26

### Fixed

- Engine: recover from `BrokenProcessPool` by recreating the morph-kgc pool instead of crashing.
- Protocol: use the correct `mapping_pool_size` variable when sizing the `ThreadPoolExecutor`.
- Protocol: cap postprocessor pool size at 75% of `pool_size` (minimum 2) to avoid starving the main pool.

### Changed

- Logging: URL-fetch and postprocessor concurrency levels are now logged at startup.

## 8.0.0 - 2026-03-20

### Breaking
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "wordlift-sdk"
version = "8.0.2"
version = "8.0.3"
description = "Python toolkit for orchestrating WordLift imports and structured data workflows."
authors = ["David Riccitelli <david@wordlift.io>"]
readme = "README.md"
Expand Down
152 changes: 152 additions & 0 deletions tests/test_structured_data_materialization_generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -854,3 +854,155 @@ class _Response:
payload = str(jsonld)
assert "__ID__" not in payload
assert runtime_id in payload


# ---------------------------------------------------------------------------
# morph-kgc process pool: BrokenProcessPool recovery and pool sizing
# ---------------------------------------------------------------------------


def _make_ntriples_pool(ntriples: str):
"""Return a fake pool whose submit() returns (ntriples, 0)."""

class _Future:
def result(self):
return ntriples, 0

class _Pool:
def submit(self, fn, *args, **kwargs):
return _Future()

return _Pool()


def _make_broken_pool():
"""Return a fake pool whose submit() raises BrokenProcessPool."""
from concurrent.futures.process import BrokenProcessPool

class _Future:
def result(self):
raise BrokenProcessPool("simulated crash")

class _Pool:
def submit(self, fn, *args, **kwargs):
return _Future()

return _Pool()


def test_broken_process_pool_retries_and_recovers(
    tmp_path: Path,
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """First call raises BrokenProcessPool; second succeeds with a fresh pool."""
    from rdflib import Graph

    import wordlift_sdk.structured_data.engine as _engine
    from wordlift_sdk.structured_data.engine import materialize_yarrrml_jsonld

    page = tmp_path / "page.xhtml"
    page.write_text("<html><head></head><body></body></html>")

    empty_ntriples = Graph().serialize(format="nt")

    # One broken pool followed by one healthy pool: the engine's single
    # retry must obtain the second pool and complete successfully.
    pool_sequence = iter(
        [_make_broken_pool(), _make_ntriples_pool(empty_ntriples)]
    )

    monkeypatch.setattr(_engine, "_morph_kgc_pool", None)
    monkeypatch.setattr(_engine, "_morph_kgc_pool_max_workers", 0)
    monkeypatch.setattr(_engine, "_get_morph_kgc_pool", lambda: next(pool_sequence))

    mapping = """
prefixes:
  schema: 'https://schema.org/'
mappings:
  page:
    sources:
      - [__XHTML__~xpath, '/']
    s: https://example.com/page~iri
    po:
      - [a, 'schema:WebPage']
"""
    jsonld = materialize_yarrrml_jsonld(
        mapping,
        xhtml_path=page,
        workdir=tmp_path / "work",
    )
    assert isinstance(jsonld, (dict, list))


def test_broken_process_pool_twice_raises(
    tmp_path: Path,
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Two consecutive BrokenProcessPool errors must raise RuntimeError."""
    import wordlift_sdk.structured_data.engine as _engine
    from wordlift_sdk.structured_data.engine import materialize_yarrrml_jsonld

    page = tmp_path / "page.xhtml"
    page.write_text("<html><head></head><body></body></html>")

    # Every pool the engine obtains is broken, so the single retry
    # must also fail and the engine must give up with RuntimeError.
    pool_sequence = iter([_make_broken_pool(), _make_broken_pool()])

    monkeypatch.setattr(_engine, "_morph_kgc_pool", None)
    monkeypatch.setattr(_engine, "_morph_kgc_pool_max_workers", 0)
    monkeypatch.setattr(_engine, "_get_morph_kgc_pool", lambda: next(pool_sequence))

    mapping = """
prefixes:
  schema: 'https://schema.org/'
mappings:
  page:
    sources:
      - [__XHTML__~xpath, '/']
    s: https://example.com/page~iri
"""
    with pytest.raises(RuntimeError, match="broke twice"):
        materialize_yarrrml_jsonld(
            mapping,
            xhtml_path=page,
            workdir=tmp_path / "work",
        )


def test_init_morph_kgc_pool_is_noop_when_pool_exists(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """init_morph_kgc_pool must not replace an already-existing pool."""
    import wordlift_sdk.structured_data.engine as _engine

    existing_pool = object()
    monkeypatch.setattr(_engine, "_morph_kgc_pool", existing_pool)
    monkeypatch.setattr(_engine, "_morph_kgc_pool_max_workers", 2)

    # Re-initialising with a different size must leave the live pool alone.
    _engine.init_morph_kgc_pool(8)

    assert _engine._morph_kgc_pool is existing_pool


def test_get_morph_kgc_pool_reuses_max_workers_after_reset(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """After a pool reset, _get_morph_kgc_pool recreates with the stored worker count."""
    import wordlift_sdk.structured_data.engine as _engine

    observed_worker_counts: list[int] = []

    class _RecordingPool:
        # Captures the max_workers value the engine requests.
        def __init__(self, max_workers: int, **_kwargs):
            observed_worker_counts.append(max_workers)

    monkeypatch.setattr(_engine, "_morph_kgc_pool", None)
    monkeypatch.setattr(_engine, "_morph_kgc_pool_max_workers", 6)
    monkeypatch.setattr(
        _engine,
        "ProcessPoolExecutor",
        lambda max_workers, **kw: _RecordingPool(max_workers, **kw),
    )

    recreated = _engine._get_morph_kgc_pool()
    assert isinstance(recreated, _RecordingPool)
    assert observed_worker_counts == [6]
6 changes: 6 additions & 0 deletions wordlift_sdk/container/application_container.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import logging
from os import cpu_count
from wordlift_client import Configuration, AccountInfo, WebPageImportFetchOptions

Expand All @@ -22,6 +23,8 @@
from ..workflow.url_handler.search_console_url_handler import SearchConsoleUrlHandler
from ..workflow.url_handler.url_handler import UrlHandler

logger = logging.getLogger(__name__)


class ApplicationContainer:
_api_url: str
Expand Down Expand Up @@ -130,6 +133,9 @@ async def create_kg_import_workflow(self) -> KgImportWorkflow:
concurrency = self._configuration_provider.get_value(
"CONCURRENCY", min(cpu_count(), 4)
)
logger.info(
"URL fetch concurrency: %d", concurrency
)
return KgImportWorkflow(
context=await self.get_context(),
url_source=await self.create_new_or_changed_source(),
Expand Down
3 changes: 3 additions & 0 deletions wordlift_sdk/kg_build/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ async def create_web_page_scrape_url_handler(

async def create_kg_import_workflow(self) -> KgImportWorkflow:
concurrency = self._configuration_provider.get_value("CONCURRENCY", 2)
logger.info(
"URL fetch concurrency: %d", concurrency
)
url_source = await self.create_new_or_changed_source()

return KgImportWorkflow(
Expand Down
9 changes: 7 additions & 2 deletions wordlift_sdk/kg_build/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,11 @@ def __init__(

settings = dict(self.profile.settings)
_pool_size = int(_setting(settings, "concurrency", "CONCURRENCY", 4))
logger.info(
"Concurrency for profile '%s': %d",
self.profile.name,
_pool_size
)
self._init_postprocessor_service(settings, context, _pool_size)
self._init_mapping_service(settings, context, _pool_size)
self._init_shacl_validator(settings, _pool_size)
Expand Down Expand Up @@ -203,7 +208,7 @@ def _init_postprocessor_service(
settings,
"postprocessor_pool_size",
"POSTPROCESSOR_POOL_SIZE",
pool_size,
max(2, int(pool_size * 3 / 4)),
)
)
logger.info(
Expand Down Expand Up @@ -277,7 +282,7 @@ def _init_mapping_service(
# the asyncio event loop. The thread itself blocks on the morph_kgc
# ProcessPoolExecutor slot, leaving the event loop free for I/O.
self._mapping_executor = ThreadPoolExecutor(
max_workers=pool_size, thread_name_prefix="worai_ml"
max_workers=mapping_pool_size, thread_name_prefix="worai_ml"
)

def _init_shacl_validator(self, settings: dict, pool_size: int) -> None:
Expand Down
89 changes: 55 additions & 34 deletions wordlift_sdk/structured_data/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import os
import re
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures.process import BrokenProcessPool
from dataclasses import dataclass
from importlib import resources
from pathlib import Path
Expand Down Expand Up @@ -54,34 +55,39 @@ def _morph_kgc_worker(config: str, submit_time: float) -> tuple[str, int]:
# layer receive the timing as a regular return value.
_morph_kgc_tls = threading.local()

logger = logging.getLogger(__name__)

# Lazy process pool — created on first use in the main process only.
# Worker subprocesses import this module but never call _get_morph_kgc_pool(),
# so they do NOT create their own pools (no recursive process explosion).
_morph_kgc_pool: ProcessPoolExecutor | None = None
_morph_kgc_pool_max_workers: int = 0
_morph_kgc_pool_lock = threading.Lock()


def init_morph_kgc_pool(max_workers: int) -> None:
    """Pre-create the morph_kgc process pool with a specific worker count.

    Call once from the protocol __init__ before any mapping work starts.
    The requested worker count is always recorded so that a later
    recreation (after a BrokenProcessPool reset) reuses the same size,
    but an already-running pool is never replaced: subsequent calls are
    no-ops for the pool itself.

    NOTE(review): this span was a diff render with old and new lines
    interleaved (duplicate docstring sentence, two bodies); this is the
    reconstructed post-change implementation.
    """
    global _morph_kgc_pool, _morph_kgc_pool_max_workers
    with _morph_kgc_pool_lock:
        # Record the sizing even when the pool already exists, so that
        # _get_morph_kgc_pool can recreate it at the same size after a reset.
        _morph_kgc_pool_max_workers = max_workers
        if _morph_kgc_pool is not None:
            return
        ctx = multiprocessing.get_context("spawn")
        _morph_kgc_pool = ProcessPoolExecutor(max_workers=max_workers, mp_context=ctx)


def _get_morph_kgc_pool() -> ProcessPoolExecutor:
    """Return the shared morph-kgc process pool, creating it lazily.

    Holds the module lock while checking/creating so two threads cannot
    race to build the pool (e.g. right after a BrokenProcessPool reset).
    When no pool exists, the stored worker count from init_morph_kgc_pool
    is reused; if that was never set, fall back to the CPU count (or 4).

    NOTE(review): this span was a diff render with old and new bodies
    interleaved; this is the reconstructed post-change implementation.
    """
    global _morph_kgc_pool, _morph_kgc_pool_max_workers
    with _morph_kgc_pool_lock:
        if _morph_kgc_pool is None:
            # Fallback if init_morph_kgc_pool was never called, or after a reset.
            workers = _morph_kgc_pool_max_workers or (os.cpu_count() or 4)
            ctx = multiprocessing.get_context("spawn")
            _morph_kgc_pool = ProcessPoolExecutor(max_workers=workers, mp_context=ctx)
    return _morph_kgc_pool


_SCHEMA_BASE = "https://schema.org"
Expand Down Expand Up @@ -1422,25 +1428,40 @@ def _materialize_graph(mapping_path: Path) -> Graph:
"[DataSource1]\n"
f"mappings = {mapping_path}\n"
)
try:
# Submit to subprocess pool — each worker has isolated pyparsing state,
# so calls are genuinely parallel across CPU cores with no lock needed.
# .result() blocks the calling thread (not the asyncio event loop).
ntriples, queue_wait_ms = (
_get_morph_kgc_pool()
.submit(_morph_kgc_worker, config, _time.time())
.result()
)
# Store wait time in thread-local so protocol.py can read it without
# changing the return type of this function.
_morph_kgc_tls.mapping_wait_ms = queue_wait_ms
graph = Graph()
graph.parse(data=ntriples, format="nt")
return graph
except RuntimeError:
raise
except Exception as exc:
raise _normalize_materialization_error(exc) from exc
for attempt in range(2):
pool = _get_morph_kgc_pool()
try:
# Submit to subprocess pool — each worker has isolated pyparsing state,
# so calls are genuinely parallel across CPU cores with no lock needed.
# .result() blocks the calling thread (not the asyncio event loop).
ntriples, queue_wait_ms = pool.submit(
_morph_kgc_worker, config, _time.time()
).result()
# Store wait time in thread-local so protocol.py can read it without
# changing the return type of this function.
_morph_kgc_tls.mapping_wait_ms = queue_wait_ms
graph = Graph()
graph.parse(data=ntriples, format="nt")
return graph
except BrokenProcessPool:
logger.warning(
"morph-kgc process pool broken (attempt %d/2), recreating.",
attempt + 1,
)
global _morph_kgc_pool
with _morph_kgc_pool_lock:
# Only reset if this is still the same broken pool instance,
# preventing a race where multiple threads all try to reset it.
if _morph_kgc_pool is pool:
_morph_kgc_pool = None
if attempt >= 1:
raise _normalize_materialization_error(
RuntimeError("morph-kgc process pool broke twice in a row")
) from None
except RuntimeError:
raise
except Exception as exc:
raise _normalize_materialization_error(exc) from exc


def materialize_yarrrml(
Expand Down
Loading