From 6090601d3fc3dd6c931d9349e282e43cf6b3ee9b Mon Sep 17 00:00:00 2001 From: "Saheed Alabi (Standard User)" Date: Mon, 1 Jun 2026 13:39:26 +0100 Subject: [PATCH 1/3] added paging func to wfs plugin --- digital_land/collect.py | 22 +++++ digital_land/plugins/wfs.py | 98 ++++++++++++++++++++++- tests/unit/plugins/test_wfs.py | 142 +++++++++++++++++++++++++++++++++ tests/unit/test_collect.py | 45 +++++++++++ 4 files changed, 306 insertions(+), 1 deletion(-) create mode 100644 tests/unit/plugins/test_wfs.py diff --git a/digital_land/collect.py b/digital_land/collect.py index 51f9c9b24..8c0cbcadc 100755 --- a/digital_land/collect.py +++ b/digital_land/collect.py @@ -9,6 +9,7 @@ import json import os import re +import shutil from datetime import datetime from enum import Enum from timeit import default_timer as timer @@ -19,6 +20,8 @@ from .adapter.file import FileAdapter from .plugins.sparql import get as sparql_get +from .plugins.wfs import WFSFileResource +from .plugins.wfs import WFSParameters from .plugins.wfs import get as wfs_get from .plugins.arcgis import ArcGISParameters from .plugins.arcgis import get as arcgis_get @@ -27,6 +30,7 @@ PLUGIN_PARAMETER_MODELS = { "arcgis": ("ArcGIS", ArcGISParameters), + "wfs": ("WFS", WFSParameters), } @@ -87,6 +91,20 @@ def save_log(self, path, log, refill_todays_logs=False): ) def save_content(self, content): + if isinstance(content, WFSFileResource): + hasher = hashlib.sha256() + with open(content.path, "rb") as f: + for chunk in iter(lambda: f.read(32 * 1024 * 1024), b""): + hasher.update(chunk) + + resource = hasher.hexdigest() + path = os.path.join(self.resource_dir, resource) + os.makedirs(os.path.dirname(path), exist_ok=True) + if not os.path.exists(path): + logging.info(path) + shutil.copyfile(content.path, path) + return resource + resource = hashlib.sha256(content).hexdigest() path = os.path.join(self.resource_dir, resource) self.save(path, content) @@ -202,6 +220,7 @@ def fetch( self, url, log, + parameters=parameters, ) elif plugin == "sparql": log, content = sparql_get(self, url, log) @@ -220,6 +239,8 @@ def save_resource(self, content, url, log): if content: try: log["resource"] = self.save_content(content) + if isinstance(content, WFSFileResource) and content.cleanup: + os.unlink(content.path) return FetchStatus.OK except Exception as exception: logging.warning(f"Failed to save data from '{url} ({exception})") @@ -298,3 +319,4 @@ def _validate_plugin_parameters(self, plugin, parameters): logging.warning( f"Invalid {plugin_name} parameters. Falling back to defaults. Errors: {exc.errors()}" ) + return None diff --git a/digital_land/plugins/wfs.py b/digital_land/plugins/wfs.py index f119eece9..dbea190a6 100644 --- a/digital_land/plugins/wfs.py +++ b/digital_land/plugins/wfs.py @@ -1,11 +1,20 @@ import io +import logging +import os import re +import subprocess +import tempfile +from typing import Optional from digital_land.phase.convert import detect_encoding +from pydantic import ConfigDict, Field +from pydantic.dataclasses import dataclass # TBD: split this code into a WFS API plugin and a canonicalisation step +DEFAULT_PAGE_SIZE = 1000 + strip_exps = [ (re.compile(rb' ?timeStamp="[^"]*"'), rb""), @@ -14,15 +23,102 @@ ] +@dataclass(config=ConfigDict(extra="forbid")) +class WFSParameters: + paging: bool = False + page_size: int = Field(default=DEFAULT_PAGE_SIZE, gt=0) + source_url: Optional[str] = None + layer_name: Optional[str] = None + + +@dataclass +class WFSFileResource: + path: str + cleanup: bool = True + + def strip_variable_content(content): for strip_exp, replacement in strip_exps: content = strip_exp.sub(replacement, content) return content -def get(collector, url, log={}, plugin="wfs"): +def get(collector, url, log={}, plugin="wfs", parameters=None): + if parameters is None: + parameters = WFSParameters() + elif not isinstance(parameters, WFSParameters): + logging.warning( + "WFS get expects parameters to be a WFSParameters instance. using default parameters" + ) + parameters = WFSParameters() + + if parameters.paging: + return get_paged_wfs(url, log, plugin=plugin, parameters=parameters) + log, content = collector.get(url=url, log=log, plugin=plugin) encoding = detect_encoding(io.BytesIO(content)) if encoding: content = strip_variable_content(content) return log, content + + +def get_paged_wfs(url, log, plugin="wfs", parameters=None): + parameters = parameters or WFSParameters() + output_file = tempfile.NamedTemporaryFile(suffix=".gpkg", delete=False) + output_path = output_file.name + output_file.close() + _remove_file(output_path) + + source = parameters.source_url or url + command = [ + "ogr2ogr", + "--config", + "OGR_WFS_PAGING_ALLOWED", + "ON", + "--config", + "OGR_WFS_PAGING_PAGE_SIZE", + str(parameters.page_size), + "-f", + "GPKG", + output_path, + f"WFS:{source}", + ] + if parameters.layer_name: + command.append(parameters.layer_name) + + logging.info("%s %s", plugin, url) + + try: + result = subprocess.run( + command, + capture_output=True, + check=False, + ) + except Exception as exception: + logging.warning(exception) + log["exception"] = type(exception).__name__ + _remove_file(output_path) + return log, None + + log["status"] = "200" if result.returncode == 0 else str(result.returncode) + + if result.returncode != 0: + stderr = result.stderr.decode("utf-8", errors="replace").strip() + logging.warning("ogr2ogr failed (%s): %s", result.returncode, stderr) + log["exception"] = "CalledProcessError" + _remove_file(output_path) + return log, None + + if not os.path.getsize(output_path): + log["exception"] = "EmptyWFSResponse" + _remove_file(output_path) + return log, None + + return log, WFSFileResource(output_path) + + +def _remove_file(path): + try: + os.unlink(path) + except OSError: + pass diff --git a/tests/unit/plugins/test_wfs.py b/tests/unit/plugins/test_wfs.py new file mode 100644 index 000000000..2c2130fa2 --- /dev/null +++ b/tests/unit/plugins/test_wfs.py @@ -0,0 +1,142 @@ +from subprocess import CompletedProcess + +from digital_land.plugins.wfs import WFSFileResource +from digital_land.plugins.wfs import WFSParameters +from digital_land.plugins.wfs import get as wfs_get + + +def test_get_falls_back_to_default_parameters_for_unvalidated_parameter_dict(caplog): + class FakeCollector: + def get(self, url, log, plugin): + log["status"] = "200" + return log, b"reference" + + with caplog.at_level("WARNING"): + log, content = wfs_get( + FakeCollector(), + "https://example.com/wfs", + parameters={"paging": True}, + ) + + assert log["status"] == "200" + assert content == b"reference" + assert ( + "WFS get expects parameters to be a WFSParameters instance. using default parameters" + in caplog.text + ) + + +def test_get_paged_wfs_runs_ogr2ogr_with_paging_config(tmp_path, mocker): + output_path = tmp_path / "output.gpkg" + captured = {} + + class FakeTempFile: + name = str(output_path) + + def __init__(self, *args, **kwargs): + pass + + def close(self): + pass + + def fake_run(command, capture_output, check): + captured["command"] = command + captured["capture_output"] = capture_output + captured["check"] = check + output_path.write_bytes(b"geopackage") + return CompletedProcess(command, 0, stdout=b"", stderr=b"") + + mocker.patch("digital_land.plugins.wfs.tempfile.NamedTemporaryFile", FakeTempFile) + mocker.patch("digital_land.plugins.wfs.subprocess.run", side_effect=fake_run) + + log, content = wfs_get( + None, + "https://example.com/wfs?request=GetFeature", + parameters=WFSParameters(paging=True, page_size=500), + ) + + assert isinstance(content, WFSFileResource) + assert content.path == str(output_path) + assert log["status"] == "200" + assert captured["command"] == [ + "ogr2ogr", + "--config", + "OGR_WFS_PAGING_ALLOWED", + "ON", + "--config", + "OGR_WFS_PAGING_PAGE_SIZE", + "500", + "-f", + "GPKG", + str(output_path), + "WFS:https://example.com/wfs?request=GetFeature", + ] + +def test_get_paged_wfs_logs_failure_and_cleans_temp_file(tmp_path, mocker): + output_path = tmp_path / "output.gpkg" + + class FakeTempFile: + name = str(output_path) + + def __init__(self, *args, **kwargs): + pass + + def close(self): + pass + + def fake_run(command, capture_output, check): + output_path.write_bytes(b"partial") + return CompletedProcess(command, 1, stdout=b"", stderr=b"failed") + + mocker.patch("digital_land.plugins.wfs.tempfile.NamedTemporaryFile", FakeTempFile) + mocker.patch("digital_land.plugins.wfs.subprocess.run", side_effect=fake_run) + + log, content = wfs_get( + None, + "https://example.com/wfs", + parameters=WFSParameters(paging=True), + ) + + assert content is None + assert log["status"] == "1" + assert log["exception"] == "CalledProcessError" + assert not output_path.exists() + + +def test_get_paged_wfs_can_use_source_url_override_and_layer_name(tmp_path, mocker): + output_path = tmp_path / "output.gpkg" + captured = {} + + class FakeTempFile: + name = str(output_path) + + def __init__(self, *args, **kwargs): + pass + + def close(self): + pass + + def fake_run(command, capture_output, check): + captured["command"] = command + output_path.write_bytes(b"geopackage") + return CompletedProcess(command, 0, stdout=b"", stderr=b"") + + mocker.patch("digital_land.plugins.wfs.tempfile.NamedTemporaryFile", FakeTempFile) + mocker.patch("digital_land.plugins.wfs.subprocess.run", side_effect=fake_run) + + log, content = wfs_get( + None, + "https://example.com/wfs?request=GetFeature&typeName=ignored", + parameters=WFSParameters( + paging=True, + source_url="https://example.com/wfs", + layer_name="dataset:Flood_Zones", + ), + ) + + assert isinstance(content, WFSFileResource) + assert log["status"] == "200" + assert captured["command"][-2:] == [ + "WFS:https://example.com/wfs", + "dataset:Flood_Zones", + ] diff --git a/tests/unit/test_collect.py b/tests/unit/test_collect.py index b965e16a6..0fd7a6444 100644 --- a/tests/unit/test_collect.py +++ b/tests/unit/test_collect.py @@ -12,6 +12,8 @@ from digital_land.collect import Collector, FetchStatus from digital_land.plugins.arcgis import ArcGISParameters +from digital_land.plugins.wfs import WFSFileResource +from digital_land.plugins.wfs import WFSParameters @pytest.fixture @@ -245,6 +247,49 @@ def fake_arcgis_get(collector_obj, url, log, parameters=None, plugin="arcgis"): assert captured["parameters"] == ArcGISParameters(max_page_size=20) +def test_fetch_passes_parameters_to_wfs_plugin(collector, mocker): + captured = {} + + def fake_wfs_get(collector_obj, url, log, parameters=None, plugin="wfs"): + captured["collector"] = collector_obj + captured["url"] = url + captured["parameters"] = parameters + return log, b"wfs data" + + mocker.patch("digital_land.collect.wfs_get", side_effect=fake_wfs_get) + + url = "http://some.wfs.url" + status, log = collector.fetch( + url, + endpoint=sha_digest(url), + plugin="wfs", + parameters={"paging": True, "page_size": 500}, + refill_todays_logs=True, + ) + + assert status == FetchStatus.OK + assert captured["url"] == url + assert captured["parameters"] == WFSParameters(paging=True, page_size=500) + + +def test_save_resource_saves_file_resource_without_loading_content(collector, tmp_path): + source_path = tmp_path / "source.gpkg" + source_path.write_bytes(b"geopackage") + + log = {} + status = collector.save_resource( + WFSFileResource(str(source_path)), + "http://some.wfs.url", + log, + ) + + resource_path = pathlib.Path(collector.resource_dir) / log["resource"] + + assert status == FetchStatus.OK + assert resource_path.read_bytes() == b"geopackage" + assert not source_path.exists() + + def test_collect_reads_parameters_from_csv(tmp_path, mocker): collector = Collector( resource_dir=str(tmp_path / "resource"), From a4ad54885bdb7c7a94433da31aa0410829d7ad99 Mon Sep 17 00:00:00 2001 From: "Saheed Alabi (Standard User)" Date: Mon, 1 Jun 2026 21:04:51 +0100 Subject: [PATCH 2/3] added paging func to wfs plugin --- digital_land/plugins/wfs.py | 21 ++++++++---- tests/unit/plugins/test_wfs.py | 62 +++++++++++++++++++--------------- 2 files changed, 49 insertions(+), 34 deletions(-) diff --git a/digital_land/plugins/wfs.py b/digital_land/plugins/wfs.py index dbea190a6..e2890fd26 100644 --- a/digital_land/plugins/wfs.py +++ b/digital_land/plugins/wfs.py @@ -4,7 +4,7 @@ import re import subprocess import tempfile -from typing import Optional +from urllib.parse import parse_qs, urlsplit, urlunsplit from digital_land.phase.convert import detect_encoding from pydantic import ConfigDict, Field @@ -27,8 +27,6 @@ class WFSParameters: paging: bool = False page_size: int = Field(default=DEFAULT_PAGE_SIZE, gt=0) - source_url: Optional[str] = None - layer_name: Optional[str] = None @dataclass @@ -69,7 +67,8 @@ def get_paged_wfs(url, log, plugin="wfs", parameters=None): output_file.close() _remove_file(output_path) - source = parameters.source_url or url + source_url, layer_name = wfs_source_and_layer(url) + command = [ "ogr2ogr", "--config", @@ -81,10 +80,9 @@ def get_paged_wfs(url, log, plugin="wfs", parameters=None): "-f", "GPKG", output_path, - f"WFS:{source}", + f"WFS:{source_url}", ] - if parameters.layer_name: - command.append(parameters.layer_name) + command.append(layer_name) logging.info("%s %s", plugin, url) @@ -117,6 +115,15 @@ def get_paged_wfs(url, log, plugin="wfs", parameters=None): return log, WFSFileResource(output_path) +def wfs_source_and_layer(url): + parsed = urlsplit(url) + query = {key.lower(): value for key, value in parse_qs(parsed.query).items()} + values = query.get("typename") or query.get("typenames") + if values: + return urlunsplit(parsed._replace(query="")), values[0] + return url, "" + + def _remove_file(path): try: os.unlink(path) diff --git a/tests/unit/plugins/test_wfs.py b/tests/unit/plugins/test_wfs.py index 2c2130fa2..588bf8071 100644 --- a/tests/unit/plugins/test_wfs.py +++ b/tests/unit/plugins/test_wfs.py @@ -3,6 +3,7 @@ from digital_land.plugins.wfs import WFSFileResource from digital_land.plugins.wfs import WFSParameters from digital_land.plugins.wfs import get as wfs_get +from digital_land.plugins.wfs import wfs_source_and_layer def test_get_falls_back_to_default_parameters_for_unvalidated_parameter_dict(caplog): @@ -51,7 +52,7 @@ def fake_run(command, capture_output, check): log, content = wfs_get( None, - "https://example.com/wfs?request=GetFeature", + "https://example.com/wfs?request=GetFeature&typeName=dataset:Flood_Zones", parameters=WFSParameters(paging=True, page_size=500), ) @@ -69,11 +70,14 @@ def fake_run(command, capture_output, check): "-f", "GPKG", str(output_path), - "WFS:https://example.com/wfs?request=GetFeature", + "WFS:https://example.com/wfs", + "dataset:Flood_Zones", ] -def test_get_paged_wfs_logs_failure_and_cleans_temp_file(tmp_path, mocker): + +def test_get_paged_wfs_derives_layer_name_from_endpoint_url(tmp_path, mocker): output_path = tmp_path / "output.gpkg" + captured = {} class FakeTempFile: name = str(output_path) @@ -85,27 +89,38 @@ def close(self): pass def fake_run(command, capture_output, check): - output_path.write_bytes(b"partial") - return CompletedProcess(command, 1, stdout=b"", stderr=b"failed") + captured["command"] = command + output_path.write_bytes(b"geopackage") + return CompletedProcess(command, 0, stdout=b"", stderr=b"") mocker.patch("digital_land.plugins.wfs.tempfile.NamedTemporaryFile", FakeTempFile) mocker.patch("digital_land.plugins.wfs.subprocess.run", side_effect=fake_run) log, content = wfs_get( None, - "https://example.com/wfs", + "https://example.com/wfs?request=GetFeature&typeName=dataset:Flood_Zones&outputFormat=Geopackage", parameters=WFSParameters(paging=True), ) - assert content is None - assert log["status"] == "1" - assert log["exception"] == "CalledProcessError" - assert not output_path.exists() + assert isinstance(content, WFSFileResource) + assert log["status"] == "200" + assert captured["command"][-2:] == [ + "WFS:https://example.com/wfs", + "dataset:Flood_Zones", + ] + + +def test_wfs_source_and_layer_extracts_case_insensitive_typename(): + source_url, layer_name = wfs_source_and_layer( + "https://example.com/wfs?request=GetFeature&TYPENAME=dataset:Flood_Zones" + ) + + assert source_url == "https://example.com/wfs" + assert layer_name == "dataset:Flood_Zones" -def test_get_paged_wfs_can_use_source_url_override_and_layer_name(tmp_path, mocker): +def test_get_paged_wfs_logs_failure_and_cleans_temp_file(tmp_path, mocker): output_path = tmp_path / "output.gpkg" - captured = {} class FakeTempFile: name = str(output_path) @@ -117,26 +132,19 @@ def close(self): pass def fake_run(command, capture_output, check): - captured["command"] = command - output_path.write_bytes(b"geopackage") - return CompletedProcess(command, 0, stdout=b"", stderr=b"") + output_path.write_bytes(b"partial") + return CompletedProcess(command, 1, stdout=b"", stderr=b"failed") mocker.patch("digital_land.plugins.wfs.tempfile.NamedTemporaryFile", FakeTempFile) mocker.patch("digital_land.plugins.wfs.subprocess.run", side_effect=fake_run) log, content = wfs_get( None, - "https://example.com/wfs?request=GetFeature&typeName=ignored", - parameters=WFSParameters( - paging=True, - source_url="https://example.com/wfs", - layer_name="dataset:Flood_Zones", - ), + "https://example.com/wfs", + parameters=WFSParameters(paging=True), ) - assert isinstance(content, WFSFileResource) - assert log["status"] == "200" - assert captured["command"][-2:] == [ - "WFS:https://example.com/wfs", - "dataset:Flood_Zones", - ] + assert content is None + assert log["status"] == "1" + assert log["exception"] == "CalledProcessError" + assert not output_path.exists() From f53a51ade7f6d93c96e041f01b1e4577dd7655aa Mon Sep 17 00:00:00 2001 From: "Saheed Alabi (Standard User)" Date: Tue, 2 Jun 2026 08:13:49 +0100 Subject: [PATCH 3/3] added paging func to wfs plugin --- digital_land/plugins/wfs.py | 9 +++++---- tests/unit/plugins/test_wfs.py | 9 +++++---- tests/unit/test_collect.py | 4 ++-- 3 files changed, 12 insertions(+), 10 deletions(-) diff --git a/digital_land/plugins/wfs.py b/digital_land/plugins/wfs.py index e2890fd26..48fbfa2b9 100644 --- a/digital_land/plugins/wfs.py +++ b/digital_land/plugins/wfs.py @@ -4,6 +4,7 @@ import re import subprocess import tempfile +from typing import Optional from urllib.parse import parse_qs, urlsplit, urlunsplit from digital_land.phase.convert import detect_encoding @@ -25,8 +26,7 @@ @dataclass(config=ConfigDict(extra="forbid")) class WFSParameters: - paging: bool = False - page_size: int = Field(default=DEFAULT_PAGE_SIZE, gt=0) + page_size: Optional[int] = Field(default=None, gt=0) @dataclass @@ -50,7 +50,7 @@ def get(collector, url, log={}, plugin="wfs", parameters=None): ) parameters = WFSParameters() - if parameters.paging: + if parameters.page_size: return get_paged_wfs(url, log, plugin=plugin, parameters=parameters) log, content = collector.get(url=url, log=log, plugin=plugin) @@ -62,6 +62,7 @@ def get(collector, url, log={}, plugin="wfs", parameters=None): def get_paged_wfs(url, log, plugin="wfs", parameters=None): parameters = parameters or WFSParameters() + page_size = parameters.page_size or DEFAULT_PAGE_SIZE output_file = tempfile.NamedTemporaryFile(suffix=".gpkg", delete=False) output_path = output_file.name output_file.close() @@ -76,7 +77,7 @@ def get_paged_wfs(url, log, plugin="wfs", parameters=None): "ON", "--config", "OGR_WFS_PAGING_PAGE_SIZE", - str(parameters.page_size), + str(page_size), "-f", "GPKG", output_path, diff --git a/tests/unit/plugins/test_wfs.py b/tests/unit/plugins/test_wfs.py index 588bf8071..5bafc2d8b 100644 --- a/tests/unit/plugins/test_wfs.py +++ b/tests/unit/plugins/test_wfs.py @@ -16,7 +16,7 @@ def get(self, url, log, plugin): log, content = wfs_get( FakeCollector(), "https://example.com/wfs", - parameters={"paging": True}, + parameters={"page_size": 1000}, ) assert log["status"] == "200" @@ -53,7 +53,7 @@ def fake_run(command, capture_output, check): log, content = wfs_get( None, "https://example.com/wfs?request=GetFeature&typeName=dataset:Flood_Zones", - parameters=WFSParameters(paging=True, page_size=500), + parameters=WFSParameters(page_size=500), ) assert isinstance(content, WFSFileResource) @@ -99,7 +99,7 @@ def fake_run(command, capture_output, check): log, content = wfs_get( None, "https://example.com/wfs?request=GetFeature&typeName=dataset:Flood_Zones&outputFormat=Geopackage", - parameters=WFSParameters(paging=True), + parameters=WFSParameters(page_size=1000), ) assert isinstance(content, WFSFileResource) @@ -108,6 +108,7 @@ def fake_run(command, capture_output, check): "WFS:https://example.com/wfs", "dataset:Flood_Zones", ] + assert "1000" in captured["command"] def test_wfs_source_and_layer_extracts_case_insensitive_typename(): @@ -141,7 +142,7 @@ def fake_run(command, capture_output, check): log, content = wfs_get( None, "https://example.com/wfs", - parameters=WFSParameters(paging=True), + parameters=WFSParameters(page_size=1000), ) assert content is None diff --git a/tests/unit/test_collect.py b/tests/unit/test_collect.py index 0fd7a6444..92f6b99cd 100644 --- a/tests/unit/test_collect.py +++ b/tests/unit/test_collect.py @@ -263,13 +263,13 @@ def fake_wfs_get(collector_obj, url, log, parameters=None, plugin="wfs"): url, endpoint=sha_digest(url), plugin="wfs", - parameters={"paging": True, "page_size": 500}, + parameters={"page_size": 500}, refill_todays_logs=True, ) assert status == FetchStatus.OK assert captured["url"] == url - assert captured["parameters"] == WFSParameters(paging=True, page_size=500) + assert captured["parameters"] == WFSParameters(page_size=500) def test_save_resource_saves_file_resource_without_loading_content(collector, tmp_path):