diff --git a/digital_land/collect.py b/digital_land/collect.py index 51f9c9b2..8c0cbcad 100755 --- a/digital_land/collect.py +++ b/digital_land/collect.py @@ -9,6 +9,7 @@ import json import os import re +import shutil from datetime import datetime from enum import Enum from timeit import default_timer as timer @@ -19,6 +20,8 @@ from .adapter.file import FileAdapter from .plugins.sparql import get as sparql_get +from .plugins.wfs import WFSFileResource +from .plugins.wfs import WFSParameters from .plugins.wfs import get as wfs_get from .plugins.arcgis import ArcGISParameters from .plugins.arcgis import get as arcgis_get @@ -27,6 +30,7 @@ PLUGIN_PARAMETER_MODELS = { "arcgis": ("ArcGIS", ArcGISParameters), + "wfs": ("WFS", WFSParameters), } @@ -87,6 +91,20 @@ def save_log(self, path, log, refill_todays_logs=False): ) def save_content(self, content): + if isinstance(content, WFSFileResource): + hasher = hashlib.sha256() + with open(content.path, "rb") as f: + for chunk in iter(lambda: f.read(32 * 1024 * 1024), b""): + hasher.update(chunk) + + resource = hasher.hexdigest() + path = os.path.join(self.resource_dir, resource) + os.makedirs(os.path.dirname(path), exist_ok=True) + if not os.path.exists(path): + logging.info(path) + shutil.copyfile(content.path, path) + return resource + resource = hashlib.sha256(content).hexdigest() path = os.path.join(self.resource_dir, resource) self.save(path, content) @@ -202,6 +220,7 @@ def fetch( self, url, log, + parameters=parameters, ) elif plugin == "sparql": log, content = sparql_get(self, url, log) @@ -220,6 +239,8 @@ def save_resource(self, content, url, log): if content: try: log["resource"] = self.save_content(content) + if isinstance(content, WFSFileResource) and content.cleanup: + os.unlink(content.path) return FetchStatus.OK except Exception as exception: logging.warning(f"Failed to save data from '{url} ({exception})") @@ -298,3 +319,4 @@ def _validate_plugin_parameters(self, plugin, parameters): logging.warning( f"Invalid {plugin_name} parameters. Falling back to defaults. Errors: {exc.errors()}" ) + return None diff --git a/digital_land/plugins/wfs.py b/digital_land/plugins/wfs.py index f119eece..48fbfa2b 100644 --- a/digital_land/plugins/wfs.py +++ b/digital_land/plugins/wfs.py @@ -1,11 +1,21 @@ import io +import logging +import os import re +import subprocess +import tempfile +from typing import Optional +from urllib.parse import parse_qs, urlsplit, urlunsplit from digital_land.phase.convert import detect_encoding +from pydantic import ConfigDict, Field +from pydantic.dataclasses import dataclass # TBD: split this code into a WFS API plugin and a canonicalisation step +DEFAULT_PAGE_SIZE = 1000 + strip_exps = [ (re.compile(rb' ?timeStamp="[^"]*"'), rb""), @@ -14,15 +24,109 @@ ] +@dataclass(config=ConfigDict(extra="forbid")) +class WFSParameters: + page_size: Optional[int] = Field(default=None, gt=0) + + +@dataclass +class WFSFileResource: + path: str + cleanup: bool = True + + def strip_variable_content(content): for strip_exp, replacement in strip_exps: content = strip_exp.sub(replacement, content) return content -def get(collector, url, log={}, plugin="wfs"): +def get(collector, url, log={}, plugin="wfs", parameters=None): + if parameters is None: + parameters = WFSParameters() + elif not isinstance(parameters, WFSParameters): + logging.warning( + "WFS get expects parameters to be a WFSParameters instance. using default parameters" + ) + parameters = WFSParameters() + + if parameters.page_size: + return get_paged_wfs(url, log, plugin=plugin, parameters=parameters) + log, content = collector.get(url=url, log=log, plugin=plugin) encoding = detect_encoding(io.BytesIO(content)) if encoding: content = strip_variable_content(content) return log, content + + +def get_paged_wfs(url, log, plugin="wfs", parameters=None): + parameters = parameters or WFSParameters() + page_size = parameters.page_size or DEFAULT_PAGE_SIZE + output_file = tempfile.NamedTemporaryFile(suffix=".gpkg", delete=False) + output_path = output_file.name + output_file.close() + _remove_file(output_path) + + source_url, layer_name = wfs_source_and_layer(url) + + command = [ + "ogr2ogr", + "--config", + "OGR_WFS_PAGING_ALLOWED", + "ON", + "--config", + "OGR_WFS_PAGING_PAGE_SIZE", + str(page_size), + "-f", + "GPKG", + output_path, + f"WFS:{source_url}", + ] + command.append(layer_name) + + logging.info("%s %s", plugin, url) + + try: + result = subprocess.run( + command, + capture_output=True, + check=False, + ) + except Exception as exception: + logging.warning(exception) + log["exception"] = type(exception).__name__ + _remove_file(output_path) + return log, None + + log["status"] = "200" if result.returncode == 0 else str(result.returncode) + + if result.returncode != 0: + stderr = result.stderr.decode("utf-8", errors="replace").strip() + logging.warning("ogr2ogr failed (%s): %s", result.returncode, stderr) + log["exception"] = "CalledProcessError" + _remove_file(output_path) + return log, None + + if not os.path.getsize(output_path): + log["exception"] = "EmptyWFSResponse" + _remove_file(output_path) + return log, None + + return log, WFSFileResource(output_path) + + +def wfs_source_and_layer(url): + parsed = urlsplit(url) + query = {key.lower(): value for key, value in parse_qs(parsed.query).items()} + values = query.get("typename") or query.get("typenames") + if values: + return urlunsplit(parsed._replace(query="")), values[0] + return url, "" + + +def _remove_file(path): + try: + os.unlink(path) + except OSError: + pass diff --git a/tests/unit/plugins/test_wfs.py b/tests/unit/plugins/test_wfs.py new file mode 100644 index 00000000..5bafc2d8 --- /dev/null +++ b/tests/unit/plugins/test_wfs.py @@ -0,0 +1,151 @@ +from subprocess import CompletedProcess + +from digital_land.plugins.wfs import WFSFileResource +from digital_land.plugins.wfs import WFSParameters +from digital_land.plugins.wfs import get as wfs_get +from digital_land.plugins.wfs import wfs_source_and_layer + + +def test_get_falls_back_to_default_parameters_for_unvalidated_parameter_dict(caplog): + class FakeCollector: + def get(self, url, log, plugin): + log["status"] = "200" + return log, b"reference" + + with caplog.at_level("WARNING"): + log, content = wfs_get( + FakeCollector(), + "https://example.com/wfs", + parameters={"page_size": 1000}, + ) + + assert log["status"] == "200" + assert content == b"reference" + assert ( + "WFS get expects parameters to be a WFSParameters instance. using default parameters" + in caplog.text + ) + + +def test_get_paged_wfs_runs_ogr2ogr_with_paging_config(tmp_path, mocker): + output_path = tmp_path / "output.gpkg" + captured = {} + + class FakeTempFile: + name = str(output_path) + + def __init__(self, *args, **kwargs): + pass + + def close(self): + pass + + def fake_run(command, capture_output, check): + captured["command"] = command + captured["capture_output"] = capture_output + captured["check"] = check + output_path.write_bytes(b"geopackage") + return CompletedProcess(command, 0, stdout=b"", stderr=b"") + + mocker.patch("digital_land.plugins.wfs.tempfile.NamedTemporaryFile", FakeTempFile) + mocker.patch("digital_land.plugins.wfs.subprocess.run", side_effect=fake_run) + + log, content = wfs_get( + None, + "https://example.com/wfs?request=GetFeature&typeName=dataset:Flood_Zones", + parameters=WFSParameters(page_size=500), + ) + + assert isinstance(content, WFSFileResource) + assert content.path == str(output_path) + assert log["status"] == "200" + assert captured["command"] == [ + "ogr2ogr", + "--config", + "OGR_WFS_PAGING_ALLOWED", + "ON", + "--config", + "OGR_WFS_PAGING_PAGE_SIZE", + "500", + "-f", + "GPKG", + str(output_path), + "WFS:https://example.com/wfs", + "dataset:Flood_Zones", + ] + + +def test_get_paged_wfs_derives_layer_name_from_endpoint_url(tmp_path, mocker): + output_path = tmp_path / "output.gpkg" + captured = {} + + class FakeTempFile: + name = str(output_path) + + def __init__(self, *args, **kwargs): + pass + + def close(self): + pass + + def fake_run(command, capture_output, check): + captured["command"] = command + output_path.write_bytes(b"geopackage") + return CompletedProcess(command, 0, stdout=b"", stderr=b"") + + mocker.patch("digital_land.plugins.wfs.tempfile.NamedTemporaryFile", FakeTempFile) + mocker.patch("digital_land.plugins.wfs.subprocess.run", side_effect=fake_run) + + log, content = wfs_get( + None, + "https://example.com/wfs?request=GetFeature&typeName=dataset:Flood_Zones&outputFormat=Geopackage", + parameters=WFSParameters(page_size=1000), + ) + + assert isinstance(content, WFSFileResource) + assert log["status"] == "200" + assert captured["command"][-2:] == [ + "WFS:https://example.com/wfs", + "dataset:Flood_Zones", + ] + assert "1000" in captured["command"] + + +def test_wfs_source_and_layer_extracts_case_insensitive_typename(): + source_url, layer_name = wfs_source_and_layer( + "https://example.com/wfs?request=GetFeature&TYPENAME=dataset:Flood_Zones" + ) + + assert source_url == "https://example.com/wfs" + assert layer_name == "dataset:Flood_Zones" + + +def test_get_paged_wfs_logs_failure_and_cleans_temp_file(tmp_path, mocker): + output_path = tmp_path / "output.gpkg" + + class FakeTempFile: + name = str(output_path) + + def __init__(self, *args, **kwargs): + pass + + def close(self): + pass + + def fake_run(command, capture_output, check): + output_path.write_bytes(b"partial") + return CompletedProcess(command, 1, stdout=b"", stderr=b"failed") + + mocker.patch("digital_land.plugins.wfs.tempfile.NamedTemporaryFile", FakeTempFile) + mocker.patch("digital_land.plugins.wfs.subprocess.run", side_effect=fake_run) + + log, content = wfs_get( + None, + "https://example.com/wfs", + parameters=WFSParameters(page_size=1000), + ) + + assert content is None + assert log["status"] == "1" + assert log["exception"] == "CalledProcessError" + assert not output_path.exists() diff --git a/tests/unit/test_collect.py b/tests/unit/test_collect.py index b965e16a..92f6b99c 100644 --- a/tests/unit/test_collect.py +++ b/tests/unit/test_collect.py @@ -12,6 +12,8 @@ from digital_land.collect import Collector, FetchStatus from digital_land.plugins.arcgis import ArcGISParameters +from digital_land.plugins.wfs import WFSFileResource +from digital_land.plugins.wfs import WFSParameters @pytest.fixture @@ -245,6 +247,49 @@ def fake_arcgis_get(collector_obj, url, log, parameters=None, plugin="arcgis"): assert captured["parameters"] == ArcGISParameters(max_page_size=20) +def test_fetch_passes_parameters_to_wfs_plugin(collector, mocker): + captured = {} + + def fake_wfs_get(collector_obj, url, log, parameters=None, plugin="wfs"): + captured["collector"] = collector_obj + captured["url"] = url + captured["parameters"] = parameters + return log, b"wfs data" + + mocker.patch("digital_land.collect.wfs_get", side_effect=fake_wfs_get) + + url = "http://some.wfs.url" + status, log = collector.fetch( + url, + endpoint=sha_digest(url), + plugin="wfs", + parameters={"page_size": 500}, + refill_todays_logs=True, + ) + + assert status == FetchStatus.OK + assert captured["url"] == url + assert captured["parameters"] == WFSParameters(page_size=500) + + +def test_save_resource_saves_file_resource_without_loading_content(collector, tmp_path): + source_path = tmp_path / "source.gpkg" + source_path.write_bytes(b"geopackage") + + log = {} + status = collector.save_resource( + WFSFileResource(str(source_path)), + "http://some.wfs.url", + log, + ) + + resource_path = pathlib.Path(collector.resource_dir) / log["resource"] + + assert status == FetchStatus.OK + assert resource_path.read_bytes() == b"geopackage" + assert not source_path.exists() + + def test_collect_reads_parameters_from_csv(tmp_path, mocker): collector = Collector( resource_dir=str(tmp_path / "resource"),