diff --git a/CHANGELOG.md b/CHANGELOG.md index a2ec4f82f..d9dc0441d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ ## dbt-databricks 1.7.x (TBD) +### Features + +- Added support for getting freshness from metadata ([481](https://github.com/databricks/dbt-databricks/pull/481)) + ## dbt-databricks 1.7.0rc1 (October 13, 2023) ### Fixes diff --git a/dbt/adapters/databricks/impl.py b/dbt/adapters/databricks/impl.py index e1d6e1973..88869fa1b 100644 --- a/dbt/adapters/databricks/impl.py +++ b/dbt/adapters/databricks/impl.py @@ -23,6 +23,7 @@ from dbt.adapters.base.impl import catch_as_completed from dbt.adapters.base.meta import available from dbt.adapters.base.relation import BaseRelation, InformationSchema +from dbt.adapters.capability import CapabilityDict, CapabilitySupport, Support, Capability from dbt.adapters.spark.impl import ( SparkAdapter, GET_COLUMNS_IN_RELATION_RAW_MACRO_NAME, @@ -107,6 +108,10 @@ class DatabricksAdapter(SparkAdapter): AdapterSpecificConfigs = DatabricksConfig + _capabilities = CapabilityDict( + {Capability.TableLastModifiedMetadata: CapabilitySupport(support=Support.Full)} + ) + @available.parse(lambda *a, **k: 0) def compare_dbr_version(self, major: int, minor: int) -> int: """ diff --git a/dbt/adapters/databricks/relation.py b/dbt/adapters/databricks/relation.py index f26f43499..00d6720a4 100644 --- a/dbt/adapters/databricks/relation.py +++ b/dbt/adapters/databricks/relation.py @@ -3,7 +3,7 @@ from dbt.contracts.relation import ( ComponentName, ) -from dbt.adapters.base.relation import BaseRelation, Policy +from dbt.adapters.base.relation import BaseRelation, Policy, InformationSchema from dbt.adapters.spark.impl import KEY_TABLE_OWNER, KEY_TABLE_STATISTICS from dbt.dataclass_schema import StrEnum @@ -37,6 +37,16 @@ class DatabricksRelationType(StrEnum): StreamingTable = "streamingtable" +@dataclass(frozen=True, eq=False, repr=False) +class DatabricksInformationSchema(InformationSchema): + quote_policy: Policy = 
field(default_factory=lambda: DatabricksQuotePolicy()) + include_policy: Policy = field(default_factory=lambda: DatabricksIncludePolicy()) + quote_character: str = "`" + + def is_hive_metastore(self) -> bool: + return self.database is None or self.database == "hive_metastore" + + + @dataclass(frozen=True, eq=False, repr=False) class DatabricksRelation(BaseRelation): type: Optional[DatabricksRelationType] = None # type: ignore @@ -115,3 +125,13 @@ def matches( @classproperty def get_relation_type(cls) -> Type[DatabricksRelationType]: return DatabricksRelationType + + def information_schema(self, view_name: Optional[str] = None) -> InformationSchema: + # some of our data comes from jinja, where things can be `Undefined`. + if not isinstance(view_name, str): + view_name = None + + # Kick the user-supplied schema out of the information schema relation + # Instead address this as .information_schema by default + info_schema = DatabricksInformationSchema.from_relation(self, view_name) + return info_schema.incorporate(path={"schema": None}) diff --git a/dbt/include/databricks/macros/metadata.sql b/dbt/include/databricks/macros/metadata.sql new file mode 100644 index 000000000..42ee73033 --- /dev/null +++ b/dbt/include/databricks/macros/metadata.sql @@ -0,0 +1,32 @@ +{% macro databricks__get_relation_last_modified(information_schema, relations) -%} + + {%- call statement('last_modified', fetch_result=True) -%} + {% if information_schema.is_hive_metastore() %} + {%- for relation in relations -%} + select '{{ relation.schema }}' as schema, + '{{ relation.identifier }}' as identifier, + max(timestamp) as last_modified, + {{ current_timestamp() }} as snapshotted_at + from (describe history {{ relation.schema }}.{{ relation.identifier }}) + {% if not loop.last %} + union all + {% endif %} + {%- endfor -%} + {% else %} + select table_schema as schema, + table_name as identifier, + last_altered as last_modified, + {{ current_timestamp() }} as snapshotted_at + from {{ 
information_schema }}.tables + where ( + {%- for relation in relations -%} + (table_schema = '{{ relation.schema }}' and + table_name = '{{ relation.identifier }}'){%- if not loop.last %} or {% endif -%} + {%- endfor -%} + ) + {% endif %} + {%- endcall -%} + + {{ return(load_result('last_modified')) }} + +{% endmacro %} \ No newline at end of file diff --git a/tests/functional/adapter/test_source_freshness.py b/tests/functional/adapter/test_source_freshness.py new file mode 100644 index 000000000..ea8091571 --- /dev/null +++ b/tests/functional/adapter/test_source_freshness.py @@ -0,0 +1,53 @@ +import os +import pytest + +from dbt.tests.util import get_artifact, run_dbt + +freshness_via_metadata_schema_yml = """ +version: 2 +sources: + - name: test_source + freshness: + warn_after: {count: 10, period: hour} + error_after: {count: 1, period: day} + schema: "{{ env_var('DBT_GET_RELATION_TEST_SCHEMA') }}" + tables: + - name: test_table +""" + + +class TestGetRelationLastModified: + @pytest.fixture(scope="class", autouse=True) + def set_env_vars(self, project): + os.environ["DBT_GET_RELATION_TEST_SCHEMA"] = project.test_schema + yield + del os.environ["DBT_GET_RELATION_TEST_SCHEMA"] + + @pytest.fixture(scope="class") + def models(self): + return {"schema.yml": freshness_via_metadata_schema_yml} + + @pytest.fixture(scope="class") + def custom_schema(self, project, set_env_vars): + with project.adapter.connection_named("__test"): + relation = project.adapter.Relation.create( + database=project.database, schema=os.environ["DBT_GET_RELATION_TEST_SCHEMA"] + ) + project.adapter.drop_schema(relation) + project.adapter.create_schema(relation) + + yield relation.schema + + with project.adapter.connection_named("__test"): + project.adapter.drop_schema(relation) + + def test_get_relation_last_modified(self, project, custom_schema): + project.run_sql( + f"create table {custom_schema}.test_table (id integer, name varchar(100) not null);" + ) + + run_dbt(["source", "freshness"]) + + 
sources = get_artifact("target/sources.json") + + assert sources["results"][0]["status"] == "pass"