
feat(dbt): emit column lineage using sqlglot
rexledesma committed Mar 20, 2024
1 parent 0f56e94 commit 4f0105d
Showing 11 changed files with 567 additions and 105 deletions.
18 changes: 9 additions & 9 deletions docs/content/integrations/dbt/reference.mdx
@@ -571,18 +571,18 @@ def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):

---

## Emit column schema as materialization metadata <Experimental />
## Emit column-level metadata as materialization metadata <Experimental />

<Note>
<strong>
Emitting column schema as materialization metadata is currently an
Emitting column-level metadata as materialization metadata is currently an
experimental feature.{" "}
</strong>{" "}
To use this feature, you'll need to be on at least `dagster==1.6.6` and
`dagster-dbt==0.22.6`.
To use this feature, you'll need to be on at least `dagster>=1.6.12` and
`dagster-dbt>=0.22.12`.
</Note>

Dagster allows you to emit column schema [materialization metadata](/concepts/assets/software-defined-assets#recording-materialization-metadata), which includes the column names and data types of your materialized dbt models, seeds, and snapshots.
Dagster allows you to emit column-level metadata, like column schema and column dependencies, as [materialization metadata](/concepts/assets/software-defined-assets#recording-materialization-metadata).

With this metadata, you can view documentation in Dagster for all columns, not just columns described in your dbt project.

@@ -595,20 +595,20 @@ packages:
revision: DAGSTER_VERSION # replace with the version of `dagster` you are using.
```

Then, enable the `dagster.log_columns_in_relation()` macro as a [post-hook](https://docs.getdbt.com/reference/resource-configs/pre-hook-post-hook) for the dbt resources that should emit column schema metadata. For example, adding the following snippet in `dbt_project.yml` enables this macro for all dbt models, seeds, and snapshots:
Then, enable the `dagster.log_column_level_metadata()` macro as a [post-hook](https://docs.getdbt.com/reference/resource-configs/pre-hook-post-hook) for the dbt resources that should emit column schema metadata. For example, adding the following snippet in `dbt_project.yml` enables this macro for all dbt models, seeds, and snapshots:

```yaml
models:
+post-hook:
- "{{ dagster.log_columns_in_relation() }}"
- "{{ dagster.log_column_level_metadata() }}"

seeds:
+post-hook:
- "{{ dagster.log_columns_in_relation() }}"
- "{{ dagster.log_column_level_metadata() }}"

snapshots:
+post-hook:
- "{{ dagster.log_columns_in_relation() }}"
- "{{ dagster.log_column_level_metadata() }}"
```
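
Once the post-hook is enabled, each materialization event carries a `dagster/column_lineage` metadata entry mapping every column of the materialized resource to the upstream columns it was derived from. A minimal sketch of that blob's shape, with hypothetical asset and column names (asset keys rendered as paths for readability; none of these identifiers come from the commit):

```python
# Sketch of the "dagster/column_lineage" materialization metadata.
# Asset keys and column names below are illustrative only.
column_lineage_metadata = {
    "dagster/column_lineage": {
        "customer_id": [
            {
                "upstream_asset_key": ["stg_customers"],
                "upstream_column_name": "id",
            },
        ],
        "order_total": [
            {
                "upstream_asset_key": ["stg_orders"],
                "upstream_column_name": "amount",
            },
        ],
    },
}
```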
---
3 changes: 3 additions & 0 deletions pyright/alt-1/requirements-pinned.txt
@@ -17,6 +17,7 @@ asn1crypto==1.5.1
astroid==3.1.0
asttokens==2.4.1
async-lru==2.0.4
async-timeout==4.0.3
attrs==23.2.0
babel==2.14.0
backoff==2.2.1
@@ -256,6 +257,8 @@ snowflake-sqlalchemy==1.5.1
sortedcontainers==2.4.0
soupsieve==2.5
sqlalchemy==1.4.52
sqlglot==22.3.1
sqlglotrs==0.1.2
sqlparse==0.4.4
stack-data==0.6.3
starlette==0.37.2
4 changes: 4 additions & 0 deletions pyright/master/requirements-pinned.txt
@@ -35,6 +35,7 @@ asn1crypto==1.5.1
-e examples/assets_pandas_pyspark
asttokens==2.4.1
async-lru==2.0.4
async-timeout==4.0.3
attrs==23.2.0
autodocsumm==0.2.12
autoflake==2.3.1
@@ -196,6 +197,7 @@ duckdb==0.10.1
ecdsa==0.18.0
email-validator==1.3.1
entrypoints==0.4
exceptiongroup==1.2.0
execnet==2.0.2
executing==2.0.1
expandvars==0.12.0
@@ -509,6 +511,8 @@ sphinxcontrib-serializinghtml==1.1.10
sqlalchemy==1.4.52
sqlalchemy-jsonfield==1.0.2
sqlalchemy-utils==0.41.1
sqlglot==22.3.1
sqlglotrs==0.1.2
sqlparse==0.4.4
sshpubkeys==3.3.1
sshtunnel==0.4.0
145 changes: 140 additions & 5 deletions python_modules/libraries/dagster-dbt/dagster_dbt/core/resources_v2.py
@@ -17,6 +17,8 @@
Mapping,
Optional,
Sequence,
Set,
Tuple,
Union,
cast,
)
@@ -27,10 +29,12 @@
AssetCheckResult,
AssetCheckSeverity,
AssetExecutionContext,
AssetKey,
AssetMaterialization,
AssetObservation,
AssetsDefinition,
ConfigurableResource,
JsonMetadataValue,
OpExecutionContext,
Output,
get_dagster_logger,
@@ -43,6 +47,13 @@
from dbt.version import __version__ as dbt_version
from packaging import version
from pydantic import Field, validator
from sqlglot import (
MappingSchema,
exp,
parse_one,
)
from sqlglot.lineage import lineage
from sqlglot.optimizer import optimize
from typing_extensions import Literal

from ..asset_utils import (
@@ -104,19 +115,28 @@ def log_level(self) -> str:
"""The log level of the event."""
return self.raw_event["info"]["level"]

@property
def has_column_lineage_metadata(self) -> bool:
"""Whether the event has column level lineage metadata."""
return bool(self._event_history_metadata) and "parents" in self._event_history_metadata

@public
def to_default_asset_events(
self,
manifest: DbtManifestParam,
dagster_dbt_translator: DagsterDbtTranslator = DagsterDbtTranslator(),
context: Optional[OpExecutionContext] = None,
target_path: Optional[Path] = None,
) -> Iterator[Union[Output, AssetMaterialization, AssetObservation, AssetCheckResult]]:
"""Convert a dbt CLI event to a set of corresponding Dagster events.
Args:
manifest (Union[Mapping[str, Any], str, Path]): The dbt manifest blob.
dagster_dbt_translator (DagsterDbtTranslator): Optionally, a custom translator for
linking dbt nodes to Dagster assets.
context (Optional[OpExecutionContext]): The execution context.
target_path (Optional[Path]): An explicit path to a target folder used to retrieve
dbt artifacts while generating events.

Returns:
Iterator[Union[Output, AssetMaterialization, AssetObservation, AssetCheckResult]]:
Expand Down Expand Up @@ -175,6 +195,16 @@ def to_default_asset_events(
finished_at = dateutil.parser.isoparse(event_node_info["node_finished_at"])
duration_seconds = (finished_at - started_at).total_seconds()

lineage_metadata = (
self._build_column_lineage_metadata(
manifest=manifest,
dagster_dbt_translator=dagster_dbt_translator,
target_path=target_path,
)
if target_path
else {}
)

if has_asset_def:
yield Output(
value=None,
Expand All @@ -183,6 +213,7 @@ def to_default_asset_events(
**default_metadata,
"Execution Duration": duration_seconds,
**adapter_response_metadata,
**lineage_metadata,
},
)
else:
Expand All @@ -195,6 +226,7 @@ def to_default_asset_events(
**default_metadata,
"Execution Duration": duration_seconds,
**adapter_response_metadata,
**lineage_metadata,
},
)
elif manifest and node_resource_type == NodeType.Test and is_node_finished:
@@ -286,6 +318,107 @@ def _process_adapter_response_metadata(

return processed_adapter_response

def _build_column_lineage_metadata(
self,
manifest: Mapping[str, Any],
dagster_dbt_translator: DagsterDbtTranslator,
target_path: Path,
) -> Dict[str, Any]:
"""Process the lineage metadata for a dbt CLI event.
Args:
manifest (Mapping[str, Any]): The dbt manifest blob.
dagster_dbt_translator (DagsterDbtTranslator): The translator for dbt nodes to Dagster assets.
target_path (Path): The path to the dbt target folder.

Returns:
Dict[str, Any]: The lineage metadata.
"""
if (
# The dbt project name is only available from the manifest in `dbt-core>=1.6`.
version.parse(dbt_version) < version.parse("1.6.0")
# Column lineage can only be built if initial metadata is provided.
or not self.has_column_lineage_metadata
):
return {}

event_node_info: Dict[str, Any] = self.raw_event["data"].get("node_info")
unique_id: str = event_node_info["unique_id"]
dbt_resource_props: Dict[str, Any] = manifest["nodes"][unique_id]

# If the unique_id is a seed, then we don't need to process lineage.
if unique_id.startswith("seed"):
return {}

# 1. Retrieve the current node's SQL file and its parents' column schemas.
sqlglot_mapping_schema = MappingSchema()
for relation_name, relation_metadata in self._event_history_metadata["parents"].items():
sqlglot_mapping_schema.add_table(
table=relation_name,
column_mapping={
column_name: column_metadata["data_type"]
for column_name, column_metadata in relation_metadata["columns"].items()
},
)

node_sql_path = target_path.joinpath(
"run", manifest["metadata"]["project_name"], dbt_resource_props["original_file_path"]
)
node_ast = parse_one(sql=node_sql_path.read_text()).expression
optimized_node_ast = cast(
exp.Query,
optimize(
node_ast,
schema=sqlglot_mapping_schema,
validate_qualify_columns=False, # Don't throw an error if we can't qualify a column without ambiguity.
),
)

# 2. Retrieve the column names from the current node.
column_names = cast(exp.Query, optimized_node_ast).named_selects

# 3. For each column, retrieve its dependencies on upstream columns from direct parents.
# TODO: this should be refactored as a Python object that renders the JSON metadata.
lineage_metadata: Dict[str, List[Tuple[AssetKey, str]]] = {}
for column_name in column_names:
dbt_parent_resource_props_by_alias: Dict[str, Dict[str, Any]] = {
parent_dbt_resource_props["alias"]: parent_dbt_resource_props
for parent_dbt_resource_props in map(
lambda parent_unique_id: manifest["nodes"][parent_unique_id],
dbt_resource_props["depends_on"]["nodes"],
)
}

parent_columns: Set[Tuple[AssetKey, str]] = set()
for sqlglot_lineage_node in lineage(
column=column_name, sql=optimized_node_ast, schema=sqlglot_mapping_schema
).walk():
column = sqlglot_lineage_node.expression.find(exp.Column)
if column and column.table in dbt_parent_resource_props_by_alias:
parent_resource_props = dbt_parent_resource_props_by_alias[column.table]
parent_asset_key = dagster_dbt_translator.get_asset_key(parent_resource_props)

parent_columns.add((parent_asset_key, column.name))

lineage_metadata[column_name] = list(parent_columns)

# 4. Render the lineage as a JSON blob.
# TODO: this should just call a method on a Python object that renders the JSON metadata.
return {
"dagster/column_lineage": JsonMetadataValue(
{
column_name: [
{
"upstream_asset_key": parent_asset_key,
"upstream_column_name": parent_column_name,
}
for parent_asset_key, parent_column_name in parent_columns
]
for column_name, parent_columns in lineage_metadata.items()
}
)
}


@dataclass
class DbtCliInvocation:
@@ -456,6 +589,7 @@ def my_dbt_assets(context, dbt: DbtCliResource):
manifest=self.manifest,
dagster_dbt_translator=self.dagster_dbt_translator,
context=self.context,
target_path=self.target_path,
)

@public
@@ -489,14 +623,15 @@ def stream_raw_events(self) -> Iterator[DbtCliEventMessage]:
if is_error_message:
self._error_messages.append(str(event))

# Attempt to parse the columns metadata from the event message.
# Attempt to parse the column level metadata from the event message.
# If it exists, save it as historical metadata to attach to the NodeFinished event.
if event.raw_event["info"]["name"] == "JinjaLogInfo":
with contextlib.suppress(orjson.JSONDecodeError):
columns = orjson.loads(event.raw_event["info"]["msg"])
event_history_metadata_by_unique_id[cast(str, unique_id)] = {
"columns": columns
}
column_level_metadata = orjson.loads(event.raw_event["info"]["msg"])

event_history_metadata_by_unique_id[cast(str, unique_id)] = (
column_level_metadata
)

# Don't show this message in stdout
continue
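
The heart of the new `_build_column_lineage_metadata` method is three sqlglot calls: build a `MappingSchema` from the parents' column schemas, `optimize` the parsed model SQL against it, then walk `lineage()` for each selected column. A minimal standalone sketch of that flow, under a hypothetical schema and query (the table, columns, and SQL are illustrative, not from the commit):

```python
from sqlglot import MappingSchema, exp, parse_one
from sqlglot.lineage import lineage
from sqlglot.optimizer import optimize

# Register each parent relation's column schema, as the diff does from
# dbt's event history metadata ("parents").
schema = MappingSchema()
schema.add_table(
    "stg_orders",
    column_mapping={"order_id": "int", "amount": "double"},
)

# Hypothetical compiled SQL for the current dbt model.
sql = "select order_id, amount * 2 as doubled_amount from stg_orders"

# Qualify and annotate columns; tolerate columns that can't be
# qualified unambiguously, mirroring validate_qualify_columns=False.
optimized = optimize(parse_one(sql), schema=schema, validate_qualify_columns=False)

# For each output column, walk its lineage graph and collect the
# upstream (table, column) pairs -- step 3 of the new method.
for column_name in optimized.named_selects:
    upstream = set()
    for node in lineage(column=column_name, sql=optimized, schema=schema).walk():
        column = node.expression.find(exp.Column)
        if column is not None and column.table:
            upstream.add((column.table, column.name))
    print(column_name, sorted(upstream))
```

The committed version additionally maps each parent table alias back to a Dagster `AssetKey` via the `DagsterDbtTranslator`, then renders the result as the `dagster/column_lineage` JSON blob shown in the docs above.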
24 changes: 13 additions & 11 deletions python_modules/libraries/dagster-dbt/dagster_dbt_tests/conftest.py
@@ -54,12 +54,15 @@ def disable_openblas_threading_affinity_fixture() -> None:
os.environ["GOTOBLAS_MAIN_FREE"] = "1"


def _create_dbt_invocation(project_dir: Path) -> DbtCliInvocation:
def _create_dbt_invocation(project_dir: Path, build_project: bool = False) -> DbtCliInvocation:
dbt = DbtCliResource(project_dir=os.fspath(project_dir), global_config_flags=["--quiet"])

dbt.cli(["deps"]).wait()
dbt_invocation = dbt.cli(["compile"]).wait()

if build_project:
dbt.cli(["build"], raise_on_error=False).wait()

return dbt_invocation


@@ -83,15 +86,10 @@ def test_jaffle_shop_manifest_fixture(
@pytest.fixture(name="test_asset_checks_manifest", scope="session")
def test_asset_checks_manifest_fixture() -> Dict[str, Any]:
# Prepopulate duckdb with jaffle shop data to support testing individual asset checks.
(
DbtCliResource(
project_dir=os.fspath(test_asset_checks_path), global_config_flags=["--quiet"]
)
.cli(["build"], raise_on_error=False)
.wait()
)

return _create_dbt_invocation(test_asset_checks_path).get_artifact("manifest.json")
return _create_dbt_invocation(
test_asset_checks_path,
build_project=True,
).get_artifact("manifest.json")


@pytest.fixture(name="test_asset_key_exceptions_manifest", scope="session")
@@ -138,4 +136,8 @@ def test_meta_config_manifest_fixture() -> Dict[str, Any]:

@pytest.fixture(name="test_metadata_manifest", scope="session")
def test_metadata_manifest_fixture() -> Dict[str, Any]:
return _create_dbt_invocation(test_metadata_path).get_artifact("manifest.json")
# Prepopulate duckdb with jaffle shop data to support testing individual column metadata.
return _create_dbt_invocation(
test_metadata_path,
build_project=True,
).get_artifact("manifest.json")
