9 changes: 9 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,12 @@
## v0.5.2 (2026-02-02)

### Refactor

- allow passing of custom loggers into pipeline objects
- ensure traceback in broad exceptions
- improve the logging around dve processing errors and align reporting to module name rather than legacy name
- add sense check for text-based file (#32)

## v0.5.1 (2026-01-28)

### Fix
2 changes: 1 addition & 1 deletion Makefile
@@ -17,7 +17,7 @@ behave:
${activate} behave

pytest:
${activate} pytest tests/
${activate} pytest -c pytest-dev.ini

all-tests: pytest behave

2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "nhs_dve"
version = "0.5.1"
version = "0.5.2"
description = "`nhs data validation engine` is a framework used to validate data"
authors = ["NHS England <england.contactus@nhs.net>"]
readme = "README.md"
3 changes: 3 additions & 0 deletions pytest-dev.ini
@@ -0,0 +1,3 @@
[pytest]
log_cli = true
log_cli_level = INFO
39 changes: 38 additions & 1 deletion src/dve/core_engine/backends/base/reader.py
@@ -8,9 +8,11 @@
from pydantic import BaseModel
from typing_extensions import Protocol

from dve.core_engine.backends.exceptions import ReaderLacksEntityTypeSupport
from dve.core_engine.backends.exceptions import MessageBearingError, ReaderLacksEntityTypeSupport
from dve.core_engine.backends.types import EntityName, EntityType
from dve.core_engine.message import FeedbackMessage
from dve.core_engine.type_hints import URI, ArbitraryFunction, WrapDecorator
from dve.parser.file_handling.service import open_stream

T = TypeVar("T")
ET_co = TypeVar("ET_co", covariant=True)
@@ -116,6 +118,8 @@ def read_to_entity_type(
if entity_name == Iterator[dict[str, Any]]:
return self.read_to_py_iterator(resource, entity_name, schema) # type: ignore

self.raise_if_not_sensible_file(resource, entity_name)

try:
reader_func = self.__read_methods__[entity_type]
except KeyError as err:
@@ -137,3 +141,36 @@ def write_parquet(

"""
raise NotImplementedError(f"write_parquet not implemented in {self.__class__}")

@staticmethod
def _check_likely_text_file(resource: URI) -> bool:
"""Quick sense check of file to see if it looks like text
- not 100% full proof, but hopefully enough to weed out most
non-text files"""
with open_stream(resource, "rb") as fle:
start_chunk = fle.read(4096)
# check for a UTF-16 BOM - UTF-16 text can legitimately contain NULL bytes
if start_chunk.startswith((b"\xff\xfe", b"\xfe\xff")):
return True
# a NULL byte elsewhere makes it unlikely this is text
if b"\x00" in start_chunk:
return False
return True

def raise_if_not_sensible_file(self, resource: URI, entity_name: str):
"""Sense check that the file is a text file. Raise error if doesn't
appear to be the case."""
if not self._check_likely_text_file(resource):
raise MessageBearingError(
"The submitted file doesn't appear to be text",
messages=[
FeedbackMessage(
entity=entity_name,
record=None,
failure_type="submission",
error_location="Whole File",
error_code="MalformedFile",
error_message="The resource doesn't seem to be a valid text file",
)
],
)
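For illustration, a standalone sketch of the heuristic this diff introduces, using the built-in `open()` in place of dve's `open_stream` (the file path is a placeholder):

```python
def looks_like_text(path: str) -> bool:
    """Mirror of the sense check above: BOM first, then NULL-byte scan."""
    with open(path, "rb") as fle:
        chunk = fle.read(4096)
    # UTF-16 files start with a BOM and may legitimately contain NULL bytes,
    # so accept them before the NULL-byte test below.
    if chunk.startswith((b"\xff\xfe", b"\xfe\xff")):
        return True
    # Outside UTF-16, a NULL byte in the first 4 KiB almost always means binary.
    return b"\x00" not in chunk

print(looks_like_text("submission.csv"))  # True for typical UTF-8/ASCII text
```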
59 changes: 57 additions & 2 deletions src/dve/core_engine/backends/implementations/duckdb/readers/csv.py
@@ -16,6 +16,7 @@
get_duckdb_type_from_annotation,
)
from dve.core_engine.backends.implementations.duckdb.types import SQLType
from dve.core_engine.backends.readers.utilities import check_csv_header_expected
from dve.core_engine.backends.utilities import get_polars_type_from_annotation
from dve.core_engine.message import FeedbackMessage
from dve.core_engine.type_hints import URI, EntityName
@@ -24,7 +25,14 @@

@duckdb_write_parquet
class DuckDBCSVReader(BaseFileReader):
"""A reader for CSV files"""
"""A reader for CSV files including the ability to compare the passed model
to the file header, if it exists.

field_check: flag to compare submitted file header to the accompanying pydantic model
field_check_error_code: The error code to provide if the file header doesn't contain
the expected fields
field_check_error_message: The error message to provide if the file header doesn't contain
the expected fields"""

# TODO - the read_to_relation should include the schema and determine whether to
# TODO - stringify or not
@@ -35,15 +43,43 @@ def __init__(
delim: str = ",",
quotechar: str = '"',
connection: Optional[DuckDBPyConnection] = None,
field_check: bool = False,
field_check_error_code: Optional[str] = "ExpectedVsActualFieldMismatch",
field_check_error_message: Optional[str] = "The submitted header is missing fields",
**_,
):
self.header = header
self.delim = delim
self.quotechar = quotechar
self._connection = connection if connection else default_connection
self.field_check = field_check
self.field_check_error_code = field_check_error_code
self.field_check_error_message = field_check_error_message

super().__init__()

def perform_field_check(
self, resource: URI, entity_name: str, expected_schema: type[BaseModel]
):
"""Check that the header of the CSV aligns with the provided model"""
if not self.header:
raise ValueError("Cannot perform field check without a CSV header")

if missing := check_csv_header_expected(resource, expected_schema, self.delim):
raise MessageBearingError(
"The CSV header doesn't match what is expected",
messages=[
FeedbackMessage(
entity=entity_name,
record=None,
failure_type="submission",
error_location="Whole File",
error_code=self.field_check_error_code,
error_message=f"{self.field_check_error_message} - missing fields: {missing}", # pylint: disable=line-too-long
)
],
)

def read_to_py_iterator(
self, resource: URI, entity_name: EntityName, schema: type[BaseModel]
) -> Iterator[dict[str, Any]]:
@@ -58,6 +94,9 @@ def read_to_relation( # pylint: disable=unused-argument
if get_content_length(resource) == 0:
raise EmptyFileError(f"File at {resource} is empty.")

if self.field_check:
self.perform_field_check(resource, entity_name, schema)

reader_options: dict[str, Any] = {
"header": self.header,
"delimiter": self.delim,
@@ -89,6 +128,9 @@ def read_to_relation( # pylint: disable=unused-argument
if get_content_length(resource) == 0:
raise EmptyFileError(f"File at {resource} is empty.")

if self.field_check:
self.perform_field_check(resource, entity_name, schema)

reader_options: dict[str, Any] = {
"has_header": self.header,
"separator": self.delim,
@@ -132,6 +174,17 @@ class DuckDBCSVRepeatingHeaderReader(PolarsToDuckDBCSVReader):
| shop1 | clothes | 2025-01-01 |
"""

def __init__(
self,
*args,
non_unique_header_error_code: Optional[str] = "NonUniqueHeader",
non_unique_header_error_message: Optional[str] = None,
**kwargs,
):
self._non_unique_header_code = non_unique_header_error_code
self._non_unique_header_message = non_unique_header_error_message
super().__init__(*args, **kwargs)

@read_function(DuckDBPyRelation)
def read_to_relation( # pylint: disable=unused-argument
self, resource: URI, entity_name: EntityName, schema: type[BaseModel]
@@ -156,10 +209,12 @@ def read_to_relation( # pylint: disable=unused-argument
failure_type="submission",
error_message=(
f"Found {no_records} distinct combination of header values."
if not self._non_unique_header_message
else self._non_unique_header_message
),
error_location=entity_name,
category="Bad file",
error_code="NonUniqueHeader",
error_code=self._non_unique_header_code,
)
],
)
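If the constructor lands as shown, opting into the new header check might look like the sketch below (the model, URI, and entity name are hypothetical):

```python
from pydantic import BaseModel

class Sale(BaseModel):
    shop: str
    category: str
    sale_date: str

reader = DuckDBCSVReader(
    header=True,       # the check requires a header row
    field_check=True,  # compare the header against the model's fields
)
# A file whose header omits `sale_date` now raises MessageBearingError,
# carrying a FeedbackMessage with error_code "ExpectedVsActualFieldMismatch".
relation = reader.read_to_relation("file:///tmp/sales.csv", "sales", Sale)
```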
21 changes: 21 additions & 0 deletions src/dve/core_engine/backends/readers/utilities.py
@@ -0,0 +1,21 @@
"""General utilities for file readers"""

from typing import Optional

from pydantic import BaseModel

from dve.core_engine.type_hints import URI
from dve.parser.file_handling.service import open_stream


def check_csv_header_expected(
resource: URI,
expected_schema: type[BaseModel],
delimiter: Optional[str] = ",",
quote_char: str = '"',
) -> set[str]:
"""Check the header of a CSV matches the expected fields"""
with open_stream(resource) as fle:
header_fields = fle.readline().rstrip().replace(quote_char, "").split(delimiter)
expected_fields = expected_schema.__fields__.keys()
return set(expected_fields).difference(header_fields)
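A quick worked example of the set-difference behaviour (the path and file contents are assumptions):

```python
from pydantic import BaseModel

class Patient(BaseModel):
    nhs_number: str
    date_of_birth: str
    postcode: str

# Suppose the first line of the file is: nhs_number,postcode
missing = check_csv_header_expected("file:///tmp/patients.csv", Patient)
print(missing)  # {'date_of_birth'} - truthy, so callers raise; empty set otherwise
```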
20 changes: 4 additions & 16 deletions src/dve/core_engine/exceptions.py
@@ -1,12 +1,8 @@
"""Exceptions emitted by the pipeline."""

from collections.abc import Iterator
import traceback
from typing import Optional

from dve.core_engine.backends.implementations.spark.types import SparkEntities
from dve.core_engine.message import FeedbackMessage
from dve.core_engine.type_hints import Messages


class CriticalProcessingError(ValueError):
"""An exception emitted if critical errors are received."""
@@ -15,26 +11,18 @@ def __init__(
self,
error_message: str,
*args: object,
messages: Optional[Messages],
entities: Optional[SparkEntities] = None
messages: Optional[list[str]] = None,
) -> None:
super().__init__(error_message, *args)
self.error_message = error_message
"""The error message explaining the critical processing error."""
self.messages = messages
"""The messages gathered at the time the error was emitted."""
self.entities = entities
"""The entities as they exist at the time the error was emitted."""

@property
def critical_messages(self) -> Iterator[FeedbackMessage]:
"""Critical messages which caused the processing error."""
yield from filter(lambda message: message.is_critical, self.messages) # type: ignore
"""The stacktrace for the messages."""

@classmethod
def from_exception(cls, exc: Exception):
"""Create from broader exception, for recording in processing errors"""
return cls(error_message=repr(exc), entities=None, messages=[])
return cls(error_message=repr(exc), messages=traceback.format_exception(exc))


class EntityTypeMismatch(TypeError):
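The reworked `from_exception` now carries the formatted stack trace in `messages` rather than an empty list; note the single-argument form of `traceback.format_exception` requires Python 3.10+. A minimal sketch of the new behaviour:

```python
try:
    1 / 0
except Exception as exc:  # broad catch, as in the pipeline's error handlers
    err = CriticalProcessingError.from_exception(exc)
    print(err.error_message)      # ZeroDivisionError('division by zero')
    print("".join(err.messages))  # formatted traceback, one string per frame
```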
5 changes: 4 additions & 1 deletion src/dve/pipeline/duckdb_pipeline.py
@@ -1,5 +1,6 @@
"""DuckDB implementation for `Pipeline` object."""

import logging
from typing import Optional

from duckdb import DuckDBPyConnection, DuckDBPyRelation
@@ -20,7 +21,7 @@ class DDBDVEPipeline(BaseDVEPipeline):
"""
Modified Pipeline class for running a DVE Pipeline with DuckDB
"""

# pylint: disable=R0913
def __init__(
self,
processed_files_path: URI,
@@ -30,6 +31,7 @@ def __init__(
submitted_files_path: Optional[URI],
reference_data_loader: Optional[type[BaseRefDataLoader]] = None,
job_run_id: Optional[int] = None,
logger: Optional[logging.Logger] = None,
):
self._connection = connection
super().__init__(
@@ -41,6 +43,7 @@ def __init__(
submitted_files_path,
reference_data_loader,
job_run_id,
logger,
)

# pylint: disable=arguments-differ
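With the new `logger` parameter threaded through to the base pipeline, callers can supply their own configured logger. A sketch (logger name and handler choice are assumptions; the remaining constructor arguments are elided):

```python
import logging

# A custom logger that the pipeline will use in place of its default.
logger = logging.getLogger("my_team.dve")
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler())

# pipeline = DDBDVEPipeline(..., logger=logger)  # other arguments as before
```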
17 changes: 8 additions & 9 deletions src/dve/pipeline/foundry_ddb_pipeline.py
@@ -57,8 +57,7 @@ def file_transformation(
try:
return super().file_transformation(submission_info)
except Exception as exc: # pylint: disable=W0718
self._logger.error(f"File transformation raised exception: {exc}")
self._logger.exception(exc)
self._logger.exception("File transformation raised exception:")
dump_processing_errors(
fh.joinuri(self.processed_files_path, submission_info.submission_id),
"file_transformation",
@@ -73,8 +72,7 @@ def apply_data_contract(
try:
return super().apply_data_contract(submission_info, submission_status)
except Exception as exc: # pylint: disable=W0718
self._logger.error(f"Apply data contract raised exception: {exc}")
self._logger.exception(exc)
self._logger.exception("Apply data contract raised exception:")
dump_processing_errors(
fh.joinuri(self.processed_files_path, submission_info.submission_id),
"contract",
@@ -89,8 +87,7 @@ def apply_business_rules(
try:
return super().apply_business_rules(submission_info, submission_status)
except Exception as exc: # pylint: disable=W0718
self._logger.error(f"Apply business rules raised exception: {exc}")
self._logger.exception(exc)
self._logger.exception("Apply business rules raised exception:")
dump_processing_errors(
fh.joinuri(self.processed_files_path, submission_info.submission_id),
"business_rules",
@@ -105,10 +102,11 @@ def error_report(
try:
return super().error_report(submission_info, submission_status)
except Exception as exc: # pylint: disable=W0718
self._logger.error(f"Error reports raised exception: {exc}")
self._logger.exception(exc)
self._logger.exception("Error reports raised exception:")
sub_stats = None
report_uri = None
submission_status = submission_status if submission_status else SubmissionStatus()
submission_status.processing_failed = True
dump_processing_errors(
fh.joinuri(self.processed_files_path, submission_info.submission_id),
"error_report",
@@ -148,7 +146,8 @@ def run_pipeline(
sub_info, sub_status, sub_stats, report_uri = self.error_report(
submission_info=submission_info, submission_status=sub_status
)
self._audit_tables.add_submission_statistics_records(sub_stats=[sub_stats])
if sub_stats:
self._audit_tables.add_submission_statistics_records(sub_stats=[sub_stats])
except Exception as err: # pylint: disable=W0718
self._logger.error(
f"During processing of submission_id: {sub_id}, this exception was raised: {err}"
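The repeated `error` + `exception` pairs collapse into single `logger.exception` calls because `Logger.exception` already logs at ERROR level and appends the active traceback; it is meant to be called only from an `except` block. A self-contained illustration:

```python
import logging

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("demo")

try:
    int("not-a-number")
except ValueError:
    # One call: the message at ERROR level plus the current traceback.
    log.exception("File transformation raised exception:")
```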