Skip to content

Commit

Permalink
Implementing metadata freshness checks (#481)
Browse files Browse the repository at this point in the history
  • Loading branch information
benc-db authored Oct 23, 2023
2 parents c901f8c + 8204482 commit 9127720
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 1 deletion.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
## dbt-databricks 1.7.x (TBD)

### Features

- Added support for getting freshness from metadata ([481](https://github.com/databricks/dbt-databricks/pull/481))

## dbt-databricks 1.7.0rc1 (October 13, 2023)

### Fixes
Expand Down
5 changes: 5 additions & 0 deletions dbt/adapters/databricks/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from dbt.adapters.base.impl import catch_as_completed
from dbt.adapters.base.meta import available
from dbt.adapters.base.relation import BaseRelation, InformationSchema
from dbt.adapters.capability import CapabilityDict, CapabilitySupport, Support, Capability
from dbt.adapters.spark.impl import (
SparkAdapter,
GET_COLUMNS_IN_RELATION_RAW_MACRO_NAME,
Expand Down Expand Up @@ -107,6 +108,10 @@ class DatabricksAdapter(SparkAdapter):

AdapterSpecificConfigs = DatabricksConfig

_capabilities = CapabilityDict(
{Capability.TableLastModifiedMetadata: CapabilitySupport(support=Support.Full)}
)

@available.parse(lambda *a, **k: 0)
def compare_dbr_version(self, major: int, minor: int) -> int:
"""
Expand Down
22 changes: 21 additions & 1 deletion dbt/adapters/databricks/relation.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from dbt.contracts.relation import (
ComponentName,
)
from dbt.adapters.base.relation import BaseRelation, Policy
from dbt.adapters.base.relation import BaseRelation, Policy, InformationSchema
from dbt.adapters.spark.impl import KEY_TABLE_OWNER, KEY_TABLE_STATISTICS
from dbt.dataclass_schema import StrEnum

Expand Down Expand Up @@ -37,6 +37,16 @@ class DatabricksRelationType(StrEnum):
StreamingTable = "streamingtable"


@dataclass(frozen=True, eq=False, repr=False)
class DatabricksInformationSchema(InformationSchema):
    """An information-schema relation rendered with Databricks quoting rules.

    Databricks quotes identifiers with backticks, and the quote/include
    policies differ from the generic base, so both are overridden here.
    """

    # Pass the policy classes directly as factories; calling them with no
    # arguments is exactly what the original lambdas did.
    quote_policy: Policy = field(default_factory=DatabricksQuotePolicy)
    include_policy: Policy = field(default_factory=DatabricksIncludePolicy)
    quote_character: str = "`"

    def is_hive_metastore(self) -> bool:
        """Return True when this schema lives in the legacy Hive metastore.

        That is the case when no catalog (database) is set at all, or when
        the catalog is explicitly named ``hive_metastore``.
        """
        return self.database in (None, "hive_metastore")


@dataclass(frozen=True, eq=False, repr=False)
class DatabricksRelation(BaseRelation):
type: Optional[DatabricksRelationType] = None # type: ignore
Expand Down Expand Up @@ -115,3 +125,13 @@ def matches(
@classproperty
def get_relation_type(cls) -> Type[DatabricksRelationType]:
return DatabricksRelationType

def information_schema(self, view_name: Optional[str] = None) -> InformationSchema:
    """Build the ``<database>.information_schema`` relation for this relation.

    The user-supplied schema is deliberately dropped so the resulting
    relation addresses the catalog-level information schema directly.
    """
    # Values coming out of jinja can be `Undefined` rather than a string;
    # treat anything that is not a real str as "no view name".
    if not isinstance(view_name, str):
        view_name = None

    # Kick the schema component out of the path so we address
    # <database>.information_schema by default.
    return DatabricksInformationSchema.from_relation(self, view_name).incorporate(
        path={"schema": None}
    )
32 changes: 32 additions & 0 deletions dbt/include/databricks/macros/metadata.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
{% macro databricks__get_relation_last_modified(information_schema, relations) -%}

  {#- Return (schema, identifier, last_modified, snapshotted_at) for every
      requested relation, used by `dbt source freshness` via metadata. -#}
  {%- call statement('last_modified', fetch_result=True) -%}
    {#- BUG FIX: `is_hive_metastore` is a method. Referencing it without
        calling it yields a bound-method object, which is always truthy in
        Jinja, so the information_schema branch below could never run.
        It must be invoked with parentheses. -#}
    {% if information_schema.is_hive_metastore() %}
      {#- The Hive metastore has no information_schema.tables, so fall back
          to `describe history` per relation and take the newest timestamp. -#}
      {%- for relation in relations -%}
        select '{{ relation.schema }}' as schema,
               '{{ relation.identifier }}' as identifier,
               max(timestamp) as last_modified,
               {{ current_timestamp() }} as snapshotted_at
        from (describe history {{ relation.schema }}.{{ relation.identifier }})
        {% if not loop.last %}
        union all
        {% endif %}
      {%- endfor -%}
    {% else %}
      {#- Unity Catalog: one lookup against information_schema.tables,
          filtering to exactly the requested (schema, table) pairs. -#}
      select table_schema as schema,
             table_name as identifier,
             last_altered as last_modified,
             {{ current_timestamp() }} as snapshotted_at
      from {{ information_schema }}.tables
      where (
        {%- for relation in relations -%}
          (table_schema = '{{ relation.schema }}' and
           table_name = '{{ relation.identifier }}'){%- if not loop.last %} or {% endif -%}
        {%- endfor -%}
      )
    {% endif %}
  {%- endcall -%}

  {{ return(load_result('last_modified')) }}

{% endmacro %}
53 changes: 53 additions & 0 deletions tests/functional/adapter/test_source_freshness.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import os
import pytest

from dbt.tests.util import get_artifact, run_dbt

freshness_via_metadata_schema_yml = """
version: 2
sources:
- name: test_source
freshness:
warn_after: {count: 10, period: hour}
error_after: {count: 1, period: day}
schema: "{{ env_var('DBT_GET_RELATION_TEST_SCHEMA') }}"
tables:
- name: test_table
"""


class TestGetRelationLastModified:
    """Functional test that `dbt source freshness` reads last-modified
    metadata for a source table (freshness-from-metadata, PR #481)."""

    @pytest.fixture(scope="class", autouse=True)
    def set_env_vars(self, project):
        # Expose the project's test schema to the source YAML definition,
        # and clean the variable up again when the class finishes.
        os.environ["DBT_GET_RELATION_TEST_SCHEMA"] = project.test_schema
        yield
        del os.environ["DBT_GET_RELATION_TEST_SCHEMA"]

    @pytest.fixture(scope="class")
    def models(self):
        return {"schema.yml": freshness_via_metadata_schema_yml}

    @pytest.fixture(scope="class")
    def custom_schema(self, project, set_env_vars):
        # Build a clean schema for the source table; tear it down afterwards.
        schema_name = os.environ["DBT_GET_RELATION_TEST_SCHEMA"]
        with project.adapter.connection_named("__test"):
            relation = project.adapter.Relation.create(
                database=project.database, schema=schema_name
            )
            project.adapter.drop_schema(relation)
            project.adapter.create_schema(relation)

        yield relation.schema

        with project.adapter.connection_named("__test"):
            project.adapter.drop_schema(relation)

    def test_get_relation_last_modified(self, project, custom_schema):
        # Materialize the source table, run the freshness command, then
        # assert the recorded status from the sources artifact.
        project.run_sql(
            f"create table {custom_schema}.test_table (id integer, name varchar(100) not null);"
        )
        run_dbt(["source", "freshness"])
        sources = get_artifact("target/sources.json")
        assert sources["results"][0]["status"] == "pass"

0 comments on commit 9127720

Please sign in to comment.