From 17b96b3480b338b0967979f62856cac9b11735e4 Mon Sep 17 00:00:00 2001
From: stevenhsd <56357022+stevenhsd@users.noreply.github.com>
Date: Wed, 4 Feb 2026 20:37:55 +0000
Subject: [PATCH 1/4] style: Additional logging around file transformation,
 business rules and error reports

---
 src/dve/core_engine/backends/base/rules.py | 8 ++++++++
 src/dve/pipeline/pipeline.py               | 6 +++++-
 2 files changed, 13 insertions(+), 1 deletion(-)

diff --git a/src/dve/core_engine/backends/base/rules.py b/src/dve/core_engine/backends/base/rules.py
index 043f826..8853042 100644
--- a/src/dve/core_engine/backends/base/rules.py
+++ b/src/dve/core_engine/backends/base/rules.py
@@ -360,6 +360,7 @@ def apply_sync_filters(
         messages: Messages = []
 
         for entity_name, filter_rules in filters_by_entity.items():
+            self.logger.info(f"Applying filters to {entity_name}")
             entity = entities[entity_name]
 
             filter_column_names: list[str] = []
@@ -367,6 +368,7 @@ def apply_sync_filters(
             modified_entities = {entity_name: entity}
 
             for rule in filter_rules:
+                self.logger.info(f"Applying filter {rule.reporting.code}")
                 if rule.reporting.emit == "record_failure":
                     column_name = f"filter_{uuid4().hex}"
                     filter_column_names.append(column_name)
@@ -412,6 +414,7 @@ def apply_sync_filters(
                     return messages, False
 
             if filter_column_names:
+                self.logger.info(f"Filtering records where validation is record level")
                 success_condition = " AND ".join(
                     [f"({c_name} IS NOT NULL AND {c_name})" for c_name in filter_column_names]
                 )
@@ -456,6 +459,7 @@ def apply_rules(self, entities: Entities, rule_metadata: RuleMetadata) -> Messag
             altering the entities in-place.
 
         """
+        self.logger.info("Applying business rules")
         rules_and_locals: Iterable[tuple[Rule, TemplateVariables]]
         if rule_metadata.templating_strategy == "upfront":
             rules_and_locals = []
@@ -472,6 +476,8 @@ def apply_rules(self, entities: Entities, rule_metadata: RuleMetadata) -> Messag
             rules_and_locals = rule_metadata
 
         messages: Messages = []
+
+        self.logger.info("Applying pre-sync steps")
         for rule, local_variables in rules_and_locals:
             for step in rule.pre_sync_steps:
                 if rule_metadata.templating_strategy == "runtime":
@@ -498,6 +504,8 @@ def apply_rules(self, entities: Entities, rule_metadata: RuleMetadata) -> Messag
         if not success:
             return messages
 
+        self.logger.info("Applying post-sync steps")
+
         for rule, local_variables in rules_and_locals:
             for step in rule.post_sync_steps:
                 if rule_metadata.templating_strategy == "runtime":
diff --git a/src/dve/pipeline/pipeline.py b/src/dve/pipeline/pipeline.py
index 385687b..5317b9d 100644
--- a/src/dve/pipeline/pipeline.py
+++ b/src/dve/pipeline/pipeline.py
@@ -190,6 +190,7 @@ def write_file_to_parquet(
         errors = []
 
         for model_name, model in models.items():
+            self._logger.info(f"Transforming {model_name} to stringly typed parquet")
             reader: BaseFileReader = load_reader(dataset, model_name, ext)
             try:
                 if not entity_type:
@@ -747,7 +748,7 @@ def error_report(
         SubmissionInfo, SubmissionStatus, Optional[SubmissionStatisticsRecord], Optional[URI]
     ]:
         """Creates the error reports given a submission info and submission status"""
-
+        self._logger.info("Generating error report")
         if not submission_status:
             submission_status = self.get_submission_status(
                 "error_report", submission_info.submission_id
@@ -756,6 +757,7 @@ def error_report(
         if not self.processed_files_path:
             raise AttributeError("processed files path not provided")
 
+        self._logger.info("Reading error dataframes")
         errors_df, aggregates = self._get_error_dataframes(submission_info.submission_id)
 
         if not submission_status.number_of_records:
@@ -794,9 +796,11 @@ def error_report(
             "error_reports",
             f"{submission_info.file_name}_{submission_info.file_extension.strip('.')}.xlsx",
         )
+        self._logger.info("Writing error report")
         with fh.open_stream(report_uri, "wb") as stream:
             stream.write(er.ExcelFormat.convert_to_bytes(workbook))
 
+        self._logger.info("Publishing error aggregates")
         self._publish_error_aggregates(submission_info.submission_id, aggregates)
 
         return submission_info, submission_status, sub_stats, report_uri

From 98cea40c9fb1955dbc0c90eea64060860351ff17 Mon Sep 17 00:00:00 2001
From: stevenhsd <56357022+stevenhsd@users.noreply.github.com>
Date: Wed, 4 Feb 2026 20:47:42 +0000
Subject: [PATCH 2/4] style: remove unnecessary f string

---
 src/dve/core_engine/backends/base/rules.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/dve/core_engine/backends/base/rules.py b/src/dve/core_engine/backends/base/rules.py
index 8853042..f893e4b 100644
--- a/src/dve/core_engine/backends/base/rules.py
+++ b/src/dve/core_engine/backends/base/rules.py
@@ -414,7 +414,7 @@ def apply_sync_filters(
                     return messages, False
 
             if filter_column_names:
-                self.logger.info(f"Filtering records where validation is record level")
+                self.logger.info("Filtering records where validation is record level")
                 success_condition = " AND ".join(
                     [f"({c_name} IS NOT NULL AND {c_name})" for c_name in filter_column_names]
                 )

From fff45e945ef138501278bde0dccffab3361dfe7b Mon Sep 17 00:00:00 2001
From: stevenhsd <56357022+stevenhsd@users.noreply.github.com>
Date: Thu, 5 Feb 2026 11:54:35 +0000
Subject: [PATCH 3/4] refactor: further logging details

---
 src/dve/core_engine/backends/base/rules.py       | 10 +++++++---
 .../backends/implementations/duckdb/contract.py  |  4 +++-
 .../backends/implementations/spark/contract.py   |  3 +++
 src/dve/pipeline/duckdb_pipeline.py              |  1 +
 src/dve/pipeline/pipeline.py                     | 11 +++++++++--
 src/dve/pipeline/spark_pipeline.py               |  1 +
 6 files changed, 24 insertions(+), 6 deletions(-)

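Note on the cache()/count()/unpersist() sequence added to spark/contract.py
below: the new log line calls messages.count() and the existing code then calls
messages.collect(), which are two separate Spark actions, so caching the RDD in
between presumably avoids recomputing the validation lineage twice. What follows
is only a minimal, self-contained sketch of that pattern, assuming a local
PySpark session; the names (demo_spark, raw_rows, validate) are illustrative and
are not part of this change.

    from pyspark.sql import SparkSession

    demo_spark = SparkSession.builder.master("local[1]").appName("cache-demo").getOrCreate()

    def validate(row):
        # Stand-in for per-row validation that yields zero or more messages.
        return [f"issue in row {row}"] if row % 2 else []

    raw_rows = demo_spark.sparkContext.parallelize(range(10))
    messages = raw_rows.flatMap(validate).filter(bool)

    messages.cache()                 # keep the computed partitions around
    issue_count = messages.count()   # first action, e.g. for a log message
    collected = messages.collect()   # second action reuses the cached data
    messages.unpersist()             # release the cache once both are done

    print(f"found {issue_count} issues: {collected}")
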
diff --git a/src/dve/core_engine/backends/base/rules.py b/src/dve/core_engine/backends/base/rules.py
index f893e4b..ef147b6 100644
--- a/src/dve/core_engine/backends/base/rules.py
+++ b/src/dve/core_engine/backends/base/rules.py
@@ -413,8 +413,12 @@ def apply_sync_filters(
                 if not success:
                     return messages, False
 
+                self.logger.info(f"Filter {rule.reporting.code} found {len(temp_messages)} issues")
+
             if filter_column_names:
-                self.logger.info("Filtering records where validation is record level")
+                self.logger.info(
+                    f"Filtering records from entity {entity_name} for error code {rule.reporting.code}"  # pylint: disable=line-too-long
+                )
                 success_condition = " AND ".join(
                     [f"({c_name} IS NOT NULL AND {c_name})" for c_name in filter_column_names]
                 )
@@ -476,7 +480,7 @@ def apply_rules(self, entities: Entities, rule_metadata: RuleMetadata) -> Messag
             rules_and_locals = rule_metadata
 
         messages: Messages = []
-
+
         self.logger.info("Applying pre-sync steps")
         for rule, local_variables in rules_and_locals:
             for step in rule.pre_sync_steps:
@@ -505,7 +509,7 @@ def apply_rules(self, entities: Entities, rule_metadata: RuleMetadata) -> Messag
             return messages
 
         self.logger.info("Applying post-sync steps")
-
+
         for rule, local_variables in rules_and_locals:
             for step in rule.post_sync_steps:
                 if rule_metadata.templating_strategy == "runtime":
diff --git a/src/dve/core_engine/backends/implementations/duckdb/contract.py b/src/dve/core_engine/backends/implementations/duckdb/contract.py
index 5113da5..51017a5 100644
--- a/src/dve/core_engine/backends/implementations/duckdb/contract.py
+++ b/src/dve/core_engine/backends/implementations/duckdb/contract.py
@@ -102,7 +102,6 @@ def apply_data_contract(
         self, entities: DuckDBEntities, contract_metadata: DataContractMetadata
     ) -> tuple[DuckDBEntities, Messages, StageSuccessful]:
         """Apply the data contract to the duckdb relations"""
-        self.logger.info("Applying data contracts")
         all_messages: Messages = []
         successful = True
 
@@ -131,6 +130,9 @@ def apply_data_contract(
             coerce_inferred_numpy_array_to_list(relation.df()).apply(
                 application_helper, axis=1
             )  # pandas uses eager evaluation so potential memory issue here?
+            self.logger.info(
+                f"Data contract found {len(application_helper.errors)} issues in {entity_name}"
+            )
             all_messages.extend(application_helper.errors)
 
             casting_statements = [
diff --git a/src/dve/core_engine/backends/implementations/spark/contract.py b/src/dve/core_engine/backends/implementations/spark/contract.py
index bbd2d5a..afbd85e 100644
--- a/src/dve/core_engine/backends/implementations/spark/contract.py
+++ b/src/dve/core_engine/backends/implementations/spark/contract.py
@@ -113,7 +113,10 @@ def apply_data_contract(
                 # .persist(storageLevel=StorageLevel.MEMORY_AND_DISK)
             )
             messages = validated.flatMap(lambda row: row[1]).filter(bool)
+            messages.cache()
+            self.logger.info(f"Data contract found {messages.count()} issues in {entity_name}")
             all_messages.extend(messages.collect())
+            messages.unpersist()
 
             try:
                 record_df = record_df.select(
diff --git a/src/dve/pipeline/duckdb_pipeline.py b/src/dve/pipeline/duckdb_pipeline.py
index f04e71c..87e927d 100644
--- a/src/dve/pipeline/duckdb_pipeline.py
+++ b/src/dve/pipeline/duckdb_pipeline.py
@@ -21,6 +21,7 @@ class DDBDVEPipeline(BaseDVEPipeline):
     """
     Modified Pipeline class for running a DVE Pipeline with Spark
     """
+
     # pylint: disable=R0913
     def __init__(
         self,
diff --git a/src/dve/pipeline/pipeline.py b/src/dve/pipeline/pipeline.py
index 5317b9d..b40d4ba 100644
--- a/src/dve/pipeline/pipeline.py
+++ b/src/dve/pipeline/pipeline.py
@@ -231,6 +231,7 @@ def audit_received_file_step(
         self, pool: ThreadPoolExecutor, submitted_files: Iterable[tuple[FileURI, InfoURI]]
     ) -> tuple[list[SubmissionInfo], list[SubmissionInfo]]:
         """Set files as being received and mark them for file transformation"""
+        self._logger.info("Starting audit received file service")
         audit_received_futures: list[tuple[str, FileURI, Future]] = []
         for submission_file in submitted_files:
             data_uri, metadata_uri = submission_file
@@ -292,7 +293,7 @@ def file_transformation(
         """Transform a file from its original format into a 'stringified' parquet file"""
         if not self.processed_files_path:
             raise AttributeError("processed files path not provided")
-
+        self._logger.info(f"Applying file transformation to {submission_info.submission_id}")
         errors: list[FeedbackMessage] = []
         submission_status: SubmissionStatus = SubmissionStatus()
         submission_file_uri: URI = fh.joinuri(
@@ -327,6 +328,7 @@ def file_transformation_step(
         list[tuple[SubmissionInfo, SubmissionStatus]], list[tuple[SubmissionInfo, SubmissionStatus]]
     ]:
         """Step to transform files from their original format into parquet files"""
+        self._logger.info("Starting file transformation service")
         file_transform_futures: list[tuple[SubmissionInfo, Future]] = []
 
         for submission_info in submissions_to_process:
@@ -398,6 +400,7 @@ def apply_data_contract(
         self, submission_info: SubmissionInfo, submission_status: Optional[SubmissionStatus] = None
    ) -> tuple[SubmissionInfo, SubmissionStatus]:
         """Method for applying the data contract given a submission_info"""
+        self._logger.info(f"Applying data contract to {submission_info.submission_id}")
         if not submission_status:
             submission_status = self.get_submission_status(
                 "contract", submission_info.submission_id
@@ -451,6 +454,7 @@ def data_contract_step(
         list[tuple[SubmissionInfo, SubmissionStatus]], list[tuple[SubmissionInfo, SubmissionStatus]]
     ]:
         """Step to validate the types of an untyped (stringly typed) parquet file"""
+        self._logger.info("Starting data contract service")
         processed_files: list[tuple[SubmissionInfo, SubmissionStatus]] = []
         failed_processing: list[tuple[SubmissionInfo, SubmissionStatus]] = []
         dc_futures: list[tuple[SubmissionInfo, SubmissionStatus, Future]] = []
@@ -518,6 +522,7 @@ def apply_business_rules(
         """Apply the business rules to a given submission, the submission may have failed
         at the data_contract step so this should be passed in as a bool
         """
+        self._logger.info(f"Applying business rules to {submission_info.submission_id}")
         if not submission_status:
             submission_status = self.get_submission_status(
                 "business_rules", submission_info.submission_id
@@ -607,6 +612,7 @@ def business_rule_step(
         list[tuple[SubmissionInfo, SubmissionStatus]],
     ]:
         """Step to apply business rules (Step impl) to a typed parquet file"""
+        self._logger.info("Starting business rules service")
         future_files: list[tuple[SubmissionInfo, SubmissionStatus, Future]] = []
 
         for submission_info, submission_status in files:
@@ -748,7 +754,7 @@ def error_report(
         SubmissionInfo, SubmissionStatus, Optional[SubmissionStatisticsRecord], Optional[URI]
     ]:
         """Creates the error reports given a submission info and submission status"""
-        self._logger.info("Generating error report")
+        self._logger.info(f"Generating error report for {submission_info.submission_id}")
         if not submission_status:
             submission_status = self.get_submission_status(
                 "error_report", submission_info.submission_id
@@ -816,6 +822,7 @@ def error_report_step(
         """Step to produce error reports takes processed files and files that failed
         file transformation
         """
+        self._logger.info("Starting error reports service")
         futures: list[tuple[SubmissionInfo, SubmissionStatus, Future]] = []
         reports: list[
             tuple[SubmissionInfo, SubmissionStatus, Union[None, SubmissionStatisticsRecord], URI]
diff --git a/src/dve/pipeline/spark_pipeline.py b/src/dve/pipeline/spark_pipeline.py
index 4b26163..71fdb32 100644
--- a/src/dve/pipeline/spark_pipeline.py
+++ b/src/dve/pipeline/spark_pipeline.py
@@ -23,6 +23,7 @@ class SparkDVEPipeline(BaseDVEPipeline):
     """
     Polymorphed Pipeline class for running a DVE Pipeline with Spark
     """
+
     # pylint: disable=R0913
     def __init__(
         self,

From 05649ca3675e4cb28560172ad7019ae28ef9af67 Mon Sep 17 00:00:00 2001
From: stevenhsd <56357022+stevenhsd@users.noreply.github.com>
Date: Fri, 6 Feb 2026 12:15:54 +0000
Subject: [PATCH 4/4] refactor: tweaked some logging messages following review

---
 src/dve/pipeline/pipeline.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/dve/pipeline/pipeline.py b/src/dve/pipeline/pipeline.py
index b40d4ba..366b590 100644
--- a/src/dve/pipeline/pipeline.py
+++ b/src/dve/pipeline/pipeline.py
@@ -190,7 +190,7 @@ def write_file_to_parquet(
         errors = []
 
         for model_name, model in models.items():
-            self._logger.info(f"Transforming {model_name} to stringly typed parquet")
+            self._logger.info(f"Transforming {model_name} to stringified parquet")
             reader: BaseFileReader = load_reader(dataset, model_name, ext)
             try:
                 if not entity_type: