12 changes: 12 additions & 0 deletions src/dve/core_engine/backends/base/rules.py
@@ -360,13 +360,15 @@ def apply_sync_filters(

messages: Messages = []
for entity_name, filter_rules in filters_by_entity.items():
self.logger.info(f"Applying filters to {entity_name}")
entity = entities[entity_name]

filter_column_names: list[str] = []
unmodified_entities = {entity_name: entity}
modified_entities = {entity_name: entity}

for rule in filter_rules:
self.logger.info(f"Applying filter {rule.reporting.code}")
if rule.reporting.emit == "record_failure":
column_name = f"filter_{uuid4().hex}"
filter_column_names.append(column_name)
@@ -411,7 +413,12 @@ def apply_sync_filters(
if not success:
return messages, False

self.logger.info(f"Filter {rule.reporting.code} found {len(temp_messages)} issues")

if filter_column_names:
self.logger.info(
f"Filtering records from entity {entity_name} for error code {rule.reporting.code}" # pylint: disable=line-too-long
)
success_condition = " AND ".join(
[f"({c_name} IS NOT NULL AND {c_name})" for c_name in filter_column_names]
)
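
Read with a concrete value, the join above produces a single SQL predicate over all generated filter columns; a minimal sketch with two hypothetical column names (real names are suffixed with uuid4().hex):

# Hypothetical filter columns; the real names are generated as f"filter_{uuid4().hex}".
filter_column_names = ["filter_a1b2", "filter_c3d4"]
success_condition = " AND ".join(
    f"({c_name} IS NOT NULL AND {c_name})" for c_name in filter_column_names
)
# -> "(filter_a1b2 IS NOT NULL AND filter_a1b2) AND (filter_c3d4 IS NOT NULL AND filter_c3d4)"
print(success_condition)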
@@ -456,6 +463,7 @@ def apply_rules(self, entities: Entities, rule_metadata: RuleMetadata) -> Messages:
altering the entities in-place.

"""
self.logger.info("Applying business rules")
rules_and_locals: Iterable[tuple[Rule, TemplateVariables]]
if rule_metadata.templating_strategy == "upfront":
rules_and_locals = []
@@ -472,6 +480,8 @@ def apply_rules(self, entities: Entities, rule_metadata: RuleMetadata) -> Messages:
rules_and_locals = rule_metadata

messages: Messages = []

self.logger.info("Applying pre-sync steps")
for rule, local_variables in rules_and_locals:
for step in rule.pre_sync_steps:
if rule_metadata.templating_strategy == "runtime":
@@ -498,6 +508,8 @@ def apply_rules(self, entities: Entities, rule_metadata: RuleMetadata) -> Messages:
if not success:
return messages

self.logger.info("Applying post-sync steps")

for rule, local_variables in rules_and_locals:
for step in rule.post_sync_steps:
if rule_metadata.templating_strategy == "runtime":
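The self.logger.info calls added throughout this diff only show up if the application configures logging; a minimal sketch using the standard library (the "dve" logger name is an assumption, not taken from this change):

import logging

# Assumed root configuration; the pipelines' own loggers propagate to it.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(name)s %(levelname)s %(message)s",
)
logging.getLogger("dve").info("Applying business rules")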
@@ -102,7 +102,6 @@ def apply_data_contract(
self, entities: DuckDBEntities, contract_metadata: DataContractMetadata
) -> tuple[DuckDBEntities, Messages, StageSuccessful]:
"""Apply the data contract to the duckdb relations"""
self.logger.info("Applying data contracts")
all_messages: Messages = []

successful = True
Expand Down Expand Up @@ -131,6 +130,9 @@ def apply_data_contract(
coerce_inferred_numpy_array_to_list(relation.df()).apply(
application_helper, axis=1
) # pandas uses eager evaluation so potential memory issue here?
self.logger.info(
f"Data contract found {len(application_helper.errors)} issues in {entity_name}"
)
all_messages.extend(application_helper.errors)

casting_statements = [
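The application_helper used with apply(axis=1) above is a callable that accumulates row-level contract errors as a side effect; a minimal sketch of that shape (ErrorCollector and the age check are illustrative, not the real data-contract logic):

import pandas as pd

class ErrorCollector:
    """Callable for DataFrame.apply(axis=1); collects one error message per bad row."""

    def __init__(self) -> None:
        self.errors: list[str] = []

    def __call__(self, row: pd.Series) -> None:
        # Illustrative rule; the real helper validates each row against the data contract.
        if not str(row["age"]).isdigit():
            self.errors.append(f"row {row.name}: age {row['age']!r} is not an integer")

application_helper = ErrorCollector()
df = pd.DataFrame({"age": ["34", "abc", "7"]})
df.apply(application_helper, axis=1)  # eager: every row is checked immediately
print(len(application_helper.errors))  # -> 1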
@@ -113,7 +113,10 @@ def apply_data_contract(
# .persist(storageLevel=StorageLevel.MEMORY_AND_DISK)
)
messages = validated.flatMap(lambda row: row[1]).filter(bool)
messages.cache()
self.logger.info(f"Data contract found {messages.count()} issues in {entity_name}")
all_messages.extend(messages.collect())
messages.unpersist()

try:
record_df = record_df.select(
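The cache()/count()/collect()/unpersist() sequence above evaluates the messages RDD once, so the count used for the log line and the later collect() do not each recompute the flatMap lineage; a standalone sketch of the same pattern (the SparkSession setup and sample rows are illustrative):

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").appName("cache-sketch").getOrCreate()

# Pretend each validated row carries its list of messages in position 1.
validated = spark.sparkContext.parallelize(
    [(1, ["bad date"]), (2, []), (3, ["missing id", None])]
)

messages = validated.flatMap(lambda row: row[1]).filter(bool)
messages.cache()                           # materialise the RDD once
print(f"found {messages.count()} issues")  # first action computes and caches
all_messages = messages.collect()          # served from the cached partitions
messages.unpersist()                       # free the cached blocks
spark.stop()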
1 change: 1 addition & 0 deletions src/dve/pipeline/duckdb_pipeline.py
@@ -21,6 +21,7 @@ class DDBDVEPipeline(BaseDVEPipeline):
"""
Modified Pipeline class for running a DVE Pipeline with DuckDB
"""

# pylint: disable=R0913
def __init__(
self,
15 changes: 13 additions & 2 deletions src/dve/pipeline/pipeline.py
@@ -190,6 +190,7 @@ def write_file_to_parquet(
errors = []

for model_name, model in models.items():
self._logger.info(f"Transforming {model_name} to stringified parquet")
reader: BaseFileReader = load_reader(dataset, model_name, ext)
try:
if not entity_type:
@@ -230,6 +231,7 @@ def audit_received_file_step(
self, pool: ThreadPoolExecutor, submitted_files: Iterable[tuple[FileURI, InfoURI]]
) -> tuple[list[SubmissionInfo], list[SubmissionInfo]]:
"""Set files as being received and mark them for file transformation"""
self._logger.info("Starting audit received file service")
audit_received_futures: list[tuple[str, FileURI, Future]] = []
for submission_file in submitted_files:
data_uri, metadata_uri = submission_file
@@ -291,7 +293,7 @@ def file_transformation(
"""Transform a file from its original format into a 'stringified' parquet file"""
if not self.processed_files_path:
raise AttributeError("processed files path not provided")

self._logger.info(f"Applying file transformation to {submission_info.submission_id}")
errors: list[FeedbackMessage] = []
submission_status: SubmissionStatus = SubmissionStatus()
submission_file_uri: URI = fh.joinuri(
@@ -326,6 +328,7 @@ def file_transformation_step(
list[tuple[SubmissionInfo, SubmissionStatus]], list[tuple[SubmissionInfo, SubmissionStatus]]
]:
"""Step to transform files from their original format into parquet files"""
self._logger.info("Starting file transformation service")
file_transform_futures: list[tuple[SubmissionInfo, Future]] = []

for submission_info in submissions_to_process:
@@ -397,6 +400,7 @@ def apply_data_contract(
self, submission_info: SubmissionInfo, submission_status: Optional[SubmissionStatus] = None
) -> tuple[SubmissionInfo, SubmissionStatus]:
"""Method for applying the data contract given a submission_info"""
self._logger.info(f"Applying data contract to {submission_info.submission_id}")
if not submission_status:
submission_status = self.get_submission_status(
"contract", submission_info.submission_id
@@ -450,6 +454,7 @@ def data_contract_step(
list[tuple[SubmissionInfo, SubmissionStatus]], list[tuple[SubmissionInfo, SubmissionStatus]]
]:
"""Step to validate the types of an untyped (stringly typed) parquet file"""
self._logger.info("Starting data contract service")
processed_files: list[tuple[SubmissionInfo, SubmissionStatus]] = []
failed_processing: list[tuple[SubmissionInfo, SubmissionStatus]] = []
dc_futures: list[tuple[SubmissionInfo, SubmissionStatus, Future]] = []
@@ -517,6 +522,7 @@ def apply_business_rules(
"""Apply the business rules to a given submission, the submission may have failed at the
data_contract step so this should be passed in as a bool
"""
self._logger.info(f"Applying business rules to {submission_info.submission_id}")
if not submission_status:
submission_status = self.get_submission_status(
"business_rules", submission_info.submission_id
@@ -606,6 +612,7 @@ def business_rule_step(
list[tuple[SubmissionInfo, SubmissionStatus]],
]:
"""Step to apply business rules (Step impl) to a typed parquet file"""
self._logger.info("Starting business rules service")
future_files: list[tuple[SubmissionInfo, SubmissionStatus, Future]] = []

for submission_info, submission_status in files:
@@ -747,7 +754,7 @@ def error_report(
SubmissionInfo, SubmissionStatus, Optional[SubmissionStatisticsRecord], Optional[URI]
]:
"""Creates the error reports given a submission info and submission status"""

self._logger.info(f"Generating error report for {submission_info.submission_id}")
if not submission_status:
submission_status = self.get_submission_status(
"error_report", submission_info.submission_id
@@ -756,6 +763,7 @@ def error_report(
if not self.processed_files_path:
raise AttributeError("processed files path not provided")

self._logger.info("Reading error dataframes")
errors_df, aggregates = self._get_error_dataframes(submission_info.submission_id)

if not submission_status.number_of_records:
@@ -794,9 +802,11 @@ def error_report(
"error_reports",
f"{submission_info.file_name}_{submission_info.file_extension.strip('.')}.xlsx",
)
self._logger.info("Writing error report")
with fh.open_stream(report_uri, "wb") as stream:
stream.write(er.ExcelFormat.convert_to_bytes(workbook))

self._logger.info("Publishing error aggregates")
self._publish_error_aggregates(submission_info.submission_id, aggregates)

return submission_info, submission_status, sub_stats, report_uri
@@ -812,6 +822,7 @@ def error_report_step(
"""Step to produce error reports
takes processed files and files that failed file transformation
"""
self._logger.info("Starting error reports service")
futures: list[tuple[SubmissionInfo, SubmissionStatus, Future]] = []
reports: list[
tuple[SubmissionInfo, SubmissionStatus, Union[None, SubmissionStatisticsRecord], URI]
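Each *_step method in pipeline.py follows the same submit-then-gather shape: queue one future per submission on the shared ThreadPoolExecutor, then split the results into succeeded and failed lists. A minimal sketch of that shape with stand-in types (process_one and the string submissions are illustrative, not the pipeline's real signatures):

from concurrent.futures import Future, ThreadPoolExecutor

def process_one(submission: str) -> tuple[str, bool]:
    # Stand-in for a per-submission stage such as apply_data_contract.
    return submission, not submission.startswith("bad")

def run_step(pool: ThreadPoolExecutor, submissions: list[str]) -> tuple[list[str], list[str]]:
    futures: list[tuple[str, Future]] = []
    for submission in submissions:
        futures.append((submission, pool.submit(process_one, submission)))

    processed: list[str] = []
    failed: list[str] = []
    for submission, future in futures:
        _, ok = future.result()  # re-raises any exception from the worker
        (processed if ok else failed).append(submission)
    return processed, failed

with ThreadPoolExecutor(max_workers=4) as pool:
    done, errored = run_step(pool, ["sub_1", "bad_sub_2"])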
1 change: 1 addition & 0 deletions src/dve/pipeline/spark_pipeline.py
@@ -23,6 +23,7 @@ class SparkDVEPipeline(BaseDVEPipeline):
"""
Polymorphed Pipeline class for running a DVE Pipeline with Spark
"""

# pylint: disable=R0913
def __init__(
self,