52 lines
2.2 KiB
SQL
52 lines
2.2 KiB
SQL
{#
|
|
These macros control how incremental models are updated in Airbyte's normalization step
|
|
- get_max_normalized_cursor retrieve the value of the last normalized data
|
|
- incremental_clause controls the predicate to filter on new data to process incrementally
|
|
#}
|
|
|
|
{% macro incremental_clause(col_emitted_at) -%}
|
|
{{ adapter.dispatch('incremental_clause')(col_emitted_at) }}
|
|
{%- endmacro %}
|
|
|
|
{%- macro default__incremental_clause(col_emitted_at) -%}
|
|
{% if is_incremental() %}
|
|
and coalesce(
|
|
cast({{ col_emitted_at }} as {{ type_timestamp_with_timezone() }}) >= (select max(cast({{ col_emitted_at }} as {{ type_timestamp_with_timezone() }})) from {{ this }}),
|
|
{# -- if {{ col_emitted_at }} is NULL in either table, the previous comparison would evaluate to NULL, #}
|
|
{# -- so we coalesce and make sure the row is always returned for incremental processing instead #}
|
|
true)
|
|
{% endif %}
|
|
{%- endmacro -%}
|
|
|
|
{# -- see https://on-systems.tech/113-beware-dbt-incremental-updates-against-snowflake-external-tables/ #}
|
|
{%- macro snowflake__incremental_clause(col_emitted_at) -%}
|
|
{% if is_incremental() %}
|
|
{% if get_max_normalized_cursor(col_emitted_at) %}
|
|
and cast({{ col_emitted_at }} as {{ type_timestamp_with_timezone() }}) >=
|
|
cast('{{ get_max_normalized_cursor(col_emitted_at) }}' as {{ type_timestamp_with_timezone() }})
|
|
{% endif %}
|
|
{% endif %}
|
|
{%- endmacro -%}
|
|
|
|
{%- macro sqlserver__incremental_clause(col_emitted_at) -%}
|
|
{% if is_incremental() %}
|
|
and ((select max(cast({{ col_emitted_at }} as {{ type_timestamp_with_timezone() }})) from {{ this }}) is null
|
|
or cast({{ col_emitted_at }} as {{ type_timestamp_with_timezone() }}) >=
|
|
(select max(cast({{ col_emitted_at }} as {{ type_timestamp_with_timezone() }})) from {{ this }}))
|
|
{% endif %}
|
|
{%- endmacro -%}
|
|
|
|
{% macro get_max_normalized_cursor(col_emitted_at) %}
|
|
{% if execute and is_incremental() %}
|
|
{% if env_var('INCREMENTAL_CURSOR', 'UNSET') == 'UNSET' %}
|
|
{% set query %}
|
|
select max(cast({{ col_emitted_at }} as {{ type_timestamp_with_timezone() }})) from {{ this }}
|
|
{% endset %}
|
|
{% set max_cursor = run_query(query).columns[0][0] %}
|
|
{% do return(max_cursor) %}
|
|
{% else %}
|
|
{% do return(env_var('INCREMENTAL_CURSOR')) %}
|
|
{% endif %}
|
|
{% endif %}
|
|
{% endmacro %}
|