Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions digital_land/collect.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import json
import os
import re
import shutil
from datetime import datetime
from enum import Enum
from timeit import default_timer as timer
Expand All @@ -19,6 +20,8 @@

from .adapter.file import FileAdapter
from .plugins.sparql import get as sparql_get
from .plugins.wfs import WFSFileResource
from .plugins.wfs import WFSParameters
from .plugins.wfs import get as wfs_get
from .plugins.arcgis import ArcGISParameters
from .plugins.arcgis import get as arcgis_get
Expand All @@ -27,6 +30,7 @@

# Maps a plugin key to a (display name, parameters model class) pair.
# NOTE(review): presumably read by _validate_plugin_parameters, whose warning
# message interpolates a plugin name — confirm against the full file.
PLUGIN_PARAMETER_MODELS = {
    "arcgis": ("ArcGIS", ArcGISParameters),
    "wfs": ("WFS", WFSParameters),
}


Expand Down Expand Up @@ -87,6 +91,20 @@ def save_log(self, path, log, refill_todays_logs=False):
)

def save_content(self, content):
if isinstance(content, WFSFileResource):
hasher = hashlib.sha256()
with open(content.path, "rb") as f:
for chunk in iter(lambda: f.read(32 * 1024 * 1024), b""):
hasher.update(chunk)

resource = hasher.hexdigest()
path = os.path.join(self.resource_dir, resource)
os.makedirs(os.path.dirname(path), exist_ok=True)
if not os.path.exists(path):
logging.info(path)
shutil.copyfile(content.path, path)
return resource

resource = hashlib.sha256(content).hexdigest()
path = os.path.join(self.resource_dir, resource)
self.save(path, content)
Expand Down Expand Up @@ -202,6 +220,7 @@ def fetch(
self,
url,
log,
parameters=parameters,
)
elif plugin == "sparql":
log, content = sparql_get(self, url, log)
Expand All @@ -220,6 +239,8 @@ def save_resource(self, content, url, log):
if content:
try:
log["resource"] = self.save_content(content)
if isinstance(content, WFSFileResource) and content.cleanup:
os.unlink(content.path)
return FetchStatus.OK
except Exception as exception:
logging.warning(f"Failed to save data from '{url} ({exception})")
Expand Down Expand Up @@ -298,3 +319,4 @@ def _validate_plugin_parameters(self, plugin, parameters):
logging.warning(
f"Invalid {plugin_name} parameters. Falling back to defaults. Errors: {exc.errors()}"
)
return None
106 changes: 105 additions & 1 deletion digital_land/plugins/wfs.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,21 @@
import io
import logging
import os
import re
import subprocess
import tempfile
from typing import Optional
from urllib.parse import parse_qs, urlsplit, urlunsplit

from digital_land.phase.convert import detect_encoding
from pydantic import ConfigDict, Field
from pydantic.dataclasses import dataclass


# TBD: split this code into a WFS API plugin and a canonicalisation step

# Page size get_paged_wfs falls back to when its parameters carry no page_size.
DEFAULT_PAGE_SIZE = 1000


strip_exps = [
(re.compile(rb' ?timeStamp="[^"]*"'), rb""),
Expand All @@ -14,15 +24,109 @@
]


@dataclass(config=ConfigDict(extra="forbid"))
class WFSParameters:
    """Optional settings accepted by the WFS plugin.

    Declared as a pydantic dataclass configured with ``extra="forbid"``.
    """

    # Requested page size; constrained to be greater than zero when given.
    # When left as None (or otherwise falsy), get() uses the plain,
    # non-paged fetch instead of get_paged_wfs().
    page_size: Optional[int] = Field(default=None, gt=0)


@dataclass
class WFSFileResource:
    """Fetched WFS content that lives in a file on disk instead of in memory.

    get_paged_wfs() returns one of these pointing at the GeoPackage written
    by ogr2ogr.
    """

    # Filesystem path of the fetched file.
    path: str
    # When true, the collector's save_resource unlinks ``path`` once the
    # content has been saved.
    cleanup: bool = True


