diff --git a/.github/workflows/ci-unit-tests.yml b/.github/workflows/ci-unit-tests.yml new file mode 100644 index 00000000..a968048b --- /dev/null +++ b/.github/workflows/ci-unit-tests.yml @@ -0,0 +1,41 @@ +name: CI Unit Tests + +on: + push: + branches: + - main + - 'releases/**' + pull_request: + branches: + - main + - 'releases/**' + workflow_dispatch: + +jobs: + unit-tests: + name: Unit tests (${{ matrix.os }} / Python ${{ matrix.python }}) + strategy: + fail-fast: false + matrix: + os: [ubuntu-latest, macos-latest, windows-latest] + python: ['3.10', '3.11', '3.12', '3.13'] + runs-on: ${{ matrix.os }} + + steps: + - uses: actions/checkout@v4 + + - name: Setup Python + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python }} + + - name: Install dependencies + shell: bash + run: | + python -m pip install --upgrade pip + pip install -r requirements_dev.txt + + - name: Run unit tests + shell: bash + run: | + pytest tests/unit/ -v diff --git a/CHANGELOG.md b/CHANGELOG.md index a5cd4776..98c6a173 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,9 +1,18 @@ ## dbt-teradata 1.0.0a ### Features +- Open Table Format (OTF) support via Teradata DATALAKE objects. Models can target Iceberg/Delta Lake tables using 3-part naming (`<datalake>."<otf_database>"."<table>"`) by setting `catalog_name` and registering a `datalake` catalog integration in `catalogs.yml`. Supports model-level `partitioned_by`, `sorted_by`, `tblproperties`, and `purge_mode` configs. OTF tables defined as sources in `sources.yml` (with differing `database` and `schema`) are auto-detected and rendered with 3-part naming. +- `persist_docs` and adapter cache management are now applied to OTF table materializations, matching the standard table materialization. `grants` is not supported on OTF tables; setting it emits a warning and is otherwise ignored. ### Fixes +- `purge_mode` default changed from `PURGE ALL` to `NO PURGE` to avoid data loss on first DROP of an OTF model whose target table already holds data. 
+- `purge_mode` validation is now case-insensitive and centralised; invalid values raise a clear compile-time error. +- `add_query` now matches `DROP/DELETE DATABASE` case-insensitively, consistent with the `DROP TABLE/VIEW` branch. ### Docs ### Under the hood +- Adapter suppresses Teradata error 7825 ("OTF table not found in external catalog") on `DROP TABLE /*+ IF EXISTS */`, matching the existing semantics for errors 3807/3853/3854. +- OTF DROP logic deduplicated across `teradata__drop_relation` and `teradata__create_datalake_table_as` via shared `teradata__drop_otf_table` / `teradata__validate_purge_mode` helper macros. +- Unsupported config combinations on the OTF path (`table_kind`, `table_option`, `with_statistics`, `index`, `contract.enforced`) now raise compile-time errors instead of being silently ignored. +- `TeradataRelation.render()` for OTF relations no longer relies on the private `BaseRelation._render_iterator()` API. diff --git a/README.md b/README.md index 4a891973..637541a0 100644 --- a/README.md +++ b/README.md @@ -851,6 +851,121 @@ sources: data_type: CHAR(1) ``` +## Open Table Format (OTF) support + +dbt-teradata can create and read Iceberg / Delta Lake tables via Teradata's native Open Table Format support. OTF tables live in an external object store (S3, Azure, GCS) and are registered with an external catalog (AWS Glue, Unity Catalog, etc.); Teradata accesses them through a pre-created `DATALAKE` object that encapsulates the catalog type, authentication, and object-store path. + +### Pre-requisites + +The following must exist on the Teradata side **before** running dbt: + +* A `DATALAKE` object created in Teradata (e.g. via `CREATE DATALAKE my_lake ...`). dbt does not create DATALAKEs. +* A database inside that DATALAKE (the "OTF database") that will hold the OTF tables. dbt does not create this either. 
+* The dbt user must have permission to `CREATE TABLE` / `DROP TABLE` within the OTF database, and `SELECT` permission to read OTF tables defined elsewhere. + +Refer to the Teradata documentation for `CREATE DATALAKE` syntax and the specific permissions required for your catalog backend. + +### Configuration + +Register the catalog integration in a `catalogs.yml` file at your dbt project root: + +```yaml +catalogs: + - name: my_otf_catalog + active_write_integration: td_datalake + write_integrations: + - name: td_datalake + catalog_type: datalake + adapter_properties: + datalake_name: my_lake # the pre-created DATALAKE object + otf_database: my_otf_db # the pre-created OTF database within it +``` + +`catalog_type` must be `datalake`. `datalake_name` and `otf_database` are both required and validated at integration registration time. + +Reference the catalog from a model via `catalog_name`: + +```sql +-- models/sales_iceberg.sql +{{ config( + materialized='table', + catalog_name='my_otf_catalog', + partitioned_by='YEAR(order_date), country', + sorted_by='customer_id ASC', + tblproperties="'write.format.default'='parquet', 'gc.enabled'='true'", + purge_mode='NO PURGE' +) }} +select + order_id, + customer_id, + country, + order_date, + amount +from {{ ref('stg_orders') }} +``` + +### Naming conventions: 2-part vs 3-part + +Teradata's native objects use **2-part** naming (`database.object`); in dbt-teradata, the `database` field is unused and the `schema` field carries the Teradata database name. OTF tables are the **only** Teradata objects that use **3-part** naming (`.""."
"`). + +For OTF tables, dbt-teradata maps: + +| dbt field | Teradata concept | +| ------------ | --------------------------------- | +| `database` | DATALAKE name (unquoted) | +| `schema` | OTF database name (quoted) | +| `identifier` | OTF table name (quoted) | + +When you set `catalog_name` on a model, dbt-teradata pulls `database` and `schema` from the registered catalog integration automatically. For an OTF table defined as a **source** (where there is no `catalog_name` model config), declare the `database` and `schema` explicitly in `sources.yml` — the adapter detects the 3-part shape (database ≠ schema) and renders it correctly: + +```yaml +version: 2 +sources: + - name: customer_otf + database: my_lake # DATALAKE name + schema: my_otf_db # OTF database name + tables: + - name: customer_iceberg +``` + +A `ref()` from another model then compiles to `my_lake."my_otf_db"."customer_iceberg"`. + +### Supported model config options + +| Option | Type | Description | +| ---------------- | ------- | ---------------------------------------------------------------------------------------------------------- | +| `catalog_name` | string | Name of the catalog integration from `catalogs.yml`. Required to mark a model as OTF. | +| `partitioned_by` | string | Iceberg/Delta partition expression, e.g. `'YEAR(dt), country'`. | +| `sorted_by` | string | Sort order, e.g. `'id ASC'`. | +| `tblproperties` | string | Iceberg/Delta table properties, e.g. `"'gc.enabled'='true'"`. | +| `purge_mode` | string | DROP behavior. `'NO PURGE'` (default; removes catalog entry only) or `'PURGE ALL'` (also deletes data files on the object store). Case-insensitive. | + +`persist_docs` and standard dbt cache management work on OTF models the same way they do on native tables. **`grants` is not supported on OTF tables** — Teradata does not allow `GRANT` on DATALAKE objects (access control is managed via AUTHORIZATION objects and external IAM/OAuth policies). 
Setting `grants` on an OTF model emits a warning and is otherwise ignored. + +### Limitations and trade-offs + +* **Non-atomic re-materialization.** OTF tables use 3-part naming that cannot be renamed via standard DDL, so the adapter cannot use the build-tmp-then-rename pattern that protects native tables on a failed CREATE. An OTF model is dropped before it is re-created — if the CREATE fails, the table is gone. Plan for `--full-refresh` workflows accordingly. +* **Model contracts are not supported on the OTF path.** Setting `contract.enforced: true` together with `catalog_name` raises a compile-time error. +* **Teradata-native table options are not supported.** Setting any of `table_kind`, `table_option`, `with_statistics`, or `index` together with `catalog_name` raises a compile-time error — these options describe native Teradata table storage and do not apply to Iceberg/Delta tables. +* **Only `catalog_type: datalake` is supported.** Other catalog types are rejected with a compile-time error. + +### Error 7825 suppression + +Teradata raises error 7825 ("OTF table not found in external catalog") when `DROP TABLE` is issued against an OTF table that no longer exists in the external catalog (e.g. Glue). dbt-teradata treats 7825 the same way it treats native errors 3807/3853/3854 — suppressed under `IF EXISTS` semantics — so re-running a dbt project after an OTF table has been deleted externally does not fail. 
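
The suppression rule can be sketched in plain Python. This is a minimal illustration modelled on the `add_query` change in `connections.py`, not the adapter code itself: the function name `is_suppressed`, the two constant names, and the sample error strings are hypothetical, and the `[Error NNNN]` message shape is assumed from the substring check the adapter performs.

```python
# Error codes treated as "object already absent" when the IF EXISTS hint is present.
# 7825 is the OTF-specific code; the others are the native Teradata codes.
DROP_OBJECT_ERRORS = (3807, 3854, 3853, 7825)
DROP_DATABASE_ERRORS = (3802,)


def is_suppressed(sql: str, error_message: str) -> bool:
    """Return True if a failed statement should be ignored (hypothetical helper)."""
    query_upper = sql.strip().upper()

    def has_code(codes):
        # The check is a plain substring match on the driver's error text.
        return any(f"[Error {code}]" in error_message for code in codes)

    if (
        "DROP VIEW /*+ IF EXISTS */" in query_upper
        or "DROP TABLE /*+ IF EXISTS */" in query_upper
    ) and has_code(DROP_OBJECT_ERRORS):
        return True
    if (
        "DELETE DATABASE /*+ IF EXISTS */" in query_upper
        or "DROP DATABASE /*+ IF EXISTS */" in query_upper
    ) and has_code(DROP_DATABASE_ERRORS):
        return True
    return False


if __name__ == "__main__":
    drop_sql = 'DROP TABLE /*+ IF EXISTS */ my_lake."my_otf_db"."sales_iceberg" NO PURGE;'
    print(is_suppressed(drop_sql, "[Error 7825] illustrative message text"))
    print(is_suppressed("DROP TABLE my_db.t;", "[Error 7825] illustrative message text"))
```

Note that suppression depends on the `/*+ IF EXISTS */` hint being present in the statement: a plain `DROP TABLE` that fails with 7825 still raises.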
+ +### Testing OTF locally + +Functional tests for the OTF feature live in `tests/functional/adapter/test_otf_integration.py` and are gated on the following environment variables: + +```bash +export DBT_TERADATA_DATALAKE='my_lake' # pre-created DATALAKE name +export DBT_TERADATA_OTF_DATABASE='my_otf_db' # pre-created OTF database name +``` + +Combined with the standard `DBT_TERADATA_SERVER_NAME` / `DBT_TERADATA_USERNAME` / `DBT_TERADATA_PASSWORD` connection variables, `pytest tests/functional/adapter/test_otf_integration.py` will exercise: basic create, idempotency, cross-model `ref()`, source-based 3-part naming, grants warning, `purge_mode: 'NO PURGE'`, and compile-time guardrails. Without the OTF env vars set, all OTF integration tests are skipped. + +Pure unit tests (no Teradata required) live in `tests/unit/test_otf_catalogs.py` and run via `pytest tests/unit/`. + ## temporary_metadata_generation_schema (earlier fallback_schema) dbt-teradata internally created temporary tables to fetch the metadata of views for manifest and catalog creation. In case if user does not have permission to create tables on the schema they are working on, they can define a temporary_metadata_generation_schema(to which they have proper create and drop privileges) in dbt_project.yml as variable. diff --git a/dbt/adapters/teradata/catalogs.py b/dbt/adapters/teradata/catalogs.py new file mode 100644 index 00000000..f60aa769 --- /dev/null +++ b/dbt/adapters/teradata/catalogs.py @@ -0,0 +1,142 @@ +"""Catalog integrations for Teradata Open Table Format (OTF) support. + +This module provides the DATALAKE catalog integration that enables dbt to +create and manage Iceberg/Delta Lake tables via Teradata's native OTF support. +Tables are addressed using 3-part naming: .""."
". + +Configuration in catalogs.yml: + + catalogs: + - name: my_catalog + write_integrations: + - name: iceberg_glue + catalog_type: datalake + adapter_properties: + datalake_name: MyOTFLake # pre-created DATALAKE object + otf_database: my_otf_db # pre-created database within the DATALAKE + +Model-level config options (set via {{ config(...) }} in .sql files): + + partitioned_by -- Iceberg partition expression, e.g. 'YEAR(dt), country' + sorted_by -- Iceberg sort order, e.g. 'id ASC' + tblproperties -- Iceberg table properties, e.g. "'gc.enabled'='true'" + purge_mode -- DROP behavior: 'NO PURGE' (default) or 'PURGE ALL' + NO PURGE = remove catalog entry only, keep data files (safe default) + PURGE ALL = remove catalog entry AND delete data files on object store +""" + +from dataclasses import dataclass +from typing import Optional + +from dbt.adapters.catalogs import ( + CatalogIntegration, + CatalogIntegrationConfig, + CatalogRelation, + InvalidCatalogIntegrationConfigError, +) +from dbt.adapters.contracts.relation import RelationConfig + + +# --------------------------------------------------------------------------- +# Relation dataclass -- carries catalog metadata into Jinja macros +# --------------------------------------------------------------------------- + +@dataclass +class TeradataCatalogRelation(CatalogRelation): + """Relation metadata for DATALAKE-based OTF tables. 
+ + Fields populated from catalogs.yml (via the integration): + catalog_type, catalog_name, table_format, file_format, + external_volume, datalake_name, otf_database + + Fields populated from model config (via build_relation): + partitioned_by, sorted_by, tblproperties, purge_mode + """ + catalog_type: Optional[str] = None + catalog_name: Optional[str] = None + table_format: Optional[str] = None + file_format: Optional[str] = None + external_volume: Optional[str] = None + datalake_name: Optional[str] = None + otf_database: Optional[str] = None + partitioned_by: Optional[str] = None + sorted_by: Optional[str] = None + tblproperties: Optional[str] = None + # Controls DROP TABLE behavior for OTF tables: + # 'NO PURGE' -- removes catalog entry only, data files remain (default; safe) + # 'PURGE ALL' -- removes catalog entry AND deletes data files on object store + purge_mode: Optional[str] = None + + +# --------------------------------------------------------------------------- +# DATALAKE integration -- 3-part naming (datalake."otf_db"."table") +# --------------------------------------------------------------------------- + +class TeradataDatalakeCatalogIntegration(CatalogIntegration): + """Catalog integration for Teradata DATALAKE objects. + + Works with any external catalog (AWS Glue, Unity Catalog, etc.) because + Teradata's DATALAKE object encapsulates the catalog type, auth, and + object store path. dbt only needs the datalake_name and otf_database + for 3-part naming. + + Generated SQL example: + DROP TABLE /*+ IF EXISTS */ .""."
" PURGE ALL; + CREATE TABLE .""."
" + PARTITIONED BY (...) + SORTED BY ... + TBLPROPERTIES(...) + AS (...) WITH DATA; + + Required adapter_properties in catalogs.yml: + datalake_name -- name of the pre-created DATALAKE object in Teradata + otf_database -- name of the pre-created database within the DATALAKE + """ + + catalog_type = "datalake" + allows_writes = True + table_format = "iceberg" + file_format = "parquet" + + def __init__(self, config: CatalogIntegrationConfig) -> None: + super().__init__(config) + # Restore class-level defaults if not provided in config + if config.file_format is None: + self.file_format = "parquet" + adapter_props = config.adapter_properties or {} + self.datalake_name = adapter_props.get("datalake_name") + self.otf_database = adapter_props.get("otf_database") + if not self.datalake_name: + raise InvalidCatalogIntegrationConfigError( + config.name, + "adapter_properties.datalake_name is required -- " + "it must match the pre-created DATALAKE object in Teradata", + ) + if not self.otf_database: + raise InvalidCatalogIntegrationConfigError( + config.name, + "adapter_properties.otf_database is required -- " + "it must match the pre-created OTF database within the DATALAKE", + ) + + def build_relation(self, config: RelationConfig) -> TeradataCatalogRelation: + """Build a TeradataCatalogRelation from integration + model config. + + Integration-level fields (datalake_name, otf_database, etc.) come from + catalogs.yml. Model-level fields (partitioned_by, sorted_by, + tblproperties, purge_mode) come from the model's {{ config(...) }}. 
+ """ + model_config = config.config if hasattr(config, 'config') and config.config else {} + return TeradataCatalogRelation( + catalog_type=self.catalog_type, + catalog_name=self.catalog_name, + table_format=self.table_format, + file_format=self.file_format, + external_volume=self.external_volume, + datalake_name=self.datalake_name, + otf_database=self.otf_database, + partitioned_by=model_config.get("partitioned_by"), + sorted_by=model_config.get("sorted_by"), + tblproperties=model_config.get("tblproperties"), + purge_mode=model_config.get("purge_mode"), + ) diff --git a/dbt/adapters/teradata/connections.py b/dbt/adapters/teradata/connections.py index 6de4fb1c..d42c4764 100644 --- a/dbt/adapters/teradata/connections.py +++ b/dbt/adapters/teradata/connections.py @@ -431,20 +431,22 @@ def add_query( try: return SQLConnectionManager.add_query(self, sql, auto_begin, bindings, abridge_sql_log) except Exception as ex: - ignored = False - query = sql.strip() - if ("DROP view /*+ IF EXISTS */" in query) or ("DROP table /*+ IF EXISTS */" in query): - for error_number in [3807, 3854, 3853]: - if f"[Error {error_number}]" in str (ex): - ignored = True + query_upper = sql.strip().upper() + if ("DROP VIEW /*+ IF EXISTS */" in query_upper) or ("DROP TABLE /*+ IF EXISTS */" in query_upper): + # 3807 = object does not exist (standard Teradata) + # 3854 = table does not exist (standard Teradata) + # 3853 = view does not exist (standard Teradata) + # 7825 = OTF table not found in external catalog (e.g. 
Glue/Unity) + # raised by ICEBERG_EXPORT UDF when dropping a nonexistent + # DATALAKE table; safe to ignore under IF EXISTS semantics + for error_number in [3807, 3854, 3853, 7825]: + if f"[Error {error_number}]" in str(ex): return None, None - if ("DELETE DATABASE /*+ IF EXISTS */" in query) or ("DROP DATABASE /*+ IF EXISTS */" in query): + if ("DELETE DATABASE /*+ IF EXISTS */" in query_upper) or ("DROP DATABASE /*+ IF EXISTS */" in query_upper): for error_number in [3802]: - if f"[Error {error_number}]" in str (ex): - ignored = True + if f"[Error {error_number}]" in str(ex): return None, None - if not ignored: - raise # rethrow + raise # rethrow # this method will return the datatype as string @classmethod diff --git a/dbt/adapters/teradata/impl.py b/dbt/adapters/teradata/impl.py index 3b52b5b1..e468c8b7 100644 --- a/dbt/adapters/teradata/impl.py +++ b/dbt/adapters/teradata/impl.py @@ -13,6 +13,7 @@ from dbt.adapters.teradata import TeradataConnectionManager from dbt.adapters.teradata import TeradataRelation from dbt.adapters.teradata import TeradataColumn +from dbt.adapters.teradata.catalogs import TeradataDatalakeCatalogIntegration from dbt.adapters.capability import CapabilityDict, CapabilitySupport, Support, Capability from dbt.adapters.base.meta import available from dbt.adapters.base import BaseRelation @@ -55,6 +56,10 @@ class TeradataAdapter(SQLAdapter): Column = TeradataColumn ConnectionManager = TeradataConnectionManager + CATALOG_INTEGRATIONS = [ + TeradataDatalakeCatalogIntegration, + ] + CONSTRAINT_SUPPORT = { ConstraintType.check: ConstraintSupport.ENFORCED, ConstraintType.not_null: ConstraintSupport.ENFORCED, diff --git a/dbt/adapters/teradata/relation.py b/dbt/adapters/teradata/relation.py index 0fc6146b..21d377b2 100644 --- a/dbt/adapters/teradata/relation.py +++ b/dbt/adapters/teradata/relation.py @@ -1,8 +1,13 @@ from dataclasses import dataclass, field +from typing import Any, Type, TypeVar from dbt.adapters.base.relation import BaseRelation, 
Policy +from dbt.adapters.contracts.relation import HasQuoting, RelationConfig from dbt_common.exceptions import DbtRuntimeError +Self = TypeVar("Self", bound="TeradataRelation") + + @dataclass class TeradataQuotePolicy(Policy): database: bool = False @@ -22,8 +27,96 @@ class TeradataRelation(BaseRelation): quote_policy: Policy = field(default_factory=lambda: TeradataQuotePolicy()) include_policy: Policy = field(default_factory=lambda: TeradataIncludePolicy()) quote_character: str = '"' + is_otf: bool = False + + @classmethod + def create_from( + cls: Type[Self], + quoting: HasQuoting, + relation_config: RelationConfig, + **kwargs: Any, + ) -> Self: + # Check if the referenced model has catalog_name (i.e. it's an OTF table) + catalog_name = None + if hasattr(relation_config, "config") and relation_config.config: + catalog_name = ( + relation_config.config.get("catalog_name") + if hasattr(relation_config.config, "get") + else getattr(relation_config.config, "catalog_name", None) + ) + + if catalog_name: + # Lazy import to avoid a circular dependency: dbt.adapters.factory + # imports adapter classes during registration, which would re-enter + # this module if imported at module load time. + from dbt.adapters.factory import get_adapter + adapter = get_adapter(quoting) + catalog_integration = adapter.get_catalog_integration(catalog_name) + + if catalog_integration.catalog_type == "datalake": + # Build a clean kwargs dict — quote_policy/include_policy are + # set explicitly below, so any caller-supplied versions must + # be discarded to avoid "multiple values for keyword argument". + forwarded_kwargs = { + k: v for k, v in kwargs.items() + if k not in ("quote_policy", "include_policy") + } + + # Build an OTF relation with 3-part DATALAKE naming: + # .""."
" + return cls.create( + database=catalog_integration.datalake_name, + schema=catalog_integration.otf_database, + identifier=relation_config.identifier, + quote_policy={ + "database": False, # DATALAKE name is unquoted + "schema": True, # otf_database is quoted + "identifier": True, # table name is quoted + }, + include_policy={ + "database": True, # include all 3 parts + "schema": True, + "identifier": True, + }, + is_otf=True, + **forwarded_kwargs, + ) + + # Standard Teradata relation (non-OTF) + relation = super().create_from(quoting, relation_config, **kwargs) + + # In Teradata, normal objects use 2-part naming (database.object, where + # dbt's `database` and `schema` collapse to the same Teradata database). + # Only OTF objects use 3-part naming (catalog.schema.object). Therefore, + # if a relation arrives with both `database` and `schema` set to + # *different* values, it can only be an OTF reference (typically + # declared in sources.yml without a `catalog_name` model config). + if relation.database and relation.schema and relation.database != relation.schema: + return relation.replace( + include_policy=Policy(database=True, schema=True, identifier=True), + is_otf=True, + ) + + return relation def render(self): + if self.is_otf: + # OTF relations use 3-part naming: .""."
". + # The DATALAKE name (`database`) is unquoted; the OTF database and + # table name are quoted. Constructed explicitly rather than via + # BaseRelation._render_iterator() to avoid depending on a private API. + if self.database is None or self.schema is None: + raise DbtRuntimeError( + f"OTF relation is missing required part(s): " + f"database={self.database!r}, schema={self.schema!r}, " + f"identifier={self.identifier!r}" + ) + if self.identifier is None: + # Schema-only OTF relation (e.g. cache warming via + # .without_identifier()). Return a 2-part form that is + # safe to hash but never used in actual SQL. + return f'{self.database}."{self.schema}"' + return f'{self.database}."{self.schema}"."{self.identifier}"' if self.include_policy.database and self.include_policy.schema: raise DbtRuntimeError( f"Got a teradata relation with schema and database set to " diff --git a/dbt/include/teradata/macros/adapters.sql b/dbt/include/teradata/macros/adapters.sql index 8e8b3593..33b31e97 100644 --- a/dbt/include/teradata/macros/adapters.sql +++ b/dbt/include/teradata/macros/adapters.sql @@ -9,9 +9,24 @@ {% endmacro %} {% macro teradata__drop_relation(relation) -%} - {% call statement('drop_relation', auto_begin=False) -%} - DROP {{ relation.type }} /*+ IF EXISTS */ {{ relation }}; - {%- endcall %} + {%- set catalog_name = config.get('catalog_name', none) -%} + {% if catalog_name is not none %} + {#- OTF table: use 3-part naming. The purge clause is mandatory for OTF + DROP TABLE; the shared validator normalises and rejects bad values. -#} + {% set catalog_integration = adapter.get_catalog_integration(catalog_name) %} + {% if catalog_integration.catalog_type == 'datalake' %} + {{ teradata__drop_otf_table(catalog_integration, relation.identifier, config.get('purge_mode')) }} + {% else %} + {{ exceptions.raise_compiler_error( + "Unsupported catalog_type '" ~ catalog_integration.catalog_type ~ "' for drop_relation." 
+ ) }} + {% endif %} + {% else %} + {#- Standard Teradata table/view drop -#} + {% call statement('drop_relation', auto_begin=False) -%} + DROP {{ relation.type }} /*+ IF EXISTS */ {{ relation }}; + {%- endcall %} + {% endif %} {% endmacro %} {% macro teradata__truncate_relation(relation) -%} @@ -21,6 +36,32 @@ {% endmacro %} {% macro teradata__create_table_as(temporary, relation, sql) -%} + {%- set catalog_name = config.get('catalog_name', none) -%} + + {% if catalog_name is not none %} + {#- Guard against config combinations that are not supported on the OTF path. + Teradata-native options (table_kind, table_option, with_statistics, index) + do not apply to Iceberg/Delta tables and would be silently ignored if + permitted. Contract enforcement is not yet implemented for OTF. -#} + {%- set unsupported = [] -%} + {%- if config.get('table_kind') -%}{%- do unsupported.append('table_kind') -%}{%- endif -%} + {%- if config.get('table_option') -%}{%- do unsupported.append('table_option') -%}{%- endif -%} + {%- if config.get('with_statistics') -%}{%- do unsupported.append('with_statistics') -%}{%- endif -%} + {%- if config.get('index') -%}{%- do unsupported.append('index') -%}{%- endif -%} + {%- if unsupported | length > 0 -%} + {{ exceptions.raise_compiler_error( + "The following config option(s) are not supported with catalog_name (OTF): " + ~ unsupported | join(', ') + ) }} + {%- endif -%} + {%- set contract_config = config.get('contract') -%} + {%- if contract_config is not none and contract_config.enforced -%} + {{ exceptions.raise_compiler_error( + "Model contracts (contract.enforced=true) are not yet supported with catalog_name (OTF)." 
+ ) }} + {%- endif -%} + {{ teradata__create_otf_table_as(relation, sql, catalog_name) }} + {% else %} {%- set sql_header = config.get('sql_header', none) -%} {%- set table_kind = config.get('table_kind', default='') -%} {%- set table_option = config.get('table_option', default='') -%} @@ -81,6 +122,7 @@ {{ sql }} ; {% endif %} + {% endif %} {% endmacro %} {% macro teradata__create_view_as(relation, sql) -%} @@ -302,6 +344,10 @@ {%- endmacro %} {% macro teradata__create_schema(relation) -%} + {%- if relation.is_otf -%} + {#- OTF schemas live inside a DATALAKE and cannot be created via standard + Teradata DDL. They must be pre-created outside of dbt. Skip silently. -#} + {%- else -%} {%- call statement('create_schema') -%} CREATE DATABASE {{ relation.without_identifier().include(database=False) }} -- Teradata expects db sizing params on creation. This macro is probably @@ -310,10 +356,14 @@ AS PERMANENT = 60e6, -- 60MB SPOOL = 120e6; -- 120MB {%- endcall -%} + {%- endif -%} {% endmacro %} {% macro teradata__drop_schema(relation) -%} - {% if relation.schema -%} + {%- if relation.is_otf -%} + {#- OTF schemas live inside a DATALAKE and cannot be dropped via standard + Teradata DDL. Skip silently. -#} + {%- elif relation.schema -%} {{ adapter.verify_database(relation.schema) }} {%- call statement('drop_schema_delete_database') -%} DELETE DATABASE /*+ IF EXISTS */ {{ relation.without_identifier().include(database=False) }} ALL; diff --git a/dbt/include/teradata/macros/materializations/otf/create_otf_table_as.sql b/dbt/include/teradata/macros/materializations/otf/create_otf_table_as.sql new file mode 100644 index 00000000..3e8af3bf --- /dev/null +++ b/dbt/include/teradata/macros/materializations/otf/create_otf_table_as.sql @@ -0,0 +1,103 @@ +{# + OTF (Open Table Format) table creation macros for Teradata DATALAKE objects. 
+ + These macros generate CREATE TABLE AS statements targeting Iceberg/Delta Lake + tables via Teradata's native OTF support using 3-part naming: + + .""."
" + + Supported model config options: + - catalog_name : name of the catalog integration (from catalogs.yml) + - partitioned_by : partition expression, e.g. 'YEAR(dt), country' + - sorted_by : sort order, e.g. 'id ASC' + - tblproperties : Iceberg table properties, e.g. "'gc.enabled'='true'" + - purge_mode : DROP behavior -- 'NO PURGE' (default, safe) or 'PURGE ALL' +#} + + +{% macro teradata__validate_purge_mode(value) %} + {#- Normalise to upper-case and validate. Returns the validated value so + callers can use the normalised form directly. -#} + {%- set normalized = (value or 'NO PURGE') | upper -%} + {%- if normalized not in ('PURGE ALL', 'NO PURGE') -%} + {{ exceptions.raise_compiler_error( + "Invalid purge_mode '" ~ value ~ "'. Must be 'PURGE ALL' or 'NO PURGE'." + ) }} + {%- endif -%} + {{ return(normalized) }} +{% endmacro %} + + +{% macro teradata__build_otf_relation_name(catalog_integration, identifier) %} + {#- Build the 3-part OTF relation string: .""."" -#} + {{ return(catalog_integration.datalake_name ~ '."' ~ catalog_integration.otf_database ~ '"."' ~ identifier ~ '"') }} +{% endmacro %} + + +{% macro teradata__drop_otf_table(catalog_integration, identifier, purge_mode) %} + {#- Drop an OTF table. The purge clause is mandatory for OTF DROP TABLE; + the validator normalises and rejects unsupported values. -#} + {%- set validated_purge_mode = teradata__validate_purge_mode(purge_mode) -%} + {%- set otf_relation = teradata__build_otf_relation_name(catalog_integration, identifier) -%} + {% call statement('drop_otf_table', auto_begin=False) -%} + DROP TABLE /*+ IF EXISTS */ {{ otf_relation }} {{ validated_purge_mode }}; + {%- endcall %} +{% endmacro %} + + +{% macro teradata__create_otf_table_as(relation, sql, catalog_name) %} + {#- Router: dispatch to the catalog-type-specific create macro. 
-#} + {% set catalog_integration = adapter.get_catalog_integration(catalog_name) %} + + {% if catalog_integration.catalog_type == 'datalake' %} + {{ teradata__create_datalake_table_as(relation, sql, catalog_integration) }} + {% else %} + {{ exceptions.raise_compiler_error( + "Unsupported catalog_type '" ~ catalog_integration.catalog_type ~ "'." + ) }} + {% endif %} + +{% endmacro %} + + +{# Native OTF via DATALAKE: 3-part naming + ----------------------------------------------------------------- + datalake_name -> from catalog_integration (catalogs.yml) + otf_database -> from catalog_integration (catalogs.yml) + table name -> relation.identifier (the dbt model name) + relation.schema is NOT used -- it is the Teradata database, not the OTF database + + Generated SQL pattern: + DROP TABLE /*+ IF EXISTS */ dl."db"."tbl" {NO PURGE|PURGE ALL}; + CREATE TABLE dl."db"."tbl" + [PARTITIONED BY (...)] + [SORTED BY ...] + [TBLPROPERTIES(...)] + AS (...) WITH DATA; +#} + +{% macro teradata__create_datalake_table_as(relation, sql, catalog_integration) %} + {%- set sql_header = config.get('sql_header', none) -%} + {%- set otf_relation = teradata__build_otf_relation_name(catalog_integration, relation.identifier) -%} + + {# Model-level DDL config options #} + {%- set partitioned_by = config.get('partitioned_by', none) -%} + {%- set sorted_by = config.get('sorted_by', none) -%} + {%- set tblproperties = config.get('tblproperties', none) -%} + {%- set purge_mode = config.get('purge_mode') -%} + + {# Drop existing table before re-creation (idempotent via IF EXISTS hint). + Error 7825 (table not found in external catalog) is suppressed by the + adapter's add_query() error handler in connections.py. Trade-off: this + is non-atomic -- if the subsequent CREATE fails, the table is gone. 
#} + {{ teradata__drop_otf_table(catalog_integration, relation.identifier, purge_mode) }} + + {% call statement('main') %} + {{ sql_header if sql_header is not none }} + CREATE TABLE {{ otf_relation }} + {% if partitioned_by %}PARTITIONED BY ({{ partitioned_by }}){% endif %} + {% if sorted_by %}SORTED BY {{ sorted_by }}{% endif %} + {% if tblproperties %}TBLPROPERTIES({{ tblproperties }}){% endif %} + AS ({{ sql }}) WITH DATA; + {% endcall %} +{% endmacro %} diff --git a/dbt/include/teradata/macros/materializations/table/table.sql b/dbt/include/teradata/macros/materializations/table/table.sql index 228640c2..0ad9c106 100644 --- a/dbt/include/teradata/macros/materializations/table/table.sql +++ b/dbt/include/teradata/macros/materializations/table/table.sql @@ -2,7 +2,61 @@ -- calling the macro set_query_band() which will set the query_band for this materialization as per the user_configuration {% do set_query_band() %} - {% set relations = materialization_table_default() %} -- calling the default table materialization from dbt-core - {{ return(relations) }} -{%- endmaterialization -%} \ No newline at end of file + {%- set catalog_name = config.get('catalog_name', none) -%} + {% if catalog_name is not none %} + {#-- OTF models: create directly at target, skip intermediate/rename pattern. + DATALAKE tables use 3-part naming that can't be renamed via standard DDL, + so we cannot use the build-tmp-then-rename pattern from the default + table materialization. Trade-off: a failed CREATE leaves the target + dropped (non-atomic re-materialization). --#} + {#- Guard against config combinations that are not supported on the OTF path. + Teradata-native options do not apply to Iceberg/Delta tables. 
-#} + {%- set unsupported = [] -%} + {%- if config.get('table_kind') -%}{%- do unsupported.append('table_kind') -%}{%- endif -%} + {%- if config.get('table_option') -%}{%- do unsupported.append('table_option') -%}{%- endif -%} + {%- if config.get('with_statistics') -%}{%- do unsupported.append('with_statistics') -%}{%- endif -%} + {%- if config.get('index') -%}{%- do unsupported.append('index') -%}{%- endif -%} + {%- if unsupported | length > 0 -%} + {{ exceptions.raise_compiler_error( + "The following config option(s) are not supported with catalog_name (OTF): " + ~ unsupported | join(', ') + ) }} + {%- endif -%} + + {%- set existing_relation = load_cached_relation(this) -%} + {%- set target_relation = this.incorporate(type='table') -%} + + {{ run_hooks(pre_hooks) }} + + {#- Drop existing relation up front. teradata__drop_relation reads the + current model's catalog_name config, so the drop targets the OTF + table (not a hypothetical native table at the same name). -#} + {% if existing_relation is not none %} + {% do adapter.drop_relation(existing_relation) %} + {% endif %} + + {{ teradata__create_otf_table_as(target_relation, sql, catalog_name) }} + + {{ run_hooks(post_hooks) }} + + {% do persist_docs(target_relation, model) %} + + {#- Teradata does not support GRANT on OTF tables (3-part names are + invalid in GRANT syntax, and OTF objects are not in DBC.AllRights). + Access control for OTF tables is managed via AUTHORIZATION objects + and external IAM/OAuth policies. 
-#} + {%- if config.get('grants') -%} + {{ exceptions.warn("grants config is ignored for OTF models — Teradata does not support GRANT on DATALAKE tables.") }} + {%- endif -%} + + {% do adapter.commit() %} + {% do adapter.cache_added(target_relation) %} + + {{ return({'relations': [target_relation]}) }} + {% else %} + {% set relations = materialization_table_default() %} -- calling the default table materialization from dbt-core + {{ return(relations) }} + {% endif %} + +{%- endmaterialization -%} diff --git a/pytest.ini b/pytest.ini index d01ba123..810be28e 100644 --- a/pytest.ini +++ b/pytest.ini @@ -8,4 +8,5 @@ filterwarnings = # rather than passing them in every CLI command, or setting in `PYTEST_ADDOPTS` # be sure to add "test.env" to .gitignore as well! testpaths = + tests/unit # fast unit tests, no database required tests/functional # name per convention \ No newline at end of file diff --git a/requirements_dev.txt b/requirements_dev.txt index 50e8d8cc..a8669138 100644 --- a/requirements_dev.txt +++ b/requirements_dev.txt @@ -10,6 +10,7 @@ pylava~=0.3.0 teradatasql>=20.00.00.10 dbt-adapters>=1.17.2 dbt-common>=1.13,<2.0 +dbt-core MarkupSafe==2.0.1 pytest-dotenv pytest-cov diff --git a/tests/conftest.py b/tests/conftest.py index efc446be..35a1d383 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -8,6 +8,24 @@ #load_dotenv("../test.env") +# Environment variables read by the test suite: +# +# Connection (with defaults for Vantage Express running on localhost): +# DBT_TERADATA_SERVER_NAME -- hostname (default: 'localhost') +# DBT_TERADATA_USERNAME -- username (default: 'dbc') +# DBT_TERADATA_PASSWORD -- password (default: 'dbc') +# DBT_TERADATA_TMODE -- transaction mode (default: 'ANSI') +# +# Grants tests (created automatically if set): +# DBT_TEST_USER_1 +# DBT_TEST_USER_2 +# DBT_TEST_USER_3 +# +# OTF / DATALAKE tests (tests/functional/adapter/test_otf_integration.py). 
+# When BOTH are set, OTF integration tests run; otherwise they are skipped: +# DBT_TERADATA_DATALAKE -- pre-created DATALAKE object name +# DBT_TERADATA_OTF_DATABASE -- pre-created OTF database within the DATALAKE + # Import the standard functional fixtures as a plugin # Note: fixtures with session scope need to be locals pytest_plugins = ["dbt.tests.fixtures.project"] diff --git a/tests/functional/adapter/test_otf_integration.py b/tests/functional/adapter/test_otf_integration.py new file mode 100644 index 00000000..efb68c45 --- /dev/null +++ b/tests/functional/adapter/test_otf_integration.py @@ -0,0 +1,287 @@ +"""End-to-end functional tests for OTF (Iceberg/Delta Lake) materialization. + +These tests require a Teradata instance with a pre-created DATALAKE object +and OTF database. They are gated on env vars and skipped otherwise so the +file is safe to include in the standard pytest run. + +Required env vars: + DBT_TERADATA_DATALAKE -- name of the pre-created DATALAKE object + DBT_TERADATA_OTF_DATABASE -- name of the pre-created OTF database within + that DATALAKE + +The standard DBT_TERADATA_* connection env vars (or their defaults from +tests/conftest.py) are used for the Teradata connection. + +Scenarios covered (see Phase 4.4 of PR_237_REMEDIATION_PLAN.md): + 1. Basic OTF table create + 2. Idempotency (DROP+CREATE cycle survives re-runs) + 3. Cross-model ref() to an OTF model produces 3-part compiled SQL + 4. Cross-model source() with database/schema set produces 3-part SQL + (verifies the database != schema auto-OTF heuristic) + 5. grants config is ignored with a warning (no test in this file) + 6. 
purge_mode: 'NO PURGE' end-to-end +""" + +import os + +import pytest + +from dbt.tests.adapter.catalog_integrations.test_catalog_integration import ( + BaseCatalogIntegrationValidation, +) +from dbt.tests.util import run_dbt + + +DATALAKE_NAME = os.getenv("DBT_TERADATA_DATALAKE") +OTF_DATABASE = os.getenv("DBT_TERADATA_OTF_DATABASE") + +pytestmark = pytest.mark.skipif( + not (DATALAKE_NAME and OTF_DATABASE), + reason="requires DBT_TERADATA_DATALAKE and DBT_TERADATA_OTF_DATABASE env vars", +) + + +CATALOG_NAME = "test_catalog" + +CATALOGS_CONFIG = { + "catalogs": [ + { + "name": CATALOG_NAME, + "active_write_integration": "td_datalake", + "write_integrations": [ + { + "name": "td_datalake", + "catalog_type": "datalake", + "adapter_properties": { + "datalake_name": DATALAKE_NAME, + "otf_database": OTF_DATABASE, + }, + } + ], + } + ] +} + + +# --------------------------------------------------------------------------- +# Model SQL fixtures +# --------------------------------------------------------------------------- + +basic_otf_model_sql = f""" +{{{{ config( + materialized='table', + catalog_name='{CATALOG_NAME}' +) }}}} +select id, name from {{{{ target.schema }}}}.otf_src +""" + +otf_with_purge_mode_sql = f""" +{{{{ config( + materialized='table', + catalog_name='{CATALOG_NAME}', + purge_mode='NO PURGE' +) }}}} +select id from {{{{ target.schema }}}}.otf_src +""" + +downstream_of_otf_sql = """ +{{ config(materialized='table') }} +select * from {{ ref('basic_otf') }} +""" + +source_referencing_otf_sql = """ +{{ config(materialized='table') }} +select * from {{ source('otf_source', 'external_otf_table') }} +""" + +sources_yml = f""" +version: 2 +sources: + - name: otf_source + database: {DATALAKE_NAME or 'placeholder'} + schema: {OTF_DATABASE or 'placeholder'} + tables: + - name: external_otf_table +""" + + +# =================================================================== +# Scenarios 1 & 2: basic create + idempotency +# 
=================================================================== + +class TestOTFBasicAndIdempotent(BaseCatalogIntegrationValidation): + @pytest.fixture(scope="class") + def catalogs(self): + return CATALOGS_CONFIG + + @pytest.fixture(scope="class") + def models(self): + return {"basic_otf.sql": basic_otf_model_sql} + + def test_basic_create_and_rerun(self, project): + project.run_sql( + "CREATE TABLE {schema}.otf_src (id INTEGER, name VARCHAR(100))" + ) + project.run_sql("INSERT INTO {schema}.otf_src VALUES (1, 'a')") + project.run_sql("INSERT INTO {schema}.otf_src VALUES (2, 'b')") + try: + # First run: creates the OTF table. + results = run_dbt(["run", "--select", "basic_otf"]) + assert len(results) == 1 + assert results[0].status == "success" + + # Second run: must succeed idempotently (DROP+CREATE cycle). + results = run_dbt(["run", "--select", "basic_otf"]) + assert len(results) == 1 + assert results[0].status == "success" + finally: + project.run_sql("DROP TABLE {schema}.otf_src") + + +# =================================================================== +# Scenario 3: cross-model ref() produces 3-part name in compiled SQL +# =================================================================== + +class TestOTFRefRendersThreePartName(BaseCatalogIntegrationValidation): + @pytest.fixture(scope="class") + def catalogs(self): + return CATALOGS_CONFIG + + @pytest.fixture(scope="class") + def models(self): + return { + "basic_otf.sql": basic_otf_model_sql, + "downstream_of_otf.sql": downstream_of_otf_sql, + } + + def test_compiled_sql_contains_three_part_name(self, project): + run_dbt(["compile", "--select", "downstream_of_otf"]) + compiled_path = os.path.join( + str(project.project_root), + "target", "compiled", "test", "models", "downstream_of_otf.sql", + ) + with open(compiled_path, "r", encoding="utf-8") as f: + compiled = f.read() + # 3-part: datalake."otf_database"."table"
+ expected = f'{DATALAKE_NAME}."{OTF_DATABASE}"."basic_otf"' + assert expected in compiled, ( + f"Expected 3-part OTF name {expected!r} in compiled SQL, got:\n{compiled}" + ) + + +# =================================================================== +# Scenario 4: source() with database != schema triggers OTF heuristic +# =================================================================== + +class TestOTFSourceHeuristic(BaseCatalogIntegrationValidation): + @pytest.fixture(scope="class") + def catalogs(self): + return CATALOGS_CONFIG + + @pytest.fixture(scope="class") + def models(self): + return {"source_referencing_otf.sql": source_referencing_otf_sql} + + @pytest.fixture(scope="class") + def properties(self): + return {"sources.yml": sources_yml} + + def test_source_compiles_to_three_part_name(self, project): + # Compile only -- we don't require the source to actually exist. + run_dbt(["compile", "--select", "source_referencing_otf"]) + compiled_path = os.path.join( + str(project.project_root), + "target", "compiled", "test", "models", "source_referencing_otf.sql", + ) + with open(compiled_path, "r", encoding="utf-8") as f: + compiled = f.read() + expected = f'{DATALAKE_NAME}."{OTF_DATABASE}"."external_otf_table"' + assert expected in compiled, ( + f"Expected 3-part OTF source name {expected!r} in compiled SQL, " + f"got:\n{compiled}" + ) + + +# =================================================================== +# Scenario 6: purge_mode: 'NO PURGE' end-to-end +# =================================================================== + +class TestOTFNoPurge(BaseCatalogIntegrationValidation): + @pytest.fixture(scope="class") + def catalogs(self): + return CATALOGS_CONFIG + + @pytest.fixture(scope="class") + def models(self): + return {"otf_no_purge.sql": otf_with_purge_mode_sql} + + def test_no_purge_run_succeeds(self, project): + project.run_sql( + "CREATE TABLE {schema}.otf_src (id INTEGER, name VARCHAR(100))" + ) + project.run_sql("INSERT INTO {schema}.otf_src
VALUES (1, 'a')") + try: + results = run_dbt(["run", "--select", "otf_no_purge"]) + assert len(results) == 1 + assert results[0].status == "success" + # Re-run to exercise the DROP path with NO PURGE. + results = run_dbt(["run", "--select", "otf_no_purge"]) + assert results[0].status == "success" + finally: + project.run_sql("DROP TABLE {schema}.otf_src") + + +# =================================================================== +# Phase 2 guardrails: unsupported config combinations +# (these do not require a real DATALAKE, but live here to keep all OTF +# scenarios in one file, so the module-level skipif also gates them on env vars) +# =================================================================== + +unsupported_table_kind_sql = f""" +{{{{ config( + materialized='table', + catalog_name='{CATALOG_NAME}', + table_kind='SET' +) }}}} +select 1 as id +""" + +invalid_purge_mode_sql = f""" +{{{{ config( + materialized='table', + catalog_name='{CATALOG_NAME}', + purge_mode='SOFT' +) }}}} +select 1 as id +""" + + +class TestOTFCompileTimeErrors(BaseCatalogIntegrationValidation): + """These exercise guardrails for unsupported config combinations on the OTF + path. The checks fire inside the materialization macro, so they require + ``dbt run`` (not ``compile``). A real DATALAKE does not need to exist + because the error is raised before any SQL is sent to the database. 
+ """ + + @pytest.fixture(scope="class") + def catalogs(self): + return CATALOGS_CONFIG + + @pytest.fixture(scope="class") + def models(self): + return { + "bad_table_kind.sql": unsupported_table_kind_sql, + "bad_purge_mode.sql": invalid_purge_mode_sql, + } + + def test_table_kind_with_catalog_name_fails(self, project): + results = run_dbt( + ["run", "--select", "bad_table_kind"], expect_pass=False + ) + assert any("table_kind" in str(r.message or "") for r in results) + + def test_invalid_purge_mode_fails(self, project): + results = run_dbt( + ["run", "--select", "bad_purge_mode"], expect_pass=False + ) + assert any("purge_mode" in str(r.message or "").lower() for r in results) diff --git a/tests/unit/__init__.py b/tests/unit/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/unit/test_otf_catalogs.py b/tests/unit/test_otf_catalogs.py new file mode 100644 index 00000000..14158af8 --- /dev/null +++ b/tests/unit/test_otf_catalogs.py @@ -0,0 +1,380 @@ +"""Unit tests for Teradata OTF (Open Table Format) catalog integration. + +Covers: + - TeradataCatalogRelation dataclass fields and defaults + - TeradataDatalakeCatalogIntegration initialization and validation + - build_relation() config passthrough + - Adapter CATALOG_INTEGRATIONS registration + - TeradataRelation.render() for both 2-part (native) and 3-part (OTF) names + - is_otf invariant guard in render() + +These tests are pure unit tests (no database required) and live in tests/unit/ +rather than tests/functional/ so they can run in a fast CI matrix without +provisioning Vantage Express. 
+""" + +import pytest +from dataclasses import asdict +from unittest.mock import MagicMock + +from dbt.adapters.catalogs import ( + CatalogIntegrationConfig, + InvalidCatalogIntegrationConfigError, +) +from dbt_common.exceptions import DbtRuntimeError + +from dbt.adapters.teradata.catalogs import ( + TeradataDatalakeCatalogIntegration, + TeradataCatalogRelation, +) +from dbt.adapters.teradata.relation import TeradataRelation + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _make_config(**overrides): + """Build a spec'd MagicMock of CatalogIntegrationConfig. + + Using spec= ensures the mock fails on attribute access for anything + not on the protocol — masking-by-MagicMock would defeat the test. + """ + config = MagicMock(spec=CatalogIntegrationConfig) + config.name = overrides.get("name", "test_catalog") + config.catalog_type = overrides.get("catalog_type", "datalake") + config.catalog_name = overrides.get("catalog_name", None) + config.table_format = overrides.get("table_format", None) + config.external_volume = overrides.get("external_volume", None) + config.file_format = overrides.get("file_format", None) + config.adapter_properties = overrides.get("adapter_properties", {}) + return config + + +def _make_valid_config(**overrides): + defaults = { + "adapter_properties": { + "datalake_name": "my_datalake", + "otf_database": "my_otf_db", + }, + } + defaults.update(overrides) + return _make_config(**defaults) + + +# =================================================================== +# TeradataCatalogRelation +# =================================================================== + +class TestTeradataCatalogRelation: + def test_defaults(self): + rel = TeradataCatalogRelation() + assert rel.catalog_type is None + assert rel.catalog_name is None + assert rel.table_format is None + assert rel.file_format is None + assert rel.external_volume 
is None + assert rel.datalake_name is None + assert rel.otf_database is None + assert rel.partitioned_by is None + assert rel.sorted_by is None + assert rel.tblproperties is None + assert rel.purge_mode is None + + def test_all_fields(self): + rel = TeradataCatalogRelation( + catalog_type="datalake", + catalog_name="cat", + table_format="iceberg", + file_format="parquet", + external_volume="vol", + datalake_name="dl", + otf_database="otf_db", + partitioned_by="YEAR(dt)", + sorted_by="id ASC", + tblproperties="'k'='v'", + purge_mode="NO PURGE", + ) + assert rel.catalog_type == "datalake" + assert rel.datalake_name == "dl" + assert rel.otf_database == "otf_db" + assert rel.partitioned_by == "YEAR(dt)" + assert rel.sorted_by == "id ASC" + assert rel.tblproperties == "'k'='v'" + assert rel.purge_mode == "NO PURGE" + + def test_is_dataclass(self): + rel = TeradataCatalogRelation(datalake_name="dl", otf_database="db") + d = asdict(rel) + assert d["datalake_name"] == "dl" + assert d["otf_database"] == "db" + assert "partitioned_by" in d + + def test_purge_mode_no_purge(self): + rel = TeradataCatalogRelation(purge_mode="NO PURGE") + assert rel.purge_mode == "NO PURGE" + + def test_purge_mode_purge_all(self): + rel = TeradataCatalogRelation(purge_mode="PURGE ALL") + assert rel.purge_mode == "PURGE ALL" + + +# =================================================================== +# TeradataDatalakeCatalogIntegration -- __init__ +# =================================================================== + +class TestTeradataDatalakeCatalogIntegration: + # -- happy path -- + + def test_init_with_valid_config(self): + integration = TeradataDatalakeCatalogIntegration(_make_valid_config()) + assert integration.datalake_name == "my_datalake" + assert integration.otf_database == "my_otf_db" + assert integration.catalog_type == "datalake" + assert integration.allows_writes is True + assert integration.table_format == "iceberg" + assert integration.file_format == "parquet" + + def 
test_class_level_defaults(self): + assert TeradataDatalakeCatalogIntegration.catalog_type == "datalake" + assert TeradataDatalakeCatalogIntegration.allows_writes is True + assert TeradataDatalakeCatalogIntegration.table_format == "iceberg" + assert TeradataDatalakeCatalogIntegration.file_format == "parquet" + + def test_file_format_defaults_to_parquet_when_none(self): + integration = TeradataDatalakeCatalogIntegration(_make_valid_config(file_format=None)) + assert integration.file_format == "parquet" + + def test_file_format_override(self): + integration = TeradataDatalakeCatalogIntegration(_make_valid_config(file_format="orc")) + assert integration.file_format == "orc" + + def test_table_format_override(self): + integration = TeradataDatalakeCatalogIntegration(_make_valid_config(table_format="delta")) + assert integration.table_format == "delta" + + def test_catalog_name_passed_through(self): + integration = TeradataDatalakeCatalogIntegration(_make_valid_config(catalog_name="glue_catalog")) + assert integration.catalog_name == "glue_catalog" + + # -- validation errors -- + + def test_init_missing_datalake_name_raises(self): + config = _make_config(adapter_properties={"otf_database": "my_otf_db"}) + with pytest.raises(InvalidCatalogIntegrationConfigError): + TeradataDatalakeCatalogIntegration(config) + + def test_init_missing_otf_database_raises(self): + config = _make_config(adapter_properties={"datalake_name": "my_datalake"}) + with pytest.raises(InvalidCatalogIntegrationConfigError): + TeradataDatalakeCatalogIntegration(config) + + def test_init_empty_adapter_properties_raises(self): + with pytest.raises(InvalidCatalogIntegrationConfigError): + TeradataDatalakeCatalogIntegration(_make_config(adapter_properties={})) + + def test_init_none_adapter_properties_raises(self): + # Pinned to InvalidCatalogIntegrationConfigError only: the production + # code coerces None via `or {}`, so this is the only path that fires. 
+ with pytest.raises(InvalidCatalogIntegrationConfigError): + TeradataDatalakeCatalogIntegration(_make_config(adapter_properties=None)) + + def test_init_empty_string_datalake_name_raises(self): + config = _make_config(adapter_properties={"datalake_name": "", "otf_database": "db"}) + with pytest.raises(InvalidCatalogIntegrationConfigError): + TeradataDatalakeCatalogIntegration(config) + + def test_init_empty_string_otf_database_raises(self): + config = _make_config(adapter_properties={"datalake_name": "dl", "otf_database": ""}) + with pytest.raises(InvalidCatalogIntegrationConfigError): + TeradataDatalakeCatalogIntegration(config) + + def test_error_message_mentions_datalake_name(self): + with pytest.raises(InvalidCatalogIntegrationConfigError, match="datalake_name"): + TeradataDatalakeCatalogIntegration(_make_config(adapter_properties={"otf_database": "db"})) + + def test_error_message_mentions_otf_database(self): + with pytest.raises(InvalidCatalogIntegrationConfigError, match="otf_database"): + TeradataDatalakeCatalogIntegration(_make_config(adapter_properties={"datalake_name": "dl"})) + + +# =================================================================== +# build_relation() +# =================================================================== + +class TestBuildRelation: + def _make_integration(self, **config_overrides): + return TeradataDatalakeCatalogIntegration(_make_valid_config(**config_overrides)) + + def test_basic_build(self): + integration = self._make_integration( + catalog_name="glue_catalog", + adapter_properties={ + "datalake_name": "my_glue_datalake", + "otf_database": "my_otf_db", + }, + ) + relation_config = MagicMock() + relation_config.config = {} + result = integration.build_relation(relation_config) + + assert isinstance(result, TeradataCatalogRelation) + assert result.datalake_name == "my_glue_datalake" + assert result.otf_database == "my_otf_db" + assert result.catalog_type == "datalake" + assert result.catalog_name == "glue_catalog" + 
assert result.table_format == "iceberg" + assert result.file_format == "parquet" + assert result.partitioned_by is None + assert result.sorted_by is None + assert result.tblproperties is None + assert result.purge_mode is None + + def test_with_all_ddl_options(self): + integration = self._make_integration() + relation_config = MagicMock() + relation_config.config = { + "partitioned_by": "YEAR(dt)", + "sorted_by": "id ASC", + "tblproperties": "'write.format.default'='parquet'", + "purge_mode": "NO PURGE", + } + result = integration.build_relation(relation_config) + assert result.partitioned_by == "YEAR(dt)" + assert result.sorted_by == "id ASC" + assert result.tblproperties == "'write.format.default'='parquet'" + assert result.purge_mode == "NO PURGE" + + def test_with_partitioned_by_only(self): + integration = self._make_integration() + relation_config = MagicMock() + relation_config.config = {"partitioned_by": "MONTH(created_at)"} + result = integration.build_relation(relation_config) + assert result.partitioned_by == "MONTH(created_at)" + assert result.sorted_by is None + assert result.tblproperties is None + + def test_with_sorted_by_only(self): + integration = self._make_integration() + relation_config = MagicMock() + relation_config.config = {"sorted_by": "ts DESC"} + result = integration.build_relation(relation_config) + assert result.partitioned_by is None + assert result.sorted_by == "ts DESC" + + def test_with_tblproperties_only(self): + integration = self._make_integration() + relation_config = MagicMock() + relation_config.config = {"tblproperties": "'gc.enabled'='true'"} + result = integration.build_relation(relation_config) + assert result.tblproperties == "'gc.enabled'='true'" + assert result.partitioned_by is None + + def test_with_purge_mode_only(self): + integration = self._make_integration() + relation_config = MagicMock() + relation_config.config = {"purge_mode": "NO PURGE"} + result = integration.build_relation(relation_config) + assert 
result.purge_mode == "NO PURGE" + assert result.partitioned_by is None + + def test_no_config_attr(self): + integration = self._make_integration() + relation_config = MagicMock(spec=[]) + result = integration.build_relation(relation_config) + assert result.partitioned_by is None + assert result.purge_mode is None + + def test_none_config_attr(self): + integration = self._make_integration() + relation_config = MagicMock() + relation_config.config = None + result = integration.build_relation(relation_config) + assert result.partitioned_by is None + assert result.purge_mode is None + + +# =================================================================== +# Adapter registration +# =================================================================== + +class TestAdapterCatalogRegistration: + def test_only_datalake_registered(self): + from dbt.adapters.teradata.impl import TeradataAdapter + assert len(TeradataAdapter.CATALOG_INTEGRATIONS) == 1 + assert TeradataAdapter.CATALOG_INTEGRATIONS[0] is TeradataDatalakeCatalogIntegration + + +# =================================================================== +# TeradataRelation.render() -- 2-part native vs 3-part OTF +# =================================================================== + +class TestTeradataRelationRender: + """Exercise render() directly to assert the actual rendered string. + + Replaces the old TestThreeDotNaming, which only re-implemented the Jinja + formula in Python and so caught nothing. 
+ """ + + # -- 2-part native -- + + def test_native_render_two_part(self): + rel = TeradataRelation.create(schema="mydb", identifier="mytbl") + assert rel.render() == '"mydb"."mytbl"' + + def test_native_render_database_none_schema_set(self): + rel = TeradataRelation.create(database=None, schema="db", identifier="t") + assert rel.render() == '"db"."t"' + + # -- 3-part OTF (explicit is_otf=True via create()) -- + + def test_otf_render_three_part(self): + rel = TeradataRelation.create( + database="dl", + schema="db", + identifier="t", + quote_policy={"database": False, "schema": True, "identifier": True}, + include_policy={"database": True, "schema": True, "identifier": True}, + is_otf=True, + ) + assert rel.render() == 'dl."db"."t"' + + def test_otf_render_with_underscores_and_hyphens(self): + rel = TeradataRelation.create( + database="dl_1", + schema="db-2", + identifier="tbl 3", + quote_policy={"database": False, "schema": True, "identifier": True}, + include_policy={"database": True, "schema": True, "identifier": True}, + is_otf=True, + ) + assert rel.render() == 'dl_1."db-2"."tbl 3"' + + # -- Invariant guard: is_otf=True with a None part must raise -- + + def test_otf_render_missing_database_raises(self): + rel = TeradataRelation.create( + database=None, schema="db", identifier="t", + include_policy={"database": True, "schema": True, "identifier": True}, + is_otf=True, + ) + with pytest.raises(DbtRuntimeError, match="OTF relation is missing"): + rel.render() + + def test_otf_render_missing_schema_raises(self): + rel = TeradataRelation.create( + database="dl", schema=None, identifier="t", + include_policy={"database": True, "schema": True, "identifier": True}, + is_otf=True, + ) + with pytest.raises(DbtRuntimeError, match="OTF relation is missing"): + rel.render() + + def test_otf_render_missing_identifier_returns_two_part(self): + rel = TeradataRelation.create( + database="dl", schema="db", identifier=None, + include_policy={"database": True, "schema": True, 
"identifier": True}, + is_otf=True, + ) + # Schema-only OTF relation (e.g. cache warming) returns 2-part form + assert rel.render() == 'dl."db"'
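As a reading aid for the `TestTeradataRelationRender` cases and the `purge_mode` handling described in this change, the rules they assert can be sketched in plain Python. This is a minimal illustration, not the adapter's code: `render_otf_name` and `normalize_purge_mode` are hypothetical helpers, `ValueError` stands in for the dbt exception types, and the `NO PURGE` default is taken from the changelog entry.

```python
from typing import Optional

# Allowed values, as named in the generated SQL pattern {NO PURGE|PURGE ALL}.
ALLOWED_PURGE_MODES = ("NO PURGE", "PURGE ALL")


def render_otf_name(
    datalake: Optional[str],
    otf_database: Optional[str],
    table: Optional[str],
) -> str:
    """Hypothetical helper: render datalake."otf_database"."table".

    The datalake part is left unquoted and the other parts are quoted,
    matching the quote policy used in the render() tests.
    """
    if not datalake or not otf_database:
        # Stand-in for the "OTF relation is missing ..." runtime error.
        raise ValueError("OTF relation is missing a datalake or OTF database")
    parts = [datalake, f'"{otf_database}"']
    if table is not None:
        # Without an identifier the tests expect the 2-part form.
        parts.append(f'"{table}"')
    return ".".join(parts)


def normalize_purge_mode(value: Optional[str]) -> str:
    """Hypothetical helper: case-insensitive validation, NO PURGE by default."""
    if value is None:
        return "NO PURGE"
    normalized = " ".join(value.upper().split())
    if normalized not in ALLOWED_PURGE_MODES:
        raise ValueError(
            f"Invalid purge_mode {value!r}; expected one of {ALLOWED_PURGE_MODES}"
        )
    return normalized


if __name__ == "__main__":
    name = render_otf_name("dl", "db", "t")
    mode = normalize_purge_mode("no purge")
    # Mirrors the DROP statement shape shown in the macro comment.
    print(f"DROP TABLE /*+ IF EXISTS */ {name} {mode};")
```

Keeping the name rendering and the `purge_mode` check in small pure functions like these is what lets the unit tests run without a database; the real adapter splits the same responsibilities between `TeradataRelation.render()` and the shared Jinja helper macros.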