From 022cf0139a92bc227123c962f75a655a57a4efaf Mon Sep 17 00:00:00 2001
From: Ubuntu
Date: Mon, 21 Mar 2022 21:00:47 +0700
Subject: [PATCH] init

---
 Makefile                                    |  10 +
 README.md                                   |  20 +-
 dbt_project.yml                             |  63 +++++
 destination_catalog.json                    |   1 +
 destination_config.json                     |   1 +
 macros/cross_db_utils/array.sql             | 166 +++++++++++++
 macros/cross_db_utils/concat.sql            |  23 ++
 macros/cross_db_utils/current_timestamp.sql |   7 +
 macros/cross_db_utils/datatypes.sql         | 181 ++++++++++++++
 macros/cross_db_utils/except.sql            |   7 +
 macros/cross_db_utils/hash.sql              |   5 +
 macros/cross_db_utils/json_operations.sql   | 226 ++++++++++++++++++
 macros/cross_db_utils/quote.sql             |  16 ++
 macros/cross_db_utils/surrogate_key.sql     |  25 ++
 macros/cross_db_utils/type_conversions.sql  |  67 ++++++
 macros/get_custom_schema.sql                |   4 +
 macros/incremental.sql                      |  51 ++++
 macros/schema_tests/equal_rowcount.sql      |  34 +++
 macros/schema_tests/equality.sql            | 107 +++++++++
 macros/should_full_refresh.sql              |  51 ++++
 macros/star_intersect.sql                   |  46 ++++
 .../selly_express/se_orders_ab1.sql         |  35 +++
 .../selly_express/se_orders_ab2.sql         |  35 +++
 .../selly_express/se_orders_ab3.sql         |  35 +++
 .../selly_express/se_orders.sql             |  35 +++
 models/generated/sources.yml                |   9 +
 packages.yml                                |   5 +
 profiles.yml                                |  17 ++
 28 files changed, 1281 insertions(+), 1 deletion(-)
 create mode 100644 Makefile
 create mode 100755 dbt_project.yml
 create mode 100644 destination_catalog.json
 create mode 100644 destination_config.json
 create mode 100644 macros/cross_db_utils/array.sql
 create mode 100644 macros/cross_db_utils/concat.sql
 create mode 100644 macros/cross_db_utils/current_timestamp.sql
 create mode 100644 macros/cross_db_utils/datatypes.sql
 create mode 100644 macros/cross_db_utils/except.sql
 create mode 100644 macros/cross_db_utils/hash.sql
 create mode 100644 macros/cross_db_utils/json_operations.sql
 create mode 100644 macros/cross_db_utils/quote.sql
 create mode 100644 macros/cross_db_utils/surrogate_key.sql
 create mode 100644 macros/cross_db_utils/type_conversions.sql
 create mode 100644 macros/get_custom_schema.sql
 create mode 100644 macros/incremental.sql
 create mode 100644 macros/schema_tests/equal_rowcount.sql
 create mode 100644 macros/schema_tests/equality.sql
 create mode 100644 macros/should_full_refresh.sql
 create mode 100644 macros/star_intersect.sql
 create mode 100644 models/generated/airbyte_ctes/selly_express/se_orders_ab1.sql
 create mode 100644 models/generated/airbyte_ctes/selly_express/se_orders_ab2.sql
 create mode 100644 models/generated/airbyte_ctes/selly_express/se_orders_ab3.sql
 create mode 100644 models/generated/airbyte_tables/selly_express/se_orders.sql
 create mode 100644 models/generated/sources.yml
 create mode 100755 packages.yml
 create mode 100644 profiles.yml

diff --git a/Makefile b/Makefile
new file mode 100644
index 0000000..119d82d
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,10 @@
+# Convenience targets for this dbt project
+
+push:
+	git add . && git commit -a -m "update" && git push
+
+run:
+	dbt deps --profiles-dir=. --project-dir=.
+	#dbt run --profiles-dir=. --project-dir=. --full-refresh
+	dbt run --profiles-dir=. --project-dir=. --full-refresh --select se_orders
+
diff --git a/README.md b/README.md
index 18d00b7..0444b3b 100644
--- a/README.md
+++ b/README.md
@@ -1 +1,19 @@
-# dbt-selly-express
\ No newline at end of file
+## Installing dbt
+
+1. Activate your venv and run `pip3 install dbt`
+1. Copy `airbyte-normalization/sample_files/profiles.yml` over to `~/.dbt/profiles.yml`
+1. Edit it to configure your profiles accordingly
+
+## Running dbt
+
+1. `cd airbyte-normalization`
+1. 
You can now run dbt commands to check that the setup is fine: `dbt debug`
+1. To build the dbt tables in your warehouse: `dbt run`
+
+## Running dbt from Airbyte-generated config
+
+1. You can also change directory (`cd /tmp/dev_root/workspace/1/0/normalize` for example) to one of the workspaces generated by Airbyte, inside one of the `normalize` folders.
+1. You should find `profiles.yml` and a bunch of other dbt files/folders created there.
+1. To check that everything is set up properly: `dbt debug --profiles-dir=$(pwd) --project-dir=$(pwd)`
+1. You can modify the `.sql` files and run `dbt run --profiles-dir=$(pwd) --project-dir=$(pwd)` too
+1. You can inspect compiled dbt `.sql` files, before they are run in the destination engine, in the `normalize/build/compiled` or `normalize/build/run` folders
\ No newline at end of file
diff --git a/dbt_project.yml b/dbt_project.yml
new file mode 100755
index 0000000..4869f0a
--- /dev/null
+++ b/dbt_project.yml
@@ -0,0 +1,63 @@
+# This file is necessary to install dbt-utils with dbt deps;
+# the content will be overwritten by the transform function.
+
+# Name your package! Package names should contain only lowercase characters
+# and underscores. A good package name should reflect your organization's
+# name or the intended use of these models
+name: 'airbyte_utils'
+version: '1.0'
+config-version: 2
+
+# This setting configures which "profile" dbt uses for this project. Profiles contain
+# database connection information, and should be configured in the ~/.dbt/profiles.yml file
+profile: 'normalize'
+
+# These configurations specify where dbt should look for different types of files.
+# The `source-paths` config, for example, states that source models can be found
+# in the "models/" directory. You probably won't need to change these!
+source-paths: ["models"]
+docs-paths: ["docs"]
+analysis-paths: ["analysis"]
+test-paths: ["tests"]
+data-paths: ["data"]
+macro-paths: ["macros"]
+
+target-path: "../build"  # directory which will store compiled SQL files
+log-path: "../logs"  # directory which will store dbt logs
+modules-path: "../dbt_modules"  # directory which will store external dbt dependencies
+
+clean-targets:  # directories to be removed by `dbt clean`
+  - "build"
+  - "dbt_modules"
+
+quoting:
+  database: true
+# Temporarily disabling the behavior of the ExtendedNameTransformer on table/schema names (see issue #1785);
+# all schemas should be unquoted
+  schema: false
+  identifier: true
+
+# You can define configurations for models in the `source-paths` directory here.
+# Using these configurations, you can enable or disable models, change how they
+# are materialized, and more!
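+# As an illustrative example (a note added for readers, not generated by Airbyte):
+# a model file at models/generated/airbyte_tables/<schema>/<table>.sql picks up
+# materialized=table and the normalized_tables tag from the block below, so
+# `dbt run --select tag:normalized_tables` would rebuild only the final tables.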
+models: + airbyte_utils: + +materialized: table + generated: + airbyte_ctes: + +tags: airbyte_internal_cte + +materialized: ephemeral + airbyte_incremental: + +tags: incremental_tables + +materialized: incremental + +on_schema_change: sync_all_columns + airbyte_tables: + +tags: normalized_tables + +materialized: table + airbyte_views: + +tags: airbyte_internal_views + +materialized: view + +dispatch: + - macro_namespace: dbt_utils + search_order: ['airbyte_utils', 'dbt_utils'] diff --git a/destination_catalog.json b/destination_catalog.json new file mode 100644 index 0000000..f02509f --- /dev/null +++ b/destination_catalog.json @@ -0,0 +1 @@ +{"streams":[{"stream":{"name":"se-orders","json_schema":{"type":"object","properties":{"to":{"type":"string"},"_id":{"type":"string"},"cod":{"type":"number"},"code":{"type":"string"},"from":{"type":"string"},"note":{"type":"string"},"value":{"type":"number"},"client":{"type":"string"},"status":{"type":"string"},"volume":{"type":"string"},"weight":{"type":"number"},"courier":{"type":"string"},"distance":{"type":"number"},"createdAt":{"type":"string"},"updatedAt":{"type":"string"},"updatedBy":{"type":"string"},"itemVolume":{"type":"number"},"searchString":{"type":"string"},"extraServices":{"type":"array"}}},"supported_sync_modes":["full_refresh","incremental"],"default_cursor_field":[],"source_defined_primary_key":[],"namespace":"selly-express"},"sync_mode":"full_refresh","cursor_field":[],"destination_sync_mode":"overwrite","primary_key":[]}]} \ No newline at end of file diff --git a/destination_config.json b/destination_config.json new file mode 100644 index 0000000..30d7904 --- /dev/null +++ b/destination_config.json @@ -0,0 +1 @@ +{"ssl":false,"host":"18.140.112.89","port":5432,"schema":"public","database":"mongo_etl","password":"bvxJaDGW2R55uyDXfODJ2a0Y","username":"selly","tunnel_method":{"tunnel_method":"NO_TUNNEL"}} \ No newline at end of file diff --git a/macros/cross_db_utils/array.sql b/macros/cross_db_utils/array.sql new file mode 100644 index 0000000..4bc642f --- /dev/null +++ b/macros/cross_db_utils/array.sql @@ -0,0 +1,166 @@ +{# + Adapter Macros for the following functions: + - Bigquery: unnest() -> https://cloud.google.com/bigquery/docs/reference/standard-sql/arrays#flattening-arrays-and-repeated-fields + - Snowflake: flatten() -> https://docs.snowflake.com/en/sql-reference/functions/flatten.html + - Redshift: -> https://blog.getdbt.com/how-to-unnest-arrays-in-redshift/ + - postgres: unnest() -> https://www.postgresqltutorial.com/postgresql-array/ + - MSSQL: openjson() –> https://docs.microsoft.com/en-us/sql/relational-databases/json/validate-query-and-change-json-data-with-built-in-functions-sql-server?view=sql-server-ver15 +#} + +{# cross_join_unnest ------------------------------------------------- #} + +{% macro cross_join_unnest(stream_name, array_col) -%} + {{ adapter.dispatch('cross_join_unnest')(stream_name, array_col) }} +{%- endmacro %} + +{% macro default__cross_join_unnest(stream_name, array_col) -%} + {% do exceptions.warn("Undefined macro cross_join_unnest for this destination engine") %} +{%- endmacro %} + +{% macro bigquery__cross_join_unnest(stream_name, array_col) -%} + cross join unnest({{ array_col }}) as {{ array_col }} +{%- endmacro %} + +{% macro oracle__cross_join_unnest(stream_name, array_col) -%} + {% do exceptions.warn("Normalization does not support unnesting for Oracle yet.") %} +{%- endmacro %} + +{% macro postgres__cross_join_unnest(stream_name, array_col) -%} + cross join jsonb_array_elements( + case 
jsonb_typeof({{ array_col }}) + when 'array' then {{ array_col }} + else '[]' end + ) as _airbyte_nested_data +{%- endmacro %} + +{% macro mysql__cross_join_unnest(stream_name, array_col) -%} + left join joined on _airbyte_{{ stream_name }}_hashid = joined._airbyte_hashid +{%- endmacro %} + +{% macro redshift__cross_join_unnest(stream_name, array_col) -%} + left join joined on _airbyte_{{ stream_name }}_hashid = joined._airbyte_hashid +{%- endmacro %} + +{% macro snowflake__cross_join_unnest(stream_name, array_col) -%} + cross join table(flatten({{ array_col }})) as {{ array_col }} +{%- endmacro %} + +{% macro sqlserver__cross_join_unnest(stream_name, array_col) -%} +{# https://docs.microsoft.com/en-us/sql/relational-databases/json/convert-json-data-to-rows-and-columns-with-openjson-sql-server?view=sql-server-ver15#option-1---openjson-with-the-default-output #} + CROSS APPLY ( + SELECT [value] = CASE + WHEN [type] = 4 THEN (SELECT [value] FROM OPENJSON([value])) + WHEN [type] = 5 THEN [value] + END + FROM OPENJSON({{ array_col }}) + ) AS {{ array_col }} +{%- endmacro %} + +{# unnested_column_value -- this macro is related to unnest_cte #} + +{% macro unnested_column_value(column_col) -%} + {{ adapter.dispatch('unnested_column_value')(column_col) }} +{%- endmacro %} + +{% macro default__unnested_column_value(column_col) -%} + {{ column_col }} +{%- endmacro %} + +{% macro postgres__unnested_column_value(column_col) -%} + _airbyte_nested_data +{%- endmacro %} + +{% macro snowflake__unnested_column_value(column_col) -%} + {{ column_col }}.value +{%- endmacro %} + +{% macro redshift__unnested_column_value(column_col) -%} + _airbyte_nested_data +{%- endmacro %} + +{% macro mysql__unnested_column_value(column_col) -%} + _airbyte_nested_data +{%- endmacro %} + +{% macro oracle__unnested_column_value(column_col) -%} + {{ column_col }} +{%- endmacro %} + +{% macro sqlserver__unnested_column_value(column_col) -%} + {# unnested array/sub_array will be located in `value` column afterwards, we need to address to it #} + {{ column_col }}.value +{%- endmacro %} + +{# unnest_cte ------------------------------------------------- #} + +{% macro unnest_cte(from_table, stream_name, column_col) -%} + {{ adapter.dispatch('unnest_cte')(from_table, stream_name, column_col) }} +{%- endmacro %} + +{% macro default__unnest_cte(from_table, stream_name, column_col) -%}{%- endmacro %} + +{# -- based on https://blog.getdbt.com/how-to-unnest-arrays-in-redshift/ #} +{% macro redshift__unnest_cte(from_table, stream_name, column_col) -%} + {%- if not execute -%} + {{ return('') }} + {% endif %} + {%- call statement('max_json_array_length', fetch_result=True) -%} + with max_value as ( + select max(json_array_length({{ column_col }}, true)) as max_number_of_items + from {{ from_table }} + ) + select + case when max_number_of_items is not null and max_number_of_items > 1 + then max_number_of_items + else 1 end as max_number_of_items + from max_value + {%- endcall -%} + {%- set max_length = load_result('max_json_array_length') -%} +with numbers as ( + {{dbt_utils.generate_series(max_length["data"][0][0])}} +), +joined as ( + select + _airbyte_{{ stream_name }}_hashid as _airbyte_hashid, + json_extract_array_element_text({{ column_col }}, numbers.generated_number::int - 1, true) as _airbyte_nested_data + from {{ from_table }} + cross join numbers + -- only generate the number of records in the cross join that corresponds + -- to the number of items in {{ from_table }}.{{ column_col }} + where numbers.generated_number <= 
json_array_length({{ column_col }}, true)
+)
+{%- endmacro %}

+{% macro mysql__unnest_cte(from_table, stream_name, column_col) -%}
+    {%- if not execute -%}
+        {{ return('') }}
+    {% endif %}
+
+    {%- call statement('max_json_array_length', fetch_result=True) -%}
+        with max_value as (
+            select max(json_length({{ column_col }})) as max_number_of_items
+            from {{ from_table }}
+        )
+        select
+            case when max_number_of_items is not null and max_number_of_items > 1
+            then max_number_of_items
+            else 1 end as max_number_of_items
+        from max_value
+    {%- endcall -%}
+
+    {%- set max_length = load_result('max_json_array_length') -%}
+    with numbers as (
+        {{ dbt_utils.generate_series(max_length["data"][0][0]) }}
+    ),
+    joined as (
+        select
+            _airbyte_{{ stream_name }}_hashid as _airbyte_hashid,
+            {# -- json_extract(column_col, '$[i][0]') as _airbyte_nested_data #}
+            json_extract({{ column_col }}, concat("$[", numbers.generated_number - 1, "][0]")) as _airbyte_nested_data
+        from {{ from_table }}
+        cross join numbers
+        -- only generate the number of records in the cross join that corresponds
+        -- to the number of items in {{ from_table }}.{{ column_col }}
+        where numbers.generated_number <= json_length({{ column_col }})
+    )
+{%- endmacro %}
diff --git a/macros/cross_db_utils/concat.sql b/macros/cross_db_utils/concat.sql
new file mode 100644
index 0000000..536cebc
--- /dev/null
+++ b/macros/cross_db_utils/concat.sql
@@ -0,0 +1,23 @@
+{#
+    concat in dbt 0.6.4 used to work fine for BigQuery, but the new implementation in 0.7.3 is less scalable (it cannot handle too many columns).
+    Therefore, we revert the implementation here and add versions for missing destinations.
+#}
+
+{% macro concat(fields) -%}
+    {{ adapter.dispatch('concat')(fields) }}
+{%- endmacro %}
+
+{% macro bigquery__concat(fields) -%}
+    {#-- concat() in BigQuery SQL scales better with the number of columns than using the '||' operator --#}
+    concat({{ fields|join(', ') }})
+{%- endmacro %}
+
+{% macro sqlserver__concat(fields) -%}
+    {#-- CONCAT() in SQL SERVER accepts from 2 to 254 arguments; we use batches for the main concat, to overcome the limit. 
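+        For example (an illustrative walkthrough of the code below, not extra code): with 500 fields,
+        fields|batch(253) yields two chunks; each chunk is wrapped in its own inner concat(..., ''),
+        and the outer concat() then joins the chunk results, so no single CONCAT() call exceeds the 254-argument limit.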
--#} + {% set concat_chunks = [] %} + {% for chunk in fields|batch(253) -%} + {% set _ = concat_chunks.append( "concat(" ~ chunk|join(', ') ~ ",'')" ) %} + {% endfor %} + + concat({{ concat_chunks|join(', ') }}, '') +{%- endmacro %} diff --git a/macros/cross_db_utils/current_timestamp.sql b/macros/cross_db_utils/current_timestamp.sql new file mode 100644 index 0000000..a9df34c --- /dev/null +++ b/macros/cross_db_utils/current_timestamp.sql @@ -0,0 +1,7 @@ +{% macro mysql__current_timestamp() %} + CURRENT_TIMESTAMP +{% endmacro %} + +{% macro oracle__current_timestamp() %} + CURRENT_TIMESTAMP +{% endmacro %} diff --git a/macros/cross_db_utils/datatypes.sql b/macros/cross_db_utils/datatypes.sql new file mode 100644 index 0000000..080aea5 --- /dev/null +++ b/macros/cross_db_utils/datatypes.sql @@ -0,0 +1,181 @@ +{# json ------------------------------------------------- #} + +{%- macro type_json() -%} + {{ adapter.dispatch('type_json')() }} +{%- endmacro -%} + +{% macro default__type_json() %} + string +{% endmacro %} + +{%- macro redshift__type_json() -%} + varchar +{%- endmacro -%} + +{% macro postgres__type_json() %} + jsonb +{% endmacro %} + +{%- macro oracle__type_json() -%} + varchar2(4000) +{%- endmacro -%} + +{% macro snowflake__type_json() %} + variant +{% endmacro %} + +{%- macro mysql__type_json() -%} + json +{%- endmacro -%} + +{%- macro sqlserver__type_json() -%} + VARCHAR(max) +{%- endmacro -%} + +{% macro clickhouse__type_json() %} + String +{% endmacro %} + + +{# string ------------------------------------------------- #} + +{%- macro mysql__type_string() -%} + char +{%- endmacro -%} + +{%- macro oracle__type_string() -%} + varchar2(4000) +{%- endmacro -%} + +{% macro sqlserver__type_string() %} + VARCHAR(max) +{%- endmacro -%} + +{%- macro clickhouse__type_string() -%} + String +{%- endmacro -%} + + +{# float ------------------------------------------------- #} +{% macro mysql__type_float() %} + float +{% endmacro %} + +{% macro oracle__type_float() %} + float +{% endmacro %} + +{% macro clickhouse__type_float() %} + Float64 +{% endmacro %} + + +{# int ------------------------------------------------- #} +{% macro default__type_int() %} + signed +{% endmacro %} + +{% macro oracle__type_int() %} + int +{% endmacro %} + +{% macro clickhouse__type_int() %} + INT +{% endmacro %} + + +{# bigint ------------------------------------------------- #} +{% macro mysql__type_bigint() %} + signed +{% endmacro %} + +{% macro oracle__type_bigint() %} + numeric +{% endmacro %} + +{% macro clickhouse__type_bigint() %} + BIGINT +{% endmacro %} + + +{# numeric ------------------------------------------------- --#} +{% macro mysql__type_numeric() %} + float +{% endmacro %} + +{% macro clickhouse__type_numeric() %} + Float64 +{% endmacro %} + + +{# timestamp ------------------------------------------------- --#} +{% macro mysql__type_timestamp() %} + time +{% endmacro %} + +{%- macro sqlserver__type_timestamp() -%} + {#-- in TSQL timestamp is really datetime --#} + {#-- https://docs.microsoft.com/en-us/sql/t-sql/functions/date-and-time-data-types-and-functions-transact-sql?view=sql-server-ver15#DateandTimeDataTypes --#} + datetime +{%- endmacro -%} + +{% macro clickhouse__type_timestamp() %} + DateTime64 +{% endmacro %} + + +{# timestamp with time zone ------------------------------------------------- #} + +{%- macro type_timestamp_with_timezone() -%} + {{ adapter.dispatch('type_timestamp_with_timezone')() }} +{%- endmacro -%} + +{% macro default__type_timestamp_with_timezone() %} + timestamp with 
time zone
+{% endmacro %}
+
+{% macro bigquery__type_timestamp_with_timezone() %}
+    timestamp
+{% endmacro %}
+
+{#-- MySQL doesn't allow the cast operation to work with TIMESTAMP, so we have to use char --#}
+{%- macro mysql__type_timestamp_with_timezone() -%}
+    char
+{%- endmacro -%}
+
+{% macro oracle__type_timestamp_with_timezone() %}
+    varchar2(4000)
+{% endmacro %}
+
+{%- macro sqlserver__type_timestamp_with_timezone() -%}
+    {#-- in TSQL timestamp is really datetime or datetime2 --#}
+    {#-- https://docs.microsoft.com/en-us/sql/t-sql/functions/date-and-time-data-types-and-functions-transact-sql?view=sql-server-ver15#DateandTimeDataTypes --#}
+    datetime
+{%- endmacro -%}
+
+{% macro clickhouse__type_timestamp_with_timezone() %}
+    DateTime64
+{% endmacro %}
+
+
+{# date ------------------------------------------------- #}
+
+{%- macro type_date() -%}
+    {{ adapter.dispatch('type_date')() }}
+{%- endmacro -%}
+
+{% macro default__type_date() %}
+    date
+{% endmacro %}
+
+{% macro oracle__type_date() %}
+    varchar2(4000)
+{% endmacro %}
+
+{%- macro sqlserver__type_date() -%}
+    date
+{%- endmacro -%}
+
+{% macro clickhouse__type_date() %}
+    Date
+{% endmacro %}
diff --git a/macros/cross_db_utils/except.sql b/macros/cross_db_utils/except.sql
new file mode 100644
index 0000000..a0f0c15
--- /dev/null
+++ b/macros/cross_db_utils/except.sql
@@ -0,0 +1,7 @@
+{% macro mysql__except() %}
+    {% do exceptions.warn("MySQL does not support the EXCEPT operator") %}
+{% endmacro %}
+
+{% macro oracle__except() %}
+    minus
+{% endmacro %}
diff --git a/macros/cross_db_utils/hash.sql b/macros/cross_db_utils/hash.sql
new file mode 100644
index 0000000..1848887
--- /dev/null
+++ b/macros/cross_db_utils/hash.sql
@@ -0,0 +1,5 @@
+{# macro converting a hash result to varchar #}
+
+{% macro sqlserver__hash(field) -%}
+    convert(varchar(32), HashBytes('md5', coalesce(cast({{field}} as {{dbt_utils.type_string()}}), '')), 2)
+{%- endmacro %}
\ No newline at end of file
diff --git a/macros/cross_db_utils/json_operations.sql b/macros/cross_db_utils/json_operations.sql
new file mode 100644
index 0000000..619eaf4
--- /dev/null
+++ b/macros/cross_db_utils/json_operations.sql
@@ -0,0 +1,226 @@
+{#
+    Adapter Macros for the following functions:
+    - BigQuery: JSON_EXTRACT(json_string_expr, json_path_format) -> https://cloud.google.com/bigquery/docs/reference/standard-sql/json_functions
+    - Snowflake: JSON_EXTRACT_PATH_TEXT(<column_identifier>, '<path_name>') -> https://docs.snowflake.com/en/sql-reference/functions/json_extract_path_text.html
+    - Redshift: json_extract_path_text('json_string', 'path_elem' [,'path_elem'[, ...] ] [, null_if_invalid ] ) -> https://docs.aws.amazon.com/redshift/latest/dg/JSON_EXTRACT_PATH_TEXT.html
+    - Postgres: json_extract_path_text(<from_json>, 'path' [, 'path' [, ...]]) -> https://www.postgresql.org/docs/12/functions-json.html
+    - MySQL: JSON_EXTRACT(json_doc, 'path' [, 'path'] ...) -> https://dev.mysql.com/doc/refman/8.0/en/json-search-functions.html
+    - ClickHouse: JSONExtractString(json_doc, 'path' [, 'path'] ...) -> https://clickhouse.com/docs/en/sql-reference/functions/json-functions/
+#}
+
+{# format_json_path -------------------------------------------------- #}
+{% macro format_json_path(json_path_list) -%}
+    {{ adapter.dispatch('format_json_path')(json_path_list) }}
+{%- endmacro %}
+
+{% macro default__format_json_path(json_path_list) -%}
+    {{ '.' 
~ json_path_list|join('.') }} +{%- endmacro %} + +{% macro oracle__format_json_path(json_path_list) -%} + {{ '\'$."' ~ json_path_list|join('."') ~ '"\'' }} +{%- endmacro %} + +{% macro bigquery__format_json_path(json_path_list) -%} + {%- set str_list = [] -%} + {%- for json_path in json_path_list -%} + {%- if str_list.append(json_path.replace('"', '\\"')) -%} {%- endif -%} + {%- endfor -%} + {{ '"$[\'' ~ str_list|join('\'][\'') ~ '\']"' }} +{%- endmacro %} + +{% macro postgres__format_json_path(json_path_list) -%} + {%- set str_list = [] -%} + {%- for json_path in json_path_list -%} + {%- if str_list.append(json_path.replace("'", "''")) -%} {%- endif -%} + {%- endfor -%} + {{ "'" ~ str_list|join("','") ~ "'" }} +{%- endmacro %} + +{% macro mysql__format_json_path(json_path_list) -%} + {# -- '$."x"."y"."z"' #} + {{ "'$.\"" ~ json_path_list|join(".") ~ "\"'" }} +{%- endmacro %} + +{% macro redshift__format_json_path(json_path_list) -%} + {%- set str_list = [] -%} + {%- for json_path in json_path_list -%} + {%- if str_list.append(json_path.replace("'", "''")) -%} {%- endif -%} + {%- endfor -%} + {{ "'" ~ str_list|join("','") ~ "'" }} +{%- endmacro %} + +{% macro snowflake__format_json_path(json_path_list) -%} + {%- set str_list = [] -%} + {%- for json_path in json_path_list -%} + {%- if str_list.append(json_path.replace("'", "''").replace('"', '""')) -%} {%- endif -%} + {%- endfor -%} + {{ "'\"" ~ str_list|join('"."') ~ "\"'" }} +{%- endmacro %} + +{% macro sqlserver__format_json_path(json_path_list) -%} + {# -- '$."x"."y"."z"' #} + {%- set str_list = [] -%} + {%- for json_path in json_path_list -%} + {%- if str_list.append(json_path.replace("'", "''").replace('"', '\\"')) -%} {%- endif -%} + {%- endfor -%} + {{ "'$.\"" ~ str_list|join(".") ~ "\"'" }} +{%- endmacro %} + +{% macro clickhouse__format_json_path(json_path_list) -%} + {%- set str_list = [] -%} + {%- for json_path in json_path_list -%} + {%- if str_list.append(json_path.replace("'", "''").replace('"', '\\"')) -%} {%- endif -%} + {%- endfor -%} + {{ "'" ~ str_list|join("','") ~ "'" }} +{%- endmacro %} + +{# json_extract ------------------------------------------------- #} + +{% macro json_extract(from_table, json_column, json_path_list, normalized_json_path) -%} + {{ adapter.dispatch('json_extract')(from_table, json_column, json_path_list, normalized_json_path) }} +{%- endmacro %} + +{% macro default__json_extract(from_table, json_column, json_path_list, normalized_json_path) -%} + json_extract({{ from_table}}.{{ json_column }}, {{ format_json_path(json_path_list) }}) +{%- endmacro %} + +{% macro oracle__json_extract(from_table, json_column, json_path_list, normalized_json_path) -%} + json_value({{ json_column }}, {{ format_json_path(normalized_json_path) }}) +{%- endmacro %} + +{% macro bigquery__json_extract(from_table, json_column, json_path_list, normalized_json_path) -%} + {%- if from_table|string() == '' %} + json_extract({{ json_column }}, {{ format_json_path(normalized_json_path) }}) + {% else %} + json_extract({{ from_table}}.{{ json_column }}, {{ format_json_path(normalized_json_path) }}) + {% endif -%} +{%- endmacro %} + +{% macro postgres__json_extract(from_table, json_column, json_path_list, normalized_json_path) -%} + {%- if from_table|string() == '' %} + jsonb_extract_path({{ json_column }}, {{ format_json_path(json_path_list) }}) + {% else %} + jsonb_extract_path({{ from_table }}.{{ json_column }}, {{ format_json_path(json_path_list) }}) + {% endif -%} +{%- endmacro %} + +{% macro mysql__json_extract(from_table, 
json_column, json_path_list, normalized_json_path) -%} + {%- if from_table|string() == '' %} + json_extract({{ json_column }}, {{ format_json_path(normalized_json_path) }}) + {% else %} + json_extract({{ from_table }}.{{ json_column }}, {{ format_json_path(normalized_json_path) }}) + {% endif -%} +{%- endmacro %} + +{% macro redshift__json_extract(from_table, json_column, json_path_list, normalized_json_path) -%} + {%- if from_table|string() == '' %} + case when json_extract_path_text({{ json_column }}, {{ format_json_path(json_path_list) }}, true) != '' then json_extract_path_text({{ json_column }}, {{ format_json_path(json_path_list) }}, true) end + {% else %} + case when json_extract_path_text({{ from_table }}.{{ json_column }}, {{ format_json_path(json_path_list) }}, true) != '' then json_extract_path_text({{ from_table }}.{{ json_column }}, {{ format_json_path(json_path_list) }}, true) end + {% endif -%} +{%- endmacro %} + +{% macro snowflake__json_extract(from_table, json_column, json_path_list, normalized_json_path) -%} + {%- if from_table|string() == '' %} + get_path(parse_json({{ json_column }}), {{ format_json_path(json_path_list) }}) + {% else %} + get_path(parse_json({{ from_table }}.{{ json_column }}), {{ format_json_path(json_path_list) }}) + {% endif -%} +{%- endmacro %} + +{% macro sqlserver__json_extract(from_table, json_column, json_path_list, normalized_json_path) -%} + json_query({{ json_column }}, {{ format_json_path(json_path_list) }}) +{%- endmacro %} + +{% macro clickhouse__json_extract(from_table, json_column, json_path_list, normalized_json_path) -%} + {%- if from_table|string() == '' %} + JSONExtractRaw({{ json_column }}, {{ format_json_path(json_path_list) }}) + {% else %} + JSONExtractRaw({{ from_table }}.{{ json_column }}, {{ format_json_path(json_path_list) }}) + {% endif -%} +{%- endmacro %} + +{# json_extract_scalar ------------------------------------------------- #} + +{% macro json_extract_scalar(json_column, json_path_list, normalized_json_path) -%} + {{ adapter.dispatch('json_extract_scalar')(json_column, json_path_list, normalized_json_path) }} +{%- endmacro %} + +{% macro default__json_extract_scalar(json_column, json_path_list, normalized_json_path) -%} + json_extract_scalar({{ json_column }}, {{ format_json_path(json_path_list) }}) +{%- endmacro %} + +{% macro oracle__json_extract_scalar(json_column, json_path_list, normalized_json_path) -%} + json_value({{ json_column }}, {{ format_json_path(normalized_json_path) }}) +{%- endmacro %} + +{% macro bigquery__json_extract_scalar(json_column, json_path_list, normalized_json_path) -%} + json_extract_scalar({{ json_column }}, {{ format_json_path(normalized_json_path) }}) +{%- endmacro %} + +{% macro postgres__json_extract_scalar(json_column, json_path_list, normalized_json_path) -%} + jsonb_extract_path_text({{ json_column }}, {{ format_json_path(json_path_list) }}) +{%- endmacro %} + +{% macro mysql__json_extract_scalar(json_column, json_path_list, normalized_json_path) -%} + json_value({{ json_column }}, {{ format_json_path(normalized_json_path) }}) +{%- endmacro %} + +{% macro redshift__json_extract_scalar(json_column, json_path_list, normalized_json_path) -%} + case when json_extract_path_text({{ json_column }}, {{ format_json_path(json_path_list) }}, true) != '' then json_extract_path_text({{ json_column }}, {{ format_json_path(json_path_list) }}, true) end +{%- endmacro %} + +{% macro snowflake__json_extract_scalar(json_column, json_path_list, normalized_json_path) -%} + 
to_varchar(get_path(parse_json({{ json_column }}), {{ format_json_path(json_path_list) }}))
+{%- endmacro %}
+
+{% macro sqlserver__json_extract_scalar(json_column, json_path_list, normalized_json_path) -%}
+    json_value({{ json_column }}, {{ format_json_path(json_path_list) }})
+{%- endmacro %}
+
+{% macro clickhouse__json_extract_scalar(json_column, json_path_list, normalized_json_path) -%}
+    JSONExtractRaw({{ json_column }}, {{ format_json_path(json_path_list) }})
+{%- endmacro %}
+
+{# json_extract_array ------------------------------------------------- #}
+
+{% macro json_extract_array(json_column, json_path_list, normalized_json_path) -%}
+    {{ adapter.dispatch('json_extract_array')(json_column, json_path_list, normalized_json_path) }}
+{%- endmacro %}
+
+{% macro default__json_extract_array(json_column, json_path_list, normalized_json_path) -%}
+    json_extract_array({{ json_column }}, {{ format_json_path(json_path_list) }})
+{%- endmacro %}
+
+{% macro oracle__json_extract_array(json_column, json_path_list, normalized_json_path) -%}
+    json_value({{ json_column }}, {{ format_json_path(normalized_json_path) }})
+{%- endmacro %}
+
+{% macro bigquery__json_extract_array(json_column, json_path_list, normalized_json_path) -%}
+    json_extract_array({{ json_column }}, {{ format_json_path(normalized_json_path) }})
+{%- endmacro %}
+
+{% macro postgres__json_extract_array(json_column, json_path_list, normalized_json_path) -%}
+    jsonb_extract_path({{ json_column }}, {{ format_json_path(json_path_list) }})
+{%- endmacro %}
+
+{% macro mysql__json_extract_array(json_column, json_path_list, normalized_json_path) -%}
+    json_extract({{ json_column }}, {{ format_json_path(normalized_json_path) }})
+{%- endmacro %}
+
+{% macro redshift__json_extract_array(json_column, json_path_list, normalized_json_path) -%}
+    json_extract_path_text({{ json_column }}, {{ format_json_path(json_path_list) }}, true)
+{%- endmacro %}
+
+{% macro snowflake__json_extract_array(json_column, json_path_list, normalized_json_path) -%}
+    get_path(parse_json({{ json_column }}), {{ format_json_path(json_path_list) }})
+{%- endmacro %}
+
+{% macro sqlserver__json_extract_array(json_column, json_path_list, normalized_json_path) -%}
+    json_query({{ json_column }}, {{ format_json_path(json_path_list) }})
+{%- endmacro %}
+
+{% macro clickhouse__json_extract_array(json_column, json_path_list, normalized_json_path) -%}
+    JSONExtractArrayRaw({{ json_column }}, {{ format_json_path(json_path_list) }})
+{%- endmacro %}
diff --git a/macros/cross_db_utils/quote.sql b/macros/cross_db_utils/quote.sql
new file mode 100644
index 0000000..8786249
--- /dev/null
+++ b/macros/cross_db_utils/quote.sql
@@ -0,0 +1,16 @@
+{# quote ---------------------------------- #}
+{% macro quote(column_name) -%}
+    {{ adapter.dispatch('quote')(column_name) }}
+{%- endmacro %}
+
+{% macro default__quote(column_name) -%}
+    {{ adapter.quote(column_name) }}
+{%- endmacro %}
+
+{% macro oracle__quote(column_name) -%}
+    {{ '\"' ~ column_name ~ '\"'}}
+{%- endmacro %}
+
+{% macro clickhouse__quote(column_name) -%}
+    {{ '\"' ~ column_name ~ '\"'}}
+{%- endmacro %}
diff --git a/macros/cross_db_utils/surrogate_key.sql b/macros/cross_db_utils/surrogate_key.sql
new file mode 100644
index 0000000..9de2965
--- /dev/null
+++ b/macros/cross_db_utils/surrogate_key.sql
@@ -0,0 +1,25 @@
+{# surrogate_key ---------------------------------- #}
+
+{% macro oracle__surrogate_key(field_list) -%}
+    ora_hash(
+        {%- for field in field_list %}
+            {% if not loop.last %}
+                {{ field }} || '~' ||
+            {% else 
%} + {{ field }} + {% endif %} + {%- endfor %} + ) +{%- endmacro %} + +{% macro clickhouse__surrogate_key(field_list) -%} + assumeNotNull(hex(MD5( + {%- for field in field_list %} + {% if not loop.last %} + toString({{ field }}) || '~' || + {% else %} + toString({{ field }}) + {% endif %} + {%- endfor %} + ))) +{%- endmacro %} diff --git a/macros/cross_db_utils/type_conversions.sql b/macros/cross_db_utils/type_conversions.sql new file mode 100644 index 0000000..89dd68b --- /dev/null +++ b/macros/cross_db_utils/type_conversions.sql @@ -0,0 +1,67 @@ + +{# boolean_to_string ------------------------------------------------- #} +{% macro boolean_to_string(boolean_column) -%} + {{ adapter.dispatch('boolean_to_string')(boolean_column) }} +{%- endmacro %} + +{% macro default__boolean_to_string(boolean_column) -%} + {{ boolean_column }} +{%- endmacro %} + +{% macro redshift__boolean_to_string(boolean_column) -%} + case when {{ boolean_column }} then 'true' else 'false' end +{%- endmacro %} + +{# array_to_string ------------------------------------------------- #} +{% macro array_to_string(array_column) -%} + {{ adapter.dispatch('array_to_string')(array_column) }} +{%- endmacro %} + +{% macro default__array_to_string(array_column) -%} + {{ array_column }} +{%- endmacro %} + +{% macro bigquery__array_to_string(array_column) -%} + array_to_string({{ array_column }}, "|", "") +{%- endmacro %} + +{% macro oracle__array_to_string(array_column) -%} + cast({{ array_column }} as varchar2(4000)) +{%- endmacro %} + +{% macro sqlserver__array_to_string(array_column) -%} + cast({{ array_column }} as {{dbt_utils.type_string()}}) +{%- endmacro %} + +{# cast_to_boolean ------------------------------------------------- #} +{% macro cast_to_boolean(field) -%} + {{ adapter.dispatch('cast_to_boolean')(field) }} +{%- endmacro %} + +{% macro default__cast_to_boolean(field) -%} + cast({{ field }} as boolean) +{%- endmacro %} + +{# -- MySQL does not support cast function converting string directly to boolean (an alias of tinyint(1), https://dev.mysql.com/doc/refman/8.0/en/cast-functions.html#function_cast #} +{% macro mysql__cast_to_boolean(field) -%} + IF(lower({{ field }}) = 'true', true, false) +{%- endmacro %} + +{# -- Redshift does not support converting string directly to boolean, it must go through int first #} +{% macro redshift__cast_to_boolean(field) -%} + cast(decode({{ field }}, 'true', '1', 'false', '0')::integer as boolean) +{%- endmacro %} + +{# -- MS SQL Server does not support converting string directly to boolean, it must be casted as bit #} +{% macro sqlserver__cast_to_boolean(field) -%} + cast({{ field }} as bit) +{%- endmacro %} + +{# empty_string_to_null ------------------------------------------------- #} +{% macro empty_string_to_null(field) -%} + {{ return(adapter.dispatch('empty_string_to_null')(field)) }} +{%- endmacro %} + +{%- macro default__empty_string_to_null(field) -%} + nullif({{ field }}, '') +{%- endmacro %} diff --git a/macros/get_custom_schema.sql b/macros/get_custom_schema.sql new file mode 100644 index 0000000..77e83c7 --- /dev/null +++ b/macros/get_custom_schema.sql @@ -0,0 +1,4 @@ +-- see https://docs.getdbt.com/docs/building-a-dbt-project/building-models/using-custom-schemas/#an-alternative-pattern-for-generating-schema-names +{% macro generate_schema_name(custom_schema_name, node) -%} + {{ generate_schema_name_for_env(custom_schema_name, node) }} +{%- endmacro %} diff --git a/macros/incremental.sql b/macros/incremental.sql new file mode 100644 index 0000000..f70b479 --- 
/dev/null
+++ b/macros/incremental.sql
@@ -0,0 +1,51 @@
+{#
+    These macros control how incremental models are updated in Airbyte's normalization step:
+    - get_max_normalized_cursor retrieves the maximum cursor value of the data already normalized
+    - incremental_clause controls the predicate used to filter for new data to process incrementally
+#}
+
+{% macro incremental_clause(col_emitted_at) -%}
+    {{ adapter.dispatch('incremental_clause')(col_emitted_at) }}
+{%- endmacro %}
+
+{%- macro default__incremental_clause(col_emitted_at) -%}
+{% if is_incremental() %}
+and coalesce(
+    cast({{ col_emitted_at }} as {{ type_timestamp_with_timezone() }}) >= (select max(cast({{ col_emitted_at }} as {{ type_timestamp_with_timezone() }})) from {{ this }}),
+    {# -- if {{ col_emitted_at }} is NULL in either table, the previous comparison would evaluate to NULL, #}
+    {# -- so we coalesce and make sure the row is always returned for incremental processing instead #}
+    true)
+{% endif %}
+{%- endmacro -%}
+
+{# -- see https://on-systems.tech/113-beware-dbt-incremental-updates-against-snowflake-external-tables/ #}
+{%- macro snowflake__incremental_clause(col_emitted_at) -%}
+{% if is_incremental() %}
+    {% if get_max_normalized_cursor(col_emitted_at) %}
+and cast({{ col_emitted_at }} as {{ type_timestamp_with_timezone() }}) >=
+    cast('{{ get_max_normalized_cursor(col_emitted_at) }}' as {{ type_timestamp_with_timezone() }})
+    {% endif %}
+{% endif %}
+{%- endmacro -%}
+
+{%- macro sqlserver__incremental_clause(col_emitted_at) -%}
+{% if is_incremental() %}
+and ((select max(cast({{ col_emitted_at }} as {{ type_timestamp_with_timezone() }})) from {{ this }}) is null
+    or cast({{ col_emitted_at }} as {{ type_timestamp_with_timezone() }}) >=
+       (select max(cast({{ col_emitted_at }} as {{ type_timestamp_with_timezone() }})) from {{ this }}))
+{% endif %}
+{%- endmacro -%}
+
+{% macro get_max_normalized_cursor(col_emitted_at) %}
+{% if execute and is_incremental() %}
+    {% if env_var('INCREMENTAL_CURSOR', 'UNSET') == 'UNSET' %}
+        {% set query %}
+            select max(cast({{ col_emitted_at }} as {{ type_timestamp_with_timezone() }})) from {{ this }}
+        {% endset %}
+        {% set max_cursor = run_query(query).columns[0][0] %}
+        {% do return(max_cursor) %}
+    {% else %}
+        {% do return(env_var('INCREMENTAL_CURSOR')) %}
+    {% endif %}
+{% endif %}
+{% endmacro %}
diff --git a/macros/schema_tests/equal_rowcount.sql b/macros/schema_tests/equal_rowcount.sql
new file mode 100644
index 0000000..0dd4dc6
--- /dev/null
+++ b/macros/schema_tests/equal_rowcount.sql
@@ -0,0 +1,34 @@
+{% macro oracle__test_equal_rowcount(model, compare_model) %}
+
+{#-- Needs to be set at parse time, before we return '' below --#}
+{{ config(fail_calc = 'coalesce(diff_count, 0)') }}
+
+{#-- Prevent querying of db in parsing mode. This works because this macro does not create any new refs. 
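+    As a reading aid (an added note): with fail_calc set to coalesce(diff_count, 0) above, dbt fails
+    this test whenever the query below returns a non-zero diff_count, i.e. when the two relations' row counts differ. 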
#} +{%- if not execute -%} + {{ return('') }} +{% endif %} + +with a as ( + + select count(*) as count_a from {{ model }} + +), +b as ( + + select count(*) as count_b from {{ compare_model }} + +), +final as ( + + select + count_a, + count_b, + abs(count_a - count_b) as diff_count + from a + cross join b + +) + +select diff_count from final + +{% endmacro %} diff --git a/macros/schema_tests/equality.sql b/macros/schema_tests/equality.sql new file mode 100644 index 0000000..ef83a02 --- /dev/null +++ b/macros/schema_tests/equality.sql @@ -0,0 +1,107 @@ +{# +-- Adapted from https://github.com/dbt-labs/dbt-utils/blob/0-19-0-updates/macros/schema_tests/equality.sql +-- dbt-utils version: 0.6.4 +-- This macro needs to be updated accordingly when dbt-utils is upgraded. +-- This is needed because MySQL does not support the EXCEPT operator! +#} + +{% macro mysql__test_equality(model, compare_model, compare_columns=None) %} + + {%- if not execute -%} + {{ return('') }} + {% endif %} + + {%- do dbt_utils._is_relation(model, 'test_equality') -%} + + {%- if not compare_columns -%} + {%- do dbt_utils._is_ephemeral(model, 'test_equality') -%} + {%- set compare_columns = adapter.get_columns_in_relation(model) | map(attribute='quoted') -%} + {%- endif -%} + + {% set compare_cols_csv = compare_columns | join(', ') %} + + with a as ( + select * from {{ model }} + ), + + b as ( + select * from {{ compare_model }} + ), + + a_minus_b as ( + select {{ compare_cols_csv }} from a + where ({{ compare_cols_csv }}) not in + (select {{ compare_cols_csv }} from b) + ), + + b_minus_a as ( + select {{ compare_cols_csv }} from b + where ({{ compare_cols_csv }}) not in + (select {{ compare_cols_csv }} from a) + ), + + unioned as ( + select * from a_minus_b + union all + select * from b_minus_a + ), + + final as ( + select (select count(*) from unioned) + + (select abs( + (select count(*) from a_minus_b) - + (select count(*) from b_minus_a) + )) + as count + ) + + select count from final + +{% endmacro %} + +{% macro oracle__test_equality(model) %} + {#-- Prevent querying of db in parsing mode. This works because this macro does not create any new refs. 
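+    As context (an added note): dbt_utils.except() in the CTEs below dispatches, via the search_order
+    configured in dbt_project.yml, to oracle__except(), which renders the MINUS set operator, since Oracle
+    does not support EXCEPT. 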
#}
+    {%- if not execute -%}
+        {{ return('') }}
+    {% endif %}
+
+    -- setup
+    {%- do dbt_utils._is_relation(model, 'test_equality') -%}
+
+    {#-
+    If the compare_cols arg is provided, we can run this test without querying the
+    information schema — this allows the model to be an ephemeral model
+    -#}
+    {%- set compare_columns = kwargs.get('compare_columns', None) -%}
+
+    {%- if not compare_columns -%}
+        {%- do dbt_utils._is_ephemeral(model, 'test_equality') -%}
+        {%- set compare_columns = adapter.get_columns_in_relation(model) | map(attribute='quoted') -%}
+    {%- endif -%}
+
+    {% set compare_model = kwargs.get('compare_model', kwargs.get('arg')) %}
+    {% set compare_cols_csv = compare_columns | join(', ') %}
+
+    with a as (
+        select * from {{ model }}
+    ),
+    b as (
+        select * from {{ compare_model }}
+    ),
+    a_minus_b as (
+        select {{compare_cols_csv}} from a
+        {{ dbt_utils.except() }}
+        select {{compare_cols_csv}} from b
+    ),
+    b_minus_a as (
+        select {{compare_cols_csv}} from b
+        {{ dbt_utils.except() }}
+        select {{compare_cols_csv}} from a
+    ),
+    unioned as (
+        select * from a_minus_b
+        union all
+        select * from b_minus_a
+    )
+    select count(*) from unioned
+{% endmacro %}
diff --git a/macros/should_full_refresh.sql b/macros/should_full_refresh.sql
new file mode 100644
index 0000000..ff2c6d5
--- /dev/null
+++ b/macros/should_full_refresh.sql
@@ -0,0 +1,51 @@
+{#
+    This overrides the behavior of the macro `should_full_refresh` so that a full refresh is triggered if:
+    - the dbt CLI is run with the --full-refresh flag, or the model is explicitly configured to full_refresh
+    - the column _airbyte_ab_id does not exist yet in the normalized tables, to make sure it ends up well populated.
+#}
+
+{%- macro need_full_refresh(col_ab_id, target_table=this) -%}
+    {%- if not execute -%}
+        {{ return(false) }}
+    {%- endif -%}
+    {%- set found_column = [] %}
+    {%- set cols = adapter.get_columns_in_relation(target_table) -%}
+    {%- for col in cols -%}
+        {%- if col.column == col_ab_id -%}
+            {% do found_column.append(col.column) %}
+        {%- endif -%}
+    {%- endfor -%}
+    {%- if found_column -%}
+        {{ return(false) }}
+    {%- else -%}
+        {{ dbt_utils.log_info(target_table ~ "." ~ col_ab_id ~ " does not exist yet. The table will be created or rebuilt with dbt.full_refresh") }}
+        {{ return(true) }}
+    {%- endif -%}
+{%- endmacro -%}
+
+{%- macro should_full_refresh() -%}
+    {% set config_full_refresh = config.get('full_refresh') %}
+    {%- if config_full_refresh is none -%}
+        {% set config_full_refresh = flags.FULL_REFRESH %}
+    {%- endif -%}
+    {%- if not config_full_refresh -%}
+        {% set config_full_refresh = need_full_refresh(get_col_ab_id(), this) %}
+    {%- endif -%}
+    {% do return(config_full_refresh) %}
+{%- endmacro -%}
+
+{%- macro get_col_ab_id() -%}
+    {{ adapter.dispatch('get_col_ab_id')() }}
+{%- endmacro -%}
+
+{%- macro default__get_col_ab_id() -%}
+    _airbyte_ab_id
+{%- endmacro -%}
+
+{%- macro oracle__get_col_ab_id() -%}
+    "_AIRBYTE_AB_ID"
+{%- endmacro -%}
+
+{%- macro snowflake__get_col_ab_id() -%}
+    _AIRBYTE_AB_ID
+{%- endmacro -%}
diff --git a/macros/star_intersect.sql b/macros/star_intersect.sql
new file mode 100644
index 0000000..3f3d06c
--- /dev/null
+++ b/macros/star_intersect.sql
@@ -0,0 +1,46 @@
+{#
+    Similar to the star macro here: https://github.com/dbt-labs/dbt-utils/blob/main/macros/sql/star.sql
+
+    This star_intersect macro takes an additional 'intersect' relation as argument.
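+    As an illustrative call (an added example, not taken from the generated models):
+    star_intersect(ref('se_orders_ab3'), this, intersect_alias='adapted', except=['_airbyte_ab_id'])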
+    Its behavior is to select columns from both the 'intersect' and 'from' relations, with the following rules:
+    - if a column exists in both the 'from' and 'intersect' relations, the column from 'intersect' is used
+    - if a column exists only in the 'from' relation, that column is used
+#}
+{% macro star_intersect(from, intersect, from_alias=False, intersect_alias=False, except=[]) -%}
+    {%- do dbt_utils._is_relation(from, 'star_intersect') -%}
+    {%- do dbt_utils._is_ephemeral(from, 'star_intersect') -%}
+    {%- do dbt_utils._is_relation(intersect, 'star_intersect') -%}
+    {%- do dbt_utils._is_ephemeral(intersect, 'star_intersect') -%}
+
+    {#-- Prevent querying of db in parsing mode. This works because this macro does not create any new refs. #}
+    {%- if not execute -%}
+        {{ return('') }}
+    {% endif %}
+
+    {%- set include_cols = [] %}
+    {%- set cols = adapter.get_columns_in_relation(from) -%}
+    {%- set except = except | map("lower") | list %}
+    {%- for col in cols -%}
+        {%- if col.column|lower not in except -%}
+            {% do include_cols.append(col.column) %}
+        {%- endif %}
+    {%- endfor %}
+
+    {%- set include_intersect_cols = [] %}
+    {%- set intersect_cols = adapter.get_columns_in_relation(intersect) -%}
+    {%- for col in intersect_cols -%}
+        {%- if col.column|lower not in except -%}
+            {% do include_intersect_cols.append(col.column) %}
+        {%- endif %}
+    {%- endfor %}
+
+    {%- for col in include_cols %}
+        {%- if col in include_intersect_cols -%}
+            {%- if intersect_alias %}{{ intersect_alias }}.{% else %}{%- endif -%}{{ adapter.quote(col)|trim }}
+            {%- if not loop.last %},{{ '\n ' }}{% endif %}
+        {%- else %}
+            {%- if from_alias %}{{ from_alias }}.{% else %}{{ from }}.{%- endif -%}{{ adapter.quote(col)|trim }} as {{ adapter.quote(col)|trim }}
+            {%- if not loop.last %},{{ '\n ' }}{% endif %}
+        {%- endif %}
+    {%- endfor -%}
+{%- endmacro %}
diff --git a/models/generated/airbyte_ctes/selly_express/se_orders_ab1.sql b/models/generated/airbyte_ctes/selly_express/se_orders_ab1.sql
new file mode 100644
index 0000000..f1988d1
--- /dev/null
+++ b/models/generated/airbyte_ctes/selly_express/se_orders_ab1.sql
@@ -0,0 +1,35 @@
+{{ config(
+    indexes = [{'columns':['_airbyte_emitted_at'],'type':'btree'}],
+    unique_key = '_airbyte_ab_id',
+    schema = "_airbyte_selly_express",
+    tags = [ "top-level-intermediate" ]
+) }}
+-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema
+-- depends_on: {{ source('selly_express', '_airbyte_raw_se_orders') }}
+select
+    {{ json_extract_scalar('_airbyte_data', ['to'], ['to']) }} as {{ adapter.quote('to') }},
+    {{ json_extract_scalar('_airbyte_data', ['_id'], ['_id']) }} as _id,
+    {{ json_extract_scalar('_airbyte_data', ['cod'], ['cod']) }} as cod,
+    {{ json_extract_scalar('_airbyte_data', ['code'], ['code']) }} as code,
+    {{ json_extract_scalar('_airbyte_data', ['from'], ['from']) }} as {{ adapter.quote('from') }},
+    {{ json_extract_scalar('_airbyte_data', ['note'], ['note']) }} as note,
+    {{ json_extract_scalar('_airbyte_data', ['value'], ['value']) }} as {{ adapter.quote('value') }},
+    {{ json_extract_scalar('_airbyte_data', ['client'], ['client']) }} as client,
+    {{ json_extract_scalar('_airbyte_data', ['status'], ['status']) }} as status,
+    {{ json_extract_scalar('_airbyte_data', ['volume'], ['volume']) }} as volume,
+    {{ json_extract_scalar('_airbyte_data', ['weight'], ['weight']) }} as weight,
+    {{ json_extract_scalar('_airbyte_data', ['courier'], ['courier']) }} as courier,
+    {{ 
json_extract_scalar('_airbyte_data', ['distance'], ['distance']) }} as distance, + {{ json_extract_scalar('_airbyte_data', ['createdAt'], ['createdAt']) }} as createdat, + {{ json_extract_scalar('_airbyte_data', ['updatedAt'], ['updatedAt']) }} as updatedat, + {{ json_extract_scalar('_airbyte_data', ['updatedBy'], ['updatedBy']) }} as updatedby, + {{ json_extract_scalar('_airbyte_data', ['itemVolume'], ['itemVolume']) }} as itemvolume, + {{ json_extract_scalar('_airbyte_data', ['searchString'], ['searchString']) }} as searchstring, + {{ json_extract_array('_airbyte_data', ['extraServices'], ['extraServices']) }} as extraservices, + _airbyte_ab_id, + _airbyte_emitted_at, + {{ current_timestamp() }} as _airbyte_normalized_at +from {{ source('selly_express', '_airbyte_raw_se_orders') }} as table_alias +-- se_orders +where 1 = 1 + diff --git a/models/generated/airbyte_ctes/selly_express/se_orders_ab2.sql b/models/generated/airbyte_ctes/selly_express/se_orders_ab2.sql new file mode 100644 index 0000000..90e9f35 --- /dev/null +++ b/models/generated/airbyte_ctes/selly_express/se_orders_ab2.sql @@ -0,0 +1,35 @@ +{{ config( + indexes = [{'columns':['_airbyte_emitted_at'],'type':'btree'}], + unique_key = '_airbyte_ab_id', + schema = "_airbyte_selly_express", + tags = [ "top-level-intermediate" ] +) }} +-- SQL model to cast each column to its adequate SQL type converted from the JSON schema type +-- depends_on: {{ ref('se_orders_ab1') }} +select + cast({{ adapter.quote('to') }} as {{ dbt_utils.type_string() }}) as {{ adapter.quote('to') }}, + cast(_id as {{ dbt_utils.type_string() }}) as _id, + cast(cod as {{ dbt_utils.type_float() }}) as cod, + cast(code as {{ dbt_utils.type_string() }}) as code, + cast({{ adapter.quote('from') }} as {{ dbt_utils.type_string() }}) as {{ adapter.quote('from') }}, + cast(note as {{ dbt_utils.type_string() }}) as note, + cast({{ adapter.quote('value') }} as {{ dbt_utils.type_float() }}) as {{ adapter.quote('value') }}, + cast(client as {{ dbt_utils.type_string() }}) as client, + cast(status as {{ dbt_utils.type_string() }}) as status, + cast(volume as {{ dbt_utils.type_string() }}) as volume, + cast(weight as {{ dbt_utils.type_float() }}) as weight, + cast(courier as {{ dbt_utils.type_string() }}) as courier, + cast(distance as {{ dbt_utils.type_float() }}) as distance, + cast(createdat as {{ dbt_utils.type_string() }}) as createdat, + cast(updatedat as {{ dbt_utils.type_string() }}) as updatedat, + cast(updatedby as {{ dbt_utils.type_string() }}) as updatedby, + cast(itemvolume as {{ dbt_utils.type_float() }}) as itemvolume, + cast(searchstring as {{ dbt_utils.type_string() }}) as searchstring, + extraservices, + _airbyte_ab_id, + _airbyte_emitted_at, + {{ current_timestamp() }} as _airbyte_normalized_at +from {{ ref('se_orders_ab1') }} +-- se_orders +where 1 = 1 + diff --git a/models/generated/airbyte_ctes/selly_express/se_orders_ab3.sql b/models/generated/airbyte_ctes/selly_express/se_orders_ab3.sql new file mode 100644 index 0000000..e926eec --- /dev/null +++ b/models/generated/airbyte_ctes/selly_express/se_orders_ab3.sql @@ -0,0 +1,35 @@ +{{ config( + indexes = [{'columns':['_airbyte_emitted_at'],'type':'btree'}], + unique_key = '_airbyte_ab_id', + schema = "_airbyte_selly_express", + tags = [ "top-level-intermediate" ] +) }} +-- SQL model to build a hash column based on the values of this record +-- depends_on: {{ ref('se_orders_ab2') }} +select + {{ dbt_utils.surrogate_key([ + adapter.quote('to'), + '_id', + 'cod', + 'code', + adapter.quote('from'), + 'note', + 
adapter.quote('value'), + 'client', + 'status', + 'volume', + 'weight', + 'courier', + 'distance', + 'createdat', + 'updatedat', + 'updatedby', + 'itemvolume', + 'searchstring', + array_to_string('extraservices'), + ]) }} as _airbyte_se_orders_hashid, + tmp.* +from {{ ref('se_orders_ab2') }} tmp +-- se_orders +where 1 = 1 + diff --git a/models/generated/airbyte_tables/selly_express/se_orders.sql b/models/generated/airbyte_tables/selly_express/se_orders.sql new file mode 100644 index 0000000..1893c76 --- /dev/null +++ b/models/generated/airbyte_tables/selly_express/se_orders.sql @@ -0,0 +1,35 @@ +{{ config( + indexes = [{'columns':['_airbyte_emitted_at'],'type':'btree'}], + unique_key = '_airbyte_ab_id', + schema = "selly_express", + tags = [ "top-level" ] +) }} +-- Final base SQL model +-- depends_on: {{ ref('se_orders_ab3') }} +select + {{ adapter.quote('to') }}, + _id, + {{ adapter.quote('client') }}::json->>'code' AS client_code, + cast({{ adapter.quote('client') }}::json->'fee'->>'total' AS numeric) AS client_fee_total, + cast({{ adapter.quote('client') }}::json->'fee'->>'shipping' AS numeric) AS client_fee_shipping, + cast({{ adapter.quote('client') }}::json->'fee'->>'other' AS numeric) AS client_fee_other, + {{ adapter.quote('courier') }}::json->>'code' AS courier_code, + cast({{ adapter.quote('courier') }}::json->'fee'->>'total' AS numeric) AS courier_fee_total, + cast({{ adapter.quote('courier') }}::json->'fee'->>'shipping' AS numeric) AS courier_fee_shipping, + cast({{ adapter.quote('courier') }}::json->'fee'->>'other' AS numeric) AS courier_fee_other, + courier, + cast(weight AS numeric) AS weight, + volume, + code, + status, + cast({{ adapter.quote('value') }} AS numeric) AS value, + createdat::timestamp AS created_at, + updatedat::timestamp AS updated_at, + _airbyte_ab_id, + _airbyte_emitted_at, + {{ current_timestamp() }} as _airbyte_normalized_at, + _airbyte_se_orders_hashid +from {{ ref('se_orders_ab3') }} +-- se_orders from {{ source('selly_express', '_airbyte_raw_se_orders') }} +where 1 = 1 + diff --git a/models/generated/sources.yml b/models/generated/sources.yml new file mode 100644 index 0000000..ba17ed4 --- /dev/null +++ b/models/generated/sources.yml @@ -0,0 +1,9 @@ +version: 2 +sources: +- name: selly_express + quoting: + database: true + schema: false + identifier: false + tables: + - name: _airbyte_raw_se_orders diff --git a/packages.yml b/packages.yml new file mode 100755 index 0000000..4b74445 --- /dev/null +++ b/packages.yml @@ -0,0 +1,5 @@ +# add dependencies. these will get pulled during the `dbt deps` process. + +packages: + - git: "https://github.com/fishtown-analytics/dbt-utils.git" + revision: 0.7.4 diff --git a/profiles.yml b/profiles.yml new file mode 100644 index 0000000..56bc977 --- /dev/null +++ b/profiles.yml @@ -0,0 +1,17 @@ +config: + partial_parse: true + printer_width: 120 + send_anonymous_usage_stats: false + use_colors: true +normalize: + outputs: + prod: + dbname: mongo_etl + host: 18.140.112.89 + pass: bvxJaDGW2R55uyDXfODJ2a0Y + port: 5432 + schema: public + threads: 8 + type: postgres + user: selly + target: prod
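+# Illustrative usage (an added note, assuming this profiles.yml and dbt_project.yml
+# sit in the current directory, mirroring the Makefile's `run` target above):
+#   dbt deps --profiles-dir=. --project-dir=.
+#   dbt run --profiles-dir=. --project-dir=. --full-refresh --select se_orders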