def strip_variable_content(content):
    """Return ``content`` with every ``strip_exps`` substitution applied in order.

    Each entry of ``strip_exps`` is a ``(compiled pattern, replacement)`` pair;
    the output of one substitution is the input of the next.
    """
    cleaned = content
    for pattern, substitute in strip_exps:
        cleaned = pattern.sub(substitute, cleaned)
    return cleaned


def get(collector, url, log=None, plugin="wfs", parameters=None):
    """Fetch a WFS resource, paging through ogr2ogr when a page size is set.

    Args:
        collector: Object whose ``get(url=..., log=..., plugin=...)`` performs
            the plain fetch and returns ``(log, content)``.
        url: WFS request URL.
        log: Dict the fetch outcome is recorded in. A new dict is created for
            each call when omitted.
        plugin: Plugin label passed through to the collector / paged fetch.
        parameters: ``WFSParameters`` instance or None. Any other type is
            ignored with a warning and the defaults are used instead.

    Returns:
        ``(log, content)``. For the paged path ``content`` is whatever
        ``get_paged_wfs`` returns; otherwise it is the collector's content,
        passed through ``strip_variable_content`` when an encoding is detected.
    """
    # A mutable default ({}) would be shared between calls, and the paged path
    # writes "status"/"exception" into it, so stale keys would leak from one
    # call into the next. Build a fresh dict per call instead.
    if log is None:
        log = {}

    if parameters is None:
        parameters = WFSParameters()
    elif not isinstance(parameters, WFSParameters):
        logging.warning(
            "WFS get expects parameters to be a WFSParameters instance. using default parameters"
        )
        parameters = WFSParameters()

    if parameters.page_size:
        return get_paged_wfs(url, log, plugin=plugin, parameters=parameters)

    log, content = collector.get(url=url, log=log, plugin=plugin)
    encoding = detect_encoding(io.BytesIO(content))
    if encoding:
        # Only content with a detected encoding has its variable parts stripped.
        content = strip_variable_content(content)
    return log, content


def get_paged_wfs(url, log, plugin="wfs", parameters=None):
    """Download a WFS source into a temporary GeoPackage via ``ogr2ogr`` with paging on.

    Args:
        url: WFS request URL. It is split by ``wfs_source_and_layer`` into the
            source URL given to ogr2ogr and an optional layer name.
        log: Dict updated in place. ``"status"`` is set once ogr2ogr has run
            ("200" for exit code 0, otherwise the exit code as a string) and
            ``"exception"`` is set on every failure path.
        plugin: Plugin label, used only in the info log line.
        parameters: ``WFSParameters``; a default instance is used when falsy.

    Returns:
        ``(log, WFSFileResource)`` pointing at the GeoPackage on success, or
        ``(log, None)`` when ogr2ogr could not be started, exited non-zero, or
        left a missing/empty output file. The temporary file is removed on
        every failure path.
    """
    parameters = parameters or WFSParameters()
    page_size = parameters.page_size or DEFAULT_PAGE_SIZE

    # Reserve a unique *.gpkg path, then delete the empty placeholder
    # (presumably so ogr2ogr creates the GeoPackage itself instead of opening
    # a zero-byte file — confirm).
    output_file = tempfile.NamedTemporaryFile(suffix=".gpkg", delete=False)
    output_path = output_file.name
    output_file.close()
    _remove_file(output_path)

    source_url, layer_name = wfs_source_and_layer(url)

    # NOTE(review): GDAL's WFS driver documents the page-size option as
    # OGR_WFS_PAGE_SIZE; OGR_WFS_PAGING_PAGE_SIZE may be silently ignored.
    # Confirm against the GDAL docs and change it together with the unit test
    # that pins this exact command.
    command = [
        "ogr2ogr",
        "--config",
        "OGR_WFS_PAGING_ALLOWED",
        "ON",
        "--config",
        "OGR_WFS_PAGING_PAGE_SIZE",
        str(page_size),
        "-f",
        "GPKG",
        output_path,
        f"WFS:{source_url}",
    ]
    # wfs_source_and_layer returns "" when the URL names no layer. Passing ""
    # as a positional argument would ask ogr2ogr for a layer called "", so the
    # layer argument is only added when there is one.
    if layer_name:
        command.append(layer_name)

    logging.info("%s %s", plugin, url)

    try:
        result = subprocess.run(
            command,
            capture_output=True,
            check=False,
        )
    except Exception as exception:
        # Deliberately broad: any failure to run the tool (e.g. ogr2ogr not
        # installed) is recorded in the log rather than raised.
        logging.warning(exception)
        log["exception"] = type(exception).__name__
        _remove_file(output_path)
        return log, None

    log["status"] = "200" if result.returncode == 0 else str(result.returncode)

    if result.returncode != 0:
        stderr = result.stderr.decode("utf-8", errors="replace").strip()
        logging.warning("ogr2ogr failed (%s): %s", result.returncode, stderr)
        log["exception"] = "CalledProcessError"
        _remove_file(output_path)
        return log, None

    # A zero exit code does not guarantee the output exists; treat a missing
    # file like an empty one instead of letting getsize raise.
    try:
        output_size = os.path.getsize(output_path)
    except OSError:
        output_size = 0

    if not output_size:
        log["exception"] = "EmptyWFSResponse"
        _remove_file(output_path)
        return log, None

    return log, WFSFileResource(output_path)


