diff --git a/CHANGELOG.md b/CHANGELOG.md
index 8dfab48..7da6c8f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,12 @@
+## v0.5.2 (2026-02-02)
+
+### Refactor
+
+- allow passing of custom loggers into pipeline objects
+- ensure traceback in broad exceptions
+- improve the logging around dve processing errors and align reporting to the module name rather than the legacy name
+- add sense check for text-based files (#32)
+
 ## v0.5.1 (2026-01-28)
 
 ### Fix
diff --git a/Makefile b/Makefile
index cfad520..7684514 100644
--- a/Makefile
+++ b/Makefile
@@ -17,7 +17,7 @@
 behave:
 	${activate} behave
 
 pytest:
-	${activate} pytest tests/
+	${activate} pytest -c pytest-dev.ini
 
 all-tests: pytest behave
diff --git a/pyproject.toml b/pyproject.toml
index cd3763c..ca0c98f 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "nhs_dve"
-version = "0.5.1"
+version = "0.5.2"
 description = "`nhs data validation engine` is a framework used to validate data"
 authors = ["NHS England "]
 readme = "README.md"
diff --git a/pytest-dev.ini b/pytest-dev.ini
new file mode 100644
index 0000000..11c72fa
--- /dev/null
+++ b/pytest-dev.ini
@@ -0,0 +1,3 @@
+[pytest]
+log_cli = true
+log_cli_level = INFO
diff --git a/src/dve/core_engine/backends/base/reader.py b/src/dve/core_engine/backends/base/reader.py
index 9862e7e..54abaa9 100644
--- a/src/dve/core_engine/backends/base/reader.py
+++ b/src/dve/core_engine/backends/base/reader.py
@@ -8,9 +8,11 @@
 from pydantic import BaseModel
 from typing_extensions import Protocol
 
-from dve.core_engine.backends.exceptions import ReaderLacksEntityTypeSupport
+from dve.core_engine.backends.exceptions import MessageBearingError, ReaderLacksEntityTypeSupport
 from dve.core_engine.backends.types import EntityName, EntityType
+from dve.core_engine.message import FeedbackMessage
 from dve.core_engine.type_hints import URI, ArbitraryFunction, WrapDecorator
+from dve.parser.file_handling.service import open_stream
 
 T = TypeVar("T")
 ET_co = TypeVar("ET_co", covariant=True)
@@ -116,6 +118,8 @@ def read_to_entity_type(
         if entity_name == Iterator[dict[str, Any]]:
             return self.read_to_py_iterator(resource, entity_name, schema)  # type: ignore
 
+        self.raise_if_not_sensible_file(resource, entity_name)
+
         try:
             reader_func = self.__read_methods__[entity_type]
         except KeyError as err:
@@ -137,3 +141,36 @@ def write_parquet(
         """
 
         raise NotImplementedError(f"write_parquet not implemented in {self.__class__}")
+
+    @staticmethod
+    def _check_likely_text_file(resource: URI) -> bool:
+        """Quick sense check of a file to see whether it looks like text.
+        Not 100% foolproof, but hopefully enough to weed out most
+        non-text files."""
+        with open_stream(resource, "rb") as fle:
+            start_chunk = fle.read(4096)
+            # check for a BOM - UTF-16 text can legitimately contain NULL bytes
+            if start_chunk.startswith((b"\xff\xfe", b"\xfe\xff")):
+                return True
+            # a NULL byte elsewhere makes it unlikely to be text
+            if b"\x00" in start_chunk:
+                return False
+            return True
+
+    def raise_if_not_sensible_file(self, resource: URI, entity_name: str):
+        """Sense check that the file is a text file. Raise an error if that
+        doesn't appear to be the case."""
+        if not self._check_likely_text_file(resource):
+            raise MessageBearingError(
+                "The submitted file doesn't appear to be text",
+                messages=[
+                    FeedbackMessage(
+                        entity=entity_name,
+                        record=None,
+                        failure_type="submission",
+                        error_location="Whole File",
+                        error_code="MalformedFile",
+                        error_message="The resource doesn't seem to be a valid text file",
+                    )
+                ],
+            )
diff --git a/src/dve/core_engine/backends/implementations/duckdb/readers/csv.py b/src/dve/core_engine/backends/implementations/duckdb/readers/csv.py
index 3998bf5..ff65d9f 100644
--- a/src/dve/core_engine/backends/implementations/duckdb/readers/csv.py
+++ b/src/dve/core_engine/backends/implementations/duckdb/readers/csv.py
@@ -16,6 +16,7 @@
     get_duckdb_type_from_annotation,
 )
 from dve.core_engine.backends.implementations.duckdb.types import SQLType
+from dve.core_engine.backends.readers.utilities import check_csv_header_expected
 from dve.core_engine.backends.utilities import get_polars_type_from_annotation
 from dve.core_engine.message import FeedbackMessage
 from dve.core_engine.type_hints import URI, EntityName
@@ -24,7 +25,14 @@
 
 @duckdb_write_parquet
 class DuckDBCSVReader(BaseFileReader):
-    """A reader for CSV files"""
+    """A reader for CSV files, including the ability to compare the passed model
+    to the file header, if it exists.
+
+    field_check: flag to compare the submitted file header to the accompanying pydantic model
+    field_check_error_code: the error code to provide if the file header doesn't contain
+        the expected fields
+    field_check_error_message: the error message to provide if the file header doesn't contain
+        the expected fields"""
 
     # TODO - the read_to_relation should include the schema and determine whether to
     # TODO - stringify or not
@@ -35,15 +43,43 @@ def __init__(
         delim: str = ",",
         quotechar: str = '"',
         connection: Optional[DuckDBPyConnection] = None,
+        field_check: bool = False,
+        field_check_error_code: Optional[str] = "ExpectedVsActualFieldMismatch",
+        field_check_error_message: Optional[str] = "The submitted header is missing fields",
         **_,
     ):
         self.header = header
         self.delim = delim
         self.quotechar = quotechar
         self._connection = connection if connection else default_connection
+        self.field_check = field_check
+        self.field_check_error_code = field_check_error_code
+        self.field_check_error_message = field_check_error_message
 
         super().__init__()
 
+    def perform_field_check(
+        self, resource: URI, entity_name: str, expected_schema: type[BaseModel]
+    ):
+        """Check that the header of the CSV aligns with the provided model"""
+        if not self.header:
+            raise ValueError("Cannot perform field check without a CSV header")
+
+        if missing := check_csv_header_expected(resource, expected_schema, self.delim):
+            raise MessageBearingError(
+                "The CSV header doesn't match what is expected",
+                messages=[
+                    FeedbackMessage(
+                        entity=entity_name,
+                        record=None,
+                        failure_type="submission",
+                        error_location="Whole File",
+                        error_code=self.field_check_error_code,
+                        error_message=f"{self.field_check_error_message} - missing fields: {missing}",  # pylint: disable=line-too-long
+                    )
+                ],
+            )
+
     def read_to_py_iterator(
         self, resource: URI, entity_name: EntityName, schema: type[BaseModel]
     ) -> Iterator[dict[str, Any]]:
@@ -58,6 +94,9 @@ def read_to_relation(  # pylint: disable=unused-argument
         if get_content_length(resource) == 0:
             raise EmptyFileError(f"File at {resource} is empty.")
 
+        if self.field_check:
+            self.perform_field_check(resource, entity_name, schema)
+
         reader_options: dict[str, Any] = {
             "header": self.header,
             "delimiter": self.delim,
@@ -89,6 +128,9 @@ def read_to_relation(  # pylint: disable=unused-argument
         if get_content_length(resource) == 0:
             raise EmptyFileError(f"File at {resource} is empty.")
 
+        if self.field_check:
+            self.perform_field_check(resource, entity_name, schema)
+
         reader_options: dict[str, Any] = {
             "has_header": self.header,
             "separator": self.delim,
@@ -132,6 +174,17 @@ class DuckDBCSVRepeatingHeaderReader(PolarsToDuckDBCSVReader):
     | shop1 | clothes | 2025-01-01 |
     """
 
+    def __init__(
+        self,
+        *args,
+        non_unique_header_error_code: Optional[str] = "NonUniqueHeader",
+        non_unique_header_error_message: Optional[str] = None,
+        **kwargs,
+    ):
+        self._non_unique_header_code = non_unique_header_error_code
+        self._non_unique_header_message = non_unique_header_error_message
+        super().__init__(*args, **kwargs)
+
     @read_function(DuckDBPyRelation)
     def read_to_relation(  # pylint: disable=unused-argument
         self, resource: URI, entity_name: EntityName, schema: type[BaseModel]
@@ -156,10 +209,12 @@ def read_to_relation(  # pylint: disable=unused-argument
                         failure_type="submission",
                         error_message=(
                             f"Found {no_records} distinct combination of header values."
+                            if not self._non_unique_header_message
+                            else self._non_unique_header_message
                         ),
                         error_location=entity_name,
                         category="Bad file",
-                        error_code="NonUniqueHeader",
+                        error_code=self._non_unique_header_code,
                     )
                 ],
             )
diff --git a/src/dve/core_engine/backends/readers/utilities.py b/src/dve/core_engine/backends/readers/utilities.py
new file mode 100644
index 0000000..642c0b2
--- /dev/null
+++ b/src/dve/core_engine/backends/readers/utilities.py
@@ -0,0 +1,21 @@
+"""General utilities for file readers"""
+
+from typing import Optional
+
+from pydantic import BaseModel
+
+from dve.core_engine.type_hints import URI
+from dve.parser.file_handling.service import open_stream
+
+
+def check_csv_header_expected(
+    resource: URI,
+    expected_schema: type[BaseModel],
+    delimiter: Optional[str] = ",",
+    quote_char: str = '"',
+) -> set[str]:
+    """Check that the header of a CSV contains the expected fields."""
+    with open_stream(resource) as fle:
+        header_fields = fle.readline().rstrip().replace(quote_char, "").split(delimiter)
+        expected_fields = expected_schema.__fields__.keys()
+        return set(expected_fields).difference(header_fields)
diff --git a/src/dve/core_engine/exceptions.py b/src/dve/core_engine/exceptions.py
index cba7508..4877baf 100644
--- a/src/dve/core_engine/exceptions.py
+++ b/src/dve/core_engine/exceptions.py
@@ -1,12 +1,8 @@
 """Exceptions emitted by the pipeline."""
 
-from collections.abc import Iterator
+import traceback
 from typing import Optional
 
-from dve.core_engine.backends.implementations.spark.types import SparkEntities
-from dve.core_engine.message import FeedbackMessage
-from dve.core_engine.type_hints import Messages
-
 
 class CriticalProcessingError(ValueError):
     """An exception emitted if critical errors are received."""
@@ -15,26 +11,18 @@ def __init__(
         self,
         error_message: str,
         *args: object,
-        messages: Optional[Messages],
-        entities: Optional[SparkEntities] = None
+        messages: Optional[list[str]] = None,
    ) -> None:
         super().__init__(error_message, *args)
 
         self.error_message = error_message
         """The error message explaining the critical processing error."""
         self.messages = messages
-        """The messages gathered at the time the error was emitted."""
-        self.entities = entities
-        """The entities as they exist at the time the error was emitted."""
-
-    @property
-    def critical_messages(self) -> Iterator[FeedbackMessage]:
-        """Critical messages which caused the processing error."""
-        yield from filter(lambda message: message.is_critical, self.messages)  # type: ignore
+        """The formatted traceback captured for the error."""
 
     @classmethod
     def from_exception(cls, exc: Exception):
         """Create from broader exception, for recording in processing errors"""
-        return cls(error_message=repr(exc), entities=None, messages=[])
+        return cls(error_message=repr(exc), messages=traceback.format_exception(exc))
 
 class EntityTypeMismatch(TypeError):
diff --git a/src/dve/pipeline/duckdb_pipeline.py b/src/dve/pipeline/duckdb_pipeline.py
index 96156a9..f04e71c 100644
--- a/src/dve/pipeline/duckdb_pipeline.py
+++ b/src/dve/pipeline/duckdb_pipeline.py
@@ -1,5 +1,6 @@
 """DuckDB implementation for `Pipeline` object."""
 
+import logging
 from typing import Optional
 
 from duckdb import DuckDBPyConnection, DuckDBPyRelation
@@ -20,7 +21,7 @@ class DDBDVEPipeline(BaseDVEPipeline):
     """
     Modified Pipeline class for running a DVE Pipeline with Spark
     """
-
+    # pylint: disable=R0913
     def __init__(
         self,
         processed_files_path: URI,
@@ -30,6 +31,7 @@ def __init__(
         submitted_files_path: Optional[URI],
         reference_data_loader: Optional[type[BaseRefDataLoader]] = None,
         job_run_id: Optional[int] = None,
+        logger: Optional[logging.Logger] = None,
     ):
         self._connection = connection
         super().__init__(
@@ -41,6 +43,7 @@ def __init__(
             submitted_files_path,
             reference_data_loader,
             job_run_id,
+            logger,
         )
 
     # pylint: disable=arguments-differ
diff --git a/src/dve/pipeline/foundry_ddb_pipeline.py b/src/dve/pipeline/foundry_ddb_pipeline.py
index 4c72375..45d2261 100644
--- a/src/dve/pipeline/foundry_ddb_pipeline.py
+++ b/src/dve/pipeline/foundry_ddb_pipeline.py
@@ -57,8 +57,7 @@ def file_transformation(
         try:
             return super().file_transformation(submission_info)
         except Exception as exc:  # pylint: disable=W0718
-            self._logger.error(f"File transformation raised exception: {exc}")
-            self._logger.exception(exc)
+            self._logger.exception("File transformation raised exception:")
             dump_processing_errors(
                 fh.joinuri(self.processed_files_path, submission_info.submission_id),
                 "file_transformation",
@@ -73,8 +72,7 @@ def apply_data_contract(
         try:
             return super().apply_data_contract(submission_info, submission_status)
         except Exception as exc:  # pylint: disable=W0718
-            self._logger.error(f"Apply data contract raised exception: {exc}")
-            self._logger.exception(exc)
+            self._logger.exception("Apply data contract raised exception:")
             dump_processing_errors(
                 fh.joinuri(self.processed_files_path, submission_info.submission_id),
                 "contract",
@@ -89,8 +87,7 @@ def apply_business_rules(
         try:
             return super().apply_business_rules(submission_info, submission_status)
         except Exception as exc:  # pylint: disable=W0718
-            self._logger.error(f"Apply business rules raised exception: {exc}")
-            self._logger.exception(exc)
+            self._logger.exception("Apply business rules raised exception:")
             dump_processing_errors(
                 fh.joinuri(self.processed_files_path, submission_info.submission_id),
                 "business_rules",
@@ -105,10 +102,11 @@ def error_report(
         try:
             return super().error_report(submission_info, submission_status)
         except Exception as exc:  # pylint: disable=W0718
-            self._logger.error(f"Error reports raised exception: {exc}")
-            self._logger.exception(exc)
+            self._logger.exception("Error reports raised exception:")
             sub_stats = None
             report_uri = None
+            submission_status = submission_status if submission_status else SubmissionStatus()
+            submission_status.processing_failed = True
             dump_processing_errors(
                 fh.joinuri(self.processed_files_path, submission_info.submission_id),
                 "error_report",
@@ -148,7 +146,8 @@ def run_pipeline(
                 sub_info, sub_status, sub_stats, report_uri = self.error_report(
                     submission_info=submission_info, submission_status=sub_status
                 )
-                self._audit_tables.add_submission_statistics_records(sub_stats=[sub_stats])
+                if sub_stats:
+                    self._audit_tables.add_submission_statistics_records(sub_stats=[sub_stats])
             except Exception as err:  # pylint: disable=W0718
                 self._logger.error(
                     f"During processing of submission_id: {sub_id}, this exception was raised: {err}"
diff --git a/src/dve/pipeline/pipeline.py b/src/dve/pipeline/pipeline.py
index 819656a..385687b 100644
--- a/src/dve/pipeline/pipeline.py
+++ b/src/dve/pipeline/pipeline.py
@@ -1,6 +1,7 @@
 # pylint: disable=protected-access,too-many-instance-attributes,too-many-arguments,line-too-long
 """Generic Pipeline object to define how DVE should be interacted with."""
 import json
+import logging
 import re
 from collections import defaultdict
 from collections.abc import Generator, Iterable, Iterator
@@ -57,6 +58,7 @@ def __init__(
         submitted_files_path: Optional[URI],
         reference_data_loader: Optional[type[BaseRefDataLoader]] = None,
         job_run_id: Optional[int] = None,
+        logger: Optional[logging.Logger] = None,
     ):
         self._submitted_files_path = submitted_files_path
         self._processed_files_path = processed_files_path
@@ -66,11 +68,16 @@ def __init__(
         self._audit_tables = audit_tables
         self._data_contract = data_contract
         self._step_implementations = step_implementations
-        self._logger = get_logger(__name__)
+        self._logger = logger or get_logger(__name__)
         self._summary_lock = Lock()
         self._rec_tracking_lock = Lock()
         self._aggregates_lock = Lock()
 
+        if self._data_contract:
+            self._data_contract.logger = self._logger
+        if self._step_implementations:
+            self._step_implementations.logger = self._logger
+
     @property
     def job_run_id(self) -> Optional[int]:
         """Unique Identifier for the job/process that is running this Pipeline."""
@@ -244,8 +251,7 @@ def audit_received_file_step(
                 )
                 continue
             except Exception as exc:  # pylint: disable=W0703
-                self._logger.error(f"audit_received_file raised exception: {exc}")
-                self._logger.exception(exc)
+                self._logger.exception("audit_received_file raised exception:")
                 dump_processing_errors(
                     fh.joinuri(self.processed_files_path, submission_id),
                     "audit_received",
@@ -301,8 +307,7 @@ def file_transformation(
             )
 
         except MessageBearingError as exc:
-            self._logger.error(f"Unexpected file transformation error: {exc}")
-            self._logger.exception(exc)
+            self._logger.exception("Unexpected file transformation error:")
             errors.extend(exc.messages)
 
         if errors:
@@ -352,8 +357,7 @@ def file_transformation_step(
                 )
                 continue
             except Exception as exc:  # pylint: disable=W0703
-                self._logger.error(f"File transformation raised exception: {exc}")
-                self._logger.exception(exc)
+                self._logger.exception("File transformation raised exception:")
                 dump_processing_errors(
                     fh.joinuri(self.processed_files_path, sub_info.submission_id),
                     "file_transformation",
@@ -478,8 +482,7 @@ def data_contract_step(
                 )
                 continue
             except Exception as exc:  # pylint: disable=W0703
-                self._logger.error(f"Data Contract raised exception: {exc}")
-                self._logger.exception(exc)
+                self._logger.exception("Data Contract raised exception:")
                 dump_processing_errors(
                     fh.joinuri(self.processed_files_path, sub_info.submission_id),
                     "contract",
@@ -644,8 +647,7 @@ def business_rule_step(
                 )
                 continue
             except Exception as exc:  # pylint: disable=W0703
-                self._logger.error(f"Business Rules raised exception: {exc}")
-                self._logger.exception(exc)
+                self._logger.exception("Business Rules raised exception:")
                 dump_processing_errors(
                     fh.joinuri(self.processed_files_path, sub_info.submission_id),
                     "business_rules",
@@ -704,9 +706,8 @@ def _get_error_dataframes(self, submission_id: str):
             errors = None
             try:
                 errors = json.load(f)
-            except UnicodeDecodeError as exc:
-                self._logger.error(f"Error reading file: {file}")
-                self._logger.exception(exc)
+            except UnicodeDecodeError:
+                self._logger.exception(f"Error reading file: {file}")
                 continue
             if not errors:
                 continue
@@ -845,8 +846,7 @@ def error_report_step(
                 )
                 continue
             except Exception as exc:  # pylint: disable=W0703
-                self._logger.error(f"Error reports raised exception: {exc}")
-                self._logger.exception(exc)
+                self._logger.exception("Error reports raised exception:")
                 dump_processing_errors(
                     fh.joinuri(self.processed_files_path, sub_info.submission_id),
                     "error_report",
diff --git a/src/dve/pipeline/spark_pipeline.py b/src/dve/pipeline/spark_pipeline.py
index 4111cf3..4b26163 100644
--- a/src/dve/pipeline/spark_pipeline.py
+++ b/src/dve/pipeline/spark_pipeline.py
@@ -1,5 +1,6 @@
 """Spark implementation for `Pipeline` object."""
 
+import logging
 from concurrent.futures import Executor
 from typing import Optional
 
@@ -22,7 +23,7 @@ class SparkDVEPipeline(BaseDVEPipeline):
     """
     Polymorphed Pipeline class for running a DVE Pipeline with Spark
     """
-
+    # pylint: disable=R0913
     def __init__(
         self,
         processed_files_path: URI,
@@ -32,6 +33,7 @@ def __init__(
         reference_data_loader: Optional[type[BaseRefDataLoader]] = None,
         spark: Optional[SparkSession] = None,
         job_run_id: Optional[int] = None,
+        logger: Optional[logging.Logger] = None,
     ):
         self._spark = spark if spark else SparkSession.builder.getOrCreate()
         super().__init__(
@@ -43,6 +45,7 @@ def __init__(
             submitted_files_path,
             reference_data_loader,
             job_run_id,
+            logger,
         )
 
     # pylint: disable=arguments-differ
diff --git a/src/dve/reporting/utils.py b/src/dve/reporting/utils.py
index 8832b6a..1cb0b45 100644
--- a/src/dve/reporting/utils.py
+++ b/src/dve/reporting/utils.py
@@ -61,7 +61,7 @@ def dump_processing_errors(
     if not errors:
         raise AttributeError("errors list not passed")
 
-    error_file: URI = fh.joinuri(working_folder, "errors", "processing_errors.json")
+    error_file: URI = fh.joinuri(working_folder, "processing_errors", "processing_errors.json")
 
     processed = []
     for error in errors:
@@ -71,6 +71,7 @@ def dump_processing_errors(
                 "error_location": "processing",
                 "error_level": "integrity",
                 "error_message": error.error_message,
+                "error_traceback": error.messages,
             }
         )
diff --git a/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_ddb_utils.py b/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_ddb_utils.py
index 8490ab5..2899dc6 100644
--- a/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_ddb_utils.py
+++ b/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_ddb_utils.py
@@ -1,4 +1,3 @@
-from typing import Dict, List
 import pytest
 
 from dve.core_engine.backends.implementations.duckdb.utilities import (
@@ -16,7 +15,7 @@
         ),
     ],
 )
-def test_expr_mapping_to_columns(expressions: Dict[str, str], expected: list[str]):
+def test_expr_mapping_to_columns(expressions: dict[str, str], expected: list[str]):
     observed = expr_mapping_to_columns(expressions)
     assert observed == expected
 
@@ -51,6 +50,7 @@
         ),
     ],
 )
-def test_expr_array_to_columns(expressions: Dict[str, str], expected: list[str]):
+def test_expr_array_to_columns(expressions: dict[str, str], expected: list[str]):
     observed = expr_array_to_columns(expressions)
     assert observed == expected
+
diff --git a/tests/test_core_engine/test_backends/test_readers/test_ddb_json.py b/tests/test_core_engine/test_backends/test_readers/test_ddb_json.py
index 900632d..c326fef 100644
--- a/tests/test_core_engine/test_backends/test_readers/test_ddb_json.py
+++ b/tests/test_core_engine/test_backends/test_readers/test_ddb_json.py
@@ -57,7 +57,7 @@ def test_ddb_json_reader_all_str(temp_json_file):
     expected_fields = [fld for fld in mdl.__fields__]
     reader = DuckDBJSONReader()
     rel: DuckDBPyRelation = reader.read_to_entity_type(
-        DuckDBPyRelation, uri, "test", stringify_model(mdl)
+        DuckDBPyRelation, uri.as_posix(), "test", stringify_model(mdl)
     )
     assert rel.columns == expected_fields
     assert dict(zip(rel.columns, rel.dtypes)) == {fld: "VARCHAR" for fld in expected_fields}
@@ -68,7 +68,7 @@ def test_ddb_json_reader_cast(temp_json_file):
     uri, data, mdl = temp_json_file
     expected_fields = [fld for fld in mdl.__fields__]
     reader = DuckDBJSONReader()
-    rel: DuckDBPyRelation = reader.read_to_entity_type(DuckDBPyRelation, uri, "test", mdl)
+    rel: DuckDBPyRelation = reader.read_to_entity_type(DuckDBPyRelation, uri.as_posix(), "test", mdl)
 
     assert rel.columns == expected_fields
     assert dict(zip(rel.columns, rel.dtypes)) == {
@@ -82,7 +82,7 @@ def test_ddb_csv_write_parquet(temp_json_file):
     uri, _, mdl = temp_json_file
     reader = DuckDBJSONReader()
     rel: DuckDBPyRelation = reader.read_to_entity_type(
-        DuckDBPyRelation, uri, "test", stringify_model(mdl)
+        DuckDBPyRelation, uri.as_posix(), "test", stringify_model(mdl)
     )
     target_loc: Path = uri.parent.joinpath("test_parquet.parquet").as_posix()
     reader.write_parquet(rel, target_loc)
diff --git a/tests/test_core_engine/test_backends/test_readers/test_utilities.py b/tests/test_core_engine/test_backends/test_readers/test_utilities.py
new file mode 100644
index 0000000..4426769
--- /dev/null
+++ b/tests/test_core_engine/test_backends/test_readers/test_utilities.py
@@ -0,0 +1,55 @@
+import datetime as dt
+from pathlib import Path
+import tempfile
+from uuid import uuid4
+
+import pytest
+from pydantic import BaseModel, create_model
+
+from dve.core_engine.backends.readers.utilities import check_csv_header_expected
+
+@pytest.mark.parametrize(
+    ["header_row", "delim", "schema", "expected"],
+    [
+        (
+            "field1,field2,field3",
+            ",",
+            {"field1": (str, ...), "field2": (int, ...), "field3": (float, 1.2)},
+            set(),
+        ),
+        (
+            "field2,field3,field1",
+            ",",
+            {"field1": (str, ...), "field2": (int, ...), "field3": (float, 1.2)},
+            set(),
+        ),
+        (
+            "str_field|int_field|date_field|",
+            ",",
+            {"str_field": (str, ...), "int_field": (int, ...), "date_field": (dt.date, dt.date.today())},
+            {"str_field", "int_field", "date_field"},
+        ),
+        (
+            '"str_field"|"int_field"|"date_field"',
+            "|",
+            {"str_field": (str, ...), "int_field": (int, ...), "date_field": (dt.date, dt.date.today())},
+            set(),
+        ),
+        (
+            'str_field,int_field,date_field\n',
+            ",",
+            {"str_field": (str, ...), "int_field": (int, ...), "date_field": (dt.date, dt.date.today())},
+            set(),
+        ),
+
+    ],
+)
+def test_check_csv_header_expected(
+    header_row: str, delim: str, schema: type[BaseModel], expected: set[str]
+):
+    mdl = create_model("TestModel", **schema)
+    with tempfile.TemporaryDirectory() as tmpdir:
+        fle = Path(tmpdir).joinpath(f"test_file_{uuid4().hex}.csv")
+        fle.open("w+").write(header_row)
+        res = check_csv_header_expected(fle.as_posix(), mdl, delim)
+        assert res == expected
\ No newline at end of file
diff --git a/tests/test_pipeline/test_foundry_ddb_pipeline.py b/tests/test_pipeline/test_foundry_ddb_pipeline.py
index f0eecc6..68fec99 100644
--- a/tests/test_pipeline/test_foundry_ddb_pipeline.py
+++ b/tests/test_pipeline/test_foundry_ddb_pipeline.py
@@ -8,7 +8,7 @@
 import tempfile
 from uuid import uuid4
 
-import pytest
+import polars as pl
 
 from dve.core_engine.backends.implementations.duckdb.auditing import DDBAuditingManager
 from dve.core_engine.backends.implementations.duckdb.reference_data import DuckDBRefDataLoader
@@ -116,6 +116,42 @@ def test_foundry_runner_error(planet_test_files, temp_ddb_conn):
     output_loc, report_uri, audit_files = dve_pipeline.run_pipeline(sub_info)
     assert not fh.get_resource_exists(report_uri)
     assert not output_loc
+
+    perror_path = Path(
+        processing_folder,
+        sub_info.submission_id,
+        "processing_errors",
+        "processing_errors.json"
+    )
+    assert perror_path.exists()
+    perror_schema = {
+        "step_name": pl.Utf8(),
+        "error_location": pl.Utf8(),
+        "error_level": pl.Utf8(),
+        "error_message": pl.Utf8(),
+        "error_traceback": pl.List(pl.Utf8()),
+    }
+    expected_error_df = (
+        pl.DataFrame(
+            [
+                {
+                    "step_name": "file_transformation",
+                    "error_location": "processing",
+                    "error_level": "integrity",
+                    "error_message": "ReaderLacksEntityTypeSupport()",
+                    "error_traceback": None,
+                },
+            ],
+            perror_schema
+        )
+        .select(pl.col("step_name"), pl.col("error_location"), pl.col("error_message"))
+    )
+    actual_error_df = (
+        pl.read_json(perror_path, schema=perror_schema)
+        .select(pl.col("step_name"), pl.col("error_location"), pl.col("error_message"))
+    )
+    assert actual_error_df.equals(expected_error_df)
+
     assert len(list(fh.iter_prefix(audit_files))) == 2
diff --git a/tests/test_error_reporting/__init__.py b/tests/test_reporting/__init__.py
similarity index 100%
rename from tests/test_error_reporting/__init__.py
rename to tests/test_reporting/__init__.py
diff --git a/tests/test_error_reporting/test_excel_report.py b/tests/test_reporting/test_excel_report.py
similarity index 100%
rename from tests/test_error_reporting/test_excel_report.py
rename to tests/test_reporting/test_excel_report.py
diff --git a/tests/test_reporting/test_utils.py b/tests/test_reporting/test_utils.py
new file mode 100644
index 0000000..c240ca5
--- /dev/null
+++ b/tests/test_reporting/test_utils.py
@@ -0,0 +1,51 @@
+"""test utility functions & objects in dve.reporting module"""
+
+import tempfile
+from pathlib import Path
+
+import polars as pl
+
+from dve.core_engine.exceptions import CriticalProcessingError
+from dve.reporting.utils import dump_processing_errors
+
+# pylint: disable=C0116
+
+
+def test_dump_processing_errors():
+    perror_schema = {
+        "step_name": pl.Utf8(),
+        "error_location": pl.Utf8(),
+        "error_level": pl.Utf8(),
+        "error_message": pl.Utf8(),
+        "error_stacktrace": pl.List(pl.Utf8()),
+    }
+    with tempfile.TemporaryDirectory() as temp_dir:
+        dump_processing_errors(
+            temp_dir,
+            "test_step",
+            [CriticalProcessingError("test error message")]
+        )
+
+        output_path = Path(temp_dir, "processing_errors")
+
+        assert output_path.exists()
+        assert len(list(output_path.iterdir())) == 1
+
+        expected_df = pl.DataFrame(
+            [
+                {
+                    "step_name": "test_step",
+                    "error_location": "processing",
+                    "error_level": "integrity",
+                    "error_message": "test error message",
+                    "error_stacktrace": None,
+                },
+            ],
+            perror_schema
+        )
+        error_df = pl.read_json(
+            Path(output_path, "processing_errors.json")
+        )
+        cols_to_check = ["step_name", "error_location", "error_level", "error_message"]
+
+        assert error_df.select(pl.col(k) for k in cols_to_check).equals(expected_df.select(pl.col(k) for k in cols_to_check))