From 48694828bf52b9ce898690d89b47fb261c4290b2 Mon Sep 17 00:00:00 2001 From: georgeRobertson <50412379+georgeRobertson@users.noreply.github.com> Date: Wed, 28 Jan 2026 13:20:33 +0000 Subject: [PATCH 01/12] fix: deal with pathing assumption that file had been moved to processed_file_path during file transformation --- src/dve/pipeline/foundry_ddb_pipeline.py | 15 ++++++- .../test_foundry_ddb_pipeline.py | 40 +++++++++++++++++++ 2 files changed, 53 insertions(+), 2 deletions(-) diff --git a/src/dve/pipeline/foundry_ddb_pipeline.py b/src/dve/pipeline/foundry_ddb_pipeline.py index 5e0b757..e087072 100644 --- a/src/dve/pipeline/foundry_ddb_pipeline.py +++ b/src/dve/pipeline/foundry_ddb_pipeline.py @@ -1,6 +1,8 @@ # pylint: disable=W0223 """A duckdb pipeline for running on Foundry platform""" +import shutil +from pathlib import Path from typing import Optional from dve.core_engine.backends.implementations.duckdb.duckdb_helpers import ( @@ -23,6 +25,15 @@ class FoundryDDBPipeline(DDBDVEPipeline): """DuckDB pipeline for running on Foundry Platform""" + def _move_submission_to_processing_files_path(self, submission_info: SubmissionInfo): + """Move submitted file to 'processed_files_path'.""" + _submitted_file_location = Path( + self._submitted_files_path, submission_info.file_name_with_ext # type: ignore + ) + _dest = Path(self.processed_files_path, submission_info.submission_id) + _dest.mkdir(parents=True, exist_ok=True) + shutil.copy2(_submitted_file_location, _dest) + def persist_audit_records(self, submission_info: SubmissionInfo) -> URI: """Write out key audit relations to parquet for persisting to datasets""" write_to = fh.joinuri(self.processed_files_path, submission_info.submission_id, "audit/") @@ -113,8 +124,8 @@ def run_pipeline( try: sub_id: str = submission_info.submission_id report_uri = None - self._audit_tables.add_new_submissions(submissions=[submission_info]) - self._audit_tables.mark_transform(submission_ids=[sub_id]) + if self._submitted_files_path: + self._move_submission_to_processing_files_path(submission_info) sub_info, sub_status = self.file_transformation(submission_info=submission_info) if not (sub_status.validation_failed or sub_status.processing_failed): self._audit_tables.mark_data_contract(submission_ids=[sub_id]) diff --git a/tests/test_pipeline/test_foundry_ddb_pipeline.py b/tests/test_pipeline/test_foundry_ddb_pipeline.py index 49440fc..f0eecc6 100644 --- a/tests/test_pipeline/test_foundry_ddb_pipeline.py +++ b/tests/test_pipeline/test_foundry_ddb_pipeline.py @@ -5,6 +5,7 @@ from datetime import datetime from pathlib import Path import shutil +import tempfile from uuid import uuid4 import pytest @@ -116,3 +117,42 @@ def test_foundry_runner_error(planet_test_files, temp_ddb_conn): assert not fh.get_resource_exists(report_uri) assert not output_loc assert len(list(fh.iter_prefix(audit_files))) == 2 + + +def test_foundry_runner_with_submitted_files_path(movies_test_files, temp_ddb_conn): + db_file, conn = temp_ddb_conn + ref_db_file = Path(db_file.parent, "movies_refdata.duckdb").as_posix() + conn.sql(f"ATTACH '{ref_db_file}' AS movies_refdata") + conn.read_parquet( + get_test_file_path("movies/refdata/movies_sequels.parquet").as_posix() + ).to_table("movies_refdata.sequels") + processing_folder = Path(tempfile.mkdtemp()).as_posix() + submitted_files_path = Path(movies_test_files).as_posix() + sub_id = uuid4().hex + sub_info = SubmissionInfo( + submission_id=sub_id, + dataset_id="movies", + file_name="good_movies", + file_extension="json", + submitting_org="TEST", + datetime_received=datetime(2025,11,5) + ) + + DuckDBRefDataLoader.connection = conn + DuckDBRefDataLoader.dataset_config_uri = None + + with DDBAuditingManager(db_file.as_uri(), None, conn) as audit_manager: + dve_pipeline = FoundryDDBPipeline( + processed_files_path=processing_folder, + audit_tables=audit_manager, + connection=conn, + rules_path=get_test_file_path("movies/movies_ddb.dischema.json").as_posix(), + submitted_files_path=submitted_files_path, + reference_data_loader=DuckDBRefDataLoader, + ) + output_loc, report_uri, audit_files = dve_pipeline.run_pipeline(sub_info) + + assert Path(processing_folder, sub_id, sub_info.file_name_with_ext).exists() + assert fh.get_resource_exists(report_uri) + assert len(list(fh.iter_prefix(output_loc))) == 2 + assert len(list(fh.iter_prefix(audit_files))) == 3 From cee39a009eebdbd835cc2b515f0e6b19fb4834c1 Mon Sep 17 00:00:00 2001 From: georgeRobertson <50412379+georgeRobertson@users.noreply.github.com> Date: Wed, 28 Jan 2026 14:16:05 +0000 Subject: [PATCH 02/12] fix: re-add audit table logging after accidental removal --- src/dve/pipeline/foundry_ddb_pipeline.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/dve/pipeline/foundry_ddb_pipeline.py b/src/dve/pipeline/foundry_ddb_pipeline.py index e087072..4c72375 100644 --- a/src/dve/pipeline/foundry_ddb_pipeline.py +++ b/src/dve/pipeline/foundry_ddb_pipeline.py @@ -126,6 +126,8 @@ def run_pipeline( report_uri = None if self._submitted_files_path: self._move_submission_to_processing_files_path(submission_info) + self._audit_tables.add_new_submissions(submissions=[submission_info]) + self._audit_tables.mark_transform(submission_ids=[sub_id]) sub_info, sub_status = self.file_transformation(submission_info=submission_info) if not (sub_status.validation_failed or sub_status.processing_failed): self._audit_tables.mark_data_contract(submission_ids=[sub_id]) From 2d886ac70f6624ec47896643688afe1c8bb50c50 Mon Sep 17 00:00:00 2001 From: georgeRobertson <50412379+georgeRobertson@users.noreply.github.com> Date: Wed, 28 Jan 2026 15:50:42 +0000 Subject: [PATCH 03/12] =?UTF-8?q?bump:=20version=200.5.0=20=E2=86=92=200.5?= =?UTF-8?q?.1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- CHANGELOG.md | 7 +++++++ pyproject.toml | 2 +- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9ebd629..88a8acc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,10 @@ +## v0.5.1 (2026-01-28) + +### Fix + +- re-add audit table logging after accidental removal +- deal with pathing assumption that file had been moved to processed_file_path during file transformation + ## v0.5.0 (2026-01-16) ### Feat diff --git a/pyproject.toml b/pyproject.toml index 52263a9..cd3763c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "nhs_dve" -version = "0.5.0" +version = "0.5.1" description = "`nhs data validation engine` is a framework used to validate data" authors = ["NHS England "] readme = "README.md" From cefa5530b21a6683ecaa87d93f68dddf44aa9e5b Mon Sep 17 00:00:00 2001 From: georgeRobertson <50412379+georgeRobertson@users.noreply.github.com> Date: Wed, 28 Jan 2026 15:59:07 +0000 Subject: [PATCH 04/12] docs: condense patch notes for v0.5.1 --- CHANGELOG.md | 1 - 1 file changed, 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 88a8acc..8dfab48 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,7 +2,6 @@ ### Fix -- re-add audit table logging after accidental removal - deal with pathing assumption that file had been moved to processed_file_path during file transformation ## v0.5.0 (2026-01-16) From 4bbdf02e09a4ca15684379970ce799582008c7fc Mon Sep 17 00:00:00 2001 From: georgeRobertson <50412379+georgeRobertson@users.noreply.github.com> Date: Fri, 30 Jan 2026 17:38:37 +0000 Subject: [PATCH 05/12] build: add pytest dev settings with live log --- Makefile | 2 +- pytest-dev.ini | 3 +++ 2 files changed, 4 insertions(+), 1 deletion(-) create mode 100644 pytest-dev.ini diff --git a/Makefile b/Makefile index cfad520..7684514 100644 --- a/Makefile +++ b/Makefile @@ -17,7 +17,7 @@ behave: ${activate} behave pytest: - ${activate} pytest tests/ + ${activate} pytest -c pytest-dev.ini all-tests: pytest behave diff --git a/pytest-dev.ini b/pytest-dev.ini new file mode 100644 index 0000000..11c72fa --- /dev/null +++ b/pytest-dev.ini @@ -0,0 +1,3 @@ +[pytest] +log_cli = true +log_cli_level = INFO From cdd5fc809c80b98b315381fdb5f9706f57223445 Mon Sep 17 00:00:00 2001 From: georgeRobertson <50412379+georgeRobertson@users.noreply.github.com> Date: Fri, 30 Jan 2026 17:42:11 +0000 Subject: [PATCH 06/12] refactor: improve the logging around dve processing errors and align reporting to module name rather than legacy name --- src/dve/core_engine/exceptions.py | 20 ++------ src/dve/reporting/utils.py | 3 +- .../test_foundry_ddb_pipeline.py | 38 +++++++++++++- .../__init__.py | 0 .../test_excel_report.py | 0 tests/test_reporting/test_utils.py | 51 +++++++++++++++++++ 6 files changed, 94 insertions(+), 18 deletions(-) rename tests/{test_error_reporting => test_reporting}/__init__.py (100%) rename tests/{test_error_reporting => test_reporting}/test_excel_report.py (100%) create mode 100644 tests/test_reporting/test_utils.py diff --git a/src/dve/core_engine/exceptions.py b/src/dve/core_engine/exceptions.py index cba7508..4877baf 100644 --- a/src/dve/core_engine/exceptions.py +++ b/src/dve/core_engine/exceptions.py @@ -1,12 +1,8 @@ """Exceptions emitted by the pipeline.""" -from collections.abc import Iterator +import traceback from typing import Optional -from dve.core_engine.backends.implementations.spark.types import SparkEntities -from dve.core_engine.message import FeedbackMessage -from dve.core_engine.type_hints import Messages - class CriticalProcessingError(ValueError): """An exception emitted if critical errors are received.""" @@ -15,26 +11,18 @@ def __init__( self, error_message: str, *args: object, - messages: Optional[Messages], - entities: Optional[SparkEntities] = None + messages: Optional[list[str]] = None, ) -> None: super().__init__(error_message, *args) self.error_message = error_message """The error message explaining the critical processing error.""" self.messages = messages - """The messages gathered at the time the error was emitted.""" - self.entities = entities - """The entities as they exist at the time the error was emitted.""" - - @property - def critical_messages(self) -> Iterator[FeedbackMessage]: - """Critical messages which caused the processing error.""" - yield from filter(lambda message: message.is_critical, self.messages) # type: ignore + """The stacktrace for the messages.""" @classmethod def from_exception(cls, exc: Exception): """Create from broader exception, for recording in processing errors""" - return cls(error_message=repr(exc), entities=None, messages=[]) + return cls(error_message=repr(exc), messages=traceback.format_exception(exc)) class EntityTypeMismatch(TypeError): diff --git a/src/dve/reporting/utils.py b/src/dve/reporting/utils.py index 8832b6a..1cb0b45 100644 --- a/src/dve/reporting/utils.py +++ b/src/dve/reporting/utils.py @@ -61,7 +61,7 @@ def dump_processing_errors( if not errors: raise AttributeError("errors list not passed") - error_file: URI = fh.joinuri(working_folder, "errors", "processing_errors.json") + error_file: URI = fh.joinuri(working_folder, "processing_errors", "processing_errors.json") processed = [] for error in errors: @@ -71,6 +71,7 @@ def dump_processing_errors( "error_location": "processing", "error_level": "integrity", "error_message": error.error_message, + "error_traceback": error.messages, } ) diff --git a/tests/test_pipeline/test_foundry_ddb_pipeline.py b/tests/test_pipeline/test_foundry_ddb_pipeline.py index f0eecc6..68fec99 100644 --- a/tests/test_pipeline/test_foundry_ddb_pipeline.py +++ b/tests/test_pipeline/test_foundry_ddb_pipeline.py @@ -8,7 +8,7 @@ import tempfile from uuid import uuid4 -import pytest +import polars as pl from dve.core_engine.backends.implementations.duckdb.auditing import DDBAuditingManager from dve.core_engine.backends.implementations.duckdb.reference_data import DuckDBRefDataLoader @@ -116,6 +116,42 @@ def test_foundry_runner_error(planet_test_files, temp_ddb_conn): output_loc, report_uri, audit_files = dve_pipeline.run_pipeline(sub_info) assert not fh.get_resource_exists(report_uri) assert not output_loc + + perror_path = Path( + processing_folder, + sub_info.submission_id, + "processing_errors", + "processing_errors.json" + ) + assert perror_path.exists() + perror_schema = { + "step_name": pl.Utf8(), + "error_location": pl.Utf8(), + "error_level": pl.Utf8(), + "error_message": pl.Utf8(), + "error_traceback": pl.List(pl.Utf8()), + } + expected_error_df = ( + pl.DataFrame( + [ + { + "step_name": "file_transformation", + "error_location": "processing", + "error_level": "integrity", + "error_message": "ReaderLacksEntityTypeSupport()", + "error_traceback": None, + }, + ], + perror_schema + ) + .select(pl.col("step_name"), pl.col("error_location"), pl.col("error_message")) + ) + actual_error_df = ( + pl.read_json(perror_path, schema=perror_schema) + .select(pl.col("step_name"), pl.col("error_location"), pl.col("error_message")) + ) + assert actual_error_df.equals(expected_error_df) + assert len(list(fh.iter_prefix(audit_files))) == 2 diff --git a/tests/test_error_reporting/__init__.py b/tests/test_reporting/__init__.py similarity index 100% rename from tests/test_error_reporting/__init__.py rename to tests/test_reporting/__init__.py diff --git a/tests/test_error_reporting/test_excel_report.py b/tests/test_reporting/test_excel_report.py similarity index 100% rename from tests/test_error_reporting/test_excel_report.py rename to tests/test_reporting/test_excel_report.py diff --git a/tests/test_reporting/test_utils.py b/tests/test_reporting/test_utils.py new file mode 100644 index 0000000..c240ca5 --- /dev/null +++ b/tests/test_reporting/test_utils.py @@ -0,0 +1,51 @@ +"""test utility functions & objects in dve.reporting module""" + +import tempfile +from pathlib import Path + +import polars as pl + +from dve.core_engine.exceptions import CriticalProcessingError +from dve.reporting.utils import dump_processing_errors + +# pylint: disable=C0116 + + +def test_dump_processing_errors(): + perror_schema = { + "step_name": pl.Utf8(), + "error_location": pl.Utf8(), + "error_level": pl.Utf8(), + "error_message": pl.Utf8(), + "error_stacktrace": pl.List(pl.Utf8()), + } + with tempfile.TemporaryDirectory() as temp_dir: + dump_processing_errors( + temp_dir, + "test_step", + [CriticalProcessingError("test error message")] + ) + + output_path = Path(temp_dir, "processing_errors") + + assert output_path.exists() + assert len(list(output_path.iterdir())) == 1 + + expected_df = pl.DataFrame( + [ + { + "step_name": "test_step", + "error_location": "processing", + "error_level": "integrity", + "error_message": "test error message", + "error_stacktrace": None, + }, + ], + perror_schema + ) + error_df = pl.read_json( + Path(output_path, "processing_errors.json") + ) + cols_to_check = ["step_name", "error_location", "error_level", "error_message"] + + assert error_df.select(pl.col(k) for k in cols_to_check).equals(expected_df.select(pl.col(k) for k in cols_to_check)) From 5c96472a5dd1e66cbed5e5f8e476e67f3466f7e7 Mon Sep 17 00:00:00 2001 From: georgeRobertson <50412379+georgeRobertson@users.noreply.github.com> Date: Mon, 2 Feb 2026 12:27:43 +0000 Subject: [PATCH 07/12] refactor: ensure traceback in broad exceptions --- src/dve/pipeline/foundry_ddb_pipeline.py | 12 ++++-------- src/dve/pipeline/pipeline.py | 23 ++++++++--------------- 2 files changed, 12 insertions(+), 23 deletions(-) diff --git a/src/dve/pipeline/foundry_ddb_pipeline.py b/src/dve/pipeline/foundry_ddb_pipeline.py index 4c72375..27bc84b 100644 --- a/src/dve/pipeline/foundry_ddb_pipeline.py +++ b/src/dve/pipeline/foundry_ddb_pipeline.py @@ -57,8 +57,7 @@ def file_transformation( try: return super().file_transformation(submission_info) except Exception as exc: # pylint: disable=W0718 - self._logger.error(f"File transformation raised exception: {exc}") - self._logger.exception(exc) + self._logger.exception("File transformation raised exception:") dump_processing_errors( fh.joinuri(self.processed_files_path, submission_info.submission_id), "file_transformation", @@ -73,8 +72,7 @@ def apply_data_contract( try: return super().apply_data_contract(submission_info, submission_status) except Exception as exc: # pylint: disable=W0718 - self._logger.error(f"Apply data contract raised exception: {exc}") - self._logger.exception(exc) + self._logger.exception("Apply data contract raised exception:") dump_processing_errors( fh.joinuri(self.processed_files_path, submission_info.submission_id), "contract", @@ -89,8 +87,7 @@ def apply_business_rules( try: return super().apply_business_rules(submission_info, submission_status) except Exception as exc: # pylint: disable=W0718 - self._logger.error(f"Apply business rules raised exception: {exc}") - self._logger.exception(exc) + self._logger.exception("Apply business rules raised exception:") dump_processing_errors( fh.joinuri(self.processed_files_path, submission_info.submission_id), "business_rules", @@ -105,8 +102,7 @@ def error_report( try: return super().error_report(submission_info, submission_status) except Exception as exc: # pylint: disable=W0718 - self._logger.error(f"Error reports raised exception: {exc}") - self._logger.exception(exc) + self._logger.exception("Error reports raised exception:") sub_stats = None report_uri = None dump_processing_errors( diff --git a/src/dve/pipeline/pipeline.py b/src/dve/pipeline/pipeline.py index 819656a..e50674a 100644 --- a/src/dve/pipeline/pipeline.py +++ b/src/dve/pipeline/pipeline.py @@ -244,8 +244,7 @@ def audit_received_file_step( ) continue except Exception as exc: # pylint: disable=W0703 - self._logger.error(f"audit_received_file raised exception: {exc}") - self._logger.exception(exc) + self._logger.exception("audit_received_file raised exception:") dump_processing_errors( fh.joinuri(self.processed_files_path, submission_id), "audit_received", @@ -301,8 +300,7 @@ def file_transformation( ) except MessageBearingError as exc: - self._logger.error(f"Unexpected file transformation error: {exc}") - self._logger.exception(exc) + self._logger.exception("Unexpected file transformation error:") errors.extend(exc.messages) if errors: @@ -352,8 +350,7 @@ def file_transformation_step( ) continue except Exception as exc: # pylint: disable=W0703 - self._logger.error(f"File transformation raised exception: {exc}") - self._logger.exception(exc) + self._logger.exception("File transformation raised exception:") dump_processing_errors( fh.joinuri(self.processed_files_path, sub_info.submission_id), "file_transformation", @@ -478,8 +475,7 @@ def data_contract_step( ) continue except Exception as exc: # pylint: disable=W0703 - self._logger.error(f"Data Contract raised exception: {exc}") - self._logger.exception(exc) + self._logger.exception("Data Contract raised exception:") dump_processing_errors( fh.joinuri(self.processed_files_path, sub_info.submission_id), "contract", @@ -644,8 +640,7 @@ def business_rule_step( ) continue except Exception as exc: # pylint: disable=W0703 - self._logger.error(f"Business Rules raised exception: {exc}") - self._logger.exception(exc) + self._logger.exception("Business Rules raised exception:") dump_processing_errors( fh.joinuri(self.processed_files_path, sub_info.submission_id), "business_rules", @@ -704,9 +699,8 @@ def _get_error_dataframes(self, submission_id: str): errors = None try: errors = json.load(f) - except UnicodeDecodeError as exc: - self._logger.error(f"Error reading file: {file}") - self._logger.exception(exc) + except UnicodeDecodeError: + self._logger.exception(f"Error reading file: {file}") continue if not errors: continue @@ -845,8 +839,7 @@ def error_report_step( ) continue except Exception as exc: # pylint: disable=W0703 - self._logger.error(f"Error reports raised exception: {exc}") - self._logger.exception(exc) + self._logger.exception("Error reports raised exception:") dump_processing_errors( fh.joinuri(self.processed_files_path, sub_info.submission_id), "error_report", From 7110ef66861c603783d13b9185ad604dc8814565 Mon Sep 17 00:00:00 2001 From: stevenhsd <56357022+stevenhsd@users.noreply.github.com> Date: Mon, 2 Feb 2026 13:05:14 +0000 Subject: [PATCH 08/12] refactor: add sense check for text based file (#32) * feat: Added new option to check csv headers in duckdb csv readers * refactor: small changes to foundry pipeline and duckdb csv to fix header check * feat: Added new check to base reader for sense check of whether processing a text file - applied for all reads via read_to_entity_type * refactor: Address review comments --- src/dve/core_engine/backends/base/reader.py | 39 +++++++++++- .../implementations/duckdb/readers/csv.py | 59 ++++++++++++++++++- .../core_engine/backends/readers/utilities.py | 21 +++++++ src/dve/pipeline/foundry_ddb_pipeline.py | 5 +- .../test_duckdb/test_ddb_utils.py | 6 +- .../test_readers/test_ddb_json.py | 6 +- .../test_readers/test_utilities.py | 55 +++++++++++++++++ 7 files changed, 181 insertions(+), 10 deletions(-) create mode 100644 src/dve/core_engine/backends/readers/utilities.py create mode 100644 tests/test_core_engine/test_backends/test_readers/test_utilities.py diff --git a/src/dve/core_engine/backends/base/reader.py b/src/dve/core_engine/backends/base/reader.py index 9862e7e..54abaa9 100644 --- a/src/dve/core_engine/backends/base/reader.py +++ b/src/dve/core_engine/backends/base/reader.py @@ -8,9 +8,11 @@ from pydantic import BaseModel from typing_extensions import Protocol -from dve.core_engine.backends.exceptions import ReaderLacksEntityTypeSupport +from dve.core_engine.backends.exceptions import MessageBearingError, ReaderLacksEntityTypeSupport from dve.core_engine.backends.types import EntityName, EntityType +from dve.core_engine.message import FeedbackMessage from dve.core_engine.type_hints import URI, ArbitraryFunction, WrapDecorator +from dve.parser.file_handling.service import open_stream T = TypeVar("T") ET_co = TypeVar("ET_co", covariant=True) @@ -116,6 +118,8 @@ def read_to_entity_type( if entity_name == Iterator[dict[str, Any]]: return self.read_to_py_iterator(resource, entity_name, schema) # type: ignore + self.raise_if_not_sensible_file(resource, entity_name) + try: reader_func = self.__read_methods__[entity_type] except KeyError as err: @@ -137,3 +141,36 @@ def write_parquet( """ raise NotImplementedError(f"write_parquet not implemented in {self.__class__}") + + @staticmethod + def _check_likely_text_file(resource: URI) -> bool: + """Quick sense check of file to see if it looks like text + - not 100% full proof, but hopefully enough to weed out most + non-text files""" + with open_stream(resource, "rb") as fle: + start_chunk = fle.read(4096) + # check for BOM character - utf-16 can contain NULL bytes + if start_chunk.startswith((b"\xff\xfe", b"\xfe\xff")): + return True + # if null byte in - unlikely text + if b"\x00" in start_chunk: + return False + return True + + def raise_if_not_sensible_file(self, resource: URI, entity_name: str): + """Sense check that the file is a text file. Raise error if doesn't + appear to be the case.""" + if not self._check_likely_text_file(resource): + raise MessageBearingError( + "The submitted file doesn't appear to be text", + messages=[ + FeedbackMessage( + entity=entity_name, + record=None, + failure_type="submission", + error_location="Whole File", + error_code="MalformedFile", + error_message="The resource doesn't seem to be a valid text file", + ) + ], + ) diff --git a/src/dve/core_engine/backends/implementations/duckdb/readers/csv.py b/src/dve/core_engine/backends/implementations/duckdb/readers/csv.py index 3998bf5..ff65d9f 100644 --- a/src/dve/core_engine/backends/implementations/duckdb/readers/csv.py +++ b/src/dve/core_engine/backends/implementations/duckdb/readers/csv.py @@ -16,6 +16,7 @@ get_duckdb_type_from_annotation, ) from dve.core_engine.backends.implementations.duckdb.types import SQLType +from dve.core_engine.backends.readers.utilities import check_csv_header_expected from dve.core_engine.backends.utilities import get_polars_type_from_annotation from dve.core_engine.message import FeedbackMessage from dve.core_engine.type_hints import URI, EntityName @@ -24,7 +25,14 @@ @duckdb_write_parquet class DuckDBCSVReader(BaseFileReader): - """A reader for CSV files""" + """A reader for CSV files including the ability to compare the passed model + to the file header, if it exists. + + field_check: flag to compare submitted file header to the accompanying pydantic model + field_check_error_code: The error code to provide if the file header doesn't contain + the expected fields + field_check_error_message: The error message to provide if the file header doesn't contain + the expected fields""" # TODO - the read_to_relation should include the schema and determine whether to # TODO - stringify or not @@ -35,15 +43,43 @@ def __init__( delim: str = ",", quotechar: str = '"', connection: Optional[DuckDBPyConnection] = None, + field_check: bool = False, + field_check_error_code: Optional[str] = "ExpectedVsActualFieldMismatch", + field_check_error_message: Optional[str] = "The submitted header is missing fields", **_, ): self.header = header self.delim = delim self.quotechar = quotechar self._connection = connection if connection else default_connection + self.field_check = field_check + self.field_check_error_code = field_check_error_code + self.field_check_error_message = field_check_error_message super().__init__() + def perform_field_check( + self, resource: URI, entity_name: str, expected_schema: type[BaseModel] + ): + """Check that the header of the CSV aligns with the provided model""" + if not self.header: + raise ValueError("Cannot perform field check without a CSV header") + + if missing := check_csv_header_expected(resource, expected_schema, self.delim): + raise MessageBearingError( + "The CSV header doesn't match what is expected", + messages=[ + FeedbackMessage( + entity=entity_name, + record=None, + failure_type="submission", + error_location="Whole File", + error_code=self.field_check_error_code, + error_message=f"{self.field_check_error_message} - missing fields: {missing}", # pylint: disable=line-too-long + ) + ], + ) + def read_to_py_iterator( self, resource: URI, entity_name: EntityName, schema: type[BaseModel] ) -> Iterator[dict[str, Any]]: @@ -58,6 +94,9 @@ def read_to_relation( # pylint: disable=unused-argument if get_content_length(resource) == 0: raise EmptyFileError(f"File at {resource} is empty.") + if self.field_check: + self.perform_field_check(resource, entity_name, schema) + reader_options: dict[str, Any] = { "header": self.header, "delimiter": self.delim, @@ -89,6 +128,9 @@ def read_to_relation( # pylint: disable=unused-argument if get_content_length(resource) == 0: raise EmptyFileError(f"File at {resource} is empty.") + if self.field_check: + self.perform_field_check(resource, entity_name, schema) + reader_options: dict[str, Any] = { "has_header": self.header, "separator": self.delim, @@ -132,6 +174,17 @@ class DuckDBCSVRepeatingHeaderReader(PolarsToDuckDBCSVReader): | shop1 | clothes | 2025-01-01 | """ + def __init__( + self, + *args, + non_unique_header_error_code: Optional[str] = "NonUniqueHeader", + non_unique_header_error_message: Optional[str] = None, + **kwargs, + ): + self._non_unique_header_code = non_unique_header_error_code + self._non_unique_header_message = non_unique_header_error_message + super().__init__(*args, **kwargs) + @read_function(DuckDBPyRelation) def read_to_relation( # pylint: disable=unused-argument self, resource: URI, entity_name: EntityName, schema: type[BaseModel] @@ -156,10 +209,12 @@ def read_to_relation( # pylint: disable=unused-argument failure_type="submission", error_message=( f"Found {no_records} distinct combination of header values." + if not self._non_unique_header_message + else self._non_unique_header_message ), error_location=entity_name, category="Bad file", - error_code="NonUniqueHeader", + error_code=self._non_unique_header_code, ) ], ) diff --git a/src/dve/core_engine/backends/readers/utilities.py b/src/dve/core_engine/backends/readers/utilities.py new file mode 100644 index 0000000..642c0b2 --- /dev/null +++ b/src/dve/core_engine/backends/readers/utilities.py @@ -0,0 +1,21 @@ +"""General utilities for file readers""" + +from typing import Optional + +from pydantic import BaseModel + +from dve.core_engine.type_hints import URI +from dve.parser.file_handling.service import open_stream + + +def check_csv_header_expected( + resource: URI, + expected_schema: type[BaseModel], + delimiter: Optional[str] = ",", + quote_char: str = '"', +) -> set[str]: + """Check the header of a CSV matches the expected fields""" + with open_stream(resource) as fle: + header_fields = fle.readline().rstrip().replace(quote_char, "").split(delimiter) + expected_fields = expected_schema.__fields__.keys() + return set(expected_fields).difference(header_fields) diff --git a/src/dve/pipeline/foundry_ddb_pipeline.py b/src/dve/pipeline/foundry_ddb_pipeline.py index 4c72375..f667d6e 100644 --- a/src/dve/pipeline/foundry_ddb_pipeline.py +++ b/src/dve/pipeline/foundry_ddb_pipeline.py @@ -109,6 +109,8 @@ def error_report( self._logger.exception(exc) sub_stats = None report_uri = None + submission_status = submission_status if submission_status else SubmissionStatus() + submission_status.processing_failed = True dump_processing_errors( fh.joinuri(self.processed_files_path, submission_info.submission_id), "error_report", @@ -148,7 +150,8 @@ def run_pipeline( sub_info, sub_status, sub_stats, report_uri = self.error_report( submission_info=submission_info, submission_status=sub_status ) - self._audit_tables.add_submission_statistics_records(sub_stats=[sub_stats]) + if sub_stats: + self._audit_tables.add_submission_statistics_records(sub_stats=[sub_stats]) except Exception as err: # pylint: disable=W0718 self._logger.error( f"During processing of submission_id: {sub_id}, this exception was raised: {err}" diff --git a/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_ddb_utils.py b/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_ddb_utils.py index 8490ab5..2899dc6 100644 --- a/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_ddb_utils.py +++ b/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_ddb_utils.py @@ -1,4 +1,3 @@ -from typing import Dict, List import pytest from dve.core_engine.backends.implementations.duckdb.utilities import ( @@ -16,7 +15,7 @@ ), ], ) -def test_expr_mapping_to_columns(expressions: Dict[str, str], expected: list[str]): +def test_expr_mapping_to_columns(expressions: dict[str, str], expected: list[str]): observed = expr_mapping_to_columns(expressions) assert observed == expected @@ -51,6 +50,7 @@ def test_expr_mapping_to_columns(expressions: Dict[str, str], expected: list[str ), ], ) -def test_expr_array_to_columns(expressions: Dict[str, str], expected: list[str]): +def test_expr_array_to_columns(expressions: dict[str, str], expected: list[str]): observed = expr_array_to_columns(expressions) assert observed == expected + diff --git a/tests/test_core_engine/test_backends/test_readers/test_ddb_json.py b/tests/test_core_engine/test_backends/test_readers/test_ddb_json.py index 900632d..c326fef 100644 --- a/tests/test_core_engine/test_backends/test_readers/test_ddb_json.py +++ b/tests/test_core_engine/test_backends/test_readers/test_ddb_json.py @@ -57,7 +57,7 @@ def test_ddb_json_reader_all_str(temp_json_file): expected_fields = [fld for fld in mdl.__fields__] reader = DuckDBJSONReader() rel: DuckDBPyRelation = reader.read_to_entity_type( - DuckDBPyRelation, uri, "test", stringify_model(mdl) + DuckDBPyRelation, uri.as_posix(), "test", stringify_model(mdl) ) assert rel.columns == expected_fields assert dict(zip(rel.columns, rel.dtypes)) == {fld: "VARCHAR" for fld in expected_fields} @@ -68,7 +68,7 @@ def test_ddb_json_reader_cast(temp_json_file): uri, data, mdl = temp_json_file expected_fields = [fld for fld in mdl.__fields__] reader = DuckDBJSONReader() - rel: DuckDBPyRelation = reader.read_to_entity_type(DuckDBPyRelation, uri, "test", mdl) + rel: DuckDBPyRelation = reader.read_to_entity_type(DuckDBPyRelation, uri.as_posix(), "test", mdl) assert rel.columns == expected_fields assert dict(zip(rel.columns, rel.dtypes)) == { @@ -82,7 +82,7 @@ def test_ddb_csv_write_parquet(temp_json_file): uri, _, mdl = temp_json_file reader = DuckDBJSONReader() rel: DuckDBPyRelation = reader.read_to_entity_type( - DuckDBPyRelation, uri, "test", stringify_model(mdl) + DuckDBPyRelation, uri.as_posix(), "test", stringify_model(mdl) ) target_loc: Path = uri.parent.joinpath("test_parquet.parquet").as_posix() reader.write_parquet(rel, target_loc) diff --git a/tests/test_core_engine/test_backends/test_readers/test_utilities.py b/tests/test_core_engine/test_backends/test_readers/test_utilities.py new file mode 100644 index 0000000..4426769 --- /dev/null +++ b/tests/test_core_engine/test_backends/test_readers/test_utilities.py @@ -0,0 +1,55 @@ +import datetime as dt +from pathlib import Path +import tempfile +from uuid import uuid4 + +import pytest +from pydantic import BaseModel, create_model + +from dve.core_engine.backends.readers.utilities import check_csv_header_expected + +@pytest.mark.parametrize( + ["header_row", "delim", "schema", "expected"], + [ + ( + "field1,field2,field3", + ",", + {"field1": (str, ...), "field2": (int, ...), "field3": (float, 1.2)}, + set(), + ), + ( + "field2,field3,field1", + ",", + {"field1": (str, ...), "field2": (int, ...), "field3": (float, 1.2)}, + set(), + ), + ( + "str_field|int_field|date_field|", + ",", + {"str_field": (str, ...), "int_field": (int, ...), "date_field": (dt.date, dt.date.today())}, + {"str_field","int_field","date_field"}, + ), + ( + '"str_field"|"int_field"|"date_field"', + "|", + {"str_field": (str, ...), "int_field": (int, ...), "date_field": (dt.date, dt.date.today())}, + set(), + ), + ( + 'str_field,int_field,date_field\n', + ",", + {"str_field": (str, ...), "int_field": (int, ...), "date_field": (dt.date, dt.date.today())}, + set(), + ), + + ], +) +def test_check_csv_header_expected( + header_row: str, delim: str, schema: type[BaseModel], expected: set[str] +): + mdl = create_model("TestModel", **schema) + with tempfile.TemporaryDirectory() as tmpdir: + fle = Path(tmpdir).joinpath(f"test_file_{uuid4().hex}.csv") + fle.open("w+").write(header_row) + res = check_csv_header_expected(fle.as_posix(), mdl, delim) + assert res == expected \ No newline at end of file From 8aa4973d3056ed1afbf7be4bcb1d687b3d34758a Mon Sep 17 00:00:00 2001 From: georgeRobertson <50412379+georgeRobertson@users.noreply.github.com> Date: Mon, 2 Feb 2026 13:10:21 +0000 Subject: [PATCH 09/12] refactor: allow passing of custom loggers into pipeline objects --- src/dve/pipeline/duckdb_pipeline.py | 3 +++ src/dve/pipeline/pipeline.py | 9 ++++++++- src/dve/pipeline/spark_pipeline.py | 3 +++ 3 files changed, 14 insertions(+), 1 deletion(-) diff --git a/src/dve/pipeline/duckdb_pipeline.py b/src/dve/pipeline/duckdb_pipeline.py index 96156a9..c6cf6bc 100644 --- a/src/dve/pipeline/duckdb_pipeline.py +++ b/src/dve/pipeline/duckdb_pipeline.py @@ -1,5 +1,6 @@ """DuckDB implementation for `Pipeline` object.""" +import logging from typing import Optional from duckdb import DuckDBPyConnection, DuckDBPyRelation @@ -30,6 +31,7 @@ def __init__( submitted_files_path: Optional[URI], reference_data_loader: Optional[type[BaseRefDataLoader]] = None, job_run_id: Optional[int] = None, + logger: Optional[logging.Logger] = None, ): self._connection = connection super().__init__( @@ -41,6 +43,7 @@ def __init__( submitted_files_path, reference_data_loader, job_run_id, + logger, ) # pylint: disable=arguments-differ diff --git a/src/dve/pipeline/pipeline.py b/src/dve/pipeline/pipeline.py index e50674a..385687b 100644 --- a/src/dve/pipeline/pipeline.py +++ b/src/dve/pipeline/pipeline.py @@ -1,6 +1,7 @@ # pylint: disable=protected-access,too-many-instance-attributes,too-many-arguments,line-too-long """Generic Pipeline object to define how DVE should be interacted with.""" import json +import logging import re from collections import defaultdict from collections.abc import Generator, Iterable, Iterator @@ -57,6 +58,7 @@ def __init__( submitted_files_path: Optional[URI], reference_data_loader: Optional[type[BaseRefDataLoader]] = None, job_run_id: Optional[int] = None, + logger: Optional[logging.Logger] = None, ): self._submitted_files_path = submitted_files_path self._processed_files_path = processed_files_path @@ -66,11 +68,16 @@ def __init__( self._audit_tables = audit_tables self._data_contract = data_contract self._step_implementations = step_implementations - self._logger = get_logger(__name__) + self._logger = logger or get_logger(__name__) self._summary_lock = Lock() self._rec_tracking_lock = Lock() self._aggregates_lock = Lock() + if self._data_contract: + self._data_contract.logger = self._logger + if self._step_implementations: + self._step_implementations.logger = self._logger + @property def job_run_id(self) -> Optional[int]: """Unique Identifier for the job/process that is running this Pipeline.""" diff --git a/src/dve/pipeline/spark_pipeline.py b/src/dve/pipeline/spark_pipeline.py index 4111cf3..853677e 100644 --- a/src/dve/pipeline/spark_pipeline.py +++ b/src/dve/pipeline/spark_pipeline.py @@ -1,5 +1,6 @@ """Spark implementation for `Pipeline` object.""" +import logging from concurrent.futures import Executor from typing import Optional @@ -32,6 +33,7 @@ def __init__( reference_data_loader: Optional[type[BaseRefDataLoader]] = None, spark: Optional[SparkSession] = None, job_run_id: Optional[int] = None, + logger: Optional[logging.Logger] = None, ): self._spark = spark if spark else SparkSession.builder.getOrCreate() super().__init__( @@ -43,6 +45,7 @@ def __init__( submitted_files_path, reference_data_loader, job_run_id, + logger, ) # pylint: disable=arguments-differ From 9b8f3f564dd8c755a05cbfdcf28350d01ee22147 Mon Sep 17 00:00:00 2001 From: georgeRobertson <50412379+georgeRobertson@users.noreply.github.com> Date: Mon, 2 Feb 2026 14:48:28 +0000 Subject: [PATCH 10/12] =?UTF-8?q?bump:=20version=200.5.1=20=E2=86=92=200.5?= =?UTF-8?q?.2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- CHANGELOG.md | 9 +++++++++ pyproject.toml | 2 +- 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8dfab48..7da6c8f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,12 @@ +## v0.5.2 (2026-02-02) + +### Refactor + +- allow passing of custom loggers into pipeline objects +- ensure traceback in broad exceptions +- improve the logging around dve processing errors and align reporting to module name rather than legacy name +- add sense check for text based file (#32) + ## v0.5.1 (2026-01-28) ### Fix diff --git a/pyproject.toml b/pyproject.toml index cd3763c..ca0c98f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "nhs_dve" -version = "0.5.1" +version = "0.5.2" description = "`nhs data validation engine` is a framework used to validate data" authors = ["NHS England "] readme = "README.md" From b111c1ca5a8eda52d06d95063ca64c91815de22a Mon Sep 17 00:00:00 2001 From: stevenhsd <56357022+stevenhsd@users.noreply.github.com> Date: Mon, 2 Feb 2026 15:07:40 +0000 Subject: [PATCH 11/12] style: fix linting --- src/dve/pipeline/duckdb_pipeline.py | 2 +- src/dve/pipeline/spark_pipeline.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/dve/pipeline/duckdb_pipeline.py b/src/dve/pipeline/duckdb_pipeline.py index c6cf6bc..f04e71c 100644 --- a/src/dve/pipeline/duckdb_pipeline.py +++ b/src/dve/pipeline/duckdb_pipeline.py @@ -21,7 +21,7 @@ class DDBDVEPipeline(BaseDVEPipeline): """ Modified Pipeline class for running a DVE Pipeline with Spark """ - + # pylint: disable=R0913 def __init__( self, processed_files_path: URI, diff --git a/src/dve/pipeline/spark_pipeline.py b/src/dve/pipeline/spark_pipeline.py index 853677e..4b26163 100644 --- a/src/dve/pipeline/spark_pipeline.py +++ b/src/dve/pipeline/spark_pipeline.py @@ -23,7 +23,7 @@ class SparkDVEPipeline(BaseDVEPipeline): """ Polymorphed Pipeline class for running a DVE Pipeline with Spark """ - + # pylint: disable=R0913 def __init__( self, processed_files_path: URI, From aaea47303cf03b5314bb46a63b1cd7f7ae138e0f Mon Sep 17 00:00:00 2001 From: stevenhsd <56357022+stevenhsd@users.noreply.github.com> Date: Fri, 6 Feb 2026 12:18:07 +0000 Subject: [PATCH 12/12] style: Additional logging around file transformation, business rules and error reports (#36) * style: Additional logging around file transformation, business rules and error reports * style: remove unnecessary f string * refactor: further logging details * refactor: tweaked some logging messages following review --- src/dve/core_engine/backends/base/rules.py | 12 ++++++++++++ .../backends/implementations/duckdb/contract.py | 4 +++- .../backends/implementations/spark/contract.py | 3 +++ src/dve/pipeline/duckdb_pipeline.py | 1 + src/dve/pipeline/pipeline.py | 15 +++++++++++++-- src/dve/pipeline/spark_pipeline.py | 1 + 6 files changed, 33 insertions(+), 3 deletions(-) diff --git a/src/dve/core_engine/backends/base/rules.py b/src/dve/core_engine/backends/base/rules.py index 043f826..ef147b6 100644 --- a/src/dve/core_engine/backends/base/rules.py +++ b/src/dve/core_engine/backends/base/rules.py @@ -360,6 +360,7 @@ def apply_sync_filters( messages: Messages = [] for entity_name, filter_rules in filters_by_entity.items(): + self.logger.info(f"Applying filters to {entity_name}") entity = entities[entity_name] filter_column_names: list[str] = [] @@ -367,6 +368,7 @@ def apply_sync_filters( modified_entities = {entity_name: entity} for rule in filter_rules: + self.logger.info(f"Applying filter {rule.reporting.code}") if rule.reporting.emit == "record_failure": column_name = f"filter_{uuid4().hex}" filter_column_names.append(column_name) @@ -411,7 +413,12 @@ def apply_sync_filters( if not success: return messages, False + self.logger.info(f"Filter {rule.reporting.code} found {len(temp_messages)} issues") + if filter_column_names: + self.logger.info( + f"Filtering records from entity {entity_name} for error code {rule.reporting.code}" # pylint: disable=line-too-long + ) success_condition = " AND ".join( [f"({c_name} IS NOT NULL AND {c_name})" for c_name in filter_column_names] ) @@ -456,6 +463,7 @@ def apply_rules(self, entities: Entities, rule_metadata: RuleMetadata) -> Messag altering the entities in-place. """ + self.logger.info("Applying business rules") rules_and_locals: Iterable[tuple[Rule, TemplateVariables]] if rule_metadata.templating_strategy == "upfront": rules_and_locals = [] @@ -472,6 +480,8 @@ def apply_rules(self, entities: Entities, rule_metadata: RuleMetadata) -> Messag rules_and_locals = rule_metadata messages: Messages = [] + + self.logger.info("Applying pre-sync steps") for rule, local_variables in rules_and_locals: for step in rule.pre_sync_steps: if rule_metadata.templating_strategy == "runtime": @@ -498,6 +508,8 @@ def apply_rules(self, entities: Entities, rule_metadata: RuleMetadata) -> Messag if not success: return messages + self.logger.info("Applying post-sync steps") + for rule, local_variables in rules_and_locals: for step in rule.post_sync_steps: if rule_metadata.templating_strategy == "runtime": diff --git a/src/dve/core_engine/backends/implementations/duckdb/contract.py b/src/dve/core_engine/backends/implementations/duckdb/contract.py index 5113da5..51017a5 100644 --- a/src/dve/core_engine/backends/implementations/duckdb/contract.py +++ b/src/dve/core_engine/backends/implementations/duckdb/contract.py @@ -102,7 +102,6 @@ def apply_data_contract( self, entities: DuckDBEntities, contract_metadata: DataContractMetadata ) -> tuple[DuckDBEntities, Messages, StageSuccessful]: """Apply the data contract to the duckdb relations""" - self.logger.info("Applying data contracts") all_messages: Messages = [] successful = True @@ -131,6 +130,9 @@ def apply_data_contract( coerce_inferred_numpy_array_to_list(relation.df()).apply( application_helper, axis=1 ) # pandas uses eager evaluation so potential memory issue here? + self.logger.info( + f"Data contract found {len(application_helper.errors)} issues in {entity_name}" + ) all_messages.extend(application_helper.errors) casting_statements = [ diff --git a/src/dve/core_engine/backends/implementations/spark/contract.py b/src/dve/core_engine/backends/implementations/spark/contract.py index bbd2d5a..afbd85e 100644 --- a/src/dve/core_engine/backends/implementations/spark/contract.py +++ b/src/dve/core_engine/backends/implementations/spark/contract.py @@ -113,7 +113,10 @@ def apply_data_contract( # .persist(storageLevel=StorageLevel.MEMORY_AND_DISK) ) messages = validated.flatMap(lambda row: row[1]).filter(bool) + messages.cache() + self.logger.info(f"Data contract found {messages.count()} issues in {entity_name}") all_messages.extend(messages.collect()) + messages.unpersist() try: record_df = record_df.select( diff --git a/src/dve/pipeline/duckdb_pipeline.py b/src/dve/pipeline/duckdb_pipeline.py index f04e71c..87e927d 100644 --- a/src/dve/pipeline/duckdb_pipeline.py +++ b/src/dve/pipeline/duckdb_pipeline.py @@ -21,6 +21,7 @@ class DDBDVEPipeline(BaseDVEPipeline): """ Modified Pipeline class for running a DVE Pipeline with Spark """ + # pylint: disable=R0913 def __init__( self, diff --git a/src/dve/pipeline/pipeline.py b/src/dve/pipeline/pipeline.py index 385687b..366b590 100644 --- a/src/dve/pipeline/pipeline.py +++ b/src/dve/pipeline/pipeline.py @@ -190,6 +190,7 @@ def write_file_to_parquet( errors = [] for model_name, model in models.items(): + self._logger.info(f"Transforming {model_name} to stringified parquet") reader: BaseFileReader = load_reader(dataset, model_name, ext) try: if not entity_type: @@ -230,6 +231,7 @@ def audit_received_file_step( self, pool: ThreadPoolExecutor, submitted_files: Iterable[tuple[FileURI, InfoURI]] ) -> tuple[list[SubmissionInfo], list[SubmissionInfo]]: """Set files as being received and mark them for file transformation""" + self._logger.info("Starting audit received file service") audit_received_futures: list[tuple[str, FileURI, Future]] = [] for submission_file in submitted_files: data_uri, metadata_uri = submission_file @@ -291,7 +293,7 @@ def file_transformation( """Transform a file from its original format into a 'stringified' parquet file""" if not self.processed_files_path: raise AttributeError("processed files path not provided") - + self._logger.info(f"Applying file transformation to {submission_info.submission_id}") errors: list[FeedbackMessage] = [] submission_status: SubmissionStatus = SubmissionStatus() submission_file_uri: URI = fh.joinuri( @@ -326,6 +328,7 @@ def file_transformation_step( list[tuple[SubmissionInfo, SubmissionStatus]], list[tuple[SubmissionInfo, SubmissionStatus]] ]: """Step to transform files from their original format into parquet files""" + self._logger.info("Starting file transformation service") file_transform_futures: list[tuple[SubmissionInfo, Future]] = [] for submission_info in submissions_to_process: @@ -397,6 +400,7 @@ def apply_data_contract( self, submission_info: SubmissionInfo, submission_status: Optional[SubmissionStatus] = None ) -> tuple[SubmissionInfo, SubmissionStatus]: """Method for applying the data contract given a submission_info""" + self._logger.info(f"Applying data contract to {submission_info.submission_id}") if not submission_status: submission_status = self.get_submission_status( "contract", submission_info.submission_id @@ -450,6 +454,7 @@ def data_contract_step( list[tuple[SubmissionInfo, SubmissionStatus]], list[tuple[SubmissionInfo, SubmissionStatus]] ]: """Step to validate the types of an untyped (stringly typed) parquet file""" + self._logger.info("Starting data contract service") processed_files: list[tuple[SubmissionInfo, SubmissionStatus]] = [] failed_processing: list[tuple[SubmissionInfo, SubmissionStatus]] = [] dc_futures: list[tuple[SubmissionInfo, SubmissionStatus, Future]] = [] @@ -517,6 +522,7 @@ def apply_business_rules( """Apply the business rules to a given submission, the submission may have failed at the data_contract step so this should be passed in as a bool """ + self._logger.info(f"Applying business rules to {submission_info.submission_id}") if not submission_status: submission_status = self.get_submission_status( "business_rules", submission_info.submission_id @@ -606,6 +612,7 @@ def business_rule_step( list[tuple[SubmissionInfo, SubmissionStatus]], ]: """Step to apply business rules (Step impl) to a typed parquet file""" + self._logger.info("Starting business rules service") future_files: list[tuple[SubmissionInfo, SubmissionStatus, Future]] = [] for submission_info, submission_status in files: @@ -747,7 +754,7 @@ def error_report( SubmissionInfo, SubmissionStatus, Optional[SubmissionStatisticsRecord], Optional[URI] ]: """Creates the error reports given a submission info and submission status""" - + self._logger.info(f"Generating error report for {submission_info.submission_id}") if not submission_status: submission_status = self.get_submission_status( "error_report", submission_info.submission_id @@ -756,6 +763,7 @@ def error_report( if not self.processed_files_path: raise AttributeError("processed files path not provided") + self._logger.info("Reading error dataframes") errors_df, aggregates = self._get_error_dataframes(submission_info.submission_id) if not submission_status.number_of_records: @@ -794,9 +802,11 @@ def error_report( "error_reports", f"{submission_info.file_name}_{submission_info.file_extension.strip('.')}.xlsx", ) + self._logger.info("Writing error report") with fh.open_stream(report_uri, "wb") as stream: stream.write(er.ExcelFormat.convert_to_bytes(workbook)) + self._logger.info("Publishing error aggregates") self._publish_error_aggregates(submission_info.submission_id, aggregates) return submission_info, submission_status, sub_stats, report_uri @@ -812,6 +822,7 @@ def error_report_step( """Step to produce error reports takes processed files and files that failed file transformation """ + self._logger.info("Starting error reports service") futures: list[tuple[SubmissionInfo, SubmissionStatus, Future]] = [] reports: list[ tuple[SubmissionInfo, SubmissionStatus, Union[None, SubmissionStatisticsRecord], URI] diff --git a/src/dve/pipeline/spark_pipeline.py b/src/dve/pipeline/spark_pipeline.py index 4b26163..71fdb32 100644 --- a/src/dve/pipeline/spark_pipeline.py +++ b/src/dve/pipeline/spark_pipeline.py @@ -23,6 +23,7 @@ class SparkDVEPipeline(BaseDVEPipeline): """ Polymorphed Pipeline class for running a DVE Pipeline with Spark """ + # pylint: disable=R0913 def __init__( self,