This commit is contained in:
Ubuntu 2022-04-13 14:06:23 +07:00
parent 59ac317c58
commit 250f61873c
22 changed files with 380 additions and 28 deletions

View File

@ -6,5 +6,5 @@ push:
run:
dbt deps --profiles-dir=. --project-dir=.
#dbt run --profiles-dir=. --project-dir=. --full-refresh
dbt run --profiles-dir=. --project-dir=. --full-refresh --select order_items
dbt run --profiles-dir=. --project-dir=. --full-refresh --select tracking_product_shares

View File

@ -4,27 +4,27 @@
# Name your package! Package names should contain only lowercase characters
# and underscores. A good package name should reflect your organization's
# name or the intended use of these models
name: 'airbyte_utils'
version: '1.0'
name: "airbyte_utils"
version: "1.0"
config-version: 2
# This setting configures which "profile" dbt uses for this project. Profiles contain
# database connection information, and should be configured in the ~/.dbt/profiles.yml file
profile: 'normalize'
profile: "normalize"
# These configurations specify where dbt should look for different types of files.
# The `source-paths` config, for example, states that source models can be found
# The `model-paths` config, for example, states that source models can be found
# in the "models/" directory. You probably won't need to change these!
source-paths: ["models"]
model-paths: ["models"]
docs-paths: ["docs"]
analysis-paths: ["analysis"]
test-paths: ["tests"]
data-paths: ["data"]
seed-paths: ["data"]
macro-paths: ["macros"]
target-path: "../build" # directory which will store compiled SQL files
log-path: "../logs" # directory which will store DBT logs
modules-path: "../dbt_modules" # directory which will store external DBT dependencies
packages-install-path: "../dbt" # directory which will store external DBT dependencies
clean-targets: # directories to be removed by `dbt clean`
- "build"
@ -32,12 +32,12 @@ clean-targets: # directories to be removed by `dbt clean`
quoting:
database: true
# Temporarily disabling the behavior of the ExtendedNameTransformer on table/schema names, see (issue #1785)
# all schemas should be unquoted
# Temporarily disabling the behavior of the ExtendedNameTransformer on table/schema names, see (issue #1785)
# all schemas should be unquoted
schema: false
identifier: true
# You can define configurations for models in the `source-paths` directory here.
# You can define configurations for models in the `model-paths` directory here.
# Using these configurations, you can enable or disable models, change how they
# are materialized, and more!
models:
@ -60,4 +60,4 @@ models:
dispatch:
- macro_namespace: dbt_utils
search_order: ['airbyte_utils', 'dbt_utils']
search_order: ["airbyte_utils", "dbt_utils"]

File diff suppressed because one or more lines are too long

View File

@ -1 +1 @@
{"ssl":false,"host":"localhost","port":5555,"schema":"public","database":"selly_etl","password":"123","username":"selly","tunnel_method":{"tunnel_method":"NO_TUNNEL"}}
{"ssl":false,"host":"18.140.112.89","port":5432,"schema":"public","database":"mongo_etl","password":"bvxJaDGW2R55uyDXfODJ2a0Y","username":"selly","tunnel_method":{"tunnel_method":"NO_TUNNEL"}}

View File

