-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathoptional.sql
More file actions
83 lines (74 loc) · 3.29 KB
/
optional.sql
File metadata and controls
83 lines (74 loc) · 3.29 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
{%- macro optional(relation, optional_column, data_type) -%}
{# When the relation is not defined (optional tables), set the columns and relation to empty #}
{%- if load_relation(relation) is none -%}
{%- set columns = [] -%}
{%- set relation = null -%}
{%- else -%}
{%- set columns = adapter.get_columns_in_relation(relation) -%}
{%- endif -%}
{# Check if relation is a source based on whether the relation's schema and identifier is defined as source.
Only check when relation exists to prevent dbt compile errors. #}
{%- set ns = namespace(is_source_relation = false) -%}
{% if execute and relation != null %}
{% for node in graph.sources.values() -%}
{% if node.schema == relation.schema and node.identifier == relation.identifier %}
{% set ns.is_source_relation = true %}
{% endif %}
{% endfor %}
{% endif %}
{# Create list of column names.#}
{%- set column_names = [] -%}
{%- for column in columns -%}
{%- set column_names = column_names.append('"' + column.name + '"') -%}
{%- endfor -%}
{# When the column is in the list, use the column, otherwise create the column with null values.#}
{%- if optional_column in column_names -%}
{% set column_value = optional_column -%}
{%- else -%}
{% set column_value = 'null' -%}
{%- endif -%}
{# Apply casting when relation is a source or when the field doesn't exist and is being created. #}
{% if ns.is_source_relation or column_value == 'null' %}
{%- if data_type == 'boolean' -%}
{{ pm_utils.to_boolean(column_value, relation) }}
{%- elif data_type == 'date' -%}
{{ pm_utils.to_date(column_value, relation) }}
{%- elif data_type == 'double' -%}
{{ pm_utils.to_double(column_value, relation) }}
{%- elif data_type == 'integer' -%}
{{ pm_utils.to_integer(column_value, relation) }}
{%- elif data_type == 'datetime' -%}
{{ pm_utils.to_timestamp(column_value, relation) }}
{%- elif data_type == 'text' -%}
{{ pm_utils.to_varchar(column_value) }}
{# Generate an id when the column doesn't exist or only contains null values. #}
{%- elif data_type == 'id' -%}
{% if relation is defined %}
{% set query %}
select
count(*) as "total_count",
coalesce(sum(case when {{ column_value }} is null then 1 else 0 end), 0) as "null_records"
from "{{ relation.database }}"."{{ relation.schema }}"."{{ relation.identifier }}"
{% endset %}
{% set result_query = run_query(query) %}
{% if execute %}
{% set record_count =
result_query.columns['total_count'].values()[0]
- result_query.columns['null_records'].values()[0] %}
{% else %}
{% set record_count = 0 %}
{% endif %}
{% if optional_column in column_names and record_count > 0 %}
{{ column_value }}
{% else %}
{{ pm_utils.id() }}
{% endif %}
{% endif %}
{%- else -%}
{{ pm_utils.to_varchar(column_value) }}
{%- endif -%}
{# Don't apply casting when relation is not a source. Casting should already have been done in a previous transformation step. #}
{%- else -%}
{{ column_value }}
{%- endif -%}
{%- endmacro -%}