Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 95 additions & 5 deletions digital_land/pipeline/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ def run(
endpoint: Optional[str] = None,
log_path: Optional[Path] = None,
issue_path: Optional[Path] = None,
column_field_path: Optional[Path] = None,
mandatory_fields: Optional[List[str]] = None,
severity_filter: Optional[List[str]] = None,
responsibility_filter: Optional[List[str]] = None,
entry_date: Optional[str] = None,
Expand All @@ -86,7 +88,20 @@ def run(

if output_path is None:
logger.error("output_path is required")
return TaskPipelineStatus.FAILED
self.status = TaskPipelineStatus.FAILED
return self.status

if column_field_path and not mandatory_fields:
logger.warning(
"column_field_path provided but mandatory_fields not supplied — skipping missing field checks"
)
column_field_path = None
elif column_field_path and not Path(column_field_path).exists():
logger.warning(
"column_field_path provided but does not exist: %s",
column_field_path,
)
column_field_path = None

frames = []

Expand Down Expand Up @@ -119,16 +134,30 @@ def run(
if not issue_tasks.is_empty():
frames.append(issue_tasks)

if column_field_path:
column_field_df = pl.scan_csv(
column_field_path, infer_schema_length=0, null_values=[""]
).collect()
column_field_tasks = _transform_column_field_to_tasks(
column_field_df,
organisation,
dataset,
mandatory_fields,
entry_date,
)
if not column_field_tasks.is_empty():
frames.append(column_field_tasks)

result = pl.concat(frames) if frames else pl.DataFrame(schema=_EMPTY_SCHEMA)
result.write_csv(output_path)

self.status = TaskPipelineStatus.COMPLETE
return TaskPipelineStatus.COMPLETE
return self.status

except Exception:
logger.exception("TaskPipeline failed")
self.status = TaskPipelineStatus.FAILED
return TaskPipelineStatus.FAILED
return self.status


def _transform_log_to_tasks(
Expand Down Expand Up @@ -201,6 +230,67 @@ def _transform_log_to_tasks(
return result


def _transform_column_field_to_tasks(
df: pl.DataFrame,
organisation: str,
dataset: str,
mandatory_fields: List[str],
entry_date: str,
) -> pl.DataFrame:
"""Identify missing required fields from column-field log and create task rows."""
present_fields = set(df["field"].to_list()) if "field" in df.columns else set()
missing = []
for field in mandatory_fields:
if isinstance(field, list):
if not any(f in present_fields for f in field):
missing.extend(field)
else:
if field not in present_fields:
missing.append(field)

if not missing:
return pl.DataFrame(schema=_EMPTY_SCHEMA)

resource = df["resource"][0] if "resource" in df.columns and len(df) > 0 else ""
n = len(missing)
details_col = [
json.dumps({"field": f, "issue_type": "missing-field"}) for f in missing
]

result = pl.DataFrame(
{
"dataset": pl.Series([dataset] * n, dtype=pl.Utf8),
"organisation": pl.Series([organisation] * n, dtype=pl.Utf8),
"endpoint": pl.Series([""] * n, dtype=pl.Utf8),
"resource": pl.Series([resource] * n, dtype=pl.Utf8),
"details": pl.Series(details_col, dtype=pl.Utf8),
"severity": pl.Series(["error"] * n, dtype=pl.Utf8),
"responsibility": pl.Series(["external"] * n, dtype=pl.Utf8),
"task-source": pl.Series(["column-field"] * n, dtype=pl.Utf8),
"entry-date": pl.Series([entry_date] * n, dtype=pl.Utf8),
}
)
result = result.with_columns(
pl.struct(["dataset", "endpoint", "resource", "task-source", "details"])
.map_elements(
lambda r: hashlib.sha256(
"|".join(
[
r["dataset"] or "",
r["endpoint"] or "",
r["resource"] or "",
r["task-source"],
r["details"],
]
).encode()
).hexdigest()[:16],
return_dtype=pl.Utf8,
)
.alias("reference")
)
return result


def _transform_issues_to_tasks(
df: pl.DataFrame,
organisation: str,
Expand Down Expand Up @@ -262,8 +352,8 @@ def _transform_issues_to_tasks(
result = grouped.select(
[
pl.col("dataset"),
pl.lit(organisation).alias("organisation"),
pl.lit(endpoint).alias("endpoint"),
pl.lit(organisation or "").alias("organisation"),
pl.lit(endpoint or "").alias("endpoint"),
pl.col("resource"),
pl.col("details"),
pl.col("severity"),
Expand Down
144 changes: 144 additions & 0 deletions tests/unit/pipeline/test_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,3 +176,147 @@ def test_run_output_has_correct_columns(self, log_csv, tmp_path):
}
rows = _read_csv(output)
assert set(rows[0].keys()) == expected_cols


RESOURCE_HASH = "be5a869a80edf1eee0cedf66efc726ffe9c51e30f04d4ef976c5b3db4dbb5456"

COLUMN_FIELD_ROWS = (
"dataset,resource,column,field\n"
f"tree-preservation-zone,{RESOURCE_HASH},name,name\n"
f"tree-preservation-zone,{RESOURCE_HASH},geometry,geometry\n"
f"tree-preservation-zone,{RESOURCE_HASH},reference,reference\n"
)


@pytest.fixture
def column_field_csv(tmp_path):
path = tmp_path / "column_field.csv"
path.write_text(COLUMN_FIELD_ROWS)
return path


class TestColumnFieldTasks:

def test_missing_mandatory_field_creates_task(self, column_field_csv, tmp_path):
output = tmp_path / "tasks.csv"
status = TaskPipeline().run(
output_path=output,
dataset="tree-preservation-zone",
organisation="local-authority-eng:ABC",
endpoint="endpoint-aaa",
column_field_path=column_field_csv,
mandatory_fields=["name", "geometry", "start-date"],
)

assert status == TaskPipelineStatus.COMPLETE
rows = _read_csv(output)
assert len(rows) == 1
assert rows[0]["task-source"] == "column-field"
details = json.loads(rows[0]["details"])
assert details["field"] == "start-date"
assert details["issue_type"] == "missing-field"

def test_multiple_missing_fields_each_produce_a_task(
self, column_field_csv, tmp_path
):
output = tmp_path / "tasks.csv"
TaskPipeline().run(
output_path=output,
dataset="tree-preservation-zone",
organisation="local-authority-eng:ABC",
endpoint="endpoint-aaa",
column_field_path=column_field_csv,
mandatory_fields=["name", "start-date", "end-date"],
)

rows = _read_csv(output)
missing_fields = {json.loads(r["details"])["field"] for r in rows}
assert missing_fields == {"start-date", "end-date"}

def test_no_tasks_when_all_mandatory_fields_present(
self, column_field_csv, tmp_path
):
output = tmp_path / "tasks.csv"
status = TaskPipeline().run(
output_path=output,
dataset="tree-preservation-zone",
organisation="local-authority-eng:ABC",
endpoint="endpoint-aaa",
column_field_path=column_field_csv,
mandatory_fields=["name", "geometry"],
)

assert status == TaskPipelineStatus.COMPLETE
assert _read_csv(output) == []

def test_alternative_field_group_passes_when_one_present(
self, column_field_csv, tmp_path
):
"""A list inside mandatory_fields means 'any one of these is sufficient'."""
output = tmp_path / "tasks.csv"
TaskPipeline().run(
output_path=output,
dataset="tree-preservation-zone",
organisation="local-authority-eng:ABC",
endpoint="endpoint-aaa",
column_field_path=column_field_csv,
mandatory_fields=[["geometry", "point"]],
)

assert _read_csv(output) == []

def test_alternative_field_group_fails_when_none_present(
self, column_field_csv, tmp_path
):
"""All alternatives missing → both are appended as missing."""
output = tmp_path / "tasks.csv"
TaskPipeline().run(
output_path=output,
dataset="tree-preservation-zone",
organisation="local-authority-eng:ABC",
endpoint="endpoint-aaa",
column_field_path=column_field_csv,
mandatory_fields=[["start-date", "opening-date"]],
)

rows = _read_csv(output)
missing_fields = {json.loads(r["details"])["field"] for r in rows}
assert missing_fields == {"start-date", "opening-date"}

def test_column_field_path_without_mandatory_fields_is_skipped(
self, column_field_csv, tmp_path, caplog
):
import logging

output = tmp_path / "tasks.csv"
with caplog.at_level(logging.WARNING):
status = TaskPipeline().run(
output_path=output,
dataset="tree-preservation-zone",
organisation="local-authority-eng:ABC",
endpoint="endpoint-aaa",
column_field_path=column_field_csv,
)

assert status == TaskPipelineStatus.COMPLETE
assert _read_csv(output) == []
assert "mandatory_fields not supplied" in caplog.text

def test_column_field_task_output_columns(self, column_field_csv, tmp_path):
output = tmp_path / "tasks.csv"
TaskPipeline().run(
output_path=output,
dataset="tree-preservation-zone",
organisation="local-authority-eng:ABC",
endpoint="endpoint-aaa",
column_field_path=column_field_csv,
mandatory_fields=["start-date"],
)

rows = _read_csv(output)
assert rows[0]["dataset"] == "tree-preservation-zone"
assert rows[0]["organisation"] == "local-authority-eng:ABC"
assert rows[0]["severity"] == "error"
assert rows[0]["responsibility"] == "external"
assert rows[0]["resource"] == RESOURCE_HASH
assert rows[0]["reference"] != ""
Loading