From 13ccf9e3ed4c786ec98dff93265109c0579cd4a1 Mon Sep 17 00:00:00 2001 From: raimundovidaljunior Date: Tue, 18 Mar 2025 17:19:54 +0100 Subject: [PATCH 1/3] Create soda runner --- dagger/conf.py | 10 ++ .../airflow/operator_creators/soda_creator.py | 59 ++++++++ .../dag_creator/airflow/operator_factory.py | 1 + .../airflow/operators/soda_batch.py | 5 + dagger/pipeline/task_factory.py | 1 + dagger/pipeline/tasks/soda_task.py | 143 ++++++++++++++++++ 6 files changed, 219 insertions(+) create mode 100644 dagger/dag_creator/airflow/operator_creators/soda_creator.py create mode 100644 dagger/dag_creator/airflow/operators/soda_batch.py create mode 100644 dagger/pipeline/tasks/soda_task.py diff --git a/dagger/conf.py b/dagger/conf.py index df2ab8e..c024e6b 100644 --- a/dagger/conf.py +++ b/dagger/conf.py @@ -110,3 +110,13 @@ REVERSE_ETL_DEFAULT_JOB_NAME = reverse_etl_config.get('default_job_name', None) REVERSE_ETL_DEFAULT_EXECUTABLE_PREFIX = reverse_etl_config.get('default_executable_prefix', None) REVERSE_ETL_DEFAULT_EXECUTABLE = reverse_etl_config.get('default_executable', None) + +# Soda parameters +SODA_DEFAULT_JOB_NAME = reverse_etl_config.get('default_job_name', None) +SODA_DEFAULT_EXECUTABLE_PREFIX = reverse_etl_config.get('default_executable_prefix', None) +SODA_DEFAULT_EXECUTABLE = reverse_etl_config.get('default_executable', None) +SODA_DEFAULT_PROJECT_DIR = reverse_etl_config.get('default_project_dir', None) +SODA_DEFAULT_PROFILES_DIR = reverse_etl_config.get('default_profiles_dir', None) +SODA_DEFAULT_PROFILE_NAME = reverse_etl_config.get('default_profile_name', None) +SODA_DEFAULT_OUTPUT_TABLE = reverse_etl_config.get('default_output_table', None) +SODA_DEFAULT_OUTPUT_S3_PATH = reverse_etl_config.get('default_output_s3_path', None) diff --git a/dagger/dag_creator/airflow/operator_creators/soda_creator.py b/dagger/dag_creator/airflow/operator_creators/soda_creator.py new file mode 100644 index 0000000..a0c593d --- /dev/null +++ b/dagger/dag_creator/airflow/operator_creators/soda_creator.py @@ -0,0 +1,59 @@ +import base64 + +from dagger.dag_creator.airflow.operator_creators.batch_creator import BatchCreator +from dagger.dag_creator.airflow.operators.soda_batch import SodaBatchOperator +import json + + +class SodaCreator(BatchCreator): + ref_name = "soda" + + def __init__(self, task, dag): + super().__init__(task, dag) + + self._absolute_job_name = task.absolute_job_name + self._project_dir = task.project_dir + self._profiles_dir = task.profiles_dir + self._profile_name = task.profile_name + self._target_name = task.target_name + self._table_name = task.table_name + self._model_name = task.model_name + self._output_s3_path = task.output_s3_path + self._output_table = task.output_table + self._vars = task.vars + + def _generate_command(self): + command = BatchCreator._generate_command(self) + + command.append(f"--project_dir={self._project_dir}") + command.append(f"--profiles_dir={self._profiles_dir}") + command.append(f"--profile_name={self._profile_name}") + command.append(f"--target_name={self._target_name}") + command.append(f"--output_s3_path={self._output_s3_path}") + command.append(f"--output_table={self._output_table}") + + if self._table_name: + command.append(f"--table_name={self._table_name}") + if self._model_name: + command.append(f"--model_name={self._model_name}") + if self._vars: + command.append(f"--vars={self._vars}") + return command + + def _create_operator(self, **kwargs): + overrides = self._task.overrides + overrides.update({"command": self._generate_command()}) + + job_name = self._validate_job_name(self._task.job_name, self._task.absolute_job_name) + batch_op = SodaBatchOperator( + dag=self._dag, + task_id=self._task.name, + job_name=self._task.name, + job_definition=job_name, + region_name=self._task.region_name, + job_queue=self._task.job_queue, + container_overrides=overrides, + awslogs_enabled=True, + **kwargs, + ) + return batch_op diff --git a/dagger/dag_creator/airflow/operator_factory.py b/dagger/dag_creator/airflow/operator_factory.py index f610f1e..2a1654a 100644 --- a/dagger/dag_creator/airflow/operator_factory.py +++ b/dagger/dag_creator/airflow/operator_factory.py @@ -13,6 +13,7 @@ reverse_etl_creator, spark_creator, sqoop_creator, + soda_creator, ) from dagger.dag_creator.airflow.utils.operator_factories import make_control_flow from dagger.utilities.classes import get_deep_obj_subclasses diff --git a/dagger/dag_creator/airflow/operators/soda_batch.py b/dagger/dag_creator/airflow/operators/soda_batch.py new file mode 100644 index 0000000..d67a26c --- /dev/null +++ b/dagger/dag_creator/airflow/operators/soda_batch.py @@ -0,0 +1,5 @@ +from dagger.dag_creator.airflow.operators.awsbatch_operator import AWSBatchOperator + +class SodaBatchOperator(AWSBatchOperator): + custom_operator_name = 'Soda' + ui_color = "#e4f0e7" diff --git a/dagger/pipeline/task_factory.py b/dagger/pipeline/task_factory.py index d8a1e53..9ed79e7 100644 --- a/dagger/pipeline/task_factory.py +++ b/dagger/pipeline/task_factory.py @@ -12,6 +12,7 @@ reverse_etl_task, spark_task, sqoop_task, + soda_task ) from dagger.utilities.classes import get_deep_obj_subclasses diff --git a/dagger/pipeline/tasks/soda_task.py b/dagger/pipeline/tasks/soda_task.py new file mode 100644 index 0000000..25a90f7 --- /dev/null +++ b/dagger/pipeline/tasks/soda_task.py @@ -0,0 +1,143 @@ +from dagger.pipeline.tasks.batch_task import BatchTask +from dagger.utilities.config_validator import Attribute +from dagger import conf + +class SodaTask(BatchTask): + ref_name = "soda" + + @classmethod + def init_attributes(cls, orig_cls): + cls.add_config_attributes( + [ + Attribute( + attribute_name="executable_prefix", + required=False, + parent_fields=["task_parameters"], + comment="E.g.: python", + ), + Attribute( + attribute_name="executable", + required=False, + parent_fields=["task_parameters"], + comment="E.g.: my_code.py", + ), + Attribute( + attribute_name="project_dir", + parent_fields=["task_parameters"], + required = True, + validator=str, + comment="Directory containing the dbt_project.yml file", + ), + Attribute( + attribute_name="profiles_dir", + parent_fields=["task_parameters"], + required=True, + comment="Directory containing the profiles.yml file", + ), + Attribute( + attribute_name="profile_name", + parent_fields=["task_parameters"], + required=True, + comment="Profile name to load from the profiles.yml file.", + ), + Attribute( + attribute_name="target_name", + parent_fields=["task_parameters"], + validator=str, + required=True, + comment="Target to load for the given profile. By default use 'ENV' environment variable.", + ), + Attribute( + attribute_name="table_name", + parent_fields=["task_parameters"], + validator=str, + required=False, + comment="Full table name in the format 'database.schema.table'", + ), + Attribute( + attribute_name="model_name", + parent_fields=["task_parameters"], + validator=str, + required=False, + comment="Name of dbt model to be scanned by soda", + ), + Attribute( + attribute_name="output_s3_path", + parent_fields=["task_parameters"], + validator=str, + required=True, + comment="S3 location to upload the scan results", + + ), + Attribute( + attribute_name="output_table", + parent_fields=["task_parameters"], + validator=str, + required=True, + comment="Athena table that will contain the scan results.", + ), + Attribute( + attribute_name="vars", + parent_fields=["task_parameters"], + validator=str, + required=False, + comment="Variables needed to run soda scan", + ) + + ] + ) + + def __init__(self, name, pipeline_name, pipeline, job_config): + super().__init__(name, pipeline_name, pipeline, job_config) + + self._executable = self.executable or conf.SODA_DEFAULT_EXECUTABLE + self._executable_prefix = self.executable_prefix or conf.SODA_DEFAULT_EXECUTABLE_PREFIX + self._absolute_job_name = self._absolute_job_name or conf.SODA_DEFAULT_JOB_NAME + self._project_dir = self.parse_attribute("project_dir") or conf.SODA_DEFAULT_PROJECT_DIR + self._profiles_dir = self.parse_attribute("profiles_dir") or conf.SODA_DEFAULT_PROFILES_DIR + self._profile_name = self.parse_attribute("profile_name") or conf.SODA_DEFAULT_PROFILE_NAME + self._output_table = self.parse_attribute("output_path") or conf.SODA_DEFAULT_OUTPUT_TABLE + self._output_s3_path = self.parse_attribute("output_s3_path") or conf.SODA_DEFAULT_OUTPUT_S3_PATH + + self._table_name = self.parse_attribute("table_name") + self._model_name = self.parse_attribute("model_name") + self._vars = self.parse_attribute("vars") + + + if self._table_name and self._model_name: + raise ValueError(f"SodaTask: {self._name} table_name and model_name are mutually exclusive") + + + + @property + def project_dir(self): + return self._project_dir + + @property + def profiles_dir(self): + return self._profiles_dir + + @property + def profile_name(self): + return self._profile_name + + @property + def output_table(self): + return self._output_table + + @property + def output_s3_path(self): + return self._output_s3_path + + @property + def table_name(self): + return self._table_name + + @property + def model_name(self): + return self._model_name + + @property + def vars(self): + return self._vars + From 2f7cc55c4731b7b0d0c1c66c9a812d789ba43cc9 Mon Sep 17 00:00:00 2001 From: raimundovidaljunior Date: Wed, 19 Mar 2025 14:40:41 +0100 Subject: [PATCH 2/3] Fixing configs --- dagger/conf.py | 17 +++++++++-------- dagger/dagger_config.yaml | 10 ++++++++++ dagger/pipeline/tasks/soda_task.py | 17 +++++++++++------ 3 files changed, 30 insertions(+), 14 deletions(-) diff --git a/dagger/conf.py b/dagger/conf.py index c024e6b..ae3d9ec 100644 --- a/dagger/conf.py +++ b/dagger/conf.py @@ -112,11 +112,12 @@ REVERSE_ETL_DEFAULT_EXECUTABLE = reverse_etl_config.get('default_executable', None) # Soda parameters -SODA_DEFAULT_JOB_NAME = reverse_etl_config.get('default_job_name', None) -SODA_DEFAULT_EXECUTABLE_PREFIX = reverse_etl_config.get('default_executable_prefix', None) -SODA_DEFAULT_EXECUTABLE = reverse_etl_config.get('default_executable', None) -SODA_DEFAULT_PROJECT_DIR = reverse_etl_config.get('default_project_dir', None) -SODA_DEFAULT_PROFILES_DIR = reverse_etl_config.get('default_profiles_dir', None) -SODA_DEFAULT_PROFILE_NAME = reverse_etl_config.get('default_profile_name', None) -SODA_DEFAULT_OUTPUT_TABLE = reverse_etl_config.get('default_output_table', None) -SODA_DEFAULT_OUTPUT_S3_PATH = reverse_etl_config.get('default_output_s3_path', None) +soda_config = config.get('soda', None) or {} +SODA_DEFAULT_JOB_NAME = soda_config.get('default_job_name', None) +SODA_DEFAULT_EXECUTABLE_PREFIX = soda_config.get('default_executable_prefix', None) +SODA_DEFAULT_EXECUTABLE = soda_config.get('default_executable', None) +SODA_DEFAULT_PROJECT_DIR = soda_config.get('default_project_dir', None) +SODA_DEFAULT_PROFILES_DIR = soda_config.get('default_profiles_dir', None) +SODA_DEFAULT_PROFILE_NAME = soda_config.get('default_profile_name', None) +SODA_DEFAULT_OUTPUT_TABLE = soda_config.get('default_output_table', None) +SODA_DEFAULT_OUTPUT_S3_PATH = soda_config.get('default_output_s3_path', None) diff --git a/dagger/dagger_config.yaml b/dagger/dagger_config.yaml index 38abccd..209f110 100644 --- a/dagger/dagger_config.yaml +++ b/dagger/dagger_config.yaml @@ -67,3 +67,13 @@ reverse_etl: # default_job_name: # default_executable_prefix: # default_executable: + +soda: +# default_job_name: +# default_executable_prefix: +# default_executable: +# default_project_dir: +# default_profiles_dir: +# default_profile_name: +# default_output_table: +# default_output_s3_path: \ No newline at end of file diff --git a/dagger/pipeline/tasks/soda_task.py b/dagger/pipeline/tasks/soda_task.py index 25a90f7..7aeebf9 100644 --- a/dagger/pipeline/tasks/soda_task.py +++ b/dagger/pipeline/tasks/soda_task.py @@ -24,20 +24,20 @@ def init_attributes(cls, orig_cls): Attribute( attribute_name="project_dir", parent_fields=["task_parameters"], - required = True, + required = False, validator=str, comment="Directory containing the dbt_project.yml file", ), Attribute( attribute_name="profiles_dir", parent_fields=["task_parameters"], - required=True, + required=False, comment="Directory containing the profiles.yml file", ), Attribute( attribute_name="profile_name", parent_fields=["task_parameters"], - required=True, + required=False, comment="Profile name to load from the profiles.yml file.", ), Attribute( @@ -65,7 +65,7 @@ def init_attributes(cls, orig_cls): attribute_name="output_s3_path", parent_fields=["task_parameters"], validator=str, - required=True, + required=False, comment="S3 location to upload the scan results", ), @@ -73,7 +73,7 @@ def init_attributes(cls, orig_cls): attribute_name="output_table", parent_fields=["task_parameters"], validator=str, - required=True, + required=False, comment="Athena table that will contain the scan results.", ), Attribute( @@ -96,8 +96,9 @@ def __init__(self, name, pipeline_name, pipeline, job_config): self._project_dir = self.parse_attribute("project_dir") or conf.SODA_DEFAULT_PROJECT_DIR self._profiles_dir = self.parse_attribute("profiles_dir") or conf.SODA_DEFAULT_PROFILES_DIR self._profile_name = self.parse_attribute("profile_name") or conf.SODA_DEFAULT_PROFILE_NAME - self._output_table = self.parse_attribute("output_path") or conf.SODA_DEFAULT_OUTPUT_TABLE + self._output_table = self.parse_attribute("output_table") or conf.SODA_DEFAULT_OUTPUT_TABLE self._output_s3_path = self.parse_attribute("output_s3_path") or conf.SODA_DEFAULT_OUTPUT_S3_PATH + self._target_name = self.parse_attribute("target_name") self._table_name = self.parse_attribute("table_name") self._model_name = self.parse_attribute("model_name") @@ -141,3 +142,7 @@ def model_name(self): def vars(self): return self._vars + @property + def target_name(self): + return self._target_name + From 47f4d83dc6533c7d61a72c87cc2be34f489d798d Mon Sep 17 00:00:00 2001 From: raimundovidaljunior Date: Mon, 24 Mar 2025 17:27:06 +0100 Subject: [PATCH 3/3] Remove unnecessary vars --- dagger/conf.py | 3 - .../airflow/operator_creators/soda_creator.py | 12 +--- dagger/dagger_config.yaml | 3 - dagger/pipeline/tasks/soda_task.py | 64 +------------------ 4 files changed, 3 insertions(+), 79 deletions(-) diff --git a/dagger/conf.py b/dagger/conf.py index ae3d9ec..b750d84 100644 --- a/dagger/conf.py +++ b/dagger/conf.py @@ -116,8 +116,5 @@ SODA_DEFAULT_JOB_NAME = soda_config.get('default_job_name', None) SODA_DEFAULT_EXECUTABLE_PREFIX = soda_config.get('default_executable_prefix', None) SODA_DEFAULT_EXECUTABLE = soda_config.get('default_executable', None) -SODA_DEFAULT_PROJECT_DIR = soda_config.get('default_project_dir', None) -SODA_DEFAULT_PROFILES_DIR = soda_config.get('default_profiles_dir', None) -SODA_DEFAULT_PROFILE_NAME = soda_config.get('default_profile_name', None) SODA_DEFAULT_OUTPUT_TABLE = soda_config.get('default_output_table', None) SODA_DEFAULT_OUTPUT_S3_PATH = soda_config.get('default_output_s3_path', None) diff --git a/dagger/dag_creator/airflow/operator_creators/soda_creator.py b/dagger/dag_creator/airflow/operator_creators/soda_creator.py index a0c593d..5f145c8 100644 --- a/dagger/dag_creator/airflow/operator_creators/soda_creator.py +++ b/dagger/dag_creator/airflow/operator_creators/soda_creator.py @@ -12,12 +12,7 @@ def __init__(self, task, dag): super().__init__(task, dag) self._absolute_job_name = task.absolute_job_name - self._project_dir = task.project_dir - self._profiles_dir = task.profiles_dir - self._profile_name = task.profile_name - self._target_name = task.target_name self._table_name = task.table_name - self._model_name = task.model_name self._output_s3_path = task.output_s3_path self._output_table = task.output_table self._vars = task.vars @@ -25,17 +20,12 @@ def __init__(self, task, dag): def _generate_command(self): command = BatchCreator._generate_command(self) - command.append(f"--project_dir={self._project_dir}") - command.append(f"--profiles_dir={self._profiles_dir}") - command.append(f"--profile_name={self._profile_name}") - command.append(f"--target_name={self._target_name}") + command.append(f"--output_s3_path={self._output_s3_path}") command.append(f"--output_table={self._output_table}") if self._table_name: command.append(f"--table_name={self._table_name}") - if self._model_name: - command.append(f"--model_name={self._model_name}") if self._vars: command.append(f"--vars={self._vars}") return command diff --git a/dagger/dagger_config.yaml b/dagger/dagger_config.yaml index 209f110..c7b291a 100644 --- a/dagger/dagger_config.yaml +++ b/dagger/dagger_config.yaml @@ -72,8 +72,5 @@ soda: # default_job_name: # default_executable_prefix: # default_executable: -# default_project_dir: -# default_profiles_dir: -# default_profile_name: # default_output_table: # default_output_s3_path: \ No newline at end of file diff --git a/dagger/pipeline/tasks/soda_task.py b/dagger/pipeline/tasks/soda_task.py index 7aeebf9..443f98a 100644 --- a/dagger/pipeline/tasks/soda_task.py +++ b/dagger/pipeline/tasks/soda_task.py @@ -21,45 +21,13 @@ def init_attributes(cls, orig_cls): parent_fields=["task_parameters"], comment="E.g.: my_code.py", ), - Attribute( - attribute_name="project_dir", - parent_fields=["task_parameters"], - required = False, - validator=str, - comment="Directory containing the dbt_project.yml file", - ), - Attribute( - attribute_name="profiles_dir", - parent_fields=["task_parameters"], - required=False, - comment="Directory containing the profiles.yml file", - ), - Attribute( - attribute_name="profile_name", - parent_fields=["task_parameters"], - required=False, - comment="Profile name to load from the profiles.yml file.", - ), - Attribute( - attribute_name="target_name", - parent_fields=["task_parameters"], - validator=str, - required=True, - comment="Target to load for the given profile. By default use 'ENV' environment variable.", - ), Attribute( attribute_name="table_name", parent_fields=["task_parameters"], validator=str, required=False, - comment="Full table name in the format 'database.schema.table'", - ), - Attribute( - attribute_name="model_name", - parent_fields=["task_parameters"], - validator=str, - required=False, - comment="Name of dbt model to be scanned by soda", + comment="Full table name in the format 'database.schema.table' By default it is" + " set to the name of the input .", ), Attribute( attribute_name="output_s3_path", @@ -93,34 +61,13 @@ def __init__(self, name, pipeline_name, pipeline, job_config): self._executable = self.executable or conf.SODA_DEFAULT_EXECUTABLE self._executable_prefix = self.executable_prefix or conf.SODA_DEFAULT_EXECUTABLE_PREFIX self._absolute_job_name = self._absolute_job_name or conf.SODA_DEFAULT_JOB_NAME - self._project_dir = self.parse_attribute("project_dir") or conf.SODA_DEFAULT_PROJECT_DIR - self._profiles_dir = self.parse_attribute("profiles_dir") or conf.SODA_DEFAULT_PROFILES_DIR - self._profile_name = self.parse_attribute("profile_name") or conf.SODA_DEFAULT_PROFILE_NAME self._output_table = self.parse_attribute("output_table") or conf.SODA_DEFAULT_OUTPUT_TABLE self._output_s3_path = self.parse_attribute("output_s3_path") or conf.SODA_DEFAULT_OUTPUT_S3_PATH - self._target_name = self.parse_attribute("target_name") - self._table_name = self.parse_attribute("table_name") - self._model_name = self.parse_attribute("model_name") self._vars = self.parse_attribute("vars") - if self._table_name and self._model_name: - raise ValueError(f"SodaTask: {self._name} table_name and model_name are mutually exclusive") - - - @property - def project_dir(self): - return self._project_dir - - @property - def profiles_dir(self): - return self._profiles_dir - - @property - def profile_name(self): - return self._profile_name @property def output_table(self): @@ -134,15 +81,8 @@ def output_s3_path(self): def table_name(self): return self._table_name - @property - def model_name(self): - return self._model_name - @property def vars(self): return self._vars - @property - def target_name(self): - return self._target_name