@ -5,6 +5,7 @@
- Redshift: -> https://blog.getdbt.com/how-to-unnest-arrays-in-redshift/
- postgres: unnest() -> https://www.postgresqltutorial.com/postgresql-array/
- MSSQL: openjson() > https://docs.microsoft.com/en-us/sql/relational-databases/json/validate-query-and-change-json-data-with-built-in-functions-sql-server?view=sql-server-ver15
- ClickHouse: ARRAY JOIN > https://clickhouse.com/docs/zh/sql-reference/statements/select/array-join/
#}
{# cross_join_unnest ------------------------------------------------- #}
@ -21,6 +22,10 @@
cross join unnest({{ array_col }}) as {{ array_col }}
{%- endmacro %}
{# ClickHouse implementation of cross_join_unnest: emits an ARRAY JOIN clause to
   flatten the given array column. stream_name is unused here but kept so the
   signature matches the adapter-dispatch convention of the other variants. #}
{% macro clickhouse__cross_join_unnest(stream_name, array_col) -%}
ARRAY JOIN {{ array_col }}
{%- endmacro %}
{# Oracle implementation of cross_join_unnest: unnesting is not supported yet,
   so this emits no SQL and only raises a compile-time warning. #}
{% macro oracle__cross_join_unnest(stream_name, array_col) -%}
{% do exceptions.warn("Normalization does not support unnesting for Oracle yet.") %}
{%- endmacro %}

View File

@ -135,9 +135,9 @@
{% macro clickhouse__json_extract(from_table, json_column, json_path_list, normalized_json_path) -%}
{%- if from_table|string() == '' %}
JSONExtractRaw({{ json_column }}, {{ format_json_path(json_path_list) }})
JSONExtractRaw(assumeNotNull({{ json_column }}), {{ format_json_path(json_path_list) }})
{% else %}
JSONExtractRaw({{ from_table }}.{{ json_column }}, {{ format_json_path(json_path_list) }})
JSONExtractRaw(assumeNotNull({{ from_table }}.{{ json_column }}), {{ format_json_path(json_path_list) }})
{% endif -%}
{%- endmacro %}
@ -180,7 +180,7 @@
{%- endmacro %}
{% macro clickhouse__json_extract_scalar(json_column, json_path_list, normalized_json_path) -%}
JSONExtractRaw({{ json_column }}, {{ format_json_path(json_path_list) }})
JSONExtractRaw(assumeNotNull({{ json_column }}), {{ format_json_path(json_path_list) }})
{%- endmacro %}
{# json_extract_array ------------------------------------------------- #}
@ -222,5 +222,5 @@
{%- endmacro %}
{% macro clickhouse__json_extract_array(json_column, json_path_list, normalized_json_path) -%}
JSONExtractArrayRaw({{ json_column }}, {{ format_json_path(json_path_list) }})
JSONExtractArrayRaw(assumeNotNull({{ json_column }}), {{ format_json_path(json_path_list) }})
{%- endmacro %}

View File

@ -57,6 +57,11 @@
cast({{ field }} as bit)
{%- endmacro %}
{# -- ClickHouse has no direct string-to-boolean cast, so the macro below maps the string 'true' (case-insensitive) to 1 and everything else to 0 #}
{# ClickHouse implementation of cast_to_boolean: maps the (case-insensitive)
   string 'true' to 1 and anything else to 0.
   NOTE(review): a NULL field also yields 0 here (IF condition is not satisfied)
   rather than NULL — confirm that is the intended semantics for missing values. #}
{% macro clickhouse__cast_to_boolean(field) -%}
IF(lower({{ field }}) = 'true', 1, 0)
{%- endmacro %}
{# empty_string_to_null ------------------------------------------------- #}
{% macro empty_string_to_null(field) -%}
{{ return(adapter.dispatch('empty_string_to_null')(field)) }}

View File

@ -0,0 +1,21 @@
{# Airbyte-generated normalization model, stage ab1 for social_post_views:
   extracts individual fields from the raw _airbyte_data JSON blob.
   NOTE(review): this file looks auto-generated by Airbyte normalization —
   prefer regenerating over hand-editing. #}
{{ config(
indexes = [{'columns':['_airbyte_emitted_at'],'type':'btree'}],
unique_key = '_airbyte_ab_id',
schema = "_airbyte_unibag",
tags = [ "top-level-intermediate" ]
) }}
-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema
-- depends_on: {{ source('unibag', '_airbyte_raw_social_post_views') }}
select
{{ json_extract_scalar('_airbyte_data', ['_id'], ['_id']) }} as _id,
-- 'user' is quoted via adapter.quote because it is a reserved word on most databases
{{ json_extract_scalar('_airbyte_data', ['user'], ['user']) }} as {{ adapter.quote('user') }},
{{ json_extract_scalar('_airbyte_data', ['clientIP'], ['clientIP']) }} as clientip,
{{ json_extract_scalar('_airbyte_data', ['lastViewAt'], ['lastViewAt']) }} as lastviewat,
{{ json_extract_scalar('_airbyte_data', ['socialPost'], ['socialPost']) }} as socialpost,
_airbyte_ab_id,
_airbyte_emitted_at,
{{ current_timestamp() }} as _airbyte_normalized_at
from {{ source('unibag', '_airbyte_raw_social_post_views') }} as table_alias
-- social_post_views
where 1 = 1

View File

@ -0,0 +1,21 @@
{# Airbyte-generated normalization model, stage ab2 for social_post_views:
   casts each extracted field from ab1 to its target SQL type (all strings here).
   NOTE(review): auto-generated by Airbyte normalization — prefer regenerating
   over hand-editing. #}
{{ config(
indexes = [{'columns':['_airbyte_emitted_at'],'type':'btree'}],
unique_key = '_airbyte_ab_id',
schema = "_airbyte_unibag",
tags = [ "top-level-intermediate" ]
) }}
-- SQL model to cast each column to its adequate SQL type converted from the JSON schema type
-- depends_on: {{ ref('social_post_views_ab1') }}
select
cast(_id as {{ dbt_utils.type_string() }}) as _id,
cast({{ adapter.quote('user') }} as {{ dbt_utils.type_string() }}) as {{ adapter.quote('user') }},
cast(clientip as {{ dbt_utils.type_string() }}) as clientip,
cast(lastviewat as {{ dbt_utils.type_string() }}) as lastviewat,
cast(socialpost as {{ dbt_utils.type_string() }}) as socialpost,
_airbyte_ab_id,
_airbyte_emitted_at,
{{ current_timestamp() }} as _airbyte_normalized_at
from {{ ref('social_post_views_ab1') }}
-- social_post_views
where 1 = 1

View File

@ -0,0 +1,21 @@
{# Airbyte-generated normalization model, stage ab3 for social_post_views:
   adds a deterministic record hash (dbt_utils.surrogate_key over all business
   columns) used downstream for change detection / deduplication.
   NOTE(review): auto-generated — prefer regenerating over hand-editing. #}
{{ config(
indexes = [{'columns':['_airbyte_emitted_at'],'type':'btree'}],
unique_key = '_airbyte_ab_id',
schema = "_airbyte_unibag",
tags = [ "top-level-intermediate" ]
) }}
-- SQL model to build a hash column based on the values of this record
-- depends_on: {{ ref('social_post_views_ab2') }}
select
{{ dbt_utils.surrogate_key([
'_id',
adapter.quote('user'),
'clientip',
'lastviewat',
'socialpost',
]) }} as _airbyte_social_post_views_hashid,
tmp.*
from {{ ref('social_post_views_ab2') }} tmp
-- social_post_views
where 1 = 1

View File

@ -0,0 +1,43 @@
{# Airbyte-generated normalization model, stage ab1 for social_posts:
   extracts scalar and array fields from the raw _airbyte_data JSON blob.
   Reserved words ('order', 'content', 'user') are quoted via adapter.quote.
   NOTE(review): auto-generated — prefer regenerating over hand-editing. #}
{{ config(
indexes = [{'columns':['_airbyte_emitted_at'],'type':'btree'}],
unique_key = '_airbyte_ab_id',
schema = "_airbyte_unibag",
tags = [ "top-level-intermediate" ]
) }}
-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema
-- depends_on: {{ source('unibag', '_airbyte_raw_social_posts') }}
select
{{ json_extract_scalar('_airbyte_data', ['_id'], ['_id']) }} as _id,
{{ json_extract_array('_airbyte_data', ['tags'], ['tags']) }} as tags,
{{ json_extract_scalar('_airbyte_data', ['isPin'], ['isPin']) }} as ispin,
{{ json_extract_scalar('_airbyte_data', ['order'], ['order']) }} as {{ adapter.quote('order') }},
{{ json_extract_scalar('_airbyte_data', ['title'], ['title']) }} as title,
{{ json_extract_scalar('_airbyte_data', ['author'], ['author']) }} as author,
{{ json_extract_array('_airbyte_data', ['cities'], ['cities']) }} as cities,
{{ json_extract_array('_airbyte_data', ['photos'], ['photos']) }} as photos,
{{ json_extract_scalar('_airbyte_data', ['reason'], ['reason']) }} as reason,
{{ json_extract_scalar('_airbyte_data', ['status'], ['status']) }} as status,
{{ json_extract_array('_airbyte_data', ['videos'], ['videos']) }} as videos,
{{ json_extract_scalar('_airbyte_data', ['content'], ['content']) }} as {{ adapter.quote('content') }},
{{ json_extract_scalar('_airbyte_data', ['isTimer'], ['isTimer']) }} as istimer,
{{ json_extract_scalar('_airbyte_data', ['startAt'], ['startAt']) }} as startat,
{{ json_extract_array('_airbyte_data', ['products'], ['products']) }} as products,
{{ json_extract_scalar('_airbyte_data', ['createdAt'], ['createdAt']) }} as createdat,
{{ json_extract_scalar('_airbyte_data', ['hasUpdate'], ['hasUpdate']) }} as hasupdate,
{{ json_extract_scalar('_airbyte_data', ['statistic'], ['statistic']) }} as statistic,
{{ json_extract_scalar('_airbyte_data', ['updatedAt'], ['updatedAt']) }} as updatedat,
{{ json_extract_array('_airbyte_data', ['categories'], ['categories']) }} as categories,
{{ json_extract_scalar('_airbyte_data', ['fromSystem'], ['fromSystem']) }} as fromsystem,
{{ json_extract_scalar('_airbyte_data', ['contributor'], ['contributor']) }} as contributor,
{{ json_extract_scalar('_airbyte_data', ['publishedAt'], ['publishedAt']) }} as publishedat,
{{ json_extract_scalar('_airbyte_data', ['searchString'], ['searchString']) }} as searchstring,
{{ json_extract_scalar('_airbyte_data', ['updatedCount'], ['updatedCount']) }} as updatedcount,
{{ json_extract_scalar('_airbyte_data', ['shareStatistic'], ['shareStatistic']) }} as sharestatistic,
{{ json_extract_scalar('_airbyte_data', ['enableNotificationForContributor'], ['enableNotificationForContributor']) }} as enablenotificationforcontributor,
_airbyte_ab_id,
_airbyte_emitted_at,
{{ current_timestamp() }} as _airbyte_normalized_at
from {{ source('unibag', '_airbyte_raw_social_posts') }} as table_alias
-- social_posts
where 1 = 1

View File

@ -0,0 +1,43 @@
{# Airbyte-generated normalization model, stage ab2 for social_posts:
   casts ab1 fields to target SQL types (strings, floats, booleans); array
   columns (tags, cities, photos, videos, products, categories) pass through
   uncast. NOTE(review): auto-generated — prefer regenerating over hand-editing. #}
{{ config(
indexes = [{'columns':['_airbyte_emitted_at'],'type':'btree'}],
unique_key = '_airbyte_ab_id',
schema = "_airbyte_unibag",
tags = [ "top-level-intermediate" ]
) }}
-- SQL model to cast each column to its adequate SQL type converted from the JSON schema type
-- depends_on: {{ ref('social_posts_ab1') }}
select
cast(_id as {{ dbt_utils.type_string() }}) as _id,
tags,
{{ cast_to_boolean('ispin') }} as ispin,
cast({{ adapter.quote('order') }} as {{ dbt_utils.type_float() }}) as {{ adapter.quote('order') }},
cast(title as {{ dbt_utils.type_string() }}) as title,
cast(author as {{ dbt_utils.type_string() }}) as author,
cities,
photos,
cast(reason as {{ dbt_utils.type_string() }}) as reason,
cast(status as {{ dbt_utils.type_string() }}) as status,
videos,
cast({{ adapter.quote('content') }} as {{ dbt_utils.type_string() }}) as {{ adapter.quote('content') }},
{{ cast_to_boolean('istimer') }} as istimer,
cast(startat as {{ dbt_utils.type_string() }}) as startat,
products,
cast(createdat as {{ dbt_utils.type_string() }}) as createdat,
{{ cast_to_boolean('hasupdate') }} as hasupdate,
cast(statistic as {{ dbt_utils.type_string() }}) as statistic,
cast(updatedat as {{ dbt_utils.type_string() }}) as updatedat,
categories,
{{ cast_to_boolean('fromsystem') }} as fromsystem,
cast(contributor as {{ dbt_utils.type_string() }}) as contributor,
cast(publishedat as {{ dbt_utils.type_string() }}) as publishedat,
cast(searchstring as {{ dbt_utils.type_string() }}) as searchstring,
cast(updatedcount as {{ dbt_utils.type_float() }}) as updatedcount,
cast(sharestatistic as {{ dbt_utils.type_string() }}) as sharestatistic,
{{ cast_to_boolean('enablenotificationforcontributor') }} as enablenotificationforcontributor,
_airbyte_ab_id,
_airbyte_emitted_at,
{{ current_timestamp() }} as _airbyte_normalized_at
from {{ ref('social_posts_ab1') }}
-- social_posts
where 1 = 1

View File

@ -0,0 +1,43 @@
{# Airbyte-generated normalization model, stage ab3 for social_posts:
   builds the record hash over all business columns; array and boolean columns
   are first stringified (array_to_string / boolean_to_string) so they hash
   deterministically. NOTE(review): auto-generated — prefer regenerating over
   hand-editing. #}
{{ config(
indexes = [{'columns':['_airbyte_emitted_at'],'type':'btree'}],
unique_key = '_airbyte_ab_id',
schema = "_airbyte_unibag",
tags = [ "top-level-intermediate" ]
) }}
-- SQL model to build a hash column based on the values of this record
-- depends_on: {{ ref('social_posts_ab2') }}
select
{{ dbt_utils.surrogate_key([
'_id',
array_to_string('tags'),
boolean_to_string('ispin'),
adapter.quote('order'),
'title',
'author',
array_to_string('cities'),
array_to_string('photos'),
'reason',
'status',
array_to_string('videos'),
adapter.quote('content'),
boolean_to_string('istimer'),
'startat',
array_to_string('products'),
'createdat',
boolean_to_string('hasupdate'),
'statistic',
'updatedat',
array_to_string('categories'),
boolean_to_string('fromsystem'),
'contributor',
'publishedat',
'searchstring',
'updatedcount',
'sharestatistic',
boolean_to_string('enablenotificationforcontributor'),
]) }} as _airbyte_social_posts_hashid,
tmp.*
from {{ ref('social_posts_ab2') }} tmp
-- social_posts
where 1 = 1

View File

@ -0,0 +1,22 @@
{# Airbyte-generated normalization model, stage ab1 for tracking_product_shares:
   extracts individual fields from the raw _airbyte_data JSON blob. Reserved
   words ('user', 'source', 'options') are quoted via adapter.quote.
   NOTE(review): auto-generated — prefer regenerating over hand-editing. #}
{{ config(
indexes = [{'columns':['_airbyte_emitted_at'],'type':'btree'}],
unique_key = '_airbyte_ab_id',
schema = "_airbyte_unibag",
tags = [ "top-level-intermediate" ]
) }}
-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema
-- depends_on: {{ source('unibag', '_airbyte_raw_tracking_product_shares') }}
select
{{ json_extract_scalar('_airbyte_data', ['_id'], ['_id']) }} as _id,
{{ json_extract_scalar('_airbyte_data', ['user'], ['user']) }} as {{ adapter.quote('user') }},
{{ json_extract_scalar('_airbyte_data', ['source'], ['source']) }} as {{ adapter.quote('source') }},
{{ json_extract_scalar('_airbyte_data', ['options'], ['options']) }} as {{ adapter.quote('options') }},
{{ json_extract_scalar('_airbyte_data', ['products'], ['products']) }} as products,
{{ json_extract_scalar('_airbyte_data', ['createdAt'], ['createdAt']) }} as createdat,
_airbyte_ab_id,
_airbyte_emitted_at,
{{ current_timestamp() }} as _airbyte_normalized_at
from {{ source('unibag', '_airbyte_raw_tracking_product_shares') }} as table_alias
-- tracking_product_shares
where 1 = 1

View File

@ -0,0 +1,22 @@
{# Airbyte-generated normalization model, stage ab2 for tracking_product_shares:
   casts each extracted field from ab1 to its target SQL type (all strings here).
   NOTE(review): auto-generated — prefer regenerating over hand-editing. #}
{{ config(
indexes = [{'columns':['_airbyte_emitted_at'],'type':'btree'}],
unique_key = '_airbyte_ab_id',
schema = "_airbyte_unibag",
tags = [ "top-level-intermediate" ]
) }}
-- SQL model to cast each column to its adequate SQL type converted from the JSON schema type
-- depends_on: {{ ref('tracking_product_shares_ab1') }}
select
cast(_id as {{ dbt_utils.type_string() }}) as _id,
cast({{ adapter.quote('user') }} as {{ dbt_utils.type_string() }}) as {{ adapter.quote('user') }},
cast({{ adapter.quote('source') }} as {{ dbt_utils.type_string() }}) as {{ adapter.quote('source') }},
cast({{ adapter.quote('options') }} as {{ dbt_utils.type_string() }}) as {{ adapter.quote('options') }},
cast(products as {{ dbt_utils.type_string() }}) as products,
cast(createdat as {{ dbt_utils.type_string() }}) as createdat,
_airbyte_ab_id,
_airbyte_emitted_at,
{{ current_timestamp() }} as _airbyte_normalized_at
from {{ ref('tracking_product_shares_ab1') }}
-- tracking_product_shares
where 1 = 1

View File

@ -0,0 +1,22 @@
{# Airbyte-generated normalization model, stage ab3 for tracking_product_shares:
   adds the deterministic record hash over all business columns.
   NOTE(review): auto-generated — prefer regenerating over hand-editing. #}
{{ config(
indexes = [{'columns':['_airbyte_emitted_at'],'type':'btree'}],
unique_key = '_airbyte_ab_id',
schema = "_airbyte_unibag",
tags = [ "top-level-intermediate" ]
) }}
-- SQL model to build a hash column based on the values of this record
-- depends_on: {{ ref('tracking_product_shares_ab2') }}
select
{{ dbt_utils.surrogate_key([
'_id',
adapter.quote('user'),
adapter.quote('source'),
adapter.quote('options'),
'products',
'createdat',
]) }} as _airbyte_tracking_product_shares_hashid,
tmp.*
from {{ ref('tracking_product_shares_ab2') }} tmp
-- tracking_product_shares
where 1 = 1

View File

@ -0,0 +1,20 @@
{{ config(
indexes = [{'columns':['_airbyte_emitted_at'],'type':'btree'}],
unique_key = '_airbyte_ab_id',
schema = "unibag",
tags = [ "top-level" ]
) }}
-- Final base SQL model
-- depends_on: {{ ref('social_post_views_ab3') }}
select
_id,
{{ adapter.quote('user') }} AS seller_id,
socialpost AS social_post_id,
_airbyte_ab_id,
_airbyte_emitted_at,
{{ current_timestamp() }} as _airbyte_normalized_at,
_airbyte_social_post_views_hashid
from {{ ref('social_post_views_ab3') }}
-- social_post_views from {{ source('unibag', '_airbyte_raw_social_post_views') }}
where 1 = 1

View File

@ -0,0 +1,38 @@
{# Final social_posts model: renames staging columns to business names, casts
   JSON text arrays to jsonb / text[], unpacks the 'statistic' JSON object into
   individual integer counters, and casts timestamps. Postgres-specific
   (jsonb, ::json ->>, string_to_array).
   NOTE(review): the string_to_array(REPLACE(...)) pattern strips [, ] and "
   before splitting on ',' — this breaks if any array element itself contains a
   comma or bracket; consider json_array_elements_text instead. TODO confirm
   elements are plain IDs with no commas. #}
{{ config(
indexes = [{'columns':['_airbyte_emitted_at'],'type':'btree'}],
unique_key = '_airbyte_ab_id',
schema = "unibag",
tags = [ "top-level" ]
) }}
-- Final base SQL model
-- depends_on: {{ ref('social_posts_ab3') }}
select
_id,
title,
author AS author_id,
{{ adapter.quote('content') }},
status,
cast({{ adapter.quote('photos') }} AS jsonb) AS photos,
cast({{ adapter.quote('videos') }} AS jsonb) AS videos,
string_to_array(REPLACE(REPLACE(REPLACE(tags::text, '[', ''), ']', ''), '"', ''), ',') AS tags,
-- COALESCE(..., 0): missing keys in the statistic JSON default to zero counts
COALESCE(cast({{ adapter.quote('statistic') }}::json->>'views' AS integer), 0) AS statistic_views,
COALESCE(cast({{ adapter.quote('statistic') }}::json->>'uniqueViews' AS integer), 0) AS statistic_unique_views,
COALESCE(cast({{ adapter.quote('statistic') }}::json->>'likes' AS integer), 0) AS statistic_likes,
COALESCE(cast({{ adapter.quote('statistic') }}::json->>'shares' AS integer), 0) AS statistic_shares,
COALESCE(cast({{ adapter.quote('statistic') }}::json->>'comments' AS integer), 0) AS statistic_comments,
string_to_array(REPLACE(REPLACE(REPLACE(products::text, '[', ''), ']', ''), '"', ''), ',') AS products,
string_to_array(REPLACE(REPLACE(REPLACE(categories::text, '[', ''), ']', ''), '"', ''), ',') AS categories,
string_to_array(REPLACE(REPLACE(REPLACE(cities::text, '[', ''), ']', ''), '"', ''), ',') AS cities,
{{ adapter.quote('order') }},
contributor AS contributor_id,
createdat::timestamp AS created_at,
updatedat::timestamp AS updated_at,
publishedat::timestamp AS published_at,
_airbyte_ab_id,
_airbyte_emitted_at,
{{ current_timestamp() }} as _airbyte_normalized_at,
_airbyte_social_posts_hashid
from {{ ref('social_posts_ab3') }}
-- social_posts from {{ source('unibag', '_airbyte_raw_social_posts') }}
where 1 = 1

View File

@ -0,0 +1,23 @@
{# Final tracking_product_shares model: projects ab3 columns to business names
   (user -> seller_id, products -> product_id), extracts the 'action' key from
   the options JSON, and casts createdat to timestamp. Postgres-specific
   (::json ->>, ::timestamp). #}
{{ config(
indexes = [{'columns':['_airbyte_emitted_at'],'type':'btree'}],
unique_key = '_airbyte_ab_id',
schema = "unibag",
tags = [ "top-level" ]
) }}
-- Final base SQL model
-- depends_on: {{ ref('tracking_product_shares_ab3') }}
select
_id,
{{ adapter.quote('source') }},
{{ adapter.quote('user') }} AS seller_id,
products AS product_id,
{{ adapter.quote('options') }}::json->>'action' AS options_action,
createdat::timestamp AS created_at,
_airbyte_ab_id,
_airbyte_emitted_at,
{{ current_timestamp() }} as _airbyte_normalized_at,
_airbyte_tracking_product_shares_hashid
from {{ ref('tracking_product_shares_ab3') }}
-- tracking_product_shares from {{ source('unibag', '_airbyte_raw_tracking_product_shares') }}
where 1 = 1

View File

@ -29,11 +29,14 @@ sources:
- name: _airbyte_raw_promotion_orders
- name: _airbyte_raw_promotions
- name: _airbyte_raw_referrals
- name: _airbyte_raw_social_post_views
- name: _airbyte_raw_social_posts
- name: _airbyte_raw_suppliers
- name: _airbyte_raw_team_activities
- name: _airbyte_raw_team_bonus
- name: _airbyte_raw_team_members
- name: _airbyte_raw_teams
- name: _airbyte_raw_tracking_product_shares
- name: _airbyte_raw_user_identifications
- name: _airbyte_raw_users
- name: _airbyte_raw_wards

View File

@ -2,4 +2,4 @@
packages:
- git: "https://github.com/fishtown-analytics/dbt-utils.git"
revision: 0.7.4
revision: 0.8.2

View File

@ -6,10 +6,10 @@ config:
normalize:
outputs:
prod:
dbname: selly_etl
host: localhost
pass: '123'
port: 5555
dbname: mongo_etl
host: 18.140.112.89
pass: bvxJaDGW2R55uyDXfODJ2a0Y
port: 5432
schema: public
threads: 8
type: postgres