def wfs_source_and_layer(url):
    """Split a WFS request URL into ``(source_url, layer_name)``.

    The layer name is the first value of the ``typename`` query parameter, or
    of ``typenames`` when that is absent; parameter names are matched
    case-insensitively. When a layer is found the returned source URL has its
    whole query string removed. Otherwise the URL is returned unchanged with
    an empty layer name.
    """
    parts = urlsplit(url)

    # Re-key the parsed query by lower-cased parameter name.
    params = {}
    for name, entries in parse_qs(parts.query).items():
        params[name.lower()] = entries

    layers = params.get("typename")
    if not layers:
        layers = params.get("typenames")
    if not layers:
        return url, ""

    endpoint = urlunsplit(parts._replace(query=""))
    return endpoint, layers[0]


def _remove_file(path):
    """Best-effort delete of ``path``; any ``OSError`` is ignored."""
    try:
        os.remove(path)
    except OSError:
        # Deliberately swallowed: callers use this for cleanup only.
        return None
151 changes: 151 additions & 0 deletions tests/unit/plugins/test_wfs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
from subprocess import CompletedProcess

from digital_land.plugins.wfs import WFSFileResource
from digital_land.plugins.wfs import WFSParameters
from digital_land.plugins.wfs import get as wfs_get
from digital_land.plugins.wfs import wfs_source_and_layer


def test_get_falls_back_to_default_parameters_for_unvalidated_parameter_dict(caplog):
    """A raw dict given as ``parameters`` triggers a warning and the plain collector fetch."""

    class FakeCollector:
        def get(self, url, log, plugin):
            log["status"] = "200"
            return log, b"reference"

    expected_warning = (
        "WFS get expects parameters to be a WFSParameters instance. using default parameters"
    )
    raw_parameters = {"page_size": 1000}

    with caplog.at_level("WARNING"):
        log, content = wfs_get(
            FakeCollector(),
            "https://example.com/wfs",
            parameters=raw_parameters,
        )

    assert log["status"] == "200"
    assert content == b"reference"
    assert expected_warning in caplog.text


