diff --git a/.github/pull_request_template.md b/.github/pull_request_template.md index 1888f576..3ce860d8 100644 --- a/.github/pull_request_template.md +++ b/.github/pull_request_template.md @@ -13,6 +13,7 @@ - [ ] New features (breaking change) - [ ] Other (non-breaking change) - [ ] Other (breaking change) +- [ ] Release preparation ## What does this solve? diff --git a/.github/workflows/ci_lint_package.yml b/.github/workflows/ci_lint_package.yml index bbd3264c..2563e0cf 100644 --- a/.github/workflows/ci_lint_package.yml +++ b/.github/workflows/ci_lint_package.yml @@ -43,8 +43,14 @@ jobs: with: ref: ${{ github.event.pull_request.head.sha }} # Check out the code of the PR + - name: Setup Python + uses: actions/setup-python@v4 + with: + python-version: "3.9.x" + architecture: "x64" + - name: Install Python packages - run: python -m pip install dbt-snowflake~=1.5.0 sqlfluff-templater-dbt + run: python -m pip install dbt-snowflake~=1.7.0 sqlfluff-templater-dbt~=2.3.2 - name: Test database connection run: dbt debug diff --git a/.github/workflows/ci_test_package.yml b/.github/workflows/ci_test_package.yml index 34c47163..034a316f 100644 --- a/.github/workflows/ci_test_package.yml +++ b/.github/workflows/ci_test_package.yml @@ -21,7 +21,7 @@ env: DBT_ENV_SECRET_DATABRICKS_TOKEN: ${{ secrets.DATABRICKS_TOKEN }} DBT_ENV_SECRET_GCP_PROJECT: ${{ secrets.GCP_PROJECT }} # Env var to test version - LAST_RELEASE_SUPPORTED_DBT_VERSION: 1_4_0 # A dbt version supported by both the last release and this one + LAST_RELEASE_SUPPORTED_DBT_VERSION: 1_7_0 # A dbt version supported by both the last release and this one # Env vars to test invocations model DBT_CLOUD_PROJECT_ID: 123 DBT_CLOUD_JOB_ID: ABC @@ -37,7 +37,7 @@ jobs: strategy: fail-fast: false # Don't fail one DWH if the others fail matrix: - warehouse: ["snowflake", "bigquery"] + warehouse: ["snowflake", "bigquery", "postgres"] runs-on: ubuntu-latest environment: name: Approve Integration Tests @@ -45,6 +45,19 @@ jobs: contents: "read" id-token: "write" + services: + postgres: + image: postgres + env: + POSTGRES_PASSWORD: postgres + options: >- + --health-cmd pg_isready + --health-interval 10s + --health-timeout 5s + --health-retries 5 + ports: + - 5432:5432 + steps: - name: Get latest release uses: rez0n/actions-github-release@main @@ -100,9 +113,9 @@ jobs: strategy: fail-fast: false # Don't fail one DWH if the others fail matrix: - warehouse: ["snowflake", "bigquery"] + warehouse: ["snowflake", "bigquery", "postgres"] # When supporting a new version, update the list here - version: ["1_3_0", "1_4_0", "1_5_0"] + version: ["1_3_0", "1_4_0", "1_5_0", "1_6_0", "1_7_0"] runs-on: ubuntu-latest environment: name: Approve Integration Tests @@ -110,6 +123,19 @@ jobs: contents: "read" id-token: "write" + services: + postgres: + image: postgres + env: + POSTGRES_PASSWORD: postgres + options: >- + --health-cmd pg_isready + --health-interval 10s + --health-timeout 5s + --health-retries 5 + ports: + - 5432:5432 + steps: - uses: actions/setup-python@v4 with: diff --git a/.github/workflows/main_lint_package.yml b/.github/workflows/main_lint_package.yml index e3d25dd0..771d185f 100644 --- a/.github/workflows/main_lint_package.yml +++ b/.github/workflows/main_lint_package.yml @@ -39,8 +39,14 @@ jobs: - name: Checkout branch uses: actions/checkout@v3 + - name: Setup Python + uses: actions/setup-python@v4 + with: + python-version: "3.9.x" + architecture: "x64" + - name: Install Python packages - run: python -m pip install dbt-snowflake~=1.5.0 sqlfluff-templater-dbt + run: python -m pip install dbt-snowflake~=1.7.0 sqlfluff-templater-dbt~=2.3.2 - name: Test database connection run: dbt debug diff --git a/.github/workflows/main_test_package.yml b/.github/workflows/main_test_package.yml index 2202c05c..c8e503b5 100644 --- a/.github/workflows/main_test_package.yml +++ b/.github/workflows/main_test_package.yml @@ -34,13 +34,26 @@ jobs: integration: strategy: matrix: - warehouse: ["snowflake", "bigquery"] - version: ["1_3_0", "1_4_0", "1_5_0"] + warehouse: ["snowflake", "bigquery", "postgres"] + version: ["1_3_0", "1_4_0", "1_5_0", "1_6_0", "1_7_0"] runs-on: ubuntu-latest permissions: contents: "read" id-token: "write" + services: + postgres: + image: postgres + env: + POSTGRES_PASSWORD: postgres + options: >- + --health-cmd pg_isready + --health-interval 10s + --health-timeout 5s + --health-retries 5 + ports: + - 5432:5432 + steps: - name: Checkout uses: actions/checkout@v3 diff --git a/.github/workflows/publish_docs_on_release.yml b/.github/workflows/publish_docs_on_release.yml index cb588373..b02ee057 100644 --- a/.github/workflows/publish_docs_on_release.yml +++ b/.github/workflows/publish_docs_on_release.yml @@ -39,7 +39,7 @@ jobs: uses: actions/checkout@v3 - name: Install Python packages - run: python -m pip install dbt-snowflake~=1.3.0 + run: python -m pip install dbt-snowflake~=1.7.0 - name: Test database connection run: dbt debug diff --git a/README.md b/README.md index 9a055e8c..6350d743 100644 --- a/README.md +++ b/README.md @@ -18,6 +18,7 @@ The package currently supports - Spark :white_check_mark: - Snowflake :white_check_mark: - Google BigQuery :white_check_mark: +- Postgres :white_check_mark: Models included: @@ -45,7 +46,7 @@ See the generated [dbt docs site](https://brooklyn-data.github.io/dbt_artifacts/ ``` packages: - package: brooklyn-data/dbt_artifacts - version: 2.4.2 + version: 2.6.2 ``` :construction_worker: Make sure to fix at least the **minor** version, to avoid issues when a new release is open. See the notes on upgrading below for more detail. @@ -55,15 +56,15 @@ packages: 3. Add an on-run-end hook to your `dbt_project.yml` ```yml - `on-run-end: - - "{{ dbt_artifacts.upload_results(results) }}"` + on-run-end: + - "{{ dbt_artifacts.upload_results(results) }}" ``` We recommend adding a conditional here so that the upload only occurs in your production environment, such as: ```yml on-run-end: - - "{% if target.name == 'prod' %}{{ dbt_artifacts.upload_results(results) }}{% endif %}"`) + - "{% if target.name == 'prod' %}{{ dbt_artifacts.upload_results(results) }}{% endif %}" ``` 4. Run the tables! @@ -141,6 +142,37 @@ vars: ] ``` +## Creating custom marts tables + +Multiple modelled `dim` and `fct` models have been provided for ease of use, but we recognise that some use cases may require custom ones. To this end, you can disable all but the raw sources tables using the following in your `dbt_project.yml` file: + +```yml +# dbt_project.yml + +models: + dbt_artifacts: + +enabled: false + sources: + +enabled: true +``` + +In these sources tables, you will find a JSON column `all_results` which contains a JSON blob of the results object used, which you can use in your own analysis: + +- exposures +- models +- seeds +- snapshots +- sources +- tests + +This column can cause queries to become too long - particularly in BigQuery. Therefore, if you want to disable this column, you can make use of the `dbt_artifacts_exclude_all_results` variable, and set this to `true` in your `dbt_project.yml` file. + +``` +# dbt_project.yml +vars: + dbt_artifacts_exclude_all_results: true +``` + ## Upgrading from 1.x to >=2.0.0 If you were using the following variables: @@ -189,29 +221,6 @@ An example operation is as follows: dbt run-operation migrate_from_v0_to_v1 --args '{old_database: analytics, old_schema: dbt_artifacts, new_database: analytics, new_schema: artifact_sources}' ``` -## Creating custom marts tables - -Multiple modelled `dim` and `fct` models have been provided for ease of use, but we recognise that some use cases may require custom ones. To this end, you can disable all but the raw sources tables using the following in your `dbt_project.yml` file: - -```yml -# dbt_project.yml - -models: - dbt_artifacts: - +enabled: false - sources: - +enabled: true -``` - -In these sources tables, you will find a JSON column `all_results` which contains a JSON blob of the results object used, which you can use in your own analysis: - -- exposures -- models -- seeds -- snapshots -- sources -- tests - ## Acknowledgements Thank you to [Tails.com](https://tails.com/gb/careers/) for initial development and maintenance of this package. On 2021/12/20, the repository was transferred from the Tails.com GitHub organization to Brooklyn Data Co. diff --git a/dbt_project.yml b/dbt_project.yml index 24da069d..60bd2117 100644 --- a/dbt_project.yml +++ b/dbt_project.yml @@ -1,7 +1,7 @@ name: "dbt_artifacts" -version: "2.4.2" +version: "2.6.2" config-version: 2 -require-dbt-version: [">=1.3.0", "<1.6.0"] +require-dbt-version: [">=1.3.0", "<1.8.0"] profile: "dbt_artifacts" clean-targets: # folders to be removed by `dbt clean` diff --git a/integration_test_project/dbt_project.yml b/integration_test_project/dbt_project.yml index a6550009..dba89a50 100644 --- a/integration_test_project/dbt_project.yml +++ b/integration_test_project/dbt_project.yml @@ -21,6 +21,7 @@ vars: env_vars: ["TEST_ENV_VAR_NUMBER", "TEST_ENV_VAR_EMPTY", "TEST_ENV_VAR_WITH_QUOTE"] dbt_vars: ["test_dbt_vars_1", "test_dbt_vars_2", "test_dbt_vars_3"] + dbt_artifacts_exclude_all_results: true models: +persist_docs: diff --git a/integration_test_project/example-env.sh b/integration_test_project/example-env.sh index 51450cf3..47cb0d67 100755 --- a/integration_test_project/example-env.sh +++ b/integration_test_project/example-env.sh @@ -18,6 +18,7 @@ export DBT_ENV_SPARK_DRIVER_PATH= # /Library/simba/spark/lib/libsparkodbc_sbu.dy export DBT_ENV_SPARK_ENDPOINT= # The endpoint ID from the Databricks HTTP path # dbt environment variables, change these +export DBT_VERSION="1_5_0" export DBT_CLOUD_PROJECT_ID= export DBT_CLOUD_JOB_ID= export DBT_CLOUD_RUN_ID= diff --git a/integration_test_project/profiles.yml b/integration_test_project/profiles.yml index e8548e3f..b24ad80d 100644 --- a/integration_test_project/profiles.yml +++ b/integration_test_project/profiles.yml @@ -2,8 +2,8 @@ # You should __NEVER__ check credentials into version control. Thanks for reading :) config: - send_anonymous_usage_stats: False - use_colors: True + send_anonymous_usage_stats: False + use_colors: True dbt_artifacts: target: snowflake @@ -43,3 +43,12 @@ dbt_artifacts: timeout_seconds: 300 priority: interactive retries: 1 + postgres: + type: postgres + host: localhost + user: postgres + password: postgres + port: 5432 + dbname: postgres + schema: public + threads: 8 diff --git a/integration_test_project/tests/singular_test.sql b/integration_test_project/tests/singular_test.sql index 67a2508d..a1e49e8c 100644 --- a/integration_test_project/tests/singular_test.sql +++ b/integration_test_project/tests/singular_test.sql @@ -1 +1 @@ -select 1 as failures from (select 2) where 1 = 2 +select 1 as failures from (select 2) as foo where 1 = 2 diff --git a/macros/_macros.yml b/macros/_macros.yml new file mode 100644 index 00000000..7b798447 --- /dev/null +++ b/macros/_macros.yml @@ -0,0 +1,246 @@ +version: 2 + +macros: + ## DATABASE SPECIFIC HELPERS ## + - name: column_identifier + description: | + Dependent on the adapter type, return the identifier for a column using a numerical index. + arguments: + - name: column_index + type: integer + description: | + The index of the column to return the identifier for + + - name: generate_surrogate_key + description: | + Since folks commonly install dbt_artifacts alongside a myriad of other packages, + we copy the dbt_utils implementation of the surrogate_key macro so we don't have + any dependencies to make conflicts worse! + + This version is: + URL: https://github.com/dbt-labs/dbt-utils/blob/main/macros/sql/generate_surrogate_key.sql + Commit SHA: eaa0e41b033bdf252eff0ae014ec11888f37ebff + Date: 2023-04-28 + arguments: + - name: field_list + type: list + description: | + A list of fields to concatenate together to form the surrogate key + + - name: get_relation + description: | + Identify a relation in the graph from a relation name + arguments: + - name: get_relation_name + type: string + description: | + The name of the relation to return from the graph + + - name: parse_json + description: | + Dependent on the adapter type, return a column which parses the JSON field. + arguments: + - name: field + type: string + description: | + The name of the field to parse + + - name: type_array + description: | + Dependent on the adapter type, returns the native type for storing an array. + + - name: type_boolean + description: | + Dependent on the adapter type, returns the native boolean type. + + - name: type_json + description: | + Dependent on the adapter type, returns the native type for storing JSON. + + ## MIGRATION ## + - name: migrate_from_v0_to_v1 + description: | + A macro to assist with migrating from v0 to v1 of dbt_artifacts. See + https://github.com/brooklyn-data/dbt_artifacts/blob/main/README.md#migrating-from-100-to-100 + for details on the usage. + arguments: + - name: old_database + type: string + description: | + The database of the <1.0.0 output (fct_/dim_) models - does not have to be different to `new_database` + - name: old_schema + type: string + description: | + The schema of the <1.0.0 output (fct_/dim_) models - does not have to be different to `new_schema` + - name: new_database + type: string + description: | + The target database that the v1 artifact sources are in - does not have to be different to `old_database` + - name: new_schema + type: string + description: | + The target schema that the v1 artifact sources are in - does not have to be different to `old_schema` + + ## UPLOAD INDIVIDUAL DATASETS ## + - name: upload_exposures + description: | + The macro to support upload of the data to the exposures table. + arguments: + - name: exposures + type: list + description: | + A list of exposure objects extracted from the dbt graph + + - name: upload_invocations + description: | + The macro to support upload of the data to the invocations table. + + - name: upload_model_executions + description: | + The macro to support upload of the data to the model_executions table. + arguments: + - name: models + type: list + description: | + A list of model execution results objects extracted from the dbt result object + + - name: upload_models + description: | + The macro to support upload of the data to the models table. + arguments: + - name: models + type: list + description: | + A list of test objects extracted from the dbt graph + + - name: upload_seed_executions + description: | + The macro to support upload of the data to the seed_executions table. + arguments: + - name: seeds + type: list + description: | + A list of seed execution results objects extracted from the dbt result object + + - name: upload_seeds + description: | + The macro to support upload of the data to the seeds table. + arguments: + - name: seeds + type: list + description: | + A list of seeds objects extracted from the dbt graph + + - name: upload_snapshot_executions + description: | + The macro to support upload of the data to the snapshot_executions table. + arguments: + - name: snapshots + type: list + description: | + A list of snapshot execution results objects extracted from the dbt result object + + - name: upload_snapshots + description: | + The macro to support upload of the data to the snapshots table. + arguments: + - name: snapshots + type: list + description: | + A list of snapshots objects extracted from the dbt graph + + - name: upload_sources + description: | + The macro to support upload of the data to the sources table. + arguments: + - name: sources + type: list + description: | + A list of sources objects extracted from the dbt graph + + - name: upload_test_executions + description: | + The macro to support upload of the data to the test_executions table. + arguments: + - name: tests + type: list + description: | + A list of test execution results objects extracted from the dbt result object + + - name: upload_tests + description: | + The macro to support upload of the data to the tests table. + arguments: + - name: tests + type: list + description: | + A list of test objects extracted from the dbt graph + + ## UPLOAD RESULTS ## + - name: get_column_name_list + description: | + A macro to return the list of column names for a particular dataset. Returns a comment if the dataset is not + valid. + arguments: + - name: dataset + type: string + description: | + The name of the dataset to return the column names for e.g. `models` + + - name: get_dataset_content + description: | + A macro to extract the data to be uploaded from either the results or the graph object. + arguments: + - name: dataset + type: string + description: | + The name of the dataset to return the data for e.g. `models` + + - name: get_table_content_values + description: | + A macro to create the insert statement values required to be uploaded to the table. + arguments: + - name: dataset + type: string + description: | + The name of the dataset to return the column names for e.g. `models` + - name: objects_to_upload + type: list + description: | + The objects to be used to generate the insert statement values - extracted from `get_dataset_content` + + - name: insert_into_metadata_table + description: | + Dependent on the adapter type, the wrapper to insert the data into a table from a list of values. Used in the + `upload_results` macro, alongside the `get_column_lists` macro to generate the column names and the + `upload_dataset` macros to generate the data to be inserted. + arguments: + - name: database_name + type: string + description: | + The database name for the relation that the data is to be inserted into + - name: schema_name + type: string + description: | + The schema name for the relation that the data is to be inserted into + - name: table_name + type: string + description: | + The table name for the relation that the data is to be inserted into + - name: fields + type: string + description: | + The list of fields for the relation that the data is to be inserted into + - name: content + type: string + description: | + The data content to insert into the relation + + - name: upload_results + description: | + The main macro called to upload the metadata into each of the source tables. + arguments: + - name: results + type: list + description: | + The results object from dbt. diff --git a/macros/column_identifier.sql b/macros/database_specific_helpers/column_identifier.sql similarity index 100% rename from macros/column_identifier.sql rename to macros/database_specific_helpers/column_identifier.sql diff --git a/macros/generate_surrogate_key.sql b/macros/database_specific_helpers/generate_surrogate_key.sql similarity index 100% rename from macros/generate_surrogate_key.sql rename to macros/database_specific_helpers/generate_surrogate_key.sql diff --git a/macros/database_specific_helpers/get_relation.sql b/macros/database_specific_helpers/get_relation.sql new file mode 100644 index 00000000..ed7d02e5 --- /dev/null +++ b/macros/database_specific_helpers/get_relation.sql @@ -0,0 +1,14 @@ +{% macro get_relation(relation_name) %} + {% if execute %} + {% set model_get_relation_node = graph.nodes.values() | selectattr('name', 'equalto', relation_name) | first %} + {% set relation = api.Relation.create( + database = model_get_relation_node.database, + schema = model_get_relation_node.schema, + identifier = model_get_relation_node.alias + ) + %} + {% do return(relation) %} + {% else %} + {% do return(api.Relation.create()) %} + {% endif %} +{% endmacro %} diff --git a/macros/parse_json.sql b/macros/database_specific_helpers/parse_json.sql similarity index 82% rename from macros/parse_json.sql rename to macros/database_specific_helpers/parse_json.sql index 1184b542..39ea5d8f 100644 --- a/macros/parse_json.sql +++ b/macros/database_specific_helpers/parse_json.sql @@ -11,6 +11,6 @@ {%- endmacro %} {% macro bigquery__parse_json(field) -%} - parse_json({{ field }}) + safe.parse_json("""{{ field }}""", wide_number_mode=>'round') {%- endmacro %} diff --git a/macros/type_helpers.sql b/macros/database_specific_helpers/type_helpers.sql similarity index 95% rename from macros/type_helpers.sql rename to macros/database_specific_helpers/type_helpers.sql index 19c3a718..4064ad46 100644 --- a/macros/type_helpers.sql +++ b/macros/database_specific_helpers/type_helpers.sql @@ -19,11 +19,11 @@ {% endmacro %} {% macro snowflake__type_json() %} - OBJECT + object {% endmacro %} {% macro bigquery__type_json() %} - JSON + json {% endmacro %} {#- ARRAY -#} @@ -37,9 +37,9 @@ {% endmacro %} {% macro snowflake__type_array() %} - ARRAY + array {% endmacro %} {% macro bigquery__type_array() %} - ARRAY + array {% endmacro %} diff --git a/macros/insert_into_metadata_table.sql b/macros/insert_into_metadata_table.sql deleted file mode 100644 index 235d2326..00000000 --- a/macros/insert_into_metadata_table.sql +++ /dev/null @@ -1,38 +0,0 @@ -{% macro insert_into_metadata_table(database_name, schema_name, table_name, content) -%} - {% if content != "" %} - {{ return(adapter.dispatch('insert_into_metadata_table', 'dbt_artifacts')(database_name, schema_name, table_name, content)) }} - {% endif %} -{%- endmacro %} - -{% macro spark__insert_into_metadata_table(database_name, schema_name, table_name, content) -%} - {% set insert_into_table_query %} - insert into {% if database_name %}{{ database_name }}.{% endif %}{{ schema_name }}.{{ table_name }} - {{ content }} - {% endset %} - - {% do run_query(insert_into_table_query) %} -{%- endmacro %} - -{% macro snowflake__insert_into_metadata_table(database_name, schema_name, table_name, content) -%} - {% set insert_into_table_query %} - insert into {{database_name}}.{{ schema_name }}.{{ table_name }} - {{ content }} - {% endset %} - - {% do run_query(insert_into_table_query) %} -{%- endmacro %} - -{% macro bigquery__insert_into_metadata_table(database_name, schema_name, table_name, content) -%} - - {% set insert_into_table_query %} - insert into `{{database_name}}.{{ schema_name }}.{{ table_name }}` - VALUES - {{ content }} - {% endset %} - - {% do run_query(insert_into_table_query) %} - -{%- endmacro %} - -{% macro default__insert_into_metadata_table(database_name, schema_name, table_name, content) -%} -{%- endmacro %} diff --git a/macros/migrate_from_v0_to_v1.sql b/macros/migration/migrate_from_v0_to_v1.sql similarity index 100% rename from macros/migrate_from_v0_to_v1.sql rename to macros/migration/migrate_from_v0_to_v1.sql diff --git a/macros/upload_exposures.sql b/macros/upload_individual_datasets/upload_exposures.sql similarity index 63% rename from macros/upload_exposures.sql rename to macros/upload_individual_datasets/upload_exposures.sql index 3aa74a29..9f0ec5d3 100644 --- a/macros/upload_exposures.sql +++ b/macros/upload_individual_datasets/upload_exposures.sql @@ -1,8 +1,4 @@ -{% macro upload_exposures(graph) -%} - {% set exposures = [] %} - {% for node in graph.exposures.values() %} - {% do exposures.append(node) %} - {% endfor %} +{% macro upload_exposures(exposures) -%} {{ return(adapter.dispatch('get_exposures_dml_sql', 'dbt_artifacts')(exposures)) }} {%- endmacro %} @@ -41,7 +37,11 @@ '{{ exposure.package_name }}', {# package_name #} '{{ tojson(exposure.depends_on.nodes) }}', {# depends_on_nodes #} '{{ tojson(exposure.tags) }}', {# tags #} - '{{ tojson(exposure) | replace("\\", "\\\\") | replace("'", "\\'") | replace('"', '\\"') }}' {# all_results #} + {% if var('dbt_artifacts_exclude_all_results', false) %} + null + {% else %} + '{{ tojson(exposure) | replace("\\", "\\\\") | replace("'", "\\'") | replace('"', '\\"') }}' {# all_results #} + {% endif %} ) {%- if not loop.last %},{%- endif %} {%- endfor %} @@ -62,7 +62,7 @@ '{{ run_started_at }}', {# run_started_at #} '{{ exposure.name | replace("'","\\'") }}', {# name #} '{{ exposure.type }}', {# type #} - parse_json('{{ tojson(exposure.owner) | replace("'","\\'") }}'), {# owner #} + {{ adapter.dispatch('parse_json', 'dbt_artifacts')(tojson(exposure.owner) | replace("'","\\'")) }}, {# owner #} '{{ exposure.maturity }}', {# maturity #} '{{ exposure.original_file_path | replace('\\', '\\\\') }}', {# path #} """{{ exposure.description | replace("'","\\'") }}""", {# description #} @@ -70,7 +70,45 @@ '{{ exposure.package_name }}', {# package_name #} {{ tojson(exposure.depends_on.nodes) }}, {# depends_on_nodes #} {{ tojson(exposure.tags) }}, {# tags #} - parse_json('{{ tojson(exposure) | replace("\\", "\\\\") | replace("'", "\\'") | replace('"', '\\"') }}', wide_number_mode=>'round') {# all_results #} + {% if var('dbt_artifacts_exclude_all_results', false) %} + null + {% else %} + {{ adapter.dispatch('parse_json', 'dbt_artifacts')(tojson(exposure) | replace("\\", "\\\\") | replace("'", "\\'") | replace('"', '\\"')) }} {# all_results #} + {% endif %} + ) + {%- if not loop.last %},{%- endif %} + {%- endfor %} + {% endset %} + {{ exposure_values }} + {% else %} + {{ return("") }} + {% endif %} +{%- endmacro %} + +{% macro postgres__get_exposures_dml_sql(exposures) -%} + {% if exposures != [] %} + + {% set exposure_values %} + {% for exposure in exposures -%} + ( + '{{ invocation_id }}', {# command_invocation_id #} + $${{ exposure.unique_id }}$$, {# node_id #} + '{{ run_started_at }}', {# run_started_at #} + $${{ exposure.name }}$$, {# name #} + '{{ exposure.type }}', {# type #} + $${{ tojson(exposure.owner) }}$$, {# owner #} + '{{ exposure.maturity }}', {# maturity #} + $${{ exposure.original_file_path }}$$, {# path #} + $${{ exposure.description }}$$, {# description #} + '{{ exposure.url }}', {# url #} + '{{ exposure.package_name }}', {# package_name #} + $${{ tojson(exposure.depends_on.nodes) }}$$, {# depends_on_nodes #} + $${{ tojson(exposure.tags) }}$$, {# tags #} + {% if var('dbt_artifacts_exclude_all_results', false) %} + null + {% else %} + $${{ tojson(exposure) }}$$ {# all_results #} + {% endif %} ) {%- if not loop.last %},{%- endif %} {%- endfor %} diff --git a/macros/upload_invocations.sql b/macros/upload_individual_datasets/upload_invocations.sql similarity index 68% rename from macros/upload_invocations.sql rename to macros/upload_individual_datasets/upload_invocations.sql index 0f891ce1..21c5574c 100644 --- a/macros/upload_invocations.sql +++ b/macros/upload_individual_datasets/upload_invocations.sql @@ -83,7 +83,7 @@ null, {# dbt_vars #} {% endif %} - '{{ tojson(invocation_args_dict) | replace('\\', '\\\\') }}', {# invocation_args #} + '{{ tojson(invocation_args_dict) | replace('\\', '\\\\') | replace("'", "\\'") }}', {# invocation_args #} {% set metadata_env = {} %} {% for key, value in dbt_metadata_envs.items() %} @@ -122,7 +122,7 @@ {% for env_variable in var('env_vars') %} {% do env_vars_dict.update({env_variable: (env_var(env_variable, ''))}) %} {% endfor %} - parse_json('''{{ tojson(env_vars_dict) }}'''), {# env_vars #} + {{ adapter.dispatch('parse_json', 'dbt_artifacts')(tojson(env_vars_dict)) }}, {# env_vars #} {% else %} null, {# env_vars #} {% endif %} @@ -132,7 +132,7 @@ {% for dbt_var in var('dbt_vars') %} {% do dbt_vars_dict.update({dbt_var: (var(dbt_var, ''))}) %} {% endfor %} - parse_json('''{{ tojson(dbt_vars_dict) }}'''), {# dbt_vars #} + {{ adapter.dispatch('parse_json', 'dbt_artifacts')(tojson(dbt_vars_dict)) }}, {# dbt_vars #} {% else %} null, {# dbt_vars #} {% endif %} @@ -146,16 +146,78 @@ {% endif %} {% endif %} - safe.parse_json('''{{ tojson(invocation_args_dict) }}'''), {# invocation_args #} + {{ adapter.dispatch('parse_json', 'dbt_artifacts')(tojson(invocation_args_dict) | replace("'", "\\'")) }}, {# invocation_args #} {% set metadata_env = {} %} {% for key, value in dbt_metadata_envs.items() %} {% do metadata_env.update({key: value}) %} {% endfor %} - parse_json('''{{ tojson(metadata_env) | replace('\\', '\\\\') }}''') {# dbt_custom_envs #} + {{ adapter.dispatch('parse_json', 'dbt_artifacts')(tojson(metadata_env) | replace('\\', '\\\\')) }} {# dbt_custom_envs #} ) {% endset %} {{ invocation_values }} {% endmacro -%} + +{% macro postgres__get_invocations_dml_sql() -%} + {% set invocation_values %} + ( + '{{ invocation_id }}', {# command_invocation_id #} + '{{ dbt_version }}', {# dbt_version #} + '{{ project_name }}', {# project_name #} + '{{ run_started_at }}', {# run_started_at #} + '{{ flags.WHICH }}', {# dbt_command #} + {{ flags.FULL_REFRESH }}, {# full_refresh_flag #} + '{{ target.profile_name }}', {# target_profile_name #} + '{{ target.name }}', {# target_name #} + '{{ target.schema }}', {# target_schema #} + {{ target.threads }}, {# target_threads #} + + '{{ env_var("DBT_CLOUD_PROJECT_ID", "") }}', {# dbt_cloud_project_id #} + '{{ env_var("DBT_CLOUD_JOB_ID", "") }}', {# dbt_cloud_job_id #} + '{{ env_var("DBT_CLOUD_RUN_ID", "") }}', {# dbt_cloud_run_id #} + '{{ env_var("DBT_CLOUD_RUN_REASON_CATEGORY", "") }}', {# dbt_cloud_run_reason_category #} + $${{ env_var('DBT_CLOUD_RUN_REASON', '') }}$$, {# dbt_cloud_run_reason #} + + {% if var('env_vars', none) %} + {% set env_vars_dict = {} %} + {% for env_variable in var('env_vars') %} + {% do env_vars_dict.update({env_variable: (env_var(env_variable, ''))}) %} + {% endfor %} + $${{ tojson(env_vars_dict) }}$$, {# env_vars #} + {% else %} + null, {# env_vars #} + {% endif %} + + {% if var('dbt_vars', none) %} + {% set dbt_vars_dict = {} %} + {% for dbt_var in var('dbt_vars') %} + {% do dbt_vars_dict.update({dbt_var: (var(dbt_var, ''))}) %} + {% endfor %} + $${{ tojson(dbt_vars_dict) }}$$, {# dbt_vars #} + {% else %} + null, {# dbt_vars #} + {% endif %} + + {% if invocation_args_dict.vars %} + {# vars - different format for pre v1.5 (yaml vs list) #} + {% if invocation_args_dict.vars is string %} + {# BigQuery does not handle the yaml-string from "--vars" well, when passed to "parse_json". Workaround is to parse the string, and then "tojson" will properly format the dict as a json-object. #} + {% set parsed_inv_args_vars = fromyaml(invocation_args_dict.vars) %} + {% do invocation_args_dict.update({'vars': parsed_inv_args_vars}) %} + {% endif %} + {% endif %} + + $${{ tojson(invocation_args_dict) }}$$, {# invocation_args #} + + {% set metadata_env = {} %} + {% for key, value in dbt_metadata_envs.items() %} + {% do metadata_env.update({key: value}) %} + {% endfor %} + $${{ tojson(metadata_env) }}$$ {# dbt_custom_envs #} + ) + {% endset %} + {{ invocation_values }} + +{% endmacro -%} diff --git a/macros/upload_model_executions.sql b/macros/upload_individual_datasets/upload_model_executions.sql similarity index 66% rename from macros/upload_model_executions.sql rename to macros/upload_individual_datasets/upload_model_executions.sql index 476c7ba7..bca26fea 100644 --- a/macros/upload_model_executions.sql +++ b/macros/upload_individual_datasets/upload_model_executions.sql @@ -1,10 +1,4 @@ -{% macro upload_model_executions(results) -%} - {% set models = [] %} - {% for result in results %} - {% if result.node.resource_type == "model" %} - {% do models.append(result) %} - {% endif %} - {% endfor %} +{% macro upload_model_executions(models) -%} {{ return(adapter.dispatch('get_model_executions_dml_sql', 'dbt_artifacts')(models)) }} {%- endmacro %} @@ -45,26 +39,10 @@ '{{ model.thread_id }}', {# thread_id #} '{{ model.status }}', {# status #} - {% if model.timing != [] %} - {% for stage in model.timing if stage.name == "compile" %} - {% if loop.length == 0 %} - null, {# compile_started_at #} - {% else %} - '{{ stage.started_at }}', {# compile_started_at #} - {% endif %} - {% endfor %} - - {% for stage in model.timing if stage.name == "execute" %} - {% if loop.length == 0 %} - null, {# query_completed_at #} - {% else %} - '{{ stage.completed_at }}', {# query_completed_at #} - {% endif %} - {% endfor %} - {% else %} - null, {# compile_started_at #} - null, {# query_completed_at #} - {% endif %} + {% set compile_started_at = (model.timing | selectattr("name", "eq", "compile") | first | default({}))["started_at"] %} + {% if compile_started_at %}'{{ compile_started_at }}'{% else %}null{% endif %}, {# compile_started_at #} + {% set query_completed_at = (model.timing | selectattr("name", "eq", "execute") | first | default({}))["completed_at"] %} + {% if query_completed_at %}'{{ query_completed_at }}'{% else %}null{% endif %}, {# query_completed_at #} {{ model.execution_time }}, {# total_node_runtime #} null, -- rows_affected not available {# Only available in Snowflake & BigQuery #} @@ -102,26 +80,10 @@ '{{ model.thread_id }}', {# thread_id #} '{{ model.status }}', {# status #} - {% if model.timing != [] %} - {% for stage in model.timing if stage.name == "compile" %} - {% if loop.length == 0 %} - null, {# compile_started_at #} - {% else %} - '{{ stage.started_at }}', {# compile_started_at #} - {% endif %} - {% endfor %} - - {% for stage in model.timing if stage.name == "execute" %} - {% if loop.length == 0 %} - null, {# query_completed_at #} - {% else %} - '{{ stage.completed_at }}', {# query_completed_at #} - {% endif %} - {% endfor %} - {% else %} - null, {# compile_started_at #} - null, {# query_completed_at #} - {% endif %} + {% set compile_started_at = (model.timing | selectattr("name", "eq", "compile") | first | default({}))["started_at"] %} + {% if compile_started_at %}'{{ compile_started_at }}'{% else %}null{% endif %}, {# compile_started_at #} + {% set query_completed_at = (model.timing | selectattr("name", "eq", "execute") | first | default({}))["completed_at"] %} + {% if query_completed_at %}'{{ query_completed_at }}'{% else %}null{% endif %}, {# query_completed_at #} {{ model.execution_time }}, {# total_node_runtime #} safe_cast('{{ model.adapter_response.rows_affected }}' as int64), @@ -130,8 +92,8 @@ '{{ model.node.schema }}', {# schema #} '{{ model.node.name }}', {# name #} '{{ model.node.alias }}', {# alias #} - '{{ model.message | replace("\\", "\\\\") | replace("'", "\\'") | replace('"', '\\"') }}', {# message #} - parse_json('{{ tojson(model.adapter_response) | replace("\\", "\\\\") | replace("'", "\\'") | replace('"', '\\"') }}') {# adapter_response #} + '{{ model.message | replace("\\", "\\\\") | replace("'", "\\'") | replace('"', '\\"') | replace("\n", "\\n") }}', {# message #} + {{ adapter.dispatch('parse_json', 'dbt_artifacts')(tojson(model.adapter_response) | replace("\\", "\\\\") | replace("'", "\\'") | replace('"', '\\"')) }} {# adapter_response #} ) {%- if not loop.last %},{%- endif %} {%- endfor %} @@ -178,26 +140,10 @@ '{{ model.thread_id }}', {# thread_id #} '{{ model.status }}', {# status #} - {% if model.timing != [] %} - {% for stage in model.timing if stage.name == "compile" %} - {% if loop.length == 0 %} - null, {# compile_started_at #} - {% else %} - '{{ stage.started_at }}', {# compile_started_at #} - {% endif %} - {% endfor %} - - {% for stage in model.timing if stage.name == "execute" %} - {% if loop.length == 0 %} - null, {# query_completed_at #} - {% else %} - '{{ stage.completed_at }}', {# query_completed_at #} - {% endif %} - {% endfor %} - {% else %} - null, {# compile_started_at #} - null, {# query_completed_at #} - {% endif %} + {% set compile_started_at = (model.timing | selectattr("name", "eq", "compile") | first | default({}))["started_at"] %} + {% if compile_started_at %}'{{ compile_started_at }}'{% else %}null{% endif %}, {# compile_started_at #} + {% set query_completed_at = (model.timing | selectattr("name", "eq", "execute") | first | default({}))["completed_at"] %} + {% if query_completed_at %}'{{ query_completed_at }}'{% else %}null{% endif %}, {# query_completed_at #} {{ model.execution_time }}, {# total_node_runtime #} try_cast('{{ model.adapter_response.rows_affected }}' as int), {# rows_affected #} @@ -216,3 +162,44 @@ {{ return("") }} {% endif %} {% endmacro -%} + +{% macro postgres__get_model_executions_dml_sql(models) -%} + {% if models != [] %} + {% set model_execution_values %} + {% for model in models -%} + ( + '{{ invocation_id }}', {# command_invocation_id #} + '{{ model.node.unique_id }}', {# node_id #} + '{{ run_started_at }}', {# run_started_at #} + + {% set config_full_refresh = model.node.config.full_refresh %} + {% if config_full_refresh is none %} + {% set config_full_refresh = flags.FULL_REFRESH %} + {% endif %} + {{ config_full_refresh }}, {# was_full_refresh #} + + '{{ model.thread_id }}', {# thread_id #} + '{{ model.status }}', {# status #} + + {% set compile_started_at = (model.timing | selectattr("name", "eq", "compile") | first | default({}))["started_at"] %} + {% if compile_started_at %}'{{ compile_started_at }}'{% else %}null{% endif %}, {# compile_started_at #} + {% set query_completed_at = (model.timing | selectattr("name", "eq", "execute") | first | default({}))["completed_at"] %} + {% if query_completed_at %}'{{ query_completed_at }}'{% else %}null{% endif %}, {# query_completed_at #} + + {{ model.execution_time }}, {# total_node_runtime #} + null, {# rows_affected #} + '{{ model.node.config.materialized }}', {# materialization #} + '{{ model.node.schema }}', {# schema #} + '{{ model.node.name }}', {# name #} + '{{ model.node.alias }}', {# alias #} + $${{ model.message }}$$, {# message #} + $${{ tojson(model.adapter_response) }}$$ {# adapter_response #} + ) + {%- if not loop.last %},{%- endif %} + {%- endfor %} + {% endset %} + {{ model_execution_values }} + {% else %} + {{ return("") }} + {% endif %} +{%- endmacro %} diff --git a/macros/upload_individual_datasets/upload_models.sql b/macros/upload_individual_datasets/upload_models.sql new file mode 100644 index 00000000..7570b06a --- /dev/null +++ b/macros/upload_individual_datasets/upload_models.sql @@ -0,0 +1,129 @@ +{% macro upload_models(models) -%} + {{ return(adapter.dispatch('get_models_dml_sql', 'dbt_artifacts')(models)) }} +{%- endmacro %} + +{% macro default__get_models_dml_sql(models) -%} + + {% if models != [] %} + {% set model_values %} + select + {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(1) }}, + {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(2) }}, + {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(3) }}, + {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(4) }}, + {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(5) }}, + {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(6) }}, + {{ adapter.dispatch('parse_json', 'dbt_artifacts')(adapter.dispatch('column_identifier', 'dbt_artifacts')(7)) }}, + {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(8) }}, + {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(9) }}, + {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(10) }}, + {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(11) }}, + {{ adapter.dispatch('parse_json', 'dbt_artifacts')(adapter.dispatch('column_identifier', 'dbt_artifacts')(12)) }}, + {{ adapter.dispatch('parse_json', 'dbt_artifacts')(adapter.dispatch('column_identifier', 'dbt_artifacts')(13)) }}, + {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(14) }}, + {{ adapter.dispatch('parse_json', 'dbt_artifacts')(adapter.dispatch('column_identifier', 'dbt_artifacts')(15)) }} + from values + {% for model in models -%} + {% set model_copy = model.copy() -%} + {% do model_copy.pop('raw_code', None) %} + ( + '{{ invocation_id }}', {# command_invocation_id #} + '{{ model_copy.unique_id }}', {# node_id #} + '{{ run_started_at }}', {# run_started_at #} + '{{ model_copy.database }}', {# database #} + '{{ model_copy.schema }}', {# schema #} + '{{ model_copy.name }}', {# name #} + '{{ tojson(model_copy.depends_on.nodes) | replace('\\', '\\\\') }}', {# depends_on_nodes #} + '{{ model_copy.package_name }}', {# package_name #} + '{{ model_copy.original_file_path | replace('\\', '\\\\') }}', {# path #} + '{{ model_copy.checksum.checksum | replace('\\', '\\\\') }}', {# checksum #} + '{{ model_copy.config.materialized }}', {# materialization #} + '{{ tojson(model_copy.tags) }}', {# tags #} + '{{ tojson(model_copy.config.meta) | replace("\\", "\\\\") | replace("'","\\'") | replace('"', '\\"') }}', {# meta #} + '{{ model_copy.alias }}', {# alias #} + {% if var('dbt_artifacts_exclude_all_results', false) %} + null + {% else %} + '{{ tojson(model_copy) | replace("\\", "\\\\") | replace("'","\\'") | replace('"', '\\"') }}' {# all_results #} + {% endif %} + ) + {%- if not loop.last %},{%- endif %} + {%- endfor %} + {% endset %} + {{ model_values }} + {% else %} + {{ return("") }} + {% endif %} +{% endmacro -%} + +{% macro bigquery__get_models_dml_sql(models) -%} + {% if models != [] %} + {% set model_values %} + {% for model in models -%} + {% set model_copy = model.copy() -%} + {% do model_copy.pop('raw_code', None) %} + ( + '{{ invocation_id }}', {# command_invocation_id #} + '{{ model_copy.unique_id }}', {# node_id #} + '{{ run_started_at }}', {# run_started_at #} + '{{ model_copy.database }}', {# database #} + '{{ model_copy.schema }}', {# schema #} + '{{ model_copy.name }}', {# name #} + {{ tojson(model_copy.depends_on.nodes) }}, {# depends_on_nodes #} + '{{ model_copy.package_name }}', {# package_name #} + '{{ model_copy.original_file_path | replace('\\', '\\\\') }}', {# path #} + '{{ model_copy.checksum.checksum | replace('\\', '\\\\') }}', {# checksum #} + '{{ model_copy.config.materialized }}', {# materialization #} + {{ tojson(model_copy.tags) }}, {# tags #} + {{ adapter.dispatch('parse_json', 'dbt_artifacts')(tojson(model_copy.config.meta)) }}, {# meta #} + '{{ model_copy.alias }}', {# alias #} + {% if var('dbt_artifacts_exclude_all_results', false) %} + null + {% else %} + {{ adapter.dispatch('parse_json', 'dbt_artifacts')(tojson(model_copy) | replace("\\", "\\\\") | replace("'","\\'") | replace('"', '\\"')) }} {# all_results #} + {% endif %} + ) + {%- if not loop.last %},{%- endif %} + {%- endfor %} + {% endset %} + {{ model_values }} + {% else %} + {{ return("") }} + {% endif %} +{%- endmacro %} + +{% macro postgres__get_models_dml_sql(models) -%} + {% if models != [] %} + {% set model_values %} + {% for model in models -%} + {% set model_copy = model.copy() -%} + {% do model_copy.pop('raw_code', None) %} + ( + '{{ invocation_id }}', {# command_invocation_id #} + '{{ model_copy.unique_id }}', {# node_id #} + '{{ run_started_at }}', {# run_started_at #} + '{{ model_copy.database }}', {# database #} + '{{ model_copy.schema }}', {# schema #} + '{{ model_copy.name }}', {# name #} + '{{ tojson(model_copy.depends_on.nodes) }}', {# depends_on_nodes #} + '{{ model_copy.package_name }}', {# package_name #} + $${{ model_copy.original_file_path | replace('\\', '\\\\') }}$$, {# path #} + '{{ model_copy.checksum.checksum }}', {# checksum #} + '{{ model_copy.config.materialized }}', {# materialization #} + '{{ tojson(model_copy.tags) }}', {# tags #} + $${{ model_copy.config.meta }}$$, {# meta #} + '{{ model_copy.alias }}', {# alias #} + {% if var('dbt_artifacts_exclude_all_results', false) %} + null + {% else %} + $${{ tojson(model_copy) }}$$ {# all_results #} + {% endif %} + ) + {%- if not loop.last %},{%- endif %} + {%- endfor %} + {% endset %} + {{ model_values }} + {% else %} + {{ return("") }} + {% endif %} +{%- endmacro %} diff --git a/macros/upload_seed_executions.sql b/macros/upload_individual_datasets/upload_seed_executions.sql similarity index 71% rename from macros/upload_seed_executions.sql rename to macros/upload_individual_datasets/upload_seed_executions.sql index a3a9d521..1ccbfe2a 100644 --- a/macros/upload_seed_executions.sql +++ b/macros/upload_individual_datasets/upload_seed_executions.sql @@ -1,10 +1,4 @@ -{% macro upload_seed_executions(results) -%} - {% set seeds = [] %} - {% for result in results %} - {% if result.node.resource_type == "seed" %} - {% do seeds.append(result) %} - {% endif %} - {% endfor %} +{% macro upload_seed_executions(seeds) -%} {{ return(adapter.dispatch('get_seed_executions_dml_sql', 'dbt_artifacts')(seeds)) }} {%- endmacro %} @@ -44,26 +38,10 @@ '{{ model.thread_id }}', {# thread_id #} '{{ model.status }}', {# status #} - {% if model.timing != [] %} - {% for stage in model.timing if stage.name == "compile" %} - {% if loop.length == 0 %} - null, {# compile_started_at #} - {% else %} - '{{ stage.started_at }}', {# compile_started_at #} - {% endif %} - {% endfor %} - - {% for stage in model.timing if stage.name == "execute" %} - {% if loop.length == 0 %} - null, {# query_completed_at #} - {% else %} - '{{ stage.completed_at }}', {# query_completed_at #} - {% endif %} - {% endfor %} - {% else %} - null, {# compile_started_at #} - null, {# query_completed_at #} - {% endif %} + {% set compile_started_at = (model.timing | selectattr("name", "eq", "compile") | first | default({}))["started_at"] %} + {% if compile_started_at %}'{{ compile_started_at }}'{% else %}null{% endif %}, {# compile_started_at #} + {% set query_completed_at = (model.timing | selectattr("name", "eq", "execute") | first | default({}))["completed_at"] %} + {% if query_completed_at %}'{{ query_completed_at }}'{% else %}null{% endif %}, {# query_completed_at #} {{ model.execution_time }}, {# total_node_runtime #} null, -- rows_affected not available {# Only available in Snowflake #} @@ -101,26 +79,10 @@ '{{ model.thread_id }}', {# thread_id #} '{{ model.status }}', {# status #} - {% if model.timing != [] %} - {% for stage in model.timing if stage.name == "compile" %} - {% if loop.length == 0 %} - null, {# compile_started_at #} - {% else %} - '{{ stage.started_at }}', {# compile_started_at #} - {% endif %} - {% endfor %} - - {% for stage in model.timing if stage.name == "execute" %} - {% if loop.length == 0 %} - null, {# query_completed_at #} - {% else %} - '{{ stage.completed_at }}', {# query_completed_at #} - {% endif %} - {% endfor %} - {% else %} - null, {# compile_started_at #} - null, {# query_completed_at #} - {% endif %} + {% set compile_started_at = (model.timing | selectattr("name", "eq", "compile") | first | default({}))["started_at"] %} + {% if compile_started_at %}'{{ compile_started_at }}'{% else %}null{% endif %}, {# compile_started_at #} + {% set query_completed_at = (model.timing | selectattr("name", "eq", "execute") | first | default({}))["completed_at"] %} + {% if query_completed_at %}'{{ query_completed_at }}'{% else %}null{% endif %}, {# query_completed_at #} {{ model.execution_time }}, {# total_node_runtime #} null, -- rows_affected not available {# Databricks #} @@ -128,8 +90,8 @@ '{{ model.node.schema }}', {# schema #} '{{ model.node.name }}', {# name #} '{{ model.node.alias }}', {# alias #} - '{{ model.message | replace("\\", "\\\\") | replace("'", "\\'") | replace('"', '\\"') }}', {# message #} - parse_json('{{ tojson(model.adapter_response) | replace("\\", "\\\\") | replace("'", "\\'") | replace('"', '\\"') }}') {# adapter_response #} + '{{ model.message | replace("\\", "\\\\") | replace("'", "\\'") | replace('"', '\\"') | replace("\n", "\\n") }}', {# message #} + {{ adapter.dispatch('parse_json', 'dbt_artifacts')(tojson(model.adapter_response) | replace("\\", "\\\\") | replace("'", "\\'") | replace('"', '\\"')) }} {# adapter_response #} ) {%- if not loop.last %},{%- endif %} {%- endfor %} @@ -176,6 +138,47 @@ '{{ model.thread_id }}', {# thread_id #} '{{ model.status }}', {# status #} + {% set compile_started_at = (model.timing | selectattr("name", "eq", "compile") | first | default({}))["started_at"] %} + {% if compile_started_at %}'{{ compile_started_at }}'{% else %}null{% endif %}, {# compile_started_at #} + {% set query_completed_at = (model.timing | selectattr("name", "eq", "execute") | first | default({}))["completed_at"] %} + {% if query_completed_at %}'{{ query_completed_at }}'{% else %}null{% endif %}, {# query_completed_at #} + + {{ model.execution_time }}, {# total_node_runtime #} + try_cast('{{ model.adapter_response.rows_affected }}' as int), {# rows_affected #} + '{{ model.node.config.materialized }}', {# materialization #} + '{{ model.node.schema }}', {# schema #} + '{{ model.node.name }}', {# name #} + '{{ model.node.alias }}', {# alias #} + '{{ model.message | replace("\\", "\\\\") | replace("'", "\\'") | replace('"', '\\"') }}', {# message #} + '{{ tojson(model.adapter_response) | replace("\\", "\\\\") | replace("'", "\\'") | replace('"', '\\"') }}' {# adapter_response #} + ) + {%- if not loop.last %},{%- endif %} + {%- endfor %} + {% endset %} + {{ seed_execution_values }} + {% else %} + {{ return("") }} + {% endif %} +{% endmacro -%} + +{% macro postgres__get_seed_executions_dml_sql(seeds) -%} + {% if seeds != [] %} + {% set seed_execution_values %} + {% for model in seeds -%} + ( + '{{ invocation_id }}', {# command_invocation_id #} + '{{ model.node.unique_id }}', {# node_id #} + '{{ run_started_at }}', {# run_started_at #} + + {% set config_full_refresh = model.node.config.full_refresh %} + {% if config_full_refresh is none %} + {% set config_full_refresh = flags.FULL_REFRESH %} + {% endif %} + {{ config_full_refresh }}, {# was_full_refresh #} + + '{{ model.thread_id }}', {# thread_id #} + '{{ model.status }}', {# status #} + {% if model.timing != [] %} {% for stage in model.timing if stage.name == "compile" %} {% if loop.length == 0 %} @@ -198,13 +201,13 @@ {% endif %} {{ model.execution_time }}, {# total_node_runtime #} - try_cast('{{ model.adapter_response.rows_affected }}' as int), {# rows_affected #} + null, -- rows_affected not available {# Databricks #} '{{ model.node.config.materialized }}', {# materialization #} '{{ model.node.schema }}', {# schema #} '{{ model.node.name }}', {# name #} '{{ model.node.alias }}', {# alias #} - '{{ model.message | replace("\\", "\\\\") | replace("'", "\\'") | replace('"', '\\"') }}', {# message #} - '{{ tojson(model.adapter_response) | replace("\\", "\\\\") | replace("'", "\\'") | replace('"', '\\"') }}' {# adapter_response #} + $${{ model.message }}$$, {# message #} + $${{ tojson(model.adapter_response) }}$$ {# adapter_response #} ) {%- if not loop.last %},{%- endif %} {%- endfor %} diff --git a/macros/upload_seeds.sql b/macros/upload_individual_datasets/upload_seeds.sql similarity index 59% rename from macros/upload_seeds.sql rename to macros/upload_individual_datasets/upload_seeds.sql index f256c4a1..32e67383 100644 --- a/macros/upload_seeds.sql +++ b/macros/upload_individual_datasets/upload_seeds.sql @@ -1,8 +1,4 @@ -{% macro upload_seeds(graph) -%} - {% set seeds = [] %} - {% for node in graph.nodes.values() | selectattr("resource_type", "equalto", "seed") %} - {% do seeds.append(node) %} - {% endfor %} +{% macro upload_seeds(seeds) -%} {{ return(adapter.dispatch('get_seeds_dml_sql', 'dbt_artifacts')(seeds)) }} {%- endmacro %} @@ -34,10 +30,14 @@ '{{ seed.name }}', {# name #} '{{ seed.package_name }}', {# package_name #} '{{ seed.original_file_path | replace('\\', '\\\\') }}', {# path #} - '{{ seed.checksum.checksum }}', {# checksum #} + '{{ seed.checksum.checksum | replace('\\', '\\\\') }}', {# checksum #} '{{ tojson(seed.config.meta) | replace("\\", "\\\\") | replace("'","\\'") | replace('"', '\\"') }}', {# meta #} '{{ seed.alias }}', {# alias #} - '{{ tojson(seed) | replace("\\", "\\\\") | replace("'","\\'") | replace('"', '\\"') }}' {# all_results #} + {% if var('dbt_artifacts_exclude_all_results', false) %} + null + {% else %} + '{{ tojson(seed) | replace("\\", "\\\\") | replace("'","\\'") | replace('"', '\\"') }}' {# all_results #} + {% endif %} ) {%- if not loop.last %},{%- endif %} {%- endfor %} @@ -49,6 +49,37 @@ {% endmacro -%} {% macro bigquery__get_seeds_dml_sql(seeds) -%} + {% if seeds != [] %} + {% set seed_values %} + {% for seed in seeds -%} + ( + '{{ invocation_id }}', {# command_invocation_id #} + '{{ seed.unique_id }}', {# node_id #} + '{{ run_started_at }}', {# run_started_at #} + '{{ seed.database }}', {# database #} + '{{ seed.schema }}', {# schema #} + '{{ seed.name }}', {# name #} + '{{ seed.package_name }}', {# package_name #} + '{{ seed.original_file_path | replace('\\', '\\\\') }}', {# path #} + '{{ seed.checksum.checksum | replace('\\', '\\\\')}}', {# checksum #} + {{ adapter.dispatch('parse_json', 'dbt_artifacts')(tojson(seed.config.meta)) }}, {# meta #} + '{{ seed.alias }}', {# alias #} + {% if var('dbt_artifacts_exclude_all_results', false) %} + null + {% else %} + {{ adapter.dispatch('parse_json', 'dbt_artifacts')(tojson(seed) | replace("\\", "\\\\") | replace("'","\\'") | replace('"', '\\"')) }} {# all_results #} + {% endif %} + ) + {%- if not loop.last %},{%- endif %} + {%- endfor %} + {% endset %} + {{ seed_values }} + {% else %} + {{ return("") }} + {% endif %} +{%- endmacro %} + +{% macro postgres__get_seeds_dml_sql(seeds) -%} {% if seeds != [] %} {% set seed_values %} {% for seed in seeds -%} @@ -62,9 +93,13 @@ '{{ seed.package_name }}', {# package_name #} '{{ seed.original_file_path | replace('\\', '\\\\') }}', {# path #} '{{ seed.checksum.checksum }}', {# checksum #} - parse_json('''{{ tojson(seed.config.meta) }}'''), {# meta #} + $${{ tojson(seed.config.meta) }}$$, {# meta #} '{{ seed.alias }}', {# alias #} - parse_json('{{ tojson(seed) | replace("\\", "\\\\") | replace("'","\\'") | replace('"', '\\"') }}', wide_number_mode=>'round') {# all_results #} + {% if var('dbt_artifacts_exclude_all_results', false) %} + null + {% else %} + $${{ tojson(seed) }}$$ {# all_results #} + {% endif %} ) {%- if not loop.last %},{%- endif %} {%- endfor %} diff --git a/macros/upload_snapshot_executions.sql b/macros/upload_individual_datasets/upload_snapshot_executions.sql similarity index 71% rename from macros/upload_snapshot_executions.sql rename to macros/upload_individual_datasets/upload_snapshot_executions.sql index e0bf6dac..2006b168 100644 --- a/macros/upload_snapshot_executions.sql +++ b/macros/upload_individual_datasets/upload_snapshot_executions.sql @@ -1,10 +1,4 @@ -{% macro upload_snapshot_executions(results) -%} - {% set snapshots = [] %} - {% for result in results %} - {% if result.node.resource_type == "snapshot" %} - {% do snapshots.append(result) %} - {% endif %} - {% endfor %} +{% macro upload_snapshot_executions(snapshots) -%} {{ return(adapter.dispatch('get_snapshot_executions_dml_sql', 'dbt_artifacts')(snapshots)) }} {%- endmacro %} @@ -44,26 +38,10 @@ '{{ model.thread_id }}', {# thread_id #} '{{ model.status }}', {# status #} - {% if model.timing != [] %} - {% for stage in model.timing if stage.name == "compile" %} - {% if loop.length == 0 %} - null, {# compile_started_at #} - {% else %} - '{{ stage.started_at }}', {# compile_started_at #} - {% endif %} - {% endfor %} - - {% for stage in model.timing if stage.name == "execute" %} - {% if loop.length == 0 %} - null, {# query_completed_at #} - {% else %} - '{{ stage.completed_at }}', {# query_completed_at #} - {% endif %} - {% endfor %} - {% else %} - null, {# compile_started_at #} - null, {# query_completed_at #} - {% endif %} + {% set compile_started_at = (model.timing | selectattr("name", "eq", "compile") | first | default({}))["started_at"] %} + {% if compile_started_at %}'{{ compile_started_at }}'{% else %}null{% endif %}, {# compile_started_at #} + {% set query_completed_at = (model.timing | selectattr("name", "eq", "execute") | first | default({}))["completed_at"] %} + {% if query_completed_at %}'{{ query_completed_at }}'{% else %}null{% endif %}, {# query_completed_at #} {{ model.execution_time }}, {# total_node_runtime #} null, -- rows_affected not available {# Only available in Snowflake #} @@ -101,26 +79,10 @@ '{{ model.thread_id }}', {# thread_id #} '{{ model.status }}', {# status #} - {% if model.timing != [] %} - {% for stage in model.timing if stage.name == "compile" %} - {% if loop.length == 0 %} - null, {# compile_started_at #} - {% else %} - '{{ stage.started_at }}', {# compile_started_at #} - {% endif %} - {% endfor %} - - {% for stage in model.timing if stage.name == "execute" %} - {% if loop.length == 0 %} - null, {# query_completed_at #} - {% else %} - '{{ stage.completed_at }}', {# query_completed_at #} - {% endif %} - {% endfor %} - {% else %} - null, {# compile_started_at #} - null, {# query_completed_at #} - {% endif %} + {% set compile_started_at = (model.timing | selectattr("name", "eq", "compile") | first | default({}))["started_at"] %} + {% if compile_started_at %}'{{ compile_started_at }}'{% else %}null{% endif %}, {# compile_started_at #} + {% set query_completed_at = (model.timing | selectattr("name", "eq", "execute") | first | default({}))["completed_at"] %} + {% if query_completed_at %}'{{ query_completed_at }}'{% else %}null{% endif %}, {# query_completed_at #} {{ model.execution_time }}, {# total_node_runtime #} null, -- rows_affected not available {# Databricks #} @@ -128,8 +90,8 @@ '{{ model.node.schema }}', {# schema #} '{{ model.node.name }}', {# name #} '{{ model.node.alias }}', {# alias #} - '{{ model.message | replace("\\", "\\\\") | replace("'", "\\'") | replace('"', '\\"') }}', {# message #} - parse_json('{{ tojson(model.adapter_response) | replace("\\", "\\\\") | replace("'", "\\'") | replace('"', '\\"') }}') {# adapter_response #} + '{{ model.message | replace("\\", "\\\\") | replace("'", "\\'") | replace('"', '\\"') | replace("\n", "\\n") }}', {# message #} + {{ adapter.dispatch('parse_json', 'dbt_artifacts')(tojson(model.adapter_response) | replace("\\", "\\\\") | replace("'", "\\'") | replace('"', '\\"')) }} {# adapter_response #} ) {%- if not loop.last %},{%- endif %} {%- endfor %} @@ -176,6 +138,47 @@ '{{ model.thread_id }}', {# thread_id #} '{{ model.status }}', {# status #} + {% set compile_started_at = (model.timing | selectattr("name", "eq", "compile") | first | default({}))["started_at"] %} + {% if compile_started_at %}'{{ compile_started_at }}'{% else %}null{% endif %}, {# compile_started_at #} + {% set query_completed_at = (model.timing | selectattr("name", "eq", "execute") | first | default({}))["completed_at"] %} + {% if query_completed_at %}'{{ query_completed_at }}'{% else %}null{% endif %}, {# query_completed_at #} + + {{ model.execution_time }}, {# total_node_runtime #} + try_cast('{{ model.adapter_response.rows_affected }}' as int), {# rows_affected #} + '{{ model.node.config.materialized }}', {# materialization #} + '{{ model.node.schema }}', {# schema #} + '{{ model.node.name }}', {# name #} + '{{ model.node.alias }}', {# alias #} + '{{ model.message | replace("\\", "\\\\") | replace("'", "\\'") | replace('"', '\\"') }}', {# message #} + '{{ tojson(model.adapter_response) | replace("\\", "\\\\") | replace("'", "\\'") | replace('"', '\\"') }}' {# adapter_response #} + ) + {%- if not loop.last %},{%- endif %} + {%- endfor %} + {% endset %} + {{ snapshot_execution_values }} + {% else %} + {{ return("") }} + {% endif %} +{% endmacro -%} + +{% macro postgres__get_snapshot_executions_dml_sql(snapshots) -%} + {% if snapshots != [] %} + {% set snapshot_execution_values %} + {% for model in snapshots -%} + ( + '{{ invocation_id }}', {# command_invocation_id #} + '{{ model.node.unique_id }}', {# node_id #} + '{{ run_started_at }}', {# run_started_at #} + + {% set config_full_refresh = model.node.config.full_refresh %} + {% if config_full_refresh is none %} + {% set config_full_refresh = flags.FULL_REFRESH %} + {% endif %} + {{ config_full_refresh }}, {# was_full_refresh #} + + '{{ model.thread_id }}', {# thread_id #} + '{{ model.status }}', {# status #} + {% if model.timing != [] %} {% for stage in model.timing if stage.name == "compile" %} {% if loop.length == 0 %} @@ -198,13 +201,13 @@ {% endif %} {{ model.execution_time }}, {# total_node_runtime #} - try_cast('{{ model.adapter_response.rows_affected }}' as int), {# rows_affected #} + null, {# rows_affected #} '{{ model.node.config.materialized }}', {# materialization #} '{{ model.node.schema }}', {# schema #} '{{ model.node.name }}', {# name #} '{{ model.node.alias }}', {# alias #} - '{{ model.message | replace("\\", "\\\\") | replace("'", "\\'") | replace('"', '\\"') }}', {# message #} - '{{ tojson(model.adapter_response) | replace("\\", "\\\\") | replace("'", "\\'") | replace('"', '\\"') }}' {# adapter_response #} + $${{ model.message }}$$, {# message #} + $${{ tojson(model.adapter_response) }}$$ {# adapter_response #} ) {%- if not loop.last %},{%- endif %} {%- endfor %} diff --git a/macros/upload_snapshots.sql b/macros/upload_individual_datasets/upload_snapshots.sql similarity index 60% rename from macros/upload_snapshots.sql rename to macros/upload_individual_datasets/upload_snapshots.sql index 711d3a00..0be6759b 100644 --- a/macros/upload_snapshots.sql +++ b/macros/upload_individual_datasets/upload_snapshots.sql @@ -1,8 +1,5 @@ -{% macro upload_snapshots(graph) -%} - {% set snapshots = [] %} - {% for node in graph.nodes.values() | selectattr("resource_type", "equalto", "snapshot") %} - {% do snapshots.append(node) %} - {% endfor %} +{% macro upload_snapshots(snapshots) -%} + {{ return(adapter.dispatch('get_snapshots_dml_sql', 'dbt_artifacts')(snapshots)) }} {%- endmacro %} @@ -38,11 +35,15 @@ '{{ tojson(snapshot.depends_on.nodes) }}', {# depends_on_nodes #} '{{ snapshot.package_name }}', {# package_name #} '{{ snapshot.original_file_path | replace('\\', '\\\\') }}', {# path #} - '{{ snapshot.checksum.checksum }}', {# checksum #} + '{{ snapshot.checksum.checksum | replace('\\', '\\\\') }}', {# checksum #} '{{ snapshot.config.strategy }}', {# strategy #} '{{ tojson(snapshot.config.meta) | replace("\\", "\\\\") | replace("'","\\'") | replace('"', '\\"') }}', {# meta #} '{{ snapshot.alias }}', {# alias #} - '{{ tojson(snapshot) | replace("\\", "\\\\") | replace("'","\\'") | replace('"', '\\"') }}' {# all_results #} + {% if var('dbt_artifacts_exclude_all_results', false) %} + null + {% else %} + '{{ tojson(snapshot) | replace("\\", "\\\\") | replace("'","\\'") | replace('"', '\\"') }}' {# all_results #} + {% endif %} ) {%- if not loop.last %},{%- endif %} {%- endfor %} @@ -67,11 +68,48 @@ {{ tojson(snapshot.depends_on.nodes) }}, {# depends_on_nodes #} '{{ snapshot.package_name }}', {# package_name #} '{{ snapshot.original_file_path | replace('\\', '\\\\') }}', {# path #} + '{{ snapshot.checksum.checksum | replace('\\', '\\\\') }}', {# checksum #} + '{{ snapshot.config.strategy }}', {# strategy #} + {{ adapter.dispatch('parse_json', 'dbt_artifacts')(tojson(snapshot.config.meta)) }}, {# meta #} + '{{ snapshot.alias }}', {# alias #} + {% if var('dbt_artifacts_exclude_all_results', false) %} + null + {% else %} + {{ adapter.dispatch('parse_json', 'dbt_artifacts')(tojson(snapshot) | replace("\\", "\\\\") | replace("'","\\'") | replace('"', '\\"')) }} {# all_results #} + {% endif %} + ) + {%- if not loop.last %},{%- endif %} + {%- endfor %} + {% endset %} + {{ snapshot_values }} + {% else %} + {{ return("") }} + {% endif %} +{%- endmacro %} + +{% macro postgres__get_snapshots_dml_sql(snapshots) -%} + {% if snapshots != [] %} + {% set snapshot_values %} + {% for snapshot in snapshots -%} + ( + '{{ invocation_id }}', {# command_invocation_id #} + '{{ snapshot.unique_id }}', {# node_id #} + '{{ run_started_at }}', {# run_started_at #} + '{{ snapshot.database }}', {# database #} + '{{ snapshot.schema }}', {# schema #} + '{{ snapshot.name }}', {# name #} + $${{ tojson(snapshot.depends_on.nodes) }}$$, {# depends_on_nodes #} + '{{ snapshot.package_name }}', {# package_name #} + '{{ snapshot.original_file_path | replace('\\', '\\\\') }}', {# path #} '{{ snapshot.checksum.checksum }}', {# checksum #} '{{ snapshot.config.strategy }}', {# strategy #} - parse_json('''{{ tojson(snapshot.config.meta) }}'''), {# meta #} + $${{ tojson(snapshot.config.meta) }}$$, {# meta #} '{{ snapshot.alias }}', {# alias #} - parse_json('{{ tojson(snapshot) | replace("\\", "\\\\") | replace("'","\\'") | replace('"', '\\"') }}', wide_number_mode=>'round') {# all_results #} + {% if var('dbt_artifacts_exclude_all_results', false) %} + null + {% else %} + $${{ tojson(snapshot) }}$$ {# all_results #} + {% endif %} ) {%- if not loop.last %},{%- endif %} {%- endfor %} diff --git a/macros/upload_sources.sql b/macros/upload_individual_datasets/upload_sources.sql similarity index 61% rename from macros/upload_sources.sql rename to macros/upload_individual_datasets/upload_sources.sql index ffe0e2b3..dbcb49e4 100644 --- a/macros/upload_sources.sql +++ b/macros/upload_individual_datasets/upload_sources.sql @@ -33,7 +33,11 @@ '{{ source.identifier }}', {# identifier #} '{{ source.loaded_at_field | replace("'","\\'") }}', {# loaded_at_field #} '{{ tojson(source.freshness) | replace("'","\\'") }}', {# freshness #} - '{{ tojson(source) | replace("\\", "\\\\") | replace("'", "\\'") | replace('"', '\\"') }}' {# all_results #} + {% if var('dbt_artifacts_exclude_all_results', false) %} + null + {% else %} + '{{ tojson(source) | replace("\\", "\\\\") | replace("'", "\\'") | replace('"', '\\"') }}' {# all_results #} + {% endif %} ) {%- if not loop.last %},{%- endif %} {%- endfor %} @@ -59,8 +63,43 @@ '{{ source.name }}', {# name #} '{{ source.identifier }}', {# identifier #} '{{ source.loaded_at_field | replace("'","\\'") }}', {# loaded_at_field #} - parse_json('{{ tojson(source.freshness) | replace("'","\\'") }}'), {# freshness #} - parse_json('{{ tojson(source) | replace("\\", "\\\\") | replace("'", "\\'") | replace('"', '\\"') }}', wide_number_mode=>'round') {# all_results #} + {{ adapter.dispatch('parse_json', 'dbt_artifacts')(tojson(source.freshness) | replace("'","\\'")) }}, {# freshness #} + {% if var('dbt_artifacts_exclude_all_results', false) %} + null + {% else %} + {{ adapter.dispatch('parse_json', 'dbt_artifacts')(tojson(source) | replace("\\", "\\\\") | replace("'", "\\'") | replace('"', '\\"')) }} {# all_results #} + {% endif %} + ) + {%- if not loop.last %},{%- endif %} + {%- endfor %} + {% endset %} + {{ source_values }} + {% else %} + {{ return("") }} + {% endif %} +{%- endmacro %} + +{% macro postgres__get_sources_dml_sql(sources) -%} + {% if sources != [] %} + {% set source_values %} + {% for source in sources -%} + ( + '{{ invocation_id }}', {# command_invocation_id #} + '{{ source.unique_id }}', {# node_id #} + '{{ run_started_at }}', {# run_started_at #} + '{{ source.database }}', {# database #} + '{{ source.schema }}', {# schema #} + '{{ source.source_name }}', {# source_name #} + '{{ source.loader }}', {# loader #} + '{{ source.name }}', {# name #} + '{{ source.identifier }}', {# identifier #} + $${{ source.loaded_at_field }}$$, {# loaded_at_field #} + $${{ tojson(source.freshness) }}$$, {# freshness #} + {% if var('dbt_artifacts_exclude_all_results', false) %} + null + {% else %} + $${{ tojson(source) }}$$ {# all_results #} + {% endif %} ) {%- if not loop.last %},{%- endif %} {%- endfor %} diff --git a/macros/upload_test_executions.sql b/macros/upload_individual_datasets/upload_test_executions.sql similarity index 64% rename from macros/upload_test_executions.sql rename to macros/upload_individual_datasets/upload_test_executions.sql index 198fc95f..ea3553ae 100644 --- a/macros/upload_test_executions.sql +++ b/macros/upload_individual_datasets/upload_test_executions.sql @@ -1,10 +1,4 @@ -{% macro upload_test_executions(results) -%} - {% set tests = [] %} - {% for result in results %} - {% if result.node.resource_type == "test" %} - {% do tests.append(result) %} - {% endif %} - {% endfor %} +{% macro upload_test_executions(tests) -%} {{ return(adapter.dispatch('get_test_executions_dml_sql', 'dbt_artifacts')(tests)) }} {%- endmacro %} @@ -41,26 +35,10 @@ '{{ test.thread_id }}', {# thread_id #} '{{ test.status }}', {# status #} - {% if test.timing != [] %} - {% for stage in test.timing if stage.name == "compile" %} - {% if loop.length == 0 %} - null, {# compile_started_at #} - {% else %} - '{{ stage.started_at }}', {# compile_started_at #} - {% endif %} - {% endfor %} - - {% for stage in test.timing if stage.name == "execute" %} - {% if loop.length == 0 %} - null, {# query_completed_at #} - {% else %} - '{{ stage.completed_at }}', {# query_completed_at #} - {% endif %} - {% endfor %} - {% else %} - null, {# compile_started_at #} - null, {# query_completed_at #} - {% endif %} + {% set compile_started_at = (model.timing | selectattr("name", "eq", "compile") | first | default({}))["started_at"] %} + {% if compile_started_at %}'{{ compile_started_at }}'{% else %}null{% endif %}, {# compile_started_at #} + {% set query_completed_at = (model.timing | selectattr("name", "eq", "execute") | first | default({}))["completed_at"] %} + {% if query_completed_at %}'{{ query_completed_at }}'{% else %}null{% endif %}, {# query_completed_at #} {{ test.execution_time }}, {# total_node_runtime #} null, {# rows_affected not available in Databricks #} @@ -78,6 +56,45 @@ {% endmacro -%} {% macro bigquery__get_test_executions_dml_sql(tests) -%} + {% if tests != [] %} + {% set test_execution_values %} + {% for test in tests -%} + ( + '{{ invocation_id }}', {# command_invocation_id #} + '{{ test.node.unique_id }}', {# node_id #} + '{{ run_started_at }}', {# run_started_at #} + + {% set config_full_refresh = test.node.config.full_refresh %} + {% if config_full_refresh is none %} + {% set config_full_refresh = flags.FULL_REFRESH %} + {% endif %} + {{ config_full_refresh }}, {# was_full_refresh #} + + '{{ test.thread_id }}', {# thread_id #} + '{{ test.status }}', {# status #} + + {% set compile_started_at = (model.timing | selectattr("name", "eq", "compile") | first | default({}))["started_at"] %} + {% if compile_started_at %}'{{ compile_started_at }}'{% else %}null{% endif %}, {# compile_started_at #} + {% set query_completed_at = (model.timing | selectattr("name", "eq", "execute") | first | default({}))["completed_at"] %} + {% if query_completed_at %}'{{ query_completed_at }}'{% else %}null{% endif %}, {# query_completed_at #} + + {{ test.execution_time }}, {# total_node_runtime #} + null, {# rows_affected not available in Databricks #} + {{ 'null' if test.failures is none else test.failures }}, {# failures #} + '{{ test.message | replace("\\", "\\\\") | replace("'", "\\'") | replace('"', '\\"') | replace("\n", "\\n") }}', {# message #} + {{ adapter.dispatch('parse_json', 'dbt_artifacts')(tojson(test.adapter_response) | replace("\\", "\\\\") | replace("'", "\\'") | replace('"', '\\"')) }} {# adapter_response #} + ) + {%- if not loop.last %},{%- endif %} + + {%- endfor %} + {% endset %} + {{ test_execution_values }} + {% else %} + {{ return("") }} + {% endif %} +{% endmacro -%} + +{% macro postgres__get_test_executions_dml_sql(tests) -%} {% if tests != [] %} {% set test_execution_values %} {% for test in tests -%} @@ -119,9 +136,8 @@ {{ test.execution_time }}, {# total_node_runtime #} null, {# rows_affected not available in Databricks #} {{ 'null' if test.failures is none else test.failures }}, {# failures #} - '{{ test.message | replace("\\", "\\\\") | replace("'", "\\'") | replace('"', '\\"') }}', {# message #} - parse_json('{{ tojson(test.adapter_response) | replace("\\", "\\\\") | replace("'", "\\'") | replace('"', '\\"') }}') {# adapter_response #} - + $${{ test.message }}$$, {# message #} + $${{ tojson(test.adapter_response) }}$$ {# adapter_response #} ) {%- if not loop.last %},{%- endif %} diff --git a/macros/upload_tests.sql b/macros/upload_individual_datasets/upload_tests.sql similarity index 61% rename from macros/upload_tests.sql rename to macros/upload_individual_datasets/upload_tests.sql index ab4f2a1b..cf75e9ba 100644 --- a/macros/upload_tests.sql +++ b/macros/upload_individual_datasets/upload_tests.sql @@ -27,7 +27,11 @@ '{{ test.package_name }}', {# package_name #} '{{ test.original_file_path | replace('\\', '\\\\') }}', {# test_path #} '{{ tojson(test.tags) }}', {# tags #} - '{{ tojson(test) | replace("\\", "\\\\") | replace("'","\\'") | replace('"', '\\"') }}' {# all_fields #} + {% if var('dbt_artifacts_exclude_all_results', false) %} + null + {% else %} + '{{ tojson(test) | replace("\\", "\\\\") | replace("'","\\'") | replace('"', '\\"') }}' {# all_fields #} + {% endif %} ) {%- if not loop.last %},{%- endif %} {%- endfor %} @@ -51,7 +55,39 @@ '{{ test.package_name }}', {# package_name #} '{{ test.original_file_path | replace('\\', '\\\\') }}', {# test_path #} {{ tojson(test.tags) }}, {# tags #} - parse_json('{{ tojson(test) | replace("\\", "\\\\") | replace("'","\\'") | replace('"', '\\"') }}') {# all_fields #} + {% if var('dbt_artifacts_exclude_all_results', false) %} + null + {% else %} + {{ adapter.dispatch('parse_json', 'dbt_artifacts')(tojson(test) | replace("\\", "\\\\") | replace("'","\\'") | replace('"', '\\"')) }} {# all_fields #} + {% endif %} + ) + {%- if not loop.last %},{%- endif %} + {%- endfor %} + {% endset %} + {{ test_values }} + {% else %} + {{ return("") }} + {% endif %} +{%- endmacro %} + +{% macro postgres__get_tests_dml_sql(tests) -%} + {% if tests != [] %} + {% set test_values %} + {% for test in tests -%} + ( + '{{ invocation_id }}', {# command_invocation_id #} + '{{ test.unique_id }}', {# node_id #} + '{{ run_started_at }}', {# run_started_at #} + '{{ test.name }}', {# name #} + $${{ tojson(test.depends_on.nodes) }}$$, {# depends_on_nodes #} + '{{ test.package_name }}', {# package_name #} + '{{ test.original_file_path | replace('\\', '\\\\') }}', {# test_path #} + $${{ tojson(test.tags) }}$$, {# tags #} + {% if var('dbt_artifacts_exclude_all_results', false) %} + null + {% else %} + $${{ tojson(test) }}$$ {# all_results #} + {% endif %} ) {%- if not loop.last %},{%- endif %} {%- endfor %} diff --git a/macros/upload_models.sql b/macros/upload_models.sql deleted file mode 100644 index 7673f3f4..00000000 --- a/macros/upload_models.sql +++ /dev/null @@ -1,83 +0,0 @@ -{% macro upload_models(models) -%} - {{ return(adapter.dispatch('get_models_dml_sql', 'dbt_artifacts')(models)) }} -{%- endmacro %} - -{% macro default__get_models_dml_sql(models) -%} - - {% if models != [] %} - {% set model_values %} - select - {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(1) }}, - {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(2) }}, - {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(3) }}, - {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(4) }}, - {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(5) }}, - {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(6) }}, - {{ adapter.dispatch('parse_json', 'dbt_artifacts')(adapter.dispatch('column_identifier', 'dbt_artifacts')(7)) }}, - {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(8) }}, - {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(9) }}, - {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(10) }}, - {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(11) }}, - {{ adapter.dispatch('parse_json', 'dbt_artifacts')(adapter.dispatch('column_identifier', 'dbt_artifacts')(12)) }}, - {{ adapter.dispatch('parse_json', 'dbt_artifacts')(adapter.dispatch('column_identifier', 'dbt_artifacts')(13)) }}, - {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(14) }}, - {{ adapter.dispatch('parse_json', 'dbt_artifacts')(adapter.dispatch('column_identifier', 'dbt_artifacts')(15)) }} - from values - {% for model in models -%} - {% do model.pop('raw_code', None) %} - ( - '{{ invocation_id }}', {# command_invocation_id #} - '{{ model.unique_id }}', {# node_id #} - '{{ run_started_at }}', {# run_started_at #} - '{{ model.database }}', {# database #} - '{{ model.schema }}', {# schema #} - '{{ model.name }}', {# name #} - '{{ tojson(model.depends_on.nodes) | replace('\\', '\\\\') }}', {# depends_on_nodes #} - '{{ model.package_name }}', {# package_name #} - '{{ model.original_file_path | replace('\\', '\\\\') }}', {# path #} - '{{ model.checksum.checksum }}', {# checksum #} - '{{ model.config.materialized }}', {# materialization #} - '{{ tojson(model.tags) }}', {# tags #} - '{{ tojson(model.config.meta) | replace("\\", "\\\\") | replace("'","\\'") | replace('"', '\\"') }}', {# meta #} - '{{ model.alias }}', {# alias #} - '{{ tojson(model) | replace("\\", "\\\\") | replace("'","\\'") | replace('"', '\\"') }}' {# all_results #} - ) - {%- if not loop.last %},{%- endif %} - {%- endfor %} - {% endset %} - {{ model_values }} - {% else %} - {{ return("") }} - {% endif %} -{% endmacro -%} - -{% macro bigquery__get_models_dml_sql(models) -%} - {% if models != [] %} - {% set model_values %} - {% for model in models -%} - {% do model.pop('raw_code', None) %} - ( - '{{ invocation_id }}', {# command_invocation_id #} - '{{ model.unique_id }}', {# node_id #} - '{{ run_started_at }}', {# run_started_at #} - '{{ model.database }}', {# database #} - '{{ model.schema }}', {# schema #} - '{{ model.name }}', {# name #} - {{ tojson(model.depends_on.nodes) }}, {# depends_on_nodes #} - '{{ model.package_name }}', {# package_name #} - '{{ model.original_file_path | replace('\\', '\\\\') }}', {# path #} - '{{ model.checksum.checksum }}', {# checksum #} - '{{ model.config.materialized }}', {# materialization #} - {{ tojson(model.tags) }}, {# tags #} - parse_json('''{{ tojson(model.config.meta) }}'''), {# meta #} - '{{ model.alias }}', {# alias #} - parse_json('{{ tojson(model) | replace("\\", "\\\\") | replace("'","\\'") | replace('"', '\\"') }}', wide_number_mode=>'round') {# all_results #} - ) - {%- if not loop.last %},{%- endif %} - {%- endfor %} - {% endset %} - {{ model_values }} - {% else %} - {{ return("") }} - {% endif %} -{%- endmacro %} diff --git a/macros/upload_results.sql b/macros/upload_results.sql deleted file mode 100644 index dbfb8051..00000000 --- a/macros/upload_results.sql +++ /dev/null @@ -1,169 +0,0 @@ -{# dbt doesn't like us ref'ing in an operation so we fetch the info from the graph #} -{% macro get_relation(get_relation_name) %} - {% if execute %} - {% set model_get_relation_node = graph.nodes.values() | selectattr('name', 'equalto', get_relation_name) | first %} - {% set relation = api.Relation.create( - database = model_get_relation_node.database, - schema = model_get_relation_node.schema, - identifier = model_get_relation_node.alias - ) - %} - {% do return(relation) %} - {% else %} - {% do return(api.Relation.create()) %} - {% endif %} -{% endmacro %} - -{% macro upload_results(results) -%} - - {% if execute %} - - {% if results != [] %} - {% do log("Uploading model executions", true) %} - {% set model_executions = dbt_artifacts.get_relation('model_executions') %} - {% set content_model_executions = dbt_artifacts.upload_model_executions(results) %} - {{ dbt_artifacts.insert_into_metadata_table( - database_name=model_executions.database, - schema_name=model_executions.schema, - table_name=model_executions.identifier, - content=content_model_executions - ) - }} - - {% do log("Uploading seed executions", true) %} - {% set seed_executions = dbt_artifacts.get_relation('seed_executions') %} - {% set content_seed_executions = dbt_artifacts.upload_seed_executions(results) %} - {{ dbt_artifacts.insert_into_metadata_table( - database_name=seed_executions.database, - schema_name=seed_executions.schema, - table_name=seed_executions.identifier, - content=content_seed_executions - ) - }} - - {% do log("Uploading snapshot executions", true) %} - {% set snapshot_executions = dbt_artifacts.get_relation('snapshot_executions') %} - {% set content_snapshot_executions = dbt_artifacts.upload_snapshot_executions(results) %} - {{ dbt_artifacts.insert_into_metadata_table( - database_name=snapshot_executions.database, - schema_name=snapshot_executions.schema, - table_name=snapshot_executions.identifier, - content=content_snapshot_executions - ) - }} - - {% do log("Uploading test executions", true) %} - {% set test_executions = dbt_artifacts.get_relation('test_executions') %} - {% set content_test_executions = dbt_artifacts.upload_test_executions(results) %} - {{ dbt_artifacts.insert_into_metadata_table( - database_name=test_executions.database, - schema_name=test_executions.schema, - table_name=test_executions.identifier, - content=content_test_executions - ) - }} - - {% endif %} - - {% do log("Uploading exposures", true) %} - {% set exposures = dbt_artifacts.get_relation('exposures') %} - {% set content_exposures = dbt_artifacts.upload_exposures(graph) %} - {{ dbt_artifacts.insert_into_metadata_table( - database_name=exposures.database, - schema_name=exposures.schema, - table_name=exposures.identifier, - content=content_exposures - ) - }} - - {% do log("Uploading tests", true) %} - {% set tests = dbt_artifacts.get_relation('tests') %} - {% set tests_set = [] %} - {% for node in graph.nodes.values() | selectattr("resource_type", "equalto", "test") %} - {% do tests_set.append(node) %} - {% endfor %} - {# upload tests in chunks of 5000 tests (300 for BigQuery), or less #} - {% set upload_limit = 300 if target.type == 'bigquery' else 5000 %} - {% for i in range(0, tests_set | length, upload_limit) -%} - {% set content_tests = dbt_artifacts.upload_tests(tests_set[i: i + upload_limit]) %} - {{ dbt_artifacts.insert_into_metadata_table( - database_name=tests.database, - schema_name=tests.schema, - table_name=tests.identifier, - content=content_tests - ) - }} - {%- endfor %} - - {% do log("Uploading seeds", true) %} - {% set seeds = dbt_artifacts.get_relation('seeds') %} - {% set content_seeds = dbt_artifacts.upload_seeds(graph) %} - {{ dbt_artifacts.insert_into_metadata_table( - database_name=seeds.database, - schema_name=seeds.schema, - table_name=seeds.identifier, - content=content_seeds - ) - }} - - {% do log("Uploading models", true) %} - {% set models = dbt_artifacts.get_relation('models') %} - {% set models_set = [] %} - {% for node in graph.nodes.values() | selectattr("resource_type", "equalto", "model") %} - {% do models_set.append(node) %} - {% endfor %} - {% set upload_limit = 50 if target.type == 'bigquery' else 100 %} - {% for i in range(0, models_set | length, upload_limit) -%} - {% set content_models = dbt_artifacts.upload_models(models_set[i: i + upload_limit]) %} - {{ dbt_artifacts.insert_into_metadata_table( - database_name=models.database, - schema_name=models.schema, - table_name=models.identifier, - content=content_models - ) - }} - {%- endfor %} - - {% do log("Uploading sources", true) %} - {% set sources = dbt_artifacts.get_relation('sources') %} - {% set sources_set = [] %} - {% for node in graph.sources.values() %} - {% do sources_set.append(node) %} - {% endfor %} - {# upload sources in chunks of 5000 sources (300 for BigQuery), or less #} - {% set upload_limit = 300 if target.type == 'bigquery' else 5000 %} - {% for i in range(0, sources_set | length, upload_limit) -%} - {% set content_sources = dbt_artifacts.upload_sources(sources_set[i: i + upload_limit]) %} - {{ dbt_artifacts.insert_into_metadata_table( - database_name=sources.database, - schema_name=sources.schema, - table_name=sources.identifier, - content=content_sources - ) - }} - {%- endfor %} - - {% do log("Uploading snapshots", true) %} - {% set snapshots = dbt_artifacts.get_relation('snapshots') %} - {% set content_snapshots = dbt_artifacts.upload_snapshots(graph) %} - {{ dbt_artifacts.insert_into_metadata_table( - database_name=snapshots.database, - schema_name=snapshots.schema, - table_name=snapshots.identifier, - content=content_snapshots - ) - }} - - {% do log("Uploading invocations", true) %} - {% set invocations = dbt_artifacts.get_relation('invocations') %} - {% set content_invocations = dbt_artifacts.upload_invocations() %} - {{ dbt_artifacts.insert_into_metadata_table( - database_name=invocations.database, - schema_name=invocations.schema, - table_name=invocations.identifier, - content=content_invocations - ) - }} - - {% endif %} -{%- endmacro %} diff --git a/macros/upload_results/get_column_name_lists.sql b/macros/upload_results/get_column_name_lists.sql new file mode 100644 index 00000000..7911e866 --- /dev/null +++ b/macros/upload_results/get_column_name_lists.sql @@ -0,0 +1,230 @@ + +{# + These are the column lists used as part of the upload macros - the order here should be the same + as the order in each individual `upload_dataset` macro. +#} + +{% macro get_column_name_list(dataset) -%} + + {% if dataset == 'exposures' %} + + ( + command_invocation_id, + node_id, + run_started_at, + name, + type, + owner, + maturity, + path, + description, + url, + package_name, + depends_on_nodes, + tags, + all_results + ) + + {% elif dataset == 'invocations' %} + + ( + command_invocation_id, + dbt_version, + project_name, + run_started_at, + dbt_command, + full_refresh_flag, + target_profile_name, + target_name, + target_schema, + target_threads, + dbt_cloud_project_id, + dbt_cloud_job_id, + dbt_cloud_run_id, + dbt_cloud_run_reason_category, + dbt_cloud_run_reason, + env_vars, + dbt_vars, + invocation_args, + dbt_custom_envs + ) + + {% elif dataset == 'model_executions' %} + + ( + command_invocation_id, + node_id, + run_started_at, + was_full_refresh, + thread_id, + status, + compile_started_at, + query_completed_at, + total_node_runtime, + rows_affected, + {% if target.type == 'bigquery' %} + bytes_processed, + {% endif %} + materialization, + schema, + name, + alias, + message, + adapter_response + ) + + {% elif dataset == 'models' %} + + ( + command_invocation_id, + node_id, + run_started_at, + database, + schema, + name, + depends_on_nodes, + package_name, + path, + checksum, + materialization, + tags, + meta, + alias, + all_results + ) + + + {% elif dataset == 'seed_executions' %} + + ( + command_invocation_id, + node_id, + run_started_at, + was_full_refresh, + thread_id, + status, + compile_started_at, + query_completed_at, + total_node_runtime, + rows_affected, + materialization, + schema, + name, + alias, + message, + adapter_response + ) + + {% elif dataset == 'seeds' %} + + ( + command_invocation_id, + node_id, + run_started_at, + database, + schema, + name, + package_name, + path, + checksum, + meta, + alias, + all_results + ) + + {% elif dataset == 'snapshot_executions' %} + + ( + command_invocation_id, + node_id, + run_started_at, + was_full_refresh, + thread_id, + status, + compile_started_at, + query_completed_at, + total_node_runtime, + rows_affected, + materialization, + schema, + name, + alias, + message, + adapter_response + ) + + {% elif dataset == 'snapshots' %} + + ( + command_invocation_id, + node_id, + run_started_at, + database, + schema, + name, + depends_on_nodes, + package_name, + path, + checksum, + strategy, + meta, + alias, + all_results + ) + + {% elif dataset == 'sources' %} + + ( + command_invocation_id, + node_id, + run_started_at, + database, + schema, + source_name, + loader, + name, + identifier, + loaded_at_field, + freshness, + all_results + ) + + {% elif dataset == 'test_executions' %} + + ( + command_invocation_id, + node_id, + run_started_at, + was_full_refresh, + thread_id, + status, + compile_started_at, + query_completed_at, + total_node_runtime, + rows_affected, + failures, + message, + adapter_response + ) + + {% elif dataset == 'tests' %} + + ( + command_invocation_id, + node_id, + run_started_at, + name, + depends_on_nodes, + package_name, + test_path, + tags, + all_results + ) + + {% else %} + + /* No column list available */ + + {% endif %} + +{%- endmacro %} diff --git a/macros/upload_results/get_dataset_content.sql b/macros/upload_results/get_dataset_content.sql new file mode 100644 index 00000000..b34e39cd --- /dev/null +++ b/macros/upload_results/get_dataset_content.sql @@ -0,0 +1,22 @@ +{% macro get_dataset_content(dataset) %} + + {% if dataset in ['model_executions', 'seed_executions', 'test_executions', 'snapshot_executions'] %} + {# Executions make use of the results object #} + {% set objects = results | selectattr("node.resource_type", "equalto", dataset.split("_")[0]) | list %} + {% elif dataset in ['seeds', 'snapshots', 'tests', 'models'] %} + {# Use the nodes in the [graph](https://docs.getdbt.com/reference/dbt-jinja-functions/graph) to extract details #} + {% set objects = graph.nodes.values() | selectattr("resource_type", "equalto", dataset[:-1]) | list %} + {% elif dataset in ['exposures', 'sources'] %} + {# Use the [graph](https://docs.getdbt.com/reference/dbt-jinja-functions/graph) to extract details #} + {% set objects = graph.get(dataset).values() | list %} + {% elif dataset == 'invocations' %} + {# + Invocations doesn't need anything input, but we include this so that it will still be picked up + as part of the loop below - the length must be >0 to allow for an upload, hence the empty string + #} + {% set objects = [''] %} + {% endif %} + + {{ return(objects) }} + +{% endmacro %} diff --git a/macros/upload_results/get_table_content_values.sql b/macros/upload_results/get_table_content_values.sql new file mode 100644 index 00000000..277899c5 --- /dev/null +++ b/macros/upload_results/get_table_content_values.sql @@ -0,0 +1,32 @@ +{% macro get_table_content_values(dataset, objects_to_upload) %} + + {# Convert the results to data to be imported #} + + {% if dataset == 'model_executions' %} + {% set content = dbt_artifacts.upload_model_executions(objects_to_upload) %} + {% elif dataset == 'seed_executions' %} + {% set content = dbt_artifacts.upload_seed_executions(objects_to_upload) %} + {% elif dataset == 'test_executions' %} + {% set content = dbt_artifacts.upload_test_executions(objects_to_upload) %} + {% elif dataset == 'snapshot_executions' %} + {% set content = dbt_artifacts.upload_snapshot_executions(objects_to_upload) %} + {% elif dataset == 'exposures' %} + {% set content = dbt_artifacts.upload_exposures(objects_to_upload) %} + {% elif dataset == 'models' %} + {% set content = dbt_artifacts.upload_models(objects_to_upload) %} + {% elif dataset == 'seeds' %} + {% set content = dbt_artifacts.upload_seeds(objects_to_upload) %} + {% elif dataset == 'snapshots' %} + {% set content = dbt_artifacts.upload_snapshots(objects_to_upload) %} + {% elif dataset == 'sources' %} + {% set content = dbt_artifacts.upload_sources(objects_to_upload) %} + {% elif dataset == 'tests' %} + {% set content = dbt_artifacts.upload_tests(objects_to_upload) %} + {# Invocations only requires data from variables available in the macro #} + {% elif dataset == 'invocations' %} + {% set content = dbt_artifacts.upload_invocations() %} + {% endif %} + + {{ return(content) }} + +{% endmacro %} diff --git a/macros/upload_results/insert_into_metadata_table.sql b/macros/upload_results/insert_into_metadata_table.sql new file mode 100644 index 00000000..24f1eb77 --- /dev/null +++ b/macros/upload_results/insert_into_metadata_table.sql @@ -0,0 +1,63 @@ +{% macro insert_into_metadata_table(dataset, fields, content) -%} + + {% if content != "" %} + + {# Get the relation that the results will be uploaded to #} + {% set dataset_relation = dbt_artifacts.get_relation(dataset) %} + {# Insert the data into the table #} + {{ return(adapter.dispatch('insert_into_metadata_table', 'dbt_artifacts')(dataset_relation, fields, content)) }} + + {% endif %} + +{%- endmacro %} + +{% macro spark__insert_into_metadata_table(relation, fields, content) -%} + + {% set insert_into_table_query %} + insert into {{ relation }} {{ fields }} + {{ content }} + {% endset %} + + {% do run_query(insert_into_table_query) %} + +{%- endmacro %} + +{% macro snowflake__insert_into_metadata_table(relation, fields, content) -%} + + {% set insert_into_table_query %} + insert into {{ relation }} {{ fields }} + {{ content }} + {% endset %} + + {% do run_query(insert_into_table_query) %} + +{%- endmacro %} + +{% macro bigquery__insert_into_metadata_table(relation, fields, content) -%} + + {% set insert_into_table_query %} + insert into {{ relation }} {{ fields }} + values + {{ content }} + {% endset %} + + {% do run_query(insert_into_table_query) %} + +{%- endmacro %} + +{%- endmacro %} + +{% macro postgres__insert_into_metadata_table(relation, fields, content) -%} + + {% set insert_into_table_query %} + insert into {{ relation }} {{ fields }} + values + {{ content }} + {% endset %} + + {% do run_query(insert_into_table_query) %} + +{%- endmacro %} + +{% macro default__insert_into_metadata_table(relation, fields, content) -%} +{%- endmacro %} diff --git a/macros/upload_results/upload_results.sql b/macros/upload_results/upload_results.sql new file mode 100644 index 00000000..114a667d --- /dev/null +++ b/macros/upload_results/upload_results.sql @@ -0,0 +1,50 @@ +{# dbt doesn't like us ref'ing in an operation so we fetch the info from the graph #} + +{% macro upload_results(results) -%} + + {% if execute %} + + {% set datasets_to_load = ['exposures', 'seeds', 'snapshots', 'invocations', 'sources', 'tests', 'models'] %} + {% if results != [] %} + {# When executing, and results are available, then upload the results #} + {% set datasets_to_load = ['model_executions', 'seed_executions', 'test_executions', 'snapshot_executions'] + datasets_to_load %} + {% endif %} + + {# Upload each data set in turn #} + {% for dataset in datasets_to_load %} + + {% do log("Uploading " ~ dataset.replace("_", " "), true) %} + + {# Get the results that need to be uploaded #} + {% set objects = dbt_artifacts.get_dataset_content(dataset) %} + + {# Upload in chunks to reduce the query size #} + {% if dataset == 'models' %} + {% set upload_limit = 50 if target.type == 'bigquery' else 100 %} + {% else %} + {% set upload_limit = 300 if target.type == 'bigquery' else 5000 %} + {% endif %} + + {# Loop through each chunk in turn #} + {% for i in range(0, objects | length, upload_limit) -%} + + {# Get just the objects to load on this loop #} + {% set content = dbt_artifacts.get_table_content_values(dataset, objects[i: i + upload_limit]) %} + + {# Insert the content into the metadata table #} + {{ dbt_artifacts.insert_into_metadata_table( + dataset=dataset, + fields=dbt_artifacts.get_column_name_list(dataset), + content=content + ) + }} + + {# Loop the next 'chunk' #} + {% endfor %} + + {# Loop the next 'dataset' #} + {% endfor %} + + {% endif %} + +{%- endmacro %} diff --git a/models/dim_dbt__current_models.sql b/models/dim_dbt__current_models.sql index c78a063b..4d7a3c46 100644 --- a/models/dim_dbt__current_models.sql +++ b/models/dim_dbt__current_models.sql @@ -27,10 +27,16 @@ latest_models_runs as ( {% if target.type == 'bigquery' %} , model_executions.bytes_processed {% endif %} + /* Row number by refresh and node ID */ , row_number() over ( partition by latest_models.node_id, model_executions.was_full_refresh order by model_executions.query_completed_at desc /* most recent ranked first */ ) as run_idx + /* Row number by node ID */ + , row_number() over ( + partition by latest_models.node_id + order by model_executions.query_completed_at desc /* most recent ranked first */ + ) as run_idx_id_only from model_executions inner join latest_models on model_executions.node_id = latest_models.node_id where model_executions.status = 'success' @@ -45,11 +51,17 @@ latest_model_stats as ( {% if target.type == 'bigquery' %} , max(case when was_full_refresh then bytes_processed end) as last_full_refresh_run_bytes_processed {% endif %} - , max(query_completed_at) as last_run_completed_at - , max(total_node_runtime) as last_run_total_runtime - , max(rows_affected) as last_run_rows_affected + , max(case when run_idx_id_only = 1 then query_completed_at end) as last_run_completed_at + , max(case when run_idx_id_only = 1 then total_node_runtime end) as last_run_total_runtime + , max(case when run_idx_id_only = 1 then rows_affected end) as last_run_rows_affected + {% if target.type == 'bigquery' %} + , max(case when run_idx_id_only = 1 then bytes_processed end) as last_run_bytes_processed + {% endif %} + , max(case when not was_full_refresh then query_completed_at end) as last_incremental_run_completed_at + , max(case when not was_full_refresh then total_node_runtime end) as last_incremental_run_total_runtime + , max(case when not was_full_refresh then rows_affected end) as last_incremental_run_rows_affected {% if target.type == 'bigquery' %} - , max(bytes_processed) as last_run_bytes_processed + , max(case when not was_full_refresh then bytes_processed end) as last_incremental_run_bytes_processed {% endif %} from latest_models_runs where run_idx = 1 @@ -71,6 +83,12 @@ final as ( {% if target.type == 'bigquery' %} , latest_model_stats.last_run_bytes_processed {% endif %} + , latest_model_stats.last_incremental_run_completed_at + , latest_model_stats.last_incremental_run_total_runtime + , latest_model_stats.last_incremental_run_rows_affected + {% if target.type == 'bigquery' %} + , latest_model_stats.last_incremental_run_bytes_processed + {% endif %} from latest_models left join latest_model_stats on latest_models.node_id = latest_model_stats.node_id diff --git a/tox.ini b/tox.ini index f311d76d..542d6e21 100644 --- a/tox.ini +++ b/tox.ini @@ -36,7 +36,7 @@ rules = LT01,LT02,LT03,CP01,AL01,AL02,CP02,ST08,LT06,LT07,AM01,LT08,AL05,RF02,RF deps = sqlfluff-templater-dbt~=2.0.2 - dbt-snowflake~=1.3.0 + dbt-snowflake~=1.7.0 [sqlfluff:indentation] indent_unit = space @@ -114,13 +114,13 @@ commands = sqlfluff fix models --ignore parsing # Generate docs [testenv:generate_docs] -deps = dbt-snowflake~=1.5.0 +deps = dbt-snowflake~=1.7.0 commands = dbt docs generate --profiles-dir integration_test_project # Snowflake integration tests [testenv:integration_snowflake] changedir = integration_test_project -deps = dbt-snowflake~=1.4.0 +deps = dbt-snowflake~=1.7.0 commands = dbt clean dbt deps @@ -151,14 +151,30 @@ commands = dbt deps dbt build --target snowflake +[testenv:integration_snowflake_1_6_0] +changedir = integration_test_project +deps = dbt-snowflake~=1.6.0 +commands = + dbt clean + dbt deps + dbt build --target snowflake + +[testenv:integration_snowflake_1_7_0] +changedir = integration_test_project +deps = dbt-snowflake~=1.7.0 +commands = + dbt clean + dbt deps + dbt build --target snowflake + # Databricks integration tests [testenv:integration_databricks] changedir = integration_test_project -deps = dbt-databricks~=1.4.0 +deps = dbt-databricks~=1.7.0 commands = dbt clean dbt deps - dbt --debug build --target databricks + dbt build --target databricks [testenv:integration_databricks_1_3_0] changedir = integration_test_project @@ -184,10 +200,26 @@ commands = dbt deps dbt build --target databricks +[testenv:integration_databricks_1_6_0] +changedir = integration_test_project +deps = dbt-databricks~=1.6.0 +commands = + dbt clean + dbt deps + dbt build --target databricks + +[testenv:integration_databricks_1_7_0] +changedir = integration_test_project +deps = dbt-databricks~=1.7.0 +commands = + dbt clean + dbt deps + dbt build --target databricks + # Bigquery integration tests [testenv:integration_bigquery] changedir = integration_test_project -deps = dbt-bigquery~=1.4.0 +deps = dbt-bigquery~=1.7.0 commands = dbt clean dbt deps @@ -217,6 +249,22 @@ commands = dbt deps dbt build --target bigquery --vars '"my_var": "my value"' +[testenv:integration_bigquery_1_6_0] +changedir = integration_test_project +deps = dbt-bigquery~=1.6.0 +commands = + dbt clean + dbt deps + dbt build --target bigquery --vars '"my_var": "my value"' + +[testenv:integration_bigquery_1_7_0] +changedir = integration_test_project +deps = dbt-bigquery~=1.7.0 +commands = + dbt clean + dbt deps + dbt build --target bigquery --vars '"my_var": "my value"' + # Spark integration test (disabled) [testenv:integration_spark] changedir = integration_test_project @@ -226,3 +274,52 @@ commands = dbt deps dbt build --exclude snapshot --target spark +[testenv:integration_postgres] +changedir = integration_test_project +deps = dbt-postgres~=1.7.0 +commands = + dbt clean + dbt deps + dbt build --target postgres + +[testenv:integration_postgres_1_3_0] +changedir = integration_test_project +deps = dbt-postgres~=1.3.0 +commands = + dbt clean + dbt deps + dbt build --target postgres + +[testenv:integration_postgres_1_4_0] +changedir = integration_test_project +deps = dbt-postgres~=1.4.0 +commands = + dbt clean + dbt deps + dbt build --target postgres + +[testenv:integration_postgres_1_5_0] +changedir = integration_test_project +deps = dbt-postgres~=1.5.0 +commands = + dbt clean + dbt deps + dbt build --target postgres + +[testenv:integration_postgres_1_6_0] +changedir = integration_test_project +deps = dbt-postgres~=1.6.0 +commands = + dbt clean + dbt deps + dbt build --target postgres + +[testenv:integration_postgres_1_7_0] +changedir = integration_test_project +deps = dbt-postgres~=1.7.0 +commands = + dbt clean + dbt deps + dbt build --target postgres + +