2 changes: 1 addition & 1 deletion Makefile
@@ -17,7 +17,7 @@ behave:
${activate} behave

pytest:
${activate} pytest tests/
${activate} pytest -c pytest-dev.ini

all-tests: pytest behave

3 changes: 3 additions & 0 deletions pytest-dev.ini
@@ -0,0 +1,3 @@
[pytest]
log_cli = true
log_cli_level = INFO
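
For context: log_cli = true together with log_cli_level = INFO makes pytest stream log records at INFO and above to the terminal while the tests run, rather than only showing captured logs for failing tests. A minimal illustrative test (not part of this change) showing what would now appear live:

import logging

logger = logging.getLogger(__name__)


def test_logging_is_streamed_live():
    # INFO meets the configured log_cli_level, so this record shows up live.
    logger.info("visible in the live pytest output")
    # DEBUG is below the configured level, so this record is filtered out.
    logger.debug("hidden from the live output")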
20 changes: 4 additions & 16 deletions src/dve/core_engine/exceptions.py
@@ -1,12 +1,8 @@
"""Exceptions emitted by the pipeline."""

from collections.abc import Iterator
import traceback
from typing import Optional

from dve.core_engine.backends.implementations.spark.types import SparkEntities
from dve.core_engine.message import FeedbackMessage
from dve.core_engine.type_hints import Messages


class CriticalProcessingError(ValueError):
"""An exception emitted if critical errors are received."""
@@ -15,26 +11,18 @@ def __init__(
self,
error_message: str,
*args: object,
messages: Optional[Messages],
entities: Optional[SparkEntities] = None
messages: Optional[list[str]] = None,
) -> None:
super().__init__(error_message, *args)
self.error_message = error_message
"""The error message explaining the critical processing error."""
self.messages = messages
"""The messages gathered at the time the error was emitted."""
self.entities = entities
"""The entities as they exist at the time the error was emitted."""

@property
def critical_messages(self) -> Iterator[FeedbackMessage]:
"""Critical messages which caused the processing error."""
yield from filter(lambda message: message.is_critical, self.messages) # type: ignore
"""The stacktrace for the messages."""

@classmethod
def from_exception(cls, exc: Exception):
"""Create from broader exception, for recording in processing errors"""
return cls(error_message=repr(exc), entities=None, messages=[])
return cls(error_message=repr(exc), messages=traceback.format_exception(exc))


class EntityTypeMismatch(TypeError):
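Note on the new from_exception path: on Python 3.10+, traceback.format_exception(exc) accepts the exception instance directly and returns the formatted traceback as a list of strings, which is what now populates messages. A minimal sketch of reading that stored traceback back out (the try/except here is illustrative only):

from dve.core_engine.exceptions import CriticalProcessingError

try:
    raise ValueError("upstream failure")
except ValueError as exc:
    error = CriticalProcessingError.from_exception(exc)
    # error.messages holds the formatted traceback lines; joining them
    # reproduces the usual multi-line traceback text.
    print("".join(error.messages or []))
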
3 changes: 3 additions & 0 deletions src/dve/pipeline/duckdb_pipeline.py
@@ -1,5 +1,6 @@
"""DuckDB implementation for `Pipeline` object."""

import logging
from typing import Optional

from duckdb import DuckDBPyConnection, DuckDBPyRelation
@@ -30,6 +31,7 @@ def __init__(
submitted_files_path: Optional[URI],
reference_data_loader: Optional[type[BaseRefDataLoader]] = None,
job_run_id: Optional[int] = None,
logger: Optional[logging.Logger] = None,
):
self._connection = connection
super().__init__(
@@ -41,6 +43,7 @@ def __init__(
submitted_files_path,
reference_data_loader,
job_run_id,
logger,
)

# pylint: disable=arguments-differ
12 changes: 4 additions & 8 deletions src/dve/pipeline/foundry_ddb_pipeline.py
@@ -57,8 +57,7 @@ def file_transformation(
try:
return super().file_transformation(submission_info)
except Exception as exc: # pylint: disable=W0718
self._logger.error(f"File transformation raised exception: {exc}")
self._logger.exception(exc)
self._logger.exception("File transformation raised exception:")
dump_processing_errors(
fh.joinuri(self.processed_files_path, submission_info.submission_id),
"file_transformation",
@@ -73,8 +72,7 @@ def apply_data_contract(
try:
return super().apply_data_contract(submission_info, submission_status)
except Exception as exc: # pylint: disable=W0718
self._logger.error(f"Apply data contract raised exception: {exc}")
self._logger.exception(exc)
self._logger.exception("Apply data contract raised exception:")
dump_processing_errors(
fh.joinuri(self.processed_files_path, submission_info.submission_id),
"contract",
@@ -89,8 +87,7 @@ def apply_business_rules(
try:
return super().apply_business_rules(submission_info, submission_status)
except Exception as exc: # pylint: disable=W0718
self._logger.error(f"Apply business rules raised exception: {exc}")
self._logger.exception(exc)
self._logger.exception("Apply business rules raised exception:")
dump_processing_errors(
fh.joinuri(self.processed_files_path, submission_info.submission_id),
"business_rules",
@@ -105,8 +102,7 @@ def error_report(
try:
return super().error_report(submission_info, submission_status)
except Exception as exc: # pylint: disable=W0718
self._logger.error(f"Error reports raised exception: {exc}")
self._logger.exception(exc)
self._logger.exception("Error reports raised exception:")
sub_stats = None
report_uri = None
dump_processing_errors(
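The repeated two-line logging pattern collapses to a single call because logging.Logger.exception already logs at ERROR level and appends the active exception's traceback, so the preceding logger.error call and passing exc as the message only duplicated information. A small standalone illustration (unrelated to the pipeline classes):

import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("demo")

try:
    1 / 0
except ZeroDivisionError:
    # One ERROR record; the traceback of the active exception is appended automatically.
    logger.exception("step raised exception:")
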
32 changes: 16 additions & 16 deletions src/dve/pipeline/pipeline.py
@@ -1,6 +1,7 @@
# pylint: disable=protected-access,too-many-instance-attributes,too-many-arguments,line-too-long
"""Generic Pipeline object to define how DVE should be interacted with."""
import json
import logging
import re
from collections import defaultdict
from collections.abc import Generator, Iterable, Iterator
@@ -57,6 +58,7 @@ def __init__(
submitted_files_path: Optional[URI],
reference_data_loader: Optional[type[BaseRefDataLoader]] = None,
job_run_id: Optional[int] = None,
logger: Optional[logging.Logger] = None,
):
self._submitted_files_path = submitted_files_path
self._processed_files_path = processed_files_path
@@ -66,11 +68,16 @@
self._audit_tables = audit_tables
self._data_contract = data_contract
self._step_implementations = step_implementations
self._logger = get_logger(__name__)
self._logger = logger or get_logger(__name__)
self._summary_lock = Lock()
self._rec_tracking_lock = Lock()
self._aggregates_lock = Lock()

if self._data_contract:
self._data_contract.logger = self._logger
if self._step_implementations:
self._step_implementations.logger = self._logger

@property
def job_run_id(self) -> Optional[int]:
"""Unique Identifier for the job/process that is running this Pipeline."""
@@ -244,8 +251,7 @@ def audit_received_file_step(
)
continue
except Exception as exc: # pylint: disable=W0703
self._logger.error(f"audit_received_file raised exception: {exc}")
self._logger.exception(exc)
self._logger.exception("audit_received_file raised exception:")
dump_processing_errors(
fh.joinuri(self.processed_files_path, submission_id),
"audit_received",
@@ -301,8 +307,7 @@ def file_transformation(
)

except MessageBearingError as exc:
self._logger.error(f"Unexpected file transformation error: {exc}")
self._logger.exception(exc)
self._logger.exception("Unexpected file transformation error:")
errors.extend(exc.messages)

if errors:
@@ -352,8 +357,7 @@ def file_transformation_step(
)
continue
except Exception as exc: # pylint: disable=W0703
self._logger.error(f"File transformation raised exception: {exc}")
self._logger.exception(exc)
self._logger.exception("File transformation raised exception:")
dump_processing_errors(
fh.joinuri(self.processed_files_path, sub_info.submission_id),
"file_transformation",
@@ -478,8 +482,7 @@ def data_contract_step(
)
continue
except Exception as exc: # pylint: disable=W0703
self._logger.error(f"Data Contract raised exception: {exc}")
self._logger.exception(exc)
self._logger.exception("Data Contract raised exception:")
dump_processing_errors(
fh.joinuri(self.processed_files_path, sub_info.submission_id),
"contract",
@@ -644,8 +647,7 @@ def business_rule_step(
)
continue
except Exception as exc: # pylint: disable=W0703
self._logger.error(f"Business Rules raised exception: {exc}")
self._logger.exception(exc)
self._logger.exception("Business Rules raised exception:")
dump_processing_errors(
fh.joinuri(self.processed_files_path, sub_info.submission_id),
"business_rules",
@@ -704,9 +706,8 @@ def _get_error_dataframes(self, submission_id: str):
errors = None
try:
errors = json.load(f)
except UnicodeDecodeError as exc:
self._logger.error(f"Error reading file: {file}")
self._logger.exception(exc)
except UnicodeDecodeError:
self._logger.exception(f"Error reading file: {file}")
continue
if not errors:
continue
@@ -845,8 +846,7 @@ def error_report_step(
)
continue
except Exception as exc: # pylint: disable=W0703
self._logger.error(f"Error reports raised exception: {exc}")
self._logger.exception(exc)
self._logger.exception("Error reports raised exception:")
dump_processing_errors(
fh.joinuri(self.processed_files_path, sub_info.submission_id),
"error_report",
3 changes: 3 additions & 0 deletions src/dve/pipeline/spark_pipeline.py
@@ -1,5 +1,6 @@
"""Spark implementation for `Pipeline` object."""

import logging
from concurrent.futures import Executor
from typing import Optional

@@ -32,6 +33,7 @@ def __init__(
reference_data_loader: Optional[type[BaseRefDataLoader]] = None,
spark: Optional[SparkSession] = None,
job_run_id: Optional[int] = None,
logger: Optional[logging.Logger] = None,
):
self._spark = spark if spark else SparkSession.builder.getOrCreate()
super().__init__(
@@ -43,6 +45,7 @@ def __init__(
submitted_files_path,
reference_data_loader,
job_run_id,
logger,
)

# pylint: disable=arguments-differ
3 changes: 2 additions & 1 deletion src/dve/reporting/utils.py
@@ -61,7 +61,7 @@ def dump_processing_errors(
if not errors:
raise AttributeError("errors list not passed")

error_file: URI = fh.joinuri(working_folder, "errors", "processing_errors.json")
error_file: URI = fh.joinuri(working_folder, "processing_errors", "processing_errors.json")
processed = []

for error in errors:
@@ -71,6 +71,7 @@
"error_location": "processing",
"error_level": "integrity",
"error_message": error.error_message,
"error_traceback": error.messages,
}
)

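With the folder rename and the new field, each entry written to processing_errors/processing_errors.json now carries the formatted traceback alongside the message. A hedged sketch of one record; the traceback lines are placeholders and the exact content depends on the wrapped exception:

record = {
    "step_name": "file_transformation",
    "error_location": "processing",
    "error_level": "integrity",
    "error_message": "ReaderLacksEntityTypeSupport()",
    "error_traceback": [
        "Traceback (most recent call last):\n",
        "  ...\n",
    ],
}
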
38 changes: 37 additions & 1 deletion tests/test_pipeline/test_foundry_ddb_pipeline.py
@@ -8,7 +8,7 @@
import tempfile
from uuid import uuid4

import pytest
import polars as pl

from dve.core_engine.backends.implementations.duckdb.auditing import DDBAuditingManager
from dve.core_engine.backends.implementations.duckdb.reference_data import DuckDBRefDataLoader
@@ -116,6 +116,42 @@ def test_foundry_runner_error(planet_test_files, temp_ddb_conn):
output_loc, report_uri, audit_files = dve_pipeline.run_pipeline(sub_info)
assert not fh.get_resource_exists(report_uri)
assert not output_loc

perror_path = Path(
processing_folder,
sub_info.submission_id,
"processing_errors",
"processing_errors.json"
)
assert perror_path.exists()
perror_schema = {
"step_name": pl.Utf8(),
"error_location": pl.Utf8(),
"error_level": pl.Utf8(),
"error_message": pl.Utf8(),
"error_traceback": pl.List(pl.Utf8()),
}
expected_error_df = (
pl.DataFrame(
[
{
"step_name": "file_transformation",
"error_location": "processing",
"error_level": "integrity",
"error_message": "ReaderLacksEntityTypeSupport()",
"error_traceback": None,
},
],
perror_schema
)
.select(pl.col("step_name"), pl.col("error_location"), pl.col("error_message"))
)
actual_error_df = (
pl.read_json(perror_path, schema=perror_schema)
.select(pl.col("step_name"), pl.col("error_location"), pl.col("error_message"))
)
assert actual_error_df.equals(expected_error_df)

assert len(list(fh.iter_prefix(audit_files))) == 2


51 changes: 51 additions & 0 deletions tests/test_reporting/test_utils.py
@@ -0,0 +1,51 @@
"""test utility functions & objects in dve.reporting module"""

import tempfile
from pathlib import Path

import polars as pl

from dve.core_engine.exceptions import CriticalProcessingError
from dve.reporting.utils import dump_processing_errors

# pylint: disable=C0116


def test_dump_processing_errors():
perror_schema = {
"step_name": pl.Utf8(),
"error_location": pl.Utf8(),
"error_level": pl.Utf8(),
"error_message": pl.Utf8(),
"error_stacktrace": pl.List(pl.Utf8()),
}
with tempfile.TemporaryDirectory() as temp_dir:
dump_processing_errors(
temp_dir,
"test_step",
[CriticalProcessingError("test error message")]
)

output_path = Path(temp_dir, "processing_errors")

assert output_path.exists()
assert len(list(output_path.iterdir())) == 1

expected_df = pl.DataFrame(
[
{
"step_name": "test_step",
"error_location": "processing",
"error_level": "integrity",
"error_message": "test error message",
"error_stacktrace": None,
},
],
perror_schema
)
error_df = pl.read_json(
Path(output_path, "processing_errors.json")
)
cols_to_check = ["step_name", "error_location", "error_level", "error_message"]

assert error_df.select(pl.col(k) for k in cols_to_check).equals(expected_df.select(pl.col(k) for k in cols_to_check))