diff --git a/dagger/conf.py b/dagger/conf.py index df2ab8e..b750d84 100644 --- a/dagger/conf.py +++ b/dagger/conf.py @@ -110,3 +110,11 @@ REVERSE_ETL_DEFAULT_JOB_NAME = reverse_etl_config.get('default_job_name', None) REVERSE_ETL_DEFAULT_EXECUTABLE_PREFIX = reverse_etl_config.get('default_executable_prefix', None) REVERSE_ETL_DEFAULT_EXECUTABLE = reverse_etl_config.get('default_executable', None) + +# Soda parameters +soda_config = config.get('soda', None) or {} +SODA_DEFAULT_JOB_NAME = soda_config.get('default_job_name', None) +SODA_DEFAULT_EXECUTABLE_PREFIX = soda_config.get('default_executable_prefix', None) +SODA_DEFAULT_EXECUTABLE = soda_config.get('default_executable', None) +SODA_DEFAULT_OUTPUT_TABLE = soda_config.get('default_output_table', None) +SODA_DEFAULT_OUTPUT_S3_PATH = soda_config.get('default_output_s3_path', None) diff --git a/dagger/dag_creator/airflow/operator_creators/soda_creator.py b/dagger/dag_creator/airflow/operator_creators/soda_creator.py new file mode 100644 index 0000000..5f145c8 --- /dev/null +++ b/dagger/dag_creator/airflow/operator_creators/soda_creator.py @@ -0,0 +1,49 @@ +import base64 + +from dagger.dag_creator.airflow.operator_creators.batch_creator import BatchCreator +from dagger.dag_creator.airflow.operators.soda_batch import SodaBatchOperator +import json + + +class SodaCreator(BatchCreator): + ref_name = "soda" + + def __init__(self, task, dag): + super().__init__(task, dag) + + self._absolute_job_name = task.absolute_job_name + self._table_name = task.table_name + self._output_s3_path = task.output_s3_path + self._output_table = task.output_table + self._vars = task.vars + + def _generate_command(self): + command = super()._generate_command() + + + command.append(f"--output_s3_path={self._output_s3_path}") + command.append(f"--output_table={self._output_table}") + + if self._table_name: + command.append(f"--table_name={self._table_name}") + if self._vars: + command.append(f"--vars={self._vars}") + return command + 
+ def _create_operator(self, **kwargs): + overrides = dict(self._task.overrides) + overrides.update({"command": self._generate_command()}) + + job_name = self._validate_job_name(self._task.job_name, self._task.absolute_job_name) + batch_op = SodaBatchOperator( + dag=self._dag, + task_id=self._task.name, + job_name=self._task.name, + job_definition=job_name, + region_name=self._task.region_name, + job_queue=self._task.job_queue, + container_overrides=overrides, + awslogs_enabled=True, + **kwargs, + ) + return batch_op diff --git a/dagger/dag_creator/airflow/operator_factory.py b/dagger/dag_creator/airflow/operator_factory.py index f610f1e..2a1654a 100644 --- a/dagger/dag_creator/airflow/operator_factory.py +++ b/dagger/dag_creator/airflow/operator_factory.py @@ -13,6 +13,7 @@ reverse_etl_creator, spark_creator, sqoop_creator, + soda_creator, ) from dagger.dag_creator.airflow.utils.operator_factories import make_control_flow from dagger.utilities.classes import get_deep_obj_subclasses diff --git a/dagger/dag_creator/airflow/operators/soda_batch.py b/dagger/dag_creator/airflow/operators/soda_batch.py new file mode 100644 index 0000000..d67a26c --- /dev/null +++ b/dagger/dag_creator/airflow/operators/soda_batch.py @@ -0,0 +1,5 @@ +from dagger.dag_creator.airflow.operators.awsbatch_operator import AWSBatchOperator + +class SodaBatchOperator(AWSBatchOperator): + custom_operator_name = 'Soda' + ui_color = "#e4f0e7" diff --git a/dagger/dagger_config.yaml b/dagger/dagger_config.yaml index 38abccd..c7b291a 100644 --- a/dagger/dagger_config.yaml +++ b/dagger/dagger_config.yaml @@ -67,3 +67,10 @@ reverse_etl: # default_job_name: # default_executable_prefix: # default_executable: + +soda: +# default_job_name: +# default_executable_prefix: +# default_executable: +# default_output_table: +# default_output_s3_path: \ No newline at end of file diff --git a/dagger/pipeline/task_factory.py b/dagger/pipeline/task_factory.py index d8a1e53..9ed79e7 100644 --- a/dagger/pipeline/task_factory.py 
+++ b/dagger/pipeline/task_factory.py @@ -12,6 +12,7 @@ reverse_etl_task, spark_task, sqoop_task, + soda_task ) from dagger.utilities.classes import get_deep_obj_subclasses diff --git a/dagger/pipeline/tasks/soda_task.py b/dagger/pipeline/tasks/soda_task.py new file mode 100644 index 0000000..443f98a --- /dev/null +++ b/dagger/pipeline/tasks/soda_task.py @@ -0,0 +1,88 @@ +from dagger.pipeline.tasks.batch_task import BatchTask +from dagger.utilities.config_validator import Attribute +from dagger import conf + +class SodaTask(BatchTask): + ref_name = "soda" + + @classmethod + def init_attributes(cls, orig_cls): + cls.add_config_attributes( + [ + Attribute( + attribute_name="executable_prefix", + required=False, + parent_fields=["task_parameters"], + comment="E.g.: python", + ), + Attribute( + attribute_name="executable", + required=False, + parent_fields=["task_parameters"], + comment="E.g.: my_code.py", + ), + Attribute( + attribute_name="table_name", + parent_fields=["task_parameters"], + validator=str, + required=False, + comment="Full table name in the format 'database.schema.table'. By default it is" + " set to the name of the input.", + ), + Attribute( + attribute_name="output_s3_path", + parent_fields=["task_parameters"], + validator=str, + required=False, + comment="S3 location to upload the scan results", + + ), + Attribute( + attribute_name="output_table", + parent_fields=["task_parameters"], + validator=str, + required=False, + comment="Athena table that will contain the scan results.", + ), + Attribute( + attribute_name="vars", + parent_fields=["task_parameters"], + validator=str, + required=False, + comment="Variables needed to run soda scan", + ) + + ] + ) + + def __init__(self, name, pipeline_name, pipeline, job_config): + super().__init__(name, pipeline_name, pipeline, job_config) + + self._executable = self.executable or conf.SODA_DEFAULT_EXECUTABLE + self._executable_prefix = self.executable_prefix or conf.SODA_DEFAULT_EXECUTABLE_PREFIX + 
self._absolute_job_name = self._absolute_job_name or conf.SODA_DEFAULT_JOB_NAME + self._output_table = self.parse_attribute("output_table") or conf.SODA_DEFAULT_OUTPUT_TABLE + self._output_s3_path = self.parse_attribute("output_s3_path") or conf.SODA_DEFAULT_OUTPUT_S3_PATH + self._table_name = self.parse_attribute("table_name") + self._vars = self.parse_attribute("vars") + + + + + @property + def output_table(self): + return self._output_table + + @property + def output_s3_path(self): + return self._output_s3_path + + @property + def table_name(self): + return self._table_name + + @property + def vars(self): + return self._vars + +