def test_get_paged_wfs_runs_ogr2ogr_with_paging_config(tmp_path, mocker):
    """A page size routes the fetch through ogr2ogr with the paging options set."""
    gpkg_path = tmp_path / "output.gpkg"
    recorded = {}

    class StubTempFile:
        # Stands in for NamedTemporaryFile so the output path is predictable.
        name = str(gpkg_path)

        def __init__(self, *args, **kwargs):
            pass

        def close(self):
            pass

    def fake_run(command, capture_output, check):
        recorded.update(command=command, capture_output=capture_output, check=check)
        gpkg_path.write_bytes(b"geopackage")
        return CompletedProcess(command, 0, stdout=b"", stderr=b"")

    mocker.patch("digital_land.plugins.wfs.tempfile.NamedTemporaryFile", StubTempFile)
    mocker.patch("digital_land.plugins.wfs.subprocess.run", side_effect=fake_run)

    request_url = "https://example.com/wfs?request=GetFeature&typeName=dataset:Flood_Zones"
    log, content = wfs_get(
        None,
        request_url,
        parameters=WFSParameters(page_size=500),
    )

    expected_command = [
        "ogr2ogr",
        "--config",
        "OGR_WFS_PAGING_ALLOWED",
        "ON",
        "--config",
        "OGR_WFS_PAGING_PAGE_SIZE",
        "500",
        "-f",
        "GPKG",
        str(gpkg_path),
        "WFS:https://example.com/wfs",
        "dataset:Flood_Zones",
    ]

    assert isinstance(content, WFSFileResource)
    assert content.path == str(gpkg_path)
    assert log["status"] == "200"
    assert recorded["command"] == expected_command


def test_get_paged_wfs_derives_layer_name_from_endpoint_url(tmp_path, mocker):
    """The layer argument comes from ``typeName`` and the query string is stripped from the source."""
    gpkg_path = tmp_path / "output.gpkg"
    recorded = {}

    class StubTempFile:
        # Stands in for NamedTemporaryFile so the output path is predictable.
        name = str(gpkg_path)

        def __init__(self, *args, **kwargs):
            pass

        def close(self):
            pass

    def fake_run(command, capture_output, check):
        recorded["command"] = command
        gpkg_path.write_bytes(b"geopackage")
        return CompletedProcess(command, 0, stdout=b"", stderr=b"")

    mocker.patch("digital_land.plugins.wfs.tempfile.NamedTemporaryFile", StubTempFile)
    mocker.patch("digital_land.plugins.wfs.subprocess.run", side_effect=fake_run)

    request_url = "https://example.com/wfs?request=GetFeature&typeName=dataset:Flood_Zones&outputFormat=Geopackage"
    log, content = wfs_get(
        None,
        request_url,
        parameters=WFSParameters(page_size=1000),
    )

    assert isinstance(content, WFSFileResource)
    assert log["status"] == "200"

    command = recorded["command"]
    assert command[-2:] == [
        "WFS:https://example.com/wfs",
        "dataset:Flood_Zones",
    ]
    assert "1000" in command


def test_wfs_source_and_layer_extracts_case_insensitive_typename():
    """An upper-case ``TYPENAME`` query key is still recognised as the layer name."""
    request_url = "https://example.com/wfs?request=GetFeature&TYPENAME=dataset:Flood_Zones"

    endpoint, layer = wfs_source_and_layer(request_url)

    assert (endpoint, layer) == ("https://example.com/wfs", "dataset:Flood_Zones")


def test_get_paged_wfs_logs_failure_and_cleans_temp_file(tmp_path, mocker):
    """A non-zero ogr2ogr exit is logged, yields no content and removes the partial output."""
    gpkg_path = tmp_path / "output.gpkg"

    class StubTempFile:
        # Stands in for NamedTemporaryFile so the output path is predictable.
        name = str(gpkg_path)

        def __init__(self, *args, **kwargs):
            pass

        def close(self):
            pass

    def fake_run(command, capture_output, check):
        # Leave a partial file behind, as a failed conversion might.
        gpkg_path.write_bytes(b"partial")
        return CompletedProcess(command, 1, stdout=b"", stderr=b"failed")

    mocker.patch("digital_land.plugins.wfs.tempfile.NamedTemporaryFile", StubTempFile)
    mocker.patch("digital_land.plugins.wfs.subprocess.run", side_effect=fake_run)

    paging = WFSParameters(page_size=1000)
    log, content = wfs_get(
        None,
        "https://example.com/wfs",
        parameters=paging,
    )

    assert content is None
    assert log["status"] == "1"
    assert log["exception"] == "CalledProcessError"
    assert not gpkg_path.exists()
Loading
Loading