diff --git a/Makefile b/Makefile
index cfad520..7684514 100644
--- a/Makefile
+++ b/Makefile
@@ -17,7 +17,7 @@ behave:
 	${activate} behave
 
 pytest:
-	${activate} pytest tests/
+	${activate} pytest -c pytest-dev.ini
 
 all-tests: pytest behave
diff --git a/pytest-dev.ini b/pytest-dev.ini
new file mode 100644
index 0000000..11c72fa
--- /dev/null
+++ b/pytest-dev.ini
@@ -0,0 +1,3 @@
+[pytest]
+log_cli = true
+log_cli_level = INFO
diff --git a/src/dve/core_engine/exceptions.py b/src/dve/core_engine/exceptions.py
index cba7508..4877baf 100644
--- a/src/dve/core_engine/exceptions.py
+++ b/src/dve/core_engine/exceptions.py
@@ -1,12 +1,8 @@
 """Exceptions emitted by the pipeline."""
-from collections.abc import Iterator
+import traceback
 from typing import Optional
 
-from dve.core_engine.backends.implementations.spark.types import SparkEntities
-from dve.core_engine.message import FeedbackMessage
-from dve.core_engine.type_hints import Messages
-
 
 class CriticalProcessingError(ValueError):
     """An exception emitted if critical errors are received."""
@@ -15,26 +11,18 @@ def __init__(
         self,
         error_message: str,
         *args: object,
-        messages: Optional[Messages],
-        entities: Optional[SparkEntities] = None
+        messages: Optional[list[str]] = None,
     ) -> None:
         super().__init__(error_message, *args)
         self.error_message = error_message
         """The error message explaining the critical processing error."""
         self.messages = messages
-        """The messages gathered at the time the error was emitted."""
-        self.entities = entities
-        """The entities as they exist at the time the error was emitted."""
-
-    @property
-    def critical_messages(self) -> Iterator[FeedbackMessage]:
-        """Critical messages which caused the processing error."""
-        yield from filter(lambda message: message.is_critical, self.messages)  # type: ignore
+        """The formatted traceback lines captured when the error was raised."""
 
     @classmethod
     def from_exception(cls, exc: Exception):
         """Create from broader exception, for recording in processing errors"""
-        return cls(error_message=repr(exc), entities=None, messages=[])
+        return cls(error_message=repr(exc), messages=traceback.format_exception(exc))
 
 
 class EntityTypeMismatch(TypeError):
diff --git a/src/dve/pipeline/duckdb_pipeline.py b/src/dve/pipeline/duckdb_pipeline.py
index 96156a9..c6cf6bc 100644
--- a/src/dve/pipeline/duckdb_pipeline.py
+++ b/src/dve/pipeline/duckdb_pipeline.py
@@ -1,5 +1,6 @@
 """DuckDB implementation for `Pipeline` object."""
 
+import logging
 from typing import Optional
 
 from duckdb import DuckDBPyConnection, DuckDBPyRelation
@@ -30,6 +31,7 @@ def __init__(
         submitted_files_path: Optional[URI],
         reference_data_loader: Optional[type[BaseRefDataLoader]] = None,
         job_run_id: Optional[int] = None,
+        logger: Optional[logging.Logger] = None,
     ):
         self._connection = connection
         super().__init__(
@@ -41,6 +43,7 @@ def __init__(
             submitted_files_path,
             reference_data_loader,
             job_run_id,
+            logger,
         )
 
     # pylint: disable=arguments-differ
diff --git a/src/dve/pipeline/foundry_ddb_pipeline.py b/src/dve/pipeline/foundry_ddb_pipeline.py
index 4c72375..27bc84b 100644
--- a/src/dve/pipeline/foundry_ddb_pipeline.py
+++ b/src/dve/pipeline/foundry_ddb_pipeline.py
@@ -57,8 +57,7 @@ def file_transformation(
         try:
             return super().file_transformation(submission_info)
         except Exception as exc:  # pylint: disable=W0718
-            self._logger.error(f"File transformation raised exception: {exc}")
-            self._logger.exception(exc)
+            self._logger.exception("File transformation raised exception:")
             dump_processing_errors(
                 fh.joinuri(self.processed_files_path, submission_info.submission_id),
"file_transformation", @@ -73,8 +72,7 @@ def apply_data_contract( try: return super().apply_data_contract(submission_info, submission_status) except Exception as exc: # pylint: disable=W0718 - self._logger.error(f"Apply data contract raised exception: {exc}") - self._logger.exception(exc) + self._logger.exception("Apply data contract raised exception:") dump_processing_errors( fh.joinuri(self.processed_files_path, submission_info.submission_id), "contract", @@ -89,8 +87,7 @@ def apply_business_rules( try: return super().apply_business_rules(submission_info, submission_status) except Exception as exc: # pylint: disable=W0718 - self._logger.error(f"Apply business rules raised exception: {exc}") - self._logger.exception(exc) + self._logger.exception("Apply business rules raised exception:") dump_processing_errors( fh.joinuri(self.processed_files_path, submission_info.submission_id), "business_rules", @@ -105,8 +102,7 @@ def error_report( try: return super().error_report(submission_info, submission_status) except Exception as exc: # pylint: disable=W0718 - self._logger.error(f"Error reports raised exception: {exc}") - self._logger.exception(exc) + self._logger.exception("Error reports raised exception:") sub_stats = None report_uri = None dump_processing_errors( diff --git a/src/dve/pipeline/pipeline.py b/src/dve/pipeline/pipeline.py index 819656a..385687b 100644 --- a/src/dve/pipeline/pipeline.py +++ b/src/dve/pipeline/pipeline.py @@ -1,6 +1,7 @@ # pylint: disable=protected-access,too-many-instance-attributes,too-many-arguments,line-too-long """Generic Pipeline object to define how DVE should be interacted with.""" import json +import logging import re from collections import defaultdict from collections.abc import Generator, Iterable, Iterator @@ -57,6 +58,7 @@ def __init__( submitted_files_path: Optional[URI], reference_data_loader: Optional[type[BaseRefDataLoader]] = None, job_run_id: Optional[int] = None, + logger: Optional[logging.Logger] = None, ): self._submitted_files_path = submitted_files_path self._processed_files_path = processed_files_path @@ -66,11 +68,16 @@ def __init__( self._audit_tables = audit_tables self._data_contract = data_contract self._step_implementations = step_implementations - self._logger = get_logger(__name__) + self._logger = logger or get_logger(__name__) self._summary_lock = Lock() self._rec_tracking_lock = Lock() self._aggregates_lock = Lock() + if self._data_contract: + self._data_contract.logger = self._logger + if self._step_implementations: + self._step_implementations.logger = self._logger + @property def job_run_id(self) -> Optional[int]: """Unique Identifier for the job/process that is running this Pipeline.""" @@ -244,8 +251,7 @@ def audit_received_file_step( ) continue except Exception as exc: # pylint: disable=W0703 - self._logger.error(f"audit_received_file raised exception: {exc}") - self._logger.exception(exc) + self._logger.exception("audit_received_file raised exception:") dump_processing_errors( fh.joinuri(self.processed_files_path, submission_id), "audit_received", @@ -301,8 +307,7 @@ def file_transformation( ) except MessageBearingError as exc: - self._logger.error(f"Unexpected file transformation error: {exc}") - self._logger.exception(exc) + self._logger.exception("Unexpected file transformation error:") errors.extend(exc.messages) if errors: @@ -352,8 +357,7 @@ def file_transformation_step( ) continue except Exception as exc: # pylint: disable=W0703 - self._logger.error(f"File transformation raised exception: {exc}") - 
+                self._logger.exception("File transformation raised exception:")
                 dump_processing_errors(
                     fh.joinuri(self.processed_files_path, sub_info.submission_id),
                     "file_transformation",
@@ -478,8 +482,7 @@ def data_contract_step(
                 )
                 continue
             except Exception as exc:  # pylint: disable=W0703
-                self._logger.error(f"Data Contract raised exception: {exc}")
-                self._logger.exception(exc)
+                self._logger.exception("Data Contract raised exception:")
                 dump_processing_errors(
                     fh.joinuri(self.processed_files_path, sub_info.submission_id),
                     "contract",
@@ -644,8 +647,7 @@ def business_rule_step(
                 )
                 continue
             except Exception as exc:  # pylint: disable=W0703
-                self._logger.error(f"Business Rules raised exception: {exc}")
-                self._logger.exception(exc)
+                self._logger.exception("Business Rules raised exception:")
                 dump_processing_errors(
                     fh.joinuri(self.processed_files_path, sub_info.submission_id),
                     "business_rules",
@@ -704,9 +706,8 @@ def _get_error_dataframes(self, submission_id: str):
                 errors = None
                 try:
                     errors = json.load(f)
-                except UnicodeDecodeError as exc:
-                    self._logger.error(f"Error reading file: {file}")
-                    self._logger.exception(exc)
+                except UnicodeDecodeError:
+                    self._logger.exception(f"Error reading file: {file}")
                     continue
                 if not errors:
                     continue
@@ -845,8 +846,7 @@ def error_report_step(
                 )
                 continue
             except Exception as exc:  # pylint: disable=W0703
-                self._logger.error(f"Error reports raised exception: {exc}")
-                self._logger.exception(exc)
+                self._logger.exception("Error reports raised exception:")
                 dump_processing_errors(
                     fh.joinuri(self.processed_files_path, sub_info.submission_id),
                     "error_report",
diff --git a/src/dve/pipeline/spark_pipeline.py b/src/dve/pipeline/spark_pipeline.py
index 4111cf3..853677e 100644
--- a/src/dve/pipeline/spark_pipeline.py
+++ b/src/dve/pipeline/spark_pipeline.py
@@ -1,5 +1,6 @@
 """Spark implementation for `Pipeline` object."""
 
+import logging
 from concurrent.futures import Executor
 from typing import Optional
 
@@ -32,6 +33,7 @@ def __init__(
         reference_data_loader: Optional[type[BaseRefDataLoader]] = None,
         spark: Optional[SparkSession] = None,
         job_run_id: Optional[int] = None,
+        logger: Optional[logging.Logger] = None,
     ):
         self._spark = spark if spark else SparkSession.builder.getOrCreate()
         super().__init__(
@@ -43,6 +45,7 @@ def __init__(
             submitted_files_path,
             reference_data_loader,
             job_run_id,
+            logger,
         )
 
     # pylint: disable=arguments-differ
diff --git a/src/dve/reporting/utils.py b/src/dve/reporting/utils.py
index 8832b6a..1cb0b45 100644
--- a/src/dve/reporting/utils.py
+++ b/src/dve/reporting/utils.py
@@ -61,7 +61,7 @@ def dump_processing_errors(
     if not errors:
         raise AttributeError("errors list not passed")
 
-    error_file: URI = fh.joinuri(working_folder, "errors", "processing_errors.json")
+    error_file: URI = fh.joinuri(working_folder, "processing_errors", "processing_errors.json")
 
     processed = []
     for error in errors:
@@ -71,6 +71,7 @@
                 "error_location": "processing",
                 "error_level": "integrity",
                 "error_message": error.error_message,
+                "error_traceback": error.messages,
             }
         )
 
diff --git a/tests/test_pipeline/test_foundry_ddb_pipeline.py b/tests/test_pipeline/test_foundry_ddb_pipeline.py
index f0eecc6..68fec99 100644
--- a/tests/test_pipeline/test_foundry_ddb_pipeline.py
+++ b/tests/test_pipeline/test_foundry_ddb_pipeline.py
@@ -8,7 +8,7 @@
 import tempfile
 from uuid import uuid4
 
-import pytest
+import polars as pl
 
 from dve.core_engine.backends.implementations.duckdb.auditing import DDBAuditingManager
 from dve.core_engine.backends.implementations.duckdb.reference_data import DuckDBRefDataLoader
@@ -116,6 +116,42 @@ def test_foundry_runner_error(planet_test_files, temp_ddb_conn):
     output_loc, report_uri, audit_files = dve_pipeline.run_pipeline(sub_info)
     assert not fh.get_resource_exists(report_uri)
     assert not output_loc
+
+    perror_path = Path(
+        processing_folder,
+        sub_info.submission_id,
+        "processing_errors",
+        "processing_errors.json"
+    )
+    assert perror_path.exists()
+    perror_schema = {
+        "step_name": pl.Utf8(),
+        "error_location": pl.Utf8(),
+        "error_level": pl.Utf8(),
+        "error_message": pl.Utf8(),
+        "error_traceback": pl.List(pl.Utf8()),
+    }
+    expected_error_df = (
+        pl.DataFrame(
+            [
+                {
+                    "step_name": "file_transformation",
+                    "error_location": "processing",
+                    "error_level": "integrity",
+                    "error_message": "ReaderLacksEntityTypeSupport()",
+                    "error_traceback": None,
+                },
+            ],
+            perror_schema
+        )
+        .select(pl.col("step_name"), pl.col("error_location"), pl.col("error_message"))
+    )
+    actual_error_df = (
+        pl.read_json(perror_path, schema=perror_schema)
+        .select(pl.col("step_name"), pl.col("error_location"), pl.col("error_message"))
+    )
+    assert actual_error_df.equals(expected_error_df)
+
     assert len(list(fh.iter_prefix(audit_files))) == 2
diff --git a/tests/test_error_reporting/__init__.py b/tests/test_reporting/__init__.py
similarity index 100%
rename from tests/test_error_reporting/__init__.py
rename to tests/test_reporting/__init__.py
diff --git a/tests/test_error_reporting/test_excel_report.py b/tests/test_reporting/test_excel_report.py
similarity index 100%
rename from tests/test_error_reporting/test_excel_report.py
rename to tests/test_reporting/test_excel_report.py
diff --git a/tests/test_reporting/test_utils.py b/tests/test_reporting/test_utils.py
new file mode 100644
index 0000000..c240ca5
--- /dev/null
+++ b/tests/test_reporting/test_utils.py
@@ -0,0 +1,51 @@
+"""test utility functions & objects in dve.reporting module"""
+
+import tempfile
+from pathlib import Path
+
+import polars as pl
+
+from dve.core_engine.exceptions import CriticalProcessingError
+from dve.reporting.utils import dump_processing_errors
+
+# pylint: disable=C0116
+
+
+def test_dump_processing_errors():
+    perror_schema = {
+        "step_name": pl.Utf8(),
+        "error_location": pl.Utf8(),
+        "error_level": pl.Utf8(),
+        "error_message": pl.Utf8(),
+        "error_traceback": pl.List(pl.Utf8()),
+    }
+    with tempfile.TemporaryDirectory() as temp_dir:
+        dump_processing_errors(
+            temp_dir,
+            "test_step",
+            [CriticalProcessingError("test error message")]
+        )
+
+        output_path = Path(temp_dir, "processing_errors")
+
+        assert output_path.exists()
+        assert len(list(output_path.iterdir())) == 1
+
+        expected_df = pl.DataFrame(
+            [
+                {
+                    "step_name": "test_step",
+                    "error_location": "processing",
+                    "error_level": "integrity",
+                    "error_message": "test error message",
+                    "error_traceback": None,
+                },
+            ],
+            perror_schema
+        )
+        error_df = pl.read_json(
+            Path(output_path, "processing_errors.json")
+        )
+        cols_to_check = ["step_name", "error_location", "error_level", "error_message"]
+
+        assert error_df.select(pl.col(k) for k in cols_to_check).equals(expected_df.select(pl.col(k) for k in cols_to_check))
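
A minimal usage sketch, illustrative only and not part of the patch, of how the reworked error capture fits together: CriticalProcessingError.from_exception now stores the formatted traceback on messages, and dump_processing_errors writes it to processing_errors/processing_errors.json under the working folder. It assumes Python 3.10+ (where traceback.format_exception accepts a bare exception) and a local directory as the working folder, as in the new test_dump_processing_errors; the "bad row" error and "example_step" name are hypothetical.

import json
import tempfile
from pathlib import Path

from dve.core_engine.exceptions import CriticalProcessingError
from dve.reporting.utils import dump_processing_errors

try:
    raise ValueError("bad row")  # hypothetical failure
except ValueError as exc:
    # messages now carries the formatted traceback lines rather than FeedbackMessages
    error = CriticalProcessingError.from_exception(exc)

with tempfile.TemporaryDirectory() as tmp:
    # writes <working_folder>/processing_errors/processing_errors.json per the updated path
    dump_processing_errors(tmp, "example_step", [error])
    records = json.loads(
        Path(tmp, "processing_errors", "processing_errors.json").read_text()
    )
    print(records[0]["error_message"])    # "ValueError('bad row')"
    print(records[0]["error_traceback"])  # list of formatted traceback lines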