4 changes: 2 additions & 2 deletions docs/README.md
@@ -165,8 +165,8 @@ for entity in data_contract_config.schemas:

# Data contract step here
data_contract = SparkDataContract(spark_session=spark)
entities, validation_messages, success = data_contract.apply_data_contract(
    entities, data_contract_config
entities, feedback_errors_uri, success = data_contract.apply_data_contract(
    entities, None, data_contract_config
)
```
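A minimal sketch of consuming the new return values. It assumes `feedback_errors_uri` points at the JSON Lines errors file written during the step (rather than its parent directory) and that `load_feedback_messages` from `dve.common.error_utils`, added in this change, is importable by the caller:

```python
from dve.common.error_utils import load_feedback_messages

if not success:
    # Each line of the .jsonl file deserialises back into a UserMessage.
    for message in load_feedback_messages(feedback_errors_uri):
        print(message)
```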

Empty file added src/dve/common/__init__.py
Empty file.
187 changes: 187 additions & 0 deletions src/dve/common/error_utils.py
@@ -0,0 +1,187 @@
"""Utilities to support reporting"""

import datetime as dt
import json
import logging
from collections.abc import Iterable
from itertools import chain
from multiprocessing import Queue
from threading import Thread
from typing import Optional, Union

import dve.parser.file_handling as fh
from dve.core_engine.exceptions import CriticalProcessingError
from dve.core_engine.loggers import get_logger
from dve.core_engine.message import UserMessage
from dve.core_engine.type_hints import URI, DVEStage, Messages


def get_feedback_errors_uri(working_folder: URI, step_name: DVEStage) -> URI:
"""Determine the location of json lines file containing all errors generated in a step"""
return fh.joinuri(working_folder, "errors", f"{step_name}_errors.jsonl")


def get_processing_errors_uri(working_folder: URI) -> URI:
"""Determine the location of json lines file containing all processing
errors generated from DVE run"""
return fh.joinuri(working_folder, "errors", "processing_errors", "processing_errors.jsonl")
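For illustration, assuming `fh.joinuri` joins its components with `/` separators, the two helpers above resolve to paths like these (the working folder is a hypothetical value and `"data_contract"` stands in for a real `DVEStage` value):

```python
from dve.common.error_utils import get_feedback_errors_uri, get_processing_errors_uri

working = "s3://my-bucket/submissions/run-42"  # hypothetical working folder

get_feedback_errors_uri(working, "data_contract")
# -> "s3://my-bucket/submissions/run-42/errors/data_contract_errors.jsonl"

get_processing_errors_uri(working)
# -> "s3://my-bucket/submissions/run-42/errors/processing_errors/processing_errors.jsonl"
```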


def dump_feedback_errors(
    working_folder: URI,
    step_name: DVEStage,
    messages: Messages,
    key_fields: Optional[dict[str, list[str]]] = None,
) -> URI:
    """Write out captured feedback error messages."""
    if not working_folder:
        raise AttributeError("processed files path not passed")

    if not key_fields:
        key_fields = {}

    error_file = get_feedback_errors_uri(working_folder, step_name)
    processed = []

    for message in messages:
        if message.original_entity is not None:
            primary_keys = key_fields.get(message.original_entity, [])
        elif message.entity is not None:
            primary_keys = key_fields.get(message.entity, [])
        else:
            primary_keys = []

        error = message.to_dict(
            key_field=primary_keys,
            value_separator=" -- ",
            max_number_of_values=10,
            record_converter=None,
        )
        error["Key"] = conditional_cast(error["Key"], primary_keys, value_separator=" -- ")
        processed.append(error)

    with fh.open_stream(error_file, "a") as f:
        f.write("\n".join([json.dumps(rec, default=str) for rec in processed]) + "\n")
    return error_file
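A hedged usage sketch: the working folder, stage name and `key_fields` mapping below are illustrative values, and `messages` is assumed to be a list of `UserMessage` instances collected earlier in the stage:

```python
error_file = dump_feedback_errors(
    working_folder="s3://my-bucket/submissions/run-42",  # hypothetical path
    step_name="data_contract",                           # hypothetical DVEStage value
    messages=messages,                                   # UserMessage instances from the stage
    key_fields={"patients": ["patient_id"]},             # hypothetical entity -> key mapping
)
```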


def dump_processing_errors(
    working_folder: URI, step_name: str, errors: list[CriticalProcessingError]
):
    """Write out critical processing errors"""
    if not working_folder:
        raise AttributeError("processed files path not passed")
    if not step_name:
        raise AttributeError("step name not passed")
    if not errors:
        raise AttributeError("errors list not passed")

    # Write to the location returned by get_processing_errors_uri so that
    # downstream loaders can find the file again.
    error_file: URI = get_processing_errors_uri(working_folder)
    processed = []

    for error in errors:
        processed.append(
            {
                "step_name": step_name,
                "error_location": "processing",
                "error_level": "integrity",
                "error_message": error.error_message,
                "error_traceback": error.messages,
            }
        )

    with fh.open_stream(error_file, "a") as f:
        f.write("\n".join([json.dumps(rec, default=str) for rec in processed]) + "\n")

    return error_file


def load_feedback_messages(feedback_messages_uri: URI) -> Iterable[UserMessage]:
"""Load user messages from jsonl file"""
if not fh.get_resource_exists(feedback_messages_uri):
return
with fh.open_stream(feedback_messages_uri) as errs:
yield from (UserMessage(**json.loads(err)) for err in errs.readlines())


def load_all_error_messages(error_directory_uri: URI) -> Iterable[UserMessage]:
"Load user messages from all jsonl files"
return chain.from_iterable(
[
load_feedback_messages(err_file)
for err_file, _ in fh.iter_prefix(error_directory_uri)
if err_file.endswith(".jsonl")
]
)
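Reading the messages back, assuming every stage wrote its `.jsonl` file under `<working_folder>/errors` via `get_feedback_errors_uri` (the path below is illustrative):

```python
import dve.parser.file_handling as fh
from dve.common.error_utils import load_all_error_messages

errors_dir = fh.joinuri("s3://my-bucket/submissions/run-42", "errors")  # hypothetical path
for message in load_all_error_messages(errors_dir):
    ...  # e.g. aggregate or report each UserMessage
```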


class BackgroundMessageWriter:
"""Controls batch writes to error jsonl files"""

def __init__(
self,
working_directory: URI,
dve_stage: DVEStage,
key_fields: Optional[dict[str, list[str]]] = None,
logger: Optional[logging.Logger] = None,
):
self._working_directory = working_directory
self._dve_stage = dve_stage
self._feedback_message_uri = get_feedback_errors_uri(
self._working_directory, self._dve_stage
)
self._key_fields = key_fields
self.logger = logger or get_logger(type(self).__name__)
self._write_thread: Optional[Thread] = None
self._queue: Queue = Queue()

@property
def write_queue(self) -> Queue: # type: ignore
"""Queue for storing batches of messages to be written"""
return self._queue

@property
def write_thread(self) -> Thread: # type: ignore
"""Thread to write batches of messages to jsonl file"""
if not self._write_thread:
self._write_thread = Thread(target=self._write_process_wrapper)
return self._write_thread

def _write_process_wrapper(self):
"""Wrapper for dump feedback errors to run in background process"""
while True:
if msgs := self.write_queue.get():
dump_feedback_errors(
self._working_directory, self._dve_stage, msgs, self._key_fields
)
else:
break

def __enter__(self) -> "BackgroundMessageWriter":
self.write_thread.start()
return self

def __exit__(self, exc_type, exc_value, traceback):
if exc_type:
self.logger.exception(
"Issue occured during background write process:",
exc_info=(exc_type, exc_value, traceback),
)
self.write_queue.put(None)
self.write_thread.join()
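A sketch of how the writer is intended to be driven, assuming `batches` is an iterable of `UserMessage` lists produced during validation; the paths and names below are illustrative:

```python
writer = BackgroundMessageWriter(
    working_directory="s3://my-bucket/submissions/run-42",  # hypothetical path
    dve_stage="business_rules",                             # hypothetical DVEStage value
    key_fields={"patients": ["patient_id"]},                # hypothetical mapping
)
with writer:
    for batch in batches:  # batches of UserMessage instances built elsewhere
        writer.write_queue.put(batch)
# On exit, __exit__ enqueues a None sentinel so the writer thread drains and stops.
```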


def conditional_cast(value, primary_keys: list[str], value_separator: str) -> Union[list[str], str]:
"""Determines what to do with a value coming back from the error list"""
if isinstance(value, list):
casts = [
conditional_cast(val, primary_keys, value_separator) for val in value
] # type: ignore
return value_separator.join(
[f"{pk}: {id}" if pk else "" for pk, id in zip(primary_keys, casts)]
)
if isinstance(value, dt.date):
return value.isoformat()
if isinstance(value, dict):
return ""
return str(value)
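Illustrative calls showing how each branch of `conditional_cast` behaves:

```python
import datetime as dt

conditional_cast(dt.date(2024, 1, 31), [], " -- ")              # -> "2024-01-31"
conditional_cast({"nested": "value"}, [], " -- ")                # -> ""
conditional_cast(42, [], " -- ")                                 # -> "42"
conditional_cast(["123", "A"], ["patient_id", "ward"], " -- ")   # -> "patient_id: 123 -- ward: A"
```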
57 changes: 30 additions & 27 deletions src/dve/core_engine/backends/base/backend.py
@@ -17,13 +17,8 @@
from dve.core_engine.backends.types import Entities, EntityType, StageSuccessful
from dve.core_engine.loggers import get_logger
from dve.core_engine.models import SubmissionInfo
from dve.core_engine.type_hints import (
    URI,
    EntityLocations,
    EntityName,
    EntityParquetLocations,
    Messages,
)
from dve.core_engine.type_hints import URI, EntityLocations, EntityName, EntityParquetLocations
from dve.parser.file_handling.service import get_parent, joinuri


class BaseBackend(Generic[EntityType], ABC):
@@ -148,65 +143,71 @@ def convert_entities_to_spark(

    def apply(
        self,
        working_dir: URI,
        entity_locations: EntityLocations,
        contract_metadata: DataContractMetadata,
        rule_metadata: RuleMetadata,
        submission_info: Optional[SubmissionInfo] = None,
    ) -> tuple[Entities, Messages, StageSuccessful]:
    ) -> tuple[Entities, URI, StageSuccessful]:
        """Apply the data contract and the rules, returning the entities, the URI of
        the directory containing the generated error files, and whether the stage
        succeeded.
        """
        reference_data = self.load_reference_data(
            rule_metadata.reference_data_config, submission_info
        )
        entities, messages, successful = self.contract.apply(entity_locations, contract_metadata)
        entities, dc_feedback_errors_uri, successful, processing_errors_uri = self.contract.apply(
            working_dir, entity_locations, contract_metadata
        )
        if not successful:
            return entities, messages, successful
            return entities, get_parent(processing_errors_uri), successful

        for entity_name, entity in entities.items():
            entities[entity_name] = self.step_implementations.add_row_id(entity)

        # TODO: Handle entity manager creation errors.
        entity_manager = EntityManager(entities, reference_data)
        # TODO: Add stage success to 'apply_rules'
        rule_messages = self.step_implementations.apply_rules(entity_manager, rule_metadata)
        messages.extend(rule_messages)
        # TODO: When business rules produce a large volume of errors, write the
        # TODO: messages to a jsonl file and return the URI of that file.
        _ = self.step_implementations.apply_rules(working_dir, entity_manager, rule_metadata)

        for entity_name, entity in entity_manager.entities.items():
            entity_manager.entities[entity_name] = self.step_implementations.drop_row_id(entity)

        return entity_manager.entities, messages, True
        return entity_manager.entities, get_parent(dc_feedback_errors_uri), True

    def process(
        self,
        working_dir: URI,
        entity_locations: EntityLocations,
        contract_metadata: DataContractMetadata,
        rule_metadata: RuleMetadata,
        cache_prefix: URI,
        submission_info: Optional[SubmissionInfo] = None,
    ) -> tuple[MutableMapping[EntityName, URI], Messages]:
    ) -> tuple[MutableMapping[EntityName, URI], URI]:
        """Apply the data contract and the rules, write the entities out to parquet,
        and return the entity locations and the URI of the errors directory.

        """
        entities, messages, successful = self.apply(
            entity_locations, contract_metadata, rule_metadata, submission_info
        entities, feedback_errors_uri, successful = self.apply(
            working_dir, entity_locations, contract_metadata, rule_metadata, submission_info
        )
        if successful:
            parquet_locations = self.write_entities_to_parquet(entities, cache_prefix)
            parquet_locations = self.write_entities_to_parquet(
                entities, joinuri(working_dir, "outputs")
            )
        else:
            parquet_locations = {}
        return parquet_locations, messages
        return parquet_locations, get_parent(feedback_errors_uri)

    def process_legacy(
        self,
        working_dir: URI,
        entity_locations: EntityLocations,
        contract_metadata: DataContractMetadata,
        rule_metadata: RuleMetadata,
        cache_prefix: URI,
        submission_info: Optional[SubmissionInfo] = None,
    ) -> tuple[MutableMapping[EntityName, DataFrame], Messages]:
    ) -> tuple[MutableMapping[EntityName, DataFrame], URI]:
        """Apply the data contract and the rules, create Spark `DataFrame`s from the
        entities, and return the Spark entities and the URI of the errors directory.

@@ -221,17 +222,19 @@ def process_legacy(
            category=DeprecationWarning,
        )

        entities, messages, successful = self.apply(
            entity_locations, contract_metadata, rule_metadata, submission_info
        entities, errors_uri, successful = self.apply(
            working_dir, entity_locations, contract_metadata, rule_metadata, submission_info
        )

        if not successful:
            return {}, messages
            return {}, errors_uri

        if self.__entity_type__ == DataFrame:
            return entities, messages  # type: ignore
            return entities, errors_uri  # type: ignore

        return (
            self.convert_entities_to_spark(entities, cache_prefix, _emit_deprecation_warning=False),
            messages,
            self.convert_entities_to_spark(
                entities, joinuri(working_dir, "outputs"), _emit_deprecation_warning=False
            ),
            errors_uri,
        )
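A hedged sketch of the updated calling convention for `apply`, assuming `backend` is an instance of a concrete `BaseBackend` subclass and that the entity locations and metadata objects have already been built; the working directory is an illustrative value:

```python
entities, errors_dir, successful = backend.apply(
    working_dir="s3://my-bucket/submissions/run-42",  # hypothetical run folder
    entity_locations=entity_locations,
    contract_metadata=contract_metadata,
    rule_metadata=rule_metadata,
)
if not successful:
    # errors_dir is the parent directory of the processing-errors file
    # written by the data contract step.
    ...
```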