Skip to content

Commit

Permalink
get constraints working
Browse files Browse the repository at this point in the history
  • Loading branch information
benc-db committed Oct 12, 2023
1 parent 9cc5574 commit 117bdae
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 31 deletions.
63 changes: 34 additions & 29 deletions dbt/include/databricks/macros/adapters.sql
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,8 @@

{% macro get_constraints_sql(relation, constraints, model, column) %}
{% set statements = [] %}
{% for constraint in constraints %}
-- Hack so that not null constraints will be applied before primary key constraints
{% for constraint in constraints|sort(attribute='type') %}
{% if constraint %}
{% set constraint_statements = get_constraint_sql(relation, constraint, model, column) %}
{% for statement in constraint_statements %}
Expand Down Expand Up @@ -289,40 +290,44 @@
{{ exceptions.warn("unenforced constraint type: " ~ constraint.type)}}
{% endif %}

{% set column_names = constraint.get("columns", []) %}
{% if column and not column_names %}
{% set column_names = [column['name']] %}
{% endif %}
{% set quoted_names = [] %}
{% for column_name in column_names %}
{% set column = model.get('columns', {}).get(column_name) %}
{% if not column %}
{{ exceptions.warn('Invalid foreign key column: ' ~ column_name) }}
{% else %}
{% set quoted_name = adapter.quote(column['name']) if column['quote'] else column['name'] %}
{% do quoted_names.append(quoted_name) %}
{% endif %}
{% endfor %}

{% set joined_names = quoted_names|join(", ") %}

{% set name = constraint.get("name") %}
{% if not name and local_md5 %}
{{ exceptions.warn("Constraint of type " ~ type ~ " with no `name` provided. Generating hash instead.") }}
{%- set name = local_md5("primary_key;" ~ column_names ~ ";") -%}
{% endif %}

{% set parent = constraint.get("parent") %}
{% if not parent %}
{{ exceptions.raise_compiler_error('No parent table defined for foreign key: ' ~ expression) }}
{% endif %}
{% if not "." in parent %}
{% set parent = relation.schema ~ "." ~ parent%}
{% endif %}
{% set stmt = "alter table " ~ relation ~ " add constraint " ~ name ~ " foreign key(" ~ joined_names ~ ") references " ~ parent %}
{% set parent_columns = constraint.get("parent_columns") %}
{% if parent_columns %}
{% set stmt = stmt ~ "(" ~ parent_columns|join(", ") ~ ")"%}
{% if constraint.get('expression') %}
{% set stmt = "alter table " ~ relation ~ " add constraint " ~ name ~ " foreign key" ~ constraint.get('expression') %}
{% else %}
{% set column_names = constraint.get("columns", []) %}
{% if column and not column_names %}
{% set column_names = [column['name']] %}
{% endif %}
{% set quoted_names = [] %}
{% for column_name in column_names %}
{% set column = model.get('columns', {}).get(column_name) %}
{% if not column %}
{{ exceptions.warn('Invalid foreign key column: ' ~ column_name) }}
{% else %}
{% set quoted_name = adapter.quote(column['name']) if column['quote'] else column['name'] %}
{% do quoted_names.append(quoted_name) %}
{% endif %}
{% endfor %}

{% set joined_names = quoted_names|join(", ") %}

{% set parent = constraint.get("parent") %}
{% if not parent %}
{{ exceptions.raise_compiler_error('No parent table defined for foreign key: ' ~ expression) }}
{% endif %}
{% if not "." in parent %}
{% set parent = relation.schema ~ "." ~ parent%}
{% endif %}
{% set stmt = "alter table " ~ relation ~ " add constraint " ~ name ~ " foreign key(" ~ joined_names ~ ") references " ~ parent %}
{% set parent_columns = constraint.get("parent_columns") %}
{% if parent_columns %}
{% set stmt = stmt ~ "(" ~ parent_columns|join(", ") ~ ")"%}
{% endif %}
{% endif %}
{% set stmt = stmt ~ ";" %}
{% do statements.append(stmt) %}
Expand Down
2 changes: 1 addition & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@


def pytest_addoption(parser):
parser.addoption("--profile", action="store", default="databricks_cluster", type=str)
parser.addoption("--profile", action="store", default="databricks_uc_sql_endpoint", type=str)


# Using @pytest.mark.skip_profile('databricks_cluster') uses the 'skip_by_adapter_type'
Expand Down
63 changes: 62 additions & 1 deletion tests/functional/adapter/test_constraints.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,14 @@
my_model_incremental_wrong_order_sql,
my_model_incremental_wrong_name_sql,
my_incremental_model_sql,
incremental_foreign_key_model_raw_numbers_sql,
incremental_foreign_key_model_stg_numbers_sql,
)
from dbt.tests.util import (
run_dbt,
write_file,
read_file,
)


# constraints are enforced via 'alter' statements that run after table creation
_expected_sql_spark = """
Expand Down Expand Up @@ -208,3 +214,58 @@ def models(self):
"my_model.sql": my_incremental_model_sql,
"constraints_schema.yml": constraints_yml,
}


incremental_foreign_key_schema_yml = """
version: 2
models:
- name: raw_numbers
config:
contract:
enforced: true
materialized: table
columns:
- name: n
data_type: integer
constraints:
- type: primary_key
- type: not_null
- name: stg_numbers
config:
contract:
enforced: true
materialized: incremental
on_schema_change: append_new_columns
unique_key: n
columns:
- name: n
data_type: integer
constraints:
- type: foreign_key
name: fk_n
expression: (n) REFERENCES {schema}.raw_numbers
"""


@pytest.mark.skip_profile("databricks_cluster")
class TestDatabricksIncrementalForeignKeyConstraint:
@pytest.fixture(scope="class")
def models(self):
return {
"schema.yml": incremental_foreign_key_schema_yml,
"raw_numbers.sql": incremental_foreign_key_model_raw_numbers_sql,
"stg_numbers.sql": incremental_foreign_key_model_stg_numbers_sql,
}

def test_incremental_foreign_key_constraint(self, project):
unformatted_constraint_schema_yml = read_file("models", "schema.yml")
write_file(
unformatted_constraint_schema_yml.format(schema=project.test_schema),
"models",
"schema.yml",
)

run_dbt(["run", "--select", "raw_numbers"])
run_dbt(["run", "--select", "stg_numbers"])
run_dbt(["run", "--select", "stg_numbers"])

0 comments on commit 117bdae

Please sign in to comment.