diff --git a/src/dve/core_engine/backends/base/rules.py b/src/dve/core_engine/backends/base/rules.py
index 043f826..ef147b6 100644
--- a/src/dve/core_engine/backends/base/rules.py
+++ b/src/dve/core_engine/backends/base/rules.py
@@ -360,6 +360,7 @@ def apply_sync_filters(
         messages: Messages = []
 
         for entity_name, filter_rules in filters_by_entity.items():
+            self.logger.info(f"Applying filters to {entity_name}")
             entity = entities[entity_name]
 
             filter_column_names: list[str] = []
@@ -367,6 +368,7 @@ def apply_sync_filters(
             modified_entities = {entity_name: entity}
 
             for rule in filter_rules:
+                self.logger.info(f"Applying filter {rule.reporting.code}")
                 if rule.reporting.emit == "record_failure":
                     column_name = f"filter_{uuid4().hex}"
                     filter_column_names.append(column_name)
@@ -411,7 +413,12 @@ def apply_sync_filters(
                 if not success:
                     return messages, False
 
+                self.logger.info(f"Filter {rule.reporting.code} found {len(temp_messages)} issues")
+
             if filter_column_names:
+                self.logger.info(
+                    f"Filtering records from entity {entity_name} for error code {rule.reporting.code}" # pylint: disable=line-too-long
+                )
                 success_condition = " AND ".join(
                     [f"({c_name} IS NOT NULL AND {c_name})" for c_name in filter_column_names]
                 )
@@ -456,6 +463,7 @@ def apply_rules(self, entities: Entities, rule_metadata: RuleMetadata) -> Messag
         altering the entities in-place.
         """
 
+        self.logger.info("Applying business rules")
         rules_and_locals: Iterable[tuple[Rule, TemplateVariables]]
         if rule_metadata.templating_strategy == "upfront":
             rules_and_locals = []
@@ -472,6 +480,8 @@ def apply_rules(self, entities: Entities, rule_metadata: RuleMetadata) -> Messag
             rules_and_locals = rule_metadata
 
         messages: Messages = []
+
+        self.logger.info("Applying pre-sync steps")
         for rule, local_variables in rules_and_locals:
             for step in rule.pre_sync_steps:
                 if rule_metadata.templating_strategy == "runtime":
@@ -498,6 +508,8 @@ def apply_rules(self, entities: Entities, rule_metadata: RuleMetadata) -> Messag
                     if not success:
                         return messages
 
+        self.logger.info("Applying post-sync steps")
+
         for rule, local_variables in rules_and_locals:
             for step in rule.post_sync_steps:
                 if rule_metadata.templating_strategy == "runtime":
diff --git a/src/dve/core_engine/backends/implementations/duckdb/contract.py b/src/dve/core_engine/backends/implementations/duckdb/contract.py
index 5113da5..51017a5 100644
--- a/src/dve/core_engine/backends/implementations/duckdb/contract.py
+++ b/src/dve/core_engine/backends/implementations/duckdb/contract.py
@@ -102,7 +102,6 @@ def apply_data_contract(
         self, entities: DuckDBEntities, contract_metadata: DataContractMetadata
     ) -> tuple[DuckDBEntities, Messages, StageSuccessful]:
         """Apply the data contract to the duckdb relations"""
-        self.logger.info("Applying data contracts")
         all_messages: Messages = []
         successful = True
 
@@ -131,6 +130,9 @@ def apply_data_contract(
             coerce_inferred_numpy_array_to_list(relation.df()).apply(
                 application_helper, axis=1
             ) # pandas uses eager evaluation so potential memory issue here?
+            self.logger.info(
+                f"Data contract found {len(application_helper.errors)} issues in {entity_name}"
+            )
             all_messages.extend(application_helper.errors)
 
             casting_statements = [
diff --git a/src/dve/core_engine/backends/implementations/spark/contract.py b/src/dve/core_engine/backends/implementations/spark/contract.py
index bbd2d5a..afbd85e 100644
--- a/src/dve/core_engine/backends/implementations/spark/contract.py
+++ b/src/dve/core_engine/backends/implementations/spark/contract.py
@@ -113,7 +113,10 @@ def apply_data_contract(
                 # .persist(storageLevel=StorageLevel.MEMORY_AND_DISK)
             )
             messages = validated.flatMap(lambda row: row[1]).filter(bool)
+            messages.cache()
+            self.logger.info(f"Data contract found {messages.count()} issues in {entity_name}")
             all_messages.extend(messages.collect())
+            messages.unpersist()
 
             try:
                 record_df = record_df.select(
diff --git a/src/dve/pipeline/duckdb_pipeline.py b/src/dve/pipeline/duckdb_pipeline.py
index f04e71c..87e927d 100644
--- a/src/dve/pipeline/duckdb_pipeline.py
+++ b/src/dve/pipeline/duckdb_pipeline.py
@@ -21,6 +21,7 @@ class DDBDVEPipeline(BaseDVEPipeline):
     """
     Modified Pipeline class for running a DVE Pipeline with Spark
     """
+    # pylint: disable=R0913
 
     def __init__(
         self,
diff --git a/src/dve/pipeline/pipeline.py b/src/dve/pipeline/pipeline.py
index 385687b..366b590 100644
--- a/src/dve/pipeline/pipeline.py
+++ b/src/dve/pipeline/pipeline.py
@@ -190,6 +190,7 @@ def write_file_to_parquet(
         errors = []
 
         for model_name, model in models.items():
+            self._logger.info(f"Transforming {model_name} to stringified parquet")
            reader: BaseFileReader = load_reader(dataset, model_name, ext)
             try:
                 if not entity_type:
@@ -230,6 +231,7 @@ def audit_received_file_step(
         self, pool: ThreadPoolExecutor, submitted_files: Iterable[tuple[FileURI, InfoURI]]
     ) -> tuple[list[SubmissionInfo], list[SubmissionInfo]]:
         """Set files as being received and mark them for file transformation"""
+        self._logger.info("Starting audit received file service")
         audit_received_futures: list[tuple[str, FileURI, Future]] = []
         for submission_file in submitted_files:
             data_uri, metadata_uri = submission_file
@@ -291,7 +293,7 @@ def file_transformation(
         """Transform a file from its original format into a 'stringified' parquet file"""
         if not self.processed_files_path:
             raise AttributeError("processed files path not provided")
-
+        self._logger.info(f"Applying file transformation to {submission_info.submission_id}")
         errors: list[FeedbackMessage] = []
         submission_status: SubmissionStatus = SubmissionStatus()
         submission_file_uri: URI = fh.joinuri(
@@ -326,6 +328,7 @@ def file_transformation_step(
         list[tuple[SubmissionInfo, SubmissionStatus]], list[tuple[SubmissionInfo, SubmissionStatus]]
     ]:
         """Step to transform files from their original format into parquet files"""
+        self._logger.info("Starting file transformation service")
         file_transform_futures: list[tuple[SubmissionInfo, Future]] = []
 
         for submission_info in submissions_to_process:
@@ -397,6 +400,7 @@ def apply_data_contract(
         self, submission_info: SubmissionInfo, submission_status: Optional[SubmissionStatus] = None
     ) -> tuple[SubmissionInfo, SubmissionStatus]:
         """Method for applying the data contract given a submission_info"""
+        self._logger.info(f"Applying data contract to {submission_info.submission_id}")
         if not submission_status:
             submission_status = self.get_submission_status(
                 "contract", submission_info.submission_id
@@ -450,6 +454,7 @@ def data_contract_step(
         list[tuple[SubmissionInfo, SubmissionStatus]], list[tuple[SubmissionInfo, SubmissionStatus]]
     ]:
         """Step to validate the types of an untyped (stringly typed) parquet file"""
+        self._logger.info("Starting data contract service")
         processed_files: list[tuple[SubmissionInfo, SubmissionStatus]] = []
         failed_processing: list[tuple[SubmissionInfo, SubmissionStatus]] = []
         dc_futures: list[tuple[SubmissionInfo, SubmissionStatus, Future]] = []
@@ -517,6 +522,7 @@ def apply_business_rules(
         """Apply the business rules to a given submission, the submission may have failed
         at the data_contract step so this should be passed in as a bool
         """
+        self._logger.info(f"Applying business rules to {submission_info.submission_id}")
         if not submission_status:
             submission_status = self.get_submission_status(
                 "business_rules", submission_info.submission_id
@@ -606,6 +612,7 @@ def business_rule_step(
         list[tuple[SubmissionInfo, SubmissionStatus]],
     ]:
         """Step to apply business rules (Step impl) to a typed parquet file"""
+        self._logger.info("Starting business rules service")
         future_files: list[tuple[SubmissionInfo, SubmissionStatus, Future]] = []
 
         for submission_info, submission_status in files:
@@ -747,7 +754,7 @@ def error_report(
         SubmissionInfo, SubmissionStatus, Optional[SubmissionStatisticsRecord], Optional[URI]
     ]:
         """Creates the error reports given a submission info and submission status"""
-
+        self._logger.info(f"Generating error report for {submission_info.submission_id}")
         if not submission_status:
             submission_status = self.get_submission_status(
                 "error_report", submission_info.submission_id
@@ -756,6 +763,7 @@ def error_report(
         if not self.processed_files_path:
             raise AttributeError("processed files path not provided")
 
+        self._logger.info("Reading error dataframes")
         errors_df, aggregates = self._get_error_dataframes(submission_info.submission_id)
 
         if not submission_status.number_of_records:
@@ -794,9 +802,11 @@ def error_report(
             "error_reports",
             f"{submission_info.file_name}_{submission_info.file_extension.strip('.')}.xlsx",
         )
+        self._logger.info("Writing error report")
         with fh.open_stream(report_uri, "wb") as stream:
             stream.write(er.ExcelFormat.convert_to_bytes(workbook))
 
+        self._logger.info("Publishing error aggregates")
         self._publish_error_aggregates(submission_info.submission_id, aggregates)
 
         return submission_info, submission_status, sub_stats, report_uri
@@ -812,6 +822,7 @@ def error_report_step(
         """Step to produce error reports takes processed files and files that failed
         file transformation
         """
+        self._logger.info("Starting error reports service")
         futures: list[tuple[SubmissionInfo, SubmissionStatus, Future]] = []
         reports: list[
             tuple[SubmissionInfo, SubmissionStatus, Union[None, SubmissionStatisticsRecord], URI]
diff --git a/src/dve/pipeline/spark_pipeline.py b/src/dve/pipeline/spark_pipeline.py
index 4b26163..71fdb32 100644
--- a/src/dve/pipeline/spark_pipeline.py
+++ b/src/dve/pipeline/spark_pipeline.py
@@ -23,6 +23,7 @@ class SparkDVEPipeline(BaseDVEPipeline):
     """
     Polymorphed Pipeline class for running a DVE Pipeline with Spark
     """
+    # pylint: disable=R0913
 
     def __init__(
         self,