diff --git a/.changes/unreleased/Features-20240422-081302.yaml b/.changes/unreleased/Features-20240422-081302.yaml new file mode 100644 index 00000000..c58e471e --- /dev/null +++ b/.changes/unreleased/Features-20240422-081302.yaml @@ -0,0 +1,6 @@ +kind: Features +body: Allows unique_key for snapshots to take a list +time: 2024-04-22T08:13:02.937534-04:00 +custom: + Author: agpapa + Issue: "181" diff --git a/dbt/adapters/base/relation.py b/dbt/adapters/base/relation.py index 80dbd34b..ecd87345 100644 --- a/dbt/adapters/base/relation.py +++ b/dbt/adapters/base/relation.py @@ -6,6 +6,7 @@ Dict, FrozenSet, Iterator, + List, Optional, Set, Tuple, @@ -341,6 +342,16 @@ def create( ) return cls.from_dict(kwargs) + @classmethod + def scd_args(cls: Type[Self], primary_key: Union[str, List[str]], updated_at) -> List[str]: + scd_args = [] + if isinstance(primary_key, list): + scd_args.extend(primary_key) + else: + scd_args.append(primary_key) + scd_args.append(updated_at) + return scd_args + @property def can_be_renamed(self) -> bool: return self.type in self.renameable_relations diff --git a/dbt/include/global_project/macros/materializations/snapshots/helpers.sql b/dbt/include/global_project/macros/materializations/snapshots/helpers.sql index 52fdb9bf..5c2bad99 100644 --- a/dbt/include/global_project/macros/materializations/snapshots/helpers.sql +++ b/dbt/include/global_project/macros/materializations/snapshots/helpers.sql @@ -49,9 +49,7 @@ snapshotted_data as ( - select *, - {{ strategy.unique_key }} as dbt_unique_key - + select *, {{ unique_key_fields(strategy.unique_key) }} from {{ target_relation }} where {% if config.get('dbt_valid_to_current') %} @@ -65,9 +63,7 @@ insertions_source_data as ( - select - *, - {{ strategy.unique_key }} as dbt_unique_key, + select *, {{ unique_key_fields(strategy.unique_key) }}, {{ strategy.updated_at }} as {{ columns.dbt_updated_at }}, {{ strategy.updated_at }} as {{ columns.dbt_valid_from }}, {{ get_dbt_valid_to_current(strategy, columns) }}, @@ -78,9 +74,7 @@ updates_source_data as ( - select - *, - {{ strategy.unique_key }} as dbt_unique_key, + select *, {{ unique_key_fields(strategy.unique_key) }}, {{ strategy.updated_at }} as {{ columns.dbt_updated_at }}, {{ strategy.updated_at }} as {{ columns.dbt_valid_from }}, {{ strategy.updated_at }} as {{ columns.dbt_valid_to }} @@ -92,9 +86,7 @@ deletes_source_data as ( - select - *, - {{ strategy.unique_key }} as dbt_unique_key + select *, {{ unique_key_fields(strategy.unique_key) }} from snapshot_query ), {% endif %} @@ -106,13 +98,11 @@ source_data.* from insertions_source_data as source_data - left outer join snapshotted_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key - where snapshotted_data.dbt_unique_key is null - or ( - snapshotted_data.dbt_unique_key is not null - and ( - {{ strategy.row_changed }} - ) + left outer join snapshotted_data + on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }} + where {{ unique_key_is_null(strategy.unique_key, "snapshotted_data") }} + or ({{ unique_key_is_not_null(strategy.unique_key, "snapshotted_data") }} and {{ strategy.row_changed }}) + ) ), @@ -125,7 +115,8 @@ snapshotted_data.{{ columns.dbt_scd_id }} from updates_source_data as source_data - join snapshotted_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key + join snapshotted_data + on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }} where ( {{ strategy.row_changed }} ) @@ -145,8 +136,9 @@ snapshotted_data.{{ columns.dbt_scd_id }} from snapshotted_data - left join deletes_source_data as source_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key - where source_data.dbt_unique_key is null + left join deletes_source_data as source_data + on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }} + where {{ unique_key_is_null(strategy.unique_key, "source_data") }} ) {%- endif %} @@ -217,8 +209,51 @@ {% endif %} {% endmacro %} + {% macro get_dbt_valid_to_current(strategy, columns) %} {% set dbt_valid_to_current = config.get('dbt_valid_to_current') or "null" %} coalesce(nullif({{ strategy.updated_at }}, {{ strategy.updated_at }}), {{dbt_valid_to_current}}) as {{ columns.dbt_valid_to }} {% endmacro %} + + +{% macro unique_key_fields(unique_key) %} + {% if unique_key | is_list %} + {% for key in unique_key %} + {{ key }} as dbt_unique_key_{{ loop.index }} + {%- if not loop.last %} , {%- endif %} + {% endfor %} + {% else %} + {{ unique_key }} as dbt_unique_key + {% endif %} +{% endmacro %} + + +{% macro unique_key_join_on(unique_key, identifier, from_identifier) %} + {% if strategy.unique_key | is_list %} + {% for key in strategy.unique_key %} + {{ identifier }}.dbt_unique_key_{{ loop.index }} = {{ from_identifier }}.dbt_unique_key_{{ loop.index }} + {%- if not loop.last %} and {%- endif %} + {% endfor %} + {% else %} + {{ identifier }}.dbt_unique_key = {{ from_identifier }}.dbt_unique_key + {% endif %} +{% endmacro %} + + +{% macro unique_key_is_null(unique_key, identifier) %} + {% if unique_key | is_list %} + {{ identifier }}.dbt_unique_key_1 is null + {% else %} + {{ identifer }}.dbt_unique_key is null + {% endif %} +{% endmacro %} + + +{% macro unique_key_is_not_null(unique_key, identifier) %} + {% if unique_key | is_list %} + {{ identifier }}.dbt_unique_key_1 is not null + {% else %} + {{ identifer }}.dbt_unique_key is not null + {% endif %} +{% endmacro %} diff --git a/dbt/include/global_project/macros/materializations/snapshots/snapshot.sql b/dbt/include/global_project/macros/materializations/snapshots/snapshot.sql index 5daead4c..0c9590b6 100644 --- a/dbt/include/global_project/macros/materializations/snapshots/snapshot.sql +++ b/dbt/include/global_project/macros/materializations/snapshots/snapshot.sql @@ -46,20 +46,22 @@ {% do adapter.expand_target_column_types(from_relation=staging_table, to_relation=target_relation) %} + {% set remove_columns = ['dbt_change_type', 'DBT_CHANGE_TYPE', 'dbt_unique_key', 'DBT_UNIQUE_KEY'] %} + {% if unique_key | is_list %} + {% for key in strategy.unique_key %} + {{ remove_columns.append('dbt_unique_key_' + loop.index|string) }} + {{ remove_columns.append('DBT_UNIQUE_KEY_' + loop.index|string) }} + {% endfor %} + {% endif %} + {% set missing_columns = adapter.get_missing_columns(staging_table, target_relation) - | rejectattr('name', 'equalto', 'dbt_change_type') - | rejectattr('name', 'equalto', 'DBT_CHANGE_TYPE') - | rejectattr('name', 'equalto', 'dbt_unique_key') - | rejectattr('name', 'equalto', 'DBT_UNIQUE_KEY') + | rejectattr('name', 'in', remove_columns) | list %} {% do create_columns(target_relation, missing_columns) %} {% set source_columns = adapter.get_columns_in_relation(staging_table) - | rejectattr('name', 'equalto', 'dbt_change_type') - | rejectattr('name', 'equalto', 'DBT_CHANGE_TYPE') - | rejectattr('name', 'equalto', 'dbt_unique_key') - | rejectattr('name', 'equalto', 'DBT_UNIQUE_KEY') + | rejectattr('name', 'in', remove_columns) | list %} {% set quoted_source_columns = [] %} diff --git a/dbt/include/global_project/macros/materializations/snapshots/strategies.sql b/dbt/include/global_project/macros/materializations/snapshots/strategies.sql index 8c086182..f9f5afbd 100644 --- a/dbt/include/global_project/macros/materializations/snapshots/strategies.sql +++ b/dbt/include/global_project/macros/materializations/snapshots/strategies.sql @@ -70,7 +70,8 @@ ({{ snapshotted_rel }}.{{ columns.dbt_valid_from }} < {{ current_rel }}.{{ updated_at }}) {%- endset %} - {% set scd_id_expr = snapshot_hash_arguments([primary_key, updated_at]) %} + {% set scd_args = api.Relation.scd_args(primary_key, updated_at) %} + {% set scd_id_expr = snapshot_hash_arguments(scd_args) %} {% do return({ "unique_key": primary_key, @@ -166,7 +167,8 @@ ) {%- endset %} - {% set scd_id_expr = snapshot_hash_arguments([primary_key, updated_at]) %} + {% set scd_args = api.Relation.scd_args(primary_key, updated_at) %} + {% set scd_id_expr = snapshot_hash_arguments(scd_args) %} {% do return({ "unique_key": primary_key, diff --git a/pyproject.toml b/pyproject.toml index 52550fb5..7a8d1a50 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -23,7 +23,7 @@ classifiers = [ "Programming Language :: Python :: 3.12", ] dependencies = [ - "dbt-common>=1.10,<2.0", + "dbt-common>=1.11,<2.0", "pytz>=2015.7", # installed via dbt-common but used directly "agate>=1.0,<2.0",