feat(dbt): emit column dependencies using sqlglot (#20407)
## Summary & Motivation
Makes use of the great `sqlglot` library to build column lineage metadata when executing a dbt project.

We do this in the following steps:

1. Retrieve the current dbt node's SQL file and its parents' column schemas.
2. Retrieve the column names from the current node. 
3. For each column, retrieve its dependencies on upstream columns from direct parents by invoking [`lineage`](https://sqlglot.com/sqlglot/lineage.html#lineage) from `sqlglot` (see the sketch after this list).
4. Render the lineage as a JSON blob on the asset materialization for the dbt node.
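
As a rough illustration of step 3, here is a minimal standalone sketch of how `sqlglot`'s `lineage` API walks a column back to its upstream tables. The SQL, table names, and schemas below are hypothetical, not taken from this PR:

```python
from sqlglot import exp
from sqlglot.lineage import lineage

# Hypothetical model SQL and the column schemas of its parents.
sql = """
select
    orders.order_id,
    customers.customer_name
from orders
join customers on orders.customer_id = customers.customer_id
"""
schema = {
    "orders": {"order_id": "integer", "customer_id": "integer"},
    "customers": {"customer_id": "integer", "customer_name": "text"},
}

# Walk the lineage graph for one column and collect the upstream
# (table, column) pairs it depends on.
upstream = set()
for node in lineage(column="customer_name", sql=sql, schema=schema).walk():
    column = node.expression.find(exp.Column)
    if column and column.table:
        upstream.add((column.table, column.name))

print(upstream)  # e.g. {('customers', 'customer_name')}
```

In the PR itself, the same walk runs over each named select of the optimized AST, and parent table aliases are translated into Dagster asset keys via the `DagsterDbtTranslator`.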

To retrieve the dbt node's parents and their column schemas, we augment our `dagster` dbt package implementation from #19623 to emit column schemas for the dbt node's parents. We use the dbt [`model`](https://docs.getdbt.com/reference/dbt-jinja-functions/model) variable to resolve the dbt node's refs/sources as relation objects, which we pass to [`adapter.get_columns_in_relation`](https://docs.getdbt.com/reference/dbt-jinja-functions/adapter#get_columns_in_relation).
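
For reference, the JSON blob that the post-hook macro logs, as consumed by `_event_history_metadata` in the `resources_v2.py` changes below, is shaped roughly like this. The relation and column names are illustrative, and the exact keys should be checked against the macro:

```python
# Hedged sketch of the per-node metadata logged by the post-hook macro,
# inferred from how `_event_history_metadata` is consumed below.
event_history_metadata = {
    # Column schema of the node itself.
    "columns": {
        "order_id": {"data_type": "integer"},
    },
    # Column schemas of the node's direct parents, keyed by relation name.
    "parents": {
        '"jaffle_shop"."main"."stg_orders"': {
            "columns": {
                "order_id": {"data_type": "integer"},
            },
        },
    },
}
```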

## How I Tested These Changes
pytest
- assert expected column dependencies against jaffle shop
- assert expected column dependencies against executing a subset of jaffle shop
- assert expected column dependencies against executing a subset of jaffle shop with ambiguous column selection (e.g. `select *`); see the assertion sketch after this list
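
A hedged sketch of the shape of these assertions (the fixture and the asset names are illustrative, not the actual test code in this PR):

```python
from dagster import AssetKey, JsonMetadataValue


def test_column_dependencies(jaffle_shop_materializations) -> None:
    # `jaffle_shop_materializations` is a hypothetical fixture yielding the
    # AssetMaterialization events from a dbt build of jaffle shop.
    orders = next(
        event
        for event in jaffle_shop_materializations
        if event.asset_key == AssetKey(["orders"])
    )

    lineage_metadata = orders.metadata["dagster/lineage"]
    assert isinstance(lineage_metadata, JsonMetadataValue)

    # Each selected column maps to its upstream column dependencies.
    assert "order_id" in lineage_metadata.data
```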
rexledesma authored and PedramNavid committed Mar 28, 2024
1 parent 33f4ed2 commit 0b09ef5
Showing 11 changed files with 565 additions and 105 deletions.
18 changes: 9 additions & 9 deletions docs/content/integrations/dbt/reference.mdx
@@ -571,18 +571,18 @@ def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):

---

## Emit column schema as materialization metadata <Experimental />
## Emit column-level metadata as materialization metadata <Experimental />

<Note>
<strong>
Emitting column schema as materialization metadata is currently an
Emitting column-level metadata as materialization metadata is currently an
experimental feature.{" "}
</strong>{" "}
To use this feature, you'll need to be on at least `dagster==1.6.6` and
`dagster-dbt==0.22.6`.
To use this feature, you'll need to be on at least `dagster>=1.6.12` and
`dagster-dbt>=0.22.12`.
</Note>

Dagster allows you to emit column schema [materialization metadata](/concepts/assets/software-defined-assets#recording-materialization-metadata), which includes the column names and data types of your materialized dbt models, seeds, and snapshots.
Dagster allows you to emit column-level metadata, like column schema and column dependencies, as [materialization metadata](/concepts/assets/software-defined-assets#recording-materialization-metadata).

With this metadata, you can view documentation in Dagster for all columns, not just columns described in your dbt project.

@@ -595,20 +595,20 @@ packages:
revision: DAGSTER_VERSION # replace with the version of `dagster` you are using.
```

Then, enable the `dagster.log_columns_in_relation()` macro as a [post-hook](https://docs.getdbt.com/reference/resource-configs/pre-hook-post-hook) for the dbt resources that should emit column schema metadata. For example, adding the following snippet in `dbt_project.yml` enables this macro for all dbt models, seeds, and snapshots:
Then, enable the `dagster.log_column_level_metadata()` macro as a [post-hook](https://docs.getdbt.com/reference/resource-configs/pre-hook-post-hook) for the dbt resources that should emit column schema metadata. For example, adding the following snippet in `dbt_project.yml` enables this macro for all dbt models, seeds, and snapshots:

```yaml
models:
+post-hook:
- "{{ dagster.log_columns_in_relation() }}"
- "{{ dagster.log_column_level_metadata() }}"

seeds:
+post-hook:
- "{{ dagster.log_columns_in_relation() }}"
- "{{ dagster.log_column_level_metadata() }}"

snapshots:
+post-hook:
- "{{ dagster.log_columns_in_relation() }}"
- "{{ dagster.log_column_level_metadata() }}"
```
---
3 changes: 3 additions & 0 deletions pyright/alt-1/requirements-pinned.txt
@@ -17,6 +17,7 @@ asn1crypto==1.5.1
astroid==3.1.0
asttokens==2.4.1
async-lru==2.0.4
async-timeout==4.0.3
attrs==23.2.0
babel==2.14.0
backoff==2.2.1
@@ -256,6 +257,8 @@ snowflake-sqlalchemy==1.5.1
sortedcontainers==2.4.0
soupsieve==2.5
sqlalchemy==1.4.52
sqlglot==22.3.1
sqlglotrs==0.1.2
sqlparse==0.4.4
stack-data==0.6.3
starlette==0.37.2
4 changes: 4 additions & 0 deletions pyright/master/requirements-pinned.txt
@@ -35,6 +35,7 @@ asn1crypto==1.5.1
-e examples/assets_pandas_pyspark
asttokens==2.4.1
async-lru==2.0.4
async-timeout==4.0.3
attrs==23.2.0
autodocsumm==0.2.12
autoflake==2.3.1
@@ -196,6 +197,7 @@ duckdb==0.10.1
ecdsa==0.18.0
email-validator==1.3.1
entrypoints==0.4
exceptiongroup==1.2.0
execnet==2.0.2
executing==2.0.1
expandvars==0.12.0
@@ -509,6 +511,8 @@ sphinxcontrib-serializinghtml==1.1.10
sqlalchemy==1.4.52
sqlalchemy-jsonfield==1.0.2
sqlalchemy-utils==0.41.1
sqlglot==22.3.1
sqlglotrs==0.1.2
sqlparse==0.4.4
sshpubkeys==3.3.1
sshtunnel==0.4.0
145 changes: 140 additions & 5 deletions python_modules/libraries/dagster-dbt/dagster_dbt/core/resources_v2.py
@@ -17,6 +17,8 @@
Mapping,
Optional,
Sequence,
Set,
Tuple,
Union,
cast,
)
@@ -27,10 +29,12 @@
AssetCheckResult,
AssetCheckSeverity,
AssetExecutionContext,
AssetKey,
AssetMaterialization,
AssetObservation,
AssetsDefinition,
ConfigurableResource,
JsonMetadataValue,
OpExecutionContext,
Output,
get_dagster_logger,
Expand All @@ -43,6 +47,13 @@
from dbt.version import __version__ as dbt_version
from packaging import version
from pydantic import Field, validator
from sqlglot import (
MappingSchema,
exp,
parse_one,
)
from sqlglot.lineage import lineage
from sqlglot.optimizer import optimize
from typing_extensions import Literal

from ..asset_utils import (
@@ -104,19 +115,28 @@ def log_level(self) -> str:
"""The log level of the event."""
return self.raw_event["info"]["level"]

@property
def has_column_lineage_metadata(self) -> bool:
"""Whether the event has column level lineage metadata."""
return bool(self._event_history_metadata) and "parents" in self._event_history_metadata

@public
def to_default_asset_events(
self,
manifest: DbtManifestParam,
dagster_dbt_translator: DagsterDbtTranslator = DagsterDbtTranslator(),
context: Optional[OpExecutionContext] = None,
target_path: Optional[Path] = None,
) -> Iterator[Union[Output, AssetMaterialization, AssetObservation, AssetCheckResult]]:
"""Convert a dbt CLI event to a set of corresponding Dagster events.
Args:
manifest (Union[Mapping[str, Any], str, Path]): The dbt manifest blob.
dagster_dbt_translator (DagsterDbtTranslator): Optionally, a custom translator for
linking dbt nodes to Dagster assets.
context (Optional[OpExecutionContext]): The execution context.
target_path (Optional[Path]): An explicit path to a target folder used to retrieve
dbt artifacts while generating events.
Returns:
Iterator[Union[Output, AssetMaterialization, AssetObservation, AssetCheckResult]]:
Expand Down Expand Up @@ -175,6 +195,16 @@ def to_default_asset_events(
finished_at = dateutil.parser.isoparse(event_node_info["node_finished_at"])
duration_seconds = (finished_at - started_at).total_seconds()

lineage_metadata = (
self._build_column_lineage_metadata(
manifest=manifest,
dagster_dbt_translator=dagster_dbt_translator,
target_path=target_path,
)
if target_path
else {}
)

if has_asset_def:
yield Output(
value=None,
@@ -183,6 +213,7 @@
**default_metadata,
"Execution Duration": duration_seconds,
**adapter_response_metadata,
**lineage_metadata,
},
)
else:
@@ -195,6 +226,7 @@
**default_metadata,
"Execution Duration": duration_seconds,
**adapter_response_metadata,
**lineage_metadata,
},
)
elif manifest and node_resource_type == NodeType.Test and is_node_finished:
@@ -286,6 +318,107 @@ def _process_adapter_response_metadata(

return processed_adapter_response

def _build_column_lineage_metadata(
self,
manifest: Mapping[str, Any],
dagster_dbt_translator: DagsterDbtTranslator,
target_path: Path,
) -> Dict[str, Any]:
"""Process the lineage metadata for a dbt CLI event.
Args:
manifest (Mapping[str, Any]): The dbt manifest blob.
dagster_dbt_translator (DagsterDbtTranslator): The translator for dbt nodes to Dagster assets.
target_path (Path): The path to the dbt target folder.
Returns:
Dict[str, Any]: The lineage metadata.
"""
if (
# The dbt project name is only available from the manifest in `dbt-core>=1.6`.
version.parse(dbt_version) < version.parse("1.6.0")
# Column lineage can only be built if initial metadata is provided.
or not self.has_column_lineage_metadata
):
return {}

event_node_info: Dict[str, Any] = self.raw_event["data"].get("node_info")
unique_id: str = event_node_info["unique_id"]
dbt_resource_props: Dict[str, Any] = manifest["nodes"][unique_id]

# If the unique_id is a seed, then we don't need to process lineage.
if unique_id.startswith("seed"):
return {}

# 1. Retrieve the current node's SQL file and its parents' column schemas.
sqlglot_mapping_schema = MappingSchema()
for relation_name, relation_metadata in self._event_history_metadata["parents"].items():
sqlglot_mapping_schema.add_table(
table=relation_name,
column_mapping={
column_name: column_metadata["data_type"]
for column_name, column_metadata in relation_metadata["columns"].items()
},
)

node_sql_path = target_path.joinpath(
"run", manifest["metadata"]["project_name"], dbt_resource_props["original_file_path"]
)
node_ast = parse_one(sql=node_sql_path.read_text()).expression
optimized_node_ast = cast(
exp.Query,
optimize(
node_ast,
schema=sqlglot_mapping_schema,
validate_qualify_columns=False, # Don't throw an error if we can't qualify a column without ambiguity.
),
)

# 2. Retrieve the column names from the current node.
column_names = cast(exp.Query, optimized_node_ast).named_selects

# 3. For each column, retrieve its dependencies on upstream columns from direct parents.
# TODO: this should be refactored as a Python object that renders the JSON metadata.
lineage_metadata: Dict[str, List[Tuple[AssetKey, str]]] = {}
for column_name in column_names:
dbt_parent_resource_props_by_alias: Dict[str, Dict[str, Any]] = {
parent_dbt_resource_props["alias"]: parent_dbt_resource_props
for parent_dbt_resource_props in map(
lambda parent_unique_id: manifest["nodes"][parent_unique_id],
dbt_resource_props["depends_on"]["nodes"],
)
}

parent_columns: Set[Tuple[AssetKey, str]] = set()
for sqlglot_lineage_node in lineage(
column=column_name, sql=optimized_node_ast, schema=sqlglot_mapping_schema
).walk():
column = sqlglot_lineage_node.expression.find(exp.Column)
if column and column.table in dbt_parent_resource_props_by_alias:
parent_resource_props = dbt_parent_resource_props_by_alias[column.table]
parent_asset_key = dagster_dbt_translator.get_asset_key(parent_resource_props)

parent_columns.add((parent_asset_key, column.name))

lineage_metadata[column_name] = list(parent_columns)

# 4. Render the lineage as a JSON blob.
# TODO: this should just call a method on a Python object that renders the JSON metadata.
return {
"dagster/lineage": JsonMetadataValue(
{
column_name: [
{
"upstream_asset_key": parent_asset_key,
"upstream_column_name": parent_column_name,
}
for parent_asset_key, parent_column_name in parent_columns
]
for column_name, parent_columns in lineage_metadata.items()
}
)
}


@dataclass
class DbtCliInvocation:
@@ -456,6 +589,7 @@ def my_dbt_assets(context, dbt: DbtCliResource):
manifest=self.manifest,
dagster_dbt_translator=self.dagster_dbt_translator,
context=self.context,
target_path=self.target_path,
)

@public
@@ -489,14 +623,15 @@ def stream_raw_events(self) -> Iterator[DbtCliEventMessage]:
if is_error_message:
self._error_messages.append(str(event))

# Attempt to parse the columns metadata from the event message.
# Attempt to parse the column level metadata from the event message.
# If it exists, save it as historical metadata to attach to the NodeFinished event.
if event.raw_event["info"]["name"] == "JinjaLogInfo":
with contextlib.suppress(orjson.JSONDecodeError):
columns = orjson.loads(event.raw_event["info"]["msg"])
event_history_metadata_by_unique_id[cast(str, unique_id)] = {
"columns": columns
}
column_level_metadata = orjson.loads(event.raw_event["info"]["msg"])

event_history_metadata_by_unique_id[cast(str, unique_id)] = (
column_level_metadata
)

# Don't show this message in stdout
continue
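To make the final rendering concrete: for a hypothetical `orders` model whose `order_id` column comes from a `stg_orders` parent, the `_build_column_lineage_metadata` method above attaches metadata shaped roughly like this (actual asset keys depend on your `DagsterDbtTranslator`):

```python
from dagster import AssetKey, JsonMetadataValue

# Illustrative output only; asset keys and columns vary by project.
metadata = {
    "dagster/lineage": JsonMetadataValue(
        {
            "order_id": [
                {
                    "upstream_asset_key": AssetKey(["stg_orders"]),
                    "upstream_column_name": "order_id",
                },
            ],
        }
    )
}
```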
24 changes: 13 additions & 11 deletions python_modules/libraries/dagster-dbt/dagster_dbt_tests/conftest.py
@@ -54,12 +54,15 @@ def disable_openblas_threading_affinity_fixture() -> None:
os.environ["GOTOBLAS_MAIN_FREE"] = "1"


def _create_dbt_invocation(project_dir: Path) -> DbtCliInvocation:
def _create_dbt_invocation(project_dir: Path, build_project: bool = False) -> DbtCliInvocation:
dbt = DbtCliResource(project_dir=os.fspath(project_dir), global_config_flags=["--quiet"])

dbt.cli(["deps"]).wait()
dbt_invocation = dbt.cli(["compile"]).wait()

if build_project:
dbt.cli(["build"], raise_on_error=False).wait()

return dbt_invocation


@@ -83,15 +86,10 @@ def test_jaffle_shop_manifest_fixture(
@pytest.fixture(name="test_asset_checks_manifest", scope="session")
def test_asset_checks_manifest_fixture() -> Dict[str, Any]:
# Prepopulate duckdb with jaffle shop data to support testing individual asset checks.
(
DbtCliResource(
project_dir=os.fspath(test_asset_checks_path), global_config_flags=["--quiet"]
)
.cli(["build"], raise_on_error=False)
.wait()
)

return _create_dbt_invocation(test_asset_checks_path).get_artifact("manifest.json")
return _create_dbt_invocation(
test_asset_checks_path,
build_project=True,
).get_artifact("manifest.json")


@pytest.fixture(name="test_asset_key_exceptions_manifest", scope="session")
Expand Down Expand Up @@ -138,4 +136,8 @@ def test_meta_config_manifest_fixture() -> Dict[str, Any]:

@pytest.fixture(name="test_metadata_manifest", scope="session")
def test_metadata_manifest_fixture() -> Dict[str, Any]:
return _create_dbt_invocation(test_metadata_path).get_artifact("manifest.json")
# Prepopulate duckdb with jaffle shop data to support testing individual column metadata.
return _create_dbt_invocation(
test_metadata_path,
build_project=True,
).get_artifact("manifest.json")