From 0e008dc0b44f7d645151731ce050bec44bdafb91 Mon Sep 17 00:00:00 2001 From: claudiazi Date: Tue, 22 Apr 2025 15:32:12 +0200 Subject: [PATCH 1/4] feat: add param _columns_to_include and _columns_to_exclude in the reverse_etl task --- .../operator_creators/reverse_etl_creator.py | 7 ++++- dagger/pipeline/tasks/reverse_etl_task.py | 26 +++++++++++++++++-- 2 files changed, 30 insertions(+), 3 deletions(-) diff --git a/dagger/dag_creator/airflow/operator_creators/reverse_etl_creator.py b/dagger/dag_creator/airflow/operator_creators/reverse_etl_creator.py index 5ab3910..b8b90d5 100644 --- a/dagger/dag_creator/airflow/operator_creators/reverse_etl_creator.py +++ b/dagger/dag_creator/airflow/operator_creators/reverse_etl_creator.py @@ -33,6 +33,8 @@ def __init__(self, task, dag): self._glue_schema_name = task.glue_schema_name self._sort_key = task.sort_key self._custom_columns = task.custom_columns + self._columns_to_include = task.columns_to_include + self._columns_to_exclude = task.columns_to_exclude def _generate_command(self): command = BatchCreator._generate_command(self) @@ -75,7 +77,10 @@ def _generate_command(self): command.append(f"--sort_key={self._sort_key}") if self._custom_columns: command.append(f"--custom_columns={self._custom_columns}") - + if self._columns_to_include: + command.append(f"--columns_to_include={self._columns_to_include}") + if self._columns_to_exclude: + command.append(f"--columns_to_exclude={self._columns_to_exclude}") return command diff --git a/dagger/pipeline/tasks/reverse_etl_task.py b/dagger/pipeline/tasks/reverse_etl_task.py index c0c6290..59463ab 100644 --- a/dagger/pipeline/tasks/reverse_etl_task.py +++ b/dagger/pipeline/tasks/reverse_etl_task.py @@ -156,9 +156,21 @@ def init_attributes(cls, orig_cls): validator=str, required=False, comment='Optional JSON string for additional custom columns from static values. Example: \'{"custom_project": "ProjectXYZ", "model_name": "ModelABC"}\'' + ), + Attribute( + attribute_name="columns_to_include", + parent_fields=["task_parameters"], + validator=str, + required=False, + comment='Optional comma-separated list of columns to include in the job. Example: \'column1,column2,column3\', if not provided, all columns will be included', + ), + Attribute( + attribute_name="columns_to_exclude", + parent_fields=["task_parameters"], + validator=str, + required=False, + comment='Optional comma-separated list of columns to exclude from the job. Example: \'column1,column2,column3\', if not provided, all columns will be included', ) - - ] ) @@ -188,6 +200,8 @@ def __init__(self, name, pipeline_name, pipeline, job_config): self._glue_schema_name = self.parse_attribute("glue_schema_name") self._sort_key = self.parse_attribute("sort_key") self._custom_columns = self.parse_attribute("custom_columns") + self._columns_to_include = self.parse_attribute("columns_to_include") + self._columns_to_exclude = self.parse_attribute("columns_to_exclude") if self._hash_column and self._updated_at_column: raise ValueError(f"ReverseETLTask: {self._name} hash_column and updated_at_column are mutually exclusive") @@ -309,3 +323,11 @@ def sort_key(self): @property def custom_columns(self): return self._custom_columns + + @property + def columns_to_include(self): + return self._columns_to_include + + @property + def columns_to_exclude(self): + return self._columns_to_exclude From 001f19c805789ca6daf35135ba870ff9a2b10acd Mon Sep 17 00:00:00 2001 From: claudiazi Date: Wed, 23 Apr 2025 10:19:52 +0200 Subject: [PATCH 2/4] chore: improve the naming of new params --- dagger/pipeline/tasks/reverse_etl_task.py | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/dagger/pipeline/tasks/reverse_etl_task.py b/dagger/pipeline/tasks/reverse_etl_task.py index 59463ab..7442d42 100644 --- a/dagger/pipeline/tasks/reverse_etl_task.py +++ b/dagger/pipeline/tasks/reverse_etl_task.py @@ -158,18 +158,18 @@ def init_attributes(cls, orig_cls): comment='Optional JSON string for additional custom columns from static values. Example: \'{"custom_project": "ProjectXYZ", "model_name": "ModelABC"}\'' ), Attribute( - attribute_name="columns_to_include", + attribute_name="input_table_columns_to_include", parent_fields=["task_parameters"], validator=str, required=False, - comment='Optional comma-separated list of columns to include in the job. Example: \'column1,column2,column3\', if not provided, all columns will be included', + comment='Optional comma-separated list of columns to include in the job. Example: \'column1,column2,column3\', if not provided, all columns of input table will be included', ), Attribute( - attribute_name="columns_to_exclude", + attribute_name="input_table_columns_to_exclude", parent_fields=["task_parameters"], validator=str, required=False, - comment='Optional comma-separated list of columns to exclude from the job. Example: \'column1,column2,column3\', if not provided, all columns will be included', + comment='Optional comma-separated list of columns to exclude from the job. Example: \'column1,column2,column3\', if not provided, all columns of input table will be included', ) ] ) @@ -200,12 +200,15 @@ def __init__(self, name, pipeline_name, pipeline, job_config): self._glue_schema_name = self.parse_attribute("glue_schema_name") self._sort_key = self.parse_attribute("sort_key") self._custom_columns = self.parse_attribute("custom_columns") - self._columns_to_include = self.parse_attribute("columns_to_include") - self._columns_to_exclude = self.parse_attribute("columns_to_exclude") + self._input_table_columns_to_include = self.parse_attribute("input_table_columns_to_include") + self._input_table_columns_to_exclude = self.parse_attribute("input_table_columns_to_exclude") if self._hash_column and self._updated_at_column: raise ValueError(f"ReverseETLTask: {self._name} hash_column and updated_at_column are mutually exclusive") + if self._input_table_columns_to_include and not self._input_table_columns_to_exclude: + raise ValueError(f"ReverseETLTask: {self._name} _input_table_columns_to_include and _input_table_columns_to_exclude are mutually exclusive") + if self._hash_column or self._updated_at_column: if not self._from_time: raise ValueError(f"ReverseETLTask: {self._name} from_time is required when hash_column or updated_at_column is provided") @@ -325,9 +328,9 @@ def custom_columns(self): return self._custom_columns @property - def columns_to_include(self): - return self._columns_to_include + def input_table_columns_to_include(self): + return self._input_table_columns_to_include @property - def columns_to_exclude(self): - return self._columns_to_exclude + def input_table_columns_to_exclude(self): + return self._input_table_columns_to_exclude From b3a96dc277ad55deb4b6536f75360cfb99147d42 Mon Sep 17 00:00:00 2001 From: claudiazi Date: Wed, 23 Apr 2025 10:35:43 +0200 Subject: [PATCH 3/4] fix: error message for _input_table_columns_to_include and _input_table_columns_to_exclude --- dagger/pipeline/tasks/reverse_etl_task.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dagger/pipeline/tasks/reverse_etl_task.py b/dagger/pipeline/tasks/reverse_etl_task.py index 7442d42..6160bfb 100644 --- a/dagger/pipeline/tasks/reverse_etl_task.py +++ b/dagger/pipeline/tasks/reverse_etl_task.py @@ -206,7 +206,7 @@ def __init__(self, name, pipeline_name, pipeline, job_config): if self._hash_column and self._updated_at_column: raise ValueError(f"ReverseETLTask: {self._name} hash_column and updated_at_column are mutually exclusive") - if self._input_table_columns_to_include and not self._input_table_columns_to_exclude: + if self._input_table_columns_to_include and self._input_table_columns_to_exclude: raise ValueError(f"ReverseETLTask: {self._name} _input_table_columns_to_include and _input_table_columns_to_exclude are mutually exclusive") if self._hash_column or self._updated_at_column: From 18d0ca2efeee4084e3fbc2ef60c2b314a676351a Mon Sep 17 00:00:00 2001 From: claudiazi Date: Wed, 23 Apr 2025 10:46:09 +0200 Subject: [PATCH 4/4] chore: improve the naming of params --- .../airflow/operator_creators/reverse_etl_creator.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/dagger/dag_creator/airflow/operator_creators/reverse_etl_creator.py b/dagger/dag_creator/airflow/operator_creators/reverse_etl_creator.py index b8b90d5..8446bcb 100644 --- a/dagger/dag_creator/airflow/operator_creators/reverse_etl_creator.py +++ b/dagger/dag_creator/airflow/operator_creators/reverse_etl_creator.py @@ -33,8 +33,8 @@ def __init__(self, task, dag): self._glue_schema_name = task.glue_schema_name self._sort_key = task.sort_key self._custom_columns = task.custom_columns - self._columns_to_include = task.columns_to_include - self._columns_to_exclude = task.columns_to_exclude + self._input_table_columns_to_include = task.input_table_columns_to_include + self._input_table_columns_to_exclude = task.input_table_columns_to_exclude def _generate_command(self): command = BatchCreator._generate_command(self) @@ -77,10 +77,10 @@ def _generate_command(self): command.append(f"--sort_key={self._sort_key}") if self._custom_columns: command.append(f"--custom_columns={self._custom_columns}") - if self._columns_to_include: - command.append(f"--columns_to_include={self._columns_to_include}") - if self._columns_to_exclude: - command.append(f"--columns_to_exclude={self._columns_to_exclude}") + if self._input_table_columns_to_include: + command.append(f"--input_table_columns_to_include={self._input_table_columns_to_include}") + if self._input_table_columns_to_exclude: + command.append(f"--input_table_columns_to_exclude={self._input_table_columns_to_exclude}") return command