From 75bb61f7b773a0fac2af8a22e2f7cc0b06ddc54b Mon Sep 17 00:00:00 2001 From: raimundovidaljunior Date: Fri, 28 Mar 2025 15:48:19 +0100 Subject: [PATCH 1/4] Adding column mapping and case conversion --- .../operator_creators/reverse_etl_creator.py | 9 +++++ dagger/pipeline/tasks/reverse_etl_task.py | 37 +++++++++++++++++++ 2 files changed, 46 insertions(+) diff --git a/dagger/dag_creator/airflow/operator_creators/reverse_etl_creator.py b/dagger/dag_creator/airflow/operator_creators/reverse_etl_creator.py index 7ac32d7..6ee4637 100644 --- a/dagger/dag_creator/airflow/operator_creators/reverse_etl_creator.py +++ b/dagger/dag_creator/airflow/operator_creators/reverse_etl_creator.py @@ -28,6 +28,9 @@ def __init__(self, task, dag): self._output_type = task.output_type self._region_name = task.region_name self._full_refresh = task.full_refresh + self._target_case = task.target_case + self._source_case = task.source_case + self._column_mapping = task.column_mapping def _generate_command(self): command = BatchCreator._generate_command(self) @@ -59,6 +62,12 @@ def _generate_command(self): command.append(f"--region_name={self._region_name}") if self._full_refresh: command.append(f"--full_refresh={self._full_refresh}") + if self._target_case: + command.append(f"--target_case={self._target_case}") + if self._source_case: + command.append(f"--source_case={self._source_case}") + if self._column_mapping: + command.append(f"--column_mapping={self._column_mapping}") return command diff --git a/dagger/pipeline/tasks/reverse_etl_task.py b/dagger/pipeline/tasks/reverse_etl_task.py index 30ce352..d4ead6a 100644 --- a/dagger/pipeline/tasks/reverse_etl_task.py +++ b/dagger/pipeline/tasks/reverse_etl_task.py @@ -122,8 +122,30 @@ def init_attributes(cls, orig_cls): validator=bool, required=False, comment="If set to True, the job will perform a full refresh instead of an incremental one", + ), + Attribute( + attribute_name="target_case", + parent_fields=["task_parameters"], + validator=str, + required=False, + comment="Target column case for DynamoDB. 'snake' leaves columns in snake_case; 'camel' converts to camelCase.", + ), + Attribute( + attribute_name="source_case", + parent_fields=["task_parameters"], + validator=str, + required=False, + comment="Source dataset column case. Specify the case of the incoming dataset." + ), + Attribute( + attribute_name="column_mapping", + parent_fields=["task_parameters"], + validator=str, + required=False, + comment='Optional JSON string for column mappings. Example: \'{"id": "chat_id"}\'', ) + ] ) @@ -148,6 +170,9 @@ def __init__(self, name, pipeline_name, pipeline, job_config): self._from_time = self.parse_attribute("from_time") self._days_to_live = self.parse_attribute("days_to_live") self._full_refresh = self.parse_attribute("full_refresh") + self._target_case = self.parse_attribute("target_case") + self._source_case = self.parse_attribute("source_case") + self._column_mapping = self.parse_attribute("column_mapping") if self._hash_column and self._updated_at_column: raise ValueError(f"ReverseETLTask: {self._name} hash_column and updated_at_column are mutually exclusive") @@ -249,3 +274,15 @@ def region_name(self): @property def full_refresh(self): return self._full_refresh + + @property + def target_case(self): + return self._target_case + + @property + def source_case(self): + return self._source_case + + @property + def column_mapping(self): + return self._column_mapping From 57f3c35e90bb13b9636b0a43fa0b482707d59132 Mon Sep 17 00:00:00 2001 From: raimundovidaljunior Date: Tue, 1 Apr 2025 11:31:13 +0200 Subject: [PATCH 2/4] Adjust reverse_etl --- .../operator_creators/reverse_etl_creator.py | 16 +++- dagger/pipeline/tasks/reverse_etl_task.py | 73 ++++++++++++------- 2 files changed, 60 insertions(+), 29 deletions(-) diff --git a/dagger/dag_creator/airflow/operator_creators/reverse_etl_creator.py b/dagger/dag_creator/airflow/operator_creators/reverse_etl_creator.py index 6ee4637..0bd9355 100644 --- a/dagger/dag_creator/airflow/operator_creators/reverse_etl_creator.py +++ b/dagger/dag_creator/airflow/operator_creators/reverse_etl_creator.py @@ -18,8 +18,6 @@ def __init__(self, task, dag): self._primary_id_column = task.primary_id_column self._secondary_id_column = task.secondary_id_column self._custom_id_column = task.custom_id_column - self._model_name = task.model_name - self._project_name = task.project_name self._is_deleted_column = task.is_deleted_column self._hash_column = task.hash_column self._updated_at_column = task.updated_at_column @@ -31,6 +29,10 @@ def __init__(self, task, dag): self._target_case = task.target_case self._source_case = task.source_case self._column_mapping = task.column_mapping + self._glue_registry_name = self.parse_attribute("glue_registry_name") + self._glue_schema_name = self.parse_attribute("glue_schema_name") + self._sort_key = self.parse_attribute("sort_key") + self._custom_columns = self.parse_attribute("custom_columns") def _generate_command(self): command = BatchCreator._generate_command(self) @@ -38,9 +40,8 @@ def _generate_command(self): command.append(f"--num_threads={self._num_threads}") command.append(f"--batch_size={self._batch_size}") command.append(f"--primary_id_column={self._primary_id_column}") - command.append(f"--model_name={self._model_name}") - command.append(f"--project_name={self._project_name}") command.append(f"--output_type={self._output_type}") + command.append(f"--glue_registry_name={self._glue_registry_name}") if self._assume_role_arn: command.append(f"--assume_role_arn={self._assume_role_arn}") @@ -68,6 +69,13 @@ def _generate_command(self): command.append(f"--source_case={self._source_case}") if self._column_mapping: command.append(f"--column_mapping={self._column_mapping}") + if self._glue_schema_name: + command.append(f"--glue_schema_name={self._glue_schema_name}") + if self._sort_key: + command.append(f"--sort_key={self._sort_key}") + if self._custom_columns: + command.append(f"--custom_columns={json.dumps(self._custom_columns)}") + return command diff --git a/dagger/pipeline/tasks/reverse_etl_task.py b/dagger/pipeline/tasks/reverse_etl_task.py index d4ead6a..c0c6290 100644 --- a/dagger/pipeline/tasks/reverse_etl_task.py +++ b/dagger/pipeline/tasks/reverse_etl_task.py @@ -60,21 +60,6 @@ def init_attributes(cls, orig_cls): required=False, comment="The custom key column to use for the job", ), - Attribute( - attribute_name="model_name", - parent_fields=["task_parameters"], - validator=str, - required=False, - comment="The name of the model. This is going to be a column on the target table. By default it is" - " set to the name of the input .", - ), - Attribute( - attribute_name="project_name", - parent_fields=["task_parameters"], - validator=str, - required=True, - comment="The name of the project. This is going to be a column on the target table.", - ), Attribute( attribute_name="is_deleted_column", parent_fields=["task_parameters"], @@ -143,6 +128,34 @@ def init_attributes(cls, orig_cls): validator=str, required=False, comment='Optional JSON string for column mappings. Example: \'{"id": "chat_id"}\'', + ), + Attribute( + attribute_name="glue_registry_name", + parent_fields=["task_parameters"], + validator=str, + required=True, + comment='AWS Glue Registry name', + ), + Attribute( + attribute_name="glue_schema_name", + parent_fields=["task_parameters"], + validator=str, + required=False, + comment='AWS Glue Schema name. output_name will be used if not provided', + ), + Attribute( + attribute_name="sort_key", + parent_fields=["task_parameters"], + validator=str, + required=False, + comment='Optional JSON string for sort key composition using #.join(). Example: \'{"sort_key": ["project", "model_name", "secondary_id", "custom_id"]}\'', + ), + Attribute( + attribute_name="custom_columns", + parent_fields=["task_parameters"], + validator=str, + required=False, + comment='Optional JSON string for additional custom columns from static values. Example: \'{"custom_project": "ProjectXYZ", "model_name": "ModelABC"}\'' ) @@ -162,8 +175,6 @@ def __init__(self, name, pipeline_name, pipeline, job_config): self._primary_id_column = self.parse_attribute("primary_id_column") self._secondary_id_column = self.parse_attribute("secondary_id_column") self._custom_id_column = self.parse_attribute("custom_id_column") - self._model_name = self.parse_attribute("model_name") - self._project_name = self.parse_attribute("project_name") self._is_deleted_column = self.parse_attribute("is_deleted_column") self._hash_column = self.parse_attribute("hash_column") self._updated_at_column = self.parse_attribute("updated_at_column") @@ -173,6 +184,10 @@ def __init__(self, name, pipeline_name, pipeline, job_config): self._target_case = self.parse_attribute("target_case") self._source_case = self.parse_attribute("source_case") self._column_mapping = self.parse_attribute("column_mapping") + self._glue_registry_name = self.parse_attribute("glue_registry_name") + self._glue_schema_name = self.parse_attribute("glue_schema_name") + self._sort_key = self.parse_attribute("sort_key") + self._custom_columns = self.parse_attribute("custom_columns") if self._hash_column and self._updated_at_column: raise ValueError(f"ReverseETLTask: {self._name} hash_column and updated_at_column are mutually exclusive") @@ -235,14 +250,6 @@ def secondary_id_column(self): def custom_id_column(self): return self._custom_id_column - @property - def model_name(self): - return self._model_name - - @property - def project_name(self): - return self._project_name - @property def is_deleted_column(self): return self._is_deleted_column @@ -286,3 +293,19 @@ def source_case(self): @property def column_mapping(self): return self._column_mapping + + @property + def glue_registry_name(self): + return self._glue_registry_name + + @property + def glue_schema_name(self): + return self._glue_schema_name + + @property + def sort_key(self): + return self._sort_key + + @property + def custom_columns(self): + return self._custom_columns From 7e10650b96ece93b4eb53c6265da73de2b8573c7 Mon Sep 17 00:00:00 2001 From: raimundovidaljunior Date: Tue, 1 Apr 2025 12:07:56 +0200 Subject: [PATCH 3/4] fixing error --- .../airflow/operator_creators/reverse_etl_creator.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/dagger/dag_creator/airflow/operator_creators/reverse_etl_creator.py b/dagger/dag_creator/airflow/operator_creators/reverse_etl_creator.py index 0bd9355..8c3385d 100644 --- a/dagger/dag_creator/airflow/operator_creators/reverse_etl_creator.py +++ b/dagger/dag_creator/airflow/operator_creators/reverse_etl_creator.py @@ -29,10 +29,10 @@ def __init__(self, task, dag): self._target_case = task.target_case self._source_case = task.source_case self._column_mapping = task.column_mapping - self._glue_registry_name = self.parse_attribute("glue_registry_name") - self._glue_schema_name = self.parse_attribute("glue_schema_name") - self._sort_key = self.parse_attribute("sort_key") - self._custom_columns = self.parse_attribute("custom_columns") + self._glue_registry_name = task.glue_registry_name + self._glue_schema_name = task.glue_schema_name + self._sort_key = task.sort_key + self._custom_columns = task.custom_columns def _generate_command(self): command = BatchCreator._generate_command(self) From 78ce5b2f8f2a7798e8eb021f1f82a4146974c8cc Mon Sep 17 00:00:00 2001 From: raimundovidaljunior Date: Tue, 1 Apr 2025 14:28:23 +0200 Subject: [PATCH 4/4] fixing custom columns --- .../airflow/operator_creators/reverse_etl_creator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dagger/dag_creator/airflow/operator_creators/reverse_etl_creator.py b/dagger/dag_creator/airflow/operator_creators/reverse_etl_creator.py index 8c3385d..5ab3910 100644 --- a/dagger/dag_creator/airflow/operator_creators/reverse_etl_creator.py +++ b/dagger/dag_creator/airflow/operator_creators/reverse_etl_creator.py @@ -74,7 +74,7 @@ def _generate_command(self): if self._sort_key: command.append(f"--sort_key={self._sort_key}") if self._custom_columns: - command.append(f"--custom_columns={json.dumps(self._custom_columns)}") + command.append(f"--custom_columns={self._custom_columns}") return command